use std::path::{Path, PathBuf};
use crate::{book::BookEventEnvelope, config::JournalConfig};
use super::{
journal_scan::{
JournalScanSource, ScanStartPolicy, decode_journal_event, replay_wal,
validate_recovery_window, validate_segment_sequence,
},
journaler::{ENGINE_WAL_DB_NAME, ReadSegment, list_segment_dirs, segment_info_readonly},
};
pub use super::journal_scan::JournalEvent;
/// Reader over an on-disk journal made of segment directories.
///
/// Implements `JournalScanSource`, so the scan helpers from `journal_scan`
/// (`validate_recovery_window`, `replay_wal`) can walk its segments.
pub struct JournalReader {
/// Directory that contains the segment directories; each segment is opened
/// at `root_dir.join(dir_name)`.
root_dir: PathBuf,
/// Journal settings. This file reads `segment_map_size_bytes` (passed when
/// opening segments) and `validate_on_startup` (checked in `open`).
config: JournalConfig,
}
/// Lets the shared scan routines in `journal_scan` read segments through a
/// `JournalReader`.
impl JournalScanSource for JournalReader {
    /// Returns the directory this reader was opened on.
    fn root_dir(&self) -> &Path {
        self.root_dir.as_path()
    }

    /// Opens the segment stored under `dir_name` and visits its entries.
    ///
    /// The segment is opened with `ReadSegment::open` at
    /// `root_dir.join(dir_name)`, using `ENGINE_WAL_DB_NAME` and the
    /// configured `segment_map_size_bytes`. `start_seq`, `up_to_seq` and
    /// `on_entry` are handed to `ReadSegment::visit_entries` unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the segment, or whatever
    /// `visit_entries` reports (presumably including errors raised by
    /// `on_entry` — confirm against `ReadSegment`).
    fn visit_segment_entries<F>(
        &self,
        dir_name: &str,
        start_seq: u64,
        up_to_seq: Option<u64>,
        on_entry: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(u64, &[u8]) -> anyhow::Result<()>,
    {
        let map_size = self.config.segment_map_size_bytes;
        let segment_path = self.root_dir.join(dir_name);
        let segment = ReadSegment::open(&segment_path, ENGINE_WAL_DB_NAME, map_size)?;
        segment.visit_entries(start_seq, up_to_seq, on_entry)
    }
}
impl JournalReader {
fn recover_events<F>(
&self,
after_seq: Option<u64>,
up_to_seq: Option<u64>,
start_policy: ScanStartPolicy,
mut on_event: F,
) -> anyhow::Result<()>
where
F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
{
let validation = validate_recovery_window::<BookEventEnvelope, _>(
self,
after_seq,
up_to_seq,
start_policy,
None,
)?;
let Some(validated_up_to_seq) = validation.summary.last_emitted_seq else {
return Ok(());
};
replay_wal::<BookEventEnvelope, _, _>(
self,
after_seq,
Some(validated_up_to_seq),
start_policy,
|bytes, end_of_batch| {
let event = decode_journal_event(bytes)?;
on_event(&event, end_of_batch)
},
)?;
Ok(())
}
pub fn open(root_dir: impl AsRef<Path>, config: JournalConfig) -> anyhow::Result<Self> {
let root_dir = root_dir.as_ref().to_path_buf();
let reader = Self { root_dir, config };
if reader.config.validate_on_startup {
let dirs = list_segment_dirs(&reader.root_dir)?;
if dirs.len() > 1 {
validate_segment_sequence(&reader.root_dir, |dir_name| {
segment_info_readonly(
&reader.root_dir,
dir_name,
ENGINE_WAL_DB_NAME,
reader.config.segment_map_size_bytes,
)
})?;
}
}
Ok(reader)
}
pub fn head_seq(&self) -> anyhow::Result<Option<u64>> {
Ok(validate_recovery_window::<BookEventEnvelope, _>(
self,
None,
None,
ScanStartPolicy::ExactContinuation,
None,
)?
.summary
.last_emitted_seq)
}
pub fn recover_from<F>(&self, after_seq: Option<u64>, on_event: F) -> anyhow::Result<()>
where
F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
{
self.recover_events(
after_seq,
None,
ScanStartPolicy::ExactContinuation,
on_event,
)
}
pub fn recover_range<F>(
&self,
after_seq: Option<u64>,
up_to_seq: u64,
on_event: F,
) -> anyhow::Result<()>
where
F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
{
self.recover_events(
after_seq,
Some(up_to_seq),
ScanStartPolicy::AllowPartialLeftEdge,
on_event,
)
}
}