use super::error::EngineError;
use crate::{
book::{
BookEvent, BookEventEnvelope,
protocol::{
command::{Command, CommandKind},
reject::RejectReason,
response::Response,
},
},
config::{Config, ThreadPinning},
disruptor::{Envelope, ResponseCallback, RingSlot},
engine::{
journaler::JournalHandler,
root::EngineRoot,
state::{EngineState, Market, MarketConfig},
},
error::RuntimeError,
types::{MarketId, MarketKind},
};
use std::{
collections::VecDeque,
marker::PhantomData,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use disrupt_rs::{
BusySpin, EventPoller, GatedSequence, MultiConsumerDependentsBarrier, Producer,
SingleProducerBarrier, build_single_producer,
builder::{MC, NC, SC, single::SPBuilder},
wait_strategies::WaitStrategy,
};
use tracing::{error, info, trace, warn};
mod tuple;
pub use tuple::HandlerRecovery;
#[doc(hidden)]
pub use tuple::RegisteredHandler;
pub type EngineEvent = RingSlot<BookEventEnvelope>;
type NcBuilder<W> = SPBuilder<NC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type ScBuilder<W> = SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type McBuilder<W> = SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>;
const INTERNAL_WORKLIST_CAPACITY: usize = 100;
pub struct EngineBuilder<B, W = BusySpin, Hs = ()>
where
W: WaitStrategy + Clone + Send + 'static,
{
config: Config,
root: EngineRoot,
wal_path: PathBuf,
registered_handlers: Hs,
shutdown: Arc<AtomicBool>,
metrics: crate::metrics::Metrics,
pinning: ThreadPinning,
wait_strategy: W,
_builder_state: PhantomData<fn() -> B>,
}
impl EngineBuilder<NcBuilder<BusySpin>, BusySpin> {
pub fn with_markets(
config: Config,
markets: impl IntoIterator<Item = MarketConfig>,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
let mut state = EngineState::new(config.clone());
for market_config in markets {
let market_id = market_config.market_id();
state.markets.insert(
market_id,
Market::from_config_with_cfg(&config, market_config),
);
}
let root = EngineRoot::new_with(state, 0);
Self::build_inner(root, config, wal_path)
}
pub fn new<I, S>(config: Config, markets: I, wal_path: impl AsRef<Path>) -> anyhow::Result<Self>
where
I: IntoIterator<Item = (MarketId, S)>,
S: Into<String>,
{
let markets = markets.into_iter().map(|(mid, name)| {
MarketConfig::multi_runner_dynamic(mid, name, MarketKind::InPlayCapable)
});
Self::with_markets(config, markets, wal_path)
}
pub fn from_root(
initial_root: EngineRoot,
config: Config,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
Self::build_inner(initial_root, config, wal_path)
}
fn build_inner(
root: EngineRoot,
config: Config,
wal_path: impl AsRef<Path>,
) -> anyhow::Result<Self> {
crate::btx_metrics_init!(
"engine",
"wal_poller",
"projection_apply",
"projection_live",
"projection_snapshot",
"projection_disk",
);
let metrics = crate::metrics::globals::metrics();
let pinning = config.pinning.clone().unwrap_or_default();
Ok(EngineBuilder {
config,
root,
wal_path: wal_path.as_ref().to_path_buf(),
registered_handlers: (),
shutdown: Arc::new(AtomicBool::new(false)),
metrics,
pinning,
wait_strategy: BusySpin,
_builder_state: PhantomData,
})
}
}
impl<B, W, R> EngineBuilder<B, W, R>
where
W: WaitStrategy + Clone + Send + 'static,
{
fn map_handlers<B2, R2>(self, map: impl FnOnce(R) -> R2) -> EngineBuilder<B2, W, R2> {
let EngineBuilder {
config,
root,
wal_path,
registered_handlers,
shutdown,
metrics,
pinning,
wait_strategy,
_builder_state: _,
} = self;
EngineBuilder {
config,
root,
wal_path,
registered_handlers: map(registered_handlers),
shutdown,
metrics,
pinning,
wait_strategy,
_builder_state: PhantomData,
}
}
fn set_wait_strategy<B2, W2>(self, wait_strategy: W2) -> EngineBuilder<B2, W2, R>
where
W2: WaitStrategy + Clone + Send + 'static,
{
EngineBuilder {
config: self.config,
root: self.root,
wal_path: self.wal_path,
registered_handlers: self.registered_handlers,
shutdown: self.shutdown,
metrics: self.metrics,
pinning: self.pinning,
wait_strategy,
_builder_state: PhantomData,
}
}
}
impl<W, R> EngineBuilder<NcBuilder<W>, W, R>
where
W: WaitStrategy + Clone + Send + 'static,
{
pub fn with_wait_strategy<W2>(self, wait_strategy: W2) -> EngineBuilder<NcBuilder<W2>, W2, R>
where
W2: WaitStrategy + Clone + Send + 'static,
{
self.set_wait_strategy(wait_strategy)
}
}
#[allow(clippy::type_complexity)]
fn build_disruptor_pipeline<W>(
config: &Config,
wait_strategy: W,
) -> (
EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
NcBuilder<W>,
GatedSequence<W::Notifier>,
)
where
W: WaitStrategy,
{
let (wal_poller, builder) = build_single_producer(
config.ring_size_pow2,
|| {
RingSlot::new(Envelope {
seq: 0,
payload: BookEventEnvelope {
market_id: MarketId(0),
market_name: String::new(),
market_seq: 0,
timestamp: crate::types::unix_epoch(),
metadata: crate::book::EventMetadata::default(),
event: BookEvent::MarketStateChanged {
to: crate::book::common::types::BookMarketState::Open,
reason: String::new(),
},
},
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
},
wait_strategy,
)
.event_poller();
let (builder, gates) = builder.new_gates::<1>();
let durability_gate = gates[0].clone();
let builder = builder.and_then_with_dependents(&gates);
(wal_poller, builder, durability_gate)
}
struct EngineBuildCtx<W: WaitStrategy> {
config: Config,
root: EngineRoot,
wal_poller: EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
journaler: JournalHandler,
durability_gate: GatedSequence<W::Notifier>,
shutdown: Arc<AtomicBool>,
metrics: crate::metrics::Metrics,
pinning: ThreadPinning,
wait_strategy: W,
}
fn build_engine<B, W>(
producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
ctx: EngineBuildCtx<W>,
) -> anyhow::Result<Engine>
where
B: disrupt_rs::Barrier + 'static,
W: WaitStrategy + Clone + Send + 'static,
{
let EngineBuildCtx {
config,
root,
wal_poller,
journaler,
durability_gate,
shutdown,
metrics,
pinning,
wait_strategy,
} = ctx;
let wal_thread = journaler
.into_poller(
wal_poller,
wait_strategy.clone(),
durability_gate,
Arc::clone(&shutdown),
)
.poll_pinned(pinning.wal_poller_core);
let (cmd_tx, cmd_rx) =
flume::bounded::<EngineMsg>(crate::config::DEFAULT_ENGINE_CHANNEL_CAPACITY);
let handler = EngineHandler { tx: cmd_tx };
let engine_thread = {
let shutdown = Arc::clone(&shutdown);
let core_id = pinning.engine_core;
thread::spawn(move || {
if let Some(id) = core_id
&& !core_affinity::set_for_current(core_affinity::CoreId { id })
{
warn!(
core_id = id,
"failed to pin engine thread to requested core"
);
}
crate::metrics_stage!("engine");
run_engine_loop(root, producer, cmd_rx, shutdown, config);
})
};
info!("engine threads started");
Ok(Engine {
handler,
metrics,
shutdown,
engine_thread: Some(engine_thread),
wal_thread: Some(wal_thread),
})
}
pub struct Engine {
handler: EngineHandler,
metrics: crate::metrics::Metrics,
shutdown: Arc<AtomicBool>,
engine_thread: Option<thread::JoinHandle<()>>,
wal_thread: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
impl Engine {
pub fn handle(&self) -> EngineHandler {
self.handler.clone()
}
pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
self.metrics.snapshot()
}
pub fn metrics(&self) -> crate::metrics::Metrics {
self.metrics.clone()
}
pub fn stop(&mut self) -> anyhow::Result<()> {
self.shutdown.store(true, Ordering::Release);
let Some(engine_thread) = self.engine_thread.take() else {
if let Some(wal_thread) = self.wal_thread.take()
&& wal_thread.join().is_err()
{
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
return Ok(());
};
if engine_thread.join().is_err() {
error!("engine thread panicked");
}
if let Some(wal_thread) = self.wal_thread.take() {
match wal_thread.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!(error = ?e, "wal thread returned error");
return Err(e);
}
Err(_) => {
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
}
}
Ok(())
}
}
fn run_engine_loop<B, W>(
mut root: EngineRoot,
mut producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
cmd_rx: flume::Receiver<EngineMsg>,
shutdown: Arc<AtomicBool>,
config: Config,
) where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
let mut internal_work = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
seed_internal_worklist_from_root(&root, &mut internal_work);
let mut external_since_internal = 0usize;
let mut idle_streak = 0u32;
while !shutdown.load(Ordering::Acquire) {
crate::metric!({ inflight: 0 });
crate::metric!({ queue_len: cmd_rx.len().saturating_add(internal_work.len()) as i64 });
if !internal_work.is_empty()
&& external_since_internal >= config.engine_max_external_before_internal_burst
{
let limits = burst_limits_for_idle_streak(&config, idle_streak);
run_internal_burst(
&mut root,
&mut producer,
&mut internal_work,
&shutdown,
limits,
);
external_since_internal = 0;
continue;
};
match cmd_rx.recv_timeout(Duration::from_millis(config.engine_poll_timeout_ms)) {
Ok(msg) => {
idle_streak = 0;
if let Some(outcome) = process_msg(&mut root, &mut producer, msg, &shutdown) {
reconcile_internal_work(
&root,
outcome.market_id,
outcome.scheduler_maybe_changed,
outcome.status,
&mut internal_work,
);
}
if shutdown.load(Ordering::Acquire) {
break;
}
external_since_internal = external_since_internal.saturating_add(1);
}
Err(flume::RecvTimeoutError::Timeout) => {
idle_streak = idle_streak.saturating_add(1);
if internal_work.is_empty() {
continue;
}
let limits = burst_limits_for_idle_streak(&config, idle_streak);
run_internal_burst(
&mut root,
&mut producer,
&mut internal_work,
&shutdown,
limits,
);
external_since_internal = 0;
}
Err(flume::RecvTimeoutError::Disconnected) => break,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ProcessMsgStatus {
Applied,
Rejected,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ProcessMsgOutcome {
market_id: MarketId,
scheduler_maybe_changed: bool,
status: ProcessMsgStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct InternalBurstLimits {
max_commands: usize,
max_elapsed: Duration,
}
fn process_msg<B, W>(
root: &mut EngineRoot,
producer: &mut disrupt_rs::SingleProducer<EngineEvent, B, W>,
msg: EngineMsg,
shutdown: &Arc<AtomicBool>,
) -> Option<ProcessMsgOutcome>
where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
crate::metric!({ inflight: 1, in_total: 1 });
let t0 = Instant::now();
let EngineMsg::Command { cmd, resp_tx } = msg;
let market_id = cmd.market_id;
let scheduler_maybe_changed = cmd.kind.may_affect_batch_scheduler();
trace!(
market_id = ?cmd.market_id,
correlation_id = ?cmd.correlation_id,
command = ?cmd.kind,
"engine command"
);
let start_seq = root.version().saturating_add(1);
if let Err(err) = root.handle(cmd) {
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
if let Some(tx) = resp_tx
&& tx
.send(Ok(Response::Rejected {
reason: err.into_reason(),
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending rejection");
}
return Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Rejected,
});
}
let events = root.take_changes();
let events_len = events.len();
debug_assert!(
events_len > 0,
"accepted commands must emit at least one event"
);
let tx_id = root.take_tx_id();
let tx_len = match u16::try_from(events_len) {
Ok(len) => len,
Err(_) => {
error!(
events_len,
"tx_len overflow: too many events for single transaction"
);
if let Some(tx) = resp_tx.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
return Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Rejected,
});
}
};
root.apply(&events);
let resp_tx_for_error = resp_tx.clone();
let callback = resp_tx.map(|tx| ResponseCallback { tx });
if events_len == 1 {
let Some(payload) = events.into_iter().next() else {
error!("event batch empty despite events_len == 1");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
return None;
};
producer.publish(move |slot| {
slot.seq = start_seq;
slot.payload = payload;
slot.response_cb = callback;
slot.tx_id = tx_id;
slot.tx_len = 1;
slot.tx_ix = 0;
});
} else {
let mut payloads = events.into_iter();
let mut callback = callback;
let mut publish_error = None;
producer.batch_publish(events_len, |iter| {
publish_error =
fill_batch_slots(iter, &mut payloads, start_seq, tx_id, tx_len, &mut callback)
.err();
});
if let Some(err_kind) = publish_error {
error!(?err_kind, "batch_publish payload/slot length mismatch");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
return None;
}
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: events_len as u64,
inflight: 0
});
Some(ProcessMsgOutcome {
market_id,
scheduler_maybe_changed,
status: ProcessMsgStatus::Applied,
})
}
fn run_internal_burst<B, W>(
root: &mut EngineRoot,
producer: &mut disrupt_rs::SingleProducer<EngineEvent, B, W>,
internal_work: &mut VecDeque<MarketId>,
shutdown: &Arc<AtomicBool>,
limits: InternalBurstLimits,
) where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
let started = Instant::now();
let mut processed = 0usize;
while processed < limits.max_commands
&& started.elapsed() < limits.max_elapsed
&& !shutdown.load(Ordering::Acquire)
{
prune_stale_internal_head(root, internal_work);
let Some(&market_id) = internal_work.front() else {
break;
};
let Some(cmd) = continuation_command_for_market(root, market_id) else {
internal_work.pop_front();
continue;
};
let Some(outcome) = process_msg(
root,
producer,
EngineMsg::Command { cmd, resp_tx: None },
shutdown,
) else {
break;
};
reconcile_internal_work(
root,
outcome.market_id,
outcome.scheduler_maybe_changed,
outcome.status,
internal_work,
);
if matches!(outcome.status, ProcessMsgStatus::Rejected)
&& market_has_active_batch_work(root, market_id)
{
warn!(
market_id = ?market_id,
"internal continuation rejected while batch work remains active"
);
break;
}
processed = processed.saturating_add(1);
}
}
fn reconcile_internal_work(
root: &EngineRoot,
market_id: MarketId,
scheduler_maybe_changed: bool,
status: ProcessMsgStatus,
internal_work: &mut VecDeque<MarketId>,
) {
if internal_work.front().copied() == Some(market_id)
&& !market_has_active_batch_work(root, market_id)
{
internal_work.pop_front();
}
if matches!(status, ProcessMsgStatus::Applied) && scheduler_maybe_changed {
enqueue_market_if_batch_active(root, market_id, internal_work);
}
}
fn prune_stale_internal_head(root: &EngineRoot, internal_work: &mut VecDeque<MarketId>) {
while let Some(&market_id) = internal_work.front() {
if market_has_active_batch_work(root, market_id) {
break;
}
internal_work.pop_front();
}
}
fn enqueue_market_if_batch_active(
root: &EngineRoot,
market_id: MarketId,
internal_work: &mut VecDeque<MarketId>,
) {
if !market_has_active_batch_work(root, market_id) || internal_work.contains(&market_id) {
return;
}
internal_work.push_back(market_id);
}
fn market_has_active_batch_work(root: &EngineRoot, market_id: MarketId) -> bool {
root.state()
.markets
.get(&market_id)
.and_then(|market| market.book.batch_process_state())
.is_some()
}
fn market_has_recorded_batch_progress(root: &EngineRoot, market_id: MarketId) -> bool {
root.state()
.markets
.get(&market_id)
.and_then(|market| market.book.batch_process_state())
.is_some_and(|state| state.chunks_done > 0)
}
fn seed_internal_worklist_from_root(root: &EngineRoot, internal_work: &mut VecDeque<MarketId>) {
let mut active_markets: Vec<_> = root
.state()
.markets
.keys()
.copied()
.filter(|&market_id| market_has_active_batch_work(root, market_id))
.collect();
active_markets.sort_unstable_by_key(|market_id| market_id.0);
let prioritized = active_markets
.iter()
.copied()
.find(|&market_id| market_has_recorded_batch_progress(root, market_id));
if let Some(market_id) = prioritized {
internal_work.push_back(market_id);
}
for market_id in active_markets {
if Some(market_id) != prioritized {
internal_work.push_back(market_id);
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BatchFillError {
IteratorLongerThanPayloads,
IteratorShorterThanPayloads,
}
fn fill_batch_slots<'a, I>(
iter: I,
payloads: &mut std::vec::IntoIter<BookEventEnvelope>,
start_seq: u64,
tx_id: u64,
tx_len: u16,
callback: &mut Option<ResponseCallback>,
) -> Result<(), BatchFillError>
where
I: Iterator<Item = &'a mut EngineEvent>,
{
let mut last_slot: Option<&'a mut EngineEvent> = None;
for (ix, slot) in iter.enumerate() {
let Some(payload) = payloads.next() else {
return Err(BatchFillError::IteratorLongerThanPayloads);
};
slot.seq = start_seq.saturating_add(ix as u64);
slot.payload = payload;
slot.response_cb = None;
slot.tx_id = tx_id;
slot.tx_len = tx_len;
slot.tx_ix = ix as u16;
last_slot = Some(slot);
}
if payloads.next().is_some() {
return Err(BatchFillError::IteratorShorterThanPayloads);
}
if let Some(slot) = last_slot {
slot.response_cb = callback.take();
}
Ok(())
}
fn continue_close_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueCloseMarket,
}
}
fn continue_batch_cancel_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueBatchCancelOrders,
}
}
fn continue_lapse_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueLapseOrders,
}
}
fn continuation_command_for_market(root: &EngineRoot, market_id: MarketId) -> Option<Command> {
let market = root.state().markets.get(&market_id)?;
match market.book.batch_process_state()?.kind {
crate::book::BatchProcessKind::Close => Some(continue_close_command(market_id)),
crate::book::BatchProcessKind::Cancel => Some(continue_batch_cancel_command(market_id)),
crate::book::BatchProcessKind::Lapse => Some(continue_lapse_command(market_id)),
}
}
fn idle_burst_multiplier(idle_streak: u32) -> u32 {
match idle_streak {
0 => 1,
1 => 2,
_ => 4,
}
}
fn burst_limits_for_idle_streak(config: &Config, idle_streak: u32) -> InternalBurstLimits {
let multiplier = idle_burst_multiplier(idle_streak) as usize;
InternalBurstLimits {
max_commands: config
.engine_internal_burst_base_commands
.max(1)
.saturating_mul(multiplier),
max_elapsed: Duration::from_millis(
config
.engine_internal_burst_base_elapsed_ms
.max(1)
.saturating_mul(multiplier as u64),
),
}
}
impl Drop for Engine {
fn drop(&mut self) {
if let Err(err) = self.stop() {
error!(error = ?err, "failed to stop engine cleanly during drop");
}
}
}
#[derive(Clone)]
pub struct EngineHandler {
pub(crate) tx: flume::Sender<EngineMsg>,
}
pub(crate) enum EngineMsg {
Command {
cmd: Command,
resp_tx:
Option<flume::Sender<Result<crate::book::protocol::response::Response, EngineError>>>,
},
}
impl EngineHandler {
pub fn submit(&self, cmd: Command) -> Result<(), EngineError> {
self.tx
.send(EngineMsg::Command { cmd, resp_tx: None })
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(())
}
pub fn submit_with_response(
&self,
cmd: Command,
) -> Result<
flume::Receiver<Result<crate::book::protocol::response::Response, EngineError>>,
EngineError,
> {
let (resp_tx, resp_rx) = flume::bounded(1);
self.tx
.send(EngineMsg::Command {
cmd,
resp_tx: Some(resp_tx),
})
.map_err(|_| EngineError::DisruptorStopped)?;
Ok(resp_rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
book::{
BookEvent, BookMarketState,
protocol::command::{Persistence, Side, TimeInForce},
},
engine::state::MarketConfig,
types::{AccountId, CorrelationId, Money, OddsX10000, RunnerId},
};
fn mk_env(market_seq: u64) -> BookEventEnvelope {
BookEventEnvelope {
market_id: MarketId(7),
market_name: String::new(),
market_seq,
timestamp: crate::types::unix_epoch(),
metadata: None,
event: BookEvent::MarketStateChanged {
to: BookMarketState::Open,
reason: "T".to_string(),
},
}
}
fn mk_slot() -> EngineEvent {
RingSlot::new(Envelope {
seq: 0,
payload: mk_env(0),
response_cb: None,
tx_id: 0,
tx_len: 1,
tx_ix: 0,
})
}
fn test_root(market_ids: impl IntoIterator<Item = MarketId>) -> EngineRoot {
let cfg = Config {
close_batch_max_events: crate::book::close_process::MIN_CLOSE_BATCH_EVENTS,
..Default::default()
};
let mut state = EngineState::new(cfg.clone());
for market_id in market_ids {
state.markets.insert(
market_id,
Market::from_config_with_cfg(
&cfg,
MarketConfig::two_runner(
market_id,
format!("Market {}", market_id.0),
RunnerId(1),
RunnerId(2),
MarketKind::InPlayCapable,
),
),
);
}
EngineRoot::new_with(state, 0)
}
fn place_order_cmd(market_id: MarketId, correlation_id: u64, account_id: u64) -> Command {
Command {
correlation_id: Some(CorrelationId(correlation_id.to_string())),
metadata: None,
market_id,
kind: CommandKind::PlaceOrder {
runner_id: RunnerId(1),
account_id: AccountId::from(account_id),
client_order_id: None,
side: Side::Yes,
odds: OddsX10000(20_000),
stake: Money(100),
persistence: Persistence::Persist,
time_in_force: TimeInForce::Gtc,
},
}
}
fn close_market_cmd(market_id: MarketId) -> Command {
Command {
correlation_id: Some(CorrelationId(format!("close-{}", market_id.0))),
metadata: None,
market_id,
kind: CommandKind::CloseMarket,
}
}
fn apply_command(root: &mut EngineRoot, cmd: Command) {
root.handle(cmd).expect("command should succeed");
let events = root.take_changes();
root.take_tx_id();
root.apply(&events);
}
fn place_orders(root: &mut EngineRoot, market_id: MarketId, count: u64) {
for offset in 0..count {
apply_command(
root,
place_order_cmd(
market_id,
market_id.0 + offset + 1,
market_id.0 + 10_000 + offset,
),
);
}
}
fn apply_close_start_without_progress(root: &mut EngineRoot, market_id: MarketId) {
let market = root
.state()
.markets
.get(&market_id)
.expect("market should exist");
let env = BookEventEnvelope {
market_id,
market_name: market.name.clone(),
market_seq: market.last_market_seq.saturating_add(1),
timestamp: crate::types::unix_epoch(),
metadata: None,
event: BookEvent::MarketStateChanged {
to: BookMarketState::Closed,
reason: "SET_MARKET_STATE".to_string(),
},
};
root.apply(std::slice::from_ref(&env));
}
#[test]
fn fill_batch_slots_populates_and_sets_callback_on_last_slot() {
let mut payloads = vec![mk_env(1), mk_env(2)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _rx) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 100, 99, 2, &mut cb);
assert_eq!(res, Ok(()));
assert!(cb.is_none(), "callback should move to last slot");
assert_eq!(slots[0].seq, 100);
assert_eq!(slots[1].seq, 101);
assert!(slots[0].response_cb.is_none());
assert!(slots[1].response_cb.is_some());
}
#[test]
fn fill_batch_slots_errors_when_slots_exceed_payloads() {
let mut payloads = vec![mk_env(1)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _rx) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 10, 5, 2, &mut cb);
assert_eq!(res, Err(BatchFillError::IteratorLongerThanPayloads));
assert!(cb.is_some(), "callback should remain untouched on error");
}
#[test]
fn fill_batch_slots_errors_when_payloads_exceed_slots() {
let mut payloads = vec![mk_env(1), mk_env(2), mk_env(3)].into_iter();
let mut slots = [mk_slot(), mk_slot()];
let (tx, _rx) = flume::bounded(1);
let mut cb = Some(ResponseCallback { tx });
let res = fill_batch_slots(slots.iter_mut(), &mut payloads, 10, 5, 3, &mut cb);
assert_eq!(res, Err(BatchFillError::IteratorShorterThanPayloads));
assert!(cb.is_some(), "callback should remain untouched on error");
}
#[test]
fn idle_burst_limits_scale_with_idle_streak() {
let cfg = Config {
engine_internal_burst_base_commands: 4,
engine_internal_burst_base_elapsed_ms: 3,
..Default::default()
};
assert_eq!(idle_burst_multiplier(0), 1);
assert_eq!(idle_burst_multiplier(1), 2);
assert_eq!(idle_burst_multiplier(2), 4);
assert_eq!(idle_burst_multiplier(99), 4);
let streak0 = burst_limits_for_idle_streak(&cfg, 0);
let streak1 = burst_limits_for_idle_streak(&cfg, 1);
let streak2 = burst_limits_for_idle_streak(&cfg, 2);
assert_eq!(streak0.max_commands, 4);
assert_eq!(streak0.max_elapsed, Duration::from_millis(3));
assert_eq!(streak1.max_commands, 8);
assert_eq!(streak1.max_elapsed, Duration::from_millis(6));
assert_eq!(streak2.max_commands, 16);
assert_eq!(streak2.max_elapsed, Duration::from_millis(12));
}
#[test]
fn enqueue_market_if_batch_active_dedupes_and_skips_inactive_markets() {
let market_id = MarketId(10);
let inactive_market_id = MarketId(11);
let mut root = test_root([market_id, inactive_market_id]);
place_orders(&mut root, market_id, 3);
apply_command(&mut root, close_market_cmd(market_id));
let mut internal_work = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
enqueue_market_if_batch_active(&root, market_id, &mut internal_work);
enqueue_market_if_batch_active(&root, market_id, &mut internal_work);
enqueue_market_if_batch_active(&root, inactive_market_id, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([market_id]));
}
#[test]
fn seed_internal_worklist_prioritizes_first_progressed_market_then_market_id_order() {
let mid1 = MarketId(1);
let mid2 = MarketId(2);
let mid3 = MarketId(3);
let mut root = test_root([mid1, mid2, mid3]);
place_orders(&mut root, mid1, 2);
place_orders(&mut root, mid2, 3);
place_orders(&mut root, mid3, 2);
apply_close_start_without_progress(&mut root, mid1);
apply_command(&mut root, close_market_cmd(mid2));
apply_close_start_without_progress(&mut root, mid3);
let mut internal_work = VecDeque::with_capacity(INTERNAL_WORKLIST_CAPACITY);
seed_internal_worklist_from_root(&root, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([mid2, mid1, mid3]));
}
#[test]
fn prune_stale_internal_head_drops_inactive_entries() {
let active_market_id = MarketId(21);
let stale_market_id = MarketId(22);
let mut root = test_root([active_market_id, stale_market_id]);
place_orders(&mut root, active_market_id, 3);
apply_command(&mut root, close_market_cmd(active_market_id));
let mut internal_work = VecDeque::from([stale_market_id, active_market_id]);
prune_stale_internal_head(&root, &mut internal_work);
assert_eq!(internal_work, VecDeque::from([active_market_id]));
}
}