betex 0.27.3

Betfair / Prediction Market Exchange
Documentation
use super::{
    cancel::{CancelledOrdersChunk, push_cancel_chunk},
    events::{BookEvent, BookEventEnvelope, EventVec},
    types::{
        BatchMode, BatchProcessContext, BatchProcessState, BatchProcessTarget, BookMarketState,
        BookOrderInfo, ensure_phase_command_allowed_state,
    },
};
use crate::{
    book::protocol::reject::RejectReason,
    types::{AccountId, MarketKind, MarketPhase, OrderId, RunnerId},
};

/// Assembles a [`BatchProcessTarget::Filtered`] value from its parts.
///
/// Each argument is stored unchanged in the variant field of the same name;
/// nothing is validated or normalised here.
pub(crate) fn filtered_batch_target(
    started_at_ms: i64,
    from_created_at_inclusive_ms: Option<i64>,
    to_created_at_inclusive_ms: Option<i64>,
    account_filter: Option<AccountId>,
    runner_filter: Option<RunnerId>,
) -> BatchProcessTarget {
    BatchProcessTarget::Filtered {
        // Who / what the batch is restricted to.
        account_filter,
        runner_filter,
        // Creation-time bounds, all in milliseconds.
        started_at_ms,
        from_created_at_inclusive_ms,
        to_created_at_inclusive_ms,
    }
}

/// Decides whether `order_info` is selected by a `Filtered` batch target.
///
/// Returns `false` for any target that is not `BatchProcessTarget::Filtered`.
/// For a filtered target the order is selected only when all of these hold:
///
/// * its creation time (in milliseconds) is not later than `started_at_ms`;
/// * its creation time lies inside the optional inclusive `from` / `to` bounds;
/// * its account equals `account_filter`, when one is set;
/// * `runner_matches_filter` is `true`.
///
/// The target's own `runner_filter` field is not inspected here; the caller
/// supplies the outcome of that check through `runner_matches_filter`.
pub(crate) fn matches_filtered_batch_target(
    target: &BatchProcessTarget,
    order_info: &BookOrderInfo,
    runner_matches_filter: bool,
) -> bool {
    let BatchProcessTarget::Filtered {
        started_at_ms,
        from_created_at_inclusive_ms,
        to_created_at_inclusive_ms,
        account_filter,
        ..
    } = target
    else {
        return false;
    };

    let created_at_ms = order_info.created_at.timestamp_millis();

    // Each flag names one reason for the order to be excluded.
    let created_after_start = created_at_ms > *started_at_ms;
    let before_lower_bound =
        (*from_created_at_inclusive_ms).is_some_and(|from_ms| created_at_ms < from_ms);
    let after_upper_bound =
        (*to_created_at_inclusive_ms).is_some_and(|to_ms| created_at_ms > to_ms);
    let other_account = account_filter
        .as_ref()
        .is_some_and(|account_id| &order_info.account_id != account_id);

    let excluded = created_after_start || before_lower_bound || after_upper_bound || other_account;
    !excluded && runner_matches_filter
}

/// Creates the initial [`BatchProcessState`] for a batch aimed at `target`.
///
/// * `AllLiveOrders` builds a close state and needs the live-order count from
///   `BatchProcessContext::Close`.
/// * `LapseOrders` builds a lapse state from `batch_max_events` and `batch_mode`.
/// * `Filtered` builds a cancel state from the filter fields; a missing
///   `detail` is passed on as an empty string.
///
/// `context` is ignored for the lapse and filtered targets.
///
/// # Panics
///
/// Panics when `target` is `AllLiveOrders` but `context` is
/// `BatchProcessContext::None`.
pub(crate) fn build_batch_process_state(
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    context: BatchProcessContext,
) -> BatchProcessState {
    match (target, context) {
        (BatchProcessTarget::AllLiveOrders, BatchProcessContext::Close { total_live_orders }) => {
            BatchProcessState::close(batch_max_events, total_live_orders)
        }
        (BatchProcessTarget::AllLiveOrders, BatchProcessContext::None) => {
            unreachable!("close target requires close context")
        }
        (BatchProcessTarget::LapseOrders, _) => {
            BatchProcessState::lapse(batch_max_events, batch_mode)
        }
        (
            BatchProcessTarget::Filtered {
                started_at_ms,
                from_created_at_inclusive_ms,
                to_created_at_inclusive_ms,
                account_filter,
                runner_filter,
            },
            _,
        ) => {
            let detail_text = detail.map(str::to_owned).unwrap_or_default();
            BatchProcessState::cancel(
                batch_max_events,
                *started_at_ms,
                *from_created_at_inclusive_ms,
                *to_created_at_inclusive_ms,
                account_filter.clone(),
                *runner_filter,
                detail_text,
            )
        }
    }
}

/// Pushes the event that starts a batch, or redirects the one in progress.
///
/// * With no active batch, a `BatchProcessStarted` event is pushed.
/// * With an active batch, a `BatchProcessRetargeted` event is pushed, unless
///   the active batch is a close and the request is not `BatchMode::Close`,
///   or the request has the same mode, target and detail as the active batch.
///
/// Returns `true` when an event was pushed and `false` when the active batch
/// was left as it is.
#[allow(clippy::too_many_arguments)]
fn emit_batch_start_or_retarget(
    active: Option<&BatchProcessState>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
) -> bool {
    let Some(current) = active else {
        // Nothing is running yet: this is a fresh start.
        let started = BookEvent::BatchProcessStarted {
            batch_mode,
            batch_max_events,
            target: target.clone(),
            detail: detail.map(str::to_owned),
        };
        events.push((*emit)(started));
        return true;
    };

    // A running close only yields to another close, and a request identical
    // to the running batch changes nothing.
    let keep_current = (current.is_close() && batch_mode != BatchMode::Close)
        || (current.batch_mode == batch_mode
            && &current.target == target
            && current.detail.as_deref() == detail);
    if keep_current {
        return false;
    }

    let retargeted = BookEvent::BatchProcessRetargeted {
        from_mode: current.batch_mode,
        to_mode: batch_mode,
        batch_max_events,
        target: target.clone(),
        detail: detail.map(str::to_owned),
        abandoned_detail: current.detail.clone(),
    };
    events.push((*emit)(retargeted));
    true
}

/// Selects the next chunk after `cursor_after` and pushes the matching events.
///
/// `select_chunk` is called once with the cursor, `batch_max_events` widened
/// to `usize`, and the target. If the returned chunk is flagged `done` and
/// holds no cancelled orders, a single `BatchProcessCompleted` event is
/// pushed; otherwise the chunk is handed to `push_cancel_chunk`.
#[allow(clippy::too_many_arguments)]
fn plan_batch_chunk_from_cursor(
    cursor_after: Option<OrderId>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
    let chunk_limit = usize::from(batch_max_events);
    let chunk = select_chunk(cursor_after, chunk_limit, target);

    let finished_with_nothing_to_cancel = chunk.done && chunk.cancelled_orders.is_empty();
    if finished_with_nothing_to_cancel {
        let completed = BookEvent::BatchProcessCompleted { batch_mode };
        events.push((*emit)(completed));
    } else {
        push_cancel_chunk(
            events,
            |book_event| (*emit)(book_event),
            chunk,
            batch_mode,
            detail,
            false,
            true,
        );
    }
}

/// Starts a batch (or retargets the active one) and plans its first chunk.
///
/// First pushes the start/retarget event through
/// `emit_batch_start_or_retarget`. If that helper reports that no event was
/// pushed, nothing else happens and `select_chunk` is never called. Otherwise
/// the first chunk is planned from the beginning, i.e. with no cursor.
#[allow(clippy::too_many_arguments)]
pub(crate) fn maybe_start_or_retarget_batch(
    active: Option<&BatchProcessState>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    emit: impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
    let mut emit = emit;

    let announced = emit_batch_start_or_retarget(
        active,
        batch_mode,
        batch_max_events,
        target,
        detail,
        events,
        &mut emit,
    );
    if !announced {
        return;
    }

    plan_batch_chunk_from_cursor(
        None,
        batch_mode,
        batch_max_events,
        target,
        detail,
        events,
        &mut emit,
        select_chunk,
    );
}

/// Plans the next chunk of the batch process that is currently running.
///
/// The chunk is planned from the state's stored cursor, using the mode,
/// chunk size, target and detail recorded in `proc_state`.
///
/// # Errors
///
/// Returns [`RejectReason::MarketNotBatchCancelling`] when `proc_state` is
/// `None`; in that case no event is pushed and `select_chunk` is not called.
pub(crate) fn continue_batch_process(
    proc_state: Option<&BatchProcessState>,
    events: &mut EventVec,
    emit: impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) -> Result<(), RejectReason> {
    let running = proc_state.ok_or(RejectReason::MarketNotBatchCancelling)?;
    let mut emit = emit;

    plan_batch_chunk_from_cursor(
        running.cursor_after,
        running.batch_mode,
        running.batch_max_events,
        &running.target,
        running.detail.as_deref(),
        events,
        &mut emit,
        select_chunk,
    );

    Ok(())
}

/// Applies a batch lifecycle event to the stored batch-process state.
///
/// * `BatchProcessStarted` replaces the state with a freshly built one.
/// * `BatchProcessRetargeted` retargets the existing state in place, or
///   builds a fresh state when none is stored.
/// * `BatchProcessCompleted` clears the state.
///
/// `batch_context_for_mode` is called with the event's (target) mode to get
/// the context needed to build or retarget the state.
///
/// Returns `true` when `event` was one of the three events above, and `false`
/// (leaving `batch_process` untouched) for any other event.
pub(crate) fn apply_batch_process_state_event(
    batch_process: &mut Option<BatchProcessState>,
    event: &BookEvent,
    mut batch_context_for_mode: impl FnMut(BatchMode) -> BatchProcessContext,
) -> bool {
    match event {
        BookEvent::BatchProcessCompleted { .. } => {
            *batch_process = None;
        }
        BookEvent::BatchProcessStarted {
            batch_mode,
            batch_max_events,
            target,
            detail,
        } => {
            let context = batch_context_for_mode(*batch_mode);
            let fresh = build_batch_process_state(
                *batch_mode,
                *batch_max_events,
                target,
                detail.as_deref(),
                context,
            );
            *batch_process = Some(fresh);
        }
        BookEvent::BatchProcessRetargeted {
            to_mode,
            batch_max_events,
            target,
            detail,
            ..
        } => {
            let context = batch_context_for_mode(*to_mode);
            match batch_process.as_mut() {
                Some(existing) => {
                    existing.retarget(
                        *to_mode,
                        *batch_max_events,
                        target.clone(),
                        detail.clone(),
                        context,
                    );
                }
                None => {
                    // No state to retarget: treat the event as a start.
                    let fresh = build_batch_process_state(
                        *to_mode,
                        *batch_max_events,
                        target,
                        detail.as_deref(),
                        context,
                    );
                    *batch_process = Some(fresh);
                }
            }
        }
        _ => return false,
    }
    true
}

/// Applies an `OrderCancelledBatched` event.
///
/// `cancel_order` is called once per cancelled order id, in event order.
/// Afterwards, if a batch process is stored and its mode equals the event's
/// `batch_mode`, the chunk is recorded on it: the cursor's order id (if the
/// event carries a cursor) and the number of cancelled orders.
///
/// Returns `false` without doing anything when `event` is some other event,
/// and `true` otherwise.
pub(crate) fn apply_batch_cancelled_chunk_event(
    batch_process: &mut Option<BatchProcessState>,
    event: &BookEvent,
    mut cancel_order: impl FnMut(OrderId),
) -> bool {
    let BookEvent::OrderCancelledBatched {
        cancelled_orders,
        cursor_after,
        batch_mode,
        ..
    } = event
    else {
        return false;
    };

    cancelled_orders
        .iter()
        .for_each(|cancelled| cancel_order(cancelled.order_id));

    // Orders are cancelled even when the stored batch does not match the
    // event; only the progress bookkeeping is conditional on the mode.
    let matching_state = batch_process
        .as_mut()
        .filter(|state| state.batch_mode == *batch_mode);
    if let Some(state) = matching_state {
        let next_cursor = cursor_after.as_ref().map(|cursor| cursor.order_id);
        let chunk_size = cancelled_orders.len() as u64;
        state.record_chunk(next_cursor, chunk_size);
    }
    true
}

/// Outcome of `plan_await_live_market_with_lapse`: a set of independent flags
/// describing what the planner decided for the await-live command.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct AwaitLiveMarketPlan {
    /// Set by the planner when the market is currently open.
    pub suspend_market: bool,
    /// Set by the planner when the market phase is still `Pre`.
    pub set_pre_await_live: bool,
    /// Set by the planner only when the market is currently open.
    pub start_lapse: bool,
}

/// Plans the await-live command for a market.
///
/// Outcomes, after the kind and state checks pass:
///
/// * phase `Live` — rejected as not supported;
/// * market `Open` — suspend the market and start a lapse; the phase change
///   is requested only when the phase is still `Pre`;
/// * market `Suspended` or `Halted` in phase `Pre` — only the phase change;
/// * market `Suspended` or `Halted` in phase `PreAwaitLive` — `NoChange`;
/// * anything else — rejected as not supported.
///
/// # Errors
///
/// * [`RejectReason::MarketInPlayNotSupported`] when `market_kind` is not
///   `InPlayCapable`, or for the unsupported combinations listed above.
/// * [`RejectReason::NoChange`] when there is nothing left to do.
/// * Whatever `ensure_phase_command_allowed_state` rejects `state` with.
pub(crate) fn plan_await_live_market_with_lapse(
    state: BookMarketState,
    market_phase: MarketPhase,
    market_kind: MarketKind,
) -> Result<AwaitLiveMarketPlan, RejectReason> {
    if market_kind != MarketKind::InPlayCapable {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    ensure_phase_command_allowed_state(state)?;

    let is_open = matches!(state, BookMarketState::Open);
    let is_paused = matches!(
        state,
        BookMarketState::Suspended | BookMarketState::Halted
    );

    match market_phase {
        MarketPhase::Live => Err(RejectReason::MarketInPlayNotSupported),
        phase if is_open => Ok(AwaitLiveMarketPlan {
            suspend_market: true,
            set_pre_await_live: phase == MarketPhase::Pre,
            start_lapse: true,
        }),
        MarketPhase::PreAwaitLive if is_paused => Err(RejectReason::NoChange),
        MarketPhase::Pre if is_paused => Ok(AwaitLiveMarketPlan {
            suspend_market: false,
            set_pre_await_live: true,
            start_lapse: false,
        }),
        _ => Err(RejectReason::MarketInPlayNotSupported),
    }
}

/// Outcome of `plan_go_live_market_with_lapse`: a set of independent flags
/// describing what the planner decided for the go-live command.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct GoLiveMarketPlan {
    /// Set by the planner when the market is currently suspended.
    pub reopen_market: bool,
    /// Set by the planner when the market phase is not `Live` yet.
    pub set_live_phase: bool,
    /// Set by the planner together with `set_live_phase`.
    pub start_in_play_lapse: bool,
}

/// Plans the go-live command for a market.
///
/// Outcomes, after the kind and state checks pass, for a market that is
/// `Open`, `Suspended` or `Halted`:
///
/// * already `Live` and `Suspended` — only reopen the market;
/// * already `Live` and `Open` or `Halted` — `NoChange`;
/// * any other phase — switch to `Live` and start the in-play lapse, provided
///   `market_kind` allows the transition from the current phase; the market
///   is reopened as well when it is `Suspended`.
///
/// # Errors
///
/// * [`RejectReason::MarketInPlayNotSupported`] when `market_kind` is
///   `PreEventOnly`, when `state` is none of the three states above, or when
///   `market_kind` does not allow the transition to `Live`.
/// * [`RejectReason::NoChange`] when there is nothing left to do.
/// * Whatever `ensure_phase_command_allowed_state` rejects `state` with.
pub(crate) fn plan_go_live_market_with_lapse(
    state: BookMarketState,
    market_phase: MarketPhase,
    market_kind: MarketKind,
) -> Result<GoLiveMarketPlan, RejectReason> {
    if market_kind == MarketKind::PreEventOnly {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    ensure_phase_command_allowed_state(state)?;

    let plannable_state = matches!(
        state,
        BookMarketState::Open | BookMarketState::Suspended | BookMarketState::Halted
    );
    if !plannable_state {
        return Err(RejectReason::MarketInPlayNotSupported);
    }

    let is_suspended = state == BookMarketState::Suspended;

    if market_phase == MarketPhase::Live {
        // Already live: the only useful action left is lifting a suspension.
        return if is_suspended {
            Ok(GoLiveMarketPlan {
                reopen_market: true,
                set_live_phase: false,
                start_in_play_lapse: false,
            })
        } else {
            Err(RejectReason::NoChange)
        };
    }

    if market_kind.can_transition_to_phase(market_phase, MarketPhase::Live) {
        Ok(GoLiveMarketPlan {
            reopen_market: is_suspended,
            set_live_phase: true,
            start_in_play_lapse: true,
        })
    } else {
        Err(RejectReason::MarketInPlayNotSupported)
    }
}