Skip to main content

lora_wal/recorder/
recorder.rs

1//! [`WalRecorder`] — adapter from `MutationRecorder` to the durable
2//! [`Wal`].
3//!
4//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
5//!
6//! 1. Acquire the store write lock.
7//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
8//!    appends nothing to the WAL yet. A pure read query that fires
9//!    no `MutationEvent` therefore touches the WAL zero times: no
10//!    `TxBegin`, no `TxCommit`, no `flush`, no `fsync`.
11//! 3. Run analyze + compile + execute. The executor mutates the
12//!    in-memory store, which fires `MutationRecorder::record` for each
13//!    primitive mutation. The adapter buffers those events in memory.
14//! 4. On Ok: `recorder.commit_and_flush_if_needed()` writes `TxBegin`,
15//!    one `MutationBatch`, and `TxCommit`, then flushes only when
16//!    `commit()` returned `WroteCommit::Yes`. A read-only query returns
17//!    `WroteCommit::No` and skips the flush entirely.
18//! 5. On Err / panic: `recorder.abort()`. If any mutation events were
19//!    buffered, the host quarantines the live handle because the engine
20//!    has no rollback. Durable recovery stays atomic because the failed
21//!    query never writes a committed batch to the WAL.
22//! 6. Before returning, the host inspects `recorder.poisoned()` once.
23//!    If `Some`, the query fails loudly with a durability error so
24//!    the caller can act on it; the WAL is now refusing further
25//!    appends until the operator restarts the database, which
26//!    recovers from the last consistent snapshot + WAL.
27//!
28//! ### Hot-path cost
29//!
30//! `record` is called once per primitive mutation. It now takes only the
31//! recorder mutex and pushes a clone into a query-local buffer; the WAL mutex,
32//! framing, checksum, and segment append work happen once at commit time.
33//!
34//! ### When `record` fires after a failed in-memory mutation
35//!
36//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
37//! has been committed to the in-memory state. If the subsequent WAL
38//! append fails, the live in-memory store is briefly ahead of disk:
39//! the next query sees the partial state, but the next query also
40//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
41//! snapshot + WAL after operator restart will not include the failed
42//! mutation, so durable state stays consistent. The cost is "the live
43//! process is wrong until the next restart"; the gain is that the
44//! storage trait does not need to learn about durability.
45
46use std::sync::{Arc, Mutex, MutexGuard};
47
48use lora_store::{MutationEvent, MutationRecorder};
49
50use super::errors::{WalBufferedCommitError, WalCommitError, WalPoisonError, WroteCommit};
51use super::mirror::WalMirror;
52use crate::errors::WalError;
53use crate::lsn::Lsn;
54use crate::wal::Wal;
55
/// Mutable bookkeeping behind [`WalRecorder`]. It is stored inside the
/// recorder's `Mutex`, so every read or write goes through that lock.
/// The derived `Default` is the idle state: not armed, no open
/// transaction, empty buffer, not poisoned.
#[derive(Default)]
struct RecorderState {
    /// True between `arm()` and the matching `commit()` / `abort()`.
    /// Marks the host's critical section without committing the WAL
    /// to a transaction yet — `record` only buffers events, and the
    /// actual `Wal::begin` is issued inside `commit()` once there is
    /// something to write.
    armed: bool,
    /// LSN of the currently-open WAL transaction, if any. Normally this is
    /// only set inside `commit()` while the buffered batch is being written.
    active_tx: Option<Lsn>,
    /// Query-local mutation buffer. This lets write-heavy statements commit
    /// as one `MutationBatch` record instead of one framed record per event.
    buffer: Vec<MutationEvent>,
    /// Sticky failure flag. Once set, [`MutationRecorder::record`]
    /// becomes a no-op (we cannot append safely) and `poisoned`
    /// surfaces the message.
    poisoned: Option<String>,
}
74
/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
///
/// Mutation events are buffered in memory while a query is armed and are
/// handed to the WAL as one batch when the query commits.
pub struct WalRecorder {
    /// Shared handle to the log that begin/append/commit/flush calls are
    /// forwarded to.
    wal: Arc<Wal>,
    /// Optional mirror. When present it is asked to persist the WAL
    /// directory after a flush or forced fsync.
    mirror: Option<Arc<dyn WalMirror>>,
    /// Armed flag, open transaction, event buffer and sticky poison
    /// reason, guarded by a mutex.
    state: Mutex<RecorderState>,
}
82
83impl WalRecorder {
84    pub fn new(wal: Arc<Wal>) -> Self {
85        Self::new_with_mirror(wal, None)
86    }
87
88    pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
89        Self {
90            wal,
91            mirror,
92            state: Mutex::new(RecorderState::default()),
93        }
94    }
95
96    /// Underlying log handle. Exposed so admin paths
97    /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
98    /// directly without going through the recorder's transaction
99    /// state machine.
100    pub fn wal(&self) -> &Arc<Wal> {
101        &self.wal
102    }
103
104    fn state_lock(&self) -> MutexGuard<'_, RecorderState> {
105        self.state
106            .lock()
107            .unwrap_or_else(|poisoned| poisoned.into_inner())
108    }
109
110    /// Mark the recorder as inside a query critical section. No WAL
111    /// I/O happens here — `Wal::begin` is deferred until the first
112    /// mutation event fires. A pure read query that never produces a
113    /// `MutationEvent` therefore costs the WAL nothing: no record
114    /// allocation, no buffer drain, no `fsync`.
115    ///
116    /// Errors with [`WalError::Poisoned`] if a prior failure has
117    /// poisoned the recorder, or if the host is double-arming
118    /// (`arm` already in effect).
119    pub fn arm(&self) -> Result<(), WalError> {
120        let mut state = self.state_lock();
121        if state.poisoned.is_some() {
122            return Err(WalError::Poisoned);
123        }
124        if state.armed {
125            state.poisoned = Some("WalRecorder::arm called while already armed".into());
126            return Err(WalError::Poisoned);
127        }
128        state.armed = true;
129        state.buffer.clear();
130        Ok(())
131    }
132
133    /// Append a `TxCommit` for the active transaction (if any) and
134    /// clear the armed/active state.
135    ///
136    /// Returns:
137    /// - [`WroteCommit::Yes`] when a lazy `TxBegin` had been issued
138    ///   and a matching `TxCommit` was now appended. The host should
139    ///   `flush()` next under `SyncMode::PerCommit`.
140    /// - [`WroteCommit::No`] when no mutations fired during the query
141    ///   and no records were written. The host should skip `flush()`.
142    pub fn commit(&self) -> Result<WroteCommit, WalError> {
143        let mut state = self.state_lock();
144        if state.poisoned.is_some() {
145            return Err(WalError::Poisoned);
146        }
147        if !state.armed {
148            state.poisoned = Some("WalRecorder::commit called without an armed query".into());
149            return Err(WalError::Poisoned);
150        }
151        state.armed = false;
152        if state.buffer.is_empty() && state.active_tx.is_none() {
153            return Ok(WroteCommit::No);
154        }
155
156        let events = std::mem::take(&mut state.buffer);
157        let tx = match state.active_tx {
158            Some(tx) => tx,
159            None => self.wal.begin().inspect_err(|e| {
160                state.poisoned = Some(e.to_string());
161            })?,
162        };
163        state.active_tx = Some(tx);
164
165        self.wal.append_batch(tx, events).inspect_err(|e| {
166            state.poisoned = Some(e.to_string());
167        })?;
168        self.wal.commit(tx).inspect_err(|e| {
169            state.poisoned = Some(e.to_string());
170        })?;
171        state.active_tx = None;
172        Ok(WroteCommit::Yes)
173    }
174
175    /// Commit the currently armed recorder and flush only when a commit record
176    /// was written. This is the normal durable boundary for query-scoped writes.
177    pub fn commit_and_flush_if_needed(&self) -> Result<WroteCommit, WalCommitError> {
178        let wrote_commit = self.commit().map_err(WalCommitError::Commit)?;
179        if wrote_commit.wrote() {
180            self.flush().map_err(WalCommitError::Flush)?;
181        }
182        Ok(wrote_commit)
183    }
184
185    /// Commit an explicit transaction's buffered mutation events as one durable
186    /// WAL transaction. The recorder is armed only for this replay window.
187    pub fn commit_events(
188        &self,
189        events: impl IntoIterator<Item = MutationEvent>,
190    ) -> Result<WroteCommit, WalBufferedCommitError> {
191        let mut events = events.into_iter().peekable();
192        if events.peek().is_none() {
193            self.ensure_not_poisoned()
194                .map_err(|e| WalBufferedCommitError::Poisoned(e.reason().to_string()))?;
195            return Ok(WroteCommit::No);
196        }
197
198        self.arm().map_err(WalBufferedCommitError::Arm)?;
199        for event in events {
200            self.record(event);
201            if let Some(reason) = self.poisoned_reason() {
202                return Err(WalBufferedCommitError::ReplayPoisoned(reason));
203            }
204        }
205
206        self.commit_and_flush_if_needed().map_err(Into::into)
207    }
208
209    /// Append a `TxAbort` for the active transaction (if any) and
210    /// clear the armed/active state. Returns `Ok(true)` when the live graph
211    /// may have observed mutations and should be quarantined, `Ok(false)` when
212    /// the query never mutated anything.
213    pub fn abort(&self) -> Result<bool, WalError> {
214        let mut state = self.state_lock();
215        if state.poisoned.is_some() {
216            return Err(WalError::Poisoned);
217        }
218        // Tolerate "abort without arm" — the host calls abort in
219        // unwind paths and we'd rather no-op than poison.
220        state.armed = false;
221        let had_buffered_events = !state.buffer.is_empty();
222        state.buffer.clear();
223        match state.active_tx.take() {
224            Some(tx) => {
225                self.wal.abort(tx).inspect_err(|e| {
226                    state.poisoned = Some(e.to_string());
227                })?;
228                Ok(true)
229            }
230            None => Ok(had_buffered_events),
231        }
232    }
233
234    /// Flush the WAL — write the pending buffer to the OS and
235    /// (under `SyncMode::PerCommit`) `fsync`.
236    pub fn flush(&self) -> Result<(), WalError> {
237        let mut state = self.state_lock();
238        if state.poisoned.is_some() {
239            return Err(WalError::Poisoned);
240        }
241        self.wal.flush().inspect_err(|e| {
242            state.poisoned = Some(e.to_string());
243        })?;
244        if let Some(mirror) = &self.mirror {
245            mirror.persist(self.wal.dir()).inspect_err(|e| {
246                state.poisoned = Some(e.to_string());
247            })?;
248        }
249        Ok(())
250    }
251
252    /// Force the underlying WAL to write, `fsync`, and advance its
253    /// durable fence regardless of the configured sync mode. Admin
254    /// paths use this when they need a durability point immediately.
255    pub fn force_fsync(&self) -> Result<(), WalError> {
256        let mut state = self.state_lock();
257        if state.poisoned.is_some() {
258            return Err(WalError::Poisoned);
259        }
260        self.wal.force_fsync().inspect_err(|e| {
261            state.poisoned = Some(e.to_string());
262        })?;
263        if let Some(mirror) = &self.mirror {
264            mirror.persist_force(self.wal.dir()).inspect_err(|e| {
265                state.poisoned = Some(e.to_string());
266            })?;
267        }
268        Ok(())
269    }
270
271    /// Append a `Checkpoint` marker. Used by the checkpoint admin
272    /// path after a successful snapshot rename — the marker doubles
273    /// as the log-side fence the next replay will trust.
274    pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
275        let mut state = self.state_lock();
276        if state.poisoned.is_some() {
277            return Err(WalError::Poisoned);
278        }
279        self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
280            state.poisoned = Some(e.to_string());
281        })
282    }
283
284    /// Drop sealed segments at or below `fence_lsn`. Forwards to
285    /// [`Wal::truncate_up_to`].
286    pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
287        // Archive-backed databases must stay self-contained. Until snapshot
288        // checkpoint payloads are stored inside the archive too, preserving the
289        // full WAL history is the only safe way to let the archive recover by
290        // itself after a checkpoint marker.
291        if let Some(mirror) = &self.mirror {
292            mirror.persist_force(self.wal.dir())?;
293            return Ok(());
294        }
295        self.wal.truncate_up_to(fence_lsn)?;
296        Ok(())
297    }
298
299    /// True iff the recorder has already failed an append, **or** the
300    /// background flusher has latched a failure. Cheap to poll under
301    /// the store lock.
302    pub fn is_poisoned(&self) -> bool {
303        self.poisoned_reason().is_some()
304    }
305
306    pub fn poisoned_reason(&self) -> Option<String> {
307        let state = self.state_lock();
308        if let Some(msg) = state.poisoned.clone() {
309            return Some(msg);
310        }
311        self.wal.bg_failure()
312    }
313
314    pub fn ensure_not_poisoned(&self) -> Result<(), WalPoisonError> {
315        if let Some(reason) = self.poisoned_reason() {
316            return Err(WalPoisonError { reason });
317        }
318        Ok(())
319    }
320
321    /// Quarantine the recorder after the host detects that the live
322    /// in-memory graph may no longer match durable state. Once poisoned,
323    /// future query arms fail until the database is restarted from a
324    /// snapshot + WAL.
325    pub fn poison(&self, reason: impl Into<String>) {
326        let mut state = self.state_lock();
327        state.poisoned.get_or_insert_with(|| reason.into());
328        state.active_tx = None;
329        state.armed = false;
330        state.buffer.clear();
331    }
332
333    /// Test helper: clear the poisoned flag and reset the active
334    /// transaction. Production code should not call this — once the
335    /// WAL is poisoned the right move is to fail loudly and let the
336    /// operator restart from the last snapshot + WAL.
337    #[doc(hidden)]
338    pub fn clear_poisoned_for_tests(&self) {
339        let mut state = self.state_lock();
340        state.poisoned = None;
341        state.active_tx = None;
342        state.armed = false;
343        state.buffer.clear();
344    }
345}
346
347impl MutationRecorder for WalRecorder {
348    fn record(&self, event: MutationEvent) {
349        let mut state = self.state_lock();
350        if state.poisoned.is_some() {
351            return;
352        }
353        if !state.armed {
354            state.poisoned.get_or_insert_with(|| {
355                "MutationRecorder::record fired outside an armed query".into()
356            });
357            return;
358        }
359        state.buffer.push(event);
360    }
361
362    fn poisoned(&self) -> Option<String> {
363        // Surface a latched bg-flusher failure too — the recorder is
364        // the host's single point of contact for "is the WAL still
365        // safe to commit through?".
366        self.poisoned_reason()
367    }
368}