use crate::{
book::protocol::{
command::{Command, CommandKind, Persistence, Side, TimeInForce},
reject::RejectReason,
},
book::{Book, BookEventEnvelope, PriceSize},
config::Config,
types::*,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
fn clamp_i128_to_i64(v: i128) -> i64 {
if v > i64::MAX as i128 {
i64::MAX
} else if v < i64::MIN as i128 {
i64::MIN
} else {
v as i64
}
}
fn implied_maker_stake_from_taker(taker_stake: Money, trade_price: OddsX10000) -> Money {
let denom = trade_price.0 as i128 - 10_000;
if taker_stake.0 <= 0 || denom <= 0 {
return Money::zero();
}
let v = (taker_stake.0 as i128 * denom) / 10_000;
Money(clamp_i128_to_i64(v))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum MarketConfig {
TwoRunner {
market_id: MarketId,
runner_a: RunnerId,
runner_b: RunnerId,
market_kind: MarketKind,
},
BinaryYes {
market_id: MarketId,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
market_kind: MarketKind,
max_price_ticks: u16,
},
MultiRunner {
market_id: MarketId,
runners: Vec<RunnerId>,
market_kind: MarketKind,
},
}
impl MarketConfig {
pub fn two_runner(
market_id: MarketId,
runner_a: RunnerId,
runner_b: RunnerId,
market_kind: MarketKind,
) -> Self {
Self::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
}
}
pub fn multi_runner(
market_id: MarketId,
runners: impl IntoIterator<Item = RunnerId>,
market_kind: MarketKind,
) -> Self {
Self::MultiRunner {
market_id,
runners: runners.into_iter().collect(),
market_kind,
}
}
pub fn multi_runner_dynamic(market_id: MarketId, market_kind: MarketKind) -> Self {
Self::MultiRunner {
market_id,
runners: Vec::new(),
market_kind,
}
}
pub fn binary_yes(
market_id: MarketId,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
market_kind: MarketKind,
max_price_ticks: u16,
) -> Self {
Self::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
}
}
pub fn market_id(&self) -> MarketId {
match self {
Self::TwoRunner { market_id, .. } => *market_id,
Self::BinaryYes { market_id, .. } => *market_id,
Self::MultiRunner { market_id, .. } => *market_id,
}
}
pub fn market_kind(&self) -> MarketKind {
match self {
Self::TwoRunner { market_kind, .. }
| Self::BinaryYes { market_kind, .. }
| Self::MultiRunner { market_kind, .. } => *market_kind,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Market {
pub id: MarketId,
pub market_kind: MarketKind,
pub book: Book,
pub last_market_seq: u64,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
struct OrderMeta {
market_id: MarketId,
account_id: AccountId,
runner_id: RunnerId,
side: Side,
price: OddsX10000,
persistence: Persistence,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TradeMeta {
market_id: MarketId,
maker_runner_id: RunnerId,
taker_runner_id: RunnerId,
price: OddsX10000,
maker_stake: Money,
taker_stake: Money,
maker_account_id: AccountId,
maker_side: Side,
taker_account_id: AccountId,
taker_side: Side,
}
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
struct RunnerPosition {
win_pnl: i64,
lose_pnl: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
struct RiskTracker {
order_index: HashMap<OrderId, OrderMeta>,
trade_index: HashMap<TradeId, TradeMeta>,
positions: HashMap<MarketId, HashMap<AccountId, HashMap<RunnerId, RunnerPosition>>>,
}
impl RiskTracker {
fn position_for(
&self,
market_id: MarketId,
account_id: AccountId,
runner_id: RunnerId,
) -> Option<&RunnerPosition> {
self.positions
.get(&market_id)
.and_then(|by_account| by_account.get(&account_id))
.and_then(|by_runner| by_runner.get(&runner_id))
}
fn position_for_mut(
&mut self,
market_id: MarketId,
account_id: AccountId,
runner_id: RunnerId,
) -> &mut RunnerPosition {
self.positions
.entry(market_id)
.or_default()
.entry(account_id)
.or_default()
.entry(runner_id)
.or_default()
}
fn apply_order_accepted(&mut self, order_id: OrderId, meta: OrderMeta) {
self.order_index.insert(order_id, meta);
}
fn apply_order_terminal(&mut self, order_id: OrderId) {
self.order_index.remove(&order_id);
}
#[allow(clippy::too_many_arguments)]
fn apply_trade_matched(
&mut self,
market: &Market,
trade_id: TradeId,
maker_order_id: OrderId,
maker_side: Side,
taker_order_id: OrderId,
taker_side: Side,
price: OddsX10000,
stake: Money,
) {
let Some(maker_meta) = self.order_index.get(&maker_order_id) else {
return;
};
let Some(taker_meta) = self.order_index.get(&taker_order_id) else {
return;
};
let maker_meta = *maker_meta;
let taker_meta = *taker_meta;
let is_implied = maker_meta.runner_id != taker_meta.runner_id;
let maker_stake = if is_implied {
implied_maker_stake_from_taker(stake, price)
} else {
stake
};
let taker_stake = stake;
let maker_pos =
self.position_for_mut(market.id, maker_meta.account_id, maker_meta.runner_id);
apply_runner_position(maker_pos, maker_side, price, maker_stake, 1);
let taker_pos =
self.position_for_mut(market.id, taker_meta.account_id, taker_meta.runner_id);
apply_runner_position(taker_pos, taker_side, price, taker_stake, 1);
self.trade_index.insert(
trade_id,
TradeMeta {
market_id: market.id,
maker_runner_id: maker_meta.runner_id,
taker_runner_id: taker_meta.runner_id,
price,
maker_stake,
taker_stake,
maker_account_id: maker_meta.account_id,
maker_side,
taker_account_id: taker_meta.account_id,
taker_side,
},
);
}
fn apply_trade_voided(&mut self, trade_id: TradeId) {
let Some(trade) = self.trade_index.remove(&trade_id) else {
return;
};
let maker_pos = self.position_for_mut(
trade.market_id,
trade.maker_account_id,
trade.maker_runner_id,
);
apply_runner_position(
maker_pos,
trade.maker_side,
trade.price,
trade.maker_stake,
-1,
);
let taker_pos = self.position_for_mut(
trade.market_id,
trade.taker_account_id,
trade.taker_runner_id,
);
apply_runner_position(
taker_pos,
trade.taker_side,
trade.price,
trade.taker_stake,
-1,
);
}
fn clear_market(&mut self, market_id: MarketId) {
self.positions.remove(&market_id);
self.trade_index
.retain(|_, trade| trade.market_id != market_id);
self.order_index
.retain(|_, order| order.market_id != market_id);
}
}
fn apply_runner_position(
position: &mut RunnerPosition,
side: Side,
price: OddsX10000,
stake: Money,
sign: i64,
) {
let stake_i = stake.0 as i128;
let odds = price.0 as i128;
let profit = (stake_i * (odds - 10_000)) / 10_000;
let sign = sign as i128;
match side {
Side::Yes => {
position.win_pnl = clamp_i128_to_i64(position.win_pnl as i128 + sign * profit);
position.lose_pnl = clamp_i128_to_i64(position.lose_pnl as i128 + sign * -stake_i);
}
Side::No => {
position.win_pnl = clamp_i128_to_i64(position.win_pnl as i128 + sign * -profit);
position.lose_pnl = clamp_i128_to_i64(position.lose_pnl as i128 + sign * stake_i);
}
}
}
impl Market {
pub fn new(id: MarketId, market_kind: MarketKind) -> Self {
Self {
id,
market_kind,
book: Book::new_engine_with_kind(id, market_kind),
last_market_seq: 0,
}
}
pub fn new_with_cfg(id: MarketId, market_kind: MarketKind, cfg: &Config) -> Self {
Self {
id,
market_kind,
book: Book::new_engine_with_kind_and_capacity(
id,
market_kind,
cfg.order_store_capacity,
),
last_market_seq: 0,
}
}
pub fn from_config(config: MarketConfig) -> Self {
match config {
MarketConfig::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
} => Self {
id: market_id,
market_kind,
book: Book::new_two_runner_with_kind(market_id, market_kind, runner_a, runner_b),
last_market_seq: 0,
},
MarketConfig::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
} => Self {
id: market_id,
market_kind,
book: Book::new_binary_yes_with_kind(
market_id,
market_kind,
yes_runner_id,
no_runner_id,
max_price_ticks,
),
last_market_seq: 0,
},
MarketConfig::MultiRunner {
market_id,
runners,
market_kind,
} => Self {
id: market_id,
market_kind,
book: if runners.is_empty() {
Book::new_engine_with_kind(market_id, market_kind)
} else {
Book::new_multi_runner_with_kind(market_id, market_kind, runners)
},
last_market_seq: 0,
},
}
}
pub fn from_config_with_cfg(cfg: &Config, config: MarketConfig) -> Self {
match config {
MarketConfig::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
} => Self {
id: market_id,
market_kind,
book: Book::new_two_runner_with_kind_and_capacity(
market_id,
market_kind,
runner_a,
runner_b,
cfg.order_store_capacity,
),
last_market_seq: 0,
},
MarketConfig::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
} => Self {
id: market_id,
market_kind,
book: Book::new_binary_yes_with_kind_and_capacity(
market_id,
market_kind,
yes_runner_id,
no_runner_id,
max_price_ticks,
cfg.order_store_capacity,
),
last_market_seq: 0,
},
MarketConfig::MultiRunner {
market_id,
runners,
market_kind,
} => Self {
id: market_id,
market_kind,
book: if runners.is_empty() {
Book::new_engine_with_kind_and_capacity(
market_id,
market_kind,
cfg.order_store_capacity,
)
} else {
Book::new_multi_runner_with_kind_and_capacity(
market_id,
market_kind,
runners,
cfg.order_store_capacity,
)
},
last_market_seq: 0,
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineState {
pub cfg: Config,
pub markets: HashMap<MarketId, Market>,
risk: RiskTracker,
}
impl EngineState {
fn assign_market_seqs(market: &Market, events: &mut [BookEventEnvelope]) {
let mut next = market.last_market_seq.saturating_add(1);
for event in events {
event.market_seq = next;
next = next.saturating_add(1);
}
}
pub fn new(cfg: Config) -> Self {
Self {
cfg,
markets: HashMap::new(),
risk: RiskTracker::default(),
}
}
pub fn handle_command(
&mut self,
cmd: &Command,
) -> Result<Vec<BookEventEnvelope>, RejectReason> {
match &cmd.kind {
CommandKind::CreateMarket {
name,
market_model,
market_kind,
runner_ids,
runner_labels,
} => {
let market_id = cmd.market_id;
if self.markets.contains_key(&market_id) {
return Err(RejectReason::MarketAlreadyExists);
}
if runner_ids.len() == 1 {
return Err(RejectReason::InvalidMarketConfig);
}
match market_model {
MarketModel::ExchangeOdds => {}
MarketModel::BinaryYes { max_price_ticks } => {
if *max_price_ticks < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
if let Some(enforced) = self.cfg.enforce_binary_yes_max_price_ticks
&& *max_price_ticks != enforced
{
return Err(RejectReason::InvalidMarketConfig);
}
if runner_ids.len() != 2 {
return Err(RejectReason::InvalidMarketConfig);
}
}
}
if !runner_labels.is_empty() && runner_labels.len() != runner_ids.len() {
return Err(RejectReason::InvalidMarketConfig);
}
info!(
market_id = ?market_id,
market_name = %name,
market_model = ?market_model,
market_kind = ?market_kind,
runner_count = runner_ids.len(),
"creating market"
);
Ok(vec![BookEventEnvelope {
market_id,
market_seq: 1,
timestamp: Utc::now(),
event: crate::book::BookEvent::MarketCreated {
correlation_id: cmd.correlation_id.clone(),
name: name.clone(),
market_model: *market_model,
market_kind: *market_kind,
runner_ids: runner_ids.clone(),
runner_labels: runner_labels.clone(),
},
}])
}
CommandKind::RemoveMarket => {
let market_id = cmd.market_id;
let Some(market) = self.markets.get(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
if !market.book.market_state().is_terminal() {
return Err(RejectReason::MarketNotTerminal);
}
info!(market_id = ?market_id, "removing terminal market");
Ok(vec![BookEventEnvelope {
market_id,
market_seq: market.last_market_seq.saturating_add(1),
timestamp: Utc::now(),
event: crate::book::BookEvent::MarketRemoved {
reason: "removed via command".to_string(),
},
}])
}
CommandKind::CashoutRunner {
account_id,
runner_id,
percent_bps,
max_slippage_bps,
depth_levels,
} => {
let Some(market) = self.markets.get(&cmd.market_id) else {
return Err(RejectReason::MarketNotFound);
};
if matches!(market.book, Book::BinaryYes(_)) {
return Err(RejectReason::CashoutNotSupported);
}
self.handle_cashout_runner(
cmd.market_id,
*account_id,
*runner_id,
*percent_bps,
*max_slippage_bps,
*depth_levels,
cmd.correlation_id.clone(),
)
}
_ => {
let market_id = cmd.market_id;
let Some(market) = self.markets.get_mut(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
market
.book
.handle(cmd)
.map(|(mut events, _)| {
Self::assign_market_seqs(market, &mut events);
events
})
.map_err(|e| e.reason)
}
}
}
#[allow(clippy::too_many_arguments)]
fn handle_cashout_runner(
&mut self,
market_id: MarketId,
account_id: AccountId,
runner_id: RunnerId,
percent_bps: u16,
max_slippage_bps: u16,
depth_levels: u16,
correlation_id: Option<CorrelationId>,
) -> Result<Vec<BookEventEnvelope>, RejectReason> {
let Some(market) = self.markets.get_mut(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
if !market.book.market_state().is_matchable() {
return Err(RejectReason::MarketNotOpen);
}
if percent_bps == 0 || percent_bps > 10_000 {
return Err(RejectReason::CashoutInvalidPercent);
}
let runner_ids: Vec<RunnerId> = market.book.runners().collect();
if !runner_ids.contains(&runner_id) {
return Err(RejectReason::RunnerNotFound);
}
if runner_ids.len() < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
let position = self
.risk
.position_for(market_id, account_id, runner_id)
.copied()
.unwrap_or_default();
if position.win_pnl == 0 && position.lose_pnl == 0 {
return Err(RejectReason::CashoutNoExposure);
}
let pwin = position.win_pnl;
let plose = position.lose_pnl;
let diff = pwin - plose;
if diff == 0 {
return Err(RejectReason::CashoutNoExposure);
}
let (side, diff_abs) = if diff > 0 {
(Side::No, diff)
} else {
(Side::Yes, -diff)
};
let depth_levels = if depth_levels == 0 {
DEFAULT_CASHOUT_DEPTH
} else {
depth_levels
};
let prices = market.book.runner_prices(runner_id, depth_levels as usize);
let levels = match side {
Side::Yes => prices.available_to_back,
Side::No => prices.available_to_lay,
};
let Some(quote_level) = levels.first() else {
return Err(RejectReason::CashoutNoLiquidity);
};
let Some(full_stake) = stake_from_diff(diff_abs, quote_level.price) else {
return Err(RejectReason::CashoutNoExposure);
};
let stake = apply_percent(full_stake, percent_bps);
if stake.0 <= 0 {
return Err(RejectReason::CashoutNoExposure);
}
let Some(wap) = walk_book(&levels, stake) else {
return Err(RejectReason::CashoutNoLiquidity);
};
let slippage = slippage_bps(quote_level.price, wap.avg_price);
if slippage > max_slippage_bps as u32 {
return Err(RejectReason::CashoutOddsChanged);
}
let place_cmd = Command {
correlation_id: correlation_id.clone(),
market_id,
kind: CommandKind::PlaceOrder {
runner_id,
account_id,
client_order_id: None,
side,
odds: wap.worst_price,
stake,
persistence: Persistence::Lapse,
time_in_force: TimeInForce::FillOrKill {
min_fill: Some(Quantity(1)),
},
},
};
market
.book
.handle(&place_cmd)
.map(|(mut events, _)| {
Self::assign_market_seqs(market, &mut events);
events
})
.map_err(|e| e.reason)
}
}
const DEFAULT_CASHOUT_DEPTH: u16 = 3;
struct WapResult {
avg_price: OddsX10000,
worst_price: OddsX10000,
}
fn stake_from_diff(diff_abs: i64, price: OddsX10000) -> Option<Money> {
if diff_abs <= 0 {
return None;
}
let numerator = diff_abs as i128 * 10_000;
let denom = price.0 as i128;
if denom <= 0 {
return None;
}
let stake = (numerator + denom / 2) / denom;
let stake_i64 = clamp_i128_to_i64(stake);
Money(stake_i64).is_positive().then_some(Money(stake_i64))
}
fn apply_percent(stake: Money, percent_bps: u16) -> Money {
let stake_i = stake.0 as i128;
let pct = percent_bps as i128;
let v = (stake_i * pct) / 10_000;
Money(clamp_i128_to_i64(v))
}
fn walk_book(levels: &[PriceSize], target: Money) -> Option<WapResult> {
if target.0 <= 0 {
return None;
}
let mut filled: i128 = 0;
let mut cost: i128 = 0;
let mut worst_price = None;
for level in levels {
if filled >= target.0 as i128 {
break;
}
let remaining = target.0 as i128 - filled;
let take = remaining.min(level.size.0 as i128);
if take <= 0 {
continue;
}
filled += take;
cost += take * level.price.0 as i128;
worst_price = Some(level.price);
}
if filled < target.0 as i128 {
return None;
}
let avg_price = ((cost + filled / 2) / filled) as u32;
Some(WapResult {
avg_price: OddsX10000(avg_price),
worst_price: worst_price.unwrap_or(OddsX10000(avg_price)),
})
}
fn slippage_bps(quote: OddsX10000, actual: OddsX10000) -> u32 {
let quote_i = quote.0 as i128;
let diff = (actual.0 as i128 - quote_i).abs();
((diff * 10_000) / quote_i.max(1)) as u32
}
impl Default for EngineState {
fn default() -> Self {
Self::new(Config::default())
}
}
#[derive(Debug, Error)]
#[error("command rejected: {reason:?}")]
pub struct EngineCommandError {
pub reason: RejectReason,
}
impl EngineCommandError {
pub fn into_reason(self) -> RejectReason {
self.reason
}
}
impl lucidstream::traits::Aggregate for EngineState {
type Command = Command;
type Event = BookEventEnvelope;
type Error = EngineCommandError;
fn kind() -> &'static str {
"betex"
}
fn handle(&mut self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
let events = self
.handle_command(&command)
.map_err(|reason| EngineCommandError { reason })?;
if events.is_empty() {
return Err(EngineCommandError {
reason: RejectReason::InternalError,
});
}
Ok(events)
}
fn apply(mut self, event: &Self::Event) -> Self {
let market_id = event.market_id;
if !self.markets.contains_key(&market_id) {
match &event.event {
crate::book::BookEvent::MarketCreated {
market_model,
market_kind,
runner_ids,
..
} => {
assert_eq!(
event.market_seq, 1,
"market sequence must start at 1 for {:?}",
market_id
);
let config = match market_model {
MarketModel::ExchangeOdds => {
if runner_ids.len() == 2 {
MarketConfig::two_runner(
market_id,
runner_ids[0],
runner_ids[1],
*market_kind,
)
} else if runner_ids.is_empty() {
MarketConfig::multi_runner_dynamic(market_id, *market_kind)
} else {
MarketConfig::multi_runner(
market_id,
runner_ids.clone(),
*market_kind,
)
}
}
MarketModel::BinaryYes { max_price_ticks } => {
if runner_ids.len() != 2 {
panic!("BinaryYes market requires exactly 2 runner_ids");
}
MarketConfig::binary_yes(
market_id,
runner_ids[0],
runner_ids[1],
*market_kind,
*max_price_ticks,
)
}
};
self.markets
.insert(market_id, Market::from_config_with_cfg(&self.cfg, config));
if let Some(market) = self.markets.get_mut(&market_id) {
market.last_market_seq = event.market_seq;
}
return self;
}
_ => {
panic!(
"event references unknown market_id {:?}; expected MARKET_CREATED first",
market_id
)
}
}
}
if matches!(&event.event, crate::book::BookEvent::MarketRemoved { .. }) {
let Some(market) = self.markets.get(&market_id) else {
panic!(
"event references unknown market_id {:?}; expected MARKET_CREATED first",
market_id
);
};
let expected_market_seq = market.last_market_seq.saturating_add(1);
assert_eq!(
event.market_seq, expected_market_seq,
"market sequence gap for {:?}: expected {}, got {}",
market_id, expected_market_seq, event.market_seq
);
self.markets.remove(&market_id);
self.risk.clear_market(market_id);
return self;
}
let Some(market) = self.markets.get_mut(&market_id) else {
panic!(
"event references unknown market_id {:?}; expected MARKET_CREATED first",
market_id
);
};
let expected_market_seq = market.last_market_seq.saturating_add(1);
assert_eq!(
event.market_seq, expected_market_seq,
"market sequence gap for {:?}: expected {}, got {}",
market_id, expected_market_seq, event.market_seq
);
match &event.event {
crate::book::BookEvent::MarketCreated { .. } => {}
_ => market.book.apply_event(event),
}
market.last_market_seq = event.market_seq;
match &event.event {
crate::book::BookEvent::OrderAccepted {
order_id,
account_id,
runner_id,
side,
price,
persistence,
..
} => self.risk.apply_order_accepted(
*order_id,
OrderMeta {
market_id,
account_id: *account_id,
runner_id: *runner_id,
side: *side,
price: *price,
persistence: *persistence,
},
),
crate::book::BookEvent::OrderCancelled { order_id, .. }
| crate::book::BookEvent::OrderLapsed { order_id, .. }
| crate::book::BookEvent::OrderVoided { order_id, .. } => {
self.risk.apply_order_terminal(*order_id);
}
crate::book::BookEvent::TradeMatched {
trade_id,
maker,
taker,
price,
stake,
..
} => {
self.risk.apply_trade_matched(
market, *trade_id, maker.id, maker.side, taker.id, taker.side, *price, *stake,
);
if market.book.get_order(maker.id).is_none() {
self.risk.apply_order_terminal(maker.id);
}
if market.book.get_order(taker.id).is_none() {
self.risk.apply_order_terminal(taker.id);
}
}
crate::book::BookEvent::TradeVoided { trade_id, .. } => {
self.risk.apply_trade_voided(*trade_id);
}
crate::book::BookEvent::MarketStateChanged { to, .. } => {
if matches!(
to,
crate::book::BookMarketState::Closed
| crate::book::BookMarketState::Voided
| crate::book::BookMarketState::Settled
) {
self.risk.clear_market(market_id);
}
}
crate::book::BookEvent::MarketSettled { .. } => {
self.risk.clear_market(market_id);
}
_ => {}
}
self
}
}