// lora_wal/recorder_adapter.rs
1//! Bridge between [`lora_store::MutationRecorder`] (the storage-side
2//! observer hook) and [`crate::Wal`] (the durable log handle).
3//!
4//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
5//!
6//! 1. Acquire the engine mutex.
7//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
8//!    appends nothing to the WAL yet. A pure read query that fires
9//!    no `MutationEvent` therefore touches the WAL zero times: no
10//!    `TxBegin`, no `TxCommit`, no `flush`, no `fsync`.
11//! 3. Run analyze + compile + execute. The executor mutates the
12//!    in-memory store, which fires `MutationRecorder::record` for each
13//!    primitive mutation. On the *first* such call the adapter lazily
14//!    issues `Wal::begin()` and from then on forwards every event to
15//!    `Wal::append(active_tx, event)`.
16//! 4. On Ok: `recorder.commit()` writes a `TxCommit` and the host
17//!    runs `recorder.flush()` (per the configured `SyncMode`) **only**
18//!    when `commit()` returned `WroteCommit::Yes`. A read-only query
19//!    returns `WroteCommit::No` and the host skips the flush entirely.
20//! 5. On Err / panic: `recorder.abort()`. If a `TxBegin` was lazily
21//!    issued, replay drops every mutation between begin and abort,
22//!    restoring per-query atomicity even though the engine has no
23//!    rollback. If no `TxBegin` was issued (read-only query that
24//!    errored out), abort is a no-op on the WAL.
25//! 6. Before returning, the host inspects `recorder.poisoned()` once.
26//!    If `Some`, the query fails loudly with a durability error so
27//!    the caller can act on it; the WAL is now refusing further
28//!    appends until the operator restarts the database, which
29//!    recovers from the last consistent snapshot + WAL.
30//!
31//! ### Hot-path cost
32//!
33//! `record` is called once per primitive mutation. The adapter holds
34//! exactly one `Mutex<RecorderState>` and one `Arc<Wal>`; both are
35//! uncontested in production because the engine mutex serialises us.
36//! The cost is two atomic-ish lock acquisitions plus one `Wal::append`
37//! (which itself takes one more mutex internally).
38//!
39//! ### When `record` fires after a failed in-memory mutation
40//!
41//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
42//! has been committed to the in-memory state. If the subsequent WAL
43//! append fails, the live in-memory store is briefly ahead of disk:
44//! the next query sees the partial state, but the next query also
45//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
46//! snapshot + WAL after operator restart will not include the failed
47//! mutation, so durable state stays consistent. The cost is "the live
48//! process is wrong until the next restart"; the gain is that the
49//! storage trait does not need to learn about durability.
50
51use std::sync::{Arc, Mutex};
52
53use lora_store::{MutationEvent, MutationRecorder};
54
55use crate::error::WalError;
56use crate::lsn::Lsn;
57use crate::wal::Wal;
58
/// Whether [`WalRecorder::commit`] actually wrote a `TxCommit` to the
/// log. Read-only queries — those that never trigger
/// `MutationRecorder::record` — return [`WroteCommit::No`] so the host
/// can skip the surrounding `flush()` and avoid a per-query `fsync`
/// just to record an empty transaction.
///
/// Marked `#[must_use]`: this value is the host's only signal for
/// whether a `flush()` is required, so silently dropping it would
/// weaken the durability contract without any compiler push-back.
#[must_use = "inspect WroteCommit to decide whether the host must flush()"]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WroteCommit {
    /// A `TxBegin` had been lazily allocated and was paired with a
    /// matching `TxCommit`. Caller should `flush()` (under PerCommit).
    Yes,
    /// No mutation events fired during the query, so neither `TxBegin`
    /// nor `TxCommit` was appended. Caller can skip `flush()` entirely.
    No,
}
73
#[derive(Default)]
struct RecorderState {
    /// True between `arm()` and the matching `commit()` / `abort()`.
    /// Marks the host's critical section without committing the WAL
    /// to a transaction yet — the actual `Wal::begin` happens lazily
    /// on the first mutation event.
    armed: bool,
    /// LSN of the currently-open WAL transaction, if any. Set by the
    /// first `record()` of an armed query and cleared (`take`n) by
    /// `commit` / `abort`.
    active_tx: Option<Lsn>,
    /// Sticky failure flag. Once set, [`MutationRecorder::record`]
    /// becomes a no-op (we cannot append safely) and `poisoned`
    /// surfaces the message until the operator restarts the process.
    poisoned: Option<String>,
}
90
/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
///
/// Shared between the host database and the store (the tests wrap it
/// in an `Arc`); all mutable bookkeeping hides behind `state`.
pub struct WalRecorder {
    /// Durable log handle; also exposed raw via [`WalRecorder::wal`]
    /// for admin paths that bypass the transaction state machine.
    wal: Arc<Wal>,
    /// Armed / active-tx / poisoned state machine. A plain `Mutex` is
    /// enough: the engine mutex serialises queries, so this lock is
    /// uncontested in production (see module docs).
    state: Mutex<RecorderState>,
}
97
98impl WalRecorder {
99    pub fn new(wal: Arc<Wal>) -> Self {
100        Self {
101            wal,
102            state: Mutex::new(RecorderState::default()),
103        }
104    }
105
106    /// Underlying log handle. Exposed so admin paths
107    /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
108    /// directly without going through the recorder's transaction
109    /// state machine.
110    pub fn wal(&self) -> &Arc<Wal> {
111        &self.wal
112    }
113
114    /// Mark the recorder as inside a query critical section. No WAL
115    /// I/O happens here — `Wal::begin` is deferred until the first
116    /// mutation event fires. A pure read query that never produces a
117    /// `MutationEvent` therefore costs the WAL nothing: no record
118    /// allocation, no buffer drain, no `fsync`.
119    ///
120    /// Errors with [`WalError::Poisoned`] if a prior failure has
121    /// poisoned the recorder, or if the host is double-arming
122    /// (`arm` already in effect).
123    pub fn arm(&self) -> Result<(), WalError> {
124        let mut state = self.state.lock().unwrap();
125        if state.poisoned.is_some() {
126            return Err(WalError::Poisoned);
127        }
128        if state.armed {
129            state.poisoned = Some("WalRecorder::arm called while already armed".into());
130            return Err(WalError::Poisoned);
131        }
132        state.armed = true;
133        Ok(())
134    }
135
136    /// Append a `TxCommit` for the active transaction (if any) and
137    /// clear the armed/active state.
138    ///
139    /// Returns:
140    /// - [`WroteCommit::Yes`] when a lazy `TxBegin` had been issued
141    ///   and a matching `TxCommit` was now appended. The host should
142    ///   `flush()` next under `SyncMode::PerCommit`.
143    /// - [`WroteCommit::No`] when no mutations fired during the query
144    ///   and no records were written. The host should skip `flush()`.
145    pub fn commit(&self) -> Result<WroteCommit, WalError> {
146        let mut state = self.state.lock().unwrap();
147        if state.poisoned.is_some() {
148            return Err(WalError::Poisoned);
149        }
150        if !state.armed {
151            state.poisoned = Some("WalRecorder::commit called without an armed query".into());
152            return Err(WalError::Poisoned);
153        }
154        state.armed = false;
155        match state.active_tx.take() {
156            Some(tx) => {
157                self.wal.commit(tx).inspect_err(|e| {
158                    state.poisoned = Some(e.to_string());
159                })?;
160                Ok(WroteCommit::Yes)
161            }
162            None => Ok(WroteCommit::No),
163        }
164    }
165
166    /// Append a `TxAbort` for the active transaction (if any) and
167    /// clear the armed/active state. Returns `Ok(true)` when an abort
168    /// record was actually written, `Ok(false)` when the query never
169    /// got far enough to issue a `TxBegin`.
170    pub fn abort(&self) -> Result<bool, WalError> {
171        let mut state = self.state.lock().unwrap();
172        if state.poisoned.is_some() {
173            return Err(WalError::Poisoned);
174        }
175        // Tolerate "abort without arm" — the host calls abort in
176        // unwind paths and we'd rather no-op than poison.
177        state.armed = false;
178        match state.active_tx.take() {
179            Some(tx) => {
180                self.wal.abort(tx).inspect_err(|e| {
181                    state.poisoned = Some(e.to_string());
182                })?;
183                Ok(true)
184            }
185            None => Ok(false),
186        }
187    }
188
189    /// Flush the WAL — write the pending buffer to the OS and
190    /// (under `SyncMode::PerCommit`) `fsync`.
191    pub fn flush(&self) -> Result<(), WalError> {
192        let mut state = self.state.lock().unwrap();
193        if state.poisoned.is_some() {
194            return Err(WalError::Poisoned);
195        }
196        self.wal.flush().inspect_err(|e| {
197            state.poisoned = Some(e.to_string());
198        })
199    }
200
201    /// Force the underlying WAL to write, `fsync`, and advance its
202    /// durable fence regardless of the configured sync mode. Admin
203    /// paths use this when they need a durability point immediately.
204    pub fn force_fsync(&self) -> Result<(), WalError> {
205        let mut state = self.state.lock().unwrap();
206        if state.poisoned.is_some() {
207            return Err(WalError::Poisoned);
208        }
209        self.wal.force_fsync().inspect_err(|e| {
210            state.poisoned = Some(e.to_string());
211        })
212    }
213
214    /// Append a `Checkpoint` marker. Used by the checkpoint admin
215    /// path after a successful snapshot rename — the marker doubles
216    /// as the log-side fence the next replay will trust.
217    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
218        let mut state = self.state.lock().unwrap();
219        if state.poisoned.is_some() {
220            return Err(WalError::Poisoned);
221        }
222        self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
223            state.poisoned = Some(e.to_string());
224        })
225    }
226
227    /// Drop sealed segments at or below `fence_lsn`. Forwards to
228    /// [`Wal::truncate_up_to`].
229    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
230        self.wal.truncate_up_to(fence_lsn)
231    }
232
233    /// True iff the recorder has already failed an append, **or** the
234    /// background flusher has latched a failure. Cheap to poll under
235    /// the engine mutex.
236    pub fn is_poisoned(&self) -> bool {
237        if self.state.lock().unwrap().poisoned.is_some() {
238            return true;
239        }
240        self.wal.bg_failure().is_some()
241    }
242
243    /// Quarantine the recorder after the host detects that the live
244    /// in-memory graph may no longer match durable state. Once poisoned,
245    /// future query arms fail until the database is restarted from a
246    /// snapshot + WAL.
247    pub fn poison(&self, reason: impl Into<String>) {
248        let mut state = self.state.lock().unwrap();
249        state.poisoned.get_or_insert_with(|| reason.into());
250        state.active_tx = None;
251        state.armed = false;
252    }
253
254    /// Test helper: clear the poisoned flag and reset the active
255    /// transaction. Production code should not call this — once the
256    /// WAL is poisoned the right move is to fail loudly and let the
257    /// operator restart from the last snapshot + WAL.
258    #[doc(hidden)]
259    pub fn clear_poisoned_for_tests(&self) {
260        let mut state = self.state.lock().unwrap();
261        state.poisoned = None;
262        state.active_tx = None;
263        state.armed = false;
264    }
265}
266
267impl MutationRecorder for WalRecorder {
268    fn record(&self, event: &MutationEvent) {
269        // Hold the lock for the whole record path: we may need to
270        // lazily issue `Wal::begin` on the first event and then
271        // append, both under the same critical section so a racing
272        // commit can't observe a half-initialised state. The engine
273        // mutex already serialises queries, so this lock is
274        // uncontested.
275        let mut state = self.state.lock().unwrap();
276        if state.poisoned.is_some() {
277            return;
278        }
279        if !state.armed {
280            state.poisoned.get_or_insert_with(|| {
281                "MutationRecorder::record fired outside an armed query".into()
282            });
283            return;
284        }
285        let tx = match state.active_tx {
286            Some(lsn) => lsn,
287            None => match self.wal.begin() {
288                Ok(lsn) => {
289                    state.active_tx = Some(lsn);
290                    lsn
291                }
292                Err(e) => {
293                    state.poisoned.get_or_insert_with(|| e.to_string());
294                    return;
295                }
296            },
297        };
298        if let Err(e) = self.wal.append(tx, event) {
299            state.poisoned.get_or_insert_with(|| e.to_string());
300        }
301    }
302
303    fn poisoned(&self) -> Option<String> {
304        // Surface a latched bg-flusher failure too — the recorder is
305        // the host's single point of contact for "is the WAL still
306        // safe to commit through?".
307        let state = self.state.lock().unwrap();
308        if let Some(msg) = state.poisoned.clone() {
309            return Some(msg);
310        }
311        self.wal.bg_failure()
312    }
313}
314
315#[cfg(test)]
316mod tests {
317    use super::*;
318    use std::sync::Arc;
319
320    use lora_store::{GraphStorageMut, InMemoryGraph, MutationEvent, Properties, PropertyValue};
321
322    use crate::config::SyncMode;
323    use crate::testing::TmpDir;
324    use crate::Wal;
325
    /// Open (or create) a WAL in `dir` with per-commit sync and an
    /// 8 MiB size argument (presumably the segment cap — see
    /// `Wal::open`), asserting there is no history to replay yet.
    fn open_wal(dir: &std::path::Path) -> Arc<Wal> {
        let (wal, replay) =
            Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert!(replay.is_empty());
        wal
    }
332
    #[test]
    fn record_outside_arm_poisons() {
        let dir = TmpDir::new("no-arm");
        let recorder = WalRecorder::new(open_wal(&dir.path));
        // Firing a mutation event with no surrounding arm() is a host
        // bug: the recorder must latch a poison message, not append.
        recorder.record(&MutationEvent::Clear);
        assert!(recorder.is_poisoned());
        let msg = recorder.poisoned().unwrap();
        assert!(msg.contains("outside an armed query"));
    }
342
    #[test]
    fn arm_record_commit_round_trip_via_in_memory_graph() {
        // Happy path: two node creations inside one armed query must
        // replay as exactly two events after commit + flush.
        let dir = TmpDir::new("happy");
        let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));

        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(recorder.clone()));

        recorder.arm().unwrap();
        let mut props = Properties::new();
        props.insert("v".into(), PropertyValue::Int(1));
        g.create_node(vec!["N".into()], props);
        let mut props2 = Properties::new();
        props2.insert("v".into(), PropertyValue::Int(2));
        g.create_node(vec!["N".into()], props2);
        let outcome = recorder.commit().unwrap();
        assert_eq!(outcome, WroteCommit::Yes);
        recorder.flush().unwrap();

        assert!(!recorder.is_poisoned());

        // Drop every recorder clone before re-opening the directory,
        // otherwise we'd race with our own live WAL handle.
        g.set_mutation_recorder(None);
        drop(recorder);

        let (_wal, events) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], MutationEvent::CreateNode { id: 0, .. }));
        assert!(matches!(events[1], MutationEvent::CreateNode { id: 1, .. }));
    }
375
    #[test]
    fn arm_then_commit_with_no_mutations_writes_nothing() {
        let dir = TmpDir::new("ro");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        // Simulate a read-only query: arm + commit without any
        // intervening `record` calls.
        let next_before = recorder.wal().next_lsn();
        recorder.arm().unwrap();
        let outcome = recorder.commit().unwrap();
        assert_eq!(outcome, WroteCommit::No);
        // Comparing next_lsn before/after proves no TxBegin/TxCommit
        // records were allocated at all.
        let next_after = recorder.wal().next_lsn();
        assert_eq!(
            next_before, next_after,
            "read-only commit must not allocate any LSNs"
        );
    }
393
    #[test]
    fn abort_drops_in_flight_events_on_replay() {
        let dir = TmpDir::new("abort");
        let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));

        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(recorder.clone()));

        // Tx 1 commits.
        recorder.arm().unwrap();
        g.create_node(vec!["A".into()], Properties::new());
        let _ = recorder.commit().unwrap();
        recorder.flush().unwrap();

        // Tx 2 aborts: the in-memory mutation already happened (the
        // engine has no rollback) but the WAL marks it aborted, so
        // recovery from a fresh process must skip it.
        recorder.arm().unwrap();
        g.create_node(vec!["B".into()], Properties::new());
        let aborted = recorder.abort().unwrap();
        assert!(aborted, "abort with active tx should write a TxAbort");
        recorder.flush().unwrap();

        g.set_mutation_recorder(None);
        drop(recorder);

        // Replay must surface only the committed tx (node "A").
        let (_wal, events) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert_eq!(events.len(), 1);
        if let MutationEvent::CreateNode { labels, .. } = &events[0] {
            assert_eq!(labels, &vec!["A".to_string()]);
        } else {
            panic!("expected CreateNode for label A, got {:?}", events[0]);
        }
    }
429
    #[test]
    fn arm_while_armed_poisons() {
        let dir = TmpDir::new("double-arm");
        let recorder = WalRecorder::new(open_wal(&dir.path));
        recorder.arm().unwrap();
        // A second arm without commit/abort must both fail and latch
        // the poison flag for all later calls.
        let err = recorder.arm().unwrap_err();
        assert!(matches!(err, WalError::Poisoned));
        assert!(recorder.is_poisoned());
    }
439
    #[test]
    fn poisoned_recorder_swallows_subsequent_records() {
        let dir = TmpDir::new("swallow");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        // Poison it (record outside an armed query latches a failure).
        recorder.record(&MutationEvent::Clear);
        assert!(recorder.is_poisoned());

        // After poisoning, further `record` calls must NOT touch the
        // WAL or panic — they're a no-op so the engine can finish
        // unwinding before the host observes `poisoned()` and fails
        // the query.
        for _ in 0..10 {
            recorder.record(&MutationEvent::Clear);
        }
        assert!(recorder.is_poisoned());
    }
458
    #[test]
    fn checkpoint_marker_through_recorder() {
        let dir = TmpDir::new("ckpt");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        // Commit one real tx and fence it so a durable LSN exists.
        recorder.arm().unwrap();
        recorder.record(&MutationEvent::Clear);
        assert_eq!(recorder.commit().unwrap(), WroteCommit::Yes);
        recorder.force_fsync().unwrap();
        let snapshot_lsn = recorder.wal().durable_lsn();

        // Exercise the marker path via the recorder's shim after a
        // real durable fence exists.
        let marker_lsn = recorder.checkpoint_marker(snapshot_lsn).unwrap();
        recorder.force_fsync().unwrap();
        assert!(marker_lsn >= Lsn::new(1));

        // Replay must observe the checkpoint fence we just wrote.
        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(outcome.checkpoint_lsn_observed, Some(snapshot_lsn));
    }
479}