lora_store/mutation.rs
1//! Mutation events and the optional recorder hook.
2//!
3//! [`MutationEvent`] is the vocabulary a write-ahead log (or any observer —
4//! replication, audit, change-data-capture) appends to a durable stream.
5//! The enum covers every method on [`GraphStorageMut`]: each event carries
6//! exactly the information needed to deterministically re-apply the mutation
7//! against an empty store (or a snapshot) and recover the same state.
8//!
9//! [`MutationRecorder`] is the observer trait. Backends that want to emit
10//! events install a recorder via [`InMemoryGraph::set_mutation_recorder`].
11//! The default is `None` so zero-WAL workloads pay only a null-pointer check
12//! per mutation — no allocation, no clone.
13//!
14//! The persistent WAL implementation lives in the `lora-wal` crate, which
15//! supplies a `WalRecorder` that implements `MutationRecorder` by
16//! appending each event to an on-disk log. The snapshot header's
17//! `wal_lsn` field is what makes the checkpoint hybrid expressible
18//! across crate boundaries without `lora-store` learning about the WAL.
19
20use serde::{Deserialize, Serialize};
21
22use crate::memory::{ConstraintRequest, IndexRequest};
23use crate::{NodeId, Properties, PropertyValue, RelationshipId};
24
25/// A durable, replayable mutation against a graph store.
26///
27/// Each variant mirrors a method on `GraphStorageMut`. Applying every event
28/// in order against a store initialised from the snapshot whose `wal_lsn`
29/// immediately precedes the first event reproduces the committed state.
30///
31/// The enum derives `Serialize`/`Deserialize` for non-WAL observers and
32/// tooling; the production WAL uses its own compact tagged codec.
33#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
34pub enum MutationEvent {
35 CreateNode {
36 /// Id the backend allocated for the new node. Captured so replay
37 /// against a clean store produces the same id assignment as the
38 /// original (`next_node_id` advances deterministically).
39 id: NodeId,
40 labels: Vec<String>,
41 properties: Properties,
42 },
43 CreateRelationship {
44 id: RelationshipId,
45 src: NodeId,
46 dst: NodeId,
47 rel_type: String,
48 properties: Properties,
49 },
50 SetNodeProperty {
51 node_id: NodeId,
52 key: String,
53 value: PropertyValue,
54 },
55 RemoveNodeProperty {
56 node_id: NodeId,
57 key: String,
58 },
59 AddNodeLabel {
60 node_id: NodeId,
61 label: String,
62 },
63 RemoveNodeLabel {
64 node_id: NodeId,
65 label: String,
66 },
67 SetRelationshipProperty {
68 rel_id: RelationshipId,
69 key: String,
70 value: PropertyValue,
71 },
72 RemoveRelationshipProperty {
73 rel_id: RelationshipId,
74 key: String,
75 },
76 DeleteRelationship {
77 rel_id: RelationshipId,
78 },
79 DeleteNode {
80 node_id: NodeId,
81 },
82 DetachDeleteNode {
83 node_id: NodeId,
84 },
85 Clear,
86 /// Catalog-level mutation: register an index in the catalog. Replay
87 /// re-applies via the same `register_index` path with the recorder
88 /// detached, so events do not duplicate themselves.
89 CreateIndex {
90 request: IndexRequest,
91 if_not_exists: bool,
92 },
93 /// Catalog-level mutation: drop an index by name.
94 DropIndex {
95 name: String,
96 if_exists: bool,
97 },
98 /// Catalog-level mutation: register a constraint in the catalog.
99 CreateConstraint {
100 request: ConstraintRequest,
101 if_not_exists: bool,
102 },
103 /// Catalog-level mutation: drop a constraint by name. The store
104 /// cascades to the backing index when the constraint owned one.
105 DropConstraint {
106 name: String,
107 if_exists: bool,
108 },
109}
110
111/// Observer that receives every successful mutation in the order the store
112/// applied it.
113///
114/// The recorder sees events *after* the mutation has been applied to the
115/// in-memory state, so it never observes a mutation that the store
116/// rejected (invalid id, empty relationship type, …). This matches the
117/// classic write-ahead-log convention of logging committed changes only.
118///
119/// Implementations must be `Send + Sync` so a shared recorder can be driven
120/// from any thread holding the store's write lock.
121pub trait MutationRecorder: Send + Sync + 'static {
122 fn record(&self, event: MutationEvent);
123
124 /// Sticky failure flag for durability-shaped recorders.
125 ///
126 /// `record` itself is infallible — non-WAL observers (audit taps,
127 /// replication shadows, CDC sinks) should not abort a write because
128 /// their downstream queue is full. Recorders that *do* care about
129 /// durability — most importantly the WAL adapter — flip a flag when
130 /// an append fails and surface it here. The host (typically
131 /// `Database::execute_with_params`) polls this once per critical
132 /// section while still holding the store write lock; if poisoned, the
133 /// query fails loudly and the caller observes the durability error
134 /// rather than a silently-lost write.
135 ///
136 /// The default returns `None`, so existing recorders compile
137 /// unchanged.
138 fn poisoned(&self) -> Option<String> {
139 None
140 }
141}
142
143/// Convenience adapter that turns any `Fn(MutationEvent) + Send + Sync`
144/// into a `MutationRecorder` — useful in tests and for quick wiring.
145pub struct ClosureRecorder<F>(pub F)
146where
147 F: Fn(MutationEvent) + Send + Sync + 'static;
148
149impl<F> MutationRecorder for ClosureRecorder<F>
150where
151 F: Fn(MutationEvent) + Send + Sync + 'static,
152{
153 fn record(&self, event: MutationEvent) {
154 (self.0)(event)
155 }
156}
157
158/// Set of record ids touched by a buffered [`MutationEvent`] stream.
159///
160/// Built incrementally as events buffer (or in one pass at commit
161/// time) by [`MutationWriteSet::extend_from_events`]. Used by the OCC
162/// auto-commit path to (a) sort lock-acquire on commit, (b) validate
163/// per-record Arc identity against the snapshot.
164#[derive(Debug, Default, Clone)]
165pub struct MutationWriteSet {
166 /// Nodes whose record was created, modified, or deleted.
167 pub nodes: std::collections::BTreeSet<NodeId>,
168 /// Relationships whose record was created, modified, or deleted.
169 pub rels: std::collections::BTreeSet<RelationshipId>,
170 /// `true` if the stream contained a `MutationEvent::Clear`. A
171 /// clear invalidates any per-record check — the writer must
172 /// fall back to a full-graph commit (or fail under OCC).
173 pub cleared: bool,
174}
175
176impl MutationWriteSet {
177 pub fn new() -> Self {
178 Self::default()
179 }
180
181 /// Walk a `MutationEvent` stream and accumulate every touched
182 /// record id. Variants that touch two records (e.g.
183 /// `CreateRelationship` mentions both `src` and `dst` plus the
184 /// new relationship) record both nodes — the writer's view of
185 /// those nodes' adjacency changed too.
186 pub fn extend_from_events<'a>(&mut self, events: impl IntoIterator<Item = &'a MutationEvent>) {
187 for event in events {
188 match event {
189 MutationEvent::CreateNode { id, .. } => {
190 self.nodes.insert(*id);
191 }
192 MutationEvent::CreateRelationship { id, src, dst, .. } => {
193 self.rels.insert(*id);
194 self.nodes.insert(*src);
195 self.nodes.insert(*dst);
196 }
197 MutationEvent::SetNodeProperty { node_id, .. }
198 | MutationEvent::RemoveNodeProperty { node_id, .. }
199 | MutationEvent::AddNodeLabel { node_id, .. }
200 | MutationEvent::RemoveNodeLabel { node_id, .. } => {
201 self.nodes.insert(*node_id);
202 }
203 MutationEvent::SetRelationshipProperty { rel_id, .. }
204 | MutationEvent::RemoveRelationshipProperty { rel_id, .. } => {
205 self.rels.insert(*rel_id);
206 }
207 MutationEvent::DeleteRelationship { rel_id } => {
208 self.rels.insert(*rel_id);
209 }
210 MutationEvent::DeleteNode { node_id } => {
211 self.nodes.insert(*node_id);
212 }
213 MutationEvent::DetachDeleteNode { node_id } => {
214 // Detach-delete also touches every incident
215 // relationship, but those fire as
216 // `DeleteRelationship` events of their own and
217 // the surrounding loop will pick them up.
218 self.nodes.insert(*node_id);
219 }
220 MutationEvent::Clear => {
221 self.cleared = true;
222 }
223 MutationEvent::CreateIndex { .. }
224 | MutationEvent::DropIndex { .. }
225 | MutationEvent::CreateConstraint { .. }
226 | MutationEvent::DropConstraint { .. } => {
227 // Catalog mutations don't touch node/rel write
228 // sets — they live next to the graph slabs but
229 // don't share record locks. The single-writer
230 // `writer` mutex on the database serialises them
231 // against everything else.
232 }
233 }
234 }
235 }
236
237 pub fn is_empty(&self) -> bool {
238 !self.cleared && self.nodes.is_empty() && self.rels.is_empty()
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// A mixed stream must yield every node and relationship id it names,
    /// without setting the `cleared` flag.
    #[test]
    fn write_set_extracts_ids_from_events() {
        let create_node = MutationEvent::CreateNode {
            id: 1,
            labels: vec!["A".into()],
            properties: Default::default(),
        };
        let create_rel = MutationEvent::CreateRelationship {
            id: 10,
            src: 1,
            dst: 2,
            rel_type: "R".into(),
            properties: Default::default(),
        };
        let set_prop = MutationEvent::SetNodeProperty {
            node_id: 3,
            key: "x".into(),
            value: PropertyValue::Int(5),
        };
        let delete_rel = MutationEvent::DeleteRelationship { rel_id: 11 };

        let mut ws = MutationWriteSet::new();
        ws.extend_from_events(&[create_node, create_rel, set_prop, delete_rel]);

        assert!(!ws.cleared);

        // Node 2 appears only as the `dst` of the created relationship.
        let nodes: Vec<_> = ws.nodes.into_iter().collect();
        assert_eq!(nodes, vec![1, 2, 3]);

        let rels: Vec<_> = ws.rels.into_iter().collect();
        assert_eq!(rels, vec![10, 11]);
    }

    /// A lone `Clear` sets the flag, and a cleared write set is reported
    /// as non-empty even though it holds no ids.
    #[test]
    fn write_set_clear_event_is_sticky() {
        let mut ws = MutationWriteSet::new();
        ws.extend_from_events(std::iter::once(&MutationEvent::Clear));

        assert!(ws.cleared);
        assert!(!ws.is_empty());
    }
}
285}