Skip to main content

orderbook_rs/orderbook/sequencer/
replay.rs

1//! Deterministic replay engine for event journals.
2//!
3//! [`ReplayEngine`] reads a sequence of [`SequencerEvent`]s from a [`Journal`]
4//! and re-applies each command to a fresh [`OrderBook`], producing an
5//! identical final state. This enables disaster recovery, audit compliance,
6//! and state verification.
7
8use super::error::JournalError;
9use super::journal::Journal;
10use super::types::{SequencerCommand, SequencerEvent, SequencerResult};
11use crate::orderbook::{OrderBook, OrderBookError, OrderBookSnapshot};
12use serde::{Deserialize, Serialize};
13use std::marker::PhantomData;
14use thiserror::Error;
15
16/// Errors that can occur during journal replay.
17#[derive(Debug, Error)]
18pub enum ReplayError {
19    /// The journal contains no events to replay.
20    #[error("journal is empty — nothing to replay")]
21    EmptyJournal,
22
23    /// The requested starting sequence number exceeds the journal's last entry.
24    #[error("invalid from_sequence {from_sequence}: journal last sequence is {last_sequence}")]
25    InvalidSequence {
26        /// The sequence number requested.
27        from_sequence: u64,
28        /// The last sequence number in the journal.
29        last_sequence: u64,
30    },
31
32    /// A gap was detected between expected and found sequence numbers.
33    #[error("sequence gap detected: expected {expected}, found {found}")]
34    SequenceGap {
35        /// The expected next sequence number.
36        expected: u64,
37        /// The actual sequence number found.
38        found: u64,
39    },
40
41    /// An OrderBook operation failed during replay.
42    #[error("order book error during replay at sequence {sequence_num}: {source}")]
43    OrderBookError {
44        /// The sequence number of the event that caused the error.
45        sequence_num: u64,
46        /// The underlying error.
47        #[source]
48        source: OrderBookError,
49    },
50
51    /// The replayed state does not match the expected snapshot.
52    #[error("snapshot mismatch: replayed state diverges from expected snapshot")]
53    SnapshotMismatch,
54
55    /// Journal read error during replay.
56    #[error("journal error during replay: {0}")]
57    JournalError(#[from] JournalError),
58}
59
/// Stateless replay engine that reconstructs [`OrderBook`] state from a [`Journal`].
///
/// All methods are associated functions (no `&self` receiver) — `ReplayEngine`
/// holds no state itself. Use it as a namespace for replay operations, e.g.
/// `ReplayEngine::<T>::replay_from(..)`.
///
/// The type parameter `T` is the same parameter passed to the [`Journal`] and
/// [`OrderBook`] types that the replay functions operate on.
pub struct ReplayEngine<T> {
    // Zero-sized marker: lets the struct carry the type parameter `T` without
    // storing a value of that type.
    _phantom: PhantomData<T>,
}
67
68impl<T> ReplayEngine<T>
69where
70    T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + Default + 'static,
71{
72    /// Replays all events from `from_sequence` onwards onto a fresh [`OrderBook`].
73    ///
74    /// Returns the reconstructed book and the sequence number of the last
75    /// event applied. Only successful commands (non-`Rejected` results) are
76    /// replayed — rejected events are skipped without error.
77    ///
78    /// # Arguments
79    ///
80    /// * `journal` — the event source
81    /// * `from_sequence` — first sequence number to include (inclusive); pass `0` for full replay
82    /// * `symbol` — symbol used to create the fresh OrderBook
83    ///
84    /// # Errors
85    ///
86    /// - [`ReplayError::EmptyJournal`] if the journal has no events
87    /// - [`ReplayError::InvalidSequence`] if `from_sequence` > last journal sequence
88    /// - [`ReplayError::OrderBookError`] if a command fails unexpectedly during replay
89    /// - [`ReplayError::JournalError`] if reading from the journal fails
90    pub fn replay_from(
91        journal: &impl Journal<T>,
92        from_sequence: u64,
93        symbol: &str,
94    ) -> Result<(OrderBook<T>, u64), ReplayError> {
95        Self::replay_from_with_progress(journal, from_sequence, symbol, |_, _| {})
96    }
97
98    /// Replays events with a progress callback invoked after each applied event.
99    ///
100    /// The callback receives `(events_applied: u64, current_sequence: u64)`.
101    /// Useful for long replays where progress reporting is needed.
102    ///
103    /// # Arguments
104    ///
105    /// * `journal` — the event source
106    /// * `from_sequence` — first sequence number to include; pass `0` for full replay
107    /// * `symbol` — symbol for the fresh OrderBook
108    /// * `progress` — callback invoked after each event: `(events_applied, sequence_num)`
109    ///
110    /// # Errors
111    ///
112    /// Same as [`replay_from`](Self::replay_from).
113    pub fn replay_from_with_progress(
114        journal: &impl Journal<T>,
115        from_sequence: u64,
116        symbol: &str,
117        progress: impl Fn(u64, u64),
118    ) -> Result<(OrderBook<T>, u64), ReplayError> {
119        let last_seq = match journal.last_sequence() {
120            Some(seq) => seq,
121            None => return Err(ReplayError::EmptyJournal),
122        };
123
124        if from_sequence > last_seq {
125            return Err(ReplayError::InvalidSequence {
126                from_sequence,
127                last_sequence: last_seq,
128            });
129        }
130
131        let book = OrderBook::new(symbol);
132        let mut last_applied_seq = 0u64;
133        let mut count = 0u64;
134        let mut expected_seq = from_sequence;
135
136        let iter = journal.read_from(from_sequence)?;
137
138        for entry_result in iter {
139            let entry = entry_result?;
140            let event = &entry.event;
141
142            // Gap detection
143            if event.sequence_num != expected_seq {
144                return Err(ReplayError::SequenceGap {
145                    expected: expected_seq,
146                    found: event.sequence_num,
147                });
148            }
149
150            Self::apply_event(&book, event)?;
151            last_applied_seq = event.sequence_num;
152            count = count.saturating_add(1);
153            expected_seq = expected_seq.saturating_add(1);
154            progress(count, last_applied_seq);
155        }
156
157        Ok((book, last_applied_seq))
158    }
159
160    /// Replays the full journal and compares the result to an expected snapshot.
161    ///
162    /// Returns `Ok(true)` if the replayed state matches, `Ok(false)` if it
163    /// diverges. The comparison uses [`snapshots_match`] which checks symbol,
164    /// bid price levels, and ask price levels.
165    ///
166    /// # Errors
167    ///
168    /// - [`ReplayError::EmptyJournal`] if the journal has no events
169    /// - [`ReplayError::OrderBookError`] if replay fails
170    /// - [`ReplayError::JournalError`] if reading from the journal fails
171    pub fn verify(
172        journal: &impl Journal<T>,
173        expected_snapshot: &OrderBookSnapshot,
174    ) -> Result<bool, ReplayError> {
175        let (book, _) = Self::replay_from(journal, 0, &expected_snapshot.symbol)?;
176        let actual = book.create_snapshot(usize::MAX);
177        Ok(snapshots_match(&actual, expected_snapshot))
178    }
179
180    /// Applies a single sequencer event to the given book.
181    ///
182    /// Events with `Rejected` results are skipped — they represent commands
183    /// that failed at write time and must not be re-applied during replay.
184    fn apply_event(book: &OrderBook<T>, event: &SequencerEvent<T>) -> Result<(), ReplayError> {
185        // Skip events whose original execution was rejected.
186        if matches!(event.result, SequencerResult::Rejected { .. }) {
187            return Ok(());
188        }
189
190        match &event.command {
191            SequencerCommand::AddOrder(order) => {
192                book.add_order(order.clone())
193                    .map_err(|e| ReplayError::OrderBookError {
194                        sequence_num: event.sequence_num,
195                        source: e,
196                    })?;
197            }
198            SequencerCommand::CancelOrder(id) => {
199                book.cancel_order(*id)
200                    .map_err(|e| ReplayError::OrderBookError {
201                        sequence_num: event.sequence_num,
202                        source: e,
203                    })?;
204            }
205            SequencerCommand::UpdateOrder(update) => {
206                book.update_order(*update)
207                    .map_err(|e| ReplayError::OrderBookError {
208                        sequence_num: event.sequence_num,
209                        source: e,
210                    })?;
211            }
212            SequencerCommand::MarketOrder { id, quantity, side } => {
213                book.submit_market_order(*id, *quantity, *side)
214                    .map_err(|e| ReplayError::OrderBookError {
215                        sequence_num: event.sequence_num,
216                        source: e,
217                    })?;
218            }
219            SequencerCommand::CancelAll => {
220                let _ = book.cancel_all_orders();
221            }
222            SequencerCommand::CancelBySide { side } => {
223                let _ = book.cancel_orders_by_side(*side);
224            }
225            SequencerCommand::CancelByUser { user_id } => {
226                let _ = book.cancel_orders_by_user(*user_id);
227            }
228            SequencerCommand::CancelByPriceRange {
229                side,
230                min_price,
231                max_price,
232            } => {
233                let _ = book.cancel_orders_by_price_range(*side, *min_price, *max_price);
234            }
235        }
236
237        Ok(())
238    }
239}
240
241/// Compares two [`OrderBookSnapshot`]s for structural equality.
242///
243/// Two snapshots are considered equal when:
244/// - `symbol` is identical
245/// - The sorted bid price levels match (by price, then visible quantity)
246/// - The sorted ask price levels match (by price, then visible quantity)
247///
248/// Timestamps are intentionally excluded from comparison because replayed
249/// books may be created at a different wall-clock time than the original.
250#[must_use]
251pub fn snapshots_match(actual: &OrderBookSnapshot, expected: &OrderBookSnapshot) -> bool {
252    if actual.symbol != expected.symbol {
253        return false;
254    }
255
256    // Compare bids sorted by price descending (highest bid first)
257    let mut actual_bids: Vec<_> = actual.bids.iter().collect();
258    let mut expected_bids: Vec<_> = expected.bids.iter().collect();
259    actual_bids.sort_by_key(|b| std::cmp::Reverse(b.price()));
260    expected_bids.sort_by_key(|b| std::cmp::Reverse(b.price()));
261
262    if actual_bids.len() != expected_bids.len() {
263        return false;
264    }
265    for (a, b) in actual_bids.iter().zip(expected_bids.iter()) {
266        if a.price() != b.price() || a.visible_quantity() != b.visible_quantity() {
267            return false;
268        }
269    }
270
271    // Compare asks sorted by price ascending (lowest ask first)
272    let mut actual_asks: Vec<_> = actual.asks.iter().collect();
273    let mut expected_asks: Vec<_> = expected.asks.iter().collect();
274    actual_asks.sort_by_key(|l| l.price());
275    expected_asks.sort_by_key(|l| l.price());
276
277    if actual_asks.len() != expected_asks.len() {
278        return false;
279    }
280    for (a, b) in actual_asks.iter().zip(expected_asks.iter()) {
281        if a.price() != b.price() || a.visible_quantity() != b.visible_quantity() {
282            return false;
283        }
284    }
285
286    true
287}