// moire_types/diff.rs

1use facet::Facet;
2use std::fmt;
3
4use crate::{Edge, EdgeKind, Entity, EntityId, Event, Scope, ScopeId, Snapshot};
5
6/// Monotonic sequence number within one process change stream.
7///
8/// Sequence numbers are append-only and strictly increasing.
9#[derive(Facet, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
10#[facet(transparent)]
11pub struct SeqNo(pub u64);
12
13impl SeqNo {
14    pub const ZERO: Self = Self(0);
15
16    pub fn next(self) -> Self {
17        Self(self.0.saturating_add(1))
18    }
19}
20
21/// Identity of one append-only change stream.
22///
23/// This should come from protocol handshake/session identity and stay stable
24/// for the lifetime of that runtime stream.
25#[derive(Facet, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
26#[facet(transparent)]
27pub struct StreamId(pub String);
28
29/// Logical barrier identifier used to coordinate multi-process "cuts".
30#[derive(Facet, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
31#[facet(transparent)]
32pub struct CutId(pub String);
33
34impl CutId {
35    pub fn new(id: impl Into<String>) -> Self {
36        Self(id.into())
37    }
38
39    pub fn from_ordinal(value: u64) -> Self {
40        Self(format!("cut:{value}"))
41    }
42
43    pub fn as_str(&self) -> &str {
44        self.0.as_str()
45    }
46}
47
48impl fmt::Display for CutId {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        f.write_str(&self.0)
51    }
52}
53
54/// One canonical graph mutation in the append-only stream.
55#[derive(Facet)]
56#[repr(u8)]
57#[facet(rename_all = "snake_case")]
58pub enum Change {
59    /// Insert or replace entity state.
60    UpsertEntity(Entity),
61    /// Insert or replace scope state.
62    UpsertScope(Scope),
63    /// Remove entity and any incident edges in materialized state.
64    RemoveEntity { id: EntityId },
65    /// Remove scope in materialized state.
66    RemoveScope { id: ScopeId },
67    /// Insert or replace entity-scope membership state.
68    UpsertEntityScopeLink {
69        entity_id: EntityId,
70        scope_id: ScopeId,
71    },
72    /// Remove entity-scope membership state.
73    RemoveEntityScopeLink {
74        entity_id: EntityId,
75        scope_id: ScopeId,
76    },
77    /// Insert or replace edge state.
78    UpsertEdge(Edge),
79    /// Remove a specific edge in materialized state.
80    RemoveEdge {
81        src: EntityId,
82        dst: EntityId,
83        kind: EdgeKind,
84    },
85    /// Append event to timeline/event log.
86    AppendEvent(Event),
87}
88
89/// A sequence-stamped change item.
90#[derive(Facet)]
91pub struct StampedChange {
92    pub seq_no: SeqNo,
93    pub change: Change,
94}
95
96/// Pull request for change stream deltas.
97#[derive(Facet)]
98pub struct PullChangesRequest {
99    /// Stream identity chosen during process handshake.
100    pub stream_id: StreamId,
101    /// First sequence number the caller does not yet have.
102    pub from_seq_no: SeqNo,
103    /// Upper bound on number of changes to return.
104    pub max_changes: u32,
105}
106
107/// Delta response for one pull step.
108#[derive(Facet)]
109pub struct PullChangesResponse {
110    /// Stream identity chosen during process handshake.
111    pub stream_id: StreamId,
112    /// Echo of request cursor.
113    pub from_seq_no: SeqNo,
114    /// Next cursor the caller should request.
115    pub next_seq_no: SeqNo,
116    /// Returned changes in ascending `seq_no`.
117    pub changes: Vec<StampedChange>,
118    /// `true` when additional changes are available after this batch.
119    pub truncated: bool,
120    /// When present, requested history before this cursor was compacted away.
121    ///
122    /// Consumers should rebuild from a checkpoint and resume from this cursor.
123    #[facet(skip_unless_truthy)]
124    pub compacted_before_seq_no: Option<SeqNo>,
125}
126
127/// Last durable/applied cursor for one stream.
128#[derive(Facet)]
129pub struct StreamCursor {
130    pub stream_id: StreamId,
131    pub next_seq_no: SeqNo,
132}
133
134/// Server-to-process request to acknowledge current cursor for a cut.
135#[derive(Facet)]
136pub struct CutRequest {
137    pub cut_id: CutId,
138}
139
140/// Process-to-server acknowledgement of cursor for a cut barrier.
141#[derive(Facet)]
142pub struct CutAck {
143    pub cut_id: CutId,
144    pub cursor: StreamCursor,
145}
146
147/// Optional periodic checkpoint to bound replay time.
148///
149/// Checkpoints are full materialized state; deltas after `at_seq_no`
150/// replay on top.
151#[derive(Facet)]
152pub struct DiffCheckpoint {
153    pub stream_id: StreamId,
154    pub at_seq_no: SeqNo,
155    pub snapshot: Snapshot,
156}