use std::{collections::HashMap, str::FromStr, time::Duration};
use ahash::AHashMap;
use nautilus_common::{
cache::CacheConfig, enums::Environment, logging::logger::LoggerConfig,
msgbus::database::MessageBusConfig, throttler::RateLimit,
};
use nautilus_core::{UUID4, datetime::NANOSECONDS_IN_SECOND};
use nautilus_data::engine::config::DataEngineConfig;
use nautilus_execution::{
engine::config::ExecutionEngineConfig, order_emulator::config::OrderEmulatorConfig,
};
use nautilus_model::{
enums::{BarAggregation, BarIntervalType},
identifiers::{ClientId, ClientOrderId, InstrumentId, TraderId},
};
use nautilus_portfolio::config::PortfolioConfig;
use nautilus_risk::engine::config::RiskEngineConfig;
use nautilus_system::config::{NautilusKernelConfig, StreamingConfig};
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
/// Default order rate limit in the `limit/HH:MM:SS` form accepted by `parse_rate_limit`:
/// a limit of 100 per one-second interval.
///
/// Used as the builder default for both `LiveRiskEngineConfig::max_order_submit_rate` and
/// `LiveRiskEngineConfig::max_order_modify_rate`.
const DEFAULT_ORDER_RATE_LIMIT: &str = "100/00:00:01";
/// Configuration for the live data engine.
///
/// With `#[serde(default, deny_unknown_fields)]`, fields missing from the input take their
/// [`Default`] values and unrecognized keys are a deserialization error.
///
/// Converts into [`DataEngineConfig`] through the `From` impl in this file. That conversion
/// does not carry `graceful_shutdown_on_error` or `qsize`, and `validate_runtime_support`
/// rejects any non-default value for those two fields.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveDataEngineConfig {
/// Builder default: `true`. Copied as-is into `DataEngineConfig`.
/// NOTE(review): presumably controls whether time bars are still built when an interval
/// had no updates — confirm against the data engine.
#[builder(default = true)]
pub time_bars_build_with_no_updates: bool,
/// Builder default: `true`. Copied as-is into `DataEngineConfig`.
#[builder(default = true)]
pub time_bars_timestamp_on_close: bool,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub time_bars_skip_first_non_full_bar: bool,
/// Builder default: `BarIntervalType::LeftOpen`. Copied as-is into `DataEngineConfig`.
#[builder(default = BarIntervalType::LeftOpen)]
pub time_bars_interval_type: BarIntervalType,
/// Builder default: `0`. Copied as-is into `DataEngineConfig`.
/// NOTE(review): the unit of this delay is not visible in this file — TODO confirm.
#[builder(default)]
pub time_bars_build_delay: u64,
/// Builder default: empty map.
///
/// Keys are bar aggregation names parsed with `BarAggregation::from_str`; values are
/// nanoseconds, converted with `Duration::from_nanos` when building `DataEngineConfig`.
/// Keys are checked by `validate_runtime_support`.
#[builder(default)]
pub time_bars_origin_offset: HashMap<String, u64>,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub validate_data_sequence: bool,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub buffer_deltas: bool,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub emit_quotes_from_book: bool,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub emit_quotes_from_book_depths: bool,
/// Optional list of client IDs, `None` unless set. Copied as-is into `DataEngineConfig`.
/// NOTE(review): presumably clients managed outside this node — confirm.
pub external_clients: Option<Vec<ClientId>>,
/// Builder default: `false`. Copied as-is into `DataEngineConfig`.
#[builder(default)]
pub debug: bool,
/// Builder default: `false`. Any other value is rejected by `validate_runtime_support`.
#[builder(default)]
pub graceful_shutdown_on_error: bool,
/// Builder default: `100_000`. Any other value is rejected by `validate_runtime_support`.
#[builder(default = 100_000)]
pub qsize: u32,
}
impl Default for LiveDataEngineConfig {
/// Returns the configuration produced by the generated builder with no setters called,
/// so every field takes its `#[builder(default ...)]` value and `external_clients` is `None`.
///
/// Because the struct is `#[serde(default)]`, these are also the values used for fields
/// missing from deserialized input.
fn default() -> Self {
Self::builder().build()
}
}
impl From<LiveDataEngineConfig> for DataEngineConfig {
fn from(config: LiveDataEngineConfig) -> Self {
let time_bars_origin_offset = config
.time_bars_origin_offset
.into_iter()
.map(|(agg, nanos)| {
let agg = BarAggregation::from_str(&agg)
.expect("validate_runtime_support must run before DataEngineConfig conversion");
(agg, Duration::from_nanos(nanos))
})
.collect();
Self {
time_bars_build_with_no_updates: config.time_bars_build_with_no_updates,
time_bars_timestamp_on_close: config.time_bars_timestamp_on_close,
time_bars_skip_first_non_full_bar: config.time_bars_skip_first_non_full_bar,
time_bars_interval_type: config.time_bars_interval_type,
time_bars_build_delay: config.time_bars_build_delay,
time_bars_origin_offset,
validate_data_sequence: config.validate_data_sequence,
buffer_deltas: config.buffer_deltas,
emit_quotes_from_book: config.emit_quotes_from_book,
emit_quotes_from_book_depths: config.emit_quotes_from_book_depths,
disable_historical_cache: false,
external_clients: config.external_clients,
debug: config.debug,
}
}
}
/// Configuration for the live risk engine.
///
/// With `#[serde(default, deny_unknown_fields)]`, fields missing from the input take their
/// [`Default`] values and unrecognized keys are a deserialization error.
///
/// Rate limits and notionals are held as strings here and parsed when converting into
/// [`RiskEngineConfig`]; `validate_runtime_support` performs the same parsing up front so
/// malformed values surface as errors rather than panics.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveRiskEngineConfig {
/// Builder default: `false`. Copied as-is into `RiskEngineConfig`.
/// NOTE(review): presumably disables risk checks when set — confirm against the risk engine.
#[builder(default)]
pub bypass: bool,
/// Rate limit string in `limit/HH:MM:SS` form, parsed by `parse_rate_limit`.
/// Builder default: `DEFAULT_ORDER_RATE_LIMIT`.
#[builder(default = DEFAULT_ORDER_RATE_LIMIT.to_string())]
pub max_order_submit_rate: String,
/// Rate limit string in `limit/HH:MM:SS` form, parsed by `parse_rate_limit`.
/// Builder default: `DEFAULT_ORDER_RATE_LIMIT`.
#[builder(default = DEFAULT_ORDER_RATE_LIMIT.to_string())]
pub max_order_modify_rate: String,
/// Builder default: empty map.
///
/// Keys are instrument ID strings parsed with `InstrumentId::from_str`; values are decimal
/// strings parsed with `Decimal::from_str`.
#[builder(default)]
pub max_notional_per_order: HashMap<String, String>,
/// Builder default: `false`. Copied as-is into `RiskEngineConfig`.
#[builder(default)]
pub debug: bool,
/// Builder default: `false`. Any other value is rejected by `validate_runtime_support`.
#[builder(default)]
pub graceful_shutdown_on_error: bool,
/// Builder default: `100_000`. Any other value is rejected by `validate_runtime_support`.
#[builder(default = 100_000)]
pub qsize: u32,
}
impl Default for LiveRiskEngineConfig {
/// Returns the configuration produced by the generated builder with no setters called,
/// so every field takes its `#[builder(default ...)]` value.
///
/// Because the struct is `#[serde(default)]`, these are also the values used for fields
/// missing from deserialized input.
fn default() -> Self {
Self::builder().build()
}
}
impl From<LiveRiskEngineConfig> for RiskEngineConfig {
    /// Builds the core [`RiskEngineConfig`] from its live counterpart.
    ///
    /// `bypass` and `debug` are copied unchanged. The string-typed settings are parsed:
    /// each `max_notional_per_order` entry becomes an `(InstrumentId, Decimal)` pair, and the
    /// two rate strings are turned into `RateLimit` values by `parse_rate_limit`.
    /// `graceful_shutdown_on_error` and `qsize` are not carried over.
    ///
    /// # Panics
    ///
    /// Panics if any instrument ID, notional, or rate limit string fails to parse.
    /// `validate_runtime_support` performs the same parsing and is expected to have run
    /// first, so reaching a panic here indicates a skipped validation step.
    fn from(config: LiveRiskEngineConfig) -> Self {
        let max_notional_per_order = config
            .max_notional_per_order
            .into_iter()
            .map(|(instrument_id, notional)| {
                let instrument_id = InstrumentId::from_str(&instrument_id)
                    .expect("validate_runtime_support must run before RiskEngineConfig conversion");
                // Borrow the notional string for parsing (the original text had a mangled `&`).
                let notional = Decimal::from_str(&notional)
                    .expect("validate_runtime_support must run before RiskEngineConfig conversion");
                (instrument_id, notional)
            })
            .collect::<AHashMap<_, _>>();
        Self {
            bypass: config.bypass,
            max_order_submit: parse_rate_limit(&config.max_order_submit_rate)
                .expect("validate_runtime_support must run before RiskEngineConfig conversion"),
            max_order_modify: parse_rate_limit(&config.max_order_modify_rate)
                .expect("validate_runtime_support must run before RiskEngineConfig conversion"),
            max_notional_per_order,
            debug: config.debug,
        }
    }
}
/// Parses a rate limit written as `limit/HH:MM:SS` into a [`RateLimit`].
///
/// `limit` must be a positive integer. The interval must consist of exactly three
/// `:`-separated unsigned integers (hours, minutes, seconds) and must not total zero.
/// The interval is converted to nanoseconds with saturating arithmetic, so extremely large
/// components clamp to `u64::MAX` instead of overflowing.
///
/// # Errors
///
/// Returns an error describing the first problem found, checked in this order: missing `/`,
/// non-numeric limit, zero limit, a missing or non-numeric hours/minutes/seconds component,
/// a fourth interval component, and finally a zero-length interval.
fn parse_rate_limit(input: &str) -> anyhow::Result<RateLimit> {
    // Parses one interval component; `label` names it in the error message.
    fn component(part: Option<&str>, label: &str, input: &str) -> anyhow::Result<u64> {
        let text = match part {
            Some(text) => text,
            None => anyhow::bail!("invalid rate limit '{input}': missing {label} component"),
        };
        match text.parse::<u64>() {
            Ok(value) => Ok(value),
            Err(e) => Err(anyhow::anyhow!("invalid rate limit '{input}': {label}: {e}")),
        }
    }

    let (limit_text, interval_text) = match input.split_once('/') {
        Some(halves) => halves,
        None => anyhow::bail!("invalid rate limit '{input}': expected 'limit/HH:MM:SS'"),
    };

    let limit = match limit_text.parse::<usize>() {
        Ok(0) => anyhow::bail!("invalid rate limit '{input}': limit must be greater than zero"),
        Ok(value) => value,
        Err(e) => anyhow::bail!("invalid rate limit '{input}': {e}"),
    };

    // Components are consumed strictly left to right so the first bad one is reported.
    let mut components = interval_text.split(':');
    let hours = component(components.next(), "hours", input)?;
    let minutes = component(components.next(), "minutes", input)?;
    let seconds = component(components.next(), "seconds", input)?;
    if components.next().is_some() {
        anyhow::bail!("invalid rate limit '{input}': expected 'limit/HH:MM:SS'");
    }

    let total_seconds = hours
        .saturating_mul(3_600)
        .saturating_add(minutes.saturating_mul(60))
        .saturating_add(seconds);
    let interval_ns = total_seconds.saturating_mul(NANOSECONDS_IN_SECOND);
    if interval_ns == 0 {
        anyhow::bail!("invalid rate limit '{input}': interval must be greater than zero");
    }

    Ok(RateLimit::new(limit, interval_ns))
}
/// Configuration for the live execution engine.
///
/// With `#[serde(default, deny_unknown_fields)]`, fields missing from the input take their
/// [`Default`] values and unrecognized keys are a deserialization error.
///
/// Only a subset of fields is carried into [`ExecutionEngineConfig`] by the `From` impl in
/// this file: `load_cache`, `manage_own_order_books`, `snapshot_orders`,
/// `snapshot_positions`, `snapshot_positions_interval_secs`, `allow_overfills`,
/// `external_clients`, the `purge_*` settings, and `debug`. The remaining fields are not
/// read by any code visible in this file outside `validate_runtime_support`.
///
/// Fields without a `#[builder(...)]` attribute are optional and `None` unless set.
/// Note that [`Default`] additionally sets `open_check_lookback_mins` to `Some(60)`.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveExecEngineConfig {
/// Builder default: `true`. Copied as-is into `ExecutionEngineConfig`.
#[builder(default = true)]
pub load_cache: bool,
/// Builder default: `false`. Setting it is rejected by `validate_runtime_support`.
#[builder(default)]
pub snapshot_orders: bool,
/// Builder default: `false`. Setting it is rejected by `validate_runtime_support`.
#[builder(default)]
pub snapshot_positions: bool,
/// Copied as-is into `ExecutionEngineConfig`.
pub snapshot_positions_interval_secs: Option<f64>,
/// Copied as-is into `ExecutionEngineConfig`.
pub external_clients: Option<Vec<ClientId>>,
/// Builder default: `false`. Copied as-is into `ExecutionEngineConfig`.
#[builder(default)]
pub debug: bool,
/// Builder default: `true`.
#[builder(default = true)]
pub reconciliation: bool,
/// Builder default: `10.0`. Must be finite and non-negative (checked by
/// `validate_runtime_support`).
#[builder(default = 10.0)]
pub reconciliation_startup_delay_secs: f64,
pub reconciliation_lookback_mins: Option<u32>,
/// Each entry must parse with `InstrumentId::from_str` (checked by
/// `validate_runtime_support`).
pub reconciliation_instrument_ids: Option<Vec<String>>,
/// Builder default: `false`.
#[builder(default)]
pub filter_unclaimed_external_orders: bool,
/// Builder default: `false`.
#[builder(default)]
pub filter_position_reports: bool,
/// Each entry must pass `ClientOrderId::new_checked` (checked by
/// `validate_runtime_support`).
pub filtered_client_order_ids: Option<Vec<String>>,
/// Builder default: `true`.
#[builder(default = true)]
pub generate_missing_orders: bool,
/// Builder default: `2_000`.
#[builder(default = 2_000)]
pub inflight_check_interval_ms: u32,
/// Builder default: `5_000`.
#[builder(default = 5_000)]
pub inflight_check_threshold_ms: u32,
/// Builder default: `5`.
#[builder(default = 5)]
pub inflight_check_retries: u32,
pub open_check_interval_secs: Option<f64>,
/// `None` from the builder; `Default::default()` overrides this to `Some(60)`.
pub open_check_lookback_mins: Option<u32>,
/// Builder default: `5_000`.
#[builder(default = 5_000)]
pub open_check_threshold_ms: u32,
/// Builder default: `5`.
#[builder(default = 5)]
pub open_check_missing_retries: u32,
/// Builder default: `true`.
#[builder(default = true)]
pub open_check_open_only: bool,
/// Builder default: `10`.
#[builder(default = 10)]
pub max_single_order_queries_per_cycle: u32,
/// Builder default: `100`.
#[builder(default = 100)]
pub single_order_query_delay_ms: u32,
pub position_check_interval_secs: Option<f64>,
/// Builder default: `60`.
#[builder(default = 60)]
pub position_check_lookback_mins: u32,
/// Builder default: `5_000`.
#[builder(default = 5_000)]
pub position_check_threshold_ms: u32,
/// Builder default: `3`.
#[builder(default = 3)]
pub position_check_retries: u32,
// The six purge settings below are copied as-is into `ExecutionEngineConfig`.
pub purge_closed_orders_interval_mins: Option<u32>,
pub purge_closed_orders_buffer_mins: Option<u32>,
pub purge_closed_positions_interval_mins: Option<u32>,
pub purge_closed_positions_buffer_mins: Option<u32>,
pub purge_account_events_interval_mins: Option<u32>,
pub purge_account_events_lookback_mins: Option<u32>,
/// Builder default: `false`. Setting it is rejected by `validate_runtime_support`.
#[builder(default)]
pub purge_from_database: bool,
pub own_books_audit_interval_secs: Option<f64>,
/// Builder default: `false`. Setting it is rejected by `validate_runtime_support`.
#[builder(default)]
pub graceful_shutdown_on_error: bool,
/// Builder default: `100_000`. Any other value is rejected by `validate_runtime_support`.
#[builder(default = 100_000)]
pub qsize: u32,
/// Builder default: `false`. Copied as-is into `ExecutionEngineConfig`.
#[builder(default)]
pub allow_overfills: bool,
/// Builder default: `false`. Copied as-is into `ExecutionEngineConfig`.
#[builder(default)]
pub manage_own_order_books: bool,
}
impl Default for LiveExecEngineConfig {
    /// Returns the builder's defaults with `open_check_lookback_mins` set to `Some(60)`.
    ///
    /// The builder alone leaves that optional field as `None`, so `Default::default()` and
    /// `Self::builder().build()` differ in exactly this one field.
    fn default() -> Self {
        let mut config = Self::builder().build();
        config.open_check_lookback_mins = Some(60);
        config
    }
}
impl From<LiveExecEngineConfig> for ExecutionEngineConfig {
fn from(config: LiveExecEngineConfig) -> Self {
Self {
load_cache: config.load_cache,
manage_own_order_books: config.manage_own_order_books,
snapshot_orders: config.snapshot_orders,
snapshot_positions: config.snapshot_positions,
snapshot_positions_interval_secs: config.snapshot_positions_interval_secs,
allow_overfills: config.allow_overfills,
external_clients: config.external_clients,
purge_closed_orders_interval_mins: config.purge_closed_orders_interval_mins,
purge_closed_orders_buffer_mins: config.purge_closed_orders_buffer_mins,
purge_closed_positions_interval_mins: config.purge_closed_positions_interval_mins,
purge_closed_positions_buffer_mins: config.purge_closed_positions_buffer_mins,
purge_account_events_interval_mins: config.purge_account_events_interval_mins,
purge_account_events_lookback_mins: config.purge_account_events_lookback_mins,
purge_from_database: config.purge_from_database,
debug: config.debug,
}
}
}
/// Routing settings embedded in the live data and execution client configs.
///
/// `Default` is derived, so the default value has `default == false` and `venues == None`.
/// Missing fields deserialize to those defaults and unknown fields are rejected.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct RoutingConfig {
/// Builder default: `false`.
/// NOTE(review): presumably marks the client as the fallback route — confirm with the
/// consumer of this config.
#[builder(default)]
pub default: bool,
/// Optional list of venue names; `None` unless set.
pub venues: Option<Vec<String>>,
}
/// Instrument provider settings embedded in the live data and execution client configs.
///
/// Missing fields deserialize to the [`Default`] values and unknown fields are rejected.
/// Nothing in this file interprets these values; they are plain data for the consumer.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct InstrumentProviderConfig {
/// Builder default: `false`.
#[builder(default)]
pub load_all: bool,
/// Optional list of instrument ID strings; `None` unless set.
/// NOTE(review): not validated anywhere in this file — confirm where parsing happens.
pub load_ids: Option<Vec<String>>,
/// Builder default: empty map. Values are arbitrary JSON.
#[builder(default)]
pub filters: HashMap<String, serde_json::Value>,
/// Optional string; `None` unless set.
/// NOTE(review): presumably a path or name identifying a filter callable — confirm.
pub filter_callable: Option<String>,
/// Builder default: `true`.
#[builder(default = true)]
pub log_warnings: bool,
}
impl Default for InstrumentProviderConfig {
/// Returns the configuration produced by the generated builder with no setters called.
///
/// Implemented by hand rather than derived because `log_warnings` defaults to `true`.
fn default() -> Self {
Self::builder().build()
}
}
/// Per-client configuration for a live data client.
///
/// `Default` is derived from the field defaults. Missing fields deserialize to those
/// defaults and unknown fields are rejected, including inside the nested configs.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveDataClientConfig {
/// Builder default: `false`.
#[builder(default)]
pub handle_revised_bars: bool,
/// Builder default: `InstrumentProviderConfig::default()`.
#[builder(default)]
pub instrument_provider: InstrumentProviderConfig,
/// Builder default: `RoutingConfig::default()`.
#[builder(default)]
pub routing: RoutingConfig,
}
/// Per-client configuration for a live execution client.
///
/// `Default` is derived from the field defaults. Missing fields deserialize to those
/// defaults and unknown fields are rejected.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveExecClientConfig {
/// Builder default: `InstrumentProviderConfig::default()`.
#[builder(default)]
pub instrument_provider: InstrumentProviderConfig,
/// Builder default: `RoutingConfig::default()`.
#[builder(default)]
pub routing: RoutingConfig,
}
/// Top-level configuration for a live node.
///
/// Implements [`NautilusKernelConfig`], exposing the kernel-level settings and converting
/// the three live engine configs into their core equivalents on demand.
///
/// Missing fields deserialize to the [`Default`] values and unknown fields are rejected.
/// `validate_runtime_support` currently rejects a config where `msgbus`, `streaming`, or
/// `emulator` is set, or where `loop_debug` is `true`.
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
#[derive(Debug, Clone, Serialize, Deserialize, bon::Builder)]
#[serde(default, deny_unknown_fields)]
pub struct LiveNodeConfig {
/// Builder default: `Environment::Live`.
#[builder(default = Environment::Live)]
pub environment: Environment,
/// Builder default: `TRADER-001`.
#[builder(default = TraderId::from("TRADER-001"))]
pub trader_id: TraderId,
/// Builder default: `false`.
#[builder(default)]
pub load_state: bool,
/// Builder default: `false`.
#[builder(default)]
pub save_state: bool,
/// Builder default: `LoggerConfig::default()`.
#[builder(default)]
pub logging: LoggerConfig,
/// Optional instance ID; `None` unless set.
pub instance_id: Option<UUID4>,
/// Builder default: 120 seconds.
#[builder(default = Duration::from_secs(120))]
pub timeout_connection: Duration,
/// Builder default: 30 seconds.
#[builder(default = Duration::from_secs(30))]
pub timeout_reconciliation: Duration,
/// Builder default: 10 seconds.
#[builder(default = Duration::from_secs(10))]
pub timeout_portfolio: Duration,
/// Builder default: 10 seconds.
#[builder(default = Duration::from_secs(10))]
pub timeout_disconnection: Duration,
/// Builder default: 10 seconds.
#[builder(default = Duration::from_secs(10))]
pub delay_post_stop: Duration,
/// Builder default: 5 seconds.
#[builder(default = Duration::from_secs(5))]
pub timeout_shutdown: Duration,
/// Optional cache configuration; `None` unless set.
pub cache: Option<CacheConfig>,
/// Optional message bus configuration. Setting it is rejected by `validate_runtime_support`.
pub msgbus: Option<MessageBusConfig>,
/// Optional portfolio configuration; `None` unless set.
pub portfolio: Option<PortfolioConfig>,
/// Optional order emulator configuration. Setting it is rejected by
/// `validate_runtime_support`.
pub emulator: Option<OrderEmulatorConfig>,
/// Optional streaming configuration. Setting it is rejected by `validate_runtime_support`.
pub streaming: Option<StreamingConfig>,
/// Builder default: `false`. Setting it is rejected by `validate_runtime_support`.
#[builder(default)]
pub loop_debug: bool,
/// Builder default: `LiveDataEngineConfig::default()`.
#[builder(default)]
pub data_engine: LiveDataEngineConfig,
/// Builder default: `LiveRiskEngineConfig::default()`.
#[builder(default)]
pub risk_engine: LiveRiskEngineConfig,
/// Builder default: `LiveExecEngineConfig::default()`.
#[builder(default)]
pub exec_engine: LiveExecEngineConfig,
/// Data client configs keyed by a client name string. Builder default: empty map.
#[builder(default)]
pub data_clients: HashMap<String, LiveDataClientConfig>,
/// Execution client configs keyed by a client name string. Builder default: empty map.
#[builder(default)]
pub exec_clients: HashMap<String, LiveExecClientConfig>,
}
impl Default for LiveNodeConfig {
/// Returns the configuration produced by the generated builder with no setters called.
///
/// The nested engine configs use their own `Default` impls, so `exec_engine` carries
/// `open_check_lookback_mins == Some(60)`.
fn default() -> Self {
Self::builder().build()
}
}
impl LiveNodeConfig {
    /// Checks that this configuration only uses features the Rust live runtime supports.
    ///
    /// Node-level checks run first, in the order `msgbus`, `streaming`, `emulator`,
    /// `loop_debug`; then the data, risk, and execution engine configs are validated in
    /// that order.
    ///
    /// # Errors
    ///
    /// Returns the first failure encountered: an unsupported node-level option, or the
    /// error produced by one of the engine config validations.
    pub(crate) fn validate_runtime_support(&self) -> anyhow::Result<()> {
        // (option is in use, message to report) — evaluated top to bottom.
        let unsupported = [
            (
                self.msgbus.is_some(),
                "LiveNodeConfig.msgbus is not supported by the Rust live runtime yet",
            ),
            (
                self.streaming.is_some(),
                "LiveNodeConfig.streaming is not supported by the Rust live runtime yet",
            ),
            (
                self.emulator.is_some(),
                "LiveNodeConfig.emulator is not supported by the Rust live runtime yet",
            ),
            (
                self.loop_debug,
                "LiveNodeConfig.loop_debug is not supported by the Rust live runtime yet",
            ),
        ];
        for &(in_use, message) in &unsupported {
            if in_use {
                anyhow::bail!("{message}");
            }
        }

        self.data_engine.validate_runtime_support()?;
        self.risk_engine.validate_runtime_support()?;
        self.exec_engine.validate_runtime_support()
    }
}
impl LiveDataEngineConfig {
    /// Checks that this data engine config can be used by the Rust live runtime.
    ///
    /// Every `time_bars_origin_offset` key must parse as a [`BarAggregation`], which is what
    /// makes the later conversion into `DataEngineConfig` panic-free. After that,
    /// `graceful_shutdown_on_error` and `qsize` must still hold their default values.
    ///
    /// # Errors
    ///
    /// Returns an error for the first invalid key found (map iteration order is
    /// unspecified), otherwise for the first unsupported non-default setting.
    fn validate_runtime_support(&self) -> anyhow::Result<()> {
        for agg_str in self.time_bars_origin_offset.keys() {
            if let Err(e) = BarAggregation::from_str(agg_str) {
                anyhow::bail!(
                    "invalid LiveDataEngineConfig.time_bars_origin_offset key {agg_str:?}: {e}"
                );
            }
        }

        let baseline = Self::default();
        let unsupported = [
            (
                self.graceful_shutdown_on_error != baseline.graceful_shutdown_on_error,
                "LiveDataEngineConfig.graceful_shutdown_on_error is not supported by the Rust live runtime yet",
            ),
            (
                self.qsize != baseline.qsize,
                "LiveDataEngineConfig.qsize is not supported by the Rust live runtime yet",
            ),
        ];
        for &(changed, message) in &unsupported {
            if changed {
                anyhow::bail!("{message}");
            }
        }
        Ok(())
    }
}
impl LiveRiskEngineConfig {
    /// Checks that this risk engine config can be used by the Rust live runtime.
    ///
    /// Performs the same parsing the conversion into `RiskEngineConfig` relies on: both
    /// rate limit strings via `parse_rate_limit`, and each `max_notional_per_order` entry
    /// as an [`InstrumentId`] key with a [`Decimal`] value. After that,
    /// `graceful_shutdown_on_error` and `qsize` must still hold their default values.
    ///
    /// # Errors
    ///
    /// Returns the first failure in this order: submit rate, modify rate, notional entries
    /// (map iteration order is unspecified), `graceful_shutdown_on_error`, `qsize`.
    fn validate_runtime_support(&self) -> anyhow::Result<()> {
        if let Err(e) = parse_rate_limit(&self.max_order_submit_rate) {
            anyhow::bail!("invalid LiveRiskEngineConfig.max_order_submit_rate: {e}");
        }
        if let Err(e) = parse_rate_limit(&self.max_order_modify_rate) {
            anyhow::bail!("invalid LiveRiskEngineConfig.max_order_modify_rate: {e}");
        }

        for (instrument_id, notional) in &self.max_notional_per_order {
            if let Err(e) = InstrumentId::from_str(instrument_id) {
                anyhow::bail!(
                    "invalid LiveRiskEngineConfig.max_notional_per_order instrument ID {instrument_id:?}: {e}"
                );
            }
            if let Err(e) = Decimal::from_str(notional) {
                anyhow::bail!(
                    "invalid LiveRiskEngineConfig.max_notional_per_order notional {notional:?}: {e}"
                );
            }
        }

        let baseline = Self::default();
        let unsupported = [
            (
                self.graceful_shutdown_on_error != baseline.graceful_shutdown_on_error,
                "LiveRiskEngineConfig.graceful_shutdown_on_error is not supported by the Rust live runtime yet",
            ),
            (
                self.qsize != baseline.qsize,
                "LiveRiskEngineConfig.qsize is not supported by the Rust live runtime yet",
            ),
        ];
        for &(changed, message) in &unsupported {
            if changed {
                anyhow::bail!("{message}");
            }
        }
        Ok(())
    }
}
impl LiveExecEngineConfig {
    /// Checks that this execution engine config can be used by the Rust live runtime.
    ///
    /// Validates, in order:
    /// 1. `reconciliation_startup_delay_secs` is finite and not negative;
    /// 2. every `reconciliation_instrument_ids` entry parses as an [`InstrumentId`];
    /// 3. every `filtered_client_order_ids` entry is accepted by
    ///    `ClientOrderId::new_checked`;
    /// 4. `snapshot_orders`, `snapshot_positions`, `purge_from_database`,
    ///    `graceful_shutdown_on_error`, and `qsize` still hold their default values.
    ///
    /// # Errors
    ///
    /// Returns an error describing the first check that fails.
    fn validate_runtime_support(&self) -> anyhow::Result<()> {
        let startup_delay = self.reconciliation_startup_delay_secs;
        // NaN and both infinities fail `is_finite`; `-0.0` compares equal to zero and passes.
        let delay_is_valid = startup_delay.is_finite() && startup_delay >= 0.0;
        if !delay_is_valid {
            anyhow::bail!(
                "invalid LiveExecEngineConfig.reconciliation_startup_delay_secs: {} (must be a non-negative finite number)",
                self.reconciliation_startup_delay_secs
            );
        }

        // `Option::iter().flatten()` walks the inner list, or nothing when it is `None`.
        for instrument_id in self.reconciliation_instrument_ids.iter().flatten() {
            if let Err(e) = InstrumentId::from_str(instrument_id) {
                anyhow::bail!(
                    "invalid LiveExecEngineConfig.reconciliation_instrument_ids entry {instrument_id:?}: {e}"
                );
            }
        }
        for client_order_id in self.filtered_client_order_ids.iter().flatten() {
            if let Err(e) = ClientOrderId::new_checked(client_order_id) {
                anyhow::bail!(
                    "invalid LiveExecEngineConfig.filtered_client_order_ids entry {client_order_id:?}: {e}"
                );
            }
        }

        let baseline = Self::default();
        let unsupported = [
            (
                self.snapshot_orders != baseline.snapshot_orders,
                "LiveExecEngineConfig.snapshot_orders is not supported by the Rust live runtime yet",
            ),
            (
                self.snapshot_positions != baseline.snapshot_positions,
                "LiveExecEngineConfig.snapshot_positions is not supported by the Rust live runtime yet",
            ),
            (
                self.purge_from_database != baseline.purge_from_database,
                "LiveExecEngineConfig.purge_from_database is not supported by the Rust live runtime yet",
            ),
            (
                self.graceful_shutdown_on_error != baseline.graceful_shutdown_on_error,
                "LiveExecEngineConfig.graceful_shutdown_on_error is not supported by the Rust live runtime yet",
            ),
            (
                self.qsize != baseline.qsize,
                "LiveExecEngineConfig.qsize is not supported by the Rust live runtime yet",
            ),
        ];
        for &(changed, message) in &unsupported {
            if changed {
                anyhow::bail!("{message}");
            }
        }
        Ok(())
    }
}
impl NautilusKernelConfig for LiveNodeConfig {
fn environment(&self) -> Environment {
self.environment
}
fn trader_id(&self) -> TraderId {
self.trader_id
}
fn load_state(&self) -> bool {
self.load_state
}
fn save_state(&self) -> bool {
self.save_state
}
fn logging(&self) -> LoggerConfig {
self.logging.clone()
}
fn instance_id(&self) -> Option<UUID4> {
self.instance_id
}
fn timeout_connection(&self) -> Duration {
self.timeout_connection
}
fn timeout_reconciliation(&self) -> Duration {
self.timeout_reconciliation
}
fn timeout_portfolio(&self) -> Duration {
self.timeout_portfolio
}
fn timeout_disconnection(&self) -> Duration {
self.timeout_disconnection
}
fn delay_post_stop(&self) -> Duration {
self.delay_post_stop
}
fn timeout_shutdown(&self) -> Duration {
self.timeout_shutdown
}
fn cache(&self) -> Option<CacheConfig> {
self.cache.clone()
}
fn msgbus(&self) -> Option<MessageBusConfig> {
self.msgbus.clone()
}
fn data_engine(&self) -> Option<DataEngineConfig> {
Some(self.data_engine.clone().into())
}
fn risk_engine(&self) -> Option<RiskEngineConfig> {
Some(self.risk_engine.clone().into())
}
fn exec_engine(&self) -> Option<ExecutionEngineConfig> {
Some(self.exec_engine.clone().into())
}
fn portfolio(&self) -> Option<PortfolioConfig> {
self.portfolio
}
fn streaming(&self) -> Option<StreamingConfig> {
self.streaming.clone()
}
}
// Unit tests for the live node configuration types: default values, conversion into the
// core engine configs, `validate_runtime_support` acceptance/rejection, `parse_rate_limit`,
// and serde behavior (unknown-field rejection and TOML loading).
#[cfg(test)]
mod tests {
use nautilus_system::config::RotationConfig;
use rstest::rstest;
use super::*;
// --- LiveNodeConfig defaults and kernel-config accessors ---
#[rstest]
fn test_trading_node_config_default() {
let config = LiveNodeConfig::default();
assert_eq!(config.environment, Environment::Live);
assert_eq!(config.trader_id, TraderId::from("TRADER-001"));
assert_eq!(config.data_engine.qsize, 100_000);
assert_eq!(config.risk_engine.qsize, 100_000);
assert_eq!(config.exec_engine.qsize, 100_000);
assert!(config.exec_engine.reconciliation);
assert!(!config.exec_engine.filter_unclaimed_external_orders);
assert!(config.data_clients.is_empty());
assert!(config.exec_clients.is_empty());
}
#[rstest]
fn test_trading_node_config_as_kernel_config() {
let config = LiveNodeConfig::default();
assert_eq!(config.environment(), Environment::Live);
assert_eq!(config.trader_id(), TraderId::from("TRADER-001"));
assert!(config.data_engine().is_some());
assert!(config.risk_engine().is_some());
assert!(config.exec_engine().is_some());
assert!(!config.load_state());
assert!(!config.save_state());
}
// --- validate_runtime_support: defaults pass, unsupported options are rejected ---
#[rstest]
fn test_validate_runtime_support_with_defaults() {
let config = LiveNodeConfig::default();
assert!(config.validate_runtime_support().is_ok());
}
#[rstest]
fn test_validate_runtime_support_rejects_msgbus_config() {
let config = LiveNodeConfig {
msgbus: Some(MessageBusConfig::default()),
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveNodeConfig.msgbus is not supported by the Rust live runtime yet"
);
}
#[rstest]
fn test_validate_runtime_support_rejects_streaming_config() {
let config = LiveNodeConfig {
streaming: Some(StreamingConfig::new(
"catalog".to_string(),
"file".to_string(),
1_000,
false,
RotationConfig::NoRotation,
)),
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveNodeConfig.streaming is not supported by the Rust live runtime yet"
);
}
#[rstest]
fn test_validate_runtime_support_rejects_data_engine_qsize() {
let config = LiveNodeConfig {
data_engine: LiveDataEngineConfig {
qsize: 1,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveDataEngineConfig.qsize is not supported by the Rust live runtime yet"
);
}
#[rstest]
fn test_validate_runtime_support_rejects_risk_engine_qsize() {
let config = LiveNodeConfig {
risk_engine: LiveRiskEngineConfig {
qsize: 1,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveRiskEngineConfig.qsize is not supported by the Rust live runtime yet"
);
}
// --- Conversions from live configs into the core engine configs ---
#[rstest]
fn test_live_data_engine_config_converts_to_data_engine_config() {
let config = LiveDataEngineConfig {
time_bars_build_with_no_updates: false,
time_bars_timestamp_on_close: false,
time_bars_skip_first_non_full_bar: true,
time_bars_interval_type: BarIntervalType::RightOpen,
time_bars_build_delay: 1_500,
validate_data_sequence: true,
buffer_deltas: true,
external_clients: Some(vec![ClientId::from("EXTERNAL")]),
debug: true,
..Default::default()
};
let converted: DataEngineConfig = config.into();
assert!(!converted.time_bars_build_with_no_updates);
assert!(!converted.time_bars_timestamp_on_close);
assert!(converted.time_bars_skip_first_non_full_bar);
assert_eq!(
converted.time_bars_interval_type,
BarIntervalType::RightOpen,
);
assert_eq!(converted.time_bars_build_delay, 1_500);
assert!(converted.time_bars_origin_offset.is_empty());
assert!(converted.validate_data_sequence);
assert!(converted.buffer_deltas);
assert!(!converted.emit_quotes_from_book);
assert!(!converted.emit_quotes_from_book_depths);
assert_eq!(
converted.external_clients,
Some(vec![ClientId::from("EXTERNAL")]),
);
assert!(converted.debug);
}
#[rstest]
fn test_live_data_engine_config_converts_time_bars_origin_offset() {
// The string key is parsed into `BarAggregation`; the value is read as nanoseconds.
let config = LiveDataEngineConfig {
time_bars_origin_offset: HashMap::from([("Minute".to_string(), 5_000_000_000)]),
emit_quotes_from_book: true,
emit_quotes_from_book_depths: true,
..Default::default()
};
let converted: DataEngineConfig = config.into();
assert_eq!(converted.time_bars_origin_offset.len(), 1);
assert_eq!(
converted.time_bars_origin_offset[&BarAggregation::Minute],
Duration::from_nanos(5_000_000_000),
);
assert!(converted.emit_quotes_from_book);
assert!(converted.emit_quotes_from_book_depths);
}
#[rstest]
fn test_live_exec_engine_config_converts_to_exec_engine_config() {
let config = LiveExecEngineConfig {
load_cache: false,
snapshot_positions_interval_secs: Some(30.0),
..Default::default()
};
let converted: ExecutionEngineConfig = config.into();
assert!(!converted.load_cache);
assert_eq!(converted.snapshot_positions_interval_secs, Some(30.0));
}
#[rstest]
fn test_live_risk_engine_config_converts_to_risk_engine_config() {
// Rate strings become `RateLimit` values with the interval expressed in nanoseconds.
let config = LiveRiskEngineConfig {
bypass: true,
max_order_submit_rate: "12/00:00:03".to_string(),
max_order_modify_rate: "7/00:00:05".to_string(),
max_notional_per_order: HashMap::from([(
"ETHUSDT.BINANCE".to_string(),
"1000.5".to_string(),
)]),
debug: true,
..Default::default()
};
let converted: RiskEngineConfig = config.into();
assert!(converted.bypass);
assert_eq!(
converted.max_order_submit,
RateLimit::new(12, 3_000_000_000)
);
assert_eq!(converted.max_order_modify, RateLimit::new(7, 5_000_000_000));
assert_eq!(
converted.max_notional_per_order[&"ETHUSDT.BINANCE".parse::<InstrumentId>().unwrap()],
Decimal::from_str("1000.5").unwrap(),
);
assert!(converted.debug);
}
// --- validate_runtime_support: execution/risk engine value checks ---
#[rstest]
fn test_validate_runtime_support_rejects_exec_engine_snapshot_orders() {
let config = LiveNodeConfig {
exec_engine: LiveExecEngineConfig {
snapshot_orders: true,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveExecEngineConfig.snapshot_orders is not supported by the Rust live runtime yet"
);
}
#[rstest]
fn test_validate_runtime_support_rejects_invalid_rate_limit() {
let config = LiveNodeConfig {
risk_engine: LiveRiskEngineConfig {
max_order_submit_rate: "bad-rate".to_string(),
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("LiveRiskEngineConfig.max_order_submit_rate"));
}
// Negative, NaN, and infinite startup delays must all be rejected.
#[rstest]
#[case(-1.0)]
#[case(f64::NAN)]
#[case(f64::INFINITY)]
#[case(f64::NEG_INFINITY)]
fn test_validate_runtime_support_rejects_hostile_startup_delay(#[case] value: f64) {
let config = LiveNodeConfig {
exec_engine: LiveExecEngineConfig {
reconciliation_startup_delay_secs: value,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("reconciliation_startup_delay_secs"));
}
#[rstest]
fn test_validate_runtime_support_rejects_invalid_reconciliation_instrument_id() {
let config = LiveNodeConfig {
exec_engine: LiveExecEngineConfig {
reconciliation_instrument_ids: Some(vec!["INVALID".to_string()]),
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("reconciliation_instrument_ids"));
}
// --- parse_rate_limit ---
#[rstest]
fn test_parse_rate_limit_happy_path() {
let limit = parse_rate_limit("150/00:00:02").unwrap();
assert_eq!(limit, RateLimit::new(150, 2_000_000_000));
}
#[rstest]
fn test_parse_rate_limit_rejects_trailing_component() {
let err = parse_rate_limit("10/00:00:01:99").unwrap_err().to_string();
assert!(err.contains("expected 'limit/HH:MM:SS'"));
}
#[rstest]
fn test_parse_rate_limit_rejects_zero_limit() {
let err = parse_rate_limit("0/00:00:01").unwrap_err().to_string();
assert!(err.contains("limit must be greater than zero"));
}
#[rstest]
fn test_parse_rate_limit_rejects_zero_interval() {
let err = parse_rate_limit("100/00:00:00").unwrap_err().to_string();
assert!(err.contains("interval must be greater than zero"));
}
// --- validate_runtime_support: remaining unsupported options ---
#[rstest]
fn test_validate_runtime_support_rejects_exec_engine_qsize() {
let config = LiveNodeConfig {
exec_engine: LiveExecEngineConfig {
qsize: 1,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err();
assert_eq!(
error.to_string(),
"LiveExecEngineConfig.qsize is not supported by the Rust live runtime yet"
);
}
#[rstest]
fn test_validate_runtime_support_rejects_data_engine_graceful_shutdown() {
let config = LiveNodeConfig {
data_engine: LiveDataEngineConfig {
graceful_shutdown_on_error: true,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("graceful_shutdown_on_error"));
}
#[rstest]
fn test_validate_runtime_support_rejects_risk_engine_graceful_shutdown() {
let config = LiveNodeConfig {
risk_engine: LiveRiskEngineConfig {
graceful_shutdown_on_error: true,
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("graceful_shutdown_on_error"));
}
#[rstest]
fn test_validate_runtime_support_rejects_emulator() {
let config = LiveNodeConfig {
emulator: Some(OrderEmulatorConfig::default()),
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("emulator"));
}
#[rstest]
fn test_validate_runtime_support_rejects_loop_debug() {
let config = LiveNodeConfig {
loop_debug: true,
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("loop_debug"));
}
// Logging options are not restricted by validation.
#[rstest]
fn test_validate_runtime_support_accepts_file_config() {
use nautilus_common::logging::writer::FileWriterConfig;
let config = LiveNodeConfig {
logging: LoggerConfig {
file_config: Some(FileWriterConfig::default()),
..Default::default()
},
..Default::default()
};
assert!(config.validate_runtime_support().is_ok());
}
#[rstest]
fn test_validate_runtime_support_accepts_clear_log_file() {
let config = LiveNodeConfig {
logging: LoggerConfig {
clear_log_file: true,
..Default::default()
},
..Default::default()
};
assert!(config.validate_runtime_support().is_ok());
}
#[rstest]
fn test_validate_runtime_support_rejects_invalid_time_bars_origin_offset_key() {
let config = LiveNodeConfig {
data_engine: LiveDataEngineConfig {
time_bars_origin_offset: HashMap::from([("INVALID".to_string(), 1_000)]),
..Default::default()
},
..Default::default()
};
let error = config.validate_runtime_support().unwrap_err().to_string();
assert!(error.contains("time_bars_origin_offset"));
}
// --- Default values of the individual config structs ---
#[rstest]
fn test_live_exec_engine_config_defaults() {
let config = LiveExecEngineConfig::default();
assert!(config.load_cache);
assert!(!config.snapshot_orders);
assert!(!config.snapshot_positions);
assert_eq!(config.snapshot_positions_interval_secs, None);
assert_eq!(config.external_clients, None);
assert!(!config.debug);
assert!(!config.manage_own_order_books);
assert!(!config.allow_overfills);
assert!(config.reconciliation);
assert_eq!(config.reconciliation_startup_delay_secs, 10.0);
assert_eq!(config.reconciliation_lookback_mins, None);
assert_eq!(config.reconciliation_instrument_ids, None);
assert_eq!(config.filtered_client_order_ids, None);
assert!(!config.filter_unclaimed_external_orders);
assert!(!config.filter_position_reports);
assert!(config.generate_missing_orders);
assert_eq!(config.inflight_check_interval_ms, 2_000);
assert_eq!(config.inflight_check_threshold_ms, 5_000);
assert_eq!(config.inflight_check_retries, 5);
assert_eq!(config.open_check_threshold_ms, 5_000);
// Set by the hand-written `Default` impl, not by the builder.
assert_eq!(config.open_check_lookback_mins, Some(60));
assert_eq!(config.open_check_missing_retries, 5);
assert!(config.open_check_open_only);
assert_eq!(config.max_single_order_queries_per_cycle, 10);
assert_eq!(config.position_check_threshold_ms, 5_000);
assert_eq!(config.position_check_retries, 3);
assert!(!config.purge_from_database);
assert!(!config.graceful_shutdown_on_error);
assert_eq!(config.qsize, 100_000);
}
#[rstest]
fn test_live_data_engine_config_defaults() {
let config = LiveDataEngineConfig::default();
assert!(config.time_bars_build_with_no_updates);
assert!(config.time_bars_timestamp_on_close);
assert!(!config.time_bars_skip_first_non_full_bar);
assert_eq!(config.time_bars_interval_type, BarIntervalType::LeftOpen);
assert_eq!(config.time_bars_build_delay, 0);
assert!(config.time_bars_origin_offset.is_empty());
assert!(!config.validate_data_sequence);
assert!(!config.buffer_deltas);
assert!(!config.emit_quotes_from_book);
assert!(!config.emit_quotes_from_book_depths);
assert_eq!(config.external_clients, None);
assert!(!config.debug);
assert!(!config.graceful_shutdown_on_error);
assert_eq!(config.qsize, 100_000);
}
#[rstest]
fn test_live_risk_engine_config_defaults() {
let config = LiveRiskEngineConfig::default();
assert!(!config.bypass);
assert_eq!(config.max_order_submit_rate, DEFAULT_ORDER_RATE_LIMIT);
assert_eq!(config.max_order_modify_rate, DEFAULT_ORDER_RATE_LIMIT);
assert!(config.max_notional_per_order.is_empty());
assert!(!config.debug);
assert!(!config.graceful_shutdown_on_error);
assert_eq!(config.qsize, 100_000);
}
#[rstest]
fn test_routing_config_default() {
let config = RoutingConfig::default();
assert!(!config.default);
assert_eq!(config.venues, None);
}
#[rstest]
fn test_live_data_client_config_default() {
let config = LiveDataClientConfig::default();
assert!(!config.handle_revised_bars);
assert!(!config.instrument_provider.load_all);
assert!(config.instrument_provider.load_ids.is_none());
assert!(config.instrument_provider.filters.is_empty());
assert!(config.instrument_provider.filter_callable.is_none());
assert!(config.instrument_provider.log_warnings);
assert!(!config.routing.default);
}
// --- Serde behavior: `deny_unknown_fields` and TOML loading ---
#[rstest]
fn test_live_data_client_config_rejects_unknown_field() {
let error = serde_json::from_str::<LiveDataClientConfig>(
r#"{"handle_revised_bars":true,"unexpected":true}"#,
)
.unwrap_err();
assert!(error.to_string().contains("unknown field `unexpected`"));
}
#[rstest]
fn test_live_data_client_config_rejects_unknown_nested_field() {
// `instrument_provider` is not a field of `InstrumentProviderConfig`, so nesting it fails.
let error = serde_json::from_str::<LiveDataClientConfig>(
r#"{"instrument_provider":{"load_all":true,"instrument_provider":{"load_all":false}}}"#,
)
.unwrap_err();
assert!(
error
.to_string()
.contains("unknown field `instrument_provider`")
);
}
#[rstest]
fn test_live_node_config_toml_minimal() {
// Fields omitted from the TOML document fall back to the struct defaults.
let config: LiveNodeConfig = toml::from_str(
r#"
environment = "Live"
trader_id = "TRADER-042"
[data_engine]
debug = true
[risk_engine]
bypass = false
[exec_engine]
reconciliation = false
[data_clients.hyperliquid]
handle_revised_bars = true
[exec_clients.hyperliquid]
routing = { default = true, venues = ["HYPERLIQUID"] }
instrument_provider = { load_all = true }
"#,
)
.unwrap();
assert_eq!(config.environment, Environment::Live);
assert_eq!(config.trader_id, TraderId::from("TRADER-042"));
assert!(config.data_engine.debug);
assert!(!config.risk_engine.bypass);
assert!(!config.exec_engine.reconciliation);
assert!(config.data_clients["hyperliquid"].handle_revised_bars);
let exec_client = &config.exec_clients["hyperliquid"];
assert!(exec_client.routing.default);
assert_eq!(
exec_client.routing.venues,
Some(vec!["HYPERLIQUID".to_string()]),
);
assert!(exec_client.instrument_provider.load_all);
}
}