// lora_wal/recorder_adapter.rs
//! Bridge between [`lora_store::MutationRecorder`] (the storage-side
//! observer hook) and [`crate::Wal`] (the durable log handle).
//!
//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
//!
//! 1. Acquire the store write lock.
//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
//!    appends nothing to the WAL yet. A pure read query that fires
//!    no `MutationEvent` therefore touches the WAL zero times: no
//!    `TxBegin`, no `TxCommit`, no `flush`, no `fsync`.
//! 3. Run analyze + compile + execute. The executor mutates the
//!    in-memory store, which fires `MutationRecorder::record` for each
//!    primitive mutation. The adapter buffers those events in memory.
//! 4. On Ok: `recorder.commit()` writes `TxBegin`, one `MutationBatch`,
//!    and `TxCommit`, then the host runs `recorder.flush()` (per the
//!    configured `SyncMode`) **only** when `commit()` returned
//!    `WroteCommit::Yes`. A read-only query returns `WroteCommit::No`
//!    and the host skips the flush entirely.
//! 5. On Err / panic: `recorder.abort()`. If any mutation events were
//!    buffered, the host quarantines the live handle because the engine
//!    has no rollback. Durable recovery stays atomic because the failed
//!    query never writes a committed batch to the WAL.
//! 6. Before returning, the host inspects `recorder.poisoned()` once.
//!    If `Some`, the query fails loudly with a durability error so
//!    the caller can act on it; the WAL is now refusing further
//!    appends until the operator restarts the database, which
//!    recovers from the last consistent snapshot + WAL.
//!
//! ### Hot-path cost
//!
//! `record` is called once per primitive mutation. It now takes only the
//! recorder mutex and pushes a clone into a query-local buffer; the WAL mutex,
//! framing, checksum, and segment append work happen once at commit time.
//!
//! ### When `record` fires after a failed in-memory mutation
//!
//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
//! has been committed to the in-memory state. If the subsequent WAL
//! append fails, the live in-memory store is briefly ahead of disk:
//! the next query sees the partial state, but the next query also
//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
//! snapshot + WAL after operator restart will not include the failed
//! mutation, so durable state stays consistent. The cost is "the live
//! process is wrong until the next restart"; the gain is that the
//! storage trait does not need to learn about durability.
use std::path::Path;
use std::sync::{Arc, Mutex};

use lora_store::{MutationEvent, MutationRecorder};

use crate::error::WalError;
use crate::lsn::Lsn;
use crate::wal::Wal;
55
/// Whether [`WalRecorder::commit`] actually wrote a `TxCommit` to the
/// log. Read-only queries — those that never trigger
/// `MutationRecorder::record` — return [`WroteCommit::No`] so the host
/// can skip the surrounding `flush()` and avoid a per-query `fsync`
/// just to record an empty transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WroteCommit {
    /// A `TxBegin` had been lazily allocated and was paired with a
    /// matching `TxCommit`. Caller should `flush()` (under PerCommit).
    Yes,
    /// No mutation events fired during the query, so neither `TxBegin`
    /// nor `TxCommit` was appended. Caller can skip `flush()` entirely.
    No,
}
70
/// Optional side-effect after the WAL has successfully flushed.
///
/// The core WAL stays directory/segment based for append performance. Higher
/// layers can install a mirror to copy that durable directory into another
/// representation, such as the portable `.loradb` archive file used by named
/// databases.
pub trait WalMirror: Send + Sync {
    /// Copy the durable WAL directory `wal_dir` into the mirror's
    /// representation. Called after a successful `flush()`; errors are
    /// latched by the caller as a recorder poison.
    fn persist(&self, wal_dir: &Path) -> Result<(), WalError>;

    /// Like [`WalMirror::persist`], but invoked on paths that demand an
    /// immediate durability point (`force_fsync`, truncation of an
    /// archive-backed database). Defaults to plain `persist`.
    fn persist_force(&self, wal_dir: &Path) -> Result<(), WalError> {
        self.persist(wal_dir)
    }
}
84
/// Mutable recorder state, guarded by the `WalRecorder::state` mutex.
#[derive(Default)]
struct RecorderState {
    /// True between `arm()` and the matching `commit()` / `abort()`.
    /// Marks the host's critical section without committing the WAL
    /// to a transaction yet — the actual `Wal::begin` happens lazily
    /// on the first mutation event.
    armed: bool,
    /// LSN of the currently-open WAL transaction, if any. Normally this is
    /// only set inside `commit()` while the buffered batch is being written.
    active_tx: Option<Lsn>,
    /// Query-local mutation buffer. This lets write-heavy statements commit
    /// as one `MutationBatch` record instead of one framed record per event.
    buffer: Vec<MutationEvent>,
    /// Sticky failure flag. Once set, [`MutationRecorder::record`]
    /// becomes a no-op (we cannot append safely) and `poisoned`
    /// surfaces the message.
    poisoned: Option<String>,
}
103
/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
pub struct WalRecorder {
    // Shared durable log handle; also reachable via `wal()` for admin paths.
    wal: Arc<Wal>,
    // Optional post-flush side effect (e.g. `.loradb` archive export).
    mirror: Option<Arc<dyn WalMirror>>,
    // All per-query bookkeeping lives behind one mutex so the arm/commit/
    // abort state machine and the poison latch stay mutually consistent.
    state: Mutex<RecorderState>,
}
111
impl WalRecorder {
    /// Wrap `wal` with no mirror installed. See [`Self::new_with_mirror`].
    pub fn new(wal: Arc<Wal>) -> Self {
        Self::new_with_mirror(wal, None)
    }

    /// Wrap `wal`, optionally installing a [`WalMirror`] that runs after
    /// every successful `flush()` / `force_fsync()`.
    pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
        Self {
            wal,
            mirror,
            state: Mutex::new(RecorderState::default()),
        }
    }

    /// Underlying log handle. Exposed so admin paths
    /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
    /// directly without going through the recorder's transaction
    /// state machine.
    pub fn wal(&self) -> &Arc<Wal> {
        &self.wal
    }

    /// Mark the recorder as inside a query critical section. No WAL
    /// I/O happens here — `Wal::begin` is deferred until the first
    /// mutation event fires. A pure read query that never produces a
    /// `MutationEvent` therefore costs the WAL nothing: no record
    /// allocation, no buffer drain, no `fsync`.
    ///
    /// Errors with [`WalError::Poisoned`] if a prior failure has
    /// poisoned the recorder, or if the host is double-arming
    /// (`arm` already in effect).
    pub fn arm(&self) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        if state.armed {
            // Double-arm means the host's query lifecycle is broken; latch
            // the failure so every later query fails loudly too.
            state.poisoned = Some("WalRecorder::arm called while already armed".into());
            return Err(WalError::Poisoned);
        }
        state.armed = true;
        // Defensive: the buffer should already be empty after a clean
        // commit/abort, but clearing here keeps a stale event from leaking
        // into the next transaction.
        state.buffer.clear();
        Ok(())
    }

    /// Append a `TxCommit` for the active transaction (if any) and
    /// clear the armed/active state.
    ///
    /// Returns:
    /// - [`WroteCommit::Yes`] when a lazy `TxBegin` had been issued
    ///   and a matching `TxCommit` was now appended. The host should
    ///   `flush()` next under `SyncMode::PerCommit`.
    /// - [`WroteCommit::No`] when no mutations fired during the query
    ///   and no records were written. The host should skip `flush()`.
    pub fn commit(&self) -> Result<WroteCommit, WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        if !state.armed {
            state.poisoned = Some("WalRecorder::commit called without an armed query".into());
            return Err(WalError::Poisoned);
        }
        // Clear `armed` before the read-only early return so the recorder
        // is immediately ready for the next `arm()`.
        state.armed = false;
        if state.buffer.is_empty() && state.active_tx.is_none() {
            return Ok(WroteCommit::No);
        }

        let events = std::mem::take(&mut state.buffer);
        // Lazy `TxBegin`: the LSN is allocated here, at commit time, never
        // during the query itself.
        let tx = match state.active_tx {
            Some(tx) => tx,
            None => self.wal.begin().inspect_err(|e| {
                state.poisoned = Some(e.to_string());
            })?,
        };
        state.active_tx = Some(tx);

        // Any failure below latches `poisoned` (and leaves `active_tx` set),
        // after which every recorder entry point refuses work until restart.
        self.wal.append_batch(tx, events).inspect_err(|e| {
            state.poisoned = Some(e.to_string());
        })?;
        self.wal.commit(tx).inspect_err(|e| {
            state.poisoned = Some(e.to_string());
        })?;
        state.active_tx = None;
        Ok(WroteCommit::Yes)
    }

    /// Append a `TxAbort` for the active transaction (if any) and
    /// clear the armed/active state. Returns `Ok(true)` when the live graph
    /// may have observed mutations and should be quarantined, `Ok(false)` when
    /// the query never mutated anything.
    pub fn abort(&self) -> Result<bool, WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        // Tolerate "abort without arm" — the host calls abort in
        // unwind paths and we'd rather no-op than poison.
        state.armed = false;
        // Capture before clearing: buffered events mean the in-memory graph
        // already mutated, even though nothing reached the WAL.
        let had_buffered_events = !state.buffer.is_empty();
        state.buffer.clear();
        match state.active_tx.take() {
            Some(tx) => {
                self.wal.abort(tx).inspect_err(|e| {
                    state.poisoned = Some(e.to_string());
                })?;
                Ok(true)
            }
            None => Ok(had_buffered_events),
        }
    }

    /// Flush the WAL — write the pending buffer to the OS and
    /// (under `SyncMode::PerCommit`) `fsync`.
    ///
    /// The state mutex is held across the I/O on purpose: it serializes
    /// flushes with commits and lets a failure latch `poisoned` atomically.
    pub fn flush(&self) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        self.wal.flush().inspect_err(|e| {
            state.poisoned = Some(e.to_string());
        })?;
        // The mirror runs only after the WAL itself is durable, so the
        // mirrored copy can never be ahead of the directory it mirrors.
        if let Some(mirror) = &self.mirror {
            mirror.persist(self.wal.dir()).inspect_err(|e| {
                state.poisoned = Some(e.to_string());
            })?;
        }
        Ok(())
    }

    /// Force the underlying WAL to write, `fsync`, and advance its
    /// durable fence regardless of the configured sync mode. Admin
    /// paths use this when they need a durability point immediately.
    pub fn force_fsync(&self) -> Result<(), WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        self.wal.force_fsync().inspect_err(|e| {
            state.poisoned = Some(e.to_string());
        })?;
        if let Some(mirror) = &self.mirror {
            mirror.persist_force(self.wal.dir()).inspect_err(|e| {
                state.poisoned = Some(e.to_string());
            })?;
        }
        Ok(())
    }

    /// Append a `Checkpoint` marker. Used by the checkpoint admin
    /// path after a successful snapshot rename — the marker doubles
    /// as the log-side fence the next replay will trust.
    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
        let mut state = self.state.lock().unwrap();
        if state.poisoned.is_some() {
            return Err(WalError::Poisoned);
        }
        self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
            state.poisoned = Some(e.to_string());
        })
    }

    /// Drop sealed segments at or below `fence_lsn`. Forwards to
    /// [`Wal::truncate_up_to`].
    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
        // Archive-backed databases must stay self-contained. Until snapshot
        // checkpoint payloads are stored inside the archive too, preserving the
        // full WAL history is the only safe way to let the archive recover by
        // itself after a checkpoint marker.
        //
        // NOTE: with a mirror installed, this therefore persists and returns
        // WITHOUT truncating — intentionally, per the rationale above.
        if let Some(mirror) = &self.mirror {
            mirror.persist_force(self.wal.dir())?;
            return Ok(());
        }
        self.wal.truncate_up_to(fence_lsn)?;
        Ok(())
    }

    /// True iff the recorder has already failed an append, **or** the
    /// background flusher has latched a failure. Cheap to poll under
    /// the store lock.
    pub fn is_poisoned(&self) -> bool {
        if self.state.lock().unwrap().poisoned.is_some() {
            return true;
        }
        self.wal.bg_failure().is_some()
    }

    /// Quarantine the recorder after the host detects that the live
    /// in-memory graph may no longer match durable state. Once poisoned,
    /// future query arms fail until the database is restarted from a
    /// snapshot + WAL.
    pub fn poison(&self, reason: impl Into<String>) {
        let mut state = self.state.lock().unwrap();
        // First reason wins — keep the original failure, not a follow-on.
        state.poisoned.get_or_insert_with(|| reason.into());
        state.active_tx = None;
        state.armed = false;
        state.buffer.clear();
    }

    /// Test helper: clear the poisoned flag and reset the active
    /// transaction. Production code should not call this — once the
    /// WAL is poisoned the right move is to fail loudly and let the
    /// operator restart from the last snapshot + WAL.
    #[doc(hidden)]
    pub fn clear_poisoned_for_tests(&self) {
        let mut state = self.state.lock().unwrap();
        state.poisoned = None;
        state.active_tx = None;
        state.armed = false;
        state.buffer.clear();
    }
}
323
324impl MutationRecorder for WalRecorder {
325    fn record(&self, event: &MutationEvent) {
326        let mut state = self.state.lock().unwrap();
327        if state.poisoned.is_some() {
328            return;
329        }
330        if !state.armed {
331            state.poisoned.get_or_insert_with(|| {
332                "MutationRecorder::record fired outside an armed query".into()
333            });
334            return;
335        }
336        state.buffer.push(event.clone());
337    }
338
339    fn poisoned(&self) -> Option<String> {
340        // Surface a latched bg-flusher failure too — the recorder is
341        // the host's single point of contact for "is the WAL still
342        // safe to commit through?".
343        let state = self.state.lock().unwrap();
344        if let Some(msg) = state.poisoned.clone() {
345            return Some(msg);
346        }
347        self.wal.bg_failure()
348    }
349}
350
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use lora_store::{GraphStorageMut, InMemoryGraph, MutationEvent, Properties, PropertyValue};

    use crate::config::SyncMode;
    use crate::testing::TmpDir;
    use crate::Wal;

    /// Open a fresh WAL in `dir` with PerCommit sync and assert that a new
    /// directory replays to zero events.
    fn open_wal(dir: &std::path::Path) -> Arc<Wal> {
        let (wal, replay) =
            Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert!(replay.is_empty());
        wal
    }

    #[test]
    fn record_outside_arm_poisons() {
        let dir = TmpDir::new("no-arm");
        let recorder = WalRecorder::new(open_wal(&dir.path));
        // Protocol violation: record without arm must latch the poison flag.
        recorder.record(&MutationEvent::Clear);
        assert!(recorder.is_poisoned());
        let msg = recorder.poisoned().unwrap();
        assert!(msg.contains("outside an armed query"));
    }

    #[test]
    fn arm_record_commit_round_trip_via_in_memory_graph() {
        let dir = TmpDir::new("happy");
        let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));

        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(recorder.clone()));

        recorder.arm().unwrap();
        let mut props = Properties::new();
        props.insert("v".into(), PropertyValue::Int(1));
        g.create_node(vec!["N".into()], props);
        let mut props2 = Properties::new();
        props2.insert("v".into(), PropertyValue::Int(2));
        g.create_node(vec!["N".into()], props2);
        let outcome = recorder.commit().unwrap();
        assert_eq!(outcome, WroteCommit::Yes);
        recorder.flush().unwrap();

        assert!(!recorder.is_poisoned());

        // Drop every recorder clone before re-opening the directory,
        // otherwise we'd race with our own live WAL handle.
        g.set_mutation_recorder(None);
        drop(recorder);

        // Replay must yield exactly the two committed CreateNode events.
        let (_wal, events) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], MutationEvent::CreateNode { id: 0, .. }));
        assert!(matches!(events[1], MutationEvent::CreateNode { id: 1, .. }));
    }

    #[test]
    fn arm_then_commit_with_no_mutations_writes_nothing() {
        let dir = TmpDir::new("ro");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        // Simulate a read-only query: arm + commit without any
        // intervening `record` calls.
        let next_before = recorder.wal().next_lsn();
        recorder.arm().unwrap();
        let outcome = recorder.commit().unwrap();
        assert_eq!(outcome, WroteCommit::No);
        let next_after = recorder.wal().next_lsn();
        assert_eq!(
            next_before, next_after,
            "read-only commit must not allocate any LSNs"
        );
    }

    #[test]
    fn abort_drops_in_flight_events_on_replay() {
        let dir = TmpDir::new("abort");
        let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));

        let mut g = InMemoryGraph::new();
        g.set_mutation_recorder(Some(recorder.clone()));

        // Tx 1 commits.
        recorder.arm().unwrap();
        g.create_node(vec!["A".into()], Properties::new());
        let _ = recorder.commit().unwrap();
        recorder.flush().unwrap();

        // Tx 2 aborts: the in-memory mutation already happened (the
        // engine has no rollback) but the WAL marks it aborted, so
        // recovery from a fresh process must skip it.
        recorder.arm().unwrap();
        g.create_node(vec!["B".into()], Properties::new());
        let aborted = recorder.abort().unwrap();
        assert!(aborted, "abort after buffered mutations should quarantine");
        recorder.flush().unwrap();

        g.set_mutation_recorder(None);
        drop(recorder);

        // Only the committed Tx 1 event ("A") survives replay.
        let (_wal, events) =
            Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
        assert_eq!(events.len(), 1);
        if let MutationEvent::CreateNode { labels, .. } = &events[0] {
            assert_eq!(labels, &vec!["A".to_string()]);
        } else {
            panic!("expected CreateNode for label A, got {:?}", events[0]);
        }
    }

    #[test]
    fn arm_while_armed_poisons() {
        let dir = TmpDir::new("double-arm");
        let recorder = WalRecorder::new(open_wal(&dir.path));
        recorder.arm().unwrap();
        let err = recorder.arm().unwrap_err();
        assert!(matches!(err, WalError::Poisoned));
        assert!(recorder.is_poisoned());
    }

    #[test]
    fn poisoned_recorder_swallows_subsequent_records() {
        let dir = TmpDir::new("swallow");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        // Poison it.
        recorder.record(&MutationEvent::Clear);
        assert!(recorder.is_poisoned());

        // After poisoning, further `record` calls must NOT touch the
        // WAL or panic — they're a no-op so the engine can finish
        // unwinding before the host observes `poisoned()` and fails
        // the query.
        for _ in 0..10 {
            recorder.record(&MutationEvent::Clear);
        }
        assert!(recorder.is_poisoned());
    }

    #[test]
    fn checkpoint_marker_through_recorder() {
        let dir = TmpDir::new("ckpt");
        let recorder = WalRecorder::new(open_wal(&dir.path));

        recorder.arm().unwrap();
        recorder.record(&MutationEvent::Clear);
        assert_eq!(recorder.commit().unwrap(), WroteCommit::Yes);
        recorder.force_fsync().unwrap();
        let snapshot_lsn = recorder.wal().durable_lsn();

        // Exercise the marker path via the recorder's shim after a
        // real durable fence exists.
        let marker_lsn = recorder.checkpoint_marker(snapshot_lsn).unwrap();
        recorder.force_fsync().unwrap();
        assert!(marker_lsn >= Lsn::new(1));

        let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
        assert_eq!(outcome.checkpoint_lsn_observed, Some(snapshot_lsn));
    }
}
515}