use crate::cli::WaitStrategyOpt;
use crate::control_plane::{ControlPlane, EngineCtlMsg, EngineInstance};
use crate::projection::{PhaseCallback, ProjectionHandler, RemovalCallback, StatusCallback};
use crate::state::{
LiveState, MarketMeta, ProjectionDiskSnapshot, ProjectionSnapshot,
apply_runner_metadata_additions, apply_runner_metadata_removals,
};
use crate::ws_protocol::{MarketInfo, MarketStatus, ServerMsg};
use crate::{PROJECTION_PATH, SNAPSHOT_FREQUENCY, WAL_PATH};
use betex::book::protocol::command::{Command, CommandKind, MarketState};
use betex::{EngineBuilder, MarketConfig, engine::consumers::ResponseCallbackHandler, prelude::*};
use disrupt_rs::wait_strategies::BlockingWaitStrategy;
use parking_lot::RwLock;
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
use uuid::Uuid;
/// Resolves the storage locations for an engine as `(wal_path, projection_path)`.
///
/// The default engine uses the crate-wide `WAL_PATH` / `PROJECTION_PATH` constants. Any other
/// engine gets `target/live_engines/<engine_id>.lmdb`, with its projection snapshot stored as
/// `projection.bin` underneath that WAL path.
pub(crate) fn engine_paths(engine_id: Uuid, default_engine_id: Uuid) -> (PathBuf, PathBuf) {
    if engine_id != default_engine_id {
        let wal_dir = PathBuf::from("target/live_engines").join(format!("{engine_id}.lmdb"));
        let snapshot_file = wal_dir.join("projection.bin");
        (wal_dir, snapshot_file)
    } else {
        (PathBuf::from(WAL_PATH), PathBuf::from(PROJECTION_PATH))
    }
}
/// Translates the state a market is created with into the [`MarketStatus`] reported for it.
///
/// Only `MarketState::Closed` maps to `MarketStatus::Closed`; every other state maps to
/// `MarketStatus::Active`.
fn market_status_from_initial_state(state: MarketState) -> MarketStatus {
    // Deliberately an exhaustive match (no `_` arm): adding a `MarketState` variant must force
    // a decision here at compile time.
    match state {
        MarketState::Closed => MarketStatus::Closed,
        MarketState::Deactivated | MarketState::Suspended | MarketState::Open => {
            MarketStatus::Active
        }
    }
}
/// Submits `cmd` to the engine and waits for its response.
///
/// The wait happens on a blocking-pool thread (`spawn_blocking`) and is capped at two seconds.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - the submission itself fails (the error's `Display` text),
/// - the blocking task cannot be joined (`"join error: …"`),
/// - receiving the response fails for any reason (`"engine response timeout"`),
/// - the received result is an error (its `Display` text),
/// - the engine answers `Rejected` (the `Debug` text of the reason) or `Fatal` (`"fatal"`).
async fn submit_control_command(engine: &EngineInstance, cmd: Command) -> Result<(), String> {
    use betex::book::protocol::response::Response as EngineResponse;

    let response_rx = engine
        .handler
        .submit_with_response(cmd)
        .map_err(|e| e.to_string())?;

    // `recv_timeout` blocks the calling thread, so it must not run on the async executor.
    let received =
        tokio::task::spawn_blocking(move || response_rx.recv_timeout(Duration::from_secs(2)))
            .await
            .map_err(|e| format!("join error: {e}"))?;

    // Every receive failure is reported with the same "timeout" message.
    let outcome = received.map_err(|_| "engine response timeout".to_string())?;
    let response = outcome.map_err(|e| e.to_string())?;

    match response {
        EngineResponse::Ok { .. } => Ok(()),
        EngineResponse::Rejected { reason, .. } => Err(format!("{reason:?}")),
        EngineResponse::Fatal { .. } => Err("fatal".into()),
    }
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn build_engine_instance(
engine_id: Uuid,
cfg: betex::config::Config,
market_configs: Vec<MarketConfig>,
wal_path: PathBuf,
projection_path: PathBuf,
status_callback: Option<StatusCallback>,
phase_callback: Option<PhaseCallback>,
removal_callback: Option<RemovalCallback>,
market_id_counter: Arc<std::sync::atomic::AtomicU64>,
wait_strategy: WaitStrategyOpt,
) -> anyhow::Result<Arc<EngineInstance>> {
let wal_preexists = wal_path.exists();
std::fs::create_dir_all(&wal_path)?;
let projection_snapshot = Arc::new(RwLock::new(ProjectionSnapshot::default()));
let live_state = Arc::new(RwLock::new(LiveState::default()));
let market_meta: Arc<RwLock<HashMap<MarketId, MarketMeta>>> =
Arc::new(RwLock::new(HashMap::new()));
let final_disk_snapshot: Arc<parking_lot::Mutex<Option<ProjectionDiskSnapshot>>> =
Arc::new(parking_lot::Mutex::new(None));
let disk_snapshot = ProjectionDiskSnapshot::load_postcard(&projection_path)?;
let market_ids_from_disk = disk_snapshot.as_ref().map(|s| s.market_ids.clone());
let market_ids_from_cfg: Vec<MarketId> = market_configs.iter().map(|c| c.market_id()).collect();
let use_disk_snapshot = disk_snapshot.is_some()
&& (market_ids_from_cfg.is_empty()
|| market_ids_from_disk
.as_ref()
.is_some_and(|ids| *ids == market_ids_from_cfg));
let served_market_ids = if market_ids_from_cfg.is_empty() {
market_ids_from_disk.unwrap_or_default()
} else if use_disk_snapshot {
market_ids_from_disk.unwrap_or_else(|| market_ids_from_cfg.clone())
} else {
market_ids_from_cfg.clone()
};
let market_ids = Arc::new(RwLock::new(served_market_ids.clone()));
let (projection_handler, initial_engine_root) = if use_disk_snapshot {
if let Some(disk_snap) = disk_snapshot {
println!(
"[live][engine={engine_id}] Loaded disk snapshot: seq={} with {} market(s)",
disk_snap.global_seq(),
disk_snap.market_ids.len()
);
let engine_root = disk_snap.root.clone();
let projection_handler = ProjectionHandler::from_disk_snapshot(
disk_snap,
Arc::clone(&market_ids),
Arc::clone(&projection_snapshot),
Arc::clone(&live_state),
Arc::clone(&final_disk_snapshot),
projection_path,
SNAPSHOT_FREQUENCY,
status_callback.clone(),
phase_callback.clone(),
removal_callback.clone(),
Arc::clone(&market_meta),
Arc::clone(&market_id_counter),
);
(projection_handler, Some(engine_root))
} else {
println!("[live][engine={engine_id}] No disk snapshot; starting fresh");
let projection_handler = ProjectionHandler::new(
cfg.clone(),
market_configs.clone(),
Arc::clone(&market_ids),
Arc::clone(&projection_snapshot),
Arc::clone(&live_state),
Arc::clone(&final_disk_snapshot),
projection_path,
SNAPSHOT_FREQUENCY,
status_callback.clone(),
phase_callback.clone(),
removal_callback.clone(),
Arc::clone(&market_meta),
Arc::clone(&market_id_counter),
);
(projection_handler, None)
}
} else {
if disk_snapshot.is_some() {
println!(
"[live][engine={engine_id}] Disk snapshot market set differs from config; ignoring snapshot and replaying WAL"
);
}
println!("[live][engine={engine_id}] No disk snapshot; starting fresh");
let projection_handler = ProjectionHandler::new(
cfg.clone(),
market_configs.clone(),
Arc::clone(&market_ids),
Arc::clone(&projection_snapshot),
Arc::clone(&live_state),
Arc::clone(&final_disk_snapshot),
projection_path,
SNAPSHOT_FREQUENCY,
status_callback,
phase_callback,
removal_callback,
Arc::clone(&market_meta),
Arc::clone(&market_id_counter),
);
(projection_handler, None)
};
let is_recovering = initial_engine_root.is_some() || wal_preexists;
let markets: Vec<MarketConfig> = if served_market_ids == market_ids_from_cfg {
market_configs
} else {
served_market_ids
.iter()
.copied()
.map(|mid| MarketConfig::MultiRunner {
market_id: mid,
name: format!("Market {}", mid.0),
runners: Vec::new(),
market_kind: MarketKind::InPlayCapable,
market_state: betex::book::BookMarketState::Open,
market_phase: MarketPhase::Pre,
})
.collect()
};
let projection_recovery = HandlerRecovery::AfterSeq(
initial_engine_root
.as_ref()
.map(|root| root.version())
.unwrap_or(0),
);
let builder = match initial_engine_root {
Some(root) => EngineBuilder::from_root(root, cfg, &wal_path)?,
None => EngineBuilder::with_markets(cfg, markets, &wal_path)?,
};
let engine = match wait_strategy {
WaitStrategyOpt::BusySpin => builder
.register_handler(
ResponseCallbackHandler::default(),
HandlerRecovery::LiveOnly,
None,
)
.register_handler(projection_handler, projection_recovery, None)
.build()?,
WaitStrategyOpt::BlockingWait => builder
.with_wait_strategy(BlockingWaitStrategy::default())
.register_handler(
ResponseCallbackHandler::default(),
HandlerRecovery::LiveOnly,
None,
)
.register_handler(projection_handler, projection_recovery, None)
.build()?,
};
if is_recovering {
println!("[live][engine={engine_id}] Restored from prior state");
} else {
println!("[live][engine={engine_id}] Fresh start");
}
let handler = engine.handle();
Ok(Arc::new(EngineInstance {
id: engine_id,
engine: parking_lot::Mutex::new(Some(engine)),
handler,
projection_snapshot,
live_state,
market_ids,
market_meta,
}))
}
pub(crate) async fn run_engine_ctl_loop(
control: ControlPlane,
ctl_rx: flume::Receiver<EngineCtlMsg>,
default_engine_id: Uuid,
) {
while let Ok(msg) = ctl_rx.recv_async().await {
match msg {
EngineCtlMsg::CreateMarket {
name,
market_model,
book_type,
market_kind,
market_state,
market_phase,
runner_ids,
runner_labels,
resp,
} => {
let market_id = control.inner.market_allocator.next();
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
match market_model {
MarketModel::ExchangeOdds => match book_type {
Some(BookType::MultiRunner) => {
if runner_ids.len() == 1 {
let _ = resp.send(Err(
"ExchangeOdds with MULTI_RUNNER requires runner_ids to be empty or have length >= 2"
.into(),
));
continue;
}
}
Some(BookType::TwoRunner) => {
if runner_ids.len() != 2 {
let _ = resp.send(Err(
"ExchangeOdds with TWO_RUNNER requires exactly 2 runner_ids"
.into(),
));
continue;
}
}
None => {
let _ = resp.send(Err(
"ExchangeOdds requires book_type TWO_RUNNER or MULTI_RUNNER".into(),
));
continue;
}
},
MarketModel::BinaryYes { .. } => {
if book_type.is_some() {
let _ = resp.send(Err("BinaryYes does not accept book_type".into()));
continue;
}
if runner_ids.len() != 2 {
let _ =
resp.send(Err("BinaryYes requires exactly 2 runner_ids".into()));
continue;
}
}
}
if runner_ids.len() != runner_labels.len() {
let _ = resp.send(Err("runner_ids and runner_labels length mismatch".into()));
continue;
}
let engine_id = default_engine_id;
let Some(engine) = control.get_engine(engine_id) else {
let _ = resp.send(Err("engine not found".into()));
continue;
};
let cmd = Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::CreateMarket {
name: name.clone(),
market_model,
book_type,
market_kind,
market_state,
market_phase,
runner_ids: runner_ids.clone(),
runner_labels: runner_labels.clone(),
},
};
if let Err(e) = submit_control_command(&engine, cmd).await {
let _ = resp.send(Err(e));
continue;
}
{
let mut ids = engine.market_ids.write();
if !ids.contains(&market_id) {
ids.push(market_id);
}
}
let info = MarketInfo {
market_id,
name: name.clone(),
engine_id,
market_model,
market_kind,
market_phase,
runner_ids,
runner_labels,
status: market_status_from_initial_state(market_state),
created_at: now_ms,
};
control
.inner
.market_infos
.write()
.insert(market_id, info.clone());
println!(
"[live][engine={engine_id}] Created market: {} (id={}, runners={})",
name,
market_id.0,
info.runner_ids.len()
);
let _ = resp.send(Ok(info));
}
EngineCtlMsg::AddRunners {
market_id,
runner_ids,
runner_labels,
resp,
} => {
if runner_ids.len() < 2 {
let _ = resp.send(Err("runner_ids must have length >= 2".into()));
continue;
}
if runner_ids.len() != runner_labels.len() {
let _ = resp.send(Err("runner_ids and runner_labels length mismatch".into()));
continue;
}
let Some(existing_info) =
control.inner.market_infos.read().get(&market_id).cloned()
else {
let _ = resp.send(Err("market not found".into()));
continue;
};
let Some(engine) = control.get_engine(existing_info.engine_id) else {
let _ = resp.send(Err("engine not found".into()));
continue;
};
let cmd = Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::AddRunners {
runner_ids: runner_ids.clone(),
runner_labels: runner_labels.clone(),
},
};
if let Err(e) = submit_control_command(&engine, cmd).await {
let _ = resp.send(Err(e));
continue;
}
let mut info = existing_info;
apply_runner_metadata_additions(
&mut info.runner_ids,
&mut info.runner_labels,
&runner_ids,
&runner_labels,
);
control
.inner
.market_infos
.write()
.insert(market_id, info.clone());
println!(
"[live][engine={}] Added runners: market_id={} runners={}",
info.engine_id,
market_id.0,
info.runner_ids.len()
);
let _ = control
.inner
.status_broadcast_tx
.send(ServerMsg::MarketUpdated {
market: info.clone(),
});
let _ = resp.send(Ok(info));
}
EngineCtlMsg::ChangeRunners {
market_id,
add,
remove,
resp,
} => {
let Some(existing_info) =
control.inner.market_infos.read().get(&market_id).cloned()
else {
let _ = resp.send(Err("market not found".into()));
continue;
};
let Some(engine) = control.get_engine(existing_info.engine_id) else {
let _ = resp.send(Err("engine not found".into()));
continue;
};
let cmd = Command {
correlation_id: None,
metadata: None,
market_id,
kind: CommandKind::ChangeRunners {
add: add.clone(),
remove: remove.clone(),
},
};
if let Err(e) = submit_control_command(&engine, cmd).await {
let _ = resp.send(Err(e));
continue;
}
let mut info = existing_info;
let removed_runner_ids: Vec<_> =
remove.iter().map(|change| change.runner_id).collect();
let added_runner_ids: Vec<_> = add.iter().map(|change| change.runner_id).collect();
let added_runner_labels: Vec<_> = add
.iter()
.map(|change| change.runner_label.clone())
.collect();
apply_runner_metadata_removals(
&mut info.runner_ids,
&mut info.runner_labels,
&removed_runner_ids,
);
apply_runner_metadata_additions(
&mut info.runner_ids,
&mut info.runner_labels,
&added_runner_ids,
&added_runner_labels,
);
control
.inner
.market_infos
.write()
.insert(market_id, info.clone());
println!(
"[live][engine={}] Changed runners: market_id={} active_runners={}",
info.engine_id,
market_id.0,
info.runner_ids.len()
);
let _ = control
.inner
.status_broadcast_tx
.send(ServerMsg::MarketUpdated {
market: info.clone(),
});
let _ = resp.send(Ok(info));
}
EngineCtlMsg::ListMarkets { resp } => {
let markets: Vec<MarketInfo> = control
.inner
.market_infos
.read()
.values()
.cloned()
.collect();
let _ = resp.send(markets);
}
}
}
}