betex 0.13.0

Betfair / Prediction Market Exchange
Documentation
//! Shared helpers for the market close process.
//!
//! ## Close Process Overview
//!
//! When a market closes, all resting orders must be cancelled. To avoid
//! emitting unbounded events in a single transaction, the close process
//! uses **batched cancellation**:
//!
//! 1. `CloseMarket` command starts the close, cancelling up to `batch_max_events` orders
//! 2. Market transitions to `Closed` and starts a batched cancel process with [`CloseProcessState`]
//! 3. `ContinueCloseMarket` commands process additional batches
//! 4. The final batch emits a completion marker (its `OrderCancelled` event has `is_final` set)
//!
//! ## Cursor Semantics
//!
//! The `cursor_after` field in [`CloseProcessState`] and [`BookEvent::OrderCancelled`]
//! tracks pagination through the order set:
//!
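//! - `None` on the first batch (start from the beginning)
//! - `Some(order_id)` indicates the last processed order
//! - Subsequent batches resume from orders greater than the cursor
//! - The final batch's `OrderCancelled` event carries `None` again: once the batch
//!   reports completion, the cursor is cleared rather than forwarded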
//!
//! This cursor-based pagination ensures **idempotent recovery**: replaying
//! events that have already been applied is safe because the cursor prevents
//! re-processing orders.
//!
//! ## Recovery Guarantees
//!
//! The close process is designed for crash recovery:
//!
//! - **No double cancellation**: Orders before the cursor are skipped on replay
//! - **Deterministic ordering**: Orders are processed in BTreeMap order (by OrderId)
//! - **Progress tracking**: `chunks_done` and `cancelled_total` allow monitoring
//!
//! ## Event Budget
//!
//! The `batch_max_events` parameter controls how many order cancellations can
//! occur per batch. The minimum is [`MIN_CLOSE_BATCH_EVENTS`] (2), which accounts
//! for the two events of the start batch: the `MarketStateChanged` event and the
//! single batched `OrderCancelled` event. Typical values are 1000-4096.

use crate::book::common::events::{BookEvent, BookEventEnvelope};
use crate::book::common::types::{BookMarketState, BookOrder, CloseProcessState};
use crate::types::{AccountId, OrderId};

// NOTE(review): nothing in the functions of this module enforces the minimum or applies
// the default; presumably command validation elsewhere does — confirm against callers.
/// Minimum value of `batch_max_events` for the close process.
///
/// Accounts for the two events of the start batch: `MarketStateChanged` plus the
/// single `OrderCancelled` event that carries the whole batch of cancellations.
pub const MIN_CLOSE_BATCH_EVENTS: u16 = 2;
/// Default value of `batch_max_events` for the close process.
pub const DEFAULT_CLOSE_BATCH_EVENTS: u16 = 4096;

/// Builds the market-state-change reason string that marks the start of a close.
///
/// The batch size is embedded after the `CLOSE_START:` prefix so that event application
/// can initialise the close process from the event alone, deterministically, without any
/// command-time mutation.
pub fn close_start_reason(batch_max_events: u16) -> String {
    let mut reason = String::from("CLOSE_START:");
    reason.push_str(&batch_max_events.to_string());
    reason
}

/// Extracts the batch size from a `CLOSE_START:<n>` state-change reason.
///
/// Returns `None` when `reason` does not start with the `CLOSE_START:` prefix, or when the
/// remainder does not parse as a `u16` (empty, non-numeric, or out of range).
pub fn parse_close_start_batch_max_events(reason: &str) -> Option<u16> {
    let encoded_size = reason.strip_prefix("CLOSE_START:")?;
    encoded_size.parse().ok()
}

/// Reports whether an order is still live on the book.
///
/// An order counts as live when its state is `ExecutableUnmatched` or
/// `ExecutablePartiallyMatched`; every other state is treated as not live.
#[inline]
pub fn is_live_order(o: &BookOrder) -> bool {
    use crate::book::common::types::BookOrderState;

    matches!(
        o.info.state,
        BookOrderState::ExecutableUnmatched | BookOrderState::ExecutablePartiallyMatched
    )
}

/// Counts the orders in `orders` for which [`is_live_order`] returns `true`.
///
/// The `OrderId` half of each pair is ignored; only the order itself is inspected.
pub fn count_live_orders<'a>(orders: impl Iterator<Item = (OrderId, &'a BookOrder)>) -> u64 {
    let mut live: usize = 0;
    for (_, order) in orders {
        if is_live_order(order) {
            live += 1;
        }
    }
    live as u64
}

/// Runs the first batch of the market close process and returns the events it produced.
///
/// Steps, in order:
/// 1. `cancel_chunk(None, batch_max_events)` cancels the first chunk, starting from the
///    beginning of the order set (no cursor yet).
/// 2. `state_change` is called with [`BookMarketState::Closed`] and the reason built by
///    [`close_start_reason`]; it is handed the event list and is expected to append the
///    state-change event to it.
/// 3. One [`BookEvent::OrderCancelled`] event (cause `CloseCancel`) describing the chunk is
///    appended via `emit`.
///
/// `cancel_chunk` returns
/// `(order_ids, account_ids, cursor_after, cursor_after_account_id, done, _)`; the trailing
/// `u64` is not used here. When `done` is `true` the event is marked `is_final` and both
/// cursor fields are cleared to `None` instead of being forwarded.
pub fn close_start_batch(
    batch_max_events: u16,
    mut state_change: impl FnMut(BookMarketState, &str, &mut Vec<BookEventEnvelope>),
    emit: impl Fn(BookEvent) -> BookEventEnvelope,
    mut cancel_chunk: impl FnMut(
        Option<OrderId>,
        usize,
    ) -> (
        Vec<OrderId>,
        Vec<AccountId>,
        Option<OrderId>,
        Option<AccountId>,
        bool,
        u64,
    ),
) -> Vec<BookEventEnvelope> {
    // The chunk is cancelled before the state transition is emitted, so whether the close
    // finished in this very batch is already known when the events are built. The start
    // batch is expected to yield two events: MarketStateChanged(to=Closed) + OrderCancelled.
    let (order_ids, account_ids, next_cursor, next_cursor_account, finished, _) =
        cancel_chunk(None, usize::from(batch_max_events));

    // A finished close has nothing left to resume from, so no cursor is published.
    let (cursor_after, cursor_after_account_id) = if finished {
        (None, None)
    } else {
        (next_cursor, next_cursor_account)
    };

    let mut out = Vec::new();

    let start_reason = close_start_reason(batch_max_events);
    state_change(BookMarketState::Closed, start_reason.as_str(), &mut out);

    let cancelled = BookEvent::OrderCancelled {
        cursor_after,
        cursor_after_account_id,
        order_ids,
        account_ids,
        is_final: finished,
        cancel_cause: crate::book::common::events::CancelCause::CloseCancel,
        cause_detail: None,
    };
    out.push(emit(cancelled));

    out
}

/// Runs one follow-up batch of the market close process and returns its single event.
///
/// Resumes from `proc_state.cursor_after` with a budget of `proc_state.batch_max_events`
/// cancellations, then wraps the result in one [`BookEvent::OrderCancelled`] event (cause
/// `CloseCancel`) via `emit`.
///
/// `cancel_chunk` returns
/// `(order_ids, account_ids, cursor_after, cursor_after_account_id, done, _)`; the trailing
/// `u64` is not used here. When `done` is `true` the event is marked `is_final` and both
/// cursor fields are cleared to `None` instead of being forwarded.
///
/// `state_change` is accepted but never called: a continuation batch emits no state
/// transition.
pub fn close_continue_batch(
    proc_state: CloseProcessState,
    state_change: impl FnMut(BookMarketState, &str, &mut Vec<BookEventEnvelope>),
    emit: impl Fn(BookEvent) -> BookEventEnvelope,
    mut cancel_chunk: impl FnMut(
        Option<OrderId>,
        usize,
    ) -> (
        Vec<OrderId>,
        Vec<AccountId>,
        Option<OrderId>,
        Option<AccountId>,
        bool,
        u64,
    ),
) -> Vec<BookEventEnvelope> {
    // Intentionally unused; this only silences the unused-parameter warning.
    let _ = state_change;

    // No state-change event is emitted here, so the whole budget goes to cancellations.
    let budget = proc_state.batch_max_events as usize;
    let (order_ids, account_ids, next_cursor, next_cursor_account, finished, _) =
        cancel_chunk(proc_state.cursor_after, budget);

    // A finished close has nothing left to resume from, so no cursor is published.
    let (cursor_after, cursor_after_account_id) = if finished {
        (None, None)
    } else {
        (next_cursor, next_cursor_account)
    };

    let cancelled = BookEvent::OrderCancelled {
        cursor_after,
        cursor_after_account_id,
        order_ids,
        account_ids,
        is_final: finished,
        cancel_cause: crate::book::common::events::CancelCause::CloseCancel,
        cause_detail: None,
    };

    let mut out = Vec::new();
    out.push(emit(cancelled));
    out
}