// lora_wal/replay.rs
1//! WAL replay: walk segments, surface only committed mutation events,
2//! and report the byte position of any torn tail so the caller can
3//! truncate the active segment cleanly.
4//!
5//! The walk is deliberately a single forward sweep over the segments in
6//! ascending id order. Per-transaction events are buffered in a
7//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
8//! corresponding [`WalRecord::TxCommit`]; an [`WalRecord::TxAbort`] (or
9//! a missing commit at end-of-log) causes the bucket to be dropped
10//! without emission. This is the contract the WAL plan calls
11//! "per-mutation log, per-query commit": the recorder writes one record
12//! per primitive mutation but replay only ever reapplies whole queries.
13//!
14//! Replay walks past `checkpoint_lsn`: any record whose LSN is at or
15//! below it is already represented in the loaded snapshot and is
16//! skipped. Because checkpoints are taken with the store write lock
17//! held, no transaction can straddle the fence — `tx_begin_lsn <=
18//! checkpoint_lsn` implies the matching commit (if any) is also at or
19//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::error::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::SegmentReader;
31
/// Outcome of a full replay walk, produced by [`replay_segments`] and
/// [`replay_dir`].
#[derive(Debug)]
pub struct ReplayOutcome {
    /// Mutation events from committed transactions, in append order.
    /// Apply these to a fresh store (or one freshly loaded from a
    /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
    pub committed_events: Vec<MutationEvent>,

    /// Highest LSN observed in any segment, regardless of whether the
    /// owning transaction committed. Used to seed `next_lsn` for new
    /// appends so we never reuse an already-allocated id.
    pub max_lsn: Lsn,

    /// Torn-tail diagnostic. `Some` iff a record failed to decode
    /// before the natural end-of-log. The caller must truncate the
    /// affected segment to `last_good_offset` before resuming
    /// appends, otherwise replay-after-replay will keep tripping the
    /// same CRC.
    pub torn_tail: Option<TornTailInfo>,

    /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
    /// marker.
    ///
    /// Informational only. The recovery contract today is "the
    /// snapshot's `wal_lsn` is the replay fence" — if the operator
    /// passes an older snapshot than the newest checkpoint marker on
    /// disk we still replay every record above the snapshot's fence,
    /// which is conservative-correct (the only cost is duplicated
    /// work). A tighter "marker overrides snapshot" contract is
    /// deferred to v2 because it would require us to know that the
    /// marker's snapshot file actually exists where the operator can
    /// reach it, which is a separate observability concern.
    ///
    /// Surfaced so callers (e.g. `lora-database::Database::recover`)
    /// can log a warning when the snapshot is older than the newest
    /// observed marker.
    pub checkpoint_lsn_observed: Option<Lsn>,
}
70
/// Location and cause of a torn tail: a record that failed to decode
/// before the natural end-of-log, reported via
/// [`ReplayOutcome::torn_tail`].
#[derive(Debug)]
pub struct TornTailInfo {
    /// Path of the segment file containing the undecodable record.
    pub segment_path: PathBuf,
    /// Byte offset of the start of the bad record — i.e. the reader
    /// position captured just before the failing decode. Truncating
    /// the segment to this length removes the torn record while
    /// keeping every record that decoded cleanly.
    pub last_good_offset: u64,
    /// The decode error that triggered torn-tail handling.
    pub cause: WalError,
}
77
78/// Walk every segment in `paths` (already in ascending id order).
79///
80/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
81/// captured in the snapshot the caller is about to restore from.
82pub(crate) fn replay_segments(
83 paths: &[PathBuf],
84 checkpoint_lsn: Lsn,
85) -> Result<ReplayOutcome, WalError> {
86 let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
87 let mut committed: Vec<MutationEvent> = Vec::new();
88 let mut max_lsn = Lsn::ZERO;
89 let mut last_lsn = Lsn::ZERO;
90 let mut last_segment_base: Option<Lsn> = None;
91 let mut torn_tail: Option<TornTailInfo> = None;
92 let mut checkpoint_lsn_observed: Option<Lsn> = None;
93
94 'outer: for path in paths {
95 let mut reader = SegmentReader::open(path)?;
96 let segment_base = reader.header().base_lsn;
97 if let Some(prev_base) = last_segment_base {
98 if segment_base <= prev_base {
99 return Err(WalError::Malformed(format!(
100 "segment base_lsn {} is not greater than previous base_lsn {} ({})",
101 segment_base.raw(),
102 prev_base.raw(),
103 path.display()
104 )));
105 }
106 }
107 if !last_lsn.is_zero() && segment_base <= last_lsn {
108 return Err(WalError::Malformed(format!(
109 "segment base_lsn {} is not greater than previous record lsn {} ({})",
110 segment_base.raw(),
111 last_lsn.raw(),
112 path.display()
113 )));
114 }
115 last_segment_base = Some(segment_base);
116
117 loop {
118 // Capture position before the read so we can report the
119 // start-of-bad-record offset on torn tail.
120 let before = reader.position();
121 match reader.read_record() {
122 Ok(Some(record)) => {
123 let lsn = record.lsn();
124 if lsn < segment_base {
125 return Err(WalError::Malformed(format!(
126 "record lsn {} is below segment base_lsn {} ({})",
127 lsn.raw(),
128 segment_base.raw(),
129 path.display()
130 )));
131 }
132 if !last_lsn.is_zero() && lsn <= last_lsn {
133 return Err(WalError::Malformed(format!(
134 "record lsn {} is not greater than previous lsn {} ({})",
135 lsn.raw(),
136 last_lsn.raw(),
137 path.display()
138 )));
139 }
140 last_lsn = lsn;
141 if lsn > max_lsn {
142 max_lsn = lsn;
143 }
144 if lsn.raw() <= checkpoint_lsn.raw() {
145 // Already in the snapshot. Markers below the
146 // fence still need to keep their pending
147 // bucket clean, but no checkpoint can split
148 // a transaction (the store write lock is held
149 // during checkpoint), so dropping events outright
150 // is safe.
151 if let WalRecord::TxCommit { tx_begin_lsn, .. }
152 | WalRecord::TxAbort { tx_begin_lsn, .. } = &record
153 {
154 pending.remove(tx_begin_lsn);
155 }
156 continue;
157 }
158 match record {
159 WalRecord::Mutation {
160 tx_begin_lsn,
161 event,
162 ..
163 } => {
164 let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
165 WalError::Malformed(format!(
166 "mutation at lsn {} references missing tx begin {}",
167 lsn.raw(),
168 tx_begin_lsn.raw()
169 ))
170 })?;
171 events.push(event);
172 }
173 WalRecord::MutationBatch {
174 tx_begin_lsn,
175 events: batch,
176 ..
177 } => {
178 let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
179 WalError::Malformed(format!(
180 "mutation batch at lsn {} references missing tx begin {}",
181 lsn.raw(),
182 tx_begin_lsn.raw()
183 ))
184 })?;
185 events.extend(batch);
186 }
187 WalRecord::TxBegin { lsn } => {
188 // Materialise the bucket eagerly so
189 // begin-without-mutations transactions
190 // still get a deterministic commit/abort.
191 if pending.insert(lsn, Vec::new()).is_some() {
192 return Err(WalError::Malformed(format!(
193 "duplicate tx begin at lsn {}",
194 lsn.raw()
195 )));
196 }
197 }
198 WalRecord::TxCommit { tx_begin_lsn, .. } => {
199 let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
200 WalError::Malformed(format!(
201 "commit at lsn {} references missing tx begin {}",
202 lsn.raw(),
203 tx_begin_lsn.raw()
204 ))
205 })?;
206 committed.extend(events);
207 }
208 WalRecord::TxAbort { tx_begin_lsn, .. } => {
209 pending.remove(&tx_begin_lsn).ok_or_else(|| {
210 WalError::Malformed(format!(
211 "abort at lsn {} references missing tx begin {}",
212 lsn.raw(),
213 tx_begin_lsn.raw()
214 ))
215 })?;
216 }
217 WalRecord::Checkpoint { snapshot_lsn, .. } => {
218 if snapshot_lsn > lsn {
219 return Err(WalError::Malformed(format!(
220 "checkpoint at lsn {} points to future snapshot lsn {}",
221 lsn.raw(),
222 snapshot_lsn.raw()
223 )));
224 }
225 if let Some(prev) = checkpoint_lsn_observed {
226 if snapshot_lsn < prev {
227 return Err(WalError::Malformed(format!(
228 "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
229 snapshot_lsn.raw(),
230 prev.raw()
231 )));
232 }
233 }
234 checkpoint_lsn_observed = Some(snapshot_lsn);
235 }
236 }
237 }
238 Ok(None) => break,
239 Err(err) => {
240 torn_tail = Some(TornTailInfo {
241 segment_path: path.clone(),
242 last_good_offset: before,
243 cause: err,
244 });
245 break 'outer;
246 }
247 }
248 }
249 }
250
251 // Any transaction still in `pending` at end-of-log was started but
252 // never committed (and never explicitly aborted). Treat as crashed
253 // mid-query and discard.
254 drop(pending);
255
256 Ok(ReplayOutcome {
257 committed_events: committed,
258 max_lsn,
259 torn_tail,
260 checkpoint_lsn_observed,
261 })
262}
263
264/// Convenience: replay every `*.wal` segment in `dir` after sorting by
265/// the numeric prefix encoded in the file name.
266///
267/// Used by recovery diagnostics in `lora-database::Database::recover`
268/// to peek at the newest checkpoint marker before opening the live
269/// `Wal` handle.
270pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
271 let entries = SegmentDir::new(dir).list()?;
272 let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
273 replay_segments(&paths, checkpoint_lsn)
274}