use super::{
cancel::{CancelledOrdersChunk, push_cancel_chunk},
events::{BookEvent, BookEventEnvelope, EventVec},
types::{
BatchMode, BatchProcessContext, BatchProcessDescriptor, BatchProcessState,
BatchProcessTarget, BookMarketState, BookOrderInfo, ensure_phase_command_allowed_state,
},
};
use crate::{
book::protocol::reject::RejectReason,
types::{AccountId, MarketKind, MarketPhase, OrderId, RunnerId},
};
use std::collections::HashSet;
/// Assembles a [`BatchProcessTarget::Filtered`] from its individual criteria.
///
/// Each argument is moved unchanged into the variant field of the same name;
/// nothing is validated or normalised here.
pub(crate) fn filtered_batch_target(
    started_at_ms: i64,
    from_created_at_inclusive_ms: Option<i64>,
    to_created_at_inclusive_ms: Option<i64>,
    account_filter: Option<AccountId>,
    runner_filter: Option<RunnerId>,
) -> BatchProcessTarget {
    BatchProcessTarget::Filtered {
        // Optional account / runner narrowing.
        account_filter,
        runner_filter,
        // Optional inclusive creation-time bounds (milliseconds).
        from_created_at_inclusive_ms,
        to_created_at_inclusive_ms,
        // Timestamp (milliseconds) recorded for the batch itself.
        started_at_ms,
    }
}
/// Decides whether an order falls inside a [`BatchProcessTarget::Filtered`] selection.
///
/// Returns `false` for every target that is not `Filtered`. For a `Filtered`
/// target the order matches only when all of the following hold:
///
/// * its creation time (in milliseconds) is not later than `started_at_ms`;
/// * its creation time is within the optional inclusive lower/upper bounds;
/// * its account equals `account_filter`, when one is set;
/// * `runner_matches_filter` is `true`.
///
/// The target's own `runner_filter` field is not inspected here; the runner
/// decision is taken solely from `runner_matches_filter`.
pub(crate) fn matches_filtered_batch_target(
    target: &BatchProcessTarget,
    order_info: &BookOrderInfo,
    runner_matches_filter: bool,
) -> bool {
    let BatchProcessTarget::Filtered {
        started_at_ms,
        from_created_at_inclusive_ms: lower_bound_ms,
        to_created_at_inclusive_ms: upper_bound_ms,
        account_filter,
        runner_filter: _,
    } = target
    else {
        return false;
    };

    let created_at_ms = order_info.created_at.timestamp_millis();

    // Each flag names one reason for the order to be excluded.
    let created_after_start = created_at_ms > *started_at_ms;
    let before_lower_bound = lower_bound_ms.is_some_and(|from_ms| created_at_ms < from_ms);
    let after_upper_bound = upper_bound_ms.is_some_and(|to_ms| created_at_ms > to_ms);
    let wrong_account = account_filter
        .as_ref()
        .is_some_and(|wanted| &order_info.account_id != wanted);

    let excluded =
        created_after_start || before_lower_bound || after_upper_bound || wrong_account;
    !excluded && runner_matches_filter
}
/// Creates the initial [`BatchProcessState`] for a batch over `target`.
///
/// `target.assert_valid_for_mode(batch_mode)` is invoked first. The constructor
/// used afterwards depends on the target variant:
///
/// * `Filtered` -> `BatchProcessState::cancel`, with a missing `detail`
///   replaced by an empty string;
/// * `RunnerRemoval` -> `BatchProcessState::runner_removal`, with a clone of
///   the runner ids;
/// * `LapseOrders` -> `BatchProcessState::lapse`;
/// * `AllLiveOrders` -> `BatchProcessState::close`, taking
///   `total_live_orders` from `context`.
///
/// `context` is only consulted for `AllLiveOrders`, and `detail` only for
/// `Filtered`.
///
/// # Panics
///
/// Panics when `target` is `AllLiveOrders` but `context` is
/// `BatchProcessContext::None`.
pub(crate) fn build_batch_process_state(
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    context: BatchProcessContext,
) -> BatchProcessState {
    target.assert_valid_for_mode(batch_mode);

    match target {
        BatchProcessTarget::Filtered {
            started_at_ms,
            from_created_at_inclusive_ms: lower_bound_ms,
            to_created_at_inclusive_ms: upper_bound_ms,
            account_filter,
            runner_filter,
        } => {
            let owned_detail = detail.map(str::to_owned).unwrap_or_default();
            BatchProcessState::cancel(
                batch_max_events,
                *started_at_ms,
                *lower_bound_ms,
                *upper_bound_ms,
                account_filter.clone(),
                *runner_filter,
                owned_detail,
            )
        }
        BatchProcessTarget::RunnerRemoval { runner_ids } => {
            let removed_runner_ids = runner_ids.clone();
            BatchProcessState::runner_removal(batch_max_events, removed_runner_ids)
        }
        BatchProcessTarget::LapseOrders => BatchProcessState::lapse(batch_max_events, batch_mode),
        BatchProcessTarget::AllLiveOrders => match context {
            // A close batch cannot be built without the live-order total.
            BatchProcessContext::None => unreachable!("close target requires close context"),
            BatchProcessContext::Close { total_live_orders } => {
                BatchProcessState::close(batch_max_events, total_live_orders)
            }
        },
    }
}
/// Packs a batch request into an owned [`BatchProcessDescriptor`].
///
/// `target` is cloned and `detail`, when present, is copied into a `String`,
/// so the descriptor does not borrow from the arguments.
fn batch_descriptor(
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
) -> BatchProcessDescriptor {
    let owned_target = target.clone();
    let owned_detail = detail.map(String::from);
    BatchProcessDescriptor::new(batch_mode, batch_max_events, owned_target, owned_detail)
}
/// Announces a batch request by pushing either a start or a retarget event.
///
/// After `target.assert_valid_for_mode(batch_mode)`:
///
/// * with no active batch, a `BookEvent::BatchProcessStarted` is pushed;
/// * with an active batch, a `BookEvent::BatchProcessRetargeted` is pushed,
///   carrying the active batch's mode as `from_mode` and its detail as
///   `abandoned_detail`, unless the request is dropped.
///
/// A request is dropped (nothing pushed) when the active batch reports
/// `is_close()` and the requested mode is not `BatchMode::Close`, or when the
/// active batch already has the same mode, target and detail.
///
/// Returns `true` when an event was pushed and `false` when the request was
/// dropped.
#[allow(clippy::too_many_arguments)]
fn emit_batch_start_or_retarget(
    active: Option<&BatchProcessState>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
) -> bool {
    target.assert_valid_for_mode(batch_mode);

    let announcement = match active {
        None => BookEvent::BatchProcessStarted {
            batch_mode,
            batch_max_events,
            target: target.clone(),
            detail: detail.map(str::to_owned),
        },
        Some(current) => {
            // An active close batch only yields to another close request.
            if current.is_close() && batch_mode != BatchMode::Close {
                return false;
            }
            let same_request = current.batch_mode == batch_mode
                && &current.target == target
                && current.detail.as_deref() == detail;
            if same_request {
                return false;
            }
            BookEvent::BatchProcessRetargeted {
                from_mode: current.batch_mode,
                to_mode: batch_mode,
                batch_max_events,
                target: target.clone(),
                detail: detail.map(str::to_owned),
                abandoned_detail: current.detail.clone(),
            }
        }
    };

    events.push((*emit)(announcement));
    true
}
/// Tells whether a request for `batch_mode` must wait behind the active batch.
///
/// This is `true` only when the active batch's mode is
/// `BatchMode::RunnerRemovalCancel` and the requested mode is
/// `BatchMode::SuspendLapse` or `BatchMode::InPlayLapse`. With no active
/// batch the answer is always `false`.
fn should_queue_behind_active(active: Option<&BatchProcessState>, batch_mode: BatchMode) -> bool {
    let Some(current) = active else {
        return false;
    };
    let runner_removal_active = matches!(current.batch_mode, BatchMode::RunnerRemovalCancel);
    let lapse_requested = matches!(
        batch_mode,
        BatchMode::SuspendLapse | BatchMode::InPlayLapse
    );
    runner_removal_active && lapse_requested
}
/// Entry point for a batch request that may have to wait for the active batch.
///
/// After `target.assert_valid_for_mode(batch_mode)`:
///
/// * when `should_queue_behind_active` says the request need not wait, it is
///   forwarded unchanged to `maybe_start_or_retarget_batch`;
/// * otherwise a `BookEvent::BatchProcessQueued` is pushed, unless an equal
///   descriptor is already present in `queued_batches`, in which case nothing
///   happens.
///
/// `select_chunk` is only used on the forwarding path.
#[allow(clippy::too_many_arguments)]
pub(crate) fn maybe_start_or_queue_batch(
    active: Option<&BatchProcessState>,
    queued_batches: &[BatchProcessDescriptor],
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    mut emit: impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
    target.assert_valid_for_mode(batch_mode);

    if !should_queue_behind_active(active, batch_mode) {
        maybe_start_or_retarget_batch(
            active,
            batch_mode,
            batch_max_events,
            target,
            detail,
            events,
            emit,
            select_chunk,
        );
        return;
    }

    // Queue the request at most once per identical descriptor.
    let requested = batch_descriptor(batch_mode, batch_max_events, target, detail);
    if queued_batches.contains(&requested) {
        return;
    }

    let queued_event = BookEvent::BatchProcessQueued {
        batch_mode,
        batch_max_events,
        target: target.clone(),
        detail: detail.map(str::to_owned),
    };
    events.push(emit(queued_event));
}
/// Plans one chunk of a batch, starting after `cursor_after`.
///
/// Steps, in order:
///
/// 1. `target.assert_valid_for_mode(batch_mode)` is invoked.
/// 2. `select_chunk` is asked for at most `batch_max_events` entries.
/// 3. Entries whose order id is already in `planned_cancelled_order_ids` are
///    removed from the chunk.
/// 4. If the chunk is flagged `done` and has no entries left, a
///    `BookEvent::BatchProcessCompleted` is pushed and nothing else happens.
/// 5. Otherwise the remaining order ids are added to
///    `planned_cancelled_order_ids` and the chunk is handed to
///    `push_cancel_chunk` (its two trailing flags are passed as `false` and
///    `true`; their meaning is defined by that function).
///
/// Returns the chunk's `done` flag as reported by `select_chunk`.
#[allow(clippy::too_many_arguments)]
fn plan_batch_chunk_from_cursor(
    cursor_after: Option<OrderId>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: &mut impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
    planned_cancelled_order_ids: &mut HashSet<OrderId>,
) -> bool {
    target.assert_valid_for_mode(batch_mode);

    let chunk_limit = usize::from(batch_max_events);
    let mut chunk = select_chunk(cursor_after, chunk_limit, target);

    // Skip the filtering pass entirely when nothing has been planned yet.
    let has_prior_plans = !planned_cancelled_order_ids.is_empty();
    if has_prior_plans {
        chunk
            .cancelled_orders
            .retain(|entry| !planned_cancelled_order_ids.contains(&entry.order_id));
    }

    let selection_done = chunk.done;
    if selection_done && chunk.cancelled_orders.is_empty() {
        events.push((*emit)(BookEvent::BatchProcessCompleted { batch_mode }));
        return true;
    }

    planned_cancelled_order_ids.extend(chunk.cancelled_orders.iter().map(|entry| entry.order_id));

    push_cancel_chunk(
        events,
        |event| (*emit)(event),
        chunk,
        batch_mode,
        detail,
        false,
        true,
    );
    selection_done
}
/// Starts a new batch, or retargets the active one, and plans its first chunk.
///
/// The request is first announced through `emit_batch_start_or_retarget`.
/// When that pushes no event (it returns `false`), this function does nothing
/// further. Otherwise the first chunk is planned from the beginning (no
/// cursor) with an empty set of already-planned order ids.
#[allow(clippy::too_many_arguments)]
pub(crate) fn maybe_start_or_retarget_batch(
    active: Option<&BatchProcessState>,
    batch_mode: BatchMode,
    batch_max_events: u16,
    target: &BatchProcessTarget,
    detail: Option<&str>,
    events: &mut EventVec,
    mut emit: impl FnMut(BookEvent) -> BookEventEnvelope,
    mut select_chunk: impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) {
    let announced = emit_batch_start_or_retarget(
        active,
        batch_mode,
        batch_max_events,
        target,
        detail,
        events,
        &mut emit,
    );
    if !announced {
        return;
    }

    let mut already_planned = HashSet::new();
    plan_batch_chunk_from_cursor(
        None,
        batch_mode,
        batch_max_events,
        target,
        detail,
        events,
        &mut emit,
        &mut select_chunk,
        &mut already_planned,
    );
}
/// Starts queued batches one after another, in queue order.
///
/// For every descriptor a `BookEvent::BatchProcessStarted` is pushed and its
/// first chunk is planned from the beginning (no cursor). Iteration stops
/// after the first batch whose first chunk is not reported as done; later
/// descriptors are left untouched.
///
/// `planned_cancelled_order_ids` is shared across all promoted batches, so an
/// order planned for one batch is filtered out of the chunks of the next.
fn promote_queued_batches(
    queued_batches: &[BatchProcessDescriptor],
    events: &mut EventVec,
    emit: &mut impl FnMut(BookEvent) -> BookEventEnvelope,
    select_chunk: &mut impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
    planned_cancelled_order_ids: &mut HashSet<OrderId>,
) {
    for queued in queued_batches {
        let started_event = BookEvent::BatchProcessStarted {
            batch_mode: queued.batch_mode,
            batch_max_events: queued.batch_max_events,
            target: queued.target.clone(),
            detail: queued.detail.clone(),
        };
        events.push((*emit)(started_event));

        let first_chunk_was_last = plan_batch_chunk_from_cursor(
            None,
            queued.batch_mode,
            queued.batch_max_events,
            &queued.target,
            queued.detail.as_deref(),
            events,
            emit,
            select_chunk,
            planned_cancelled_order_ids,
        );
        // A batch that still has work pending blocks the rest of the queue.
        if !first_chunk_was_last {
            break;
        }
    }
}
/// Plans the next chunk of the active batch without any queued batches.
///
/// This is `continue_batch_process_with_queue` called with an empty queue, so
/// no queued batch can be promoted when the active one finishes.
///
/// # Errors
///
/// Propagates the error of `continue_batch_process_with_queue`, which is
/// `RejectReason::MarketNotBatchCancelling` when `proc_state` is `None`.
pub(crate) fn continue_batch_process(
proc_state: Option<&BatchProcessState>,
events: &mut EventVec,
emit: impl FnMut(BookEvent) -> BookEventEnvelope,
select_chunk: impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) -> Result<(), RejectReason> {
// An empty slice stands in for "nothing queued".
continue_batch_process_with_queue(proc_state, &[], events, emit, select_chunk)
}
/// Plans the next chunk of the active batch and, once it is done, promotes
/// queued batches.
///
/// The next chunk is planned from the active state's `cursor_after`, using its
/// own mode, event limit, target and detail. If that chunk is reported as
/// done, `promote_queued_batches` is run over `queued_batches`, sharing the
/// same set of already-planned order ids so that no order is planned twice
/// within this call.
///
/// # Errors
///
/// Returns `RejectReason::MarketNotBatchCancelling` when `proc_state` is
/// `None`; nothing is pushed to `events` in that case.
pub(crate) fn continue_batch_process_with_queue(
    proc_state: Option<&BatchProcessState>,
    queued_batches: &[BatchProcessDescriptor],
    events: &mut EventVec,
    mut emit: impl FnMut(BookEvent) -> BookEventEnvelope,
    mut select_chunk: impl FnMut(Option<OrderId>, usize, &BatchProcessTarget) -> CancelledOrdersChunk,
) -> Result<(), RejectReason> {
    let active = proc_state.ok_or(RejectReason::MarketNotBatchCancelling)?;

    let mut already_planned = HashSet::new();
    let active_finished = plan_batch_chunk_from_cursor(
        active.cursor_after,
        active.batch_mode,
        active.batch_max_events,
        &active.target,
        active.detail.as_deref(),
        events,
        &mut emit,
        &mut select_chunk,
        &mut already_planned,
    );

    if active_finished {
        promote_queued_batches(
            queued_batches,
            events,
            &mut emit,
            &mut select_chunk,
            &mut already_planned,
        );
    }
    Ok(())
}
/// Applies a batch life-cycle event to the tracked batch state and queue.
///
/// Handled events:
///
/// * `BatchProcessQueued` - appends a matching descriptor to `queued_batches`.
/// * `BatchProcessStarted` - removes the head of `queued_batches` when it
///   equals the started descriptor, then replaces `batch_process` with a
///   freshly built state.
/// * `BatchProcessRetargeted` - clears `queued_batches` when the new mode is
///   `BatchMode::Close`, then retargets the existing state, or builds a new
///   one when none exists.
/// * `BatchProcessCompleted` - resets `batch_process` to `None`.
///
/// `batch_context_for_mode` is called once, with the new mode, for started
/// and retargeted events only.
///
/// Returns `true` when `event` was one of the variants above and `false` for
/// any other event, which leaves both arguments untouched.
pub(crate) fn apply_batch_process_state_event(
    batch_process: &mut Option<BatchProcessState>,
    queued_batches: &mut Vec<BatchProcessDescriptor>,
    event: &BookEvent,
    mut batch_context_for_mode: impl FnMut(BatchMode) -> BatchProcessContext,
) -> bool {
    match event {
        BookEvent::BatchProcessQueued {
            batch_mode,
            batch_max_events,
            target,
            detail,
        } => {
            let queued = BatchProcessDescriptor::new(
                *batch_mode,
                *batch_max_events,
                target.clone(),
                detail.clone(),
            );
            queued_batches.push(queued);
        }
        BookEvent::BatchProcessStarted {
            batch_mode,
            batch_max_events,
            target,
            detail,
        } => {
            let started = BatchProcessDescriptor::new(
                *batch_mode,
                *batch_max_events,
                target.clone(),
                detail.clone(),
            );
            // Only the head of the queue is consumed, and only on an exact match.
            if queued_batches.first() == Some(&started) {
                queued_batches.remove(0);
            }
            let context = batch_context_for_mode(*batch_mode);
            let fresh_state = build_batch_process_state(
                *batch_mode,
                *batch_max_events,
                target,
                detail.as_deref(),
                context,
            );
            *batch_process = Some(fresh_state);
        }
        BookEvent::BatchProcessRetargeted {
            to_mode,
            batch_max_events,
            target,
            detail,
            ..
        } => {
            if *to_mode == BatchMode::Close {
                queued_batches.clear();
            }
            let context = batch_context_for_mode(*to_mode);
            match batch_process.as_mut() {
                Some(state) => {
                    state.retarget(
                        *to_mode,
                        *batch_max_events,
                        target.clone(),
                        detail.clone(),
                        context,
                    );
                }
                None => {
                    *batch_process = Some(build_batch_process_state(
                        *to_mode,
                        *batch_max_events,
                        target,
                        detail.as_deref(),
                        context,
                    ));
                }
            }
        }
        BookEvent::BatchProcessCompleted { .. } => {
            *batch_process = None;
        }
        _ => return false,
    }
    true
}
/// Applies a `BookEvent::OrderCancelledBatched` event.
///
/// `cancel_order` is invoked once per cancelled order, in event order. After
/// that, when a batch is being tracked and its mode equals the event's
/// `batch_mode`, the chunk is recorded on it via `record_chunk`, passing the
/// order id of the event's cursor (if any) and the number of cancelled orders.
///
/// Returns `true` when `event` is an `OrderCancelledBatched` and `false` for
/// any other event, in which case nothing is touched.
pub(crate) fn apply_batch_cancelled_chunk_event(
    batch_process: &mut Option<BatchProcessState>,
    event: &BookEvent,
    mut cancel_order: impl FnMut(OrderId),
) -> bool {
    let BookEvent::OrderCancelledBatched {
        cancelled_orders,
        cursor_after,
        batch_mode,
        ..
    } = event
    else {
        return false;
    };

    let next_cursor = cursor_after.as_ref().map(|cursor| cursor.order_id);
    let cancelled_count = cancelled_orders.len() as u64;

    cancelled_orders
        .iter()
        .for_each(|cancelled| cancel_order(cancelled.order_id));

    // Chunks belonging to a different batch mode do not advance the tracked state.
    let tracked = batch_process
        .as_mut()
        .filter(|state| state.batch_mode == *batch_mode);
    if let Some(state) = tracked {
        state.record_chunk(next_cursor, cancelled_count);
    }
    true
}
/// Outcome of `plan_await_live_market_with_lapse`: the set of steps selected
/// for moving a market towards the await-live phase.
///
/// NOTE(review): how each flag is acted upon is decided by the consumer of
/// this plan, which is not part of this section of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct AwaitLiveMarketPlan {
/// Set to `true` by the planner only when the market state is `Open`.
pub suspend_market: bool,
/// Set to `true` by the planner when the current phase is `MarketPhase::Pre`.
pub set_pre_await_live: bool,
/// Set to `true` by the planner only when the market state is `Open`.
pub start_lapse: bool,
}
/// Plans the transition of a market into the await-live phase.
///
/// Decision table, applied after the kind and state checks listed under
/// *Errors*:
///
/// | state                              | phase          | result                                   |
/// |------------------------------------|----------------|------------------------------------------|
/// | any                                | `Live`         | `Err(MarketInPlayNotSupported)`          |
/// | `Open`                             | `Pre`          | suspend, set pre-await-live, start lapse |
/// | `Open`                             | `PreAwaitLive` | suspend, start lapse                     |
/// | `Suspended`/`Halted`/`Deactivated` | `Pre`          | set pre-await-live only                  |
/// | `Suspended`/`Halted`/`Deactivated` | `PreAwaitLive` | `Err(NoChange)`                          |
/// | any other state                    | not `Live`     | `Err(MarketInPlayNotSupported)`          |
///
/// # Errors
///
/// * `RejectReason::MarketInPlayNotSupported` when `market_kind` is not
///   `MarketKind::InPlayCapable`, or for the rows marked so above.
/// * Whatever `ensure_phase_command_allowed_state` rejects `state` with.
/// * `RejectReason::NoChange` for the row marked so above.
pub(crate) fn plan_await_live_market_with_lapse(
    state: BookMarketState,
    market_phase: MarketPhase,
    market_kind: MarketKind,
) -> Result<AwaitLiveMarketPlan, RejectReason> {
    if market_kind != MarketKind::InPlayCapable {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    ensure_phase_command_allowed_state(state)?;

    match market_phase {
        MarketPhase::Live => Err(RejectReason::MarketInPlayNotSupported),
        MarketPhase::Pre | MarketPhase::PreAwaitLive => match state {
            BookMarketState::Open => Ok(AwaitLiveMarketPlan {
                suspend_market: true,
                set_pre_await_live: market_phase == MarketPhase::Pre,
                start_lapse: true,
            }),
            BookMarketState::Suspended
            | BookMarketState::Halted
            | BookMarketState::Deactivated => {
                if market_phase == MarketPhase::Pre {
                    Ok(AwaitLiveMarketPlan {
                        suspend_market: false,
                        set_pre_await_live: true,
                        start_lapse: false,
                    })
                } else {
                    // Not open and already in the pre-await-live phase.
                    Err(RejectReason::NoChange)
                }
            }
            _ => Err(RejectReason::MarketInPlayNotSupported),
        },
    }
}
/// Validates a request to move a market into the live phase.
///
/// Returns `Ok(())` when the state is `Open`, `Suspended`, `Halted` or
/// `Deactivated`, the phase is not already `Live`, and
/// `market_kind.can_transition_to_phase(market_phase, MarketPhase::Live)`
/// holds.
///
/// # Errors
///
/// * `RejectReason::MarketInPlayNotSupported` when `market_kind` is
///   `MarketKind::PreEventOnly`, when the state is none of the four listed
///   above, or when the market kind does not permit the transition.
/// * Whatever `ensure_phase_command_allowed_state` rejects `state` with.
/// * `RejectReason::NoChange` when the state is one of the four listed above
///   and the phase is already `Live`.
pub(crate) fn plan_go_live_market_with_lapse(
    state: BookMarketState,
    market_phase: MarketPhase,
    market_kind: MarketKind,
) -> Result<(), RejectReason> {
    if market_kind == MarketKind::PreEventOnly {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    ensure_phase_command_allowed_state(state)?;

    let state_supported = matches!(
        state,
        BookMarketState::Open
            | BookMarketState::Suspended
            | BookMarketState::Halted
            | BookMarketState::Deactivated
    );
    if !state_supported {
        return Err(RejectReason::MarketInPlayNotSupported);
    }
    if matches!(market_phase, MarketPhase::Live) {
        return Err(RejectReason::NoChange);
    }

    if market_kind.can_transition_to_phase(market_phase, MarketPhase::Live) {
        Ok(())
    } else {
        Err(RejectReason::MarketInPlayNotSupported)
    }
}
/// Validates a request to move a market back to the `Pre` phase.
///
/// Returns `Ok(())` only when the market is in `MarketPhase::PreAwaitLive`
/// and `market_kind.can_transition_to_phase(market_phase, MarketPhase::Pre)`
/// holds. The transition check is consulted for `PreAwaitLive` only.
///
/// # Errors
///
/// * Whatever `ensure_phase_command_allowed_state` rejects `state` with.
/// * `RejectReason::NoChange` when the phase is already `Pre`.
/// * `RejectReason::MarketInPlayNotSupported` when the phase is `Live`, or
///   when the market kind does not permit the transition.
pub(crate) fn plan_return_to_pre_market(
    state: BookMarketState,
    market_phase: MarketPhase,
    market_kind: MarketKind,
) -> Result<(), RejectReason> {
    ensure_phase_command_allowed_state(state)?;

    match market_phase {
        MarketPhase::Pre => Err(RejectReason::NoChange),
        MarketPhase::Live => Err(RejectReason::MarketInPlayNotSupported),
        MarketPhase::PreAwaitLive => {
            if market_kind.can_transition_to_phase(market_phase, MarketPhase::Pre) {
                Ok(())
            } else {
                Err(RejectReason::MarketInPlayNotSupported)
            }
        }
    }
}