Skip to main content

nautilus_trading/algorithm/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution algorithm infrastructure for order slicing and execution optimization.
17//!
18//! This module provides the [`ExecutionAlgorithm`] trait and supporting infrastructure
19//! for implementing algorithms like TWAP (Time-Weighted Average Price) and VWAP
20//! (Volume-Weighted Average Price) that slice large orders into smaller child orders.
21//!
22//! # Architecture
23//!
24//! Execution algorithms extend [`DataActor`] (not [`Strategy`](super::Strategy)) because:
25//! - They don't own positions (the parent Strategy does).
26//! - Spawned orders carry the parent Strategy's ID, not the algorithm's ID.
27//! - They act as order processors/transformers, not position managers.
28//!
29//! # Order Flow
30//!
31//! 1. A Strategy submits an order with `exec_algorithm_id` set.
32//! 2. The order is routed to the algorithm's `{id}.execute` endpoint.
33//! 3. The algorithm receives the order via `on_order()`.
34//! 4. The algorithm spawns child orders using `spawn_market()`, `spawn_limit()`, etc.
35//! 5. Spawned orders are submitted through the RiskEngine.
36//! 6. The algorithm receives fill events and manages remaining quantity.
37
38pub mod config;
39pub mod core;
40pub mod twap;
41
42pub use core::{ExecutionAlgorithmCore, StrategyEventHandlers};
43
44pub use config::{ExecutionAlgorithmConfig, ImportableExecAlgorithmConfig};
45use nautilus_common::{
46    actor::{DataActor, registry::try_get_actor_unchecked},
47    enums::ComponentState,
48    logging::{CMD, EVT, RECV, SEND},
49    messages::execution::{CancelOrder, ModifyOrder, SubmitOrder, TradingCommand},
50    msgbus::{self, MessagingSwitchboard, TypedHandler},
51    timer::TimeEvent,
52};
53use nautilus_core::{UUID4, UnixNanos};
54use nautilus_model::{
55    enums::{OrderStatus, TimeInForce, TriggerType},
56    events::{
57        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied, OrderEmulated,
58        OrderEventAny, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
59        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted,
60        OrderTriggered, OrderUpdated, PositionChanged, PositionClosed, PositionEvent,
61        PositionOpened,
62    },
63    identifiers::{ClientId, ExecAlgorithmId, PositionId, StrategyId},
64    orders::{LimitOrder, MarketOrder, MarketToLimitOrder, Order, OrderAny, OrderError, OrderList},
65    types::{Price, Quantity},
66};
67pub use twap::{TwapAlgorithm, TwapAlgorithmConfig};
68use ustr::Ustr;
69
70/// Core trait for implementing execution algorithms in NautilusTrader.
71///
72/// Execution algorithms are specialized [`DataActor`]s that receive orders from strategies
73/// and execute them by spawning child orders. They are used for order slicing algorithms
74/// like TWAP and VWAP.
75///
76/// # Key Capabilities
77///
78/// - All [`DataActor`] capabilities (data subscriptions, event handling, timers)
79/// - Order spawning (market, limit, market-to-limit)
80/// - Order lifecycle management (submit, modify, cancel)
81/// - Event filtering for algorithm-owned orders
82///
83/// # Implementation
84///
85/// User algorithms should implement the required methods and hold an
86/// [`ExecutionAlgorithmCore`] member. The struct should `Deref` and `DerefMut`
87/// to `ExecutionAlgorithmCore` (which itself derefs to `DataActorCore`).
88pub trait ExecutionAlgorithm: DataActor {
    /// Provides mutable access to the internal `ExecutionAlgorithmCore`.
    ///
    /// The provided methods of this trait go through this accessor to reach the
    /// algorithm's clock, cache, configuration and execution algorithm ID.
    fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore;
91
92    /// Returns the execution algorithm ID.
93    fn id(&mut self) -> ExecAlgorithmId {
94        self.core_mut().exec_algorithm_id
95    }
96
97    /// Executes a trading command.
98    ///
99    /// This is the main entry point for commands routed to the algorithm.
100    /// Dispatches to the appropriate handler based on command type.
101    ///
102    /// Commands are only processed when the algorithm is in `Running` state.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if command handling fails.
107    fn execute(&mut self, command: TradingCommand) -> anyhow::Result<()>
108    where
109        Self: 'static + std::fmt::Debug + Sized,
110    {
111        let core = self.core_mut();
112        if core.config.log_commands {
113            let id = &core.actor.actor_id;
114            log::info!("{id} {RECV}{CMD} {command:?}");
115        }
116
117        if core.state() != ComponentState::Running {
118            return Ok(());
119        }
120
121        match command {
122            TradingCommand::SubmitOrder(cmd) => {
123                self.subscribe_to_strategy_events(cmd.strategy_id);
124                let order = self.core_mut().get_order(&cmd.client_order_id)?;
125                self.on_order(order)
126            }
127            TradingCommand::SubmitOrderList(cmd) => {
128                self.subscribe_to_strategy_events(cmd.strategy_id);
129                let orders = self.core_mut().get_orders_for_list(&cmd.order_list)?;
130                self.on_order_list(cmd.order_list, orders)
131            }
132            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(cmd),
133            _ => {
134                log::warn!("Unhandled command type: {command:?}");
135                Ok(())
136            }
137        }
138    }
139
    /// Called when a primary order is received for execution.
    ///
    /// Override this method to implement the algorithm's order slicing logic.
    /// It is invoked by `execute` for `SubmitOrder` commands, and once per order
    /// by the default `on_order_list` implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if order handling fails.
    fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()>;
148
149    /// Called when an order list is received for execution.
150    ///
151    /// Override this method to handle order lists. The default implementation
152    /// processes each order individually.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if order list handling fails.
157    fn on_order_list(
158        &mut self,
159        _order_list: OrderList,
160        orders: Vec<OrderAny>,
161    ) -> anyhow::Result<()> {
162        for order in orders {
163            self.on_order(order)?;
164        }
165        Ok(())
166    }
167
168    /// Handles a cancel order command for algorithm-managed orders.
169    ///
170    /// This generates an internal cancel event and publishes it. The order
171    /// is canceled locally without sending a command to the execution engine.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if cancellation fails.
176    fn handle_cancel_order(&mut self, command: CancelOrder) -> anyhow::Result<()> {
177        let (order, is_pending_cancel) = {
178            let cache = self.core_mut().cache();
179
180            let Some(order) = cache.order(&command.client_order_id) else {
181                log::warn!(
182                    "Cannot cancel order: {} not found in cache",
183                    command.client_order_id
184                );
185                return Ok(());
186            };
187
188            let is_pending = cache.is_order_pending_cancel_local(&command.client_order_id);
189            (order.clone(), is_pending)
190        };
191
192        if is_pending_cancel {
193            return Ok(());
194        }
195
196        if order.is_closed() {
197            log::warn!("Order already closed for {command:?}");
198            return Ok(());
199        }
200
201        let event = OrderEventAny::Canceled(self.generate_order_canceled(&order));
202
203        let order = {
204            let cache_rc = self.core_mut().cache_rc();
205            let mut cache = cache_rc.borrow_mut();
206            match cache.update_order(&event) {
207                Ok(order) => order,
208                Err(e)
209                    if matches!(
210                        e.downcast_ref::<OrderError>(),
211                        Some(OrderError::InvalidStateTransition)
212                    ) =>
213                {
214                    log::warn!("InvalidStateTrigger: {e}, did not apply cancel event");
215                    return Ok(());
216                }
217                Err(e) => return Err(e),
218            }
219        };
220
221        let topic = format!("events.order.{}", order.strategy_id());
222        msgbus::publish_order_event(topic.into(), &event);
223
224        Ok(())
225    }
226
227    /// Generates an OrderCanceled event for an order.
228    fn generate_order_canceled(&mut self, order: &OrderAny) -> OrderCanceled {
229        let ts_now = self.core_mut().clock().timestamp_ns();
230
231        OrderCanceled::new(
232            order.trader_id(),
233            order.strategy_id(),
234            order.instrument_id(),
235            order.client_order_id(),
236            UUID4::new(),
237            ts_now,
238            ts_now,
239            false, // reconciliation
240            order.venue_order_id(),
241            order.account_id(),
242        )
243    }
244
245    /// Generates an OrderPendingUpdate event for an order.
246    fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate {
247        let ts_now = self.core_mut().clock().timestamp_ns();
248
249        OrderPendingUpdate::new(
250            order.trader_id(),
251            order.strategy_id(),
252            order.instrument_id(),
253            order.client_order_id(),
254            order
255                .account_id()
256                .expect("Order must have account_id for pending update"),
257            UUID4::new(),
258            ts_now,
259            ts_now,
260            false, // reconciliation
261            order.venue_order_id(),
262        )
263    }
264
265    /// Generates an OrderPendingCancel event for an order.
266    fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel {
267        let ts_now = self.core_mut().clock().timestamp_ns();
268
269        OrderPendingCancel::new(
270            order.trader_id(),
271            order.strategy_id(),
272            order.instrument_id(),
273            order.client_order_id(),
274            order
275                .account_id()
276                .expect("Order must have account_id for pending cancel"),
277            UUID4::new(),
278            ts_now,
279            ts_now,
280            false, // reconciliation
281            order.venue_order_id(),
282        )
283    }
284
285    /// Spawns a market order from a primary order.
286    ///
287    /// Creates a new market order with:
288    /// - A unique client order ID: `{primary_id}-E{sequence}`.
289    /// - The primary order's trader ID, strategy ID, and instrument ID.
290    /// - The algorithm's exec_algorithm_id.
291    /// - exec_spawn_id set to the primary order's client order ID.
292    ///
293    /// If `reduce_primary` is true, the primary order's quantity will be reduced
294    /// by the spawned quantity. If the spawned order is subsequently denied or
295    /// rejected (before acceptance), the deducted quantity is automatically
296    /// restored to the primary order.
297    fn spawn_market(
298        &mut self,
299        primary: &mut OrderAny,
300        quantity: Quantity,
301        time_in_force: TimeInForce,
302        reduce_only: bool,
303        tags: Option<Vec<Ustr>>,
304        reduce_primary: bool,
305    ) -> MarketOrder {
306        // Generate spawn ID first so we can track the reduction
307        let core = self.core_mut();
308        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
309        let ts_init = core.clock().timestamp_ns();
310        let exec_algorithm_id = core.exec_algorithm_id;
311
312        if reduce_primary {
313            self.reduce_primary_order(primary, quantity);
314            self.core_mut()
315                .track_pending_spawn_reduction(client_order_id, quantity);
316        }
317
318        MarketOrder::new(
319            primary.trader_id(),
320            primary.strategy_id(),
321            primary.instrument_id(),
322            client_order_id,
323            primary.order_side(),
324            quantity,
325            time_in_force,
326            UUID4::new(),
327            ts_init,
328            reduce_only,
329            primary.is_quote_quantity(),
330            primary.contingency_type(),
331            primary.order_list_id(),
332            primary.linked_order_ids().map(|ids| ids.to_vec()),
333            primary.parent_order_id(),
334            Some(exec_algorithm_id),
335            primary.exec_algorithm_params().cloned(),
336            Some(primary.client_order_id()),
337            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
338        )
339    }
340
341    /// Spawns a limit order from a primary order.
342    ///
343    /// Creates a new limit order with:
344    /// - A unique client order ID: `{primary_id}-E{sequence}`
345    /// - The primary order's trader ID, strategy ID, and instrument ID
346    /// - The algorithm's exec_algorithm_id
347    /// - exec_spawn_id set to the primary order's client order ID
348    ///
349    /// If `reduce_primary` is true, the primary order's quantity will be reduced
350    /// by the spawned quantity. If the spawned order is subsequently denied or
351    /// rejected (before acceptance), the deducted quantity is automatically
352    /// restored to the primary order.
353    #[expect(clippy::too_many_arguments)]
354    fn spawn_limit(
355        &mut self,
356        primary: &mut OrderAny,
357        quantity: Quantity,
358        price: Price,
359        time_in_force: TimeInForce,
360        expire_time: Option<UnixNanos>,
361        post_only: bool,
362        reduce_only: bool,
363        display_qty: Option<Quantity>,
364        emulation_trigger: Option<TriggerType>,
365        tags: Option<Vec<Ustr>>,
366        reduce_primary: bool,
367    ) -> LimitOrder {
368        // Generate spawn ID first so we can track the reduction
369        let core = self.core_mut();
370        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
371        let ts_init = core.clock().timestamp_ns();
372        let exec_algorithm_id = core.exec_algorithm_id;
373
374        if reduce_primary {
375            self.reduce_primary_order(primary, quantity);
376            self.core_mut()
377                .track_pending_spawn_reduction(client_order_id, quantity);
378        }
379
380        LimitOrder::new(
381            primary.trader_id(),
382            primary.strategy_id(),
383            primary.instrument_id(),
384            client_order_id,
385            primary.order_side(),
386            quantity,
387            price,
388            time_in_force,
389            expire_time,
390            post_only,
391            reduce_only,
392            primary.is_quote_quantity(),
393            display_qty,
394            emulation_trigger,
395            None, // trigger_instrument_id
396            primary.contingency_type(),
397            primary.order_list_id(),
398            primary.linked_order_ids().map(|ids| ids.to_vec()),
399            primary.parent_order_id(),
400            Some(exec_algorithm_id),
401            primary.exec_algorithm_params().cloned(),
402            Some(primary.client_order_id()),
403            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
404            UUID4::new(),
405            ts_init,
406        )
407    }
408
409    /// Spawns a market-to-limit order from a primary order.
410    ///
411    /// Creates a new market-to-limit order with:
412    /// - A unique client order ID: `{primary_id}-E{sequence}`
413    /// - The primary order's trader ID, strategy ID, and instrument ID
414    /// - The algorithm's exec_algorithm_id
415    /// - exec_spawn_id set to the primary order's client order ID
416    ///
417    /// If `reduce_primary` is true, the primary order's quantity will be reduced
418    /// by the spawned quantity. If the spawned order is subsequently denied or
419    /// rejected (before acceptance), the deducted quantity is automatically
420    /// restored to the primary order.
421    #[expect(clippy::too_many_arguments)]
422    fn spawn_market_to_limit(
423        &mut self,
424        primary: &mut OrderAny,
425        quantity: Quantity,
426        time_in_force: TimeInForce,
427        expire_time: Option<UnixNanos>,
428        reduce_only: bool,
429        display_qty: Option<Quantity>,
430        emulation_trigger: Option<TriggerType>,
431        tags: Option<Vec<Ustr>>,
432        reduce_primary: bool,
433    ) -> MarketToLimitOrder {
434        // Generate spawn ID first so we can track the reduction
435        let core = self.core_mut();
436        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
437        let ts_init = core.clock().timestamp_ns();
438        let exec_algorithm_id = core.exec_algorithm_id;
439
440        if reduce_primary {
441            self.reduce_primary_order(primary, quantity);
442            self.core_mut()
443                .track_pending_spawn_reduction(client_order_id, quantity);
444        }
445
446        let mut order = MarketToLimitOrder::new(
447            primary.trader_id(),
448            primary.strategy_id(),
449            primary.instrument_id(),
450            client_order_id,
451            primary.order_side(),
452            quantity,
453            time_in_force,
454            expire_time,
455            false, // post_only
456            reduce_only,
457            primary.is_quote_quantity(),
458            display_qty,
459            primary.contingency_type(),
460            primary.order_list_id(),
461            primary.linked_order_ids().map(|ids| ids.to_vec()),
462            primary.parent_order_id(),
463            Some(exec_algorithm_id),
464            primary.exec_algorithm_params().cloned(),
465            Some(primary.client_order_id()),
466            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
467            UUID4::new(),
468            ts_init,
469        );
470
471        if emulation_trigger.is_some() {
472            order.set_emulation_trigger(emulation_trigger);
473        }
474
475        order
476    }
477
478    /// Reduces the primary order's quantity by the spawn quantity.
479    ///
480    /// Generates an `OrderUpdated` event and applies it to the primary order,
481    /// then updates the order in the cache.
482    ///
483    /// # Panics
484    ///
485    /// Panics if `spawn_qty` exceeds the primary order's `leaves_qty`.
486    fn reduce_primary_order(&mut self, primary: &mut OrderAny, spawn_qty: Quantity) {
487        let leaves_qty = primary.leaves_qty();
488        assert!(
489            leaves_qty >= spawn_qty,
490            "Spawn quantity {spawn_qty} exceeds primary leaves_qty {leaves_qty}"
491        );
492
493        let primary_qty = primary.quantity();
494        let new_qty = Quantity::from_raw(primary_qty.raw - spawn_qty.raw, primary_qty.precision);
495
496        let core = self.core_mut();
497        let ts_now = core.clock().timestamp_ns();
498
499        let updated = OrderUpdated::new(
500            primary.trader_id(),
501            primary.strategy_id(),
502            primary.instrument_id(),
503            primary.client_order_id(),
504            new_qty,
505            UUID4::new(),
506            ts_now,
507            ts_now,
508            false, // reconciliation
509            primary.venue_order_id(),
510            primary.account_id(),
511            None, // price
512            None, // trigger_price
513            None, // protection_price
514            primary.is_quote_quantity(),
515        );
516
517        let event = OrderEventAny::Updated(updated);
518
519        {
520            let cache_rc = core.cache_rc();
521            let mut cache = cache_rc.borrow_mut();
522            *primary = cache
523                .update_order(&event)
524                .expect("Failed to update order in cache");
525        }
526
527        publish_order_event(&event);
528    }
529
530    /// Restores the primary order quantity after a spawned order is denied or rejected.
531    ///
532    /// This is called when a spawned order fails before acceptance. The quantity
533    /// that was deducted from the primary order is restored (up to the spawned
534    /// order's leaves_qty to handle partial fills).
535    fn restore_primary_order_quantity(&mut self, order: &OrderAny) {
536        let Some(exec_spawn_id) = order.exec_spawn_id() else {
537            return;
538        };
539
540        let reduction_qty = {
541            let core = self.core_mut();
542            core.take_pending_spawn_reduction(&order.client_order_id())
543        };
544
545        let Some(reduction_qty) = reduction_qty else {
546            return;
547        };
548
549        let primary = {
550            let cache = self.core_mut().cache();
551            cache.order(&exec_spawn_id).map(|o| o.clone())
552        };
553
554        let Some(primary) = primary else {
555            log::warn!(
556                "Cannot restore primary order quantity: primary order {exec_spawn_id} not found",
557            );
558            return;
559        };
560
561        // Cap restore amount by leaves_qty to handle partial fills before rejection
562        let restore_raw = std::cmp::min(reduction_qty.raw, order.leaves_qty().raw);
563        if restore_raw == 0 {
564            return;
565        }
566
567        let restored_qty = Quantity::from_raw(
568            primary.quantity().raw + restore_raw,
569            primary.quantity().precision,
570        );
571
572        let core = self.core_mut();
573        let ts_now = core.clock().timestamp_ns();
574
575        let updated = OrderUpdated::new(
576            primary.trader_id(),
577            primary.strategy_id(),
578            primary.instrument_id(),
579            primary.client_order_id(),
580            restored_qty,
581            UUID4::new(),
582            ts_now,
583            ts_now,
584            false, // reconciliation
585            primary.venue_order_id(),
586            primary.account_id(),
587            None, // price
588            None, // trigger_price
589            None, // protection_price
590            primary.is_quote_quantity(),
591        );
592
593        let event = OrderEventAny::Updated(updated);
594
595        let primary = {
596            let cache_rc = core.cache_rc();
597            let mut cache = cache_rc.borrow_mut();
598            match cache.update_order(&event) {
599                Ok(primary) => primary,
600                Err(e) => {
601                    log::warn!("Failed to update primary order in cache: {e}");
602                    return;
603                }
604            }
605        };
606
607        publish_order_event(&event);
608
609        log::info!(
610            "Restored primary order {} quantity to {} after spawned order {} was denied/rejected",
611            primary.client_order_id(),
612            restored_qty,
613            order.client_order_id()
614        );
615    }
616
617    /// Submits an order to the execution engine via the risk engine.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if order submission fails.
622    fn submit_order(
623        &mut self,
624        order: OrderAny,
625        position_id: Option<PositionId>,
626        client_id: Option<ClientId>,
627    ) -> anyhow::Result<()> {
628        let core = self.core_mut();
629
630        let trader_id = core.trader_id().expect("Trader ID not set");
631        let ts_init = core.clock().timestamp_ns();
632
633        // For spawned orders, use the parent's strategy ID
634        let strategy_id = order.strategy_id();
635
636        let order_exists = {
637            let cache = core.cache();
638            cache.order_exists(&order.client_order_id())
639        };
640
641        {
642            let cache_rc = core.cache_rc();
643            let mut cache = cache_rc.borrow_mut();
644            cache.add_order(order.clone(), position_id, client_id, true)?;
645        }
646
647        if !order_exists {
648            publish_order_initialized(&order);
649        }
650
651        let command = SubmitOrder::new(
652            trader_id,
653            client_id,
654            strategy_id,
655            order.instrument_id(),
656            order.client_order_id(),
657            order.init_event().clone(),
658            order.exec_algorithm_id(),
659            position_id,
660            None, // params
661            UUID4::new(),
662            ts_init,
663        );
664
665        if core.config.log_commands {
666            let id = &core.actor.actor_id;
667            log::info!("{id} {SEND}{CMD} {command:?}");
668        }
669
670        msgbus::send_trading_command(
671            MessagingSwitchboard::risk_engine_execute(),
672            TradingCommand::SubmitOrder(command),
673        );
674
675        Ok(())
676    }
677
678    /// Modifies an order.
679    ///
680    /// # Errors
681    ///
682    /// Returns an error if order modification fails.
683    fn modify_order(
684        &mut self,
685        order: &mut OrderAny,
686        quantity: Option<Quantity>,
687        price: Option<Price>,
688        trigger_price: Option<Price>,
689        client_id: Option<ClientId>,
690    ) -> anyhow::Result<()> {
691        let qty_changing = quantity.is_some_and(|q| q != order.quantity());
692        let price_changing = price.is_some() && price != order.price();
693        let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
694
695        if !qty_changing && !price_changing && !trigger_changing {
696            log::error!(
697                "Cannot create command ModifyOrder: \
698                quantity, price and trigger were either None \
699                or the same as existing values"
700            );
701            return Ok(());
702        }
703
704        if order.is_closed() || order.is_pending_cancel() {
705            log::warn!(
706                "Cannot create command ModifyOrder: state is {:?}, {order:?}",
707                order.status()
708            );
709            return Ok(());
710        }
711
712        let core = self.core_mut();
713        let trader_id = core.trader_id().expect("Trader ID not set");
714        let strategy_id = order.strategy_id();
715
716        if !order.is_active_local() {
717            let event = self.generate_order_pending_update(order);
718            let event = OrderEventAny::PendingUpdate(event);
719
720            {
721                let cache_rc = self.core_mut().cache_rc();
722                let mut cache = cache_rc.borrow_mut();
723                match cache.update_order(&event) {
724                    Ok(updated) => *order = updated,
725                    Err(e)
726                        if matches!(
727                            e.downcast_ref::<OrderError>(),
728                            Some(OrderError::InvalidStateTransition)
729                        ) =>
730                    {
731                        log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
732                        return Ok(());
733                    }
734                    Err(e) => return Err(e),
735                }
736            }
737
738            let topic = format!("events.order.{strategy_id}");
739            msgbus::publish_order_event(topic.into(), &event);
740        }
741
742        let ts_init = self.core_mut().clock().timestamp_ns();
743        let command = ModifyOrder::new(
744            trader_id,
745            client_id,
746            strategy_id,
747            order.instrument_id(),
748            order.client_order_id(),
749            order.venue_order_id(),
750            quantity,
751            price,
752            trigger_price,
753            UUID4::new(),
754            ts_init,
755            None, // params
756        );
757
758        if self.core_mut().config.log_commands {
759            let id = &self.core_mut().actor.actor_id;
760            log::info!("{id} {SEND}{CMD} {command:?}");
761        }
762
763        let has_emulation_trigger = order
764            .emulation_trigger()
765            .is_some_and(|t| t != TriggerType::NoTrigger);
766
767        if order.is_emulated() || has_emulation_trigger {
768            msgbus::send_trading_command(
769                MessagingSwitchboard::order_emulator_execute(),
770                TradingCommand::ModifyOrder(command),
771            );
772        } else {
773            msgbus::send_trading_command(
774                MessagingSwitchboard::risk_engine_execute(),
775                TradingCommand::ModifyOrder(command),
776            );
777        }
778
779        Ok(())
780    }
781
782    /// Modifies an INITIALIZED or RELEASED order in place without sending a command.
783    ///
784    /// This is useful for adjusting order parameters before submission. The order
785    /// is updated locally by applying an `OrderUpdated` event and updating the cache.
786    ///
787    /// At least one parameter must differ from the current order values.
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the order status is not INITIALIZED or RELEASED,
792    /// or if no parameters would change.
793    fn modify_order_in_place(
794        &mut self,
795        order: &mut OrderAny,
796        quantity: Option<Quantity>,
797        price: Option<Price>,
798        trigger_price: Option<Price>,
799    ) -> anyhow::Result<()> {
800        // Validate order status
801        let status = order.status();
802        if status != OrderStatus::Initialized && status != OrderStatus::Released {
803            anyhow::bail!(
804                "Cannot modify order in place: status is {status:?}, expected INITIALIZED or RELEASED"
805            );
806        }
807
808        // Validate order type compatibility
809        if price.is_some() && order.price().is_none() {
810            anyhow::bail!(
811                "Cannot modify order in place: {} orders do not have a LIMIT price",
812                order.order_type()
813            );
814        }
815
816        if trigger_price.is_some() && order.trigger_price().is_none() {
817            anyhow::bail!(
818                "Cannot modify order in place: {} orders do not have a STOP trigger price",
819                order.order_type()
820            );
821        }
822
823        // Check if any value would actually change
824        let qty_changing = quantity.is_some_and(|q| q != order.quantity());
825        let price_changing = price.is_some() && price != order.price();
826        let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
827
828        if !qty_changing && !price_changing && !trigger_changing {
829            anyhow::bail!("Cannot modify order in place: no parameters differ from current values");
830        }
831
832        let core = self.core_mut();
833        let ts_now = core.clock().timestamp_ns();
834
835        let updated = OrderUpdated::new(
836            order.trader_id(),
837            order.strategy_id(),
838            order.instrument_id(),
839            order.client_order_id(),
840            quantity.unwrap_or_else(|| order.quantity()),
841            UUID4::new(),
842            ts_now,
843            ts_now,
844            false, // reconciliation
845            order.venue_order_id(),
846            order.account_id(),
847            price,
848            trigger_price,
849            None, // protection_price
850            order.is_quote_quantity(),
851        );
852
853        let event = OrderEventAny::Updated(updated);
854
855        {
856            let cache_rc = core.cache_rc();
857            let mut cache = cache_rc.borrow_mut();
858            *order = cache.update_order(&event)?;
859        }
860
861        publish_order_event(&event);
862
863        Ok(())
864    }
865
866    /// Cancels an order.
867    ///
868    /// # Errors
869    ///
870    /// Returns an error if order cancellation fails.
871    fn cancel_order(
872        &mut self,
873        order: &mut OrderAny,
874        client_id: Option<ClientId>,
875    ) -> anyhow::Result<()> {
876        if order.is_closed() || order.is_pending_cancel() {
877            log::warn!(
878                "Cannot cancel order: state is {:?}, {order:?}",
879                order.status()
880            );
881            return Ok(());
882        }
883
884        let core = self.core_mut();
885        let trader_id = core.trader_id().expect("Trader ID not set");
886        let strategy_id = order.strategy_id();
887
888        if !order.is_active_local() {
889            let event = self.generate_order_pending_cancel(order);
890            let event = OrderEventAny::PendingCancel(event);
891
892            {
893                let cache_rc = self.core_mut().cache_rc();
894                let mut cache = cache_rc.borrow_mut();
895                match cache.update_order(&event) {
896                    Ok(updated) => *order = updated,
897                    Err(e)
898                        if matches!(
899                            e.downcast_ref::<OrderError>(),
900                            Some(OrderError::InvalidStateTransition)
901                        ) =>
902                    {
903                        log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
904                        return Ok(());
905                    }
906                    Err(e) => return Err(e),
907                }
908            }
909
910            let topic = format!("events.order.{strategy_id}");
911            msgbus::publish_order_event(topic.into(), &event);
912        }
913
914        let ts_init = self.core_mut().clock().timestamp_ns();
915        let command = CancelOrder::new(
916            trader_id,
917            client_id,
918            strategy_id,
919            order.instrument_id(),
920            order.client_order_id(),
921            order.venue_order_id(),
922            UUID4::new(),
923            ts_init,
924            None, // params
925        );
926
927        if self.core_mut().config.log_commands {
928            let id = &self.core_mut().actor.actor_id;
929            log::info!("{id} {SEND}{CMD} {command:?}");
930        }
931
932        let has_emulation_trigger = order
933            .emulation_trigger()
934            .is_some_and(|t| t != TriggerType::NoTrigger);
935
936        if order.is_emulated() || order.status() == OrderStatus::Released || has_emulation_trigger {
937            msgbus::send_trading_command(
938                MessagingSwitchboard::order_emulator_execute(),
939                TradingCommand::CancelOrder(command),
940            );
941        } else {
942            msgbus::send_trading_command(
943                MessagingSwitchboard::exec_engine_execute(),
944                TradingCommand::CancelOrder(command),
945            );
946        }
947
948        Ok(())
949    }
950
951    /// Subscribes to events from a strategy.
952    ///
953    /// This is called automatically when the first order is received from a strategy.
954    fn subscribe_to_strategy_events(&mut self, strategy_id: StrategyId)
955    where
956        Self: 'static + std::fmt::Debug + Sized,
957    {
958        let core = self.core_mut();
959        if core.is_strategy_subscribed(&strategy_id) {
960            return;
961        }
962
963        let actor_id = core.actor.actor_id.inner();
964
965        let order_topic = format!("events.order.{strategy_id}");
966        let order_actor_id = actor_id;
967        let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
968            if let Some(mut algo) = try_get_actor_unchecked::<Self>(&order_actor_id) {
969                algo.handle_order_event(event.clone());
970            } else {
971                log::error!(
972                    "ExecutionAlgorithm {order_actor_id} not found for order event handling"
973                );
974            }
975        });
976        msgbus::subscribe_order_events(order_topic.clone().into(), order_handler.clone(), None);
977
978        let position_topic = format!("events.position.{strategy_id}");
979        let position_handler = TypedHandler::from(move |event: &PositionEvent| {
980            if let Some(mut algo) = try_get_actor_unchecked::<Self>(&actor_id) {
981                algo.handle_position_event(event.clone());
982            } else {
983                log::error!("ExecutionAlgorithm {actor_id} not found for position event handling");
984            }
985        });
986        msgbus::subscribe_position_events(
987            position_topic.clone().into(),
988            position_handler.clone(),
989            None,
990        );
991
992        let handlers = StrategyEventHandlers {
993            order_topic,
994            order_handler,
995            position_topic,
996            position_handler,
997        };
998        core.store_strategy_event_handlers(strategy_id, handlers);
999
1000        core.add_subscribed_strategy(strategy_id);
1001        log::info!("Subscribed to events for strategy {strategy_id}");
1002    }
1003
1004    /// Unsubscribes from all strategy event handlers.
1005    ///
1006    /// This should be called before reset to properly clean up msgbus subscriptions.
1007    fn unsubscribe_all_strategy_events(&mut self) {
1008        let handlers = self.core_mut().take_strategy_event_handlers();
1009        for (strategy_id, h) in handlers {
1010            msgbus::unsubscribe_order_events(h.order_topic.into(), &h.order_handler);
1011            msgbus::unsubscribe_position_events(h.position_topic.into(), &h.position_handler);
1012            log::info!("Unsubscribed from events for strategy {strategy_id}");
1013        }
1014        self.core_mut().clear_subscribed_strategies();
1015    }
1016
1017    /// Handles an order event, filtering for algorithm-owned orders.
1018    fn handle_order_event(&mut self, event: OrderEventAny) {
1019        if self.core_mut().state() != ComponentState::Running {
1020            return;
1021        }
1022
1023        let order = {
1024            let cache = self.core_mut().cache();
1025            cache.order(&event.client_order_id()).map(|o| o.clone())
1026        };
1027
1028        let Some(order) = order else {
1029            return;
1030        };
1031
1032        let Some(order_algo_id) = order.exec_algorithm_id() else {
1033            return;
1034        };
1035
1036        if order_algo_id != self.id() {
1037            return;
1038        }
1039
1040        {
1041            let core = self.core_mut();
1042            if core.config.log_events {
1043                let id = &core.actor.actor_id;
1044                log::info!("{id} {RECV}{EVT} {event}");
1045            }
1046        }
1047
1048        match &event {
1049            OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1050            OrderEventAny::Denied(e) => {
1051                self.restore_primary_order_quantity(&order);
1052                self.on_order_denied(*e);
1053            }
1054            OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1055            OrderEventAny::Released(e) => self.on_order_released(*e),
1056            OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1057            OrderEventAny::Rejected(e) => {
1058                self.restore_primary_order_quantity(&order);
1059                self.on_order_rejected(*e);
1060            }
1061            OrderEventAny::Accepted(e) => {
1062                // Commit reduction - order accepted by venue
1063                self.core_mut()
1064                    .take_pending_spawn_reduction(&order.client_order_id());
1065                self.on_order_accepted(*e);
1066            }
1067            OrderEventAny::Canceled(e) => {
1068                self.core_mut()
1069                    .take_pending_spawn_reduction(&order.client_order_id());
1070                self.on_algo_order_canceled(*e);
1071            }
1072            OrderEventAny::Expired(e) => {
1073                self.core_mut()
1074                    .take_pending_spawn_reduction(&order.client_order_id());
1075                self.on_order_expired(*e);
1076            }
1077            OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1078            OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1079            OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1080            OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1081            OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1082            OrderEventAny::Updated(e) => self.on_order_updated(*e),
1083            OrderEventAny::Filled(e) => self.on_algo_order_filled(*e),
1084        }
1085
1086        self.on_order_event(event);
1087    }
1088
1089    /// Handles a position event.
1090    fn handle_position_event(&mut self, event: PositionEvent) {
1091        if self.core_mut().state() != ComponentState::Running {
1092            return;
1093        }
1094
1095        {
1096            let core = self.core_mut();
1097            if core.config.log_events {
1098                let id = &core.actor.actor_id;
1099                log::info!("{id} {RECV}{EVT} {event:?}");
1100            }
1101        }
1102
1103        match &event {
1104            PositionEvent::PositionOpened(e) => self.on_position_opened(e.clone()),
1105            PositionEvent::PositionChanged(e) => self.on_position_changed(e.clone()),
1106            PositionEvent::PositionClosed(e) => self.on_position_closed(e.clone()),
1107            PositionEvent::PositionAdjusted(_) => {}
1108        }
1109
1110        self.on_position_event(event);
1111    }
1112
1113    /// Called when the algorithm is started.
1114    ///
1115    /// Override this method to implement custom initialization logic.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if start fails.
1120    fn on_start(&mut self) -> anyhow::Result<()> {
1121        let id = self.id();
1122        log::info!("Starting {id}");
1123        Ok(())
1124    }
1125
1126    /// Called when the algorithm is stopped.
1127    ///
1128    /// # Errors
1129    ///
1130    /// Returns an error if stop fails.
1131    fn on_stop(&mut self) -> anyhow::Result<()> {
1132        Ok(())
1133    }
1134
1135    /// Called when the algorithm is reset.
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if reset fails.
1140    fn on_reset(&mut self) -> anyhow::Result<()> {
1141        self.unsubscribe_all_strategy_events();
1142        self.core_mut().reset();
1143        Ok(())
1144    }
1145
1146    /// Called when a time event is received.
1147    ///
1148    /// Override this method for timer-based algorithms like TWAP.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if time event handling fails.
1153    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
1154        Ok(())
1155    }
1156
1157    /// Called when an order is initialized.
1158    #[allow(unused_variables)]
1159    fn on_order_initialized(&mut self, event: OrderInitialized) {}
1160
1161    /// Called when an order is denied.
1162    #[allow(unused_variables)]
1163    fn on_order_denied(&mut self, event: OrderDenied) {}
1164
1165    /// Called when an order is emulated.
1166    #[allow(unused_variables)]
1167    fn on_order_emulated(&mut self, event: OrderEmulated) {}
1168
1169    /// Called when an order is released from emulation.
1170    #[allow(unused_variables)]
1171    fn on_order_released(&mut self, event: OrderReleased) {}
1172
1173    /// Called when an order is submitted.
1174    #[allow(unused_variables)]
1175    fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1176
1177    /// Called when an order is rejected.
1178    #[allow(unused_variables)]
1179    fn on_order_rejected(&mut self, event: OrderRejected) {}
1180
1181    /// Called when an order is accepted.
1182    #[allow(unused_variables)]
1183    fn on_order_accepted(&mut self, event: OrderAccepted) {}
1184
1185    /// Called when an order is canceled.
1186    #[allow(unused_variables)]
1187    fn on_algo_order_canceled(&mut self, event: OrderCanceled) {}
1188
1189    /// Called when an order expires.
1190    #[allow(unused_variables)]
1191    fn on_order_expired(&mut self, event: OrderExpired) {}
1192
1193    /// Called when an order is triggered.
1194    #[allow(unused_variables)]
1195    fn on_order_triggered(&mut self, event: OrderTriggered) {}
1196
1197    /// Called when an order modification is pending.
1198    #[allow(unused_variables)]
1199    fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1200
1201    /// Called when an order cancellation is pending.
1202    #[allow(unused_variables)]
1203    fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1204
1205    /// Called when an order modification is rejected.
1206    #[allow(unused_variables)]
1207    fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1208
1209    /// Called when an order cancellation is rejected.
1210    #[allow(unused_variables)]
1211    fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1212
1213    /// Called when an order is updated.
1214    #[allow(unused_variables)]
1215    fn on_order_updated(&mut self, event: OrderUpdated) {}
1216
1217    /// Called when an order is filled.
1218    #[allow(unused_variables)]
1219    fn on_algo_order_filled(&mut self, event: OrderFilled) {}
1220
1221    /// Called for any order event (after specific handler).
1222    #[allow(unused_variables)]
1223    fn on_order_event(&mut self, event: OrderEventAny) {}
1224
1225    /// Called when a position is opened.
1226    #[allow(unused_variables)]
1227    fn on_position_opened(&mut self, event: PositionOpened) {}
1228
1229    /// Called when a position is changed.
1230    #[allow(unused_variables)]
1231    fn on_position_changed(&mut self, event: PositionChanged) {}
1232
1233    /// Called when a position is closed.
1234    #[allow(unused_variables)]
1235    fn on_position_closed(&mut self, event: PositionClosed) {}
1236
1237    /// Called for any position event (after specific handler).
1238    #[allow(unused_variables)]
1239    fn on_position_event(&mut self, event: PositionEvent) {}
1240}
1241
1242fn publish_order_initialized(order: &OrderAny) {
1243    let event = OrderEventAny::Initialized(order.init_event().clone());
1244    publish_order_event(&event);
1245}
1246
1247fn publish_order_event(event: &OrderEventAny) {
1248    let topic = format!("events.order.{}", event.strategy_id());
1249    msgbus::publish_order_event(topic.into(), event);
1250}
1251
1252#[cfg(test)]
1253mod tests {
1254    use std::{cell::RefCell, rc::Rc};
1255
1256    use nautilus_common::{
1257        actor::DataActor, cache::Cache, clock::TestClock, component::Component,
1258        enums::ComponentTrigger, msgbus, msgbus::TypedHandler, nautilus_actor,
1259    };
1260    use nautilus_model::{
1261        enums::OrderSide,
1262        events::{
1263            OrderAccepted, OrderCanceled, OrderDenied, OrderRejected, order::spec::OrderFilledSpec,
1264        },
1265        identifiers::{
1266            AccountId, ClientOrderId, ExecAlgorithmId, InstrumentId, StrategyId, TraderId,
1267            VenueOrderId,
1268        },
1269        orders::{LimitOrder, MarketOrder, OrderAny, stubs::TestOrderStubs},
1270        types::{Price, Quantity},
1271    };
1272    use rstest::rstest;
1273
1274    use super::*;
1275
1276    #[derive(Debug)]
1277    struct TestAlgorithm {
1278        core: ExecutionAlgorithmCore,
1279        on_order_called: bool,
1280        last_order_client_id: Option<ClientOrderId>,
1281    }
1282
1283    impl TestAlgorithm {
1284        fn new(config: ExecutionAlgorithmConfig) -> Self {
1285            Self {
1286                core: ExecutionAlgorithmCore::new(config),
1287                on_order_called: false,
1288                last_order_client_id: None,
1289            }
1290        }
1291    }
1292
1293    impl DataActor for TestAlgorithm {}
1294
1295    nautilus_actor!(TestAlgorithm);
1296
1297    impl ExecutionAlgorithm for TestAlgorithm {
1298        fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
1299            &mut self.core
1300        }
1301
1302        fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
1303            self.on_order_called = true;
1304            self.last_order_client_id = Some(order.client_order_id());
1305            Ok(())
1306        }
1307    }
1308
1309    fn create_test_algorithm() -> TestAlgorithm {
1310        // Use unique ID to avoid thread-local registry/msgbus conflicts in parallel tests
1311        let unique_id = format!("TEST-{}", UUID4::new());
1312        let config = ExecutionAlgorithmConfig {
1313            exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
1314            ..Default::default()
1315        };
1316        TestAlgorithm::new(config)
1317    }
1318
1319    fn register_algorithm(algo: &mut TestAlgorithm) {
1320        let trader_id = TraderId::from("TRADER-001");
1321        let clock = Rc::new(RefCell::new(TestClock::new()));
1322        let cache = Rc::new(RefCell::new(Cache::default()));
1323
1324        algo.core.register(trader_id, clock, cache).unwrap();
1325
1326        // Transition to Running state for tests
1327        algo.transition_state(ComponentTrigger::Initialize).unwrap();
1328        algo.transition_state(ComponentTrigger::Start).unwrap();
1329        algo.transition_state(ComponentTrigger::StartCompleted)
1330            .unwrap();
1331    }
1332
1333    fn subscribe_order_topic(
1334        strategy_id: StrategyId,
1335    ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
1336        let events = Rc::new(RefCell::new(Vec::new()));
1337        let handler = TypedHandler::from({
1338            let events = events.clone();
1339            move |event: &OrderEventAny| {
1340                events.borrow_mut().push(event.clone());
1341            }
1342        });
1343        msgbus::subscribe_order_events(
1344            format!("events.order.{strategy_id}").into(),
1345            handler.clone(),
1346            None,
1347        );
1348        (handler, events)
1349    }
1350
1351    #[rstest]
1352    fn test_algorithm_creation() {
1353        let algo = create_test_algorithm();
1354        assert!(algo.core.exec_algorithm_id.inner().starts_with("TEST-"));
1355        assert!(!algo.on_order_called);
1356        assert!(algo.last_order_client_id.is_none());
1357    }
1358
1359    #[rstest]
1360    fn test_algorithm_registration() {
1361        let mut algo = create_test_algorithm();
1362        register_algorithm(&mut algo);
1363
1364        assert!(algo.core.trader_id().is_some());
1365        assert_eq!(algo.core.trader_id(), Some(TraderId::from("TRADER-001")));
1366    }
1367
1368    #[rstest]
1369    fn test_algorithm_id() {
1370        let mut algo = create_test_algorithm();
1371        assert!(algo.id().inner().starts_with("TEST-"));
1372    }
1373
1374    #[rstest]
1375    fn test_algorithm_spawn_market_creates_valid_order() {
1376        let mut algo = create_test_algorithm();
1377        register_algorithm(&mut algo);
1378
1379        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1380        let mut primary = OrderAny::Market(MarketOrder::new(
1381            TraderId::from("TRADER-001"),
1382            StrategyId::from("STRAT-001"),
1383            instrument_id,
1384            ClientOrderId::from("O-001"),
1385            OrderSide::Buy,
1386            Quantity::from("1.0"),
1387            TimeInForce::Gtc,
1388            UUID4::new(),
1389            0.into(),
1390            false, // reduce_only
1391            false, // quote_quantity
1392            None,  // contingency_type
1393            None,  // order_list_id
1394            None,  // linked_order_ids
1395            None,  // parent_order_id
1396            None,  // exec_algorithm_id
1397            None,  // exec_algorithm_params
1398            None,  // exec_spawn_id
1399            None,  // tags
1400        ));
1401
1402        let spawned = algo.spawn_market(
1403            &mut primary,
1404            Quantity::from("0.5"),
1405            TimeInForce::Ioc,
1406            false,
1407            None,  // tags
1408            false, // reduce_primary
1409        );
1410
1411        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1412        assert_eq!(spawned.instrument_id, instrument_id);
1413        assert_eq!(spawned.order_side(), OrderSide::Buy);
1414        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1415        assert_eq!(spawned.time_in_force, TimeInForce::Ioc);
1416        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1417        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1418    }
1419
1420    #[rstest]
1421    fn test_algorithm_spawn_increments_sequence() {
1422        let mut algo = create_test_algorithm();
1423        register_algorithm(&mut algo);
1424
1425        let mut primary = OrderAny::Market(MarketOrder::new(
1426            TraderId::from("TRADER-001"),
1427            StrategyId::from("STRAT-001"),
1428            InstrumentId::from("BTC/USDT.BINANCE"),
1429            ClientOrderId::from("O-001"),
1430            OrderSide::Buy,
1431            Quantity::from("1.0"),
1432            TimeInForce::Gtc,
1433            UUID4::new(),
1434            0.into(),
1435            false,
1436            false,
1437            None,
1438            None,
1439            None,
1440            None,
1441            None,
1442            None,
1443            None,
1444            None,
1445        ));
1446
1447        let spawned1 = algo.spawn_market(
1448            &mut primary,
1449            Quantity::from("0.25"),
1450            TimeInForce::Ioc,
1451            false,
1452            None,
1453            false,
1454        );
1455        let spawned2 = algo.spawn_market(
1456            &mut primary,
1457            Quantity::from("0.25"),
1458            TimeInForce::Ioc,
1459            false,
1460            None,
1461            false,
1462        );
1463        let spawned3 = algo.spawn_market(
1464            &mut primary,
1465            Quantity::from("0.25"),
1466            TimeInForce::Ioc,
1467            false,
1468            None,
1469            false,
1470        );
1471
1472        assert_eq!(spawned1.client_order_id.as_str(), "O-001-E1");
1473        assert_eq!(spawned2.client_order_id.as_str(), "O-001-E2");
1474        assert_eq!(spawned3.client_order_id.as_str(), "O-001-E3");
1475    }
1476
1477    #[rstest]
1478    fn test_algorithm_default_handlers_do_not_panic() {
1479        let mut algo = create_test_algorithm();
1480
1481        algo.on_order_initialized(OrderInitialized::default());
1482        algo.on_order_denied(OrderDenied::default());
1483        algo.on_order_emulated(OrderEmulated::default());
1484        algo.on_order_released(OrderReleased::default());
1485        algo.on_order_submitted(OrderSubmitted::default());
1486        algo.on_order_rejected(OrderRejected::default());
1487        algo.on_order_accepted(OrderAccepted::default());
1488        algo.on_algo_order_canceled(OrderCanceled::default());
1489        algo.on_order_expired(OrderExpired::default());
1490        algo.on_order_triggered(OrderTriggered::default());
1491        algo.on_order_pending_update(OrderPendingUpdate::default());
1492        algo.on_order_pending_cancel(OrderPendingCancel::default());
1493        algo.on_order_modify_rejected(OrderModifyRejected::default());
1494        algo.on_order_cancel_rejected(OrderCancelRejected::default());
1495        algo.on_order_updated(OrderUpdated::default());
1496        algo.on_algo_order_filled(OrderFilledSpec::builder().build());
1497    }
1498
1499    #[rstest]
1500    fn test_strategy_subscription_tracking() {
1501        let mut algo = create_test_algorithm();
1502        let strategy_id = StrategyId::from("STRAT-001");
1503
1504        assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1505
1506        algo.subscribe_to_strategy_events(strategy_id);
1507        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1508
1509        // Second call should be idempotent
1510        algo.subscribe_to_strategy_events(strategy_id);
1511        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1512    }
1513
1514    #[rstest]
1515    fn test_algorithm_reset() {
1516        let mut algo = create_test_algorithm();
1517        let strategy_id = StrategyId::from("STRAT-001");
1518        let primary_id = ClientOrderId::new("O-001");
1519
1520        let _ = algo.core.spawn_client_order_id(&primary_id);
1521        algo.core.add_subscribed_strategy(strategy_id);
1522
1523        assert!(algo.core.spawn_sequence(&primary_id).is_some());
1524        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1525
1526        ExecutionAlgorithm::on_reset(&mut algo).unwrap();
1527
1528        assert!(algo.core.spawn_sequence(&primary_id).is_none());
1529        assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1530    }
1531
1532    #[rstest]
1533    fn test_algorithm_spawn_limit_creates_valid_order() {
1534        let mut algo = create_test_algorithm();
1535        register_algorithm(&mut algo);
1536
1537        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1538        let mut primary = OrderAny::Market(MarketOrder::new(
1539            TraderId::from("TRADER-001"),
1540            StrategyId::from("STRAT-001"),
1541            instrument_id,
1542            ClientOrderId::from("O-001"),
1543            OrderSide::Buy,
1544            Quantity::from("1.0"),
1545            TimeInForce::Gtc,
1546            UUID4::new(),
1547            0.into(),
1548            false,
1549            false,
1550            None,
1551            None,
1552            None,
1553            None,
1554            None,
1555            None,
1556            None,
1557            None,
1558        ));
1559
1560        let price = Price::from("50000.0");
1561        let spawned = algo.spawn_limit(
1562            &mut primary,
1563            Quantity::from("0.5"),
1564            price,
1565            TimeInForce::Gtc,
1566            None,  // expire_time
1567            false, // post_only
1568            false, // reduce_only
1569            None,  // display_qty
1570            None,  // emulation_trigger
1571            None,  // tags
1572            false, // reduce_primary
1573        );
1574
1575        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1576        assert_eq!(spawned.instrument_id, instrument_id);
1577        assert_eq!(spawned.order_side(), OrderSide::Buy);
1578        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1579        assert_eq!(spawned.price, price);
1580        assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1581        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1582        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1583    }
1584
1585    #[rstest]
1586    fn test_algorithm_spawn_market_to_limit_creates_valid_order() {
1587        let mut algo = create_test_algorithm();
1588        register_algorithm(&mut algo);
1589
1590        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1591        let mut primary = OrderAny::Market(MarketOrder::new(
1592            TraderId::from("TRADER-001"),
1593            StrategyId::from("STRAT-001"),
1594            instrument_id,
1595            ClientOrderId::from("O-001"),
1596            OrderSide::Buy,
1597            Quantity::from("1.0"),
1598            TimeInForce::Gtc,
1599            UUID4::new(),
1600            0.into(),
1601            false,
1602            false,
1603            None,
1604            None,
1605            None,
1606            None,
1607            None,
1608            None,
1609            None,
1610            None,
1611        ));
1612
1613        let spawned = algo.spawn_market_to_limit(
1614            &mut primary,
1615            Quantity::from("0.5"),
1616            TimeInForce::Gtc,
1617            None,  // expire_time
1618            false, // reduce_only
1619            None,  // display_qty
1620            None,  // emulation_trigger
1621            None,  // tags
1622            false, // reduce_primary
1623        );
1624
1625        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1626        assert_eq!(spawned.instrument_id, instrument_id);
1627        assert_eq!(spawned.order_side(), OrderSide::Buy);
1628        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1629        assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1630        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1631        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1632    }
1633
1634    #[rstest]
1635    fn test_algorithm_spawn_market_with_tags() {
1636        let mut algo = create_test_algorithm();
1637        register_algorithm(&mut algo);
1638
1639        let mut primary = OrderAny::Market(MarketOrder::new(
1640            TraderId::from("TRADER-001"),
1641            StrategyId::from("STRAT-001"),
1642            InstrumentId::from("BTC/USDT.BINANCE"),
1643            ClientOrderId::from("O-001"),
1644            OrderSide::Buy,
1645            Quantity::from("1.0"),
1646            TimeInForce::Gtc,
1647            UUID4::new(),
1648            0.into(),
1649            false,
1650            false,
1651            None,
1652            None,
1653            None,
1654            None,
1655            None,
1656            None,
1657            None,
1658            None,
1659        ));
1660
1661        let tags = vec![ustr::Ustr::from("TAG1"), ustr::Ustr::from("TAG2")];
1662        let spawned = algo.spawn_market(
1663            &mut primary,
1664            Quantity::from("0.5"),
1665            TimeInForce::Ioc,
1666            false,
1667            Some(tags.clone()),
1668            false,
1669        );
1670
1671        assert_eq!(spawned.tags, Some(tags));
1672    }
1673
1674    #[rstest]
1675    fn test_algorithm_spawn_propagates_primary_fields() {
1676        let mut algo = create_test_algorithm();
1677        register_algorithm(&mut algo);
1678
1679        let mut params = indexmap::IndexMap::new();
1680        params.insert(ustr::Ustr::from("horizon_secs"), ustr::Ustr::from("30"));
1681        params.insert(ustr::Ustr::from("interval_secs"), ustr::Ustr::from("10"));
1682        let primary_tags = vec![ustr::Ustr::from("PRIMARY_TAG")];
1683        let linked_order_ids = vec![ClientOrderId::from("LINK-1")];
1684
1685        let mut primary = OrderAny::Market(MarketOrder::new(
1686            TraderId::from("TRADER-001"),
1687            StrategyId::from("STRAT-001"),
1688            InstrumentId::from("BTC/USDT.BINANCE"),
1689            ClientOrderId::from("O-001"),
1690            OrderSide::Buy,
1691            Quantity::from("1.0"),
1692            TimeInForce::Gtc,
1693            UUID4::new(),
1694            0.into(),
1695            false, // reduce_only
1696            true,  // quote_quantity
1697            None,  // contingency_type
1698            None,  // order_list_id
1699            Some(linked_order_ids.clone()),
1700            None, // parent_order_id
1701            Some(algo.id()),
1702            Some(params.clone()),
1703            None, // exec_spawn_id
1704            Some(primary_tags.clone()),
1705        ));
1706
1707        let spawned_market = algo.spawn_market(
1708            &mut primary,
1709            Quantity::from("0.25"),
1710            TimeInForce::Ioc,
1711            false,
1712            None, // falls back to primary.tags
1713            false,
1714        );
1715        assert!(spawned_market.is_quote_quantity);
1716        assert_eq!(spawned_market.exec_algorithm_params, Some(params.clone()));
1717        assert_eq!(spawned_market.tags, Some(primary_tags.clone()));
1718        assert_eq!(
1719            spawned_market.linked_order_ids,
1720            Some(linked_order_ids.clone())
1721        );
1722
1723        let spawned_limit = algo.spawn_limit(
1724            &mut primary,
1725            Quantity::from("0.25"),
1726            Price::from("50000.0"),
1727            TimeInForce::Gtc,
1728            None,  // expire_time
1729            false, // post_only
1730            false, // reduce_only
1731            None,  // display_qty
1732            None,  // emulation_trigger
1733            None,  // falls back to primary.tags
1734            false,
1735        );
1736        assert!(spawned_limit.is_quote_quantity);
1737        assert_eq!(spawned_limit.exec_algorithm_params, Some(params.clone()));
1738        assert_eq!(spawned_limit.tags, Some(primary_tags.clone()));
1739        assert_eq!(
1740            spawned_limit.linked_order_ids,
1741            Some(linked_order_ids.clone())
1742        );
1743
1744        let spawned_mtl = algo.spawn_market_to_limit(
1745            &mut primary,
1746            Quantity::from("0.25"),
1747            TimeInForce::Gtc,
1748            None,  // expire_time
1749            false, // reduce_only
1750            None,  // display_qty
1751            None,  // emulation_trigger
1752            None,  // falls back to primary.tags
1753            false,
1754        );
1755        assert!(spawned_mtl.is_quote_quantity);
1756        assert_eq!(spawned_mtl.exec_algorithm_params, Some(params));
1757        assert_eq!(spawned_mtl.tags, Some(primary_tags));
1758        assert_eq!(spawned_mtl.linked_order_ids, Some(linked_order_ids));
1759    }
1760
1761    #[rstest]
1762    fn test_algorithm_reduce_primary_order() {
1763        let mut algo = create_test_algorithm();
1764        register_algorithm(&mut algo);
1765
1766        let order = OrderAny::Market(MarketOrder::new(
1767            TraderId::from("TRADER-001"),
1768            StrategyId::from("STRAT-001"),
1769            InstrumentId::from("BTC/USDT.BINANCE"),
1770            ClientOrderId::from("O-001"),
1771            OrderSide::Buy,
1772            Quantity::from("1.0"),
1773            TimeInForce::Gtc,
1774            UUID4::new(),
1775            0.into(),
1776            false,
1777            false,
1778            None,
1779            None,
1780            None,
1781            None,
1782            None,
1783            None,
1784            None,
1785            None,
1786        ));
1787
1788        // Make accepted so OrderUpdated can be applied
1789        let mut primary = TestOrderStubs::make_accepted_order(&order);
1790
1791        {
1792            let cache_rc = algo.core.cache_rc();
1793            let mut cache = cache_rc.borrow_mut();
1794            cache.add_order(primary.clone(), None, None, false).unwrap();
1795        }
1796
1797        let spawn_qty = Quantity::from("0.3");
1798        algo.reduce_primary_order(&mut primary, spawn_qty);
1799
1800        assert_eq!(primary.quantity(), Quantity::from("0.7"));
1801    }
1802
1803    #[rstest]
1804    fn test_algorithm_reduce_primary_order_publishes_updated_event() {
1805        let mut algo = create_test_algorithm();
1806        register_algorithm(&mut algo);
1807
1808        let strategy_id = StrategyId::from("STRAT-ALGO-REDUCE-PUBLISH");
1809        let order = OrderAny::Market(MarketOrder::new(
1810            TraderId::from("TRADER-001"),
1811            strategy_id,
1812            InstrumentId::from("BTC/USDT.BINANCE"),
1813            ClientOrderId::from("O-ALGO-REDUCE"),
1814            OrderSide::Buy,
1815            Quantity::from("1.0"),
1816            TimeInForce::Gtc,
1817            UUID4::new(),
1818            0.into(),
1819            false,
1820            false,
1821            None,
1822            None,
1823            None,
1824            None,
1825            None,
1826            None,
1827            None,
1828            None,
1829        ));
1830        let mut primary = TestOrderStubs::make_accepted_order(&order);
1831
1832        {
1833            let cache_rc = algo.core.cache_rc();
1834            let mut cache = cache_rc.borrow_mut();
1835            cache.add_order(primary.clone(), None, None, false).unwrap();
1836        }
1837
1838        let (handler, events) = subscribe_order_topic(strategy_id);
1839
1840        algo.reduce_primary_order(&mut primary, Quantity::from("0.3"));
1841
1842        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1843        let events = events.borrow();
1844
1845        assert_eq!(events.len(), 1);
1846        assert!(matches!(
1847            &events[0],
1848            OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
1849        ));
1850    }
1851
1852    #[rstest]
1853    fn test_algorithm_submit_order_publishes_initialized_for_new_order() {
1854        let mut algo = create_test_algorithm();
1855        register_algorithm(&mut algo);
1856
1857        let strategy_id = StrategyId::from("STRAT-ALGO-INIT-PUBLISH");
1858        let order = OrderAny::Market(MarketOrder::new(
1859            TraderId::from("TRADER-001"),
1860            strategy_id,
1861            InstrumentId::from("BTC/USDT.BINANCE"),
1862            ClientOrderId::from("O-ALGO-INIT"),
1863            OrderSide::Buy,
1864            Quantity::from("1.0"),
1865            TimeInForce::Gtc,
1866            UUID4::new(),
1867            0.into(),
1868            false,
1869            false,
1870            None,
1871            None,
1872            None,
1873            None,
1874            None,
1875            None,
1876            None,
1877            None,
1878        ));
1879        let (handler, events) = subscribe_order_topic(strategy_id);
1880
1881        algo.submit_order(order.clone(), None, None).unwrap();
1882
1883        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1884        let events = events.borrow();
1885
1886        assert_eq!(events.len(), 1);
1887        assert!(matches!(
1888            &events[0],
1889            OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
1890        ));
1891    }
1892
1893    #[rstest]
1894    fn test_algorithm_submit_order_does_not_republish_initialized_for_existing_order() {
1895        let mut algo = create_test_algorithm();
1896        register_algorithm(&mut algo);
1897
1898        let strategy_id = StrategyId::from("STRAT-ALGO-INIT-EXISTING");
1899        let order = OrderAny::Market(MarketOrder::new(
1900            TraderId::from("TRADER-001"),
1901            strategy_id,
1902            InstrumentId::from("BTC/USDT.BINANCE"),
1903            ClientOrderId::from("O-ALGO-INIT-EXISTING"),
1904            OrderSide::Buy,
1905            Quantity::from("1.0"),
1906            TimeInForce::Gtc,
1907            UUID4::new(),
1908            0.into(),
1909            false,
1910            false,
1911            None,
1912            None,
1913            None,
1914            None,
1915            None,
1916            None,
1917            None,
1918            None,
1919        ));
1920        {
1921            let cache_rc = algo.core.cache_rc();
1922            let mut cache = cache_rc.borrow_mut();
1923            cache.add_order(order.clone(), None, None, true).unwrap();
1924        }
1925        let (handler, events) = subscribe_order_topic(strategy_id);
1926
1927        algo.submit_order(order, None, None).unwrap();
1928
1929        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1930        assert!(events.borrow().is_empty());
1931    }
1932
1933    #[rstest]
1934    fn test_algorithm_spawn_market_with_reduce_primary() {
1935        let mut algo = create_test_algorithm();
1936        register_algorithm(&mut algo);
1937
1938        let order = OrderAny::Market(MarketOrder::new(
1939            TraderId::from("TRADER-001"),
1940            StrategyId::from("STRAT-001"),
1941            InstrumentId::from("BTC/USDT.BINANCE"),
1942            ClientOrderId::from("O-001"),
1943            OrderSide::Buy,
1944            Quantity::from("1.0"),
1945            TimeInForce::Gtc,
1946            UUID4::new(),
1947            0.into(),
1948            false,
1949            false,
1950            None,
1951            None,
1952            None,
1953            None,
1954            None,
1955            None,
1956            None,
1957            None,
1958        ));
1959
1960        // Make accepted so OrderUpdated can be applied
1961        let mut primary = TestOrderStubs::make_accepted_order(&order);
1962
1963        {
1964            let cache_rc = algo.core.cache_rc();
1965            let mut cache = cache_rc.borrow_mut();
1966            cache.add_order(primary.clone(), None, None, false).unwrap();
1967        }
1968
1969        let spawned = algo.spawn_market(
1970            &mut primary,
1971            Quantity::from("0.4"),
1972            TimeInForce::Ioc,
1973            false,
1974            None,
1975            true, // reduce_primary = true
1976        );
1977
1978        assert_eq!(spawned.quantity, Quantity::from("0.4"));
1979        assert_eq!(primary.quantity(), Quantity::from("0.6"));
1980    }
1981
1982    #[rstest]
1983    fn test_algorithm_generate_order_canceled() {
1984        let mut algo = create_test_algorithm();
1985        register_algorithm(&mut algo);
1986
1987        let order = OrderAny::Market(MarketOrder::new(
1988            TraderId::from("TRADER-001"),
1989            StrategyId::from("STRAT-001"),
1990            InstrumentId::from("BTC/USDT.BINANCE"),
1991            ClientOrderId::from("O-001"),
1992            OrderSide::Buy,
1993            Quantity::from("1.0"),
1994            TimeInForce::Gtc,
1995            UUID4::new(),
1996            0.into(),
1997            false,
1998            false,
1999            None,
2000            None,
2001            None,
2002            None,
2003            None,
2004            None,
2005            None,
2006            None,
2007        ));
2008
2009        let event = algo.generate_order_canceled(&order);
2010
2011        assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
2012        assert_eq!(event.strategy_id, StrategyId::from("STRAT-001"));
2013        assert_eq!(event.instrument_id, InstrumentId::from("BTC/USDT.BINANCE"));
2014        assert_eq!(event.client_order_id, ClientOrderId::from("O-001"));
2015    }
2016
2017    #[rstest]
2018    fn test_algorithm_modify_order_in_place_updates_quantity() {
2019        let mut algo = create_test_algorithm();
2020        register_algorithm(&mut algo);
2021
2022        let strategy_id = StrategyId::from("STRAT-ALGO-MODIFY-IN-PLACE");
2023        let mut order = OrderAny::Limit(LimitOrder::new(
2024            TraderId::from("TRADER-001"),
2025            strategy_id,
2026            InstrumentId::from("BTC/USDT.BINANCE"),
2027            ClientOrderId::from("O-001"),
2028            OrderSide::Buy,
2029            Quantity::from("1.0"),
2030            Price::from("50000.0"),
2031            TimeInForce::Gtc,
2032            None,  // expire_time
2033            false, // post_only
2034            false, // reduce_only
2035            false, // quote_quantity
2036            None,  // display_qty
2037            None,  // emulation_trigger
2038            None,  // trigger_instrument_id
2039            None,  // contingency_type
2040            None,  // order_list_id
2041            None,  // linked_order_ids
2042            None,  // parent_order_id
2043            None,  // exec_algorithm_id
2044            None,  // exec_algorithm_params
2045            None,  // exec_spawn_id
2046            None,  // tags
2047            UUID4::new(),
2048            0.into(),
2049        ));
2050
2051        {
2052            let cache_rc = algo.core.cache_rc();
2053            let mut cache = cache_rc.borrow_mut();
2054            cache.add_order(order.clone(), None, None, false).unwrap();
2055        }
2056
2057        let new_qty = Quantity::from("0.5");
2058        let (handler, events) = subscribe_order_topic(strategy_id);
2059
2060        algo.modify_order_in_place(&mut order, Some(new_qty), None, None)
2061            .unwrap();
2062
2063        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2064        let events = events.borrow();
2065
2066        assert_eq!(order.quantity(), new_qty);
2067        assert_eq!(events.len(), 1);
2068        assert!(matches!(
2069            &events[0],
2070            OrderEventAny::Updated(event) if event.quantity == new_qty
2071        ));
2072    }
2073
2074    #[rstest]
2075    fn test_algorithm_modify_order_in_place_rejects_no_changes() {
2076        let mut algo = create_test_algorithm();
2077        register_algorithm(&mut algo);
2078
2079        let mut order = OrderAny::Limit(LimitOrder::new(
2080            TraderId::from("TRADER-001"),
2081            StrategyId::from("STRAT-001"),
2082            InstrumentId::from("BTC/USDT.BINANCE"),
2083            ClientOrderId::from("O-001"),
2084            OrderSide::Buy,
2085            Quantity::from("1.0"),
2086            Price::from("50000.0"),
2087            TimeInForce::Gtc,
2088            None,
2089            false,
2090            false,
2091            false,
2092            None,
2093            None,
2094            None,
2095            None,
2096            None,
2097            None,
2098            None,
2099            None,
2100            None,
2101            None,
2102            None,
2103            UUID4::new(),
2104            0.into(),
2105        ));
2106
2107        // Try to modify with same quantity - should fail
2108        let result =
2109            algo.modify_order_in_place(&mut order, Some(Quantity::from("1.0")), None, None);
2110
2111        assert!(result.is_err());
2112        assert!(
2113            result
2114                .unwrap_err()
2115                .to_string()
2116                .contains("no parameters differ")
2117        );
2118    }
2119
2120    #[rstest]
2121    fn test_spawned_order_denied_restores_primary_quantity() {
2122        let mut algo = create_test_algorithm();
2123        register_algorithm(&mut algo);
2124
2125        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2126        let exec_algorithm_id = algo.id();
2127
2128        let mut primary = OrderAny::Market(MarketOrder::new(
2129            TraderId::from("TRADER-001"),
2130            StrategyId::from("STRAT-001"),
2131            instrument_id,
2132            ClientOrderId::from("O-001"),
2133            OrderSide::Buy,
2134            Quantity::from("1.0"),
2135            TimeInForce::Gtc,
2136            UUID4::new(),
2137            0.into(),
2138            false,
2139            false,
2140            None,
2141            None,
2142            None,
2143            None,
2144            Some(exec_algorithm_id),
2145            None,
2146            None,
2147            None,
2148        ));
2149
2150        {
2151            let cache_rc = algo.core.cache_rc();
2152            let mut cache = cache_rc.borrow_mut();
2153            cache.add_order(primary.clone(), None, None, false).unwrap();
2154        }
2155
2156        let spawned = algo.spawn_market(
2157            &mut primary,
2158            Quantity::from("0.5"),
2159            TimeInForce::Fok,
2160            false,
2161            None,
2162            true,
2163        );
2164
2165        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2166
2167        let spawned_order = OrderAny::Market(spawned);
2168        {
2169            let cache_rc = algo.core.cache_rc();
2170            let mut cache = cache_rc.borrow_mut();
2171            cache
2172                .add_order(spawned_order.clone(), None, None, false)
2173                .unwrap();
2174        }
2175
2176        let denied = OrderDenied::new(
2177            spawned_order.trader_id(),
2178            spawned_order.strategy_id(),
2179            spawned_order.instrument_id(),
2180            spawned_order.client_order_id(),
2181            "TEST_DENIAL".into(),
2182            UUID4::new(),
2183            0.into(),
2184            0.into(),
2185        );
2186
2187        {
2188            let cache_rc = algo.core.cache_rc();
2189            let mut cache = cache_rc.borrow_mut();
2190            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2191        }
2192
2193        algo.handle_order_event(OrderEventAny::Denied(denied));
2194
2195        let restored_primary = {
2196            let cache = algo.core.cache();
2197            cache
2198                .order(&ClientOrderId::from("O-001"))
2199                .map(|o| o.clone())
2200                .unwrap()
2201        };
2202        assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2203    }
2204
2205    #[rstest]
2206    fn test_spawned_order_rejected_restores_primary_quantity() {
2207        let mut algo = create_test_algorithm();
2208        register_algorithm(&mut algo);
2209
2210        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2211        let exec_algorithm_id = algo.id();
2212
2213        let mut primary = OrderAny::Market(MarketOrder::new(
2214            TraderId::from("TRADER-001"),
2215            StrategyId::from("STRAT-001"),
2216            instrument_id,
2217            ClientOrderId::from("O-001"),
2218            OrderSide::Buy,
2219            Quantity::from("1.0"),
2220            TimeInForce::Gtc,
2221            UUID4::new(),
2222            0.into(),
2223            false,
2224            false,
2225            None,
2226            None,
2227            None,
2228            None,
2229            Some(exec_algorithm_id),
2230            None,
2231            None,
2232            None,
2233        ));
2234
2235        {
2236            let cache_rc = algo.core.cache_rc();
2237            let mut cache = cache_rc.borrow_mut();
2238            cache.add_order(primary.clone(), None, None, false).unwrap();
2239        }
2240
2241        let spawned = algo.spawn_market(
2242            &mut primary,
2243            Quantity::from("0.5"),
2244            TimeInForce::Fok,
2245            false,
2246            None,
2247            true,
2248        );
2249
2250        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2251
2252        let spawned_order = OrderAny::Market(spawned);
2253        {
2254            let cache_rc = algo.core.cache_rc();
2255            let mut cache = cache_rc.borrow_mut();
2256            cache
2257                .add_order(spawned_order.clone(), None, None, false)
2258                .unwrap();
2259        }
2260
2261        let rejected = OrderRejected::new(
2262            spawned_order.trader_id(),
2263            spawned_order.strategy_id(),
2264            spawned_order.instrument_id(),
2265            spawned_order.client_order_id(),
2266            AccountId::from("BINANCE-001"),
2267            "TEST_REJECTION".into(),
2268            UUID4::new(),
2269            0.into(),
2270            0.into(),
2271            false,
2272            false,
2273        );
2274
2275        {
2276            let cache_rc = algo.core.cache_rc();
2277            let mut cache = cache_rc.borrow_mut();
2278            cache
2279                .update_order(&OrderEventAny::Rejected(rejected))
2280                .unwrap();
2281        }
2282
2283        algo.handle_order_event(OrderEventAny::Rejected(rejected));
2284
2285        let restored_primary = {
2286            let cache = algo.core.cache();
2287            cache
2288                .order(&ClientOrderId::from("O-001"))
2289                .map(|o| o.clone())
2290                .unwrap()
2291        };
2292        assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2293    }
2294
2295    #[rstest]
2296    fn test_spawned_order_with_reduce_primary_false_does_not_restore() {
2297        let mut algo = create_test_algorithm();
2298        register_algorithm(&mut algo);
2299
2300        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2301        let exec_algorithm_id = algo.id();
2302
2303        let mut primary = OrderAny::Market(MarketOrder::new(
2304            TraderId::from("TRADER-001"),
2305            StrategyId::from("STRAT-001"),
2306            instrument_id,
2307            ClientOrderId::from("O-001"),
2308            OrderSide::Buy,
2309            Quantity::from("1.0"),
2310            TimeInForce::Gtc,
2311            UUID4::new(),
2312            0.into(),
2313            false,
2314            false,
2315            None,
2316            None,
2317            None,
2318            None,
2319            Some(exec_algorithm_id),
2320            None,
2321            None,
2322            None,
2323        ));
2324
2325        {
2326            let cache_rc = algo.core.cache_rc();
2327            let mut cache = cache_rc.borrow_mut();
2328            cache.add_order(primary.clone(), None, None, false).unwrap();
2329        }
2330
2331        let spawned = algo.spawn_market(
2332            &mut primary,
2333            Quantity::from("0.5"),
2334            TimeInForce::Fok,
2335            false,
2336            None,
2337            false,
2338        );
2339
2340        assert_eq!(primary.quantity(), Quantity::from("1.0"));
2341
2342        let spawned_order = OrderAny::Market(spawned);
2343        {
2344            let cache_rc = algo.core.cache_rc();
2345            let mut cache = cache_rc.borrow_mut();
2346            cache
2347                .add_order(spawned_order.clone(), None, None, false)
2348                .unwrap();
2349        }
2350
2351        let denied = OrderDenied::new(
2352            spawned_order.trader_id(),
2353            spawned_order.strategy_id(),
2354            spawned_order.instrument_id(),
2355            spawned_order.client_order_id(),
2356            "TEST_DENIAL".into(),
2357            UUID4::new(),
2358            0.into(),
2359            0.into(),
2360        );
2361
2362        {
2363            let cache_rc = algo.core.cache_rc();
2364            let mut cache = cache_rc.borrow_mut();
2365            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2366        }
2367
2368        algo.handle_order_event(OrderEventAny::Denied(denied));
2369
2370        let final_primary = {
2371            let cache = algo.core.cache();
2372            cache
2373                .order(&ClientOrderId::from("O-001"))
2374                .map(|o| o.clone())
2375                .unwrap()
2376        };
2377        assert_eq!(final_primary.quantity(), Quantity::from("1.0"));
2378    }
2379
2380    #[rstest]
2381    fn test_multiple_spawns_with_one_denied_restores_correctly() {
2382        let mut algo = create_test_algorithm();
2383        register_algorithm(&mut algo);
2384
2385        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2386        let exec_algorithm_id = algo.id();
2387
2388        let mut primary = OrderAny::Market(MarketOrder::new(
2389            TraderId::from("TRADER-001"),
2390            StrategyId::from("STRAT-001"),
2391            instrument_id,
2392            ClientOrderId::from("O-001"),
2393            OrderSide::Buy,
2394            Quantity::from("1.0"),
2395            TimeInForce::Gtc,
2396            UUID4::new(),
2397            0.into(),
2398            false,
2399            false,
2400            None,
2401            None,
2402            None,
2403            None,
2404            Some(exec_algorithm_id),
2405            None,
2406            None,
2407            None,
2408        ));
2409
2410        {
2411            let cache_rc = algo.core.cache_rc();
2412            let mut cache = cache_rc.borrow_mut();
2413            cache.add_order(primary.clone(), None, None, false).unwrap();
2414        }
2415
2416        let spawned1 = algo.spawn_market(
2417            &mut primary,
2418            Quantity::from("0.3"),
2419            TimeInForce::Fok,
2420            false,
2421            None,
2422            true,
2423        );
2424        let spawned2 = algo.spawn_market(
2425            &mut primary,
2426            Quantity::from("0.4"),
2427            TimeInForce::Fok,
2428            false,
2429            None,
2430            true,
2431        );
2432        assert_eq!(primary.quantity(), Quantity::from("0.3"));
2433
2434        let spawned_order1 = OrderAny::Market(spawned1);
2435        let spawned_order2 = OrderAny::Market(spawned2);
2436        {
2437            let cache_rc = algo.core.cache_rc();
2438            let mut cache = cache_rc.borrow_mut();
2439            cache.add_order(spawned_order1, None, None, false).unwrap();
2440            cache
2441                .add_order(spawned_order2.clone(), None, None, false)
2442                .unwrap();
2443        }
2444
2445        let denied = OrderDenied::new(
2446            spawned_order2.trader_id(),
2447            spawned_order2.strategy_id(),
2448            spawned_order2.instrument_id(),
2449            spawned_order2.client_order_id(),
2450            "TEST_DENIAL".into(),
2451            UUID4::new(),
2452            0.into(),
2453            0.into(),
2454        );
2455
2456        {
2457            let cache_rc = algo.core.cache_rc();
2458            let mut cache = cache_rc.borrow_mut();
2459            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2460        }
2461
2462        let (handler, events) = subscribe_order_topic(spawned_order2.strategy_id());
2463
2464        algo.handle_order_event(OrderEventAny::Denied(denied));
2465
2466        msgbus::unsubscribe_order_events(
2467            format!("events.order.{}", spawned_order2.strategy_id()).into(),
2468            &handler,
2469        );
2470        let events = events.borrow();
2471
2472        let restored_primary = {
2473            let cache = algo.core.cache();
2474            cache
2475                .order(&ClientOrderId::from("O-001"))
2476                .map(|o| o.clone())
2477                .unwrap()
2478        };
2479        assert_eq!(restored_primary.quantity(), Quantity::from("0.7"));
2480        assert_eq!(events.len(), 1);
2481        assert!(matches!(
2482            &events[0],
2483            OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
2484        ));
2485    }
2486
2487    #[rstest]
2488    fn test_spawned_order_accepted_prevents_restoration() {
2489        let mut algo = create_test_algorithm();
2490        register_algorithm(&mut algo);
2491
2492        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2493        let exec_algorithm_id = algo.id();
2494
2495        let mut primary = OrderAny::Market(MarketOrder::new(
2496            TraderId::from("TRADER-001"),
2497            StrategyId::from("STRAT-001"),
2498            instrument_id,
2499            ClientOrderId::from("O-001"),
2500            OrderSide::Buy,
2501            Quantity::from("1.0"),
2502            TimeInForce::Gtc,
2503            UUID4::new(),
2504            0.into(),
2505            false,
2506            false,
2507            None,
2508            None,
2509            None,
2510            None,
2511            Some(exec_algorithm_id),
2512            None,
2513            None,
2514            None,
2515        ));
2516
2517        {
2518            let cache_rc = algo.core.cache_rc();
2519            let mut cache = cache_rc.borrow_mut();
2520            cache.add_order(primary.clone(), None, None, false).unwrap();
2521        }
2522
2523        let spawned = algo.spawn_market(
2524            &mut primary,
2525            Quantity::from("0.5"),
2526            TimeInForce::Fok,
2527            false,
2528            None,
2529            true,
2530        );
2531
2532        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2533
2534        let mut spawned_order = OrderAny::Market(spawned);
2535        {
2536            let cache_rc = algo.core.cache_rc();
2537            let mut cache = cache_rc.borrow_mut();
2538            cache
2539                .add_order(spawned_order.clone(), None, None, false)
2540                .unwrap();
2541        }
2542
2543        let accepted = OrderAccepted::new(
2544            spawned_order.trader_id(),
2545            spawned_order.strategy_id(),
2546            spawned_order.instrument_id(),
2547            spawned_order.client_order_id(),
2548            VenueOrderId::from("V-123"),
2549            AccountId::from("BINANCE-001"),
2550            UUID4::new(),
2551            0.into(),
2552            0.into(),
2553            false,
2554        );
2555
2556        {
2557            let cache_rc = algo.core.cache_rc();
2558            let mut cache = cache_rc.borrow_mut();
2559            spawned_order = cache
2560                .update_order(&OrderEventAny::Accepted(accepted))
2561                .unwrap();
2562        }
2563
2564        algo.handle_order_event(OrderEventAny::Accepted(accepted));
2565
2566        let primary_after_accept = {
2567            let cache = algo.core.cache();
2568            cache
2569                .order(&ClientOrderId::from("O-001"))
2570                .map(|o| o.clone())
2571                .unwrap()
2572        };
2573        assert_eq!(primary_after_accept.quantity(), Quantity::from("0.5"));
2574
2575        // Cancel after acceptance - no restoration should occur
2576        let canceled = OrderCanceled::new(
2577            spawned_order.trader_id(),
2578            spawned_order.strategy_id(),
2579            spawned_order.instrument_id(),
2580            spawned_order.client_order_id(),
2581            UUID4::new(),
2582            0.into(),
2583            0.into(),
2584            false,
2585            Some(VenueOrderId::from("V-123")),
2586            Some(AccountId::from("BINANCE-001")),
2587        );
2588
2589        {
2590            let cache_rc = algo.core.cache_rc();
2591            let mut cache = cache_rc.borrow_mut();
2592            cache
2593                .update_order(&OrderEventAny::Canceled(canceled))
2594                .unwrap();
2595        }
2596
2597        algo.handle_order_event(OrderEventAny::Canceled(canceled));
2598
2599        let final_primary = {
2600            let cache = algo.core.cache();
2601            cache
2602                .order(&ClientOrderId::from("O-001"))
2603                .map(|o| o.clone())
2604                .unwrap()
2605        };
2606        assert_eq!(final_primary.quantity(), Quantity::from("0.5"));
2607    }
2608
2609    #[rstest]
2610    #[should_panic(expected = "exceeds primary leaves_qty")]
2611    fn test_spawn_quantity_exceeds_leaves_qty_panics() {
2612        let mut algo = create_test_algorithm();
2613        register_algorithm(&mut algo);
2614
2615        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2616        let exec_algorithm_id = algo.id();
2617
2618        let mut primary = OrderAny::Market(MarketOrder::new(
2619            TraderId::from("TRADER-001"),
2620            StrategyId::from("STRAT-001"),
2621            instrument_id,
2622            ClientOrderId::from("O-001"),
2623            OrderSide::Buy,
2624            Quantity::from("1.0"),
2625            TimeInForce::Gtc,
2626            UUID4::new(),
2627            0.into(),
2628            false,
2629            false,
2630            None,
2631            None,
2632            None,
2633            None,
2634            Some(exec_algorithm_id),
2635            None,
2636            None,
2637            None,
2638        ));
2639
2640        {
2641            let cache_rc = algo.core.cache_rc();
2642            let mut cache = cache_rc.borrow_mut();
2643            cache.add_order(primary.clone(), None, None, false).unwrap();
2644        }
2645
2646        let _ = algo.spawn_market(
2647            &mut primary,
2648            Quantity::from("0.8"),
2649            TimeInForce::Fok,
2650            false,
2651            None,
2652            true,
2653        );
2654
2655        assert_eq!(primary.quantity(), Quantity::from("0.2"));
2656        assert_eq!(primary.leaves_qty(), Quantity::from("0.2"));
2657
2658        // Should panic - spawning 0.5 when only 0.2 leaves_qty remains
2659        let _ = algo.spawn_market(
2660            &mut primary,
2661            Quantity::from("0.5"),
2662            TimeInForce::Fok,
2663            false,
2664            None,
2665            true,
2666        );
2667    }
2668}