1use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use ahash::AHashMap;
19use nautilus_common::{
20 cache::Cache,
21 clock::Clock,
22 logging::{CMD, EVT, SEND},
23 messages::execution::{SubmitOrder, TradingCommand},
24 msgbus,
25 msgbus::MessagingSwitchboard,
26};
27use nautilus_core::UUID4;
28use nautilus_model::{
29 enums::{ContingencyType, TriggerType},
30 events::{
31 OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
32 },
33 identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
34 orders::{Order, OrderAny},
35 types::Quantity,
36};
37
38use super::handlers::{
39 CancelOrderHandler, CancelOrderHandlerAny, ModifyOrderHandler, ModifyOrderHandlerAny,
40 SubmitOrderHandler, SubmitOrderHandlerAny,
41};
42
/// Coordinates submit, cancel and modify handling for orders and their linked
/// contingent (OTO / OCO / OUO) orders, and caches the `SubmitOrder` commands
/// created for them.
pub struct OrderManager {
    /// Clock used to timestamp newly created `SubmitOrder` commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache queried for order state and updated when submit commands are created.
    cache: Rc<RefCell<Cache>>,
    /// When `false`, `should_manage_order` returns `false` for every order.
    active_local: bool,
    /// Handler given submit commands for orders whose emulation trigger is not `NoTrigger`.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Handler invoked by `cancel_order` once its guards have passed, if set.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Handler invoked by `modify_order_quantity`, if set.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Cached submit commands keyed by client order ID.
    submit_order_commands: AHashMap<ClientOrderId, SubmitOrder>,
}
58
59impl Debug for OrderManager {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct(stringify!(OrderManager))
62 .field("pending_commands", &self.submit_order_commands.len())
63 .finish()
64 }
65}
66
67impl OrderManager {
68 pub fn new(
70 clock: Rc<RefCell<dyn Clock>>,
71 cache: Rc<RefCell<Cache>>,
72 active_local: bool,
73 submit_order_handler: Option<SubmitOrderHandlerAny>,
74 cancel_order_handler: Option<CancelOrderHandlerAny>,
75 modify_order_handler: Option<ModifyOrderHandlerAny>,
76 ) -> Self {
77 Self {
78 clock,
79 cache,
80 active_local,
81 submit_order_handler,
82 cancel_order_handler,
83 modify_order_handler,
84 submit_order_commands: AHashMap::new(),
85 }
86 }
87
88 pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
90 self.submit_order_handler = Some(handler);
91 }
92
93 pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
95 self.cancel_order_handler = Some(handler);
96 }
97
98 pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
100 self.modify_order_handler = Some(handler);
101 }
102
103 #[must_use]
104 pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
106 self.submit_order_commands.clone()
107 }
108
109 pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
111 self.submit_order_commands
112 .insert(command.client_order_id, command);
113 }
114
115 pub fn pop_submit_order_command(
117 &mut self,
118 client_order_id: ClientOrderId,
119 ) -> Option<SubmitOrder> {
120 self.submit_order_commands.remove(&client_order_id)
121 }
122
123 pub fn reset(&mut self) {
125 self.submit_order_commands.clear();
126 }
127
128 pub fn cancel_order(&mut self, order: &OrderAny) {
130 let client_order_id = order.client_order_id();
131 let cache = self.cache.borrow();
132
133 if cache.is_order_pending_cancel_local(&client_order_id) {
134 return;
135 }
136
137 if order.is_closed() || cache.is_order_closed(&client_order_id) {
138 log::warn!("Cannot cancel order: already closed");
139 return;
140 }
141
142 drop(cache);
143 self.submit_order_commands.remove(&client_order_id);
144
145 if let Some(handler) = &self.cancel_order_handler {
146 handler.handle_cancel_order(order);
147 }
148 }
149
150 pub fn modify_order_quantity(&mut self, order: &OrderAny, new_quantity: Quantity) {
152 if let Some(handler) = &self.modify_order_handler {
153 handler.handle_modify_order(order, new_quantity);
154 }
155 }
156
157 pub fn create_new_submit_order(
161 &mut self,
162 order: &OrderAny,
163 position_id: Option<PositionId>,
164 client_id: Option<ClientId>,
165 correlation_id: Option<UUID4>,
166 ) -> anyhow::Result<()> {
167 let order_exists = self.cache.borrow().order_exists(&order.client_order_id());
168
169 self.cache
170 .borrow_mut()
171 .add_order(order.clone(), position_id, client_id, true)?;
172
173 if !order_exists {
174 publish_order_initialized(order);
175 }
176
177 let submit = SubmitOrder::new(
178 order.trader_id(),
179 client_id,
180 order.strategy_id(),
181 order.instrument_id(),
182 order.client_order_id(),
183 order.init_event().clone(),
184 order.exec_algorithm_id(),
185 position_id,
186 None, UUID4::new(),
188 self.clock.borrow().timestamp_ns(),
189 correlation_id,
190 );
191
192 if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
193 self.cache_submit_order_command(submit.clone());
194
195 match order.exec_algorithm_id() {
196 Some(exec_algorithm_id) => {
197 self.send_algo_command(submit, exec_algorithm_id);
198 }
199 None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
200 }
201 } else if let Some(handler) = self.submit_order_handler.clone() {
202 self.cache_submit_order_command(submit.clone());
203 handler.handle_submit_order(submit);
204 }
205
206 Ok(())
207 }
208
209 #[must_use]
210 pub fn should_manage_order(&self, order: &OrderAny) -> bool {
212 self.active_local && order.is_active_local()
213 }
214
215 pub fn handle_event(&mut self, event: &OrderEventAny) {
221 match event {
222 OrderEventAny::Rejected(event) => self.handle_order_rejected(*event),
223 OrderEventAny::Canceled(event) => self.handle_order_canceled(*event),
224 OrderEventAny::Expired(event) => self.handle_order_expired(*event),
225 OrderEventAny::Updated(event) => self.handle_order_updated(*event),
226 OrderEventAny::Filled(event) => self.handle_order_filled(*event),
227 _ => {}
228 }
229 }
230
231 pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
233 let cloned_order = self
234 .cache
235 .borrow()
236 .order(&rejected.client_order_id)
237 .map(|o| o.clone());
238
239 if let Some(order) = cloned_order {
240 if order.contingency_type() != Some(ContingencyType::NoContingency) {
241 self.handle_contingencies(&order);
242 }
243 } else {
244 log::error!(
245 "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
246 rejected.client_order_id,
247 rejected
248 );
249 }
250 }
251
252 pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
253 let cloned_order = self
254 .cache
255 .borrow()
256 .order(&canceled.client_order_id)
257 .map(|o| o.clone());
258
259 if let Some(order) = cloned_order {
260 if order.contingency_type() != Some(ContingencyType::NoContingency) {
261 self.handle_contingencies(&order);
262 }
263 } else {
264 log::error!(
265 "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
266 canceled.client_order_id,
267 canceled
268 );
269 }
270 }
271
272 pub fn handle_order_expired(&mut self, expired: OrderExpired) {
273 let cloned_order = self
274 .cache
275 .borrow()
276 .order(&expired.client_order_id)
277 .map(|o| o.clone());
278 if let Some(order) = cloned_order {
279 if order.contingency_type() != Some(ContingencyType::NoContingency) {
280 self.handle_contingencies(&order);
281 }
282 } else {
283 log::error!(
284 "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
285 expired.client_order_id,
286 expired
287 );
288 }
289 }
290
291 pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
292 let cloned_order = self
293 .cache
294 .borrow()
295 .order(&updated.client_order_id)
296 .map(|o| o.clone());
297 if let Some(order) = cloned_order {
298 if order.contingency_type() != Some(ContingencyType::NoContingency) {
299 self.handle_contingencies_update(&order);
300 }
301 } else {
302 log::error!(
303 "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
304 updated.client_order_id,
305 updated
306 );
307 }
308 }
309
310 pub fn handle_order_filled(&mut self, filled: OrderFilled) {
311 let order = if let Some(order) = self
312 .cache
313 .borrow()
314 .order(&filled.client_order_id)
315 .map(|o| o.clone())
316 {
317 order
318 } else {
319 log::error!(
320 "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
321 filled.client_order_id,
322 filled
323 );
324 return;
325 };
326
327 match order.contingency_type() {
328 Some(ContingencyType::Oto) => {
329 let position_id = self
330 .cache
331 .borrow()
332 .position_id(&order.client_order_id())
333 .copied();
334 let client_id = self
335 .cache
336 .borrow()
337 .client_id(&order.client_order_id())
338 .copied();
339
340 let parent_filled_qty = match order.exec_spawn_id() {
341 Some(spawn_id) => {
342 if let Some(qty) = self
343 .cache
344 .borrow()
345 .exec_spawn_total_filled_qty(&spawn_id, true)
346 {
347 qty
348 } else {
349 log::error!("Failed to get spawn filled quantity for {spawn_id}");
350 return;
351 }
352 }
353 None => order.filled_qty(),
354 };
355
356 let linked_orders = if let Some(orders) = order.linked_order_ids() {
357 orders
358 } else {
359 log::error!("No linked orders found for OTO order");
360 return;
361 };
362
363 for client_order_id in linked_orders {
364 let mut child_order = if let Some(order) = self
365 .cache
366 .borrow()
367 .order(client_order_id)
368 .map(|o| o.clone())
369 {
370 order
371 } else {
372 log::error!(
373 "Cannot find OTO child order for client_order_id: {client_order_id}"
374 );
375 continue;
376 };
377
378 if !self.should_manage_order(&child_order) {
379 continue;
380 }
381
382 if child_order.position_id().is_none() {
383 child_order.set_position_id(position_id);
384 }
385
386 if parent_filled_qty != child_order.leaves_qty() {
387 self.modify_order_quantity(&child_order, parent_filled_qty);
388 }
389
390 if !self
395 .submit_order_commands
396 .contains_key(&child_order.client_order_id())
397 && let Err(e) =
398 self.create_new_submit_order(&child_order, position_id, client_id, None)
399 {
400 log::error!("Failed to create new submit order: {e}");
401 }
402 }
403 }
404 Some(ContingencyType::Oco) => {
405 let linked_orders = if let Some(orders) = order.linked_order_ids() {
406 orders
407 } else {
408 log::error!("No linked orders found for OCO order");
409 return;
410 };
411
412 for client_order_id in linked_orders {
413 let contingent_order = match self
414 .cache
415 .borrow()
416 .order(client_order_id)
417 .map(|o| o.clone())
418 {
419 Some(contingent_order) => contingent_order,
420 None => {
421 log::error!(
422 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
423 );
424 continue;
425 }
426 };
427
428 if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
430 {
431 continue;
432 }
433
434 if contingent_order.client_order_id() != order.client_order_id() {
435 self.cancel_order(&contingent_order);
436 }
437 }
438 }
439 Some(ContingencyType::Ouo) => self.handle_contingencies(&order),
440 _ => {}
441 }
442 }
443
444 pub fn handle_contingencies(&mut self, order: &OrderAny) {
445 let (filled_qty, leaves_qty, is_spawn_active) =
446 if let Some(exec_spawn_id) = order.exec_spawn_id() {
447 if let (Some(filled), Some(leaves)) = (
448 self.cache
449 .borrow()
450 .exec_spawn_total_filled_qty(&exec_spawn_id, true),
451 self.cache
452 .borrow()
453 .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
454 ) {
455 (filled, leaves, leaves.raw > 0)
456 } else {
457 log::error!("Failed to get spawn quantities for {exec_spawn_id}");
458 return;
459 }
460 } else {
461 (order.filled_qty(), order.leaves_qty(), false)
462 };
463
464 let linked_orders = if let Some(orders) = order.linked_order_ids() {
465 orders
466 } else {
467 log::error!("No linked orders found");
468 return;
469 };
470
471 for client_order_id in linked_orders {
472 let contingent_order = if let Some(order) = self
473 .cache
474 .borrow()
475 .order(client_order_id)
476 .map(|o| o.clone())
477 {
478 order
479 } else {
480 log::error!("Cannot find contingent order for client_order_id: {client_order_id}");
481 continue;
482 };
483
484 if !self.should_manage_order(&contingent_order)
485 || client_order_id == &order.client_order_id()
486 {
487 continue;
488 }
489
490 if contingent_order.is_closed() {
491 self.submit_order_commands.remove(&order.client_order_id());
492 continue;
493 }
494
495 match order.contingency_type() {
496 Some(ContingencyType::Oto) => {
497 if order.is_closed()
498 && filled_qty.raw == 0
499 && (order.exec_spawn_id().is_none() || !is_spawn_active)
500 {
501 self.cancel_order(&contingent_order);
502 } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
503 self.modify_order_quantity(&contingent_order, filled_qty);
504 }
505 }
506 Some(ContingencyType::Oco)
507 if order.is_closed()
508 && (order.exec_spawn_id().is_none() || !is_spawn_active) =>
509 {
510 self.cancel_order(&contingent_order);
511 }
512 Some(ContingencyType::Ouo) => {
513 if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
514 || (order.is_closed()
515 && (order.exec_spawn_id().is_none() || !is_spawn_active))
516 {
517 self.cancel_order(&contingent_order);
518 } else if leaves_qty != contingent_order.leaves_qty() {
519 self.modify_order_quantity(&contingent_order, leaves_qty);
520 }
521 }
522 _ => {}
523 }
524 }
525 }
526
527 pub fn handle_contingencies_update(&mut self, order: &OrderAny) {
528 let quantity = match order.exec_spawn_id() {
529 Some(exec_spawn_id) => {
530 if let Some(qty) = self
531 .cache
532 .borrow()
533 .exec_spawn_total_quantity(&exec_spawn_id, true)
534 {
535 qty
536 } else {
537 log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
538 return;
539 }
540 }
541 None => order.quantity(),
542 };
543
544 if quantity.raw == 0 {
545 return;
546 }
547
548 let linked_orders = if let Some(orders) = order.linked_order_ids() {
549 orders
550 } else {
551 log::error!("No linked orders found for contingent order");
552 return;
553 };
554
555 for client_order_id in linked_orders {
556 let contingent_order = match self
557 .cache
558 .borrow()
559 .order(client_order_id)
560 .map(|o| o.clone())
561 {
562 Some(contingent_order) => contingent_order,
563 None => {
564 log::error!(
565 "Cannot find OCO contingent order for client_order_id: {client_order_id}"
566 );
567 continue;
568 }
569 };
570
571 if !self.should_manage_order(&contingent_order)
572 || client_order_id == &order.client_order_id()
573 || contingent_order.is_closed()
574 {
575 continue;
576 }
577
578 if let Some(contingency_type) = order.contingency_type()
579 && matches!(
580 contingency_type,
581 ContingencyType::Oto | ContingencyType::Oco
582 )
583 && quantity != contingent_order.quantity()
584 {
585 self.modify_order_quantity(&contingent_order, quantity);
586 }
587 }
588 }
589
590 pub fn send_emulator_command(&self, command: TradingCommand) {
592 log_cmd_send(&command);
593 let endpoint = MessagingSwitchboard::order_emulator_execute();
594 msgbus::send_trading_command(endpoint, command);
595 }
596
597 pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
598 let id = command.strategy_id;
599 log::info!("{id} {CMD}{SEND} {command}");
600
601 let endpoint = format!("{exec_algorithm_id}.execute");
603 msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
604 }
605
606 pub fn send_risk_command(&self, command: TradingCommand) {
607 log_cmd_send(&command);
608
609 let endpoint = MessagingSwitchboard::risk_engine_queue_execute();
613 msgbus::send_trading_command(endpoint, command);
614 }
615
616 pub fn send_exec_command(&self, command: TradingCommand) {
617 log_cmd_send(&command);
618
619 let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
622 msgbus::send_trading_command(endpoint, command);
623 }
624
625 pub fn send_risk_event(&self, event: OrderEventAny) {
626 log_evt_send(&event);
627 let endpoint = MessagingSwitchboard::risk_engine_process();
628 msgbus::send_order_event(endpoint, event);
629 }
630
631 pub fn send_exec_event(&self, event: OrderEventAny) {
632 log_evt_send(&event);
633 let endpoint = MessagingSwitchboard::exec_engine_process();
634 msgbus::send_order_event(endpoint, event);
635 }
636}
637
638#[inline(always)]
639fn log_cmd_send(command: &TradingCommand) {
640 if let Some(id) = command.strategy_id() {
641 log::info!("{id} {CMD}{SEND} {command}");
642 } else {
643 log::info!("{CMD}{SEND} {command}");
644 }
645}
646
647#[inline(always)]
648fn log_evt_send(event: &OrderEventAny) {
649 let id = event.strategy_id();
650 log::info!("{id} {EVT}{SEND} {event}");
651}
652
653fn publish_order_initialized(order: &OrderAny) {
654 let event = OrderEventAny::Initialized(order.init_event().clone());
655 let topic = format!("events.order.{}", order.strategy_id());
656 msgbus::publish_order_event(topic.into(), &event);
657}
658
659#[cfg(test)]
660mod tests {
661 use std::{cell::RefCell, rc::Rc};
662
663 use nautilus_common::{cache::Cache, clock::TestClock, msgbus, msgbus::TypedHandler};
664 use nautilus_core::{UUID4, UnixNanos, WeakCell};
665 use nautilus_model::{
666 enums::{ContingencyType, OrderSide, OrderType, TriggerType},
667 events::{OrderAccepted, OrderSubmitted},
668 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
669 instruments::{Instrument, InstrumentAny, stubs::audusd_sim},
670 orders::{Order, OrderTestBuilder, stubs::TestOrderEventStubs},
671 types::{Price, Quantity},
672 };
673 use rstest::rstest;
674
675 use super::*;
676 use crate::{
677 order_emulator::emulator::OrderEmulator,
678 order_manager::handlers::{
679 CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny,
680 },
681 };
682
683 #[rstest]
686 fn test_handle_event_unhandled_events_are_noop() {
687 let submitted = OrderEventAny::Submitted(OrderSubmitted {
688 trader_id: TraderId::from("TRADER-001"),
689 strategy_id: StrategyId::from("STRATEGY-001"),
690 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
691 client_order_id: ClientOrderId::from("O-001"),
692 account_id: AccountId::from("ACCOUNT-001"),
693 event_id: UUID4::new(),
694 ts_event: UnixNanos::default(),
695 ts_init: UnixNanos::default(),
696 causation_id: None,
697 });
698 let accepted = OrderEventAny::Accepted(OrderAccepted {
699 trader_id: TraderId::from("TRADER-001"),
700 strategy_id: StrategyId::from("STRATEGY-001"),
701 instrument_id: InstrumentId::from("BTC-USDT.OKX"),
702 client_order_id: ClientOrderId::from("O-001"),
703 venue_order_id: VenueOrderId::from("V-001"),
704 account_id: AccountId::from("ACCOUNT-001"),
705 event_id: UUID4::new(),
706 ts_event: UnixNanos::default(),
707 ts_init: UnixNanos::default(),
708 reconciliation: false,
709 causation_id: None,
710 });
711
712 match submitted {
713 OrderEventAny::Rejected(_) => panic!("Should not match"),
714 OrderEventAny::Canceled(_) => panic!("Should not match"),
715 OrderEventAny::Expired(_) => panic!("Should not match"),
716 OrderEventAny::Updated(_) => panic!("Should not match"),
717 OrderEventAny::Filled(_) => panic!("Should not match"),
718 _ => {}
719 }
720
721 match accepted {
722 OrderEventAny::Rejected(_) => panic!("Should not match"),
723 OrderEventAny::Canceled(_) => panic!("Should not match"),
724 OrderEventAny::Expired(_) => panic!("Should not match"),
725 OrderEventAny::Updated(_) => panic!("Should not match"),
726 OrderEventAny::Filled(_) => panic!("Should not match"),
727 _ => {}
728 }
729 }
730
731 #[expect(clippy::type_complexity)]
732 fn create_test_components() -> (
733 Rc<RefCell<dyn Clock>>,
734 Rc<RefCell<Cache>>,
735 Rc<RefCell<OrderEmulator>>,
736 ) {
737 let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
738 let cache = Rc::new(RefCell::new(Cache::new(None, None)));
739 let emulator = Rc::new(RefCell::new(OrderEmulator::new(
740 clock.clone(),
741 cache.clone(),
742 )));
743 (clock, cache, emulator)
744 }
745
746 fn create_test_stop_order() -> OrderAny {
747 let instrument = audusd_sim();
748 OrderTestBuilder::new(OrderType::StopMarket)
749 .instrument_id(instrument.id())
750 .side(OrderSide::Buy)
751 .trigger_price(Price::from("1.00050"))
752 .quantity(Quantity::from(100_000))
753 .emulation_trigger(TriggerType::BidAsk)
754 .build()
755 }
756
757 fn make_submit_command(order: &OrderAny) -> SubmitOrder {
760 SubmitOrder::new(
761 order.trader_id(),
762 None,
763 order.strategy_id(),
764 order.instrument_id(),
765 order.client_order_id(),
766 order.init_event().clone(),
767 None,
768 None,
769 None,
770 UUID4::new(),
771 UnixNanos::default(),
772 None, )
774 }
775
776 fn subscribe_order_topic(
777 strategy_id: StrategyId,
778 ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
779 let events = Rc::new(RefCell::new(Vec::new()));
780 let handler = TypedHandler::from({
781 let events = events.clone();
782 move |event: &OrderEventAny| {
783 events.borrow_mut().push(event.clone());
784 }
785 });
786 msgbus::subscribe_order_events(
787 format!("events.order.{strategy_id}").into(),
788 handler.clone(),
789 None,
790 );
791 (handler, events)
792 }
793
794 #[rstest]
795 fn test_order_manager_with_handlers() {
796 let (clock, cache, emulator) = create_test_components();
797 let submit_handler =
798 SubmitOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
799 let cancel_handler =
800 CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
801 let modify_handler =
802 ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
803
804 let manager = OrderManager::new(
805 clock,
806 cache,
807 true,
808 Some(submit_handler),
809 Some(cancel_handler),
810 Some(modify_handler),
811 );
812
813 assert!(manager.submit_order_handler.is_some());
814 assert!(manager.cancel_order_handler.is_some());
815 assert!(manager.modify_order_handler.is_some());
816 }
817
818 #[rstest]
819 fn test_order_manager_cancel_order_dispatches_to_handler() {
820 let (clock, cache, emulator) = create_test_components();
821 let cancel_handler =
822 CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
823 let mut manager =
824 OrderManager::new(clock, cache.clone(), true, None, Some(cancel_handler), None);
825 let order = create_test_stop_order();
826 cache
827 .borrow_mut()
828 .add_order(order.clone(), None, None, false)
829 .unwrap();
830 manager
831 .submit_order_commands
832 .insert(order.client_order_id(), make_submit_command(&order));
833
834 manager.cancel_order(&order);
835
836 assert!(
837 !manager
838 .submit_order_commands
839 .contains_key(&order.client_order_id()),
840 "expected dispatch path to remove the submit command",
841 );
842 }
843
844 #[rstest]
845 fn test_order_manager_modify_order_dispatches_to_handler() {
846 let (clock, cache, emulator) = create_test_components();
847 let modify_handler =
848 ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
849 let mut manager = OrderManager::new(clock, cache, true, None, None, Some(modify_handler));
850 let order = create_test_stop_order();
851 let new_quantity = Quantity::from(50_000);
852
853 manager.modify_order_quantity(&order, new_quantity);
854 }
855
856 #[rstest]
857 fn test_create_new_submit_order_publishes_initialized_for_new_order() {
858 let (clock, cache, _emulator) = create_test_components();
859 let mut manager = OrderManager::new(clock, cache, true, None, None, None);
860 let order = create_test_stop_order();
861 let strategy_id = order.strategy_id();
862 let (handler, events) = subscribe_order_topic(strategy_id);
863
864 manager
865 .create_new_submit_order(&order, None, None, None)
866 .unwrap();
867
868 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
869 let events = events.borrow();
870
871 assert_eq!(events.len(), 1);
872 assert!(matches!(
873 &events[0],
874 OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
875 ));
876 }
877
878 #[rstest]
879 fn test_create_new_submit_order_does_not_republish_initialized_for_existing_order() {
880 let (clock, cache, _emulator) = create_test_components();
881 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
882 let order = create_test_stop_order();
883 let strategy_id = order.strategy_id();
884 cache
885 .borrow_mut()
886 .add_order(order.clone(), None, None, true)
887 .unwrap();
888 let (handler, events) = subscribe_order_topic(strategy_id);
889
890 manager
891 .create_new_submit_order(&order, None, None, None)
892 .unwrap();
893
894 msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
895 assert!(events.borrow().is_empty());
896 }
897
898 #[rstest]
899 fn test_order_manager_without_handlers() {
900 let (clock, cache, _emulator) = create_test_components();
901 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
902 let order = create_test_stop_order();
903 cache
904 .borrow_mut()
905 .add_order(order.clone(), None, None, false)
906 .unwrap();
907 manager
908 .submit_order_commands
909 .insert(order.client_order_id(), make_submit_command(&order));
910
911 manager.cancel_order(&order);
912 manager.modify_order_quantity(&order, Quantity::from(50_000));
913
914 assert!(
915 !manager
916 .submit_order_commands
917 .contains_key(&order.client_order_id()),
918 "no-handler dispatch path should still remove the submit command",
919 );
920 }
921
922 #[rstest]
923 fn test_handle_order_filled_skips_missing_oco_contingent_order() {
924 let (clock, cache, _emulator) = create_test_components();
925 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
926 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
927 let missing_client_order_id = ClientOrderId::from("O-MISSING");
928 let valid_client_order_id = ClientOrderId::from("O-CHILD");
929 let order = OrderTestBuilder::new(OrderType::Limit)
930 .instrument_id(instrument.id())
931 .client_order_id(ClientOrderId::from("O-PARENT"))
932 .side(OrderSide::Buy)
933 .price(Price::from("1.00000"))
934 .quantity(Quantity::from(100_000))
935 .contingency_type(ContingencyType::Oco)
936 .linked_order_ids(vec![missing_client_order_id, valid_client_order_id])
937 .build();
938 let child_order = OrderTestBuilder::new(OrderType::Limit)
939 .instrument_id(instrument.id())
940 .client_order_id(valid_client_order_id)
941 .side(OrderSide::Buy)
942 .price(Price::from("1.00000"))
943 .quantity(Quantity::from(100_000))
944 .build();
945 cache
946 .borrow_mut()
947 .add_order(order.clone(), None, None, false)
948 .unwrap();
949 cache
950 .borrow_mut()
951 .add_order(child_order.clone(), None, None, false)
952 .unwrap();
953 manager
954 .submit_order_commands
955 .insert(valid_client_order_id, make_submit_command(&child_order));
956 let filled = match TestOrderEventStubs::filled(
957 &order,
958 &instrument,
959 None,
960 None,
961 None,
962 None,
963 None,
964 None,
965 None,
966 Some(AccountId::from("SIM-001")),
967 ) {
968 OrderEventAny::Filled(event) => event,
969 event => panic!("expected OrderFilled, was {event:?}"),
970 };
971
972 manager.handle_order_filled(filled);
973
974 assert!(
975 !manager
976 .submit_order_commands
977 .contains_key(&valid_client_order_id)
978 );
979 }
980
981 #[rstest]
982 fn test_handle_order_filled_skips_missing_oto_child_order() {
983 let (clock, cache, _emulator) = create_test_components();
984 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
985 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
986 let missing_client_order_id = ClientOrderId::from("O-MISSING");
987 let valid_client_order_id = ClientOrderId::from("O-CHILD");
988 let order = OrderTestBuilder::new(OrderType::Limit)
989 .instrument_id(instrument.id())
990 .client_order_id(ClientOrderId::from("O-PARENT"))
991 .side(OrderSide::Buy)
992 .price(Price::from("1.00000"))
993 .quantity(Quantity::from(100_000))
994 .contingency_type(ContingencyType::Oto)
995 .linked_order_ids(vec![missing_client_order_id, valid_client_order_id])
996 .build();
997 let child_order = OrderTestBuilder::new(OrderType::Limit)
998 .instrument_id(instrument.id())
999 .client_order_id(valid_client_order_id)
1000 .side(OrderSide::Buy)
1001 .price(Price::from("1.00000"))
1002 .quantity(Quantity::from(100_000))
1003 .emulation_trigger(TriggerType::NoTrigger)
1004 .build();
1005 cache
1006 .borrow_mut()
1007 .add_order(order.clone(), None, None, false)
1008 .unwrap();
1009 cache
1010 .borrow_mut()
1011 .add_order(child_order, None, None, false)
1012 .unwrap();
1013 let filled = match TestOrderEventStubs::filled(
1014 &order,
1015 &instrument,
1016 None,
1017 None,
1018 None,
1019 None,
1020 None,
1021 None,
1022 None,
1023 Some(AccountId::from("SIM-001")),
1024 ) {
1025 OrderEventAny::Filled(event) => event,
1026 event => panic!("expected OrderFilled, was {event:?}"),
1027 };
1028
1029 manager.handle_order_filled(filled);
1030
1031 assert!(
1032 manager
1033 .submit_order_commands
1034 .contains_key(&valid_client_order_id)
1035 );
1036 }
1037
1038 #[rstest]
1039 fn test_handle_contingencies_skips_missing_linked_order() {
1040 let (clock, cache, _emulator) = create_test_components();
1041 let mut manager = OrderManager::new(clock, cache, true, None, None, None);
1042 let instrument = audusd_sim();
1043 let order = OrderTestBuilder::new(OrderType::Limit)
1044 .instrument_id(instrument.id())
1045 .client_order_id(ClientOrderId::from("O-PARENT"))
1046 .side(OrderSide::Buy)
1047 .price(Price::from("1.00000"))
1048 .quantity(Quantity::from(100_000))
1049 .contingency_type(ContingencyType::Oco)
1050 .linked_order_ids(vec![ClientOrderId::from("O-MISSING")])
1051 .build();
1052
1053 manager.handle_contingencies(&order);
1054
1055 assert!(manager.submit_order_commands.is_empty());
1056 }
1057
1058 #[rstest]
1059 fn test_handle_contingencies_update_skips_missing_linked_order() {
1060 let (clock, cache, _emulator) = create_test_components();
1061 let mut manager = OrderManager::new(clock, cache, true, None, None, None);
1062 let instrument = audusd_sim();
1063 let order = OrderTestBuilder::new(OrderType::Limit)
1064 .instrument_id(instrument.id())
1065 .client_order_id(ClientOrderId::from("O-PARENT"))
1066 .side(OrderSide::Buy)
1067 .price(Price::from("1.00000"))
1068 .quantity(Quantity::from(100_000))
1069 .contingency_type(ContingencyType::Oco)
1070 .linked_order_ids(vec![ClientOrderId::from("O-MISSING")])
1071 .build();
1072
1073 manager.handle_contingencies_update(&order);
1074
1075 assert!(manager.submit_order_commands.is_empty());
1076 }
1077
1078 #[rstest]
1079 fn test_cancel_order_skips_when_pending_cancel_local() {
1080 let (clock, cache, _emulator) = create_test_components();
1081 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1082 let order = create_test_stop_order();
1083 cache
1084 .borrow_mut()
1085 .add_order(order.clone(), None, None, false)
1086 .unwrap();
1087 cache.borrow_mut().update_order_pending_cancel_local(&order);
1088 manager
1089 .submit_order_commands
1090 .insert(order.client_order_id(), make_submit_command(&order));
1091
1092 manager.cancel_order(&order);
1093
1094 assert!(
1095 manager
1096 .submit_order_commands
1097 .contains_key(&order.client_order_id()),
1098 "pending-cancel-local gate should short-circuit before removing the submit command",
1099 );
1100 }
1101
1102 #[rstest]
1103 fn test_cancel_order_skips_when_passed_order_is_closed() {
1104 let (clock, cache, _emulator) = create_test_components();
1108 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1109
1110 let mut order = OrderTestBuilder::new(OrderType::StopMarket)
1111 .instrument_id(audusd_sim().id())
1112 .side(OrderSide::Buy)
1113 .trigger_price(Price::from("1.00050"))
1114 .quantity(Quantity::from(100_000))
1115 .emulation_trigger(TriggerType::BidAsk)
1116 .submit(true)
1117 .build();
1118
1119 cache
1120 .borrow_mut()
1121 .add_order(order.clone(), None, None, false)
1122 .unwrap();
1123
1124 let canceled_event =
1125 TestOrderEventStubs::canceled(&order, AccountId::from("ACCOUNT-001"), None);
1126 order.apply(canceled_event).unwrap();
1127
1128 assert!(order.is_closed());
1129 assert!(!cache.borrow().is_order_closed(&order.client_order_id()));
1130
1131 manager
1132 .submit_order_commands
1133 .insert(order.client_order_id(), make_submit_command(&order));
1134
1135 manager.cancel_order(&order);
1136
1137 assert!(
1138 manager
1139 .submit_order_commands
1140 .contains_key(&order.client_order_id()),
1141 "closed-order gate should short-circuit on the local state when the cache index is stale",
1142 );
1143 }
1144
1145 #[rstest]
1146 fn test_cancel_order_skips_when_cache_index_marks_closed() {
1147 let (clock, cache, _emulator) = create_test_components();
1151 let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1152
1153 let mut order = OrderTestBuilder::new(OrderType::StopMarket)
1154 .instrument_id(audusd_sim().id())
1155 .side(OrderSide::Buy)
1156 .trigger_price(Price::from("1.00050"))
1157 .quantity(Quantity::from(100_000))
1158 .emulation_trigger(TriggerType::BidAsk)
1159 .submit(true)
1160 .build();
1161
1162 cache
1163 .borrow_mut()
1164 .add_order(order.clone(), None, None, false)
1165 .unwrap();
1166
1167 let stale_order = order.clone();
1168
1169 let canceled_event =
1170 TestOrderEventStubs::canceled(&order, AccountId::from("ACCOUNT-001"), None);
1171 order = cache.borrow_mut().update_order(&canceled_event).unwrap();
1172
1173 assert!(cache.borrow().is_order_closed(&order.client_order_id()));
1174
1175 manager.submit_order_commands.insert(
1176 stale_order.client_order_id(),
1177 make_submit_command(&stale_order),
1178 );
1179
1180 manager.cancel_order(&stale_order);
1181
1182 assert!(
1183 manager
1184 .submit_order_commands
1185 .contains_key(&stale_order.client_order_id()),
1186 "closed-order gate should short-circuit even when the passed reference is stale",
1187 );
1188 }
1189}