use super::error::{BookError, RequestedBookState};
use super::protocol::{
command::{
Command, CommandKind, MarketState, Persistence, ReduceOrderCondition, ReduceOrderTarget,
RunnerChange, Side, TimeInForce,
},
reject::RejectReason,
};
use crate::book::common::types::BookOrderInfo;
use crate::types::*;
use chrono::Utc;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use tracing::error;
use super::common::fast::{OrderKey, OrderStore, RunnerBook};
use super::common::*;
use serde::{Deserialize, Serialize};
/// Tick index used as part of a ladder-level key in this file.
type TickIndex = u16;
/// Key for the two orders of one trade; `trade_pair_key` builds it with the
/// smaller order id first so both sides of the trade map to the same key.
type TradePairKey = (OrderId, OrderId);
/// Identifies one ladder level: runner, side and tick.
type RunnerLevelKey = (RunnerId, Side, TickIndex);
/// One resting order as collected by `rebuild_ladders`:
/// `(created_at, order id, store key, remaining stake)`.
type RestingLevelOrder = (DateTime, OrderId, OrderKey, Money);
/// Resting orders grouped per ladder level while ladders are being rebuilt.
type RestingOrdersByLevel = HashMap<RunnerLevelKey, Vec<RestingLevelOrder>>;
/// Reusable working buffers owned by the book.
///
/// `handle_command` moves this value out of the book with `std::mem::take`,
/// lends it to the place/reduce order handlers and stores it back afterwards.
/// `Clone` and `Deserialize` for the book always start from `Scratch::default()`.
///
/// NOTE(review): the code that fills and drains the individual buffers is
/// outside this chunk; field purposes below are inferred from names only.
#[derive(Debug, Default)]
struct Scratch {
// presumably per-maker matched amounts accumulated during matching - confirm
maker_matched_delta: HashMap<OrderId, i64>,
// presumably the ticks eligible for matching against the taker - confirm
matchable_ticks: Vec<TickIndex>,
// presumably the fills planned for the current command - confirm
fills: Vec<PlannedFill>,
// presumably the events buffered for the current command - confirm
events: EventVec,
}
/// Description of one fill between a taker and a maker order.
///
/// NOTE(review): producers and consumers of this type are outside this chunk;
/// the field notes below are inferred from names - confirm against the matcher.
#[derive(Debug, Clone)]
struct PlannedFill {
maker_order_id: OrderId,
maker_account_id: AccountId,
maker_correlation_id: Option<CorrelationId>,
maker_runner_id: RunnerId,
maker_side: Side,
// presumably the price the fill executes at - confirm
price: OddsX10000,
// presumably the stake matched by this fill - confirm
fill_amount: Money,
// presumably the stake left on each side once this fill is applied - confirm
maker_remaining_after: Money,
taker_remaining_after: Money,
}
/// The subset of a `BookEvent::TradeMatched` event that `trade_matched` needs.
///
/// `apply_event` builds it by copying the corresponding event fields.
#[derive(Debug, Clone)]
struct TradeMatchView {
// Order this side of the trade belongs to.
order_id: OrderId,
// Maker or taker role of `order_id` in the trade.
role: TradeRole,
runner_id: RunnerId,
// The order on the other side of the trade.
counter_party: OrderId,
// Stake left on `order_id`; zero makes `trade_matched` complete and remove the order.
remaining_stake: Money,
// Amount added to the order's matched total by this trade.
matched_delta: Money,
}
/// First-seen side of a trade, parked in `pending_trade_views` by
/// `trade_matched` until the event for the counterparty order arrives.
#[derive(Debug, Clone, Copy)]
struct PendingTradeView {
order_id: OrderId,
// Used to tell maker from taker once both sides are known.
role: TradeRole,
// Runner whose matched volume is credited when the pair completes.
runner_id: RunnerId,
matched_delta: Money,
}
/// Taker-side parameters of a matching attempt.
///
/// NOTE(review): this type is not used anywhere in the visible chunk; the
/// description is inferred from the field names - confirm against the matcher.
#[derive(Debug, Clone)]
struct MatchRequest {
ts: DateTime,
taker_account_id: AccountId,
taker_correlation_id: Option<CorrelationId>,
taker_side: Side,
taker_runner_id: RunnerId,
taker_price: OddsX10000,
taker_stake: Money,
taker_order_id: OrderId,
}
/// Serde representation of [`MultiRunnerBook`].
///
/// The `Serialize` impl of the book fills this struct from the live book and
/// the `Deserialize` impl converts it back. Derived state (`runner_books`,
/// `active_order_count_cache`, `pending_trade_views`, `scratch`) is not part of
/// the snapshot; deserialization recreates it.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MultiRunnerBookSnapshot {
market_id: MarketId,
market_name: String,
market_kind: MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
next_order_id: u64,
close_batch_max_events: u16,
runners: BTreeSet<RunnerId>,
runner_labels: HashMap<RunnerId, String>,
allow_unknown_runners: bool,
removed_runners: BTreeSet<RunnerId>,
batch_process: Option<BatchProcessState>,
// A snapshot without this field deserializes with an empty queue.
#[serde(default)]
queued_batches: Vec<BatchProcessDescriptor>,
orders: OrderStore,
runner_matched_volume: BTreeMap<RunnerId, Money>,
}
/// Order book for a market with several runners.
///
/// Commands are turned into events by `handle_command`; state is mutated when
/// those events are fed back through `apply_event`. Serialization goes through
/// `MultiRunnerBookSnapshot`.
#[derive(Debug)]
pub struct MultiRunnerBook {
market_id: MarketId,
pub(crate) market_name: String,
market_kind: MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
// Id handed to the next placed order; advanced when `OrderAccepted` is applied.
next_order_id: u64,
// Batch size limit passed to the batch-start helpers.
close_batch_max_events: u16,
// All runners ever added, including removed ones (see `removed_runners`).
runners: BTreeSet<RunnerId>,
runner_labels: HashMap<RunnerId, String>,
// NOTE(review): not read anywhere in the visible chunk - confirm its use.
allow_unknown_runners: bool,
// Runners flagged as removed; their orders are kept out of the ladders.
removed_runners: BTreeSet<RunnerId>,
// Per-runner price ladders; derived from `orders` by `rebuild_ladders`.
runner_books: HashMap<RunnerId, RunnerBook>,
batch_process: Option<BatchProcessState>,
queued_batches: Vec<BatchProcessDescriptor>,
orders: OrderStore,
// Number of orders in an executable state, maintained incrementally.
active_order_count_cache: usize,
runner_matched_volume: BTreeMap<RunnerId, Money>,
// First-seen halves of trades waiting for the counterparty event.
pending_trade_views: HashMap<TradePairKey, PendingTradeView>,
// Reusable buffers lent to command handlers.
scratch: Scratch,
}
impl Serialize for MultiRunnerBook {
    /// Serializes the book by copying its persistent fields into a
    /// `MultiRunnerBookSnapshot` and serializing that snapshot.
    ///
    /// Ladders, the live-order counter, pending trade views and scratch
    /// buffers are not written.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let snapshot = MultiRunnerBookSnapshot {
            market_id: self.market_id,
            market_name: self.market_name.clone(),
            market_kind: self.market_kind,
            state: self.state,
            market_phase: self.market_phase,
            next_order_id: self.next_order_id,
            close_batch_max_events: self.close_batch_max_events,
            runners: self.runners.clone(),
            runner_labels: self.runner_labels.clone(),
            allow_unknown_runners: self.allow_unknown_runners,
            removed_runners: self.removed_runners.clone(),
            batch_process: self.batch_process.clone(),
            queued_batches: self.queued_batches.clone(),
            orders: self.orders.clone(),
            runner_matched_volume: self.runner_matched_volume.clone(),
        };
        snapshot.serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for MultiRunnerBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = MultiRunnerBookSnapshot::deserialize(deserializer)?;
let mut book = Self {
market_id: snap.market_id,
market_name: snap.market_name,
market_kind: snap.market_kind,
state: snap.state,
market_phase: snap.market_phase,
next_order_id: snap.next_order_id,
close_batch_max_events: snap.close_batch_max_events,
runners: snap.runners,
runner_labels: snap.runner_labels.into_iter().collect(),
allow_unknown_runners: snap.allow_unknown_runners,
removed_runners: snap.removed_runners,
runner_books: HashMap::new(),
batch_process: snap.batch_process,
queued_batches: snap.queued_batches,
orders: snap.orders,
active_order_count_cache: 0,
runner_matched_volume: snap.runner_matched_volume,
pending_trade_views: HashMap::new(),
scratch: Scratch::default(),
};
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}
impl Clone for MultiRunnerBook {
    /// Clones the book's persistent and derived state.
    ///
    /// The clone starts with no pending trade views and fresh scratch
    /// buffers; those two fields are deliberately not copied.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct forces this
        // impl to decide how to clone it.
        let Self {
            market_id,
            market_name,
            market_kind,
            state,
            market_phase,
            next_order_id,
            close_batch_max_events,
            runners,
            runner_labels,
            allow_unknown_runners,
            removed_runners,
            runner_books,
            batch_process,
            queued_batches,
            orders,
            active_order_count_cache,
            runner_matched_volume,
            pending_trade_views: _,
            scratch: _,
        } = self;

        Self {
            market_id: *market_id,
            market_name: market_name.clone(),
            market_kind: *market_kind,
            state: *state,
            market_phase: *market_phase,
            next_order_id: *next_order_id,
            close_batch_max_events: *close_batch_max_events,
            runners: runners.clone(),
            runner_labels: runner_labels.clone(),
            allow_unknown_runners: *allow_unknown_runners,
            removed_runners: removed_runners.clone(),
            runner_books: runner_books.clone(),
            batch_process: batch_process.clone(),
            queued_batches: queued_batches.clone(),
            orders: orders.clone(),
            active_order_count_cache: *active_order_count_cache,
            runner_matched_volume: runner_matched_volume.clone(),
            pending_trade_views: HashMap::new(),
            scratch: Scratch::default(),
        }
    }
}
impl MultiRunnerBook {
/// Creates an open book for `runner_ids` with an order-store capacity of 20,000.
///
/// Delegates to `new_with_capacity`.
pub fn new(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_ids: impl IntoIterator<Item = RunnerId>,
) -> Self {
    const DEFAULT_ORDER_STORE_CAPACITY: usize = 20_000;
    Self::new_with_capacity(
        market_id,
        market_kind,
        market_phase,
        runner_ids,
        DEFAULT_ORDER_STORE_CAPACITY,
    )
}
/// Creates an open book for `runner_ids` whose order store is created with
/// `order_store_capacity`.
///
/// Each runner gets an empty ladder and a default label equal to its id
/// rendered with `to_string`. The market name starts empty and order ids
/// start at 1.
///
/// `assert_kind_supports_phase` runs first; presumably it panics when
/// `market_kind` does not support `market_phase` - confirm.
pub fn new_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_ids: impl IntoIterator<Item = RunnerId>,
    order_store_capacity: usize,
) -> Self {
    assert_kind_supports_phase(market_kind, market_phase);

    let runners: BTreeSet<RunnerId> = runner_ids.into_iter().collect();
    let mut runner_labels: HashMap<RunnerId, String> = HashMap::with_capacity(runners.len());
    let mut runner_books: HashMap<RunnerId, RunnerBook> = HashMap::with_capacity(runners.len());
    for &runner_id in &runners {
        runner_labels.insert(runner_id, runner_id.to_string());
        runner_books.insert(runner_id, RunnerBook::new());
    }

    Self {
        market_id,
        market_name: String::new(),
        market_kind,
        market_phase,
        state: BookMarketState::Open,
        next_order_id: 1,
        close_batch_max_events: crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS,
        runners,
        runner_labels,
        allow_unknown_runners: false,
        removed_runners: BTreeSet::new(),
        runner_books,
        batch_process: None,
        queued_batches: Vec::new(),
        orders: OrderStore::with_capacity(order_store_capacity),
        active_order_count_cache: 0,
        runner_matched_volume: BTreeMap::new(),
        pending_trade_views: HashMap::new(),
        scratch: Scratch::default(),
    }
}
/// Creates a book without any runners and an order-store capacity of 20,000.
pub fn new_engine(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
) -> Self {
    const DEFAULT_ORDER_STORE_CAPACITY: usize = 20_000;
    Self::new_engine_with_capacity(
        market_id,
        market_kind,
        market_phase,
        DEFAULT_ORDER_STORE_CAPACITY,
    )
}
/// Creates a book without any runners whose order store is created with
/// `order_store_capacity`.
pub fn new_engine_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    order_store_capacity: usize,
) -> Self {
    let no_runners = std::iter::empty::<RunnerId>();
    Self::new_with_capacity(
        market_id,
        market_kind,
        market_phase,
        no_runners,
        order_store_capacity,
    )
}
/// Updates the batch size limit by delegating to
/// `close_process::set_close_batch_max_events`, which receives the current
/// value by mutable reference together with the requested one.
// NOTE(review): any clamping or validation happens inside that helper, which
// is not visible here.
pub(crate) fn set_close_batch_max_events(&mut self, batch_max_events: u16) {
crate::book::close_process::set_close_batch_max_events(
&mut self.close_batch_max_events,
batch_max_events,
);
}
/// Overwrites the market state directly, without emitting an event and
/// without any transition check.
pub(crate) fn set_market_state(&mut self, state: BookMarketState) {
self.state = state;
}
/// Returns the currently configured batch size limit.
pub(crate) fn close_batch_max_events(&self) -> u16 {
self.close_batch_max_events
}
/// Returns the id of the market this book belongs to.
pub fn market_id(&self) -> MarketId {
self.market_id
}
/// Returns the market kind the book was created with.
pub fn market_kind(&self) -> MarketKind {
self.market_kind
}
/// Returns the current market state.
pub fn market_state(&self) -> BookMarketState {
self.state
}
/// Returns the current market phase.
pub fn market_phase(&self) -> MarketPhase {
self.market_phase
}
/// Returns whether the current market state reports itself as halted.
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
/// Looks up an order in the order store; `None` when the id is not stored.
pub fn get_order(&self, order_id: OrderId) -> Option<&BookOrder> {
self.orders.get(&order_id)
}
/// Returns whether the order store reports the order as linked into a level.
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders.is_in_level(&order_id)
}
/// Returns the cached number of live orders.
///
/// The cache is maintained by `order_accepted`, `trade_matched`,
/// `order_cancelled` and `rebuild_ladders`; nothing is recounted here.
pub fn active_order_count(&self) -> usize {
self.active_order_count_cache
}
/// Collects the next chunk of orders to cancel.
///
/// Walks the order store starting from `cursor_after` and hands the
/// iteration, the `max_orders` limit and the `should_cancel` predicate to
/// `collect_cancelled_orders_chunk`, projecting each order to its `info`.
fn select_cancelled_chunk(
    &self,
    cursor_after: Option<OrderId>,
    max_orders: usize,
    should_cancel: impl FnMut(&BookOrder) -> bool,
) -> CancelledOrdersChunk {
    let remaining_orders = self.orders.iter_sorted_from(cursor_after);
    collect_cancelled_orders_chunk(remaining_orders, max_orders, should_cancel, |order| {
        &order.info
    })
}
/// Returns the active batch process, if one is running.
pub fn batch_process_state(&self) -> Option<&BatchProcessState> {
self.batch_process.as_ref()
}
/// Iterates over all runner ids known to the book, in ascending order.
///
/// Runners flagged as removed stay in this set and are included.
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
self.runners.iter().copied()
}
/// Returns whether a batch process is currently active.
fn has_active_batch_process(&self) -> bool {
self.batch_process.is_some()
}
/// Builds the context attached to a batch process of the given mode.
///
/// Close batches carry the number of live orders as counted by
/// `close_process::count_live_orders`; every other mode gets
/// `BatchProcessContext::None`.
fn batch_context_for_mode(&self, batch_mode: BatchMode) -> BatchProcessContext {
    if matches!(batch_mode, BatchMode::Close) {
        let total_live_orders =
            crate::book::close_process::count_live_orders(self.orders.iter_sorted());
        BatchProcessContext::Close { total_live_orders }
    } else {
        BatchProcessContext::None
    }
}
/// Selects the next chunk of orders to cancel for a batch `target`.
///
/// Only live orders (per `close_process::is_live_order`) are considered:
/// - `AllLiveOrders`: every live order.
/// - `LapseOrders`: live orders with `Persistence::Lapse` whose runner is not
///   flagged as removed.
/// - `RunnerRemoval`: live orders on one of the listed runners.
/// - `Filtered`: live orders accepted by `matches_filtered_batch_target`,
///   which also receives whether the order passes the optional runner filter.
fn select_cancelled_chunk_for_target(
    &self,
    cursor_after: Option<OrderId>,
    batch_max_events: usize,
    target: &BatchProcessTarget,
) -> CancelledOrdersChunk {
    use crate::book::close_process::is_live_order;

    match target {
        BatchProcessTarget::AllLiveOrders => {
            self.select_cancelled_chunk(cursor_after, batch_max_events, is_live_order)
        }
        BatchProcessTarget::LapseOrders => {
            self.select_cancelled_chunk(cursor_after, batch_max_events, |candidate| {
                is_live_order(candidate)
                    && candidate.persistence == Persistence::Lapse
                    && !self.removed_runners.contains(&candidate.runner_id)
            })
        }
        BatchProcessTarget::RunnerRemoval { runner_ids } => {
            self.select_cancelled_chunk(cursor_after, batch_max_events, |candidate| {
                is_live_order(candidate) && runner_ids.contains(&candidate.runner_id)
            })
        }
        BatchProcessTarget::Filtered { runner_filter, .. } => {
            self.select_cancelled_chunk(cursor_after, batch_max_events, |candidate| {
                is_live_order(candidate) && {
                    let runner_matches =
                        runner_filter.is_none_or(|wanted| candidate.runner_id == wanted);
                    matches_filtered_batch_target(target, &candidate.info, runner_matches)
                }
            })
        }
    }
}
/// Replaces all runner labels.
///
/// Every known runner is first reset to its default label (its id rendered
/// with `to_string`); the supplied `(id, label)` pairs are then applied on top.
///
/// # Panics
///
/// Panics when the two slices differ in length or when a supplied runner id
/// is not part of this market.
pub(crate) fn set_runner_labels(&mut self, runner_ids: &[RunnerId], runner_labels: &[String]) {
    assert_eq!(
        runner_ids.len(),
        runner_labels.len(),
        "multi-runner label map length mismatch"
    );

    self.runner_labels.clear();
    self.runner_labels
        .extend(self.runners.iter().map(|id| (*id, id.to_string())));

    for (runner_id, custom_label) in runner_ids.iter().zip(runner_labels) {
        assert!(
            self.runners.contains(runner_id),
            "unknown runner id {:?} for multi-runner market",
            runner_id
        );
        self.runner_labels.insert(*runner_id, custom_label.clone());
    }
}
/// Validates a parallel list of runner ids and labels.
///
/// # Errors
///
/// Returns `RejectReason::InvalidMarketConfig` when the slices differ in
/// length, when a label is empty after trimming, or when an id or a label
/// occurs more than once.
pub(crate) fn validate_runner_group(
    runner_ids: &[RunnerId],
    runner_labels: &[String],
) -> Result<(), RejectReason> {
    let length_mismatch = runner_ids.len() != runner_labels.len();
    let invalid = length_mismatch
        || runner_labels.iter().any(|label| label.trim().is_empty())
        || {
            // `insert` returns false on a repeat, so `all` stops at the first duplicate.
            let mut seen_ids = HashSet::with_capacity(runner_ids.len());
            !runner_ids.iter().all(|id| seen_ids.insert(*id))
        }
        || {
            let mut seen_labels = HashSet::with_capacity(runner_labels.len());
            !runner_labels
                .iter()
                .all(|label| seen_labels.insert(label.as_str()))
        };

    if invalid {
        Err(RejectReason::InvalidMarketConfig)
    } else {
        Ok(())
    }
}
/// Validates a list of runner changes.
///
/// # Errors
///
/// Returns `RejectReason::InvalidMarketConfig` when a label is empty after
/// trimming, or when a runner id or a label occurs more than once.
fn validate_runner_changes(changes: &[RunnerChange]) -> Result<(), RejectReason> {
    let has_blank_label = changes
        .iter()
        .any(|change| change.runner_label.trim().is_empty());
    let invalid = has_blank_label
        || {
            // `insert` returns false on a repeat, so `all` stops at the first duplicate.
            let mut seen_ids = HashSet::with_capacity(changes.len());
            !changes.iter().all(|change| seen_ids.insert(change.runner_id))
        }
        || {
            let mut seen_labels = HashSet::with_capacity(changes.len());
            !changes
                .iter()
                .all(|change| seen_labels.insert(change.runner_label.as_str()))
        };

    if invalid {
        Err(RejectReason::InvalidMarketConfig)
    } else {
        Ok(())
    }
}
/// Works out which of the requested runners actually have to be added.
///
/// For each `(id, label)` pair:
/// - a runner already in the market must carry exactly the same label; it is
///   scheduled for (re-)addition only when it is flagged as removed,
///   otherwise it is skipped;
/// - a runner not yet in the market is scheduled, provided no existing
///   runner already uses the label.
///
/// Returns the scheduled ids and labels as two parallel vectors.
///
/// # Errors
///
/// Returns `RejectReason::InvalidMarketConfig` on a label mismatch, a missing
/// label for a known runner, or a label that is already taken.
fn plan_runner_additions<'a>(
    &self,
    additions: impl IntoIterator<Item = (RunnerId, &'a str)>,
) -> Result<(Vec<RunnerId>, Vec<String>), RejectReason> {
    let mut planned_ids = Vec::new();
    let mut planned_labels = Vec::new();

    for (runner_id, runner_label) in additions {
        let should_add = if self.runners.contains(&runner_id) {
            match self.runner_labels.get(&runner_id) {
                Some(existing) if existing.as_str() == runner_label => {}
                _ => return Err(RejectReason::InvalidMarketConfig),
            }
            // A known runner only needs an event when it is being revived.
            self.removed_runners.contains(&runner_id)
        } else {
            let label_taken = self
                .runner_labels
                .values()
                .any(|label| label.as_str() == runner_label);
            if label_taken {
                return Err(RejectReason::InvalidMarketConfig);
            }
            true
        };

        if should_add {
            planned_ids.push(runner_id);
            planned_labels.push(runner_label.to_string());
        }
    }

    Ok((planned_ids, planned_labels))
}
/// Emits a `RunnersRemoved` event and then starts, or retargets, a
/// `RunnerRemovalCancel` batch aimed at the removed runners' orders.
///
/// All produced events are appended to `events`; the book itself is not
/// modified here.
fn emit_runners_removed_and_start_batch(
    &self,
    ts: DateTime,
    runner_ids: Vec<RunnerId>,
    runner_labels: Vec<String>,
    reduction_factor_bps: Option<u32>,
    events: &mut EventVec,
) {
    // The ids are needed twice: once for the batch target, once for the event.
    let target = BatchProcessTarget::RunnerRemoval {
        runner_ids: runner_ids.clone(),
    };
    let removal = BookEvent::RunnersRemoved {
        runner_ids,
        runner_labels,
        reduction_factor_bps,
    };
    events.push(self.emit(ts, removal));

    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::RunnerRemovalCancel,
        self.close_batch_max_events,
        &target,
        None,
        events,
        |event| self.emit(ts, event),
        |cursor_after, max_orders, target| {
            self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
        },
    );
}
/// Applies a `RunnersAdded` event.
///
/// Each `(id, label)` pair registers the runner, makes sure it has a ladder,
/// stores the label and clears the runner's removed flag. Pairs beyond the
/// shorter of the two slices are ignored.
fn apply_runners_added(&mut self, runner_ids: &[RunnerId], runner_labels: &[String]) {
    for (&runner_id, label) in runner_ids.iter().zip(runner_labels) {
        self.runners.insert(runner_id);
        self.runner_books.entry(runner_id).or_default();
        self.runner_labels.insert(runner_id, label.clone());
        self.removed_runners.remove(&runner_id);
    }
}
/// Applies a runner-removal event: flags the runners as removed and takes
/// their live orders out of the ladders.
///
/// # Panics
///
/// Panics when one of the ids is not a runner of this market.
fn apply_runners_removed(&mut self, runner_ids: &[RunnerId]) {
    for &runner_id in runner_ids {
        assert!(
            self.runners.contains(&runner_id),
            "runner removed event references unknown runner {:?}",
            runner_id
        );
        self.removed_runners.insert(runner_id);
    }
    self.unlink_live_orders_for_runners(runner_ids);
}
/// Removes every live order of the given runners from its ladder level.
///
/// An order counts as live while it is `ExecutableUnmatched` or
/// `ExecutablePartiallyMatched`. The orders stay in the order store; only
/// `remove_from_levels` is called for them.
fn unlink_live_orders_for_runners(&mut self, runner_ids: &[RunnerId]) {
    let affected: HashSet<_> = runner_ids.iter().copied().collect();

    // Collect the ids first so the store is not borrowed while levels change.
    let to_unlink: Vec<OrderId> = self
        .orders
        .iter_sorted()
        .filter(|(_, order)| {
            matches!(
                order.info.state,
                BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
            ) && affected.contains(&order.runner_id)
        })
        .map(|(order_id, _)| order_id)
        .collect();

    for order_id in to_unlink {
        self.remove_from_levels(order_id);
    }
}
/// Returns the label stored for `runner_id`.
///
/// # Panics
///
/// Panics when no label is stored for the runner.
pub fn runner_label(&self, runner_id: RunnerId) -> &str {
    match self.runner_labels.get(&runner_id) {
        Some(label) => label.as_str(),
        None => panic!("unknown runner id {:?} for multi-runner market", runner_id),
    }
}
/// Recreates all ladders and the live-order counter from the order store.
///
/// Every known runner gets a fresh, empty ladder. Each live order
/// (`ExecutableUnmatched` or `ExecutablePartiallyMatched`) is counted; it is
/// additionally queued on its level unless its runner is flagged as removed
/// or it has no positive remaining stake. Within a level, orders are
/// inserted by creation time and then by order id. Orders whose runner has
/// no ladder are skipped.
fn rebuild_ladders(&mut self) {
    self.runner_books = self
        .runners
        .iter()
        .map(|&rid| (rid, RunnerBook::new()))
        .collect();

    let mut live_orders: usize = 0;
    let mut resting: RestingOrdersByLevel = HashMap::new();
    for (oid, key, order) in self.orders.iter_keys_sorted() {
        if !matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        ) {
            continue;
        }
        // Counted even when the order is not placed on a ladder below.
        live_orders = live_orders.saturating_add(1);
        if self.removed_runners.contains(&order.runner_id) {
            continue;
        }
        let remaining = order.remaining();
        if remaining.0 <= 0 {
            continue;
        }
        // NOTE(review): `as` truncates silently if the stored tick is wider than u16.
        let tick: TickIndex = self.orders.stored_tick(key) as u16;
        let level = (order.runner_id, order.info.side, tick);
        resting
            .entry(level)
            .or_default()
            .push((order.info.created_at, oid, key, remaining));
    }
    self.active_order_count_cache = live_orders;

    for ((runner_id, _, _), mut queue) in resting {
        queue.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
        if let Some(ladder) = self.runner_books.get_mut(&runner_id) {
            for (_, _, key, remaining) in queue {
                ladder.insert_tail(&mut self.orders, key, remaining);
            }
        }
    }
}
/// Returns the available back and lay prices for `runner_id`.
///
/// `depth` is forwarded to the runner's ladder. A runner without a ladder
/// yields a `RunnerPrices` with two empty lists.
///
/// The fallback is built only when it is needed and holds no preallocated
/// capacity, so a lookup for an existing runner performs no extra
/// allocation and cannot fail because of a very large `depth`.
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
    match self.runner_books.get(&runner_id) {
        Some(ladder) => ladder.runner_prices(runner_id, depth),
        None => RunnerPrices {
            runner_id,
            available_to_back: Vec::new(),
            available_to_lay: Vec::new(),
        },
    }
}
/// Returns the first entry of the runner's available-to-back list at depth 1,
/// or `None` when that list is empty.
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_back
        .into_iter()
        .next()
}
/// Returns the first entry of the runner's available-to-lay list at depth 1,
/// or `None` when that list is empty.
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_lay
        .into_iter()
        .next()
}
/// Returns the matched volume recorded for `runner_id`, or zero when nothing
/// has been recorded for it.
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
    match self.runner_matched_volume.get(&runner_id) {
        Some(volume) => *volume,
        None => Money::zero(),
    }
}
/// Returns the sum of the matched volume recorded for all runners.
pub fn total_matched(&self) -> Money {
    let total = self
        .runner_matched_volume
        .values()
        .fold(0, |sum, volume| sum + volume.0);
    Money(total)
}
/// Validates a command and translates it into events plus a response.
///
/// The command handlers called from here return events; nothing in this
/// function applies them to the book (`apply_event` does that).
///
/// Flow:
/// 1. Reject commands addressed to another market.
/// 2. Handle `HaltMarket` / `ResumeMarket` immediately, before the gate.
/// 3. Run `validate_book_gate` with the active batch process and the halted flag.
/// 4. Dispatch on the command kind with the scratch buffers moved out of `self`.
///
/// # Errors
///
/// Returns a `BookError` carrying the command's correlation id. State and
/// phase transition failures use `BookError::state_error`, which additionally
/// records the current state, the current phase and the requested state.
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
// Snapshot state and phase so the error closure does not borrow `self`.
let current_state = self.state;
let current_phase = self.market_phase;
let state_err = |reason, requested_state| {
BookError::state_error(
err_correlation_id.clone(),
reason,
current_state,
current_phase,
requested_state,
)
};
// Wall-clock time passed to the command handlers as the event timestamp.
let ts = Utc::now();
let market_id = cmd.market_id;
if market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
// Halt and resume return here, so the gate below never sees them.
match &cmd.kind {
CommandKind::HaltMarket { reason, .. } => {
let events = self
.cmd_halt_market(ts, reason)
.map_err(|reason| state_err(reason, RequestedBookState::Halted))?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id,
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id,
kind: None,
},
));
}
_ => {}
}
// Every other command is gated on the batch process and the halted flag.
cmd.kind
.validate_book_gate(self.batch_process_state(), self.state.is_halted())
.map_err(&err)?;
// Move the scratch buffers out so handlers can take `&mut self` and
// `&mut scratch` at the same time; they are stored back after the closure,
// on success and on error alike.
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let (events, kind) = match &cmd.kind {
CommandKind::CreateMarket { .. } => {
return Err(err(RejectReason::InternalError));
}
CommandKind::AddRunners {
runner_ids,
runner_labels,
} => self
.cmd_add_runners(ts, runner_ids, runner_labels)
.map_err(&err)?,
CommandKind::ChangeRunners { add, remove } => {
self.cmd_change_runners(ts, add, remove).map_err(&err)?
}
CommandKind::SetMarketState { state, .. } => {
let reason = "SET_MARKET_STATE";
match state {
MarketState::Open => {
self.cmd_open_market(ts, reason).map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?
}
MarketState::Suspended => {
self.cmd_suspend(ts, reason).map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?
}
MarketState::Deactivated => {
self.cmd_deactivate(ts, reason).map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?
}
// Passes the same text as `reason`, spelled out as a literal.
MarketState::Closed => self
.cmd_close_market("SET_MARKET_STATE", ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
}
}
CommandKind::AwaitLiveMarket => self
.cmd_await_live_market(ts, "AWAIT_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::AwaitLive))?,
CommandKind::ReturnToPreMarket { reason } => self
.cmd_return_to_pre_market(ts, reason.as_str())
.map_err(|reason| state_err(reason, RequestedBookState::Pre))?,
CommandKind::GoLiveMarket => self
.cmd_go_live_market(ts, "GO_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::Live))?,
CommandKind::CloseMarket { reason } => self
.cmd_close_market(reason, ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(MarketState::Closed))
})?,
CommandKind::ContinueBatchProcess => {
self.cmd_continue_batch_process(ts).map_err(&err)?
}
CommandKind::BatchCancelOrders {
from_created_at_inclusive,
to_created_at_inclusive,
account_id,
runner_id,
reason,
} => self
.cmd_batch_cancel_orders(
ts,
*from_created_at_inclusive,
*to_created_at_inclusive,
account_id.clone(),
*runner_id,
reason.as_str(),
)
.map_err(&err)?,
CommandKind::PlaceOrder {
account_id,
runner_id,
side,
odds,
stake,
persistence,
time_in_force,
..
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// The id is only read here; `next_order_id` advances when the
// resulting `OrderAccepted` event is applied.
let order_id = OrderId(self.next_order_id);
self.cmd_place_order(
&mut scratch,
ts,
correlation_id.clone(),
order_id,
account_id.clone(),
*runner_id,
*side,
*odds,
*stake,
*persistence,
*time_in_force,
)
.map_err(&err)?
}
CommandKind::ReduceOrder {
order_id,
account_id,
new_odds,
target,
condition,
..
} => {
// Reducing is gated by the same check as placing a new order.
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// NOTE(review): the next order id is passed along, presumably for a
// replacement order created by the reduce - confirm in cmd_reduce_order.
self.cmd_reduce_order(
&mut scratch,
ts,
correlation_id.clone(),
*order_id,
OrderId(self.next_order_id),
account_id.clone(),
*new_odds,
*target,
condition.as_ref(),
)
.map_err(&err)?
}
CommandKind::CancelOrder {
order_id,
account_id,
..
} => self
.cmd_cancel_order(ts, *order_id, account_id.clone(), CancelCause::UserCancel)
.map_err(&err)?,
// Binary-market commands are rejected by this book.
CommandKind::PlaceBinaryOrder { .. } | CommandKind::ReduceBinaryOrder { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::RemoveRunner {
runner_id,
reduction_factor_bps,
..
} => self
.cmd_remove_runner(ts, *runner_id, *reduction_factor_bps)
.map_err(&err)?,
CommandKind::RemoveRunners {
runner_ids,
reduction_factor_bps,
..
} => self
.cmd_remove_runners(ts, runner_ids, *reduction_factor_bps)
.map_err(&err)?,
// Uses the timestamp carried by the command rather than `ts`.
CommandKind::VoidTrades {
timestamp,
start_time,
end_time,
void_reason,
..
} => self
.cmd_void_trades(*timestamp, *start_time, *end_time, void_reason.as_str())
.map_err(&err)?,
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("handled above")
}
};
Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind,
},
))
})();
self.scratch = scratch;
result
}
/// Applies one event to the book state.
///
/// Batch-process events are offered to the shared batch helpers first; when
/// one of them reports the event as handled, nothing else happens. All other
/// events are dispatched in the `match` below. Events that belong to binary
/// markets, as well as `MarketCreated`, `VoidTrades` and `MarketRemoved`,
/// are ignored here.
///
/// # Panics
///
/// Panics when a batch-process event reaches the `match`, which means the
/// batch helpers did not report it as handled.
pub(crate) fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
let event = &env.event;
// Move the batch state out of `self` so the helper closures below can
// borrow `self` while the helpers mutate the batch state.
let mut batch_process = self.batch_process.take();
let mut queued_batches = std::mem::take(&mut self.queued_batches);
let handled = if apply_batch_process_state_event(
&mut batch_process,
&mut queued_batches,
event,
|batch_mode| self.batch_context_for_mode(batch_mode),
) {
true
} else {
// A batched cancellation removes each listed order through `order_cancelled`.
apply_batch_cancelled_chunk_event(&mut batch_process, event, |order_id| {
self.order_cancelled(order_id)
})
};
// Put the (possibly updated) batch state back before returning.
self.batch_process = batch_process;
self.queued_batches = queued_batches;
if handled {
return;
}
match event {
BookEvent::MarketCreated { .. } => {}
BookEvent::RunnersAdded {
runner_ids,
runner_labels,
} => self.apply_runners_added(runner_ids, runner_labels),
BookEvent::RunnersRemoved { runner_ids, .. } => self.apply_runners_removed(runner_ids),
BookEvent::MarketStateChanged {
to,
close_batch_max_events: _,
..
} => self.market_state_changed(*to),
BookEvent::MarketPhaseChanged { to, reason: _ } => self.market_phase_changed(*to),
BookEvent::OrderAccepted {
correlation_id,
order_id,
account_id,
runner_id,
side,
price,
stake,
persistence,
..
} => {
// Keep the id counter ahead of every accepted order id.
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
self.order_accepted(
ts,
*order_id,
account_id.clone(),
correlation_id.clone(),
*runner_id,
*side,
*price,
*stake,
*persistence,
)
}
BookEvent::BinaryOrderAccepted { .. } => {}
BookEvent::OrderCancelled {
cancelled_order, ..
} => {
self.order_cancelled(cancelled_order.order_id);
}
BookEvent::TradeMatched {
order_id,
role,
runner_id,
counter_party,
remaining_stake,
matched_delta,
..
} => self.trade_matched(
ts,
TradeMatchView {
order_id: *order_id,
role: *role,
runner_id: *runner_id,
counter_party: *counter_party,
remaining_stake: *remaining_stake,
matched_delta: *matched_delta,
},
),
BookEvent::BinaryTradeMatched { .. } => {}
BookEvent::VoidTrades { .. } => {}
// Single-runner removal goes through the same path as the plural event.
BookEvent::RunnerRemoved { runner_id, .. } => {
self.apply_runners_removed(&[*runner_id]);
}
BookEvent::MarketRemoved { .. } => {}
BookEvent::OrderCancelledBatched { .. }
| BookEvent::BatchProcessQueued { .. }
| BookEvent::BatchProcessStarted { .. }
| BookEvent::BatchProcessRetargeted { .. }
| BookEvent::BatchProcessCompleted { .. } => {
unreachable!("batch process events handled before match")
}
}
}
/// Applies a `MarketStateChanged` event by storing the new state.
pub(crate) fn market_state_changed(&mut self, to: BookMarketState) {
self.state = to;
}
/// Applies a `MarketPhaseChanged` event by storing the new phase.
///
/// # Panics
///
/// Panics when the market kind does not allow the transition from the
/// current phase to `to`.
pub(crate) fn market_phase_changed(&mut self, to: MarketPhase) {
    let transition_allowed = self
        .market_kind
        .can_transition_to_phase(self.market_phase, to);
    assert!(
        transition_allowed,
        "market kind does not support applied phase transition"
    );
    self.market_phase = to;
}
/// Applies an `OrderAccepted` event: stores the order as
/// `ExecutableUnmatched`, counts it as live and appends it to the tail of
/// its level on the runner's ladder with its full stake.
///
/// An order id that is already stored is ignored. A failed store insert, or
/// a missing key right after the insert, is logged and the order is not
/// placed on the ladder.
///
/// # Panics
///
/// Panics when `runner_id` is not a runner of this market.
// Mirrors the fields of the event one-to-one, hence the long parameter list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn order_accepted(
    &mut self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    correlation_id: Option<CorrelationId>,
    runner_id: RunnerId,
    side: Side,
    price: OddsX10000,
    stake: Money,
    persistence: Persistence,
) {
    if self.orders.contains(&order_id) {
        return;
    }
    assert!(
        self.runners.contains(&runner_id),
        "order accepted event references unknown runner {:?}",
        runner_id
    );

    let info = BookOrderInfo {
        order_id,
        account_id,
        correlation_id,
        side,
        state: BookOrderState::ExecutableUnmatched,
        created_at: ts,
        last_updated_at: ts,
    };
    let accepted = BookOrder {
        info,
        runner_id,
        price,
        stake,
        matched: Money::zero(),
        persistence,
    };
    if let Err(err) = self.orders.insert(order_id, accepted) {
        error!(order_id = ?order_id, reason = ?err, "failed to insert order");
        return;
    }
    self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);

    let Some(key) = self.orders.get_key(&order_id) else {
        error!(order_id = ?order_id, "missing order key after insert");
        return;
    };
    let ladder = self.runner_books.entry(runner_id).or_default();
    ladder.insert_tail(&mut self.orders, key, stake);
}
/// Builds the lookup key for a trade between two orders.
///
/// The smaller id always comes first, so both sides of a trade produce the
/// same key regardless of argument order.
fn trade_pair_key(order_id: OrderId, counter_party: OrderId) -> TradePairKey {
    if order_id > counter_party {
        (counter_party, order_id)
    } else {
        (order_id, counter_party)
    }
}
/// Applies one side of a trade to the book.
///
/// Steps, in order:
/// 1. Reduce the level total on the order's ladder by the matched amount and
///    unlink the order from its level when nothing remains.
/// 2. Update the order's matched total, timestamp and state.
/// 3. Remove a fully matched order from the store and adjust the live count.
/// 4. Pair this side with the counterparty's side: the first side of a trade
///    is parked in `pending_trade_views`; when the second side arrives, the
///    matched volume is credited to the runner(s).
fn trade_matched(&mut self, ts: DateTime, m: TradeMatchView) {
// Remember whether the order counted as live before it is modified below.
let was_live = self.orders.get(&m.order_id).is_some_and(|o| {
matches!(
o.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
)
});
if let Some(order) = self.orders.get(&m.order_id) {
let runner_id = order.runner_id;
let side = order.info.side;
if let Some(key) = self.orders.get_key(&m.order_id)
&& let Some(runner_book) = self.runner_books.get_mut(&runner_id)
{
let tick = self.orders.stored_tick(key);
runner_book.decrement_level_total(side, tick, m.matched_delta);
// The level total was already reduced above, so unlink with a zero amount.
if m.remaining_stake.0 == 0 && self.orders.in_level_by_key(key) {
runner_book.unlink(&mut self.orders, key, Money::zero());
}
}
}
if let Some(order) = self.orders.get_mut(&m.order_id) {
order.matched = Money(order.matched.0 + m.matched_delta.0);
order.info.last_updated_at = ts;
order.info.state = if m.remaining_stake.0 == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
// Fully matched orders leave the store.
if m.remaining_stake.0 == 0 {
if was_live {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
self.orders.remove(&m.order_id);
}
let current = PendingTradeView {
order_id: m.order_id,
role: m.role,
runner_id: m.runner_id,
matched_delta: m.matched_delta,
};
let key = Self::trade_pair_key(m.order_id, m.counter_party);
if let Some(first) = self.pending_trade_views.remove(&key) {
assert_eq!(first.order_id, m.counter_party);
let (maker, taker) = match (first.role, current.role) {
(TradeRole::Maker, TradeRole::Taker) => (first, current),
(TradeRole::Taker, TradeRole::Maker) => (current, first),
_ => {
// Two sides with the same role: debug builds panic here, release
// builds drop the pair without crediting any volume.
debug_assert!(
false,
"trade pair must contain exactly one maker and one taker"
);
return;
}
};
// Credit the trade once to the taker's runner; the maker's runner is
// credited as well only when it is a different runner.
self.add_runner_matched_volume(taker.runner_id, taker.matched_delta);
if maker.runner_id != taker.runner_id {
self.add_runner_matched_volume(maker.runner_id, maker.matched_delta);
}
} else {
// First side of this trade: wait for the counterparty's event.
self.pending_trade_views.insert(key, current);
}
}
/// Adds `delta` to the matched volume recorded for `runner_id`, starting
/// from zero when the runner has no entry yet.
///
/// Uses a single map lookup through the entry API.
fn add_runner_matched_volume(&mut self, runner_id: RunnerId, delta: Money) {
    let total = self
        .runner_matched_volume
        .entry(runner_id)
        .or_insert(Money::zero());
    *total = Money(total.0 + delta.0);
}
/// Applies an order cancellation: takes the order off its ladder level and
/// removes it from the store.
///
/// The live-order counter is decremented only when the order was still in an
/// executable state.
pub(crate) fn order_cancelled(&mut self, order_id: OrderId) {
    let was_live = self.orders.get(&order_id).is_some_and(|order| {
        matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        )
    });
    if was_live {
        self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
    }
    self.remove_from_levels(order_id);
    self.orders.remove(&order_id);
}
/// Wraps an event in an envelope for this market.
///
/// The envelope carries this book's market id and name, `event_time` as its
/// timestamp, default metadata and a `market_seq` of 0.
fn emit(&self, event_time: DateTime, event: BookEvent) -> BookEventEnvelope {
    let market_name = self.market_name.clone();
    BookEventEnvelope {
        market_id: self.market_id,
        market_name,
        market_seq: 0,
        timestamp: event_time,
        metadata: EventMetadata::default(),
        event,
    }
}
/// Appends a `MarketStateChanged` event to `out`, unless the market is
/// already in state `to`.
///
/// Only the event is produced; the book state itself is left untouched.
fn state_change(
    &self,
    ts: DateTime,
    to: BookMarketState,
    reason: &str,
    close_batch_max_events: Option<u16>,
    out: &mut EventVec,
) {
    if self.state != to {
        let change = BookEvent::MarketStateChanged {
            to,
            reason: reason.to_string(),
            close_batch_max_events,
        };
        out.push(self.emit(ts, change));
    }
}
/// Appends a `MarketPhaseChanged` event to `out`, unless the market is
/// already in phase `to`.
///
/// Only the event is produced; the book phase itself is left untouched.
fn phase_change(&self, ts: DateTime, to: MarketPhase, reason: &str, out: &mut EventVec) {
    if self.market_phase != to {
        let change = BookEvent::MarketPhaseChanged {
            to,
            reason: reason.to_string(),
        };
        out.push(self.emit(ts, change));
    }
}
/// Handles a request to open the market.
///
/// Produces a `MarketStateChanged` event to `Open` when the market is not
/// open already.
///
/// # Errors
///
/// Propagates the rejection from `ensure_state_change`, and returns
/// `RejectReason::MarketTerminal` when the current state is terminal.
fn cmd_open_market(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let target = BookMarketState::Open;
    ensure_state_change(self.state, target)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    let mut events = EventVec::new();
    self.state_change(ts, target, reason, None, &mut events);
    Ok((events, None))
}
/// Handles a request to move the market into the await-live stage.
///
/// Follows the plan from `plan_await_live_market_with_lapse`: depending on
/// its flags, emits a state change to `Suspended`, a phase change to
/// `PreAwaitLive`, and starts or queues an `InPlayLapse` batch targeting
/// lapse orders.
///
/// # Errors
///
/// Propagates the rejection returned by the planning helper.
fn cmd_await_live_market(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let plan =
        plan_await_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;

    let mut events = EventVec::new();
    if plan.suspend_market {
        self.state_change(ts, BookMarketState::Suspended, reason, None, &mut events);
    }
    if plan.set_pre_await_live {
        self.phase_change(ts, MarketPhase::PreAwaitLive, reason, &mut events);
    }
    if plan.start_lapse {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::InPlayLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut events,
            |event| self.emit(ts, event),
            |cursor_after, max_orders, target| {
                self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
            },
        );
    }
    Ok((events, None))
}
/// Handles a request to take the market live.
///
/// After `plan_go_live_market_with_lapse` accepts the transition, emits a
/// phase change to `Live` and starts or queues an `InPlayLapse` batch
/// targeting lapse orders.
///
/// # Errors
///
/// Propagates the rejection returned by the planning helper.
fn cmd_go_live_market(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    // Only the accept/reject outcome of the plan is used here.
    plan_go_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;

    let mut events = EventVec::new();
    self.phase_change(ts, MarketPhase::Live, reason, &mut events);
    maybe_start_or_queue_batch(
        self.batch_process.as_ref(),
        &self.queued_batches,
        BatchMode::InPlayLapse,
        self.close_batch_max_events,
        &BatchProcessTarget::LapseOrders,
        None,
        &mut events,
        |event| self.emit(ts, event),
        |cursor_after, max_orders, target| {
            self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
        },
    );
    Ok((events, None))
}
/// Handles a request to return the market to the `Pre` phase.
///
/// Emits a phase change to `Pre` once `plan_return_to_pre_market` accepts
/// the transition.
///
/// # Errors
///
/// Propagates the rejection returned by the planning helper.
fn cmd_return_to_pre_market(
    &mut self,
    ts: DateTime,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    plan_return_to_pre_market(self.state, self.market_phase, self.market_kind)?;

    let mut events = EventVec::new();
    self.phase_change(ts, MarketPhase::Pre, reason, &mut events);
    Ok((events, None))
}
/// Suspends the market.
///
/// Only an `Open` or `Deactivated` market can be suspended:
/// - `ensure_state_change` may reject the transition first,
/// - a terminal market is rejected with `MarketTerminal`,
/// - any other state is rejected with `MarketNotOpen`.
///
/// When the market was `Open`, a `SuspendLapse` batch over
/// `BatchProcessTarget::LapseOrders` is started or queued after the state
/// change; a `Deactivated` market only changes state.
fn cmd_suspend(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Suspended)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    // Decide up front whether the lapse batch is needed, based on the state
    // the market is leaving.
    let was_open = match self.state {
        BookMarketState::Open => true,
        BookMarketState::Deactivated => false,
        _ => return Err(RejectReason::MarketNotOpen),
    };

    let mut out = EventVec::new();
    self.state_change(ts, BookMarketState::Suspended, reason, None, &mut out);
    if was_open {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::SuspendLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut out,
            |book_event| self.emit(ts, book_event),
            |cursor, limit, batch_target| {
                self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
            },
        );
    }
    Ok((out, None))
}
/// Deactivates the market.
///
/// Only an `Open` or `Suspended` market can be deactivated:
/// - `ensure_state_change` may reject the transition first,
/// - a terminal market is rejected with `MarketTerminal`,
/// - any other state is rejected with `MarketNotOpen`.
///
/// When the market was `Open` before the transition, a `SuspendLapse` batch
/// over `BatchProcessTarget::LapseOrders` is started or queued after the state
/// change. Whether the market was open is captured *before* `state_change`
/// runs, so the decision does not depend on whether `state_change` updates
/// `self.state` in place (this mirrors how `cmd_suspend` checks the state
/// before transitioning).
fn cmd_deactivate(
    &mut self,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Deactivated)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if !matches!(
        self.state,
        BookMarketState::Open | BookMarketState::Suspended
    ) {
        return Err(RejectReason::MarketNotOpen);
    }
    // Must be read before the transition below: afterwards the state may
    // already be `Deactivated`, which would make the lapse branch unreachable.
    let was_open = self.state == BookMarketState::Open;

    let mut events = EventVec::new();
    self.state_change(ts, BookMarketState::Deactivated, reason, None, &mut events);
    if was_open {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::SuspendLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut events,
            |event| self.emit(ts, event),
            |cursor_after, max_orders, target| {
                self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
            },
        );
    }
    Ok((events, None))
}
/// Halts the market.
///
/// Rejects with `MarketAlreadyHalted` when the market is already halted and
/// with `MarketTerminal` when it is terminal (checked in that order);
/// otherwise transitions to `BookMarketState::Halted` and returns the
/// resulting events.
fn cmd_halt_market(&mut self, ts: DateTime, reason: &str) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        Err(RejectReason::MarketAlreadyHalted)
    } else if self.state.is_terminal() {
        Err(RejectReason::MarketTerminal)
    } else {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Halted, reason, None, &mut out);
        Ok(out)
    }
}
/// Resumes a halted market.
///
/// Rejects with `MarketNotHalted` unless the market is currently halted;
/// otherwise transitions to `BookMarketState::Open` with the fixed reason
/// `"RESUME"` and returns the resulting events.
fn cmd_resume_market(&mut self, ts: DateTime) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Open, "RESUME", None, &mut out);
        Ok(out)
    } else {
        Err(RejectReason::MarketNotHalted)
    }
}
/// Closes the market and starts cancelling its live orders in batches.
///
/// Rejections, in order:
/// - whatever `ensure_state_change` rejects for a move to `Closed`,
/// - `MarketTerminal` when the market is already terminal,
/// - `MarketBatchCancelling` when the current batch process reports `is_close`.
///
/// On success the state moves to `BookMarketState::Closed` (passing
/// `batch_max_events` along with the transition) and a `Close` batch over
/// `BatchProcessTarget::AllLiveOrders` is started or retargeted with the same
/// `batch_max_events`.
fn cmd_close_market(
    &mut self,
    reason: &str,
    ts: DateTime,
    batch_max_events: u16,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Closed)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    let close_already_running = self
        .batch_process_state()
        .is_some_and(BatchProcessState::is_close);
    if close_already_running {
        return Err(RejectReason::MarketBatchCancelling);
    }

    let mut out = EventVec::new();
    self.state_change(
        ts,
        BookMarketState::Closed,
        reason,
        Some(batch_max_events),
        &mut out,
    );
    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::Close,
        batch_max_events,
        &BatchProcessTarget::AllLiveOrders,
        None,
        &mut out,
        |book_event| self.emit(ts, book_event),
        |cursor, limit, batch_target| {
            self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
        },
    );
    Ok((out, None))
}
/// Advances the current batch process by one step.
///
/// Delegates to `continue_batch_process_with_queue`, handing it the current
/// batch state, the queued batches, and callbacks for emitting events and
/// selecting the next chunk of orders. Any rejection from that helper is
/// returned unchanged; on success the produced events are returned without a
/// command response payload.
fn cmd_continue_batch_process(
    &mut self,
    ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let mut out = EventVec::new();
    continue_batch_process_with_queue(
        self.batch_process_state(),
        &self.queued_batches,
        &mut out,
        |book_event| self.emit(ts, book_event),
        |cursor, limit, batch_target| {
            self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
        },
    )?;
    Ok((out, None))
}
/// Starts a filtered batch cancellation.
///
/// Rejects with `MarketBatchCancelling` while another batch process is active.
/// Otherwise builds a target via `filtered_batch_target` from the command
/// timestamp, the optional creation-time bounds (all converted to epoch
/// milliseconds) and the optional account / runner filters, then starts or
/// retargets a `FilteredCancel` batch limited by `close_batch_max_events`,
/// passing `reason` along.
#[allow(clippy::too_many_arguments)]
fn cmd_batch_cancel_orders(
    &self,
    ts: DateTime,
    from_created_at_inclusive: Option<DateTime>,
    to_created_at_inclusive: Option<DateTime>,
    account_filter: Option<AccountId>,
    runner_filter: Option<RunnerId>,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }

    let lower_bound_ms = from_created_at_inclusive.map(|bound| bound.timestamp_millis());
    let upper_bound_ms = to_created_at_inclusive.map(|bound| bound.timestamp_millis());
    let filter_target = filtered_batch_target(
        ts.timestamp_millis(),
        lower_bound_ms,
        upper_bound_ms,
        account_filter,
        runner_filter,
    );

    let mut out = EventVec::new();
    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::FilteredCancel,
        self.close_batch_max_events,
        &filter_target,
        Some(reason),
        &mut out,
        |book_event| self.emit(ts, book_event),
        |cursor, limit, batch_target| {
            self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
        },
    );
    Ok((out, None))
}
#[allow(clippy::too_many_arguments)]
fn cmd_place_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
correlation_id: Option<CorrelationId>,
order_id: OrderId,
account_id: AccountId,
runner_id: RunnerId,
side: Side,
price: OddsX10000,
stake: Money,
persistence: Persistence,
time_in_force: TimeInForce,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
debug_assert!(self.state.is_matchable());
if self.removed_runners.contains(&runner_id) || !self.runners.contains(&runner_id) {
return Err(RejectReason::RunnerNotFound);
}
ensure_valid_odds_tick(price)?;
if !stake.is_positive() {
return Err(RejectReason::InvalidStake);
}
if self.orders.contains(&order_id) {
return Err(RejectReason::Duplicate);
}
if let Some(required_qty) = time_in_force.required_fok_fill(Quantity(
u64::try_from(stake.0).map_err(|_| RejectReason::InvalidStake)?,
))? {
let required =
Money(i64::try_from(required_qty.0).map_err(|_| RejectReason::InvalidStake)?);
let available = self.available_to_match(runner_id, side, price);
if available.0 < required.0 {
return Err(RejectReason::WouldNotFillFok);
}
}
scratch.events.clear();
scratch.events.push(self.emit(
ts,
BookEvent::OrderAccepted {
correlation_id: correlation_id.clone(),
order_id,
account_id: account_id.clone(),
runner_id,
runner_label: self.runner_label(runner_id).to_string(),
side,
price,
stake,
persistence,
time_in_force,
},
));
let (matched, avg_price) = self.plan_matching(
scratch,
MatchRequest {
ts,
taker_account_id: account_id.clone(),
taker_correlation_id: correlation_id.clone(),
taker_side: side,
taker_runner_id: runner_id,
taker_price: price,
taker_stake: stake,
taker_order_id: order_id,
},
);
let mut final_remaining = Money(stake.0.saturating_sub(matched.0));
let mut final_state = if final_remaining.0 == 0 {
BookOrderState::ExecutionComplete
} else if matched.0 > 0 {
BookOrderState::ExecutablePartiallyMatched
} else {
BookOrderState::ExecutableUnmatched
};
let remainder_cancel_cause =
time_in_force_remainder_cancel_cause(time_in_force, final_remaining.0 > 0);
if let Some(cancel_cause) = remainder_cancel_cause {
final_remaining = Money::zero();
final_state = BookOrderState::Cancelled;
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
cancelled_order: CancelledOrderEntry {
order_id,
account_id,
correlation_id,
},
cancel_cause,
cause_detail: None,
},
));
}
Ok((
std::mem::take(&mut scratch.events),
Some(CommandResponseKind::PlaceOrder(PlaceOrderResult {
accepted: true,
order_id,
matched: FillQuantity::Stake(matched),
avg_matched_price: avg_price.map(FillPrice::Odds),
remaining: FillQuantity::Stake(final_remaining),
final_order_state: Some(final_state),
})),
))
}
/// Sums the resting stake on the opposite side that a taker at `price` could
/// cross on `runner_id`.
///
/// - A `Yes` taker walks the `No` side from its best tick downwards and stops
///   at the first tick below the taker's tick.
/// - A `No` taker walks the `Yes` side from its best tick upwards and stops at
///   the first tick above the taker's tick.
///
/// Returns `Money::zero()` when the runner has no book or `price` has no tick
/// index. The running total saturates instead of overflowing, so an extreme
/// sum can never wrap negative and cause a spurious "not enough liquidity"
/// answer.
///
/// NOTE(review): this takes no account id, so it sums whole level totals and
/// cannot exclude the taker's own resting orders the way `plan_matching` does
/// — confirm that is acceptable for the FOK pre-check.
fn available_to_match(&self, runner_id: RunnerId, side: Side, price: OddsX10000) -> Money {
    let Some(runner_book) = self.runner_books.get(&runner_id) else {
        return Money::zero();
    };
    let Some(taker_tick) = price.tick_index() else {
        return Money::zero();
    };
    let mut total = 0i64;
    match side {
        Side::Yes => {
            let mut tick = runner_book.best_tick_desc(Side::No);
            while let Some(t) = tick {
                if t < taker_tick {
                    break;
                }
                total = total.saturating_add(runner_book.level_total_remaining(Side::No, t).0);
                // Tick 0 is the lowest tick; stop before `t - 1` underflows.
                if t == 0 {
                    break;
                }
                tick = runner_book.next_tick_desc_from(Side::No, t - 1);
            }
        }
        Side::No => {
            let mut tick = runner_book.best_tick_asc(Side::Yes);
            while let Some(t) = tick {
                if t > taker_tick {
                    break;
                }
                total = total.saturating_add(runner_book.level_total_remaining(Side::Yes, t).0);
                tick = runner_book.next_tick_asc_from(Side::Yes, t + 1);
            }
        }
    }
    Money(total)
}
/// Plans the fills for an incoming (taker) order against resting orders on
/// the opposite side of the same runner, without mutating the book.
///
/// Working buffers `scratch.fills`, `scratch.maker_matched_delta` and
/// `scratch.matchable_ticks` are reset on entry; `scratch.events` is NOT
/// cleared and receives two `TradeMatched` events per fill (maker first, then
/// taker). The planned fills stay in `scratch.fills` after the call.
///
/// Returns the total matched stake and the stake-weighted average price
/// (integer division, `None` when nothing matched). Returns
/// `(Money::zero(), None)` when the taker price has no tick index or the
/// runner has no book.
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (Money, Option<OddsX10000>) {
scratch.fills.clear();
scratch.maker_matched_delta.clear();
scratch.matchable_ticks.clear();
let mut remaining = r.taker_stake;
let mut matched_total = Money::zero();
// Running sum of price * fill, used for the weighted average price.
let mut sum_price_qty: u128 = 0;
let Some(taker_tick) = r.taker_price.tick_index() else {
return (Money::zero(), None);
};
let Some(runner_book) = self.runner_books.get(&r.taker_runner_id) else {
return (Money::zero(), None);
};
// Phase 1: collect the opposite-side ticks the taker can cross, best first.
match r.taker_side {
Side::Yes => {
// Walk the No side downwards from its best tick until below the taker's tick.
let mut tick = runner_book.best_tick_desc(Side::No);
while let Some(t) = tick {
if t < taker_tick {
break;
}
scratch.matchable_ticks.push(t as u16);
// Tick 0 is the lowest tick; stop before `t - 1` underflows.
if t == 0 {
break;
}
tick = runner_book.next_tick_desc_from(Side::No, t - 1);
}
}
Side::No => {
// Walk the Yes side upwards from its best tick until above the taker's tick.
let mut tick = runner_book.best_tick_asc(Side::Yes);
while let Some(t) = tick {
if t > taker_tick {
break;
}
scratch.matchable_ticks.push(t as u16);
tick = runner_book.next_tick_asc_from(Side::Yes, t + 1);
}
}
}
// Phase 2: consume resting orders level by level until the taker is filled.
for &tick_u16 in scratch.matchable_ticks.iter() {
if remaining.0 <= 0 {
break;
}
let tick = tick_u16 as usize;
let level_side = match r.taker_side {
Side::Yes => Side::No,
Side::No => Side::Yes,
};
// Fills execute at the resting level's price, looked up from the tick ladder.
let level_price = OddsX10000(crate::types::odds::TICK_LADDER[tick]);
for maker_key in runner_book.iter_level_keys(level_side, tick, &self.orders) {
if remaining.0 <= 0 {
break;
}
let maker = self.orders.order_by_key(maker_key);
let maker_order_id = maker.info.order_id;
// Never match an account against its own resting orders.
if maker.info.account_id == r.taker_account_id {
continue;
}
let maker_is_live = matches!(
maker.info.state,
BookOrderState::ExecutableUnmatched
| BookOrderState::ExecutablePartiallyMatched
);
if !maker_is_live {
continue;
}
// Remaining stake of the maker = stored remainder minus what this plan
// has already allocated to it.
let base_remaining = maker.stake.0.saturating_sub(maker.matched.0);
let delta = *scratch
.maker_matched_delta
.get(&maker_order_id)
.unwrap_or(&0);
let maker_remaining = base_remaining.saturating_sub(delta);
if maker_remaining <= 0 {
continue;
}
let fill_i64 = remaining.0.min(maker_remaining);
let fill_amount = Money(fill_i64);
remaining = Money(remaining.0 - fill_i64);
let maker_remaining_after = Money(maker_remaining.saturating_sub(fill_i64));
let taker_remaining_after = remaining;
*scratch
.maker_matched_delta
.entry(maker_order_id)
.or_insert(0) += fill_i64;
scratch.fills.push(PlannedFill {
maker_order_id,
maker_account_id: maker.info.account_id.clone(),
maker_correlation_id: maker.info.correlation_id.clone(),
maker_runner_id: maker.runner_id,
maker_side: maker.info.side,
price: level_price,
fill_amount,
maker_remaining_after,
taker_remaining_after,
});
matched_total = Money(matched_total.0 + fill_i64);
sum_price_qty =
sum_price_qty.saturating_add(level_price.0 as u128 * fill_i64.max(0) as u128);
}
}
// Stake-weighted average price, truncated by integer division and clamped to u32.
let avg_price = if matched_total.0 > 0 {
let denom = matched_total.0 as u128;
let px = (sum_price_qty / denom).min(u32::MAX as u128) as u32;
Some(OddsX10000(px))
} else {
None
};
// Phase 3: emit a maker-side and a taker-side TradeMatched event per fill.
for fill in scratch.fills.iter() {
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: fill.maker_correlation_id.clone(),
order_id: fill.maker_order_id,
account_id: fill.maker_account_id.clone(),
role: TradeRole::Maker,
runner_id: fill.maker_runner_id,
runner_label: self.runner_label(fill.maker_runner_id).to_string(),
side: fill.maker_side,
market_phase: self.market_phase,
price: fill.price,
stake: fill.fill_amount,
counter_party: r.taker_order_id,
counter_party_account_id: r.taker_account_id.clone(),
remaining_stake: fill.maker_remaining_after,
matched_delta: fill.fill_amount,
},
));
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: r.taker_correlation_id.clone(),
order_id: r.taker_order_id,
account_id: r.taker_account_id.clone(),
role: TradeRole::Taker,
runner_id: r.taker_runner_id,
runner_label: self.runner_label(r.taker_runner_id).to_string(),
side: r.taker_side,
market_phase: self.market_phase,
price: fill.price,
stake: fill.fill_amount,
counter_party: fill.maker_order_id,
counter_party_account_id: fill.maker_account_id.clone(),
remaining_stake: fill.taker_remaining_after,
matched_delta: fill.fill_amount,
},
));
}
(matched_total, avg_price)
}
/// Cancels a live order on behalf of its owner.
///
/// Rejections, in order: `OrderNotFound` (unknown id), `NotOrderOwner`
/// (`account_id` differs from the order's account) and `OrderNotLive` (state
/// is neither `ExecutableUnmatched` nor `ExecutablePartiallyMatched`).
///
/// On success returns a single `OrderCancelled` event carrying `cancel_cause`
/// and its `detail()` text; there is no command response payload.
fn cmd_cancel_order(
    &self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    cancel_cause: CancelCause,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let order = self
        .orders
        .get(&order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if order.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    match order.info.state {
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched => {}
        _ => return Err(RejectReason::OrderNotLive),
    }

    let cause_detail = Some(cancel_cause.detail().to_string());
    let cancelled = BookEvent::OrderCancelled {
        cancelled_order: CancelledOrderEntry::from_order_info(order_id, &order.info),
        cancel_cause,
        cause_detail,
    };
    let mut out = EventVec::new();
    out.push(self.emit(ts, cancelled));
    Ok((out, None))
}
/// Unlinks an order from its runner book level, if it is linked.
///
/// Does nothing when the order is not marked as in a level, is unknown, has
/// no positive remaining stake, has no key in the store, or its runner has no
/// book. Otherwise calls `unlink` on the runner book with the order's key and
/// its remaining stake.
fn remove_from_levels(&mut self, order_id: OrderId) {
    if !self.orders.is_in_level(&order_id) {
        return;
    }
    // Copy out what is needed so the order store can be borrowed mutably below.
    let (runner_id, remaining) = match self.orders.get(&order_id) {
        Some(order) => (order.runner_id, order.remaining()),
        None => return,
    };
    if remaining.0 <= 0 {
        return;
    }
    let Some(key) = self.orders.get_key(&order_id) else {
        return;
    };
    if let Some(runner_book) = self.runner_books.get_mut(&runner_id) {
        runner_book.unlink(&mut self.orders, key, remaining);
    }
}
/// Reduces an existing order by replacing it with a new, smaller one.
///
/// Validation, in order: the old order must exist (`OrderNotFound`) and
/// belong to `account_id` (`NotOrderOwner`); `condition` is checked by
/// `validate_reduce_order_condition`; the effective price (`new_odds` or the
/// old price) must be a valid tick; the target remaining stake comes from
/// `reduce_order_target_remaining`; `ensure_reduce_only` vets the change; an
/// unchanged price and remaining stake yields `NoChange`; and the old order
/// must still be live (`OrderNotLive`).
///
/// When the target stake is positive a replacement order is placed via
/// `cmd_place_order` (same runner, side and persistence, `TimeInForce::Gtc`).
/// The old order is then cancelled with `CancelCause::Reduce`. The returned
/// events list the cancellation first, followed by the placement events; the
/// response is the placement's response, or `None` when nothing was placed.
#[allow(clippy::too_many_arguments)]
fn cmd_reduce_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    correlation_id: Option<CorrelationId>,
    old_order_id: OrderId,
    new_order_id: OrderId,
    account_id: AccountId,
    new_odds: Option<OddsX10000>,
    target: Option<ReduceOrderTarget>,
    condition: Option<&ReduceOrderCondition>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let old = self
        .orders
        .get(&old_order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if old.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    validate_reduce_order_condition(old, condition)?;

    let new_price = new_odds.unwrap_or(old.price);
    ensure_valid_odds_tick(new_price)?;
    let next_stake = reduce_order_target_remaining(old, target)?;
    ensure_reduce_only(old, new_price, next_stake)?;

    let unchanged = new_price == old.price && next_stake == old.remaining();
    if unchanged {
        return Err(RejectReason::NoChange);
    }
    match old.info.state {
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched => {}
        _ => return Err(RejectReason::OrderNotLive),
    }

    // Plan the replacement first; a rejection here aborts before any cancel.
    let (place_events, place_ok) = if next_stake.0 <= 0 {
        (EventVec::new(), None)
    } else {
        self.cmd_place_order(
            scratch,
            ts,
            correlation_id,
            new_order_id,
            account_id.clone(),
            old.runner_id,
            old.info.side,
            new_price,
            next_stake,
            old.persistence,
            TimeInForce::Gtc,
        )?
    };

    let (mut combined, _) =
        self.cmd_cancel_order(ts, old_order_id, account_id, CancelCause::Reduce)?;
    combined.extend(place_events);
    Ok((combined, place_ok))
}
/// Adds runners to the market.
///
/// Rejections: `MarketTerminal` for a terminal market; `InvalidMarketConfig`
/// when a market without runners receives fewer than two, when a market with
/// runners receives none, or when `plan_runner_additions` yields nothing to
/// add. `validate_runner_group` and `plan_runner_additions` may reject as
/// well.
///
/// On success returns a single `RunnersAdded` event with the planned ids and
/// labels.
fn cmd_add_runners(
    &self,
    ts: DateTime,
    runner_ids: &[RunnerId],
    runner_labels: &[String],
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    // A market without runners needs at least two; otherwise at least one.
    let min_required = if self.runners.is_empty() { 2 } else { 1 };
    if runner_ids.len() < min_required {
        return Err(RejectReason::InvalidMarketConfig);
    }
    Self::validate_runner_group(runner_ids, runner_labels)?;

    let candidates = runner_ids
        .iter()
        .copied()
        .zip(runner_labels.iter().map(String::as_str));
    let (added_runner_ids, added_runner_labels) = self.plan_runner_additions(candidates)?;
    if added_runner_ids.is_empty() {
        return Err(RejectReason::InvalidMarketConfig);
    }

    let added = BookEvent::RunnersAdded {
        runner_ids: added_runner_ids,
        runner_labels: added_runner_labels,
    };
    let mut out = EventVec::new();
    out.push(self.emit(ts, added));
    Ok((out, None))
}
/// Applies a combined add/remove change to the market's runners.
///
/// Rejections: `MarketTerminal`, `MarketBatchCancelling` (a batch process is
/// active), anything `validate_runner_changes` or `plan_runner_additions`
/// rejects, and `InvalidMarketConfig` when a runner appears in both lists,
/// when a market without runners would gain exactly one, or when a removal's
/// label is missing or differs from the stored label. `NoChange` is returned
/// when nothing would be added or removed.
///
/// Removals naming a runner that is unknown or already removed are skipped
/// silently. On success a `RunnersAdded` event is emitted for additions, and
/// removals are handed to `emit_runners_removed_and_start_batch`.
fn cmd_change_runners(
    &mut self,
    ts: DateTime,
    add: &[RunnerChange],
    remove: &[RunnerChange],
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }
    Self::validate_runner_changes(add)?;
    Self::validate_runner_changes(remove)?;

    // A runner may not be added and removed by the same command.
    let ids_to_add: HashSet<_> = add.iter().map(|entry| entry.runner_id).collect();
    let overlaps = remove
        .iter()
        .any(|entry| ids_to_add.contains(&entry.runner_id));
    if overlaps {
        return Err(RejectReason::InvalidMarketConfig);
    }

    let (added_runner_ids, added_runner_labels) = self.plan_runner_additions(
        add.iter()
            .map(|entry| (entry.runner_id, entry.runner_label.as_str())),
    )?;
    // A market without runners cannot be seeded with a single runner.
    if self.runners.is_empty() && added_runner_ids.len() == 1 {
        return Err(RejectReason::InvalidMarketConfig);
    }

    let mut removed_runner_ids = Vec::new();
    let mut removed_runner_labels = Vec::new();
    for entry in remove {
        let runner_id = entry.runner_id;
        let is_active =
            self.runners.contains(&runner_id) && !self.removed_runners.contains(&runner_id);
        if !is_active {
            continue;
        }
        match self.runner_labels.get(&runner_id) {
            Some(stored_label) if stored_label == &entry.runner_label => {
                removed_runner_ids.push(runner_id);
                removed_runner_labels.push(stored_label.clone());
            }
            _ => return Err(RejectReason::InvalidMarketConfig),
        }
    }

    if added_runner_ids.is_empty() && removed_runner_ids.is_empty() {
        return Err(RejectReason::NoChange);
    }

    let mut out = EventVec::new();
    if !added_runner_ids.is_empty() {
        let added = BookEvent::RunnersAdded {
            runner_ids: added_runner_ids,
            runner_labels: added_runner_labels,
        };
        out.push(self.emit(ts, added));
    }
    if !removed_runner_ids.is_empty() {
        self.emit_runners_removed_and_start_batch(
            ts,
            removed_runner_ids,
            removed_runner_labels,
            None,
            &mut out,
        );
    }
    Ok((out, None))
}
/// Removes a single runner; a thin wrapper that forwards to
/// `cmd_remove_runners` with a one-element slice.
fn cmd_remove_runner(
    &mut self,
    ts: DateTime,
    runner_id: RunnerId,
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let single = std::slice::from_ref(&runner_id);
    self.cmd_remove_runners(ts, single, reduction_factor_bps)
}
/// Removes one or more runners from the market.
///
/// Rejections, in order: `MarketTerminal`; `MarketBatchCancelling` (a batch
/// process is active); `InvalidMarketConfig` for an empty list or a list with
/// duplicate ids; then, per runner in input order, `RunnerNotFound` for an
/// unknown runner and `RunnerAlreadyRemoved` for one already removed.
///
/// On success the runner ids, their current labels and
/// `reduction_factor_bps` are handed to
/// `emit_runners_removed_and_start_batch`, and the events it produced are
/// returned.
fn cmd_remove_runners(
    &mut self,
    ts: DateTime,
    runner_ids: &[RunnerId],
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }
    if runner_ids.is_empty() {
        return Err(RejectReason::InvalidMarketConfig);
    }

    // `insert` returns false on a repeated id, which flags a duplicate.
    let mut seen = HashSet::with_capacity(runner_ids.len());
    let all_distinct = runner_ids.iter().all(|id| seen.insert(*id));
    if !all_distinct {
        return Err(RejectReason::InvalidMarketConfig);
    }

    for id in runner_ids {
        if !self.runners.contains(id) {
            return Err(RejectReason::RunnerNotFound);
        } else if self.removed_runners.contains(id) {
            return Err(RejectReason::RunnerAlreadyRemoved);
        }
    }

    let labels: Vec<String> = runner_ids
        .iter()
        .copied()
        .map(|id| self.runner_label(id).to_string())
        .collect();
    let mut out = EventVec::new();
    self.emit_runners_removed_and_start_batch(
        ts,
        runner_ids.to_vec(),
        labels,
        reduction_factor_bps,
        &mut out,
    );
    Ok((out, None))
}
/// Emits a `VoidTrades` event for the given time window.
///
/// Rejects with `MarketTerminal` when the market is terminal. Otherwise
/// returns a single `VoidTrades` event carrying the current market phase,
/// `start_time`, `end_time` and an owned copy of `void_reason`. The window
/// bounds are passed through as given.
fn cmd_void_trades(
    &mut self,
    ts: DateTime,
    start_time: DateTime,
    end_time: DateTime,
    void_reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    let void_event = BookEvent::VoidTrades {
        market_phase: self.market_phase,
        start_time,
        end_time,
        void_reason: void_reason.to_string(),
    };
    let mut out = EventVec::new();
    out.push(self.emit(ts, void_event));
    Ok((out, None))
}
}