1pub mod config;
17pub mod core;
18
19pub use core::StrategyCore;
20use std::panic::{AssertUnwindSafe, catch_unwind};
21
22use ahash::AHashSet;
23pub use config::{ImportableStrategyConfig, StrategyConfig};
24use nautilus_common::{
25 actor::DataActor,
26 component::Component,
27 enums::ComponentState,
28 logging::{EVT, RECV},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
31 SubmitOrder, SubmitOrderList, TradingCommand,
32 },
33 msgbus,
34 timer::TimeEvent,
35};
36use nautilus_core::{Params, UUID4};
37use nautilus_model::{
38 enums::{OrderSide, OrderStatus, PositionSide, TimeInForce, TriggerType},
39 events::{
40 OrderAccepted, OrderCancelRejected, OrderDenied, OrderEmulated, OrderEventAny,
41 OrderExpired, OrderInitialized, OrderModifyRejected, OrderPendingCancel,
42 OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted, OrderTriggered,
43 OrderUpdated, PositionChanged, PositionClosed, PositionEvent, PositionOpened,
44 },
45 identifiers::{
46 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId,
47 },
48 orders::{
49 LIMIT_ORDER_TYPES, Order, OrderAny, OrderCore, OrderError, OrderList, STOP_ORDER_TYPES,
50 },
51 position::Position,
52 types::{Price, Quantity},
53};
54use ustr::Ustr;
55
56pub trait Strategy: DataActor {
88 fn core(&self) -> &StrategyCore;
92
93 fn core_mut(&mut self) -> &mut StrategyCore;
97
98 fn external_order_claims(&self) -> Option<Vec<InstrumentId>> {
103 None
104 }
105
106 fn submit_order(
112 &mut self,
113 order: OrderAny,
114 position_id: Option<PositionId>,
115 client_id: Option<ClientId>,
116 params: Option<Params>,
117 ) -> anyhow::Result<()> {
118 let core = self.core_mut();
119
120 let trader_id = registered_trader_id(core)?;
121 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
122 let ts_init = core.clock().timestamp_ns();
123
124 if order.status() != OrderStatus::Initialized {
125 anyhow::bail!(
126 "Order denied: invalid status for {}, expected INITIALIZED",
127 order.client_order_id()
128 );
129 }
130
131 let market_exit_tag = core.market_exit_tag;
132 let is_market_exit_order = order
133 .tags()
134 .is_some_and(|tags| tags.contains(&market_exit_tag));
135 let should_deny_for_market_exit =
136 core.is_exiting && !order.is_reduce_only() && !is_market_exit_order;
137
138 if should_deny_for_market_exit {
139 self.deny_order(&order, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
140 return Ok(());
141 }
142
143 let core = self.core_mut();
144 let params = params.filter(|params| !params.is_empty());
145
146 {
147 let cache_rc = core.cache_rc();
148 let mut cache = cache_rc.borrow_mut();
149 cache.add_order(order.clone(), position_id, client_id, true)?;
150 }
151
152 publish_order_initialized(&order);
153
154 let command = SubmitOrder::new(
155 trader_id,
156 client_id,
157 strategy_id,
158 order.instrument_id(),
159 order.client_order_id(),
160 order.init_event().clone(),
161 order.exec_algorithm_id(),
162 position_id,
163 params,
164 UUID4::new(),
165 ts_init,
166 None, );
168
169 let manager = core.order_manager();
170
171 if matches!(order.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger) {
172 manager.send_emulator_command(TradingCommand::SubmitOrder(command));
173 } else if let Some(exec_algorithm_id) = order.exec_algorithm_id() {
174 manager.send_algo_command(command, exec_algorithm_id);
175 } else {
176 manager.send_risk_command(TradingCommand::SubmitOrder(command));
177 }
178
179 self.set_gtd_expiry(&order)?;
180 Ok(())
181 }
182
183 fn submit_order_list(
190 &mut self,
191 mut orders: Vec<OrderAny>,
192 position_id: Option<PositionId>,
193 client_id: Option<ClientId>,
194 params: Option<Params>,
195 ) -> anyhow::Result<()> {
196 if orders.is_empty() {
197 log::error!("OrderList denied: no orders to submit");
198 anyhow::bail!("OrderList denied: no orders to submit");
199 }
200
201 for order in &orders {
202 if order.status() != OrderStatus::Initialized {
203 anyhow::bail!(
204 "Order in list denied: invalid status for {}, expected INITIALIZED",
205 order.client_order_id()
206 );
207 }
208 }
209
210 let first_venue = orders[0].instrument_id().venue;
211 for order in &orders {
212 if order.instrument_id().venue != first_venue {
213 anyhow::bail!(
214 "OrderList denied: orders must share the same venue; \
215 expected {first_venue}, found {} on {}",
216 order.instrument_id().venue,
217 order.client_order_id(),
218 );
219 }
220 }
221
222 let should_deny = {
223 let core = self.core_mut();
224 let tag = core.market_exit_tag;
225 core.is_exiting
226 && orders.iter().any(|o| {
227 !o.is_reduce_only() && !o.tags().is_some_and(|tags| tags.contains(&tag))
228 })
229 };
230
231 if should_deny {
232 self.deny_order_list(&orders, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
233 return Ok(());
234 }
235
236 let core = self.core_mut();
237
238 let trader_id = registered_trader_id(core)?;
239 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
240 let ts_init = core.clock().timestamp_ns();
241
242 let order_list = if orders.first().is_some_and(|o| o.order_list_id().is_some()) {
244 OrderList::from_orders(&orders, ts_init)
245 } else {
246 core.order_factory().create_list(&mut orders, ts_init)
247 };
248
249 if let Err(e) = order_list.validate() {
250 log::error!("OrderList denied: {e}");
251 anyhow::bail!("OrderList denied: {e}");
252 }
253
254 {
255 let cache_rc = core.cache_rc();
256 let cache = cache_rc.borrow();
257 if cache.order_list_exists(&order_list.id) {
258 anyhow::bail!("OrderList denied: duplicate {}", order_list.id);
259 }
260
261 for order in &orders {
262 if cache.order_exists(&order.client_order_id()) {
263 anyhow::bail!(
264 "Order in list denied: duplicate {}",
265 order.client_order_id()
266 );
267 }
268 }
269 }
270
271 {
272 let cache_rc = core.cache_rc();
273 let mut cache = cache_rc.borrow_mut();
274 cache.add_order_list(order_list.clone())?;
275 }
276
277 for order in &orders {
278 {
279 let cache_rc = core.cache_rc();
280 let mut cache = cache_rc.borrow_mut();
281 cache.add_order(order.clone(), position_id, client_id, true)?;
282 }
283
284 publish_order_initialized(order);
285 }
286
287 let params = params.filter(|params| !params.is_empty());
288
289 let first_order = orders.first();
290 let order_inits: Vec<_> = orders.iter().map(|o| o.init_event().clone()).collect();
291 let exec_algorithm_id = first_order.and_then(|o| o.exec_algorithm_id());
292
293 let command = SubmitOrderList::new(
294 trader_id,
295 client_id,
296 strategy_id,
297 order_list,
298 order_inits,
299 exec_algorithm_id,
300 position_id,
301 params,
302 UUID4::new(),
303 ts_init,
304 None, );
306
307 let has_emulated_order = orders.iter().any(|o| {
308 matches!(o.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger)
309 || o.is_emulated()
310 });
311
312 let manager = core.order_manager();
313
314 if has_emulated_order {
315 manager.send_emulator_command(TradingCommand::SubmitOrderList(command));
316 } else if let Some(algo_id) = exec_algorithm_id {
317 let endpoint = format!("{algo_id}.execute");
318 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrderList(command));
319 } else {
320 manager.send_risk_command(TradingCommand::SubmitOrderList(command));
321 }
322
323 for order in &orders {
324 self.set_gtd_expiry(order)?;
325 }
326
327 Ok(())
328 }
329
330 fn modify_order(
336 &mut self,
337 client_order_id: ClientOrderId,
338 quantity: Option<Quantity>,
339 price: Option<Price>,
340 trigger_price: Option<Price>,
341 client_id: Option<ClientId>,
342 params: Option<Params>,
343 ) -> anyhow::Result<()> {
344 let (trader_id, strategy_id) = {
345 let core = self.core_mut();
346 (
347 registered_trader_id(core)?,
348 StrategyId::from(core.actor_id().inner().as_str()),
349 )
350 };
351
352 let params = params.filter(|params| !params.is_empty());
353
354 let order = match self
356 .core_mut()
357 .cache_rc()
358 .borrow()
359 .order_owned(&client_order_id)
360 {
361 Some(order) => order,
362 None => anyhow::bail!("Cannot modify order: {client_order_id} not found in cache"),
363 };
364
365 let mut updating = false;
366
367 if quantity.is_some_and(|q| q != order.quantity()) {
368 updating = true;
369 }
370
371 if let Some(price) = price {
372 if !LIMIT_ORDER_TYPES.contains(&order.order_type()) {
373 anyhow::bail!("{} orders do not have a LIMIT price", order.order_type());
374 }
375
376 if Some(price) != order.price() {
377 updating = true;
378 }
379 }
380
381 if let Some(trigger_price) = trigger_price {
382 if !STOP_ORDER_TYPES.contains(&order.order_type()) {
383 anyhow::bail!(
384 "{} orders do not have a STOP trigger price",
385 order.order_type()
386 );
387 }
388
389 if Some(trigger_price) != order.trigger_price() {
390 updating = true;
391 }
392 }
393
394 if !updating {
395 log::error!(
396 "Cannot create command ModifyOrder: quantity, price and trigger were either None \
397 or the same as existing values"
398 );
399 return Ok(());
400 }
401
402 if order.is_closed() || order.is_pending_cancel() {
403 log::warn!(
404 "Cannot create command ModifyOrder: state is {:?}, {order:?}",
405 order.status()
406 );
407 return Ok(());
408 }
409
410 if !self.mark_order_pending_update(&order)? {
411 return Ok(());
412 }
413
414 let command = ModifyOrder::new(
415 trader_id,
416 client_id,
417 strategy_id,
418 order.instrument_id(),
419 order.client_order_id(),
420 order.venue_order_id(),
421 quantity,
422 price,
423 trigger_price,
424 UUID4::new(),
425 self.core_mut().clock().timestamp_ns(),
426 params,
427 None, );
429
430 let manager = self.core_mut().order_manager();
431
432 if order.is_emulated() {
433 manager.send_emulator_command(TradingCommand::ModifyOrder(command));
434 } else {
435 manager.send_risk_command(TradingCommand::ModifyOrder(command));
436 }
437 Ok(())
438 }
439
440 fn cancel_order(
446 &mut self,
447 client_order_id: ClientOrderId,
448 client_id: Option<ClientId>,
449 params: Option<Params>,
450 ) -> anyhow::Result<()> {
451 let (trader_id, strategy_id, ts_init) = {
452 let core = self.core_mut();
453 (
454 registered_trader_id(core)?,
455 StrategyId::from(core.actor_id().inner().as_str()),
456 core.clock().timestamp_ns(),
457 )
458 };
459
460 let params = params.filter(|params| !params.is_empty());
461
462 let order = match self
466 .core_mut()
467 .cache_rc()
468 .borrow()
469 .order_owned(&client_order_id)
470 {
471 Some(order) => order,
472 None => anyhow::bail!("Cannot cancel order: {client_order_id} not found in cache"),
473 };
474
475 if !self.mark_order_pending_cancel(&order)? {
476 return Ok(());
477 }
478
479 let command = CancelOrder::new(
480 trader_id,
481 client_id,
482 strategy_id,
483 order.instrument_id(),
484 order.client_order_id(),
485 order.venue_order_id(),
486 UUID4::new(),
487 ts_init,
488 params,
489 None, );
491
492 let manager = self.core_mut().order_manager();
493
494 if matches!(order.emulation_trigger(), Some(trigger) if trigger != TriggerType::NoTrigger)
495 || order.is_emulated()
496 {
497 manager.send_emulator_command(TradingCommand::CancelOrder(command));
498 } else if let Some(algo_id) = order
499 .exec_algorithm_id()
500 .filter(|_| order.is_active_local())
501 {
502 let endpoint = format!("{algo_id}.execute");
503 msgbus::send_any(endpoint.into(), &TradingCommand::CancelOrder(command));
504 } else {
505 manager.send_exec_command(TradingCommand::CancelOrder(command));
506 }
507
508 if self.core().config.manage_gtd_expiry
509 && order.time_in_force() == TimeInForce::Gtd
510 && self.has_gtd_expiry_timer(&order.client_order_id())
511 {
512 self.cancel_gtd_expiry(&order.client_order_id());
513 }
514
515 Ok(())
516 }
517
518 fn cancel_orders(
525 &mut self,
526 client_order_ids: Vec<ClientOrderId>,
527 client_id: Option<ClientId>,
528 params: Option<Params>,
529 ) -> anyhow::Result<()> {
530 if client_order_ids.is_empty() {
531 anyhow::bail!("Cannot batch cancel empty order list");
532 }
533
534 let (trader_id, strategy_id, ts_init) = {
535 let core = self.core_mut();
536 (
537 registered_trader_id(core)?,
538 StrategyId::from(core.actor_id().inner().as_str()),
539 core.clock().timestamp_ns(),
540 )
541 };
542
543 let orders: Vec<OrderAny> = {
545 let cache_rc = self.core_mut().cache_rc();
546 let cache = cache_rc.borrow();
547 client_order_ids
548 .iter()
549 .map(|id| {
550 cache.order_owned(id).ok_or_else(|| {
551 anyhow::anyhow!("Cannot cancel order: {id} not found in cache")
552 })
553 })
554 .collect::<Result<_, _>>()?
555 };
556
557 let instrument_id = orders[0].instrument_id();
558
559 for order in &orders {
560 if order.instrument_id() != instrument_id {
561 anyhow::bail!(
562 "Cannot batch cancel orders for different instruments: {} vs {}",
563 instrument_id,
564 order.instrument_id()
565 );
566 }
567
568 if order.is_emulated() || order.is_active_local() {
569 anyhow::bail!("Cannot include emulated or local orders in batch cancel");
570 }
571 }
572
573 let mut cancels = Vec::with_capacity(orders.len());
574
575 for order in orders {
576 if !self.mark_order_pending_cancel(&order)? {
577 continue;
578 }
579
580 cancels.push(CancelOrder::new(
581 trader_id,
582 client_id,
583 strategy_id,
584 instrument_id,
585 order.client_order_id(),
586 order.venue_order_id(),
587 UUID4::new(),
588 ts_init,
589 params.clone(),
590 None, ));
592 }
593
594 if cancels.is_empty() {
595 log::warn!("Cannot send `BatchCancelOrders`, no valid cancel commands");
596 return Ok(());
597 }
598
599 let manager = self.core_mut().order_manager();
600 let command = BatchCancelOrders::new(
601 trader_id,
602 client_id,
603 strategy_id,
604 instrument_id,
605 cancels,
606 UUID4::new(),
607 ts_init,
608 params,
609 None, );
611
612 manager.send_exec_command(TradingCommand::BatchCancelOrders(command));
613 Ok(())
614 }
615
616 fn mark_order_pending_update(&mut self, order: &OrderAny) -> anyhow::Result<bool> {
622 if order.is_active_local() {
623 return Ok(true);
624 }
625
626 let strategy_id = order.strategy_id();
627 required_account_id(order, "pending update")?;
628 let event = OrderEventAny::PendingUpdate(self.generate_order_pending_update(order));
629
630 {
631 let cache_rc = self.core_mut().cache_rc();
632 let mut cache = cache_rc.borrow_mut();
633 match cache.update_order(&event) {
634 Ok(_) => {}
635 Err(e)
636 if matches!(
637 e.downcast_ref::<OrderError>(),
638 Some(OrderError::InvalidStateTransition)
639 ) =>
640 {
641 log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
642 return Ok(false);
643 }
644 Err(e) => return Err(e),
645 }
646 }
647
648 let topic = format!("events.order.{strategy_id}");
649 msgbus::publish_order_event(topic.into(), &event);
650
651 Ok(true)
652 }
653
654 fn mark_order_pending_cancel(&mut self, order: &OrderAny) -> anyhow::Result<bool> {
660 if order.is_closed() || order.is_pending_cancel() {
661 log::warn!(
662 "Cannot cancel order: state is {:?}, {order:?}",
663 order.status()
664 );
665 return Ok(false);
666 }
667
668 if order.is_active_local() {
669 return Ok(true);
670 }
671
672 let strategy_id = order.strategy_id();
673 required_account_id(order, "pending cancel")?;
674 let event = OrderEventAny::PendingCancel(self.generate_order_pending_cancel(order));
675
676 {
677 let cache_rc = self.core_mut().cache_rc();
678 let mut cache = cache_rc.borrow_mut();
679 match cache.update_order(&event) {
680 Ok(_) => {}
681 Err(e)
682 if matches!(
683 e.downcast_ref::<OrderError>(),
684 Some(OrderError::InvalidStateTransition)
685 ) =>
686 {
687 log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
688 return Ok(false);
689 }
690 Err(e) => return Err(e),
691 }
692 cache.update_order_pending_cancel_local(order);
693 }
694
695 let topic = format!("events.order.{strategy_id}");
696 msgbus::publish_order_event(topic.into(), &event);
697
698 Ok(true)
699 }
700
701 fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate {
703 let ts_now = self.core_mut().clock().timestamp_ns();
704
705 OrderPendingUpdate::new(
706 order.trader_id(),
707 order.strategy_id(),
708 order.instrument_id(),
709 order.client_order_id(),
710 order
711 .account_id()
712 .expect("Order must have account_id for pending update"),
713 UUID4::new(),
714 ts_now,
715 ts_now,
716 false,
717 order.venue_order_id(),
718 )
719 }
720
721 fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel {
723 let ts_now = self.core_mut().clock().timestamp_ns();
724
725 OrderPendingCancel::new(
726 order.trader_id(),
727 order.strategy_id(),
728 order.instrument_id(),
729 order.client_order_id(),
730 order
731 .account_id()
732 .expect("Order must have account_id for pending cancel"),
733 UUID4::new(),
734 ts_now,
735 ts_now,
736 false,
737 order.venue_order_id(),
738 )
739 }
740
741 fn cancel_all_orders(
747 &mut self,
748 instrument_id: InstrumentId,
749 order_side: Option<OrderSide>,
750 client_id: Option<ClientId>,
751 params: Option<Params>,
752 ) -> anyhow::Result<()> {
753 let params = params.filter(|params| !params.is_empty());
754 let core = self.core_mut();
755
756 let trader_id = registered_trader_id(core)?;
757 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
758 let ts_init = core.clock().timestamp_ns();
759 let cache = core.cache();
760
761 let open_count = cache.orders_open_count(
762 None,
763 Some(&instrument_id),
764 Some(&strategy_id),
765 None,
766 order_side,
767 );
768
769 let emulated_count = cache.orders_emulated_count(
770 None,
771 Some(&instrument_id),
772 Some(&strategy_id),
773 None,
774 order_side,
775 );
776
777 let inflight_count = cache.orders_inflight_count(
778 None,
779 Some(&instrument_id),
780 Some(&strategy_id),
781 None,
782 order_side,
783 );
784
785 let mut exec_algorithm_ids: Vec<_> = cache.exec_algorithm_ids().into_iter().collect();
789 exec_algorithm_ids.sort();
790 let mut algo_orders: Vec<OrderAny> = Vec::new();
791
792 for algo_id in &exec_algorithm_ids {
793 algo_orders.extend(
794 cache
795 .orders_for_exec_algorithm(
796 algo_id,
797 None,
798 Some(&instrument_id),
799 Some(&strategy_id),
800 None,
801 order_side,
802 )
803 .into_iter()
804 .map(|o| o.clone()),
805 );
806 }
807
808 let algo_count = algo_orders.len();
809
810 drop(cache);
811
812 if open_count == 0 && emulated_count == 0 && inflight_count == 0 && algo_count == 0 {
813 let side_str = order_side.map(|s| format!(" {s}")).unwrap_or_default();
814 log::info!("No {instrument_id} open, emulated, or inflight{side_str} orders to cancel");
815 return Ok(());
816 }
817
818 let manager = core.order_manager();
819
820 let side_str = order_side.map(|s| format!(" {s}")).unwrap_or_default();
821
822 if open_count > 0 {
823 log::info!(
824 "Canceling {open_count} open{side_str} {instrument_id} order{}",
825 if open_count == 1 { "" } else { "s" }
826 );
827 }
828
829 if emulated_count > 0 {
830 log::info!(
831 "Canceling {emulated_count} emulated{side_str} {instrument_id} order{}",
832 if emulated_count == 1 { "" } else { "s" }
833 );
834 }
835
836 if inflight_count > 0 {
837 log::info!(
838 "Canceling {inflight_count} inflight{side_str} {instrument_id} order{}",
839 if inflight_count == 1 { "" } else { "s" }
840 );
841 }
842
843 if open_count > 0 || inflight_count > 0 {
844 let command = CancelAllOrders::new(
845 trader_id,
846 client_id,
847 strategy_id,
848 instrument_id,
849 order_side.unwrap_or(OrderSide::NoOrderSide),
850 UUID4::new(),
851 ts_init,
852 params.clone(),
853 None, );
855
856 manager.send_exec_command(TradingCommand::CancelAllOrders(command));
857 }
858
859 if emulated_count > 0 {
860 let command = CancelAllOrders::new(
861 trader_id,
862 client_id,
863 strategy_id,
864 instrument_id,
865 order_side.unwrap_or(OrderSide::NoOrderSide),
866 UUID4::new(),
867 ts_init,
868 params,
869 None, );
871
872 manager.send_emulator_command(TradingCommand::CancelAllOrders(command));
873 }
874
875 for order in algo_orders {
876 self.cancel_order(order.client_order_id(), client_id, None)?;
877 }
878
879 Ok(())
880 }
881
882 fn close_position(
888 &mut self,
889 position: &Position,
890 client_id: Option<ClientId>,
891 tags: Option<Vec<Ustr>>,
892 time_in_force: Option<TimeInForce>,
893 reduce_only: Option<bool>,
894 quote_quantity: Option<bool>,
895 ) -> anyhow::Result<()> {
896 let core = self.core_mut();
897
898 if position.is_closed() {
899 log::warn!("Cannot close position (already closed): {}", position.id);
900 return Ok(());
901 }
902
903 let closing_side = OrderCore::closing_side(position.side);
904
905 let order = core.order_factory().market(
906 position.instrument_id,
907 closing_side,
908 position.quantity,
909 time_in_force,
910 reduce_only.or(Some(true)),
911 quote_quantity,
912 None,
913 None,
914 tags,
915 None,
916 );
917
918 self.submit_order(order, Some(position.id), client_id, None)
919 }
920
921 #[expect(clippy::too_many_arguments)]
927 fn close_all_positions(
928 &mut self,
929 instrument_id: InstrumentId,
930 position_side: Option<PositionSide>,
931 client_id: Option<ClientId>,
932 tags: Option<Vec<Ustr>>,
933 time_in_force: Option<TimeInForce>,
934 reduce_only: Option<bool>,
935 quote_quantity: Option<bool>,
936 ) -> anyhow::Result<()> {
937 let core = self.core_mut();
938 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
939 let cache = core.cache();
940
941 let positions_open = cache.positions_open(
942 None,
943 Some(&instrument_id),
944 Some(&strategy_id),
945 None,
946 position_side,
947 );
948
949 let side_str = position_side.map(|s| format!(" {s}")).unwrap_or_default();
950
951 if positions_open.is_empty() {
952 log::info!("No {instrument_id} open{side_str} positions to close");
953 return Ok(());
954 }
955
956 let count = positions_open.len();
957 log::info!(
958 "Closing {count} open{side_str} position{}",
959 if count == 1 { "" } else { "s" }
960 );
961
962 let positions_data: Vec<_> = positions_open
963 .iter()
964 .map(|p| (p.id, p.instrument_id, p.side, p.quantity, p.is_closed()))
965 .collect();
966 drop(positions_open);
967
968 drop(cache);
969
970 for (pos_id, pos_instrument_id, pos_side, pos_quantity, is_closed) in positions_data {
971 if is_closed {
972 continue;
973 }
974
975 let core = self.core_mut();
976 let closing_side = OrderCore::closing_side(pos_side);
977 let order = core.order_factory().market(
978 pos_instrument_id,
979 closing_side,
980 pos_quantity,
981 time_in_force,
982 reduce_only.or(Some(true)),
983 quote_quantity,
984 None,
985 None,
986 tags.clone(),
987 None,
988 );
989
990 self.submit_order(order, Some(pos_id), client_id, None)?;
991 }
992
993 Ok(())
994 }
995
996 fn query_account(
1005 &mut self,
1006 account_id: AccountId,
1007 client_id: Option<ClientId>,
1008 params: Option<Params>,
1009 ) -> anyhow::Result<()> {
1010 let core = self.core_mut();
1011
1012 let trader_id = registered_trader_id(core)?;
1013 let ts_init = core.clock().timestamp_ns();
1014
1015 let command = QueryAccount::new(
1016 trader_id,
1017 client_id,
1018 account_id,
1019 UUID4::new(),
1020 ts_init,
1021 params,
1022 None, );
1024
1025 core.order_manager()
1026 .send_exec_command(TradingCommand::QueryAccount(command));
1027 Ok(())
1028 }
1029
1030 fn query_order(
1039 &mut self,
1040 order: &OrderAny,
1041 client_id: Option<ClientId>,
1042 params: Option<Params>,
1043 ) -> anyhow::Result<()> {
1044 let core = self.core_mut();
1045
1046 let trader_id = registered_trader_id(core)?;
1047 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1048 let ts_init = core.clock().timestamp_ns();
1049
1050 let command = QueryOrder::new(
1051 trader_id,
1052 client_id,
1053 strategy_id,
1054 order.instrument_id(),
1055 order.client_order_id(),
1056 order.venue_order_id(),
1057 UUID4::new(),
1058 ts_init,
1059 params,
1060 None, );
1062
1063 core.order_manager()
1064 .send_exec_command(TradingCommand::QueryOrder(command));
1065 Ok(())
1066 }
1067
1068 fn handle_order_event(&mut self, event: OrderEventAny) {
1070 let state = {
1071 let core = self.core_mut();
1072 let id = &core.actor.actor_id;
1073 let is_warning = matches!(
1074 &event,
1075 OrderEventAny::Denied(_)
1076 | OrderEventAny::Rejected(_)
1077 | OrderEventAny::CancelRejected(_)
1078 | OrderEventAny::ModifyRejected(_)
1079 );
1080
1081 if is_warning {
1082 log::warn!("{id} {RECV}{EVT} {event}");
1083 } else if core.actor.config.log_events {
1084 log::info!("{id} {RECV}{EVT} {event}");
1085 }
1086
1087 core.actor.state()
1088 };
1089
1090 let client_order_id = event.client_order_id();
1091 let is_terminal = matches!(
1092 &event,
1093 OrderEventAny::Filled(_)
1094 | OrderEventAny::Canceled(_)
1095 | OrderEventAny::Rejected(_)
1096 | OrderEventAny::Expired(_)
1097 | OrderEventAny::Denied(_)
1098 );
1099
1100 if is_terminal {
1103 self.cancel_gtd_expiry(&client_order_id);
1104 }
1105
1106 if state != ComponentState::Running {
1109 return;
1110 }
1111
1112 {
1115 let core = self.core_mut();
1116 if let Some(manager) = &mut core.order_manager {
1117 manager.handle_event(&event);
1118 }
1119 }
1120
1121 match &event {
1122 OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1123 OrderEventAny::Denied(e) => self.on_order_denied(*e),
1124 OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1125 OrderEventAny::Released(e) => self.on_order_released(*e),
1126 OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1127 OrderEventAny::Rejected(e) => self.on_order_rejected(*e),
1128 OrderEventAny::Accepted(e) => self.on_order_accepted(*e),
1129 OrderEventAny::Canceled(e) => {
1130 let _ = DataActor::on_order_canceled(self, e);
1131 }
1132 OrderEventAny::Expired(e) => self.on_order_expired(*e),
1133 OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1134 OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1135 OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1136 OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1137 OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1138 OrderEventAny::Updated(e) => self.on_order_updated(*e),
1139 OrderEventAny::Filled(e) => {
1140 let _ = DataActor::on_order_filled(self, e);
1141 }
1142 }
1143 }
1144
1145 fn handle_position_event(&mut self, event: PositionEvent) {
1147 let state = {
1148 let core = self.core_mut();
1149
1150 if core.actor.config.log_events {
1151 let id = &core.actor.actor_id;
1152 log::info!("{id} {RECV}{EVT} {event:?}");
1153 }
1154
1155 core.actor.state()
1156 };
1157
1158 if state != ComponentState::Running {
1159 return;
1160 }
1161
1162 match event {
1163 PositionEvent::PositionOpened(e) => self.on_position_opened(e),
1164 PositionEvent::PositionChanged(e) => self.on_position_changed(e),
1165 PositionEvent::PositionClosed(e) => self.on_position_closed(e),
1166 PositionEvent::PositionAdjusted(_) => {
1167 }
1169 }
1170 }
1171
1172 fn on_start(&mut self) -> anyhow::Result<()> {
1183 let core = self.core_mut();
1184 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1185 log::info!("Starting {strategy_id}");
1186
1187 if core.config.manage_gtd_expiry {
1188 self.reactivate_gtd_timers();
1189 }
1190
1191 Ok(())
1192 }
1193
1194 fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
1203 if event.name.starts_with("GTD-EXPIRY:") {
1204 self.expire_gtd_order(event.clone());
1205 } else if event.name.starts_with("MARKET_EXIT_CHECK:") {
1206 self.check_market_exit(event.clone());
1207 }
1208 Ok(())
1209 }
1210
1211 #[allow(unused_variables)]
1217 fn on_order_initialized(&mut self, event: OrderInitialized) {}
1218
1219 #[allow(unused_variables)]
1223 fn on_order_denied(&mut self, event: OrderDenied) {}
1224
1225 #[allow(unused_variables)]
1229 fn on_order_emulated(&mut self, event: OrderEmulated) {}
1230
1231 #[allow(unused_variables)]
1235 fn on_order_released(&mut self, event: OrderReleased) {}
1236
1237 #[allow(unused_variables)]
1241 fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1242
1243 #[allow(unused_variables)]
1247 fn on_order_rejected(&mut self, event: OrderRejected) {}
1248
1249 #[allow(unused_variables)]
1253 fn on_order_accepted(&mut self, event: OrderAccepted) {}
1254
1255 #[allow(unused_variables)]
1259 fn on_order_expired(&mut self, event: OrderExpired) {}
1260
1261 #[allow(unused_variables)]
1265 fn on_order_triggered(&mut self, event: OrderTriggered) {}
1266
1267 #[allow(unused_variables)]
1271 fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1272
1273 #[allow(unused_variables)]
1277 fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1278
1279 #[allow(unused_variables)]
1283 fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1284
1285 #[allow(unused_variables)]
1289 fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1290
1291 #[allow(unused_variables)]
1295 fn on_order_updated(&mut self, event: OrderUpdated) {}
1296
1297 #[allow(unused_variables)]
1303 fn on_position_opened(&mut self, event: PositionOpened) {}
1304
1305 #[allow(unused_variables)]
1309 fn on_position_changed(&mut self, event: PositionChanged) {}
1310
1311 #[allow(unused_variables)]
1315 fn on_position_closed(&mut self, event: PositionClosed) {}
1316
1317 fn on_market_exit(&mut self) {}
1321
1322 fn post_market_exit(&mut self) {}
1326
1327 fn is_exiting(&self) -> bool {
1331 self.core().is_exiting
1332 }
1333
1334 fn market_exit(&mut self) -> anyhow::Result<()> {
1350 let core = self.core_mut();
1351 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1352
1353 if core.actor.state() != ComponentState::Running {
1354 log::warn!("{strategy_id} Cannot market exit: strategy is not running");
1355 return Ok(());
1356 }
1357
1358 if core.is_exiting {
1359 log::warn!("{strategy_id} Market exit called when already in progress");
1360 return Ok(());
1361 }
1362
1363 core.is_exiting = true;
1364 core.market_exit_attempts = 0;
1365 let time_in_force = core.config.market_exit_time_in_force;
1366 let reduce_only = core.config.market_exit_reduce_only;
1367
1368 log::info!("{strategy_id} Initiating market exit...");
1369
1370 self.on_market_exit();
1371
1372 let core = self.core_mut();
1373 let cache = core.cache();
1374
1375 let mut instruments: AHashSet<InstrumentId> = AHashSet::new();
1376
1377 for client_order_id in
1378 cache.iter_client_order_ids_open(None, None, Some(&strategy_id), None)
1379 {
1380 if let Some(order) = cache.order(&client_order_id) {
1381 instruments.insert(order.instrument_id());
1382 }
1383 }
1384
1385 for client_order_id in
1386 cache.iter_client_order_ids_inflight(None, None, Some(&strategy_id), None)
1387 {
1388 if let Some(order) = cache.order(&client_order_id) {
1389 instruments.insert(order.instrument_id());
1390 }
1391 }
1392
1393 for position_id in cache.iter_position_open_ids(None, None, Some(&strategy_id), None) {
1394 if let Some(position) = cache.position(&position_id) {
1395 instruments.insert(position.instrument_id);
1396 }
1397 }
1398
1399 let market_exit_tag = core.market_exit_tag;
1400 let mut instruments: Vec<_> = instruments.into_iter().collect();
1404 instruments.sort();
1405 drop(cache);
1406
1407 for instrument_id in instruments {
1408 if let Err(e) = self.cancel_all_orders(instrument_id, None, None, None) {
1409 log::error!("Error canceling orders for {instrument_id}: {e}");
1410 }
1411
1412 if let Err(e) = self.close_all_positions(
1413 instrument_id,
1414 None,
1415 None,
1416 Some(vec![market_exit_tag]),
1417 Some(time_in_force),
1418 Some(reduce_only),
1419 None,
1420 ) {
1421 log::error!("Error closing positions for {instrument_id}: {e}");
1422 }
1423 }
1424
1425 let core = self.core_mut();
1426 let interval_ms = core.config.market_exit_interval_ms;
1427 let timer_name = core.market_exit_timer_name;
1428
1429 log::info!("{strategy_id} Setting market exit timer at {interval_ms}ms intervals");
1430
1431 let interval_ns = interval_ms * 1_000_000;
1432 let result = core.clock().set_timer_ns(
1433 timer_name.as_str(),
1434 interval_ns,
1435 None,
1436 None,
1437 None,
1438 None,
1439 None,
1440 );
1441
1442 if let Err(e) = result {
1443 core.is_exiting = false;
1445 core.market_exit_attempts = 0;
1446 return Err(e);
1447 }
1448
1449 Ok(())
1450 }
1451
1452 fn check_market_exit(&mut self, _event: TimeEvent) {
1456 if !self.is_exiting() {
1458 return;
1459 }
1460
1461 let core = self.core_mut();
1462 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1463
1464 core.market_exit_attempts += 1;
1465 let attempts = core.market_exit_attempts;
1466 let max_attempts = core.config.market_exit_max_attempts;
1467
1468 log::debug!(
1469 "{strategy_id} Market exit check triggered (attempt {attempts}/{max_attempts})"
1470 );
1471
1472 if attempts >= max_attempts {
1473 let cache = core.cache();
1474 let open_orders_count =
1475 cache.orders_open_count(None, None, Some(&strategy_id), None, None);
1476 let inflight_orders_count =
1477 cache.orders_inflight_count(None, None, Some(&strategy_id), None, None);
1478 let open_positions_count =
1479 cache.positions_open_count(None, None, Some(&strategy_id), None, None);
1480
1481 drop(cache);
1482
1483 log::warn!(
1484 "{strategy_id} Market exit max attempts ({max_attempts}) reached, \
1485 completing with open orders: {open_orders_count}, \
1486 inflight orders: {inflight_orders_count}, \
1487 open positions: {open_positions_count}"
1488 );
1489
1490 self.finalize_market_exit();
1491 return;
1492 }
1493
1494 let cache = core.cache();
1495 let has_open_orders = !cache
1496 .orders_open(None, None, Some(&strategy_id), None, None)
1497 .is_empty();
1498 let has_inflight_orders = !cache
1499 .orders_inflight(None, None, Some(&strategy_id), None, None)
1500 .is_empty();
1501
1502 if has_open_orders || has_inflight_orders {
1503 return;
1504 }
1505
1506 let positions_data: Vec<_> = cache
1507 .positions_open(None, None, Some(&strategy_id), None, None)
1508 .iter()
1509 .map(|p| (p.id, p.instrument_id, p.side, p.quantity, p.is_closed()))
1510 .collect();
1511
1512 if !positions_data.is_empty() {
1513 drop(cache);
1515
1516 for (pos_id, instrument_id, side, quantity, is_closed) in positions_data {
1517 if is_closed {
1518 continue;
1519 }
1520
1521 let core = self.core_mut();
1522 let time_in_force = core.config.market_exit_time_in_force;
1523 let reduce_only = core.config.market_exit_reduce_only;
1524 let market_exit_tag = core.market_exit_tag;
1525 let closing_side = OrderCore::closing_side(side);
1526 let order = core.order_factory().market(
1527 instrument_id,
1528 closing_side,
1529 quantity,
1530 Some(time_in_force),
1531 Some(reduce_only),
1532 None,
1533 None,
1534 None,
1535 Some(vec![market_exit_tag]),
1536 None,
1537 );
1538
1539 if let Err(e) = self.submit_order(order, Some(pos_id), None, None) {
1540 log::error!("Error re-submitting close order for position {pos_id}: {e}");
1541 }
1542 }
1543 return;
1544 }
1545
1546 drop(cache);
1547 self.finalize_market_exit();
1548 }
1549
1550 fn finalize_market_exit(&mut self) {
1555 let (strategy_id, should_stop) = {
1556 let core = self.core_mut();
1557 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1558 let should_stop = core.pending_stop;
1559 (strategy_id, should_stop)
1560 };
1561
1562 self.cancel_market_exit();
1563
1564 let hook_result = catch_unwind(AssertUnwindSafe(|| {
1565 self.post_market_exit();
1566 }));
1567
1568 if let Err(e) = hook_result {
1569 log::error!("{strategy_id} Error in post_market_exit: {e:?}");
1570 }
1571
1572 if should_stop {
1573 log::info!("{strategy_id} Market exit complete, stopping strategy");
1574
1575 if let Err(e) = Component::stop(self) {
1576 log::error!("{strategy_id} Failed to stop: {e}");
1577 }
1578 }
1579
1580 let core = self.core_mut();
1581 debug_assert!(
1582 !(core.pending_stop
1583 && !core.is_exiting
1584 && core.actor.state() == ComponentState::Running),
1585 "INVARIANT: stuck state after finalize_market_exit"
1586 );
1587 }
1588
1589 fn cancel_market_exit(&mut self) {
1593 let core = self.core_mut();
1594 let timer_name = core.market_exit_timer_name;
1595
1596 if core.clock().timer_names().contains(&timer_name.as_str()) {
1597 core.clock().cancel_timer(timer_name.as_str());
1598 }
1599
1600 core.is_exiting = false;
1601 core.pending_stop = false;
1602 core.market_exit_attempts = 0;
1603 }
1604
1605 fn stop(&mut self) -> bool {
1617 let (manage_stop, is_exiting, should_initiate_exit) = {
1618 let core = self.core_mut();
1619 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1620 let manage_stop = core.config.manage_stop;
1621 let state = core.actor.state();
1622 let pending_stop = core.pending_stop;
1623 let is_exiting = core.is_exiting;
1624
1625 if manage_stop {
1626 if state != ComponentState::Running {
1627 return true; }
1629
1630 if pending_stop {
1631 return false; }
1633
1634 core.pending_stop = true;
1635 let should_initiate_exit = !is_exiting;
1636
1637 if should_initiate_exit {
1638 log::info!("{strategy_id} Initiating market exit before stop");
1639 }
1640
1641 (manage_stop, is_exiting, should_initiate_exit)
1642 } else {
1643 (manage_stop, is_exiting, false)
1644 }
1645 };
1646
1647 if manage_stop {
1648 if should_initiate_exit && let Err(e) = self.market_exit() {
1649 log::warn!("Market exit failed during stop: {e}, proceeding with stop");
1650 self.core_mut().pending_stop = false;
1651 return true;
1652 }
1653 debug_assert!(
1654 self.is_exiting(),
1655 "INVARIANT: deferring stop but not exiting"
1656 );
1657 return false; }
1659
1660 if is_exiting {
1662 self.cancel_market_exit();
1663 }
1664
1665 true }
1667
1668 fn deny_order(&mut self, order: &OrderAny, reason: Ustr) {
1673 let core = self.core_mut();
1674 let Some(trader_id) = core.trader_id() else {
1675 log::error!(
1676 "Cannot deny order {}: trader_id is not set",
1677 order.client_order_id()
1678 );
1679 return;
1680 };
1681 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1682 let ts_now = core.clock().timestamp_ns();
1683
1684 let event = OrderDenied::new(
1685 trader_id,
1686 strategy_id,
1687 order.instrument_id(),
1688 order.client_order_id(),
1689 reason,
1690 UUID4::new(),
1691 ts_now,
1692 ts_now,
1693 );
1694
1695 log::warn!(
1696 "{strategy_id} Order {} denied: {reason}",
1697 order.client_order_id()
1698 );
1699
1700 let publish_initialized = {
1701 let cache_rc = core.cache_rc();
1702 let mut cache = cache_rc.borrow_mut();
1703 if cache.order_exists(&order.client_order_id()) {
1704 false
1705 } else {
1706 match cache.add_order(order.clone(), None, None, true) {
1707 Ok(()) => true,
1708 Err(e) => {
1709 log::warn!("Failed to add denied order to cache: {e}");
1710 false
1711 }
1712 }
1713 }
1714 };
1715
1716 if publish_initialized {
1717 publish_order_initialized(order);
1718 }
1719
1720 let event = OrderEventAny::Denied(event);
1721 let applied = {
1722 let cache_rc = core.cache_rc();
1723 let mut cache = cache_rc.borrow_mut();
1724 if let Err(e) = cache.update_order(&event) {
1725 log::warn!("Failed to apply OrderDenied event: {e}");
1726 false
1727 } else {
1728 true
1729 }
1730 };
1731
1732 if applied {
1733 let topic = format!("events.order.{strategy_id}");
1734 msgbus::publish_order_event(topic.into(), &event);
1735 }
1736 }
1737
1738 fn deny_order_list(&mut self, orders: &[OrderAny], reason: Ustr) {
1742 for order in orders {
1743 if !order.is_closed() {
1744 self.deny_order(order, reason);
1745 }
1746 }
1747 }
1748
1749 fn set_gtd_expiry(&mut self, order: &OrderAny) -> anyhow::Result<()> {
1759 let core = self.core_mut();
1760
1761 if !core.config.manage_gtd_expiry || order.time_in_force() != TimeInForce::Gtd {
1762 return Ok(());
1763 }
1764
1765 let Some(expire_time) = order.expire_time() else {
1766 return Ok(());
1767 };
1768
1769 let client_order_id = order.client_order_id();
1770 let timer_name = format!("GTD-EXPIRY:{client_order_id}");
1771
1772 let current_time_ns = {
1773 let clock = core.clock();
1774 clock.timestamp_ns()
1775 };
1776
1777 if current_time_ns >= expire_time.as_u64() {
1778 log::info!("GTD order {client_order_id} already expired, canceling immediately");
1779 return self.cancel_order(order.client_order_id(), None, None);
1780 }
1781
1782 {
1783 let mut clock = core.clock();
1784 clock.set_time_alert_ns(&timer_name, expire_time, None, None)?;
1785 }
1786
1787 core.gtd_timers
1788 .insert(client_order_id, Ustr::from(&timer_name));
1789
1790 log::debug!("Set GTD expiry timer for {client_order_id} at {expire_time}");
1791 Ok(())
1792 }
1793
1794 fn cancel_gtd_expiry(&mut self, client_order_id: &ClientOrderId) {
1796 let core = self.core_mut();
1797
1798 if let Some(timer_name) = core.gtd_timers.remove(client_order_id) {
1799 core.clock().cancel_timer(timer_name.as_str());
1800 log::debug!("Canceled GTD expiry timer for {client_order_id}");
1801 }
1802 }
1803
1804 fn has_gtd_expiry_timer(&mut self, client_order_id: &ClientOrderId) -> bool {
1806 let core = self.core_mut();
1807 core.gtd_timers.contains_key(client_order_id)
1808 }
1809
1810 fn expire_gtd_order(&mut self, event: TimeEvent) {
1814 let timer_name = event.name.to_string();
1815 let Some(client_order_id_str) = timer_name.strip_prefix("GTD-EXPIRY:") else {
1816 log::error!("Invalid GTD timer name format: {timer_name}");
1817 return;
1818 };
1819
1820 let client_order_id = ClientOrderId::from(client_order_id_str);
1821
1822 let core = self.core_mut();
1823 core.gtd_timers.remove(&client_order_id);
1824
1825 let order = core.cache().order(&client_order_id).map(|o| o.clone());
1826 let Some(order) = order else {
1827 log::warn!("GTD order {client_order_id} not found in cache");
1828 return;
1829 };
1830
1831 log::info!("GTD order {client_order_id} expired");
1832
1833 if let Err(e) = self.cancel_order(order.client_order_id(), None, None) {
1834 log::error!("Failed to cancel expired GTD order {client_order_id}: {e}");
1835 }
1836 }
1837
1838 fn reactivate_gtd_timers(&mut self) {
1843 let core = self.core_mut();
1844 let strategy_id = StrategyId::from(core.actor_id().inner().as_str());
1845 let current_time_ns = core.clock().timestamp_ns();
1846
1847 let gtd_orders: Vec<OrderAny> = core
1848 .cache()
1849 .orders_open(None, None, Some(&strategy_id), None, None)
1850 .into_iter()
1851 .filter(|o| o.time_in_force() == TimeInForce::Gtd)
1852 .map(|o| o.clone())
1853 .collect();
1854
1855 for order in gtd_orders {
1856 let Some(expire_time) = order.expire_time() else {
1857 continue;
1858 };
1859
1860 let expire_time_ns = expire_time.as_u64();
1861 let client_order_id = order.client_order_id();
1862
1863 if current_time_ns >= expire_time_ns {
1864 log::info!("GTD order {client_order_id} already expired, canceling immediately");
1865 if let Err(e) = self.cancel_order(order.client_order_id(), None, None) {
1866 log::error!("Failed to cancel expired GTD order {client_order_id}: {e}");
1867 }
1868 } else if let Err(e) = self.set_gtd_expiry(&order) {
1869 log::error!("Failed to set GTD expiry timer for {client_order_id}: {e}");
1870 }
1871 }
1872 }
1873}
1874
1875fn publish_order_initialized(order: &OrderAny) {
1876 let topic = format!("events.order.{}", order.strategy_id());
1877 let event = OrderEventAny::Initialized(order.init_event().clone());
1878 msgbus::publish_order_event(topic.into(), &event);
1879}
1880
1881fn registered_trader_id(core: &StrategyCore) -> anyhow::Result<TraderId> {
1882 core.trader_id()
1883 .ok_or_else(|| anyhow::anyhow!("Strategy not registered: trader_id is not set"))
1884}
1885
1886fn required_account_id(order: &OrderAny, operation: &str) -> anyhow::Result<AccountId> {
1887 order.account_id().ok_or_else(|| {
1888 anyhow::anyhow!(
1889 "Cannot generate {operation} event for {}: account_id is not set",
1890 order.client_order_id()
1891 )
1892 })
1893}
1894
1895#[cfg(test)]
1896mod tests {
1897 use std::{cell::RefCell, rc::Rc};
1898
1899 use nautilus_common::{
1900 actor::DataActor,
1901 cache::Cache,
1902 clock::{Clock, TestClock},
1903 component::Component,
1904 msgbus::{
1905 self, MessagingSwitchboard, TypedHandler, TypedIntoHandler,
1906 stubs::{
1907 TypedIntoMessageSavingHandler, TypedMessageSavingHandler,
1908 get_typed_into_message_saving_handler, get_typed_message_saving_handler,
1909 },
1910 },
1911 timer::{TimeEvent, TimeEventCallback},
1912 };
1913 use nautilus_core::UnixNanos;
1914 use nautilus_model::{
1915 enums::{LiquiditySide, OrderSide, OrderStatus, OrderType, PositionSide},
1916 events::{OrderAccepted, OrderCanceled, OrderFilled, OrderRejected},
1917 identifiers::{
1918 AccountId, ClientOrderId, InstrumentId, OrderListId, PositionId, StrategyId, TradeId,
1919 TraderId, VenueOrderId,
1920 },
1921 orderbook::own::OwnOrderBook,
1922 orders::{LimitOrder, MarketOrder, stubs::TestOrderEventStubs},
1923 stubs::TestDefault,
1924 types::{Currency, Money, Price},
1925 };
1926 use nautilus_portfolio::portfolio::Portfolio;
1927 use rstest::rstest;
1928 use serde_json::Value;
1929
1930 use super::*;
1931 use crate::nautilus_strategy;
1932
1933 #[derive(Debug)]
1934 struct TestStrategy {
1935 core: StrategyCore,
1936 on_order_rejected_called: bool,
1937 on_order_accepted_called: bool,
1938 on_order_canceled_called: bool,
1939 on_order_filled_called: bool,
1940 on_order_expired_called: bool,
1941 on_position_opened_called: bool,
1942 on_position_changed_called: bool,
1943 on_position_closed_called: bool,
1944 }
1945
1946 impl TestStrategy {
1947 fn new(config: StrategyConfig) -> Self {
1948 Self {
1949 core: StrategyCore::new(config),
1950 on_order_rejected_called: false,
1951 on_order_accepted_called: false,
1952 on_order_canceled_called: false,
1953 on_order_filled_called: false,
1954 on_order_expired_called: false,
1955 on_position_opened_called: false,
1956 on_position_changed_called: false,
1957 on_position_closed_called: false,
1958 }
1959 }
1960 }
1961
1962 impl DataActor for TestStrategy {
1963 fn on_order_canceled(&mut self, _event: &OrderCanceled) -> anyhow::Result<()> {
1964 self.on_order_canceled_called = true;
1965 Ok(())
1966 }
1967
1968 fn on_order_filled(&mut self, _event: &OrderFilled) -> anyhow::Result<()> {
1969 self.on_order_filled_called = true;
1970 Ok(())
1971 }
1972 }
1973
1974 nautilus_strategy!(TestStrategy, {
1975 fn on_order_rejected(&mut self, _event: OrderRejected) {
1976 self.on_order_rejected_called = true;
1977 }
1978
1979 fn on_order_accepted(&mut self, _event: OrderAccepted) {
1980 self.on_order_accepted_called = true;
1981 }
1982
1983 fn on_order_expired(&mut self, _event: OrderExpired) {
1984 self.on_order_expired_called = true;
1985 }
1986
1987 fn on_position_opened(&mut self, _event: PositionOpened) {
1988 self.on_position_opened_called = true;
1989 }
1990
1991 fn on_position_changed(&mut self, _event: PositionChanged) {
1992 self.on_position_changed_called = true;
1993 }
1994
1995 fn on_position_closed(&mut self, _event: PositionClosed) {
1996 self.on_position_closed_called = true;
1997 }
1998 });
1999
2000 fn create_test_strategy() -> TestStrategy {
2001 let config = StrategyConfig {
2002 strategy_id: Some(StrategyId::from("TEST-001")),
2003 order_id_tag: Some("001".to_string()),
2004 ..Default::default()
2005 };
2006 TestStrategy::new(config)
2007 }
2008
2009 fn register_strategy(strategy: &mut TestStrategy) {
2010 let trader_id = TraderId::from("TRADER-001");
2011 let clock = Rc::new(RefCell::new(TestClock::new()));
2012 let cache = Rc::new(RefCell::new(Cache::default()));
2013 let portfolio = Rc::new(RefCell::new(Portfolio::new(
2014 cache.clone(),
2015 clock.clone(),
2016 None,
2017 )));
2018
2019 strategy
2020 .core
2021 .register(trader_id, clock, cache, portfolio)
2022 .unwrap();
2023 strategy.initialize().unwrap();
2024 }
2025
2026 fn start_strategy(strategy: &mut TestStrategy) {
2027 strategy.start().unwrap();
2028 }
2029
2030 fn stop_strategy(strategy: &mut TestStrategy) {
2031 Component::stop(strategy).unwrap();
2032 }
2033
2034 fn make_filled(client_order_id: ClientOrderId) -> OrderEventAny {
2035 OrderEventAny::Filled(OrderFilled {
2036 trader_id: TraderId::from("TRADER-001"),
2037 strategy_id: StrategyId::from("TEST-001"),
2038 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2039 client_order_id,
2040 venue_order_id: VenueOrderId::test_default(),
2041 account_id: AccountId::from("ACC-001"),
2042 trade_id: TradeId::test_default(),
2043 position_id: None,
2044 order_side: OrderSide::Buy,
2045 order_type: OrderType::Market,
2046 last_qty: Quantity::default(),
2047 last_px: Price::default(),
2048 currency: Currency::from("USD"),
2049 liquidity_side: LiquiditySide::Taker,
2050 event_id: UUID4::default(),
2051 ts_event: UnixNanos::default(),
2052 ts_init: UnixNanos::default(),
2053 reconciliation: false,
2054 commission: None,
2055 causation_id: None,
2056 })
2057 }
2058
2059 fn make_canceled(client_order_id: ClientOrderId) -> OrderEventAny {
2060 OrderEventAny::Canceled(OrderCanceled {
2061 trader_id: TraderId::from("TRADER-001"),
2062 strategy_id: StrategyId::from("TEST-001"),
2063 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2064 client_order_id,
2065 venue_order_id: None,
2066 account_id: Some(AccountId::from("ACC-001")),
2067 event_id: UUID4::default(),
2068 ts_event: UnixNanos::default(),
2069 ts_init: UnixNanos::default(),
2070 reconciliation: false,
2071 causation_id: None,
2072 })
2073 }
2074
2075 fn make_rejected(client_order_id: ClientOrderId) -> OrderEventAny {
2076 OrderEventAny::Rejected(OrderRejected {
2077 trader_id: TraderId::from("TRADER-001"),
2078 strategy_id: StrategyId::from("TEST-001"),
2079 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2080 client_order_id,
2081 account_id: AccountId::from("ACC-001"),
2082 reason: "Test rejection".into(),
2083 event_id: UUID4::default(),
2084 ts_event: UnixNanos::default(),
2085 ts_init: UnixNanos::default(),
2086 reconciliation: false,
2087 due_post_only: false,
2088 causation_id: None,
2089 })
2090 }
2091
2092 fn make_expired(client_order_id: ClientOrderId) -> OrderEventAny {
2093 OrderEventAny::Expired(OrderExpired {
2094 trader_id: TraderId::from("TRADER-001"),
2095 strategy_id: StrategyId::from("TEST-001"),
2096 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2097 client_order_id,
2098 venue_order_id: None,
2099 account_id: Some(AccountId::from("ACC-001")),
2100 event_id: UUID4::default(),
2101 ts_event: UnixNanos::default(),
2102 ts_init: UnixNanos::default(),
2103 reconciliation: false,
2104 causation_id: None,
2105 })
2106 }
2107
2108 fn make_accepted(client_order_id: ClientOrderId) -> OrderEventAny {
2109 OrderEventAny::Accepted(OrderAccepted {
2110 trader_id: TraderId::from("TRADER-001"),
2111 strategy_id: StrategyId::from("TEST-001"),
2112 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2113 client_order_id,
2114 venue_order_id: VenueOrderId::test_default(),
2115 account_id: AccountId::from("ACC-001"),
2116 event_id: UUID4::default(),
2117 ts_event: UnixNanos::default(),
2118 ts_init: UnixNanos::default(),
2119 reconciliation: false,
2120 causation_id: None,
2121 })
2122 }
2123
2124 fn make_accepted_market_order(client_order_id: &str) -> OrderAny {
2125 let mut order = OrderAny::Market(MarketOrder::new(
2126 TraderId::from("TRADER-001"),
2127 StrategyId::from("TEST-001"),
2128 InstrumentId::from("BTCUSDT.BINANCE"),
2129 ClientOrderId::from(client_order_id),
2130 OrderSide::Buy,
2131 Quantity::from(100_000),
2132 TimeInForce::Gtc,
2133 UUID4::new(),
2134 UnixNanos::default(),
2135 false,
2136 false,
2137 None,
2138 None,
2139 None,
2140 None,
2141 None,
2142 None,
2143 None,
2144 None,
2145 ));
2146 let account_id = AccountId::from("ACC-001");
2147 order
2148 .apply(TestOrderEventStubs::submitted(&order, account_id))
2149 .unwrap();
2150 order
2151 .apply(TestOrderEventStubs::accepted(
2152 &order,
2153 account_id,
2154 VenueOrderId::test_default(),
2155 ))
2156 .unwrap();
2157 order
2158 }
2159
2160 fn make_accepted_limit_order(client_order_id: &str) -> OrderAny {
2161 let mut order = OrderAny::Limit(LimitOrder::new(
2162 TraderId::from("TRADER-001"),
2163 StrategyId::from("TEST-001"),
2164 InstrumentId::from("BTCUSDT.BINANCE"),
2165 ClientOrderId::from(client_order_id),
2166 OrderSide::Buy,
2167 Quantity::from("1.0"),
2168 Price::from("50000.0"),
2169 TimeInForce::Gtc,
2170 None,
2171 false,
2172 false,
2173 false,
2174 None,
2175 None,
2176 None,
2177 None,
2178 None,
2179 None,
2180 None,
2181 None,
2182 None,
2183 None,
2184 None,
2185 UUID4::new(),
2186 UnixNanos::default(),
2187 ));
2188 let account_id = AccountId::from("ACC-001");
2189 order
2190 .apply(TestOrderEventStubs::submitted(&order, account_id))
2191 .unwrap();
2192 order
2193 .apply(TestOrderEventStubs::accepted(
2194 &order,
2195 account_id,
2196 VenueOrderId::test_default(),
2197 ))
2198 .unwrap();
2199 order
2200 }
2201
2202 fn make_initialized_market_order(client_order_id: &str) -> OrderAny {
2203 OrderAny::Market(MarketOrder::new(
2204 TraderId::from("TRADER-001"),
2205 StrategyId::from("TEST-001"),
2206 InstrumentId::from("BTCUSDT.BINANCE"),
2207 ClientOrderId::from(client_order_id),
2208 OrderSide::Buy,
2209 Quantity::from(100_000),
2210 TimeInForce::Gtc,
2211 UUID4::new(),
2212 UnixNanos::default(),
2213 false,
2214 false,
2215 None,
2216 None,
2217 None,
2218 None,
2219 None,
2220 None,
2221 None,
2222 None,
2223 ))
2224 }
2225
2226 fn add_order_to_cache(strategy: &TestStrategy, order: &OrderAny) {
2227 let cache_rc = strategy.core.cache_rc();
2228 let mut cache = cache_rc.borrow_mut();
2229 cache.add_order(order.clone(), None, None, true).unwrap();
2230 }
2231
2232 fn add_order_to_cache_and_own_book(strategy: &TestStrategy, order: &OrderAny) {
2233 let cache_rc = strategy.core.cache_rc();
2234 let mut cache = cache_rc.borrow_mut();
2235 cache.add_order(order.clone(), None, None, true).unwrap();
2236 cache
2237 .add_own_order_book(OwnOrderBook::new(order.instrument_id()))
2238 .unwrap();
2239 cache.update_own_order_book(order);
2240 }
2241
2242 fn make_position_opened() -> PositionEvent {
2243 PositionEvent::PositionOpened(PositionOpened {
2244 trader_id: TraderId::from("TRADER-001"),
2245 strategy_id: StrategyId::from("TEST-001"),
2246 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2247 position_id: PositionId::test_default(),
2248 account_id: AccountId::from("ACC-001"),
2249 opening_order_id: ClientOrderId::from("O-001"),
2250 entry: OrderSide::Buy,
2251 side: PositionSide::Long,
2252 signed_qty: 1.0,
2253 quantity: Quantity::default(),
2254 last_qty: Quantity::default(),
2255 last_px: Price::default(),
2256 currency: Currency::from("USD"),
2257 avg_px_open: 0.0,
2258 event_id: UUID4::default(),
2259 ts_event: UnixNanos::default(),
2260 ts_init: UnixNanos::default(),
2261 })
2262 }
2263
2264 fn make_position_changed() -> PositionEvent {
2265 let currency = Currency::from("USD");
2266 PositionEvent::PositionChanged(PositionChanged {
2267 trader_id: TraderId::from("TRADER-001"),
2268 strategy_id: StrategyId::from("TEST-001"),
2269 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2270 position_id: PositionId::test_default(),
2271 account_id: AccountId::from("ACC-001"),
2272 opening_order_id: ClientOrderId::from("O-001"),
2273 entry: OrderSide::Buy,
2274 side: PositionSide::Long,
2275 signed_qty: 2.0,
2276 quantity: Quantity::default(),
2277 peak_quantity: Quantity::default(),
2278 last_qty: Quantity::default(),
2279 last_px: Price::default(),
2280 currency,
2281 avg_px_open: 0.0,
2282 avg_px_close: None,
2283 realized_return: 0.0,
2284 realized_pnl: None,
2285 unrealized_pnl: Money::new(0.0, currency),
2286 event_id: UUID4::default(),
2287 ts_opened: UnixNanos::default(),
2288 ts_event: UnixNanos::default(),
2289 ts_init: UnixNanos::default(),
2290 })
2291 }
2292
2293 fn make_position_closed() -> PositionEvent {
2294 let currency = Currency::from("USD");
2295 PositionEvent::PositionClosed(PositionClosed {
2296 trader_id: TraderId::from("TRADER-001"),
2297 strategy_id: StrategyId::from("TEST-001"),
2298 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2299 position_id: PositionId::test_default(),
2300 account_id: AccountId::from("ACC-001"),
2301 opening_order_id: ClientOrderId::from("O-001"),
2302 closing_order_id: Some(ClientOrderId::from("O-002")),
2303 entry: OrderSide::Buy,
2304 side: PositionSide::Flat,
2305 signed_qty: 0.0,
2306 quantity: Quantity::default(),
2307 peak_quantity: Quantity::default(),
2308 last_qty: Quantity::default(),
2309 last_px: Price::default(),
2310 currency,
2311 avg_px_open: 0.0,
2312 avg_px_close: None,
2313 realized_return: 0.0,
2314 realized_pnl: None,
2315 unrealized_pnl: Money::new(0.0, currency),
2316 duration: 0,
2317 event_id: UUID4::default(),
2318 ts_opened: UnixNanos::default(),
2319 ts_closed: None,
2320 ts_event: UnixNanos::default(),
2321 ts_init: UnixNanos::default(),
2322 })
2323 }
2324
2325 #[rstest]
2326 fn test_strategy_creation() {
2327 let strategy = create_test_strategy();
2328 assert_eq!(
2329 strategy.core.config.strategy_id,
2330 Some(StrategyId::from("TEST-001"))
2331 );
2332 assert!(!strategy.on_order_rejected_called);
2333 assert!(!strategy.on_position_opened_called);
2334 }
2335
2336 #[rstest]
2337 fn test_strategy_registration() {
2338 let mut strategy = create_test_strategy();
2339 register_strategy(&mut strategy);
2340
2341 assert!(strategy.core.order_manager.is_some());
2342 assert!(strategy.core.order_factory.is_some());
2343 assert!(strategy.core.portfolio.is_some());
2344 }
2345
2346 #[rstest]
2347 fn test_handle_order_event_dispatches_to_handler() {
2348 let mut strategy = create_test_strategy();
2349 register_strategy(&mut strategy);
2350 start_strategy(&mut strategy);
2351
2352 let event = OrderEventAny::Rejected(OrderRejected {
2353 trader_id: TraderId::from("TRADER-001"),
2354 strategy_id: StrategyId::from("TEST-001"),
2355 instrument_id: InstrumentId::from("BTCUSDT.BINANCE"),
2356 client_order_id: ClientOrderId::from("O-001"),
2357 account_id: AccountId::from("ACC-001"),
2358 reason: "Test rejection".into(),
2359 event_id: UUID4::default(),
2360 ts_event: UnixNanos::default(),
2361 ts_init: UnixNanos::default(),
2362 reconciliation: false,
2363 due_post_only: false,
2364 causation_id: None,
2365 });
2366
2367 strategy.handle_order_event(event);
2368
2369 assert!(strategy.on_order_rejected_called);
2370 }
2371
2372 #[rstest]
2373 #[case::opened(make_position_opened())]
2374 #[case::changed(make_position_changed())]
2375 #[case::closed(make_position_closed())]
2376 fn test_handle_position_event_dispatches_to_handler(#[case] event: PositionEvent) {
2377 let mut strategy = create_test_strategy();
2378 register_strategy(&mut strategy);
2379 start_strategy(&mut strategy);
2380
2381 let expected_opened = matches!(event, PositionEvent::PositionOpened(_));
2382 let expected_changed = matches!(event, PositionEvent::PositionChanged(_));
2383 let expected_closed = matches!(event, PositionEvent::PositionClosed(_));
2384
2385 strategy.handle_position_event(event);
2386
2387 assert_eq!(strategy.on_position_opened_called, expected_opened);
2388 assert_eq!(strategy.on_position_changed_called, expected_changed);
2389 assert_eq!(strategy.on_position_closed_called, expected_closed);
2390 }
2391
2392 #[rstest]
2393 fn test_handle_position_event_skips_dispatch_when_stopped() {
2394 let mut strategy = create_test_strategy();
2395 register_strategy(&mut strategy);
2396 start_strategy(&mut strategy);
2397 stop_strategy(&mut strategy);
2398 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
2399
2400 strategy.handle_position_event(make_position_opened());
2401
2402 assert!(!strategy.on_position_opened_called);
2403 }
2404
2405 #[rstest]
2406 fn test_strategy_default_handlers_do_not_panic() {
2407 let mut strategy = create_test_strategy();
2408
2409 strategy.on_order_initialized(OrderInitialized::default());
2410 strategy.on_order_denied(OrderDenied::default());
2411 strategy.on_order_emulated(OrderEmulated::default());
2412 strategy.on_order_released(OrderReleased::default());
2413 strategy.on_order_submitted(OrderSubmitted::default());
2414 strategy.on_order_rejected(OrderRejected::default());
2415 let _ = DataActor::on_order_canceled(&mut strategy, &OrderCanceled::default());
2416 strategy.on_order_expired(OrderExpired::default());
2417 strategy.on_order_triggered(OrderTriggered::default());
2418 strategy.on_order_pending_update(OrderPendingUpdate::default());
2419 strategy.on_order_pending_cancel(OrderPendingCancel::default());
2420 strategy.on_order_modify_rejected(OrderModifyRejected::default());
2421 strategy.on_order_cancel_rejected(OrderCancelRejected::default());
2422 strategy.on_order_updated(OrderUpdated::default());
2423 }
2424
2425 #[rstest]
2426 fn test_submit_order_publishes_order_initialized_after_cache_insert_before_send() {
2427 let mut strategy = create_test_strategy();
2428 register_strategy(&mut strategy);
2429
2430 let order = make_initialized_market_order("O-20250208-INIT-001");
2431 let client_order_id = order.client_order_id();
2432 let cache_rc = strategy.core.cache_rc();
2433 let timeline = Rc::new(RefCell::new(Vec::new()));
2434 let event_messages = Rc::new(RefCell::new(Vec::new()));
2435
2436 let event_handler = {
2437 let event_messages = event_messages.clone();
2438 let timeline = timeline.clone();
2439 TypedHandler::from_with_id("events.order.initialized", move |event: &OrderEventAny| {
2440 assert!(cache_rc.borrow().order_exists(&client_order_id));
2441 assert!(matches!(event, OrderEventAny::Initialized(_)));
2442 event_messages.borrow_mut().push(event.clone());
2443 timeline.borrow_mut().push("init");
2444 })
2445 };
2446 let risk_handler = {
2447 let timeline = timeline.clone();
2448 TypedIntoHandler::from_with_id(
2449 "RiskEngine.queue_execute",
2450 move |command: TradingCommand| {
2451 assert!(matches!(command, TradingCommand::SubmitOrder(_)));
2452 timeline.borrow_mut().push("command");
2453 },
2454 )
2455 };
2456 msgbus::register_trading_command_endpoint(
2457 MessagingSwitchboard::risk_engine_queue_execute(),
2458 risk_handler,
2459 );
2460
2461 let topic = format!("events.order.{}", order.strategy_id());
2462 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2463
2464 strategy
2465 .submit_order(order.clone(), None, None, None)
2466 .unwrap();
2467
2468 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2469
2470 let event_messages = event_messages.borrow();
2471 assert_eq!(event_messages.len(), 1);
2472 assert_eq!(
2473 event_messages[0],
2474 OrderEventAny::Initialized(order.init_event().clone())
2475 );
2476 assert_eq!(timeline.borrow().as_slice(), &["init", "command"]);
2477 }
2478
2479 #[rstest]
2480 fn test_submit_order_errors_when_strategy_not_registered() {
2481 let mut strategy = create_test_strategy();
2482 let order = make_initialized_market_order("O-20250208-UNREGISTERED-001");
2483
2484 let err = strategy
2485 .submit_order(order, None, None, None)
2486 .unwrap_err()
2487 .to_string();
2488
2489 assert_eq!(err, "Strategy not registered: trader_id is not set");
2490 }
2491
2492 #[rstest]
2493 fn test_required_account_id_errors_when_missing_for_strategy_event() {
2494 let order = make_initialized_market_order("O-20250208-NO-ACCOUNT-001");
2495
2496 let err = required_account_id(&order, "pending cancel")
2497 .unwrap_err()
2498 .to_string();
2499
2500 assert_eq!(
2501 err,
2502 "Cannot generate pending cancel event for O-20250208-NO-ACCOUNT-001: \
2503 account_id is not set"
2504 );
2505 }
2506
2507 #[rstest]
2508 fn test_submit_order_rejects_non_initialized_without_events() {
2509 let mut strategy = create_test_strategy();
2510 register_strategy(&mut strategy);
2511
2512 let order = make_accepted_market_order("O-20250208-ACCEPTED-001");
2513 let topic = format!("events.order.{}", order.strategy_id());
2514 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2515 get_typed_message_saving_handler(Some(Ustr::from("events.order.invalid")));
2516
2517 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2518 let result = strategy.submit_order(order, None, None, None);
2519
2520 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2521
2522 assert!(result.is_err());
2523 assert!(
2524 result
2525 .unwrap_err()
2526 .to_string()
2527 .contains("expected INITIALIZED")
2528 );
2529 assert!(event_messages.get_messages().is_empty());
2530 }
2531
2532 #[rstest]
2533 fn test_submit_order_list_publishes_order_initialized_after_cache_insert_before_send() {
2534 let mut strategy = create_test_strategy();
2535 register_strategy(&mut strategy);
2536
2537 let order_list_id = OrderListId::from("OL-20250208-LIST-INIT");
2538 let mut orders = vec![
2539 make_initialized_market_order("O-20250208-LIST-INIT-001"),
2540 make_initialized_market_order("O-20250208-LIST-INIT-002"),
2541 ];
2542
2543 for order in &mut orders {
2544 order.set_order_list_id(order_list_id);
2545 }
2546
2547 let client_order_id1 = orders[0].client_order_id();
2548 let client_order_id2 = orders[1].client_order_id();
2549 let cache_rc = strategy.core.cache_rc();
2550 let timeline = Rc::new(RefCell::new(Vec::new()));
2551 let event_messages = Rc::new(RefCell::new(Vec::new()));
2552
2553 let event_handler = {
2554 let event_messages = event_messages.clone();
2555 let timeline = timeline.clone();
2556 TypedHandler::from_with_id(
2557 "events.order.list_initialized",
2558 move |event: &OrderEventAny| {
2559 match event {
2560 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
2561 assert!(cache_rc.borrow().order_exists(&client_order_id1));
2562 timeline.borrow_mut().push("init1");
2563 }
2564 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
2565 assert!(cache_rc.borrow().order_exists(&client_order_id2));
2566 timeline.borrow_mut().push("init2");
2567 }
2568 _ => panic!("unexpected order event {event:?}"),
2569 }
2570 event_messages.borrow_mut().push(event.clone());
2571 },
2572 )
2573 };
2574 let risk_handler = {
2575 let timeline = timeline.clone();
2576 TypedIntoHandler::from_with_id(
2577 "RiskEngine.queue_execute",
2578 move |command: TradingCommand| {
2579 assert!(matches!(command, TradingCommand::SubmitOrderList(_)));
2580 timeline.borrow_mut().push("command");
2581 },
2582 )
2583 };
2584 msgbus::register_trading_command_endpoint(
2585 MessagingSwitchboard::risk_engine_queue_execute(),
2586 risk_handler,
2587 );
2588
2589 let topic = format!("events.order.{}", orders[0].strategy_id());
2590 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2591
2592 strategy
2593 .submit_order_list(orders.clone(), None, None, None)
2594 .unwrap();
2595
2596 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2597
2598 let event_messages = event_messages.borrow();
2599 assert_eq!(event_messages.len(), 2);
2600 assert_eq!(
2601 event_messages[0],
2602 OrderEventAny::Initialized(orders[0].init_event().clone())
2603 );
2604 assert_eq!(
2605 event_messages[1],
2606 OrderEventAny::Initialized(orders[1].init_event().clone())
2607 );
2608 assert_eq!(timeline.borrow().as_slice(), &["init1", "init2", "command"]);
2609 }
2610
2611 #[rstest]
2612 fn test_submit_order_list_create_list_branch_publishes_init_after_cache_insert() {
2613 let mut strategy = create_test_strategy();
2614 register_strategy(&mut strategy);
2615
2616 let orders = vec![
2617 make_initialized_market_order("O-20250208-LIST-CREATE-001"),
2618 make_initialized_market_order("O-20250208-LIST-CREATE-002"),
2619 ];
2620
2621 let client_order_id1 = orders[0].client_order_id();
2622 let client_order_id2 = orders[1].client_order_id();
2623 let cache_rc = strategy.core.cache_rc();
2624 let timeline = Rc::new(RefCell::new(Vec::new()));
2625 let event_messages = Rc::new(RefCell::new(Vec::new()));
2626
2627 let event_handler = {
2628 let event_messages = event_messages.clone();
2629 let timeline = timeline.clone();
2630 TypedHandler::from_with_id(
2631 "events.order.list_create_initialized",
2632 move |event: &OrderEventAny| {
2633 match event {
2634 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
2635 assert!(cache_rc.borrow().order_exists(&client_order_id1));
2636 timeline.borrow_mut().push("init1");
2637 }
2638 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
2639 assert!(cache_rc.borrow().order_exists(&client_order_id2));
2640 timeline.borrow_mut().push("init2");
2641 }
2642 _ => panic!("unexpected order event {event:?}"),
2643 }
2644 event_messages.borrow_mut().push(event.clone());
2645 },
2646 )
2647 };
2648 let risk_handler = {
2649 let timeline = timeline.clone();
2650 TypedIntoHandler::from_with_id(
2651 "RiskEngine.queue_execute",
2652 move |command: TradingCommand| {
2653 assert!(matches!(command, TradingCommand::SubmitOrderList(_)));
2654 timeline.borrow_mut().push("command");
2655 },
2656 )
2657 };
2658 msgbus::register_trading_command_endpoint(
2659 MessagingSwitchboard::risk_engine_queue_execute(),
2660 risk_handler,
2661 );
2662
2663 let topic = format!("events.order.{}", orders[0].strategy_id());
2664 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2665
2666 strategy
2667 .submit_order_list(orders.clone(), None, None, None)
2668 .unwrap();
2669
2670 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2671
2672 let event_messages = event_messages.borrow();
2673 assert_eq!(event_messages.len(), 2);
2674 assert_eq!(
2675 event_messages[0],
2676 OrderEventAny::Initialized(orders[0].init_event().clone())
2677 );
2678 assert_eq!(
2679 event_messages[1],
2680 OrderEventAny::Initialized(orders[1].init_event().clone())
2681 );
2682 assert_eq!(timeline.borrow().as_slice(), &["init1", "init2", "command"]);
2683
2684 let cache = strategy.core.cache();
2685 let cached_order1 = cache.order(&client_order_id1).unwrap();
2686 let cached_order2 = cache.order(&client_order_id2).unwrap();
2687 let order_list_id = cached_order1.order_list_id().unwrap();
2688 assert_eq!(cached_order2.order_list_id(), Some(order_list_id));
2689
2690 let order_list = cache.order_list(&order_list_id).unwrap();
2691 assert_eq!(
2692 order_list.client_order_ids.as_slice(),
2693 &[client_order_id1, client_order_id2]
2694 );
2695 }
2696
2697 #[rstest]
2698 fn test_submit_order_list_routes_optional_params_to_risk() {
2699 let mut strategy = create_test_strategy();
2700 register_strategy(&mut strategy);
2701
2702 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2703 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2704 msgbus::register_trading_command_endpoint(
2705 MessagingSwitchboard::risk_engine_queue_execute(),
2706 risk_handler,
2707 );
2708
2709 let no_params_orders = vec![
2710 make_initialized_market_order("O-20250208-LIST-001"),
2711 make_initialized_market_order("O-20250208-LIST-002"),
2712 ];
2713 strategy
2714 .submit_order_list(no_params_orders, None, None, None)
2715 .unwrap();
2716
2717 let mut params = Params::new();
2718 params.insert(
2719 "routing_hint".to_string(),
2720 Value::String("prefer_batch".to_string()),
2721 );
2722 let param_orders = vec![
2723 make_initialized_market_order("O-20250208-LIST-003"),
2724 make_initialized_market_order("O-20250208-LIST-004"),
2725 ];
2726 strategy
2727 .submit_order_list(param_orders, None, None, Some(params.clone()))
2728 .unwrap();
2729
2730 let risk_messages = risk_messages.get_messages();
2731 assert_eq!(risk_messages.len(), 2);
2732 let Some(TradingCommand::SubmitOrderList(no_params_command)) = risk_messages.first() else {
2733 panic!("expected SubmitOrderList command");
2734 };
2735 let Some(TradingCommand::SubmitOrderList(param_command)) = risk_messages.get(1) else {
2736 panic!("expected SubmitOrderList command");
2737 };
2738 assert!(no_params_command.params.is_none());
2739 assert_eq!(param_command.params.as_ref(), Some(¶ms));
2740 }
2741
2742 #[rstest]
2743 fn test_modify_order_routes_non_emulated_orders_to_risk() {
2744 let mut strategy = create_test_strategy();
2745 register_strategy(&mut strategy);
2746
2747 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2748 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2749 msgbus::register_trading_command_endpoint(
2750 MessagingSwitchboard::risk_engine_queue_execute(),
2751 risk_handler,
2752 );
2753
2754 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2755 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2756 msgbus::register_trading_command_endpoint(
2757 MessagingSwitchboard::exec_engine_queue_execute(),
2758 exec_handler,
2759 );
2760
2761 let order = OrderAny::Market(MarketOrder::new(
2762 TraderId::from("TRADER-001"),
2763 StrategyId::from("TEST-001"),
2764 InstrumentId::from("BTCUSDT.BINANCE"),
2765 ClientOrderId::from("O-20250208-0003"),
2766 OrderSide::Buy,
2767 Quantity::from(100_000),
2768 TimeInForce::Gtc,
2769 UUID4::new(),
2770 UnixNanos::default(),
2771 false,
2772 false,
2773 None,
2774 None,
2775 None,
2776 None,
2777 None,
2778 None,
2779 None,
2780 None,
2781 ));
2782 add_order_to_cache(&strategy, &order);
2783
2784 strategy
2785 .modify_order(
2786 order.client_order_id(),
2787 Some(Quantity::from(200_000)),
2788 None,
2789 None,
2790 None,
2791 None,
2792 )
2793 .unwrap();
2794
2795 let risk_messages = risk_messages.get_messages();
2796 let exec_messages = exec_messages.get_messages();
2797
2798 assert_eq!(risk_messages.len(), 1);
2799 assert!(matches!(
2800 risk_messages.first(),
2801 Some(TradingCommand::ModifyOrder(_))
2802 ));
2803 assert!(exec_messages.is_empty());
2804 }
2805
2806 #[rstest]
2807 fn test_modify_order_marks_order_pending_update_locally_before_send() {
2808 let mut strategy = create_test_strategy();
2809 register_strategy(&mut strategy);
2810
2811 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2812 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
2813 msgbus::register_trading_command_endpoint(
2814 MessagingSwitchboard::risk_engine_queue_execute(),
2815 risk_handler,
2816 );
2817
2818 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2819 get_typed_message_saving_handler(Some(Ustr::from("events.order.pending_update")));
2820 let order = make_accepted_limit_order("O-20250208-UPDATE-001");
2821 let topic = format!("events.order.{}", order.strategy_id());
2822 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2823 add_order_to_cache(&strategy, &order);
2824
2825 strategy
2826 .modify_order(
2827 order.client_order_id(),
2828 None,
2829 Some(Price::from("51000.0")),
2830 None,
2831 None,
2832 None,
2833 )
2834 .unwrap();
2835
2836 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2837
2838 {
2839 let cache = strategy.core.cache();
2840 let cached_order = cache.order(&order.client_order_id()).unwrap();
2841 assert_eq!(cached_order.status(), OrderStatus::PendingUpdate);
2842 }
2843
2844 let risk_messages = risk_messages.get_messages();
2845 assert_eq!(risk_messages.len(), 1);
2846 assert!(matches!(
2847 risk_messages.first(),
2848 Some(TradingCommand::ModifyOrder(_))
2849 ));
2850
2851 let event_messages = event_messages.get_messages();
2852 assert_eq!(event_messages.len(), 1);
2853 assert!(matches!(
2854 event_messages.first(),
2855 Some(OrderEventAny::PendingUpdate(_))
2856 ));
2857 }
2858
2859 #[rstest]
2860 fn test_cancel_order_marks_order_pending_cancel_locally_before_send() {
2861 let mut strategy = create_test_strategy();
2862 register_strategy(&mut strategy);
2863
2864 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2865 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2866 msgbus::register_trading_command_endpoint(
2867 MessagingSwitchboard::exec_engine_queue_execute(),
2868 exec_handler,
2869 );
2870
2871 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2872 get_typed_message_saving_handler(Some(Ustr::from("events.order.pending_cancel")));
2873 let order = make_accepted_market_order("O-20250208-CANCEL-001");
2874 let topic = format!("events.order.{}", order.strategy_id());
2875 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2876 add_order_to_cache(&strategy, &order);
2877
2878 strategy
2879 .cancel_order(order.client_order_id(), None, None)
2880 .unwrap();
2881
2882 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2883
2884 {
2885 let cache = strategy.core.cache();
2886 let cached_order = cache.order(&order.client_order_id()).unwrap();
2887 assert_eq!(cached_order.status(), OrderStatus::PendingCancel);
2888 assert!(cache.is_order_pending_cancel_local(&order.client_order_id()));
2889 }
2890
2891 let exec_messages = exec_messages.get_messages();
2892 assert_eq!(exec_messages.len(), 1);
2893 assert!(matches!(
2894 exec_messages.first(),
2895 Some(TradingCommand::CancelOrder(_))
2896 ));
2897
2898 let event_messages = event_messages.get_messages();
2899 assert_eq!(event_messages.len(), 1);
2900 assert!(matches!(
2901 event_messages.first(),
2902 Some(OrderEventAny::PendingCancel(_))
2903 ));
2904 }
2905
2906 #[rstest]
2907 fn test_cancel_orders_marks_orders_pending_cancel_locally_before_send() {
2908 let mut strategy = create_test_strategy();
2909 register_strategy(&mut strategy);
2910
2911 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2912 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2913 msgbus::register_trading_command_endpoint(
2914 MessagingSwitchboard::exec_engine_queue_execute(),
2915 exec_handler,
2916 );
2917
2918 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
2919 get_typed_message_saving_handler(Some(Ustr::from("events.order.batch_pending_cancel")));
2920 let order1 = make_accepted_market_order("O-20250208-CANCEL-001");
2921 let order2 = make_accepted_market_order("O-20250208-CANCEL-002");
2922 let topic = format!("events.order.{}", order1.strategy_id());
2923 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
2924 add_order_to_cache(&strategy, &order1);
2925 add_order_to_cache(&strategy, &order2);
2926
2927 strategy
2928 .cancel_orders(
2929 vec![order1.client_order_id(), order2.client_order_id()],
2930 None,
2931 None,
2932 )
2933 .unwrap();
2934
2935 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
2936
2937 {
2938 let cache = strategy.core.cache();
2939 let cached_order1 = cache.order(&order1.client_order_id()).unwrap();
2940 let cached_order2 = cache.order(&order2.client_order_id()).unwrap();
2941 assert_eq!(cached_order1.status(), OrderStatus::PendingCancel);
2942 assert_eq!(cached_order2.status(), OrderStatus::PendingCancel);
2943 assert!(cache.is_order_pending_cancel_local(&order1.client_order_id()));
2944 assert!(cache.is_order_pending_cancel_local(&order2.client_order_id()));
2945 }
2946
2947 let exec_messages = exec_messages.get_messages();
2948 assert_eq!(exec_messages.len(), 1);
2949 let Some(TradingCommand::BatchCancelOrders(command)) = exec_messages.first() else {
2950 panic!("expected BatchCancelOrders command");
2951 };
2952 assert_eq!(command.cancels.len(), 2);
2953
2954 let event_messages = event_messages.get_messages();
2955 assert_eq!(event_messages.len(), 2);
2956 assert!(
2957 event_messages
2958 .iter()
2959 .all(|event| matches!(event, OrderEventAny::PendingCancel(_)))
2960 );
2961 }
2962
2963 #[rstest]
2964 fn test_cancel_order_updates_own_book_status_before_send() {
2965 let mut strategy = create_test_strategy();
2966 register_strategy(&mut strategy);
2967
2968 let (exec_handler, _exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
2969 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
2970 msgbus::register_trading_command_endpoint(
2971 MessagingSwitchboard::exec_engine_queue_execute(),
2972 exec_handler,
2973 );
2974
2975 let order = make_accepted_limit_order("O-20250208-CANCEL-OWN-BOOK-001");
2976 add_order_to_cache_and_own_book(&strategy, &order);
2977
2978 strategy
2979 .cancel_order(order.client_order_id(), None, None)
2980 .unwrap();
2981
2982 let mut accepted = AHashSet::new();
2983 accepted.insert(OrderStatus::Accepted);
2984 let mut pending_cancel = AHashSet::new();
2985 pending_cancel.insert(OrderStatus::PendingCancel);
2986
2987 let cache = strategy.core.cache();
2988 let own_book = cache.own_order_book(&order.instrument_id()).unwrap();
2989 assert!(own_book.bids_as_map(Some(&accepted), None, None).is_empty());
2990 let pending_bids = own_book.bids_as_map(Some(&pending_cancel), None, None);
2991 assert_eq!(pending_bids.values().map(Vec::len).sum::<usize>(), 1);
2992 }
2993
2994 #[rstest]
2995 fn test_cancel_order_returns_error_when_not_in_cache() {
2996 let mut strategy = create_test_strategy();
2997 register_strategy(&mut strategy);
2998
2999 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
3000 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
3001 msgbus::register_trading_command_endpoint(
3002 MessagingSwitchboard::exec_engine_queue_execute(),
3003 exec_handler,
3004 );
3005
3006 let missing_id = ClientOrderId::from("O-MISSING");
3007 let err = strategy
3008 .cancel_order(missing_id, None, None)
3009 .expect_err("expected cancel_order to fail when order is not in cache");
3010
3011 assert!(
3012 err.to_string().contains("not found in cache"),
3013 "unexpected error: {err}"
3014 );
3015 assert!(exec_messages.get_messages().is_empty());
3016 }
3017
3018 #[rstest]
3019 fn test_modify_order_returns_error_when_not_in_cache() {
3020 let mut strategy = create_test_strategy();
3021 register_strategy(&mut strategy);
3022
3023 let (risk_handler, risk_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
3024 get_typed_into_message_saving_handler(Some(Ustr::from("RiskEngine.queue_execute")));
3025 msgbus::register_trading_command_endpoint(
3026 MessagingSwitchboard::risk_engine_queue_execute(),
3027 risk_handler,
3028 );
3029
3030 let missing_id = ClientOrderId::from("O-MISSING");
3031 let err = strategy
3032 .modify_order(missing_id, Some(Quantity::from(1)), None, None, None, None)
3033 .expect_err("expected modify_order to fail when order is not in cache");
3034
3035 assert!(
3036 err.to_string().contains("not found in cache"),
3037 "unexpected error: {err}"
3038 );
3039 assert!(risk_messages.get_messages().is_empty());
3040 }
3041
3042 #[rstest]
3043 fn test_cancel_orders_returns_error_when_any_id_missing() {
3044 let mut strategy = create_test_strategy();
3045 register_strategy(&mut strategy);
3046
3047 let (exec_handler, exec_messages): (_, TypedIntoMessageSavingHandler<TradingCommand>) =
3048 get_typed_into_message_saving_handler(Some(Ustr::from("ExecEngine.queue_execute")));
3049 msgbus::register_trading_command_endpoint(
3050 MessagingSwitchboard::exec_engine_queue_execute(),
3051 exec_handler,
3052 );
3053
3054 let order = make_accepted_limit_order("O-PRESENT");
3055 add_order_to_cache(&strategy, &order);
3056
3057 let err = strategy
3058 .cancel_orders(
3059 vec![order.client_order_id(), ClientOrderId::from("O-MISSING")],
3060 None,
3061 None,
3062 )
3063 .expect_err("expected cancel_orders to fail when any id is missing");
3064
3065 assert!(
3066 err.to_string().contains("not found in cache"),
3067 "unexpected error: {err}"
3068 );
3069 assert!(exec_messages.get_messages().is_empty());
3070 }
3071
3072 #[rstest]
3075 fn test_has_gtd_expiry_timer_when_timer_not_set() {
3076 let mut strategy = create_test_strategy();
3077 let client_order_id = ClientOrderId::from("O-001");
3078
3079 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3080 }
3081
3082 #[rstest]
3083 fn test_has_gtd_expiry_timer_when_timer_set() {
3084 let mut strategy = create_test_strategy();
3085 let client_order_id = ClientOrderId::from("O-001");
3086
3087 strategy
3088 .core
3089 .gtd_timers
3090 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3091
3092 assert!(strategy.has_gtd_expiry_timer(&client_order_id));
3093 }
3094
3095 #[rstest]
3096 fn test_cancel_gtd_expiry_removes_timer() {
3097 let mut strategy = create_test_strategy();
3098 register_strategy(&mut strategy);
3099
3100 let client_order_id = ClientOrderId::from("O-001");
3101 strategy
3102 .core
3103 .gtd_timers
3104 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3105
3106 strategy.cancel_gtd_expiry(&client_order_id);
3107
3108 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3109 }
3110
3111 #[rstest]
3112 fn test_cancel_gtd_expiry_when_timer_not_set() {
3113 let mut strategy = create_test_strategy();
3114 register_strategy(&mut strategy);
3115
3116 let client_order_id = ClientOrderId::from("O-001");
3117
3118 strategy.cancel_gtd_expiry(&client_order_id);
3119
3120 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3121 }
3122
3123 #[rstest]
3124 #[case::filled(make_filled)]
3125 #[case::canceled(make_canceled)]
3126 #[case::rejected(make_rejected)]
3127 #[case::expired(make_expired)]
3128 fn test_handle_order_event_cancels_gtd_timer_for_terminal_event(
3129 #[case] make_event: fn(ClientOrderId) -> OrderEventAny,
3130 ) {
3131 let mut strategy = create_test_strategy();
3132 register_strategy(&mut strategy);
3133 start_strategy(&mut strategy);
3134
3135 let client_order_id = ClientOrderId::from("O-001");
3136 strategy
3137 .core
3138 .gtd_timers
3139 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3140
3141 strategy.handle_order_event(make_event(client_order_id));
3142
3143 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3144 }
3145
3146 #[rstest]
3147 #[case::filled(make_filled)]
3148 #[case::canceled(make_canceled)]
3149 #[case::rejected(make_rejected)]
3150 #[case::expired(make_expired)]
3151 fn test_handle_order_event_cancels_gtd_timer_when_stopped(
3152 #[case] make_event: fn(ClientOrderId) -> OrderEventAny,
3153 ) {
3154 let mut strategy = create_test_strategy();
3155 register_strategy(&mut strategy);
3156 start_strategy(&mut strategy);
3157
3158 let client_order_id = ClientOrderId::from("O-001");
3159 strategy
3160 .core
3161 .gtd_timers
3162 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3163
3164 stop_strategy(&mut strategy);
3165 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3166
3167 strategy.handle_order_event(make_event(client_order_id));
3168
3169 assert!(!strategy.has_gtd_expiry_timer(&client_order_id));
3170 }
3171
3172 #[rstest]
3173 fn test_handle_order_event_skips_gtd_cancel_for_non_terminal() {
3174 let mut strategy = create_test_strategy();
3175 register_strategy(&mut strategy);
3176 start_strategy(&mut strategy);
3177
3178 let client_order_id = ClientOrderId::from("O-001");
3179 strategy
3180 .core
3181 .gtd_timers
3182 .insert(client_order_id, Ustr::from("GTD-EXPIRY:O-001"));
3183
3184 strategy.handle_order_event(make_accepted(client_order_id));
3185
3186 assert!(strategy.has_gtd_expiry_timer(&client_order_id));
3187 }
3188
3189 #[rstest]
3190 fn test_handle_order_event_skips_dispatch_when_stopped() {
3191 let mut strategy = create_test_strategy();
3192 register_strategy(&mut strategy);
3193 start_strategy(&mut strategy);
3194 stop_strategy(&mut strategy);
3195 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3196
3197 strategy.handle_order_event(make_rejected(ClientOrderId::from("O-001")));
3198
3199 assert!(!strategy.on_order_rejected_called);
3200 }
3201
3202 #[rstest]
3203 fn test_on_start_calls_reactivate_gtd_timers_when_enabled() {
3204 let config = StrategyConfig {
3205 strategy_id: Some(StrategyId::from("TEST-001")),
3206 order_id_tag: Some("001".to_string()),
3207 manage_gtd_expiry: true,
3208 ..Default::default()
3209 };
3210 let mut strategy = TestStrategy::new(config);
3211 register_strategy(&mut strategy);
3212
3213 let result = Strategy::on_start(&mut strategy);
3214 assert!(result.is_ok());
3215 }
3216
3217 #[rstest]
3218 fn test_on_start_does_not_panic_when_gtd_disabled() {
3219 let config = StrategyConfig {
3220 strategy_id: Some(StrategyId::from("TEST-001")),
3221 order_id_tag: Some("001".to_string()),
3222 manage_gtd_expiry: false,
3223 ..Default::default()
3224 };
3225 let mut strategy = TestStrategy::new(config);
3226 register_strategy(&mut strategy);
3227
3228 let result = Strategy::on_start(&mut strategy);
3229 assert!(result.is_ok());
3230 }
3231
3232 #[rstest]
3235 fn test_query_account_when_registered() {
3236 let mut strategy = create_test_strategy();
3237 register_strategy(&mut strategy);
3238
3239 let account_id = AccountId::from("ACC-001");
3240
3241 let result = strategy.query_account(account_id, None, None);
3242
3243 assert!(result.is_ok());
3244 }
3245
3246 #[rstest]
3247 fn test_query_account_with_client_id() {
3248 let mut strategy = create_test_strategy();
3249 register_strategy(&mut strategy);
3250
3251 let account_id = AccountId::from("ACC-001");
3252 let client_id = ClientId::from("BINANCE");
3253
3254 let result = strategy.query_account(account_id, Some(client_id), None);
3255
3256 assert!(result.is_ok());
3257 }
3258
3259 #[rstest]
3260 fn test_query_order_when_registered() {
3261 let mut strategy = create_test_strategy();
3262 register_strategy(&mut strategy);
3263
3264 let order = OrderAny::Market(MarketOrder::test_default());
3265
3266 let result = strategy.query_order(&order, None, None);
3267
3268 assert!(result.is_ok());
3269 }
3270
3271 #[rstest]
3272 fn test_query_order_with_client_id() {
3273 let mut strategy = create_test_strategy();
3274 register_strategy(&mut strategy);
3275
3276 let order = OrderAny::Market(MarketOrder::test_default());
3277 let client_id = ClientId::from("BINANCE");
3278
3279 let result = strategy.query_order(&order, Some(client_id), None);
3280
3281 assert!(result.is_ok());
3282 }
3283
3284 #[rstest]
3285 fn test_is_exiting_returns_false_by_default() {
3286 let strategy = create_test_strategy();
3287 assert!(!strategy.is_exiting());
3288 }
3289
3290 #[rstest]
3291 fn test_is_exiting_returns_true_when_set_manually() {
3292 let mut strategy = create_test_strategy();
3293 register_strategy(&mut strategy);
3294
3295 strategy.core.is_exiting = true;
3297
3298 assert!(strategy.is_exiting());
3299 }
3300
3301 #[rstest]
3302 fn test_market_exit_sets_is_exiting_flag() {
3303 let mut strategy = create_test_strategy();
3305 register_strategy(&mut strategy);
3306
3307 assert!(!strategy.core.is_exiting);
3308
3309 strategy.core.is_exiting = true;
3311 strategy.core.market_exit_attempts = 0;
3312
3313 assert!(strategy.core.is_exiting);
3314 assert_eq!(strategy.core.market_exit_attempts, 0);
3315 }
3316
3317 #[rstest]
3318 fn test_market_exit_uses_config_time_in_force_and_reduce_only() {
3319 let config = StrategyConfig {
3320 strategy_id: Some(StrategyId::from("TEST-001")),
3321 order_id_tag: Some("001".to_string()),
3322 market_exit_time_in_force: TimeInForce::Ioc,
3323 market_exit_reduce_only: false,
3324 ..Default::default()
3325 };
3326 let strategy = TestStrategy::new(config);
3327
3328 assert_eq!(
3329 strategy.core.config.market_exit_time_in_force,
3330 TimeInForce::Ioc
3331 );
3332 assert!(!strategy.core.config.market_exit_reduce_only);
3333 }
3334
3335 #[rstest]
3336 fn test_market_exit_resets_attempt_counter() {
3337 let mut strategy = create_test_strategy();
3338 register_strategy(&mut strategy);
3339
3340 strategy.core.market_exit_attempts = 50;
3342
3343 strategy.core.reset_market_exit_state();
3345
3346 assert_eq!(strategy.core.market_exit_attempts, 0);
3347 }
3348
3349 #[rstest]
3350 fn test_market_exit_second_call_returns_early_when_exiting() {
3351 let mut strategy = create_test_strategy();
3352 register_strategy(&mut strategy);
3353
3354 strategy.core.is_exiting = true;
3356
3357 let result = strategy.market_exit();
3359 assert!(result.is_ok());
3360 assert!(strategy.core.is_exiting);
3361 }
3362
3363 #[rstest]
3364 fn test_finalize_market_exit_resets_state() {
3365 let mut strategy = create_test_strategy();
3366 register_strategy(&mut strategy);
3367
3368 strategy.core.is_exiting = true;
3370 strategy.core.pending_stop = true;
3371 strategy.core.market_exit_attempts = 50;
3372
3373 strategy.finalize_market_exit();
3374
3375 assert!(!strategy.core.is_exiting);
3376 assert!(!strategy.core.pending_stop);
3377 assert_eq!(strategy.core.market_exit_attempts, 0);
3378 }
3379
3380 #[rstest]
3381 fn test_market_exit_config_defaults() {
3382 let config = StrategyConfig::default();
3383
3384 assert!(!config.manage_stop);
3385 assert_eq!(config.market_exit_interval_ms, 100);
3386 assert_eq!(config.market_exit_max_attempts, 100);
3387 }
3388
3389 #[rstest]
3390 fn test_market_exit_with_custom_config() {
3391 let config = StrategyConfig {
3392 strategy_id: Some(StrategyId::from("TEST-001")),
3393 manage_stop: true,
3394 market_exit_interval_ms: 50,
3395 market_exit_max_attempts: 200,
3396 ..Default::default()
3397 };
3398 let strategy = TestStrategy::new(config);
3399
3400 assert!(strategy.core.config.manage_stop);
3401 assert_eq!(strategy.core.config.market_exit_interval_ms, 50);
3402 assert_eq!(strategy.core.config.market_exit_max_attempts, 200);
3403 }
3404
3405 #[derive(Debug)]
3406 struct MarketExitHookTrackingStrategy {
3407 core: StrategyCore,
3408 on_market_exit_called: bool,
3409 post_market_exit_called: bool,
3410 }
3411
3412 impl MarketExitHookTrackingStrategy {
3413 fn new(config: StrategyConfig) -> Self {
3414 Self {
3415 core: StrategyCore::new(config),
3416 on_market_exit_called: false,
3417 post_market_exit_called: false,
3418 }
3419 }
3420 }
3421
3422 impl DataActor for MarketExitHookTrackingStrategy {}
3423
3424 nautilus_strategy!(MarketExitHookTrackingStrategy, {
3425 fn on_market_exit(&mut self) {
3426 self.on_market_exit_called = true;
3427 }
3428
3429 fn post_market_exit(&mut self) {
3430 self.post_market_exit_called = true;
3431 }
3432 });
3433
3434 #[rstest]
3435 fn test_market_exit_calls_on_market_exit_hook() {
3436 let config = StrategyConfig {
3437 strategy_id: Some(StrategyId::from("TEST-001")),
3438 order_id_tag: Some("001".to_string()),
3439 ..Default::default()
3440 };
3441 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3442
3443 let trader_id = TraderId::from("TRADER-001");
3444 let clock = Rc::new(RefCell::new(TestClock::new()));
3445 let cache = Rc::new(RefCell::new(Cache::default()));
3446 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3447 cache.clone(),
3448 clock.clone(),
3449 None,
3450 )));
3451 strategy
3452 .core
3453 .register(trader_id, clock, cache, portfolio)
3454 .unwrap();
3455 strategy.initialize().unwrap();
3456 strategy.start().unwrap();
3457
3458 let _ = strategy.market_exit();
3459
3460 assert!(strategy.on_market_exit_called);
3461 }
3462
3463 #[rstest]
3464 fn test_finalize_market_exit_calls_post_market_exit_hook() {
3465 let config = StrategyConfig {
3466 strategy_id: Some(StrategyId::from("TEST-001")),
3467 order_id_tag: Some("001".to_string()),
3468 ..Default::default()
3469 };
3470 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3471
3472 let trader_id = TraderId::from("TRADER-001");
3473 let clock = Rc::new(RefCell::new(TestClock::new()));
3474 let cache = Rc::new(RefCell::new(Cache::default()));
3475 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3476 cache.clone(),
3477 clock.clone(),
3478 None,
3479 )));
3480 strategy
3481 .core
3482 .register(trader_id, clock, cache, portfolio)
3483 .unwrap();
3484
3485 strategy.core.is_exiting = true;
3486 strategy.finalize_market_exit();
3487
3488 assert!(strategy.post_market_exit_called);
3489 }
3490
3491 #[derive(Debug)]
3492 struct FailingPostExitStrategy {
3493 core: StrategyCore,
3494 }
3495
3496 impl FailingPostExitStrategy {
3497 fn new(config: StrategyConfig) -> Self {
3498 Self {
3499 core: StrategyCore::new(config),
3500 }
3501 }
3502 }
3503
3504 impl DataActor for FailingPostExitStrategy {}
3505
3506 nautilus_strategy!(FailingPostExitStrategy, {
3507 fn post_market_exit(&mut self) {
3508 panic!("Simulated error in post_market_exit");
3509 }
3510 });
3511
3512 #[rstest]
3513 fn test_finalize_market_exit_handles_hook_panic() {
3514 let config = StrategyConfig {
3515 strategy_id: Some(StrategyId::from("TEST-001")),
3516 order_id_tag: Some("001".to_string()),
3517 ..Default::default()
3518 };
3519 let mut strategy = FailingPostExitStrategy::new(config);
3520
3521 let trader_id = TraderId::from("TRADER-001");
3522 let clock = Rc::new(RefCell::new(TestClock::new()));
3523 let cache = Rc::new(RefCell::new(Cache::default()));
3524 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3525 cache.clone(),
3526 clock.clone(),
3527 None,
3528 )));
3529 strategy
3530 .core
3531 .register(trader_id, clock, cache, portfolio)
3532 .unwrap();
3533
3534 strategy.core.is_exiting = true;
3535 strategy.core.pending_stop = true;
3536
3537 strategy.finalize_market_exit();
3539
3540 assert!(!strategy.core.is_exiting);
3542 assert!(!strategy.core.pending_stop);
3543 }
3544
3545 #[rstest]
3546 fn test_check_market_exit_increments_attempts_before_finalizing() {
3547 let mut strategy = create_test_strategy();
3548 register_strategy(&mut strategy);
3549
3550 strategy.core.is_exiting = true;
3551 assert_eq!(strategy.core.market_exit_attempts, 0);
3552
3553 let event = TimeEvent::new(
3554 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3555 UUID4::new(),
3556 UnixNanos::default(),
3557 UnixNanos::default(),
3558 );
3559 strategy.check_market_exit(event);
3560
3561 assert!(!strategy.core.is_exiting);
3565 assert_eq!(strategy.core.market_exit_attempts, 0);
3566 }
3567
3568 #[rstest]
3569 fn test_check_market_exit_finalizes_when_max_attempts_reached() {
3570 let config = StrategyConfig {
3571 strategy_id: Some(StrategyId::from("TEST-001")),
3572 order_id_tag: Some("001".to_string()),
3573 market_exit_max_attempts: 3,
3574 ..Default::default()
3575 };
3576 let mut strategy = TestStrategy::new(config);
3577 register_strategy(&mut strategy);
3578
3579 strategy.core.is_exiting = true;
3580 strategy.core.market_exit_attempts = 2; let event = TimeEvent::new(
3583 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3584 UUID4::new(),
3585 UnixNanos::default(),
3586 UnixNanos::default(),
3587 );
3588 strategy.check_market_exit(event);
3589
3590 assert!(!strategy.core.is_exiting);
3592 assert_eq!(strategy.core.market_exit_attempts, 0);
3593 }
3594
3595 #[rstest]
3596 fn test_check_market_exit_finalizes_when_no_orders_or_positions() {
3597 let mut strategy = create_test_strategy();
3598 register_strategy(&mut strategy);
3599
3600 strategy.core.is_exiting = true;
3601
3602 let event = TimeEvent::new(
3603 Ustr::from("MARKET_EXIT_CHECK:TEST-001"),
3604 UUID4::new(),
3605 UnixNanos::default(),
3606 UnixNanos::default(),
3607 );
3608 strategy.check_market_exit(event);
3609
3610 assert!(!strategy.core.is_exiting);
3612 }
3613
3614 #[rstest]
3615 fn test_market_exit_timer_name_format() {
3616 let config = StrategyConfig {
3617 strategy_id: Some(StrategyId::from("MY-STRATEGY-001")),
3618 ..Default::default()
3619 };
3620 let strategy = TestStrategy::new(config);
3621
3622 assert_eq!(
3623 strategy.core.market_exit_timer_name.as_str(),
3624 "MARKET_EXIT_CHECK:MY-STRATEGY-001"
3625 );
3626 }
3627
3628 #[rstest]
3629 fn test_reset_market_exit_state() {
3630 let mut strategy = create_test_strategy();
3631
3632 strategy.core.is_exiting = true;
3633 strategy.core.pending_stop = true;
3634 strategy.core.market_exit_attempts = 50;
3635
3636 strategy.core.reset_market_exit_state();
3637
3638 assert!(!strategy.core.is_exiting);
3639 assert!(!strategy.core.pending_stop);
3640 assert_eq!(strategy.core.market_exit_attempts, 0);
3641 }
3642
3643 #[rstest]
3644 fn test_cancel_market_exit_resets_state_without_hooks() {
3645 let config = StrategyConfig {
3646 strategy_id: Some(StrategyId::from("TEST-001")),
3647 order_id_tag: Some("001".to_string()),
3648 ..Default::default()
3649 };
3650 let mut strategy = MarketExitHookTrackingStrategy::new(config);
3651
3652 let trader_id = TraderId::from("TRADER-001");
3653 let clock = Rc::new(RefCell::new(TestClock::new()));
3654 let cache = Rc::new(RefCell::new(Cache::default()));
3655 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3656 cache.clone(),
3657 clock.clone(),
3658 None,
3659 )));
3660 strategy
3661 .core
3662 .register(trader_id, clock, cache, portfolio)
3663 .unwrap();
3664
3665 strategy.core.is_exiting = true;
3667 strategy.core.pending_stop = true;
3668 strategy.core.market_exit_attempts = 50;
3669
3670 strategy.cancel_market_exit();
3672
3673 assert!(!strategy.core.is_exiting);
3675 assert!(!strategy.core.pending_stop);
3676 assert_eq!(strategy.core.market_exit_attempts, 0);
3677
3678 assert!(!strategy.on_market_exit_called);
3680 assert!(!strategy.post_market_exit_called);
3681 }
3682
3683 #[rstest]
3684 fn test_market_exit_returns_early_when_not_running() {
3685 let mut strategy = create_test_strategy();
3686 register_strategy(&mut strategy);
3687
3688 assert_ne!(strategy.core.actor.state(), ComponentState::Running);
3690
3691 let result = strategy.market_exit();
3692
3693 assert!(result.is_ok());
3695 assert!(!strategy.core.is_exiting);
3696 }
3697
3698 #[rstest]
3699 fn test_stop_with_manage_stop_false_cleans_up_active_exit() {
3700 let config = StrategyConfig {
3701 strategy_id: Some(StrategyId::from("TEST-001")),
3702 order_id_tag: Some("001".to_string()),
3703 manage_stop: false,
3704 ..Default::default()
3705 };
3706 let mut strategy = TestStrategy::new(config);
3707 register_strategy(&mut strategy);
3708
3709 strategy.core.is_exiting = true;
3711 strategy.core.market_exit_attempts = 5;
3712
3713 let should_proceed = Strategy::stop(&mut strategy);
3715
3716 assert!(should_proceed);
3718 assert!(!strategy.core.is_exiting);
3719 assert_eq!(strategy.core.market_exit_attempts, 0);
3720 }
3721
3722 #[rstest]
3723 fn test_stop_with_manage_stop_true_defers_when_running() {
3724 let config = StrategyConfig {
3725 strategy_id: Some(StrategyId::from("TEST-001")),
3726 order_id_tag: Some("001".to_string()),
3727 manage_stop: true,
3728 ..Default::default()
3729 };
3730 let mut strategy = TestStrategy::new(config);
3731
3732 let trader_id = TraderId::from("TRADER-001");
3734 let clock = Rc::new(RefCell::new(TestClock::new()));
3735 clock
3736 .borrow_mut()
3737 .register_default_handler(TimeEventCallback::from(|_event: TimeEvent| {}));
3738 let cache = Rc::new(RefCell::new(Cache::default()));
3739 let portfolio = Rc::new(RefCell::new(Portfolio::new(
3740 cache.clone(),
3741 clock.clone(),
3742 None,
3743 )));
3744 strategy
3745 .core
3746 .register(trader_id, clock, cache, portfolio)
3747 .unwrap();
3748 strategy.initialize().unwrap();
3749 strategy.start().unwrap();
3750
3751 let should_proceed = Strategy::stop(&mut strategy);
3752
3753 assert!(!should_proceed);
3755 assert!(strategy.core.pending_stop);
3756 }
3757
3758 #[rstest]
3759 fn test_stop_with_manage_stop_true_returns_early_if_pending() {
3760 let config = StrategyConfig {
3761 strategy_id: Some(StrategyId::from("TEST-001")),
3762 order_id_tag: Some("001".to_string()),
3763 manage_stop: true,
3764 ..Default::default()
3765 };
3766 let mut strategy = TestStrategy::new(config);
3767 register_strategy(&mut strategy);
3768 start_strategy(&mut strategy);
3769 strategy.core.pending_stop = true;
3770
3771 let should_proceed = Strategy::stop(&mut strategy);
3773
3774 assert!(!should_proceed);
3776 assert!(strategy.core.pending_stop);
3777 }
3778
3779 #[rstest]
3780 fn test_stop_with_manage_stop_true_proceeds_when_not_running() {
3781 let config = StrategyConfig {
3782 strategy_id: Some(StrategyId::from("TEST-001")),
3783 order_id_tag: Some("001".to_string()),
3784 manage_stop: true,
3785 ..Default::default()
3786 };
3787 let mut strategy = TestStrategy::new(config);
3788 register_strategy(&mut strategy);
3789
3790 assert_ne!(strategy.core.actor.state(), ComponentState::Running);
3792
3793 let should_proceed = Strategy::stop(&mut strategy);
3794
3795 assert!(should_proceed);
3797 }
3798
3799 #[rstest]
3800 fn test_finalize_market_exit_stops_strategy_when_pending() {
3801 let config = StrategyConfig {
3802 strategy_id: Some(StrategyId::from("TEST-001")),
3803 order_id_tag: Some("001".to_string()),
3804 ..Default::default()
3805 };
3806 let mut strategy = TestStrategy::new(config);
3807 register_strategy(&mut strategy);
3808 start_strategy(&mut strategy);
3809
3810 strategy.core.is_exiting = true;
3812 strategy.core.pending_stop = true;
3813
3814 strategy.finalize_market_exit();
3815
3816 assert_eq!(strategy.core.actor.state(), ComponentState::Stopped);
3818 assert!(!strategy.core.is_exiting);
3819 assert!(!strategy.core.pending_stop);
3820 }
3821
3822 #[rstest]
3823 fn test_finalize_market_exit_stays_running_when_not_pending() {
3824 let config = StrategyConfig {
3825 strategy_id: Some(StrategyId::from("TEST-001")),
3826 order_id_tag: Some("001".to_string()),
3827 ..Default::default()
3828 };
3829 let mut strategy = TestStrategy::new(config);
3830 register_strategy(&mut strategy);
3831 start_strategy(&mut strategy);
3832
3833 strategy.core.is_exiting = true;
3835 strategy.core.pending_stop = false;
3836
3837 strategy.finalize_market_exit();
3838
3839 assert_eq!(strategy.core.actor.state(), ComponentState::Running);
3841 assert!(!strategy.core.is_exiting);
3842 }
3843
3844 #[rstest]
3845 fn test_submit_order_denied_during_market_exit_when_not_reduce_only() {
3846 let mut strategy = create_test_strategy();
3847 register_strategy(&mut strategy);
3848 start_strategy(&mut strategy);
3849 strategy.core.is_exiting = true;
3850
3851 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
3852 get_typed_message_saving_handler(Some(Ustr::from("events.order.denied")));
3853 let order = OrderAny::Market(MarketOrder::new(
3854 TraderId::from("TRADER-001"),
3855 StrategyId::from("TEST-001"),
3856 InstrumentId::from("BTCUSDT.BINANCE"),
3857 ClientOrderId::from("O-20250208-0001"),
3858 OrderSide::Buy,
3859 Quantity::from(100_000),
3860 TimeInForce::Gtc,
3861 UUID4::new(),
3862 UnixNanos::default(),
3863 false, false,
3865 None,
3866 None,
3867 None,
3868 None,
3869 None,
3870 None,
3871 None,
3872 None,
3873 ));
3874 let topic = format!("events.order.{}", order.strategy_id());
3875 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
3876 let client_order_id = order.client_order_id();
3877 let result = strategy.submit_order(order.clone(), None, None, None);
3878
3879 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
3880
3881 assert!(result.is_ok());
3882 let cache = strategy.core.cache();
3883 let cached_order = cache.order(&client_order_id).unwrap();
3884 assert_eq!(cached_order.status(), OrderStatus::Denied);
3885
3886 let event_messages = event_messages.get_messages();
3887 assert_eq!(event_messages.len(), 2);
3888 assert_eq!(
3889 event_messages[0],
3890 OrderEventAny::Initialized(order.init_event().clone())
3891 );
3892 let OrderEventAny::Denied(denied) = &event_messages[1] else {
3893 panic!("expected OrderDenied event");
3894 };
3895 assert_eq!(denied.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3896 }
3897
3898 #[rstest]
3899 fn test_submit_order_list_denied_during_market_exit_publishes_init_then_denied_events() {
3900 let mut strategy = create_test_strategy();
3901 register_strategy(&mut strategy);
3902 start_strategy(&mut strategy);
3903 strategy.core.is_exiting = true;
3904
3905 let orders = vec![
3906 make_initialized_market_order("O-20250208-LIST-DENY-001"),
3907 make_initialized_market_order("O-20250208-LIST-DENY-002"),
3908 ];
3909 let client_order_id1 = orders[0].client_order_id();
3910 let client_order_id2 = orders[1].client_order_id();
3911 let cache_rc = strategy.core.cache_rc();
3912 let timeline = Rc::new(RefCell::new(Vec::new()));
3913 let event_messages = Rc::new(RefCell::new(Vec::new()));
3914
3915 let event_handler = {
3916 let event_messages = event_messages.clone();
3917 let timeline = timeline.clone();
3918 TypedHandler::from_with_id("events.order.list_denied", move |event: &OrderEventAny| {
3919 match event {
3920 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id1 => {
3921 assert!(cache_rc.borrow().order_exists(&client_order_id1));
3922 timeline.borrow_mut().push("init1");
3923 }
3924 OrderEventAny::Initialized(e) if e.client_order_id == client_order_id2 => {
3925 assert!(cache_rc.borrow().order_exists(&client_order_id2));
3926 timeline.borrow_mut().push("init2");
3927 }
3928 OrderEventAny::Denied(e) if e.client_order_id == client_order_id1 => {
3929 assert_eq!(e.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3930 let cache = cache_rc.borrow();
3931 let cached_order = cache.order(&client_order_id1).unwrap();
3932 assert_eq!(cached_order.status(), OrderStatus::Denied);
3933 timeline.borrow_mut().push("denied1");
3934 }
3935 OrderEventAny::Denied(e) if e.client_order_id == client_order_id2 => {
3936 assert_eq!(e.reason, Ustr::from("MARKET_EXIT_IN_PROGRESS"));
3937 let cache = cache_rc.borrow();
3938 let cached_order = cache.order(&client_order_id2).unwrap();
3939 assert_eq!(cached_order.status(), OrderStatus::Denied);
3940 timeline.borrow_mut().push("denied2");
3941 }
3942 _ => panic!("unexpected order event {event:?}"),
3943 }
3944 event_messages.borrow_mut().push(event.clone());
3945 })
3946 };
3947 let risk_handler = {
3948 let timeline = timeline.clone();
3949 TypedIntoHandler::from_with_id(
3950 "RiskEngine.queue_execute",
3951 move |_command: TradingCommand| {
3952 timeline.borrow_mut().push("command");
3953 },
3954 )
3955 };
3956 msgbus::register_trading_command_endpoint(
3957 MessagingSwitchboard::risk_engine_queue_execute(),
3958 risk_handler,
3959 );
3960
3961 let topic = format!("events.order.{}", orders[0].strategy_id());
3962 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
3963 let result = strategy.submit_order_list(orders.clone(), None, None, None);
3964
3965 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
3966
3967 assert!(result.is_ok());
3968
3969 {
3970 let cache = strategy.core.cache();
3971 let cached_order1 = cache.order(&client_order_id1).unwrap();
3972 let cached_order2 = cache.order(&client_order_id2).unwrap();
3973 assert_eq!(cached_order1.status(), OrderStatus::Denied);
3974 assert_eq!(cached_order2.status(), OrderStatus::Denied);
3975 }
3976
3977 let event_messages = event_messages.borrow();
3978 assert_eq!(event_messages.len(), 4);
3979 assert_eq!(
3980 event_messages[0],
3981 OrderEventAny::Initialized(orders[0].init_event().clone())
3982 );
3983 assert!(matches!(
3984 &event_messages[1],
3985 OrderEventAny::Denied(e)
3986 if e.client_order_id == client_order_id1
3987 && e.reason == Ustr::from("MARKET_EXIT_IN_PROGRESS")
3988 ));
3989 assert_eq!(
3990 event_messages[2],
3991 OrderEventAny::Initialized(orders[1].init_event().clone())
3992 );
3993 assert!(matches!(
3994 &event_messages[3],
3995 OrderEventAny::Denied(e)
3996 if e.client_order_id == client_order_id2
3997 && e.reason == Ustr::from("MARKET_EXIT_IN_PROGRESS")
3998 ));
3999 assert_eq!(
4000 timeline.borrow().as_slice(),
4001 &["init1", "denied1", "init2", "denied2"]
4002 );
4003 }
4004
4005 #[rstest]
4006 fn test_submit_order_list_market_exit_rejects_non_initialized_without_events() {
4007 let mut strategy = create_test_strategy();
4008 register_strategy(&mut strategy);
4009 start_strategy(&mut strategy);
4010 strategy.core.is_exiting = true;
4011
4012 let order = make_accepted_market_order("O-20250208-LIST-DENY-ACCEPTED");
4013 let topic = format!("events.order.{}", order.strategy_id());
4014 let (event_handler, event_messages): (_, TypedMessageSavingHandler<OrderEventAny>) =
4015 get_typed_message_saving_handler(Some(Ustr::from("events.order.list_invalid")));
4016
4017 msgbus::subscribe_order_events(topic.clone().into(), event_handler.clone(), None);
4018 let result = strategy.submit_order_list(vec![order], None, None, None);
4019
4020 msgbus::unsubscribe_order_events(topic.into(), &event_handler);
4021
4022 assert!(result.is_err());
4023 assert!(
4024 result
4025 .unwrap_err()
4026 .to_string()
4027 .contains("expected INITIALIZED")
4028 );
4029 assert!(event_messages.get_messages().is_empty());
4030 }
4031
4032 #[rstest]
4033 fn test_submit_order_list_rejects_mixed_venues_with_friendly_error() {
4034 let mut strategy = create_test_strategy();
4035 register_strategy(&mut strategy);
4036 start_strategy(&mut strategy);
4037
4038 let binance_order = make_initialized_market_order("O-MIXED-VENUE-001");
4039 let bybit_order = OrderAny::Market(MarketOrder::new(
4040 TraderId::from("TRADER-001"),
4041 StrategyId::from("TEST-001"),
4042 InstrumentId::from("BTCUSDT.BYBIT"),
4043 ClientOrderId::from("O-MIXED-VENUE-002"),
4044 OrderSide::Buy,
4045 Quantity::from(100_000),
4046 TimeInForce::Gtc,
4047 UUID4::new(),
4048 UnixNanos::default(),
4049 false,
4050 false,
4051 None,
4052 None,
4053 None,
4054 None,
4055 None,
4056 None,
4057 None,
4058 None,
4059 ));
4060
4061 let result = strategy.submit_order_list(vec![binance_order, bybit_order], None, None, None);
4062
4063 let err = result.unwrap_err();
4064 let msg = err.to_string();
4065 assert!(
4066 msg.contains("OrderList denied: orders must share the same venue"),
4067 "unexpected error: {msg}",
4068 );
4069 assert!(msg.contains("BINANCE"), "expected BINANCE in error: {msg}");
4070 assert!(msg.contains("BYBIT"), "expected BYBIT in error: {msg}");
4071 }
4072
4073 #[rstest]
4074 fn test_submit_order_allowed_during_market_exit_when_reduce_only() {
4075 let mut strategy = create_test_strategy();
4076 register_strategy(&mut strategy);
4077 start_strategy(&mut strategy);
4078 strategy.core.is_exiting = true;
4079
4080 let order = OrderAny::Market(MarketOrder::new(
4081 TraderId::from("TRADER-001"),
4082 StrategyId::from("TEST-001"),
4083 InstrumentId::from("BTCUSDT.BINANCE"),
4084 ClientOrderId::from("O-20250208-0001"),
4085 OrderSide::Buy,
4086 Quantity::from(100_000),
4087 TimeInForce::Gtc,
4088 UUID4::new(),
4089 UnixNanos::default(),
4090 true, false,
4092 None,
4093 None,
4094 None,
4095 None,
4096 None,
4097 None,
4098 None,
4099 None,
4100 ));
4101 let client_order_id = order.client_order_id();
4102 let result = strategy.submit_order(order, None, None, None);
4103
4104 assert!(result.is_ok());
4105 let cache = strategy.core.cache();
4106 let cached_order = cache.order(&client_order_id).unwrap();
4107 assert_ne!(cached_order.status(), OrderStatus::Denied);
4108 }
4109
4110 #[rstest]
4111 fn test_submit_order_allowed_during_market_exit_when_tagged() {
4112 let mut strategy = create_test_strategy();
4113 register_strategy(&mut strategy);
4114 start_strategy(&mut strategy);
4115 strategy.core.is_exiting = true;
4116
4117 let order = OrderAny::Market(MarketOrder::new(
4118 TraderId::from("TRADER-001"),
4119 StrategyId::from("TEST-001"),
4120 InstrumentId::from("BTCUSDT.BINANCE"),
4121 ClientOrderId::from("O-20250208-0002"),
4122 OrderSide::Buy,
4123 Quantity::from(100_000),
4124 TimeInForce::Gtc,
4125 UUID4::new(),
4126 UnixNanos::default(),
4127 false, false,
4129 None,
4130 None,
4131 None,
4132 None,
4133 None,
4134 None,
4135 None,
4136 Some(vec![Ustr::from("MARKET_EXIT")]),
4137 ));
4138 let client_order_id = order.client_order_id();
4139 let result = strategy.submit_order(order, None, None, None);
4140
4141 assert!(result.is_ok());
4142 let cache = strategy.core.cache();
4143 let cached_order = cache.order(&client_order_id).unwrap();
4144 assert_ne!(cached_order.status(), OrderStatus::Denied);
4145 }
4146
4147 #[derive(Debug)]
4148 struct MacroTestSimple {
4149 core: StrategyCore,
4150 }
4151
4152 nautilus_strategy!(MacroTestSimple);
4153
4154 impl DataActor for MacroTestSimple {}
4155
4156 #[derive(Debug)]
4157 struct MacroTestWithHooks {
4158 core: StrategyCore,
4159 }
4160
4161 nautilus_strategy!(MacroTestWithHooks, {
4162 fn on_order_rejected(&mut self, _event: OrderRejected) {}
4163 });
4164
4165 impl DataActor for MacroTestWithHooks {}
4166
4167 #[derive(Debug)]
4168 struct MacroTestCustomField {
4169 inner: StrategyCore,
4170 }
4171
4172 nautilus_strategy!(MacroTestCustomField, inner, {
4173 fn external_order_claims(&self) -> Option<Vec<InstrumentId>> {
4174 None
4175 }
4176 });
4177
4178 impl DataActor for MacroTestCustomField {}
4179
4180 #[rstest]
4181 fn test_nautilus_strategy_macro_forms() {
4182 let config = StrategyConfig {
4183 strategy_id: Some(StrategyId::from("MACRO-001")),
4184 order_id_tag: Some("001".to_string()),
4185 ..Default::default()
4186 };
4187
4188 let simple = MacroTestSimple {
4189 core: StrategyCore::new(config.clone()),
4190 };
4191 assert_eq!(simple.core().config.strategy_id, config.strategy_id);
4192
4193 let hooks = MacroTestWithHooks {
4194 core: StrategyCore::new(config.clone()),
4195 };
4196 assert_eq!(hooks.core().config.strategy_id, config.strategy_id);
4197
4198 let custom = MacroTestCustomField {
4199 inner: StrategyCore::new(config.clone()),
4200 };
4201 assert_eq!(custom.core().config.strategy_id, config.strategy_id);
4202 assert!(custom.external_order_claims().is_none());
4203 }
4204}