use super::{
Engine, EngineBuildCtx, EngineBuilder, EngineEvent, McBuilder, NcBuilder, ScBuilder,
build_disruptor_pipeline, build_engine,
};
use crate::{
disruptor::RingSlot,
engine::{
journal_scan::{IncompleteTailTx, JournalEvent},
journaler::JournalHandler,
root::EngineRoot,
},
error::WalError,
};
use disrupt_rs::{
EventHandler, EventPoller, MultiConsumerBarrier, MultiConsumerDependentsBarrier,
SingleConsumerBarrier, wait_strategies::WaitStrategy,
};
use tracing::info;
mod macros;
use macros::{impl_handler_tuple, impl_many_handler_builder};
type CoreSettings = Option<(usize, &'static str)>;
type HandlerPoller<W> =
EventPoller<EngineEvent, MultiConsumerDependentsBarrier, <W as WaitStrategy>::Notifier>;
type HandlerProducer<W> = disrupt_rs::SingleProducer<EngineEvent, MultiConsumerBarrier, W>;
type EngineWithPoller<W> = (Engine, HandlerPoller<W>);
type AttachWithPollerResult<W> = (HandlerPoller<W>, HandlerProducer<W>);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandlerRecovery {
LiveOnly,
AfterSeq(u64),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct StartupRecoveryPlan {
scan_floor: u64,
required_through_seq: Option<u64>,
}
#[doc(hidden)]
pub struct RegisteredHandler<H> {
handler: H,
recovery: HandlerRecovery,
core_settings: CoreSettings,
}
impl<H> RegisteredHandler<H> {
fn new(handler: H, recovery: HandlerRecovery, core_settings: CoreSettings) -> Self {
Self {
handler,
recovery,
core_settings,
}
}
fn update_recovery_plan(&self, snapshot_seq: u64, plan: &mut StartupRecoveryPlan) {
if let HandlerRecovery::AfterSeq(after_seq) = self.recovery {
plan.scan_floor = plan.scan_floor.min(after_seq);
if after_seq < snapshot_seq {
plan.required_through_seq = Some(snapshot_seq);
}
}
}
fn validate_cursor(
&self,
incomplete_tail: Option<IncompleteTailTx>,
recoverable_head: u64,
) -> anyhow::Result<()> {
let HandlerRecovery::AfterSeq(after_seq) = self.recovery else {
return Ok(());
};
if after_seq > recoverable_head {
return Err(WalError::CursorBeyondRecoverableHead {
after_seq,
recoverable_head,
}
.into());
}
if let Some(incomplete_tail) = incomplete_tail {
validate_cursor_against_incomplete_tail(after_seq, incomplete_tail)?;
}
Ok(())
}
}
impl<H> RegisteredHandler<H>
where
H: EventHandler<EngineEvent>,
{
fn replay_event(
&mut self,
event: &JournalEvent,
sequence: disrupt_rs::Sequence,
end_of_batch: bool,
replay_event: &mut Option<EngineEvent>,
) {
let HandlerRecovery::AfterSeq(after_seq) = self.recovery else {
return;
};
if event.seq <= after_seq {
return;
}
let replay_event = replay_event.get_or_insert_with(|| RingSlot::new(event.clone()));
self.handler.on_event(replay_event, sequence, end_of_batch);
}
}
trait HandlerTuple {
fn startup_recovery_plan(&self, snapshot_seq: u64) -> StartupRecoveryPlan;
fn validate_cursors(
&self,
incomplete_tail: Option<IncompleteTailTx>,
recoverable_head: u64,
) -> anyhow::Result<()>;
fn replay_event(
&mut self,
event: &JournalEvent,
sequence: disrupt_rs::Sequence,
end_of_batch: bool,
replay_event: &mut Option<EngineEvent>,
);
}
trait AttachHandlers<W>
where
W: WaitStrategy + Clone + Send + 'static,
{
type Barrier: disrupt_rs::Barrier + 'static;
fn attach(
self,
builder: NcBuilder<W>,
) -> disrupt_rs::SingleProducer<EngineEvent, Self::Barrier, W>;
}
trait AttachHandlersWithPoller<W>
where
W: WaitStrategy + Clone + Send + 'static,
{
fn attach_with_poller(self, builder: NcBuilder<W>) -> AttachWithPollerResult<W>;
}
impl_handler_tuple!(SingleConsumerBarrier => H1 h1);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15);
impl_handler_tuple!(MultiConsumerBarrier => H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15, H16 h16);
impl_many_handler_builder!(
(H1 h1, H2 h2) => H3,
(H1 h1, H2 h2, H3 h3) => H4,
(H1 h1, H2 h2, H3 h3, H4 h4) => H5,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5) => H6,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6) => H7,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7) => H8,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8) => H9,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9) => H10,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10) => H11,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11) => H12,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12) => H13,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13) => H14,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14) => H15,
(H1 h1, H2 h2, H3 h3, H4 h4, H5 h5, H6 h6, H7 h7, H8 h8, H9 h9, H10 h10, H11 h11, H12 h12, H13 h13, H14 h14, H15 h15) => H16,
);
impl<W, R> EngineBuilder<ScBuilder<W>, W, R>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn with_wait_strategy<W2>(self, wait_strategy: W2) -> EngineBuilder<ScBuilder<W2>, W2, R>
where
W2: WaitStrategy + Clone + Send + 'static,
{
self.set_wait_strategy(wait_strategy)
}
}
impl<W, R> EngineBuilder<McBuilder<W>, W, R>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn with_wait_strategy<W2>(self, wait_strategy: W2) -> EngineBuilder<McBuilder<W2>, W2, R>
where
W2: WaitStrategy + Clone + Send + 'static,
{
self.set_wait_strategy(wait_strategy)
}
}
impl<W> EngineBuilder<NcBuilder<W>, W, ()>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn register_handler<H>(
self,
handler: H,
recovery: HandlerRecovery,
core_settings: CoreSettings,
) -> EngineBuilder<ScBuilder<W>, W, (RegisteredHandler<H>,)>
where
H: EventHandler<EngineEvent> + Send + 'static,
{
self.map_handlers::<ScBuilder<W>, _>(|()| {
(RegisteredHandler::new(handler, recovery, core_settings),)
})
}
}
impl<W, H1> EngineBuilder<ScBuilder<W>, W, (RegisteredHandler<H1>,)>
where
W: WaitStrategy + Clone + Send + 'static,
H1: EventHandler<EngineEvent> + Send + 'static,
{
pub fn register_handler<H2>(
self,
handler: H2,
recovery: HandlerRecovery,
core_settings: CoreSettings,
) -> EngineBuilder<McBuilder<W>, W, (RegisteredHandler<H1>, RegisteredHandler<H2>)>
where
H2: EventHandler<EngineEvent> + Send + 'static,
{
self.map_handlers::<McBuilder<W>, _>(|(h1,)| {
(h1, RegisteredHandler::new(handler, recovery, core_settings))
})
}
pub fn build(self) -> anyhow::Result<Engine> {
build_engine_with_handlers(self)
}
pub fn build_with_poller(self) -> anyhow::Result<EngineWithPoller<W>> {
build_engine_with_handlers_and_poller(self)
}
}
fn build_engine_with_handlers<B, W, Hs>(builder: EngineBuilder<B, W, Hs>) -> anyhow::Result<Engine>
where
W: WaitStrategy + Clone + Send + 'static,
Hs: HandlerTuple + AttachHandlers<W>,
{
prepare_engine_build(builder, |registered_handlers, builder, ctx| {
let producer = registered_handlers.attach(builder);
build_engine(producer, ctx)
})
}
fn prepare_engine_build<B, W, Hs, R>(
builder: EngineBuilder<B, W, Hs>,
finish: impl FnOnce(Hs, NcBuilder<W>, EngineBuildCtx<W>) -> anyhow::Result<R>,
) -> anyhow::Result<R>
where
W: WaitStrategy + Clone + Send + 'static,
Hs: HandlerTuple,
{
let EngineBuilder {
config,
mut root,
wal_path,
mut registered_handlers,
shutdown,
metrics,
pinning,
wait_strategy,
_builder_state: _,
} = builder;
let snapshot_seq = root.version();
let recovery_plan = registered_handlers.startup_recovery_plan(snapshot_seq);
info!(wal_path = ?wal_path, "opening engine wal");
let mut journaler = JournalHandler::open_engine(&wal_path, config.journal.clone())?;
let validation = journaler.validate_events_from_required(
Some(recovery_plan.scan_floor),
recovery_plan.required_through_seq,
)?;
validate_startup_recovery_cursors(
validation.incomplete_tail,
validation.summary.last_emitted_seq,
snapshot_seq,
®istered_handlers,
)?;
replay_startup_recovery(
&journaler,
&mut root,
&mut registered_handlers,
recovery_plan,
validation.summary.last_emitted_seq,
)?;
journaler.apply_startup_maintenance()?;
if config.journal.validate_on_startup {
journaler.validate_segments()?;
}
let (wal_poller, builder, durability_gate) =
build_disruptor_pipeline(&config, wait_strategy.clone());
let ctx = EngineBuildCtx {
config,
root,
wal_poller,
journaler,
durability_gate,
shutdown,
metrics,
pinning,
wait_strategy,
};
finish(registered_handlers, builder, ctx)
}
fn build_engine_with_handlers_and_poller<B, W, Hs>(
builder: EngineBuilder<B, W, Hs>,
) -> anyhow::Result<EngineWithPoller<W>>
where
W: WaitStrategy + Clone + Send + 'static,
Hs: HandlerTuple + AttachHandlersWithPoller<W>,
{
prepare_engine_build(builder, |registered_handlers, builder, ctx| {
let (poller, producer) = registered_handlers.attach_with_poller(builder);
let engine = build_engine(producer, ctx)?;
Ok((engine, poller))
})
}
fn replay_startup_recovery(
journaler: &JournalHandler,
root: &mut EngineRoot,
handlers: &mut impl HandlerTuple,
recovery_plan: StartupRecoveryPlan,
validated_up_to_seq: Option<u64>,
) -> anyhow::Result<()> {
let Some(validated_up_to_seq) = validated_up_to_seq else {
info!(
scan_floor = recovery_plan.scan_floor,
"no complete wal transactions found during startup recovery"
);
return Ok(());
};
let summary = journaler.replay_events_from(
Some(recovery_plan.scan_floor),
Some(validated_up_to_seq),
|event, end_of_batch| {
root.observe_tx_id(event.tx_id);
if event.seq > root.version() {
root.assert_next_seq(event.seq);
root.apply_single(&event.payload);
}
let sequence = i64::try_from(event.seq).unwrap_or(i64::MAX);
let mut replay_event = None;
handlers.replay_event(event, sequence, end_of_batch, &mut replay_event);
Ok(())
},
)?;
if summary.emitted_events == 0 {
info!(
scan_floor = recovery_plan.scan_floor,
"no complete wal transactions found during startup recovery"
);
return Ok(());
}
info!(
scan_floor = recovery_plan.scan_floor,
startup_head = ?summary.last_emitted_seq,
emitted_events = summary.emitted_events,
root_seq = root.version(),
"completed startup wal recovery"
);
Ok(())
}
fn validate_cursor_against_incomplete_tail(
after_seq: u64,
incomplete_tail: IncompleteTailTx,
) -> anyhow::Result<()> {
if (incomplete_tail.start_seq..=incomplete_tail.end_seq).contains(&after_seq) {
return Err(WalError::IncompleteTxAfterCursor {
after_seq,
tx_id: incomplete_tail.tx_id,
next_expected_seq: incomplete_tail.next_expected_seq,
}
.into());
}
Ok(())
}
fn validate_startup_recovery_cursors(
incomplete_tail: Option<IncompleteTailTx>,
validated_wal_head_seq: Option<u64>,
snapshot_seq: u64,
handlers: &impl HandlerTuple,
) -> anyhow::Result<()> {
if let Some(incomplete_tail) = incomplete_tail {
validate_cursor_against_incomplete_tail(snapshot_seq, incomplete_tail)?;
}
let recoverable_head = snapshot_seq.max(validated_wal_head_seq.unwrap_or(0));
handlers.validate_cursors(incomplete_tail, recoverable_head)
}
fn configure_handler_builder<B, W>(builder: B, core_settings: CoreSettings) -> B
where
B: disrupt_rs::ProcessorSettings<EngineEvent, W>,
W: WaitStrategy,
{
match core_settings {
Some((id, name)) => builder.thread_name(name).pin_at_core(id),
None => builder,
}
}