use crate::book::common::events::{
BookEvent, BookEventEnvelope, CancelCause, EventMetadata, EventVec, TradeRole,
};
use crate::book::common::response::{CommandResponse, CommandResponseKind, PlaceOrderResult};
use crate::book::common::types::BookOrderInfo;
use crate::book::common::types::{
BatchProcessState, BinaryDepth, BinaryPriceSize, BookMarketState, BookOrderState,
CancelProcessState, CloseProcessState, OrderBatchProcessState, RunnerPrices, RunnerResult,
};
use crate::book::error::BookError;
use crate::book::protocol::command::{Command, CommandKind, MarketState, Side, TimeInForce};
use crate::book::protocol::reject::RejectReason;
use crate::types::{
AccountId, CorrelationId, DateTime, FillPrice, FillQuantity, MarketId, Money, OrderId, RunnerId,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::collections::{BTreeMap, BTreeSet};
use tracing::error;
pub(crate) type OrderKey = usize;
#[derive(Debug, Clone, Copy, Default)]
struct Links {
prev: Option<OrderKey>,
next: Option<OrderKey>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryOrder {
info: BookOrderInfo,
price_ticks: u16,
qty_shares: u64,
filled_shares: u64,
time_in_force: TimeInForce,
}
impl BinaryOrder {
fn remaining_shares(&self) -> u64 {
self.qty_shares.saturating_sub(self.filled_shares)
}
}
#[derive(Debug, Clone)]
struct OrderSlot {
order: BinaryOrder,
links: Links,
in_level: bool,
level_side: Side,
level_tick: u16,
}
impl OrderSlot {
fn new(order: BinaryOrder) -> Self {
Self {
level_side: order.info.side,
level_tick: order.price_ticks,
order,
links: Links::default(),
in_level: false,
}
}
}
#[derive(Debug, Clone, Default)]
struct OrderStore {
slab: Slab<OrderSlot>,
by_id: HashMap<OrderId, OrderKey>,
by_id_sorted: BTreeMap<OrderId, OrderKey>,
}
impl OrderStore {
fn with_capacity(capacity: usize) -> Self {
Self {
slab: Slab::with_capacity(capacity),
by_id: HashMap::with_capacity(capacity),
by_id_sorted: BTreeMap::new(),
}
}
fn contains(&self, order_id: &OrderId) -> bool {
self.by_id.contains_key(order_id)
}
fn get_key(&self, order_id: &OrderId) -> Option<OrderKey> {
self.by_id.get(order_id).copied()
}
fn get(&self, order_id: &OrderId) -> Option<&BinaryOrder> {
let key = self.get_key(order_id)?;
Some(&self.slab.get(key)?.order)
}
fn get_mut(&mut self, order_id: &OrderId) -> Option<&mut BinaryOrder> {
let key = self.get_key(order_id)?;
Some(&mut self.slab.get_mut(key)?.order)
}
fn slot(&self, key: OrderKey) -> &OrderSlot {
&self.slab[key]
}
fn slot_mut(&mut self, key: OrderKey) -> &mut OrderSlot {
&mut self.slab[key]
}
fn insert(&mut self, order_id: OrderId, order: BinaryOrder) -> OrderKey {
debug_assert_eq!(order_id, order.info.order_id);
let key = self.slab.insert(OrderSlot::new(order));
self.by_id.insert(order_id, key);
self.by_id_sorted.insert(order_id, key);
key
}
fn iter_keys_sorted(&self) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
self.by_id_sorted
.iter()
.map(|(&oid, &key)| (oid, key, &self.slab[key].order))
}
fn iter_keys_sorted_from(
&self,
cursor_after: Option<OrderId>,
) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
use std::ops::Bound;
let range = match cursor_after {
Some(c) => (Bound::Excluded(c), Bound::Unbounded),
None => (Bound::Unbounded, Bound::Unbounded),
};
self.by_id_sorted
.range(range)
.map(|(&oid, &key)| (oid, key, &self.slab[key].order))
}
fn remove(&mut self, order_id: &OrderId) -> bool {
let Some(key) = self.by_id.remove(order_id) else {
return false;
};
self.by_id_sorted.remove(order_id);
self.slab.remove(key);
true
}
}
#[derive(Debug, Clone, Default)]
struct Bitset {
words: Vec<u64>,
}
impl Bitset {
fn new(bits: usize) -> Self {
Self {
words: vec![0u64; bits.div_ceil(64)],
}
}
fn set(&mut self, idx: usize) {
let w = idx / 64;
let b = idx % 64;
if w < self.words.len() {
self.words[w] |= 1u64 << b;
}
}
fn clear(&mut self, idx: usize) {
let w = idx / 64;
let b = idx % 64;
if w < self.words.len() {
self.words[w] &= !(1u64 << b);
}
}
fn next_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
if start >= bit_len {
return None;
}
let mut w = start / 64;
let mut bits = self.words[w] & (!0u64 << (start % 64));
loop {
if bits != 0 {
let idx = w * 64 + bits.trailing_zeros() as usize;
return (idx < bit_len).then_some(idx);
}
w += 1;
if w >= self.words.len() {
return None;
}
bits = self.words[w];
}
}
fn prev_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
if bit_len == 0 {
return None;
}
let start = start.min(bit_len.saturating_sub(1));
let mut w = start / 64;
let mut bits = self.words[w] & (!0u64 >> (63 - (start % 64)));
loop {
if bits != 0 {
let idx = w * 64 + (63 - bits.leading_zeros() as usize);
return (idx < bit_len).then_some(idx);
}
if w == 0 {
return None;
}
w -= 1;
bits = self.words[w];
}
}
}
#[derive(Debug, Clone, Copy, Default)]
struct Level {
head: Option<OrderKey>,
tail: Option<OrderKey>,
total_remaining: u64,
}
#[derive(Debug, Clone)]
struct SideBook {
levels: Vec<Level>,
mask: Bitset,
bits: usize,
}
impl SideBook {
fn new(bits: usize) -> Self {
Self {
levels: vec![Level::default(); bits],
mask: Bitset::new(bits),
bits,
}
}
fn level(&self, tick: usize) -> &Level {
&self.levels[tick]
}
fn level_mut(&mut self, tick: usize) -> &mut Level {
&mut self.levels[tick]
}
fn best_asc(&self) -> Option<usize> {
self.mask.next_set_from(0, self.bits)
}
fn best_desc(&self) -> Option<usize> {
self.mask
.prev_set_from(self.bits.saturating_sub(1), self.bits)
}
fn next_asc_from(&self, start: usize) -> Option<usize> {
self.mask.next_set_from(start, self.bits)
}
fn next_desc_from(&self, start: usize) -> Option<usize> {
self.mask.prev_set_from(start, self.bits)
}
fn insert_tail(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
let tick = orders.slot(key).level_tick as usize;
let level = &mut self.levels[tick];
if level.head.is_none() {
self.mask.set(tick);
level.head = Some(key);
level.tail = Some(key);
} else {
let Some(tail) = level.tail else {
error!(tick, "tail missing for non-empty level");
return;
};
orders.slot_mut(tail).links.next = Some(key);
orders.slot_mut(key).links.prev = Some(tail);
level.tail = Some(key);
}
level.total_remaining = level.total_remaining.saturating_add(remaining);
let slot = orders.slot_mut(key);
slot.in_level = true;
slot.links.next = None;
}
fn unlink(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
let (tick, prev, next) = {
let slot = orders.slot(key);
(slot.level_tick as usize, slot.links.prev, slot.links.next)
};
let became_empty = {
let level = self.level_mut(tick);
if level.head == Some(key) {
level.head = next;
}
if level.tail == Some(key) {
level.tail = prev;
}
if let Some(p) = prev {
orders.slot_mut(p).links.next = next;
}
if let Some(n) = next {
orders.slot_mut(n).links.prev = prev;
}
level.total_remaining = level.total_remaining.saturating_sub(remaining);
level.head.is_none()
};
if became_empty {
self.mask.clear(tick);
}
let slot = orders.slot_mut(key);
slot.links = Links::default();
slot.in_level = false;
}
}
#[derive(Debug, Clone)]
pub struct BinaryYesBook {
market_id: MarketId,
pub(crate) market_name: String,
market_kind: crate::types::MarketKind,
state: BookMarketState,
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
batch_process: Option<BatchProcessState>,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
next_order_id: u64,
orders: OrderStore,
active_order_count_cache: usize,
bids: SideBook,
asks: SideBook,
scratch: Scratch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryYesBookSnapshot {
market_id: MarketId,
market_kind: crate::types::MarketKind,
state: BookMarketState,
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
batch_process: Option<BatchProcessState>,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
next_order_id: u64,
orders: BTreeMap<OrderId, BinaryOrder>,
}
#[derive(Debug, Default, Clone)]
struct Scratch {
events: EventVec,
maker_fills: HashMap<OrderId, u64>,
}
#[derive(Debug, Clone)]
struct PlacePlan {
correlation_id: Option<CorrelationId>,
order_id: OrderId,
account_id: AccountId,
side: Side,
price_ticks: u16,
qty_shares: u64,
time_in_force: TimeInForce,
}
#[derive(Debug, Clone)]
struct ReplacePlan {
correlation_id: Option<CorrelationId>,
order_id: OrderId,
new_order_id: OrderId,
account_id: AccountId,
new_price_ticks: Option<u16>,
new_qty_shares: Option<u64>,
}
#[derive(Debug, Clone)]
struct MatchRequest {
ts: DateTime,
taker_order_id: OrderId,
taker_account_id: AccountId,
taker_side: Side,
taker_price_ticks: u16,
taker_qty_shares: u64,
}
impl BinaryYesBook {
pub fn new_with_capacity(
market_id: MarketId,
market_kind: crate::types::MarketKind,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
order_store_capacity: usize,
) -> Self {
let bits = max_price_ticks as usize + 1;
Self {
market_id,
market_name: String::new(),
market_kind,
state: BookMarketState::Open,
state_before_suspend: None,
state_before_halt: None,
batch_process: None,
yes_runner_id,
no_runner_id,
max_price_ticks,
next_order_id: 1,
orders: OrderStore::with_capacity(order_store_capacity),
active_order_count_cache: 0,
bids: SideBook::new(bits),
asks: SideBook::new(bits),
scratch: Scratch::default(),
}
}
fn rebuild_ladders(&mut self) {
type RebuildLevelKey = (Side, u16);
type RebuildLevelItem = (DateTime, OrderId, OrderKey, u64);
type RebuildPerLevel = HashMap<RebuildLevelKey, Vec<RebuildLevelItem>>;
let bits = self.max_price_ticks as usize + 1;
self.bids = SideBook::new(bits);
self.asks = SideBook::new(bits);
self.active_order_count_cache = 0;
let keys: Vec<OrderKey> = self.orders.by_id_sorted.values().copied().collect();
for key in keys {
let slot = self.orders.slot_mut(key);
slot.links = Links::default();
slot.in_level = false;
}
let mut per_level: RebuildPerLevel = HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
let remaining = order.remaining_shares();
if remaining == 0 {
continue;
}
if order.price_ticks == 0 || order.price_ticks >= self.max_price_ticks {
continue;
}
per_level
.entry((order.info.side, order.price_ticks))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((_, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
for (_, _, key, remaining) in items {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.insert_tail(&mut self.orders, key, remaining),
Side::No => self.asks.insert_tail(&mut self.orders, key, remaining),
}
}
}
}
fn apply_remove_order(&mut self, order_id: OrderId) {
let Some(key) = self.orders.get_key(&order_id) else {
return;
};
if matches!(
self.orders.slot(key).order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
let prev_remaining = self.orders.slot(key).order.remaining_shares();
if self.orders.slot(key).in_level && prev_remaining > 0 {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.unlink(&mut self.orders, key, prev_remaining),
Side::No => self.asks.unlink(&mut self.orders, key, prev_remaining),
}
}
self.orders.remove(&order_id);
}
pub fn market_id(&self) -> MarketId {
self.market_id
}
pub fn market_state(&self) -> BookMarketState {
self.state
}
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders
.get_key(&order_id)
.map(|k| self.orders.slot(k).in_level)
.unwrap_or(false)
}
pub fn active_order_count(&self) -> usize {
self.active_order_count_cache
}
pub fn close_process_state(&self) -> Option<CloseProcessState> {
match self.batch_process.as_ref() {
Some(BatchProcessState::Close(s)) => Some(*s),
_ => None,
}
}
pub fn cancel_process_state(&self) -> Option<&CancelProcessState> {
match self.batch_process.as_ref() {
Some(BatchProcessState::Cancel(s)) => Some(s),
_ => None,
}
}
pub fn batch_process_state(&self) -> Option<&BatchProcessState> {
self.batch_process.as_ref()
}
fn has_active_batch_process(&self) -> bool {
self.batch_process.is_some()
}
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
[self.yes_runner_id, self.no_runner_id].into_iter()
}
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
RunnerPrices {
runner_id,
available_to_back: Vec::with_capacity(depth),
available_to_lay: Vec::with_capacity(depth),
}
}
pub fn max_price_ticks(&self) -> u16 {
self.max_price_ticks
}
pub fn best_bid_ticks(&self) -> Option<u16> {
let t = self.bids.best_desc()?;
let t_u16 = t as u16;
(t_u16 > 0 && t_u16 < self.max_price_ticks).then_some(t_u16)
}
pub fn best_ask_ticks(&self) -> Option<u16> {
let t = self.asks.best_asc()?;
let t_u16 = t as u16;
(t_u16 > 0 && t_u16 < self.max_price_ticks).then_some(t_u16)
}
pub fn depth(&self, depth: usize) -> BinaryDepth {
let mut bids = Vec::with_capacity(depth);
let mut asks = Vec::with_capacity(depth);
let mut tick = self.bids.best_desc();
while let Some(t) = tick {
if bids.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let lvl = self.bids.level(t);
if lvl.total_remaining > 0 {
bids.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: lvl.total_remaining,
});
}
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
let mut tick = self.asks.best_asc();
while let Some(t) = tick {
if asks.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let lvl = self.asks.level(t);
if lvl.total_remaining > 0 {
asks.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: lvl.total_remaining,
});
}
}
tick = self.asks.next_asc_from(t + 1);
}
BinaryDepth {
max_price_ticks: self.max_price_ticks,
bids,
asks,
}
}
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return Money::zero();
}
Money::zero()
}
pub fn total_matched(&self) -> Money {
Money::zero()
}
fn emit(&self, ts: DateTime, event: BookEvent) -> BookEventEnvelope {
BookEventEnvelope {
market_id: self.market_id,
market_name: self.market_name.clone(),
market_seq: 0,
timestamp: ts,
metadata: EventMetadata::default(),
event,
}
}
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
let ts = Utc::now();
if cmd.market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
match &cmd.kind {
CommandKind::HaltMarket { reason } => {
let events = self.cmd_halt_market(ts, *reason).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
_ => {}
}
if self.has_active_batch_process()
&& !matches!(
&cmd.kind,
CommandKind::ContinueCloseMarket
| CommandKind::ContinueBatchCancelOrders
| CommandKind::ContinueLapseOrders
| CommandKind::ContinueVoidOrders
)
{
return Err(err(RejectReason::MarketBatchCancelling));
}
if self.state.is_halted()
&& !matches!(
&cmd.kind,
CommandKind::BatchCancelOrders { .. } | CommandKind::ContinueBatchCancelOrders
)
{
return Err(err(RejectReason::MarketHalted));
}
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
scratch.events.clear();
scratch.maker_fills.clear();
let resp_kind = match &cmd.kind {
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("halt/resume handled above")
}
CommandKind::CreateMarket { .. } => return Err(err(RejectReason::InternalError)),
CommandKind::SetMarketState { state } => {
match state {
MarketState::Closed => self
.plan_close_market(
&mut scratch,
ts,
crate::config::DEFAULT_SET_MARKET_STATE_BATCH,
)
.map_err(&err)?,
_ => {
let reason = "SET_MARKET_STATE";
self.plan_set_market_state(&mut scratch, ts, *state, reason)
.map_err(&err)?;
}
}
None
}
CommandKind::CloseMarket {
batch_max_events, ..
} => {
self.plan_close_market(&mut scratch, ts, *batch_max_events)
.map_err(&err)?;
None
}
CommandKind::ContinueCloseMarket => {
self.plan_continue_close_market(&mut scratch, ts)
.map_err(&err)?;
None
}
CommandKind::BatchCancelOrders {
from_created_at_inclusive,
to_created_at_inclusive,
account_id,
runner_id,
batch_max_events,
reason,
} => {
self.plan_batch_cancel_orders(
&mut scratch,
ts,
*from_created_at_inclusive,
*to_created_at_inclusive,
account_id.clone(),
*runner_id,
*batch_max_events,
reason.as_str(),
cmd.metadata.clone(),
)
.map_err(&err)?;
None
}
CommandKind::ContinueBatchCancelOrders => {
self.plan_continue_batch_cancel_orders(&mut scratch, ts)
.map_err(&err)?;
None
}
CommandKind::ContinueLapseOrders => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::ContinueVoidOrders => {
self.plan_continue_void_orders(&mut scratch, ts)
.map_err(&err)?;
None
}
CommandKind::VoidMarket { reason } => {
self.plan_void_market(&mut scratch, ts, reason)
.map_err(&err)?;
None
}
CommandKind::SettleMarket {
runner_results,
dead_heat_divisor,
} => {
self.plan_settle_market(&mut scratch, ts, runner_results, *dead_heat_divisor)
.map_err(&err)?;
None
}
CommandKind::PlaceBinaryOrder {
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
..
} => {
let order_id = OrderId(self.next_order_id);
let result = self
.plan_place_binary_order(
&mut scratch,
ts,
PlacePlan {
correlation_id: correlation_id.clone(),
order_id,
account_id: account_id.clone(),
side: *side,
price_ticks: *price_ticks,
qty_shares: *qty_shares,
time_in_force: *time_in_force,
},
)
.map_err(&err)?;
Some(CommandResponseKind::PlaceOrder(result))
}
CommandKind::CancelOrder {
account_id,
order_id,
} => {
self.plan_cancel_binary_order(
&mut scratch,
ts,
*order_id,
account_id.clone(),
"USER_CANCEL",
)
.map_err(&err)?;
None
}
CommandKind::ReplaceBinaryOrder {
account_id,
order_id,
new_price_ticks,
new_qty_shares,
} => {
let new_order_id = OrderId(self.next_order_id);
self.plan_replace_binary_order(
&mut scratch,
ts,
ReplacePlan {
correlation_id: correlation_id.clone(),
order_id: *order_id,
new_order_id,
account_id: account_id.clone(),
new_price_ticks: *new_price_ticks,
new_qty_shares: *new_qty_shares,
},
)
.map_err(&err)?;
None
}
CommandKind::PlaceOrder { .. }
| CommandKind::ReplaceOrder { .. }
| CommandKind::RemoveRunner { .. }
| CommandKind::VoidTradesFromTime { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::RemoveMarket => return Err(err(RejectReason::InternalError)),
};
let envs = scratch.events.iter().cloned().collect::<Vec<_>>();
Ok((
envs,
CommandResponse {
correlation_id: correlation_id.clone(),
kind: resp_kind,
},
))
})();
self.scratch = scratch;
result
}
fn plan_set_market_state(
&self,
scratch: &mut Scratch,
ts: DateTime,
state: MarketState,
reason: &str,
) -> Result<(), RejectReason> {
let to = match state {
MarketState::Open => BookMarketState::Open,
MarketState::InPlay => {
if matches!(self.market_kind, crate::types::MarketKind::PreEventOnly) {
return Err(RejectReason::MarketInPlayNotSupported);
}
BookMarketState::TurnInPlayEnabled
}
MarketState::Suspended => BookMarketState::Suspended,
MarketState::Closed => BookMarketState::Closed,
};
if self.state.is_terminal()
&& !matches!(to, BookMarketState::Settled | BookMarketState::Voided)
{
return Err(RejectReason::MarketTerminal);
}
scratch.events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to,
reason: reason.to_string(),
},
));
Ok(())
}
fn emit_state_change(
&self,
scratch: &mut Scratch,
ts: DateTime,
to: BookMarketState,
reason: &str,
) {
scratch.events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to,
reason: reason.to_string(),
},
));
}
fn cmd_halt_market(&self, ts: DateTime, reason: u32) -> Result<EventVec, RejectReason> {
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if self.state.is_halted() {
return Err(RejectReason::MarketAlreadyHalted);
}
let mut events = EventVec::new();
events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to: BookMarketState::Halted,
reason: format!("HALT:{reason}"),
},
));
Ok(events)
}
fn cmd_resume_market(&self, ts: DateTime) -> Result<EventVec, RejectReason> {
if !self.state.is_halted() {
return Err(RejectReason::MarketNotHalted);
}
let restored = self.state_before_halt.unwrap_or(BookMarketState::Open);
let mut events = EventVec::new();
events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to: restored,
reason: "RESUME".to_string(),
},
));
Ok(events)
}
fn is_live_order(order: &BinaryOrder) -> bool {
matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
)
}
fn count_live_orders(&self) -> u64 {
self.orders
.iter_keys_sorted()
.filter(|(_, _, o)| Self::is_live_order(o))
.count() as u64
}
fn select_live_orders_chunk(
&self,
cursor_after: Option<OrderId>,
max_cancels: usize,
) -> (
Vec<OrderId>,
Vec<AccountId>,
Option<OrderId>,
Option<AccountId>,
bool,
u64,
) {
let mut orders_to_cancel = Vec::new();
let mut account_ids = Vec::new();
let mut last = cursor_after;
let mut last_account_id = None;
let mut it = self.orders.iter_keys_sorted_from(cursor_after).peekable();
while let Some((oid, _, order)) = it.next() {
last = Some(oid);
last_account_id = Some(order.info.account_id.clone());
if !Self::is_live_order(order) {
continue;
}
orders_to_cancel.push(oid);
account_ids.push(order.info.account_id.clone());
if orders_to_cancel.len() >= max_cancels {
let done = it.peek().is_none();
return (
orders_to_cancel,
account_ids,
last,
last_account_id,
done,
max_cancels as u64,
);
}
}
let cancelled = orders_to_cancel.len();
(
orders_to_cancel,
account_ids,
last,
last_account_id,
true,
cancelled as u64,
)
}
fn plan_close_market(
&mut self,
scratch: &mut Scratch,
ts: DateTime,
batch_max_events: u16,
) -> Result<(), RejectReason> {
if self.has_active_batch_process() {
return Err(RejectReason::MarketBatchCancelling);
}
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if batch_max_events < crate::book::close_process::MIN_CLOSE_BATCH_EVENTS {
return Err(RejectReason::InvalidBatchSize);
}
let total_live_orders = self.count_live_orders();
if total_live_orders > 0 && batch_max_events < 1 {
return Err(RejectReason::InvalidBatchSize);
}
let cancel_budget = batch_max_events as usize;
let (cancelled_order_ids, account_ids, cursor_after, cursor_after_account_id, done, _) =
self.select_live_orders_chunk(None, cancel_budget);
let reason = crate::book::close_process::close_start_reason(batch_max_events);
self.emit_state_change(scratch, ts, BookMarketState::Closed, &reason);
if !cancelled_order_ids.is_empty() {
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids,
cursor_after: if done { None } else { cursor_after },
cursor_after_account_id: if done { None } else { cursor_after_account_id },
order_ids: cancelled_order_ids,
is_final: done,
cancel_cause: CancelCause::CloseCancel,
cause_detail: None,
},
));
}
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::CloseCancel,
},
));
}
Ok(())
}
fn plan_continue_close_market(
&mut self,
scratch: &mut Scratch,
ts: DateTime,
) -> Result<(), RejectReason> {
let Some(BatchProcessState::Close(proc_state)) = self.batch_process else {
return Err(RejectReason::MarketNotBatchCancelling);
};
if proc_state.cancelled_total >= proc_state.total_live_orders {
return Err(RejectReason::MarketNotBatchCancelling);
}
let cancel_budget = proc_state.batch_max_events as usize;
let (cancelled_order_ids, account_ids, cursor_after, cursor_after_account_id, done, _) =
self.select_live_orders_chunk(proc_state.cursor_after, cancel_budget);
if !cancelled_order_ids.is_empty() {
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids,
cursor_after: if done { None } else { cursor_after },
cursor_after_account_id: if done { None } else { cursor_after_account_id },
order_ids: cancelled_order_ids,
is_final: done,
cancel_cause: CancelCause::CloseCancel,
cause_detail: None,
},
));
}
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::CloseCancel,
},
));
}
Ok(())
}
fn plan_void_market(
&self,
scratch: &mut Scratch,
ts: DateTime,
reason: &str,
) -> Result<(), RejectReason> {
if self.state == BookMarketState::Voided {
return Err(RejectReason::MarketAlreadyVoided);
}
let batch_max_events = crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS;
let (order_ids, account_ids, cursor_after, cursor_after_account_id, done, _) =
self.select_live_orders_chunk(None, batch_max_events as usize);
self.emit_state_change(scratch, ts, BookMarketState::Voided, reason);
if !order_ids.is_empty() {
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_ids,
account_ids,
cursor_after: if done { None } else { cursor_after },
cursor_after_account_id: if done { None } else { cursor_after_account_id },
is_final: done,
cancel_cause: CancelCause::MarketVoid,
cause_detail: Some(reason.to_string()),
},
));
}
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::MarketVoid,
},
));
}
Ok(())
}
fn plan_continue_void_orders(
&self,
scratch: &mut Scratch,
ts: DateTime,
) -> Result<(), RejectReason> {
let Some(BatchProcessState::Void(proc_state)) = self.batch_process.as_ref() else {
return Err(RejectReason::MarketNotBatchCancelling);
};
let (order_ids, account_ids, cursor_after, cursor_after_account_id, done, _) = self
.select_live_orders_chunk(
proc_state.cursor_after,
proc_state.batch_max_events as usize,
);
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_ids,
account_ids,
cursor_after: if done { None } else { cursor_after },
cursor_after_account_id: if done { None } else { cursor_after_account_id },
is_final: done,
cancel_cause: CancelCause::MarketVoid,
cause_detail: Some(proc_state.reason.clone()),
},
));
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::MarketVoid,
},
));
}
Ok(())
}
fn plan_settle_market(
&self,
scratch: &mut Scratch,
ts: DateTime,
runner_results: &[(RunnerId, RunnerResult)],
dead_heat_divisor: Option<u32>,
) -> Result<(), RejectReason> {
if self.state != BookMarketState::Closed {
return Err(RejectReason::MarketNotClosed);
}
let result_runners: BTreeSet<_> = runner_results.iter().map(|(r, _)| *r).collect();
if result_runners.len() != runner_results.len() {
return Err(RejectReason::DuplicateRunner);
}
let active_runners: BTreeSet<_> = [self.yes_runner_id, self.no_runner_id]
.into_iter()
.collect();
if result_runners != active_runners {
return Err(RejectReason::IncompleteResults);
}
scratch.events.push(self.emit(
ts,
BookEvent::MarketSettled {
runner_results: runner_results.to_vec(),
dead_heat_divisor,
},
));
self.emit_state_change(scratch, ts, BookMarketState::Settled, "SETTLED");
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn plan_batch_cancel_orders(
&self,
scratch: &mut Scratch,
ts: DateTime,
from_created_at_inclusive: Option<DateTime>,
to_created_at_inclusive: Option<DateTime>,
account_filter: Option<AccountId>,
runner_filter: Option<RunnerId>,
batch_max_events: u16,
reason: &str,
final_event_metadata: Option<serde_json::Value>,
) -> Result<(), RejectReason> {
if self.has_active_batch_process() {
return Err(RejectReason::MarketBatchCancelling);
}
if batch_max_events < 1 {
return Err(RejectReason::InvalidBatchSize);
}
let from_ms_opt = from_created_at_inclusive.map(|d| d.timestamp_millis());
let to_ms_opt = to_created_at_inclusive.map(|d| d.timestamp_millis());
let metadata_json = final_event_metadata
.as_ref()
.and_then(|m| serde_json::to_string(m).ok());
let started_at_ms = ts.timestamp_millis();
scratch.events.push(self.emit(
ts,
BookEvent::BatchCancelStarted {
batch_max_events,
started_at_ms,
from_created_at_inclusive_ms: from_ms_opt,
to_created_at_inclusive_ms: to_ms_opt,
account_filter: account_filter.clone(),
runner_filter,
reason: reason.to_string(),
final_event_metadata_json: metadata_json,
},
));
let runner_filter_matches_market = runner_filter
.map(|r| r == self.yes_runner_id || r == self.no_runner_id)
.unwrap_or(true);
if !runner_filter_matches_market {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::BatchCancel,
},
));
return Ok(());
}
let mut cancelled_order_ids = Vec::new();
let mut cancelled_account_ids = Vec::new();
let mut last: Option<OrderId>;
let mut last_account_id: Option<AccountId>;
let mut cancelled = 0usize;
let mut it = self.orders.iter_keys_sorted().peekable();
while let Some((oid, _, order)) = it.next() {
last = Some(oid);
last_account_id = Some(order.info.account_id.clone());
if !Self::is_live_order(order) {
continue;
}
let created_at_ms = order.info.created_at.timestamp_millis();
if created_at_ms > started_at_ms {
continue;
}
if let Some(from_ms) = from_ms_opt
&& created_at_ms < from_ms
{
continue;
}
if let Some(to_ms) = to_ms_opt
&& created_at_ms > to_ms
{
continue;
}
if let Some(account_id) = account_filter.as_ref()
&& &order.info.account_id != account_id
{
continue;
}
cancelled_order_ids.push(oid);
cancelled_account_ids.push(order.info.account_id.clone());
cancelled += 1;
if cancelled >= batch_max_events as usize {
let done = it.peek().is_none();
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids: cancelled_account_ids,
cursor_after: if done { None } else { last },
cursor_after_account_id: if done { None } else { last_account_id },
order_ids: cancelled_order_ids,
is_final: done,
cancel_cause: CancelCause::BatchCancel,
cause_detail: Some(reason.to_string()),
},
));
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::BatchCancel,
},
));
}
return Ok(());
}
}
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids: cancelled_account_ids,
cursor_after: None,
cursor_after_account_id: None,
order_ids: cancelled_order_ids,
is_final: true,
cancel_cause: CancelCause::BatchCancel,
cause_detail: Some(reason.to_string()),
},
));
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::BatchCancel,
},
));
Ok(())
}
fn plan_continue_batch_cancel_orders(
&self,
scratch: &mut Scratch,
ts: DateTime,
) -> Result<(), RejectReason> {
let Some(BatchProcessState::Cancel(proc_state)) = self.batch_process.as_ref() else {
return Err(RejectReason::MarketNotBatchCancelling);
};
let mut cancelled_order_ids = Vec::new();
let mut cancelled_account_ids = Vec::new();
let mut last: Option<OrderId>;
let mut last_account_id: Option<AccountId>;
let mut cancelled = 0usize;
let mut it = self
.orders
.iter_keys_sorted_from(proc_state.cursor_after)
.peekable();
while let Some((oid, _, order)) = it.next() {
last = Some(oid);
last_account_id = Some(order.info.account_id.clone());
if !Self::is_live_order(order) {
continue;
}
let created_at_ms = order.info.created_at.timestamp_millis();
if created_at_ms > proc_state.started_at_ms {
continue;
}
if let Some(from_ms) = proc_state.from_created_at_inclusive_ms
&& created_at_ms < from_ms
{
continue;
}
if let Some(to_ms) = proc_state.to_created_at_inclusive_ms
&& created_at_ms > to_ms
{
continue;
}
if let Some(account_id) = proc_state.account_filter.as_ref()
&& &order.info.account_id != account_id
{
continue;
}
cancelled_order_ids.push(oid);
cancelled_account_ids.push(order.info.account_id.clone());
cancelled += 1;
if cancelled >= proc_state.batch_max_events as usize {
let done = it.peek().is_none();
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids: cancelled_account_ids,
cursor_after: if done { None } else { last },
cursor_after_account_id: if done { None } else { last_account_id },
order_ids: cancelled_order_ids,
is_final: done,
cancel_cause: CancelCause::BatchCancel,
cause_detail: Some(proc_state.reason.clone()),
},
));
if done && let Some(last_event) = scratch.events.last_mut() {
last_event.metadata = proc_state
.final_event_metadata_json
.as_ref()
.and_then(|m| serde_json::from_str(m).ok());
}
if done {
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::BatchCancel,
},
));
}
return Ok(());
}
}
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
account_ids: cancelled_account_ids,
cursor_after: None,
cursor_after_account_id: None,
order_ids: cancelled_order_ids,
is_final: true,
cancel_cause: CancelCause::BatchCancel,
cause_detail: Some(proc_state.reason.clone()),
},
));
if let Some(last_event) = scratch.events.last_mut() {
last_event.metadata = proc_state
.final_event_metadata_json
.as_ref()
.and_then(|m| serde_json::from_str(m).ok());
}
scratch.events.push(self.emit(
ts,
BookEvent::BatchProcessCompleted {
cancel_cause: CancelCause::BatchCancel,
},
));
Ok(())
}
fn validate_price_ticks(&self, price_ticks: u16) -> Result<(), RejectReason> {
if self.max_price_ticks < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
if price_ticks == 0 || price_ticks >= self.max_price_ticks {
return Err(RejectReason::InvalidPriceTicks);
}
Ok(())
}
fn validate_qty_shares(qty_shares: u64) -> Result<(), RejectReason> {
if qty_shares == 0 {
return Err(RejectReason::InvalidQtyShares);
}
Ok(())
}
fn available_to_match(
&self,
side: Side,
price_ticks: u16,
exclude_account_id: Option<AccountId>,
) -> u64 {
if let Some(exclude_account_id) = exclude_account_id {
let limit = price_ticks as usize;
return match side {
Side::Yes => {
let mut tick = self.asks.best_asc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
tick = self.asks.next_asc_from(t + 1);
continue;
}
if t > limit {
break;
}
let mut maker_key = self.asks.level(t).head;
while let Some(k) = maker_key {
let slot = self.orders.slot(k);
maker_key = slot.links.next;
if slot.order.info.account_id == exclude_account_id {
continue;
}
total = total.saturating_add(slot.order.remaining_shares());
}
tick = self.asks.next_asc_from(t + 1);
}
total
}
Side::No => {
let mut tick = self.bids.best_desc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
continue;
}
if t < limit {
break;
}
let mut maker_key = self.bids.level(t).head;
while let Some(k) = maker_key {
let slot = self.orders.slot(k);
maker_key = slot.links.next;
if slot.order.info.account_id == exclude_account_id {
continue;
}
total = total.saturating_add(slot.order.remaining_shares());
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
total
}
};
}
let limit = price_ticks as usize;
match side {
Side::Yes => {
let mut tick = self.asks.best_asc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
tick = self.asks.next_asc_from(t + 1);
continue;
}
if t > limit {
break;
}
total = total.saturating_add(self.asks.level(t).total_remaining);
tick = self.asks.next_asc_from(t + 1);
}
total
}
Side::No => {
let mut tick = self.bids.best_desc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
continue;
}
if t < limit {
break;
}
total = total.saturating_add(self.bids.level(t).total_remaining);
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
total
}
}
}
fn plan_place_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
plan: PlacePlan,
) -> Result<PlaceOrderResult, RejectReason> {
let PlacePlan {
correlation_id,
order_id,
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
} = plan;
if !self.state.is_matchable() {
return Err(RejectReason::MarketNotOpen);
}
self.validate_price_ticks(price_ticks)?;
Self::validate_qty_shares(qty_shares)?;
if self.orders.contains(&order_id) {
return Err(RejectReason::Duplicate);
}
if let TimeInForce::FillOrKill { min_fill } = time_in_force {
let required = min_fill.map(|v| v.0).unwrap_or(qty_shares);
let available = self.available_to_match(side, price_ticks, Some(account_id.clone()));
if available < required {
return Err(RejectReason::WouldNotFillFok);
}
}
scratch.events.push(self.emit(
ts,
BookEvent::BinaryOrderAccepted {
correlation_id,
order_id,
account_id: account_id.clone(),
side,
price_ticks,
qty_shares,
time_in_force,
},
));
let (matched_shares, avg_price_ticks) = self.plan_matching(
scratch,
MatchRequest {
ts,
taker_order_id: order_id,
taker_account_id: account_id.clone(),
taker_side: side,
taker_price_ticks: price_ticks,
taker_qty_shares: qty_shares,
},
);
let mut remaining = qty_shares.saturating_sub(matched_shares);
let mut state = if remaining == 0 {
BookOrderState::ExecutionComplete
} else if matched_shares > 0 {
BookOrderState::ExecutablePartiallyMatched
} else {
BookOrderState::ExecutableUnmatched
};
let should_cancel_remainder =
matches!(time_in_force, TimeInForce::FillOrKill { .. }) && remaining > 0;
if should_cancel_remainder {
remaining = 0;
state = BookOrderState::Cancelled;
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_ids: vec![order_id],
account_ids: vec![account_id],
cursor_after: None,
cursor_after_account_id: None,
is_final: true,
cancel_cause: CancelCause::FokRemainder,
cause_detail: None,
},
));
}
Ok(PlaceOrderResult {
accepted: true,
order_id,
matched: FillQuantity::Shares(matched_shares),
avg_matched_price: avg_price_ticks.map(FillPrice::Ticks),
remaining: FillQuantity::Shares(remaining),
final_order_state: Some(state),
})
}
fn plan_cancel_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
order_id: OrderId,
account_id: AccountId,
reason: &str,
) -> Result<(), RejectReason> {
let Some(order) = self.orders.get(&order_id) else {
return Err(RejectReason::OrderNotFound);
};
if order.info.account_id != account_id {
return Err(RejectReason::NotOrderOwner);
}
if !matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
return Err(RejectReason::OrderNotLive);
}
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_ids: vec![order_id],
account_ids: vec![account_id],
cursor_after: None,
cursor_after_account_id: None,
is_final: true,
cancel_cause: match reason {
"USER_CANCEL" => CancelCause::UserCancel,
"REPLACE" => CancelCause::Replace,
_ => CancelCause::Admin,
},
cause_detail: Some(reason.to_string()),
},
));
Ok(())
}
fn plan_replace_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
plan: ReplacePlan,
) -> Result<(), RejectReason> {
let ReplacePlan {
correlation_id,
order_id,
new_order_id,
account_id,
new_price_ticks,
new_qty_shares,
} = plan;
let Some(old) = self.orders.get(&order_id) else {
return Err(RejectReason::OrderNotFound);
};
if old.info.account_id != account_id {
return Err(RejectReason::NotOrderOwner);
}
if !matches!(
old.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
return Err(RejectReason::OrderNotLive);
}
let price_ticks = new_price_ticks.unwrap_or(old.price_ticks);
let qty_shares = new_qty_shares.unwrap_or(old.remaining_shares());
self.validate_price_ticks(price_ticks)?;
Self::validate_qty_shares(qty_shares)?;
self.plan_cancel_binary_order(scratch, ts, order_id, account_id.clone(), "REPLACE")?;
self.plan_place_binary_order(
scratch,
ts,
PlacePlan {
correlation_id,
order_id: new_order_id,
account_id,
side: old.info.side,
price_ticks,
qty_shares,
time_in_force: TimeInForce::Gtc,
},
)?;
Ok(())
}
fn plan_matching(&self, scratch: &mut Scratch, r: MatchRequest) -> (u64, Option<u16>) {
let MatchRequest {
ts,
taker_order_id,
taker_account_id,
taker_side,
taker_price_ticks,
taker_qty_shares,
} = r;
let mut remaining = taker_qty_shares;
if remaining == 0 {
return (0, None);
}
let mut matched = 0u64;
let mut price_qty_sum: u128 = 0;
match taker_side {
Side::Yes => {
let limit = taker_price_ticks as usize;
let mut tick = self.asks.best_asc();
while let Some(t) = tick {
if remaining == 0 || t > limit {
break;
}
let head = self.asks.level(t).head;
let mut maker_key = head;
while let Some(k) = maker_key {
if remaining == 0 {
break;
}
let maker = &self.orders.slot(k).order;
if maker.info.account_id == taker_account_id {
maker_key = self.orders.slot(k).links.next;
continue;
}
let maker_remaining = maker.remaining_shares().saturating_sub(
*scratch.maker_fills.get(&maker.info.order_id).unwrap_or(&0),
);
if maker_remaining == 0 {
maker_key = self.orders.slot(k).links.next;
continue;
}
let fill = remaining.min(maker_remaining);
remaining -= fill;
matched += fill;
price_qty_sum += fill as u128 * maker.price_ticks as u128;
scratch
.maker_fills
.entry(maker.info.order_id)
.and_modify(|v| *v = v.saturating_add(fill))
.or_insert(fill);
let maker_remaining = maker_remaining.saturating_sub(fill);
let taker_remaining = remaining;
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
order_id: maker.info.order_id,
account_id: maker.info.account_id.clone(),
role: TradeRole::Maker,
side: maker.info.side,
price_ticks: maker.price_ticks,
counter_party: taker_order_id,
counter_party_account_id: taker_account_id.clone(),
remaining_qty_shares: maker_remaining,
matched_delta_shares: fill,
},
));
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
order_id: taker_order_id,
account_id: taker_account_id.clone(),
role: TradeRole::Taker,
side: taker_side,
price_ticks: maker.price_ticks,
counter_party: maker.info.order_id,
counter_party_account_id: maker.info.account_id.clone(),
remaining_qty_shares: taker_remaining,
matched_delta_shares: fill,
},
));
maker_key = self.orders.slot(k).links.next;
}
tick = self.asks.next_asc_from(t + 1);
}
}
Side::No => {
let limit = taker_price_ticks as usize;
let mut tick = self.bids.best_desc();
while let Some(t) = tick {
if remaining == 0 || t < limit {
break;
}
let head = self.bids.level(t).head;
let mut maker_key = head;
while let Some(k) = maker_key {
if remaining == 0 {
break;
}
let maker = &self.orders.slot(k).order;
if maker.info.account_id == taker_account_id {
maker_key = self.orders.slot(k).links.next;
continue;
}
let maker_remaining = maker.remaining_shares().saturating_sub(
*scratch.maker_fills.get(&maker.info.order_id).unwrap_or(&0),
);
if maker_remaining == 0 {
maker_key = self.orders.slot(k).links.next;
continue;
}
let fill = remaining.min(maker_remaining);
remaining -= fill;
matched += fill;
price_qty_sum += fill as u128 * maker.price_ticks as u128;
scratch
.maker_fills
.entry(maker.info.order_id)
.and_modify(|v| *v = v.saturating_add(fill))
.or_insert(fill);
let maker_remaining = maker_remaining.saturating_sub(fill);
let taker_remaining = remaining;
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
order_id: maker.info.order_id,
account_id: maker.info.account_id.clone(),
role: TradeRole::Maker,
side: maker.info.side,
price_ticks: maker.price_ticks,
counter_party: taker_order_id,
counter_party_account_id: taker_account_id.clone(),
remaining_qty_shares: maker_remaining,
matched_delta_shares: fill,
},
));
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
order_id: taker_order_id,
account_id: taker_account_id.clone(),
role: TradeRole::Taker,
side: taker_side,
price_ticks: maker.price_ticks,
counter_party: maker.info.order_id,
counter_party_account_id: maker.info.account_id.clone(),
remaining_qty_shares: taker_remaining,
matched_delta_shares: fill,
},
));
maker_key = self.orders.slot(k).links.next;
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
}
}
let avg = if matched == 0 {
None
} else {
Some(((price_qty_sum + matched as u128 / 2) / matched as u128) as u16)
};
(matched, avg)
}
pub fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
match &env.event {
BookEvent::MarketCreated { .. } => {}
BookEvent::MarketStateChanged { to, reason } => {
if *to == BookMarketState::Closed {
let batch_max_events =
crate::book::close_process::parse_close_start_batch_max_events(
reason.as_str(),
)
.unwrap_or(crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS);
let total_live_orders = self.count_live_orders();
self.batch_process = Some(BatchProcessState::Close(CloseProcessState {
batch_max_events,
cursor_after: None,
total_live_orders,
cancelled_total: 0,
chunks_done: 0,
}));
}
let from = self.state;
if *to == BookMarketState::Suspended {
if from != BookMarketState::Suspended {
self.state_before_suspend = Some(from);
}
} else if from == BookMarketState::Suspended {
self.state_before_suspend = None;
}
if *to == BookMarketState::Halted {
self.state_before_halt = Some(from);
} else if from == BookMarketState::Halted {
self.state_before_halt = None;
}
self.state = *to;
}
BookEvent::MarketSettled { .. } => self.state = BookMarketState::Settled,
BookEvent::BatchCancelStarted {
batch_max_events,
started_at_ms,
from_created_at_inclusive_ms,
to_created_at_inclusive_ms,
account_filter,
runner_filter,
reason,
final_event_metadata_json,
} => {
self.batch_process = Some(BatchProcessState::Cancel(CancelProcessState {
batch_max_events: *batch_max_events,
cursor_after: None,
started_at_ms: *started_at_ms,
from_created_at_inclusive_ms: *from_created_at_inclusive_ms,
to_created_at_inclusive_ms: *to_created_at_inclusive_ms,
account_filter: account_filter.clone(),
runner_filter: *runner_filter,
reason: reason.clone(),
final_event_metadata_json: final_event_metadata_json.clone(),
cancelled_total: 0,
chunks_done: 0,
}));
}
BookEvent::OrderCancelled {
cursor_after,
order_ids,
is_final,
cancel_cause,
cause_detail,
..
} => {
for &order_id in order_ids {
self.apply_remove_order(order_id);
}
match cancel_cause {
CancelCause::CloseCancel => {
if let Some(BatchProcessState::Close(s)) = self.batch_process.as_mut() {
s.cursor_after = *cursor_after;
s.cancelled_total =
s.cancelled_total.saturating_add(order_ids.len() as u64);
s.chunks_done = s.chunks_done.saturating_add(1);
}
}
CancelCause::BatchCancel => {
if let Some(BatchProcessState::Cancel(s)) = self.batch_process.as_mut() {
s.cursor_after = *cursor_after;
s.cancelled_total =
s.cancelled_total.saturating_add(order_ids.len() as u64);
s.chunks_done = s.chunks_done.saturating_add(1);
}
}
CancelCause::MarketVoid => {
if let Some(BatchProcessState::Void(s)) = self.batch_process.as_mut() {
s.cursor_after = *cursor_after;
s.processed_total =
s.processed_total.saturating_add(order_ids.len() as u64);
s.chunks_done = s.chunks_done.saturating_add(1);
} else if !is_final {
self.batch_process =
Some(BatchProcessState::Void(OrderBatchProcessState {
batch_max_events:
crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS,
cursor_after: *cursor_after,
reason: cause_detail
.clone()
.unwrap_or_else(|| "MARKET_VOID".to_string()),
processed_total: order_ids.len() as u64,
chunks_done: 1,
}));
}
}
_ => {}
}
}
BookEvent::BatchProcessCompleted { cancel_cause } => match cancel_cause {
CancelCause::CloseCancel | CancelCause::BatchCancel | CancelCause::MarketVoid => {
self.batch_process = None
}
_ => {}
},
BookEvent::BinaryOrderAccepted {
order_id,
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
..
} => {
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
let order = BinaryOrder {
info: BookOrderInfo {
order_id: *order_id,
account_id: account_id.clone(),
side: *side,
state: BookOrderState::ExecutableUnmatched,
created_at: ts,
last_updated_at: ts,
},
price_ticks: *price_ticks,
qty_shares: *qty_shares,
filled_shares: 0,
time_in_force: *time_in_force,
};
let key = self.orders.insert(*order_id, order);
self.active_order_count_cache = self.active_order_count_cache.saturating_add(1);
match side {
Side::Yes => self.bids.insert_tail(&mut self.orders, key, *qty_shares),
Side::No => self.asks.insert_tail(&mut self.orders, key, *qty_shares),
}
}
BookEvent::BinaryTradeMatched {
order_id,
matched_delta_shares,
..
} => {
self.apply_trade_fill(*order_id, *matched_delta_shares, ts);
}
BookEvent::OrderAccepted { .. }
| BookEvent::TradeMatched { .. }
| BookEvent::VoidTradesFromTime { .. }
| BookEvent::RunnerRemoved { .. }
| BookEvent::MarketRemoved { .. } => {}
}
}
pub fn apply_all_events(&mut self, envs: &[BookEventEnvelope]) {
for env in envs {
self.apply_event(env);
}
}
fn apply_trade_fill(&mut self, order_id: OrderId, qty_shares: u64, ts: DateTime) {
if qty_shares == 0 {
return;
}
let Some(key) = self.orders.get_key(&order_id) else {
return;
};
let was_live = matches!(
self.orders.slot(key).order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
let (side, tick, in_level, prev_remaining, filled_shares, remaining_shares) = {
let slot = self.orders.slot(key);
let prev_remaining = slot.order.remaining_shares();
if prev_remaining == 0 {
return;
}
let applied = qty_shares.min(prev_remaining);
let filled_shares = slot.order.filled_shares.saturating_add(applied);
let remaining_shares = slot.order.qty_shares.saturating_sub(filled_shares);
(
slot.level_side,
slot.level_tick as usize,
slot.in_level,
prev_remaining,
filled_shares,
remaining_shares,
)
};
if in_level {
let delta = qty_shares.min(prev_remaining);
match side {
Side::Yes => {
self.bids.levels[tick].total_remaining =
self.bids.levels[tick].total_remaining.saturating_sub(delta);
if remaining_shares == 0 {
self.bids.unlink(&mut self.orders, key, 0);
}
}
Side::No => {
self.asks.levels[tick].total_remaining =
self.asks.levels[tick].total_remaining.saturating_sub(delta);
if remaining_shares == 0 {
self.asks.unlink(&mut self.orders, key, 0);
}
}
}
}
if let Some(order) = self.orders.get_mut(&order_id) {
order.filled_shares = filled_shares;
order.info.last_updated_at = ts;
order.info.state = if remaining_shares == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
if remaining_shares == 0 {
if was_live {
self.active_order_count_cache = self.active_order_count_cache.saturating_sub(1);
}
self.orders.remove(&order_id);
}
}
}
impl Serialize for BinaryYesBook {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut orders = BTreeMap::new();
for (oid, _, order) in self.orders.iter_keys_sorted() {
orders.insert(oid, order.clone());
}
let snap = BinaryYesBookSnapshot {
market_id: self.market_id,
market_kind: self.market_kind,
state: self.state,
state_before_suspend: self.state_before_suspend,
state_before_halt: self.state_before_halt,
batch_process: self.batch_process.clone(),
yes_runner_id: self.yes_runner_id,
no_runner_id: self.no_runner_id,
max_price_ticks: self.max_price_ticks,
next_order_id: self.next_order_id,
orders,
};
snap.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for BinaryYesBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = BinaryYesBookSnapshot::deserialize(deserializer)?;
let mut book = BinaryYesBook::new_with_capacity(
snap.market_id,
snap.market_kind,
snap.yes_runner_id,
snap.no_runner_id,
snap.max_price_ticks,
snap.orders.len(),
);
book.state = snap.state;
book.state_before_suspend = snap.state_before_suspend;
book.state_before_halt = snap.state_before_halt;
book.batch_process = snap.batch_process;
book.next_order_id = snap.next_order_id;
for (oid, order) in snap.orders {
book.orders.insert(oid, order);
}
book.rebuild_ladders();
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}