Skip to main content

lora_wal/
replay.rs

1//! WAL replay: walk segments, surface only committed mutation events,
2//! and report the byte position of any torn tail so the caller can
3//! truncate the active segment cleanly.
4//!
5//! The walk is deliberately a single forward sweep over the segments in
6//! ascending id order. Per-transaction events are buffered in a
7//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
//! corresponding [`WalRecord::TxCommit`]; a [`WalRecord::TxAbort`] (or
//! a missing commit at end-of-log) causes the bucket to be dropped
10//! without emission. This is the contract the WAL plan calls
11//! "per-mutation log, per-query commit": the recorder writes one record
12//! per primitive mutation but replay only ever reapplies whole queries.
13//!
//! Replay treats `checkpoint_lsn` as a fence: any record whose LSN is at
//! or below it is already represented in the loaded snapshot and is
//! skipped. Because checkpoints are taken with the store write lock
17//! held, no transaction can straddle the fence — `tx_begin_lsn <=
18//! checkpoint_lsn` implies the matching commit (if any) is also at or
19//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::errors::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::{SegmentReader, SEGMENT_HEADER_LEN};
31
/// Outcome of a full replay walk.
///
/// Produced by `replay_segments` (and therefore by `replay_dir`). It
/// bundles the events that must be re-applied with the bookkeeping the
/// caller needs to resume appending: the highest LSN seen, the offset at
/// which the last walked segment is still well-formed, and an optional
/// torn-tail diagnostic.
#[derive(Debug)]
pub struct ReplayOutcome {
    /// Mutation events from committed transactions, in append order.
    /// Apply these to a fresh store (or one freshly loaded from a
    /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
    ///
    /// Events are released one whole transaction at a time when its
    /// commit record is read, so the events of a transaction are
    /// contiguous and transactions appear in commit-record order.
    pub committed_events: Vec<MutationEvent>,

    /// Highest LSN observed in any segment, regardless of whether the
    /// owning transaction committed. Used to seed `next_lsn` for new
    /// appends so we never reuse an already-allocated id.
    ///
    /// Records at or below the replay fence are skipped for replay but
    /// still count here. Remains `Lsn::ZERO` when no record was read.
    pub max_lsn: Lsn,

    /// Torn-tail diagnostic. `Some` iff a record failed to decode
    /// before the natural end-of-log. The caller must truncate the
    /// affected segment to `last_good_offset` before resuming
    /// appends, otherwise replay-after-replay will keep tripping the
    /// same CRC.
    ///
    /// The walk stops at the first read failure; segments listed after
    /// the affected one are not opened.
    pub torn_tail: Option<TornTailInfo>,

    /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
    /// marker.
    ///
    /// Informational only. The recovery contract today is "the
    /// snapshot's `wal_lsn` is the replay fence" — if the operator
    /// passes an older snapshot than the newest checkpoint marker on
    /// disk we still replay every record above the snapshot's fence,
    /// which is conservative-correct (the only cost is duplicated
    /// work). A tighter "marker overrides snapshot" contract is
    /// deferred to v2 because it would require us to know that the
    /// marker's snapshot file actually exists where the operator can
    /// reach it, which is a separate observability concern.
    ///
    /// Surfaced so callers (e.g. `lora-database::Database::recover`)
    /// can log a warning when the snapshot is older than the newest
    /// observed marker.
    ///
    /// Only markers whose own LSN lies above the replay fence are
    /// inspected; markers at or below it are skipped with every other
    /// below-fence record, so this is `None` if no marker sits above
    /// the fence.
    pub checkpoint_lsn_observed: Option<Lsn>,

    /// Offset immediately after the last well-formed record in the last
    /// segment walked. `Wal::open` uses this to reopen the active writer
    /// without performing a second full scan of the active segment.
    ///
    /// Reset to the reader's position right after each segment is
    /// opened, so a segment without any readable record reports that
    /// position. Stays at `SEGMENT_HEADER_LEN` when no segment was
    /// walked. When `torn_tail` is `Some`, the walk ended in the torn
    /// segment, so this offset refers to that same segment.
    pub last_good_offset: u64,
}
75
/// Describes where and why a replay walk stopped early.
///
/// Built by `replay_segments` when reading a record from a segment
/// returns an error; surfaced through [`ReplayOutcome::torn_tail`].
#[derive(Debug)]
pub struct TornTailInfo {
    /// Path of the segment that was being read when the failure occurred.
    pub segment_path: PathBuf,
    /// Reader position captured just before the failed read, i.e. the
    /// offset at which the unreadable record starts. Everything before
    /// this offset in the segment was read successfully.
    pub last_good_offset: u64,
    /// The error returned by the failed record read.
    pub cause: WalError,
}
82
83/// Walk every segment in `paths` (already in ascending id order).
84///
85/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
86/// captured in the snapshot the caller is about to restore from.
87pub(crate) fn replay_segments(
88    paths: &[PathBuf],
89    checkpoint_lsn: Lsn,
90) -> Result<ReplayOutcome, WalError> {
91    let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
92    let mut committed: Vec<MutationEvent> = Vec::new();
93    let mut max_lsn = Lsn::ZERO;
94    let mut last_lsn = Lsn::ZERO;
95    let mut last_segment_base: Option<Lsn> = None;
96    let mut torn_tail: Option<TornTailInfo> = None;
97    let mut checkpoint_lsn_observed: Option<Lsn> = None;
98    let mut last_good_offset = SEGMENT_HEADER_LEN as u64;
99
100    'outer: for path in paths {
101        let mut reader = SegmentReader::open(path)?;
102        last_good_offset = reader.position();
103        let segment_base = reader.header().base_lsn;
104        if let Some(prev_base) = last_segment_base {
105            if segment_base <= prev_base {
106                return Err(WalError::Malformed(format!(
107                    "segment base_lsn {} is not greater than previous base_lsn {} ({})",
108                    segment_base.raw(),
109                    prev_base.raw(),
110                    path.display()
111                )));
112            }
113        }
114        if !last_lsn.is_zero() && segment_base <= last_lsn {
115            return Err(WalError::Malformed(format!(
116                "segment base_lsn {} is not greater than previous record lsn {} ({})",
117                segment_base.raw(),
118                last_lsn.raw(),
119                path.display()
120            )));
121        }
122        last_segment_base = Some(segment_base);
123
124        loop {
125            // Capture position before the read so we can report the
126            // start-of-bad-record offset on torn tail.
127            let before = reader.position();
128            match reader.read_record() {
129                Ok(Some(record)) => {
130                    let lsn = record.lsn();
131                    if lsn < segment_base {
132                        return Err(WalError::Malformed(format!(
133                            "record lsn {} is below segment base_lsn {} ({})",
134                            lsn.raw(),
135                            segment_base.raw(),
136                            path.display()
137                        )));
138                    }
139                    if !last_lsn.is_zero() && lsn <= last_lsn {
140                        return Err(WalError::Malformed(format!(
141                            "record lsn {} is not greater than previous lsn {} ({})",
142                            lsn.raw(),
143                            last_lsn.raw(),
144                            path.display()
145                        )));
146                    }
147                    last_lsn = lsn;
148                    if lsn > max_lsn {
149                        max_lsn = lsn;
150                    }
151                    last_good_offset = reader.position();
152                    if lsn.raw() <= checkpoint_lsn.raw() {
153                        // Already in the snapshot. Markers below the
154                        // fence still need to keep their pending
155                        // bucket clean, but no checkpoint can split
156                        // a transaction (the store write lock is held
157                        // during checkpoint), so dropping events outright
158                        // is safe.
159                        if let WalRecord::TxCommit { tx_begin_lsn, .. }
160                        | WalRecord::TxAbort { tx_begin_lsn, .. } = &record
161                        {
162                            pending.remove(tx_begin_lsn);
163                        }
164                        continue;
165                    }
166                    match record {
167                        WalRecord::Mutation {
168                            tx_begin_lsn,
169                            event,
170                            ..
171                        } => {
172                            let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
173                                WalError::Malformed(format!(
174                                    "mutation at lsn {} references missing tx begin {}",
175                                    lsn.raw(),
176                                    tx_begin_lsn.raw()
177                                ))
178                            })?;
179                            events.push(event);
180                        }
181                        WalRecord::MutationBatch {
182                            tx_begin_lsn,
183                            events: batch,
184                            ..
185                        } => {
186                            let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
187                                WalError::Malformed(format!(
188                                    "mutation batch at lsn {} references missing tx begin {}",
189                                    lsn.raw(),
190                                    tx_begin_lsn.raw()
191                                ))
192                            })?;
193                            events.extend(batch);
194                        }
195                        WalRecord::TxBegin { lsn } => {
196                            // Materialise the bucket eagerly so
197                            // begin-without-mutations transactions
198                            // still get a deterministic commit/abort.
199                            if pending.insert(lsn, Vec::new()).is_some() {
200                                return Err(WalError::Malformed(format!(
201                                    "duplicate tx begin at lsn {}",
202                                    lsn.raw()
203                                )));
204                            }
205                        }
206                        WalRecord::TxCommit { tx_begin_lsn, .. } => {
207                            let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
208                                WalError::Malformed(format!(
209                                    "commit at lsn {} references missing tx begin {}",
210                                    lsn.raw(),
211                                    tx_begin_lsn.raw()
212                                ))
213                            })?;
214                            committed.extend(events);
215                        }
216                        WalRecord::TxAbort { tx_begin_lsn, .. } => {
217                            pending.remove(&tx_begin_lsn).ok_or_else(|| {
218                                WalError::Malformed(format!(
219                                    "abort at lsn {} references missing tx begin {}",
220                                    lsn.raw(),
221                                    tx_begin_lsn.raw()
222                                ))
223                            })?;
224                        }
225                        WalRecord::Checkpoint { snapshot_lsn, .. } => {
226                            if snapshot_lsn > lsn {
227                                return Err(WalError::Malformed(format!(
228                                    "checkpoint at lsn {} points to future snapshot lsn {}",
229                                    lsn.raw(),
230                                    snapshot_lsn.raw()
231                                )));
232                            }
233                            if let Some(prev) = checkpoint_lsn_observed {
234                                if snapshot_lsn < prev {
235                                    return Err(WalError::Malformed(format!(
236                                        "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
237                                        snapshot_lsn.raw(),
238                                        prev.raw()
239                                    )));
240                                }
241                            }
242                            checkpoint_lsn_observed = Some(snapshot_lsn);
243                        }
244                    }
245                }
246                Ok(None) => break,
247                Err(err) => {
248                    torn_tail = Some(TornTailInfo {
249                        segment_path: path.clone(),
250                        last_good_offset: before,
251                        cause: err,
252                    });
253                    break 'outer;
254                }
255            }
256        }
257    }
258
259    // Any transaction still in `pending` at end-of-log was started but
260    // never committed (and never explicitly aborted). Treat as crashed
261    // mid-query and discard.
262    drop(pending);
263
264    Ok(ReplayOutcome {
265        committed_events: committed,
266        max_lsn,
267        torn_tail,
268        checkpoint_lsn_observed,
269        last_good_offset,
270    })
271}
272
273/// Convenience: replay every `*.wal` segment in `dir` after sorting by
274/// the numeric prefix encoded in the file name.
275///
276/// Used by recovery diagnostics in `lora-database::Database::recover`
277/// to peek at the newest checkpoint marker before opening the live
278/// `Wal` handle.
279pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
280    let entries = SegmentDir::new(dir).list()?;
281    let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
282    replay_segments(&paths, checkpoint_lsn)
283}