1use ankql::ast;
2use serde::{Deserialize, Serialize};
3use ulid::Ulid;
4
5use crate::{
6 auth::Attested, clock::Clock, collection::CollectionId, data::Event, id::EntityId, subscription::QueryId, transaction::TransactionId,
7 EntityState, EventFragment, EventId, StateFragment,
8};
9
10#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Hash, Default)]
11pub struct RequestId(Ulid);
12
13impl std::fmt::Display for RequestId {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 let id_str = self.0.to_string();
16 write!(f, "R{}", &id_str[20..])
17 }
18}
19
20impl RequestId {
21 pub fn new() -> Self { Self(Ulid::new()) }
22}
23
24#[derive(Debug, Serialize, Deserialize)]
26pub struct NodeRequest {
27 pub id: RequestId,
28 pub to: EntityId,
29 pub from: EntityId,
30 pub body: NodeRequestBody,
31}
32
33#[derive(Serialize, Deserialize, Clone, Debug)]
35pub struct KnownEntity {
36 pub entity_id: EntityId,
37 pub head: Clock,
38}
39
40#[derive(Serialize, Deserialize, Clone, Debug)]
44pub enum CausalRelation {
45 Equal,
47
48 StrictDescends,
51
52 StrictAscends,
55
56 DivergedSince {
59 meet: Clock,
61 subject: Clock,
63 other: Clock,
65 },
66
67 Disjoint {
70 gca: Option<Clock>,
72 subject_root: EventId,
74 other_root: EventId,
76 },
77
78 BudgetExceeded { subject: Clock, other: Clock },
80}
81
82#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct CausalAssertion {
85 pub entity_id: EntityId,
86 pub subject: Clock,
87 pub other: Clock,
88 pub relation: CausalRelation,
90}
91
92#[derive(Serialize, Deserialize, Clone, Debug)]
94pub struct CausalAssertionFragment {
95 pub relation: CausalRelation,
96 pub attestations: crate::auth::AttestationSet,
97}
98
99#[derive(Serialize, Deserialize, Clone, Debug)]
101pub enum DeltaContent {
102 StateSnapshot { state: StateFragment },
104 EventBridge { events: Vec<EventFragment> },
106 StateAndRelation { state: StateFragment, relation: CausalAssertionFragment },
108}
109
110#[derive(Serialize, Deserialize, Clone, Debug)]
112pub struct EntityDelta {
113 pub entity_id: EntityId,
114 pub collection: CollectionId,
115 pub content: DeltaContent,
116}
117
118#[derive(Debug, Serialize, Deserialize)]
120pub enum NodeRequestBody {
121 CommitTransaction { id: TransactionId, events: Vec<Attested<Event>> },
123 Get { collection: CollectionId, ids: Vec<EntityId> },
125 GetEvents { collection: CollectionId, event_ids: Vec<EventId> },
126 Fetch { collection: CollectionId, selection: ast::Selection, known_matches: Vec<KnownEntity> },
127 SubscribeQuery { query_id: QueryId, collection: CollectionId, selection: ast::Selection, version: u32, known_matches: Vec<KnownEntity> },
128}
129
130#[derive(Debug, Serialize, Deserialize)]
132pub struct NodeResponse {
133 pub request_id: RequestId,
134 pub from: EntityId,
135 pub to: EntityId,
136 pub body: NodeResponseBody,
137}
138
139#[derive(Debug, Serialize, Deserialize)]
140pub enum NodeResponseBody {
141 CommitComplete { id: TransactionId },
143 Fetch(Vec<EntityDelta>),
144 Get(Vec<Attested<EntityState>>),
145 GetEvents(Vec<Attested<Event>>),
146 QuerySubscribed { query_id: QueryId, deltas: Vec<EntityDelta> },
147 Success,
148 Error(String),
149}
150
151impl std::fmt::Display for NodeRequest {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 write!(f, "Request {} from {}->{}: {}", self.id, self.from, self.to, self.body)
154 }
155}
156
157impl std::fmt::Display for NodeResponse {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 write!(f, "Response({}) {}->{} {}", self.request_id, self.from, self.to, self.body)
160 }
161}
162
163impl std::fmt::Display for NodeRequestBody {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 match self {
166 NodeRequestBody::CommitTransaction { id, events } => {
167 write!(f, "CommitTransaction {id} [{}]", events.iter().map(|e| format!("{}", e)).collect::<Vec<_>>().join(", "))
168 }
169 NodeRequestBody::Get { collection, ids } => {
170 write!(f, "Get {collection} {}", ids.iter().map(|id| id.to_base64_short()).collect::<Vec<_>>().join(", "))
171 }
172 NodeRequestBody::GetEvents { collection, event_ids } => {
173 write!(f, "GetEvents {collection} {}", event_ids.iter().map(|id| id.to_base64_short()).collect::<Vec<_>>().join(", "),)
174 }
175 NodeRequestBody::Fetch { collection, selection: query, known_matches } => {
176 write!(f, "Fetch {collection} {query} known:{}", known_matches.len())
177 }
178 NodeRequestBody::SubscribeQuery { query_id, collection, selection: query, version, known_matches } => {
179 write!(f, "Subscribe {query_id} {collection} {query} v{version} known:{}", known_matches.len())
180 }
181 }
182 }
183}
184impl std::fmt::Display for NodeResponseBody {
185 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 match self {
187 NodeResponseBody::CommitComplete { id } => write!(f, "CommitComplete {id}"),
188 NodeResponseBody::Fetch(deltas) => {
189 write!(f, "Fetch [{}]", deltas.len()) }
191 NodeResponseBody::Get(states) => {
192 write!(f, "Get [{}]", states.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(", "))
193 }
194 NodeResponseBody::GetEvents(events) => {
195 write!(f, "GetEvents [{}]", events.iter().map(|e| e.payload.to_string()).collect::<Vec<_>>().join(", "))
196 }
197 NodeResponseBody::QuerySubscribed { query_id, deltas: initial } => write!(f, "Subscribed {query_id} initial:{}", initial.len()),
198 NodeResponseBody::Success => write!(f, "Success"),
199 NodeResponseBody::Error(e) => write!(f, "Error: {e}"),
200 }
201 }
202}
203
204impl std::fmt::Display for EntityDelta {
205 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206 match &self.content {
207 DeltaContent::StateSnapshot { state } => write!(f, "EntityDelta {}: StateSnapshot({})", self.entity_id, state),
208 DeltaContent::EventBridge { events } => {
209 let mut event_strs = Vec::new();
210 for event in events {
211 let event = Attested::<Event>::from_parts(self.entity_id, self.collection.clone(), event.clone());
212 event_strs.push(event.payload.to_string());
213 }
214 write!(f, "EntityDelta {}: EventBridge({})", self.entity_id, event_strs.join(", "))
215 }
216 DeltaContent::StateAndRelation { state, relation } => write!(f, "EntityDelta {}: StateAndRelation({})", self.entity_id, state),
217 }
218 }
219}