use super::error::BookError;
use super::protocol::{
command::{Command, CommandKind, MarketState, Persistence, Side, TimeInForce},
reject::RejectReason,
};
use crate::book::common::types::BookOrderInfo;
use crate::types::*;
use chrono::Utc;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use tracing::error;
use super::common::fast::{OrderKey, OrderStore, RunnerBook};
use super::common::*;
use serde::{Deserialize, Serialize};
// Index of a price level, stored as a 16-bit value.
type TickIndex = u16;
// Identifies one price level: runner, side of the book, tick index.
type RunnerLevelKey = (RunnerId, Side, TickIndex);
// One resting order collected while rebuilding ladders:
// (creation time, order id, order-store key, remaining stake).
type RestingLevelOrder = (DateTime, OrderId, OrderKey, Money);
// Resting orders grouped by the level they belong to.
type RestingOrdersByLevel = HashMap<RunnerLevelKey, Vec<RestingLevelOrder>>;
/// Reusable working buffers for command handling.
///
/// `handle_command` moves this out of the book with `std::mem::take`, lends it
/// to the command handlers, and puts it back afterwards.
#[derive(Debug, Default)]
struct Scratch {
// Stake already planned against each maker during the current matching pass.
maker_matched_delta: HashMap<OrderId, i64>,
// Ticks on the opposite side that the current taker is allowed to cross.
matchable_ticks: Vec<TickIndex>,
// Fills planned by the current matching pass.
fills: Vec<PlannedFill>,
// Events accumulated for the command currently being handled.
events: EventVec,
}
/// One fill planned by the matching pass between the taker and a resting maker.
#[derive(Debug, Clone, Copy)]
struct PlannedFill {
// Resting order being filled.
maker_order_id: OrderId,
// Runner and side of the resting order.
maker_runner_id: RunnerId,
maker_side: Side,
// Price of the level the maker rests at.
price: OddsX10000,
// Stake exchanged by this fill.
fill_amount: Money,
// Unmatched stake left on each side once this fill is applied.
maker_remaining_after: Money,
taker_remaining_after: Money,
}
/// Field-for-field copy of the payload of a `BookEvent::TradeMatched` event,
/// as handed to `trade_matched` by `apply_event`.
#[derive(Debug, Clone)]
struct TradeMatchView {
trade_id: TradeId,
// Order this event refers to (one side of the trade).
order_id: OrderId,
runner_id: RunnerId,
price: OddsX10000,
stake: Money,
// Order on the other side of the trade.
counter_party: OrderId,
// Unmatched stake left on `order_id` after this trade; zero means fully matched.
remaining_stake: Money,
// Amount added to the order's matched total by this trade.
matched_delta: Money,
}
/// The first of the two `TradeMatched` events seen for a trade, parked until
/// the event for the counter-party arrives.
#[derive(Debug, Clone, Copy)]
struct PendingTradeView {
order_id: OrderId,
runner_id: RunnerId,
price: OddsX10000,
stake: Money,
matched_delta: Money,
}
/// Description of the incoming (taker) order handed to the matching pass.
#[derive(Debug, Clone)]
struct MatchRequest {
// Timestamp of the command being handled.
ts: DateTime,
// Account placing the order; makers owned by the same account are skipped.
taker_account_id: AccountId,
taker_side: Side,
taker_runner_id: RunnerId,
// Limit price of the taker.
taker_price: OddsX10000,
// Full stake of the taker before any matching.
taker_stake: Money,
taker_order_id: OrderId,
}
/// Serialized form of [`MultiRunnerBook`].
///
/// Carries every field of the book except `runner_books`,
/// `pending_trade_views` and `scratch`; those are recreated on deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MultiRunnerBookSnapshot {
market_id: MarketId,
market_kind: MarketKind,
state: BookMarketState,
// State to return to when a suspension / halt is lifted.
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
// Next identifiers to hand out.
next_trade_id: u64,
next_order_id: u64,
runners: BTreeSet<RunnerId>,
allow_unknown_runners: bool,
removed_runners: BTreeSet<RunnerId>,
// Progress of a batched close, if one is under way.
close_process: Option<CloseProcessState>,
orders: OrderStore,
trades: BTreeMap<TradeId, BookTrade>,
runner_matched_volume: BTreeMap<RunnerId, Money>,
}
/// Order book for a single market with one price ladder per runner.
///
/// Commands are validated by `handle_command`, which returns events; state is
/// changed by feeding those events to `apply_event`.
#[derive(Debug)]
pub struct MultiRunnerBook {
market_id: MarketId,
market_kind: MarketKind,
// Current market state.
state: BookMarketState,
// State to return to when a suspension / halt is lifted.
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
// Next identifiers to hand out; bumped as accepted orders / trades are applied.
next_trade_id: u64,
next_order_id: u64,
// Runners known to this market.
runners: BTreeSet<RunnerId>,
// When true, orders may name runners that are not yet in `runners`.
allow_unknown_runners: bool,
// Runners that have been removed; new orders on them are rejected.
removed_runners: BTreeSet<RunnerId>,
// Per-runner price ladders; derived data, rebuilt from `orders` after deserialization.
runner_books: HashMap<RunnerId, RunnerBook>,
// Progress of a batched close, if one is under way.
close_process: Option<CloseProcessState>,
orders: OrderStore,
trades: BTreeMap<TradeId, BookTrade>,
// Matched volume accumulated per runner.
runner_matched_volume: BTreeMap<RunnerId, Money>,
// First half of each trade whose second `TradeMatched` event has not arrived yet.
// Not serialized and not carried over by `Clone`.
pending_trade_views: HashMap<TradeId, PendingTradeView>,
// Reusable buffers for command handling; not serialized or cloned.
scratch: Scratch,
}
impl Serialize for MultiRunnerBook {
    /// Serializes the book by copying its persistent fields into a
    /// [`MultiRunnerBookSnapshot`] and serializing that.
    ///
    /// The ladders, pending trade halves and scratch buffers are left out.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let Self {
            market_id,
            market_kind,
            state,
            state_before_suspend,
            state_before_halt,
            next_trade_id,
            next_order_id,
            runners,
            allow_unknown_runners,
            removed_runners,
            runner_books: _,
            close_process,
            orders,
            trades,
            runner_matched_volume,
            pending_trade_views: _,
            scratch: _,
        } = self;
        let snapshot = MultiRunnerBookSnapshot {
            market_id: *market_id,
            market_kind: *market_kind,
            state: *state,
            state_before_suspend: *state_before_suspend,
            state_before_halt: *state_before_halt,
            next_trade_id: *next_trade_id,
            next_order_id: *next_order_id,
            runners: runners.clone(),
            allow_unknown_runners: *allow_unknown_runners,
            removed_runners: removed_runners.clone(),
            close_process: *close_process,
            orders: orders.clone(),
            trades: trades.clone(),
            runner_matched_volume: runner_matched_volume.clone(),
        };
        snapshot.serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for MultiRunnerBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = MultiRunnerBookSnapshot::deserialize(deserializer)?;
let mut book = Self {
market_id: snap.market_id,
market_kind: snap.market_kind,
state: snap.state,
state_before_suspend: snap.state_before_suspend,
state_before_halt: snap.state_before_halt,
next_trade_id: snap.next_trade_id,
next_order_id: snap.next_order_id,
runners: snap.runners,
allow_unknown_runners: snap.allow_unknown_runners,
removed_runners: snap.removed_runners,
runner_books: HashMap::new(),
close_process: snap.close_process,
orders: snap.orders,
trades: snap.trades,
runner_matched_volume: snap.runner_matched_volume,
pending_trade_views: HashMap::new(),
scratch: Scratch::default(),
};
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}
impl Clone for MultiRunnerBook {
    /// Copies the book, including its ladders.
    ///
    /// The pending-trade map and the scratch buffers are NOT copied; the clone
    /// starts with both empty.
    fn clone(&self) -> Self {
        let Self {
            market_id,
            market_kind,
            state,
            state_before_suspend,
            state_before_halt,
            next_trade_id,
            next_order_id,
            runners,
            allow_unknown_runners,
            removed_runners,
            runner_books,
            close_process,
            orders,
            trades,
            runner_matched_volume,
            pending_trade_views: _,
            scratch: _,
        } = self;
        Self {
            market_id: *market_id,
            market_kind: *market_kind,
            state: *state,
            state_before_suspend: *state_before_suspend,
            state_before_halt: *state_before_halt,
            next_trade_id: *next_trade_id,
            next_order_id: *next_order_id,
            runners: runners.clone(),
            allow_unknown_runners: *allow_unknown_runners,
            removed_runners: removed_runners.clone(),
            runner_books: runner_books.clone(),
            close_process: *close_process,
            orders: orders.clone(),
            trades: trades.clone(),
            runner_matched_volume: runner_matched_volume.clone(),
            pending_trade_views: HashMap::new(),
            scratch: Scratch::default(),
        }
    }
}
impl MultiRunnerBook {
/// Creates an open book for `market_id` with the given runners and an order
/// store sized for 20,000 orders.
pub fn new(
    market_id: MarketId,
    market_kind: MarketKind,
    runner_ids: impl IntoIterator<Item = RunnerId>,
) -> Self {
    let default_capacity: usize = 20_000;
    Self::new_with_capacity(market_id, market_kind, runner_ids, default_capacity)
}
/// Creates an open book for `market_id` with the given runners.
///
/// `order_store_capacity` is passed to `OrderStore::with_capacity`. Every
/// runner gets an empty ladder; unknown runners are not allowed.
pub fn new_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    runner_ids: impl IntoIterator<Item = RunnerId>,
    order_store_capacity: usize,
) -> Self {
    let mut runners: BTreeSet<RunnerId> = BTreeSet::new();
    runners.extend(runner_ids);

    let mut runner_books: HashMap<RunnerId, RunnerBook> = HashMap::new();
    for &rid in &runners {
        runner_books.insert(rid, RunnerBook::new());
    }

    Self {
        market_id,
        market_kind,
        state: BookMarketState::Open,
        state_before_suspend: None,
        state_before_halt: None,
        next_trade_id: 1,
        next_order_id: 1,
        runners,
        allow_unknown_runners: false,
        removed_runners: BTreeSet::new(),
        runner_books,
        close_process: None,
        orders: OrderStore::with_capacity(order_store_capacity),
        trades: BTreeMap::new(),
        runner_matched_volume: BTreeMap::new(),
        pending_trade_views: HashMap::new(),
        scratch: Scratch::default(),
    }
}
/// Creates an open book with no runners that accepts orders for runners it
/// has not seen before, using an order store sized for 20,000 orders.
pub fn new_engine(market_id: MarketId, market_kind: MarketKind) -> Self {
    Self::new_engine_with_capacity(market_id, market_kind, 20_000)
}
/// Creates an open book with no runners that accepts orders for runners it
/// has not seen before, with an explicitly sized order store.
pub fn new_engine_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    order_store_capacity: usize,
) -> Self {
    let no_runners = Vec::<RunnerId>::new();
    let mut book =
        Self::new_with_capacity(market_id, market_kind, no_runners, order_store_capacity);
    book.allow_unknown_runners = true;
    book
}
/// Returns the id of the market this book belongs to.
pub fn market_id(&self) -> MarketId {
self.market_id
}
/// Returns the kind of market this book was created with.
pub fn market_kind(&self) -> MarketKind {
self.market_kind
}
/// Returns the current market state.
pub fn market_state(&self) -> BookMarketState {
self.state
}
/// Returns whether the current market state reports itself as halted.
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
/// Looks up an order in the order store; `None` if the store has no such order.
pub fn get_order(&self, order_id: OrderId) -> Option<&BookOrder> {
self.orders.get(&order_id)
}
/// Looks up a recorded trade by id; `None` if no such trade is recorded.
pub fn get_trade(&self, trade_id: TradeId) -> Option<&BookTrade> {
self.trades.get(&trade_id)
}
/// Returns whether the order store reports the order as linked into a price level.
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders.is_in_level(&order_id)
}
/// Counts the orders in the store that are still executable, i.e. in state
/// `ExecutableUnmatched` or `ExecutablePartiallyMatched`.
pub fn active_order_count(&self) -> usize {
    let mut live = 0usize;
    for (_, order) in self.orders.iter_sorted() {
        let executable = matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        );
        if executable {
            live += 1;
        }
    }
    live
}
/// Returns a copy of the batched-close progress, if any has been recorded.
pub fn close_process_state(&self) -> Option<CloseProcessState> {
self.close_process
}
/// Iterates over the runner ids known to this book, in ascending order
/// (the ids are held in a `BTreeSet`).
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
self.runners.iter().copied()
}
/// Registers `runner_id` if it is not known yet, giving it an empty ladder.
/// Does nothing for a runner that is already registered.
fn ensure_runner(&mut self, runner_id: RunnerId) {
    let newly_added = self.runners.insert(runner_id);
    if !newly_added {
        return;
    }
    self.runner_books.insert(runner_id, RunnerBook::new());
}
fn rebuild_ladders(&mut self) {
self.runner_books = self
.runners
.iter()
.copied()
.map(|rid| (rid, RunnerBook::new()))
.collect();
let mut per_level: RestingOrdersByLevel = HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
if self.removed_runners.contains(&order.runner_id) {
continue;
}
let remaining = order.remaining();
if remaining.0 <= 0 {
continue;
}
let tick: TickIndex = self.orders.stored_tick(key) as u16;
per_level
.entry((order.runner_id, order.info.side, tick))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((runner_id, _, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
let Some(rb) = self.runner_books.get_mut(&runner_id) else {
continue;
};
for (_, _, key, remaining) in items {
rb.insert_tail(&mut self.orders, key, remaining);
}
}
}
/// Returns up to `depth` price levels on each side for `runner_id`.
///
/// Delegates to the runner's ladder. For a runner without a ladder the
/// result has empty `available_to_back` and `available_to_lay` lists.
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
    match self.runner_books.get(&runner_id) {
        Some(rb) => rb.runner_prices(runner_id, depth),
        // Built only when actually needed, and without reserving `depth`
        // slots: the fallback is always empty, so pre-sizing it would be a
        // wasted allocation (and a capacity-overflow panic for huge depths).
        None => RunnerPrices {
            runner_id,
            available_to_back: Vec::new(),
            available_to_lay: Vec::new(),
        },
    }
}
/// Returns the first entry of the runner's `available_to_back` list at
/// depth 1, or `None` when that list is empty.
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_back
        .into_iter()
        .next()
}
/// Returns the first entry of the runner's `available_to_lay` list at
/// depth 1, or `None` when that list is empty.
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_lay
        .into_iter()
        .next()
}
/// Returns the matched volume recorded for `runner_id`, or zero when nothing
/// has been recorded for that runner.
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
    match self.runner_matched_volume.get(&runner_id) {
        Some(volume) => *volume,
        None => Money::zero(),
    }
}
/// Returns the sum of the matched volume over all runners.
pub fn total_matched(&self) -> Money {
    let mut total: i64 = 0;
    for volume in self.runner_matched_volume.values() {
        total += volume.0;
    }
    Money(total)
}
/// Validates a command against the current book state and returns the events
/// it produces together with the response for the caller.
///
/// Rejections are returned as `BookError` carrying the command's correlation
/// id and a `RejectReason`:
/// - `MarketIdMismatch` when the command targets another market;
/// - `MarketHalted` for any command other than halt/resume while halted;
/// - `InternalError` for `CreateMarket` / `RemoveMarket`, which are not
///   handled by this book;
/// - `MarketModelMismatch` for binary order commands;
/// - `CashoutNotSupported` for `CashoutRunner`;
/// - whatever the individual `cmd_*` handler rejects with.
///
/// NOTE(review): the handlers visible in this file only build events and do
/// not modify the book; state appears to change only when the events are fed
/// to `apply_event`. Confirm for the handlers defined elsewhere.
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
// Separate copy owned by the error-building closure below.
let err_correlation_id = correlation_id.clone();
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
// All events produced by this command share one wall-clock timestamp.
let ts = Utc::now();
let market_id = cmd.market_id;
if market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
// Halt and resume are handled before the halted check so that they remain
// usable while the market is halted.
match &cmd.kind {
CommandKind::HaltMarket { reason, .. } => {
let events = self.cmd_halt_market(ts, *reason).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
_ => {}
}
if self.state.is_halted() {
return Err(err(RejectReason::MarketHalted));
}
// Move the scratch buffers out of `self` so handlers can borrow them mutably
// alongside `self`; they are put back after the closure, on success or error.
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let (events, kind) = match &cmd.kind {
CommandKind::CreateMarket { .. } => {
return Err(err(RejectReason::InternalError));
}
CommandKind::SetMarketState { state, .. } => {
let reason = "SET_MARKET_STATE";
match state {
MarketState::Open => {
// Opening a suspended market goes through unsuspend, which restores the
// state recorded before the suspension rather than forcing Open.
if self.state == BookMarketState::Suspended {
self.cmd_unsuspend(ts, reason).map_err(&err)?
} else {
self.cmd_open_market(ts, reason).map_err(&err)?
}
}
MarketState::InPlay => self.cmd_turn_in_play(ts, reason).map_err(&err)?,
MarketState::Suspended => self.cmd_suspend(ts, reason).map_err(&err)?,
MarketState::Closed => self
.cmd_close_market(ts, crate::config::DEFAULT_SET_MARKET_STATE_BATCH)
.map_err(&err)?,
}
}
CommandKind::CloseMarket {
batch_max_events, ..
} => self.cmd_close_market(ts, *batch_max_events).map_err(&err)?,
CommandKind::ContinueCloseMarket => {
self.cmd_continue_close_market(ts).map_err(&err)?
}
CommandKind::VoidMarket { reason, .. } => {
self.cmd_void_market(ts, reason.as_str()).map_err(&err)?
}
CommandKind::SettleMarket {
runner_results,
dead_heat_divisor,
..
} => self
.cmd_settle_market(ts, runner_results.as_slice(), *dead_heat_divisor)
.map_err(&err)?,
CommandKind::PlaceOrder {
account_id,
runner_id,
side,
odds,
stake,
persistence,
time_in_force,
..
} => {
// The id is only proposed here; `next_order_id` itself is advanced when the
// resulting OrderAccepted event is applied.
let order_id = OrderId(self.next_order_id);
self.cmd_place_order(
&mut scratch,
ts,
correlation_id.clone(),
order_id,
*account_id,
*runner_id,
*side,
*odds,
*stake,
*persistence,
*time_in_force,
)
.map_err(&err)?
}
CommandKind::ReplaceOrder {
order_id,
account_id,
new_odds,
new_stake,
..
} => {
let Some(old) = self.get_order(*order_id) else {
return Err(err(RejectReason::OrderNotFound));
};
if old.info.account_id != *account_id {
return Err(err(RejectReason::NotOrderOwner));
}
// Omitted fields default to the existing price and the remaining stake.
let new_price = new_odds.unwrap_or(old.price);
let new_stake = new_stake.unwrap_or_else(|| old.remaining());
let new_order_id = OrderId(self.next_order_id);
self.cmd_replace_order(
&mut scratch,
ts,
correlation_id.clone(),
*order_id,
new_order_id,
*account_id,
old.runner_id,
old.info.side,
new_price,
new_stake,
old.persistence,
)
.map_err(&err)?
}
CommandKind::CancelOrder {
order_id,
account_id,
..
} => self
.cmd_cancel_order(ts, *order_id, *account_id, "USER_CANCEL")
.map_err(&err)?,
CommandKind::PlaceBinaryOrder { .. } | CommandKind::ReplaceBinaryOrder { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::RemoveRunner {
runner_id,
reduction_factor_bps,
..
} => self
.cmd_remove_runner(ts, *runner_id, *reduction_factor_bps)
.map_err(&err)?,
CommandKind::CashoutRunner { .. } => {
return Err(err(RejectReason::CashoutNotSupported));
}
CommandKind::VoidTradesFromTime {
from_matched_at_inclusive,
reason,
..
} => self
.cmd_void_trades_from_time(ts, *from_matched_at_inclusive, reason.as_str())
.map_err(&err)?,
CommandKind::VoidTradeIds {
trade_ids, reason, ..
} => self
.cmd_void_trade_ids(ts, trade_ids.as_slice(), reason.as_str())
.map_err(&err)?,
// Both variants returned early from the match at the top of the function.
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("handled above")
}
CommandKind::RemoveMarket => {
return Err(err(RejectReason::InternalError));
}
};
Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind,
},
))
})();
self.scratch = scratch;
result
}
/// Applies one event to the book, using the envelope's timestamp as the
/// update time.
///
/// Events that carry no state for this book (`MarketCreated`, binary order
/// and trade events, `MarketSettled`, `OrderRejected`, `MarketRemoved`) are
/// ignored.
pub(crate) fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
let event = &env.event;
match event {
BookEvent::MarketCreated { .. } => {}
BookEvent::MarketStateChanged { to, reason } => {
self.market_state_changed(*to, reason.as_str())
}
BookEvent::OrderAccepted {
order_id,
account_id,
runner_id,
side,
price,
stake,
persistence,
..
} => {
// Keep the id counter ahead of every accepted order id.
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
self.order_accepted(
ts,
*order_id,
*account_id,
*runner_id,
*side,
*price,
*stake,
*persistence,
)
}
BookEvent::BinaryOrderAccepted { .. } => {}
BookEvent::OrderCancelled { order_id, .. } => self.order_cancelled(*order_id),
BookEvent::OrderLapsed { order_id, .. } => {
self.order_lapsed(*order_id);
}
BookEvent::OrderVoided { order_id, .. } => {
self.order_voided(*order_id);
}
BookEvent::TradeMatched {
trade_id,
order_id,
runner_id,
price,
stake,
counter_party,
remaining_stake,
matched_delta,
..
} => self.trade_matched(ts, TradeMatchView {
trade_id: *trade_id,
order_id: *order_id,
runner_id: *runner_id,
price: *price,
stake: *stake,
counter_party: *counter_party,
remaining_stake: *remaining_stake,
matched_delta: *matched_delta,
}),
BookEvent::BinaryTradeMatched { .. } => {}
BookEvent::TradeVoided { trade_id, .. } => self.trade_voided(ts, *trade_id),
BookEvent::RunnerRemoved { runner_id, .. } => {
// Make sure the runner is known, then mark it removed.
self.ensure_runner(*runner_id);
self.removed_runners.insert(*runner_id);
}
BookEvent::MarketSettled { .. } => {}
BookEvent::MarketOrdersSettled {
cursor_after,
order_ids,
is_final,
} => {
// Every order listed in the chunk is removed from the ladders and the store.
for &order_id in order_ids {
self.order_cancelled(order_id);
}
// Advance the batched-close bookkeeping, if a close is being tracked.
if let Some(s) = self.close_process.as_mut() {
s.cursor_after = *cursor_after;
s.cancelled_total = s.cancelled_total.saturating_add(order_ids.len() as u64);
s.chunks_done = s.chunks_done.saturating_add(1);
// NOTE(review): this branch is empty, so the final chunk gets no special
// handling and `close_process` stays set. Confirm whether that is intended.
if *is_final {
}
}
}
BookEvent::OrderRejected { .. } => {}
BookEvent::MarketRemoved { .. } => {}
}
}
/// Applies a market state transition to the book.
///
/// - Entering `Closing` starts a fresh close-process record; the batch size
///   is parsed from `reason`, falling back to the default batch size.
/// - Entering `Suspended` / `Halted` remembers the state to return to;
///   leaving them forgets it.
///
/// A halt taken while the market is suspended is transparent to the suspend
/// bookkeeping: the pre-suspend state survives the halt, and resuming back
/// into `Suspended` does not overwrite it. Otherwise the pre-suspend state
/// would become `Halted`, and unsuspending would halt the market again.
///
/// # Panics
///
/// Panics if a pre-event-only market is moved into `TurnInPlayEnabled`.
pub(crate) fn market_state_changed(&mut self, to: BookMarketState, reason: &str) {
    if self.market_kind == MarketKind::PreEventOnly {
        assert!(
            to != BookMarketState::TurnInPlayEnabled,
            "pre-event-only market cannot be applied into TurnInPlayEnabled"
        );
    }
    if to == BookMarketState::Closing {
        let batch_max_events =
            crate::book::close_process::parse_close_start_batch_max_events(reason)
                .unwrap_or(crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS);
        let total_live_orders =
            crate::book::close_process::count_live_orders(self.orders.iter_sorted());
        self.close_process = Some(CloseProcessState {
            batch_max_events,
            cursor_after: None,
            total_live_orders,
            cancelled_total: 0,
            chunks_done: 0,
        });
    }
    let from = self.state;

    // Suspend bookkeeping. Transitions to or from `Halted` are skipped so
    // that a halt/resume pair inside a suspension leaves the remembered
    // pre-suspend state untouched.
    if to == BookMarketState::Suspended {
        if from != BookMarketState::Halted {
            self.state_before_suspend = Some(from);
        }
    } else if from == BookMarketState::Suspended && to != BookMarketState::Halted {
        self.state_before_suspend = None;
    }

    // Halt bookkeeping.
    if to == BookMarketState::Halted {
        self.state_before_halt = Some(from);
    } else if from == BookMarketState::Halted {
        self.state_before_halt = None;
    }
    self.state = to;
}
#[allow(clippy::too_many_arguments)]
/// Records a newly accepted order and links it at the tail of its level.
///
/// An order id already present in the store is ignored. The runner is
/// registered on the fly if it is not known yet. Failures to insert or to
/// locate the stored order are logged and leave the ladders untouched.
pub(crate) fn order_accepted(
    &mut self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    runner_id: RunnerId,
    side: Side,
    price: OddsX10000,
    stake: Money,
    persistence: Persistence,
) {
    if self.orders.contains(&order_id) {
        return;
    }
    // No-op for runners that are already registered.
    self.ensure_runner(runner_id);

    let info = BookOrderInfo {
        order_id,
        account_id,
        side,
        state: BookOrderState::ExecutableUnmatched,
        created_at: ts,
        last_updated_at: ts,
    };
    let new_order = BookOrder {
        info,
        runner_id,
        price,
        stake,
        matched: Money::zero(),
        persistence,
    };
    if let Err(err) = self.orders.insert(order_id, new_order) {
        error!(order_id = ?order_id, reason = ?err, "failed to insert order");
        return;
    }

    match self.orders.get_key(&order_id) {
        Some(key) => {
            let ladder = self.runner_books.entry(runner_id).or_default();
            ladder.insert_tail(&mut self.orders, key, stake);
        }
        None => {
            error!(order_id = ?order_id, "missing order key after insert");
        }
    }
}
/// Applies one `TradeMatched` event (one side of a trade).
///
/// Each trade arrives as two events. The first one seen for a trade id is
/// parked in `pending_trade_views`; the second completes the pair, updates
/// the per-runner matched volume and records the trade as `Live`. The first
/// event is treated as the maker and the second as the taker.
///
/// # Panics
///
/// Panics if the second event's counter-party is not the order of the first
/// event, or if the second event's order id is smaller than the first's.
fn trade_matched(&mut self, ts: DateTime, m: TradeMatchView) {
// Keep the trade id counter ahead of every trade id seen.
self.next_trade_id = self.next_trade_id.max(m.trade_id.0.saturating_add(1));
// Ladder update: reduce the level total by the matched amount and unlink the
// order from its level once nothing remains.
if let Some(order) = self.orders.get(&m.order_id) {
let runner_id = order.runner_id;
let side = order.info.side;
if let Some(key) = self.orders.get_key(&m.order_id)
&& let Some(runner_book) = self.runner_books.get_mut(&runner_id)
{
let tick = self.orders.stored_tick(key);
runner_book.decrement_level_total(side, tick, m.matched_delta);
if m.remaining_stake.0 == 0 && self.orders.in_level_by_key(key) {
runner_book.unlink(&mut self.orders, key, Money::zero());
}
}
}
// Order update: accumulate the matched amount and refresh the state.
if let Some(order) = self.orders.get_mut(&m.order_id) {
order.matched = Money(order.matched.0 + m.matched_delta.0);
order.info.last_updated_at = ts;
order.info.state = if m.remaining_stake.0 == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
// Fully matched orders are dropped from the store.
if m.remaining_stake.0 == 0 {
self.orders.remove(&m.order_id);
}
let current = PendingTradeView {
order_id: m.order_id,
runner_id: m.runner_id,
price: m.price,
stake: m.stake,
matched_delta: m.matched_delta,
};
if let Some(first) = self.pending_trade_views.remove(&m.trade_id) {
// Second event of the pair: it must reference the first as its counter-party
// and must not carry the smaller order id.
assert_eq!(first.order_id, m.counter_party);
assert_eq!(m.order_id, first.order_id.max(m.order_id));
let maker = first;
let taker = current;
self.add_runner_matched_volume(taker.runner_id, taker.matched_delta);
// Volume is also credited to the maker's runner only when it differs.
if maker.runner_id != taker.runner_id {
self.add_runner_matched_volume(maker.runner_id, maker.matched_delta);
}
// The recorded trade takes runner, price and stake from the taker's event;
// an already recorded trade with this id is left as it is.
self.trades.entry(m.trade_id).or_insert(BookTrade {
trade_id: m.trade_id,
maker_order_id: maker.order_id,
taker_order_id: taker.order_id,
runner_id: taker.runner_id,
price: taker.price,
stake: taker.stake,
matched_at: ts,
state: BookTradeState::Live,
});
} else {
// First event of the pair: park it until the counter-party event arrives.
self.pending_trade_views.insert(m.trade_id, current);
}
}
/// Adds `delta` to the matched volume recorded for `runner_id`, starting
/// from zero if the runner has no entry yet.
fn add_runner_matched_volume(&mut self, runner_id: RunnerId, delta: Money) {
    // One map access: fetch (or create) the slot and update it in place.
    let total = self
        .runner_matched_volume
        .entry(runner_id)
        .or_insert(Money::zero());
    *total = Money(total.0 + delta.0);
}
/// Marks a trade as voided and backs its stake out of both orders and of
/// the matched volume of the trade's runner.
///
/// Any parked first half for the trade id is discarded. Voiding a trade that
/// is already voided changes nothing further. Orders no longer in the store
/// are skipped. For orders in an executable or complete state, the state is
/// recomputed from the reduced stake and matched amounts.
///
/// # Panics
///
/// Panics if no trade with `trade_id` has been recorded.
pub(crate) fn trade_voided(&mut self, ts: DateTime, trade_id: TradeId) {
    self.pending_trade_views.remove(&trade_id);

    let Some(trade) = self.trades.get_mut(&trade_id) else {
        panic!("trade_voided: trade {:?} not found", trade_id);
    };
    if trade.state == BookTradeState::Voided {
        return;
    }
    let voided_stake = trade.stake;
    let volume_runner = trade.runner_id;
    trade.state = BookTradeState::Voided;
    let participants = [trade.maker_order_id, trade.taker_order_id];

    for oid in participants {
        if let Some(order) = self.orders.get_mut(&oid) {
            order.matched = Money(order.matched.0.saturating_sub(voided_stake.0));
            order.stake = Money(order.stake.0.saturating_sub(voided_stake.0));
            order.info.last_updated_at = ts;

            let state_follows_amounts = matches!(
                order.info.state,
                BookOrderState::ExecutableUnmatched
                    | BookOrderState::ExecutablePartiallyMatched
                    | BookOrderState::ExecutionComplete
            );
            if state_follows_amounts {
                let left = order.remaining();
                order.info.state = if left.0 == 0 {
                    BookOrderState::ExecutionComplete
                } else if order.matched.0 == 0 {
                    BookOrderState::ExecutableUnmatched
                } else {
                    BookOrderState::ExecutablePartiallyMatched
                };
            }
        }
    }

    if let Some(volume) = self.runner_matched_volume.get_mut(&volume_runner) {
        *volume = Money(volume.0.saturating_sub(voided_stake.0));
    }
}
/// Applies a cancellation: takes the order off its price level, then drops it
/// from the order store.
pub(crate) fn order_cancelled(&mut self, order_id: OrderId) {
// Unlink first, while the order is still in the store.
self.remove_from_levels(order_id);
self.orders.remove(&order_id);
}
/// Applies a lapse: takes the order off its price level, then drops it from
/// the order store.
pub(crate) fn order_lapsed(&mut self, order_id: OrderId) {
// Unlink first, while the order is still in the store.
self.remove_from_levels(order_id);
self.orders.remove(&order_id);
}
/// Applies a void: takes the order off its price level, then drops it from
/// the order store.
pub(crate) fn order_voided(&mut self, order_id: OrderId) {
// Unlink first, while the order is still in the store.
self.remove_from_levels(order_id);
self.orders.remove(&order_id);
}
/// Wraps an event in an envelope for this market, stamped with `event_time`.
fn emit(&self, event_time: DateTime, event: BookEvent) -> BookEventEnvelope {
BookEventEnvelope {
market_id: self.market_id,
// NOTE(review): always 0 here; presumably the real sequence number is
// assigned by whoever persists the envelope. Confirm against the caller.
market_seq: 0,
timestamp: event_time,
event,
}
}
/// Appends a `MarketStateChanged` event to `out`, unless the market is
/// already in state `to`, in which case nothing is emitted.
///
/// # Panics
///
/// Panics if a pre-event-only market is asked to move to `TurnInPlayEnabled`.
fn state_change(&self, ts: DateTime, to: BookMarketState, reason: &str, out: &mut EventVec) {
    let already_there = self.state == to;
    if already_there {
        return;
    }
    let pre_event_only = self.market_kind == MarketKind::PreEventOnly;
    assert!(
        !(pre_event_only && to == BookMarketState::TurnInPlayEnabled),
        "pre-event-only market cannot transition to TurnInPlayEnabled"
    );
    let change = BookEvent::MarketStateChanged {
        to,
        reason: reason.to_string(),
    };
    out.push(self.emit(ts, change));
}
/// Builds the events for opening the market.
///
/// Rejects with `MarketTerminal` when the market is in a terminal state.
/// Emits nothing if the market is already open.
fn cmd_open_market(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_terminal() {
        Err(RejectReason::MarketTerminal)
    } else {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Open, reason, &mut out);
        Ok((out, None))
    }
}
/// Builds the events for turning the market in-play.
///
/// Rejects with `MarketInPlayNotSupported` for pre-event-only markets and
/// with `MarketTerminal` for terminal markets. When the market is currently
/// `Open`, every executable order with `Persistence::Lapse` gets an
/// `OrderLapsed` event (reason `IN_PLAY_LAPSE`) ahead of the state change.
fn cmd_turn_in_play(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.market_kind == MarketKind::PreEventOnly {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    let mut out = EventVec::new();
    if self.state == BookMarketState::Open {
        for (oid, order) in self.orders.iter_sorted() {
            let executable = matches!(
                order.info.state,
                BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
            );
            if !(executable && order.persistence == Persistence::Lapse) {
                continue;
            }
            let lapsed = BookEvent::OrderLapsed {
                order_id: oid,
                reason: "IN_PLAY_LAPSE".to_string(),
            };
            out.push(self.emit(ts, lapsed));
        }
    }
    self.state_change(ts, BookMarketState::TurnInPlayEnabled, reason, &mut out);
    Ok((out, None))
}
/// Builds the events for suspending the market.
///
/// Rejects with `MarketNotOpen` unless the current state is matchable.
fn cmd_suspend(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_matchable() {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Suspended, reason, &mut out);
        Ok((out, None))
    } else {
        Err(RejectReason::MarketNotOpen)
    }
}
/// Builds the events for lifting a suspension.
///
/// Rejects with `MarketNotSuspended` unless the market is suspended. The
/// market returns to the state recorded before the suspension, or to `Open`
/// when none was recorded.
fn cmd_unsuspend(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state != BookMarketState::Suspended {
        return Err(RejectReason::MarketNotSuspended);
    }
    let target = match self.state_before_suspend {
        Some(previous) => previous,
        None => BookMarketState::Open,
    };
    let mut out = EventVec::new();
    self.state_change(ts, target, reason, &mut out);
    Ok((out, None))
}
/// Builds the events for halting the market.
///
/// Rejects with `MarketTerminal` for terminal markets and with
/// `MarketAlreadyHalted` when already halted. The state-change reason is
/// `HALT:` followed by the numeric reason code.
fn cmd_halt_market(&mut self, ts: DateTime, reason: u32) -> Result<EventVec, RejectReason> {
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if self.state.is_halted() {
        return Err(RejectReason::MarketAlreadyHalted);
    }
    let halt_reason = format!("HALT:{reason}");
    let mut out = EventVec::new();
    self.state_change(ts, BookMarketState::Halted, &halt_reason, &mut out);
    Ok(out)
}
/// Builds the events for resuming a halted market.
///
/// Rejects with `MarketNotHalted` unless the market is halted. The market
/// returns to the state recorded before the halt, or to `Open` when none
/// was recorded.
fn cmd_resume_market(&mut self, ts: DateTime) -> Result<EventVec, RejectReason> {
    if !self.state.is_halted() {
        return Err(RejectReason::MarketNotHalted);
    }
    let target = match self.state_before_halt {
        Some(previous) => previous,
        None => BookMarketState::Open,
    };
    let mut out = EventVec::new();
    self.state_change(ts, target, "RESUME", &mut out);
    Ok(out)
}
/// Starts a batched close of the market and builds the first batch of events.
///
/// Rejects with `MarketClosing` when a close is already under way, with
/// `MarketTerminal` for terminal markets, and with `InvalidBatchSize` when
/// `batch_max_events` is below `MIN_CLOSE_BATCH_EVENTS`.
///
/// The event sequence itself is produced by `close_start_batch`, which is
/// given three callbacks: one to emit a state change, one to wrap an event
/// in an envelope, and one to select the next chunk of live orders.
fn cmd_close_market(
&mut self,
ts: DateTime,
batch_max_events: u16,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
if self.state == BookMarketState::Closing {
return Err(RejectReason::MarketClosing);
}
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if batch_max_events < MIN_CLOSE_BATCH_EVENTS {
return Err(RejectReason::InvalidBatchSize);
}
let events = close_start_batch(
batch_max_events,
|to, reason, out| {
let mut ev = EventVec::new();
self.state_change(ts, to, reason, &mut ev);
out.extend(ev);
},
|e| self.emit(ts, e),
// Chunk selector: walks the orders after `cursor_after` and collects up to
// `max_orders` live ones. Returns (collected ids, last order visited,
// whether the scan is finished, number collected).
|cursor_after, max_orders| {
let mut cancelled_order_ids = Vec::new();
let mut last = cursor_after;
let mut cancelled = 0usize;
let mut it = self.orders.iter_sorted_from(cursor_after).peekable();
while let Some((oid, order)) = it.next() {
// The cursor advances over skipped (non-live) orders as well.
last = Some(oid);
if !crate::book::close_process::is_live_order(order) {
continue;
}
cancelled_order_ids.push(oid);
cancelled += 1;
if cancelled >= max_orders {
// Chunk is full; the scan is finished only if no order of any kind follows.
let done = it.peek().is_none();
return (cancelled_order_ids, last, done, cancelled as u64);
}
}
(cancelled_order_ids, last, true, cancelled as u64)
},
);
Ok((EventVec::from_vec(events), None))
}
/// Builds the next batch of events for a close that is already under way.
///
/// Rejects with `MarketNotClosing` unless the market is in `Closing`, and
/// with `InternalError` when no close-process record exists.
///
/// The event sequence itself is produced by `close_continue_batch`, which is
/// given the current close-process record and three callbacks: one to emit a
/// state change, one to wrap an event in an envelope, and one to select the
/// next chunk of live orders.
fn cmd_continue_close_market(
&mut self,
ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
if self.state != BookMarketState::Closing {
return Err(RejectReason::MarketNotClosing);
}
let Some(proc_state) = self.close_process else {
return Err(RejectReason::InternalError);
};
let events = close_continue_batch(
proc_state,
|to, reason, out| {
let mut ev = EventVec::new();
self.state_change(ts, to, reason, &mut ev);
out.extend(ev);
},
|e| self.emit(ts, e),
// Chunk selector: walks the orders after `cursor_after` and collects up to
// `max_orders` live ones. Returns (collected ids, last order visited,
// whether the scan is finished, number collected).
|cursor_after, max_orders| {
let mut cancelled_order_ids = Vec::new();
let mut last = cursor_after;
let mut cancelled = 0usize;
let mut it = self.orders.iter_sorted_from(cursor_after).peekable();
while let Some((oid, order)) = it.next() {
// The cursor advances over skipped (non-live) orders as well.
last = Some(oid);
if !crate::book::close_process::is_live_order(order) {
continue;
}
cancelled_order_ids.push(oid);
cancelled += 1;
if cancelled >= max_orders {
// Chunk is full; the scan is finished only if no order of any kind follows.
let done = it.peek().is_none();
return (cancelled_order_ids, last, done, cancelled as u64);
}
}
(cancelled_order_ids, last, true, cancelled as u64)
},
);
Ok((EventVec::from_vec(events), None))
}
/// Builds the events for voiding the whole market.
///
/// Rejects with `MarketAlreadyVoided` when the market is already voided.
/// Emits, in this order: an `OrderVoided` event for every executable order,
/// a `TradeVoided` event for every live trade (both with reason
/// `MARKET_VOID`), and finally the state change to `Voided` carrying the
/// caller's `reason`.
fn cmd_void_market(
    &mut self,
    ts: DateTime,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state == BookMarketState::Voided {
        return Err(RejectReason::MarketAlreadyVoided);
    }
    let mut out = EventVec::new();

    for (oid, order) in self.orders.iter_sorted() {
        let executable = matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        );
        if executable {
            let voided = BookEvent::OrderVoided {
                order_id: oid,
                reason: "MARKET_VOID".to_string(),
            };
            out.push(self.emit(ts, voided));
        }
    }

    for (tid, trade) in &self.trades {
        if trade.state != BookTradeState::Live {
            continue;
        }
        let voided = BookEvent::TradeVoided {
            trade_id: *tid,
            reason: "MARKET_VOID".to_string(),
        };
        out.push(self.emit(ts, voided));
    }

    self.state_change(ts, BookMarketState::Voided, reason, &mut out);
    Ok((out, None))
}
#[allow(clippy::too_many_arguments)]
/// Validates a new order, plans its matching and builds the resulting events
/// and `PlaceOrderResult`.
///
/// Rejections, in the order they are checked:
/// - `MarketNotOpen` when the market state is not matchable;
/// - `RunnerNotFound` for a removed runner, or an unknown runner when unknown
///   runners are not allowed;
/// - `InvalidOdds` when the price is not a valid tick;
/// - `InvalidStake` when the stake is not positive, or a fill-or-kill minimum
///   does not fit in an `i64`;
/// - `Duplicate` when the order id is already in the store;
/// - `WouldNotFillFok` when a fill-or-kill order cannot reach its required
///   amount from the volume currently available at or better than its price.
///
/// NOTE(review): whether `plan_matching` also appends trade events to
/// `scratch.events` cannot be seen from this method; confirm there.
fn cmd_place_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
correlation_id: Option<CorrelationId>,
order_id: OrderId,
account_id: AccountId,
runner_id: RunnerId,
side: Side,
price: OddsX10000,
stake: Money,
persistence: Persistence,
time_in_force: TimeInForce,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
if !self.state.is_matchable() {
return Err(RejectReason::MarketNotOpen);
}
if self.removed_runners.contains(&runner_id)
|| (!self.allow_unknown_runners && !self.runners.contains(&runner_id))
{
return Err(RejectReason::RunnerNotFound);
}
if !price.is_valid_tick() {
return Err(RejectReason::InvalidOdds);
}
if !stake.is_positive() {
return Err(RejectReason::InvalidStake);
}
if self.orders.contains(&order_id) {
return Err(RejectReason::Duplicate);
}
// Fill-or-kill pre-check: the required amount is the explicit minimum fill,
// or the whole stake when no minimum is given.
// NOTE(review): `available_to_match` sums level totals regardless of owner,
// while the matching pass skips makers owned by the taker's account, so this
// check can pass and the actual fill still fall short of `required`. A minimum
// fill larger than the stake is not rejected here either. Confirm intent.
if let TimeInForce::FillOrKill { min_fill } = time_in_force {
let required = match min_fill {
Some(qty) => {
let value = i64::try_from(qty.0).map_err(|_| RejectReason::InvalidStake)?;
Money(value)
}
None => stake,
};
let available = self.available_to_match(runner_id, side, price);
if available.0 < required.0 {
return Err(RejectReason::WouldNotFillFok);
}
}
// The acceptance event always comes first in the output.
scratch.events.clear();
scratch.events.push(self.emit(
ts,
BookEvent::OrderAccepted {
correlation_id,
order_id,
account_id,
runner_id,
side,
price,
stake,
persistence,
time_in_force,
},
));
let (matched, avg_price) = self.plan_matching(
scratch,
MatchRequest {
ts,
taker_account_id: account_id,
taker_side: side,
taker_runner_id: runner_id,
taker_price: price,
taker_stake: stake,
taker_order_id: order_id,
},
);
// Derive the order's final remaining stake and state from the planned total.
let mut final_remaining = Money(stake.0.saturating_sub(matched.0));
let mut final_state = if final_remaining.0 == 0 {
BookOrderState::ExecutionComplete
} else if matched.0 > 0 {
BookOrderState::ExecutablePartiallyMatched
} else {
BookOrderState::ExecutableUnmatched
};
// A fill-or-kill order never rests: whatever is left unmatched is cancelled.
let should_cancel_remainder =
matches!(time_in_force, TimeInForce::FillOrKill { .. }) && final_remaining.0 > 0;
if should_cancel_remainder {
final_remaining = Money::zero();
final_state = BookOrderState::Cancelled;
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_id,
reason: "FOK_REMAINDER".to_string(),
},
));
}
// Hand the accumulated events to the caller, leaving the scratch buffer empty.
Ok((
std::mem::take(&mut scratch.events),
Some(CommandResponseKind::PlaceOrder(PlaceOrderResult {
accepted: true,
order_id,
matched: FillQuantity::Stake(matched),
avg_matched_price: avg_price.map(FillPrice::Odds),
remaining: FillQuantity::Stake(final_remaining),
final_order_state: Some(final_state),
})),
))
}
/// Sums the remaining stake resting on the opposite side of `runner_id`'s
/// ladder at ticks a taker on `side` at `price` could cross.
///
/// A `Yes` taker scans `No` levels upwards from the lowest tick and stops
/// above its own tick; a `No` taker scans `Yes` levels downwards from the
/// highest tick and stops below its own tick. Returns zero when the runner
/// has no ladder or the price has no tick index. Level totals are summed
/// regardless of which account owns the resting orders.
fn available_to_match(&self, runner_id: RunnerId, side: Side, price: OddsX10000) -> Money {
    let Some(ladder) = self.runner_books.get(&runner_id) else {
        return Money::zero();
    };
    let Some(limit_tick) = price.tick_index() else {
        return Money::zero();
    };

    let mut sum = 0i64;
    match side {
        Side::Yes => {
            let mut cursor = ladder.best_tick_asc(Side::No);
            while let Some(t) = cursor.filter(|&t| t <= limit_tick) {
                sum += ladder.level_total_remaining(Side::No, t).0;
                cursor = ladder.next_tick_asc_from(Side::No, t + 1);
            }
        }
        Side::No => {
            let mut cursor = ladder.best_tick_desc(Side::Yes);
            while let Some(t) = cursor.filter(|&t| t >= limit_tick) {
                sum += ladder.level_total_remaining(Side::Yes, t).0;
                // Tick 0 is the lowest level; there is nothing below it to visit.
                if t == 0 {
                    break;
                }
                cursor = ladder.next_tick_desc_from(Side::Yes, t - 1);
            }
        }
    }
    Money(sum)
}
/// Plans how the taker described by `r` would match against resting orders,
/// without mutating the book (`&self`).
///
/// The planned fills are left in `scratch.fills`, and two `TradeMatched`
/// events per fill (maker view, then taker view) are appended to
/// `scratch.events`. `scratch.fills`, `scratch.maker_matched_delta` and
/// `scratch.matchable_ticks` are cleared on entry; `scratch.events` is NOT
/// cleared here.
///
/// Returns `(matched_total, avg_price)`, where `avg_price` is the
/// fill-weighted average of the level prices (integer division), or `None`
/// when nothing matched. Returns `(Money::zero(), None)` when the taker price
/// has no tick index or the runner has no book.
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (Money, Option<OddsX10000>) {
scratch.fills.clear();
scratch.maker_matched_delta.clear();
scratch.matchable_ticks.clear();
let mut remaining = r.taker_stake;
let mut matched_total = Money::zero();
// Running sum of price * fill, used for the weighted average price below.
let mut sum_price_qty: u128 = 0;
let Some(taker_tick) = r.taker_price.tick_index() else {
return (Money::zero(), None);
};
let Some(runner_book) = self.runner_books.get(&r.taker_runner_id) else {
return (Money::zero(), None);
};
// Phase 1: collect the opposite-side ticks the taker can cross, best first.
match r.taker_side {
Side::Yes => {
// Yes taker: walk No levels upwards until the tick exceeds the taker's.
let mut tick = runner_book.best_tick_asc(Side::No);
while let Some(t) = tick {
if t > taker_tick {
break;
}
// NOTE(review): `as u16` would truncate silently if ticks are wider than u16 — confirm the tick type.
scratch.matchable_ticks.push(t as u16);
tick = runner_book.next_tick_asc_from(Side::No, t + 1);
}
}
Side::No => {
// No taker: walk Yes levels downwards until the tick drops below the taker's.
let mut tick = runner_book.best_tick_desc(Side::Yes);
while let Some(t) = tick {
if t < taker_tick {
break;
}
scratch.matchable_ticks.push(t as u16);
// Tick 0 is the floor; stop before `t - 1` would underflow.
if t == 0 {
break;
}
tick = runner_book.next_tick_desc_from(Side::Yes, t - 1);
}
}
}
// Phase 2: walk the collected levels and plan fills until the taker is exhausted.
for &tick_u16 in scratch.matchable_ticks.iter() {
if remaining.0 <= 0 {
break;
}
let tick = tick_u16 as usize;
// Makers rest on the side opposite to the taker.
let level_side = match r.taker_side {
Side::Yes => Side::No,
Side::No => Side::Yes,
};
// Fills are priced at the level's ladder price, not at the taker's price.
let level_price = OddsX10000(crate::types::odds::TICK_LADDER[tick]);
for maker_key in runner_book.iter_level_keys(level_side, tick, &self.orders) {
if remaining.0 <= 0 {
break;
}
let maker = self.orders.order_by_key(maker_key);
let maker_order_id = maker.info.order_id;
// Never match a taker against resting orders of the same account.
if maker.info.account_id == r.taker_account_id {
continue;
}
let maker_is_live = matches!(
maker.info.state,
BookOrderState::ExecutableUnmatched
| BookOrderState::ExecutablePartiallyMatched
);
if !maker_is_live {
continue;
}
// Maker's unmatched stake, less anything already planned against it in this call.
let base_remaining = maker.stake.0.saturating_sub(maker.matched.0);
let delta = *scratch
.maker_matched_delta
.get(&maker_order_id)
.unwrap_or(&0);
let maker_remaining = base_remaining.saturating_sub(delta);
if maker_remaining <= 0 {
continue;
}
// Fill the smaller of what the taker still wants and what the maker still offers.
let fill_i64 = remaining.0.min(maker_remaining);
let fill_amount = Money(fill_i64);
remaining = Money(remaining.0 - fill_i64);
let maker_remaining_after = Money(maker_remaining.saturating_sub(fill_i64));
let taker_remaining_after = remaining;
*scratch
.maker_matched_delta
.entry(maker_order_id)
.or_insert(0) += fill_i64;
scratch.fills.push(PlannedFill {
maker_order_id,
maker_runner_id: maker.runner_id,
maker_side: maker.info.side,
price: level_price,
fill_amount,
maker_remaining_after,
taker_remaining_after,
});
matched_total = Money(matched_total.0 + fill_i64);
sum_price_qty =
sum_price_qty.saturating_add(level_price.0 as u128 * fill_i64.max(0) as u128);
}
}
// Weighted average price: truncating integer division, clamped to u32::MAX.
let avg_price = if matched_total.0 > 0 {
let denom = matched_total.0 as u128;
let px = (sum_price_qty / denom).min(u32::MAX as u128) as u32;
Some(OddsX10000(px))
} else {
None
};
// Phase 3: emit events. Trade ids are numbered from `self.next_trade_id` using a
// local counter only; `self.next_trade_id` itself is not advanced in this function.
let mut next_trade_id = self.next_trade_id;
for fill in scratch.fills.iter().copied() {
let trade_id = TradeId(next_trade_id);
next_trade_id = next_trade_id.saturating_add(1);
// Maker's view of the trade.
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
trade_id,
order_id: fill.maker_order_id,
runner_id: fill.maker_runner_id,
side: fill.maker_side,
price: fill.price,
stake: fill.fill_amount,
counter_party: r.taker_order_id,
remaining_stake: fill.maker_remaining_after,
matched_delta: fill.fill_amount,
},
));
// Taker's view of the same trade (same trade_id, mirrored counter party).
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
trade_id,
order_id: r.taker_order_id,
runner_id: r.taker_runner_id,
side: r.taker_side,
price: fill.price,
stake: fill.fill_amount,
counter_party: fill.maker_order_id,
remaining_stake: fill.taker_remaining_after,
matched_delta: fill.fill_amount,
},
));
}
(matched_total, avg_price)
}
/// Validates a cancel request and builds the resulting `OrderCancelled` event.
///
/// The book is not mutated here (`&self`); only the event is produced.
///
/// # Errors
/// * `RejectReason::OrderNotFound` — `order_id` is not in the order store.
/// * `RejectReason::NotOrderOwner` — the order belongs to another account.
/// * `RejectReason::OrderNotLive` — the order is neither
///   `ExecutableUnmatched` nor `ExecutablePartiallyMatched`.
fn cmd_cancel_order(
    &self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let target = match self.orders.get(&order_id) {
        Some(found) => found,
        None => return Err(RejectReason::OrderNotFound),
    };
    if target.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    // Only orders that can still execute may be cancelled.
    match target.info.state {
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched => {}
        _ => return Err(RejectReason::OrderNotLive),
    }

    let mut out = EventVec::new();
    let cancelled = BookEvent::OrderCancelled {
        order_id,
        reason: reason.to_string(),
    };
    out.push(self.emit(ts, cancelled));
    Ok((out, None))
}
/// Unlinks `order_id` from its runner's price level, if it is linked.
///
/// Does nothing when the order is not flagged as in a level, is unknown,
/// has no positive remaining stake, has no store key, or its runner has no
/// book.
fn remove_from_levels(&mut self, order_id: OrderId) {
    if !self.orders.is_in_level(&order_id) {
        return;
    }
    // Copy what is needed out of the order so the shared borrow of
    // `self.orders` ends before it is handed to `unlink` mutably.
    let (remaining, runner_id) = match self.orders.get(&order_id) {
        Some(order) => (order.remaining(), order.runner_id),
        None => return,
    };
    if remaining.0 <= 0 {
        return;
    }
    let key = match self.orders.get_key(&order_id) {
        Some(k) => k,
        None => return,
    };
    if let Some(book) = self.runner_books.get_mut(&runner_id) {
        book.unlink(&mut self.orders, key, remaining);
    }
}
/// Replaces a live order: plans placement of `new_order_id` (always
/// `TimeInForce::Gtc`) and cancels `old_order_id` with reason `"REPLACE"`.
///
/// The returned events list the cancellation first, followed by the
/// placement events; the response is the one produced by the placement.
///
/// # Errors
/// `OrderNotFound`, `NotOrderOwner` or `OrderNotLive` when the old order
/// fails validation, or whatever `cmd_place_order` rejects with.
#[allow(clippy::too_many_arguments)]
fn cmd_replace_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    correlation_id: Option<CorrelationId>,
    old_order_id: OrderId,
    new_order_id: OrderId,
    account_id: AccountId,
    runner_id: RunnerId,
    side: Side,
    new_price: OddsX10000,
    new_stake: Money,
    persistence: Persistence,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    // Validate the old order up front so a bad replace is rejected before
    // any placement work is attempted.
    match self.orders.get(&old_order_id) {
        None => return Err(RejectReason::OrderNotFound),
        Some(previous) => {
            if previous.info.account_id != account_id {
                return Err(RejectReason::NotOrderOwner);
            }
            let still_live = matches!(
                previous.info.state,
                BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
            );
            if !still_live {
                return Err(RejectReason::OrderNotLive);
            }
        }
    }

    let (placement_events, response) = self.cmd_place_order(
        scratch,
        ts,
        correlation_id,
        new_order_id,
        account_id,
        runner_id,
        side,
        new_price,
        new_stake,
        persistence,
        TimeInForce::Gtc,
    )?;

    // Cancellation events lead the combined stream.
    let (mut combined, _) = self.cmd_cancel_order(ts, old_order_id, account_id, "REPLACE")?;
    combined.extend(placement_events);
    Ok((combined, response))
}
/// Builds the events for removing `runner_id` from the market.
///
/// In order, the returned events are: an `OrderCancelled` (reason
/// `"RUNNER_REMOVED"`) for every live order on the runner, a `TradeVoided`
/// (same reason) for every `Live` trade on the runner, and a final
/// `RunnerRemoved` carrying `reduction_factor_bps`.
///
/// # Errors
/// * `RejectReason::RunnerNotFound` — the runner is not in `self.runners`.
/// * `RejectReason::RunnerAlreadyRemoved` — it is already in
///   `self.removed_runners`.
/// * Any rejection propagated from `cmd_cancel_order`.
fn cmd_remove_runner(
    &mut self,
    ts: DateTime,
    runner_id: RunnerId,
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if !self.runners.contains(&runner_id) {
        return Err(RejectReason::RunnerNotFound);
    }
    if self.removed_runners.contains(&runner_id) {
        return Err(RejectReason::RunnerAlreadyRemoved);
    }

    // Live orders resting on the removed runner, in the store's sorted order.
    let live_orders: Vec<(OrderId, AccountId)> = self
        .orders
        .iter_sorted()
        .filter(|(_, order)| {
            order.runner_id == runner_id
                && matches!(
                    order.info.state,
                    BookOrderState::ExecutableUnmatched
                        | BookOrderState::ExecutablePartiallyMatched
                )
        })
        .map(|(oid, order)| (oid, order.info.account_id))
        .collect();

    let mut out = EventVec::new();
    for (oid, owner) in live_orders {
        let (cancelled, _) = self.cmd_cancel_order(ts, oid, owner, "RUNNER_REMOVED")?;
        out.extend(cancelled);
    }

    // Live trades already struck on the removed runner.
    let live_trades: Vec<TradeId> = self
        .trades
        .iter()
        .filter(|(_, trade)| trade.runner_id == runner_id && trade.state == BookTradeState::Live)
        .map(|(&tid, _)| tid)
        .collect();
    for trade_id in live_trades {
        let voided = BookEvent::TradeVoided {
            trade_id,
            reason: "RUNNER_REMOVED".to_string(),
        };
        out.push(self.emit(ts, voided));
    }

    let removed = BookEvent::RunnerRemoved {
        runner_id,
        reduction_factor_bps,
    };
    out.push(self.emit(ts, removed));
    Ok((out, None))
}
/// Settles a closed market with one result per active runner.
///
/// Emits `MarketSettled` and then transitions the book to
/// `BookMarketState::Settled` via `state_change` (reason `"SETTLED"`).
///
/// # Errors
/// * `RejectReason::MarketNotClosed` — the market is not in `Closed` state.
/// * `RejectReason::DuplicateRunner` — a runner appears more than once in
///   `runner_results`.
/// * `RejectReason::IncompleteResults` — the set of runners with results
///   differs from the set of runners that have not been removed.
fn cmd_settle_market(
    &mut self,
    ts: DateTime,
    runner_results: &[(RunnerId, RunnerResult)],
    dead_heat_divisor: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state != BookMarketState::Closed {
        return Err(RejectReason::MarketNotClosed);
    }

    // `insert` returns false on a repeat, which is exactly a duplicate result.
    let mut reported: BTreeSet<RunnerId> = BTreeSet::new();
    for (runner, _) in runner_results {
        if !reported.insert(*runner) {
            return Err(RejectReason::DuplicateRunner);
        }
    }

    // Results must cover exactly the runners that are still active.
    let still_active: BTreeSet<_> = self
        .runners
        .iter()
        .filter(|candidate| !self.removed_runners.contains(candidate))
        .copied()
        .collect();
    if reported != still_active {
        return Err(RejectReason::IncompleteResults);
    }

    let mut out = EventVec::new();
    let settled = BookEvent::MarketSettled {
        runner_results: runner_results.to_vec(),
        dead_heat_divisor,
    };
    out.push(self.emit(ts, settled));
    self.state_change(ts, BookMarketState::Settled, "SETTLED", &mut out);
    Ok((out, None))
}
/// Builds `TradeVoided` events for every `Live` trade matched at or after
/// `from_inclusive`.
///
/// Events are ordered by match time, then by trade id.
///
/// # Errors
/// `RejectReason::MarketTerminal` when the market is `Voided` or `Settled`.
fn cmd_void_trades_from_time(
    &mut self,
    ts: DateTime,
    from_inclusive: DateTime,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if matches!(
        self.state,
        BookMarketState::Voided | BookMarketState::Settled
    ) {
        return Err(RejectReason::MarketTerminal);
    }

    let mut selected: Vec<(DateTime, TradeId)> = self
        .trades
        .iter()
        .filter(|(_, trade)| {
            trade.state == BookTradeState::Live && trade.matched_at >= from_inclusive
        })
        .map(|(&tid, trade)| (trade.matched_at, tid))
        .collect();
    // Tuple ordering is lexicographic: match time first, trade id as tie-break.
    selected.sort();

    let mut out = EventVec::new();
    for (_, trade_id) in selected {
        let voided = BookEvent::TradeVoided {
            trade_id,
            reason: reason.to_string(),
        };
        out.push(self.emit(ts, voided));
    }
    Ok((out, None))
}
fn cmd_void_trade_ids(
&mut self,
ts: DateTime,
trade_ids: &[TradeId],
reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
if self.state == BookMarketState::Voided || self.state == BookMarketState::Settled {
return Err(RejectReason::MarketTerminal);
}
let mut events = EventVec::new();
let mut trade_ids: Vec<TradeId> = trade_ids.to_vec();
trade_ids.sort();
trade_ids.dedup();
for tid in trade_ids {
let Some(t) = self.trades.get(&tid) else {
continue;
};
if t.state == BookTradeState::Voided {
continue;
}
events.push(self.emit(
ts,
BookEvent::TradeVoided {
trade_id: tid,
reason: reason.to_string(),
},
));
}
Ok((events, None))
}
}