use std::fs::File;
use std::io::Read;
use std::path::Path;
use crate::api::errors::{Error, Result};
use super::codec::{decode_file_header, decode_record, FileHeader, FILE_HEADER_SIZE};
use super::txn_op::TxnOp;
/// Summary of a single replay pass over a write-ahead log.
///
/// Produced by [`replay`] / [`replay_bytes`] alongside the decoded file header.
// NOTE(review): the reason for `allow(dead_code)` is not visible here —
// presumably not every field is read by every caller; confirm it is still needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub struct ReplayStats {
    /// Number of records that decoded successfully and were fully delivered
    /// to the callback. A batch record counts once, regardless of how many
    /// operations it contains.
    pub records_seen: u64,
    /// Largest sequence number handed to the callback, or `None` when no
    /// operation was delivered at all.
    pub highest_seq: Option<u64>,
    /// Byte offset at which a truncated record header or body was found,
    /// or `None` when the log ended on a record boundary.
    pub torn_tail_at: Option<u64>,
}
/// Replays the write-ahead log stored at `path`.
///
/// The whole file is read into memory and then handed to [`replay_bytes`],
/// which does the actual decoding; see that function for the meaning of the
/// callback arguments and of the returned statistics.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or read, and otherwise
/// whatever [`replay_bytes`] returns for the file's contents.
pub fn replay<F>(path: &Path, mut callback: F) -> Result<(FileHeader, ReplayStats)>
where
    F: FnMut(&TxnOp, u64, u64) -> Result<()>,
{
    let mut contents = Vec::new();
    File::open(path)?.read_to_end(&mut contents)?;
    replay_bytes(&contents, &mut callback)
}
/// Replays an in-memory write-ahead log image.
///
/// `bytes` must start with a file header of `FILE_HEADER_SIZE` bytes, followed
/// by zero or more encoded records. Every decoded operation is passed to
/// `callback` as `(op, seq, record_offset)`:
///
/// * a non-batch record is delivered once, with the record's own sequence
///   number;
/// * a `TxnOp::Batch` record is unpacked and each inner operation is delivered
///   separately, numbered `seq`, `seq + 1`, ... (wrapping on overflow);
/// * `record_offset` is always the byte offset of the enclosing record within
///   `bytes`.
///
/// A record whose header or body is truncated is treated as a torn tail:
/// replay stops there without an error and the offset is reported in
/// [`ReplayStats::torn_tail_at`].
///
/// # Errors
///
/// * `Error::ReplaySanityFailed` if `bytes` is shorter than the file header.
/// * Any error from decoding the file header.
/// * Any record decoding error other than a torn tail, and any error returned
///   by `callback`. When such an error is `Error::ReplaySanityFailed`, its
///   `record_offset` is overwritten with the offset of the offending record.
pub fn replay_bytes<F>(bytes: &[u8], callback: &mut F) -> Result<(FileHeader, ReplayStats)>
where
    F: FnMut(&TxnOp, u64, u64) -> Result<()>,
{
    // `get` yields `None` exactly when the buffer is too short for the header.
    let header = match bytes.get(..FILE_HEADER_SIZE) {
        Some(prefix) => decode_file_header(prefix)?,
        None => {
            return Err(Error::ReplaySanityFailed {
                context: "WAL too short — missing file header",
                record_offset: 0,
            });
        }
    };

    let mut stats = ReplayStats {
        records_seen: 0,
        highest_seq: None,
        torn_tail_at: None,
    };
    let mut cursor = FILE_HEADER_SIZE;

    while cursor < bytes.len() {
        let record_offset = cursor as u64;

        // Step 1: decode the record at the cursor, or decide how to stop.
        let record = match decode_record(&bytes[cursor..]) {
            Ok(record) => record,
            Err(Error::ReplaySanityFailed { context, .. }) if is_torn_tail(context) => {
                // Incomplete trailing record: remember where it starts and
                // finish normally with what was replayed so far.
                stats.torn_tail_at = Some(record_offset);
                break;
            }
            Err(other) => return Err(patch_offset(other, cursor)),
        };

        // Step 2: hand the operation(s) to the callback, tracking the
        // highest sequence number delivered so far.
        match &record.op {
            TxnOp::Batch { ops, .. } => {
                for (index, inner) in ops.iter().enumerate() {
                    let seq = record.seq.wrapping_add(index as u64);
                    callback(inner, seq, record_offset).map_err(|e| patch_offset(e, cursor))?;
                    stats.highest_seq = Some(stats.highest_seq.map_or(seq, |top| top.max(seq)));
                }
            }
            single => {
                let seq = record.seq;
                callback(single, seq, record_offset).map_err(|e| patch_offset(e, cursor))?;
                stats.highest_seq = Some(stats.highest_seq.map_or(seq, |top| top.max(seq)));
            }
        }

        // Step 3: the record is fully processed; move on to the next one.
        stats.records_seen += 1;
        cursor += record.bytes_consumed;
    }

    Ok((header, stats))
}
/// Returns `true` when `context` is one of the two messages that mark a
/// truncated record (header or body), i.e. a torn tail rather than corruption.
///
/// The comparison is an exact, case-sensitive string match.
fn is_torn_tail(context: &'static str) -> bool {
    matches!(
        context,
        "record header truncated" | "record body truncated"
    )
}
fn patch_offset(e: Error, offset: usize) -> Error {
match e {
Error::ReplaySanityFailed { context, .. } => Error::ReplaySanityFailed {
context,
record_offset: offset as u64,
},
other => other,
}
}