use betex::{
book::BookMarketState,
book::MIN_CLOSE_BATCH_EVENTS,
book::protocol::command::{Command, CommandKind, Persistence, Side, TimeInForce},
book::{
BatchMode, BatchProcessContext, Book, BookEvent, BookEventEnvelope, CancelledOrderEntry,
EventMetadata, MAX_CLOSE_BATCH_EVENTS,
},
types::{AccountId, CorrelationId, MarketId, Money, OddsX10000, OrderId, RunnerId},
};
fn cancelled_order_ids(cancelled_orders: &[CancelledOrderEntry]) -> Vec<OrderId> {
cancelled_orders
.iter()
.map(|entry| entry.order_id)
.collect()
}
fn place_order_cmd(
market_id: MarketId,
correlation_id: u64,
account_id: u64,
runner_id: u32,
side: Side,
odds: u32,
stake: i64,
) -> Command {
Command {
correlation_id: Some(CorrelationId(correlation_id.to_string())),
metadata: None,
market_id,
kind: CommandKind::PlaceOrder {
runner_id: RunnerId(runner_id),
account_id: AccountId::from(account_id),
client_order_id: None,
side,
odds: OddsX10000(odds),
stake: Money(stake),
persistence: Persistence::Persist,
time_in_force: TimeInForce::Gtc,
},
}
}
fn close_market_cmd(market_id: MarketId) -> Command {
Command {
correlation_id: Some(CorrelationId(0.to_string())),
metadata: None,
market_id,
kind: CommandKind::CloseMarket {
reason: "MANUAL_CLOSE".to_string(),
},
}
}
fn continue_close_cmd(market_id: MarketId) -> Command {
Command {
correlation_id: Some(CorrelationId(0.to_string())),
metadata: None,
market_id,
kind: CommandKind::ContinueBatchProcess,
}
}
fn place_binary_order_cmd(
market_id: MarketId,
correlation_id: u64,
account_id: u64,
side: Side,
price_ticks: u16,
qty_shares: u64,
) -> Command {
Command {
correlation_id: Some(CorrelationId(correlation_id.to_string())),
metadata: None,
market_id,
kind: CommandKind::PlaceBinaryOrder {
account_id: AccountId::from(account_id),
client_order_id: None,
side,
price_ticks,
qty_shares,
time_in_force: TimeInForce::Gtc,
},
}
}
fn exec(book: &mut Book, cmd: Command) -> Vec<BookEvent> {
let (events, _) = book.handle(&cmd).expect("command should succeed");
book.apply_all_events(&events);
events.into_iter().map(|e| e.event).collect()
}
fn exec_envs(book: &mut Book, cmd: Command) -> Vec<betex::book::BookEventEnvelope> {
let (events, _) = book.handle(&cmd).expect("command should succeed");
book.apply_all_events(&events);
events
}
fn has_close_process(book: &Book) -> bool {
matches!(
book.batch_process_state(),
Some(state) if state.is_close()
)
}
fn close_process_state(book: &Book) -> &betex::book::BatchProcessState {
let state = book.batch_process_state().expect("close state");
assert_eq!(state.batch_mode, BatchMode::Close);
assert!(matches!(state.context, BatchProcessContext::Close { .. }));
state
}
fn drive_close(book: &mut Book, market_id: MarketId, batch_max_events: u16) -> Vec<BookEvent> {
book.set_close_batch_max_events(batch_max_events);
let mut out = Vec::new();
out.extend(exec(book, close_market_cmd(market_id)));
while has_close_process(book) {
out.extend(exec(book, continue_close_cmd(market_id)));
}
out
}
fn assert_logical_close_state(book: &Book, order_ids: &[OrderId]) {
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
for &oid in order_ids {
assert!(
book.get_order(oid).is_none(),
"terminal order should not be queryable: {oid:?}"
);
assert!(
!book.is_resting(oid),
"order should not be resting after close"
);
}
}
#[test]
fn close_one_shot_vs_chunked_has_same_logical_result_multi_runner() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut initial = Book::new_multi_runner(market_id, runners);
let mut order_ids = Vec::new();
let orders_n = 300u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
let events = exec(
&mut initial,
place_order_cmd(market_id, i + 1, 10_000 + i, runner, side, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let mut one_shot = initial.clone();
let mut chunked = initial.clone();
let one_shot_batch = (orders_n + 4) as u16;
drop(drive_close(&mut one_shot, market_id, one_shot_batch));
drop(drive_close(&mut chunked, market_id, 200));
assert_logical_close_state(&one_shot, &order_ids);
assert_logical_close_state(&chunked, &order_ids);
}
#[test]
fn close_one_shot_vs_chunked_has_same_logical_result_two_runner() {
let market_id = MarketId(1);
let mut initial = Book::new_two_runner(market_id, RunnerId(1), RunnerId(2));
let mut order_ids = Vec::new();
let orders_n = 300u64;
for i in 0..orders_n {
let runner = (i % 2) as u32 + 1;
let events = exec(
&mut initial,
place_order_cmd(market_id, i + 1, 30_000 + i, runner, Side::Yes, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let mut one_shot = initial.clone();
let mut chunked = initial.clone();
let one_shot_batch = (orders_n + 4) as u16;
drop(drive_close(&mut one_shot, market_id, one_shot_batch));
drop(drive_close(&mut chunked, market_id, 200));
assert_logical_close_state(&one_shot, &order_ids);
assert_logical_close_state(&chunked, &order_ids);
}
#[test]
fn close_start_batch_respects_batch_max_events_empty_multi_runner() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
book.set_close_batch_max_events(MIN_CLOSE_BATCH_EVENTS);
let envs = exec_envs(&mut book, close_market_cmd(market_id));
assert!(
envs.len() <= MIN_CLOSE_BATCH_EVENTS as usize + 1,
"emitted {} events, budget {}",
envs.len(),
MIN_CLOSE_BATCH_EVENTS
);
assert_eq!(book.market_state(), BookMarketState::Closed);
}
#[test]
fn close_start_batch_respects_batch_max_events_binary_yes_done() {
let market_id = MarketId(1);
let mut book = Book::new_binary_yes(market_id, RunnerId(1), RunnerId(2), 10_000);
drop(exec(
&mut book,
place_binary_order_cmd(market_id, 1, 50_000, Side::Yes, 4000, 1),
));
drop(exec(
&mut book,
place_binary_order_cmd(market_id, 2, 50_001, Side::Yes, 4000, 1),
));
book.set_close_batch_max_events(MIN_CLOSE_BATCH_EVENTS);
let envs = exec_envs(&mut book, close_market_cmd(market_id));
assert!(
envs.len() <= MIN_CLOSE_BATCH_EVENTS as usize + 2,
"emitted {} events, budget {}",
envs.len(),
MIN_CLOSE_BATCH_EVENTS
);
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
}
#[test]
fn close_process_chunked_completes_binary_yes() {
let market_id = MarketId(1);
let mut initial = Book::new_binary_yes(market_id, RunnerId(1), RunnerId(2), 10_000);
let mut order_ids = Vec::new();
let orders_n = 50u64;
for i in 0..orders_n {
let events = exec(
&mut initial,
place_binary_order_cmd(market_id, i + 1, 40_000 + i, Side::Yes, 4000, 1),
);
if let Some(BookEvent::BinaryOrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let events = drive_close(&mut initial, market_id, 2);
assert_logical_close_state(&initial, &order_ids);
let settled_chunks = events
.iter()
.filter(|e| matches!(e, BookEvent::OrderCancelledBatched { .. }))
.count();
assert!(
settled_chunks > 1,
"expected chunked close to produce multiple OrderCancelledBatched events"
);
}
#[test]
fn replayed_market_state_change_does_not_seed_close_batch_without_explicit_batch_start() {
let market_id = MarketId(1);
let mut book = Book::new_two_runner(market_id, RunnerId(1), RunnerId(2));
drop(exec(
&mut book,
place_order_cmd(market_id, 1, 50_000, 1, Side::Yes, 20_000, 100),
));
let env = BookEventEnvelope {
market_id,
market_name: String::new(),
market_seq: 1,
timestamp: betex::types::unix_epoch(),
metadata: EventMetadata::default(),
event: BookEvent::MarketStateChanged {
to: BookMarketState::Closed,
reason: "manual close".to_string(),
close_batch_max_events: None,
},
};
book.apply_event(&env);
assert_eq!(book.market_state(), BookMarketState::Closed);
assert!(book.batch_process_state().is_none());
}
#[test]
fn close_market_emits_human_readable_reason_and_typed_batch_size() {
let market_id = MarketId(1);
let mut book = Book::new_two_runner(market_id, RunnerId(1), RunnerId(2));
book.set_close_batch_max_events(MAX_CLOSE_BATCH_EVENTS);
let events = exec(&mut book, close_market_cmd(market_id));
let Some(BookEvent::MarketStateChanged {
to,
reason,
close_batch_max_events,
}) = events.first()
else {
panic!("expected first close event to be MarketStateChanged");
};
assert_eq!(*to, BookMarketState::Closed);
assert_eq!(reason, "MANUAL_CLOSE");
assert_eq!(*close_batch_max_events, Some(MAX_CLOSE_BATCH_EVENTS));
}
#[test]
fn close_process_event_sequence_is_deterministic_ignoring_timestamps() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut initial = Book::new_multi_runner(market_id, runners);
let orders_n = 3000u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
drop(exec(
&mut initial,
place_order_cmd(market_id, i + 1, 20_000 + i, runner, side, 20000, 50),
));
}
let mut a = initial.clone();
let mut b = initial.clone();
let evs_a = drive_close(&mut a, market_id, 1200);
let evs_b = drive_close(&mut b, market_id, 1200);
assert_eq!(a.market_state(), BookMarketState::Closed);
assert_eq!(b.market_state(), BookMarketState::Closed);
assert_eq!(evs_a, evs_b);
}
#[test]
fn continue_close_rejected_when_not_closing() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let err = book
.handle(&continue_close_cmd(market_id))
.expect_err("should error");
assert_eq!(
err.reason,
betex::book::protocol::reject::RejectReason::MarketNotBatchCancelling
);
}
#[test]
fn continue_close_rejected_when_market_not_closing() {
let mut book = Book::new_multi_runner(MarketId(1), vec![RunnerId(1), RunnerId(2)]);
let err = book
.handle(&continue_close_cmd(MarketId(1)))
.expect_err("should error");
assert_eq!(
err.reason,
betex::book::protocol::reject::RejectReason::MarketNotBatchCancelling
);
}
#[test]
fn close_empty_market_succeeds() {
let market_id = MarketId(1);
let runners = vec![RunnerId(1), RunnerId(2)];
let mut book = Book::new_multi_runner(market_id, runners);
let events = drive_close(&mut book, market_id, 100);
assert!(
events
.iter()
.any(|e| matches!(e, BookEvent::BatchProcessCompleted { .. }))
);
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
}
#[test]
fn continue_close_on_empty_queue_is_rejected() {
let market_id = MarketId(1);
let runners = vec![RunnerId(1), RunnerId(2)];
let mut book = Book::new_multi_runner(market_id, runners);
drop(drive_close(&mut book, market_id, 100));
assert_eq!(book.market_state(), BookMarketState::Closed);
let err = book
.handle(&continue_close_cmd(market_id))
.expect_err("should error");
assert_eq!(
err.reason,
betex::book::protocol::reject::RejectReason::MarketNotBatchCancelling
);
}
mod recovery {
use super::*;
#[test]
fn resume_close_from_cursor_after_simulated_restart() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let orders_n = 100u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
drop(exec(
&mut book,
place_order_cmd(market_id, i + 1, 50_000 + i, runner, side, 20000, 100),
));
}
let batch_size = MIN_CLOSE_BATCH_EVENTS;
book.set_close_batch_max_events(batch_size);
let events = exec(&mut book, close_market_cmd(market_id));
if !has_close_process(&book) {
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
assert!(
events
.iter()
.any(|e| matches!(e, BookEvent::BatchProcessCompleted { .. }))
);
return;
}
assert!(events.iter().any(|e| matches!(
e,
BookEvent::MarketStateChanged {
to: BookMarketState::Closed,
..
}
)));
assert!(has_close_process(&book));
let close_state_before = close_process_state(&book);
assert!(matches!(
close_state_before.context,
BatchProcessContext::Close { total_live_orders } if total_live_orders > 0
));
let mut restarted_book = book.clone();
let remaining_events: Vec<BookEvent> = {
let mut out = Vec::new();
while has_close_process(&restarted_book) {
out.extend(exec(&mut restarted_book, continue_close_cmd(market_id)));
}
out
};
assert_eq!(restarted_book.market_state(), BookMarketState::Closed);
assert_eq!(restarted_book.active_order_count(), 0);
let final_close_state = book.batch_process_state();
assert!(final_close_state.is_some());
assert!(
remaining_events
.iter()
.any(|e| matches!(e, BookEvent::BatchProcessCompleted { .. }))
);
}
#[test]
fn cursor_idempotence_no_double_cancellation() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let mut order_ids = Vec::new();
let orders_n = 100u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let events = exec(
&mut book,
place_order_cmd(market_id, i + 1, 60_000 + i, runner, Side::Yes, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let batch_size = 20u16;
book.set_close_batch_max_events(batch_size);
let first_batch_events = exec_envs(&mut book, close_market_cmd(market_id));
let close_state = close_process_state(&book);
let cursor = close_state.cursor_after;
let mut all_cancelled: Vec<OrderId> = Vec::new();
for env in &first_batch_events {
if let BookEvent::OrderCancelledBatched {
cancelled_orders, ..
} = &env.event
{
all_cancelled.extend(cancelled_orders.iter().map(|entry| entry.order_id));
}
}
while has_close_process(&book) {
let events = exec_envs(&mut book, continue_close_cmd(market_id));
for env in events {
if let BookEvent::OrderCancelledBatched {
cancelled_orders, ..
} = &env.event
{
all_cancelled.extend(cancelled_orders.iter().map(|entry| entry.order_id));
}
}
}
let mut sorted = all_cancelled.clone();
sorted.sort();
sorted.dedup();
assert_eq!(
all_cancelled.len(),
sorted.len(),
"Found duplicate cancelled order IDs"
);
if let Some(c) = cursor {
let first_batch_cancelled: Vec<_> = first_batch_events
.iter()
.filter_map(|env| match &env.event {
BookEvent::OrderCancelledBatched {
cancelled_orders, ..
} => Some(cancelled_order_ids(cancelled_orders)),
_ => None,
})
.flatten()
.collect();
for &oid in &first_batch_cancelled {
assert!(
oid <= c,
"Order {:?} in first batch should be <= cursor {:?}",
oid,
c
);
}
}
}
#[test]
fn multi_market_close_recovery() {
let markets: Vec<(MarketId, Book)> = (1..=3)
.map(|i| {
let market_id = MarketId(i);
let mut book =
Book::new_multi_runner(market_id, vec![RunnerId(1), RunnerId(2), RunnerId(3)]);
let orders_n = 100 + (i * 50);
for j in 0..orders_n {
let runner = (j % 3) as u32 + 1;
let side = if j % 2 == 0 { Side::Yes } else { Side::No };
drop(exec(
&mut book,
place_order_cmd(market_id, j + 1, i * 10000 + j, runner, side, 20000, 100),
));
}
(market_id, book)
})
.collect();
let mut closing_books: Vec<(MarketId, Book)> = markets
.into_iter()
.map(|(mid, mut book)| {
book.set_close_batch_max_events(MIN_CLOSE_BATCH_EVENTS);
drop(exec(&mut book, close_market_cmd(mid)));
(mid, book)
})
.collect();
let snapshot: Vec<(MarketId, Book)> = closing_books
.iter()
.map(|(mid, book)| (*mid, book.clone()))
.collect();
for (mid, book) in &snapshot {
assert!(
book.market_state() == BookMarketState::Closed,
"Market {:?} should be Closed, got {:?}",
mid,
book.market_state()
);
}
let mut recovered: Vec<(MarketId, Book)> = snapshot;
for (mid, book) in &mut recovered {
while has_close_process(book) {
drop(exec(book, continue_close_cmd(*mid)));
}
}
for (mid, book) in &recovered {
assert_eq!(
book.market_state(),
BookMarketState::Closed,
"Market {:?} should be Closed after recovery",
mid
);
assert_eq!(
book.active_order_count(),
0,
"Market {:?} should have no active orders",
mid
);
}
for (mid, book) in &mut closing_books {
while has_close_process(book) {
drop(exec(book, continue_close_cmd(*mid)));
}
assert_eq!(book.market_state(), BookMarketState::Closed);
}
}
}