betex 0.35.0

Betfair / Prediction Market Exchange
Documentation
use std::path::{Path, PathBuf};

use crate::{book::BookEventEnvelope, config::JournalConfig};

use super::{
    journal_scan::{
        JournalScanSource, ScanStartPolicy, decode_journal_event, replay_wal,
        validate_recovery_window, validate_segment_sequence,
    },
    journaler::{ENGINE_WAL_DB_NAME, ReadSegment, list_segment_dirs, segment_info_readonly},
};

pub use super::journal_scan::JournalEvent;

pub struct JournalReader {
    root_dir: PathBuf,
    config: JournalConfig,
}

impl JournalScanSource for JournalReader {
    fn root_dir(&self) -> &Path {
        &self.root_dir
    }

    fn visit_segment_entries<F>(
        &self,
        dir_name: &str,
        start_seq: u64,
        up_to_seq: Option<u64>,
        on_entry: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(u64, &[u8]) -> anyhow::Result<()>,
    {
        let path = self.root_dir.join(dir_name);
        let seg = ReadSegment::open(
            &path,
            ENGINE_WAL_DB_NAME,
            self.config.segment_map_size_bytes,
        )?;
        seg.visit_entries(start_seq, up_to_seq, on_entry)
    }
}

impl JournalReader {
    fn recover_events<F>(
        &self,
        after_seq: Option<u64>,
        up_to_seq: Option<u64>,
        start_policy: ScanStartPolicy,
        mut on_event: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
    {
        let validation = validate_recovery_window::<BookEventEnvelope, _>(
            self,
            after_seq,
            up_to_seq,
            start_policy,
            None,
        )?;
        let Some(validated_up_to_seq) = validation.summary.last_emitted_seq else {
            return Ok(());
        };
        replay_wal::<BookEventEnvelope, _, _>(
            self,
            after_seq,
            Some(validated_up_to_seq),
            start_policy,
            |bytes, end_of_batch| {
                let event = decode_journal_event(bytes)?;
                on_event(&event, end_of_batch)
            },
        )?;
        Ok(())
    }

    pub fn open(root_dir: impl AsRef<Path>, config: JournalConfig) -> anyhow::Result<Self> {
        let root_dir = root_dir.as_ref().to_path_buf();
        let reader = Self { root_dir, config };

        if reader.config.validate_on_startup {
            let dirs = list_segment_dirs(&reader.root_dir)?;
            if dirs.len() > 1 {
                validate_segment_sequence(&reader.root_dir, |dir_name| {
                    segment_info_readonly(
                        &reader.root_dir,
                        dir_name,
                        ENGINE_WAL_DB_NAME,
                        reader.config.segment_map_size_bytes,
                    )
                })?;
            }
        }

        Ok(reader)
    }

    pub fn head_seq(&self) -> anyhow::Result<Option<u64>> {
        Ok(validate_recovery_window::<BookEventEnvelope, _>(
            self,
            None,
            None,
            ScanStartPolicy::ExactContinuation,
            None,
        )?
        .summary
        .last_emitted_seq)
    }

    /// Recover the durable WAL prefix after an exact cursor.
    ///
    /// An incomplete transaction at the physical end of the WAL is trimmed when replay starts
    /// before that transaction begins. If `after_seq` falls inside that truncated tail
    /// transaction, recovery fails with `WalError::IncompleteTxAfterCursor`.
    pub fn recover_from<F>(&self, after_seq: Option<u64>, on_event: F) -> anyhow::Result<()>
    where
        F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
    {
        self.recover_events(
            after_seq,
            None,
            ScanStartPolicy::ExactContinuation,
            on_event,
        )
    }

    pub fn recover_range<F>(
        &self,
        after_seq: Option<u64>,
        up_to_seq: u64,
        on_event: F,
    ) -> anyhow::Result<()>
    where
        F: FnMut(&JournalEvent, bool) -> anyhow::Result<()>,
    {
        self.recover_events(
            after_seq,
            Some(up_to_seq),
            ScanStartPolicy::AllowPartialLeftEdge,
            on_event,
        )
    }
}