1use std::{
26 any::Any,
27 cell::RefCell,
28 fmt::Debug,
29 path::{Path, PathBuf},
30 rc::Rc,
31 sync::{
32 Arc, Mutex,
33 atomic::{AtomicBool, AtomicU64, Ordering},
34 },
35 thread,
36 time::{Duration, Instant},
37};
38
39use bytes::Bytes;
40use nautilus_common::{
41 cache::{Cache, CacheSnapshotRef},
42 clock::Clock,
43 enums::Environment,
44 msgbus::{self, BusTap, Endpoint, MStr, MessagingSwitchboard},
45};
46#[cfg(feature = "live")]
47use nautilus_core::time::get_atomic_clock_realtime;
48use nautilus_core::{
49 UUID4, UnixNanos,
50 time::{AtomicTime, get_atomic_clock_static},
51};
52use nautilus_execution::engine::SnapshotAnchorer;
53use nautilus_system::{
54 KernelEventStore as KernelEventStoreTrait, RegisteredComponents,
55 event_store::{DataMarkerClass, DataMarkerConfig, EventStoreConfig, RetentionMode},
56};
57use ustr::Ustr;
58
59use crate::{
60 BusCaptureAdapter, CacheReplayError, CacheReplayReport, CaptureError, EncoderRegistry,
61 EntryDraft, EventStore, EventStoreError, EventStoreWriter, HaltCallback, HaltReason, Headers,
62 RedbBackend, RunId, RunManifest, RunStatus, ScanDirection, Topic, WriterConfig,
63 compute_snapshot_content_hash, default_registry,
64 markers::{
65 DataClass, DataMarkerCapture, DataMarkerExtractorRegistry, MarkerBackend, MarkerManifest,
66 MarkerWriter, MarkerWriterConfig, RedbMarkerBackend,
67 },
68 restore_cache_from_sealed_run, validate_event_store_replay_source,
69};
70
/// Topic of the lifecycle entry submitted when a run is opened.
const RUN_STARTED_TOPIC: &str = "run.lifecycle.RunStarted";
/// Payload type tag stored alongside [`RUN_STARTED_TOPIC`].
const RUN_STARTED_PAYLOAD_TYPE: &str = "RunStarted";
/// Topic of the lifecycle entry handed to the writer on a graceful close; the
/// recovery sweep also looks for it at the tail of an unsealed run.
const RUN_ENDED_TOPIC: &str = "run.lifecycle.RunEnded";
/// Payload type tag stored alongside [`RUN_ENDED_TOPIC`].
const RUN_ENDED_PAYLOAD_TYPE: &str = "RunEnded";
75
76#[derive(Clone, Debug, PartialEq, Eq)]
78pub struct RecoveredRun {
79 pub run_id: RunId,
81 pub status: RunStatus,
84}
85
86#[derive(Debug, Default)]
88pub struct RecoveryOutcome {
89 pub recovered: Vec<RecoveredRun>,
91 pub parent_run_id: Option<RunId>,
95}
96
97type RegistryFactory = dyn Fn() -> EncoderRegistry + Send + Sync + 'static;
98type BackendOpenResult = Result<Box<dyn EventStore + Send>, EventStoreError>;
99type BackendOpener =
100 dyn Fn(&EventStoreConfig, &RunManifest) -> BackendOpenResult + Send + Sync + 'static;
101type MarkerRegistryFactory =
102 dyn Fn(&[DataClass]) -> DataMarkerExtractorRegistry + Send + Sync + 'static;
103type SharedMarkerCapture = Rc<RefCell<Option<DataMarkerCapture>>>;
104
105#[derive(Clone)]
111pub struct EventStoreLifecycleOptions {
112 registry_factory: Arc<RegistryFactory>,
113 backend_opener: Arc<BackendOpener>,
114 marker_registry_factory: Arc<MarkerRegistryFactory>,
115}
116
117impl Debug for EventStoreLifecycleOptions {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct(stringify!(EventStoreLifecycleOptions))
120 .finish_non_exhaustive()
121 }
122}
123
124impl Default for EventStoreLifecycleOptions {
125 fn default() -> Self {
126 Self {
127 registry_factory: Arc::new(default_registry),
128 backend_opener: Arc::new(default_backend_opener),
129 marker_registry_factory: Arc::new(DataMarkerExtractorRegistry::default_registry),
130 }
131 }
132}
133
134impl EventStoreLifecycleOptions {
135 #[must_use]
137 pub fn new() -> Self {
138 Self::default()
139 }
140
141 #[must_use]
143 pub fn with_registry_factory<F>(mut self, factory: F) -> Self
144 where
145 F: Fn() -> EncoderRegistry + Send + Sync + 'static,
146 {
147 self.registry_factory = Arc::new(factory);
148 self
149 }
150
151 #[must_use]
153 pub fn with_encoder_registry(self, registry: EncoderRegistry) -> Self {
154 self.with_registry_factory(move || registry.clone())
155 }
156
157 #[must_use]
159 pub fn with_backend_opener<F>(mut self, opener: F) -> Self
160 where
161 F: Fn(&EventStoreConfig, &RunManifest) -> BackendOpenResult + Send + Sync + 'static,
162 {
163 self.backend_opener = Arc::new(opener);
164 self
165 }
166
167 #[must_use]
169 pub fn with_marker_registry_factory<F>(mut self, factory: F) -> Self
170 where
171 F: Fn(&[DataClass]) -> DataMarkerExtractorRegistry + Send + Sync + 'static,
172 {
173 self.marker_registry_factory = Arc::new(factory);
174 self
175 }
176
177 fn build_registry(&self) -> EncoderRegistry {
178 (self.registry_factory)()
179 }
180
181 fn open_backend(&self, config: &EventStoreConfig, manifest: &RunManifest) -> BackendOpenResult {
182 (self.backend_opener)(config, manifest)
183 }
184
185 fn build_marker_registry(&self, classes: &[DataClass]) -> DataMarkerExtractorRegistry {
186 (self.marker_registry_factory)(classes)
187 }
188}
189
190fn default_backend_opener(config: &EventStoreConfig, manifest: &RunManifest) -> BackendOpenResult {
191 let mut backend = RedbBackend::new(config.base_dir.clone());
192 backend.open_run(manifest.clone())?;
193 Ok(Box::new(backend))
194}
195
196#[derive(Debug, thiserror::Error)]
198pub enum BootError {
199 #[error(transparent)]
202 EventStore(#[from] EventStoreError),
203 #[error("RunStarted submit failed: {0}")]
205 RunStartedSubmit(String),
206 #[error("RunStarted did not durably commit within {timeout:?}")]
209 RunStartedTimeout {
210 timeout: Duration,
213 },
214 #[error("event store halted during boot: {0:?}")]
217 HaltedDuringBoot(HaltReason),
218}
219
220#[derive(Clone, Debug)]
226pub struct HaltSignal {
227 halted: Arc<AtomicBool>,
228 reason: Arc<Mutex<Option<HaltReason>>>,
229}
230
231impl Default for HaltSignal {
232 fn default() -> Self {
233 Self::new()
234 }
235}
236
237impl HaltSignal {
238 #[must_use]
240 pub fn new() -> Self {
241 Self {
242 halted: Arc::new(AtomicBool::new(false)),
243 reason: Arc::new(Mutex::new(None)),
244 }
245 }
246
247 #[must_use]
253 pub fn callback(&self) -> HaltCallback {
254 let halted = Arc::clone(&self.halted);
255 let reason = Arc::clone(&self.reason);
256 Arc::new(move |r| {
257 if halted
258 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
259 .is_ok()
260 && let Ok(mut slot) = reason.lock()
261 {
262 *slot = Some(r);
263 }
264 })
265 }
266
267 #[must_use]
269 pub fn is_halted(&self) -> bool {
270 self.halted.load(Ordering::Acquire)
271 }
272
273 #[must_use]
278 pub fn reason(&self) -> Option<HaltReason> {
279 self.reason.lock().ok().and_then(|guard| guard.clone())
280 }
281}
282
283pub struct EventStoreSession {
285 writer: Option<Arc<EventStoreWriter>>,
286 adapter: Option<Arc<BusCaptureAdapter>>,
287 marker_capture: Option<SharedMarkerCapture>,
288 manifest: RunManifest,
289 halt_signal: HaltSignal,
290}
291
292impl Debug for EventStoreSession {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 f.debug_struct(stringify!(EventStoreSession))
295 .field("run_id", &self.manifest.run_id)
296 .field("parent_run_id", &self.manifest.parent_run_id)
297 .field("instance_id", &self.manifest.instance_id)
298 .field("halted", &self.halt_signal.is_halted())
299 .field("writer_attached", &self.writer.is_some())
300 .field("marker_capture_attached", &self.marker_capture.is_some())
301 .finish_non_exhaustive()
302 }
303}
304
305impl EventStoreSession {
306 #[must_use]
311 pub const fn manifest(&self) -> &RunManifest {
312 &self.manifest
313 }
314
315 #[must_use]
317 pub fn run_id(&self) -> &str {
318 self.manifest.run_id.as_str()
319 }
320
321 #[must_use]
323 pub fn parent_run_id(&self) -> Option<&str> {
324 self.manifest.parent_run_id.as_deref()
325 }
326
327 #[must_use]
329 pub fn is_halted(&self) -> bool {
330 self.halt_signal.is_halted()
331 }
332
333 #[must_use]
337 pub fn high_watermark(&self) -> u64 {
338 self.writer.as_ref().map_or(0, |w| w.high_watermark())
339 }
340
341 #[must_use]
347 pub fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
348 let writer = Arc::clone(self.writer.as_ref()?);
349
350 Some(Rc::new(move |snapshot_ref: CacheSnapshotRef| {
351 let content_hash = compute_snapshot_content_hash(snapshot_ref.blob.as_ref());
352 writer
353 .record_snapshot_anchor(snapshot_ref.blob_ref, content_hash)
354 .map(|_| ())
355 .map_err(|e| anyhow::anyhow!("record snapshot anchor: {e}"))
356 }))
357 }
358
359 #[must_use]
363 pub fn adapter(&self) -> Option<&Arc<BusCaptureAdapter>> {
364 self.adapter.as_ref()
365 }
366
367 pub fn close(&mut self, ts_init: UnixNanos) -> Result<(), EventStoreError> {
378 self.adapter = None;
382 let marker_capture = self.marker_capture.take();
383
384 let Some(writer_arc) = self.writer.take() else {
385 close_marker_capture(marker_capture);
386 return Ok(());
387 };
388 let Ok(writer) = Arc::try_unwrap(writer_arc) else {
389 close_marker_capture(marker_capture);
390 return Err(EventStoreError::Backend(
391 "event store writer has multiple owners; clear the bus tap before close"
392 .to_string(),
393 ));
394 };
395
396 let run_ended = run_ended_draft(ts_init);
397 let result = writer.close(run_ended);
398 close_marker_capture(marker_capture);
399 result?;
400 Ok(())
401 }
402}
403
404impl Drop for EventStoreSession {
405 fn drop(&mut self) {
406 self.adapter.take();
409 self.marker_capture.take();
410 self.writer.take();
411 }
412}
413
414fn close_marker_capture(marker_capture: Option<SharedMarkerCapture>) {
415 if let Some(marker_capture) = marker_capture
416 && let Some(capture) = marker_capture.borrow_mut().take()
417 {
418 capture.close();
419 }
420}
421
422#[derive(Debug, thiserror::Error)]
427pub enum KernelError {
428 #[error("event store boot failed: {0}")]
430 EventStoreBoot(#[from] BootError),
431 #[error("event store cache replay failed: {0}")]
433 CacheReplay(#[from] CacheReplayError),
434 #[error("event store halted: {0:?}")]
436 EventStoreHalted(HaltReason),
437}
438
439#[derive(Debug)]
449pub struct EventStoreLifecycle {
450 config: Option<EventStoreConfig>,
451 options: EventStoreLifecycleOptions,
452 recovered: Vec<RecoveredRun>,
453 parent_run_id: Option<String>,
454 session: Option<EventStoreSession>,
455 halt: HaltSignal,
456 clock: Rc<RefCell<dyn Clock>>,
460}
461
462impl EventStoreLifecycle {
463 pub fn boot(
473 config: Option<EventStoreConfig>,
474 instance_id: UUID4,
475 clock: Rc<RefCell<dyn Clock>>,
476 ) -> anyhow::Result<Self> {
477 Self::boot_with_options(
478 config,
479 instance_id,
480 clock,
481 EventStoreLifecycleOptions::default(),
482 )
483 }
484
485 pub fn boot_with_options(
495 config: Option<EventStoreConfig>,
496 instance_id: UUID4,
497 clock: Rc<RefCell<dyn Clock>>,
498 options: EventStoreLifecycleOptions,
499 ) -> anyhow::Result<Self> {
500 let (recovered, parent_run_id) = if let Some(cfg) = config.as_ref() {
501 let outcome = recover_predecessors(&cfg.base_dir, &instance_id.to_string())?;
502 if !outcome.recovered.is_empty() {
503 log::info!(
504 "Sealed {} crashed event-store predecessor(s); parent_run_id={:?}",
505 outcome.recovered.len(),
506 outcome.parent_run_id,
507 );
508 }
509 (outcome.recovered, outcome.parent_run_id)
510 } else {
511 (Vec::new(), None)
512 };
513 Ok(Self {
514 config,
515 options,
516 recovered,
517 parent_run_id,
518 session: None,
519 halt: HaltSignal::new(),
520 clock,
521 })
522 }
523
524 pub fn open(
539 &mut self,
540 instance_id: UUID4,
541 components: &RegisteredComponents,
542 environment: Environment,
543 ) -> Result<(), KernelError> {
544 let Some(config) = self.config.clone() else {
545 return Ok(());
546 };
547
548 if self.session.is_some() {
549 let ts = self.clock.borrow().timestamp_ns();
552 self.seal(ts);
553 }
554
555 let clock = Self::clock_for(environment);
556 let start_ts_init = self.clock.borrow().timestamp_ns();
557 let run_id = build_run_id(start_ts_init);
558 let parent_run_id = if let Some(replay_run_id) = config.replay_from_run_id.as_deref() {
559 validate_event_store_replay_source(
560 config.base_dir.clone(),
561 &instance_id.to_string(),
562 replay_run_id,
563 )?;
564 Some(replay_run_id.to_string())
565 } else {
566 self.parent_run_id.clone()
567 };
568 let session = open_run_with_options(
569 &config,
570 &instance_id.to_string(),
571 run_id,
572 parent_run_id,
573 start_ts_init,
574 components,
575 self.halt.clone(),
576 clock,
577 &self.options,
578 )?;
579 log::info!(
580 "Opened event-store run {} (parent_run_id={:?})",
581 session.run_id(),
582 session.parent_run_id(),
583 );
584
585 if let Some(adapter) = session.adapter() {
586 install_bus_tap(Arc::clone(adapter), session.marker_capture.clone(), clock);
587 }
588 self.session = Some(session);
589 Ok(())
590 }
591
592 pub fn restore_parent_cache(
603 &self,
604 instance_id: UUID4,
605 cache: &mut Cache,
606 ) -> Result<Option<CacheReplayReport>, KernelError> {
607 let Some(config) = self.config.as_ref() else {
608 return Ok(None);
609 };
610 let replay_run_id = config
611 .replay_from_run_id
612 .as_deref()
613 .or(self.parent_run_id.as_deref());
614 let Some(replay_run_id) = replay_run_id else {
615 return Ok(None);
616 };
617 let source = if config.replay_from_run_id.is_some() {
618 "configured replay run"
619 } else {
620 "parent run"
621 };
622
623 let report = restore_cache_from_sealed_run(
624 cache,
625 config.base_dir.clone(),
626 &instance_id.to_string(),
627 replay_run_id,
628 )?;
629
630 log::info!(
631 "Restored cache from event-store {source} {replay_run_id}: from_seq={}, to_seq={}, applied={}, ignored={}",
632 report.cache.plan.from_seq,
633 report.cache.plan.to_seq,
634 report.cache.applied_entries,
635 report.cache.ignored_entries,
636 );
637
638 Ok(Some(report.cache))
639 }
640
641 pub fn seal(&mut self, ts_init: UnixNanos) {
645 let Some(mut session) = self.session.take() else {
646 return;
647 };
648
649 msgbus::clear_bus_tap();
652
653 if session.is_halted() {
654 log::warn!(
655 "Event-store writer fail-stopped before close; run {} sealed by recovery sweep on next boot",
656 session.run_id(),
657 );
658 return;
659 }
660 let run_id = session.run_id().to_string();
661 if let Err(e) = session.close(ts_init) {
662 log::error!(
663 "Failed to seal event-store run {run_id} on graceful stop: {e}; run will be sealed as CrashedRecovered on next boot",
664 );
665 } else {
666 log::info!("Sealed event-store run {run_id}");
667 }
668 }
669
670 #[must_use]
672 pub fn recovered(&self) -> &[RecoveredRun] {
673 &self.recovered
674 }
675
676 #[must_use]
678 pub fn parent_run_id(&self) -> Option<&str> {
679 self.config
680 .as_ref()
681 .and_then(|config| config.replay_from_run_id.as_deref())
682 .or(self.parent_run_id.as_deref())
683 }
684
685 #[must_use]
687 pub fn is_event_store_replay_configured(&self) -> bool {
688 self.config
689 .as_ref()
690 .is_some_and(|config| config.replay_from_run_id.is_some())
691 }
692
693 #[must_use]
695 pub fn run_id(&self) -> Option<&str> {
696 self.session.as_ref().map(EventStoreSession::run_id)
697 }
698
699 #[must_use]
701 pub fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
702 self.session
703 .as_ref()
704 .and_then(EventStoreSession::snapshot_anchorer)
705 }
706
707 #[must_use]
709 pub fn is_halted(&self) -> bool {
710 self.halt.is_halted()
711 }
712
713 #[must_use]
715 pub fn halt_reason(&self) -> Option<HaltReason> {
716 self.halt.reason()
717 }
718
719 #[must_use]
722 pub fn check_halt(&self) -> Option<KernelError> {
723 self.halt_reason().map(KernelError::EventStoreHalted)
724 }
725
726 #[cfg(feature = "live")]
727 fn clock_for(environment: Environment) -> &'static AtomicTime {
728 match environment {
729 Environment::Backtest => get_atomic_clock_static(),
730 Environment::Live | Environment::Sandbox => get_atomic_clock_realtime(),
731 }
732 }
733
734 #[cfg(not(feature = "live"))]
735 fn clock_for(_environment: Environment) -> &'static AtomicTime {
736 get_atomic_clock_static()
737 }
738}
739
740impl Drop for EventStoreLifecycle {
741 fn drop(&mut self) {
742 if self.session.is_none() {
744 return;
745 }
746 let ts = self
747 .clock
748 .try_borrow()
749 .map(|c| c.timestamp_ns())
750 .unwrap_or_default();
751 self.seal(ts);
752 }
753}
754
755pub fn recover_predecessors(
773 base_dir: &Path,
774 instance_id: &str,
775) -> Result<RecoveryOutcome, EventStoreError> {
776 let manifests = RedbBackend::list_runs(base_dir, instance_id)?;
777 let crashed: Vec<RunManifest> = manifests
778 .into_iter()
779 .filter(|m| matches!(m.status, RunStatus::Running))
780 .collect();
781
782 let mut outcome = RecoveryOutcome::default();
783
784 for predecessor in crashed {
785 let run_id = predecessor.run_id.clone();
786 let mut backend = RedbBackend::new(base_dir.to_path_buf());
787
788 match backend.open_run(predecessor) {
789 Err(EventStoreError::CrashedPredecessor) => {}
790 Ok(()) => {
791 return Err(EventStoreError::Backend(format!(
792 "expected CrashedPredecessor reopening {run_id}, was Ok",
793 )));
794 }
795 Err(other) => return Err(other),
796 }
797
798 let high_watermark = backend.high_watermark()?;
799 let final_status = if high_watermark == 0 {
800 RunStatus::CrashedRecovered
801 } else {
802 match backend.scan_range(1, high_watermark, ScanDirection::Forward) {
803 Ok(entries) => {
804 let tail_is_run_ended = entries.last().is_some_and(|e| {
812 e.topic.as_ref() == RUN_ENDED_TOPIC
813 && e.payload_type.as_str() == RUN_ENDED_PAYLOAD_TYPE
814 });
815
816 if tail_is_run_ended {
817 RunStatus::Ended
818 } else {
819 RunStatus::CrashedRecovered
820 }
821 }
822 Err(
823 EventStoreError::HashMismatch { .. }
824 | EventStoreError::Corrupted(_)
825 | EventStoreError::Gap { .. },
826 ) => RunStatus::Quarantined,
827 Err(other) => return Err(other),
828 }
829 };
830
831 backend.seal(final_status)?;
832 outcome.recovered.push(RecoveredRun {
833 run_id: run_id.clone(),
834 status: final_status,
835 });
836
837 if matches!(final_status, RunStatus::CrashedRecovered) {
838 outcome.parent_run_id = Some(run_id);
839 }
840 }
841
842 Ok(outcome)
843}
844
845#[must_use]
852pub fn build_run_id(start_ts_init: UnixNanos) -> RunId {
853 let suffix: String = UUID4::new().to_string().chars().take(8).collect();
854 format!("{}-{suffix}", u64::from(start_ts_init))
855}
856
857#[allow(clippy::too_many_arguments)]
875pub fn open_run(
876 config: &EventStoreConfig,
877 instance_id: &str,
878 run_id: RunId,
879 parent_run_id: Option<RunId>,
880 start_ts_init: UnixNanos,
881 components: &RegisteredComponents,
882 halt_signal: HaltSignal,
883 clock: &'static AtomicTime,
884) -> Result<EventStoreSession, BootError> {
885 open_run_with_options(
886 config,
887 instance_id,
888 run_id,
889 parent_run_id,
890 start_ts_init,
891 components,
892 halt_signal,
893 clock,
894 &EventStoreLifecycleOptions::default(),
895 )
896}
897
898#[allow(clippy::too_many_arguments)]
910pub fn open_run_with_options(
911 config: &EventStoreConfig,
912 instance_id: &str,
913 run_id: RunId,
914 parent_run_id: Option<RunId>,
915 start_ts_init: UnixNanos,
916 components: &RegisteredComponents,
917 halt_signal: HaltSignal,
918 clock: &'static AtomicTime,
919 options: &EventStoreLifecycleOptions,
920) -> Result<EventStoreSession, BootError> {
921 let manifest = build_manifest(
922 config,
923 instance_id,
924 run_id,
925 parent_run_id,
926 start_ts_init,
927 components.clone(),
928 );
929
930 let backend = options.open_backend(config, &manifest)?;
931
932 let writer = Arc::new(EventStoreWriter::spawn(
933 backend,
934 clock,
935 halt_signal.callback(),
936 writer_config_from(config),
937 )?);
938
939 submit_run_started_blocking(
940 &writer,
941 components,
942 start_ts_init,
943 &halt_signal,
944 config.run_started_timeout,
945 )?;
946
947 let (marker_capture, submit_counter) =
948 build_marker_capture(config, &manifest, writer.high_watermark(), clock, options);
949 let mut adapter = BusCaptureAdapter::new(
950 Arc::clone(&writer),
951 Arc::new(options.build_registry()),
952 halt_signal.callback(),
953 );
954
955 if let Some(submit_counter) = submit_counter {
956 adapter = adapter.with_submit_counter(submit_counter);
957 }
958 let adapter = Arc::new(adapter);
959
960 Ok(EventStoreSession {
961 writer: Some(writer),
962 adapter: Some(adapter),
963 marker_capture,
964 manifest,
965 halt_signal,
966 })
967}
968
969fn build_marker_capture(
970 config: &EventStoreConfig,
971 manifest: &RunManifest,
972 initial_submit_counter: u64,
973 clock: &'static AtomicTime,
974 options: &EventStoreLifecycleOptions,
975) -> (Option<SharedMarkerCapture>, Option<Arc<AtomicU64>>) {
976 let Some(marker_config) = config.data_markers.as_ref() else {
977 return (None, None);
978 };
979
980 match open_marker_capture(
981 config,
982 manifest,
983 marker_config,
984 initial_submit_counter,
985 clock,
986 options,
987 ) {
988 Ok((capture, submit_counter)) => (
989 Some(Rc::new(RefCell::new(Some(capture)))),
990 Some(submit_counter),
991 ),
992 Err(e) => {
993 log::warn!(
994 "Data marker sidecar disabled for run {} after marker setup failed: {e}",
995 manifest.run_id,
996 );
997 (None, None)
998 }
999 }
1000}
1001
1002fn open_marker_capture(
1003 config: &EventStoreConfig,
1004 manifest: &RunManifest,
1005 marker_config: &DataMarkerConfig,
1006 initial_submit_counter: u64,
1007 clock: &'static AtomicTime,
1008 options: &EventStoreLifecycleOptions,
1009) -> Result<(DataMarkerCapture, Arc<AtomicU64>), EventStoreError> {
1010 let classes = marker_config
1011 .classes
1012 .iter()
1013 .copied()
1014 .map(data_marker_class_to_data_class)
1015 .collect::<Vec<_>>();
1016 let marker_manifest = marker_manifest_for(manifest, classes.clone(), marker_config);
1017 let marker_path = marker_file_path(config, &manifest.instance_id, &manifest.run_id);
1018 let mut marker_backend = RedbMarkerBackend::new(marker_path);
1019 marker_backend.open_run(marker_manifest)?;
1020 let writer = MarkerWriter::spawn(
1021 Box::new(marker_backend),
1022 clock,
1023 MarkerWriterConfig {
1024 channel_capacity: marker_config.channel_capacity,
1025 ..MarkerWriterConfig::default()
1026 },
1027 )?;
1028 let submit_counter = Arc::new(AtomicU64::new(initial_submit_counter));
1029 let registry = options.build_marker_registry(&classes);
1030 let capture =
1031 DataMarkerCapture::new(registry, writer, Arc::clone(&submit_counter), marker_config);
1032
1033 Ok((capture, submit_counter))
1034}
1035
1036fn marker_file_path(config: &EventStoreConfig, instance_id: &str, run_id: &str) -> PathBuf {
1037 config
1038 .base_dir
1039 .join(instance_id)
1040 .join(format!("{run_id}.markers.redb"))
1041}
1042
1043fn marker_manifest_for(
1044 manifest: &RunManifest,
1045 enabled_classes: Vec<DataClass>,
1046 config: &DataMarkerConfig,
1047) -> MarkerManifest {
1048 MarkerManifest {
1049 run_id: manifest.run_id.clone(),
1050 enabled_classes,
1051 high_fidelity: !config.high_fidelity.is_empty(),
1052 snapshot_count: 0,
1053 hifi_count: 0,
1054 gap_count: 0,
1055 dict_count: 0,
1056 status: RunStatus::Running,
1057 }
1058}
1059
1060const fn data_marker_class_to_data_class(class: DataMarkerClass) -> DataClass {
1061 match class {
1062 DataMarkerClass::BookDeltas => DataClass::BookDeltas,
1063 DataMarkerClass::BookDepth10 => DataClass::BookDepth10,
1064 DataMarkerClass::Quote => DataClass::Quote,
1065 DataMarkerClass::Trade => DataClass::Trade,
1066 DataMarkerClass::Bar => DataClass::Bar,
1067 }
1068}
1069
1070fn build_manifest(
1071 config: &EventStoreConfig,
1072 instance_id: &str,
1073 run_id: RunId,
1074 parent_run_id: Option<RunId>,
1075 start_ts_init: UnixNanos,
1076 components: RegisteredComponents,
1077) -> RunManifest {
1078 let mut feature_flags = config.identity.feature_flags.clone();
1079 feature_flags.push(format!("retention={}", retention_tag(config.retention)));
1080
1081 RunManifest {
1082 run_id,
1083 parent_run_id,
1084 instance_id: instance_id.to_string(),
1085 binary_hash: config.identity.binary_hash.clone(),
1086 schema_version: config.identity.schema_version,
1087 crate_versions: config.identity.crate_versions.clone(),
1088 feature_flags,
1089 adapter_versions: config.identity.adapter_versions.clone(),
1090 config_hash: config.identity.config_hash.clone(),
1091 registered_components: components,
1092 seed: config.identity.seed,
1093 start_ts_init,
1094 end_ts_init: None,
1095 high_watermark: 0,
1096 status: RunStatus::Running,
1097 }
1098}
1099
1100const fn retention_tag(mode: RetentionMode) -> &'static str {
1101 match mode {
1102 RetentionMode::Full => "full",
1103 RetentionMode::Bounded { .. } => "bounded",
1104 RetentionMode::SnapshotAnchored => "snapshot",
1105 }
1106}
1107
1108fn writer_config_from(config: &EventStoreConfig) -> WriterConfig {
1109 WriterConfig {
1110 channel_capacity: config.channel_capacity,
1111 max_batch_entries: config.max_batch_entries,
1112 max_batch_latency: config.max_batch_latency,
1113 halt_threshold: config.halt_threshold,
1114 }
1115}
1116
1117pub(crate) fn submit_run_started_blocking(
1129 writer: &EventStoreWriter,
1130 components: &RegisteredComponents,
1131 ts_init: UnixNanos,
1132 halt_signal: &HaltSignal,
1133 timeout: Duration,
1134) -> Result<(), BootError> {
1135 let payload = encode_run_started(components);
1136 let draft = EntryDraft::without_indices(
1137 Headers::empty(),
1138 Topic::from(RUN_STARTED_TOPIC),
1139 Ustr::from(RUN_STARTED_PAYLOAD_TYPE),
1140 payload,
1141 ts_init,
1142 );
1143
1144 writer
1145 .submit(draft)
1146 .map_err(|e| BootError::RunStartedSubmit(e.to_string()))?;
1147
1148 let start = Instant::now(); while writer.high_watermark() == 0 {
1153 if halt_signal.is_halted() {
1154 return Err(BootError::HaltedDuringBoot(
1155 halt_signal.reason().unwrap_or_else(|| {
1156 HaltReason::BackendError("event store halted during boot".to_string())
1157 }),
1158 ));
1159 }
1160
1161 let elapsed = start.elapsed();
1162
1163 if elapsed >= timeout {
1164 return Err(BootError::RunStartedTimeout { timeout });
1165 }
1166 thread::sleep(Duration::from_millis(1));
1167 }
1168
1169 Ok(())
1170}
1171
1172fn encode_run_started(components: &RegisteredComponents) -> Bytes {
1173 let bytes = bincode::serde::encode_to_vec(components, bincode::config::standard())
1176 .expect("RegisteredComponents serializes via serde, must not fail under standard config");
1177 Bytes::from(bytes)
1178}
1179
1180fn run_ended_draft(ts_init: UnixNanos) -> EntryDraft {
1181 EntryDraft::without_indices(
1182 Headers::empty(),
1183 Topic::from(RUN_ENDED_TOPIC),
1184 Ustr::from(RUN_ENDED_PAYLOAD_TYPE),
1185 Bytes::new(),
1186 ts_init,
1187 )
1188}
1189
1190struct EventStoreBusTap {
1197 adapter: Arc<BusCaptureAdapter>,
1198 marker_capture: Option<SharedMarkerCapture>,
1199 clock: &'static AtomicTime,
1200}
1201
1202impl Debug for EventStoreBusTap {
1203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1204 f.debug_struct(stringify!(EventStoreBusTap))
1205 .field("halted", &self.adapter.is_halted())
1206 .field("marker_capture_attached", &self.marker_capture.is_some())
1207 .finish_non_exhaustive()
1208 }
1209}
1210
1211impl BusTap for EventStoreBusTap {
1212 fn on_publish(&self, topic: Topic, message: &dyn Any) {
1213 let ts_init = self.clock.get_time_ns();
1214 self.capture(topic, message, ts_init);
1215 }
1216
1217 fn on_send(&self, endpoint: MStr<Endpoint>, message: &dyn Any) {
1218 let ts_init = self.clock.get_time_ns();
1219 let topic = Topic::from(*endpoint);
1222 self.capture(topic, message, ts_init);
1223 }
1224
1225 fn on_response(&self, _correlation_id: &UUID4, message: &dyn Any) {
1226 let ts_init = self.clock.get_time_ns();
1227 let topic = MessagingSwitchboard::data_response_topic();
1228 self.capture(topic, message, ts_init);
1229 }
1230}
1231
1232impl EventStoreBusTap {
1233 fn capture(&self, topic: Topic, message: &dyn Any, ts_init: UnixNanos) {
1234 let headers = self
1239 .adapter
1240 .registry()
1241 .headers_for_any(message)
1242 .unwrap_or_else(Headers::empty);
1243 match self.adapter.capture_any(topic, message, headers, ts_init) {
1246 Ok(captured) => {
1247 self.capture_marker(topic, message, ts_init, captured);
1248 }
1249 Err(CaptureError::Halted) => {}
1250 Err(CaptureError::Submit(e)) => {
1251 log::error!("Event store capture submit failed on {topic}: {e}");
1252 }
1253 Err(CaptureError::Encode(e)) => {
1254 log::warn!("Event store encoder rejected message on {topic}: {e}");
1255 }
1256 }
1257 }
1258
1259 fn capture_marker(&self, topic: Topic, message: &dyn Any, ts_init: UnixNanos, captured: bool) {
1260 let Some(marker_capture) = self.marker_capture.as_ref() else {
1261 return;
1262 };
1263 let mut marker_capture = marker_capture.borrow_mut();
1264 let Some(capture) = marker_capture.as_mut() else {
1265 return;
1266 };
1267
1268 if captured {
1269 capture.on_entry_submitted(ts_init);
1270 } else {
1271 capture.observe_publish(topic, message, ts_init);
1272 }
1273 capture.maybe_safety_flush(ts_init);
1274 }
1275}
1276
1277fn install_bus_tap(
1278 adapter: Arc<BusCaptureAdapter>,
1279 marker_capture: Option<SharedMarkerCapture>,
1280 clock: &'static AtomicTime,
1281) {
1282 let tap: Rc<dyn BusTap> = Rc::new(EventStoreBusTap {
1283 adapter,
1284 marker_capture,
1285 clock,
1286 });
1287 msgbus::set_bus_tap(tap);
1288}
1289
1290#[allow(clippy::use_self)]
1293impl KernelEventStoreTrait for EventStoreLifecycle {
1294 fn restore_parent_cache(
1295 &mut self,
1296 instance_id: UUID4,
1297 cache: &mut Cache,
1298 ) -> anyhow::Result<()> {
1299 EventStoreLifecycle::restore_parent_cache(self, instance_id, cache)
1300 .map(|_| ())
1301 .map_err(Into::into)
1302 }
1303
1304 fn open(
1305 &mut self,
1306 instance_id: UUID4,
1307 components: &RegisteredComponents,
1308 environment: Environment,
1309 ) -> anyhow::Result<()> {
1310 EventStoreLifecycle::open(self, instance_id, components, environment).map_err(Into::into)
1311 }
1312
1313 fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
1314 EventStoreLifecycle::snapshot_anchorer(self)
1315 }
1316
1317 fn seal(&mut self, ts_init: UnixNanos) {
1318 EventStoreLifecycle::seal(self, ts_init);
1319 }
1320
1321 fn run_id(&self) -> Option<&str> {
1322 EventStoreLifecycle::run_id(self)
1323 }
1324
1325 fn parent_run_id(&self) -> Option<&str> {
1326 EventStoreLifecycle::parent_run_id(self)
1327 }
1328
1329 fn is_event_store_replay_configured(&self) -> bool {
1330 EventStoreLifecycle::is_event_store_replay_configured(self)
1331 }
1332
1333 fn is_halted(&self) -> bool {
1334 EventStoreLifecycle::is_halted(self)
1335 }
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340 #[cfg(madsim)]
1341 use std::path::Path;
1342 use std::path::PathBuf;
1343
1344 use indexmap::IndexMap;
1345 use nautilus_common::{
1346 clock::TestClock,
1347 messages::{
1348 data::{
1349 DataCommand, DataResponse, QuotesResponse, RequestCommand, RequestQuotes,
1350 SubscribeCommand, SubscribeQuotes,
1351 },
1352 execution::{SubmitOrder, TradingCommand},
1353 },
1354 timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
1355 };
1356 use nautilus_core::time::get_atomic_clock_static;
1357 use nautilus_model::{
1358 data::stubs::{quote_ethusdt_binance, stub_deltas},
1359 enums::TimeInForce,
1360 events::{
1361 OrderEventAny, OrderFilled,
1362 order::spec::{OrderFilledSpec, OrderInitializedSpec},
1363 },
1364 identifiers::{
1365 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
1366 VenueOrderId,
1367 },
1368 types::{Currency, Money, Price, Quantity},
1369 };
1370 use nautilus_system::event_store::{DataMarkerClass, DataMarkerConfig, RunIdentity};
1371 use rstest::rstest;
1372 use tempfile::TempDir;
1373
1374 use super::*;
1375 use crate::{
1376 AppendEntry, DataClass, EncodedPayload, EventStoreEntry, IndexKind, MarkerBackend,
1377 MemoryBackend, RedbMarkerBackend, SnapshotAnchor,
1378 capture::builtins::PAYLOAD_TYPE_TIME_EVENT, compute_entry_hash,
1379 };
1380
1381 const INSTANCE_ID: &str = "trader-001";
1382
1383 fn make_config(base_dir: PathBuf) -> EventStoreConfig {
1384 EventStoreConfig {
1385 base_dir,
1386 identity: RunIdentity {
1387 binary_hash: "deadbeef".to_string(),
1388 schema_version: 1,
1389 crate_versions: "feedface".to_string(),
1390 feature_flags: Vec::new(),
1391 adapter_versions: IndexMap::new(),
1392 config_hash: "cafebabe".to_string(),
1393 seed: None,
1394 },
1395 retention: RetentionMode::Full,
1396 replay_from_run_id: None,
1397 data_markers: None,
1398 channel_capacity: 64,
1399 max_batch_entries: 1,
1400 max_batch_latency: Duration::from_millis(2),
1401 halt_threshold: Duration::from_secs(2),
1402 run_started_timeout: Duration::from_secs(2),
1403 }
1404 }
1405
/// Points at which `seed_crashed_predecessor` stops writing to a run that is
/// never sealed.
#[derive(Clone, Copy, Debug)]
enum CrashPoint {
    /// Only the run is opened; no entries are appended.
    BeforeEnqueue,
    /// Seeded the same way as `BeforeEnqueue`: nothing reaches the backend.
    AfterEnqueueBeforeCommit,
    /// A single `RunStarted` entry is appended at seq 1; no snapshot anchor.
    AfterCommitBeforeSnapshot,
    /// The `RunStarted` entry at seq 1 is appended and one snapshot anchor is recorded.
    AfterSnapshot,
}
1413
1414 fn append_entry(seq: u64, topic: &str, payload_type: &str, payload: Bytes) -> AppendEntry {
1415 let ts = UnixNanos::from(seq);
1416 let headers = Headers::empty();
1417 let hash = compute_entry_hash(seq, ts, ts, topic, payload_type, &payload, &headers);
1418 let entry = EventStoreEntry::new(
1419 hash,
1420 seq,
1421 headers,
1422 Topic::from(topic),
1423 Ustr::from(payload_type),
1424 payload,
1425 ts,
1426 ts,
1427 );
1428 AppendEntry::without_indices(entry)
1429 }
1430
1431 fn make_submit_order(client_order_id: ClientOrderId) -> SubmitOrder {
1432 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1433 let order_init = OrderInitializedSpec::builder()
1434 .instrument_id(instrument_id)
1435 .client_order_id(client_order_id)
1436 .quantity(Quantity::from("1"))
1437 .time_in_force(TimeInForce::Gtc)
1438 .ts_event(UnixNanos::from(1))
1439 .ts_init(UnixNanos::from(2))
1440 .build();
1441 SubmitOrder::new(
1442 TraderId::from("TRADER-001"),
1443 Some(ClientId::from("BINANCE")),
1444 StrategyId::from("S-001"),
1445 instrument_id,
1446 client_order_id,
1447 order_init,
1448 None,
1449 None,
1450 None,
1451 UUID4::new(),
1452 UnixNanos::from(3),
1453 None, )
1455 }
1456
1457 fn append_run_started(seq: u64) -> AppendEntry {
1458 append_entry(
1459 seq,
1460 RUN_STARTED_TOPIC,
1461 RUN_STARTED_PAYLOAD_TYPE,
1462 encode_run_started(&RegisteredComponents::default()),
1463 )
1464 }
1465
/// Message type used only by these tests; `test_registry` encodes it as the
/// single byte held in `value`.
#[derive(Debug)]
struct TestAuditMessage {
    /// Byte written verbatim as the captured payload.
    value: u8,
}
1470
1471 fn test_registry() -> EncoderRegistry {
1472 let mut registry = EncoderRegistry::new();
1473 registry.register::<TestAuditMessage, _>(Ustr::from("TestAuditMessage"), |message| {
1474 Ok(EncodedPayload::without_indices(Bytes::copy_from_slice(&[
1475 message.value,
1476 ])))
1477 });
1478 registry
1479 }
1480
1481 fn wait_for_high_watermark(store: &EventStoreLifecycle, expected: u64) {
1482 let deadline = Instant::now() + Duration::from_secs(2);
1483
1484 loop {
1485 let hwm = store
1486 .session
1487 .as_ref()
1488 .map_or(0, EventStoreSession::high_watermark);
1489
1490 if hwm >= expected {
1491 break;
1492 }
1493 assert!(
1494 Instant::now() < deadline,
1495 "event store high_watermark did not reach {expected} within deadline (hwm={hwm})",
1496 );
1497 thread::sleep(Duration::from_millis(2));
1498 }
1499 }
1500
1501 #[derive(Debug, Clone)]
1502 struct SharedMemoryBackend(Arc<Mutex<MemoryBackend>>);
1503
1504 impl EventStore for SharedMemoryBackend {
1505 fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
1506 self.0.lock().expect("memory backend").open_run(manifest)
1507 }
1508
1509 fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
1510 self.0.lock().expect("memory backend").append_batch(entries)
1511 }
1512
1513 fn scan_range(
1514 &self,
1515 from: u64,
1516 to: u64,
1517 direction: ScanDirection,
1518 ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
1519 self.0
1520 .lock()
1521 .expect("memory backend")
1522 .scan_range(from, to, direction)
1523 }
1524
1525 fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
1526 self.0.lock().expect("memory backend").scan_seq(seq)
1527 }
1528
1529 fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
1530 self.0.lock().expect("memory backend").lookup(kind, key)
1531 }
1532
1533 fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
1534 self.0.lock().expect("memory backend").iter_index_keys(kind)
1535 }
1536
1537 fn record_snapshot_anchor(
1538 &mut self,
1539 anchor: SnapshotAnchor,
1540 ) -> Result<(), EventStoreError> {
1541 self.0
1542 .lock()
1543 .expect("memory backend")
1544 .record_snapshot_anchor(anchor)
1545 }
1546
1547 fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
1548 self.0
1549 .lock()
1550 .expect("memory backend")
1551 .latest_snapshot_anchor()
1552 }
1553
1554 fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
1555 self.0.lock().expect("memory backend").seal(status)
1556 }
1557
1558 fn manifest(&self) -> Result<RunManifest, EventStoreError> {
1559 self.0.lock().expect("memory backend").manifest()
1560 }
1561
1562 fn high_watermark(&self) -> Result<u64, EventStoreError> {
1563 self.0.lock().expect("memory backend").high_watermark()
1564 }
1565 }
1566
1567 fn seed_crashed_predecessor(config: &EventStoreConfig, run_id: &str, crash_point: CrashPoint) {
1568 let mut backend = RedbBackend::new(config.base_dir.clone());
1569 backend
1570 .open_run(build_manifest(
1571 config,
1572 INSTANCE_ID,
1573 run_id.to_string(),
1574 None,
1575 UnixNanos::from(1_000),
1576 RegisteredComponents::default(),
1577 ))
1578 .expect("open predecessor");
1579
1580 match crash_point {
1581 CrashPoint::BeforeEnqueue | CrashPoint::AfterEnqueueBeforeCommit => {}
1585 CrashPoint::AfterCommitBeforeSnapshot => {
1586 backend
1587 .append_batch(&[append_run_started(1)])
1588 .expect("append committed entry");
1589 }
1590 CrashPoint::AfterSnapshot => {
1591 backend
1592 .append_batch(&[append_run_started(1)])
1593 .expect("append committed entry");
1594 backend
1595 .record_snapshot_anchor(SnapshotAnchor::new(
1596 1,
1597 "cache://snapshot/run-crash/1",
1598 "blake3:abc",
1599 ))
1600 .expect("record snapshot anchor");
1601 }
1602 }
1603 }
1604
1605 #[rstest]
1606 fn halt_signal_callback_records_first_reason() {
1607 let signal = HaltSignal::new();
1608 let cb = signal.callback();
1609 cb(HaltReason::BackendDisk("ENOSPC".to_string()));
1610 cb(HaltReason::BackendError("second".to_string()));
1611
1612 assert!(signal.is_halted());
1613 match signal.reason() {
1614 Some(HaltReason::BackendDisk(msg)) => assert!(msg.contains("ENOSPC")),
1615 other => panic!("expected first reason BackendDisk, was {other:?}"),
1616 }
1617 }
1618
1619 #[rstest]
1620 fn recover_predecessors_returns_empty_for_missing_directory() {
1621 let tmp = TempDir::new().expect("tempdir");
1622 let outcome =
1623 recover_predecessors(tmp.path(), INSTANCE_ID).expect("recover empty directory");
1624 assert!(outcome.recovered.is_empty());
1625 assert!(outcome.parent_run_id.is_none());
1626 }
1627
1628 #[rstest]
1629 fn restore_cache_snapshot_blob_rejects_hash_mismatch() {
1630 let mut cache = Cache::default();
1631 let blob = Bytes::from_static(b"snapshot");
1632 let anchor =
1633 crate::SnapshotAnchor::new(0, "cache://position-snapshots/P-1/0", "blake3:bad");
1634
1635 cache
1636 .add(&anchor.blob_ref, blob)
1637 .expect("seed snapshot blob");
1638 let err =
1639 crate::restore_cache_snapshot_blob(&mut cache, Some(&anchor)).expect_err("hash error");
1640
1641 assert!(
1642 err.to_string().contains("content_hash mismatch"),
1643 "err was: {err}",
1644 );
1645 }
1646
1647 #[rstest]
1648 fn open_run_writes_run_started_and_advances_watermark() {
1649 let tmp = TempDir::new().expect("tempdir");
1650 let config = make_config(tmp.path().to_path_buf());
1651 let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover empty");
1652 assert!(outcome.parent_run_id.is_none());
1653
1654 let halt = HaltSignal::new();
1655 let session = open_run(
1656 &config,
1657 INSTANCE_ID,
1658 build_run_id(UnixNanos::from(1_000)),
1659 outcome.parent_run_id,
1660 UnixNanos::from(1_000),
1661 &RegisteredComponents::default(),
1662 halt,
1663 get_atomic_clock_static(),
1664 )
1665 .expect("open run");
1666
1667 assert_eq!(session.high_watermark(), 1);
1669 assert_eq!(session.parent_run_id(), None);
1670
1671 let manifest = session.manifest();
1675 assert_eq!(manifest.instance_id, INSTANCE_ID);
1676 assert_eq!(manifest.status, RunStatus::Running);
1677 assert_eq!(manifest.binary_hash, "deadbeef");
1678 assert_eq!(manifest.schema_version, 1);
1679 assert_eq!(manifest.crate_versions, "feedface");
1680 assert_eq!(manifest.config_hash, "cafebabe");
1681 assert_eq!(manifest.start_ts_init, UnixNanos::from(1_000));
1682 assert_eq!(manifest.end_ts_init, None);
1683 assert!(
1684 manifest
1685 .feature_flags
1686 .contains(&"retention=full".to_string()),
1687 "feature_flags must record the retention mode, was {:?}",
1688 manifest.feature_flags,
1689 );
1690 }
1691
1692 #[rstest]
1693 fn lifecycle_options_default_registry_keeps_builtin_encoders() {
1694 let registry = EventStoreLifecycleOptions::default().build_registry();
1695
1696 assert!(registry.contains::<SubmitOrder>());
1697 assert!(registry.contains::<TradingCommand>());
1698 assert!(!registry.contains::<TestAuditMessage>());
1699 }
1700
1701 #[rstest]
1702 fn lifecycle_options_custom_registry_captures_registered_message() {
1703 let tmp = TempDir::new().expect("tempdir");
1704 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1705 let instance_id = UUID4::new();
1706 let options = EventStoreLifecycleOptions::new().with_encoder_registry(test_registry());
1707
1708 let mut store = EventStoreLifecycle::boot_with_options(
1709 Some(make_config(tmp.path().to_path_buf())),
1710 instance_id,
1711 clock_rc,
1712 options,
1713 )
1714 .expect("boot store");
1715 store
1716 .open(
1717 instance_id,
1718 &RegisteredComponents::default(),
1719 Environment::Backtest,
1720 )
1721 .expect("open run");
1722 let run_id = store.run_id().expect("run open").to_string();
1723
1724 let topic: MStr<msgbus::Topic> = MStr::from("events.test.audit");
1725 msgbus::publish_any(topic, &TestAuditMessage { value: 42 });
1726 wait_for_high_watermark(&store, 2);
1727
1728 drop(store);
1729
1730 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
1731 .expect("open sealed");
1732 let captured = sealed
1733 .scan_seq(2)
1734 .expect("scan")
1735 .expect("captured entry present");
1736
1737 assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
1738 assert_eq!(captured.topic.as_ref(), topic.as_str());
1739 assert_eq!(captured.payload.as_ref(), &[42]);
1740 }
1741
1742 #[rstest]
1743 fn lifecycle_options_memory_backend_opener_captures_and_seals() {
1744 let tmp = TempDir::new().expect("tempdir");
1745 let memory = Arc::new(Mutex::new(MemoryBackend::new()));
1746 let opener_memory = Arc::clone(&memory);
1747 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1748 let instance_id = UUID4::new();
1749 let options = EventStoreLifecycleOptions::new()
1750 .with_encoder_registry(test_registry())
1751 .with_backend_opener(move |_, manifest| {
1752 opener_memory
1753 .lock()
1754 .expect("memory backend")
1755 .open_run(manifest.clone())?;
1756 Ok(Box::new(SharedMemoryBackend(Arc::clone(&opener_memory))))
1757 });
1758
1759 let mut store = EventStoreLifecycle::boot_with_options(
1760 Some(make_config(tmp.path().to_path_buf())),
1761 instance_id,
1762 clock_rc,
1763 options,
1764 )
1765 .expect("boot store");
1766 store
1767 .open(
1768 instance_id,
1769 &RegisteredComponents::default(),
1770 Environment::Backtest,
1771 )
1772 .expect("open run");
1773
1774 let topic: MStr<msgbus::Topic> = MStr::from("events.test.memory");
1775 msgbus::publish_any(topic, &TestAuditMessage { value: 7 });
1776 wait_for_high_watermark(&store, 2);
1777
1778 store.seal(UnixNanos::from(1_000));
1779
1780 let backend = memory.lock().expect("memory backend");
1781 let manifest = backend.manifest().expect("manifest");
1782 let captured = backend
1783 .scan_seq(2)
1784 .expect("scan")
1785 .expect("captured entry present");
1786
1787 assert_eq!(manifest.instance_id, instance_id.to_string());
1788 assert_eq!(manifest.status, RunStatus::Ended);
1789 assert_eq!(manifest.high_watermark, 3);
1790 assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
1791 assert_eq!(captured.topic.as_ref(), topic.as_str());
1792 assert_eq!(captured.payload.as_ref(), &[7]);
1793 }
1794
/// Two captures with the same seed must yield identical entries that match
/// the expected sequence 1..=4, and neither capture may leave a `.redb` file
/// behind.
#[cfg(madsim)]
#[rstest]
fn lifecycle_options_memory_backend_opener_captures_deterministic_seq_order_under_madsim() {
    let first = capture_madsim_memory_lifecycle_summary(42);
    let second = capture_madsim_memory_lifecycle_summary(42);
    let expected = expected_madsim_memory_entries();

    assert_eq!(first.entries, second.entries);
    assert_eq!(first.entries, expected);

    let seqs: Vec<_> = first.entries.iter().map(|entry| entry.seq).collect();
    assert_eq!(seqs, vec![1, 2, 3, 4],);

    for capture in [&first, &second] {
        assert!(
            capture.redb_files.is_empty(),
            "memory opener must not create redb files, was {:?}",
            capture.redb_files,
        );
    }
}
1823
/// Entries expected from `capture_madsim_memory_lifecycle_summary`:
/// `RunStarted`, two `TestAuditMessage` publishes, then `RunEnded`.
#[cfg(madsim)]
fn expected_madsim_memory_entries() -> Vec<CapturedEntrySummary> {
    // Local constructor so each row below reads as one line of the table.
    let summary = |seq: u64,
                   topic: &str,
                   payload_type: &str,
                   payload: Vec<u8>,
                   ts_init: u64,
                   ts_publish: u64| CapturedEntrySummary {
        seq,
        topic: topic.to_string(),
        payload_type: payload_type.to_string(),
        payload,
        ts_init: UnixNanos::from(ts_init),
        ts_publish: UnixNanos::from(ts_publish),
    };
    let run_started_payload = encode_run_started(&RegisteredComponents::default()).to_vec();

    vec![
        summary(
            1,
            RUN_STARTED_TOPIC,
            RUN_STARTED_PAYLOAD_TYPE,
            run_started_payload,
            0,
            10_000,
        ),
        summary(
            2,
            "events.test.madsim",
            "TestAuditMessage",
            vec![1],
            20_000,
            20_000,
        ),
        summary(
            3,
            "events.test.madsim",
            "TestAuditMessage",
            vec![2],
            30_000,
            30_000,
        ),
        summary(
            4,
            RUN_ENDED_TOPIC,
            RUN_ENDED_PAYLOAD_TYPE,
            Vec::new(),
            40_000,
            40_000,
        ),
    ]
}
1861
1862 #[rstest]
1863 fn open_run_with_options_surfaces_backend_opener_error() {
1864 let tmp = TempDir::new().expect("tempdir");
1865 let config = make_config(tmp.path().to_path_buf());
1866 let options = EventStoreLifecycleOptions::new().with_backend_opener(|_, _| {
1867 Err(EventStoreError::Backend(
1868 "test backend open failed".to_string(),
1869 ))
1870 });
1871
1872 let err = open_run_with_options(
1873 &config,
1874 INSTANCE_ID,
1875 "run-open-error".to_string(),
1876 None,
1877 UnixNanos::from(5_000),
1878 &RegisteredComponents::default(),
1879 HaltSignal::new(),
1880 get_atomic_clock_static(),
1881 &options,
1882 )
1883 .expect_err("backend opener error must stop run open");
1884
1885 match err {
1886 BootError::EventStore(EventStoreError::Backend(msg)) => {
1887 assert!(msg.contains("test backend open failed"));
1888 }
1889 other => panic!("expected backend open failure, was {other:?}"),
1890 }
1891 }
1892
/// Result of one `capture_madsim_memory_lifecycle_summary` run.
#[cfg(madsim)]
#[derive(Debug, PartialEq, Eq)]
struct MadsimMemoryLifecycleCapture {
    /// Entries scanned forward from the memory backend, from seq 1 to the
    /// manifest's high watermark.
    entries: Vec<CapturedEntrySummary>,
    /// Sorted paths of every `.redb` file found under the run's temp directory.
    redb_files: Vec<PathBuf>,
}
1899
/// Owned, comparable copy of the fields of a captured entry that the madsim
/// determinism test asserts on.
#[cfg(madsim)]
#[derive(Debug, PartialEq, Eq)]
struct CapturedEntrySummary {
    /// Sequence number of the entry.
    seq: u64,
    /// Topic of the entry, as a string.
    topic: String,
    /// Payload type name of the entry.
    payload_type: String,
    /// Raw payload bytes.
    payload: Vec<u8>,
    /// `ts_init` copied from the entry.
    ts_init: UnixNanos,
    /// `ts_publish` copied from the entry.
    ts_publish: UnixNanos,
}
1910
/// Runs one full lifecycle against a shared `MemoryBackend` with the static
/// clock stepped by hand, and returns the captured entries plus any `.redb`
/// files found under the temp directory.
///
/// The static clock is set to 10_000 before boot, 20_000 and 30_000 before
/// the two publishes, and 40_000 before sealing. `seed` is stored in the
/// config identity and checked against the sealed manifest.
#[cfg(madsim)]
fn capture_madsim_memory_lifecycle_summary(seed: u64) -> MadsimMemoryLifecycleCapture {
    let static_clock = get_atomic_clock_static();
    static_clock.set_time(UnixNanos::from(10_000));

    let tmp = TempDir::new().expect("tempdir");
    let memory = Arc::new(Mutex::new(MemoryBackend::new()));
    let shared = Arc::clone(&memory);
    let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
    let instance_id = UUID4::new();

    let mut config = make_config(tmp.path().to_path_buf());
    config.identity.seed = Some(seed);

    let options = EventStoreLifecycleOptions::new()
        .with_encoder_registry(test_registry())
        .with_backend_opener(move |_, manifest| {
            shared
                .lock()
                .expect("memory backend")
                .open_run(manifest.clone())?;
            Ok(Box::new(SharedMemoryBackend(Arc::clone(&shared))))
        });

    let mut store =
        EventStoreLifecycle::boot_with_options(Some(config), instance_id, clock, options)
            .expect("boot store");
    store
        .open(
            instance_id,
            &RegisteredComponents::default(),
            Environment::Backtest,
        )
        .expect("open run");

    let topic: MStr<msgbus::Topic> = MStr::from("events.test.madsim");
    static_clock.set_time(UnixNanos::from(20_000));
    msgbus::publish_any(topic, &TestAuditMessage { value: 1 });
    static_clock.set_time(UnixNanos::from(30_000));
    msgbus::publish_any(topic, &TestAuditMessage { value: 2 });

    // Checked immediately, with no polling.
    let hwm_after_publish = store
        .session
        .as_ref()
        .expect("open session")
        .high_watermark();
    assert_eq!(hwm_after_publish, 3);

    static_clock.set_time(UnixNanos::from(40_000));
    store.seal(UnixNanos::from(40_000));

    let backend = memory.lock().expect("memory backend");
    let manifest = backend.manifest().expect("manifest");
    assert_eq!(manifest.seed, Some(seed));
    assert_eq!(manifest.status, RunStatus::Ended);
    assert_eq!(manifest.high_watermark, 4);

    let scanned = backend
        .scan_range(1, manifest.high_watermark, ScanDirection::Forward)
        .expect("scan entries");
    let entries: Vec<CapturedEntrySummary> = scanned
        .into_iter()
        .map(|entry| CapturedEntrySummary {
            seq: entry.seq,
            topic: entry.topic.as_ref().to_string(),
            payload_type: entry.payload_type.as_str().to_string(),
            payload: entry.payload.to_vec(),
            ts_init: entry.ts_init,
            ts_publish: entry.ts_publish,
        })
        .collect();
    // Release the lock before walking the temp directory.
    drop(backend);

    let redb_files = redb_files_under(tmp.path());
    MadsimMemoryLifecycleCapture {
        entries,
        redb_files,
    }
}
1985
/// Returns every `.redb` file found anywhere under `dir`, sorted by path.
#[cfg(madsim)]
fn redb_files_under(dir: &Path) -> Vec<PathBuf> {
    let mut found: Vec<PathBuf> = Vec::new();
    collect_redb_files(dir, &mut found);
    found.sort();
    found
}
1993
/// Recursively walks `dir` and pushes every file with a `redb` extension
/// onto `paths`.
///
/// # Panics
///
/// Panics if a directory or one of its entries cannot be read.
#[cfg(madsim)]
fn collect_redb_files(dir: &Path, paths: &mut Vec<PathBuf>) {
    let listing = std::fs::read_dir(dir).expect("read dir");

    for item in listing {
        let candidate = item.expect("dir entry").path();

        if candidate.is_dir() {
            collect_redb_files(&candidate, paths);
            continue;
        }

        let is_redb = candidate.extension().is_some_and(|ext| ext == "redb");
        if is_redb {
            paths.push(candidate);
        }
    }
}
2008
2009 #[rstest]
2010 fn close_seals_manifest_and_records_run_ended() {
2011 let tmp = TempDir::new().expect("tempdir");
2012 let config = make_config(tmp.path().to_path_buf());
2013
2014 let halt = HaltSignal::new();
2015 let mut session = open_run(
2016 &config,
2017 INSTANCE_ID,
2018 build_run_id(UnixNanos::from(2_000)),
2019 None,
2020 UnixNanos::from(2_000),
2021 &RegisteredComponents::default(),
2022 halt,
2023 get_atomic_clock_static(),
2024 )
2025 .expect("open run");
2026
2027 let run_id = session.run_id().to_string();
2028 session.close(UnixNanos::from(3_000)).expect("close");
2029
2030 let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2031 let manifest = manifests
2032 .into_iter()
2033 .find(|m| m.run_id == run_id)
2034 .expect("manifest present");
2035 assert_eq!(manifest.status, RunStatus::Ended);
2036 assert!(manifest.high_watermark >= 2);
2037 }
2038
2039 #[rstest]
2040 fn snapshot_anchorer_persists_anchor_for_open_session() {
2041 let tmp = TempDir::new().expect("tempdir");
2042 let config = make_config(tmp.path().to_path_buf());
2043
2044 let halt = HaltSignal::new();
2045 let mut session = open_run(
2046 &config,
2047 INSTANCE_ID,
2048 build_run_id(UnixNanos::from(4_000)),
2049 None,
2050 UnixNanos::from(4_000),
2051 &RegisteredComponents::default(),
2052 halt,
2053 get_atomic_clock_static(),
2054 )
2055 .expect("open run");
2056
2057 let run_id = session.run_id().to_string();
2058
2059 {
2060 let anchorer = session.snapshot_anchorer().expect("snapshot anchorer");
2061 anchorer(CacheSnapshotRef::new(
2062 "cache://position-snapshots/P-1/0",
2063 Bytes::from_static(b"snapshot"),
2064 ))
2065 .expect("record snapshot anchor");
2066 }
2067
2068 session.close(UnixNanos::from(4_500)).expect("close");
2069
2070 let reader =
2071 RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &run_id).expect("open sealed");
2072 let anchor = reader
2073 .latest_snapshot_anchor()
2074 .expect("latest snapshot anchor")
2075 .expect("anchor present");
2076
2077 assert_eq!(anchor.high_watermark, 1);
2078 assert_eq!(anchor.blob_ref, "cache://position-snapshots/P-1/0");
2079 assert_eq!(
2080 anchor.content_hash,
2081 compute_snapshot_content_hash(b"snapshot"),
2082 );
2083 }
2084
2085 #[rstest]
2086 fn recovery_seals_tail_ending_in_run_ended_as_ended_not_crashed() {
2087 let tmp = TempDir::new().expect("tempdir");
2096 let config = make_config(tmp.path().to_path_buf());
2097
2098 let halt = HaltSignal::new();
2099 let run_id = build_run_id(UnixNanos::from(7_000));
2100 let session = open_run(
2101 &config,
2102 INSTANCE_ID,
2103 run_id.clone(),
2104 None,
2105 UnixNanos::from(7_000),
2106 &RegisteredComponents::default(),
2107 halt,
2108 get_atomic_clock_static(),
2109 )
2110 .expect("open run");
2111
2112 let writer = session.writer.as_ref().expect("writer attached");
2113 writer
2114 .submit(run_ended_draft(UnixNanos::from(7_500)))
2115 .expect("submit RunEnded as tail entry");
2116 let deadline = Instant::now() + Duration::from_secs(2);
2120
2121 while session.high_watermark() < 2 {
2122 assert!(
2123 Instant::now() < deadline,
2124 "writer high_watermark stuck at {} before deadline",
2125 session.high_watermark(),
2126 );
2127 thread::sleep(Duration::from_millis(2));
2128 }
2129 drop(session);
2130
2131 let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2132 assert_eq!(outcome.recovered.len(), 1);
2133 assert_eq!(outcome.recovered[0].run_id, run_id);
2134 assert_eq!(
2135 outcome.recovered[0].status,
2136 RunStatus::Ended,
2137 "tail ending in RunEnded must seal as Ended",
2138 );
2139 assert!(
2140 outcome.parent_run_id.is_none(),
2141 "Ended runs must not become parents",
2142 );
2143
2144 let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2145 let manifest = manifests
2146 .into_iter()
2147 .find(|m| m.run_id == run_id)
2148 .expect("manifest present");
2149 assert_eq!(manifest.status, RunStatus::Ended);
2150 }
2151
2152 #[rstest]
2153 fn drop_without_close_leaves_run_for_next_boot_to_recover() {
2154 let tmp = TempDir::new().expect("tempdir");
2155 let config = make_config(tmp.path().to_path_buf());
2156
2157 let halt = HaltSignal::new();
2158 let session = open_run(
2159 &config,
2160 INSTANCE_ID,
2161 build_run_id(UnixNanos::from(4_000)),
2162 None,
2163 UnixNanos::from(4_000),
2164 &RegisteredComponents::default(),
2165 halt,
2166 get_atomic_clock_static(),
2167 )
2168 .expect("open run");
2169 let run_id = session.run_id().to_string();
2170 drop(session);
2171
2172 let outcome =
2173 recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover after crash");
2174 assert_eq!(outcome.recovered.len(), 1);
2175 assert_eq!(outcome.recovered[0].run_id, run_id);
2176 assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2177 assert_eq!(outcome.parent_run_id.as_deref(), Some(run_id.as_str()));
2178
2179 let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2180 let sealed = manifests
2181 .into_iter()
2182 .find(|m| m.run_id == run_id)
2183 .expect("present");
2184 assert_eq!(sealed.status, RunStatus::CrashedRecovered);
2185 }
2186
2187 #[rstest]
2188 fn build_run_id_uses_expected_format() {
2189 let id = build_run_id(UnixNanos::from(123_456));
2193 let (prefix, suffix) = id.split_once('-').expect("run id must contain a hyphen");
2194 assert_eq!(prefix, "123456");
2195 assert_eq!(suffix.len(), 8, "suffix was {suffix:?}");
2196 assert!(
2197 suffix.chars().all(|c| c.is_ascii_hexdigit()),
2198 "suffix must be hex, was {suffix:?}",
2199 );
2200 }
2201
2202 #[rstest]
2203 fn crash_recovery_seals_predecessor_and_links_parent_run_id() {
2204 let tmp = TempDir::new().expect("tempdir");
2208 let config = make_config(tmp.path().to_path_buf());
2209
2210 let halt_first = HaltSignal::new();
2212 let first = open_run(
2213 &config,
2214 INSTANCE_ID,
2215 build_run_id(UnixNanos::from(10_000)),
2216 None,
2217 UnixNanos::from(10_000),
2218 &RegisteredComponents::default(),
2219 halt_first,
2220 get_atomic_clock_static(),
2221 )
2222 .expect("open first run");
2223 let crashed_run_id = first.run_id().to_string();
2224 drop(first);
2225
2226 let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2228 assert_eq!(outcome.recovered.len(), 1);
2229 assert_eq!(outcome.recovered[0].run_id, crashed_run_id);
2230 assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2231 assert_eq!(
2232 outcome.parent_run_id.as_deref(),
2233 Some(crashed_run_id.as_str())
2234 );
2235
2236 let manifests_after_seal =
2238 RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2239 let predecessor = manifests_after_seal
2240 .iter()
2241 .find(|m| m.run_id == crashed_run_id)
2242 .expect("predecessor present");
2243 assert_eq!(predecessor.status, RunStatus::CrashedRecovered);
2244
2245 let halt_second = HaltSignal::new();
2247 let new_run_id = build_run_id(UnixNanos::from(20_000));
2248 let next = open_run(
2249 &config,
2250 INSTANCE_ID,
2251 new_run_id.clone(),
2252 outcome.parent_run_id,
2253 UnixNanos::from(20_000),
2254 &RegisteredComponents::default(),
2255 halt_second,
2256 get_atomic_clock_static(),
2257 )
2258 .expect("open second run");
2259 assert_eq!(next.parent_run_id(), Some(crashed_run_id.as_str()));
2260 assert_eq!(
2261 next.manifest().parent_run_id.as_deref(),
2262 Some(crashed_run_id.as_str()),
2263 );
2264 assert_eq!(next.high_watermark(), 1, "RunStarted is the first entry");
2265
2266 drop(next);
2269 let outcome_after = recover_predecessors(&config.base_dir, INSTANCE_ID)
2270 .expect("recover after second crash");
2271 assert_eq!(outcome_after.recovered.len(), 1);
2273 assert_eq!(outcome_after.recovered[0].run_id, new_run_id);
2274 assert_eq!(
2275 outcome_after.recovered[0].status,
2276 RunStatus::CrashedRecovered,
2277 );
2278
2279 let sealed = RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &new_run_id)
2281 .expect("open sealed");
2282 let first_entry = sealed
2283 .scan_seq(1)
2284 .expect("scan")
2285 .expect("RunStarted present");
2286 assert_eq!(first_entry.payload_type.as_str(), "RunStarted");
2287 assert_eq!(first_entry.topic.as_ref(), "run.lifecycle.RunStarted");
2288 }
2289
2290 #[rstest]
2291 #[case::before_enqueue(CrashPoint::BeforeEnqueue, 0, false)]
2292 #[case::after_enqueue_before_commit(CrashPoint::AfterEnqueueBeforeCommit, 0, false)]
2293 #[case::after_commit_before_snapshot(CrashPoint::AfterCommitBeforeSnapshot, 1, false)]
2294 #[case::after_snapshot(CrashPoint::AfterSnapshot, 1, true)]
2295 fn crash_recovery_matrix_seals_predecessor_and_links_parent_run_id(
2296 #[case] crash_point: CrashPoint,
2297 #[case] expected_hwm: u64,
2298 #[case] expect_snapshot_anchor: bool,
2299 ) {
2300 let tmp = TempDir::new().expect("tempdir");
2301 let config = make_config(tmp.path().to_path_buf());
2302 let predecessor_run_id = format!("3000-{crash_point:?}");
2303 seed_crashed_predecessor(&config, &predecessor_run_id, crash_point);
2304
2305 let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2306 assert_eq!(outcome.recovered.len(), 1);
2307 assert_eq!(outcome.recovered[0].run_id, predecessor_run_id);
2308 assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2309 assert_eq!(
2310 outcome.parent_run_id.as_deref(),
2311 Some(predecessor_run_id.as_str()),
2312 );
2313
2314 let predecessor =
2315 RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &predecessor_run_id)
2316 .expect("open sealed predecessor");
2317 let manifest = predecessor.manifest().expect("manifest");
2318 let snapshot_anchor = predecessor.latest_snapshot_anchor().expect("anchor read");
2319
2320 assert_eq!(manifest.status, RunStatus::CrashedRecovered);
2321 assert_eq!(manifest.high_watermark, expected_hwm);
2322 assert_eq!(
2323 snapshot_anchor.is_some(),
2324 expect_snapshot_anchor,
2325 "snapshot anchor presence must match crash point",
2326 );
2327
2328 let next = open_run(
2329 &config,
2330 INSTANCE_ID,
2331 "4000-next".to_string(),
2332 outcome.parent_run_id,
2333 UnixNanos::from(4_000),
2334 &RegisteredComponents::default(),
2335 HaltSignal::new(),
2336 get_atomic_clock_static(),
2337 )
2338 .expect("open next run");
2339 assert_eq!(next.parent_run_id(), Some(predecessor_run_id.as_str()));
2340 assert_eq!(
2341 next.manifest().parent_run_id.as_deref(),
2342 Some(predecessor_run_id.as_str()),
2343 );
2344 }
2345
2346 #[rstest]
2347 fn kernel_event_store_open_seals_leftover_session_before_reopen() {
2348 let tmp = TempDir::new().expect("tempdir");
2353 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2354 let instance_id = UUID4::new();
2355
2356 let mut store = EventStoreLifecycle::boot(
2357 Some(make_config(tmp.path().to_path_buf())),
2358 instance_id,
2359 clock_rc,
2360 )
2361 .expect("boot store");
2362
2363 store
2364 .open(
2365 instance_id,
2366 &RegisteredComponents::default(),
2367 Environment::Backtest,
2368 )
2369 .expect("open first run");
2370 let run_one = store.run_id().expect("run one open").to_string();
2371
2372 store
2373 .open(
2374 instance_id,
2375 &RegisteredComponents::default(),
2376 Environment::Backtest,
2377 )
2378 .expect("open second run");
2379 let run_two = store.run_id().expect("run two open").to_string();
2380
2381 assert_ne!(run_one, run_two, "second open must produce a fresh run id");
2382
2383 drop(store);
2386 let manifests =
2387 RedbBackend::list_runs(tmp.path(), &instance_id.to_string()).expect("list runs");
2388 let m1 = manifests
2389 .iter()
2390 .find(|m| m.run_id == run_one)
2391 .expect("first run present");
2392 let m2 = manifests
2393 .iter()
2394 .find(|m| m.run_id == run_two)
2395 .expect("second run present");
2396 assert_eq!(m1.status, RunStatus::Ended);
2397 assert_eq!(m2.status, RunStatus::Ended);
2398 }
2399
2400 #[rstest]
2401 fn recover_picks_most_recent_crashed_recovered_as_parent() {
2402 let tmp = TempDir::new().expect("tempdir");
2405 let config = make_config(tmp.path().to_path_buf());
2406
2407 for ts in [1_000_u64, 2_000_u64, 3_000_u64] {
2408 let session = open_run(
2409 &config,
2410 INSTANCE_ID,
2411 build_run_id(UnixNanos::from(ts)),
2412 None,
2413 UnixNanos::from(ts),
2414 &RegisteredComponents::default(),
2415 HaltSignal::new(),
2416 get_atomic_clock_static(),
2417 )
2418 .expect("open");
2419 drop(session);
2420 }
2421
2422 let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2423 assert_eq!(outcome.recovered.len(), 3);
2424 assert!(
2425 outcome
2426 .recovered
2427 .iter()
2428 .all(|r| r.status == RunStatus::CrashedRecovered),
2429 "every predecessor must seal CrashedRecovered, was {:?}",
2430 outcome.recovered,
2431 );
2432 let parent = outcome.parent_run_id.as_deref().expect("parent set");
2434 assert!(
2435 parent.starts_with("3000-"),
2436 "parent must be the run with the highest start_ts_init, was {parent}",
2437 );
2438 }
2439
2440 #[rstest]
2441 fn submit_run_started_returns_timeout_when_writer_stalls() {
2442 let stub = StallBackend::default();
2446 let manifest = manifest_for("run-timeout");
2447 let mut backend: Box<dyn EventStore + Send> = Box::new(stub.clone());
2448 backend.open_run(manifest).expect("open stub");
2449
2450 let halt = HaltSignal::new();
2451
2452 let writer = EventStoreWriter::spawn(
2453 backend,
2454 get_atomic_clock_static(),
2455 halt.callback(),
2456 WriterConfig::default(),
2457 )
2458 .expect("spawn writer");
2459
2460 let err = submit_run_started_blocking(
2461 &writer,
2462 &RegisteredComponents::default(),
2463 UnixNanos::from(100),
2464 &halt,
2465 Duration::from_millis(20),
2466 )
2467 .expect_err("must time out");
2468
2469 match err {
2470 BootError::RunStartedTimeout { timeout } => {
2471 assert_eq!(timeout, Duration::from_millis(20));
2472 }
2473 other => panic!("expected RunStartedTimeout, was {other:?}"),
2474 }
2475
2476 stub.release();
2478 }
2479
2480 #[rstest]
2481 fn submit_run_started_returns_halted_when_writer_halts_during_wait() {
2482 let stub = StallBackend::default();
2485 let manifest = manifest_for("run-halt");
2486 let mut backend: Box<dyn EventStore + Send> = Box::new(stub.clone());
2487 backend.open_run(manifest).expect("open stub");
2488
2489 let halt = HaltSignal::new();
2490
2491 let writer = EventStoreWriter::spawn(
2492 backend,
2493 get_atomic_clock_static(),
2494 halt.callback(),
2495 WriterConfig::default(),
2496 )
2497 .expect("spawn writer");
2498
2499 let halt_for_thread = halt.clone();
2501
2502 let firer = thread::spawn(move || {
2503 thread::sleep(Duration::from_millis(10));
2504 halt_for_thread.callback()(HaltReason::BackendDisk("test stall".to_string()));
2505 });
2506
2507 let err = submit_run_started_blocking(
2508 &writer,
2509 &RegisteredComponents::default(),
2510 UnixNanos::from(200),
2511 &halt,
2512 Duration::from_secs(2),
2513 )
2514 .expect_err("must observe halt");
2515
2516 match err {
2517 BootError::HaltedDuringBoot(HaltReason::BackendDisk(msg)) => {
2518 assert!(msg.contains("test stall"), "reason msg was {msg}");
2519 }
2520 other => panic!("expected HaltedDuringBoot(BackendDisk), was {other:?}"),
2521 }
2522 firer.join().expect("halt firer joined");
2523 stub.release();
2524 }
2525
2526 fn manifest_for(run_id: &str) -> RunManifest {
2527 RunManifest {
2528 run_id: run_id.to_string(),
2529 parent_run_id: None,
2530 instance_id: INSTANCE_ID.to_string(),
2531 binary_hash: String::new(),
2532 schema_version: 1,
2533 crate_versions: String::new(),
2534 feature_flags: Vec::new(),
2535 adapter_versions: IndexMap::new(),
2536 config_hash: String::new(),
2537 registered_components: RegisteredComponents::default(),
2538 seed: None,
2539 start_ts_init: UnixNanos::default(),
2540 end_ts_init: None,
2541 high_watermark: 0,
2542 status: RunStatus::Running,
2543 }
2544 }
2545
    #[derive(Debug, Default, Clone)]
    /// Test backend whose `append_batch` blocks until `release` is called.
    ///
    /// Clones share the same state through the `Arc`s, so a test can keep one
    /// handle while handing another to the code under test.
    struct StallBackend {
        /// Holds the manifest stored by `open_run`.
        inner: Arc<Mutex<StallInner>>,
        /// Release flag and the condvar that `append_batch` waits on.
        gate: Arc<(Mutex<bool>, std::sync::Condvar)>,
    }
2554
    #[derive(Debug, Default)]
    /// Mutable state behind `StallBackend::inner`.
    struct StallInner {
        /// Manifest of the opened run; `None` until `open_run` stores one.
        manifest: Option<RunManifest>,
    }
2559
2560 impl StallBackend {
2561 fn release(&self) {
2562 let (lock, cvar) = &*self.gate;
2563 *lock.lock().expect("gate") = true;
2564 cvar.notify_all();
2565 }
2566 }
2567
2568 impl crate::EventStore for StallBackend {
2569 fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
2570 self.inner.lock().expect("inner").manifest = Some(manifest);
2571 Ok(())
2572 }
2573
2574 fn append_batch(&mut self, _: &[crate::AppendEntry]) -> Result<u64, EventStoreError> {
2575 let (lock, cvar) = &*self.gate;
2576 let mut released = lock.lock().expect("gate");
2577
2578 while !*released {
2579 released = cvar.wait(released).expect("gate wait");
2580 }
2581 Ok(0)
2582 }
2583
2584 fn scan_range(
2585 &self,
2586 _: u64,
2587 _: u64,
2588 _: crate::ScanDirection,
2589 ) -> Result<Vec<crate::EventStoreEntry>, EventStoreError> {
2590 Ok(Vec::new())
2591 }
2592
2593 fn scan_seq(&self, _: u64) -> Result<Option<crate::EventStoreEntry>, EventStoreError> {
2594 Ok(None)
2595 }
2596
2597 fn lookup(&self, _: crate::IndexKind, _: &str) -> Result<Option<u64>, EventStoreError> {
2598 Ok(None)
2599 }
2600
2601 fn iter_index_keys(
2602 &self,
2603 _: crate::IndexKind,
2604 ) -> Result<Vec<(String, u64)>, EventStoreError> {
2605 Ok(Vec::new())
2606 }
2607
2608 fn seal(&mut self, _: RunStatus) -> Result<(), EventStoreError> {
2609 Ok(())
2610 }
2611
2612 fn manifest(&self) -> Result<RunManifest, EventStoreError> {
2613 self.inner
2614 .lock()
2615 .expect("inner")
2616 .manifest
2617 .clone()
2618 .ok_or_else(|| EventStoreError::Backend("no manifest".to_string()))
2619 }
2620
2621 fn high_watermark(&self) -> Result<u64, EventStoreError> {
2622 Ok(0)
2623 }
2624 }
2625
2626 #[rstest]
2631 fn bus_tap_captures_submit_order_sent_through_msgbus() {
2632 let tmp = TempDir::new().expect("tempdir");
2633 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2634 let instance_id = UUID4::new();
2635
2636 let mut store = EventStoreLifecycle::boot(
2637 Some(make_config(tmp.path().to_path_buf())),
2638 instance_id,
2639 clock_rc,
2640 )
2641 .expect("boot store");
2642 store
2643 .open(
2644 instance_id,
2645 &RegisteredComponents::default(),
2646 Environment::Backtest,
2647 )
2648 .expect("open run");
2649 let run_id = store.run_id().expect("run open").to_string();
2650
2651 let trader_id = TraderId::from("TRADER-001");
2652 let strategy_id = StrategyId::from("S-001");
2653 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
2654 let client_order_id = ClientOrderId::from("O-20260510-000001");
2655 let order_init = OrderInitializedSpec::builder()
2656 .instrument_id(instrument_id)
2657 .client_order_id(client_order_id)
2658 .quantity(Quantity::from("1"))
2659 .time_in_force(TimeInForce::Gtc)
2660 .build();
2661 let submit_order = SubmitOrder::new(
2662 trader_id,
2663 Some(ClientId::from("BINANCE")),
2664 strategy_id,
2665 instrument_id,
2666 client_order_id,
2667 order_init,
2668 None,
2669 None,
2670 None,
2671 UUID4::new(),
2672 UnixNanos::from(3),
2673 None, );
2675
2676 let endpoint = MStr::<Endpoint>::from("test.exec.engine.process");
2677 msgbus::send_any_value(endpoint, &submit_order);
2678
2679 let deadline = Instant::now() + Duration::from_secs(2);
2682
2683 loop {
2684 let hwm = store
2685 .session
2686 .as_ref()
2687 .map_or(0, EventStoreSession::high_watermark);
2688
2689 if hwm >= 2 {
2690 break;
2691 }
2692 assert!(
2693 Instant::now() < deadline,
2694 "captured SubmitOrder did not commit within deadline (hwm={hwm})",
2695 );
2696 thread::sleep(Duration::from_millis(2));
2697 }
2698
2699 drop(store);
2701
2702 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
2703 .expect("open sealed");
2704 let captured = sealed
2705 .scan_seq(2)
2706 .expect("scan")
2707 .expect("captured entry present");
2708 assert_eq!(captured.payload_type.as_str(), "SubmitOrder");
2709 assert_eq!(captured.topic.as_ref(), endpoint.as_str());
2710
2711 let by_client = sealed
2714 .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
2715 .expect("lookup")
2716 .expect("indexed");
2717 assert_eq!(by_client, 2);
2718 }
2719
2720 #[rstest]
2721 fn kernel_with_markers_captures_snapshots_over_synthetic_bus() {
2722 let tmp = TempDir::new().expect("tempdir");
2723 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2724 let instance_id = UUID4::new();
2725 let mut config = make_config(tmp.path().to_path_buf());
2726 config.data_markers = Some(DataMarkerConfig {
2727 classes: vec![DataMarkerClass::BookDeltas, DataMarkerClass::Quote],
2728 safety_flush_interval: Duration::from_secs(1),
2729 channel_capacity: 128,
2730 high_fidelity: Vec::new(),
2731 });
2732
2733 let mut store =
2734 EventStoreLifecycle::boot(Some(config), instance_id, clock_rc).expect("boot store");
2735 store
2736 .open(
2737 instance_id,
2738 &RegisteredComponents::default(),
2739 Environment::Backtest,
2740 )
2741 .expect("open run");
2742 let run_id = store.run_id().expect("run open").to_string();
2743
2744 let first = make_submit_order(ClientOrderId::from("O-marker-1"));
2745 msgbus::send_any_value(MStr::<Endpoint>::from("test.exec.process"), &first);
2746
2747 let quote = quote_ethusdt_binance();
2748 msgbus::publish_quote(MStr::from("data.quotes.BINANCE.ETHUSDT-PERP"), "e);
2749 let deltas = stub_deltas();
2750 msgbus::publish_deltas(MStr::from("data.deltas.XNAS.AAPL"), &deltas);
2751
2752 let second = make_submit_order(ClientOrderId::from("O-marker-2"));
2753 msgbus::send_any_value(MStr::<Endpoint>::from("test.exec.process"), &second);
2754 wait_for_high_watermark(&store, 3);
2755 store.seal(UnixNanos::from(500));
2756
2757 let marker_path = tmp
2758 .path()
2759 .join(instance_id.to_string())
2760 .join(format!("{run_id}.markers.redb"));
2761 let marker = RedbMarkerBackend::open_read_only_file(marker_path).expect("open markers");
2762 let snapshots = marker.scan_snapshots().expect("scan snapshots");
2763 let dict = marker.scan_dict().expect("scan dict");
2764
2765 assert_eq!(snapshots.len(), 1);
2766 assert_eq!(snapshots[0].event_seq_before, 3);
2767 assert_eq!(snapshots[0].advanced.len(), 2);
2768 assert_eq!(
2769 snapshots[0]
2770 .advanced
2771 .iter()
2772 .map(|cursor| cursor.count)
2773 .collect::<Vec<_>>(),
2774 vec![1, 1]
2775 );
2776 assert_eq!(
2777 dict.iter()
2778 .map(|entry| (entry.data_cls, entry.identifier.as_str()))
2779 .collect::<Vec<_>>(),
2780 vec![
2781 (DataClass::Quote, "ETHUSDT-PERP.BINANCE"),
2782 (DataClass::BookDeltas, "AAPL.XNAS"),
2783 ],
2784 );
2785 }
2786
2787 #[rstest]
2788 fn boot_recovery_ignores_marker_sidecar_files() {
2789 let tmp = TempDir::new().expect("tempdir");
2790 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2791 let instance_id = UUID4::new();
2792 let mut config = make_config(tmp.path().to_path_buf());
2793 config.data_markers = Some(DataMarkerConfig {
2794 classes: vec![DataMarkerClass::Quote],
2795 safety_flush_interval: Duration::from_secs(1),
2796 channel_capacity: 128,
2797 high_fidelity: Vec::new(),
2798 });
2799
2800 let mut store =
2801 EventStoreLifecycle::boot(Some(config.clone()), instance_id, Rc::clone(&clock_rc))
2802 .expect("boot store");
2803 store
2804 .open(
2805 instance_id,
2806 &RegisteredComponents::default(),
2807 Environment::Backtest,
2808 )
2809 .expect("open run");
2810 let run_id = store.run_id().expect("run open").to_string();
2811 store.seal(UnixNanos::from(500));
2812
2813 let marker_path = tmp
2814 .path()
2815 .join(instance_id.to_string())
2816 .join(format!("{run_id}.markers.redb"));
2817 assert!(marker_path.exists());
2818
2819 let rebooted = EventStoreLifecycle::boot(Some(config), instance_id, clock_rc)
2820 .expect("boot after marker sidecar");
2821
2822 assert!(rebooted.recovered().is_empty());
2823 }
2824
2825 #[rstest]
2826 fn marker_setup_failure_disables_markers_without_blocking_open() {
2827 let tmp = TempDir::new().expect("tempdir");
2828 let bad_base = tmp.path().join("not-a-directory");
2829 std::fs::write(&bad_base, b"not a directory").expect("write bad base");
2830 let memory = Arc::new(Mutex::new(MemoryBackend::new()));
2831 let opener_memory = Arc::clone(&memory);
2832 let mut config = make_config(bad_base);
2833 config.data_markers = Some(DataMarkerConfig {
2834 classes: vec![DataMarkerClass::Quote],
2835 safety_flush_interval: Duration::from_secs(1),
2836 channel_capacity: 128,
2837 high_fidelity: Vec::new(),
2838 });
2839 let options = EventStoreLifecycleOptions::new()
2840 .with_encoder_registry(test_registry())
2841 .with_backend_opener(move |_, manifest| {
2842 opener_memory
2843 .lock()
2844 .expect("memory backend")
2845 .open_run(manifest.clone())?;
2846 Ok(Box::new(SharedMemoryBackend(Arc::clone(&opener_memory))))
2847 });
2848
2849 let mut session = open_run_with_options(
2850 &config,
2851 INSTANCE_ID,
2852 "run-marker-setup-fails".to_string(),
2853 None,
2854 UnixNanos::from(5_000),
2855 &RegisteredComponents::default(),
2856 HaltSignal::new(),
2857 get_atomic_clock_static(),
2858 &options,
2859 )
2860 .expect("open run despite marker failure");
2861
2862 assert!(session.marker_capture.is_none());
2863
2864 let topic: MStr<msgbus::Topic> = MStr::from("events.test.marker-fallback");
2865 session
2866 .adapter()
2867 .expect("adapter")
2868 .capture::<TestAuditMessage>(
2869 topic,
2870 &TestAuditMessage { value: 11 },
2871 Headers::empty(),
2872 UnixNanos::from(5_001),
2873 )
2874 .expect("capture");
2875 let deadline = Instant::now() + Duration::from_secs(2);
2876
2877 while session.high_watermark() < 2 {
2878 assert!(
2879 Instant::now() < deadline,
2880 "event-store high_watermark {} did not reach 2 within deadline",
2881 session.high_watermark(),
2882 );
2883 thread::sleep(Duration::from_millis(2));
2884 }
2885 session
2886 .close(UnixNanos::from(6_000))
2887 .expect("close session");
2888
2889 let backend = memory.lock().expect("memory backend");
2890 let captured = backend
2891 .scan_seq(2)
2892 .expect("scan")
2893 .expect("captured entry present");
2894
2895 assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
2896 assert_eq!(captured.topic.as_ref(), topic.as_str());
2897 assert_eq!(captured.payload.as_ref(), &[11]);
2898 }
2899
2900 #[rstest]
2901 fn marker_registry_factory_receives_enabled_classes() {
2902 let tmp = TempDir::new().expect("tempdir");
2903 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2904 let instance_id = UUID4::new();
2905 let seen_classes: Arc<Mutex<Vec<Vec<DataClass>>>> = Arc::new(Mutex::new(Vec::new()));
2906 let seen_for_factory = Arc::clone(&seen_classes);
2907 let mut config = make_config(tmp.path().to_path_buf());
2908 config.data_markers = Some(DataMarkerConfig {
2909 classes: vec![DataMarkerClass::Trade, DataMarkerClass::Quote],
2910 safety_flush_interval: Duration::from_secs(1),
2911 channel_capacity: 128,
2912 high_fidelity: Vec::new(),
2913 });
2914 let options =
2915 EventStoreLifecycleOptions::new().with_marker_registry_factory(move |classes| {
2916 seen_for_factory
2917 .lock()
2918 .expect("seen classes")
2919 .push(classes.to_vec());
2920 DataMarkerExtractorRegistry::default_registry(classes)
2921 });
2922
2923 let mut store =
2924 EventStoreLifecycle::boot_with_options(Some(config), instance_id, clock_rc, options)
2925 .expect("boot store");
2926 store
2927 .open(
2928 instance_id,
2929 &RegisteredComponents::default(),
2930 Environment::Backtest,
2931 )
2932 .expect("open run");
2933 store.seal(UnixNanos::from(1_000));
2934
2935 let seen = seen_classes.lock().expect("seen classes");
2936 assert_eq!(seen.as_slice(), &[vec![DataClass::Trade, DataClass::Quote]]);
2937 }
2938
2939 #[rstest]
2940 fn markers_disabled_installs_no_file_and_no_cost() {
2941 let tmp = TempDir::new().expect("tempdir");
2942 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2943 let instance_id = UUID4::new();
2944
2945 let mut store = EventStoreLifecycle::boot(
2946 Some(make_config(tmp.path().to_path_buf())),
2947 instance_id,
2948 clock_rc,
2949 )
2950 .expect("boot store");
2951 store
2952 .open(
2953 instance_id,
2954 &RegisteredComponents::default(),
2955 Environment::Backtest,
2956 )
2957 .expect("open run");
2958 let run_id = store.run_id().expect("run open").to_string();
2959
2960 assert!(
2961 store
2962 .session
2963 .as_ref()
2964 .expect("session")
2965 .marker_capture
2966 .is_none()
2967 );
2968
2969 let quote = quote_ethusdt_binance();
2970 msgbus::publish_quote(MStr::from("data.quotes.BINANCE.ETHUSDT-PERP"), "e);
2971 store.seal(UnixNanos::from(500));
2972
2973 let marker_path = tmp
2974 .path()
2975 .join(instance_id.to_string())
2976 .join(format!("{run_id}.markers.redb"));
2977 assert!(!marker_path.exists());
2978 }
2979
2980 #[rstest]
2984 fn bus_tap_captures_time_event_handler_run() {
2985 let tmp = TempDir::new().expect("tempdir");
2986 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2987 let instance_id = UUID4::new();
2988
2989 let mut store = EventStoreLifecycle::boot(
2990 Some(make_config(tmp.path().to_path_buf())),
2991 instance_id,
2992 clock_rc,
2993 )
2994 .expect("boot store");
2995 store
2996 .open(
2997 instance_id,
2998 &RegisteredComponents::default(),
2999 Environment::Backtest,
3000 )
3001 .expect("open run");
3002 let run_id = store.run_id().expect("run open").to_string();
3003
3004 let event = TimeEvent::new(
3005 Ustr::from("strategy.heartbeat"),
3006 UUID4::new(),
3007 UnixNanos::from(100),
3008 UnixNanos::from(99),
3009 );
3010 let callback = TimeEventCallback::from(|_: TimeEvent| {});
3011 TimeEventHandler::new(event, callback).run();
3012
3013 let deadline = Instant::now() + Duration::from_secs(2);
3014
3015 loop {
3016 let hwm = store
3017 .session
3018 .as_ref()
3019 .map_or(0, EventStoreSession::high_watermark);
3020
3021 if hwm >= 2 {
3022 break;
3023 }
3024 assert!(
3025 Instant::now() < deadline,
3026 "captured TimeEvent did not commit within deadline (hwm={hwm})",
3027 );
3028 thread::sleep(Duration::from_millis(2));
3029 }
3030
3031 drop(store);
3032
3033 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3034 .expect("open sealed");
3035 let captured = sealed
3036 .scan_seq(2)
3037 .expect("scan")
3038 .expect("captured entry present");
3039
3040 assert_eq!(captured.payload_type.as_str(), PAYLOAD_TYPE_TIME_EVENT);
3041 assert_eq!(captured.topic, MessagingSwitchboard::time_event_topic());
3042 }
3043
3044 #[rstest]
3049 fn seal_clears_bus_tap_so_post_seal_dispatches_do_not_capture() {
3050 let tmp = TempDir::new().expect("tempdir");
3051 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3052 let instance_id = UUID4::new();
3053
3054 let mut store = EventStoreLifecycle::boot(
3055 Some(make_config(tmp.path().to_path_buf())),
3056 instance_id,
3057 clock_rc,
3058 )
3059 .expect("boot store");
3060 store
3061 .open(
3062 instance_id,
3063 &RegisteredComponents::default(),
3064 Environment::Backtest,
3065 )
3066 .expect("open run");
3067 let run_id = store.run_id().expect("run open").to_string();
3068
3069 store.seal(UnixNanos::from(0));
3070
3071 let endpoint = MStr::<Endpoint>::from("test.post.seal.endpoint");
3076 let payload: u32 = 99;
3077 msgbus::send_any_value(endpoint, &payload);
3078
3079 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3080 .expect("open sealed");
3081 assert!(
3083 sealed.scan_seq(3).expect("scan").is_none(),
3084 "no entry must land after seal",
3085 );
3086 }
3087
3088 #[rstest]
3094 fn bus_tap_captures_trading_command_envelope_with_inner_payload_type() {
3095 let tmp = TempDir::new().expect("tempdir");
3096 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3097 let instance_id = UUID4::new();
3098
3099 let mut store = EventStoreLifecycle::boot(
3100 Some(make_config(tmp.path().to_path_buf())),
3101 instance_id,
3102 clock_rc,
3103 )
3104 .expect("boot store");
3105 store
3106 .open(
3107 instance_id,
3108 &RegisteredComponents::default(),
3109 Environment::Backtest,
3110 )
3111 .expect("open run");
3112 let run_id = store.run_id().expect("run open").to_string();
3113
3114 let trader_id = TraderId::from("TRADER-001");
3115 let strategy_id = StrategyId::from("S-001");
3116 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
3117 let client_order_id = ClientOrderId::from("O-20260510-000002");
3118 let order_init = OrderInitializedSpec::builder()
3119 .instrument_id(instrument_id)
3120 .client_order_id(client_order_id)
3121 .quantity(Quantity::from("1"))
3122 .time_in_force(TimeInForce::Gtc)
3123 .build();
3124 let submit_order = SubmitOrder::new(
3125 trader_id,
3126 Some(ClientId::from("BINANCE")),
3127 strategy_id,
3128 instrument_id,
3129 client_order_id,
3130 order_init,
3131 None,
3132 None,
3133 None,
3134 UUID4::new(),
3135 UnixNanos::from(3),
3136 None, );
3138 let command = TradingCommand::SubmitOrder(submit_order.clone());
3139
3140 let endpoint = MStr::<Endpoint>::from("test.exec.engine.envelope");
3141 msgbus::send_trading_command(endpoint, command);
3142
3143 let deadline = Instant::now() + Duration::from_secs(2);
3144
3145 loop {
3146 let hwm = store
3147 .session
3148 .as_ref()
3149 .map_or(0, EventStoreSession::high_watermark);
3150
3151 if hwm >= 2 {
3152 break;
3153 }
3154 assert!(
3155 Instant::now() < deadline,
3156 "captured TradingCommand did not commit within deadline (hwm={hwm})",
3157 );
3158 thread::sleep(Duration::from_millis(2));
3159 }
3160
3161 drop(store);
3162
3163 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3164 .expect("open sealed");
3165 let captured = sealed
3166 .scan_seq(2)
3167 .expect("scan")
3168 .expect("captured entry present");
3169 assert_eq!(
3170 captured.payload_type.as_str(),
3171 "SubmitOrder",
3172 "wrapper envelope must stamp the inner payload_type",
3173 );
3174 assert_eq!(captured.topic.as_ref(), endpoint.as_str());
3175
3176 let by_client = sealed
3177 .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
3178 .expect("lookup")
3179 .expect("indexed");
3180 assert_eq!(by_client, 2);
3181
3182 let decoded: SubmitOrder =
3186 rmp_serde::from_slice(&captured.payload).expect("decode captured SubmitOrder");
3187 assert_eq!(decoded, submit_order);
3188 }
3189
3190 #[rstest]
3194 fn bus_tap_captures_order_event_any_envelope_with_inner_payload_type() {
3195 let tmp = TempDir::new().expect("tempdir");
3196 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3197 let instance_id = UUID4::new();
3198
3199 let mut store = EventStoreLifecycle::boot(
3200 Some(make_config(tmp.path().to_path_buf())),
3201 instance_id,
3202 clock_rc,
3203 )
3204 .expect("boot store");
3205 store
3206 .open(
3207 instance_id,
3208 &RegisteredComponents::default(),
3209 Environment::Backtest,
3210 )
3211 .expect("open run");
3212 let run_id = store.run_id().expect("run open").to_string();
3213
3214 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
3215 let client_order_id = ClientOrderId::from("O-20260510-000003");
3216 let venue_order_id = VenueOrderId::from("V-99");
3217 let filled = OrderFilledSpec::builder()
3218 .instrument_id(instrument_id)
3219 .client_order_id(client_order_id)
3220 .venue_order_id(venue_order_id)
3221 .account_id(AccountId::from("BINANCE-001"))
3222 .trade_id(TradeId::from("T-1"))
3223 .last_qty(Quantity::from("1"))
3224 .last_px(Price::from("100.00"))
3225 .currency(Currency::USDT())
3226 .ts_event(UnixNanos::from(10))
3227 .ts_init(UnixNanos::from(11))
3228 .commission(Money::new(0.10, Currency::USDT()))
3229 .build();
3230 let event = OrderEventAny::Filled(filled);
3231
3232 let topic: MStr<msgbus::Topic> = MStr::from("events.order.ETHUSDT-PERP.BINANCE");
3233 msgbus::publish_order_event(topic, &event);
3234
3235 let deadline = Instant::now() + Duration::from_secs(2);
3236
3237 loop {
3238 let hwm = store
3239 .session
3240 .as_ref()
3241 .map_or(0, EventStoreSession::high_watermark);
3242
3243 if hwm >= 2 {
3244 break;
3245 }
3246 assert!(
3247 Instant::now() < deadline,
3248 "captured OrderEventAny did not commit within deadline (hwm={hwm})",
3249 );
3250 thread::sleep(Duration::from_millis(2));
3251 }
3252
3253 drop(store);
3254
3255 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3256 .expect("open sealed");
3257 let captured = sealed
3258 .scan_seq(2)
3259 .expect("scan")
3260 .expect("captured entry present");
3261 assert_eq!(
3262 captured.payload_type.as_str(),
3263 "OrderFilled",
3264 "wrapper envelope must stamp the inner payload_type",
3265 );
3266 assert_eq!(captured.topic.as_ref(), topic.as_str());
3267
3268 let by_client = sealed
3269 .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
3270 .expect("lookup")
3271 .expect("indexed");
3272 let by_venue = sealed
3273 .lookup(IndexKind::VenueOrderId, venue_order_id.as_str())
3274 .expect("lookup")
3275 .expect("indexed");
3276 assert_eq!(by_client, 2);
3277 assert_eq!(by_venue, 2);
3278
3279 let decoded: OrderFilled =
3283 rmp_serde::from_slice(&captured.payload).expect("decode captured OrderFilled");
3284 assert_eq!(decoded, filled);
3285 }
3286
3287 #[rstest]
3292 fn bus_tap_captures_data_command_envelopes_with_category_payload_types() {
3293 let tmp = TempDir::new().expect("tempdir");
3294 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3295 let instance_id = UUID4::new();
3296
3297 let mut store = EventStoreLifecycle::boot(
3298 Some(make_config(tmp.path().to_path_buf())),
3299 instance_id,
3300 clock_rc,
3301 )
3302 .expect("boot store");
3303 store
3304 .open(
3305 instance_id,
3306 &RegisteredComponents::default(),
3307 Environment::Backtest,
3308 )
3309 .expect("open run");
3310 let run_id = store.run_id().expect("run open").to_string();
3311
3312 let request = RequestCommand::Quotes(RequestQuotes::new(
3313 InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3314 None,
3315 None,
3316 None,
3317 Some(ClientId::from("BINANCE")),
3318 UUID4::new(),
3319 UnixNanos::from(20),
3320 None,
3321 ));
3322 let subscribe = SubscribeCommand::Quotes(SubscribeQuotes::new(
3323 InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3324 Some(ClientId::from("BINANCE")),
3325 Some(Venue::from("BINANCE")),
3326 UUID4::new(),
3327 UnixNanos::from(21),
3328 Some(UUID4::new()),
3329 None,
3330 ));
3331
3332 let request_endpoint = MStr::<Endpoint>::from("test.data.engine.request");
3333 msgbus::send_data_command(request_endpoint, DataCommand::Request(request.clone()));
3334
3335 let subscribe_endpoint = MStr::<Endpoint>::from("test.data.engine.subscribe");
3336 msgbus::send_data_command(
3337 subscribe_endpoint,
3338 DataCommand::Subscribe(subscribe.clone()),
3339 );
3340
3341 let deadline = Instant::now() + Duration::from_secs(2);
3342
3343 loop {
3344 let hwm = store
3345 .session
3346 .as_ref()
3347 .map_or(0, EventStoreSession::high_watermark);
3348
3349 if hwm >= 3 {
3350 break;
3351 }
3352 assert!(
3353 Instant::now() < deadline,
3354 "captured DataCommand entries did not commit within deadline (hwm={hwm})",
3355 );
3356 thread::sleep(Duration::from_millis(2));
3357 }
3358
3359 drop(store);
3360
3361 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3362 .expect("open sealed");
3363 let captured_request = sealed
3364 .scan_seq(2)
3365 .expect("scan request")
3366 .expect("captured request present");
3367 assert_eq!(captured_request.payload_type.as_str(), "RequestCommand");
3368 assert_eq!(captured_request.topic.as_ref(), request_endpoint.as_str());
3369
3370 let decoded_request: RequestCommand =
3371 rmp_serde::from_slice(&captured_request.payload).expect("decode RequestCommand");
3372 match (decoded_request, request) {
3373 (RequestCommand::Quotes(decoded), RequestCommand::Quotes(expected)) => {
3374 assert_eq!(decoded.request_id, expected.request_id);
3375 assert_eq!(decoded.instrument_id, expected.instrument_id);
3376 assert_eq!(decoded.client_id, expected.client_id);
3377 assert_eq!(decoded.ts_init, expected.ts_init);
3378 }
3379 other => panic!("expected RequestCommand::Quotes round trip, was {other:?}"),
3380 }
3381
3382 let captured_subscribe = sealed
3383 .scan_seq(3)
3384 .expect("scan subscribe")
3385 .expect("captured subscribe present");
3386 assert_eq!(captured_subscribe.payload_type.as_str(), "SubscribeCommand");
3387 assert_eq!(
3388 captured_subscribe.topic.as_ref(),
3389 subscribe_endpoint.as_str()
3390 );
3391
3392 let decoded_subscribe: SubscribeCommand =
3393 rmp_serde::from_slice(&captured_subscribe.payload).expect("decode SubscribeCommand");
3394 match (decoded_subscribe, subscribe) {
3395 (SubscribeCommand::Quotes(decoded), SubscribeCommand::Quotes(expected)) => {
3396 assert_eq!(decoded.command_id, expected.command_id);
3397 assert_eq!(decoded.instrument_id, expected.instrument_id);
3398 assert_eq!(decoded.client_id, expected.client_id);
3399 assert_eq!(decoded.venue, expected.venue);
3400 assert_eq!(decoded.ts_init, expected.ts_init);
3401 assert_eq!(decoded.correlation_id, expected.correlation_id);
3402 }
3403 other => panic!("expected SubscribeCommand::Quotes round trip, was {other:?}"),
3404 }
3405 }
3406
3407 #[rstest]
3411 fn bus_tap_captures_data_response_sent_through_correlation_handler() {
3412 let tmp = TempDir::new().expect("tempdir");
3413 let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3414 let instance_id = UUID4::new();
3415
3416 let mut store = EventStoreLifecycle::boot(
3417 Some(make_config(tmp.path().to_path_buf())),
3418 instance_id,
3419 clock_rc,
3420 )
3421 .expect("boot store");
3422 store
3423 .open(
3424 instance_id,
3425 &RegisteredComponents::default(),
3426 Environment::Backtest,
3427 )
3428 .expect("open run");
3429 let run_id = store.run_id().expect("run open").to_string();
3430
3431 let correlation_id = UUID4::new();
3432 let handler_called = Rc::new(RefCell::new(false));
3433 let handler_called_clone = handler_called.clone();
3434 msgbus::register_response_handler(
3435 &correlation_id,
3436 msgbus::ShareableMessageHandler::from_typed(move |_resp: &QuotesResponse| {
3437 *handler_called_clone.borrow_mut() = true;
3438 }),
3439 );
3440
3441 let response = QuotesResponse::new(
3442 correlation_id,
3443 ClientId::from("BINANCE"),
3444 InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3445 vec![],
3446 None,
3447 None,
3448 UnixNanos::from(30),
3449 None,
3450 );
3451 msgbus::send_response(&correlation_id, &DataResponse::Quotes(response.clone()));
3452
3453 let deadline = Instant::now() + Duration::from_secs(2);
3454
3455 loop {
3456 let hwm = store
3457 .session
3458 .as_ref()
3459 .map_or(0, EventStoreSession::high_watermark);
3460
3461 if hwm >= 2 {
3462 break;
3463 }
3464 assert!(
3465 Instant::now() < deadline,
3466 "captured DataResponse did not commit within deadline (hwm={hwm})",
3467 );
3468 thread::sleep(Duration::from_millis(2));
3469 }
3470
3471 assert!(*handler_called.borrow());
3472 drop(store);
3473
3474 let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3475 .expect("open sealed");
3476 let captured = sealed
3477 .scan_seq(2)
3478 .expect("scan")
3479 .expect("captured response present");
3480 assert_eq!(captured.payload_type.as_str(), "QuotesResponse");
3481 assert_eq!(captured.topic, MessagingSwitchboard::data_response_topic());
3482
3483 let decoded: QuotesResponse =
3484 rmp_serde::from_slice(&captured.payload).expect("decode QuotesResponse");
3485 assert_eq!(decoded.correlation_id, response.correlation_id);
3486 assert_eq!(decoded.client_id, response.client_id);
3487 assert_eq!(decoded.instrument_id, response.instrument_id);
3488 assert_eq!(decoded.ts_init, response.ts_init);
3489 assert!(decoded.data.is_empty());
3490 }
3491}