use std::path::Path;
use tracing::{error, info, warn};
use crate::{
book::BookEventEnvelope,
disruptor::{Envelope, traits::RkyvError},
error::{SegmentError, WalError},
};
use super::journaler::{find_segment_for_seq, list_segment_dirs, parse_segment_dir};
/// A fully decoded journal record: a `BookEventEnvelope` payload wrapped in the disruptor
/// [`Envelope`] (produced from WAL bytes by `decode_journal_event`).
pub type JournalEvent = Envelope<BookEventEnvelope>;
/// The rkyv archived (zero-copy, validated-in-place) form of `Envelope<T>`, i.e. what
/// `rkyv::api::high::access` yields when reading a WAL record without deserializing it.
pub(crate) type ArchivedEnvelopeOf<T> = <Envelope<T> as rkyv::Archive>::Archived;
/// Counters collected while scanning the WAL.
///
/// All sequence values here are the `app_seq` keys handed out by the scan source.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct ScanSummary {
    /// How many records were passed to the event callback.
    pub emitted_events: usize,
    /// `app_seq` of the most recently emitted record, if any were emitted.
    pub last_emitted_seq: Option<u64>,
    /// `app_seq` of the most recently visited record, emitted or not.
    pub last_seen_seq: Option<u64>,
}
/// Describes a trailing transaction that was still incomplete when a scan ended.
///
/// Its records were buffered but never emitted.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct IncompleteTailTx {
    /// `app_seq` of the first buffered record of the unfinished transaction.
    pub start_seq: u64,
    /// `app_seq` of the last buffered record of the unfinished transaction.
    pub end_seq: u64,
    /// Transaction id shared by the buffered records.
    pub tx_id: u64,
    /// The sequence that would have continued the transaction (`end_seq + 1`, saturating).
    pub next_expected_seq: u64,
}
/// The result of a full WAL scan: emission counters plus any unfinished trailing transaction.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct ValidatedScan {
    /// Emission and visitation counters for the scan.
    pub summary: ScanSummary,
    /// Set when the scan ended inside a transaction whose records were being buffered
    /// for emission; those records were dropped rather than emitted.
    pub incomplete_tail: Option<IncompleteTailTx>,
}
/// How strictly the first record visible to a scan must line up with the cursor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum ScanStartPolicy {
    /// The first visible record must be exactly `after_seq + 1`.
    ///
    /// It must also open a transaction (`tx_ix == 0`) unless resuming mid-transaction from
    /// the cursor was validated beforehand. Without a cursor, the first record must open a
    /// transaction.
    ExactContinuation,
    /// The first visible record may be `after_seq + 1`, or any record inside a transaction
    /// that began at or before `after_seq + 1`.
    ///
    /// Such a partial leading transaction is skipped rather than emitted.
    AllowPartialLeftEdge,
}
/// Read-only access to a segmented journal, as consumed by the scanning helpers here.
pub(crate) trait JournalScanSource {
    /// Root directory whose subdirectories are the journal segments
    /// (it is what gets passed to `list_segment_dirs`).
    fn root_dir(&self) -> &Path;

    /// Feeds the records of segment `dir_name` to `on_entry(app_seq, record_bytes)`.
    ///
    /// `start_seq` and `up_to_seq` are the scan bounds requested by the caller.
    /// NOTE(review): presumably limits visits to `[start_seq, up_to_seq]`; callers still
    /// re-check `app_seq` themselves, so confirm against implementors.
    ///
    /// An error returned by `on_entry` is expected to abort the visit and propagate.
    fn visit_segment_entries<F: FnMut(u64, &[u8]) -> anyhow::Result<()>>(
        &self,
        dir_name: &str,
        start_seq: u64,
        up_to_seq: Option<u64>,
        on_entry: F,
    ) -> anyhow::Result<()>;
}
/// Fetches an owned copy of the raw record stored at `target_seq`, if present.
///
/// The search begins at the segment `find_segment_for_seq` selects for `target_seq`. It
/// stops at the first segment whose parsed start is beyond the target (this assumes
/// `list_segment_dirs` yields segments in ascending start order; TODO confirm), or as soon
/// as the record is found.
///
/// Returns `Ok(None)` when there are no segments or no segment holds `target_seq`.
///
/// # Errors
/// Propagates failures from listing segments or visiting segment entries.
fn load_entry_at_seq<S>(source: &S, target_seq: u64) -> anyhow::Result<Option<Vec<u8>>>
where
    S: JournalScanSource,
{
    let segment_dirs = list_segment_dirs(source.root_dir())?;
    if segment_dirs.is_empty() {
        return Ok(None);
    }

    let first_candidate = find_segment_for_seq(&segment_dirs, target_seq);
    let mut record: Option<Vec<u8>> = None;

    for segment in &segment_dirs[first_candidate..] {
        let starts_past_target =
            parse_segment_dir(segment).is_some_and(|segment_start| segment_start > target_seq);
        if starts_past_target {
            break;
        }

        source.visit_segment_entries(segment, target_seq, Some(target_seq), |seq, raw| {
            if seq == target_seq {
                record = Some(raw.to_vec());
            }
            Ok(())
        })?;

        if record.is_some() {
            break;
        }
    }

    Ok(record)
}
fn access_archived_envelope<'a, T>(
app_seq: u64,
val_bytes: &'a [u8],
) -> anyhow::Result<&'a ArchivedEnvelopeOf<T>>
where
T: rkyv::Archive,
ArchivedEnvelopeOf<T>:
for<'b> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'b, RkyvError>>,
{
match rkyv::api::high::access::<ArchivedEnvelopeOf<T>, RkyvError>(val_bytes) {
Ok(archived) => Ok(archived),
Err(e) => {
error!(app_seq, error = %e, "corrupt WAL record");
Err(WalError::CorruptRecord {
app_seq,
details: e.to_string(),
}
.into())
}
}
}
fn ensure_visible_seq_continuity(
expected_visible_seq: &mut Option<u64>,
seq: u64,
) -> anyhow::Result<()> {
if let Some(expected) = *expected_visible_seq
&& seq != expected
{
error!(
expected_seq = expected,
found_seq = seq,
"wal sequence gap during recovery"
);
return Err(WalError::SequenceGap {
expected,
found: seq,
}
.into());
}
*expected_visible_seq = Some(seq.saturating_add(1));
Ok(())
}
/// Verifies that recovery reached at least `required_through_seq`.
///
/// "Recovered through" is the last emitted `app_seq`, or else the starting cursor
/// `after_seq`, or else 0. With no requirement the check always passes.
///
/// # Errors
/// Returns `WalError::RecoveryIncomplete` when recovery stopped short of the requirement.
fn ensure_recovered_through(
    after_seq: Option<u64>,
    required_through_seq: Option<u64>,
    summary: &ScanSummary,
) -> anyhow::Result<()> {
    match required_through_seq {
        None => Ok(()),
        Some(required_through) => {
            let recovered_through = summary.last_emitted_seq.or(after_seq).unwrap_or_default();
            if recovered_through >= required_through {
                return Ok(());
            }
            error!(
                required_through,
                recovered_through, "wal recovery stopped before the required replay head"
            );
            Err(WalError::RecoveryIncomplete {
                required_through,
                recovered_through,
            }
            .into())
        }
    }
}
fn validate_first_visible_seq(
start_policy: ScanStartPolicy,
expected_start_seq: Option<u64>,
seq: u64,
tx_ix: u16,
can_resume_exact_cursor_mid_tx: bool,
) -> anyhow::Result<()> {
match (start_policy, expected_start_seq) {
(ScanStartPolicy::ExactContinuation, Some(expected)) if seq != expected => {
error!(
expected_seq = expected,
found_seq = seq,
"wal sequence gap during recovery"
);
Err(WalError::SequenceGap {
expected,
found: seq,
}
.into())
}
(ScanStartPolicy::ExactContinuation, Some(_))
if tx_ix != 0 && !can_resume_exact_cursor_mid_tx =>
{
let expected = seq.saturating_sub(u64::from(tx_ix));
error!(
expected_seq = expected,
found_seq = seq,
tx_ix,
"wal sequence gap during recovery"
);
Err(WalError::SequenceGap {
expected,
found: seq,
}
.into())
}
(ScanStartPolicy::ExactContinuation, None) if tx_ix != 0 => {
let expected = seq.saturating_sub(u64::from(tx_ix));
error!(
expected_seq = expected,
found_seq = seq,
tx_ix,
"wal sequence gap during recovery"
);
Err(WalError::SequenceGap {
expected,
found: seq,
}
.into())
}
(ScanStartPolicy::AllowPartialLeftEdge, Some(expected)) => {
let tx_start_seq = seq.saturating_sub(u64::from(tx_ix));
let can_skip = tx_ix != 0 && tx_start_seq <= expected;
if seq != expected && !can_skip {
error!(
expected_seq = expected,
found_seq = seq,
tx_ix,
"wal sequence gap during recovery"
);
return Err(WalError::SequenceGap {
expected,
found: seq,
}
.into());
}
Ok(())
}
_ => Ok(()),
}
}
/// Core segmented-WAL scanner shared by `validate_recovery_window` and `replay_wal`.
///
/// Visits every segment from the one `find_segment_for_seq` picks for `after_seq + 1`
/// (or 0 without a cursor), stopping at the first segment that starts beyond `up_to_seq`.
/// For each record it:
/// - decodes the archived envelope framing;
/// - rejects impossible framing (`tx_len == 0` or `tx_ix >= tx_len`);
/// - validates the first visible record against `start_policy` (`validate_first_visible_seq`);
/// - enforces gap-free continuity of the envelope `seq` values;
/// - requires the records of a transaction to be consecutive, with identical `tx_id` and
///   `tx_len` and `tx_ix` increasing by one.
///
/// Only records of *completed* transactions are passed to `on_event(bytes, end_of_batch)`.
/// Emission lags by one record so that the final emitted record can be flagged
/// `end_of_batch == true`; every earlier one receives `false`.
///
/// A leading transaction whose first visible record has `tx_ix != 0` is consumed but not
/// emitted. The exception is `ExactContinuation` with `can_resume_exact_cursor_mid_tx`, where
/// its remaining records are emitted once it completes.
///
/// A trailing transaction still open when the scan ends is never emitted. If its records were
/// being buffered, it is reported via `ValidatedScan::incomplete_tail`.
///
/// # Errors
/// - `WalError::CorruptRecord` for records failing rkyv validation.
/// - `WalError::InvalidTxFraming` for bad framing or out-of-order transaction members.
/// - `WalError::SequenceGap` for left-edge or continuity violations.
/// - `WalError::IncompleteTxAfterCursor` when a transaction resumed from the cursor does not
///   complete within the scan.
/// - `WalError::RecoveryIncomplete` when `required_through_seq` is not reached.
/// - Any error from listing or visiting segments, or returned by `on_event`.
pub(crate) fn scan_wal<T, S, F>(
    source: &S,
    after_seq: Option<u64>,
    up_to_seq: Option<u64>,
    start_policy: ScanStartPolicy,
    can_resume_exact_cursor_mid_tx: bool,
    required_through_seq: Option<u64>,
    mut on_event: F,
) -> anyhow::Result<ValidatedScan>
where
    T: rkyv::Archive,
    ArchivedEnvelopeOf<T>:
        for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
    S: JournalScanSource,
    F: FnMut(&[u8], bool) -> anyhow::Result<()>,
{
    // Owned copy of a record, held until its transaction is known to be complete.
    struct BufferedEntry {
        app_seq: u64,
        bytes: Vec<u8>,
    }
    // First sequence of interest: one past the cursor, or 0 when there is no cursor.
    let start_seq = after_seq.map(|s| s.saturating_add(1)).unwrap_or(0);
    info!(
        after_seq = ?after_seq,
        up_to_seq = ?up_to_seq,
        start_seq,
        "starting segmented wal recovery"
    );
    // Empty window: nothing can be visible.
    if up_to_seq.is_some_and(|up_to| start_seq > up_to) {
        return Ok(ValidatedScan::default());
    }
    let dirs = list_segment_dirs(source.root_dir())?;
    if dirs.is_empty() {
        info!("no segments found, nothing to recover");
        return Ok(ValidatedScan::default());
    }
    let start_idx = find_segment_for_seq(&dirs, start_seq);
    // Progress through the transaction currently being read.
    struct TxState {
        tx_id: u64,
        tx_len: u16,
        // Records of this tx accounted for so far; also the next expected tx_ix.
        count: u16,
        // Whether this tx's records are buffered for emission (false for a skipped partial tx).
        emit: bool,
        // True when the first tx is resumed mid-way from the cursor (its tx_ix was != 0).
        resumed_after_cursor: bool,
    }
    let mut current_tx: Option<TxState> = None;
    // Buffered records of `current_tx`; only populated when its `emit` flag is set.
    let mut current_tx_buf: Vec<BufferedEntry> = Vec::new();
    // Most recent record of a completed tx, held back so the very last one emitted
    // can be flagged as end of batch.
    let mut pending_last: Option<BufferedEntry> = None;
    let mut summary = ScanSummary::default();
    let mut incomplete_tail = None;
    // Next envelope `seq` required for continuity; `None` until the first record is seen.
    let mut expected_visible_seq: Option<u64> = None;
    let expected_start_seq = after_seq.map(|seq| seq.saturating_add(1));
    let mut checked_first_visible = false;
    // Mid-tx resumption of the first tx is only honoured for exact continuation. Callers in
    // this file derive the flag from `validate_exact_cursor_record`.
    let resume_partial_left_edge = matches!(start_policy, ScanStartPolicy::ExactContinuation)
        && can_resume_exact_cursor_mid_tx;
    let mut first_tx_started = false;
    let buffer_entry = |app_seq: u64, val_bytes: &[u8]| BufferedEntry {
        app_seq,
        bytes: val_bytes.to_vec(),
    };
    for dir_name in &dirs[start_idx..] {
        // Stop once a segment starts beyond the upper bound.
        if up_to_seq
            .zip(parse_segment_dir(dir_name))
            .is_some_and(|(up_to, segment_start)| segment_start > up_to)
        {
            break;
        }
        source.visit_segment_entries(dir_name, start_seq, up_to_seq, |app_seq, val_bytes| {
            // Recorded for every visited record, whether or not it is later emitted.
            summary.last_seen_seq = Some(app_seq);
            let archived = access_archived_envelope::<T>(app_seq, val_bytes)?;
            let seq: u64 = archived.seq.into();
            let tx_id: u64 = archived.tx_id.into();
            let tx_len: u16 = archived.tx_len.into();
            let tx_ix: u16 = archived.tx_ix.into();
            // Reject structurally impossible framing before interpreting it.
            if tx_len == 0 || tx_ix >= tx_len {
                error!(app_seq, tx_id, tx_len, tx_ix, "invalid wal tx framing");
                return Err(WalError::InvalidTxFraming {
                    app_seq,
                    tx_id,
                    tx_len,
                    tx_ix,
                }
                .into());
            }
            // The left edge of the window gets policy-specific validation exactly once.
            if !checked_first_visible {
                checked_first_visible = true;
                validate_first_visible_seq(
                    start_policy,
                    expected_start_seq,
                    seq,
                    tx_ix,
                    can_resume_exact_cursor_mid_tx,
                )?;
            }
            ensure_visible_seq_continuity(&mut expected_visible_seq, seq)?;
            match &mut current_tx {
                // Not inside a transaction: this record opens one.
                None => {
                    let is_first = !first_tx_started;
                    first_tx_started = true;
                    // Only the very first transaction may be entered part-way through.
                    if !is_first && tx_ix != 0 {
                        error!(app_seq, tx_id, tx_len, tx_ix, "invalid wal tx framing");
                        return Err(WalError::InvalidTxFraming {
                            app_seq,
                            tx_id,
                            tx_len,
                            tx_ix,
                        }
                        .into());
                    }
                    let can_resume_mid_tx = is_first && resume_partial_left_edge;
                    // A partially visible leading tx is skipped unless resumption is allowed.
                    let emit = tx_ix == 0 || can_resume_mid_tx;
                    current_tx_buf.clear();
                    if emit {
                        current_tx_buf.push(buffer_entry(app_seq, val_bytes));
                    }
                    current_tx = Some(TxState {
                        tx_id,
                        tx_len,
                        count: tx_ix.saturating_add(1),
                        emit,
                        resumed_after_cursor: can_resume_mid_tx && tx_ix != 0,
                    });
                }
                // Inside a transaction: this record must be its next member.
                Some(tx) => {
                    let expected_ix = tx.count;
                    if tx.tx_id != tx_id || tx.tx_len != tx_len || tx_ix != expected_ix {
                        error!(
                            app_seq,
                            tx_id,
                            tx_len,
                            tx_ix,
                            expected_tx_id = tx.tx_id,
                            expected_tx_len = tx.tx_len,
                            expected_tx_ix = expected_ix,
                            "invalid wal tx framing"
                        );
                        return Err(WalError::InvalidTxFraming {
                            app_seq,
                            tx_id,
                            tx_len,
                            tx_ix,
                        }
                        .into());
                    }
                    if tx.emit {
                        current_tx_buf.push(buffer_entry(app_seq, val_bytes));
                    }
                    tx.count += 1;
                }
            }
            let is_complete = current_tx.as_ref().is_some_and(|tx| tx.count == tx.tx_len);
            if is_complete {
                // Flush the finished tx. Each buffered record displaces the held-back one,
                // which is then emitted as not-end-of-batch.
                for entry in current_tx_buf.drain(..) {
                    if let Some(prev) = pending_last.take() {
                        on_event(&prev.bytes, false)?;
                        summary.emitted_events += 1;
                        summary.last_emitted_seq = Some(prev.app_seq);
                    }
                    pending_last = Some(entry);
                }
                current_tx = None;
            }
            Ok(())
        })?;
    }
    // The scan ended inside a transaction.
    if let Some(tx) = current_tx.take() {
        // A transaction resumed from the cursor must complete; it cannot be dropped as a tail.
        if tx.resumed_after_cursor {
            // `resumed_after_cursor` requires a first record with tx_ix != 0 under
            // ExactContinuation, which `validate_first_visible_seq` rejects when there is
            // no cursor, so `after_seq` is set here.
            let after_seq = after_seq.expect("partial cursor continuation requires after_seq");
            let next_expected_seq = expected_visible_seq.unwrap_or(after_seq.saturating_add(1));
            error!(
                after_seq,
                tx_id = tx.tx_id,
                next_expected_seq,
                "wal cannot complete transaction after cursor"
            );
            return Err(WalError::IncompleteTxAfterCursor {
                after_seq,
                tx_id: tx.tx_id,
                next_expected_seq,
            }
            .into());
        }
        // A buffered but unfinished tail is reported and not emitted. A skipped partial tx
        // (emit == false) buffered nothing and is ignored.
        if tx.emit {
            debug_assert!(
                !current_tx_buf.is_empty(),
                "emit=true implies buffered entries"
            );
            let trimmed_start = current_tx_buf.first().unwrap().app_seq;
            let trimmed_end = current_tx_buf.last().unwrap().app_seq;
            let trimmed_count = current_tx_buf.len();
            // Only logged here; any actual truncation is left to the caller.
            let trim_to = trimmed_start.saturating_sub(1);
            incomplete_tail = Some(IncompleteTailTx {
                start_seq: trimmed_start,
                end_seq: trimmed_end,
                tx_id: tx.tx_id,
                next_expected_seq: trimmed_end.saturating_add(1),
            });
            warn!(
                trimmed_start,
                trimmed_end, trimmed_count, trim_to, "dropping incomplete wal tail"
            );
        }
    }
    // Emit the final held-back record, flagged as the end of the batch.
    if let Some(last) = pending_last.take() {
        on_event(&last.bytes, true)?;
        summary.emitted_events += 1;
        summary.last_emitted_seq = Some(last.app_seq);
    }
    ensure_recovered_through(after_seq, required_through_seq, &summary)?;
    let last_seq = summary.last_seen_seq.or(after_seq).unwrap_or(0);
    info!(
        last_seq,
        emitted_events = summary.emitted_events,
        "wal recovery complete"
    );
    Ok(ValidatedScan {
        summary,
        incomplete_tail,
    })
}
/// Dry-runs recovery over `(after_seq, up_to_seq]` without emitting any events.
///
/// Under `ExactContinuation` the cursor record is first checked with
/// `validate_exact_cursor_record` to decide whether a mid-transaction resume is allowed.
/// The full `scan_wal` validation then runs with a no-op callback.
///
/// # Errors
/// Any error from cursor validation or from `scan_wal`, including
/// `WalError::RecoveryIncomplete` when `required_through_seq` is not reached.
pub(crate) fn validate_recovery_window<T, S>(
    source: &S,
    after_seq: Option<u64>,
    up_to_seq: Option<u64>,
    start_policy: ScanStartPolicy,
    required_through_seq: Option<u64>,
) -> anyhow::Result<ValidatedScan>
where
    T: rkyv::Archive,
    ArchivedEnvelopeOf<T>:
        for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
    S: JournalScanSource,
{
    let mid_tx_resumable = match start_policy {
        ScanStartPolicy::AllowPartialLeftEdge => false,
        ScanStartPolicy::ExactContinuation => {
            validate_exact_cursor_record::<T, _>(source, after_seq)?
        }
    };
    scan_wal::<T, _, _>(
        source,
        after_seq,
        up_to_seq,
        start_policy,
        mid_tx_resumable,
        required_through_seq,
        |_, _| Ok(()),
    )
}
/// Replays complete transactions in `(after_seq, up_to_seq]` into `on_event`.
///
/// `on_event(bytes, end_of_batch)` receives the raw record bytes; `end_of_batch` is `true`
/// only for the final record delivered. No replay-head requirement is enforced.
///
/// # Errors
/// Any error from cursor validation, from `scan_wal`, or returned by `on_event`.
pub(crate) fn replay_wal<T, S, F>(
    source: &S,
    after_seq: Option<u64>,
    up_to_seq: Option<u64>,
    start_policy: ScanStartPolicy,
    on_event: F,
) -> anyhow::Result<ScanSummary>
where
    T: rkyv::Archive,
    ArchivedEnvelopeOf<T>:
        for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
    S: JournalScanSource,
    F: FnMut(&[u8], bool) -> anyhow::Result<()>,
{
    let mid_tx_resumable = match start_policy {
        ScanStartPolicy::AllowPartialLeftEdge => false,
        ScanStartPolicy::ExactContinuation => {
            validate_exact_cursor_record::<T, _>(source, after_seq)?
        }
    };
    scan_wal::<T, _, _>(
        source,
        after_seq,
        up_to_seq,
        start_policy,
        mid_tx_resumable,
        None,
        on_event,
    )
    .map(|scan| scan.summary)
}
fn validate_exact_cursor_record<T, S>(source: &S, after_seq: Option<u64>) -> anyhow::Result<bool>
where
T: rkyv::Archive,
ArchivedEnvelopeOf<T>:
for<'a> rkyv::bytecheck::CheckBytes<rkyv::api::high::HighValidator<'a, RkyvError>>,
S: JournalScanSource,
{
let Some(after_seq) = after_seq else {
return Ok(false);
};
let Some(bytes) = load_entry_at_seq(source, after_seq)? else {
return Ok(false);
};
let archived = access_archived_envelope::<T>(after_seq, &bytes)?;
let tx_id: u64 = archived.tx_id.into();
let tx_len: u16 = archived.tx_len.into();
let tx_ix: u16 = archived.tx_ix.into();
if tx_len == 0 || tx_ix >= tx_len {
error!(after_seq, tx_id, tx_len, tx_ix, "invalid wal tx framing");
return Err(WalError::InvalidTxFraming {
app_seq: after_seq,
tx_id,
tx_len,
tx_ix,
}
.into());
}
let next_expected_tx_ix = tx_ix.saturating_add(1);
if next_expected_tx_ix >= tx_len {
return Ok(false);
}
let next_expected_seq = after_seq.saturating_add(1);
let Some(next_bytes) = load_entry_at_seq(source, next_expected_seq)? else {
error!(
after_seq,
tx_id, next_expected_seq, "wal cannot complete transaction after cursor"
);
return Err(WalError::IncompleteTxAfterCursor {
after_seq,
tx_id,
next_expected_seq,
}
.into());
};
let next = access_archived_envelope::<T>(next_expected_seq, &next_bytes)?;
let next_tx_id: u64 = next.tx_id.into();
let next_tx_len: u16 = next.tx_len.into();
let next_tx_ix: u16 = next.tx_ix.into();
if next_tx_len == 0 || next_tx_ix >= next_tx_len {
error!(
app_seq = next_expected_seq,
tx_id = next_tx_id,
tx_len = next_tx_len,
tx_ix = next_tx_ix,
"invalid wal tx framing"
);
return Err(WalError::InvalidTxFraming {
app_seq: next_expected_seq,
tx_id: next_tx_id,
tx_len: next_tx_len,
tx_ix: next_tx_ix,
}
.into());
}
if next_tx_id != tx_id || next_tx_len != tx_len || next_tx_ix != next_expected_tx_ix {
error!(
after_seq,
tx_id,
next_expected_seq,
next_tx_id,
next_tx_len,
next_tx_ix,
"wal cannot complete transaction after cursor"
);
return Err(WalError::IncompleteTxAfterCursor {
after_seq,
tx_id,
next_expected_seq,
}
.into());
}
Ok(true)
}
/// Fully decodes one stored WAL record into an owned [`JournalEvent`].
///
/// The bytes are validated via `rkyv::api::high::access` and the payload is deserialized
/// into an owned `BookEventEnvelope`. The framing fields (`seq`, `tx_id`, `tx_len`,
/// `tx_ix`) are copied out. The returned envelope never carries a response callback.
///
/// # Errors
/// Propagates rkyv validation or deserialization failures unchanged (no WAL-specific mapping).
pub(crate) fn decode_journal_event(bytes: &[u8]) -> anyhow::Result<JournalEvent> {
    let archived_event =
        rkyv::api::high::access::<ArchivedEnvelopeOf<BookEventEnvelope>, RkyvError>(bytes)?;
    let owned_payload =
        rkyv::api::high::deserialize::<BookEventEnvelope, RkyvError>(&archived_event.payload)?;
    let event = Envelope {
        seq: archived_event.seq.into(),
        tx_id: archived_event.tx_id.into(),
        tx_len: archived_event.tx_len.into(),
        tx_ix: archived_event.tx_ix.into(),
        payload: owned_payload,
        response_cb: None,
    };
    Ok(event)
}
/// Verifies that the segments under `root_dir` cover one contiguous, non-overlapping
/// sequence range.
///
/// Segments are checked in the order `list_segment_dirs` returns them, with `load_info`
/// supplying each segment's `start_seq`/`end_seq`. A segment with no `start_seq` is not
/// compared against its predecessor. A segment with no `end_seq` does not advance the
/// running end, so the next segment is compared against the last known end.
///
/// # Errors
/// - `SegmentError::NoSegments` if `root_dir` contains no segment directories.
/// - `SegmentError::ContiguityGap` if a segment starts after `prev_end + 1`.
/// - `SegmentError::ContiguityOverlap` if a segment starts at or before the previous
///   segment's end. This includes any segment following one that ends at `u64::MAX`.
/// - Any error from `list_segment_dirs` or `load_info`.
pub(crate) fn validate_segment_sequence<F>(root_dir: &Path, mut load_info: F) -> anyhow::Result<()>
where
    F: FnMut(&str) -> anyhow::Result<super::journaler::SegmentInfo>,
{
    let dirs = list_segment_dirs(root_dir)?;
    if dirs.is_empty() {
        return Err(SegmentError::NoSegments {
            path: root_dir.display().to_string(),
        }
        .into());
    }
    // End sequence of the most recent segment that reported one.
    let mut prev_end: Option<u64> = None;
    for dir_name in &dirs {
        let info = load_info(dir_name)?;
        if let Some(prev_end_seq) = prev_end
            && let Some(next_start_seq) = info.start_seq
        {
            // `prev_end_seq + 1` would overflow when the predecessor ends at u64::MAX.
            // That panics in debug builds; in release it wraps to 0 and reports a bogus gap.
            // Nothing can legitimately follow such a segment, so any successor is an overlap.
            match prev_end_seq.checked_add(1) {
                Some(expected) if next_start_seq > expected => {
                    return Err(SegmentError::ContiguityGap {
                        prev_end_seq,
                        next_start_seq,
                        expected,
                    }
                    .into());
                }
                Some(expected) if next_start_seq == expected => {}
                _ => {
                    return Err(SegmentError::ContiguityOverlap {
                        prev_end_seq,
                        next_start_seq,
                    }
                    .into());
                }
            }
        }
        if let Some(end) = info.end_seq {
            prev_end = Some(end);
        }
    }
    Ok(())
}