Skip to main content

nautilus_trading/algorithm/
core.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Core component for execution algorithms.
17
18use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use indexmap::IndexMap;
22use nautilus_common::{
23    actor::{DataActorConfig, DataActorCore, DataActorNative},
24    cache::Cache,
25    clock::Clock,
26    msgbus::TypedHandler,
27};
28use nautilus_model::{
29    events::{OrderEventAny, PositionEvent},
30    identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
31    orders::{OrderAny, OrderList},
32    types::Quantity,
33};
34
35use super::config::ExecutionAlgorithmConfig;
36
/// Holds event handlers for strategy event subscriptions.
///
/// Groups, for a single strategy, the topic string and the typed handler used
/// for order events together with the topic string and typed handler used for
/// position events, so both can be looked up again later (for example during
/// cleanup on reset).
#[derive(Clone, Debug)]
pub struct StrategyEventHandlers {
    /// The topic string for order events.
    pub order_topic: String,
    /// The handler for order events ([`OrderEventAny`]).
    pub order_handler: TypedHandler<OrderEventAny>,
    /// The topic string for position events.
    pub position_topic: String,
    /// The handler for position events ([`PositionEvent`]).
    pub position_handler: TypedHandler<PositionEvent>,
}
49
/// The core component of an [`ExecutionAlgorithm`](super::ExecutionAlgorithm).
///
/// This struct manages the internal state for execution algorithms including
/// spawn ID tracking and strategy subscriptions. It wraps a [`DataActorCore`]
/// to provide data actor capabilities.
///
/// User algorithms should hold this as a member and use the
/// `nautilus_execution_algorithm!` macro to provide native runtime wiring.
/// Direct access to this core is native runtime wiring and belongs behind
/// [`ExecutionAlgorithmNative`].
///
/// The tracking collections are private; they are read and mutated only
/// through the methods on this type.
pub struct ExecutionAlgorithmCore {
    /// The underlying data actor core.
    pub actor: DataActorCore,
    /// The execution algorithm configuration.
    pub config: ExecutionAlgorithmConfig,
    /// The execution algorithm ID (taken from `config.exec_algorithm_id` at construction).
    pub exec_algorithm_id: ExecAlgorithmId,
    /// Maps primary order client IDs to their spawn sequence counter.
    ///
    /// The stored value is the sequence number of the most recently generated
    /// spawn ID for that primary order; the first spawn uses `1`.
    exec_spawn_ids: AHashMap<ClientOrderId, u32>,
    /// Tracks strategies that have been subscribed to for events.
    subscribed_strategies: AHashSet<StrategyId>,
    /// Tracks pending spawn reductions for quantity restoration on denial/rejection.
    ///
    /// Keyed by the spawn order's client order ID.
    pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
    /// Maps strategies to their event handlers for cleanup on reset.
    strategy_event_handlers: IndexMap<StrategyId, StrategyEventHandlers>,
}
76
/// Native-only access to internal execution algorithm runtime state.
///
/// Use this trait from engine, runtime, testkit, or opt-in native algorithm
/// code when direct access to host runtime objects matters for an explicit
/// latency-sensitive path, or when host integration code needs access below
/// the facade API.
///
/// Do not import this trait in code intended to run through Python or the
/// plug-in authoring surface. Native borrows, `Rc<RefCell<_>>`, and core
/// references do not cross those boundaries.
///
/// Implementors must also implement [`DataActorNative`], which is a supertrait.
pub trait ExecutionAlgorithmNative: DataActorNative {
    /// Returns a shared reference to the execution algorithm core.
    fn exec_algorithm_core(&self) -> &ExecutionAlgorithmCore;

    /// Returns a mutable reference to the execution algorithm core.
    fn exec_algorithm_core_mut(&mut self) -> &mut ExecutionAlgorithmCore;
}
94
95impl Debug for ExecutionAlgorithmCore {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct(stringify!(ExecutionAlgorithmCore))
98            .field("actor", &self.actor)
99            .field("config", &self.config)
100            .field("exec_algorithm_id", &self.exec_algorithm_id)
101            .field("exec_spawn_ids", &self.exec_spawn_ids.len())
102            .field("subscribed_strategies", &self.subscribed_strategies.len())
103            .field(
104                "pending_spawn_reductions",
105                &self.pending_spawn_reductions.len(),
106            )
107            .field(
108                "strategy_event_handlers",
109                &self.strategy_event_handlers.len(),
110            )
111            .finish()
112    }
113}
114
115impl ExecutionAlgorithmCore {
116    /// Creates a new [`ExecutionAlgorithmCore`] instance.
117    ///
118    /// # Panics
119    ///
120    /// Panics if `config.exec_algorithm_id` is `None`.
121    #[must_use]
122    pub fn new(config: ExecutionAlgorithmConfig) -> Self {
123        let exec_algorithm_id = config
124            .exec_algorithm_id
125            .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
126
127        let actor_config = DataActorConfig {
128            actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
129            log_events: config.log_events,
130            log_commands: config.log_commands,
131        };
132
133        Self {
134            actor: DataActorCore::new(actor_config),
135            config,
136            exec_algorithm_id,
137            exec_spawn_ids: AHashMap::new(),
138            subscribed_strategies: AHashSet::new(),
139            pending_spawn_reductions: AHashMap::new(),
140            strategy_event_handlers: IndexMap::new(),
141        }
142    }
143
144    /// Registers the execution algorithm with the trading engine components.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if registration with the actor core fails.
149    pub fn register(
150        &mut self,
151        trader_id: TraderId,
152        clock: Rc<RefCell<dyn Clock>>,
153        cache: Rc<RefCell<Cache>>,
154    ) -> anyhow::Result<()> {
155        self.actor.register(trader_id, clock, cache)
156    }
157
158    /// Returns the execution algorithm ID.
159    #[must_use]
160    pub fn id(&self) -> ExecAlgorithmId {
161        self.exec_algorithm_id
162    }
163
164    /// Generates the next spawn client order ID for a primary order.
165    ///
166    /// The generated ID follows the pattern: `{primary_id}-E{sequence}`.
167    #[must_use]
168    pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
169        let sequence = self
170            .exec_spawn_ids
171            .entry(*primary_id)
172            .and_modify(|s| *s += 1)
173            .or_insert(1);
174
175        ClientOrderId::new(format!("{primary_id}-E{sequence}"))
176    }
177
178    /// Returns the current spawn sequence for a primary order, if any.
179    #[must_use]
180    pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
181        self.exec_spawn_ids.get(primary_id).copied()
182    }
183
184    /// Checks if a strategy has been subscribed to for events.
185    #[must_use]
186    pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
187        self.subscribed_strategies.contains(strategy_id)
188    }
189
190    /// Marks a strategy as subscribed for events.
191    pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
192        self.subscribed_strategies.insert(strategy_id);
193    }
194
195    /// Stores the event handlers for a strategy subscription.
196    pub fn store_strategy_event_handlers(
197        &mut self,
198        strategy_id: StrategyId,
199        handlers: StrategyEventHandlers,
200    ) {
201        self.strategy_event_handlers.insert(strategy_id, handlers);
202    }
203
204    /// Takes and returns all stored strategy event handlers, clearing the internal map.
205    pub fn take_strategy_event_handlers(&mut self) -> IndexMap<StrategyId, StrategyEventHandlers> {
206        std::mem::take(&mut self.strategy_event_handlers)
207    }
208
209    /// Clears all spawn tracking state.
210    pub fn clear_spawn_ids(&mut self) {
211        self.exec_spawn_ids.clear();
212    }
213
214    /// Clears all strategy subscriptions.
215    pub fn clear_subscribed_strategies(&mut self) {
216        self.subscribed_strategies.clear();
217    }
218
219    /// Tracks a pending spawn reduction for potential restoration.
220    pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
221        self.pending_spawn_reductions.insert(spawn_id, quantity);
222    }
223
224    /// Removes and returns the pending spawn reduction for an order, if any.
225    pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
226        self.pending_spawn_reductions.remove(spawn_id)
227    }
228
229    /// Clears all pending spawn reductions.
230    pub fn clear_pending_spawn_reductions(&mut self) {
231        self.pending_spawn_reductions.clear();
232    }
233
234    /// Resets the core to its initial state.
235    ///
236    /// Note: This clears handler storage but does NOT unsubscribe from msgbus.
237    /// Call `unsubscribe_all_strategy_events` first to properly unsubscribe.
238    pub fn reset(&mut self) {
239        self.exec_spawn_ids.clear();
240        self.subscribed_strategies.clear();
241        self.pending_spawn_reductions.clear();
242        self.strategy_event_handlers.clear();
243    }
244
245    /// Returns the order for the given client order ID from the cache.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if the order is not found in the cache.
250    pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
251        Ok(self.cache_ref().try_order_owned(client_order_id)?)
252    }
253
254    /// Returns all orders for the given order list from the cache.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if any order is not found in the cache.
259    pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
260        order_list
261            .client_order_ids
262            .iter()
263            .map(|id| self.get_order(id))
264            .collect()
265    }
266}
267
// Exposes the wrapped `DataActorCore` through the `DataActorNative` accessors.
impl DataActorNative for ExecutionAlgorithmCore {
    /// Returns a shared reference to the wrapped data actor core.
    fn core(&self) -> &DataActorCore {
        &self.actor
    }

    /// Returns a mutable reference to the wrapped data actor core.
    fn core_mut(&mut self) -> &mut DataActorCore {
        &mut self.actor
    }
}
277
278impl ExecutionAlgorithmNative for ExecutionAlgorithmCore {
279    fn exec_algorithm_core(&self) -> &ExecutionAlgorithmCore {
280        self
281    }
282
283    fn exec_algorithm_core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
284        self
285    }
286}
287
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Builds a default configuration whose algorithm ID is `TWAP`.
    fn create_test_config() -> ExecutionAlgorithmConfig {
        ExecutionAlgorithmConfig {
            exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
            ..Default::default()
        }
    }

    /// Builds a fresh core from the shared test configuration.
    fn create_test_core() -> ExecutionAlgorithmCore {
        ExecutionAlgorithmCore::new(create_test_config())
    }

    #[rstest]
    fn test_core_new() {
        let config = create_test_config();
        let expected_log_events = config.log_events;
        let core = ExecutionAlgorithmCore::new(config);

        assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
        assert_eq!(core.config.log_events, expected_log_events);
        assert!(core.exec_spawn_ids.is_empty());
        assert!(core.subscribed_strategies.is_empty());
    }

    #[rstest]
    fn test_spawn_client_order_id_sequence() {
        let mut core = create_test_core();
        let primary = ClientOrderId::new("O-001");

        // Consecutive spawns for one primary order count up from 1.
        for expected in ["O-001-E1", "O-001-E2", "O-001-E3"] {
            let spawned = core.spawn_client_order_id(&primary);
            assert_eq!(spawned.as_str(), expected);
        }
    }

    #[rstest]
    fn test_spawn_client_order_id_different_primaries() {
        let mut core = create_test_core();
        let first_primary = ClientOrderId::new("O-001");
        let second_primary = ClientOrderId::new("O-002");

        // Interleave the two primaries to show their counters are independent.
        let first_spawn_a = core.spawn_client_order_id(&first_primary);
        let first_spawn_b = core.spawn_client_order_id(&second_primary);
        let second_spawn_a = core.spawn_client_order_id(&first_primary);

        assert_eq!(first_spawn_a.as_str(), "O-001-E1");
        assert_eq!(first_spawn_b.as_str(), "O-002-E1");
        assert_eq!(second_spawn_a.as_str(), "O-001-E2");
    }

    #[rstest]
    fn test_spawn_sequence() {
        let mut core = create_test_core();
        let primary = ClientOrderId::new("O-001");

        assert_eq!(core.spawn_sequence(&primary), None);

        let _ = core.spawn_client_order_id(&primary);
        assert_eq!(core.spawn_sequence(&primary), Some(1));

        let _ = core.spawn_client_order_id(&primary);
        assert_eq!(core.spawn_sequence(&primary), Some(2));
    }

    #[rstest]
    fn test_strategy_subscription_tracking() {
        let mut core = create_test_core();
        let strategy = StrategyId::new("TEST-001");

        assert!(!core.is_strategy_subscribed(&strategy));

        core.add_subscribed_strategy(strategy);
        assert!(core.is_strategy_subscribed(&strategy));
    }

    #[rstest]
    fn test_clear_spawn_ids() {
        let mut core = create_test_core();
        let primary = ClientOrderId::new("O-001");

        let _ = core.spawn_client_order_id(&primary);
        assert!(core.spawn_sequence(&primary).is_some());

        core.clear_spawn_ids();
        assert!(core.spawn_sequence(&primary).is_none());
    }

    #[rstest]
    fn test_reset() {
        let mut core = create_test_core();
        let primary = ClientOrderId::new("O-001");
        let strategy = StrategyId::new("TEST-001");

        // Populate both kinds of state that the assertions below inspect.
        let _ = core.spawn_client_order_id(&primary);
        core.add_subscribed_strategy(strategy);

        core.reset();

        assert!(core.spawn_sequence(&primary).is_none());
        assert!(!core.is_strategy_subscribed(&strategy));
    }

    #[rstest]
    fn test_data_actor_core_available_through_native_trait() {
        let core = create_test_core();

        // An unregistered core has no trader ID yet.
        let actor_core = DataActorNative::core(&core);
        assert!(actor_core.trader_id().is_none());
    }
}