use crate::{
book::protocol::{
command::{Command, CommandKind},
reject::RejectReason,
},
book::{Book, BookEventEnvelope},
config::Config,
types::*,
};
use chrono::Utc;
use hashbrown::HashMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
/// Static description of a market's runner layout, consumed by
/// `Market::from_config` / `Market::from_config_with_cfg` to pick the book
/// constructor.
///
/// With the serde attributes below the value is internally tagged: the
/// variant is stored under the `kind` key and variant names are rendered in
/// camelCase (for example `twoRunner`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "camelCase")]
pub enum MarketConfig {
/// A market with exactly two runners.
TwoRunner {
market_id: MarketId,
runner_a: RunnerId,
runner_b: RunnerId,
market_kind: MarketKind,
},
/// A binary market with one "yes" runner and one "no" runner.
BinaryYes {
market_id: MarketId,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
market_kind: MarketKind,
// Forwarded unchanged to the binary-yes book constructor.
// NOTE(review): presumably the upper bound of the price grid in ticks — confirm.
max_price_ticks: u16,
},
/// A market with an arbitrary list of runners.
///
/// The list may be empty; `Market::from_config` then builds a plain engine
/// book instead of a multi-runner book.
MultiRunner {
market_id: MarketId,
runners: Vec<RunnerId>,
market_kind: MarketKind,
},
}
impl MarketConfig {
pub fn two_runner(
market_id: MarketId,
runner_a: RunnerId,
runner_b: RunnerId,
market_kind: MarketKind,
) -> Self {
Self::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
}
}
pub fn multi_runner(
market_id: MarketId,
runners: impl IntoIterator<Item = RunnerId>,
market_kind: MarketKind,
) -> Self {
Self::MultiRunner {
market_id,
runners: runners.into_iter().collect(),
market_kind,
}
}
pub fn multi_runner_dynamic(market_id: MarketId, market_kind: MarketKind) -> Self {
Self::MultiRunner {
market_id,
runners: Vec::new(),
market_kind,
}
}
pub fn binary_yes(
market_id: MarketId,
yes_runner_id: RunnerId,
no_runner_id: RunnerId,
market_kind: MarketKind,
max_price_ticks: u16,
) -> Self {
Self::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
}
}
pub fn market_id(&self) -> MarketId {
match self {
Self::TwoRunner { market_id, .. } => *market_id,
Self::BinaryYes { market_id, .. } => *market_id,
Self::MultiRunner { market_id, .. } => *market_id,
}
}
pub fn market_kind(&self) -> MarketKind {
match self {
Self::TwoRunner { market_kind, .. }
| Self::BinaryYes { market_kind, .. }
| Self::MultiRunner { market_kind, .. } => *market_kind,
}
}
}
/// A single market: its identity, its order book, and the sequence number of
/// the last event recorded for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Market {
/// Identifier of this market.
pub id: MarketId,
/// Market name. The constructors in this file leave it empty; it is set
/// when a `MarketCreated` event is applied to the engine state.
pub name: String,
/// Kind of market, also passed to the book constructor.
pub market_kind: MarketKind,
/// The order book backing this market.
pub book: Book,
/// Sequence number of the most recent event applied to this market;
/// `0` for a freshly constructed market.
pub last_market_seq: u64,
}
impl Market {
pub fn new(id: MarketId, market_kind: MarketKind) -> Self {
Self {
id,
name: String::new(),
market_kind,
book: Book::new_engine_with_kind(id, market_kind),
last_market_seq: 0,
}
}
pub fn new_with_cfg(id: MarketId, market_kind: MarketKind, cfg: &Config) -> Self {
Self {
id,
name: String::new(),
market_kind,
book: Book::new_engine_with_kind_and_capacity(
id,
market_kind,
cfg.order_store_capacity,
),
last_market_seq: 0,
}
}
pub fn from_config(config: MarketConfig) -> Self {
match config {
MarketConfig::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: Book::new_two_runner_with_kind(market_id, market_kind, runner_a, runner_b),
last_market_seq: 0,
},
MarketConfig::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: Book::new_binary_yes_with_kind(
market_id,
market_kind,
yes_runner_id,
no_runner_id,
max_price_ticks,
),
last_market_seq: 0,
},
MarketConfig::MultiRunner {
market_id,
runners,
market_kind,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: if runners.is_empty() {
Book::new_engine_with_kind(market_id, market_kind)
} else {
Book::new_multi_runner_with_kind(market_id, market_kind, runners)
},
last_market_seq: 0,
},
}
}
pub fn from_config_with_cfg(cfg: &Config, config: MarketConfig) -> Self {
match config {
MarketConfig::TwoRunner {
market_id,
runner_a,
runner_b,
market_kind,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: Book::new_two_runner_with_kind_and_capacity(
market_id,
market_kind,
runner_a,
runner_b,
cfg.order_store_capacity,
),
last_market_seq: 0,
},
MarketConfig::BinaryYes {
market_id,
yes_runner_id,
no_runner_id,
market_kind,
max_price_ticks,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: Book::new_binary_yes_with_kind_and_capacity(
market_id,
market_kind,
yes_runner_id,
no_runner_id,
max_price_ticks,
cfg.order_store_capacity,
),
last_market_seq: 0,
},
MarketConfig::MultiRunner {
market_id,
runners,
market_kind,
} => Self {
id: market_id,
name: String::new(),
market_kind,
book: if runners.is_empty() {
Book::new_engine_with_kind_and_capacity(
market_id,
market_kind,
cfg.order_store_capacity,
)
} else {
Book::new_multi_runner_with_kind_and_capacity(
market_id,
market_kind,
runners,
cfg.order_store_capacity,
)
},
last_market_seq: 0,
},
}
}
}
/// Whole-engine state: the engine configuration plus every market currently
/// known, keyed by market id.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineState {
/// Engine configuration; read when validating `CreateMarket` commands and
/// when sizing the book of a newly created market.
pub cfg: Config,
/// Live markets. Entries are inserted when a `MarketCreated` event is
/// applied and dropped when a `MarketRemoved` event is applied.
pub markets: HashMap<MarketId, Market>,
}
impl EngineState {
    /// Stamps `events` with consecutive market sequence numbers, starting one
    /// past `market.last_market_seq`. Increments saturate at `u64::MAX`.
    fn assign_market_seqs(market: &Market, events: &mut [BookEventEnvelope]) {
        let mut seq = market.last_market_seq;
        for envelope in events.iter_mut() {
            seq = seq.saturating_add(1);
            envelope.market_seq = seq;
        }
    }

    /// Creates an engine state with the given configuration and no markets.
    pub fn new(cfg: Config) -> Self {
        let markets = HashMap::new();
        Self { cfg, markets }
    }

    /// Validates `cmd` against the current state and returns the events it
    /// would produce. The market map itself is not modified here.
    ///
    /// # Errors
    ///
    /// * `MarketAlreadyExists` — `CreateMarket` for an id that is already present.
    /// * `InvalidMarketConfig` — `CreateMarket` with exactly one runner, with a
    ///   label count that is non-zero and differs from the runner count, or a
    ///   `BinaryYes` model with `max_price_ticks < 2`, a tick count different
    ///   from the configured enforced value, or a runner count other than two.
    /// * `MarketNotFound` — any other command for an unknown market id.
    /// * `MarketNotTerminal` — `RemoveMarket` while the book's market state is
    ///   not terminal.
    /// * Any reason reported by the market's book for commands it handles.
    pub fn handle_command(
        &mut self,
        cmd: &Command,
    ) -> Result<Vec<BookEventEnvelope>, RejectReason> {
        let market_id = cmd.market_id;

        match &cmd.kind {
            CommandKind::CreateMarket {
                name,
                market_model,
                market_kind,
                runner_ids,
                runner_labels,
            } => {
                if self.markets.contains_key(&market_id) {
                    return Err(RejectReason::MarketAlreadyExists);
                }

                let runner_count = runner_ids.len();
                let model_ok = match market_model {
                    MarketModel::ExchangeOdds => true,
                    MarketModel::BinaryYes { max_price_ticks } => {
                        // When an enforced tick count is configured, the
                        // command must match it exactly.
                        let matches_enforced =
                            match self.cfg.enforce_binary_yes_max_price_ticks {
                                Some(enforced) => *max_price_ticks == enforced,
                                None => true,
                            };
                        *max_price_ticks >= 2 && matches_enforced && runner_count == 2
                    }
                };
                // Labels are optional, but if given there must be one per runner.
                let labels_ok = runner_labels.is_empty() || runner_labels.len() == runner_count;

                if runner_count == 1 || !model_ok || !labels_ok {
                    return Err(RejectReason::InvalidMarketConfig);
                }

                info!(
                    market_id = ?market_id,
                    market_name = %name,
                    market_model = ?market_model,
                    market_kind = ?market_kind,
                    runner_count = runner_ids.len(),
                    "creating market"
                );

                // The creation event is always the first in a market's sequence.
                let created = BookEventEnvelope {
                    market_id,
                    market_name: name.clone(),
                    market_seq: 1,
                    timestamp: Utc::now(),
                    metadata: cmd.metadata.clone(),
                    event: crate::book::BookEvent::MarketCreated {
                        correlation_id: cmd.correlation_id.clone(),
                        name: name.clone(),
                        market_model: *market_model,
                        market_kind: *market_kind,
                        runner_ids: runner_ids.clone(),
                        runner_labels: runner_labels.clone(),
                    },
                };
                Ok(vec![created])
            }
            CommandKind::RemoveMarket => {
                let market = self
                    .markets
                    .get(&market_id)
                    .ok_or(RejectReason::MarketNotFound)?;
                if !market.book.market_state().is_terminal() {
                    return Err(RejectReason::MarketNotTerminal);
                }

                info!(market_id = ?market_id, "removing terminal market");

                let removed = BookEventEnvelope {
                    market_id,
                    market_name: market.name.clone(),
                    market_seq: market.last_market_seq.saturating_add(1),
                    timestamp: Utc::now(),
                    metadata: cmd.metadata.clone(),
                    event: crate::book::BookEvent::MarketRemoved {
                        reason: "removed via command".to_string(),
                    },
                };
                Ok(vec![removed])
            }
            _ => {
                // Everything else is delegated to the market's own book.
                let market = self
                    .markets
                    .get_mut(&market_id)
                    .ok_or(RejectReason::MarketNotFound)?;
                match market.book.handle(cmd) {
                    Ok((mut events, _)) => {
                        Self::assign_market_seqs(market, &mut events);
                        Ok(events)
                    }
                    Err(rejection) => Err(rejection.reason),
                }
            }
        }
    }
}
impl Default for EngineState {
fn default() -> Self {
Self::new(Config::default())
}
}
/// Error type of the `Aggregate` implementation for `EngineState`: a rejected
/// command together with the reason for the rejection.
///
/// Its `Display` output is `command rejected: ` followed by the `Debug` form
/// of the reason.
#[derive(Debug, Error)]
#[error("command rejected: {reason:?}")]
pub struct EngineCommandError {
/// Why the command was rejected.
pub reason: RejectReason,
}
impl EngineCommandError {
    /// Consumes the error and returns the wrapped [`RejectReason`].
    pub fn into_reason(self) -> RejectReason {
        let Self { reason } = self;
        reason
    }
}
impl lucidstream::traits::Aggregate for EngineState {
    type Command = Command;
    type Event = BookEventEnvelope;
    type Error = EngineCommandError;

    /// Returns the aggregate kind name, `"betex"`.
    fn kind() -> &'static str {
        "betex"
    }

    /// Runs `command` through [`EngineState::handle_command`] and returns the
    /// resulting events.
    ///
    /// # Errors
    ///
    /// Returns the rejection reason wrapped in [`EngineCommandError`]. A command
    /// that is accepted but yields no events is reported as
    /// `RejectReason::InternalError`.
    fn handle(&mut self, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
        match self.handle_command(&command) {
            Ok(events) if events.is_empty() => Err(EngineCommandError {
                reason: RejectReason::InternalError,
            }),
            Ok(events) => Ok(events),
            Err(reason) => Err(EngineCommandError { reason }),
        }
    }

    /// Folds `event` into the state and returns the updated state.
    ///
    /// * For an unknown market the event must be `MarketCreated` with
    ///   `market_seq == 1`; the market is built from the event and inserted.
    /// * For a known market the event must carry the next sequence number.
    ///   `MarketRemoved` drops the market, a repeated `MarketCreated` only
    ///   advances the sequence, and anything else is forwarded to the book.
    ///
    /// # Panics
    ///
    /// Panics if the first event seen for a market is not `MarketCreated`, if a
    /// creation event does not have sequence 1, if a `BinaryYes` creation event
    /// does not list exactly two runners, or if an event for a known market
    /// does not carry the expected next sequence number.
    fn apply(mut self, event: &Self::Event) -> Self {
        let market_id = event.market_id;

        // A single lookup decides between the "create" and "update" paths.
        let Some(market) = self.markets.get_mut(&market_id) else {
            let crate::book::BookEvent::MarketCreated {
                name,
                market_model,
                market_kind,
                runner_ids,
                ..
            } = &event.event
            else {
                panic!(
                    "event references unknown market_id {:?}; expected MARKET_CREATED first",
                    market_id
                )
            };

            assert_eq!(
                event.market_seq, 1,
                "market sequence must start at 1 for {:?}",
                market_id
            );

            let config = match market_model {
                MarketModel::ExchangeOdds => {
                    if runner_ids.len() == 2 {
                        MarketConfig::two_runner(
                            market_id,
                            runner_ids[0],
                            runner_ids[1],
                            *market_kind,
                        )
                    } else if runner_ids.is_empty() {
                        MarketConfig::multi_runner_dynamic(market_id, *market_kind)
                    } else {
                        MarketConfig::multi_runner(market_id, runner_ids.clone(), *market_kind)
                    }
                }
                MarketModel::BinaryYes { max_price_ticks } => {
                    assert!(
                        runner_ids.len() == 2,
                        "BinaryYes market requires exactly 2 runner_ids"
                    );
                    MarketConfig::binary_yes(
                        market_id,
                        runner_ids[0],
                        runner_ids[1],
                        *market_kind,
                        *max_price_ticks,
                    )
                }
            };

            // Finish initialising the market before inserting it, so the map
            // is touched once rather than insert-then-lookup.
            let mut created = Market::from_config_with_cfg(&self.cfg, config);
            created.name = name.clone();
            created.book.set_market_name(name.clone());
            created.last_market_seq = event.market_seq;
            self.markets.insert(market_id, created);
            return self;
        };

        // Every event for a known market must carry the next sequence number.
        let expected_market_seq = market.last_market_seq.saturating_add(1);
        assert_eq!(
            event.market_seq, expected_market_seq,
            "market sequence gap for {:?}: expected {}, got {}",
            market_id, expected_market_seq, event.market_seq
        );

        match &event.event {
            crate::book::BookEvent::MarketRemoved { .. } => {
                self.markets.remove(&market_id);
            }
            // A creation event for a market that already exists changes
            // nothing except the sequence counter.
            crate::book::BookEvent::MarketCreated { .. } => {
                market.last_market_seq = event.market_seq;
            }
            _ => {
                market.book.apply_event(event);
                market.last_market_seq = event.market_seq;
            }
        }
        self
    }
}