use super::error::{BookError, RequestedBookState};
use super::protocol::{
command::{Command, CommandKind, MarketState, Persistence, Side, TimeInForce},
reject::RejectReason,
};
use crate::book::common::types::BookOrderInfo;
use crate::types::*;
use chrono::Utc;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use tracing::error;
use super::common::fast::{OrderKey, OrderStore, RunnerBook};
use super::common::*;
use serde::{Deserialize, Serialize};
/// Tick index as used for ladder levels (`rebuild_ladders` narrows `OrderStore::stored_tick` to this type).
type TickIndex = u16;
/// Key identifying the two orders of one trade; `MultiRunnerBook::trade_pair_key` builds it with the
/// smaller order id first so both sides of the trade map to the same key.
type TradePairKey = (OrderId, OrderId);
/// Identifies one price level: runner, side and tick.
type RunnerLevelKey = (RunnerId, Side, TickIndex);
/// One resting order collected during a ladder rebuild: creation time, order id, store key and
/// remaining stake.
type RestingLevelOrder = (DateTime, OrderId, OrderKey, Money);
/// Resting orders grouped by price level, as accumulated by `rebuild_ladders`.
type RestingOrdersByLevel = HashMap<RunnerLevelKey, Vec<RestingLevelOrder>>;
/// Reusable working buffers owned by the book.
///
/// `handle_command` moves this out of the book with `std::mem::take` for the duration of a command
/// and stores it back afterwards. Clones and deserialized books start with an empty `Scratch`.
#[derive(Debug, Default)]
struct Scratch {
// NOTE(review): the consumers of these buffers are outside this part of the file; the field
// descriptions below are inferred from names and types only — confirm against the matching code.
/// Presumably the matched amount accumulated per maker order during matching.
maker_matched_delta: HashMap<OrderId, i64>,
/// Presumably the ticks that can be matched against for the current taker.
matchable_ticks: Vec<TickIndex>,
/// Presumably the fills planned for the current taker.
fills: Vec<PlannedFill>,
/// Event buffer.
events: EventVec,
}
/// A single fill against a resting (maker) order.
///
/// NOTE(review): this type is not constructed or read in the visible part of the file; the field
/// notes are derived from the names and types only.
#[derive(Debug, Clone)]
struct PlannedFill {
/// Id of the maker order being filled.
maker_order_id: OrderId,
/// Account that owns the maker order.
maker_account_id: AccountId,
/// Correlation id attached to the maker order, if any.
maker_correlation_id: Option<CorrelationId>,
/// Runner the maker order is on.
maker_runner_id: RunnerId,
/// Side of the maker order.
maker_side: Side,
/// Price of the fill.
price: OddsX10000,
/// Amount filled.
fill_amount: Money,
/// Maker stake left after this fill.
maker_remaining_after: Money,
/// Taker stake left after this fill.
taker_remaining_after: Money,
}
/// The fields of a `BookEvent::TradeMatched` event that `trade_matched` needs.
///
/// `apply_event` copies these out of the event before delegating.
#[derive(Debug, Clone)]
struct TradeMatchView {
/// Order this side of the trade belongs to.
order_id: OrderId,
/// Whether this order was the maker or the taker.
role: TradeRole,
/// Runner reported by the event for this side.
runner_id: RunnerId,
/// The order on the other side of the trade.
counter_party: OrderId,
/// Stake left on the order after the match; zero means the order is complete and is removed.
remaining_stake: Money,
/// Amount matched by this trade.
matched_delta: Money,
}
/// First half of a trade pair, parked in `pending_trade_views` by `trade_matched` until the
/// counterparty's `TradeMatched` event arrives.
#[derive(Debug, Clone, Copy)]
struct PendingTradeView {
/// Order this half belongs to.
order_id: OrderId,
/// Maker or taker role of this half.
role: TradeRole,
/// Runner reported for this half.
runner_id: RunnerId,
/// Amount matched, later added to the runner's matched volume.
matched_delta: Money,
}
/// Parameters describing a taker order to be matched.
///
/// NOTE(review): this type is not used in the visible part of the file; the field notes are
/// derived from the names and types only.
#[derive(Debug, Clone)]
struct MatchRequest {
/// Timestamp of the request.
ts: DateTime,
/// Account placing the taker order.
taker_account_id: AccountId,
/// Correlation id of the originating command, if any.
taker_correlation_id: Option<CorrelationId>,
/// Side of the taker order.
taker_side: Side,
/// Runner the taker order is on.
taker_runner_id: RunnerId,
/// Price of the taker order.
taker_price: OddsX10000,
/// Stake of the taker order.
taker_stake: Money,
/// Id assigned to the taker order.
taker_order_id: OrderId,
}
/// Serialized form of [`MultiRunnerBook`].
///
/// Holds the persistent fields only: `runner_books`, `active_order_count_cache`,
/// `pending_trade_views` and `scratch` are left out and are rebuilt or reset by the
/// `Deserialize` implementation of `MultiRunnerBook`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MultiRunnerBookSnapshot {
market_id: MarketId,
market_name: String,
market_kind: MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
next_order_id: u64,
// Falls back to the close-process default when the field is missing from the input.
#[serde(default = "crate::book::close_process::default_close_batch_events")]
close_batch_max_events: u16,
runners: BTreeSet<RunnerId>,
runner_labels: HashMap<RunnerId, String>,
allow_unknown_runners: bool,
removed_runners: BTreeSet<RunnerId>,
batch_process: Option<BatchProcessState>,
orders: OrderStore,
runner_matched_volume: BTreeMap<RunnerId, Money>,
}
/// Order book for a market with multiple runners.
///
/// Commands are turned into events by `handle_command`; events are folded into the state by
/// `apply_event`. Serialization goes through `MultiRunnerBookSnapshot`.
#[derive(Debug)]
pub struct MultiRunnerBook {
/// Market this book belongs to; commands for another market are rejected.
market_id: MarketId,
/// Market name copied into every emitted event envelope.
pub(crate) market_name: String,
market_kind: MarketKind,
/// Current market state (open, suspended, closed, ...).
state: BookMarketState,
/// Current market phase.
market_phase: MarketPhase,
/// Id handed to the next placed order; kept above every accepted order id.
next_order_id: u64,
/// Batch size used for close, lapse and batch-cancel chunks.
close_batch_max_events: u16,
/// Runners known to this market.
runners: BTreeSet<RunnerId>,
/// Display label per runner; defaults to the runner id's string form.
runner_labels: HashMap<RunnerId, String>,
/// Initialised to `false` by the constructors; not read in this part of the file.
allow_unknown_runners: bool,
/// Runners removed via `RunnerRemoved` events; their orders are skipped when ladders are rebuilt.
removed_runners: BTreeSet<RunnerId>,
/// Per-runner ladders; derived from `orders` and rebuilt on deserialize.
runner_books: HashMap<RunnerId, RunnerBook>,
/// State of the currently running close, lapse or batch-cancel process, if any.
batch_process: Option<BatchProcessState>,
/// All orders held by the book.
orders: OrderStore,
/// Number of orders in `ExecutableUnmatched` or `ExecutablePartiallyMatched` state.
active_order_count_cache: usize,
/// Matched volume accumulated per runner.
runner_matched_volume: BTreeMap<RunnerId, Money>,
/// First halves of trade pairs waiting for the counterparty's event; not persisted or cloned.
pending_trade_views: HashMap<TradePairKey, PendingTradeView>,
/// Reusable working buffers; not persisted or cloned.
scratch: Scratch,
}
/// Serializes the book by copying its persistent fields into a
/// [`MultiRunnerBookSnapshot`] and serializing that snapshot.
impl Serialize for MultiRunnerBook {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        // Labels are copied entry by entry into a fresh map.
        let runner_labels = self
            .runner_labels
            .iter()
            .map(|(runner_id, label)| (*runner_id, label.clone()))
            .collect();

        // Derived state (ladders, counters, pending trades, scratch) is intentionally absent.
        let snapshot = MultiRunnerBookSnapshot {
            market_id: self.market_id,
            market_name: self.market_name.clone(),
            market_kind: self.market_kind,
            state: self.state,
            market_phase: self.market_phase,
            next_order_id: self.next_order_id,
            close_batch_max_events: self.close_batch_max_events,
            runners: self.runners.clone(),
            runner_labels,
            allow_unknown_runners: self.allow_unknown_runners,
            removed_runners: self.removed_runners.clone(),
            batch_process: self.batch_process.clone(),
            orders: self.orders.clone(),
            runner_matched_volume: self.runner_matched_volume.clone(),
        };
        snapshot.serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for MultiRunnerBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = MultiRunnerBookSnapshot::deserialize(deserializer)?;
let mut book = Self {
market_id: snap.market_id,
market_name: snap.market_name,
market_kind: snap.market_kind,
state: snap.state,
market_phase: snap.market_phase,
next_order_id: snap.next_order_id,
close_batch_max_events: snap.close_batch_max_events,
runners: snap.runners,
runner_labels: snap.runner_labels.into_iter().collect(),
allow_unknown_runners: snap.allow_unknown_runners,
removed_runners: snap.removed_runners,
runner_books: HashMap::new(),
batch_process: snap.batch_process,
orders: snap.orders,
active_order_count_cache: 0,
runner_matched_volume: snap.runner_matched_volume,
pending_trade_views: HashMap::new(),
scratch: Scratch::default(),
};
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}
/// Clones the persistent and derived book state; the clone starts with no pending trade views
/// and a fresh scratch buffer.
impl Clone for MultiRunnerBook {
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct forces a decision here.
        let Self {
            market_id,
            market_name,
            market_kind,
            state,
            market_phase,
            next_order_id,
            close_batch_max_events,
            runners,
            runner_labels,
            allow_unknown_runners,
            removed_runners,
            runner_books,
            batch_process,
            orders,
            active_order_count_cache,
            runner_matched_volume,
            pending_trade_views: _,
            scratch: _,
        } = self;

        Self {
            market_id: *market_id,
            market_name: market_name.clone(),
            market_kind: *market_kind,
            state: *state,
            market_phase: *market_phase,
            next_order_id: *next_order_id,
            close_batch_max_events: *close_batch_max_events,
            runners: runners.clone(),
            runner_labels: runner_labels.clone(),
            allow_unknown_runners: *allow_unknown_runners,
            removed_runners: removed_runners.clone(),
            runner_books: runner_books.clone(),
            batch_process: batch_process.clone(),
            orders: orders.clone(),
            active_order_count_cache: *active_order_count_cache,
            runner_matched_volume: runner_matched_volume.clone(),
            pending_trade_views: HashMap::new(),
            scratch: Scratch::default(),
        }
    }
}
impl MultiRunnerBook {
/// Creates an open book for the given runners with the default order-store capacity.
///
/// # Panics
///
/// Panics (via `new_with_capacity`) when `market_kind` does not support `market_phase` as an
/// initial phase.
pub fn new(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_ids: impl IntoIterator<Item = RunnerId>,
) -> Self {
    const DEFAULT_ORDER_STORE_CAPACITY: usize = 20_000;
    Self::new_with_capacity(
        market_id,
        market_kind,
        market_phase,
        runner_ids,
        DEFAULT_ORDER_STORE_CAPACITY,
    )
}
/// Creates an open book for the given runners with an order store sized for
/// `order_store_capacity` orders.
///
/// Every runner gets an empty ladder and a label equal to its id's string form.
///
/// # Panics
///
/// Panics when `market_kind` does not support `market_phase` as an initial phase.
pub fn new_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_ids: impl IntoIterator<Item = RunnerId>,
    order_store_capacity: usize,
) -> Self {
    assert!(
        market_kind.supports_initial_phase(market_phase),
        "market kind {market_kind:?} does not support initial phase {market_phase:?}"
    );

    let runners = runner_ids.into_iter().collect::<BTreeSet<RunnerId>>();

    // One pass fills both per-runner maps.
    let mut runner_labels: HashMap<RunnerId, String> = HashMap::new();
    let mut runner_books: HashMap<RunnerId, RunnerBook> = HashMap::new();
    for &runner_id in &runners {
        runner_labels.insert(runner_id, runner_id.to_string());
        runner_books.insert(runner_id, RunnerBook::new());
    }

    Self {
        market_id,
        market_name: String::new(),
        market_kind,
        market_phase,
        state: BookMarketState::Open,
        next_order_id: 1,
        close_batch_max_events: crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS,
        runners,
        runner_labels,
        allow_unknown_runners: false,
        removed_runners: BTreeSet::new(),
        runner_books,
        batch_process: None,
        orders: OrderStore::with_capacity(order_store_capacity),
        active_order_count_cache: 0,
        runner_matched_volume: BTreeMap::new(),
        pending_trade_views: HashMap::new(),
        scratch: Scratch::default(),
    }
}
/// Creates an open book without any runners, using the default order-store capacity.
///
/// # Panics
///
/// Panics when `market_kind` does not support `market_phase` as an initial phase.
pub fn new_engine(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
) -> Self {
    const DEFAULT_ORDER_STORE_CAPACITY: usize = 20_000;
    Self::new_engine_with_capacity(
        market_id,
        market_kind,
        market_phase,
        DEFAULT_ORDER_STORE_CAPACITY,
    )
}
/// Creates an open book without any runners and with an order store sized for
/// `order_store_capacity` orders.
///
/// # Panics
///
/// Panics when `market_kind` does not support `market_phase` as an initial phase.
pub fn new_engine_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    order_store_capacity: usize,
) -> Self {
    let no_runners = std::iter::empty::<RunnerId>();
    Self::new_with_capacity(
        market_id,
        market_kind,
        market_phase,
        no_runners,
        order_store_capacity,
    )
}
/// Updates the batch size used for close/lapse/cancel chunks by delegating to
/// `close_process::set_close_batch_max_events`, which receives the field and the requested value.
pub(crate) fn set_close_batch_max_events(&mut self, batch_max_events: u16) {
crate::book::close_process::set_close_batch_max_events(
&mut self.close_batch_max_events,
batch_max_events,
);
}
/// Returns the configured batch size for close/lapse/cancel chunks.
pub(crate) fn close_batch_max_events(&self) -> u16 {
self.close_batch_max_events
}
/// Returns the id of the market this book belongs to.
pub fn market_id(&self) -> MarketId {
self.market_id
}
/// Returns the market kind the book was created with.
pub fn market_kind(&self) -> MarketKind {
self.market_kind
}
/// Returns the current market state.
pub fn market_state(&self) -> BookMarketState {
self.state
}
/// Returns the current market phase.
pub fn market_phase(&self) -> MarketPhase {
self.market_phase
}
/// Returns whether the current market state reports itself as halted.
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
/// Looks up an order in the order store; `None` when the store has no such order.
pub fn get_order(&self, order_id: OrderId) -> Option<&BookOrder> {
self.orders.get(&order_id)
}
/// Returns whether the order store reports the order as linked into a price level.
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders.is_in_level(&order_id)
}
/// Returns the cached number of live orders (unmatched or partially matched).
pub fn active_order_count(&self) -> usize {
self.active_order_count_cache
}
/// Selects the next chunk of orders to cancel.
///
/// Delegates to `collect_cancelled_orders_chunk`, feeding it the store's orders in sorted order
/// starting from `cursor_after`, the chunk size limit, the caller's `should_cancel` predicate and
/// an accessor for each order's `info`.
// NOTE(review): whether `cursor_after` is exclusive is decided by `OrderStore::iter_sorted_from`,
// which is not visible here — the name suggests iteration resumes after that order.
fn select_cancelled_chunk(
&self,
cursor_after: Option<OrderId>,
max_orders: usize,
should_cancel: impl FnMut(&BookOrder) -> bool,
) -> CancelledOrdersChunk {
collect_cancelled_orders_chunk(
self.orders.iter_sorted_from(cursor_after),
max_orders,
should_cancel,
|order| &order.info,
)
}
/// Returns the state of the running batch process (close, lapse or batch cancel), if any.
pub fn batch_process_state(&self) -> Option<&BatchProcessState> {
self.batch_process.as_ref()
}
/// Iterates over the ids of all runners known to the market, in ascending order of the set.
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
self.runners.iter().copied()
}
/// Returns whether a batch process is currently recorded on the book.
fn has_active_batch_process(&self) -> bool {
self.batch_process.is_some()
}
/// Replaces the runner labels.
///
/// Every known runner first gets its default label (the runner id's string form); the pairs
/// from `runner_ids` / `runner_labels` then override those defaults. When a runner id appears
/// more than once, the last label wins.
///
/// # Panics
///
/// Panics when the two slices differ in length or when `runner_ids` contains a runner that is
/// not part of this market. All input is validated before the label map is touched, so a panic
/// never leaves the map half-rebuilt.
pub(crate) fn set_runner_labels(&mut self, runner_ids: &[RunnerId], runner_labels: &[String]) {
    assert_eq!(
        runner_ids.len(),
        runner_labels.len(),
        "multi-runner label map length mismatch"
    );
    // Validate up front so the existing labels survive a rejected call.
    if let Some(runner_id) = runner_ids
        .iter()
        .find(|runner_id| !self.runners.contains(*runner_id))
    {
        panic!("unknown runner id {:?} for multi-runner market", runner_id);
    }

    self.runner_labels.clear();
    self.runner_labels.extend(
        self.runners
            .iter()
            .map(|runner_id| (*runner_id, runner_id.to_string())),
    );
    self.runner_labels.extend(
        runner_ids
            .iter()
            .copied()
            .zip(runner_labels.iter().cloned()),
    );
}
/// Returns the label of a runner.
///
/// # Panics
///
/// Panics when no label is stored for `runner_id`.
pub fn runner_label(&self, runner_id: RunnerId) -> &str {
    match self.runner_labels.get(&runner_id) {
        Some(label) => label.as_str(),
        None => panic!("unknown runner id {:?} for multi-runner market", runner_id),
    }
}
fn rebuild_ladders(&mut self) {
self.runner_books = self
.runners
.iter()
.copied()
.map(|rid| (rid, RunnerBook::new()))
.collect();
self.active_order_count_cache = 0;
let mut per_level: RestingOrdersByLevel = HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
if self.removed_runners.contains(&order.runner_id) {
continue;
}
let remaining = order.remaining();
if remaining.0 <= 0 {
continue;
}
let tick: TickIndex = self.orders.stored_tick(key) as u16;
per_level
.entry((order.runner_id, order.info.side, tick))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((runner_id, _, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
let Some(rb) = self.runner_books.get_mut(&runner_id) else {
continue;
};
for (_, _, key, remaining) in items {
rb.insert_tail(&mut self.orders, key, remaining);
}
}
}
/// Returns the price ladder view of a runner, as produced by its `RunnerBook` for `depth`.
///
/// A runner without a ladder yields empty back and lay lists. The empty fallback is built only
/// when it is needed and without allocating, so looking up an existing runner no longer pays
/// for (or panics on) two `Vec::with_capacity(depth)` allocations.
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
    match self.runner_books.get(&runner_id) {
        Some(runner_book) => runner_book.runner_prices(runner_id, depth),
        None => RunnerPrices {
            runner_id,
            available_to_back: Vec::new(),
            available_to_lay: Vec::new(),
        },
    }
}
/// Returns the first entry of the runner's available-to-back list at depth 1, if any.
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_back
        .into_iter()
        .next()
}
/// Returns the first entry of the runner's available-to-lay list at depth 1, if any.
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_lay
        .into_iter()
        .next()
}
/// Returns the matched volume recorded for a runner, or zero when nothing has been recorded.
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
    match self.runner_matched_volume.get(&runner_id) {
        Some(volume) => *volume,
        None => Money::zero(),
    }
}
pub fn total_matched(&self) -> Money {
Money(self.runner_matched_volume.values().map(|m| m.0).sum())
}
/// Validates a command against the current book state and turns it into events plus a response.
///
/// Flow:
/// 1. Reject commands addressed to a different market (`MarketIdMismatch`).
/// 2. Handle `HaltMarket` / `ResumeMarket` directly, before the book gate.
/// 3. Run `validate_book_gate` with the batch-process state and the halted flag.
/// 4. Dispatch on the command kind to the matching `cmd_*` handler.
///
/// The event timestamp is taken from `Utc::now()` at the start of the call.
///
/// # Errors
///
/// Returns a `BookError` carrying the command's correlation id and the reject reason; state and
/// phase transitions additionally carry the current state, current phase and requested target.
// NOTE(review): the handlers visible in this part of the file only build events and do not
// mutate the book; presumably the caller feeds the returned events to `apply_event` — confirm.
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
// Plain rejection: correlation id + reason.
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
// State and phase are captured up front so the error closure does not borrow `self`.
let current_state = self.state;
let current_phase = self.market_phase;
// Rejection of a state/phase transition: also reports where the book is and what was requested.
let state_err = |reason, requested_state| {
BookError::state_error(
err_correlation_id.clone(),
reason,
current_state,
current_phase,
requested_state,
)
};
let ts = Utc::now();
let market_id = cmd.market_id;
if market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
// Halt and resume bypass the book gate below.
match &cmd.kind {
CommandKind::HaltMarket { reason, .. } => {
let events = self
.cmd_halt_market(ts, *reason)
.map_err(|reason| state_err(reason, RequestedBookState::Halted))?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
_ => {}
}
cmd.kind
.validate_book_gate(self.batch_process_state(), self.state.is_halted())
.map_err(&err)?;
// Move the scratch buffers out so handlers can borrow them alongside `&mut self`;
// they are put back after the closure returns, on success and on error alike.
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let (events, kind) = match &cmd.kind {
// Market creation is not handled by an existing book.
CommandKind::CreateMarket { .. } => {
return Err(err(RejectReason::InternalError));
}
CommandKind::SetMarketState { state, .. } => {
let reason = "SET_MARKET_STATE";
match state {
MarketState::Open => {
// Opening a suspended market is an unsuspend; anything else is a plain open.
if self.state == BookMarketState::Suspended {
self.cmd_unsuspend(ts, reason).map_err(&err)?
} else {
self.cmd_open_market(ts, reason).map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?
}
}
MarketState::Suspended => {
self.cmd_suspend(ts, reason).map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?
}
MarketState::Closed => self
.cmd_close_market(ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
}
}
CommandKind::SetMarketPhase { phase } => self
.cmd_set_market_phase(ts, *phase, "SET_MARKET_PHASE")
.map_err(|reason| state_err(reason, RequestedBookState::Phase(*phase)))?,
CommandKind::CloseMarket => self
.cmd_close_market(ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(MarketState::Closed))
})?,
CommandKind::ContinueCloseMarket => {
self.cmd_continue_close_market(ts).map_err(&err)?
}
CommandKind::BatchCancelOrders {
from_created_at_inclusive,
to_created_at_inclusive,
account_id,
runner_id,
reason,
} => self
.cmd_batch_cancel_orders(
ts,
*from_created_at_inclusive,
*to_created_at_inclusive,
account_id.clone(),
*runner_id,
reason.as_str(),
cmd.metadata.clone(),
)
.map_err(&err)?,
CommandKind::ContinueBatchCancelOrders => {
self.cmd_continue_batch_cancel_orders(ts).map_err(&err)?
}
CommandKind::ContinueLapseOrders => {
self.cmd_continue_lapse_orders(ts).map_err(&err)?
}
CommandKind::SettleMarket {
runner_results,
void_reason,
..
} => self
.cmd_settle_market(ts, runner_results.as_slice(), void_reason.as_str())
.map_err(|reason| state_err(reason, RequestedBookState::Settled))?,
CommandKind::PlaceOrder {
account_id,
runner_id,
side,
odds,
stake,
persistence,
time_in_force,
..
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// The new order takes the next free id; `apply_event` advances the counter
// when it sees the resulting `OrderAccepted` event.
let order_id = OrderId(self.next_order_id);
self.cmd_place_order(
&mut scratch,
ts,
correlation_id.clone(),
order_id,
account_id.clone(),
*runner_id,
*side,
*odds,
*stake,
*persistence,
*time_in_force,
)
.map_err(&err)?
}
CommandKind::ReplaceOrder {
order_id,
account_id,
new_odds,
new_stake,
..
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
let Some(old) = self.get_order(*order_id) else {
return Err(err(RejectReason::OrderNotFound));
};
if &old.info.account_id != account_id {
return Err(err(RejectReason::NotOrderOwner));
}
// Missing fields default to the old order's price and remaining stake.
let new_price = new_odds.unwrap_or(old.price);
let new_stake = new_stake.unwrap_or_else(|| old.remaining());
if new_price == old.price && new_stake == old.remaining() {
return Err(err(RejectReason::NoChange));
}
let new_order_id = OrderId(self.next_order_id);
self.cmd_replace_order(
&mut scratch,
ts,
correlation_id.clone(),
*order_id,
new_order_id,
account_id.clone(),
old.runner_id,
old.info.side,
new_price,
new_stake,
old.persistence,
)
.map_err(&err)?
}
CommandKind::CancelOrder {
order_id,
account_id,
..
} => self
.cmd_cancel_order(ts, *order_id, account_id.clone(), "USER_CANCEL")
.map_err(&err)?,
// Binary-market commands do not apply to a multi-runner book.
CommandKind::PlaceBinaryOrder { .. } | CommandKind::ReplaceBinaryOrder { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::RemoveRunner {
runner_id,
reduction_factor_bps,
..
} => self
.cmd_remove_runner(ts, *runner_id, *reduction_factor_bps)
.map_err(&err)?,
CommandKind::VoidTrades {
timestamp,
start_time,
end_time,
void_reason,
..
} => self
.cmd_void_trades(*timestamp, *start_time, *end_time, void_reason.as_str())
.map_err(&err)?,
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("handled above")
}
};
Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind,
},
))
})();
self.scratch = scratch;
result
}
/// Folds one event into the book state.
///
/// Events that carry no multi-runner book state (`MarketCreated`, binary-market events,
/// `VoidTrades`, `MarketSettled`, `MarketRemoved`) are ignored.
///
/// # Panics
///
/// Panics when a `RunnerRemoved` event names a runner that is not part of the market; the
/// delegated handlers (`order_accepted`, `market_phase_changed`, `trade_matched`) contain
/// assertions of their own.
pub(crate) fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
let event = &env.event;
match event {
BookEvent::MarketCreated { .. } => {}
BookEvent::MarketStateChanged { to, reason } => {
self.market_state_changed(*to, reason.as_str())
}
BookEvent::MarketPhaseChanged { to, reason } => {
self.market_phase_changed(*to, reason.as_str())
}
BookEvent::OrderAccepted {
correlation_id,
order_id,
account_id,
runner_id,
side,
price,
stake,
persistence,
..
} => {
// Keep the id counter strictly above every accepted order id.
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
self.order_accepted(
ts,
*order_id,
account_id.clone(),
correlation_id.clone(),
*runner_id,
*side,
*price,
*stake,
*persistence,
)
}
BookEvent::BinaryOrderAccepted { .. } => {}
BookEvent::OrderCancelled {
cancelled_order, ..
} => {
self.order_cancelled(cancelled_order.order_id);
}
BookEvent::OrderCancelledBatched {
cancelled_orders,
cursor_after,
cancel_cause,
cause_detail: _,
} => {
let cursor_after_order_id = cursor_after.as_ref().map(|cursor| cursor.order_id);
let cancelled_count = cancelled_orders.len() as u64;
for cancelled_order in cancelled_orders {
self.order_cancelled(cancelled_order.order_id);
}
// Record the chunk on the batch process that matches the cancel cause.
match cancel_cause {
CancelCause::CloseCancel => {
if let Some(s) = self.batch_process.as_mut()
&& s.is_close()
{
s.record_chunk(cursor_after_order_id, cancelled_count);
}
}
CancelCause::SuspendLapse | CancelCause::InPlayLapse => {
if let Some(s) = self.batch_process.as_mut()
&& s.kind == BatchProcessKind::Lapse
{
s.record_chunk(cursor_after_order_id, cancelled_count);
} else if self.batch_process.is_none() {
// No process recorded yet: start a lapse process from this chunk.
let mut state = BatchProcessState::lapse(
self.close_batch_max_events,
*cancel_cause,
);
state.record_chunk(cursor_after_order_id, cancelled_count);
self.batch_process = Some(state);
}
}
// Every other cause is recorded only on a running batch-cancel process.
_ => {
if let Some(s) = self.batch_process.as_mut()
&& s.kind == BatchProcessKind::Cancel
{
s.record_chunk(cursor_after_order_id, cancelled_count);
}
}
}
}
BookEvent::TradeMatched {
order_id,
role,
runner_id,
counter_party,
remaining_stake,
matched_delta,
..
} => self.trade_matched(
ts,
TradeMatchView {
order_id: *order_id,
role: *role,
runner_id: *runner_id,
counter_party: *counter_party,
remaining_stake: *remaining_stake,
matched_delta: *matched_delta,
},
),
BookEvent::BinaryTradeMatched { .. } => {}
BookEvent::VoidTrades { .. } => {}
BookEvent::RunnerRemoved { runner_id, .. } => {
assert!(
self.runners.contains(runner_id),
"runner removed event references unknown runner {:?}",
runner_id
);
// The runner stays in `runners`; it is only marked as removed.
self.removed_runners.insert(*runner_id);
}
BookEvent::MarketSettled { .. } => {}
BookEvent::BatchCancelStarted {
batch_max_events,
started_at_ms,
from_created_at_inclusive_ms,
to_created_at_inclusive_ms,
account_filter,
runner_filter,
reason,
final_event_metadata_json,
} => {
// Replaces whatever batch process was recorded before.
self.batch_process = Some(BatchProcessState::cancel(
*batch_max_events,
*started_at_ms,
*from_created_at_inclusive_ms,
*to_created_at_inclusive_ms,
account_filter.clone(),
*runner_filter,
reason.clone(),
final_event_metadata_json.clone(),
));
}
BookEvent::BatchProcessCompleted { .. } => {
self.batch_process = None;
}
BookEvent::MarketRemoved { .. } => {}
}
}
/// Applies a market state change and sets up the batch process it implies.
///
/// * Moving to `Closed` while no close batch is running starts a close process when there are
///   live orders (and clears the batch process otherwise). The batch size is parsed from
///   `reason`, falling back to the configured default.
/// * Moving from `Open` to `Suspended` with no batch process recorded starts a suspend-lapse
///   process when at least one live order has `Persistence::Lapse`.
pub(crate) fn market_state_changed(&mut self, to: BookMarketState, reason: &str) {
    let close_batch_running = matches!(&self.batch_process, Some(process) if process.is_close());
    if to == BookMarketState::Closed && !close_batch_running {
        let batch_max_events =
            crate::book::close_process::parse_close_start_batch_max_events(reason)
                .unwrap_or(self.close_batch_max_events);
        let total_live_orders =
            crate::book::close_process::count_live_orders(self.orders.iter_sorted());
        self.batch_process = if total_live_orders > 0 {
            Some(BatchProcessState::close(batch_max_events, total_live_orders))
        } else {
            None
        };
    }

    let suspending_open_market =
        to == BookMarketState::Suspended && self.state == BookMarketState::Open;
    if suspending_open_market && self.batch_process.is_none() {
        let has_lapse = self.orders.iter_sorted().any(|(_, o)| {
            crate::book::close_process::is_live_order(o) && o.persistence == Persistence::Lapse
        });
        if has_lapse {
            let lapse_process =
                BatchProcessState::lapse(self.close_batch_max_events, CancelCause::SuspendLapse);
            self.batch_process = Some(lapse_process);
        }
    }

    self.state = to;
}
/// Applies a market phase change.
///
/// Going from `Pre` to `Live` with no batch process recorded starts an in-play lapse process
/// when at least one live order has `Persistence::Lapse`.
///
/// # Panics
///
/// Panics when the market kind does not allow the transition from the current phase to `to`.
pub(crate) fn market_phase_changed(&mut self, to: MarketPhase, _reason: &str) {
    let from = self.market_phase;
    assert!(
        self.market_kind.can_transition_to_phase(from, to),
        "pre-event-only market cannot be applied into live phase"
    );

    let going_in_play = from == MarketPhase::Pre && to == MarketPhase::Live;
    if going_in_play && self.batch_process.is_none() {
        let has_lapse = self.orders.iter_sorted().any(|(_, o)| {
            crate::book::close_process::is_live_order(o) && o.persistence == Persistence::Lapse
        });
        if has_lapse {
            let lapse_process =
                BatchProcessState::lapse(self.close_batch_max_events, CancelCause::InPlayLapse);
            self.batch_process = Some(lapse_process);
        }
    }

    self.market_phase = to;
}
/// Inserts a newly accepted order into the store and links it at the tail of its level.
///
/// An order id already present in the store is ignored. Store failures are logged and leave the
/// book without the order.
///
/// # Panics
///
/// Panics when `runner_id` is not a runner of this market.
#[allow(clippy::too_many_arguments)]
pub(crate) fn order_accepted(
    &mut self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    correlation_id: Option<CorrelationId>,
    runner_id: RunnerId,
    side: Side,
    price: OddsX10000,
    stake: Money,
    persistence: Persistence,
) {
    // Already known: applying the event a second time is a no-op.
    if self.orders.contains(&order_id) {
        return;
    }
    assert!(
        self.runners.contains(&runner_id),
        "order accepted event references unknown runner {:?}",
        runner_id
    );

    let accepted = BookOrder {
        info: BookOrderInfo {
            order_id,
            account_id,
            correlation_id,
            side,
            state: BookOrderState::ExecutableUnmatched,
            created_at: ts,
            last_updated_at: ts,
        },
        runner_id,
        price,
        stake,
        matched: Money::zero(),
        persistence,
    };
    if let Err(err) = self.orders.insert(order_id, accepted) {
        error!(order_id = ?order_id, reason = ?err, "failed to insert order");
        return;
    }
    self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);

    if let Some(key) = self.orders.get_key(&order_id) {
        // The full stake rests on the ladder until trades reduce it.
        self.runner_books
            .entry(runner_id)
            .or_default()
            .insert_tail(&mut self.orders, key, stake);
    } else {
        error!(order_id = ?order_id, "missing order key after insert");
    }
}
/// Builds the order-independent key for a trade: the smaller order id first, the larger second.
fn trade_pair_key(order_id: OrderId, counter_party: OrderId) -> TradePairKey {
    let lower = order_id.min(counter_party);
    let upper = order_id.max(counter_party);
    (lower, upper)
}
/// Applies one side of a trade to the book.
///
/// Steps, in order:
/// 1. Reduce the order's level total by the matched amount and unlink the order from its level
///    once nothing remains.
/// 2. Update the order's matched amount, timestamp and state.
/// 3. Remove fully matched orders from the store (and from the live-order count).
/// 4. Pair this side with its counterparty: the first side is parked in `pending_trade_views`;
///    when the second side arrives the matched volume is booked once per trade.
///
/// # Panics
///
/// Panics when the parked side of a pair does not belong to the counterparty named by `m`.
fn trade_matched(&mut self, ts: DateTime, m: TradeMatchView) {
// Captured before the state is overwritten below.
let was_live = self.orders.get(&m.order_id).is_some_and(|o| {
matches!(
o.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
)
});
if let Some(order) = self.orders.get(&m.order_id) {
// The ladder is located through the stored order's runner and side, not the event's.
let runner_id = order.runner_id;
let side = order.info.side;
if let Some(key) = self.orders.get_key(&m.order_id)
&& let Some(runner_book) = self.runner_books.get_mut(&runner_id)
{
let tick = self.orders.stored_tick(key);
runner_book.decrement_level_total(side, tick, m.matched_delta);
if m.remaining_stake.0 == 0 && self.orders.in_level_by_key(key) {
runner_book.unlink(&mut self.orders, key, Money::zero());
}
}
}
if let Some(order) = self.orders.get_mut(&m.order_id) {
order.matched = Money(order.matched.0 + m.matched_delta.0);
order.info.last_updated_at = ts;
order.info.state = if m.remaining_stake.0 == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
// Fully matched orders leave the store.
if m.remaining_stake.0 == 0 {
if was_live {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
self.orders.remove(&m.order_id);
}
let current = PendingTradeView {
order_id: m.order_id,
role: m.role,
runner_id: m.runner_id,
matched_delta: m.matched_delta,
};
let key = Self::trade_pair_key(m.order_id, m.counter_party);
if let Some(first) = self.pending_trade_views.remove(&key) {
assert_eq!(first.order_id, m.counter_party);
let (maker, taker) = match (first.role, current.role) {
(TradeRole::Maker, TradeRole::Taker) => (first, current),
(TradeRole::Taker, TradeRole::Maker) => (current, first),
_ => {
// Release builds drop the malformed pair without booking any volume.
debug_assert!(
false,
"trade pair must contain exactly one maker and one taker"
);
return;
}
};
// Volume is booked once per trade on the taker's runner; the maker's runner is added
// only when it differs.
self.add_runner_matched_volume(taker.runner_id, taker.matched_delta);
if maker.runner_id != taker.runner_id {
self.add_runner_matched_volume(maker.runner_id, maker.matched_delta);
}
} else {
// First side of the pair: wait for the counterparty's event.
self.pending_trade_views.insert(key, current);
}
}
/// Adds `delta` to the matched volume recorded for `runner_id`, starting from zero when the
/// runner has no entry yet.
///
/// The entry is looked up once and updated in place.
fn add_runner_matched_volume(&mut self, runner_id: RunnerId, delta: Money) {
    let volume = self
        .runner_matched_volume
        .entry(runner_id)
        .or_insert(Money::zero());
    *volume = Money(volume.0 + delta.0);
}
/// Removes a cancelled order from its level and from the store.
///
/// The live-order count is reduced when the order was still unmatched or partially matched.
pub(crate) fn order_cancelled(&mut self, order_id: OrderId) {
    let was_live = self.orders.get(&order_id).is_some_and(|order| {
        matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        )
    });
    if was_live {
        self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
    }
    self.remove_from_levels(order_id);
    self.orders.remove(&order_id);
}
/// Wraps an event in an envelope for this market.
///
/// The envelope carries the market id and name, the given timestamp, default metadata and a
/// `market_seq` of 0.
// NOTE(review): `market_seq` is presumably assigned by a later stage — confirm with the caller.
fn emit(&self, event_time: DateTime, event: BookEvent) -> BookEventEnvelope {
BookEventEnvelope {
market_id: self.market_id,
market_name: self.market_name.clone(),
market_seq: 0,
timestamp: event_time,
metadata: EventMetadata::default(),
event,
}
}
/// Appends a `MarketStateChanged` event to `out` unless the market is already in state `to`.
///
/// The book itself is not modified.
fn state_change(&self, ts: DateTime, to: BookMarketState, reason: &str, out: &mut EventVec) {
    if self.state != to {
        let event = BookEvent::MarketStateChanged {
            to,
            reason: reason.to_owned(),
        };
        out.push(self.emit(ts, event));
    }
}
/// Appends a `MarketPhaseChanged` event to `out` unless the market is already in phase `to`.
///
/// The book itself is not modified.
fn phase_change(&self, ts: DateTime, to: MarketPhase, reason: &str, out: &mut EventVec) {
    if self.market_phase != to {
        let event = BookEvent::MarketPhaseChanged {
            to,
            reason: reason.to_owned(),
        };
        out.push(self.emit(ts, event));
    }
}
/// Builds the events for opening the market.
///
/// # Errors
///
/// Propagates the rejection from `ensure_state_change`, and returns `MarketTerminal` when the
/// market is in a terminal state — except for a closed market that still has a batch process
/// running, which may be reopened.
fn cmd_open_market(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Open)?;

    let reopening_during_batch =
        self.state == BookMarketState::Closed && self.has_active_batch_process();
    if !reopening_during_batch && self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    let mut out = EventVec::new();
    self.state_change(ts, BookMarketState::Open, reason, &mut out);
    Ok((out, None))
}
/// Builds the events for moving the market to another phase.
///
/// While a batch process is running only the phase-change event is produced. Otherwise the
/// market must be open, and a `Pre` -> `Live` transition additionally emits the first chunk of
/// in-play lapse cancellations for live orders with `Persistence::Lapse`.
///
/// # Errors
///
/// * the rejection from `ensure_phase_change`;
/// * `MarketInPlayNotSupported` when the market kind does not allow the transition;
/// * `MarketTerminal` when the market is settled during a batch process, or in any terminal
///   state otherwise;
/// * `MarketNotOpen` when the market is in a non-terminal state other than `Open`.
fn cmd_set_market_phase(
    &mut self,
    ts: DateTime,
    phase: MarketPhase,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_phase_change(self.market_phase, phase)?;
    if !self
        .market_kind
        .can_transition_to_phase(self.market_phase, phase)
    {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    if self.has_active_batch_process() {
        if self.state == BookMarketState::Settled {
            return Err(RejectReason::MarketTerminal);
        }
        let mut events = EventVec::new();
        self.phase_change(ts, phase, reason, &mut events);
        return Ok((events, None));
    }
    // The terminal check has to run before the "not open" check: a terminal state is never
    // `Open`, so in the opposite order it could not be reached and terminal markets would be
    // reported as merely "not open". This matches `cmd_suspend` and `cmd_open_market`.
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if self.state != BookMarketState::Open {
        return Err(RejectReason::MarketNotOpen);
    }
    let mut events = EventVec::new();
    self.phase_change(ts, phase, reason, &mut events);
    // `phase_change` only emits an event, so `self.market_phase` is still the old phase here.
    if self.market_phase == MarketPhase::Pre && phase == MarketPhase::Live {
        let batch_max_events = self.close_batch_max_events;
        let chunk = self.select_cancelled_chunk(None, batch_max_events as usize, |order| {
            crate::book::close_process::is_live_order(order)
                && order.persistence == Persistence::Lapse
        });
        push_cancel_chunk(
            &mut events,
            |event| self.emit(ts, event),
            chunk,
            CancelCause::InPlayLapse,
            None,
            false,
            true,
        );
    }
    Ok((events, None))
}
/// Builds the events for suspending the market.
///
/// Without a running batch process, suspending an open market also emits the first chunk of
/// suspend-lapse cancellations for live orders with `Persistence::Lapse`. With a batch process
/// running only the state-change event is produced.
///
/// # Errors
///
/// Propagates the rejection from `ensure_state_change`. Unless a batch process is running on a
/// closed or halted market, returns `MarketTerminal` for terminal states and `MarketNotOpen`
/// for states that are not matchable.
fn cmd_suspend(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Suspended)?;

    let batch_active = self.has_active_batch_process();
    let can_suspend_during_batch = batch_active
        && matches!(
            self.state,
            BookMarketState::Closed | BookMarketState::Halted
        );
    if !can_suspend_during_batch {
        if self.state.is_terminal() {
            return Err(RejectReason::MarketTerminal);
        }
        if !self.state.is_matchable() {
            return Err(RejectReason::MarketNotOpen);
        }
    }

    // Lapsing orders are selected only when an open market is suspended outside a batch process.
    let lapse_chunk = if !batch_active && self.state == BookMarketState::Open {
        let batch_max_events = self.close_batch_max_events;
        Some(
            self.select_cancelled_chunk(None, batch_max_events as usize, |order| {
                crate::book::close_process::is_live_order(order)
                    && order.persistence == Persistence::Lapse
            }),
        )
    } else {
        None
    };

    let mut events = EventVec::new();
    self.state_change(ts, BookMarketState::Suspended, reason, &mut events);
    if let Some(chunk) = lapse_chunk {
        push_cancel_chunk(
            &mut events,
            |event| self.emit(ts, event),
            chunk,
            CancelCause::SuspendLapse,
            None,
            false,
            true,
        );
    }
    Ok((events, None))
}
/// Builds the events for reopening a suspended market.
///
/// # Errors
///
/// Returns `MarketNotSuspended` when the market is not currently suspended.
fn cmd_unsuspend(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state == BookMarketState::Suspended {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Open, reason, &mut out);
        Ok((out, None))
    } else {
        Err(RejectReason::MarketNotSuspended)
    }
}
/// Builds the events for halting the market; the event reason is `HALT:` followed by the
/// numeric reason code.
///
/// # Errors
///
/// Returns `MarketAlreadyHalted` when the market is halted already, and `MarketTerminal` for
/// terminal states — except for a closed market that still has a batch process running.
fn cmd_halt_market(&mut self, ts: DateTime, reason: u32) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        return Err(RejectReason::MarketAlreadyHalted);
    }

    let halting_closed_batch =
        self.state == BookMarketState::Closed && self.has_active_batch_process();
    if !halting_closed_batch && self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    let halt_reason = format!("HALT:{reason}");
    let mut out = EventVec::new();
    self.state_change(ts, BookMarketState::Halted, &halt_reason, &mut out);
    Ok(out)
}
/// Builds the events for resuming a halted market: a state change to `Open` with reason
/// `RESUME`.
///
/// # Errors
///
/// Returns `MarketNotHalted` when the market is not halted.
fn cmd_resume_market(&mut self, ts: DateTime) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Open, "RESUME", &mut out);
        Ok(out)
    } else {
        Err(RejectReason::MarketNotHalted)
    }
}
/// Builds the events for closing the market: the state change to `Closed` followed by the first
/// chunk of close cancellations over the live orders.
///
/// The state-change reason is produced by `close_process::close_start_reason` from
/// `batch_max_events`.
///
/// # Errors
///
/// Propagates the rejection from `ensure_state_change`; returns `MarketTerminal` for terminal
/// states and `MarketBatchCancelling` when a close batch is already running.
fn cmd_close_market(
    &mut self,
    ts: DateTime,
    batch_max_events: u16,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Closed)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    let close_already_running = matches!(
        self.batch_process_state(),
        Some(process) if process.is_close()
    );
    if close_already_running {
        return Err(RejectReason::MarketBatchCancelling);
    }

    let close_reason = crate::book::close_process::close_start_reason(batch_max_events);
    let first_chunk = self.select_cancelled_chunk(
        None,
        batch_max_events as usize,
        crate::book::close_process::is_live_order,
    );

    let mut out = EventVec::new();
    self.state_change(ts, BookMarketState::Closed, close_reason.as_str(), &mut out);
    push_cancel_chunk(
        &mut out,
        |event| self.emit(ts, event),
        first_chunk,
        CancelCause::CloseCancel,
        None,
        false,
        true,
    );
    Ok((out, None))
}
/// Builds the next chunk of close cancellations, resuming from the cursor and batch size stored
/// in the running close process.
///
/// # Errors
///
/// Returns `MarketNotBatchCancelling` when no batch process is running or the running one is
/// not a close process.
fn cmd_continue_close_market(
    &mut self,
    ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let close_process = match self.batch_process_state() {
        Some(process) if process.is_close() => process,
        _ => return Err(RejectReason::MarketNotBatchCancelling),
    };

    let next_chunk = self.select_cancelled_chunk(
        close_process.cursor_after,
        close_process.batch_max_events as usize,
        crate::book::close_process::is_live_order,
    );

    let mut out = EventVec::new();
    push_cancel_chunk(
        &mut out,
        |event| self.emit(ts, event),
        next_chunk,
        CancelCause::CloseCancel,
        None,
        false,
        true,
    );
    Ok((out, None))
}
/// Starts a filtered batch cancel and processes its first chunk.
///
/// Emits `BatchCancelStarted` describing the filter, followed by a cancel event for the first
/// chunk of at most `close_batch_max_events` matching orders. An order matches when it is live
/// (`is_live_order`), was created no later than `ts`, lies inside the optional inclusive
/// creation-time window, and satisfies the optional account and runner filters. Timestamps are
/// compared at millisecond resolution.
///
/// When the first chunk is also the last one (`chunk.done`), `final_event_metadata` is attached
/// to that cancel event and a `BatchProcessCompleted` event is appended.
///
/// # Errors
/// Returns [`RejectReason::MarketBatchCancelling`] if another batch process is already active.
#[allow(clippy::too_many_arguments)]
fn cmd_batch_cancel_orders(
    &self,
    ts: DateTime,
    from_created_at_inclusive: Option<DateTime>,
    to_created_at_inclusive: Option<DateTime>,
    account_filter: Option<AccountId>,
    runner_filter: Option<RunnerId>,
    reason: &str,
    final_event_metadata: Option<serde_json::Value>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }
    let batch_max_events = self.close_batch_max_events;
    let from_ms_opt = from_created_at_inclusive.map(|d| d.timestamp_millis());
    let to_ms_opt = to_created_at_inclusive.map(|d| d.timestamp_millis());
    // The start event carries the metadata as a JSON string; a serialization failure
    // silently results in `None`.
    let metadata_json = final_event_metadata
        .as_ref()
        .and_then(|m| serde_json::to_string(m).ok());
    let started_at_ms = ts.timestamp_millis();
    let mut events = EventVec::new();
    events.push(self.emit(
        ts,
        BookEvent::BatchCancelStarted {
            batch_max_events,
            started_at_ms,
            from_created_at_inclusive_ms: from_ms_opt,
            to_created_at_inclusive_ms: to_ms_opt,
            // Cloned because the selection predicate below still borrows the filter.
            account_filter: account_filter.clone(),
            runner_filter,
            reason: reason.to_string(),
            final_event_metadata_json: metadata_json,
        },
    ));
    let chunk = self.select_cancelled_chunk(None, batch_max_events as usize, |order| {
        if !crate::book::close_process::is_live_order(order) {
            return false;
        }
        let created_at_ms = order.info.created_at.timestamp_millis();
        // Orders created after the batch started are never part of it.
        if created_at_ms > started_at_ms {
            return false;
        }
        if let Some(from_ms) = from_ms_opt
            && created_at_ms < from_ms
        {
            return false;
        }
        if let Some(to_ms) = to_ms_opt
            && created_at_ms > to_ms
        {
            return false;
        }
        if let Some(account_id) = account_filter.as_ref()
            && &order.info.account_id != account_id
        {
            return false;
        }
        if let Some(runner_id) = runner_filter
            && order.runner_id != runner_id
        {
            return false;
        }
        true
    });
    let done = chunk.done;
    let mut cancel_event = self.emit(
        ts,
        chunk.into_event(CancelCause::BatchCancel, Some(reason.to_string())),
    );
    if done {
        // Last use of the owned metadata: move it instead of deep-cloning the JSON value.
        cancel_event.metadata = final_event_metadata;
    }
    events.push(cancel_event);
    if done {
        events.push(self.emit(
            ts,
            BookEvent::BatchProcessCompleted {
                cancel_cause: CancelCause::BatchCancel,
            },
        ));
    }
    Ok((events, None))
}
/// Processes the next chunk of an in-progress filtered batch cancel.
///
/// The filter stored in the process target (`FilteredCancel`) is re-applied to live orders,
/// resuming after the stored cursor. On the final chunk the stored metadata JSON is parsed
/// back and attached to the cancel event (a parse failure silently yields no metadata), and a
/// `BatchProcessCompleted` event is appended.
///
/// # Errors
/// Returns [`RejectReason::MarketNotBatchCancelling`] when no batch process is active, its kind
/// is not `Cancel`, or its target is not `FilteredCancel`.
fn cmd_continue_batch_cancel_orders(
    &self,
    ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let proc_state = self
        .batch_process
        .as_ref()
        .filter(|active| active.kind == BatchProcessKind::Cancel)
        .ok_or(RejectReason::MarketNotBatchCancelling)?;
    let BatchProcessTarget::FilteredCancel {
        started_at_ms,
        from_created_at_inclusive_ms,
        to_created_at_inclusive_ms,
        account_filter,
        runner_filter,
        ..
    } = &proc_state.target
    else {
        return Err(RejectReason::MarketNotBatchCancelling);
    };

    let chunk = self.select_cancelled_chunk(
        proc_state.cursor_after,
        proc_state.batch_max_events as usize,
        |order| {
            if !crate::book::close_process::is_live_order(order) {
                return false;
            }
            let created_ms = order.info.created_at.timestamp_millis();
            // Time constraints: not newer than the batch start, and inside the optional window.
            let too_new = created_ms > *started_at_ms;
            let before_window =
                matches!(*from_created_at_inclusive_ms, Some(lo) if created_ms < lo);
            let after_window = matches!(*to_created_at_inclusive_ms, Some(hi) if created_ms > hi);
            if too_new || before_window || after_window {
                return false;
            }
            let other_account = matches!(
                account_filter.as_ref(),
                Some(wanted) if &order.info.account_id != wanted
            );
            let other_runner = matches!(*runner_filter, Some(wanted) if order.runner_id != wanted);
            !(other_account || other_runner)
        },
    );

    let is_last = chunk.done;
    let mut cancel_event = self.emit(
        ts,
        chunk.into_event(
            proc_state.reason.cancel_cause,
            proc_state.reason.detail.clone(),
        ),
    );
    let mut out = EventVec::new();
    if !is_last {
        out.push(cancel_event);
        return Ok((out, None));
    }

    // Final chunk: attach the stored metadata and signal completion.
    cancel_event.metadata = proc_state
        .final_event_metadata_json()
        .and_then(|m| serde_json::from_str(m).ok());
    out.push(cancel_event);
    out.push(self.emit(
        ts,
        BookEvent::BatchProcessCompleted {
            cancel_cause: CancelCause::BatchCancel,
        },
    ));
    Ok((out, None))
}
/// Processes the next chunk of an in-progress lapse batch.
///
/// Selects live orders whose persistence is `Persistence::Lapse`, resuming after the stored
/// cursor, and pushes them with the cancel cause and detail stored on the process.
///
/// # Errors
/// Returns [`RejectReason::MarketNotBatchCancelling`] when no batch process is active or its
/// kind is not `Lapse`.
fn cmd_continue_lapse_orders(
    &self,
    ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let proc_state = self
        .batch_process
        .as_ref()
        .filter(|active| active.kind == BatchProcessKind::Lapse)
        .ok_or(RejectReason::MarketNotBatchCancelling)?;

    let next_chunk = self.select_cancelled_chunk(
        proc_state.cursor_after,
        proc_state.batch_max_events as usize,
        |order| {
            if !crate::book::close_process::is_live_order(order) {
                return false;
            }
            order.persistence == Persistence::Lapse
        },
    );

    let mut out = EventVec::new();
    push_cancel_chunk(
        &mut out,
        |event| self.emit(ts, event),
        next_chunk,
        proc_state.reason.cancel_cause,
        proc_state.reason.detail.clone(),
        true,
        true,
    );
    Ok((out, None))
}
#[allow(clippy::too_many_arguments)]
fn cmd_place_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
correlation_id: Option<CorrelationId>,
order_id: OrderId,
account_id: AccountId,
runner_id: RunnerId,
side: Side,
price: OddsX10000,
stake: Money,
persistence: Persistence,
time_in_force: TimeInForce,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
debug_assert!(self.state.is_matchable());
if self.removed_runners.contains(&runner_id) || !self.runners.contains(&runner_id) {
return Err(RejectReason::RunnerNotFound);
}
if !price.is_valid_tick() {
return Err(RejectReason::InvalidOdds);
}
if !stake.is_positive() {
return Err(RejectReason::InvalidStake);
}
if self.orders.contains(&order_id) {
return Err(RejectReason::Duplicate);
}
if let TimeInForce::FillOrKill { min_fill } = time_in_force {
let required = match min_fill {
Some(qty) => {
let value = i64::try_from(qty.0).map_err(|_| RejectReason::InvalidStake)?;
Money(value)
}
None => stake,
};
let available = self.available_to_match(runner_id, side, price);
if available.0 < required.0 {
return Err(RejectReason::WouldNotFillFok);
}
}
scratch.events.clear();
scratch.events.push(self.emit(
ts,
BookEvent::OrderAccepted {
correlation_id: correlation_id.clone(),
order_id,
account_id: account_id.clone(),
runner_id,
runner_label: self.runner_label(runner_id).to_string(),
side,
price,
stake,
persistence,
time_in_force,
},
));
let (matched, avg_price) = self.plan_matching(
scratch,
MatchRequest {
ts,
taker_account_id: account_id.clone(),
taker_correlation_id: correlation_id.clone(),
taker_side: side,
taker_runner_id: runner_id,
taker_price: price,
taker_stake: stake,
taker_order_id: order_id,
},
);
let mut final_remaining = Money(stake.0.saturating_sub(matched.0));
let mut final_state = if final_remaining.0 == 0 {
BookOrderState::ExecutionComplete
} else if matched.0 > 0 {
BookOrderState::ExecutablePartiallyMatched
} else {
BookOrderState::ExecutableUnmatched
};
let should_cancel_remainder =
matches!(time_in_force, TimeInForce::FillOrKill { .. }) && final_remaining.0 > 0;
if should_cancel_remainder {
final_remaining = Money::zero();
final_state = BookOrderState::Cancelled;
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
cancelled_order: CancelledOrderEntry {
order_id,
account_id,
correlation_id,
},
cancel_cause: CancelCause::FokRemainder,
cause_detail: None,
},
));
}
Ok((
std::mem::take(&mut scratch.events),
Some(CommandResponseKind::PlaceOrder(PlaceOrderResult {
accepted: true,
order_id,
matched: FillQuantity::Stake(matched),
avg_matched_price: avg_price.map(FillPrice::Odds),
remaining: FillQuantity::Stake(final_remaining),
final_order_state: Some(final_state),
})),
))
}
/// Sums the resting stake that a taker on `side` at `price` could cross on `runner_id`.
///
/// A `Yes` taker walks the `No` side downwards from `best_tick_desc` and stops at the first
/// tick below the taker's tick; a `No` taker walks the `Yes` side upwards from `best_tick_asc`
/// and stops at the first tick above it. Per-level amounts come from `level_total_remaining`.
///
/// Returns zero when the runner has no book or `price` has no tick index. The running total
/// saturates instead of overflowing.
///
/// NOTE(review): no account is passed in, so resting orders that `plan_matching` would skip
/// for belonging to the taker's own account are presumably still counted — confirm.
fn available_to_match(&self, runner_id: RunnerId, side: Side, price: OddsX10000) -> Money {
    let Some(runner_book) = self.runner_books.get(&runner_id) else {
        return Money::zero();
    };
    let Some(taker_tick) = price.tick_index() else {
        return Money::zero();
    };
    let mut total = 0i64;
    match side {
        Side::Yes => {
            let mut tick = runner_book.best_tick_desc(Side::No);
            while let Some(t) = tick {
                if t < taker_tick {
                    break;
                }
                total = total.saturating_add(runner_book.level_total_remaining(Side::No, t).0);
                // Tick 0 is the lowest level; there is nothing below it to visit.
                if t == 0 {
                    break;
                }
                tick = runner_book.next_tick_desc_from(Side::No, t - 1);
            }
        }
        Side::No => {
            let mut tick = runner_book.best_tick_asc(Side::Yes);
            while let Some(t) = tick {
                if t > taker_tick {
                    break;
                }
                total = total.saturating_add(runner_book.level_total_remaining(Side::Yes, t).0);
                // Mirror of the `t == 0` guard above: stop rather than overflow the index.
                let Some(above) = t.checked_add(1) else {
                    break;
                };
                tick = runner_book.next_tick_asc_from(Side::Yes, above);
            }
        }
    }
    Money(total)
}
/// Plans the fills for a taker order against resting orders on the opposite side.
///
/// Takes `&self`; all working state and output goes through `scratch`:
/// `fills`, `maker_matched_delta` and `matchable_ticks` are cleared and rebuilt, while
/// `scratch.events` is only appended to (two `TradeMatched` events per fill, maker first,
/// then taker).
///
/// Returns the total matched stake and the stake-weighted average price (integer division),
/// or `None` for the price when nothing matched. Returns `(zero, None)` without planning
/// anything when the taker price has no tick index or the runner has no book.
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (Money, Option<OddsX10000>) {
scratch.fills.clear();
scratch.maker_matched_delta.clear();
scratch.matchable_ticks.clear();
let mut remaining = r.taker_stake;
let mut matched_total = Money::zero();
// Accumulates price * fill so the average can be computed once at the end.
let mut sum_price_qty: u128 = 0;
let Some(taker_tick) = r.taker_price.tick_index() else {
return (Money::zero(), None);
};
let Some(runner_book) = self.runner_books.get(&r.taker_runner_id) else {
return (Money::zero(), None);
};
// Phase 1: collect the opposite-side ticks the taker may cross, in visiting order.
match r.taker_side {
Side::Yes => {
// Walk the No side downwards from its best tick; stop below the taker's tick.
let mut tick = runner_book.best_tick_desc(Side::No);
while let Some(t) = tick {
if t < taker_tick {
break;
}
scratch.matchable_ticks.push(t as u16);
// Guard the `t - 1` below against underflow at the lowest tick.
if t == 0 {
break;
}
tick = runner_book.next_tick_desc_from(Side::No, t - 1);
}
}
Side::No => {
// Walk the Yes side upwards from its best tick; stop above the taker's tick.
let mut tick = runner_book.best_tick_asc(Side::Yes);
while let Some(t) = tick {
if t > taker_tick {
break;
}
scratch.matchable_ticks.push(t as u16);
tick = runner_book.next_tick_asc_from(Side::Yes, t + 1);
}
}
}
// Phase 2: consume the collected levels until the taker stake is exhausted.
for &tick_u16 in scratch.matchable_ticks.iter() {
if remaining.0 <= 0 {
break;
}
let tick = tick_u16 as usize;
let level_side = match r.taker_side {
Side::Yes => Side::No,
Side::No => Side::Yes,
};
// Fills execute at the resting level's price, looked up in the tick ladder.
let level_price = OddsX10000(crate::types::odds::TICK_LADDER[tick]);
for maker_key in runner_book.iter_level_keys(level_side, tick, &self.orders) {
if remaining.0 <= 0 {
break;
}
let maker = self.orders.order_by_key(maker_key);
let maker_order_id = maker.info.order_id;
// Never match an account against its own resting orders.
if maker.info.account_id == r.taker_account_id {
continue;
}
let maker_is_live = matches!(
maker.info.state,
BookOrderState::ExecutableUnmatched
| BookOrderState::ExecutablePartiallyMatched
);
if !maker_is_live {
continue;
}
// Maker's open stake = stake - already matched - amount planned earlier in this call.
let base_remaining = maker.stake.0.saturating_sub(maker.matched.0);
let delta = *scratch
.maker_matched_delta
.get(&maker_order_id)
.unwrap_or(&0);
let maker_remaining = base_remaining.saturating_sub(delta);
if maker_remaining <= 0 {
continue;
}
// Fill the smaller of what the taker still needs and what the maker has left.
let fill_i64 = remaining.0.min(maker_remaining);
let fill_amount = Money(fill_i64);
remaining = Money(remaining.0 - fill_i64);
let maker_remaining_after = Money(maker_remaining.saturating_sub(fill_i64));
let taker_remaining_after = remaining;
*scratch
.maker_matched_delta
.entry(maker_order_id)
.or_insert(0) += fill_i64;
scratch.fills.push(PlannedFill {
maker_order_id,
maker_account_id: maker.info.account_id.clone(),
maker_correlation_id: maker.info.correlation_id.clone(),
maker_runner_id: maker.runner_id,
maker_side: maker.info.side,
price: level_price,
fill_amount,
maker_remaining_after,
taker_remaining_after,
});
matched_total = Money(matched_total.0 + fill_i64);
sum_price_qty =
sum_price_qty.saturating_add(level_price.0 as u128 * fill_i64.max(0) as u128);
}
}
// Stake-weighted average price, truncated by integer division and clamped to u32.
let avg_price = if matched_total.0 > 0 {
let denom = matched_total.0 as u128;
let px = (sum_price_qty / denom).min(u32::MAX as u128) as u32;
Some(OddsX10000(px))
} else {
None
};
// Phase 3: emit one maker-side and one taker-side TradeMatched event per planned fill.
for fill in scratch.fills.iter() {
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: fill.maker_correlation_id.clone(),
order_id: fill.maker_order_id,
account_id: fill.maker_account_id.clone(),
role: TradeRole::Maker,
runner_id: fill.maker_runner_id,
runner_label: self.runner_label(fill.maker_runner_id).to_string(),
side: fill.maker_side,
market_phase: self.market_phase,
price: fill.price,
stake: fill.fill_amount,
counter_party: r.taker_order_id,
counter_party_account_id: r.taker_account_id.clone(),
remaining_stake: fill.maker_remaining_after,
matched_delta: fill.fill_amount,
},
));
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: r.taker_correlation_id.clone(),
order_id: r.taker_order_id,
account_id: r.taker_account_id.clone(),
role: TradeRole::Taker,
runner_id: r.taker_runner_id,
runner_label: self.runner_label(r.taker_runner_id).to_string(),
side: r.taker_side,
market_phase: self.market_phase,
price: fill.price,
stake: fill.fill_amount,
counter_party: fill.maker_order_id,
counter_party_account_id: fill.maker_account_id.clone(),
remaining_stake: fill.taker_remaining_after,
matched_delta: fill.fill_amount,
},
));
}
(matched_total, avg_price)
}
/// Builds the `OrderCancelled` event for a single live order owned by `account_id`.
///
/// `reason` is mapped to a cancel cause (`"USER_CANCEL"`, `"REPLACE"`, `"RUNNER_REMOVED"`;
/// anything else becomes `CancelCause::Admin`) and is also copied verbatim into `cause_detail`.
///
/// # Errors
/// * [`RejectReason::OrderNotFound`] if the order id is unknown;
/// * [`RejectReason::NotOrderOwner`] if the order belongs to another account;
/// * [`RejectReason::OrderNotLive`] if the order is neither unmatched nor partially matched.
fn cmd_cancel_order(
    &self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let order = self
        .orders
        .get(&order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if order.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    if !matches!(
        order.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    ) {
        return Err(RejectReason::OrderNotLive);
    }

    let cancelled_order = CancelledOrderEntry::from_order_info(order_id, &order.info);
    let cancel_cause = match reason {
        "USER_CANCEL" => CancelCause::UserCancel,
        "REPLACE" => CancelCause::Replace,
        "RUNNER_REMOVED" => CancelCause::RunnerRemoved,
        _ => CancelCause::Admin,
    };
    let cancelled = BookEvent::OrderCancelled {
        cancelled_order,
        cancel_cause,
        cause_detail: Some(reason.to_string()),
    };

    let mut out = EventVec::new();
    out.push(self.emit(ts, cancelled));
    Ok((out, None))
}
/// Unlinks an order from its runner's price level, if it is currently linked.
///
/// Does nothing when the order is not in a level, is unknown, has no positive remaining stake,
/// has no key, or its runner has no book.
fn remove_from_levels(&mut self, order_id: OrderId) {
    if !self.orders.is_in_level(&order_id) {
        return;
    }
    // Copy out what is needed so the order borrow ends before the mutable borrows below.
    let (runner_id, remaining) = match self.orders.get(&order_id) {
        Some(order) => (order.runner_id, order.remaining()),
        None => return,
    };
    if remaining.0 <= 0 {
        return;
    }
    let Some(key) = self.orders.get_key(&order_id) else {
        return;
    };
    if let Some(runner_book) = self.runner_books.get_mut(&runner_id) {
        runner_book.unlink(&mut self.orders, key, remaining);
    }
}
/// Replaces a live order with a new one: plans the new placement, then cancels the old order.
///
/// The replacement is placed through `cmd_place_order` with `TimeInForce::Gtc`; the old order
/// is cancelled through `cmd_cancel_order` with reason `"REPLACE"`. In the returned events the
/// cancellation comes first, followed by the placement events. The response is the placement
/// response.
///
/// # Errors
/// * [`RejectReason::OrderNotFound`], [`RejectReason::NotOrderOwner`] or
///   [`RejectReason::OrderNotLive`] for the old order;
/// * any rejection from `cmd_place_order` or `cmd_cancel_order`.
#[allow(clippy::too_many_arguments)]
fn cmd_replace_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    correlation_id: Option<CorrelationId>,
    old_order_id: OrderId,
    new_order_id: OrderId,
    account_id: AccountId,
    runner_id: RunnerId,
    side: Side,
    new_price: OddsX10000,
    new_stake: Money,
    persistence: Persistence,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let previous = self
        .orders
        .get(&old_order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if previous.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    if !matches!(
        previous.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    ) {
        return Err(RejectReason::OrderNotLive);
    }

    // The placement is validated and planned first so that a rejected replacement
    // produces no cancellation at all.
    let (place_events, place_response) = self.cmd_place_order(
        scratch,
        ts,
        correlation_id,
        new_order_id,
        account_id.clone(),
        runner_id,
        side,
        new_price,
        new_stake,
        persistence,
        TimeInForce::Gtc,
    )?;

    let (mut combined, _) = self.cmd_cancel_order(ts, old_order_id, account_id, "REPLACE")?;
    combined.extend(place_events);
    Ok((combined, place_response))
}
/// Builds the events for removing a runner: one cancellation per live order on that runner
/// (reason `"RUNNER_REMOVED"`, in `iter_sorted` order), followed by a `RunnerRemoved` event.
///
/// # Errors
/// * [`RejectReason::RunnerNotFound`] if the runner is unknown;
/// * [`RejectReason::RunnerAlreadyRemoved`] if it was removed before;
/// * any rejection from `cmd_cancel_order`.
fn cmd_remove_runner(
    &mut self,
    ts: DateTime,
    runner_id: RunnerId,
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if !self.runners.contains(&runner_id) {
        return Err(RejectReason::RunnerNotFound);
    }
    if self.removed_runners.contains(&runner_id) {
        return Err(RejectReason::RunnerAlreadyRemoved);
    }

    let live_on_runner: Vec<(OrderId, AccountId)> = self
        .orders
        .iter_sorted()
        .filter(|(_, o)| {
            o.runner_id == runner_id
                && matches!(
                    o.info.state,
                    BookOrderState::ExecutableUnmatched
                        | BookOrderState::ExecutablePartiallyMatched
                )
        })
        .map(|(oid, o)| (oid, o.info.account_id.clone()))
        .collect();

    let mut out = EventVec::new();
    for (oid, owner) in live_on_runner {
        let (cancel_events, _) = self.cmd_cancel_order(ts, oid, owner, "RUNNER_REMOVED")?;
        out.extend(cancel_events);
    }

    let removed = BookEvent::RunnerRemoved {
        runner_id,
        runner_label: self.runner_label(runner_id).to_string(),
        reduction_factor_bps,
    };
    out.push(self.emit(ts, removed));
    Ok((out, None))
}
/// Settles the market: validates the results, emits the settled event, and moves the market
/// to `Settled`.
///
/// Validation is delegated to `validate_market_settlement`, which receives the current state,
/// the supplied results and the ids of all runners that have not been removed.
///
/// # Errors
/// Propagates any rejection from `validate_market_settlement`.
fn cmd_settle_market(
    &mut self,
    ts: DateTime,
    runner_results: &[RunnerSettlement],
    void_reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let active_runners = self
        .runners
        .iter()
        .filter(|runner_id| !self.removed_runners.contains(runner_id))
        .copied();
    validate_market_settlement(self.state, runner_results, active_runners)?;

    let settled = build_market_settled_event(runner_results, void_reason, |runner_id| {
        self.runner_label(runner_id).to_string()
    });
    let mut out = EventVec::new();
    out.push(self.emit(ts, settled));
    self.state_change(ts, BookMarketState::Settled, "SETTLED", &mut out);
    Ok((out, None))
}
/// Emits a `VoidTrades` event for the given time range, tagged with the current market phase.
///
/// # Errors
/// Returns [`RejectReason::MarketTerminal`] when the market is already `Settled`.
fn cmd_void_trades(
    &mut self,
    ts: DateTime,
    start_time: DateTime,
    end_time: DateTime,
    void_reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state == BookMarketState::Settled {
        return Err(RejectReason::MarketTerminal);
    }

    let void_event = BookEvent::VoidTrades {
        market_phase: self.market_phase,
        start_time,
        end_time,
        void_reason: void_reason.to_string(),
    };
    let mut out = EventVec::new();
    out.push(self.emit(ts, void_event));
    Ok((out, None))
}
}