1use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use nautilus_common::{
19 actor::{
20 DataActor, DataActorCore, data_actor::DataActorConfig, registry::try_get_actor_unchecked,
21 },
22 component::Component,
23 msgbus::{Endpoint, MStr, TypedHandler, get_message_bus},
24 nautilus_actor,
25};
26use nautilus_model::identifiers::{ActorId, StrategyId};
27use nautilus_trading::Strategy;
28
29use crate::{messages::ControllerCommand, trader::Trader};
30
31#[derive(Debug)]
32pub struct Controller {
33 core: DataActorCore,
34 trader: Rc<RefCell<Trader>>,
35}
36
37impl Controller {
38 pub const EXECUTE_ENDPOINT: &str = "Controller.execute";
39
40 #[must_use]
41 pub fn new(trader: Rc<RefCell<Trader>>, config: Option<DataActorConfig>) -> Self {
42 Self {
43 core: DataActorCore::new(config.unwrap_or_default()),
44 trader,
45 }
46 }
47
48 pub fn send(command: ControllerCommand) -> anyhow::Result<()> {
54 let endpoint = Self::execute_endpoint();
55 let handler = {
56 let msgbus = get_message_bus();
57 msgbus
58 .borrow_mut()
59 .endpoint_map::<ControllerCommand>()
60 .get(endpoint)
61 .cloned()
62 };
63
64 let Some(handler) = handler else {
65 anyhow::bail!(
66 "Controller execute endpoint '{}' not registered",
67 endpoint.as_str()
68 );
69 };
70
71 handler.handle(&command);
72 Ok(())
73 }
74
75 pub fn execute(&mut self, command: ControllerCommand) -> anyhow::Result<()> {
81 match command {
82 ControllerCommand::StartActor(actor_id) => self.start_actor(&actor_id),
83 ControllerCommand::StopActor(actor_id) => self.stop_actor(&actor_id),
84 ControllerCommand::RemoveActor(actor_id) => self.remove_actor(&actor_id),
85 ControllerCommand::StartStrategy(strategy_id) => self.start_strategy(&strategy_id),
86 ControllerCommand::StopStrategy(strategy_id) => self.stop_strategy(&strategy_id),
87 ControllerCommand::ExitMarket(strategy_id) => self.exit_market(&strategy_id),
88 ControllerCommand::RemoveStrategy(strategy_id) => self.remove_strategy(&strategy_id),
89 }
90 }
91
92 pub fn create_actor<T>(&self, actor: T, start: bool) -> anyhow::Result<ActorId>
98 where
99 T: DataActor + Component + Debug + 'static,
100 {
101 let actor_id = actor.actor_id();
102 self.trader.borrow_mut().add_actor(actor)?;
103
104 self.start_created_actor(&actor_id, start)?;
105
106 Ok(actor_id)
107 }
108
109 pub fn create_actor_from_factory<F, T>(
115 &self,
116 factory: F,
117 start: bool,
118 ) -> anyhow::Result<ActorId>
119 where
120 F: FnOnce() -> anyhow::Result<T>,
121 T: DataActor + Component + Debug + 'static,
122 {
123 let actor = factory()?;
124 self.create_actor(actor, start)
125 }
126
127 pub fn create_strategy<T>(&self, strategy: T, start: bool) -> anyhow::Result<StrategyId>
133 where
134 T: Strategy + Component + Debug + 'static,
135 {
136 let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
137 self.trader.borrow_mut().add_strategy(strategy)?;
138
139 self.start_created_strategy(&strategy_id, start)?;
140
141 Ok(strategy_id)
142 }
143
144 pub fn create_strategy_from_factory<F, T>(
150 &self,
151 factory: F,
152 start: bool,
153 ) -> anyhow::Result<StrategyId>
154 where
155 F: FnOnce() -> anyhow::Result<T>,
156 T: Strategy + Component + Debug + 'static,
157 {
158 let strategy = factory()?;
159 self.create_strategy(strategy, start)
160 }
161
162 pub fn start_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
168 self.trader.borrow().start_actor(actor_id)
169 }
170
171 pub fn stop_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
177 self.trader.borrow().stop_actor(actor_id)
178 }
179
180 pub fn remove_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
186 if actor_id.inner() == self.actor_id().inner() {
187 return Ok(());
188 }
189
190 self.trader.borrow_mut().remove_actor(actor_id)
191 }
192
193 pub fn start_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
199 self.trader.borrow().start_strategy(strategy_id)
200 }
201
202 pub fn stop_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
208 self.trader.borrow_mut().stop_strategy(strategy_id)
209 }
210
211 pub fn exit_market(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
217 Trader::market_exit_strategy(&self.trader, strategy_id)
218 }
219
220 pub fn remove_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
226 self.trader.borrow_mut().remove_strategy(strategy_id)
227 }
228
229 fn start_created_actor(&self, actor_id: &ActorId, start: bool) -> anyhow::Result<()> {
230 if !start {
231 return Ok(());
232 }
233
234 if let Err(start_err) = self.start_actor(actor_id) {
235 return Err(self.rollback_actor_start_failure(actor_id, start_err));
236 }
237
238 Ok(())
239 }
240
241 fn start_created_strategy(&self, strategy_id: &StrategyId, start: bool) -> anyhow::Result<()> {
242 if !start {
243 return Ok(());
244 }
245
246 if let Err(start_err) = self.start_strategy(strategy_id) {
247 return Err(self.rollback_strategy_start_failure(strategy_id, start_err));
248 }
249
250 Ok(())
251 }
252
253 fn rollback_actor_start_failure(
254 &self,
255 actor_id: &ActorId,
256 start_err: anyhow::Error,
257 ) -> anyhow::Error {
258 match self.remove_actor(actor_id) {
259 Ok(()) => start_err,
260 Err(rollback_err) => anyhow::anyhow!(
261 "Failed to start actor {actor_id}: {start_err}; rollback failed: {rollback_err}"
262 ),
263 }
264 }
265
266 fn rollback_strategy_start_failure(
267 &self,
268 strategy_id: &StrategyId,
269 start_err: anyhow::Error,
270 ) -> anyhow::Error {
271 match self.remove_strategy(strategy_id) {
272 Ok(()) => start_err,
273 Err(rollback_err) => anyhow::anyhow!(
274 "Failed to start strategy {strategy_id}: {start_err}; rollback failed: {rollback_err}"
275 ),
276 }
277 }
278
279 fn register_execute_endpoint(&self) {
280 let controller_id = self.actor_id().inner();
281 let handler = TypedHandler::from(move |command: &ControllerCommand| {
282 if let Some(mut controller) = try_get_actor_unchecked::<Self>(&controller_id) {
283 if let Err(e) = controller.execute(*command) {
284 log::error!("Controller command failed for {controller_id}: {e}");
285 }
286 } else {
287 log::error!("Controller {controller_id} not found for command handling");
288 }
289 });
290
291 get_message_bus()
292 .borrow_mut()
293 .endpoint_map::<ControllerCommand>()
294 .register(Self::execute_endpoint(), handler);
295 }
296
297 fn deregister_execute_endpoint(&self) {
298 get_message_bus()
299 .borrow_mut()
300 .endpoint_map::<ControllerCommand>()
301 .deregister(Self::execute_endpoint());
302 }
303
304 fn execute_endpoint() -> MStr<Endpoint> {
305 Self::EXECUTE_ENDPOINT.into()
306 }
307}
308
309impl DataActor for Controller {
310 fn on_start(&mut self) -> anyhow::Result<()> {
311 self.register_execute_endpoint();
312 Ok(())
313 }
314
315 fn on_stop(&mut self) -> anyhow::Result<()> {
316 self.deregister_execute_endpoint();
317 Ok(())
318 }
319
320 fn on_resume(&mut self) -> anyhow::Result<()> {
321 self.register_execute_endpoint();
322 Ok(())
323 }
324
325 fn on_dispose(&mut self) -> anyhow::Result<()> {
326 self.deregister_execute_endpoint();
327 Ok(())
328 }
329}
330
331nautilus_actor!(Controller);
332
333#[cfg(test)]
334mod tests {
335 use nautilus_common::{
336 cache::Cache,
337 clock::{Clock, TestClock},
338 enums::{ComponentState, Environment},
339 msgbus::{MessageBus, set_message_bus},
340 };
341 use nautilus_core::UUID4;
342 use nautilus_model::{identifiers::TraderId, stubs::TestDefault};
343 use nautilus_portfolio::portfolio::Portfolio;
344 use nautilus_trading::{
345 nautilus_strategy,
346 strategy::{StrategyConfig, StrategyCore},
347 };
348 use rstest::rstest;
349
350 use super::*;
351
352 #[derive(Debug)]
353 struct TestDataActor {
354 core: DataActorCore,
355 }
356
357 impl TestDataActor {
358 fn new(config: DataActorConfig) -> Self {
359 Self {
360 core: DataActorCore::new(config),
361 }
362 }
363 }
364
365 impl DataActor for TestDataActor {}
366
367 nautilus_actor!(TestDataActor);
368
369 #[derive(Debug)]
370 struct TestStrategy {
371 core: StrategyCore,
372 }
373
374 impl TestStrategy {
375 fn new(config: StrategyConfig) -> Self {
376 Self {
377 core: StrategyCore::new(config),
378 }
379 }
380 }
381
382 impl DataActor for TestStrategy {}
383
384 nautilus_strategy!(TestStrategy);
385
386 #[derive(Debug)]
387 struct FailingStartActor {
388 core: DataActorCore,
389 }
390
391 impl FailingStartActor {
392 fn new(config: DataActorConfig) -> Self {
393 Self {
394 core: DataActorCore::new(config),
395 }
396 }
397 }
398
399 impl DataActor for FailingStartActor {
400 fn on_start(&mut self) -> anyhow::Result<()> {
401 anyhow::bail!("Simulated actor start failure")
402 }
403 }
404
405 nautilus_actor!(FailingStartActor);
406
407 #[derive(Debug)]
408 struct FailingStartStrategy {
409 core: StrategyCore,
410 }
411
412 impl FailingStartStrategy {
413 fn new(config: StrategyConfig) -> Self {
414 Self {
415 core: StrategyCore::new(config),
416 }
417 }
418 }
419
420 impl DataActor for FailingStartStrategy {
421 fn on_start(&mut self) -> anyhow::Result<()> {
422 anyhow::bail!("Simulated strategy start failure")
423 }
424 }
425
426 nautilus_strategy!(FailingStartStrategy);
427
428 #[derive(Debug)]
429 struct ReentrantExitStrategy {
430 core: StrategyCore,
431 actor_to_stop: ActorId,
432 }
433
434 impl ReentrantExitStrategy {
435 fn new(config: StrategyConfig, actor_to_stop: ActorId) -> Self {
436 Self {
437 core: StrategyCore::new(config),
438 actor_to_stop,
439 }
440 }
441 }
442
443 impl DataActor for ReentrantExitStrategy {}
444
445 nautilus_strategy!(ReentrantExitStrategy, {
446 fn on_market_exit(&mut self) {
447 Controller::send(ControllerCommand::StopActor(self.actor_to_stop)).unwrap();
448 }
449 });
450
451 fn create_running_controller() -> (Rc<RefCell<Trader>>, ActorId) {
452 let trader_id = TraderId::test_default();
453 let instance_id = UUID4::new();
454 let clock = Rc::new(RefCell::new(TestClock::new()));
455 clock.borrow_mut().set_time(1_000_000_000u64.into());
456
457 let msgbus = Rc::new(RefCell::new(MessageBus::new(
458 trader_id,
459 instance_id,
460 Some("test".to_string()),
461 None,
462 )));
463 set_message_bus(msgbus);
464
465 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
466 let portfolio = Rc::new(RefCell::new(Portfolio::new(
467 cache.clone(),
468 clock.clone() as Rc<RefCell<dyn Clock>>,
469 None,
470 )));
471
472 let trader = Rc::new(RefCell::new(Trader::new(
473 trader_id,
474 instance_id,
475 Environment::Backtest,
476 clock as Rc<RefCell<dyn Clock>>,
477 cache,
478 portfolio,
479 )));
480 trader.borrow_mut().initialize().unwrap();
481
482 let controller = Controller::new(
483 trader.clone(),
484 Some(DataActorConfig {
485 actor_id: Some(ActorId::from("Controller-001")),
486 ..Default::default()
487 }),
488 );
489 let controller_id = controller.actor_id();
490
491 trader.borrow_mut().add_actor(controller).unwrap();
492 trader.borrow_mut().start().unwrap();
493
494 (trader, controller_id)
495 }
496
497 #[rstest]
498 fn test_controller_manages_actor_lifecycle_by_message() {
499 let (trader, controller_id) = create_running_controller();
500 let controller_actor_id = controller_id.inner();
501
502 let actor_id = {
503 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
504 controller
505 .create_actor(
506 TestDataActor::new(DataActorConfig {
507 actor_id: Some(ActorId::from("TestActor-001")),
508 ..Default::default()
509 }),
510 false,
511 )
512 .unwrap()
513 };
514
515 assert!(trader.borrow().actor_ids().contains(&actor_id));
516
517 Controller::send(ControllerCommand::StartActor(actor_id)).unwrap();
518 let actor_registry_id = actor_id.inner();
519 assert_eq!(
520 try_get_actor_unchecked::<TestDataActor>(&actor_registry_id)
521 .unwrap()
522 .state(),
523 ComponentState::Running
524 );
525
526 Controller::send(ControllerCommand::StopActor(actor_id)).unwrap();
527 assert_eq!(
528 try_get_actor_unchecked::<TestDataActor>(&actor_registry_id)
529 .unwrap()
530 .state(),
531 ComponentState::Stopped
532 );
533
534 Controller::send(ControllerCommand::RemoveActor(actor_id)).unwrap();
535 assert!(!trader.borrow().actor_ids().contains(&actor_id));
536
537 trader.borrow_mut().stop().unwrap();
538 trader.borrow_mut().dispose_components().unwrap();
539 }
540
541 #[rstest]
542 fn test_controller_manages_strategy_lifecycle_and_exit_market() {
543 let (trader, controller_id) = create_running_controller();
544 let controller_actor_id = controller_id.inner();
545
546 let strategy_id = {
547 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
548 controller
549 .create_strategy(
550 TestStrategy::new(StrategyConfig {
551 strategy_id: Some(StrategyId::from("TestStrategy-001")),
552 order_id_tag: Some("001".to_string()),
553 ..Default::default()
554 }),
555 false,
556 )
557 .unwrap()
558 };
559
560 assert!(trader.borrow().strategy_ids().contains(&strategy_id));
561
562 Controller::send(ControllerCommand::StartStrategy(strategy_id)).unwrap();
563 let strategy_registry_id = strategy_id.inner();
564 assert_eq!(
565 try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id)
566 .unwrap()
567 .state(),
568 ComponentState::Running
569 );
570
571 Controller::send(ControllerCommand::ExitMarket(strategy_id)).unwrap();
572 assert!(
573 try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id)
574 .unwrap()
575 .is_exiting()
576 );
577
578 Controller::send(ControllerCommand::StopStrategy(strategy_id)).unwrap();
579 let strategy = try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id).unwrap();
580 assert_eq!(strategy.state(), ComponentState::Stopped);
581 assert!(!strategy.is_exiting());
582 drop(strategy);
583
584 Controller::send(ControllerCommand::RemoveStrategy(strategy_id)).unwrap();
585 assert!(!trader.borrow().strategy_ids().contains(&strategy_id));
586
587 trader.borrow_mut().stop().unwrap();
588 trader.borrow_mut().dispose_components().unwrap();
589 }
590
591 #[rstest]
592 fn test_controller_create_actor_rolls_back_on_start_failure() {
593 let (trader, controller_id) = create_running_controller();
594 let controller_actor_id = controller_id.inner();
595 let actor_id = ActorId::from("FailingActor-001");
596
597 let result = {
598 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
599 controller.create_actor(
600 FailingStartActor::new(DataActorConfig {
601 actor_id: Some(actor_id),
602 ..Default::default()
603 }),
604 true,
605 )
606 };
607
608 assert!(result.is_err());
609 assert!(
610 result
611 .unwrap_err()
612 .to_string()
613 .contains("Simulated actor start failure")
614 );
615 assert!(!trader.borrow().actor_ids().contains(&actor_id));
616 if let Some(actor) = try_get_actor_unchecked::<FailingStartActor>(&actor_id.inner()) {
617 assert_eq!(actor.state(), ComponentState::Disposed);
618 }
619
620 trader.borrow_mut().stop().unwrap();
621 trader.borrow_mut().dispose_components().unwrap();
622 }
623
624 #[rstest]
625 fn test_controller_create_strategy_rolls_back_on_start_failure() {
626 let (trader, controller_id) = create_running_controller();
627 let controller_actor_id = controller_id.inner();
628 let strategy_id = StrategyId::from("FailingStrategy-001");
629
630 let result = {
631 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
632 controller.create_strategy(
633 FailingStartStrategy::new(StrategyConfig {
634 strategy_id: Some(strategy_id),
635 order_id_tag: Some("001".to_string()),
636 ..Default::default()
637 }),
638 true,
639 )
640 };
641
642 assert!(result.is_err());
643 assert!(
644 result
645 .unwrap_err()
646 .to_string()
647 .contains("Simulated strategy start failure")
648 );
649 assert!(!trader.borrow().strategy_ids().contains(&strategy_id));
650
651 if let Some(strategy) =
652 try_get_actor_unchecked::<FailingStartStrategy>(&strategy_id.inner())
653 {
654 assert_eq!(strategy.state(), ComponentState::Disposed);
655 }
656
657 trader.borrow_mut().stop().unwrap();
658 trader.borrow_mut().dispose_components().unwrap();
659 }
660
661 #[rstest]
662 fn test_controller_exit_market_allows_reentrant_controller_commands() {
663 let (trader, controller_id) = create_running_controller();
664 let controller_actor_id = controller_id.inner();
665
666 let helper_actor_id = {
667 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
668 controller
669 .create_actor(
670 TestDataActor::new(DataActorConfig {
671 actor_id: Some(ActorId::from("HelperActor-001")),
672 ..Default::default()
673 }),
674 true,
675 )
676 .unwrap()
677 };
678
679 let strategy_id = {
680 let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
681 controller
682 .create_strategy(
683 ReentrantExitStrategy::new(
684 StrategyConfig {
685 strategy_id: Some(StrategyId::from("ReentrantStrategy-001")),
686 order_id_tag: Some("001".to_string()),
687 ..Default::default()
688 },
689 helper_actor_id,
690 ),
691 false,
692 )
693 .unwrap()
694 };
695
696 Controller::send(ControllerCommand::StartStrategy(strategy_id)).unwrap();
697 Controller::send(ControllerCommand::ExitMarket(strategy_id)).unwrap();
698
699 let helper_actor =
700 try_get_actor_unchecked::<TestDataActor>(&helper_actor_id.inner()).unwrap();
701 assert_eq!(helper_actor.state(), ComponentState::Stopped);
702 drop(helper_actor);
703 assert!(
704 try_get_actor_unchecked::<ReentrantExitStrategy>(&strategy_id.inner())
705 .unwrap()
706 .is_exiting()
707 );
708
709 Controller::send(ControllerCommand::StopStrategy(strategy_id)).unwrap();
710 Controller::send(ControllerCommand::RemoveStrategy(strategy_id)).unwrap();
711 Controller::send(ControllerCommand::RemoveActor(helper_actor_id)).unwrap();
712 trader.borrow_mut().stop().unwrap();
713 trader.borrow_mut().dispose_components().unwrap();
714 }
715
716 #[rstest]
717 fn test_controller_send_fails_after_controller_stop() {
718 let (trader, _) = create_running_controller();
719
720 trader.borrow_mut().stop().unwrap();
721
722 let result = Controller::send(ControllerCommand::StopActor(ActorId::from("AnyActor-001")));
723 assert!(result.is_err());
724 assert_eq!(
725 result.unwrap_err().to_string(),
726 "Controller execute endpoint 'Controller.execute' not registered"
727 );
728
729 trader.borrow_mut().dispose_components().unwrap();
730 }
731}