use crate::{
book::protocol::{
command::{Command, CommandKind},
reject::RejectReason,
},
book::{Book, BookEventEnvelope, MultiRunnerBook},
config::Config,
types::*,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
/// Describes how to build a [`Market`]: one variant per supported book layout.
///
/// Serialized with an internally stored `kind` tag whose value is the camelCase
/// variant name (see the `serde` attributes below).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum MarketConfig {
/// Market whose book is created with `Book::new_two_runner_with_kind_and_capacity`.
TwoRunner {
market_id: MarketId,
name: String,
runner_a: RunnerId,
runner_b: RunnerId,
market_kind: MarketKind,
/// Initial book state; `Market::named` asserts it is supported for
/// `market_kind` together with the book's phase.
market_state: crate::book::BookMarketState,
market_phase: MarketPhase,
},
/// Market whose book is created with `Book::new_binary_yes_with_kind_and_capacity`.
BinaryYes {
market_id: MarketId,
name: String,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
market_kind: MarketKind,
/// Initial book state; `Market::named` asserts it is supported for
/// `market_kind` together with the book's phase.
market_state: crate::book::BookMarketState,
market_phase: MarketPhase,
/// Forwarded unchanged to the binary-yes book constructor.
max_price_ticks: u16,
},
/// Market with an arbitrary runner list.
MultiRunner {
market_id: MarketId,
name: String,
/// When empty, `Market::from_config_with_cfg` builds the book with
/// `Book::new_engine_with_kind_and_capacity`; otherwise it uses
/// `Book::new_multi_runner_with_kind_and_capacity`.
runners: Vec<RunnerId>,
market_kind: MarketKind,
/// Initial book state; `Market::named` asserts it is supported for
/// `market_kind` together with the book's phase.
market_state: crate::book::BookMarketState,
market_phase: MarketPhase,
},
}
impl MarketConfig {
    /// Returns the identifier of the market this configuration describes,
    /// whichever variant it happens to be.
    pub fn market_id(&self) -> MarketId {
        match self {
            Self::TwoRunner { market_id, .. }
            | Self::BinaryYes { market_id, .. }
            | Self::MultiRunner { market_id, .. } => *market_id,
        }
    }
}
/// A single market held by the engine: its identity plus the order book that
/// processes its commands and events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Market {
/// Identifier of the market.
pub id: MarketId,
/// Market name; `Market::named` asserts it is not blank and also hands it to the book.
pub name: String,
/// Kind of market, as given in the configuration it was built from.
pub market_kind: MarketKind,
/// Order book for this market.
pub book: Book,
/// Sequence number of the last event recorded for this market.
/// Starts at 0 in `Market::named`; `Aggregate::apply` sets it to the
/// `market_seq` of each event it applies.
pub last_market_seq: u64,
}
impl Market {
    /// Builds a market and its order book from `config`.
    ///
    /// `cfg.order_store_capacity` is passed to whichever book constructor the
    /// variant selects, and `cfg.close_batch_max_events` is applied to the
    /// finished book.
    ///
    /// # Panics
    ///
    /// Panics if the configured name is blank, or if the market kind does not
    /// support the configured initial state together with the book's phase
    /// (both checks live in [`Market::named`]).
    pub fn from_config_with_cfg(cfg: &Config, config: MarketConfig) -> Self {
        let capacity = cfg.order_store_capacity;

        // Each arm only differs in which book constructor it calls; the shared
        // wrapping into a `Market` happens once, below.
        let (id, name, kind, state, book) = match config {
            MarketConfig::TwoRunner {
                market_id,
                name,
                runner_a,
                runner_b,
                market_kind,
                market_state,
                market_phase,
            } => {
                let book = Book::new_two_runner_with_kind_and_capacity(
                    market_id,
                    market_kind,
                    market_phase,
                    runner_a,
                    runner_b,
                    capacity,
                );
                (market_id, name, market_kind, market_state, book)
            }
            MarketConfig::BinaryYes {
                market_id,
                name,
                yes_runner_id,
                no_runner_id,
                market_kind,
                market_state,
                market_phase,
                max_price_ticks,
            } => {
                let book = Book::new_binary_yes_with_kind_and_capacity(
                    market_id,
                    market_kind,
                    market_phase,
                    yes_runner_id,
                    no_runner_id,
                    max_price_ticks,
                    capacity,
                );
                (market_id, name, market_kind, market_state, book)
            }
            MarketConfig::MultiRunner {
                market_id,
                name,
                runners,
                market_kind,
                market_state,
                market_phase,
            } => {
                // An empty runner list selects the plain engine book.
                let book = if runners.is_empty() {
                    Book::new_engine_with_kind_and_capacity(
                        market_id,
                        market_kind,
                        market_phase,
                        capacity,
                    )
                } else {
                    Book::new_multi_runner_with_kind_and_capacity(
                        market_id,
                        market_kind,
                        market_phase,
                        runners,
                        capacity,
                    )
                };
                (market_id, name, market_kind, market_state, book)
            }
        };

        let mut market = Self::named(id, name, kind, state, book);
        market
            .book
            .set_close_batch_max_events(cfg.close_batch_max_events);
        market
    }

    /// Wraps an already constructed `book` into a [`Market`], stamping the
    /// name and initial state onto the book. `last_market_seq` starts at 0.
    ///
    /// # Panics
    ///
    /// Panics if `name` is blank or if `market_state` does not support the
    /// book's current phase for `market_kind`.
    fn named(
        id: MarketId,
        name: String,
        market_kind: MarketKind,
        market_state: crate::book::BookMarketState,
        mut book: Book,
    ) -> Self {
        assert_valid_market_name(&name);

        let initial_phase = book.market_phase();
        assert!(
            market_state.supports_initial_phase_for(market_kind, initial_phase),
            "market kind {market_kind:?} does not support initial state {market_state:?} and phase {:?}",
            initial_phase
        );

        book.set_market_name(&name);
        book.set_market_state(market_state);

        Self {
            id,
            name,
            market_kind,
            book,
            last_market_seq: 0,
        }
    }
}
/// Aggregate state of the engine: its configuration plus every market it currently holds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineState {
/// Engine configuration; consulted when validating market creation and
/// when building markets via `Market::from_config_with_cfg`.
pub cfg: Config,
/// Markets keyed by id. `Aggregate::apply` inserts an entry on
/// `MarketCreated` and removes it on `MarketRemoved`.
pub markets: HashMap<MarketId, Market>,
}
impl EngineState {
fn assign_market_seqs(market: &Market, events: &mut [BookEventEnvelope]) {
let mut next = market.last_market_seq.saturating_add(1);
for event in events {
event.market_seq = next;
next = next.saturating_add(1);
}
}
fn append_market_removed_event(market: &Market, events: &mut Vec<BookEventEnvelope>) {
let last_event = events
.last()
.expect("accepted market removal flow must emit at least one prior event");
events.push(BookEventEnvelope {
market_id: market.id,
market_name: market.name.clone(),
market_seq: last_event.market_seq.saturating_add(1),
timestamp: last_event.timestamp,
metadata: last_event.metadata.clone(),
event: crate::book::BookEvent::MarketRemoved {
reason: "removed after close completion".to_string(),
},
});
}
fn emits_close_batch_process_completed(events: &[BookEventEnvelope]) -> bool {
events.iter().any(|event| {
matches!(
event.event,
crate::book::BookEvent::BatchProcessCompleted {
batch_mode: crate::book::BatchMode::Close,
}
)
})
}
pub fn new(cfg: Config) -> Self {
Self {
cfg,
markets: HashMap::new(),
}
}
pub fn handle_command(
&mut self,
cmd: &Command,
) -> Result<Vec<BookEventEnvelope>, RejectReason> {
match &cmd.kind {
CommandKind::CreateMarket {
name,
market_model,
book_type,
market_kind,
market_state,
market_phase,
runner_ids,
runner_labels,
} => {
let market_id = cmd.market_id;
if self.markets.contains_key(&market_id) {
return Err(RejectReason::MarketAlreadyExists);
}
if name.trim().is_empty() {
return Err(RejectReason::InvalidMarketConfig);
}
match market_model {
MarketModel::ExchangeOdds => match book_type {
Some(BookType::MultiRunner) => {
if runner_ids.len() == 1 {
return Err(RejectReason::InvalidMarketConfig);
}
}
Some(BookType::TwoRunner) => {
if runner_ids.len() != 2 {
return Err(RejectReason::InvalidMarketConfig);
}
}
None => return Err(RejectReason::InvalidMarketConfig),
},
MarketModel::BinaryYes { max_price_ticks } => {
if book_type.is_some() {
return Err(RejectReason::InvalidMarketConfig);
}
if *max_price_ticks < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
if let Some(enforced) = self.cfg.enforce_binary_yes_max_price_ticks
&& *max_price_ticks != enforced
{
return Err(RejectReason::InvalidMarketConfig);
}
if runner_ids.len() != 2 {
return Err(RejectReason::InvalidMarketConfig);
}
}
}
MultiRunnerBook::validate_runner_group(runner_ids, runner_labels)?;
let market_state = match market_state {
crate::book::protocol::command::MarketState::Open => {
crate::book::BookMarketState::Open
}
crate::book::protocol::command::MarketState::Suspended => {
crate::book::BookMarketState::Suspended
}
crate::book::protocol::command::MarketState::Deactivated => {
crate::book::BookMarketState::Deactivated
}
crate::book::protocol::command::MarketState::Closed => {
return Err(RejectReason::InvalidMarketConfig);
}
};
if !market_state.supports_initial_phase_for(*market_kind, *market_phase) {
return Err(RejectReason::InvalidMarketConfig);
}
info!(
market_id = ?market_id,
market_name = %name,
market_model = ?market_model,
book_type = ?book_type,
market_kind = ?market_kind,
market_state = ?market_state,
market_phase = ?market_phase,
runner_count = runner_ids.len(),
"creating market"
);
Ok(vec![BookEventEnvelope {
market_id,
market_name: name.clone(),
market_seq: 1,
timestamp: Utc::now(),
metadata: cmd.metadata.clone(),
event: crate::book::BookEvent::MarketCreated {
correlation_id: cmd.correlation_id.clone(),
name: name.clone(),
market_model: *market_model,
book_type: *book_type,
market_kind: *market_kind,
market_state,
market_phase: *market_phase,
runner_ids: runner_ids.clone(),
runner_labels: runner_labels.clone(),
},
}])
}
_ => {
let market_id = cmd.market_id;
let Some(market) = self.markets.get_mut(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
let may_emit_close_batch_completion =
matches!(
&cmd.kind,
CommandKind::CloseMarket { .. }
| CommandKind::SetMarketState {
state: crate::book::protocol::command::MarketState::Closed,
}
) || (matches!(&cmd.kind, CommandKind::ContinueBatchProcess)
&& market
.book
.batch_process_state()
.is_some_and(crate::book::BatchProcessState::is_close));
market
.book
.handle(cmd)
.map(|(mut events, _)| {
Self::assign_market_seqs(market, &mut events);
if may_emit_close_batch_completion
&& Self::emits_close_batch_process_completed(&events)
{
Self::append_market_removed_event(market, &mut events);
}
events
})
.map_err(|e| e.reason)
}
}
}
}
impl Default for EngineState {
    /// Creates an engine from `Config::default()`, holding no markets.
    fn default() -> Self {
        let cfg = Config::default();
        Self::new(cfg)
    }
}
/// Error produced by the `Aggregate` implementation of `EngineState` when a
/// command is rejected.
///
/// Its `Display` output is `command rejected: ` followed by the `Debug`
/// rendering of `reason`.
#[derive(Debug, Error)]
#[error("command rejected: {reason:?}")]
pub struct EngineCommandError {
/// Why the command was rejected.
pub reason: RejectReason,
}
impl EngineCommandError {
    /// Consumes the error and yields the rejection reason it wraps.
    pub fn into_reason(self) -> RejectReason {
        let Self { reason } = self;
        reason
    }
}
impl lucidstream::traits::Aggregate for EngineState {
    type Command = Command;
    type Event = BookEventEnvelope;
    type Error = EngineCommandError;

    /// Aggregate kind identifier.
    fn kind() -> &'static str {
        "betex"
    }

    /// Runs `command` through [`EngineState::handle_command`].
    ///
    /// A rejection is wrapped in [`EngineCommandError`]; an accepted command
    /// that produced no events is reported as `RejectReason::InternalError`.
    fn handle(&mut self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
        match self.handle_command(&command) {
            Ok(events) if events.is_empty() => Err(EngineCommandError {
                reason: RejectReason::InternalError,
            }),
            Ok(events) => Ok(events),
            Err(reason) => Err(EngineCommandError { reason }),
        }
    }

    /// Folds `event` into the state and returns the updated state.
    ///
    /// * `MarketCreated` builds the market from the event payload and inserts
    ///   it under the event's market id (replacing any existing entry).
    /// * `MarketRemoved` drops the market.
    /// * Any other event is applied to the market's book and advances
    ///   `last_market_seq`.
    ///
    /// # Panics
    ///
    /// Panics if a `MarketCreated` event does not carry sequence 1 or describes
    /// an invalid configuration, if the event references an unknown market, or
    /// if its sequence number is not exactly one past the market's last one.
    fn apply(mut self, event: &Self::Event) -> Self {
        let market_id = event.market_id;

        if let crate::book::BookEvent::MarketCreated {
            name,
            market_model,
            book_type,
            market_kind,
            market_state,
            market_phase,
            runner_ids,
            runner_labels,
            ..
        } = &event.event
        {
            assert_eq!(
                event.market_seq, 1,
                "market sequence must start at 1 for {:?}",
                market_id
            );

            let config = match market_model {
                MarketModel::ExchangeOdds => match (*book_type, runner_ids.as_slice()) {
                    (Some(BookType::TwoRunner), &[runner_a, runner_b]) => {
                        MarketConfig::TwoRunner {
                            market_id,
                            name: name.clone(),
                            runner_a,
                            runner_b,
                            market_kind: *market_kind,
                            market_state: *market_state,
                            market_phase: *market_phase,
                        }
                    }
                    // Multi-runner accepts zero runners or at least two.
                    (Some(BookType::MultiRunner), ids) if ids.len() != 1 => {
                        MarketConfig::MultiRunner {
                            market_id,
                            name: name.clone(),
                            runners: runner_ids.clone(),
                            market_kind: *market_kind,
                            market_state: *market_state,
                            market_phase: *market_phase,
                        }
                    }
                    (None, _)
                    | (Some(BookType::TwoRunner), _)
                    | (Some(BookType::MultiRunner), _) => {
                        panic!("invalid exchange market config reached apply")
                    }
                },
                MarketModel::BinaryYes { max_price_ticks } => {
                    assert!(
                        book_type.is_none(),
                        "BinaryYes market must not set exchange book_type"
                    );
                    let &[yes_runner_id, no_runner_id] = runner_ids.as_slice() else {
                        panic!("BinaryYes market requires exactly 2 runner_ids");
                    };
                    MarketConfig::BinaryYes {
                        market_id,
                        name: name.clone(),
                        yes_runner_id,
                        no_runner_id,
                        market_kind: *market_kind,
                        market_state: *market_state,
                        market_phase: *market_phase,
                        max_price_ticks: *max_price_ticks,
                    }
                }
            };

            let mut market = Market::from_config_with_cfg(&self.cfg, config);
            market.book.set_runner_labels(runner_ids, runner_labels);
            market.last_market_seq = event.market_seq;
            self.markets.insert(market_id, market);
            return self;
        }

        // Every remaining event must target a known market and carry the next
        // sequence number; removal and ordinary events share that check.
        let is_removal = matches!(&event.event, crate::book::BookEvent::MarketRemoved { .. });

        let Some(market) = self.markets.get_mut(&market_id) else {
            panic!(
                "event references unknown market_id {:?}; expected MARKET_CREATED first",
                market_id
            );
        };
        let expected_market_seq = market.last_market_seq.saturating_add(1);
        assert_eq!(
            event.market_seq, expected_market_seq,
            "market sequence gap for {:?}: expected {}, got {}",
            market_id, expected_market_seq, event.market_seq
        );

        if is_removal {
            self.markets.remove(&market_id);
        } else {
            market.book.apply_event(event);
            market.last_market_seq = event.market_seq;
        }
        self
    }
}
/// Asserts that `name` contains at least one non-whitespace character.
///
/// # Panics
///
/// Panics with `market name must not be blank` when `name` is empty or
/// consists solely of whitespace.
fn assert_valid_market_name(name: &str) {
    let is_blank = name.trim().is_empty();
    assert!(!is_blank, "market name must not be blank");
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        book::{
            BookEvent,
            protocol::command::{MarketState, Persistence, Side, TimeInForce},
        },
        types::unix_epoch,
    };
    use lucidstream::traits::Aggregate;

    /// Wraps `kind` in a command addressed to `market_id`, without
    /// correlation id or metadata.
    fn command_for(market_id: MarketId, kind: CommandKind) -> Command {
        Command {
            correlation_id: None,
            metadata: None,
            market_id,
            kind,
        }
    }

    /// A persistent GTC back order on `runner_id`, shared by the order tests.
    fn back_order_on(runner_id: RunnerId) -> CommandKind {
        CommandKind::PlaceOrder {
            runner_id,
            account_id: AccountId::from(10u64),
            client_order_id: None,
            side: Side::Back,
            odds: OddsX10000(20000),
            stake: Money(100),
            persistence: Persistence::Persist,
            time_in_force: TimeInForce::Gtc,
        }
    }

    /// A two-runner exchange-odds creation request for runners 1 ("A") and 2 ("B").
    fn create_two_runner_market(
        name: &str,
        market_state: MarketState,
        market_phase: MarketPhase,
    ) -> CommandKind {
        CommandKind::CreateMarket {
            name: name.to_string(),
            market_model: MarketModel::ExchangeOdds,
            book_type: Some(BookType::TwoRunner),
            market_kind: MarketKind::InPlayCapable,
            market_state,
            market_phase,
            runner_ids: vec![RunnerId(1), RunnerId(2)],
            runner_labels: vec!["A".to_string(), "B".to_string()],
        }
    }

    #[test]
    fn market_config_name_propagates_to_order_events() {
        let market_id = MarketId(7);
        let market_name = "Match Odds";

        let mut state = EngineState::new(Config::default());
        let market = Market::from_config_with_cfg(
            &state.cfg,
            MarketConfig::TwoRunner {
                market_id,
                name: market_name.to_string(),
                runner_a: RunnerId(1),
                runner_b: RunnerId(2),
                market_kind: MarketKind::InPlayCapable,
                market_state: crate::book::BookMarketState::Open,
                market_phase: MarketPhase::Pre,
            },
        );
        state.markets.insert(market_id, market);

        let events = state
            .handle_command(&command_for(market_id, back_order_on(RunnerId(1))))
            .expect("place order should succeed");

        assert_eq!(events[0].market_name, market_name);
    }

    #[test]
    fn create_market_rejects_blank_name() {
        let mut state = EngineState::new(Config::default());
        let request = command_for(
            MarketId(1),
            create_two_runner_market(" ", MarketState::Open, MarketPhase::Pre),
        );

        let err = state
            .handle_command(&request)
            .expect_err("blank name should be rejected");

        assert_eq!(err, RejectReason::InvalidMarketConfig);
    }

    #[test]
    fn create_market_carries_and_replays_initial_market_state() {
        let market_id = MarketId(8);
        let state = EngineState::new(Config::default());
        let request = command_for(
            market_id,
            create_two_runner_market(
                "Deactivated at birth",
                MarketState::Deactivated,
                MarketPhase::PreAwaitLive,
            ),
        );

        let events = state
            .clone()
            .handle_command(&request)
            .expect("create market should succeed");

        // The emitted event carries the requested initial state and phase.
        let BookEvent::MarketCreated {
            market_state,
            market_phase,
            ..
        } = &events[0].event
        else {
            panic!("expected MarketCreated, got {:?}", events[0].event);
        };
        assert_eq!(*market_state, crate::book::BookMarketState::Deactivated);
        assert_eq!(*market_phase, MarketPhase::PreAwaitLive);

        // Replaying the event reproduces them on the book.
        let state = Aggregate::apply(state, &events[0]);
        let market = state.markets.get(&market_id).expect("market should exist");
        assert_eq!(
            market.book.market_state(),
            crate::book::BookMarketState::Deactivated
        );
        assert_eq!(market.book.market_phase(), MarketPhase::PreAwaitLive);
    }

    #[test]
    fn replayed_market_created_replaces_placeholder_market_metadata() {
        let market_id = MarketId(9);
        let runner_ids = [RunnerId(1), RunnerId(2)];

        // Seed a placeholder market under the same id.
        let mut state = EngineState::new(Config::default());
        let placeholder = Market::from_config_with_cfg(
            &state.cfg,
            MarketConfig::MultiRunner {
                market_id,
                name: "Placeholder".to_string(),
                runners: Vec::new(),
                market_kind: MarketKind::InPlayCapable,
                market_state: crate::book::BookMarketState::Open,
                market_phase: MarketPhase::Pre,
            },
        );
        state.markets.insert(market_id, placeholder);

        let created = BookEventEnvelope {
            market_id,
            market_name: "Authoritative".to_string(),
            market_seq: 1,
            timestamp: unix_epoch(),
            metadata: None,
            event: BookEvent::MarketCreated {
                correlation_id: None,
                name: "Authoritative".to_string(),
                market_model: MarketModel::ExchangeOdds,
                book_type: Some(BookType::TwoRunner),
                market_kind: MarketKind::InPlayCapable,
                market_state: crate::book::BookMarketState::Open,
                market_phase: MarketPhase::Pre,
                runner_ids: runner_ids.to_vec(),
                runner_labels: vec!["Home".to_string(), "Away".to_string()],
            },
        };
        let state = Aggregate::apply(state, &created);

        let market = state.markets.get(&market_id).expect("market should exist");
        assert_eq!(market.name, "Authoritative");

        let events = state
            .clone()
            .handle_command(&command_for(market_id, back_order_on(runner_ids[0])))
            .expect("place order should succeed after replay");

        assert_eq!(events[0].market_name, "Authoritative");
        let BookEvent::OrderAccepted { runner_label, .. } = &events[0].event else {
            panic!("expected OrderAccepted, got {:?}", events[0].event);
        };
        assert_eq!(runner_label, "Home");
    }
}