use super::error::EngineError;
use crate::{
book::{
BookEvent, BookEventEnvelope,
protocol::{
command::{Command, CommandKind},
reject::RejectReason,
response::Response,
},
},
config::{Config, ThreadPinning},
disruptor::{Envelope, ResponseCallback, RingSlot},
engine::{
journaler::{JournalHandler, WalMaintenanceHandle},
root::EngineRoot,
state::{EngineState, Market, MarketConfig},
},
error::RuntimeError,
types::{MarketId, MarketKind, MarketPhase},
};
use std::{
collections::VecDeque,
marker::PhantomData,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use disrupt_rs::{
BusySpin, EventPoller, GatedSequence, MultiConsumerDependentsBarrier, Producer,
SingleProducerBarrier, build_single_producer,
builder::{MC, NC, SC, single::SPBuilder},
wait_strategies::WaitStrategy,
};
use tracing::{error, info, trace, warn};
mod tuple;
pub use tuple::HandlerRecovery;
#[doc(hidden)]
pub use tuple::RegisteredHandler;
/// Slot type stored in the engine's ring buffer: a [`RingSlot`] wrapping a [`BookEventEnvelope`].
pub type EngineEvent = RingSlot<BookEventEnvelope>;
// Single-producer builder aliases over `EngineEvent`; they differ only in the first
// type-state marker (`NC` / `SC` / `MC`).
// NOTE(review): the markers presumably stand for "no", "single" and "multiple" registered
// consumers in `disrupt_rs` — confirm against the crate documentation.
type NcBuilder<W> = SPBuilder<NC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type ScBuilder<W> = SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type McBuilder<W> = SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>;
/// Initial capacity reserved for the engine loop's internal worklist (`VecDeque<MarketId>`).
///
/// This is only a pre-allocation hint passed to `VecDeque::with_capacity`, not a hard limit.
const INTERNAL_WORKLIST_CAPACITY: usize = 100;
/// Type-state builder that accumulates everything needed to start an engine.
///
/// * `B`  – marker for the current disruptor builder state (only carried in `PhantomData`).
/// * `W`  – wait strategy handed to the disruptor (defaults to [`BusySpin`]).
/// * `Hs` – the registered handlers collected so far (defaults to `()`, i.e. none).
pub struct EngineBuilder<B, W = BusySpin, Hs = ()>
where
W: WaitStrategy + Clone + Send + 'static,
{
/// Engine configuration supplied by the caller.
config: Config,
/// Initial engine root (state plus version) the engine will start from.
root: EngineRoot,
/// WAL location, copied from the constructor's `wal_path` argument.
wal_path: PathBuf,
/// Handlers registered so far; the concrete type changes via `map_handlers`.
registered_handlers: Hs,
/// Shared shutdown flag; clones are handed out by `shutdown_signal`.
shutdown: Arc<AtomicBool>,
/// Metrics handle obtained from the global metrics registry.
metrics: crate::metrics::Metrics,
/// Thread pinning settings (`config.pinning`, or the default when unset).
pinning: ThreadPinning,
/// Wait strategy instance to be used for the disruptor.
wait_strategy: W,
/// Zero-sized marker tying the builder to its type-state `B` without storing a `B`.
_builder_state: PhantomData<fn() -> B>,
}
impl EngineBuilder<NcBuilder<BusySpin>, BusySpin> {
    /// Creates a builder whose initial state contains one market per supplied
    /// [`MarketConfig`], starting from root version `0`.
    ///
    /// `wal_path` is stored as-is for later use by the builder.
    pub fn with_markets(
        config: Config,
        markets: impl IntoIterator<Item = MarketConfig>,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        let mut state = EngineState::new(config.clone());
        markets.into_iter().for_each(|market_config| {
            // The id must be read before the config is consumed by the market constructor.
            let market_id = market_config.market_id();
            let market = Market::from_config_with_cfg(&config, market_config);
            state.markets.insert(market_id, market);
        });
        Self::build_inner(EngineRoot::new_with(state, 0), config, wal_path)
    }

    /// Convenience constructor taking `(MarketId, name)` pairs.
    ///
    /// Every pair becomes a `MarketConfig::MultiRunner` with no runners, kind
    /// `InPlayCapable`, state `Open` and phase `Pre`, and is then forwarded to
    /// [`Self::with_markets`].
    pub fn new<I, S>(config: Config, markets: I, wal_path: impl AsRef<Path>) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = (MarketId, S)>,
        S: Into<String>,
    {
        let to_market_config = |(market_id, name): (MarketId, S)| MarketConfig::MultiRunner {
            market_id,
            name: name.into(),
            runners: Vec::new(),
            market_kind: MarketKind::InPlayCapable,
            market_state: crate::book::BookMarketState::Open,
            market_phase: MarketPhase::Pre,
        };
        Self::with_markets(config, markets.into_iter().map(to_market_config), wal_path)
    }

    /// Creates a builder from an already constructed [`EngineRoot`].
    pub fn from_root(
        initial_root: EngineRoot,
        config: Config,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        Self::build_inner(initial_root, config, wal_path)
    }

    /// Shared constructor tail: initialises the metric stages, grabs the global
    /// metrics handle, resolves thread pinning and assembles the builder with a
    /// fresh (unset) shutdown flag and the `BusySpin` wait strategy.
    fn build_inner(
        root: EngineRoot,
        config: Config,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        crate::btx_metrics_init!(
            "engine",
            "wal_poller",
            "projection_apply",
            "projection_live",
            "projection_snapshot",
            "projection_disk",
        );
        let metrics = crate::metrics::globals::metrics();
        let pinning = config.pinning.clone().unwrap_or_default();
        let wal_path = wal_path.as_ref().to_path_buf();
        let shutdown = Arc::new(AtomicBool::new(false));
        Ok(Self {
            config,
            root,
            wal_path,
            registered_handlers: (),
            shutdown,
            metrics,
            pinning,
            wait_strategy: BusySpin,
            _builder_state: PhantomData,
        })
    }
}
impl<B, W, R> EngineBuilder<B, W, R>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    /// Returns another handle to the builder's shared shutdown flag.
    pub fn shutdown_signal(&self) -> Arc<AtomicBool> {
        self.shutdown.clone()
    }

    /// Rebuilds the builder with its registered handlers transformed by `map`,
    /// optionally moving to a different type-state `B2`. All other fields are
    /// moved over unchanged.
    fn map_handlers<B2, R2>(self, map: impl FnOnce(R) -> R2) -> EngineBuilder<B2, W, R2> {
        EngineBuilder {
            config: self.config,
            root: self.root,
            wal_path: self.wal_path,
            registered_handlers: map(self.registered_handlers),
            shutdown: self.shutdown,
            metrics: self.metrics,
            pinning: self.pinning,
            wait_strategy: self.wait_strategy,
            _builder_state: PhantomData,
        }
    }

    /// Rebuilds the builder around a new wait strategy (and type-state `B2`);
    /// the previous wait strategy is discarded.
    fn set_wait_strategy<B2, W2>(self, wait_strategy: W2) -> EngineBuilder<B2, W2, R>
    where
        W2: WaitStrategy + Clone + Send + 'static,
    {
        let EngineBuilder {
            config,
            root,
            wal_path,
            registered_handlers,
            shutdown,
            metrics,
            pinning,
            wait_strategy: _,
            _builder_state: _,
        } = self;
        EngineBuilder {
            config,
            root,
            wal_path,
            registered_handlers,
            shutdown,
            metrics,
            pinning,
            wait_strategy,
            _builder_state: PhantomData,
        }
    }
}
impl<W, R> EngineBuilder<NcBuilder<W>, W, R>
where
W: WaitStrategy + Clone + Send + 'static,
{
/// Replaces the wait strategy, returning a builder re-typed for `W2`.
///
/// Only available while the builder is in the `NcBuilder` type-state; every other
/// field is carried over unchanged.
pub fn with_wait_strategy<W2>(self, wait_strategy: W2) -> EngineBuilder<NcBuilder<W2>, W2, R>
where
W2: WaitStrategy + Clone + Send + 'static,
{
self.set_wait_strategy(wait_strategy)
}
}
/// Builds the single-producer disruptor skeleton used by the engine.
///
/// Returns, in order: the event poller created first on the ring (used for the
/// WAL), the builder to which further consumers can be attached, and a clone of
/// the single gate created here (the durability gate).
// The return tuple mirrors the disrupt_rs generic types, hence the lint allowance.
#[allow(clippy::type_complexity)]
fn build_disruptor_pipeline<W>(
    config: &Config,
    wait_strategy: W,
) -> (
    EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
    NcBuilder<W>,
    GatedSequence<W::Notifier>,
)
where
    W: WaitStrategy,
{
    // Factory for the placeholder value that pre-fills every ring slot.
    let placeholder_slot = || {
        let payload = BookEventEnvelope {
            market_id: MarketId(0),
            market_name: String::new(),
            market_seq: 0,
            timestamp: crate::types::unix_epoch(),
            metadata: crate::book::EventMetadata::default(),
            event: BookEvent::MarketStateChanged {
                to: crate::book::common::types::BookMarketState::Open,
                reason: String::new(),
                close_batch_max_events: None,
            },
        };
        RingSlot::new(Envelope {
            seq: 0,
            payload,
            response_cb: None,
            tx_id: 0,
            tx_len: 1,
            tx_ix: 0,
        })
    };
    let (wal_poller, builder) =
        build_single_producer(config.ring_size_pow2, placeholder_slot, wait_strategy)
            .event_poller();
    let (builder, gates) = builder.new_gates::<1>();
    // Keep our own handle to the gate before the gate set is lent to the builder.
    let durability_gate = gates[0].clone();
    (
        wal_poller,
        builder.and_then_with_dependents(&gates),
        durability_gate,
    )
}
/// Everything `build_engine` needs besides the disruptor producer, bundled so the
/// function takes a single context argument.
struct EngineBuildCtx<W: WaitStrategy> {
/// Engine configuration; moved into the engine thread.
config: Config,
/// Initial engine root; moved into the engine thread.
root: EngineRoot,
/// Poller handed to the journaler to drive the WAL thread.
wal_poller: EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
/// Journal handler that is converted into the WAL poller thread.
journaler: JournalHandler,
/// Gate passed to the journaler alongside the poller.
durability_gate: GatedSequence<W::Notifier>,
/// Shutdown flag shared by the engine and WAL threads.
shutdown: Arc<AtomicBool>,
/// Metrics handle stored on the resulting `Engine`.
metrics: crate::metrics::Metrics,
/// Core pinning for the WAL poller and engine threads.
pinning: ThreadPinning,
/// Wait strategy passed to the journaler's poller.
wait_strategy: W,
}
/// Starts the WAL poller thread and the engine thread and returns the owning
/// [`Engine`] handle.
///
/// The WAL thread is started first, then the two bounded command lanes are
/// created (capacity `config.engine_channel_capacity`, at least 1), and finally
/// the engine thread is spawned, optionally pinned to `pinning.engine_core`.
fn build_engine<B, W>(
    producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
    ctx: EngineBuildCtx<W>,
) -> anyhow::Result<Engine>
where
    B: disrupt_rs::Barrier + 'static,
    W: WaitStrategy + Clone + Send + 'static,
{
    let wal_maintenance = WalMaintenanceHandle::default();
    let wal_thread = ctx
        .journaler
        .into_poller(
            ctx.wal_poller,
            ctx.wait_strategy,
            ctx.durability_gate,
            Arc::clone(&ctx.shutdown),
            wal_maintenance.clone(),
        )
        .poll_pinned(ctx.pinning.wal_poller_core);

    let config = ctx.config;
    let root = ctx.root;
    let shutdown = ctx.shutdown;

    // Both lanes share the same capacity; a configured zero is bumped to one.
    let lane_capacity = config.engine_channel_capacity.max(1);
    let (normal_tx, normal_rx) = flume::bounded::<EngineMsg>(lane_capacity);
    let (priority_tx, priority_rx) = flume::bounded::<EngineMsg>(lane_capacity);
    let handler = EngineHandler {
        normal_tx,
        priority_tx,
    };

    let engine_core = ctx.pinning.engine_core;
    let engine_shutdown = Arc::clone(&shutdown);
    let engine_thread = thread::spawn(move || {
        // Pinning is best effort: a failure is logged and the thread runs unpinned.
        if let Some(id) = engine_core {
            let pinned = core_affinity::set_for_current(core_affinity::CoreId { id });
            if !pinned {
                warn!(
                    core_id = id,
                    "failed to pin engine thread to requested core"
                );
            }
        }
        crate::metrics_stage!("engine");
        run_engine_loop(
            root,
            producer,
            priority_rx,
            normal_rx,
            engine_shutdown,
            config,
        );
    });
    info!("engine threads started");

    Ok(Engine {
        handler,
        wal_maintenance,
        metrics: ctx.metrics,
        shutdown,
        engine_thread: Some(engine_thread),
        wal_thread: Some(wal_thread),
    })
}
/// Owning handle for a running engine: the command handler plus the engine and
/// WAL threads. Dropping it stops the engine (see the `Drop` impl).
pub struct Engine {
/// Command submission handle; cloned out by `handle()`.
handler: EngineHandler,
/// WAL maintenance handle; cloned out by `wal_maintenance()`.
wal_maintenance: WalMaintenanceHandle,
/// Metrics handle used for snapshots.
metrics: crate::metrics::Metrics,
/// Shutdown flag shared with both threads; set by `stop()`.
shutdown: Arc<AtomicBool>,
/// Engine thread handle; `None` once it has been joined.
engine_thread: Option<thread::JoinHandle<()>>,
/// WAL thread handle; `None` once it has been joined.
wal_thread: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
impl Engine {
    /// Returns a clone of the command submission handle.
    pub fn handle(&self) -> EngineHandler {
        self.handler.clone()
    }

    /// Returns a clone of the WAL maintenance handle.
    pub fn wal_maintenance(&self) -> WalMaintenanceHandle {
        self.wal_maintenance.clone()
    }

    /// Takes a snapshot of the current metrics.
    pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
        self.metrics.snapshot()
    }

    /// Returns a clone of the metrics handle.
    pub fn metrics(&self) -> crate::metrics::Metrics {
        self.metrics.clone()
    }

    /// Signals shutdown and joins the engine thread, then the WAL thread.
    ///
    /// Calling `stop` again after both threads have been joined is a no-op that
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// * the error returned by the WAL thread, if it finished with `Err`;
    /// * `RuntimeError::WalThreadPanic` if the WAL thread panicked.
    ///
    /// An engine-thread panic is logged but not turned into an error.
    pub fn stop(&mut self) -> anyhow::Result<()> {
        self.shutdown.store(true, Ordering::Release);

        // Join the engine thread first, if it has not been joined already.
        if let Some(engine_thread) = self.engine_thread.take()
            && engine_thread.join().is_err()
        {
            error!("engine thread panicked");
        }

        // A single WAL join path: whichever way we got here, a WAL failure is
        // reported the same way instead of being dropped on one of the branches.
        let Some(wal_thread) = self.wal_thread.take() else {
            return Ok(());
        };
        match wal_thread.join() {
            Ok(Ok(())) => Ok(()),
            Ok(Err(e)) => {
                error!(error = ?e, "wal thread returned error");
                Err(e)
            }
            Err(_) => {
                error!("wal thread panicked");
                Err(RuntimeError::WalThreadPanic.into())
            }
        }
    }
}
/// Main loop of the engine thread.
///
/// Each iteration either processes one external message (priority lane first)
/// or runs a bounded burst of internal batch-continuation work. A burst runs
/// when internal work is queued and either enough external messages have been
/// handled since the last burst, or the external lanes were idle for one poll
/// timeout. The loop ends when the shutdown flag is set or both lanes are
/// disconnected.
fn run_engine_loop<B, W>(
    mut root: EngineRoot,
    mut producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
    priority_rx: flume::Receiver<EngineMsg>,
    normal_rx: flume::Receiver<EngineMsg>,
    shutdown: Arc<AtomicBool>,
    config: Config,
) where
    B: disrupt_rs::Barrier,
    W: WaitStrategy,
{
    let mut worklist = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
    seed_internal_worklist_from_root(&root, &mut worklist);

    let poll_timeout = Duration::from_millis(config.engine_poll_timeout_ms);
    let mut externals_since_burst = 0usize;
    let mut idle_streak = 0u32;
    // Holds a normal-lane message that was received while a priority message
    // had to be served first.
    let mut stashed_normal = None;

    while !shutdown.load(Ordering::Acquire) {
        crate::metric!({ inflight: 0 });
        crate::metric!({
            queue_len: priority_rx
                .len()
                .saturating_add(normal_rx.len())
                .saturating_add(worklist.len())
                .saturating_add(usize::from(stashed_normal.is_some())) as i64
        });

        let burst_is_due = !worklist.is_empty()
            && externals_since_burst >= config.engine_max_external_before_internal_burst;

        let run_burst = if burst_is_due {
            true
        } else {
            match recv_external_timeout(
                &priority_rx,
                &normal_rx,
                &mut stashed_normal,
                poll_timeout,
            ) {
                ExternalRecv::Message(msg) => {
                    idle_streak = 0;
                    let handled = process_msg(&mut root, &mut producer, msg, &shutdown);
                    if let Some(outcome) = handled {
                        reconcile_internal_work(
                            &root,
                            outcome.market_id,
                            outcome.scheduler_maybe_changed,
                            outcome.status,
                            &mut worklist,
                        );
                    }
                    // Processing may itself have requested shutdown.
                    if shutdown.load(Ordering::Acquire) {
                        break;
                    }
                    externals_since_burst = externals_since_burst.saturating_add(1);
                    false
                }
                ExternalRecv::Timeout => {
                    idle_streak = idle_streak.saturating_add(1);
                    !worklist.is_empty()
                }
                ExternalRecv::Disconnected => break,
            }
        };

        if run_burst {
            let limits = burst_limits_for_idle_streak(&config, idle_streak);
            run_internal_burst(&mut root, &mut producer, &mut worklist, &shutdown, limits);
            externals_since_burst = 0;
        }
    }
}
/// Result of polling the external command lanes.
enum ExternalRecv {
/// A message is ready to be processed.
Message(EngineMsg),
/// Nothing was available (immediately, or within the poll timeout).
Timeout,
/// Both the priority and the normal lane are disconnected.
Disconnected,
}
/// Non-blocking poll of the external lanes.
///
/// Sources are tried in strict order: the priority lane, then the stashed
/// normal message in `pending_normal`, then the normal lane. When nothing is
/// available the result is `Disconnected` if both lanes are disconnected and
/// `Timeout` otherwise.
#[inline]
fn try_recv_external(
    priority_rx: &flume::Receiver<EngineMsg>,
    normal_rx: &flume::Receiver<EngineMsg>,
    pending_normal: &mut Option<EngineMsg>,
) -> ExternalRecv {
    let next = priority_rx
        .try_recv()
        .ok()
        .or_else(|| pending_normal.take())
        .or_else(|| normal_rx.try_recv().ok());
    match next {
        Some(msg) => ExternalRecv::Message(msg),
        None if priority_rx.is_disconnected() && normal_rx.is_disconnected() => {
            ExternalRecv::Disconnected
        }
        None => ExternalRecv::Timeout,
    }
}
/// Polls the external lanes, blocking for at most `timeout` when nothing is
/// immediately available.
///
/// Priority messages always win: a normal-lane message obtained from the blocking
/// wait is first parked in `pending_normal`, and the lanes are then re-polled so a
/// priority message that arrived in the meantime is returned ahead of it.
fn recv_external_timeout(
priority_rx: &flume::Receiver<EngineMsg>,
normal_rx: &flume::Receiver<EngineMsg>,
pending_normal: &mut Option<EngineMsg>,
timeout: Duration,
) -> ExternalRecv {
// Fast path: anything other than "nothing available" is returned right away.
match try_recv_external(priority_rx, normal_rx, pending_normal) {
ExternalRecv::Timeout => {}
ready => return ready,
}
let priority_disconnected = priority_rx.is_disconnected();
let normal_disconnected = normal_rx.is_disconnected();
if priority_disconnected && normal_disconnected {
return ExternalRecv::Disconnected;
}
// Only lanes that are still connected are registered with the selector.
let mut selector = flume::Selector::new();
if !priority_disconnected {
selector = selector.recv(priority_rx, |msg| (CommandLane::Priority, msg));
}
if !normal_disconnected {
selector = selector.recv(normal_rx, |msg| (CommandLane::Normal, msg));
}
match selector.wait_timeout(timeout) {
Ok((CommandLane::Priority, Ok(msg))) => ExternalRecv::Message(msg),
Ok((CommandLane::Normal, Ok(msg))) => {
// The fast path above drained any pending message, so the slot must be free.
debug_assert!(pending_normal.is_none());
*pending_normal = Some(msg);
// Re-poll: serves a freshly arrived priority message first, else the parked one.
try_recv_external(priority_rx, normal_rx, pending_normal)
}
// A lane reported a receive error; fall back to a non-blocking poll, which also
// detects the case where both lanes are now disconnected.
Ok((_, Err(_))) => try_recv_external(priority_rx, normal_rx, pending_normal),
// NOTE(review): the selector error is presumably the timeout expiring — confirm
// against the flume `Selector::wait_timeout` documentation.
Err(_) => ExternalRecv::Timeout,
}
}
/// Whether a processed command was applied to the root or rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessMsgStatus {
/// The command was accepted and its events were applied and published.
Applied,
/// The command was rejected; no events were applied.
Rejected,
}
/// Summary of one `process_msg` call, used to keep the internal worklist in sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ProcessMsgOutcome {
/// Market the command targeted.
market_id: MarketId,
/// Value of `may_affect_batch_scheduler()` for the command kind.
scheduler_maybe_changed: bool,
/// Whether the command was applied or rejected.
status: ProcessMsgStatus,
}
/// Budget for a single internal burst; the burst ends when either limit is reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct InternalBurstLimits {
/// Maximum number of internal commands processed in one burst.
max_commands: usize,
/// Maximum wall-clock time spent in one burst.
max_elapsed: Duration,
}
fn process_msg<B, W>(
root: &mut EngineRoot,
producer: &mut disrupt_rs::SingleProducer<EngineEvent, B, W>,
msg: EngineMsg,
shutdown: &Arc<AtomicBool>,
) -> Option<ProcessMsgOutcome>
where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
crate::metric!({ inflight: 1, in_total: 1 });
let t0 = Instant::now();
let EngineMsg::Command { cmd, resp_tx } = msg;
let market_id = cmd.market_id;
let scheduler_maybe_changed = cmd.kind.may_affect_batch_scheduler();
trace!(
market_id = ?cmd.market_id,
correlation_id = ?cmd.correlation_id,
command = ?cmd.kind,
"engine command"
);
let start_seq = root.version().saturating_add(1);
if let Err(err) = root.handle(cmd) {
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
if let Some(tx) = resp_tx
&& tx
.send(Ok(Response::Rejected {
reason: err.into_reason(),
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending rejection");
}
return Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Rejected,
});
}
let events = root.take_changes();
let events_len = events.len();
debug_assert!(
events_len > 0,
"accepted commands must emit at least one event"
);
let tx_id = root.take_tx_id();
let tx_len = match u16::try_from(events_len) {
Ok(len) => len,
Err(_) => {
error!(
events_len,
"tx_len overflow: too many events for single transaction"
);
if let Some(tx) = resp_tx.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
return Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Rejected,
});
}
};
root.apply(&events);
let resp_tx_for_error = resp_tx.clone();
let callback = resp_tx.map(|tx| ResponseCallback { tx });
if events_len == 1 {
let Some(payload) = events.into_iter().next() else {
error!("event batch empty despite events_len == 1");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
return None;
};
producer.publish(move |slot| {
slot.seq = start_seq;
slot.payload = payload;
slot.response_cb = callback;
slot.tx_id = tx_id;
slot.tx_len = 1;
slot.tx_ix = 0;
});
} else {
let mut payloads = events.into_iter();
let mut callback = callback;
let mut publish_error = None;
producer.batch_publish(events_len, |iter| {
publish_error =
fill_batch_slots(iter, &mut payloads, start_seq, tx_id, tx_len, &mut callback)
.err();
});
if let Some(err_kind) = publish_error {
error!(?err_kind, "batch_publish payload/slot length mismatch");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
return None;
}
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: events_len as u64,
inflight: 0
});
Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Applied,
})
}
/// Processes queued batch-continuation work until a limit in `limits` is hit,
/// shutdown is requested, or the worklist runs dry.
///
/// The head market of `internal_work` is served repeatedly until it no longer
/// has active batch work. The burst stops early when a continuation is rejected
/// while the market still reports active batch work, or when `process_msg`
/// signals a fatal error by returning `None`.
fn run_internal_burst<B, W>(
    root: &mut EngineRoot,
    producer: &mut disrupt_rs::SingleProducer<EngineEvent, B, W>,
    internal_work: &mut VecDeque<MarketId>,
    shutdown: &Arc<AtomicBool>,
    limits: InternalBurstLimits,
) where
    B: disrupt_rs::Barrier,
    W: WaitStrategy,
{
    let started = Instant::now();
    let mut processed = 0usize;
    loop {
        let within_budget =
            processed < limits.max_commands && started.elapsed() < limits.max_elapsed;
        if !within_budget || shutdown.load(Ordering::Acquire) {
            break;
        }

        prune_stale_internal_head(root, internal_work);
        let market_id = match internal_work.front() {
            Some(&head) => head,
            None => break,
        };
        let cmd = match continuation_command_for_market(root, market_id) {
            Some(cmd) => cmd,
            None => {
                internal_work.pop_front();
                continue;
            }
        };

        let msg = EngineMsg::Command { cmd, resp_tx: None };
        let outcome = match process_msg(root, producer, msg, shutdown) {
            Some(outcome) => outcome,
            None => break,
        };
        reconcile_internal_work(
            root,
            outcome.market_id,
            outcome.scheduler_maybe_changed,
            outcome.status,
            internal_work,
        );

        let rejected = matches!(outcome.status, ProcessMsgStatus::Rejected);
        if rejected && market_has_active_batch_work(root, market_id) {
            warn!(
                market_id = ?market_id,
                "internal continuation rejected while batch work remains active"
            );
            break;
        }
        processed = processed.saturating_add(1);
    }
}
/// Brings the internal worklist in line with the root after a command was
/// processed for `market_id`.
///
/// The market is removed from the head of the list once it has no active batch
/// work left, and (re-)enqueued when an applied command may have changed the
/// batch scheduler and batch work is active.
fn reconcile_internal_work(
    root: &EngineRoot,
    market_id: MarketId,
    scheduler_maybe_changed: bool,
    status: ProcessMsgStatus,
    internal_work: &mut VecDeque<MarketId>,
) {
    let is_head = internal_work.front() == Some(&market_id);
    if is_head && !market_has_active_batch_work(root, market_id) {
        internal_work.pop_front();
    }

    let applied = status == ProcessMsgStatus::Applied;
    if applied && scheduler_maybe_changed {
        enqueue_market_if_batch_active(root, market_id, internal_work);
    }
}
/// Pops entries from the front of the worklist until the head is a market that
/// still has active batch work (or the list is empty).
fn prune_stale_internal_head(root: &EngineRoot, internal_work: &mut VecDeque<MarketId>) {
    while internal_work
        .front()
        .is_some_and(|&head| !market_has_active_batch_work(root, head))
    {
        internal_work.pop_front();
    }
}
/// Appends `market_id` to the worklist when it has active batch work and is not
/// queued already; otherwise does nothing.
fn enqueue_market_if_batch_active(
    root: &EngineRoot,
    market_id: MarketId,
    internal_work: &mut VecDeque<MarketId>,
) {
    let should_enqueue =
        market_has_active_batch_work(root, market_id) && !internal_work.contains(&market_id);
    if should_enqueue {
        internal_work.push_back(market_id);
    }
}
/// Returns `true` when the market exists and its book reports a batch process state.
fn market_has_active_batch_work(root: &EngineRoot, market_id: MarketId) -> bool {
    let market = root.state().markets.get(&market_id);
    market.is_some_and(|market| market.book.batch_process_state().is_some())
}
/// Returns `true` when the market has a batch process state whose `chunks_done`
/// counter is greater than zero.
fn market_has_recorded_batch_progress(root: &EngineRoot, market_id: MarketId) -> bool {
    let Some(market) = root.state().markets.get(&market_id) else {
        return false;
    };
    market
        .book
        .batch_process_state()
        .is_some_and(|state| state.chunks_done > 0)
}
/// Fills the worklist with every market that has active batch work.
///
/// Markets are ordered by ascending market id, except that the first market (in
/// that order) with recorded batch progress is moved to the front.
fn seed_internal_worklist_from_root(root: &EngineRoot, internal_work: &mut VecDeque<MarketId>) {
    let mut active: Vec<MarketId> = root
        .state()
        .markets
        .keys()
        .copied()
        .filter(|&market_id| market_has_active_batch_work(root, market_id))
        .collect();
    active.sort_unstable_by_key(|market_id| market_id.0);

    let progressed = active
        .iter()
        .position(|&market_id| market_has_recorded_batch_progress(root, market_id));
    if let Some(pos) = progressed {
        // Move the progressed market to the front, keeping the others in id order.
        active[..=pos].rotate_right(1);
    }
    internal_work.extend(active);
}
/// Length mismatch between the ring slots handed out for a batch and the payloads to publish.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BatchFillError {
/// The slot iterator yielded more slots than there were payloads.
IteratorLongerThanPayloads,
/// Payloads were left over after the slot iterator was exhausted.
IteratorShorterThanPayloads,
}
/// Writes one payload into each slot of a batch transaction.
///
/// Slot `i` receives sequence `start_seq + i`, transaction index `i` and the
/// shared `tx_id` / `tx_len`. The response callback is moved out of `callback`
/// into the last slot only; all other slots get `None`.
///
/// # Errors
///
/// Returns a [`BatchFillError`] when the number of slots and payloads differ.
/// In that case `callback` is left untouched, although slots visited before the
/// mismatch was detected have already been written.
fn fill_batch_slots<'a, I>(
    iter: I,
    payloads: &mut std::vec::IntoIter<BookEventEnvelope>,
    start_seq: u64,
    tx_id: u64,
    tx_len: u16,
    callback: &mut Option<ResponseCallback>,
) -> Result<(), BatchFillError>
where
    I: Iterator<Item = &'a mut EngineEvent>,
{
    let mut tail: Option<&'a mut EngineEvent> = None;
    for (position, slot) in iter.enumerate() {
        match payloads.next() {
            None => return Err(BatchFillError::IteratorLongerThanPayloads),
            Some(payload) => {
                slot.payload = payload;
                slot.seq = start_seq.saturating_add(position as u64);
                slot.tx_id = tx_id;
                slot.tx_len = tx_len;
                slot.tx_ix = position as u16;
                slot.response_cb = None;
            }
        }
        tail = Some(slot);
    }

    match payloads.next() {
        Some(_) => Err(BatchFillError::IteratorShorterThanPayloads),
        None => {
            // Only the final slot of the transaction carries the response callback.
            if let Some(last) = tail {
                last.response_cb = callback.take();
            }
            Ok(())
        }
    }
}
/// Builds the internal `ContinueBatchProcess` command for `market_id`.
///
/// The command carries neither a correlation id nor metadata.
fn continue_batch_process_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueBatchProcess,
}
}
/// Returns the continuation command for `market_id`, or `None` when the market
/// is unknown or has no batch process state.
fn continuation_command_for_market(root: &EngineRoot, market_id: MarketId) -> Option<Command> {
    root.state()
        .markets
        .get(&market_id)
        .and_then(|market| market.book.batch_process_state())
        .map(|_| continue_batch_process_command(market_id))
}
/// Maps the number of consecutive idle polls to a burst-budget multiplier:
/// `0 -> 1`, `1 -> 2`, and `4` for every longer streak.
fn idle_burst_multiplier(idle_streak: u32) -> u32 {
    // Doubling per idle poll, capped at two doublings.
    1 << idle_streak.min(2)
}
/// Computes the burst budget for the given idle streak.
///
/// Both configured base values are clamped to at least 1 and then scaled by
/// [`idle_burst_multiplier`], saturating on overflow.
fn burst_limits_for_idle_streak(config: &Config, idle_streak: u32) -> InternalBurstLimits {
    let factor = idle_burst_multiplier(idle_streak);
    let base_commands = config.engine_internal_burst_base_commands.max(1);
    let base_elapsed_ms = config.engine_internal_burst_base_elapsed_ms.max(1);
    let max_commands = base_commands.saturating_mul(factor as usize);
    let max_elapsed_ms = base_elapsed_ms.saturating_mul(u64::from(factor));
    InternalBurstLimits {
        max_commands,
        max_elapsed: Duration::from_millis(max_elapsed_ms),
    }
}
/// Stops the engine when the handle goes out of scope. A failure cannot be
/// returned from `drop`, so it is logged instead.
impl Drop for Engine {
    fn drop(&mut self) {
        match self.stop() {
            Ok(()) => {}
            Err(err) => {
                error!(error = ?err, "failed to stop engine cleanly during drop");
            }
        }
    }
}
/// Cloneable handle used to submit commands to the engine thread.
#[derive(Clone)]
pub struct EngineHandler {
/// Sender for the normal command lane.
pub(crate) normal_tx: flume::Sender<EngineMsg>,
/// Sender for the priority command lane, which the engine polls first.
pub(crate) priority_tx: flume::Sender<EngineMsg>,
}
/// Message delivered to the engine thread over either command lane.
pub(crate) enum EngineMsg {
/// A command to process.
Command {
/// The command itself.
cmd: Command,
/// Optional channel on which the caller receives the response; `None` for
/// fire-and-forget submissions and internal continuations.
resp_tx:
Option<flume::Sender<Result<crate::book::protocol::response::Response, EngineError>>>,
},
}
/// Lane a command is submitted on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommandLane {
/// Default lane, used by `submit` and `submit_with_response`.
Normal,
/// Lane whose messages the engine serves ahead of normal-lane messages.
Priority,
}
impl EngineHandler {
pub fn submit(&self, cmd: Command) -> Result<(), EngineError> {
self.submit_on(CommandLane::Normal, cmd)
}
pub fn submit_on(&self, lane: CommandLane, cmd: Command) -> Result<(), EngineError> {
self.tx_for_lane(lane)
.send(EngineMsg::Command { cmd, resp_tx: None })
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(())
}
pub fn submit_with_response(
&self,
cmd: Command,
) -> Result<
flume::Receiver<Result<crate::book::protocol::response::Response, EngineError>>,
EngineError,
> {
self.submit_with_response_on(CommandLane::Normal, cmd)
}
pub fn submit_with_response_on(
&self,
lane: CommandLane,
cmd: Command,
) -> Result<
flume::Receiver<Result<crate::book::protocol::response::Response, EngineError>>,
EngineError,
> {
let (resp_tx, resp_rx) = flume::bounded(1);
self.tx_for_lane(lane)
.send(EngineMsg::Command {
cmd,
resp_tx: Some(resp_tx),
})
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(resp_rx)
}
#[inline]
fn tx_for_lane(&self, lane: CommandLane) -> &flume::Sender<EngineMsg> {
match lane {
CommandLane::Normal => &self.normal_tx,
CommandLane::Priority => &self.priority_tx,
}
}
}
// Unit tests for the engine loop helpers: batch slot filling, burst limits,
// lane prioritisation and internal worklist maintenance.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
book::{
BookEvent, BookMarketState,
protocol::command::{MarketState, Persistence, Side, TimeInForce},
},
engine::state::MarketConfig,
types::{AccountId, CorrelationId, MarketPhase, Money, OddsX10000, RunnerId},
};
// Minimal event envelope for market 7 carrying the given market sequence.
fn mk_env(market_seq: u64) -> BookEventEnvelope {
BookEventEnvelope {
market_id: MarketId(7),
market_name: String::new(),
market_seq,
timestamp: crate::types::unix_epoch(),
metadata: None,
event: BookEvent::MarketStateChanged {
to: BookMarketState::Open,
reason: "T".to_string(),
close_batch_max_events: None,
},
}
}
// Blank ring slot, standing in for a slot handed out by the disruptor.
fn mk_slot() -> EngineEvent {
RingSlot::new(Envelope {
seq: 0,
payload: mk_env(0),
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
}
// Root at version 0 with one open two-runner market per id; the close batch
// size is set to the minimum allowed value.
fn test_root(market_ids: impl IntoIterator<Item = MarketId>) -> EngineRoot {
let cfg = Config {
close_batch_max_events: crate::book::close_process::MIN_CLOSE_BATCH_EVENTS,
..Default::default()
};
let mut state = EngineState::new(cfg.clone());
for market_id in market_ids {
state.markets.insert(
market_id,
Market::from_config_with_cfg(
&cfg,
MarketConfig::TwoRunner {
market_id,
name: format!("Market {}", market_id.0),
runner_a: RunnerId(1),
runner_b: RunnerId(2),
market_kind: MarketKind::InPlayCapable,
market_state: crate::book::BookMarketState::Open,
market_phase: MarketPhase::Pre,
},
),
);
}
EngineRoot::new_with(state, 0)
}
// PlaceOrder command on runner 1 with fixed odds and stake.
fn place_order_cmd(market_id: MarketId, correlation_id: u64, account_id: u64) -> Command {
Command {
correlation_id: Some(CorrelationId(correlation_id.to_string())),
metadata: None,
market_id,
kind: CommandKind::PlaceOrder {
runner_id: RunnerId(1),
account_id: AccountId::from(account_id),
client_order_id: None,
side: Side::Yes,
odds: OddsX10000(20_000),
stake: Money(100),
persistence: Persistence::Persist,
time_in_force: TimeInForce::Gtc,
},
}
}
// CloseMarket command for the given market.
fn close_market_cmd(market_id: MarketId) -> Command {
Command {
correlation_id: Some(CorrelationId(format!("close-{}", market_id.0))),
metadata: None,
market_id,
kind: CommandKind::CloseMarket {
reason: "TEST_CLOSE".to_string(),
},
}
}
// Command whose correlation id is `label`; used to tell messages apart in lane tests.
fn state_cmd(label: &str) -> Command {
Command {
correlation_id: Some(CorrelationId(label.to_string())),
metadata: None,
market_id: MarketId(99),
kind: CommandKind::SetMarketState {
state: MarketState::Open,
},
}
}
// `state_cmd` wrapped in a fire-and-forget engine message.
fn state_msg(label: &str) -> EngineMsg {
EngineMsg::Command {
cmd: state_cmd(label),
resp_tx: None,
}
}
// Asserts that the message carries the expected correlation id label.
fn assert_msg_label(msg: EngineMsg, expected: &str) {
let EngineMsg::Command { cmd, .. } = msg;
assert_eq!(
cmd.correlation_id.as_ref().map(|id| id.as_str()),
Some(expected)
);
}
// Handles a command directly on the root and applies its events, bypassing the ring.
fn apply_command(root: &mut EngineRoot, cmd: Command) {
root.handle(cmd).expect("command should succeed");
let events = root.take_changes();
root.take_tx_id();
root.apply(&events);
}
// Places `count` orders on the market, each with a distinct correlation and account id.
fn place_orders(root: &mut EngineRoot, market_id: MarketId, count: u64) {
for offset in 0..count {
apply_command(
root,
place_order_cmd(
market_id,
market_id.0 + offset + 1,
market_id.0 + 10_000 + offset,
),
);
}
}
// Applies hand-built "market closed" and "batch process started" events so the
// market has an active close batch without any processed chunk.
fn apply_close_start_without_progress(root: &mut EngineRoot, market_id: MarketId) {
let market = root
.state()
.markets
.get(&market_id)
.expect("market should exist");
let batch_max_events = crate::book::close_process::normalize_close_batch_events(
root.state().cfg.close_batch_max_events,
);
let state_changed = BookEventEnvelope {
market_id,
market_name: market.name.clone(),
market_seq: market.last_market_seq.saturating_add(1),
timestamp: crate::types::unix_epoch(),
metadata: None,
event: BookEvent::MarketStateChanged {
to: BookMarketState::Closed,
reason: "SET_MARKET_STATE".to_string(),
close_batch_max_events: Some(batch_max_events),
},
};
let batch_started = BookEventEnvelope {
market_id,
market_name: market.name.clone(),
market_seq: market.last_market_seq.saturating_add(2),
timestamp: crate::types::unix_epoch(),
metadata: None,
event: BookEvent::BatchProcessStarted {
batch_mode: crate::book::BatchMode::Close,
batch_max_events,
target: crate::book::BatchProcessTarget::AllLiveOrders,
detail: None,
},
};
root.apply(&[state_changed, batch_started]);
}
// Matching slot and payload counts: sequences are consecutive and only the last
// slot receives the callback.
#[test]
fn fill_batch_slots_populates_and_sets_callback_on_last_slot() {
let mut payloads = vec![mk_env(1), mk_env(2)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 100, 99, 2, &mut cb);
assert_eq!(res, Ok(()));
assert!(cb.is_none(), "callback should move to last slot");
assert_eq!(slots[0].seq, 100);
assert_eq!(slots[1].seq, 101);
assert!(slots[0].response_cb.is_none());
assert!(slots[1].response_cb.is_some());
}
// More slots than payloads is an error and must not consume the callback.
#[test]
fn fill_batch_slots_errors_when_slots_exceed_payloads() {
let mut payloads = vec![mk_env(1)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 10, 5, 2, &mut cb);
assert_eq!(res, Err(BatchFillError::IteratorLongerThanPayloads));
assert!(cb.is_some(), "callback should remain untouched on error");
}
// More payloads than slots is an error and must not consume the callback.
#[test]
fn fill_batch_slots_errors_when_payloads_exceed_slots() {
let mut payloads = vec![mk_env(1), mk_env(2), mk_env(3)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 10, 5, 3, &mut cb);
assert_eq!(res, Err(BatchFillError::IteratorShorterThanPayloads));
assert!(cb.is_some(), "callback should remain untouched on error");
}
// The multiplier goes 1, 2, 4 and stays at 4; both burst limits scale with it.
#[test]
fn idle_burst_limits_scale_with_idle_streak() {
let cfg = Config {
engine_internal_burst_base_commands: 4,
engine_internal_burst_base_elapsed_ms: 3,
..Default::default()
};
assert_eq!(idle_burst_multiplier(0), 1);
assert_eq!(idle_burst_multiplier(1), 2);
assert_eq!(idle_burst_multiplier(2), 4);
assert_eq!(idle_burst_multiplier(99), 4);
let streak0 = burst_limits_for_idle_streak(&cfg, 0);
let streak1 = burst_limits_for_idle_streak(&cfg, 1);
let streak2 = burst_limits_for_idle_streak(&cfg, 2);
assert_eq!(streak0.max_commands, 4);
assert_eq!(streak0.max_elapsed, Duration::from_millis(3));
assert_eq!(streak1.max_commands, 8);
assert_eq!(streak1.max_elapsed, Duration::from_millis(6));
assert_eq!(streak2.max_commands, 16);
assert_eq!(streak2.max_elapsed, Duration::from_millis(12));
}
// A priority message is served before a normal message that was queued earlier.
#[test]
fn external_recv_prioritizes_priority_over_queued_normal() {
let (priority_tx, priority_rx) = flume::bounded(4);
let (normal_tx, normal_rx) = flume::bounded(4);
let mut pending_normal = None;
normal_tx.send(state_msg("normal-1")).unwrap();
priority_tx.send(state_msg("priority-1")).unwrap();
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "priority-1"),
_ => panic!("expected priority message"),
}
assert!(pending_normal.is_none());
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "normal-1"),
_ => panic!("expected normal message after priority drains"),
}
}
// The parked normal message is served before anything still in the normal queue.
#[test]
fn external_recv_processes_pending_normal_before_normal_queue() {
let (_priority_tx, priority_rx) = flume::bounded(4);
let (normal_tx, normal_rx) = flume::bounded(4);
let mut pending_normal = Some(state_msg("pending-normal"));
normal_tx.send(state_msg("queued-normal")).unwrap();
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "pending-normal"),
_ => panic!("expected pending normal message"),
}
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "queued-normal"),
_ => panic!("expected queued normal message"),
}
}
// A priority message is served even ahead of the parked normal message.
#[test]
fn external_recv_prioritizes_priority_over_pending_normal() {
let (priority_tx, priority_rx) = flume::bounded(4);
let (_normal_tx, normal_rx) = flume::bounded(4);
let mut pending_normal = Some(state_msg("pending-normal"));
priority_tx.send(state_msg("priority-1")).unwrap();
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "priority-1"),
_ => panic!("expected priority message"),
}
match try_recv_external(&priority_rx, &normal_rx, &mut pending_normal) {
ExternalRecv::Message(msg) => assert_msg_label(msg, "pending-normal"),
_ => panic!("expected pending normal message after priority drains"),
}
}
// Submissions land on the lane that was asked for; response submissions carry a sender.
#[test]
fn engine_handler_routes_submissions_to_selected_lane() {
let (normal_tx, normal_rx) = flume::bounded(4);
let (priority_tx, priority_rx) = flume::bounded(4);
let handler = EngineHandler {
normal_tx,
priority_tx,
};
handler
.submit_on(CommandLane::Normal, state_cmd("normal-1"))
.unwrap();
handler
.submit_on(CommandLane::Priority, state_cmd("priority-1"))
.unwrap();
let response_rx = handler
.submit_with_response_on(CommandLane::Priority, state_cmd("priority-2"))
.unwrap();
assert_msg_label(priority_rx.recv().unwrap(), "priority-1");
let EngineMsg::Command { cmd, resp_tx } = priority_rx.recv().unwrap();
assert_eq!(
cmd.correlation_id.as_ref().map(|id| id.as_str()),
Some("priority-2")
);
assert!(resp_tx.is_some());
drop(response_rx);
assert_msg_label(normal_rx.recv().unwrap(), "normal-1");
}
// Enqueueing is idempotent for an active market and a no-op for an inactive one.
#[test]
fn enqueue_market_if_batch_active_dedupes_and_skips_inactive_markets() {
let market_id = MarketId(10);
let inactive_market_id = MarketId(11);
let mut root = test_root([market_id, inactive_market_id]);
place_orders(&mut root, market_id, 3);
apply_command(&mut root, close_market_cmd(market_id));
let mut internal_work = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
enqueue_market_if_batch_active(&root, market_id, &mut internal_work);
enqueue_market_if_batch_active(&root, market_id, &mut internal_work);
enqueue_market_if_batch_active(&root, inactive_market_id, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([market_id]));
}
// Market 2 is closed through the real command path while markets 1 and 3 only get
// the "started" events; the expected order puts market 2 first, then ascending ids.
#[test]
fn seed_internal_worklist_prioritizes_first_progressed_market_then_market_id_order() {
let mid1 = MarketId(1);
let mid2 = MarketId(2);
let mid3 = MarketId(3);
let mut root = test_root([mid1, mid2, mid3]);
place_orders(&mut root, mid1, 2);
place_orders(&mut root, mid2, 3);
place_orders(&mut root, mid3, 2);
apply_close_start_without_progress(&mut root, mid1);
apply_command(&mut root, close_market_cmd(mid2));
apply_close_start_without_progress(&mut root, mid3);
let mut internal_work = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
seed_internal_worklist_from_root(&root, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([mid2, mid1, mid3]));
}
// A head entry without active batch work is dropped; the active entry behind it stays.
#[test]
fn prune_stale_internal_head_drops_inactive_entries() {
let active_market_id = MarketId(21);
let stale_market_id = MarketId(22);
let mut root = test_root([active_market_id, stale_market_id]);
place_orders(&mut root, active_market_id, 3);
apply_command(&mut root, close_market_cmd(active_market_id));
let mut internal_work = VecDeque::from([stale_market_id, active_market_id]);
prune_stale_internal_head(&root, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([active_market_id]));
}
}