use super::{
cancel::{CancelledOrdersChunk, push_cancel_chunk},
events::{BookEvent, BookEventEnvelope, EventVec},
types::{
BatchMode, BatchProcessContext, BatchProcessState, BatchProcessTarget, BookMarketState,
BookOrderInfo, ensure_phase_command_allowed_state,
},
};
use crate::{
book::protocol::reject::RejectReason,
types::{AccountId, MarketKind, MarketPhase, OrderId, RunnerId},
};
pub(crate) fn filtered_batch_target(
started_at_ms: i64,
from_created_at_inclusive_ms: Option<i64>,
to_created_at_inclusive_ms: Option<i64>,
account_filter: Option<AccountId>,
runner_filter: Option<RunnerId>,
) -> BatchProcessTarget {
BatchProcessTarget::Filtered {
started_at_ms,
from_created_at_inclusive_ms,
to_created_at_inclusive_ms,
account_filter,
runner_filter,
}
}
pub(crate) fn matches_filtered_batch_target(
target: &BatchProcessTarget,
order_info: &BookOrderInfo,
runner_matches_filter: bool,
) -> bool {
let BatchProcessTarget::Filtered {
started_at_ms,
from_created_at_inclusive_ms,
to_created_at_inclusive_ms,
account_filter,
runner_filter: _,
} = target
else {
return false;
};
let created_at_ms = order_info.created_at.timestamp_millis();
if created_at_ms > *started_at_ms {
return false;
}
if let Some(from_ms) = *from_created_at_inclusive_ms
&& created_at_ms < from_ms
{
return false;
}
if let Some(to_ms) = *to_created_at_inclusive_ms
&& created_at_ms > to_ms
{
return false;
}
if let Some(account_id) = account_filter.as_ref()
&& &order_info.account_id != account_id
{
return false;
}
if !runner_matches_filter {
return false;
}
true
}
pub(crate) fn build_batch_process_state(
batch_mode: BatchMode,
batch_max_events: u16,
target: &BatchProcessTarget,
detail: Option<&str>,
context: BatchProcessContext,
) -> BatchProcessState {
match target {
BatchProcessTarget::AllLiveOrders => match context {
BatchProcessContext::Close { total_live_orders } => {
BatchProcessState::close(batch_max_events, total_live_orders)
}
BatchProcessContext::None => unreachable!("close target requires close context"),
},
BatchProcessTarget::LapseOrders => BatchProcessState::lapse(batch_max_events, batch_mode),
BatchProcessTarget::Filtered {
started_at_ms,
from_created_at_inclusive_ms,
to_created_at_inclusive_ms,
account_filter,
runner_filter,
} => BatchProcessState::cancel(
batch_max_events,
*started_at_ms,
*from_created_at_inclusive_ms,
*to_created_at_inclusive_ms,
account_filter.clone(),
*runner_filter,
detail.unwrap_or_default().to_owned(),
),
}
}
#[allow(clippy::too_many_arguments)]
fn emit_batch_start_or_retarget(
active: Option<&BatchProcessState>,
batch_mode: BatchMode,
batch_max_events: u16,
target: &BatchProcessTarget,
detail: Option<&str>,
events: &mut EventVec,
emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
) -> bool {
if let Some(active) = active {
if active.is_close() && batch_mode != BatchMode::Close {
return false;
}
if active.batch_mode == batch_mode
&& &active.target == target
&& active.detail.as_deref() == detail
{
return false;
}
events.push((*emit)(BookEvent::BatchProcessRetargeted {
from_mode: active.batch_mode,
to_mode: batch_mode,
batch_max_events,
target: target.clone(),
detail: detail.map(str::to_owned),
abandoned_detail: active.detail.clone(),
}));
} else {
events.push((*emit)(BookEvent::BatchProcessStarted {
batch_mode,
batch_max_events,
target: target.clone(),
detail: detail.map(str::to_owned),
}));
}
true
}
#[allow(clippy::too_many_arguments)]
fn plan_batch_chunk_from_cursor(
cursor_after: Option<OrderId>,
batch_mode: BatchMode,
batch_max_events: u16,
target: &BatchProcessTarget,
detail: Option<&str>,
events: &mut EventVec,
emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
let chunk = select_chunk(cursor_after, batch_max_events as usize, target);
if chunk.done && chunk.cancelled_orders.is_empty() {
events.push((*emit)(BookEvent::BatchProcessCompleted { batch_mode }));
return;
}
push_cancel_chunk(
events,
|event| (*emit)(event),
chunk,
batch_mode,
detail,
false,
true,
);
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn maybe_start_or_retarget_batch(
active: Option<&BatchProcessState>,
batch_mode: BatchMode,
batch_max_events: u16,
target: &BatchProcessTarget,
detail: Option<&str>,
events: &mut EventVec,
emit: impl FnMut(BookEvent) -> BookEventEnvelope,
select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
let mut emit = emit;
if emit_batch_start_or_retarget(
active,
batch_mode,
batch_max_events,
target,
detail,
events,
&mut emit,
) {
plan_batch_chunk_from_cursor(
None,
batch_mode,
batch_max_events,
target,
detail,
events,
&mut emit,
select_chunk,
);
}
}
pub(crate) fn continue_batch_process(
proc_state: Option<&BatchProcessState>,
events: &mut EventVec,
emit: impl FnMut(BookEvent) -> BookEventEnvelope,
select_chunk: impl FnOnce(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) -> Result<(), RejectReason> {
let Some(proc_state) = proc_state else {
return Err(RejectReason::MarketNotBatchCancelling);
};
let mut emit = emit;
plan_batch_chunk_from_cursor(
proc_state.cursor_after,
proc_state.batch_mode,
proc_state.batch_max_events,
&proc_state.target,
proc_state.detail.as_deref(),
events,
&mut emit,
select_chunk,
);
Ok(())
}
pub(crate) fn apply_batch_process_state_event(
batch_process: &mut Option<BatchProcessState>,
event: &BookEvent,
mut batch_context_for_mode: impl FnMut(BatchMode) -> BatchProcessContext,
) -> bool {
match event {
BookEvent::BatchProcessStarted {
batch_mode,
batch_max_events,
target,
detail,
} => {
let context = batch_context_for_mode(*batch_mode);
*batch_process = Some(build_batch_process_state(
*batch_mode,
*batch_max_events,
target,
detail.as_deref(),
context,
));
true
}
BookEvent::BatchProcessRetargeted {
to_mode,
batch_max_events,
target,
detail,
..
} => {
let context = batch_context_for_mode(*to_mode);
if let Some(state) = batch_process.as_mut() {
state.retarget(
*to_mode,
*batch_max_events,
target.clone(),
detail.clone(),
context,
);
} else {
*batch_process = Some(build_batch_process_state(
*to_mode,
*batch_max_events,
target,
detail.as_deref(),
context,
));
}
true
}
BookEvent::BatchProcessCompleted { .. } => {
*batch_process = None;
true
}
_ => false,
}
}
pub(crate) fn apply_batch_cancelled_chunk_event(
batch_process: &mut Option<BatchProcessState>,
event: &BookEvent,
mut cancel_order: impl FnMut(OrderId),
) -> bool {
let BookEvent::OrderCancelledBatched {
cancelled_orders,
cursor_after,
batch_mode,
..
} = event
else {
return false;
};
let cursor_after_order_id = cursor_after.as_ref().map(|cursor| cursor.order_id);
let cancelled_count = cancelled_orders.len() as u64;
for cancelled_order in cancelled_orders {
cancel_order(cancelled_order.order_id);
}
if let Some(state) = batch_process.as_mut()
&& state.batch_mode == *batch_mode
{
state.record_chunk(cursor_after_order_id, cancelled_count);
}
true
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct AwaitLiveMarketPlan {
pub suspend_market: bool,
pub set_pre_await_live: bool,
pub start_lapse: bool,
}
pub(crate) fn plan_await_live_market_with_lapse(
state: BookMarketState,
market_phase: MarketPhase,
market_kind: MarketKind,
) -> Result<AwaitLiveMarketPlan, RejectReason> {
if market_kind != MarketKind::InPlayCapable {
return Err(RejectReason::MarketInPlayNotSupported);
}
ensure_phase_command_allowed_state(state)?;
match (state, market_phase) {
(_, MarketPhase::Live) => Err(RejectReason::MarketInPlayNotSupported),
(BookMarketState::Suspended, MarketPhase::PreAwaitLive)
| (BookMarketState::Halted, MarketPhase::PreAwaitLive)
| (BookMarketState::Deactivated, MarketPhase::PreAwaitLive) => Err(RejectReason::NoChange),
(BookMarketState::Open, phase) => Ok(AwaitLiveMarketPlan {
suspend_market: true,
set_pre_await_live: phase == MarketPhase::Pre,
start_lapse: true,
}),
(BookMarketState::Suspended, MarketPhase::Pre)
| (BookMarketState::Halted, MarketPhase::Pre)
| (BookMarketState::Deactivated, MarketPhase::Pre) => Ok(AwaitLiveMarketPlan {
suspend_market: false,
set_pre_await_live: true,
start_lapse: false,
}),
_ => Err(RejectReason::MarketInPlayNotSupported),
}
}
pub(crate) fn plan_go_live_market_with_lapse(
state: BookMarketState,
market_phase: MarketPhase,
market_kind: MarketKind,
) -> Result<(), RejectReason> {
if market_kind == MarketKind::PreEventOnly {
return Err(RejectReason::MarketInPlayNotSupported);
}
ensure_phase_command_allowed_state(state)?;
match (state, market_phase) {
(BookMarketState::Open, MarketPhase::Live)
| (BookMarketState::Suspended, MarketPhase::Live)
| (BookMarketState::Halted, MarketPhase::Live)
| (BookMarketState::Deactivated, MarketPhase::Live) => Err(RejectReason::NoChange),
(BookMarketState::Open, phase)
| (BookMarketState::Suspended, phase)
| (BookMarketState::Halted, phase)
| (BookMarketState::Deactivated, phase) => {
if !market_kind.can_transition_to_phase(phase, MarketPhase::Live) {
return Err(RejectReason::MarketInPlayNotSupported);
}
Ok(())
}
_ => Err(RejectReason::MarketInPlayNotSupported),
}
}