nautilus_trading/algorithm/
core.rs1use std::{cell::RefCell, fmt::Debug, rc::Rc};
19
20use ahash::{AHashMap, AHashSet};
21use indexmap::IndexMap;
22use nautilus_common::{
23 actor::{DataActorConfig, DataActorCore, DataActorNative},
24 cache::Cache,
25 clock::Clock,
26 msgbus::TypedHandler,
27};
28use nautilus_model::{
29 events::{OrderEventAny, PositionEvent},
30 identifiers::{ActorId, ClientOrderId, ExecAlgorithmId, StrategyId, TraderId},
31 orders::{OrderAny, OrderList},
32 types::Quantity,
33};
34
35use super::config::ExecutionAlgorithmConfig;
36
37#[derive(Clone, Debug)]
39pub struct StrategyEventHandlers {
40 pub order_topic: String,
42 pub order_handler: TypedHandler<OrderEventAny>,
44 pub position_topic: String,
46 pub position_handler: TypedHandler<PositionEvent>,
48}
49
50pub struct ExecutionAlgorithmCore {
61 pub actor: DataActorCore,
63 pub config: ExecutionAlgorithmConfig,
65 pub exec_algorithm_id: ExecAlgorithmId,
67 exec_spawn_ids: AHashMap<ClientOrderId, u32>,
69 subscribed_strategies: AHashSet<StrategyId>,
71 pending_spawn_reductions: AHashMap<ClientOrderId, Quantity>,
73 strategy_event_handlers: IndexMap<StrategyId, StrategyEventHandlers>,
75}
76
77pub trait ExecutionAlgorithmNative: DataActorNative {
88 fn exec_algorithm_core(&self) -> &ExecutionAlgorithmCore;
90
91 fn exec_algorithm_core_mut(&mut self) -> &mut ExecutionAlgorithmCore;
93}
94
95impl Debug for ExecutionAlgorithmCore {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct(stringify!(ExecutionAlgorithmCore))
98 .field("actor", &self.actor)
99 .field("config", &self.config)
100 .field("exec_algorithm_id", &self.exec_algorithm_id)
101 .field("exec_spawn_ids", &self.exec_spawn_ids.len())
102 .field("subscribed_strategies", &self.subscribed_strategies.len())
103 .field(
104 "pending_spawn_reductions",
105 &self.pending_spawn_reductions.len(),
106 )
107 .field(
108 "strategy_event_handlers",
109 &self.strategy_event_handlers.len(),
110 )
111 .finish()
112 }
113}
114
115impl ExecutionAlgorithmCore {
116 #[must_use]
122 pub fn new(config: ExecutionAlgorithmConfig) -> Self {
123 let exec_algorithm_id = config
124 .exec_algorithm_id
125 .expect("ExecutionAlgorithmConfig must have exec_algorithm_id set");
126
127 let actor_config = DataActorConfig {
128 actor_id: Some(ActorId::from(exec_algorithm_id.inner().as_str())),
129 log_events: config.log_events,
130 log_commands: config.log_commands,
131 };
132
133 Self {
134 actor: DataActorCore::new(actor_config),
135 config,
136 exec_algorithm_id,
137 exec_spawn_ids: AHashMap::new(),
138 subscribed_strategies: AHashSet::new(),
139 pending_spawn_reductions: AHashMap::new(),
140 strategy_event_handlers: IndexMap::new(),
141 }
142 }
143
144 pub fn register(
150 &mut self,
151 trader_id: TraderId,
152 clock: Rc<RefCell<dyn Clock>>,
153 cache: Rc<RefCell<Cache>>,
154 ) -> anyhow::Result<()> {
155 self.actor.register(trader_id, clock, cache)
156 }
157
158 #[must_use]
160 pub fn id(&self) -> ExecAlgorithmId {
161 self.exec_algorithm_id
162 }
163
164 #[must_use]
168 pub fn spawn_client_order_id(&mut self, primary_id: &ClientOrderId) -> ClientOrderId {
169 let sequence = self
170 .exec_spawn_ids
171 .entry(*primary_id)
172 .and_modify(|s| *s += 1)
173 .or_insert(1);
174
175 ClientOrderId::new(format!("{primary_id}-E{sequence}"))
176 }
177
178 #[must_use]
180 pub fn spawn_sequence(&self, primary_id: &ClientOrderId) -> Option<u32> {
181 self.exec_spawn_ids.get(primary_id).copied()
182 }
183
184 #[must_use]
186 pub fn is_strategy_subscribed(&self, strategy_id: &StrategyId) -> bool {
187 self.subscribed_strategies.contains(strategy_id)
188 }
189
190 pub fn add_subscribed_strategy(&mut self, strategy_id: StrategyId) {
192 self.subscribed_strategies.insert(strategy_id);
193 }
194
195 pub fn store_strategy_event_handlers(
197 &mut self,
198 strategy_id: StrategyId,
199 handlers: StrategyEventHandlers,
200 ) {
201 self.strategy_event_handlers.insert(strategy_id, handlers);
202 }
203
204 pub fn take_strategy_event_handlers(&mut self) -> IndexMap<StrategyId, StrategyEventHandlers> {
206 std::mem::take(&mut self.strategy_event_handlers)
207 }
208
209 pub fn clear_spawn_ids(&mut self) {
211 self.exec_spawn_ids.clear();
212 }
213
214 pub fn clear_subscribed_strategies(&mut self) {
216 self.subscribed_strategies.clear();
217 }
218
219 pub fn track_pending_spawn_reduction(&mut self, spawn_id: ClientOrderId, quantity: Quantity) {
221 self.pending_spawn_reductions.insert(spawn_id, quantity);
222 }
223
224 pub fn take_pending_spawn_reduction(&mut self, spawn_id: &ClientOrderId) -> Option<Quantity> {
226 self.pending_spawn_reductions.remove(spawn_id)
227 }
228
229 pub fn clear_pending_spawn_reductions(&mut self) {
231 self.pending_spawn_reductions.clear();
232 }
233
234 pub fn reset(&mut self) {
239 self.exec_spawn_ids.clear();
240 self.subscribed_strategies.clear();
241 self.pending_spawn_reductions.clear();
242 self.strategy_event_handlers.clear();
243 }
244
245 pub fn get_order(&self, client_order_id: &ClientOrderId) -> anyhow::Result<OrderAny> {
251 Ok(self.cache_ref().try_order_owned(client_order_id)?)
252 }
253
254 pub fn get_orders_for_list(&self, order_list: &OrderList) -> anyhow::Result<Vec<OrderAny>> {
260 order_list
261 .client_order_ids
262 .iter()
263 .map(|id| self.get_order(id))
264 .collect()
265 }
266}
267
268impl DataActorNative for ExecutionAlgorithmCore {
269 fn core(&self) -> &DataActorCore {
270 &self.actor
271 }
272
273 fn core_mut(&mut self) -> &mut DataActorCore {
274 &mut self.actor
275 }
276}
277
278impl ExecutionAlgorithmNative for ExecutionAlgorithmCore {
279 fn exec_algorithm_core(&self) -> &ExecutionAlgorithmCore {
280 self
281 }
282
283 fn exec_algorithm_core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
284 self
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use rstest::rstest;
291
292 use super::*;
293
294 fn create_test_config() -> ExecutionAlgorithmConfig {
295 ExecutionAlgorithmConfig {
296 exec_algorithm_id: Some(ExecAlgorithmId::new("TWAP")),
297 ..Default::default()
298 }
299 }
300
301 #[rstest]
302 fn test_core_new() {
303 let config = create_test_config();
304 let core = ExecutionAlgorithmCore::new(config.clone());
305
306 assert_eq!(core.exec_algorithm_id, ExecAlgorithmId::new("TWAP"));
307 assert_eq!(core.config.log_events, config.log_events);
308 assert!(core.exec_spawn_ids.is_empty());
309 assert!(core.subscribed_strategies.is_empty());
310 }
311
312 #[rstest]
313 fn test_spawn_client_order_id_sequence() {
314 let config = create_test_config();
315 let mut core = ExecutionAlgorithmCore::new(config);
316
317 let primary_id = ClientOrderId::new("O-001");
318
319 let spawn1 = core.spawn_client_order_id(&primary_id);
320 assert_eq!(spawn1.as_str(), "O-001-E1");
321
322 let spawn2 = core.spawn_client_order_id(&primary_id);
323 assert_eq!(spawn2.as_str(), "O-001-E2");
324
325 let spawn3 = core.spawn_client_order_id(&primary_id);
326 assert_eq!(spawn3.as_str(), "O-001-E3");
327 }
328
329 #[rstest]
330 fn test_spawn_client_order_id_different_primaries() {
331 let config = create_test_config();
332 let mut core = ExecutionAlgorithmCore::new(config);
333
334 let primary1 = ClientOrderId::new("O-001");
335 let primary2 = ClientOrderId::new("O-002");
336
337 let spawn1_1 = core.spawn_client_order_id(&primary1);
338 let spawn2_1 = core.spawn_client_order_id(&primary2);
339 let spawn1_2 = core.spawn_client_order_id(&primary1);
340
341 assert_eq!(spawn1_1.as_str(), "O-001-E1");
342 assert_eq!(spawn2_1.as_str(), "O-002-E1");
343 assert_eq!(spawn1_2.as_str(), "O-001-E2");
344 }
345
346 #[rstest]
347 fn test_spawn_sequence() {
348 let config = create_test_config();
349 let mut core = ExecutionAlgorithmCore::new(config);
350
351 let primary_id = ClientOrderId::new("O-001");
352
353 assert_eq!(core.spawn_sequence(&primary_id), None);
354
355 let _ = core.spawn_client_order_id(&primary_id);
356 assert_eq!(core.spawn_sequence(&primary_id), Some(1));
357
358 let _ = core.spawn_client_order_id(&primary_id);
359 assert_eq!(core.spawn_sequence(&primary_id), Some(2));
360 }
361
362 #[rstest]
363 fn test_strategy_subscription_tracking() {
364 let config = create_test_config();
365 let mut core = ExecutionAlgorithmCore::new(config);
366
367 let strategy_id = StrategyId::new("TEST-001");
368
369 assert!(!core.is_strategy_subscribed(&strategy_id));
370
371 core.add_subscribed_strategy(strategy_id);
372 assert!(core.is_strategy_subscribed(&strategy_id));
373 }
374
375 #[rstest]
376 fn test_clear_spawn_ids() {
377 let config = create_test_config();
378 let mut core = ExecutionAlgorithmCore::new(config);
379
380 let primary_id = ClientOrderId::new("O-001");
381 let _ = core.spawn_client_order_id(&primary_id);
382
383 assert!(core.spawn_sequence(&primary_id).is_some());
384
385 core.clear_spawn_ids();
386 assert!(core.spawn_sequence(&primary_id).is_none());
387 }
388
389 #[rstest]
390 fn test_reset() {
391 let config = create_test_config();
392 let mut core = ExecutionAlgorithmCore::new(config);
393
394 let primary_id = ClientOrderId::new("O-001");
395 let strategy_id = StrategyId::new("TEST-001");
396
397 let _ = core.spawn_client_order_id(&primary_id);
398 core.add_subscribed_strategy(strategy_id);
399
400 core.reset();
401
402 assert!(core.spawn_sequence(&primary_id).is_none());
403 assert!(!core.is_strategy_subscribed(&strategy_id));
404 }
405
406 #[rstest]
407 fn test_data_actor_core_available_through_native_trait() {
408 let config = create_test_config();
409 let core = ExecutionAlgorithmCore::new(config);
410
411 assert!(DataActorNative::core(&core).trader_id().is_none());
412 }
413}