use std::{cell::RefCell, fmt::Debug, rc::Rc};
use ahash::AHashMap;
use nautilus_common::{
actor::{DataActor, registry::try_get_actor_unchecked},
cache::Cache,
clock::{Clock, TestClock},
component::{
Component, dispose_component, register_component_actor, reset_component, start_component,
stop_component,
},
enums::{ComponentState, ComponentTrigger, Environment},
messages::execution::TradingCommand,
msgbus,
msgbus::{
Endpoint, MStr, ShareableMessageHandler, TypedHandler, get_message_bus,
switchboard::{get_event_orders_topic, get_event_positions_topic},
},
timer::{TimeEvent, TimeEventCallback},
};
use nautilus_core::{UUID4, UnixNanos};
use nautilus_model::{
events::{OrderEventAny, PositionEvent},
identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId},
};
use nautilus_portfolio::portfolio::Portfolio;
use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
use ustr::Ustr;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum StrategyCommand {
ExitMarket,
}
fn strategy_control_endpoint(strategy_id: StrategyId) -> MStr<Endpoint> {
format!("{strategy_id}.control").into()
}
pub struct Trader {
pub trader_id: TraderId,
pub instance_id: UUID4,
pub environment: Environment,
state: ComponentState,
clock: Rc<RefCell<dyn Clock>>,
cache: Rc<RefCell<Cache>>,
portfolio: Rc<RefCell<Portfolio>>,
actor_ids: Vec<ActorId>,
strategy_ids: Vec<StrategyId>,
strategy_stop_fns: AHashMap<StrategyId, Box<dyn FnMut() -> bool>>,
strategy_handler_ids: AHashMap<StrategyId, (Ustr, Ustr)>,
exec_algorithm_ids: Vec<ExecAlgorithmId>,
clocks: AHashMap<ComponentId, Rc<RefCell<dyn Clock>>>,
ts_created: UnixNanos,
ts_started: Option<UnixNanos>,
ts_stopped: Option<UnixNanos>,
}
impl Debug for Trader {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", stringify!(TraderId)) }
}
impl Trader {
#[must_use]
pub fn new(
trader_id: TraderId,
instance_id: UUID4,
environment: Environment,
clock: Rc<RefCell<dyn Clock>>,
cache: Rc<RefCell<Cache>>,
portfolio: Rc<RefCell<Portfolio>>,
) -> Self {
let ts_created = clock.borrow().timestamp_ns();
Self {
trader_id,
instance_id,
environment,
state: ComponentState::PreInitialized,
clock,
cache,
portfolio,
actor_ids: Vec::new(),
strategy_ids: Vec::new(),
strategy_stop_fns: AHashMap::new(),
strategy_handler_ids: AHashMap::new(),
exec_algorithm_ids: Vec::new(),
clocks: AHashMap::new(),
ts_created,
ts_started: None,
ts_stopped: None,
}
}
#[must_use]
pub const fn trader_id(&self) -> TraderId {
self.trader_id
}
#[must_use]
pub const fn instance_id(&self) -> UUID4 {
self.instance_id
}
#[must_use]
pub const fn environment(&self) -> Environment {
self.environment
}
#[must_use]
pub const fn state(&self) -> ComponentState {
self.state
}
#[must_use]
pub const fn ts_created(&self) -> UnixNanos {
self.ts_created
}
#[must_use]
pub const fn ts_started(&self) -> Option<UnixNanos> {
self.ts_started
}
#[must_use]
pub const fn ts_stopped(&self) -> Option<UnixNanos> {
self.ts_stopped
}
#[must_use]
pub const fn actor_count(&self) -> usize {
self.actor_ids.len()
}
#[must_use]
pub const fn strategy_count(&self) -> usize {
self.strategy_ids.len()
}
#[must_use]
pub const fn exec_algorithm_count(&self) -> usize {
self.exec_algorithm_ids.len()
}
pub fn get_component_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
self.clocks.values().cloned().collect()
}
#[must_use]
pub const fn component_count(&self) -> usize {
self.actor_ids.len() + self.strategy_ids.len() + self.exec_algorithm_ids.len()
}
#[must_use]
pub fn actor_ids(&self) -> Vec<ActorId> {
self.actor_ids.clone()
}
#[must_use]
pub fn strategy_ids(&self) -> Vec<StrategyId> {
self.strategy_ids.clone()
}
#[must_use]
pub fn exec_algorithm_ids(&self) -> Vec<ExecAlgorithmId> {
self.exec_algorithm_ids.clone()
}
pub fn create_component_clock(&mut self, component_id: ComponentId) -> Rc<RefCell<dyn Clock>> {
let clock: Rc<RefCell<dyn Clock>> = match self.environment {
Environment::Backtest => Rc::new(RefCell::new(TestClock::new())),
Environment::Live | Environment::Sandbox => Self::create_live_clock(),
};
self.clocks.insert(component_id, clock.clone());
clock
}
#[cfg(feature = "live")]
fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
Rc::new(RefCell::new(
nautilus_common::live::clock::LiveClock::default(), ))
}
#[cfg(not(feature = "live"))]
fn create_live_clock() -> Rc<RefCell<dyn Clock>> {
panic!("Live/Sandbox environment requires the 'live' feature to be enabled");
}
pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
where
T: DataActor + Component + Debug + 'static,
{
self.validate_actor_or_strategy_registration()?;
let actor_id = actor.actor_id();
if self.actor_ids.contains(&actor_id) {
anyhow::bail!("Actor {actor_id} is already registered");
}
let component_id = ComponentId::new(actor_id.inner().as_str());
let clock = self.create_component_clock(component_id);
let mut actor_mut = actor;
actor_mut.register(self.trader_id, clock, self.cache.clone())?;
self.add_registered_actor(actor_mut)
}
pub fn add_actor_from_factory<F, T>(&mut self, factory: F) -> anyhow::Result<()>
where
F: FnOnce() -> anyhow::Result<T>,
T: DataActor + Component + Debug + 'static,
{
let actor = factory()?;
self.add_actor(actor)
}
pub fn add_registered_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
where
T: DataActor + Component + Debug + 'static,
{
let actor_id = actor.actor_id();
register_component_actor(actor);
self.actor_ids.push(actor_id);
log::info!("Registered actor {actor_id} with trader {}", self.trader_id);
Ok(())
}
pub fn add_actor_id_for_lifecycle(&mut self, actor_id: ActorId) -> anyhow::Result<()> {
if self.actor_ids.contains(&actor_id) {
anyhow::bail!("Actor '{actor_id}' is already tracked by trader");
}
self.actor_ids.push(actor_id);
log::debug!(
"Added actor ID '{actor_id}' to trader {} for lifecycle management",
self.trader_id
);
Ok(())
}
pub fn add_exec_algorithm_id_for_lifecycle(
&mut self,
exec_algorithm_id: ExecAlgorithmId,
) -> anyhow::Result<()> {
if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already tracked by trader");
}
self.exec_algorithm_ids.push(exec_algorithm_id);
log::debug!(
"Added exec algorithm ID '{exec_algorithm_id}' to trader {} for lifecycle management",
self.trader_id
);
Ok(())
}
pub fn add_strategy_id_with_subscriptions<T>(
&mut self,
strategy_id: StrategyId,
) -> anyhow::Result<()>
where
T: Strategy + Component + Debug + 'static,
{
if self.strategy_ids.contains(&strategy_id) {
anyhow::bail!("Strategy '{strategy_id}' is already tracked by trader");
}
let actor_id = Ustr::from(strategy_id.inner().as_str());
let order_topic = get_event_orders_topic(strategy_id);
let order_actor_id = actor_id;
let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
strategy.handle_order_event(event.clone());
} else {
log::error!("Strategy {order_actor_id} not found for order event handling");
}
});
let order_handler_id = order_handler.id();
msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
let position_topic = get_event_positions_topic(strategy_id);
let position_handler = TypedHandler::from(move |event: &PositionEvent| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
strategy.handle_position_event(event.clone());
} else {
log::error!("Strategy {actor_id} not found for position event handling");
}
});
let position_handler_id = position_handler.id();
msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
let control_actor_id = actor_id;
let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
match command {
StrategyCommand::ExitMarket => {
if let Err(e) = strategy.market_exit() {
log::error!(
"Error handling strategy command for {control_actor_id}: {e}"
);
}
}
}
} else {
log::error!("Strategy {control_actor_id} not found for control handling");
}
});
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.register(strategy_control_endpoint(strategy_id), control_handler);
self.strategy_ids.push(strategy_id);
self.strategy_handler_ids
.insert(strategy_id, (order_handler_id, position_handler_id));
let stop_actor_id = actor_id;
let stop_fn = Box::new(move || -> bool {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
Strategy::stop(&mut *strategy)
} else {
log::error!("Strategy {stop_actor_id} not found for stop");
true
}
});
self.strategy_stop_fns.insert(strategy_id, stop_fn);
log::debug!(
"Added strategy '{strategy_id}' to trader {} with event subscriptions",
self.trader_id
);
Ok(())
}
pub fn add_strategy<T>(&mut self, mut strategy: T) -> anyhow::Result<()>
where
T: Strategy + Component + Debug + 'static,
{
self.validate_actor_or_strategy_registration()?;
let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
if self.strategy_ids.contains(&strategy_id) {
anyhow::bail!("Strategy {strategy_id} is already registered");
}
let component_id = strategy.component_id();
let clock = self.create_component_clock(component_id);
strategy.core_mut().register(
self.trader_id,
clock.clone(),
self.cache.clone(),
self.portfolio.clone(),
)?;
let actor_id = strategy.actor_id().inner();
let callback = TimeEventCallback::from(move |event: TimeEvent| {
if let Some(mut actor) = try_get_actor_unchecked::<T>(&actor_id) {
actor.handle_time_event(&event);
} else {
log::error!("Strategy {actor_id} not found for time event handling");
}
});
clock.borrow_mut().register_default_handler(callback);
strategy.initialize()?;
register_component_actor(strategy);
let order_topic = get_event_orders_topic(strategy_id);
let order_actor_id = actor_id;
let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&order_actor_id) {
strategy.handle_order_event(event.clone());
} else {
log::error!("Strategy {order_actor_id} not found for order event handling");
}
});
let order_handler_id = order_handler.id();
msgbus::subscribe_order_events(order_topic.into(), order_handler, None);
let position_topic = get_event_positions_topic(strategy_id);
let position_handler = TypedHandler::from(move |event: &PositionEvent| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&actor_id) {
strategy.handle_position_event(event.clone());
} else {
log::error!("Strategy {actor_id} not found for position event handling");
}
});
let position_handler_id = position_handler.id();
msgbus::subscribe_position_events(position_topic.into(), position_handler, None);
let control_actor_id = actor_id;
let control_handler = TypedHandler::from(move |command: &StrategyCommand| {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&control_actor_id) {
match command {
StrategyCommand::ExitMarket => {
if let Err(e) = strategy.market_exit() {
log::error!(
"Error handling strategy command for {control_actor_id}: {e}"
);
}
}
}
} else {
log::error!("Strategy {control_actor_id} not found for control handling");
}
});
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.register(strategy_control_endpoint(strategy_id), control_handler);
self.strategy_ids.push(strategy_id);
self.strategy_handler_ids
.insert(strategy_id, (order_handler_id, position_handler_id));
let stop_actor_id = actor_id;
let stop_fn = Box::new(move || -> bool {
if let Some(mut strategy) = try_get_actor_unchecked::<T>(&stop_actor_id) {
Strategy::stop(&mut *strategy)
} else {
log::error!("Strategy {stop_actor_id} not found for stop");
true }
});
self.strategy_stop_fns.insert(strategy_id, stop_fn);
log::info!(
"Registered strategy {strategy_id} with trader {}",
self.trader_id
);
Ok(())
}
pub fn add_exec_algorithm<T>(&mut self, mut exec_algorithm: T) -> anyhow::Result<()>
where
T: ExecutionAlgorithm + Component + Debug + 'static,
{
self.validate_exec_algorithm_registration()?;
let exec_algorithm_id =
ExecAlgorithmId::from(exec_algorithm.component_id().inner().as_str());
if self.exec_algorithm_ids.contains(&exec_algorithm_id) {
anyhow::bail!("Execution algorithm '{exec_algorithm_id}' is already registered");
}
let component_id = exec_algorithm.component_id();
let clock = self.create_component_clock(component_id);
exec_algorithm.register(self.trader_id, clock, self.cache.clone())?;
register_component_actor(exec_algorithm);
let actor_id = Ustr::from(exec_algorithm_id.inner().as_str());
let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
let handler = ShareableMessageHandler::from_typed(move |command: &TradingCommand| {
if let Some(mut algo) = try_get_actor_unchecked::<T>(&actor_id) {
if let Err(e) = algo.execute(command.clone()) {
log::error!("Error executing command on algorithm {actor_id}: {e}");
}
} else {
log::error!("Execution algorithm {actor_id} not found in registry");
}
});
msgbus::register_any(endpoint.into(), handler);
self.exec_algorithm_ids.push(exec_algorithm_id);
log::info!(
"Registered execution algorithm {exec_algorithm_id} with trader {}",
self.trader_id
);
Ok(())
}
fn validate_actor_or_strategy_registration(&self) -> anyhow::Result<()> {
match self.state {
ComponentState::PreInitialized
| ComponentState::Ready
| ComponentState::Stopped
| ComponentState::Running => Ok(()),
ComponentState::Disposed => {
anyhow::bail!("Cannot add components to disposed trader")
}
_ => anyhow::bail!("Cannot add components in current state: {}", self.state),
}
}
fn validate_exec_algorithm_registration(&self) -> anyhow::Result<()> {
match self.state {
ComponentState::PreInitialized | ComponentState::Ready | ComponentState::Stopped => {
Ok(())
}
ComponentState::Running => {
anyhow::bail!("Cannot add execution algorithms to running trader")
}
ComponentState::Disposed => {
anyhow::bail!("Cannot add components to disposed trader")
}
_ => anyhow::bail!(
"Cannot add execution algorithms in current state: {}",
self.state
),
}
}
pub fn start_components(&mut self) -> anyhow::Result<()> {
for actor_id in &self.actor_ids {
log::debug!("Starting actor {actor_id}");
start_component(&actor_id.inner())?;
}
for strategy_id in &self.strategy_ids {
log::debug!("Starting strategy {strategy_id}");
start_component(&strategy_id.inner())?;
}
for exec_algorithm_id in &self.exec_algorithm_ids {
log::debug!("Starting execution algorithm {exec_algorithm_id}");
start_component(&exec_algorithm_id.inner())?;
}
Ok(())
}
pub fn stop_components(&mut self) -> anyhow::Result<()> {
for actor_id in &self.actor_ids {
log::debug!("Stopping actor {actor_id}");
stop_component(&actor_id.inner())?;
}
for exec_algorithm_id in &self.exec_algorithm_ids {
log::debug!("Stopping execution algorithm {exec_algorithm_id}");
stop_component(&exec_algorithm_id.inner())?;
}
for strategy_id in self.strategy_ids.clone() {
log::debug!("Stopping strategy {strategy_id}");
let should_proceed = self
.strategy_stop_fns
.get_mut(&strategy_id)
.is_none_or(|stop_fn| stop_fn());
if should_proceed {
stop_component(&strategy_id.inner())?;
}
}
Ok(())
}
pub fn reset_components(&mut self) -> anyhow::Result<()> {
for actor_id in &self.actor_ids {
log::debug!("Resetting actor {actor_id}");
reset_component(&actor_id.inner())?;
}
for strategy_id in &self.strategy_ids {
log::debug!("Resetting strategy {strategy_id}");
reset_component(&strategy_id.inner())?;
}
for exec_algorithm_id in &self.exec_algorithm_ids {
log::debug!("Resetting execution algorithm {exec_algorithm_id}");
reset_component(&exec_algorithm_id.inner())?;
}
Ok(())
}
pub fn dispose_components(&mut self) -> anyhow::Result<()> {
for actor_id in &self.actor_ids {
log::debug!("Disposing actor {actor_id}");
dispose_component(&actor_id.inner())?;
}
for strategy_id in &self.strategy_ids {
log::debug!("Disposing strategy {strategy_id}");
dispose_component(&strategy_id.inner())?;
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.deregister(strategy_control_endpoint(*strategy_id));
}
for exec_algorithm_id in &self.exec_algorithm_ids {
log::debug!("Disposing execution algorithm {exec_algorithm_id}");
dispose_component(&exec_algorithm_id.inner())?;
let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
msgbus::deregister_any(endpoint.into());
}
self.actor_ids.clear();
self.strategy_ids.clear();
self.strategy_stop_fns.clear();
self.strategy_handler_ids.clear();
self.exec_algorithm_ids.clear();
self.clocks.clear();
Ok(())
}
pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
for strategy_id in &self.strategy_ids {
log::debug!("Disposing strategy {strategy_id}");
dispose_component(&strategy_id.inner())?;
let component_id = ComponentId::new(strategy_id.inner().as_str());
self.clocks.remove(&component_id);
if let Some((order_hid, position_hid)) = self.strategy_handler_ids.get(strategy_id) {
let order_topic = get_event_orders_topic(*strategy_id);
let position_topic = get_event_positions_topic(*strategy_id);
msgbus::remove_order_event_handler(order_topic.into(), *order_hid);
msgbus::remove_position_event_handler(position_topic.into(), *position_hid);
}
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.deregister(strategy_control_endpoint(*strategy_id));
}
self.strategy_ids.clear();
self.strategy_stop_fns.clear();
self.strategy_handler_ids.clear();
Ok(())
}
pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
for exec_algorithm_id in &self.exec_algorithm_ids {
log::debug!("Disposing execution algorithm {exec_algorithm_id}");
dispose_component(&exec_algorithm_id.inner())?;
let endpoint: Ustr = format!("{exec_algorithm_id}.execute").into();
msgbus::deregister_any(endpoint.into());
let component_id = ComponentId::new(exec_algorithm_id.inner().as_str());
self.clocks.remove(&component_id);
}
self.exec_algorithm_ids.clear();
Ok(())
}
pub fn start_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
if !self.actor_ids.contains(actor_id) {
anyhow::bail!("Cannot start actor, {actor_id} not found");
}
start_component(&actor_id.inner())
}
pub fn stop_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
if !self.actor_ids.contains(actor_id) {
anyhow::bail!("Cannot stop actor, {actor_id} not found");
}
stop_component(&actor_id.inner())
}
pub fn remove_actor(&mut self, actor_id: &ActorId) -> anyhow::Result<()> {
let pos = self
.actor_ids
.iter()
.position(|id| id == actor_id)
.ok_or_else(|| anyhow::anyhow!("Cannot remove actor, {actor_id} not found"))?;
let _ = stop_component(&actor_id.inner());
dispose_component(&actor_id.inner())?;
self.actor_ids.swap_remove(pos);
let component_id = ComponentId::new(actor_id.inner().as_str());
self.clocks.remove(&component_id);
log::info!("Removed actor {actor_id} from trader {}", self.trader_id);
Ok(())
}
pub fn start_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
if !self.strategy_ids.contains(strategy_id) {
anyhow::bail!("Cannot start strategy, {strategy_id} not found");
}
start_component(&strategy_id.inner())
}
pub fn stop_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
if !self.strategy_ids.contains(strategy_id) {
anyhow::bail!("Cannot stop strategy, {strategy_id} not found");
}
let should_proceed = self
.strategy_stop_fns
.get_mut(strategy_id)
.is_none_or(|stop_fn| stop_fn());
if should_proceed {
stop_component(&strategy_id.inner())?;
}
Ok(())
}
pub fn market_exit_strategy(
trader: &Rc<RefCell<Self>>,
strategy_id: &StrategyId,
) -> anyhow::Result<()> {
let handler = trader.borrow().strategy_command_handler(strategy_id)?;
handler.handle(&StrategyCommand::ExitMarket);
Ok(())
}
fn strategy_command_handler(
&self,
strategy_id: &StrategyId,
) -> anyhow::Result<TypedHandler<StrategyCommand>> {
if !self.strategy_ids.contains(strategy_id) {
anyhow::bail!("Cannot market exit strategy, {strategy_id} not found");
}
let endpoint = strategy_control_endpoint(*strategy_id);
let handler = {
let msgbus = get_message_bus();
msgbus
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.get(endpoint)
.cloned()
};
let Some(handler) = handler else {
anyhow::bail!(
"Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
endpoint.as_str()
);
};
Ok(handler)
}
pub fn remove_strategy(&mut self, strategy_id: &StrategyId) -> anyhow::Result<()> {
let pos = self
.strategy_ids
.iter()
.position(|id| id == strategy_id)
.ok_or_else(|| anyhow::anyhow!("Cannot remove strategy, {strategy_id} not found"))?;
let _ = stop_component(&strategy_id.inner());
dispose_component(&strategy_id.inner())?;
if let Some((order_hid, position_hid)) = self.strategy_handler_ids.remove(strategy_id) {
let order_topic = get_event_orders_topic(*strategy_id);
let position_topic = get_event_positions_topic(*strategy_id);
msgbus::remove_order_event_handler(order_topic.into(), order_hid);
msgbus::remove_position_event_handler(position_topic.into(), position_hid);
}
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.deregister(strategy_control_endpoint(*strategy_id));
self.strategy_ids.swap_remove(pos);
self.strategy_stop_fns.remove(strategy_id);
let component_id = ComponentId::new(strategy_id.inner().as_str());
self.clocks.remove(&component_id);
log::info!(
"Removed strategy {strategy_id} from trader {}",
self.trader_id
);
Ok(())
}
pub fn initialize(&mut self) -> anyhow::Result<()> {
let new_state = self.state.transition(&ComponentTrigger::Initialize)?;
self.state = new_state;
Ok(())
}
fn on_start(&mut self) -> anyhow::Result<()> {
self.start_components()?;
self.ts_started = Some(self.clock.borrow().timestamp_ns());
Ok(())
}
fn on_stop(&mut self) -> anyhow::Result<()> {
self.stop_components()?;
self.ts_stopped = Some(self.clock.borrow().timestamp_ns());
Ok(())
}
fn on_reset(&mut self) -> anyhow::Result<()> {
self.reset_components()?;
self.ts_started = None;
self.ts_stopped = None;
Ok(())
}
fn on_dispose(&mut self) -> anyhow::Result<()> {
if self.is_running() {
self.stop()?;
}
self.dispose_components()?;
Ok(())
}
}
impl Component for Trader {
fn component_id(&self) -> ComponentId {
ComponentId::new(format!("Trader-{}", self.trader_id))
}
fn state(&self) -> ComponentState {
self.state
}
fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
self.state = self.state.transition(&trigger)?;
log::info!("{}", self.state.variant_name());
Ok(())
}
fn register(
&mut self,
_trader_id: TraderId,
_clock: Rc<RefCell<dyn Clock>>,
_cache: Rc<RefCell<Cache>>,
) -> anyhow::Result<()> {
anyhow::bail!("Trader cannot register with itself")
}
fn on_start(&mut self) -> anyhow::Result<()> {
Self::on_start(self)
}
fn on_stop(&mut self) -> anyhow::Result<()> {
Self::on_stop(self)
}
fn on_reset(&mut self) -> anyhow::Result<()> {
Self::on_reset(self)
}
fn on_dispose(&mut self) -> anyhow::Result<()> {
Self::on_dispose(self)
}
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use nautilus_common::{
actor::{DataActorCore, data_actor::DataActorConfig},
cache::Cache,
clock::TestClock,
enums::{ComponentState, Environment},
msgbus,
msgbus::{MessageBus, TypedHandler, switchboard::get_event_orders_topic},
nautilus_actor,
};
use nautilus_core::UUID4;
use nautilus_data::engine::{DataEngine, config::DataEngineConfig};
use nautilus_execution::engine::{ExecutionEngine, config::ExecutionEngineConfig};
use nautilus_model::{
events::OrderAccepted,
identifiers::{ActorId, ComponentId, TraderId},
orders::OrderAny,
stubs::TestDefault,
};
use nautilus_portfolio::portfolio::Portfolio;
use nautilus_risk::engine::{RiskEngine, config::RiskEngineConfig};
use nautilus_trading::{
ExecutionAlgorithm as ExecutionAlgorithmTrait, ExecutionAlgorithmConfig,
ExecutionAlgorithmCore, nautilus_strategy,
strategy::{config::StrategyConfig, core::StrategyCore},
};
use rstest::rstest;
use super::*;
#[derive(Debug)]
struct TestDataActor {
core: DataActorCore,
}
impl TestDataActor {
fn new(config: DataActorConfig) -> Self {
Self {
core: DataActorCore::new(config),
}
}
}
impl DataActor for TestDataActor {}
nautilus_actor!(TestDataActor);
#[derive(Debug)]
struct TestExecAlgorithm {
core: ExecutionAlgorithmCore,
}
impl TestExecAlgorithm {
fn new(config: ExecutionAlgorithmConfig) -> Self {
Self {
core: ExecutionAlgorithmCore::new(config),
}
}
}
impl DataActor for TestExecAlgorithm {}
nautilus_actor!(TestExecAlgorithm);
impl ExecutionAlgorithmTrait for TestExecAlgorithm {
fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
&mut self.core
}
fn on_order(&mut self, _order: OrderAny) -> anyhow::Result<()> {
Ok(())
}
}
#[derive(Debug)]
struct TestStrategy {
core: StrategyCore,
}
impl TestStrategy {
fn new(config: StrategyConfig) -> Self {
Self {
core: StrategyCore::new(config),
}
}
}
impl DataActor for TestStrategy {}
nautilus_strategy!(TestStrategy);
#[allow(clippy::type_complexity)]
fn create_trader_components() -> (
Rc<RefCell<MessageBus>>,
Rc<RefCell<Cache>>,
Rc<RefCell<Portfolio>>,
Rc<RefCell<DataEngine>>,
Rc<RefCell<RiskEngine>>,
Rc<RefCell<ExecutionEngine>>,
Rc<RefCell<TestClock>>,
) {
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let clock = Rc::new(RefCell::new(TestClock::new()));
clock.borrow_mut().set_time(1_000_000_000u64.into());
let msgbus = Rc::new(RefCell::new(MessageBus::new(
trader_id,
instance_id,
Some("test".to_string()),
None,
)));
let cache = Rc::new(RefCell::new(Cache::new(None, None)));
let portfolio = Rc::new(RefCell::new(Portfolio::new(
cache.clone(),
clock.clone() as Rc<RefCell<dyn Clock>>,
None,
)));
let data_engine = Rc::new(RefCell::new(DataEngine::new(
clock.clone(),
cache.clone(),
Some(DataEngineConfig::default()),
)));
let risk_cache = Rc::new(RefCell::new(Cache::new(None, None)));
let risk_clock = Rc::new(RefCell::new(TestClock::new()));
let risk_portfolio = Portfolio::new(
risk_cache.clone(),
risk_clock.clone() as Rc<RefCell<dyn Clock>>,
None,
);
let risk_engine = Rc::new(RefCell::new(RiskEngine::new(
RiskEngineConfig::default(),
risk_portfolio,
risk_clock as Rc<RefCell<dyn Clock>>,
risk_cache,
)));
let exec_engine = Rc::new(RefCell::new(ExecutionEngine::new(
clock.clone(),
cache.clone(),
Some(ExecutionEngineConfig::default()),
)));
(
msgbus,
cache,
portfolio,
data_engine,
risk_engine,
exec_engine,
clock,
)
}
#[rstest]
fn test_trader_creation() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
assert_eq!(trader.trader_id(), trader_id);
assert_eq!(trader.instance_id(), instance_id);
assert_eq!(trader.environment(), Environment::Backtest);
assert_eq!(trader.state(), ComponentState::PreInitialized);
assert_eq!(trader.actor_count(), 0);
assert_eq!(trader.strategy_count(), 0);
assert_eq!(trader.exec_algorithm_count(), 0);
assert_eq!(trader.component_count(), 0);
assert!(!trader.is_running());
assert!(!trader.is_stopped());
assert!(!trader.is_disposed());
assert!(trader.ts_created() > 0);
assert!(trader.ts_started().is_none());
assert!(trader.ts_stopped().is_none());
}
#[rstest]
fn test_trader_component_id() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::from("TRADER-001");
let instance_id = UUID4::new();
let trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
assert_eq!(
trader.component_id(),
ComponentId::from("Trader-TRADER-001")
);
}
#[rstest]
fn test_add_actor_success() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let actor = TestDataActor::new(DataActorConfig::default());
let actor_id = actor.actor_id();
let result = trader.add_actor(actor);
assert!(result.is_ok());
assert_eq!(trader.actor_count(), 1);
assert_eq!(trader.component_count(), 1);
assert!(trader.actor_ids().contains(&actor_id));
}
#[rstest]
fn test_add_duplicate_actor_fails() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = DataActorConfig {
actor_id: Some(ActorId::from("TestActor")),
..Default::default()
};
let actor1 = TestDataActor::new(config.clone());
let actor2 = TestDataActor::new(config);
assert!(trader.add_actor(actor1).is_ok());
assert_eq!(trader.actor_count(), 1);
let result = trader.add_actor(actor2);
assert!(result.is_err());
assert!(
result
.unwrap_err()
.to_string()
.contains("already registered")
);
assert_eq!(trader.actor_count(), 1);
}
#[rstest]
fn test_add_strategy_success() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = StrategyConfig {
strategy_id: Some(StrategyId::from("Test-Strategy")),
..Default::default()
};
let strategy = TestStrategy::new(config);
let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
let result = trader.add_strategy(strategy);
assert!(result.is_ok());
assert_eq!(trader.strategy_count(), 1);
assert_eq!(trader.component_count(), 1);
assert!(trader.strategy_ids().contains(&strategy_id));
}
#[rstest]
fn test_add_exec_algorithm_success() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = ExecutionAlgorithmConfig {
exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
..Default::default()
};
let exec_algorithm = TestExecAlgorithm::new(config);
let exec_algorithm_id = ExecAlgorithmId::from(exec_algorithm.actor_id().inner().as_str());
let result = trader.add_exec_algorithm(exec_algorithm);
assert!(result.is_ok());
assert_eq!(trader.exec_algorithm_count(), 1);
assert_eq!(trader.component_count(), 1);
assert!(trader.exec_algorithm_ids().contains(&exec_algorithm_id));
}
#[rstest]
fn test_cannot_add_exec_algorithm_while_running() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
trader.state = ComponentState::Running;
let config = ExecutionAlgorithmConfig {
exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
..Default::default()
};
let exec_algorithm = TestExecAlgorithm::new(config);
let result = trader.add_exec_algorithm(exec_algorithm);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Cannot add execution algorithms to running trader"
);
assert_eq!(trader.exec_algorithm_count(), 0);
}
#[rstest]
fn test_component_lifecycle() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let actor = TestDataActor::new(DataActorConfig::default());
let strategy_config = StrategyConfig {
strategy_id: Some(StrategyId::from("Test-Strategy")),
..Default::default()
};
let strategy = TestStrategy::new(strategy_config);
let exec_algorithm_config = ExecutionAlgorithmConfig {
exec_algorithm_id: Some(ExecAlgorithmId::from("TestExecAlgorithm")),
..Default::default()
};
let exec_algorithm = TestExecAlgorithm::new(exec_algorithm_config);
assert!(trader.add_actor(actor).is_ok());
assert!(trader.add_strategy(strategy).is_ok());
assert!(trader.add_exec_algorithm(exec_algorithm).is_ok());
assert_eq!(trader.component_count(), 3);
let start_result = trader.start_components();
assert!(start_result.is_ok(), "{:?}", start_result.unwrap_err());
assert!(trader.stop_components().is_ok());
assert!(trader.reset_components().is_ok());
assert!(trader.dispose_components().is_ok());
assert_eq!(trader.component_count(), 0);
}
#[rstest]
fn test_trader_component_lifecycle() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
assert_eq!(trader.state(), ComponentState::PreInitialized);
assert!(!trader.is_running());
assert!(!trader.is_stopped());
assert!(!trader.is_disposed());
assert!(trader.start().is_err());
trader.initialize().unwrap();
assert!(trader.start().is_ok());
assert_eq!(trader.state(), ComponentState::Running);
assert!(trader.is_running());
assert!(trader.ts_started().is_some());
assert!(trader.stop().is_ok());
assert_eq!(trader.state(), ComponentState::Stopped);
assert!(trader.is_stopped());
assert!(trader.ts_stopped().is_some());
assert!(trader.reset().is_ok());
assert_eq!(trader.state(), ComponentState::Ready);
assert!(trader.ts_started().is_none());
assert!(trader.ts_stopped().is_none());
assert!(trader.dispose().is_ok());
assert_eq!(trader.state(), ComponentState::Disposed);
assert!(trader.is_disposed());
}
#[rstest]
fn test_market_exit_strategy_fails_when_control_endpoint_missing() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = StrategyConfig {
strategy_id: Some(StrategyId::from("Test-Strategy")),
..Default::default()
};
let strategy = TestStrategy::new(config);
let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
trader.add_strategy(strategy).unwrap();
let endpoint = strategy_control_endpoint(strategy_id);
assert!(
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.is_registered(endpoint)
);
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.deregister(endpoint);
let trader = Rc::new(RefCell::new(trader));
let result = Trader::market_exit_strategy(&trader, &strategy_id);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
format!(
"Cannot exit market for strategy {strategy_id}: control endpoint '{}' not registered",
endpoint.as_str()
)
);
}
#[rstest]
fn test_remove_strategy_deregisters_strategy_endpoint() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = StrategyConfig {
strategy_id: Some(StrategyId::from("Test-Strategy")),
..Default::default()
};
let strategy = TestStrategy::new(config);
let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
trader.add_strategy(strategy).unwrap();
let endpoint = strategy_control_endpoint(strategy_id);
assert!(
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.is_registered(endpoint)
);
trader.remove_strategy(&strategy_id).unwrap();
assert!(
!get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.is_registered(endpoint)
);
}
#[rstest]
fn test_can_add_components_while_running() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
trader.state = ComponentState::Running;
let actor = TestDataActor::new(DataActorConfig::default());
let result = trader.add_actor(actor);
assert!(result.is_ok());
assert_eq!(trader.actor_count(), 1);
}
#[rstest]
fn test_cannot_add_components_while_disposed() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
trader.state = ComponentState::Disposed;
let actor = TestDataActor::new(DataActorConfig::default());
let result = trader.add_actor(actor);
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("disposed trader"));
}
#[rstest]
fn test_create_component_clock_backtest_creates_individual_clocks() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock.clone(),
cache,
portfolio,
);
let component_a = ComponentId::new("ACTOR-A");
let component_b = ComponentId::new("ACTOR-B");
let clock_a = trader.create_component_clock(component_a);
let clock_b = trader.create_component_clock(component_b);
assert_ne!(clock_a.as_ptr() as *const _, clock.as_ptr() as *const _);
assert_ne!(clock_a.as_ptr() as *const _, clock_b.as_ptr() as *const _);
}
#[rstest]
fn test_clear_strategies_preserves_other_handlers() {
let (_msgbus, cache, portfolio, _data_engine, _risk_engine, _exec_engine, clock) =
create_trader_components();
let trader_id = TraderId::test_default();
let instance_id = UUID4::new();
let mut trader = Trader::new(
trader_id,
instance_id,
Environment::Backtest,
clock,
cache,
portfolio,
);
let config = StrategyConfig {
strategy_id: Some(StrategyId::from("Test-Strategy")),
..Default::default()
};
let strategy = TestStrategy::new(config);
let strategy_id = StrategyId::from(strategy.actor_id().inner().as_str());
trader.add_strategy(strategy).unwrap();
let endpoint = strategy_control_endpoint(strategy_id);
assert!(
get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.is_registered(endpoint)
);
let ext_received = Rc::new(RefCell::new(0));
let ext_clone = ext_received.clone();
let ext_handler =
TypedHandler::from_with_id("exec-algo-handler", move |_: &OrderEventAny| {
*ext_clone.borrow_mut() += 1;
});
let order_topic = get_event_orders_topic(strategy_id);
msgbus::subscribe_order_events(order_topic.into(), ext_handler, None);
trader.clear_strategies().unwrap();
assert_eq!(trader.strategy_count(), 0);
assert!(
!get_message_bus()
.borrow_mut()
.endpoint_map::<StrategyCommand>()
.is_registered(endpoint)
);
let event = OrderEventAny::Accepted(OrderAccepted::test_default());
msgbus::publish_order_event(order_topic, &event);
assert_eq!(*ext_received.borrow(), 1);
}
}