use std::{
fmt::Debug,
future::Future,
pin::Pin,
sync::{
Arc,
atomic::{AtomicBool, AtomicU8, Ordering},
},
time::Duration,
};
use nautilus_common::{
actor::{Actor, DataActor},
cache::database::CacheDatabaseAdapter,
component::Component,
enums::{Environment, LogColor},
live::dst,
log_info,
messages::{
DataEvent, ExecutionEvent, ExecutionReport,
data::DataCommand,
execution::{GenerateOrderStatusReports, TradingCommand},
},
timer::TimeEventHandler,
};
use nautilus_core::{
UUID4, UnixNanos,
datetime::{NANOSECONDS_IN_MILLISECOND, mins_to_secs, secs_to_nanos_unchecked},
};
use nautilus_model::{
events::OrderEventAny,
identifiers::{ClientOrderId, TraderId},
orders::Order,
reports::OrderStatusReport,
};
use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
use tabled::{Table, Tabled, settings::Style};
use crate::{
builder::LiveNodeBuilder,
config::{LiveNodeConfig, PluginConfig},
execution::LiveExecutionClient,
manager::{ExecutionManager, ExecutionManagerConfig, OpenOrderReportCheck},
runner::{AsyncRunner, AsyncRunnerChannels},
};
/// Lifecycle state of a live node.
///
/// The enum is `#[repr(u8)]` so that a state can be converted to and from a raw
/// byte (see [`NodeState::as_u8`] and [`NodeState::from_u8`]).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
#[repr(u8)]
pub enum NodeState {
    /// Default state.
    #[default]
    Idle = 0,
    /// Discriminant `1`.
    Starting = 1,
    /// Discriminant `2`; the only state for which [`NodeState::is_running`] is `true`.
    Running = 2,
    /// Discriminant `3`.
    ShuttingDown = 3,
    /// Discriminant `4`.
    Stopped = 4,
}

impl NodeState {
    /// Converts a raw discriminant back into a [`NodeState`].
    ///
    /// # Panics
    ///
    /// Panics if `value` is not the discriminant of any variant.
    #[must_use]
    pub const fn from_u8(value: u8) -> Self {
        const IDLE: u8 = NodeState::Idle as u8;
        const STARTING: u8 = NodeState::Starting as u8;
        const RUNNING: u8 = NodeState::Running as u8;
        const SHUTTING_DOWN: u8 = NodeState::ShuttingDown as u8;
        const STOPPED: u8 = NodeState::Stopped as u8;

        match value {
            IDLE => Self::Idle,
            STARTING => Self::Starting,
            RUNNING => Self::Running,
            SHUTTING_DOWN => Self::ShuttingDown,
            STOPPED => Self::Stopped,
            _ => panic!("Invalid NodeState value"),
        }
    }

    /// Returns the raw `u8` discriminant of this state.
    #[must_use]
    pub const fn as_u8(self) -> u8 {
        self as u8
    }

    /// Returns `true` only for [`NodeState::Running`].
    #[must_use]
    pub const fn is_running(&self) -> bool {
        match self {
            Self::Running => true,
            Self::Idle | Self::Starting | Self::ShuttingDown | Self::Stopped => false,
        }
    }
}
/// Outcome of waiting for the engines' clients to connect during startup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EngineConnectionStatus {
    /// All engine clients reported connected.
    Connected,
    /// The connection timeout elapsed first.
    TimedOut,
    /// A stop was requested through the node handle while waiting.
    StopRequested,
    /// The kernel reported a shutdown request while waiting.
    ShutdownRequested,
}

impl EngineConnectionStatus {
    /// Returns the message explaining why startup must be aborted, or `None`
    /// when the status does not represent an abort request.
    const fn abort_reason(self) -> Option<&'static str> {
        let reason = match self {
            Self::StopRequested => "Stop signal received during startup",
            Self::ShutdownRequested => "Shutdown signal received during startup",
            Self::Connected | Self::TimedOut => return None,
        };
        Some(reason)
    }
}
/// Cloneable handle exposing a node's stop flag and lifecycle state.
///
/// Both fields are `Arc`-held atomics, so every clone observes and mutates the
/// same values.
#[derive(Debug, Clone)]
pub struct LiveNodeHandle {
    /// Raised by [`LiveNodeHandle::stop`]; cleared when the state becomes `Running`.
    pub(crate) stop_flag: Arc<AtomicBool>,
    /// Current [`NodeState`] stored as its `u8` discriminant.
    pub(crate) state: Arc<AtomicU8>,
}

impl Default for LiveNodeHandle {
    fn default() -> Self {
        Self::new()
    }
}

impl LiveNodeHandle {
    /// Creates a handle in the `Idle` state with the stop flag lowered.
    #[must_use]
    pub fn new() -> Self {
        let stop_flag = Arc::new(AtomicBool::new(false));
        let state = Arc::new(AtomicU8::new(NodeState::Idle.as_u8()));
        Self { stop_flag, state }
    }

    /// Stores `state`; entering `Running` additionally clears the stop flag.
    pub(crate) fn set_state(&self, state: NodeState) {
        self.state.store(state.as_u8(), Ordering::Relaxed);
        if matches!(state, NodeState::Running) {
            self.stop_flag.store(false, Ordering::Relaxed);
        }
    }

    /// Returns the currently stored node state.
    #[must_use]
    pub fn state(&self) -> NodeState {
        let raw = self.state.load(Ordering::Relaxed);
        NodeState::from_u8(raw)
    }

    /// Returns `true` if a stop has been requested and not yet cleared.
    #[must_use]
    pub fn should_stop(&self) -> bool {
        self.stop_flag.load(Ordering::Relaxed)
    }

    /// Returns `true` if the stored state is `Running`.
    #[must_use]
    pub fn is_running(&self) -> bool {
        matches!(self.state(), NodeState::Running)
    }

    /// Requests a stop by raising the stop flag.
    pub fn stop(&self) {
        self.stop_flag.store(true, Ordering::Relaxed);
    }
}
/// Live node: owns the kernel, the async runner, and execution management state.
#[derive(Debug)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.live", unsendable)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")
)]
pub struct LiveNode {
/// Kernel the node delegates to (clients, engines, trader, cache).
kernel: NautilusKernel,
/// Async runner; `run()` takes it out of the option, leaving `None`.
runner: Option<AsyncRunner>,
/// Configuration the node was built with.
config: LiveNodeConfig,
/// Shared handle holding the stop flag and the lifecycle state.
handle: LiveNodeHandle,
/// Execution manager used for reconciliation and purge tasks.
exec_manager: ExecutionManager,
/// Execution clients registered with the node.
exec_clients: Vec<LiveExecutionClient>,
/// Instant at which the event loop exits after shutdown was initiated, if any.
shutdown_deadline: Option<dst::time::Instant>,
/// Plug-in state (only with the `plugin` feature).
#[cfg(feature = "plugin")]
plugins: crate::plugin::NodePlugins,
/// Python objects held by the node (only with the `python` feature).
#[cfg(feature = "python")]
#[allow(dead_code)] python_actors: Vec<pyo3::Py<pyo3::PyAny>>,
}
impl LiveNode {
/// Assembles a node from components prepared by the builder.
///
/// The node starts with a fresh handle, no shutdown deadline, and default
/// plug-in / Python state where those features are enabled.
#[must_use]
pub(crate) fn new_from_builder(
    kernel: NautilusKernel,
    runner: AsyncRunner,
    config: LiveNodeConfig,
    exec_manager: ExecutionManager,
    exec_clients: Vec<LiveExecutionClient>,
) -> Self {
    let handle = LiveNodeHandle::new();
    let runner = Some(runner);

    Self {
        kernel,
        runner,
        config,
        handle,
        exec_manager,
        exec_clients,
        shutdown_deadline: None,
        #[cfg(feature = "plugin")]
        plugins: crate::plugin::NodePlugins::default(),
        #[cfg(feature = "python")]
        python_actors: Vec::new(),
    }
}
/// Creates a [`LiveNodeBuilder`] for the given trader ID and environment.
///
/// # Errors
///
/// Returns whatever error [`LiveNodeBuilder::new`] produces.
pub fn builder(trader_id: TraderId, environment: Environment) -> anyhow::Result<LiveNodeBuilder> {
    LiveNodeBuilder::new(trader_id, environment)
}
/// Builds a node directly from an optional configuration.
///
/// A missing configuration falls back to `LiveNodeConfig::default()`. The
/// environment is always overwritten with `Environment::Live`.
///
/// # Errors
///
/// Returns an error if runtime-support validation fails, if
/// `config.event_store` is set, if the kernel cannot be created, or if the
/// configured plug-ins cannot be loaded.
pub fn build(name: String, config: Option<LiveNodeConfig>) -> anyhow::Result<Self> {
    let mut node_config = config.unwrap_or_default();
    node_config.environment = Environment::Live;

    // NOTE(review): the environment was just forced to `Live`, so this guard
    // looks unreachable; kept to preserve existing behavior.
    if matches!(node_config.environment(), Environment::Backtest) {
        anyhow::bail!("LiveNode cannot be used with Backtest environment");
    }

    node_config.validate_runtime_support()?;

    if node_config.event_store.is_some() {
        anyhow::bail!(
            "LiveNodeConfig.event_store is set but LiveNode::build cannot install a factory; \
             use LiveNodeBuilder::with_event_store(...) instead"
        );
    }

    // Senders are bound before the kernel is constructed.
    let runner = AsyncRunner::new();
    runner.bind_senders();

    let kernel = NautilusKernel::new(name, node_config.clone())?;

    let manager_config = ExecutionManagerConfig::from(&node_config.exec_engine)
        .with_trader_id(node_config.trader_id);
    let exec_manager =
        ExecutionManager::new(kernel.clock.clone(), kernel.cache.clone(), manager_config);

    #[cfg_attr(
        not(feature = "plugin"),
        expect(unused_mut, reason = "plugin builds need mutable node state")
    )]
    let mut node = Self {
        kernel,
        runner: Some(runner),
        config: node_config,
        handle: LiveNodeHandle::new(),
        exec_manager,
        exec_clients: Vec::new(),
        shutdown_deadline: None,
        #[cfg(feature = "plugin")]
        plugins: crate::plugin::NodePlugins::default(),
        #[cfg(feature = "python")]
        python_actors: Vec::new(),
    };

    node.load_configured_plugins()?;
    log::info!("LiveNode built successfully with kernel config");
    Ok(node)
}
/// Loads and installs every plug-in listed in the node configuration.
///
/// Does nothing when no plug-ins are configured.
///
/// # Errors
///
/// Returns an error if the node is no longer `Idle`, if the plug-in batch
/// fails to load, or if installing any adapter fails.
#[cfg(feature = "plugin")]
pub(crate) fn load_configured_plugins(&mut self) -> anyhow::Result<()> {
    let plugin_configs = self.config.plugins.clone();
    if plugin_configs.is_empty() {
        return Ok(());
    }

    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!("Cannot load plug-ins after the node leaves Idle state");
    }

    let batch = crate::plugin::load_configured_plugin_batch(&plugin_configs)?;
    let (loader, adapters) = batch.into_parts();

    // Stops at the first adapter that fails to install.
    adapters
        .into_iter()
        .try_for_each(|adapter| self.install_plugin_adapter(adapter))?;

    self.plugins.set_loader(loader);
    Ok(())
}
/// Fallback for builds without the `plugin` feature.
///
/// # Errors
///
/// Returns an error if any plug-ins are configured.
#[cfg(not(feature = "plugin"))]
pub(crate) fn load_configured_plugins(&self) -> anyhow::Result<()> {
    if !self.config.plugins.is_empty() {
        anyhow::bail!("LiveNodeConfig.plugins requires the `plugin` feature");
    }
    Ok(())
}
/// Loads a single plug-in, installs its adapter, and records its configuration.
///
/// # Errors
///
/// Returns an error if the configuration fails runtime-support validation,
/// if the node is no longer `Idle`, or if loading or installing the plug-in
/// fails.
#[cfg(feature = "plugin")]
pub fn add_plugin(&mut self, config: PluginConfig) -> anyhow::Result<()> {
    // Validation runs before the state check.
    let configured_count = self.config.plugins.len();
    config.validate_runtime_support(configured_count)?;

    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!("Cannot add plug-in after the node leaves Idle state");
    }

    let adapter = self.plugins.load_plugin(&config)?;
    self.install_plugin_adapter(adapter)?;

    // The config is recorded only after a successful install.
    self.config.plugins.push(config);
    Ok(())
}
/// Fallback for builds without the `plugin` feature.
///
/// # Errors
///
/// Always returns an error stating that the `plugin` feature is required.
#[cfg(not(feature = "plugin"))]
#[expect(
clippy::needless_pass_by_value,
reason = "signature mirrors the plugin-enabled API"
)]
pub fn add_plugin(&mut self, config: PluginConfig) -> anyhow::Result<()> {
// The argument is accepted only for signature parity and is otherwise unused.
let _ = config;
anyhow::bail!("LiveNode::add_plugin requires the `plugin` feature")
}
/// Installs a loaded plug-in adapter according to its kind.
///
/// Actor and strategy adapters are unboxed and registered through
/// `add_actor` / `add_strategy`; controller adapters are pushed onto the
/// plug-in state.
///
/// # Errors
///
/// Returns any error from `add_actor` or `add_strategy`.
#[cfg(feature = "plugin")]
fn install_plugin_adapter(
    &mut self,
    adapter: crate::plugin::NodePluginAdapter,
) -> anyhow::Result<()> {
    use crate::plugin::NodePluginAdapter;

    match adapter {
        NodePluginAdapter::Actor(actor) => self.add_actor(*actor),
        NodePluginAdapter::Strategy(strategy) => self.add_strategy(*strategy),
        NodePluginAdapter::Controller(controller) => {
            self.plugins.push_controller(controller);
            Ok(())
        }
    }
}
#[must_use]
pub fn handle(&self) -> LiveNodeHandle {
self.handle.clone()
}
/// Starts the node without entering the event loop.
///
/// Starts the kernel, connects data clients and then execution clients,
/// waits for the engines to connect, performs startup reconciliation, and
/// starts the trader. A stop or shutdown request seen during startup aborts
/// it and returns `Ok(())`. A connection timeout logs an error, marks the
/// node `Running`, and returns without starting the trader.
///
/// # Errors
///
/// Returns an error if the node is already running, or if aborting
/// startup, reconciliation, or plug-in controller start fails.
pub async fn start(&mut self) -> anyhow::Result<()> {
    if self.state().is_running() {
        anyhow::bail!("Already running");
    }

    if let Some(runner) = &self.runner {
        runner.bind_senders();
    }

    self.handle.set_state(NodeState::Starting);
    self.kernel.reset_shutdown_flag();
    self.kernel.start_async().await;

    if self.kernel.is_event_store_replay() {
        log::info!(
            "Event-store replay loaded; skipping live client connection and reconciliation",
        );
        self.handle.set_state(NodeState::Running);
        return Ok(());
    }

    if self.kernel.is_event_store_replay_configured() {
        return self.abort_startup("Event-store replay did not start").await;
    }

    self.kernel.connect_data_clients().await;

    if let Some(runner) = &mut self.runner {
        runner.flush_pending_data();
    }

    self.kernel.connect_exec_clients().await;

    if let Some(reason) = self.startup_abort_reason() {
        return self.abort_startup(reason).await;
    }

    let connection_status = self.await_engines_connected().await;

    // Stop / shutdown requests seen while waiting abort the startup.
    if let Some(reason) = connection_status.abort_reason() {
        return self.abort_startup(reason).await;
    }

    if connection_status == EngineConnectionStatus::TimedOut {
        log::error!("Cannot start trader: engine client(s) not connected");
        self.handle.set_state(NodeState::Running);
        return Ok(());
    }

    self.perform_startup_reconciliation().await?;
    self.kernel.start_trader();

    #[cfg(feature = "plugin")]
    if let Err(e) = self.plugins.start_controllers() {
        return self.abort_after_trader_start_failure(e).await;
    }

    self.handle.set_state(NodeState::Running);
    Ok(())
}
/// Stops a running node.
///
/// Stops plug-in controllers (when the feature is enabled) and the trader,
/// sleeps for the kernel's post-stop delay, then finalizes the stop.
///
/// # Errors
///
/// Returns an error if the node is not running. If finalizing the stop
/// fails, that error is returned (a controller error, if any, is only
/// logged); otherwise a controller stop error is returned.
pub async fn stop(&mut self) -> anyhow::Result<()> {
    if !self.state().is_running() {
        anyhow::bail!("Not running");
    }

    self.handle.set_state(NodeState::ShuttingDown);

    #[cfg(feature = "plugin")]
    let controller_stop_result = self.plugins.stop_controllers();
    #[cfg(not(feature = "plugin"))]
    let controller_stop_result: anyhow::Result<()> = Ok(());

    self.kernel.stop_trader();

    let delay = self.kernel.delay_post_stop();
    log::info!("Awaiting residual events ({delay:?})...");
    dst::time::sleep(delay).await;

    let stop_result = self.finalize_stop().await;

    // Without a controller error the finalize outcome is the overall outcome.
    let Err(controller_err) = controller_stop_result else {
        return stop_result;
    };

    if stop_result.is_err() {
        // The finalize error takes precedence; the controller error is only logged.
        log::error!("Error stopping plug-in controllers: {controller_err}");
        return stop_result;
    }

    Err(controller_err)
}
/// Polls every 100 ms until the engines are connected, a stop or shutdown
/// is requested, or the configured connection timeout elapses.
///
/// Within each poll the stop flag is checked first, then the kernel
/// shutdown flag, then the connection state. On timeout the per-client
/// connection status is logged.
async fn await_engines_connected(&self) -> EngineConnectionStatus {
    let timeout = self.config.timeout_connection;
    log::info!("Awaiting engine connections ({:?} timeout)...", timeout);

    let started = dst::time::Instant::now();
    let poll_interval = Duration::from_millis(100);

    loop {
        if started.elapsed() >= timeout {
            break;
        }

        if self.handle.should_stop() {
            log::warn!("Stop signal received, aborting connection wait");
            return EngineConnectionStatus::StopRequested;
        }

        if self.kernel.is_shutdown_requested() {
            log::warn!("Shutdown signal received, aborting connection wait");
            return EngineConnectionStatus::ShutdownRequested;
        }

        if self.kernel.check_engines_connected() {
            log::info!("All engine clients connected");
            return EngineConnectionStatus::Connected;
        }

        dst::time::sleep(poll_interval).await;
    }

    self.log_connection_status();
    EngineConnectionStatus::TimedOut
}
/// Polls every 100 ms until the engines report disconnected or the
/// configured disconnection timeout elapses; a timeout is logged as an error.
async fn await_engines_disconnected(&self) {
    let timeout = self.config.timeout_disconnection;
    log::info!("Awaiting engine disconnections ({:?} timeout)...", timeout);

    let started = dst::time::Instant::now();
    let poll_interval = Duration::from_millis(100);

    loop {
        if started.elapsed() >= timeout {
            break;
        }

        if self.kernel.check_engines_disconnected() {
            log::info!("All engine clients disconnected");
            return;
        }

        dst::time::sleep(poll_interval).await;
    }

    log::error!(
        "Timed out ({:?}) waiting for engines to disconnect\n\
         DataEngine.check_disconnected() == {}\n\
         ExecEngine.check_disconnected() == {}",
        timeout,
        self.kernel.data_engine().check_disconnected(),
        self.kernel.exec_engine().borrow().check_disconnected(),
    );
}
/// Logs a warning containing a table of every data and execution client
/// with its connection flag, followed by the engines' aggregate
/// `check_connected()` results.
fn log_connection_status(&self) {
    /// One table row per client.
    #[derive(Tabled)]
    struct ClientStatus {
        #[tabled(rename = "Client")]
        client: String,
        #[tabled(rename = "Type")]
        client_type: &'static str,
        #[tabled(rename = "Connected")]
        connected: bool,
    }

    let data_status = self.kernel.data_client_connection_status();
    let exec_status = self.kernel.exec_client_connection_status();

    let data_rows = data_status
        .into_iter()
        .map(|(client_id, connected)| ClientStatus {
            client: client_id.to_string(),
            client_type: "Data",
            connected,
        });
    let exec_rows = exec_status
        .into_iter()
        .map(|(client_id, connected)| ClientStatus {
            client: client_id.to_string(),
            client_type: "Execution",
            connected,
        });

    // Data clients are listed before execution clients.
    let rows: Vec<ClientStatus> = data_rows.chain(exec_rows).collect();

    let table = Table::new(&rows).with(Style::rounded()).to_string();

    log::warn!(
        "Timed out ({:?}) waiting for engines to connect\n\n{table}\n\n\
         DataEngine.check_connected() == {}\n\
         ExecEngine.check_connected() == {}",
        self.config.timeout_connection,
        self.kernel.data_engine().check_connected(),
        self.kernel.exec_engine().borrow().check_connected(),
    );
}
/// Reconciles execution state with each execution client at startup.
///
/// Returns immediately when `exec_engine.reconciliation` is disabled. Otherwise,
/// for every client ID known to the execution engine, requests a mass status,
/// reconciles it through the execution manager, and registers any external
/// orders found. Clients that yield no mass status or an error are logged and
/// skipped. Finally initializes the portfolio's orders and positions.
///
/// The reconciliation timeout is only checked before starting each client, so
/// a request already in progress is not interrupted.
///
/// # Errors
///
/// No statement in this body produces an error; the function returns `Ok(())`
/// on every path.
#[expect(clippy::await_holding_refcell_ref)] async fn perform_startup_reconciliation(&mut self) -> anyhow::Result<()> {
if !self.config.exec_engine.reconciliation {
log::info!("Startup reconciliation disabled");
return Ok(());
}
log_info!(
"Starting execution state reconciliation...",
color = LogColor::Blue
);
let lookback_mins = self
.config
.exec_engine
.reconciliation_lookback_mins
.map(|m| m as u64);
let timeout = self.config.timeout_reconciliation;
let start = dst::time::Instant::now();
// The borrow used to list client IDs is released at the end of this statement.
let client_ids = self.kernel.exec_engine.borrow().client_ids();
for client_id in client_ids {
// Stop issuing further requests once the time budget is spent.
if start.elapsed() > timeout {
log::warn!("Reconciliation timeout reached, stopping early");
break;
}
log_info!(
"Requesting mass status from {}...",
client_id,
color = LogColor::Blue
);
// The mutable borrow of the exec engine is held across this await
// (hence the `await_holding_refcell_ref` expectation on the function).
let mass_status_result = self
.kernel
.exec_engine
.borrow_mut()
.generate_mass_status(&client_id, lookback_mins)
.await;
match mass_status_result {
Ok(Some(mass_status)) => {
log_info!(
"Reconciling ExecutionMassStatus for {}",
client_id,
color = LogColor::Blue
);
let exec_engine_rc = self.kernel.exec_engine.clone();
let result = self
.exec_manager
.reconcile_execution_mass_status(mass_status, exec_engine_rc)
.await;
if result.events.is_empty() {
log_info!(
"Reconciliation for {} succeeded",
client_id,
color = LogColor::Blue
);
} else {
log::info!(
color = LogColor::Blue as u8;
"Reconciliation for {} processed {} events",
client_id,
result.events.len()
);
}
// Register orders the reconciliation reported as external.
if !result.external_orders.is_empty() {
let exec_engine = self.kernel.exec_engine.borrow();
for external in result.external_orders {
exec_engine.register_external_order(
external.client_order_id,
external.venue_order_id,
external.instrument_id,
external.strategy_id,
external.ts_init,
);
}
}
}
Ok(None) => {
log::warn!(
"No mass status available from {client_id} \
(likely adapter error when generating reports)"
);
}
Err(e) => {
log::warn!("Failed to get mass status from {client_id}: {e}");
}
}
}
// Runs after the loop regardless of how many clients were reconciled.
self.kernel.portfolio.borrow_mut().initialize_orders();
self.kernel.portfolio.borrow_mut().initialize_positions();
let elapsed_secs = start.elapsed().as_secs_f64();
log_info!(
"Startup reconciliation completed in {:.2}s",
elapsed_secs,
color = LogColor::Blue
);
Ok(())
}
/// Runs the node: performs startup, then drives the event loop until shutdown.
///
/// Takes the runner (so `run()` can only be called once per node), starts the
/// kernel, connects data clients and then execution clients through
/// `drive_with_event_buffering`, performs startup reconciliation and starts the
/// trader when the engines connected, and then loops over SIGINT, the stop
/// flag, the shutdown deadline, periodic maintenance, and the five runner
/// channels. After the loop exits the stop is finalized and the channels are
/// drained.
///
/// # Errors
///
/// Returns an error if the node is already running, if the runner was already
/// consumed, or if the execution connect phase, a startup abort, startup
/// reconciliation, plug-in controller start, or the final stop fails.
pub async fn run(&mut self) -> anyhow::Result<()> {
if self.state().is_running() {
anyhow::bail!("Already running");
}
let Some(runner) = self.runner.take() else {
anyhow::bail!("Runner already consumed - run() called twice");
};
runner.bind_senders();
let AsyncRunnerChannels {
mut time_evt_rx,
mut exec_evt_rx,
mut exec_cmd_rx,
mut data_evt_rx,
mut data_cmd_rx,
} = runner.take_channels();
log::info!("Event loop starting");
self.handle.set_state(NodeState::Starting);
self.kernel.reset_shutdown_flag();
self.kernel.start_async().await;
// Replay mode: mark the node running and return without entering the event loop.
if self.kernel.is_event_store_replay() {
log::info!(
"Event-store replay loaded; skipping live client connection and reconciliation",
);
self.handle.set_state(NodeState::Running);
return Ok(());
}
if self.kernel.is_event_store_replay_configured() {
self.abort_startup("Event-store replay did not start")
.await?;
return Ok(());
}
let stop_handle = self.handle.clone();
let mut pending = PendingEvents::default();
// Phase 1: connect data clients, then flush pending data events and commands.
drive_with_event_buffering(
self.kernel.connect_data_clients(),
&mut pending,
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
)
.await;
flush_pending_data(&mut pending, &mut data_evt_rx, &mut data_cmd_rx);
debug_assert!(
pending.data_evts.is_empty() && pending.data_cmds.is_empty(),
"data must be drained into cache before exec clients connect",
);
// Phase 2: connect execution clients and wait for the engines, then flush everything.
let engine_connection_status = drive_with_event_buffering(
self.connect_exec_phase(),
&mut pending,
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
)
.await?;
flush_all_pending(
&mut pending,
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
);
debug_assert!(
pending.is_empty(),
"all startup events must be processed before reconciliation",
);
// A stop/shutdown seen while waiting, or raised since, aborts the startup.
if let Some(reason) = engine_connection_status
.abort_reason()
.or_else(|| self.startup_abort_reason())
{
self.abort_startup(reason).await?;
self.drain_channels(
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
);
log::info!("Event loop stopped");
return Ok(());
}
if engine_connection_status == EngineConnectionStatus::Connected {
self.perform_startup_reconciliation().await?;
self.kernel.start_trader();
#[cfg(feature = "plugin")]
if let Err(e) = self.plugins.start_controllers() {
let result = self.abort_after_trader_start_failure(e).await;
self.drain_channels(
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
);
log::info!("Event loop stopped");
return result;
}
} else {
// Timed out: the loop still runs, but the trader is not started.
log::error!("Not starting trader: engine client(s) not connected");
}
self.handle.set_state(NodeState::Running);
// Derive the periodic check intervals from the exec engine configuration.
let exec_config = &self.config.exec_engine;
let inflight_interval_ns =
(exec_config.inflight_check_interval_ms as u64) * NANOSECONDS_IN_MILLISECOND;
let open_interval_ns = exec_config
.open_check_interval_secs
.filter(|&s| s > 0.0)
.map_or(0, secs_to_nanos_unchecked);
let position_check_configured = exec_config
.position_check_interval_secs
.is_some_and(|interval_secs| interval_secs > 0.0);
let has_clients = !self
.kernel
.exec_engine
.borrow()
.get_all_clients()
.is_empty();
let recon_enabled = has_clients && (inflight_interval_ns > 0 || open_interval_ns > 0);
// The reconciliation tick uses the smaller of the enabled check intervals.
let recon_min_interval = if recon_enabled {
let mut intervals = Vec::new();
if exec_config.inflight_check_interval_ms > 0 {
intervals.push(Duration::from_millis(
exec_config.inflight_check_interval_ms as u64,
));
}
if let Some(s) = exec_config.open_check_interval_secs.filter(|&s| s > 0.0) {
intervals.push(Duration::from_secs_f64(s));
}
intervals
.into_iter()
.min()
.unwrap_or(Duration::from_secs(1))
} else {
Duration::from_secs(1) };
let startup_delay = if self.config.exec_engine.reconciliation {
Duration::from_secs_f64(exec_config.reconciliation_startup_delay_secs)
} else {
Duration::ZERO
};
let recon_start = dst::time::Instant::now() + startup_delay;
let mut ts_last_inflight = self.exec_manager.generate_timestamp_ns();
let mut ts_last_open = ts_last_inflight;
// Unconfigured schedules get an interval of roughly 100 years.
let far_future = Duration::from_secs(86400 * 365 * 100);
let make_schedule = |opt_dur: Option<Duration>| -> (Duration, dst::time::Instant) {
let dur = opt_dur.unwrap_or(far_future);
(dur, recon_start + dur)
};
let (recon_interval, mut recon_next) = make_schedule(if recon_enabled {
Some(recon_min_interval)
} else {
None
});
let (purge_orders_interval, mut purge_orders_next) = make_schedule(
exec_config
.purge_closed_orders_interval_mins
.filter(|&m| m > 0)
.map(|m| Duration::from_secs(mins_to_secs(m as u64))),
);
let (purge_positions_interval, mut purge_positions_next) = make_schedule(
exec_config
.purge_closed_positions_interval_mins
.filter(|&m| m > 0)
.map(|m| Duration::from_secs(mins_to_secs(m as u64))),
);
let (purge_account_interval, mut purge_account_next) = make_schedule(
exec_config
.purge_account_events_interval_mins
.filter(|&m| m > 0)
.map(|m| Duration::from_secs(mins_to_secs(m as u64))),
);
let (own_books_interval, mut own_books_next) = make_schedule(
exec_config
.own_books_audit_interval_secs
.filter(|&s| s > 0.0)
.map(Duration::from_secs_f64),
);
let (prune_fills_interval, mut prune_fills_next) =
make_schedule(Some(Duration::from_secs(60)));
let mut maintenance_timer = dst::time::interval(Duration::from_millis(100));
maintenance_timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Skip);
let mut stop_check_timer = dst::time::interval(Duration::from_millis(100));
stop_check_timer.set_missed_tick_behavior(dst::time::MissedTickBehavior::Skip);
let mut residual_events = 0usize;
let mut open_order_report_task: Option<OpenOrderReportTask> = None;
let ctrl_c = dst::signal::ctrl_c();
tokio::pin!(ctrl_c);
if has_clients && position_check_configured {
log::warn!(
"Skipping continuous position reconciliation: no non-blocking position \
reconciliation path is configured"
);
}
loop {
// State is sampled once per iteration for the branch guards below.
let shutdown_deadline = self.shutdown_deadline;
let is_shutting_down = self.state() == NodeState::ShuttingDown;
let is_running = self.state() == NodeState::Running;
tokio::select! {
// `biased` makes the branches get polled in declaration order.
biased;
result = &mut ctrl_c, if is_running => {
match result {
Ok(()) => log::info!("Received SIGINT, shutting down"),
Err(e) => log::error!("Failed to listen for SIGINT: {e}"),
}
self.initiate_shutdown();
}
_ = stop_check_timer.tick(), if is_running => {
if stop_handle.should_stop() {
log::info!("Received stop signal from handle");
self.initiate_shutdown();
} else if self.kernel.is_shutdown_requested() {
log::info!("Received ShutdownSystem command, shutting down");
self.initiate_shutdown();
}
}
// While shutting down, leave the loop once the post-stop deadline passes.
() = async {
match shutdown_deadline {
Some(deadline) => dst::time::sleep_until(deadline).await,
None => std::future::pending::<()>().await,
}
}, if self.state() == NodeState::ShuttingDown => {
break;
}
// Completion of the outstanding open-order report task, if there is one.
result = async {
match open_order_report_task.as_mut() {
Some(task) => task.future.as_mut().await,
None => std::future::pending::<OpenOrderReportResult>().await,
}
}, if open_order_report_task.is_some() => {
open_order_report_task = None;
let events = self
.exec_manager
.reconcile_open_order_reports(&result.check, result.reports);
self.process_reconciliation_events(&events);
}
// Periodic maintenance: reconciliation checks, purges, audits, fill-cache pruning.
_ = maintenance_timer.tick(), if is_running => {
let mut now = dst::time::Instant::now();
if recon_enabled && now >= recon_next {
let recon_intervals = ReconciliationCheckIntervals {
inflight_ns: inflight_interval_ns,
open_ns: open_interval_ns,
};
let mut recon_state = ReconciliationCheckState {
ts_last_inflight: &mut ts_last_inflight,
ts_last_open: &mut ts_last_open,
open_order_report_task: &mut open_order_report_task,
};
self.run_reconciliation_checks(
recon_intervals,
&mut recon_state,
);
// Re-read the clock so the next run is scheduled from after the checks.
now = dst::time::Instant::now();
recon_next = now + recon_interval;
}
if now >= purge_orders_next {
self.exec_manager.purge_closed_orders();
purge_orders_next = now + purge_orders_interval;
}
if now >= purge_positions_next {
self.exec_manager.purge_closed_positions();
purge_positions_next = now + purge_positions_interval;
}
if now >= purge_account_next {
self.exec_manager.purge_account_events();
purge_account_next = now + purge_account_interval;
}
if now >= own_books_next {
self.kernel.cache().borrow_mut().audit_own_order_books();
own_books_next = now + own_books_interval;
}
if now >= prune_fills_next {
self.exec_manager.prune_recent_fills_cache(60.0);
prune_fills_next = now + prune_fills_interval;
}
}
Some(handler) = time_evt_rx.recv() => {
AsyncRunner::handle_time_event(handler);
if is_shutting_down {
log::debug!("Residual time event");
residual_events += 1;
}
}
Some(evt) = exec_evt_rx.recv() => {
if is_shutting_down {
log::debug!("Residual exec event: {evt:?}");
residual_events += 1;
}
// Order IDs whose closed status is re-checked against the cache after dispatch.
let mut close_ids: Vec<ClientOrderId> = Vec::new();
match &evt {
ExecutionEvent::Order(order_evt) => {
self.exec_manager.record_local_activity(order_evt.client_order_id());
match order_evt {
OrderEventAny::Filled(fill) => {
self.exec_manager.record_position_activity(
fill.instrument_id,
fill.account_id,
fill.ts_event,
);
self.exec_manager.mark_fill_processed(fill.trade_id);
}
OrderEventAny::Accepted(_) => {
self.exec_manager.clear_recon_tracking(
&order_evt.client_order_id(), true,
);
}
OrderEventAny::Rejected(_)
| OrderEventAny::Canceled(_)
| OrderEventAny::Expired(_)
| OrderEventAny::Denied(_) => {
self.exec_manager.clear_recon_tracking(
&order_evt.client_order_id(), true,
);
}
_ => {}
}
close_ids.push(order_evt.client_order_id());
}
ExecutionEvent::OrderSubmittedBatch(batch) => {
for submitted in &batch.events {
self.exec_manager.record_local_activity(submitted.client_order_id);
}
}
ExecutionEvent::OrderAcceptedBatch(batch) => {
for accepted in &batch.events {
self.exec_manager.record_local_activity(accepted.client_order_id);
self.exec_manager.clear_recon_tracking(
&accepted.client_order_id, true,
);
}
}
ExecutionEvent::OrderCanceledBatch(batch) => {
for canceled in &batch.events {
self.exec_manager.record_local_activity(canceled.client_order_id);
self.exec_manager.clear_recon_tracking(
&canceled.client_order_id, true,
);
close_ids.push(canceled.client_order_id);
}
}
ExecutionEvent::Report(report) => {
if let ExecutionReport::Fill(fill_report) = report
&& self.exec_manager.is_fill_recently_processed(&fill_report.trade_id) {
log::debug!(
"Skipping recently processed fill report: {}",
fill_report.trade_id,
);
// The report is not dispatched; go to the next loop iteration.
continue;
}
self.exec_manager.observe_execution_report(report);
}
ExecutionEvent::Account(_) => {}
}
AsyncRunner::handle_exec_event(evt);
for coid in &close_ids {
let is_closed = self.kernel.cache().borrow()
.order(coid).is_some_and(|o| o.is_closed());
if is_closed {
self.exec_manager.clear_recon_tracking(coid, true);
}
}
}
Some(cmd) = exec_cmd_rx.recv() => {
if is_shutting_down {
log::debug!("Residual exec command: {cmd:?}");
residual_events += 1;
}
// Submit, modify, and cancel commands are registered as in-flight before dispatch.
match &cmd {
TradingCommand::SubmitOrder(submit) => {
self.exec_manager.register_inflight(submit.client_order_id);
}
TradingCommand::SubmitOrderList(submit) => {
for order_init in &submit.order_inits {
self.exec_manager.register_inflight(order_init.client_order_id);
}
}
TradingCommand::ModifyOrder(modify) => {
self.exec_manager.register_inflight(modify.client_order_id);
}
TradingCommand::CancelOrder(cancel) => {
self.exec_manager.register_inflight(cancel.client_order_id);
}
_ => {}
}
AsyncRunner::handle_exec_command(cmd);
}
Some(evt) = data_evt_rx.recv() => {
if is_shutting_down {
log::debug!("Residual data event: {evt:?}");
residual_events += 1;
}
AsyncRunner::handle_data_event(evt);
}
Some(cmd) = data_cmd_rx.recv() => {
if is_shutting_down {
log::debug!("Residual data command: {cmd:?}");
residual_events += 1;
}
AsyncRunner::handle_data_command(cmd);
}
}
}
if residual_events > 0 {
log::debug!("Processed {residual_events} residual events during shutdown");
}
// Drop any open-order report task that is still outstanding.
drop(open_order_report_task.take());
// The result of the residuals check is intentionally discarded.
let _ = self.kernel.cache().borrow().check_residuals();
self.finalize_stop().await?;
self.drain_channels(
&mut time_evt_rx,
&mut data_evt_rx,
&mut data_cmd_rx,
&mut exec_evt_rx,
&mut exec_cmd_rx,
);
log::info!("Event loop stopped");
Ok(())
}
/// Applies order events produced by reconciliation.
///
/// Each event is recorded as local activity; fills additionally record
/// position activity and mark the trade as processed. The event is then
/// passed to the execution engine. Does nothing for an empty slice.
fn process_reconciliation_events(&mut self, events: &[OrderEventAny]) {
    let count = events.len();
    if count == 0 {
        return;
    }

    let plural_suffix = if count == 1 { "" } else { "s" };
    log::info!(
        "Processing {} reconciliation event{}",
        count,
        plural_suffix
    );

    for order_event in events {
        self.exec_manager
            .record_local_activity(order_event.client_order_id());

        if let OrderEventAny::Filled(filled) = order_event {
            self.exec_manager.record_position_activity(
                filled.instrument_id,
                filled.account_id,
                filled.ts_event,
            );
            self.exec_manager.mark_fill_processed(filled.trade_id);
        }

        self.kernel.exec_engine.borrow_mut().process(order_event);
    }
}
/// Connects the execution clients, then waits for the engines to connect.
///
/// # Errors
///
/// This body always returns `Ok`; the `Result` wrapper is part of the signature.
async fn connect_exec_phase(&mut self) -> anyhow::Result<EngineConnectionStatus> {
    self.kernel.connect_exec_clients().await;
    let status = self.await_engines_connected().await;
    Ok(status)
}
/// Returns the reason startup should be aborted, if any.
///
/// A stop request on the handle takes precedence over a kernel shutdown request.
fn startup_abort_reason(&self) -> Option<&'static str> {
    if self.handle.should_stop() {
        return Some("Stop signal received during startup");
    }
    if self.kernel.is_shutdown_requested() {
        return Some("Shutdown signal received during startup");
    }
    None
}
/// Logs `reason`, marks the node as shutting down, and finalizes the stop.
///
/// # Errors
///
/// Returns the error from `finalize_stop`, if any.
async fn abort_startup(&mut self, reason: &str) -> anyhow::Result<()> {
    log::info!("{reason}, aborting startup");

    let handle = &self.handle;
    handle.set_state(NodeState::ShuttingDown);

    self.finalize_stop().await
}
/// Unwinds a startup in which the trader started but plug-in controllers failed.
///
/// Stops the trader and finalizes the stop.
///
/// # Errors
///
/// Always returns an error: `start_err` when finalization succeeds, or a
/// combined message when finalization fails too.
#[cfg(feature = "plugin")]
async fn abort_after_trader_start_failure(
    &mut self,
    start_err: anyhow::Error,
) -> anyhow::Result<()> {
    log::info!("Plug-in controller startup failed, aborting startup");
    self.handle.set_state(NodeState::ShuttingDown);
    self.kernel.stop_trader();

    match self.finalize_stop().await {
        Ok(()) => Err(start_err),
        Err(finalize_err) => Err(anyhow::anyhow!(
            "failed to start plug-in controller: {start_err}; failed to finalize startup abort: {finalize_err}"
        )),
    }
}
/// Begins shutdown from within the event loop.
///
/// Stops plug-in controllers (errors are logged only) and the trader, sets
/// the shutdown deadline to now plus the kernel's post-stop delay, and moves
/// the node to `ShuttingDown`.
fn initiate_shutdown(&mut self) {
    #[cfg(feature = "plugin")]
    if let Err(e) = self.plugins.stop_controllers() {
        log::error!("Error stopping plug-in controllers: {e}");
    }

    self.kernel.stop_trader();

    let delay = self.kernel.delay_post_stop();
    log::info!("Awaiting residual events ({delay:?})...");

    let deadline = dst::time::Instant::now() + delay;
    self.shutdown_deadline = Some(deadline);
    self.handle.set_state(NodeState::ShuttingDown);
}
/// Disconnects clients, waits for the engines to disconnect, finalizes the
/// kernel stop, and marks the node `Stopped`.
///
/// A disconnect failure is logged and the remaining steps still run.
///
/// # Errors
///
/// Returns the error from disconnecting the clients, if any.
async fn finalize_stop(&mut self) -> anyhow::Result<()> {
    let disconnect_result = self.kernel.disconnect_clients().await;
    if let Err(e) = &disconnect_result {
        log::error!("Error disconnecting clients: {e}");
    }

    self.await_engines_disconnected().await;
    self.kernel.finalize_stop().await;
    self.handle.set_state(NodeState::Stopped);

    disconnect_result
}
/// Handles everything still queued on the five runner channels without waiting.
///
/// Channels are emptied in this order: time events, data commands, data
/// events, execution commands, execution events. Each channel is read until
/// `try_recv` first returns an error. The total is logged if non-zero.
fn drain_channels(
    &self,
    time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
) {
    let mut drained = 0;

    std::iter::from_fn(|| time_evt_rx.try_recv().ok()).for_each(|handler| {
        AsyncRunner::handle_time_event(handler);
        drained += 1;
    });

    std::iter::from_fn(|| data_cmd_rx.try_recv().ok()).for_each(|cmd| {
        AsyncRunner::handle_data_command(cmd);
        drained += 1;
    });

    std::iter::from_fn(|| data_evt_rx.try_recv().ok()).for_each(|evt| {
        AsyncRunner::handle_data_event(evt);
        drained += 1;
    });

    std::iter::from_fn(|| exec_cmd_rx.try_recv().ok()).for_each(|cmd| {
        AsyncRunner::handle_exec_command(cmd);
        drained += 1;
    });

    std::iter::from_fn(|| exec_evt_rx.try_recv().ok()).for_each(|evt| {
        AsyncRunner::handle_exec_event(evt);
        drained += 1;
    });

    if drained > 0 {
        log::info!("Drained {drained} remaining events during shutdown");
    }
}
/// Returns the environment reported by the kernel.
#[must_use]
pub fn environment(&self) -> Environment {
self.kernel.environment()
}
/// Returns a shared reference to the node's kernel.
#[must_use]
pub const fn kernel(&self) -> &NautilusKernel {
&self.kernel
}
/// Returns a mutable reference to the node's kernel.
#[must_use]
pub const fn kernel_mut(&mut self) -> &mut NautilusKernel {
&mut self.kernel
}
/// Returns the trader ID reported by the kernel.
#[must_use]
pub fn trader_id(&self) -> TraderId {
self.kernel.trader_id()
}
/// Returns the instance ID reported by the kernel.
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
self.kernel.instance_id()
}
/// Returns the node's current lifecycle state, read from its handle.
#[must_use]
pub fn state(&self) -> NodeState {
self.handle.state()
}
/// Returns `true` if the node's current state is `Running`.
#[must_use]
pub fn is_running(&self) -> bool {
    let current = self.state();
    current.is_running()
}
/// Installs a cache database adapter on the kernel's cache.
///
/// # Errors
///
/// Returns an error if the node is not in the `Idle` state.
pub fn set_cache_database(
    &mut self,
    database: Box<dyn CacheDatabaseAdapter>,
) -> anyhow::Result<()> {
    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!(
            "Cannot set cache database while node is running, set it before calling start()"
        );
    }

    let cache = self.kernel.cache();
    cache.borrow_mut().set_database(database);
    Ok(())
}
/// Returns a shared reference to the node's execution manager.
#[must_use]
pub fn exec_manager(&self) -> &ExecutionManager {
&self.exec_manager
}
/// Returns a mutable reference to the node's execution manager.
#[must_use]
pub fn exec_manager_mut(&mut self) -> &mut ExecutionManager {
&mut self.exec_manager
}
/// Registers an actor with the kernel's trader.
///
/// # Errors
///
/// Returns an error if the node is not `Idle`, or if the trader rejects the actor.
pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
where
    T: DataActor + Component + Actor + 'static,
{
    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!(
            "Cannot add actor while node is running, add actors before calling start()"
        );
    }

    let mut trader = self.kernel.trader.borrow_mut();
    trader.add_actor(actor)
}
/// Registers an actor produced by `factory` with the kernel's trader.
///
/// The factory is handed to the trader; it is not invoked here.
///
/// # Errors
///
/// Returns an error if the node is not `Idle`, or if the trader's
/// `add_actor_from_factory` fails.
pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
where
    F: FnOnce() -> anyhow::Result<T>,
    T: DataActor + Component + Actor + 'static,
{
    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!(
            "Cannot add actor while node is running, add actors before calling start()"
        );
    }

    let mut trader = self.kernel.trader.borrow_mut();
    trader.add_actor_from_factory(factory)
}
/// Registers a strategy with the kernel's trader.
///
/// The trader first prepares the strategy for registration. If the strategy
/// declares external order claims, each claimed instrument is registered
/// with the execution manager under the strategy's ID.
///
/// # Errors
///
/// Returns an error if the node is not `Idle`, if preparing the strategy
/// fails, or if the trader rejects the strategy.
pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
where
    T: Strategy + Component + Debug + 'static,
{
    if !matches!(self.state(), NodeState::Idle) {
        anyhow::bail!(
            "Cannot add strategy while node is running, add strategies before calling start()"
        );
    }

    let strategy_id = {
        let trader = self.kernel.trader.borrow();
        trader.prepare_strategy_for_registration(&mut strategy)?
    };

    if let Some(claimed_instruments) = strategy.external_order_claims() {
        for instrument_id in claimed_instruments {
            self.exec_manager
                .claim_external_orders(instrument_id, strategy_id);
        }

        log_info!(
            "Registered external order claims for {}: {:?}",
            strategy_id,
            strategy.external_order_claims(),
            color = LogColor::Blue
        );
    }

    let mut trader = self.kernel.trader.borrow_mut();
    trader.add_strategy(strategy)
}
/// Registers `exec_algorithm` with the kernel's trader.
///
/// # Errors
///
/// Returns an error if the node is not in [`NodeState::Idle`], or if the trader
/// rejects the execution algorithm.
pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
where
    T: ExecutionAlgorithm + Component + Debug + 'static,
{
    anyhow::ensure!(
        self.state() == NodeState::Idle,
        "Cannot add exec algorithm while node is running, add exec algorithms before calling start()"
    );

    let mut trader = self.kernel.trader.borrow_mut();
    trader.add_exec_algorithm(exec_algorithm)
}
/// Runs whichever periodic reconciliation checks are due at the current timestamp.
///
/// - In-flight check: asks the execution manager to check in-flight orders, processes the
///   resulting events, and forwards the resulting queries as execution commands.
/// - Open-order check: starts an open-order report task unless one is already stored in
///   `state`.
///
/// Each check records the current timestamp in `state` once it has run. If the node is
/// shutting down when a due check is about to run, the method returns immediately without
/// updating that check's timestamp.
fn run_reconciliation_checks(
    &mut self,
    intervals: ReconciliationCheckIntervals,
    state: &mut ReconciliationCheckState<'_>,
) {
    let now_ns = self.exec_manager.generate_timestamp_ns();

    let inflight_due =
        reconciliation_check_due(now_ns, *state.ts_last_inflight, intervals.inflight_ns);
    if inflight_due {
        if matches!(self.state(), NodeState::ShuttingDown) {
            return;
        }

        let outcome = self.exec_manager.check_inflight_orders();
        self.process_reconciliation_events(&outcome.events);
        for query in outcome.queries {
            AsyncRunner::handle_exec_command(query);
        }
        *state.ts_last_inflight = now_ns;
    }

    let open_due = reconciliation_check_due(now_ns, *state.ts_last_open, intervals.open_ns);
    if open_due {
        if matches!(self.state(), NodeState::ShuttingDown) {
            return;
        }

        if state.open_order_report_task.is_some() {
            log::debug!("Open-order reconciliation already in progress");
        } else {
            *state.open_order_report_task = self.start_open_order_report_check();
        }
        *state.ts_last_open = now_ns;
    }
}
fn start_open_order_report_check(&self) -> Option<OpenOrderReportTask> {
if self.exec_clients.is_empty() {
log::debug!("No execution clients to check orders consistency");
return None;
}
let check = self
.exec_manager
.prepare_open_order_report_check(UUID4::new());
let command = check.command.clone();
let clients = self.exec_clients.clone();
Some(OpenOrderReportTask {
future: Box::pin(async move {
let reports = request_open_order_reports(clients, command).await;
OpenOrderReportResult { check, reports }
}),
})
}
}
/// Requests order status reports from each client in turn and concatenates the results.
///
/// A client whose request fails is logged at error level and skipped; the reports from
/// the remaining clients are still returned.
async fn request_open_order_reports(
    clients: Vec<LiveExecutionClient>,
    command: GenerateOrderStatusReports,
) -> Vec<OrderStatusReport> {
    let mut collected = Vec::new();

    for client in clients {
        let outcome = client.generate_order_status_reports(&command).await;
        match outcome {
            Err(e) => {
                log::error!(
                    "Failed to generate order status reports from {}: {e}",
                    client.client_id()
                );
            }
            Ok(reports) => collected.extend(reports),
        }
    }

    collected
}
/// Returns `true` when a periodic check with the given interval should run now.
///
/// An `interval_ns` of zero means the check is never due. Otherwise the check is due once
/// `ts_now.duration_since(&ts_last)` yields an elapsed value of at least `interval_ns`;
/// if `duration_since` yields `None`, the check is not due.
fn reconciliation_check_due(ts_now: UnixNanos, ts_last: UnixNanos, interval_ns: u64) -> bool {
    if interval_ns == 0 {
        return false;
    }

    match ts_now.duration_since(&ts_last) {
        Some(elapsed_ns) => elapsed_ns >= interval_ns,
        None => false,
    }
}
/// Intervals, in nanoseconds, between periodic reconciliation checks.
///
/// A value of zero disables the corresponding check.
#[derive(Copy, Clone)]
struct ReconciliationCheckIntervals {
    /// Interval between in-flight order checks.
    inflight_ns: u64,
    /// Interval between open-order checks.
    open_ns: u64,
}
/// Mutable bookkeeping borrowed by `run_reconciliation_checks` for the duration of one pass.
struct ReconciliationCheckState<'a> {
/// Timestamp recorded the last time the in-flight order check ran.
ts_last_inflight: &'a mut UnixNanos,
/// Timestamp recorded the last time the open-order check ran.
ts_last_open: &'a mut UnixNanos,
/// Slot for the open-order report task; a new task is only started while this is `None`.
open_order_report_task: &'a mut Option<OpenOrderReportTask>,
}
/// Boxed, pinned future that resolves to an [`OpenOrderReportResult`].
type OpenOrderReportFuture = Pin<Box<dyn Future<Output = OpenOrderReportResult>>>;
/// An open-order report request, represented by the future that produces its result.
struct OpenOrderReportTask {
/// Future resolving to the check together with the collected reports.
future: OpenOrderReportFuture,
}
/// Output of an open-order report task.
struct OpenOrderReportResult {
/// The check prepared by the execution manager when the task was started.
check: OpenOrderReportCheck,
/// Order status reports collected from the execution clients for the check's command.
reports: Vec<OrderStatusReport>,
}
/// Handles all buffered and queued data events and commands until nothing is left.
///
/// Each pass drains the data buffers of `pending`, then everything currently readable from
/// the two data channels. Passes repeat until one of them makes no progress, so work
/// produced while handling earlier items is picked up as well.
fn flush_pending_data(
    pending: &mut PendingEvents,
    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
) {
    loop {
        let drained_buffer = pending.drain_data();

        let mut received_event = false;
        while let Ok(evt) = data_evt_rx.try_recv() {
            AsyncRunner::handle_data_event(evt);
            received_event = true;
        }

        let mut received_command = false;
        while let Ok(cmd) = data_cmd_rx.try_recv() {
            AsyncRunner::handle_data_command(cmd);
            received_command = true;
        }

        if !(drained_buffer || received_event || received_command) {
            break;
        }
    }
}
/// Empties every channel without blocking, then processes everything buffered in `pending`.
///
/// Time events and account events are handled as soon as they are read. All other messages
/// are appended to the matching `pending` buffer, with batch order events expanded into
/// individual order events, and the buffers are processed by a final `pending.drain()`.
fn flush_all_pending(
    pending: &mut PendingEvents,
    time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
    data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
    data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
    exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
    exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
) {
    // Each `from_fn` iterator ends at the first failed `try_recv`.
    std::iter::from_fn(|| time_evt_rx.try_recv().ok()).for_each(AsyncRunner::handle_time_event);

    pending
        .data_evts
        .extend(std::iter::from_fn(|| data_evt_rx.try_recv().ok()));
    pending
        .data_cmds
        .extend(std::iter::from_fn(|| data_cmd_rx.try_recv().ok()));

    while let Ok(evt) = exec_evt_rx.try_recv() {
        match evt {
            ExecutionEvent::Account(_) => {
                AsyncRunner::handle_exec_event(evt);
            }
            ExecutionEvent::Report(report) => pending.exec_reports.push(report),
            ExecutionEvent::Order(order_evt) => pending.order_evts.push(order_evt),
            ExecutionEvent::OrderSubmittedBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Submitted)),
            ExecutionEvent::OrderAcceptedBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Accepted)),
            ExecutionEvent::OrderCanceledBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Canceled)),
        }
    }

    pending
        .exec_cmds
        .extend(std::iter::from_fn(|| exec_cmd_rx.try_recv().ok()));

    pending.drain();
}
/// Drives `future` to completion while continuing to service the runner channels.
///
/// While the future is pending, time events and account events are handled immediately.
/// Execution reports, order events (batches are expanded into individual events), trading
/// commands, data events, and data commands are appended to `pending` for the caller to
/// process later.
///
/// Returns the output of `future`.
async fn drive_with_event_buffering<F: std::future::Future>(
future: F,
pending: &mut PendingEvents,
time_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TimeEventHandler>,
data_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
data_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataCommand>,
exec_evt_rx: &mut tokio::sync::mpsc::UnboundedReceiver<ExecutionEvent>,
exec_cmd_rx: &mut tokio::sync::mpsc::UnboundedReceiver<TradingCommand>,
) -> F::Output {
// Pin on the stack so the future can be polled by reference on every loop iteration.
tokio::pin!(future);
loop {
tokio::select! {
// `biased` polls the branches in the order written, so the driven future is
// checked first, then time events, execution events, execution commands, and
// finally the data channels.
biased;
result = &mut future => {
break result;
}
// Time events are not buffered.
Some(handler) = time_evt_rx.recv() => {
AsyncRunner::handle_time_event(handler);
}
Some(evt) = exec_evt_rx.recv() => {
match evt {
// Account events are handled right away; everything else is buffered.
ExecutionEvent::Account(_) => {
AsyncRunner::handle_exec_event(evt);
}
ExecutionEvent::Report(report) => {
pending.exec_reports.push(report);
}
ExecutionEvent::Order(order_evt) => {
pending.order_evts.push(order_evt);
}
// Batches are flattened into one order event per batch entry.
ExecutionEvent::OrderSubmittedBatch(batch) => {
for submitted in batch {
pending.order_evts.push(OrderEventAny::Submitted(submitted));
}
}
ExecutionEvent::OrderAcceptedBatch(batch) => {
for accepted in batch {
pending.order_evts.push(OrderEventAny::Accepted(accepted));
}
}
ExecutionEvent::OrderCanceledBatch(batch) => {
for canceled in batch {
pending.order_evts.push(OrderEventAny::Canceled(canceled));
}
}
}
}
Some(cmd) = exec_cmd_rx.recv() => {
pending.exec_cmds.push(cmd);
}
Some(evt) = data_evt_rx.recv() => {
pending.data_evts.push(evt);
}
Some(cmd) = data_cmd_rx.recv() => {
pending.data_cmds.push(cmd);
}
}
}
}
/// Buffers for events and commands that were received but deferred for later processing.
#[derive(Default)]
struct PendingEvents {
/// Deferred data commands.
data_cmds: Vec<DataCommand>,
/// Deferred data events.
data_evts: Vec<DataEvent>,
/// Deferred trading commands.
exec_cmds: Vec<TradingCommand>,
/// Deferred execution reports.
exec_reports: Vec<ExecutionReport>,
/// Deferred order events, including those expanded from batch events.
order_evts: Vec<OrderEventAny>,
}
impl PendingEvents {
    /// Returns `true` when every buffer is empty.
    fn is_empty(&self) -> bool {
        let Self {
            data_cmds,
            data_evts,
            exec_cmds,
            exec_reports,
            order_evts,
        } = self;

        [
            data_evts.is_empty(),
            data_cmds.is_empty(),
            exec_cmds.is_empty(),
            exec_reports.is_empty(),
            order_evts.is_empty(),
        ]
        .iter()
        .all(|&empty| empty)
    }

    /// Handles the buffered data events, then the buffered data commands.
    ///
    /// Returns `true` if at least one item was buffered when the call started.
    fn drain_data(&mut self) -> bool {
        let evt_count = self.data_evts.len();
        let cmd_count = self.data_cmds.len();
        let total = evt_count + cmd_count;
        let had_work = total > 0;

        if had_work {
            log::debug!(
                "Draining {total} data events/commands into cache \
                 (data_evts={}, data_cmds={})",
                evt_count,
                cmd_count,
            );
        }

        self.data_evts
            .drain(..)
            .for_each(AsyncRunner::handle_data_event);
        self.data_cmds
            .drain(..)
            .for_each(AsyncRunner::handle_data_command);

        had_work
    }

    /// Handles everything buffered, leaving all buffers empty.
    ///
    /// Processing order: data events, data commands, execution reports, trading commands,
    /// then order events.
    fn drain(&mut self) {
        let Self {
            data_cmds,
            data_evts,
            exec_cmds,
            exec_reports,
            order_evts,
        } = self;

        let total = data_evts.len()
            + data_cmds.len()
            + exec_cmds.len()
            + exec_reports.len()
            + order_evts.len();

        if total > 0 {
            log::debug!(
                "Processing {total} events/commands queued during startup \
                 (data_evts={}, data_cmds={}, exec_cmds={}, exec_reports={}, order_evts={})",
                data_evts.len(),
                data_cmds.len(),
                exec_cmds.len(),
                exec_reports.len(),
                order_evts.len()
            );
        }

        data_evts.drain(..).for_each(AsyncRunner::handle_data_event);
        data_cmds
            .drain(..)
            .for_each(AsyncRunner::handle_data_command);
        exec_reports
            .drain(..)
            .map(ExecutionEvent::Report)
            .for_each(AsyncRunner::handle_exec_event);
        exec_cmds
            .drain(..)
            .for_each(AsyncRunner::handle_exec_command);
        order_evts
            .drain(..)
            .map(ExecutionEvent::Order)
            .for_each(AsyncRunner::handle_exec_event);
    }
}
#[cfg(test)]
mod tests {
#[cfg(feature = "python")]
use std::sync::Arc;
use std::{cell::RefCell, rc::Rc};
#[cfg(feature = "python")]
use nautilus_common::runner::{
SyncDataCommandSender, SyncTradingCommandSender, replace_data_cmd_sender,
replace_exec_cmd_sender,
};
use nautilus_common::{
cache::Cache,
clock::Clock,
msgbus::{self, MessagingSwitchboard, TypedIntoHandler},
};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_execution::engine::{ExecutionEngine, SnapshotAnchorer};
use nautilus_model::{
enums::OrderType,
identifiers::{AccountId, ClientId, InstrumentId, TraderId, VenueOrderId},
instruments::{Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt},
orders::{OrderTestBuilder, stubs::TestOrderEventStubs},
types::{Price, Quantity},
};
use nautilus_system::{KernelEventStore, RegisteredComponents, event_store::EventStoreConfig};
use rstest::*;
use super::*;
/// Test double for [`KernelEventStore`] that reports itself as configured for replay.
#[derive(Debug)]
struct ReplayKernelEventStore {
/// When `true`, `restore_parent_cache` returns an error.
fail_restore: bool,
}
impl KernelEventStore for ReplayKernelEventStore {
    /// Fails with "replay restore failed" when `fail_restore` is set; otherwise succeeds
    /// without touching the cache.
    fn restore_parent_cache(
        &mut self,
        _instance_id: UUID4,
        _cache: &mut Cache,
    ) -> anyhow::Result<()> {
        anyhow::ensure!(!self.fail_restore, "replay restore failed");
        Ok(())
    }

    /// Always succeeds and does nothing.
    fn open(
        &mut self,
        _instance_id: UUID4,
        _components: &RegisteredComponents,
        _environment: Environment,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// This store provides no snapshot anchorer.
    fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
        None
    }

    /// Sealing is a no-op.
    fn seal(&mut self, _ts_init: UnixNanos) {}

    /// Fixed run ID.
    fn run_id(&self) -> Option<&str> {
        Some("replay-child")
    }

    /// Fixed parent run ID.
    fn parent_run_id(&self) -> Option<&str> {
        Some("seed-run")
    }

    /// Always reports that replay is configured.
    fn is_event_store_replay_configured(&self) -> bool {
        true
    }

    /// Never reports a halted store.
    fn is_halted(&self) -> bool {
        false
    }
}
/// Builds a live node named "TestKernel" with reconciliation disabled, state loading
/// enabled, and a [`ReplayKernelEventStore`] whose restore step fails when `fail_restore`
/// is `true`.
fn live_node_with_replay_store(fail_restore: bool) -> LiveNode {
    let base = LiveNodeBuilder::new(TraderId::default(), Environment::Live).unwrap();
    let exec_engine_config = crate::config::LiveExecEngineConfig {
        reconciliation: false,
        ..Default::default()
    };

    let configured = base
        .with_exec_engine_config(exec_engine_config)
        .with_load_state(true)
        .with_name("TestKernel")
        .with_event_store(move |_instance_id: UUID4, _clock: Rc<RefCell<dyn Clock>>| {
            Ok(Box::new(ReplayKernelEventStore { fail_restore }) as Box<dyn KernelEventStore>)
        });

    configured.build().unwrap()
}
/// Runs one reconciliation pass with only the open-order check enabled on a node whose cache
/// holds an accepted limit order, then asserts that no trading command reached the execution
/// engine endpoint and that no open-order report task was started.
#[rstest]
fn test_run_reconciliation_checks_does_not_publish_open_order_queries() {
    let exec_engine = crate::config::LiveExecEngineConfig {
        reconciliation: true,
        open_check_interval_secs: Some(1.0),
        position_check_interval_secs: Some(1.0),
        max_single_order_queries_per_cycle: 5,
        ..Default::default()
    };
    let config = LiveNodeConfig {
        exec_engine,
        ..Default::default()
    };
    let mut node =
        LiveNode::build("ReconciliationFallbackNode".to_string(), Some(config)).unwrap();
    let client_id = ClientId::from("TEST-QUERY");
    let account_id = AccountId::from("TEST-QUERY-001");

    // Replace the execution engine endpoint with a recorder for trading commands.
    let recorded = Rc::new(RefCell::new(Vec::new()));
    msgbus::register_trading_command_endpoint(
        MessagingSwitchboard::exec_engine_execute(),
        TypedIntoHandler::from({
            let sink = Rc::clone(&recorded);
            move |command: TradingCommand| {
                sink.borrow_mut().push(command);
            }
        }),
    );

    let venue_order_id = VenueOrderId::from("V-NODE-QUERY-001");
    let instrument = crypto_perpetual_ethusdt();
    let instrument_id = instrument.id();
    let client_order_id = ClientOrderId::from("O-NODE-QUERY-001");
    node.kernel
        .cache
        .borrow_mut()
        .add_instrument(InstrumentAny::CryptoPerpetual(instrument))
        .unwrap();
    insert_accepted_limit_order_in_node(
        &node,
        account_id,
        client_id,
        instrument_id,
        client_order_id,
        venue_order_id,
    );

    let mut ts_last_inflight = UnixNanos::default();
    let mut ts_last_open = UnixNanos::default();
    let mut open_order_report_task = None;
    // A zero in-flight interval disables that check; only the open-order check can fire.
    let intervals = ReconciliationCheckIntervals {
        inflight_ns: 0,
        open_ns: 1,
    };
    let mut check_state = ReconciliationCheckState {
        ts_last_inflight: &mut ts_last_inflight,
        ts_last_open: &mut ts_last_open,
        open_order_report_task: &mut open_order_report_task,
    };

    node.run_reconciliation_checks(intervals, &mut check_state);

    let seen = recorded.borrow();
    assert!(seen.is_empty());
    assert!(open_order_report_task.is_none());

    // Re-register the execution engine's own msgbus handlers after the endpoint override.
    ExecutionEngine::register_msgbus_handlers(&node.kernel.exec_engine);
}
/// Adds a limit order (quantity 10.0, price 100.0) to the node's cache and applies a
/// submitted event followed by an accepted event to it.
fn insert_accepted_limit_order_in_node(
    node: &LiveNode,
    account_id: AccountId,
    client_id: ClientId,
    instrument_id: InstrumentId,
    client_order_id: ClientOrderId,
    venue_order_id: VenueOrderId,
) {
    let cache = &node.kernel.cache;

    let initial = OrderTestBuilder::new(OrderType::Limit)
        .client_order_id(client_order_id)
        .instrument_id(instrument_id)
        .quantity(Quantity::from("10.0"))
        .price(Price::from("100.0"))
        .build();
    let submitted = TestOrderEventStubs::submitted(&initial, account_id);

    cache
        .borrow_mut()
        .add_order(initial, None, Some(client_id), false)
        .unwrap();

    // The accepted event is built from the order as it stands after the submitted event.
    let after_submit = cache.borrow_mut().update_order(&submitted).unwrap();
    let accepted = TestOrderEventStubs::accepted(&after_submit, account_id, venue_order_id);
    cache.borrow_mut().update_order(&accepted).unwrap();
}
/// Each valid discriminant maps to its matching [`NodeState`] variant.
#[rstest]
#[case(0, NodeState::Idle)]
#[case(1, NodeState::Starting)]
#[case(2, NodeState::Running)]
#[case(3, NodeState::ShuttingDown)]
#[case(4, NodeState::Stopped)]
fn test_node_state_from_u8_valid(#[case] value: u8, #[case] expected: NodeState) {
    let decoded = NodeState::from_u8(value);
    assert_eq!(decoded, expected);
}
/// Values outside the valid discriminant range make `from_u8` panic.
#[rstest]
#[case(5)]
#[case(255)]
#[should_panic(expected = "Invalid NodeState value")]
fn test_node_state_from_u8_invalid_panics(#[case] value: u8) {
    let _state = NodeState::from_u8(value);
}
/// Converting any state to `u8` and back yields the same state.
#[rstest]
fn test_node_state_roundtrip() {
    let all_states = [
        NodeState::Idle,
        NodeState::Starting,
        NodeState::Running,
        NodeState::ShuttingDown,
        NodeState::Stopped,
    ];

    all_states.iter().for_each(|&state| {
        assert_eq!(NodeState::from_u8(state.as_u8()), state);
    });
}
/// `is_running` is `true` for `Running` and `false` for every other state.
#[rstest]
fn test_node_state_is_running_only_for_running() {
    let expectations = [
        (NodeState::Idle, false),
        (NodeState::Starting, false),
        (NodeState::Running, true),
        (NodeState::ShuttingDown, false),
        (NodeState::Stopped, false),
    ];

    for (state, expected) in expectations {
        assert_eq!(state.is_running(), expected);
    }
}
/// With a stop already requested through the handle, waiting for engine connections reports
/// `StopRequested` and leaves the stop flag set.
#[rstest]
#[tokio::test]
async fn test_await_engines_connected_returns_stop_requested() {
    let node = LiveNode::build(String::from("TestNode"), None).unwrap();
    let node_handle = node.handle();
    node_handle.stop();

    let outcome = node.await_engines_connected().await;

    assert_eq!(outcome, EngineConnectionStatus::StopRequested);
    assert!(node_handle.should_stop());
}
/// With the kernel shutdown flag set, waiting for engine connections reports
/// `ShutdownRequested`.
#[rstest]
#[tokio::test]
async fn test_await_engines_connected_returns_shutdown_requested() {
    let node = LiveNode::build(String::from("TestNode"), None).unwrap();
    node.kernel().shutdown_flag().set(true);

    let outcome = node.await_engines_connected().await;

    assert_eq!(outcome, EngineConnectionStatus::ShutdownRequested);
}
/// A stop requested before `start()` leaves the node `Stopped`, with the stop flag still set
/// and the node not running.
#[rstest]
#[tokio::test]
async fn test_start_stop_request_aborts_startup_without_running() {
    let exec_engine = crate::config::LiveExecEngineConfig {
        reconciliation: false,
        ..Default::default()
    };
    let config = LiveNodeConfig {
        exec_engine,
        timeout_disconnection: Duration::from_millis(50),
        ..Default::default()
    };
    let mut node = LiveNode::build(String::from("TestNode"), Some(config)).unwrap();
    let node_handle = node.handle();
    node_handle.stop();

    node.start().await.unwrap();

    assert_eq!(node_handle.state(), NodeState::Stopped);
    assert!(node_handle.should_stop());
    assert!(!node_handle.is_running());
}
/// Starting a node with a working replay store ends in `Running`, with the kernel in
/// event-store replay mode and the runner still held by the node.
#[rstest]
#[tokio::test]
async fn test_start_event_store_replay_skips_live_connections() {
    let mut node = live_node_with_replay_store(false);
    let node_handle = node.handle();

    node.start().await.unwrap();

    assert_eq!(node_handle.state(), NodeState::Running);
    assert!(node_handle.is_running());
    assert!(node.kernel.is_event_store_replay());
    assert!(node.runner.is_some());
}
/// When the replay store fails to restore, `start()` still returns `Ok` but the node ends
/// `Stopped`: replay is configured yet not active, and the runner is still held by the node.
#[rstest]
#[tokio::test]
async fn test_start_event_store_replay_config_failure_aborts_startup() {
    let mut node = live_node_with_replay_store(true);
    let node_handle = node.handle();

    node.start().await.unwrap();

    assert_eq!(node_handle.state(), NodeState::Stopped);
    assert!(!node_handle.is_running());
    assert!(node.kernel.is_event_store_replay_configured());
    assert!(!node.kernel.is_event_store_replay());
    assert!(node.runner.is_some());
}
/// Running a node with a working replay store returns with the kernel in event-store replay
/// mode and the runner taken out of the node.
// NOTE(review): the test name says the node "stops", yet the assertions expect the handle
// to still report `Running` after `run()` returns; confirm this is the intended end state.
#[rstest]
#[tokio::test]
async fn test_run_event_store_replay_consumes_runner_and_stops_before_connections() {
    let mut node = live_node_with_replay_store(false);
    let node_handle = node.handle();

    node.run().await.unwrap();

    assert_eq!(node_handle.state(), NodeState::Running);
    assert!(node_handle.is_running());
    assert!(node.kernel.is_event_store_replay());
    assert!(node.runner.is_none());
}
/// When the replay store fails to restore, `run()` still returns `Ok` but the node ends
/// `Stopped`: replay is configured yet not active, and the runner has been taken.
#[rstest]
#[tokio::test]
async fn test_run_event_store_replay_config_failure_aborts_startup() {
    let mut node = live_node_with_replay_store(true);
    let node_handle = node.handle();

    node.run().await.unwrap();

    assert_eq!(node_handle.state(), NodeState::Stopped);
    assert!(!node_handle.is_running());
    assert!(node.kernel.is_event_store_replay_configured());
    assert!(!node.kernel.is_event_store_replay());
    assert!(node.runner.is_none());
}
/// Building from a config that sets `event_store` without an event-store factory fails with
/// an error that mentions `with_event_store`.
#[rstest]
fn test_build_rejects_event_store_config_without_factory() {
    let exec_engine = crate::config::LiveExecEngineConfig {
        reconciliation: false,
        ..Default::default()
    };
    let config = LiveNodeConfig {
        event_store: Some(EventStoreConfig::default()),
        exec_engine,
        ..Default::default()
    };

    let builder = LiveNodeBuilder::from_config(config).expect("builder");
    let err = builder
        .build()
        .expect_err("should reject event_store config without factory");

    assert!(
        err.to_string().contains("with_event_store"),
        "error message should mention with_event_store, was: {err}"
    );
}
/// `LiveNode::build` rejects a config that sets `event_store`, with an error that mentions
/// `with_event_store`.
#[rstest]
fn test_direct_build_rejects_event_store_config() {
    let exec_engine = crate::config::LiveExecEngineConfig {
        reconciliation: false,
        ..Default::default()
    };
    let config = LiveNodeConfig {
        event_store: Some(EventStoreConfig::default()),
        exec_engine,
        ..Default::default()
    };

    let build_result = LiveNode::build("TestNode".to_string(), Some(config));
    let err = build_result.expect_err("LiveNode::build should reject event_store config");

    assert!(
        err.to_string().contains("with_event_store"),
        "error message should mention with_event_store, was: {err}"
    );
}
/// A fresh handle is `Idle`, has no stop request, and is not running.
#[rstest]
fn test_handle_initial_state() {
    let fresh = LiveNodeHandle::new();

    assert_eq!(fresh.state(), NodeState::Idle);
    assert!(!fresh.should_stop());
    assert!(!fresh.is_running());
}
/// Calling `stop` raises the stop flag.
#[rstest]
fn test_handle_stop_sets_flag() {
    let node_handle = LiveNodeHandle::new();

    node_handle.stop();

    assert!(node_handle.should_stop());
}
/// Moving to `Running` clears a previously requested stop.
#[rstest]
fn test_handle_set_state_running_clears_stop_flag() {
    let node_handle = LiveNodeHandle::new();
    node_handle.stop();
    assert!(node_handle.should_stop());

    node_handle.set_state(NodeState::Running);

    assert!(!node_handle.should_stop());
    assert!(node_handle.is_running());
    assert_eq!(node_handle.state(), NodeState::Running);
}
/// Walks the handle through every state in lifecycle order, checking the reported state and
/// `is_running` after each transition.
#[rstest]
fn test_handle_node_state_transitions() {
    let node_handle = LiveNodeHandle::new();
    assert_eq!(node_handle.state(), NodeState::Idle);

    let transitions = [
        (NodeState::Starting, false),
        (NodeState::Running, true),
        (NodeState::ShuttingDown, false),
        (NodeState::Stopped, false),
    ];
    for (next, running) in transitions {
        node_handle.set_state(next);
        assert_eq!(node_handle.state(), next);
        assert_eq!(node_handle.is_running(), running);
    }
}
/// A cloned handle observes changes made through the original, and vice versa.
#[rstest]
fn test_handle_clone_shares_state_bidirectionally() {
    let original = LiveNodeHandle::new();
    let duplicate = original.clone();

    original.stop();
    assert!(duplicate.should_stop());

    duplicate.set_state(NodeState::Running);
    assert_eq!(original.state(), NodeState::Running);
}
/// The stop flag does not alter the state and survives a move to `ShuttingDown`; only the
/// move to `Running` clears it.
#[rstest]
fn test_handle_stop_flag_independent_of_state() {
    let node_handle = LiveNodeHandle::new();
    node_handle.set_state(NodeState::Starting);

    node_handle.stop();
    assert!(node_handle.should_stop());
    assert_eq!(node_handle.state(), NodeState::Starting);

    node_handle.set_state(NodeState::ShuttingDown);
    assert!(node_handle.should_stop());

    node_handle.set_state(NodeState::Running);
    assert!(!node_handle.should_stop());
}
/// Creating a builder for the sandbox environment succeeds.
#[rstest]
fn test_builder_creation() {
    let trader_id = TraderId::from("TRADER-001");
    let outcome = LiveNode::builder(trader_id, Environment::Sandbox);
    assert!(outcome.is_ok());
}
/// Creating a builder for the backtest environment fails with an error mentioning "Backtest".
#[rstest]
fn test_builder_rejects_backtest() {
    let trader_id = TraderId::from("TRADER-001");
    let outcome = LiveNode::builder(trader_id, Environment::Backtest);

    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("Backtest"));
}
/// Creating a builder for the live environment succeeds.
#[rstest]
fn test_builder_accepts_live_environment() {
    let trader_id = TraderId::from("TRADER-001");
    let outcome = LiveNode::builder(trader_id, Environment::Live);
    assert!(outcome.is_ok());
}
/// Creating a builder for the sandbox environment succeeds.
#[rstest]
fn test_builder_accepts_sandbox_environment() {
    let trader_id = TraderId::from("TRADER-001");
    let outcome = LiveNode::builder(trader_id, Environment::Sandbox);
    assert!(outcome.is_ok());
}
/// The builder setters can be chained, and the configured name is readable afterwards.
#[rstest]
fn test_builder_fluent_api_chaining() {
    let base = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Live).unwrap();

    let configured = base
        .with_name("TestNode")
        .with_instance_id(UUID4::new())
        .with_load_state(false)
        .with_save_state(true)
        .with_timeout_connection(30)
        .with_timeout_reconciliation(60)
        .with_reconciliation(true)
        .with_reconciliation_lookback_mins(120)
        .with_timeout_portfolio(10)
        .with_timeout_disconnection_secs(5)
        .with_delay_post_stop_secs(3)
        .with_delay_shutdown_secs(10);

    assert_eq!(configured.name(), "TestNode");
}
/// A freshly built sandbox node is `Idle`, not running, and reports the environment and
/// trader ID it was built with.
#[cfg(feature = "python")]
#[rstest]
fn test_node_build_and_initial_state() {
    let trader_id = TraderId::from("TRADER-001");
    let builder = LiveNode::builder(trader_id, Environment::Sandbox).unwrap();
    let node = builder.with_name("TestNode").build().unwrap();

    assert_eq!(node.state(), NodeState::Idle);
    assert!(!node.is_running());
    assert_eq!(node.environment(), Environment::Sandbox);
    assert_eq!(node.trader_id(), TraderId::from("TRADER-001"));
}
/// Two nodes can be built one after the other, after sync command senders were installed
/// and the first node was dropped; both start out `Idle`.
#[cfg(feature = "python")]
#[rstest]
fn test_node_build_replaces_stale_runner_senders() {
    replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
    replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));

    let first_builder =
        LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let first = first_builder.with_name("FirstNode").build().unwrap();
    assert_eq!(first.state(), NodeState::Idle);
    drop(first);

    let second_builder =
        LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let second = second_builder.with_name("SecondNode").build().unwrap();
    assert_eq!(second.state(), NodeState::Idle);
    assert!(!second.is_running());
}
/// The handle of a freshly built node reports `Idle` and not running.
#[cfg(feature = "python")]
#[rstest]
fn test_node_handle_reflects_node_state() {
    let builder = LiveNode::builder(TraderId::from("TRADER-001"), Environment::Sandbox).unwrap();
    let node = builder.with_name("TestNode").build().unwrap();

    let node_handle = node.handle();

    assert_eq!(node_handle.state(), NodeState::Idle);
    assert!(!node_handle.is_running());
}
/// Draining data from empty buffers reports that nothing was drained.
#[rstest]
fn test_pending_drain_data_returns_false_when_empty() {
    let mut buffers = PendingEvents::default();
    let drained = buffers.drain_data();
    assert!(!drained);
}
/// Draining data with one buffered data event reports progress and empties the buffer.
#[rstest]
fn test_pending_drain_data_returns_true_when_non_empty() {
    use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};

    let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt());
    let mut buffers = PendingEvents::default();
    buffers.data_evts.push(DataEvent::Instrument(instrument));

    let drained = buffers.drain_data();

    assert!(drained);
    assert!(buffers.data_evts.is_empty());
}
/// Returns an instrument data event wrapping the ETHUSDT crypto perpetual stub.
fn stub_data_event() -> DataEvent {
    use nautilus_model::instruments::{InstrumentAny, stubs::crypto_perpetual_ethusdt};

    let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt());
    DataEvent::Instrument(instrument)
}
/// Returns a subscribe-instruments data command for venue "TEST".
fn stub_data_command() -> DataCommand {
    use nautilus_common::messages::data::{SubscribeCommand, subscribe::SubscribeInstruments};
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::identifiers::Venue;

    let subscribe = SubscribeInstruments::new(
        None,
        Venue::from("TEST"),
        UUID4::new(),
        UnixNanos::default(),
        None,
        None,
    );
    DataCommand::Subscribe(SubscribeCommand::Instruments(subscribe))
}
/// With one buffered and one queued item of each data kind, flushing leaves both buffers
/// and both channels empty.
#[rstest]
fn test_flush_pending_data_drains_events_and_commands() {
    use tokio::sync::mpsc::unbounded_channel;

    let (evt_tx, mut evt_rx) = unbounded_channel::<DataEvent>();
    let (cmd_tx, mut cmd_rx) = unbounded_channel::<DataCommand>();
    let mut buffers = PendingEvents::default();
    buffers.data_evts.push(stub_data_event());
    buffers.data_cmds.push(stub_data_command());
    evt_tx.send(stub_data_event()).unwrap();
    cmd_tx.send(stub_data_command()).unwrap();

    flush_pending_data(&mut buffers, &mut evt_rx, &mut cmd_rx);

    assert!(buffers.data_evts.is_empty());
    assert!(buffers.data_cmds.is_empty());
    assert!(evt_rx.try_recv().is_err());
    assert!(cmd_rx.try_recv().is_err());
}
/// With one buffered event and several interleaved sends on both channels, flushing leaves
/// both buffers and both channels empty.
#[rstest]
fn test_flush_pending_data_drains_mixed_sources() {
    use tokio::sync::mpsc::unbounded_channel;

    let (evt_tx, mut evt_rx) = unbounded_channel::<DataEvent>();
    let (cmd_tx, mut cmd_rx) = unbounded_channel::<DataCommand>();
    let mut buffers = PendingEvents::default();
    buffers.data_evts.push(stub_data_event());
    cmd_tx.send(stub_data_command()).unwrap();
    evt_tx.send(stub_data_event()).unwrap();
    evt_tx.send(stub_data_event()).unwrap();
    cmd_tx.send(stub_data_command()).unwrap();

    flush_pending_data(&mut buffers, &mut evt_rx, &mut cmd_rx);

    assert!(buffers.data_evts.is_empty());
    assert!(buffers.data_cmds.is_empty());
    assert!(evt_rx.try_recv().is_err());
    assert!(cmd_rx.try_recv().is_err());
}
/// Returns a handler for a "test-timer" time event whose callback does nothing.
fn stub_time_event_handler() -> TimeEventHandler {
    use std::rc::Rc;

    use nautilus_common::timer::{TimeEvent, TimeEventCallback, TimeEventHandler};
    use nautilus_core::{UUID4, UnixNanos};
    use ustr::Ustr;

    let event = TimeEvent::new(
        Ustr::from("test-timer"),
        UUID4::new(),
        UnixNanos::default(),
        UnixNanos::default(),
    );
    let noop_callback = TimeEventCallback::RustLocal(Rc::new(|_| {}));

    TimeEventHandler::new(event, noop_callback)
}
/// Returns a query-account trading command for trader "TESTER-001" and account "TEST-001".
fn stub_trading_command() -> TradingCommand {
    use nautilus_common::messages::execution::query::QueryAccount;
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::identifiers::AccountId;

    let query = QueryAccount::new(
        TraderId::from("TESTER-001"),
        None,
        AccountId::from("TEST-001"),
        UUID4::new(),
        UnixNanos::default(),
        None,
        None,
    );
    TradingCommand::QueryAccount(query)
}
/// Returns an execution report event carrying a boxed fill report for a maker buy.
fn stub_exec_event() -> ExecutionEvent {
    use nautilus_model::{
        enums::{LiquiditySide, OrderSide},
        identifiers::{AccountId, InstrumentId, TradeId, VenueOrderId},
        reports::FillReport,
        types::{Money, Price, Quantity},
    };

    let fill = FillReport::new(
        AccountId::from("TEST-001"),
        InstrumentId::from("TEST.VENUE"),
        VenueOrderId::from("V-001"),
        TradeId::from("T-001"),
        OrderSide::Buy,
        Quantity::from("1.0"),
        Price::from("100.0"),
        Money::from("0.01 USD"),
        LiquiditySide::Maker,
        None,
        None,
        nautilus_core::UnixNanos::default(),
        nautilus_core::UnixNanos::default(),
        None,
    );
    ExecutionEvent::Report(ExecutionReport::Fill(Box::new(fill)))
}
/// With buffered data items and one message queued on each of the five channels, flushing
/// leaves every buffer and every channel empty.
#[rstest]
fn test_flush_all_pending_drains_buffered_channels() {
    use tokio::sync::mpsc::unbounded_channel;

    let (time_tx, mut time_rx) = unbounded_channel::<TimeEventHandler>();
    let (data_evt_tx, mut data_evt_rx) = unbounded_channel::<DataEvent>();
    let (data_cmd_tx, mut data_cmd_rx) = unbounded_channel::<DataCommand>();
    let (exec_evt_tx, mut exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
    let (exec_cmd_tx, mut exec_cmd_rx) = unbounded_channel::<TradingCommand>();
    let mut buffers = PendingEvents::default();
    buffers.data_evts.push(stub_data_event());
    buffers.data_cmds.push(stub_data_command());
    time_tx.send(stub_time_event_handler()).unwrap();
    data_evt_tx.send(stub_data_event()).unwrap();
    data_cmd_tx.send(stub_data_command()).unwrap();
    exec_evt_tx.send(stub_exec_event()).unwrap();
    exec_cmd_tx.send(stub_trading_command()).unwrap();

    flush_all_pending(
        &mut buffers,
        &mut time_rx,
        &mut data_evt_rx,
        &mut data_cmd_rx,
        &mut exec_evt_rx,
        &mut exec_cmd_rx,
    );

    assert!(buffers.data_evts.is_empty());
    assert!(buffers.data_cmds.is_empty());
    assert!(buffers.exec_reports.is_empty());
    assert!(buffers.exec_cmds.is_empty());
    assert!(buffers.order_evts.is_empty());
    assert!(time_rx.try_recv().is_err());
    assert!(data_evt_rx.try_recv().is_err());
    assert!(data_cmd_rx.try_recv().is_err());
    assert!(exec_evt_rx.try_recv().is_err());
    assert!(exec_cmd_rx.try_recv().is_err());
}
/// Returns an order event wrapping a submitted event built from the spec defaults.
fn stub_order_event() -> ExecutionEvent {
    use nautilus_model::events::order::spec::OrderSubmittedSpec;

    let submitted = OrderSubmittedSpec::builder().build();
    ExecutionEvent::Order(OrderEventAny::Submitted(submitted))
}
/// Returns an account event carrying a cash account state for "TEST-001" with two empty
/// vectors.
fn stub_account_event() -> ExecutionEvent {
    use nautilus_core::{UUID4, UnixNanos};
    use nautilus_model::{
        enums::AccountType, events::account::state::AccountState, identifiers::AccountId,
    };

    let account_state = AccountState::new(
        AccountId::from("TEST-001"),
        AccountType::Cash,
        vec![],
        vec![],
        true,
        UUID4::new(),
        UnixNanos::default(),
        UnixNanos::default(),
        None,
    );
    ExecutionEvent::Account(account_state)
}
/// With an order event and an execution report queued, flushing leaves the order-event and
/// report buffers empty and the execution event channel drained.
#[rstest]
fn test_flush_all_pending_routes_order_event_to_order_evts() {
    use tokio::sync::mpsc::unbounded_channel;

    // Unused senders are kept alive so their channels stay open.
    let (_time_tx, mut time_rx) = unbounded_channel::<TimeEventHandler>();
    let (_data_evt_tx, mut data_evt_rx) = unbounded_channel::<DataEvent>();
    let (_data_cmd_tx, mut data_cmd_rx) = unbounded_channel::<DataCommand>();
    let (exec_evt_tx, mut exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
    let (_exec_cmd_tx, mut exec_cmd_rx) = unbounded_channel::<TradingCommand>();
    let mut buffers = PendingEvents::default();
    exec_evt_tx.send(stub_order_event()).unwrap();
    exec_evt_tx.send(stub_exec_event()).unwrap();

    flush_all_pending(
        &mut buffers,
        &mut time_rx,
        &mut data_evt_rx,
        &mut data_cmd_rx,
        &mut exec_evt_rx,
        &mut exec_cmd_rx,
    );

    assert!(buffers.order_evts.is_empty());
    assert!(buffers.exec_reports.is_empty());
    assert!(exec_evt_rx.try_recv().is_err());
}
/// With an account event queued, flushing leaves the report, order-event, and command
/// buffers empty and the execution event channel drained.
#[rstest]
fn test_flush_all_pending_routes_account_event_immediately() {
    use tokio::sync::mpsc::unbounded_channel;

    // Unused senders are kept alive so their channels stay open.
    let (_time_tx, mut time_rx) = unbounded_channel::<TimeEventHandler>();
    let (_data_evt_tx, mut data_evt_rx) = unbounded_channel::<DataEvent>();
    let (_data_cmd_tx, mut data_cmd_rx) = unbounded_channel::<DataCommand>();
    let (exec_evt_tx, mut exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
    let (_exec_cmd_tx, mut exec_cmd_rx) = unbounded_channel::<TradingCommand>();
    let mut buffers = PendingEvents::default();
    exec_evt_tx.send(stub_account_event()).unwrap();

    flush_all_pending(
        &mut buffers,
        &mut time_rx,
        &mut data_evt_rx,
        &mut data_cmd_rx,
        &mut exec_evt_rx,
        &mut exec_cmd_rx,
    );

    assert!(buffers.exec_reports.is_empty());
    assert!(buffers.order_evts.is_empty());
    assert!(buffers.exec_cmds.is_empty());
    assert!(exec_evt_rx.try_recv().is_err());
}
/// Default-constructed buffers report empty.
#[rstest]
fn test_pending_is_empty_when_default() {
    let buffers = PendingEvents::default();
    assert!(buffers.is_empty());
}
/// A buffered data event makes the buffers non-empty.
#[rstest]
fn test_pending_is_empty_false_with_data_evt() {
    let mut buffers = PendingEvents::default();
    let event = stub_data_event();
    buffers.data_evts.push(event);
    assert!(!buffers.is_empty());
}
/// A buffered data command makes the buffers non-empty.
#[rstest]
fn test_pending_is_empty_false_with_data_cmd() {
    let mut buffers = PendingEvents::default();
    let command = stub_data_command();
    buffers.data_cmds.push(command);
    assert!(!buffers.is_empty());
}
/// A buffered trading command makes the buffers non-empty.
#[rstest]
fn test_pending_is_empty_false_with_exec_cmd() {
    let mut buffers = PendingEvents::default();
    let command = stub_trading_command();
    buffers.exec_cmds.push(command);
    assert!(!buffers.is_empty());
}
/// A buffered execution report makes the buffers non-empty.
#[rstest]
fn test_pending_is_empty_false_with_exec_report() {
    let mut buffers = PendingEvents::default();
    let event = stub_exec_event();
    if let ExecutionEvent::Report(report) = event {
        buffers.exec_reports.push(report);
    }
    assert!(!buffers.is_empty());
}
/// A buffered order event makes the buffers non-empty.
#[rstest]
fn test_pending_is_empty_false_with_order_evt() {
    let mut buffers = PendingEvents::default();
    let event = stub_order_event();
    if let ExecutionEvent::Order(order_evt) = event {
        buffers.order_evts.push(order_evt);
    }
    assert!(!buffers.is_empty());
}
/// Returns a submitted-batch event with two entries, for client order IDs "O-001" and
/// "O-002".
fn stub_submitted_batch_event() -> ExecutionEvent {
    use nautilus_model::{
        events::{OrderSubmittedBatch, order::spec::OrderSubmittedSpec},
        identifiers::ClientOrderId,
    };

    let first = OrderSubmittedSpec::builder()
        .client_order_id(ClientOrderId::from("O-001"))
        .build();
    let second = OrderSubmittedSpec::builder()
        .client_order_id(ClientOrderId::from("O-002"))
        .build();

    ExecutionEvent::OrderSubmittedBatch(OrderSubmittedBatch::new(vec![first, second]))
}
/// Returns a canceled-batch event with two entries, for client order IDs "O-001" and
/// "O-002".
fn stub_canceled_batch_event() -> ExecutionEvent {
    use nautilus_model::{
        events::{OrderCanceledBatch, order::spec::OrderCanceledSpec},
        identifiers::ClientOrderId,
    };

    let first = OrderCanceledSpec::builder()
        .client_order_id(ClientOrderId::from("O-001"))
        .build();
    let second = OrderCanceledSpec::builder()
        .client_order_id(ClientOrderId::from("O-002"))
        .build();

    ExecutionEvent::OrderCanceledBatch(OrderCanceledBatch::new(vec![first, second]))
}
/// Queues a submitted batch on the execution event channel and checks that, after
/// `flush_all_pending`, no order events remain buffered and the channel is drained.
#[rstest]
fn test_flush_all_pending_buffers_submitted_batch_as_individual_events() {
    use tokio::sync::mpsc::unbounded_channel;

    // The unused senders stay bound for the whole test so they are not dropped
    // before the flush runs.
    let (_time_sender, mut time_rx) = unbounded_channel::<TimeEventHandler>();
    let (_data_evt_sender, mut data_evt_rx) = unbounded_channel::<DataEvent>();
    let (_data_cmd_sender, mut data_cmd_rx) = unbounded_channel::<DataCommand>();
    let (_exec_cmd_sender, mut exec_cmd_rx) = unbounded_channel::<TradingCommand>();

    let (exec_evt_sender, mut exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
    exec_evt_sender.send(stub_submitted_batch_event()).unwrap();

    let mut pending = PendingEvents::default();
    flush_all_pending(
        &mut pending,
        &mut time_rx,
        &mut data_evt_rx,
        &mut data_cmd_rx,
        &mut exec_evt_rx,
        &mut exec_cmd_rx,
    );

    assert!(pending.order_evts.is_empty());
    assert!(exec_evt_rx.try_recv().is_err());
}
/// Queues a canceled batch on the execution event channel and checks that, after
/// `flush_all_pending`, no order events remain buffered and the channel is drained.
#[rstest]
fn test_flush_all_pending_buffers_canceled_batch_as_individual_events() {
    use tokio::sync::mpsc::unbounded_channel;

    // The unused senders stay bound for the whole test so they are not dropped
    // before the flush runs.
    let (_time_sender, mut time_rx) = unbounded_channel::<TimeEventHandler>();
    let (_data_evt_sender, mut data_evt_rx) = unbounded_channel::<DataEvent>();
    let (_data_cmd_sender, mut data_cmd_rx) = unbounded_channel::<DataCommand>();
    let (_exec_cmd_sender, mut exec_cmd_rx) = unbounded_channel::<TradingCommand>();

    let (exec_evt_sender, mut exec_evt_rx) = unbounded_channel::<ExecutionEvent>();
    exec_evt_sender.send(stub_canceled_batch_event()).unwrap();

    let mut pending = PendingEvents::default();
    flush_all_pending(
        &mut pending,
        &mut time_rx,
        &mut data_evt_rx,
        &mut data_cmd_rx,
        &mut exec_evt_rx,
        &mut exec_cmd_rx,
    );

    assert!(pending.order_evts.is_empty());
    assert!(exec_evt_rx.try_recv().is_err());
}
/// Routes a canceled batch from the execution event channel into `PendingEvents`
/// and checks that it is expanded into one `OrderEventAny::Canceled` per batch
/// entry, preserving the `O-001`, `O-002` order.
///
/// NOTE(review): this test does not call `flush_all_pending`; the routing below is
/// written inline and presumably mirrors that function's buffering step — confirm
/// against its implementation.
#[rstest]
fn test_flush_all_pending_expands_batch_into_order_evts_before_drain() {
    use nautilus_model::identifiers::ClientOrderId;

    let (exec_evt_tx, mut exec_evt_rx) = tokio::sync::mpsc::unbounded_channel::<ExecutionEvent>();
    exec_evt_tx.send(stub_canceled_batch_event()).unwrap();

    let mut pending = PendingEvents::default();
    while let Ok(event) = exec_evt_rx.try_recv() {
        match event {
            // Account events are handled immediately rather than buffered.
            ExecutionEvent::Account(_) => {
                AsyncRunner::handle_exec_event(event);
            }
            ExecutionEvent::Report(report) => pending.exec_reports.push(report),
            ExecutionEvent::Order(order_evt) => pending.order_evts.push(order_evt),
            // Each batch is flattened into individual order events, in batch order.
            ExecutionEvent::OrderSubmittedBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Submitted)),
            ExecutionEvent::OrderAcceptedBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Accepted)),
            ExecutionEvent::OrderCanceledBatch(batch) => pending
                .order_evts
                .extend(batch.into_iter().map(OrderEventAny::Canceled)),
        }
    }

    let expected_ids = [ClientOrderId::from("O-001"), ClientOrderId::from("O-002")];
    assert_eq!(pending.order_evts.len(), 2);
    for (order_evt, expected_id) in pending.order_evts.iter().zip(expected_ids) {
        assert!(
            matches!(order_evt, OrderEventAny::Canceled(c) if c.client_order_id == expected_id)
        );
    }
}
}