1pub mod config;
17pub mod core;
18
19pub use core::StrategyCore;
20use std::panic::{AssertUnwindSafe, catch_unwind};
21
22use ahash::AHashSet;
23pub use config::{ImportableStrategyConfig, StrategyConfig};
24use nautilus_common::{
25 actor::DataActor,
26 component::Component,
27 enums::ComponentState,
28 logging::{EVT, RECV},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
31 SubmitOrder, SubmitOrderList, TradingCommand,
32 },
33 msgbus,
34 timer::TimeEvent,
35};
36use nautilus_core::{Params, UUID4};
37use nautilus_model::{
38 enums::{OrderSide, OrderStatus, PositionSide, TimeInForce, TriggerType},
39 events::{
40 OrderAccepted, OrderCancelRejected, OrderDenied, OrderEmulated, OrderEventAny,
41 OrderExpired, OrderInitialized, OrderModifyRejected, OrderPendingCancel,
42 OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted, OrderTriggered,
43 OrderUpdated, PositionChanged, PositionClosed, PositionEvent, PositionOpened,
44 },
45 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId},
46 orders::{
47 LIMIT_ORDER_TYPES, Order, OrderAny, OrderCore, OrderError, OrderList, STOP_ORDER_TYPES,
48 },
49 position::Position,
50 types::{Price, Quantity},
51};
52use ustr::Ustr;
53
54pub trait Strategy: DataActor {
86 fn core(&self) -> &StrategyCore;
90
91 fn core_mut(&mut self) -> &mut StrategyCore;
95
96 fn external_order_claims(&self) -> Option<Vec<InstrumentId>> {
101 None
102 }
103
104 fn submit_order(
110 &mut self,
111 order: OrderAny,
112 position_id: Option<PositionId>,
113 client_id: Option<ClientId>,
114 params: Option<Params>,
115 ) -> anyhow::Result<()> {
116 let core = self.core_mut();
117
118 let trader_id = core.trader_id().expect("Trader ID not set");
119 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
120 let ts_init = core.clock().timestamp_ns();
121
122 if order.status() != OrderStatus::Initialized {
123 anyhow::bail!(
124 "Order denied: invalid status for {}, expected INITIALIZED",
125 order.client_order_id()
126 );
127 }
128
129 let market_exit_tag = core.market_exit_tag;
130 let is_market_exit_order = order
131 .tags()
132 .is_some_and(|tags| tags.contains(&market_exit_tag));
133 let should_deny_for_market_exit =
134 core.is_exiting && !order.is_reduce_only() && !is_market_exit_order;
135
136 if should_deny_for_market_exit {
137 self.deny_order(&order, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
138 return Ok(());
139 }
140
141 let core = self.core_mut();
142 let params = params.filter(|params| !params.is_empty());
143
144 {
145 let cache_rc = core.cache_rc();
146 let mut cache = cache_rc.borrow_mut();
147 cache.add_order(order.clone(), position_id, client_id, true)?;
148 }
149
150 publish_order_initialized(&order);
151
152 let command = SubmitOrder::new(
153 trader_id,
154 client_id,
155 strategy_id,
156 order.instrument_id(),
157 order.client_order_id(),
158 order.init_event().clone(),
159 order.exec_algorithm_id(),
160 position_id,
161 params,
162 UUID4::new(),
163 ts_init,
164 );
165
166 let manager = core.order_manager();
167
168 if matches!(order.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger) {
169 manager.send_emulator_command(TradingCommand::SubmitOrder(command));
170 } else if order.exec_algorithm_id().is_some() {
171 manager.send_algo_command(command, order.exec_algorithm_id().unwrap());
172 } else {
173 manager.send_risk_command(TradingCommand::SubmitOrder(command));
174 }
175
176 self.set_gtd_expiry(&order)?;
177 Ok(())
178 }
179
180 fn submit_order_list(
187 &mut self,
188 mut orders: Vec<OrderAny>,
189 position_id: Option<PositionId>,
190 client_id: Option<ClientId>,
191 params: Option<Params>,
192 ) -> anyhow::Result<()> {
193 for order in &orders {
194 if order.status() != OrderStatus::Initialized {
195 anyhow::bail!(
196 "Order in list denied: invalid status for {}, expected INITIALIZED",
197 order.client_order_id()
198 );
199 }
200 }
201
202 let should_deny = {
203 let core = self.core_mut();
204 let tag = core.market_exit_tag;
205 core.is_exiting
206 && orders.iter().any(|o| {
207 !o.is_reduce_only() && !o.tags().is_some_and(|tags| tags.contains(&tag))
208 })
209 };
210
211 if should_deny {
212 self.deny_order_list(&orders, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
213 return Ok(());
214 }
215
216 let core = self.core_mut();
217
218 let trader_id = core.trader_id().expect("Trader ID not set");
219 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
220 let ts_init = core.clock().timestamp_ns();
221
222 let order_list = if orders.first().is_some_and(|o| o.order_list_id().is_some()) {
224 OrderList::from_orders(&orders, ts_init)
225 } else {
226 core.order_factory().create_list(&mut orders, ts_init)
227 };
228
229 {
230 let cache_rc = core.cache_rc();
231 let cache = cache_rc.borrow();
232 if cache.order_list_exists(&order_list.id) {
233 anyhow::bail!("OrderList denied: duplicate {}", order_list.id);
234 }
235
236 for order in &orders {
237 if cache.order_exists(&order.client_order_id()) {
238 anyhow::bail!(
239 "Order in list denied: duplicate {}",
240 order.client_order_id()
241 );
242 }
243 }
244 }
245
246 {
247 let cache_rc = core.cache_rc();
248 let mut cache = cache_rc.borrow_mut();
249 cache.add_order_list(order_list.clone())?;
250 }
251
252 for order in &orders {
253 {
254 let cache_rc = core.cache_rc();
255 let mut cache = cache_rc.borrow_mut();
256 cache.add_order(order.clone(), position_id, client_id, true)?;
257 }
258
259 publish_order_initialized(order);
260 }
261
262 let params = params.filter(|params| !params.is_empty());
263
264 let first_order = orders.first();
265 let order_inits: Vec<_> = orders.iter().map(|o| o.init_event().clone()).collect();
266 let exec_algorithm_id = first_order.and_then(|o| o.exec_algorithm_id());
267
268 let command = SubmitOrderList::new(
269 trader_id,
270 client_id,
271 strategy_id,
272 order_list,
273 order_inits,
274 exec_algorithm_id,
275 position_id,
276 params,
277 UUID4::new(),
278 ts_init,
279 );
280
281 let has_emulated_order = orders.iter().any(|o| {
282 matches!(o.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger)
283 || o.is_emulated()
284 });
285
286 let manager = core.order_manager();
287
288 if has_emulated_order {
289 manager.send_emulator_command(TradingCommand::SubmitOrderList(command));
290 } else if let Some(algo_id) = exec_algorithm_id {
291 let endpoint = format!("{algo_id}.execute");
292 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrderList(command));
293 } else {
294 manager.send_risk_command(TradingCommand::SubmitOrderList(command));
295 }
296
297 for order in &orders {
298 self.set_gtd_expiry(order)?;
299 }
300
301 Ok(())
302 }
303
304 fn modify_order(
310 &mut self,
311 client_order_id: ClientOrderId,
312 quantity: Option<Quantity>,
313 price: Option<Price>,
314 trigger_price: Option<Price>,
315 client_id: Option<ClientId>,
316 params: Option<Params>,
317 ) -> anyhow::Result<()> {
318 let (trader_id, strategy_id) = {
319 let core = self.core_mut();
320 (
321 core.trader_id().expect("Trader ID not set"),
322 StrategyId::from(core.actor_id().inner().as_str()),
323 )
324 };
325
326 let params = params.filter(|params| !params.is_empty());
327
328 let order = match self
330 .core_mut()
331 .cache_rc()
332 .borrow()
333 .order_owned(&client_order_id)
334 {
335 Some(order) => order,
336 None => anyhow::bail!("Cannot modify order: {client_order_id} not found in cache"),
337 };
338
339 let mut updating = false;
340
341 if quantity.is_some_and(|q| q != order.quantity()) {
342 updating = true;
343 }
344
345 if let Some(price) = price {
346 if !LIMIT_ORDER_TYPES.contains(&order.order_type()) {
347 anyhow::bail!("{} orders do not have a LIMIT price", order.order_type());
348 }
349
350 if Some(price) != order.price() {
351 updating = true;
352 }
353 }
354
355 if let Some(trigger_price) = trigger_price {
356 if !STOP_ORDER_TYPES.contains(&order.order_type()) {
357 anyhow::bail!(
358 "{} orders do not have a STOP trigger price",
359 order.order_type()
360 );
361 }
362
363 if Some(trigger_price) != order.trigger_price() {
364 updating = true;
365 }
366 }
367
368 if !updating {
369 log::error!(
370 "Cannot create command ModifyOrder: quantity, price and trigger were either None \
371 or the same as existing values"
372 );
373 return Ok(());
374 }
375
376 if order.is_closed() || order.is_pending_cancel() {
377 log::warn!(
378 "Cannot create command ModifyOrder: state is {:?}, {order:?}",
379 order.status()
380 );
381 return Ok(());
382 }
383
384 if !self.mark_order_pending_update(&order)? {
385 return Ok(());
386 }
387
388 let command = ModifyOrder::new(
389 trader_id,
390 client_id,
391 strategy_id,
392 order.instrument_id(),
393 order.client_order_id(),
394 order.venue_order_id(),
395 quantity,
396 price,
397 trigger_price,
398 UUID4::new(),
399 self.core_mut().clock().timestamp_ns(),
400 params,
401 );
402
403 let manager = self.core_mut().order_manager();
404
405 if order.is_emulated() {
406 manager.send_emulator_command(TradingCommand::ModifyOrder(command));
407 } else {
408 manager.send_risk_command(TradingCommand::ModifyOrder(command));
409 }
410 Ok(())
411 }
412
413 fn cancel_order(
419 &mut self,
420 client_order_id: ClientOrderId,
421 client_id: Option<ClientId>,
422 params: Option<Params>,
423 ) -> anyhow::Result<()> {
424 let (trader_id, strategy_id, ts_init) = {
425 let core = self.core_mut();
426 (
427 core.trader_id().expect("Trader ID not set"),
428 StrategyId::from(core.actor_id().inner().as_str()),
429 core.clock().timestamp_ns(),
430 )
431 };
432
433 let params = params.filter(|params| !params.is_empty());
434
435 let order = match self
439 .core_mut()
440 .cache_rc()
441 .borrow()
442 .order_owned(&client_order_id)
443 {
444 Some(order) => order,
445 None => anyhow::bail!("Cannot cancel order: {client_order_id} not found in cache"),
446 };
447
448 if !self.mark_order_pending_cancel(&order)? {
449 return Ok(());
450 }
451
452 let command = CancelOrder::new(
453 trader_id,
454 client_id,
455 strategy_id,
456 order.instrument_id(),
457 order.client_order_id(),
458 order.venue_order_id(),
459 UUID4::new(),
460 ts_init,
461 params,
462 );
463
464 let manager = self.core_mut().order_manager();
465
466 if matches!(order.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger)
467 || order.is_emulated()
468 {
469 manager.send_emulator_command(TradingCommand::CancelOrder(command));
470 } else if let Some(algo_id) = order
471 .exec_algorithm_id()
472 .filter(|_| order.is_active_local())
473 {
474 let endpoint = format!("{algo_id}.execute");
475 msgbus::send_any(endpoint.into(), &TradingCommand::CancelOrder(command));
476 } else {
477 manager.send_exec_command(TradingCommand::CancelOrder(command));
478 }
479
480 if self.core().config.manage_gtd_expiry
481 && order.time_in_force() == TimeInForce::Gtd
482 && self.has_gtd_expiry_timer(&order.client_order_id())
483 {
484 self.cancel_gtd_expiry(&order.client_order_id());
485 }
486
487 Ok(())
488 }
489
490 fn cancel_orders(
497 &mut self,
498 client_order_ids: Vec<ClientOrderId>,
499 client_id: Option<ClientId>,
500 params: Option<Params>,
501 ) -> anyhow::Result<()> {
502 if client_order_ids.is_empty() {
503 anyhow::bail!("Cannot batch cancel empty order list");
504 }
505
506 let (trader_id, strategy_id, ts_init) = {
507 let core = self.core_mut();
508 (
509 core.trader_id().expect("Trader ID not set"),
510 StrategyId::from(core.actor_id().inner().as_str()),
511 core.clock().timestamp_ns(),
512 )
513 };
514
515 let orders: Vec<OrderAny> = {
517 let cache_rc = self.core_mut().cache_rc();
518 let cache = cache_rc.borrow();
519 client_order_ids
520 .iter()
521 .map(|id| {
522 cache.order_owned(id).ok_or_else(|| {
523 anyhow::anyhow!("Cannot cancel order: {id} not found in cache")
524 })
525 })
526 .collect::<Result<_, _>>()?
527 };
528
529 let instrument_id = orders[0].instrument_id();
530
531 for order in &orders {
532 if order.instrument_id() != instrument_id {
533 anyhow::bail!(
534 "Cannot batch cancel orders for different instruments: {} vs {}",
535 instrument_id,
536 order.instrument_id()
537 );
538 }
539
540 if order.is_emulated() || order.is_active_local() {
541 anyhow::bail!("Cannot include emulated or local orders in batch cancel");
542 }
543 }
544
545 let mut cancels = Vec::with_capacity(orders.len());
546
547 for order in orders {
548 if !self.mark_order_pending_cancel(&order)? {
549 continue;
550 }
551
552 cancels.push(CancelOrder::new(
553 trader_id,
554 client_id,
555 strategy_id,
556 instrument_id,
557 order.client_order_id(),
558 order.venue_order_id(),
559 UUID4::new(),
560 ts_init,
561 params.clone(),
562 ));
563 }
564
565 if cancels.is_empty() {
566 log::warn!("Cannot send `BatchCancelOrders`, no valid cancel commands");
567 return Ok(());
568 }
569
570 let manager = self.core_mut().order_manager();
571 let command = BatchCancelOrders::new(
572 trader_id,
573 client_id,
574 strategy_id,
575 instrument_id,
576 cancels,
577 UUID4::new(),
578 ts_init,
579 params,
580 );
581
582 manager.send_exec_command(TradingCommand::BatchCancelOrders(command));
583 Ok(())
584 }
585
586 fn mark_order_pending_update(&mut self, order: &OrderAny) -> anyhow::Result<bool> {
592 if order.is_active_local() {
593 return Ok(true);
594 }
595
596 let strategy_id = order.strategy_id();
597 let event = OrderEventAny::PendingUpdate(self.generate_order_pending_update(order));
598
599 {
600 let cache_rc = self.core_mut().cache_rc();
601 let mut cache = cache_rc.borrow_mut();
602 match cache.update_order(&event) {
603 Ok(_) => {}
604 Err(e)
605 if matches!(
606 e.downcast_ref::<OrderError>(),
607 Some(OrderError::InvalidStateTransition)
608 ) =>
609 {
610 log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
611 return Ok(false);
612 }
613 Err(e) => return Err(e),
614 }
615 }
616
617 let topic = format!("events.order.{strategy_id}");
618 msgbus::publish_order_event(topic.into(), &event);
619
620 Ok(true)
621 }
622
623 fn mark_order_pending_cancel(&mut self, order: &OrderAny) -> anyhow::Result<bool> {
629 if order.is_closed() || order.is_pending_cancel() {
630 log::warn!(
631 "Cannot cancel order: state is {:?}, {order:?}",
632 order.status()
633 );
634 return Ok(false);
635 }
636
637 if order.is_active_local() {
638 return Ok(true);
639 }
640
641 let strategy_id = order.strategy_id();
642 let event = OrderEventAny::PendingCancel(self.generate_order_pending_cancel(order));
643
644 {
645 let cache_rc = self.core_mut().cache_rc();
646 let mut cache = cache_rc.borrow_mut();
647 match cache.update_order(&event) {
648 Ok(_) => {}
649 Err(e)
650 if matches!(
651 e.downcast_ref::<OrderError>(),
652 Some(OrderError::InvalidStateTransition)
653 ) =>
654 {
655 log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
656 return Ok(false);
657 }
658 Err(e) => return Err(e),
659 }
660 cache.update_order_pending_cancel_local(order);
661 }
662
663 let topic = format!("events.order.{strategy_id}");
664 msgbus::publish_order_event(topic.into(), &event);
665
666 Ok(true)
667 }
668
669 fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate {
671 let ts_now = self.core_mut().clock().timestamp_ns();
672
673 OrderPendingUpdate::new(
674 order.trader_id(),
675 order.strategy_id(),
676 order.instrument_id(),
677 order.client_order_id(),
678 order
679 .account_id()
680 .expect("Order must have account_id for pending update"),
681 UUID4::new(),
682 ts_now,
683 ts_now,
684 false,
685 order.venue_order_id(),
686 )
687 }
688
689 fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel {
691 let ts_now = self.core_mut().clock().timestamp_ns();
692
693 OrderPendingCancel::new(
694 order.trader_id(),
695 order.strategy_id(),
696 order.instrument_id(),
697 order.client_order_id(),
698 order
699 .account_id()
700 .expect("Order must have account_id for pending cancel"),
701 UUID4::new(),
702 ts_now,
703 ts_now,
704 false,
705 order.venue_order_id(),
706 )
707 }
708
709 fn cancel_all_orders(
715 &mut self,
716 instrument_id: InstrumentId,
717 order_side: Option<OrderSide>,
718 client_id: Option<ClientId>,
719 params: Option<Params>,
720 ) -> anyhow::Result<()> {
721 let params = params.filter(|params| !params.is_empty());
722 let core = self.core_mut();
723
724 let trader_id = core.trader_id().expect("Trader ID not set");
725 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
726 let ts_init = core.clock().timestamp_ns();
727 let cache = core.cache();
728
729 let open_count = cache.orders_open_count(
730 None,
731 Some(&instrument_id),
732 Some(&strategy_id),
733 None,
734 order_side,
735 );
736
737 let emulated_count = cache.orders_emulated_count(
738 None,
739 Some(&instrument_id),
740 Some(&strategy_id),
741 None,
742 order_side,
743 );
744
745 let inflight_count = cache.orders_inflight_count(
746 None,
747 Some(&instrument_id),
748 Some(&strategy_id),
749 None,
750 order_side,
751 );
752
753 let mut exec_algorithm_ids: Vec<_> = cache.exec_algorithm_ids().into_iter().collect();
757 exec_algorithm_ids.sort();
758 let mut algo_orders: Vec<OrderAny> = Vec::new();
759
760 for algo_id in &exec_algorithm_ids {
761 algo_orders.extend(
762 cache
763 .orders_for_exec_algorithm(
764 algo_id,
765 None,
766 Some(&instrument_id),
767 Some(&strategy_id),
768 None,
769 order_side,
770 )
771 .into_iter()
772 .map(|o| o.clone()),
773 );
774 }
775
776 let algo_count = algo_orders.len();
777
778 drop(cache);
779
780 if open_count == 0 && emulated_count == 0 && inflight_count == 0 && algo_count == 0 {
781 let side_str = order_side.map(|s| format!(" {s}")).unwrap_or_default();
782 log::info!("No {instrument_id} open, emulated, or inflight{side_str} orders to cancel");
783 return Ok(());
784 }
785
786 let manager = core.order_manager();
787
788 let side_str = order_side.map(|s| format!(" {s}")).unwrap_or_default();
789
790 if open_count > 0 {
791 log::info!(
792 "Canceling {open_count} open{side_str} {instrument_id} order{}",
793 if open_count == 1 { "" } else { "s" }
794 );
795 }
796
797 if emulated_count > 0 {
798 log::info!(
799 "Canceling {emulated_count} emulated{side_str} {instrument_id} order{}",
800 if emulated_count == 1 { "" } else { "s" }
801 );
802 }
803
804 if inflight_count > 0 {
805 log::info!(
806 "Canceling {inflight_count} inflight{side_str} {instrument_id} order{}",
807 if inflight_count == 1 { "" } else { "s" }
808 );
809 }
810
811 if open_count > 0 || inflight_count > 0 {
812 let command = CancelAllOrders::new(
813 trader_id,
814 client_id,
815 strategy_id,
816 instrument_id,
817 order_side.unwrap_or(OrderSide::NoOrderSide),
818 UUID4::new(),
819 ts_init,
820 params.clone(),
821 );
822
823 manager.send_exec_command(TradingCommand::CancelAllOrders(command));
824 }
825
826 if emulated_count > 0 {
827 let command = CancelAllOrders::new(
828 trader_id,
829 client_id,
830 strategy_id,
831 instrument_id,
832 order_side.unwrap_or(OrderSide::NoOrderSide),
833 UUID4::new(),
834 ts_init,
835 params,
836 );
837
838 manager.send_emulator_command(TradingCommand::CancelAllOrders(command));
839 }
840
841 for order in algo_orders {
842 self.cancel_order(order.client_order_id(), client_id, None)?;
843 }
844
845 Ok(())
846 }
847
848 fn close_position(
854 &mut self,
855 position: &Position,
856 client_id: Option<ClientId>,
857 tags: Option<Vec<Ustr>>,
858 time_in_force: Option<TimeInForce>,
859 reduce_only: Option<bool>,
860 quote_quantity: Option<bool>,
861 ) -> anyhow::Result<()> {
862 let core = self.core_mut();
863
864 if position.is_closed() {
865 log::warn!("Cannot close position (already closed): {}", position.id);
866 return Ok(());
867 }
868
869 let closing_side = OrderCore::closing_side(position.side);
870
871 let order = core.order_factory().market(
872 position.instrument_id,
873 closing_side,
874 position.quantity,
875 time_in_force,
876 reduce_only.or(Some(true)),
877 quote_quantity,
878 None,
879 None,
880 tags,
881 None,
882 );
883
884 self.submit_order(order, Some(position.id), client_id, None)
885 }
886
887 #[expect(clippy::too_many_arguments)]
893 fn close_all_positions(
894 &mut self,
895 instrument_id: InstrumentId,
896 position_side: Option<PositionSide>,
897 client_id: Option<ClientId>,
898 tags: Option<Vec<Ustr>>,
899 time_in_force: Option<TimeInForce>,
900 reduce_only: Option<bool>,
901 quote_quantity: Option<bool>,
902 ) -> anyhow::Result<()> {
903 let core = self.core_mut();
904 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
905 let cache = core.cache();
906
907 let positions_open = cache.positions_open(
908 None,
909 Some(&instrument_id),
910 Some(&strategy_id),
911 None,
912 position_side,
913 );
914
915 let side_str = position_side.map(|s| format!(" {s}")).unwrap_or_default();
916
917 if positions_open.is_empty() {
918 log::info!("No {instrument_id} open{side_str} positions to close");
919 return Ok(());
920 }
921
922 let count = positions_open.len();
923 log::info!(
924 "Closing {count} open{side_str} position{}",
925 if count == 1 { "" } else { "s" }
926 );
927
928 let positions_data: Vec<_> = positions_open
929 .iter()
930 .map(|p| (p.id, p.instrument_id, p.side, p.quantity, p.is_closed()))
931 .collect();
932 drop(positions_open);
933
934 drop(cache);
935
936 for (pos_id, pos_instrument_id, pos_side, pos_quantity, is_closed) in positions_data {
937 if is_closed {
938 continue;
939 }
940
941 let core = self.core_mut();
942 let closing_side = OrderCore::closing_side(pos_side);
943 let order = core.order_factory().market(
944 pos_instrument_id,
945 closing_side,
946 pos_quantity,
947 time_in_force,
948 reduce_only.or(Some(true)),
949 quote_quantity,
950 None,
951 None,
952 tags.clone(),
953 None,
954 );
955
956 self.submit_order(order, Some(pos_id), client_id, None)?;
957 }
958
959 Ok(())
960 }
961
962 fn query_account(
971 &mut self,
972 account_id: AccountId,
973 client_id: Option<ClientId>,
974 params: Option<Params>,
975 ) -> anyhow::Result<()> {
976 let core = self.core_mut();
977
978 let trader_id = core.trader_id().expect("Trader ID not set");
979 let ts_init = core.clock().timestamp_ns();
980
981 let command = QueryAccount::new(
982 trader_id,
983 client_id,
984 account_id,
985 UUID4::new(),
986 ts_init,
987 params,
988 );
989
990 core.order_manager()
991 .send_exec_command(TradingCommand::QueryAccount(command));
992 Ok(())
993 }
994
995 fn query_order(
1004 &mut self,
1005 order: &OrderAny,
1006 client_id: Option<ClientId>,
1007 params: Option<Params>,
1008 ) -> anyhow::Result<()> {
1009 let core = self.core_mut();
1010
1011 let trader_id = core.trader_id().expect("Trader ID not set");
1012 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1013 let ts_init = core.clock().timestamp_ns();
1014
1015 let command = QueryOrder::new(
1016 trader_id,
1017 client_id,
1018 strategy_id,
1019 order.instrument_id(),
1020 order.client_order_id(),
1021 order.venue_order_id(),
1022 UUID4::new(),
1023 ts_init,
1024 params,
1025 );
1026
1027 core.order_manager()
1028 .send_exec_command(TradingCommand::QueryOrder(command));
1029 Ok(())
1030 }
1031
1032 fn handle_order_event(&mut self, event: OrderEventAny) {
1034 let state = {
1035 let core = self.core_mut();
1036 let id = &core.actor.actor_id;
1037 let is_warning = matches!(
1038 &event,
1039 OrderEventAny::Denied(_)
1040 | OrderEventAny::Rejected(_)
1041 | OrderEventAny::CancelRejected(_)
1042 | OrderEventAny::ModifyRejected(_)
1043 );
1044
1045 if is_warning {
1046 log::warn!("{id} {RECV}{EVT} {event}");
1047 } else if core.actor.config.log_events {
1048 log::info!("{id} {RECV}{EVT} {event}");
1049 }
1050
1051 core.actor.state()
1052 };
1053
1054 let client_order_id = event.client_order_id();
1055 let is_terminal = matches!(
1056 &event,
1057 OrderEventAny::Filled(_)
1058 | OrderEventAny::Canceled(_)
1059 | OrderEventAny::Rejected(_)
1060 | OrderEventAny::Expired(_)
1061 | OrderEventAny::Denied(_)
1062 );
1063
1064 if is_terminal {
1067 self.cancel_gtd_expiry(&client_order_id);
1068 }
1069
1070 if state != ComponentState::Running {
1073 return;
1074 }
1075
1076 {
1079 let core = self.core_mut();
1080 if let Some(manager) = &mut core.order_manager {
1081 manager.handle_event(&event);
1082 }
1083 }
1084
1085 match &event {
1086 OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1087 OrderEventAny::Denied(e) => self.on_order_denied(*e),
1088 OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1089 OrderEventAny::Released(e) => self.on_order_released(*e),
1090 OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1091 OrderEventAny::Rejected(e) => self.on_order_rejected(*e),
1092 OrderEventAny::Accepted(e) => self.on_order_accepted(*e),
1093 OrderEventAny::Canceled(e) => {
1094 let _ = DataActor::on_order_canceled(self, e);
1095 }
1096 OrderEventAny::Expired(e) => self.on_order_expired(*e),
1097 OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1098 OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1099 OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1100 OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1101 OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1102 OrderEventAny::Updated(e) => self.on_order_updated(*e),
1103 OrderEventAny::Filled(e) => {
1104 let _ = DataActor::on_order_filled(self, e);
1105 }
1106 }
1107 }
1108
1109 fn handle_position_event(&mut self, event: PositionEvent) {
1111 let state = {
1112 let core = self.core_mut();
1113
1114 if core.actor.config.log_events {
1115 let id = &core.actor.actor_id;
1116 log::info!("{id} {RECV}{EVT} {event:?}");
1117 }
1118
1119 core.actor.state()
1120 };
1121
1122 if state != ComponentState::Running {
1123 return;
1124 }
1125
1126 match event {
1127 PositionEvent::PositionOpened(e) => self.on_position_opened(e),
1128 PositionEvent::PositionChanged(e) => self.on_position_changed(e),
1129 PositionEvent::PositionClosed(e) => self.on_position_closed(e),
1130 PositionEvent::PositionAdjusted(_) => {
1131 }
1133 }
1134 }
1135
1136 fn on_start(&mut self) -> anyhow::Result<()> {
1147 let core = self.core_mut();
1148 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1149 log::info!("Starting {strategy_id}");
1150
1151 if core.config.manage_gtd_expiry {
1152 self.reactivate_gtd_timers();
1153 }
1154
1155 Ok(())
1156 }
1157
1158 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
1167 if event.name.starts_with("GTD-EXPIRY:") {
1168 self.expire_gtd_order(event.clone());
1169 } else if event.name.starts_with("MARKET_EXIT_CHECK:") {
1170 self.check_market_exit(event.clone());
1171 }
1172 Ok(())
1173 }
1174
1175 #[allow(unused_variables)]
1181 fn on_order_initialized(&mut self, event: OrderInitialized) {}
1182
1183 #[allow(unused_variables)]
1187 fn on_order_denied(&mut self, event: OrderDenied) {}
1188
1189 #[allow(unused_variables)]
1193 fn on_order_emulated(&mut self, event: OrderEmulated) {}
1194
1195 #[allow(unused_variables)]
1199 fn on_order_released(&mut self, event: OrderReleased) {}
1200
1201 #[allow(unused_variables)]
1205 fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1206
1207 #[allow(unused_variables)]
1211 fn on_order_rejected(&mut self, event: OrderRejected) {}
1212
1213 #[allow(unused_variables)]
1217 fn on_order_accepted(&mut self, event: OrderAccepted) {}
1218
1219 #[allow(unused_variables)]
1223 fn on_order_expired(&mut self, event: OrderExpired) {}
1224
1225 #[allow(unused_variables)]
1229 fn on_order_triggered(&mut self, event: OrderTriggered) {}
1230
1231 #[allow(unused_variables)]
1235 fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1236
1237 #[allow(unused_variables)]
1241 fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1242
1243 #[allow(unused_variables)]
1247 fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1248
1249 #[allow(unused_variables)]
1253 fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1254
1255 #[allow(unused_variables)]
1259 fn on_order_updated(&mut self, event: OrderUpdated) {}
1260
1261 #[allow(unused_variables)]
1267 fn on_position_opened(&mut self, event: PositionOpened) {}
1268
1269 #[allow(unused_variables)]
1273 fn on_position_changed(&mut self, event: PositionChanged) {}
1274
1275 #[allow(unused_variables)]
1279 fn on_position_closed(&mut self, event: PositionClosed) {}
1280
1281 fn on_market_exit(&mut self) {}
1285
1286 fn post_market_exit(&mut self) {}
1290
1291 fn is_exiting(&self) -> bool {
1295 self.core().is_exiting
1296 }
1297
1298 fn market_exit(&mut self) -> anyhow::Result<()> {
1314 let core = self.core_mut();
1315 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1316
1317 if core.actor.state() != ComponentState::Running {
1318 log::warn!("{strategy_id} Cannot market exit: strategy is not running");
1319 return Ok(());
1320 }
1321
1322 if core.is_exiting {
1323 log::warn!("{strategy_id} Market exit called when already in progress");
1324 return Ok(());
1325 }
1326
1327 core.is_exiting = true;
1328 core.market_exit_attempts = 0;
1329 let time_in_force = core.config.market_exit_time_in_force;
1330 let reduce_only = core.config.market_exit_reduce_only;
1331
1332 log::info!("{strategy_id} Initiating market exit...");
1333
1334 self.on_market_exit();
1335
1336 let core = self.core_mut();
1337 let cache = core.cache();
1338
1339 let mut instruments: AHashSet<InstrumentId> = AHashSet::new();
1340
1341 for client_order_id in
1342 cache.iter_client_order_ids_open(None, None, Some(&strategy_id), None)
1343 {
1344 if let Some(order) = cache.order(&client_order_id) {
1345 instruments.insert(order.instrument_id());
1346 }
1347 }
1348
1349 for client_order_id in
1350 cache.iter_client_order_ids_inflight(None, None, Some(&strategy_id), None)
1351 {
1352 if let Some(order) = cache.order(&client_order_id) {
1353 instruments.insert(order.instrument_id());
1354 }
1355 }
1356
1357 for position_id in cache.iter_position_open_ids(None, None, Some(&strategy_id), None) {
1358 if let Some(position) = cache.position(&position_id) {
1359 instruments.insert(position.instrument_id);
1360 }
1361 }
1362
1363 let market_exit_tag = core.market_exit_tag;
1364 let mut instruments: Vec<_> = instruments.into_iter().collect();
1368 instruments.sort();
1369 drop(cache);
1370
1371 for instrument_id in instruments {
1372 if let Err(e) = self.cancel_all_orders(instrument_id, None, None, None) {
1373 log::error!("Error canceling orders for {instrument_id}: {e}");
1374 }
1375
1376 if let Err(e) = self.close_all_positions(
1377 instrument_id,
1378 None,
1379 None,
1380 Some(vec![market_exit_tag]),
1381 Some(time_in_force),
1382 Some(reduce_only),
1383 None,
1384 ) {
1385 log::error!("Error closing positions for {instrument_id}: {e}");
1386 }
1387 }
1388
1389 let core = self.core_mut();
1390 let interval_ms = core.config.market_exit_interval_ms;
1391 let timer_name = core.market_exit_timer_name;
1392
1393 log::info!("{strategy_id} Setting market exit timer at {interval_ms}ms intervals");
1394
1395 let interval_ns = interval_ms * 1_000_000;
1396 let result = core.clock().set_timer_ns(
1397 timer_name.as_str(),
1398 interval_ns,
1399 None,
1400 None,
1401 None,
1402 None,
1403 None,
1404 );
1405
1406 if let Err(e) = result {
1407 core.is_exiting = false;
1409 core.market_exit_attempts = 0;
1410 return Err(e);
1411 }
1412
1413 Ok(())
1414 }
1415
1416 fn check_market_exit(&mut self, _event: TimeEvent) {
1420 if !self.is_exiting() {
1422 return;
1423 }
1424
1425 let core = self.core_mut();
1426 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1427
1428 core.market_exit_attempts += 1;
1429 let attempts = core.market_exit_attempts;
1430 let max_attempts = core.config.market_exit_max_attempts;
1431
1432 log::debug!(
1433 "{strategy_id} Market exit check triggered (attempt {attempts}/{max_attempts})"
1434 );
1435
1436 if attempts >= max_attempts {
1437 let cache = core.cache();
1438 let open_orders_count =
1439 cache.orders_open_count(None, None, Some(&strategy_id), None, None);
1440 let inflight_orders_count =
1441 cache.orders_inflight_count(None, None, Some(&strategy_id), None, None);
1442 let open_positions_count =
1443 cache.positions_open_count(None, None, Some(&strategy_id), None, None);
1444
1445 drop(cache);
1446
1447 log::warn!(
1448 "{strategy_id} Market exit max attempts ({max_attempts}) reached, \
1449 completing with open orders: {open_orders_count}, \
1450 inflight orders: {inflight_orders_count}, \
1451 open positions: {open_positions_count}"
1452 );
1453
1454 self.finalize_market_exit();
1455 return;
1456 }
1457
1458 let cache = core.cache();
1459 let has_open_orders = !cache
1460 .orders_open(None, None, Some(&strategy_id), None, None)
1461 .is_empty();
1462 let has_inflight_orders = !cache
1463 .orders_inflight(None, None, Some(&strategy_id), None, None)
1464 .is_empty();
1465
1466 if has_open_orders || has_inflight_orders {
1467 return;
1468 }
1469
1470 let positions_data: Vec<_> = cache
1471 .positions_open(None, None, Some(&strategy_id), None, None)
1472 .iter()
1473 .map(|p| (p.id, p.instrument_id, p.side, p.quantity, p.is_closed()))
1474 .collect();
1475
1476 if !positions_data.is_empty() {
1477 drop(cache);
1479
1480 for (pos_id, instrument_id, side, quantity, is_closed) in positions_data {
1481 if is_closed {
1482 continue;
1483 }
1484
1485 let core = self.core_mut();
1486 let time_in_force = core.config.market_exit_time_in_force;
1487 let reduce_only = core.config.market_exit_reduce_only;
1488 let market_exit_tag = core.market_exit_tag;
1489 let closing_side = OrderCore::closing_side(side);
1490 let order = core.order_factory().market(
1491 instrument_id,
1492 closing_side,
1493 quantity,
1494 Some(time_in_force),
1495 Some(reduce_only),
1496 None,
1497 None,
1498 None,
1499 Some(vec![market_exit_tag]),
1500 None,
1501 );
1502
1503 if let Err(e) = self.submit_order(order, Some(pos_id), None, None) {
1504 log::error!("Error re-submitting close order for position {pos_id}: {e}");
1505 }
1506 }
1507 return;
1508 }
1509
1510 drop(cache);
1511 self.finalize_market_exit();
1512 }
1513
1514 fn finalize_market_exit(&mut self) {
1519 let (strategy_id, should_stop) = {
1520 let core = self.core_mut();
1521 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1522 let should_stop = core.pending_stop;
1523 (strategy_id, should_stop)
1524 };
1525
1526 self.cancel_market_exit();
1527
1528 let hook_result = catch_unwind(AssertUnwindSafe(|| {
1529 self.post_market_exit();
1530 }));
1531
1532 if let Err(e) = hook_result {
1533 log::error!("{strategy_id} Error in post_market_exit: {e:?}");
1534 }
1535
1536 if should_stop {
1537 log::info!("{strategy_id} Market exit complete, stopping strategy");
1538
1539 if let Err(e) = Component::stop(self) {
1540 log::error!("{strategy_id} Failed to stop: {e}");
1541 }
1542 }
1543
1544 let core = self.core_mut();
1545 debug_assert!(
1546 !(core.pending_stop
1547 && !core.is_exiting
1548 && core.actor.state() == ComponentState::Running),
1549 "INVARIANT: stuck state after finalize_market_exit"
1550 );
1551 }
1552
1553 fn cancel_market_exit(&mut self) {
1557 let core = self.core_mut();
1558 let timer_name = core.market_exit_timer_name;
1559
1560 if core.clock().timer_names().contains(&timer_name.as_str()) {
1561 core.clock().cancel_timer(timer_name.as_str());
1562 }
1563
1564 core.is_exiting = false;
1565 core.pending_stop = false;
1566 core.market_exit_attempts = 0;
1567 }
1568
1569 fn stop(&mut self) -> bool {
1581 let (manage_stop, is_exiting, should_initiate_exit) = {
1582 let core = self.core_mut();
1583 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1584 let manage_stop = core.config.manage_stop;
1585 let state = core.actor.state();
1586 let pending_stop = core.pending_stop;
1587 let is_exiting = core.is_exiting;
1588
1589 if manage_stop {
1590 if state != ComponentState::Running {
1591 return true; }
1593
1594 if pending_stop {
1595 return false; }
1597
1598 core.pending_stop = true;
1599 let should_initiate_exit = !is_exiting;
1600
1601 if should_initiate_exit {
1602 log::info!("{strategy_id} Initiating market exit before stop");
1603 }
1604
1605 (manage_stop, is_exiting, should_initiate_exit)
1606 } else {
1607 (manage_stop, is_exiting, false)
1608 }
1609 };
1610
1611 if manage_stop {
1612 if should_initiate_exit && let Err(e) = self.market_exit() {
1613 log::warn!("Market exit failed during stop: {e}, proceeding with stop");
1614 self.core_mut().pending_stop = false;
1615 return true;
1616 }
1617 debug_assert!(
1618 self.is_exiting(),
1619 "INVARIANT: deferring stop but not exiting"
1620 );
1621 return false; }
1623
1624 if is_exiting {
1626 self.cancel_market_exit();
1627 }
1628
1629 true }
1631
1632 fn deny_order(&mut self, order: &OrderAny, reason: Ustr) {
1637 let core = self.core_mut();
1638 let trader_id = core.trader_id().expect("Trader ID not set");
1639 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1640 let ts_now = core.clock().timestamp_ns();
1641
1642 let event = OrderDenied::new(
1643 trader_id,
1644 strategy_id,
1645 order.instrument_id(),
1646 order.client_order_id(),
1647 reason,
1648 UUID4::new(),
1649 ts_now,
1650 ts_now,
1651 );
1652
1653 log::warn!(
1654 "{strategy_id} Order {} denied: {reason}",
1655 order.client_order_id()
1656 );
1657
1658 let publish_initialized = {
1659 let cache_rc = core.cache_rc();
1660 let mut cache = cache_rc.borrow_mut();
1661 if cache.order_exists(&order.client_order_id()) {
1662 false
1663 } else {
1664 match cache.add_order(order.clone(), None, None, true) {
1665 Ok(()) => true,
1666 Err(e) => {
1667 log::warn!("Failed to add denied order to cache: {e}");
1668 false
1669 }
1670 }
1671 }
1672 };
1673
1674 if publish_initialized {
1675 publish_order_initialized(order);
1676 }
1677
1678 let event = OrderEventAny::Denied(event);
1679 let applied = {
1680 let cache_rc = core.cache_rc();
1681 let mut cache = cache_rc.borrow_mut();
1682 if let Err(e) = cache.update_order(&event) {
1683 log::warn!("Failed to apply OrderDenied event: {e}");
1684 false
1685 } else {
1686 true
1687 }
1688 };
1689
1690 if applied {
1691 let topic = format!("events.order.{strategy_id}");
1692 msgbus::publish_order_event(topic.into(), &event);
1693 }
1694 }
1695
1696 fn deny_order_list(&mut self, orders: &[OrderAny], reason: Ustr) {
1700 for order in orders {
1701 if !order.is_closed() {
1702 self.deny_order(order, reason);
1703 }
1704 }
1705 }
1706
1707 fn set_gtd_expiry(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1717 let core = self.core_mut();
1718
1719 if !core.config.manage_gtd_expiry || order.time_in_force() != TimeInForce::Gtd {
1720 return Ok(());
1721 }
1722
1723 let Some(expire_time) = order.expire_time() else {
1724 return Ok(());
1725 };
1726
1727 let client_order_id = order.client_order_id();
1728 let timer_name = format!("GTD-EXPIRY:{client_order_id}");
1729
1730 let current_time_ns = {
1731 let clock = core.clock();
1732 clock.timestamp_ns()
1733 };
1734
1735 if current_time_ns >= expire_time.as_u64() {
1736 log::info!("GTD order {client_order_id} already expired, canceling immediately");
1737 return self.cancel_order(order.client_order_id(), None, None);
1738 }
1739
1740 {
1741 let mut clock = core.clock();
1742 clock.set_time_alert_ns(&timer_name, expire_time, None, None)?;
1743 }
1744
1745 core.gtd_timers
1746 .insert(client_order_id, Ustr::from(&timer_name));
1747
1748 log::debug!("Set GTD expiry timer for {client_order_id} at {expire_time}");
1749 Ok(())
1750 }
1751
1752 fn cancel_gtd_expiry(&mut self, client_order_id: &ClientOrderId) {
1754 let core = self.core_mut();
1755
1756 if let Some(timer_name) = core.gtd_timers.remove(client_order_id) {
1757 core.clock().cancel_timer(timer_name.as_str());
1758 log::debug!("Canceled GTD expiry timer for {client_order_id}");
1759 }
1760 }
1761
1762 fn has_gtd_expiry_timer(&mut self, client_order_id: &ClientOrderId) -> bool {
1764 let core = self.core_mut();
1765 core.gtd_timers.contains_key(client_order_id)
1766 }
1767
1768 fn expire_gtd_order(&mut self, event: TimeEvent) {
1772 let timer_name = event.name.to_string();
1773 let Some(client_order_id_str) = timer_name.strip_prefix("GTD-EXPIRY:") else {
1774 log::error!("Invalid GTD timer name format: {timer_name}");
1775 return;
1776 };
1777
1778 let client_order_id = ClientOrderId::from(client_order_id_str);
1779
1780 let core = self.core_mut();
1781 core.gtd_timers.remove(&client_order_id);
1782
1783 let order = core.cache().order(&client_order_id).map(|o| o.clone());
1784 let Some(order) = order else {
1785 log::warn!("GTD order {client_order_id} not found in cache");
1786 return;
1787 };
1788
1789 log::info!("GTD order {client_order_id} expired");
1790
1791 if let Err(e) = self.cancel_order(order.client_order_id(), None, None) {
1792 log::error!("Failed to cancel expired GTD order {client_order_id}: {e}");
1793 }
1794 }
1795
1796 fn reactivate_gtd_timers(&mut self) {
1801 let core = self.core_mut();
1802 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1803 let current_time_ns = core.clock().timestamp_ns();
1804
1805 let gtd_orders: Vec<OrderAny> = core
1806 .cache()
1807 .orders_open(None, None, Some(&strategy_id), None, None)
1808 .into_iter()
1809 .filter(|o| o.time_in_force() == TimeInForce::Gtd)
1810 .map(|o| o.clone())
1811 .collect();
1812
1813 for order in gtd_orders {
1814 let Some(expire_time) = order.expire_time() else {
1815 continue;
1816 };
1817
1818 let expire_time_ns = expire_time.as_u64();
1819 let client_order_id = order.client_order_id();
1820
1821 if current_time_ns >= expire_time_ns {
1822 log::info!("GTD order {client_order_id} already expired, canceling immediately");
1823 if let Err(e) = self.cancel_order(order.client_order_id(), None, None) {
1824 log::error!("Failed to cancel expired GTD order {client_order_id}: {e}");
1825 }
1826 } else if let Err(e) = self.set_gtd_expiry(&order) {
1827 log::error!("Failed to set GTD expiry timer for {client_order_id}: {e}");
1828 }
1829 }
1830 }
1831}
1832
1833fn publish_order_initialized(order: &OrderAny) {
1834 let topic = format!("events.order.{}", order.strategy_id());
1835 let event = OrderEventAny::Initialized(order.init_event().clone());
1836 msgbus::publish_order_event(topic.into(), &event);
1837}
1838
1839#[cfg(test)]
1840mod tests {
1841 use std::{cell::RefCell, rc::Rc};
1842
1843 use nautilus_common::{
1844 actor::DataActor,
1845 cache::Cache,
1846 clock::{Clock, TestClock},
1847 component::Component,
1848 msgbus::{
1849 self, MessagingSwitchboard, TypedHandler, TypedIntoHandler,
1850 stubs::{
1851 TypedIntoMessageSavingHandler, TypedMessageSavingHandler,
1852 get_typed_into_message_saving_handler, get_typed_message_saving_handler,
1853 },
1854 },
1855 timer::{TimeEvent, TimeEventCallback},
1856 };
1857 use nautilus_core::UnixNanos;
1858 use nautilus_model::{
1859 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSide},
1860 events::{OrderAccepted, OrderCanceled, OrderFilled, OrderRejected},
1861 identifiers::{
1862 AccountId, ClientOrderId, InstrumentId, OrderListId, PositionId, StrategyId, TradeId,
1863 TraderId, VenueOrderId,
1864 },
1865 orderbook::own::OwnOrderBook,
1866 orders::{LimitOrder, MarketOrder, stubs::TestOrderEventStubs},
1867 stubs::TestDefault,
1868 types::{Currency, Money, Price},
1869 };
1870 use nautilus_portfolio::portfolio::Portfolio;
1871 use rstest::rstest;
1872 use serde_json::Value;
1873
1874 use super::*;
1875 use crate::nautilus_strategy;
1876
1877 #[derive(Debug)]
1878 struct TestStrategy {
1879 core: StrategyCore,
1880 on_order_rejected_called: bool,
1881 on_order_accepted_called: bool,
1882 on_order_canceled_called: bool,
1883 on_order_filled_called: bool,
1884 on_order_expired_called: bool,
1885 on_position_opened_called: bool,
1886 on_position_changed_called: bool,
1887 on_position_closed_called: bool,
1888 }
1889
1890 impl TestStrategy {
1891 fn new(config: StrategyConfig) -> Self {
1892 Self {
1893 core: StrategyCore::new(config),
1894 on_order_rejected_called: false,
1895 on_order_accepted_called: false,
1896 on_order_canceled_called: false,
1897 on_order_filled_called: false,
1898 on_order_expired_called: false,
1899 on_position_opened_called: false,
1900 on_position_changed_called: false,
1901 on_position_closed_called: false,
1902 }
1903 }
1904 }
1905
1906 impl DataActor for TestStrategy {
1907 fn on_order_canceled(&mut self, _event: &OrderCanceled) -> anyhow::Result<()> {
1908 self.on_order_canceled_called = true;
1909 Ok(())
1910 }
1911
1912 fn on_order_filled(&mut self, _event: &OrderFilled) -> anyhow::Result<()> {
1913 self.on_order_filled_called = true;
1914 Ok(())
1915 }
1916 }
1917
1918 nautilus_strategy!(TestStrategy, {
1919 fn on_order_rejected(&mut self, _event: OrderRejected) {
1920 self.on_order_rejected_called = true;
1921 }
1922
1923 fn on_order_accepted(&mut self, _event: OrderAccepted) {
1924 self.on_order_accepted_called = true;
1925 }
1926
1927 fn on_order_expired(&mut self, _event: OrderExpired) {
1928 self.on_order_expired_called = true;
1929 }
1930
1931 fn on_position_opened(&mut self, _event: PositionOpened) {
1932 self.on_position_opened_called = true;
1933 }
1934
1935 fn on_position_changed(&mut self, _event: PositionChanged) {
1936 self.on_position_changed_called = true;
1937 }
1938
1939 fn on_position_closed(&mut self, _event: PositionClosed) {
1940 self.on_position_closed_called = true;
1941 }
1942 });
1943
1944 fn create_test_strategy() -> TestStrategy {
1945 let config = StrategyConfig {
1946 strategy_id: Some(StrategyId::from("TEST-001")),
1947 order_id_tag: Some("001".to_string()),
1948 ..Default::default()
1949 };
1950 TestStrategy::new(config)
1951 }
1952
1953 fn register_strategy(strategy: &mut TestStrategy) {
1954 let trader_id = TraderId::from("TRADER-001");
1955 let clock = Rc::new(RefCell::new(TestClock::new()));
1956 let cache = Rc::new(RefCell::new(Cache::default()));
1957 let portfolio = Rc::new(RefCell::new(Portfolio::new(
1958 cache.clone(),
1959 clock.clone(),
1960 None,
1961 )));
1962
1963 strategy
1964 .core
1965 .register(trader_id, clock, cache, portfolio)
1966 .unwrap();
1967 strategy.initialize().unwrap();
1968 }
1969
1970 fn start_strategy(strategy: &mut TestStrategy) {
1971 strategy.start().unwrap();
1972 }
1973
1974 fn stop_strategy(strategy: &mut TestStrategy) {
1975 Component::stop(strategy).unwrap();
1976 }
1977
1978 fn make_filled(client_order_id: ClientOrderId) -> OrderEventAny {
1979 OrderEventAny::Filled(OrderFilled {
1980 trader_id: TraderId::from("TRADER-001"),
1981 strategy_id: StrategyId::from("TEST-001"),
1982 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
1983 client_order_id,
1984 venue_order_id: VenueOrderId::test_default(),
1985 account_id: AccountId::from("ACC-001"),
1986 trade_id: TradeId::test_default(),
1987 position_id: None,
1988 order_side: OrderSide::Buy,
1989 order_type: OrderType::Market,
1990 last_qty: Quantity::default(),
1991 last_px: Price::default(),
1992 currency: Currency::from("USD"),
1993 liquidity_side: LiquiditySide::Taker,
1994 event_id: UUID4::default(),
1995 ts_event: UnixNanos::default(),
1996 ts_init: UnixNanos::default(),
1997 reconciliation: false,
1998 commission: None,
1999 })
2000 }
2001
2002 fn make_canceled(client_order_id: ClientOrderId) -> OrderEventAny {
2003 OrderEventAny::Canceled(OrderCanceled {
2004 trader_id: TraderId::from("TRADER-001"),
2005 strategy_id: StrategyId::from("TEST-001"),
2006 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2007 client_order_id,
2008 venue_order_id: None,
2009 account_id: Some(AccountId::from("ACC-001")),
2010 event_id: UUID4::default(),
2011 ts_event: UnixNanos::default(),
2012 ts_init: UnixNanos::default(),
2013 reconciliation: 0,
2014 })
2015 }
2016
2017 fn make_rejected(client_order_id: ClientOrderId) -> OrderEventAny {
2018 OrderEventAny::Rejected(OrderRejected {
2019 trader_id: TraderId::from("TRADER-001"),
2020 strategy_id: StrategyId::from("TEST-001"),
2021 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2022 client_order_id,
2023 account_id: AccountId::from("ACC-001"),
2024 reason: "Test rejection".into(),
2025 event_id: UUID4::default(),
2026 ts_event: UnixNanos::default(),
2027 ts_init: UnixNanos::default(),
2028 reconciliation: 0,
2029 due_post_only: 0,
2030 })
2031 }
2032
2033 fn make_expired(client_order_id: ClientOrderId) -> OrderEventAny {
2034 OrderEventAny::Expired(OrderExpired {
2035 trader_id: TraderId::from("TRADER-001"),
2036 strategy_id: StrategyId::from("TEST-001"),
2037 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2038 client_order_id,
2039 venue_order_id: None,
2040 account_id: Some(AccountId::from("ACC-001")),
2041 event_id: UUID4::default(),
2042 ts_event: UnixNanos::default(),
2043 ts_init: UnixNanos::default(),
2044 reconciliation: 0,
2045 })
2046 }
2047
2048 fn make_accepted(client_order_id: ClientOrderId) -> OrderEventAny {
2049 OrderEventAny::Accepted(OrderAccepted {
2050 trader_id: TraderId::from("TRADER-001"),
2051 strategy_id: StrategyId::from("TEST-001"),
2052 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2053 client_order_id,
2054 venue_order_id: VenueOrderId::test_default(),
2055 account_id: AccountId::from("ACC-001"),
2056 event_id: UUID4::default(),
2057 ts_event: UnixNanos::default(),
2058 ts_init: UnixNanos::default(),
2059 reconciliation: 0,
2060 })
2061 }
2062
2063 fn make_accepted_market_order(client_order_id: &str) -> OrderAny {
2064 let mut order = OrderAny::Market(MarketOrder::new(
2065 TraderId::from("TRADER-001"),
2066 StrategyId::from("TEST-001"),
2067 InstrumentId::from("BTCUSDT.BINANCE"),
2068 ClientOrderId::from(client_order_id),
2069 OrderSide::Buy,
2070 Quantity::from(100_000),
2071 TimeInForce::Gtc,
2072 UUID4::new(),
2073 UnixNanos::default(),
2074 false,
2075 false,
2076 None,
2077 None,
2078 None,
2079 None,
2080 None,
2081 None,
2082 None,
2083 None,
2084 ));
2085 let account_id = AccountId::from("ACC-001");
2086 order
2087 .apply(TestOrderEventStubs::submitted(&order, account_id))
2088 .unwrap();
2089 order
2090 .apply(TestOrderEventStubs::accepted(
2091 &order,
2092 account_id,
2093 VenueOrderId::test_default(),
2094 ))
2095 .unwrap();
2096 order
2097 }
2098
2099 fn make_accepted_limit_order(client_order_id: &str) -> OrderAny {
2100 let mut order = OrderAny::Limit(LimitOrder::new(
2101 TraderId::from("TRADER-001"),
2102 StrategyId::from("TEST-001"),
2103 InstrumentId::from("BTCUSDT.BINANCE"),
2104 ClientOrderId::from(client_order_id),
2105 OrderSide::Buy,
2106 Quantity::from("1.0"),
2107 Price::from("50000.0"),
2108 TimeInForce::Gtc,
2109 None,
2110 false,
2111 false,
2112 false,
2113 None,
2114 None,
2115 None,
2116 None,
2117 None,
2118 None,
2119 None,
2120 None,
2121 None,
2122 None,
2123 None,
2124 UUID4::new(),
2125 UnixNanos::default(),
2126 ));
2127 let account_id = AccountId::from("ACC-001");
2128 order
2129 .apply(TestOrderEventStubs::submitted(&order, account_id))
2130 .unwrap();
2131 order
2132 .apply(TestOrderEventStubs::accepted(
2133 &order,
2134 account_id,
2135 VenueOrderId::test_default(),
2136 ))
2137 .unwrap();
2138 order
2139 }
2140
2141 fn make_initialized_market_order(client_order_id: &str) -> OrderAny {
2142 OrderAny::Market(MarketOrder::new(
2143 TraderId::from("TRADER-001"),
2144 StrategyId::from("TEST-001"),
2145 InstrumentId::from("BTCUSDT.BINANCE"),
2146 ClientOrderId::from(client_order_id),
2147 OrderSide::Buy,
2148 Quantity::from(100_000),
2149 TimeInForce::Gtc,
2150 UUID4::new(),
2151 UnixNanos::default(),
2152 false,
2153 false,
2154 None,
2155 None,
2156 None,
2157 None,
2158 None,
2159 None,
2160 None,
2161 None,
2162 ))
2163 }
2164
2165 fn add_order_to_cache(strategy: &TestStrategy, order: &OrderAny) {
2166 let cache_rc = strategy.core.cache_rc();
2167 let mut cache = cache_rc.borrow_mut();
2168 cache.add_order(order.clone(), None, None, true).unwrap();
2169 }
2170
2171 fn add_order_to_cache_and_own_book(strategy: &TestStrategy, order: &OrderAny) {
2172 let cache_rc = strategy.core.cache_rc();
2173 let mut cache = cache_rc.borrow_mut();
2174 cache.add_order(order.clone(), None, None, true).unwrap();
2175 cache
2176 .add_own_order_book(OwnOrderBook::new(order.instrument_id()))
2177 .unwrap();
2178 cache.update_own_order_book(order);
2179 }
2180
2181 fn make_position_opened() -> PositionEvent {
2182 PositionEvent::PositionOpened(PositionOpened {
2183 trader_id: TraderId::from("TRADER-001"),
2184 strategy_id: StrategyId::from("TEST-001"),
2185 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2186 position_id: PositionId::test_default(),
2187 account_id: AccountId::from("ACC-001"),
2188 opening_order_id: ClientOrderId::from("O-001"),
2189 entry: OrderSide::Buy,
2190 side: PositionSide::Long,
2191 signed_qty: 1.0,
2192 quantity: Quantity::default(),
2193 last_qty: Quantity::default(),
2194 last_px: Price::default(),
2195 currency: Currency::from("USD"),
2196 avg_px_open: 0.0,
2197 event_id: UUID4::default(),
2198 ts_event: UnixNanos::default(),
2199 ts_init: UnixNanos::default(),
2200 })
2201 }
2202
2203 fn make_position_changed() -> PositionEvent {
2204 let currency = Currency::from("USD");
2205 PositionEvent::PositionChanged(PositionChanged {
2206 trader_id: TraderId::from("TRADER-001"),
2207 strategy_id: StrategyId::from("TEST-001"),
2208 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2209 position_id: PositionId::test_default(),
2210 account_id: AccountId::from("ACC-001"),
2211 opening_order_id: ClientOrderId::from("O-001"),
2212 entry: OrderSide::Buy,
2213 side: PositionSide::Long,
2214 signed_qty: 2.0,
2215 quantity: Quantity::default(),
2216 peak_quantity: Quantity::default(),
2217 last_qty: Quantity::default(),
2218 last_px: Price::default(),
2219 currency,
2220 avg_px_open: 0.0,
2221 avg_px_close: None,
2222 realized_return: 0.0,
2223 realized_pnl: None,
2224 unrealized_pnl: Money::new(0.0, currency),
2225 event_id: UUID4::default(),
2226 ts_opened: UnixNanos::default(),
2227 ts_event: UnixNanos::default(),
2228 ts_init: UnixNanos::default(),
2229 })
2230 }
2231
2232 fn make_position_closed() -> PositionEvent {
2233 let currency = Currency::from("USD");
2234 PositionEvent::PositionClosed(PositionClosed {
2235 trader_id: TraderId::from("TRADER-001"),
2236 strategy_id: StrategyId::from("TEST-001"),
2237 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2238 position_id: PositionId::test_default(),
2239 account_id: AccountId::from("ACC-001"),
2240 opening_order_id: ClientOrderId::from("O-001"),
2241 closing_order_id: Some(ClientOrderId::from("O-002")),
2242 entry: OrderSide::Buy,
2243 side: PositionSide::Flat,
2244 signed_qty: 0.0,
2245 quantity: Quantity::default(),
2246 peak_quantity: Quantity::default(),
2247 last_qty: Quantity::default(),
2248 last_px: Price::default(),
2249 currency,
2250 avg_px_open: 0.0,
2251 avg_px_close: None,
2252 realized_return: 0.0,
2253 realized_pnl: None,
2254 unrealized_pnl: Money::new(0.0, currency),
2255 duration: 0,
2256 event_id: UUID4::default(),
2257 ts_opened: UnixNanos::default(),
2258 ts_closed: None,
2259 ts_event: UnixNanos::default(),
2260 ts_init: UnixNanos::default(),
2261 })
2262 }
2263
2264 #[rstest]
2265 fn test_strategy_creation() {
2266 let strategy = create_test_strategy();
2267 assert_eq!(
2268 strategy.core.config.strategy_id,
2269 Some(StrategyId::from("TEST-001"))
2270 );
2271 assert!(!strategy.on_order_rejected_called);
2272 assert!(!strategy.on_position_opened_called);
2273 }
2274
2275 #[rstest]
2276 fn test_strategy_registration() {
2277 let mut strategy = create_test_strategy();
2278 register_strategy(&mut strategy);
2279
2280 assert!(strategy.core.order_manager.is_some());
2281 assert!(strategy.core.order_factory.is_some());
2282 assert!(strategy.core.portfolio.is_some());
2283 }
2284
2285 #[rstest]
2286 fn test_handle_order_event_dispatches_to_handler() {
2287 let mut strategy = create_test_strategy();
2288 register_strategy(&mut strategy);
2289 start_strategy(&mut strategy);
2290
2291 let event = OrderEventAny::Rejected(OrderRejected {
2292 trader_id: TraderId::from("TRADER-001"),
2293 strategy_id: StrategyId::from("TEST-001"),
2294 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2295 client_order_id: ClientOrderId::from("O-001"),
2296 account_id: AccountId::from("ACC-001"),
2297 reason: "Test rejection".into(),
2298 event_id: UUID4::default(),
2299 ts_event: UnixNanos::default(),
2300 ts_init: UnixNanos::default(),
2301 reconciliation: 0,
2302 due_post_only: 0,
2303 });
2304
2305 strategy.handle_order_event(event);
2306
2307 assert!(strategy.on_order_rejected_called);
2308 }
2309
2310 #[rstest]
2311 #[case::opened(make_position_opened())]
2312 #[case::changed(make_position_changed())]
2313 #[case::closed(make_position_closed())]
2314 fn test_handle_position_event_dispatches_to_handler(#[case] event: PositionEvent) {
2315 let mut strategy = create_test_strategy();
2316 register_strategy(&mut strategy);
2317 start_strategy(&mut strategy);
2318
2319 let expected_opened = matches!(event, PositionEvent::PositionOpened(_));
2320 let expected_changed = matches!(event, PositionEvent::PositionChanged(_));
2321 let expected_closed = matches!(event, PositionEvent::PositionClosed(_));
2322
2323 strategy.handle_position_event(event);
2324
2325 assert_eq!(strategy.on_position_opened_called, expected_opened);
2326 assert_eq!(strategy.on_position_changed_called, expected_changed);
2327 assert_eq!(strategy.on_position_closed_called, expected_closed);
2328 }
2329
2330 #[rstest]
2331 fn test_handle_position_event_skips_dispatch_when_stopped() {
2332 let mut strategy = create_test_strategy();
2333 register_strategy(&mut strategy);
2334 start_strategy(&mut strategy);
2335 stop_strategy(&mut strategy);
2336 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
2337
2338 strategy.handle_position_event(make_position_opened());
2339
2340 assert!(!strategy.on_position_opened_called);
2341 }
2342
2343 #[rstest]
2344 fn test_strategy_default_handlers_do_not_panic() {
2345 let mut strategy = create_test_strategy();
2346
2347 strategy.on_order_initialized(OrderInitialized::default());
2348 strategy.on_order_denied(OrderDenied::default());
2349 strategy.on_order_emulated(OrderEmulated::default());
2350 strategy.on_order_released(OrderReleased::default());
2351 strategy.on_order_submitted(OrderSubmitted::default());
2352 strategy.on_order_rejected(OrderRejected::default());
2353 let _ = DataActor::on_order_canceled(&mut strategy, &OrderCanceled::default());
2354 strategy.on_order_expired(OrderExpired::default());
2355 strategy.on_order_triggered(OrderTriggered::default());
2356 strategy.on_order_pending_update(OrderPendingUpdate::default());
2357 strategy.on_order_pending_cancel(OrderPendingCancel::default());
2358 strategy.on_order_modify_rejected(OrderModifyRejected::default());
2359 strategy.on_order_cancel_rejected(OrderCancelRejected::default());
2360 strategy.on_order_updated(OrderUpdated::default());
2361 }
2362
2363 #[rstest]
2364 fn test_submit_order_publishes_order_initialized_after_cache_insert_before_send() {
2365 let mut strategy = create_test_strategy();
2366 register_strategy(&mut strategy);
2367
2368 let order = make_initialized_market_order("O-20250208-INIT-001");
2369 let client_order_id = order.client_order_id();
2370 let cache_rc = strategy.core.cache_rc();
2371 let timeline = Rc::new(RefCell::new(Vec::new()));
2372 let event_messages = Rc::new(RefCell::new(Vec::new()));
2373
2374 let event_handler = {
2375 let event_messages = event_messages.clone();
2376 let timeline = timeline.clone();
2377 TypedHandler::from_with_id("events.order.initialized", move |event: &OrderEventAny| {
2378 assert!(cache_rc.borrow().order_exists(&client_order_id));
2379 assert!(matches!(event, OrderEventAny::Initialized(_)));
2380 event_messages.borrow_mut().push(event.clone());
2381 timeline.borrow_mut().push("init");
2382 })
2383 };
2384 let risk_handler = {
2385 let timeline = timeline.clone();
2386 TypedIntoHandler::from_with_id(
2387 "RiskEngine.queue_execute",
2388 move |command: TradingCommand| {
2389 assert!(matches!(command, TradingCommand::SubmitOrder(_)));
2390 timeline.borrow_mut().push("command");
2391 },
2392 )
2393 };
2394 msgbus::register_trading_command_endpoint(
2395 MessagingSwitchboard::risk_engine_queue_execute(),
2396 risk_handler,
2397 );
2398
2399 let topic = format!("events.order.{}", order.strategy_id());
2400 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2401
2402 strategy
2403 .submit_order(order.clone(), None, None, None)
2404 .unwrap();
2405
2406 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2407
2408 let event_messages = event_messages.borrow();
2409 assert_eq!(event_messages.len(), 1);
2410 assert_eq!(
2411 event_messages[0],
2412 OrderEventAny::Initialized(order.init_event().clone())
2413 );
2414 assert_eq!(timeline.borrow().as_slice(), &["init", "command"]);
2415 }
2416
2417 #[rstest]
2418 fn test_submit_order_rejects_non_initialized_without_events() {
2419 let mut strategy = create_test_strategy();
2420 register_strategy(&mut strategy);
2421
2422 let order = make_accepted_market_order("O-20250208-ACCEPTED-001");
2423 let topic = format!("events.order.{}", order.strategy_id());
2424 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2425 get_typed_message_saving_handler(Some(Ustr::from("events.order.invalid")));
2426
2427 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2428 let result = strategy.submit_order(order, None, None, None);
2429
2430 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2431
2432 assert!(result.is_err());
2433 assert!(
2434 result
2435 .unwrap_err()
2436 .to_string()
2437 .contains("expected INITIALIZED")
2438 );
2439 assert!(event_messages.get_messages().is_empty());
2440 }
2441
2442 #[rstest]
2443 fn test_submit_order_list_publishes_order_initialized_after_cache_insert_before_send() {
2444 let mut strategy = create_test_strategy();
2445 register_strategy(&mut strategy);
2446
2447 let order_list_id = OrderListId::from("OL-20250208-LIST-INIT");
2448 let mut orders = vec![
2449 make_initialized_market_order("O-20250208-LIST-INIT-001"),
2450 make_initialized_market_order("O-20250208-LIST-INIT-002"),
2451 ];
2452
2453 for order in &mut orders {
2454 order.set_order_list_id(order_list_id);
2455 }
2456
2457 let client_order_id1 = orders[0].client_order_id();
2458 let client_order_id2 = orders[1].client_order_id();
2459 let cache_rc = strategy.core.cache_rc();
2460 let timeline = Rc::new(RefCell::new(Vec::new()));
2461 let event_messages = Rc::new(RefCell::new(Vec::new()));
2462
2463 let event_handler = {
2464 let event_messages = event_messages.clone();
2465 let timeline = timeline.clone();
2466 TypedHandler::from_with_id(
2467 "events.order.list_initialized",
2468 move |event: &OrderEventAny| {
2469 match event {
2470 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
2471 assert!(cache_rc.borrow().order_exists(&client_order_id1));
2472 timeline.borrow_mut().push("init1");
2473 }
2474 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
2475 assert!(cache_rc.borrow().order_exists(&client_order_id2));
2476 timeline.borrow_mut().push("init2");
2477 }
2478 _ => panic!("unexpected order event {event:?}"),
2479 }
2480 event_messages.borrow_mut().push(event.clone());
2481 },
2482 )
2483 };
2484 let risk_handler = {
2485 let timeline = timeline.clone();
2486 TypedIntoHandler::from_with_id(
2487 "RiskEngine.queue_execute",
2488 move |command: TradingCommand| {
2489 assert!(matches!(command, TradingCommand::SubmitOrderList(_)));
2490 timeline.borrow_mut().push("command");
2491 },
2492 )
2493 };
2494 msgbus::register_trading_command_endpoint(
2495 MessagingSwitchboard::risk_engine_queue_execute(),
2496 risk_handler,
2497 );
2498
2499 let topic = format!("events.order.{}", orders[0].strategy_id());
2500 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2501
2502 strategy
2503 .submit_order_list(orders.clone(), None, None, None)
2504 .unwrap();
2505
2506 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2507
2508 let event_messages = event_messages.borrow();
2509 assert_eq!(event_messages.len(), 2);
2510 assert_eq!(
2511 event_messages[0],
2512 OrderEventAny::Initialized(orders[0].init_event().clone())
2513 );
2514 assert_eq!(
2515 event_messages[1],
2516 OrderEventAny::Initialized(orders[1].init_event().clone())
2517 );
2518 assert_eq!(timeline.borrow().as_slice(), &["init1", "init2", "command"]);
2519 }
2520
2521 #[rstest]
2522 fn test_submit_order_list_create_list_branch_publishes_init_after_cache_insert() {
2523 let mut strategy = create_test_strategy();
2524 register_strategy(&mut strategy);
2525
2526 let orders = vec![
2527 make_initialized_market_order("O-20250208-LIST-CREATE-001"),
2528 make_initialized_market_order("O-20250208-LIST-CREATE-002"),
2529 ];
2530
2531 let client_order_id1 = orders[0].client_order_id();
2532 let client_order_id2 = orders[1].client_order_id();
2533 let cache_rc = strategy.core.cache_rc();
2534 let timeline = Rc::new(RefCell::new(Vec::new()));
2535 let event_messages = Rc::new(RefCell::new(Vec::new()));
2536
2537 let event_handler = {
2538 let event_messages = event_messages.clone();
2539 let timeline = timeline.clone();
2540 TypedHandler::from_with_id(
2541 "events.order.list_create_initialized",
2542 move |event: &OrderEventAny| {
2543 match event {
2544 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
2545 assert!(cache_rc.borrow().order_exists(&client_order_id1));
2546 timeline.borrow_mut().push("init1");
2547 }
2548 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
2549 assert!(cache_rc.borrow().order_exists(&client_order_id2));
2550 timeline.borrow_mut().push("init2");
2551 }
2552 _ => panic!("unexpected order event {event:?}"),
2553 }
2554 event_messages.borrow_mut().push(event.clone());
2555 },
2556 )
2557 };
2558 let risk_handler = {
2559 let timeline = timeline.clone();
2560 TypedIntoHandler::from_with_id(
2561 "RiskEngine.queue_execute",
2562 move |command: TradingCommand| {
2563 assert!(matches!(command, TradingCommand::SubmitOrderList(_)));
2564 timeline.borrow_mut().push("command");
2565 },
2566 )
2567 };
2568 msgbus::register_trading_command_endpoint(
2569 MessagingSwitchboard::risk_engine_queue_execute(),
2570 risk_handler,
2571 );
2572
2573 let topic = format!("events.order.{}", orders[0].strategy_id());
2574 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2575
2576 strategy
2577 .submit_order_list(orders.clone(), None, None, None)
2578 .unwrap();
2579
2580 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2581
2582 let event_messages = event_messages.borrow();
2583 assert_eq!(event_messages.len(), 2);
2584 assert_eq!(
2585 event_messages[0],
2586 OrderEventAny::Initialized(orders[0].init_event().clone())
2587 );
2588 assert_eq!(
2589 event_messages[1],
2590 OrderEventAny::Initialized(orders[1].init_event().clone())
2591 );
2592 assert_eq!(timeline.borrow().as_slice(), &["init1", "init2", "command"]);
2593
2594 let cache = strategy.core.cache();
2595 let cached_order1 = cache.order(&client_order_id1).unwrap();
2596 let cached_order2 = cache.order(&client_order_id2).unwrap();
2597 let order_list_id = cached_order1.order_list_id().unwrap();
2598 assert_eq!(cached_order2.order_list_id(), Some(order_list_id));
2599
2600 let order_list = cache.order_list(&order_list_id).unwrap();
2601 assert_eq!(
2602 order_list.client_order_ids.as_slice(),
2603 &[client_order_id1, client_order_id2]
2604 );
2605 }
2606
2607 #[rstest]
2608 fn test_submit_order_list_routes_optional_params_to_risk() {
2609 let mut strategy = create_test_strategy();
2610 register_strategy(&mut strategy);
2611
2612 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2613 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2614 msgbus::register_trading_command_endpoint(
2615 MessagingSwitchboard::risk_engine_queue_execute(),
2616 risk_handler,
2617 );
2618
2619 let no_params_orders = vec![
2620 make_initialized_market_order("O-20250208-LIST-001"),
2621 make_initialized_market_order("O-20250208-LIST-002"),
2622 ];
2623 strategy
2624 .submit_order_list(no_params_orders, None, None, None)
2625 .unwrap();
2626
2627 let mut params = Params::new();
2628 params.insert(
2629 "routing_hint".to_string(),
2630 Value::String("prefer_batch".to_string()),
2631 );
2632 let param_orders = vec![
2633 make_initialized_market_order("O-20250208-LIST-003"),
2634 make_initialized_market_order("O-20250208-LIST-004"),
2635 ];
2636 strategy
2637 .submit_order_list(param_orders, None, None, Some(params.clone()))
2638 .unwrap();
2639
2640 let risk_messages = risk_messages.get_messages();
2641 assert_eq!(risk_messages.len(), 2);
2642 let Some(TradingCommand::SubmitOrderList(no_params_command)) = risk_messages.first() else {
2643 panic!("expected SubmitOrderList command");
2644 };
2645 let Some(TradingCommand::SubmitOrderList(param_command)) = risk_messages.get(1) else {
2646 panic!("expected SubmitOrderList command");
2647 };
2648 assert!(no_params_command.params.is_none());
2649 assert_eq!(param_command.params.as_ref(), Some(¶ms));
2650 }
2651
2652 #[rstest]
2653 fn test_modify_order_routes_non_emulated_orders_to_risk() {
2654 let mut strategy = create_test_strategy();
2655 register_strategy(&mut strategy);
2656
2657 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2658 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2659 msgbus::register_trading_command_endpoint(
2660 MessagingSwitchboard::risk_engine_queue_execute(),
2661 risk_handler,
2662 );
2663
2664 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2665 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2666 msgbus::register_trading_command_endpoint(
2667 MessagingSwitchboard::exec_engine_queue_execute(),
2668 exec_handler,
2669 );
2670
2671 let order = OrderAny::Market(MarketOrder::new(
2672 TraderId::from("TRADER-001"),
2673 StrategyId::from("TEST-001"),
2674 InstrumentId::from("BTCUSDT.BINANCE"),
2675 ClientOrderId::from("O-20250208-0003"),
2676 OrderSide::Buy,
2677 Quantity::from(100_000),
2678 TimeInForce::Gtc,
2679 UUID4::new(),
2680 UnixNanos::default(),
2681 false,
2682 false,
2683 None,
2684 None,
2685 None,
2686 None,
2687 None,
2688 None,
2689 None,
2690 None,
2691 ));
2692 add_order_to_cache(&strategy, &order);
2693
2694 strategy
2695 .modify_order(
2696 order.client_order_id(),
2697 Some(Quantity::from(200_000)),
2698 None,
2699 None,
2700 None,
2701 None,
2702 )
2703 .unwrap();
2704
2705 let risk_messages = risk_messages.get_messages();
2706 let exec_messages = exec_messages.get_messages();
2707
2708 assert_eq!(risk_messages.len(), 1);
2709 assert!(matches!(
2710 risk_messages.first(),
2711 Some(TradingCommand::ModifyOrder(_))
2712 ));
2713 assert!(exec_messages.is_empty());
2714 }
2715
2716 #[rstest]
2717 fn test_modify_order_marks_order_pending_update_locally_before_send() {
2718 let mut strategy = create_test_strategy();
2719 register_strategy(&mut strategy);
2720
2721 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2722 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2723 msgbus::register_trading_command_endpoint(
2724 MessagingSwitchboard::risk_engine_queue_execute(),
2725 risk_handler,
2726 );
2727
2728 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2729 get_typed_message_saving_handler(Some(Ustr::from("events.order.pending_update")));
2730 let order = make_accepted_limit_order("O-20250208-UPDATE-001");
2731 let topic = format!("events.order.{}", order.strategy_id());
2732 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2733 add_order_to_cache(&strategy, &order);
2734
2735 strategy
2736 .modify_order(
2737 order.client_order_id(),
2738 None,
2739 Some(Price::from("51000.0")),
2740 None,
2741 None,
2742 None,
2743 )
2744 .unwrap();
2745
2746 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2747
2748 {
2749 let cache = strategy.core.cache();
2750 let cached_order = cache.order(&order.client_order_id()).unwrap();
2751 assert_eq!(cached_order.status(), OrderStatus::PendingUpdate);
2752 }
2753
2754 let risk_messages = risk_messages.get_messages();
2755 assert_eq!(risk_messages.len(), 1);
2756 assert!(matches!(
2757 risk_messages.first(),
2758 Some(TradingCommand::ModifyOrder(_))
2759 ));
2760
2761 let event_messages = event_messages.get_messages();
2762 assert_eq!(event_messages.len(), 1);
2763 assert!(matches!(
2764 event_messages.first(),
2765 Some(OrderEventAny::PendingUpdate(_))
2766 ));
2767 }
2768
2769 #[rstest]
2770 fn test_cancel_order_marks_order_pending_cancel_locally_before_send() {
2771 let mut strategy = create_test_strategy();
2772 register_strategy(&mut strategy);
2773
2774 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2775 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2776 msgbus::register_trading_command_endpoint(
2777 MessagingSwitchboard::exec_engine_queue_execute(),
2778 exec_handler,
2779 );
2780
2781 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2782 get_typed_message_saving_handler(Some(Ustr::from("events.order.pending_cancel")));
2783 let order = make_accepted_market_order("O-20250208-CANCEL-001");
2784 let topic = format!("events.order.{}", order.strategy_id());
2785 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2786 add_order_to_cache(&strategy, &order);
2787
2788 strategy
2789 .cancel_order(order.client_order_id(), None, None)
2790 .unwrap();
2791
2792 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2793
2794 {
2795 let cache = strategy.core.cache();
2796 let cached_order = cache.order(&order.client_order_id()).unwrap();
2797 assert_eq!(cached_order.status(), OrderStatus::PendingCancel);
2798 assert!(cache.is_order_pending_cancel_local(&order.client_order_id()));
2799 }
2800
2801 let exec_messages = exec_messages.get_messages();
2802 assert_eq!(exec_messages.len(), 1);
2803 assert!(matches!(
2804 exec_messages.first(),
2805 Some(TradingCommand::CancelOrder(_))
2806 ));
2807
2808 let event_messages = event_messages.get_messages();
2809 assert_eq!(event_messages.len(), 1);
2810 assert!(matches!(
2811 event_messages.first(),
2812 Some(OrderEventAny::PendingCancel(_))
2813 ));
2814 }
2815
2816 #[rstest]
2817 fn test_cancel_orders_marks_orders_pending_cancel_locally_before_send() {
2818 let mut strategy = create_test_strategy();
2819 register_strategy(&mut strategy);
2820
2821 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2822 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2823 msgbus::register_trading_command_endpoint(
2824 MessagingSwitchboard::exec_engine_queue_execute(),
2825 exec_handler,
2826 );
2827
2828 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2829 get_typed_message_saving_handler(Some(Ustr::from("events.order.batch_pending_cancel")));
2830 let order1 = make_accepted_market_order("O-20250208-CANCEL-001");
2831 let order2 = make_accepted_market_order("O-20250208-CANCEL-002");
2832 let topic = format!("events.order.{}", order1.strategy_id());
2833 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2834 add_order_to_cache(&strategy, &order1);
2835 add_order_to_cache(&strategy, &order2);
2836
2837 strategy
2838 .cancel_orders(
2839 vec![order1.client_order_id(), order2.client_order_id()],
2840 None,
2841 None,
2842 )
2843 .unwrap();
2844
2845 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2846
2847 {
2848 let cache = strategy.core.cache();
2849 let cached_order1 = cache.order(&order1.client_order_id()).unwrap();
2850 let cached_order2 = cache.order(&order2.client_order_id()).unwrap();
2851 assert_eq!(cached_order1.status(), OrderStatus::PendingCancel);
2852 assert_eq!(cached_order2.status(), OrderStatus::PendingCancel);
2853 assert!(cache.is_order_pending_cancel_local(&order1.client_order_id()));
2854 assert!(cache.is_order_pending_cancel_local(&order2.client_order_id()));
2855 }
2856
2857 let exec_messages = exec_messages.get_messages();
2858 assert_eq!(exec_messages.len(), 1);
2859 let Some(TradingCommand::BatchCancelOrders(command)) = exec_messages.first() else {
2860 panic!("expected BatchCancelOrders command");
2861 };
2862 assert_eq!(command.cancels.len(), 2);
2863
2864 let event_messages = event_messages.get_messages();
2865 assert_eq!(event_messages.len(), 2);
2866 assert!(
2867 event_messages
2868 .iter()
2869 .all(|event| matches!(event, OrderEventAny::PendingCancel(_)))
2870 );
2871 }
2872
2873 #[rstest]
2874 fn test_cancel_order_updates_own_book_status_before_send() {
2875 let mut strategy = create_test_strategy();
2876 register_strategy(&mut strategy);
2877
2878 let (exec_handler, _exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2879 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2880 msgbus::register_trading_command_endpoint(
2881 MessagingSwitchboard::exec_engine_queue_execute(),
2882 exec_handler,
2883 );
2884
2885 let order = make_accepted_limit_order("O-20250208-CANCEL-OWN-BOOK-001");
2886 add_order_to_cache_and_own_book(&strategy, &order);
2887
2888 strategy
2889 .cancel_order(order.client_order_id(), None, None)
2890 .unwrap();
2891
2892 let mut accepted = AHashSet::new();
2893 accepted.insert(OrderStatus::Accepted);
2894 let mut pending_cancel = AHashSet::new();
2895 pending_cancel.insert(OrderStatus::PendingCancel);
2896
2897 let cache = strategy.core.cache();
2898 let own_book = cache.own_order_book(&order.instrument_id()).unwrap();
2899 assert!(own_book.bids_as_map(Some(&accepted), None, None).is_empty());
2900 let pending_bids = own_book.bids_as_map(Some(&pending_cancel), None, None);
2901 assert_eq!(pending_bids.values().map(Vec::len).sum::<usize>(), 1);
2902 }
2903
2904 #[rstest]
2905 fn test_cancel_order_returns_error_when_not_in_cache() {
2906 let mut strategy = create_test_strategy();
2907 register_strategy(&mut strategy);
2908
2909 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2910 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2911 msgbus::register_trading_command_endpoint(
2912 MessagingSwitchboard::exec_engine_queue_execute(),
2913 exec_handler,
2914 );
2915
2916 let missing_id = ClientOrderId::from("O-MISSING");
2917 let err = strategy
2918 .cancel_order(missing_id, None, None)
2919 .expect_err("expected cancel_order to fail when order is not in cache");
2920
2921 assert!(
2922 err.to_string().contains("not found in cache"),
2923 "unexpected error: {err}"
2924 );
2925 assert!(exec_messages.get_messages().is_empty());
2926 }
2927
2928 #[rstest]
2929 fn test_modify_order_returns_error_when_not_in_cache() {
2930 let mut strategy = create_test_strategy();
2931 register_strategy(&mut strategy);
2932
2933 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2934 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2935 msgbus::register_trading_command_endpoint(
2936 MessagingSwitchboard::risk_engine_queue_execute(),
2937 risk_handler,
2938 );
2939
2940 let missing_id = ClientOrderId::from("O-MISSING");
2941 let err = strategy
2942 .modify_order(missing_id, Some(Quantity::from(1)), None, None, None, None)
2943 .expect_err("expected modify_order to fail when order is not in cache");
2944
2945 assert!(
2946 err.to_string().contains("not found in cache"),
2947 "unexpected error: {err}"
2948 );
2949 assert!(risk_messages.get_messages().is_empty());
2950 }
2951
2952 #[rstest]
2953 fn test_cancel_orders_returns_error_when_any_id_missing() {
2954 let mut strategy = create_test_strategy();
2955 register_strategy(&mut strategy);
2956
2957 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2958 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2959 msgbus::register_trading_command_endpoint(
2960 MessagingSwitchboard::exec_engine_queue_execute(),
2961 exec_handler,
2962 );
2963
2964 let order = make_accepted_limit_order("O-PRESENT");
2965 add_order_to_cache(&strategy, &order);
2966
2967 let err = strategy
2968 .cancel_orders(
2969 vec![order.client_order_id(), ClientOrderId::from("O-MISSING")],
2970 None,
2971 None,
2972 )
2973 .expect_err("expected cancel_orders to fail when any id is missing");
2974
2975 assert!(
2976 err.to_string().contains("not found in cache"),
2977 "unexpected error: {err}"
2978 );
2979 assert!(exec_messages.get_messages().is_empty());
2980 }
2981
2982 #[rstest]
2985 fn test_has_gtd_expiry_timer_when_timer_not_set() {
2986 let mut strategy = create_test_strategy();
2987 let client_order_id = ClientOrderId::from("O-001");
2988
2989 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
2990 }
2991
2992 #[rstest]
2993 fn test_has_gtd_expiry_timer_when_timer_set() {
2994 let mut strategy = create_test_strategy();
2995 let client_order_id = ClientOrderId::from("O-001");
2996
2997 strategy
2998 .core
2999 .gtd_timers
3000 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3001
3002 assert!(strategy.has_gtd_expiry_timer(&client_order_id));
3003 }
3004
3005 #[rstest]
3006 fn test_cancel_gtd_expiry_removes_timer() {
3007 let mut strategy = create_test_strategy();
3008 register_strategy(&mut strategy);
3009
3010 let client_order_id = ClientOrderId::from("O-001");
3011 strategy
3012 .core
3013 .gtd_timers
3014 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3015
3016 strategy.cancel_gtd_expiry(&client_order_id);
3017
3018 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3019 }
3020
3021 #[rstest]
3022 fn test_cancel_gtd_expiry_when_timer_not_set() {
3023 let mut strategy = create_test_strategy();
3024 register_strategy(&mut strategy);
3025
3026 let client_order_id = ClientOrderId::from("O-001");
3027
3028 strategy.cancel_gtd_expiry(&client_order_id);
3029
3030 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3031 }
3032
3033 #[rstest]
3034 #[case::filled(make_filled)]
3035 #[case::canceled(make_canceled)]
3036 #[case::rejected(make_rejected)]
3037 #[case::expired(make_expired)]
3038 fn test_handle_order_event_cancels_gtd_timer_for_terminal_event(
3039 #[case] make_event: fn(ClientOrderId) -> OrderEventAny,
3040 ) {
3041 let mut strategy = create_test_strategy();
3042 register_strategy(&mut strategy);
3043 start_strategy(&mut strategy);
3044
3045 let client_order_id = ClientOrderId::from("O-001");
3046 strategy
3047 .core
3048 .gtd_timers
3049 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3050
3051 strategy.handle_order_event(make_event(client_order_id));
3052
3053 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3054 }
3055
3056 #[rstest]
3057 #[case::filled(make_filled)]
3058 #[case::canceled(make_canceled)]
3059 #[case::rejected(make_rejected)]
3060 #[case::expired(make_expired)]
3061 fn test_handle_order_event_cancels_gtd_timer_when_stopped(
3062 #[case] make_event: fn(ClientOrderId) -> OrderEventAny,
3063 ) {
3064 let mut strategy = create_test_strategy();
3065 register_strategy(&mut strategy);
3066 start_strategy(&mut strategy);
3067
3068 let client_order_id = ClientOrderId::from("O-001");
3069 strategy
3070 .core
3071 .gtd_timers
3072 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3073
3074 stop_strategy(&mut strategy);
3075 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3076
3077 strategy.handle_order_event(make_event(client_order_id));
3078
3079 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3080 }
3081
3082 #[rstest]
3083 fn test_handle_order_event_skips_gtd_cancel_for_non_terminal() {
3084 let mut strategy = create_test_strategy();
3085 register_strategy(&mut strategy);
3086 start_strategy(&mut strategy);
3087
3088 let client_order_id = ClientOrderId::from("O-001");
3089 strategy
3090 .core
3091 .gtd_timers
3092 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3093
3094 strategy.handle_order_event(make_accepted(client_order_id));
3095
3096 assert!(strategy.has_gtd_expiry_timer(&client_order_id));
3097 }
3098
3099 #[rstest]
3100 fn test_handle_order_event_skips_dispatch_when_stopped() {
3101 let mut strategy = create_test_strategy();
3102 register_strategy(&mut strategy);
3103 start_strategy(&mut strategy);
3104 stop_strategy(&mut strategy);
3105 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3106
3107 strategy.handle_order_event(make_rejected(ClientOrderId::from("O-001")));
3108
3109 assert!(!strategy.on_order_rejected_called);
3110 }
3111
3112 #[rstest]
3113 fn test_on_start_calls_reactivate_gtd_timers_when_enabled() {
3114 let config = StrategyConfig {
3115 strategy_id: Some(StrategyId::from("TEST-001")),
3116 order_id_tag: Some("001".to_string()),
3117 manage_gtd_expiry: true,
3118 ..Default::default()
3119 };
3120 let mut strategy = TestStrategy::new(config);
3121 register_strategy(&mut strategy);
3122
3123 let result = Strategy::on_start(&mut strategy);
3124 assert!(result.is_ok());
3125 }
3126
3127 #[rstest]
3128 fn test_on_start_does_not_panic_when_gtd_disabled() {
3129 let config = StrategyConfig {
3130 strategy_id: Some(StrategyId::from("TEST-001")),
3131 order_id_tag: Some("001".to_string()),
3132 manage_gtd_expiry: false,
3133 ..Default::default()
3134 };
3135 let mut strategy = TestStrategy::new(config);
3136 register_strategy(&mut strategy);
3137
3138 let result = Strategy::on_start(&mut strategy);
3139 assert!(result.is_ok());
3140 }
3141
3142 #[rstest]
3145 fn test_query_account_when_registered() {
3146 let mut strategy = create_test_strategy();
3147 register_strategy(&mut strategy);
3148
3149 let account_id = AccountId::from("ACC-001");
3150
3151 let result = strategy.query_account(account_id, None, None);
3152
3153 assert!(result.is_ok());
3154 }
3155
3156 #[rstest]
3157 fn test_query_account_with_client_id() {
3158 let mut strategy = create_test_strategy();
3159 register_strategy(&mut strategy);
3160
3161 let account_id = AccountId::from("ACC-001");
3162 let client_id = ClientId::from("BINANCE");
3163
3164 let result = strategy.query_account(account_id, Some(client_id), None);
3165
3166 assert!(result.is_ok());
3167 }
3168
3169 #[rstest]
3170 fn test_query_order_when_registered() {
3171 let mut strategy = create_test_strategy();
3172 register_strategy(&mut strategy);
3173
3174 let order = OrderAny::Market(MarketOrder::test_default());
3175
3176 let result = strategy.query_order(&order, None, None);
3177
3178 assert!(result.is_ok());
3179 }
3180
3181 #[rstest]
3182 fn test_query_order_with_client_id() {
3183 let mut strategy = create_test_strategy();
3184 register_strategy(&mut strategy);
3185
3186 let order = OrderAny::Market(MarketOrder::test_default());
3187 let client_id = ClientId::from("BINANCE");
3188
3189 let result = strategy.query_order(&order, Some(client_id), None);
3190
3191 assert!(result.is_ok());
3192 }
3193
3194 #[rstest]
3195 fn test_is_exiting_returns_false_by_default() {
3196 let strategy = create_test_strategy();
3197 assert!(!strategy.is_exiting());
3198 }
3199
3200 #[rstest]
3201 fn test_is_exiting_returns_true_when_set_manually() {
3202 let mut strategy = create_test_strategy();
3203 register_strategy(&mut strategy);
3204
3205 strategy.core.is_exiting = true;
3207
3208 assert!(strategy.is_exiting());
3209 }
3210
3211 #[rstest]
3212 fn test_market_exit_sets_is_exiting_flag() {
3213 let mut strategy = create_test_strategy();
3215 register_strategy(&mut strategy);
3216
3217 assert!(!strategy.core.is_exiting);
3218
3219 strategy.core.is_exiting = true;
3221 strategy.core.market_exit_attempts = 0;
3222
3223 assert!(strategy.core.is_exiting);
3224 assert_eq!(strategy.core.market_exit_attempts, 0);
3225 }
3226
3227 #[rstest]
3228 fn test_market_exit_uses_config_time_in_force_and_reduce_only() {
3229 let config = StrategyConfig {
3230 strategy_id: Some(StrategyId::from("TEST-001")),
3231 order_id_tag: Some("001".to_string()),
3232 market_exit_time_in_force: TimeInForce::Ioc,
3233 market_exit_reduce_only: false,
3234 ..Default::default()
3235 };
3236 let strategy = TestStrategy::new(config);
3237
3238 assert_eq!(
3239 strategy.core.config.market_exit_time_in_force,
3240 TimeInForce::Ioc
3241 );
3242 assert!(!strategy.core.config.market_exit_reduce_only);
3243 }
3244
3245 #[rstest]
3246 fn test_market_exit_resets_attempt_counter() {
3247 let mut strategy = create_test_strategy();
3248 register_strategy(&mut strategy);
3249
3250 strategy.core.market_exit_attempts = 50;
3252
3253 strategy.core.reset_market_exit_state();
3255
3256 assert_eq!(strategy.core.market_exit_attempts, 0);
3257 }
3258
3259 #[rstest]
3260 fn test_market_exit_second_call_returns_early_when_exiting() {
3261 let mut strategy = create_test_strategy();
3262 register_strategy(&mut strategy);
3263
3264 strategy.core.is_exiting = true;
3266
3267 let result = strategy.market_exit();
3269 assert!(result.is_ok());
3270 assert!(strategy.core.is_exiting);
3271 }
3272
3273 #[rstest]
3274 fn test_finalize_market_exit_resets_state() {
3275 let mut strategy = create_test_strategy();
3276 register_strategy(&mut strategy);
3277
3278 strategy.core.is_exiting = true;
3280 strategy.core.pending_stop = true;
3281 strategy.core.market_exit_attempts = 50;
3282
3283 strategy.finalize_market_exit();
3284
3285 assert!(!strategy.core.is_exiting);
3286 assert!(!strategy.core.pending_stop);
3287 assert_eq!(strategy.core.market_exit_attempts, 0);
3288 }
3289
3290 #[rstest]
3291 fn test_market_exit_config_defaults() {
3292 let config = StrategyConfig::default();
3293
3294 assert!(!config.manage_stop);
3295 assert_eq!(config.market_exit_interval_ms, 100);
3296 assert_eq!(config.market_exit_max_attempts, 100);
3297 }
3298
3299 #[rstest]
3300 fn test_market_exit_with_custom_config() {
3301 let config = StrategyConfig {
3302 strategy_id: Some(StrategyId::from("TEST-001")),
3303 manage_stop: true,
3304 market_exit_interval_ms: 50,
3305 market_exit_max_attempts: 200,
3306 ..Default::default()
3307 };
3308 let strategy = TestStrategy::new(config);
3309
3310 assert!(strategy.core.config.manage_stop);
3311 assert_eq!(strategy.core.config.market_exit_interval_ms, 50);
3312 assert_eq!(strategy.core.config.market_exit_max_attempts, 200);
3313 }
3314
3315 #[derive(Debug)]
3316 struct MarketExitHookTrackingStrategy {
3317 core: StrategyCore,
3318 on_market_exit_called: bool,
3319 post_market_exit_called: bool,
3320 }
3321
3322 impl MarketExitHookTrackingStrategy {
3323 fn new(config: StrategyConfig) -> Self {
3324 Self {
3325 core: StrategyCore::new(config),
3326 on_market_exit_called: false,
3327 post_market_exit_called: false,
3328 }
3329 }
3330 }
3331
3332 impl DataActor for MarketExitHookTrackingStrategy {}
3333
3334 nautilus_strategy!(MarketExitHookTrackingStrategy, {
3335 fn on_market_exit(&mut self) {
3336 self.on_market_exit_called = true;
3337 }
3338
3339 fn post_market_exit(&mut self) {
3340 self.post_market_exit_called = true;
3341 }
3342 });
3343
3344 #[rstest]
3345 fn test_market_exit_calls_on_market_exit_hook() {
3346 let config = StrategyConfig {
3347 strategy_id: Some(StrategyId::from("TEST-001")),
3348 order_id_tag: Some("001".to_string()),
3349 ..Default::default()
3350 };
3351 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3352
3353 let trader_id = TraderId::from("TRADER-001");
3354 let clock = Rc::new(RefCell::new(TestClock::new()));
3355 let cache = Rc::new(RefCell::new(Cache::default()));
3356 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3357 cache.clone(),
3358 clock.clone(),
3359 None,
3360 )));
3361 strategy
3362 .core
3363 .register(trader_id, clock, cache, portfolio)
3364 .unwrap();
3365 strategy.initialize().unwrap();
3366 strategy.start().unwrap();
3367
3368 let _ = strategy.market_exit();
3369
3370 assert!(strategy.on_market_exit_called);
3371 }
3372
3373 #[rstest]
3374 fn test_finalize_market_exit_calls_post_market_exit_hook() {
3375 let config = StrategyConfig {
3376 strategy_id: Some(StrategyId::from("TEST-001")),
3377 order_id_tag: Some("001".to_string()),
3378 ..Default::default()
3379 };
3380 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3381
3382 let trader_id = TraderId::from("TRADER-001");
3383 let clock = Rc::new(RefCell::new(TestClock::new()));
3384 let cache = Rc::new(RefCell::new(Cache::default()));
3385 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3386 cache.clone(),
3387 clock.clone(),
3388 None,
3389 )));
3390 strategy
3391 .core
3392 .register(trader_id, clock, cache, portfolio)
3393 .unwrap();
3394
3395 strategy.core.is_exiting = true;
3396 strategy.finalize_market_exit();
3397
3398 assert!(strategy.post_market_exit_called);
3399 }
3400
3401 #[derive(Debug)]
3402 struct FailingPostExitStrategy {
3403 core: StrategyCore,
3404 }
3405
3406 impl FailingPostExitStrategy {
3407 fn new(config: StrategyConfig) -> Self {
3408 Self {
3409 core: StrategyCore::new(config),
3410 }
3411 }
3412 }
3413
3414 impl DataActor for FailingPostExitStrategy {}
3415
3416 nautilus_strategy!(FailingPostExitStrategy, {
3417 fn post_market_exit(&mut self) {
3418 panic!("Simulated error in post_market_exit");
3419 }
3420 });
3421
3422 #[rstest]
3423 fn test_finalize_market_exit_handles_hook_panic() {
3424 let config = StrategyConfig {
3425 strategy_id: Some(StrategyId::from("TEST-001")),
3426 order_id_tag: Some("001".to_string()),
3427 ..Default::default()
3428 };
3429 let mut strategy = FailingPostExitStrategy::new(config);
3430
3431 let trader_id = TraderId::from("TRADER-001");
3432 let clock = Rc::new(RefCell::new(TestClock::new()));
3433 let cache = Rc::new(RefCell::new(Cache::default()));
3434 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3435 cache.clone(),
3436 clock.clone(),
3437 None,
3438 )));
3439 strategy
3440 .core
3441 .register(trader_id, clock, cache, portfolio)
3442 .unwrap();
3443
3444 strategy.core.is_exiting = true;
3445 strategy.core.pending_stop = true;
3446
3447 strategy.finalize_market_exit();
3449
3450 assert!(!strategy.core.is_exiting);
3452 assert!(!strategy.core.pending_stop);
3453 }
3454
3455 #[rstest]
3456 fn test_check_market_exit_increments_attempts_before_finalizing() {
3457 let mut strategy = create_test_strategy();
3458 register_strategy(&mut strategy);
3459
3460 strategy.core.is_exiting = true;
3461 assert_eq!(strategy.core.market_exit_attempts, 0);
3462
3463 let event = TimeEvent::new(
3464 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3465 UUID4::new(),
3466 UnixNanos::default(),
3467 UnixNanos::default(),
3468 );
3469 strategy.check_market_exit(event);
3470
3471 assert!(!strategy.core.is_exiting);
3475 assert_eq!(strategy.core.market_exit_attempts, 0);
3476 }
3477
3478 #[rstest]
3479 fn test_check_market_exit_finalizes_when_max_attempts_reached() {
3480 let config = StrategyConfig {
3481 strategy_id: Some(StrategyId::from("TEST-001")),
3482 order_id_tag: Some("001".to_string()),
3483 market_exit_max_attempts: 3,
3484 ..Default::default()
3485 };
3486 let mut strategy = TestStrategy::new(config);
3487 register_strategy(&mut strategy);
3488
3489 strategy.core.is_exiting = true;
3490 strategy.core.market_exit_attempts = 2; let event = TimeEvent::new(
3493 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3494 UUID4::new(),
3495 UnixNanos::default(),
3496 UnixNanos::default(),
3497 );
3498 strategy.check_market_exit(event);
3499
3500 assert!(!strategy.core.is_exiting);
3502 assert_eq!(strategy.core.market_exit_attempts, 0);
3503 }
3504
3505 #[rstest]
3506 fn test_check_market_exit_finalizes_when_no_orders_or_positions() {
3507 let mut strategy = create_test_strategy();
3508 register_strategy(&mut strategy);
3509
3510 strategy.core.is_exiting = true;
3511
3512 let event = TimeEvent::new(
3513 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3514 UUID4::new(),
3515 UnixNanos::default(),
3516 UnixNanos::default(),
3517 );
3518 strategy.check_market_exit(event);
3519
3520 assert!(!strategy.core.is_exiting);
3522 }
3523
3524 #[rstest]
3525 fn test_market_exit_timer_name_format() {
3526 let config = StrategyConfig {
3527 strategy_id: Some(StrategyId::from("MY-STRATEGY-001")),
3528 ..Default::default()
3529 };
3530 let strategy = TestStrategy::new(config);
3531
3532 assert_eq!(
3533 strategy.core.market_exit_timer_name.as_str(),
3534 "MARKET_EXIT_CHECK:MY-STRATEGY-001"
3535 );
3536 }
3537
3538 #[rstest]
3539 fn test_reset_market_exit_state() {
3540 let mut strategy = create_test_strategy();
3541
3542 strategy.core.is_exiting = true;
3543 strategy.core.pending_stop = true;
3544 strategy.core.market_exit_attempts = 50;
3545
3546 strategy.core.reset_market_exit_state();
3547
3548 assert!(!strategy.core.is_exiting);
3549 assert!(!strategy.core.pending_stop);
3550 assert_eq!(strategy.core.market_exit_attempts, 0);
3551 }
3552
3553 #[rstest]
3554 fn test_cancel_market_exit_resets_state_without_hooks() {
3555 let config = StrategyConfig {
3556 strategy_id: Some(StrategyId::from("TEST-001")),
3557 order_id_tag: Some("001".to_string()),
3558 ..Default::default()
3559 };
3560 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3561
3562 let trader_id = TraderId::from("TRADER-001");
3563 let clock = Rc::new(RefCell::new(TestClock::new()));
3564 let cache = Rc::new(RefCell::new(Cache::default()));
3565 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3566 cache.clone(),
3567 clock.clone(),
3568 None,
3569 )));
3570 strategy
3571 .core
3572 .register(trader_id, clock, cache, portfolio)
3573 .unwrap();
3574
3575 strategy.core.is_exiting = true;
3577 strategy.core.pending_stop = true;
3578 strategy.core.market_exit_attempts = 50;
3579
3580 strategy.cancel_market_exit();
3582
3583 assert!(!strategy.core.is_exiting);
3585 assert!(!strategy.core.pending_stop);
3586 assert_eq!(strategy.core.market_exit_attempts, 0);
3587
3588 assert!(!strategy.on_market_exit_called);
3590 assert!(!strategy.post_market_exit_called);
3591 }
3592
3593 #[rstest]
3594 fn test_market_exit_returns_early_when_not_running() {
3595 let mut strategy = create_test_strategy();
3596 register_strategy(&mut strategy);
3597
3598 assert_ne!(strategy.core.actor.state(), ComponentState::Running);
3600
3601 let result = strategy.market_exit();
3602
3603 assert!(result.is_ok());
3605 assert!(!strategy.core.is_exiting);
3606 }
3607
3608 #[rstest]
3609 fn test_stop_with_manage_stop_false_cleans_up_active_exit() {
3610 let config = StrategyConfig {
3611 strategy_id: Some(StrategyId::from("TEST-001")),
3612 order_id_tag: Some("001".to_string()),
3613 manage_stop: false,
3614 ..Default::default()
3615 };
3616 let mut strategy = TestStrategy::new(config);
3617 register_strategy(&mut strategy);
3618
3619 strategy.core.is_exiting = true;
3621 strategy.core.market_exit_attempts = 5;
3622
3623 let should_proceed = Strategy::stop(&mut strategy);
3625
3626 assert!(should_proceed);
3628 assert!(!strategy.core.is_exiting);
3629 assert_eq!(strategy.core.market_exit_attempts, 0);
3630 }
3631
3632 #[rstest]
3633 fn test_stop_with_manage_stop_true_defers_when_running() {
3634 let config = StrategyConfig {
3635 strategy_id: Some(StrategyId::from("TEST-001")),
3636 order_id_tag: Some("001".to_string()),
3637 manage_stop: true,
3638 ..Default::default()
3639 };
3640 let mut strategy = TestStrategy::new(config);
3641
3642 let trader_id = TraderId::from("TRADER-001");
3644 let clock = Rc::new(RefCell::new(TestClock::new()));
3645 clock
3646 .borrow_mut()
3647 .register_default_handler(TimeEventCallback::from(|_event: TimeEvent| {}));
3648 let cache = Rc::new(RefCell::new(Cache::default()));
3649 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3650 cache.clone(),
3651 clock.clone(),
3652 None,
3653 )));
3654 strategy
3655 .core
3656 .register(trader_id, clock, cache, portfolio)
3657 .unwrap();
3658 strategy.initialize().unwrap();
3659 strategy.start().unwrap();
3660
3661 let should_proceed = Strategy::stop(&mut strategy);
3662
3663 assert!(!should_proceed);
3665 assert!(strategy.core.pending_stop);
3666 }
3667
3668 #[rstest]
3669 fn test_stop_with_manage_stop_true_returns_early_if_pending() {
3670 let config = StrategyConfig {
3671 strategy_id: Some(StrategyId::from("TEST-001")),
3672 order_id_tag: Some("001".to_string()),
3673 manage_stop: true,
3674 ..Default::default()
3675 };
3676 let mut strategy = TestStrategy::new(config);
3677 register_strategy(&mut strategy);
3678 start_strategy(&mut strategy);
3679 strategy.core.pending_stop = true;
3680
3681 let should_proceed = Strategy::stop(&mut strategy);
3683
3684 assert!(!should_proceed);
3686 assert!(strategy.core.pending_stop);
3687 }
3688
3689 #[rstest]
3690 fn test_stop_with_manage_stop_true_proceeds_when_not_running() {
3691 let config = StrategyConfig {
3692 strategy_id: Some(StrategyId::from("TEST-001")),
3693 order_id_tag: Some("001".to_string()),
3694 manage_stop: true,
3695 ..Default::default()
3696 };
3697 let mut strategy = TestStrategy::new(config);
3698 register_strategy(&mut strategy);
3699
3700 assert_ne!(strategy.core.actor.state(), ComponentState::Running);
3702
3703 let should_proceed = Strategy::stop(&mut strategy);
3704
3705 assert!(should_proceed);
3707 }
3708
3709 #[rstest]
3710 fn test_finalize_market_exit_stops_strategy_when_pending() {
3711 let config = StrategyConfig {
3712 strategy_id: Some(StrategyId::from("TEST-001")),
3713 order_id_tag: Some("001".to_string()),
3714 ..Default::default()
3715 };
3716 let mut strategy = TestStrategy::new(config);
3717 register_strategy(&mut strategy);
3718 start_strategy(&mut strategy);
3719
3720 strategy.core.is_exiting = true;
3722 strategy.core.pending_stop = true;
3723
3724 strategy.finalize_market_exit();
3725
3726 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3728 assert!(!strategy.core.is_exiting);
3729 assert!(!strategy.core.pending_stop);
3730 }
3731
3732 #[rstest]
3733 fn test_finalize_market_exit_stays_running_when_not_pending() {
3734 let config = StrategyConfig {
3735 strategy_id: Some(StrategyId::from("TEST-001")),
3736 order_id_tag: Some("001".to_string()),
3737 ..Default::default()
3738 };
3739 let mut strategy = TestStrategy::new(config);
3740 register_strategy(&mut strategy);
3741 start_strategy(&mut strategy);
3742
3743 strategy.core.is_exiting = true;
3745 strategy.core.pending_stop = false;
3746
3747 strategy.finalize_market_exit();
3748
3749 assert_eq!(strategy.core.actor.state(), ComponentState::Running);
3751 assert!(!strategy.core.is_exiting);
3752 }
3753
3754 #[rstest]
3755 fn test_submit_order_denied_during_market_exit_when_not_reduce_only() {
3756 let mut strategy = create_test_strategy();
3757 register_strategy(&mut strategy);
3758 start_strategy(&mut strategy);
3759 strategy.core.is_exiting = true;
3760
3761 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
3762 get_typed_message_saving_handler(Some(Ustr::from("events.order.denied")));
3763 let order = OrderAny::Market(MarketOrder::new(
3764 TraderId::from("TRADER-001"),
3765 StrategyId::from("TEST-001"),
3766 InstrumentId::from("BTCUSDT.BINANCE"),
3767 ClientOrderId::from("O-20250208-0001"),
3768 OrderSide::Buy,
3769 Quantity::from(100_000),
3770 TimeInForce::Gtc,
3771 UUID4::new(),
3772 UnixNanos::default(),
3773 false, false,
3775 None,
3776 None,
3777 None,
3778 None,
3779 None,
3780 None,
3781 None,
3782 None,
3783 ));
3784 let topic = format!("events.order.{}", order.strategy_id());
3785 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
3786 let client_order_id = order.client_order_id();
3787 let result = strategy.submit_order(order.clone(), None, None, None);
3788
3789 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
3790
3791 assert!(result.is_ok());
3792 let cache = strategy.core.cache();
3793 let cached_order = cache.order(&client_order_id).unwrap();
3794 assert_eq!(cached_order.status(), OrderStatus::Denied);
3795
3796 let event_messages = event_messages.get_messages();
3797 assert_eq!(event_messages.len(), 2);
3798 assert_eq!(
3799 event_messages[0],
3800 OrderEventAny::Initialized(order.init_event().clone())
3801 );
3802 let OrderEventAny::Denied(denied) = &event_messages[1] else {
3803 panic!("expected OrderDenied event");
3804 };
3805 assert_eq!(denied.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3806 }
3807
3808 #[rstest]
3809 fn test_submit_order_list_denied_during_market_exit_publishes_init_then_denied_events() {
3810 let mut strategy = create_test_strategy();
3811 register_strategy(&mut strategy);
3812 start_strategy(&mut strategy);
3813 strategy.core.is_exiting = true;
3814
3815 let orders = vec![
3816 make_initialized_market_order("O-20250208-LIST-DENY-001"),
3817 make_initialized_market_order("O-20250208-LIST-DENY-002"),
3818 ];
3819 let client_order_id1 = orders[0].client_order_id();
3820 let client_order_id2 = orders[1].client_order_id();
3821 let cache_rc = strategy.core.cache_rc();
3822 let timeline = Rc::new(RefCell::new(Vec::new()));
3823 let event_messages = Rc::new(RefCell::new(Vec::new()));
3824
3825 let event_handler = {
3826 let event_messages = event_messages.clone();
3827 let timeline = timeline.clone();
3828 TypedHandler::from_with_id("events.order.list_denied", move |event: &OrderEventAny| {
3829 match event {
3830 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
3831 assert!(cache_rc.borrow().order_exists(&client_order_id1));
3832 timeline.borrow_mut().push("init1");
3833 }
3834 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
3835 assert!(cache_rc.borrow().order_exists(&client_order_id2));
3836 timeline.borrow_mut().push("init2");
3837 }
3838 OrderEventAny::Denied(e) if e.client_order_id == client_order_id1 => {
3839 assert_eq!(e.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3840 let cache = cache_rc.borrow();
3841 let cached_order = cache.order(&client_order_id1).unwrap();
3842 assert_eq!(cached_order.status(), OrderStatus::Denied);
3843 timeline.borrow_mut().push("denied1");
3844 }
3845 OrderEventAny::Denied(e) if e.client_order_id == client_order_id2 => {
3846 assert_eq!(e.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3847 let cache = cache_rc.borrow();
3848 let cached_order = cache.order(&client_order_id2).unwrap();
3849 assert_eq!(cached_order.status(), OrderStatus::Denied);
3850 timeline.borrow_mut().push("denied2");
3851 }
3852 _ => panic!("unexpected order event {event:?}"),
3853 }
3854 event_messages.borrow_mut().push(event.clone());
3855 })
3856 };
3857 let risk_handler = {
3858 let timeline = timeline.clone();
3859 TypedIntoHandler::from_with_id(
3860 "RiskEngine.queue_execute",
3861 move |_command: TradingCommand| {
3862 timeline.borrow_mut().push("command");
3863 },
3864 )
3865 };
3866 msgbus::register_trading_command_endpoint(
3867 MessagingSwitchboard::risk_engine_queue_execute(),
3868 risk_handler,
3869 );
3870
3871 let topic = format!("events.order.{}", orders[0].strategy_id());
3872 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
3873 let result = strategy.submit_order_list(orders.clone(), None, None, None);
3874
3875 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
3876
3877 assert!(result.is_ok());
3878
3879 {
3880 let cache = strategy.core.cache();
3881 let cached_order1 = cache.order(&client_order_id1).unwrap();
3882 let cached_order2 = cache.order(&client_order_id2).unwrap();
3883 assert_eq!(cached_order1.status(), OrderStatus::Denied);
3884 assert_eq!(cached_order2.status(), OrderStatus::Denied);
3885 }
3886
3887 let event_messages = event_messages.borrow();
3888 assert_eq!(event_messages.len(), 4);
3889 assert_eq!(
3890 event_messages[0],
3891 OrderEventAny::Initialized(orders[0].init_event().clone())
3892 );
3893 assert!(matches!(
3894 &event_messages[1],
3895 OrderEventAny::Denied(e)
3896 if e.client_order_id == client_order_id1
3897 && e.reason == Ustr::from("MARKET_EXIT_IN_PROGRESS")
3898 ));
3899 assert_eq!(
3900 event_messages[2],
3901 OrderEventAny::Initialized(orders[1].init_event().clone())
3902 );
3903 assert!(matches!(
3904 &event_messages[3],
3905 OrderEventAny::Denied(e)
3906 if e.client_order_id == client_order_id2
3907 && e.reason == Ustr::from("MARKET_EXIT_IN_PROGRESS")
3908 ));
3909 assert_eq!(
3910 timeline.borrow().as_slice(),
3911 &["init1", "denied1", "init2", "denied2"]
3912 );
3913 }
3914
3915 #[rstest]
3916 fn test_submit_order_list_market_exit_rejects_non_initialized_without_events() {
3917 let mut strategy = create_test_strategy();
3918 register_strategy(&mut strategy);
3919 start_strategy(&mut strategy);
3920 strategy.core.is_exiting = true;
3921
3922 let order = make_accepted_market_order("O-20250208-LIST-DENY-ACCEPTED");
3923 let topic = format!("events.order.{}", order.strategy_id());
3924 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
3925 get_typed_message_saving_handler(Some(Ustr::from("events.order.list_invalid")));
3926
3927 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
3928 let result = strategy.submit_order_list(vec![order], None, None, None);
3929
3930 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
3931
3932 assert!(result.is_err());
3933 assert!(
3934 result
3935 .unwrap_err()
3936 .to_string()
3937 .contains("expected INITIALIZED")
3938 );
3939 assert!(event_messages.get_messages().is_empty());
3940 }
3941
3942 #[rstest]
3943 fn test_submit_order_allowed_during_market_exit_when_reduce_only() {
3944 let mut strategy = create_test_strategy();
3945 register_strategy(&mut strategy);
3946 start_strategy(&mut strategy);
3947 strategy.core.is_exiting = true;
3948
3949 let order = OrderAny::Market(MarketOrder::new(
3950 TraderId::from("TRADER-001"),
3951 StrategyId::from("TEST-001"),
3952 InstrumentId::from("BTCUSDT.BINANCE"),
3953 ClientOrderId::from("O-20250208-0001"),
3954 OrderSide::Buy,
3955 Quantity::from(100_000),
3956 TimeInForce::Gtc,
3957 UUID4::new(),
3958 UnixNanos::default(),
3959 true, false,
3961 None,
3962 None,
3963 None,
3964 None,
3965 None,
3966 None,
3967 None,
3968 None,
3969 ));
3970 let client_order_id = order.client_order_id();
3971 let result = strategy.submit_order(order, None, None, None);
3972
3973 assert!(result.is_ok());
3974 let cache = strategy.core.cache();
3975 let cached_order = cache.order(&client_order_id).unwrap();
3976 assert_ne!(cached_order.status(), OrderStatus::Denied);
3977 }
3978
3979 #[rstest]
3980 fn test_submit_order_allowed_during_market_exit_when_tagged() {
3981 let mut strategy = create_test_strategy();
3982 register_strategy(&mut strategy);
3983 start_strategy(&mut strategy);
3984 strategy.core.is_exiting = true;
3985
3986 let order = OrderAny::Market(MarketOrder::new(
3987 TraderId::from("TRADER-001"),
3988 StrategyId::from("TEST-001"),
3989 InstrumentId::from("BTCUSDT.BINANCE"),
3990 ClientOrderId::from("O-20250208-0002"),
3991 OrderSide::Buy,
3992 Quantity::from(100_000),
3993 TimeInForce::Gtc,
3994 UUID4::new(),
3995 UnixNanos::default(),
3996 false, false,
3998 None,
3999 None,
4000 None,
4001 None,
4002 None,
4003 None,
4004 None,
4005 Some(vec![Ustr::from("MARKET_EXIT")]),
4006 ));
4007 let client_order_id = order.client_order_id();
4008 let result = strategy.submit_order(order, None, None, None);
4009
4010 assert!(result.is_ok());
4011 let cache = strategy.core.cache();
4012 let cached_order = cache.order(&client_order_id).unwrap();
4013 assert_ne!(cached_order.status(), OrderStatus::Denied);
4014 }
4015
4016 #[derive(Debug)]
4017 struct MacroTestSimple {
4018 core: StrategyCore,
4019 }
4020
4021 nautilus_strategy!(MacroTestSimple);
4022
4023 impl DataActor for MacroTestSimple {}
4024
4025 #[derive(Debug)]
4026 struct MacroTestWithHooks {
4027 core: StrategyCore,
4028 }
4029
4030 nautilus_strategy!(MacroTestWithHooks, {
4031 fn on_order_rejected(&mut self, _event: OrderRejected) {}
4032 });
4033
4034 impl DataActor for MacroTestWithHooks {}
4035
4036 #[derive(Debug)]
4037 struct MacroTestCustomField {
4038 inner: StrategyCore,
4039 }
4040
4041 nautilus_strategy!(MacroTestCustomField, inner, {
4042 fn external_order_claims(&self) -> Option<Vec<InstrumentId>> {
4043 None
4044 }
4045 });
4046
4047 impl DataActor for MacroTestCustomField {}
4048
4049 #[rstest]
4050 fn test_nautilus_strategy_macro_forms() {
4051 let config = StrategyConfig {
4052 strategy_id: Some(StrategyId::from("MACRO-001")),
4053 order_id_tag: Some("001".to_string()),
4054 ..Default::default()
4055 };
4056
4057 let simple = MacroTestSimple {
4058 core: StrategyCore::new(config.clone()),
4059 };
4060 assert_eq!(simple.core().config.strategy_id, config.strategy_id);
4061
4062 let hooks = MacroTestWithHooks {
4063 core: StrategyCore::new(config.clone()),
4064 };
4065 assert_eq!(hooks.core().config.strategy_id, config.strategy_id);
4066
4067 let custom = MacroTestCustomField {
4068 inner: StrategyCore::new(config.clone()),
4069 };
4070 assert_eq!(custom.core().config.strategy_id, config.strategy_id);
4071 assert!(custom.external_order_claims().is_none());
4072 }
4073}