use super::error::EngineError;
use crate::{
book::{
BookEvent, BookEventEnvelope,
protocol::{
command::{Command, CommandKind},
reject::RejectReason,
response::Response,
},
},
config::{Config, ThreadPinning},
disruptor::{Envelope, ResponseCallback, RingSlot},
engine::{
journaler::JournalHandler,
root::EngineRoot,
state::{EngineState, Market, MarketConfig},
},
error::RuntimeError,
types::{MarketId, MarketKind},
};
use std::{
collections::VecDeque,
marker::PhantomData,
path::{Path, PathBuf},
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
thread,
time::{Duration, Instant},
};
use disrupt_rs::{
BusySpin, EventPoller, GatedSequence, MultiConsumerDependentsBarrier, Producer,
SingleProducerBarrier, build_single_producer,
builder::{MC, NC, SC, single::SPBuilder},
wait_strategies::WaitStrategy,
};
use tracing::{error, info, trace, warn};
mod tuple;
pub use tuple::HandlerRecovery;
#[doc(hidden)]
pub use tuple::RegisteredHandler;
/// Element type of the engine's disruptor ring: a [`BookEventEnvelope`] payload wrapped
/// in an `Envelope` that also carries `seq`, `response_cb`, `tx_id`, `tx_len` and `tx_ix`.
pub type EngineEvent = RingSlot<BookEventEnvelope>;
// Single-producer builder typestates over `EngineEvent`, one per `disrupt_rs` stage marker
// (`NC`/`SC`/`MC`). `NcBuilder` is what `build_disruptor_pipeline` returns.
// NOTE(review): presumably no/single/multi-consumer stages — confirm against disrupt_rs.
type NcBuilder<W> = SPBuilder<NC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type ScBuilder<W> = SPBuilder<SC, EngineEvent, W, MultiConsumerDependentsBarrier>;
type McBuilder<W> = SPBuilder<MC, EngineEvent, W, MultiConsumerDependentsBarrier>;
/// Typestate builder for an [`Engine`].
///
/// Type parameters:
/// - `B`: the disruptor builder stage (e.g. `NcBuilder<W>`), tracked only at the type level.
/// - `W`: the disruptor wait strategy (defaults to `BusySpin`).
/// - `Hs`: the handlers registered so far (starts as `()`).
pub struct EngineBuilder<B, W = BusySpin, Hs = ()>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    config: Config,
    // Initial engine state the engine thread will start from.
    root: EngineRoot,
    // Location of the write-ahead log.
    wal_path: PathBuf,
    registered_handlers: Hs,
    // Stop flag shared by every thread the built engine starts.
    shutdown: Arc<AtomicBool>,
    metrics: crate::metrics::Metrics,
    // Core-pinning preferences, taken from `config.pinning` (or its default).
    pinning: ThreadPinning,
    wait_strategy: W,
    // `fn() -> B` keeps `B` a pure type-level marker: no `B` value is stored, and `B` does
    // not influence the builder's `Send`/`Sync`.
    _builder_state: PhantomData<fn() -> B>,
}
impl EngineBuilder<NcBuilder<BusySpin>, BusySpin> {
    /// Creates a builder whose initial state is seeded with the given markets.
    ///
    /// Each [`MarketConfig`] becomes a [`Market`] keyed by its own `market_id()`. The
    /// populated [`EngineState`] is wrapped with `EngineRoot::new_with(state, 0)`.
    /// NOTE(review): the `0` is presumably the starting version — confirm.
    pub fn with_markets(
        config: Config,
        markets: impl IntoIterator<Item = MarketConfig>,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        let mut state = EngineState::new(config.clone());
        markets.into_iter().for_each(|market_config| {
            let id = market_config.market_id();
            let market = Market::from_config_with_cfg(&config, market_config);
            state.markets.insert(id, market);
        });
        Self::build_inner(EngineRoot::new_with(state, 0), config, wal_path)
    }

    /// Creates a builder with one in-play-capable, dynamic multi-runner market per id.
    pub fn new(
        config: Config,
        market_ids: impl IntoIterator<Item = MarketId>,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        Self::with_markets(
            config,
            market_ids.into_iter().map(|market_id| {
                MarketConfig::multi_runner_dynamic(market_id, MarketKind::InPlayCapable)
            }),
            wal_path,
        )
    }

    /// Creates a builder that starts from an already-constructed [`EngineRoot`].
    pub fn from_root(
        initial_root: EngineRoot,
        config: Config,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        Self::build_inner(initial_root, config, wal_path)
    }

    /// Shared constructor tail.
    ///
    /// Registers the metric stages, grabs the global metrics handle, resolves thread pinning
    /// (default when unset) and assembles a builder with no handlers, a fresh shutdown flag
    /// and the `BusySpin` wait strategy.
    fn build_inner(
        root: EngineRoot,
        config: Config,
        wal_path: impl AsRef<Path>,
    ) -> anyhow::Result<Self> {
        crate::btx_metrics_init!(
            "engine",
            "wal_poller",
            "projection_apply",
            "projection_live",
            "projection_snapshot",
            "projection_disk",
        );
        let metrics = crate::metrics::globals::metrics();
        let pinning = config.pinning.clone().unwrap_or_default();
        let wal_path = wal_path.as_ref().to_path_buf();
        let shutdown = Arc::new(AtomicBool::new(false));
        Ok(Self {
            config,
            root,
            wal_path,
            registered_handlers: (),
            shutdown,
            metrics,
            pinning,
            wait_strategy: BusySpin,
            _builder_state: PhantomData,
        })
    }
}
impl<B, W, R> EngineBuilder<B, W, R>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    /// Moves every field into a builder tagged with stage `B2`, replacing the handler set
    /// with `map(old_handlers)`.
    fn map_handlers<B2, R2>(self, map: impl FnOnce(R) -> R2) -> EngineBuilder<B2, W, R2> {
        let handlers = map(self.registered_handlers);
        EngineBuilder {
            config: self.config,
            root: self.root,
            wal_path: self.wal_path,
            registered_handlers: handlers,
            shutdown: self.shutdown,
            metrics: self.metrics,
            pinning: self.pinning,
            wait_strategy: self.wait_strategy,
            _builder_state: PhantomData,
        }
    }

    /// Moves every field into a builder tagged with stage `B2`, swapping in `wait_strategy`
    /// (the previous strategy is dropped).
    fn set_wait_strategy<B2, W2>(self, wait_strategy: W2) -> EngineBuilder<B2, W2, R>
    where
        W2: WaitStrategy + Clone + Send + 'static,
    {
        let EngineBuilder {
            config,
            root,
            wal_path,
            registered_handlers,
            shutdown,
            metrics,
            pinning,
            wait_strategy: _,
            _builder_state: _,
        } = self;
        EngineBuilder {
            config,
            root,
            wal_path,
            registered_handlers,
            shutdown,
            metrics,
            pinning,
            wait_strategy,
            _builder_state: PhantomData,
        }
    }
}
impl<W, R> EngineBuilder<NcBuilder<W>, W, R>
where
    W: WaitStrategy + Clone + Send + 'static,
{
    /// Replaces the disruptor wait strategy.
    ///
    /// Only available while the builder is in the `NcBuilder` stage. The returned builder
    /// stays in that stage, re-parameterised over the new strategy `W2`.
    pub fn with_wait_strategy<W2>(self, wait_strategy: W2) -> EngineBuilder<NcBuilder<W2>, W2, R>
    where
        W2: WaitStrategy + Clone + Send + 'static,
    {
        self.set_wait_strategy::<NcBuilder<W2>, W2>(wait_strategy)
    }
}
/// Creates the single-producer ring and splits off the WAL stage.
///
/// Returns, in order:
/// 1. the event poller over the raw ring (handed to the journaler in `build_engine`);
/// 2. the builder for downstream stages, chained behind a single durability gate;
/// 3. a clone of that gate (also handed to the journaler's poller in `build_engine`).
///
/// `config.ring_size_pow2` is passed straight to `build_single_producer`.
/// NOTE(review): presumably log2 of the ring capacity — confirm in disrupt_rs.
// The return tuple spells out three generic disruptor types; an alias would hide them.
#[allow(clippy::type_complexity)]
fn build_disruptor_pipeline<W>(
    config: &Config,
    wait_strategy: W,
) -> (
    EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
    NcBuilder<W>,
    GatedSequence<W::Notifier>,
)
where
    W: WaitStrategy,
{
    /// Placeholder used to pre-fill every ring slot. The publish paths overwrite all of
    /// these fields before a slot is published.
    fn blank_slot() -> EngineEvent {
        let payload = BookEventEnvelope {
            market_id: MarketId(0),
            market_name: String::new(),
            market_seq: 0,
            timestamp: crate::types::unix_epoch(),
            metadata: crate::book::EventMetadata::default(),
            event: BookEvent::MarketStateChanged {
                to: crate::book::common::types::BookMarketState::Open,
                reason: String::new(),
            },
        };
        RingSlot::new(Envelope {
            seq: 0,
            payload,
            response_cb: None,
            tx_id: 0,
            tx_len: 1,
            tx_ix: 0,
        })
    }

    let (wal_poller, builder) =
        build_single_producer(config.ring_size_pow2, blank_slot, wait_strategy).event_poller();
    let (builder, gates) = builder.new_gates::<1>();
    let durability_gate = gates[0].clone();
    (
        wal_poller,
        builder.and_then_with_dependents(&gates),
        durability_gate,
    )
}
/// Inputs for `build_engine` other than the ring producer, bundled to keep its signature short.
struct EngineBuildCtx<W: WaitStrategy> {
    // State the engine thread starts from.
    root: EngineRoot,
    // Ring consumer handed to the journaler's poller.
    wal_poller: EventPoller<EngineEvent, SingleProducerBarrier, W::Notifier>,
    // Turned into the WAL poller thread via `into_poller(...).poll_pinned(...)`.
    journaler: JournalHandler,
    // Gate created by `build_disruptor_pipeline`, passed to the WAL poller.
    durability_gate: GatedSequence<W::Notifier>,
    // Stop flag shared by the engine loop and the WAL poller.
    shutdown: Arc<AtomicBool>,
    metrics: crate::metrics::Metrics,
    // Core-pinning settings (`engine_core`, `wal_poller_core`).
    pinning: ThreadPinning,
    // Wait strategy given to the WAL poller.
    wait_strategy: W,
}
fn build_engine<B, W>(
producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
ctx: EngineBuildCtx<W>,
) -> anyhow::Result<Engine>
where
B: disrupt_rs::Barrier + 'static,
W: WaitStrategy + Clone + Send + 'static,
{
let EngineBuildCtx {
root,
wal_poller,
journaler,
durability_gate,
shutdown,
metrics,
pinning,
wait_strategy,
} = ctx;
let wal_thread = journaler
.into_poller(
wal_poller,
wait_strategy.clone(),
durability_gate,
Arc::clone(&shutdown),
)
.poll_pinned(pinning.wal_poller_core);
let (cmd_tx, cmd_rx) =
flume::bounded::<EngineMsg>(crate::config::DEFAULT_ENGINE_CHANNEL_CAPACITY);
let handler = EngineHandler { tx: cmd_tx };
let engine_thread = {
let shutdown = Arc::clone(&shutdown);
let core_id = pinning.engine_core;
thread::spawn(move || {
if let Some(id) = core_id
&& !core_affinity::set_for_current(core_affinity::CoreId { id })
{
warn!(
core_id = id,
"failed to pin engine thread to requested core"
);
}
crate::metrics_stage!("engine");
run_engine_loop(root, producer, cmd_rx, shutdown);
})
};
info!("engine threads started");
Ok(Engine {
handler,
metrics,
shutdown,
engine_thread: Some(engine_thread),
wal_thread: Some(wal_thread),
})
}
/// A running engine: owns the engine and WAL threads and hands out [`EngineHandler`]s for
/// submitting commands.
///
/// Dropping an `Engine` calls [`Engine::stop`] and logs (does not propagate) any failure.
pub struct Engine {
    // Sender-side handle; `handle()` returns clones of it.
    handler: EngineHandler,
    metrics: crate::metrics::Metrics,
    // Raised by `stop()`; read by the engine loop and passed to the WAL poller.
    shutdown: Arc<AtomicBool>,
    // `None` once `stop()` has joined it.
    engine_thread: Option<thread::JoinHandle<()>>,
    // `None` once `stop()` has joined it; yields the WAL poller's own result.
    wal_thread: Option<thread::JoinHandle<anyhow::Result<()>>>,
}
impl Engine {
pub fn handle(&self) -> EngineHandler {
self.handler.clone()
}
pub fn metrics_snapshot(&self) -> crate::metrics::MetricsSnapshot {
self.metrics.snapshot()
}
pub fn metrics(&self) -> crate::metrics::Metrics {
self.metrics.clone()
}
pub fn stop(&mut self) -> anyhow::Result<()> {
self.shutdown.store(true, Ordering::Release);
let Some(engine_thread) = self.engine_thread.take() else {
if let Some(wal_thread) = self.wal_thread.take()
&& wal_thread.join().is_err()
{
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
return Ok(());
};
if engine_thread.join().is_err() {
error!("engine thread panicked");
}
if let Some(wal_thread) = self.wal_thread.take() {
match wal_thread.join() {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!(error = ?e, "wal thread returned error");
return Err(e);
}
Err(_) => {
error!("wal thread panicked");
return Err(RuntimeError::WalThreadPanic.into());
}
}
}
Ok(())
}
}
fn run_engine_loop<B, W>(
mut root: EngineRoot,
mut producer: disrupt_rs::SingleProducer<EngineEvent, B, W>,
cmd_rx: flume::Receiver<EngineMsg>,
shutdown: Arc<AtomicBool>,
) where
B: disrupt_rs::Barrier,
W: WaitStrategy,
{
let mut batch_process_rr_cursor: Option<MarketId> = None;
let mut internal_queue: VecDeque<EngineMsg> = VecDeque::new();
while !shutdown.load(Ordering::Acquire) {
crate::metric!({ inflight: 0 });
crate::metric!({ queue_len: cmd_rx.len().saturating_add(internal_queue.len()) as i64 });
let msg = if let Some(m) = internal_queue.pop_front() {
m
} else {
match cmd_rx.recv_timeout(Duration::from_millis(
crate::config::DEFAULT_ENGINE_POLL_TIMEOUT_MS,
)) {
Ok(m) => m,
Err(flume::RecvTimeoutError::Timeout) => {
if let Some(msg) =
pick_next_batch_process_command(&root, &mut batch_process_rr_cursor)
{
msg
} else {
continue;
}
}
Err(flume::RecvTimeoutError::Disconnected) => break,
}
};
crate::metric!({ inflight: 1, in_total: 1 });
crate::metric!({ queue_len: cmd_rx.len().saturating_add(internal_queue.len()) as i64 });
let t0 = Instant::now();
let EngineMsg::Command { cmd, resp_tx } = msg;
let had_response_channel = resp_tx.is_some();
let was_internal_continue = matches!(
&cmd.kind,
CommandKind::ContinueCloseMarket
| CommandKind::ContinueBatchCancelOrders
| CommandKind::ContinueLapseOrders
| CommandKind::ContinueVoidOrders
);
trace!(
market_id = ?cmd.market_id,
correlation_id = ?cmd.correlation_id,
command = ?cmd.kind,
"engine command"
);
let start_seq = root.version().saturating_add(1);
if let Err(err) = root.handle(cmd) {
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: 0,
inflight: 0
});
if let Some(tx) = resp_tx
&& tx
.send(Ok(Response::Rejected {
reason: err.into_reason(),
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending rejection");
}
if !was_internal_continue
&& let Some(msg) =
pick_next_batch_process_command(&root, &mut batch_process_rr_cursor)
{
internal_queue.push_back(msg);
}
continue;
}
let events = root.take_changes();
let events_len = events.len();
debug_assert!(
events_len > 0,
"accepted commands must emit at least one event"
);
let tx_id = root.take_tx_id();
let tx_len = match u16::try_from(events_len) {
Ok(len) => len,
Err(_) => {
error!(
events_len,
"tx_len overflow: too many events for single transaction"
);
if let Some(tx) = resp_tx.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
continue;
}
};
root.apply(&events);
let resp_tx_for_error = resp_tx.clone();
let callback = resp_tx.map(|tx| ResponseCallback { tx });
if events_len == 1 {
let Some(payload) = events.into_iter().next() else {
error!("event batch empty despite events_len == 1");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
break;
};
producer.publish(move |slot| {
slot.seq = start_seq;
slot.payload = payload;
slot.response_cb = callback;
slot.tx_id = tx_id;
slot.tx_len = 1;
slot.tx_ix = 0;
});
} else {
let mut payloads = events.into_iter();
let mut callback = callback;
let mut publish_error = None;
producer.batch_publish(events_len, |iter| {
publish_error =
fill_batch_slots(iter, &mut payloads, start_seq, tx_id, tx_len, &mut callback)
.err();
});
if let Some(err_kind) = publish_error {
error!(?err_kind, "batch_publish payload/slot length mismatch");
if let Some(tx) = resp_tx_for_error.as_ref()
&& tx
.send(Ok(Response::Rejected {
reason: RejectReason::InternalError,
tx_start_seq: None,
tx_len: 0,
}))
.is_err()
{
trace!("response receiver dropped before sending internal rejection");
}
shutdown.store(true, Ordering::Release);
break;
}
}
crate::metric!({
duration_ns: t0.elapsed().as_nanos() as u64,
out_total: events_len as u64,
inflight: 0
});
if (!was_internal_continue || had_response_channel)
&& let Some(msg) = pick_next_batch_process_command(&root, &mut batch_process_rr_cursor)
{
internal_queue.push_back(msg);
}
}
}
/// Length mismatch between the ring slots handed out by `batch_publish` and the payloads of
/// one transaction, as detected by `fill_batch_slots`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BatchFillError {
    /// More slots than payloads: the payload iterator ran out while slots remained.
    IteratorLongerThanPayloads,
    /// More payloads than slots: payloads were left over after the last slot.
    IteratorShorterThanPayloads,
}
/// Writes one transaction's payloads into the ring slots handed out by `batch_publish`.
///
/// Slot `i` receives:
/// - the `i`-th payload;
/// - sequence `start_seq + i` (saturating);
/// - the shared `tx_id` and `tx_len`;
/// - `tx_ix = i`.
///
/// Every slot's `response_cb` is cleared, and then the last slot receives `callback.take()`.
/// With zero slots and zero payloads this returns `Ok(())` and leaves `callback` untouched.
///
/// # Errors
///
/// - [`BatchFillError::IteratorLongerThanPayloads`] if a slot is left with no payload.
/// - [`BatchFillError::IteratorShorterThanPayloads`] if payloads remain after the last slot.
///
/// On error `callback` is left untouched, but slots visited before the mismatch have already
/// been overwritten.
fn fill_batch_slots<'a, I>(
    iter: I,
    payloads: &mut std::vec::IntoIter<BookEventEnvelope>,
    start_seq: u64,
    tx_id: u64,
    tx_len: u16,
    callback: &mut Option<ResponseCallback>,
) -> Result<(), BatchFillError>
where
    I: Iterator<Item = &'a mut EngineEvent>,
{
    let mut tail: Option<&'a mut EngineEvent> = None;
    for (position, slot) in iter.enumerate() {
        let next_payload = payloads
            .next()
            .ok_or(BatchFillError::IteratorLongerThanPayloads)?;
        slot.seq = start_seq.saturating_add(position as u64);
        slot.payload = next_payload;
        slot.response_cb = None;
        slot.tx_id = tx_id;
        slot.tx_len = tx_len;
        slot.tx_ix = position as u16;
        tail = Some(slot);
    }
    match payloads.next() {
        Some(_) => Err(BatchFillError::IteratorShorterThanPayloads),
        None => {
            if let Some(last) = tail {
                last.response_cb = callback.take();
            }
            Ok(())
        }
    }
}
fn continue_close_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueCloseMarket,
}
}
fn continue_batch_cancel_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueBatchCancelOrders,
}
}
fn continue_lapse_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueLapseOrders,
}
}
fn continue_void_command(market_id: MarketId) -> Command {
Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ContinueVoidOrders,
}
}
impl Drop for Engine {
fn drop(&mut self) {
if let Err(err) = self.stop() {
error!(error = ?err, "failed to stop engine cleanly during drop");
}
}
}
fn pick_next_batch_process_command(
root: &EngineRoot,
cursor: &mut Option<MarketId>,
) -> Option<EngineMsg> {
fn classify_market(m: &crate::engine::state::Market) -> Option<CommandKind> {
if m.book.close_process_state().is_some() {
return Some(CommandKind::ContinueCloseMarket);
}
if m.book.cancel_process_state().is_some() {
return Some(CommandKind::ContinueBatchCancelOrders);
}
match m.book.batch_process_state() {
Some(crate::book::BatchProcessState::Lapse(_)) => {
Some(CommandKind::ContinueLapseOrders)
}
Some(crate::book::BatchProcessState::Void(_)) => Some(CommandKind::ContinueVoidOrders),
_ => None,
}
}
let mut best_after_cursor: Option<(MarketId, CommandKind)> = None;
let mut global_min: Option<(MarketId, CommandKind)> = None;
for (&mid, m) in root.state().markets.iter() {
let Some(kind) = classify_market(m) else {
continue;
};
if global_min.as_ref().is_none_or(|(best, _)| mid.0 < best.0) {
global_min = Some((mid, kind.clone()));
}
if let Some(cur) = *cursor
&& mid.0 > cur.0
&& best_after_cursor
.as_ref()
.is_none_or(|(best, _)| mid.0 < best.0)
{
best_after_cursor = Some((mid, kind));
}
}
let next = best_after_cursor.or(global_min)?;
*cursor = Some(next.0);
let cmd = match next.1 {
CommandKind::ContinueCloseMarket => continue_close_command(next.0),
CommandKind::ContinueBatchCancelOrders => continue_batch_cancel_command(next.0),
CommandKind::ContinueLapseOrders => continue_lapse_command(next.0),
CommandKind::ContinueVoidOrders => continue_void_command(next.0),
_ => return None,
};
Some(EngineMsg::Command { cmd, resp_tx: None })
}
/// Cloneable handle for submitting [`Command`]s to the engine thread.
#[derive(Clone)]
pub struct EngineHandler {
    // Sending side of the engine's command channel (bounded when created by `build_engine`).
    pub(crate) tx: flume::Sender<EngineMsg>,
}
/// Message delivered to the engine thread over the command channel.
pub(crate) enum EngineMsg {
    /// Execute `cmd`.
    ///
    /// If `resp_tx` is `Some`, the outcome is reported on it. The engine loop sends
    /// rejections directly. For accepted commands it moves the sender into a
    /// `ResponseCallback` attached to the last published ring slot.
    Command {
        cmd: Command,
        resp_tx:
            Option<flume::Sender<Result<crate::book::protocol::response::Response, EngineError>>>,
    },
}
impl EngineHandler {
    /// Enqueues `cmd` for the engine thread without asking for a response.
    ///
    /// When the channel was created by `build_engine` it is bounded, so this blocks while
    /// the channel is full.
    ///
    /// # Errors
    ///
    /// [`EngineError::DisruptorStopped`] if the engine's receiving side is gone.
    pub fn submit(&self, cmd: Command) -> Result<(), EngineError> {
        let msg = EngineMsg::Command { cmd, resp_tx: None };
        self.tx.send(msg).map_err(|_| EngineError::DisruptorStopped)
    }

    /// Enqueues `cmd` and returns a receiver that will get its outcome.
    ///
    /// The receiver's channel holds a single result.
    ///
    /// # Errors
    ///
    /// [`EngineError::DisruptorStopped`] if the engine's receiving side is gone.
    pub fn submit_with_response(
        &self,
        cmd: Command,
    ) -> Result<
        flume::Receiver<Result<crate::book::protocol::response::Response, EngineError>>,
        EngineError,
    > {
        let (resp_tx, resp_rx) = flume::bounded(1);
        let msg = EngineMsg::Command {
            cmd,
            resp_tx: Some(resp_tx),
        };
        self.tx
            .send(msg)
            .map(|()| resp_rx)
            .map_err(|_| EngineError::DisruptorStopped)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::book::{BookEvent, BookMarketState};

    /// Envelope for market 7 carrying a `MarketStateChanged { Open, "T" }` event.
    fn envelope_with_seq(market_seq: u64) -> BookEventEnvelope {
        let event = BookEvent::MarketStateChanged {
            to: BookMarketState::Open,
            reason: "T".to_string(),
        };
        BookEventEnvelope {
            market_id: MarketId(7),
            market_name: String::new(),
            market_seq,
            timestamp: crate::types::unix_epoch(),
            metadata: None,
            event,
        }
    }

    /// Freshly initialised ring slot.
    fn empty_slot() -> EngineEvent {
        RingSlot::new(Envelope {
            seq: 0,
            payload: envelope_with_seq(0),
            response_cb: None,
            tx_id: 0,
            tx_len: 1,
            tx_ix: 0,
        })
    }

    /// One envelope per requested `market_seq`, as the owning iterator `fill_batch_slots` takes.
    fn payloads(seqs: &[u64]) -> std::vec::IntoIter<BookEventEnvelope> {
        seqs.iter()
            .map(|&seq| envelope_with_seq(seq))
            .collect::<Vec<_>>()
            .into_iter()
    }

    /// `count` empty ring slots.
    fn slots(count: usize) -> Vec<EngineEvent> {
        (0..count).map(|_| empty_slot()).collect()
    }

    #[test]
    fn fill_batch_slots_populates_and_sets_callback_on_last_slot() {
        let (tx, _rx) = flume::bounded(1);
        let mut callback = Some(ResponseCallback { tx });
        let mut pending = payloads(&[1, 2]);
        let mut ring = slots(2);

        let outcome = fill_batch_slots(ring.iter_mut(), &mut pending, 100, 99, 2, &mut callback);

        assert_eq!(outcome, Ok(()));
        assert!(callback.is_none(), "callback should move to last slot");
        assert_eq!(ring[0].seq, 100);
        assert_eq!(ring[1].seq, 101);
        assert!(ring[0].response_cb.is_none());
        assert!(ring[1].response_cb.is_some());
    }

    #[test]
    fn fill_batch_slots_errors_when_slots_exceed_payloads() {
        let (tx, _rx) = flume::bounded(1);
        let mut callback = Some(ResponseCallback { tx });
        let mut pending = payloads(&[1]);
        let mut ring = slots(2);

        let outcome = fill_batch_slots(ring.iter_mut(), &mut pending, 10, 5, 2, &mut callback);

        assert_eq!(outcome, Err(BatchFillError::IteratorLongerThanPayloads));
        assert!(callback.is_some(), "callback should remain untouched on error");
    }

    #[test]
    fn fill_batch_slots_errors_when_payloads_exceed_slots() {
        let (tx, _rx) = flume::bounded(1);
        let mut callback = Some(ResponseCallback { tx });
        let mut pending = payloads(&[1, 2, 3]);
        let mut ring = slots(2);

        let outcome = fill_batch_slots(ring.iter_mut(), &mut pending, 10, 5, 3, &mut callback);

        assert_eq!(outcome, Err(BatchFillError::IteratorShorterThanPayloads));
        assert!(callback.is_some(), "callback should remain untouched on error");
    }
}