use betex::{
book::BookMarketState,
book::MIN_CLOSE_BATCH_EVENTS,
book::protocol::command::{Command, CommandKind, MarketState, Persistence, Side, TimeInForce},
book::{Book, BookEvent},
types::{AccountId, CorrelationId, MarketId, Money, OddsX10000, OrderId, RunnerId},
};
fn place_order_cmd(
market_id: MarketId,
correlation_id: u64,
account_id: u64,
runner_id: u32,
side: Side,
odds: u32,
stake: i64,
) -> Command {
Command {
correlation_id: CorrelationId(correlation_id),
market_id,
kind: CommandKind::PlaceOrder {
runner_id: RunnerId(runner_id),
account_id: AccountId(account_id),
client_order_id: None,
side,
odds: OddsX10000(odds),
stake: Money(stake),
persistence: Persistence::Persist,
time_in_force: TimeInForce::Gtc,
},
}
}
fn close_market_cmd(market_id: MarketId, batch_max_events: u16) -> Command {
Command {
correlation_id: CorrelationId(0),
market_id,
kind: CommandKind::CloseMarket { batch_max_events },
}
}
fn continue_close_cmd(market_id: MarketId) -> Command {
Command {
correlation_id: CorrelationId(0),
market_id,
kind: CommandKind::ContinueCloseMarket,
}
}
fn place_binary_order_cmd(
market_id: MarketId,
correlation_id: u64,
account_id: u64,
side: Side,
price_ticks: u16,
qty_shares: u64,
) -> Command {
Command {
correlation_id: CorrelationId(correlation_id),
market_id,
kind: CommandKind::PlaceBinaryOrder {
account_id: AccountId(account_id),
client_order_id: None,
side,
price_ticks,
qty_shares,
time_in_force: TimeInForce::Gtc,
},
}
}
fn exec(book: &mut Book, cmd: Command) -> Vec<BookEvent> {
let (events, _) = book.handle(&cmd).expect("command should succeed");
book.apply_all_events(&events);
events.into_iter().map(|e| e.event).collect()
}
fn exec_envs(book: &mut Book, cmd: Command) -> Vec<betex::book::BookEventEnvelope> {
let (events, _) = book.handle(&cmd).expect("command should succeed");
book.apply_all_events(&events);
events
}
fn drive_close(book: &mut Book, market_id: MarketId, batch_max_events: u16) -> Vec<BookEvent> {
let mut out = Vec::new();
out.extend(exec(book, close_market_cmd(market_id, batch_max_events)));
while book.market_state() == BookMarketState::Closing {
out.extend(exec(book, continue_close_cmd(market_id)));
}
out
}
fn assert_logical_close_state(book: &Book, order_ids: &[OrderId]) {
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
for &oid in order_ids {
assert!(
book.get_order(oid).is_none(),
"terminal order should not be queryable: {oid:?}"
);
assert!(
!book.is_resting(oid),
"order should not be resting after close"
);
}
}
#[test]
fn close_one_shot_vs_chunked_has_same_logical_result_multi_runner() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut initial = Book::new_multi_runner(market_id, runners);
let mut order_ids = Vec::new();
let orders_n = 300u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
let events = exec(
&mut initial,
place_order_cmd(market_id, i + 1, 10_000 + i, runner, side, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let mut one_shot = initial.clone();
let mut chunked = initial.clone();
let one_shot_batch = (orders_n + 4) as u16;
let _ = drive_close(&mut one_shot, market_id, one_shot_batch);
let _ = drive_close(&mut chunked, market_id, 200);
assert_logical_close_state(&one_shot, &order_ids);
assert_logical_close_state(&chunked, &order_ids);
}
#[test]
fn close_one_shot_vs_chunked_has_same_logical_result_two_runner() {
let market_id = MarketId(1);
let mut initial = Book::new_two_runner(market_id, RunnerId(1), RunnerId(2));
let mut order_ids = Vec::new();
let orders_n = 300u64;
for i in 0..orders_n {
let runner = (i % 2) as u32 + 1;
let events = exec(
&mut initial,
place_order_cmd(market_id, i + 1, 30_000 + i, runner, Side::Yes, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let mut one_shot = initial.clone();
let mut chunked = initial.clone();
let one_shot_batch = (orders_n + 4) as u16;
let _ = drive_close(&mut one_shot, market_id, one_shot_batch);
let _ = drive_close(&mut chunked, market_id, 200);
assert_logical_close_state(&one_shot, &order_ids);
assert_logical_close_state(&chunked, &order_ids);
}
#[test]
fn close_start_batch_respects_batch_max_events_empty_multi_runner() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let envs = exec_envs(
&mut book,
close_market_cmd(market_id, MIN_CLOSE_BATCH_EVENTS),
);
assert!(
envs.len() <= MIN_CLOSE_BATCH_EVENTS as usize,
"emitted {} events, budget {}",
envs.len(),
MIN_CLOSE_BATCH_EVENTS
);
assert_eq!(book.market_state(), BookMarketState::Closed);
}
#[test]
fn close_start_batch_respects_batch_max_events_binary_yes_done() {
let market_id = MarketId(1);
let mut book = Book::new_binary_yes(market_id, RunnerId(1), RunnerId(2), 10_000);
let _ = exec(
&mut book,
place_binary_order_cmd(market_id, 1, 50_000, Side::Yes, 4000, 1),
);
let _ = exec(
&mut book,
place_binary_order_cmd(market_id, 2, 50_001, Side::Yes, 4000, 1),
);
let envs = exec_envs(
&mut book,
close_market_cmd(market_id, MIN_CLOSE_BATCH_EVENTS),
);
assert!(
envs.len() <= MIN_CLOSE_BATCH_EVENTS as usize,
"emitted {} events, budget {}",
envs.len(),
MIN_CLOSE_BATCH_EVENTS
);
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
}
#[test]
fn close_process_chunked_completes_binary_yes() {
let market_id = MarketId(1);
let mut initial = Book::new_binary_yes(market_id, RunnerId(1), RunnerId(2), 10_000);
let mut order_ids = Vec::new();
let orders_n = 50u64;
for i in 0..orders_n {
let events = exec(
&mut initial,
place_binary_order_cmd(market_id, i + 1, 40_000 + i, Side::Yes, 4000, 1),
);
if let Some(BookEvent::BinaryOrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let events = drive_close(&mut initial, market_id, 2);
assert_logical_close_state(&initial, &order_ids);
let settled_chunks = events
.iter()
.filter(|e| matches!(e, BookEvent::MarketOrdersSettled { .. }))
.count();
assert!(
settled_chunks > 1,
"expected chunked close to produce multiple MarketOrdersSettled events"
);
}
#[test]
fn close_process_event_sequence_is_deterministic_ignoring_timestamps() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut initial = Book::new_multi_runner(market_id, runners);
let orders_n = 3000u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
let _ = exec(
&mut initial,
place_order_cmd(market_id, i + 1, 20_000 + i, runner, side, 20000, 50),
);
}
let mut a = initial.clone();
let mut b = initial.clone();
let evs_a = drive_close(&mut a, market_id, 1200);
let evs_b = drive_close(&mut b, market_id, 1200);
assert_eq!(a.market_state(), BookMarketState::Closed);
assert_eq!(b.market_state(), BookMarketState::Closed);
assert_eq!(evs_a, evs_b);
}
#[test]
fn continue_close_rejected_when_not_closing() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let cmd = Command {
correlation_id: CorrelationId(1),
market_id,
kind: CommandKind::SetMarketState {
state: MarketState::Open,
},
};
let _ = exec(&mut book, cmd);
let err = book
.handle(&continue_close_cmd(market_id))
.err()
.expect("should error");
assert_eq!(
err.reason,
betex::protocol::reject::RejectReason::MarketNotClosing
);
}
#[test]
fn continue_close_rejected_when_market_not_closing() {
let mut book = Book::new_multi_runner(MarketId(1), vec![RunnerId(1), RunnerId(2)]);
let cmd = Command {
correlation_id: CorrelationId(1),
market_id: MarketId(1),
kind: CommandKind::SetMarketState {
state: MarketState::Open,
},
};
exec(&mut book, cmd);
let err = book
.handle(&continue_close_cmd(MarketId(1)))
.err()
.expect("should error");
assert_eq!(
err.reason,
betex::protocol::reject::RejectReason::MarketNotClosing
);
}
#[test]
fn close_empty_market_succeeds() {
let market_id = MarketId(1);
let runners = vec![RunnerId(1), RunnerId(2)];
let mut book = Book::new_multi_runner(market_id, runners);
let events = drive_close(&mut book, market_id, 100);
assert!(
events
.iter()
.any(|e| matches!(e, BookEvent::MarketOrdersSettled { is_final: true, .. }))
);
assert_eq!(book.market_state(), BookMarketState::Closed);
assert_eq!(book.active_order_count(), 0);
}
#[test]
fn continue_close_on_empty_queue_is_rejected() {
let market_id = MarketId(1);
let runners = vec![RunnerId(1), RunnerId(2)];
let mut book = Book::new_multi_runner(market_id, runners);
let _ = drive_close(&mut book, market_id, 100);
assert_eq!(book.market_state(), BookMarketState::Closed);
let err = book
.handle(&continue_close_cmd(market_id))
.err()
.expect("should error");
assert_eq!(
err.reason,
betex::protocol::reject::RejectReason::MarketNotClosing
);
}
mod recovery {
use super::*;
#[test]
fn resume_close_from_cursor_after_simulated_restart() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let orders_n = 100u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let side = if i % 2 == 0 { Side::Yes } else { Side::No };
let _ = exec(
&mut book,
place_order_cmd(market_id, i + 1, 50_000 + i, runner, side, 20000, 100),
);
}
let batch_size = MIN_CLOSE_BATCH_EVENTS;
let events = exec(&mut book, close_market_cmd(market_id, batch_size));
if book.market_state() == BookMarketState::Closed {
assert_eq!(book.active_order_count(), 0);
assert!(
events
.iter()
.any(|e| matches!(e, BookEvent::MarketOrdersSettled { is_final: true, .. }))
);
return;
}
assert!(events.iter().any(|e| matches!(
e,
BookEvent::MarketStateChanged {
to: BookMarketState::Closing,
..
}
)));
assert_eq!(book.market_state(), BookMarketState::Closing);
let _close_state_before = book.close_process_state().expect("should have close state");
let mut restarted_book = book.clone();
let remaining_events: Vec<BookEvent> = {
let mut out = Vec::new();
while restarted_book.market_state() == BookMarketState::Closing {
out.extend(exec(&mut restarted_book, continue_close_cmd(market_id)));
}
out
};
assert_eq!(restarted_book.market_state(), BookMarketState::Closed);
assert_eq!(restarted_book.active_order_count(), 0);
let final_close_state = book.close_process_state();
assert_eq!(book.market_state(), BookMarketState::Closing);
assert!(final_close_state.is_some());
assert!(
remaining_events
.iter()
.any(|e| matches!(e, BookEvent::MarketOrdersSettled { is_final: true, .. }))
);
}
#[test]
fn cursor_idempotence_no_double_cancellation() {
let market_id = MarketId(1);
let runners = [RunnerId(1), RunnerId(2), RunnerId(3)];
let mut book = Book::new_multi_runner(market_id, runners);
let mut order_ids = Vec::new();
let orders_n = 100u64;
for i in 0..orders_n {
let runner = (i % 3) as u32 + 1;
let events = exec(
&mut book,
place_order_cmd(market_id, i + 1, 60_000 + i, runner, Side::Yes, 20000, 100),
);
if let Some(BookEvent::OrderAccepted { order_id, .. }) = events.first() {
order_ids.push(*order_id);
}
}
let batch_size = 20u16;
let first_batch_events = exec_envs(&mut book, close_market_cmd(market_id, batch_size));
let _state_after_first = book.clone();
let close_state = book.close_process_state().expect("close state");
let cursor = close_state.cursor_after;
let mut all_cancelled: Vec<OrderId> = Vec::new();
for env in &first_batch_events {
if let BookEvent::MarketOrdersSettled { order_ids, .. } = &env.event {
all_cancelled.extend(order_ids);
}
}
while book.market_state() == BookMarketState::Closing {
let events = exec_envs(&mut book, continue_close_cmd(market_id));
for env in events {
if let BookEvent::MarketOrdersSettled { order_ids, .. } = &env.event {
all_cancelled.extend(order_ids);
}
}
}
let mut sorted = all_cancelled.clone();
sorted.sort();
sorted.dedup();
assert_eq!(
all_cancelled.len(),
sorted.len(),
"Found duplicate cancelled order IDs"
);
if let Some(c) = cursor {
let first_batch_cancelled: Vec<_> = first_batch_events
.iter()
.filter_map(|env| match &env.event {
BookEvent::MarketOrdersSettled { order_ids, .. } => Some(order_ids.clone()),
_ => None,
})
.flatten()
.collect();
for &oid in &first_batch_cancelled {
assert!(
oid <= c,
"Order {:?} in first batch should be <= cursor {:?}",
oid,
c
);
}
}
}
#[test]
fn multi_market_close_recovery() {
let markets: Vec<(MarketId, Book)> = (1..=3)
.map(|i| {
let market_id = MarketId(i);
let mut book =
Book::new_multi_runner(market_id, vec![RunnerId(1), RunnerId(2), RunnerId(3)]);
let orders_n = 100 + (i as u64 * 50);
for j in 0..orders_n {
let runner = (j % 3) as u32 + 1;
let side = if j % 2 == 0 { Side::Yes } else { Side::No };
let _ = exec(
&mut book,
place_order_cmd(market_id, j + 1, i * 10000 + j, runner, side, 20000, 100),
);
}
(market_id, book)
})
.collect();
let mut closing_books: Vec<(MarketId, Book)> = markets
.into_iter()
.map(|(mid, mut book)| {
let _ = exec(&mut book, close_market_cmd(mid, MIN_CLOSE_BATCH_EVENTS));
(mid, book)
})
.collect();
let snapshot: Vec<(MarketId, Book)> = closing_books
.iter()
.map(|(mid, book)| (*mid, book.clone()))
.collect();
for (mid, book) in &snapshot {
assert!(
book.market_state() == BookMarketState::Closing
|| book.market_state() == BookMarketState::Closed,
"Market {:?} should be Closing or Closed, got {:?}",
mid,
book.market_state()
);
}
let mut recovered: Vec<(MarketId, Book)> = snapshot;
for (mid, book) in &mut recovered {
while book.market_state() == BookMarketState::Closing {
let _ = exec(book, continue_close_cmd(*mid));
}
}
for (mid, book) in &recovered {
assert_eq!(
book.market_state(),
BookMarketState::Closed,
"Market {:?} should be Closed after recovery",
mid
);
assert_eq!(
book.active_order_count(),
0,
"Market {:?} should have no active orders",
mid
);
}
for (mid, book) in &mut closing_books {
while book.market_state() == BookMarketState::Closing {
let _ = exec(book, continue_close_cmd(*mid));
}
assert_eq!(book.market_state(), BookMarketState::Closed);
}
}
}