Skip to main content

nautilus_execution/order_manager/
manager.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{cell::RefCell, fmt::Debug, rc::Rc};
17
18use ahash::AHashMap;
19use nautilus_common::{
20    cache::Cache,
21    clock::Clock,
22    logging::{CMD, EVT, SEND},
23    messages::execution::{SubmitOrder, TradingCommand},
24    msgbus,
25    msgbus::MessagingSwitchboard,
26};
27use nautilus_core::UUID4;
28use nautilus_model::{
29    enums::{ContingencyType, TriggerType},
30    events::{
31        OrderCanceled, OrderEventAny, OrderExpired, OrderFilled, OrderRejected, OrderUpdated,
32    },
33    identifiers::{ClientId, ClientOrderId, ExecAlgorithmId, PositionId},
34    orders::{Order, OrderAny},
35    types::Quantity,
36};
37
38use super::handlers::{
39    CancelOrderHandler, CancelOrderHandlerAny, ModifyOrderHandler, ModifyOrderHandlerAny,
40    SubmitOrderHandler, SubmitOrderHandlerAny,
41};
42
/// Manages the lifecycle and state of orders with contingency handling.
///
/// The order manager is responsible for managing local order state, handling
/// contingent orders (OTO, OCO, OUO), and coordinating with emulation and
/// execution systems. It tracks order commands and manages complex order
/// relationships for advanced order types.
pub struct OrderManager {
    /// Clock used to timestamp newly created submit order commands.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache used to look up and store orders.
    cache: Rc<RefCell<Cache>>,
    /// Manager-level flag; an order is only managed when this is `true` and the
    /// order itself reports `is_active_local()`.
    active_local: bool,
    /// Optional handler that receives submit order commands not routed to the
    /// risk engine or an execution algorithm.
    submit_order_handler: Option<SubmitOrderHandlerAny>,
    /// Optional handler that receives order cancellations.
    cancel_order_handler: Option<CancelOrderHandlerAny>,
    /// Optional handler that receives order quantity modifications.
    modify_order_handler: Option<ModifyOrderHandlerAny>,
    /// Submit order commands cached by client order ID.
    submit_order_commands: AHashMap<ClientOrderId, SubmitOrder>,
}
58
59impl Debug for OrderManager {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct(stringify!(OrderManager))
62            .field("pending_commands", &self.submit_order_commands.len())
63            .finish()
64    }
65}
66
67impl OrderManager {
68    /// Creates a new [`OrderManager`] instance.
69    pub fn new(
70        clock: Rc<RefCell<dyn Clock>>,
71        cache: Rc<RefCell<Cache>>,
72        active_local: bool,
73        submit_order_handler: Option<SubmitOrderHandlerAny>,
74        cancel_order_handler: Option<CancelOrderHandlerAny>,
75        modify_order_handler: Option<ModifyOrderHandlerAny>,
76    ) -> Self {
77        Self {
78            clock,
79            cache,
80            active_local,
81            submit_order_handler,
82            cancel_order_handler,
83            modify_order_handler,
84            submit_order_commands: AHashMap::new(),
85        }
86    }
87
88    /// Sets the handler for submit order commands to the emulator.
89    pub fn set_submit_order_handler(&mut self, handler: SubmitOrderHandlerAny) {
90        self.submit_order_handler = Some(handler);
91    }
92
93    /// Sets the handler for cancel order commands to the emulator.
94    pub fn set_cancel_order_handler(&mut self, handler: CancelOrderHandlerAny) {
95        self.cancel_order_handler = Some(handler);
96    }
97
98    /// Sets the handler for modify order commands to the emulator.
99    pub fn set_modify_order_handler(&mut self, handler: ModifyOrderHandlerAny) {
100        self.modify_order_handler = Some(handler);
101    }
102
103    #[must_use]
104    /// Returns a copy of all cached submit order commands.
105    pub fn get_submit_order_commands(&self) -> AHashMap<ClientOrderId, SubmitOrder> {
106        self.submit_order_commands.clone()
107    }
108
109    /// Caches a submit order command for later processing.
110    pub fn cache_submit_order_command(&mut self, command: SubmitOrder) {
111        self.submit_order_commands
112            .insert(command.client_order_id, command);
113    }
114
115    /// Removes and returns a cached submit order command.
116    pub fn pop_submit_order_command(
117        &mut self,
118        client_order_id: ClientOrderId,
119    ) -> Option<SubmitOrder> {
120        self.submit_order_commands.remove(&client_order_id)
121    }
122
123    /// Resets the order manager by clearing all cached commands.
124    pub fn reset(&mut self) {
125        self.submit_order_commands.clear();
126    }
127
128    /// Cancels an order if it's not already pending cancellation or closed.
129    pub fn cancel_order(&mut self, order: &OrderAny) {
130        let client_order_id = order.client_order_id();
131        let cache = self.cache.borrow();
132
133        if cache.is_order_pending_cancel_local(&client_order_id) {
134            return;
135        }
136
137        if order.is_closed() || cache.is_order_closed(&client_order_id) {
138            log::warn!("Cannot cancel order: already closed");
139            return;
140        }
141
142        drop(cache);
143        self.submit_order_commands.remove(&client_order_id);
144
145        if let Some(handler) = &self.cancel_order_handler {
146            handler.handle_cancel_order(order);
147        }
148    }
149
150    /// Modifies the quantity of an existing order.
151    pub fn modify_order_quantity(&mut self, order: &OrderAny, new_quantity: Quantity) {
152        if let Some(handler) = &self.modify_order_handler {
153            handler.handle_modify_order(order, new_quantity);
154        }
155    }
156
157    /// # Errors
158    ///
159    /// Returns an error if creating a new submit order fails.
160    pub fn create_new_submit_order(
161        &mut self,
162        order: &OrderAny,
163        position_id: Option<PositionId>,
164        client_id: Option<ClientId>,
165        correlation_id: Option<UUID4>,
166    ) -> anyhow::Result<()> {
167        let order_exists = self.cache.borrow().order_exists(&order.client_order_id());
168
169        self.cache
170            .borrow_mut()
171            .add_order(order.clone(), position_id, client_id, true)?;
172
173        if !order_exists {
174            publish_order_initialized(order);
175        }
176
177        let submit = SubmitOrder::new(
178            order.trader_id(),
179            client_id,
180            order.strategy_id(),
181            order.instrument_id(),
182            order.client_order_id(),
183            order.init_event().clone(),
184            order.exec_algorithm_id(),
185            position_id,
186            None, // params
187            UUID4::new(),
188            self.clock.borrow().timestamp_ns(),
189            correlation_id,
190        );
191
192        if order.emulation_trigger() == Some(TriggerType::NoTrigger) {
193            self.cache_submit_order_command(submit.clone());
194
195            match order.exec_algorithm_id() {
196                Some(exec_algorithm_id) => {
197                    self.send_algo_command(submit, exec_algorithm_id);
198                }
199                None => self.send_risk_command(TradingCommand::SubmitOrder(submit)),
200            }
201        } else if let Some(handler) = self.submit_order_handler.clone() {
202            self.cache_submit_order_command(submit.clone());
203            handler.handle_submit_order(submit);
204        }
205
206        Ok(())
207    }
208
209    #[must_use]
210    /// Returns true if the order manager should manage the given order.
211    pub fn should_manage_order(&self, order: &OrderAny) -> bool {
212        self.active_local && order.is_active_local()
213    }
214
215    // Event Handlers
216    /// Handles an order event by routing it to the appropriate handler method.
217    ///
218    /// Note: Only handles specific terminal/actionable events. Other events
219    /// like `OrderSubmitted`, `OrderAccepted`, etc. are no-ops for the order manager.
220    pub fn handle_event(&mut self, event: &OrderEventAny) {
221        match event {
222            OrderEventAny::Rejected(event) => self.handle_order_rejected(*event),
223            OrderEventAny::Canceled(event) => self.handle_order_canceled(*event),
224            OrderEventAny::Expired(event) => self.handle_order_expired(*event),
225            OrderEventAny::Updated(event) => self.handle_order_updated(*event),
226            OrderEventAny::Filled(event) => self.handle_order_filled(*event),
227            _ => {}
228        }
229    }
230
231    /// Handles an order rejected event and manages any contingent orders.
232    pub fn handle_order_rejected(&mut self, rejected: OrderRejected) {
233        let cloned_order = self
234            .cache
235            .borrow()
236            .order(&rejected.client_order_id)
237            .map(|o| o.clone());
238
239        if let Some(order) = cloned_order {
240            if order.contingency_type() != Some(ContingencyType::NoContingency) {
241                self.handle_contingencies(&order);
242            }
243        } else {
244            log::error!(
245                "Cannot handle `OrderRejected`: order for client_order_id: {} not found, {}",
246                rejected.client_order_id,
247                rejected
248            );
249        }
250    }
251
252    pub fn handle_order_canceled(&mut self, canceled: OrderCanceled) {
253        let cloned_order = self
254            .cache
255            .borrow()
256            .order(&canceled.client_order_id)
257            .map(|o| o.clone());
258
259        if let Some(order) = cloned_order {
260            if order.contingency_type() != Some(ContingencyType::NoContingency) {
261                self.handle_contingencies(&order);
262            }
263        } else {
264            log::error!(
265                "Cannot handle `OrderCanceled`: order for client_order_id: {} not found, {}",
266                canceled.client_order_id,
267                canceled
268            );
269        }
270    }
271
272    pub fn handle_order_expired(&mut self, expired: OrderExpired) {
273        let cloned_order = self
274            .cache
275            .borrow()
276            .order(&expired.client_order_id)
277            .map(|o| o.clone());
278        if let Some(order) = cloned_order {
279            if order.contingency_type() != Some(ContingencyType::NoContingency) {
280                self.handle_contingencies(&order);
281            }
282        } else {
283            log::error!(
284                "Cannot handle `OrderExpired`: order for client_order_id: {} not found, {}",
285                expired.client_order_id,
286                expired
287            );
288        }
289    }
290
291    pub fn handle_order_updated(&mut self, updated: OrderUpdated) {
292        let cloned_order = self
293            .cache
294            .borrow()
295            .order(&updated.client_order_id)
296            .map(|o| o.clone());
297        if let Some(order) = cloned_order {
298            if order.contingency_type() != Some(ContingencyType::NoContingency) {
299                self.handle_contingencies_update(&order);
300            }
301        } else {
302            log::error!(
303                "Cannot handle `OrderUpdated`: order for client_order_id: {} not found, {}",
304                updated.client_order_id,
305                updated
306            );
307        }
308    }
309
310    pub fn handle_order_filled(&mut self, filled: OrderFilled) {
311        let order = if let Some(order) = self
312            .cache
313            .borrow()
314            .order(&filled.client_order_id)
315            .map(|o| o.clone())
316        {
317            order
318        } else {
319            log::error!(
320                "Cannot handle `OrderFilled`: order for client_order_id: {} not found, {}",
321                filled.client_order_id,
322                filled
323            );
324            return;
325        };
326
327        match order.contingency_type() {
328            Some(ContingencyType::Oto) => {
329                let position_id = self
330                    .cache
331                    .borrow()
332                    .position_id(&order.client_order_id())
333                    .copied();
334                let client_id = self
335                    .cache
336                    .borrow()
337                    .client_id(&order.client_order_id())
338                    .copied();
339
340                let parent_filled_qty = match order.exec_spawn_id() {
341                    Some(spawn_id) => {
342                        if let Some(qty) = self
343                            .cache
344                            .borrow()
345                            .exec_spawn_total_filled_qty(&spawn_id, true)
346                        {
347                            qty
348                        } else {
349                            log::error!("Failed to get spawn filled quantity for {spawn_id}");
350                            return;
351                        }
352                    }
353                    None => order.filled_qty(),
354                };
355
356                let linked_orders = if let Some(orders) = order.linked_order_ids() {
357                    orders
358                } else {
359                    log::error!("No linked orders found for OTO order");
360                    return;
361                };
362
363                for client_order_id in linked_orders {
364                    let mut child_order = if let Some(order) = self
365                        .cache
366                        .borrow()
367                        .order(client_order_id)
368                        .map(|o| o.clone())
369                    {
370                        order
371                    } else {
372                        log::error!(
373                            "Cannot find OTO child order for client_order_id: {client_order_id}"
374                        );
375                        continue;
376                    };
377
378                    if !self.should_manage_order(&child_order) {
379                        continue;
380                    }
381
382                    if child_order.position_id().is_none() {
383                        child_order.set_position_id(position_id);
384                    }
385
386                    if parent_filled_qty != child_order.leaves_qty() {
387                        self.modify_order_quantity(&child_order, parent_filled_qty);
388                    }
389
390                    // if self.submit_order_handler.is_none() {
391                    //     return;
392                    // }
393
394                    if !self
395                        .submit_order_commands
396                        .contains_key(&child_order.client_order_id())
397                        && let Err(e) =
398                            self.create_new_submit_order(&child_order, position_id, client_id, None)
399                    {
400                        log::error!("Failed to create new submit order: {e}");
401                    }
402                }
403            }
404            Some(ContingencyType::Oco) => {
405                let linked_orders = if let Some(orders) = order.linked_order_ids() {
406                    orders
407                } else {
408                    log::error!("No linked orders found for OCO order");
409                    return;
410                };
411
412                for client_order_id in linked_orders {
413                    let contingent_order = match self
414                        .cache
415                        .borrow()
416                        .order(client_order_id)
417                        .map(|o| o.clone())
418                    {
419                        Some(contingent_order) => contingent_order,
420                        None => {
421                            log::error!(
422                                "Cannot find OCO contingent order for client_order_id: {client_order_id}"
423                            );
424                            continue;
425                        }
426                    };
427
428                    // Not being managed || Already completed
429                    if !self.should_manage_order(&contingent_order) || contingent_order.is_closed()
430                    {
431                        continue;
432                    }
433
434                    if contingent_order.client_order_id() != order.client_order_id() {
435                        self.cancel_order(&contingent_order);
436                    }
437                }
438            }
439            Some(ContingencyType::Ouo) => self.handle_contingencies(&order),
440            _ => {}
441        }
442    }
443
444    pub fn handle_contingencies(&mut self, order: &OrderAny) {
445        let (filled_qty, leaves_qty, is_spawn_active) =
446            if let Some(exec_spawn_id) = order.exec_spawn_id() {
447                if let (Some(filled), Some(leaves)) = (
448                    self.cache
449                        .borrow()
450                        .exec_spawn_total_filled_qty(&exec_spawn_id, true),
451                    self.cache
452                        .borrow()
453                        .exec_spawn_total_leaves_qty(&exec_spawn_id, true),
454                ) {
455                    (filled, leaves, leaves.raw > 0)
456                } else {
457                    log::error!("Failed to get spawn quantities for {exec_spawn_id}");
458                    return;
459                }
460            } else {
461                (order.filled_qty(), order.leaves_qty(), false)
462            };
463
464        let linked_orders = if let Some(orders) = order.linked_order_ids() {
465            orders
466        } else {
467            log::error!("No linked orders found");
468            return;
469        };
470
471        for client_order_id in linked_orders {
472            let contingent_order = if let Some(order) = self
473                .cache
474                .borrow()
475                .order(client_order_id)
476                .map(|o| o.clone())
477            {
478                order
479            } else {
480                log::error!("Cannot find contingent order for client_order_id: {client_order_id}");
481                continue;
482            };
483
484            if !self.should_manage_order(&contingent_order)
485                || client_order_id == &order.client_order_id()
486            {
487                continue;
488            }
489
490            if contingent_order.is_closed() {
491                self.submit_order_commands.remove(&order.client_order_id());
492                continue;
493            }
494
495            match order.contingency_type() {
496                Some(ContingencyType::Oto) => {
497                    if order.is_closed()
498                        && filled_qty.raw == 0
499                        && (order.exec_spawn_id().is_none() || !is_spawn_active)
500                    {
501                        self.cancel_order(&contingent_order);
502                    } else if filled_qty.raw > 0 && filled_qty != contingent_order.quantity() {
503                        self.modify_order_quantity(&contingent_order, filled_qty);
504                    }
505                }
506                Some(ContingencyType::Oco)
507                    if order.is_closed()
508                        && (order.exec_spawn_id().is_none() || !is_spawn_active) =>
509                {
510                    self.cancel_order(&contingent_order);
511                }
512                Some(ContingencyType::Ouo) => {
513                    if (leaves_qty.raw == 0 && order.exec_spawn_id().is_some())
514                        || (order.is_closed()
515                            && (order.exec_spawn_id().is_none() || !is_spawn_active))
516                    {
517                        self.cancel_order(&contingent_order);
518                    } else if leaves_qty != contingent_order.leaves_qty() {
519                        self.modify_order_quantity(&contingent_order, leaves_qty);
520                    }
521                }
522                _ => {}
523            }
524        }
525    }
526
527    pub fn handle_contingencies_update(&mut self, order: &OrderAny) {
528        let quantity = match order.exec_spawn_id() {
529            Some(exec_spawn_id) => {
530                if let Some(qty) = self
531                    .cache
532                    .borrow()
533                    .exec_spawn_total_quantity(&exec_spawn_id, true)
534                {
535                    qty
536                } else {
537                    log::error!("Failed to get spawn total quantity for {exec_spawn_id}");
538                    return;
539                }
540            }
541            None => order.quantity(),
542        };
543
544        if quantity.raw == 0 {
545            return;
546        }
547
548        let linked_orders = if let Some(orders) = order.linked_order_ids() {
549            orders
550        } else {
551            log::error!("No linked orders found for contingent order");
552            return;
553        };
554
555        for client_order_id in linked_orders {
556            let contingent_order = match self
557                .cache
558                .borrow()
559                .order(client_order_id)
560                .map(|o| o.clone())
561            {
562                Some(contingent_order) => contingent_order,
563                None => {
564                    log::error!(
565                        "Cannot find OCO contingent order for client_order_id: {client_order_id}"
566                    );
567                    continue;
568                }
569            };
570
571            if !self.should_manage_order(&contingent_order)
572                || client_order_id == &order.client_order_id()
573                || contingent_order.is_closed()
574            {
575                continue;
576            }
577
578            if let Some(contingency_type) = order.contingency_type()
579                && matches!(
580                    contingency_type,
581                    ContingencyType::Oto | ContingencyType::Oco
582                )
583                && quantity != contingent_order.quantity()
584            {
585                self.modify_order_quantity(&contingent_order, quantity);
586            }
587        }
588    }
589
590    // Message sending methods
591    pub fn send_emulator_command(&self, command: TradingCommand) {
592        log_cmd_send(&command);
593        let endpoint = MessagingSwitchboard::order_emulator_execute();
594        msgbus::send_trading_command(endpoint, command);
595    }
596
597    pub fn send_algo_command(&self, command: SubmitOrder, exec_algorithm_id: ExecAlgorithmId) {
598        let id = command.strategy_id;
599        log::info!("{id} {CMD}{SEND} {command}");
600
601        // Dynamic algorithm endpoint - uses Any-based dispatch
602        let endpoint = format!("{exec_algorithm_id}.execute");
603        msgbus::send_any(endpoint.into(), &TradingCommand::SubmitOrder(command));
604    }
605
606    pub fn send_risk_command(&self, command: TradingCommand) {
607        log_cmd_send(&command);
608
609        // Use queued endpoint for re-entrancy safety, commands may be sent from
610        // within event handlers which hold a mutable borrow on the strategy.
611        // This mirrors the pattern used by `send_exec_command()`.
612        let endpoint = MessagingSwitchboard::risk_engine_queue_execute();
613        msgbus::send_trading_command(endpoint, command);
614    }
615
616    pub fn send_exec_command(&self, command: TradingCommand) {
617        log_cmd_send(&command);
618
619        // Use queued endpoint for re-entrancy safety, commands may be sent from
620        // within event handlers which hold a mutable borrow on ExecutionEngine.
621        let endpoint = MessagingSwitchboard::exec_engine_queue_execute();
622        msgbus::send_trading_command(endpoint, command);
623    }
624
625    pub fn send_risk_event(&self, event: OrderEventAny) {
626        log_evt_send(&event);
627        let endpoint = MessagingSwitchboard::risk_engine_process();
628        msgbus::send_order_event(endpoint, event);
629    }
630
631    pub fn send_exec_event(&self, event: OrderEventAny) {
632        log_evt_send(&event);
633        let endpoint = MessagingSwitchboard::exec_engine_process();
634        msgbus::send_order_event(endpoint, event);
635    }
636}
637
638#[inline(always)]
639fn log_cmd_send(command: &TradingCommand) {
640    if let Some(id) = command.strategy_id() {
641        log::info!("{id} {CMD}{SEND} {command}");
642    } else {
643        log::info!("{CMD}{SEND} {command}");
644    }
645}
646
647#[inline(always)]
648fn log_evt_send(event: &OrderEventAny) {
649    let id = event.strategy_id();
650    log::info!("{id} {EVT}{SEND} {event}");
651}
652
653fn publish_order_initialized(order: &OrderAny) {
654    let event = OrderEventAny::Initialized(order.init_event().clone());
655    let topic = format!("events.order.{}", order.strategy_id());
656    msgbus::publish_order_event(topic.into(), &event);
657}
658
659#[cfg(test)]
660mod tests {
661    use std::{cell::RefCell, rc::Rc};
662
663    use nautilus_common::{cache::Cache, clock::TestClock, msgbus, msgbus::TypedHandler};
664    use nautilus_core::{UUID4, UnixNanos, WeakCell};
665    use nautilus_model::{
666        enums::{ContingencyType, OrderSide, OrderType, TriggerType},
667        events::{OrderAccepted, OrderSubmitted},
668        identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
669        instruments::{Instrument, InstrumentAny, stubs::audusd_sim},
670        orders::{Order, OrderTestBuilder, stubs::TestOrderEventStubs},
671        types::{Price, Quantity},
672    };
673    use rstest::rstest;
674
675    use super::*;
676    use crate::{
677        order_emulator::emulator::OrderEmulator,
678        order_manager::handlers::{
679            CancelOrderHandlerAny, ModifyOrderHandlerAny, SubmitOrderHandlerAny,
680        },
681    };
682
683    /// Verifies unhandled order events are no-ops and don't panic.
684    /// Previously, unhandled events would hit a todo!() panic.
685    #[rstest]
686    fn test_handle_event_unhandled_events_are_noop() {
687        let submitted = OrderEventAny::Submitted(OrderSubmitted {
688            trader_id: TraderId::from("TRADER-001"),
689            strategy_id: StrategyId::from("STRATEGY-001"),
690            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
691            client_order_id: ClientOrderId::from("O-001"),
692            account_id: AccountId::from("ACCOUNT-001"),
693            event_id: UUID4::new(),
694            ts_event: UnixNanos::default(),
695            ts_init: UnixNanos::default(),
696            causation_id: None,
697        });
698        let accepted = OrderEventAny::Accepted(OrderAccepted {
699            trader_id: TraderId::from("TRADER-001"),
700            strategy_id: StrategyId::from("STRATEGY-001"),
701            instrument_id: InstrumentId::from("BTC-USDT.OKX"),
702            client_order_id: ClientOrderId::from("O-001"),
703            venue_order_id: VenueOrderId::from("V-001"),
704            account_id: AccountId::from("ACCOUNT-001"),
705            event_id: UUID4::new(),
706            ts_event: UnixNanos::default(),
707            ts_init: UnixNanos::default(),
708            reconciliation: false,
709            causation_id: None,
710        });
711
712        match submitted {
713            OrderEventAny::Rejected(_) => panic!("Should not match"),
714            OrderEventAny::Canceled(_) => panic!("Should not match"),
715            OrderEventAny::Expired(_) => panic!("Should not match"),
716            OrderEventAny::Updated(_) => panic!("Should not match"),
717            OrderEventAny::Filled(_) => panic!("Should not match"),
718            _ => {}
719        }
720
721        match accepted {
722            OrderEventAny::Rejected(_) => panic!("Should not match"),
723            OrderEventAny::Canceled(_) => panic!("Should not match"),
724            OrderEventAny::Expired(_) => panic!("Should not match"),
725            OrderEventAny::Updated(_) => panic!("Should not match"),
726            OrderEventAny::Filled(_) => panic!("Should not match"),
727            _ => {}
728        }
729    }
730
731    #[expect(clippy::type_complexity)]
732    fn create_test_components() -> (
733        Rc<RefCell<dyn Clock>>,
734        Rc<RefCell<Cache>>,
735        Rc<RefCell<OrderEmulator>>,
736    ) {
737        let clock: Rc<RefCell<dyn Clock>> = Rc::new(RefCell::new(TestClock::new()));
738        let cache = Rc::new(RefCell::new(Cache::new(None, None)));
739        let emulator = Rc::new(RefCell::new(OrderEmulator::new(
740            clock.clone(),
741            cache.clone(),
742        )));
743        (clock, cache, emulator)
744    }
745
746    fn create_test_stop_order() -> OrderAny {
747        let instrument = audusd_sim();
748        OrderTestBuilder::new(OrderType::StopMarket)
749            .instrument_id(instrument.id())
750            .side(OrderSide::Buy)
751            .trigger_price(Price::from("1.00050"))
752            .quantity(Quantity::from(100_000))
753            .emulation_trigger(TriggerType::BidAsk)
754            .build()
755    }
756
757    // Creates a `SubmitOrder` command suitable for seeding `submit_order_commands`
758    // so that whether `cancel_order` removed the entry can be observed.
759    fn make_submit_command(order: &OrderAny) -> SubmitOrder {
760        SubmitOrder::new(
761            order.trader_id(),
762            None,
763            order.strategy_id(),
764            order.instrument_id(),
765            order.client_order_id(),
766            order.init_event().clone(),
767            None,
768            None,
769            None,
770            UUID4::new(),
771            UnixNanos::default(),
772            None, // correlation_id
773        )
774    }
775
776    fn subscribe_order_topic(
777        strategy_id: StrategyId,
778    ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
779        let events = Rc::new(RefCell::new(Vec::new()));
780        let handler = TypedHandler::from({
781            let events = events.clone();
782            move |event: &OrderEventAny| {
783                events.borrow_mut().push(event.clone());
784            }
785        });
786        msgbus::subscribe_order_events(
787            format!("events.order.{strategy_id}").into(),
788            handler.clone(),
789            None,
790        );
791        (handler, events)
792    }
793
794    #[rstest]
795    fn test_order_manager_with_handlers() {
796        let (clock, cache, emulator) = create_test_components();
797        let submit_handler =
798            SubmitOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
799        let cancel_handler =
800            CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
801        let modify_handler =
802            ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
803
804        let manager = OrderManager::new(
805            clock,
806            cache,
807            true,
808            Some(submit_handler),
809            Some(cancel_handler),
810            Some(modify_handler),
811        );
812
813        assert!(manager.submit_order_handler.is_some());
814        assert!(manager.cancel_order_handler.is_some());
815        assert!(manager.modify_order_handler.is_some());
816    }
817
818    #[rstest]
819    fn test_order_manager_cancel_order_dispatches_to_handler() {
820        let (clock, cache, emulator) = create_test_components();
821        let cancel_handler =
822            CancelOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
823        let mut manager =
824            OrderManager::new(clock, cache.clone(), true, None, Some(cancel_handler), None);
825        let order = create_test_stop_order();
826        cache
827            .borrow_mut()
828            .add_order(order.clone(), None, None, false)
829            .unwrap();
830        manager
831            .submit_order_commands
832            .insert(order.client_order_id(), make_submit_command(&order));
833
834        manager.cancel_order(&order);
835
836        assert!(
837            !manager
838                .submit_order_commands
839                .contains_key(&order.client_order_id()),
840            "expected dispatch path to remove the submit command",
841        );
842    }
843
844    #[rstest]
845    fn test_order_manager_modify_order_dispatches_to_handler() {
846        let (clock, cache, emulator) = create_test_components();
847        let modify_handler =
848            ModifyOrderHandlerAny::OrderEmulator(WeakCell::from(Rc::downgrade(&emulator)));
849        let mut manager = OrderManager::new(clock, cache, true, None, None, Some(modify_handler));
850        let order = create_test_stop_order();
851        let new_quantity = Quantity::from(50_000);
852
853        manager.modify_order_quantity(&order, new_quantity);
854    }
855
856    #[rstest]
857    fn test_create_new_submit_order_publishes_initialized_for_new_order() {
858        let (clock, cache, _emulator) = create_test_components();
859        let mut manager = OrderManager::new(clock, cache, true, None, None, None);
860        let order = create_test_stop_order();
861        let strategy_id = order.strategy_id();
862        let (handler, events) = subscribe_order_topic(strategy_id);
863
864        manager
865            .create_new_submit_order(&order, None, None, None)
866            .unwrap();
867
868        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
869        let events = events.borrow();
870
871        assert_eq!(events.len(), 1);
872        assert!(matches!(
873            &events[0],
874            OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
875        ));
876    }
877
878    #[rstest]
879    fn test_create_new_submit_order_does_not_republish_initialized_for_existing_order() {
880        let (clock, cache, _emulator) = create_test_components();
881        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
882        let order = create_test_stop_order();
883        let strategy_id = order.strategy_id();
884        cache
885            .borrow_mut()
886            .add_order(order.clone(), None, None, true)
887            .unwrap();
888        let (handler, events) = subscribe_order_topic(strategy_id);
889
890        manager
891            .create_new_submit_order(&order, None, None, None)
892            .unwrap();
893
894        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
895        assert!(events.borrow().is_empty());
896    }
897
898    #[rstest]
899    fn test_order_manager_without_handlers() {
900        let (clock, cache, _emulator) = create_test_components();
901        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
902        let order = create_test_stop_order();
903        cache
904            .borrow_mut()
905            .add_order(order.clone(), None, None, false)
906            .unwrap();
907        manager
908            .submit_order_commands
909            .insert(order.client_order_id(), make_submit_command(&order));
910
911        manager.cancel_order(&order);
912        manager.modify_order_quantity(&order, Quantity::from(50_000));
913
914        assert!(
915            !manager
916                .submit_order_commands
917                .contains_key(&order.client_order_id()),
918            "no-handler dispatch path should still remove the submit command",
919        );
920    }
921
922    #[rstest]
923    fn test_handle_order_filled_skips_missing_oco_contingent_order() {
924        let (clock, cache, _emulator) = create_test_components();
925        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
926        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
927        let missing_client_order_id = ClientOrderId::from("O-MISSING");
928        let valid_client_order_id = ClientOrderId::from("O-CHILD");
929        let order = OrderTestBuilder::new(OrderType::Limit)
930            .instrument_id(instrument.id())
931            .client_order_id(ClientOrderId::from("O-PARENT"))
932            .side(OrderSide::Buy)
933            .price(Price::from("1.00000"))
934            .quantity(Quantity::from(100_000))
935            .contingency_type(ContingencyType::Oco)
936            .linked_order_ids(vec![missing_client_order_id, valid_client_order_id])
937            .build();
938        let child_order = OrderTestBuilder::new(OrderType::Limit)
939            .instrument_id(instrument.id())
940            .client_order_id(valid_client_order_id)
941            .side(OrderSide::Buy)
942            .price(Price::from("1.00000"))
943            .quantity(Quantity::from(100_000))
944            .build();
945        cache
946            .borrow_mut()
947            .add_order(order.clone(), None, None, false)
948            .unwrap();
949        cache
950            .borrow_mut()
951            .add_order(child_order.clone(), None, None, false)
952            .unwrap();
953        manager
954            .submit_order_commands
955            .insert(valid_client_order_id, make_submit_command(&child_order));
956        let filled = match TestOrderEventStubs::filled(
957            &order,
958            &instrument,
959            None,
960            None,
961            None,
962            None,
963            None,
964            None,
965            None,
966            Some(AccountId::from("SIM-001")),
967        ) {
968            OrderEventAny::Filled(event) => event,
969            event => panic!("expected OrderFilled, was {event:?}"),
970        };
971
972        manager.handle_order_filled(filled);
973
974        assert!(
975            !manager
976                .submit_order_commands
977                .contains_key(&valid_client_order_id)
978        );
979    }
980
981    #[rstest]
982    fn test_handle_order_filled_skips_missing_oto_child_order() {
983        let (clock, cache, _emulator) = create_test_components();
984        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
985        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
986        let missing_client_order_id = ClientOrderId::from("O-MISSING");
987        let valid_client_order_id = ClientOrderId::from("O-CHILD");
988        let order = OrderTestBuilder::new(OrderType::Limit)
989            .instrument_id(instrument.id())
990            .client_order_id(ClientOrderId::from("O-PARENT"))
991            .side(OrderSide::Buy)
992            .price(Price::from("1.00000"))
993            .quantity(Quantity::from(100_000))
994            .contingency_type(ContingencyType::Oto)
995            .linked_order_ids(vec![missing_client_order_id, valid_client_order_id])
996            .build();
997        let child_order = OrderTestBuilder::new(OrderType::Limit)
998            .instrument_id(instrument.id())
999            .client_order_id(valid_client_order_id)
1000            .side(OrderSide::Buy)
1001            .price(Price::from("1.00000"))
1002            .quantity(Quantity::from(100_000))
1003            .emulation_trigger(TriggerType::NoTrigger)
1004            .build();
1005        cache
1006            .borrow_mut()
1007            .add_order(order.clone(), None, None, false)
1008            .unwrap();
1009        cache
1010            .borrow_mut()
1011            .add_order(child_order, None, None, false)
1012            .unwrap();
1013        let filled = match TestOrderEventStubs::filled(
1014            &order,
1015            &instrument,
1016            None,
1017            None,
1018            None,
1019            None,
1020            None,
1021            None,
1022            None,
1023            Some(AccountId::from("SIM-001")),
1024        ) {
1025            OrderEventAny::Filled(event) => event,
1026            event => panic!("expected OrderFilled, was {event:?}"),
1027        };
1028
1029        manager.handle_order_filled(filled);
1030
1031        assert!(
1032            manager
1033                .submit_order_commands
1034                .contains_key(&valid_client_order_id)
1035        );
1036    }
1037
1038    #[rstest]
1039    fn test_handle_contingencies_skips_missing_linked_order() {
1040        let (clock, cache, _emulator) = create_test_components();
1041        let mut manager = OrderManager::new(clock, cache, true, None, None, None);
1042        let instrument = audusd_sim();
1043        let order = OrderTestBuilder::new(OrderType::Limit)
1044            .instrument_id(instrument.id())
1045            .client_order_id(ClientOrderId::from("O-PARENT"))
1046            .side(OrderSide::Buy)
1047            .price(Price::from("1.00000"))
1048            .quantity(Quantity::from(100_000))
1049            .contingency_type(ContingencyType::Oco)
1050            .linked_order_ids(vec![ClientOrderId::from("O-MISSING")])
1051            .build();
1052
1053        manager.handle_contingencies(&order);
1054
1055        assert!(manager.submit_order_commands.is_empty());
1056    }
1057
1058    #[rstest]
1059    fn test_handle_contingencies_update_skips_missing_linked_order() {
1060        let (clock, cache, _emulator) = create_test_components();
1061        let mut manager = OrderManager::new(clock, cache, true, None, None, None);
1062        let instrument = audusd_sim();
1063        let order = OrderTestBuilder::new(OrderType::Limit)
1064            .instrument_id(instrument.id())
1065            .client_order_id(ClientOrderId::from("O-PARENT"))
1066            .side(OrderSide::Buy)
1067            .price(Price::from("1.00000"))
1068            .quantity(Quantity::from(100_000))
1069            .contingency_type(ContingencyType::Oco)
1070            .linked_order_ids(vec![ClientOrderId::from("O-MISSING")])
1071            .build();
1072
1073        manager.handle_contingencies_update(&order);
1074
1075        assert!(manager.submit_order_commands.is_empty());
1076    }
1077
1078    #[rstest]
1079    fn test_cancel_order_skips_when_pending_cancel_local() {
1080        let (clock, cache, _emulator) = create_test_components();
1081        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1082        let order = create_test_stop_order();
1083        cache
1084            .borrow_mut()
1085            .add_order(order.clone(), None, None, false)
1086            .unwrap();
1087        cache.borrow_mut().update_order_pending_cancel_local(&order);
1088        manager
1089            .submit_order_commands
1090            .insert(order.client_order_id(), make_submit_command(&order));
1091
1092        manager.cancel_order(&order);
1093
1094        assert!(
1095            manager
1096                .submit_order_commands
1097                .contains_key(&order.client_order_id()),
1098            "pending-cancel-local gate should short-circuit before removing the submit command",
1099        );
1100    }
1101
1102    #[rstest]
1103    fn test_cancel_order_skips_when_passed_order_is_closed() {
1104        // The caller has applied a closing event to its local clone but has
1105        // not yet called `cache.update_order`, so the cache index still
1106        // reports open. The gate must short-circuit on the local state.
1107        let (clock, cache, _emulator) = create_test_components();
1108        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1109
1110        let mut order = OrderTestBuilder::new(OrderType::StopMarket)
1111            .instrument_id(audusd_sim().id())
1112            .side(OrderSide::Buy)
1113            .trigger_price(Price::from("1.00050"))
1114            .quantity(Quantity::from(100_000))
1115            .emulation_trigger(TriggerType::BidAsk)
1116            .submit(true)
1117            .build();
1118
1119        cache
1120            .borrow_mut()
1121            .add_order(order.clone(), None, None, false)
1122            .unwrap();
1123
1124        let canceled_event =
1125            TestOrderEventStubs::canceled(&order, AccountId::from("ACCOUNT-001"), None);
1126        order.apply(canceled_event).unwrap();
1127
1128        assert!(order.is_closed());
1129        assert!(!cache.borrow().is_order_closed(&order.client_order_id()));
1130
1131        manager
1132            .submit_order_commands
1133            .insert(order.client_order_id(), make_submit_command(&order));
1134
1135        manager.cancel_order(&order);
1136
1137        assert!(
1138            manager
1139                .submit_order_commands
1140                .contains_key(&order.client_order_id()),
1141            "closed-order gate should short-circuit on the local state when the cache index is stale",
1142        );
1143    }
1144
1145    #[rstest]
1146    fn test_cancel_order_skips_when_cache_index_marks_closed() {
1147        // The passed `OrderAny` is intentionally a stale (Submitted) clone so
1148        // this test would fail if `cancel_order` checked `order.is_closed()`
1149        // on the argument instead of `cache.is_order_closed(&id)`.
1150        let (clock, cache, _emulator) = create_test_components();
1151        let mut manager = OrderManager::new(clock, cache.clone(), true, None, None, None);
1152
1153        let mut order = OrderTestBuilder::new(OrderType::StopMarket)
1154            .instrument_id(audusd_sim().id())
1155            .side(OrderSide::Buy)
1156            .trigger_price(Price::from("1.00050"))
1157            .quantity(Quantity::from(100_000))
1158            .emulation_trigger(TriggerType::BidAsk)
1159            .submit(true)
1160            .build();
1161
1162        cache
1163            .borrow_mut()
1164            .add_order(order.clone(), None, None, false)
1165            .unwrap();
1166
1167        let stale_order = order.clone();
1168
1169        let canceled_event =
1170            TestOrderEventStubs::canceled(&order, AccountId::from("ACCOUNT-001"), None);
1171        order = cache.borrow_mut().update_order(&canceled_event).unwrap();
1172
1173        assert!(cache.borrow().is_order_closed(&order.client_order_id()));
1174
1175        manager.submit_order_commands.insert(
1176            stale_order.client_order_id(),
1177            make_submit_command(&stale_order),
1178        );
1179
1180        manager.cancel_order(&stale_order);
1181
1182        assert!(
1183            manager
1184                .submit_order_commands
1185                .contains_key(&stale_order.client_order_id()),
1186            "closed-order gate should short-circuit even when the passed reference is stale",
1187        );
1188    }
1189}