Skip to main content

ankurah_proto/
request.rs

1use ankql::ast;
2use serde::{Deserialize, Serialize};
3use ulid::Ulid;
4
5use crate::{
6    auth::Attested, clock::Clock, collection::CollectionId, data::Event, id::EntityId, subscription::QueryId, transaction::TransactionId,
7    EntityState, EventFragment, EventId, StateFragment,
8};
9
10#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Hash, Default)]
11pub struct RequestId(Ulid);
12
13impl std::fmt::Display for RequestId {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        let id_str = self.0.to_string();
16        write!(f, "R{}", &id_str[20..])
17    }
18}
19
20impl RequestId {
21    pub fn new() -> Self { Self(Ulid::new()) }
22}
23
24/// A request from one node to another
25#[derive(Debug, Serialize, Deserialize)]
26pub struct NodeRequest {
27    pub id: RequestId,
28    pub to: EntityId,
29    pub from: EntityId,
30    pub body: NodeRequestBody,
31}
32
33/// Entity with known head for lineage attestation
34#[derive(Serialize, Deserialize, Clone, Debug)]
35pub struct KnownEntity {
36    pub entity_id: EntityId,
37    pub head: Clock,
38}
39
40/// Causal relation between two clocks: `subject` (local) vs `other`.
41/// - A `Clock` is a normalized antichain frontier (a lattice point).
42/// - `meet` is the GCA frontier: Max(Past(subject) ∩ Past(other)).
43#[derive(Serialize, Deserialize, Clone, Debug)]
44pub enum CausalRelation {
45    /// Identical lattice points.
46    Equal,
47
48    /// Subject strictly after other: Past(subject) ⊃ Past(other).
49    /// Action: apply other's state directly.
50    StrictDescends,
51
52    /// Subject strictly before other: Past(subject) ⊂ Past(other).
53    /// Action: no-op (keep subject).
54    StrictAscends,
55
56    /// Both sides have advanced since the meet (GCA).
57    /// `subject`/`other` are minimal antichains after `meet`.
58    DivergedSince {
59        /// GCA frontier (meet).
60        meet: Clock,
61        /// Minimal subject frontier after `meet`.
62        subject: Clock,
63        /// Minimal other frontier after `meet`.
64        other: Clock,
65    },
66
67    /// Proven different genesis events (single-root invariant).
68    /// Optional `gca` records any common non-minimal ancestors discovered en route.
69    Disjoint {
70        /// Optional non-minimal common ancestors (if any were found).
71        gca: Option<Clock>,
72        /// Proven genesis of subject.
73        subject_root: EventId,
74        /// Proven genesis of other.
75        other_root: EventId,
76    },
77
78    /// Traversal could not complete under budget; return current frontiers to resume.
79    BudgetExceeded { subject: Clock, other: Clock },
80}
81
82// Not actually sent over the wire - but used for validating lineage attestations (and converted to/from EntityHeadRelationFragment)
83#[derive(Serialize, Deserialize, Clone, Debug)]
84pub struct CausalAssertion {
85    pub entity_id: EntityId,
86    pub subject: Clock,
87    pub other: Clock,
88    // Directionality: subject CausalRelations other
89    pub relation: CausalRelation,
90}
91
92/// Wire-minimal lineage attestation (omits heads that are reconstructible)
93#[derive(Serialize, Deserialize, Clone, Debug)]
94pub struct CausalAssertionFragment {
95    pub relation: CausalRelation,
96    pub attestations: crate::auth::AttestationSet,
97}
98
99/// Content for entity initialization - either bridge, state, or attested state
100#[derive(Serialize, Deserialize, Clone, Debug)]
101pub enum DeltaContent {
102    /// Entity not in known_matches; send full state snapshot
103    StateSnapshot { state: StateFragment },
104    /// Entity present in known matches with a small event gap
105    EventBridge { events: Vec<EventFragment> },
106    /// Entity present in known matches with a large event gap; send state + causal assertion
107    StateAndRelation { state: StateFragment, relation: CausalAssertionFragment },
108}
109
110/// Entity initialization data returned in QuerySubscribed and Fetch
111#[derive(Serialize, Deserialize, Clone, Debug)]
112pub struct EntityDelta {
113    pub entity_id: EntityId,
114    pub collection: CollectionId,
115    pub content: DeltaContent,
116}
117
118/// The body of a request from one node to another
119#[derive(Debug, Serialize, Deserialize)]
120pub enum NodeRequestBody {
121    // Request that the Events to be committed on the remote node
122    CommitTransaction { id: TransactionId, events: Vec<Attested<Event>> },
123    // Request to fetch entities matching a predicate
124    Get { collection: CollectionId, ids: Vec<EntityId> },
125    GetEvents { collection: CollectionId, event_ids: Vec<EventId> },
126    Fetch { collection: CollectionId, selection: ast::Selection, known_matches: Vec<KnownEntity> },
127    SubscribeQuery { query_id: QueryId, collection: CollectionId, selection: ast::Selection, version: u32, known_matches: Vec<KnownEntity> },
128}
129
130/// A response from one node to another
131#[derive(Debug, Serialize, Deserialize)]
132pub struct NodeResponse {
133    pub request_id: RequestId,
134    pub from: EntityId,
135    pub to: EntityId,
136    pub body: NodeResponseBody,
137}
138
139#[derive(Debug, Serialize, Deserialize)]
140pub enum NodeResponseBody {
141    // Response to CommitEvents
142    CommitComplete { id: TransactionId },
143    Fetch(Vec<EntityDelta>),
144    Get(Vec<Attested<EntityState>>),
145    GetEvents(Vec<Attested<Event>>),
146    QuerySubscribed { query_id: QueryId, deltas: Vec<EntityDelta> },
147    Success,
148    Error(String),
149}
150
151impl std::fmt::Display for NodeRequest {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        write!(f, "Request {} from {}->{}: {}", self.id, self.from, self.to, self.body)
154    }
155}
156
157impl std::fmt::Display for NodeResponse {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        write!(f, "Response({}) {}->{} {}", self.request_id, self.from, self.to, self.body)
160    }
161}
162
163impl std::fmt::Display for NodeRequestBody {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        match self {
166            NodeRequestBody::CommitTransaction { id, events } => {
167                write!(f, "CommitTransaction {id} [{}]", events.iter().map(|e| format!("{}", e)).collect::<Vec<_>>().join(", "))
168            }
169            NodeRequestBody::Get { collection, ids } => {
170                write!(f, "Get {collection} {}", ids.iter().map(|id| id.to_base64_short()).collect::<Vec<_>>().join(", "))
171            }
172            NodeRequestBody::GetEvents { collection, event_ids } => {
173                write!(f, "GetEvents {collection} {}", event_ids.iter().map(|id| id.to_base64_short()).collect::<Vec<_>>().join(", "),)
174            }
175            NodeRequestBody::Fetch { collection, selection: query, known_matches } => {
176                write!(f, "Fetch {collection} {query} known:{}", known_matches.len())
177            }
178            NodeRequestBody::SubscribeQuery { query_id, collection, selection: query, version, known_matches } => {
179                write!(f, "Subscribe {query_id} {collection} {query} v{version} known:{}", known_matches.len())
180            }
181        }
182    }
183}
184impl std::fmt::Display for NodeResponseBody {
185    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186        match self {
187            NodeResponseBody::CommitComplete { id } => write!(f, "CommitComplete {id}"),
188            NodeResponseBody::Fetch(deltas) => {
189                write!(f, "Fetch [{}]", deltas.len()) // TODO display deltas
190            }
191            NodeResponseBody::Get(states) => {
192                write!(f, "Get [{}]", states.iter().map(|s| s.to_string()).collect::<Vec<_>>().join(", "))
193            }
194            NodeResponseBody::GetEvents(events) => {
195                write!(f, "GetEvents [{}]", events.iter().map(|e| e.payload.to_string()).collect::<Vec<_>>().join(", "))
196            }
197            NodeResponseBody::QuerySubscribed { query_id, deltas: initial } => write!(f, "Subscribed {query_id} initial:{}", initial.len()),
198            NodeResponseBody::Success => write!(f, "Success"),
199            NodeResponseBody::Error(e) => write!(f, "Error: {e}"),
200        }
201    }
202}
203
204impl std::fmt::Display for EntityDelta {
205    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
206        match &self.content {
207            DeltaContent::StateSnapshot { state } => write!(f, "EntityDelta {}: StateSnapshot({})", self.entity_id, state),
208            DeltaContent::EventBridge { events } => {
209                let mut event_strs = Vec::new();
210                for event in events {
211                    let event = Attested::<Event>::from_parts(self.entity_id, self.collection.clone(), event.clone());
212                    event_strs.push(event.payload.to_string());
213                }
214                write!(f, "EntityDelta {}: EventBridge({})", self.entity_id, event_strs.join(", "))
215            }
216            DeltaContent::StateAndRelation { state, relation } => write!(f, "EntityDelta {}: StateAndRelation({})", self.entity_id, state),
217        }
218    }
219}