lora_wal/replay.rs
1//! WAL replay: walk segments, surface only committed mutation events,
2//! and report the byte position of any torn tail so the caller can
3//! truncate the active segment cleanly.
4//!
5//! The walk is deliberately a single forward sweep over the segments in
6//! ascending id order. Per-transaction events are buffered in a
7//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
//! corresponding [`WalRecord::TxCommit`]; a [`WalRecord::TxAbort`] (or
9//! a missing commit at end-of-log) causes the bucket to be dropped
10//! without emission. This is the contract the WAL plan calls
11//! "per-mutation log, per-query commit": the recorder writes one record
12//! per primitive mutation but replay only ever reapplies whole queries.
13//!
14//! Replay walks past `checkpoint_lsn`: any record whose LSN is at or
15//! below it is already represented in the loaded snapshot and is
16//! skipped. Because checkpoints are taken with the engine mutex held,
17//! no transaction can straddle the fence — `tx_begin_lsn <=
18//! checkpoint_lsn` implies the matching commit (if any) is also at or
19//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::error::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::SegmentReader;
31
/// Outcome of a full replay walk.
#[derive(Debug)]
pub struct ReplayOutcome {
    /// Mutation events from committed transactions, in append order.
    /// Apply these to a fresh store (or one freshly loaded from a
    /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
    pub committed_events: Vec<MutationEvent>,

    /// Highest LSN observed in any segment, regardless of whether the
    /// owning transaction committed. Used to seed `next_lsn` for new
    /// appends so we never reuse an already-allocated id.
    pub max_lsn: Lsn,

    /// Torn-tail diagnostic. `Some` iff a record failed to decode
    /// before the natural end-of-log. The caller must truncate the
    /// affected segment to `last_good_offset` before resuming
    /// appends, otherwise replay-after-replay will keep tripping the
    /// same CRC.
    pub torn_tail: Option<TornTailInfo>,

    /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
    /// marker.
    ///
    /// Informational only. The recovery contract today is "the
    /// snapshot's `wal_lsn` is the replay fence" — if the operator
    /// passes an older snapshot than the newest checkpoint marker on
    /// disk we still replay every record above the snapshot's fence,
    /// which is conservative-correct (the only cost is duplicated
    /// work). A tighter "marker overrides snapshot" contract is
    /// deferred to v2 because it would require us to know that the
    /// marker's snapshot file actually exists where the operator can
    /// reach it, which is a separate observability concern.
    ///
    /// Surfaced so callers (e.g. `lora-database::Database::recover`)
    /// can log a warning when the snapshot is older than the newest
    /// observed marker.
    pub checkpoint_lsn_observed: Option<Lsn>,
}
70
/// Location and cause of a torn (partially written) record at the end
/// of the log, as reported in [`ReplayOutcome::torn_tail`].
#[derive(Debug)]
pub struct TornTailInfo {
    /// Segment file in which the undecodable record was found.
    pub segment_path: PathBuf,
    /// Byte offset of the start of the bad record — i.e. the position
    /// just past the last record that decoded cleanly. Truncate the
    /// segment to this length before resuming appends.
    pub last_good_offset: u64,
    /// The decode error that triggered the torn-tail diagnosis
    /// (typically a CRC mismatch or a short read).
    pub cause: WalError,
}
77
78/// Walk every segment in `paths` (already in ascending id order).
79///
80/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
81/// captured in the snapshot the caller is about to restore from.
82pub(crate) fn replay_segments(
83 paths: &[PathBuf],
84 checkpoint_lsn: Lsn,
85) -> Result<ReplayOutcome, WalError> {
86 let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
87 let mut committed: Vec<MutationEvent> = Vec::new();
88 let mut max_lsn = Lsn::ZERO;
89 let mut last_lsn = Lsn::ZERO;
90 let mut last_segment_base: Option<Lsn> = None;
91 let mut torn_tail: Option<TornTailInfo> = None;
92 let mut checkpoint_lsn_observed: Option<Lsn> = None;
93
94 'outer: for path in paths {
95 let mut reader = SegmentReader::open(path)?;
96 let segment_base = reader.header().base_lsn;
97 if let Some(prev_base) = last_segment_base {
98 if segment_base <= prev_base {
99 return Err(WalError::Malformed(format!(
100 "segment base_lsn {} is not greater than previous base_lsn {} ({})",
101 segment_base.raw(),
102 prev_base.raw(),
103 path.display()
104 )));
105 }
106 }
107 if !last_lsn.is_zero() && segment_base <= last_lsn {
108 return Err(WalError::Malformed(format!(
109 "segment base_lsn {} is not greater than previous record lsn {} ({})",
110 segment_base.raw(),
111 last_lsn.raw(),
112 path.display()
113 )));
114 }
115 last_segment_base = Some(segment_base);
116
117 loop {
118 // Capture position before the read so we can report the
119 // start-of-bad-record offset on torn tail.
120 let before = reader.position();
121 match reader.read_record() {
122 Ok(Some(record)) => {
123 let lsn = record.lsn();
124 if lsn < segment_base {
125 return Err(WalError::Malformed(format!(
126 "record lsn {} is below segment base_lsn {} ({})",
127 lsn.raw(),
128 segment_base.raw(),
129 path.display()
130 )));
131 }
132 if !last_lsn.is_zero() && lsn <= last_lsn {
133 return Err(WalError::Malformed(format!(
134 "record lsn {} is not greater than previous lsn {} ({})",
135 lsn.raw(),
136 last_lsn.raw(),
137 path.display()
138 )));
139 }
140 last_lsn = lsn;
141 if lsn > max_lsn {
142 max_lsn = lsn;
143 }
144 if lsn.raw() <= checkpoint_lsn.raw() {
145 // Already in the snapshot. Markers below the
146 // fence still need to keep their pending
147 // bucket clean, but no checkpoint can split
148 // a transaction (mutex is held during
149 // checkpoint), so dropping events outright
150 // is safe.
151 if let WalRecord::TxCommit { tx_begin_lsn, .. }
152 | WalRecord::TxAbort { tx_begin_lsn, .. } = &record
153 {
154 pending.remove(tx_begin_lsn);
155 }
156 continue;
157 }
158 match record {
159 WalRecord::Mutation {
160 tx_begin_lsn,
161 event,
162 ..
163 } => {
164 let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
165 WalError::Malformed(format!(
166 "mutation at lsn {} references missing tx begin {}",
167 lsn.raw(),
168 tx_begin_lsn.raw()
169 ))
170 })?;
171 events.push(event);
172 }
173 WalRecord::TxBegin { lsn } => {
174 // Materialise the bucket eagerly so
175 // begin-without-mutations transactions
176 // still get a deterministic commit/abort.
177 if pending.insert(lsn, Vec::new()).is_some() {
178 return Err(WalError::Malformed(format!(
179 "duplicate tx begin at lsn {}",
180 lsn.raw()
181 )));
182 }
183 }
184 WalRecord::TxCommit { tx_begin_lsn, .. } => {
185 let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
186 WalError::Malformed(format!(
187 "commit at lsn {} references missing tx begin {}",
188 lsn.raw(),
189 tx_begin_lsn.raw()
190 ))
191 })?;
192 committed.extend(events);
193 }
194 WalRecord::TxAbort { tx_begin_lsn, .. } => {
195 pending.remove(&tx_begin_lsn).ok_or_else(|| {
196 WalError::Malformed(format!(
197 "abort at lsn {} references missing tx begin {}",
198 lsn.raw(),
199 tx_begin_lsn.raw()
200 ))
201 })?;
202 }
203 WalRecord::Checkpoint { snapshot_lsn, .. } => {
204 if snapshot_lsn > lsn {
205 return Err(WalError::Malformed(format!(
206 "checkpoint at lsn {} points to future snapshot lsn {}",
207 lsn.raw(),
208 snapshot_lsn.raw()
209 )));
210 }
211 if let Some(prev) = checkpoint_lsn_observed {
212 if snapshot_lsn < prev {
213 return Err(WalError::Malformed(format!(
214 "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
215 snapshot_lsn.raw(),
216 prev.raw()
217 )));
218 }
219 }
220 checkpoint_lsn_observed = Some(snapshot_lsn);
221 }
222 }
223 }
224 Ok(None) => break,
225 Err(err) => {
226 torn_tail = Some(TornTailInfo {
227 segment_path: path.clone(),
228 last_good_offset: before,
229 cause: err,
230 });
231 break 'outer;
232 }
233 }
234 }
235 }
236
237 // Any transaction still in `pending` at end-of-log was started but
238 // never committed (and never explicitly aborted). Treat as crashed
239 // mid-query and discard.
240 drop(pending);
241
242 Ok(ReplayOutcome {
243 committed_events: committed,
244 max_lsn,
245 torn_tail,
246 checkpoint_lsn_observed,
247 })
248}
249
250/// Convenience: replay every `*.wal` segment in `dir` after sorting by
251/// the numeric prefix encoded in the file name.
252///
253/// Used by recovery diagnostics in `lora-database::Database::recover`
254/// to peek at the newest checkpoint marker before opening the live
255/// `Wal` handle.
256pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
257 let entries = SegmentDir::new(dir).list()?;
258 let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
259 replay_segments(&paths, checkpoint_lsn)
260}