// lora_store/mutation.rs

//! Mutation events and the optional recorder hook.
//!
//! [`MutationEvent`] is the vocabulary a write-ahead log (or any observer —
//! replication, audit, change-data-capture) appends to a durable stream.
//! The enum covers every method on [`GraphStorageMut`]: each event carries
//! exactly the information needed to deterministically re-apply the mutation
//! against an empty store (or a snapshot) and recover the same state.
//!
//! [`MutationRecorder`] is the observer trait. Backends that want to emit
//! events install a recorder via [`InMemoryGraph::set_mutation_recorder`].
//! The default is `None` so zero-WAL workloads pay only a null-pointer check
//! per mutation — no allocation, no clone.
//!
//! The persistent WAL implementation lives in the `lora-wal` crate, which
//! supplies a `WalRecorder` that implements `MutationRecorder` by
//! appending each event to an on-disk log. The snapshot header's
//! `wal_lsn` field is what makes the checkpoint hybrid expressible
//! across crate boundaries without `lora-store` learning about the WAL.

use std::collections::BTreeSet;

use serde::{Deserialize, Serialize};

use crate::memory::{ConstraintRequest, IndexRequest};
use crate::{NodeId, Properties, PropertyValue, RelationshipId};

25/// A durable, replayable mutation against a graph store.
26///
27/// Each variant mirrors a method on `GraphStorageMut`. Applying every event
28/// in order against a store initialised from the snapshot whose `wal_lsn`
29/// immediately precedes the first event reproduces the committed state.
30///
31/// The enum derives `Serialize`/`Deserialize` for non-WAL observers and
32/// tooling; the production WAL uses its own compact tagged codec.
33#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
34pub enum MutationEvent {
35    CreateNode {
36        /// Id the backend allocated for the new node. Captured so replay
37        /// against a clean store produces the same id assignment as the
38        /// original (`next_node_id` advances deterministically).
39        id: NodeId,
40        labels: Vec<String>,
41        properties: Properties,
42    },
43    CreateRelationship {
44        id: RelationshipId,
45        src: NodeId,
46        dst: NodeId,
47        rel_type: String,
48        properties: Properties,
49    },
50    SetNodeProperty {
51        node_id: NodeId,
52        key: String,
53        value: PropertyValue,
54    },
55    RemoveNodeProperty {
56        node_id: NodeId,
57        key: String,
58    },
59    AddNodeLabel {
60        node_id: NodeId,
61        label: String,
62    },
63    RemoveNodeLabel {
64        node_id: NodeId,
65        label: String,
66    },
67    SetRelationshipProperty {
68        rel_id: RelationshipId,
69        key: String,
70        value: PropertyValue,
71    },
72    RemoveRelationshipProperty {
73        rel_id: RelationshipId,
74        key: String,
75    },
76    DeleteRelationship {
77        rel_id: RelationshipId,
78    },
79    DeleteNode {
80        node_id: NodeId,
81    },
82    DetachDeleteNode {
83        node_id: NodeId,
84    },
85    Clear,
86    /// Catalog-level mutation: register an index in the catalog. Replay
87    /// re-applies via the same `register_index` path with the recorder
88    /// detached, so events do not duplicate themselves.
89    CreateIndex {
90        request: IndexRequest,
91        if_not_exists: bool,
92    },
93    /// Catalog-level mutation: drop an index by name.
94    DropIndex {
95        name: String,
96        if_exists: bool,
97    },
98    /// Catalog-level mutation: register a constraint in the catalog.
99    CreateConstraint {
100        request: ConstraintRequest,
101        if_not_exists: bool,
102    },
103    /// Catalog-level mutation: drop a constraint by name. The store
104    /// cascades to the backing index when the constraint owned one.
105    DropConstraint {
106        name: String,
107        if_exists: bool,
108    },
109}
110
111/// Observer that receives every successful mutation in the order the store
112/// applied it.
113///
114/// The recorder sees events *after* the mutation has been applied to the
115/// in-memory state, so it never observes a mutation that the store
116/// rejected (invalid id, empty relationship type, …). This matches the
117/// classic write-ahead-log convention of logging committed changes only.
118///
119/// Implementations must be `Send + Sync` so a shared recorder can be driven
120/// from any thread holding the store's write lock.
121pub trait MutationRecorder: Send + Sync + 'static {
122    fn record(&self, event: MutationEvent);
123
124    /// Sticky failure flag for durability-shaped recorders.
125    ///
126    /// `record` itself is infallible — non-WAL observers (audit taps,
127    /// replication shadows, CDC sinks) should not abort a write because
128    /// their downstream queue is full. Recorders that *do* care about
129    /// durability — most importantly the WAL adapter — flip a flag when
130    /// an append fails and surface it here. The host (typically
131    /// `Database::execute_with_params`) polls this once per critical
132    /// section while still holding the store write lock; if poisoned, the
133    /// query fails loudly and the caller observes the durability error
134    /// rather than a silently-lost write.
135    ///
136    /// The default returns `None`, so existing recorders compile
137    /// unchanged.
138    fn poisoned(&self) -> Option<String> {
139        None
140    }
141}
142
143/// Convenience adapter that turns any `Fn(MutationEvent) + Send + Sync`
144/// into a `MutationRecorder` — useful in tests and for quick wiring.
145pub struct ClosureRecorder<F>(pub F)
146where
147    F: Fn(MutationEvent) + Send + Sync + 'static;
148
149impl<F> MutationRecorder for ClosureRecorder<F>
150where
151    F: Fn(MutationEvent) + Send + Sync + 'static,
152{
153    fn record(&self, event: MutationEvent) {
154        (self.0)(event)
155    }
156}
157
158/// Set of record ids touched by a buffered [`MutationEvent`] stream.
159///
160/// Built incrementally as events buffer (or in one pass at commit
161/// time) by [`MutationWriteSet::extend_from_events`]. Used by the OCC
162/// auto-commit path to (a) sort lock-acquire on commit, (b) validate
163/// per-record Arc identity against the snapshot.
164#[derive(Debug, Default, Clone)]
165pub struct MutationWriteSet {
166    /// Nodes whose record was created, modified, or deleted.
167    pub nodes: std::collections::BTreeSet<NodeId>,
168    /// Relationships whose record was created, modified, or deleted.
169    pub rels: std::collections::BTreeSet<RelationshipId>,
170    /// `true` if the stream contained a `MutationEvent::Clear`. A
171    /// clear invalidates any per-record check — the writer must
172    /// fall back to a full-graph commit (or fail under OCC).
173    pub cleared: bool,
174}
175
176impl MutationWriteSet {
177    pub fn new() -> Self {
178        Self::default()
179    }
180
181    /// Walk a `MutationEvent` stream and accumulate every touched
182    /// record id. Variants that touch two records (e.g.
183    /// `CreateRelationship` mentions both `src` and `dst` plus the
184    /// new relationship) record both nodes — the writer's view of
185    /// those nodes' adjacency changed too.
186    pub fn extend_from_events<'a>(&mut self, events: impl IntoIterator<Item = &'a MutationEvent>) {
187        for event in events {
188            match event {
189                MutationEvent::CreateNode { id, .. } => {
190                    self.nodes.insert(*id);
191                }
192                MutationEvent::CreateRelationship { id, src, dst, .. } => {
193                    self.rels.insert(*id);
194                    self.nodes.insert(*src);
195                    self.nodes.insert(*dst);
196                }
197                MutationEvent::SetNodeProperty { node_id, .. }
198                | MutationEvent::RemoveNodeProperty { node_id, .. }
199                | MutationEvent::AddNodeLabel { node_id, .. }
200                | MutationEvent::RemoveNodeLabel { node_id, .. } => {
201                    self.nodes.insert(*node_id);
202                }
203                MutationEvent::SetRelationshipProperty { rel_id, .. }
204                | MutationEvent::RemoveRelationshipProperty { rel_id, .. } => {
205                    self.rels.insert(*rel_id);
206                }
207                MutationEvent::DeleteRelationship { rel_id } => {
208                    self.rels.insert(*rel_id);
209                }
210                MutationEvent::DeleteNode { node_id } => {
211                    self.nodes.insert(*node_id);
212                }
213                MutationEvent::DetachDeleteNode { node_id } => {
214                    // Detach-delete also touches every incident
215                    // relationship, but those fire as
216                    // `DeleteRelationship` events of their own and
217                    // the surrounding loop will pick them up.
218                    self.nodes.insert(*node_id);
219                }
220                MutationEvent::Clear => {
221                    self.cleared = true;
222                }
223                MutationEvent::CreateIndex { .. }
224                | MutationEvent::DropIndex { .. }
225                | MutationEvent::CreateConstraint { .. }
226                | MutationEvent::DropConstraint { .. } => {
227                    // Catalog mutations don't touch node/rel write
228                    // sets — they live next to the graph slabs but
229                    // don't share record locks. The single-writer
230                    // `writer` mutex on the database serialises them
231                    // against everything else.
232                }
233            }
234        }
235    }
236
237    pub fn is_empty(&self) -> bool {
238        !self.cleared && self.nodes.is_empty() && self.rels.is_empty()
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Every id mentioned by a node/relationship event lands in the
    /// matching set, in sorted order.
    #[test]
    fn write_set_extracts_ids_from_events() {
        let events = [
            MutationEvent::CreateNode {
                id: 1,
                labels: vec!["A".into()],
                properties: Default::default(),
            },
            MutationEvent::CreateRelationship {
                id: 10,
                src: 1,
                dst: 2,
                rel_type: "R".into(),
                properties: Default::default(),
            },
            MutationEvent::SetNodeProperty {
                node_id: 3,
                key: "x".into(),
                value: PropertyValue::Int(5),
            },
            MutationEvent::DeleteRelationship { rel_id: 11 },
        ];

        let mut ws = MutationWriteSet::new();
        ws.extend_from_events(events.iter());

        // CreateRelationship pulls in src=1, dst=2 alongside its own rel id.
        assert_eq!(ws.nodes.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
        assert_eq!(ws.rels.iter().copied().collect::<Vec<_>>(), vec![10, 11]);
        assert!(!ws.cleared);
    }

    /// Once a `Clear` has been seen, later events must not reset the flag.
    #[test]
    fn write_set_clear_event_is_sticky() {
        let mut ws = MutationWriteSet::new();
        ws.extend_from_events([&MutationEvent::Clear]);
        assert!(ws.cleared);
        assert!(!ws.is_empty()); // cleared counts as non-empty

        let later = [MutationEvent::DeleteNode { node_id: 7 }];
        ws.extend_from_events(&later);
        assert!(ws.cleared);
        assert!(ws.nodes.contains(&7));
    }

    /// Catalog-level events carry no record ids, so they leave the write
    /// set empty.
    #[test]
    fn write_set_ignores_catalog_events() {
        let events = [
            MutationEvent::DropIndex {
                name: "idx".into(),
                if_exists: true,
            },
            MutationEvent::DropConstraint {
                name: "uniq".into(),
                if_exists: true,
            },
        ];

        let mut ws = MutationWriteSet::new();
        assert!(ws.is_empty());
        ws.extend_from_events(&events);
        assert!(ws.is_empty());
        assert!(!ws.cleared);
    }
}