lora_store/mutation.rs
1//! Mutation events and the optional recorder hook.
2//!
3//! [`MutationEvent`] is the vocabulary a write-ahead log (or any observer —
4//! replication, audit, change-data-capture) appends to a durable stream.
5//! The enum covers every method on [`GraphStorageMut`]: each event carries
6//! exactly the information needed to deterministically re-apply the mutation
7//! against an empty store (or a snapshot) and recover the same state.
8//!
9//! [`MutationRecorder`] is the observer trait. Backends that want to emit
10//! events install a recorder via [`InMemoryGraph::set_mutation_recorder`].
11//! The default is `None` so zero-WAL workloads pay only a null-pointer check
12//! per mutation — no allocation, no clone.
13//!
14//! The persistent WAL implementation lives in the `lora-wal` crate, which
15//! supplies a `WalRecorder` that implements `MutationRecorder` by
16//! appending each event to an on-disk log. The snapshot header's
17//! `wal_lsn` field is what makes the checkpoint hybrid expressible
18//! across crate boundaries without `lora-store` learning about the WAL.
19
20use serde::{Deserialize, Serialize};
21
22use crate::{NodeId, Properties, PropertyValue, RelationshipId};
23
24/// A durable, replayable mutation against a graph store.
25///
26/// Each variant mirrors a method on `GraphStorageMut`. Applying every event
27/// in order against a store initialised from the snapshot whose `wal_lsn`
28/// immediately precedes the first event reproduces the committed state.
29///
30/// The enum derives `Serialize`/`Deserialize` for non-WAL observers and
31/// tooling; the production WAL uses its own compact tagged codec.
32#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
33pub enum MutationEvent {
34 CreateNode {
35 /// Id the backend allocated for the new node. Captured so replay
36 /// against a clean store produces the same id assignment as the
37 /// original (`next_node_id` advances deterministically).
38 id: NodeId,
39 labels: Vec<String>,
40 properties: Properties,
41 },
42 CreateRelationship {
43 id: RelationshipId,
44 src: NodeId,
45 dst: NodeId,
46 rel_type: String,
47 properties: Properties,
48 },
49 SetNodeProperty {
50 node_id: NodeId,
51 key: String,
52 value: PropertyValue,
53 },
54 RemoveNodeProperty {
55 node_id: NodeId,
56 key: String,
57 },
58 AddNodeLabel {
59 node_id: NodeId,
60 label: String,
61 },
62 RemoveNodeLabel {
63 node_id: NodeId,
64 label: String,
65 },
66 SetRelationshipProperty {
67 rel_id: RelationshipId,
68 key: String,
69 value: PropertyValue,
70 },
71 RemoveRelationshipProperty {
72 rel_id: RelationshipId,
73 key: String,
74 },
75 DeleteRelationship {
76 rel_id: RelationshipId,
77 },
78 DeleteNode {
79 node_id: NodeId,
80 },
81 DetachDeleteNode {
82 node_id: NodeId,
83 },
84 Clear,
85}
86
87/// Observer that receives every successful mutation in the order the store
88/// applied it.
89///
90/// The recorder sees events *after* the mutation has been applied to the
91/// in-memory state, so it never observes a mutation that the store
92/// rejected (invalid id, empty relationship type, …). This matches the
93/// classic write-ahead-log convention of logging committed changes only.
94///
95/// Implementations must be `Send + Sync` so a shared recorder can be driven
96/// from any thread holding the store's write lock.
97pub trait MutationRecorder: Send + Sync + 'static {
98 fn record(&self, event: MutationEvent);
99
100 /// Sticky failure flag for durability-shaped recorders.
101 ///
102 /// `record` itself is infallible — non-WAL observers (audit taps,
103 /// replication shadows, CDC sinks) should not abort a write because
104 /// their downstream queue is full. Recorders that *do* care about
105 /// durability — most importantly the WAL adapter — flip a flag when
106 /// an append fails and surface it here. The host (typically
107 /// `Database::execute_with_params`) polls this once per critical
108 /// section while still holding the store write lock; if poisoned, the
109 /// query fails loudly and the caller observes the durability error
110 /// rather than a silently-lost write.
111 ///
112 /// The default returns `None`, so existing recorders compile
113 /// unchanged.
114 fn poisoned(&self) -> Option<String> {
115 None
116 }
117}
118
119/// Convenience adapter that turns any `Fn(MutationEvent) + Send + Sync`
120/// into a `MutationRecorder` — useful in tests and for quick wiring.
121pub struct ClosureRecorder<F>(pub F)
122where
123 F: Fn(MutationEvent) + Send + Sync + 'static;
124
125impl<F> MutationRecorder for ClosureRecorder<F>
126where
127 F: Fn(MutationEvent) + Send + Sync + 'static,
128{
129 fn record(&self, event: MutationEvent) {
130 (self.0)(event)
131 }
132}
133
134/// Set of record ids touched by a buffered [`MutationEvent`] stream.
135///
136/// Built incrementally as events buffer (or in one pass at commit
137/// time) by [`MutationWriteSet::extend_from_events`]. Used by the OCC
138/// auto-commit path to (a) sort lock-acquire on commit, (b) validate
139/// per-record Arc identity against the snapshot.
140#[derive(Debug, Default, Clone)]
141pub struct MutationWriteSet {
142 /// Nodes whose record was created, modified, or deleted.
143 pub nodes: std::collections::BTreeSet<NodeId>,
144 /// Relationships whose record was created, modified, or deleted.
145 pub rels: std::collections::BTreeSet<RelationshipId>,
146 /// `true` if the stream contained a `MutationEvent::Clear`. A
147 /// clear invalidates any per-record check — the writer must
148 /// fall back to a full-graph commit (or fail under OCC).
149 pub cleared: bool,
150}
151
152impl MutationWriteSet {
153 pub fn new() -> Self {
154 Self::default()
155 }
156
157 /// Walk a `MutationEvent` stream and accumulate every touched
158 /// record id. Variants that touch two records (e.g.
159 /// `CreateRelationship` mentions both `src` and `dst` plus the
160 /// new relationship) record both nodes — the writer's view of
161 /// those nodes' adjacency changed too.
162 pub fn extend_from_events<'a>(&mut self, events: impl IntoIterator<Item = &'a MutationEvent>) {
163 for event in events {
164 match event {
165 MutationEvent::CreateNode { id, .. } => {
166 self.nodes.insert(*id);
167 }
168 MutationEvent::CreateRelationship { id, src, dst, .. } => {
169 self.rels.insert(*id);
170 self.nodes.insert(*src);
171 self.nodes.insert(*dst);
172 }
173 MutationEvent::SetNodeProperty { node_id, .. }
174 | MutationEvent::RemoveNodeProperty { node_id, .. }
175 | MutationEvent::AddNodeLabel { node_id, .. }
176 | MutationEvent::RemoveNodeLabel { node_id, .. } => {
177 self.nodes.insert(*node_id);
178 }
179 MutationEvent::SetRelationshipProperty { rel_id, .. }
180 | MutationEvent::RemoveRelationshipProperty { rel_id, .. } => {
181 self.rels.insert(*rel_id);
182 }
183 MutationEvent::DeleteRelationship { rel_id } => {
184 self.rels.insert(*rel_id);
185 }
186 MutationEvent::DeleteNode { node_id } => {
187 self.nodes.insert(*node_id);
188 }
189 MutationEvent::DetachDeleteNode { node_id } => {
190 // Detach-delete also touches every incident
191 // relationship, but those fire as
192 // `DeleteRelationship` events of their own and
193 // the surrounding loop will pick them up.
194 self.nodes.insert(*node_id);
195 }
196 MutationEvent::Clear => {
197 self.cleared = true;
198 }
199 }
200 }
201 }
202
203 pub fn is_empty(&self) -> bool {
204 !self.cleared && self.nodes.is_empty() && self.rels.is_empty()
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// A mixed stream yields the union of every id the events name, with
    /// nodes and relationships tracked separately and `cleared` untouched.
    #[test]
    fn write_set_extracts_ids_from_events() {
        let events = [
            MutationEvent::CreateNode {
                id: 1,
                labels: vec!["A".into()],
                properties: Default::default(),
            },
            MutationEvent::CreateRelationship {
                id: 10,
                src: 1,
                dst: 2,
                rel_type: "R".into(),
                properties: Default::default(),
            },
            MutationEvent::SetNodeProperty {
                node_id: 3,
                key: "x".into(),
                value: PropertyValue::Int(5),
            },
            MutationEvent::DeleteRelationship { rel_id: 11 },
        ];

        let mut ws = MutationWriteSet::new();
        ws.extend_from_events(events.iter());

        // CreateRelationship pulls in src=1, dst=2 alongside its own rel id.
        assert_eq!(ws.nodes.iter().copied().collect::<Vec<_>>(), vec![1, 2, 3]);
        assert_eq!(ws.rels.iter().copied().collect::<Vec<_>>(), vec![10, 11]);
        assert!(!ws.cleared);
    }

    /// Once a `Clear` has been seen the flag stays raised, no matter what
    /// events are folded in afterwards.
    #[test]
    fn write_set_clear_event_is_sticky() {
        let mut ws = MutationWriteSet::new();
        // Baseline: a fresh set is empty, so the later `!is_empty()` check
        // is attributable to the clear alone.
        assert!(ws.is_empty());

        ws.extend_from_events([&MutationEvent::Clear]);
        assert!(ws.cleared);
        assert!(!ws.is_empty()); // cleared counts as non-empty

        // The actual stickiness check: a later, unrelated event is still
        // recorded but must not reset the flag.
        ws.extend_from_events([&MutationEvent::DeleteNode { node_id: 7 }]);
        assert!(ws.cleared);
        assert!(ws.nodes.contains(&7));
        assert!(!ws.is_empty());
    }
}
251}