1pub mod config;
39pub mod core;
40pub mod twap;
41
42pub use core::{ExecutionAlgorithmCore, ExecutionAlgorithmNative, StrategyEventHandlers};
43
44pub use config::{ExecutionAlgorithmConfig, ImportableExecAlgorithmConfig};
45use nautilus_common::{
46 actor::{DataActor, DataActorNative, registry::try_get_actor_unchecked},
47 enums::ComponentState,
48 logging::{CMD, EVT, RECV, SEND},
49 messages::execution::{CancelOrder, ModifyOrder, SubmitOrder, TradingCommand},
50 msgbus::{self, MessagingSwitchboard, TypedHandler},
51 timer::TimeEvent,
52};
53use nautilus_core::{UUID4, UnixNanos};
54use nautilus_model::{
55 enums::{OrderStatus, TimeInForce, TriggerType},
56 events::{
57 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied, OrderEmulated,
58 OrderEventAny, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
59 OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted,
60 OrderTriggered, OrderUpdated, PositionChanged, PositionClosed, PositionEvent,
61 PositionOpened,
62 },
63 identifiers::{AccountId, ClientId, ExecAlgorithmId, PositionId, StrategyId, TraderId},
64 orders::{LimitOrder, MarketOrder, MarketToLimitOrder, Order, OrderAny, OrderError, OrderList},
65 types::{Price, Quantity},
66};
67pub use twap::{TwapAlgorithm, TwapAlgorithmConfig};
68use ustr::Ustr;
69
70pub trait ExecutionAlgorithm: DataActor {
92 fn id(&self) -> ExecAlgorithmId
94 where
95 Self: ExecutionAlgorithmNative,
96 {
97 ExecutionAlgorithmNative::exec_algorithm_core(self).exec_algorithm_id
98 }
99
100 fn execute(&mut self, command: TradingCommand) -> anyhow::Result<()>
111 where
112 Self: ExecutionAlgorithmNative,
113 Self: 'static + std::fmt::Debug + Sized,
114 {
115 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
116 if core.config.log_commands {
117 let id = &core.actor.actor_id;
118 log::info!("{id} {RECV}{CMD} {command:?}");
119 }
120
121 if DataActorNative::core(core).state() != ComponentState::Running {
122 return Ok(());
123 }
124
125 match command {
126 TradingCommand::SubmitOrder(cmd) => {
127 self.subscribe_to_strategy_events(cmd.strategy_id);
128 let order = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
129 .get_order(&cmd.client_order_id)?;
130 self.on_order(order)
131 }
132 TradingCommand::SubmitOrderList(cmd) => {
133 self.subscribe_to_strategy_events(cmd.strategy_id);
134 let orders = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
135 .get_orders_for_list(&cmd.order_list)?;
136 self.on_order_list(cmd.order_list, orders)
137 }
138 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(cmd),
139 _ => {
140 log::warn!("Unhandled command type: {command:?}");
141 Ok(())
142 }
143 }
144 }
145
146 fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()>;
154
155 fn on_order_list(
164 &mut self,
165 _order_list: OrderList,
166 orders: Vec<OrderAny>,
167 ) -> anyhow::Result<()> {
168 for order in orders {
169 self.on_order(order)?;
170 }
171 Ok(())
172 }
173
174 fn handle_cancel_order(&mut self, command: CancelOrder) -> anyhow::Result<()>
183 where
184 Self: ExecutionAlgorithmNative,
185 {
186 let (order, is_pending_cancel) = {
187 let cache = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_ref();
188
189 let Some(order) = cache.order(&command.client_order_id) else {
190 log::warn!(
191 "Cannot cancel order: {} not found in cache",
192 command.client_order_id
193 );
194 return Ok(());
195 };
196
197 let is_pending = cache.is_order_pending_cancel_local(&command.client_order_id);
198 (order.clone(), is_pending)
199 };
200
201 if is_pending_cancel {
202 return Ok(());
203 }
204
205 if order.is_closed() {
206 log::warn!("Order already closed for {command:?}");
207 return Ok(());
208 }
209
210 let event = OrderEventAny::Canceled(self.generate_order_canceled(&order));
211
212 let order = {
213 let cache_rc = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_rc();
214 let mut cache = cache_rc.borrow_mut();
215 match cache.update_order(&event) {
216 Ok(order) => order,
217 Err(e)
218 if matches!(
219 e.downcast_ref::<OrderError>(),
220 Some(OrderError::InvalidStateTransition)
221 ) =>
222 {
223 log::warn!("InvalidStateTrigger: {e}, did not apply cancel event");
224 return Ok(());
225 }
226 Err(e) => return Err(e),
227 }
228 };
229
230 let topic = format!("events.order.{}", order.strategy_id());
231 msgbus::publish_order_event(topic.into(), &event);
232 msgbus::publish_order_event(
233 msgbus::switchboard::get_order_canceled_topic(order.instrument_id()),
234 &event,
235 );
236
237 Ok(())
238 }
239
240 fn generate_order_canceled(&mut self, order: &OrderAny) -> OrderCanceled
242 where
243 Self: ExecutionAlgorithmNative,
244 {
245 let ts_now = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
246 .clock_mut()
247 .timestamp_ns();
248
249 OrderCanceled::new(
250 order.trader_id(),
251 order.strategy_id(),
252 order.instrument_id(),
253 order.client_order_id(),
254 UUID4::new(),
255 ts_now,
256 ts_now,
257 false, order.venue_order_id(),
259 order.account_id(),
260 )
261 }
262
263 fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate
265 where
266 Self: ExecutionAlgorithmNative,
267 {
268 let ts_now = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
269 .clock_mut()
270 .timestamp_ns();
271
272 OrderPendingUpdate::new(
273 order.trader_id(),
274 order.strategy_id(),
275 order.instrument_id(),
276 order.client_order_id(),
277 order
278 .account_id()
279 .expect("Order must have account_id for pending update"),
280 UUID4::new(),
281 ts_now,
282 ts_now,
283 false, order.venue_order_id(),
285 )
286 }
287
288 fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel
290 where
291 Self: ExecutionAlgorithmNative,
292 {
293 let ts_now = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
294 .clock_mut()
295 .timestamp_ns();
296
297 OrderPendingCancel::new(
298 order.trader_id(),
299 order.strategy_id(),
300 order.instrument_id(),
301 order.client_order_id(),
302 order
303 .account_id()
304 .expect("Order must have account_id for pending cancel"),
305 UUID4::new(),
306 ts_now,
307 ts_now,
308 false, order.venue_order_id(),
310 )
311 }
312
313 fn spawn_market(
326 &mut self,
327 primary: &mut OrderAny,
328 quantity: Quantity,
329 time_in_force: TimeInForce,
330 reduce_only: bool,
331 tags: Option<Vec<Ustr>>,
332 reduce_primary: bool,
333 ) -> MarketOrder
334 where
335 Self: ExecutionAlgorithmNative,
336 {
337 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
339 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
340 let ts_init = core.clock_mut().timestamp_ns();
341 let exec_algorithm_id = core.exec_algorithm_id;
342
343 if reduce_primary {
344 self.reduce_primary_order(primary, quantity);
345 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
346 .track_pending_spawn_reduction(client_order_id, quantity);
347 }
348
349 MarketOrder::new(
350 primary.trader_id(),
351 primary.strategy_id(),
352 primary.instrument_id(),
353 client_order_id,
354 primary.order_side(),
355 quantity,
356 time_in_force,
357 UUID4::new(),
358 ts_init,
359 reduce_only,
360 primary.is_quote_quantity(),
361 primary.contingency_type(),
362 primary.order_list_id(),
363 primary.linked_order_ids().map(|ids| ids.to_vec()),
364 primary.parent_order_id(),
365 Some(exec_algorithm_id),
366 primary.exec_algorithm_params().cloned(),
367 Some(primary.client_order_id()),
368 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
369 )
370 }
371
372 #[expect(clippy::too_many_arguments)]
385 fn spawn_limit(
386 &mut self,
387 primary: &mut OrderAny,
388 quantity: Quantity,
389 price: Price,
390 time_in_force: TimeInForce,
391 expire_time: Option<UnixNanos>,
392 post_only: bool,
393 reduce_only: bool,
394 display_qty: Option<Quantity>,
395 emulation_trigger: Option<TriggerType>,
396 tags: Option<Vec<Ustr>>,
397 reduce_primary: bool,
398 ) -> LimitOrder
399 where
400 Self: ExecutionAlgorithmNative,
401 {
402 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
404 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
405 let ts_init = core.clock_mut().timestamp_ns();
406 let exec_algorithm_id = core.exec_algorithm_id;
407
408 if reduce_primary {
409 self.reduce_primary_order(primary, quantity);
410 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
411 .track_pending_spawn_reduction(client_order_id, quantity);
412 }
413
414 LimitOrder::new(
415 primary.trader_id(),
416 primary.strategy_id(),
417 primary.instrument_id(),
418 client_order_id,
419 primary.order_side(),
420 quantity,
421 price,
422 time_in_force,
423 expire_time,
424 post_only,
425 reduce_only,
426 primary.is_quote_quantity(),
427 display_qty,
428 emulation_trigger,
429 None, primary.contingency_type(),
431 primary.order_list_id(),
432 primary.linked_order_ids().map(|ids| ids.to_vec()),
433 primary.parent_order_id(),
434 Some(exec_algorithm_id),
435 primary.exec_algorithm_params().cloned(),
436 Some(primary.client_order_id()),
437 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
438 UUID4::new(),
439 ts_init,
440 )
441 }
442
443 #[expect(clippy::too_many_arguments)]
456 fn spawn_market_to_limit(
457 &mut self,
458 primary: &mut OrderAny,
459 quantity: Quantity,
460 time_in_force: TimeInForce,
461 expire_time: Option<UnixNanos>,
462 reduce_only: bool,
463 display_qty: Option<Quantity>,
464 emulation_trigger: Option<TriggerType>,
465 tags: Option<Vec<Ustr>>,
466 reduce_primary: bool,
467 ) -> MarketToLimitOrder
468 where
469 Self: ExecutionAlgorithmNative,
470 {
471 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
473 let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
474 let ts_init = core.clock_mut().timestamp_ns();
475 let exec_algorithm_id = core.exec_algorithm_id;
476
477 if reduce_primary {
478 self.reduce_primary_order(primary, quantity);
479 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
480 .track_pending_spawn_reduction(client_order_id, quantity);
481 }
482
483 let mut order = MarketToLimitOrder::new(
484 primary.trader_id(),
485 primary.strategy_id(),
486 primary.instrument_id(),
487 client_order_id,
488 primary.order_side(),
489 quantity,
490 time_in_force,
491 expire_time,
492 false, reduce_only,
494 primary.is_quote_quantity(),
495 display_qty,
496 primary.contingency_type(),
497 primary.order_list_id(),
498 primary.linked_order_ids().map(|ids| ids.to_vec()),
499 primary.parent_order_id(),
500 Some(exec_algorithm_id),
501 primary.exec_algorithm_params().cloned(),
502 Some(primary.client_order_id()),
503 tags.or_else(|| primary.tags().map(|t| t.to_vec())),
504 UUID4::new(),
505 ts_init,
506 );
507
508 if emulation_trigger.is_some() {
509 order.set_emulation_trigger(emulation_trigger);
510 }
511
512 order
513 }
514
515 fn reduce_primary_order(&mut self, primary: &mut OrderAny, spawn_qty: Quantity)
524 where
525 Self: ExecutionAlgorithmNative,
526 {
527 let leaves_qty = primary.leaves_qty();
528 assert!(
529 leaves_qty >= spawn_qty,
530 "Spawn quantity {spawn_qty} exceeds primary leaves_qty {leaves_qty}"
531 );
532
533 let primary_qty = primary.quantity();
534 let new_qty = Quantity::from_raw(primary_qty.raw - spawn_qty.raw, primary_qty.precision);
535
536 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
537 let ts_now = core.clock_mut().timestamp_ns();
538
539 let updated = OrderUpdated::new(
540 primary.trader_id(),
541 primary.strategy_id(),
542 primary.instrument_id(),
543 primary.client_order_id(),
544 new_qty,
545 UUID4::new(),
546 ts_now,
547 ts_now,
548 false, primary.venue_order_id(),
550 primary.account_id(),
551 None, None, None, primary.is_quote_quantity(),
555 );
556
557 let event = OrderEventAny::Updated(updated);
558
559 {
560 let cache_rc = core.cache_rc();
561 let mut cache = cache_rc.borrow_mut();
562 *primary = cache
563 .update_order(&event)
564 .expect("Failed to update order in cache");
565 }
566
567 publish_order_event(&event);
568 }
569
570 fn restore_primary_order_quantity(&mut self, order: &OrderAny)
576 where
577 Self: ExecutionAlgorithmNative,
578 {
579 let Some(exec_spawn_id) = order.exec_spawn_id() else {
580 return;
581 };
582
583 let reduction_qty = {
584 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
585 core.take_pending_spawn_reduction(&order.client_order_id())
586 };
587
588 let Some(reduction_qty) = reduction_qty else {
589 return;
590 };
591
592 let primary = {
593 let cache = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_ref();
594 cache.order(&exec_spawn_id).map(|o| o.clone())
595 };
596
597 let Some(primary) = primary else {
598 log::warn!(
599 "Cannot restore primary order quantity: primary order {exec_spawn_id} not found",
600 );
601 return;
602 };
603
604 let restore_raw = std::cmp::min(reduction_qty.raw, order.leaves_qty().raw);
606 if restore_raw == 0 {
607 return;
608 }
609
610 let restored_qty = Quantity::from_raw(
611 primary.quantity().raw + restore_raw,
612 primary.quantity().precision,
613 );
614
615 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
616 let ts_now = core.clock_mut().timestamp_ns();
617
618 let updated = OrderUpdated::new(
619 primary.trader_id(),
620 primary.strategy_id(),
621 primary.instrument_id(),
622 primary.client_order_id(),
623 restored_qty,
624 UUID4::new(),
625 ts_now,
626 ts_now,
627 false, primary.venue_order_id(),
629 primary.account_id(),
630 None, None, None, primary.is_quote_quantity(),
634 );
635
636 let event = OrderEventAny::Updated(updated);
637
638 let primary = {
639 let cache_rc = core.cache_rc();
640 let mut cache = cache_rc.borrow_mut();
641 match cache.update_order(&event) {
642 Ok(primary) => primary,
643 Err(e) => {
644 log::warn!("Failed to update primary order in cache: {e}");
645 return;
646 }
647 }
648 };
649
650 publish_order_event(&event);
651
652 log::info!(
653 "Restored primary order {} quantity to {} after spawned order {} was denied/rejected",
654 primary.client_order_id(),
655 restored_qty,
656 order.client_order_id()
657 );
658 }
659
660 fn submit_order(
666 &mut self,
667 order: OrderAny,
668 position_id: Option<PositionId>,
669 client_id: Option<ClientId>,
670 ) -> anyhow::Result<()>
671 where
672 Self: ExecutionAlgorithmNative,
673 {
674 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
675
676 let trader_id = registered_trader_id(core)?;
677 let ts_init = core.clock_mut().timestamp_ns();
678
679 let strategy_id = order.strategy_id();
681
682 let order_exists = {
683 let cache = core.cache_ref();
684 cache.order_exists(&order.client_order_id())
685 };
686
687 {
688 let cache_rc = core.cache_rc();
689 let mut cache = cache_rc.borrow_mut();
690 cache.add_order(order.clone(), position_id, client_id, true)?;
691 }
692
693 if !order_exists {
694 publish_order_initialized(&order);
695 }
696
697 let command = SubmitOrder::new(
698 trader_id,
699 client_id,
700 strategy_id,
701 order.instrument_id(),
702 order.client_order_id(),
703 order.init_event().clone(),
704 order.exec_algorithm_id(),
705 position_id,
706 None, UUID4::new(),
708 ts_init,
709 None, );
711
712 if core.config.log_commands {
713 let id = &core.actor.actor_id;
714 log::info!("{id} {SEND}{CMD} {command:?}");
715 }
716
717 msgbus::send_trading_command(
718 MessagingSwitchboard::risk_engine_execute(),
719 TradingCommand::SubmitOrder(command),
720 );
721
722 Ok(())
723 }
724
725 fn modify_order(
731 &mut self,
732 order: &mut OrderAny,
733 quantity: Option<Quantity>,
734 price: Option<Price>,
735 trigger_price: Option<Price>,
736 client_id: Option<ClientId>,
737 ) -> anyhow::Result<()>
738 where
739 Self: ExecutionAlgorithmNative,
740 {
741 let qty_changing = quantity.is_some_and(|q| q != order.quantity());
742 let price_changing = price.is_some() && price != order.price();
743 let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
744
745 if !qty_changing && !price_changing && !trigger_changing {
746 log::error!(
747 "Cannot create command ModifyOrder: \
748 quantity, price and trigger were either None \
749 or the same as existing values"
750 );
751 return Ok(());
752 }
753
754 if order.is_closed() || order.is_pending_cancel() {
755 log::warn!(
756 "Cannot create command ModifyOrder: state is {:?}, {order:?}",
757 order.status()
758 );
759 return Ok(());
760 }
761
762 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
763 let trader_id = registered_trader_id(core)?;
764 let strategy_id = order.strategy_id();
765
766 if !order.is_active_local() {
767 required_account_id(order, "pending update")?;
768 let event = self.generate_order_pending_update(order);
769 let event = OrderEventAny::PendingUpdate(event);
770
771 {
772 let cache_rc = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_rc();
773 let mut cache = cache_rc.borrow_mut();
774 match cache.update_order(&event) {
775 Ok(updated) => *order = updated,
776 Err(e)
777 if matches!(
778 e.downcast_ref::<OrderError>(),
779 Some(OrderError::InvalidStateTransition)
780 ) =>
781 {
782 log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
783 return Ok(());
784 }
785 Err(e) => return Err(e),
786 }
787 }
788
789 let topic = format!("events.order.{strategy_id}");
790 msgbus::publish_order_event(topic.into(), &event);
791 msgbus::publish_order_event(
792 msgbus::switchboard::get_order_pending_update_topic(order.instrument_id()),
793 &event,
794 );
795 }
796
797 let ts_init = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
798 .clock_mut()
799 .timestamp_ns();
800 let command = ModifyOrder::new(
801 trader_id,
802 client_id,
803 strategy_id,
804 order.instrument_id(),
805 order.client_order_id(),
806 order.venue_order_id(),
807 quantity,
808 price,
809 trigger_price,
810 UUID4::new(),
811 ts_init,
812 None, None, );
815
816 if ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
817 .config
818 .log_commands
819 {
820 let id = &ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
821 .actor
822 .actor_id;
823 log::info!("{id} {SEND}{CMD} {command:?}");
824 }
825
826 let has_emulation_trigger = order
827 .emulation_trigger()
828 .is_some_and(|t| t != TriggerType::NoTrigger);
829
830 if order.is_emulated() || has_emulation_trigger {
831 msgbus::send_trading_command(
832 MessagingSwitchboard::order_emulator_execute(),
833 TradingCommand::ModifyOrder(command),
834 );
835 } else {
836 msgbus::send_trading_command(
837 MessagingSwitchboard::risk_engine_execute(),
838 TradingCommand::ModifyOrder(command),
839 );
840 }
841
842 Ok(())
843 }
844
845 fn modify_order_in_place(
857 &mut self,
858 order: &mut OrderAny,
859 quantity: Option<Quantity>,
860 price: Option<Price>,
861 trigger_price: Option<Price>,
862 ) -> anyhow::Result<()>
863 where
864 Self: ExecutionAlgorithmNative,
865 {
866 let status = order.status();
868 if status != OrderStatus::Initialized && status != OrderStatus::Released {
869 anyhow::bail!(
870 "Cannot modify order in place: status is {status:?}, expected INITIALIZED or RELEASED"
871 );
872 }
873
874 if price.is_some() && order.price().is_none() {
876 anyhow::bail!(
877 "Cannot modify order in place: {} orders do not have a LIMIT price",
878 order.order_type()
879 );
880 }
881
882 if trigger_price.is_some() && order.trigger_price().is_none() {
883 anyhow::bail!(
884 "Cannot modify order in place: {} orders do not have a STOP trigger price",
885 order.order_type()
886 );
887 }
888
889 let qty_changing = quantity.is_some_and(|q| q != order.quantity());
891 let price_changing = price.is_some() && price != order.price();
892 let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
893
894 if !qty_changing && !price_changing && !trigger_changing {
895 anyhow::bail!("Cannot modify order in place: no parameters differ from current values");
896 }
897
898 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
899 let ts_now = core.clock_mut().timestamp_ns();
900
901 let updated = OrderUpdated::new(
902 order.trader_id(),
903 order.strategy_id(),
904 order.instrument_id(),
905 order.client_order_id(),
906 quantity.unwrap_or_else(|| order.quantity()),
907 UUID4::new(),
908 ts_now,
909 ts_now,
910 false, order.venue_order_id(),
912 order.account_id(),
913 price,
914 trigger_price,
915 None, order.is_quote_quantity(),
917 );
918
919 let event = OrderEventAny::Updated(updated);
920
921 {
922 let cache_rc = core.cache_rc();
923 let mut cache = cache_rc.borrow_mut();
924 *order = cache.update_order(&event)?;
925 }
926
927 publish_order_event(&event);
928
929 Ok(())
930 }
931
932 fn cancel_order(
938 &mut self,
939 order: &mut OrderAny,
940 client_id: Option<ClientId>,
941 ) -> anyhow::Result<()>
942 where
943 Self: ExecutionAlgorithmNative,
944 {
945 if order.is_closed() || order.is_pending_cancel() {
946 log::warn!(
947 "Cannot cancel order: state is {:?}, {order:?}",
948 order.status()
949 );
950 return Ok(());
951 }
952
953 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
954 let trader_id = registered_trader_id(core)?;
955 let strategy_id = order.strategy_id();
956
957 if !order.is_active_local() {
958 required_account_id(order, "pending cancel")?;
959 let event = self.generate_order_pending_cancel(order);
960 let event = OrderEventAny::PendingCancel(event);
961
962 {
963 let cache_rc = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_rc();
964 let mut cache = cache_rc.borrow_mut();
965 match cache.update_order(&event) {
966 Ok(updated) => *order = updated,
967 Err(e)
968 if matches!(
969 e.downcast_ref::<OrderError>(),
970 Some(OrderError::InvalidStateTransition)
971 ) =>
972 {
973 log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
974 return Ok(());
975 }
976 Err(e) => return Err(e),
977 }
978 }
979
980 let topic = format!("events.order.{strategy_id}");
981 msgbus::publish_order_event(topic.into(), &event);
982 msgbus::publish_order_event(
983 msgbus::switchboard::get_order_pending_cancel_topic(order.instrument_id()),
984 &event,
985 );
986 }
987
988 let ts_init = ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
989 .clock_mut()
990 .timestamp_ns();
991 let command = CancelOrder::new(
992 trader_id,
993 client_id,
994 strategy_id,
995 order.instrument_id(),
996 order.client_order_id(),
997 order.venue_order_id(),
998 UUID4::new(),
999 ts_init,
1000 None, None, );
1003
1004 if ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
1005 .config
1006 .log_commands
1007 {
1008 let id = &ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
1009 .actor
1010 .actor_id;
1011 log::info!("{id} {SEND}{CMD} {command:?}");
1012 }
1013
1014 let has_emulation_trigger = order
1015 .emulation_trigger()
1016 .is_some_and(|t| t != TriggerType::NoTrigger);
1017
1018 if order.is_emulated() || order.status() == OrderStatus::Released || has_emulation_trigger {
1019 msgbus::send_trading_command(
1020 MessagingSwitchboard::order_emulator_execute(),
1021 TradingCommand::CancelOrder(command),
1022 );
1023 } else {
1024 msgbus::send_trading_command(
1025 MessagingSwitchboard::exec_engine_execute(),
1026 TradingCommand::CancelOrder(command),
1027 );
1028 }
1029
1030 Ok(())
1031 }
1032
1033 fn subscribe_to_strategy_events(&mut self, strategy_id: StrategyId)
1037 where
1038 Self: ExecutionAlgorithmNative,
1039 Self: 'static + std::fmt::Debug + Sized,
1040 {
1041 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
1042 if core.is_strategy_subscribed(&strategy_id) {
1043 return;
1044 }
1045
1046 let actor_id = core.actor.actor_id.inner();
1047
1048 let order_topic = format!("events.order.{strategy_id}");
1049 let order_actor_id = actor_id;
1050 let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
1051 if let Some(mut algo) = try_get_actor_unchecked::<Self>(&order_actor_id) {
1052 algo.handle_order_event(event.clone());
1053 } else {
1054 log::error!(
1055 "ExecutionAlgorithm {order_actor_id} not found for order event handling"
1056 );
1057 }
1058 });
1059 msgbus::subscribe_order_events(order_topic.clone().into(), order_handler.clone(), None);
1060
1061 let position_topic = format!("events.position.{strategy_id}");
1062 let position_handler = TypedHandler::from(move |event: &PositionEvent| {
1063 if let Some(mut algo) = try_get_actor_unchecked::<Self>(&actor_id) {
1064 algo.handle_position_event(event.clone());
1065 } else {
1066 log::error!("ExecutionAlgorithm {actor_id} not found for position event handling");
1067 }
1068 });
1069 msgbus::subscribe_position_events(
1070 position_topic.clone().into(),
1071 position_handler.clone(),
1072 None,
1073 );
1074
1075 let handlers = StrategyEventHandlers {
1076 order_topic,
1077 order_handler,
1078 position_topic,
1079 position_handler,
1080 };
1081 core.store_strategy_event_handlers(strategy_id, handlers);
1082
1083 core.add_subscribed_strategy(strategy_id);
1084 log::info!("Subscribed to events for strategy {strategy_id}");
1085 }
1086
1087 fn unsubscribe_all_strategy_events(&mut self)
1091 where
1092 Self: ExecutionAlgorithmNative,
1093 {
1094 let handlers =
1095 ExecutionAlgorithmNative::exec_algorithm_core_mut(self).take_strategy_event_handlers();
1096
1097 for (strategy_id, h) in handlers {
1098 msgbus::unsubscribe_order_events(h.order_topic.into(), &h.order_handler);
1099 msgbus::unsubscribe_position_events(h.position_topic.into(), &h.position_handler);
1100 log::info!("Unsubscribed from events for strategy {strategy_id}");
1101 }
1102 ExecutionAlgorithmNative::exec_algorithm_core_mut(self).clear_subscribed_strategies();
1103 }
1104
1105 fn handle_order_event(&mut self, event: OrderEventAny)
1107 where
1108 Self: ExecutionAlgorithmNative,
1109 {
1110 if DataActorNative::core(ExecutionAlgorithmNative::exec_algorithm_core_mut(self)).state()
1111 != ComponentState::Running
1112 {
1113 return;
1114 }
1115
1116 let order = {
1117 let cache = ExecutionAlgorithmNative::exec_algorithm_core_mut(self).cache_ref();
1118 cache.order(&event.client_order_id()).map(|o| o.clone())
1119 };
1120
1121 let Some(order) = order else {
1122 return;
1123 };
1124
1125 let Some(order_algo_id) = order.exec_algorithm_id() else {
1126 return;
1127 };
1128
1129 if order_algo_id != self.id() {
1130 return;
1131 }
1132
1133 {
1134 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
1135 if core.config.log_events {
1136 let id = &core.actor.actor_id;
1137 log::info!("{id} {RECV}{EVT} {event}");
1138 }
1139 }
1140
1141 match &event {
1142 OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1143 OrderEventAny::Denied(e) => {
1144 self.restore_primary_order_quantity(&order);
1145 self.on_order_denied(*e);
1146 }
1147 OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1148 OrderEventAny::Released(e) => self.on_order_released(*e),
1149 OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1150 OrderEventAny::Rejected(e) => {
1151 self.restore_primary_order_quantity(&order);
1152 self.on_order_rejected(*e);
1153 }
1154 OrderEventAny::Accepted(e) => {
1155 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
1157 .take_pending_spawn_reduction(&order.client_order_id());
1158 self.on_order_accepted(*e);
1159 }
1160 OrderEventAny::Canceled(e) => {
1161 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
1162 .take_pending_spawn_reduction(&order.client_order_id());
1163 self.on_algo_order_canceled(*e);
1164 }
1165 OrderEventAny::Expired(e) => {
1166 ExecutionAlgorithmNative::exec_algorithm_core_mut(self)
1167 .take_pending_spawn_reduction(&order.client_order_id());
1168 self.on_order_expired(*e);
1169 }
1170 OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1171 OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1172 OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1173 OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1174 OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1175 OrderEventAny::Updated(e) => self.on_order_updated(*e),
1176 OrderEventAny::Filled(e) => self.on_algo_order_filled(*e),
1177 }
1178
1179 self.on_order_event(event);
1180 }
1181
1182 fn handle_position_event(&mut self, event: PositionEvent)
1184 where
1185 Self: ExecutionAlgorithmNative,
1186 {
1187 if DataActorNative::core(ExecutionAlgorithmNative::exec_algorithm_core_mut(self)).state()
1188 != ComponentState::Running
1189 {
1190 return;
1191 }
1192
1193 {
1194 let core = ExecutionAlgorithmNative::exec_algorithm_core_mut(self);
1195 if core.config.log_events {
1196 let id = &core.actor.actor_id;
1197 log::info!("{id} {RECV}{EVT} {event:?}");
1198 }
1199 }
1200
1201 match &event {
1202 PositionEvent::PositionOpened(e) => self.on_position_opened(e.clone()),
1203 PositionEvent::PositionChanged(e) => self.on_position_changed(e.clone()),
1204 PositionEvent::PositionClosed(e) => self.on_position_closed(e.clone()),
1205 PositionEvent::PositionAdjusted(_) => {}
1206 }
1207
1208 self.on_position_event(event);
1209 }
1210
1211 fn on_start(&mut self) -> anyhow::Result<()>
1219 where
1220 Self: ExecutionAlgorithmNative,
1221 {
1222 let id = self.id();
1223 log::info!("Starting {id}");
1224 Ok(())
1225 }
1226
1227 fn on_stop(&mut self) -> anyhow::Result<()> {
1233 Ok(())
1234 }
1235
1236 fn on_reset(&mut self) -> anyhow::Result<()>
1242 where
1243 Self: ExecutionAlgorithmNative,
1244 {
1245 self.unsubscribe_all_strategy_events();
1246 ExecutionAlgorithmNative::exec_algorithm_core_mut(self).reset();
1247 Ok(())
1248 }
1249
1250 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
1258 Ok(())
1259 }
1260
1261 #[allow(unused_variables)]
1263 fn on_order_initialized(&mut self, event: OrderInitialized) {}
1264
1265 #[allow(unused_variables)]
1267 fn on_order_denied(&mut self, event: OrderDenied) {}
1268
1269 #[allow(unused_variables)]
1271 fn on_order_emulated(&mut self, event: OrderEmulated) {}
1272
1273 #[allow(unused_variables)]
1275 fn on_order_released(&mut self, event: OrderReleased) {}
1276
1277 #[allow(unused_variables)]
1279 fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1280
1281 #[allow(unused_variables)]
1283 fn on_order_rejected(&mut self, event: OrderRejected) {}
1284
1285 #[allow(unused_variables)]
1287 fn on_order_accepted(&mut self, event: OrderAccepted) {}
1288
1289 #[allow(unused_variables)]
1291 fn on_algo_order_canceled(&mut self, event: OrderCanceled) {}
1292
1293 #[allow(unused_variables)]
1295 fn on_order_expired(&mut self, event: OrderExpired) {}
1296
1297 #[allow(unused_variables)]
1299 fn on_order_triggered(&mut self, event: OrderTriggered) {}
1300
1301 #[allow(unused_variables)]
1303 fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1304
1305 #[allow(unused_variables)]
1307 fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1308
1309 #[allow(unused_variables)]
1311 fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1312
1313 #[allow(unused_variables)]
1315 fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1316
1317 #[allow(unused_variables)]
1319 fn on_order_updated(&mut self, event: OrderUpdated) {}
1320
1321 #[allow(unused_variables)]
1323 fn on_algo_order_filled(&mut self, event: OrderFilled) {}
1324
1325 #[allow(unused_variables)]
1327 fn on_order_event(&mut self, event: OrderEventAny) {}
1328
1329 #[allow(unused_variables)]
1331 fn on_position_opened(&mut self, event: PositionOpened) {}
1332
1333 #[allow(unused_variables)]
1335 fn on_position_changed(&mut self, event: PositionChanged) {}
1336
1337 #[allow(unused_variables)]
1339 fn on_position_closed(&mut self, event: PositionClosed) {}
1340
1341 #[allow(unused_variables)]
1343 fn on_position_event(&mut self, event: PositionEvent) {}
1344}
1345
1346fn publish_order_initialized(order: &OrderAny) {
1347 let event = OrderEventAny::Initialized(order.init_event().clone());
1348 publish_order_event(&event);
1349}
1350
1351fn publish_order_event(event: &OrderEventAny) {
1352 let topic = format!("events.order.{}", event.strategy_id());
1353 msgbus::publish_order_event(topic.into(), event);
1354}
1355
1356fn registered_trader_id(core: &ExecutionAlgorithmCore) -> anyhow::Result<TraderId> {
1357 DataActorNative::core(core)
1358 .trader_id()
1359 .ok_or_else(|| anyhow::anyhow!("ExecutionAlgorithm not registered: trader_id is not set"))
1360}
1361
1362fn required_account_id(order: &OrderAny, operation: &str) -> anyhow::Result<AccountId> {
1363 order.account_id().ok_or_else(|| {
1364 anyhow::anyhow!(
1365 "Cannot generate {operation} event for {}: account_id is not set",
1366 order.client_order_id()
1367 )
1368 })
1369}
1370
1371#[cfg(test)]
1372mod tests {
1373 use std::{cell::RefCell, rc::Rc};
1374
1375 use nautilus_common::{
1376 actor::DataActor,
1377 cache::Cache,
1378 clock::{Clock, TestClock},
1379 component::Component,
1380 enums::ComponentTrigger,
1381 msgbus,
1382 msgbus::TypedHandler,
1383 };
1384 use nautilus_model::{
1385 enums::{OrderSide, OrderType},
1386 events::{
1387 OrderAccepted, OrderCanceled, OrderDenied, OrderRejected,
1388 order::spec::{
1389 OrderAcceptedSpec, OrderCanceledSpec, OrderDeniedSpec, OrderFilledSpec,
1390 OrderRejectedSpec,
1391 },
1392 },
1393 identifiers::{
1394 AccountId, ActorId, ClientOrderId, ComponentId, ExecAlgorithmId, InstrumentId,
1395 StrategyId, TraderId, VenueOrderId,
1396 },
1397 orders::{LimitOrder, MarketOrder, OrderAny, OrderTestBuilder, stubs::TestOrderStubs},
1398 types::{Price, Quantity},
1399 };
1400 use rstest::rstest;
1401
1402 use super::*;
1403 use crate::nautilus_execution_algorithm;
1404
1405 #[derive(Debug)]
1406 struct TestAlgorithm {
1407 core: ExecutionAlgorithmCore,
1408 on_order_called: bool,
1409 last_order_client_id: Option<ClientOrderId>,
1410 }
1411
1412 #[derive(Debug)]
1413 struct CoreFreeExecutionAlgorithm {
1414 state: ComponentState,
1415 orders_seen: usize,
1416 }
1417
1418 #[derive(Debug)]
1419 struct MacroTestCustomField {
1420 inner: ExecutionAlgorithmCore,
1421 }
1422
1423 impl Component for CoreFreeExecutionAlgorithm {
1424 fn component_id(&self) -> ComponentId {
1425 ComponentId::new("CoreFreeExecutionAlgorithm")
1426 }
1427
1428 fn state(&self) -> ComponentState {
1429 self.state
1430 }
1431
1432 fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
1433 self.state = self.state.transition(&trigger)?;
1434 Ok(())
1435 }
1436
1437 fn register(
1438 &mut self,
1439 _trader_id: TraderId,
1440 _clock: Rc<RefCell<dyn Clock>>,
1441 _cache: Rc<RefCell<Cache>>,
1442 ) -> anyhow::Result<()> {
1443 Ok(())
1444 }
1445 }
1446
1447 impl DataActor for CoreFreeExecutionAlgorithm {}
1448
1449 impl ExecutionAlgorithm for CoreFreeExecutionAlgorithm {
1450 fn on_order(&mut self, _order: OrderAny) -> anyhow::Result<()> {
1451 self.orders_seen += 1;
1452 Ok(())
1453 }
1454 }
1455
1456 impl DataActor for MacroTestCustomField {}
1457
1458 nautilus_execution_algorithm!(MacroTestCustomField, inner, {
1459 fn on_order(&mut self, _order: OrderAny) -> anyhow::Result<()> {
1460 Ok(())
1461 }
1462 });
1463
1464 impl TestAlgorithm {
1465 fn new(config: ExecutionAlgorithmConfig) -> Self {
1466 Self {
1467 core: ExecutionAlgorithmCore::new(config),
1468 on_order_called: false,
1469 last_order_client_id: None,
1470 }
1471 }
1472 }
1473
1474 impl DataActor for TestAlgorithm {}
1475
1476 nautilus_execution_algorithm!(TestAlgorithm, {
1477 fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
1478 self.on_order_called = true;
1479 self.last_order_client_id = Some(order.client_order_id());
1480 Ok(())
1481 }
1482 });
1483
1484 fn create_test_algorithm() -> TestAlgorithm {
1485 let unique_id = format!("TEST-{}", UUID4::new());
1487 let config = ExecutionAlgorithmConfig {
1488 exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
1489 ..Default::default()
1490 };
1491 TestAlgorithm::new(config)
1492 }
1493
1494 fn register_algorithm(algo: &mut TestAlgorithm) {
1495 let trader_id = TraderId::from("TRADER-001");
1496 let clock = Rc::new(RefCell::new(TestClock::new()));
1497 let cache = Rc::new(RefCell::new(Cache::default()));
1498
1499 algo.core.register(trader_id, clock, cache).unwrap();
1500
1501 algo.transition_state(ComponentTrigger::Initialize).unwrap();
1503 algo.transition_state(ComponentTrigger::Start).unwrap();
1504 algo.transition_state(ComponentTrigger::StartCompleted)
1505 .unwrap();
1506 }
1507
1508 fn subscribe_order_topic(
1509 strategy_id: StrategyId,
1510 ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
1511 let events = Rc::new(RefCell::new(Vec::new()));
1512 let handler = TypedHandler::from({
1513 let events = events.clone();
1514 move |event: &OrderEventAny| {
1515 events.borrow_mut().push(event.clone());
1516 }
1517 });
1518 msgbus::subscribe_order_events(
1519 format!("events.order.{strategy_id}").into(),
1520 handler.clone(),
1521 None,
1522 );
1523 (handler, events)
1524 }
1525
1526 #[rstest]
1527 fn test_algorithm_creation() {
1528 let algo = create_test_algorithm();
1529 assert!(algo.id().inner().starts_with("TEST-"));
1530 assert!(!algo.on_order_called);
1531 assert!(algo.last_order_client_id.is_none());
1532 }
1533
1534 #[rstest]
1535 fn test_algorithm_registration() {
1536 let mut algo = create_test_algorithm();
1537 register_algorithm(&mut algo);
1538
1539 assert_eq!(algo.trader_id(), Some(TraderId::from("TRADER-001")));
1540 }
1541
1542 #[rstest]
1543 fn test_submit_order_errors_when_algorithm_not_registered() {
1544 let mut algo = create_test_algorithm();
1545 let order = OrderAny::Market(MarketOrder::new(
1546 TraderId::from("TRADER-001"),
1547 StrategyId::from("STRAT-001"),
1548 InstrumentId::from("BTC/USDT.BINANCE"),
1549 ClientOrderId::from("O-UNREGISTERED-001"),
1550 OrderSide::Buy,
1551 Quantity::from("1.0"),
1552 TimeInForce::Gtc,
1553 UUID4::new(),
1554 0.into(),
1555 false,
1556 false,
1557 None,
1558 None,
1559 None,
1560 None,
1561 None,
1562 None,
1563 None,
1564 None,
1565 ));
1566
1567 let err = algo
1568 .submit_order(order, None, None)
1569 .unwrap_err()
1570 .to_string();
1571
1572 assert_eq!(
1573 err,
1574 "ExecutionAlgorithm not registered: trader_id is not set"
1575 );
1576 }
1577
1578 #[rstest]
1579 fn test_required_account_id_errors_when_missing_for_algorithm_event() {
1580 let order = OrderAny::Market(MarketOrder::new(
1581 TraderId::from("TRADER-001"),
1582 StrategyId::from("STRAT-001"),
1583 InstrumentId::from("BTC/USDT.BINANCE"),
1584 ClientOrderId::from("O-NO-ACCOUNT-001"),
1585 OrderSide::Buy,
1586 Quantity::from("1.0"),
1587 TimeInForce::Gtc,
1588 UUID4::new(),
1589 0.into(),
1590 false,
1591 false,
1592 None,
1593 None,
1594 None,
1595 None,
1596 None,
1597 None,
1598 None,
1599 None,
1600 ));
1601
1602 let err = required_account_id(&order, "pending update")
1603 .unwrap_err()
1604 .to_string();
1605
1606 assert_eq!(
1607 err,
1608 "Cannot generate pending update event for O-NO-ACCOUNT-001: account_id is not set"
1609 );
1610 }
1611
1612 #[rstest]
1613 fn test_algorithm_id() {
1614 let algo = create_test_algorithm();
1615 assert!(algo.id().inner().starts_with("TEST-"));
1616 }
1617
1618 #[rstest]
1619 fn test_execution_algorithm_behavior_does_not_require_native_core_access() {
1620 fn assert_execution_algorithm<T: ExecutionAlgorithm + DataActor + Component>() {}
1621
1622 assert_execution_algorithm::<CoreFreeExecutionAlgorithm>();
1623
1624 let mut algorithm = CoreFreeExecutionAlgorithm {
1625 state: ComponentState::PreInitialized,
1626 orders_seen: 0,
1627 };
1628 let order = OrderTestBuilder::new(OrderType::Market)
1629 .instrument_id(InstrumentId::from("BTC/USDT.BINANCE"))
1630 .quantity(Quantity::from("1.0"))
1631 .build();
1632
1633 algorithm.on_order(order).unwrap();
1634
1635 assert_eq!(algorithm.orders_seen, 1);
1636 }
1637
1638 #[rstest]
1639 fn test_nautilus_execution_algorithm_macro_custom_field() {
1640 let exec_algorithm_id = ExecAlgorithmId::from("MACRO-001");
1641 let algorithm = MacroTestCustomField {
1642 inner: ExecutionAlgorithmCore::new(ExecutionAlgorithmConfig {
1643 exec_algorithm_id: Some(exec_algorithm_id),
1644 ..Default::default()
1645 }),
1646 };
1647
1648 assert_eq!(algorithm.id(), exec_algorithm_id);
1649 assert_eq!(algorithm.actor_id(), ActorId::from("MACRO-001"));
1650 }
1651
1652 #[rstest]
1653 fn test_algorithm_spawn_market_creates_valid_order() {
1654 let mut algo = create_test_algorithm();
1655 register_algorithm(&mut algo);
1656
1657 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1658 let mut primary = OrderAny::Market(MarketOrder::new(
1659 TraderId::from("TRADER-001"),
1660 StrategyId::from("STRAT-001"),
1661 instrument_id,
1662 ClientOrderId::from("O-001"),
1663 OrderSide::Buy,
1664 Quantity::from("1.0"),
1665 TimeInForce::Gtc,
1666 UUID4::new(),
1667 0.into(),
1668 false, false, None, None, None, None, None, None, None, None, ));
1679
1680 let spawned = algo.spawn_market(
1681 &mut primary,
1682 Quantity::from("0.5"),
1683 TimeInForce::Ioc,
1684 false,
1685 None, false, );
1688
1689 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1690 assert_eq!(spawned.instrument_id, instrument_id);
1691 assert_eq!(spawned.order_side(), OrderSide::Buy);
1692 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1693 assert_eq!(spawned.time_in_force, TimeInForce::Ioc);
1694 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1695 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1696 }
1697
1698 #[rstest]
1699 fn test_algorithm_spawn_increments_sequence() {
1700 let mut algo = create_test_algorithm();
1701 register_algorithm(&mut algo);
1702
1703 let mut primary = OrderAny::Market(MarketOrder::new(
1704 TraderId::from("TRADER-001"),
1705 StrategyId::from("STRAT-001"),
1706 InstrumentId::from("BTC/USDT.BINANCE"),
1707 ClientOrderId::from("O-001"),
1708 OrderSide::Buy,
1709 Quantity::from("1.0"),
1710 TimeInForce::Gtc,
1711 UUID4::new(),
1712 0.into(),
1713 false,
1714 false,
1715 None,
1716 None,
1717 None,
1718 None,
1719 None,
1720 None,
1721 None,
1722 None,
1723 ));
1724
1725 let spawned1 = algo.spawn_market(
1726 &mut primary,
1727 Quantity::from("0.25"),
1728 TimeInForce::Ioc,
1729 false,
1730 None,
1731 false,
1732 );
1733 let spawned2 = algo.spawn_market(
1734 &mut primary,
1735 Quantity::from("0.25"),
1736 TimeInForce::Ioc,
1737 false,
1738 None,
1739 false,
1740 );
1741 let spawned3 = algo.spawn_market(
1742 &mut primary,
1743 Quantity::from("0.25"),
1744 TimeInForce::Ioc,
1745 false,
1746 None,
1747 false,
1748 );
1749
1750 assert_eq!(spawned1.client_order_id.as_str(), "O-001-E1");
1751 assert_eq!(spawned2.client_order_id.as_str(), "O-001-E2");
1752 assert_eq!(spawned3.client_order_id.as_str(), "O-001-E3");
1753 }
1754
1755 #[rstest]
1756 fn test_algorithm_default_handlers_do_not_panic() {
1757 let mut algo = create_test_algorithm();
1758
1759 algo.on_order_initialized(OrderInitialized::default());
1760 algo.on_order_denied(OrderDenied::default());
1761 algo.on_order_emulated(OrderEmulated::default());
1762 algo.on_order_released(OrderReleased::default());
1763 algo.on_order_submitted(OrderSubmitted::default());
1764 algo.on_order_rejected(OrderRejected::default());
1765 algo.on_order_accepted(OrderAccepted::default());
1766 algo.on_algo_order_canceled(OrderCanceled::default());
1767 algo.on_order_expired(OrderExpired::default());
1768 algo.on_order_triggered(OrderTriggered::default());
1769 algo.on_order_pending_update(OrderPendingUpdate::default());
1770 algo.on_order_pending_cancel(OrderPendingCancel::default());
1771 algo.on_order_modify_rejected(OrderModifyRejected::default());
1772 algo.on_order_cancel_rejected(OrderCancelRejected::default());
1773 algo.on_order_updated(OrderUpdated::default());
1774 algo.on_algo_order_filled(OrderFilledSpec::builder().build());
1775 }
1776
1777 #[rstest]
1778 fn test_strategy_subscription_tracking() {
1779 let mut algo = create_test_algorithm();
1780 let strategy_id = StrategyId::from("STRAT-001");
1781
1782 assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1783
1784 algo.subscribe_to_strategy_events(strategy_id);
1785 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1786
1787 algo.subscribe_to_strategy_events(strategy_id);
1789 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1790 }
1791
1792 #[rstest]
1793 fn test_algorithm_reset() {
1794 let mut algo = create_test_algorithm();
1795 let strategy_id = StrategyId::from("STRAT-001");
1796 let primary_id = ClientOrderId::new("O-001");
1797
1798 let _ = algo.core.spawn_client_order_id(&primary_id);
1799 algo.core.add_subscribed_strategy(strategy_id);
1800
1801 assert!(algo.core.spawn_sequence(&primary_id).is_some());
1802 assert!(algo.core.is_strategy_subscribed(&strategy_id));
1803
1804 ExecutionAlgorithm::on_reset(&mut algo).unwrap();
1805
1806 assert!(algo.core.spawn_sequence(&primary_id).is_none());
1807 assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1808 }
1809
1810 #[rstest]
1811 fn test_algorithm_spawn_limit_creates_valid_order() {
1812 let mut algo = create_test_algorithm();
1813 register_algorithm(&mut algo);
1814
1815 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1816 let mut primary = OrderAny::Market(MarketOrder::new(
1817 TraderId::from("TRADER-001"),
1818 StrategyId::from("STRAT-001"),
1819 instrument_id,
1820 ClientOrderId::from("O-001"),
1821 OrderSide::Buy,
1822 Quantity::from("1.0"),
1823 TimeInForce::Gtc,
1824 UUID4::new(),
1825 0.into(),
1826 false,
1827 false,
1828 None,
1829 None,
1830 None,
1831 None,
1832 None,
1833 None,
1834 None,
1835 None,
1836 ));
1837
1838 let price = Price::from("50000.0");
1839 let spawned = algo.spawn_limit(
1840 &mut primary,
1841 Quantity::from("0.5"),
1842 price,
1843 TimeInForce::Gtc,
1844 None, false, false, None, None, None, false, );
1852
1853 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1854 assert_eq!(spawned.instrument_id, instrument_id);
1855 assert_eq!(spawned.order_side(), OrderSide::Buy);
1856 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1857 assert_eq!(spawned.price, price);
1858 assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1859 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1860 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1861 }
1862
1863 #[rstest]
1864 fn test_algorithm_spawn_market_to_limit_creates_valid_order() {
1865 let mut algo = create_test_algorithm();
1866 register_algorithm(&mut algo);
1867
1868 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1869 let mut primary = OrderAny::Market(MarketOrder::new(
1870 TraderId::from("TRADER-001"),
1871 StrategyId::from("STRAT-001"),
1872 instrument_id,
1873 ClientOrderId::from("O-001"),
1874 OrderSide::Buy,
1875 Quantity::from("1.0"),
1876 TimeInForce::Gtc,
1877 UUID4::new(),
1878 0.into(),
1879 false,
1880 false,
1881 None,
1882 None,
1883 None,
1884 None,
1885 None,
1886 None,
1887 None,
1888 None,
1889 ));
1890
1891 let spawned = algo.spawn_market_to_limit(
1892 &mut primary,
1893 Quantity::from("0.5"),
1894 TimeInForce::Gtc,
1895 None, false, None, None, None, false, );
1902
1903 assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1904 assert_eq!(spawned.instrument_id, instrument_id);
1905 assert_eq!(spawned.order_side(), OrderSide::Buy);
1906 assert_eq!(spawned.quantity, Quantity::from("0.5"));
1907 assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1908 assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1909 assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1910 }
1911
1912 #[rstest]
1913 fn test_algorithm_spawn_market_with_tags() {
1914 let mut algo = create_test_algorithm();
1915 register_algorithm(&mut algo);
1916
1917 let mut primary = OrderAny::Market(MarketOrder::new(
1918 TraderId::from("TRADER-001"),
1919 StrategyId::from("STRAT-001"),
1920 InstrumentId::from("BTC/USDT.BINANCE"),
1921 ClientOrderId::from("O-001"),
1922 OrderSide::Buy,
1923 Quantity::from("1.0"),
1924 TimeInForce::Gtc,
1925 UUID4::new(),
1926 0.into(),
1927 false,
1928 false,
1929 None,
1930 None,
1931 None,
1932 None,
1933 None,
1934 None,
1935 None,
1936 None,
1937 ));
1938
1939 let tags = vec![ustr::Ustr::from("TAG1"), ustr::Ustr::from("TAG2")];
1940 let spawned = algo.spawn_market(
1941 &mut primary,
1942 Quantity::from("0.5"),
1943 TimeInForce::Ioc,
1944 false,
1945 Some(tags.clone()),
1946 false,
1947 );
1948
1949 assert_eq!(spawned.tags, Some(tags));
1950 }
1951
1952 #[rstest]
1953 fn test_algorithm_spawn_propagates_primary_fields() {
1954 let mut algo = create_test_algorithm();
1955 register_algorithm(&mut algo);
1956
1957 let mut params = indexmap::IndexMap::new();
1958 params.insert(ustr::Ustr::from("horizon_secs"), ustr::Ustr::from("30"));
1959 params.insert(ustr::Ustr::from("interval_secs"), ustr::Ustr::from("10"));
1960 let primary_tags = vec![ustr::Ustr::from("PRIMARY_TAG")];
1961 let linked_order_ids = vec![ClientOrderId::from("LINK-1")];
1962
1963 let mut primary = OrderAny::Market(MarketOrder::new(
1964 TraderId::from("TRADER-001"),
1965 StrategyId::from("STRAT-001"),
1966 InstrumentId::from("BTC/USDT.BINANCE"),
1967 ClientOrderId::from("O-001"),
1968 OrderSide::Buy,
1969 Quantity::from("1.0"),
1970 TimeInForce::Gtc,
1971 UUID4::new(),
1972 0.into(),
1973 false, true, None, None, Some(linked_order_ids.clone()),
1978 None, Some(algo.id()),
1980 Some(params.clone()),
1981 None, Some(primary_tags.clone()),
1983 ));
1984
1985 let spawned_market = algo.spawn_market(
1986 &mut primary,
1987 Quantity::from("0.25"),
1988 TimeInForce::Ioc,
1989 false,
1990 None, false,
1992 );
1993 assert!(spawned_market.is_quote_quantity);
1994 assert_eq!(spawned_market.exec_algorithm_params, Some(params.clone()));
1995 assert_eq!(spawned_market.tags, Some(primary_tags.clone()));
1996 assert_eq!(
1997 spawned_market.linked_order_ids,
1998 Some(linked_order_ids.clone())
1999 );
2000
2001 let spawned_limit = algo.spawn_limit(
2002 &mut primary,
2003 Quantity::from("0.25"),
2004 Price::from("50000.0"),
2005 TimeInForce::Gtc,
2006 None, false, false, None, None, None, false,
2013 );
2014 assert!(spawned_limit.is_quote_quantity);
2015 assert_eq!(spawned_limit.exec_algorithm_params, Some(params.clone()));
2016 assert_eq!(spawned_limit.tags, Some(primary_tags.clone()));
2017 assert_eq!(
2018 spawned_limit.linked_order_ids,
2019 Some(linked_order_ids.clone())
2020 );
2021
2022 let spawned_mtl = algo.spawn_market_to_limit(
2023 &mut primary,
2024 Quantity::from("0.25"),
2025 TimeInForce::Gtc,
2026 None, false, None, None, None, false,
2032 );
2033 assert!(spawned_mtl.is_quote_quantity);
2034 assert_eq!(spawned_mtl.exec_algorithm_params, Some(params));
2035 assert_eq!(spawned_mtl.tags, Some(primary_tags));
2036 assert_eq!(spawned_mtl.linked_order_ids, Some(linked_order_ids));
2037 }
2038
2039 #[rstest]
2040 fn test_algorithm_reduce_primary_order() {
2041 let mut algo = create_test_algorithm();
2042 register_algorithm(&mut algo);
2043
2044 let order = OrderAny::Market(MarketOrder::new(
2045 TraderId::from("TRADER-001"),
2046 StrategyId::from("STRAT-001"),
2047 InstrumentId::from("BTC/USDT.BINANCE"),
2048 ClientOrderId::from("O-001"),
2049 OrderSide::Buy,
2050 Quantity::from("1.0"),
2051 TimeInForce::Gtc,
2052 UUID4::new(),
2053 0.into(),
2054 false,
2055 false,
2056 None,
2057 None,
2058 None,
2059 None,
2060 None,
2061 None,
2062 None,
2063 None,
2064 ));
2065
2066 let mut primary = TestOrderStubs::make_accepted_order(&order);
2068
2069 {
2070 let cache_rc = algo.core.cache_rc();
2071 let mut cache = cache_rc.borrow_mut();
2072 cache.add_order(primary.clone(), None, None, false).unwrap();
2073 }
2074
2075 let spawn_qty = Quantity::from("0.3");
2076 algo.reduce_primary_order(&mut primary, spawn_qty);
2077
2078 assert_eq!(primary.quantity(), Quantity::from("0.7"));
2079 }
2080
2081 #[rstest]
2082 fn test_algorithm_reduce_primary_order_publishes_updated_event() {
2083 let mut algo = create_test_algorithm();
2084 register_algorithm(&mut algo);
2085
2086 let strategy_id = StrategyId::from("STRAT-ALGO-REDUCE-PUBLISH");
2087 let order = OrderAny::Market(MarketOrder::new(
2088 TraderId::from("TRADER-001"),
2089 strategy_id,
2090 InstrumentId::from("BTC/USDT.BINANCE"),
2091 ClientOrderId::from("O-ALGO-REDUCE"),
2092 OrderSide::Buy,
2093 Quantity::from("1.0"),
2094 TimeInForce::Gtc,
2095 UUID4::new(),
2096 0.into(),
2097 false,
2098 false,
2099 None,
2100 None,
2101 None,
2102 None,
2103 None,
2104 None,
2105 None,
2106 None,
2107 ));
2108 let mut primary = TestOrderStubs::make_accepted_order(&order);
2109
2110 {
2111 let cache_rc = algo.core.cache_rc();
2112 let mut cache = cache_rc.borrow_mut();
2113 cache.add_order(primary.clone(), None, None, false).unwrap();
2114 }
2115
2116 let (handler, events) = subscribe_order_topic(strategy_id);
2117
2118 algo.reduce_primary_order(&mut primary, Quantity::from("0.3"));
2119
2120 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2121 let events = events.borrow();
2122
2123 assert_eq!(events.len(), 1);
2124 assert!(matches!(
2125 &events[0],
2126 OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
2127 ));
2128 }
2129
2130 #[rstest]
2131 fn test_algorithm_submit_order_publishes_initialized_for_new_order() {
2132 let mut algo = create_test_algorithm();
2133 register_algorithm(&mut algo);
2134
2135 let strategy_id = StrategyId::from("STRAT-ALGO-INIT-PUBLISH");
2136 let order = OrderAny::Market(MarketOrder::new(
2137 TraderId::from("TRADER-001"),
2138 strategy_id,
2139 InstrumentId::from("BTC/USDT.BINANCE"),
2140 ClientOrderId::from("O-ALGO-INIT"),
2141 OrderSide::Buy,
2142 Quantity::from("1.0"),
2143 TimeInForce::Gtc,
2144 UUID4::new(),
2145 0.into(),
2146 false,
2147 false,
2148 None,
2149 None,
2150 None,
2151 None,
2152 None,
2153 None,
2154 None,
2155 None,
2156 ));
2157 let (handler, events) = subscribe_order_topic(strategy_id);
2158
2159 algo.submit_order(order.clone(), None, None).unwrap();
2160
2161 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2162 let events = events.borrow();
2163
2164 assert_eq!(events.len(), 1);
2165 assert!(matches!(
2166 &events[0],
2167 OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
2168 ));
2169 }
2170
2171 #[rstest]
2172 fn test_algorithm_submit_order_does_not_republish_initialized_for_existing_order() {
2173 let mut algo = create_test_algorithm();
2174 register_algorithm(&mut algo);
2175
2176 let strategy_id = StrategyId::from("STRAT-ALGO-INIT-EXISTING");
2177 let order = OrderAny::Market(MarketOrder::new(
2178 TraderId::from("TRADER-001"),
2179 strategy_id,
2180 InstrumentId::from("BTC/USDT.BINANCE"),
2181 ClientOrderId::from("O-ALGO-INIT-EXISTING"),
2182 OrderSide::Buy,
2183 Quantity::from("1.0"),
2184 TimeInForce::Gtc,
2185 UUID4::new(),
2186 0.into(),
2187 false,
2188 false,
2189 None,
2190 None,
2191 None,
2192 None,
2193 None,
2194 None,
2195 None,
2196 None,
2197 ));
2198 {
2199 let cache_rc = algo.core.cache_rc();
2200 let mut cache = cache_rc.borrow_mut();
2201 cache.add_order(order.clone(), None, None, true).unwrap();
2202 }
2203 let (handler, events) = subscribe_order_topic(strategy_id);
2204
2205 algo.submit_order(order, None, None).unwrap();
2206
2207 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2208 assert!(events.borrow().is_empty());
2209 }
2210
2211 #[rstest]
2212 fn test_algorithm_spawn_market_with_reduce_primary() {
2213 let mut algo = create_test_algorithm();
2214 register_algorithm(&mut algo);
2215
2216 let order = OrderAny::Market(MarketOrder::new(
2217 TraderId::from("TRADER-001"),
2218 StrategyId::from("STRAT-001"),
2219 InstrumentId::from("BTC/USDT.BINANCE"),
2220 ClientOrderId::from("O-001"),
2221 OrderSide::Buy,
2222 Quantity::from("1.0"),
2223 TimeInForce::Gtc,
2224 UUID4::new(),
2225 0.into(),
2226 false,
2227 false,
2228 None,
2229 None,
2230 None,
2231 None,
2232 None,
2233 None,
2234 None,
2235 None,
2236 ));
2237
2238 let mut primary = TestOrderStubs::make_accepted_order(&order);
2240
2241 {
2242 let cache_rc = algo.core.cache_rc();
2243 let mut cache = cache_rc.borrow_mut();
2244 cache.add_order(primary.clone(), None, None, false).unwrap();
2245 }
2246
2247 let spawned = algo.spawn_market(
2248 &mut primary,
2249 Quantity::from("0.4"),
2250 TimeInForce::Ioc,
2251 false,
2252 None,
2253 true, );
2255
2256 assert_eq!(spawned.quantity, Quantity::from("0.4"));
2257 assert_eq!(primary.quantity(), Quantity::from("0.6"));
2258 }
2259
2260 #[rstest]
2261 fn test_algorithm_generate_order_canceled() {
2262 let mut algo = create_test_algorithm();
2263 register_algorithm(&mut algo);
2264
2265 let order = OrderAny::Market(MarketOrder::new(
2266 TraderId::from("TRADER-001"),
2267 StrategyId::from("STRAT-001"),
2268 InstrumentId::from("BTC/USDT.BINANCE"),
2269 ClientOrderId::from("O-001"),
2270 OrderSide::Buy,
2271 Quantity::from("1.0"),
2272 TimeInForce::Gtc,
2273 UUID4::new(),
2274 0.into(),
2275 false,
2276 false,
2277 None,
2278 None,
2279 None,
2280 None,
2281 None,
2282 None,
2283 None,
2284 None,
2285 ));
2286
2287 let event = algo.generate_order_canceled(&order);
2288
2289 assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
2290 assert_eq!(event.strategy_id, StrategyId::from("STRAT-001"));
2291 assert_eq!(event.instrument_id, InstrumentId::from("BTC/USDT.BINANCE"));
2292 assert_eq!(event.client_order_id, ClientOrderId::from("O-001"));
2293 }
2294
2295 #[rstest]
2296 fn test_algorithm_handle_cancel_order_publishes_instrument_canceled_topic() {
2297 let mut algo = create_test_algorithm();
2298 register_algorithm(&mut algo);
2299
2300 let strategy_id = StrategyId::from("STRAT-ALGO-CANCEL-PUBLISH");
2301 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2302 let order = OrderAny::Market(MarketOrder::new(
2303 TraderId::from("TRADER-001"),
2304 strategy_id,
2305 instrument_id,
2306 ClientOrderId::from("O-ALGO-CANCEL"),
2307 OrderSide::Buy,
2308 Quantity::from("1.0"),
2309 TimeInForce::Gtc,
2310 UUID4::new(),
2311 0.into(),
2312 false,
2313 false,
2314 None,
2315 None,
2316 None,
2317 None,
2318 None,
2319 None,
2320 None,
2321 None,
2322 ));
2323 let order = TestOrderStubs::make_accepted_order(&order);
2324
2325 {
2326 let cache_rc = algo.core.cache_rc();
2327 let mut cache = cache_rc.borrow_mut();
2328 cache.add_order(order.clone(), None, None, false).unwrap();
2329 }
2330
2331 let received = Rc::new(RefCell::new(Vec::<OrderEventAny>::new()));
2332 let handler = TypedHandler::from({
2333 let received = received.clone();
2334 move |event: &OrderEventAny| {
2335 received.borrow_mut().push(event.clone());
2336 }
2337 });
2338 let topic = msgbus::switchboard::get_order_canceled_topic(instrument_id);
2339 msgbus::subscribe_order_events(topic.into(), handler.clone(), None);
2340
2341 let command = CancelOrder::new(
2342 order.trader_id(),
2343 None,
2344 strategy_id,
2345 instrument_id,
2346 order.client_order_id(),
2347 order.venue_order_id(),
2348 UUID4::new(),
2349 0.into(),
2350 None,
2351 None,
2352 );
2353 algo.handle_cancel_order(command).unwrap();
2354
2355 msgbus::unsubscribe_order_events(topic.into(), &handler);
2356 let received = received.borrow();
2357 assert_eq!(received.len(), 1);
2358 assert!(matches!(received[0], OrderEventAny::Canceled(_)));
2359 assert_eq!(received[0].client_order_id(), order.client_order_id());
2360 assert_eq!(received[0].instrument_id(), instrument_id);
2361 }
2362
2363 #[rstest]
2364 fn test_algorithm_modify_order_in_place_updates_quantity() {
2365 let mut algo = create_test_algorithm();
2366 register_algorithm(&mut algo);
2367
2368 let strategy_id = StrategyId::from("STRAT-ALGO-MODIFY-IN-PLACE");
2369 let mut order = OrderAny::Limit(LimitOrder::new(
2370 TraderId::from("TRADER-001"),
2371 strategy_id,
2372 InstrumentId::from("BTC/USDT.BINANCE"),
2373 ClientOrderId::from("O-001"),
2374 OrderSide::Buy,
2375 Quantity::from("1.0"),
2376 Price::from("50000.0"),
2377 TimeInForce::Gtc,
2378 None, false, false, false, None, None, None, None, None, None, None, None, None, None, None, UUID4::new(),
2394 0.into(),
2395 ));
2396
2397 {
2398 let cache_rc = algo.core.cache_rc();
2399 let mut cache = cache_rc.borrow_mut();
2400 cache.add_order(order.clone(), None, None, false).unwrap();
2401 }
2402
2403 let new_qty = Quantity::from("0.5");
2404 let (handler, events) = subscribe_order_topic(strategy_id);
2405
2406 algo.modify_order_in_place(&mut order, Some(new_qty), None, None)
2407 .unwrap();
2408
2409 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2410 let events = events.borrow();
2411
2412 assert_eq!(order.quantity(), new_qty);
2413 assert_eq!(events.len(), 1);
2414 assert!(matches!(
2415 &events[0],
2416 OrderEventAny::Updated(event) if event.quantity == new_qty
2417 ));
2418 }
2419
2420 #[rstest]
2421 fn test_algorithm_modify_order_in_place_rejects_no_changes() {
2422 let mut algo = create_test_algorithm();
2423 register_algorithm(&mut algo);
2424
2425 let mut order = OrderAny::Limit(LimitOrder::new(
2426 TraderId::from("TRADER-001"),
2427 StrategyId::from("STRAT-001"),
2428 InstrumentId::from("BTC/USDT.BINANCE"),
2429 ClientOrderId::from("O-001"),
2430 OrderSide::Buy,
2431 Quantity::from("1.0"),
2432 Price::from("50000.0"),
2433 TimeInForce::Gtc,
2434 None,
2435 false,
2436 false,
2437 false,
2438 None,
2439 None,
2440 None,
2441 None,
2442 None,
2443 None,
2444 None,
2445 None,
2446 None,
2447 None,
2448 None,
2449 UUID4::new(),
2450 0.into(),
2451 ));
2452
2453 let result =
2455 algo.modify_order_in_place(&mut order, Some(Quantity::from("1.0")), None, None);
2456
2457 assert!(result.is_err());
2458 assert!(
2459 result
2460 .unwrap_err()
2461 .to_string()
2462 .contains("no parameters differ")
2463 );
2464 }
2465
2466 #[rstest]
2467 fn test_spawned_order_denied_restores_primary_quantity() {
2468 let mut algo = create_test_algorithm();
2469 register_algorithm(&mut algo);
2470
2471 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2472 let exec_algorithm_id = algo.id();
2473
2474 let mut primary = OrderAny::Market(MarketOrder::new(
2475 TraderId::from("TRADER-001"),
2476 StrategyId::from("STRAT-001"),
2477 instrument_id,
2478 ClientOrderId::from("O-001"),
2479 OrderSide::Buy,
2480 Quantity::from("1.0"),
2481 TimeInForce::Gtc,
2482 UUID4::new(),
2483 0.into(),
2484 false,
2485 false,
2486 None,
2487 None,
2488 None,
2489 None,
2490 Some(exec_algorithm_id),
2491 None,
2492 None,
2493 None,
2494 ));
2495
2496 {
2497 let cache_rc = algo.core.cache_rc();
2498 let mut cache = cache_rc.borrow_mut();
2499 cache.add_order(primary.clone(), None, None, false).unwrap();
2500 }
2501
2502 let spawned = algo.spawn_market(
2503 &mut primary,
2504 Quantity::from("0.5"),
2505 TimeInForce::Fok,
2506 false,
2507 None,
2508 true,
2509 );
2510
2511 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2512
2513 let spawned_order = OrderAny::Market(spawned);
2514 {
2515 let cache_rc = algo.core.cache_rc();
2516 let mut cache = cache_rc.borrow_mut();
2517 cache
2518 .add_order(spawned_order.clone(), None, None, false)
2519 .unwrap();
2520 }
2521
2522 let denied = OrderDeniedSpec::builder()
2523 .trader_id(spawned_order.trader_id())
2524 .strategy_id(spawned_order.strategy_id())
2525 .instrument_id(spawned_order.instrument_id())
2526 .client_order_id(spawned_order.client_order_id())
2527 .reason("TEST_DENIAL".into())
2528 .build();
2529
2530 {
2531 let cache_rc = algo.core.cache_rc();
2532 let mut cache = cache_rc.borrow_mut();
2533 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2534 }
2535
2536 algo.handle_order_event(OrderEventAny::Denied(denied));
2537
2538 let restored_primary = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2539 assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2540 }
2541
2542 #[rstest]
2543 fn test_spawned_order_rejected_restores_primary_quantity() {
2544 let mut algo = create_test_algorithm();
2545 register_algorithm(&mut algo);
2546
2547 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2548 let exec_algorithm_id = algo.id();
2549
2550 let mut primary = OrderAny::Market(MarketOrder::new(
2551 TraderId::from("TRADER-001"),
2552 StrategyId::from("STRAT-001"),
2553 instrument_id,
2554 ClientOrderId::from("O-001"),
2555 OrderSide::Buy,
2556 Quantity::from("1.0"),
2557 TimeInForce::Gtc,
2558 UUID4::new(),
2559 0.into(),
2560 false,
2561 false,
2562 None,
2563 None,
2564 None,
2565 None,
2566 Some(exec_algorithm_id),
2567 None,
2568 None,
2569 None,
2570 ));
2571
2572 {
2573 let cache_rc = algo.core.cache_rc();
2574 let mut cache = cache_rc.borrow_mut();
2575 cache.add_order(primary.clone(), None, None, false).unwrap();
2576 }
2577
2578 let spawned = algo.spawn_market(
2579 &mut primary,
2580 Quantity::from("0.5"),
2581 TimeInForce::Fok,
2582 false,
2583 None,
2584 true,
2585 );
2586
2587 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2588
2589 let spawned_order = OrderAny::Market(spawned);
2590 {
2591 let cache_rc = algo.core.cache_rc();
2592 let mut cache = cache_rc.borrow_mut();
2593 cache
2594 .add_order(spawned_order.clone(), None, None, false)
2595 .unwrap();
2596 }
2597
2598 let rejected = OrderRejectedSpec::builder()
2599 .trader_id(spawned_order.trader_id())
2600 .strategy_id(spawned_order.strategy_id())
2601 .instrument_id(spawned_order.instrument_id())
2602 .client_order_id(spawned_order.client_order_id())
2603 .account_id(AccountId::from("BINANCE-001"))
2604 .reason("TEST_REJECTION".into())
2605 .build();
2606
2607 {
2608 let cache_rc = algo.core.cache_rc();
2609 let mut cache = cache_rc.borrow_mut();
2610 cache
2611 .update_order(&OrderEventAny::Rejected(rejected))
2612 .unwrap();
2613 }
2614
2615 algo.handle_order_event(OrderEventAny::Rejected(rejected));
2616
2617 let restored_primary = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2618 assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2619 }
2620
2621 #[rstest]
2622 fn test_spawned_order_with_reduce_primary_false_does_not_restore() {
2623 let mut algo = create_test_algorithm();
2624 register_algorithm(&mut algo);
2625
2626 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2627 let exec_algorithm_id = algo.id();
2628
2629 let mut primary = OrderAny::Market(MarketOrder::new(
2630 TraderId::from("TRADER-001"),
2631 StrategyId::from("STRAT-001"),
2632 instrument_id,
2633 ClientOrderId::from("O-001"),
2634 OrderSide::Buy,
2635 Quantity::from("1.0"),
2636 TimeInForce::Gtc,
2637 UUID4::new(),
2638 0.into(),
2639 false,
2640 false,
2641 None,
2642 None,
2643 None,
2644 None,
2645 Some(exec_algorithm_id),
2646 None,
2647 None,
2648 None,
2649 ));
2650
2651 {
2652 let cache_rc = algo.core.cache_rc();
2653 let mut cache = cache_rc.borrow_mut();
2654 cache.add_order(primary.clone(), None, None, false).unwrap();
2655 }
2656
2657 let spawned = algo.spawn_market(
2658 &mut primary,
2659 Quantity::from("0.5"),
2660 TimeInForce::Fok,
2661 false,
2662 None,
2663 false,
2664 );
2665
2666 assert_eq!(primary.quantity(), Quantity::from("1.0"));
2667
2668 let spawned_order = OrderAny::Market(spawned);
2669 {
2670 let cache_rc = algo.core.cache_rc();
2671 let mut cache = cache_rc.borrow_mut();
2672 cache
2673 .add_order(spawned_order.clone(), None, None, false)
2674 .unwrap();
2675 }
2676
2677 let denied = OrderDeniedSpec::builder()
2678 .trader_id(spawned_order.trader_id())
2679 .strategy_id(spawned_order.strategy_id())
2680 .instrument_id(spawned_order.instrument_id())
2681 .client_order_id(spawned_order.client_order_id())
2682 .reason("TEST_DENIAL".into())
2683 .build();
2684
2685 {
2686 let cache_rc = algo.core.cache_rc();
2687 let mut cache = cache_rc.borrow_mut();
2688 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2689 }
2690
2691 algo.handle_order_event(OrderEventAny::Denied(denied));
2692
2693 let final_primary = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2694 assert_eq!(final_primary.quantity(), Quantity::from("1.0"));
2695 }
2696
2697 #[rstest]
2698 fn test_multiple_spawns_with_one_denied_restores_correctly() {
2699 let mut algo = create_test_algorithm();
2700 register_algorithm(&mut algo);
2701
2702 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2703 let exec_algorithm_id = algo.id();
2704
2705 let mut primary = OrderAny::Market(MarketOrder::new(
2706 TraderId::from("TRADER-001"),
2707 StrategyId::from("STRAT-001"),
2708 instrument_id,
2709 ClientOrderId::from("O-001"),
2710 OrderSide::Buy,
2711 Quantity::from("1.0"),
2712 TimeInForce::Gtc,
2713 UUID4::new(),
2714 0.into(),
2715 false,
2716 false,
2717 None,
2718 None,
2719 None,
2720 None,
2721 Some(exec_algorithm_id),
2722 None,
2723 None,
2724 None,
2725 ));
2726
2727 {
2728 let cache_rc = algo.core.cache_rc();
2729 let mut cache = cache_rc.borrow_mut();
2730 cache.add_order(primary.clone(), None, None, false).unwrap();
2731 }
2732
2733 let spawned1 = algo.spawn_market(
2734 &mut primary,
2735 Quantity::from("0.3"),
2736 TimeInForce::Fok,
2737 false,
2738 None,
2739 true,
2740 );
2741 let spawned2 = algo.spawn_market(
2742 &mut primary,
2743 Quantity::from("0.4"),
2744 TimeInForce::Fok,
2745 false,
2746 None,
2747 true,
2748 );
2749 assert_eq!(primary.quantity(), Quantity::from("0.3"));
2750
2751 let spawned_order1 = OrderAny::Market(spawned1);
2752 let spawned_order2 = OrderAny::Market(spawned2);
2753 {
2754 let cache_rc = algo.core.cache_rc();
2755 let mut cache = cache_rc.borrow_mut();
2756 cache.add_order(spawned_order1, None, None, false).unwrap();
2757 cache
2758 .add_order(spawned_order2.clone(), None, None, false)
2759 .unwrap();
2760 }
2761
2762 let denied = OrderDeniedSpec::builder()
2763 .trader_id(spawned_order2.trader_id())
2764 .strategy_id(spawned_order2.strategy_id())
2765 .instrument_id(spawned_order2.instrument_id())
2766 .client_order_id(spawned_order2.client_order_id())
2767 .reason("TEST_DENIAL".into())
2768 .build();
2769
2770 {
2771 let cache_rc = algo.core.cache_rc();
2772 let mut cache = cache_rc.borrow_mut();
2773 cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2774 }
2775
2776 let (handler, events) = subscribe_order_topic(spawned_order2.strategy_id());
2777
2778 algo.handle_order_event(OrderEventAny::Denied(denied));
2779
2780 msgbus::unsubscribe_order_events(
2781 format!("events.order.{}", spawned_order2.strategy_id()).into(),
2782 &handler,
2783 );
2784 let events = events.borrow();
2785
2786 let restored_primary = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2787 assert_eq!(restored_primary.quantity(), Quantity::from("0.7"));
2788 assert_eq!(events.len(), 1);
2789 assert!(matches!(
2790 &events[0],
2791 OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
2792 ));
2793 }
2794
2795 #[rstest]
2796 fn test_spawned_order_accepted_prevents_restoration() {
2797 let mut algo = create_test_algorithm();
2798 register_algorithm(&mut algo);
2799
2800 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2801 let exec_algorithm_id = algo.id();
2802
2803 let mut primary = OrderAny::Market(MarketOrder::new(
2804 TraderId::from("TRADER-001"),
2805 StrategyId::from("STRAT-001"),
2806 instrument_id,
2807 ClientOrderId::from("O-001"),
2808 OrderSide::Buy,
2809 Quantity::from("1.0"),
2810 TimeInForce::Gtc,
2811 UUID4::new(),
2812 0.into(),
2813 false,
2814 false,
2815 None,
2816 None,
2817 None,
2818 None,
2819 Some(exec_algorithm_id),
2820 None,
2821 None,
2822 None,
2823 ));
2824
2825 {
2826 let cache_rc = algo.core.cache_rc();
2827 let mut cache = cache_rc.borrow_mut();
2828 cache.add_order(primary.clone(), None, None, false).unwrap();
2829 }
2830
2831 let spawned = algo.spawn_market(
2832 &mut primary,
2833 Quantity::from("0.5"),
2834 TimeInForce::Fok,
2835 false,
2836 None,
2837 true,
2838 );
2839
2840 assert_eq!(primary.quantity(), Quantity::from("0.5"));
2841
2842 let mut spawned_order = OrderAny::Market(spawned);
2843 {
2844 let cache_rc = algo.core.cache_rc();
2845 let mut cache = cache_rc.borrow_mut();
2846 cache
2847 .add_order(spawned_order.clone(), None, None, false)
2848 .unwrap();
2849 }
2850
2851 let accepted = OrderAcceptedSpec::builder()
2852 .trader_id(spawned_order.trader_id())
2853 .strategy_id(spawned_order.strategy_id())
2854 .instrument_id(spawned_order.instrument_id())
2855 .client_order_id(spawned_order.client_order_id())
2856 .venue_order_id(VenueOrderId::from("V-123"))
2857 .account_id(AccountId::from("BINANCE-001"))
2858 .build();
2859
2860 {
2861 let cache_rc = algo.core.cache_rc();
2862 let mut cache = cache_rc.borrow_mut();
2863 spawned_order = cache
2864 .update_order(&OrderEventAny::Accepted(accepted))
2865 .unwrap();
2866 }
2867
2868 algo.handle_order_event(OrderEventAny::Accepted(accepted));
2869
2870 let primary_after_accept = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2871 assert_eq!(primary_after_accept.quantity(), Quantity::from("0.5"));
2872
2873 let canceled = OrderCanceledSpec::builder()
2875 .trader_id(spawned_order.trader_id())
2876 .strategy_id(spawned_order.strategy_id())
2877 .instrument_id(spawned_order.instrument_id())
2878 .client_order_id(spawned_order.client_order_id())
2879 .venue_order_id(VenueOrderId::from("V-123"))
2880 .account_id(AccountId::from("BINANCE-001"))
2881 .build();
2882
2883 {
2884 let cache_rc = algo.core.cache_rc();
2885 let mut cache = cache_rc.borrow_mut();
2886 cache
2887 .update_order(&OrderEventAny::Canceled(canceled))
2888 .unwrap();
2889 }
2890
2891 algo.handle_order_event(OrderEventAny::Canceled(canceled));
2892
2893 let final_primary = algo.cache().order(&ClientOrderId::from("O-001")).unwrap();
2894 assert_eq!(final_primary.quantity(), Quantity::from("0.5"));
2895 }
2896
2897 #[rstest]
2898 #[should_panic(expected = "exceeds primary leaves_qty")]
2899 fn test_spawn_quantity_exceeds_leaves_qty_panics() {
2900 let mut algo = create_test_algorithm();
2901 register_algorithm(&mut algo);
2902
2903 let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2904 let exec_algorithm_id = algo.id();
2905
2906 let mut primary = OrderAny::Market(MarketOrder::new(
2907 TraderId::from("TRADER-001"),
2908 StrategyId::from("STRAT-001"),
2909 instrument_id,
2910 ClientOrderId::from("O-001"),
2911 OrderSide::Buy,
2912 Quantity::from("1.0"),
2913 TimeInForce::Gtc,
2914 UUID4::new(),
2915 0.into(),
2916 false,
2917 false,
2918 None,
2919 None,
2920 None,
2921 None,
2922 Some(exec_algorithm_id),
2923 None,
2924 None,
2925 None,
2926 ));
2927
2928 {
2929 let cache_rc = algo.core.cache_rc();
2930 let mut cache = cache_rc.borrow_mut();
2931 cache.add_order(primary.clone(), None, None, false).unwrap();
2932 }
2933
2934 let _ = algo.spawn_market(
2935 &mut primary,
2936 Quantity::from("0.8"),
2937 TimeInForce::Fok,
2938 false,
2939 None,
2940 true,
2941 );
2942
2943 assert_eq!(primary.quantity(), Quantity::from("0.2"));
2944 assert_eq!(primary.leaves_qty(), Quantity::from("0.2"));
2945
2946 let _ = algo.spawn_market(
2948 &mut primary,
2949 Quantity::from("0.5"),
2950 TimeInForce::Fok,
2951 false,
2952 None,
2953 true,
2954 );
2955 }
2956}