use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use lora_store::MutationEvent;
use crate::dir::SegmentDir;
use crate::error::WalError;
use crate::lsn::Lsn;
use crate::record::WalRecord;
use crate::segment::SegmentReader;
/// Result of replaying a sequence of WAL segments, as assembled by
/// `replay_segments`.
#[derive(Debug)]
pub struct ReplayOutcome {
/// Events of every transaction whose commit record was replayed, appended
/// in the order the commit records were encountered. Records with an LSN at
/// or below the caller-supplied checkpoint LSN contribute nothing here.
pub committed_events: Vec<MutationEvent>,
/// Highest record LSN read during replay, including records at or below the
/// checkpoint LSN. Stays `Lsn::ZERO` when no record was read.
pub max_lsn: Lsn,
/// Set when reading a record failed; replay stops at that point and no
/// further segments are read. `None` when every segment was read to its end.
pub torn_tail: Option<TornTailInfo>,
/// `snapshot_lsn` of the most recent checkpoint record replayed above the
/// checkpoint LSN, or `None` if no such record was seen.
pub checkpoint_lsn_observed: Option<Lsn>,
}
/// Describes the point at which replay stopped because a record could not be
/// read from a segment.
#[derive(Debug)]
pub struct TornTailInfo {
/// Path of the segment in which the failing read occurred.
pub segment_path: PathBuf,
/// Reader position captured immediately before the failing read, i.e. the
/// offset just past the last record that was read successfully.
pub last_good_offset: u64,
/// The error returned by the failing record read.
pub cause: WalError,
}
pub(crate) fn replay_segments(
paths: &[PathBuf],
checkpoint_lsn: Lsn,
) -> Result<ReplayOutcome, WalError> {
let mut pending: BTreeMap<Lsn, Vec<MutationEvent>> = BTreeMap::new();
let mut committed: Vec<MutationEvent> = Vec::new();
let mut max_lsn = Lsn::ZERO;
let mut last_lsn = Lsn::ZERO;
let mut last_segment_base: Option<Lsn> = None;
let mut torn_tail: Option<TornTailInfo> = None;
let mut checkpoint_lsn_observed: Option<Lsn> = None;
'outer: for path in paths {
let mut reader = SegmentReader::open(path)?;
let segment_base = reader.header().base_lsn;
if let Some(prev_base) = last_segment_base {
if segment_base <= prev_base {
return Err(WalError::Malformed(format!(
"segment base_lsn {} is not greater than previous base_lsn {} ({})",
segment_base.raw(),
prev_base.raw(),
path.display()
)));
}
}
if !last_lsn.is_zero() && segment_base <= last_lsn {
return Err(WalError::Malformed(format!(
"segment base_lsn {} is not greater than previous record lsn {} ({})",
segment_base.raw(),
last_lsn.raw(),
path.display()
)));
}
last_segment_base = Some(segment_base);
loop {
let before = reader.position();
match reader.read_record() {
Ok(Some(record)) => {
let lsn = record.lsn();
if lsn < segment_base {
return Err(WalError::Malformed(format!(
"record lsn {} is below segment base_lsn {} ({})",
lsn.raw(),
segment_base.raw(),
path.display()
)));
}
if !last_lsn.is_zero() && lsn <= last_lsn {
return Err(WalError::Malformed(format!(
"record lsn {} is not greater than previous lsn {} ({})",
lsn.raw(),
last_lsn.raw(),
path.display()
)));
}
last_lsn = lsn;
if lsn > max_lsn {
max_lsn = lsn;
}
if lsn.raw() <= checkpoint_lsn.raw() {
if let WalRecord::TxCommit { tx_begin_lsn, .. }
| WalRecord::TxAbort { tx_begin_lsn, .. } = &record
{
pending.remove(tx_begin_lsn);
}
continue;
}
match record {
WalRecord::Mutation {
tx_begin_lsn,
event,
..
} => {
let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
WalError::Malformed(format!(
"mutation at lsn {} references missing tx begin {}",
lsn.raw(),
tx_begin_lsn.raw()
))
})?;
events.push(event);
}
WalRecord::MutationBatch {
tx_begin_lsn,
events: batch,
..
} => {
let events = pending.get_mut(&tx_begin_lsn).ok_or_else(|| {
WalError::Malformed(format!(
"mutation batch at lsn {} references missing tx begin {}",
lsn.raw(),
tx_begin_lsn.raw()
))
})?;
events.extend(batch);
}
WalRecord::TxBegin { lsn } => {
if pending.insert(lsn, Vec::new()).is_some() {
return Err(WalError::Malformed(format!(
"duplicate tx begin at lsn {}",
lsn.raw()
)));
}
}
WalRecord::TxCommit { tx_begin_lsn, .. } => {
let events = pending.remove(&tx_begin_lsn).ok_or_else(|| {
WalError::Malformed(format!(
"commit at lsn {} references missing tx begin {}",
lsn.raw(),
tx_begin_lsn.raw()
))
})?;
committed.extend(events);
}
WalRecord::TxAbort { tx_begin_lsn, .. } => {
pending.remove(&tx_begin_lsn).ok_or_else(|| {
WalError::Malformed(format!(
"abort at lsn {} references missing tx begin {}",
lsn.raw(),
tx_begin_lsn.raw()
))
})?;
}
WalRecord::Checkpoint { snapshot_lsn, .. } => {
if snapshot_lsn > lsn {
return Err(WalError::Malformed(format!(
"checkpoint at lsn {} points to future snapshot lsn {}",
lsn.raw(),
snapshot_lsn.raw()
)));
}
if let Some(prev) = checkpoint_lsn_observed {
if snapshot_lsn < prev {
return Err(WalError::Malformed(format!(
"checkpoint snapshot lsn {} regressed below previous checkpoint {}",
snapshot_lsn.raw(),
prev.raw()
)));
}
}
checkpoint_lsn_observed = Some(snapshot_lsn);
}
}
}
Ok(None) => break,
Err(err) => {
torn_tail = Some(TornTailInfo {
segment_path: path.clone(),
last_good_offset: before,
cause: err,
});
break 'outer;
}
}
}
}
drop(pending);
Ok(ReplayOutcome {
committed_events: committed,
max_lsn,
torn_tail,
checkpoint_lsn_observed,
})
}
/// Replays every WAL segment listed for `dir`.
///
/// The segment entries are obtained from `SegmentDir::list` and their paths
/// are handed to `replay_segments` in the order the listing returned them,
/// together with `checkpoint_lsn`.
///
/// # Errors
///
/// Propagates any error from listing the directory or from
/// `replay_segments`.
pub fn replay_dir(dir: &Path, checkpoint_lsn: Lsn) -> Result<ReplayOutcome, WalError> {
    let mut segment_paths: Vec<PathBuf> = Vec::new();
    for entry in SegmentDir::new(dir).list()? {
        segment_paths.push(entry.path);
    }
    replay_segments(&segment_paths, checkpoint_lsn)
}