Skip to main content

nautilus_system/
controller.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use nautilus_common::{
19    actor::{
20        DataActor, DataActorCore, data_actor::DataActorConfig, registry::try_get_actor_unchecked,
21    },
22    component::Component,
23    msgbus::{Endpoint, MStr, TypedHandler, get_message_bus},
24    nautilus_actor,
25};
26use nautilus_model::identifiers::{ActorId, StrategyId};
27use nautilus_trading::Strategy;
28
29use crate::{messages::ControllerCommand, trader::Trader};
30
31#[derive(Debug)]
32pub struct Controller {
33    core: DataActorCore,
34    trader: Rc<RefCell<Trader>>,
35}
36
37impl Controller {
38    pub const EXECUTE_ENDPOINT: &str = "Controller.execute";
39
40    #[must_use]
41    pub fn new(trader: Rc<RefCell<Trader>>, config: Option<DataActorConfig>) -> Self {
42        Self {
43            core: DataActorCore::new(config.unwrap_or_default()),
44            trader,
45        }
46    }
47
48    /// Sends a controller command to the registered controller endpoint.
49    ///
50    /// # Errors
51    ///
52    /// Returns an error if the controller execute endpoint is not registered.
53    pub fn send(command: ControllerCommand) -> anyhow::Result<()> {
54        let endpoint = Self::execute_endpoint();
55        let handler = {
56            let msgbus = get_message_bus();
57            msgbus
58                .borrow_mut()
59                .endpoint_map::<ControllerCommand>()
60                .get(endpoint)
61                .cloned()
62        };
63
64        let Some(handler) = handler else {
65            anyhow::bail!(
66                "Controller execute endpoint '{}' not registered",
67                endpoint.as_str()
68            );
69        };
70
71        handler.handle(&command);
72        Ok(())
73    }
74
75    /// Executes a controller command against the underlying trader.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the requested lifecycle operation fails.
80    pub fn execute(&mut self, command: ControllerCommand) -> anyhow::Result<()> {
81        match command {
82            ControllerCommand::StartActor(actor_id) => self.start_actor(&actor_id),
83            ControllerCommand::StopActor(actor_id) => self.stop_actor(&actor_id),
84            ControllerCommand::RemoveActor(actor_id) => self.remove_actor(&actor_id),
85            ControllerCommand::StartStrategy(strategy_id) => self.start_strategy(&strategy_id),
86            ControllerCommand::StopStrategy(strategy_id) => self.stop_strategy(&strategy_id),
87            ControllerCommand::ExitMarket(strategy_id) => self.exit_market(&strategy_id),
88            ControllerCommand::RemoveStrategy(strategy_id) => self.remove_strategy(&strategy_id),
89        }
90    }
91
92    /// Creates a new actor and optionally starts it.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if actor registration or startup fails.
97    pub fn create_actor<T>(&self, actor: T, start: bool) -> anyhow::Result<ActorId>
98    where
99        T: DataActor + Component + Debug + 'static,
100    {
101        let actor_id = actor.actor_id();
102        self.trader.borrow_mut().add_actor(actor)?;
103
104        self.start_created_actor(&actor_id, start)?;
105
106        Ok(actor_id)
107    }
108
109    /// Creates a new actor from a factory and optionally starts it.
110    ///
111    /// # Errors
112    ///
113    /// Returns an error if the factory, actor registration, or startup fails.
114    pub fn create_actor_from_factory<F, T>(
115        &self,
116        factory: F,
117        start: bool,
118    ) -> anyhow::Result<ActorId>
119    where
120        F: FnOnce() -> anyhow::Result<T>,
121        T: DataActor + Component + Debug + 'static,
122    {
123        let actor = factory()?;
124        self.create_actor(actor, start)
125    }
126
127    /// Creates a new strategy and optionally starts it.
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if strategy registration or startup fails.
132    pub fn create_strategy<T>(&self, strategy: T, start: bool) -> anyhow::Result<StrategyId>
133    where
134        T: Strategy + Component + Debug + 'static,
135    {
136        let strategy_id = StrategyId::from(strategy.component_id().inner().as_str());
137        self.trader.borrow_mut().add_strategy(strategy)?;
138
139        self.start_created_strategy(&strategy_id, start)?;
140
141        Ok(strategy_id)
142    }
143
144    /// Creates a new strategy from a factory and optionally starts it.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the factory, strategy registration, or startup fails.
149    pub fn create_strategy_from_factory<F, T>(
150        &self,
151        factory: F,
152        start: bool,
153    ) -> anyhow::Result<StrategyId>
154    where
155        F: FnOnce() -> anyhow::Result<T>,
156        T: Strategy + Component + Debug + 'static,
157    {
158        let strategy = factory()?;
159        self.create_strategy(strategy, start)
160    }
161
162    /// Starts the registered actor with the given identifier.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the actor is not registered or cannot be started.
167    pub fn start_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
168        self.trader.borrow().start_actor(actor_id)
169    }
170
171    /// Stops the registered actor with the given identifier.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if the actor is not registered or cannot be stopped.
176    pub fn stop_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
177        self.trader.borrow().stop_actor(actor_id)
178    }
179
180    /// Removes the registered actor with the given identifier.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the actor cannot be removed.
185    pub fn remove_actor(&self, actor_id: &ActorId) -> anyhow::Result<()> {
186        if actor_id.inner() == self.actor_id().inner() {
187            return Ok(());
188        }
189
190        self.trader.borrow_mut().remove_actor(actor_id)
191    }
192
193    /// Starts the registered strategy with the given identifier.
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the strategy is not registered or cannot be started.
198    pub fn start_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
199        self.trader.borrow().start_strategy(strategy_id)
200    }
201
202    /// Stops the registered strategy with the given identifier.
203    ///
204    /// # Errors
205    ///
206    /// Returns an error if the strategy is not registered or cannot be stopped.
207    pub fn stop_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
208        self.trader.borrow_mut().stop_strategy(strategy_id)
209    }
210
211    /// Sends an exit-market command to the registered strategy.
212    ///
213    /// # Errors
214    ///
215    /// Returns an error if the strategy is not registered or its control endpoint is missing.
216    pub fn exit_market(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
217        Trader::market_exit_strategy(&self.trader, strategy_id)
218    }
219
220    /// Removes the registered strategy with the given identifier.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if the strategy cannot be removed.
225    pub fn remove_strategy(&self, strategy_id: &StrategyId) -> anyhow::Result<()> {
226        self.trader.borrow_mut().remove_strategy(strategy_id)
227    }
228
229    fn start_created_actor(&self, actor_id: &ActorId, start: bool) -> anyhow::Result<()> {
230        if !start {
231            return Ok(());
232        }
233
234        if let Err(start_err) = self.start_actor(actor_id) {
235            return Err(self.rollback_actor_start_failure(actor_id, start_err));
236        }
237
238        Ok(())
239    }
240
241    fn start_created_strategy(&self, strategy_id: &StrategyId, start: bool) -> anyhow::Result<()> {
242        if !start {
243            return Ok(());
244        }
245
246        if let Err(start_err) = self.start_strategy(strategy_id) {
247            return Err(self.rollback_strategy_start_failure(strategy_id, start_err));
248        }
249
250        Ok(())
251    }
252
253    fn rollback_actor_start_failure(
254        &self,
255        actor_id: &ActorId,
256        start_err: anyhow::Error,
257    ) -> anyhow::Error {
258        match self.remove_actor(actor_id) {
259            Ok(()) => start_err,
260            Err(rollback_err) => anyhow::anyhow!(
261                "Failed to start actor {actor_id}: {start_err}; rollback failed: {rollback_err}"
262            ),
263        }
264    }
265
266    fn rollback_strategy_start_failure(
267        &self,
268        strategy_id: &StrategyId,
269        start_err: anyhow::Error,
270    ) -> anyhow::Error {
271        match self.remove_strategy(strategy_id) {
272            Ok(()) => start_err,
273            Err(rollback_err) => anyhow::anyhow!(
274                "Failed to start strategy {strategy_id}: {start_err}; rollback failed: {rollback_err}"
275            ),
276        }
277    }
278
279    fn register_execute_endpoint(&self) {
280        let controller_id = self.actor_id().inner();
281        let handler = TypedHandler::from(move |command: &ControllerCommand| {
282            if let Some(mut controller) = try_get_actor_unchecked::<Self>(&controller_id) {
283                if let Err(e) = controller.execute(*command) {
284                    log::error!("Controller command failed for {controller_id}: {e}");
285                }
286            } else {
287                log::error!("Controller {controller_id} not found for command handling");
288            }
289        });
290
291        get_message_bus()
292            .borrow_mut()
293            .endpoint_map::<ControllerCommand>()
294            .register(Self::execute_endpoint(), handler);
295    }
296
297    fn deregister_execute_endpoint(&self) {
298        get_message_bus()
299            .borrow_mut()
300            .endpoint_map::<ControllerCommand>()
301            .deregister(Self::execute_endpoint());
302    }
303
304    fn execute_endpoint() -> MStr<Endpoint> {
305        Self::EXECUTE_ENDPOINT.into()
306    }
307}
308
309impl DataActor for Controller {
310    fn on_start(&mut self) -> anyhow::Result<()> {
311        self.register_execute_endpoint();
312        Ok(())
313    }
314
315    fn on_stop(&mut self) -> anyhow::Result<()> {
316        self.deregister_execute_endpoint();
317        Ok(())
318    }
319
320    fn on_resume(&mut self) -> anyhow::Result<()> {
321        self.register_execute_endpoint();
322        Ok(())
323    }
324
325    fn on_dispose(&mut self) -> anyhow::Result<()> {
326        self.deregister_execute_endpoint();
327        Ok(())
328    }
329}
330
331nautilus_actor!(Controller);
332
333#[cfg(test)]
334mod tests {
335    use nautilus_common::{
336        cache::Cache,
337        clock::{Clock, TestClock},
338        enums::{ComponentState, Environment},
339        msgbus::{MessageBus, set_message_bus},
340    };
341    use nautilus_core::UUID4;
342    use nautilus_model::{identifiers::TraderId, stubs::TestDefault};
343    use nautilus_portfolio::portfolio::Portfolio;
344    use nautilus_trading::{
345        nautilus_strategy,
346        strategy::{StrategyConfig, StrategyCore},
347    };
348    use rstest::rstest;
349
350    use super::*;
351
352    #[derive(Debug)]
353    struct TestDataActor {
354        core: DataActorCore,
355    }
356
357    impl TestDataActor {
358        fn new(config: DataActorConfig) -> Self {
359            Self {
360                core: DataActorCore::new(config),
361            }
362        }
363    }
364
365    impl DataActor for TestDataActor {}
366
367    nautilus_actor!(TestDataActor);
368
369    #[derive(Debug)]
370    struct TestStrategy {
371        core: StrategyCore,
372    }
373
374    impl TestStrategy {
375        fn new(config: StrategyConfig) -> Self {
376            Self {
377                core: StrategyCore::new(config),
378            }
379        }
380    }
381
382    impl DataActor for TestStrategy {}
383
384    nautilus_strategy!(TestStrategy);
385
386    #[derive(Debug)]
387    struct FailingStartActor {
388        core: DataActorCore,
389    }
390
391    impl FailingStartActor {
392        fn new(config: DataActorConfig) -> Self {
393            Self {
394                core: DataActorCore::new(config),
395            }
396        }
397    }
398
399    impl DataActor for FailingStartActor {
400        fn on_start(&mut self) -> anyhow::Result<()> {
401            anyhow::bail!("Simulated actor start failure")
402        }
403    }
404
405    nautilus_actor!(FailingStartActor);
406
407    #[derive(Debug)]
408    struct FailingStartStrategy {
409        core: StrategyCore,
410    }
411
412    impl FailingStartStrategy {
413        fn new(config: StrategyConfig) -> Self {
414            Self {
415                core: StrategyCore::new(config),
416            }
417        }
418    }
419
420    impl DataActor for FailingStartStrategy {
421        fn on_start(&mut self) -> anyhow::Result<()> {
422            anyhow::bail!("Simulated strategy start failure")
423        }
424    }
425
426    nautilus_strategy!(FailingStartStrategy);
427
428    #[derive(Debug)]
429    struct ReentrantExitStrategy {
430        core: StrategyCore,
431        actor_to_stop: ActorId,
432    }
433
434    impl ReentrantExitStrategy {
435        fn new(config: StrategyConfig, actor_to_stop: ActorId) -> Self {
436            Self {
437                core: StrategyCore::new(config),
438                actor_to_stop,
439            }
440        }
441    }
442
443    impl DataActor for ReentrantExitStrategy {}
444
445    nautilus_strategy!(ReentrantExitStrategy, {
446        fn on_market_exit(&mut self) {
447            Controller::send(ControllerCommand::StopActor(self.actor_to_stop)).unwrap();
448        }
449    });
450
451    fn create_running_controller() -> (Rc<RefCell<Trader>>, ActorId) {
452        let trader_id = TraderId::test_default();
453        let instance_id = UUID4::new();
454        let clock = Rc::new(RefCell::new(TestClock::new()));
455        clock.borrow_mut().set_time(1_000_000_000u64.into());
456
457        let msgbus = Rc::new(RefCell::new(MessageBus::new(
458            trader_id,
459            instance_id,
460            Some("test".to_string()),
461            None,
462        )));
463        set_message_bus(msgbus);
464
465        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
466        let portfolio = Rc::new(RefCell::new(Portfolio::new(
467            cache.clone(),
468            clock.clone() as Rc<RefCell<dyn Clock>>,
469            None,
470        )));
471
472        let trader = Rc::new(RefCell::new(Trader::new(
473            trader_id,
474            instance_id,
475            Environment::Backtest,
476            clock as Rc<RefCell<dyn Clock>>,
477            cache,
478            portfolio,
479        )));
480        trader.borrow_mut().initialize().unwrap();
481
482        let controller = Controller::new(
483            trader.clone(),
484            Some(DataActorConfig {
485                actor_id: Some(ActorId::from("Controller-001")),
486                ..Default::default()
487            }),
488        );
489        let controller_id = controller.actor_id();
490
491        trader.borrow_mut().add_actor(controller).unwrap();
492        trader.borrow_mut().start().unwrap();
493
494        (trader, controller_id)
495    }
496
497    #[rstest]
498    fn test_controller_manages_actor_lifecycle_by_message() {
499        let (trader, controller_id) = create_running_controller();
500        let controller_actor_id = controller_id.inner();
501
502        let actor_id = {
503            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
504            controller
505                .create_actor(
506                    TestDataActor::new(DataActorConfig {
507                        actor_id: Some(ActorId::from("TestActor-001")),
508                        ..Default::default()
509                    }),
510                    false,
511                )
512                .unwrap()
513        };
514
515        assert!(trader.borrow().actor_ids().contains(&actor_id));
516
517        Controller::send(ControllerCommand::StartActor(actor_id)).unwrap();
518        let actor_registry_id = actor_id.inner();
519        assert_eq!(
520            try_get_actor_unchecked::<TestDataActor>(&actor_registry_id)
521                .unwrap()
522                .state(),
523            ComponentState::Running
524        );
525
526        Controller::send(ControllerCommand::StopActor(actor_id)).unwrap();
527        assert_eq!(
528            try_get_actor_unchecked::<TestDataActor>(&actor_registry_id)
529                .unwrap()
530                .state(),
531            ComponentState::Stopped
532        );
533
534        Controller::send(ControllerCommand::RemoveActor(actor_id)).unwrap();
535        assert!(!trader.borrow().actor_ids().contains(&actor_id));
536
537        trader.borrow_mut().stop().unwrap();
538        trader.borrow_mut().dispose_components().unwrap();
539    }
540
541    #[rstest]
542    fn test_controller_manages_strategy_lifecycle_and_exit_market() {
543        let (trader, controller_id) = create_running_controller();
544        let controller_actor_id = controller_id.inner();
545
546        let strategy_id = {
547            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
548            controller
549                .create_strategy(
550                    TestStrategy::new(StrategyConfig {
551                        strategy_id: Some(StrategyId::from("TestStrategy-001")),
552                        order_id_tag: Some("001".to_string()),
553                        ..Default::default()
554                    }),
555                    false,
556                )
557                .unwrap()
558        };
559
560        assert!(trader.borrow().strategy_ids().contains(&strategy_id));
561
562        Controller::send(ControllerCommand::StartStrategy(strategy_id)).unwrap();
563        let strategy_registry_id = strategy_id.inner();
564        assert_eq!(
565            try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id)
566                .unwrap()
567                .state(),
568            ComponentState::Running
569        );
570
571        Controller::send(ControllerCommand::ExitMarket(strategy_id)).unwrap();
572        assert!(
573            try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id)
574                .unwrap()
575                .is_exiting()
576        );
577
578        Controller::send(ControllerCommand::StopStrategy(strategy_id)).unwrap();
579        let strategy = try_get_actor_unchecked::<TestStrategy>(&strategy_registry_id).unwrap();
580        assert_eq!(strategy.state(), ComponentState::Stopped);
581        assert!(!strategy.is_exiting());
582        drop(strategy);
583
584        Controller::send(ControllerCommand::RemoveStrategy(strategy_id)).unwrap();
585        assert!(!trader.borrow().strategy_ids().contains(&strategy_id));
586
587        trader.borrow_mut().stop().unwrap();
588        trader.borrow_mut().dispose_components().unwrap();
589    }
590
591    #[rstest]
592    fn test_controller_create_actor_rolls_back_on_start_failure() {
593        let (trader, controller_id) = create_running_controller();
594        let controller_actor_id = controller_id.inner();
595        let actor_id = ActorId::from("FailingActor-001");
596
597        let result = {
598            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
599            controller.create_actor(
600                FailingStartActor::new(DataActorConfig {
601                    actor_id: Some(actor_id),
602                    ..Default::default()
603                }),
604                true,
605            )
606        };
607
608        assert!(result.is_err());
609        assert!(
610            result
611                .unwrap_err()
612                .to_string()
613                .contains("Simulated actor start failure")
614        );
615        assert!(!trader.borrow().actor_ids().contains(&actor_id));
616        if let Some(actor) = try_get_actor_unchecked::<FailingStartActor>(&actor_id.inner()) {
617            assert_eq!(actor.state(), ComponentState::Disposed);
618        }
619
620        trader.borrow_mut().stop().unwrap();
621        trader.borrow_mut().dispose_components().unwrap();
622    }
623
624    #[rstest]
625    fn test_controller_create_strategy_rolls_back_on_start_failure() {
626        let (trader, controller_id) = create_running_controller();
627        let controller_actor_id = controller_id.inner();
628        let strategy_id = StrategyId::from("FailingStrategy-001");
629
630        let result = {
631            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
632            controller.create_strategy(
633                FailingStartStrategy::new(StrategyConfig {
634                    strategy_id: Some(strategy_id),
635                    order_id_tag: Some("001".to_string()),
636                    ..Default::default()
637                }),
638                true,
639            )
640        };
641
642        assert!(result.is_err());
643        assert!(
644            result
645                .unwrap_err()
646                .to_string()
647                .contains("Simulated strategy start failure")
648        );
649        assert!(!trader.borrow().strategy_ids().contains(&strategy_id));
650
651        if let Some(strategy) =
652            try_get_actor_unchecked::<FailingStartStrategy>(&strategy_id.inner())
653        {
654            assert_eq!(strategy.state(), ComponentState::Disposed);
655        }
656
657        trader.borrow_mut().stop().unwrap();
658        trader.borrow_mut().dispose_components().unwrap();
659    }
660
661    #[rstest]
662    fn test_controller_exit_market_allows_reentrant_controller_commands() {
663        let (trader, controller_id) = create_running_controller();
664        let controller_actor_id = controller_id.inner();
665
666        let helper_actor_id = {
667            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
668            controller
669                .create_actor(
670                    TestDataActor::new(DataActorConfig {
671                        actor_id: Some(ActorId::from("HelperActor-001")),
672                        ..Default::default()
673                    }),
674                    true,
675                )
676                .unwrap()
677        };
678
679        let strategy_id = {
680            let controller = try_get_actor_unchecked::<Controller>(&controller_actor_id).unwrap();
681            controller
682                .create_strategy(
683                    ReentrantExitStrategy::new(
684                        StrategyConfig {
685                            strategy_id: Some(StrategyId::from("ReentrantStrategy-001")),
686                            order_id_tag: Some("001".to_string()),
687                            ..Default::default()
688                        },
689                        helper_actor_id,
690                    ),
691                    false,
692                )
693                .unwrap()
694        };
695
696        Controller::send(ControllerCommand::StartStrategy(strategy_id)).unwrap();
697        Controller::send(ControllerCommand::ExitMarket(strategy_id)).unwrap();
698
699        let helper_actor =
700            try_get_actor_unchecked::<TestDataActor>(&helper_actor_id.inner()).unwrap();
701        assert_eq!(helper_actor.state(), ComponentState::Stopped);
702        drop(helper_actor);
703        assert!(
704            try_get_actor_unchecked::<ReentrantExitStrategy>(&strategy_id.inner())
705                .unwrap()
706                .is_exiting()
707        );
708
709        Controller::send(ControllerCommand::StopStrategy(strategy_id)).unwrap();
710        Controller::send(ControllerCommand::RemoveStrategy(strategy_id)).unwrap();
711        Controller::send(ControllerCommand::RemoveActor(helper_actor_id)).unwrap();
712        trader.borrow_mut().stop().unwrap();
713        trader.borrow_mut().dispose_components().unwrap();
714    }
715
716    #[rstest]
717    fn test_controller_send_fails_after_controller_stop() {
718        let (trader, _) = create_running_controller();
719
720        trader.borrow_mut().stop().unwrap();
721
722        let result = Controller::send(ControllerCommand::StopActor(ActorId::from("AnyActor-001")));
723        assert!(result.is_err());
724        assert_eq!(
725            result.unwrap_err().to_string(),
726            "Controller execute endpoint 'Controller.execute' not registered"
727        );
728
729        trader.borrow_mut().dispose_components().unwrap();
730    }
731}