use crate::{
book::protocol::{
command::{Command, CommandKind},
reject::RejectReason,
},
book::{Book, BookEventEnvelope},
config::Config,
types::*,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
/// Static description of a market, consumed by `Market::from_config_with_cfg`
/// to build the market and its book.
///
/// Serialized by serde as an internally tagged enum: the `kind` field carries
/// the variant name in camelCase.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum MarketConfig {
/// Market with exactly two runners.
TwoRunner {
/// Identifier of the market.
market_id: MarketId,
/// Market name; the constructor functions reject blank names.
name: String,
/// First runner.
runner_a: RunnerId,
/// Second runner.
runner_b: RunnerId,
/// Kind forwarded to the book constructor.
market_kind: MarketKind,
},
/// Binary market with one "yes" runner and one "no" runner.
BinaryYes {
/// Identifier of the market.
market_id: MarketId,
/// Market name; the constructor functions reject blank names.
name: String,
/// Runner representing the "yes" outcome.
yes_runner_id: RunnerId,
/// Runner representing the "no" outcome.
no_runner_id: RunnerId,
/// Kind forwarded to the book constructor.
market_kind: MarketKind,
/// Forwarded unchanged to the binary-yes book constructor.
/// NOTE(review): presumably the upper bound of the price ladder in ticks — confirm against `Book`.
max_price_ticks: u16,
},
/// Market with an arbitrary list of runners.
MultiRunner {
/// Identifier of the market.
market_id: MarketId,
/// Market name; the constructor functions reject blank names.
name: String,
/// Runner ids. When empty, `Market::from_config_with_cfg` builds the book with
/// `Book::new_engine_with_kind_and_capacity` instead of the multi-runner constructor.
runners: Vec<RunnerId>,
/// Kind forwarded to the book constructor.
market_kind: MarketKind,
},
}
impl MarketConfig {
    /// Converts `name` into an owned string and checks that it is not blank.
    ///
    /// # Panics
    /// Panics (via `assert_valid_market_name`) when the name is empty or whitespace-only.
    fn validated_name(name: impl Into<String>) -> String {
        let owned = name.into();
        assert_valid_market_name(&owned);
        owned
    }

    /// Builds a [`MarketConfig::TwoRunner`] configuration.
    ///
    /// # Panics
    /// Panics when `name` is blank.
    pub fn two_runner(
        market_id: MarketId,
        name: impl Into<String>,
        runner_a: RunnerId,
        runner_b: RunnerId,
        market_kind: MarketKind,
    ) -> Self {
        Self::TwoRunner {
            market_id,
            name: Self::validated_name(name),
            runner_a,
            runner_b,
            market_kind,
        }
    }

    /// Builds a [`MarketConfig::MultiRunner`] configuration from the given runners.
    ///
    /// # Panics
    /// Panics when `name` is blank; the name is checked before `runners` is consumed.
    pub fn multi_runner(
        market_id: MarketId,
        name: impl Into<String>,
        runners: impl IntoIterator<Item = RunnerId>,
        market_kind: MarketKind,
    ) -> Self {
        Self::MultiRunner {
            market_id,
            name: Self::validated_name(name),
            runners: runners.into_iter().collect(),
            market_kind,
        }
    }

    /// Builds a [`MarketConfig::MultiRunner`] configuration with an empty runner list.
    ///
    /// # Panics
    /// Panics when `name` is blank.
    pub fn multi_runner_dynamic(
        market_id: MarketId,
        name: impl Into<String>,
        market_kind: MarketKind,
    ) -> Self {
        Self::multi_runner(market_id, name, std::iter::empty(), market_kind)
    }

    /// Builds a [`MarketConfig::BinaryYes`] configuration.
    ///
    /// # Panics
    /// Panics when `name` is blank.
    pub fn binary_yes(
        market_id: MarketId,
        name: impl Into<String>,
        yes_runner_id: RunnerId,
        no_runner_id: RunnerId,
        market_kind: MarketKind,
        max_price_ticks: u16,
    ) -> Self {
        Self::BinaryYes {
            market_id,
            name: Self::validated_name(name),
            yes_runner_id,
            no_runner_id,
            market_kind,
            max_price_ticks,
        }
    }

    /// Returns the market id stored in whichever variant this is.
    pub fn market_id(&self) -> MarketId {
        match self {
            Self::TwoRunner { market_id, .. }
            | Self::BinaryYes { market_id, .. }
            | Self::MultiRunner { market_id, .. } => *market_id,
        }
    }

    /// Returns the market name stored in whichever variant this is.
    pub fn name(&self) -> &str {
        match self {
            Self::TwoRunner { name, .. } => name,
            Self::BinaryYes { name, .. } => name,
            Self::MultiRunner { name, .. } => name,
        }
    }

    /// Returns the market kind stored in whichever variant this is.
    pub fn market_kind(&self) -> MarketKind {
        match self {
            Self::TwoRunner { market_kind, .. } => *market_kind,
            Self::BinaryYes { market_kind, .. } => *market_kind,
            Self::MultiRunner { market_kind, .. } => *market_kind,
        }
    }
}
/// A single market: its identity, name, kind, book and event sequence cursor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Market {
/// Identifier of this market.
pub id: MarketId,
/// Market name; also pushed into the book with `Book::set_market_name` on construction.
pub name: String,
/// Kind the book was constructed with.
pub market_kind: MarketKind,
/// Book that handles this market's commands and applies its events.
pub book: Book,
/// `market_seq` of the most recent event applied to this market; 0 for a freshly built market.
pub last_market_seq: u64,
}
impl Market {
    /// Builds a market from `config`, choosing the book constructor by variant.
    ///
    /// `cfg.order_store_capacity` is forwarded to every book constructor. A
    /// `MultiRunner` config with no runners uses
    /// `Book::new_engine_with_kind_and_capacity`; otherwise the multi-runner
    /// constructor receives the runner list.
    pub fn from_config_with_cfg(cfg: &Config, config: MarketConfig) -> Self {
        let capacity = cfg.order_store_capacity;
        let (id, name, kind, book) = match config {
            MarketConfig::TwoRunner {
                market_id,
                name,
                runner_a,
                runner_b,
                market_kind,
            } => {
                let book = Book::new_two_runner_with_kind_and_capacity(
                    market_id,
                    market_kind,
                    runner_a,
                    runner_b,
                    capacity,
                );
                (market_id, name, market_kind, book)
            }
            MarketConfig::BinaryYes {
                market_id,
                name,
                yes_runner_id,
                no_runner_id,
                market_kind,
                max_price_ticks,
            } => {
                let book = Book::new_binary_yes_with_kind_and_capacity(
                    market_id,
                    market_kind,
                    yes_runner_id,
                    no_runner_id,
                    max_price_ticks,
                    capacity,
                );
                (market_id, name, market_kind, book)
            }
            MarketConfig::MultiRunner {
                market_id,
                name,
                runners,
                market_kind,
            } => {
                let book = if runners.is_empty() {
                    Book::new_engine_with_kind_and_capacity(market_id, market_kind, capacity)
                } else {
                    Book::new_multi_runner_with_kind_and_capacity(
                        market_id,
                        market_kind,
                        runners,
                        capacity,
                    )
                };
                (market_id, name, market_kind, book)
            }
        };
        Self::named(id, name, kind, book)
    }

    /// Assembles a market around `book`, copying `name` into the book and
    /// starting the sequence cursor at 0.
    fn named(id: MarketId, name: String, market_kind: MarketKind, book: Book) -> Self {
        let mut market = Self {
            id,
            name,
            market_kind,
            book,
            last_market_seq: 0,
        };
        market.book.set_market_name(&market.name);
        market
    }
}
/// Engine-wide state: the configuration plus every known market keyed by id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineState {
/// Engine configuration; read when validating `CreateMarket` and when building books.
pub cfg: Config,
/// Markets currently known to the engine, keyed by market id.
pub markets: HashMap<MarketId, Market>,
}
impl EngineState {
fn assign_market_seqs(market: &Market, events: &mut [BookEventEnvelope]) {
let mut next = market.last_market_seq.saturating_add(1);
for event in events {
event.market_seq = next;
next = next.saturating_add(1);
}
}
pub fn new(cfg: Config) -> Self {
Self {
cfg,
markets: HashMap::new(),
}
}
pub fn handle_command(
&mut self,
cmd: &Command,
) -> Result<Vec<BookEventEnvelope>, RejectReason> {
match &cmd.kind {
CommandKind::CreateMarket {
name,
market_model,
market_kind,
runner_ids,
runner_labels,
} => {
let market_id = cmd.market_id;
if self.markets.contains_key(&market_id) {
return Err(RejectReason::MarketAlreadyExists);
}
if name.trim().is_empty() {
return Err(RejectReason::InvalidMarketConfig);
}
if runner_ids.len() < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
match market_model {
MarketModel::ExchangeOdds => {}
MarketModel::BinaryYes { max_price_ticks } => {
if *max_price_ticks < 2 {
return Err(RejectReason::InvalidMarketConfig);
}
if let Some(enforced) = self.cfg.enforce_binary_yes_max_price_ticks
&& *max_price_ticks != enforced
{
return Err(RejectReason::InvalidMarketConfig);
}
if runner_ids.len() != 2 {
return Err(RejectReason::InvalidMarketConfig);
}
}
}
if runner_labels.len() != runner_ids.len() {
return Err(RejectReason::InvalidMarketConfig);
}
if runner_labels.iter().any(|label| label.trim().is_empty()) {
return Err(RejectReason::InvalidMarketConfig);
}
let distinct_labels: std::collections::HashSet<_> = runner_labels.iter().collect();
if distinct_labels.len() != runner_labels.len() {
return Err(RejectReason::InvalidMarketConfig);
}
info!(
market_id = ?market_id,
market_name = %name,
market_model = ?market_model,
market_kind = ?market_kind,
runner_count = runner_ids.len(),
"creating market"
);
Ok(vec![BookEventEnvelope {
market_id,
market_name: name.clone(),
market_seq: 1,
timestamp: Utc::now(),
metadata: cmd.metadata.clone(),
event: crate::book::BookEvent::MarketCreated {
correlation_id: cmd.correlation_id.clone(),
name: name.clone(),
market_model: *market_model,
market_kind: *market_kind,
runner_ids: runner_ids.clone(),
runner_labels: runner_labels.clone(),
},
}])
}
CommandKind::RemoveMarket => {
let market_id = cmd.market_id;
let Some(market) = self.markets.get(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
if !market.book.market_state().is_terminal() {
return Err(RejectReason::MarketNotTerminal);
}
info!(market_id = ?market_id, "removing terminal market");
Ok(vec![BookEventEnvelope {
market_id,
market_name: market.name.clone(),
market_seq: market.last_market_seq.saturating_add(1),
timestamp: Utc::now(),
metadata: cmd.metadata.clone(),
event: crate::book::BookEvent::MarketRemoved {
reason: "removed via command".to_string(),
},
}])
}
_ => {
let market_id = cmd.market_id;
let Some(market) = self.markets.get_mut(&market_id) else {
return Err(RejectReason::MarketNotFound);
};
market
.book
.handle(cmd)
.map(|(mut events, _)| {
Self::assign_market_seqs(market, &mut events);
events
})
.map_err(|e| e.reason)
}
}
}
}
impl Default for EngineState {
fn default() -> Self {
Self::new(Config::default())
}
}
/// Error type of the `Aggregate` implementation for `EngineState`, wrapping
/// the reason a command was rejected.
///
/// Displays as `command rejected: <reason in Debug form>`.
#[derive(Debug, Error)]
#[error("command rejected: {reason:?}")]
pub struct EngineCommandError {
/// Why the command was rejected.
pub reason: RejectReason,
}
impl EngineCommandError {
    /// Consumes the error and yields the wrapped rejection reason.
    pub fn into_reason(self) -> RejectReason {
        let Self { reason } = self;
        reason
    }
}
impl lucidstream::traits::Aggregate for EngineState {
type Command = Command;
type Event = BookEventEnvelope;
type Error = EngineCommandError;
fn kind() -> &'static str {
"betex"
}
fn handle(&mut self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
let events = self
.handle_command(&command)
.map_err(|reason| EngineCommandError { reason })?;
if events.is_empty() {
return Err(EngineCommandError {
reason: RejectReason::InternalError,
});
}
Ok(events)
}
fn apply(mut self, event: &Self::Event) -> Self {
let market_id = event.market_id;
if let crate::book::BookEvent::MarketCreated {
name,
market_model,
market_kind,
runner_ids,
runner_labels,
..
} = &event.event
{
assert_eq!(
event.market_seq, 1,
"market sequence must start at 1 for {:?}",
market_id
);
let config = match market_model {
MarketModel::ExchangeOdds => {
if runner_ids.len() == 2 {
MarketConfig::two_runner(
market_id,
name.clone(),
runner_ids[0],
runner_ids[1],
*market_kind,
)
} else if runner_ids.is_empty() {
MarketConfig::multi_runner_dynamic(market_id, name.clone(), *market_kind)
} else {
MarketConfig::multi_runner(
market_id,
name.clone(),
runner_ids.clone(),
*market_kind,
)
}
}
MarketModel::BinaryYes { max_price_ticks } => {
if runner_ids.len() != 2 {
panic!("BinaryYes market requires exactly 2 runner_ids");
}
MarketConfig::binary_yes(
market_id,
name.clone(),
runner_ids[0],
runner_ids[1],
*market_kind,
*max_price_ticks,
)
}
};
let mut market = Market::from_config_with_cfg(&self.cfg, config);
market.book.set_runner_labels(runner_ids, runner_labels);
market.last_market_seq = event.market_seq;
self.markets.insert(market_id, market);
return self;
}
if matches!(&event.event, crate::book::BookEvent::MarketRemoved { .. }) {
let Some(market) = self.markets.get(&market_id) else {
panic!(
"event references unknown market_id {:?}; expected MARKET_CREATED first",
market_id
);
};
let expected_market_seq = market.last_market_seq.saturating_add(1);
assert_eq!(
event.market_seq, expected_market_seq,
"market sequence gap for {:?}: expected {}, got {}",
market_id, expected_market_seq, event.market_seq
);
self.markets.remove(&market_id);
return self;
}
let Some(market) = self.markets.get_mut(&market_id) else {
panic!(
"event references unknown market_id {:?}; expected MARKET_CREATED first",
market_id
);
};
let expected_market_seq = market.last_market_seq.saturating_add(1);
assert_eq!(
event.market_seq, expected_market_seq,
"market sequence gap for {:?}: expected {}, got {}",
market_id, expected_market_seq, event.market_seq
);
market.book.apply_event(event);
market.last_market_seq = event.market_seq;
self
}
}
/// Panics with "market name must not be blank" when `name` is empty or
/// consists solely of whitespace.
fn assert_valid_market_name(name: &str) {
    let is_blank = name.chars().all(char::is_whitespace);
    if is_blank {
        panic!("market name must not be blank");
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        book::{
            BookEvent,
            protocol::command::{Persistence, Side, TimeInForce},
        },
        types::unix_epoch,
    };
    use lucidstream::traits::Aggregate;

    /// Builds the persistent GTC back order (odds 20000, stake 100, account 10)
    /// that the order-placing tests submit.
    fn back_order(market_id: MarketId, runner_id: RunnerId) -> Command {
        Command {
            correlation_id: None,
            metadata: None,
            market_id,
            kind: CommandKind::PlaceOrder {
                runner_id,
                account_id: AccountId::from(10u64),
                client_order_id: None,
                side: Side::Back,
                odds: OddsX10000(20000),
                stake: Money(100),
                persistence: Persistence::Persist,
                time_in_force: TimeInForce::Gtc,
            },
        }
    }

    /// Events produced by a book carry the name given in the market config.
    #[test]
    fn market_config_name_propagates_to_order_events() {
        let market_id = MarketId(7);
        let market_name = "Match Odds";
        let mut state = EngineState::new(Config::default());
        let market = Market::from_config_with_cfg(
            &state.cfg,
            MarketConfig::two_runner(
                market_id,
                market_name,
                RunnerId(1),
                RunnerId(2),
                MarketKind::InPlayCapable,
            ),
        );
        state.markets.insert(market_id, market);

        let events = state
            .handle_command(&back_order(market_id, RunnerId(1)))
            .expect("place order should succeed");

        assert_eq!(events[0].market_name, market_name);
    }

    /// A whitespace-only market name is rejected as an invalid config.
    #[test]
    fn create_market_rejects_blank_name() {
        let mut state = EngineState::new(Config::default());
        let create = Command {
            correlation_id: None,
            metadata: None,
            market_id: MarketId(1),
            kind: CommandKind::CreateMarket {
                name: " ".to_string(),
                market_model: MarketModel::ExchangeOdds,
                market_kind: MarketKind::InPlayCapable,
                runner_ids: vec![RunnerId(1), RunnerId(2)],
                runner_labels: vec!["A".to_string(), "B".to_string()],
            },
        };

        let err = state
            .handle_command(&create)
            .expect_err("blank name should be rejected");

        assert_eq!(err, RejectReason::InvalidMarketConfig);
    }

    /// Applying `MarketCreated` over a pre-inserted placeholder replaces its
    /// name and runner labels with the ones from the event.
    #[test]
    fn replayed_market_created_replaces_placeholder_market_metadata() {
        let market_id = MarketId(9);
        let runner_ids = [RunnerId(1), RunnerId(2)];
        let mut state = EngineState::new(Config::default());
        let placeholder = Market::from_config_with_cfg(
            &state.cfg,
            MarketConfig::multi_runner_dynamic(
                market_id,
                "Placeholder",
                MarketKind::InPlayCapable,
            ),
        );
        state.markets.insert(market_id, placeholder);

        let created = BookEventEnvelope {
            market_id,
            market_name: "Authoritative".to_string(),
            market_seq: 1,
            timestamp: unix_epoch(),
            metadata: None,
            event: BookEvent::MarketCreated {
                correlation_id: None,
                name: "Authoritative".to_string(),
                market_model: MarketModel::ExchangeOdds,
                market_kind: MarketKind::InPlayCapable,
                runner_ids: runner_ids.to_vec(),
                runner_labels: vec!["Home".to_string(), "Away".to_string()],
            },
        };
        let state = Aggregate::apply(state, &created);

        let market = state.markets.get(&market_id).expect("market should exist");
        assert_eq!(market.name, "Authoritative");

        let events = state
            .clone()
            .handle_command(&back_order(market_id, runner_ids[0]))
            .expect("place order should succeed after replay");

        assert_eq!(events[0].market_name, "Authoritative");
        match &events[0].event {
            BookEvent::OrderAccepted { runner_label, .. } => assert_eq!(runner_label, "Home"),
            other => panic!("expected OrderAccepted, got {other:?}"),
        }
    }
}