use std::{cell::RefCell, fmt::Debug, path::PathBuf, rc::Rc, time::Duration};
use indexmap::IndexMap;
use nautilus_common::{cache::Cache, clock::Clock, enums::Environment};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_execution::engine::SnapshotAnchorer;
use serde::{Deserialize, Serialize};
pub type EventStoreFactory = Box<
dyn FnOnce(UUID4, Rc<RefCell<dyn Clock>>) -> anyhow::Result<Box<dyn KernelEventStore>>
+ 'static,
>;
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegisteredComponents {
pub actors: IndexMap<String, String>,
pub strategies: IndexMap<String, String>,
pub algorithms: IndexMap<String, String>,
pub subscriptions: Vec<String>,
pub endpoints: Vec<String>,
}
pub trait KernelEventStore: Debug {
fn restore_parent_cache(&mut self, instance_id: UUID4, cache: &mut Cache)
-> anyhow::Result<()>;
fn open(
&mut self,
instance_id: UUID4,
components: &RegisteredComponents,
environment: Environment,
) -> anyhow::Result<()>;
fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer>;
fn seal(&mut self, ts_init: UnixNanos);
fn run_id(&self) -> Option<&str>;
fn parent_run_id(&self) -> Option<&str>;
fn is_event_store_replay_configured(&self) -> bool {
false
}
fn is_halted(&self) -> bool;
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RetentionMode {
#[default]
Full,
Bounded {
keep_last: usize,
},
SnapshotAnchored,
}
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RunIdentity {
pub binary_hash: String,
pub schema_version: u32,
pub crate_versions: String,
pub feature_flags: Vec<String>,
pub adapter_versions: IndexMap<String, String>,
pub config_hash: String,
pub seed: Option<u64>,
}
pub type RunId = String;
pub const DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
pub const DEFAULT_DATA_MARKER_CHANNEL_CAPACITY: usize = 10_000;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataMarkerClass {
BookDeltas,
BookDepth10,
Quote,
Trade,
Bar,
}
impl DataMarkerClass {
pub const ALL: [Self; 5] = [
Self::BookDeltas,
Self::BookDepth10,
Self::Quote,
Self::Trade,
Self::Bar,
];
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct DataMarkerConfig {
#[serde(default = "default_data_marker_classes")]
pub classes: Vec<DataMarkerClass>,
#[serde(default = "default_data_marker_safety_flush_interval")]
pub safety_flush_interval: Duration,
#[serde(default = "default_data_marker_channel_capacity")]
pub channel_capacity: usize,
#[serde(default)]
pub high_fidelity: Vec<String>,
}
impl Default for DataMarkerConfig {
fn default() -> Self {
Self {
classes: default_data_marker_classes(),
safety_flush_interval: DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL,
channel_capacity: DEFAULT_DATA_MARKER_CHANNEL_CAPACITY,
high_fidelity: Vec::new(),
}
}
}
fn default_data_marker_classes() -> Vec<DataMarkerClass> {
DataMarkerClass::ALL.to_vec()
}
const fn default_data_marker_safety_flush_interval() -> Duration {
DEFAULT_DATA_MARKER_SAFETY_FLUSH_INTERVAL
}
const fn default_data_marker_channel_capacity() -> usize {
DEFAULT_DATA_MARKER_CHANNEL_CAPACITY
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EventStoreConfig {
pub base_dir: PathBuf,
pub identity: RunIdentity,
pub retention: RetentionMode,
pub replay_from_run_id: Option<RunId>,
#[serde(default)]
pub data_markers: Option<DataMarkerConfig>,
pub channel_capacity: usize,
pub max_batch_entries: usize,
pub max_batch_latency: Duration,
pub halt_threshold: Duration,
pub run_started_timeout: Duration,
}
impl Default for EventStoreConfig {
fn default() -> Self {
Self {
base_dir: PathBuf::new(),
identity: RunIdentity::default(),
retention: RetentionMode::default(),
replay_from_run_id: None,
data_markers: None,
channel_capacity: 10_000,
max_batch_entries: 100,
max_batch_latency: Duration::from_millis(5),
halt_threshold: Duration::from_millis(250),
run_started_timeout: Duration::from_secs(5),
}
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
#[rstest]
fn event_store_config_serde_roundtrip() {
let config = EventStoreConfig::default();
let json = serde_json::to_string(&config).expect("serialize");
let restored: EventStoreConfig = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored.channel_capacity, config.channel_capacity);
assert_eq!(restored.max_batch_entries, config.max_batch_entries);
assert_eq!(restored.max_batch_latency, config.max_batch_latency);
assert_eq!(restored.halt_threshold, config.halt_threshold);
assert_eq!(restored.run_started_timeout, config.run_started_timeout);
assert_eq!(restored.base_dir, config.base_dir);
assert_eq!(restored.retention, config.retention);
assert_eq!(restored.replay_from_run_id, config.replay_from_run_id);
assert_eq!(restored.identity, config.identity);
assert_eq!(restored.data_markers, config.data_markers);
}
#[rstest]
fn data_marker_config_serde_roundtrip() {
let config = EventStoreConfig {
data_markers: Some(DataMarkerConfig {
classes: vec![DataMarkerClass::Quote, DataMarkerClass::BookDeltas],
safety_flush_interval: Duration::from_millis(250),
channel_capacity: 512,
high_fidelity: vec!["ETHUSDT-PERP.BINANCE".to_string()],
}),
..Default::default()
};
let json = serde_json::to_string(&config).expect("serialize");
let restored: EventStoreConfig = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored.data_markers, config.data_markers);
}
#[rstest]
fn retention_mode_serde_roundtrip() {
for mode in [
RetentionMode::Full,
RetentionMode::Bounded { keep_last: 5 },
RetentionMode::SnapshotAnchored,
] {
let json = serde_json::to_string(&mode).expect("serialize");
let restored: RetentionMode = serde_json::from_str(&json).expect("deserialize");
assert_eq!(restored, mode);
}
}
#[rstest]
fn event_store_config_default_values() {
let config = EventStoreConfig::default();
assert_eq!(config.channel_capacity, 10_000);
assert_eq!(config.max_batch_entries, 100);
assert_eq!(config.max_batch_latency, Duration::from_millis(5));
assert_eq!(config.halt_threshold, Duration::from_millis(250));
assert_eq!(config.run_started_timeout, Duration::from_secs(5));
assert_eq!(config.base_dir, PathBuf::new());
assert_eq!(config.retention, RetentionMode::Full);
assert!(config.replay_from_run_id.is_none());
assert_eq!(config.identity, RunIdentity::default());
assert!(config.data_markers.is_none());
}
#[rstest]
fn data_markers_default_is_none() {
let config = EventStoreConfig::default();
assert!(config.data_markers.is_none());
}
}