Skip to main content

nautilus_event_store/
replay.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bootstrap replay for restoring cache state after a cache-owned snapshot.
17//!
18//! This module is deliberately state-only: it consumes event-store entries, decodes the
19//! cache-affecting payloads, and mutates [`nautilus_common::cache::Cache`] directly. It
20//! does not publish to the live message bus, send commands, invoke adapters, or submit
21//! entries back into the event store.
22
23use std::{fmt::Display, path::PathBuf};
24
25use nautilus_common::{
26    cache::Cache,
27    messages::{
28        data::{
29            BarsResponse, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
30            QuotesResponse, TradesResponse,
31        },
32        execution::SubmitOrderList,
33    },
34};
35use nautilus_core::UnixNanos;
36use nautilus_model::{
37    data::{Bar, QuoteTick, TradeTick},
38    enums::OmsType,
39    events::{
40        AccountState, OrderEventAny, OrderFilled, OrderInitialized, PositionAdjusted,
41        PositionChanged, PositionClosed, PositionOpened,
42    },
43    orders::OrderAny,
44    position::Position,
45};
46use serde::de::DeserializeOwned;
47
48#[cfg(test)]
49use crate::capture::builtins::{
50    PAYLOAD_TYPE_BATCH_CANCEL_ORDERS, PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
51    PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE, PAYLOAD_TYPE_BOOK_RESPONSE, PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
52    PAYLOAD_TYPE_CANCEL_ORDER, PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE,
53    PAYLOAD_TYPE_EXECUTION_MASS_STATUS, PAYLOAD_TYPE_FILL_REPORT,
54    PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE, PAYLOAD_TYPE_MODIFY_ORDER,
55    PAYLOAD_TYPE_ORDER_STATUS_REPORT, PAYLOAD_TYPE_ORDER_WITH_FILLS,
56    PAYLOAD_TYPE_POSITION_STATUS_REPORT, PAYLOAD_TYPE_QUERY_ACCOUNT, PAYLOAD_TYPE_QUERY_ORDER,
57    PAYLOAD_TYPE_REQUEST_COMMAND, PAYLOAD_TYPE_SUBMIT_ORDER, PAYLOAD_TYPE_SUBSCRIBE_COMMAND,
58    PAYLOAD_TYPE_TIME_EVENT, PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND,
59};
60#[cfg(all(test, feature = "defi"))]
61use crate::capture::builtins::{
62    PAYLOAD_TYPE_DEFI_REQUEST_COMMAND, PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND,
63    PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND,
64};
65use crate::{
66    RedbBackend,
67    backend::{EventStore, ScanDirection},
68    capture::builtins::{
69        PAYLOAD_TYPE_ACCOUNT_STATE, PAYLOAD_TYPE_BARS_RESPONSE,
70        PAYLOAD_TYPE_FUNDING_RATES_RESPONSE, PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
71        PAYLOAD_TYPE_INSTRUMENTS_RESPONSE, PAYLOAD_TYPE_ORDER_ACCEPTED,
72        PAYLOAD_TYPE_ORDER_CANCEL_REJECTED, PAYLOAD_TYPE_ORDER_CANCELED, PAYLOAD_TYPE_ORDER_DENIED,
73        PAYLOAD_TYPE_ORDER_EMULATED, PAYLOAD_TYPE_ORDER_EXPIRED, PAYLOAD_TYPE_ORDER_FILLED,
74        PAYLOAD_TYPE_ORDER_INITIALIZED, PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
75        PAYLOAD_TYPE_ORDER_PENDING_CANCEL, PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
76        PAYLOAD_TYPE_ORDER_REJECTED, PAYLOAD_TYPE_ORDER_RELEASED, PAYLOAD_TYPE_ORDER_SUBMITTED,
77        PAYLOAD_TYPE_ORDER_TRIGGERED, PAYLOAD_TYPE_ORDER_UPDATED, PAYLOAD_TYPE_POSITION_ADJUSTED,
78        PAYLOAD_TYPE_POSITION_CHANGED, PAYLOAD_TYPE_POSITION_CLOSED, PAYLOAD_TYPE_POSITION_OPENED,
79        PAYLOAD_TYPE_QUOTES_RESPONSE, PAYLOAD_TYPE_SUBMIT_ORDER_LIST, PAYLOAD_TYPE_TRADES_RESPONSE,
80    },
81    entry::EventStoreEntry,
82    error::EventStoreError,
83    manifest::{RunManifest, RunStatus},
84    reader::{EventStoreReader, SnapshotReplayPlan},
85    snapshot::{SnapshotAnchor, compute_snapshot_content_hash},
86};
87
88#[cfg(feature = "persistence")]
89mod catalog;
90
91#[cfg(feature = "persistence")]
92pub use catalog::ParquetReplayCatalog;
93
94/// Summary of a cache snapshot-tail replay.
95#[derive(Clone, Debug, PartialEq, Eq)]
96pub struct CacheReplayReport {
97    /// Replay bounds derived from the latest snapshot anchor.
98    pub plan: SnapshotReplayPlan,
99    /// Number of entries applied to cache state.
100    pub applied_entries: usize,
101    /// Number of event-store entries that do not have a cache replay rule yet.
102    pub ignored_entries: usize,
103}
104
105/// Summary of an event-store replay source and cache restore.
106#[derive(Clone, Debug, PartialEq, Eq)]
107pub struct EventStoreReplayReport {
108    /// Manifest of the sealed replay source.
109    pub manifest: RunManifest,
110    /// Cache snapshot-tail replay result.
111    pub cache: CacheReplayReport,
112}
113
/// Test-only list of captured payload type tags.
///
/// NOTE(review): presumably the tags that have a cache replay rule; confirm against the
/// dispatch in `apply_cache_replay_entry`.
#[cfg(test)]
pub(crate) const CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES: &[&str] = &[
    PAYLOAD_TYPE_SUBMIT_ORDER_LIST,
    PAYLOAD_TYPE_ACCOUNT_STATE,
    PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
    PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
    PAYLOAD_TYPE_QUOTES_RESPONSE,
    PAYLOAD_TYPE_TRADES_RESPONSE,
    PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
    PAYLOAD_TYPE_BARS_RESPONSE,
    PAYLOAD_TYPE_ORDER_INITIALIZED,
    PAYLOAD_TYPE_ORDER_DENIED,
    PAYLOAD_TYPE_ORDER_EMULATED,
    PAYLOAD_TYPE_ORDER_RELEASED,
    PAYLOAD_TYPE_ORDER_SUBMITTED,
    PAYLOAD_TYPE_ORDER_ACCEPTED,
    PAYLOAD_TYPE_ORDER_REJECTED,
    PAYLOAD_TYPE_ORDER_CANCELED,
    PAYLOAD_TYPE_ORDER_EXPIRED,
    PAYLOAD_TYPE_ORDER_TRIGGERED,
    PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
    PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
    PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
    PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
    PAYLOAD_TYPE_ORDER_UPDATED,
    PAYLOAD_TYPE_ORDER_FILLED,
    PAYLOAD_TYPE_POSITION_OPENED,
    PAYLOAD_TYPE_POSITION_CHANGED,
    PAYLOAD_TYPE_POSITION_CLOSED,
    PAYLOAD_TYPE_POSITION_ADJUSTED,
];
145
/// Test-only list of captured payload type tags.
///
/// NOTE(review): presumably the tags that are captured for forensics but never applied to
/// the cache during replay; confirm against the dispatch in `apply_cache_replay_entry`.
/// The three DeFi command tags are only present when the `defi` feature is enabled.
#[cfg(test)]
pub(crate) const FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES: &[&str] = &[
    PAYLOAD_TYPE_SUBMIT_ORDER,
    PAYLOAD_TYPE_MODIFY_ORDER,
    PAYLOAD_TYPE_CANCEL_ORDER,
    PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
    PAYLOAD_TYPE_BATCH_CANCEL_ORDERS,
    PAYLOAD_TYPE_QUERY_ORDER,
    PAYLOAD_TYPE_QUERY_ACCOUNT,
    PAYLOAD_TYPE_ORDER_STATUS_REPORT,
    PAYLOAD_TYPE_FILL_REPORT,
    PAYLOAD_TYPE_ORDER_WITH_FILLS,
    PAYLOAD_TYPE_POSITION_STATUS_REPORT,
    PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
    PAYLOAD_TYPE_TIME_EVENT,
    PAYLOAD_TYPE_REQUEST_COMMAND,
    PAYLOAD_TYPE_SUBSCRIBE_COMMAND,
    PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND,
    #[cfg(feature = "defi")]
    PAYLOAD_TYPE_DEFI_REQUEST_COMMAND,
    #[cfg(feature = "defi")]
    PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND,
    #[cfg(feature = "defi")]
    PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND,
    PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE,
    PAYLOAD_TYPE_BOOK_RESPONSE,
    PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
    PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE,
    PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE,
];
176
/// Inclusive event-store `seq` bounds for replay input scans.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ReplaySeqRange {
    /// First event-store `seq` to scan.
    pub from_seq: u64,
    /// Last event-store `seq` to scan.
    pub to_seq: u64,
}

impl ReplaySeqRange {
    /// Builds inclusive event-store `seq` bounds.
    ///
    /// The bounds are stored as given; no ordering or non-zero check is performed here.
    #[must_use]
    pub const fn new(from_seq: u64, to_seq: u64) -> Self {
        Self { from_seq, to_seq }
    }
}
193
194/// Inclusive nanosecond time bounds for catalog slice selection.
195#[derive(Clone, Copy, Debug, PartialEq, Eq)]
196pub struct ReplayTimeRange {
197    /// First catalog timestamp to include.
198    pub start: UnixNanos,
199    /// Last catalog timestamp to include.
200    pub end: UnixNanos,
201}
202
203impl ReplayTimeRange {
204    /// Builds inclusive nanosecond time bounds.
205    #[must_use]
206    pub const fn new(start: UnixNanos, end: UnixNanos) -> Self {
207        Self { start, end }
208    }
209
210    fn from_entry(entry: &EventStoreEntry) -> Self {
211        Self {
212            start: entry.ts_init,
213            end: entry.ts_init,
214        }
215    }
216
217    fn include_entry(&mut self, entry: &EventStoreEntry) {
218        self.start = self.start.min(entry.ts_init);
219        self.end = self.end.max(entry.ts_init);
220    }
221}
222
223/// Caller-selected data catalog slice before replay window defaults are applied.
224#[derive(Clone, Debug, PartialEq, Eq)]
225pub struct CatalogSliceSelector {
226    /// Catalog data class or directory name, such as `quotes`, `trades`, or `bars`.
227    pub data_cls: String,
228    /// Optional catalog identifiers, such as instrument IDs or bar type strings.
229    pub identifiers: Vec<String>,
230    /// Optional lower timestamp bound. When absent, the event-store scan lower bound applies.
231    pub start: Option<UnixNanos>,
232    /// Optional upper timestamp bound. When absent, the event-store scan upper bound applies.
233    pub end: Option<UnixNanos>,
234    /// Whether loading should fail when the catalog reports no files for this slice.
235    pub required: bool,
236}
237
238impl CatalogSliceSelector {
239    /// Builds a selector for `data_cls` with no identifiers or explicit time bounds.
240    pub fn new(data_cls: impl Into<String>) -> Self {
241        Self {
242            data_cls: data_cls.into(),
243            identifiers: Vec::new(),
244            start: None,
245            end: None,
246            required: false,
247        }
248    }
249
250    /// Adds one catalog identifier to the selector.
251    #[must_use]
252    pub fn with_identifier(mut self, identifier: impl Into<String>) -> Self {
253        self.identifiers.push(identifier.into());
254        self
255    }
256
257    /// Sets explicit inclusive catalog time bounds.
258    #[must_use]
259    pub const fn with_time_bounds(mut self, start: UnixNanos, end: UnixNanos) -> Self {
260        self.start = Some(start);
261        self.end = Some(end);
262        self
263    }
264
265    /// Marks the selector as required.
266    #[must_use]
267    pub const fn require_coverage(mut self) -> Self {
268        self.required = true;
269        self
270    }
271}
272
273/// Resolved catalog query after replay time bounds have been applied.
274#[derive(Clone, Debug, PartialEq, Eq)]
275pub struct CatalogSliceQuery {
276    /// Catalog data class or directory name, such as `quotes`, `trades`, or `bars`.
277    pub data_cls: String,
278    /// Catalog identifiers, such as instrument IDs or bar type strings.
279    pub identifiers: Vec<String>,
280    /// Inclusive lower timestamp bound.
281    pub start: UnixNanos,
282    /// Inclusive upper timestamp bound.
283    pub end: UnixNanos,
284    /// Whether loading should fail when the catalog reports no files for this slice.
285    pub required: bool,
286}
287
288impl CatalogSliceQuery {
289    /// Returns identifiers in the shape expected by catalog APIs.
290    #[must_use]
291    pub fn identifiers_option(&self) -> Option<Vec<String>> {
292        if self.identifiers.is_empty() {
293            None
294        } else {
295            Some(self.identifiers.clone())
296        }
297    }
298}
299
300/// Catalog file and interval coverage for a planned slice.
301#[derive(Clone, Debug, Default, PartialEq, Eq)]
302pub struct CatalogSliceCoverage {
303    /// Catalog files selected for the slice.
304    pub files: Vec<String>,
305    /// Covered timestamp intervals reported by the catalog.
306    pub intervals: Vec<ReplayTimeRange>,
307}
308
309impl CatalogSliceCoverage {
310    /// Builds coverage from selected catalog files.
311    #[must_use]
312    pub fn from_files(files: Vec<String>) -> Self {
313        Self {
314            files,
315            intervals: Vec::new(),
316        }
317    }
318
319    /// Returns whether the catalog found no files for the slice.
320    #[must_use]
321    pub fn is_missing(&self) -> bool {
322        self.files.is_empty()
323    }
324}
325
326/// Planned catalog slice joined to a replay input scan.
327#[derive(Clone, Debug, PartialEq, Eq)]
328pub struct CatalogSlicePlan {
329    /// Resolved catalog query.
330    pub query: CatalogSliceQuery,
331    /// Catalog coverage reported during planning.
332    pub coverage: CatalogSliceCoverage,
333}
334
335impl CatalogSlicePlan {
336    /// Returns whether the catalog reported no files for this slice.
337    #[must_use]
338    pub fn is_missing(&self) -> bool {
339        self.coverage.is_missing()
340    }
341}
342
343/// Typed catalog data loaded for replay context.
344#[derive(Clone, Copy, Debug, PartialEq, Eq)]
345pub enum CatalogReplayData {
346    /// Quote tick loaded from the `quotes` catalog.
347    Quote(QuoteTick),
348    /// Trade tick loaded from the `trades` catalog.
349    Trade(TradeTick),
350    /// Bar loaded from the `bars` catalog.
351    Bar(Bar),
352}
353
354impl CatalogReplayData {
355    /// Returns the catalog data class for this record.
356    #[must_use]
357    pub const fn data_cls(&self) -> &'static str {
358        match self {
359            Self::Quote(_) => "quotes",
360            Self::Trade(_) => "trades",
361            Self::Bar(_) => "bars",
362        }
363    }
364
365    /// Returns the catalog identifier for this record.
366    #[must_use]
367    pub fn identifier(&self) -> String {
368        match self {
369            Self::Quote(quote) => quote.instrument_id.to_string(),
370            Self::Trade(trade) => trade.instrument_id.to_string(),
371            Self::Bar(bar) => bar.bar_type.to_string(),
372        }
373    }
374
375    /// Returns the initialization timestamp for this record.
376    #[must_use]
377    pub const fn ts_init(&self) -> UnixNanos {
378        match self {
379            Self::Quote(quote) => quote.ts_init,
380            Self::Trade(trade) => trade.ts_init,
381            Self::Bar(bar) => bar.ts_init,
382        }
383    }
384}
385
386impl From<QuoteTick> for CatalogReplayData {
387    fn from(value: QuoteTick) -> Self {
388        Self::Quote(value)
389    }
390}
391
392impl From<TradeTick> for CatalogReplayData {
393    fn from(value: TradeTick) -> Self {
394        Self::Trade(value)
395    }
396}
397
398impl From<Bar> for CatalogReplayData {
399    fn from(value: Bar) -> Self {
400        Self::Bar(value)
401    }
402}
403
404/// Catalog record loaded for replay context.
405#[derive(Clone, Debug, PartialEq, Eq)]
406pub struct CatalogReplayRecord {
407    /// Catalog data class or directory name for the record.
408    pub data_cls: String,
409    /// Optional catalog identifier for the record.
410    pub identifier: Option<String>,
411    /// Record timestamp used for contextual joins.
412    pub ts_init: UnixNanos,
413    /// Typed catalog data loaded for contextual analysis.
414    pub data: CatalogReplayData,
415}
416
417impl CatalogReplayRecord {
418    /// Builds a typed catalog replay record.
419    #[must_use]
420    pub fn from_data(data: CatalogReplayData) -> Self {
421        Self {
422            data_cls: data.data_cls().to_string(),
423            identifier: Some(data.identifier()),
424            ts_init: data.ts_init(),
425            data,
426        }
427    }
428}
429
430/// Loaded catalog records for one planned slice.
431#[derive(Clone, Debug, PartialEq, Eq)]
432pub struct CatalogReplaySlice {
433    /// Planned catalog slice metadata.
434    pub plan: CatalogSlicePlan,
435    /// Loaded catalog records.
436    pub records: Vec<CatalogReplayRecord>,
437}
438
439/// Planned replay inputs for an event-store scan with optional catalog context.
440#[derive(Clone, Debug, PartialEq, Eq)]
441pub struct ReplayInputPlan {
442    /// Requested event-store `seq` bounds.
443    pub requested_range: ReplaySeqRange,
444    /// Actual event-store range found inside the requested bounds.
445    pub event_range: Option<ReplaySeqRange>,
446    /// Number of event-store entries found inside the requested bounds.
447    pub event_count: usize,
448    /// Minimum and maximum event-store `ts_init` values inside the requested bounds.
449    pub event_time_range: Option<ReplayTimeRange>,
450    /// Catalog slices joined to the event-store scan.
451    pub catalog_slices: Vec<CatalogSlicePlan>,
452}
453
454impl ReplayInputPlan {
455    /// Returns all catalog slices that had no selected files.
456    #[must_use]
457    pub fn missing_catalog_slices(&self) -> Vec<&CatalogSlicePlan> {
458        self.catalog_slices
459            .iter()
460            .filter(|slice| slice.is_missing())
461            .collect()
462    }
463}
464
465/// Loaded replay inputs with event-store entries and optional catalog context.
466#[derive(Clone, Debug, PartialEq, Eq)]
467pub struct ReplayInputs {
468    /// Event-store entries in durable `seq` order.
469    pub entries: Vec<EventStoreEntry>,
470    /// Catalog slices loaded as contextual input.
471    pub catalog_slices: Vec<CatalogReplaySlice>,
472}
473
474/// Read-only catalog source used by catalog-joined replay input loaders.
475pub trait ReplayCatalog {
476    /// Catalog-specific error type.
477    type Error: Display;
478
479    /// Plans one catalog slice without mutating catalog state.
480    ///
481    /// # Errors
482    ///
483    /// Returns the catalog implementation's error when slice planning fails.
484    fn plan_slice(
485        &mut self,
486        query: &CatalogSliceQuery,
487    ) -> Result<CatalogSliceCoverage, Self::Error>;
488
489    /// Loads records for one planned catalog slice without live venue access.
490    ///
491    /// Implementations return records in catalog order so marker cursor joins can take a
492    /// deterministic prefix from a cumulative stream slice.
493    ///
494    /// # Errors
495    ///
496    /// Returns the catalog implementation's error when slice loading fails.
497    fn load_slice(
498        &mut self,
499        plan: &CatalogSlicePlan,
500    ) -> Result<Vec<CatalogReplayRecord>, Self::Error>;
501}
502
503/// Errors surfaced while planning or loading replay inputs.
504#[derive(Debug, thiserror::Error)]
505pub enum ReplayInputError {
506    /// The event-store reader failed.
507    #[error(transparent)]
508    EventStore(#[from] EventStoreError),
509    /// The requested event-store `seq` range is invalid.
510    #[error("invalid replay seq range {from_seq}..={to_seq}: {message}")]
511    InvalidSeqRange {
512        /// Requested lower `seq`.
513        from_seq: u64,
514        /// Requested upper `seq`.
515        to_seq: u64,
516        /// Validation failure.
517        message: String,
518    },
519    /// A catalog-joined replay plan had no selected catalog slices.
520    #[error("catalog replay requires at least one selected catalog slice")]
521    EmptyCatalogSelection,
522    /// A catalog slice needs time bounds, but neither selector nor event-store scan supplied them.
523    #[error(
524        "catalog slice {data_cls} requires explicit time bounds because the replay scan is empty"
525    )]
526    MissingCatalogTimeBounds {
527        /// Catalog data class or directory name.
528        data_cls: String,
529    },
530    /// A catalog slice has an invalid timestamp range.
531    #[error("invalid catalog time range for {data_cls}: {start}..={end}")]
532    InvalidCatalogTimeRange {
533        /// Catalog data class or directory name.
534        data_cls: String,
535        /// Lower timestamp bound.
536        start: u64,
537        /// Upper timestamp bound.
538        end: u64,
539    },
540    /// A required catalog slice had no files.
541    #[error("required catalog slice {data_cls} is missing for identifiers {identifiers:?}")]
542    MissingCatalogSlice {
543        /// Catalog data class or directory name.
544        data_cls: String,
545        /// Catalog identifiers.
546        identifiers: Vec<String>,
547    },
548    /// The catalog source failed.
549    #[error("catalog slice {data_cls}: {message}")]
550    Catalog {
551        /// Catalog data class or directory name.
552        data_cls: String,
553        /// Catalog error message.
554        message: String,
555    },
556}
557
558/// Errors surfaced while restoring a cache snapshot tail.
559#[derive(Debug, thiserror::Error)]
560pub enum CacheReplayError {
561    /// The event-store reader failed.
562    #[error(transparent)]
563    EventStore(#[from] EventStoreError),
564    /// The caller-provided snapshot restore hook failed.
565    #[error("restore cache snapshot {blob_ref}: {message}")]
566    SnapshotRestore {
567        /// Cache-owned snapshot blob reference.
568        blob_ref: String,
569        /// Error message returned by the restore hook.
570        message: String,
571    },
572    /// The replay scan yielded an entry outside the derived restore bounds.
573    #[error("entry seq {seq} is before replay start seq {from_seq}")]
574    UnexpectedSeq {
575        /// Entry sequence yielded by the scan.
576        seq: u64,
577        /// First sequence this replay is allowed to apply.
578        from_seq: u64,
579    },
580    /// A captured payload failed to decode.
581    #[error("decode seq {seq} payload_type {payload_type}: {message}")]
582    Decode {
583        /// Event-store sequence number.
584        seq: u64,
585        /// Captured payload type tag.
586        payload_type: String,
587        /// Decode error message.
588        message: String,
589    },
590    /// Applying a decoded payload to the cache failed.
591    #[error("apply seq {seq} payload_type {payload_type}: {message}")]
592    Apply {
593        /// Event-store sequence number.
594        seq: u64,
595        /// Captured payload type tag.
596        payload_type: String,
597        /// Apply error message.
598        message: String,
599    },
600}
601
602impl CacheReplayError {
603    /// Builds a snapshot-restore error for `anchor`.
604    #[must_use]
605    pub fn snapshot_restore(anchor: &SnapshotAnchor, error: impl Display) -> Self {
606        Self::SnapshotRestore {
607            blob_ref: anchor.blob_ref.clone(),
608            message: error.to_string(),
609        }
610    }
611}
612
613/// Replays the cache snapshot tail after the caller restores the cache-owned snapshot blob.
614///
615/// The restore hook runs before the tail iterator is consumed. When `anchor` is `Some`,
616/// the hook should fetch and apply the cache-owned blob identified by
617/// [`SnapshotAnchor::blob_ref`] and validate it against
618/// [`SnapshotAnchor::content_hash`]. When `anchor` is `None`, restore starts from
619/// event-store seq `1` and the hook may be a no-op.
620///
621/// This is a bootstrap path: it mutates cache state directly and never publishes replay
622/// entries to the live message bus.
623///
624/// # Errors
625///
626/// Returns [`CacheReplayError::EventStore`] when the reader fails, `restore_snapshot`'s
627/// error when the cache snapshot restore hook fails, [`CacheReplayError::Decode`] when
628/// a supported payload cannot be decoded, and [`CacheReplayError::Apply`] when the
629/// decoded payload cannot be applied to the cache.
630pub fn restore_cache_snapshot_and_replay_tail<B, F>(
631    cache: &mut Cache,
632    reader: &EventStoreReader<B>,
633    restore_snapshot: F,
634) -> Result<CacheReplayReport, CacheReplayError>
635where
636    B: EventStore,
637    F: FnOnce(&mut Cache, Option<&SnapshotAnchor>) -> Result<(), CacheReplayError>,
638{
639    let (plan, scan) = reader.scan_snapshot_replay_tail()?;
640    restore_snapshot(cache, plan.anchor.as_ref())?;
641
642    let mut applied_entries = 0;
643    let mut ignored_entries = 0;
644
645    for entry in scan {
646        let entry = entry?;
647
648        if entry.seq < plan.from_seq {
649            return Err(CacheReplayError::UnexpectedSeq {
650                seq: entry.seq,
651                from_seq: plan.from_seq,
652            });
653        }
654
655        if apply_cache_replay_entry(cache, &entry)? {
656            applied_entries += 1;
657        } else {
658            ignored_entries += 1;
659        }
660    }
661
662    Ok(CacheReplayReport {
663        plan,
664        applied_entries,
665        ignored_entries,
666    })
667}
668
669/// Replays the cache snapshot tail when the cache snapshot has already been restored.
670///
671/// This is a convenience wrapper for callers that load the cache-owned snapshot blob
672/// before entering the event-store replay path.
673///
674/// # Errors
675///
676/// See [`restore_cache_snapshot_and_replay_tail`].
677pub fn replay_cache_snapshot_tail<B>(
678    cache: &mut Cache,
679    reader: &EventStoreReader<B>,
680) -> Result<CacheReplayReport, CacheReplayError>
681where
682    B: EventStore,
683{
684    restore_cache_snapshot_and_replay_tail(cache, reader, |_, _| Ok(()))
685}
686
687/// Plans event-store-only forensics replay inputs.
688///
689/// The plan scans the requested range in durable `seq` order and records only summary
690/// metadata. Use [`load_forensics_replay_inputs`] to materialize the entries.
691///
692/// # Errors
693///
694/// Returns [`ReplayInputError::InvalidSeqRange`] when `range` is invalid and
695/// [`ReplayInputError::EventStore`] when the reader scan fails.
696pub fn plan_forensics_replay_inputs<B>(
697    reader: &EventStoreReader<B>,
698    range: ReplaySeqRange,
699) -> Result<ReplayInputPlan, ReplayInputError>
700where
701    B: EventStore,
702{
703    let span = collect_replay_entry_span(reader, range)?;
704    Ok(ReplayInputPlan {
705        requested_range: range,
706        event_range: span.event_range,
707        event_count: span.event_count,
708        event_time_range: span.time_range,
709        catalog_slices: Vec::new(),
710    })
711}
712
713/// Loads event-store-only forensics replay inputs.
714///
715/// Entries are returned in durable `seq` order. This function does not touch the data catalog,
716/// live venues, strategy code, reconciliation, or clocks.
717///
718/// # Errors
719///
720/// Returns [`ReplayInputError::EventStore`] when the reader scan fails.
721pub fn load_forensics_replay_inputs<B>(
722    reader: &EventStoreReader<B>,
723    plan: &ReplayInputPlan,
724) -> Result<ReplayInputs, ReplayInputError>
725where
726    B: EventStore,
727{
728    let entries = load_replay_entries(reader, plan.requested_range)?;
729    Ok(ReplayInputs {
730        entries,
731        catalog_slices: Vec::new(),
732    })
733}
734
735/// Plans replay inputs by joining event-store entries with selected catalog slices.
736///
737/// The event-store range supplies durable replay order. Catalog slices are contextual input
738/// selected by the caller; their timestamps bound data lookup but never replace `seq` ordering.
739///
740/// # Errors
741///
742/// Returns [`ReplayInputError::EmptyCatalogSelection`] when no catalog slices are selected,
743/// [`ReplayInputError::InvalidSeqRange`] when `range` is invalid,
744/// [`ReplayInputError::MissingCatalogTimeBounds`] when an unbounded selector cannot inherit
745/// bounds from an empty event-store scan, [`ReplayInputError::InvalidCatalogTimeRange`] when a
746/// resolved slice has `start > end`, [`ReplayInputError::Catalog`] when catalog planning fails,
747/// and [`ReplayInputError::EventStore`] when the reader scan fails.
748pub fn plan_catalog_replay_inputs<B, C>(
749    reader: &EventStoreReader<B>,
750    catalog: &mut C,
751    range: ReplaySeqRange,
752    catalog_slices: &[CatalogSliceSelector],
753) -> Result<ReplayInputPlan, ReplayInputError>
754where
755    B: EventStore,
756    C: ReplayCatalog,
757{
758    plan_catalog_joined_replay_inputs(reader, catalog, range, catalog_slices)
759}
760
761/// Loads catalog replay inputs from an existing plan.
762///
763/// Event-store entries are returned in durable `seq` order. Catalog records are loaded through
764/// the caller-provided catalog source only; this function does not query live venues or run engine
765/// logic.
766///
767/// # Errors
768///
769/// Returns [`ReplayInputError::MissingCatalogSlice`] when a required slice is missing,
770/// [`ReplayInputError::Catalog`] when catalog loading fails, and
771/// [`ReplayInputError::EventStore`] when the reader scan fails.
772pub fn load_catalog_replay_inputs<B, C>(
773    reader: &EventStoreReader<B>,
774    catalog: &mut C,
775    plan: &ReplayInputPlan,
776) -> Result<ReplayInputs, ReplayInputError>
777where
778    B: EventStore,
779    C: ReplayCatalog,
780{
781    load_catalog_joined_replay_inputs(reader, catalog, plan)
782}
783
784/// Restores cache state from a sealed run without publishing to the bus or touching live venues.
785///
786/// The loader opens `<base_dir>/<instance_id>/<run_id>.redb` through the sealed-run reader path,
787/// rejects quarantined sources, restores the cache-owned snapshot blob when an anchor exists, and
788/// applies the event-store tail in `seq` order. It does not open adapters, reconcile against a
789/// venue, submit new entries, or query the data catalog.
790///
791/// # Errors
792///
793/// Returns [`CacheReplayError::EventStore`] when the run is missing, not sealed, quarantined, or
794/// unreadable; see [`restore_cache_snapshot_and_replay_tail`] for snapshot, decode, and apply
795/// failures.
796pub fn restore_cache_from_sealed_run(
797    cache: &mut Cache,
798    base_dir: impl Into<PathBuf>,
799    instance_id: &str,
800    run_id: &str,
801) -> Result<EventStoreReplayReport, CacheReplayError> {
802    let (manifest, reader) = open_event_store_replay_source(base_dir, instance_id, run_id)?;
803    let cache_report =
804        restore_cache_snapshot_and_replay_tail(cache, &reader, restore_cache_snapshot_blob)?;
805
806    Ok(EventStoreReplayReport {
807        manifest,
808        cache: cache_report,
809    })
810}
811
812/// Opens a sealed run for replay without touching live venues.
813///
814/// # Errors
815///
816/// Returns [`CacheReplayError::EventStore`] when the run is missing, not sealed, quarantined, or
817/// unreadable.
818pub fn open_event_store_replay_source(
819    base_dir: impl Into<PathBuf>,
820    instance_id: &str,
821    run_id: &str,
822) -> Result<(RunManifest, EventStoreReader<RedbBackend>), CacheReplayError> {
823    let backend = RedbBackend::open_sealed(base_dir, instance_id, run_id)?;
824    let manifest = backend.manifest()?;
825    reject_quarantined_replay_source(run_id, manifest.status)?;
826    Ok((manifest, EventStoreReader::new(backend)))
827}
828
829/// Validates that a configured replay source exists, is sealed, and is not quarantined.
830///
831/// # Errors
832///
833/// Returns [`CacheReplayError::EventStore`] when the run is missing, not sealed, quarantined, or
834/// unreadable.
835pub fn validate_event_store_replay_source(
836    base_dir: impl Into<PathBuf>,
837    instance_id: &str,
838    run_id: &str,
839) -> Result<RunManifest, CacheReplayError> {
840    let backend = RedbBackend::open_sealed(base_dir, instance_id, run_id)?;
841    let manifest = backend.manifest()?;
842    reject_quarantined_replay_source(run_id, manifest.status)?;
843    Ok(manifest)
844}
845
846#[derive(Clone, Copy, Debug, PartialEq, Eq)]
847struct ReplayEntrySpan {
848    event_range: Option<ReplaySeqRange>,
849    event_count: usize,
850    time_range: Option<ReplayTimeRange>,
851}
852
853fn plan_catalog_joined_replay_inputs<B, C>(
854    reader: &EventStoreReader<B>,
855    catalog: &mut C,
856    range: ReplaySeqRange,
857    catalog_slices: &[CatalogSliceSelector],
858) -> Result<ReplayInputPlan, ReplayInputError>
859where
860    B: EventStore,
861    C: ReplayCatalog,
862{
863    if catalog_slices.is_empty() {
864        return Err(ReplayInputError::EmptyCatalogSelection);
865    }
866
867    let span = collect_replay_entry_span(reader, range)?;
868    let catalog_slices = plan_catalog_slices(catalog, catalog_slices, span.time_range)?;
869
870    Ok(ReplayInputPlan {
871        requested_range: range,
872        event_range: span.event_range,
873        event_count: span.event_count,
874        event_time_range: span.time_range,
875        catalog_slices,
876    })
877}
878
879fn load_catalog_joined_replay_inputs<B, C>(
880    reader: &EventStoreReader<B>,
881    catalog: &mut C,
882    plan: &ReplayInputPlan,
883) -> Result<ReplayInputs, ReplayInputError>
884where
885    B: EventStore,
886    C: ReplayCatalog,
887{
888    let entries = load_replay_entries(reader, plan.requested_range)?;
889    let catalog_slices = load_catalog_slices(catalog, &plan.catalog_slices)?;
890
891    Ok(ReplayInputs {
892        entries,
893        catalog_slices,
894    })
895}
896
897fn collect_replay_entry_span<B>(
898    reader: &EventStoreReader<B>,
899    range: ReplaySeqRange,
900) -> Result<ReplayEntrySpan, ReplayInputError>
901where
902    B: EventStore,
903{
904    validate_seq_range(range)?;
905
906    let mut first_seq = None;
907    let mut last_seq = None;
908    let mut event_count = 0;
909    let mut time_range: Option<ReplayTimeRange> = None;
910
911    for entry in reader.scan_range(range.from_seq, range.to_seq, ScanDirection::Forward) {
912        let entry = entry?;
913        first_seq.get_or_insert(entry.seq);
914        last_seq = Some(entry.seq);
915        event_count += 1;
916
917        match time_range.as_mut() {
918            Some(bounds) => bounds.include_entry(&entry),
919            None => time_range = Some(ReplayTimeRange::from_entry(&entry)),
920        }
921    }
922
923    let event_range = match (first_seq, last_seq) {
924        (Some(from_seq), Some(to_seq)) => Some(ReplaySeqRange::new(from_seq, to_seq)),
925        _ => None,
926    };
927
928    Ok(ReplayEntrySpan {
929        event_range,
930        event_count,
931        time_range,
932    })
933}
934
935fn load_replay_entries<B>(
936    reader: &EventStoreReader<B>,
937    range: ReplaySeqRange,
938) -> Result<Vec<EventStoreEntry>, ReplayInputError>
939where
940    B: EventStore,
941{
942    validate_seq_range(range)?;
943
944    reader
945        .scan_range(range.from_seq, range.to_seq, ScanDirection::Forward)
946        .collect::<Result<Vec<_>, _>>()
947        .map_err(ReplayInputError::from)
948}
949
950fn plan_catalog_slices<C>(
951    catalog: &mut C,
952    selectors: &[CatalogSliceSelector],
953    event_time_range: Option<ReplayTimeRange>,
954) -> Result<Vec<CatalogSlicePlan>, ReplayInputError>
955where
956    C: ReplayCatalog,
957{
958    let mut plans = Vec::with_capacity(selectors.len());
959
960    for selector in selectors {
961        let query = resolve_catalog_slice_query(selector, event_time_range)?;
962        let coverage = catalog
963            .plan_slice(&query)
964            .map_err(|e| ReplayInputError::Catalog {
965                data_cls: query.data_cls.clone(),
966                message: e.to_string(),
967            })?;
968        plans.push(CatalogSlicePlan { query, coverage });
969    }
970
971    Ok(plans)
972}
973
974fn load_catalog_slices<C>(
975    catalog: &mut C,
976    plans: &[CatalogSlicePlan],
977) -> Result<Vec<CatalogReplaySlice>, ReplayInputError>
978where
979    C: ReplayCatalog,
980{
981    let mut slices = Vec::with_capacity(plans.len());
982
983    for plan in plans {
984        if plan.is_missing() {
985            if plan.query.required {
986                return Err(ReplayInputError::MissingCatalogSlice {
987                    data_cls: plan.query.data_cls.clone(),
988                    identifiers: plan.query.identifiers.clone(),
989                });
990            }
991
992            slices.push(CatalogReplaySlice {
993                plan: plan.clone(),
994                records: Vec::new(),
995            });
996            continue;
997        }
998
999        let records = catalog
1000            .load_slice(plan)
1001            .map_err(|e| ReplayInputError::Catalog {
1002                data_cls: plan.query.data_cls.clone(),
1003                message: e.to_string(),
1004            })?;
1005        slices.push(CatalogReplaySlice {
1006            plan: plan.clone(),
1007            records,
1008        });
1009    }
1010
1011    Ok(slices)
1012}
1013
1014fn resolve_catalog_slice_query(
1015    selector: &CatalogSliceSelector,
1016    event_time_range: Option<ReplayTimeRange>,
1017) -> Result<CatalogSliceQuery, ReplayInputError> {
1018    let Some(start) = selector
1019        .start
1020        .or(event_time_range.map(|bounds| bounds.start))
1021    else {
1022        return Err(ReplayInputError::MissingCatalogTimeBounds {
1023            data_cls: selector.data_cls.clone(),
1024        });
1025    };
1026    let Some(end) = selector.end.or(event_time_range.map(|bounds| bounds.end)) else {
1027        return Err(ReplayInputError::MissingCatalogTimeBounds {
1028            data_cls: selector.data_cls.clone(),
1029        });
1030    };
1031
1032    if start > end {
1033        return Err(ReplayInputError::InvalidCatalogTimeRange {
1034            data_cls: selector.data_cls.clone(),
1035            start: start.as_u64(),
1036            end: end.as_u64(),
1037        });
1038    }
1039
1040    Ok(CatalogSliceQuery {
1041        data_cls: selector.data_cls.clone(),
1042        identifiers: selector.identifiers.clone(),
1043        start,
1044        end,
1045        required: selector.required,
1046    })
1047}
1048
1049fn validate_seq_range(range: ReplaySeqRange) -> Result<(), ReplayInputError> {
1050    if range.from_seq == 0 {
1051        return Err(ReplayInputError::InvalidSeqRange {
1052            from_seq: range.from_seq,
1053            to_seq: range.to_seq,
1054            message: "seq is 1-based".to_string(),
1055        });
1056    }
1057
1058    if range.from_seq > range.to_seq {
1059        return Err(ReplayInputError::InvalidSeqRange {
1060            from_seq: range.from_seq,
1061            to_seq: range.to_seq,
1062            message: "from_seq exceeds to_seq".to_string(),
1063        });
1064    }
1065
1066    Ok(())
1067}
1068
1069/// Restores the cache-owned snapshot blob identified by `anchor`.
1070///
1071/// # Errors
1072///
1073/// Returns [`CacheReplayError::SnapshotRestore`] when the blob is missing, fails to load, fails its
1074/// content hash check, or fails to restore into the cache.
1075pub fn restore_cache_snapshot_blob(
1076    cache: &mut Cache,
1077    anchor: Option<&SnapshotAnchor>,
1078) -> Result<(), CacheReplayError> {
1079    let Some(anchor) = anchor else {
1080        return Ok(());
1081    };
1082
1083    let blob = cache
1084        .load_snapshot_blob(&anchor.blob_ref)
1085        .map_err(|e| CacheReplayError::snapshot_restore(anchor, e))?
1086        .ok_or_else(|| CacheReplayError::snapshot_restore(anchor, "snapshot blob not found"))?;
1087    let actual_hash = compute_snapshot_content_hash(blob.as_ref());
1088
1089    if actual_hash != anchor.content_hash {
1090        return Err(CacheReplayError::snapshot_restore(
1091            anchor,
1092            format!(
1093                "content_hash mismatch: expected {}, actual {actual_hash}",
1094                anchor.content_hash
1095            ),
1096        ));
1097    }
1098
1099    cache
1100        .restore_snapshot_blob(&anchor.blob_ref, blob)
1101        .map_err(|e| CacheReplayError::snapshot_restore(anchor, e))
1102}
1103
1104/// Applies one event-store entry to cache state when a replay rule exists.
1105///
1106/// Returns `Ok(true)` when the entry changed cache state and `Ok(false)` when the
1107/// payload is outside the current cache bootstrap replay surface.
1108///
1109/// # Errors
1110///
1111/// Returns [`CacheReplayError::Decode`] when a supported payload cannot be decoded and
1112/// [`CacheReplayError::Apply`] when the decoded payload cannot be applied to the cache.
1113pub fn apply_cache_replay_entry(
1114    cache: &mut Cache,
1115    entry: &EventStoreEntry,
1116) -> Result<bool, CacheReplayError> {
1117    if apply_complete_cache_payload_entry(cache, entry)? {
1118        return Ok(true);
1119    }
1120
1121    match entry.payload_type.as_str() {
1122        PAYLOAD_TYPE_ACCOUNT_STATE => {
1123            let state = decode_payload::<AccountState>(entry)?;
1124            apply_result(entry, cache.update_account_state(&state))?;
1125        }
1126        PAYLOAD_TYPE_ORDER_INITIALIZED => {
1127            let event = decode_order_event::<OrderInitialized>(entry, OrderEventAny::Initialized)?;
1128            let order = OrderAny::from_events(vec![event]).map_err(|e| apply_error(entry, e))?;
1129            apply_result(entry, cache.add_order(order, None, None, false))?;
1130        }
1131        PAYLOAD_TYPE_ORDER_DENIED => {
1132            apply_order_event(cache, entry, OrderEventAny::Denied)?;
1133        }
1134        PAYLOAD_TYPE_ORDER_EMULATED => {
1135            apply_order_event(cache, entry, OrderEventAny::Emulated)?;
1136        }
1137        PAYLOAD_TYPE_ORDER_RELEASED => {
1138            apply_order_event(cache, entry, OrderEventAny::Released)?;
1139        }
1140        PAYLOAD_TYPE_ORDER_SUBMITTED => {
1141            apply_order_event(cache, entry, OrderEventAny::Submitted)?;
1142        }
1143        PAYLOAD_TYPE_ORDER_ACCEPTED => {
1144            apply_order_event(cache, entry, OrderEventAny::Accepted)?;
1145        }
1146        PAYLOAD_TYPE_ORDER_REJECTED => {
1147            apply_order_event(cache, entry, OrderEventAny::Rejected)?;
1148        }
1149        PAYLOAD_TYPE_ORDER_CANCELED => {
1150            apply_order_event(cache, entry, OrderEventAny::Canceled)?;
1151        }
1152        PAYLOAD_TYPE_ORDER_EXPIRED => {
1153            apply_order_event(cache, entry, OrderEventAny::Expired)?;
1154        }
1155        PAYLOAD_TYPE_ORDER_TRIGGERED => {
1156            apply_order_event(cache, entry, OrderEventAny::Triggered)?;
1157        }
1158        PAYLOAD_TYPE_ORDER_PENDING_UPDATE => {
1159            apply_order_event(cache, entry, OrderEventAny::PendingUpdate)?;
1160        }
1161        PAYLOAD_TYPE_ORDER_PENDING_CANCEL => {
1162            apply_order_event(cache, entry, OrderEventAny::PendingCancel)?;
1163        }
1164        PAYLOAD_TYPE_ORDER_MODIFY_REJECTED => {
1165            apply_order_event(cache, entry, OrderEventAny::ModifyRejected)?;
1166        }
1167        PAYLOAD_TYPE_ORDER_CANCEL_REJECTED => {
1168            apply_order_event(cache, entry, OrderEventAny::CancelRejected)?;
1169        }
1170        PAYLOAD_TYPE_ORDER_UPDATED => {
1171            apply_order_event(cache, entry, OrderEventAny::Updated)?;
1172        }
1173        PAYLOAD_TYPE_ORDER_FILLED => {
1174            let fill = decode_payload::<OrderFilled>(entry)?;
1175            let event = OrderEventAny::Filled(fill);
1176            apply_result(entry, cache.update_order(&event))?;
1177            apply_fill_to_position(cache, entry, &fill)?;
1178        }
1179        PAYLOAD_TYPE_POSITION_OPENED => {
1180            let opened = decode_payload::<PositionOpened>(entry)?;
1181            apply_position_opened(cache, entry, &opened)?;
1182        }
1183        PAYLOAD_TYPE_POSITION_CHANGED => {
1184            let changed = decode_payload::<PositionChanged>(entry)?;
1185            apply_position_changed(cache, entry, &changed)?;
1186        }
1187        PAYLOAD_TYPE_POSITION_CLOSED => {
1188            let closed = decode_payload::<PositionClosed>(entry)?;
1189            apply_position_closed(cache, entry, &closed)?;
1190        }
1191        PAYLOAD_TYPE_POSITION_ADJUSTED => {
1192            let adjustment = decode_payload::<PositionAdjusted>(entry)?;
1193            apply_position_adjustment(cache, entry, adjustment)?;
1194        }
1195        _ => return Ok(false),
1196    }
1197
1198    Ok(true)
1199}
1200
1201fn apply_complete_cache_payload_entry(
1202    cache: &mut Cache,
1203    entry: &EventStoreEntry,
1204) -> Result<bool, CacheReplayError> {
1205    match entry.payload_type.as_str() {
1206        PAYLOAD_TYPE_SUBMIT_ORDER_LIST => {
1207            let command = decode_payload::<SubmitOrderList>(entry)?;
1208            apply_result(entry, cache.add_order_list(command.order_list))?;
1209        }
1210        PAYLOAD_TYPE_INSTRUMENT_RESPONSE => {
1211            let response = decode_payload::<InstrumentResponse>(entry)?;
1212            apply_result(entry, cache.add_instrument(response.data))?;
1213        }
1214        PAYLOAD_TYPE_INSTRUMENTS_RESPONSE => {
1215            let response = decode_payload::<InstrumentsResponse>(entry)?;
1216            for instrument in response.data {
1217                apply_result(entry, cache.add_instrument(instrument))?;
1218            }
1219        }
1220        PAYLOAD_TYPE_QUOTES_RESPONSE => {
1221            let response = decode_payload::<QuotesResponse>(entry)?;
1222            if !response.data.is_empty() {
1223                apply_result(entry, cache.add_quotes(&response.data))?;
1224            }
1225        }
1226        PAYLOAD_TYPE_TRADES_RESPONSE => {
1227            let response = decode_payload::<TradesResponse>(entry)?;
1228            if !response.data.is_empty() {
1229                apply_result(entry, cache.add_trades(&response.data))?;
1230            }
1231        }
1232        PAYLOAD_TYPE_FUNDING_RATES_RESPONSE => {
1233            let response = decode_payload::<FundingRatesResponse>(entry)?;
1234            if !response.data.is_empty() {
1235                apply_result(entry, cache.add_funding_rates(&response.data))?;
1236            }
1237        }
1238        PAYLOAD_TYPE_BARS_RESPONSE => {
1239            let response = decode_payload::<BarsResponse>(entry)?;
1240            if !response.data.is_empty() {
1241                apply_result(entry, cache.add_bars(&response.data))?;
1242            }
1243        }
1244        _ => return Ok(false),
1245    }
1246
1247    Ok(true)
1248}
1249
1250fn apply_order_event<T>(
1251    cache: &mut Cache,
1252    entry: &EventStoreEntry,
1253    wrap: impl FnOnce(T) -> OrderEventAny,
1254) -> Result<(), CacheReplayError>
1255where
1256    T: DeserializeOwned,
1257{
1258    let event = decode_order_event(entry, wrap)?;
1259    apply_result(entry, cache.update_order(&event))?;
1260    Ok(())
1261}
1262
1263fn decode_order_event<T>(
1264    entry: &EventStoreEntry,
1265    wrap: impl FnOnce(T) -> OrderEventAny,
1266) -> Result<OrderEventAny, CacheReplayError>
1267where
1268    T: DeserializeOwned,
1269{
1270    Ok(wrap(decode_payload(entry)?))
1271}
1272
1273fn apply_fill_to_position(
1274    cache: &mut Cache,
1275    entry: &EventStoreEntry,
1276    fill: &OrderFilled,
1277) -> Result<(), CacheReplayError> {
1278    let Some(position_id) = fill.position_id else {
1279        return Ok(());
1280    };
1281
1282    if let Some(mut position) = cache.position_owned(&position_id) {
1283        if position.trade_ids().contains(&fill.trade_id) {
1284            return Ok(());
1285        }
1286
1287        position.apply(fill);
1288        apply_result(entry, cache.update_position(&position))?;
1289        return Ok(());
1290    }
1291
1292    let Some(instrument) = cache.instrument(&fill.instrument_id).cloned() else {
1293        return Ok(());
1294    };
1295
1296    let position = Position::new(&instrument, *fill);
1297    apply_result(entry, cache.add_position(&position, OmsType::Unspecified))?;
1298    Ok(())
1299}
1300
1301fn apply_position_opened(
1302    cache: &mut Cache,
1303    entry: &EventStoreEntry,
1304    opened: &PositionOpened,
1305) -> Result<(), CacheReplayError> {
1306    let Some(mut position) = cache.position_owned(&opened.position_id) else {
1307        return Ok(());
1308    };
1309
1310    position.trader_id = opened.trader_id;
1311    position.strategy_id = opened.strategy_id;
1312    position.instrument_id = opened.instrument_id;
1313    position.id = opened.position_id;
1314    position.account_id = opened.account_id;
1315    position.opening_order_id = opened.opening_order_id;
1316    position.closing_order_id = None;
1317    position.entry = opened.entry;
1318    position.side = opened.side;
1319    position.signed_qty = opened.signed_qty;
1320    position.quantity = opened.quantity;
1321    position.peak_qty = opened.quantity;
1322    position.quote_currency = opened.currency;
1323    position.ts_opened = opened.ts_event;
1324    position.ts_last = opened.ts_event;
1325    position.ts_closed = None;
1326    position.duration_ns = 0;
1327    position.avg_px_open = opened.avg_px_open;
1328    position.avg_px_close = None;
1329    position.realized_return = 0.0;
1330
1331    apply_result(entry, cache.update_position(&position))?;
1332    Ok(())
1333}
1334
1335fn apply_position_changed(
1336    cache: &mut Cache,
1337    entry: &EventStoreEntry,
1338    changed: &PositionChanged,
1339) -> Result<(), CacheReplayError> {
1340    let Some(mut position) = cache.position_owned(&changed.position_id) else {
1341        return Ok(());
1342    };
1343
1344    position.trader_id = changed.trader_id;
1345    position.strategy_id = changed.strategy_id;
1346    position.instrument_id = changed.instrument_id;
1347    position.id = changed.position_id;
1348    position.account_id = changed.account_id;
1349    position.opening_order_id = changed.opening_order_id;
1350    position.entry = changed.entry;
1351    position.side = changed.side;
1352    position.signed_qty = changed.signed_qty;
1353    position.quantity = changed.quantity;
1354    position.peak_qty = changed.peak_quantity;
1355    position.quote_currency = changed.currency;
1356    position.ts_opened = changed.ts_opened;
1357    position.ts_last = changed.ts_event;
1358    position.ts_closed = None;
1359    position.avg_px_open = changed.avg_px_open;
1360    position.avg_px_close = changed.avg_px_close;
1361    position.realized_return = changed.realized_return;
1362    position.realized_pnl = changed.realized_pnl;
1363
1364    apply_result(entry, cache.update_position(&position))?;
1365    Ok(())
1366}
1367
1368fn apply_position_closed(
1369    cache: &mut Cache,
1370    entry: &EventStoreEntry,
1371    closed: &PositionClosed,
1372) -> Result<(), CacheReplayError> {
1373    let Some(mut position) = cache.position_owned(&closed.position_id) else {
1374        return Ok(());
1375    };
1376
1377    position.trader_id = closed.trader_id;
1378    position.strategy_id = closed.strategy_id;
1379    position.instrument_id = closed.instrument_id;
1380    position.id = closed.position_id;
1381    position.account_id = closed.account_id;
1382    position.opening_order_id = closed.opening_order_id;
1383    position.closing_order_id = closed.closing_order_id;
1384    position.entry = closed.entry;
1385    position.side = closed.side;
1386    position.signed_qty = closed.signed_qty;
1387    position.quantity = closed.quantity;
1388    position.peak_qty = closed.peak_quantity;
1389    position.quote_currency = closed.currency;
1390    position.ts_opened = closed.ts_opened;
1391    position.ts_last = closed.ts_event;
1392    position.ts_closed = closed.ts_closed;
1393    position.duration_ns = closed.duration;
1394    position.avg_px_open = closed.avg_px_open;
1395    position.avg_px_close = closed.avg_px_close;
1396    position.realized_return = closed.realized_return;
1397    position.realized_pnl = closed.realized_pnl;
1398
1399    apply_result(entry, cache.update_position(&position))?;
1400    Ok(())
1401}
1402
1403fn apply_position_adjustment(
1404    cache: &mut Cache,
1405    entry: &EventStoreEntry,
1406    adjustment: PositionAdjusted,
1407) -> Result<(), CacheReplayError> {
1408    let Some(mut position) = cache.position_owned(&adjustment.position_id) else {
1409        return Ok(());
1410    };
1411
1412    position.apply_adjustment(adjustment);
1413    apply_result(entry, cache.update_position(&position))?;
1414    Ok(())
1415}
1416
1417fn decode_payload<T>(entry: &EventStoreEntry) -> Result<T, CacheReplayError>
1418where
1419    T: DeserializeOwned,
1420{
1421    rmp_serde::from_slice(&entry.payload).map_err(|e| CacheReplayError::Decode {
1422        seq: entry.seq,
1423        payload_type: entry.payload_type.to_string(),
1424        message: e.to_string(),
1425    })
1426}
1427
1428fn apply_result<T, E>(entry: &EventStoreEntry, result: Result<T, E>) -> Result<T, CacheReplayError>
1429where
1430    E: Display,
1431{
1432    result.map_err(|e| apply_error(entry, e))
1433}
1434
1435fn apply_error(entry: &EventStoreEntry, error: impl Display) -> CacheReplayError {
1436    CacheReplayError::Apply {
1437        seq: entry.seq,
1438        payload_type: entry.payload_type.to_string(),
1439        message: error.to_string(),
1440    }
1441}
1442
1443fn reject_quarantined_replay_source(
1444    run_id: &str,
1445    status: RunStatus,
1446) -> Result<(), CacheReplayError> {
1447    if matches!(status, RunStatus::Quarantined) {
1448        let error = EventStoreError::Backend(format!("replay source {run_id} is quarantined"));
1449        return Err(CacheReplayError::from(error));
1450    }
1451
1452    Ok(())
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457    use std::{any::Any, cell::Cell, rc::Rc};
1458
1459    use ahash::AHashSet;
1460    use bytes::Bytes;
1461    use indexmap::IndexMap;
1462    use nautilus_common::msgbus::{self, BusTap, Endpoint, MStr, Topic as BusTopic};
1463    use nautilus_core::{UUID4, UnixNanos};
1464    use nautilus_model::{
1465        accounts::AccountAny,
1466        data::{Bar, BarSpecification, BarType, FundingRateUpdate, QuoteTick, TradeTick},
1467        enums::{
1468            AggregationSource, AggressorSide, BarAggregation, OrderSide, OrderStatus,
1469            PositionAdjustmentType, PriceType,
1470        },
1471        events::{
1472            PositionEvent,
1473            account::stubs::{cash_account_state, cash_account_state_million_usd},
1474            order::spec::{
1475                OrderAcceptedSpec, OrderFilledSpec, OrderInitializedSpec, OrderSubmittedSpec,
1476            },
1477        },
1478        identifiers::{
1479            AccountId, ClientId, ClientOrderId, InstrumentId, OrderListId, PositionId, TradeId,
1480            VenueOrderId,
1481        },
1482        instruments::{Instrument, InstrumentAny, stubs::audusd_sim},
1483        orders::{Order, OrderList},
1484        types::{Currency, Money, Price, Quantity},
1485    };
1486    use rstest::rstest;
1487    use serde::Serialize;
1488    use tempfile::TempDir;
1489    use ustr::Ustr;
1490
1491    use super::*;
1492    use crate::{
1493        backend::{AppendEntry, MemoryBackend, RedbBackend},
1494        capture::{
1495            builtins::{
1496                DEFAULT_CAPTURE_PAYLOAD_TYPES, encode_order_event_any, encode_position_event,
1497            },
1498            encode_account_state,
1499        },
1500        entry::Topic as EntryTopic,
1501        hash::compute_entry_hash,
1502        headers::Headers,
1503        manifest::{RegisteredComponents, RunManifest, RunStatus},
1504        snapshot::SnapshotAnchor,
1505    };
1506
1507    fn manifest(run_id: &str) -> RunManifest {
1508        RunManifest {
1509            run_id: run_id.to_string(),
1510            parent_run_id: None,
1511            instance_id: "trader-001".to_string(),
1512            binary_hash: "deadbeef".to_string(),
1513            schema_version: 1,
1514            crate_versions: "feedface".to_string(),
1515            feature_flags: Vec::new(),
1516            adapter_versions: IndexMap::new(),
1517            config_hash: "cafebabe".to_string(),
1518            registered_components: RegisteredComponents::default(),
1519            seed: None,
1520            start_ts_init: UnixNanos::from(0),
1521            end_ts_init: None,
1522            high_watermark: 0,
1523            status: RunStatus::Running,
1524        }
1525    }
1526
1527    fn append_payload(seq: u64, payload_type: &str, payload: Bytes) -> AppendEntry {
1528        append_payload_with_ts(seq, seq, payload_type, payload)
1529    }
1530
1531    fn append_serde_payload<T: Serialize>(seq: u64, payload_type: &str, value: &T) -> AppendEntry {
1532        let payload = rmp_serde::to_vec_named(value).expect("encode replay payload");
1533        append_payload(seq, payload_type, Bytes::from(payload))
1534    }
1535
1536    fn append_payload_with_ts(
1537        seq: u64,
1538        ts_init: u64,
1539        payload_type: &str,
1540        payload: Bytes,
1541    ) -> AppendEntry {
1542        let topic = EntryTopic::from("events.account.SIM");
1543        let ts = UnixNanos::from(ts_init);
1544        let headers = Headers::empty();
1545        let hash = compute_entry_hash(
1546            seq,
1547            ts,
1548            ts,
1549            topic.as_ref(),
1550            payload_type,
1551            &payload,
1552            &headers,
1553        );
1554        let entry = EventStoreEntry::new(
1555            hash,
1556            seq,
1557            headers,
1558            topic,
1559            Ustr::from(payload_type),
1560            payload,
1561            ts,
1562            ts,
1563        );
1564        AppendEntry::without_indices(entry)
1565    }
1566
1567    fn append_account_state(seq: u64, state: &AccountState) -> AppendEntry {
1568        let encoded = encode_account_state(state).expect("encode account state");
1569        append_payload(seq, PAYLOAD_TYPE_ACCOUNT_STATE, encoded.payload)
1570    }
1571
1572    fn append_order_event(seq: u64, event: &OrderEventAny) -> AppendEntry {
1573        let encoded = encode_order_event_any(event).expect("encode order event");
1574        let payload_type = encoded.payload_type.expect("order payload type");
1575        append_payload(seq, payload_type.as_str(), encoded.payload)
1576    }
1577
1578    fn append_position_event(seq: u64, event: &PositionEvent) -> AppendEntry {
1579        let encoded = encode_position_event(event).expect("encode position event");
1580        let payload_type = encoded.payload_type.expect("position payload type");
1581        append_payload(seq, payload_type.as_str(), encoded.payload)
1582    }
1583
1584    fn reader_with_entries(
1585        run_id: &str,
1586        entries: &[AppendEntry],
1587    ) -> EventStoreReader<MemoryBackend> {
1588        let mut backend = MemoryBackend::new();
1589        backend.open_run(manifest(run_id)).expect("open");
1590        backend.append_batch(entries).expect("append");
1591        EventStoreReader::new(backend)
1592    }
1593
1594    fn reader_with_anchor(anchor_seq: u64) -> (EventStoreReader<MemoryBackend>, AccountState) {
1595        let anchored = cash_account_state();
1596        let replayed = cash_account_state_million_usd("200 USD", "0 USD", "200 USD");
1597        let mut backend = MemoryBackend::new();
1598        backend.open_run(manifest("run-replay")).expect("open");
1599        backend
1600            .append_batch(&[
1601                append_account_state(1, &anchored),
1602                append_account_state(2, &replayed),
1603            ])
1604            .expect("append");
1605        backend
1606            .record_snapshot_anchor(SnapshotAnchor::new(anchor_seq, "cache://account", "hash"))
1607            .expect("record anchor");
1608        (EventStoreReader::new(backend), replayed)
1609    }
1610
1611    fn catalog_quote_record(ts_init: u64) -> CatalogReplayRecord {
1612        let instrument_id = InstrumentId::from("AUD/USD.SIM");
1613        CatalogReplayRecord::from_data(CatalogReplayData::Quote(QuoteTick::new(
1614            instrument_id,
1615            Price::from("1.0001"),
1616            Price::from("1.0002"),
1617            Quantity::from("100"),
1618            Quantity::from("100"),
1619            UnixNanos::from(ts_init),
1620            UnixNanos::from(ts_init),
1621        )))
1622    }
1623
1624    fn catalog_trade_record(ts_init: u64) -> CatalogReplayRecord {
1625        let instrument_id = InstrumentId::from("AUD/USD.SIM");
1626        CatalogReplayRecord::from_data(CatalogReplayData::Trade(TradeTick::new(
1627            instrument_id,
1628            Price::from("1.0001"),
1629            Quantity::from("100"),
1630            AggressorSide::Buyer,
1631            TradeId::from("T-1"),
1632            UnixNanos::from(ts_init),
1633            UnixNanos::from(ts_init),
1634        )))
1635    }
1636
1637    #[derive(Debug)]
1638    struct CountingTap {
1639        calls: Rc<Cell<usize>>,
1640    }
1641
1642    impl CountingTap {
1643        fn new(calls: Rc<Cell<usize>>) -> Self {
1644            Self { calls }
1645        }
1646
1647        fn increment(&self) {
1648            self.calls.set(self.calls.get() + 1);
1649        }
1650    }
1651
1652    impl BusTap for CountingTap {
1653        fn on_publish(&self, _topic: MStr<BusTopic>, _message: &dyn Any) {
1654            self.increment();
1655        }
1656
1657        fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn Any) {
1658            self.increment();
1659        }
1660    }
1661
1662    #[derive(Debug)]
1663    struct FakeReplayCatalog {
1664        coverage: CatalogSliceCoverage,
1665        records: Vec<CatalogReplayRecord>,
1666        plan_queries: Vec<CatalogSliceQuery>,
1667        load_plans: Vec<CatalogSlicePlan>,
1668    }
1669
1670    impl FakeReplayCatalog {
1671        fn new(coverage: CatalogSliceCoverage, records: Vec<CatalogReplayRecord>) -> Self {
1672            Self {
1673                coverage,
1674                records,
1675                plan_queries: Vec::new(),
1676                load_plans: Vec::new(),
1677            }
1678        }
1679    }
1680
1681    impl ReplayCatalog for FakeReplayCatalog {
1682        type Error = String;
1683
1684        fn plan_slice(
1685            &mut self,
1686            query: &CatalogSliceQuery,
1687        ) -> Result<CatalogSliceCoverage, Self::Error> {
1688            self.plan_queries.push(query.clone());
1689            Ok(self.coverage.clone())
1690        }
1691
1692        fn load_slice(
1693            &mut self,
1694            plan: &CatalogSlicePlan,
1695        ) -> Result<Vec<CatalogReplayRecord>, Self::Error> {
1696            self.load_plans.push(plan.clone());
1697            Ok(self.records.clone())
1698        }
1699    }
1700
1701    struct BusTapGuard;
1702
1703    impl Drop for BusTapGuard {
1704        fn drop(&mut self) {
1705            msgbus::clear_bus_tap();
1706        }
1707    }
1708
1709    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1710    enum CacheMutationRecoveryClass {
1711        SnapshotOwned,
1712        EventStoreCapturedAndReplayed,
1713        ForensicOnly,
1714        MissingLiveRecovery,
1715    }
1716
1717    #[derive(Clone, Copy, Debug)]
1718    struct CacheMutationCoverage {
1719        method: &'static str,
1720        class: CacheMutationRecoveryClass,
1721        payload_types: &'static [&'static str],
1722    }
1723
1724    const CACHE_MUTATION_COVERAGE: &[CacheMutationCoverage] = &[
1725        cache_mutation(
1726            "set_database",
1727            CacheMutationRecoveryClass::SnapshotOwned,
1728            &[],
1729        ),
1730        cache_mutation(
1731            "cache_general",
1732            CacheMutationRecoveryClass::SnapshotOwned,
1733            &[],
1734        ),
1735        cache_mutation("cache_all", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1736        cache_mutation(
1737            "cache_currencies",
1738            CacheMutationRecoveryClass::SnapshotOwned,
1739            &[],
1740        ),
1741        cache_mutation(
1742            "cache_instruments",
1743            CacheMutationRecoveryClass::SnapshotOwned,
1744            &[],
1745        ),
1746        cache_mutation(
1747            "cache_synthetics",
1748            CacheMutationRecoveryClass::SnapshotOwned,
1749            &[],
1750        ),
1751        cache_mutation(
1752            "cache_accounts",
1753            CacheMutationRecoveryClass::SnapshotOwned,
1754            &[],
1755        ),
1756        cache_mutation(
1757            "cache_orders",
1758            CacheMutationRecoveryClass::SnapshotOwned,
1759            &[],
1760        ),
1761        cache_mutation(
1762            "cache_positions",
1763            CacheMutationRecoveryClass::SnapshotOwned,
1764            &[],
1765        ),
1766        cache_mutation(
1767            "build_index",
1768            CacheMutationRecoveryClass::SnapshotOwned,
1769            &[],
1770        ),
1771        cache_mutation(
1772            "purge_closed_orders",
1773            CacheMutationRecoveryClass::SnapshotOwned,
1774            &[],
1775        ),
1776        cache_mutation(
1777            "purge_closed_positions",
1778            CacheMutationRecoveryClass::SnapshotOwned,
1779            &[],
1780        ),
1781        cache_mutation(
1782            "purge_order",
1783            CacheMutationRecoveryClass::SnapshotOwned,
1784            &[],
1785        ),
1786        cache_mutation(
1787            "purge_position",
1788            CacheMutationRecoveryClass::SnapshotOwned,
1789            &[],
1790        ),
1791        cache_mutation(
1792            "purge_instrument",
1793            CacheMutationRecoveryClass::SnapshotOwned,
1794            &[],
1795        ),
1796        cache_mutation(
1797            "purge_account_events",
1798            CacheMutationRecoveryClass::SnapshotOwned,
1799            &[],
1800        ),
1801        cache_mutation(
1802            "clear_index",
1803            CacheMutationRecoveryClass::SnapshotOwned,
1804            &[],
1805        ),
1806        cache_mutation("reset", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1807        cache_mutation("dispose", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1808        cache_mutation("flush_db", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1809        cache_mutation("add", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1810        cache_mutation(
1811            "add_order_book",
1812            CacheMutationRecoveryClass::ForensicOnly,
1813            &[PAYLOAD_TYPE_BOOK_RESPONSE],
1814        ),
1815        cache_mutation(
1816            "add_own_order_book",
1817            CacheMutationRecoveryClass::SnapshotOwned,
1818            &[],
1819        ),
1820        cache_mutation(
1821            "add_mark_price",
1822            CacheMutationRecoveryClass::MissingLiveRecovery,
1823            &[],
1824        ),
1825        cache_mutation(
1826            "add_index_price",
1827            CacheMutationRecoveryClass::MissingLiveRecovery,
1828            &[],
1829        ),
1830        cache_mutation(
1831            "add_funding_rate",
1832            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1833            &[PAYLOAD_TYPE_FUNDING_RATES_RESPONSE],
1834        ),
1835        cache_mutation(
1836            "add_funding_rates",
1837            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1838            &[PAYLOAD_TYPE_FUNDING_RATES_RESPONSE],
1839        ),
1840        cache_mutation(
1841            "add_instrument_status",
1842            CacheMutationRecoveryClass::MissingLiveRecovery,
1843            &[],
1844        ),
1845        cache_mutation(
1846            "add_quote",
1847            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1848            &[PAYLOAD_TYPE_QUOTES_RESPONSE],
1849        ),
1850        cache_mutation(
1851            "add_quotes",
1852            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1853            &[PAYLOAD_TYPE_QUOTES_RESPONSE],
1854        ),
1855        cache_mutation(
1856            "add_trade",
1857            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1858            &[PAYLOAD_TYPE_TRADES_RESPONSE],
1859        ),
1860        cache_mutation(
1861            "add_trades",
1862            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1863            &[PAYLOAD_TYPE_TRADES_RESPONSE],
1864        ),
1865        cache_mutation(
1866            "add_bar",
1867            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1868            &[PAYLOAD_TYPE_BARS_RESPONSE],
1869        ),
1870        cache_mutation(
1871            "add_bars",
1872            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1873            &[PAYLOAD_TYPE_BARS_RESPONSE],
1874        ),
1875        cache_mutation(
1876            "add_greeks",
1877            CacheMutationRecoveryClass::MissingLiveRecovery,
1878            &[],
1879        ),
1880        cache_mutation(
1881            "add_option_greeks",
1882            CacheMutationRecoveryClass::MissingLiveRecovery,
1883            &[],
1884        ),
1885        cache_mutation(
1886            "add_yield_curve",
1887            CacheMutationRecoveryClass::MissingLiveRecovery,
1888            &[],
1889        ),
1890        cache_mutation(
1891            "add_currency",
1892            CacheMutationRecoveryClass::SnapshotOwned,
1893            &[],
1894        ),
1895        cache_mutation(
1896            "add_instrument",
1897            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1898            &[
1899                PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
1900                PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
1901            ],
1902        ),
1903        cache_mutation(
1904            "add_synthetic",
1905            CacheMutationRecoveryClass::SnapshotOwned,
1906            &[],
1907        ),
1908        cache_mutation(
1909            "add_account",
1910            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1911            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1912        ),
1913        cache_mutation(
1914            "add_venue_order_id",
1915            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1916            &[PAYLOAD_TYPE_ORDER_ACCEPTED, PAYLOAD_TYPE_ORDER_UPDATED],
1917        ),
1918        cache_mutation(
1919            "add_order",
1920            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1921            &[PAYLOAD_TYPE_ORDER_INITIALIZED],
1922        ),
1923        cache_mutation(
1924            "add_order_list",
1925            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1926            &[PAYLOAD_TYPE_SUBMIT_ORDER_LIST],
1927        ),
1928        cache_mutation(
1929            "add_position_id",
1930            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1931            &[
1932                PAYLOAD_TYPE_ORDER_FILLED,
1933                PAYLOAD_TYPE_POSITION_OPENED,
1934                PAYLOAD_TYPE_POSITION_CHANGED,
1935                PAYLOAD_TYPE_POSITION_CLOSED,
1936            ],
1937        ),
1938        cache_mutation(
1939            "add_position",
1940            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1941            &[PAYLOAD_TYPE_ORDER_FILLED],
1942        ),
1943        cache_mutation(
1944            "update_account",
1945            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1946            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1947        ),
1948        cache_mutation(
1949            "take_account",
1950            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1951            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1952        ),
1953        cache_mutation(
1954            "cache_account_owned",
1955            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1956            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1957        ),
1958        cache_mutation(
1959            "update_account_owned",
1960            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1961            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1962        ),
1963        cache_mutation(
1964            "update_account_state",
1965            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1966            &[PAYLOAD_TYPE_ACCOUNT_STATE],
1967        ),
1968        cache_mutation(
1969            "replace_order",
1970            CacheMutationRecoveryClass::ForensicOnly,
1971            &[
1972                PAYLOAD_TYPE_ORDER_STATUS_REPORT,
1973                PAYLOAD_TYPE_ORDER_WITH_FILLS,
1974                PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
1975            ],
1976        ),
1977        cache_mutation(
1978            "update_order",
1979            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1980            &[
1981                PAYLOAD_TYPE_ORDER_DENIED,
1982                PAYLOAD_TYPE_ORDER_EMULATED,
1983                PAYLOAD_TYPE_ORDER_RELEASED,
1984                PAYLOAD_TYPE_ORDER_SUBMITTED,
1985                PAYLOAD_TYPE_ORDER_ACCEPTED,
1986                PAYLOAD_TYPE_ORDER_REJECTED,
1987                PAYLOAD_TYPE_ORDER_CANCELED,
1988                PAYLOAD_TYPE_ORDER_EXPIRED,
1989                PAYLOAD_TYPE_ORDER_TRIGGERED,
1990                PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
1991                PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
1992                PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
1993                PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
1994                PAYLOAD_TYPE_ORDER_UPDATED,
1995                PAYLOAD_TYPE_ORDER_FILLED,
1996            ],
1997        ),
1998        cache_mutation(
1999            "update_order_pending_cancel_local",
2000            CacheMutationRecoveryClass::MissingLiveRecovery,
2001            &[],
2002        ),
2003        cache_mutation(
2004            "update_position",
2005            CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
2006            &[
2007                PAYLOAD_TYPE_ORDER_FILLED,
2008                PAYLOAD_TYPE_POSITION_OPENED,
2009                PAYLOAD_TYPE_POSITION_CHANGED,
2010                PAYLOAD_TYPE_POSITION_CLOSED,
2011                PAYLOAD_TYPE_POSITION_ADJUSTED,
2012            ],
2013        ),
2014        cache_mutation(
2015            "snapshot_position",
2016            CacheMutationRecoveryClass::SnapshotOwned,
2017            &[],
2018        ),
2019        cache_mutation(
2020            "snapshot_position_state",
2021            CacheMutationRecoveryClass::SnapshotOwned,
2022            &[],
2023        ),
2024        cache_mutation(
2025            "load_snapshot_blob",
2026            CacheMutationRecoveryClass::SnapshotOwned,
2027            &[],
2028        ),
2029        cache_mutation(
2030            "restore_snapshot_blob",
2031            CacheMutationRecoveryClass::SnapshotOwned,
2032            &[],
2033        ),
2034        cache_mutation(
2035            "order_mut",
2036            CacheMutationRecoveryClass::MissingLiveRecovery,
2037            &[],
2038        ),
2039        cache_mutation(
2040            "position_mut",
2041            CacheMutationRecoveryClass::MissingLiveRecovery,
2042            &[],
2043        ),
2044        cache_mutation(
2045            "order_book_mut",
2046            CacheMutationRecoveryClass::ForensicOnly,
2047            &[
2048                PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
2049                PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE,
2050            ],
2051        ),
2052        cache_mutation(
2053            "own_order_book_mut",
2054            CacheMutationRecoveryClass::SnapshotOwned,
2055            &[],
2056        ),
2057        cache_mutation(
2058            "set_mark_xrate",
2059            CacheMutationRecoveryClass::MissingLiveRecovery,
2060            &[],
2061        ),
2062        cache_mutation(
2063            "clear_mark_xrate",
2064            CacheMutationRecoveryClass::MissingLiveRecovery,
2065            &[],
2066        ),
2067        cache_mutation(
2068            "clear_mark_xrates",
2069            CacheMutationRecoveryClass::MissingLiveRecovery,
2070            &[],
2071        ),
2072        cache_mutation(
2073            "account_mut",
2074            CacheMutationRecoveryClass::MissingLiveRecovery,
2075            &[],
2076        ),
2077        cache_mutation(
2078            "update_own_order_book",
2079            CacheMutationRecoveryClass::SnapshotOwned,
2080            &[],
2081        ),
2082        cache_mutation(
2083            "force_remove_from_own_order_book",
2084            CacheMutationRecoveryClass::SnapshotOwned,
2085            &[],
2086        ),
2087        cache_mutation(
2088            "audit_own_order_books",
2089            CacheMutationRecoveryClass::SnapshotOwned,
2090            &[],
2091        ),
2092    ];
2093
2094    const CACHE_MUTATION_EXCLUSIONS: &[&str] = &["check_integrity"];
2095
    /// Builds one [`CacheMutationCoverage`] row from its three fields.
    ///
    /// It is a `const fn` so rows can be constructed inside the `CACHE_MUTATION_COVERAGE`
    /// constant.
    const fn cache_mutation(
        method: &'static str,
        class: CacheMutationRecoveryClass,
        payload_types: &'static [&'static str],
    ) -> CacheMutationCoverage {
        CacheMutationCoverage {
            method,
            class,
            payload_types,
        }
    }
2107
2108    fn cache_public_methods() -> AHashSet<&'static str> {
2109        collect_cache_public_methods(false)
2110    }
2111
2112    fn cache_public_mutable_methods() -> AHashSet<&'static str> {
2113        collect_cache_public_methods(true)
2114    }
2115
2116    fn collect_cache_public_methods(require_mut_self: bool) -> AHashSet<&'static str> {
2117        let source = include_str!("../../common/src/cache/mod.rs");
2118        let mut methods = AHashSet::new();
2119        let mut pending_name: Option<&'static str> = None;
2120        let mut pending_signature = String::new();
2121
2122        for line in source.lines() {
2123            let trimmed = line.trim_start();
2124
2125            if pending_name.is_none() {
2126                let Some(rest) = trimmed
2127                    .strip_prefix("pub fn ")
2128                    .or_else(|| trimmed.strip_prefix("pub async fn "))
2129                else {
2130                    continue;
2131                };
2132                pending_name = rest.split('(').next();
2133                pending_signature.clear();
2134                pending_signature.push_str(trimmed);
2135            } else {
2136                pending_signature.push(' ');
2137                pending_signature.push_str(trimmed);
2138            }
2139
2140            if trimmed.contains('{') {
2141                if let Some(name) = pending_name.take()
2142                    && (!require_mut_self || pending_signature.contains("&mut self"))
2143                {
2144                    methods.insert(name);
2145                }
2146                pending_signature.clear();
2147            }
2148        }
2149
2150        methods
2151    }
2152
2153    fn sorted_missing_methods<'a>(
2154        actual: &'a AHashSet<&'static str>,
2155        classified: &'a AHashSet<&'static str>,
2156    ) -> Vec<&'static str> {
2157        let mut missing: Vec<_> = actual
2158            .iter()
2159            .copied()
2160            .filter(|method| !classified.contains(method))
2161            .collect();
2162        missing.sort_unstable();
2163        missing
2164    }
2165
2166    fn sorted_stale_methods<'a>(
2167        classified: &'a AHashSet<&'static str>,
2168        actual: &'a AHashSet<&'static str>,
2169    ) -> Vec<&'static str> {
2170        let mut stale: Vec<_> = classified
2171            .iter()
2172            .copied()
2173            .filter(|method| !actual.contains(method))
2174            .collect();
2175        stale.sort_unstable();
2176        stale
2177    }
2178
2179    #[rstest]
2180    fn catalog_replay_inputs_join_event_entries_with_selected_catalog_slice() {
2181        let reader = reader_with_entries(
2182            "run-catalog",
2183            &[
2184                append_payload_with_ts(1, 120, "RunStarted", Bytes::from_static(b"started")),
2185                append_payload_with_ts(2, 100, "SubmitOrder", Bytes::from_static(b"submit")),
2186            ],
2187        );
2188        let record = catalog_quote_record(110);
2189        let mut catalog = FakeReplayCatalog::new(
2190            CatalogSliceCoverage::from_files(vec!["quotes/AUDUSD.SIM/100_120.parquet".into()]),
2191            vec![record.clone()],
2192        );
2193
2194        let plan = plan_catalog_replay_inputs(
2195            &reader,
2196            &mut catalog,
2197            ReplaySeqRange::new(1, 2),
2198            &[CatalogSliceSelector::new("quotes").with_identifier("AUD/USD.SIM")],
2199        )
2200        .expect("plan catalog replay");
2201
2202        assert_eq!(plan.event_range, Some(ReplaySeqRange::new(1, 2)));
2203        assert_eq!(plan.event_count, 2);
2204        assert_eq!(
2205            plan.event_time_range,
2206            Some(ReplayTimeRange::new(
2207                UnixNanos::from(100),
2208                UnixNanos::from(120),
2209            )),
2210        );
2211        assert!(!plan.catalog_slices[0].is_missing());
2212        assert_eq!(catalog.plan_queries.len(), 1);
2213        assert_eq!(catalog.plan_queries[0].data_cls, "quotes");
2214        assert_eq!(
2215            catalog.plan_queries[0].identifiers,
2216            vec!["AUD/USD.SIM".to_string()],
2217        );
2218        assert_eq!(catalog.plan_queries[0].start, UnixNanos::from(100));
2219        assert_eq!(catalog.plan_queries[0].end, UnixNanos::from(120));
2220
2221        let loaded =
2222            load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load catalog");
2223        let seqs: Vec<_> = loaded.entries.iter().map(|entry| entry.seq).collect();
2224
2225        assert_eq!(seqs, vec![1, 2]);
2226        assert_eq!(loaded.catalog_slices.len(), 1);
2227        assert_eq!(loaded.catalog_slices[0].records, vec![record]);
2228        assert_eq!(catalog.load_plans.len(), 1);
2229    }
2230
2231    #[rstest]
2232    fn catalog_plan_marks_missing_catalog_slice() {
2233        let reader = reader_with_entries(
2234            "run-missing-catalog",
2235            &[append_payload_with_ts(
2236                1,
2237                1_000,
2238                "RunStarted",
2239                Bytes::from_static(b"started"),
2240            )],
2241        );
2242        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2243
2244        let plan = plan_catalog_replay_inputs(
2245            &reader,
2246            &mut catalog,
2247            ReplaySeqRange::new(1, 1),
2248            &[CatalogSliceSelector::new("trades").with_identifier("AUD/USD.SIM")],
2249        )
2250        .expect("plan catalog replay");
2251        let missing = plan.missing_catalog_slices();
2252
2253        assert_eq!(missing.len(), 1);
2254        assert_eq!(missing[0].query.data_cls, "trades");
2255        assert_eq!(
2256            missing[0].query.identifiers,
2257            vec!["AUD/USD.SIM".to_string()],
2258        );
2259        assert_eq!(missing[0].query.start, UnixNanos::from(1_000));
2260        assert_eq!(missing[0].query.end, UnixNanos::from(1_000));
2261    }
2262
2263    #[rstest]
2264    fn required_missing_catalog_slice_rejects_load() {
2265        let reader = reader_with_entries(
2266            "run-required-missing",
2267            &[append_payload_with_ts(
2268                1,
2269                1_000,
2270                "RunStarted",
2271                Bytes::from_static(b"started"),
2272            )],
2273        );
2274        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2275        let plan = plan_catalog_replay_inputs(
2276            &reader,
2277            &mut catalog,
2278            ReplaySeqRange::new(1, 1),
2279            &[CatalogSliceSelector::new("quotes")
2280                .with_identifier("AUD/USD.SIM")
2281                .require_coverage()],
2282        )
2283        .expect("plan missing slice");
2284
2285        let err = load_catalog_replay_inputs(&reader, &mut catalog, &plan)
2286            .expect_err("required missing slice must fail");
2287
2288        match err {
2289            ReplayInputError::MissingCatalogSlice {
2290                data_cls,
2291                identifiers,
2292            } => {
2293                assert_eq!(data_cls, "quotes");
2294                assert_eq!(identifiers, vec!["AUD/USD.SIM".to_string()]);
2295            }
2296            other => panic!("expected MissingCatalogSlice, was {other:?}"),
2297        }
2298    }
2299
2300    #[rstest]
2301    fn optional_missing_catalog_slice_loads_as_empty_without_catalog_load() {
2302        let reader = reader_with_entries(
2303            "run-optional-missing",
2304            &[append_payload_with_ts(
2305                1,
2306                1_000,
2307                "RunStarted",
2308                Bytes::from_static(b"started"),
2309            )],
2310        );
2311        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2312        let plan = plan_catalog_replay_inputs(
2313            &reader,
2314            &mut catalog,
2315            ReplaySeqRange::new(1, 1),
2316            &[CatalogSliceSelector::new("quotes").with_identifier("AUD/USD.SIM")],
2317        )
2318        .expect("plan optional missing slice");
2319
2320        let loaded =
2321            load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load optional");
2322
2323        assert_eq!(loaded.catalog_slices.len(), 1);
2324        assert!(loaded.catalog_slices[0].plan.is_missing());
2325        assert!(loaded.catalog_slices[0].records.is_empty());
2326        assert!(catalog.load_plans.is_empty());
2327    }
2328
2329    #[rstest]
2330    fn catalog_joined_planner_rejects_empty_catalog_selection() {
2331        let reader = reader_with_entries(
2332            "run-empty-selection",
2333            &[append_payload_with_ts(
2334                1,
2335                1_000,
2336                "RunStarted",
2337                Bytes::from_static(b"started"),
2338            )],
2339        );
2340        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2341
2342        let err = plan_catalog_replay_inputs(&reader, &mut catalog, ReplaySeqRange::new(1, 1), &[])
2343            .expect_err("empty catalog selection must fail");
2344
2345        match err {
2346            ReplayInputError::EmptyCatalogSelection => {}
2347            other => panic!("expected EmptyCatalogSelection, was {other:?}"),
2348        }
2349        assert!(catalog.plan_queries.is_empty());
2350    }
2351
2352    #[rstest]
2353    fn catalog_selector_explicit_time_bounds_override_event_span() {
2354        let reader = reader_with_entries(
2355            "run-explicit-bounds",
2356            &[append_payload_with_ts(
2357                1,
2358                1_000,
2359                "RunStarted",
2360                Bytes::from_static(b"started"),
2361            )],
2362        );
2363        let mut catalog = FakeReplayCatalog::new(
2364            CatalogSliceCoverage::from_files(vec!["bars/AUDUSD.SIM/900_950.parquet".into()]),
2365            Vec::new(),
2366        );
2367
2368        let plan = plan_catalog_replay_inputs(
2369            &reader,
2370            &mut catalog,
2371            ReplaySeqRange::new(1, 1),
2372            &[CatalogSliceSelector::new("bars")
2373                .with_identifier("AUD/USD.SIM-1-MINUTE-BID-EXTERNAL")
2374                .with_time_bounds(UnixNanos::from(900), UnixNanos::from(950))],
2375        )
2376        .expect("plan explicit bounds");
2377
2378        assert_eq!(plan.catalog_slices[0].query.start, UnixNanos::from(900));
2379        assert_eq!(plan.catalog_slices[0].query.end, UnixNanos::from(950));
2380        assert_eq!(catalog.plan_queries[0].start, UnixNanos::from(900));
2381        assert_eq!(catalog.plan_queries[0].end, UnixNanos::from(950));
2382    }
2383
2384    #[rstest]
2385    fn catalog_replay_inputs_load_catalog_records() {
2386        let reader = reader_with_entries(
2387            "run-catalog-load",
2388            &[
2389                append_payload_with_ts(1, 100, "RunStarted", Bytes::from_static(b"started")),
2390                append_payload_with_ts(2, 110, "OrderFilled", Bytes::from_static(b"filled")),
2391            ],
2392        );
2393        let record = catalog_trade_record(105);
2394        let mut catalog = FakeReplayCatalog::new(
2395            CatalogSliceCoverage::from_files(vec!["trades/AUDUSD.SIM/100_110.parquet".into()]),
2396            vec![record.clone()],
2397        );
2398        let plan = plan_catalog_replay_inputs(
2399            &reader,
2400            &mut catalog,
2401            ReplaySeqRange::new(1, 2),
2402            &[CatalogSliceSelector::new("trades").with_identifier("AUD/USD.SIM")],
2403        )
2404        .expect("plan catalog replay");
2405
2406        assert_eq!(
2407            plan.catalog_slices[0].query.identifiers_option(),
2408            Some(vec!["AUD/USD.SIM".to_string()]),
2409        );
2410
2411        let loaded =
2412            load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load catalog");
2413        let seqs: Vec<_> = loaded.entries.iter().map(|entry| entry.seq).collect();
2414
2415        assert_eq!(seqs, vec![1, 2]);
2416        assert_eq!(loaded.catalog_slices[0].records, vec![record]);
2417        assert_eq!(catalog.load_plans.len(), 1);
2418    }
2419
2420    #[rstest]
2421    fn unbounded_catalog_selector_rejects_empty_event_scan() {
2422        let reader = reader_with_entries("run-empty", &[]);
2423        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2424
2425        let err = plan_catalog_replay_inputs(
2426            &reader,
2427            &mut catalog,
2428            ReplaySeqRange::new(1, 10),
2429            &[CatalogSliceSelector::new("quotes")],
2430        )
2431        .expect_err("empty replay scan must need explicit bounds");
2432
2433        match err {
2434            ReplayInputError::MissingCatalogTimeBounds { data_cls } => {
2435                assert_eq!(data_cls, "quotes");
2436            }
2437            other => panic!("expected MissingCatalogTimeBounds, was {other:?}"),
2438        }
2439    }
2440
2441    #[rstest]
2442    fn invalid_catalog_time_bounds_are_rejected_before_catalog_access() {
2443        let reader = reader_with_entries(
2444            "run-invalid-bounds",
2445            &[append_payload_with_ts(
2446                1,
2447                1_000,
2448                "RunStarted",
2449                Bytes::from_static(b"started"),
2450            )],
2451        );
2452        let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2453
2454        let err = plan_catalog_replay_inputs(
2455            &reader,
2456            &mut catalog,
2457            ReplaySeqRange::new(1, 1),
2458            &[CatalogSliceSelector::new("quotes")
2459                .with_time_bounds(UnixNanos::from(200), UnixNanos::from(100))],
2460        )
2461        .expect_err("invalid catalog bounds must fail");
2462
2463        match err {
2464            ReplayInputError::InvalidCatalogTimeRange {
2465                data_cls,
2466                start,
2467                end,
2468            } => {
2469                assert_eq!(data_cls, "quotes");
2470                assert_eq!(start, 200);
2471                assert_eq!(end, 100);
2472            }
2473            other => panic!("expected InvalidCatalogTimeRange, was {other:?}"),
2474        }
2475        assert!(catalog.plan_queries.is_empty());
2476    }
2477
2478    #[rstest]
2479    fn forensics_replay_inputs_do_not_require_catalog_source() {
2480        let reader = reader_with_entries(
2481            "run-forensics",
2482            &[append_payload_with_ts(
2483                1,
2484                500,
2485                "RunStarted",
2486                Bytes::from_static(b"started"),
2487            )],
2488        );
2489
2490        let plan = plan_forensics_replay_inputs(&reader, ReplaySeqRange::new(1, 1))
2491            .expect("plan forensics");
2492        let loaded = load_forensics_replay_inputs(&reader, &plan).expect("load forensics");
2493
2494        assert!(plan.catalog_slices.is_empty());
2495        assert_eq!(loaded.entries.len(), 1);
2496        assert!(loaded.catalog_slices.is_empty());
2497    }
2498
2499    #[rstest]
2500    #[case::zero_start(ReplaySeqRange::new(0, 1), "seq is 1-based")]
2501    #[case::from_after_to(ReplaySeqRange::new(2, 1), "from_seq exceeds to_seq")]
2502    fn invalid_replay_seq_range_rejected(
2503        #[case] range: ReplaySeqRange,
2504        #[case] expected_message: &str,
2505    ) {
2506        let reader = reader_with_entries("run-invalid-seq", &[]);
2507
2508        let err =
2509            plan_forensics_replay_inputs(&reader, range).expect_err("invalid seq range must fail");
2510
2511        match err {
2512            ReplayInputError::InvalidSeqRange {
2513                from_seq,
2514                to_seq,
2515                message,
2516            } => {
2517                assert_eq!(from_seq, range.from_seq);
2518                assert_eq!(to_seq, range.to_seq);
2519                assert_eq!(message, expected_message);
2520            }
2521            other => panic!("expected InvalidSeqRange, was {other:?}"),
2522        }
2523    }
2524
2525    #[rstest]
2526    fn replay_restores_snapshot_before_applying_tail() {
2527        let (reader, replayed) = reader_with_anchor(1);
2528        let mut cache = Cache::default();
2529        let restored = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2530        let restored_id = restored.account_id;
2531
2532        let report =
2533            restore_cache_snapshot_and_replay_tail(&mut cache, &reader, |cache, anchor| {
2534                assert_eq!(anchor.expect("anchor").high_watermark, 1);
2535                let account = AccountAny::from_events(std::slice::from_ref(&restored))
2536                    .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))?;
2537                cache
2538                    .add_account(account)
2539                    .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))
2540            })
2541            .expect("replay");
2542
2543        let account = cache.account_owned(&restored_id).expect("account restored");
2544        let events = account.events();
2545
2546        assert_eq!(report.plan.from_seq, 2);
2547        assert_eq!(report.applied_entries, 1);
2548        assert_eq!(report.ignored_entries, 0);
2549        assert_eq!(events, vec![restored, replayed]);
2550    }
2551
2552    #[rstest]
2553    fn replay_does_not_apply_entries_at_or_below_anchor_watermark() {
2554        let (reader, _) = reader_with_anchor(2);
2555        let mut cache = Cache::default();
2556        let restored = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2557        let restored_id = restored.account_id;
2558
2559        let report =
2560            restore_cache_snapshot_and_replay_tail(&mut cache, &reader, |cache, anchor| {
2561                assert_eq!(anchor.expect("anchor").high_watermark, 2);
2562                let account = AccountAny::from_events(std::slice::from_ref(&restored))
2563                    .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))?;
2564                cache
2565                    .add_account(account)
2566                    .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))
2567            })
2568            .expect("replay");
2569
2570        let account = cache.account_owned(&restored_id).expect("account restored");
2571
2572        assert!(report.plan.is_empty());
2573        assert_eq!(report.applied_entries, 0);
2574        assert_eq!(report.ignored_entries, 0);
2575        assert_eq!(account.events(), vec![restored]);
2576    }
2577
2578    #[rstest]
2579    fn replay_from_start_applies_account_state_without_bus_publish() {
2580        let state = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2581        let account_id = AccountId::from("SIM-001");
2582        let bus_calls = Rc::new(Cell::new(0));
2583        msgbus::set_bus_tap(Rc::new(CountingTap::new(Rc::clone(&bus_calls))));
2584        let _guard = BusTapGuard;
2585        let mut backend = MemoryBackend::new();
2586        backend.open_run(manifest("run-replay")).expect("open");
2587        backend
2588            .append_batch(&[append_account_state(1, &state)])
2589            .expect("append");
2590        let reader = EventStoreReader::new(backend);
2591        let mut cache = Cache::default();
2592
2593        let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2594        let account = cache.account_owned(&account_id).expect("account replayed");
2595
2596        assert_eq!(report.plan.anchor, None);
2597        assert_eq!(report.plan.from_seq, 1);
2598        assert_eq!(report.applied_entries, 1);
2599        assert_eq!(bus_calls.get(), 0);
2600        assert_eq!(account.last_event(), Some(state));
2601        assert_eq!(account.base_currency(), Some(Currency::USD()));
2602    }
2603
2604    #[rstest]
2605    fn unsupported_payload_is_ignored() {
2606        let mut backend = MemoryBackend::new();
2607        backend.open_run(manifest("run-replay")).expect("open");
2608        backend
2609            .append_batch(&[append_payload(
2610                1,
2611                "RunStarted",
2612                Bytes::copy_from_slice(UUID4::new().to_string().as_bytes()),
2613            )])
2614            .expect("append");
2615        let reader = EventStoreReader::new(backend);
2616        let mut cache = Cache::default();
2617
2618        let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2619
2620        assert_eq!(report.applied_entries, 0);
2621        assert_eq!(report.ignored_entries, 1);
2622    }
2623
2624    #[rstest]
2625    fn default_capture_payload_types_are_classified_for_cache_replay() {
2626        let mut classified = AHashSet::new();
2627        let mut overlap = Vec::new();
2628
2629        for payload_type in CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES {
2630            classified.insert(*payload_type);
2631        }
2632
2633        for payload_type in FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES {
2634            if !classified.insert(*payload_type) {
2635                overlap.push(*payload_type);
2636            }
2637        }
2638
2639        let mut seen_defaults = AHashSet::new();
2640        let duplicate_defaults: Vec<_> = DEFAULT_CAPTURE_PAYLOAD_TYPES
2641            .iter()
2642            .copied()
2643            .filter(|payload_type| !seen_defaults.insert(*payload_type))
2644            .collect();
2645        let unclassified: Vec<_> = DEFAULT_CAPTURE_PAYLOAD_TYPES
2646            .iter()
2647            .copied()
2648            .filter(|payload_type| !classified.contains(payload_type))
2649            .collect();
2650        let extra: Vec<_> = classified
2651            .iter()
2652            .copied()
2653            .filter(|payload_type| !seen_defaults.contains(payload_type))
2654            .collect();
2655
2656        assert!(
2657            duplicate_defaults.is_empty(),
2658            "default capture payload types must be unique: {duplicate_defaults:?}",
2659        );
2660        assert!(
2661            overlap.is_empty(),
2662            "cache replay and forensic-only classes must not overlap: {overlap:?}",
2663        );
2664        assert!(
2665            unclassified.is_empty(),
2666            "default capture payload types must be cache replayed or forensic-only: {unclassified:?}",
2667        );
2668        assert!(
2669            extra.is_empty(),
2670            "cache replay classification must not list uncaptured payload types: {extra:?}",
2671        );
2672    }
2673
2674    #[rstest]
2675    fn cache_replay_capture_payload_types_have_replay_rules() {
2676        for payload_type in CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES {
2677            let entry = append_payload(1, payload_type, Bytes::from_static(&[0xc1])).entry;
2678            let mut cache = Cache::default();
2679
2680            let err = apply_cache_replay_entry(&mut cache, &entry)
2681                .expect_err("cache replay payload type must have a decode rule");
2682
2683            match err {
2684                CacheReplayError::Decode {
2685                    payload_type: actual,
2686                    ..
2687                } => {
2688                    assert_eq!(actual, *payload_type);
2689                }
2690                other => panic!("expected Decode for {payload_type}, was {other:?}"),
2691            }
2692        }
2693    }
2694
2695    #[rstest]
2696    fn forensic_only_capture_payload_types_are_not_cache_replayed() {
2697        for payload_type in FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES {
2698            let entry = append_payload(1, payload_type, Bytes::from_static(&[0xc1])).entry;
2699            let mut cache = Cache::default();
2700
2701            let applied = apply_cache_replay_entry(&mut cache, &entry)
2702                .expect("forensic-only payload type must not be decoded by cache replay");
2703
2704            assert!(
2705                !applied,
2706                "forensic-only payload type must be ignored by cache replay: {payload_type}",
2707            );
2708        }
2709    }
2710
2711    #[rstest]
2712    fn cache_public_mutators_have_recovery_classification() {
2713        let mut classified = AHashSet::new();
2714        let mut duplicates = Vec::new();
2715
2716        for row in CACHE_MUTATION_COVERAGE {
2717            if !classified.insert(row.method) {
2718                duplicates.push(row.method);
2719            }
2720        }
2721
2722        for method in CACHE_MUTATION_EXCLUSIONS {
2723            if !classified.insert(*method) {
2724                duplicates.push(*method);
2725            }
2726        }
2727
2728        let public_methods = cache_public_methods();
2729        let mutable_methods = cache_public_mutable_methods();
2730        let missing = sorted_missing_methods(&mutable_methods, &classified);
2731        let stale = sorted_stale_methods(&classified, &public_methods);
2732
2733        assert!(
2734            duplicates.is_empty(),
2735            "cache mutation recovery classifications must be unique: {duplicates:?}",
2736        );
2737        assert!(
2738            missing.is_empty(),
2739            "public Cache mutators must be classified for recovery: {missing:?}",
2740        );
2741        assert!(
2742            stale.is_empty(),
2743            "cache mutation recovery classifications reference missing methods: {stale:?}",
2744        );
2745    }
2746
2747    #[rstest]
2748    fn cache_mutation_replay_classification_matches_payload_buckets() {
2749        for row in CACHE_MUTATION_COVERAGE {
2750            match row.class {
2751                CacheMutationRecoveryClass::EventStoreCapturedAndReplayed => {
2752                    assert!(
2753                        !row.payload_types.is_empty(),
2754                        "cache-replayed mutation must cite captured payloads: {}",
2755                        row.method,
2756                    );
2757
2758                    for payload_type in row.payload_types {
2759                        assert!(
2760                            CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES.contains(payload_type),
2761                            "cache mutation {} cites non-replayed payload {payload_type}",
2762                            row.method,
2763                        );
2764                    }
2765                }
2766                CacheMutationRecoveryClass::ForensicOnly => {
2767                    assert!(
2768                        !row.payload_types.is_empty(),
2769                        "forensic-only mutation must cite forensic payloads: {}",
2770                        row.method,
2771                    );
2772
2773                    for payload_type in row.payload_types {
2774                        assert!(
2775                            FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES.contains(payload_type),
2776                            "cache mutation {} cites non-forensic payload {payload_type}",
2777                            row.method,
2778                        );
2779                    }
2780                }
2781                CacheMutationRecoveryClass::SnapshotOwned
2782                | CacheMutationRecoveryClass::MissingLiveRecovery => {
2783                    assert!(
2784                        row.payload_types.is_empty(),
2785                        "non-event-store cache mutation {} should not cite payloads",
2786                        row.method,
2787                    );
2788                }
2789            }
2790        }
2791    }
2792
2793    #[rstest]
2794    fn submit_order_list_replay_restores_order_list() {
2795        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
2796        let instrument_id = instrument.id();
2797        let first_init = OrderInitializedSpec::builder()
2798            .instrument_id(instrument_id)
2799            .client_order_id(ClientOrderId::from("O-LIST-001"))
2800            .build();
2801        let second_init = OrderInitializedSpec::builder()
2802            .instrument_id(instrument_id)
2803            .client_order_id(ClientOrderId::from("O-LIST-002"))
2804            .build();
2805        let order_list = OrderList::new(
2806            OrderListId::from("OL-001"),
2807            instrument_id,
2808            first_init.strategy_id,
2809            vec![first_init.client_order_id, second_init.client_order_id],
2810            UnixNanos::from(1),
2811        );
2812        let command = SubmitOrderList::new(
2813            first_init.trader_id,
2814            Some(ClientId::from("SIM")),
2815            first_init.strategy_id,
2816            order_list.clone(),
2817            vec![first_init, second_init],
2818            None,
2819            None,
2820            None,
2821            UUID4::new(),
2822            UnixNanos::from(2),
2823            None,
2824        );
2825        let entry = append_serde_payload(1, PAYLOAD_TYPE_SUBMIT_ORDER_LIST, &command).entry;
2826        let mut cache = Cache::default();
2827
2828        let applied = apply_cache_replay_entry(&mut cache, &entry).expect("apply order list");
2829        let replayed = cache
2830            .order_list(&order_list.id)
2831            .expect("order list replayed");
2832
2833        assert!(applied);
2834        assert_eq!(replayed, &order_list);
2835    }
2836
2837    #[rstest]
2838    fn data_response_replay_restores_instruments_and_market_data() {
2839        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
2840        let instrument_id = instrument.id();
2841        let client_id = ClientId::from("DATA");
2842        let quote = QuoteTick::new(
2843            instrument_id,
2844            Price::from("1.00000"),
2845            Price::from("1.00010"),
2846            Quantity::from("100000"),
2847            Quantity::from("100000"),
2848            UnixNanos::from(10),
2849            UnixNanos::from(11),
2850        );
2851        let trade = TradeTick::new(
2852            instrument_id,
2853            Price::from("1.00005"),
2854            Quantity::from("50000"),
2855            AggressorSide::Buyer,
2856            TradeId::from("T-DATA-001"),
2857            UnixNanos::from(12),
2858            UnixNanos::from(13),
2859        );
2860        let funding_rate = FundingRateUpdate::new(
2861            instrument_id,
2862            "0.0001".parse().expect("funding rate"),
2863            Some(480),
2864            Some(UnixNanos::from(60)),
2865            UnixNanos::from(14),
2866            UnixNanos::from(15),
2867        );
2868        let bar_type = BarType::new(
2869            instrument_id,
2870            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
2871            AggregationSource::External,
2872        );
2873        let bar = Bar::new(
2874            bar_type,
2875            Price::from("1.00000"),
2876            Price::from("1.00020"),
2877            Price::from("0.99990"),
2878            Price::from("1.00010"),
2879            Quantity::from("150000"),
2880            UnixNanos::from(16),
2881            UnixNanos::from(17),
2882        );
2883        let reader = reader_with_entries(
2884            "run-data-response-replay",
2885            &[
2886                append_serde_payload(
2887                    1,
2888                    PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
2889                    &InstrumentResponse::new(
2890                        UUID4::new(),
2891                        client_id,
2892                        instrument_id,
2893                        instrument.clone(),
2894                        None,
2895                        None,
2896                        UnixNanos::from(1),
2897                        None,
2898                    ),
2899                ),
2900                append_serde_payload(
2901                    2,
2902                    PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
2903                    &InstrumentsResponse::new(
2904                        UUID4::new(),
2905                        client_id,
2906                        instrument_id.venue,
2907                        vec![instrument],
2908                        None,
2909                        None,
2910                        UnixNanos::from(2),
2911                        None,
2912                    ),
2913                ),
2914                append_serde_payload(
2915                    3,
2916                    PAYLOAD_TYPE_QUOTES_RESPONSE,
2917                    &QuotesResponse::new(
2918                        UUID4::new(),
2919                        client_id,
2920                        instrument_id,
2921                        vec![quote],
2922                        None,
2923                        None,
2924                        UnixNanos::from(3),
2925                        None,
2926                    ),
2927                ),
2928                append_serde_payload(
2929                    4,
2930                    PAYLOAD_TYPE_TRADES_RESPONSE,
2931                    &TradesResponse::new(
2932                        UUID4::new(),
2933                        client_id,
2934                        instrument_id,
2935                        vec![trade],
2936                        None,
2937                        None,
2938                        UnixNanos::from(4),
2939                        None,
2940                    ),
2941                ),
2942                append_serde_payload(
2943                    5,
2944                    PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
2945                    &FundingRatesResponse::new(
2946                        UUID4::new(),
2947                        client_id,
2948                        instrument_id,
2949                        vec![funding_rate],
2950                        None,
2951                        None,
2952                        UnixNanos::from(5),
2953                        None,
2954                    ),
2955                ),
2956                append_serde_payload(
2957                    6,
2958                    PAYLOAD_TYPE_BARS_RESPONSE,
2959                    &BarsResponse::new(
2960                        UUID4::new(),
2961                        client_id,
2962                        bar_type,
2963                        vec![bar],
2964                        None,
2965                        None,
2966                        UnixNanos::from(6),
2967                        None,
2968                    ),
2969                ),
2970            ],
2971        );
2972        let mut cache = Cache::default();
2973
2974        let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2975
2976        assert_eq!(report.applied_entries, 6);
2977        assert_eq!(report.ignored_entries, 0);
2978        assert_eq!(
2979            cache.instrument(&instrument_id).map(Instrument::id),
2980            Some(instrument_id)
2981        );
2982        assert_eq!(cache.quotes(&instrument_id), Some(vec![quote]));
2983        assert_eq!(cache.trades(&instrument_id), Some(vec![trade]));
2984        assert_eq!(
2985            cache.funding_rates(&instrument_id),
2986            Some(vec![funding_rate])
2987        );
2988        assert_eq!(cache.bars(&bar_type), Some(vec![bar]));
2989    }
2990
2991    #[rstest]
2992    fn empty_data_response_replay_is_noop() {
2993        let instrument_id = InstrumentAny::CurrencyPair(audusd_sim()).id();
2994        let client_id = ClientId::from("DATA");
2995        let bar_type = BarType::new(
2996            instrument_id,
2997            BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
2998            AggregationSource::External,
2999        );
3000        let reader = reader_with_entries(
3001            "run-empty-data-response-replay",
3002            &[
3003                append_serde_payload(
3004                    1,
3005                    PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
3006                    &InstrumentsResponse::new(
3007                        UUID4::new(),
3008                        client_id,
3009                        instrument_id.venue,
3010                        Vec::new(),
3011                        None,
3012                        None,
3013                        UnixNanos::from(1),
3014                        None,
3015                    ),
3016                ),
3017                append_serde_payload(
3018                    2,
3019                    PAYLOAD_TYPE_QUOTES_RESPONSE,
3020                    &QuotesResponse::new(
3021                        UUID4::new(),
3022                        client_id,
3023                        instrument_id,
3024                        Vec::new(),
3025                        None,
3026                        None,
3027                        UnixNanos::from(2),
3028                        None,
3029                    ),
3030                ),
3031                append_serde_payload(
3032                    3,
3033                    PAYLOAD_TYPE_TRADES_RESPONSE,
3034                    &TradesResponse::new(
3035                        UUID4::new(),
3036                        client_id,
3037                        instrument_id,
3038                        Vec::new(),
3039                        None,
3040                        None,
3041                        UnixNanos::from(3),
3042                        None,
3043                    ),
3044                ),
3045                append_serde_payload(
3046                    4,
3047                    PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
3048                    &FundingRatesResponse::new(
3049                        UUID4::new(),
3050                        client_id,
3051                        instrument_id,
3052                        Vec::new(),
3053                        None,
3054                        None,
3055                        UnixNanos::from(4),
3056                        None,
3057                    ),
3058                ),
3059                append_serde_payload(
3060                    5,
3061                    PAYLOAD_TYPE_BARS_RESPONSE,
3062                    &BarsResponse::new(
3063                        UUID4::new(),
3064                        client_id,
3065                        bar_type,
3066                        Vec::new(),
3067                        None,
3068                        None,
3069                        UnixNanos::from(5),
3070                        None,
3071                    ),
3072                ),
3073            ],
3074        );
3075        let mut cache = Cache::default();
3076
3077        let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
3078
3079        assert_eq!(report.applied_entries, 5);
3080        assert_eq!(report.ignored_entries, 0);
3081        assert!(cache.instrument(&instrument_id).is_none());
3082        assert_eq!(cache.quotes(&instrument_id), None);
3083        assert_eq!(cache.trades(&instrument_id), None);
3084        assert_eq!(cache.funding_rates(&instrument_id), None);
3085        assert_eq!(cache.bars(&bar_type), None);
3086    }
3087
3088    #[rstest]
3089    fn order_fill_replay_updates_order_and_creates_position() {
3090        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3091        let position_id = PositionId::from("P-001");
3092        let initialized = OrderInitializedSpec::builder()
3093            .instrument_id(instrument.id())
3094            .build();
3095        let client_order_id = initialized.client_order_id;
3096        let submitted = OrderSubmittedSpec::builder()
3097            .instrument_id(instrument.id())
3098            .client_order_id(client_order_id)
3099            .build();
3100        let accepted = OrderAcceptedSpec::builder()
3101            .instrument_id(instrument.id())
3102            .client_order_id(client_order_id)
3103            .account_id(submitted.account_id)
3104            .build();
3105        let filled = OrderFilledSpec::builder()
3106            .instrument_id(instrument.id())
3107            .client_order_id(client_order_id)
3108            .venue_order_id(accepted.venue_order_id)
3109            .account_id(submitted.account_id)
3110            .position_id(position_id)
3111            .commission(Money::from("1 USD"))
3112            .build();
3113        let filled_event = OrderEventAny::Filled(filled);
3114        let reader = reader_with_entries(
3115            "run-order-replay",
3116            &[
3117                append_order_event(1, &OrderEventAny::Initialized(initialized)),
3118                append_order_event(2, &OrderEventAny::Submitted(submitted)),
3119                append_order_event(3, &OrderEventAny::Accepted(accepted)),
3120                append_order_event(4, &filled_event),
3121            ],
3122        );
3123        let mut cache = Cache::default();
3124        cache.add_instrument(instrument).expect("add instrument");
3125
3126        let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
3127        let order = cache.order_owned(&client_order_id).expect("order replayed");
3128        let position = cache
3129            .position_owned(&position_id)
3130            .expect("position replayed");
3131
3132        assert_eq!(report.applied_entries, 4);
3133        assert_eq!(report.ignored_entries, 0);
3134        assert_eq!(order.status(), OrderStatus::Filled);
3135        assert_eq!(order.event_count(), 4);
3136        assert_eq!(order.last_event(), &filled_event);
3137        assert_eq!(position.event_count(), 1);
3138        assert_eq!(position.last_event(), Some(filled));
3139        assert_eq!(position.trade_ids(), vec![filled.trade_id]);
3140        assert_eq!(position.commissions(), vec![Money::from("1 USD")]);
3141    }
3142
3143    #[rstest]
3144    fn position_lifecycle_replay_updates_existing_position() {
3145        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3146        let position_id = PositionId::from("P-001");
3147        let opened_fill = OrderFilledSpec::builder()
3148            .instrument_id(instrument.id())
3149            .client_order_id(ClientOrderId::from("O-OPEN"))
3150            .venue_order_id(VenueOrderId::from("V-OPEN"))
3151            .trade_id(TradeId::from("T-OPEN"))
3152            .position_id(position_id)
3153            .last_qty(Quantity::from("1"))
3154            .last_px(Price::from("1.00000"))
3155            .build();
3156        let mut live_position = Position::new(&instrument, opened_fill);
3157        let opened = PositionOpened::create(
3158            &live_position,
3159            &opened_fill,
3160            UUID4::new(),
3161            UnixNanos::from(10),
3162        );
3163
3164        let changed_fill = OrderFilledSpec::builder()
3165            .instrument_id(instrument.id())
3166            .client_order_id(ClientOrderId::from("O-CHANGE"))
3167            .venue_order_id(VenueOrderId::from("V-CHANGE"))
3168            .trade_id(TradeId::from("T-CHANGE"))
3169            .position_id(position_id)
3170            .last_qty(Quantity::from("2"))
3171            .last_px(Price::from("1.10000"))
3172            .build();
3173        live_position.apply(&changed_fill);
3174        let changed = PositionChanged::create(
3175            &live_position,
3176            &changed_fill,
3177            UUID4::new(),
3178            UnixNanos::from(20),
3179        );
3180
3181        let closed_fill = OrderFilledSpec::builder()
3182            .instrument_id(instrument.id())
3183            .client_order_id(ClientOrderId::from("O-CLOSE"))
3184            .venue_order_id(VenueOrderId::from("V-CLOSE"))
3185            .trade_id(TradeId::from("T-CLOSE"))
3186            .order_side(OrderSide::Sell)
3187            .position_id(position_id)
3188            .last_qty(Quantity::from("3"))
3189            .last_px(Price::from("1.20000"))
3190            .build();
3191        live_position.apply(&closed_fill);
3192        let closed = PositionClosed::create(
3193            &live_position,
3194            &closed_fill,
3195            UUID4::new(),
3196            UnixNanos::from(30),
3197        );
3198
3199        let mut stale_position = Position::new(&instrument, opened_fill);
3200        stale_position.signed_qty = 9.0;
3201        stale_position.quantity = Quantity::from("9");
3202        let mut cache = Cache::default();
3203        cache
3204            .add_position(&stale_position, OmsType::Unspecified)
3205            .expect("seed stale position");
3206
3207        let opened_entry =
3208            append_position_event(1, &PositionEvent::PositionOpened(opened.clone())).entry;
3209        let changed_entry =
3210            append_position_event(2, &PositionEvent::PositionChanged(changed.clone())).entry;
3211        let closed_entry =
3212            append_position_event(3, &PositionEvent::PositionClosed(closed.clone())).entry;
3213
3214        assert!(apply_cache_replay_entry(&mut cache, &opened_entry).expect("apply opened"));
3215        let replayed = cache
3216            .position_owned(&position_id)
3217            .expect("position after opened");
3218        assert_eq!(replayed.signed_qty.to_bits(), opened.signed_qty.to_bits());
3219        assert_eq!(replayed.quantity, opened.quantity);
3220        assert_eq!(replayed.ts_last, opened.ts_event);
3221
3222        assert!(apply_cache_replay_entry(&mut cache, &changed_entry).expect("apply changed"));
3223        let replayed = cache
3224            .position_owned(&position_id)
3225            .expect("position after changed");
3226        assert_eq!(replayed.signed_qty.to_bits(), changed.signed_qty.to_bits());
3227        assert_eq!(replayed.quantity, changed.quantity);
3228        assert_eq!(replayed.peak_qty, changed.peak_quantity);
3229        assert_eq!(
3230            replayed.avg_px_open.to_bits(),
3231            changed.avg_px_open.to_bits()
3232        );
3233        assert!(replayed.is_open());
3234
3235        assert!(apply_cache_replay_entry(&mut cache, &closed_entry).expect("apply closed"));
3236        let replayed = cache
3237            .position_owned(&position_id)
3238            .expect("position after closed");
3239        assert_eq!(replayed.signed_qty.to_bits(), closed.signed_qty.to_bits());
3240        assert_eq!(replayed.quantity, closed.quantity);
3241        assert_eq!(replayed.closing_order_id, closed.closing_order_id);
3242        assert_eq!(replayed.duration_ns, closed.duration);
3243        assert!(replayed.is_closed());
3244        assert!(cache.is_position_closed(&position_id));
3245    }
3246
3247    #[rstest]
3248    fn position_adjustment_replay_updates_existing_position() {
3249        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3250        let position_id = PositionId::from("P-001");
3251        let fill = OrderFilledSpec::builder()
3252            .instrument_id(instrument.id())
3253            .position_id(position_id)
3254            .build();
3255        let position = Position::new(&instrument, fill);
3256        let adjustment = PositionAdjusted::new(
3257            fill.trader_id,
3258            fill.strategy_id,
3259            fill.instrument_id,
3260            position_id,
3261            fill.account_id,
3262            PositionAdjustmentType::Funding,
3263            None,
3264            Some(Money::from("2 USD")),
3265            Some(Ustr::from("funding")),
3266            UUID4::new(),
3267            UnixNanos::from(10),
3268            UnixNanos::from(11),
3269        );
3270        let entry = append_position_event(1, &PositionEvent::PositionAdjusted(adjustment)).entry;
3271        let mut cache = Cache::default();
3272        cache
3273            .add_position(&position, OmsType::Unspecified)
3274            .expect("seed position");
3275
3276        let applied = apply_cache_replay_entry(&mut cache, &entry).expect("apply");
3277        let position = cache
3278            .position_owned(&position_id)
3279            .expect("position updated");
3280
3281        assert!(applied);
3282        assert_eq!(position.adjustments, vec![adjustment]);
3283        assert_eq!(position.realized_pnl, Some(Money::from("2 USD")));
3284        assert_eq!(position.ts_last, adjustment.ts_event);
3285    }
3286
3287    #[rstest]
3288    fn duplicate_position_fill_is_not_applied_twice() {
3289        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3290        let position_id = PositionId::from("P-001");
3291        let fill = OrderFilledSpec::builder()
3292            .instrument_id(instrument.id())
3293            .position_id(position_id)
3294            .commission(Money::from("1 USD"))
3295            .build();
3296        let position = Position::new(&instrument, fill);
3297        let entry = append_order_event(1, &OrderEventAny::Filled(fill)).entry;
3298        let mut cache = Cache::default();
3299        cache
3300            .add_position(&position, OmsType::Unspecified)
3301            .expect("seed position");
3302
3303        apply_fill_to_position(&mut cache, &entry, &fill).expect("apply fill");
3304        let position = cache
3305            .position_owned(&position_id)
3306            .expect("position updated");
3307
3308        assert_eq!(position.event_count(), 1);
3309        assert_eq!(position.trade_ids(), vec![fill.trade_id]);
3310        assert_eq!(position.commissions(), vec![Money::from("1 USD")]);
3311    }
3312
3313    #[rstest]
3314    fn corrupt_supported_payload_returns_decode_error() {
3315        let reader = reader_with_entries(
3316            "run-decode-error",
3317            &[append_payload(
3318                1,
3319                PAYLOAD_TYPE_ACCOUNT_STATE,
3320                Bytes::copy_from_slice(&[0xc1]),
3321            )],
3322        );
3323        let mut cache = Cache::default();
3324
3325        let err = replay_cache_snapshot_tail(&mut cache, &reader).expect_err("decode error");
3326
3327        match err {
3328            CacheReplayError::Decode {
3329                seq, payload_type, ..
3330            } => {
3331                assert_eq!(seq, 1);
3332                assert_eq!(payload_type, PAYLOAD_TYPE_ACCOUNT_STATE);
3333            }
3334            other => panic!("expected Decode, was {other:?}"),
3335        }
3336    }
3337
3338    #[rstest]
3339    fn missing_order_event_returns_apply_error() {
3340        let submitted = OrderSubmittedSpec::builder().build();
3341        let reader = reader_with_entries(
3342            "run-apply-error",
3343            &[append_order_event(1, &OrderEventAny::Submitted(submitted))],
3344        );
3345        let mut cache = Cache::default();
3346
3347        let err = replay_cache_snapshot_tail(&mut cache, &reader).expect_err("apply error");
3348
3349        match err {
3350            CacheReplayError::Apply {
3351                seq,
3352                payload_type,
3353                message,
3354            } => {
3355                assert_eq!(seq, 1);
3356                assert_eq!(payload_type, PAYLOAD_TYPE_ORDER_SUBMITTED);
3357                assert!(
3358                    message.contains("not found"),
3359                    "message should include cache apply failure: {message}",
3360                );
3361            }
3362            other => panic!("expected Apply, was {other:?}"),
3363        }
3364    }
3365
3366    #[rstest]
3367    fn restore_cache_from_sealed_run_restores_snapshot_and_tail() {
3368        let tmp = TempDir::new().expect("tempdir");
3369        let run_id = "sealed-replay";
3370        let instance_id = "trader-001";
3371        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3372        let fill = OrderFilledSpec::builder()
3373            .instrument_id(instrument.id())
3374            .position_id(PositionId::from("P-SEALED-REPLAY-1"))
3375            .build();
3376        let position = Position::new(&instrument, fill);
3377        let mut snapshot_cache = Cache::default();
3378        let snapshot_ref = snapshot_cache
3379            .snapshot_position(&position)
3380            .expect("snapshot position");
3381        let anchored_state = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
3382        let replayed_state = cash_account_state_million_usd("200 USD", "0 USD", "200 USD");
3383
3384        {
3385            let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3386            backend.open_run(manifest(run_id)).expect("open run");
3387            backend
3388                .append_batch(&[append_account_state(1, &anchored_state)])
3389                .expect("append anchored state");
3390            backend
3391                .record_snapshot_anchor(SnapshotAnchor::new(
3392                    1,
3393                    snapshot_ref.blob_ref.clone(),
3394                    compute_snapshot_content_hash(snapshot_ref.blob.as_ref()),
3395                ))
3396                .expect("record snapshot anchor");
3397            backend
3398                .append_batch(&[append_account_state(2, &replayed_state)])
3399                .expect("append replay tail");
3400            backend.seal(RunStatus::Ended).expect("seal run");
3401        }
3402
3403        let mut cache = Cache::default();
3404        cache
3405            .add(&snapshot_ref.blob_ref, snapshot_ref.blob.clone())
3406            .expect("seed snapshot blob");
3407
3408        let report = restore_cache_from_sealed_run(
3409            &mut cache,
3410            tmp.path().to_path_buf(),
3411            instance_id,
3412            run_id,
3413        )
3414        .expect("restore sealed run");
3415
3416        let frames = cache
3417            .position_snapshot_bytes(&position.id)
3418            .expect("restored position snapshot");
3419        let account = cache
3420            .account_owned(&replayed_state.account_id)
3421            .expect("replayed account");
3422
3423        assert_eq!(report.manifest.run_id, run_id);
3424        assert_eq!(report.manifest.status, RunStatus::Ended);
3425        assert_eq!(report.cache.plan.from_seq, 2);
3426        assert_eq!(report.cache.applied_entries, 1);
3427        assert_eq!(report.cache.ignored_entries, 0);
3428        assert_eq!(frames.len(), 1);
3429        assert_eq!(frames[0].as_slice(), snapshot_ref.blob.as_ref());
3430        assert_eq!(account.events(), vec![replayed_state]);
3431    }
3432
3433    #[rstest]
3434    fn restore_cache_from_sealed_run_rejects_snapshot_hash_mismatch() {
3435        let tmp = TempDir::new().expect("tempdir");
3436        let run_id = "sealed-replay-bad-snapshot";
3437        let instance_id = "trader-001";
3438        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3439        let fill = OrderFilledSpec::builder()
3440            .instrument_id(instrument.id())
3441            .position_id(PositionId::from("P-SEALED-REPLAY-BAD-SNAPSHOT-1"))
3442            .build();
3443        let position = Position::new(&instrument, fill);
3444        let mut snapshot_cache = Cache::default();
3445        let snapshot_ref = snapshot_cache
3446            .snapshot_position(&position)
3447            .expect("snapshot position");
3448
3449        {
3450            let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3451            backend.open_run(manifest(run_id)).expect("open run");
3452            backend
3453                .record_snapshot_anchor(SnapshotAnchor::new(
3454                    0,
3455                    snapshot_ref.blob_ref.clone(),
3456                    compute_snapshot_content_hash(snapshot_ref.blob.as_ref()),
3457                ))
3458                .expect("record snapshot anchor");
3459            backend.seal(RunStatus::Ended).expect("seal run");
3460        }
3461
3462        let mut cache = Cache::default();
3463        cache
3464            .add(
3465                &snapshot_ref.blob_ref,
3466                Bytes::from_static(b"tampered snapshot"),
3467            )
3468            .expect("seed tampered snapshot blob");
3469
3470        let err = restore_cache_from_sealed_run(
3471            &mut cache,
3472            tmp.path().to_path_buf(),
3473            instance_id,
3474            run_id,
3475        )
3476        .expect_err("hash mismatch");
3477
3478        match err {
3479            CacheReplayError::SnapshotRestore { blob_ref, message } => {
3480                assert_eq!(blob_ref, snapshot_ref.blob_ref);
3481                assert!(
3482                    message.contains("content_hash mismatch"),
3483                    "message should explain hash mismatch: {message}",
3484                );
3485            }
3486            other => panic!("expected SnapshotRestore, was {other:?}"),
3487        }
3488    }
3489
3490    #[rstest]
3491    fn open_event_store_replay_source_rejects_running_run() {
3492        let tmp = TempDir::new().expect("tempdir");
3493        let run_id = "running-replay";
3494        {
3495            let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3496            backend.open_run(manifest(run_id)).expect("open run");
3497        }
3498
3499        let err = open_event_store_replay_source(tmp.path().to_path_buf(), "trader-001", run_id)
3500            .expect_err("running source must fail");
3501
3502        assert!(
3503            err.to_string().contains("not sealed"),
3504            "error should name sealed-run requirement: {err}",
3505        );
3506    }
3507
3508    #[rstest]
3509    fn validate_event_store_replay_source_rejects_quarantined_run() {
3510        let tmp = TempDir::new().expect("tempdir");
3511        let run_id = "quarantined-replay";
3512        {
3513            let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3514            backend.open_run(manifest(run_id)).expect("open run");
3515            backend
3516                .append_batch(&[append_payload(1, "RunStarted", Bytes::new())])
3517                .expect("append");
3518            backend.seal(RunStatus::Quarantined).expect("seal run");
3519        }
3520
3521        let err =
3522            validate_event_store_replay_source(tmp.path().to_path_buf(), "trader-001", run_id)
3523                .expect_err("quarantined source must fail");
3524
3525        assert!(
3526            err.to_string().contains("quarantined"),
3527            "error should reject quarantined replay sources: {err}",
3528        );
3529    }
3530}