use std::{collections::HashMap, time::Duration};
use nautilus_common::{
cache::CacheConfig,
enums::Environment,
factories::{ClientConfig, DataClientFactory, ExecutionClientFactory},
logging::logger::LoggerConfig,
msgbus::database::MessageBusConfig,
};
use nautilus_core::UUID4;
use nautilus_data::client::DataClientAdapter;
use nautilus_execution::engine::ExecutionEngine;
use nautilus_model::identifiers::TraderId;
use nautilus_portfolio::config::PortfolioConfig;
use nautilus_system::{config::StreamingConfig, kernel::NautilusKernel};
use crate::{
config::{LiveDataEngineConfig, LiveExecEngineConfig, LiveNodeConfig, LiveRiskEngineConfig},
manager::{ExecutionManager, ExecutionManagerConfig},
node::LiveNode,
runner::AsyncRunner,
};
/// Builder that accumulates configuration and client factories for a [`LiveNode`].
///
/// Create one with [`LiveNodeBuilder::new`] or [`LiveNodeBuilder::from_config`], chain the
/// `with_*` / `add_*` methods, then call [`LiveNodeBuilder::build`] to obtain the node.
#[derive(Debug)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
pub struct LiveNodeBuilder {
/// Node name handed to the kernel at build time; initialized to `"LiveNode"`.
name: String,
/// Node configuration that the `with_*` methods mutate in place.
config: LiveNodeConfig,
/// Data client factories, keyed by the client's registration name.
data_client_factories: HashMap<String, Box<dyn DataClientFactory>>,
/// Execution client factories, keyed by the client's registration name.
exec_client_factories: HashMap<String, Box<dyn ExecutionClientFactory>>,
/// Data client configs, stored under the same key as the matching factory.
data_client_configs: HashMap<String, Box<dyn ClientConfig>>,
/// Execution client configs, stored under the same key as the matching factory.
exec_client_configs: HashMap<String, Box<dyn ClientConfig>>,
}
impl LiveNodeBuilder {
pub fn new(trader_id: TraderId, environment: Environment) -> anyhow::Result<Self> {
match environment {
Environment::Sandbox | Environment::Live => {}
Environment::Backtest => {
anyhow::bail!("LiveNode cannot be used with Backtest environment");
}
}
let config = LiveNodeConfig {
environment,
trader_id,
..Default::default()
};
Ok(Self {
name: "LiveNode".to_string(),
config,
data_client_factories: HashMap::new(),
exec_client_factories: HashMap::new(),
data_client_configs: HashMap::new(),
exec_client_configs: HashMap::new(),
})
}
pub fn from_config(config: LiveNodeConfig) -> anyhow::Result<Self> {
match config.environment {
Environment::Sandbox | Environment::Live => {}
Environment::Backtest => {
anyhow::bail!("LiveNode cannot be used with Backtest environment");
}
}
Ok(Self {
name: "LiveNode".to_string(),
config,
data_client_factories: HashMap::new(),
exec_client_factories: HashMap::new(),
data_client_configs: HashMap::new(),
exec_client_configs: HashMap::new(),
})
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
#[must_use]
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
#[must_use]
pub const fn with_instance_id(mut self, instance_id: UUID4) -> Self {
self.config.instance_id = Some(instance_id);
self
}
#[must_use]
pub const fn with_load_state(mut self, load_state: bool) -> Self {
self.config.load_state = load_state;
self
}
#[must_use]
pub const fn with_save_state(mut self, save_state: bool) -> Self {
self.config.save_state = save_state;
self
}
#[must_use]
pub const fn with_timeout_connection(mut self, timeout_secs: u64) -> Self {
self.config.timeout_connection = Duration::from_secs(timeout_secs);
self
}
#[must_use]
pub const fn with_timeout_reconciliation(mut self, timeout_secs: u64) -> Self {
self.config.timeout_reconciliation = Duration::from_secs(timeout_secs);
self
}
#[must_use]
pub fn with_reconciliation(mut self, reconciliation: bool) -> Self {
self.config.exec_engine.reconciliation = reconciliation;
self
}
#[must_use]
pub fn with_reconciliation_lookback_mins(mut self, mins: u32) -> Self {
self.config.exec_engine.reconciliation_lookback_mins = Some(mins);
self
}
#[must_use]
pub const fn with_timeout_portfolio(mut self, timeout_secs: u64) -> Self {
self.config.timeout_portfolio = Duration::from_secs(timeout_secs);
self
}
#[must_use]
pub const fn with_timeout_disconnection_secs(mut self, timeout_secs: u64) -> Self {
self.config.timeout_disconnection = Duration::from_secs(timeout_secs);
self
}
#[must_use]
pub const fn with_delay_post_stop_secs(mut self, delay_secs: u64) -> Self {
self.config.delay_post_stop = Duration::from_secs(delay_secs);
self
}
#[must_use]
pub const fn with_delay_shutdown_secs(mut self, delay_secs: u64) -> Self {
self.config.timeout_shutdown = Duration::from_secs(delay_secs);
self
}
#[must_use]
pub fn with_cache_config(mut self, config: CacheConfig) -> Self {
self.config.cache = Some(config);
self
}
#[must_use]
pub fn with_msgbus_config(mut self, config: MessageBusConfig) -> Self {
self.config.msgbus = Some(config);
self
}
#[must_use]
pub fn with_portfolio_config(mut self, config: PortfolioConfig) -> Self {
self.config.portfolio = Some(config);
self
}
#[must_use]
pub fn with_streaming_config(mut self, config: StreamingConfig) -> Self {
self.config.streaming = Some(config);
self
}
#[must_use]
pub fn with_data_engine_config(mut self, config: LiveDataEngineConfig) -> Self {
self.config.data_engine = config;
self
}
#[must_use]
pub fn with_risk_engine_config(mut self, config: LiveRiskEngineConfig) -> Self {
self.config.risk_engine = config;
self
}
#[must_use]
pub fn with_exec_engine_config(mut self, config: LiveExecEngineConfig) -> Self {
self.config.exec_engine = config;
self
}
#[must_use]
pub fn with_logging(mut self, logging: LoggerConfig) -> Self {
self.config.logging = logging;
self
}
pub fn add_data_client(
mut self,
name: Option<String>,
factory: Box<dyn DataClientFactory>,
config: Box<dyn ClientConfig>,
) -> anyhow::Result<Self> {
let name = name.unwrap_or_else(|| factory.name().to_string());
if self.data_client_factories.contains_key(&name) {
anyhow::bail!("Data client '{name}' is already registered");
}
self.data_client_factories.insert(name.clone(), factory);
self.data_client_configs.insert(name, config);
Ok(self)
}
pub fn add_exec_client(
mut self,
name: Option<String>,
factory: Box<dyn ExecutionClientFactory>,
config: Box<dyn ClientConfig>,
) -> anyhow::Result<Self> {
let name = name.unwrap_or_else(|| factory.name().to_string());
if self.exec_client_factories.contains_key(&name) {
anyhow::bail!("Execution client '{name}' is already registered");
}
self.exec_client_factories.insert(name.clone(), factory);
self.exec_client_configs.insert(name, config);
Ok(self)
}
pub fn build(mut self) -> anyhow::Result<LiveNode> {
log::info!(
"Building LiveNode with {} data clients and {} execution clients",
self.data_client_factories.len(),
self.exec_client_factories.len()
);
self.config.validate_runtime_support()?;
let runner = AsyncRunner::new();
runner.bind_senders();
let kernel = NautilusKernel::new(self.name.clone(), self.config.clone())?;
for (name, factory) in self.data_client_factories {
if let Some(config) = self.data_client_configs.remove(&name) {
log::debug!("Creating data client {name}");
let client =
factory.create(&name, config.as_ref(), kernel.cache(), kernel.clock())?;
let client_id = client.client_id();
let venue = client.venue();
let adapter = DataClientAdapter::new(
client_id, venue, true, true, client,
);
kernel
.data_engine
.borrow_mut()
.register_client(adapter, venue);
log::info!("Registered DataClient-{client_id}");
} else {
log::warn!("No config found for data client factory {name}");
}
}
for (name, factory) in self.exec_client_factories {
if let Some(config) = self.exec_client_configs.remove(&name) {
log::debug!("Creating execution client {name}");
let client = factory.create(&name, config.as_ref(), kernel.cache())?;
let client_id = client.client_id();
let venue = client.venue();
kernel.exec_engine.borrow_mut().register_client(client)?;
ExecutionEngine::subscribe_venue_instruments(&kernel.exec_engine, venue);
log::info!("Registered ExecutionClient-{client_id}");
} else {
log::warn!("No config found for execution client factory {name}");
}
}
let exec_manager_config = ExecutionManagerConfig::from(&self.config.exec_engine)
.with_trader_id(self.config.trader_id);
let exec_manager = ExecutionManager::new(
kernel.clock.clone(),
kernel.cache.clone(),
exec_manager_config,
);
log::info!("Built successfully");
Ok(LiveNode::new_from_builder(
kernel,
runner,
self.config,
exec_manager,
))
}
}