1use std::{fmt::Display, path::PathBuf};
24
25use nautilus_common::{
26 cache::Cache,
27 messages::{
28 data::{
29 BarsResponse, FundingRatesResponse, InstrumentResponse, InstrumentsResponse,
30 QuotesResponse, TradesResponse,
31 },
32 execution::SubmitOrderList,
33 },
34};
35use nautilus_core::UnixNanos;
36use nautilus_model::{
37 data::{Bar, QuoteTick, TradeTick},
38 enums::OmsType,
39 events::{
40 AccountState, OrderEventAny, OrderFilled, OrderInitialized, PositionAdjusted,
41 PositionChanged, PositionClosed, PositionOpened,
42 },
43 orders::OrderAny,
44 position::Position,
45};
46use serde::de::DeserializeOwned;
47
48#[cfg(test)]
49use crate::capture::builtins::{
50 PAYLOAD_TYPE_BATCH_CANCEL_ORDERS, PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
51 PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE, PAYLOAD_TYPE_BOOK_RESPONSE, PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
52 PAYLOAD_TYPE_CANCEL_ORDER, PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE,
53 PAYLOAD_TYPE_EXECUTION_MASS_STATUS, PAYLOAD_TYPE_FILL_REPORT,
54 PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE, PAYLOAD_TYPE_MODIFY_ORDER,
55 PAYLOAD_TYPE_ORDER_STATUS_REPORT, PAYLOAD_TYPE_ORDER_WITH_FILLS,
56 PAYLOAD_TYPE_POSITION_STATUS_REPORT, PAYLOAD_TYPE_QUERY_ACCOUNT, PAYLOAD_TYPE_QUERY_ORDER,
57 PAYLOAD_TYPE_REQUEST_COMMAND, PAYLOAD_TYPE_SUBMIT_ORDER, PAYLOAD_TYPE_SUBSCRIBE_COMMAND,
58 PAYLOAD_TYPE_TIME_EVENT, PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND,
59};
60#[cfg(all(test, feature = "defi"))]
61use crate::capture::builtins::{
62 PAYLOAD_TYPE_DEFI_REQUEST_COMMAND, PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND,
63 PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND,
64};
65use crate::{
66 RedbBackend,
67 backend::{EventStore, ScanDirection},
68 capture::builtins::{
69 PAYLOAD_TYPE_ACCOUNT_STATE, PAYLOAD_TYPE_BARS_RESPONSE,
70 PAYLOAD_TYPE_FUNDING_RATES_RESPONSE, PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
71 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE, PAYLOAD_TYPE_ORDER_ACCEPTED,
72 PAYLOAD_TYPE_ORDER_CANCEL_REJECTED, PAYLOAD_TYPE_ORDER_CANCELED, PAYLOAD_TYPE_ORDER_DENIED,
73 PAYLOAD_TYPE_ORDER_EMULATED, PAYLOAD_TYPE_ORDER_EXPIRED, PAYLOAD_TYPE_ORDER_FILLED,
74 PAYLOAD_TYPE_ORDER_INITIALIZED, PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
75 PAYLOAD_TYPE_ORDER_PENDING_CANCEL, PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
76 PAYLOAD_TYPE_ORDER_REJECTED, PAYLOAD_TYPE_ORDER_RELEASED, PAYLOAD_TYPE_ORDER_SUBMITTED,
77 PAYLOAD_TYPE_ORDER_TRIGGERED, PAYLOAD_TYPE_ORDER_UPDATED, PAYLOAD_TYPE_POSITION_ADJUSTED,
78 PAYLOAD_TYPE_POSITION_CHANGED, PAYLOAD_TYPE_POSITION_CLOSED, PAYLOAD_TYPE_POSITION_OPENED,
79 PAYLOAD_TYPE_QUOTES_RESPONSE, PAYLOAD_TYPE_SUBMIT_ORDER_LIST, PAYLOAD_TYPE_TRADES_RESPONSE,
80 },
81 entry::EventStoreEntry,
82 error::EventStoreError,
83 manifest::{RunManifest, RunStatus},
84 reader::{EventStoreReader, SnapshotReplayPlan},
85 snapshot::{SnapshotAnchor, compute_snapshot_content_hash},
86};
87
88#[cfg(feature = "persistence")]
89mod catalog;
90
91#[cfg(feature = "persistence")]
92pub use catalog::ParquetReplayCatalog;
93
94#[derive(Clone, Debug, PartialEq, Eq)]
96pub struct CacheReplayReport {
97 pub plan: SnapshotReplayPlan,
99 pub applied_entries: usize,
101 pub ignored_entries: usize,
103}
104
105#[derive(Clone, Debug, PartialEq, Eq)]
107pub struct EventStoreReplayReport {
108 pub manifest: RunManifest,
110 pub cache: CacheReplayReport,
112}
113
114#[cfg(test)]
115pub(crate) const CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES: &[&str] = &[
116 PAYLOAD_TYPE_SUBMIT_ORDER_LIST,
117 PAYLOAD_TYPE_ACCOUNT_STATE,
118 PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
119 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
120 PAYLOAD_TYPE_QUOTES_RESPONSE,
121 PAYLOAD_TYPE_TRADES_RESPONSE,
122 PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
123 PAYLOAD_TYPE_BARS_RESPONSE,
124 PAYLOAD_TYPE_ORDER_INITIALIZED,
125 PAYLOAD_TYPE_ORDER_DENIED,
126 PAYLOAD_TYPE_ORDER_EMULATED,
127 PAYLOAD_TYPE_ORDER_RELEASED,
128 PAYLOAD_TYPE_ORDER_SUBMITTED,
129 PAYLOAD_TYPE_ORDER_ACCEPTED,
130 PAYLOAD_TYPE_ORDER_REJECTED,
131 PAYLOAD_TYPE_ORDER_CANCELED,
132 PAYLOAD_TYPE_ORDER_EXPIRED,
133 PAYLOAD_TYPE_ORDER_TRIGGERED,
134 PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
135 PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
136 PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
137 PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
138 PAYLOAD_TYPE_ORDER_UPDATED,
139 PAYLOAD_TYPE_ORDER_FILLED,
140 PAYLOAD_TYPE_POSITION_OPENED,
141 PAYLOAD_TYPE_POSITION_CHANGED,
142 PAYLOAD_TYPE_POSITION_CLOSED,
143 PAYLOAD_TYPE_POSITION_ADJUSTED,
144];
145
146#[cfg(test)]
147pub(crate) const FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES: &[&str] = &[
148 PAYLOAD_TYPE_SUBMIT_ORDER,
149 PAYLOAD_TYPE_MODIFY_ORDER,
150 PAYLOAD_TYPE_CANCEL_ORDER,
151 PAYLOAD_TYPE_CANCEL_ALL_ORDERS,
152 PAYLOAD_TYPE_BATCH_CANCEL_ORDERS,
153 PAYLOAD_TYPE_QUERY_ORDER,
154 PAYLOAD_TYPE_QUERY_ACCOUNT,
155 PAYLOAD_TYPE_ORDER_STATUS_REPORT,
156 PAYLOAD_TYPE_FILL_REPORT,
157 PAYLOAD_TYPE_ORDER_WITH_FILLS,
158 PAYLOAD_TYPE_POSITION_STATUS_REPORT,
159 PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
160 PAYLOAD_TYPE_TIME_EVENT,
161 PAYLOAD_TYPE_REQUEST_COMMAND,
162 PAYLOAD_TYPE_SUBSCRIBE_COMMAND,
163 PAYLOAD_TYPE_UNSUBSCRIBE_COMMAND,
164 #[cfg(feature = "defi")]
165 PAYLOAD_TYPE_DEFI_REQUEST_COMMAND,
166 #[cfg(feature = "defi")]
167 PAYLOAD_TYPE_DEFI_SUBSCRIBE_COMMAND,
168 #[cfg(feature = "defi")]
169 PAYLOAD_TYPE_DEFI_UNSUBSCRIBE_COMMAND,
170 PAYLOAD_TYPE_CUSTOM_DATA_RESPONSE,
171 PAYLOAD_TYPE_BOOK_RESPONSE,
172 PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
173 PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE,
174 PAYLOAD_TYPE_FORWARD_PRICES_RESPONSE,
175];
176
177#[derive(Clone, Copy, Debug, PartialEq, Eq)]
179pub struct ReplaySeqRange {
180 pub from_seq: u64,
182 pub to_seq: u64,
184}
185
186impl ReplaySeqRange {
187 #[must_use]
189 pub const fn new(from_seq: u64, to_seq: u64) -> Self {
190 Self { from_seq, to_seq }
191 }
192}
193
194#[derive(Clone, Copy, Debug, PartialEq, Eq)]
196pub struct ReplayTimeRange {
197 pub start: UnixNanos,
199 pub end: UnixNanos,
201}
202
203impl ReplayTimeRange {
204 #[must_use]
206 pub const fn new(start: UnixNanos, end: UnixNanos) -> Self {
207 Self { start, end }
208 }
209
210 fn from_entry(entry: &EventStoreEntry) -> Self {
211 Self {
212 start: entry.ts_init,
213 end: entry.ts_init,
214 }
215 }
216
217 fn include_entry(&mut self, entry: &EventStoreEntry) {
218 self.start = self.start.min(entry.ts_init);
219 self.end = self.end.max(entry.ts_init);
220 }
221}
222
223#[derive(Clone, Debug, PartialEq, Eq)]
225pub struct CatalogSliceSelector {
226 pub data_cls: String,
228 pub identifiers: Vec<String>,
230 pub start: Option<UnixNanos>,
232 pub end: Option<UnixNanos>,
234 pub required: bool,
236}
237
238impl CatalogSliceSelector {
239 pub fn new(data_cls: impl Into<String>) -> Self {
241 Self {
242 data_cls: data_cls.into(),
243 identifiers: Vec::new(),
244 start: None,
245 end: None,
246 required: false,
247 }
248 }
249
250 #[must_use]
252 pub fn with_identifier(mut self, identifier: impl Into<String>) -> Self {
253 self.identifiers.push(identifier.into());
254 self
255 }
256
257 #[must_use]
259 pub const fn with_time_bounds(mut self, start: UnixNanos, end: UnixNanos) -> Self {
260 self.start = Some(start);
261 self.end = Some(end);
262 self
263 }
264
265 #[must_use]
267 pub const fn require_coverage(mut self) -> Self {
268 self.required = true;
269 self
270 }
271}
272
273#[derive(Clone, Debug, PartialEq, Eq)]
275pub struct CatalogSliceQuery {
276 pub data_cls: String,
278 pub identifiers: Vec<String>,
280 pub start: UnixNanos,
282 pub end: UnixNanos,
284 pub required: bool,
286}
287
288impl CatalogSliceQuery {
289 #[must_use]
291 pub fn identifiers_option(&self) -> Option<Vec<String>> {
292 if self.identifiers.is_empty() {
293 None
294 } else {
295 Some(self.identifiers.clone())
296 }
297 }
298}
299
300#[derive(Clone, Debug, Default, PartialEq, Eq)]
302pub struct CatalogSliceCoverage {
303 pub files: Vec<String>,
305 pub intervals: Vec<ReplayTimeRange>,
307}
308
309impl CatalogSliceCoverage {
310 #[must_use]
312 pub fn from_files(files: Vec<String>) -> Self {
313 Self {
314 files,
315 intervals: Vec::new(),
316 }
317 }
318
319 #[must_use]
321 pub fn is_missing(&self) -> bool {
322 self.files.is_empty()
323 }
324}
325
326#[derive(Clone, Debug, PartialEq, Eq)]
328pub struct CatalogSlicePlan {
329 pub query: CatalogSliceQuery,
331 pub coverage: CatalogSliceCoverage,
333}
334
335impl CatalogSlicePlan {
336 #[must_use]
338 pub fn is_missing(&self) -> bool {
339 self.coverage.is_missing()
340 }
341}
342
343#[derive(Clone, Copy, Debug, PartialEq, Eq)]
345pub enum CatalogReplayData {
346 Quote(QuoteTick),
348 Trade(TradeTick),
350 Bar(Bar),
352}
353
354impl CatalogReplayData {
355 #[must_use]
357 pub const fn data_cls(&self) -> &'static str {
358 match self {
359 Self::Quote(_) => "quotes",
360 Self::Trade(_) => "trades",
361 Self::Bar(_) => "bars",
362 }
363 }
364
365 #[must_use]
367 pub fn identifier(&self) -> String {
368 match self {
369 Self::Quote(quote) => quote.instrument_id.to_string(),
370 Self::Trade(trade) => trade.instrument_id.to_string(),
371 Self::Bar(bar) => bar.bar_type.to_string(),
372 }
373 }
374
375 #[must_use]
377 pub const fn ts_init(&self) -> UnixNanos {
378 match self {
379 Self::Quote(quote) => quote.ts_init,
380 Self::Trade(trade) => trade.ts_init,
381 Self::Bar(bar) => bar.ts_init,
382 }
383 }
384}
385
386impl From<QuoteTick> for CatalogReplayData {
387 fn from(value: QuoteTick) -> Self {
388 Self::Quote(value)
389 }
390}
391
392impl From<TradeTick> for CatalogReplayData {
393 fn from(value: TradeTick) -> Self {
394 Self::Trade(value)
395 }
396}
397
398impl From<Bar> for CatalogReplayData {
399 fn from(value: Bar) -> Self {
400 Self::Bar(value)
401 }
402}
403
404#[derive(Clone, Debug, PartialEq, Eq)]
406pub struct CatalogReplayRecord {
407 pub data_cls: String,
409 pub identifier: Option<String>,
411 pub ts_init: UnixNanos,
413 pub data: CatalogReplayData,
415}
416
417impl CatalogReplayRecord {
418 #[must_use]
420 pub fn from_data(data: CatalogReplayData) -> Self {
421 Self {
422 data_cls: data.data_cls().to_string(),
423 identifier: Some(data.identifier()),
424 ts_init: data.ts_init(),
425 data,
426 }
427 }
428}
429
430#[derive(Clone, Debug, PartialEq, Eq)]
432pub struct CatalogReplaySlice {
433 pub plan: CatalogSlicePlan,
435 pub records: Vec<CatalogReplayRecord>,
437}
438
439#[derive(Clone, Debug, PartialEq, Eq)]
441pub struct ReplayInputPlan {
442 pub requested_range: ReplaySeqRange,
444 pub event_range: Option<ReplaySeqRange>,
446 pub event_count: usize,
448 pub event_time_range: Option<ReplayTimeRange>,
450 pub catalog_slices: Vec<CatalogSlicePlan>,
452}
453
454impl ReplayInputPlan {
455 #[must_use]
457 pub fn missing_catalog_slices(&self) -> Vec<&CatalogSlicePlan> {
458 self.catalog_slices
459 .iter()
460 .filter(|slice| slice.is_missing())
461 .collect()
462 }
463}
464
465#[derive(Clone, Debug, PartialEq, Eq)]
467pub struct ReplayInputs {
468 pub entries: Vec<EventStoreEntry>,
470 pub catalog_slices: Vec<CatalogReplaySlice>,
472}
473
474pub trait ReplayCatalog {
476 type Error: Display;
478
479 fn plan_slice(
485 &mut self,
486 query: &CatalogSliceQuery,
487 ) -> Result<CatalogSliceCoverage, Self::Error>;
488
489 fn load_slice(
498 &mut self,
499 plan: &CatalogSlicePlan,
500 ) -> Result<Vec<CatalogReplayRecord>, Self::Error>;
501}
502
503#[derive(Debug, thiserror::Error)]
505pub enum ReplayInputError {
506 #[error(transparent)]
508 EventStore(#[from] EventStoreError),
509 #[error("invalid replay seq range {from_seq}..={to_seq}: {message}")]
511 InvalidSeqRange {
512 from_seq: u64,
514 to_seq: u64,
516 message: String,
518 },
519 #[error("catalog replay requires at least one selected catalog slice")]
521 EmptyCatalogSelection,
522 #[error(
524 "catalog slice {data_cls} requires explicit time bounds because the replay scan is empty"
525 )]
526 MissingCatalogTimeBounds {
527 data_cls: String,
529 },
530 #[error("invalid catalog time range for {data_cls}: {start}..={end}")]
532 InvalidCatalogTimeRange {
533 data_cls: String,
535 start: u64,
537 end: u64,
539 },
540 #[error("required catalog slice {data_cls} is missing for identifiers {identifiers:?}")]
542 MissingCatalogSlice {
543 data_cls: String,
545 identifiers: Vec<String>,
547 },
548 #[error("catalog slice {data_cls}: {message}")]
550 Catalog {
551 data_cls: String,
553 message: String,
555 },
556}
557
558#[derive(Debug, thiserror::Error)]
560pub enum CacheReplayError {
561 #[error(transparent)]
563 EventStore(#[from] EventStoreError),
564 #[error("restore cache snapshot {blob_ref}: {message}")]
566 SnapshotRestore {
567 blob_ref: String,
569 message: String,
571 },
572 #[error("entry seq {seq} is before replay start seq {from_seq}")]
574 UnexpectedSeq {
575 seq: u64,
577 from_seq: u64,
579 },
580 #[error("decode seq {seq} payload_type {payload_type}: {message}")]
582 Decode {
583 seq: u64,
585 payload_type: String,
587 message: String,
589 },
590 #[error("apply seq {seq} payload_type {payload_type}: {message}")]
592 Apply {
593 seq: u64,
595 payload_type: String,
597 message: String,
599 },
600}
601
602impl CacheReplayError {
603 #[must_use]
605 pub fn snapshot_restore(anchor: &SnapshotAnchor, error: impl Display) -> Self {
606 Self::SnapshotRestore {
607 blob_ref: anchor.blob_ref.clone(),
608 message: error.to_string(),
609 }
610 }
611}
612
613pub fn restore_cache_snapshot_and_replay_tail<B, F>(
631 cache: &mut Cache,
632 reader: &EventStoreReader<B>,
633 restore_snapshot: F,
634) -> Result<CacheReplayReport, CacheReplayError>
635where
636 B: EventStore,
637 F: FnOnce(&mut Cache, Option<&SnapshotAnchor>) -> Result<(), CacheReplayError>,
638{
639 let (plan, scan) = reader.scan_snapshot_replay_tail()?;
640 restore_snapshot(cache, plan.anchor.as_ref())?;
641
642 let mut applied_entries = 0;
643 let mut ignored_entries = 0;
644
645 for entry in scan {
646 let entry = entry?;
647
648 if entry.seq < plan.from_seq {
649 return Err(CacheReplayError::UnexpectedSeq {
650 seq: entry.seq,
651 from_seq: plan.from_seq,
652 });
653 }
654
655 if apply_cache_replay_entry(cache, &entry)? {
656 applied_entries += 1;
657 } else {
658 ignored_entries += 1;
659 }
660 }
661
662 Ok(CacheReplayReport {
663 plan,
664 applied_entries,
665 ignored_entries,
666 })
667}
668
669pub fn replay_cache_snapshot_tail<B>(
678 cache: &mut Cache,
679 reader: &EventStoreReader<B>,
680) -> Result<CacheReplayReport, CacheReplayError>
681where
682 B: EventStore,
683{
684 restore_cache_snapshot_and_replay_tail(cache, reader, |_, _| Ok(()))
685}
686
687pub fn plan_forensics_replay_inputs<B>(
697 reader: &EventStoreReader<B>,
698 range: ReplaySeqRange,
699) -> Result<ReplayInputPlan, ReplayInputError>
700where
701 B: EventStore,
702{
703 let span = collect_replay_entry_span(reader, range)?;
704 Ok(ReplayInputPlan {
705 requested_range: range,
706 event_range: span.event_range,
707 event_count: span.event_count,
708 event_time_range: span.time_range,
709 catalog_slices: Vec::new(),
710 })
711}
712
713pub fn load_forensics_replay_inputs<B>(
722 reader: &EventStoreReader<B>,
723 plan: &ReplayInputPlan,
724) -> Result<ReplayInputs, ReplayInputError>
725where
726 B: EventStore,
727{
728 let entries = load_replay_entries(reader, plan.requested_range)?;
729 Ok(ReplayInputs {
730 entries,
731 catalog_slices: Vec::new(),
732 })
733}
734
735pub fn plan_catalog_replay_inputs<B, C>(
749 reader: &EventStoreReader<B>,
750 catalog: &mut C,
751 range: ReplaySeqRange,
752 catalog_slices: &[CatalogSliceSelector],
753) -> Result<ReplayInputPlan, ReplayInputError>
754where
755 B: EventStore,
756 C: ReplayCatalog,
757{
758 plan_catalog_joined_replay_inputs(reader, catalog, range, catalog_slices)
759}
760
761pub fn load_catalog_replay_inputs<B, C>(
773 reader: &EventStoreReader<B>,
774 catalog: &mut C,
775 plan: &ReplayInputPlan,
776) -> Result<ReplayInputs, ReplayInputError>
777where
778 B: EventStore,
779 C: ReplayCatalog,
780{
781 load_catalog_joined_replay_inputs(reader, catalog, plan)
782}
783
784pub fn restore_cache_from_sealed_run(
797 cache: &mut Cache,
798 base_dir: impl Into<PathBuf>,
799 instance_id: &str,
800 run_id: &str,
801) -> Result<EventStoreReplayReport, CacheReplayError> {
802 let (manifest, reader) = open_event_store_replay_source(base_dir, instance_id, run_id)?;
803 let cache_report =
804 restore_cache_snapshot_and_replay_tail(cache, &reader, restore_cache_snapshot_blob)?;
805
806 Ok(EventStoreReplayReport {
807 manifest,
808 cache: cache_report,
809 })
810}
811
812pub fn open_event_store_replay_source(
819 base_dir: impl Into<PathBuf>,
820 instance_id: &str,
821 run_id: &str,
822) -> Result<(RunManifest, EventStoreReader<RedbBackend>), CacheReplayError> {
823 let backend = RedbBackend::open_sealed(base_dir, instance_id, run_id)?;
824 let manifest = backend.manifest()?;
825 reject_quarantined_replay_source(run_id, manifest.status)?;
826 Ok((manifest, EventStoreReader::new(backend)))
827}
828
829pub fn validate_event_store_replay_source(
836 base_dir: impl Into<PathBuf>,
837 instance_id: &str,
838 run_id: &str,
839) -> Result<RunManifest, CacheReplayError> {
840 let backend = RedbBackend::open_sealed(base_dir, instance_id, run_id)?;
841 let manifest = backend.manifest()?;
842 reject_quarantined_replay_source(run_id, manifest.status)?;
843 Ok(manifest)
844}
845
846#[derive(Clone, Copy, Debug, PartialEq, Eq)]
847struct ReplayEntrySpan {
848 event_range: Option<ReplaySeqRange>,
849 event_count: usize,
850 time_range: Option<ReplayTimeRange>,
851}
852
853fn plan_catalog_joined_replay_inputs<B, C>(
854 reader: &EventStoreReader<B>,
855 catalog: &mut C,
856 range: ReplaySeqRange,
857 catalog_slices: &[CatalogSliceSelector],
858) -> Result<ReplayInputPlan, ReplayInputError>
859where
860 B: EventStore,
861 C: ReplayCatalog,
862{
863 if catalog_slices.is_empty() {
864 return Err(ReplayInputError::EmptyCatalogSelection);
865 }
866
867 let span = collect_replay_entry_span(reader, range)?;
868 let catalog_slices = plan_catalog_slices(catalog, catalog_slices, span.time_range)?;
869
870 Ok(ReplayInputPlan {
871 requested_range: range,
872 event_range: span.event_range,
873 event_count: span.event_count,
874 event_time_range: span.time_range,
875 catalog_slices,
876 })
877}
878
879fn load_catalog_joined_replay_inputs<B, C>(
880 reader: &EventStoreReader<B>,
881 catalog: &mut C,
882 plan: &ReplayInputPlan,
883) -> Result<ReplayInputs, ReplayInputError>
884where
885 B: EventStore,
886 C: ReplayCatalog,
887{
888 let entries = load_replay_entries(reader, plan.requested_range)?;
889 let catalog_slices = load_catalog_slices(catalog, &plan.catalog_slices)?;
890
891 Ok(ReplayInputs {
892 entries,
893 catalog_slices,
894 })
895}
896
897fn collect_replay_entry_span<B>(
898 reader: &EventStoreReader<B>,
899 range: ReplaySeqRange,
900) -> Result<ReplayEntrySpan, ReplayInputError>
901where
902 B: EventStore,
903{
904 validate_seq_range(range)?;
905
906 let mut first_seq = None;
907 let mut last_seq = None;
908 let mut event_count = 0;
909 let mut time_range: Option<ReplayTimeRange> = None;
910
911 for entry in reader.scan_range(range.from_seq, range.to_seq, ScanDirection::Forward) {
912 let entry = entry?;
913 first_seq.get_or_insert(entry.seq);
914 last_seq = Some(entry.seq);
915 event_count += 1;
916
917 match time_range.as_mut() {
918 Some(bounds) => bounds.include_entry(&entry),
919 None => time_range = Some(ReplayTimeRange::from_entry(&entry)),
920 }
921 }
922
923 let event_range = match (first_seq, last_seq) {
924 (Some(from_seq), Some(to_seq)) => Some(ReplaySeqRange::new(from_seq, to_seq)),
925 _ => None,
926 };
927
928 Ok(ReplayEntrySpan {
929 event_range,
930 event_count,
931 time_range,
932 })
933}
934
935fn load_replay_entries<B>(
936 reader: &EventStoreReader<B>,
937 range: ReplaySeqRange,
938) -> Result<Vec<EventStoreEntry>, ReplayInputError>
939where
940 B: EventStore,
941{
942 validate_seq_range(range)?;
943
944 reader
945 .scan_range(range.from_seq, range.to_seq, ScanDirection::Forward)
946 .collect::<Result<Vec<_>, _>>()
947 .map_err(ReplayInputError::from)
948}
949
950fn plan_catalog_slices<C>(
951 catalog: &mut C,
952 selectors: &[CatalogSliceSelector],
953 event_time_range: Option<ReplayTimeRange>,
954) -> Result<Vec<CatalogSlicePlan>, ReplayInputError>
955where
956 C: ReplayCatalog,
957{
958 let mut plans = Vec::with_capacity(selectors.len());
959
960 for selector in selectors {
961 let query = resolve_catalog_slice_query(selector, event_time_range)?;
962 let coverage = catalog
963 .plan_slice(&query)
964 .map_err(|e| ReplayInputError::Catalog {
965 data_cls: query.data_cls.clone(),
966 message: e.to_string(),
967 })?;
968 plans.push(CatalogSlicePlan { query, coverage });
969 }
970
971 Ok(plans)
972}
973
974fn load_catalog_slices<C>(
975 catalog: &mut C,
976 plans: &[CatalogSlicePlan],
977) -> Result<Vec<CatalogReplaySlice>, ReplayInputError>
978where
979 C: ReplayCatalog,
980{
981 let mut slices = Vec::with_capacity(plans.len());
982
983 for plan in plans {
984 if plan.is_missing() {
985 if plan.query.required {
986 return Err(ReplayInputError::MissingCatalogSlice {
987 data_cls: plan.query.data_cls.clone(),
988 identifiers: plan.query.identifiers.clone(),
989 });
990 }
991
992 slices.push(CatalogReplaySlice {
993 plan: plan.clone(),
994 records: Vec::new(),
995 });
996 continue;
997 }
998
999 let records = catalog
1000 .load_slice(plan)
1001 .map_err(|e| ReplayInputError::Catalog {
1002 data_cls: plan.query.data_cls.clone(),
1003 message: e.to_string(),
1004 })?;
1005 slices.push(CatalogReplaySlice {
1006 plan: plan.clone(),
1007 records,
1008 });
1009 }
1010
1011 Ok(slices)
1012}
1013
1014fn resolve_catalog_slice_query(
1015 selector: &CatalogSliceSelector,
1016 event_time_range: Option<ReplayTimeRange>,
1017) -> Result<CatalogSliceQuery, ReplayInputError> {
1018 let Some(start) = selector
1019 .start
1020 .or(event_time_range.map(|bounds| bounds.start))
1021 else {
1022 return Err(ReplayInputError::MissingCatalogTimeBounds {
1023 data_cls: selector.data_cls.clone(),
1024 });
1025 };
1026 let Some(end) = selector.end.or(event_time_range.map(|bounds| bounds.end)) else {
1027 return Err(ReplayInputError::MissingCatalogTimeBounds {
1028 data_cls: selector.data_cls.clone(),
1029 });
1030 };
1031
1032 if start > end {
1033 return Err(ReplayInputError::InvalidCatalogTimeRange {
1034 data_cls: selector.data_cls.clone(),
1035 start: start.as_u64(),
1036 end: end.as_u64(),
1037 });
1038 }
1039
1040 Ok(CatalogSliceQuery {
1041 data_cls: selector.data_cls.clone(),
1042 identifiers: selector.identifiers.clone(),
1043 start,
1044 end,
1045 required: selector.required,
1046 })
1047}
1048
1049fn validate_seq_range(range: ReplaySeqRange) -> Result<(), ReplayInputError> {
1050 if range.from_seq == 0 {
1051 return Err(ReplayInputError::InvalidSeqRange {
1052 from_seq: range.from_seq,
1053 to_seq: range.to_seq,
1054 message: "seq is 1-based".to_string(),
1055 });
1056 }
1057
1058 if range.from_seq > range.to_seq {
1059 return Err(ReplayInputError::InvalidSeqRange {
1060 from_seq: range.from_seq,
1061 to_seq: range.to_seq,
1062 message: "from_seq exceeds to_seq".to_string(),
1063 });
1064 }
1065
1066 Ok(())
1067}
1068
1069pub fn restore_cache_snapshot_blob(
1076 cache: &mut Cache,
1077 anchor: Option<&SnapshotAnchor>,
1078) -> Result<(), CacheReplayError> {
1079 let Some(anchor) = anchor else {
1080 return Ok(());
1081 };
1082
1083 let blob = cache
1084 .load_snapshot_blob(&anchor.blob_ref)
1085 .map_err(|e| CacheReplayError::snapshot_restore(anchor, e))?
1086 .ok_or_else(|| CacheReplayError::snapshot_restore(anchor, "snapshot blob not found"))?;
1087 let actual_hash = compute_snapshot_content_hash(blob.as_ref());
1088
1089 if actual_hash != anchor.content_hash {
1090 return Err(CacheReplayError::snapshot_restore(
1091 anchor,
1092 format!(
1093 "content_hash mismatch: expected {}, actual {actual_hash}",
1094 anchor.content_hash
1095 ),
1096 ));
1097 }
1098
1099 cache
1100 .restore_snapshot_blob(&anchor.blob_ref, blob)
1101 .map_err(|e| CacheReplayError::snapshot_restore(anchor, e))
1102}
1103
1104pub fn apply_cache_replay_entry(
1114 cache: &mut Cache,
1115 entry: &EventStoreEntry,
1116) -> Result<bool, CacheReplayError> {
1117 if apply_complete_cache_payload_entry(cache, entry)? {
1118 return Ok(true);
1119 }
1120
1121 match entry.payload_type.as_str() {
1122 PAYLOAD_TYPE_ACCOUNT_STATE => {
1123 let state = decode_payload::<AccountState>(entry)?;
1124 apply_result(entry, cache.update_account_state(&state))?;
1125 }
1126 PAYLOAD_TYPE_ORDER_INITIALIZED => {
1127 let event = decode_order_event::<OrderInitialized>(entry, OrderEventAny::Initialized)?;
1128 let order = OrderAny::from_events(vec![event]).map_err(|e| apply_error(entry, e))?;
1129 apply_result(entry, cache.add_order(order, None, None, false))?;
1130 }
1131 PAYLOAD_TYPE_ORDER_DENIED => {
1132 apply_order_event(cache, entry, OrderEventAny::Denied)?;
1133 }
1134 PAYLOAD_TYPE_ORDER_EMULATED => {
1135 apply_order_event(cache, entry, OrderEventAny::Emulated)?;
1136 }
1137 PAYLOAD_TYPE_ORDER_RELEASED => {
1138 apply_order_event(cache, entry, OrderEventAny::Released)?;
1139 }
1140 PAYLOAD_TYPE_ORDER_SUBMITTED => {
1141 apply_order_event(cache, entry, OrderEventAny::Submitted)?;
1142 }
1143 PAYLOAD_TYPE_ORDER_ACCEPTED => {
1144 apply_order_event(cache, entry, OrderEventAny::Accepted)?;
1145 }
1146 PAYLOAD_TYPE_ORDER_REJECTED => {
1147 apply_order_event(cache, entry, OrderEventAny::Rejected)?;
1148 }
1149 PAYLOAD_TYPE_ORDER_CANCELED => {
1150 apply_order_event(cache, entry, OrderEventAny::Canceled)?;
1151 }
1152 PAYLOAD_TYPE_ORDER_EXPIRED => {
1153 apply_order_event(cache, entry, OrderEventAny::Expired)?;
1154 }
1155 PAYLOAD_TYPE_ORDER_TRIGGERED => {
1156 apply_order_event(cache, entry, OrderEventAny::Triggered)?;
1157 }
1158 PAYLOAD_TYPE_ORDER_PENDING_UPDATE => {
1159 apply_order_event(cache, entry, OrderEventAny::PendingUpdate)?;
1160 }
1161 PAYLOAD_TYPE_ORDER_PENDING_CANCEL => {
1162 apply_order_event(cache, entry, OrderEventAny::PendingCancel)?;
1163 }
1164 PAYLOAD_TYPE_ORDER_MODIFY_REJECTED => {
1165 apply_order_event(cache, entry, OrderEventAny::ModifyRejected)?;
1166 }
1167 PAYLOAD_TYPE_ORDER_CANCEL_REJECTED => {
1168 apply_order_event(cache, entry, OrderEventAny::CancelRejected)?;
1169 }
1170 PAYLOAD_TYPE_ORDER_UPDATED => {
1171 apply_order_event(cache, entry, OrderEventAny::Updated)?;
1172 }
1173 PAYLOAD_TYPE_ORDER_FILLED => {
1174 let fill = decode_payload::<OrderFilled>(entry)?;
1175 let event = OrderEventAny::Filled(fill);
1176 apply_result(entry, cache.update_order(&event))?;
1177 apply_fill_to_position(cache, entry, &fill)?;
1178 }
1179 PAYLOAD_TYPE_POSITION_OPENED => {
1180 let opened = decode_payload::<PositionOpened>(entry)?;
1181 apply_position_opened(cache, entry, &opened)?;
1182 }
1183 PAYLOAD_TYPE_POSITION_CHANGED => {
1184 let changed = decode_payload::<PositionChanged>(entry)?;
1185 apply_position_changed(cache, entry, &changed)?;
1186 }
1187 PAYLOAD_TYPE_POSITION_CLOSED => {
1188 let closed = decode_payload::<PositionClosed>(entry)?;
1189 apply_position_closed(cache, entry, &closed)?;
1190 }
1191 PAYLOAD_TYPE_POSITION_ADJUSTED => {
1192 let adjustment = decode_payload::<PositionAdjusted>(entry)?;
1193 apply_position_adjustment(cache, entry, adjustment)?;
1194 }
1195 _ => return Ok(false),
1196 }
1197
1198 Ok(true)
1199}
1200
1201fn apply_complete_cache_payload_entry(
1202 cache: &mut Cache,
1203 entry: &EventStoreEntry,
1204) -> Result<bool, CacheReplayError> {
1205 match entry.payload_type.as_str() {
1206 PAYLOAD_TYPE_SUBMIT_ORDER_LIST => {
1207 let command = decode_payload::<SubmitOrderList>(entry)?;
1208 apply_result(entry, cache.add_order_list(command.order_list))?;
1209 }
1210 PAYLOAD_TYPE_INSTRUMENT_RESPONSE => {
1211 let response = decode_payload::<InstrumentResponse>(entry)?;
1212 apply_result(entry, cache.add_instrument(response.data))?;
1213 }
1214 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE => {
1215 let response = decode_payload::<InstrumentsResponse>(entry)?;
1216 for instrument in response.data {
1217 apply_result(entry, cache.add_instrument(instrument))?;
1218 }
1219 }
1220 PAYLOAD_TYPE_QUOTES_RESPONSE => {
1221 let response = decode_payload::<QuotesResponse>(entry)?;
1222 if !response.data.is_empty() {
1223 apply_result(entry, cache.add_quotes(&response.data))?;
1224 }
1225 }
1226 PAYLOAD_TYPE_TRADES_RESPONSE => {
1227 let response = decode_payload::<TradesResponse>(entry)?;
1228 if !response.data.is_empty() {
1229 apply_result(entry, cache.add_trades(&response.data))?;
1230 }
1231 }
1232 PAYLOAD_TYPE_FUNDING_RATES_RESPONSE => {
1233 let response = decode_payload::<FundingRatesResponse>(entry)?;
1234 if !response.data.is_empty() {
1235 apply_result(entry, cache.add_funding_rates(&response.data))?;
1236 }
1237 }
1238 PAYLOAD_TYPE_BARS_RESPONSE => {
1239 let response = decode_payload::<BarsResponse>(entry)?;
1240 if !response.data.is_empty() {
1241 apply_result(entry, cache.add_bars(&response.data))?;
1242 }
1243 }
1244 _ => return Ok(false),
1245 }
1246
1247 Ok(true)
1248}
1249
1250fn apply_order_event<T>(
1251 cache: &mut Cache,
1252 entry: &EventStoreEntry,
1253 wrap: impl FnOnce(T) -> OrderEventAny,
1254) -> Result<(), CacheReplayError>
1255where
1256 T: DeserializeOwned,
1257{
1258 let event = decode_order_event(entry, wrap)?;
1259 apply_result(entry, cache.update_order(&event))?;
1260 Ok(())
1261}
1262
1263fn decode_order_event<T>(
1264 entry: &EventStoreEntry,
1265 wrap: impl FnOnce(T) -> OrderEventAny,
1266) -> Result<OrderEventAny, CacheReplayError>
1267where
1268 T: DeserializeOwned,
1269{
1270 Ok(wrap(decode_payload(entry)?))
1271}
1272
1273fn apply_fill_to_position(
1274 cache: &mut Cache,
1275 entry: &EventStoreEntry,
1276 fill: &OrderFilled,
1277) -> Result<(), CacheReplayError> {
1278 let Some(position_id) = fill.position_id else {
1279 return Ok(());
1280 };
1281
1282 if let Some(mut position) = cache.position_owned(&position_id) {
1283 if position.trade_ids().contains(&fill.trade_id) {
1284 return Ok(());
1285 }
1286
1287 position.apply(fill);
1288 apply_result(entry, cache.update_position(&position))?;
1289 return Ok(());
1290 }
1291
1292 let Some(instrument) = cache.instrument(&fill.instrument_id).cloned() else {
1293 return Ok(());
1294 };
1295
1296 let position = Position::new(&instrument, *fill);
1297 apply_result(entry, cache.add_position(&position, OmsType::Unspecified))?;
1298 Ok(())
1299}
1300
1301fn apply_position_opened(
1302 cache: &mut Cache,
1303 entry: &EventStoreEntry,
1304 opened: &PositionOpened,
1305) -> Result<(), CacheReplayError> {
1306 let Some(mut position) = cache.position_owned(&opened.position_id) else {
1307 return Ok(());
1308 };
1309
1310 position.trader_id = opened.trader_id;
1311 position.strategy_id = opened.strategy_id;
1312 position.instrument_id = opened.instrument_id;
1313 position.id = opened.position_id;
1314 position.account_id = opened.account_id;
1315 position.opening_order_id = opened.opening_order_id;
1316 position.closing_order_id = None;
1317 position.entry = opened.entry;
1318 position.side = opened.side;
1319 position.signed_qty = opened.signed_qty;
1320 position.quantity = opened.quantity;
1321 position.peak_qty = opened.quantity;
1322 position.quote_currency = opened.currency;
1323 position.ts_opened = opened.ts_event;
1324 position.ts_last = opened.ts_event;
1325 position.ts_closed = None;
1326 position.duration_ns = 0;
1327 position.avg_px_open = opened.avg_px_open;
1328 position.avg_px_close = None;
1329 position.realized_return = 0.0;
1330
1331 apply_result(entry, cache.update_position(&position))?;
1332 Ok(())
1333}
1334
1335fn apply_position_changed(
1336 cache: &mut Cache,
1337 entry: &EventStoreEntry,
1338 changed: &PositionChanged,
1339) -> Result<(), CacheReplayError> {
1340 let Some(mut position) = cache.position_owned(&changed.position_id) else {
1341 return Ok(());
1342 };
1343
1344 position.trader_id = changed.trader_id;
1345 position.strategy_id = changed.strategy_id;
1346 position.instrument_id = changed.instrument_id;
1347 position.id = changed.position_id;
1348 position.account_id = changed.account_id;
1349 position.opening_order_id = changed.opening_order_id;
1350 position.entry = changed.entry;
1351 position.side = changed.side;
1352 position.signed_qty = changed.signed_qty;
1353 position.quantity = changed.quantity;
1354 position.peak_qty = changed.peak_quantity;
1355 position.quote_currency = changed.currency;
1356 position.ts_opened = changed.ts_opened;
1357 position.ts_last = changed.ts_event;
1358 position.ts_closed = None;
1359 position.avg_px_open = changed.avg_px_open;
1360 position.avg_px_close = changed.avg_px_close;
1361 position.realized_return = changed.realized_return;
1362 position.realized_pnl = changed.realized_pnl;
1363
1364 apply_result(entry, cache.update_position(&position))?;
1365 Ok(())
1366}
1367
1368fn apply_position_closed(
1369 cache: &mut Cache,
1370 entry: &EventStoreEntry,
1371 closed: &PositionClosed,
1372) -> Result<(), CacheReplayError> {
1373 let Some(mut position) = cache.position_owned(&closed.position_id) else {
1374 return Ok(());
1375 };
1376
1377 position.trader_id = closed.trader_id;
1378 position.strategy_id = closed.strategy_id;
1379 position.instrument_id = closed.instrument_id;
1380 position.id = closed.position_id;
1381 position.account_id = closed.account_id;
1382 position.opening_order_id = closed.opening_order_id;
1383 position.closing_order_id = closed.closing_order_id;
1384 position.entry = closed.entry;
1385 position.side = closed.side;
1386 position.signed_qty = closed.signed_qty;
1387 position.quantity = closed.quantity;
1388 position.peak_qty = closed.peak_quantity;
1389 position.quote_currency = closed.currency;
1390 position.ts_opened = closed.ts_opened;
1391 position.ts_last = closed.ts_event;
1392 position.ts_closed = closed.ts_closed;
1393 position.duration_ns = closed.duration;
1394 position.avg_px_open = closed.avg_px_open;
1395 position.avg_px_close = closed.avg_px_close;
1396 position.realized_return = closed.realized_return;
1397 position.realized_pnl = closed.realized_pnl;
1398
1399 apply_result(entry, cache.update_position(&position))?;
1400 Ok(())
1401}
1402
1403fn apply_position_adjustment(
1404 cache: &mut Cache,
1405 entry: &EventStoreEntry,
1406 adjustment: PositionAdjusted,
1407) -> Result<(), CacheReplayError> {
1408 let Some(mut position) = cache.position_owned(&adjustment.position_id) else {
1409 return Ok(());
1410 };
1411
1412 position.apply_adjustment(adjustment);
1413 apply_result(entry, cache.update_position(&position))?;
1414 Ok(())
1415}
1416
1417fn decode_payload<T>(entry: &EventStoreEntry) -> Result<T, CacheReplayError>
1418where
1419 T: DeserializeOwned,
1420{
1421 rmp_serde::from_slice(&entry.payload).map_err(|e| CacheReplayError::Decode {
1422 seq: entry.seq,
1423 payload_type: entry.payload_type.to_string(),
1424 message: e.to_string(),
1425 })
1426}
1427
1428fn apply_result<T, E>(entry: &EventStoreEntry, result: Result<T, E>) -> Result<T, CacheReplayError>
1429where
1430 E: Display,
1431{
1432 result.map_err(|e| apply_error(entry, e))
1433}
1434
1435fn apply_error(entry: &EventStoreEntry, error: impl Display) -> CacheReplayError {
1436 CacheReplayError::Apply {
1437 seq: entry.seq,
1438 payload_type: entry.payload_type.to_string(),
1439 message: error.to_string(),
1440 }
1441}
1442
1443fn reject_quarantined_replay_source(
1444 run_id: &str,
1445 status: RunStatus,
1446) -> Result<(), CacheReplayError> {
1447 if matches!(status, RunStatus::Quarantined) {
1448 let error = EventStoreError::Backend(format!("replay source {run_id} is quarantined"));
1449 return Err(CacheReplayError::from(error));
1450 }
1451
1452 Ok(())
1453}
1454
1455#[cfg(test)]
1456mod tests {
1457 use std::{any::Any, cell::Cell, rc::Rc};
1458
1459 use ahash::AHashSet;
1460 use bytes::Bytes;
1461 use indexmap::IndexMap;
1462 use nautilus_common::msgbus::{self, BusTap, Endpoint, MStr, Topic as BusTopic};
1463 use nautilus_core::{UUID4, UnixNanos};
1464 use nautilus_model::{
1465 accounts::AccountAny,
1466 data::{Bar, BarSpecification, BarType, FundingRateUpdate, QuoteTick, TradeTick},
1467 enums::{
1468 AggregationSource, AggressorSide, BarAggregation, OrderSide, OrderStatus,
1469 PositionAdjustmentType, PriceType,
1470 },
1471 events::{
1472 PositionEvent,
1473 account::stubs::{cash_account_state, cash_account_state_million_usd},
1474 order::spec::{
1475 OrderAcceptedSpec, OrderFilledSpec, OrderInitializedSpec, OrderSubmittedSpec,
1476 },
1477 },
1478 identifiers::{
1479 AccountId, ClientId, ClientOrderId, InstrumentId, OrderListId, PositionId, TradeId,
1480 VenueOrderId,
1481 },
1482 instruments::{Instrument, InstrumentAny, stubs::audusd_sim},
1483 orders::{Order, OrderList},
1484 types::{Currency, Money, Price, Quantity},
1485 };
1486 use rstest::rstest;
1487 use serde::Serialize;
1488 use tempfile::TempDir;
1489 use ustr::Ustr;
1490
1491 use super::*;
1492 use crate::{
1493 backend::{AppendEntry, MemoryBackend, RedbBackend},
1494 capture::{
1495 builtins::{
1496 DEFAULT_CAPTURE_PAYLOAD_TYPES, encode_order_event_any, encode_position_event,
1497 },
1498 encode_account_state,
1499 },
1500 entry::Topic as EntryTopic,
1501 hash::compute_entry_hash,
1502 headers::Headers,
1503 manifest::{RegisteredComponents, RunManifest, RunStatus},
1504 snapshot::SnapshotAnchor,
1505 };
1506
1507 fn manifest(run_id: &str) -> RunManifest {
1508 RunManifest {
1509 run_id: run_id.to_string(),
1510 parent_run_id: None,
1511 instance_id: "trader-001".to_string(),
1512 binary_hash: "deadbeef".to_string(),
1513 schema_version: 1,
1514 crate_versions: "feedface".to_string(),
1515 feature_flags: Vec::new(),
1516 adapter_versions: IndexMap::new(),
1517 config_hash: "cafebabe".to_string(),
1518 registered_components: RegisteredComponents::default(),
1519 seed: None,
1520 start_ts_init: UnixNanos::from(0),
1521 end_ts_init: None,
1522 high_watermark: 0,
1523 status: RunStatus::Running,
1524 }
1525 }
1526
1527 fn append_payload(seq: u64, payload_type: &str, payload: Bytes) -> AppendEntry {
1528 append_payload_with_ts(seq, seq, payload_type, payload)
1529 }
1530
1531 fn append_serde_payload<T: Serialize>(seq: u64, payload_type: &str, value: &T) -> AppendEntry {
1532 let payload = rmp_serde::to_vec_named(value).expect("encode replay payload");
1533 append_payload(seq, payload_type, Bytes::from(payload))
1534 }
1535
1536 fn append_payload_with_ts(
1537 seq: u64,
1538 ts_init: u64,
1539 payload_type: &str,
1540 payload: Bytes,
1541 ) -> AppendEntry {
1542 let topic = EntryTopic::from("events.account.SIM");
1543 let ts = UnixNanos::from(ts_init);
1544 let headers = Headers::empty();
1545 let hash = compute_entry_hash(
1546 seq,
1547 ts,
1548 ts,
1549 topic.as_ref(),
1550 payload_type,
1551 &payload,
1552 &headers,
1553 );
1554 let entry = EventStoreEntry::new(
1555 hash,
1556 seq,
1557 headers,
1558 topic,
1559 Ustr::from(payload_type),
1560 payload,
1561 ts,
1562 ts,
1563 );
1564 AppendEntry::without_indices(entry)
1565 }
1566
1567 fn append_account_state(seq: u64, state: &AccountState) -> AppendEntry {
1568 let encoded = encode_account_state(state).expect("encode account state");
1569 append_payload(seq, PAYLOAD_TYPE_ACCOUNT_STATE, encoded.payload)
1570 }
1571
1572 fn append_order_event(seq: u64, event: &OrderEventAny) -> AppendEntry {
1573 let encoded = encode_order_event_any(event).expect("encode order event");
1574 let payload_type = encoded.payload_type.expect("order payload type");
1575 append_payload(seq, payload_type.as_str(), encoded.payload)
1576 }
1577
1578 fn append_position_event(seq: u64, event: &PositionEvent) -> AppendEntry {
1579 let encoded = encode_position_event(event).expect("encode position event");
1580 let payload_type = encoded.payload_type.expect("position payload type");
1581 append_payload(seq, payload_type.as_str(), encoded.payload)
1582 }
1583
1584 fn reader_with_entries(
1585 run_id: &str,
1586 entries: &[AppendEntry],
1587 ) -> EventStoreReader<MemoryBackend> {
1588 let mut backend = MemoryBackend::new();
1589 backend.open_run(manifest(run_id)).expect("open");
1590 backend.append_batch(entries).expect("append");
1591 EventStoreReader::new(backend)
1592 }
1593
1594 fn reader_with_anchor(anchor_seq: u64) -> (EventStoreReader<MemoryBackend>, AccountState) {
1595 let anchored = cash_account_state();
1596 let replayed = cash_account_state_million_usd("200 USD", "0 USD", "200 USD");
1597 let mut backend = MemoryBackend::new();
1598 backend.open_run(manifest("run-replay")).expect("open");
1599 backend
1600 .append_batch(&[
1601 append_account_state(1, &anchored),
1602 append_account_state(2, &replayed),
1603 ])
1604 .expect("append");
1605 backend
1606 .record_snapshot_anchor(SnapshotAnchor::new(anchor_seq, "cache://account", "hash"))
1607 .expect("record anchor");
1608 (EventStoreReader::new(backend), replayed)
1609 }
1610
1611 fn catalog_quote_record(ts_init: u64) -> CatalogReplayRecord {
1612 let instrument_id = InstrumentId::from("AUD/USD.SIM");
1613 CatalogReplayRecord::from_data(CatalogReplayData::Quote(QuoteTick::new(
1614 instrument_id,
1615 Price::from("1.0001"),
1616 Price::from("1.0002"),
1617 Quantity::from("100"),
1618 Quantity::from("100"),
1619 UnixNanos::from(ts_init),
1620 UnixNanos::from(ts_init),
1621 )))
1622 }
1623
1624 fn catalog_trade_record(ts_init: u64) -> CatalogReplayRecord {
1625 let instrument_id = InstrumentId::from("AUD/USD.SIM");
1626 CatalogReplayRecord::from_data(CatalogReplayData::Trade(TradeTick::new(
1627 instrument_id,
1628 Price::from("1.0001"),
1629 Quantity::from("100"),
1630 AggressorSide::Buyer,
1631 TradeId::from("T-1"),
1632 UnixNanos::from(ts_init),
1633 UnixNanos::from(ts_init),
1634 )))
1635 }
1636
1637 #[derive(Debug)]
1638 struct CountingTap {
1639 calls: Rc<Cell<usize>>,
1640 }
1641
1642 impl CountingTap {
1643 fn new(calls: Rc<Cell<usize>>) -> Self {
1644 Self { calls }
1645 }
1646
1647 fn increment(&self) {
1648 self.calls.set(self.calls.get() + 1);
1649 }
1650 }
1651
1652 impl BusTap for CountingTap {
1653 fn on_publish(&self, _topic: MStr<BusTopic>, _message: &dyn Any) {
1654 self.increment();
1655 }
1656
1657 fn on_send(&self, _endpoint: MStr<Endpoint>, _message: &dyn Any) {
1658 self.increment();
1659 }
1660 }
1661
1662 #[derive(Debug)]
1663 struct FakeReplayCatalog {
1664 coverage: CatalogSliceCoverage,
1665 records: Vec<CatalogReplayRecord>,
1666 plan_queries: Vec<CatalogSliceQuery>,
1667 load_plans: Vec<CatalogSlicePlan>,
1668 }
1669
1670 impl FakeReplayCatalog {
1671 fn new(coverage: CatalogSliceCoverage, records: Vec<CatalogReplayRecord>) -> Self {
1672 Self {
1673 coverage,
1674 records,
1675 plan_queries: Vec::new(),
1676 load_plans: Vec::new(),
1677 }
1678 }
1679 }
1680
1681 impl ReplayCatalog for FakeReplayCatalog {
1682 type Error = String;
1683
1684 fn plan_slice(
1685 &mut self,
1686 query: &CatalogSliceQuery,
1687 ) -> Result<CatalogSliceCoverage, Self::Error> {
1688 self.plan_queries.push(query.clone());
1689 Ok(self.coverage.clone())
1690 }
1691
1692 fn load_slice(
1693 &mut self,
1694 plan: &CatalogSlicePlan,
1695 ) -> Result<Vec<CatalogReplayRecord>, Self::Error> {
1696 self.load_plans.push(plan.clone());
1697 Ok(self.records.clone())
1698 }
1699 }
1700
1701 struct BusTapGuard;
1702
1703 impl Drop for BusTapGuard {
1704 fn drop(&mut self) {
1705 msgbus::clear_bus_tap();
1706 }
1707 }
1708
1709 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1710 enum CacheMutationRecoveryClass {
1711 SnapshotOwned,
1712 EventStoreCapturedAndReplayed,
1713 ForensicOnly,
1714 MissingLiveRecovery,
1715 }
1716
1717 #[derive(Clone, Copy, Debug)]
1718 struct CacheMutationCoverage {
1719 method: &'static str,
1720 class: CacheMutationRecoveryClass,
1721 payload_types: &'static [&'static str],
1722 }
1723
1724 const CACHE_MUTATION_COVERAGE: &[CacheMutationCoverage] = &[
1725 cache_mutation(
1726 "set_database",
1727 CacheMutationRecoveryClass::SnapshotOwned,
1728 &[],
1729 ),
1730 cache_mutation(
1731 "cache_general",
1732 CacheMutationRecoveryClass::SnapshotOwned,
1733 &[],
1734 ),
1735 cache_mutation("cache_all", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1736 cache_mutation(
1737 "cache_currencies",
1738 CacheMutationRecoveryClass::SnapshotOwned,
1739 &[],
1740 ),
1741 cache_mutation(
1742 "cache_instruments",
1743 CacheMutationRecoveryClass::SnapshotOwned,
1744 &[],
1745 ),
1746 cache_mutation(
1747 "cache_synthetics",
1748 CacheMutationRecoveryClass::SnapshotOwned,
1749 &[],
1750 ),
1751 cache_mutation(
1752 "cache_accounts",
1753 CacheMutationRecoveryClass::SnapshotOwned,
1754 &[],
1755 ),
1756 cache_mutation(
1757 "cache_orders",
1758 CacheMutationRecoveryClass::SnapshotOwned,
1759 &[],
1760 ),
1761 cache_mutation(
1762 "cache_positions",
1763 CacheMutationRecoveryClass::SnapshotOwned,
1764 &[],
1765 ),
1766 cache_mutation(
1767 "build_index",
1768 CacheMutationRecoveryClass::SnapshotOwned,
1769 &[],
1770 ),
1771 cache_mutation(
1772 "purge_closed_orders",
1773 CacheMutationRecoveryClass::SnapshotOwned,
1774 &[],
1775 ),
1776 cache_mutation(
1777 "purge_closed_positions",
1778 CacheMutationRecoveryClass::SnapshotOwned,
1779 &[],
1780 ),
1781 cache_mutation(
1782 "purge_order",
1783 CacheMutationRecoveryClass::SnapshotOwned,
1784 &[],
1785 ),
1786 cache_mutation(
1787 "purge_position",
1788 CacheMutationRecoveryClass::SnapshotOwned,
1789 &[],
1790 ),
1791 cache_mutation(
1792 "purge_instrument",
1793 CacheMutationRecoveryClass::SnapshotOwned,
1794 &[],
1795 ),
1796 cache_mutation(
1797 "purge_account_events",
1798 CacheMutationRecoveryClass::SnapshotOwned,
1799 &[],
1800 ),
1801 cache_mutation(
1802 "clear_index",
1803 CacheMutationRecoveryClass::SnapshotOwned,
1804 &[],
1805 ),
1806 cache_mutation("reset", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1807 cache_mutation("dispose", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1808 cache_mutation("flush_db", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1809 cache_mutation("add", CacheMutationRecoveryClass::SnapshotOwned, &[]),
1810 cache_mutation(
1811 "add_order_book",
1812 CacheMutationRecoveryClass::ForensicOnly,
1813 &[PAYLOAD_TYPE_BOOK_RESPONSE],
1814 ),
1815 cache_mutation(
1816 "add_own_order_book",
1817 CacheMutationRecoveryClass::SnapshotOwned,
1818 &[],
1819 ),
1820 cache_mutation(
1821 "add_mark_price",
1822 CacheMutationRecoveryClass::MissingLiveRecovery,
1823 &[],
1824 ),
1825 cache_mutation(
1826 "add_index_price",
1827 CacheMutationRecoveryClass::MissingLiveRecovery,
1828 &[],
1829 ),
1830 cache_mutation(
1831 "add_funding_rate",
1832 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1833 &[PAYLOAD_TYPE_FUNDING_RATES_RESPONSE],
1834 ),
1835 cache_mutation(
1836 "add_funding_rates",
1837 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1838 &[PAYLOAD_TYPE_FUNDING_RATES_RESPONSE],
1839 ),
1840 cache_mutation(
1841 "add_instrument_status",
1842 CacheMutationRecoveryClass::MissingLiveRecovery,
1843 &[],
1844 ),
1845 cache_mutation(
1846 "add_quote",
1847 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1848 &[PAYLOAD_TYPE_QUOTES_RESPONSE],
1849 ),
1850 cache_mutation(
1851 "add_quotes",
1852 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1853 &[PAYLOAD_TYPE_QUOTES_RESPONSE],
1854 ),
1855 cache_mutation(
1856 "add_trade",
1857 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1858 &[PAYLOAD_TYPE_TRADES_RESPONSE],
1859 ),
1860 cache_mutation(
1861 "add_trades",
1862 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1863 &[PAYLOAD_TYPE_TRADES_RESPONSE],
1864 ),
1865 cache_mutation(
1866 "add_bar",
1867 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1868 &[PAYLOAD_TYPE_BARS_RESPONSE],
1869 ),
1870 cache_mutation(
1871 "add_bars",
1872 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1873 &[PAYLOAD_TYPE_BARS_RESPONSE],
1874 ),
1875 cache_mutation(
1876 "add_greeks",
1877 CacheMutationRecoveryClass::MissingLiveRecovery,
1878 &[],
1879 ),
1880 cache_mutation(
1881 "add_option_greeks",
1882 CacheMutationRecoveryClass::MissingLiveRecovery,
1883 &[],
1884 ),
1885 cache_mutation(
1886 "add_yield_curve",
1887 CacheMutationRecoveryClass::MissingLiveRecovery,
1888 &[],
1889 ),
1890 cache_mutation(
1891 "add_currency",
1892 CacheMutationRecoveryClass::SnapshotOwned,
1893 &[],
1894 ),
1895 cache_mutation(
1896 "add_instrument",
1897 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1898 &[
1899 PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
1900 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
1901 ],
1902 ),
1903 cache_mutation(
1904 "add_synthetic",
1905 CacheMutationRecoveryClass::SnapshotOwned,
1906 &[],
1907 ),
1908 cache_mutation(
1909 "add_account",
1910 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1911 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1912 ),
1913 cache_mutation(
1914 "add_venue_order_id",
1915 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1916 &[PAYLOAD_TYPE_ORDER_ACCEPTED, PAYLOAD_TYPE_ORDER_UPDATED],
1917 ),
1918 cache_mutation(
1919 "add_order",
1920 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1921 &[PAYLOAD_TYPE_ORDER_INITIALIZED],
1922 ),
1923 cache_mutation(
1924 "add_order_list",
1925 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1926 &[PAYLOAD_TYPE_SUBMIT_ORDER_LIST],
1927 ),
1928 cache_mutation(
1929 "add_position_id",
1930 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1931 &[
1932 PAYLOAD_TYPE_ORDER_FILLED,
1933 PAYLOAD_TYPE_POSITION_OPENED,
1934 PAYLOAD_TYPE_POSITION_CHANGED,
1935 PAYLOAD_TYPE_POSITION_CLOSED,
1936 ],
1937 ),
1938 cache_mutation(
1939 "add_position",
1940 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1941 &[PAYLOAD_TYPE_ORDER_FILLED],
1942 ),
1943 cache_mutation(
1944 "update_account",
1945 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1946 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1947 ),
1948 cache_mutation(
1949 "take_account",
1950 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1951 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1952 ),
1953 cache_mutation(
1954 "cache_account_owned",
1955 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1956 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1957 ),
1958 cache_mutation(
1959 "update_account_owned",
1960 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1961 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1962 ),
1963 cache_mutation(
1964 "update_account_state",
1965 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1966 &[PAYLOAD_TYPE_ACCOUNT_STATE],
1967 ),
1968 cache_mutation(
1969 "replace_order",
1970 CacheMutationRecoveryClass::ForensicOnly,
1971 &[
1972 PAYLOAD_TYPE_ORDER_STATUS_REPORT,
1973 PAYLOAD_TYPE_ORDER_WITH_FILLS,
1974 PAYLOAD_TYPE_EXECUTION_MASS_STATUS,
1975 ],
1976 ),
1977 cache_mutation(
1978 "update_order",
1979 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
1980 &[
1981 PAYLOAD_TYPE_ORDER_DENIED,
1982 PAYLOAD_TYPE_ORDER_EMULATED,
1983 PAYLOAD_TYPE_ORDER_RELEASED,
1984 PAYLOAD_TYPE_ORDER_SUBMITTED,
1985 PAYLOAD_TYPE_ORDER_ACCEPTED,
1986 PAYLOAD_TYPE_ORDER_REJECTED,
1987 PAYLOAD_TYPE_ORDER_CANCELED,
1988 PAYLOAD_TYPE_ORDER_EXPIRED,
1989 PAYLOAD_TYPE_ORDER_TRIGGERED,
1990 PAYLOAD_TYPE_ORDER_PENDING_UPDATE,
1991 PAYLOAD_TYPE_ORDER_PENDING_CANCEL,
1992 PAYLOAD_TYPE_ORDER_MODIFY_REJECTED,
1993 PAYLOAD_TYPE_ORDER_CANCEL_REJECTED,
1994 PAYLOAD_TYPE_ORDER_UPDATED,
1995 PAYLOAD_TYPE_ORDER_FILLED,
1996 ],
1997 ),
1998 cache_mutation(
1999 "update_order_pending_cancel_local",
2000 CacheMutationRecoveryClass::MissingLiveRecovery,
2001 &[],
2002 ),
2003 cache_mutation(
2004 "update_position",
2005 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed,
2006 &[
2007 PAYLOAD_TYPE_ORDER_FILLED,
2008 PAYLOAD_TYPE_POSITION_OPENED,
2009 PAYLOAD_TYPE_POSITION_CHANGED,
2010 PAYLOAD_TYPE_POSITION_CLOSED,
2011 PAYLOAD_TYPE_POSITION_ADJUSTED,
2012 ],
2013 ),
2014 cache_mutation(
2015 "snapshot_position",
2016 CacheMutationRecoveryClass::SnapshotOwned,
2017 &[],
2018 ),
2019 cache_mutation(
2020 "snapshot_position_state",
2021 CacheMutationRecoveryClass::SnapshotOwned,
2022 &[],
2023 ),
2024 cache_mutation(
2025 "load_snapshot_blob",
2026 CacheMutationRecoveryClass::SnapshotOwned,
2027 &[],
2028 ),
2029 cache_mutation(
2030 "restore_snapshot_blob",
2031 CacheMutationRecoveryClass::SnapshotOwned,
2032 &[],
2033 ),
2034 cache_mutation(
2035 "order_mut",
2036 CacheMutationRecoveryClass::MissingLiveRecovery,
2037 &[],
2038 ),
2039 cache_mutation(
2040 "position_mut",
2041 CacheMutationRecoveryClass::MissingLiveRecovery,
2042 &[],
2043 ),
2044 cache_mutation(
2045 "order_book_mut",
2046 CacheMutationRecoveryClass::ForensicOnly,
2047 &[
2048 PAYLOAD_TYPE_BOOK_DELTAS_RESPONSE,
2049 PAYLOAD_TYPE_BOOK_DEPTH_RESPONSE,
2050 ],
2051 ),
2052 cache_mutation(
2053 "own_order_book_mut",
2054 CacheMutationRecoveryClass::SnapshotOwned,
2055 &[],
2056 ),
2057 cache_mutation(
2058 "set_mark_xrate",
2059 CacheMutationRecoveryClass::MissingLiveRecovery,
2060 &[],
2061 ),
2062 cache_mutation(
2063 "clear_mark_xrate",
2064 CacheMutationRecoveryClass::MissingLiveRecovery,
2065 &[],
2066 ),
2067 cache_mutation(
2068 "clear_mark_xrates",
2069 CacheMutationRecoveryClass::MissingLiveRecovery,
2070 &[],
2071 ),
2072 cache_mutation(
2073 "account_mut",
2074 CacheMutationRecoveryClass::MissingLiveRecovery,
2075 &[],
2076 ),
2077 cache_mutation(
2078 "update_own_order_book",
2079 CacheMutationRecoveryClass::SnapshotOwned,
2080 &[],
2081 ),
2082 cache_mutation(
2083 "force_remove_from_own_order_book",
2084 CacheMutationRecoveryClass::SnapshotOwned,
2085 &[],
2086 ),
2087 cache_mutation(
2088 "audit_own_order_books",
2089 CacheMutationRecoveryClass::SnapshotOwned,
2090 &[],
2091 ),
2092 ];
2093
2094 const CACHE_MUTATION_EXCLUSIONS: &[&str] = &["check_integrity"];
2095
2096 const fn cache_mutation(
2097 method: &'static str,
2098 class: CacheMutationRecoveryClass,
2099 payload_types: &'static [&'static str],
2100 ) -> CacheMutationCoverage {
2101 CacheMutationCoverage {
2102 method,
2103 class,
2104 payload_types,
2105 }
2106 }
2107
2108 fn cache_public_methods() -> AHashSet<&'static str> {
2109 collect_cache_public_methods(false)
2110 }
2111
2112 fn cache_public_mutable_methods() -> AHashSet<&'static str> {
2113 collect_cache_public_methods(true)
2114 }
2115
2116 fn collect_cache_public_methods(require_mut_self: bool) -> AHashSet<&'static str> {
2117 let source = include_str!("../../common/src/cache/mod.rs");
2118 let mut methods = AHashSet::new();
2119 let mut pending_name: Option<&'static str> = None;
2120 let mut pending_signature = String::new();
2121
2122 for line in source.lines() {
2123 let trimmed = line.trim_start();
2124
2125 if pending_name.is_none() {
2126 let Some(rest) = trimmed
2127 .strip_prefix("pub fn ")
2128 .or_else(|| trimmed.strip_prefix("pub async fn "))
2129 else {
2130 continue;
2131 };
2132 pending_name = rest.split('(').next();
2133 pending_signature.clear();
2134 pending_signature.push_str(trimmed);
2135 } else {
2136 pending_signature.push(' ');
2137 pending_signature.push_str(trimmed);
2138 }
2139
2140 if trimmed.contains('{') {
2141 if let Some(name) = pending_name.take()
2142 && (!require_mut_self || pending_signature.contains("&mut self"))
2143 {
2144 methods.insert(name);
2145 }
2146 pending_signature.clear();
2147 }
2148 }
2149
2150 methods
2151 }
2152
2153 fn sorted_missing_methods<'a>(
2154 actual: &'a AHashSet<&'static str>,
2155 classified: &'a AHashSet<&'static str>,
2156 ) -> Vec<&'static str> {
2157 let mut missing: Vec<_> = actual
2158 .iter()
2159 .copied()
2160 .filter(|method| !classified.contains(method))
2161 .collect();
2162 missing.sort_unstable();
2163 missing
2164 }
2165
2166 fn sorted_stale_methods<'a>(
2167 classified: &'a AHashSet<&'static str>,
2168 actual: &'a AHashSet<&'static str>,
2169 ) -> Vec<&'static str> {
2170 let mut stale: Vec<_> = classified
2171 .iter()
2172 .copied()
2173 .filter(|method| !actual.contains(method))
2174 .collect();
2175 stale.sort_unstable();
2176 stale
2177 }
2178
2179 #[rstest]
2180 fn catalog_replay_inputs_join_event_entries_with_selected_catalog_slice() {
2181 let reader = reader_with_entries(
2182 "run-catalog",
2183 &[
2184 append_payload_with_ts(1, 120, "RunStarted", Bytes::from_static(b"started")),
2185 append_payload_with_ts(2, 100, "SubmitOrder", Bytes::from_static(b"submit")),
2186 ],
2187 );
2188 let record = catalog_quote_record(110);
2189 let mut catalog = FakeReplayCatalog::new(
2190 CatalogSliceCoverage::from_files(vec!["quotes/AUDUSD.SIM/100_120.parquet".into()]),
2191 vec![record.clone()],
2192 );
2193
2194 let plan = plan_catalog_replay_inputs(
2195 &reader,
2196 &mut catalog,
2197 ReplaySeqRange::new(1, 2),
2198 &[CatalogSliceSelector::new("quotes").with_identifier("AUD/USD.SIM")],
2199 )
2200 .expect("plan catalog replay");
2201
2202 assert_eq!(plan.event_range, Some(ReplaySeqRange::new(1, 2)));
2203 assert_eq!(plan.event_count, 2);
2204 assert_eq!(
2205 plan.event_time_range,
2206 Some(ReplayTimeRange::new(
2207 UnixNanos::from(100),
2208 UnixNanos::from(120),
2209 )),
2210 );
2211 assert!(!plan.catalog_slices[0].is_missing());
2212 assert_eq!(catalog.plan_queries.len(), 1);
2213 assert_eq!(catalog.plan_queries[0].data_cls, "quotes");
2214 assert_eq!(
2215 catalog.plan_queries[0].identifiers,
2216 vec!["AUD/USD.SIM".to_string()],
2217 );
2218 assert_eq!(catalog.plan_queries[0].start, UnixNanos::from(100));
2219 assert_eq!(catalog.plan_queries[0].end, UnixNanos::from(120));
2220
2221 let loaded =
2222 load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load catalog");
2223 let seqs: Vec<_> = loaded.entries.iter().map(|entry| entry.seq).collect();
2224
2225 assert_eq!(seqs, vec![1, 2]);
2226 assert_eq!(loaded.catalog_slices.len(), 1);
2227 assert_eq!(loaded.catalog_slices[0].records, vec![record]);
2228 assert_eq!(catalog.load_plans.len(), 1);
2229 }
2230
2231 #[rstest]
2232 fn catalog_plan_marks_missing_catalog_slice() {
2233 let reader = reader_with_entries(
2234 "run-missing-catalog",
2235 &[append_payload_with_ts(
2236 1,
2237 1_000,
2238 "RunStarted",
2239 Bytes::from_static(b"started"),
2240 )],
2241 );
2242 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2243
2244 let plan = plan_catalog_replay_inputs(
2245 &reader,
2246 &mut catalog,
2247 ReplaySeqRange::new(1, 1),
2248 &[CatalogSliceSelector::new("trades").with_identifier("AUD/USD.SIM")],
2249 )
2250 .expect("plan catalog replay");
2251 let missing = plan.missing_catalog_slices();
2252
2253 assert_eq!(missing.len(), 1);
2254 assert_eq!(missing[0].query.data_cls, "trades");
2255 assert_eq!(
2256 missing[0].query.identifiers,
2257 vec!["AUD/USD.SIM".to_string()],
2258 );
2259 assert_eq!(missing[0].query.start, UnixNanos::from(1_000));
2260 assert_eq!(missing[0].query.end, UnixNanos::from(1_000));
2261 }
2262
2263 #[rstest]
2264 fn required_missing_catalog_slice_rejects_load() {
2265 let reader = reader_with_entries(
2266 "run-required-missing",
2267 &[append_payload_with_ts(
2268 1,
2269 1_000,
2270 "RunStarted",
2271 Bytes::from_static(b"started"),
2272 )],
2273 );
2274 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2275 let plan = plan_catalog_replay_inputs(
2276 &reader,
2277 &mut catalog,
2278 ReplaySeqRange::new(1, 1),
2279 &[CatalogSliceSelector::new("quotes")
2280 .with_identifier("AUD/USD.SIM")
2281 .require_coverage()],
2282 )
2283 .expect("plan missing slice");
2284
2285 let err = load_catalog_replay_inputs(&reader, &mut catalog, &plan)
2286 .expect_err("required missing slice must fail");
2287
2288 match err {
2289 ReplayInputError::MissingCatalogSlice {
2290 data_cls,
2291 identifiers,
2292 } => {
2293 assert_eq!(data_cls, "quotes");
2294 assert_eq!(identifiers, vec!["AUD/USD.SIM".to_string()]);
2295 }
2296 other => panic!("expected MissingCatalogSlice, was {other:?}"),
2297 }
2298 }
2299
2300 #[rstest]
2301 fn optional_missing_catalog_slice_loads_as_empty_without_catalog_load() {
2302 let reader = reader_with_entries(
2303 "run-optional-missing",
2304 &[append_payload_with_ts(
2305 1,
2306 1_000,
2307 "RunStarted",
2308 Bytes::from_static(b"started"),
2309 )],
2310 );
2311 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2312 let plan = plan_catalog_replay_inputs(
2313 &reader,
2314 &mut catalog,
2315 ReplaySeqRange::new(1, 1),
2316 &[CatalogSliceSelector::new("quotes").with_identifier("AUD/USD.SIM")],
2317 )
2318 .expect("plan optional missing slice");
2319
2320 let loaded =
2321 load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load optional");
2322
2323 assert_eq!(loaded.catalog_slices.len(), 1);
2324 assert!(loaded.catalog_slices[0].plan.is_missing());
2325 assert!(loaded.catalog_slices[0].records.is_empty());
2326 assert!(catalog.load_plans.is_empty());
2327 }
2328
2329 #[rstest]
2330 fn catalog_joined_planner_rejects_empty_catalog_selection() {
2331 let reader = reader_with_entries(
2332 "run-empty-selection",
2333 &[append_payload_with_ts(
2334 1,
2335 1_000,
2336 "RunStarted",
2337 Bytes::from_static(b"started"),
2338 )],
2339 );
2340 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2341
2342 let err = plan_catalog_replay_inputs(&reader, &mut catalog, ReplaySeqRange::new(1, 1), &[])
2343 .expect_err("empty catalog selection must fail");
2344
2345 match err {
2346 ReplayInputError::EmptyCatalogSelection => {}
2347 other => panic!("expected EmptyCatalogSelection, was {other:?}"),
2348 }
2349 assert!(catalog.plan_queries.is_empty());
2350 }
2351
2352 #[rstest]
2353 fn catalog_selector_explicit_time_bounds_override_event_span() {
2354 let reader = reader_with_entries(
2355 "run-explicit-bounds",
2356 &[append_payload_with_ts(
2357 1,
2358 1_000,
2359 "RunStarted",
2360 Bytes::from_static(b"started"),
2361 )],
2362 );
2363 let mut catalog = FakeReplayCatalog::new(
2364 CatalogSliceCoverage::from_files(vec!["bars/AUDUSD.SIM/900_950.parquet".into()]),
2365 Vec::new(),
2366 );
2367
2368 let plan = plan_catalog_replay_inputs(
2369 &reader,
2370 &mut catalog,
2371 ReplaySeqRange::new(1, 1),
2372 &[CatalogSliceSelector::new("bars")
2373 .with_identifier("AUD/USD.SIM-1-MINUTE-BID-EXTERNAL")
2374 .with_time_bounds(UnixNanos::from(900), UnixNanos::from(950))],
2375 )
2376 .expect("plan explicit bounds");
2377
2378 assert_eq!(plan.catalog_slices[0].query.start, UnixNanos::from(900));
2379 assert_eq!(plan.catalog_slices[0].query.end, UnixNanos::from(950));
2380 assert_eq!(catalog.plan_queries[0].start, UnixNanos::from(900));
2381 assert_eq!(catalog.plan_queries[0].end, UnixNanos::from(950));
2382 }
2383
2384 #[rstest]
2385 fn catalog_replay_inputs_load_catalog_records() {
2386 let reader = reader_with_entries(
2387 "run-catalog-load",
2388 &[
2389 append_payload_with_ts(1, 100, "RunStarted", Bytes::from_static(b"started")),
2390 append_payload_with_ts(2, 110, "OrderFilled", Bytes::from_static(b"filled")),
2391 ],
2392 );
2393 let record = catalog_trade_record(105);
2394 let mut catalog = FakeReplayCatalog::new(
2395 CatalogSliceCoverage::from_files(vec!["trades/AUDUSD.SIM/100_110.parquet".into()]),
2396 vec![record.clone()],
2397 );
2398 let plan = plan_catalog_replay_inputs(
2399 &reader,
2400 &mut catalog,
2401 ReplaySeqRange::new(1, 2),
2402 &[CatalogSliceSelector::new("trades").with_identifier("AUD/USD.SIM")],
2403 )
2404 .expect("plan catalog replay");
2405
2406 assert_eq!(
2407 plan.catalog_slices[0].query.identifiers_option(),
2408 Some(vec!["AUD/USD.SIM".to_string()]),
2409 );
2410
2411 let loaded =
2412 load_catalog_replay_inputs(&reader, &mut catalog, &plan).expect("load catalog");
2413 let seqs: Vec<_> = loaded.entries.iter().map(|entry| entry.seq).collect();
2414
2415 assert_eq!(seqs, vec![1, 2]);
2416 assert_eq!(loaded.catalog_slices[0].records, vec![record]);
2417 assert_eq!(catalog.load_plans.len(), 1);
2418 }
2419
2420 #[rstest]
2421 fn unbounded_catalog_selector_rejects_empty_event_scan() {
2422 let reader = reader_with_entries("run-empty", &[]);
2423 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2424
2425 let err = plan_catalog_replay_inputs(
2426 &reader,
2427 &mut catalog,
2428 ReplaySeqRange::new(1, 10),
2429 &[CatalogSliceSelector::new("quotes")],
2430 )
2431 .expect_err("empty replay scan must need explicit bounds");
2432
2433 match err {
2434 ReplayInputError::MissingCatalogTimeBounds { data_cls } => {
2435 assert_eq!(data_cls, "quotes");
2436 }
2437 other => panic!("expected MissingCatalogTimeBounds, was {other:?}"),
2438 }
2439 }
2440
2441 #[rstest]
2442 fn invalid_catalog_time_bounds_are_rejected_before_catalog_access() {
2443 let reader = reader_with_entries(
2444 "run-invalid-bounds",
2445 &[append_payload_with_ts(
2446 1,
2447 1_000,
2448 "RunStarted",
2449 Bytes::from_static(b"started"),
2450 )],
2451 );
2452 let mut catalog = FakeReplayCatalog::new(CatalogSliceCoverage::default(), Vec::new());
2453
2454 let err = plan_catalog_replay_inputs(
2455 &reader,
2456 &mut catalog,
2457 ReplaySeqRange::new(1, 1),
2458 &[CatalogSliceSelector::new("quotes")
2459 .with_time_bounds(UnixNanos::from(200), UnixNanos::from(100))],
2460 )
2461 .expect_err("invalid catalog bounds must fail");
2462
2463 match err {
2464 ReplayInputError::InvalidCatalogTimeRange {
2465 data_cls,
2466 start,
2467 end,
2468 } => {
2469 assert_eq!(data_cls, "quotes");
2470 assert_eq!(start, 200);
2471 assert_eq!(end, 100);
2472 }
2473 other => panic!("expected InvalidCatalogTimeRange, was {other:?}"),
2474 }
2475 assert!(catalog.plan_queries.is_empty());
2476 }
2477
2478 #[rstest]
2479 fn forensics_replay_inputs_do_not_require_catalog_source() {
2480 let reader = reader_with_entries(
2481 "run-forensics",
2482 &[append_payload_with_ts(
2483 1,
2484 500,
2485 "RunStarted",
2486 Bytes::from_static(b"started"),
2487 )],
2488 );
2489
2490 let plan = plan_forensics_replay_inputs(&reader, ReplaySeqRange::new(1, 1))
2491 .expect("plan forensics");
2492 let loaded = load_forensics_replay_inputs(&reader, &plan).expect("load forensics");
2493
2494 assert!(plan.catalog_slices.is_empty());
2495 assert_eq!(loaded.entries.len(), 1);
2496 assert!(loaded.catalog_slices.is_empty());
2497 }
2498
2499 #[rstest]
2500 #[case::zero_start(ReplaySeqRange::new(0, 1), "seq is 1-based")]
2501 #[case::from_after_to(ReplaySeqRange::new(2, 1), "from_seq exceeds to_seq")]
2502 fn invalid_replay_seq_range_rejected(
2503 #[case] range: ReplaySeqRange,
2504 #[case] expected_message: &str,
2505 ) {
2506 let reader = reader_with_entries("run-invalid-seq", &[]);
2507
2508 let err =
2509 plan_forensics_replay_inputs(&reader, range).expect_err("invalid seq range must fail");
2510
2511 match err {
2512 ReplayInputError::InvalidSeqRange {
2513 from_seq,
2514 to_seq,
2515 message,
2516 } => {
2517 assert_eq!(from_seq, range.from_seq);
2518 assert_eq!(to_seq, range.to_seq);
2519 assert_eq!(message, expected_message);
2520 }
2521 other => panic!("expected InvalidSeqRange, was {other:?}"),
2522 }
2523 }
2524
2525 #[rstest]
2526 fn replay_restores_snapshot_before_applying_tail() {
2527 let (reader, replayed) = reader_with_anchor(1);
2528 let mut cache = Cache::default();
2529 let restored = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2530 let restored_id = restored.account_id;
2531
2532 let report =
2533 restore_cache_snapshot_and_replay_tail(&mut cache, &reader, |cache, anchor| {
2534 assert_eq!(anchor.expect("anchor").high_watermark, 1);
2535 let account = AccountAny::from_events(std::slice::from_ref(&restored))
2536 .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))?;
2537 cache
2538 .add_account(account)
2539 .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))
2540 })
2541 .expect("replay");
2542
2543 let account = cache.account_owned(&restored_id).expect("account restored");
2544 let events = account.events();
2545
2546 assert_eq!(report.plan.from_seq, 2);
2547 assert_eq!(report.applied_entries, 1);
2548 assert_eq!(report.ignored_entries, 0);
2549 assert_eq!(events, vec![restored, replayed]);
2550 }
2551
2552 #[rstest]
2553 fn replay_does_not_apply_entries_at_or_below_anchor_watermark() {
2554 let (reader, _) = reader_with_anchor(2);
2555 let mut cache = Cache::default();
2556 let restored = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2557 let restored_id = restored.account_id;
2558
2559 let report =
2560 restore_cache_snapshot_and_replay_tail(&mut cache, &reader, |cache, anchor| {
2561 assert_eq!(anchor.expect("anchor").high_watermark, 2);
2562 let account = AccountAny::from_events(std::slice::from_ref(&restored))
2563 .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))?;
2564 cache
2565 .add_account(account)
2566 .map_err(|e| CacheReplayError::snapshot_restore(anchor.unwrap(), e))
2567 })
2568 .expect("replay");
2569
2570 let account = cache.account_owned(&restored_id).expect("account restored");
2571
2572 assert!(report.plan.is_empty());
2573 assert_eq!(report.applied_entries, 0);
2574 assert_eq!(report.ignored_entries, 0);
2575 assert_eq!(account.events(), vec![restored]);
2576 }
2577
2578 #[rstest]
2579 fn replay_from_start_applies_account_state_without_bus_publish() {
2580 let state = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
2581 let account_id = AccountId::from("SIM-001");
2582 let bus_calls = Rc::new(Cell::new(0));
2583 msgbus::set_bus_tap(Rc::new(CountingTap::new(Rc::clone(&bus_calls))));
2584 let _guard = BusTapGuard;
2585 let mut backend = MemoryBackend::new();
2586 backend.open_run(manifest("run-replay")).expect("open");
2587 backend
2588 .append_batch(&[append_account_state(1, &state)])
2589 .expect("append");
2590 let reader = EventStoreReader::new(backend);
2591 let mut cache = Cache::default();
2592
2593 let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2594 let account = cache.account_owned(&account_id).expect("account replayed");
2595
2596 assert_eq!(report.plan.anchor, None);
2597 assert_eq!(report.plan.from_seq, 1);
2598 assert_eq!(report.applied_entries, 1);
2599 assert_eq!(bus_calls.get(), 0);
2600 assert_eq!(account.last_event(), Some(state));
2601 assert_eq!(account.base_currency(), Some(Currency::USD()));
2602 }
2603
2604 #[rstest]
2605 fn unsupported_payload_is_ignored() {
2606 let mut backend = MemoryBackend::new();
2607 backend.open_run(manifest("run-replay")).expect("open");
2608 backend
2609 .append_batch(&[append_payload(
2610 1,
2611 "RunStarted",
2612 Bytes::copy_from_slice(UUID4::new().to_string().as_bytes()),
2613 )])
2614 .expect("append");
2615 let reader = EventStoreReader::new(backend);
2616 let mut cache = Cache::default();
2617
2618 let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2619
2620 assert_eq!(report.applied_entries, 0);
2621 assert_eq!(report.ignored_entries, 1);
2622 }
2623
2624 #[rstest]
2625 fn default_capture_payload_types_are_classified_for_cache_replay() {
2626 let mut classified = AHashSet::new();
2627 let mut overlap = Vec::new();
2628
2629 for payload_type in CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES {
2630 classified.insert(*payload_type);
2631 }
2632
2633 for payload_type in FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES {
2634 if !classified.insert(*payload_type) {
2635 overlap.push(*payload_type);
2636 }
2637 }
2638
2639 let mut seen_defaults = AHashSet::new();
2640 let duplicate_defaults: Vec<_> = DEFAULT_CAPTURE_PAYLOAD_TYPES
2641 .iter()
2642 .copied()
2643 .filter(|payload_type| !seen_defaults.insert(*payload_type))
2644 .collect();
2645 let unclassified: Vec<_> = DEFAULT_CAPTURE_PAYLOAD_TYPES
2646 .iter()
2647 .copied()
2648 .filter(|payload_type| !classified.contains(payload_type))
2649 .collect();
2650 let extra: Vec<_> = classified
2651 .iter()
2652 .copied()
2653 .filter(|payload_type| !seen_defaults.contains(payload_type))
2654 .collect();
2655
2656 assert!(
2657 duplicate_defaults.is_empty(),
2658 "default capture payload types must be unique: {duplicate_defaults:?}",
2659 );
2660 assert!(
2661 overlap.is_empty(),
2662 "cache replay and forensic-only classes must not overlap: {overlap:?}",
2663 );
2664 assert!(
2665 unclassified.is_empty(),
2666 "default capture payload types must be cache replayed or forensic-only: {unclassified:?}",
2667 );
2668 assert!(
2669 extra.is_empty(),
2670 "cache replay classification must not list uncaptured payload types: {extra:?}",
2671 );
2672 }
2673
2674 #[rstest]
2675 fn cache_replay_capture_payload_types_have_replay_rules() {
2676 for payload_type in CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES {
2677 let entry = append_payload(1, payload_type, Bytes::from_static(&[0xc1])).entry;
2678 let mut cache = Cache::default();
2679
2680 let err = apply_cache_replay_entry(&mut cache, &entry)
2681 .expect_err("cache replay payload type must have a decode rule");
2682
2683 match err {
2684 CacheReplayError::Decode {
2685 payload_type: actual,
2686 ..
2687 } => {
2688 assert_eq!(actual, *payload_type);
2689 }
2690 other => panic!("expected Decode for {payload_type}, was {other:?}"),
2691 }
2692 }
2693 }
2694
2695 #[rstest]
2696 fn forensic_only_capture_payload_types_are_not_cache_replayed() {
2697 for payload_type in FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES {
2698 let entry = append_payload(1, payload_type, Bytes::from_static(&[0xc1])).entry;
2699 let mut cache = Cache::default();
2700
2701 let applied = apply_cache_replay_entry(&mut cache, &entry)
2702 .expect("forensic-only payload type must not be decoded by cache replay");
2703
2704 assert!(
2705 !applied,
2706 "forensic-only payload type must be ignored by cache replay: {payload_type}",
2707 );
2708 }
2709 }
2710
2711 #[rstest]
2712 fn cache_public_mutators_have_recovery_classification() {
2713 let mut classified = AHashSet::new();
2714 let mut duplicates = Vec::new();
2715
2716 for row in CACHE_MUTATION_COVERAGE {
2717 if !classified.insert(row.method) {
2718 duplicates.push(row.method);
2719 }
2720 }
2721
2722 for method in CACHE_MUTATION_EXCLUSIONS {
2723 if !classified.insert(*method) {
2724 duplicates.push(*method);
2725 }
2726 }
2727
2728 let public_methods = cache_public_methods();
2729 let mutable_methods = cache_public_mutable_methods();
2730 let missing = sorted_missing_methods(&mutable_methods, &classified);
2731 let stale = sorted_stale_methods(&classified, &public_methods);
2732
2733 assert!(
2734 duplicates.is_empty(),
2735 "cache mutation recovery classifications must be unique: {duplicates:?}",
2736 );
2737 assert!(
2738 missing.is_empty(),
2739 "public Cache mutators must be classified for recovery: {missing:?}",
2740 );
2741 assert!(
2742 stale.is_empty(),
2743 "cache mutation recovery classifications reference missing methods: {stale:?}",
2744 );
2745 }
2746
2747 #[rstest]
2748 fn cache_mutation_replay_classification_matches_payload_buckets() {
2749 for row in CACHE_MUTATION_COVERAGE {
2750 match row.class {
2751 CacheMutationRecoveryClass::EventStoreCapturedAndReplayed => {
2752 assert!(
2753 !row.payload_types.is_empty(),
2754 "cache-replayed mutation must cite captured payloads: {}",
2755 row.method,
2756 );
2757
2758 for payload_type in row.payload_types {
2759 assert!(
2760 CACHE_REPLAY_CAPTURE_PAYLOAD_TYPES.contains(payload_type),
2761 "cache mutation {} cites non-replayed payload {payload_type}",
2762 row.method,
2763 );
2764 }
2765 }
2766 CacheMutationRecoveryClass::ForensicOnly => {
2767 assert!(
2768 !row.payload_types.is_empty(),
2769 "forensic-only mutation must cite forensic payloads: {}",
2770 row.method,
2771 );
2772
2773 for payload_type in row.payload_types {
2774 assert!(
2775 FORENSIC_ONLY_CAPTURE_PAYLOAD_TYPES.contains(payload_type),
2776 "cache mutation {} cites non-forensic payload {payload_type}",
2777 row.method,
2778 );
2779 }
2780 }
2781 CacheMutationRecoveryClass::SnapshotOwned
2782 | CacheMutationRecoveryClass::MissingLiveRecovery => {
2783 assert!(
2784 row.payload_types.is_empty(),
2785 "non-event-store cache mutation {} should not cite payloads",
2786 row.method,
2787 );
2788 }
2789 }
2790 }
2791 }
2792
2793 #[rstest]
2794 fn submit_order_list_replay_restores_order_list() {
2795 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
2796 let instrument_id = instrument.id();
2797 let first_init = OrderInitializedSpec::builder()
2798 .instrument_id(instrument_id)
2799 .client_order_id(ClientOrderId::from("O-LIST-001"))
2800 .build();
2801 let second_init = OrderInitializedSpec::builder()
2802 .instrument_id(instrument_id)
2803 .client_order_id(ClientOrderId::from("O-LIST-002"))
2804 .build();
2805 let order_list = OrderList::new(
2806 OrderListId::from("OL-001"),
2807 instrument_id,
2808 first_init.strategy_id,
2809 vec![first_init.client_order_id, second_init.client_order_id],
2810 UnixNanos::from(1),
2811 );
2812 let command = SubmitOrderList::new(
2813 first_init.trader_id,
2814 Some(ClientId::from("SIM")),
2815 first_init.strategy_id,
2816 order_list.clone(),
2817 vec![first_init, second_init],
2818 None,
2819 None,
2820 None,
2821 UUID4::new(),
2822 UnixNanos::from(2),
2823 None,
2824 );
2825 let entry = append_serde_payload(1, PAYLOAD_TYPE_SUBMIT_ORDER_LIST, &command).entry;
2826 let mut cache = Cache::default();
2827
2828 let applied = apply_cache_replay_entry(&mut cache, &entry).expect("apply order list");
2829 let replayed = cache
2830 .order_list(&order_list.id)
2831 .expect("order list replayed");
2832
2833 assert!(applied);
2834 assert_eq!(replayed, &order_list);
2835 }
2836
2837 #[rstest]
2838 fn data_response_replay_restores_instruments_and_market_data() {
2839 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
2840 let instrument_id = instrument.id();
2841 let client_id = ClientId::from("DATA");
2842 let quote = QuoteTick::new(
2843 instrument_id,
2844 Price::from("1.00000"),
2845 Price::from("1.00010"),
2846 Quantity::from("100000"),
2847 Quantity::from("100000"),
2848 UnixNanos::from(10),
2849 UnixNanos::from(11),
2850 );
2851 let trade = TradeTick::new(
2852 instrument_id,
2853 Price::from("1.00005"),
2854 Quantity::from("50000"),
2855 AggressorSide::Buyer,
2856 TradeId::from("T-DATA-001"),
2857 UnixNanos::from(12),
2858 UnixNanos::from(13),
2859 );
2860 let funding_rate = FundingRateUpdate::new(
2861 instrument_id,
2862 "0.0001".parse().expect("funding rate"),
2863 Some(480),
2864 Some(UnixNanos::from(60)),
2865 UnixNanos::from(14),
2866 UnixNanos::from(15),
2867 );
2868 let bar_type = BarType::new(
2869 instrument_id,
2870 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
2871 AggregationSource::External,
2872 );
2873 let bar = Bar::new(
2874 bar_type,
2875 Price::from("1.00000"),
2876 Price::from("1.00020"),
2877 Price::from("0.99990"),
2878 Price::from("1.00010"),
2879 Quantity::from("150000"),
2880 UnixNanos::from(16),
2881 UnixNanos::from(17),
2882 );
2883 let reader = reader_with_entries(
2884 "run-data-response-replay",
2885 &[
2886 append_serde_payload(
2887 1,
2888 PAYLOAD_TYPE_INSTRUMENT_RESPONSE,
2889 &InstrumentResponse::new(
2890 UUID4::new(),
2891 client_id,
2892 instrument_id,
2893 instrument.clone(),
2894 None,
2895 None,
2896 UnixNanos::from(1),
2897 None,
2898 ),
2899 ),
2900 append_serde_payload(
2901 2,
2902 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
2903 &InstrumentsResponse::new(
2904 UUID4::new(),
2905 client_id,
2906 instrument_id.venue,
2907 vec![instrument],
2908 None,
2909 None,
2910 UnixNanos::from(2),
2911 None,
2912 ),
2913 ),
2914 append_serde_payload(
2915 3,
2916 PAYLOAD_TYPE_QUOTES_RESPONSE,
2917 &QuotesResponse::new(
2918 UUID4::new(),
2919 client_id,
2920 instrument_id,
2921 vec![quote],
2922 None,
2923 None,
2924 UnixNanos::from(3),
2925 None,
2926 ),
2927 ),
2928 append_serde_payload(
2929 4,
2930 PAYLOAD_TYPE_TRADES_RESPONSE,
2931 &TradesResponse::new(
2932 UUID4::new(),
2933 client_id,
2934 instrument_id,
2935 vec![trade],
2936 None,
2937 None,
2938 UnixNanos::from(4),
2939 None,
2940 ),
2941 ),
2942 append_serde_payload(
2943 5,
2944 PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
2945 &FundingRatesResponse::new(
2946 UUID4::new(),
2947 client_id,
2948 instrument_id,
2949 vec![funding_rate],
2950 None,
2951 None,
2952 UnixNanos::from(5),
2953 None,
2954 ),
2955 ),
2956 append_serde_payload(
2957 6,
2958 PAYLOAD_TYPE_BARS_RESPONSE,
2959 &BarsResponse::new(
2960 UUID4::new(),
2961 client_id,
2962 bar_type,
2963 vec![bar],
2964 None,
2965 None,
2966 UnixNanos::from(6),
2967 None,
2968 ),
2969 ),
2970 ],
2971 );
2972 let mut cache = Cache::default();
2973
2974 let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
2975
2976 assert_eq!(report.applied_entries, 6);
2977 assert_eq!(report.ignored_entries, 0);
2978 assert_eq!(
2979 cache.instrument(&instrument_id).map(Instrument::id),
2980 Some(instrument_id)
2981 );
2982 assert_eq!(cache.quotes(&instrument_id), Some(vec![quote]));
2983 assert_eq!(cache.trades(&instrument_id), Some(vec![trade]));
2984 assert_eq!(
2985 cache.funding_rates(&instrument_id),
2986 Some(vec![funding_rate])
2987 );
2988 assert_eq!(cache.bars(&bar_type), Some(vec![bar]));
2989 }
2990
2991 #[rstest]
2992 fn empty_data_response_replay_is_noop() {
2993 let instrument_id = InstrumentAny::CurrencyPair(audusd_sim()).id();
2994 let client_id = ClientId::from("DATA");
2995 let bar_type = BarType::new(
2996 instrument_id,
2997 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
2998 AggregationSource::External,
2999 );
3000 let reader = reader_with_entries(
3001 "run-empty-data-response-replay",
3002 &[
3003 append_serde_payload(
3004 1,
3005 PAYLOAD_TYPE_INSTRUMENTS_RESPONSE,
3006 &InstrumentsResponse::new(
3007 UUID4::new(),
3008 client_id,
3009 instrument_id.venue,
3010 Vec::new(),
3011 None,
3012 None,
3013 UnixNanos::from(1),
3014 None,
3015 ),
3016 ),
3017 append_serde_payload(
3018 2,
3019 PAYLOAD_TYPE_QUOTES_RESPONSE,
3020 &QuotesResponse::new(
3021 UUID4::new(),
3022 client_id,
3023 instrument_id,
3024 Vec::new(),
3025 None,
3026 None,
3027 UnixNanos::from(2),
3028 None,
3029 ),
3030 ),
3031 append_serde_payload(
3032 3,
3033 PAYLOAD_TYPE_TRADES_RESPONSE,
3034 &TradesResponse::new(
3035 UUID4::new(),
3036 client_id,
3037 instrument_id,
3038 Vec::new(),
3039 None,
3040 None,
3041 UnixNanos::from(3),
3042 None,
3043 ),
3044 ),
3045 append_serde_payload(
3046 4,
3047 PAYLOAD_TYPE_FUNDING_RATES_RESPONSE,
3048 &FundingRatesResponse::new(
3049 UUID4::new(),
3050 client_id,
3051 instrument_id,
3052 Vec::new(),
3053 None,
3054 None,
3055 UnixNanos::from(4),
3056 None,
3057 ),
3058 ),
3059 append_serde_payload(
3060 5,
3061 PAYLOAD_TYPE_BARS_RESPONSE,
3062 &BarsResponse::new(
3063 UUID4::new(),
3064 client_id,
3065 bar_type,
3066 Vec::new(),
3067 None,
3068 None,
3069 UnixNanos::from(5),
3070 None,
3071 ),
3072 ),
3073 ],
3074 );
3075 let mut cache = Cache::default();
3076
3077 let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
3078
3079 assert_eq!(report.applied_entries, 5);
3080 assert_eq!(report.ignored_entries, 0);
3081 assert!(cache.instrument(&instrument_id).is_none());
3082 assert_eq!(cache.quotes(&instrument_id), None);
3083 assert_eq!(cache.trades(&instrument_id), None);
3084 assert_eq!(cache.funding_rates(&instrument_id), None);
3085 assert_eq!(cache.bars(&bar_type), None);
3086 }
3087
3088 #[rstest]
3089 fn order_fill_replay_updates_order_and_creates_position() {
3090 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3091 let position_id = PositionId::from("P-001");
3092 let initialized = OrderInitializedSpec::builder()
3093 .instrument_id(instrument.id())
3094 .build();
3095 let client_order_id = initialized.client_order_id;
3096 let submitted = OrderSubmittedSpec::builder()
3097 .instrument_id(instrument.id())
3098 .client_order_id(client_order_id)
3099 .build();
3100 let accepted = OrderAcceptedSpec::builder()
3101 .instrument_id(instrument.id())
3102 .client_order_id(client_order_id)
3103 .account_id(submitted.account_id)
3104 .build();
3105 let filled = OrderFilledSpec::builder()
3106 .instrument_id(instrument.id())
3107 .client_order_id(client_order_id)
3108 .venue_order_id(accepted.venue_order_id)
3109 .account_id(submitted.account_id)
3110 .position_id(position_id)
3111 .commission(Money::from("1 USD"))
3112 .build();
3113 let filled_event = OrderEventAny::Filled(filled);
3114 let reader = reader_with_entries(
3115 "run-order-replay",
3116 &[
3117 append_order_event(1, &OrderEventAny::Initialized(initialized)),
3118 append_order_event(2, &OrderEventAny::Submitted(submitted)),
3119 append_order_event(3, &OrderEventAny::Accepted(accepted)),
3120 append_order_event(4, &filled_event),
3121 ],
3122 );
3123 let mut cache = Cache::default();
3124 cache.add_instrument(instrument).expect("add instrument");
3125
3126 let report = replay_cache_snapshot_tail(&mut cache, &reader).expect("replay");
3127 let order = cache.order_owned(&client_order_id).expect("order replayed");
3128 let position = cache
3129 .position_owned(&position_id)
3130 .expect("position replayed");
3131
3132 assert_eq!(report.applied_entries, 4);
3133 assert_eq!(report.ignored_entries, 0);
3134 assert_eq!(order.status(), OrderStatus::Filled);
3135 assert_eq!(order.event_count(), 4);
3136 assert_eq!(order.last_event(), &filled_event);
3137 assert_eq!(position.event_count(), 1);
3138 assert_eq!(position.last_event(), Some(filled));
3139 assert_eq!(position.trade_ids(), vec![filled.trade_id]);
3140 assert_eq!(position.commissions(), vec![Money::from("1 USD")]);
3141 }
3142
3143 #[rstest]
3144 fn position_lifecycle_replay_updates_existing_position() {
3145 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3146 let position_id = PositionId::from("P-001");
3147 let opened_fill = OrderFilledSpec::builder()
3148 .instrument_id(instrument.id())
3149 .client_order_id(ClientOrderId::from("O-OPEN"))
3150 .venue_order_id(VenueOrderId::from("V-OPEN"))
3151 .trade_id(TradeId::from("T-OPEN"))
3152 .position_id(position_id)
3153 .last_qty(Quantity::from("1"))
3154 .last_px(Price::from("1.00000"))
3155 .build();
3156 let mut live_position = Position::new(&instrument, opened_fill);
3157 let opened = PositionOpened::create(
3158 &live_position,
3159 &opened_fill,
3160 UUID4::new(),
3161 UnixNanos::from(10),
3162 );
3163
3164 let changed_fill = OrderFilledSpec::builder()
3165 .instrument_id(instrument.id())
3166 .client_order_id(ClientOrderId::from("O-CHANGE"))
3167 .venue_order_id(VenueOrderId::from("V-CHANGE"))
3168 .trade_id(TradeId::from("T-CHANGE"))
3169 .position_id(position_id)
3170 .last_qty(Quantity::from("2"))
3171 .last_px(Price::from("1.10000"))
3172 .build();
3173 live_position.apply(&changed_fill);
3174 let changed = PositionChanged::create(
3175 &live_position,
3176 &changed_fill,
3177 UUID4::new(),
3178 UnixNanos::from(20),
3179 );
3180
3181 let closed_fill = OrderFilledSpec::builder()
3182 .instrument_id(instrument.id())
3183 .client_order_id(ClientOrderId::from("O-CLOSE"))
3184 .venue_order_id(VenueOrderId::from("V-CLOSE"))
3185 .trade_id(TradeId::from("T-CLOSE"))
3186 .order_side(OrderSide::Sell)
3187 .position_id(position_id)
3188 .last_qty(Quantity::from("3"))
3189 .last_px(Price::from("1.20000"))
3190 .build();
3191 live_position.apply(&closed_fill);
3192 let closed = PositionClosed::create(
3193 &live_position,
3194 &closed_fill,
3195 UUID4::new(),
3196 UnixNanos::from(30),
3197 );
3198
3199 let mut stale_position = Position::new(&instrument, opened_fill);
3200 stale_position.signed_qty = 9.0;
3201 stale_position.quantity = Quantity::from("9");
3202 let mut cache = Cache::default();
3203 cache
3204 .add_position(&stale_position, OmsType::Unspecified)
3205 .expect("seed stale position");
3206
3207 let opened_entry =
3208 append_position_event(1, &PositionEvent::PositionOpened(opened.clone())).entry;
3209 let changed_entry =
3210 append_position_event(2, &PositionEvent::PositionChanged(changed.clone())).entry;
3211 let closed_entry =
3212 append_position_event(3, &PositionEvent::PositionClosed(closed.clone())).entry;
3213
3214 assert!(apply_cache_replay_entry(&mut cache, &opened_entry).expect("apply opened"));
3215 let replayed = cache
3216 .position_owned(&position_id)
3217 .expect("position after opened");
3218 assert_eq!(replayed.signed_qty.to_bits(), opened.signed_qty.to_bits());
3219 assert_eq!(replayed.quantity, opened.quantity);
3220 assert_eq!(replayed.ts_last, opened.ts_event);
3221
3222 assert!(apply_cache_replay_entry(&mut cache, &changed_entry).expect("apply changed"));
3223 let replayed = cache
3224 .position_owned(&position_id)
3225 .expect("position after changed");
3226 assert_eq!(replayed.signed_qty.to_bits(), changed.signed_qty.to_bits());
3227 assert_eq!(replayed.quantity, changed.quantity);
3228 assert_eq!(replayed.peak_qty, changed.peak_quantity);
3229 assert_eq!(
3230 replayed.avg_px_open.to_bits(),
3231 changed.avg_px_open.to_bits()
3232 );
3233 assert!(replayed.is_open());
3234
3235 assert!(apply_cache_replay_entry(&mut cache, &closed_entry).expect("apply closed"));
3236 let replayed = cache
3237 .position_owned(&position_id)
3238 .expect("position after closed");
3239 assert_eq!(replayed.signed_qty.to_bits(), closed.signed_qty.to_bits());
3240 assert_eq!(replayed.quantity, closed.quantity);
3241 assert_eq!(replayed.closing_order_id, closed.closing_order_id);
3242 assert_eq!(replayed.duration_ns, closed.duration);
3243 assert!(replayed.is_closed());
3244 assert!(cache.is_position_closed(&position_id));
3245 }
3246
3247 #[rstest]
3248 fn position_adjustment_replay_updates_existing_position() {
3249 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3250 let position_id = PositionId::from("P-001");
3251 let fill = OrderFilledSpec::builder()
3252 .instrument_id(instrument.id())
3253 .position_id(position_id)
3254 .build();
3255 let position = Position::new(&instrument, fill);
3256 let adjustment = PositionAdjusted::new(
3257 fill.trader_id,
3258 fill.strategy_id,
3259 fill.instrument_id,
3260 position_id,
3261 fill.account_id,
3262 PositionAdjustmentType::Funding,
3263 None,
3264 Some(Money::from("2 USD")),
3265 Some(Ustr::from("funding")),
3266 UUID4::new(),
3267 UnixNanos::from(10),
3268 UnixNanos::from(11),
3269 );
3270 let entry = append_position_event(1, &PositionEvent::PositionAdjusted(adjustment)).entry;
3271 let mut cache = Cache::default();
3272 cache
3273 .add_position(&position, OmsType::Unspecified)
3274 .expect("seed position");
3275
3276 let applied = apply_cache_replay_entry(&mut cache, &entry).expect("apply");
3277 let position = cache
3278 .position_owned(&position_id)
3279 .expect("position updated");
3280
3281 assert!(applied);
3282 assert_eq!(position.adjustments, vec![adjustment]);
3283 assert_eq!(position.realized_pnl, Some(Money::from("2 USD")));
3284 assert_eq!(position.ts_last, adjustment.ts_event);
3285 }
3286
3287 #[rstest]
3288 fn duplicate_position_fill_is_not_applied_twice() {
3289 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3290 let position_id = PositionId::from("P-001");
3291 let fill = OrderFilledSpec::builder()
3292 .instrument_id(instrument.id())
3293 .position_id(position_id)
3294 .commission(Money::from("1 USD"))
3295 .build();
3296 let position = Position::new(&instrument, fill);
3297 let entry = append_order_event(1, &OrderEventAny::Filled(fill)).entry;
3298 let mut cache = Cache::default();
3299 cache
3300 .add_position(&position, OmsType::Unspecified)
3301 .expect("seed position");
3302
3303 apply_fill_to_position(&mut cache, &entry, &fill).expect("apply fill");
3304 let position = cache
3305 .position_owned(&position_id)
3306 .expect("position updated");
3307
3308 assert_eq!(position.event_count(), 1);
3309 assert_eq!(position.trade_ids(), vec![fill.trade_id]);
3310 assert_eq!(position.commissions(), vec![Money::from("1 USD")]);
3311 }
3312
3313 #[rstest]
3314 fn corrupt_supported_payload_returns_decode_error() {
3315 let reader = reader_with_entries(
3316 "run-decode-error",
3317 &[append_payload(
3318 1,
3319 PAYLOAD_TYPE_ACCOUNT_STATE,
3320 Bytes::copy_from_slice(&[0xc1]),
3321 )],
3322 );
3323 let mut cache = Cache::default();
3324
3325 let err = replay_cache_snapshot_tail(&mut cache, &reader).expect_err("decode error");
3326
3327 match err {
3328 CacheReplayError::Decode {
3329 seq, payload_type, ..
3330 } => {
3331 assert_eq!(seq, 1);
3332 assert_eq!(payload_type, PAYLOAD_TYPE_ACCOUNT_STATE);
3333 }
3334 other => panic!("expected Decode, was {other:?}"),
3335 }
3336 }
3337
3338 #[rstest]
3339 fn missing_order_event_returns_apply_error() {
3340 let submitted = OrderSubmittedSpec::builder().build();
3341 let reader = reader_with_entries(
3342 "run-apply-error",
3343 &[append_order_event(1, &OrderEventAny::Submitted(submitted))],
3344 );
3345 let mut cache = Cache::default();
3346
3347 let err = replay_cache_snapshot_tail(&mut cache, &reader).expect_err("apply error");
3348
3349 match err {
3350 CacheReplayError::Apply {
3351 seq,
3352 payload_type,
3353 message,
3354 } => {
3355 assert_eq!(seq, 1);
3356 assert_eq!(payload_type, PAYLOAD_TYPE_ORDER_SUBMITTED);
3357 assert!(
3358 message.contains("not found"),
3359 "message should include cache apply failure: {message}",
3360 );
3361 }
3362 other => panic!("expected Apply, was {other:?}"),
3363 }
3364 }
3365
3366 #[rstest]
3367 fn restore_cache_from_sealed_run_restores_snapshot_and_tail() {
3368 let tmp = TempDir::new().expect("tempdir");
3369 let run_id = "sealed-replay";
3370 let instance_id = "trader-001";
3371 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3372 let fill = OrderFilledSpec::builder()
3373 .instrument_id(instrument.id())
3374 .position_id(PositionId::from("P-SEALED-REPLAY-1"))
3375 .build();
3376 let position = Position::new(&instrument, fill);
3377 let mut snapshot_cache = Cache::default();
3378 let snapshot_ref = snapshot_cache
3379 .snapshot_position(&position)
3380 .expect("snapshot position");
3381 let anchored_state = cash_account_state_million_usd("100 USD", "0 USD", "100 USD");
3382 let replayed_state = cash_account_state_million_usd("200 USD", "0 USD", "200 USD");
3383
3384 {
3385 let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3386 backend.open_run(manifest(run_id)).expect("open run");
3387 backend
3388 .append_batch(&[append_account_state(1, &anchored_state)])
3389 .expect("append anchored state");
3390 backend
3391 .record_snapshot_anchor(SnapshotAnchor::new(
3392 1,
3393 snapshot_ref.blob_ref.clone(),
3394 compute_snapshot_content_hash(snapshot_ref.blob.as_ref()),
3395 ))
3396 .expect("record snapshot anchor");
3397 backend
3398 .append_batch(&[append_account_state(2, &replayed_state)])
3399 .expect("append replay tail");
3400 backend.seal(RunStatus::Ended).expect("seal run");
3401 }
3402
3403 let mut cache = Cache::default();
3404 cache
3405 .add(&snapshot_ref.blob_ref, snapshot_ref.blob.clone())
3406 .expect("seed snapshot blob");
3407
3408 let report = restore_cache_from_sealed_run(
3409 &mut cache,
3410 tmp.path().to_path_buf(),
3411 instance_id,
3412 run_id,
3413 )
3414 .expect("restore sealed run");
3415
3416 let frames = cache
3417 .position_snapshot_bytes(&position.id)
3418 .expect("restored position snapshot");
3419 let account = cache
3420 .account_owned(&replayed_state.account_id)
3421 .expect("replayed account");
3422
3423 assert_eq!(report.manifest.run_id, run_id);
3424 assert_eq!(report.manifest.status, RunStatus::Ended);
3425 assert_eq!(report.cache.plan.from_seq, 2);
3426 assert_eq!(report.cache.applied_entries, 1);
3427 assert_eq!(report.cache.ignored_entries, 0);
3428 assert_eq!(frames.len(), 1);
3429 assert_eq!(frames[0].as_slice(), snapshot_ref.blob.as_ref());
3430 assert_eq!(account.events(), vec![replayed_state]);
3431 }
3432
3433 #[rstest]
3434 fn restore_cache_from_sealed_run_rejects_snapshot_hash_mismatch() {
3435 let tmp = TempDir::new().expect("tempdir");
3436 let run_id = "sealed-replay-bad-snapshot";
3437 let instance_id = "trader-001";
3438 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3439 let fill = OrderFilledSpec::builder()
3440 .instrument_id(instrument.id())
3441 .position_id(PositionId::from("P-SEALED-REPLAY-BAD-SNAPSHOT-1"))
3442 .build();
3443 let position = Position::new(&instrument, fill);
3444 let mut snapshot_cache = Cache::default();
3445 let snapshot_ref = snapshot_cache
3446 .snapshot_position(&position)
3447 .expect("snapshot position");
3448
3449 {
3450 let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3451 backend.open_run(manifest(run_id)).expect("open run");
3452 backend
3453 .record_snapshot_anchor(SnapshotAnchor::new(
3454 0,
3455 snapshot_ref.blob_ref.clone(),
3456 compute_snapshot_content_hash(snapshot_ref.blob.as_ref()),
3457 ))
3458 .expect("record snapshot anchor");
3459 backend.seal(RunStatus::Ended).expect("seal run");
3460 }
3461
3462 let mut cache = Cache::default();
3463 cache
3464 .add(
3465 &snapshot_ref.blob_ref,
3466 Bytes::from_static(b"tampered snapshot"),
3467 )
3468 .expect("seed tampered snapshot blob");
3469
3470 let err = restore_cache_from_sealed_run(
3471 &mut cache,
3472 tmp.path().to_path_buf(),
3473 instance_id,
3474 run_id,
3475 )
3476 .expect_err("hash mismatch");
3477
3478 match err {
3479 CacheReplayError::SnapshotRestore { blob_ref, message } => {
3480 assert_eq!(blob_ref, snapshot_ref.blob_ref);
3481 assert!(
3482 message.contains("content_hash mismatch"),
3483 "message should explain hash mismatch: {message}",
3484 );
3485 }
3486 other => panic!("expected SnapshotRestore, was {other:?}"),
3487 }
3488 }
3489
3490 #[rstest]
3491 fn open_event_store_replay_source_rejects_running_run() {
3492 let tmp = TempDir::new().expect("tempdir");
3493 let run_id = "running-replay";
3494 {
3495 let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3496 backend.open_run(manifest(run_id)).expect("open run");
3497 }
3498
3499 let err = open_event_store_replay_source(tmp.path().to_path_buf(), "trader-001", run_id)
3500 .expect_err("running source must fail");
3501
3502 assert!(
3503 err.to_string().contains("not sealed"),
3504 "error should name sealed-run requirement: {err}",
3505 );
3506 }
3507
3508 #[rstest]
3509 fn validate_event_store_replay_source_rejects_quarantined_run() {
3510 let tmp = TempDir::new().expect("tempdir");
3511 let run_id = "quarantined-replay";
3512 {
3513 let mut backend = RedbBackend::new(tmp.path().to_path_buf());
3514 backend.open_run(manifest(run_id)).expect("open run");
3515 backend
3516 .append_batch(&[append_payload(1, "RunStarted", Bytes::new())])
3517 .expect("append");
3518 backend.seal(RunStatus::Quarantined).expect("seal run");
3519 }
3520
3521 let err =
3522 validate_event_store_replay_source(tmp.path().to_path_buf(), "trader-001", run_id)
3523 .expect_err("quarantined source must fail");
3524
3525 assert!(
3526 err.to_string().contains("quarantined"),
3527 "error should reject quarantined replay sources: {err}",
3528 );
3529 }
3530}