use super::common::*;
use super::error::{BookError, RequestedBookState};
use super::protocol::{
command::{
Command, CommandKind, MarketState, Persistence, ReduceOrderCondition, ReduceOrderTarget,
Side, TimeInForce,
},
reject::RejectReason,
};
use crate::book::common::{formulas, types::BookOrderInfo};
use crate::types::*;
use chrono::Utc;
use std::collections::{BTreeSet, HashMap};
use tracing::error;
use super::common::fast::{OrderKey, OrderStore, RunnerBook};
use serde::{Deserialize, Serialize};
// Index of a price tick; `rebuild_ladders` obtains it from `OrderStore::stored_tick`.
type TickIndex = u16;
// Position of a runner inside the book's two-element arrays (see `runner_index`).
type RunnerIdx = usize;
// Pair of order ids identifying the two sides of one trade; `trade_pair_key`
// always stores the smaller id first so both sides map to the same key.
type TradePairKey = (OrderId, OrderId);
// Identifies one price level: (runner index, side, tick).
type RunnerLevelKey = (RunnerIdx, Side, TickIndex);
// One resting order as collected by `rebuild_ladders`:
// (created_at, order id, store key, remaining stake).
type RestingLevelOrder = (DateTime, OrderId, OrderKey, Money);
// Resting orders grouped per price level while ladders are being rebuilt.
type RestingOrdersByLevel = std::collections::HashMap<RunnerLevelKey, Vec<RestingLevelOrder>>;
/// Reusable working buffers owned by the book.
///
/// `handle_command` moves this out of the book with `std::mem::take`, lends it
/// to the `cmd_*` helper it dispatches to, and puts it back afterwards.
#[derive(Debug, Default)]
struct Scratch {
// NOTE(review): not used in the visible part of the file; presumably per-maker
// matched amounts accumulated while matching — confirm against the matching code.
maker_matched_delta: std::collections::HashMap<OrderId, i64>,
// NOTE(review): not used in the visible part of the file — confirm usage.
matchable: Vec<MatchableOrder>,
// NOTE(review): not used in the visible part of the file — confirm usage.
fills: Vec<PlannedFill>,
// Event buffer: the `cmd_*` helpers clear it, fill it and hand it out with `mem::take`.
events: EventVec,
}
/// One planned fill between a maker order and the incoming taker.
///
/// NOTE(review): the code that produces and consumes this type is outside the
/// visible chunk; the field notes below are inferred from the names only.
#[derive(Debug, Clone)]
struct PlannedFill {
maker_order_id: OrderId,
maker_account_id: AccountId,
maker_correlation_id: Option<CorrelationId>,
maker_runner_id: RunnerId,
maker_side: Side,
// presumably the price the maker order rests at — TODO confirm
maker_order_price: OddsX10000,
// presumably the price the trade executes at — TODO confirm
trade_price: OddsX10000,
maker_fill_amount: Money,
taker_fill_amount: Money,
maker_remaining_after: Money,
taker_remaining_after: Money,
}
/// The fields of a `BookEvent::TradeMatched` event that `trade_matched` needs.
///
/// Built in `apply_event`, one per side of a trade.
#[derive(Debug, Clone)]
struct TradeMatchView {
// Order this event is about.
order_id: OrderId,
// Whether `order_id` was the maker or the taker of the trade.
role: TradeRole,
runner_id: RunnerId,
// Order on the other side of the trade.
counter_party: OrderId,
// Stake left on `order_id` after this fill; zero means the order is complete.
remaining_stake: Money,
// Amount added to the order's matched total by this fill.
matched_delta: Money,
}
/// First half of a trade pair, parked in `pending_trade_views` by `trade_matched`
/// until the event for the counterparty arrives.
#[derive(Debug, Clone, Copy)]
struct PendingTradeView {
order_id: OrderId,
role: TradeRole,
runner_id: RunnerId,
// Added to the runner's matched volume once the pair is complete.
matched_delta: Money,
}
/// Parameters of an incoming (taker) order to be matched.
///
/// NOTE(review): the matching code that uses this type is outside the visible
/// chunk; nothing here is verified beyond the field names and types.
#[derive(Debug, Clone)]
struct MatchRequest {
ts: DateTime,
taker_order_id: OrderId,
taker_account_id: AccountId,
taker_correlation_id: Option<CorrelationId>,
taker_side: Side,
taker_runner_id: RunnerId,
taker_price: OddsX10000,
taker_stake: Money,
// presumably the result of `runner_index(taker_runner_id)` — TODO confirm
taker_runner_idx: RunnerIdx,
}
/// Persisted form of [`TwoRunnerBook`].
///
/// Contains every book field except `runner_books`, `active_order_count_cache`,
/// `pending_trade_views` and `scratch`; the `Deserialize` impl of the book
/// rebuilds or resets those.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TwoRunnerBookSnapshot {
market_id: MarketId,
market_name: String,
market_kind: MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
next_order_id: u64,
close_batch_max_events: u16,
runner_ids: [RunnerId; 2],
runner_labels: [String; 2],
removed_runners: BTreeSet<RunnerId>,
batch_process: Option<BatchProcessState>,
// Snapshots without this field deserialize to an empty queue.
#[serde(default)]
queued_batches: Vec<BatchProcessDescriptor>,
orders: OrderStore,
runner_matched_volume: [Money; 2],
}
/// Order book for a market with exactly two runners.
///
/// Commands are validated by `handle_command`, which returns events; the
/// state itself is changed by `apply_event`.
#[derive(Debug)]
pub struct TwoRunnerBook {
market_id: MarketId,
pub(crate) market_name: String,
market_kind: MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
// Id proposed for the next placed order; advanced when `OrderAccepted` is applied.
next_order_id: u64,
// Batch size passed to the close / lapse batch processes.
close_batch_max_events: u16,
// The two runners; the array position is the runner index used everywhere else.
runner_ids: [RunnerId; 2],
// Display labels, parallel to `runner_ids`.
runner_labels: [String; 2],
// Runners that have been removed; their orders are unlinked from the ladders.
removed_runners: BTreeSet<RunnerId>,
// Per-runner price ladders, parallel to `runner_ids`; rebuilt by `rebuild_ladders`.
runner_books: [RunnerBook; 2],
// Batch process currently running, if any.
batch_process: Option<BatchProcessState>,
// Batch processes waiting for the current one to finish.
queued_batches: Vec<BatchProcessDescriptor>,
orders: OrderStore,
// Number of orders in an executable (unmatched / partially matched) state.
active_order_count_cache: usize,
// Matched volume per runner, parallel to `runner_ids`.
runner_matched_volume: [Money; 2],
// First halves of trade pairs waiting for their counterparty event; not
// serialized and not copied by `Clone`.
pending_trade_views: HashMap<TradePairKey, PendingTradeView>,
// Reusable buffers; not serialized and not copied by `Clone`.
scratch: Scratch,
}
impl Serialize for TwoRunnerBook {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
TwoRunnerBookSnapshot {
market_id: self.market_id,
market_name: self.market_name.clone(),
market_kind: self.market_kind,
state: self.state,
market_phase: self.market_phase,
next_order_id: self.next_order_id,
close_batch_max_events: self.close_batch_max_events,
runner_ids: self.runner_ids,
runner_labels: self.runner_labels.clone(),
removed_runners: self.removed_runners.clone(),
batch_process: self.batch_process.clone(),
queued_batches: self.queued_batches.clone(),
orders: self.orders.clone(),
runner_matched_volume: self.runner_matched_volume,
}
.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for TwoRunnerBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = TwoRunnerBookSnapshot::deserialize(deserializer)?;
let mut book = Self {
market_id: snap.market_id,
market_name: snap.market_name,
market_kind: snap.market_kind,
state: snap.state,
market_phase: snap.market_phase,
next_order_id: snap.next_order_id,
close_batch_max_events: snap.close_batch_max_events,
runner_ids: snap.runner_ids,
runner_labels: snap.runner_labels,
removed_runners: snap.removed_runners,
runner_books: [RunnerBook::new(), RunnerBook::new()],
batch_process: snap.batch_process,
queued_batches: snap.queued_batches,
orders: snap.orders,
active_order_count_cache: 0,
runner_matched_volume: snap.runner_matched_volume,
pending_trade_views: HashMap::new(),
scratch: Scratch::default(),
};
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}
/// Candidate order held in `Scratch::matchable`.
///
/// NOTE(review): the code that fills and reads this type is outside the
/// visible chunk — confirm the meaning of `effective_price` there.
#[derive(Debug, Clone)]
struct MatchableOrder {
order_id: OrderId,
effective_price: OddsX10000,
created_at: DateTime,
}
impl Clone for TwoRunnerBook {
    /// Copies the book state, including the ladders and the live-order count.
    ///
    /// The clone starts with an empty `pending_trade_views` map and a fresh
    /// `scratch`; neither is copied from `self`.
    fn clone(&self) -> Self {
        // Exhaustive destructuring: adding a field to the struct forces a
        // decision here about how it is cloned.
        let Self {
            market_id,
            market_name,
            market_kind,
            state,
            market_phase,
            next_order_id,
            close_batch_max_events,
            runner_ids,
            runner_labels,
            removed_runners,
            runner_books,
            batch_process,
            queued_batches,
            orders,
            active_order_count_cache,
            runner_matched_volume,
            pending_trade_views: _,
            scratch: _,
        } = self;

        Self {
            market_id: *market_id,
            market_name: market_name.clone(),
            market_kind: *market_kind,
            state: *state,
            market_phase: *market_phase,
            next_order_id: *next_order_id,
            close_batch_max_events: *close_batch_max_events,
            runner_ids: *runner_ids,
            runner_labels: runner_labels.clone(),
            removed_runners: removed_runners.clone(),
            runner_books: runner_books.clone(),
            batch_process: batch_process.clone(),
            queued_batches: queued_batches.clone(),
            orders: orders.clone(),
            active_order_count_cache: *active_order_count_cache,
            runner_matched_volume: *runner_matched_volume,
            pending_trade_views: HashMap::new(),
            scratch: Scratch::default(),
        }
    }
}
impl TwoRunnerBook {
/// Creates an open, empty book for the two given runners with the default
/// order-store capacity of 20 000 orders.
///
/// # Panics
///
/// Panics under the same conditions as [`Self::new_with_capacity`].
pub fn new(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_a: RunnerId,
    runner_b: RunnerId,
) -> Self {
    const DEFAULT_ORDER_STORE_CAPACITY: usize = 20_000;
    Self::new_with_capacity(
        market_id,
        market_kind,
        market_phase,
        runner_a,
        runner_b,
        DEFAULT_ORDER_STORE_CAPACITY,
    )
}
/// Creates an open, empty book whose order store is created with
/// `order_store_capacity`.
///
/// Runner labels start out as the string form of the runner ids and the
/// market name starts out empty.
///
/// # Panics
///
/// Panics if `assert_kind_supports_phase` rejects the kind/phase combination
/// or if `runner_a == runner_b`.
pub fn new_with_capacity(
    market_id: MarketId,
    market_kind: MarketKind,
    market_phase: MarketPhase,
    runner_a: RunnerId,
    runner_b: RunnerId,
    order_store_capacity: usize,
) -> Self {
    assert_kind_supports_phase(market_kind, market_phase);
    assert_ne!(
        runner_a, runner_b,
        "Two-runner book requires distinct runners"
    );

    let runner_ids = [runner_a, runner_b];
    let runner_labels = runner_ids.map(|runner_id| runner_id.to_string());
    let no_volume = Money::zero();

    Self {
        market_id,
        market_name: String::new(),
        market_kind,
        market_phase,
        state: BookMarketState::Open,
        next_order_id: 1,
        close_batch_max_events: crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS,
        runner_ids,
        runner_labels,
        removed_runners: BTreeSet::new(),
        runner_books: [RunnerBook::new(), RunnerBook::new()],
        batch_process: None,
        queued_batches: Vec::new(),
        orders: OrderStore::with_capacity(order_store_capacity),
        active_order_count_cache: 0,
        runner_matched_volume: [no_volume, Money::zero()],
        pending_trade_views: HashMap::new(),
        scratch: Scratch::default(),
    }
}
/// Returns the position (0 or 1) of `runner_id` in `runner_ids`, or `None`
/// when the runner does not belong to this book.
fn runner_index(&self, runner_id: RunnerId) -> Option<usize> {
    self.runner_ids
        .iter()
        .position(|candidate| runner_id == *candidate)
}
/// Maps runner index 0 to 1 and 1 to 0.
///
/// `idx` must be 0 or 1; any other value underflows the subtraction.
fn opposite_index(idx: usize) -> usize {
    const LAST_RUNNER_IDX: usize = 1;
    LAST_RUNNER_IDX - idx
}
/// Updates the batch size used for close / lapse batch processes.
///
/// The assignment is delegated to `close_process::set_close_batch_max_events`,
/// which receives the current value by mutable reference.
pub(crate) fn set_close_batch_max_events(&mut self, batch_max_events: u16) {
    let current = &mut self.close_batch_max_events;
    crate::book::close_process::set_close_batch_max_events(current, batch_max_events);
}
/// Overwrites the market state directly, without validation and without
/// emitting an event.
pub(crate) fn set_market_state(&mut self, state: BookMarketState) {
self.state = state;
}
/// Returns the batch size currently configured for close / lapse batch processes.
pub(crate) fn close_batch_max_events(&self) -> u16 {
self.close_batch_max_events
}
/// Replaces the display labels of the book's runners.
///
/// `runner_ids[i]` is paired with `runner_labels[i]`; the pairs may come in
/// either order.
///
/// # Panics
///
/// Panics if either slice does not have exactly two elements or if a runner
/// id does not belong to this book.
pub(crate) fn set_runner_labels(&mut self, runner_ids: &[RunnerId], runner_labels: &[String]) {
    assert_eq!(
        runner_ids.len(),
        2,
        "two-runner market requires 2 runner ids"
    );
    assert_eq!(
        runner_labels.len(),
        2,
        "two-runner market requires 2 runner labels"
    );
    for (runner_id, runner_label) in runner_ids.iter().zip(runner_labels) {
        match self.runner_index(*runner_id) {
            Some(idx) => self.runner_labels[idx].clone_from(runner_label),
            None => panic!("unknown runner id {:?} for two-runner market", runner_id),
        }
    }
}
/// Returns the display label of `runner_id`.
///
/// # Panics
///
/// Panics if `runner_id` is not one of the book's two runners.
pub fn runner_label(&self, runner_id: RunnerId) -> &str {
    match self.runner_index(runner_id) {
        Some(idx) => &self.runner_labels[idx],
        None => panic!("unknown runner id {:?} for two-runner market", runner_id),
    }
}
fn rebuild_ladders(&mut self) {
self.runner_books = [RunnerBook::new(), RunnerBook::new()];
self.active_order_count_cache = 0;
let mut per_level: RestingOrdersByLevel = std::collections::HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
if self.removed_runners.contains(&order.runner_id) {
continue;
}
let remaining = order.remaining();
if remaining.0 <= 0 {
continue;
}
let Some(idx) = self.runner_index(order.runner_id) else {
continue;
};
let tick: TickIndex = self.orders.stored_tick(key) as u16;
per_level
.entry((idx, order.info.side, tick))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((idx, _, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
for (_, _, key, remaining) in items {
self.runner_books[idx].insert_tail(&mut self.orders, key, remaining);
}
}
}
/// Returns the id of the market this book belongs to.
pub fn market_id(&self) -> MarketId {
self.market_id
}
/// Returns the market kind the book was created with.
pub fn market_kind(&self) -> MarketKind {
self.market_kind
}
/// Returns the current market state.
pub fn market_state(&self) -> BookMarketState {
self.state
}
/// Returns the current market phase.
pub fn market_phase(&self) -> MarketPhase {
self.market_phase
}
/// Returns whether the current market state reports itself as halted.
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
/// Looks up an order in the order store; `None` if the store has no such order.
pub fn get_order(&self, order_id: OrderId) -> Option<&BookOrder> {
self.orders.get(&order_id)
}
/// Returns whether the order store reports the order as linked into a price level.
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders.is_in_level(&order_id)
}
/// Returns the cached number of orders in an executable state.
///
/// The cache is maintained by `order_accepted`, `trade_matched`,
/// `order_cancelled` and `rebuild_ladders`.
pub fn active_order_count(&self) -> usize {
self.active_order_count_cache
}
/// Collects the next chunk of orders to cancel.
///
/// Orders are read in sorted order starting from `cursor_after` and handed,
/// together with `max_orders` and the `should_cancel` predicate, to
/// `collect_cancelled_orders_chunk`.
fn select_cancelled_chunk(
    &self,
    cursor_after: Option<OrderId>,
    max_orders: usize,
    should_cancel: impl FnMut(&BookOrder) -> bool,
) -> CancelledOrdersChunk {
    let candidates = self.orders.iter_sorted_from(cursor_after);
    collect_cancelled_orders_chunk(candidates, max_orders, should_cancel, |order| {
        &order.info
    })
}
/// Returns the batch process currently in progress, if any.
pub fn batch_process_state(&self) -> Option<&BatchProcessState> {
self.batch_process.as_ref()
}
/// Iterates over the book's two runner ids in index order.
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
    // The id array is `Copy`, so iterating it by value yields the ids directly.
    self.runner_ids.into_iter()
}
/// Returns `true` while a batch process is in progress.
fn has_active_batch_process(&self) -> bool {
    self.batch_process_state().is_some()
}
/// Builds the context attached to a batch process of the given mode.
///
/// Only `BatchMode::Close` carries a context: the number of live orders as
/// counted by `close_process::count_live_orders`. Every other mode gets
/// `BatchProcessContext::None`.
fn batch_context_for_mode(&self, batch_mode: BatchMode) -> BatchProcessContext {
    if !matches!(batch_mode, BatchMode::Close) {
        return BatchProcessContext::None;
    }
    let total_live_orders =
        crate::book::close_process::count_live_orders(self.orders.iter_sorted());
    BatchProcessContext::Close { total_live_orders }
}
fn select_cancelled_chunk_for_target(
&self,
cursor_after: Option<OrderId>,
batch_max_events: usize,
target: &BatchProcessTarget,
) -> CancelledOrdersChunk {
match target {
BatchProcessTarget::AllLiveOrders => self.select_cancelled_chunk(
cursor_after,
batch_max_events,
crate::book::close_process::is_live_order,
),
BatchProcessTarget::LapseOrders => {
self.select_cancelled_chunk(cursor_after, batch_max_events, |order| {
crate::book::close_process::is_live_order(order)
&& order.persistence == Persistence::Lapse
&& !self.removed_runners.contains(&order.runner_id)
})
}
BatchProcessTarget::RunnerRemoval { runner_ids } => {
self.select_cancelled_chunk(cursor_after, batch_max_events, |order| {
crate::book::close_process::is_live_order(order)
&& runner_ids.contains(&order.runner_id)
})
}
BatchProcessTarget::Filtered { runner_filter, .. } => {
self.select_cancelled_chunk(cursor_after, batch_max_events, |order| {
crate::book::close_process::is_live_order(order)
&& matches_filtered_batch_target(
target,
&order.info,
runner_filter.is_none_or(|runner_id| order.runner_id == runner_id),
)
})
}
}
}
/// Returns up to `depth` price levels on each side for `runner_id`.
///
/// Each side merges two sources, best price first:
/// - *direct* levels from the runner's own ladder, and
/// - *implied* levels derived from the opposite runner's ladder, with the
///   price converted by `formulas::back_to_lay_odds` / `lay_to_back_odds` and
///   the size by `formulas::implied_taker_capacity_from_maker`.
///
/// `available_to_back` is ordered from the highest price down and
/// `available_to_lay` from the lowest price up. When consecutive entries have
/// the same price their sizes are added into a single level.
///
/// Both lists are empty when `depth` is 0, when the runner has been removed,
/// or when the runner does not belong to this book.
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
    // `depth` is caller-supplied; a value such as `usize::MAX` ("everything")
    // would make `Vec::with_capacity` panic or over-allocate. The capacity is
    // only a hint, so bound it by the ladder length.
    let capacity = depth.min(crate::types::odds::TICK_LADDER.len());
    let mut result = RunnerPrices {
        runner_id,
        available_to_back: Vec::with_capacity(capacity),
        available_to_lay: Vec::with_capacity(capacity),
    };
    if depth == 0 {
        return result;
    }
    if self.removed_runners.contains(&runner_id) {
        return result;
    }
    let Some(idx) = self.runner_index(runner_id) else {
        return result;
    };
    let opposite_idx = Self::opposite_index(idx);
    let runner_book = &self.runner_books[idx];
    let opposite_book = &self.runner_books[opposite_idx];

    // ---- Back side: highest price first. ----
    // Cursors into the two sources plus a one-element look-ahead for each.
    let mut direct_back_tick = runner_book.best_tick_desc(Side::No);
    let mut implied_back_tick = opposite_book.best_tick_asc(Side::Yes);
    let mut direct_back: Option<(OddsX10000, Money)> = None;
    let mut implied_back: Option<(OddsX10000, Money)> = None;
    while result.available_to_back.len() < depth {
        // Refill the direct look-ahead with the next non-empty level.
        if direct_back.is_none() {
            while let Some(t) = direct_back_tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                let size = runner_book.level_total_remaining(Side::No, t);
                // Tick 0 is the last one when walking downwards.
                direct_back_tick = if t == 0 {
                    None
                } else {
                    runner_book.next_tick_desc_from(Side::No, t - 1)
                };
                if size.0 > 0 {
                    direct_back = Some((px, size));
                    break;
                }
            }
        }
        // Refill the implied look-ahead, skipping levels with no usable capacity.
        if implied_back.is_none() {
            while let Some(t) = implied_back_tick {
                let maker_px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                let maker_size = opposite_book.level_total_remaining(Side::Yes, t);
                implied_back_tick = opposite_book.next_tick_asc_from(Side::Yes, t + 1);
                if maker_size.0 > 0 {
                    let implied_px = formulas::back_to_lay_odds(maker_px);
                    let taker_cap =
                        formulas::implied_taker_capacity_from_maker(maker_size, implied_px);
                    if taker_cap.0 <= 0 {
                        continue;
                    }
                    implied_back = Some((implied_px, taker_cap));
                    break;
                }
            }
        }
        // Take the higher of the two look-aheads; direct wins ties.
        let next = match (direct_back, implied_back) {
            (Some(d), Some(i)) => {
                if d.0.0 >= i.0.0 {
                    direct_back = None;
                    d
                } else {
                    implied_back = None;
                    i
                }
            }
            (Some(d), None) => {
                direct_back = None;
                d
            }
            (None, Some(i)) => {
                implied_back = None;
                i
            }
            (None, None) => break,
        };
        // Same price as the previous entry: fold the size into that level.
        if let Some(last) = result.available_to_back.last_mut()
            && last.price == next.0
        {
            last.size = Money(last.size.0 + next.1.0);
            continue;
        }
        result.available_to_back.push(PriceSize {
            price: next.0,
            size: next.1,
        });
    }

    // ---- Lay side: lowest price first (mirror image of the back side). ----
    let mut direct_lay_tick = runner_book.best_tick_asc(Side::Yes);
    let mut implied_lay_tick = opposite_book.best_tick_desc(Side::No);
    let mut direct_lay: Option<(OddsX10000, Money)> = None;
    let mut implied_lay: Option<(OddsX10000, Money)> = None;
    while result.available_to_lay.len() < depth {
        if direct_lay.is_none() {
            while let Some(t) = direct_lay_tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                let size = runner_book.level_total_remaining(Side::Yes, t);
                direct_lay_tick = runner_book.next_tick_asc_from(Side::Yes, t + 1);
                if size.0 > 0 {
                    direct_lay = Some((px, size));
                    break;
                }
            }
        }
        if implied_lay.is_none() {
            while let Some(t) = implied_lay_tick {
                let maker_px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                let maker_size = opposite_book.level_total_remaining(Side::No, t);
                implied_lay_tick = if t == 0 {
                    None
                } else {
                    opposite_book.next_tick_desc_from(Side::No, t - 1)
                };
                if maker_size.0 > 0 {
                    let implied_px = formulas::lay_to_back_odds(maker_px);
                    let taker_cap =
                        formulas::implied_taker_capacity_from_maker(maker_size, implied_px);
                    if taker_cap.0 <= 0 {
                        continue;
                    }
                    implied_lay = Some((implied_px, taker_cap));
                    break;
                }
            }
        }
        // Take the lower of the two look-aheads; direct wins ties.
        let next = match (direct_lay, implied_lay) {
            (Some(d), Some(i)) => {
                if d.0.0 <= i.0.0 {
                    direct_lay = None;
                    d
                } else {
                    implied_lay = None;
                    i
                }
            }
            (Some(d), None) => {
                direct_lay = None;
                d
            }
            (None, Some(i)) => {
                implied_lay = None;
                i
            }
            (None, None) => break,
        };
        if let Some(last) = result.available_to_lay.last_mut()
            && last.price == next.0
        {
            last.size = Money(last.size.0 + next.1.0);
            continue;
        }
        result.available_to_lay.push(PriceSize {
            price: next.0,
            size: next.1,
        });
    }
    result
}
/// Returns the first `available_to_back` level of `runner_prices(runner_id, 1)`,
/// or `None` when that list is empty.
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_back
        .into_iter()
        .next()
}
/// Returns the first `available_to_lay` level of `runner_prices(runner_id, 1)`,
/// or `None` when that list is empty.
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<PriceSize> {
    self.runner_prices(runner_id, 1)
        .available_to_lay
        .into_iter()
        .next()
}
/// Returns the matched volume recorded for `runner_id`, or zero when the
/// runner does not belong to this book.
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
    match self.runner_index(runner_id) {
        Some(idx) => self.runner_matched_volume[idx],
        None => Money::zero(),
    }
}
/// Returns the sum of the matched volume of both runners.
pub fn total_matched(&self) -> Money {
    let [first, second] = self.runner_matched_volume;
    Money(first.0 + second.0)
}
/// Validates a command against the current book and turns it into events.
///
/// Returns the events to be applied (via `apply_event`) together with the
/// response for the caller. Apart from lending out `self.scratch` for the
/// duration of the call, the visible code does not assign to book fields here.
///
/// Processing order:
/// 1. reject commands addressed to another market (`MarketIdMismatch`);
/// 2. handle `HaltMarket` / `ResumeMarket` directly, before the book gate;
/// 3. run `validate_book_gate` with the batch-process state and halted flag;
/// 4. dispatch every other command to its `cmd_*` helper.
///
/// # Errors
///
/// Returns a `BookError` carrying the command's correlation id and the reject
/// reason. State and phase transitions use `BookError::state_error`, which
/// additionally records the current state, current phase and requested state.
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(EventVec, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
// Error builders; they capture copies of the state so they stay usable
// inside the closure below while `self` is borrowed mutably.
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
let current_state = self.state;
let current_phase = self.market_phase;
let state_err = |reason, requested_state| {
BookError::state_error(
err_correlation_id.clone(),
reason,
current_state,
current_phase,
requested_state,
)
};
// All events produced by this command share one wall-clock timestamp.
let ts = Utc::now();
let market_id = cmd.market_id;
if market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
// Halt and resume bypass the book gate below.
match &cmd.kind {
CommandKind::HaltMarket { reason, .. } => {
let events = self
.cmd_halt_market(ts, reason)
.map_err(|reason| state_err(reason, RequestedBookState::Halted))?;
return Ok((
events,
CommandResponse {
correlation_id,
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events,
CommandResponse {
correlation_id,
kind: None,
},
));
}
_ => {}
}
cmd.kind
.validate_book_gate(self.batch_process_state(), self.state.is_halted())
.map_err(&err)?;
// Move the scratch buffers out of `self` so the helpers can take them as
// `&mut` next to `self`; they are put back after the closure has run,
// whether it succeeded or not.
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(EventVec, CommandResponse), BookError> {
let (events, kind) = match &cmd.kind {
CommandKind::CreateMarket { .. } => {
return Err(err(RejectReason::InternalError));
}
// Runner set changes are not supported by this book model.
CommandKind::AddRunners { .. } | CommandKind::ChangeRunners { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::SetMarketState { state, .. } => {
let reason = "SET_MARKET_STATE";
match state {
MarketState::Open => self
.cmd_open_market(&mut scratch, ts, reason)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
MarketState::Suspended => self
.cmd_suspend(&mut scratch, ts, reason)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
MarketState::Deactivated => self
.cmd_deactivate(&mut scratch, ts, reason)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
MarketState::Closed => self
.cmd_close_market("SET_MARKET_STATE", ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
}
}
CommandKind::AwaitLiveMarket => self
.cmd_await_live_market(&mut scratch, ts, "AWAIT_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::AwaitLive))?,
CommandKind::ReturnToPreMarket { reason } => self
.cmd_return_to_pre_market(&mut scratch, ts, reason.as_str())
.map_err(|reason| state_err(reason, RequestedBookState::Pre))?,
CommandKind::GoLiveMarket => self
.cmd_go_live_market(&mut scratch, ts, "GO_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::Live))?,
CommandKind::CloseMarket { reason } => self
.cmd_close_market(reason, ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(MarketState::Closed))
})?,
CommandKind::ContinueBatchProcess => {
self.cmd_continue_batch_process(ts).map_err(&err)?
}
CommandKind::BatchCancelOrders {
from_created_at_inclusive,
to_created_at_inclusive,
account_id,
runner_id,
reason,
} => self
.cmd_batch_cancel_orders(
&mut scratch,
ts,
*from_created_at_inclusive,
*to_created_at_inclusive,
account_id.clone(),
*runner_id,
reason.as_str(),
)
.map_err(&err)?,
CommandKind::PlaceOrder {
account_id,
runner_id,
side,
odds,
stake,
persistence,
time_in_force,
..
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// The id is only proposed here; `next_order_id` is advanced when the
// resulting `OrderAccepted` event is applied.
let order_id = OrderId(self.next_order_id);
self.cmd_place_order(
&mut scratch,
ts,
correlation_id.clone(),
order_id,
account_id.clone(),
*runner_id,
*side,
*odds,
*stake,
*persistence,
*time_in_force,
)
.map_err(&err)?
}
// Binary-order commands belong to a different book model.
CommandKind::PlaceBinaryOrder { .. } | CommandKind::ReduceBinaryOrder { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::CancelOrder {
order_id,
account_id,
..
} => self
.cmd_cancel_order(
&mut scratch,
ts,
*order_id,
account_id.clone(),
CancelCause::UserCancel,
)
.map_err(&err)?,
CommandKind::ReduceOrder {
order_id,
account_id,
new_odds,
target,
condition,
..
} => {
// Reducing is gated like placing a new order.
ensure_can_accept_new_orders(self.state).map_err(&err)?;
self.cmd_reduce_order(
&mut scratch,
ts,
correlation_id.clone(),
*order_id,
OrderId(self.next_order_id),
account_id.clone(),
*new_odds,
*target,
condition.as_ref(),
)
.map_err(&err)?
}
CommandKind::RemoveRunner {
runner_id,
reduction_factor_bps,
..
} => self
.cmd_remove_runner(&mut scratch, ts, *runner_id, *reduction_factor_bps)
.map_err(&err)?,
CommandKind::RemoveRunners {
runner_ids,
reduction_factor_bps,
..
} => self
.cmd_remove_runners(&mut scratch, ts, runner_ids, *reduction_factor_bps)
.map_err(&err)?,
CommandKind::VoidTrades {
timestamp,
start_time,
end_time,
void_reason,
..
} => self
.cmd_void_trades(
&mut scratch,
*timestamp,
*start_time,
*end_time,
void_reason.as_str(),
)
.map_err(&err)?,
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("handled above")
}
};
Ok((
events,
CommandResponse {
correlation_id: correlation_id.clone(),
kind,
},
))
})();
// Hand the (possibly grown) buffers back for reuse by the next command.
self.scratch = scratch;
result
}
/// Applies one event to the book state.
///
/// Batch-process events are offered first to
/// `apply_batch_process_state_event` and then to
/// `apply_batch_cancelled_chunk_event`; if either reports the event as
/// handled, nothing else is done. All remaining events are dispatched in the
/// `match` below. Events that concern other book models or need no state
/// change here are ignored.
pub(crate) fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
let event = &env.event;
// Temporarily move the batch state out of `self` so the helpers can mutate
// it while their callbacks borrow `self`.
let mut batch_process = self.batch_process.take();
let mut queued_batches = std::mem::take(&mut self.queued_batches);
let handled = if apply_batch_process_state_event(
&mut batch_process,
&mut queued_batches,
event,
|batch_mode| self.batch_context_for_mode(batch_mode),
) {
true
} else {
apply_batch_cancelled_chunk_event(&mut batch_process, event, |order_id| {
self.order_cancelled(order_id)
})
};
self.batch_process = batch_process;
self.queued_batches = queued_batches;
if handled {
return;
}
match event {
BookEvent::MarketCreated { .. } => {}
BookEvent::RunnersAdded { .. } => {}
BookEvent::RunnersRemoved { runner_ids, .. } => {
self.apply_runners_removed(runner_ids);
}
BookEvent::MarketStateChanged {
to,
close_batch_max_events: _,
..
} => self.market_state_changed(*to),
BookEvent::MarketPhaseChanged { to, reason: _ } => self.market_phase_changed(*to),
BookEvent::OrderAccepted {
correlation_id,
order_id,
account_id,
runner_id,
side,
price,
stake,
persistence,
..
} => {
// Keep the id counter ahead of every accepted order id.
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
self.order_accepted(
ts,
*order_id,
account_id.clone(),
correlation_id.clone(),
*runner_id,
*side,
*price,
*stake,
*persistence,
)
}
BookEvent::BinaryOrderAccepted { .. } => {}
BookEvent::OrderCancelled {
cancelled_order, ..
} => {
self.order_cancelled(cancelled_order.order_id);
}
BookEvent::TradeMatched {
order_id,
role,
runner_id,
counter_party,
remaining_stake,
matched_delta,
..
} => self.trade_matched(
ts,
TradeMatchView {
order_id: *order_id,
role: *role,
runner_id: *runner_id,
counter_party: *counter_party,
remaining_stake: *remaining_stake,
matched_delta: *matched_delta,
},
),
BookEvent::BinaryTradeMatched { .. } => {}
BookEvent::VoidTrades { .. } => {}
BookEvent::RunnerRemoved { runner_id, .. } => {
self.apply_runners_removed(&[*runner_id]);
}
BookEvent::MarketRemoved { .. } => {}
// These are expected to have been consumed by the two batch helpers above.
BookEvent::OrderCancelledBatched { .. }
| BookEvent::BatchProcessQueued { .. }
| BookEvent::BatchProcessStarted { .. }
| BookEvent::BatchProcessRetargeted { .. }
| BookEvent::BatchProcessCompleted { .. } => {
unreachable!("batch process events handled before match")
}
}
}
/// Applies a `MarketStateChanged` event: stores the new state without validation.
pub(crate) fn market_state_changed(&mut self, to: BookMarketState) {
self.state = to;
}
/// Applies a `MarketPhaseChanged` event.
///
/// # Panics
///
/// Panics if the market kind does not allow the transition from the current
/// phase to `to`.
pub(crate) fn market_phase_changed(&mut self, to: MarketPhase) {
    let transition_allowed = self
        .market_kind
        .can_transition_to_phase(self.market_phase, to);
    assert!(
        transition_allowed,
        "market kind does not support applied phase transition"
    );
    self.market_phase = to;
}
/// Applies an `OrderAccepted` event: stores the order as `ExecutableUnmatched`
/// and links it at the tail of its price level.
///
/// The call is a no-op when the order id is already stored or the runner
/// does not belong to this book. `ts` becomes both the creation and the
/// last-update time of the order.
// One parameter per event field.
#[allow(clippy::too_many_arguments)]
pub(crate) fn order_accepted(
    &mut self,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    correlation_id: Option<CorrelationId>,
    runner_id: RunnerId,
    side: Side,
    price: OddsX10000,
    stake: Money,
    persistence: Persistence,
) {
    if self.orders.contains(&order_id) {
        return;
    }
    let Some(runner_idx) = self.runner_index(runner_id) else {
        return;
    };

    let info = BookOrderInfo {
        order_id,
        account_id,
        correlation_id,
        side,
        state: BookOrderState::ExecutableUnmatched,
        created_at: ts,
        last_updated_at: ts,
    };
    let new_order = BookOrder {
        info,
        runner_id,
        price,
        stake,
        matched: Money::zero(),
        persistence,
    };
    if let Err(err) = self.orders.insert(order_id, new_order) {
        error!(order_id = ?order_id, reason = ?err, "failed to insert order");
        return;
    }
    self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);

    let Some(key) = self.orders.get_key(&order_id) else {
        error!(order_id = ?order_id, "missing order key after insert");
        return;
    };
    // A new order has nothing matched yet, so its full stake rests on the level.
    self.runner_books[runner_idx].insert_tail(&mut self.orders, key, stake);
}
/// Builds the map key shared by both sides of a trade: the two order ids
/// with the smaller one first, regardless of argument order.
fn trade_pair_key(order_id: OrderId, counter_party: OrderId) -> TradePairKey {
    if counter_party < order_id {
        (counter_party, order_id)
    } else {
        (order_id, counter_party)
    }
}
/// Applies one side of a trade (a `TradeMatched` event) to the book.
///
/// Per call this:
/// 1. adds `matched_delta` to the order and moves it to `ExecutionComplete`
///    (nothing remaining) or `ExecutablePartiallyMatched`;
/// 2. updates the order's price level via `update_level_after_fill`;
/// 3. removes a fully matched order from the store and, if it was live
///    before, from the active-order count;
/// 4. pairs the event with its counterparty event: the first of the two is
///    parked in `pending_trade_views`; when the second arrives, the runner
///    matched volume is updated.
///
/// # Panics
///
/// Panics if a parked entry for the pair does not belong to `m.counter_party`.
fn trade_matched(&mut self, ts: DateTime, m: TradeMatchView) {
// Must be read before the state is overwritten below.
let was_live = self.orders.get(&m.order_id).is_some_and(|o| {
matches!(
o.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
)
});
if let Some(order) = self.orders.get_mut(&m.order_id) {
order.matched = Money(order.matched.0 + m.matched_delta.0);
order.info.last_updated_at = ts;
order.info.state = if m.remaining_stake.0 == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
if let Some(order) = self.orders.get(&m.order_id)
&& let Some(runner_idx) = self.runner_index(order.runner_id)
{
self.update_level_after_fill(m.order_id, runner_idx, m.matched_delta);
}
if m.remaining_stake.0 == 0 {
if was_live {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
self.orders.remove(&m.order_id);
}
let current = PendingTradeView {
order_id: m.order_id,
role: m.role,
runner_id: m.runner_id,
matched_delta: m.matched_delta,
};
// Both sides of a trade produce the same key.
let key = Self::trade_pair_key(m.order_id, m.counter_party);
if let Some(first) = self.pending_trade_views.remove(&key) {
assert_eq!(first.order_id, m.counter_party);
let (maker, taker) = match (first.role, current.role) {
(TradeRole::Maker, TradeRole::Taker) => (first, current),
(TradeRole::Taker, TradeRole::Maker) => (current, first),
_ => {
// Release builds drop the malformed pair without counting volume.
debug_assert!(
false,
"trade pair must contain exactly one maker and one taker"
);
return;
}
};
// The taker's amount always counts towards the taker's runner.
if let Some(taker_idx) = self.runner_index(taker.runner_id) {
self.runner_matched_volume[taker_idx] =
Money(self.runner_matched_volume[taker_idx].0 + taker.matched_delta.0);
}
// The maker's amount is counted separately only when the maker sits on
// the other runner; a same-runner trade is counted once.
if maker.runner_id != taker.runner_id
&& let Some(maker_idx) = self.runner_index(maker.runner_id)
{
self.runner_matched_volume[maker_idx] =
Money(self.runner_matched_volume[maker_idx].0 + maker.matched_delta.0);
}
} else {
self.pending_trade_views.insert(key, current);
}
}
/// Applies an order cancellation: unlinks the order from its price level and
/// removes it from the order store.
///
/// The active-order count is decremented only if the order was stored in an
/// executable state. The unlink and removal calls are made even when the
/// order is unknown.
pub(crate) fn order_cancelled(&mut self, order_id: OrderId) {
    let was_live = self.orders.get(&order_id).is_some_and(|order| {
        matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        )
    });
    if was_live {
        self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
    }
    self.remove_from_levels(order_id);
    self.orders.remove(&order_id);
}
/// Applies a runner removal: records every listed runner that belongs to
/// this book in `removed_runners`, then unlinks the live orders of the listed
/// runners from the ladders. The orders stay in the order store.
fn apply_runners_removed(&mut self, runner_ids: &[RunnerId]) {
    for &runner_id in runner_ids {
        // Ids of runners that are not part of this book are not recorded.
        if self.runner_index(runner_id).is_none() {
            continue;
        }
        self.removed_runners.insert(runner_id);
    }
    self.unlink_live_orders_for_runners(runner_ids);
}
/// Unlinks every live order on one of `runner_ids` from its price level.
///
/// The orders are only passed to `remove_from_levels`; they are not removed
/// from the order store here.
fn unlink_live_orders_for_runners(&mut self, runner_ids: &[RunnerId]) {
    let affected_runners: BTreeSet<_> = runner_ids.iter().copied().collect();

    // Collect the ids first: unlinking needs `&mut self`, which cannot be
    // taken while the store is being iterated.
    let mut to_unlink: Vec<OrderId> = Vec::new();
    for (order_id, order) in self.orders.iter_sorted() {
        let is_live = matches!(
            order.info.state,
            BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
        );
        if is_live && affected_runners.contains(&order.runner_id) {
            to_unlink.push(order_id);
        }
    }

    for order_id in to_unlink {
        self.remove_from_levels(order_id);
    }
}
/// Wraps `event` in an envelope for this market.
///
/// The envelope carries the market id and name, `event_time` as its
/// timestamp, default metadata and a `market_seq` of 0.
fn emit(&self, event_time: DateTime, event: BookEvent) -> BookEventEnvelope {
    let market_id = self.market_id;
    let market_name = self.market_name.clone();
    BookEventEnvelope {
        market_id,
        market_name,
        market_seq: 0,
        timestamp: event_time,
        metadata: EventMetadata::default(),
        event,
    }
}
/// Appends a `MarketStateChanged` event to `out` unless the market is already
/// in state `to`. The book state itself is not modified.
fn state_change(
    &self,
    ts: DateTime,
    to: BookMarketState,
    reason: &str,
    close_batch_max_events: Option<u16>,
    out: &mut EventVec,
) {
    if self.state != to {
        let event = BookEvent::MarketStateChanged {
            to,
            reason: String::from(reason),
            close_batch_max_events,
        };
        out.push(self.emit(ts, event));
    }
}
/// Appends a `MarketPhaseChanged` event to `out` unless the market is already
/// in phase `to`. The book phase itself is not modified.
fn phase_change(&self, ts: DateTime, to: MarketPhase, reason: &str, out: &mut EventVec) {
    if self.market_phase != to {
        let event = BookEvent::MarketPhaseChanged {
            to,
            reason: String::from(reason),
        };
        out.push(self.emit(ts, event));
    }
}
/// Wraps `events` into a successful command result that carries no response kind.
fn ok_noop(events: EventVec) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
Ok((events, None))
}
/// Plans the transition of the market to `Open`.
///
/// Produces a `MarketStateChanged` event unless the market is already open.
///
/// # Errors
///
/// Returns whatever `ensure_state_change` rejects, or `MarketTerminal` when
/// the current state is terminal.
fn cmd_open_market(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();

    let target = BookMarketState::Open;
    ensure_state_change(self.state, target)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    self.state_change(ts, target, reason, None, &mut scratch.events);
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Plans the move of the market into the "await live" phase.
///
/// `plan_await_live_market_with_lapse` decides which of the following steps
/// are taken, in this order: suspend the market, switch the phase to
/// `PreAwaitLive`, start (or queue) an `InPlayLapse` batch over the lapse orders.
///
/// # Errors
///
/// Returns the reject reason produced by the planner.
fn cmd_await_live_market(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();

    let steps =
        plan_await_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;

    if steps.suspend_market {
        let suspended = BookMarketState::Suspended;
        self.state_change(ts, suspended, reason, None, &mut scratch.events);
    }
    if steps.set_pre_await_live {
        self.phase_change(ts, MarketPhase::PreAwaitLive, reason, &mut scratch.events);
    }
    if steps.start_lapse {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::InPlayLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut scratch.events,
            |event| self.emit(ts, event),
            |cursor, limit, batch_target| {
                self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
            },
        );
    }

    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Plans the move of the market into the `Live` phase.
///
/// After the planner accepts the transition, a phase-change event is produced
/// (unless the phase is already `Live`) and an `InPlayLapse` batch over the
/// lapse orders is started or queued.
///
/// # Errors
///
/// Returns the reject reason produced by `plan_go_live_market_with_lapse`.
fn cmd_go_live_market(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();

    // Only the planner's verdict is needed here; its plan value is not used.
    plan_go_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;

    self.phase_change(ts, MarketPhase::Live, reason, &mut scratch.events);
    maybe_start_or_queue_batch(
        self.batch_process.as_ref(),
        &self.queued_batches,
        BatchMode::InPlayLapse,
        self.close_batch_max_events,
        &BatchProcessTarget::LapseOrders,
        None,
        &mut scratch.events,
        |event| self.emit(ts, event),
        |cursor, limit, batch_target| {
            self.select_cancelled_chunk_for_target(cursor, limit, batch_target)
        },
    );

    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Plans the return of the market to the `Pre` phase.
///
/// Produces a phase-change event unless the phase is already `Pre`.
///
/// # Errors
///
/// Returns the reject reason produced by `plan_return_to_pre_market`.
fn cmd_return_to_pre_market(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();

    // Only the planner's verdict is needed here; its plan value is not used.
    plan_return_to_pre_market(self.state, self.market_phase, self.market_kind)?;

    self.phase_change(ts, MarketPhase::Pre, reason, &mut scratch.events);
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Suspends the market.
///
/// Accepted only from `Open` or `Deactivated`; terminal states are rejected with
/// `MarketTerminal` and every other state with `MarketNotOpen`. When the market was
/// `Open`, a `SuspendLapse` batch targeting `LapseOrders` is additionally started or
/// queued after the state change has been recorded.
fn cmd_suspend(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    ensure_state_change(self.state, BookMarketState::Suspended)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if !matches!(
        self.state,
        BookMarketState::Open | BookMarketState::Deactivated
    ) {
        return Err(RejectReason::MarketNotOpen);
    }
    // `self` is borrowed immutably, so the state observed here is still the
    // pre-transition state after `state_change` has been called.
    let was_open = self.state == BookMarketState::Open;
    self.state_change(
        ts,
        BookMarketState::Suspended,
        reason,
        None,
        &mut scratch.events,
    );
    if was_open {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::SuspendLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut scratch.events,
            |book_event| self.emit(ts, book_event),
            |after, limit, batch_target| {
                self.select_cancelled_chunk_for_target(after, limit, batch_target)
            },
        );
    }
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Deactivates the market.
///
/// Accepted only from `Open` or `Suspended`; terminal states are rejected with
/// `MarketTerminal` and every other state with `MarketNotOpen`. When the market was
/// `Open`, a `SuspendLapse` batch targeting `LapseOrders` is started or queued after
/// the state change has been recorded.
fn cmd_deactivate(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    reason: &'static str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    ensure_state_change(self.state, BookMarketState::Deactivated)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    let allowed_source = matches!(
        self.state,
        BookMarketState::Open | BookMarketState::Suspended
    );
    if !allowed_source {
        return Err(RejectReason::MarketNotOpen);
    }
    self.state_change(
        ts,
        BookMarketState::Deactivated,
        reason,
        None,
        &mut scratch.events,
    );
    // `self.state` still holds the pre-transition value: this method only has `&self`.
    let was_open = self.state == BookMarketState::Open;
    if was_open {
        maybe_start_or_queue_batch(
            self.batch_process.as_ref(),
            &self.queued_batches,
            BatchMode::SuspendLapse,
            self.close_batch_max_events,
            &BatchProcessTarget::LapseOrders,
            None,
            &mut scratch.events,
            |book_event| self.emit(ts, book_event),
            |after, limit, batch_target| {
                self.select_cancelled_chunk_for_target(after, limit, batch_target)
            },
        );
    }
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Halts the market.
///
/// Rejects with `MarketAlreadyHalted` when the market is already halted (checked first)
/// and with `MarketTerminal` when it is in a terminal state. Otherwise the transition
/// to `BookMarketState::Halted` is passed to `state_change` and the resulting events
/// are returned.
fn cmd_halt_market(&self, ts: DateTime, reason: &str) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        Err(RejectReason::MarketAlreadyHalted)
    } else if self.state.is_terminal() {
        Err(RejectReason::MarketTerminal)
    } else {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Halted, reason, None, &mut out);
        Ok(out)
    }
}
/// Resumes a halted market back to `BookMarketState::Open`.
///
/// Rejects with `MarketTerminal` for terminal states (checked first) and with
/// `MarketNotHalted` when the market is not currently halted. The state change is
/// recorded with the fixed reason string `"RESUME"`.
fn cmd_resume_market(&self, ts: DateTime) -> Result<EventVec, RejectReason> {
    if self.state.is_terminal() {
        Err(RejectReason::MarketTerminal)
    } else if !self.state.is_halted() {
        Err(RejectReason::MarketNotHalted)
    } else {
        let mut out = EventVec::new();
        self.state_change(ts, BookMarketState::Open, "RESUME", None, &mut out);
        Ok(out)
    }
}
/// Closes the market and starts (or retargets) the close batch over all live orders.
///
/// Rejections, in order: whatever `ensure_state_change` reports for a move to `Closed`,
/// `MarketTerminal` for terminal states, and `MarketBatchCancelling` when the current
/// batch process state reports `is_close`. `batch_max_events` is forwarded both to the
/// state change and to the batch start.
fn cmd_close_market(
    &mut self,
    reason: &str,
    ts: DateTime,
    batch_max_events: u16,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    ensure_state_change(self.state, BookMarketState::Closed)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    let close_batch_running = self
        .batch_process_state()
        .map(BatchProcessState::is_close)
        .unwrap_or(false);
    if close_batch_running {
        return Err(RejectReason::MarketBatchCancelling);
    }
    let mut out = EventVec::new();
    self.state_change(
        ts,
        BookMarketState::Closed,
        reason,
        Some(batch_max_events),
        &mut out,
    );
    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::Close,
        batch_max_events,
        &BatchProcessTarget::AllLiveOrders,
        None,
        &mut out,
        |book_event| self.emit(ts, book_event),
        |after, limit, batch_target| {
            self.select_cancelled_chunk_for_target(after, limit, batch_target)
        },
    );
    Self::ok_noop(out)
}
/// Advances the current batch process by delegating to
/// `continue_batch_process_with_queue`, passing it the current batch state and the
/// queued batches.
///
/// Any rejection from the helper is propagated; otherwise the events it produced are
/// returned through `Self::ok_noop`.
fn cmd_continue_batch_process(
    &self,
    ts: DateTime,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let mut out = EventVec::new();
    continue_batch_process_with_queue(
        self.batch_process_state(),
        &self.queued_batches,
        &mut out,
        |book_event| self.emit(ts, book_event),
        |after, limit, batch_target| {
            self.select_cancelled_chunk_for_target(after, limit, batch_target)
        },
    )?;
    Self::ok_noop(out)
}
/// Starts a filtered batch cancellation.
///
/// Rejected with `MarketBatchCancelling` while another batch process is active. The
/// optional creation-time bounds and the command timestamp are converted to epoch
/// milliseconds and, together with the optional account and runner filters, turned
/// into a batch target by `filtered_batch_target`. The batch runs in
/// `BatchMode::FilteredCancel` with `close_batch_max_events` as its chunk limit and
/// carries `reason`.
#[allow(clippy::too_many_arguments)]
fn cmd_batch_cancel_orders(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    from_created_at_inclusive: Option<DateTime>,
    to_created_at_inclusive: Option<DateTime>,
    account_filter: Option<AccountId>,
    runner_filter: Option<RunnerId>,
    reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }
    let batch_target = filtered_batch_target(
        ts.timestamp_millis(),
        from_created_at_inclusive.map(|bound| bound.timestamp_millis()),
        to_created_at_inclusive.map(|bound| bound.timestamp_millis()),
        account_filter,
        runner_filter,
    );
    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::FilteredCancel,
        self.close_batch_max_events,
        &batch_target,
        Some(reason),
        &mut scratch.events,
        |book_event| self.emit(ts, book_event),
        |after, limit, chunk_target| {
            self.select_cancelled_chunk_for_target(after, limit, chunk_target)
        },
    );
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Validates and plans the placement of a new order.
///
/// Validation rejects, in order: an unknown or removed runner (`RunnerNotFound`), an
/// invalid odds tick, a non-positive stake (`InvalidStake`), a duplicate order id
/// (`Duplicate`), and — when the time-in-force reports a required fill-or-kill
/// quantity — insufficient liquidity according to `available_to_match`
/// (`WouldNotFillFok`).
///
/// On success an `OrderAccepted` event is emitted, followed by whatever
/// `plan_matching` appends, followed by an `OrderCancelled` event if
/// `time_in_force_remainder_cancel_cause` yields a cause for the unmatched remainder.
/// The response reports the matched stake, average matched price, remaining stake and
/// final order state.
///
/// NOTE(review): the fill-or-kill pre-check relies on `available_to_match`, which sums
/// level totals without regard to account, while `plan_matching` skips makers owned by
/// the taker's account — confirm that the two cannot disagree in practice.
#[allow(clippy::too_many_arguments)]
fn cmd_place_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    correlation_id: Option<CorrelationId>,
    order_id: OrderId,
    account_id: AccountId,
    runner_id: RunnerId,
    side: Side,
    price: OddsX10000,
    stake: Money,
    persistence: Persistence,
    time_in_force: TimeInForce,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    debug_assert!(self.state.is_matchable());

    // --- validation -------------------------------------------------------
    let runner_idx = self
        .runner_index(runner_id)
        .ok_or(RejectReason::RunnerNotFound)?;
    if self.removed_runners.contains(&runner_id) {
        return Err(RejectReason::RunnerNotFound);
    }
    ensure_valid_odds_tick(price)?;
    if !stake.is_positive() {
        return Err(RejectReason::InvalidStake);
    }
    if self.orders.contains(&order_id) {
        return Err(RejectReason::Duplicate);
    }
    let stake_qty = Quantity(u64::try_from(stake.0).map_err(|_| RejectReason::InvalidStake)?);
    if let Some(required_qty) = time_in_force.required_fok_fill(stake_qty)? {
        let required = i64::try_from(required_qty.0).map_err(|_| RejectReason::InvalidStake)?;
        if self.available_to_match(runner_idx, side, price).0 < required {
            return Err(RejectReason::WouldNotFillFok);
        }
    }

    // --- acceptance -------------------------------------------------------
    let accepted = BookEvent::OrderAccepted {
        correlation_id: correlation_id.clone(),
        order_id,
        account_id: account_id.clone(),
        runner_id,
        runner_label: self.runner_label(runner_id).to_string(),
        side,
        price,
        stake,
        persistence,
        time_in_force,
    };
    scratch.events.push(self.emit(ts, accepted));

    // --- matching ---------------------------------------------------------
    let request = MatchRequest {
        ts,
        taker_order_id: order_id,
        taker_account_id: account_id.clone(),
        taker_correlation_id: correlation_id.clone(),
        taker_side: side,
        taker_runner_id: runner_id,
        taker_price: price,
        taker_stake: stake,
        taker_runner_idx: runner_idx,
    };
    let (matched, avg_price) = self.plan_matching(scratch, request);

    // --- remainder handling -----------------------------------------------
    let unfilled = Money(stake.0.saturating_sub(matched.0));
    let (final_remaining, final_state) =
        match time_in_force_remainder_cancel_cause(time_in_force, unfilled.0 > 0) {
            Some(cancel_cause) => {
                // The time-in-force does not allow the remainder to rest.
                let cancelled = BookEvent::OrderCancelled {
                    cancelled_order: CancelledOrderEntry {
                        order_id,
                        account_id,
                        correlation_id,
                    },
                    cancel_cause,
                    cause_detail: None,
                };
                scratch.events.push(self.emit(ts, cancelled));
                (Money::zero(), BookOrderState::Cancelled)
            }
            None => {
                let state = if unfilled.0 == 0 {
                    BookOrderState::ExecutionComplete
                } else if matched.0 > 0 {
                    BookOrderState::ExecutablePartiallyMatched
                } else {
                    BookOrderState::ExecutableUnmatched
                };
                (unfilled, state)
            }
        };

    let response = CommandResponseKind::PlaceOrder(PlaceOrderResult {
        accepted: true,
        order_id,
        matched: FillQuantity::Stake(matched),
        avg_matched_price: avg_price.map(FillPrice::Odds),
        remaining: FillQuantity::Stake(final_remaining),
        final_order_state: Some(final_state),
    });
    Ok((std::mem::take(&mut scratch.events), Some(response)))
}
/// Estimates how much taker stake could match immediately at `price` or better.
///
/// Two liquidity sources are summed:
/// * direct — the opposite side of the taker's own runner book, taken as raw level
///   totals;
/// * implied — the same side of the other runner's book, where each level total is
///   converted through `formulas::implied_taker_capacity_from_maker` at the converted
///   odds for that level.
///
/// Level totals are accumulated with saturating addition so that an extreme book cannot
/// overflow the running sum (a wrapped, negative total would make a fill-or-kill check
/// reject spuriously, and an unchecked overflow panics in debug builds).
///
/// NOTE(review): totals are per price level and are not filtered by account, whereas
/// `plan_matching` skips makers owned by the taker's account — confirm the estimate is
/// allowed to exceed what can actually be matched.
fn available_to_match(&self, runner_idx: usize, side: Side, price: OddsX10000) -> Money {
    let opposite_idx = Self::opposite_index(runner_idx);
    // Price bound applied to the other runner's book for implied matches.
    let derived_price = match side {
        Side::Yes => formulas::max_opposite_back_odds_for_taker_back(price),
        Side::No => formulas::min_opposite_lay_odds_for_taker_lay(price),
    };
    let mut total = 0i64;
    match side {
        Side::Yes => {
            // Direct: walk the same runner's `No` levels from the best tick downwards
            // while the level price is still at or above the taker's price.
            let rb = &self.runner_books[runner_idx];
            let mut tick = rb.best_tick_desc(Side::No);
            while let Some(t) = tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                if px.0 < price.0 {
                    break;
                }
                total = total.saturating_add(rb.level_total_remaining(Side::No, t).0);
                if t == 0 {
                    // Lowest tick reached; `t - 1` would underflow.
                    break;
                }
                tick = rb.next_tick_desc_from(Side::No, t - 1);
            }
            // Implied: walk the other runner's `Yes` levels upwards while both the raw
            // and the converted price stay within the taker's limits.
            let rb = &self.runner_books[opposite_idx];
            let mut tick = rb.best_tick_asc(Side::Yes);
            while let Some(t) = tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                if px.0 > derived_price.0 {
                    break;
                }
                let effective = formulas::back_to_lay_odds(px);
                if effective.0 < price.0 {
                    break;
                }
                let cap = formulas::implied_taker_capacity_from_maker(
                    rb.level_total_remaining(Side::Yes, t),
                    effective,
                );
                total = total.saturating_add(cap.0);
                tick = rb.next_tick_asc_from(Side::Yes, t + 1);
            }
        }
        Side::No => {
            // Direct: walk the same runner's `Yes` levels upwards while the level price
            // is still at or below the taker's price.
            let rb = &self.runner_books[runner_idx];
            let mut tick = rb.best_tick_asc(Side::Yes);
            while let Some(t) = tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                if px.0 > price.0 {
                    break;
                }
                total = total.saturating_add(rb.level_total_remaining(Side::Yes, t).0);
                tick = rb.next_tick_asc_from(Side::Yes, t + 1);
            }
            // Implied: walk the other runner's `No` levels downwards while both the raw
            // and the converted price stay within the taker's limits.
            let rb = &self.runner_books[opposite_idx];
            let mut tick = rb.best_tick_desc(Side::No);
            while let Some(t) = tick {
                let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
                if px.0 < derived_price.0 {
                    break;
                }
                let effective = formulas::lay_to_back_odds(px);
                if effective.0 > price.0 {
                    break;
                }
                let cap = formulas::implied_taker_capacity_from_maker(
                    rb.level_total_remaining(Side::No, t),
                    effective,
                );
                total = total.saturating_add(cap.0);
                if t == 0 {
                    // Lowest tick reached; `t - 1` would underflow.
                    break;
                }
                tick = rb.next_tick_desc_from(Side::No, t - 1);
            }
        }
    }
    Money(total)
}
/// Plans the fills for an incoming (taker) order without mutating the book.
///
/// Steps:
/// 1. Collect candidate makers into `scratch.matchable` from two places: the opposite
///    side of the taker's own runner book (direct matches) and the same side of the
///    other runner's book (implied matches, priced through the `formulas` conversions).
/// 2. Sort candidates by effective price (best for the taker first), then by creation
///    time, then by order id.
/// 3. Walk the candidates, recording a `PlannedFill` in `scratch.fills` for each fill
///    until the taker stake is exhausted.
/// 4. Append one maker and one taker `TradeMatched` event per fill to `scratch.events`
///    (the event buffer is appended to, not cleared, by this function).
///
/// Returns the total taker stake matched and the stake-weighted average trade price
/// (integer division, clamped to `u32::MAX`), or `None` for the price when nothing
/// matched.
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (Money, Option<OddsX10000>) {
let opposite_idx = Self::opposite_index(r.taker_runner_idx);
// Price bound applied to the other runner's book when looking for implied matches.
let derived_taker_price = match r.taker_side {
Side::Yes => formulas::max_opposite_back_odds_for_taker_back(r.taker_price),
Side::No => formulas::min_opposite_lay_odds_for_taker_lay(r.taker_price),
};
// Reset the per-call working buffers (but not `scratch.events`).
scratch.maker_matched_delta.clear();
scratch.matchable.clear();
scratch.fills.clear();
match r.taker_side {
Side::Yes => {
// Direct candidates: same runner, `No` side, from the best tick downwards while
// the level price is at or above the taker's price.
let rb = &self.runner_books[r.taker_runner_idx];
let mut tick = rb.best_tick_desc(Side::No);
while let Some(t) = tick {
let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
if px.0 < r.taker_price.0 {
break;
}
for maker_key in rb.iter_level_keys(Side::No, t, &self.orders) {
let maker = self.orders.order_by_key(maker_key);
scratch.matchable.push(MatchableOrder {
order_id: maker.info.order_id,
effective_price: px,
created_at: maker.info.created_at,
});
}
// Lowest tick reached; `t - 1` would underflow.
if t == 0 {
break;
}
tick = rb.next_tick_desc_from(Side::No, t - 1);
}
// Implied candidates: other runner, `Yes` side, ascending while both the raw and the
// converted price stay within the taker's limits.
let rb = &self.runner_books[opposite_idx];
let mut tick = rb.best_tick_asc(Side::Yes);
while let Some(t) = tick {
let maker_px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
if maker_px.0 > derived_taker_price.0 {
break;
}
let effective = formulas::back_to_lay_odds(maker_px);
if effective.0 < r.taker_price.0 {
break;
}
for maker_key in rb.iter_level_keys(Side::Yes, t, &self.orders) {
let maker = self.orders.order_by_key(maker_key);
scratch.matchable.push(MatchableOrder {
order_id: maker.info.order_id,
effective_price: effective,
created_at: maker.info.created_at,
});
}
tick = rb.next_tick_asc_from(Side::Yes, t + 1);
}
// Highest effective price first, then oldest, then lowest order id.
scratch.matchable.sort_by(|a, b| {
b.effective_price
.0
.cmp(&a.effective_price.0)
.then_with(|| a.created_at.cmp(&b.created_at))
.then_with(|| a.order_id.cmp(&b.order_id))
});
}
Side::No => {
// Direct candidates: same runner, `Yes` side, ascending while the level price is at
// or below the taker's price.
let rb = &self.runner_books[r.taker_runner_idx];
let mut tick = rb.best_tick_asc(Side::Yes);
while let Some(t) = tick {
let px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
if px.0 > r.taker_price.0 {
break;
}
for maker_key in rb.iter_level_keys(Side::Yes, t, &self.orders) {
let maker = self.orders.order_by_key(maker_key);
scratch.matchable.push(MatchableOrder {
order_id: maker.info.order_id,
effective_price: px,
created_at: maker.info.created_at,
});
}
tick = rb.next_tick_asc_from(Side::Yes, t + 1);
}
// Implied candidates: other runner, `No` side, descending while both the raw and the
// converted price stay within the taker's limits.
let rb = &self.runner_books[opposite_idx];
let mut tick = rb.best_tick_desc(Side::No);
while let Some(t) = tick {
let maker_px = OddsX10000(crate::types::odds::TICK_LADDER[t]);
if maker_px.0 < derived_taker_price.0 {
break;
}
let effective = formulas::lay_to_back_odds(maker_px);
if effective.0 > r.taker_price.0 {
break;
}
for maker_key in rb.iter_level_keys(Side::No, t, &self.orders) {
let maker = self.orders.order_by_key(maker_key);
scratch.matchable.push(MatchableOrder {
order_id: maker.info.order_id,
effective_price: effective,
created_at: maker.info.created_at,
});
}
// Lowest tick reached; `t - 1` would underflow.
if t == 0 {
break;
}
tick = rb.next_tick_desc_from(Side::No, t - 1);
}
// Lowest effective price first, then oldest, then lowest order id.
scratch.matchable.sort_by(|a, b| {
a.effective_price
.0
.cmp(&b.effective_price.0)
.then_with(|| a.created_at.cmp(&b.created_at))
.then_with(|| a.order_id.cmp(&b.order_id))
});
}
}
let mut remaining = r.taker_stake;
let mut matched_total = Money::zero();
// Accumulates trade_price * taker_fill for the weighted average price.
let mut sum_price_qty: u128 = 0;
for m in scratch.matchable.iter() {
if remaining.0 <= 0 {
break;
}
let Some(maker) = self.orders.get(&m.order_id) else {
continue;
};
// Never match an account against its own resting orders.
if maker.info.account_id == r.taker_account_id {
continue;
}
let maker_is_live = matches!(
maker.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !maker_is_live {
continue;
}
// The book itself is not mutated here, so stake already planned against this maker
// earlier in this call is tracked in `maker_matched_delta` and subtracted.
let base_remaining = maker.stake.0.saturating_sub(maker.matched.0);
let delta = *scratch.maker_matched_delta.get(&m.order_id).unwrap_or(&0);
let maker_remaining = base_remaining.saturating_sub(delta);
if maker_remaining <= 0 {
continue;
}
// A maker resting on the other runner is an implied match: taker and maker amounts
// differ and are converted through `formulas`.
let is_implied = maker.runner_id != r.taker_runner_id;
let (fill_i64, maker_fill_i64) = if is_implied {
let max_taker = formulas::implied_taker_capacity_from_maker(
Money(maker_remaining),
m.effective_price,
)
.0;
let fill_i64 = remaining.0.min(max_taker);
if fill_i64 <= 0 {
continue;
}
let maker_fill =
formulas::implied_maker_stake_from_taker(Money(fill_i64), m.effective_price)
.0
.min(maker_remaining);
if maker_fill <= 0 {
continue;
}
(fill_i64, maker_fill)
} else {
// Direct match: both sides are filled by the same amount.
let fill_i64 = remaining.0.min(maker_remaining);
(fill_i64, fill_i64)
};
let taker_fill_amount = Money(fill_i64);
remaining = Money(remaining.0 - fill_i64);
let maker_remaining_after = Money(maker_remaining.saturating_sub(maker_fill_i64));
let taker_remaining_after = remaining;
let trade_price = m.effective_price;
*scratch.maker_matched_delta.entry(m.order_id).or_insert(0) += maker_fill_i64;
scratch.fills.push(PlannedFill {
maker_order_id: m.order_id,
maker_account_id: maker.info.account_id.clone(),
maker_correlation_id: maker.info.correlation_id.clone(),
maker_runner_id: maker.runner_id,
maker_side: maker.info.side,
maker_order_price: maker.price,
trade_price,
maker_fill_amount: Money(maker_fill_i64),
taker_fill_amount,
maker_remaining_after,
taker_remaining_after,
});
matched_total = Money(matched_total.0 + taker_fill_amount.0);
sum_price_qty =
sum_price_qty.saturating_add(trade_price.0 as u128 * fill_i64.max(0) as u128);
}
// Emit a maker event followed by a taker event for every planned fill. The maker event
// carries the maker's own order price; the taker event carries the trade price.
for fill in scratch.fills.iter() {
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: fill.maker_correlation_id.clone(),
order_id: fill.maker_order_id,
account_id: fill.maker_account_id.clone(),
role: TradeRole::Maker,
runner_id: fill.maker_runner_id,
runner_label: self.runner_label(fill.maker_runner_id).to_string(),
side: fill.maker_side,
market_phase: self.market_phase,
price: fill.maker_order_price,
stake: fill.maker_fill_amount,
counter_party: r.taker_order_id,
counter_party_account_id: r.taker_account_id.clone(),
remaining_stake: fill.maker_remaining_after,
matched_delta: fill.maker_fill_amount,
},
));
scratch.events.push(self.emit(
r.ts,
BookEvent::TradeMatched {
correlation_id: r.taker_correlation_id.clone(),
order_id: r.taker_order_id,
account_id: r.taker_account_id.clone(),
role: TradeRole::Taker,
runner_id: r.taker_runner_id,
runner_label: self.runner_label(r.taker_runner_id).to_string(),
side: r.taker_side,
market_phase: self.market_phase,
price: fill.trade_price,
stake: fill.taker_fill_amount,
counter_party: fill.maker_order_id,
counter_party_account_id: fill.maker_account_id.clone(),
remaining_stake: fill.taker_remaining_after,
matched_delta: fill.taker_fill_amount,
},
));
}
// Stake-weighted average of the trade prices; integer division truncates.
let avg_price = if matched_total.0 > 0 {
let denom = matched_total.0 as u128;
let px = (sum_price_qty / denom).min(u32::MAX as u128) as u32;
Some(OddsX10000(px))
} else {
None
};
(matched_total, avg_price)
}
/// Adjusts the price-level bookkeeping for a maker order after a fill.
///
/// The level total at the maker's stored tick is decremented by `fill_amount`. If the
/// order has no remaining stake and is still linked into its level, it is unlinked
/// (with a zero amount, since the total was already decremented). Does nothing when
/// the order or its key cannot be found.
fn update_level_after_fill(
    &mut self,
    maker_order_id: OrderId,
    runner_idx: usize,
    fill_amount: Money,
) {
    let Some(maker) = self.orders.get(&maker_order_id) else {
        return;
    };
    let side = maker.info.side;
    let left = maker.remaining();
    let Some(key) = self.orders.get_key(&maker_order_id) else {
        return;
    };
    let tick = self.orders.stored_tick(key);
    self.runner_books[runner_idx].decrement_level_total(side, tick, fill_amount);
    let fully_filled = left.0 == 0;
    if fully_filled && self.orders.in_level_by_key(key) {
        self.runner_books[runner_idx].unlink(&mut self.orders, key, Money::zero());
    }
}
/// Plans the cancellation of a single live order.
///
/// Rejects with `OrderNotFound` for an unknown id, `NotOrderOwner` when `account_id`
/// does not own the order, and `OrderNotLive` unless the order is unmatched or
/// partially matched. On success a single `OrderCancelled` event is emitted whose
/// detail text comes from `cancel_cause.detail()`.
fn cmd_cancel_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    cancel_cause: CancelCause,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    let order = self
        .orders
        .get(&order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if order.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    if !matches!(
        order.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    ) {
        return Err(RejectReason::OrderNotLive);
    }
    let cancelled = BookEvent::OrderCancelled {
        cancelled_order: CancelledOrderEntry::from_order_info(order_id, &order.info),
        cancel_cause,
        cause_detail: Some(cancel_cause.detail().to_string()),
    };
    scratch.events.push(self.emit(ts, cancelled));
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Unlinks an order from its price level, releasing its remaining stake from the level.
///
/// Silently does nothing when the order is not in a level, is unknown, has no positive
/// remaining stake, or when its runner or key cannot be resolved.
fn remove_from_levels(&mut self, order_id: OrderId) {
    if !self.orders.is_in_level(&order_id) {
        return;
    }
    let (runner_id, remaining) = match self.orders.get(&order_id) {
        Some(order) => (order.runner_id, order.remaining()),
        None => return,
    };
    if remaining.0 <= 0 {
        return;
    }
    let idx = match self.runner_index(runner_id) {
        Some(found) => found,
        None => return,
    };
    let key = match self.orders.get_key(&order_id) {
        Some(found) => found,
        None => return,
    };
    self.runner_books[idx].unlink(&mut self.orders, key, remaining);
}
/// Plans a reduce of an existing order as "cancel the old order, place a replacement".
///
/// Validation rejects, in order: unknown order (`OrderNotFound`), wrong owner
/// (`NotOrderOwner`), a failed reduce condition, an invalid odds tick, an invalid
/// target, a change that is not reduce-only, no effective change (`NoChange`), and an
/// order that is no longer live (`OrderNotLive`).
///
/// When the new remaining stake is positive, a replacement order is planned through
/// `cmd_place_order` with `TimeInForce::Gtc`, reusing the old order's runner, side and
/// persistence. The old order is always cancelled with `CancelCause::Reduce`. The
/// returned events list the cancellation first, then the placement; the response is
/// the placement response, or `None` when nothing was placed.
#[allow(clippy::too_many_arguments)]
fn cmd_reduce_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    correlation_id: Option<CorrelationId>,
    old_order_id: OrderId,
    new_order_id: OrderId,
    account_id: AccountId,
    new_odds: Option<OddsX10000>,
    target: Option<ReduceOrderTarget>,
    condition: Option<&ReduceOrderCondition>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    let old = self
        .orders
        .get(&old_order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if old.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    validate_reduce_order_condition(old, condition)?;
    let new_price = new_odds.unwrap_or(old.price);
    ensure_valid_odds_tick(new_price)?;
    let next_stake = reduce_order_target_remaining(old, target)?;
    ensure_reduce_only(old, new_price, next_stake)?;
    let unchanged = new_price == old.price && next_stake == old.remaining();
    if unchanged {
        return Err(RejectReason::NoChange);
    }
    if !matches!(
        old.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    ) {
        return Err(RejectReason::OrderNotLive);
    }

    // Plan the replacement first; both sub-commands take their events out of
    // `scratch.events`, so the buffer is empty again after each call.
    let (place_events, place_ok) = if next_stake.0 > 0 {
        self.cmd_place_order(
            scratch,
            ts,
            correlation_id,
            new_order_id,
            account_id.clone(),
            old.runner_id,
            old.info.side,
            new_price,
            next_stake,
            old.persistence,
            TimeInForce::Gtc,
        )?
    } else {
        (EventVec::new(), None)
    };
    let (cancel_events, _) =
        self.cmd_cancel_order(scratch, ts, old_order_id, account_id, CancelCause::Reduce)?;

    // Report the cancellation ahead of the replacement's events.
    scratch.events.extend(cancel_events);
    scratch.events.extend(place_events);
    let events = std::mem::take(&mut scratch.events);
    Ok((events, place_ok))
}
/// Removes a single runner; a convenience wrapper that forwards a one-element slice to
/// `cmd_remove_runners`.
fn cmd_remove_runner(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    runner_id: RunnerId,
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    let single = std::slice::from_ref(&runner_id);
    self.cmd_remove_runners(scratch, ts, single, reduction_factor_bps)
}
/// Removes one or more runners and starts the batch that cancels their orders.
///
/// Rejections, in order: `MarketTerminal`, `MarketBatchCancelling` while a batch is
/// active, `InvalidMarketConfig` for an empty or duplicate-containing id list,
/// `RunnerNotFound` for an unknown runner and `RunnerAlreadyRemoved` for one that was
/// removed before. On success a `RunnersRemoved` event is emitted and a
/// `RunnerRemovalCancel` batch targeting the given runners is started or retargeted.
fn cmd_remove_runners(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    runner_ids: &[RunnerId],
    reduction_factor_bps: Option<u32>,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    if self.has_active_batch_process() {
        return Err(RejectReason::MarketBatchCancelling);
    }
    if runner_ids.is_empty() {
        return Err(RejectReason::InvalidMarketConfig);
    }
    // Every id must be unique: `insert` returns false on the first repeat.
    let mut seen = BTreeSet::new();
    if !runner_ids.iter().all(|id| seen.insert(*id)) {
        return Err(RejectReason::InvalidMarketConfig);
    }
    for &candidate in runner_ids {
        if self.runner_index(candidate).is_none() {
            return Err(RejectReason::RunnerNotFound);
        }
        if self.removed_runners.contains(&candidate) {
            return Err(RejectReason::RunnerAlreadyRemoved);
        }
    }

    let mut runner_labels: Vec<String> = Vec::with_capacity(runner_ids.len());
    for &id in runner_ids {
        runner_labels.push(self.runner_label(id).to_string());
    }
    let removed = BookEvent::RunnersRemoved {
        runner_ids: runner_ids.to_vec(),
        runner_labels,
        reduction_factor_bps,
    };
    scratch.events.push(self.emit(ts, removed));

    let removal_target = BatchProcessTarget::RunnerRemoval {
        runner_ids: runner_ids.to_vec(),
    };
    maybe_start_or_retarget_batch(
        self.batch_process.as_ref(),
        BatchMode::RunnerRemovalCancel,
        self.close_batch_max_events,
        &removal_target,
        None,
        &mut scratch.events,
        |book_event| self.emit(ts, book_event),
        |after, limit, batch_target| {
            self.select_cancelled_chunk_for_target(after, limit, batch_target)
        },
    );
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
/// Emits a `VoidTrades` event for the given time window.
///
/// Rejected with `MarketTerminal` when the market is in a terminal state. The event
/// records the current market phase, the window bounds and the reason text; this
/// method itself does not inspect any trades.
fn cmd_void_trades(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    start_time: DateTime,
    end_time: DateTime,
    void_reason: &str,
) -> Result<(EventVec, Option<CommandResponseKind>), RejectReason> {
    scratch.events.clear();
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }
    let void_event = BookEvent::VoidTrades {
        market_phase: self.market_phase,
        start_time,
        end_time,
        void_reason: void_reason.to_string(),
    };
    scratch.events.push(self.emit(ts, void_event));
    let events = std::mem::take(&mut scratch.events);
    Self::ok_noop(events)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::book::common::types::{BookOrder, BookOrderInfo, BookOrderState};
    use crate::book::protocol::command::{Persistence, Side};

    /// Builds an unmatched, persistent order with a stake of 100 on runner 1, owned by
    /// account 1.
    fn sample_order(order_id: OrderId, side: Side, price: OddsX10000) -> BookOrder {
        let info = BookOrderInfo {
            order_id,
            account_id: crate::types::AccountId::from(1_u64),
            correlation_id: None,
            side,
            state: BookOrderState::ExecutableUnmatched,
            created_at: chrono::Utc::now(),
            last_updated_at: chrono::Utc::now(),
        };
        BookOrder {
            info,
            runner_id: RunnerId(1),
            price,
            stake: Money(100),
            matched: Money::zero(),
            persistence: Persistence::Persist,
        }
    }

    /// Two orders resting at widely separated ticks (10 and 130) must survive a JSON
    /// snapshot round trip with their level totals and level membership intact.
    ///
    /// NOTE(review): the orders are created with `Side::Back` while the levels are
    /// queried with `Side::Yes` — presumably the two name the same side; confirm.
    #[test]
    fn snapshot_roundtrip_rebuilds_lazy_pages_across_multiple_page_bands() {
        let mut book = TwoRunnerBook::new_with_capacity(
            MarketId(1),
            MarketKind::InPlayCapable,
            MarketPhase::Pre,
            RunnerId(1),
            RunnerId(2),
            0,
        );

        let low_order = sample_order(
            OrderId(1),
            Side::Back,
            OddsX10000(crate::types::odds::TICK_LADDER[10]),
        );
        let key_a = book
            .orders
            .insert(OrderId(1), low_order)
            .expect("valid order");

        let high_order = BookOrder {
            stake: Money(200),
            ..sample_order(
                OrderId(2),
                Side::Back,
                OddsX10000(crate::types::odds::TICK_LADDER[130]),
            )
        };
        let key_b = book
            .orders
            .insert(OrderId(2), high_order)
            .expect("valid order");

        book.runner_books[0].insert_tail(&mut book.orders, key_a, Money(100));
        book.runner_books[0].insert_tail(&mut book.orders, key_b, Money(200));

        let json = serde_json::to_string(&book).expect("serialize snapshot");
        let restored: TwoRunnerBook = serde_json::from_str(&json).expect("deserialize snapshot");
        let runner = &restored.runner_books[0];

        // Tick navigation finds both occupied levels.
        assert_eq!(runner.best_tick_asc(Side::Yes), Some(10));
        assert_eq!(runner.next_tick_asc_from(Side::Yes, 11), Some(130));

        // Level totals are preserved.
        assert_eq!(runner.level_total_remaining(Side::Yes, 10), Money(100));
        assert_eq!(runner.level_total_remaining(Side::Yes, 130), Money(200));

        // Each level holds exactly one order.
        let low_level_len = runner
            .iter_level_keys(Side::Yes, 10, &restored.orders)
            .count();
        assert_eq!(low_level_len, 1);
        let high_level_len = runner
            .iter_level_keys(Side::Yes, 130, &restored.orders)
            .count();
        assert_eq!(high_level_len, 1);
    }
}