lora_wal/replay.rs
//! WAL replay: walk segments, surface only committed mutation events,
//! and report the byte position of any torn tail so the caller can
//! truncate the active segment cleanly.
//!
//! The walk is deliberately a single forward sweep over the segments in
//! ascending id order. Per-transaction events are buffered in a
//! `BTreeMap<tx_begin_lsn, Vec<MutationEvent>>` and only released on the
//! corresponding [`WalRecord::TxCommit`]; a [`WalRecord::TxAbort`] (or
//! a missing commit at end-of-log) causes the bucket to be dropped
//! without emission. This is the contract the WAL plan calls
//! "per-mutation log, per-query commit": the recorder writes one record
//! per primitive mutation but replay only ever reapplies whole queries.
//!
//! Replay walks past `checkpoint_lsn`: any record whose LSN is at or
//! below it is already represented in the loaded snapshot and is
//! skipped. Because checkpoints are taken with the store write lock
//! held, no transaction can straddle the fence — `tx_begin_lsn <=
//! checkpoint_lsn` implies the matching commit (if any) is also at or
//! below it.
20
21use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::errors::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::{SegmentReader, SEGMENT_HEADER_LEN};
31
32/// Outcome of a full replay walk.
33#[derive(Debug)]
34pub struct ReplayOutcome {
35 /// Mutation events from committed transactions, in append order.
36 /// Apply these to a fresh store (or one freshly loaded from a
37 /// snapshot at `checkpoint_lsn`) to reproduce the pre-crash state.
38 pub committed_events: Vec<MutationEvent>,
39
40 /// Highest LSN observed in any segment, regardless of whether the
41 /// owning transaction committed. Used to seed `next_lsn` for new
42 /// appends so we never reuse an already-allocated id.
43 pub max_lsn: Lsn,
44
45 /// Torn-tail diagnostic. `Some` iff a record failed to decode
46 /// before the natural end-of-log. The caller must truncate the
47 /// affected segment to `last_good_offset` before resuming
48 /// appends, otherwise replay-after-replay will keep tripping the
49 /// same CRC.
50 pub torn_tail: Option<TornTailInfo>,
51
52 /// Newest checkpoint LSN observed in a [`WalRecord::Checkpoint`]
53 /// marker.
54 ///
55 /// Informational only. The recovery contract today is "the
56 /// snapshot's `wal_lsn` is the replay fence" — if the operator
57 /// passes an older snapshot than the newest checkpoint marker on
58 /// disk we still replay every record above the snapshot's fence,
59 /// which is conservative-correct (the only cost is duplicated
60 /// work). A tighter "marker overrides snapshot" contract is
61 /// deferred to v2 because it would require us to know that the
62 /// marker's snapshot file actually exists where the operator can
63 /// reach it, which is a separate observability concern.
64 ///
65 /// Surfaced so callers (e.g. `lora-database::Database::recover`)
66 /// can log a warning when the snapshot is older than the newest
67 /// observed marker.
68 pub checkpoint_lsn_observed: Option<Lsn>,
69
70 /// Offset immediately after the last well-formed record in the last
71 /// segment walked. `Wal::open` uses this to reopen the active writer
72 /// without performing a second full scan of the active segment.
73 pub last_good_offset: u64,
74}
75
76#[derive(Debug)]
77pub struct TornTailInfo {
78 pub segment_path: PathBuf,
79 pub last_good_offset: u64,
80 pub cause: WalError,
81}
82
83/// Walk every segment in `paths` (already in ascending id order).
84///
85/// Records with `lsn <= checkpoint_lsn` are skipped — they are already
86/// captured in the snapshot the caller is about to restore from.
87pub(crate) fn replay_segments(
88 paths: &[PathBuf],
89 checkpoint_lsn: Lsn,
90) -> Result<ReplayOutcome, WalError> {
91 let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
92 let mut committed: Vec<MutationEvent> = Vec::new();
93 let mut max_lsn = Lsn::ZERO;
94 let mut last_lsn = Lsn::ZERO;
95 let mut last_segment_base: Option<Lsn> = None;
96 let mut torn_tail: Option<TornTailInfo> = None;
97 let mut checkpoint_lsn_observed: Option<Lsn> = None;
98 let mut last_good_offset = SEGMENT_HEADER_LEN as u64;
99
100 'outer: for path in paths {
101 let mut reader = SegmentReader::open(path)?;
102 last_good_offset = reader.position();
103 let segment_base = reader.header().base_lsn;
104 if let Some(prev_base) = last_segment_base {
105 if segment_base <= prev_base {
106 return Err(WalError::Malformed(format!(
107 "segment base_lsn {} is not greater than previous base_lsn {} ({})",
108 segment_base.raw(),
109 prev_base.raw(),
110 path.display()
111 )));
112 }
113 }
114 if !last_lsn.is_zero() && segment_base <= last_lsn {
115 return Err(WalError::Malformed(format!(
116 "segment base_lsn {} is not greater than previous record lsn {} ({})",
117 segment_base.raw(),
118 last_lsn.raw(),
119 path.display()
120 )));
121 }
122 last_segment_base = Some(segment_base);
123
124 loop {
125 // Capture position before the read so we can report the
126 // start-of-bad-record offset on torn tail.
127 let before = reader.position();
128 match reader.read_record() {
129 Ok(Some(record)) => {
130 let lsn = record.lsn();
131 if lsn < segment_base {
132 return Err(WalError::Malformed(format!(
133 "record lsn {} is below segment base_lsn {} ({})",
134 lsn.raw(),
135 segment_base.raw(),
136 path.display()
137 )));
138 }
139 if !last_lsn.is_zero() && lsn <= last_lsn {
140 return Err(WalError::Malformed(format!(
141 "record lsn {} is not greater than previous lsn {} ({})",
142 lsn.raw(),
143 last_lsn.raw(),
144 path.display()
145 )));
146 }
147 last_lsn = lsn;
148 if lsn > max_lsn {
149 max_lsn = lsn;
150 }
151 last_good_offset = reader.position();
152 if lsn.raw() <= checkpoint_lsn.raw() {
153 // Already in the snapshot. Markers below the
154 // fence still need to keep their pending
155 // bucket clean, but no checkpoint can split
156 // a transaction (the store write lock is held
157 // during checkpoint), so dropping events outright
158 // is safe.
159 if let WalRecord::TxCommit { tx_begin_lsn, .. }
160 | WalRecord::TxAbort { tx_begin_lsn, .. } = &record
161 {
162 pending.remove(tx_begin_lsn);
163 }
164 continue;
165 }
166 match record {
167 WalRecord::Mutation {
168 tx_begin_lsn,
169 event,
170 ..
171 } => {
172 let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
173 WalError::Malformed(format!(
174 "mutation at lsn {} references missing tx begin {}",
175 lsn.raw(),
176 tx_begin_lsn.raw()
177 ))
178 })?;
179 events.push(event);
180 }
181 WalRecord::MutationBatch {
182 tx_begin_lsn,
183 events: batch,
184 ..
185 } => {
186 let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
187 WalError::Malformed(format!(
188 "mutation batch at lsn {} references missing tx begin {}",
189 lsn.raw(),
190 tx_begin_lsn.raw()
191 ))
192 })?;
193 events.extend(batch);
194 }
195 WalRecord::TxBegin { lsn } => {
196 // Materialise the bucket eagerly so
197 // begin-without-mutations transactions
198 // still get a deterministic commit/abort.
199 if pending.insert(lsn, Vec::new()).is_some() {
200 return Err(WalError::Malformed(format!(
201 "duplicate tx begin at lsn {}",
202 lsn.raw()
203 )));
204 }
205 }
206 WalRecord::TxCommit { tx_begin_lsn, .. } => {
207 let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
208 WalError::Malformed(format!(
209 "commit at lsn {} references missing tx begin {}",
210 lsn.raw(),
211 tx_begin_lsn.raw()
212 ))
213 })?;
214 committed.extend(events);
215 }
216 WalRecord::TxAbort { tx_begin_lsn, .. } => {
217 pending.remove(&tx_begin_lsn).ok_or_else(|| {
218 WalError::Malformed(format!(
219 "abort at lsn {} references missing tx begin {}",
220 lsn.raw(),
221 tx_begin_lsn.raw()
222 ))
223 })?;
224 }
225 WalRecord::Checkpoint { snapshot_lsn, .. } => {
226 if snapshot_lsn > lsn {
227 return Err(WalError::Malformed(format!(
228 "checkpoint at lsn {} points to future snapshot lsn {}",
229 lsn.raw(),
230 snapshot_lsn.raw()
231 )));
232 }
233 if let Some(prev) = checkpoint_lsn_observed {
234 if snapshot_lsn < prev {
235 return Err(WalError::Malformed(format!(
236 "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
237 snapshot_lsn.raw(),
238 prev.raw()
239 )));
240 }
241 }
242 checkpoint_lsn_observed = Some(snapshot_lsn);
243 }
244 }
245 }
246 Ok(None) => break,
247 Err(err) => {
248 torn_tail = Some(TornTailInfo {
249 segment_path: path.clone(),
250 last_good_offset: before,
251 cause: err,
252 });
253 break 'outer;
254 }
255 }
256 }
257 }
258
259 // Any transaction still in `pending` at end-of-log was started but
260 // never committed (and never explicitly aborted). Treat as crashed
261 // mid-query and discard.
262 drop(pending);
263
264 Ok(ReplayOutcome {
265 committed_events: committed,
266 max_lsn,
267 torn_tail,
268 checkpoint_lsn_observed,
269 last_good_offset,
270 })
271}
272
273/// Convenience: replay every `*.wal` segment in `dir` after sorting by
274/// the numeric prefix encoded in the file name.
275///
276/// Used by recovery diagnostics in `lora-database::Database::recover`
277/// to peek at the newest checkpoint marker before opening the live
278/// `Wal` handle.
279pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
280 let entries = SegmentDir::new(dir).list()?;
281 let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
282 replay_segments(&paths, checkpoint_lsn)
283}