Skip to main content

nautilus_system/
event_store.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kernel-facing seam for run-lifecycle event-sourcing.
17//!
18//! The [`KernelEventStore`] trait is the surface [`crate::kernel::NautilusKernel`] uses to wire
19//! a durable event-sourcing session into its boot, snapshot, and seal flow. The concrete
20//! implementation lives in `nautilus-event-store` so that crate can be developed and versioned
21//! independently of `nautilus-system`; callers inject an implementation through the builder
22//! (see [`crate::builder::NautilusKernelBuilder::with_event_store`]).
23
24use std::{cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc, time::Duration};
25
26use indexmap::IndexMap;
27use nautilus_common::{cache::Cache, clock::Clock, enums::Environment};
28use nautilus_core::{UUID4, UnixNanos};
29use nautilus_execution::engine::SnapshotAnchorer;
30use serde::{Deserialize, Serialize};
31
/// Factory closure invoked by the kernel to construct an injected event-store implementation.
///
/// Receives the kernel's instance id and clock so the resulting [`KernelEventStore`]
/// implementation scans the same on-disk run directory the kernel later passes to
/// [`KernelEventStore::restore_parent_cache`]/[`KernelEventStore::open`], and stamps lifecycle
/// timestamps against the same time source the kernel uses.
///
/// The closure is `FnOnce`, so it can be invoked at most once and may move owned state out
/// of its captures. The `'static` bound means it must not borrow from shorter-lived data.
/// Construction failures are reported through the returned `anyhow::Result`.
pub type EventStoreFactory = Box<
    dyn FnOnce(UUID4, Rc<RefCell<dyn Clock>>) -> anyhow::Result<Box<dyn KernelEventStore>>
        + 'static,
>;
42
/// The component manifest captured into the event-store `RunStarted` entry.
///
/// Replay binds actors, strategies, algorithms, subscriptions, and command endpoints from
/// this manifest without consulting external configuration.
///
/// The id-to-hash tables are [`IndexMap`]s, so their entries iterate (and serialize) in
/// insertion order rather than hash order. The `Default` value is an empty manifest.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegisteredComponents {
    /// Registered actor ids mapped to their config hashes.
    pub actors: IndexMap<String, String>,
    /// Registered strategy ids mapped to their config hashes.
    pub strategies: IndexMap<String, String>,
    /// Registered algorithm ids mapped to their config hashes.
    pub algorithms: IndexMap<String, String>,
    /// Subscription bindings active at run start.
    pub subscriptions: Vec<String>,
    /// Endpoint registrations active at run start.
    pub endpoints: Vec<String>,
}
60
/// Kernel-facing seam for event-sourcing lifecycle integration.
///
/// `NautilusKernel` drives the open/restore/seal sequence through this trait so the concrete
/// event-store machinery (writers, readers, bus tap, redb backend) lives outside
/// `nautilus-system`. Implementations are typically built by the caller and injected via
/// [`crate::builder::NautilusKernelBuilder::with_event_store`].
///
/// Lifecycle as documented on the methods below: [`Self::restore_parent_cache`] is called
/// once before [`Self::open`]; [`Self::seal`] closes the run opened by `open`. Implementors
/// must also implement [`Debug`].
pub trait KernelEventStore: Debug {
    /// Restores cache state from a configured replay source or recovered parent run.
    ///
    /// Implementations may open a sealed replay source, validate its snapshot anchor, and
    /// replay the tail directly into `cache`. The kernel calls this once before [`Self::open`].
    ///
    /// # Errors
    ///
    /// Returns an error when the source reader, snapshot restore, decode, or cache apply
    /// step fails.
    fn restore_parent_cache(&mut self, instance_id: UUID4, cache: &mut Cache)
    -> anyhow::Result<()>;

    /// Opens a fresh run for the current kernel session.
    ///
    /// `components` carries the registered manifest written to the run's `RunStarted` entry.
    /// `environment` selects the clock source the implementation uses to stamp publish
    /// timestamps. Idempotency across reset/rerun is the implementation's responsibility.
    ///
    /// # Errors
    ///
    /// Returns an error when opening the new run, spawning the writer, or blocking on the
    /// initial entry ack fails.
    fn open(
        &mut self,
        instance_id: UUID4,
        components: &RegisteredComponents,
        environment: Environment,
    ) -> anyhow::Result<()>;

    /// Returns a snapshot anchorer for the currently open run, when capture is active.
    ///
    /// Returns `None` when there is no active capture. The execution engine installs the
    /// returned callback so position snapshots commit a matching anchor entry against the
    /// durable high-watermark.
    fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer>;

    /// Seals the open run by writing the terminal entry and updating the manifest.
    ///
    /// Idempotent: a closed or absent session is a no-op. Halted sessions (see
    /// [`Self::is_halted`]) defer the seal to the next-boot recovery sweep.
    ///
    /// This method has no return value, so seal failures are not reported to the caller
    /// through this call. NOTE(review): `ts_init` is presumably the timestamp stamped on the
    /// terminal entry; confirm against the implementation.
    fn seal(&mut self, ts_init: UnixNanos);

    /// Returns the run id of the currently open run, when capture is active.
    fn run_id(&self) -> Option<&str>;

    /// Returns the configured replay source or recovered parent run id, when present.
    fn parent_run_id(&self) -> Option<&str>;

    /// Returns whether the current config enables event-store replay.
    ///
    /// Event-store replay restores cache state and opens a child run for inspection. The kernel
    /// promotes this config state to runtime state only after restore and open both succeed.
    ///
    /// The default implementation returns `false`; implementations that support replay must
    /// override it.
    fn is_event_store_replay_configured(&self) -> bool {
        false
    }

    /// Returns whether the implementation has signaled a fail-stop condition.
    fn is_halted(&self) -> bool;
}
126
/// How the supervisor prunes sealed run files.
///
/// The kernel records the choice in the manifest's `feature_flags`; actual retention
/// enforcement is performed by a separate supervisor process and is out of scope for
/// the kernel boot path.
///
/// Uses serde's default externally-tagged representation, e.g. `"Full"` or
/// `{"Bounded":{"keep_last":5}}` in JSON. The `Default` value is [`RetentionMode::Full`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RetentionMode {
    /// Keep every sealed run; never reclaim.
    #[default]
    Full,
    /// Keep at most `keep_last` sealed runs; the supervisor reclaims older files.
    Bounded {
        /// The number of sealed runs to retain.
        ///
        /// NOTE(review): `0` is not rejected here; confirm how the supervisor treats it.
        keep_last: usize,
    },
    /// Keep the manifest plus a snapshot anchor and the tail since the anchor; older
    /// entries are reclaimed once a newer anchor is durable.
    SnapshotAnchored,
}
146
/// Per-run identification data the kernel populates from build metadata.
///
/// The kernel records what is available at run start; downstream binaries refine these
/// values when their build-time wiring populates them. Defaults are placeholders so the
/// kernel can boot before the binary-hash and crate-versions wiring is finalized.
///
/// The derived `Default` yields empty strings and collections, `schema_version == 0`, and
/// `seed == None`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunIdentity {
    /// A hex-encoded hash of the trader binary.
    pub binary_hash: String,
    /// The entry payload schema version.
    pub schema_version: u32,
    /// A hex-encoded hash of `Cargo.lock` or an equivalent crate version manifest.
    pub crate_versions: String,
    /// The active Cargo features for the trader binary.
    pub feature_flags: Vec<String>,
    /// Per-adapter version stamp keyed by adapter name (insertion-ordered).
    pub adapter_versions: IndexMap<String, String>,
    /// A hex-encoded hash of the kernel configuration.
    pub config_hash: String,
    /// The deterministic seed, populated when the run executes under a seeded mode.
    pub seed: Option<u64>,
}
169
/// The id of a captured run: `<start_ts_init>-<short_uuid>`, sortable by start time.
///
/// The runtime constructs this from the kernel's start timestamp plus a fresh `UUID4` so
/// the representation stays stable across processes and platforms.
///
/// This is a plain `String` alias: the format above is a convention upheld by whoever
/// constructs the id, not something the type system enforces.
pub type RunId = String;
175
/// Fallback ceiling (one second) on the gap between data-marker cursor snapshots, applied
/// when no entry boundary triggers a snapshot sooner.
pub const DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL: Duration = Duration::from_millis(1_000);
/// Fallback number of slots in the data-marker writer's bounded submit channel.
pub const DEFAULT_DATA_MARKER_CHANNEL_CAPACITY: usize = 10_000;
180
/// Market-data class enabled for data-marker capture.
///
/// Variants are declared in the same canonical order used by [`DataMarkerClass::ALL`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataMarkerClass {
    /// Order-book delta stream.
    BookDeltas,
    /// Level-10 order-book snapshot stream.
    BookDepth10,
    /// Quote (level-1 bid/ask) stream.
    Quote,
    /// Trade (last sale) stream.
    Trade,
    /// Bar (OHLCV aggregate) stream.
    Bar,
}
195
196impl DataMarkerClass {
197    /// All builtin data-marker classes in canonical order.
198    pub const ALL: [Self; 5] = [
199        Self::BookDeltas,
200        Self::BookDepth10,
201        Self::Quote,
202        Self::Trade,
203        Self::Bar,
204    ];
205}
206
/// Opt-in data-marker sidecar settings for an event-store run.
///
/// Every field has a serde default, so any subset (including an empty object) deserializes,
/// with omitted fields taking the same values as [`DataMarkerConfig::default`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataMarkerConfig {
    /// Market-data classes captured into marker cursors (defaults to every builtin class).
    #[serde(default = "default_data_marker_classes")]
    pub classes: Vec<DataMarkerClass>,
    /// Maximum interval between cursor snapshots when data advances without entry submissions.
    #[serde(default = "default_data_marker_safety_flush_interval")]
    pub safety_flush_interval: Duration,
    /// Capacity of the marker writer's bounded submit channel.
    #[serde(default = "default_data_marker_channel_capacity")]
    pub channel_capacity: usize,
    /// Instrument identifiers that emit one high-fidelity marker per observed data message
    /// (empty when omitted).
    #[serde(default)]
    pub high_fidelity: Vec<String>,
}
223
224impl Default for DataMarkerConfig {
225    fn default() -> Self {
226        Self {
227            classes: default_data_marker_classes(),
228            safety_flush_interval: DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL,
229            channel_capacity: DEFAULT_DATA_MARKER_CHANNEL_CAPACITY,
230            high_fidelity: Vec::new(),
231        }
232    }
233}
234
235fn default_data_marker_classes() -> Vec<DataMarkerClass> {
236    DataMarkerClass::ALL.to_vec()
237}
238
239const fn default_data_marker_safety_flush_interval() -> Duration {
240    DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL
241}
242
243const fn default_data_marker_channel_capacity() -> usize {
244    DEFAULT_DATA_MARKER_CHANNEL_CAPACITY
245}
246
247/// Configuration for the kernel-managed event store run lifecycle.
248#[derive(Clone, Debug, Serialize, Deserialize)]
249pub struct EventStoreConfig {
250    /// Root directory; the backend creates `<base_dir>/<instance_id>/<run_id>.redb`.
251    pub base_dir: PathBuf,
252    /// Stable identification for this trader instance and binary.
253    pub identity: RunIdentity,
254    /// How the supervisor reclaims sealed run files.
255    pub retention: RetentionMode,
256    /// Sealed run to restore cache state from before opening a fresh run.
257    ///
258    /// When set, this enables event-store replay: the kernel restores cache state from this run,
259    /// records it as the parent link for the fresh child run, and then skips engines, clients,
260    /// trader startup, and live reconciliation. Quarantined runs are rejected.
261    pub replay_from_run_id: Option<RunId>,
262    /// Data-marker sidecar settings. `None` disables marker capture for the run.
263    #[serde(default)]
264    pub data_markers: Option<DataMarkerConfig>,
265    /// Capacity of the writer's bounded submit channel.
266    pub channel_capacity: usize,
267    /// Maximum entries collected before the writer forces a commit.
268    pub max_batch_entries: usize,
269    /// Maximum time a batch may accumulate before the writer forces a commit.
270    pub max_batch_latency: Duration,
271    /// Submit-side stall ceiling that triggers writer fail-stop.
272    pub halt_threshold: Duration,
273    /// Maximum time to wait for the `RunStarted` entry to durably commit before the
274    /// kernel surfaces an event-store boot error.
275    pub run_started_timeout: Duration,
276}
277
278impl Default for EventStoreConfig {
279    fn default() -> Self {
280        Self {
281            base_dir: PathBuf::new(),
282            identity: RunIdentity::default(),
283            retention: RetentionMode::default(),
284            replay_from_run_id: None,
285            data_markers: None,
286            channel_capacity: 10_000,
287            max_batch_entries: 100,
288            max_batch_latency: Duration::from_millis(5),
289            halt_threshold: Duration::from_millis(250),
290            run_started_timeout: Duration::from_secs(5),
291        }
292    }
293}
294
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use rstest::rstest;

    use super::*;

    #[rstest]
    fn event_store_config_serde_roundtrip() {
        let config = EventStoreConfig::default();
        let json = serde_json::to_string(&config).expect("serialize");
        let restored: EventStoreConfig = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(restored.channel_capacity, config.channel_capacity);
        assert_eq!(restored.max_batch_entries, config.max_batch_entries);
        assert_eq!(restored.max_batch_latency, config.max_batch_latency);
        assert_eq!(restored.halt_threshold, config.halt_threshold);
        assert_eq!(restored.run_started_timeout, config.run_started_timeout);
        assert_eq!(restored.base_dir, config.base_dir);
        assert_eq!(restored.retention, config.retention);
        assert_eq!(restored.replay_from_run_id, config.replay_from_run_id);
        assert_eq!(restored.identity, config.identity);
        assert_eq!(restored.data_markers, config.data_markers);
    }

    #[rstest]
    fn data_marker_config_serde_roundtrip() {
        let config = EventStoreConfig {
            data_markers: Some(DataMarkerConfig {
                classes: vec![DataMarkerClass::Quote, DataMarkerClass::BookDeltas],
                safety_flush_interval: Duration::from_millis(250),
                channel_capacity: 512,
                high_fidelity: vec!["ETHUSDT-PERP.BINANCE".to_string()],
            }),
            ..Default::default()
        };
        let json = serde_json::to_string(&config).expect("serialize");
        let restored: EventStoreConfig = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(restored.data_markers, config.data_markers);
    }

    #[rstest]
    fn retention_mode_serde_roundtrip() {
        for mode in [
            RetentionMode::Full,
            RetentionMode::Bounded { keep_last: 5 },
            RetentionMode::SnapshotAnchored,
        ] {
            let json = serde_json::to_string(&mode).expect("serialize");
            let restored: RetentionMode = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(restored, mode);
        }
    }

    #[rstest]
    fn event_store_config_default_values() {
        let config = EventStoreConfig::default();

        assert_eq!(config.channel_capacity, 10_000);
        assert_eq!(config.max_batch_entries, 100);
        assert_eq!(config.max_batch_latency, Duration::from_millis(5));
        assert_eq!(config.halt_threshold, Duration::from_millis(250));
        assert_eq!(config.run_started_timeout, Duration::from_secs(5));
        assert_eq!(config.base_dir, PathBuf::new());
        assert_eq!(config.retention, RetentionMode::Full);
        assert!(config.replay_from_run_id.is_none());
        assert_eq!(config.identity, RunIdentity::default());
        assert!(config.data_markers.is_none());
    }

    #[rstest]
    fn data_markers_default_is_none() {
        let config = EventStoreConfig::default();

        assert!(config.data_markers.is_none());
    }

    #[rstest]
    fn data_marker_config_default_values() {
        let config = DataMarkerConfig::default();

        assert_eq!(config.classes, DataMarkerClass::ALL.to_vec());
        assert_eq!(
            config.safety_flush_interval,
            DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL
        );
        assert_eq!(config.channel_capacity, DEFAULT_DATA_MARKER_CHANNEL_CAPACITY);
        assert!(config.high_fidelity.is_empty());
    }

    #[rstest]
    fn data_marker_config_empty_object_uses_defaults() {
        // Every `DataMarkerConfig` field carries a serde default, so an empty section is valid
        // and must match the programmatic default exactly.
        let restored: DataMarkerConfig = serde_json::from_str("{}").expect("deserialize");

        assert_eq!(restored, DataMarkerConfig::default());
    }

    #[rstest]
    fn event_store_config_missing_data_markers_is_none() {
        let mut value = serde_json::to_value(EventStoreConfig::default()).expect("serialize");
        value
            .as_object_mut()
            .expect("config serializes as a JSON object")
            .remove("data_markers");

        let restored: EventStoreConfig = serde_json::from_value(value).expect("deserialize");

        assert!(restored.data_markers.is_none());
    }

    #[rstest]
    fn data_marker_class_all_has_no_duplicates() {
        let unique: HashSet<DataMarkerClass> = DataMarkerClass::ALL.iter().copied().collect();

        assert_eq!(unique.len(), DataMarkerClass::ALL.len());
    }
}