lora_wal/recorder/recorder.rs
1//! [`WalRecorder`] — adapter from `MutationRecorder` to the durable
2//! [`Wal`].
3//!
4//! Lifecycle, viewed from `lora-database::Database::execute_with_params`:
5//!
6//! 1. Acquire the store write lock.
7//! 2. `recorder.arm()` — marks the recorder as inside-a-query but
8//! appends nothing to the WAL yet. A pure read query that fires
9//! no `MutationEvent` therefore touches the WAL zero times.
10//! 3. Run analyze + compile + execute. The executor mutates the
11//! in-memory store, which fires `MutationRecorder::record` for each
12//! primitive mutation. The adapter buffers those events in memory.
13//! 4. On Ok: `recorder.commit()` drains the buffered events and hands
14//! them to [`Wal::commit_tx`], which writes `TxBegin` +
15//! `MutationBatch` + `TxCommit` in one critical section and applies
16//! the configured single-thread flush policy. A read-only query returns
17//! `WroteCommit::No` and the WAL never wakes up.
18//! 5. On Err / panic: `recorder.abort()` discards the buffered events.
19//! Because `commit_tx` writes the begin/batch/commit triple
20//! atomically, an aborted query has *no* on-disk presence — there
21//! is no `TxBegin` to pair with a later `TxAbort`, so the WAL stays
22//! consistent without an explicit abort marker.
23//! 6. Before returning, the host inspects `recorder.poisoned()` once.
24//! If `Some`, the query fails loudly with a durability error so
25//! the caller can act on it; the WAL is now refusing further
26//! appends until the operator restarts the database, which
27//! recovers from the last consistent snapshot + WAL.
28//!
29//! ### Hot-path cost
30//!
31//! `record` is called once per primitive mutation. It takes only the
32//! recorder mutex and pushes the event into a query-local buffer; the
33//! WAL mutex, framing, checksum, and segment append happen once at
34//! commit time inside `Wal::commit_tx`.
35//!
36//! ### When `record` fires after a failed in-memory mutation
37//!
38//! `InMemoryGraph::emit` only calls the recorder *after* the mutation
39//! has been committed to the in-memory state. If the subsequent WAL
40//! append fails, the live in-memory store is briefly ahead of disk:
41//! the next query sees the partial state, but the next query also
42//! observes `poisoned() = Some(_)` and is rejected. Recovery from a
43//! snapshot + WAL after operator restart will not include the failed
44//! mutation, so durable state stays consistent. The cost is "the live
45//! process is wrong until the next restart"; the gain is that the
46//! storage trait does not need to learn about durability.
47
48use std::sync::{Arc, Mutex, MutexGuard};
49
50use lora_store::{MutationEvent, MutationRecorder};
51
52use super::errors::{WalBufferedCommitError, WalPoisonError, WroteCommit};
53use super::mirror::WalMirror;
54use crate::errors::WalError;
55use crate::lsn::Lsn;
56use crate::wal::Wal;
57
58#[derive(Default)]
59struct RecorderState {
60 /// True between `arm()` and the matching `commit()` / `abort()`.
61 /// Marks the host's critical section so [`MutationRecorder::record`]
62 /// knows whether to buffer fresh events or poison itself for an
63 /// out-of-scope call.
64 armed: bool,
65 /// Query-local mutation buffer. Drained by `commit()` and passed
66 /// to [`Wal::commit_tx`] as one batched WAL transaction; cleared
67 /// by `abort()` without ever touching the durable log.
68 buffer: Vec<MutationEvent>,
69 /// Sticky failure flag. Once set, [`MutationRecorder::record`]
70 /// becomes a no-op (we cannot append safely) and `poisoned`
71 /// surfaces the message.
72 poisoned: Option<String>,
73}
74
75/// Adapter that lets a [`Wal`] act as a [`MutationRecorder`] on
76/// [`lora_store::InMemoryGraph::set_mutation_recorder`].
77pub struct WalRecorder {
78 wal: Arc<Wal>,
79 mirror: Option<Arc<dyn WalMirror>>,
80 state: Mutex<RecorderState>,
81}
82
83impl WalRecorder {
84 pub fn new(wal: Arc<Wal>) -> Self {
85 Self::new_with_mirror(wal, None)
86 }
87
88 pub fn new_with_mirror(wal: Arc<Wal>, mirror: Option<Arc<dyn WalMirror>>) -> Self {
89 Self {
90 wal,
91 mirror,
92 state: Mutex::new(RecorderState::default()),
93 }
94 }
95
96 /// Underlying log handle. Exposed so admin paths
97 /// (`Database::checkpoint_to`, `truncate_up_to`) can hit the WAL
98 /// directly without going through the recorder's transaction
99 /// state machine.
100 pub fn wal(&self) -> &Arc<Wal> {
101 &self.wal
102 }
103
104 fn state_lock(&self) -> MutexGuard<'_, RecorderState> {
105 match self.state.lock() {
106 Ok(state) => state,
107 Err(poisoned) => {
108 let mut state = poisoned.into_inner();
109 state.poisoned.get_or_insert_with(|| {
110 "WalRecorder state lock was poisoned; buffered durability state is suspect"
111 .into()
112 });
113 state
114 }
115 }
116 }
117
118 /// Mark the recorder as inside a query critical section. No WAL
119 /// I/O happens here — `Wal::begin` is deferred until the first
120 /// mutation event fires. A pure read query that never produces a
121 /// `MutationEvent` therefore costs the WAL nothing: no record
122 /// allocation, no buffer drain, no `fsync`.
123 ///
124 /// Errors with [`WalError::Poisoned`] if a prior failure has
125 /// poisoned the recorder, or if the host is double-arming
126 /// (`arm` already in effect).
127 pub fn arm(&self) -> Result<(), WalError> {
128 let mut state = self.state_lock();
129 if state.poisoned.is_some() {
130 return Err(WalError::Poisoned);
131 }
132 if state.armed {
133 state.poisoned = Some("WalRecorder::arm called while already armed".into());
134 return Err(WalError::Poisoned);
135 }
136 state.armed = true;
137 state.buffer.clear();
138 Ok(())
139 }
140
141 /// Drain the buffered events and commit them as one durable WAL
142 /// transaction.
143 ///
144 /// Routes through [`Wal::commit_tx`], which encodes
145 /// `TxBegin` + `MutationBatch` + `TxCommit` in a single critical
146 /// section and applies the configured flush policy. Under `GroupSync`,
147 /// bytes are written before this method returns; storage durability is
148 /// completed by the background flusher or an explicit sync boundary.
149 ///
150 /// Returns:
151 /// - [`WroteCommit::Yes`] when mutation events fired and the WAL
152 /// wrote the begin/batch/commit triple.
153 /// - [`WroteCommit::No`] when no mutations fired during the query
154 /// and no records were written.
155 pub fn commit(&self) -> Result<WroteCommit, WalError> {
156 let events = {
157 let mut state = self.state_lock();
158 if state.poisoned.is_some() {
159 return Err(WalError::Poisoned);
160 }
161 if !state.armed {
162 state.poisoned = Some("WalRecorder::commit called without an armed query".into());
163 return Err(WalError::Poisoned);
164 }
165 state.armed = false;
166 std::mem::take(&mut state.buffer)
167 };
168
169 if events.is_empty() {
170 return Ok(WroteCommit::No);
171 }
172
173 self.wal.commit_tx(events).inspect_err(|e| {
174 self.state_lock()
175 .poisoned
176 .get_or_insert_with(|| e.to_string());
177 })
178 }
179
180 /// Commit an explicit transaction's externally-buffered mutation
181 /// events as one durable WAL transaction.
182 ///
183 /// Used by `lora-database`'s [`Transaction`] flow, which keeps its
184 /// own `Vec<MutationEvent>` per transaction (statements may
185 /// rollback to a savepoint, which the recorder's flat buffer
186 /// cannot model). At commit time the host hands the accumulated
187 /// events here and we route them through [`Wal::commit_tx`] in one
188 /// call.
189 ///
190 /// [`Transaction`]: lora_database::Transaction
191 pub fn commit_events(
192 &self,
193 events: impl IntoIterator<Item = MutationEvent>,
194 ) -> Result<WroteCommit, WalBufferedCommitError> {
195 self.ensure_not_poisoned()
196 .map_err(|e| WalBufferedCommitError::Poisoned(e.reason().to_string()))?;
197
198 let events: Vec<MutationEvent> = events.into_iter().collect();
199 if events.is_empty() {
200 return Ok(WroteCommit::No);
201 }
202
203 self.wal.commit_tx(events).map_err(|e| {
204 self.state_lock()
205 .poisoned
206 .get_or_insert_with(|| e.to_string());
207 WalBufferedCommitError::Commit(super::errors::WalCommitError::Commit(e))
208 })
209 }
210
211 /// Discard any buffered events and disarm the recorder.
212 ///
213 /// Because [`Wal::commit_tx`] writes the entire begin/batch/commit
214 /// triple atomically, an aborted query never has any on-disk
215 /// presence — there is no half-written transaction to follow up
216 /// with a `TxAbort` marker. The returned bool indicates whether
217 /// the query observed any mutations (so the host can decide
218 /// whether to quarantine the live in-memory graph).
219 pub fn abort(&self) -> Result<bool, WalError> {
220 let mut state = self.state_lock();
221 if state.poisoned.is_some() {
222 return Err(WalError::Poisoned);
223 }
224 // Tolerate "abort without arm" — the host calls abort in
225 // unwind paths and we'd rather no-op than poison.
226 state.armed = false;
227 let had_buffered_events = !state.buffer.is_empty();
228 state.buffer.clear();
229 Ok(had_buffered_events)
230 }
231
232 /// Flush the WAL — write the pending buffer to the OS.
233 pub fn flush(&self) -> Result<(), WalError> {
234 let mut state = self.state_lock();
235 if state.poisoned.is_some() {
236 return Err(WalError::Poisoned);
237 }
238 self.wal.flush().inspect_err(|e| {
239 state.poisoned = Some(e.to_string());
240 })?;
241 if let Some(mirror) = &self.mirror {
242 mirror.persist(self.wal.dir()).inspect_err(|e| {
243 state.poisoned = Some(e.to_string());
244 })?;
245 }
246 Ok(())
247 }
248
249 /// Force the underlying WAL to write, `fsync`, and advance its
250 /// durable fence regardless of the configured sync mode. Admin
251 /// paths use this when they need a durability point immediately.
252 pub fn force_fsync(&self) -> Result<(), WalError> {
253 let mut state = self.state_lock();
254 if state.poisoned.is_some() {
255 return Err(WalError::Poisoned);
256 }
257 self.wal.force_fsync().inspect_err(|e| {
258 state.poisoned = Some(e.to_string());
259 })?;
260 if let Some(mirror) = &self.mirror {
261 mirror.persist_force(self.wal.dir()).inspect_err(|e| {
262 state.poisoned = Some(e.to_string());
263 })?;
264 }
265 Ok(())
266 }
267
268 /// Force only the underlying WAL to storage durability, without invoking
269 /// the optional mirror. Container-backed callers use this when they need to
270 /// build a single richer mirror refresh (for example snapshot + WAL delta)
271 /// after the WAL bytes are durable.
272 pub fn force_fsync_wal_only(&self) -> Result<(), WalError> {
273 let mut state = self.state_lock();
274 if state.poisoned.is_some() {
275 return Err(WalError::Poisoned);
276 }
277 self.wal.force_fsync().inspect_err(|e| {
278 state.poisoned = Some(e.to_string());
279 })?;
280 Ok(())
281 }
282
283 /// Append a `Checkpoint` marker. Used by the checkpoint admin
284 /// path after a successful snapshot rename — the marker doubles
285 /// as the log-side fence the next replay will trust.
286 pub fn checkpoint_marker(&self, snapshot_lsn: Lsn) -> Result<Lsn, WalError> {
287 let mut state = self.state_lock();
288 if state.poisoned.is_some() {
289 return Err(WalError::Poisoned);
290 }
291 self.wal.checkpoint_marker(snapshot_lsn).inspect_err(|e| {
292 state.poisoned = Some(e.to_string());
293 })
294 }
295
296 /// Drop sealed segments at or below `fence_lsn`. Forwards to
297 /// [`Wal::truncate_up_to`].
298 pub fn truncate_up_to(&self, fence_lsn: Lsn) -> Result<(), WalError> {
299 // Archive-backed databases must stay self-contained. Until snapshot
300 // checkpoint payloads are stored inside the archive too, preserving the
301 // full WAL history is the only safe way to let the archive recover by
302 // itself after a checkpoint marker.
303 if let Some(mirror) = &self.mirror {
304 mirror.persist_force(self.wal.dir())?;
305 return Ok(());
306 }
307 self.wal.truncate_up_to(fence_lsn)?;
308 Ok(())
309 }
310
311 /// True iff the recorder has already failed an append, **or** the WAL has
312 /// latched a durability failure. Cheap to poll under the store lock.
313 pub fn is_poisoned(&self) -> bool {
314 self.poisoned_reason().is_some()
315 }
316
317 pub fn poisoned_reason(&self) -> Option<String> {
318 let state = self.state_lock();
319 if let Some(msg) = state.poisoned.clone() {
320 return Some(msg);
321 }
322 self.wal.bg_failure()
323 }
324
325 pub fn ensure_not_poisoned(&self) -> Result<(), WalPoisonError> {
326 if let Some(reason) = self.poisoned_reason() {
327 return Err(WalPoisonError { reason });
328 }
329 Ok(())
330 }
331
332 /// Quarantine the recorder after the host detects that the live
333 /// in-memory graph may no longer match durable state. Once poisoned,
334 /// future query arms fail until the database is restarted from a
335 /// snapshot + WAL.
336 pub fn poison(&self, reason: impl Into<String>) {
337 let mut state = self.state_lock();
338 state.poisoned.get_or_insert_with(|| reason.into());
339 state.armed = false;
340 state.buffer.clear();
341 }
342
343 /// Test helper: clear the poisoned flag and disarm. Production
344 /// code should not call this — once the WAL is poisoned the right
345 /// move is to fail loudly and let the operator restart from the
346 /// last snapshot + WAL.
347 #[doc(hidden)]
348 pub fn clear_poisoned_for_tests(&self) {
349 let mut state = self.state_lock();
350 state.poisoned = None;
351 state.armed = false;
352 state.buffer.clear();
353 }
354}
355
356impl MutationRecorder for WalRecorder {
357 fn record(&self, event: MutationEvent) {
358 let mut state = self.state_lock();
359 if state.poisoned.is_some() {
360 return;
361 }
362 if !state.armed {
363 state.poisoned.get_or_insert_with(|| {
364 "MutationRecorder::record fired outside an armed query".into()
365 });
366 return;
367 }
368 state.buffer.push(event);
369 }
370
371 fn poisoned(&self) -> Option<String> {
372 // Surface a latched WAL failure too — the recorder is the host's
373 // single point of contact for "is the WAL still safe to commit
374 // through?".
375 self.poisoned_reason()
376 }
377}