Skip to main content

lora_store/
mutation.rs

//! Mutation events and the optional recorder hook.
//!
//! [`MutationEvent`] is the vocabulary that a write-ahead log — or any other
//! observer, such as replication, audit, or change-data-capture — appends to
//! a durable stream. The enum covers every method on [`GraphStorageMut`]:
//! each event carries exactly the information needed to deterministically
//! re-apply the mutation against an empty store (or a snapshot) and recover
//! the same state.
//!
//! [`MutationRecorder`] is the observer trait. Backends that want to emit
//! events install a recorder via [`InMemoryGraph::set_mutation_recorder`].
//! No recorder is installed by default (`None`), so workloads without a WAL
//! pay only a null-pointer check per mutation — no allocation, no clone.
//!
//! The persistent WAL implementation lives in the `lora-wal` crate, which
//! supplies a `WalRecorder` that implements `MutationRecorder` by appending
//! each event to an on-disk log. The snapshot header's `wal_lsn` field is
//! what lets the hybrid checkpoint (a snapshot plus the WAL events that
//! follow it) be expressed across crate boundaries without `lora-store`
//! learning about the WAL.
19
20use serde::{Deserialize, Serialize};
21
22use crate::{NodeId, Properties, PropertyValue, RelationshipId};
23
24/// A durable, replayable mutation against a graph store.
25///
26/// Each variant mirrors a method on `GraphStorageMut`. Applying every event
27/// in order against a store initialised from the snapshot whose `wal_lsn`
28/// immediately precedes the first event reproduces the committed state.
29///
30/// The enum derives `Serialize`/`Deserialize` for non-WAL observers and
31/// tooling; the production WAL uses its own compact tagged codec.
32#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33pub enum MutationEvent {
34    CreateNode {
35        /// Id the backend allocated for the new node. Captured so replay
36        /// against a clean store produces the same id assignment as the
37        /// original (`next_node_id` advances deterministically).
38        id: NodeId,
39        labels: Vec<String>,
40        properties: Properties,
41    },
42    CreateRelationship {
43        id: RelationshipId,
44        src: NodeId,
45        dst: NodeId,
46        rel_type: String,
47        properties: Properties,
48    },
49    SetNodeProperty {
50        node_id: NodeId,
51        key: String,
52        value: PropertyValue,
53    },
54    RemoveNodeProperty {
55        node_id: NodeId,
56        key: String,
57    },
58    AddNodeLabel {
59        node_id: NodeId,
60        label: String,
61    },
62    RemoveNodeLabel {
63        node_id: NodeId,
64        label: String,
65    },
66    SetRelationshipProperty {
67        rel_id: RelationshipId,
68        key: String,
69        value: PropertyValue,
70    },
71    RemoveRelationshipProperty {
72        rel_id: RelationshipId,
73        key: String,
74    },
75    DeleteRelationship {
76        rel_id: RelationshipId,
77    },
78    DeleteNode {
79        node_id: NodeId,
80    },
81    DetachDeleteNode {
82        node_id: NodeId,
83    },
84    Clear,
85}
86
87/// Observer that receives every successful mutation in the order the store
88/// applied it.
89///
90/// The recorder sees events *after* the mutation has been applied to the
91/// in-memory state, so it never observes a mutation that the store
92/// rejected (invalid id, empty relationship type, …). This matches the
93/// classic write-ahead-log convention of logging committed changes only.
94///
95/// Implementations must be `Send + Sync` so a shared recorder can be driven
96/// from any thread holding the store's write lock.
97pub trait MutationRecorder: Send + Sync + 'static {
98    fn record(&self, event: MutationEvent);
99
100    /// Sticky failure flag for durability-shaped recorders.
101    ///
102    /// `record` itself is infallible — non-WAL observers (audit taps,
103    /// replication shadows, CDC sinks) should not abort a write because
104    /// their downstream queue is full. Recorders that *do* care about
105    /// durability — most importantly the WAL adapter — flip a flag when
106    /// an append fails and surface it here. The host (typically
107    /// `Database::execute_with_params`) polls this once per critical
108    /// section while still holding the store write lock; if poisoned, the
109    /// query fails loudly and the caller observes the durability error
110    /// rather than a silently-lost write.
111    ///
112    /// The default returns `None`, so existing recorders compile
113    /// unchanged.
114    fn poisoned(&self) -> Option<String> {
115        None
116    }
117}
118
119/// Convenience adapter that turns any `Fn(MutationEvent) + Send + Sync`
120/// into a `MutationRecorder` — useful in tests and for quick wiring.
121pub struct ClosureRecorder<F>(pub F)
122where
123    F: Fn(MutationEvent) + Send + Sync + 'static;
124
125impl<F> MutationRecorder for ClosureRecorder<F>
126where
127    F: Fn(MutationEvent) + Send + Sync + 'static,
128{
129    fn record(&self, event: MutationEvent) {
130        (self.0)(event)
131    }
132}
133
134/// Set of record ids touched by a buffered [`MutationEvent`] stream.
135///
136/// Built incrementally as events buffer (or in one pass at commit
137/// time) by [`MutationWriteSet::extend_from_events`]. Used by the OCC
138/// auto-commit path to (a) sort lock-acquire on commit, (b) validate
139/// per-record Arc identity against the snapshot.
140#[derive(Debug, Default, Clone)]
141pub struct MutationWriteSet {
142    /// Nodes whose record was created, modified, or deleted.
143    pub nodes: std::collections::BTreeSet<NodeId>,
144    /// Relationships whose record was created, modified, or deleted.
145    pub rels: std::collections::BTreeSet<RelationshipId>,
146    /// `true` if the stream contained a `MutationEvent::Clear`. A
147    /// clear invalidates any per-record check — the writer must
148    /// fall back to a full-graph commit (or fail under OCC).
149    pub cleared: bool,
150}
151
152impl MutationWriteSet {
153    pub fn new() -> Self {
154        Self::default()
155    }
156
157    /// Walk a `MutationEvent` stream and accumulate every touched
158    /// record id. Variants that touch two records (e.g.
159    /// `CreateRelationship` mentions both `src` and `dst` plus the
160    /// new relationship) record both nodes — the writer's view of
161    /// those nodes' adjacency changed too.
162    pub fn extend_from_events<'a>(&mut self, events: impl IntoIterator<Item = &'a MutationEvent>) {
163        for event in events {
164            match event {
165                MutationEvent::CreateNode { id, .. } => {
166                    self.nodes.insert(*id);
167                }
168                MutationEvent::CreateRelationship { id, src, dst, .. } => {
169                    self.rels.insert(*id);
170                    self.nodes.insert(*src);
171                    self.nodes.insert(*dst);
172                }
173                MutationEvent::SetNodeProperty { node_id, .. }
174                | MutationEvent::RemoveNodeProperty { node_id, .. }
175                | MutationEvent::AddNodeLabel { node_id, .. }
176                | MutationEvent::RemoveNodeLabel { node_id, .. } => {
177                    self.nodes.insert(*node_id);
178                }
179                MutationEvent::SetRelationshipProperty { rel_id, .. }
180                | MutationEvent::RemoveRelationshipProperty { rel_id, .. } => {
181                    self.rels.insert(*rel_id);
182                }
183                MutationEvent::DeleteRelationship { rel_id } => {
184                    self.rels.insert(*rel_id);
185                }
186                MutationEvent::DeleteNode { node_id } => {
187                    self.nodes.insert(*node_id);
188                }
189                MutationEvent::DetachDeleteNode { node_id } => {
190                    // Detach-delete also touches every incident
191                    // relationship, but those fire as
192                    // `DeleteRelationship` events of their own and
193                    // the surrounding loop will pick them up.
194                    self.nodes.insert(*node_id);
195                }
196                MutationEvent::Clear => {
197                    self.cleared = true;
198                }
199            }
200        }
201    }
202
203    pub fn is_empty(&self) -> bool {
204        !self.cleared && self.nodes.is_empty() && self.rels.is_empty()
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// A mixed event stream lands each id in the right set, and the write
    /// set goes from empty to non-empty without touching `cleared`.
    #[test]
    fn write_set_extracts_ids_from_events() {
        let events = [
            MutationEvent::CreateNode {
                id: 1,
                labels: vec!["A".into()],
                properties: Default::default(),
            },
            MutationEvent::CreateRelationship {
                id: 10,
                src: 1,
                dst: 2,
                rel_type: "R".into(),
                properties: Default::default(),
            },
            MutationEvent::SetNodeProperty {
                node_id: 3,
                key: "x".into(),
                value: PropertyValue::Int(5),
            },
            MutationEvent::DeleteRelationship { rel_id: 11 },
        ];

        let mut ws = MutationWriteSet::new();
        assert!(ws.is_empty());
        ws.extend_from_events(&events);

        // CreateRelationship pulls in src=1, dst=2 alongside its own rel id.
        assert_eq!(ws.nodes.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
        assert_eq!(ws.rels.iter().copied().collect::<Vec<_>>(), vec![10, 11]);
        assert!(!ws.cleared);
        assert!(!ws.is_empty());
    }

    /// Once a `Clear` has been seen the flag stays set, even when later
    /// events are folded in.
    #[test]
    fn write_set_clear_event_is_sticky() {
        let mut ws = MutationWriteSet::new();
        ws.extend_from_events([&MutationEvent::Clear]);
        assert!(ws.cleared);
        assert!(!ws.is_empty()); // cleared counts as non-empty

        // A later non-clear event must record its id without resetting the
        // flag; this is the "sticky" part the test name promises.
        ws.extend_from_events([&MutationEvent::DeleteNode { node_id: 7 }]);
        assert!(ws.cleared);
        assert_eq!(ws.nodes.iter().copied().collect::<Vec<_>>(), vec![7]);
    }
}