use crate::book::common::events::{
BookEvent, BookEventEnvelope, CancelCause, CancelledOrderEntry, EventMetadata, EventVec,
TradeRole, time_in_force_remainder_cancel_cause,
};
use crate::book::common::response::{CommandResponse, CommandResponseKind, PlaceOrderResult};
use crate::book::common::types::BookOrderInfo;
use crate::book::common::types::{
BatchMode, BatchProcessContext, BatchProcessDescriptor, BatchProcessState, BatchProcessTarget,
BinaryDepth, BinaryPriceSize, BookMarketState, BookOrderState, RunnerPrices,
};
use crate::book::common::{
CancelledOrdersChunk, apply_batch_cancelled_chunk_event, apply_batch_process_state_event,
collect_cancelled_orders_chunk, continue_batch_process, ensure_can_accept_new_orders,
ensure_state_change, filtered_batch_target, formulas, matches_filtered_batch_target,
maybe_start_or_retarget_batch, plan_await_live_market_with_lapse,
plan_go_live_market_with_lapse, plan_return_to_pre_market,
};
use crate::book::error::{BookError, RequestedBookState};
use crate::book::protocol::command::{
Command, CommandKind, MarketState, ReduceBinaryOrderCondition, ReduceBinaryOrderTarget, Side,
TimeInForce,
};
use crate::book::protocol::reject::RejectReason;
use crate::types::{
AccountId, CorrelationId, DateTime, FillPrice, FillQuantity, MarketId, MarketPhase, Money,
OrderId, Quantity, RunnerId,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::collections::BTreeMap;
use std::num::NonZeroU32;
use tracing::error;
/// Number of price levels stored in one `LevelPage`. `SideBook` maps a tick to
/// its page with `tick / LEVEL_PAGE_LEN` and to its slot with `tick % LEVEL_PAGE_LEN`.
const LEVEL_PAGE_LEN: usize = 64;
/// Compact handle to a slot in the order slab.
///
/// The wrapped value is the slab index plus one, which keeps it non-zero so it
/// can live in a `NonZeroU32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(transparent)]
pub(crate) struct OrderKey(NonZeroU32);

impl OrderKey {
    /// Builds the 1-based key for slab index `idx`.
    ///
    /// # Panics
    /// Panics when `idx + 1` cannot be represented as a `u32`.
    fn from_slab_index(idx: usize) -> Self {
        let zero_based = u32::try_from(idx).expect("order store exceeded u32::MAX slab slots");
        let one_based = zero_based
            .checked_add(1)
            .expect("order store key overflow");
        let non_zero = NonZeroU32::new(one_based).expect("order store keys are 1-based");
        Self(non_zero)
    }

    /// Returns the 0-based slab index this key refers to.
    fn slab_index(self) -> usize {
        let one_based = self.0.get();
        (one_based - 1) as usize
    }
}
/// Intrusive doubly-linked-list pointers that chain an order slot to its
/// neighbours inside one price level. Both default to `None`.
#[derive(Debug, Clone, Copy, Default)]
struct Links {
/// Key of the order queued immediately before this one, if any.
prev: Option<OrderKey>,
/// Key of the order queued immediately after this one, if any.
next: Option<OrderKey>,
}
/// A single order held by the binary book, together with its fill progress.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryOrder {
/// Shared order metadata; this file reads `order_id`, `account_id`, `side`,
/// `state` and `created_at` from it.
info: BookOrderInfo,
/// Order price in ticks.
price_ticks: u16,
/// Total order size in shares.
qty_shares: u64,
/// Portion of `qty_shares` that is no longer outstanding.
filled_shares: u64,
time_in_force: TimeInForce,
}
impl BinaryOrder {
/// Outstanding size: `qty_shares - filled_shares`, clamped at zero rather than
/// underflowing.
fn remaining_shares(&self) -> u64 {
self.qty_shares.saturating_sub(self.filled_shares)
}
}
/// Slab entry: an order plus the bookkeeping that ties it into a price level.
#[derive(Debug, Clone)]
struct OrderSlot {
order: BinaryOrder,
/// Neighbour pointers within the level queue.
links: Links,
/// Set to `true` by `SideBook::insert_tail` and back to `false` by
/// `SideBook::unlink`.
in_level: bool,
/// Side captured from the order at slot creation; selects the ladder.
level_side: Side,
/// Tick captured from the order at slot creation; selects the level.
level_tick: u16,
}
impl OrderSlot {
/// Wraps `order` in a slot that is not yet linked into any level.
fn new(order: BinaryOrder) -> Self {
Self {
// Side and tick are copied out before `order` is moved into the slot.
level_side: order.info.side,
level_tick: order.price_ticks,
order,
links: Links::default(),
in_level: false,
}
}
}
/// Slab-backed order storage with two id indexes: a hash map for point
/// lookups and an ordered map for iteration in ascending `OrderId` order.
#[derive(Debug, Clone, Default)]
struct OrderStore {
    slab: Slab<OrderSlot>,
    by_id: HashMap<OrderId, OrderKey>,
    by_id_sorted: BTreeMap<OrderId, OrderKey>,
}

impl OrderStore {
    /// Creates an empty store whose slab and hash index are pre-sized for
    /// `capacity` orders.
    fn with_capacity(capacity: usize) -> Self {
        let slab = Slab::with_capacity(capacity);
        let by_id = HashMap::with_capacity(capacity);
        Self {
            slab,
            by_id,
            by_id_sorted: BTreeMap::new(),
        }
    }

    /// Whether an order with this id is currently stored.
    fn contains(&self, order_id: &OrderId) -> bool {
        self.get_key(order_id).is_some()
    }

    /// Looks up the slab key registered for `order_id`.
    fn get_key(&self, order_id: &OrderId) -> Option<OrderKey> {
        self.by_id.get(order_id).copied()
    }

    /// Shared access to the order with this id, if present.
    fn get(&self, order_id: &OrderId) -> Option<&BinaryOrder> {
        self.get_key(order_id)
            .and_then(|key| self.slab.get(key.slab_index()))
            .map(|slot| &slot.order)
    }

    /// Mutable access to the order with this id, if present.
    fn get_mut(&mut self, order_id: &OrderId) -> Option<&mut BinaryOrder> {
        let slab_idx = self.get_key(order_id)?.slab_index();
        self.slab.get_mut(slab_idx).map(|slot| &mut slot.order)
    }

    /// Returns the slot for `key`; panics if the key does not refer to an
    /// occupied slab entry.
    fn slot(&self, key: OrderKey) -> &OrderSlot {
        let slab_idx = key.slab_index();
        &self.slab[slab_idx]
    }

    /// Mutable counterpart of [`Self::slot`]; same panic condition.
    fn slot_mut(&mut self, key: OrderKey) -> &mut OrderSlot {
        let slab_idx = key.slab_index();
        &mut self.slab[slab_idx]
    }

    /// Stores `order` under `order_id`, registers it in both indexes and
    /// returns its slab key.
    fn insert(&mut self, order_id: OrderId, order: BinaryOrder) -> OrderKey {
        debug_assert_eq!(order_id, order.info.order_id);
        let slab_idx = self.slab.insert(OrderSlot::new(order));
        let key = OrderKey::from_slab_index(slab_idx);
        self.by_id.insert(order_id, key);
        self.by_id_sorted.insert(order_id, key);
        key
    }

    /// Iterates every stored order in ascending `OrderId` order.
    fn iter_keys_sorted(&self) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
        let slab = &self.slab;
        self.by_id_sorted
            .iter()
            .map(move |(&order_id, &key)| (order_id, key, &slab[key.slab_index()].order))
    }

    /// Iterates stored orders in ascending `OrderId` order, starting strictly
    /// after `cursor_after` (or from the beginning when it is `None`).
    fn iter_keys_sorted_from(
        &self,
        cursor_after: Option<OrderId>,
    ) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
        use std::ops::Bound;
        let lower = cursor_after.map_or(Bound::Unbounded, Bound::Excluded);
        let slab = &self.slab;
        self.by_id_sorted
            .range((lower, Bound::Unbounded))
            .map(move |(&order_id, &key)| (order_id, key, &slab[key.slab_index()].order))
    }

    /// Drops the order from the slab and both indexes. Returns `false` when
    /// the id was not present.
    fn remove(&mut self, order_id: &OrderId) -> bool {
        match self.by_id.remove(order_id) {
            None => false,
            Some(key) => {
                self.by_id_sorted.remove(order_id);
                self.slab.remove(key.slab_index());
                true
            }
        }
    }
}
/// Fixed-capacity bitset stored as 64-bit words, with forward and backward
/// "next set bit" scans.
///
/// Every operation tolerates indices and scan lengths beyond the stored
/// capacity: such bits are treated as permanently clear instead of panicking.
#[derive(Debug, Clone, Default)]
struct Bitset {
    words: Vec<u64>,
}

impl Bitset {
    /// Bits per storage word.
    const WORD_BITS: usize = u64::BITS as usize;

    /// Creates a bitset able to hold `bits` bits, all initially clear.
    fn new(bits: usize) -> Self {
        Self {
            words: vec![0u64; bits.div_ceil(Self::WORD_BITS)],
        }
    }

    /// Sets bit `idx`; indices beyond the stored capacity are ignored.
    fn set(&mut self, idx: usize) {
        if let Some(word) = self.words.get_mut(idx / Self::WORD_BITS) {
            *word |= 1u64 << (idx % Self::WORD_BITS);
        }
    }

    /// Clears bit `idx`; indices beyond the stored capacity are ignored.
    fn clear(&mut self, idx: usize) {
        if let Some(word) = self.words.get_mut(idx / Self::WORD_BITS) {
            *word &= !(1u64 << (idx % Self::WORD_BITS));
        }
    }

    /// Returns the lowest set bit `i` with `start <= i < bit_len`, if any.
    ///
    /// Returns `None` (rather than panicking) when `start` lies beyond the
    /// stored words.
    fn next_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
        if start >= bit_len {
            return None;
        }
        let first = start / Self::WORD_BITS;
        // Mask off the bits below `start` in the first word only.
        let masked = *self.words.get(first)? & (!0u64 << (start % Self::WORD_BITS));
        std::iter::once(masked)
            .chain(self.words[first + 1..].iter().copied())
            .enumerate()
            .find(|&(_, word)| word != 0)
            .map(|(offset, word)| {
                (first + offset) * Self::WORD_BITS + word.trailing_zeros() as usize
            })
            // The first hit is the smallest candidate; if it is already past
            // `bit_len`, nothing later can qualify.
            .filter(|&idx| idx < bit_len)
    }

    /// Returns the highest set bit `i` with `i <= start` and `i < bit_len`,
    /// if any.
    ///
    /// `start` is clamped to both `bit_len - 1` and the last stored bit, so an
    /// undersized or empty bitset yields `None` instead of panicking.
    fn prev_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
        let searchable = bit_len.min(self.words.len() * Self::WORD_BITS);
        // `checked_sub` covers both `bit_len == 0` and an empty word vector.
        let start = start.min(searchable.checked_sub(1)?);
        let top = start / Self::WORD_BITS;
        // Mask off the bits above `start` in the top word only.
        let masked =
            self.words[top] & (!0u64 >> (Self::WORD_BITS - 1 - start % Self::WORD_BITS));
        std::iter::once(masked)
            .chain(self.words[..top].iter().rev().copied())
            .enumerate()
            .find(|&(_, word)| word != 0)
            .map(|(offset, word)| {
                (top - offset) * Self::WORD_BITS
                    + (Self::WORD_BITS - 1 - word.leading_zeros() as usize)
            })
    }
}
/// One price level: the ends of its order queue plus the aggregate
/// outstanding size maintained by `SideBook`.
#[derive(Debug, Clone, Copy, Default)]
struct Level {
/// First order in the queue; `None` when the level is empty.
head: Option<OrderKey>,
/// Last order in the queue; new orders are appended after it.
tail: Option<OrderKey>,
/// Running total adjusted (with saturating arithmetic) on insert, unlink and
/// decrement.
total_remaining: u64,
}
/// A block of `LEVEL_PAGE_LEN` consecutive price levels, allocated on demand
/// by `SideBook::level_mut_or_alloc`.
#[derive(Debug, Clone)]
struct LevelPage {
levels: [Level; LEVEL_PAGE_LEN],
}
impl Default for LevelPage {
/// Returns a page in which every level is empty.
fn default() -> Self {
Self {
levels: [Level::default(); LEVEL_PAGE_LEN],
}
}
}
/// One side of the price ladder.
///
/// Levels live in lazily allocated pages of `LEVEL_PAGE_LEN` ticks each; an
/// occupancy bitset tracks which ticks currently have at least one queued
/// order so the best level can be found without scanning empty levels.
#[derive(Debug, Clone)]
struct SideBook {
    /// Indexed by page number (`tick / LEVEL_PAGE_LEN`). `0` means the page
    /// has not been allocated; any other value is the index into `pages`
    /// plus one.
    page_refs: Box<[u16]>,
    /// Level pages in allocation order.
    pages: Vec<LevelPage>,
    /// One bit per tick, set while that tick's level has a head order.
    mask: Bitset,
    /// Number of addressable ticks; valid ticks are `0..bits`.
    bits: usize,
}

impl SideBook {
    /// Creates an empty ladder able to address ticks `0..bits`. No level
    /// pages are allocated up front.
    fn new(bits: usize) -> Self {
        let page_count = bits.div_ceil(LEVEL_PAGE_LEN);
        debug_assert!(page_count <= u16::MAX as usize);
        Self {
            page_refs: vec![0; page_count].into_boxed_slice(),
            pages: Vec::new(),
            mask: Bitset::new(bits),
            bits,
        }
    }

    /// Splits a tick into `(page number, slot within page)`.
    #[inline]
    fn split_tick(tick: usize) -> (usize, usize) {
        (tick / LEVEL_PAGE_LEN, tick % LEVEL_PAGE_LEN)
    }

    /// Resolves a page number to its index in `pages`, or `None` when that
    /// page has never been allocated.
    #[inline]
    fn page_index(&self, page_idx: usize) -> Option<usize> {
        match self.page_refs[page_idx] {
            0 => None,
            n => Some((n - 1) as usize),
        }
    }

    /// Shared access to the level at `tick`, if its page exists.
    #[inline]
    fn level_ref(&self, tick: usize) -> Option<&Level> {
        debug_assert!(tick < self.bits, "tick out of range: {tick}");
        let (page_idx, slot_idx) = Self::split_tick(tick);
        let page_idx = self.page_index(page_idx)?;
        Some(&self.pages[page_idx].levels[slot_idx])
    }

    /// Mutable access to the level at `tick`, if its page exists.
    #[inline]
    fn level_mut(&mut self, tick: usize) -> Option<&mut Level> {
        debug_assert!(tick < self.bits, "tick out of range: {tick}");
        let (page_idx, slot_idx) = Self::split_tick(tick);
        let page_idx = self.page_index(page_idx)?;
        Some(&mut self.pages[page_idx].levels[slot_idx])
    }

    /// Mutable access to the level at `tick`, allocating its page first when
    /// necessary.
    #[inline]
    fn level_mut_or_alloc(&mut self, tick: usize) -> &mut Level {
        debug_assert!(tick < self.bits, "tick out of range: {tick}");
        let (page_slot, slot_idx) = Self::split_tick(tick);
        let page_idx = match self.page_index(page_slot) {
            Some(page_idx) => page_idx,
            None => {
                debug_assert!(self.pages.len() < u16::MAX as usize);
                self.pages.push(LevelPage::default());
                let page_idx = self.pages.len() - 1;
                // Stored 1-based so that 0 keeps meaning "not allocated".
                self.page_refs[page_slot] = (page_idx as u16) + 1;
                page_idx
            }
        };
        &mut self.pages[page_idx].levels[slot_idx]
    }

    /// Key of the first queued order at `tick`, if any.
    #[inline]
    fn level_head(&self, tick: usize) -> Option<OrderKey> {
        self.level_ref(tick).and_then(|level| level.head)
    }

    /// Aggregate outstanding size at `tick`; `0` when the page is missing.
    #[inline]
    fn level_total_remaining(&self, tick: usize) -> u64 {
        self.level_ref(tick)
            .map(|level| level.total_remaining)
            .unwrap_or(0)
    }

    /// Lowers the aggregate at `tick` by `delta` (saturating at zero). Logs
    /// an error and does nothing when the page is missing.
    #[inline]
    fn decrement_total(&mut self, tick: usize, delta: u64) {
        let Some(level) = self.level_mut(tick) else {
            error!(tick, "level page missing during decrement");
            return;
        };
        level.total_remaining = level.total_remaining.saturating_sub(delta);
    }

    /// Lowest occupied tick.
    fn best_asc(&self) -> Option<usize> {
        self.mask.next_set_from(0, self.bits)
    }

    /// Highest occupied tick.
    fn best_desc(&self) -> Option<usize> {
        self.mask
            .prev_set_from(self.bits.saturating_sub(1), self.bits)
    }

    /// Lowest occupied tick at or above `start`.
    fn next_asc_from(&self, start: usize) -> Option<usize> {
        self.mask.next_set_from(start, self.bits)
    }

    /// Highest occupied tick at or below `start`.
    fn next_desc_from(&self, start: usize) -> Option<usize> {
        self.mask.prev_set_from(start, self.bits)
    }

    /// Appends the order behind `key` to the back of its level queue and adds
    /// `remaining` to the level aggregate.
    ///
    /// The level is chosen from the slot's `level_tick`. If the level claims
    /// to be non-empty but has no tail, an error is logged and the order is
    /// left unlinked.
    fn insert_tail(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
        let tick = orders.slot(key).level_tick as usize;
        let mut set_mask = false;
        {
            let level = self.level_mut_or_alloc(tick);
            if level.head.is_none() {
                set_mask = true;
                level.head = Some(key);
                level.tail = Some(key);
                // A sole element has no predecessor. Clear any stale link so a
                // later `unlink` cannot rewrite an unrelated order's `next`.
                orders.slot_mut(key).links.prev = None;
            } else {
                let Some(tail) = level.tail else {
                    error!(tick, "tail missing for non-empty level");
                    return;
                };
                orders.slot_mut(tail).links.next = Some(key);
                orders.slot_mut(key).links.prev = Some(tail);
                level.tail = Some(key);
            }
            level.total_remaining = level.total_remaining.saturating_add(remaining);
        }
        if set_mask {
            self.mask.set(tick);
        }
        let slot = orders.slot_mut(key);
        slot.in_level = true;
        // The new order is always the last element of the queue.
        slot.links.next = None;
    }

    /// Removes the order behind `key` from its level queue, subtracts
    /// `remaining` from the level aggregate, and clears the occupancy bit when
    /// the level becomes empty.
    ///
    /// Logs an error and leaves everything untouched when the level's page is
    /// missing.
    fn unlink(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
        let (tick, prev, next) = {
            let slot = orders.slot(key);
            (slot.level_tick as usize, slot.links.prev, slot.links.next)
        };
        let became_empty = {
            let Some(level) = self.level_mut(tick) else {
                error!(tick, "level page missing during unlink");
                return;
            };
            if level.head == Some(key) {
                level.head = next;
            }
            if level.tail == Some(key) {
                level.tail = prev;
            }
            // Bridge the neighbours around the removed order.
            if let Some(p) = prev {
                orders.slot_mut(p).links.next = next;
            }
            if let Some(n) = next {
                orders.slot_mut(n).links.prev = prev;
            }
            level.total_remaining = level.total_remaining.saturating_sub(remaining);
            level.head.is_none()
        };
        if became_empty {
            self.mask.clear(tick);
        }
        let slot = orders.slot_mut(key);
        slot.links = Links::default();
        slot.in_level = false;
    }
}
/// Order book for a two-runner (yes/no) binary market.
///
/// Orders live in `orders`; live orders with outstanding size are additionally
/// linked into the `bids` ladder (`Side::Yes`) or the `asks` ladder (`Side::No`).
#[derive(Debug, Clone)]
pub struct BinaryYesBook {
market_id: MarketId,
pub(crate) market_name: String,
market_kind: crate::types::MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
/// Batch process currently in progress, if any.
batch_process: Option<BatchProcessState>,
queued_batches: Vec<BatchProcessDescriptor>,
/// Batch size passed to close and filtered-cancel batches.
close_batch_max_events: u16,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
/// Display labels; initialised to the runner ids' string form.
yes_runner_label: String,
no_runner_label: String,
/// Exclusive upper bound for order prices: valid ticks are `1..max_price_ticks`.
max_price_ticks: u16,
/// Raw value used to mint the next `OrderId`.
next_order_id: u64,
orders: OrderStore,
/// Count of orders in an executable (unmatched or partially matched) state.
active_order_count_cache: usize,
/// Ladder holding `Side::Yes` orders.
bids: SideBook,
/// Ladder holding `Side::No` orders.
asks: SideBook,
/// Reusable per-command buffers.
scratch: Scratch,
}
/// Serializable image of a `BinaryYesBook`.
///
/// Mirrors the book's fields except for the ladders, the scratch buffers and
/// the active-order count; orders are stored as an id-ordered map.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryYesBookSnapshot {
market_id: MarketId,
market_name: String,
market_kind: crate::types::MarketKind,
state: BookMarketState,
market_phase: MarketPhase,
batch_process: Option<BatchProcessState>,
// Falls back to an empty list when the field is absent from the input.
#[serde(default)]
queued_batches: Vec<BatchProcessDescriptor>,
close_batch_max_events: u16,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
yes_runner_label: String,
no_runner_label: String,
max_price_ticks: u16,
next_order_id: u64,
orders: BTreeMap<OrderId, BinaryOrder>,
}
/// Reusable working buffers. `handle_command` takes them out of the book,
/// clears them, lends them to the planners and puts them back afterwards.
#[derive(Debug, Default, Clone)]
struct Scratch {
/// Events produced while planning the current command.
events: EventVec,
/// Per-order share amounts keyed by order id.
/// NOTE(review): presumably accumulated maker fills during matching — confirm.
maker_fills: HashMap<OrderId, u64>,
}
/// Arguments for planning a `PlaceBinaryOrder` command, bundled by
/// `handle_command` together with the pre-assigned `order_id`.
#[derive(Debug, Clone)]
struct PlacePlan {
correlation_id: Option<CorrelationId>,
/// Id minted from the book's `next_order_id`.
order_id: OrderId,
account_id: AccountId,
side: Side,
price_ticks: u16,
qty_shares: u64,
time_in_force: TimeInForce,
}
/// Arguments for planning a `ReduceBinaryOrder` command, bundled by
/// `handle_command`.
#[derive(Debug, Clone)]
struct ReducePlan {
correlation_id: Option<CorrelationId>,
/// Existing order the command targets.
order_id: OrderId,
/// Id minted from the book's `next_order_id`.
/// NOTE(review): presumably used when the reduce re-creates the order — confirm.
new_order_id: OrderId,
account_id: AccountId,
new_price_ticks: Option<u16>,
target: Option<ReduceBinaryOrderTarget>,
condition: Option<ReduceBinaryOrderCondition>,
}
/// Description of the incoming (taker) order for a matching pass.
/// NOTE(review): the consumer of this struct is outside the visible section —
/// confirm field semantics there.
#[derive(Debug, Clone)]
struct MatchRequest {
ts: DateTime,
taker_order_id: OrderId,
taker_account_id: AccountId,
taker_correlation_id: Option<CorrelationId>,
taker_side: Side,
taker_price_ticks: u16,
taker_qty_shares: u64,
}
impl BinaryYesBook {
/// Creates an empty book in the `Open` state.
///
/// Both ladders address ticks `0..=max_price_ticks`, the order store is
/// pre-sized for `order_store_capacity` orders, and runner labels start out as
/// the runner ids' string form. `assert_kind_supports_phase` is invoked first
/// to check the kind/phase combination.
pub fn new_with_capacity(
    market_id: MarketId,
    market_kind: crate::types::MarketKind,
    market_phase: MarketPhase,
    yes_runner_id: RunnerId,
    no_runner_id: RunnerId,
    max_price_ticks: u16,
    order_store_capacity: usize,
) -> Self {
    crate::book::common::types::assert_kind_supports_phase(market_kind, market_phase);

    let ladder_bits = max_price_ticks as usize + 1;
    let yes_runner_label = yes_runner_id.to_string();
    let no_runner_label = no_runner_id.to_string();
    let orders = OrderStore::with_capacity(order_store_capacity);
    let bids = SideBook::new(ladder_bits);
    let asks = SideBook::new(ladder_bits);

    Self {
        market_id,
        market_name: String::new(),
        market_kind,
        market_phase,
        state: BookMarketState::Open,
        batch_process: None,
        queued_batches: Vec::new(),
        close_batch_max_events: crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS,
        yes_runner_id,
        no_runner_id,
        yes_runner_label,
        no_runner_label,
        max_price_ticks,
        next_order_id: 1,
        orders,
        active_order_count_cache: 0,
        bids,
        asks,
        scratch: Scratch::default(),
    }
}
fn rebuild_ladders(&mut self) {
type RebuildLevelKey = (Side, u16);
type RebuildLevelItem = (DateTime, OrderId, OrderKey, u64);
type RebuildPerLevel = HashMap<RebuildLevelKey, Vec<RebuildLevelItem>>;
let bits = self.max_price_ticks as usize + 1;
self.bids = SideBook::new(bits);
self.asks = SideBook::new(bits);
self.active_order_count_cache = 0;
let mut per_level: RebuildPerLevel = HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
let remaining = order.remaining_shares();
if remaining == 0 {
continue;
}
if order.price_ticks == 0 || order.price_ticks >= self.max_price_ticks {
continue;
}
per_level
.entry((order.info.side, order.price_ticks))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((_, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
for (_, _, key, remaining) in items {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.insert_tail(&mut self.orders, key, remaining),
Side::No => self.asks.insert_tail(&mut self.orders, key, remaining),
}
}
}
}
fn apply_remove_order(&mut self, order_id: OrderId) {
let Some(key) = self.orders.get_key(&order_id) else {
return;
};
if matches!(
self.orders.slot(key).order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
let prev_remaining = self.orders.slot(key).order.remaining_shares();
if self.orders.slot(key).in_level && prev_remaining > 0 {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.unlink(&mut self.orders, key, prev_remaining),
Side::No => self.asks.unlink(&mut self.orders, key, prev_remaining),
}
}
self.orders.remove(&order_id);
}
/// Updates the book's batch size by delegating to
/// `close_process::set_close_batch_max_events`, which receives the current
/// value by mutable reference.
/// NOTE(review): any clamping or validation happens inside that helper — confirm there.
pub(crate) fn set_close_batch_max_events(&mut self, batch_max_events: u16) {
crate::book::close_process::set_close_batch_max_events(
&mut self.close_batch_max_events,
batch_max_events,
);
}
/// Overwrites the market state directly; no transition check and no event.
pub(crate) fn set_market_state(&mut self, state: BookMarketState) {
self.state = state;
}
/// Returns the configured batch size used for close and filtered-cancel batches.
pub(crate) fn close_batch_max_events(&self) -> u16 {
self.close_batch_max_events
}
/// Returns the id of the market this book belongs to.
pub fn market_id(&self) -> MarketId {
self.market_id
}
/// Returns the current market state.
pub fn market_state(&self) -> BookMarketState {
self.state
}
/// Returns the current market phase.
pub fn market_phase(&self) -> MarketPhase {
self.market_phase
}
/// Whether the current market state reports itself as halted.
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
/// Whether the order exists and is currently linked into a price level.
pub fn is_resting(&self, order_id: OrderId) -> bool {
    match self.orders.get_key(&order_id) {
        Some(key) => self.orders.slot(key).in_level,
        None => false,
    }
}
/// Returns the cached count of orders in an executable state.
pub fn active_order_count(&self) -> usize {
self.active_order_count_cache
}
/// Returns the batch process currently in progress, if any.
pub fn batch_process_state(&self) -> Option<&BatchProcessState> {
self.batch_process.as_ref()
}
/// Whether a batch process is currently recorded on the book.
fn has_active_batch_process(&self) -> bool {
self.batch_process.is_some()
}
/// Yields the two runner ids of the market: yes first, then no.
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
[self.yes_runner_id, self.no_runner_id].into_iter()
}
/// Returns a `RunnerPrices` for `runner_id` with empty back and lay lists.
///
/// The vectors are only pre-sized for `depth` entries; this method never
/// populates them and does not validate `runner_id`.
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
RunnerPrices {
runner_id,
available_to_back: Vec::with_capacity(depth),
available_to_lay: Vec::with_capacity(depth),
}
}
/// Returns the exclusive upper bound for order prices, in ticks.
pub fn max_price_ticks(&self) -> u16 {
self.max_price_ticks
}
/// Highest occupied bid tick, or `None` when the bid ladder is empty or its
/// top tick falls outside `1..max_price_ticks`.
pub fn best_bid_ticks(&self) -> Option<u16> {
    let ceiling = self.max_price_ticks;
    self.bids
        .best_desc()
        .map(|tick| tick as u16)
        .filter(|&tick| tick > 0 && tick < ceiling)
}
/// Lowest occupied ask tick, or `None` when the ask ladder is empty or its
/// bottom tick falls outside `1..max_price_ticks`.
pub fn best_ask_ticks(&self) -> Option<u16> {
    let ceiling = self.max_price_ticks;
    self.asks
        .best_asc()
        .map(|tick| tick as u16)
        .filter(|&tick| tick > 0 && tick < ceiling)
}
pub fn depth(&self, depth: usize) -> BinaryDepth {
let mut bids = Vec::with_capacity(depth);
let mut asks = Vec::with_capacity(depth);
let mut tick = self.bids.best_desc();
while let Some(t) = tick {
if bids.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let total_remaining = self.bids.level_total_remaining(t);
if total_remaining > 0 {
bids.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: total_remaining,
});
}
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
let mut tick = self.asks.best_asc();
while let Some(t) = tick {
if asks.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let total_remaining = self.asks.level_total_remaining(t);
if total_remaining > 0 {
asks.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: total_remaining,
});
}
}
tick = self.asks.next_asc_from(t + 1);
}
BinaryDepth {
max_price_ticks: self.max_price_ticks,
bids,
asks,
}
}
/// Always returns `None` for this book type.
///
/// NOTE(review): both the unknown-runner branch and the fall-through return
/// `None`, so the runner check currently has no effect — looks like a placeholder.
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
/// Always returns `None` for this book type.
///
/// NOTE(review): both the unknown-runner branch and the fall-through return
/// `None`, so the runner check currently has no effect — looks like a placeholder.
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
/// Always returns `Money::zero()` for this book type.
///
/// NOTE(review): both branches return zero, so the runner check currently has
/// no effect — looks like a placeholder.
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return Money::zero();
}
Money::zero()
}
/// Always returns `Money::zero()`; this method does not track matched volume.
pub fn total_matched(&self) -> Money {
Money::zero()
}
/// Assigns display labels to the yes/no runners.
///
/// `runner_ids` and `runner_labels` are paired by position.
///
/// # Panics
/// Panics unless both slices have exactly two entries, or when an id matches
/// neither runner of this market.
pub(crate) fn set_runner_labels(&mut self, runner_ids: &[RunnerId], runner_labels: &[String]) {
    assert_eq!(runner_ids.len(), 2, "binary market requires 2 runner ids");
    assert_eq!(
        runner_labels.len(),
        2,
        "binary market requires 2 runner labels"
    );
    for (runner_id, runner_label) in runner_ids.iter().zip(runner_labels) {
        let target = if *runner_id == self.yes_runner_id {
            &mut self.yes_runner_label
        } else if *runner_id == self.no_runner_id {
            &mut self.no_runner_label
        } else {
            panic!("unknown runner id {:?} for binary market", runner_id)
        };
        *target = runner_label.clone();
    }
}
/// Returns the display label for one of the market's two runners.
///
/// # Panics
/// Panics when `runner_id` is neither the yes nor the no runner.
pub fn runner_label(&self, runner_id: RunnerId) -> &str {
    match runner_id {
        id if id == self.yes_runner_id => &self.yes_runner_label,
        id if id == self.no_runner_id => &self.no_runner_label,
        unknown => panic!("unknown runner id {:?} for binary market", unknown),
    }
}
/// Wraps `event` in an envelope stamped with this market's id, name and `ts`.
///
/// `market_seq` is left at `0` and the metadata at its default.
/// NOTE(review): presumably a later stage assigns the real sequence — confirm.
fn emit(&self, ts: DateTime, event: BookEvent) -> BookEventEnvelope {
BookEventEnvelope {
market_id: self.market_id,
market_name: self.market_name.clone(),
market_seq: 0,
timestamp: ts,
metadata: EventMetadata::default(),
event,
}
}
/// Validates `cmd` against this book and plans the events it produces.
///
/// Returns the planned event envelopes together with a `CommandResponse`
/// echoing the command's correlation id; only `PlaceBinaryOrder` fills in a
/// response kind.
///
/// Processing order:
/// 1. Reject commands addressed to another market (`MarketIdMismatch`).
/// 2. Handle `HaltMarket` / `ResumeMarket` immediately, before the book gate.
/// 3. Run `validate_book_gate` with the current batch and halt status.
/// 4. Dispatch the remaining commands to their planners using the book's
///    scratch buffers, which are taken out for the duration of the call and
///    restored on both success and failure.
///
/// # Errors
/// Returns a `BookError` carrying the command's correlation id and the reject
/// reason; state and phase transition failures additionally carry the current
/// state, current phase and the requested state. Commands that belong to other
/// market models are rejected with `MarketModelMismatch`, and `CreateMarket`
/// with `InternalError`.
///
/// NOTE(review): the planners visible here only append events to the scratch
/// buffer; book state appears to be applied elsewhere from those events — confirm.
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
// Error constructors capture a separate clone of the correlation id so that
// `correlation_id` itself can still be moved into the response.
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
let current_state = self.state;
let current_phase = self.market_phase;
let state_err = |reason, requested_state| {
BookError::state_error(
err_correlation_id.clone(),
reason,
current_state,
current_phase,
requested_state,
)
};
// One timestamp is taken per command and shared by all emitted events.
let ts = Utc::now();
if cmd.market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
// Halt and resume bypass the book gate below.
match &cmd.kind {
CommandKind::HaltMarket { reason } => {
let events = self
.cmd_halt_market(ts, reason)
.map_err(|reason| state_err(reason, RequestedBookState::Halted))?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id,
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id,
kind: None,
},
));
}
_ => {}
}
cmd.kind
.validate_book_gate(self.batch_process_state(), self.state.is_halted())
.map_err(&err)?;
// Move the scratch buffers out so the planners can borrow `self` and the
// scratch independently; they are put back after the closure returns.
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
scratch.events.clear();
scratch.maker_fills.clear();
let resp_kind = match &cmd.kind {
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("halt/resume handled above")
}
CommandKind::CreateMarket { .. } => return Err(err(RejectReason::InternalError)),
CommandKind::AddRunners { .. } | CommandKind::ChangeRunners { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::SetMarketState { state } => {
// Closing goes through the close planner so that the cancel batch is
// started; every other target state is a plain state change.
match state {
MarketState::Closed => self
.plan_close_market(
&mut scratch,
"SET_MARKET_STATE",
ts,
self.close_batch_max_events,
)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?,
_ => {
let reason = "SET_MARKET_STATE";
self.plan_set_market_state(&mut scratch, ts, *state, reason)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(*state))
})?;
}
}
None
}
CommandKind::AwaitLiveMarket => {
self.plan_await_live_market(&mut scratch, ts, "AWAIT_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::AwaitLive))?;
None
}
CommandKind::ReturnToPreMarket { reason } => {
self.plan_return_to_pre_market(&mut scratch, ts, reason.as_str())
.map_err(|reason| state_err(reason, RequestedBookState::Pre))?;
None
}
CommandKind::GoLiveMarket => {
self.plan_go_live_market(&mut scratch, ts, "GO_LIVE_MARKET")
.map_err(|reason| state_err(reason, RequestedBookState::Live))?;
None
}
CommandKind::CloseMarket { reason } => {
self.plan_close_market(&mut scratch, reason, ts, self.close_batch_max_events)
.map_err(|reason| {
state_err(reason, RequestedBookState::Market(MarketState::Closed))
})?;
None
}
CommandKind::ContinueBatchProcess => {
self.plan_continue_batch_process(&mut scratch, ts)
.map_err(&err)?;
None
}
CommandKind::BatchCancelOrders {
from_created_at_inclusive,
to_created_at_inclusive,
account_id,
runner_id,
reason,
} => {
self.plan_batch_cancel_orders(
&mut scratch,
ts,
*from_created_at_inclusive,
*to_created_at_inclusive,
account_id.clone(),
*runner_id,
reason.as_str(),
)
.map_err(&err)?;
None
}
CommandKind::PlaceBinaryOrder {
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
..
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// The id is read, not advanced, here.
let order_id = OrderId(self.next_order_id);
let result = self
.plan_place_binary_order(
&mut scratch,
ts,
PlacePlan {
correlation_id: correlation_id.clone(),
order_id,
account_id: account_id.clone(),
side: *side,
price_ticks: *price_ticks,
qty_shares: *qty_shares,
time_in_force: *time_in_force,
},
)
.map_err(&err)?;
Some(CommandResponseKind::PlaceOrder(result))
}
CommandKind::CancelOrder {
account_id,
order_id,
} => {
self.plan_cancel_binary_order(
&mut scratch,
ts,
*order_id,
account_id.clone(),
CancelCause::UserCancel,
)
.map_err(&err)?;
None
}
CommandKind::ReduceBinaryOrder {
account_id,
order_id,
new_price_ticks,
target,
condition,
} => {
ensure_can_accept_new_orders(self.state).map_err(&err)?;
// The id is read, not advanced, here.
let new_order_id = OrderId(self.next_order_id);
self.plan_reduce_binary_order(
&mut scratch,
ts,
ReducePlan {
correlation_id: correlation_id.clone(),
order_id: *order_id,
new_order_id,
account_id: account_id.clone(),
new_price_ticks: *new_price_ticks,
target: *target,
condition: condition.clone(),
},
)
.map_err(&err)?;
None
}
// Commands that belong to other market models.
CommandKind::PlaceOrder { .. }
| CommandKind::ReduceOrder { .. }
| CommandKind::RemoveRunner { .. }
| CommandKind::RemoveRunners { .. }
| CommandKind::VoidTrades { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
};
// Hand the planned events to the caller, leaving an empty buffer behind.
let envs = std::mem::take(&mut scratch.events).into_vec();
Ok((
envs,
CommandResponse {
correlation_id: correlation_id.clone(),
kind: resp_kind,
},
))
})();
// Restore the scratch buffers regardless of the outcome.
self.scratch = scratch;
result
}
/// Plans a plain market-state change by queuing one `MarketStateChanged`
/// event (without a close batch size).
///
/// # Errors
/// Propagates the rejection from `ensure_state_change`, and returns
/// `MarketTerminal` when the current state is terminal.
fn plan_set_market_state(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    state: MarketState,
    reason: &str,
) -> Result<(), RejectReason> {
    let target = match state {
        MarketState::Open => BookMarketState::Open,
        MarketState::Suspended => BookMarketState::Suspended,
        MarketState::Closed => BookMarketState::Closed,
        MarketState::Deactivated => BookMarketState::Deactivated,
    };

    // The transition check runs before the terminal check, so its rejection
    // takes precedence when both would fail.
    ensure_state_change(self.state, target)?;
    if self.state.is_terminal() {
        return Err(RejectReason::MarketTerminal);
    }

    self.emit_state_change(scratch, ts, target, reason, None);
    Ok(())
}
/// Plans the transition towards the await-live phase.
///
/// The shared helper `plan_await_live_market_with_lapse` decides, from the
/// current state, phase and market kind, which events are needed. This method
/// then queues a state change to `Suspended` and/or a phase change to
/// `PreAwaitLive` accordingly, in that order.
///
/// # Errors
/// Propagates the helper's rejection.
fn plan_await_live_market(
&self,
scratch: &mut Scratch,
ts: DateTime,
reason: &str,
) -> Result<(), RejectReason> {
let plan =
plan_await_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;
if plan.suspend_market {
self.emit_state_change(scratch, ts, BookMarketState::Suspended, reason, None);
}
if plan.set_pre_await_live {
self.emit_phase_change(scratch, ts, MarketPhase::PreAwaitLive, reason);
}
Ok(())
}
/// Plans the switch to the `Live` phase: validates it through
/// `plan_go_live_market_with_lapse` (whose returned plan is not used here) and
/// queues a single phase-change event.
///
/// # Errors
/// Propagates the helper's rejection.
fn plan_go_live_market(
&self,
scratch: &mut Scratch,
ts: DateTime,
reason: &str,
) -> Result<(), RejectReason> {
plan_go_live_market_with_lapse(self.state, self.market_phase, self.market_kind)?;
self.emit_phase_change(scratch, ts, MarketPhase::Live, reason);
Ok(())
}
/// Plans the return to the `Pre` phase: validates it through the shared
/// `plan_return_to_pre_market` helper and queues a single phase-change event.
///
/// # Errors
/// Propagates the helper's rejection.
fn plan_return_to_pre_market(
&self,
scratch: &mut Scratch,
ts: DateTime,
reason: &str,
) -> Result<(), RejectReason> {
// Resolves to the free function imported from `book::common`, not to this method.
plan_return_to_pre_market(self.state, self.market_phase, self.market_kind)?;
self.emit_phase_change(scratch, ts, MarketPhase::Pre, reason);
Ok(())
}
/// Queues a `MarketStateChanged` event on the scratch buffer.
fn emit_state_change(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    to: BookMarketState,
    reason: &str,
    close_batch_max_events: Option<u16>,
) {
    let state_changed = BookEvent::MarketStateChanged {
        to,
        reason: reason.to_string(),
        close_batch_max_events,
    };
    let envelope = self.emit(ts, state_changed);
    scratch.events.push(envelope);
}
/// Queues a `MarketPhaseChanged` event on the scratch buffer.
fn emit_phase_change(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    to: MarketPhase,
    reason: &str,
) {
    let phase_changed = BookEvent::MarketPhaseChanged {
        to,
        reason: reason.to_string(),
    };
    let envelope = self.emit(ts, phase_changed);
    scratch.events.push(envelope);
}
/// Plans a market halt: a single `MarketStateChanged` event to `Halted`.
///
/// # Errors
/// `MarketAlreadyHalted` when the market is already halted, otherwise
/// `MarketTerminal` when it is in a terminal state.
fn cmd_halt_market(&self, ts: DateTime, reason: &str) -> Result<EventVec, RejectReason> {
    if self.state.is_halted() {
        Err(RejectReason::MarketAlreadyHalted)
    } else if self.state.is_terminal() {
        Err(RejectReason::MarketTerminal)
    } else {
        let halted = BookEvent::MarketStateChanged {
            to: BookMarketState::Halted,
            reason: reason.to_string(),
            close_batch_max_events: None,
        };
        let mut events = EventVec::new();
        events.push(self.emit(ts, halted));
        Ok(events)
    }
}
/// Plans a resume from halt: a single `MarketStateChanged` event to `Open`
/// with reason `"RESUME"`.
///
/// # Errors
/// `MarketNotHalted` when the market is not currently halted.
fn cmd_resume_market(&self, ts: DateTime) -> Result<EventVec, RejectReason> {
    let currently_halted = self.state.is_halted();
    if !currently_halted {
        return Err(RejectReason::MarketNotHalted);
    }

    let resumed = BookEvent::MarketStateChanged {
        to: BookMarketState::Open,
        reason: "RESUME".to_string(),
        close_batch_max_events: None,
    };
    let mut events = EventVec::new();
    events.push(self.emit(ts, resumed));
    Ok(events)
}
/// Whether the order is in an executable state (unmatched or partially
/// matched).
fn is_live_order(order: &BinaryOrder) -> bool {
    match order.info.state {
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched => true,
        _ => false,
    }
}
/// Counts executable orders by walking the whole order store (it does not use
/// the cached active-order count).
fn count_live_orders(&self) -> u64 {
self.orders
.iter_keys_sorted()
.filter(|(_, _, o)| Self::is_live_order(o))
.count() as u64
}
/// Selects the next chunk of orders to cancel.
///
/// Feeds orders in ascending id order, starting strictly after `cursor_after`,
/// into the shared `collect_cancelled_orders_chunk` helper together with the
/// `max_orders` limit and the `should_cancel` predicate.
/// NOTE(review): how the helper applies the limit and builds the chunk is
/// defined outside this file — confirm there.
fn select_cancelled_chunk(
&self,
cursor_after: Option<OrderId>,
max_orders: usize,
should_cancel: impl FnMut(&BinaryOrder) -> bool,
) -> CancelledOrdersChunk {
collect_cancelled_orders_chunk(
self.orders
.iter_keys_sorted_from(cursor_after)
.map(|(order_id, _, order)| (order_id, order)),
max_orders,
should_cancel,
|order| &order.info,
)
}
/// Selects the next chunk of cancellable orders using "is executable" as the
/// only criterion.
fn select_live_orders_chunk(
&self,
cursor_after: Option<OrderId>,
max_orders: usize,
) -> CancelledOrdersChunk {
self.select_cancelled_chunk(cursor_after, max_orders, Self::is_live_order)
}
/// Builds the batch context for `batch_mode`: close batches record the current
/// number of executable orders, every other mode gets `BatchProcessContext::None`.
fn batch_context_for_mode(&self, batch_mode: BatchMode) -> BatchProcessContext {
match batch_mode {
BatchMode::Close => BatchProcessContext::Close {
total_live_orders: self.count_live_orders(),
},
_ => BatchProcessContext::None,
}
}
/// Selects the next chunk of orders to cancel for a batch `target`.
///
/// * `AllLiveOrders` — every executable order.
/// * `Filtered` — executable orders that also satisfy
///   `matches_filtered_batch_target`. The runner flag passed to that helper is
///   `true` when no runner filter is set or when the filter names one of this
///   market's two runners; it is the same for every order.
///
/// # Panics
/// `LapseOrders` and `RunnerRemoval` targets hit `unreachable!`; this book
/// type does not use them.
fn select_cancelled_chunk_for_target(
&self,
cursor_after: Option<OrderId>,
batch_max_events: usize,
target: &BatchProcessTarget,
) -> CancelledOrdersChunk {
match target {
BatchProcessTarget::AllLiveOrders => {
self.select_live_orders_chunk(cursor_after, batch_max_events)
}
BatchProcessTarget::Filtered { runner_filter, .. } => {
self.select_cancelled_chunk(cursor_after, batch_max_events, |order| {
Self::is_live_order(order)
&& matches_filtered_batch_target(
target,
&order.info,
runner_filter.is_none_or(|runner_id| {
runner_id == self.yes_runner_id || runner_id == self.no_runner_id
}),
)
})
}
BatchProcessTarget::LapseOrders => {
unreachable!("binary book does not use lapse batches")
}
BatchProcessTarget::RunnerRemoval { .. } => {
unreachable!("binary book does not use runner-removal batches")
}
}
}
/// Plans closing the market.
///
/// Queues a `MarketStateChanged` event to `Closed` carrying `batch_max_events`,
/// then asks the shared `maybe_start_or_retarget_batch` helper to start (or
/// retarget) a close batch over all live orders.
///
/// # Errors
/// Propagates the rejection from `ensure_state_change`; returns
/// `MarketTerminal` when the current state is terminal and
/// `MarketBatchCancelling` when a close batch is already in progress.
///
/// NOTE(review): takes `&mut self` although only shared access is used in the
/// visible body.
fn plan_close_market(
&mut self,
scratch: &mut Scratch,
reason: &str,
ts: DateTime,
batch_max_events: u16,
) -> Result<(), RejectReason> {
ensure_state_change(self.state, BookMarketState::Closed)?;
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if self
.batch_process_state()
.is_some_and(BatchProcessState::is_close)
{
return Err(RejectReason::MarketBatchCancelling);
}
self.emit_state_change(
scratch,
ts,
BookMarketState::Closed,
reason,
Some(batch_max_events),
);
// The helper receives two callbacks: one to wrap events in envelopes and
// one to select the next chunk of orders to cancel.
maybe_start_or_retarget_batch(
self.batch_process.as_ref(),
BatchMode::Close,
batch_max_events,
&BatchProcessTarget::AllLiveOrders,
None,
&mut scratch.events,
|event| self.emit(ts, event),
|cursor_after, max_orders, target| {
self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
},
);
Ok(())
}
/// Plans the next step of the batch process in progress by delegating to the
/// shared `continue_batch_process` helper, supplying callbacks to wrap events
/// and to select the next chunk of orders.
///
/// # Errors
/// Propagates the helper's rejection.
/// NOTE(review): behaviour when no batch is active is decided inside the
/// helper — confirm there.
fn plan_continue_batch_process(
&mut self,
scratch: &mut Scratch,
ts: DateTime,
) -> Result<(), RejectReason> {
continue_batch_process(
self.batch_process_state(),
&mut scratch.events,
|event| self.emit(ts, event),
|cursor_after, max_orders, target| {
self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
},
)
}
/// Plans a filtered batch cancel.
///
/// Builds a filtered batch target from the optional creation-time bounds
/// (converted to epoch milliseconds), the optional account and runner filters
/// and the command timestamp, then asks `maybe_start_or_retarget_batch` to
/// start the batch using the book's configured batch size.
///
/// # Errors
/// `MarketBatchCancelling` when any batch process is already in progress.
// The parameters mirror the fields of the `BatchCancelOrders` command.
#[allow(clippy::too_many_arguments)]
fn plan_batch_cancel_orders(
&self,
scratch: &mut Scratch,
ts: DateTime,
from_created_at_inclusive: Option<DateTime>,
to_created_at_inclusive: Option<DateTime>,
account_filter: Option<AccountId>,
runner_filter: Option<RunnerId>,
reason: &str,
) -> Result<(), RejectReason> {
if self.has_active_batch_process() {
return Err(RejectReason::MarketBatchCancelling);
}
let batch_max_events = self.close_batch_max_events;
let from_ms_opt = from_created_at_inclusive.map(|d| d.timestamp_millis());
let to_ms_opt = to_created_at_inclusive.map(|d| d.timestamp_millis());
let started_at_ms = ts.timestamp_millis();
let target = filtered_batch_target(
started_at_ms,
from_ms_opt,
to_ms_opt,
account_filter,
runner_filter,
);
maybe_start_or_retarget_batch(
self.batch_process.as_ref(),
BatchMode::FilteredCancel,
batch_max_events,
&target,
Some(reason),
&mut scratch.events,
|event| self.emit(ts, event),
|cursor_after, max_orders, target| {
self.select_cancelled_chunk_for_target(cursor_after, max_orders, target)
},
);
Ok(())
}
/// Checks that `price_ticks` lies inside `1..max_price_ticks`.
///
/// # Errors
/// `InvalidMarketConfig` when the market allows fewer than two ticks (checked
/// first), otherwise `InvalidPriceTicks` for an out-of-range price.
fn validate_price_ticks(&self, price_ticks: u16) -> Result<(), RejectReason> {
    let ceiling = self.max_price_ticks;
    if ceiling < 2 {
        Err(RejectReason::InvalidMarketConfig)
    } else if (1..ceiling).contains(&price_ticks) {
        Ok(())
    } else {
        Err(RejectReason::InvalidPriceTicks)
    }
}
/// Rejects a zero quantity with `InvalidQtyShares`; any other value is accepted.
fn validate_qty_shares(qty_shares: u64) -> Result<(), RejectReason> {
    match qty_shares {
        0 => Err(RejectReason::InvalidQtyShares),
        _ => Ok(()),
    }
}
/// Sums the resting quantity a taker on `side` could match at `price_ticks` or better.
///
/// A `Side::Yes` taker is measured against the ask ladder: ticks are visited in ascending
/// order from the best ask and the walk stops at the first in-range tick above
/// `price_ticks`. A `Side::No` taker is measured against the bid ladder: ticks are visited
/// in descending order from the best bid and the walk stops at the first in-range tick
/// below `price_ticks`. Ticks equal to 0, or not below `max_price_ticks`, are never counted.
///
/// When `exclude_account_id` is given, each level's queue is walked order by order so that
/// orders owned by that account are left out; otherwise the ladder's per-level total
/// (`level_total_remaining`) is used directly.
///
/// The result saturates at `u64::MAX` instead of overflowing.
fn available_to_match(
    &self,
    side: Side,
    price_ticks: u16,
    exclude_account_id: Option<&AccountId>,
) -> u64 {
    let limit = usize::from(price_ticks);
    let tick_ceiling = usize::from(self.max_price_ticks);
    let mut total = 0u64;
    match side {
        Side::Yes => {
            let mut next_tick = self.asks.best_asc();
            while let Some(tick) = next_tick {
                // Boundary ticks are stepped over, never counted.
                if tick != 0 && tick < tick_ceiling {
                    if tick > limit {
                        break;
                    }
                    let level_qty = match exclude_account_id {
                        None => self.asks.level_total_remaining(tick),
                        Some(excluded) => {
                            let mut qty = 0u64;
                            let mut cursor = self.asks.level_head(tick);
                            while let Some(key) = cursor {
                                let slot = self.orders.slot(key);
                                cursor = slot.links.next;
                                if &slot.order.info.account_id != excluded {
                                    qty = qty.saturating_add(slot.order.remaining_shares());
                                }
                            }
                            qty
                        }
                    };
                    total = total.saturating_add(level_qty);
                }
                next_tick = self.asks.next_asc_from(tick + 1);
            }
        }
        Side::No => {
            let mut next_tick = self.bids.best_desc();
            while let Some(tick) = next_tick {
                // Tick 0 is a boundary tick and nothing lies below it, so the walk ends here.
                if tick == 0 {
                    break;
                }
                if tick < tick_ceiling {
                    if tick < limit {
                        break;
                    }
                    let level_qty = match exclude_account_id {
                        None => self.bids.level_total_remaining(tick),
                        Some(excluded) => {
                            let mut qty = 0u64;
                            let mut cursor = self.bids.level_head(tick);
                            while let Some(key) = cursor {
                                let slot = self.orders.slot(key);
                                cursor = slot.links.next;
                                if &slot.order.info.account_id != excluded {
                                    qty = qty.saturating_add(slot.order.remaining_shares());
                                }
                            }
                            qty
                        }
                    };
                    total = total.saturating_add(level_qty);
                }
                next_tick = self.bids.next_desc_from(tick - 1);
            }
        }
    }
    total
}
/// Plans the placement of a new binary order without mutating the book.
///
/// After validation a `BinaryOrderAccepted` event is pushed onto `scratch.events`, matching
/// is planned via `plan_matching`, and, when `time_in_force_remainder_cancel_cause` yields
/// a cause for the unfilled remainder, an `OrderCancelled` event follows. In that last case
/// the reported remaining quantity is 0 and the final state is `Cancelled`.
///
/// # Errors
///
/// * Whatever `validate_price_ticks` / `validate_qty_shares` reject.
/// * [`RejectReason::Duplicate`] when `order_id` is already present in the order store.
/// * Any error returned by `TimeInForce::required_fok_fill`.
/// * [`RejectReason::WouldNotFillFok`] when the required fill exceeds what
///   `available_to_match` reports (the placing account's own orders are excluded).
fn plan_place_binary_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    plan: PlacePlan,
) -> Result<PlaceOrderResult, RejectReason> {
    let PlacePlan {
        correlation_id,
        order_id,
        account_id,
        side,
        price_ticks,
        qty_shares,
        time_in_force,
    } = plan;
    debug_assert!(self.state.is_matchable());

    self.validate_price_ticks(price_ticks)?;
    Self::validate_qty_shares(qty_shares)?;
    if self.orders.contains(&order_id) {
        return Err(RejectReason::Duplicate);
    }

    let fok_requirement = time_in_force.required_fok_fill(Quantity(qty_shares))?;
    let fok_shortfall = fok_requirement.is_some_and(|required| {
        self.available_to_match(side, price_ticks, Some(&account_id)) < required.0
    });
    if fok_shortfall {
        return Err(RejectReason::WouldNotFillFok);
    }

    let accepted = BookEvent::BinaryOrderAccepted {
        correlation_id: correlation_id.clone(),
        order_id,
        account_id: account_id.clone(),
        side,
        price_ticks,
        qty_shares,
        time_in_force,
    };
    scratch.events.push(self.emit(ts, accepted));

    let request = MatchRequest {
        ts,
        taker_order_id: order_id,
        taker_account_id: account_id.clone(),
        taker_correlation_id: correlation_id.clone(),
        taker_side: side,
        taker_price_ticks: price_ticks,
        taker_qty_shares: qty_shares,
    };
    let (matched_shares, avg_price_ticks) = self.plan_matching(scratch, request);

    let unfilled = qty_shares.saturating_sub(matched_shares);
    let (remaining, state) =
        match time_in_force_remainder_cancel_cause(time_in_force, unfilled > 0) {
            Some(cancel_cause) => {
                // The remainder does not rest on the book: cancel it in the same plan.
                scratch.events.push(self.emit(
                    ts,
                    BookEvent::OrderCancelled {
                        cancelled_order: CancelledOrderEntry {
                            order_id,
                            account_id,
                            correlation_id,
                        },
                        cancel_cause,
                        cause_detail: None,
                    },
                ));
                (0, BookOrderState::Cancelled)
            }
            None => {
                let state = match (unfilled, matched_shares) {
                    (0, _) => BookOrderState::ExecutionComplete,
                    (_, 0) => BookOrderState::ExecutableUnmatched,
                    _ => BookOrderState::ExecutablePartiallyMatched,
                };
                (unfilled, state)
            }
        };

    Ok(PlaceOrderResult {
        accepted: true,
        order_id,
        matched: FillQuantity::Shares(matched_shares),
        avg_matched_price: avg_price_ticks.map(FillPrice::Ticks),
        remaining: FillQuantity::Shares(remaining),
        final_order_state: Some(state),
    })
}
/// Plans the cancellation of a single live order owned by `account_id`.
///
/// On success one `OrderCancelled` event is pushed onto `scratch.events`, carrying
/// `cancel_cause` and its `detail()` text as `cause_detail`.
///
/// # Errors
///
/// * [`RejectReason::OrderNotFound`] when `order_id` is not in the order store.
/// * [`RejectReason::NotOrderOwner`] when the order belongs to a different account.
/// * [`RejectReason::OrderNotLive`] when the order is neither `ExecutableUnmatched` nor
///   `ExecutablePartiallyMatched`.
fn plan_cancel_binary_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    order_id: OrderId,
    account_id: AccountId,
    cancel_cause: CancelCause,
) -> Result<(), RejectReason> {
    let order = self
        .orders
        .get(&order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if order.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    let is_live = matches!(
        order.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    );
    if !is_live {
        return Err(RejectReason::OrderNotLive);
    }

    let cancelled_order = CancelledOrderEntry::from_order_info(order_id, &order.info);
    let cause_detail = Some(cancel_cause.detail().to_string());
    scratch.events.push(self.emit(
        ts,
        BookEvent::OrderCancelled {
            cancelled_order,
            cancel_cause,
            cause_detail,
        },
    ));
    Ok(())
}
/// Verifies the caller's optional expectations about the order being reduced.
///
/// Each expectation present in `condition` (price, total quantity, filled quantity,
/// remaining quantity) is compared with `old`; expectations that are `None` are ignored,
/// and a missing `condition` always passes.
///
/// # Errors
///
/// Returns [`RejectReason::OrderStateChanged`] when any supplied expectation differs from
/// the order's current value.
fn validate_reduce_binary_condition(
    old: &BinaryOrder,
    condition: Option<&ReduceBinaryOrderCondition>,
) -> Result<(), RejectReason> {
    let Some(expected) = condition else {
        return Ok(());
    };

    let stale = matches!(expected.expected_price_ticks, Some(ticks) if ticks != old.price_ticks)
        || matches!(expected.expected_qty_shares, Some(qty) if qty != old.qty_shares)
        || matches!(
            expected.expected_filled_shares,
            Some(filled) if filled != old.filled_shares
        )
        || matches!(
            expected.expected_remaining_shares,
            Some(open) if open != old.remaining_shares()
        );

    if stale {
        Err(RejectReason::OrderStateChanged)
    } else {
        Ok(())
    }
}
/// Resolves a reduce target into the remaining share count the replacement should have.
///
/// * No target: the order's current remaining shares.
/// * `TotalShares(n)`: `n` minus the shares already filled, floored at 0.
/// * `RemainingShares(n)`: `n` as given (not range-checked here).
///
/// # Errors
///
/// Returns [`RejectReason::ExposureIncreaseNotAllowed`] when a `TotalShares` target exceeds
/// the order's current total quantity.
fn reduce_binary_target_remaining(
    old: &BinaryOrder,
    target: Option<ReduceBinaryOrderTarget>,
) -> Result<u64, RejectReason> {
    let Some(target) = target else {
        return Ok(old.remaining_shares());
    };
    match target {
        ReduceBinaryOrderTarget::RemainingShares(remaining_shares) => Ok(remaining_shares),
        ReduceBinaryOrderTarget::TotalShares(total_shares) if total_shares > old.qty_shares => {
            Err(RejectReason::ExposureIncreaseNotAllowed)
        }
        ReduceBinaryOrderTarget::TotalShares(total_shares) => {
            Ok(total_shares.saturating_sub(old.filled_shares))
        }
    }
}
/// Ensures that replacing `old` with (`price_ticks`, `qty_shares`) does not add exposure.
///
/// Two conditions must hold: the new quantity may not exceed the order's remaining shares,
/// and the reserve computed by `formulas::binary_order_reserve` for the new price and
/// quantity may not exceed the reserve for the order's current price and remaining shares.
///
/// # Errors
///
/// Returns [`RejectReason::ExposureIncreaseNotAllowed`] when either condition fails.
fn ensure_binary_reduce_only(
    max_price_ticks: u16,
    old: &BinaryOrder,
    price_ticks: u16,
    qty_shares: u64,
) -> Result<(), RejectReason> {
    let old_remaining = old.remaining_shares();
    if qty_shares > old_remaining {
        return Err(RejectReason::ExposureIncreaseNotAllowed);
    }

    let reserve_for = |ticks: u16, shares: u64| {
        formulas::binary_order_reserve(max_price_ticks, old.info.side, ticks, shares)
    };
    let current_reserve = reserve_for(old.price_ticks, old_remaining);
    let proposed_reserve = reserve_for(price_ticks, qty_shares);

    if proposed_reserve > current_reserve {
        Err(RejectReason::ExposureIncreaseNotAllowed)
    } else {
        Ok(())
    }
}
/// Plans a reduce (price change and/or size reduction) of an existing order.
///
/// The reduce is expressed as a replacement: when the resolved quantity is non-zero a new
/// GTC order with `new_order_id` is planned first via `plan_place_binary_order`, then the
/// original order is cancelled with `CancelCause::Reduce`. A resolved quantity of 0 only
/// cancels the original.
///
/// # Errors
///
/// Checked in this order:
/// * [`RejectReason::OrderNotFound`] / [`RejectReason::NotOrderOwner`] for the lookup.
/// * Errors from `validate_reduce_binary_condition`, `validate_price_ticks`,
///   `reduce_binary_target_remaining` and `ensure_binary_reduce_only`.
/// * [`RejectReason::NoChange`] when both price and remaining quantity are unchanged.
/// * [`RejectReason::OrderNotLive`] when the order is not in an executable state.
/// * Any error from planning the replacement or the cancellation.
fn plan_reduce_binary_order(
    &self,
    scratch: &mut Scratch,
    ts: DateTime,
    plan: ReducePlan,
) -> Result<(), RejectReason> {
    let ReducePlan {
        correlation_id,
        order_id,
        new_order_id,
        account_id,
        new_price_ticks,
        target,
        condition,
    } = plan;

    let old = self
        .orders
        .get(&order_id)
        .ok_or(RejectReason::OrderNotFound)?;
    if old.info.account_id != account_id {
        return Err(RejectReason::NotOrderOwner);
    }
    Self::validate_reduce_binary_condition(old, condition.as_ref())?;

    let price_ticks = new_price_ticks.unwrap_or(old.price_ticks);
    self.validate_price_ticks(price_ticks)?;
    let qty_shares = Self::reduce_binary_target_remaining(old, target)?;
    Self::ensure_binary_reduce_only(self.max_price_ticks, old, price_ticks, qty_shares)?;

    let unchanged = price_ticks == old.price_ticks && qty_shares == old.remaining_shares();
    if unchanged {
        return Err(RejectReason::NoChange);
    }
    let is_live = matches!(
        old.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    );
    if !is_live {
        return Err(RejectReason::OrderNotLive);
    }

    if qty_shares > 0 {
        Self::validate_qty_shares(qty_shares)?;
        let replacement = PlacePlan {
            correlation_id,
            order_id: new_order_id,
            account_id: account_id.clone(),
            side: old.info.side,
            price_ticks,
            qty_shares,
            time_in_force: TimeInForce::Gtc,
        };
        self.plan_place_binary_order(scratch, ts, replacement)?;
    }

    self.plan_cancel_binary_order(scratch, ts, order_id, account_id, CancelCause::Reduce)
}
/// Plans the fills of a taker against the opposite ladder without mutating the book.
///
/// A `Side::Yes` taker walks the asks upward from the best ask while the tick does not
/// exceed the taker's price; a `Side::No` taker walks the bids downward from the best bid
/// while the tick is not below the taker's price. Within a level, makers are visited in
/// queue order. Makers owned by the taker's account are skipped, and a maker's open
/// quantity is reduced by fills already recorded for it in `scratch.maker_fills`.
///
/// For every fill, `scratch.maker_fills` is updated and two `BinaryTradeMatched` events are
/// pushed onto `scratch.events`: the maker's view first, then the taker's. Both carry the
/// maker's price.
///
/// Returns the total matched shares and, when anything matched, the fill-weighted average
/// price in ticks rounded to the nearest tick.
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (u64, Option<u16>) {
    let MatchRequest {
        ts,
        taker_order_id,
        taker_account_id,
        taker_correlation_id,
        taker_side,
        taker_price_ticks,
        taker_qty_shares,
    } = r;
    if taker_qty_shares == 0 {
        return (0, None);
    }

    let limit = usize::from(taker_price_ticks);
    let mut unfilled = taker_qty_shares;
    let mut matched = 0u64;
    // Sum of fill * price, kept in u128 so it cannot overflow.
    let mut notional: u128 = 0;

    match taker_side {
        Side::Yes => {
            let mut level = self.asks.best_asc();
            while let Some(t) = level {
                if unfilled == 0 || t > limit {
                    break;
                }
                let mut cursor = self.asks.level_head(t);
                while unfilled > 0 {
                    let Some(key) = cursor else { break };
                    let slot = self.orders.slot(key);
                    // Advance up front so every `continue` below moves to the next maker.
                    cursor = slot.links.next;
                    let maker = &slot.order;
                    if maker.info.account_id == taker_account_id {
                        continue;
                    }
                    let already_planned = scratch
                        .maker_fills
                        .get(&maker.info.order_id)
                        .copied()
                        .unwrap_or(0);
                    let maker_open = maker.remaining_shares().saturating_sub(already_planned);
                    if maker_open == 0 {
                        continue;
                    }

                    let fill = unfilled.min(maker_open);
                    unfilled -= fill;
                    matched += fill;
                    notional += u128::from(fill) * u128::from(maker.price_ticks);
                    let planned = scratch.maker_fills.entry(maker.info.order_id).or_insert(0);
                    *planned = planned.saturating_add(fill);

                    scratch.events.push(self.emit(
                        ts,
                        BookEvent::BinaryTradeMatched {
                            correlation_id: maker.info.correlation_id.clone(),
                            order_id: maker.info.order_id,
                            account_id: maker.info.account_id.clone(),
                            role: TradeRole::Maker,
                            side: maker.info.side,
                            market_phase: self.market_phase,
                            price_ticks: maker.price_ticks,
                            counter_party: taker_order_id,
                            counter_party_account_id: taker_account_id.clone(),
                            remaining_qty_shares: maker_open.saturating_sub(fill),
                            matched_delta_shares: fill,
                        },
                    ));
                    scratch.events.push(self.emit(
                        ts,
                        BookEvent::BinaryTradeMatched {
                            correlation_id: taker_correlation_id.clone(),
                            order_id: taker_order_id,
                            account_id: taker_account_id.clone(),
                            role: TradeRole::Taker,
                            side: taker_side,
                            market_phase: self.market_phase,
                            price_ticks: maker.price_ticks,
                            counter_party: maker.info.order_id,
                            counter_party_account_id: maker.info.account_id.clone(),
                            remaining_qty_shares: unfilled,
                            matched_delta_shares: fill,
                        },
                    ));
                }
                level = self.asks.next_asc_from(t + 1);
            }
        }
        Side::No => {
            let mut level = self.bids.best_desc();
            while let Some(t) = level {
                if unfilled == 0 || t < limit {
                    break;
                }
                let mut cursor = self.bids.level_head(t);
                while unfilled > 0 {
                    let Some(key) = cursor else { break };
                    let slot = self.orders.slot(key);
                    // Advance up front so every `continue` below moves to the next maker.
                    cursor = slot.links.next;
                    let maker = &slot.order;
                    if maker.info.account_id == taker_account_id {
                        continue;
                    }
                    let already_planned = scratch
                        .maker_fills
                        .get(&maker.info.order_id)
                        .copied()
                        .unwrap_or(0);
                    let maker_open = maker.remaining_shares().saturating_sub(already_planned);
                    if maker_open == 0 {
                        continue;
                    }

                    let fill = unfilled.min(maker_open);
                    unfilled -= fill;
                    matched += fill;
                    notional += u128::from(fill) * u128::from(maker.price_ticks);
                    let planned = scratch.maker_fills.entry(maker.info.order_id).or_insert(0);
                    *planned = planned.saturating_add(fill);

                    scratch.events.push(self.emit(
                        ts,
                        BookEvent::BinaryTradeMatched {
                            correlation_id: maker.info.correlation_id.clone(),
                            order_id: maker.info.order_id,
                            account_id: maker.info.account_id.clone(),
                            role: TradeRole::Maker,
                            side: maker.info.side,
                            market_phase: self.market_phase,
                            price_ticks: maker.price_ticks,
                            counter_party: taker_order_id,
                            counter_party_account_id: taker_account_id.clone(),
                            remaining_qty_shares: maker_open.saturating_sub(fill),
                            matched_delta_shares: fill,
                        },
                    ));
                    scratch.events.push(self.emit(
                        ts,
                        BookEvent::BinaryTradeMatched {
                            correlation_id: taker_correlation_id.clone(),
                            order_id: taker_order_id,
                            account_id: taker_account_id.clone(),
                            role: TradeRole::Taker,
                            side: taker_side,
                            market_phase: self.market_phase,
                            price_ticks: maker.price_ticks,
                            counter_party: maker.info.order_id,
                            counter_party_account_id: maker.info.account_id.clone(),
                            remaining_qty_shares: unfilled,
                            matched_delta_shares: fill,
                        },
                    ));
                }
                // Nothing lies below tick 0; stop before the subtraction would underflow.
                if t == 0 {
                    break;
                }
                level = self.bids.next_desc_from(t - 1);
            }
        }
    }

    let avg = (matched != 0).then(|| {
        let divisor = u128::from(matched);
        // Adding half the divisor rounds the quotient to the nearest tick.
        ((notional + divisor / 2) / divisor) as u16
    });
    (matched, avg)
}
/// Applies a single event envelope to the book.
///
/// Batch-process events are offered first to `apply_batch_process_state_event` and then to
/// `apply_batch_cancelled_chunk_event`; if either reports the event as handled, nothing
/// else happens. Otherwise the event updates market state or phase, removes a cancelled
/// order, inserts an accepted binary order at the tail of its ladder (bids for `Side::Yes`,
/// asks for `Side::No`), or applies a trade fill. Event kinds listed in the no-op arm are
/// ignored.
///
/// # Panics
///
/// Panics when a `MarketPhaseChanged` event names a transition the market kind does not
/// allow, and when a batch-process event reaches the main `match` (the batch helpers are
/// expected to have consumed it).
pub fn apply_event(&mut self, env: &BookEventEnvelope) {
    let ts = env.timestamp;

    // The batch bookkeeping is moved out of `self` for the duration of the helper calls,
    // whose closures borrow `self`, and is put back immediately afterwards.
    let mut batch_process = self.batch_process.take();
    let mut queued_batches = std::mem::take(&mut self.queued_batches);
    let handled = apply_batch_process_state_event(
        &mut batch_process,
        &mut queued_batches,
        &env.event,
        |batch_mode| self.batch_context_for_mode(batch_mode),
    ) || apply_batch_cancelled_chunk_event(&mut batch_process, &env.event, |order_id| {
        self.apply_remove_order(order_id)
    });
    self.batch_process = batch_process;
    self.queued_batches = queued_batches;
    if handled {
        return;
    }

    match &env.event {
        BookEvent::MarketStateChanged { to, .. } => {
            self.state = *to;
        }
        BookEvent::MarketPhaseChanged { to, .. } => {
            assert!(
                self.market_kind
                    .can_transition_to_phase(self.market_phase, *to),
                "market kind does not support applied phase transition"
            );
            self.market_phase = *to;
        }
        BookEvent::OrderCancelled {
            cancelled_order, ..
        } => {
            self.apply_remove_order(cancelled_order.order_id);
        }
        BookEvent::BinaryOrderAccepted {
            correlation_id,
            order_id,
            account_id,
            side,
            price_ticks,
            qty_shares,
            time_in_force,
            ..
        } => {
            // Keep the id counter ahead of every id seen so far.
            self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
            let info = BookOrderInfo {
                order_id: *order_id,
                account_id: account_id.clone(),
                correlation_id: correlation_id.clone(),
                side: *side,
                state: BookOrderState::ExecutableUnmatched,
                created_at: ts,
                last_updated_at: ts,
            };
            let order = BinaryOrder {
                info,
                price_ticks: *price_ticks,
                qty_shares: *qty_shares,
                filled_shares: 0,
                time_in_force: *time_in_force,
            };
            let key = self.orders.insert(*order_id, order);
            self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
            match side {
                Side::Yes => self.bids.insert_tail(&mut self.orders, key, *qty_shares),
                Side::No => self.asks.insert_tail(&mut self.orders, key, *qty_shares),
            }
        }
        BookEvent::BinaryTradeMatched {
            order_id,
            matched_delta_shares,
            ..
        } => {
            self.apply_trade_fill(*order_id, *matched_delta_shares, ts);
        }
        // Events that carry nothing this book needs to track.
        BookEvent::MarketCreated { .. }
        | BookEvent::OrderAccepted { .. }
        | BookEvent::TradeMatched { .. }
        | BookEvent::VoidTrades { .. }
        | BookEvent::RunnerRemoved { .. }
        | BookEvent::RunnersRemoved { .. }
        | BookEvent::RunnersAdded { .. }
        | BookEvent::MarketRemoved { .. } => {}
        BookEvent::OrderCancelledBatched { .. }
        | BookEvent::BatchProcessQueued { .. }
        | BookEvent::BatchProcessStarted { .. }
        | BookEvent::BatchProcessRetargeted { .. }
        | BookEvent::BatchProcessCompleted { .. } => {
            unreachable!("batch process events handled before match")
        }
    }
}
/// Applies every envelope in `envs` to the book, in slice order, via [`Self::apply_event`].
pub fn apply_all_events(&mut self, envs: &[BookEventEnvelope]) {
    envs.iter().for_each(|env| self.apply_event(env));
}
/// Applies a fill of `qty_shares` to the order identified by `order_id`.
///
/// The call is a no-op when the quantity is 0, the order is unknown, or the order has no
/// remaining shares. Otherwise the fill is capped at the order's remaining shares, the
/// ladder total at the order's tick is reduced while the order is linked into a level, and
/// the order's filled quantity, `last_updated_at` and state are updated. An order that
/// becomes fully filled is unlinked from its level, removed from the store, and, if it was
/// in an executable state beforehand, no longer counted in `active_order_count_cache`.
fn apply_trade_fill(&mut self, order_id: OrderId, qty_shares: u64, ts: DateTime) {
    if qty_shares == 0 {
        return;
    }
    let Some(key) = self.orders.get_key(&order_id) else {
        return;
    };

    let slot = self.orders.slot(key);
    let was_live = matches!(
        slot.order.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    );
    let open_before = slot.order.remaining_shares();
    if open_before == 0 {
        return;
    }
    // Never apply more than is still open on the order.
    let applied = qty_shares.min(open_before);
    let filled_shares = slot.order.filled_shares.saturating_add(applied);
    let remaining_shares = slot.order.qty_shares.saturating_sub(filled_shares);
    let side = slot.level_side;
    let tick = slot.level_tick as usize;
    let in_level = slot.in_level;
    let fully_filled = remaining_shares == 0;

    if in_level {
        // The level total is decremented separately, so `unlink` is given 0 to subtract.
        match side {
            Side::Yes => {
                self.bids.decrement_total(tick, applied);
                if fully_filled {
                    self.bids.unlink(&mut self.orders, key, 0);
                }
            }
            Side::No => {
                self.asks.decrement_total(tick, applied);
                if fully_filled {
                    self.asks.unlink(&mut self.orders, key, 0);
                }
            }
        }
    }

    if let Some(order) = self.orders.get_mut(&order_id) {
        order.filled_shares = filled_shares;
        order.info.last_updated_at = ts;
        order.info.state = if fully_filled {
            BookOrderState::ExecutionComplete
        } else {
            BookOrderState::ExecutablePartiallyMatched
        };
    }

    if fully_filled {
        if was_live {
            self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
        }
        self.orders.remove(&order_id);
    }
}
}
/// Serializes the book through a [`BinaryYesBookSnapshot`].
///
/// Orders are copied into a `BTreeMap` keyed by order id; the price ladders themselves are
/// not part of the snapshot.
impl Serialize for BinaryYesBook {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let mut orders = BTreeMap::new();
        for (order_id, _key, order) in self.orders.iter_keys_sorted() {
            orders.insert(order_id, order.clone());
        }

        BinaryYesBookSnapshot {
            market_id: self.market_id,
            market_name: self.market_name.clone(),
            market_kind: self.market_kind,
            state: self.state,
            market_phase: self.market_phase,
            batch_process: self.batch_process.clone(),
            queued_batches: self.queued_batches.clone(),
            close_batch_max_events: self.close_batch_max_events,
            yes_runner_id: self.yes_runner_id,
            no_runner_id: self.no_runner_id,
            yes_runner_label: self.yes_runner_label.clone(),
            no_runner_label: self.no_runner_label.clone(),
            max_price_ticks: self.max_price_ticks,
            next_order_id: self.next_order_id,
            orders,
        }
        .serialize(serializer)
    }
}
impl<'de> Deserialize<'de> for BinaryYesBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = BinaryYesBookSnapshot::deserialize(deserializer)?;
let mut book = BinaryYesBook::new_with_capacity(
snap.market_id,
snap.market_kind,
snap.market_phase,
snap.yes_runner_id,
snap.no_runner_id,
snap.max_price_ticks,
snap.orders.len(),
);
book.market_name = snap.market_name;
book.state = snap.state;
book.batch_process = snap.batch_process;
book.queued_batches = snap.queued_batches;
book.close_batch_max_events = snap.close_batch_max_events;
book.yes_runner_label = snap.yes_runner_label;
book.no_runner_label = snap.no_runner_label;
book.next_order_id = snap.next_order_id;
for (oid, order) in snap.orders {
book.orders.insert(oid, order);
}
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::book::common::types::BookOrderState;
    use crate::types::AccountId;

    /// Builds an unmatched, unfilled 100-share GTC order owned by account 1.
    fn sample_order(order_id: OrderId, side: Side, price_ticks: u16) -> BinaryOrder {
        let info = BookOrderInfo {
            order_id,
            account_id: AccountId::from(1_u64),
            correlation_id: None,
            side,
            state: BookOrderState::ExecutableUnmatched,
            created_at: Utc::now(),
            last_updated_at: Utc::now(),
        };
        BinaryOrder {
            info,
            price_ticks,
            qty_shares: 100,
            filled_shares: 0,
            time_in_force: TimeInForce::Gtc,
        }
    }

    // Linking the first order at tick 65 creates exactly one page.
    #[test]
    fn sidebook_allocates_one_page_on_first_touch() {
        let mut orders = OrderStore::with_capacity(1);
        let mut ladder = SideBook::new(257);
        let id = OrderId(1);
        let key = orders.insert(id, sample_order(id, Side::Yes, 65));

        ladder.insert_tail(&mut orders, key, 100);

        assert_eq!(ladder.pages.len(), 1);
        assert_eq!(ladder.page_refs[1], 1);
        assert_eq!(ladder.best_desc(), Some(65));
    }

    // Unlinking the only order empties the level but the page stays allocated.
    #[test]
    fn sidebook_unlink_clears_tick_bit_but_retains_page() {
        let mut orders = OrderStore::with_capacity(1);
        let mut ladder = SideBook::new(257);
        let id = OrderId(1);
        let key = orders.insert(id, sample_order(id, Side::Yes, 65));

        ladder.insert_tail(&mut orders, key, 100);
        ladder.unlink(&mut orders, key, 100);

        assert_eq!(ladder.pages.len(), 1);
        assert_eq!(ladder.best_desc(), None);
        assert_eq!(ladder.level_total_remaining(65), 0);
        assert_eq!(ladder.level_head(65), None);
    }

    // A tick that empties and fills again reuses its existing page.
    #[test]
    fn sidebook_reuses_page_after_tick_reactivates() {
        let mut orders = OrderStore::with_capacity(2);
        let mut ladder = SideBook::new(257);

        let first_id = OrderId(1);
        let first = orders.insert(first_id, sample_order(first_id, Side::Yes, 65));
        ladder.insert_tail(&mut orders, first, 100);
        ladder.unlink(&mut orders, first, 100);

        let second_id = OrderId(2);
        let second = orders.insert(second_id, sample_order(second_id, Side::Yes, 65));
        ladder.insert_tail(&mut orders, second, 100);

        assert_eq!(ladder.pages.len(), 1);
        assert_eq!(ladder.level_head(65), Some(second));
        assert_eq!(ladder.level_total_remaining(65), 100);
    }

    // Ticks 17 and 145 land on different pages, so two pages are allocated.
    #[test]
    fn sidebook_allocates_multiple_pages_for_distant_ticks() {
        let mut orders = OrderStore::with_capacity(2);
        let mut ladder = SideBook::new(257);
        let low_id = OrderId(1);
        let high_id = OrderId(2);
        let low = orders.insert(low_id, sample_order(low_id, Side::Yes, 17));
        let high = orders.insert(high_id, sample_order(high_id, Side::Yes, 145));

        ladder.insert_tail(&mut orders, low, 100);
        ladder.insert_tail(&mut orders, high, 200);

        assert_eq!(ladder.pages.len(), 2);
        assert_eq!(ladder.page_refs[0], 1);
        assert_eq!(ladder.page_refs[2], 2);
        assert_eq!(ladder.best_asc(), Some(17));
        assert_eq!(ladder.next_asc_from(18), Some(145));
        assert_eq!(ladder.level_total_remaining(17), 100);
        assert_eq!(ladder.level_total_remaining(145), 200);
    }

    // Decrementing a level whose page was never allocated must not allocate it.
    #[test]
    fn decrement_total_missing_page_is_noop() {
        let mut ladder = SideBook::new(257);

        ladder.decrement_total(145, 50);

        assert_eq!(ladder.pages.len(), 0);
        assert_eq!(ladder.level_total_remaining(145), 0);
        assert_eq!(ladder.level_head(145), None);
    }

    // Reading a level on a fresh ladder reports an empty level.
    #[test]
    fn absent_page_behaves_as_empty_level() {
        let ladder = SideBook::new(257);

        assert_eq!(ladder.level_total_remaining(17), 0);
        assert_eq!(ladder.level_head(17), None);
    }

    // Orders forced onto tick 0 and tick `max_price_ticks` must not surface as best
    // prices or in the depth view.
    #[test]
    fn boundary_ticks_remain_excluded_from_best_price_and_depth() {
        let mut book = BinaryYesBook::new_with_capacity(
            MarketId(1),
            crate::types::MarketKind::InPlayCapable,
            MarketPhase::Pre,
            RunnerId(1),
            RunnerId(2),
            10,
            4,
        );

        let zero_bid_id = OrderId(1);
        let zero_bid = book
            .orders
            .insert(zero_bid_id, sample_order(zero_bid_id, Side::Yes, 0));
        book.bids.insert_tail(&mut book.orders, zero_bid, 100);

        let max_ask_id = OrderId(2);
        let max_ask = book
            .orders
            .insert(max_ask_id, sample_order(max_ask_id, Side::No, 10));
        book.asks.insert_tail(&mut book.orders, max_ask, 100);

        assert_eq!(book.best_bid_ticks(), None);
        assert_eq!(book.best_ask_ticks(), None);
        let depth = book.depth(5);
        assert!(depth.bids.is_empty());
        assert!(depth.asks.is_empty());
    }
}