lora_wal/recorder/recorder.rs
1//! [`WalRecorder`] — adapter from `MutationRecorder` to the durable
2//! [`Wal`].
3//!
4//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
5//!
6//! 1. Acquire the store write lock.
7//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
8//! appends nothing to the WAL yet. A pure read query that fires
9//! no `MutationEvent` therefore touches the WAL zero times: no
10//! `TxBegin`, no `TxCommit`, no `flush`, no `fsync`.
11//! 3. Run analyze + compile + execute. The executor mutates the
12//! in-memory store, which fires `MutationRecorder::record` for each
13//! primitive mutation. The adapter buffers those events in memory.
14//! 4. On Ok: `recorder.commit_and_flush_if_needed()` writes `TxBegin`,
15//! one `MutationBatch`, and `TxCommit`, then flushes only when
16//! `commit()` returned `WroteCommit::Yes`. A read-only query returns
17//! `WroteCommit::No` and skips the flush entirely.
18//! 5. On Err / panic: `recorder.abort()`. If any mutation events were
19//! buffered, the host quarantines the live handle because the engine
20//! has no rollback. Durable recovery stays atomic because the failed
21//! query never writes a committed batch to the WAL.
22//! 6. Before returning, the host inspects `recorder.poisoned()` once.
23//! If `Some`, the query fails loudly with a durability error so
24//! the caller can act on it; the WAL is now refusing further
25//! appends until the operator restarts the database, which
26//! recovers from the last consistent snapshot + WAL.
27//!
28//! ### Hot-path cost
29//!
//! `record` is called once per primitive mutation. It takes only the
//! recorder mutex and pushes the event into a query-local buffer; the WAL
//! mutex, framing, checksum, and segment-append work happen once, at commit time.
33//!
//! ### When the WAL write fails after a successful in-memory mutation
35//!
36//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
37//! has been committed to the in-memory state. If the subsequent WAL
38//! append fails, the live in-memory store is briefly ahead of disk:
39//! the next query sees the partial state, but the next query also
40//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
41//! snapshot + WAL after operator restart will not include the failed
42//! mutation, so durable state stays consistent. The cost is "the live
43//! process is wrong until the next restart"; the gain is that the
44//! storage trait does not need to learn about durability.
45
46use std::sync::{Arc, Mutex, MutexGuard};
47
48use lora_store::{MutationEvent, MutationRecorder};
49
50use super::errors::{WalBufferedCommitError, WalCommitError, WalPoisonError, WroteCommit};
51use super::mirror::WalMirror;
52use crate::errors::WalError;
53use crate::lsn::Lsn;
54use crate::wal::Wal;
55
56#[derive(Default)]
57struct RecorderState {
58 /// True between `arm()` and the matching `commit()` / `abort()`.
59 /// Marks the host's critical section without committing the WAL
60 /// to a transaction yet — the actual `Wal::begin` happens lazily
61 /// on the first mutation event.
62 armed: bool,
63 /// LSN of the currently-open WAL transaction, if any. Normally this is
64 /// only set inside `commit()` while the buffered batch is being written.
65 active_tx: Option<Lsn>,
66 /// Query-local mutation buffer. This lets write-heavy statements commit
67 /// as one `MutationBatch` record instead of one framed record per event.
68 buffer: Vec<MutationEvent>,
69 /// Sticky failure flag. Once set, [`MutationRecorder::record`]
70 /// becomes a no-op (we cannot append safely) and `poisoned`
71 /// surfaces the message.
72 poisoned: Option<String>,
73}
74
75/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
76/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
77pub struct WalRecorder {
78 wal: Arc<Wal>,
79 mirror: Option<Arc<dyn WalMirror>>,
80 state: Mutex<RecorderState>,
81}
82
83impl WalRecorder {
84 pub fn new(wal: Arc<Wal>) -> Self {
85 Self::new_with_mirror(wal, None)
86 }
87
88 pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
89 Self {
90 wal,
91 mirror,
92 state: Mutex::new(RecorderState::default()),
93 }
94 }
95
96 /// Underlying log handle. Exposed so admin paths
97 /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
98 /// directly without going through the recorder's transaction
99 /// state machine.
100 pub fn wal(&self) -> &Arc<Wal> {
101 &self.wal
102 }
103
104 fn state_lock(&self) -> MutexGuard<'_, RecorderState> {
105 self.state
106 .lock()
107 .unwrap_or_else(|poisoned| poisoned.into_inner())
108 }
109
110 /// Mark the recorder as inside a query critical section. No WAL
111 /// I/O happens here — `Wal::begin` is deferred until the first
112 /// mutation event fires. A pure read query that never produces a
113 /// `MutationEvent` therefore costs the WAL nothing: no record
114 /// allocation, no buffer drain, no `fsync`.
115 ///
116 /// Errors with [`WalError::Poisoned`] if a prior failure has
117 /// poisoned the recorder, or if the host is double-arming
118 /// (`arm` already in effect).
119 pub fn arm(&self) -> Result<(), WalError> {
120 let mut state = self.state_lock();
121 if state.poisoned.is_some() {
122 return Err(WalError::Poisoned);
123 }
124 if state.armed {
125 state.poisoned = Some("WalRecorder::arm called while already armed".into());
126 return Err(WalError::Poisoned);
127 }
128 state.armed = true;
129 state.buffer.clear();
130 Ok(())
131 }
132
133 /// Append a `TxCommit` for the active transaction (if any) and
134 /// clear the armed/active state.
135 ///
136 /// Returns:
137 /// - [`WroteCommit::Yes`] when a lazy `TxBegin` had been issued
138 /// and a matching `TxCommit` was now appended. The host should
139 /// `flush()` next under `SyncMode::PerCommit`.
140 /// - [`WroteCommit::No`] when no mutations fired during the query
141 /// and no records were written. The host should skip `flush()`.
142 pub fn commit(&self) -> Result<WroteCommit, WalError> {
143 let mut state = self.state_lock();
144 if state.poisoned.is_some() {
145 return Err(WalError::Poisoned);
146 }
147 if !state.armed {
148 state.poisoned = Some("WalRecorder::commit called without an armed query".into());
149 return Err(WalError::Poisoned);
150 }
151 state.armed = false;
152 if state.buffer.is_empty() && state.active_tx.is_none() {
153 return Ok(WroteCommit::No);
154 }
155
156 let events = std::mem::take(&mut state.buffer);
157 let tx = match state.active_tx {
158 Some(tx) => tx,
159 None => self.wal.begin().inspect_err(|e| {
160 state.poisoned = Some(e.to_string());
161 })?,
162 };
163 state.active_tx = Some(tx);
164
165 self.wal.append_batch(tx, events).inspect_err(|e| {
166 state.poisoned = Some(e.to_string());
167 })?;
168 self.wal.commit(tx).inspect_err(|e| {
169 state.poisoned = Some(e.to_string());
170 })?;
171 state.active_tx = None;
172 Ok(WroteCommit::Yes)
173 }
174
175 /// Commit the currently armed recorder and flush only when a commit record
176 /// was written. This is the normal durable boundary for query-scoped writes.
177 pub fn commit_and_flush_if_needed(&self) -> Result<WroteCommit, WalCommitError> {
178 let wrote_commit = self.commit().map_err(WalCommitError::Commit)?;
179 if wrote_commit.wrote() {
180 self.flush().map_err(WalCommitError::Flush)?;
181 }
182 Ok(wrote_commit)
183 }
184
185 /// Commit an explicit transaction's buffered mutation events as one durable
186 /// WAL transaction. The recorder is armed only for this replay window.
187 pub fn commit_events(
188 &self,
189 events: impl IntoIterator<Item = MutationEvent>,
190 ) -> Result<WroteCommit, WalBufferedCommitError> {
191 let mut events = events.into_iter().peekable();
192 if events.peek().is_none() {
193 self.ensure_not_poisoned()
194 .map_err(|e| WalBufferedCommitError::Poisoned(e.reason().to_string()))?;
195 return Ok(WroteCommit::No);
196 }
197
198 self.arm().map_err(WalBufferedCommitError::Arm)?;
199 for event in events {
200 self.record(event);
201 if let Some(reason) = self.poisoned_reason() {
202 return Err(WalBufferedCommitError::ReplayPoisoned(reason));
203 }
204 }
205
206 self.commit_and_flush_if_needed().map_err(Into::into)
207 }
208
209 /// Append a `TxAbort` for the active transaction (if any) and
210 /// clear the armed/active state. Returns `Ok(true)` when the live graph
211 /// may have observed mutations and should be quarantined, `Ok(false)` when
212 /// the query never mutated anything.
213 pub fn abort(&self) -> Result<bool, WalError> {
214 let mut state = self.state_lock();
215 if state.poisoned.is_some() {
216 return Err(WalError::Poisoned);
217 }
218 // Tolerate "abort without arm" — the host calls abort in
219 // unwind paths and we'd rather no-op than poison.
220 state.armed = false;
221 let had_buffered_events = !state.buffer.is_empty();
222 state.buffer.clear();
223 match state.active_tx.take() {
224 Some(tx) => {
225 self.wal.abort(tx).inspect_err(|e| {
226 state.poisoned = Some(e.to_string());
227 })?;
228 Ok(true)
229 }
230 None => Ok(had_buffered_events),
231 }
232 }
233
234 /// Flush the WAL — write the pending buffer to the OS and
235 /// (under `SyncMode::PerCommit`) `fsync`.
236 pub fn flush(&self) -> Result<(), WalError> {
237 let mut state = self.state_lock();
238 if state.poisoned.is_some() {
239 return Err(WalError::Poisoned);
240 }
241 self.wal.flush().inspect_err(|e| {
242 state.poisoned = Some(e.to_string());
243 })?;
244 if let Some(mirror) = &self.mirror {
245 mirror.persist(self.wal.dir()).inspect_err(|e| {
246 state.poisoned = Some(e.to_string());
247 })?;
248 }
249 Ok(())
250 }
251
252 /// Force the underlying WAL to write, `fsync`, and advance its
253 /// durable fence regardless of the configured sync mode. Admin
254 /// paths use this when they need a durability point immediately.
255 pub fn force_fsync(&self) -> Result<(), WalError> {
256 let mut state = self.state_lock();
257 if state.poisoned.is_some() {
258 return Err(WalError::Poisoned);
259 }
260 self.wal.force_fsync().inspect_err(|e| {
261 state.poisoned = Some(e.to_string());
262 })?;
263 if let Some(mirror) = &self.mirror {
264 mirror.persist_force(self.wal.dir()).inspect_err(|e| {
265 state.poisoned = Some(e.to_string());
266 })?;
267 }
268 Ok(())
269 }
270
271 /// Append a `Checkpoint` marker. Used by the checkpoint admin
272 /// path after a successful snapshot rename — the marker doubles
273 /// as the log-side fence the next replay will trust.
274 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
275 let mut state = self.state_lock();
276 if state.poisoned.is_some() {
277 return Err(WalError::Poisoned);
278 }
279 self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
280 state.poisoned = Some(e.to_string());
281 })
282 }
283
284 /// Drop sealed segments at or below `fence_lsn`. Forwards to
285 /// [`Wal::truncate_up_to`].
286 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
287 // Archive-backed databases must stay self-contained. Until snapshot
288 // checkpoint payloads are stored inside the archive too, preserving the
289 // full WAL history is the only safe way to let the archive recover by
290 // itself after a checkpoint marker.
291 if let Some(mirror) = &self.mirror {
292 mirror.persist_force(self.wal.dir())?;
293 return Ok(());
294 }
295 self.wal.truncate_up_to(fence_lsn)?;
296 Ok(())
297 }
298
299 /// True iff the recorder has already failed an append, **or** the
300 /// background flusher has latched a failure. Cheap to poll under
301 /// the store lock.
302 pub fn is_poisoned(&self) -> bool {
303 self.poisoned_reason().is_some()
304 }
305
306 pub fn poisoned_reason(&self) -> Option<String> {
307 let state = self.state_lock();
308 if let Some(msg) = state.poisoned.clone() {
309 return Some(msg);
310 }
311 self.wal.bg_failure()
312 }
313
314 pub fn ensure_not_poisoned(&self) -> Result<(), WalPoisonError> {
315 if let Some(reason) = self.poisoned_reason() {
316 return Err(WalPoisonError { reason });
317 }
318 Ok(())
319 }
320
321 /// Quarantine the recorder after the host detects that the live
322 /// in-memory graph may no longer match durable state. Once poisoned,
323 /// future query arms fail until the database is restarted from a
324 /// snapshot + WAL.
325 pub fn poison(&self, reason: impl Into<String>) {
326 let mut state = self.state_lock();
327 state.poisoned.get_or_insert_with(|| reason.into());
328 state.active_tx = None;
329 state.armed = false;
330 state.buffer.clear();
331 }
332
333 /// Test helper: clear the poisoned flag and reset the active
334 /// transaction. Production code should not call this — once the
335 /// WAL is poisoned the right move is to fail loudly and let the
336 /// operator restart from the last snapshot + WAL.
337 #[doc(hidden)]
338 pub fn clear_poisoned_for_tests(&self) {
339 let mut state = self.state_lock();
340 state.poisoned = None;
341 state.active_tx = None;
342 state.armed = false;
343 state.buffer.clear();
344 }
345}
346
347impl MutationRecorder for WalRecorder {
348 fn record(&self, event: MutationEvent) {
349 let mut state = self.state_lock();
350 if state.poisoned.is_some() {
351 return;
352 }
353 if !state.armed {
354 state.poisoned.get_or_insert_with(|| {
355 "MutationRecorder::record fired outside an armed query".into()
356 });
357 return;
358 }
359 state.buffer.push(event);
360 }
361
362 fn poisoned(&self) -> Option<String> {
363 // Surface a latched bg-flusher failure too — the recorder is
364 // the host's single point of contact for "is the WAL still
365 // safe to commit through?".
366 self.poisoned_reason()
367 }
368}