// lora_wal/replay.rs

//! WAL replay: walk segments, surface only committed mutation events,
//! and report the byte position of any torn tail so the caller can
//! truncate the active segment cleanly.
//!
//! The walk is deliberately a single forward sweep over the segments in
//! ascending id order. Per-transaction events are buffered in a
//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
//! corresponding [`WalRecord::TxCommit`]; a [`WalRecord::TxAbort`] (or
//! a missing commit at end-of-log) causes the bucket to be dropped
//! without emission. This is the contract the WAL plan calls
//! "per-mutation log, per-query commit": the recorder writes one record
//! per primitive mutation but replay only ever reapplies whole queries.
//!
//! Replay only applies records above `checkpoint_lsn`: any record whose
//! LSN is at or below it is already represented in the loaded snapshot
//! and is skipped. Because checkpoints are taken with the store write
//! lock held, no transaction can straddle the fence — `tx_begin_lsn <=
//! checkpoint_lsn` implies the matching commit (if any) is also at or
//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::error::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::SegmentReader;
31
32/// Outcome of a full replay walk.
33#[derive(Debug)]
34pub struct ReplayOutcome {
35    /// Mutation events from committed transactions, in append order.
36    /// Apply these to a fresh store (or one freshly loaded from a
37    /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
38    pub committed_events: Vec<MutationEvent>,
39
40    /// Highest LSN observed in any segment, regardless of whether the
41    /// owning transaction committed. Used to seed `next_lsn` for new
42    /// appends so we never reuse an already-allocated id.
43    pub max_lsn: Lsn,
44
45    /// Torn-tail diagnostic. `Some` iff a record failed to decode
46    /// before the natural end-of-log. The caller must truncate the
47    /// affected segment to `last_good_offset` before resuming
48    /// appends, otherwise replay-after-replay will keep tripping the
49    /// same CRC.
50    pub torn_tail: Option<TornTailInfo>,
51
52    /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
53    /// marker.
54    ///
55    /// Informational only. The recovery contract today is "the
56    /// snapshot's `wal_lsn` is the replay fence" — if the operator
57    /// passes an older snapshot than the newest checkpoint marker on
58    /// disk we still replay every record above the snapshot's fence,
59    /// which is conservative-correct (the only cost is duplicated
60    /// work). A tighter "marker overrides snapshot" contract is
61    /// deferred to v2 because it would require us to know that the
62    /// marker's snapshot file actually exists where the operator can
63    /// reach it, which is a separate observability concern.
64    ///
65    /// Surfaced so callers (e.g. `lora-database::Database::recover`)
66    /// can log a warning when the snapshot is older than the newest
67    /// observed marker.
68    pub checkpoint_lsn_observed: Option<Lsn>,
69}
70
71#[derive(Debug)]
72pub struct TornTailInfo {
73    pub segment_path: PathBuf,
74    pub last_good_offset: u64,
75    pub cause: WalError,
76}
77
78/// Walk every segment in `paths` (already in ascending id order).
79///
80/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
81/// captured in the snapshot the caller is about to restore from.
82pub(crate) fn replay_segments(
83    paths: &[PathBuf],
84    checkpoint_lsn: Lsn,
85) -> Result<ReplayOutcome, WalError> {
86    let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
87    let mut committed: Vec<MutationEvent> = Vec::new();
88    let mut max_lsn = Lsn::ZERO;
89    let mut last_lsn = Lsn::ZERO;
90    let mut last_segment_base: Option<Lsn> = None;
91    let mut torn_tail: Option<TornTailInfo> = None;
92    let mut checkpoint_lsn_observed: Option<Lsn> = None;
93
94    'outer: for path in paths {
95        let mut reader = SegmentReader::open(path)?;
96        let segment_base = reader.header().base_lsn;
97        if let Some(prev_base) = last_segment_base {
98            if segment_base <= prev_base {
99                return Err(WalError::Malformed(format!(
100                    "segment base_lsn {} is not greater than previous base_lsn {} ({})",
101                    segment_base.raw(),
102                    prev_base.raw(),
103                    path.display()
104                )));
105            }
106        }
107        if !last_lsn.is_zero() && segment_base <= last_lsn {
108            return Err(WalError::Malformed(format!(
109                "segment base_lsn {} is not greater than previous record lsn {} ({})",
110                segment_base.raw(),
111                last_lsn.raw(),
112                path.display()
113            )));
114        }
115        last_segment_base = Some(segment_base);
116
117        loop {
118            // Capture position before the read so we can report the
119            // start-of-bad-record offset on torn tail.
120            let before = reader.position();
121            match reader.read_record() {
122                Ok(Some(record)) => {
123                    let lsn = record.lsn();
124                    if lsn < segment_base {
125                        return Err(WalError::Malformed(format!(
126                            "record lsn {} is below segment base_lsn {} ({})",
127                            lsn.raw(),
128                            segment_base.raw(),
129                            path.display()
130                        )));
131                    }
132                    if !last_lsn.is_zero() && lsn <= last_lsn {
133                        return Err(WalError::Malformed(format!(
134                            "record lsn {} is not greater than previous lsn {} ({})",
135                            lsn.raw(),
136                            last_lsn.raw(),
137                            path.display()
138                        )));
139                    }
140                    last_lsn = lsn;
141                    if lsn > max_lsn {
142                        max_lsn = lsn;
143                    }
144                    if lsn.raw() <= checkpoint_lsn.raw() {
145                        // Already in the snapshot. Markers below the
146                        // fence still need to keep their pending
147                        // bucket clean, but no checkpoint can split
148                        // a transaction (the store write lock is held
149                        // during checkpoint), so dropping events outright
150                        // is safe.
151                        if let WalRecord::TxCommit { tx_begin_lsn, .. }
152                        | WalRecord::TxAbort { tx_begin_lsn, .. } = &record
153                        {
154                            pending.remove(tx_begin_lsn);
155                        }
156                        continue;
157                    }
158                    match record {
159                        WalRecord::Mutation {
160                            tx_begin_lsn,
161                            event,
162                            ..
163                        } => {
164                            let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
165                                WalError::Malformed(format!(
166                                    "mutation at lsn {} references missing tx begin {}",
167                                    lsn.raw(),
168                                    tx_begin_lsn.raw()
169                                ))
170                            })?;
171                            events.push(event);
172                        }
173                        WalRecord::MutationBatch {
174                            tx_begin_lsn,
175                            events: batch,
176                            ..
177                        } => {
178                            let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
179                                WalError::Malformed(format!(
180                                    "mutation batch at lsn {} references missing tx begin {}",
181                                    lsn.raw(),
182                                    tx_begin_lsn.raw()
183                                ))
184                            })?;
185                            events.extend(batch);
186                        }
187                        WalRecord::TxBegin { lsn } => {
188                            // Materialise the bucket eagerly so
189                            // begin-without-mutations transactions
190                            // still get a deterministic commit/abort.
191                            if pending.insert(lsn, Vec::new()).is_some() {
192                                return Err(WalError::Malformed(format!(
193                                    "duplicate tx begin at lsn {}",
194                                    lsn.raw()
195                                )));
196                            }
197                        }
198                        WalRecord::TxCommit { tx_begin_lsn, .. } => {
199                            let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
200                                WalError::Malformed(format!(
201                                    "commit at lsn {} references missing tx begin {}",
202                                    lsn.raw(),
203                                    tx_begin_lsn.raw()
204                                ))
205                            })?;
206                            committed.extend(events);
207                        }
208                        WalRecord::TxAbort { tx_begin_lsn, .. } => {
209                            pending.remove(&tx_begin_lsn).ok_or_else(|| {
210                                WalError::Malformed(format!(
211                                    "abort at lsn {} references missing tx begin {}",
212                                    lsn.raw(),
213                                    tx_begin_lsn.raw()
214                                ))
215                            })?;
216                        }
217                        WalRecord::Checkpoint { snapshot_lsn, .. } => {
218                            if snapshot_lsn > lsn {
219                                return Err(WalError::Malformed(format!(
220                                    "checkpoint at lsn {} points to future snapshot lsn {}",
221                                    lsn.raw(),
222                                    snapshot_lsn.raw()
223                                )));
224                            }
225                            if let Some(prev) = checkpoint_lsn_observed {
226                                if snapshot_lsn < prev {
227                                    return Err(WalError::Malformed(format!(
228                                        "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
229                                        snapshot_lsn.raw(),
230                                        prev.raw()
231                                    )));
232                                }
233                            }
234                            checkpoint_lsn_observed = Some(snapshot_lsn);
235                        }
236                    }
237                }
238                Ok(None) => break,
239                Err(err) => {
240                    torn_tail = Some(TornTailInfo {
241                        segment_path: path.clone(),
242                        last_good_offset: before,
243                        cause: err,
244                    });
245                    break 'outer;
246                }
247            }
248        }
249    }
250
251    // Any transaction still in `pending` at end-of-log was started but
252    // never committed (and never explicitly aborted). Treat as crashed
253    // mid-query and discard.
254    drop(pending);
255
256    Ok(ReplayOutcome {
257        committed_events: committed,
258        max_lsn,
259        torn_tail,
260        checkpoint_lsn_observed,
261    })
262}
263
264/// Convenience: replay every `*.wal` segment in `dir` after sorting by
265/// the numeric prefix encoded in the file name.
266///
267/// Used by recovery diagnostics in `lora-database::Database::recover`
268/// to peek at the newest checkpoint marker before opening the live
269/// `Wal` handle.
270pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
271    let entries = SegmentDir::new(dir).list()?;
272    let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
273    replay_segments(&paths, checkpoint_lsn)
274}