1use std::collections::BTreeMap;
22use std::path::{Path, PathBuf};
23
24use lora_store::MutationEvent;
25
26use crate::dir::SegmentDir;
27use crate::errors::WalError;
28use crate::lsn::Lsn;
29use crate::record::WalRecord;
30use crate::segment::{SegmentReader, SEGMENT_HEADER_LEN};
31
32#[derive(Debug)]
34pub struct ReplayOutcome {
35 pub committed_events: Vec<MutationEvent>,
39
40 pub max_lsn: Lsn,
44
45 pub torn_tail: Option<TornTailInfo>,
51
52 pub checkpoint_lsn_observed: Option<Lsn>,
69
70 pub last_good_offset: u64,
74}
75
76#[derive(Debug)]
77pub struct TornTailInfo {
78 pub segment_path: PathBuf,
79 pub last_good_offset: u64,
80 pub cause: WalError,
81}
82
83pub(crate) fn replay_segments(
88 paths: &[PathBuf],
89 checkpoint_lsn: Lsn,
90) -> Result<ReplayOutcome, WalError> {
91 let mut state = ReplayState::new();
92 let mut torn_tail: Option<TornTailInfo> = None;
93 let mut last_good_offset = SEGMENT_HEADER_LEN as u64;
94
95 'outer: for path in paths {
96 let mut reader = SegmentReader::open(path)?;
97 last_good_offset = reader.position();
98 let segment_base = reader.header().base_lsn;
99 state.validate_segment(segment_base, path)?;
100
101 loop {
102 let before = reader.position();
105 match reader.read_record() {
106 Ok(Some(record)) => {
107 state.accept_record(record, segment_base, checkpoint_lsn, path)?;
108 last_good_offset = reader.position();
109 }
110 Ok(None) => break,
111 Err(err) => {
112 torn_tail = Some(TornTailInfo {
113 segment_path: path.clone(),
114 last_good_offset: before,
115 cause: err,
116 });
117 break 'outer;
118 }
119 }
120 }
121 }
122
123 Ok(state.finish(torn_tail, last_good_offset))
124}
125
126struct ReplayState {
127 pending: BTreeMap<Lsn, Vec<MutationEvent>>,
128 committed: Vec<MutationEvent>,
129 max_lsn: Lsn,
130 last_lsn: Lsn,
131 last_segment_base: Option<Lsn>,
132 checkpoint_lsn_observed: Option<Lsn>,
133}
134
135impl ReplayState {
136 fn new() -> Self {
137 Self {
138 pending: BTreeMap::new(),
139 committed: Vec::new(),
140 max_lsn: Lsn::ZERO,
141 last_lsn: Lsn::ZERO,
142 last_segment_base: None,
143 checkpoint_lsn_observed: None,
144 }
145 }
146
147 fn validate_segment(&mut self, segment_base: Lsn, path: &Path) -> Result<(), WalError> {
148 if let Some(prev_base) = self.last_segment_base {
149 if segment_base <= prev_base {
150 return Err(WalError::Malformed(format!(
151 "segment base_lsn {} is not greater than previous base_lsn {} ({})",
152 segment_base.raw(),
153 prev_base.raw(),
154 path.display()
155 )));
156 }
157 }
158 if !self.last_lsn.is_zero() && segment_base <= self.last_lsn {
159 return Err(WalError::Malformed(format!(
160 "segment base_lsn {} is not greater than previous record lsn {} ({})",
161 segment_base.raw(),
162 self.last_lsn.raw(),
163 path.display()
164 )));
165 }
166 self.last_segment_base = Some(segment_base);
167 Ok(())
168 }
169
170 fn accept_record(
171 &mut self,
172 record: WalRecord,
173 segment_base: Lsn,
174 checkpoint_lsn: Lsn,
175 path: &Path,
176 ) -> Result<(), WalError> {
177 let lsn = record.lsn();
178 self.validate_record_lsn(lsn, segment_base, path)?;
179 self.observe_lsn(lsn);
180
181 if lsn.raw() <= checkpoint_lsn.raw() {
182 self.skip_fenced_record(&record);
183 return Ok(());
184 }
185
186 self.apply_record(record, lsn)
187 }
188
189 fn validate_record_lsn(
190 &self,
191 lsn: Lsn,
192 segment_base: Lsn,
193 path: &Path,
194 ) -> Result<(), WalError> {
195 if lsn < segment_base {
196 return Err(WalError::Malformed(format!(
197 "record lsn {} is below segment base_lsn {} ({})",
198 lsn.raw(),
199 segment_base.raw(),
200 path.display()
201 )));
202 }
203 if !self.last_lsn.is_zero() && lsn <= self.last_lsn {
204 return Err(WalError::Malformed(format!(
205 "record lsn {} is not greater than previous lsn {} ({})",
206 lsn.raw(),
207 self.last_lsn.raw(),
208 path.display()
209 )));
210 }
211 Ok(())
212 }
213
214 fn observe_lsn(&mut self, lsn: Lsn) {
215 self.last_lsn = lsn;
216 if lsn > self.max_lsn {
217 self.max_lsn = lsn;
218 }
219 }
220
221 fn skip_fenced_record(&mut self, record: &WalRecord) {
222 if let WalRecord::TxCommit { tx_begin_lsn, .. } | WalRecord::TxAbort { tx_begin_lsn, .. } =
227 record
228 {
229 self.pending.remove(tx_begin_lsn);
230 }
231 }
232
233 fn apply_record(&mut self, record: WalRecord, lsn: Lsn) -> Result<(), WalError> {
234 match record {
235 WalRecord::Mutation {
236 tx_begin_lsn,
237 event,
238 ..
239 } => self
240 .pending_events_mut(tx_begin_lsn, lsn, "mutation")?
241 .push(event),
242 WalRecord::MutationBatch {
243 tx_begin_lsn,
244 events,
245 ..
246 } => self
247 .pending_events_mut(tx_begin_lsn, lsn, "mutation batch")?
248 .extend(events),
249 WalRecord::TxBegin { lsn } => self.begin_transaction(lsn)?,
250 WalRecord::TxCommit { tx_begin_lsn, .. } => {
251 let events = self.take_pending(tx_begin_lsn, lsn, "commit")?;
252 self.committed.extend(events);
253 }
254 WalRecord::TxAbort { tx_begin_lsn, .. } => {
255 let _ = self.take_pending(tx_begin_lsn, lsn, "abort")?;
256 }
257 WalRecord::Checkpoint { snapshot_lsn, .. } => {
258 self.observe_checkpoint(lsn, snapshot_lsn)?;
259 }
260 }
261 Ok(())
262 }
263
264 fn begin_transaction(&mut self, lsn: Lsn) -> Result<(), WalError> {
265 if self.pending.insert(lsn, Vec::new()).is_some() {
268 return Err(WalError::Malformed(format!(
269 "duplicate tx begin at lsn {}",
270 lsn.raw()
271 )));
272 }
273 Ok(())
274 }
275
276 fn pending_events_mut(
277 &mut self,
278 tx_begin_lsn: Lsn,
279 record_lsn: Lsn,
280 kind: &str,
281 ) -> Result<&mut Vec<MutationEvent>, WalError> {
282 self.pending
283 .get_mut(&tx_begin_lsn)
284 .ok_or_else(|| missing_tx_begin(kind, record_lsn, tx_begin_lsn))
285 }
286
287 fn take_pending(
288 &mut self,
289 tx_begin_lsn: Lsn,
290 record_lsn: Lsn,
291 kind: &str,
292 ) -> Result<Vec<MutationEvent>, WalError> {
293 self.pending
294 .remove(&tx_begin_lsn)
295 .ok_or_else(|| missing_tx_begin(kind, record_lsn, tx_begin_lsn))
296 }
297
298 fn observe_checkpoint(&mut self, lsn: Lsn, snapshot_lsn: Lsn) -> Result<(), WalError> {
299 if snapshot_lsn > lsn {
300 return Err(WalError::Malformed(format!(
301 "checkpoint at lsn {} points to future snapshot lsn {}",
302 lsn.raw(),
303 snapshot_lsn.raw()
304 )));
305 }
306 if let Some(prev) = self.checkpoint_lsn_observed {
307 if snapshot_lsn < prev {
308 return Err(WalError::Malformed(format!(
309 "checkpoint snapshot lsn {} regressed below previous checkpoint {}",
310 snapshot_lsn.raw(),
311 prev.raw()
312 )));
313 }
314 }
315 self.checkpoint_lsn_observed = Some(snapshot_lsn);
316 Ok(())
317 }
318
319 fn finish(self, torn_tail: Option<TornTailInfo>, last_good_offset: u64) -> ReplayOutcome {
320 ReplayOutcome {
324 committed_events: self.committed,
325 max_lsn: self.max_lsn,
326 torn_tail,
327 checkpoint_lsn_observed: self.checkpoint_lsn_observed,
328 last_good_offset,
329 }
330 }
331}
332
333fn missing_tx_begin(kind: &str, record_lsn: Lsn, tx_begin_lsn: Lsn) -> WalError {
334 WalError::Malformed(format!(
335 "{kind} at lsn {} references missing tx begin {}",
336 record_lsn.raw(),
337 tx_begin_lsn.raw()
338 ))
339}
340
341pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
348 let entries = SegmentDir::new(dir).list()?;
349 let paths: Vec<PathBuf> = entries.into_iter().map(|e| e.path).collect();
350 replay_segments(&paths, checkpoint_lsn)
351}