use std::{
cell::{Ref, RefCell},
rc::Rc,
time::Duration,
};
use nautilus_common::{
cache::{Cache, CacheConfig, database::CacheDatabaseAdapter},
clock::{Clock, TestClock},
component::Component,
enums::Environment,
logging::{
headers, init_logging,
logger::{LogGuard, LoggerConfig},
writer::FileWriterConfig,
},
msgbus::{MessageBus, get_message_bus, set_message_bus},
};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_data::engine::DataEngine;
use nautilus_execution::{engine::ExecutionEngine, order_emulator::adapter::OrderEmulatorAdapter};
use nautilus_model::identifiers::{ClientId, TraderId};
use nautilus_portfolio::portfolio::Portfolio;
use nautilus_risk::engine::RiskEngine;
use ustr::Ustr;
use crate::{builder::NautilusKernelBuilder, config::NautilusKernelConfig, trader::Trader};
/// Core system kernel that owns and wires together the main components of a node.
///
/// Shared components are held behind `Rc<RefCell<_>>`; [`NautilusKernel::new`] hands clones of
/// the cache and clock handles to the portfolio, engines, order emulator and trader.
#[derive(Debug)]
pub struct NautilusKernel {
/// Kernel name; also passed to the message bus when the kernel is constructed.
pub name: String,
/// Instance ID taken from the configuration, or a default-generated one when absent.
pub instance_id: UUID4,
/// Host name of the machine, resolved once at construction.
pub machine_id: String,
/// Boxed kernel configuration the kernel was built from.
pub config: Box<dyn NautilusKernelConfig>,
/// Shared cache handle.
pub cache: Rc<RefCell<Cache>>,
/// Shared clock handle (a `TestClock` for backtest, a live clock for live/sandbox).
pub clock: Rc<RefCell<dyn Clock>>,
/// Shared portfolio handle.
pub portfolio: Rc<RefCell<Portfolio>>,
/// Guard returned by logging initialization; stored for the lifetime of the kernel.
pub log_guard: LogGuard,
/// Shared data engine handle.
pub data_engine: Rc<RefCell<DataEngine>>,
/// Shared risk engine handle.
pub risk_engine: Rc<RefCell<RiskEngine>>,
/// Shared execution engine handle.
pub exec_engine: Rc<RefCell<ExecutionEngine>>,
/// Order emulator adapter constructed alongside the engines.
pub order_emulator: OrderEmulatorAdapter,
/// Shared trader handle.
pub trader: Rc<RefCell<Trader>>,
/// Clock timestamp captured at the end of construction.
pub ts_created: UnixNanos,
/// Clock timestamp set by `start`; `None` until then and after `reset`.
pub ts_started: Option<UnixNanos>,
/// Clock timestamp set by `finalize_stop`; `None` until then and after `reset`.
pub ts_shutdown: Option<UnixNanos>,
}
impl NautilusKernel {
/// Creates a [`NautilusKernelBuilder`] seeded with the given name, trader ID and environment.
///
/// This only forwards the arguments to `NautilusKernelBuilder::new`; no kernel is built here.
#[must_use]
pub const fn builder(
name: String,
trader_id: TraderId,
environment: Environment,
) -> NautilusKernelBuilder {
NautilusKernelBuilder::new(name, trader_id, environment)
}
/// Builds a kernel from `config`, wiring up logging, clock, cache, message bus, portfolio,
/// engines, order emulator and trader (in that order).
///
/// # Errors
///
/// Returns an error if the host name cannot be determined or if logging initialization fails.
///
/// # Panics
///
/// Panics (via `initialize_clock`) when the configured environment is live/sandbox but the
/// crate was compiled without the `live` feature.
pub fn new<T: NautilusKernelConfig + 'static>(name: String, config: T) -> anyhow::Result<Self> {
    let instance_id = config.instance_id().unwrap_or_default();
    let machine_id = Self::determine_machine_id()?;

    // Logging is brought up first so the header and build messages below are emitted.
    let logging_config = config.logging();
    let log_guard = Self::initialize_logging(config.trader_id(), instance_id, logging_config)?;
    headers::log_header(
        config.trader_id(),
        &machine_id,
        instance_id,
        Ustr::from(stringify!(LiveNode)),
    );

    log::info!("Building system kernel");

    let clock = Self::initialize_clock(&config.environment());
    let cache = Self::initialize_cache(config.cache());

    // Install the message bus before constructing the components below.
    let bus = MessageBus::new(config.trader_id(), instance_id, Some(name.clone()), None);
    set_message_bus(Rc::new(RefCell::new(bus)));

    let portfolio = Rc::new(RefCell::new(Portfolio::new(
        Rc::clone(&cache),
        Rc::clone(&clock),
        config.portfolio(),
    )));

    // The risk engine receives a shallow clone of the portfolio rather than the shared handle.
    let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
        config.risk_engine().unwrap_or_default(),
        portfolio.borrow().clone_shallow(),
        Rc::clone(&clock),
        Rc::clone(&cache),
    )));

    let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
        Rc::clone(&clock),
        Rc::clone(&cache),
        config.exec_engine(),
    )));

    let order_emulator =
        OrderEmulatorAdapter::new(config.trader_id(), Rc::clone(&clock), Rc::clone(&cache));

    let data_engine = Rc::new(RefCell::new(DataEngine::new(
        Rc::clone(&clock),
        Rc::clone(&cache),
        config.data_engine(),
    )));

    DataEngine::register_msgbus_handlers(&data_engine);
    RiskEngine::register_msgbus_handlers(&risk_engine);
    ExecutionEngine::register_msgbus_handlers(&exec_engine);

    let trader = Rc::new(RefCell::new(Trader::new(
        config.trader_id(),
        instance_id,
        config.environment(),
        Rc::clone(&clock),
        Rc::clone(&cache),
        Rc::clone(&portfolio),
    )));

    // Creation time is stamped last, once every component has been built.
    let ts_created = clock.borrow().timestamp_ns();

    let kernel = Self {
        name,
        instance_id,
        machine_id,
        config: Box::new(config),
        cache,
        clock,
        portfolio,
        log_guard,
        data_engine,
        risk_engine,
        exec_engine,
        order_emulator,
        trader,
        ts_created,
        ts_started: None,
        ts_shutdown: None,
    };
    Ok(kernel)
}
/// Resolves the machine identifier from the system host name.
///
/// # Errors
///
/// Returns an error when the host name is not available.
fn determine_machine_id() -> anyhow::Result<String> {
    match sysinfo::System::host_name() {
        Some(host_name) => Ok(host_name),
        None => Err(anyhow::anyhow!("Failed to determine hostname")),
    }
}
/// Initializes logging for the kernel and returns the guard for it.
///
/// When `init_logging` fails with a `log::SetLoggerError` (a logger is already registered),
/// a guard is requested through `LogGuard::new()` instead; if that yields `None` the original
/// error is returned with added context. Any other error is returned unchanged.
///
/// With the `tracing-bridge` feature enabled and `config.use_tracing` set, tracing is also
/// initialized unless it already is.
///
/// # Errors
///
/// Returns an error if logging (or, when enabled, tracing) could not be initialized.
fn initialize_logging(
    trader_id: TraderId,
    instance_id: UUID4,
    config: LoggerConfig,
) -> anyhow::Result<LogGuard> {
    // Read the flag up front because `config` is moved into `init_logging`.
    #[cfg(feature = "tracing-bridge")]
    let use_tracing = config.use_tracing;

    // NOTE: the file writer always uses its default configuration here.
    let init_result = init_logging(trader_id, instance_id, config, FileWriterConfig::default());

    let log_guard = match init_result {
        Ok(guard) => guard,
        // Anything other than "a logger is already set" is not recoverable.
        Err(e) if e.downcast_ref::<log::SetLoggerError>().is_none() => return Err(e),
        // NOTE(review): presumably `LogGuard::new()` only succeeds when the already-registered
        // logger is the Nautilus one - confirm against `LogGuard`.
        Err(e) => match LogGuard::new() {
            Some(guard) => guard,
            None => {
                return Err(e.context(
                    "A non-Nautilus logger is already registered; \
                     cannot initialize Nautilus logging",
                ));
            }
        },
    };

    #[cfg(feature = "tracing-bridge")]
    if use_tracing && !nautilus_common::logging::bridge::tracing_is_initialized() {
        nautilus_common::logging::bridge::init_tracing()?;
    }

    Ok(log_guard)
}
/// Creates the shared clock for the given environment.
///
/// Backtest uses a `TestClock`; live and sandbox use the default `LiveClock`, which is only
/// available when the `live` feature is enabled.
///
/// # Panics
///
/// Panics for live/sandbox environments when compiled without the `live` feature.
fn initialize_clock(environment: &Environment) -> Rc<RefCell<dyn Clock>> {
    match environment {
        Environment::Backtest => Rc::new(RefCell::new(TestClock::new())),
        #[cfg(feature = "live")]
        Environment::Live | Environment::Sandbox => {
            Rc::new(RefCell::new(nautilus_common::live::clock::LiveClock::default()))
        }
        #[cfg(not(feature = "live"))]
        Environment::Live | Environment::Sandbox => {
            panic!(
                "Live/Sandbox environment requires the 'live' feature to be enabled. \
                 Build with `--features live` or add `features = [\"live\"]` to your dependency."
            );
        }
    }
}
/// Creates the shared cache, falling back to the default `CacheConfig` when none is given.
///
/// No database adapter is attached: the cache is always built with `None` as its backing
/// database.
fn initialize_cache(cache_config: Option<CacheConfig>) -> Rc<RefCell<Cache>> {
    let database: Option<Box<dyn CacheDatabaseAdapter>> = None;
    let config = cache_config.unwrap_or_default();
    Rc::new(RefCell::new(Cache::new(Some(config), database)))
}
/// Cancels all timers on the kernel clock.
fn cancel_timers(&self) {
    let mut clock = self.clock.borrow_mut();
    clock.cancel_timers();
}
/// Returns the current timestamp, in nanoseconds, as reported by the kernel clock.
#[must_use]
pub fn generate_timestamp_ns(&self) -> UnixNanos {
    let clock = self.clock.borrow();
    clock.timestamp_ns()
}
/// Returns the environment reported by the kernel configuration.
#[must_use]
pub fn environment(&self) -> Environment {
    let config = &self.config;
    config.environment()
}
/// Returns the kernel name as a string slice.
#[must_use]
pub const fn name(&self) -> &str {
self.name.as_str()
}
/// Returns the trader ID reported by the kernel configuration.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    let config = &self.config;
    config.trader_id()
}
/// Returns the machine identifier (the host name resolved when the kernel was built).
///
/// `const` to match the sibling `name` accessor, which uses the same `String::as_str` form.
#[must_use]
pub const fn machine_id(&self) -> &str {
    self.machine_id.as_str()
}
/// Returns the kernel instance ID (by value).
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
self.instance_id
}
/// Returns the post-stop delay reported by the kernel configuration.
#[must_use]
pub fn delay_post_stop(&self) -> Duration {
    let config = &self.config;
    config.delay_post_stop()
}
/// Returns the timestamp captured when the kernel was constructed.
#[must_use]
pub const fn ts_created(&self) -> UnixNanos {
self.ts_created
}
/// Returns the timestamp recorded by `start`, or `None` if it has not been set
/// (or has been cleared by `reset`).
#[must_use]
pub const fn ts_started(&self) -> Option<UnixNanos> {
self.ts_started
}
/// Returns the timestamp recorded by `finalize_stop`, or `None` if it has not been set
/// (or has been cleared by `reset`).
#[must_use]
pub const fn ts_shutdown(&self) -> Option<UnixNanos> {
self.ts_shutdown
}
/// Returns the `load_state` flag reported by the kernel configuration.
#[must_use]
pub fn load_state(&self) -> bool {
    let config = &self.config;
    config.load_state()
}
/// Returns the `save_state` flag reported by the kernel configuration.
#[must_use]
pub fn save_state(&self) -> bool {
    let config = &self.config;
    config.save_state()
}
/// Returns a new shared handle to the kernel clock (reference-count bump only).
#[must_use]
pub fn clock(&self) -> Rc<RefCell<dyn Clock>> {
    Rc::clone(&self.clock)
}
/// Returns a new shared handle to the kernel cache (reference-count bump only).
#[must_use]
pub fn cache(&self) -> Rc<RefCell<Cache>> {
    Rc::clone(&self.cache)
}
/// Immutably borrows the portfolio.
///
/// # Panics
///
/// Panics if the portfolio is currently mutably borrowed (standard `RefCell` rule).
#[must_use]
pub fn portfolio(&self) -> Ref<'_, Portfolio> {
    RefCell::borrow(&self.portfolio)
}
/// Immutably borrows the data engine.
///
/// # Panics
///
/// Panics if the data engine is currently mutably borrowed (standard `RefCell` rule).
#[must_use]
pub fn data_engine(&self) -> Ref<'_, DataEngine> {
    RefCell::borrow(&self.data_engine)
}
/// Returns a reference to the shared risk engine handle (no clone, no borrow taken).
#[must_use]
pub const fn risk_engine(&self) -> &Rc<RefCell<RiskEngine>> {
&self.risk_engine
}
/// Returns a reference to the shared execution engine handle (no clone, no borrow taken).
#[must_use]
pub const fn exec_engine(&self) -> &Rc<RefCell<ExecutionEngine>> {
&self.exec_engine
}
/// Returns a reference to the shared trader handle (no clone, no borrow taken).
///
/// `const` to match the sibling `risk_engine` and `exec_engine` accessors.
#[must_use]
pub const fn trader(&self) -> &Rc<RefCell<Trader>> {
    &self.trader
}
pub fn start(&mut self) {
log::info!("Starting");
self.start_engines();
log::info!("Initializing trader");
if let Err(e) = self.trader.borrow_mut().initialize() {
log::error!("Error initializing trader: {e:?}");
return;
}
log::info!("Starting clients...");
if let Err(e) = self.start_clients() {
log::error!("Error starting clients: {e:?}");
}
log::info!("Clients started");
self.ts_started = Some(self.clock.borrow().timestamp_ns());
log::info!("Started");
}
pub async fn start_async(&mut self) {
self.start();
}
pub fn start_trader(&mut self) {
log::info!("Starting trader...");
if let Err(e) = self.trader.borrow_mut().start() {
log::error!("Error starting trader: {e:?}");
}
log::info!("Trader started");
}
/// Stops the trader if it is currently running; otherwise does nothing.
///
/// A stop failure is logged as an error and not propagated.
pub fn stop_trader(&mut self) {
    let is_running = self.trader.borrow().is_running();
    if is_running {
        log::info!("Stopping trader...");
        let stop_result = self.trader.borrow_mut().stop();
        if let Err(e) = stop_result {
            log::error!("Error stopping trader: {e}");
        }
    }
}
pub async fn finalize_stop(&mut self) {
if let Err(e) = self.stop_all_clients() {
log::error!("Error stopping clients: {e:?}");
}
self.stop_engines();
self.cancel_timers();
self.ts_shutdown = Some(self.clock.borrow().timestamp_ns());
log::info!("Stopped");
}
pub fn reset(&mut self) {
log::info!("Resetting");
if let Err(e) = self.trader.borrow_mut().reset() {
log::error!("Error resetting trader: {e:?}");
}
self.data_engine.borrow_mut().reset();
self.exec_engine.borrow_mut().reset();
self.risk_engine.borrow_mut().reset();
self.ts_started = None;
self.ts_shutdown = None;
log::info!("Reset");
}
pub fn dispose(&mut self) {
log::info!("Disposing");
if let Err(e) = self.trader.borrow_mut().dispose() {
log::error!("Error disposing trader: {e:?}");
}
self.stop_engines();
self.data_engine.borrow_mut().dispose();
self.exec_engine.borrow_mut().dispose();
self.risk_engine.borrow_mut().dispose();
self.cache.borrow_mut().dispose();
get_message_bus().borrow_mut().dispose();
log::info!("Disposed");
}
fn start_engines(&self) {
self.data_engine.borrow_mut().start();
self.exec_engine.borrow_mut().start();
self.risk_engine.borrow_mut().start();
}
fn stop_engines(&self) {
self.data_engine.borrow_mut().stop();
self.exec_engine.borrow_mut().stop();
self.risk_engine.borrow_mut().stop();
}
fn start_clients(&self) -> Result<(), Vec<anyhow::Error>> {
let mut errors = Vec::new();
{
let mut exec_engine = self.exec_engine.borrow_mut();
let exec_adapters = exec_engine.get_clients_mut();
for adapter in exec_adapters {
if let Err(e) = adapter.start() {
log::error!("Error starting execution client {}: {e}", adapter.client_id);
errors.push(e);
}
}
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
fn stop_all_clients(&self) -> Result<(), Vec<anyhow::Error>> {
let mut errors = Vec::new();
{
let mut exec_engine = self.exec_engine.borrow_mut();
let exec_adapters = exec_engine.get_clients_mut();
for adapter in exec_adapters {
if let Err(e) = adapter.stop() {
log::error!("Error stopping execution client {}: {e}", adapter.client_id);
errors.push(e);
}
}
}
if errors.is_empty() {
Ok(())
} else {
Err(errors)
}
}
/// Connects the data engine's clients.
///
/// The mutable `RefCell` borrow of the data engine is held across the `.await`, which is why
/// the clippy lint is silenced: any other borrow of the data engine made while this future is
/// pending will panic.
#[allow(clippy::await_holding_refcell_ref)]
pub async fn connect_data_clients(&mut self) {
    log::info!("Connecting data clients...");
    let mut data_engine = self.data_engine.borrow_mut();
    data_engine.connect().await;
}
/// Connects the execution engine's clients.
///
/// The mutable `RefCell` borrow of the execution engine is held across the `.await`, which is
/// why the clippy lint is silenced: any other borrow of the execution engine made while this
/// future is pending will panic.
#[allow(clippy::await_holding_refcell_ref)]
pub async fn connect_exec_clients(&mut self) {
    log::info!("Connecting execution clients...");
    let mut exec_engine = self.exec_engine.borrow_mut();
    exec_engine.connect().await;
}
#[allow(clippy::await_holding_refcell_ref)] pub async fn disconnect_clients(&mut self) -> anyhow::Result<()> {
log::info!("Disconnecting clients...");
self.data_engine.borrow_mut().disconnect().await?;
self.exec_engine.borrow_mut().disconnect().await?;
Ok(())
}
/// Returns `true` when both the data engine and the execution engine report connected.
///
/// The execution engine is only consulted when the data engine check passes.
#[must_use]
pub fn check_engines_connected(&self) -> bool {
    let data_engine = self.data_engine.borrow();
    if !data_engine.check_connected() {
        return false;
    }
    self.exec_engine.borrow().check_connected()
}
/// Returns `true` when both the data engine and the execution engine report disconnected.
///
/// The execution engine is only consulted when the data engine check passes.
#[must_use]
pub fn check_engines_disconnected(&self) -> bool {
    let data_engine = self.data_engine.borrow();
    if !data_engine.check_disconnected() {
        return false;
    }
    self.exec_engine.borrow().check_disconnected()
}
/// Returns the data engine's per-client connection status as `(client ID, flag)` pairs.
#[must_use]
pub fn data_client_connection_status(&self) -> Vec<(ClientId, bool)> {
    let data_engine = self.data_engine.borrow();
    data_engine.client_connection_status()
}
/// Returns the execution engine's per-client connection status as `(client ID, flag)` pairs.
#[must_use]
pub fn exec_client_connection_status(&self) -> Vec<(ClientId, bool)> {
    let exec_engine = self.exec_engine.borrow();
    exec_engine.client_connection_status()
}
}