// lora_wal/recorder_adapter.rs
1//! Bridge between [`lora_store::MutationRecorder`] (the storage-side
2//! observer hook) and [`crate::Wal`] (the durable log handle).
3//!
4//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
5//!
6//! 1. Acquire the store write lock.
7//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
8//! appends nothing to the WAL yet. A pure read query that fires
9//! no `MutationEvent` therefore touches the WAL zero times: no
10//! `TxBegin`, no `TxCommit`, no `flush`, no `fsync`.
11//! 3. Run analyze + compile + execute. The executor mutates the
12//! in-memory store, which fires `MutationRecorder::record` for each
13//! primitive mutation. The adapter buffers those events in memory.
14//! 4. On Ok: `recorder.commit()` writes `TxBegin`, one `MutationBatch`,
15//! and `TxCommit`, then the host runs `recorder.flush()` (per the
16//! configured `SyncMode`) **only** when `commit()` returned
17//! `WroteCommit::Yes`. A read-only query returns `WroteCommit::No`
18//! and the host skips the flush entirely.
19//! 5. On Err / panic: `recorder.abort()`. If any mutation events were
20//! buffered, the host quarantines the live handle because the engine
21//! has no rollback. Durable recovery stays atomic because the failed
22//! query never writes a committed batch to the WAL.
23//! 6. Before returning, the host inspects `recorder.poisoned()` once.
24//! If `Some`, the query fails loudly with a durability error so
25//! the caller can act on it; the WAL is now refusing further
26//! appends until the operator restarts the database, which
27//! recovers from the last consistent snapshot + WAL.
28//!
29//! ### Hot-path cost
30//!
31//! `record` is called once per primitive mutation. It now takes only the
32//! recorder mutex and pushes a clone into a query-local buffer; the WAL mutex,
33//! framing, checksum, and segment append work happen once at commit time.
34//!
35//! ### When `record` fires after a failed in-memory mutation
36//!
37//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
38//! has been committed to the in-memory state. If the subsequent WAL
39//! append fails, the live in-memory store is briefly ahead of disk:
40//! the next query sees the partial state, but the next query also
41//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
42//! snapshot + WAL after operator restart will not include the failed
43//! mutation, so durable state stays consistent. The cost is "the live
44//! process is wrong until the next restart"; the gain is that the
45//! storage trait does not need to learn about durability.
46
47use std::path::Path;
48use std::sync::{Arc, Mutex};
49
50use lora_store::{MutationEvent, MutationRecorder};
51
52use crate::error::WalError;
53use crate::lsn::Lsn;
54use crate::wal::Wal;
55
/// Tells the host whether [`WalRecorder::commit`] actually appended a
/// `TxCommit` record to the log.
///
/// A read-only query never fires `MutationRecorder::record`, so it
/// reports [`WroteCommit::No`]; the host then skips the follow-up
/// `flush()` and avoids paying a per-query `fsync` just to persist an
/// empty transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WroteCommit {
    /// A lazily allocated `TxBegin` was closed out with a matching
    /// `TxCommit`. The caller should `flush()` (under PerCommit).
    Yes,
    /// The query fired no mutation events, so neither `TxBegin` nor
    /// `TxCommit` was written. The caller can skip `flush()` entirely.
    No,
}
70
71/// Optional side-effect after the WAL has successfully flushed.
72///
73/// The core WAL stays directory/segment based for append performance. Higher
74/// layers can install a mirror to copy that durable directory into another
75/// representation, such as the portable `.lora` archive file used by named
76/// databases.
77pub trait WalMirror: Send + Sync {
78 fn persist(&self, wal_dir: &Path) -> Result<(), WalError>;
79}
80
81#[derive(Default)]
82struct RecorderState {
83 /// True between `arm()` and the matching `commit()` / `abort()`.
84 /// Marks the host's critical section without committing the WAL
85 /// to a transaction yet — the actual `Wal::begin` happens lazily
86 /// on the first mutation event.
87 armed: bool,
88 /// LSN of the currently-open WAL transaction, if any. Normally this is
89 /// only set inside `commit()` while the buffered batch is being written.
90 active_tx: Option<Lsn>,
91 /// Query-local mutation buffer. This lets write-heavy statements commit
92 /// as one `MutationBatch` record instead of one framed record per event.
93 buffer: Vec<MutationEvent>,
94 /// Sticky failure flag. Once set, [`MutationRecorder::record`]
95 /// becomes a no-op (we cannot append safely) and `poisoned`
96 /// surfaces the message.
97 poisoned: Option<String>,
98}
99
100/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
101/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
102pub struct WalRecorder {
103 wal: Arc<Wal>,
104 mirror: Option<Arc<dyn WalMirror>>,
105 state: Mutex<RecorderState>,
106}
107
108impl WalRecorder {
109 pub fn new(wal: Arc<Wal>) -> Self {
110 Self::new_with_mirror(wal, None)
111 }
112
113 pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
114 Self {
115 wal,
116 mirror,
117 state: Mutex::new(RecorderState::default()),
118 }
119 }
120
121 /// Underlying log handle. Exposed so admin paths
122 /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
123 /// directly without going through the recorder's transaction
124 /// state machine.
125 pub fn wal(&self) -> &Arc<Wal> {
126 &self.wal
127 }
128
129 /// Mark the recorder as inside a query critical section. No WAL
130 /// I/O happens here — `Wal::begin` is deferred until the first
131 /// mutation event fires. A pure read query that never produces a
132 /// `MutationEvent` therefore costs the WAL nothing: no record
133 /// allocation, no buffer drain, no `fsync`.
134 ///
135 /// Errors with [`WalError::Poisoned`] if a prior failure has
136 /// poisoned the recorder, or if the host is double-arming
137 /// (`arm` already in effect).
138 pub fn arm(&self) -> Result<(), WalError> {
139 let mut state = self.state.lock().unwrap();
140 if state.poisoned.is_some() {
141 return Err(WalError::Poisoned);
142 }
143 if state.armed {
144 state.poisoned = Some("WalRecorder::arm called while already armed".into());
145 return Err(WalError::Poisoned);
146 }
147 state.armed = true;
148 state.buffer.clear();
149 Ok(())
150 }
151
152 /// Append a `TxCommit` for the active transaction (if any) and
153 /// clear the armed/active state.
154 ///
155 /// Returns:
156 /// - [`WroteCommit::Yes`] when a lazy `TxBegin` had been issued
157 /// and a matching `TxCommit` was now appended. The host should
158 /// `flush()` next under `SyncMode::PerCommit`.
159 /// - [`WroteCommit::No`] when no mutations fired during the query
160 /// and no records were written. The host should skip `flush()`.
161 pub fn commit(&self) -> Result<WroteCommit, WalError> {
162 let mut state = self.state.lock().unwrap();
163 if state.poisoned.is_some() {
164 return Err(WalError::Poisoned);
165 }
166 if !state.armed {
167 state.poisoned = Some("WalRecorder::commit called without an armed query".into());
168 return Err(WalError::Poisoned);
169 }
170 state.armed = false;
171 if state.buffer.is_empty() && state.active_tx.is_none() {
172 return Ok(WroteCommit::No);
173 }
174
175 let events = std::mem::take(&mut state.buffer);
176 let tx = match state.active_tx {
177 Some(tx) => tx,
178 None => self.wal.begin().inspect_err(|e| {
179 state.poisoned = Some(e.to_string());
180 })?,
181 };
182 state.active_tx = Some(tx);
183
184 self.wal.append_batch(tx, events).inspect_err(|e| {
185 state.poisoned = Some(e.to_string());
186 })?;
187 self.wal.commit(tx).inspect_err(|e| {
188 state.poisoned = Some(e.to_string());
189 })?;
190 state.active_tx = None;
191 Ok(WroteCommit::Yes)
192 }
193
194 /// Append a `TxAbort` for the active transaction (if any) and
195 /// clear the armed/active state. Returns `Ok(true)` when the live graph
196 /// may have observed mutations and should be quarantined, `Ok(false)` when
197 /// the query never mutated anything.
198 pub fn abort(&self) -> Result<bool, WalError> {
199 let mut state = self.state.lock().unwrap();
200 if state.poisoned.is_some() {
201 return Err(WalError::Poisoned);
202 }
203 // Tolerate "abort without arm" — the host calls abort in
204 // unwind paths and we'd rather no-op than poison.
205 state.armed = false;
206 let had_buffered_events = !state.buffer.is_empty();
207 state.buffer.clear();
208 match state.active_tx.take() {
209 Some(tx) => {
210 self.wal.abort(tx).inspect_err(|e| {
211 state.poisoned = Some(e.to_string());
212 })?;
213 Ok(true)
214 }
215 None => Ok(had_buffered_events),
216 }
217 }
218
219 /// Flush the WAL — write the pending buffer to the OS and
220 /// (under `SyncMode::PerCommit`) `fsync`.
221 pub fn flush(&self) -> Result<(), WalError> {
222 let mut state = self.state.lock().unwrap();
223 if state.poisoned.is_some() {
224 return Err(WalError::Poisoned);
225 }
226 self.wal.flush().inspect_err(|e| {
227 state.poisoned = Some(e.to_string());
228 })?;
229 if let Some(mirror) = &self.mirror {
230 mirror.persist(self.wal.dir()).inspect_err(|e| {
231 state.poisoned = Some(e.to_string());
232 })?;
233 }
234 Ok(())
235 }
236
237 /// Force the underlying WAL to write, `fsync`, and advance its
238 /// durable fence regardless of the configured sync mode. Admin
239 /// paths use this when they need a durability point immediately.
240 pub fn force_fsync(&self) -> Result<(), WalError> {
241 let mut state = self.state.lock().unwrap();
242 if state.poisoned.is_some() {
243 return Err(WalError::Poisoned);
244 }
245 self.wal.force_fsync().inspect_err(|e| {
246 state.poisoned = Some(e.to_string());
247 })?;
248 if let Some(mirror) = &self.mirror {
249 mirror.persist(self.wal.dir()).inspect_err(|e| {
250 state.poisoned = Some(e.to_string());
251 })?;
252 }
253 Ok(())
254 }
255
256 /// Append a `Checkpoint` marker. Used by the checkpoint admin
257 /// path after a successful snapshot rename — the marker doubles
258 /// as the log-side fence the next replay will trust.
259 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
260 let mut state = self.state.lock().unwrap();
261 if state.poisoned.is_some() {
262 return Err(WalError::Poisoned);
263 }
264 self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
265 state.poisoned = Some(e.to_string());
266 })
267 }
268
269 /// Drop sealed segments at or below `fence_lsn`. Forwards to
270 /// [`Wal::truncate_up_to`].
271 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
272 // Archive-backed databases must stay self-contained. Until snapshot
273 // checkpoint payloads are stored inside the archive too, preserving the
274 // full WAL history is the only safe way to let the archive recover by
275 // itself after a checkpoint marker.
276 if let Some(mirror) = &self.mirror {
277 mirror.persist(self.wal.dir())?;
278 return Ok(());
279 }
280 self.wal.truncate_up_to(fence_lsn)?;
281 Ok(())
282 }
283
284 /// True iff the recorder has already failed an append, **or** the
285 /// background flusher has latched a failure. Cheap to poll under
286 /// the store lock.
287 pub fn is_poisoned(&self) -> bool {
288 if self.state.lock().unwrap().poisoned.is_some() {
289 return true;
290 }
291 self.wal.bg_failure().is_some()
292 }
293
294 /// Quarantine the recorder after the host detects that the live
295 /// in-memory graph may no longer match durable state. Once poisoned,
296 /// future query arms fail until the database is restarted from a
297 /// snapshot + WAL.
298 pub fn poison(&self, reason: impl Into<String>) {
299 let mut state = self.state.lock().unwrap();
300 state.poisoned.get_or_insert_with(|| reason.into());
301 state.active_tx = None;
302 state.armed = false;
303 state.buffer.clear();
304 }
305
306 /// Test helper: clear the poisoned flag and reset the active
307 /// transaction. Production code should not call this — once the
308 /// WAL is poisoned the right move is to fail loudly and let the
309 /// operator restart from the last snapshot + WAL.
310 #[doc(hidden)]
311 pub fn clear_poisoned_for_tests(&self) {
312 let mut state = self.state.lock().unwrap();
313 state.poisoned = None;
314 state.active_tx = None;
315 state.armed = false;
316 state.buffer.clear();
317 }
318}
319
320impl MutationRecorder for WalRecorder {
321 fn record(&self, event: &MutationEvent) {
322 let mut state = self.state.lock().unwrap();
323 if state.poisoned.is_some() {
324 return;
325 }
326 if !state.armed {
327 state.poisoned.get_or_insert_with(|| {
328 "MutationRecorder::record fired outside an armed query".into()
329 });
330 return;
331 }
332 state.buffer.push(event.clone());
333 }
334
335 fn poisoned(&self) -> Option<String> {
336 // Surface a latched bg-flusher failure too — the recorder is
337 // the host's single point of contact for "is the WAL still
338 // safe to commit through?".
339 let state = self.state.lock().unwrap();
340 if let Some(msg) = state.poisoned.clone() {
341 return Some(msg);
342 }
343 self.wal.bg_failure()
344 }
345}
346
347#[cfg(test)]
348mod tests {
349 use super::*;
350 use std::sync::Arc;
351
352 use lora_store::{GraphStorageMut, InMemoryGraph, MutationEvent, Properties, PropertyValue};
353
354 use crate::config::SyncMode;
355 use crate::testing::TmpDir;
356 use crate::Wal;
357
358 fn open_wal(dir: &std::path::Path) -> Arc<Wal> {
359 let (wal, replay) =
360 Wal::open(dir, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
361 assert!(replay.is_empty());
362 wal
363 }
364
365 #[test]
366 fn record_outside_arm_poisons() {
367 let dir = TmpDir::new("no-arm");
368 let recorder = WalRecorder::new(open_wal(&dir.path));
369 recorder.record(&MutationEvent::Clear);
370 assert!(recorder.is_poisoned());
371 let msg = recorder.poisoned().unwrap();
372 assert!(msg.contains("outside an armed query"));
373 }
374
375 #[test]
376 fn arm_record_commit_round_trip_via_in_memory_graph() {
377 let dir = TmpDir::new("happy");
378 let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));
379
380 let mut g = InMemoryGraph::new();
381 g.set_mutation_recorder(Some(recorder.clone()));
382
383 recorder.arm().unwrap();
384 let mut props = Properties::new();
385 props.insert("v".into(), PropertyValue::Int(1));
386 g.create_node(vec!["N".into()], props);
387 let mut props2 = Properties::new();
388 props2.insert("v".into(), PropertyValue::Int(2));
389 g.create_node(vec!["N".into()], props2);
390 let outcome = recorder.commit().unwrap();
391 assert_eq!(outcome, WroteCommit::Yes);
392 recorder.flush().unwrap();
393
394 assert!(!recorder.is_poisoned());
395
396 // Drop every recorder clone before re-opening the directory,
397 // otherwise we'd race with our own live WAL handle.
398 g.set_mutation_recorder(None);
399 drop(recorder);
400
401 let (_wal, events) =
402 Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
403 assert_eq!(events.len(), 2);
404 assert!(matches!(events[0], MutationEvent::CreateNode { id: 0, .. }));
405 assert!(matches!(events[1], MutationEvent::CreateNode { id: 1, .. }));
406 }
407
408 #[test]
409 fn arm_then_commit_with_no_mutations_writes_nothing() {
410 let dir = TmpDir::new("ro");
411 let recorder = WalRecorder::new(open_wal(&dir.path));
412
413 // Simulate a read-only query: arm + commit without any
414 // intervening `record` calls.
415 let next_before = recorder.wal().next_lsn();
416 recorder.arm().unwrap();
417 let outcome = recorder.commit().unwrap();
418 assert_eq!(outcome, WroteCommit::No);
419 let next_after = recorder.wal().next_lsn();
420 assert_eq!(
421 next_before, next_after,
422 "read-only commit must not allocate any LSNs"
423 );
424 }
425
426 #[test]
427 fn abort_drops_in_flight_events_on_replay() {
428 let dir = TmpDir::new("abort");
429 let recorder: Arc<WalRecorder> = Arc::new(WalRecorder::new(open_wal(&dir.path)));
430
431 let mut g = InMemoryGraph::new();
432 g.set_mutation_recorder(Some(recorder.clone()));
433
434 // Tx 1 commits.
435 recorder.arm().unwrap();
436 g.create_node(vec!["A".into()], Properties::new());
437 let _ = recorder.commit().unwrap();
438 recorder.flush().unwrap();
439
440 // Tx 2 aborts: the in-memory mutation already happened (the
441 // engine has no rollback) but the WAL marks it aborted, so
442 // recovery from a fresh process must skip it.
443 recorder.arm().unwrap();
444 g.create_node(vec!["B".into()], Properties::new());
445 let aborted = recorder.abort().unwrap();
446 assert!(aborted, "abort after buffered mutations should quarantine");
447 recorder.flush().unwrap();
448
449 g.set_mutation_recorder(None);
450 drop(recorder);
451
452 let (_wal, events) =
453 Wal::open(&dir.path, SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
454 assert_eq!(events.len(), 1);
455 if let MutationEvent::CreateNode { labels, .. } = &events[0] {
456 assert_eq!(labels, &vec!["A".to_string()]);
457 } else {
458 panic!("expected CreateNode for label A, got {:?}", events[0]);
459 }
460 }
461
462 #[test]
463 fn arm_while_armed_poisons() {
464 let dir = TmpDir::new("double-arm");
465 let recorder = WalRecorder::new(open_wal(&dir.path));
466 recorder.arm().unwrap();
467 let err = recorder.arm().unwrap_err();
468 assert!(matches!(err, WalError::Poisoned));
469 assert!(recorder.is_poisoned());
470 }
471
472 #[test]
473 fn poisoned_recorder_swallows_subsequent_records() {
474 let dir = TmpDir::new("swallow");
475 let recorder = WalRecorder::new(open_wal(&dir.path));
476
477 // Poison it.
478 recorder.record(&MutationEvent::Clear);
479 assert!(recorder.is_poisoned());
480
481 // After poisoning, further `record` calls must NOT touch the
482 // WAL or panic — they're a no-op so the engine can finish
483 // unwinding before the host observes `poisoned()` and fails
484 // the query.
485 for _ in 0..10 {
486 recorder.record(&MutationEvent::Clear);
487 }
488 assert!(recorder.is_poisoned());
489 }
490
491 #[test]
492 fn checkpoint_marker_through_recorder() {
493 let dir = TmpDir::new("ckpt");
494 let recorder = WalRecorder::new(open_wal(&dir.path));
495
496 recorder.arm().unwrap();
497 recorder.record(&MutationEvent::Clear);
498 assert_eq!(recorder.commit().unwrap(), WroteCommit::Yes);
499 recorder.force_fsync().unwrap();
500 let snapshot_lsn = recorder.wal().durable_lsn();
501
502 // Exercise the marker path via the recorder's shim after a
503 // real durable fence exists.
504 let marker_lsn = recorder.checkpoint_marker(snapshot_lsn).unwrap();
505 recorder.force_fsync().unwrap();
506 assert!(marker_lsn >= Lsn::new(1));
507
508 let outcome = crate::replay::replay_dir(&dir.path, Lsn::ZERO).unwrap();
509 assert_eq!(outcome.checkpoint_lsn_observed, Some(snapshot_lsn));
510 }
511}