orderbook_rs/orderbook/sequencer/replay.rs
1//! Deterministic replay engine for event journals.
2//!
3//! [`ReplayEngine`] reads a sequence of [`SequencerEvent`]s from a [`Journal`]
4//! and re-applies each command to a fresh [`OrderBook`], producing an
5//! identical final state. This enables disaster recovery, audit compliance,
6//! and state verification.
7
8use super::error::JournalError;
9use super::journal::Journal;
10use super::types::{SequencerCommand, SequencerEvent, SequencerResult};
11use crate::orderbook::{OrderBook, OrderBookError, OrderBookSnapshot};
12use serde::{Deserialize, Serialize};
13use std::marker::PhantomData;
14use thiserror::Error;
15
16/// Errors that can occur during journal replay.
17#[derive(Debug, Error)]
18pub enum ReplayError {
19 /// The journal contains no events to replay.
20 #[error("journal is empty — nothing to replay")]
21 EmptyJournal,
22
23 /// The requested starting sequence number exceeds the journal's last entry.
24 #[error("invalid from_sequence {from_sequence}: journal last sequence is {last_sequence}")]
25 InvalidSequence {
26 /// The sequence number requested.
27 from_sequence: u64,
28 /// The last sequence number in the journal.
29 last_sequence: u64,
30 },
31
32 /// A gap was detected between expected and found sequence numbers.
33 #[error("sequence gap detected: expected {expected}, found {found}")]
34 SequenceGap {
35 /// The expected next sequence number.
36 expected: u64,
37 /// The actual sequence number found.
38 found: u64,
39 },
40
41 /// An OrderBook operation failed during replay.
42 #[error("order book error during replay at sequence {sequence_num}: {source}")]
43 OrderBookError {
44 /// The sequence number of the event that caused the error.
45 sequence_num: u64,
46 /// The underlying error.
47 #[source]
48 source: OrderBookError,
49 },
50
51 /// The replayed state does not match the expected snapshot.
52 #[error("snapshot mismatch: replayed state diverges from expected snapshot")]
53 SnapshotMismatch,
54
55 /// Journal read error during replay.
56 #[error("journal error during replay: {0}")]
57 JournalError(#[from] JournalError),
58}
59
60/// Stateless replay engine that reconstructs [`OrderBook`] state from a [`Journal`].
61///
62/// All methods are associated functions (no `&self` receiver) — `ReplayEngine`
63/// holds no state itself. Use it as a namespace for replay operations.
64pub struct ReplayEngine<T> {
65 _phantom: PhantomData<T>,
66}
67
68impl<T> ReplayEngine<T>
69where
70 T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Default + 'static,
71{
72 /// Replays all events from `from_sequence` onwards onto a fresh [`OrderBook`].
73 ///
74 /// Returns the reconstructed book and the sequence number of the last
75 /// event applied. Only successful commands (non-`Rejected` results) are
76 /// replayed — rejected events are skipped without error.
77 ///
78 /// # Arguments
79 ///
80 /// * `journal` — the event source
81 /// * `from_sequence` — first sequence number to include (inclusive); pass `0` for full replay
82 /// * `symbol` — symbol used to create the fresh OrderBook
83 ///
84 /// # Errors
85 ///
86 /// - [`ReplayError::EmptyJournal`] if the journal has no events
87 /// - [`ReplayError::InvalidSequence`] if `from_sequence` > last journal sequence
88 /// - [`ReplayError::OrderBookError`] if a command fails unexpectedly during replay
89 /// - [`ReplayError::JournalError`] if reading from the journal fails
90 pub fn replay_from(
91 journal: &impl Journal<T>,
92 from_sequence: u64,
93 symbol: &str,
94 ) -> Result<(OrderBook<T>, u64), ReplayError> {
95 Self::replay_from_with_progress(journal, from_sequence, symbol, |_, _| {})
96 }
97
98 /// Replays events with a progress callback invoked after each applied event.
99 ///
100 /// The callback receives `(events_applied: u64, current_sequence: u64)`.
101 /// Useful for long replays where progress reporting is needed.
102 ///
103 /// # Arguments
104 ///
105 /// * `journal` — the event source
106 /// * `from_sequence` — first sequence number to include; pass `0` for full replay
107 /// * `symbol` — symbol for the fresh OrderBook
108 /// * `progress` — callback invoked after each event: `(events_applied, sequence_num)`
109 ///
110 /// # Errors
111 ///
112 /// Same as [`replay_from`](Self::replay_from).
113 pub fn replay_from_with_progress(
114 journal: &impl Journal<T>,
115 from_sequence: u64,
116 symbol: &str,
117 progress: impl Fn(u64, u64),
118 ) -> Result<(OrderBook<T>, u64), ReplayError> {
119 let last_seq = match journal.last_sequence() {
120 Some(seq) => seq,
121 None => return Err(ReplayError::EmptyJournal),
122 };
123
124 if from_sequence > last_seq {
125 return Err(ReplayError::InvalidSequence {
126 from_sequence,
127 last_sequence: last_seq,
128 });
129 }
130
131 let book = OrderBook::new(symbol);
132 let mut last_applied_seq = 0u64;
133 let mut count = 0u64;
134 let mut expected_seq = from_sequence;
135
136 let iter = journal.read_from(from_sequence)?;
137
138 for entry_result in iter {
139 let entry = entry_result?;
140 let event = &entry.event;
141
142 // Gap detection
143 if event.sequence_num != expected_seq {
144 return Err(ReplayError::SequenceGap {
145 expected: expected_seq,
146 found: event.sequence_num,
147 });
148 }
149
150 Self::apply_event(&book, event)?;
151 last_applied_seq = event.sequence_num;
152 count = count.saturating_add(1);
153 expected_seq = expected_seq.saturating_add(1);
154 progress(count, last_applied_seq);
155 }
156
157 Ok((book, last_applied_seq))
158 }
159
160 /// Replays the full journal and compares the result to an expected snapshot.
161 ///
162 /// Returns `Ok(true)` if the replayed state matches, `Ok(false)` if it
163 /// diverges. The comparison uses [`snapshots_match`] which checks symbol,
164 /// bid price levels, and ask price levels.
165 ///
166 /// # Errors
167 ///
168 /// - [`ReplayError::EmptyJournal`] if the journal has no events
169 /// - [`ReplayError::OrderBookError`] if replay fails
170 /// - [`ReplayError::JournalError`] if reading from the journal fails
171 pub fn verify(
172 journal: &impl Journal<T>,
173 expected_snapshot: &OrderBookSnapshot,
174 ) -> Result<bool, ReplayError> {
175 let (book, _) = Self::replay_from(journal, 0, &expected_snapshot.symbol)?;
176 let actual = book.create_snapshot(usize::MAX);
177 Ok(snapshots_match(&actual, expected_snapshot))
178 }
179
180 /// Applies a single sequencer event to the given book.
181 ///
182 /// Events with `Rejected` results are skipped — they represent commands
183 /// that failed at write time and must not be re-applied during replay.
184 fn apply_event(book: &OrderBook<T>, event: &SequencerEvent<T>) -> Result<(), ReplayError> {
185 // Skip events whose original execution was rejected.
186 if matches!(event.result, SequencerResult::Rejected { .. }) {
187 return Ok(());
188 }
189
190 match &event.command {
191 SequencerCommand::AddOrder(order) => {
192 book.add_order(order.clone())
193 .map_err(|e| ReplayError::OrderBookError {
194 sequence_num: event.sequence_num,
195 source: e,
196 })?;
197 }
198 SequencerCommand::CancelOrder(id) => {
199 book.cancel_order(*id)
200 .map_err(|e| ReplayError::OrderBookError {
201 sequence_num: event.sequence_num,
202 source: e,
203 })?;
204 }
205 SequencerCommand::UpdateOrder(update) => {
206 book.update_order(*update)
207 .map_err(|e| ReplayError::OrderBookError {
208 sequence_num: event.sequence_num,
209 source: e,
210 })?;
211 }
212 SequencerCommand::MarketOrder { id, quantity, side } => {
213 book.submit_market_order(*id, *quantity, *side)
214 .map_err(|e| ReplayError::OrderBookError {
215 sequence_num: event.sequence_num,
216 source: e,
217 })?;
218 }
219 SequencerCommand::CancelAll => {
220 let _ = book.cancel_all_orders();
221 }
222 SequencerCommand::CancelBySide { side } => {
223 let _ = book.cancel_orders_by_side(*side);
224 }
225 SequencerCommand::CancelByUser { user_id } => {
226 let _ = book.cancel_orders_by_user(*user_id);
227 }
228 SequencerCommand::CancelByPriceRange {
229 side,
230 min_price,
231 max_price,
232 } => {
233 let _ = book.cancel_orders_by_price_range(*side, *min_price, *max_price);
234 }
235 }
236
237 Ok(())
238 }
239}
240
241/// Compares two [`OrderBookSnapshot`]s for structural equality.
242///
243/// Two snapshots are considered equal when:
244/// - `symbol` is identical
245/// - The sorted bid price levels match (by price, then visible quantity)
246/// - The sorted ask price levels match (by price, then visible quantity)
247///
248/// Timestamps are intentionally excluded from comparison because replayed
249/// books may be created at a different wall-clock time than the original.
250#[must_use]
251pub fn snapshots_match(actual: &OrderBookSnapshot, expected: &OrderBookSnapshot) -> bool {
252 if actual.symbol != expected.symbol {
253 return false;
254 }
255
256 // Compare bids sorted by price descending (highest bid first)
257 let mut actual_bids: Vec<_> = actual.bids.iter().collect();
258 let mut expected_bids: Vec<_> = expected.bids.iter().collect();
259 actual_bids.sort_by_key(|b| std::cmp::Reverse(b.price()));
260 expected_bids.sort_by_key(|b| std::cmp::Reverse(b.price()));
261
262 if actual_bids.len() != expected_bids.len() {
263 return false;
264 }
265 for (a, b) in actual_bids.iter().zip(expected_bids.iter()) {
266 if a.price() != b.price() || a.visible_quantity() != b.visible_quantity() {
267 return false;
268 }
269 }
270
271 // Compare asks sorted by price ascending (lowest ask first)
272 let mut actual_asks: Vec<_> = actual.asks.iter().collect();
273 let mut expected_asks: Vec<_> = expected.asks.iter().collect();
274 actual_asks.sort_by_key(|l| l.price());
275 expected_asks.sort_by_key(|l| l.price());
276
277 if actual_asks.len() != expected_asks.len() {
278 return false;
279 }
280 for (a, b) in actual_asks.iter().zip(expected_asks.iter()) {
281 if a.price() != b.price() || a.visible_quantity() != b.visible_quantity() {
282 return false;
283 }
284 }
285
286 true
287}