// betex 0.26.0
//
// Betfair / Prediction Market Exchange
// Documentation
use super::{
    Engine, EngineBuildCtx, EngineBuilder, EngineEvent, McBuilder, NcBuilder, ScBuilder,
    build_disruptor_pipeline, build_engine,
};

use crate::{
    disruptor::RingSlot,
    engine::{
        journal_scan::{IncompleteTailTx, JournalEvent},
        journaler::JournalHandler,
        root::EngineRoot,
    },
    error::WalError,
};

use disrupt_rs::{
    EventHandler, MultiConsumerBarrier, SingleConsumerBarrier, wait_strategies::WaitStrategy,
};
use tracing::info;

mod macros;

use macros::{impl_handler_tuple, impl_many_handler_builder};

/// Optional thread settings for a handler's processor, as `(core_id, thread_name)`.
///
/// `Some((id, name))` makes the processor builder name its thread `name` and pin it to core
/// `id`; `None` leaves the processor builder unchanged.
type CoreSettings = Option<(usize, &'static str)>;

/// Startup replay policy for a downstream handler.
///
/// `AfterSeq(seq)` is an exact continuation cursor: startup may trim a trailing incomplete
/// transaction only when `seq` is before that transaction starts. If `seq` falls inside the
/// truncated tail transaction, engine build fails instead of silently skipping it. The cursor may
/// be ahead of the engine snapshot as long as it does not exceed the recoverable startup head.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlerRecovery {
    /// No startup replay. The handler only sees events delivered through the live pipeline
    /// once the engine is running.
    LiveOnly,
    /// Before live processing, replay every recovered WAL event whose sequence is strictly
    /// greater than this cursor.
    AfterSeq(u64),
}

/// The WAL range that startup recovery scans and validates.
///
/// It combines the engine snapshot with every `AfterSeq` handler cursor. The initial values
/// come from `HandlerTuple::startup_recovery_plan` (generated by `impl_handler_tuple!`, not
/// shown here). Each handler then folds itself in via `RegisteredHandler::update_recovery_plan`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct StartupRecoveryPlan {
    /// The lowest cursor any consumer needs. Both WAL validation and replay start from here.
    scan_floor: u64,
    /// Set to the snapshot sequence when some handler's cursor lags behind the snapshot.
    /// Passed to `validate_events_from_required`, presumably so the WAL is required to cover
    /// the lagging handler's gap; confirm against the journaler.
    required_through_seq: Option<u64>,
}

/// A downstream handler bundled with its startup replay policy and optional thread settings.
///
/// It is `pub` because it appears in the public return types of `register_handler`, but it is
/// hidden from docs. Its constructor is private, so only this module builds values of it.
#[doc(hidden)]
pub struct RegisteredHandler<H> {
    // The user-supplied event handler.
    handler: H,
    // Which recovered WAL events (if any) are replayed into `handler` at startup.
    recovery: HandlerRecovery,
    // Optional `(core_id, thread_name)`; presumably applied via `configure_handler_builder`
    // when the handler is attached (attachment happens in the macros, not shown here).
    core_settings: CoreSettings,
}

impl<H> RegisteredHandler<H> {
    /// Bundles `handler` with its replay policy and optional thread settings.
    fn new(handler: H, recovery: HandlerRecovery, core_settings: CoreSettings) -> Self {
        Self {
            recovery,
            core_settings,
            handler,
        }
    }

    /// Folds this handler's cursor into the startup recovery plan.
    ///
    /// `LiveOnly` handlers contribute nothing. An `AfterSeq` cursor lowers the scan floor to
    /// itself. If the cursor trails the engine snapshot, the plan also records `snapshot_seq`
    /// as the sequence the WAL is required to reach.
    fn update_recovery_plan(&self, snapshot_seq: u64, plan: &mut StartupRecoveryPlan) {
        let cursor = match self.recovery {
            HandlerRecovery::LiveOnly => return,
            HandlerRecovery::AfterSeq(seq) => seq,
        };

        if cursor < plan.scan_floor {
            plan.scan_floor = cursor;
        }
        if snapshot_seq > cursor {
            plan.required_through_seq = Some(snapshot_seq);
        }
    }

    /// Rejects an `AfterSeq` cursor that startup cannot resume exactly.
    ///
    /// `LiveOnly` handlers always pass.
    ///
    /// # Errors
    ///
    /// - `WalError::CursorBeyondRecoverableHead` if the cursor is past `recoverable_head`.
    /// - `WalError::IncompleteTxAfterCursor` if the cursor falls inside the trailing
    ///   incomplete transaction.
    fn validate_cursor(
        &self,
        incomplete_tail: Option<IncompleteTailTx>,
        recoverable_head: u64,
    ) -> anyhow::Result<()> {
        let after_seq = match self.recovery {
            HandlerRecovery::LiveOnly => return Ok(()),
            HandlerRecovery::AfterSeq(seq) => seq,
        };

        if after_seq > recoverable_head {
            let beyond_head = WalError::CursorBeyondRecoverableHead {
                after_seq,
                recoverable_head,
            };
            return Err(beyond_head.into());
        }

        incomplete_tail.map_or(Ok(()), |tail| {
            validate_cursor_against_incomplete_tail(after_seq, tail)
        })
    }
}

impl<H: EventHandler<EngineEvent>> RegisteredHandler<H> {
    /// Forwards one recovered WAL event to this handler if its cursor asks for it.
    ///
    /// The event is skipped for `LiveOnly` handlers and when its sequence is at or below the
    /// `AfterSeq` cursor. `replay_event` is a lazily-filled slot that the caller creates fresh
    /// for each event. The first handler that needs the event fills it with a `RingSlot`
    /// wrapping a clone of `event`, and later handlers given the same slot reuse it.
    fn replay_event(
        &mut self,
        event: &JournalEvent,
        sequence: disrupt_rs::Sequence,
        end_of_batch: bool,
        replay_event: &mut Option<EngineEvent>,
    ) {
        let wants_event = match self.recovery {
            HandlerRecovery::LiveOnly => false,
            HandlerRecovery::AfterSeq(after_seq) => event.seq > after_seq,
        };
        if !wants_event {
            return;
        }

        let slot = replay_event.get_or_insert_with(|| RingSlot::new(event.clone()));
        self.handler.on_event(slot, sequence, end_of_batch);
    }
}

/// Startup-recovery operations the engine builder runs across every registered handler.
///
/// Presumably implemented for tuples of `RegisteredHandler`s by `impl_handler_tuple!`
/// (see `macros`).
trait HandlerTuple {
    /// Builds one recovery plan from the snapshot version `snapshot_seq` and every
    /// handler's cursor.
    fn startup_recovery_plan(&self, snapshot_seq: u64) -> StartupRecoveryPlan;
    /// Checks every handler's cursor against the trailing incomplete transaction (if any) and
    /// the recoverable startup head.
    fn validate_cursors(
        &self,
        incomplete_tail: Option<IncompleteTailTx>,
        recoverable_head: u64,
    ) -> anyhow::Result<()>;
    /// Offers one recovered WAL event to the handlers. `replay_event` is the lazily-built slot
    /// for this event, which the caller creates fresh for each event.
    fn replay_event(
        &mut self,
        event: &JournalEvent,
        sequence: disrupt_rs::Sequence,
        end_of_batch: bool,
        replay_event: &mut Option<EngineEvent>,
    );
}

/// Attaches every handler in a tuple to the disruptor builder and returns the resulting
/// single producer.
///
/// `Barrier` is the barrier type the producer ends up with. The `impl_handler_tuple!`
/// invocations in this file pass `SingleConsumerBarrier` for one handler and
/// `MultiConsumerBarrier` for two or more.
trait AttachHandlers<W>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    type Barrier: disrupt_rs::Barrier + 'static;

    /// Consumes the handlers and `builder`, returning the producer used to publish events.
    fn attach(
        self,
        builder: NcBuilder<W>,
    ) -> disrupt_rs::SingleProducer<EngineEvent, Self::Barrier, W>;
}

// Support up to 16 concrete downstream handlers without dynamic dispatch.
// Each line covers one tuple arity; the identifier pairs name a type parameter and a binding
// for each handler. Presumably each line implements `HandlerTuple` and `AttachHandlers` for that
// arity (see `macros`). One handler uses `SingleConsumerBarrier`; two or more use
// `MultiConsumerBarrier`.
impl_handler_tuple!(SingleConsumerBarrier => H1 h1);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15, H16 h16);

// Builder methods for the multi-handler states. Each row pairs a tuple of N already-registered
// handlers (N = 2..=15) with the type parameter (after `=>`) for the next handler to register.
// The single-handler case is written by hand below. Presumably the macro mirrors it, providing
// `register_handler` (and `build`) for each arity; see `macros`.
impl_many_handler_builder!(
    (H1 h1, H2 h2) => H3,
    (H1 h1, H2 h2, H3 h3) => H4,
    (H1 h1, H2 h2, H3 h3, H4 h4) => H5,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5) => H6,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6) => H7,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7) => H8,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8) => H9,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9) => H10,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10) => H11,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11) => H12,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12) => H13,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13) => H14,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14) => H15,
    (H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15) => H16,
);

impl<W: WaitStrategy + Clone + Send + 'static, R> EngineBuilder<ScBuilder<W>, W, R> {
    /// Replaces the wait strategy while the builder is in the single-consumer state.
    ///
    /// The registered handlers `R` carry over unchanged. The new strategy is what
    /// `build_engine_with_handlers` later passes to `build_disruptor_pipeline`.
    pub fn with_wait_strategy<W2: WaitStrategy + Clone + Send + 'static>(
        self,
        wait_strategy: W2,
    ) -> EngineBuilder<ScBuilder<W2>, W2, R> {
        self.set_wait_strategy(wait_strategy)
    }
}

impl<W: WaitStrategy + Clone + Send + 'static, R> EngineBuilder<McBuilder<W>, W, R> {
    /// Replaces the wait strategy while the builder is in the multi-consumer state.
    ///
    /// The registered handlers `R` carry over unchanged. The new strategy is what
    /// `build_engine_with_handlers` later passes to `build_disruptor_pipeline`.
    pub fn with_wait_strategy<W2: WaitStrategy + Clone + Send + 'static>(
        self,
        wait_strategy: W2,
    ) -> EngineBuilder<McBuilder<W2>, W2, R> {
        self.set_wait_strategy(wait_strategy)
    }
}

impl<W> EngineBuilder<NcBuilder<W>, W, ()>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    /// Registers the first downstream handler, moving the builder into the single-consumer
    /// state.
    ///
    /// `recovery` selects which recovered WAL events are replayed into `handler` at startup.
    /// `core_settings` optionally names the handler's processor thread and pins it to a core.
    pub fn register_handler<H>(
        self,
        handler: H,
        recovery: HandlerRecovery,
        core_settings: CoreSettings,
    ) -> EngineBuilder<ScBuilder<W>, W, (RegisteredHandler<H>,)>
    where
        H: EventHandler<EngineEvent> + Send + 'static,
    {
        let registered = RegisteredHandler::new(handler, recovery, core_settings);
        self.map_handlers::<ScBuilder<W>, _>(move |()| (registered,))
    }
}

impl<W, H1> EngineBuilder<ScBuilder<W>, W, (RegisteredHandler<H1>,)>
where
    W: WaitStrategy + Clone + Send + 'static,
    H1: EventHandler<EngineEvent> + Send + 'static,
{
    /// Registers a second downstream handler, moving the builder into the multi-consumer
    /// state.
    ///
    /// The new handler is appended after the existing one. `recovery` selects which recovered
    /// WAL events are replayed into `handler` at startup. `core_settings` optionally names its
    /// processor thread and pins it to a core.
    pub fn register_handler<H2>(
        self,
        handler: H2,
        recovery: HandlerRecovery,
        core_settings: CoreSettings,
    ) -> EngineBuilder<McBuilder<W>, W, (RegisteredHandler<H1>, RegisteredHandler<H2>)>
    where
        H2: EventHandler<EngineEvent> + Send + 'static,
    {
        let appended = RegisteredHandler::new(handler, recovery, core_settings);
        self.map_handlers::<McBuilder<W>, _>(move |(existing,)| (existing, appended))
    }

    /// Runs WAL startup recovery and builds the engine with the single registered handler.
    ///
    /// # Errors
    ///
    /// Fails if the WAL cannot be opened, validated or replayed; if the snapshot or the
    /// handler's cursor cannot be resumed exactly; if startup journal maintenance fails; or if
    /// engine construction fails.
    pub fn build(self) -> anyhow::Result<Engine> {
        build_engine_with_handlers(self)
    }
}

/// Shared `build` implementation for every handler-tuple arity.
///
/// Startup runs in this order:
/// 1. derive a recovery plan from the root's current version and the handler cursors;
/// 2. open the engine WAL and validate it from the plan's scan floor;
/// 3. reject a snapshot or handler cursor that cannot be resumed exactly;
/// 4. replay validated events into the root and into handlers that asked for history;
/// 5. run journal startup maintenance, then full segment validation if configured;
/// 6. build the disruptor pipeline, attach the handlers, and assemble the engine.
///
/// Replay (4) runs before maintenance (5). Presumably this is so maintenance cannot discard
/// events that handlers still needed; confirm against `apply_startup_maintenance`.
///
/// # Errors
///
/// Propagates any error from opening, validating or replaying the WAL, from cursor
/// validation, from startup maintenance or segment validation, or from `build_engine`.
fn build_engine_with_handlers<B, W, Hs>(builder: EngineBuilder<B, W, Hs>) -> anyhow::Result<Engine>
where
    W: WaitStrategy + Clone + Send + 'static,
    Hs: HandlerTuple + AttachHandlers<W>,
{
    let EngineBuilder {
        config,
        mut root,
        wal_path,
        mut registered_handlers,
        shutdown,
        metrics,
        pinning,
        wait_strategy,
        _builder_state: _,
    } = builder;

    // The root's version before any replay; handler cursors are planned relative to it.
    let snapshot_seq = root.version();
    let recovery_plan = registered_handlers.startup_recovery_plan(snapshot_seq);
    info!(wal_path = ?wal_path, "opening engine wal");
    let mut journaler = JournalHandler::open_engine(&wal_path, config.journal.clone())?;
    let validation = journaler.validate_events_from_required(
        Some(recovery_plan.scan_floor),
        recovery_plan.required_through_seq,
    )?;
    // `summary.last_emitted_seq` acts as the validated WAL head for both cursor checks and replay.
    validate_startup_recovery_cursors(
        validation.incomplete_tail,
        validation.summary.last_emitted_seq,
        snapshot_seq,
        &registered_handlers,
    )?;
    replay_startup_recovery(
        &journaler,
        &mut root,
        &mut registered_handlers,
        recovery_plan,
        validation.summary.last_emitted_seq,
    )?;
    journaler.apply_startup_maintenance()?;
    if config.journal.validate_on_startup {
        journaler.validate_segments()?;
    }

    // `builder` here is the disruptor builder; it shadows the engine builder consumed above.
    let (wal_poller, builder, durability_gate) =
        build_disruptor_pipeline(&config, wait_strategy.clone());
    let producer = registered_handlers.attach(builder);

    let ctx = EngineBuildCtx {
        config,
        root,
        wal_poller,
        journaler,
        durability_gate,
        shutdown,
        metrics,
        pinning,
        wait_strategy,
    };

    build_engine(producer, ctx)
}

fn replay_startup_recovery(
    journaler: &JournalHandler,
    root: &mut EngineRoot,
    handlers: &mut impl HandlerTuple,
    recovery_plan: StartupRecoveryPlan,
    validated_up_to_seq: Option<u64>,
) -> anyhow::Result<()> {
    let Some(validated_up_to_seq) = validated_up_to_seq else {
        info!(
            scan_floor = recovery_plan.scan_floor,
            "no complete wal transactions found during startup recovery"
        );
        return Ok(());
    };

    let summary = journaler.replay_events_from(
        Some(recovery_plan.scan_floor),
        Some(validated_up_to_seq),
        |event, end_of_batch| {
            root.observe_tx_id(event.tx_id);
            if event.seq > root.version() {
                root.assert_next_seq(event.seq);
                root.apply_single(&event.payload);
            }

            let sequence = i64::try_from(event.seq).unwrap_or(i64::MAX);
            let mut replay_event = None;
            handlers.replay_event(event, sequence, end_of_batch, &mut replay_event);

            Ok(())
        },
    )?;

    if summary.emitted_events == 0 {
        info!(
            scan_floor = recovery_plan.scan_floor,
            "no complete wal transactions found during startup recovery"
        );
        return Ok(());
    }

    info!(
        scan_floor = recovery_plan.scan_floor,
        startup_head = ?summary.last_emitted_seq,
        emitted_events = summary.emitted_events,
        root_seq = root.version(),
        "completed startup wal recovery"
    );
    Ok(())
}

/// Fails when `after_seq` lands inside the trailing incomplete transaction.
///
/// A cursor within `start_seq..=end_seq` claims to have consumed part of a transaction that
/// startup may trim, so it cannot be resumed exactly. Cursors outside that span pass.
///
/// # Errors
///
/// Returns `WalError::IncompleteTxAfterCursor` with the cursor, the transaction id and the
/// transaction's next expected sequence.
fn validate_cursor_against_incomplete_tail(
    after_seq: u64,
    incomplete_tail: IncompleteTailTx,
) -> anyhow::Result<()> {
    let inside_torn_tx =
        incomplete_tail.start_seq <= after_seq && after_seq <= incomplete_tail.end_seq;
    if !inside_torn_tx {
        return Ok(());
    }

    let torn = WalError::IncompleteTxAfterCursor {
        after_seq,
        tx_id: incomplete_tail.tx_id,
        next_expected_seq: incomplete_tail.next_expected_seq,
    };
    Err(torn.into())
}

/// Validates the engine snapshot and every handler cursor before replay.
///
/// The snapshot sequence is checked against the trailing incomplete transaction just like a
/// handler cursor. Handler cursors are then bounded by the recoverable head, which is the
/// larger of the snapshot sequence and the validated WAL head (the snapshot alone when the
/// WAL has no validated head).
///
/// # Errors
///
/// Returns the first failure from the snapshot check or from `HandlerTuple::validate_cursors`.
fn validate_startup_recovery_cursors(
    incomplete_tail: Option<IncompleteTailTx>,
    validated_wal_head_seq: Option<u64>,
    snapshot_seq: u64,
    handlers: &impl HandlerTuple,
) -> anyhow::Result<()> {
    if let Some(tail) = incomplete_tail {
        validate_cursor_against_incomplete_tail(snapshot_seq, tail)?;
    }

    let recoverable_head =
        validated_wal_head_seq.map_or(snapshot_seq, |wal_head| wal_head.max(snapshot_seq));
    handlers.validate_cursors(incomplete_tail, recoverable_head)
}

/// Applies optional thread naming and core pinning to a handler's processor builder.
///
/// `Some((core_id, name))` sets the thread name to `name` and pins the processor to
/// `core_id`. `None` returns the builder untouched.
fn configure_handler_builder<B, W>(builder: B, core_settings: CoreSettings) -> B
where
    B: disrupt_rs::ProcessorSettings<EngineEvent, W>,
    W: WaitStrategy,
{
    let Some((core_id, thread_name)) = core_settings else {
        return builder;
    };
    builder.thread_name(thread_name).pin_at_core(core_id)
}