Skip to main content

nautilus_event_store/
kernel.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Run lifecycle and kernel boot integration for the event store.
17//!
18//! This module owns the kernel side of the SPEC's run lifecycle: it scans the on-disk
19//! instance directory for crashed predecessors before a fresh run opens, seals each
20//! survivor, opens the new run, blocks `start()` until the writer acknowledges the
21//! `RunStarted` entry, and seals the manifest with a final `RunEnded` entry on graceful
22//! stop. The writer's halt callback is wrapped in a typed [`HaltSignal`] that the kernel
23//! caller polls to convert a fail-stop into kernel shutdown rather than a panic.
24
25use std::{
26    any::Any,
27    cell::RefCell,
28    fmt::Debug,
29    path::{Path, PathBuf},
30    rc::Rc,
31    sync::{
32        Arc, Mutex,
33        atomic::{AtomicBool, AtomicU64, Ordering},
34    },
35    thread,
36    time::{Duration, Instant},
37};
38
39use bytes::Bytes;
40use nautilus_common::{
41    cache::{Cache, CacheSnapshotRef},
42    clock::Clock,
43    enums::Environment,
44    msgbus::{self, BusTap, Endpoint, MStr, MessagingSwitchboard},
45};
46#[cfg(feature = "live")]
47use nautilus_core::time::get_atomic_clock_realtime;
48use nautilus_core::{
49    UUID4, UnixNanos,
50    time::{AtomicTime, get_atomic_clock_static},
51};
52use nautilus_execution::engine::SnapshotAnchorer;
53use nautilus_system::{
54    KernelEventStore as KernelEventStoreTrait, RegisteredComponents,
55    event_store::{DataMarkerClass, DataMarkerConfig, EventStoreConfig, RetentionMode},
56};
57use ustr::Ustr;
58
59use crate::{
60    BusCaptureAdapter, CacheReplayError, CacheReplayReport, CaptureError, EncoderRegistry,
61    EntryDraft, EventStore, EventStoreError, EventStoreWriter, HaltCallback, HaltReason, Headers,
62    RedbBackend, RunId, RunManifest, RunStatus, ScanDirection, Topic, WriterConfig,
63    compute_snapshot_content_hash, default_registry,
64    markers::{
65        DataClass, DataMarkerCapture, DataMarkerExtractorRegistry, MarkerBackend, MarkerManifest,
66        MarkerWriter, MarkerWriterConfig, RedbMarkerBackend,
67    },
68    restore_cache_from_sealed_run, validate_event_store_replay_source,
69};
70
// Topic and payload-type tags of the kernel's run-lifecycle entries. The `RunEnded`
// pair is matched against a predecessor's tail entry by `recover_predecessors` to
// recognise a graceful close; the `RunStarted` pair presumably tags the first entry of
// a run (its use site is outside this part of the file).
const RUN_STARTED_TOPIC: &str = "run.lifecycle.RunStarted";
const RUN_STARTED_PAYLOAD_TYPE: &str = "RunStarted";
const RUN_ENDED_TOPIC: &str = "run.lifecycle.RunEnded";
const RUN_ENDED_PAYLOAD_TYPE: &str = "RunEnded";
75
/// The outcome of sealing a single unsealed predecessor.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RecoveredRun {
    /// The id of the sealed predecessor.
    pub run_id: RunId,
    /// The terminal status applied by the sweep: [`RunStatus::CrashedRecovered`],
    /// [`RunStatus::Quarantined`], or [`RunStatus::Ended`] when the predecessor's last
    /// entry was already the `RunEnded` marker and only the manifest seal was missing.
    pub status: RunStatus,
}
85
/// Result of the predecessor recovery sweep performed in the kernel constructor.
///
/// The `Default` value (no recovered runs, no parent) is what the sweep returns when
/// no unsealed predecessor is found.
#[derive(Debug, Default)]
pub struct RecoveryOutcome {
    /// One entry per predecessor that was sealed by the sweep, in sweep order.
    pub recovered: Vec<RecoveredRun>,
    /// The id of the last predecessor the sweep sealed as
    /// [`RunStatus::CrashedRecovered`], or `None` when no predecessor received that
    /// status (none existed, or each was quarantined or found gracefully ended).
    pub parent_run_id: Option<RunId>,
}
96
// Builds the encoder registry for an opened run.
type RegistryFactory = dyn Fn() -> EncoderRegistry + Send + Sync + 'static;
// Result of opening a backend: a boxed, sendable event store or the open error.
type BackendOpenResult = Result<Box<dyn EventStore + Send>, EventStoreError>;
// Opens the backend for a run described by the given config and manifest.
type BackendOpener =
    dyn Fn(&EventStoreConfig, &RunManifest) -> BackendOpenResult + Send + Sync + 'static;
// Builds the data-marker extractor registry for the given data classes.
type MarkerRegistryFactory =
    dyn Fn(&[DataClass]) -> DataMarkerExtractorRegistry + Send + Sync + 'static;
// Single-threaded shared slot holding the marker capture; the slot is emptied when the
// capture is closed.
type SharedMarkerCapture = Rc<RefCell<Option<DataMarkerCapture>>>;
104
/// Non-serialized lifecycle policy for advanced event-store callers.
///
/// [`EventStoreConfig`] remains the serializable run policy. This type carries process-local
/// construction choices, such as the encoder registry and backend opener used when a kernel
/// opens a run.
///
/// Cloning is cheap: each factory is held behind an [`Arc`], so clones share the same
/// closures.
#[derive(Clone)]
pub struct EventStoreLifecycleOptions {
    // Produces the encoder registry for each opened run.
    registry_factory: Arc<RegistryFactory>,
    // Opens the storage backend for each opened run.
    backend_opener: Arc<BackendOpener>,
    // Produces the data-marker extractor registry for the requested data classes.
    marker_registry_factory: Arc<MarkerRegistryFactory>,
}
116
117impl Debug for EventStoreLifecycleOptions {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct(stringify!(EventStoreLifecycleOptions))
120            .finish_non_exhaustive()
121    }
122}
123
124impl Default for EventStoreLifecycleOptions {
125    fn default() -> Self {
126        Self {
127            registry_factory: Arc::new(default_registry),
128            backend_opener: Arc::new(default_backend_opener),
129            marker_registry_factory: Arc::new(DataMarkerExtractorRegistry::default_registry),
130        }
131    }
132}
133
134impl EventStoreLifecycleOptions {
135    /// Creates options that use [`default_registry`] and [`RedbBackend`].
136    #[must_use]
137    pub fn new() -> Self {
138        Self::default()
139    }
140
141    /// Uses a caller-supplied encoder registry factory for each opened run.
142    #[must_use]
143    pub fn with_registry_factory<F>(mut self, factory: F) -> Self
144    where
145        F: Fn() -> EncoderRegistry + Send + Sync + 'static,
146    {
147        self.registry_factory = Arc::new(factory);
148        self
149    }
150
151    /// Uses a caller-supplied encoder registry for each opened run.
152    #[must_use]
153    pub fn with_encoder_registry(self, registry: EncoderRegistry) -> Self {
154        self.with_registry_factory(move || registry.clone())
155    }
156
157    /// Uses a caller-supplied backend opener for each opened run.
158    #[must_use]
159    pub fn with_backend_opener<F>(mut self, opener: F) -> Self
160    where
161        F: Fn(&EventStoreConfig, &RunManifest) -> BackendOpenResult + Send + Sync + 'static,
162    {
163        self.backend_opener = Arc::new(opener);
164        self
165    }
166
167    /// Uses a caller-supplied data-marker extractor registry factory for each opened run.
168    #[must_use]
169    pub fn with_marker_registry_factory<F>(mut self, factory: F) -> Self
170    where
171        F: Fn(&[DataClass]) -> DataMarkerExtractorRegistry + Send + Sync + 'static,
172    {
173        self.marker_registry_factory = Arc::new(factory);
174        self
175    }
176
177    fn build_registry(&self) -> EncoderRegistry {
178        (self.registry_factory)()
179    }
180
181    fn open_backend(&self, config: &EventStoreConfig, manifest: &RunManifest) -> BackendOpenResult {
182        (self.backend_opener)(config, manifest)
183    }
184
185    fn build_marker_registry(&self, classes: &[DataClass]) -> DataMarkerExtractorRegistry {
186        (self.marker_registry_factory)(classes)
187    }
188}
189
190fn default_backend_opener(config: &EventStoreConfig, manifest: &RunManifest) -> BackendOpenResult {
191    let mut backend = RedbBackend::new(config.base_dir.clone());
192    backend.open_run(manifest.clone())?;
193    Ok(Box::new(backend))
194}
195
/// Errors surfaced by the boot path.
///
/// [`EventStoreError`] converts into [`BootError::EventStore`] through `#[from]`, so
/// backend failures propagate with `?`.
#[derive(Debug, thiserror::Error)]
pub enum BootError {
    /// The event store backend rejected an open, scan, or seal during recovery or
    /// new-run creation. Displayed transparently as the wrapped error.
    #[error(transparent)]
    EventStore(#[from] EventStoreError),
    /// The writer rejected the `RunStarted` submit; carries the rejection message.
    #[error("RunStarted submit failed: {0}")]
    RunStartedSubmit(String),
    /// The writer accepted `RunStarted` but did not durably commit it inside the
    /// configured timeout.
    #[error("RunStarted did not durably commit within {timeout:?}")]
    RunStartedTimeout {
        /// The configured ceiling that elapsed before the writer's high-watermark
        /// advanced.
        timeout: Duration,
    },
    /// The writer signaled fail-stop while the boot path was waiting for the
    /// `RunStarted` entry to commit.
    #[error("event store halted during boot: {0:?}")]
    HaltedDuringBoot(HaltReason),
}
219
/// A thread-safe halt signal the kernel registers with the writer.
///
/// The writer thread fires the callback once on the first unrecoverable condition;
/// the kernel polls [`HaltSignal::is_halted`] and converts it into a typed kernel
/// shutdown rather than letting the writer-thread error escape as a panic.
///
/// Clones share the same underlying flag and reason slot.
#[derive(Clone, Debug)]
pub struct HaltSignal {
    // Set once the callback has fired; read lock-free by `is_halted`.
    halted: Arc<AtomicBool>,
    // The reason passed to the first callback invocation, if any.
    reason: Arc<Mutex<Option<HaltReason>>>,
}
230
231impl Default for HaltSignal {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237impl HaltSignal {
238    /// Constructs a fresh, un-fired halt signal.
239    #[must_use]
240    pub fn new() -> Self {
241        Self {
242            halted: Arc::new(AtomicBool::new(false)),
243            reason: Arc::new(Mutex::new(None)),
244        }
245    }
246
247    /// Returns the [`HaltCallback`] the writer fires when an unrecoverable condition
248    /// occurs.
249    ///
250    /// The callback records the [`HaltReason`] (preserving only the first one when
251    /// multiple submits race past the halt threshold) and flips the halted flag.
252    #[must_use]
253    pub fn callback(&self) -> HaltCallback {
254        let halted = Arc::clone(&self.halted);
255        let reason = Arc::clone(&self.reason);
256        Arc::new(move |r| {
257            if halted
258                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
259                .is_ok()
260                && let Ok(mut slot) = reason.lock()
261            {
262                *slot = Some(r);
263            }
264        })
265    }
266
267    /// Returns whether the writer has signaled fail-stop.
268    #[must_use]
269    pub fn is_halted(&self) -> bool {
270        self.halted.load(Ordering::Acquire)
271    }
272
273    /// Returns the [`HaltReason`] recorded on the first fail-stop, if any.
274    ///
275    /// Calling this does not clear the signal; the kernel's halted flag remains set so
276    /// subsequent submits surface as fail-stopped.
277    #[must_use]
278    pub fn reason(&self) -> Option<HaltReason> {
279        self.reason.lock().ok().and_then(|guard| guard.clone())
280    }
281}
282
/// Live event-store session owned by the kernel between `start()` and `finalize_stop()`.
pub struct EventStoreSession {
    // Shared handle to the run's writer; `None` once `close` has consumed it.
    writer: Option<Arc<EventStoreWriter>>,
    // Bus capture adapter wired into this run, when present; released at the start
    // of `close`.
    adapter: Option<Arc<BusCaptureAdapter>>,
    // Shared data-marker capture slot, when present.
    marker_capture: Option<SharedMarkerCapture>,
    // Manifest of the open run, exposed read-only through `manifest()`.
    manifest: RunManifest,
    // Halt signal consulted by `is_halted`.
    halt_signal: HaltSignal,
}
291
292impl Debug for EventStoreSession {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        f.debug_struct(stringify!(EventStoreSession))
295            .field("run_id", &self.manifest.run_id)
296            .field("parent_run_id", &self.manifest.parent_run_id)
297            .field("instance_id", &self.manifest.instance_id)
298            .field("halted", &self.halt_signal.is_halted())
299            .field("writer_attached", &self.writer.is_some())
300            .field("marker_capture_attached", &self.marker_capture.is_some())
301            .finish_non_exhaustive()
302    }
303}
304
305impl EventStoreSession {
306    /// Returns the captured manifest as it was written to disk at run start.
307    ///
308    /// The high-watermark and `end_ts_init` advance after seal; the snapshot here is
309    /// frozen at boot time.
310    #[must_use]
311    pub const fn manifest(&self) -> &RunManifest {
312        &self.manifest
313    }
314
315    /// Returns the run id of the currently open run.
316    #[must_use]
317    pub fn run_id(&self) -> &str {
318        self.manifest.run_id.as_str()
319    }
320
321    /// Returns the parent run id for the current run.
322    #[must_use]
323    pub fn parent_run_id(&self) -> Option<&str> {
324        self.manifest.parent_run_id.as_deref()
325    }
326
327    /// Returns whether the writer has fail-stopped.
328    #[must_use]
329    pub fn is_halted(&self) -> bool {
330        self.halt_signal.is_halted()
331    }
332
333    /// Returns the writer's current durable high-watermark.
334    ///
335    /// Returns `0` when the writer has been consumed by a prior `close`.
336    #[must_use]
337    pub fn high_watermark(&self) -> u64 {
338        self.writer.as_ref().map_or(0, |w| w.high_watermark())
339    }
340
341    /// Returns a snapshot anchorer bound to the open writer.
342    ///
343    /// The execution engine installs this callback while the run is open. The callback
344    /// records the cache-owned snapshot reference against the writer's durable
345    /// high-watermark after flushing earlier captured entries.
346    #[must_use]
347    pub fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
348        let writer = Arc::clone(self.writer.as_ref()?);
349
350        Some(Rc::new(move |snapshot_ref: CacheSnapshotRef| {
351            let content_hash = compute_snapshot_content_hash(snapshot_ref.blob.as_ref());
352            writer
353                .record_snapshot_anchor(snapshot_ref.blob_ref, content_hash)
354                .map(|_| ())
355                .map_err(|e| anyhow::anyhow!("record snapshot anchor: {e}"))
356        }))
357    }
358
359    /// Returns the live bus capture adapter, when one was wired into this run.
360    ///
361    /// `None` after [`Self::close`] consumes the writer.
362    #[must_use]
363    pub fn adapter(&self) -> Option<&Arc<BusCaptureAdapter>> {
364        self.adapter.as_ref()
365    }
366
367    /// Submits the terminal `RunEnded` entry, drains pending entries, and seals the
368    /// manifest as [`RunStatus::Ended`].
369    ///
370    /// Consumes the inner writer; subsequent calls return without effect.
371    ///
372    /// # Errors
373    ///
374    /// Returns [`EventStoreError`] if the writer fails to commit the final batch, the
375    /// seal step fails, or the writer Arc has outstanding clones (the bus tap must be
376    /// cleared before close to release the adapter's writer reference).
377    pub fn close(&mut self, ts_init: UnixNanos) -> Result<(), EventStoreError> {
378        // Drop the adapter first so the writer Arc has no other strong owners on
379        // try_unwrap. The kernel clears the bus tap before this site, so dropping the
380        // session-side adapter clone here is the last release before close.
381        self.adapter = None;
382        let marker_capture = self.marker_capture.take();
383
384        let Some(writer_arc) = self.writer.take() else {
385            close_marker_capture(marker_capture);
386            return Ok(());
387        };
388        let Ok(writer) = Arc::try_unwrap(writer_arc) else {
389            close_marker_capture(marker_capture);
390            return Err(EventStoreError::Backend(
391                "event store writer has multiple owners; clear the bus tap before close"
392                    .to_string(),
393            ));
394        };
395
396        let run_ended = run_ended_draft(ts_init);
397        let result = writer.close(run_ended);
398        close_marker_capture(marker_capture);
399        result?;
400        Ok(())
401    }
402}
403
404impl Drop for EventStoreSession {
405    fn drop(&mut self) {
406        // Drop without close: release adapter then writer so the writer thread exits
407        // unsealed; the next boot recovers.
408        self.adapter.take();
409        self.marker_capture.take();
410        self.writer.take();
411    }
412}
413
414fn close_marker_capture(marker_capture: Option<SharedMarkerCapture>) {
415    if let Some(marker_capture) = marker_capture
416        && let Some(capture) = marker_capture.borrow_mut().take()
417    {
418        capture.close();
419    }
420}
421
/// Typed error surfaced when the event store fails the run lifecycle.
///
/// Wraps the boot-time and shutdown-time failure modes so a kernel caller can react to a
/// fail-stop without inspecting individual writer/backend errors.
///
/// [`BootError`] and [`CacheReplayError`] convert into this type through `#[from]`, so
/// both propagate with `?`.
#[derive(Debug, thiserror::Error)]
pub enum KernelError {
    /// The event-store boot path failed.
    #[error("event store boot failed: {0}")]
    EventStoreBoot(#[from] BootError),
    /// Cache state reconstruction from a recovered event-store run failed.
    #[error("event store cache replay failed: {0}")]
    CacheReplay(#[from] CacheReplayError),
    /// The writer signaled fail-stop after the kernel was already started; carries the
    /// recorded halt reason.
    #[error("event store halted: {0:?}")]
    EventStoreHalted(HaltReason),
}
438
/// Kernel-facing wrapper that bundles every event-store concern: predecessor recovery,
/// the open run, the halt signal, and the seal-on-drop fail-safe.
///
/// One instance is typically owned by [`nautilus_system::NautilusKernel`] via the
/// [`KernelEventStoreTrait`] seam: the kernel calls [`EventStoreLifecycle::open`] from
/// `start()`, [`EventStoreLifecycle::seal`] from `finalize_stop()` / `dispose()`, and
/// the wrapper's [`Drop`] runs as the last-chance seal site for callers that skip both
/// teardown paths (e.g. imperative `engine.run(...)` followed by drop in
/// `BacktestEngine`).
#[derive(Debug)]
pub struct EventStoreLifecycle {
    // Serializable run policy; `None` makes the wrapper inert.
    config: Option<EventStoreConfig>,
    // Process-local construction choices (registry factories, backend opener).
    options: EventStoreLifecycleOptions,
    // Predecessors sealed by the boot-time recovery sweep.
    recovered: Vec<RecoveredRun>,
    // Parent run id reported by the recovery sweep, if any.
    parent_run_id: Option<String>,
    // The currently open session; taken by `seal`.
    session: Option<EventStoreSession>,
    // Halt signal created at boot; a clone is handed to every session opened by this
    // wrapper.
    halt: HaltSignal,
    // Held so `Drop` can stamp the seal even when the kernel never called seal()
    // explicitly. Cloning the kernel's clock Rc keeps the wrapper independent of
    // its owner.
    clock: Rc<RefCell<dyn Clock>>,
}
461
462impl EventStoreLifecycle {
463    /// Boots the wrapper at kernel construction time.
464    ///
465    /// Runs the predecessor recovery sweep against `<base_dir>/<instance_id>/`. When
466    /// `config` is `None` the wrapper is inert: every method becomes a no-op.
467    ///
468    /// # Errors
469    ///
470    /// Returns the underlying [`EventStoreError`] when the recovery sweep fails for a
471    /// reason other than the expected `CrashedPredecessor` handshake.
472    pub fn boot(
473        config: Option<EventStoreConfig>,
474        instance_id: UUID4,
475        clock: Rc<RefCell<dyn Clock>>,
476    ) -> anyhow::Result<Self> {
477        Self::boot_with_options(
478            config,
479            instance_id,
480            clock,
481            EventStoreLifecycleOptions::default(),
482        )
483    }
484
485    /// Boots the wrapper at kernel construction time with process-local lifecycle options.
486    ///
487    /// `EventStoreConfig` remains serializable. `options` carries runtime-only construction
488    /// policy for the encoder registry and backend opener.
489    ///
490    /// # Errors
491    ///
492    /// Returns the underlying [`EventStoreError`] when the recovery sweep fails for a
493    /// reason other than the expected `CrashedPredecessor` handshake.
494    pub fn boot_with_options(
495        config: Option<EventStoreConfig>,
496        instance_id: UUID4,
497        clock: Rc<RefCell<dyn Clock>>,
498        options: EventStoreLifecycleOptions,
499    ) -> anyhow::Result<Self> {
500        let (recovered, parent_run_id) = if let Some(cfg) = config.as_ref() {
501            let outcome = recover_predecessors(&cfg.base_dir, &instance_id.to_string())?;
502            if !outcome.recovered.is_empty() {
503                log::info!(
504                    "Sealed {} crashed event-store predecessor(s); parent_run_id={:?}",
505                    outcome.recovered.len(),
506                    outcome.parent_run_id,
507                );
508            }
509            (outcome.recovered, outcome.parent_run_id)
510        } else {
511            (Vec::new(), None)
512        };
513        Ok(Self {
514            config,
515            options,
516            recovered,
517            parent_run_id,
518            session: None,
519            halt: HaltSignal::new(),
520            clock,
521        })
522    }
523
524    /// Opens a fresh run on kernel `start()`. Idempotent against reset/rerun: a
525    /// leftover session from a prior `start()` is sealed before a new one opens, so
526    /// `RunStarted` remains the first entry of every run.
527    ///
528    /// `components` is the manifest captured into the `RunStarted` payload. `environment`
529    /// selects the static (backtest) or realtime (live) clock used to stamp `ts_publish`
530    /// inside the writer.
531    ///
532    /// Returns without effect when no event-store config was supplied.
533    ///
534    /// # Errors
535    ///
536    /// Returns [`KernelError::EventStoreBoot`] when opening the new run, spawning the
537    /// writer, or blocking on the `RunStarted` ack fails.
538    pub fn open(
539        &mut self,
540        instance_id: UUID4,
541        components: &RegisteredComponents,
542        environment: Environment,
543    ) -> Result<(), KernelError> {
544        let Some(config) = self.config.clone() else {
545            return Ok(());
546        };
547
548        if self.session.is_some() {
549            // Reset/rerun (BacktestEngine::run -> reset -> run) reuses the kernel
550            // across runs. Seal the leftover session before opening a fresh one.
551            let ts = self.clock.borrow().timestamp_ns();
552            self.seal(ts);
553        }
554
555        let clock = Self::clock_for(environment);
556        let start_ts_init = self.clock.borrow().timestamp_ns();
557        let run_id = build_run_id(start_ts_init);
558        let parent_run_id = if let Some(replay_run_id) = config.replay_from_run_id.as_deref() {
559            validate_event_store_replay_source(
560                config.base_dir.clone(),
561                &instance_id.to_string(),
562                replay_run_id,
563            )?;
564            Some(replay_run_id.to_string())
565        } else {
566            self.parent_run_id.clone()
567        };
568        let session = open_run_with_options(
569            &config,
570            &instance_id.to_string(),
571            run_id,
572            parent_run_id,
573            start_ts_init,
574            components,
575            self.halt.clone(),
576            clock,
577            &self.options,
578        )?;
579        log::info!(
580            "Opened event-store run {} (parent_run_id={:?})",
581            session.run_id(),
582            session.parent_run_id(),
583        );
584
585        if let Some(adapter) = session.adapter() {
586            install_bus_tap(Arc::clone(adapter), session.marker_capture.clone(), clock);
587        }
588        self.session = Some(session);
589        Ok(())
590    }
591
592    /// Restores cache state from the configured replay run or recovered parent run.
593    ///
594    /// This is a bootstrap-only reconstruction path. It opens the sealed replay source
595    /// for read-only replay, restores the cache-owned snapshot blob, then replays only
596    /// the entries after the snapshot anchor directly into [`Cache`].
597    ///
598    /// # Errors
599    ///
600    /// Returns [`KernelError::CacheReplay`] when the source reader, snapshot restore, decode,
601    /// or cache apply step fails.
602    pub fn restore_parent_cache(
603        &self,
604        instance_id: UUID4,
605        cache: &mut Cache,
606    ) -> Result<Option<CacheReplayReport>, KernelError> {
607        let Some(config) = self.config.as_ref() else {
608            return Ok(None);
609        };
610        let replay_run_id = config
611            .replay_from_run_id
612            .as_deref()
613            .or(self.parent_run_id.as_deref());
614        let Some(replay_run_id) = replay_run_id else {
615            return Ok(None);
616        };
617        let source = if config.replay_from_run_id.is_some() {
618            "configured replay run"
619        } else {
620            "parent run"
621        };
622
623        let report = restore_cache_from_sealed_run(
624            cache,
625            config.base_dir.clone(),
626            &instance_id.to_string(),
627            replay_run_id,
628        )?;
629
630        log::info!(
631            "Restored cache from event-store {source} {replay_run_id}: from_seq={}, to_seq={}, applied={}, ignored={}",
632            report.cache.plan.from_seq,
633            report.cache.plan.to_seq,
634            report.cache.applied_entries,
635            report.cache.ignored_entries,
636        );
637
638        Ok(Some(report.cache))
639    }
640
641    /// Seals the open session by writing `RunEnded` and updating the manifest to
642    /// `Ended`. Idempotent: a closed or absent session makes this a no-op. Halted
643    /// sessions skip the close (the recovery sweep on next boot owns the seal).
644    pub fn seal(&mut self, ts_init: UnixNanos) {
645        let Some(mut session) = self.session.take() else {
646            return;
647        };
648
649        // Drop the bus tap before close so the adapter's writer Arc is released; the
650        // close path then takes sole ownership of the writer and commits RunEnded.
651        msgbus::clear_bus_tap();
652
653        if session.is_halted() {
654            log::warn!(
655                "Event-store writer fail-stopped before close; run {} sealed by recovery sweep on next boot",
656                session.run_id(),
657            );
658            return;
659        }
660        let run_id = session.run_id().to_string();
661        if let Err(e) = session.close(ts_init) {
662            log::error!(
663                "Failed to seal event-store run {run_id} on graceful stop: {e}; run will be sealed as CrashedRecovered on next boot",
664            );
665        } else {
666            log::info!("Sealed event-store run {run_id}");
667        }
668    }
669
670    /// Returns the recovery report from the boot sweep.
671    #[must_use]
672    pub fn recovered(&self) -> &[RecoveredRun] {
673        &self.recovered
674    }
675
676    /// Returns the configured replay source or recovered parent run id, when present.
677    #[must_use]
678    pub fn parent_run_id(&self) -> Option<&str> {
679        self.config
680            .as_ref()
681            .and_then(|config| config.replay_from_run_id.as_deref())
682            .or(self.parent_run_id.as_deref())
683    }
684
685    /// Returns whether this lifecycle is configured for event-store-only replay.
686    #[must_use]
687    pub fn is_event_store_replay_configured(&self) -> bool {
688        self.config
689            .as_ref()
690            .is_some_and(|config| config.replay_from_run_id.is_some())
691    }
692
693    /// Returns the run id of the open session, when capture is active.
694    #[must_use]
695    pub fn run_id(&self) -> Option<&str> {
696        self.session.as_ref().map(EventStoreSession::run_id)
697    }
698
699    /// Returns a snapshot anchorer for the open run, when capture is active.
700    #[must_use]
701    pub fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
702        self.session
703            .as_ref()
704            .and_then(EventStoreSession::snapshot_anchorer)
705    }
706
707    /// Returns whether the writer has signaled fail-stop.
708    #[must_use]
709    pub fn is_halted(&self) -> bool {
710        self.halt.is_halted()
711    }
712
713    /// Returns the [`HaltReason`] recorded on the first fail-stop, if any.
714    #[must_use]
715    pub fn halt_reason(&self) -> Option<HaltReason> {
716        self.halt.reason()
717    }
718
719    /// Surfaces the current halt as a typed [`KernelError`], or `None` when the
720    /// writer has not halted.
721    #[must_use]
722    pub fn check_halt(&self) -> Option<KernelError> {
723        self.halt_reason().map(KernelError::EventStoreHalted)
724    }
725
726    #[cfg(feature = "live")]
727    fn clock_for(environment: Environment) -> &'static AtomicTime {
728        match environment {
729            Environment::Backtest => get_atomic_clock_static(),
730            Environment::Live | Environment::Sandbox => get_atomic_clock_realtime(),
731        }
732    }
733
734    #[cfg(not(feature = "live"))]
735    fn clock_for(_environment: Environment) -> &'static AtomicTime {
736        get_atomic_clock_static()
737    }
738}
739
740impl Drop for EventStoreLifecycle {
741    fn drop(&mut self) {
742        // Last-chance seal: callers may skip both finalize_stop() and dispose().
743        if self.session.is_none() {
744            return;
745        }
746        let ts = self
747            .clock
748            .try_borrow()
749            .map(|c| c.timestamp_ns())
750            .unwrap_or_default();
751        self.seal(ts);
752    }
753}
754
755/// Sweeps `<base_dir>/<instance_id>/` for crashed predecessor runs and seals each one.
756///
757/// A predecessor is a run file whose manifest still reads [`RunStatus::Running`]: the
758/// previous trader exited (cleanly via drop, or crashed) without sealing. The sweep
759/// scans every entry in the run, validating hashes; on success the manifest seals as
760/// [`RunStatus::CrashedRecovered`], otherwise as [`RunStatus::Quarantined`]. The
761/// most-recently-crashed survivor's `run_id` is returned so the new run records it as
762/// `parent_run_id`.
763///
764/// Quarantined runs do not become parents: a future replay must skip the corrupted
765/// tail rather than chain through it.
766///
767/// # Errors
768///
769/// Returns [`EventStoreError`] when the directory enumeration, open, or seal fails for
770/// reasons other than the expected [`EventStoreError::CrashedPredecessor`] handshake
771/// the backend uses to surface unsealed runs.
772pub fn recover_predecessors(
773    base_dir: &Path,
774    instance_id: &str,
775) -> Result<RecoveryOutcome, EventStoreError> {
776    let manifests = RedbBackend::list_runs(base_dir, instance_id)?;
777    let crashed: Vec<RunManifest> = manifests
778        .into_iter()
779        .filter(|m| matches!(m.status, RunStatus::Running))
780        .collect();
781
782    let mut outcome = RecoveryOutcome::default();
783
784    for predecessor in crashed {
785        let run_id = predecessor.run_id.clone();
786        let mut backend = RedbBackend::new(base_dir.to_path_buf());
787
788        match backend.open_run(predecessor) {
789            Err(EventStoreError::CrashedPredecessor) => {}
790            Ok(()) => {
791                return Err(EventStoreError::Backend(format!(
792                    "expected CrashedPredecessor reopening {run_id}, was Ok",
793                )));
794            }
795            Err(other) => return Err(other),
796        }
797
798        let high_watermark = backend.high_watermark()?;
799        let final_status = if high_watermark == 0 {
800            RunStatus::CrashedRecovered
801        } else {
802            match backend.scan_range(1, high_watermark, ScanDirection::Forward) {
803                Ok(entries) => {
804                    // The writer commits RunEnded before seal; a crash between those
805                    // two steps leaves a graceful tail without a sealed manifest.
806                    // Honor the tail: if the last entry is the kernel's RunEnded
807                    // marker, the predecessor closed cleanly and is not a crash to
808                    // chain through. Match both topic and payload_type so a future
809                    // capture-registry entry that happens to share the payload tag
810                    // cannot be misclassified as a graceful close.
811                    let tail_is_run_ended = entries.last().is_some_and(|e| {
812                        e.topic.as_ref() == RUN_ENDED_TOPIC
813                            && e.payload_type.as_str() == RUN_ENDED_PAYLOAD_TYPE
814                    });
815
816                    if tail_is_run_ended {
817                        RunStatus::Ended
818                    } else {
819                        RunStatus::CrashedRecovered
820                    }
821                }
822                Err(
823                    EventStoreError::HashMismatch { .. }
824                    | EventStoreError::Corrupted(_)
825                    | EventStoreError::Gap { .. },
826                ) => RunStatus::Quarantined,
827                Err(other) => return Err(other),
828            }
829        };
830
831        backend.seal(final_status)?;
832        outcome.recovered.push(RecoveredRun {
833            run_id: run_id.clone(),
834            status: final_status,
835        });
836
837        if matches!(final_status, RunStatus::CrashedRecovered) {
838            outcome.parent_run_id = Some(run_id);
839        }
840    }
841
842    Ok(outcome)
843}
844
845/// Builds the `<start_ts_init>-<short_uuid>` run id used as the manifest key and on-disk
846/// file name.
847///
848/// The id is sortable by start time so directory listings produce chronological order;
849/// the short uuid suffix keeps it unique even when two kernels start at the same
850/// nanosecond on different machines.
851#[must_use]
852pub fn build_run_id(start_ts_init: UnixNanos) -> RunId {
853    let suffix: String = UUID4::new().to_string().chars().take(8).collect();
854    format!("{}-{suffix}", u64::from(start_ts_init))
855}
856
857/// Opens a fresh run, spawns the writer, and submits a blocking `RunStarted` entry.
858///
859/// The kernel calls this from `start()` after components have registered with the
860/// trader so the captured `RunStarted` payload reflects the actual boot configuration.
861/// The function blocks until the writer's high-watermark advances past zero (i.e. the
862/// `RunStarted` entry has durably committed) or until [`EventStoreConfig::run_started_timeout`]
863/// elapses.
864///
865/// `feature_flags` is appended after the configured `feature_flags` so the retention
866/// mode survives in the manifest as `retention=<mode>`.
867///
868/// # Errors
869///
870/// Returns [`BootError::EventStore`] when the backend rejects open, [`BootError::RunStartedSubmit`]
871/// when the writer rejects the submit, [`BootError::RunStartedTimeout`] when the
872/// commit does not happen inside the configured ceiling, and [`BootError::HaltedDuringBoot`]
873/// when the writer fail-stops while waiting for the commit.
874#[allow(clippy::too_many_arguments)]
875pub fn open_run(
876    config: &EventStoreConfig,
877    instance_id: &str,
878    run_id: RunId,
879    parent_run_id: Option<RunId>,
880    start_ts_init: UnixNanos,
881    components: &RegisteredComponents,
882    halt_signal: HaltSignal,
883    clock: &'static AtomicTime,
884) -> Result<EventStoreSession, BootError> {
885    open_run_with_options(
886        config,
887        instance_id,
888        run_id,
889        parent_run_id,
890        start_ts_init,
891        components,
892        halt_signal,
893        clock,
894        &EventStoreLifecycleOptions::default(),
895    )
896}
897
898/// Opens a fresh run with process-local lifecycle options.
899///
900/// This follows [`open_run`] but obtains the backend and encoder registry from
901/// `options`.
902///
903/// # Errors
904///
905/// Returns [`BootError::EventStore`] when the backend rejects open, [`BootError::RunStartedSubmit`]
906/// when the writer rejects the submit, [`BootError::RunStartedTimeout`] when the
907/// commit does not happen inside the configured ceiling, and [`BootError::HaltedDuringBoot`]
908/// when the writer fail-stops while waiting for the commit.
909#[allow(clippy::too_many_arguments)]
910pub fn open_run_with_options(
911    config: &EventStoreConfig,
912    instance_id: &str,
913    run_id: RunId,
914    parent_run_id: Option<RunId>,
915    start_ts_init: UnixNanos,
916    components: &RegisteredComponents,
917    halt_signal: HaltSignal,
918    clock: &'static AtomicTime,
919    options: &EventStoreLifecycleOptions,
920) -> Result<EventStoreSession, BootError> {
921    let manifest = build_manifest(
922        config,
923        instance_id,
924        run_id,
925        parent_run_id,
926        start_ts_init,
927        components.clone(),
928    );
929
930    let backend = options.open_backend(config, &manifest)?;
931
932    let writer = Arc::new(EventStoreWriter::spawn(
933        backend,
934        clock,
935        halt_signal.callback(),
936        writer_config_from(config),
937    )?);
938
939    submit_run_started_blocking(
940        &writer,
941        components,
942        start_ts_init,
943        &halt_signal,
944        config.run_started_timeout,
945    )?;
946
947    let (marker_capture, submit_counter) =
948        build_marker_capture(config, &manifest, writer.high_watermark(), clock, options);
949    let mut adapter = BusCaptureAdapter::new(
950        Arc::clone(&writer),
951        Arc::new(options.build_registry()),
952        halt_signal.callback(),
953    );
954
955    if let Some(submit_counter) = submit_counter {
956        adapter = adapter.with_submit_counter(submit_counter);
957    }
958    let adapter = Arc::new(adapter);
959
960    Ok(EventStoreSession {
961        writer: Some(writer),
962        adapter: Some(adapter),
963        marker_capture,
964        manifest,
965        halt_signal,
966    })
967}
968
969fn build_marker_capture(
970    config: &EventStoreConfig,
971    manifest: &RunManifest,
972    initial_submit_counter: u64,
973    clock: &'static AtomicTime,
974    options: &EventStoreLifecycleOptions,
975) -> (Option<SharedMarkerCapture>, Option<Arc<AtomicU64>>) {
976    let Some(marker_config) = config.data_markers.as_ref() else {
977        return (None, None);
978    };
979
980    match open_marker_capture(
981        config,
982        manifest,
983        marker_config,
984        initial_submit_counter,
985        clock,
986        options,
987    ) {
988        Ok((capture, submit_counter)) => (
989            Some(Rc::new(RefCell::new(Some(capture)))),
990            Some(submit_counter),
991        ),
992        Err(e) => {
993            log::warn!(
994                "Data marker sidecar disabled for run {} after marker setup failed: {e}",
995                manifest.run_id,
996            );
997            (None, None)
998        }
999    }
1000}
1001
1002fn open_marker_capture(
1003    config: &EventStoreConfig,
1004    manifest: &RunManifest,
1005    marker_config: &DataMarkerConfig,
1006    initial_submit_counter: u64,
1007    clock: &'static AtomicTime,
1008    options: &EventStoreLifecycleOptions,
1009) -> Result<(DataMarkerCapture, Arc<AtomicU64>), EventStoreError> {
1010    let classes = marker_config
1011        .classes
1012        .iter()
1013        .copied()
1014        .map(data_marker_class_to_data_class)
1015        .collect::<Vec<_>>();
1016    let marker_manifest = marker_manifest_for(manifest, classes.clone(), marker_config);
1017    let marker_path = marker_file_path(config, &manifest.instance_id, &manifest.run_id);
1018    let mut marker_backend = RedbMarkerBackend::new(marker_path);
1019    marker_backend.open_run(marker_manifest)?;
1020    let writer = MarkerWriter::spawn(
1021        Box::new(marker_backend),
1022        clock,
1023        MarkerWriterConfig {
1024            channel_capacity: marker_config.channel_capacity,
1025            ..MarkerWriterConfig::default()
1026        },
1027    )?;
1028    let submit_counter = Arc::new(AtomicU64::new(initial_submit_counter));
1029    let registry = options.build_marker_registry(&classes);
1030    let capture =
1031        DataMarkerCapture::new(registry, writer, Arc::clone(&submit_counter), marker_config);
1032
1033    Ok((capture, submit_counter))
1034}
1035
1036fn marker_file_path(config: &EventStoreConfig, instance_id: &str, run_id: &str) -> PathBuf {
1037    config
1038        .base_dir
1039        .join(instance_id)
1040        .join(format!("{run_id}.markers.redb"))
1041}
1042
1043fn marker_manifest_for(
1044    manifest: &RunManifest,
1045    enabled_classes: Vec<DataClass>,
1046    config: &DataMarkerConfig,
1047) -> MarkerManifest {
1048    MarkerManifest {
1049        run_id: manifest.run_id.clone(),
1050        enabled_classes,
1051        high_fidelity: !config.high_fidelity.is_empty(),
1052        snapshot_count: 0,
1053        hifi_count: 0,
1054        gap_count: 0,
1055        dict_count: 0,
1056        status: RunStatus::Running,
1057    }
1058}
1059
1060const fn data_marker_class_to_data_class(class: DataMarkerClass) -> DataClass {
1061    match class {
1062        DataMarkerClass::BookDeltas => DataClass::BookDeltas,
1063        DataMarkerClass::BookDepth10 => DataClass::BookDepth10,
1064        DataMarkerClass::Quote => DataClass::Quote,
1065        DataMarkerClass::Trade => DataClass::Trade,
1066        DataMarkerClass::Bar => DataClass::Bar,
1067    }
1068}
1069
1070fn build_manifest(
1071    config: &EventStoreConfig,
1072    instance_id: &str,
1073    run_id: RunId,
1074    parent_run_id: Option<RunId>,
1075    start_ts_init: UnixNanos,
1076    components: RegisteredComponents,
1077) -> RunManifest {
1078    let mut feature_flags = config.identity.feature_flags.clone();
1079    feature_flags.push(format!("retention={}", retention_tag(config.retention)));
1080
1081    RunManifest {
1082        run_id,
1083        parent_run_id,
1084        instance_id: instance_id.to_string(),
1085        binary_hash: config.identity.binary_hash.clone(),
1086        schema_version: config.identity.schema_version,
1087        crate_versions: config.identity.crate_versions.clone(),
1088        feature_flags,
1089        adapter_versions: config.identity.adapter_versions.clone(),
1090        config_hash: config.identity.config_hash.clone(),
1091        registered_components: components,
1092        seed: config.identity.seed,
1093        start_ts_init,
1094        end_ts_init: None,
1095        high_watermark: 0,
1096        status: RunStatus::Running,
1097    }
1098}
1099
1100const fn retention_tag(mode: RetentionMode) -> &'static str {
1101    match mode {
1102        RetentionMode::Full => "full",
1103        RetentionMode::Bounded { .. } => "bounded",
1104        RetentionMode::SnapshotAnchored => "snapshot",
1105    }
1106}
1107
1108fn writer_config_from(config: &EventStoreConfig) -> WriterConfig {
1109    WriterConfig {
1110        channel_capacity: config.channel_capacity,
1111        max_batch_entries: config.max_batch_entries,
1112        max_batch_latency: config.max_batch_latency,
1113        halt_threshold: config.halt_threshold,
1114    }
1115}
1116
1117/// Submits the `RunStarted` draft and blocks until the writer durably acknowledges it,
1118/// the writer fail-stops, or `timeout` elapses.
1119///
1120/// Exposed at `pub(crate)` so tests can drive it against a stub backend without going
1121/// through [`open_run`].
1122///
1123/// # Errors
1124///
1125/// Returns [`BootError::RunStartedSubmit`] when the writer rejects the submit,
1126/// [`BootError::HaltedDuringBoot`] when the writer fail-stops during the wait, and
1127/// [`BootError::RunStartedTimeout`] when the writer does not commit within `timeout`.
1128pub(crate) fn submit_run_started_blocking(
1129    writer: &EventStoreWriter,
1130    components: &RegisteredComponents,
1131    ts_init: UnixNanos,
1132    halt_signal: &HaltSignal,
1133    timeout: Duration,
1134) -> Result<(), BootError> {
1135    let payload = encode_run_started(components);
1136    let draft = EntryDraft::without_indices(
1137        Headers::empty(),
1138        Topic::from(RUN_STARTED_TOPIC),
1139        Ustr::from(RUN_STARTED_PAYLOAD_TYPE),
1140        payload,
1141        ts_init,
1142    );
1143
1144    writer
1145        .submit(draft)
1146        .map_err(|e| BootError::RunStartedSubmit(e.to_string()))?;
1147
1148    // Wall-clock timeout against the writer thread: the writer drives the seam,
1149    // not the kernel state machine, so monotonic Instant timing is correct here.
1150    let start = Instant::now(); // dst-ok
1151
1152    while writer.high_watermark() == 0 {
1153        if halt_signal.is_halted() {
1154            return Err(BootError::HaltedDuringBoot(
1155                halt_signal.reason().unwrap_or_else(|| {
1156                    HaltReason::BackendError("event store halted during boot".to_string())
1157                }),
1158            ));
1159        }
1160
1161        let elapsed = start.elapsed();
1162
1163        if elapsed >= timeout {
1164            return Err(BootError::RunStartedTimeout { timeout });
1165        }
1166        thread::sleep(Duration::from_millis(1));
1167    }
1168
1169    Ok(())
1170}
1171
1172fn encode_run_started(components: &RegisteredComponents) -> Bytes {
1173    // bincode keeps the payload compact and matches the manifest encoding the backend
1174    // already uses; replay's RunStarted decoder pairs with this representation.
1175    let bytes = bincode::serde::encode_to_vec(components, bincode::config::standard())
1176        .expect("RegisteredComponents serializes via serde, must not fail under standard config");
1177    Bytes::from(bytes)
1178}
1179
1180fn run_ended_draft(ts_init: UnixNanos) -> EntryDraft {
1181    EntryDraft::without_indices(
1182        Headers::empty(),
1183        Topic::from(RUN_ENDED_TOPIC),
1184        Ustr::from(RUN_ENDED_PAYLOAD_TYPE),
1185        Bytes::new(),
1186        ts_init,
1187    )
1188}
1189
1190/// Bus tap that forwards captured publish and send dispatches to the event store.
1191///
1192/// Built and registered by [`EventStoreLifecycle::open`]; cleared by
1193/// [`EventStoreLifecycle::seal`] and the wrapper's [`Drop`]. The tap reads `ts_init` from
1194/// the kernel's `AtomicTime` at capture time so non-Phase-A headers carry a
1195/// writer-receive timestamp.
1196struct EventStoreBusTap {
1197    adapter: Arc<BusCaptureAdapter>,
1198    marker_capture: Option<SharedMarkerCapture>,
1199    clock: &'static AtomicTime,
1200}
1201
1202impl Debug for EventStoreBusTap {
1203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1204        f.debug_struct(stringify!(EventStoreBusTap))
1205            .field("halted", &self.adapter.is_halted())
1206            .field("marker_capture_attached", &self.marker_capture.is_some())
1207            .finish_non_exhaustive()
1208    }
1209}
1210
1211impl BusTap for EventStoreBusTap {
1212    fn on_publish(&self, topic: Topic, message: &dyn Any) {
1213        let ts_init = self.clock.get_time_ns();
1214        self.capture(topic, message, ts_init);
1215    }
1216
1217    fn on_send(&self, endpoint: MStr<Endpoint>, message: &dyn Any) {
1218        let ts_init = self.clock.get_time_ns();
1219        // Reuse the endpoint string as the captured topic. The MStr markers differ but
1220        // the underlying interned string is the same; offline scans match either way.
1221        let topic = Topic::from(*endpoint);
1222        self.capture(topic, message, ts_init);
1223    }
1224
1225    fn on_response(&self, _correlation_id: &UUID4, message: &dyn Any) {
1226        let ts_init = self.clock.get_time_ns();
1227        let topic = MessagingSwitchboard::data_response_topic();
1228        self.capture(topic, message, ts_init);
1229    }
1230}
1231
1232impl EventStoreBusTap {
1233    fn capture(&self, topic: Topic, message: &dyn Any, ts_init: UnixNanos) {
1234        // The registry both gates capture (no encoder -> no entry) and supplies headers
1235        // for entries that do flow through. Looking the headers up here keeps the
1236        // adapter encoder-only and lets header propagation light up per-type as the
1237        // SPEC's workstream A lands fields on commands and events.
1238        let headers = self
1239            .adapter
1240            .registry()
1241            .headers_for_any(message)
1242            .unwrap_or_else(Headers::empty);
1243        // Submit failures fire the adapter halt callback before returning; HaltSignal
1244        // is the observation path. Halted means the signal already fired.
1245        match self.adapter.capture_any(topic, message, headers, ts_init) {
1246            Ok(captured) => {
1247                self.capture_marker(topic, message, ts_init, captured);
1248            }
1249            Err(CaptureError::Halted) => {}
1250            Err(CaptureError::Submit(e)) => {
1251                log::error!("Event store capture submit failed on {topic}: {e}");
1252            }
1253            Err(CaptureError::Encode(e)) => {
1254                log::warn!("Event store encoder rejected message on {topic}: {e}");
1255            }
1256        }
1257    }
1258
1259    fn capture_marker(&self, topic: Topic, message: &dyn Any, ts_init: UnixNanos, captured: bool) {
1260        let Some(marker_capture) = self.marker_capture.as_ref() else {
1261            return;
1262        };
1263        let mut marker_capture = marker_capture.borrow_mut();
1264        let Some(capture) = marker_capture.as_mut() else {
1265            return;
1266        };
1267
1268        if captured {
1269            capture.on_entry_submitted(ts_init);
1270        } else {
1271            capture.observe_publish(topic, message, ts_init);
1272        }
1273        capture.maybe_safety_flush(ts_init);
1274    }
1275}
1276
1277fn install_bus_tap(
1278    adapter: Arc<BusCaptureAdapter>,
1279    marker_capture: Option<SharedMarkerCapture>,
1280    clock: &'static AtomicTime,
1281) {
1282    let tap: Rc<dyn BusTap> = Rc::new(EventStoreBusTap {
1283        adapter,
1284        marker_capture,
1285        clock,
1286    });
1287    msgbus::set_bus_tap(tap);
1288}
1289
1290// Use fully qualified `EventStoreLifecycle::` to dispatch to the inherent methods;
1291// `Self::` would resolve back into this trait impl and recurse.
1292#[allow(clippy::use_self)]
1293impl KernelEventStoreTrait for EventStoreLifecycle {
1294    fn restore_parent_cache(
1295        &mut self,
1296        instance_id: UUID4,
1297        cache: &mut Cache,
1298    ) -> anyhow::Result<()> {
1299        EventStoreLifecycle::restore_parent_cache(self, instance_id, cache)
1300            .map(|_| ())
1301            .map_err(Into::into)
1302    }
1303
1304    fn open(
1305        &mut self,
1306        instance_id: UUID4,
1307        components: &RegisteredComponents,
1308        environment: Environment,
1309    ) -> anyhow::Result<()> {
1310        EventStoreLifecycle::open(self, instance_id, components, environment).map_err(Into::into)
1311    }
1312
1313    fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
1314        EventStoreLifecycle::snapshot_anchorer(self)
1315    }
1316
1317    fn seal(&mut self, ts_init: UnixNanos) {
1318        EventStoreLifecycle::seal(self, ts_init);
1319    }
1320
1321    fn run_id(&self) -> Option<&str> {
1322        EventStoreLifecycle::run_id(self)
1323    }
1324
1325    fn parent_run_id(&self) -> Option<&str> {
1326        EventStoreLifecycle::parent_run_id(self)
1327    }
1328
1329    fn is_event_store_replay_configured(&self) -> bool {
1330        EventStoreLifecycle::is_event_store_replay_configured(self)
1331    }
1332
1333    fn is_halted(&self) -> bool {
1334        EventStoreLifecycle::is_halted(self)
1335    }
1336}
1337
1338#[cfg(test)]
1339mod tests {
1340    #[cfg(madsim)]
1341    use std::path::Path;
1342    use std::path::PathBuf;
1343
1344    use indexmap::IndexMap;
1345    use nautilus_common::{
1346        clock::TestClock,
1347        messages::{
1348            data::{
1349                DataCommand, DataResponse, QuotesResponse, RequestCommand, RequestQuotes,
1350                SubscribeCommand, SubscribeQuotes,
1351            },
1352            execution::{SubmitOrder, TradingCommand},
1353        },
1354        timer::{TimeEvent, TimeEventCallback, TimeEventHandler},
1355    };
1356    use nautilus_core::time::get_atomic_clock_static;
1357    use nautilus_model::{
1358        data::stubs::{quote_ethusdt_binance, stub_deltas},
1359        enums::TimeInForce,
1360        events::{
1361            OrderEventAny, OrderFilled,
1362            order::spec::{OrderFilledSpec, OrderInitializedSpec},
1363        },
1364        identifiers::{
1365            AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TradeId, TraderId, Venue,
1366            VenueOrderId,
1367        },
1368        types::{Currency, Money, Price, Quantity},
1369    };
1370    use nautilus_system::event_store::{DataMarkerClass, DataMarkerConfig, RunIdentity};
1371    use rstest::rstest;
1372    use tempfile::TempDir;
1373
1374    use super::*;
1375    use crate::{
1376        AppendEntry, DataClass, EncodedPayload, EventStoreEntry, IndexKind, MarkerBackend,
1377        MemoryBackend, RedbMarkerBackend, SnapshotAnchor,
1378        capture::builtins::PAYLOAD_TYPE_TIME_EVENT, compute_entry_hash,
1379    };
1380
1381    const INSTANCE_ID: &str = "trader-001";
1382
1383    fn make_config(base_dir: PathBuf) -> EventStoreConfig {
1384        EventStoreConfig {
1385            base_dir,
1386            identity: RunIdentity {
1387                binary_hash: "deadbeef".to_string(),
1388                schema_version: 1,
1389                crate_versions: "feedface".to_string(),
1390                feature_flags: Vec::new(),
1391                adapter_versions: IndexMap::new(),
1392                config_hash: "cafebabe".to_string(),
1393                seed: None,
1394            },
1395            retention: RetentionMode::Full,
1396            replay_from_run_id: None,
1397            data_markers: None,
1398            channel_capacity: 64,
1399            max_batch_entries: 1,
1400            max_batch_latency: Duration::from_millis(2),
1401            halt_threshold: Duration::from_secs(2),
1402            run_started_timeout: Duration::from_secs(2),
1403        }
1404    }
1405
    /// Fault-injection points that tell `seed_crashed_predecessor` how much state the
    /// simulated predecessor leaves in the backend before it is abandoned unsealed.
    #[derive(Clone, Copy, Debug)]
    enum CrashPoint {
        /// Nothing is appended to the backend.
        BeforeEnqueue,
        /// Nothing is appended to the backend; seeded identically to `BeforeEnqueue`.
        AfterEnqueueBeforeCommit,
        /// One `RunStarted` entry is appended; no snapshot anchor is recorded.
        AfterCommitBeforeSnapshot,
        /// One `RunStarted` entry is appended and a snapshot anchor is recorded.
        AfterSnapshot,
    }
1413
1414    fn append_entry(seq: u64, topic: &str, payload_type: &str, payload: Bytes) -> AppendEntry {
1415        let ts = UnixNanos::from(seq);
1416        let headers = Headers::empty();
1417        let hash = compute_entry_hash(seq, ts, ts, topic, payload_type, &payload, &headers);
1418        let entry = EventStoreEntry::new(
1419            hash,
1420            seq,
1421            headers,
1422            Topic::from(topic),
1423            Ustr::from(payload_type),
1424            payload,
1425            ts,
1426            ts,
1427        );
1428        AppendEntry::without_indices(entry)
1429    }
1430
1431    fn make_submit_order(client_order_id: ClientOrderId) -> SubmitOrder {
1432        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
1433        let order_init = OrderInitializedSpec::builder()
1434            .instrument_id(instrument_id)
1435            .client_order_id(client_order_id)
1436            .quantity(Quantity::from("1"))
1437            .time_in_force(TimeInForce::Gtc)
1438            .ts_event(UnixNanos::from(1))
1439            .ts_init(UnixNanos::from(2))
1440            .build();
1441        SubmitOrder::new(
1442            TraderId::from("TRADER-001"),
1443            Some(ClientId::from("BINANCE")),
1444            StrategyId::from("S-001"),
1445            instrument_id,
1446            client_order_id,
1447            order_init,
1448            None,
1449            None,
1450            None,
1451            UUID4::new(),
1452            UnixNanos::from(3),
1453            None, // correlation_id
1454        )
1455    }
1456
1457    fn append_run_started(seq: u64) -> AppendEntry {
1458        append_entry(
1459            seq,
1460            RUN_STARTED_TOPIC,
1461            RUN_STARTED_PAYLOAD_TYPE,
1462            encode_run_started(&RegisteredComponents::default()),
1463        )
1464    }
1465
1466    #[derive(Debug)]
1467    struct TestAuditMessage {
1468        value: u8,
1469    }
1470
1471    fn test_registry() -> EncoderRegistry {
1472        let mut registry = EncoderRegistry::new();
1473        registry.register::<TestAuditMessage, _>(Ustr::from("TestAuditMessage"), |message| {
1474            Ok(EncodedPayload::without_indices(Bytes::copy_from_slice(&[
1475                message.value,
1476            ])))
1477        });
1478        registry
1479    }
1480
1481    fn wait_for_high_watermark(store: &EventStoreLifecycle, expected: u64) {
1482        let deadline = Instant::now() + Duration::from_secs(2);
1483
1484        loop {
1485            let hwm = store
1486                .session
1487                .as_ref()
1488                .map_or(0, EventStoreSession::high_watermark);
1489
1490            if hwm >= expected {
1491                break;
1492            }
1493            assert!(
1494                Instant::now() < deadline,
1495                "event store high_watermark did not reach {expected} within deadline (hwm={hwm})",
1496            );
1497            thread::sleep(Duration::from_millis(2));
1498        }
1499    }
1500
1501    #[derive(Debug, Clone)]
1502    struct SharedMemoryBackend(Arc<Mutex<MemoryBackend>>);
1503
1504    impl EventStore for SharedMemoryBackend {
1505        fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
1506            self.0.lock().expect("memory backend").open_run(manifest)
1507        }
1508
1509        fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
1510            self.0.lock().expect("memory backend").append_batch(entries)
1511        }
1512
1513        fn scan_range(
1514            &self,
1515            from: u64,
1516            to: u64,
1517            direction: ScanDirection,
1518        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
1519            self.0
1520                .lock()
1521                .expect("memory backend")
1522                .scan_range(from, to, direction)
1523        }
1524
1525        fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
1526            self.0.lock().expect("memory backend").scan_seq(seq)
1527        }
1528
1529        fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
1530            self.0.lock().expect("memory backend").lookup(kind, key)
1531        }
1532
1533        fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
1534            self.0.lock().expect("memory backend").iter_index_keys(kind)
1535        }
1536
1537        fn record_snapshot_anchor(
1538            &mut self,
1539            anchor: SnapshotAnchor,
1540        ) -> Result<(), EventStoreError> {
1541            self.0
1542                .lock()
1543                .expect("memory backend")
1544                .record_snapshot_anchor(anchor)
1545        }
1546
1547        fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
1548            self.0
1549                .lock()
1550                .expect("memory backend")
1551                .latest_snapshot_anchor()
1552        }
1553
1554        fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
1555            self.0.lock().expect("memory backend").seal(status)
1556        }
1557
1558        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
1559            self.0.lock().expect("memory backend").manifest()
1560        }
1561
1562        fn high_watermark(&self) -> Result<u64, EventStoreError> {
1563            self.0.lock().expect("memory backend").high_watermark()
1564        }
1565    }
1566
1567    fn seed_crashed_predecessor(config: &EventStoreConfig, run_id: &str, crash_point: CrashPoint) {
1568        let mut backend = RedbBackend::new(config.base_dir.clone());
1569        backend
1570            .open_run(build_manifest(
1571                config,
1572                INSTANCE_ID,
1573                run_id.to_string(),
1574                None,
1575                UnixNanos::from(1_000),
1576                RegisteredComponents::default(),
1577            ))
1578            .expect("open predecessor");
1579
1580        match crash_point {
1581            // An entry sitting only in the writer channel leaves no durable redb
1582            // footprint after process death, so these two fault points intentionally
1583            // recover from the same on-disk state.
1584            CrashPoint::BeforeEnqueue | CrashPoint::AfterEnqueueBeforeCommit => {}
1585            CrashPoint::AfterCommitBeforeSnapshot => {
1586                backend
1587                    .append_batch(&[append_run_started(1)])
1588                    .expect("append committed entry");
1589            }
1590            CrashPoint::AfterSnapshot => {
1591                backend
1592                    .append_batch(&[append_run_started(1)])
1593                    .expect("append committed entry");
1594                backend
1595                    .record_snapshot_anchor(SnapshotAnchor::new(
1596                        1,
1597                        "cache://snapshot/run-crash/1",
1598                        "blake3:abc",
1599                    ))
1600                    .expect("record snapshot anchor");
1601            }
1602        }
1603    }
1604
1605    #[rstest]
1606    fn halt_signal_callback_records_first_reason() {
1607        let signal = HaltSignal::new();
1608        let cb = signal.callback();
1609        cb(HaltReason::BackendDisk("ENOSPC".to_string()));
1610        cb(HaltReason::BackendError("second".to_string()));
1611
1612        assert!(signal.is_halted());
1613        match signal.reason() {
1614            Some(HaltReason::BackendDisk(msg)) => assert!(msg.contains("ENOSPC")),
1615            other => panic!("expected first reason BackendDisk, was {other:?}"),
1616        }
1617    }
1618
1619    #[rstest]
1620    fn recover_predecessors_returns_empty_for_missing_directory() {
1621        let tmp = TempDir::new().expect("tempdir");
1622        let outcome =
1623            recover_predecessors(tmp.path(), INSTANCE_ID).expect("recover empty directory");
1624        assert!(outcome.recovered.is_empty());
1625        assert!(outcome.parent_run_id.is_none());
1626    }
1627
1628    #[rstest]
1629    fn restore_cache_snapshot_blob_rejects_hash_mismatch() {
1630        let mut cache = Cache::default();
1631        let blob = Bytes::from_static(b"snapshot");
1632        let anchor =
1633            crate::SnapshotAnchor::new(0, "cache://position-snapshots/P-1/0", "blake3:bad");
1634
1635        cache
1636            .add(&anchor.blob_ref, blob)
1637            .expect("seed snapshot blob");
1638        let err =
1639            crate::restore_cache_snapshot_blob(&mut cache, Some(&anchor)).expect_err("hash error");
1640
1641        assert!(
1642            err.to_string().contains("content_hash mismatch"),
1643            "err was: {err}",
1644        );
1645    }
1646
1647    #[rstest]
1648    fn open_run_writes_run_started_and_advances_watermark() {
1649        let tmp = TempDir::new().expect("tempdir");
1650        let config = make_config(tmp.path().to_path_buf());
1651        let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover empty");
1652        assert!(outcome.parent_run_id.is_none());
1653
1654        let halt = HaltSignal::new();
1655        let session = open_run(
1656            &config,
1657            INSTANCE_ID,
1658            build_run_id(UnixNanos::from(1_000)),
1659            outcome.parent_run_id,
1660            UnixNanos::from(1_000),
1661            &RegisteredComponents::default(),
1662            halt,
1663            get_atomic_clock_static(),
1664        )
1665        .expect("open run");
1666
1667        // Watermark + run-status snapshot.
1668        assert_eq!(session.high_watermark(), 1);
1669        assert_eq!(session.parent_run_id(), None);
1670
1671        // Every identity field must thread from EventStoreConfig into the manifest.
1672        // A field-swap mutation in build_manifest (e.g. assigning binary_hash from
1673        // config.identity.config_hash) would fail one of these assertions.
1674        let manifest = session.manifest();
1675        assert_eq!(manifest.instance_id, INSTANCE_ID);
1676        assert_eq!(manifest.status, RunStatus::Running);
1677        assert_eq!(manifest.binary_hash, "deadbeef");
1678        assert_eq!(manifest.schema_version, 1);
1679        assert_eq!(manifest.crate_versions, "feedface");
1680        assert_eq!(manifest.config_hash, "cafebabe");
1681        assert_eq!(manifest.start_ts_init, UnixNanos::from(1_000));
1682        assert_eq!(manifest.end_ts_init, None);
1683        assert!(
1684            manifest
1685                .feature_flags
1686                .contains(&"retention=full".to_string()),
1687            "feature_flags must record the retention mode, was {:?}",
1688            manifest.feature_flags,
1689        );
1690    }
1691
1692    #[rstest]
1693    fn lifecycle_options_default_registry_keeps_builtin_encoders() {
1694        let registry = EventStoreLifecycleOptions::default().build_registry();
1695
1696        assert!(registry.contains::<SubmitOrder>());
1697        assert!(registry.contains::<TradingCommand>());
1698        assert!(!registry.contains::<TestAuditMessage>());
1699    }
1700
1701    #[rstest]
1702    fn lifecycle_options_custom_registry_captures_registered_message() {
1703        let tmp = TempDir::new().expect("tempdir");
1704        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1705        let instance_id = UUID4::new();
1706        let options = EventStoreLifecycleOptions::new().with_encoder_registry(test_registry());
1707
1708        let mut store = EventStoreLifecycle::boot_with_options(
1709            Some(make_config(tmp.path().to_path_buf())),
1710            instance_id,
1711            clock_rc,
1712            options,
1713        )
1714        .expect("boot store");
1715        store
1716            .open(
1717                instance_id,
1718                &RegisteredComponents::default(),
1719                Environment::Backtest,
1720            )
1721            .expect("open run");
1722        let run_id = store.run_id().expect("run open").to_string();
1723
1724        let topic: MStr<msgbus::Topic> = MStr::from("events.test.audit");
1725        msgbus::publish_any(topic, &TestAuditMessage { value: 42 });
1726        wait_for_high_watermark(&store, 2);
1727
1728        drop(store);
1729
1730        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
1731            .expect("open sealed");
1732        let captured = sealed
1733            .scan_seq(2)
1734            .expect("scan")
1735            .expect("captured entry present");
1736
1737        assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
1738        assert_eq!(captured.topic.as_ref(), topic.as_str());
1739        assert_eq!(captured.payload.as_ref(), &[42]);
1740    }
1741
1742    #[rstest]
1743    fn lifecycle_options_memory_backend_opener_captures_and_seals() {
1744        let tmp = TempDir::new().expect("tempdir");
1745        let memory = Arc::new(Mutex::new(MemoryBackend::new()));
1746        let opener_memory = Arc::clone(&memory);
1747        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1748        let instance_id = UUID4::new();
1749        let options = EventStoreLifecycleOptions::new()
1750            .with_encoder_registry(test_registry())
1751            .with_backend_opener(move |_, manifest| {
1752                opener_memory
1753                    .lock()
1754                    .expect("memory backend")
1755                    .open_run(manifest.clone())?;
1756                Ok(Box::new(SharedMemoryBackend(Arc::clone(&opener_memory))))
1757            });
1758
1759        let mut store = EventStoreLifecycle::boot_with_options(
1760            Some(make_config(tmp.path().to_path_buf())),
1761            instance_id,
1762            clock_rc,
1763            options,
1764        )
1765        .expect("boot store");
1766        store
1767            .open(
1768                instance_id,
1769                &RegisteredComponents::default(),
1770                Environment::Backtest,
1771            )
1772            .expect("open run");
1773
1774        let topic: MStr<msgbus::Topic> = MStr::from("events.test.memory");
1775        msgbus::publish_any(topic, &TestAuditMessage { value: 7 });
1776        wait_for_high_watermark(&store, 2);
1777
1778        store.seal(UnixNanos::from(1_000));
1779
1780        let backend = memory.lock().expect("memory backend");
1781        let manifest = backend.manifest().expect("manifest");
1782        let captured = backend
1783            .scan_seq(2)
1784            .expect("scan")
1785            .expect("captured entry present");
1786
1787        assert_eq!(manifest.instance_id, instance_id.to_string());
1788        assert_eq!(manifest.status, RunStatus::Ended);
1789        assert_eq!(manifest.high_watermark, 3);
1790        assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
1791        assert_eq!(captured.topic.as_ref(), topic.as_str());
1792        assert_eq!(captured.payload.as_ref(), &[7]);
1793    }
1794
1795    #[cfg(madsim)]
1796    #[rstest]
1797    fn lifecycle_options_memory_backend_opener_captures_deterministic_seq_order_under_madsim() {
1798        let first = capture_madsim_memory_lifecycle_summary(42);
1799        let second = capture_madsim_memory_lifecycle_summary(42);
1800        let expected = expected_madsim_memory_entries();
1801
1802        assert_eq!(first.entries, second.entries);
1803        assert_eq!(first.entries, expected);
1804        assert_eq!(
1805            first
1806                .entries
1807                .iter()
1808                .map(|entry| entry.seq)
1809                .collect::<Vec<_>>(),
1810            vec![1, 2, 3, 4],
1811        );
1812        assert!(
1813            first.redb_files.is_empty(),
1814            "memory opener must not create redb files, was {:?}",
1815            first.redb_files,
1816        );
1817        assert!(
1818            second.redb_files.is_empty(),
1819            "memory opener must not create redb files, was {:?}",
1820            second.redb_files,
1821        );
1822    }
1823
1824    #[cfg(madsim)]
1825    fn expected_madsim_memory_entries() -> Vec<CapturedEntrySummary> {
1826        vec![
1827            CapturedEntrySummary {
1828                seq: 1,
1829                topic: RUN_STARTED_TOPIC.to_string(),
1830                payload_type: RUN_STARTED_PAYLOAD_TYPE.to_string(),
1831                payload: encode_run_started(&RegisteredComponents::default()).to_vec(),
1832                ts_init: UnixNanos::from(0),
1833                ts_publish: UnixNanos::from(10_000),
1834            },
1835            CapturedEntrySummary {
1836                seq: 2,
1837                topic: "events.test.madsim".to_string(),
1838                payload_type: "TestAuditMessage".to_string(),
1839                payload: vec![1],
1840                ts_init: UnixNanos::from(20_000),
1841                ts_publish: UnixNanos::from(20_000),
1842            },
1843            CapturedEntrySummary {
1844                seq: 3,
1845                topic: "events.test.madsim".to_string(),
1846                payload_type: "TestAuditMessage".to_string(),
1847                payload: vec![2],
1848                ts_init: UnixNanos::from(30_000),
1849                ts_publish: UnixNanos::from(30_000),
1850            },
1851            CapturedEntrySummary {
1852                seq: 4,
1853                topic: RUN_ENDED_TOPIC.to_string(),
1854                payload_type: RUN_ENDED_PAYLOAD_TYPE.to_string(),
1855                payload: Vec::new(),
1856                ts_init: UnixNanos::from(40_000),
1857                ts_publish: UnixNanos::from(40_000),
1858            },
1859        ]
1860    }
1861
1862    #[rstest]
1863    fn open_run_with_options_surfaces_backend_opener_error() {
1864        let tmp = TempDir::new().expect("tempdir");
1865        let config = make_config(tmp.path().to_path_buf());
1866        let options = EventStoreLifecycleOptions::new().with_backend_opener(|_, _| {
1867            Err(EventStoreError::Backend(
1868                "test backend open failed".to_string(),
1869            ))
1870        });
1871
1872        let err = open_run_with_options(
1873            &config,
1874            INSTANCE_ID,
1875            "run-open-error".to_string(),
1876            None,
1877            UnixNanos::from(5_000),
1878            &RegisteredComponents::default(),
1879            HaltSignal::new(),
1880            get_atomic_clock_static(),
1881            &options,
1882        )
1883        .expect_err("backend opener error must stop run open");
1884
1885        match err {
1886            BootError::EventStore(EventStoreError::Backend(msg)) => {
1887                assert!(msg.contains("test backend open failed"));
1888            }
1889            other => panic!("expected backend open failure, was {other:?}"),
1890        }
1891    }
1892
1893    #[cfg(madsim)]
1894    #[derive(Debug, PartialEq, Eq)]
1895    struct MadsimMemoryLifecycleCapture {
1896        entries: Vec<CapturedEntrySummary>,
1897        redb_files: Vec<PathBuf>,
1898    }
1899
1900    #[cfg(madsim)]
1901    #[derive(Debug, PartialEq, Eq)]
1902    struct CapturedEntrySummary {
1903        seq: u64,
1904        topic: String,
1905        payload_type: String,
1906        payload: Vec<u8>,
1907        ts_init: UnixNanos,
1908        ts_publish: UnixNanos,
1909    }
1910
1911    #[cfg(madsim)]
1912    fn capture_madsim_memory_lifecycle_summary(seed: u64) -> MadsimMemoryLifecycleCapture {
1913        get_atomic_clock_static().set_time(UnixNanos::from(10_000));
1914
1915        let tmp = TempDir::new().expect("tempdir");
1916        let memory = Arc::new(Mutex::new(MemoryBackend::new()));
1917        let opener_memory = Arc::clone(&memory);
1918        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
1919        let instance_id = UUID4::new();
1920        let mut config = make_config(tmp.path().to_path_buf());
1921        config.identity.seed = Some(seed);
1922        let options = EventStoreLifecycleOptions::new()
1923            .with_encoder_registry(test_registry())
1924            .with_backend_opener(move |_, manifest| {
1925                opener_memory
1926                    .lock()
1927                    .expect("memory backend")
1928                    .open_run(manifest.clone())?;
1929                Ok(Box::new(SharedMemoryBackend(Arc::clone(&opener_memory))))
1930            });
1931
1932        let mut store =
1933            EventStoreLifecycle::boot_with_options(Some(config), instance_id, clock_rc, options)
1934                .expect("boot store");
1935        store
1936            .open(
1937                instance_id,
1938                &RegisteredComponents::default(),
1939                Environment::Backtest,
1940            )
1941            .expect("open run");
1942
1943        let topic: MStr<msgbus::Topic> = MStr::from("events.test.madsim");
1944        get_atomic_clock_static().set_time(UnixNanos::from(20_000));
1945        msgbus::publish_any(topic, &TestAuditMessage { value: 1 });
1946        get_atomic_clock_static().set_time(UnixNanos::from(30_000));
1947        msgbus::publish_any(topic, &TestAuditMessage { value: 2 });
1948        assert_eq!(
1949            store
1950                .session
1951                .as_ref()
1952                .expect("open session")
1953                .high_watermark(),
1954            3
1955        );
1956
1957        get_atomic_clock_static().set_time(UnixNanos::from(40_000));
1958        store.seal(UnixNanos::from(40_000));
1959
1960        let backend = memory.lock().expect("memory backend");
1961        let manifest = backend.manifest().expect("manifest");
1962        assert_eq!(manifest.seed, Some(seed));
1963        assert_eq!(manifest.status, RunStatus::Ended);
1964        assert_eq!(manifest.high_watermark, 4);
1965        let entries = backend
1966            .scan_range(1, manifest.high_watermark, ScanDirection::Forward)
1967            .expect("scan entries")
1968            .into_iter()
1969            .map(|entry| CapturedEntrySummary {
1970                seq: entry.seq,
1971                topic: entry.topic.as_ref().to_string(),
1972                payload_type: entry.payload_type.as_str().to_string(),
1973                payload: entry.payload.to_vec(),
1974                ts_init: entry.ts_init,
1975                ts_publish: entry.ts_publish,
1976            })
1977            .collect();
1978        drop(backend);
1979
1980        MadsimMemoryLifecycleCapture {
1981            entries,
1982            redb_files: redb_files_under(tmp.path()),
1983        }
1984    }
1985
1986    #[cfg(madsim)]
1987    fn redb_files_under(dir: &Path) -> Vec<PathBuf> {
1988        let mut paths = Vec::new();
1989        collect_redb_files(dir, &mut paths);
1990        paths.sort();
1991        paths
1992    }
1993
1994    #[cfg(madsim)]
1995    fn collect_redb_files(dir: &Path, paths: &mut Vec<PathBuf>) {
1996        for entry in std::fs::read_dir(dir).expect("read dir") {
1997            let path = entry.expect("dir entry").path();
1998            if path.is_dir() {
1999                collect_redb_files(&path, paths);
2000            } else if path
2001                .extension()
2002                .is_some_and(|extension| extension == "redb")
2003            {
2004                paths.push(path);
2005            }
2006        }
2007    }
2008
2009    #[rstest]
2010    fn close_seals_manifest_and_records_run_ended() {
2011        let tmp = TempDir::new().expect("tempdir");
2012        let config = make_config(tmp.path().to_path_buf());
2013
2014        let halt = HaltSignal::new();
2015        let mut session = open_run(
2016            &config,
2017            INSTANCE_ID,
2018            build_run_id(UnixNanos::from(2_000)),
2019            None,
2020            UnixNanos::from(2_000),
2021            &RegisteredComponents::default(),
2022            halt,
2023            get_atomic_clock_static(),
2024        )
2025        .expect("open run");
2026
2027        let run_id = session.run_id().to_string();
2028        session.close(UnixNanos::from(3_000)).expect("close");
2029
2030        let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2031        let manifest = manifests
2032            .into_iter()
2033            .find(|m| m.run_id == run_id)
2034            .expect("manifest present");
2035        assert_eq!(manifest.status, RunStatus::Ended);
2036        assert!(manifest.high_watermark >= 2);
2037    }
2038
2039    #[rstest]
2040    fn snapshot_anchorer_persists_anchor_for_open_session() {
2041        let tmp = TempDir::new().expect("tempdir");
2042        let config = make_config(tmp.path().to_path_buf());
2043
2044        let halt = HaltSignal::new();
2045        let mut session = open_run(
2046            &config,
2047            INSTANCE_ID,
2048            build_run_id(UnixNanos::from(4_000)),
2049            None,
2050            UnixNanos::from(4_000),
2051            &RegisteredComponents::default(),
2052            halt,
2053            get_atomic_clock_static(),
2054        )
2055        .expect("open run");
2056
2057        let run_id = session.run_id().to_string();
2058
2059        {
2060            let anchorer = session.snapshot_anchorer().expect("snapshot anchorer");
2061            anchorer(CacheSnapshotRef::new(
2062                "cache://position-snapshots/P-1/0",
2063                Bytes::from_static(b"snapshot"),
2064            ))
2065            .expect("record snapshot anchor");
2066        }
2067
2068        session.close(UnixNanos::from(4_500)).expect("close");
2069
2070        let reader =
2071            RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &run_id).expect("open sealed");
2072        let anchor = reader
2073            .latest_snapshot_anchor()
2074            .expect("latest snapshot anchor")
2075            .expect("anchor present");
2076
2077        assert_eq!(anchor.high_watermark, 1);
2078        assert_eq!(anchor.blob_ref, "cache://position-snapshots/P-1/0");
2079        assert_eq!(
2080            anchor.content_hash,
2081            compute_snapshot_content_hash(b"snapshot"),
2082        );
2083    }
2084
2085    #[rstest]
2086    fn recovery_seals_tail_ending_in_run_ended_as_ended_not_crashed() {
2087        // The writer commits RunEnded before sealing the manifest. A crash between
2088        // those two steps leaves the manifest Running while the tail already proves
2089        // graceful close: recovery must seal as Ended (not CrashedRecovered) and
2090        // must not chain the next run to it as a crashed parent.
2091        //
2092        // Reproduce the in-between state by submitting a RunEnded draft through the
2093        // writer's normal append path and then dropping the session without going
2094        // through close() (which is what would have sealed the manifest).
2095        let tmp = TempDir::new().expect("tempdir");
2096        let config = make_config(tmp.path().to_path_buf());
2097
2098        let halt = HaltSignal::new();
2099        let run_id = build_run_id(UnixNanos::from(7_000));
2100        let session = open_run(
2101            &config,
2102            INSTANCE_ID,
2103            run_id.clone(),
2104            None,
2105            UnixNanos::from(7_000),
2106            &RegisteredComponents::default(),
2107            halt,
2108            get_atomic_clock_static(),
2109        )
2110        .expect("open run");
2111
2112        let writer = session.writer.as_ref().expect("writer attached");
2113        writer
2114            .submit(run_ended_draft(UnixNanos::from(7_500)))
2115            .expect("submit RunEnded as tail entry");
2116        // Wait until the writer durably commits the RunEnded entry before dropping;
2117        // otherwise the on-disk tail might not include it and the recovery test
2118        // would fall back to CrashedRecovered for an unrelated reason.
2119        let deadline = Instant::now() + Duration::from_secs(2);
2120
2121        while session.high_watermark() < 2 {
2122            assert!(
2123                Instant::now() < deadline,
2124                "writer high_watermark stuck at {} before deadline",
2125                session.high_watermark(),
2126            );
2127            thread::sleep(Duration::from_millis(2));
2128        }
2129        drop(session);
2130
2131        let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2132        assert_eq!(outcome.recovered.len(), 1);
2133        assert_eq!(outcome.recovered[0].run_id, run_id);
2134        assert_eq!(
2135            outcome.recovered[0].status,
2136            RunStatus::Ended,
2137            "tail ending in RunEnded must seal as Ended",
2138        );
2139        assert!(
2140            outcome.parent_run_id.is_none(),
2141            "Ended runs must not become parents",
2142        );
2143
2144        let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2145        let manifest = manifests
2146            .into_iter()
2147            .find(|m| m.run_id == run_id)
2148            .expect("manifest present");
2149        assert_eq!(manifest.status, RunStatus::Ended);
2150    }
2151
2152    #[rstest]
2153    fn drop_without_close_leaves_run_for_next_boot_to_recover() {
2154        let tmp = TempDir::new().expect("tempdir");
2155        let config = make_config(tmp.path().to_path_buf());
2156
2157        let halt = HaltSignal::new();
2158        let session = open_run(
2159            &config,
2160            INSTANCE_ID,
2161            build_run_id(UnixNanos::from(4_000)),
2162            None,
2163            UnixNanos::from(4_000),
2164            &RegisteredComponents::default(),
2165            halt,
2166            get_atomic_clock_static(),
2167        )
2168        .expect("open run");
2169        let run_id = session.run_id().to_string();
2170        drop(session);
2171
2172        let outcome =
2173            recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover after crash");
2174        assert_eq!(outcome.recovered.len(), 1);
2175        assert_eq!(outcome.recovered[0].run_id, run_id);
2176        assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2177        assert_eq!(outcome.parent_run_id.as_deref(), Some(run_id.as_str()));
2178
2179        let manifests = RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2180        let sealed = manifests
2181            .into_iter()
2182            .find(|m| m.run_id == run_id)
2183            .expect("present");
2184        assert_eq!(sealed.status, RunStatus::CrashedRecovered);
2185    }
2186
2187    #[rstest]
2188    fn build_run_id_uses_expected_format() {
2189        // Format: "<start_ts_init>-<8 hex chars>". The prefix is sortable by start
2190        // time so directory listings produce chronological order; the suffix
2191        // disambiguates concurrent starts at the same nanosecond.
2192        let id = build_run_id(UnixNanos::from(123_456));
2193        let (prefix, suffix) = id.split_once('-').expect("run id must contain a hyphen");
2194        assert_eq!(prefix, "123456");
2195        assert_eq!(suffix.len(), 8, "suffix was {suffix:?}");
2196        assert!(
2197            suffix.chars().all(|c| c.is_ascii_hexdigit()),
2198            "suffix must be hex, was {suffix:?}",
2199        );
2200    }
2201
2202    #[rstest]
2203    fn crash_recovery_seals_predecessor_and_links_parent_run_id() {
2204        // SPEC Phase 7 acceptance: kill mid-run, restart, assert the predecessor seals
2205        // as CrashedRecovered, the new run's parent_run_id points to it, and the new
2206        // run's first entry is a RunStarted at seq=1.
2207        let tmp = TempDir::new().expect("tempdir");
2208        let config = make_config(tmp.path().to_path_buf());
2209
2210        // Kernel boot 1: open a run and crash (drop the session without close).
2211        let halt_first = HaltSignal::new();
2212        let first = open_run(
2213            &config,
2214            INSTANCE_ID,
2215            build_run_id(UnixNanos::from(10_000)),
2216            None,
2217            UnixNanos::from(10_000),
2218            &RegisteredComponents::default(),
2219            halt_first,
2220            get_atomic_clock_static(),
2221        )
2222        .expect("open first run");
2223        let crashed_run_id = first.run_id().to_string();
2224        drop(first);
2225
2226        // Kernel boot 2: recover predecessors then open the next run.
2227        let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2228        assert_eq!(outcome.recovered.len(), 1);
2229        assert_eq!(outcome.recovered[0].run_id, crashed_run_id);
2230        assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2231        assert_eq!(
2232            outcome.parent_run_id.as_deref(),
2233            Some(crashed_run_id.as_str())
2234        );
2235
2236        // Predecessor's on-disk manifest is sealed CrashedRecovered.
2237        let manifests_after_seal =
2238            RedbBackend::list_runs(&config.base_dir, INSTANCE_ID).expect("list");
2239        let predecessor = manifests_after_seal
2240            .iter()
2241            .find(|m| m.run_id == crashed_run_id)
2242            .expect("predecessor present");
2243        assert_eq!(predecessor.status, RunStatus::CrashedRecovered);
2244
2245        // New run is opened with parent_run_id pointing to the predecessor.
2246        let halt_second = HaltSignal::new();
2247        let new_run_id = build_run_id(UnixNanos::from(20_000));
2248        let next = open_run(
2249            &config,
2250            INSTANCE_ID,
2251            new_run_id.clone(),
2252            outcome.parent_run_id,
2253            UnixNanos::from(20_000),
2254            &RegisteredComponents::default(),
2255            halt_second,
2256            get_atomic_clock_static(),
2257        )
2258        .expect("open second run");
2259        assert_eq!(next.parent_run_id(), Some(crashed_run_id.as_str()));
2260        assert_eq!(
2261            next.manifest().parent_run_id.as_deref(),
2262            Some(crashed_run_id.as_str()),
2263        );
2264        assert_eq!(next.high_watermark(), 1, "RunStarted is the first entry");
2265
2266        // The first entry in the new run is RunStarted at seq=1; close cleanly so we
2267        // can read the on-disk file without contending with the writer's lock.
2268        drop(next);
2269        let outcome_after = recover_predecessors(&config.base_dir, INSTANCE_ID)
2270            .expect("recover after second crash");
2271        // Only the second run shows up because the first is already sealed.
2272        assert_eq!(outcome_after.recovered.len(), 1);
2273        assert_eq!(outcome_after.recovered[0].run_id, new_run_id);
2274        assert_eq!(
2275            outcome_after.recovered[0].status,
2276            RunStatus::CrashedRecovered,
2277        );
2278
2279        // Open the recovered run read-only and verify seq=1 is RunStarted.
2280        let sealed = RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &new_run_id)
2281            .expect("open sealed");
2282        let first_entry = sealed
2283            .scan_seq(1)
2284            .expect("scan")
2285            .expect("RunStarted present");
2286        assert_eq!(first_entry.payload_type.as_str(), "RunStarted");
2287        assert_eq!(first_entry.topic.as_ref(), "run.lifecycle.RunStarted");
2288    }
2289
2290    #[rstest]
2291    #[case::before_enqueue(CrashPoint::BeforeEnqueue, 0, false)]
2292    #[case::after_enqueue_before_commit(CrashPoint::AfterEnqueueBeforeCommit, 0, false)]
2293    #[case::after_commit_before_snapshot(CrashPoint::AfterCommitBeforeSnapshot, 1, false)]
2294    #[case::after_snapshot(CrashPoint::AfterSnapshot, 1, true)]
2295    fn crash_recovery_matrix_seals_predecessor_and_links_parent_run_id(
2296        #[case] crash_point: CrashPoint,
2297        #[case] expected_hwm: u64,
2298        #[case] expect_snapshot_anchor: bool,
2299    ) {
2300        let tmp = TempDir::new().expect("tempdir");
2301        let config = make_config(tmp.path().to_path_buf());
2302        let predecessor_run_id = format!("3000-{crash_point:?}");
2303        seed_crashed_predecessor(&config, &predecessor_run_id, crash_point);
2304
2305        let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2306        assert_eq!(outcome.recovered.len(), 1);
2307        assert_eq!(outcome.recovered[0].run_id, predecessor_run_id);
2308        assert_eq!(outcome.recovered[0].status, RunStatus::CrashedRecovered);
2309        assert_eq!(
2310            outcome.parent_run_id.as_deref(),
2311            Some(predecessor_run_id.as_str()),
2312        );
2313
2314        let predecessor =
2315            RedbBackend::open_sealed(&config.base_dir, INSTANCE_ID, &predecessor_run_id)
2316                .expect("open sealed predecessor");
2317        let manifest = predecessor.manifest().expect("manifest");
2318        let snapshot_anchor = predecessor.latest_snapshot_anchor().expect("anchor read");
2319
2320        assert_eq!(manifest.status, RunStatus::CrashedRecovered);
2321        assert_eq!(manifest.high_watermark, expected_hwm);
2322        assert_eq!(
2323            snapshot_anchor.is_some(),
2324            expect_snapshot_anchor,
2325            "snapshot anchor presence must match crash point",
2326        );
2327
2328        let next = open_run(
2329            &config,
2330            INSTANCE_ID,
2331            "4000-next".to_string(),
2332            outcome.parent_run_id,
2333            UnixNanos::from(4_000),
2334            &RegisteredComponents::default(),
2335            HaltSignal::new(),
2336            get_atomic_clock_static(),
2337        )
2338        .expect("open next run");
2339        assert_eq!(next.parent_run_id(), Some(predecessor_run_id.as_str()));
2340        assert_eq!(
2341            next.manifest().parent_run_id.as_deref(),
2342            Some(predecessor_run_id.as_str()),
2343        );
2344    }
2345
2346    #[rstest]
2347    fn kernel_event_store_open_seals_leftover_session_before_reopen() {
2348        // BacktestEngine::run -> reset -> run reuses the kernel. EventStoreLifecycle::open
2349        // must seal any leftover session before opening a fresh one so RunStarted is
2350        // the first entry of every run. The UUID suffix in build_run_id keeps the
2351        // two ids distinct even though TestClock holds start_ts_init at zero.
2352        let tmp = TempDir::new().expect("tempdir");
2353        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2354        let instance_id = UUID4::new();
2355
2356        let mut store = EventStoreLifecycle::boot(
2357            Some(make_config(tmp.path().to_path_buf())),
2358            instance_id,
2359            clock_rc,
2360        )
2361        .expect("boot store");
2362
2363        store
2364            .open(
2365                instance_id,
2366                &RegisteredComponents::default(),
2367                Environment::Backtest,
2368            )
2369            .expect("open first run");
2370        let run_one = store.run_id().expect("run one open").to_string();
2371
2372        store
2373            .open(
2374                instance_id,
2375                &RegisteredComponents::default(),
2376                Environment::Backtest,
2377            )
2378            .expect("open second run");
2379        let run_two = store.run_id().expect("run two open").to_string();
2380
2381        assert_ne!(run_one, run_two, "second open must produce a fresh run id");
2382
2383        // Drop the wrapper so any open run seals via Drop, then read both manifests
2384        // off disk and assert each closed cleanly as Ended.
2385        drop(store);
2386        let manifests =
2387            RedbBackend::list_runs(tmp.path(), &instance_id.to_string()).expect("list runs");
2388        let m1 = manifests
2389            .iter()
2390            .find(|m| m.run_id == run_one)
2391            .expect("first run present");
2392        let m2 = manifests
2393            .iter()
2394            .find(|m| m.run_id == run_two)
2395            .expect("second run present");
2396        assert_eq!(m1.status, RunStatus::Ended);
2397        assert_eq!(m2.status, RunStatus::Ended);
2398    }
2399
2400    #[rstest]
2401    fn recover_picks_most_recent_crashed_recovered_as_parent() {
2402        // With multiple unsealed predecessors, the sweep must seal every one and the
2403        // new run's parent_run_id must point to the most recently started survivor.
2404        let tmp = TempDir::new().expect("tempdir");
2405        let config = make_config(tmp.path().to_path_buf());
2406
2407        for ts in [1_000_u64, 2_000_u64, 3_000_u64] {
2408            let session = open_run(
2409                &config,
2410                INSTANCE_ID,
2411                build_run_id(UnixNanos::from(ts)),
2412                None,
2413                UnixNanos::from(ts),
2414                &RegisteredComponents::default(),
2415                HaltSignal::new(),
2416                get_atomic_clock_static(),
2417            )
2418            .expect("open");
2419            drop(session);
2420        }
2421
2422        let outcome = recover_predecessors(&config.base_dir, INSTANCE_ID).expect("recover sweep");
2423        assert_eq!(outcome.recovered.len(), 3);
2424        assert!(
2425            outcome
2426                .recovered
2427                .iter()
2428                .all(|r| r.status == RunStatus::CrashedRecovered),
2429            "every predecessor must seal CrashedRecovered, was {:?}",
2430            outcome.recovered,
2431        );
2432        // Most-recent (start_ts_init=3_000) becomes the parent.
2433        let parent = outcome.parent_run_id.as_deref().expect("parent set");
2434        assert!(
2435            parent.starts_with("3000-"),
2436            "parent must be the run with the highest start_ts_init, was {parent}",
2437        );
2438    }
2439
2440    #[rstest]
2441    fn submit_run_started_returns_timeout_when_writer_stalls() {
2442        // A backend whose append_batch never returns simulates a stuck writer. The
2443        // wait loop must surface BootError::RunStartedTimeout after the configured
2444        // ceiling elapses, never block indefinitely.
2445        let stub = StallBackend::default();
2446        let manifest = manifest_for("run-timeout");
2447        let mut backend: Box<dyn EventStore + Send> = Box::new(stub.clone());
2448        backend.open_run(manifest).expect("open stub");
2449
2450        let halt = HaltSignal::new();
2451
2452        let writer = EventStoreWriter::spawn(
2453            backend,
2454            get_atomic_clock_static(),
2455            halt.callback(),
2456            WriterConfig::default(),
2457        )
2458        .expect("spawn writer");
2459
2460        let err = submit_run_started_blocking(
2461            &writer,
2462            &RegisteredComponents::default(),
2463            UnixNanos::from(100),
2464            &halt,
2465            Duration::from_millis(20),
2466        )
2467        .expect_err("must time out");
2468
2469        match err {
2470            BootError::RunStartedTimeout { timeout } => {
2471                assert_eq!(timeout, Duration::from_millis(20));
2472            }
2473            other => panic!("expected RunStartedTimeout, was {other:?}"),
2474        }
2475
2476        // Release the gate so the writer thread can exit cleanly before drop.
2477        stub.release();
2478    }
2479
2480    #[rstest]
2481    fn submit_run_started_returns_halted_when_writer_halts_during_wait() {
2482        // A halt signal fired before the writer can commit must surface
2483        // BootError::HaltedDuringBoot with the recorded reason.
2484        let stub = StallBackend::default();
2485        let manifest = manifest_for("run-halt");
2486        let mut backend: Box<dyn EventStore + Send> = Box::new(stub.clone());
2487        backend.open_run(manifest).expect("open stub");
2488
2489        let halt = HaltSignal::new();
2490
2491        let writer = EventStoreWriter::spawn(
2492            backend,
2493            get_atomic_clock_static(),
2494            halt.callback(),
2495            WriterConfig::default(),
2496        )
2497        .expect("spawn writer");
2498
2499        // Fire the halt from a peer thread shortly after we begin waiting.
2500        let halt_for_thread = halt.clone();
2501
2502        let firer = thread::spawn(move || {
2503            thread::sleep(Duration::from_millis(10));
2504            halt_for_thread.callback()(HaltReason::BackendDisk("test stall".to_string()));
2505        });
2506
2507        let err = submit_run_started_blocking(
2508            &writer,
2509            &RegisteredComponents::default(),
2510            UnixNanos::from(200),
2511            &halt,
2512            Duration::from_secs(2),
2513        )
2514        .expect_err("must observe halt");
2515
2516        match err {
2517            BootError::HaltedDuringBoot(HaltReason::BackendDisk(msg)) => {
2518                assert!(msg.contains("test stall"), "reason msg was {msg}");
2519            }
2520            other => panic!("expected HaltedDuringBoot(BackendDisk), was {other:?}"),
2521        }
2522        firer.join().expect("halt firer joined");
2523        stub.release();
2524    }
2525
2526    fn manifest_for(run_id: &str) -> RunManifest {
2527        RunManifest {
2528            run_id: run_id.to_string(),
2529            parent_run_id: None,
2530            instance_id: INSTANCE_ID.to_string(),
2531            binary_hash: String::new(),
2532            schema_version: 1,
2533            crate_versions: String::new(),
2534            feature_flags: Vec::new(),
2535            adapter_versions: IndexMap::new(),
2536            config_hash: String::new(),
2537            registered_components: RegisteredComponents::default(),
2538            seed: None,
2539            start_ts_init: UnixNanos::default(),
2540            end_ts_init: None,
2541            high_watermark: 0,
2542            status: RunStatus::Running,
2543        }
2544    }
2545
2546    /// Stub backend whose `append_batch` blocks until `release()` is called. Used to
2547    /// hold the writer's high-watermark at zero so the boot path's wait loop can
2548    /// exercise its timeout and halt branches deterministically.
2549    #[derive(Debug, Default, Clone)]
2550    struct StallBackend {
2551        inner: Arc<Mutex<StallInner>>,
2552        gate: Arc<(Mutex<bool>, std::sync::Condvar)>,
2553    }
2554
2555    #[derive(Debug, Default)]
2556    struct StallInner {
2557        manifest: Option<RunManifest>,
2558    }
2559
2560    impl StallBackend {
2561        fn release(&self) {
2562            let (lock, cvar) = &*self.gate;
2563            *lock.lock().expect("gate") = true;
2564            cvar.notify_all();
2565        }
2566    }
2567
2568    impl crate::EventStore for StallBackend {
2569        fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
2570            self.inner.lock().expect("inner").manifest = Some(manifest);
2571            Ok(())
2572        }
2573
2574        fn append_batch(&mut self, _: &[crate::AppendEntry]) -> Result<u64, EventStoreError> {
2575            let (lock, cvar) = &*self.gate;
2576            let mut released = lock.lock().expect("gate");
2577
2578            while !*released {
2579                released = cvar.wait(released).expect("gate wait");
2580            }
2581            Ok(0)
2582        }
2583
2584        fn scan_range(
2585            &self,
2586            _: u64,
2587            _: u64,
2588            _: crate::ScanDirection,
2589        ) -> Result<Vec<crate::EventStoreEntry>, EventStoreError> {
2590            Ok(Vec::new())
2591        }
2592
2593        fn scan_seq(&self, _: u64) -> Result<Option<crate::EventStoreEntry>, EventStoreError> {
2594            Ok(None)
2595        }
2596
2597        fn lookup(&self, _: crate::IndexKind, _: &str) -> Result<Option<u64>, EventStoreError> {
2598            Ok(None)
2599        }
2600
2601        fn iter_index_keys(
2602            &self,
2603            _: crate::IndexKind,
2604        ) -> Result<Vec<(String, u64)>, EventStoreError> {
2605            Ok(Vec::new())
2606        }
2607
2608        fn seal(&mut self, _: RunStatus) -> Result<(), EventStoreError> {
2609            Ok(())
2610        }
2611
2612        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
2613            self.inner
2614                .lock()
2615                .expect("inner")
2616                .manifest
2617                .clone()
2618                .ok_or_else(|| EventStoreError::Backend("no manifest".to_string()))
2619        }
2620
2621        fn high_watermark(&self) -> Result<u64, EventStoreError> {
2622            Ok(0)
2623        }
2624    }
2625
2626    /// Integration: the kernel-installed bus tap forwards a `SubmitOrder` dispatched
2627    /// through the typed-send path into the event store before any subscriber observes
2628    /// it. The captured entry carries the dispatching endpoint as the topic and the
2629    /// canonical `SubmitOrder` payload type tag.
2630    #[rstest]
2631    fn bus_tap_captures_submit_order_sent_through_msgbus() {
2632        let tmp = TempDir::new().expect("tempdir");
2633        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2634        let instance_id = UUID4::new();
2635
2636        let mut store = EventStoreLifecycle::boot(
2637            Some(make_config(tmp.path().to_path_buf())),
2638            instance_id,
2639            clock_rc,
2640        )
2641        .expect("boot store");
2642        store
2643            .open(
2644                instance_id,
2645                &RegisteredComponents::default(),
2646                Environment::Backtest,
2647            )
2648            .expect("open run");
2649        let run_id = store.run_id().expect("run open").to_string();
2650
2651        let trader_id = TraderId::from("TRADER-001");
2652        let strategy_id = StrategyId::from("S-001");
2653        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
2654        let client_order_id = ClientOrderId::from("O-20260510-000001");
2655        let order_init = OrderInitializedSpec::builder()
2656            .instrument_id(instrument_id)
2657            .client_order_id(client_order_id)
2658            .quantity(Quantity::from("1"))
2659            .time_in_force(TimeInForce::Gtc)
2660            .build();
2661        let submit_order = SubmitOrder::new(
2662            trader_id,
2663            Some(ClientId::from("BINANCE")),
2664            strategy_id,
2665            instrument_id,
2666            client_order_id,
2667            order_init,
2668            None,
2669            None,
2670            None,
2671            UUID4::new(),
2672            UnixNanos::from(3),
2673            None, // correlation_id
2674        );
2675
2676        let endpoint = MStr::<Endpoint>::from("test.exec.engine.process");
2677        msgbus::send_any_value(endpoint, &submit_order);
2678
2679        // RunStarted is seq=1; the captured SubmitOrder lands at seq=2 once the
2680        // writer commits.
2681        let deadline = Instant::now() + Duration::from_secs(2);
2682
2683        loop {
2684            let hwm = store
2685                .session
2686                .as_ref()
2687                .map_or(0, EventStoreSession::high_watermark);
2688
2689            if hwm >= 2 {
2690                break;
2691            }
2692            assert!(
2693                Instant::now() < deadline,
2694                "captured SubmitOrder did not commit within deadline (hwm={hwm})",
2695            );
2696            thread::sleep(Duration::from_millis(2));
2697        }
2698
2699        // Seal cleanly so we can re-open the run read-only
2700        drop(store);
2701
2702        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
2703            .expect("open sealed");
2704        let captured = sealed
2705            .scan_seq(2)
2706            .expect("scan")
2707            .expect("captured entry present");
2708        assert_eq!(captured.payload_type.as_str(), "SubmitOrder");
2709        assert_eq!(captured.topic.as_ref(), endpoint.as_str());
2710
2711        // The SubmitOrder encoder commits a ClientOrderId sidecar index; the lookup
2712        // must resolve to the captured seq.
2713        let by_client = sealed
2714            .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
2715            .expect("lookup")
2716            .expect("indexed");
2717        assert_eq!(by_client, 2);
2718    }
2719
2720    #[rstest]
2721    fn kernel_with_markers_captures_snapshots_over_synthetic_bus() {
2722        let tmp = TempDir::new().expect("tempdir");
2723        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2724        let instance_id = UUID4::new();
2725        let mut config = make_config(tmp.path().to_path_buf());
2726        config.data_markers = Some(DataMarkerConfig {
2727            classes: vec![DataMarkerClass::BookDeltas, DataMarkerClass::Quote],
2728            safety_flush_interval: Duration::from_secs(1),
2729            channel_capacity: 128,
2730            high_fidelity: Vec::new(),
2731        });
2732
2733        let mut store =
2734            EventStoreLifecycle::boot(Some(config), instance_id, clock_rc).expect("boot store");
2735        store
2736            .open(
2737                instance_id,
2738                &RegisteredComponents::default(),
2739                Environment::Backtest,
2740            )
2741            .expect("open run");
2742        let run_id = store.run_id().expect("run open").to_string();
2743
2744        let first = make_submit_order(ClientOrderId::from("O-marker-1"));
2745        msgbus::send_any_value(MStr::<Endpoint>::from("test.exec.process"), &first);
2746
2747        let quote = quote_ethusdt_binance();
2748        msgbus::publish_quote(MStr::from("data.quotes.BINANCE.ETHUSDT-PERP"), &quote);
2749        let deltas = stub_deltas();
2750        msgbus::publish_deltas(MStr::from("data.deltas.XNAS.AAPL"), &deltas);
2751
2752        let second = make_submit_order(ClientOrderId::from("O-marker-2"));
2753        msgbus::send_any_value(MStr::<Endpoint>::from("test.exec.process"), &second);
2754        wait_for_high_watermark(&store, 3);
2755        store.seal(UnixNanos::from(500));
2756
2757        let marker_path = tmp
2758            .path()
2759            .join(instance_id.to_string())
2760            .join(format!("{run_id}.markers.redb"));
2761        let marker = RedbMarkerBackend::open_read_only_file(marker_path).expect("open markers");
2762        let snapshots = marker.scan_snapshots().expect("scan snapshots");
2763        let dict = marker.scan_dict().expect("scan dict");
2764
2765        assert_eq!(snapshots.len(), 1);
2766        assert_eq!(snapshots[0].event_seq_before, 3);
2767        assert_eq!(snapshots[0].advanced.len(), 2);
2768        assert_eq!(
2769            snapshots[0]
2770                .advanced
2771                .iter()
2772                .map(|cursor| cursor.count)
2773                .collect::<Vec<_>>(),
2774            vec![1, 1]
2775        );
2776        assert_eq!(
2777            dict.iter()
2778                .map(|entry| (entry.data_cls, entry.identifier.as_str()))
2779                .collect::<Vec<_>>(),
2780            vec![
2781                (DataClass::Quote, "ETHUSDT-PERP.BINANCE"),
2782                (DataClass::BookDeltas, "AAPL.XNAS"),
2783            ],
2784        );
2785    }
2786
2787    #[rstest]
2788    fn boot_recovery_ignores_marker_sidecar_files() {
2789        let tmp = TempDir::new().expect("tempdir");
2790        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2791        let instance_id = UUID4::new();
2792        let mut config = make_config(tmp.path().to_path_buf());
2793        config.data_markers = Some(DataMarkerConfig {
2794            classes: vec![DataMarkerClass::Quote],
2795            safety_flush_interval: Duration::from_secs(1),
2796            channel_capacity: 128,
2797            high_fidelity: Vec::new(),
2798        });
2799
2800        let mut store =
2801            EventStoreLifecycle::boot(Some(config.clone()), instance_id, Rc::clone(&clock_rc))
2802                .expect("boot store");
2803        store
2804            .open(
2805                instance_id,
2806                &RegisteredComponents::default(),
2807                Environment::Backtest,
2808            )
2809            .expect("open run");
2810        let run_id = store.run_id().expect("run open").to_string();
2811        store.seal(UnixNanos::from(500));
2812
2813        let marker_path = tmp
2814            .path()
2815            .join(instance_id.to_string())
2816            .join(format!("{run_id}.markers.redb"));
2817        assert!(marker_path.exists());
2818
2819        let rebooted = EventStoreLifecycle::boot(Some(config), instance_id, clock_rc)
2820            .expect("boot after marker sidecar");
2821
2822        assert!(rebooted.recovered().is_empty());
2823    }
2824
2825    #[rstest]
2826    fn marker_setup_failure_disables_markers_without_blocking_open() {
2827        let tmp = TempDir::new().expect("tempdir");
2828        let bad_base = tmp.path().join("not-a-directory");
2829        std::fs::write(&bad_base, b"not a directory").expect("write bad base");
2830        let memory = Arc::new(Mutex::new(MemoryBackend::new()));
2831        let opener_memory = Arc::clone(&memory);
2832        let mut config = make_config(bad_base);
2833        config.data_markers = Some(DataMarkerConfig {
2834            classes: vec![DataMarkerClass::Quote],
2835            safety_flush_interval: Duration::from_secs(1),
2836            channel_capacity: 128,
2837            high_fidelity: Vec::new(),
2838        });
2839        let options = EventStoreLifecycleOptions::new()
2840            .with_encoder_registry(test_registry())
2841            .with_backend_opener(move |_, manifest| {
2842                opener_memory
2843                    .lock()
2844                    .expect("memory backend")
2845                    .open_run(manifest.clone())?;
2846                Ok(Box::new(SharedMemoryBackend(Arc::clone(&opener_memory))))
2847            });
2848
2849        let mut session = open_run_with_options(
2850            &config,
2851            INSTANCE_ID,
2852            "run-marker-setup-fails".to_string(),
2853            None,
2854            UnixNanos::from(5_000),
2855            &RegisteredComponents::default(),
2856            HaltSignal::new(),
2857            get_atomic_clock_static(),
2858            &options,
2859        )
2860        .expect("open run despite marker failure");
2861
2862        assert!(session.marker_capture.is_none());
2863
2864        let topic: MStr<msgbus::Topic> = MStr::from("events.test.marker-fallback");
2865        session
2866            .adapter()
2867            .expect("adapter")
2868            .capture::<TestAuditMessage>(
2869                topic,
2870                &TestAuditMessage { value: 11 },
2871                Headers::empty(),
2872                UnixNanos::from(5_001),
2873            )
2874            .expect("capture");
2875        let deadline = Instant::now() + Duration::from_secs(2);
2876
2877        while session.high_watermark() < 2 {
2878            assert!(
2879                Instant::now() < deadline,
2880                "event-store high_watermark {} did not reach 2 within deadline",
2881                session.high_watermark(),
2882            );
2883            thread::sleep(Duration::from_millis(2));
2884        }
2885        session
2886            .close(UnixNanos::from(6_000))
2887            .expect("close session");
2888
2889        let backend = memory.lock().expect("memory backend");
2890        let captured = backend
2891            .scan_seq(2)
2892            .expect("scan")
2893            .expect("captured entry present");
2894
2895        assert_eq!(captured.payload_type.as_str(), "TestAuditMessage");
2896        assert_eq!(captured.topic.as_ref(), topic.as_str());
2897        assert_eq!(captured.payload.as_ref(), &[11]);
2898    }
2899
2900    #[rstest]
2901    fn marker_registry_factory_receives_enabled_classes() {
2902        let tmp = TempDir::new().expect("tempdir");
2903        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2904        let instance_id = UUID4::new();
2905        let seen_classes: Arc<Mutex<Vec<Vec<DataClass>>>> = Arc::new(Mutex::new(Vec::new()));
2906        let seen_for_factory = Arc::clone(&seen_classes);
2907        let mut config = make_config(tmp.path().to_path_buf());
2908        config.data_markers = Some(DataMarkerConfig {
2909            classes: vec![DataMarkerClass::Trade, DataMarkerClass::Quote],
2910            safety_flush_interval: Duration::from_secs(1),
2911            channel_capacity: 128,
2912            high_fidelity: Vec::new(),
2913        });
2914        let options =
2915            EventStoreLifecycleOptions::new().with_marker_registry_factory(move |classes| {
2916                seen_for_factory
2917                    .lock()
2918                    .expect("seen classes")
2919                    .push(classes.to_vec());
2920                DataMarkerExtractorRegistry::default_registry(classes)
2921            });
2922
2923        let mut store =
2924            EventStoreLifecycle::boot_with_options(Some(config), instance_id, clock_rc, options)
2925                .expect("boot store");
2926        store
2927            .open(
2928                instance_id,
2929                &RegisteredComponents::default(),
2930                Environment::Backtest,
2931            )
2932            .expect("open run");
2933        store.seal(UnixNanos::from(1_000));
2934
2935        let seen = seen_classes.lock().expect("seen classes");
2936        assert_eq!(seen.as_slice(), &[vec![DataClass::Trade, DataClass::Quote]]);
2937    }
2938
2939    #[rstest]
2940    fn markers_disabled_installs_no_file_and_no_cost() {
2941        let tmp = TempDir::new().expect("tempdir");
2942        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2943        let instance_id = UUID4::new();
2944
2945        let mut store = EventStoreLifecycle::boot(
2946            Some(make_config(tmp.path().to_path_buf())),
2947            instance_id,
2948            clock_rc,
2949        )
2950        .expect("boot store");
2951        store
2952            .open(
2953                instance_id,
2954                &RegisteredComponents::default(),
2955                Environment::Backtest,
2956            )
2957            .expect("open run");
2958        let run_id = store.run_id().expect("run open").to_string();
2959
2960        assert!(
2961            store
2962                .session
2963                .as_ref()
2964                .expect("session")
2965                .marker_capture
2966                .is_none()
2967        );
2968
2969        let quote = quote_ethusdt_binance();
2970        msgbus::publish_quote(MStr::from("data.quotes.BINANCE.ETHUSDT-PERP"), &quote);
2971        store.seal(UnixNanos::from(500));
2972
2973        let marker_path = tmp
2974            .path()
2975            .join(instance_id.to_string())
2976            .join(format!("{run_id}.markers.redb"));
2977        assert!(!marker_path.exists());
2978    }
2979
2980    /// Fired clock events do not pass through normal message bus publish/send calls.
2981    /// `TimeEventHandler::run` must still hit the installed tap so timer-driven
2982    /// strategy logic has a durable trigger record.
2983    #[rstest]
2984    fn bus_tap_captures_time_event_handler_run() {
2985        let tmp = TempDir::new().expect("tempdir");
2986        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
2987        let instance_id = UUID4::new();
2988
2989        let mut store = EventStoreLifecycle::boot(
2990            Some(make_config(tmp.path().to_path_buf())),
2991            instance_id,
2992            clock_rc,
2993        )
2994        .expect("boot store");
2995        store
2996            .open(
2997                instance_id,
2998                &RegisteredComponents::default(),
2999                Environment::Backtest,
3000            )
3001            .expect("open run");
3002        let run_id = store.run_id().expect("run open").to_string();
3003
3004        let event = TimeEvent::new(
3005            Ustr::from("strategy.heartbeat"),
3006            UUID4::new(),
3007            UnixNanos::from(100),
3008            UnixNanos::from(99),
3009        );
3010        let callback = TimeEventCallback::from(|_: TimeEvent| {});
3011        TimeEventHandler::new(event, callback).run();
3012
3013        let deadline = Instant::now() + Duration::from_secs(2);
3014
3015        loop {
3016            let hwm = store
3017                .session
3018                .as_ref()
3019                .map_or(0, EventStoreSession::high_watermark);
3020
3021            if hwm >= 2 {
3022                break;
3023            }
3024            assert!(
3025                Instant::now() < deadline,
3026                "captured TimeEvent did not commit within deadline (hwm={hwm})",
3027            );
3028            thread::sleep(Duration::from_millis(2));
3029        }
3030
3031        drop(store);
3032
3033        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3034            .expect("open sealed");
3035        let captured = sealed
3036            .scan_seq(2)
3037            .expect("scan")
3038            .expect("captured entry present");
3039
3040        assert_eq!(captured.payload_type.as_str(), PAYLOAD_TYPE_TIME_EVENT);
3041        assert_eq!(captured.topic, MessagingSwitchboard::time_event_topic());
3042    }
3043
3044    /// `EventStoreLifecycle::seal` must clear the bus tap so a publish issued after the
3045    /// run closes cannot reach the sealed writer. Without the clear, the dropped
3046    /// adapter would still receive captures and `Arc::try_unwrap` inside close would
3047    /// fail with multiple owners.
3048    #[rstest]
3049    fn seal_clears_bus_tap_so_post_seal_dispatches_do_not_capture() {
3050        let tmp = TempDir::new().expect("tempdir");
3051        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3052        let instance_id = UUID4::new();
3053
3054        let mut store = EventStoreLifecycle::boot(
3055            Some(make_config(tmp.path().to_path_buf())),
3056            instance_id,
3057            clock_rc,
3058        )
3059        .expect("boot store");
3060        store
3061            .open(
3062                instance_id,
3063                &RegisteredComponents::default(),
3064                Environment::Backtest,
3065            )
3066            .expect("open run");
3067        let run_id = store.run_id().expect("run open").to_string();
3068
3069        store.seal(UnixNanos::from(0));
3070
3071        // Post-seal dispatch: any tap that survived would either capture into the
3072        // dropped writer (panic via the channel close path) or hold the adapter Arc
3073        // and fail the close try_unwrap. The session is already gone, so this just
3074        // exercises the cleared-tap path through msgbus dispatch.
3075        let endpoint = MStr::<Endpoint>::from("test.post.seal.endpoint");
3076        let payload: u32 = 99;
3077        msgbus::send_any_value(endpoint, &payload);
3078
3079        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3080            .expect("open sealed");
3081        // RunStarted at seq=1, RunEnded at seq=2; no captured u32 entry must exist
3082        assert!(
3083            sealed.scan_seq(3).expect("scan").is_none(),
3084            "no entry must land after seal",
3085        );
3086    }
3087
3088    /// Production code reaches the bus tap with [`TradingCommand`] wrapped around the
3089    /// inner command (the wrapper's `TypeId`, not `SubmitOrder`'s). The envelope
3090    /// dispatcher in [`default_registry`] must unwrap the variant, stamp the inner
3091    /// `payload_type` (`SubmitOrder`), and commit the same indices the bare-type encoder
3092    /// would have produced.
3093    #[rstest]
3094    fn bus_tap_captures_trading_command_envelope_with_inner_payload_type() {
3095        let tmp = TempDir::new().expect("tempdir");
3096        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3097        let instance_id = UUID4::new();
3098
3099        let mut store = EventStoreLifecycle::boot(
3100            Some(make_config(tmp.path().to_path_buf())),
3101            instance_id,
3102            clock_rc,
3103        )
3104        .expect("boot store");
3105        store
3106            .open(
3107                instance_id,
3108                &RegisteredComponents::default(),
3109                Environment::Backtest,
3110            )
3111            .expect("open run");
3112        let run_id = store.run_id().expect("run open").to_string();
3113
3114        let trader_id = TraderId::from("TRADER-001");
3115        let strategy_id = StrategyId::from("S-001");
3116        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
3117        let client_order_id = ClientOrderId::from("O-20260510-000002");
3118        let order_init = OrderInitializedSpec::builder()
3119            .instrument_id(instrument_id)
3120            .client_order_id(client_order_id)
3121            .quantity(Quantity::from("1"))
3122            .time_in_force(TimeInForce::Gtc)
3123            .build();
3124        let submit_order = SubmitOrder::new(
3125            trader_id,
3126            Some(ClientId::from("BINANCE")),
3127            strategy_id,
3128            instrument_id,
3129            client_order_id,
3130            order_init,
3131            None,
3132            None,
3133            None,
3134            UUID4::new(),
3135            UnixNanos::from(3),
3136            None, // correlation_id
3137        );
3138        let command = TradingCommand::SubmitOrder(submit_order.clone());
3139
3140        let endpoint = MStr::<Endpoint>::from("test.exec.engine.envelope");
3141        msgbus::send_trading_command(endpoint, command);
3142
3143        let deadline = Instant::now() + Duration::from_secs(2);
3144
3145        loop {
3146            let hwm = store
3147                .session
3148                .as_ref()
3149                .map_or(0, EventStoreSession::high_watermark);
3150
3151            if hwm >= 2 {
3152                break;
3153            }
3154            assert!(
3155                Instant::now() < deadline,
3156                "captured TradingCommand did not commit within deadline (hwm={hwm})",
3157            );
3158            thread::sleep(Duration::from_millis(2));
3159        }
3160
3161        drop(store);
3162
3163        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3164            .expect("open sealed");
3165        let captured = sealed
3166            .scan_seq(2)
3167            .expect("scan")
3168            .expect("captured entry present");
3169        assert_eq!(
3170            captured.payload_type.as_str(),
3171            "SubmitOrder",
3172            "wrapper envelope must stamp the inner payload_type",
3173        );
3174        assert_eq!(captured.topic.as_ref(), endpoint.as_str());
3175
3176        let by_client = sealed
3177            .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
3178            .expect("lookup")
3179            .expect("indexed");
3180        assert_eq!(by_client, 2);
3181
3182        // Round-trip the captured payload back through the inner-type decoder so the
3183        // bytes-equal-bare invariant is checked at the integration layer too: a mutation
3184        // that wrote the wrapper-typed bytes instead of the inner would fail here.
3185        let decoded: SubmitOrder =
3186            rmp_serde::from_slice(&captured.payload).expect("decode captured SubmitOrder");
3187        assert_eq!(decoded, submit_order);
3188    }
3189
3190    /// `publish_order_event` reaches the bus tap with `OrderEventAny::Filled(...)`; the
3191    /// envelope dispatcher must unwrap to `OrderFilled`, stamp `OrderFilled` as the
3192    /// `payload_type`, and commit both the `client_order_id` and `venue_order_id` indices.
3193    #[rstest]
3194    fn bus_tap_captures_order_event_any_envelope_with_inner_payload_type() {
3195        let tmp = TempDir::new().expect("tempdir");
3196        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3197        let instance_id = UUID4::new();
3198
3199        let mut store = EventStoreLifecycle::boot(
3200            Some(make_config(tmp.path().to_path_buf())),
3201            instance_id,
3202            clock_rc,
3203        )
3204        .expect("boot store");
3205        store
3206            .open(
3207                instance_id,
3208                &RegisteredComponents::default(),
3209                Environment::Backtest,
3210            )
3211            .expect("open run");
3212        let run_id = store.run_id().expect("run open").to_string();
3213
3214        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
3215        let client_order_id = ClientOrderId::from("O-20260510-000003");
3216        let venue_order_id = VenueOrderId::from("V-99");
3217        let filled = OrderFilledSpec::builder()
3218            .instrument_id(instrument_id)
3219            .client_order_id(client_order_id)
3220            .venue_order_id(venue_order_id)
3221            .account_id(AccountId::from("BINANCE-001"))
3222            .trade_id(TradeId::from("T-1"))
3223            .last_qty(Quantity::from("1"))
3224            .last_px(Price::from("100.00"))
3225            .currency(Currency::USDT())
3226            .ts_event(UnixNanos::from(10))
3227            .ts_init(UnixNanos::from(11))
3228            .commission(Money::new(0.10, Currency::USDT()))
3229            .build();
3230        let event = OrderEventAny::Filled(filled);
3231
3232        let topic: MStr<msgbus::Topic> = MStr::from("events.order.ETHUSDT-PERP.BINANCE");
3233        msgbus::publish_order_event(topic, &event);
3234
3235        let deadline = Instant::now() + Duration::from_secs(2);
3236
3237        loop {
3238            let hwm = store
3239                .session
3240                .as_ref()
3241                .map_or(0, EventStoreSession::high_watermark);
3242
3243            if hwm >= 2 {
3244                break;
3245            }
3246            assert!(
3247                Instant::now() < deadline,
3248                "captured OrderEventAny did not commit within deadline (hwm={hwm})",
3249            );
3250            thread::sleep(Duration::from_millis(2));
3251        }
3252
3253        drop(store);
3254
3255        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3256            .expect("open sealed");
3257        let captured = sealed
3258            .scan_seq(2)
3259            .expect("scan")
3260            .expect("captured entry present");
3261        assert_eq!(
3262            captured.payload_type.as_str(),
3263            "OrderFilled",
3264            "wrapper envelope must stamp the inner payload_type",
3265        );
3266        assert_eq!(captured.topic.as_ref(), topic.as_str());
3267
3268        let by_client = sealed
3269            .lookup(IndexKind::ClientOrderId, client_order_id.as_str())
3270            .expect("lookup")
3271            .expect("indexed");
3272        let by_venue = sealed
3273            .lookup(IndexKind::VenueOrderId, venue_order_id.as_str())
3274            .expect("lookup")
3275            .expect("indexed");
3276        assert_eq!(by_client, 2);
3277        assert_eq!(by_venue, 2);
3278
3279        // Round-trip the captured payload back through the inner-type decoder so a
3280        // mutation that wrote the wrapper-typed bytes instead of the inner would fail
3281        // here rather than only at the unit-level bytes-equal-bare check.
3282        let decoded: OrderFilled =
3283            rmp_serde::from_slice(&captured.payload).expect("decode captured OrderFilled");
3284        assert_eq!(decoded, filled);
3285    }
3286
3287    /// `send_data_command` reaches the bus tap with the [`DataCommand`] wrapper. The
3288    /// envelope dispatcher must unwrap to the request/subscription category, stamp that
3289    /// category as the `payload_type`, and write bytes that decode as the inner command
3290    /// enum.
3291    #[rstest]
3292    fn bus_tap_captures_data_command_envelopes_with_category_payload_types() {
3293        let tmp = TempDir::new().expect("tempdir");
3294        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3295        let instance_id = UUID4::new();
3296
3297        let mut store = EventStoreLifecycle::boot(
3298            Some(make_config(tmp.path().to_path_buf())),
3299            instance_id,
3300            clock_rc,
3301        )
3302        .expect("boot store");
3303        store
3304            .open(
3305                instance_id,
3306                &RegisteredComponents::default(),
3307                Environment::Backtest,
3308            )
3309            .expect("open run");
3310        let run_id = store.run_id().expect("run open").to_string();
3311
3312        let request = RequestCommand::Quotes(RequestQuotes::new(
3313            InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3314            None,
3315            None,
3316            None,
3317            Some(ClientId::from("BINANCE")),
3318            UUID4::new(),
3319            UnixNanos::from(20),
3320            None,
3321        ));
3322        let subscribe = SubscribeCommand::Quotes(SubscribeQuotes::new(
3323            InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3324            Some(ClientId::from("BINANCE")),
3325            Some(Venue::from("BINANCE")),
3326            UUID4::new(),
3327            UnixNanos::from(21),
3328            Some(UUID4::new()),
3329            None,
3330        ));
3331
3332        let request_endpoint = MStr::<Endpoint>::from("test.data.engine.request");
3333        msgbus::send_data_command(request_endpoint, DataCommand::Request(request.clone()));
3334
3335        let subscribe_endpoint = MStr::<Endpoint>::from("test.data.engine.subscribe");
3336        msgbus::send_data_command(
3337            subscribe_endpoint,
3338            DataCommand::Subscribe(subscribe.clone()),
3339        );
3340
3341        let deadline = Instant::now() + Duration::from_secs(2);
3342
3343        loop {
3344            let hwm = store
3345                .session
3346                .as_ref()
3347                .map_or(0, EventStoreSession::high_watermark);
3348
3349            if hwm >= 3 {
3350                break;
3351            }
3352            assert!(
3353                Instant::now() < deadline,
3354                "captured DataCommand entries did not commit within deadline (hwm={hwm})",
3355            );
3356            thread::sleep(Duration::from_millis(2));
3357        }
3358
3359        drop(store);
3360
3361        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3362            .expect("open sealed");
3363        let captured_request = sealed
3364            .scan_seq(2)
3365            .expect("scan request")
3366            .expect("captured request present");
3367        assert_eq!(captured_request.payload_type.as_str(), "RequestCommand");
3368        assert_eq!(captured_request.topic.as_ref(), request_endpoint.as_str());
3369
3370        let decoded_request: RequestCommand =
3371            rmp_serde::from_slice(&captured_request.payload).expect("decode RequestCommand");
3372        match (decoded_request, request) {
3373            (RequestCommand::Quotes(decoded), RequestCommand::Quotes(expected)) => {
3374                assert_eq!(decoded.request_id, expected.request_id);
3375                assert_eq!(decoded.instrument_id, expected.instrument_id);
3376                assert_eq!(decoded.client_id, expected.client_id);
3377                assert_eq!(decoded.ts_init, expected.ts_init);
3378            }
3379            other => panic!("expected RequestCommand::Quotes round trip, was {other:?}"),
3380        }
3381
3382        let captured_subscribe = sealed
3383            .scan_seq(3)
3384            .expect("scan subscribe")
3385            .expect("captured subscribe present");
3386        assert_eq!(captured_subscribe.payload_type.as_str(), "SubscribeCommand");
3387        assert_eq!(
3388            captured_subscribe.topic.as_ref(),
3389            subscribe_endpoint.as_str()
3390        );
3391
3392        let decoded_subscribe: SubscribeCommand =
3393            rmp_serde::from_slice(&captured_subscribe.payload).expect("decode SubscribeCommand");
3394        match (decoded_subscribe, subscribe) {
3395            (SubscribeCommand::Quotes(decoded), SubscribeCommand::Quotes(expected)) => {
3396                assert_eq!(decoded.command_id, expected.command_id);
3397                assert_eq!(decoded.instrument_id, expected.instrument_id);
3398                assert_eq!(decoded.client_id, expected.client_id);
3399                assert_eq!(decoded.venue, expected.venue);
3400                assert_eq!(decoded.ts_init, expected.ts_init);
3401                assert_eq!(decoded.correlation_id, expected.correlation_id);
3402            }
3403            other => panic!("expected SubscribeCommand::Quotes round trip, was {other:?}"),
3404        }
3405    }
3406
3407    // `send_response` dispatches through a correlation handler rather than an endpoint
3408    // or pub/sub topic. The bus tap must still capture the `DataResponse` envelope and
3409    // stamp the inner response category as the payload type.
3410    #[rstest]
3411    fn bus_tap_captures_data_response_sent_through_correlation_handler() {
3412        let tmp = TempDir::new().expect("tempdir");
3413        let clock_rc: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
3414        let instance_id = UUID4::new();
3415
3416        let mut store = EventStoreLifecycle::boot(
3417            Some(make_config(tmp.path().to_path_buf())),
3418            instance_id,
3419            clock_rc,
3420        )
3421        .expect("boot store");
3422        store
3423            .open(
3424                instance_id,
3425                &RegisteredComponents::default(),
3426                Environment::Backtest,
3427            )
3428            .expect("open run");
3429        let run_id = store.run_id().expect("run open").to_string();
3430
3431        let correlation_id = UUID4::new();
3432        let handler_called = Rc::new(RefCell::new(false));
3433        let handler_called_clone = handler_called.clone();
3434        msgbus::register_response_handler(
3435            &correlation_id,
3436            msgbus::ShareableMessageHandler::from_typed(move |_resp: &QuotesResponse| {
3437                *handler_called_clone.borrow_mut() = true;
3438            }),
3439        );
3440
3441        let response = QuotesResponse::new(
3442            correlation_id,
3443            ClientId::from("BINANCE"),
3444            InstrumentId::from("ETHUSDT-PERP.BINANCE"),
3445            vec![],
3446            None,
3447            None,
3448            UnixNanos::from(30),
3449            None,
3450        );
3451        msgbus::send_response(&correlation_id, &DataResponse::Quotes(response.clone()));
3452
3453        let deadline = Instant::now() + Duration::from_secs(2);
3454
3455        loop {
3456            let hwm = store
3457                .session
3458                .as_ref()
3459                .map_or(0, EventStoreSession::high_watermark);
3460
3461            if hwm >= 2 {
3462                break;
3463            }
3464            assert!(
3465                Instant::now() < deadline,
3466                "captured DataResponse did not commit within deadline (hwm={hwm})",
3467            );
3468            thread::sleep(Duration::from_millis(2));
3469        }
3470
3471        assert!(*handler_called.borrow());
3472        drop(store);
3473
3474        let sealed = RedbBackend::open_sealed(tmp.path(), &instance_id.to_string(), &run_id)
3475            .expect("open sealed");
3476        let captured = sealed
3477            .scan_seq(2)
3478            .expect("scan")
3479            .expect("captured response present");
3480        assert_eq!(captured.payload_type.as_str(), "QuotesResponse");
3481        assert_eq!(captured.topic, MessagingSwitchboard::data_response_topic());
3482
3483        let decoded: QuotesResponse =
3484            rmp_serde::from_slice(&captured.payload).expect("decode QuotesResponse");
3485        assert_eq!(decoded.correlation_id, response.correlation_id);
3486        assert_eq!(decoded.client_id, response.client_id);
3487        assert_eq!(decoded.instrument_id, response.instrument_id);
3488        assert_eq!(decoded.ts_init, response.ts_init);
3489        assert!(decoded.data.is_empty());
3490    }
3491}