use crate::book::common::events::{BookEvent, BookEventEnvelope, EventVec, OrderInfo};
use crate::book::common::response::{CommandResponse, CommandResponseKind, PlaceOrderResult};
use crate::book::common::types::BookOrderInfo;
use crate::book::common::types::{
BinaryDepth, BinaryPriceSize, BookMarketState, BookOrderState, CloseProcessState, RunnerPrices,
};
use crate::book::error::BookError;
use crate::book::protocol::command::{Command, CommandKind, MarketState, Side, TimeInForce};
use crate::book::protocol::reject::RejectReason;
use crate::types::{
AccountId, CorrelationId, DateTime, FillPrice, FillQuantity, MarketId, Money, OrderId,
RunnerId, TradeId,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use slab::Slab;
use std::collections::BTreeMap;
use tracing::{error, warn};
pub(crate) type OrderKey = usize;
#[derive(Debug, Clone, Copy, Default)]
struct Links {
prev: Option<OrderKey>,
next: Option<OrderKey>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryOrder {
info: BookOrderInfo,
price_ticks: u16,
qty_shares: u64,
filled_shares: u64,
time_in_force: TimeInForce,
}
impl BinaryOrder {
fn remaining_shares(&self) -> u64 {
self.qty_shares.saturating_sub(self.filled_shares)
}
}
#[derive(Debug, Clone)]
struct OrderSlot {
order: BinaryOrder,
links: Links,
in_level: bool,
level_side: Side,
level_tick: u16,
}
impl OrderSlot {
fn new(order: BinaryOrder) -> Self {
Self {
level_side: order.info.side,
level_tick: order.price_ticks,
order,
links: Links::default(),
in_level: false,
}
}
}
#[derive(Debug, Clone, Default)]
struct OrderStore {
slab: Slab<OrderSlot>,
by_id: HashMap<OrderId, OrderKey>,
by_id_sorted: BTreeMap<OrderId, OrderKey>,
}
impl OrderStore {
fn with_capacity(capacity: usize) -> Self {
Self {
slab: Slab::with_capacity(capacity),
by_id: HashMap::with_capacity(capacity),
by_id_sorted: BTreeMap::new(),
}
}
fn contains(&self, order_id: &OrderId) -> bool {
self.by_id.contains_key(order_id)
}
fn get_key(&self, order_id: &OrderId) -> Option<OrderKey> {
self.by_id.get(order_id).copied()
}
fn get(&self, order_id: &OrderId) -> Option<&BinaryOrder> {
let key = self.get_key(order_id)?;
Some(&self.slab.get(key)?.order)
}
fn get_mut(&mut self, order_id: &OrderId) -> Option<&mut BinaryOrder> {
let key = self.get_key(order_id)?;
Some(&mut self.slab.get_mut(key)?.order)
}
fn slot(&self, key: OrderKey) -> &OrderSlot {
&self.slab[key]
}
fn slot_mut(&mut self, key: OrderKey) -> &mut OrderSlot {
&mut self.slab[key]
}
fn insert(&mut self, order_id: OrderId, order: BinaryOrder) -> OrderKey {
debug_assert_eq!(order_id, order.info.order_id);
let key = self.slab.insert(OrderSlot::new(order));
self.by_id.insert(order_id, key);
self.by_id_sorted.insert(order_id, key);
key
}
fn iter_keys_sorted(&self) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
self.by_id_sorted
.iter()
.map(|(&oid, &key)| (oid, key, &self.slab[key].order))
}
fn iter_keys_sorted_from(
&self,
cursor_after: Option<OrderId>,
) -> impl Iterator<Item = (OrderId, OrderKey, &BinaryOrder)> {
use std::ops::Bound;
let range = match cursor_after {
Some(c) => (Bound::Excluded(c), Bound::Unbounded),
None => (Bound::Unbounded, Bound::Unbounded),
};
self.by_id_sorted
.range(range)
.map(|(&oid, &key)| (oid, key, &self.slab[key].order))
}
fn remove(&mut self, order_id: &OrderId) -> bool {
let Some(key) = self.by_id.remove(order_id) else {
return false;
};
self.by_id_sorted.remove(order_id);
self.slab.remove(key);
true
}
}
#[derive(Debug, Clone, Default)]
struct Bitset {
words: Vec<u64>,
}
impl Bitset {
fn new(bits: usize) -> Self {
Self {
words: vec![0u64; bits.div_ceil(64)],
}
}
fn set(&mut self, idx: usize) {
let w = idx / 64;
let b = idx % 64;
if w < self.words.len() {
self.words[w] |= 1u64 << b;
}
}
fn clear(&mut self, idx: usize) {
let w = idx / 64;
let b = idx % 64;
if w < self.words.len() {
self.words[w] &= !(1u64 << b);
}
}
fn next_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
if start >= bit_len {
return None;
}
let mut w = start / 64;
let mut bits = self.words[w] & (!0u64 << (start % 64));
loop {
if bits != 0 {
let idx = w * 64 + bits.trailing_zeros() as usize;
return (idx < bit_len).then_some(idx);
}
w += 1;
if w >= self.words.len() {
return None;
}
bits = self.words[w];
}
}
fn prev_set_from(&self, start: usize, bit_len: usize) -> Option<usize> {
if bit_len == 0 {
return None;
}
let start = start.min(bit_len.saturating_sub(1));
let mut w = start / 64;
let mut bits = self.words[w] & (!0u64 >> (63 - (start % 64)));
loop {
if bits != 0 {
let idx = w * 64 + (63 - bits.leading_zeros() as usize);
return (idx < bit_len).then_some(idx);
}
if w == 0 {
return None;
}
w -= 1;
bits = self.words[w];
}
}
}
#[derive(Debug, Clone, Copy, Default)]
struct Level {
head: Option<OrderKey>,
tail: Option<OrderKey>,
total_remaining: u64,
}
#[derive(Debug, Clone)]
struct SideBook {
levels: Vec<Level>,
mask: Bitset,
bits: usize,
}
impl SideBook {
fn new(bits: usize) -> Self {
Self {
levels: vec![Level::default(); bits],
mask: Bitset::new(bits),
bits,
}
}
fn level(&self, tick: usize) -> &Level {
&self.levels[tick]
}
fn level_mut(&mut self, tick: usize) -> &mut Level {
&mut self.levels[tick]
}
fn best_asc(&self) -> Option<usize> {
self.mask.next_set_from(0, self.bits)
}
fn best_desc(&self) -> Option<usize> {
self.mask
.prev_set_from(self.bits.saturating_sub(1), self.bits)
}
fn next_asc_from(&self, start: usize) -> Option<usize> {
self.mask.next_set_from(start, self.bits)
}
fn next_desc_from(&self, start: usize) -> Option<usize> {
self.mask.prev_set_from(start, self.bits)
}
fn insert_tail(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
let tick = orders.slot(key).level_tick as usize;
let level = &mut self.levels[tick];
if level.head.is_none() {
self.mask.set(tick);
level.head = Some(key);
level.tail = Some(key);
} else {
let Some(tail) = level.tail else {
error!(tick, "tail missing for non-empty level");
return;
};
orders.slot_mut(tail).links.next = Some(key);
orders.slot_mut(key).links.prev = Some(tail);
level.tail = Some(key);
}
level.total_remaining = level.total_remaining.saturating_add(remaining);
let slot = orders.slot_mut(key);
slot.in_level = true;
slot.links.next = None;
}
fn unlink(&mut self, orders: &mut OrderStore, key: OrderKey, remaining: u64) {
let (tick, prev, next) = {
let slot = orders.slot(key);
(slot.level_tick as usize, slot.links.prev, slot.links.next)
};
let became_empty = {
let level = self.level_mut(tick);
if level.head == Some(key) {
level.head = next;
}
if level.tail == Some(key) {
level.tail = prev;
}
if let Some(p) = prev {
orders.slot_mut(p).links.next = next;
}
if let Some(n) = next {
orders.slot_mut(n).links.prev = prev;
}
level.total_remaining = level.total_remaining.saturating_sub(remaining);
level.head.is_none()
};
if became_empty {
self.mask.clear(tick);
}
let slot = orders.slot_mut(key);
slot.links = Links::default();
slot.in_level = false;
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BinaryTrade {
pub trade_id: TradeId,
pub maker_order_id: OrderId,
pub taker_order_id: OrderId,
pub price_ticks: u16,
pub qty_shares: u64,
pub matched_at: DateTime,
}
#[derive(Debug, Clone)]
pub struct BinaryYesBook {
market_id: MarketId,
market_kind: crate::types::MarketKind,
state: BookMarketState,
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
close_process: Option<CloseProcessState>,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
next_trade_id: u64,
next_order_id: u64,
orders: OrderStore,
bids: SideBook,
asks: SideBook,
trades: BTreeMap<TradeId, BinaryTrade>,
scratch: Scratch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BinaryYesBookSnapshot {
market_id: MarketId,
market_kind: crate::types::MarketKind,
state: BookMarketState,
state_before_suspend: Option<BookMarketState>,
state_before_halt: Option<BookMarketState>,
close_process: Option<CloseProcessState>,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
next_trade_id: u64,
next_order_id: u64,
orders: BTreeMap<OrderId, BinaryOrder>,
trades: BTreeMap<TradeId, BinaryTrade>,
}
#[derive(Debug, Default, Clone)]
struct Scratch {
events: EventVec,
maker_fills: HashMap<OrderId, u64>,
next_trade_id: u64,
}
#[derive(Debug, Clone)]
struct PlacePlan {
correlation_id: Option<CorrelationId>,
order_id: OrderId,
account_id: AccountId,
side: Side,
price_ticks: u16,
qty_shares: u64,
time_in_force: TimeInForce,
}
#[derive(Debug, Clone)]
struct ReplacePlan {
correlation_id: Option<CorrelationId>,
order_id: OrderId,
new_order_id: OrderId,
account_id: AccountId,
new_price_ticks: Option<u16>,
new_qty_shares: Option<u64>,
}
impl BinaryYesBook {
pub fn new_with_capacity(
market_id: MarketId,
market_kind: crate::types::MarketKind,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
max_price_ticks: u16,
order_store_capacity: usize,
) -> Self {
let bits = max_price_ticks as usize + 1;
Self {
market_id,
market_kind,
state: BookMarketState::Open,
state_before_suspend: None,
state_before_halt: None,
close_process: None,
yes_runner_id,
no_runner_id,
max_price_ticks,
next_trade_id: 1,
next_order_id: 1,
orders: OrderStore::with_capacity(order_store_capacity),
bids: SideBook::new(bits),
asks: SideBook::new(bits),
trades: BTreeMap::new(),
scratch: Scratch::default(),
}
}
fn rebuild_ladders(&mut self) {
type RebuildLevelKey = (Side, u16);
type RebuildLevelItem = (DateTime, OrderId, OrderKey, u64);
type RebuildPerLevel = HashMap<RebuildLevelKey, Vec<RebuildLevelItem>>;
let bits = self.max_price_ticks as usize + 1;
self.bids = SideBook::new(bits);
self.asks = SideBook::new(bits);
let keys: Vec<OrderKey> = self.orders.by_id_sorted.values().copied().collect();
for key in keys {
let slot = self.orders.slot_mut(key);
slot.links = Links::default();
slot.in_level = false;
}
let mut per_level: RebuildPerLevel = HashMap::new();
for (oid, key, order) in self.orders.iter_keys_sorted() {
let is_live = matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
);
if !is_live {
continue;
}
let remaining = order.remaining_shares();
if remaining == 0 {
continue;
}
if order.price_ticks == 0 || order.price_ticks >= self.max_price_ticks {
continue;
}
per_level
.entry((order.info.side, order.price_ticks))
.or_default()
.push((order.info.created_at, oid, key, remaining));
}
for ((_, _), mut items) in per_level {
items.sort_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));
for (_, _, key, remaining) in items {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.insert_tail(&mut self.orders, key, remaining),
Side::No => self.asks.insert_tail(&mut self.orders, key, remaining),
}
}
}
}
fn apply_remove_order(&mut self, order_id: OrderId) {
let Some(key) = self.orders.get_key(&order_id) else {
return;
};
let prev_remaining = self.orders.slot(key).order.remaining_shares();
if self.orders.slot(key).in_level && prev_remaining > 0 {
match self.orders.slot(key).level_side {
Side::Yes => self.bids.unlink(&mut self.orders, key, prev_remaining),
Side::No => self.asks.unlink(&mut self.orders, key, prev_remaining),
}
}
self.orders.remove(&order_id);
}
pub fn market_id(&self) -> MarketId {
self.market_id
}
pub fn market_state(&self) -> BookMarketState {
self.state
}
pub fn is_halted(&self) -> bool {
self.state.is_halted()
}
pub fn is_resting(&self, order_id: OrderId) -> bool {
self.orders
.get_key(&order_id)
.map(|k| self.orders.slot(k).in_level)
.unwrap_or(false)
}
pub fn active_order_count(&self) -> usize {
self.orders
.iter_keys_sorted()
.filter(|(_, _, o)| {
matches!(
o.info.state,
BookOrderState::ExecutableUnmatched
| BookOrderState::ExecutablePartiallyMatched
)
})
.count()
}
pub fn close_process_state(&self) -> Option<CloseProcessState> {
self.close_process
}
pub fn runners(&self) -> impl Iterator<Item = RunnerId> + '_ {
[self.yes_runner_id, self.no_runner_id].into_iter()
}
pub fn runner_prices(&self, runner_id: RunnerId, depth: usize) -> RunnerPrices {
RunnerPrices {
runner_id,
available_to_back: Vec::with_capacity(depth),
available_to_lay: Vec::with_capacity(depth),
}
}
pub fn max_price_ticks(&self) -> u16 {
self.max_price_ticks
}
pub fn best_bid_ticks(&self) -> Option<u16> {
let t = self.bids.best_desc()?;
let t_u16 = t as u16;
(t_u16 > 0 && t_u16 < self.max_price_ticks).then_some(t_u16)
}
pub fn best_ask_ticks(&self) -> Option<u16> {
let t = self.asks.best_asc()?;
let t_u16 = t as u16;
(t_u16 > 0 && t_u16 < self.max_price_ticks).then_some(t_u16)
}
pub fn depth(&self, depth: usize) -> BinaryDepth {
let mut bids = Vec::with_capacity(depth);
let mut asks = Vec::with_capacity(depth);
let mut tick = self.bids.best_desc();
while let Some(t) = tick {
if bids.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let lvl = self.bids.level(t);
if lvl.total_remaining > 0 {
bids.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: lvl.total_remaining,
});
}
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
let mut tick = self.asks.best_asc();
while let Some(t) = tick {
if asks.len() >= depth {
break;
}
let t_u16 = t as u16;
if t_u16 > 0 && t_u16 < self.max_price_ticks {
let lvl = self.asks.level(t);
if lvl.total_remaining > 0 {
asks.push(BinaryPriceSize {
price_ticks: t_u16,
size_shares: lvl.total_remaining,
});
}
}
tick = self.asks.next_asc_from(t + 1);
}
BinaryDepth {
max_price_ticks: self.max_price_ticks,
bids,
asks,
}
}
pub fn best_back_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
pub fn best_lay_price(&self, runner_id: RunnerId) -> Option<crate::book::PriceSize> {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return None;
}
None
}
pub fn runner_matched_volume(&self, runner_id: RunnerId) -> Money {
if runner_id != self.yes_runner_id && runner_id != self.no_runner_id {
return Money::zero();
}
Money::zero()
}
pub fn total_matched(&self) -> Money {
Money::zero()
}
fn emit(&self, ts: DateTime, event: BookEvent) -> BookEventEnvelope {
BookEventEnvelope {
market_id: self.market_id,
market_seq: 0,
timestamp: ts,
event,
}
}
pub(crate) fn handle_command(
&mut self,
cmd: &Command,
) -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
let correlation_id = cmd.correlation_id.clone();
let err_correlation_id = correlation_id.clone();
let err = |reason| BookError::new(err_correlation_id.clone(), reason);
let ts = Utc::now();
if cmd.market_id != self.market_id {
return Err(err(RejectReason::MarketIdMismatch));
}
match &cmd.kind {
CommandKind::HaltMarket { reason } => {
let events = self.cmd_halt_market(ts, *reason).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
CommandKind::ResumeMarket => {
let events = self.cmd_resume_market(ts).map_err(&err)?;
return Ok((
events.into_vec(),
CommandResponse {
correlation_id: correlation_id.clone(),
kind: None,
},
));
}
_ => {}
}
if self.state.is_halted() {
return Err(err(RejectReason::MarketHalted));
}
let mut scratch = std::mem::take(&mut self.scratch);
let result = (|| -> Result<(Vec<BookEventEnvelope>, CommandResponse), BookError> {
scratch.events.clear();
scratch.maker_fills.clear();
scratch.next_trade_id = self.next_trade_id;
let resp_kind = match &cmd.kind {
CommandKind::HaltMarket { .. } | CommandKind::ResumeMarket => {
unreachable!("halt/resume handled above")
}
CommandKind::CreateMarket { .. } => return Err(err(RejectReason::InternalError)),
CommandKind::SetMarketState { state } => {
let reason = "SET_MARKET_STATE";
self.plan_set_market_state(&mut scratch, ts, *state, reason)
.map_err(&err)?;
None
}
CommandKind::CloseMarket {
batch_max_events, ..
} => {
self.plan_close_market(&mut scratch, ts, *batch_max_events)
.map_err(&err)?;
None
}
CommandKind::ContinueCloseMarket => {
self.plan_continue_close_market(&mut scratch, ts)
.map_err(&err)?;
None
}
CommandKind::VoidMarket { reason } => {
if self.state == BookMarketState::Voided {
return Err(err(RejectReason::MarketAlreadyVoided));
}
scratch.events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to: BookMarketState::Voided,
reason: reason.clone(),
},
));
None
}
CommandKind::SettleMarket {
runner_results,
dead_heat_divisor,
} => {
scratch.events.push(self.emit(
ts,
BookEvent::MarketSettled {
runner_results: runner_results.clone(),
dead_heat_divisor: *dead_heat_divisor,
},
));
None
}
CommandKind::PlaceBinaryOrder {
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
..
} => {
let order_id = OrderId(self.next_order_id);
let result = self
.plan_place_binary_order(
&mut scratch,
ts,
PlacePlan {
correlation_id: correlation_id.clone(),
order_id,
account_id: *account_id,
side: *side,
price_ticks: *price_ticks,
qty_shares: *qty_shares,
time_in_force: *time_in_force,
},
)
.map_err(&err)?;
Some(CommandResponseKind::PlaceOrder(result))
}
CommandKind::CancelOrder {
account_id,
order_id,
} => {
self.plan_cancel_binary_order(
&mut scratch,
ts,
*order_id,
*account_id,
"USER_CANCEL",
)
.map_err(&err)?;
None
}
CommandKind::ReplaceBinaryOrder {
account_id,
order_id,
new_price_ticks,
new_qty_shares,
} => {
let new_order_id = OrderId(self.next_order_id);
self.plan_replace_binary_order(
&mut scratch,
ts,
ReplacePlan {
correlation_id: correlation_id.clone(),
order_id: *order_id,
new_order_id,
account_id: *account_id,
new_price_ticks: *new_price_ticks,
new_qty_shares: *new_qty_shares,
},
)
.map_err(&err)?;
None
}
CommandKind::PlaceOrder { .. }
| CommandKind::ReplaceOrder { .. }
| CommandKind::CashoutRunner { .. }
| CommandKind::RemoveRunner { .. }
| CommandKind::VoidTradesFromTime { .. }
| CommandKind::VoidTradeIds { .. } => {
return Err(err(RejectReason::MarketModelMismatch));
}
CommandKind::RemoveMarket => return Err(err(RejectReason::InternalError)),
};
let envs = scratch.events.iter().cloned().collect::<Vec<_>>();
Ok((
envs,
CommandResponse {
correlation_id: correlation_id.clone(),
kind: resp_kind,
},
))
})();
self.scratch = scratch;
result
}
fn plan_set_market_state(
&self,
scratch: &mut Scratch,
ts: DateTime,
state: MarketState,
reason: &str,
) -> Result<(), RejectReason> {
let to = match state {
MarketState::Open => BookMarketState::Open,
MarketState::InPlay => {
if matches!(self.market_kind, crate::types::MarketKind::PreEventOnly) {
return Err(RejectReason::MarketInPlayNotSupported);
}
BookMarketState::TurnInPlayEnabled
}
MarketState::Suspended => BookMarketState::Suspended,
MarketState::Closed => BookMarketState::Closed,
};
if self.state.is_terminal()
&& !matches!(to, BookMarketState::Settled | BookMarketState::Voided)
{
return Err(RejectReason::MarketTerminal);
}
scratch.events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to,
reason: reason.to_string(),
},
));
Ok(())
}
fn emit_state_change(
&self,
scratch: &mut Scratch,
ts: DateTime,
to: BookMarketState,
reason: &str,
) {
scratch.events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to,
reason: reason.to_string(),
},
));
}
fn cmd_halt_market(&self, ts: DateTime, reason: u32) -> Result<EventVec, RejectReason> {
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if self.state.is_halted() {
return Err(RejectReason::MarketAlreadyHalted);
}
let mut events = EventVec::new();
events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to: BookMarketState::Halted,
reason: format!("HALT:{reason}"),
},
));
Ok(events)
}
fn cmd_resume_market(&self, ts: DateTime) -> Result<EventVec, RejectReason> {
if !self.state.is_halted() {
return Err(RejectReason::MarketNotHalted);
}
let restored = self.state_before_halt.unwrap_or(BookMarketState::Open);
let mut events = EventVec::new();
events.push(self.emit(
ts,
BookEvent::MarketStateChanged {
to: restored,
reason: "RESUME".to_string(),
},
));
Ok(events)
}
fn is_live_order(order: &BinaryOrder) -> bool {
matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
)
}
fn count_live_orders(&self) -> u64 {
self.orders
.iter_keys_sorted()
.filter(|(_, _, o)| Self::is_live_order(o))
.count() as u64
}
fn select_live_orders_chunk(
&self,
cursor_after: Option<OrderId>,
max_cancels: usize,
) -> (Vec<OrderId>, Option<OrderId>, bool, u64) {
let mut orders_to_cancel = Vec::new();
let mut last = cursor_after;
let mut it = self.orders.iter_keys_sorted_from(cursor_after).peekable();
while let Some((oid, _, order)) = it.next() {
last = Some(oid);
if !Self::is_live_order(order) {
continue;
}
orders_to_cancel.push(oid);
if orders_to_cancel.len() >= max_cancels {
let done = it.peek().is_none();
return (orders_to_cancel, last, done, max_cancels as u64);
}
}
let cancelled = orders_to_cancel.len();
(orders_to_cancel, last, true, cancelled as u64)
}
fn plan_close_market(
&mut self,
scratch: &mut Scratch,
ts: DateTime,
batch_max_events: u16,
) -> Result<(), RejectReason> {
if self.state == BookMarketState::Closing {
return Err(RejectReason::MarketClosing);
}
if self.state.is_terminal() {
return Err(RejectReason::MarketTerminal);
}
if batch_max_events < crate::book::close_process::MIN_CLOSE_BATCH_EVENTS {
return Err(RejectReason::InvalidBatchSize);
}
let total_live_orders = self.count_live_orders();
if total_live_orders > 0 && batch_max_events < 1 {
return Err(RejectReason::InvalidBatchSize);
}
let cancel_budget = batch_max_events as usize;
let (cancelled_order_ids, cursor_after, done, _) =
self.select_live_orders_chunk(None, cancel_budget);
if done {
self.emit_state_change(scratch, ts, BookMarketState::Closed, "CLOSE_DONE");
scratch.events.push(self.emit(
ts,
BookEvent::MarketOrdersSettled {
cursor_after: None,
order_ids: cancelled_order_ids,
is_final: true,
},
));
} else {
let reason = crate::book::close_process::close_start_reason(batch_max_events);
self.emit_state_change(scratch, ts, BookMarketState::Closing, &reason);
scratch.events.push(self.emit(
ts,
BookEvent::MarketOrdersSettled {
cursor_after,
order_ids: cancelled_order_ids,
is_final: false,
},
));
}
Ok(())
}
fn plan_continue_close_market(
&mut self,
scratch: &mut Scratch,
ts: DateTime,
) -> Result<(), RejectReason> {
if self.state != BookMarketState::Closing {
return Err(RejectReason::MarketNotClosing);
}
let Some(proc_state) = self.close_process else {
return Err(RejectReason::InternalError);
};
let cancel_budget = proc_state.batch_max_events as usize;
let (cancelled_order_ids, cursor_after, done, _) =
self.select_live_orders_chunk(proc_state.cursor_after, cancel_budget);
if done {
self.emit_state_change(scratch, ts, BookMarketState::Closed, "CLOSE_DONE");
scratch.events.push(self.emit(
ts,
BookEvent::MarketOrdersSettled {
cursor_after: None,
order_ids: cancelled_order_ids,
is_final: true,
},
));
} else {
scratch.events.push(self.emit(
ts,
BookEvent::MarketOrdersSettled {
cursor_after,
order_ids: cancelled_order_ids,
is_final: false,
},
));
}
Ok(())
}
fn validate_price_ticks(&self, price_ticks: u16) -> Result<(), RejectReason> {
if self.max_price_ticks < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
if price_ticks == 0 || price_ticks >= self.max_price_ticks {
return Err(RejectReason::InvalidPriceTicks);
}
Ok(())
}
fn validate_qty_shares(qty_shares: u64) -> Result<(), RejectReason> {
if qty_shares == 0 {
return Err(RejectReason::InvalidQtyShares);
}
Ok(())
}
fn available_to_match(&self, side: Side, price_ticks: u16) -> u64 {
let limit = price_ticks as usize;
match side {
Side::Yes => {
let mut tick = self.asks.best_asc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
tick = self.asks.next_asc_from(t + 1);
continue;
}
if t > limit {
break;
}
total = total.saturating_add(self.asks.level(t).total_remaining);
tick = self.asks.next_asc_from(t + 1);
}
total
}
Side::No => {
let mut tick = self.bids.best_desc();
let mut total = 0u64;
while let Some(t) = tick {
if t == 0 || t >= self.max_price_ticks as usize {
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
continue;
}
if t < limit {
break;
}
total = total.saturating_add(self.bids.level(t).total_remaining);
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
total
}
}
}
fn plan_place_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
plan: PlacePlan,
) -> Result<PlaceOrderResult, RejectReason> {
let PlacePlan {
correlation_id,
order_id,
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
} = plan;
if !self.state.is_matchable() {
return Err(RejectReason::MarketNotOpen);
}
self.validate_price_ticks(price_ticks)?;
Self::validate_qty_shares(qty_shares)?;
if self.orders.contains(&order_id) {
return Err(RejectReason::Duplicate);
}
if let TimeInForce::FillOrKill { min_fill } = time_in_force {
let required = min_fill.map(|v| v.0).unwrap_or(qty_shares);
let available = self.available_to_match(side, price_ticks);
if available < required {
return Err(RejectReason::WouldNotFillFok);
}
}
scratch.events.push(self.emit(
ts,
BookEvent::BinaryOrderAccepted {
correlation_id,
order_id,
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
},
));
let (matched_shares, avg_price_ticks) =
self.plan_matching(scratch, ts, order_id, side, price_ticks, qty_shares);
let mut remaining = qty_shares.saturating_sub(matched_shares);
let mut state = if remaining == 0 {
BookOrderState::ExecutionComplete
} else if matched_shares > 0 {
BookOrderState::ExecutablePartiallyMatched
} else {
BookOrderState::ExecutableUnmatched
};
let should_cancel_remainder =
matches!(time_in_force, TimeInForce::FillOrKill { .. }) && remaining > 0;
if should_cancel_remainder {
remaining = 0;
state = BookOrderState::Cancelled;
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_id,
reason: "FOK_REMAINDER".to_string(),
},
));
}
Ok(PlaceOrderResult {
accepted: true,
order_id,
matched: FillQuantity::Shares(matched_shares),
avg_matched_price: avg_price_ticks.map(FillPrice::Ticks),
remaining: FillQuantity::Shares(remaining),
final_order_state: Some(state),
})
}
fn plan_cancel_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
order_id: OrderId,
account_id: AccountId,
reason: &str,
) -> Result<(), RejectReason> {
let Some(order) = self.orders.get(&order_id) else {
return Err(RejectReason::OrderNotFound);
};
if order.info.account_id != account_id {
return Err(RejectReason::NotOrderOwner);
}
if !matches!(
order.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
return Err(RejectReason::OrderNotLive);
}
scratch.events.push(self.emit(
ts,
BookEvent::OrderCancelled {
order_id,
reason: reason.to_string(),
},
));
Ok(())
}
fn plan_replace_binary_order(
&self,
scratch: &mut Scratch,
ts: DateTime,
plan: ReplacePlan,
) -> Result<(), RejectReason> {
let ReplacePlan {
correlation_id,
order_id,
new_order_id,
account_id,
new_price_ticks,
new_qty_shares,
} = plan;
let Some(old) = self.orders.get(&order_id) else {
return Err(RejectReason::OrderNotFound);
};
if old.info.account_id != account_id {
return Err(RejectReason::NotOrderOwner);
}
if !matches!(
old.info.state,
BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
) {
return Err(RejectReason::OrderNotLive);
}
let price_ticks = new_price_ticks.unwrap_or(old.price_ticks);
let qty_shares = new_qty_shares.unwrap_or(old.remaining_shares());
self.validate_price_ticks(price_ticks)?;
Self::validate_qty_shares(qty_shares)?;
self.plan_cancel_binary_order(scratch, ts, order_id, account_id, "REPLACE")?;
self.plan_place_binary_order(
scratch,
ts,
PlacePlan {
correlation_id,
order_id: new_order_id,
account_id,
side: old.info.side,
price_ticks,
qty_shares,
time_in_force: old.time_in_force,
},
)?;
Ok(())
}
fn plan_matching(
&self,
scratch: &mut Scratch,
ts: DateTime,
taker_order_id: OrderId,
taker_side: Side,
taker_price_ticks: u16,
taker_qty_shares: u64,
) -> (u64, Option<u16>) {
let mut remaining = taker_qty_shares;
if remaining == 0 {
return (0, None);
}
let mut matched = 0u64;
let mut price_qty_sum: u128 = 0;
match taker_side {
Side::Yes => {
let limit = taker_price_ticks as usize;
let mut tick = self.asks.best_asc();
while let Some(t) = tick {
if remaining == 0 || t > limit {
break;
}
let head = self.asks.level(t).head;
let mut maker_key = head;
while let Some(k) = maker_key {
if remaining == 0 {
break;
}
let maker = &self.orders.slot(k).order;
let maker_remaining = maker.remaining_shares().saturating_sub(
*scratch.maker_fills.get(&maker.info.order_id).unwrap_or(&0),
);
if maker_remaining == 0 {
maker_key = self.orders.slot(k).links.next;
continue;
}
let fill = remaining.min(maker_remaining);
remaining -= fill;
matched += fill;
price_qty_sum += fill as u128 * maker.price_ticks as u128;
scratch
.maker_fills
.entry(maker.info.order_id)
.and_modify(|v| *v = v.saturating_add(fill))
.or_insert(fill);
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
trade_id: {
let id = TradeId(scratch.next_trade_id);
scratch.next_trade_id += 1;
id
},
maker: OrderInfo {
id: maker.info.order_id,
side: maker.info.side,
},
taker: OrderInfo {
id: taker_order_id,
side: taker_side,
},
price_ticks: maker.price_ticks,
qty_shares: fill,
},
));
maker_key = self.orders.slot(k).links.next;
}
tick = self.asks.next_asc_from(t + 1);
}
}
Side::No => {
let limit = taker_price_ticks as usize;
let mut tick = self.bids.best_desc();
while let Some(t) = tick {
if remaining == 0 || t < limit {
break;
}
let head = self.bids.level(t).head;
let mut maker_key = head;
while let Some(k) = maker_key {
if remaining == 0 {
break;
}
let maker = &self.orders.slot(k).order;
let maker_remaining = maker.remaining_shares().saturating_sub(
*scratch.maker_fills.get(&maker.info.order_id).unwrap_or(&0),
);
if maker_remaining == 0 {
maker_key = self.orders.slot(k).links.next;
continue;
}
let fill = remaining.min(maker_remaining);
remaining -= fill;
matched += fill;
price_qty_sum += fill as u128 * maker.price_ticks as u128;
scratch
.maker_fills
.entry(maker.info.order_id)
.and_modify(|v| *v = v.saturating_add(fill))
.or_insert(fill);
scratch.events.push(self.emit(
ts,
BookEvent::BinaryTradeMatched {
trade_id: {
let id = TradeId(scratch.next_trade_id);
scratch.next_trade_id += 1;
id
},
maker: OrderInfo {
id: maker.info.order_id,
side: maker.info.side,
},
taker: OrderInfo {
id: taker_order_id,
side: taker_side,
},
price_ticks: maker.price_ticks,
qty_shares: fill,
},
));
maker_key = self.orders.slot(k).links.next;
}
if t == 0 {
break;
}
tick = self.bids.next_desc_from(t - 1);
}
}
}
let avg = if matched == 0 {
None
} else {
Some(((price_qty_sum + matched as u128 / 2) / matched as u128) as u16)
};
(matched, avg)
}
pub fn apply_event(&mut self, env: &BookEventEnvelope) {
let ts = env.timestamp;
match &env.event {
BookEvent::MarketCreated { .. } => {}
BookEvent::MarketStateChanged { to, reason } => {
if *to == BookMarketState::Closing {
let batch_max_events =
crate::book::close_process::parse_close_start_batch_max_events(
reason.as_str(),
)
.unwrap_or(crate::book::close_process::DEFAULT_CLOSE_BATCH_EVENTS);
let total_live_orders = self.count_live_orders();
self.close_process = Some(CloseProcessState {
batch_max_events,
cursor_after: None,
total_live_orders,
cancelled_total: 0,
chunks_done: 0,
});
}
let from = self.state;
if *to == BookMarketState::Suspended {
self.state_before_suspend = Some(from);
} else if from == BookMarketState::Suspended {
self.state_before_suspend = None;
}
if *to == BookMarketState::Halted {
self.state_before_halt = Some(from);
} else if from == BookMarketState::Halted {
self.state_before_halt = None;
}
self.state = *to;
}
BookEvent::MarketSettled { .. } => self.state = BookMarketState::Settled,
BookEvent::MarketOrdersSettled {
cursor_after,
order_ids,
is_final,
} => {
for order_id in order_ids {
self.apply_remove_order(*order_id);
}
if let Some(s) = self.close_process.as_mut() {
s.cursor_after = *cursor_after;
s.cancelled_total = s.cancelled_total.saturating_add(order_ids.len() as u64);
s.chunks_done = s.chunks_done.saturating_add(1);
if *is_final {
self.close_process = None;
}
}
}
BookEvent::BinaryOrderAccepted {
order_id,
account_id,
side,
price_ticks,
qty_shares,
time_in_force,
..
} => {
self.next_order_id = self.next_order_id.max(order_id.0.saturating_add(1));
let order = BinaryOrder {
info: BookOrderInfo {
order_id: *order_id,
account_id: *account_id,
side: *side,
state: BookOrderState::ExecutableUnmatched,
created_at: ts,
last_updated_at: ts,
},
price_ticks: *price_ticks,
qty_shares: *qty_shares,
filled_shares: 0,
time_in_force: *time_in_force,
};
let key = self.orders.insert(*order_id, order);
match side {
Side::Yes => self.bids.insert_tail(&mut self.orders, key, *qty_shares),
Side::No => self.asks.insert_tail(&mut self.orders, key, *qty_shares),
}
}
BookEvent::BinaryTradeMatched {
trade_id,
maker,
taker,
price_ticks,
qty_shares,
..
} => {
self.trades.insert(
*trade_id,
BinaryTrade {
trade_id: *trade_id,
maker_order_id: maker.id,
taker_order_id: taker.id,
price_ticks: *price_ticks,
qty_shares: *qty_shares,
matched_at: ts,
},
);
self.next_trade_id = self.next_trade_id.max(trade_id.0 + 1);
self.apply_trade_fill(maker.id, *qty_shares, ts);
self.apply_trade_fill(taker.id, *qty_shares, ts);
}
BookEvent::OrderCancelled { order_id, .. } => {
self.apply_remove_order(*order_id);
}
BookEvent::OrderLapsed { order_id, .. } => {
warn!(
market_id=?self.market_id,
order_id=?order_id,
"OrderLapsed ignored for binary book"
);
}
BookEvent::OrderVoided { order_id, .. } => {
self.apply_remove_order(*order_id);
}
BookEvent::OrderAccepted { .. }
| BookEvent::OrderRejected { .. }
| BookEvent::TradeMatched { .. }
| BookEvent::TradeVoided { .. }
| BookEvent::RunnerRemoved { .. }
| BookEvent::MarketRemoved { .. } => {}
}
}
pub fn apply_all_events(&mut self, envs: &[BookEventEnvelope]) {
for env in envs {
self.apply_event(env);
}
}
fn apply_trade_fill(&mut self, order_id: OrderId, qty_shares: u64, ts: DateTime) {
if qty_shares == 0 {
return;
}
let Some(key) = self.orders.get_key(&order_id) else {
return;
};
let (side, tick, in_level, prev_remaining, filled_shares, remaining_shares) = {
let slot = self.orders.slot(key);
let prev_remaining = slot.order.remaining_shares();
if prev_remaining == 0 {
return;
}
let applied = qty_shares.min(prev_remaining);
let filled_shares = slot.order.filled_shares.saturating_add(applied);
let remaining_shares = slot.order.qty_shares.saturating_sub(filled_shares);
(
slot.level_side,
slot.level_tick as usize,
slot.in_level,
prev_remaining,
filled_shares,
remaining_shares,
)
};
if in_level {
let delta = qty_shares.min(prev_remaining);
match side {
Side::Yes => {
self.bids.levels[tick].total_remaining =
self.bids.levels[tick].total_remaining.saturating_sub(delta);
if remaining_shares == 0 {
self.bids.unlink(&mut self.orders, key, 0);
}
}
Side::No => {
self.asks.levels[tick].total_remaining =
self.asks.levels[tick].total_remaining.saturating_sub(delta);
if remaining_shares == 0 {
self.asks.unlink(&mut self.orders, key, 0);
}
}
}
}
if let Some(order) = self.orders.get_mut(&order_id) {
order.filled_shares = filled_shares;
order.info.last_updated_at = ts;
order.info.state = if remaining_shares == 0 {
BookOrderState::ExecutionComplete
} else {
BookOrderState::ExecutablePartiallyMatched
};
}
if remaining_shares == 0 {
self.orders.remove(&order_id);
}
}
}
impl Serialize for BinaryYesBook {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut orders = BTreeMap::new();
for (oid, _, order) in self.orders.iter_keys_sorted() {
orders.insert(oid, order.clone());
}
let snap = BinaryYesBookSnapshot {
market_id: self.market_id,
market_kind: self.market_kind,
state: self.state,
state_before_suspend: self.state_before_suspend,
state_before_halt: self.state_before_halt,
close_process: self.close_process,
yes_runner_id: self.yes_runner_id,
no_runner_id: self.no_runner_id,
max_price_ticks: self.max_price_ticks,
next_trade_id: self.next_trade_id,
next_order_id: self.next_order_id,
orders,
trades: self.trades.clone(),
};
snap.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for BinaryYesBook {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let snap = BinaryYesBookSnapshot::deserialize(deserializer)?;
let mut book = BinaryYesBook::new_with_capacity(
snap.market_id,
snap.market_kind,
snap.yes_runner_id,
snap.no_runner_id,
snap.max_price_ticks,
snap.orders.len(),
);
book.state = snap.state;
book.state_before_suspend = snap.state_before_suspend;
book.state_before_halt = snap.state_before_halt;
book.close_process = snap.close_process;
book.trades = snap.trades;
book.next_trade_id = snap.next_trade_id;
book.next_order_id = snap.next_order_id;
for (oid, order) in snap.orders {
book.orders.insert(oid, order);
}
book.rebuild_ladders();
if let Some(max_tid) = book.trades.keys().last() {
book.next_trade_id = book.next_trade_id.max(max_tid.0 + 1);
}
if let Some((max_oid, _, _)) = book.orders.iter_keys_sorted().last() {
book.next_order_id = book.next_order_id.max(max_oid.0.saturating_add(1));
}
Ok(book)
}
}