Skip to main content

lora_wal/
replay.rs

//! WAL replay: walk segments, surface only committed mutation events,
//! and report the byte position of any torn tail so the caller can
//! truncate the active segment cleanly.
//!
//! The walk is deliberately a single forward sweep over the segments in
//! ascending id order. Per-transaction events are buffered in a
//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
//! corresponding [`WalRecord::TxCommit`]; a [`WalRecord::TxAbort`] (or
//! a missing commit at end-of-log) causes the bucket to be dropped
//! without emission. This is the contract the WAL plan calls
//! "per-mutation log, per-query commit": the recorder writes one record
//! per primitive mutation but replay only ever reapplies whole queries.
//!
//! Replay walks past `checkpoint_lsn`: any record whose LSN is at or
//! below it is already represented in the loaded snapshot and is
//! skipped. Because checkpoints are taken with the store write lock
//! held, no transaction can straddle the fence — `tx_begin_lsn <=
//! checkpoint_lsn` implies the matching commit (if any) is also at or
//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::errors::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::{SegmentReader, SEGMENT_HEADER_LEN};
31
32/// Outcome of a full replay walk.
33#[derive(Debug)]
34pub struct ReplayOutcome {
35    /// Mutation events from committed transactions, in append order.
36    /// Apply these to a fresh store (or one freshly loaded from a
37    /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
38    pub committed_events: Vec<MutationEvent>,
39
40    /// Highest LSN observed in any segment, regardless of whether the
41    /// owning transaction committed. Used to seed `next_lsn` for new
42    /// appends so we never reuse an already-allocated id.
43    pub max_lsn: Lsn,
44
45    /// Torn-tail diagnostic. `Some` iff a record failed to decode
46    /// before the natural end-of-log. The caller must truncate the
47    /// affected segment to `last_good_offset` before resuming
48    /// appends, otherwise replay-after-replay will keep tripping the
49    /// same CRC.
50    pub torn_tail: Option<TornTailInfo>,
51
52    /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
53    /// marker.
54    ///
55    /// Informational only. The recovery contract today is "the
56    /// snapshot's `wal_lsn` is the replay fence" — if the operator
57    /// passes an older snapshot than the newest checkpoint marker on
58    /// disk we still replay every record above the snapshot's fence,
59    /// which is conservative-correct (the only cost is duplicated
60    /// work). A tighter "marker overrides snapshot" contract is
61    /// deferred to v2 because it would require us to know that the
62    /// marker's snapshot file actually exists where the operator can
63    /// reach it, which is a separate observability concern.
64    ///
65    /// Surfaced so callers (e.g. `lora-database::Database::recover`)
66    /// can log a warning when the snapshot is older than the newest
67    /// observed marker.
68    pub checkpoint_lsn_observed: Option<Lsn>,
69
70    /// Offset immediately after the last well-formed record in the last
71    /// segment walked. `Wal::open` uses this to reopen the active writer
72    /// without performing a second full scan of the active segment.
73    pub last_good_offset: u64,
74}
75
76#[derive(Debug)]
77pub struct TornTailInfo {
78    pub segment_path: PathBuf,
79    pub last_good_offset: u64,
80    pub cause: WalError,
81}
82
83/// Walk every segment in `paths` (already in ascending id order).
84///
85/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
86/// captured in the snapshot the caller is about to restore from.
87pub(crate) fn replay_segments(
88    paths: &[PathBuf],
89    checkpoint_lsn: Lsn,
90) -> Result<ReplayOutcome, WalError> {
91    let mut state = ReplayState::new();
92    let mut torn_tail: Option<TornTailInfo> = None;
93    let mut last_good_offset = SEGMENT_HEADER_LEN as u64;
94
95    'outer: for path in paths {
96        let mut reader = SegmentReader::open(path)?;
97        last_good_offset = reader.position();
98        let segment_base = reader.header().base_lsn;
99        state.validate_segment(segment_base, path)?;
100
101        loop {
102            // Capture position before the read so we can report the
103            // start-of-bad-record offset on torn tail.
104            let before = reader.position();
105            match reader.read_record() {
106                Ok(Some(record)) => {
107                    state.accept_record(record, segment_base, checkpoint_lsn, path)?;
108                    last_good_offset = reader.position();
109                }
110                Ok(None) => break,
111                Err(err) => {
112                    torn_tail = Some(TornTailInfo {
113                        segment_path: path.clone(),
114                        last_good_offset: before,
115                        cause: err,
116                    });
117                    break 'outer;
118                }
119            }
120        }
121    }
122
123    Ok(state.finish(torn_tail, last_good_offset))
124}
125
126struct ReplayState {
127    pending: BTreeMap<Lsn, Vec<MutationEvent>>,
128    committed: Vec<MutationEvent>,
129    max_lsn: Lsn,
130    last_lsn: Lsn,
131    last_segment_base: Option<Lsn>,
132    checkpoint_lsn_observed: Option<Lsn>,
133}
134
135impl ReplayState {
136    fn new() -> Self {
137        Self {
138            pending: BTreeMap::new(),
139            committed: Vec::new(),
140            max_lsn: Lsn::ZERO,
141            last_lsn: Lsn::ZERO,
142            last_segment_base: None,
143            checkpoint_lsn_observed: None,
144        }
145    }
146
147    fn validate_segment(&mut self, segment_base: Lsn, path: &Path) -> Result<(), WalError> {
148        if let Some(prev_base) = self.last_segment_base {
149            if segment_base <= prev_base {
150                return Err(WalError::Malformed(format!(
151                    "segment base_lsn {} is not greater than previous base_lsn {} ({})",
152                    segment_base.raw(),
153                    prev_base.raw(),
154                    path.display()
155                )));
156            }
157        }
158        if !self.last_lsn.is_zero() && segment_base <= self.last_lsn {
159            return Err(WalError::Malformed(format!(
160                "segment base_lsn {} is not greater than previous record lsn {} ({})",
161                segment_base.raw(),
162                self.last_lsn.raw(),
163                path.display()
164            )));
165        }
166        self.last_segment_base = Some(segment_base);
167        Ok(())
168    }
169
170    fn accept_record(
171        &mut self,
172        record: WalRecord,
173        segment_base: Lsn,
174        checkpoint_lsn: Lsn,
175        path: &Path,
176    ) -> Result<(), WalError> {
177        let lsn = record.lsn();
178        self.validate_record_lsn(lsn, segment_base, path)?;
179        self.observe_lsn(lsn);
180
181        if lsn.raw() <= checkpoint_lsn.raw() {
182            self.skip_fenced_record(&record);
183            return Ok(());
184        }
185
186        self.apply_record(record, lsn)
187    }
188
189    fn validate_record_lsn(
190        &self,
191        lsn: Lsn,
192        segment_base: Lsn,
193        path: &Path,
194    ) -> Result<(), WalError> {
195        if lsn < segment_base {
196            return Err(WalError::Malformed(format!(
197                "record lsn {} is below segment base_lsn {} ({})",
198                lsn.raw(),
199                segment_base.raw(),
200                path.display()
201            )));
202        }
203        if !self.last_lsn.is_zero() && lsn <= self.last_lsn {
204            return Err(WalError::Malformed(format!(
205                "record lsn {} is not greater than previous lsn {} ({})",
206                lsn.raw(),
207                self.last_lsn.raw(),
208                path.display()
209            )));
210        }
211        Ok(())
212    }
213
214    fn observe_lsn(&mut self, lsn: Lsn) {
215        self.last_lsn = lsn;
216        if lsn > self.max_lsn {
217            self.max_lsn = lsn;
218        }
219    }
220
221    fn skip_fenced_record(&mut self, record: &WalRecord) {
222        // Already in the snapshot. Markers below the fence still need
223        // to keep their pending bucket clean, but no checkpoint can
224        // split a transaction (the store write lock is held during
225        // checkpoint), so dropping events outright is safe.
226        if let WalRecord::TxCommit { tx_begin_lsn, .. } | WalRecord::TxAbort { tx_begin_lsn, .. } =
227            record
228        {
229            self.pending.remove(tx_begin_lsn);
230        }
231    }
232
233    fn apply_record(&mut self, record: WalRecord, lsn: Lsn) -> Result<(), WalError> {
234        match record {
235            WalRecord::Mutation {
236                tx_begin_lsn,
237                event,
238                ..
239            } => self
240                .pending_events_mut(tx_begin_lsn, lsn, "mutation")?
241                .push(event),
242            WalRecord::MutationBatch {
243                tx_begin_lsn,
244                events,
245                ..
246            } => self
247                .pending_events_mut(tx_begin_lsn, lsn, "mutation batch")?
248                .extend(events),
249            WalRecord::TxBegin { lsn } => self.begin_transaction(lsn)?,
250            WalRecord::TxCommit { tx_begin_lsn, .. } => {
251                let events = self.take_pending(tx_begin_lsn, lsn, "commit")?;
252                self.committed.extend(events);
253            }
254            WalRecord::TxAbort { tx_begin_lsn, .. } => {
255                let _ = self.take_pending(tx_begin_lsn, lsn, "abort")?;
256            }
257            WalRecord::Checkpoint { snapshot_lsn, .. } => {
258                self.observe_checkpoint(lsn, snapshot_lsn)?;
259            }
260        }
261        Ok(())
262    }
263
264    fn begin_transaction(&mut self, lsn: Lsn) -> Result<(), WalError> {
265        // Materialise the bucket eagerly so begin-without-mutations
266        // transactions still get a deterministic commit/abort.
267        if self.pending.insert(lsn, Vec::new()).is_some() {
268            return Err(WalError::Malformed(format!(
269                "duplicate tx begin at lsn {}",
270                lsn.raw()
271            )));
272        }
273        Ok(())
274    }
275
276    fn pending_events_mut(
277        &mut self,
278        tx_begin_lsn: Lsn,
279        record_lsn: Lsn,
280        kind: &str,
281    ) -> Result<&mut Vec<MutationEvent>, WalError> {
282        self.pending
283            .get_mut(&tx_begin_lsn)
284            .ok_or_else(|| missing_tx_begin(kind, record_lsn, tx_begin_lsn))
285    }
286
287    fn take_pending(
288        &mut self,
289        tx_begin_lsn: Lsn,
290        record_lsn: Lsn,
291        kind: &str,
292    ) -> Result<Vec<MutationEvent>, WalError> {
293        self.pending
294            .remove(&tx_begin_lsn)
295            .ok_or_else(|| missing_tx_begin(kind, record_lsn, tx_begin_lsn))
296    }
297
298    fn observe_checkpoint(&mut self, lsn: Lsn, snapshot_lsn: Lsn) -> Result<(), WalError> {
299        if snapshot_lsn > lsn {
300            return Err(WalError::Malformed(format!(
301                "checkpoint at lsn {} points to future snapshot lsn {}",
302                lsn.raw(),
303                snapshot_lsn.raw()
304            )));
305        }
306        if let Some(prev) = self.checkpoint_lsn_observed {
307            if snapshot_lsn < prev {
308                return Err(WalError::Malformed(format!(
309                    "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
310                    snapshot_lsn.raw(),
311                    prev.raw()
312                )));
313            }
314        }
315        self.checkpoint_lsn_observed = Some(snapshot_lsn);
316        Ok(())
317    }
318
319    fn finish(self, torn_tail: Option<TornTailInfo>, last_good_offset: u64) -> ReplayOutcome {
320        // Any transaction still in `pending` at end-of-log was started
321        // but never committed (and never explicitly aborted). Treat as
322        // crashed mid-query and discard.
323        ReplayOutcome {
324            committed_events: self.committed,
325            max_lsn: self.max_lsn,
326            torn_tail,
327            checkpoint_lsn_observed: self.checkpoint_lsn_observed,
328            last_good_offset,
329        }
330    }
331}
332
333fn missing_tx_begin(kind: &str, record_lsn: Lsn, tx_begin_lsn: Lsn) -> WalError {
334    WalError::Malformed(format!(
335        "{kind} at lsn {} references missing tx begin {}",
336        record_lsn.raw(),
337        tx_begin_lsn.raw()
338    ))
339}
340
341/// Convenience: replay every `*.wal` segment in `dir` after sorting by
342/// the numeric prefix encoded in the file name.
343///
344/// Used by recovery diagnostics in `lora-database::Database::recover`
345/// to peek at the newest checkpoint marker before opening the live
346/// `Wal` handle.
347pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
348    let entries = SegmentDir::new(dir).list()?;
349    let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
350    replay_segments(&paths, checkpoint_lsn)
351}