1pub mod config;
39pub mod core;
40pub mod twap;
41
42pub use core::{ExecutionAlgorithmCore, StrategyEventHandlers};
43
44pub use config::{ExecutionAlgorithmConfig, ImportableExecAlgorithmConfig};
45use nautilus_common::{
46 actor::{DataActor, registry::try_get_actor_unchecked},
47 enums::ComponentState,
48 logging::{CMD, EVT, RECV, SEND},
49 messages::execution::{CancelOrder, ModifyOrder, SubmitOrder, TradingCommand},
50 msgbus::{self, MessagingSwitchboard, TypedHandler},
51 timer::TimeEvent,
52};
53use nautilus_core::{UUID4, UnixNanos};
54use nautilus_model::{
55 enums::{OrderStatus, TimeInForce, TriggerType},
56 events::{
57 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied, OrderEmulated,
58 OrderEventAny, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
59 OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted,
60 OrderTriggered, OrderUpdated, PositionChanged, PositionClosed, PositionEvent,
61 PositionOpened,
62 },
63 identifiers::{AccountId, ClientId, ExecAlgorithmId, PositionId, StrategyId, TraderId},
64 orders::{LimitOrder, MarketOrder, MarketToLimitOrder, Order, OrderAny, OrderError, OrderList},
65 types::{Price, Quantity},
66};
67pub use twap::{TwapAlgorithm, TwapAlgorithmConfig};
68use ustr::Ustr;
69
70pub trait ExecutionAlgorithm: DataActor {
89 fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore;
91
92 fn id(&mut self) -> ExecAlgorithmId {
94 self.core_mut().exec_algorithm_id
95 }
96
97 fn execute(&mut self, command: TradingCommand) -> anyhow::Result<()>
108 where
109 Self: 'static + std::fmt::Debug + Sized,
110 {
111 let core = self.core_mut();
112 if core.config.log_commands {
113 let id = &core.actor.actor_id;
114 log::info!("{id} {RECV}{CMD} {command:?}");
115 }
116
117 if core.state() != ComponentState::Running {
118 return Ok(());
119 }
120
121 match command {
122 TradingCommand::SubmitOrder(cmd) => {
123 self.subscribe_to_strategy_events(cmd.strategy_id);
124 let order = self.core_mut().get_order(&cmd.client_order_id)?;
125 self.on_order(order)
126 }
127 TradingCommand::SubmitOrderList(cmd) => {
128 self.subscribe_to_strategy_events(cmd.strategy_id);
129 let orders = self.core_mut().get_orders_for_list(&cmd.order_list)?;
130 self.on_order_list(cmd.order_list, orders)
131 }
132 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(cmd),
133 _ => {
134 log::warn!("Unhandled command type: {command:?}");
135 Ok(())
136 }
137 }
138 }
139
140 fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()>;
148
149 fn on_order_list(
158 &mut self,
159 _order_list: OrderList,
160 orders: Vec<OrderAny>,
161 ) -> anyhow::Result<()> {
162 for order in orders {
163 self.on_order(order)?;
164 }
165 Ok(())
166 }
167
168 fn handle_cancel_order(&mut self, command: CancelOrder) -> anyhow::Result<()> {
177 let (order, is_pending_cancel) = {
178 let cache = self.core_mut().cache();
179
180 let Some(order) = cache.order(&command.client_order_id) else {
181 log::warn!(
182 "Cannot cancel order: {} not found in cache",
183 command.client_order_id
184 );
185 return Ok(());
186 };
187
188 let is_pending = cache.is_order_pending_cancel_local(&command.client_order_id);
189 (order.clone(), is_pending)
190 };
191
192 if is_pending_cancel {
193 return Ok(());
194 }
195
196 if order.is_closed() {
197 log::warn!("Order already closed for {command:?}");
198 return Ok(());
199 }
200
201 let event = OrderEventAny::Canceled(self.generate_order_canceled(&order));
202
203 let order = {
204 let cache_rc = self.core_mut().cache_rc();
205 let mut cache = cache_rc.borrow_mut();
206 match cache.update_order(&event) {
207 Ok(order) => order,
208 Err(e)
209 if matches!(
210 e.downcast_ref::<OrderError>(),
211 Some(OrderError::InvalidStateTransition)
212 ) =>
213 {
214 log::warn!("InvalidStateTrigger: {e}, did not apply cancel event");
215 return Ok(());
216 }
217 Err(e) => return Err(e),
218 }
219 };
220
221 let topic = format!("events.order.{}", order.strategy_id());
222 msgbus::publish_order_event(topic.into(), &event);
223
224 Ok(())
225 }
226
227 fn generate_order_canceled(&mut self, order: &OrderAny) -> OrderCanceled {
229 let ts_now = self.core_mut().clock().timestamp_ns();
230
231 OrderCanceled::new(
232 order.trader_id(),
233 order.strategy_id(),
234 order.instrument_id(),
235 order.client_order_id(),
236 UUID4::new(),
237 ts_now,
238 ts_now,
239 false, order.venue_order_id(),
241 order.account_id(),
242 )
243 }
244
245 fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate {
247 let ts_now = self.core_mut().clock().timestamp_ns();
248
249 OrderPendingUpdate::new(
250 order.trader_id(),
251 order.strategy_id(),
252 order.instrument_id(),
253 order.client_order_id(),
254 order
255 .account_id()
256 .expect("Order must have account_id for pending update"),
257 UUID4::new(),
258 ts_now,
259 ts_now,
260 false, order.venue_order_id(),
262 )
263 }
264
265 fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel {
267 let ts_now = self.core_mut().clock().timestamp_ns();
268
269 OrderPendingCancel::new(
270 order.trader_id(),
271 order.strategy_id(),
272 order.instrument_id(),
273 order.client_order_id(),
274 order
275 .account_id()
276 .expect("Order must have account_id for pending cancel"),
277 UUID4::new(),
278 ts_now,
279 ts_now,
280 false, order.venue_order_id(),
282 )
283 }
284
285 fn spawn_market(
298 &mut self,
299 primary: &mut OrderAny,
300 quantity: Quantity,
301 time_in_force: TimeInForce,
302 reduce_only: bool,
303 tags: Option<Vec<Ustr>>,
304 reduce_primary: bool,
305 ) -> MarketOrder {
306 let core = self.core_mut();
308 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
309 let ts_init = core.clock().timestamp_ns();
310 let exec_algorithm_id = core.exec_algorithm_id;
311
312 if reduce_primary {
313 self.reduce_primary_order(primary, quantity);
314 self.core_mut()
315 .track_pending_spawn_reduction(client_order_id, quantity);
316 }
317
318 MarketOrder::new(
319 primary.trader_id(),
320 primary.strategy_id(),
321 primary.instrument_id(),
322 client_order_id,
323 primary.order_side(),
324 quantity,
325 time_in_force,
326 UUID4::new(),
327 ts_init,
328 reduce_only,
329 primary.is_quote_quantity(),
330 primary.contingency_type(),
331 primary.order_list_id(),
332 primary.linked_order_ids().map(|ids| ids.to_vec()),
333 primary.parent_order_id(),
334 Some(exec_algorithm_id),
335 primary.exec_algorithm_params().cloned(),
336 Some(primary.client_order_id()),
337 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
338 )
339 }
340
341 #[expect(clippy::too_many_arguments)]
354 fn spawn_limit(
355 &mut self,
356 primary: &mut OrderAny,
357 quantity: Quantity,
358 price: Price,
359 time_in_force: TimeInForce,
360 expire_time: Option<UnixNanos>,
361 post_only: bool,
362 reduce_only: bool,
363 display_qty: Option<Quantity>,
364 emulation_trigger: Option<TriggerType>,
365 tags: Option<Vec<Ustr>>,
366 reduce_primary: bool,
367 ) -> LimitOrder {
368 let core = self.core_mut();
370 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
371 let ts_init = core.clock().timestamp_ns();
372 let exec_algorithm_id = core.exec_algorithm_id;
373
374 if reduce_primary {
375 self.reduce_primary_order(primary, quantity);
376 self.core_mut()
377 .track_pending_spawn_reduction(client_order_id, quantity);
378 }
379
380 LimitOrder::new(
381 primary.trader_id(),
382 primary.strategy_id(),
383 primary.instrument_id(),
384 client_order_id,
385 primary.order_side(),
386 quantity,
387 price,
388 time_in_force,
389 expire_time,
390 post_only,
391 reduce_only,
392 primary.is_quote_quantity(),
393 display_qty,
394 emulation_trigger,
395 None, primary.contingency_type(),
397 primary.order_list_id(),
398 primary.linked_order_ids().map(|ids| ids.to_vec()),
399 primary.parent_order_id(),
400 Some(exec_algorithm_id),
401 primary.exec_algorithm_params().cloned(),
402 Some(primary.client_order_id()),
403 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
404 UUID4::new(),
405 ts_init,
406 )
407 }
408
409 #[expect(clippy::too_many_arguments)]
422 fn spawn_market_to_limit(
423 &mut self,
424 primary: &mut OrderAny,
425 quantity: Quantity,
426 time_in_force: TimeInForce,
427 expire_time: Option<UnixNanos>,
428 reduce_only: bool,
429 display_qty: Option<Quantity>,
430 emulation_trigger: Option<TriggerType>,
431 tags: Option<Vec<Ustr>>,
432 reduce_primary: bool,
433 ) -> MarketToLimitOrder {
434 let core = self.core_mut();
436 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
437 let ts_init = core.clock().timestamp_ns();
438 let exec_algorithm_id = core.exec_algorithm_id;
439
440 if reduce_primary {
441 self.reduce_primary_order(primary, quantity);
442 self.core_mut()
443 .track_pending_spawn_reduction(client_order_id, quantity);
444 }
445
446 let mut order = MarketToLimitOrder::new(
447 primary.trader_id(),
448 primary.strategy_id(),
449 primary.instrument_id(),
450 client_order_id,
451 primary.order_side(),
452 quantity,
453 time_in_force,
454 expire_time,
455 false, reduce_only,
457 primary.is_quote_quantity(),
458 display_qty,
459 primary.contingency_type(),
460 primary.order_list_id(),
461 primary.linked_order_ids().map(|ids| ids.to_vec()),
462 primary.parent_order_id(),
463 Some(exec_algorithm_id),
464 primary.exec_algorithm_params().cloned(),
465 Some(primary.client_order_id()),
466 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
467 UUID4::new(),
468 ts_init,
469 );
470
471 if emulation_trigger.is_some() {
472 order.set_emulation_trigger(emulation_trigger);
473 }
474
475 order
476 }
477
478 fn reduce_primary_order(&mut self, primary: &mut OrderAny, spawn_qty: Quantity) {
487 let leaves_qty = primary.leaves_qty();
488 assert!(
489 leaves_qty >= spawn_qty,
490 "Spawn quantity {spawn_qty} exceeds primary leaves_qty {leaves_qty}"
491 );
492
493 let primary_qty = primary.quantity();
494 let new_qty = Quantity::from_raw(primary_qty.raw - spawn_qty.raw, primary_qty.precision);
495
496 let core = self.core_mut();
497 let ts_now = core.clock().timestamp_ns();
498
499 let updated = OrderUpdated::new(
500 primary.trader_id(),
501 primary.strategy_id(),
502 primary.instrument_id(),
503 primary.client_order_id(),
504 new_qty,
505 UUID4::new(),
506 ts_now,
507 ts_now,
508 false, primary.venue_order_id(),
510 primary.account_id(),
511 None, None, None, primary.is_quote_quantity(),
515 );
516
517 let event = OrderEventAny::Updated(updated);
518
519 {
520 let cache_rc = core.cache_rc();
521 let mut cache = cache_rc.borrow_mut();
522 *primary = cache
523 .update_order(&event)
524 .expect("Failed to update order in cache");
525 }
526
527 publish_order_event(&event);
528 }
529
530 fn restore_primary_order_quantity(&mut self, order: &OrderAny) {
536 let Some(exec_spawn_id) = order.exec_spawn_id() else {
537 return;
538 };
539
540 let reduction_qty = {
541 let core = self.core_mut();
542 core.take_pending_spawn_reduction(&order.client_order_id())
543 };
544
545 let Some(reduction_qty) = reduction_qty else {
546 return;
547 };
548
549 let primary = {
550 let cache = self.core_mut().cache();
551 cache.order(&exec_spawn_id).map(|o| o.clone())
552 };
553
554 let Some(primary) = primary else {
555 log::warn!(
556 "Cannot restore primary order quantity: primary order {exec_spawn_id} not found",
557 );
558 return;
559 };
560
561 let restore_raw = std::cmp::min(reduction_qty.raw, order.leaves_qty().raw);
563 if restore_raw == 0 {
564 return;
565 }
566
567 let restored_qty = Quantity::from_raw(
568 primary.quantity().raw + restore_raw,
569 primary.quantity().precision,
570 );
571
572 let core = self.core_mut();
573 let ts_now = core.clock().timestamp_ns();
574
575 let updated = OrderUpdated::new(
576 primary.trader_id(),
577 primary.strategy_id(),
578 primary.instrument_id(),
579 primary.client_order_id(),
580 restored_qty,
581 UUID4::new(),
582 ts_now,
583 ts_now,
584 false, primary.venue_order_id(),
586 primary.account_id(),
587 None, None, None, primary.is_quote_quantity(),
591 );
592
593 let event = OrderEventAny::Updated(updated);
594
595 let primary = {
596 let cache_rc = core.cache_rc();
597 let mut cache = cache_rc.borrow_mut();
598 match cache.update_order(&event) {
599 Ok(primary) => primary,
600 Err(e) => {
601 log::warn!("Failed to update primary order in cache: {e}");
602 return;
603 }
604 }
605 };
606
607 publish_order_event(&event);
608
609 log::info!(
610 "Restored primary order {} quantity to {} after spawned order {} was denied/rejected",
611 primary.client_order_id(),
612 restored_qty,
613 order.client_order_id()
614 );
615 }
616
617 fn submit_order(
623 &mut self,
624 order: OrderAny,
625 position_id: Option<PositionId>,
626 client_id: Option<ClientId>,
627 ) -> anyhow::Result<()> {
628 let core = self.core_mut();
629
630 let trader_id = registered_trader_id(core)?;
631 let ts_init = core.clock().timestamp_ns();
632
633 let strategy_id = order.strategy_id();
635
636 let order_exists = {
637 let cache = core.cache();
638 cache.order_exists(&order.client_order_id())
639 };
640
641 {
642 let cache_rc = core.cache_rc();
643 let mut cache = cache_rc.borrow_mut();
644 cache.add_order(order.clone(), position_id, client_id, true)?;
645 }
646
647 if !order_exists {
648 publish_order_initialized(&order);
649 }
650
651 let command = SubmitOrder::new(
652 trader_id,
653 client_id,
654 strategy_id,
655 order.instrument_id(),
656 order.client_order_id(),
657 order.init_event().clone(),
658 order.exec_algorithm_id(),
659 position_id,
660 None, UUID4::new(),
662 ts_init,
663 None, );
665
666 if core.config.log_commands {
667 let id = &core.actor.actor_id;
668 log::info!("{id} {SEND}{CMD} {command:?}");
669 }
670
671 msgbus::send_trading_command(
672 MessagingSwitchboard::risk_engine_execute(),
673 TradingCommand::SubmitOrder(command),
674 );
675
676 Ok(())
677 }
678
679 fn modify_order(
685 &mut self,
686 order: &mut OrderAny,
687 quantity: Option<Quantity>,
688 price: Option<Price>,
689 trigger_price: Option<Price>,
690 client_id: Option<ClientId>,
691 ) -> anyhow::Result<()> {
692 let qty_changing = quantity.is_some_and(|q| q != order.quantity());
693 let price_changing = price.is_some() && price != order.price();
694 let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
695
696 if !qty_changing && !price_changing && !trigger_changing {
697 log::error!(
698 "Cannot create command ModifyOrder: \
699 quantity, price and trigger were either None \
700 or the same as existing values"
701 );
702 return Ok(());
703 }
704
705 if order.is_closed() || order.is_pending_cancel() {
706 log::warn!(
707 "Cannot create command ModifyOrder: state is {:?}, {order:?}",
708 order.status()
709 );
710 return Ok(());
711 }
712
713 let core = self.core_mut();
714 let trader_id = registered_trader_id(core)?;
715 let strategy_id = order.strategy_id();
716
717 if !order.is_active_local() {
718 required_account_id(order, "pending update")?;
719 let event = self.generate_order_pending_update(order);
720 let event = OrderEventAny::PendingUpdate(event);
721
722 {
723 let cache_rc = self.core_mut().cache_rc();
724 let mut cache = cache_rc.borrow_mut();
725 match cache.update_order(&event) {
726 Ok(updated) => *order = updated,
727 Err(e)
728 if matches!(
729 e.downcast_ref::<OrderError>(),
730 Some(OrderError::InvalidStateTransition)
731 ) =>
732 {
733 log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
734 return Ok(());
735 }
736 Err(e) => return Err(e),
737 }
738 }
739
740 let topic = format!("events.order.{strategy_id}");
741 msgbus::publish_order_event(topic.into(), &event);
742 }
743
744 let ts_init = self.core_mut().clock().timestamp_ns();
745 let command = ModifyOrder::new(
746 trader_id,
747 client_id,
748 strategy_id,
749 order.instrument_id(),
750 order.client_order_id(),
751 order.venue_order_id(),
752 quantity,
753 price,
754 trigger_price,
755 UUID4::new(),
756 ts_init,
757 None, None, );
760
761 if self.core_mut().config.log_commands {
762 let id = &self.core_mut().actor.actor_id;
763 log::info!("{id} {SEND}{CMD} {command:?}");
764 }
765
766 let has_emulation_trigger = order
767 .emulation_trigger()
768 .is_some_and(|t| t != TriggerType::NoTrigger);
769
770 if order.is_emulated() || has_emulation_trigger {
771 msgbus::send_trading_command(
772 MessagingSwitchboard::order_emulator_execute(),
773 TradingCommand::ModifyOrder(command),
774 );
775 } else {
776 msgbus::send_trading_command(
777 MessagingSwitchboard::risk_engine_execute(),
778 TradingCommand::ModifyOrder(command),
779 );
780 }
781
782 Ok(())
783 }
784
785 fn modify_order_in_place(
797 &mut self,
798 order: &mut OrderAny,
799 quantity: Option<Quantity>,
800 price: Option<Price>,
801 trigger_price: Option<Price>,
802 ) -> anyhow::Result<()> {
803 let status = order.status();
805 if status != OrderStatus::Initialized && status != OrderStatus::Released {
806 anyhow::bail!(
807 "Cannot modify order in place: status is {status:?}, expected INITIALIZED or RELEASED"
808 );
809 }
810
811 if price.is_some() && order.price().is_none() {
813 anyhow::bail!(
814 "Cannot modify order in place: {} orders do not have a LIMIT price",
815 order.order_type()
816 );
817 }
818
819 if trigger_price.is_some() && order.trigger_price().is_none() {
820 anyhow::bail!(
821 "Cannot modify order in place: {} orders do not have a STOP trigger price",
822 order.order_type()
823 );
824 }
825
826 let qty_changing = quantity.is_some_and(|q| q != order.quantity());
828 let price_changing = price.is_some() && price != order.price();
829 let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
830
831 if !qty_changing && !price_changing && !trigger_changing {
832 anyhow::bail!("Cannot modify order in place: no parameters differ from current values");
833 }
834
835 let core = self.core_mut();
836 let ts_now = core.clock().timestamp_ns();
837
838 let updated = OrderUpdated::new(
839 order.trader_id(),
840 order.strategy_id(),
841 order.instrument_id(),
842 order.client_order_id(),
843 quantity.unwrap_or_else(|| order.quantity()),
844 UUID4::new(),
845 ts_now,
846 ts_now,
847 false, order.venue_order_id(),
849 order.account_id(),
850 price,
851 trigger_price,
852 None, order.is_quote_quantity(),
854 );
855
856 let event = OrderEventAny::Updated(updated);
857
858 {
859 let cache_rc = core.cache_rc();
860 let mut cache = cache_rc.borrow_mut();
861 *order = cache.update_order(&event)?;
862 }
863
864 publish_order_event(&event);
865
866 Ok(())
867 }
868
869 fn cancel_order(
875 &mut self,
876 order: &mut OrderAny,
877 client_id: Option<ClientId>,
878 ) -> anyhow::Result<()> {
879 if order.is_closed() || order.is_pending_cancel() {
880 log::warn!(
881 "Cannot cancel order: state is {:?}, {order:?}",
882 order.status()
883 );
884 return Ok(());
885 }
886
887 let core = self.core_mut();
888 let trader_id = registered_trader_id(core)?;
889 let strategy_id = order.strategy_id();
890
891 if !order.is_active_local() {
892 required_account_id(order, "pending cancel")?;
893 let event = self.generate_order_pending_cancel(order);
894 let event = OrderEventAny::PendingCancel(event);
895
896 {
897 let cache_rc = self.core_mut().cache_rc();
898 let mut cache = cache_rc.borrow_mut();
899 match cache.update_order(&event) {
900 Ok(updated) => *order = updated,
901 Err(e)
902 if matches!(
903 e.downcast_ref::<OrderError>(),
904 Some(OrderError::InvalidStateTransition)
905 ) =>
906 {
907 log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
908 return Ok(());
909 }
910 Err(e) => return Err(e),
911 }
912 }
913
914 let topic = format!("events.order.{strategy_id}");
915 msgbus::publish_order_event(topic.into(), &event);
916 }
917
918 let ts_init = self.core_mut().clock().timestamp_ns();
919 let command = CancelOrder::new(
920 trader_id,
921 client_id,
922 strategy_id,
923 order.instrument_id(),
924 order.client_order_id(),
925 order.venue_order_id(),
926 UUID4::new(),
927 ts_init,
928 None, None, );
931
932 if self.core_mut().config.log_commands {
933 let id = &self.core_mut().actor.actor_id;
934 log::info!("{id} {SEND}{CMD} {command:?}");
935 }
936
937 let has_emulation_trigger = order
938 .emulation_trigger()
939 .is_some_and(|t| t != TriggerType::NoTrigger);
940
941 if order.is_emulated() || order.status() == OrderStatus::Released || has_emulation_trigger {
942 msgbus::send_trading_command(
943 MessagingSwitchboard::order_emulator_execute(),
944 TradingCommand::CancelOrder(command),
945 );
946 } else {
947 msgbus::send_trading_command(
948 MessagingSwitchboard::exec_engine_execute(),
949 TradingCommand::CancelOrder(command),
950 );
951 }
952
953 Ok(())
954 }
955
956 fn subscribe_to_strategy_events(&mut self, strategy_id: StrategyId)
960 where
961 Self: 'static + std::fmt::Debug + Sized,
962 {
963 let core = self.core_mut();
964 if core.is_strategy_subscribed(&strategy_id) {
965 return;
966 }
967
968 let actor_id = core.actor.actor_id.inner();
969
970 let order_topic = format!("events.order.{strategy_id}");
971 let order_actor_id = actor_id;
972 let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
973 if let Some(mut algo) = try_get_actor_unchecked::<Self>(&order_actor_id) {
974 algo.handle_order_event(event.clone());
975 } else {
976 log::error!(
977 "ExecutionAlgorithm {order_actor_id} not found for order event handling"
978 );
979 }
980 });
981 msgbus::subscribe_order_events(order_topic.clone().into(), order_handler.clone(), None);
982
983 let position_topic = format!("events.position.{strategy_id}");
984 let position_handler = TypedHandler::from(move |event: &PositionEvent| {
985 if let Some(mut algo) = try_get_actor_unchecked::<Self>(&actor_id) {
986 algo.handle_position_event(event.clone());
987 } else {
988 log::error!("ExecutionAlgorithm {actor_id} not found for position event handling");
989 }
990 });
991 msgbus::subscribe_position_events(
992 position_topic.clone().into(),
993 position_handler.clone(),
994 None,
995 );
996
997 let handlers = StrategyEventHandlers {
998 order_topic,
999 order_handler,
1000 position_topic,
1001 position_handler,
1002 };
1003 core.store_strategy_event_handlers(strategy_id, handlers);
1004
1005 core.add_subscribed_strategy(strategy_id);
1006 log::info!("Subscribed to events for strategy {strategy_id}");
1007 }
1008
1009 fn unsubscribe_all_strategy_events(&mut self) {
1013 let handlers = self.core_mut().take_strategy_event_handlers();
1014 for (strategy_id, h) in handlers {
1015 msgbus::unsubscribe_order_events(h.order_topic.into(), &h.order_handler);
1016 msgbus::unsubscribe_position_events(h.position_topic.into(), &h.position_handler);
1017 log::info!("Unsubscribed from events for strategy {strategy_id}");
1018 }
1019 self.core_mut().clear_subscribed_strategies();
1020 }
1021
1022 fn handle_order_event(&mut self, event: OrderEventAny) {
1024 if self.core_mut().state() != ComponentState::Running {
1025 return;
1026 }
1027
1028 let order = {
1029 let cache = self.core_mut().cache();
1030 cache.order(&event.client_order_id()).map(|o| o.clone())
1031 };
1032
1033 let Some(order) = order else {
1034 return;
1035 };
1036
1037 let Some(order_algo_id) = order.exec_algorithm_id() else {
1038 return;
1039 };
1040
1041 if order_algo_id != self.id() {
1042 return;
1043 }
1044
1045 {
1046 let core = self.core_mut();
1047 if core.config.log_events {
1048 let id = &core.actor.actor_id;
1049 log::info!("{id} {RECV}{EVT} {event}");
1050 }
1051 }
1052
1053 match &event {
1054 OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1055 OrderEventAny::Denied(e) => {
1056 self.restore_primary_order_quantity(&order);
1057 self.on_order_denied(*e);
1058 }
1059 OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1060 OrderEventAny::Released(e) => self.on_order_released(*e),
1061 OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1062 OrderEventAny::Rejected(e) => {
1063 self.restore_primary_order_quantity(&order);
1064 self.on_order_rejected(*e);
1065 }
1066 OrderEventAny::Accepted(e) => {
1067 self.core_mut()
1069 .take_pending_spawn_reduction(&order.client_order_id());
1070 self.on_order_accepted(*e);
1071 }
1072 OrderEventAny::Canceled(e) => {
1073 self.core_mut()
1074 .take_pending_spawn_reduction(&order.client_order_id());
1075 self.on_algo_order_canceled(*e);
1076 }
1077 OrderEventAny::Expired(e) => {
1078 self.core_mut()
1079 .take_pending_spawn_reduction(&order.client_order_id());
1080 self.on_order_expired(*e);
1081 }
1082 OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1083 OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1084 OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1085 OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1086 OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1087 OrderEventAny::Updated(e) => self.on_order_updated(*e),
1088 OrderEventAny::Filled(e) => self.on_algo_order_filled(*e),
1089 }
1090
1091 self.on_order_event(event);
1092 }
1093
1094 fn handle_position_event(&mut self, event: PositionEvent) {
1096 if self.core_mut().state() != ComponentState::Running {
1097 return;
1098 }
1099
1100 {
1101 let core = self.core_mut();
1102 if core.config.log_events {
1103 let id = &core.actor.actor_id;
1104 log::info!("{id} {RECV}{EVT} {event:?}");
1105 }
1106 }
1107
1108 match &event {
1109 PositionEvent::PositionOpened(e) => self.on_position_opened(e.clone()),
1110 PositionEvent::PositionChanged(e) => self.on_position_changed(e.clone()),
1111 PositionEvent::PositionClosed(e) => self.on_position_closed(e.clone()),
1112 PositionEvent::PositionAdjusted(_) => {}
1113 }
1114
1115 self.on_position_event(event);
1116 }
1117
1118 fn on_start(&mut self) -> anyhow::Result<()> {
1126 let id = self.id();
1127 log::info!("Starting {id}");
1128 Ok(())
1129 }
1130
1131 fn on_stop(&mut self) -> anyhow::Result<()> {
1137 Ok(())
1138 }
1139
1140 fn on_reset(&mut self) -> anyhow::Result<()> {
1146 self.unsubscribe_all_strategy_events();
1147 self.core_mut().reset();
1148 Ok(())
1149 }
1150
1151 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
1159 Ok(())
1160 }
1161
1162 #[allow(unused_variables)]
1164 fn on_order_initialized(&mut self, event: OrderInitialized) {}
1165
1166 #[allow(unused_variables)]
1168 fn on_order_denied(&mut self, event: OrderDenied) {}
1169
1170 #[allow(unused_variables)]
1172 fn on_order_emulated(&mut self, event: OrderEmulated) {}
1173
1174 #[allow(unused_variables)]
1176 fn on_order_released(&mut self, event: OrderReleased) {}
1177
1178 #[allow(unused_variables)]
1180 fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1181
1182 #[allow(unused_variables)]
1184 fn on_order_rejected(&mut self, event: OrderRejected) {}
1185
1186 #[allow(unused_variables)]
1188 fn on_order_accepted(&mut self, event: OrderAccepted) {}
1189
1190 #[allow(unused_variables)]
1192 fn on_algo_order_canceled(&mut self, event: OrderCanceled) {}
1193
1194 #[allow(unused_variables)]
1196 fn on_order_expired(&mut self, event: OrderExpired) {}
1197
1198 #[allow(unused_variables)]
1200 fn on_order_triggered(&mut self, event: OrderTriggered) {}
1201
1202 #[allow(unused_variables)]
1204 fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1205
1206 #[allow(unused_variables)]
1208 fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1209
1210 #[allow(unused_variables)]
1212 fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1213
1214 #[allow(unused_variables)]
1216 fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1217
1218 #[allow(unused_variables)]
1220 fn on_order_updated(&mut self, event: OrderUpdated) {}
1221
1222 #[allow(unused_variables)]
1224 fn on_algo_order_filled(&mut self, event: OrderFilled) {}
1225
1226 #[allow(unused_variables)]
1228 fn on_order_event(&mut self, event: OrderEventAny) {}
1229
1230 #[allow(unused_variables)]
1232 fn on_position_opened(&mut self, event: PositionOpened) {}
1233
1234 #[allow(unused_variables)]
1236 fn on_position_changed(&mut self, event: PositionChanged) {}
1237
1238 #[allow(unused_variables)]
1240 fn on_position_closed(&mut self, event: PositionClosed) {}
1241
1242 #[allow(unused_variables)]
1244 fn on_position_event(&mut self, event: PositionEvent) {}
1245}
1246
1247fn publish_order_initialized(order: &OrderAny) {
1248 let event = OrderEventAny::Initialized(order.init_event().clone());
1249 publish_order_event(&event);
1250}
1251
1252fn publish_order_event(event: &OrderEventAny) {
1253 let topic = format!("events.order.{}", event.strategy_id());
1254 msgbus::publish_order_event(topic.into(), event);
1255}
1256
1257fn registered_trader_id(core: &ExecutionAlgorithmCore) -> anyhow::Result<TraderId> {
1258 core.trader_id()
1259 .ok_or_else(|| anyhow::anyhow!("ExecutionAlgorithm not registered: trader_id is not set"))
1260}
1261
1262fn required_account_id(order: &OrderAny, operation: &str) -> anyhow::Result<AccountId> {
1263 order.account_id().ok_or_else(|| {
1264 anyhow::anyhow!(
1265 "Cannot generate {operation} event for {}: account_id is not set",
1266 order.client_order_id()
1267 )
1268 })
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273 use std::{cell::RefCell, rc::Rc};
1274
1275 use nautilus_common::{
1276 actor::DataActor, cache::Cache, clock::TestClock, component::Component,
1277 enums::ComponentTrigger, msgbus, msgbus::TypedHandler, nautilus_actor,
1278 };
1279 use nautilus_model::{
1280 enums::OrderSide,
1281 events::{
1282 OrderAccepted, OrderCanceled, OrderDenied, OrderRejected,
1283 order::spec::{
1284 OrderAcceptedSpec, OrderCanceledSpec, OrderDeniedSpec, OrderFilledSpec,
1285 OrderRejectedSpec,
1286 },
1287 },
1288 identifiers::{
1289 AccountId, ClientOrderId, ExecAlgorithmId, InstrumentId, StrategyId, TraderId,
1290 VenueOrderId,
1291 },
1292 orders::{LimitOrder, MarketOrder, OrderAny, stubs::TestOrderStubs},
1293 types::{Price, Quantity},
1294 };
1295 use rstest::rstest;
1296
1297 use super::*;
1298
1299 #[derive(Debug)]
1300 struct TestAlgorithm {
1301 core: ExecutionAlgorithmCore,
1302 on_order_called: bool,
1303 last_order_client_id: Option<ClientOrderId>,
1304 }
1305
1306 impl TestAlgorithm {
1307 fn new(config: ExecutionAlgorithmConfig) -> Self {
1308 Self {
1309 core: ExecutionAlgorithmCore::new(config),
1310 on_order_called: false,
1311 last_order_client_id: None,
1312 }
1313 }
1314 }
1315
1316 impl DataActor for TestAlgorithm {}
1317
1318 nautilus_actor!(TestAlgorithm);
1319
1320 impl ExecutionAlgorithm for TestAlgorithm {
1321 fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
1322 &mut self.core
1323 }
1324
1325 fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
1326 self.on_order_called = true;
1327 self.last_order_client_id = Some(order.client_order_id());
1328 Ok(())
1329 }
1330 }
1331
1332 fn create_test_algorithm() -> TestAlgorithm {
1333 let unique_id = format!("TEST-{}", UUID4::new());
1335 let config = ExecutionAlgorithmConfig {
1336 exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
1337 ..Default::default()
1338 };
1339 TestAlgorithm::new(config)
1340 }
1341
1342 fn register_algorithm(algo: &mut TestAlgorithm) {
1343 let trader_id = TraderId::from("TRADER-001");
1344 let clock = Rc::new(RefCell::new(TestClock::new()));
1345 let cache = Rc::new(RefCell::new(Cache::default()));
1346
1347 algo.core.register(trader_id, clock, cache).unwrap();
1348
1349 algo.transition_state(ComponentTrigger::Initialize).unwrap();
1351 algo.transition_state(ComponentTrigger::Start).unwrap();
1352 algo.transition_state(ComponentTrigger::StartCompleted)
1353 .unwrap();
1354 }
1355
1356 fn subscribe_order_topic(
1357 strategy_id: StrategyId,
1358 ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
1359 let events = Rc::new(RefCell::new(Vec::new()));
1360 let handler = TypedHandler::from({
1361 let events = events.clone();
1362 move |event: &OrderEventAny| {
1363 events.borrow_mut().push(event.clone());
1364 }
1365 });
1366 msgbus::subscribe_order_events(
1367 format!("events.order.{strategy_id}").into(),
1368 handler.clone(),
1369 None,
1370 );
1371 (handler, events)
1372 }
1373
1374 #[rstest]
1375 fn test_algorithm_creation() {
1376 let algo = create_test_algorithm();
1377 assert!(algo.core.exec_algorithm_id.inner().starts_with("TEST-"));
1378 assert!(!algo.on_order_called);
1379 assert!(algo.last_order_client_id.is_none());
1380 }
1381
1382 #[rstest]
1383 fn test_algorithm_registration() {
1384 let mut algo = create_test_algorithm();
1385 register_algorithm(&mut algo);
1386
1387 assert!(algo.core.trader_id().is_some());
1388 assert_eq!(algo.core.trader_id(), Some(TraderId::from("TRADER-001")));
1389 }
1390
1391 #[rstest]
1392 fn test_submit_order_errors_when_algorithm_not_registered() {
1393 let mut algo = create_test_algorithm();
1394 let order = OrderAny::Market(MarketOrder::new(
1395 TraderId::from("TRADER-001"),
1396 StrategyId::from("STRAT-001"),
1397 InstrumentId::from("BTC/USDT.BINANCE"),
1398 ClientOrderId::from("O-UNREGISTERED-001"),
1399 OrderSide::Buy,
1400 Quantity::from("1.0"),
1401 TimeInForce::Gtc,
1402 UUID4::new(),
1403 0.into(),
1404 false,
1405 false,
1406 None,
1407 None,
1408 None,
1409 None,
1410 None,
1411 None,
1412 None,
1413 None,
1414 ));
1415
1416 let err = algo
1417 .submit_order(order, None, None)
1418 .unwrap_err()
1419 .to_string();
1420
1421 assert_eq!(
1422 err,
1423 "ExecutionAlgorithm not registered: trader_id is not set"
1424 );
1425 }
1426
1427 #[rstest]
1428 fn test_required_account_id_errors_when_missing_for_algorithm_event() {
1429 let order = OrderAny::Market(MarketOrder::new(
1430 TraderId::from("TRADER-001"),
1431 StrategyId::from("STRAT-001"),
1432 InstrumentId::from("BTC/USDT.BINANCE"),
1433 ClientOrderId::from("O-NO-ACCOUNT-001"),
1434 OrderSide::Buy,
1435 Quantity::from("1.0"),
1436 TimeInForce::Gtc,
1437 UUID4::new(),
1438 0.into(),
1439 false,
1440 false,
1441 None,
1442 None,
1443 None,
1444 None,
1445 None,
1446 None,
1447 None,
1448 None,
1449 ));
1450
1451 let err = required_account_id(&order, "pending update")
1452 .unwrap_err()
1453 .to_string();
1454
1455 assert_eq!(
1456 err,
1457 "Cannot generate pending update event for O-NO-ACCOUNT-001: account_id is not set"
1458 );
1459 }
1460
1461 #[rstest]
1462 fn test_algorithm_id() {
1463 let mut algo = create_test_algorithm();
1464 assert!(algo.id().inner().starts_with("TEST-"));
1465 }
1466
1467 #[rstest]
1468 fn test_algorithm_spawn_market_creates_valid_order() {
1469 let mut algo = create_test_algorithm();
1470 register_algorithm(&mut algo);
1471
1472 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1473 let mut primary = OrderAny::Market(MarketOrder::new(
1474 TraderId::from("TRADER-001"),
1475 StrategyId::from("STRAT-001"),
1476 instrument_id,
1477 ClientOrderId::from("O-001"),
1478 OrderSide::Buy,
1479 Quantity::from("1.0"),
1480 TimeInForce::Gtc,
1481 UUID4::new(),
1482 0.into(),
1483 false, false, None, None, None, None, None, None, None, None, ));
1494
1495 let spawned = algo.spawn_market(
1496 &mut primary,
1497 Quantity::from("0.5"),
1498 TimeInForce::Ioc,
1499 false,
1500 None, false, );
1503
1504 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1505 assert_eq!(spawned.instrument_id, instrument_id);
1506 assert_eq!(spawned.order_side(), OrderSide::Buy);
1507 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1508 assert_eq!(spawned.time_in_force, TimeInForce::Ioc);
1509 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1510 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1511 }
1512
1513 #[rstest]
1514 fn test_algorithm_spawn_increments_sequence() {
1515 let mut algo = create_test_algorithm();
1516 register_algorithm(&mut algo);
1517
1518 let mut primary = OrderAny::Market(MarketOrder::new(
1519 TraderId::from("TRADER-001"),
1520 StrategyId::from("STRAT-001"),
1521 InstrumentId::from("BTC/USDT.BINANCE"),
1522 ClientOrderId::from("O-001"),
1523 OrderSide::Buy,
1524 Quantity::from("1.0"),
1525 TimeInForce::Gtc,
1526 UUID4::new(),
1527 0.into(),
1528 false,
1529 false,
1530 None,
1531 None,
1532 None,
1533 None,
1534 None,
1535 None,
1536 None,
1537 None,
1538 ));
1539
1540 let spawned1 = algo.spawn_market(
1541 &mut primary,
1542 Quantity::from("0.25"),
1543 TimeInForce::Ioc,
1544 false,
1545 None,
1546 false,
1547 );
1548 let spawned2 = algo.spawn_market(
1549 &mut primary,
1550 Quantity::from("0.25"),
1551 TimeInForce::Ioc,
1552 false,
1553 None,
1554 false,
1555 );
1556 let spawned3 = algo.spawn_market(
1557 &mut primary,
1558 Quantity::from("0.25"),
1559 TimeInForce::Ioc,
1560 false,
1561 None,
1562 false,
1563 );
1564
1565 assert_eq!(spawned1.client_order_id.as_str(), "O-001-E1");
1566 assert_eq!(spawned2.client_order_id.as_str(), "O-001-E2");
1567 assert_eq!(spawned3.client_order_id.as_str(), "O-001-E3");
1568 }
1569
1570 #[rstest]
1571 fn test_algorithm_default_handlers_do_not_panic() {
1572 let mut algo = create_test_algorithm();
1573
1574 algo.on_order_initialized(OrderInitialized::default());
1575 algo.on_order_denied(OrderDenied::default());
1576 algo.on_order_emulated(OrderEmulated::default());
1577 algo.on_order_released(OrderReleased::default());
1578 algo.on_order_submitted(OrderSubmitted::default());
1579 algo.on_order_rejected(OrderRejected::default());
1580 algo.on_order_accepted(OrderAccepted::default());
1581 algo.on_algo_order_canceled(OrderCanceled::default());
1582 algo.on_order_expired(OrderExpired::default());
1583 algo.on_order_triggered(OrderTriggered::default());
1584 algo.on_order_pending_update(OrderPendingUpdate::default());
1585 algo.on_order_pending_cancel(OrderPendingCancel::default());
1586 algo.on_order_modify_rejected(OrderModifyRejected::default());
1587 algo.on_order_cancel_rejected(OrderCancelRejected::default());
1588 algo.on_order_updated(OrderUpdated::default());
1589 algo.on_algo_order_filled(OrderFilledSpec::builder().build());
1590 }
1591
1592 #[rstest]
1593 fn test_strategy_subscription_tracking() {
1594 let mut algo = create_test_algorithm();
1595 let strategy_id = StrategyId::from("STRAT-001");
1596
1597 assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1598
1599 algo.subscribe_to_strategy_events(strategy_id);
1600 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1601
1602 algo.subscribe_to_strategy_events(strategy_id);
1604 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1605 }
1606
1607 #[rstest]
1608 fn test_algorithm_reset() {
1609 let mut algo = create_test_algorithm();
1610 let strategy_id = StrategyId::from("STRAT-001");
1611 let primary_id = ClientOrderId::new("O-001");
1612
1613 let _ = algo.core.spawn_client_order_id(&primary_id);
1614 algo.core.add_subscribed_strategy(strategy_id);
1615
1616 assert!(algo.core.spawn_sequence(&primary_id).is_some());
1617 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1618
1619 ExecutionAlgorithm::on_reset(&mut algo).unwrap();
1620
1621 assert!(algo.core.spawn_sequence(&primary_id).is_none());
1622 assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1623 }
1624
1625 #[rstest]
1626 fn test_algorithm_spawn_limit_creates_valid_order() {
1627 let mut algo = create_test_algorithm();
1628 register_algorithm(&mut algo);
1629
1630 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1631 let mut primary = OrderAny::Market(MarketOrder::new(
1632 TraderId::from("TRADER-001"),
1633 StrategyId::from("STRAT-001"),
1634 instrument_id,
1635 ClientOrderId::from("O-001"),
1636 OrderSide::Buy,
1637 Quantity::from("1.0"),
1638 TimeInForce::Gtc,
1639 UUID4::new(),
1640 0.into(),
1641 false,
1642 false,
1643 None,
1644 None,
1645 None,
1646 None,
1647 None,
1648 None,
1649 None,
1650 None,
1651 ));
1652
1653 let price = Price::from("50000.0");
1654 let spawned = algo.spawn_limit(
1655 &mut primary,
1656 Quantity::from("0.5"),
1657 price,
1658 TimeInForce::Gtc,
1659 None, false, false, None, None, None, false, );
1667
1668 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1669 assert_eq!(spawned.instrument_id, instrument_id);
1670 assert_eq!(spawned.order_side(), OrderSide::Buy);
1671 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1672 assert_eq!(spawned.price, price);
1673 assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1674 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1675 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1676 }
1677
1678 #[rstest]
1679 fn test_algorithm_spawn_market_to_limit_creates_valid_order() {
1680 let mut algo = create_test_algorithm();
1681 register_algorithm(&mut algo);
1682
1683 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1684 let mut primary = OrderAny::Market(MarketOrder::new(
1685 TraderId::from("TRADER-001"),
1686 StrategyId::from("STRAT-001"),
1687 instrument_id,
1688 ClientOrderId::from("O-001"),
1689 OrderSide::Buy,
1690 Quantity::from("1.0"),
1691 TimeInForce::Gtc,
1692 UUID4::new(),
1693 0.into(),
1694 false,
1695 false,
1696 None,
1697 None,
1698 None,
1699 None,
1700 None,
1701 None,
1702 None,
1703 None,
1704 ));
1705
1706 let spawned = algo.spawn_market_to_limit(
1707 &mut primary,
1708 Quantity::from("0.5"),
1709 TimeInForce::Gtc,
1710 None, false, None, None, None, false, );
1717
1718 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1719 assert_eq!(spawned.instrument_id, instrument_id);
1720 assert_eq!(spawned.order_side(), OrderSide::Buy);
1721 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1722 assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1723 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1724 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1725 }
1726
1727 #[rstest]
1728 fn test_algorithm_spawn_market_with_tags() {
1729 let mut algo = create_test_algorithm();
1730 register_algorithm(&mut algo);
1731
1732 let mut primary = OrderAny::Market(MarketOrder::new(
1733 TraderId::from("TRADER-001"),
1734 StrategyId::from("STRAT-001"),
1735 InstrumentId::from("BTC/USDT.BINANCE"),
1736 ClientOrderId::from("O-001"),
1737 OrderSide::Buy,
1738 Quantity::from("1.0"),
1739 TimeInForce::Gtc,
1740 UUID4::new(),
1741 0.into(),
1742 false,
1743 false,
1744 None,
1745 None,
1746 None,
1747 None,
1748 None,
1749 None,
1750 None,
1751 None,
1752 ));
1753
1754 let tags = vec![ustr::Ustr::from("TAG1"), ustr::Ustr::from("TAG2")];
1755 let spawned = algo.spawn_market(
1756 &mut primary,
1757 Quantity::from("0.5"),
1758 TimeInForce::Ioc,
1759 false,
1760 Some(tags.clone()),
1761 false,
1762 );
1763
1764 assert_eq!(spawned.tags, Some(tags));
1765 }
1766
1767 #[rstest]
1768 fn test_algorithm_spawn_propagates_primary_fields() {
1769 let mut algo = create_test_algorithm();
1770 register_algorithm(&mut algo);
1771
1772 let mut params = indexmap::IndexMap::new();
1773 params.insert(ustr::Ustr::from("horizon_secs"), ustr::Ustr::from("30"));
1774 params.insert(ustr::Ustr::from("interval_secs"), ustr::Ustr::from("10"));
1775 let primary_tags = vec![ustr::Ustr::from("PRIMARY_TAG")];
1776 let linked_order_ids = vec![ClientOrderId::from("LINK-1")];
1777
1778 let mut primary = OrderAny::Market(MarketOrder::new(
1779 TraderId::from("TRADER-001"),
1780 StrategyId::from("STRAT-001"),
1781 InstrumentId::from("BTC/USDT.BINANCE"),
1782 ClientOrderId::from("O-001"),
1783 OrderSide::Buy,
1784 Quantity::from("1.0"),
1785 TimeInForce::Gtc,
1786 UUID4::new(),
1787 0.into(),
1788 false, true, None, None, Some(linked_order_ids.clone()),
1793 None, Some(algo.id()),
1795 Some(params.clone()),
1796 None, Some(primary_tags.clone()),
1798 ));
1799
1800 let spawned_market = algo.spawn_market(
1801 &mut primary,
1802 Quantity::from("0.25"),
1803 TimeInForce::Ioc,
1804 false,
1805 None, false,
1807 );
1808 assert!(spawned_market.is_quote_quantity);
1809 assert_eq!(spawned_market.exec_algorithm_params, Some(params.clone()));
1810 assert_eq!(spawned_market.tags, Some(primary_tags.clone()));
1811 assert_eq!(
1812 spawned_market.linked_order_ids,
1813 Some(linked_order_ids.clone())
1814 );
1815
1816 let spawned_limit = algo.spawn_limit(
1817 &mut primary,
1818 Quantity::from("0.25"),
1819 Price::from("50000.0"),
1820 TimeInForce::Gtc,
1821 None, false, false, None, None, None, false,
1828 );
1829 assert!(spawned_limit.is_quote_quantity);
1830 assert_eq!(spawned_limit.exec_algorithm_params, Some(params.clone()));
1831 assert_eq!(spawned_limit.tags, Some(primary_tags.clone()));
1832 assert_eq!(
1833 spawned_limit.linked_order_ids,
1834 Some(linked_order_ids.clone())
1835 );
1836
1837 let spawned_mtl = algo.spawn_market_to_limit(
1838 &mut primary,
1839 Quantity::from("0.25"),
1840 TimeInForce::Gtc,
1841 None, false, None, None, None, false,
1847 );
1848 assert!(spawned_mtl.is_quote_quantity);
1849 assert_eq!(spawned_mtl.exec_algorithm_params, Some(params));
1850 assert_eq!(spawned_mtl.tags, Some(primary_tags));
1851 assert_eq!(spawned_mtl.linked_order_ids, Some(linked_order_ids));
1852 }
1853
1854 #[rstest]
1855 fn test_algorithm_reduce_primary_order() {
1856 let mut algo = create_test_algorithm();
1857 register_algorithm(&mut algo);
1858
1859 let order = OrderAny::Market(MarketOrder::new(
1860 TraderId::from("TRADER-001"),
1861 StrategyId::from("STRAT-001"),
1862 InstrumentId::from("BTC/USDT.BINANCE"),
1863 ClientOrderId::from("O-001"),
1864 OrderSide::Buy,
1865 Quantity::from("1.0"),
1866 TimeInForce::Gtc,
1867 UUID4::new(),
1868 0.into(),
1869 false,
1870 false,
1871 None,
1872 None,
1873 None,
1874 None,
1875 None,
1876 None,
1877 None,
1878 None,
1879 ));
1880
1881 let mut primary = TestOrderStubs::make_accepted_order(&order);
1883
1884 {
1885 let cache_rc = algo.core.cache_rc();
1886 let mut cache = cache_rc.borrow_mut();
1887 cache.add_order(primary.clone(), None, None, false).unwrap();
1888 }
1889
1890 let spawn_qty = Quantity::from("0.3");
1891 algo.reduce_primary_order(&mut primary, spawn_qty);
1892
1893 assert_eq!(primary.quantity(), Quantity::from("0.7"));
1894 }
1895
1896 #[rstest]
1897 fn test_algorithm_reduce_primary_order_publishes_updated_event() {
1898 let mut algo = create_test_algorithm();
1899 register_algorithm(&mut algo);
1900
1901 let strategy_id = StrategyId::from("STRAT-ALGO-REDUCE-PUBLISH");
1902 let order = OrderAny::Market(MarketOrder::new(
1903 TraderId::from("TRADER-001"),
1904 strategy_id,
1905 InstrumentId::from("BTC/USDT.BINANCE"),
1906 ClientOrderId::from("O-ALGO-REDUCE"),
1907 OrderSide::Buy,
1908 Quantity::from("1.0"),
1909 TimeInForce::Gtc,
1910 UUID4::new(),
1911 0.into(),
1912 false,
1913 false,
1914 None,
1915 None,
1916 None,
1917 None,
1918 None,
1919 None,
1920 None,
1921 None,
1922 ));
1923 let mut primary = TestOrderStubs::make_accepted_order(&order);
1924
1925 {
1926 let cache_rc = algo.core.cache_rc();
1927 let mut cache = cache_rc.borrow_mut();
1928 cache.add_order(primary.clone(), None, None, false).unwrap();
1929 }
1930
1931 let (handler, events) = subscribe_order_topic(strategy_id);
1932
1933 algo.reduce_primary_order(&mut primary, Quantity::from("0.3"));
1934
1935 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1936 let events = events.borrow();
1937
1938 assert_eq!(events.len(), 1);
1939 assert!(matches!(
1940 &events[0],
1941 OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
1942 ));
1943 }
1944
1945 #[rstest]
1946 fn test_algorithm_submit_order_publishes_initialized_for_new_order() {
1947 let mut algo = create_test_algorithm();
1948 register_algorithm(&mut algo);
1949
1950 let strategy_id = StrategyId::from("STRAT-ALGO-INIT-PUBLISH");
1951 let order = OrderAny::Market(MarketOrder::new(
1952 TraderId::from("TRADER-001"),
1953 strategy_id,
1954 InstrumentId::from("BTC/USDT.BINANCE"),
1955 ClientOrderId::from("O-ALGO-INIT"),
1956 OrderSide::Buy,
1957 Quantity::from("1.0"),
1958 TimeInForce::Gtc,
1959 UUID4::new(),
1960 0.into(),
1961 false,
1962 false,
1963 None,
1964 None,
1965 None,
1966 None,
1967 None,
1968 None,
1969 None,
1970 None,
1971 ));
1972 let (handler, events) = subscribe_order_topic(strategy_id);
1973
1974 algo.submit_order(order.clone(), None, None).unwrap();
1975
1976 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1977 let events = events.borrow();
1978
1979 assert_eq!(events.len(), 1);
1980 assert!(matches!(
1981 &events[0],
1982 OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
1983 ));
1984 }
1985
1986 #[rstest]
1987 fn test_algorithm_submit_order_does_not_republish_initialized_for_existing_order() {
1988 let mut algo = create_test_algorithm();
1989 register_algorithm(&mut algo);
1990
1991 let strategy_id = StrategyId::from("STRAT-ALGO-INIT-EXISTING");
1992 let order = OrderAny::Market(MarketOrder::new(
1993 TraderId::from("TRADER-001"),
1994 strategy_id,
1995 InstrumentId::from("BTC/USDT.BINANCE"),
1996 ClientOrderId::from("O-ALGO-INIT-EXISTING"),
1997 OrderSide::Buy,
1998 Quantity::from("1.0"),
1999 TimeInForce::Gtc,
2000 UUID4::new(),
2001 0.into(),
2002 false,
2003 false,
2004 None,
2005 None,
2006 None,
2007 None,
2008 None,
2009 None,
2010 None,
2011 None,
2012 ));
2013 {
2014 let cache_rc = algo.core.cache_rc();
2015 let mut cache = cache_rc.borrow_mut();
2016 cache.add_order(order.clone(), None, None, true).unwrap();
2017 }
2018 let (handler, events) = subscribe_order_topic(strategy_id);
2019
2020 algo.submit_order(order, None, None).unwrap();
2021
2022 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2023 assert!(events.borrow().is_empty());
2024 }
2025
2026 #[rstest]
2027 fn test_algorithm_spawn_market_with_reduce_primary() {
2028 let mut algo = create_test_algorithm();
2029 register_algorithm(&mut algo);
2030
2031 let order = OrderAny::Market(MarketOrder::new(
2032 TraderId::from("TRADER-001"),
2033 StrategyId::from("STRAT-001"),
2034 InstrumentId::from("BTC/USDT.BINANCE"),
2035 ClientOrderId::from("O-001"),
2036 OrderSide::Buy,
2037 Quantity::from("1.0"),
2038 TimeInForce::Gtc,
2039 UUID4::new(),
2040 0.into(),
2041 false,
2042 false,
2043 None,
2044 None,
2045 None,
2046 None,
2047 None,
2048 None,
2049 None,
2050 None,
2051 ));
2052
2053 let mut primary = TestOrderStubs::make_accepted_order(&order);
2055
2056 {
2057 let cache_rc = algo.core.cache_rc();
2058 let mut cache = cache_rc.borrow_mut();
2059 cache.add_order(primary.clone(), None, None, false).unwrap();
2060 }
2061
2062 let spawned = algo.spawn_market(
2063 &mut primary,
2064 Quantity::from("0.4"),
2065 TimeInForce::Ioc,
2066 false,
2067 None,
2068 true, );
2070
2071 assert_eq!(spawned.quantity, Quantity::from("0.4"));
2072 assert_eq!(primary.quantity(), Quantity::from("0.6"));
2073 }
2074
2075 #[rstest]
2076 fn test_algorithm_generate_order_canceled() {
2077 let mut algo = create_test_algorithm();
2078 register_algorithm(&mut algo);
2079
2080 let order = OrderAny::Market(MarketOrder::new(
2081 TraderId::from("TRADER-001"),
2082 StrategyId::from("STRAT-001"),
2083 InstrumentId::from("BTC/USDT.BINANCE"),
2084 ClientOrderId::from("O-001"),
2085 OrderSide::Buy,
2086 Quantity::from("1.0"),
2087 TimeInForce::Gtc,
2088 UUID4::new(),
2089 0.into(),
2090 false,
2091 false,
2092 None,
2093 None,
2094 None,
2095 None,
2096 None,
2097 None,
2098 None,
2099 None,
2100 ));
2101
2102 let event = algo.generate_order_canceled(&order);
2103
2104 assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
2105 assert_eq!(event.strategy_id, StrategyId::from("STRAT-001"));
2106 assert_eq!(event.instrument_id, InstrumentId::from("BTC/USDT.BINANCE"));
2107 assert_eq!(event.client_order_id, ClientOrderId::from("O-001"));
2108 }
2109
2110 #[rstest]
2111 fn test_algorithm_modify_order_in_place_updates_quantity() {
2112 let mut algo = create_test_algorithm();
2113 register_algorithm(&mut algo);
2114
2115 let strategy_id = StrategyId::from("STRAT-ALGO-MODIFY-IN-PLACE");
2116 let mut order = OrderAny::Limit(LimitOrder::new(
2117 TraderId::from("TRADER-001"),
2118 strategy_id,
2119 InstrumentId::from("BTC/USDT.BINANCE"),
2120 ClientOrderId::from("O-001"),
2121 OrderSide::Buy,
2122 Quantity::from("1.0"),
2123 Price::from("50000.0"),
2124 TimeInForce::Gtc,
2125 None, false, false, false, None, None, None, None, None, None, None, None, None, None, None, UUID4::new(),
2141 0.into(),
2142 ));
2143
2144 {
2145 let cache_rc = algo.core.cache_rc();
2146 let mut cache = cache_rc.borrow_mut();
2147 cache.add_order(order.clone(), None, None, false).unwrap();
2148 }
2149
2150 let new_qty = Quantity::from("0.5");
2151 let (handler, events) = subscribe_order_topic(strategy_id);
2152
2153 algo.modify_order_in_place(&mut order, Some(new_qty), None, None)
2154 .unwrap();
2155
2156 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2157 let events = events.borrow();
2158
2159 assert_eq!(order.quantity(), new_qty);
2160 assert_eq!(events.len(), 1);
2161 assert!(matches!(
2162 &events[0],
2163 OrderEventAny::Updated(event) if event.quantity == new_qty
2164 ));
2165 }
2166
2167 #[rstest]
2168 fn test_algorithm_modify_order_in_place_rejects_no_changes() {
2169 let mut algo = create_test_algorithm();
2170 register_algorithm(&mut algo);
2171
2172 let mut order = OrderAny::Limit(LimitOrder::new(
2173 TraderId::from("TRADER-001"),
2174 StrategyId::from("STRAT-001"),
2175 InstrumentId::from("BTC/USDT.BINANCE"),
2176 ClientOrderId::from("O-001"),
2177 OrderSide::Buy,
2178 Quantity::from("1.0"),
2179 Price::from("50000.0"),
2180 TimeInForce::Gtc,
2181 None,
2182 false,
2183 false,
2184 false,
2185 None,
2186 None,
2187 None,
2188 None,
2189 None,
2190 None,
2191 None,
2192 None,
2193 None,
2194 None,
2195 None,
2196 UUID4::new(),
2197 0.into(),
2198 ));
2199
2200 let result =
2202 algo.modify_order_in_place(&mut order, Some(Quantity::from("1.0")), None, None);
2203
2204 assert!(result.is_err());
2205 assert!(
2206 result
2207 .unwrap_err()
2208 .to_string()
2209 .contains("no parameters differ")
2210 );
2211 }
2212
2213 #[rstest]
2214 fn test_spawned_order_denied_restores_primary_quantity() {
2215 let mut algo = create_test_algorithm();
2216 register_algorithm(&mut algo);
2217
2218 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2219 let exec_algorithm_id = algo.id();
2220
2221 let mut primary = OrderAny::Market(MarketOrder::new(
2222 TraderId::from("TRADER-001"),
2223 StrategyId::from("STRAT-001"),
2224 instrument_id,
2225 ClientOrderId::from("O-001"),
2226 OrderSide::Buy,
2227 Quantity::from("1.0"),
2228 TimeInForce::Gtc,
2229 UUID4::new(),
2230 0.into(),
2231 false,
2232 false,
2233 None,
2234 None,
2235 None,
2236 None,
2237 Some(exec_algorithm_id),
2238 None,
2239 None,
2240 None,
2241 ));
2242
2243 {
2244 let cache_rc = algo.core.cache_rc();
2245 let mut cache = cache_rc.borrow_mut();
2246 cache.add_order(primary.clone(), None, None, false).unwrap();
2247 }
2248
2249 let spawned = algo.spawn_market(
2250 &mut primary,
2251 Quantity::from("0.5"),
2252 TimeInForce::Fok,
2253 false,
2254 None,
2255 true,
2256 );
2257
2258 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2259
2260 let spawned_order = OrderAny::Market(spawned);
2261 {
2262 let cache_rc = algo.core.cache_rc();
2263 let mut cache = cache_rc.borrow_mut();
2264 cache
2265 .add_order(spawned_order.clone(), None, None, false)
2266 .unwrap();
2267 }
2268
2269 let denied = OrderDeniedSpec::builder()
2270 .trader_id(spawned_order.trader_id())
2271 .strategy_id(spawned_order.strategy_id())
2272 .instrument_id(spawned_order.instrument_id())
2273 .client_order_id(spawned_order.client_order_id())
2274 .reason("TEST_DENIAL".into())
2275 .build();
2276
2277 {
2278 let cache_rc = algo.core.cache_rc();
2279 let mut cache = cache_rc.borrow_mut();
2280 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2281 }
2282
2283 algo.handle_order_event(OrderEventAny::Denied(denied));
2284
2285 let restored_primary = {
2286 let cache = algo.core.cache();
2287 cache
2288 .order(&ClientOrderId::from("O-001"))
2289 .map(|o| o.clone())
2290 .unwrap()
2291 };
2292 assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2293 }
2294
2295 #[rstest]
2296 fn test_spawned_order_rejected_restores_primary_quantity() {
2297 let mut algo = create_test_algorithm();
2298 register_algorithm(&mut algo);
2299
2300 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2301 let exec_algorithm_id = algo.id();
2302
2303 let mut primary = OrderAny::Market(MarketOrder::new(
2304 TraderId::from("TRADER-001"),
2305 StrategyId::from("STRAT-001"),
2306 instrument_id,
2307 ClientOrderId::from("O-001"),
2308 OrderSide::Buy,
2309 Quantity::from("1.0"),
2310 TimeInForce::Gtc,
2311 UUID4::new(),
2312 0.into(),
2313 false,
2314 false,
2315 None,
2316 None,
2317 None,
2318 None,
2319 Some(exec_algorithm_id),
2320 None,
2321 None,
2322 None,
2323 ));
2324
2325 {
2326 let cache_rc = algo.core.cache_rc();
2327 let mut cache = cache_rc.borrow_mut();
2328 cache.add_order(primary.clone(), None, None, false).unwrap();
2329 }
2330
2331 let spawned = algo.spawn_market(
2332 &mut primary,
2333 Quantity::from("0.5"),
2334 TimeInForce::Fok,
2335 false,
2336 None,
2337 true,
2338 );
2339
2340 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2341
2342 let spawned_order = OrderAny::Market(spawned);
2343 {
2344 let cache_rc = algo.core.cache_rc();
2345 let mut cache = cache_rc.borrow_mut();
2346 cache
2347 .add_order(spawned_order.clone(), None, None, false)
2348 .unwrap();
2349 }
2350
2351 let rejected = OrderRejectedSpec::builder()
2352 .trader_id(spawned_order.trader_id())
2353 .strategy_id(spawned_order.strategy_id())
2354 .instrument_id(spawned_order.instrument_id())
2355 .client_order_id(spawned_order.client_order_id())
2356 .account_id(AccountId::from("BINANCE-001"))
2357 .reason("TEST_REJECTION".into())
2358 .build();
2359
2360 {
2361 let cache_rc = algo.core.cache_rc();
2362 let mut cache = cache_rc.borrow_mut();
2363 cache
2364 .update_order(&OrderEventAny::Rejected(rejected))
2365 .unwrap();
2366 }
2367
2368 algo.handle_order_event(OrderEventAny::Rejected(rejected));
2369
2370 let restored_primary = {
2371 let cache = algo.core.cache();
2372 cache
2373 .order(&ClientOrderId::from("O-001"))
2374 .map(|o| o.clone())
2375 .unwrap()
2376 };
2377 assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2378 }
2379
2380 #[rstest]
2381 fn test_spawned_order_with_reduce_primary_false_does_not_restore() {
2382 let mut algo = create_test_algorithm();
2383 register_algorithm(&mut algo);
2384
2385 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2386 let exec_algorithm_id = algo.id();
2387
2388 let mut primary = OrderAny::Market(MarketOrder::new(
2389 TraderId::from("TRADER-001"),
2390 StrategyId::from("STRAT-001"),
2391 instrument_id,
2392 ClientOrderId::from("O-001"),
2393 OrderSide::Buy,
2394 Quantity::from("1.0"),
2395 TimeInForce::Gtc,
2396 UUID4::new(),
2397 0.into(),
2398 false,
2399 false,
2400 None,
2401 None,
2402 None,
2403 None,
2404 Some(exec_algorithm_id),
2405 None,
2406 None,
2407 None,
2408 ));
2409
2410 {
2411 let cache_rc = algo.core.cache_rc();
2412 let mut cache = cache_rc.borrow_mut();
2413 cache.add_order(primary.clone(), None, None, false).unwrap();
2414 }
2415
2416 let spawned = algo.spawn_market(
2417 &mut primary,
2418 Quantity::from("0.5"),
2419 TimeInForce::Fok,
2420 false,
2421 None,
2422 false,
2423 );
2424
2425 assert_eq!(primary.quantity(), Quantity::from("1.0"));
2426
2427 let spawned_order = OrderAny::Market(spawned);
2428 {
2429 let cache_rc = algo.core.cache_rc();
2430 let mut cache = cache_rc.borrow_mut();
2431 cache
2432 .add_order(spawned_order.clone(), None, None, false)
2433 .unwrap();
2434 }
2435
2436 let denied = OrderDeniedSpec::builder()
2437 .trader_id(spawned_order.trader_id())
2438 .strategy_id(spawned_order.strategy_id())
2439 .instrument_id(spawned_order.instrument_id())
2440 .client_order_id(spawned_order.client_order_id())
2441 .reason("TEST_DENIAL".into())
2442 .build();
2443
2444 {
2445 let cache_rc = algo.core.cache_rc();
2446 let mut cache = cache_rc.borrow_mut();
2447 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2448 }
2449
2450 algo.handle_order_event(OrderEventAny::Denied(denied));
2451
2452 let final_primary = {
2453 let cache = algo.core.cache();
2454 cache
2455 .order(&ClientOrderId::from("O-001"))
2456 .map(|o| o.clone())
2457 .unwrap()
2458 };
2459 assert_eq!(final_primary.quantity(), Quantity::from("1.0"));
2460 }
2461
2462 #[rstest]
2463 fn test_multiple_spawns_with_one_denied_restores_correctly() {
2464 let mut algo = create_test_algorithm();
2465 register_algorithm(&mut algo);
2466
2467 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2468 let exec_algorithm_id = algo.id();
2469
2470 let mut primary = OrderAny::Market(MarketOrder::new(
2471 TraderId::from("TRADER-001"),
2472 StrategyId::from("STRAT-001"),
2473 instrument_id,
2474 ClientOrderId::from("O-001"),
2475 OrderSide::Buy,
2476 Quantity::from("1.0"),
2477 TimeInForce::Gtc,
2478 UUID4::new(),
2479 0.into(),
2480 false,
2481 false,
2482 None,
2483 None,
2484 None,
2485 None,
2486 Some(exec_algorithm_id),
2487 None,
2488 None,
2489 None,
2490 ));
2491
2492 {
2493 let cache_rc = algo.core.cache_rc();
2494 let mut cache = cache_rc.borrow_mut();
2495 cache.add_order(primary.clone(), None, None, false).unwrap();
2496 }
2497
2498 let spawned1 = algo.spawn_market(
2499 &mut primary,
2500 Quantity::from("0.3"),
2501 TimeInForce::Fok,
2502 false,
2503 None,
2504 true,
2505 );
2506 let spawned2 = algo.spawn_market(
2507 &mut primary,
2508 Quantity::from("0.4"),
2509 TimeInForce::Fok,
2510 false,
2511 None,
2512 true,
2513 );
2514 assert_eq!(primary.quantity(), Quantity::from("0.3"));
2515
2516 let spawned_order1 = OrderAny::Market(spawned1);
2517 let spawned_order2 = OrderAny::Market(spawned2);
2518 {
2519 let cache_rc = algo.core.cache_rc();
2520 let mut cache = cache_rc.borrow_mut();
2521 cache.add_order(spawned_order1, None, None, false).unwrap();
2522 cache
2523 .add_order(spawned_order2.clone(), None, None, false)
2524 .unwrap();
2525 }
2526
2527 let denied = OrderDeniedSpec::builder()
2528 .trader_id(spawned_order2.trader_id())
2529 .strategy_id(spawned_order2.strategy_id())
2530 .instrument_id(spawned_order2.instrument_id())
2531 .client_order_id(spawned_order2.client_order_id())
2532 .reason("TEST_DENIAL".into())
2533 .build();
2534
2535 {
2536 let cache_rc = algo.core.cache_rc();
2537 let mut cache = cache_rc.borrow_mut();
2538 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2539 }
2540
2541 let (handler, events) = subscribe_order_topic(spawned_order2.strategy_id());
2542
2543 algo.handle_order_event(OrderEventAny::Denied(denied));
2544
2545 msgbus::unsubscribe_order_events(
2546 format!("events.order.{}", spawned_order2.strategy_id()).into(),
2547 &handler,
2548 );
2549 let events = events.borrow();
2550
2551 let restored_primary = {
2552 let cache = algo.core.cache();
2553 cache
2554 .order(&ClientOrderId::from("O-001"))
2555 .map(|o| o.clone())
2556 .unwrap()
2557 };
2558 assert_eq!(restored_primary.quantity(), Quantity::from("0.7"));
2559 assert_eq!(events.len(), 1);
2560 assert!(matches!(
2561 &events[0],
2562 OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
2563 ));
2564 }
2565
2566 #[rstest]
2567 fn test_spawned_order_accepted_prevents_restoration() {
2568 let mut algo = create_test_algorithm();
2569 register_algorithm(&mut algo);
2570
2571 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2572 let exec_algorithm_id = algo.id();
2573
2574 let mut primary = OrderAny::Market(MarketOrder::new(
2575 TraderId::from("TRADER-001"),
2576 StrategyId::from("STRAT-001"),
2577 instrument_id,
2578 ClientOrderId::from("O-001"),
2579 OrderSide::Buy,
2580 Quantity::from("1.0"),
2581 TimeInForce::Gtc,
2582 UUID4::new(),
2583 0.into(),
2584 false,
2585 false,
2586 None,
2587 None,
2588 None,
2589 None,
2590 Some(exec_algorithm_id),
2591 None,
2592 None,
2593 None,
2594 ));
2595
2596 {
2597 let cache_rc = algo.core.cache_rc();
2598 let mut cache = cache_rc.borrow_mut();
2599 cache.add_order(primary.clone(), None, None, false).unwrap();
2600 }
2601
2602 let spawned = algo.spawn_market(
2603 &mut primary,
2604 Quantity::from("0.5"),
2605 TimeInForce::Fok,
2606 false,
2607 None,
2608 true,
2609 );
2610
2611 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2612
2613 let mut spawned_order = OrderAny::Market(spawned);
2614 {
2615 let cache_rc = algo.core.cache_rc();
2616 let mut cache = cache_rc.borrow_mut();
2617 cache
2618 .add_order(spawned_order.clone(), None, None, false)
2619 .unwrap();
2620 }
2621
2622 let accepted = OrderAcceptedSpec::builder()
2623 .trader_id(spawned_order.trader_id())
2624 .strategy_id(spawned_order.strategy_id())
2625 .instrument_id(spawned_order.instrument_id())
2626 .client_order_id(spawned_order.client_order_id())
2627 .venue_order_id(VenueOrderId::from("V-123"))
2628 .account_id(AccountId::from("BINANCE-001"))
2629 .build();
2630
2631 {
2632 let cache_rc = algo.core.cache_rc();
2633 let mut cache = cache_rc.borrow_mut();
2634 spawned_order = cache
2635 .update_order(&OrderEventAny::Accepted(accepted))
2636 .unwrap();
2637 }
2638
2639 algo.handle_order_event(OrderEventAny::Accepted(accepted));
2640
2641 let primary_after_accept = {
2642 let cache = algo.core.cache();
2643 cache
2644 .order(&ClientOrderId::from("O-001"))
2645 .map(|o| o.clone())
2646 .unwrap()
2647 };
2648 assert_eq!(primary_after_accept.quantity(), Quantity::from("0.5"));
2649
2650 let canceled = OrderCanceledSpec::builder()
2652 .trader_id(spawned_order.trader_id())
2653 .strategy_id(spawned_order.strategy_id())
2654 .instrument_id(spawned_order.instrument_id())
2655 .client_order_id(spawned_order.client_order_id())
2656 .venue_order_id(VenueOrderId::from("V-123"))
2657 .account_id(AccountId::from("BINANCE-001"))
2658 .build();
2659
2660 {
2661 let cache_rc = algo.core.cache_rc();
2662 let mut cache = cache_rc.borrow_mut();
2663 cache
2664 .update_order(&OrderEventAny::Canceled(canceled))
2665 .unwrap();
2666 }
2667
2668 algo.handle_order_event(OrderEventAny::Canceled(canceled));
2669
2670 let final_primary = {
2671 let cache = algo.core.cache();
2672 cache
2673 .order(&ClientOrderId::from("O-001"))
2674 .map(|o| o.clone())
2675 .unwrap()
2676 };
2677 assert_eq!(final_primary.quantity(), Quantity::from("0.5"));
2678 }
2679
2680 #[rstest]
2681 #[should_panic(expected = "exceeds primary leaves_qty")]
2682 fn test_spawn_quantity_exceeds_leaves_qty_panics() {
2683 let mut algo = create_test_algorithm();
2684 register_algorithm(&mut algo);
2685
2686 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2687 let exec_algorithm_id = algo.id();
2688
2689 let mut primary = OrderAny::Market(MarketOrder::new(
2690 TraderId::from("TRADER-001"),
2691 StrategyId::from("STRAT-001"),
2692 instrument_id,
2693 ClientOrderId::from("O-001"),
2694 OrderSide::Buy,
2695 Quantity::from("1.0"),
2696 TimeInForce::Gtc,
2697 UUID4::new(),
2698 0.into(),
2699 false,
2700 false,
2701 None,
2702 None,
2703 None,
2704 None,
2705 Some(exec_algorithm_id),
2706 None,
2707 None,
2708 None,
2709 ));
2710
2711 {
2712 let cache_rc = algo.core.cache_rc();
2713 let mut cache = cache_rc.borrow_mut();
2714 cache.add_order(primary.clone(), None, None, false).unwrap();
2715 }
2716
2717 let _ = algo.spawn_market(
2718 &mut primary,
2719 Quantity::from("0.8"),
2720 TimeInForce::Fok,
2721 false,
2722 None,
2723 true,
2724 );
2725
2726 assert_eq!(primary.quantity(), Quantity::from("0.2"));
2727 assert_eq!(primary.leaves_qty(), Quantity::from("0.2"));
2728
2729 let _ = algo.spawn_market(
2731 &mut primary,
2732 Quantity::from("0.5"),
2733 TimeInForce::Fok,
2734 false,
2735 None,
2736 true,
2737 );
2738 }
2739}