// betex 0.35.0
//
// Betfair / Prediction Market Exchange
// Documentation
use crate::{
    engine::state::EngineState,
    snapshot::{
        BinaryMarketSnapshot, ExchangeMarketSnapshot, MarketBookSnapshot, MarketSnapshot,
        RunnerRefSnapshot, RunnerSnapshot, SnapshotPriceSelections,
    },
    types::{MarketId, MarketModel, RunnerId},
};
use serde::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut};
use tracing::trace;
use uuid::Uuid;

/// The engine's aggregate root (event-sourced).
///
/// This is a thin wrapper over `lucidstream::types::AggregateRoot<EngineState>` that adds engine
/// specific helpers like `market_snapshot`.
///
/// The wrapper implements `Deref`/`DerefMut` to the inner aggregate root, so the inner type's
/// methods (for example `state()` and `version()`, both used by the helpers on this type) can be
/// called directly on an `EngineRoot`.
///
/// ## Invariants
///
/// The EngineRoot maintains several critical invariants:
///
/// - **Version Monotonicity**: Version increases by exactly 1 per applied event
/// - **Sequence Alignment**: Version equals the sequence number of the last applied event
/// - **tx_id Uniqueness**: Transaction IDs are never reused across the engine lifetime
///
/// ## Event Sourcing Model
///
/// Commands and events follow a strict separation:
///
/// - **`handle(cmd)`**: Validates command, emits events to pending buffer
/// - **`apply(events)`**: Mutates state based on events (pure state transition)
///
/// This separation enables:
/// 1. Replay: Apply events without re-validation
/// 2. Testing: Verify events without side effects
/// 3. Projection: Multiple projections from the same event stream
///
/// ## Recovery Protocol
///
/// On startup, the engine replays WAL events:
///
/// ```ignore
/// // Recovery loop (simplified)
/// for event in wal.events_after(snapshot.version) {
///     root.assert_next_seq(event.seq);  // Panic on gap
///     root.apply_single(&event);
/// }
/// ```
///
/// ## Snapshot Support
///
/// EngineRoot is `Serialize`/`Deserialize` for snapshotting:
///
/// ```ignore
/// // Save snapshot
/// let json = serde_json::to_string(&root)?;
/// fs::write("snapshot.json", json)?;
///
/// // Restore from snapshot
/// let root: EngineRoot = serde_json::from_str(&fs::read_to_string("snapshot.json")?)?;
/// let builder = EngineBuilder::from_root(root, config, wal_path)?;
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EngineRoot {
    /// The wrapped aggregate root; it is constructed from the engine id, the `EngineState`
    /// and the version, and is the `Deref`/`DerefMut` target of this type.
    inner: lucidstream::types::AggregateRoot<EngineState>,
    /// The transaction id that the next `take_tx_id` call will return. It starts at 0 for
    /// every newly constructed root and is part of the derived serialized form.
    next_tx_id: u64,
}

impl EngineRoot {
    /// Wraps `state` in a new root with a freshly generated v4 UUID at version 0.
    pub fn new(state: EngineState) -> Self {
        Self::new_with(state, 0)
    }

    /// Wraps `state` in a new root with a freshly generated v4 UUID at the given `version`.
    pub fn new_with(state: EngineState, version: u64) -> Self {
        let engine_id = Uuid::new_v4();
        Self::new_with_id_and_version(engine_id, state, version)
    }

    /// Wraps `state` in a new root identified by `engine_id` at version 0.
    pub fn new_with_id(engine_id: Uuid, state: EngineState) -> Self {
        let initial_version = 0;
        Self::new_with_id_and_version(engine_id, state, initial_version)
    }

    /// Wraps `state` in a new root identified by `engine_id` at the given `version`.
    ///
    /// The transaction id counter always starts at 0, independent of `version`; use
    /// [`EngineRoot::observe_tx_id`] to move it past ids that were already used.
    pub fn new_with_id_and_version(engine_id: Uuid, state: EngineState, version: u64) -> Self {
        let inner = lucidstream::types::AggregateRoot::new_with(engine_id, state, version);
        Self {
            inner,
            next_tx_id: 0,
        }
    }

    /// Returns the transaction id that the next [`EngineRoot::take_tx_id`] call would hand out,
    /// without consuming it.
    pub fn next_tx_id(&self) -> u64 {
        self.next_tx_id
    }

    /// Records that `tx_id` has been seen, raising the counter to `tx_id + 1` (saturating) when
    /// it is currently lower. The counter never moves backwards.
    pub fn observe_tx_id(&mut self, tx_id: u64) {
        let floor = tx_id.saturating_add(1);
        if self.next_tx_id < floor {
            self.next_tx_id = floor;
        }
    }

    /// Hands out the current transaction id and advances the counter by one.
    ///
    /// The increment saturates, so once the counter reaches `u64::MAX` it stays there.
    pub fn take_tx_id(&mut self) -> u64 {
        let successor = self.next_tx_id.saturating_add(1);
        std::mem::replace(&mut self.next_tx_id, successor)
    }

    /// Checks that `seq` is the sequence number directly following the current `version()`.
    ///
    /// # Panics
    ///
    /// Panics when `seq` differs from `version() + 1` (computed with saturating addition).
    pub fn assert_next_seq(&self, seq: u64) {
        let next_expected = self.version().saturating_add(1);
        assert_eq!(
            seq, next_expected,
            "sequence gap: expected {}, got {}",
            next_expected, seq
        );
    }

    /// Pure read-only query for a market snapshot at the current `version()`.
    ///
    /// Returns `None` when `market_id` is not present in the engine state. `depth` is forwarded
    /// to the book's `runner_prices` / `binary_depth` queries (presumably the number of price
    /// levels to include; confirm against the book implementation). The snapshot's `metadata`
    /// is always `None`.
    ///
    /// # Panics
    ///
    /// For a `MarketModel::BinaryYes` market, panics if the book does not expose exactly two
    /// runners or if its `binary_depth` query yields no value.
    pub fn market_snapshot(&self, market_id: MarketId, depth: usize) -> Option<MarketSnapshot> {
        trace!(market_id = ?market_id, depth, "building market snapshot");
        let state = self.state();
        let Some(market) = state.markets.get(&market_id) else {
            return None;
        };
        let order_book = &market.book;

        let book = match order_book.market_model() {
            MarketModel::ExchangeOdds => {
                // One snapshot per runner, in the order the book reports them.
                let snapshot_runner = |runner_id: RunnerId| {
                    let ladder = order_book.runner_prices(runner_id, depth);
                    // The selections borrow the levels, so derive them before the levels are
                    // moved into the snapshot below.
                    let selected_to_back =
                        SnapshotPriceSelections::from_levels(&ladder.available_to_back);
                    let selected_to_lay =
                        SnapshotPriceSelections::from_levels(&ladder.available_to_lay);
                    RunnerSnapshot {
                        runner_id,
                        runner_label: order_book.runner_label(runner_id).to_owned(),
                        available_to_back: ladder.available_to_back,
                        available_to_lay: ladder.available_to_lay,
                        matched_volume: order_book.runner_matched_volume(runner_id),
                        selected_to_back,
                        selected_to_lay,
                    }
                };
                let runners = order_book.runners().map(snapshot_runner).collect();
                MarketBookSnapshot::ExchangeOdds(ExchangeMarketSnapshot { runners })
            }
            MarketModel::BinaryYes { .. } => {
                let runner_ids: Vec<RunnerId> = order_book.runners().collect();
                // The first reported runner is treated as "yes", the second as "no".
                let runner_pair: [RunnerId; 2] = runner_ids
                    .try_into()
                    .expect("binary market snapshot requires exactly two runners");
                let [yes_runner_id, no_runner_id] = runner_pair;
                let binary_book = order_book
                    .binary_depth(depth)
                    .expect("binary market snapshot requires binary depth");
                let runner_ref = |runner_id: RunnerId| RunnerRefSnapshot {
                    runner_id,
                    runner_label: order_book.runner_label(runner_id).to_owned(),
                };
                MarketBookSnapshot::BinaryYes(BinaryMarketSnapshot {
                    yes_runner: runner_ref(yes_runner_id),
                    no_runner: runner_ref(no_runner_id),
                    max_price_ticks: binary_book.max_price_ticks,
                    bids: binary_book.bids,
                    asks: binary_book.asks,
                })
            }
        };

        let snapshot = MarketSnapshot {
            market_id,
            name: market.name.clone(),
            market_seq: market.last_market_seq,
            state: order_book.market_state(),
            phase: order_book.market_phase(),
            metadata: None,
            book,
        };
        Some(snapshot)
    }
}

/// Gives shared access to the wrapped aggregate root, so its methods can be called directly
/// on an `EngineRoot`.
impl Deref for EngineRoot {
    type Target = lucidstream::types::AggregateRoot<EngineState>;

    /// Borrows the wrapped aggregate root.
    fn deref(&self) -> &Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}

/// Gives exclusive access to the wrapped aggregate root, so its mutating methods can be called
/// directly on an `EngineRoot`.
impl DerefMut for EngineRoot {
    /// Mutably borrows the wrapped aggregate root.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { inner, .. } = self;
        inner
    }
}