nautilus_system/event_store.rs
1// -------------------------------------------------------------------------------------------------
2// Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3// https://nautechsystems.io
4//
5// Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6// You may not use this file except in compliance with the License.
7// You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Kernel-facing seam for run-lifecycle event-sourcing.
17//!
18//! The [`KernelEventStore`] trait is the surface [`crate::kernel::NautilusKernel`] uses to wire
19//! a durable event-sourcing session into its boot, snapshot, and seal flow. The concrete
20//! implementation lives in `nautilus-event-store` so that crate can be developed and versioned
21//! independently of `nautilus-system`; callers inject an implementation through the builder
22//! (see [`crate::builder::NautilusKernelBuilder::with_event_store`]).
23
24use std::{cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc, time::Duration};
25
26use indexmap::IndexMap;
27use nautilus_common::{cache::Cache, clock::Clock, enums::Environment};
28use nautilus_core::{UUID4, UnixNanos};
29use nautilus_execution::engine::SnapshotAnchorer;
30use serde::{Deserialize, Serialize};
31
/// Factory closure invoked by the kernel to construct an injected event-store implementation.
///
/// The closure is `FnOnce`, so it can be called at most once. It receives the kernel's
/// instance id and a shared handle to the kernel's clock so the resulting
/// [`KernelEventStore`] implementation scans the same on-disk run directory the kernel later
/// passes to `restore_parent_cache`/`open`, and stamps lifecycle timestamps against the same
/// time source the kernel uses.
///
/// The clock is handed over as `Rc<RefCell<dyn Clock>>` and the closure carries no `Send`
/// bound, so the factory is not required to be transferable across threads.
///
/// # Errors
///
/// The closure returns an error when the implementation cannot be constructed.
pub type EventStoreFactory = Box<
    dyn FnOnce(UUID4, Rc<RefCell<dyn Clock>>) -> anyhow::Result<Box<dyn KernelEventStore>>
        + 'static,
>;
42
/// The component manifest captured into the event-store `RunStarted` entry.
///
/// Replay binds actors, strategies, algorithms, subscriptions, and command endpoints from
/// this manifest without consulting external configuration.
///
/// The derived `Default` yields an empty manifest. The three id maps are `IndexMap`s, which
/// iterate in insertion order.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegisteredComponents {
    /// Registered actor ids and their config hashes.
    pub actors: IndexMap<String, String>,
    /// Registered strategy ids and their config hashes.
    pub strategies: IndexMap<String, String>,
    /// Registered algorithm ids and their config hashes.
    pub algorithms: IndexMap<String, String>,
    /// Subscription bindings active at run start, stored as plain strings.
    pub subscriptions: Vec<String>,
    /// Endpoint registrations active at run start, stored as plain strings.
    pub endpoints: Vec<String>,
}
60
/// Kernel-facing seam for event-sourcing lifecycle integration.
///
/// `NautilusKernel` drives the open/restore/seal sequence through this trait so the concrete
/// event-store machinery (writers, readers, bus tap, redb backend) lives outside
/// `nautilus-system`. Implementations are typically built by the caller and injected via
/// [`crate::builder::NautilusKernelBuilder::with_event_store`].
///
/// Every implementation must also implement [`Debug`] (supertrait bound).
pub trait KernelEventStore: Debug {
    /// Restores cache state from a configured replay source or recovered parent run.
    ///
    /// Implementations may open a sealed replay source, validate its snapshot anchor, and
    /// replay the tail directly into `cache`. The kernel calls this once before [`Self::open`].
    ///
    /// # Errors
    ///
    /// Returns an error when the source reader, snapshot restore, decode, or cache apply
    /// step fails.
    fn restore_parent_cache(&mut self, instance_id: UUID4, cache: &mut Cache)
    -> anyhow::Result<()>;

    /// Opens a fresh run for the current kernel session.
    ///
    /// `components` carries the registered manifest written to the run's `RunStarted` entry.
    /// `environment` selects the clock source the implementation uses to stamp publish
    /// timestamps. Idempotency across reset/rerun is the implementation's responsibility.
    ///
    /// # Errors
    ///
    /// Returns an error when opening the new run, spawning the writer, or blocking on the
    /// initial entry ack fails.
    fn open(
        &mut self,
        instance_id: UUID4,
        components: &RegisteredComponents,
        environment: Environment,
    ) -> anyhow::Result<()>;

    /// Returns a snapshot anchorer for the currently open run, when capture is active.
    ///
    /// The execution engine installs the returned callback so position snapshots commit a
    /// matching anchor entry against the durable high-watermark. Returns `None` when no
    /// anchorer is available.
    fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer>;

    /// Seals the open run by writing the terminal entry and updating the manifest.
    ///
    /// Idempotent: a closed or absent session is a no-op. Halted sessions defer the seal to
    /// the next-boot recovery sweep.
    ///
    /// The method returns no value, so a failed seal cannot be reported to the caller
    /// through it.
    ///
    /// NOTE(review): `ts_init` is presumably the timestamp recorded on the terminal entry;
    /// confirm against the concrete implementation.
    fn seal(&mut self, ts_init: UnixNanos);

    /// Returns the run id of the currently open run, when capture is active.
    fn run_id(&self) -> Option<&str>;

    /// Returns the configured replay source or recovered parent run id, when present.
    fn parent_run_id(&self) -> Option<&str>;

    /// Returns whether the current config enables event-store replay.
    ///
    /// Event-store replay restores cache state and opens a child run for inspection. The kernel
    /// promotes this config state to runtime state only after restore and open both succeed.
    ///
    /// The provided default returns `false`; implementations that support replay override it.
    fn is_event_store_replay_configured(&self) -> bool {
        false
    }

    /// Returns whether the implementation has signaled a fail-stop condition.
    fn is_halted(&self) -> bool;
}
126
/// How the supervisor prunes sealed run files.
///
/// The kernel records the choice in the manifest's `feature_flags`; actual retention
/// enforcement is performed by a separate supervisor process and is out of scope for
/// the kernel boot path.
///
/// [`RetentionMode::Full`] is the default variant.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RetentionMode {
    /// Keep every sealed run; never reclaim.
    #[default]
    Full,
    /// Keep at most `keep_last` sealed runs; the supervisor reclaims older files.
    Bounded {
        /// The number of sealed runs to retain.
        keep_last: usize,
    },
    /// Keep the manifest plus a snapshot anchor and the tail since the anchor; older
    /// entries are reclaimed once a newer anchor is durable.
    SnapshotAnchored,
}
146
/// Per-run identification data the kernel populates from build metadata.
///
/// The kernel records what is available at run start; downstream binaries refine these
/// values when their build-time wiring populates them. Defaults are placeholders so the
/// kernel can boot before the binary-hash and crate-versions wiring is finalized.
///
/// The derived `Default` yields empty strings and collections, a `schema_version` of `0`,
/// and no seed.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunIdentity {
    /// A hex-encoded hash of the trader binary.
    pub binary_hash: String,
    /// The entry payload schema version.
    pub schema_version: u32,
    /// A hex-encoded hash of `Cargo.lock` or an equivalent crate version manifest.
    pub crate_versions: String,
    /// The active Cargo features for the trader binary.
    pub feature_flags: Vec<String>,
    /// Per-adapter version stamp keyed by adapter name.
    pub adapter_versions: IndexMap<String, String>,
    /// A hex-encoded hash of the kernel configuration.
    pub config_hash: String,
    /// The deterministic seed, populated when the run executes under a seeded mode;
    /// `None` otherwise.
    pub seed: Option<u64>,
}
169
/// The id of a captured run: `<start_ts_init>-<short_uuid>`, sortable by start time.
///
/// The runtime constructs this from the kernel's start timestamp plus a fresh `UUID4` so
/// the representation stays stable across processes and platforms.
///
/// This is a plain `String` alias: the format above is a convention and is not enforced
/// by the type.
pub type RunId = String;
175
/// Default maximum interval between data-marker cursor snapshots when no entry boundary occurs.
///
/// Supplies `DataMarkerConfig::safety_flush_interval` both in `DataMarkerConfig::default()`
/// and when the field is omitted on deserialization.
pub const DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Default capacity of the data-marker writer's bounded submit channel.
///
/// Supplies `DataMarkerConfig::channel_capacity` both in `DataMarkerConfig::default()`
/// and when the field is omitted on deserialization.
pub const DEFAULT_DATA_MARKER_CHANNEL_CAPACITY: usize = 10_000;
180
/// Market-data class enabled for data-marker capture.
///
/// The full set of variants is also exposed as `DataMarkerClass::ALL`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataMarkerClass {
    /// Order-book delta stream.
    BookDeltas,
    /// Level-10 order-book snapshot stream.
    BookDepth10,
    /// Quote (level-1 bid/ask) stream.
    Quote,
    /// Trade (last sale) stream.
    Trade,
    /// Bar (OHLCV aggregate) stream.
    Bar,
}
195
impl DataMarkerClass {
    /// All builtin data-marker classes in canonical order.
    ///
    /// The order matches the variant declaration order of the enum. The list is maintained
    /// by hand: a newly added variant must be appended here (and the array length bumped),
    /// because nothing forces this constant to stay exhaustive.
    pub const ALL: [Self; 5] = [
        Self::BookDeltas,
        Self::BookDepth10,
        Self::Quote,
        Self::Trade,
        Self::Bar,
    ];
}
206
/// Opt-in data-marker sidecar settings for an event-store run.
///
/// Every field carries a serde default, so any subset of fields may be omitted when
/// deserializing. An input with all fields omitted produces the same value as
/// `DataMarkerConfig::default()`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataMarkerConfig {
    /// Market-data classes captured into marker cursors.
    ///
    /// Defaults to every class in `DataMarkerClass::ALL`.
    #[serde(default = "default_data_marker_classes")]
    pub classes: Vec<DataMarkerClass>,
    /// Maximum interval between cursor snapshots when data advances without entry submissions.
    ///
    /// Defaults to `DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL`.
    #[serde(default = "default_data_marker_safety_flush_interval")]
    pub safety_flush_interval: Duration,
    /// Capacity of the marker writer's bounded submit channel.
    ///
    /// Defaults to `DEFAULT_DATA_MARKER_CHANNEL_CAPACITY`.
    #[serde(default = "default_data_marker_channel_capacity")]
    pub channel_capacity: usize,
    /// Instrument identifiers that emit one high-fidelity marker per observed data message.
    ///
    /// Defaults to an empty list.
    #[serde(default)]
    pub high_fidelity: Vec<String>,
}
223
224impl Default for DataMarkerConfig {
225 fn default() -> Self {
226 Self {
227 classes: default_data_marker_classes(),
228 safety_flush_interval: DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL,
229 channel_capacity: DEFAULT_DATA_MARKER_CHANNEL_CAPACITY,
230 high_fidelity: Vec::new(),
231 }
232 }
233}
234
235fn default_data_marker_classes() -> Vec<DataMarkerClass> {
236 DataMarkerClass::ALL.to_vec()
237}
238
/// Serde default provider for `DataMarkerConfig::safety_flush_interval`.
///
/// Returns [`DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL`]. A function is needed because
/// `#[serde(default = "...")]` names a function path, not a constant.
const fn default_data_marker_safety_flush_interval() -> Duration {
    DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL
}
242
/// Serde default provider for `DataMarkerConfig::channel_capacity`.
///
/// Returns [`DEFAULT_DATA_MARKER_CHANNEL_CAPACITY`]. A function is needed because
/// `#[serde(default = "...")]` names a function path, not a constant.
const fn default_data_marker_channel_capacity() -> usize {
    DEFAULT_DATA_MARKER_CHANNEL_CAPACITY
}
246
247/// Configuration for the kernel-managed event store run lifecycle.
248#[derive(Clone, Debug, Serialize, Deserialize)]
249pub struct EventStoreConfig {
250 /// Root directory; the backend creates `<base_dir>/<instance_id>/<run_id>.redb`.
251 pub base_dir: PathBuf,
252 /// Stable identification for this trader instance and binary.
253 pub identity: RunIdentity,
254 /// How the supervisor reclaims sealed run files.
255 pub retention: RetentionMode,
256 /// Sealed run to restore cache state from before opening a fresh run.
257 ///
258 /// When set, this enables event-store replay: the kernel restores cache state from this run,
259 /// records it as the parent link for the fresh child run, and then skips engines, clients,
260 /// trader startup, and live reconciliation. Quarantined runs are rejected.
261 pub replay_from_run_id: Option<RunId>,
262 /// Data-marker sidecar settings. `None` disables marker capture for the run.
263 #[serde(default)]
264 pub data_markers: Option<DataMarkerConfig>,
265 /// Capacity of the writer's bounded submit channel.
266 pub channel_capacity: usize,
267 /// Maximum entries collected before the writer forces a commit.
268 pub max_batch_entries: usize,
269 /// Maximum time a batch may accumulate before the writer forces a commit.
270 pub max_batch_latency: Duration,
271 /// Submit-side stall ceiling that triggers writer fail-stop.
272 pub halt_threshold: Duration,
273 /// Maximum time to wait for the `RunStarted` entry to durably commit before the
274 /// kernel surfaces an event-store boot error.
275 pub run_started_timeout: Duration,
276}
277
278impl Default for EventStoreConfig {
279 fn default() -> Self {
280 Self {
281 base_dir: PathBuf::new(),
282 identity: RunIdentity::default(),
283 retention: RetentionMode::default(),
284 replay_from_run_id: None,
285 data_markers: None,
286 channel_capacity: 10_000,
287 max_batch_entries: 100,
288 max_batch_latency: Duration::from_millis(5),
289 halt_threshold: Duration::from_millis(250),
290 run_started_timeout: Duration::from_secs(5),
291 }
292 }
293}
294
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Serializes `value` to JSON and deserializes it back into a fresh instance.
    fn json_roundtrip<T>(value: &T) -> T
    where
        T: Serialize + for<'de> Deserialize<'de>,
    {
        let encoded = serde_json::to_string(value).expect("serialize");
        serde_json::from_str(&encoded).expect("deserialize")
    }

    #[rstest]
    fn event_store_config_serde_roundtrip() {
        let original = EventStoreConfig::default();
        let decoded = json_roundtrip(&original);

        // Compared field by field so the check does not rely on `EventStoreConfig: PartialEq`.
        assert_eq!(decoded.channel_capacity, original.channel_capacity);
        assert_eq!(decoded.max_batch_entries, original.max_batch_entries);
        assert_eq!(decoded.max_batch_latency, original.max_batch_latency);
        assert_eq!(decoded.halt_threshold, original.halt_threshold);
        assert_eq!(decoded.run_started_timeout, original.run_started_timeout);
        assert_eq!(decoded.base_dir, original.base_dir);
        assert_eq!(decoded.retention, original.retention);
        assert_eq!(decoded.replay_from_run_id, original.replay_from_run_id);
        assert_eq!(decoded.identity, original.identity);
        assert_eq!(decoded.data_markers, original.data_markers);
    }

    #[rstest]
    fn data_marker_config_serde_roundtrip() {
        let markers = DataMarkerConfig {
            classes: vec![DataMarkerClass::Quote, DataMarkerClass::BookDeltas],
            safety_flush_interval: Duration::from_millis(250),
            channel_capacity: 512,
            high_fidelity: vec!["ETHUSDT-PERP.BINANCE".to_string()],
        };
        let original = EventStoreConfig {
            data_markers: Some(markers),
            ..Default::default()
        };

        let decoded = json_roundtrip(&original);

        assert_eq!(decoded.data_markers, original.data_markers);
    }

    #[rstest]
    fn retention_mode_serde_roundtrip() {
        let modes = [
            RetentionMode::Full,
            RetentionMode::Bounded { keep_last: 5 },
            RetentionMode::SnapshotAnchored,
        ];

        for mode in modes {
            assert_eq!(json_roundtrip(&mode), mode);
        }
    }

    #[rstest]
    fn event_store_config_default_values() {
        // Exhaustive destructure: adding a field to the struct forces this test to be updated.
        let EventStoreConfig {
            base_dir,
            identity,
            retention,
            replay_from_run_id,
            data_markers,
            channel_capacity,
            max_batch_entries,
            max_batch_latency,
            halt_threshold,
            run_started_timeout,
        } = EventStoreConfig::default();

        assert_eq!(channel_capacity, 10_000);
        assert_eq!(max_batch_entries, 100);
        assert_eq!(max_batch_latency, Duration::from_millis(5));
        assert_eq!(halt_threshold, Duration::from_millis(250));
        assert_eq!(run_started_timeout, Duration::from_secs(5));
        assert_eq!(base_dir, PathBuf::new());
        assert_eq!(retention, RetentionMode::Full);
        assert!(replay_from_run_id.is_none());
        assert_eq!(identity, RunIdentity::default());
        assert!(data_markers.is_none());
    }

    #[rstest]
    fn data_markers_default_is_none() {
        let EventStoreConfig { data_markers, .. } = EventStoreConfig::default();

        assert!(data_markers.is_none());
    }
}
371}