Skip to main content

nautilus_trading/algorithm/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Execution algorithm infrastructure for order slicing and execution optimization.
17//!
18//! This module provides the [`ExecutionAlgorithm`] trait and supporting infrastructure
19//! for implementing algorithms like TWAP (Time-Weighted Average Price) and VWAP
20//! (Volume-Weighted Average Price) that slice large orders into smaller child orders.
21//!
22//! # Architecture
23//!
24//! Execution algorithms extend [`DataActor`] (not [`Strategy`](super::Strategy)) because:
25//! - They don't own positions (the parent Strategy does).
26//! - Spawned orders carry the parent Strategy's ID, not the algorithm's ID.
27//! - They act as order processors/transformers, not position managers.
28//!
29//! # Order Flow
30//!
31//! 1. A Strategy submits an order with `exec_algorithm_id` set.
32//! 2. The order is routed to the algorithm's `{id}.execute` endpoint.
33//! 3. The algorithm receives the order via `on_order()`.
34//! 4. The algorithm spawns child orders using `spawn_market()`, `spawn_limit()`, etc.
35//! 5. Spawned orders are submitted through the RiskEngine.
36//! 6. The algorithm receives fill events and manages remaining quantity.
37
38pub mod config;
39pub mod core;
40pub mod twap;
41
42pub use core::{ExecutionAlgorithmCore, StrategyEventHandlers};
43
44pub use config::{ExecutionAlgorithmConfig, ImportableExecAlgorithmConfig};
45use nautilus_common::{
46    actor::{DataActor, registry::try_get_actor_unchecked},
47    enums::ComponentState,
48    logging::{CMD, EVT, RECV, SEND},
49    messages::execution::{CancelOrder, ModifyOrder, SubmitOrder, TradingCommand},
50    msgbus::{self, MessagingSwitchboard, TypedHandler},
51    timer::TimeEvent,
52};
53use nautilus_core::{UUID4, UnixNanos};
54use nautilus_model::{
55    enums::{OrderStatus, TimeInForce, TriggerType},
56    events::{
57        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderDenied, OrderEmulated,
58        OrderEventAny, OrderExpired, OrderFilled, OrderInitialized, OrderModifyRejected,
59        OrderPendingCancel, OrderPendingUpdate, OrderRejected, OrderReleased, OrderSubmitted,
60        OrderTriggered, OrderUpdated, PositionChanged, PositionClosed, PositionEvent,
61        PositionOpened,
62    },
63    identifiers::{AccountId, ClientId, ExecAlgorithmId, PositionId, StrategyId, TraderId},
64    orders::{LimitOrder, MarketOrder, MarketToLimitOrder, Order, OrderAny, OrderError, OrderList},
65    types::{Price, Quantity},
66};
67pub use twap::{TwapAlgorithm, TwapAlgorithmConfig};
68use ustr::Ustr;
69
70/// Core trait for implementing execution algorithms in NautilusTrader.
71///
72/// Execution algorithms are specialized [`DataActor`]s that receive orders from strategies
73/// and execute them by spawning child orders. They are used for order slicing algorithms
74/// like TWAP and VWAP.
75///
76/// # Key Capabilities
77///
78/// - All [`DataActor`] capabilities (data subscriptions, event handling, timers)
79/// - Order spawning (market, limit, market-to-limit)
80/// - Order lifecycle management (submit, modify, cancel)
81/// - Event filtering for algorithm-owned orders
82///
83/// # Implementation
84///
85/// User algorithms should implement the required methods and hold an
86/// [`ExecutionAlgorithmCore`] member. The struct should `Deref` and `DerefMut`
87/// to `ExecutionAlgorithmCore` (which itself derefs to `DataActorCore`).
88pub trait ExecutionAlgorithm: DataActor {
    /// Provides mutable access to the internal `ExecutionAlgorithmCore`.
    ///
    /// This is a required method. The provided methods of this trait go through
    /// the returned core to reach the algorithm ID, configuration, clock and cache.
    fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore;
91
92    /// Returns the execution algorithm ID.
93    fn id(&mut self) -> ExecAlgorithmId {
94        self.core_mut().exec_algorithm_id
95    }
96
97    /// Executes a trading command.
98    ///
99    /// This is the main entry point for commands routed to the algorithm.
100    /// Dispatches to the appropriate handler based on command type.
101    ///
102    /// Commands are only processed when the algorithm is in `Running` state.
103    ///
104    /// # Errors
105    ///
106    /// Returns an error if command handling fails.
107    fn execute(&mut self, command: TradingCommand) -> anyhow::Result<()>
108    where
109        Self: 'static + std::fmt::Debug + Sized,
110    {
111        let core = self.core_mut();
112        if core.config.log_commands {
113            let id = &core.actor.actor_id;
114            log::info!("{id} {RECV}{CMD} {command:?}");
115        }
116
117        if core.state() != ComponentState::Running {
118            return Ok(());
119        }
120
121        match command {
122            TradingCommand::SubmitOrder(cmd) => {
123                self.subscribe_to_strategy_events(cmd.strategy_id);
124                let order = self.core_mut().get_order(&cmd.client_order_id)?;
125                self.on_order(order)
126            }
127            TradingCommand::SubmitOrderList(cmd) => {
128                self.subscribe_to_strategy_events(cmd.strategy_id);
129                let orders = self.core_mut().get_orders_for_list(&cmd.order_list)?;
130                self.on_order_list(cmd.order_list, orders)
131            }
132            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(cmd),
133            _ => {
134                log::warn!("Unhandled command type: {command:?}");
135                Ok(())
136            }
137        }
138    }
139
    /// Called when a primary order is received for execution.
    ///
    /// Implement this method to provide the algorithm's order slicing logic.
    /// It is invoked by `execute()` for `SubmitOrder` commands, and once per
    /// order by the default `on_order_list()` implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if order handling fails.
    fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()>;
148
149    /// Called when an order list is received for execution.
150    ///
151    /// Override this method to handle order lists. The default implementation
152    /// processes each order individually.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if order list handling fails.
157    fn on_order_list(
158        &mut self,
159        _order_list: OrderList,
160        orders: Vec<OrderAny>,
161    ) -> anyhow::Result<()> {
162        for order in orders {
163            self.on_order(order)?;
164        }
165        Ok(())
166    }
167
168    /// Handles a cancel order command for algorithm-managed orders.
169    ///
170    /// This generates an internal cancel event and publishes it. The order
171    /// is canceled locally without sending a command to the execution engine.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if cancellation fails.
176    fn handle_cancel_order(&mut self, command: CancelOrder) -> anyhow::Result<()> {
177        let (order, is_pending_cancel) = {
178            let cache = self.core_mut().cache();
179
180            let Some(order) = cache.order(&command.client_order_id) else {
181                log::warn!(
182                    "Cannot cancel order: {} not found in cache",
183                    command.client_order_id
184                );
185                return Ok(());
186            };
187
188            let is_pending = cache.is_order_pending_cancel_local(&command.client_order_id);
189            (order.clone(), is_pending)
190        };
191
192        if is_pending_cancel {
193            return Ok(());
194        }
195
196        if order.is_closed() {
197            log::warn!("Order already closed for {command:?}");
198            return Ok(());
199        }
200
201        let event = OrderEventAny::Canceled(self.generate_order_canceled(&order));
202
203        let order = {
204            let cache_rc = self.core_mut().cache_rc();
205            let mut cache = cache_rc.borrow_mut();
206            match cache.update_order(&event) {
207                Ok(order) => order,
208                Err(e)
209                    if matches!(
210                        e.downcast_ref::<OrderError>(),
211                        Some(OrderError::InvalidStateTransition)
212                    ) =>
213                {
214                    log::warn!("InvalidStateTrigger: {e}, did not apply cancel event");
215                    return Ok(());
216                }
217                Err(e) => return Err(e),
218            }
219        };
220
221        let topic = format!("events.order.{}", order.strategy_id());
222        msgbus::publish_order_event(topic.into(), &event);
223
224        Ok(())
225    }
226
227    /// Generates an OrderCanceled event for an order.
228    fn generate_order_canceled(&mut self, order: &OrderAny) -> OrderCanceled {
229        let ts_now = self.core_mut().clock().timestamp_ns();
230
231        OrderCanceled::new(
232            order.trader_id(),
233            order.strategy_id(),
234            order.instrument_id(),
235            order.client_order_id(),
236            UUID4::new(),
237            ts_now,
238            ts_now,
239            false, // reconciliation
240            order.venue_order_id(),
241            order.account_id(),
242        )
243    }
244
245    /// Generates an OrderPendingUpdate event for an order.
246    fn generate_order_pending_update(&mut self, order: &OrderAny) -> OrderPendingUpdate {
247        let ts_now = self.core_mut().clock().timestamp_ns();
248
249        OrderPendingUpdate::new(
250            order.trader_id(),
251            order.strategy_id(),
252            order.instrument_id(),
253            order.client_order_id(),
254            order
255                .account_id()
256                .expect("Order must have account_id for pending update"),
257            UUID4::new(),
258            ts_now,
259            ts_now,
260            false, // reconciliation
261            order.venue_order_id(),
262        )
263    }
264
265    /// Generates an OrderPendingCancel event for an order.
266    fn generate_order_pending_cancel(&mut self, order: &OrderAny) -> OrderPendingCancel {
267        let ts_now = self.core_mut().clock().timestamp_ns();
268
269        OrderPendingCancel::new(
270            order.trader_id(),
271            order.strategy_id(),
272            order.instrument_id(),
273            order.client_order_id(),
274            order
275                .account_id()
276                .expect("Order must have account_id for pending cancel"),
277            UUID4::new(),
278            ts_now,
279            ts_now,
280            false, // reconciliation
281            order.venue_order_id(),
282        )
283    }
284
285    /// Spawns a market order from a primary order.
286    ///
287    /// Creates a new market order with:
288    /// - A unique client order ID: `{primary_id}-E{sequence}`.
289    /// - The primary order's trader ID, strategy ID, and instrument ID.
290    /// - The algorithm's exec_algorithm_id.
291    /// - exec_spawn_id set to the primary order's client order ID.
292    ///
293    /// If `reduce_primary` is true, the primary order's quantity will be reduced
294    /// by the spawned quantity. If the spawned order is subsequently denied or
295    /// rejected (before acceptance), the deducted quantity is automatically
296    /// restored to the primary order.
297    fn spawn_market(
298        &mut self,
299        primary: &mut OrderAny,
300        quantity: Quantity,
301        time_in_force: TimeInForce,
302        reduce_only: bool,
303        tags: Option<Vec<Ustr>>,
304        reduce_primary: bool,
305    ) -> MarketOrder {
306        // Generate spawn ID first so we can track the reduction
307        let core = self.core_mut();
308        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
309        let ts_init = core.clock().timestamp_ns();
310        let exec_algorithm_id = core.exec_algorithm_id;
311
312        if reduce_primary {
313            self.reduce_primary_order(primary, quantity);
314            self.core_mut()
315                .track_pending_spawn_reduction(client_order_id, quantity);
316        }
317
318        MarketOrder::new(
319            primary.trader_id(),
320            primary.strategy_id(),
321            primary.instrument_id(),
322            client_order_id,
323            primary.order_side(),
324            quantity,
325            time_in_force,
326            UUID4::new(),
327            ts_init,
328            reduce_only,
329            primary.is_quote_quantity(),
330            primary.contingency_type(),
331            primary.order_list_id(),
332            primary.linked_order_ids().map(|ids| ids.to_vec()),
333            primary.parent_order_id(),
334            Some(exec_algorithm_id),
335            primary.exec_algorithm_params().cloned(),
336            Some(primary.client_order_id()),
337            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
338        )
339    }
340
341    /// Spawns a limit order from a primary order.
342    ///
343    /// Creates a new limit order with:
344    /// - A unique client order ID: `{primary_id}-E{sequence}`
345    /// - The primary order's trader ID, strategy ID, and instrument ID
346    /// - The algorithm's exec_algorithm_id
347    /// - exec_spawn_id set to the primary order's client order ID
348    ///
349    /// If `reduce_primary` is true, the primary order's quantity will be reduced
350    /// by the spawned quantity. If the spawned order is subsequently denied or
351    /// rejected (before acceptance), the deducted quantity is automatically
352    /// restored to the primary order.
353    #[expect(clippy::too_many_arguments)]
354    fn spawn_limit(
355        &mut self,
356        primary: &mut OrderAny,
357        quantity: Quantity,
358        price: Price,
359        time_in_force: TimeInForce,
360        expire_time: Option<UnixNanos>,
361        post_only: bool,
362        reduce_only: bool,
363        display_qty: Option<Quantity>,
364        emulation_trigger: Option<TriggerType>,
365        tags: Option<Vec<Ustr>>,
366        reduce_primary: bool,
367    ) -> LimitOrder {
368        // Generate spawn ID first so we can track the reduction
369        let core = self.core_mut();
370        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
371        let ts_init = core.clock().timestamp_ns();
372        let exec_algorithm_id = core.exec_algorithm_id;
373
374        if reduce_primary {
375            self.reduce_primary_order(primary, quantity);
376            self.core_mut()
377                .track_pending_spawn_reduction(client_order_id, quantity);
378        }
379
380        LimitOrder::new(
381            primary.trader_id(),
382            primary.strategy_id(),
383            primary.instrument_id(),
384            client_order_id,
385            primary.order_side(),
386            quantity,
387            price,
388            time_in_force,
389            expire_time,
390            post_only,
391            reduce_only,
392            primary.is_quote_quantity(),
393            display_qty,
394            emulation_trigger,
395            None, // trigger_instrument_id
396            primary.contingency_type(),
397            primary.order_list_id(),
398            primary.linked_order_ids().map(|ids| ids.to_vec()),
399            primary.parent_order_id(),
400            Some(exec_algorithm_id),
401            primary.exec_algorithm_params().cloned(),
402            Some(primary.client_order_id()),
403            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
404            UUID4::new(),
405            ts_init,
406        )
407    }
408
409    /// Spawns a market-to-limit order from a primary order.
410    ///
411    /// Creates a new market-to-limit order with:
412    /// - A unique client order ID: `{primary_id}-E{sequence}`
413    /// - The primary order's trader ID, strategy ID, and instrument ID
414    /// - The algorithm's exec_algorithm_id
415    /// - exec_spawn_id set to the primary order's client order ID
416    ///
417    /// If `reduce_primary` is true, the primary order's quantity will be reduced
418    /// by the spawned quantity. If the spawned order is subsequently denied or
419    /// rejected (before acceptance), the deducted quantity is automatically
420    /// restored to the primary order.
421    #[expect(clippy::too_many_arguments)]
422    fn spawn_market_to_limit(
423        &mut self,
424        primary: &mut OrderAny,
425        quantity: Quantity,
426        time_in_force: TimeInForce,
427        expire_time: Option<UnixNanos>,
428        reduce_only: bool,
429        display_qty: Option<Quantity>,
430        emulation_trigger: Option<TriggerType>,
431        tags: Option<Vec<Ustr>>,
432        reduce_primary: bool,
433    ) -> MarketToLimitOrder {
434        // Generate spawn ID first so we can track the reduction
435        let core = self.core_mut();
436        let client_order_id = core.spawn_client_order_id(&primary.client_order_id());
437        let ts_init = core.clock().timestamp_ns();
438        let exec_algorithm_id = core.exec_algorithm_id;
439
440        if reduce_primary {
441            self.reduce_primary_order(primary, quantity);
442            self.core_mut()
443                .track_pending_spawn_reduction(client_order_id, quantity);
444        }
445
446        let mut order = MarketToLimitOrder::new(
447            primary.trader_id(),
448            primary.strategy_id(),
449            primary.instrument_id(),
450            client_order_id,
451            primary.order_side(),
452            quantity,
453            time_in_force,
454            expire_time,
455            false, // post_only
456            reduce_only,
457            primary.is_quote_quantity(),
458            display_qty,
459            primary.contingency_type(),
460            primary.order_list_id(),
461            primary.linked_order_ids().map(|ids| ids.to_vec()),
462            primary.parent_order_id(),
463            Some(exec_algorithm_id),
464            primary.exec_algorithm_params().cloned(),
465            Some(primary.client_order_id()),
466            tags.or_else(|| primary.tags().map(|t| t.to_vec())),
467            UUID4::new(),
468            ts_init,
469        );
470
471        if emulation_trigger.is_some() {
472            order.set_emulation_trigger(emulation_trigger);
473        }
474
475        order
476    }
477
478    /// Reduces the primary order's quantity by the spawn quantity.
479    ///
480    /// Generates an `OrderUpdated` event and applies it to the primary order,
481    /// then updates the order in the cache.
482    ///
483    /// # Panics
484    ///
485    /// Panics if `spawn_qty` exceeds the primary order's `leaves_qty`.
486    fn reduce_primary_order(&mut self, primary: &mut OrderAny, spawn_qty: Quantity) {
487        let leaves_qty = primary.leaves_qty();
488        assert!(
489            leaves_qty >= spawn_qty,
490            "Spawn quantity {spawn_qty} exceeds primary leaves_qty {leaves_qty}"
491        );
492
493        let primary_qty = primary.quantity();
494        let new_qty = Quantity::from_raw(primary_qty.raw - spawn_qty.raw, primary_qty.precision);
495
496        let core = self.core_mut();
497        let ts_now = core.clock().timestamp_ns();
498
499        let updated = OrderUpdated::new(
500            primary.trader_id(),
501            primary.strategy_id(),
502            primary.instrument_id(),
503            primary.client_order_id(),
504            new_qty,
505            UUID4::new(),
506            ts_now,
507            ts_now,
508            false, // reconciliation
509            primary.venue_order_id(),
510            primary.account_id(),
511            None, // price
512            None, // trigger_price
513            None, // protection_price
514            primary.is_quote_quantity(),
515        );
516
517        let event = OrderEventAny::Updated(updated);
518
519        {
520            let cache_rc = core.cache_rc();
521            let mut cache = cache_rc.borrow_mut();
522            *primary = cache
523                .update_order(&event)
524                .expect("Failed to update order in cache");
525        }
526
527        publish_order_event(&event);
528    }
529
530    /// Restores the primary order quantity after a spawned order is denied or rejected.
531    ///
532    /// This is called when a spawned order fails before acceptance. The quantity
533    /// that was deducted from the primary order is restored (up to the spawned
534    /// order's leaves_qty to handle partial fills).
535    fn restore_primary_order_quantity(&mut self, order: &OrderAny) {
536        let Some(exec_spawn_id) = order.exec_spawn_id() else {
537            return;
538        };
539
540        let reduction_qty = {
541            let core = self.core_mut();
542            core.take_pending_spawn_reduction(&order.client_order_id())
543        };
544
545        let Some(reduction_qty) = reduction_qty else {
546            return;
547        };
548
549        let primary = {
550            let cache = self.core_mut().cache();
551            cache.order(&exec_spawn_id).map(|o| o.clone())
552        };
553
554        let Some(primary) = primary else {
555            log::warn!(
556                "Cannot restore primary order quantity: primary order {exec_spawn_id} not found",
557            );
558            return;
559        };
560
561        // Cap restore amount by leaves_qty to handle partial fills before rejection
562        let restore_raw = std::cmp::min(reduction_qty.raw, order.leaves_qty().raw);
563        if restore_raw == 0 {
564            return;
565        }
566
567        let restored_qty = Quantity::from_raw(
568            primary.quantity().raw + restore_raw,
569            primary.quantity().precision,
570        );
571
572        let core = self.core_mut();
573        let ts_now = core.clock().timestamp_ns();
574
575        let updated = OrderUpdated::new(
576            primary.trader_id(),
577            primary.strategy_id(),
578            primary.instrument_id(),
579            primary.client_order_id(),
580            restored_qty,
581            UUID4::new(),
582            ts_now,
583            ts_now,
584            false, // reconciliation
585            primary.venue_order_id(),
586            primary.account_id(),
587            None, // price
588            None, // trigger_price
589            None, // protection_price
590            primary.is_quote_quantity(),
591        );
592
593        let event = OrderEventAny::Updated(updated);
594
595        let primary = {
596            let cache_rc = core.cache_rc();
597            let mut cache = cache_rc.borrow_mut();
598            match cache.update_order(&event) {
599                Ok(primary) => primary,
600                Err(e) => {
601                    log::warn!("Failed to update primary order in cache: {e}");
602                    return;
603                }
604            }
605        };
606
607        publish_order_event(&event);
608
609        log::info!(
610            "Restored primary order {} quantity to {} after spawned order {} was denied/rejected",
611            primary.client_order_id(),
612            restored_qty,
613            order.client_order_id()
614        );
615    }
616
617    /// Submits an order to the execution engine via the risk engine.
618    ///
619    /// # Errors
620    ///
621    /// Returns an error if order submission fails.
622    fn submit_order(
623        &mut self,
624        order: OrderAny,
625        position_id: Option<PositionId>,
626        client_id: Option<ClientId>,
627    ) -> anyhow::Result<()> {
628        let core = self.core_mut();
629
630        let trader_id = registered_trader_id(core)?;
631        let ts_init = core.clock().timestamp_ns();
632
633        // For spawned orders, use the parent's strategy ID
634        let strategy_id = order.strategy_id();
635
636        let order_exists = {
637            let cache = core.cache();
638            cache.order_exists(&order.client_order_id())
639        };
640
641        {
642            let cache_rc = core.cache_rc();
643            let mut cache = cache_rc.borrow_mut();
644            cache.add_order(order.clone(), position_id, client_id, true)?;
645        }
646
647        if !order_exists {
648            publish_order_initialized(&order);
649        }
650
651        let command = SubmitOrder::new(
652            trader_id,
653            client_id,
654            strategy_id,
655            order.instrument_id(),
656            order.client_order_id(),
657            order.init_event().clone(),
658            order.exec_algorithm_id(),
659            position_id,
660            None, // params
661            UUID4::new(),
662            ts_init,
663            None, // correlation_id
664        );
665
666        if core.config.log_commands {
667            let id = &core.actor.actor_id;
668            log::info!("{id} {SEND}{CMD} {command:?}");
669        }
670
671        msgbus::send_trading_command(
672            MessagingSwitchboard::risk_engine_execute(),
673            TradingCommand::SubmitOrder(command),
674        );
675
676        Ok(())
677    }
678
679    /// Modifies an order.
680    ///
681    /// # Errors
682    ///
683    /// Returns an error if order modification fails.
684    fn modify_order(
685        &mut self,
686        order: &mut OrderAny,
687        quantity: Option<Quantity>,
688        price: Option<Price>,
689        trigger_price: Option<Price>,
690        client_id: Option<ClientId>,
691    ) -> anyhow::Result<()> {
692        let qty_changing = quantity.is_some_and(|q| q != order.quantity());
693        let price_changing = price.is_some() && price != order.price();
694        let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
695
696        if !qty_changing && !price_changing && !trigger_changing {
697            log::error!(
698                "Cannot create command ModifyOrder: \
699                quantity, price and trigger were either None \
700                or the same as existing values"
701            );
702            return Ok(());
703        }
704
705        if order.is_closed() || order.is_pending_cancel() {
706            log::warn!(
707                "Cannot create command ModifyOrder: state is {:?}, {order:?}",
708                order.status()
709            );
710            return Ok(());
711        }
712
713        let core = self.core_mut();
714        let trader_id = registered_trader_id(core)?;
715        let strategy_id = order.strategy_id();
716
717        if !order.is_active_local() {
718            required_account_id(order, "pending update")?;
719            let event = self.generate_order_pending_update(order);
720            let event = OrderEventAny::PendingUpdate(event);
721
722            {
723                let cache_rc = self.core_mut().cache_rc();
724                let mut cache = cache_rc.borrow_mut();
725                match cache.update_order(&event) {
726                    Ok(updated) => *order = updated,
727                    Err(e)
728                        if matches!(
729                            e.downcast_ref::<OrderError>(),
730                            Some(OrderError::InvalidStateTransition)
731                        ) =>
732                    {
733                        log::warn!("InvalidStateTrigger: {e}, did not apply pending update event");
734                        return Ok(());
735                    }
736                    Err(e) => return Err(e),
737                }
738            }
739
740            let topic = format!("events.order.{strategy_id}");
741            msgbus::publish_order_event(topic.into(), &event);
742        }
743
744        let ts_init = self.core_mut().clock().timestamp_ns();
745        let command = ModifyOrder::new(
746            trader_id,
747            client_id,
748            strategy_id,
749            order.instrument_id(),
750            order.client_order_id(),
751            order.venue_order_id(),
752            quantity,
753            price,
754            trigger_price,
755            UUID4::new(),
756            ts_init,
757            None, // params,
758            None, // correlation_id
759        );
760
761        if self.core_mut().config.log_commands {
762            let id = &self.core_mut().actor.actor_id;
763            log::info!("{id} {SEND}{CMD} {command:?}");
764        }
765
766        let has_emulation_trigger = order
767            .emulation_trigger()
768            .is_some_and(|t| t != TriggerType::NoTrigger);
769
770        if order.is_emulated() || has_emulation_trigger {
771            msgbus::send_trading_command(
772                MessagingSwitchboard::order_emulator_execute(),
773                TradingCommand::ModifyOrder(command),
774            );
775        } else {
776            msgbus::send_trading_command(
777                MessagingSwitchboard::risk_engine_execute(),
778                TradingCommand::ModifyOrder(command),
779            );
780        }
781
782        Ok(())
783    }
784
785    /// Modifies an INITIALIZED or RELEASED order in place without sending a command.
786    ///
787    /// This is useful for adjusting order parameters before submission. The order
788    /// is updated locally by applying an `OrderUpdated` event and updating the cache.
789    ///
790    /// At least one parameter must differ from the current order values.
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if the order status is not INITIALIZED or RELEASED,
795    /// or if no parameters would change.
796    fn modify_order_in_place(
797        &mut self,
798        order: &mut OrderAny,
799        quantity: Option<Quantity>,
800        price: Option<Price>,
801        trigger_price: Option<Price>,
802    ) -> anyhow::Result<()> {
803        // Validate order status
804        let status = order.status();
805        if status != OrderStatus::Initialized && status != OrderStatus::Released {
806            anyhow::bail!(
807                "Cannot modify order in place: status is {status:?}, expected INITIALIZED or RELEASED"
808            );
809        }
810
811        // Validate order type compatibility
812        if price.is_some() && order.price().is_none() {
813            anyhow::bail!(
814                "Cannot modify order in place: {} orders do not have a LIMIT price",
815                order.order_type()
816            );
817        }
818
819        if trigger_price.is_some() && order.trigger_price().is_none() {
820            anyhow::bail!(
821                "Cannot modify order in place: {} orders do not have a STOP trigger price",
822                order.order_type()
823            );
824        }
825
826        // Check if any value would actually change
827        let qty_changing = quantity.is_some_and(|q| q != order.quantity());
828        let price_changing = price.is_some() && price != order.price();
829        let trigger_changing = trigger_price.is_some() && trigger_price != order.trigger_price();
830
831        if !qty_changing && !price_changing && !trigger_changing {
832            anyhow::bail!("Cannot modify order in place: no parameters differ from current values");
833        }
834
835        let core = self.core_mut();
836        let ts_now = core.clock().timestamp_ns();
837
838        let updated = OrderUpdated::new(
839            order.trader_id(),
840            order.strategy_id(),
841            order.instrument_id(),
842            order.client_order_id(),
843            quantity.unwrap_or_else(|| order.quantity()),
844            UUID4::new(),
845            ts_now,
846            ts_now,
847            false, // reconciliation
848            order.venue_order_id(),
849            order.account_id(),
850            price,
851            trigger_price,
852            None, // protection_price
853            order.is_quote_quantity(),
854        );
855
856        let event = OrderEventAny::Updated(updated);
857
858        {
859            let cache_rc = core.cache_rc();
860            let mut cache = cache_rc.borrow_mut();
861            *order = cache.update_order(&event)?;
862        }
863
864        publish_order_event(&event);
865
866        Ok(())
867    }
868
869    /// Cancels an order.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if order cancellation fails.
874    fn cancel_order(
875        &mut self,
876        order: &mut OrderAny,
877        client_id: Option<ClientId>,
878    ) -> anyhow::Result<()> {
879        if order.is_closed() || order.is_pending_cancel() {
880            log::warn!(
881                "Cannot cancel order: state is {:?}, {order:?}",
882                order.status()
883            );
884            return Ok(());
885        }
886
887        let core = self.core_mut();
888        let trader_id = registered_trader_id(core)?;
889        let strategy_id = order.strategy_id();
890
891        if !order.is_active_local() {
892            required_account_id(order, "pending cancel")?;
893            let event = self.generate_order_pending_cancel(order);
894            let event = OrderEventAny::PendingCancel(event);
895
896            {
897                let cache_rc = self.core_mut().cache_rc();
898                let mut cache = cache_rc.borrow_mut();
899                match cache.update_order(&event) {
900                    Ok(updated) => *order = updated,
901                    Err(e)
902                        if matches!(
903                            e.downcast_ref::<OrderError>(),
904                            Some(OrderError::InvalidStateTransition)
905                        ) =>
906                    {
907                        log::warn!("InvalidStateTrigger: {e}, did not apply pending cancel event");
908                        return Ok(());
909                    }
910                    Err(e) => return Err(e),
911                }
912            }
913
914            let topic = format!("events.order.{strategy_id}");
915            msgbus::publish_order_event(topic.into(), &event);
916        }
917
918        let ts_init = self.core_mut().clock().timestamp_ns();
919        let command = CancelOrder::new(
920            trader_id,
921            client_id,
922            strategy_id,
923            order.instrument_id(),
924            order.client_order_id(),
925            order.venue_order_id(),
926            UUID4::new(),
927            ts_init,
928            None, // params,
929            None, // correlation_id
930        );
931
932        if self.core_mut().config.log_commands {
933            let id = &self.core_mut().actor.actor_id;
934            log::info!("{id} {SEND}{CMD} {command:?}");
935        }
936
937        let has_emulation_trigger = order
938            .emulation_trigger()
939            .is_some_and(|t| t != TriggerType::NoTrigger);
940
941        if order.is_emulated() || order.status() == OrderStatus::Released || has_emulation_trigger {
942            msgbus::send_trading_command(
943                MessagingSwitchboard::order_emulator_execute(),
944                TradingCommand::CancelOrder(command),
945            );
946        } else {
947            msgbus::send_trading_command(
948                MessagingSwitchboard::exec_engine_execute(),
949                TradingCommand::CancelOrder(command),
950            );
951        }
952
953        Ok(())
954    }
955
956    /// Subscribes to events from a strategy.
957    ///
958    /// This is called automatically when the first order is received from a strategy.
959    fn subscribe_to_strategy_events(&mut self, strategy_id: StrategyId)
960    where
961        Self: 'static + std::fmt::Debug + Sized,
962    {
963        let core = self.core_mut();
964        if core.is_strategy_subscribed(&strategy_id) {
965            return;
966        }
967
968        let actor_id = core.actor.actor_id.inner();
969
970        let order_topic = format!("events.order.{strategy_id}");
971        let order_actor_id = actor_id;
972        let order_handler = TypedHandler::from(move |event: &OrderEventAny| {
973            if let Some(mut algo) = try_get_actor_unchecked::<Self>(&order_actor_id) {
974                algo.handle_order_event(event.clone());
975            } else {
976                log::error!(
977                    "ExecutionAlgorithm {order_actor_id} not found for order event handling"
978                );
979            }
980        });
981        msgbus::subscribe_order_events(order_topic.clone().into(), order_handler.clone(), None);
982
983        let position_topic = format!("events.position.{strategy_id}");
984        let position_handler = TypedHandler::from(move |event: &PositionEvent| {
985            if let Some(mut algo) = try_get_actor_unchecked::<Self>(&actor_id) {
986                algo.handle_position_event(event.clone());
987            } else {
988                log::error!("ExecutionAlgorithm {actor_id} not found for position event handling");
989            }
990        });
991        msgbus::subscribe_position_events(
992            position_topic.clone().into(),
993            position_handler.clone(),
994            None,
995        );
996
997        let handlers = StrategyEventHandlers {
998            order_topic,
999            order_handler,
1000            position_topic,
1001            position_handler,
1002        };
1003        core.store_strategy_event_handlers(strategy_id, handlers);
1004
1005        core.add_subscribed_strategy(strategy_id);
1006        log::info!("Subscribed to events for strategy {strategy_id}");
1007    }
1008
1009    /// Unsubscribes from all strategy event handlers.
1010    ///
1011    /// This should be called before reset to properly clean up msgbus subscriptions.
1012    fn unsubscribe_all_strategy_events(&mut self) {
1013        let handlers = self.core_mut().take_strategy_event_handlers();
1014        for (strategy_id, h) in handlers {
1015            msgbus::unsubscribe_order_events(h.order_topic.into(), &h.order_handler);
1016            msgbus::unsubscribe_position_events(h.position_topic.into(), &h.position_handler);
1017            log::info!("Unsubscribed from events for strategy {strategy_id}");
1018        }
1019        self.core_mut().clear_subscribed_strategies();
1020    }
1021
1022    /// Handles an order event, filtering for algorithm-owned orders.
1023    fn handle_order_event(&mut self, event: OrderEventAny) {
1024        if self.core_mut().state() != ComponentState::Running {
1025            return;
1026        }
1027
1028        let order = {
1029            let cache = self.core_mut().cache();
1030            cache.order(&event.client_order_id()).map(|o| o.clone())
1031        };
1032
1033        let Some(order) = order else {
1034            return;
1035        };
1036
1037        let Some(order_algo_id) = order.exec_algorithm_id() else {
1038            return;
1039        };
1040
1041        if order_algo_id != self.id() {
1042            return;
1043        }
1044
1045        {
1046            let core = self.core_mut();
1047            if core.config.log_events {
1048                let id = &core.actor.actor_id;
1049                log::info!("{id} {RECV}{EVT} {event}");
1050            }
1051        }
1052
1053        match &event {
1054            OrderEventAny::Initialized(e) => self.on_order_initialized(e.clone()),
1055            OrderEventAny::Denied(e) => {
1056                self.restore_primary_order_quantity(&order);
1057                self.on_order_denied(*e);
1058            }
1059            OrderEventAny::Emulated(e) => self.on_order_emulated(*e),
1060            OrderEventAny::Released(e) => self.on_order_released(*e),
1061            OrderEventAny::Submitted(e) => self.on_order_submitted(*e),
1062            OrderEventAny::Rejected(e) => {
1063                self.restore_primary_order_quantity(&order);
1064                self.on_order_rejected(*e);
1065            }
1066            OrderEventAny::Accepted(e) => {
1067                // Commit reduction - order accepted by venue
1068                self.core_mut()
1069                    .take_pending_spawn_reduction(&order.client_order_id());
1070                self.on_order_accepted(*e);
1071            }
1072            OrderEventAny::Canceled(e) => {
1073                self.core_mut()
1074                    .take_pending_spawn_reduction(&order.client_order_id());
1075                self.on_algo_order_canceled(*e);
1076            }
1077            OrderEventAny::Expired(e) => {
1078                self.core_mut()
1079                    .take_pending_spawn_reduction(&order.client_order_id());
1080                self.on_order_expired(*e);
1081            }
1082            OrderEventAny::Triggered(e) => self.on_order_triggered(*e),
1083            OrderEventAny::PendingUpdate(e) => self.on_order_pending_update(*e),
1084            OrderEventAny::PendingCancel(e) => self.on_order_pending_cancel(*e),
1085            OrderEventAny::ModifyRejected(e) => self.on_order_modify_rejected(*e),
1086            OrderEventAny::CancelRejected(e) => self.on_order_cancel_rejected(*e),
1087            OrderEventAny::Updated(e) => self.on_order_updated(*e),
1088            OrderEventAny::Filled(e) => self.on_algo_order_filled(*e),
1089        }
1090
1091        self.on_order_event(event);
1092    }
1093
1094    /// Handles a position event.
1095    fn handle_position_event(&mut self, event: PositionEvent) {
1096        if self.core_mut().state() != ComponentState::Running {
1097            return;
1098        }
1099
1100        {
1101            let core = self.core_mut();
1102            if core.config.log_events {
1103                let id = &core.actor.actor_id;
1104                log::info!("{id} {RECV}{EVT} {event:?}");
1105            }
1106        }
1107
1108        match &event {
1109            PositionEvent::PositionOpened(e) => self.on_position_opened(e.clone()),
1110            PositionEvent::PositionChanged(e) => self.on_position_changed(e.clone()),
1111            PositionEvent::PositionClosed(e) => self.on_position_closed(e.clone()),
1112            PositionEvent::PositionAdjusted(_) => {}
1113        }
1114
1115        self.on_position_event(event);
1116    }
1117
/// Called when the algorithm is started.
///
/// The default implementation only logs the algorithm ID at info level.
/// Override this method to implement custom initialization logic.
///
/// # Errors
///
/// Returns an error if start fails. The default implementation always
/// returns `Ok(())`.
fn on_start(&mut self) -> anyhow::Result<()> {
    let id = self.id();
    log::info!("Starting {id}");
    Ok(())
}
1130
/// Called when the algorithm is stopped.
///
/// The default implementation does nothing.
///
/// # Errors
///
/// Returns an error if stop fails. The default implementation always
/// returns `Ok(())`.
fn on_stop(&mut self) -> anyhow::Result<()> {
    Ok(())
}
1139
/// Called when the algorithm is reset.
///
/// The default implementation unsubscribes from all strategy event handlers
/// and then resets the algorithm core.
///
/// # Errors
///
/// Returns an error if reset fails. The default implementation always
/// returns `Ok(())`.
fn on_reset(&mut self) -> anyhow::Result<()> {
    // Remove msgbus subscriptions first, while the stored handlers are still available
    self.unsubscribe_all_strategy_events();
    self.core_mut().reset();
    Ok(())
}
1150
/// Called when a time event is received.
///
/// The default implementation ignores the event.
/// Override this method for timer-based algorithms like TWAP.
///
/// # Errors
///
/// Returns an error if time event handling fails. The default implementation
/// always returns `Ok(())`.
fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
    Ok(())
}
1161
/// Called when an order is initialized.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_initialized(&mut self, event: OrderInitialized) {}
1165
/// Called when an order is denied.
///
/// Invoked by `handle_order_event` after `restore_primary_order_quantity`
/// has been called for the order. The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_denied(&mut self, event: OrderDenied) {}
1169
/// Called when an order is emulated.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_emulated(&mut self, event: OrderEmulated) {}
1173
/// Called when an order is released from emulation.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_released(&mut self, event: OrderReleased) {}
1177
/// Called when an order is submitted.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_submitted(&mut self, event: OrderSubmitted) {}
1181
/// Called when an order is rejected.
///
/// Invoked by `handle_order_event` after `restore_primary_order_quantity`
/// has been called for the order. The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_rejected(&mut self, event: OrderRejected) {}
1185
/// Called when an order is accepted.
///
/// Invoked by `handle_order_event` after the order's pending spawn reduction
/// has been taken from the core. The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_accepted(&mut self, event: OrderAccepted) {}
1189
/// Called when an order is canceled.
///
/// Invoked by `handle_order_event` after the order's pending spawn reduction
/// has been taken from the core. The default implementation does nothing.
#[allow(unused_variables)]
fn on_algo_order_canceled(&mut self, event: OrderCanceled) {}
1193
/// Called when an order expires.
///
/// Invoked by `handle_order_event` after the order's pending spawn reduction
/// has been taken from the core. The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_expired(&mut self, event: OrderExpired) {}
1197
/// Called when an order is triggered.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_triggered(&mut self, event: OrderTriggered) {}
1201
/// Called when an order modification is pending.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_pending_update(&mut self, event: OrderPendingUpdate) {}
1205
/// Called when an order cancellation is pending.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_pending_cancel(&mut self, event: OrderPendingCancel) {}
1209
/// Called when an order modification is rejected.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_modify_rejected(&mut self, event: OrderModifyRejected) {}
1213
/// Called when an order cancellation is rejected.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_cancel_rejected(&mut self, event: OrderCancelRejected) {}
1217
/// Called when an order is updated.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_updated(&mut self, event: OrderUpdated) {}
1221
/// Called when an order is filled.
///
/// Invoked by `handle_order_event` for orders assigned to this algorithm.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_algo_order_filled(&mut self, event: OrderFilled) {}
1225
/// Called for any order event (after specific handler).
///
/// `handle_order_event` passes the event here once the event-specific hook
/// has returned. The default implementation does nothing.
#[allow(unused_variables)]
fn on_order_event(&mut self, event: OrderEventAny) {}
1229
/// Called when a position is opened.
///
/// Invoked by `handle_position_event` while the algorithm is running.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_position_opened(&mut self, event: PositionOpened) {}
1233
/// Called when a position is changed.
///
/// Invoked by `handle_position_event` while the algorithm is running.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_position_changed(&mut self, event: PositionChanged) {}
1237
/// Called when a position is closed.
///
/// Invoked by `handle_position_event` while the algorithm is running.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_position_closed(&mut self, event: PositionClosed) {}
1241
/// Called for any position event (after specific handler).
///
/// `handle_position_event` passes every event here, including
/// `PositionEvent::PositionAdjusted`, which has no specific handler.
/// The default implementation does nothing.
#[allow(unused_variables)]
fn on_position_event(&mut self, event: PositionEvent) {}
1245}
1246
1247fn publish_order_initialized(order: &OrderAny) {
1248    let event = OrderEventAny::Initialized(order.init_event().clone());
1249    publish_order_event(&event);
1250}
1251
1252fn publish_order_event(event: &OrderEventAny) {
1253    let topic = format!("events.order.{}", event.strategy_id());
1254    msgbus::publish_order_event(topic.into(), event);
1255}
1256
1257fn registered_trader_id(core: &ExecutionAlgorithmCore) -> anyhow::Result<TraderId> {
1258    core.trader_id()
1259        .ok_or_else(|| anyhow::anyhow!("ExecutionAlgorithm not registered: trader_id is not set"))
1260}
1261
1262fn required_account_id(order: &OrderAny, operation: &str) -> anyhow::Result<AccountId> {
1263    order.account_id().ok_or_else(|| {
1264        anyhow::anyhow!(
1265            "Cannot generate {operation} event for {}: account_id is not set",
1266            order.client_order_id()
1267        )
1268    })
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273    use std::{cell::RefCell, rc::Rc};
1274
1275    use nautilus_common::{
1276        actor::DataActor, cache::Cache, clock::TestClock, component::Component,
1277        enums::ComponentTrigger, msgbus, msgbus::TypedHandler, nautilus_actor,
1278    };
1279    use nautilus_model::{
1280        enums::OrderSide,
1281        events::{
1282            OrderAccepted, OrderCanceled, OrderDenied, OrderRejected,
1283            order::spec::{
1284                OrderAcceptedSpec, OrderCanceledSpec, OrderDeniedSpec, OrderFilledSpec,
1285                OrderRejectedSpec,
1286            },
1287        },
1288        identifiers::{
1289            AccountId, ClientOrderId, ExecAlgorithmId, InstrumentId, StrategyId, TraderId,
1290            VenueOrderId,
1291        },
1292        orders::{LimitOrder, MarketOrder, OrderAny, stubs::TestOrderStubs},
1293        types::{Price, Quantity},
1294    };
1295    use rstest::rstest;
1296
1297    use super::*;
1298
/// Minimal `ExecutionAlgorithm` implementation used by the tests in this module.
#[derive(Debug)]
struct TestAlgorithm {
    /// Core state handed out by `core_mut`.
    core: ExecutionAlgorithmCore,
    /// Set to `true` once `on_order` has been called.
    on_order_called: bool,
    /// Client order ID of the most recent order passed to `on_order`.
    last_order_client_id: Option<ClientOrderId>,
}
1305
1306    impl TestAlgorithm {
1307        fn new(config: ExecutionAlgorithmConfig) -> Self {
1308            Self {
1309                core: ExecutionAlgorithmCore::new(config),
1310                on_order_called: false,
1311                last_order_client_id: None,
1312            }
1313        }
1314    }
1315
// No methods are overridden here, so only the trait's provided implementations apply.
impl DataActor for TestAlgorithm {}

// NOTE(review): presumably generates the remaining actor boilerplate for
// `TestAlgorithm` - confirm against the macro definition in `nautilus_common`.
nautilus_actor!(TestAlgorithm);
1319
1320    impl ExecutionAlgorithm for TestAlgorithm {
1321        fn core_mut(&mut self) -> &mut ExecutionAlgorithmCore {
1322            &mut self.core
1323        }
1324
1325        fn on_order(&mut self, order: OrderAny) -> anyhow::Result<()> {
1326            self.on_order_called = true;
1327            self.last_order_client_id = Some(order.client_order_id());
1328            Ok(())
1329        }
1330    }
1331
1332    fn create_test_algorithm() -> TestAlgorithm {
1333        // Use unique ID to avoid thread-local registry/msgbus conflicts in parallel tests
1334        let unique_id = format!("TEST-{}", UUID4::new());
1335        let config = ExecutionAlgorithmConfig {
1336            exec_algorithm_id: Some(ExecAlgorithmId::new(&unique_id)),
1337            ..Default::default()
1338        };
1339        TestAlgorithm::new(config)
1340    }
1341
1342    fn register_algorithm(algo: &mut TestAlgorithm) {
1343        let trader_id = TraderId::from("TRADER-001");
1344        let clock = Rc::new(RefCell::new(TestClock::new()));
1345        let cache = Rc::new(RefCell::new(Cache::default()));
1346
1347        algo.core.register(trader_id, clock, cache).unwrap();
1348
1349        // Transition to Running state for tests
1350        algo.transition_state(ComponentTrigger::Initialize).unwrap();
1351        algo.transition_state(ComponentTrigger::Start).unwrap();
1352        algo.transition_state(ComponentTrigger::StartCompleted)
1353            .unwrap();
1354    }
1355
1356    fn subscribe_order_topic(
1357        strategy_id: StrategyId,
1358    ) -> (TypedHandler<OrderEventAny>, Rc<RefCell<Vec<OrderEventAny>>>) {
1359        let events = Rc::new(RefCell::new(Vec::new()));
1360        let handler = TypedHandler::from({
1361            let events = events.clone();
1362            move |event: &OrderEventAny| {
1363                events.borrow_mut().push(event.clone());
1364            }
1365        });
1366        msgbus::subscribe_order_events(
1367            format!("events.order.{strategy_id}").into(),
1368            handler.clone(),
1369            None,
1370        );
1371        (handler, events)
1372    }
1373
1374    #[rstest]
1375    fn test_algorithm_creation() {
1376        let algo = create_test_algorithm();
1377        assert!(algo.core.exec_algorithm_id.inner().starts_with("TEST-"));
1378        assert!(!algo.on_order_called);
1379        assert!(algo.last_order_client_id.is_none());
1380    }
1381
1382    #[rstest]
1383    fn test_algorithm_registration() {
1384        let mut algo = create_test_algorithm();
1385        register_algorithm(&mut algo);
1386
1387        assert!(algo.core.trader_id().is_some());
1388        assert_eq!(algo.core.trader_id(), Some(TraderId::from("TRADER-001")));
1389    }
1390
1391    #[rstest]
1392    fn test_submit_order_errors_when_algorithm_not_registered() {
1393        let mut algo = create_test_algorithm();
1394        let order = OrderAny::Market(MarketOrder::new(
1395            TraderId::from("TRADER-001"),
1396            StrategyId::from("STRAT-001"),
1397            InstrumentId::from("BTC/USDT.BINANCE"),
1398            ClientOrderId::from("O-UNREGISTERED-001"),
1399            OrderSide::Buy,
1400            Quantity::from("1.0"),
1401            TimeInForce::Gtc,
1402            UUID4::new(),
1403            0.into(),
1404            false,
1405            false,
1406            None,
1407            None,
1408            None,
1409            None,
1410            None,
1411            None,
1412            None,
1413            None,
1414        ));
1415
1416        let err = algo
1417            .submit_order(order, None, None)
1418            .unwrap_err()
1419            .to_string();
1420
1421        assert_eq!(
1422            err,
1423            "ExecutionAlgorithm not registered: trader_id is not set"
1424        );
1425    }
1426
1427    #[rstest]
1428    fn test_required_account_id_errors_when_missing_for_algorithm_event() {
1429        let order = OrderAny::Market(MarketOrder::new(
1430            TraderId::from("TRADER-001"),
1431            StrategyId::from("STRAT-001"),
1432            InstrumentId::from("BTC/USDT.BINANCE"),
1433            ClientOrderId::from("O-NO-ACCOUNT-001"),
1434            OrderSide::Buy,
1435            Quantity::from("1.0"),
1436            TimeInForce::Gtc,
1437            UUID4::new(),
1438            0.into(),
1439            false,
1440            false,
1441            None,
1442            None,
1443            None,
1444            None,
1445            None,
1446            None,
1447            None,
1448            None,
1449        ));
1450
1451        let err = required_account_id(&order, "pending update")
1452            .unwrap_err()
1453            .to_string();
1454
1455        assert_eq!(
1456            err,
1457            "Cannot generate pending update event for O-NO-ACCOUNT-001: account_id is not set"
1458        );
1459    }
1460
1461    #[rstest]
1462    fn test_algorithm_id() {
1463        let mut algo = create_test_algorithm();
1464        assert!(algo.id().inner().starts_with("TEST-"));
1465    }
1466
1467    #[rstest]
1468    fn test_algorithm_spawn_market_creates_valid_order() {
1469        let mut algo = create_test_algorithm();
1470        register_algorithm(&mut algo);
1471
1472        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1473        let mut primary = OrderAny::Market(MarketOrder::new(
1474            TraderId::from("TRADER-001"),
1475            StrategyId::from("STRAT-001"),
1476            instrument_id,
1477            ClientOrderId::from("O-001"),
1478            OrderSide::Buy,
1479            Quantity::from("1.0"),
1480            TimeInForce::Gtc,
1481            UUID4::new(),
1482            0.into(),
1483            false, // reduce_only
1484            false, // quote_quantity
1485            None,  // contingency_type
1486            None,  // order_list_id
1487            None,  // linked_order_ids
1488            None,  // parent_order_id
1489            None,  // exec_algorithm_id
1490            None,  // exec_algorithm_params
1491            None,  // exec_spawn_id
1492            None,  // tags
1493        ));
1494
1495        let spawned = algo.spawn_market(
1496            &mut primary,
1497            Quantity::from("0.5"),
1498            TimeInForce::Ioc,
1499            false,
1500            None,  // tags
1501            false, // reduce_primary
1502        );
1503
1504        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1505        assert_eq!(spawned.instrument_id, instrument_id);
1506        assert_eq!(spawned.order_side(), OrderSide::Buy);
1507        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1508        assert_eq!(spawned.time_in_force, TimeInForce::Ioc);
1509        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1510        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1511    }
1512
1513    #[rstest]
1514    fn test_algorithm_spawn_increments_sequence() {
1515        let mut algo = create_test_algorithm();
1516        register_algorithm(&mut algo);
1517
1518        let mut primary = OrderAny::Market(MarketOrder::new(
1519            TraderId::from("TRADER-001"),
1520            StrategyId::from("STRAT-001"),
1521            InstrumentId::from("BTC/USDT.BINANCE"),
1522            ClientOrderId::from("O-001"),
1523            OrderSide::Buy,
1524            Quantity::from("1.0"),
1525            TimeInForce::Gtc,
1526            UUID4::new(),
1527            0.into(),
1528            false,
1529            false,
1530            None,
1531            None,
1532            None,
1533            None,
1534            None,
1535            None,
1536            None,
1537            None,
1538        ));
1539
1540        let spawned1 = algo.spawn_market(
1541            &mut primary,
1542            Quantity::from("0.25"),
1543            TimeInForce::Ioc,
1544            false,
1545            None,
1546            false,
1547        );
1548        let spawned2 = algo.spawn_market(
1549            &mut primary,
1550            Quantity::from("0.25"),
1551            TimeInForce::Ioc,
1552            false,
1553            None,
1554            false,
1555        );
1556        let spawned3 = algo.spawn_market(
1557            &mut primary,
1558            Quantity::from("0.25"),
1559            TimeInForce::Ioc,
1560            false,
1561            None,
1562            false,
1563        );
1564
1565        assert_eq!(spawned1.client_order_id.as_str(), "O-001-E1");
1566        assert_eq!(spawned2.client_order_id.as_str(), "O-001-E2");
1567        assert_eq!(spawned3.client_order_id.as_str(), "O-001-E3");
1568    }
1569
/// Calls every default order hook once on an unregistered algorithm; the
/// test passes as long as none of the calls panics.
#[rstest]
fn test_algorithm_default_handlers_do_not_panic() {
    let mut algo = create_test_algorithm();

    // Hooks are called directly, bypassing `handle_order_event`
    algo.on_order_initialized(OrderInitialized::default());
    algo.on_order_denied(OrderDenied::default());
    algo.on_order_emulated(OrderEmulated::default());
    algo.on_order_released(OrderReleased::default());
    algo.on_order_submitted(OrderSubmitted::default());
    algo.on_order_rejected(OrderRejected::default());
    algo.on_order_accepted(OrderAccepted::default());
    algo.on_algo_order_canceled(OrderCanceled::default());
    algo.on_order_expired(OrderExpired::default());
    algo.on_order_triggered(OrderTriggered::default());
    algo.on_order_pending_update(OrderPendingUpdate::default());
    algo.on_order_pending_cancel(OrderPendingCancel::default());
    algo.on_order_modify_rejected(OrderModifyRejected::default());
    algo.on_order_cancel_rejected(OrderCancelRejected::default());
    algo.on_order_updated(OrderUpdated::default());
    // The fill event is built through its spec builder rather than `Default`
    algo.on_algo_order_filled(OrderFilledSpec::builder().build());
}
1591
1592    #[rstest]
1593    fn test_strategy_subscription_tracking() {
1594        let mut algo = create_test_algorithm();
1595        let strategy_id = StrategyId::from("STRAT-001");
1596
1597        assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1598
1599        algo.subscribe_to_strategy_events(strategy_id);
1600        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1601
1602        // Second call should be idempotent
1603        algo.subscribe_to_strategy_events(strategy_id);
1604        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1605    }
1606
1607    #[rstest]
1608    fn test_algorithm_reset() {
1609        let mut algo = create_test_algorithm();
1610        let strategy_id = StrategyId::from("STRAT-001");
1611        let primary_id = ClientOrderId::new("O-001");
1612
1613        let _ = algo.core.spawn_client_order_id(&primary_id);
1614        algo.core.add_subscribed_strategy(strategy_id);
1615
1616        assert!(algo.core.spawn_sequence(&primary_id).is_some());
1617        assert!(algo.core.is_strategy_subscribed(&strategy_id));
1618
1619        ExecutionAlgorithm::on_reset(&mut algo).unwrap();
1620
1621        assert!(algo.core.spawn_sequence(&primary_id).is_none());
1622        assert!(!algo.core.is_strategy_subscribed(&strategy_id));
1623    }
1624
1625    #[rstest]
1626    fn test_algorithm_spawn_limit_creates_valid_order() {
1627        let mut algo = create_test_algorithm();
1628        register_algorithm(&mut algo);
1629
1630        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1631        let mut primary = OrderAny::Market(MarketOrder::new(
1632            TraderId::from("TRADER-001"),
1633            StrategyId::from("STRAT-001"),
1634            instrument_id,
1635            ClientOrderId::from("O-001"),
1636            OrderSide::Buy,
1637            Quantity::from("1.0"),
1638            TimeInForce::Gtc,
1639            UUID4::new(),
1640            0.into(),
1641            false,
1642            false,
1643            None,
1644            None,
1645            None,
1646            None,
1647            None,
1648            None,
1649            None,
1650            None,
1651        ));
1652
1653        let price = Price::from("50000.0");
1654        let spawned = algo.spawn_limit(
1655            &mut primary,
1656            Quantity::from("0.5"),
1657            price,
1658            TimeInForce::Gtc,
1659            None,  // expire_time
1660            false, // post_only
1661            false, // reduce_only
1662            None,  // display_qty
1663            None,  // emulation_trigger
1664            None,  // tags
1665            false, // reduce_primary
1666        );
1667
1668        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1669        assert_eq!(spawned.instrument_id, instrument_id);
1670        assert_eq!(spawned.order_side(), OrderSide::Buy);
1671        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1672        assert_eq!(spawned.price, price);
1673        assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1674        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1675        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1676    }
1677
1678    #[rstest]
1679    fn test_algorithm_spawn_market_to_limit_creates_valid_order() {
1680        let mut algo = create_test_algorithm();
1681        register_algorithm(&mut algo);
1682
1683        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
1684        let mut primary = OrderAny::Market(MarketOrder::new(
1685            TraderId::from("TRADER-001"),
1686            StrategyId::from("STRAT-001"),
1687            instrument_id,
1688            ClientOrderId::from("O-001"),
1689            OrderSide::Buy,
1690            Quantity::from("1.0"),
1691            TimeInForce::Gtc,
1692            UUID4::new(),
1693            0.into(),
1694            false,
1695            false,
1696            None,
1697            None,
1698            None,
1699            None,
1700            None,
1701            None,
1702            None,
1703            None,
1704        ));
1705
1706        let spawned = algo.spawn_market_to_limit(
1707            &mut primary,
1708            Quantity::from("0.5"),
1709            TimeInForce::Gtc,
1710            None,  // expire_time
1711            false, // reduce_only
1712            None,  // display_qty
1713            None,  // emulation_trigger
1714            None,  // tags
1715            false, // reduce_primary
1716        );
1717
1718        assert_eq!(spawned.client_order_id.as_str(), "O-001-E1");
1719        assert_eq!(spawned.instrument_id, instrument_id);
1720        assert_eq!(spawned.order_side(), OrderSide::Buy);
1721        assert_eq!(spawned.quantity, Quantity::from("0.5"));
1722        assert_eq!(spawned.time_in_force, TimeInForce::Gtc);
1723        assert_eq!(spawned.exec_algorithm_id, Some(algo.id()));
1724        assert_eq!(spawned.exec_spawn_id, Some(ClientOrderId::from("O-001")));
1725    }
1726
1727    #[rstest]
1728    fn test_algorithm_spawn_market_with_tags() {
1729        let mut algo = create_test_algorithm();
1730        register_algorithm(&mut algo);
1731
1732        let mut primary = OrderAny::Market(MarketOrder::new(
1733            TraderId::from("TRADER-001"),
1734            StrategyId::from("STRAT-001"),
1735            InstrumentId::from("BTC/USDT.BINANCE"),
1736            ClientOrderId::from("O-001"),
1737            OrderSide::Buy,
1738            Quantity::from("1.0"),
1739            TimeInForce::Gtc,
1740            UUID4::new(),
1741            0.into(),
1742            false,
1743            false,
1744            None,
1745            None,
1746            None,
1747            None,
1748            None,
1749            None,
1750            None,
1751            None,
1752        ));
1753
1754        let tags = vec![ustr::Ustr::from("TAG1"), ustr::Ustr::from("TAG2")];
1755        let spawned = algo.spawn_market(
1756            &mut primary,
1757            Quantity::from("0.5"),
1758            TimeInForce::Ioc,
1759            false,
1760            Some(tags.clone()),
1761            false,
1762        );
1763
1764        assert_eq!(spawned.tags, Some(tags));
1765    }
1766
1767    #[rstest]
1768    fn test_algorithm_spawn_propagates_primary_fields() {
1769        let mut algo = create_test_algorithm();
1770        register_algorithm(&mut algo);
1771
1772        let mut params = indexmap::IndexMap::new();
1773        params.insert(ustr::Ustr::from("horizon_secs"), ustr::Ustr::from("30"));
1774        params.insert(ustr::Ustr::from("interval_secs"), ustr::Ustr::from("10"));
1775        let primary_tags = vec![ustr::Ustr::from("PRIMARY_TAG")];
1776        let linked_order_ids = vec![ClientOrderId::from("LINK-1")];
1777
1778        let mut primary = OrderAny::Market(MarketOrder::new(
1779            TraderId::from("TRADER-001"),
1780            StrategyId::from("STRAT-001"),
1781            InstrumentId::from("BTC/USDT.BINANCE"),
1782            ClientOrderId::from("O-001"),
1783            OrderSide::Buy,
1784            Quantity::from("1.0"),
1785            TimeInForce::Gtc,
1786            UUID4::new(),
1787            0.into(),
1788            false, // reduce_only
1789            true,  // quote_quantity
1790            None,  // contingency_type
1791            None,  // order_list_id
1792            Some(linked_order_ids.clone()),
1793            None, // parent_order_id
1794            Some(algo.id()),
1795            Some(params.clone()),
1796            None, // exec_spawn_id
1797            Some(primary_tags.clone()),
1798        ));
1799
1800        let spawned_market = algo.spawn_market(
1801            &mut primary,
1802            Quantity::from("0.25"),
1803            TimeInForce::Ioc,
1804            false,
1805            None, // falls back to primary.tags
1806            false,
1807        );
1808        assert!(spawned_market.is_quote_quantity);
1809        assert_eq!(spawned_market.exec_algorithm_params, Some(params.clone()));
1810        assert_eq!(spawned_market.tags, Some(primary_tags.clone()));
1811        assert_eq!(
1812            spawned_market.linked_order_ids,
1813            Some(linked_order_ids.clone())
1814        );
1815
1816        let spawned_limit = algo.spawn_limit(
1817            &mut primary,
1818            Quantity::from("0.25"),
1819            Price::from("50000.0"),
1820            TimeInForce::Gtc,
1821            None,  // expire_time
1822            false, // post_only
1823            false, // reduce_only
1824            None,  // display_qty
1825            None,  // emulation_trigger
1826            None,  // falls back to primary.tags
1827            false,
1828        );
1829        assert!(spawned_limit.is_quote_quantity);
1830        assert_eq!(spawned_limit.exec_algorithm_params, Some(params.clone()));
1831        assert_eq!(spawned_limit.tags, Some(primary_tags.clone()));
1832        assert_eq!(
1833            spawned_limit.linked_order_ids,
1834            Some(linked_order_ids.clone())
1835        );
1836
1837        let spawned_mtl = algo.spawn_market_to_limit(
1838            &mut primary,
1839            Quantity::from("0.25"),
1840            TimeInForce::Gtc,
1841            None,  // expire_time
1842            false, // reduce_only
1843            None,  // display_qty
1844            None,  // emulation_trigger
1845            None,  // falls back to primary.tags
1846            false,
1847        );
1848        assert!(spawned_mtl.is_quote_quantity);
1849        assert_eq!(spawned_mtl.exec_algorithm_params, Some(params));
1850        assert_eq!(spawned_mtl.tags, Some(primary_tags));
1851        assert_eq!(spawned_mtl.linked_order_ids, Some(linked_order_ids));
1852    }
1853
1854    #[rstest]
1855    fn test_algorithm_reduce_primary_order() {
1856        let mut algo = create_test_algorithm();
1857        register_algorithm(&mut algo);
1858
1859        let order = OrderAny::Market(MarketOrder::new(
1860            TraderId::from("TRADER-001"),
1861            StrategyId::from("STRAT-001"),
1862            InstrumentId::from("BTC/USDT.BINANCE"),
1863            ClientOrderId::from("O-001"),
1864            OrderSide::Buy,
1865            Quantity::from("1.0"),
1866            TimeInForce::Gtc,
1867            UUID4::new(),
1868            0.into(),
1869            false,
1870            false,
1871            None,
1872            None,
1873            None,
1874            None,
1875            None,
1876            None,
1877            None,
1878            None,
1879        ));
1880
1881        // Make accepted so OrderUpdated can be applied
1882        let mut primary = TestOrderStubs::make_accepted_order(&order);
1883
1884        {
1885            let cache_rc = algo.core.cache_rc();
1886            let mut cache = cache_rc.borrow_mut();
1887            cache.add_order(primary.clone(), None, None, false).unwrap();
1888        }
1889
1890        let spawn_qty = Quantity::from("0.3");
1891        algo.reduce_primary_order(&mut primary, spawn_qty);
1892
1893        assert_eq!(primary.quantity(), Quantity::from("0.7"));
1894    }
1895
1896    #[rstest]
1897    fn test_algorithm_reduce_primary_order_publishes_updated_event() {
1898        let mut algo = create_test_algorithm();
1899        register_algorithm(&mut algo);
1900
1901        let strategy_id = StrategyId::from("STRAT-ALGO-REDUCE-PUBLISH");
1902        let order = OrderAny::Market(MarketOrder::new(
1903            TraderId::from("TRADER-001"),
1904            strategy_id,
1905            InstrumentId::from("BTC/USDT.BINANCE"),
1906            ClientOrderId::from("O-ALGO-REDUCE"),
1907            OrderSide::Buy,
1908            Quantity::from("1.0"),
1909            TimeInForce::Gtc,
1910            UUID4::new(),
1911            0.into(),
1912            false,
1913            false,
1914            None,
1915            None,
1916            None,
1917            None,
1918            None,
1919            None,
1920            None,
1921            None,
1922        ));
1923        let mut primary = TestOrderStubs::make_accepted_order(&order);
1924
1925        {
1926            let cache_rc = algo.core.cache_rc();
1927            let mut cache = cache_rc.borrow_mut();
1928            cache.add_order(primary.clone(), None, None, false).unwrap();
1929        }
1930
1931        let (handler, events) = subscribe_order_topic(strategy_id);
1932
1933        algo.reduce_primary_order(&mut primary, Quantity::from("0.3"));
1934
1935        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1936        let events = events.borrow();
1937
1938        assert_eq!(events.len(), 1);
1939        assert!(matches!(
1940            &events[0],
1941            OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
1942        ));
1943    }
1944
1945    #[rstest]
1946    fn test_algorithm_submit_order_publishes_initialized_for_new_order() {
1947        let mut algo = create_test_algorithm();
1948        register_algorithm(&mut algo);
1949
1950        let strategy_id = StrategyId::from("STRAT-ALGO-INIT-PUBLISH");
1951        let order = OrderAny::Market(MarketOrder::new(
1952            TraderId::from("TRADER-001"),
1953            strategy_id,
1954            InstrumentId::from("BTC/USDT.BINANCE"),
1955            ClientOrderId::from("O-ALGO-INIT"),
1956            OrderSide::Buy,
1957            Quantity::from("1.0"),
1958            TimeInForce::Gtc,
1959            UUID4::new(),
1960            0.into(),
1961            false,
1962            false,
1963            None,
1964            None,
1965            None,
1966            None,
1967            None,
1968            None,
1969            None,
1970            None,
1971        ));
1972        let (handler, events) = subscribe_order_topic(strategy_id);
1973
1974        algo.submit_order(order.clone(), None, None).unwrap();
1975
1976        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
1977        let events = events.borrow();
1978
1979        assert_eq!(events.len(), 1);
1980        assert!(matches!(
1981            &events[0],
1982            OrderEventAny::Initialized(event) if event.client_order_id == order.client_order_id()
1983        ));
1984    }
1985
1986    #[rstest]
1987    fn test_algorithm_submit_order_does_not_republish_initialized_for_existing_order() {
1988        let mut algo = create_test_algorithm();
1989        register_algorithm(&mut algo);
1990
1991        let strategy_id = StrategyId::from("STRAT-ALGO-INIT-EXISTING");
1992        let order = OrderAny::Market(MarketOrder::new(
1993            TraderId::from("TRADER-001"),
1994            strategy_id,
1995            InstrumentId::from("BTC/USDT.BINANCE"),
1996            ClientOrderId::from("O-ALGO-INIT-EXISTING"),
1997            OrderSide::Buy,
1998            Quantity::from("1.0"),
1999            TimeInForce::Gtc,
2000            UUID4::new(),
2001            0.into(),
2002            false,
2003            false,
2004            None,
2005            None,
2006            None,
2007            None,
2008            None,
2009            None,
2010            None,
2011            None,
2012        ));
2013        {
2014            let cache_rc = algo.core.cache_rc();
2015            let mut cache = cache_rc.borrow_mut();
2016            cache.add_order(order.clone(), None, None, true).unwrap();
2017        }
2018        let (handler, events) = subscribe_order_topic(strategy_id);
2019
2020        algo.submit_order(order, None, None).unwrap();
2021
2022        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2023        assert!(events.borrow().is_empty());
2024    }
2025
2026    #[rstest]
2027    fn test_algorithm_spawn_market_with_reduce_primary() {
2028        let mut algo = create_test_algorithm();
2029        register_algorithm(&mut algo);
2030
2031        let order = OrderAny::Market(MarketOrder::new(
2032            TraderId::from("TRADER-001"),
2033            StrategyId::from("STRAT-001"),
2034            InstrumentId::from("BTC/USDT.BINANCE"),
2035            ClientOrderId::from("O-001"),
2036            OrderSide::Buy,
2037            Quantity::from("1.0"),
2038            TimeInForce::Gtc,
2039            UUID4::new(),
2040            0.into(),
2041            false,
2042            false,
2043            None,
2044            None,
2045            None,
2046            None,
2047            None,
2048            None,
2049            None,
2050            None,
2051        ));
2052
2053        // Make accepted so OrderUpdated can be applied
2054        let mut primary = TestOrderStubs::make_accepted_order(&order);
2055
2056        {
2057            let cache_rc = algo.core.cache_rc();
2058            let mut cache = cache_rc.borrow_mut();
2059            cache.add_order(primary.clone(), None, None, false).unwrap();
2060        }
2061
2062        let spawned = algo.spawn_market(
2063            &mut primary,
2064            Quantity::from("0.4"),
2065            TimeInForce::Ioc,
2066            false,
2067            None,
2068            true, // reduce_primary = true
2069        );
2070
2071        assert_eq!(spawned.quantity, Quantity::from("0.4"));
2072        assert_eq!(primary.quantity(), Quantity::from("0.6"));
2073    }
2074
2075    #[rstest]
2076    fn test_algorithm_generate_order_canceled() {
2077        let mut algo = create_test_algorithm();
2078        register_algorithm(&mut algo);
2079
2080        let order = OrderAny::Market(MarketOrder::new(
2081            TraderId::from("TRADER-001"),
2082            StrategyId::from("STRAT-001"),
2083            InstrumentId::from("BTC/USDT.BINANCE"),
2084            ClientOrderId::from("O-001"),
2085            OrderSide::Buy,
2086            Quantity::from("1.0"),
2087            TimeInForce::Gtc,
2088            UUID4::new(),
2089            0.into(),
2090            false,
2091            false,
2092            None,
2093            None,
2094            None,
2095            None,
2096            None,
2097            None,
2098            None,
2099            None,
2100        ));
2101
2102        let event = algo.generate_order_canceled(&order);
2103
2104        assert_eq!(event.trader_id, TraderId::from("TRADER-001"));
2105        assert_eq!(event.strategy_id, StrategyId::from("STRAT-001"));
2106        assert_eq!(event.instrument_id, InstrumentId::from("BTC/USDT.BINANCE"));
2107        assert_eq!(event.client_order_id, ClientOrderId::from("O-001"));
2108    }
2109
2110    #[rstest]
2111    fn test_algorithm_modify_order_in_place_updates_quantity() {
2112        let mut algo = create_test_algorithm();
2113        register_algorithm(&mut algo);
2114
2115        let strategy_id = StrategyId::from("STRAT-ALGO-MODIFY-IN-PLACE");
2116        let mut order = OrderAny::Limit(LimitOrder::new(
2117            TraderId::from("TRADER-001"),
2118            strategy_id,
2119            InstrumentId::from("BTC/USDT.BINANCE"),
2120            ClientOrderId::from("O-001"),
2121            OrderSide::Buy,
2122            Quantity::from("1.0"),
2123            Price::from("50000.0"),
2124            TimeInForce::Gtc,
2125            None,  // expire_time
2126            false, // post_only
2127            false, // reduce_only
2128            false, // quote_quantity
2129            None,  // display_qty
2130            None,  // emulation_trigger
2131            None,  // trigger_instrument_id
2132            None,  // contingency_type
2133            None,  // order_list_id
2134            None,  // linked_order_ids
2135            None,  // parent_order_id
2136            None,  // exec_algorithm_id
2137            None,  // exec_algorithm_params
2138            None,  // exec_spawn_id
2139            None,  // tags
2140            UUID4::new(),
2141            0.into(),
2142        ));
2143
2144        {
2145            let cache_rc = algo.core.cache_rc();
2146            let mut cache = cache_rc.borrow_mut();
2147            cache.add_order(order.clone(), None, None, false).unwrap();
2148        }
2149
2150        let new_qty = Quantity::from("0.5");
2151        let (handler, events) = subscribe_order_topic(strategy_id);
2152
2153        algo.modify_order_in_place(&mut order, Some(new_qty), None, None)
2154            .unwrap();
2155
2156        msgbus::unsubscribe_order_events(format!("events.order.{strategy_id}").into(), &handler);
2157        let events = events.borrow();
2158
2159        assert_eq!(order.quantity(), new_qty);
2160        assert_eq!(events.len(), 1);
2161        assert!(matches!(
2162            &events[0],
2163            OrderEventAny::Updated(event) if event.quantity == new_qty
2164        ));
2165    }
2166
2167    #[rstest]
2168    fn test_algorithm_modify_order_in_place_rejects_no_changes() {
2169        let mut algo = create_test_algorithm();
2170        register_algorithm(&mut algo);
2171
2172        let mut order = OrderAny::Limit(LimitOrder::new(
2173            TraderId::from("TRADER-001"),
2174            StrategyId::from("STRAT-001"),
2175            InstrumentId::from("BTC/USDT.BINANCE"),
2176            ClientOrderId::from("O-001"),
2177            OrderSide::Buy,
2178            Quantity::from("1.0"),
2179            Price::from("50000.0"),
2180            TimeInForce::Gtc,
2181            None,
2182            false,
2183            false,
2184            false,
2185            None,
2186            None,
2187            None,
2188            None,
2189            None,
2190            None,
2191            None,
2192            None,
2193            None,
2194            None,
2195            None,
2196            UUID4::new(),
2197            0.into(),
2198        ));
2199
2200        // Try to modify with same quantity - should fail
2201        let result =
2202            algo.modify_order_in_place(&mut order, Some(Quantity::from("1.0")), None, None);
2203
2204        assert!(result.is_err());
2205        assert!(
2206            result
2207                .unwrap_err()
2208                .to_string()
2209                .contains("no parameters differ")
2210        );
2211    }
2212
2213    #[rstest]
2214    fn test_spawned_order_denied_restores_primary_quantity() {
2215        let mut algo = create_test_algorithm();
2216        register_algorithm(&mut algo);
2217
2218        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2219        let exec_algorithm_id = algo.id();
2220
2221        let mut primary = OrderAny::Market(MarketOrder::new(
2222            TraderId::from("TRADER-001"),
2223            StrategyId::from("STRAT-001"),
2224            instrument_id,
2225            ClientOrderId::from("O-001"),
2226            OrderSide::Buy,
2227            Quantity::from("1.0"),
2228            TimeInForce::Gtc,
2229            UUID4::new(),
2230            0.into(),
2231            false,
2232            false,
2233            None,
2234            None,
2235            None,
2236            None,
2237            Some(exec_algorithm_id),
2238            None,
2239            None,
2240            None,
2241        ));
2242
2243        {
2244            let cache_rc = algo.core.cache_rc();
2245            let mut cache = cache_rc.borrow_mut();
2246            cache.add_order(primary.clone(), None, None, false).unwrap();
2247        }
2248
2249        let spawned = algo.spawn_market(
2250            &mut primary,
2251            Quantity::from("0.5"),
2252            TimeInForce::Fok,
2253            false,
2254            None,
2255            true,
2256        );
2257
2258        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2259
2260        let spawned_order = OrderAny::Market(spawned);
2261        {
2262            let cache_rc = algo.core.cache_rc();
2263            let mut cache = cache_rc.borrow_mut();
2264            cache
2265                .add_order(spawned_order.clone(), None, None, false)
2266                .unwrap();
2267        }
2268
2269        let denied = OrderDeniedSpec::builder()
2270            .trader_id(spawned_order.trader_id())
2271            .strategy_id(spawned_order.strategy_id())
2272            .instrument_id(spawned_order.instrument_id())
2273            .client_order_id(spawned_order.client_order_id())
2274            .reason("TEST_DENIAL".into())
2275            .build();
2276
2277        {
2278            let cache_rc = algo.core.cache_rc();
2279            let mut cache = cache_rc.borrow_mut();
2280            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2281        }
2282
2283        algo.handle_order_event(OrderEventAny::Denied(denied));
2284
2285        let restored_primary = {
2286            let cache = algo.core.cache();
2287            cache
2288                .order(&ClientOrderId::from("O-001"))
2289                .map(|o| o.clone())
2290                .unwrap()
2291        };
2292        assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2293    }
2294
2295    #[rstest]
2296    fn test_spawned_order_rejected_restores_primary_quantity() {
2297        let mut algo = create_test_algorithm();
2298        register_algorithm(&mut algo);
2299
2300        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2301        let exec_algorithm_id = algo.id();
2302
2303        let mut primary = OrderAny::Market(MarketOrder::new(
2304            TraderId::from("TRADER-001"),
2305            StrategyId::from("STRAT-001"),
2306            instrument_id,
2307            ClientOrderId::from("O-001"),
2308            OrderSide::Buy,
2309            Quantity::from("1.0"),
2310            TimeInForce::Gtc,
2311            UUID4::new(),
2312            0.into(),
2313            false,
2314            false,
2315            None,
2316            None,
2317            None,
2318            None,
2319            Some(exec_algorithm_id),
2320            None,
2321            None,
2322            None,
2323        ));
2324
2325        {
2326            let cache_rc = algo.core.cache_rc();
2327            let mut cache = cache_rc.borrow_mut();
2328            cache.add_order(primary.clone(), None, None, false).unwrap();
2329        }
2330
2331        let spawned = algo.spawn_market(
2332            &mut primary,
2333            Quantity::from("0.5"),
2334            TimeInForce::Fok,
2335            false,
2336            None,
2337            true,
2338        );
2339
2340        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2341
2342        let spawned_order = OrderAny::Market(spawned);
2343        {
2344            let cache_rc = algo.core.cache_rc();
2345            let mut cache = cache_rc.borrow_mut();
2346            cache
2347                .add_order(spawned_order.clone(), None, None, false)
2348                .unwrap();
2349        }
2350
2351        let rejected = OrderRejectedSpec::builder()
2352            .trader_id(spawned_order.trader_id())
2353            .strategy_id(spawned_order.strategy_id())
2354            .instrument_id(spawned_order.instrument_id())
2355            .client_order_id(spawned_order.client_order_id())
2356            .account_id(AccountId::from("BINANCE-001"))
2357            .reason("TEST_REJECTION".into())
2358            .build();
2359
2360        {
2361            let cache_rc = algo.core.cache_rc();
2362            let mut cache = cache_rc.borrow_mut();
2363            cache
2364                .update_order(&OrderEventAny::Rejected(rejected))
2365                .unwrap();
2366        }
2367
2368        algo.handle_order_event(OrderEventAny::Rejected(rejected));
2369
2370        let restored_primary = {
2371            let cache = algo.core.cache();
2372            cache
2373                .order(&ClientOrderId::from("O-001"))
2374                .map(|o| o.clone())
2375                .unwrap()
2376        };
2377        assert_eq!(restored_primary.quantity(), Quantity::from("1.0"));
2378    }
2379
2380    #[rstest]
2381    fn test_spawned_order_with_reduce_primary_false_does_not_restore() {
2382        let mut algo = create_test_algorithm();
2383        register_algorithm(&mut algo);
2384
2385        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2386        let exec_algorithm_id = algo.id();
2387
2388        let mut primary = OrderAny::Market(MarketOrder::new(
2389            TraderId::from("TRADER-001"),
2390            StrategyId::from("STRAT-001"),
2391            instrument_id,
2392            ClientOrderId::from("O-001"),
2393            OrderSide::Buy,
2394            Quantity::from("1.0"),
2395            TimeInForce::Gtc,
2396            UUID4::new(),
2397            0.into(),
2398            false,
2399            false,
2400            None,
2401            None,
2402            None,
2403            None,
2404            Some(exec_algorithm_id),
2405            None,
2406            None,
2407            None,
2408        ));
2409
2410        {
2411            let cache_rc = algo.core.cache_rc();
2412            let mut cache = cache_rc.borrow_mut();
2413            cache.add_order(primary.clone(), None, None, false).unwrap();
2414        }
2415
2416        let spawned = algo.spawn_market(
2417            &mut primary,
2418            Quantity::from("0.5"),
2419            TimeInForce::Fok,
2420            false,
2421            None,
2422            false,
2423        );
2424
2425        assert_eq!(primary.quantity(), Quantity::from("1.0"));
2426
2427        let spawned_order = OrderAny::Market(spawned);
2428        {
2429            let cache_rc = algo.core.cache_rc();
2430            let mut cache = cache_rc.borrow_mut();
2431            cache
2432                .add_order(spawned_order.clone(), None, None, false)
2433                .unwrap();
2434        }
2435
2436        let denied = OrderDeniedSpec::builder()
2437            .trader_id(spawned_order.trader_id())
2438            .strategy_id(spawned_order.strategy_id())
2439            .instrument_id(spawned_order.instrument_id())
2440            .client_order_id(spawned_order.client_order_id())
2441            .reason("TEST_DENIAL".into())
2442            .build();
2443
2444        {
2445            let cache_rc = algo.core.cache_rc();
2446            let mut cache = cache_rc.borrow_mut();
2447            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2448        }
2449
2450        algo.handle_order_event(OrderEventAny::Denied(denied));
2451
2452        let final_primary = {
2453            let cache = algo.core.cache();
2454            cache
2455                .order(&ClientOrderId::from("O-001"))
2456                .map(|o| o.clone())
2457                .unwrap()
2458        };
2459        assert_eq!(final_primary.quantity(), Quantity::from("1.0"));
2460    }
2461
2462    #[rstest]
2463    fn test_multiple_spawns_with_one_denied_restores_correctly() {
2464        let mut algo = create_test_algorithm();
2465        register_algorithm(&mut algo);
2466
2467        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2468        let exec_algorithm_id = algo.id();
2469
2470        let mut primary = OrderAny::Market(MarketOrder::new(
2471            TraderId::from("TRADER-001"),
2472            StrategyId::from("STRAT-001"),
2473            instrument_id,
2474            ClientOrderId::from("O-001"),
2475            OrderSide::Buy,
2476            Quantity::from("1.0"),
2477            TimeInForce::Gtc,
2478            UUID4::new(),
2479            0.into(),
2480            false,
2481            false,
2482            None,
2483            None,
2484            None,
2485            None,
2486            Some(exec_algorithm_id),
2487            None,
2488            None,
2489            None,
2490        ));
2491
2492        {
2493            let cache_rc = algo.core.cache_rc();
2494            let mut cache = cache_rc.borrow_mut();
2495            cache.add_order(primary.clone(), None, None, false).unwrap();
2496        }
2497
2498        let spawned1 = algo.spawn_market(
2499            &mut primary,
2500            Quantity::from("0.3"),
2501            TimeInForce::Fok,
2502            false,
2503            None,
2504            true,
2505        );
2506        let spawned2 = algo.spawn_market(
2507            &mut primary,
2508            Quantity::from("0.4"),
2509            TimeInForce::Fok,
2510            false,
2511            None,
2512            true,
2513        );
2514        assert_eq!(primary.quantity(), Quantity::from("0.3"));
2515
2516        let spawned_order1 = OrderAny::Market(spawned1);
2517        let spawned_order2 = OrderAny::Market(spawned2);
2518        {
2519            let cache_rc = algo.core.cache_rc();
2520            let mut cache = cache_rc.borrow_mut();
2521            cache.add_order(spawned_order1, None, None, false).unwrap();
2522            cache
2523                .add_order(spawned_order2.clone(), None, None, false)
2524                .unwrap();
2525        }
2526
2527        let denied = OrderDeniedSpec::builder()
2528            .trader_id(spawned_order2.trader_id())
2529            .strategy_id(spawned_order2.strategy_id())
2530            .instrument_id(spawned_order2.instrument_id())
2531            .client_order_id(spawned_order2.client_order_id())
2532            .reason("TEST_DENIAL".into())
2533            .build();
2534
2535        {
2536            let cache_rc = algo.core.cache_rc();
2537            let mut cache = cache_rc.borrow_mut();
2538            cache.update_order(&OrderEventAny::Denied(denied)).unwrap();
2539        }
2540
2541        let (handler, events) = subscribe_order_topic(spawned_order2.strategy_id());
2542
2543        algo.handle_order_event(OrderEventAny::Denied(denied));
2544
2545        msgbus::unsubscribe_order_events(
2546            format!("events.order.{}", spawned_order2.strategy_id()).into(),
2547            &handler,
2548        );
2549        let events = events.borrow();
2550
2551        let restored_primary = {
2552            let cache = algo.core.cache();
2553            cache
2554                .order(&ClientOrderId::from("O-001"))
2555                .map(|o| o.clone())
2556                .unwrap()
2557        };
2558        assert_eq!(restored_primary.quantity(), Quantity::from("0.7"));
2559        assert_eq!(events.len(), 1);
2560        assert!(matches!(
2561            &events[0],
2562            OrderEventAny::Updated(event) if event.quantity == Quantity::from("0.7")
2563        ));
2564    }
2565
2566    #[rstest]
        // Checks that once a spawned child order has been accepted, a later cancel of that
        // child leaves the primary untouched: the cached primary ("O-001") is asserted to
        // stay at quantity 0.5 after the accept event and again after the cancel event.
2567    fn test_spawned_order_accepted_prevents_restoration() {
2568        let mut algo = create_test_algorithm();
2569        register_algorithm(&mut algo);
2570
2571        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2572        let exec_algorithm_id = algo.id();
2573
            // Primary market BUY of 1.0 (GTC) that carries `Some(exec_algorithm_id)`,
            // i.e. the id returned by `algo.id()` above.
2574        let mut primary = OrderAny::Market(MarketOrder::new(
2575            TraderId::from("TRADER-001"),
2576            StrategyId::from("STRAT-001"),
2577            instrument_id,
2578            ClientOrderId::from("O-001"),
2579            OrderSide::Buy,
2580            Quantity::from("1.0"),
2581            TimeInForce::Gtc,
2582            UUID4::new(),
2583            0.into(),
2584            false,
2585            false,
2586            None,
2587            None,
2588            None,
2589            None,
2590            Some(exec_algorithm_id),
2591            None,
2592            None,
2593            None,
2594        ));
2595
            // Store a clone of the primary in the cache. The braces end the `borrow_mut()`
            // before `algo` is used again (presumably `algo` borrows the cache itself —
            // TODO confirm).
2596        {
2597            let cache_rc = algo.core.cache_rc();
2598            let mut cache = cache_rc.borrow_mut();
2599            cache.add_order(primary.clone(), None, None, false).unwrap();
2600        }
2601
            // Spawn a 0.5 FOK market child from the primary.
            // NOTE(review): the trailing `false, None, true` presumably map to reduce_only,
            // tags and reduce_primary — confirm against `spawn_market`'s signature.
2602        let spawned = algo.spawn_market(
2603            &mut primary,
2604            Quantity::from("0.5"),
2605            TimeInForce::Fok,
2606            false,
2607            None,
2608            true,
2609        );
2610
            // The local primary is expected to have dropped from 1.0 to 0.5 after the spawn.
2611        assert_eq!(primary.quantity(), Quantity::from("0.5"));
2612
            // Wrap the child as `OrderAny` and register a clone of it in the cache.
2613        let mut spawned_order = OrderAny::Market(spawned);
2614        {
2615            let cache_rc = algo.core.cache_rc();
2616            let mut cache = cache_rc.borrow_mut();
2617            cache
2618                .add_order(spawned_order.clone(), None, None, false)
2619                .unwrap();
2620        }
2621
            // Accepted event for the child, built from the child's own identifiers.
2622        let accepted = OrderAcceptedSpec::builder()
2623            .trader_id(spawned_order.trader_id())
2624            .strategy_id(spawned_order.strategy_id())
2625            .instrument_id(spawned_order.instrument_id())
2626            .client_order_id(spawned_order.client_order_id())
2627            .venue_order_id(VenueOrderId::from("V-123"))
2628            .account_id(AccountId::from("BINANCE-001"))
2629            .build();
2630
            // Apply the event through the cache and keep the order that `update_order`
            // returns, so the cancel event below is built from that returned order.
2631        {
2632            let cache_rc = algo.core.cache_rc();
2633            let mut cache = cache_rc.borrow_mut();
2634            spawned_order = cache
2635                .update_order(&OrderEventAny::Accepted(accepted))
2636                .unwrap();
2637        }
2638
            // Dispatch the same accepted event to the algorithm.
2639        algo.handle_order_event(OrderEventAny::Accepted(accepted));
2640
            // Re-read the primary from the cache: it should still hold the reduced 0.5.
2641        let primary_after_accept = {
2642            let cache = algo.core.cache();
2643            cache
2644                .order(&ClientOrderId::from("O-001"))
2645                .map(|o| o.clone())
2646                .unwrap()
2647        };
2648        assert_eq!(primary_after_accept.quantity(), Quantity::from("0.5"));
2649
2650        // Cancel after acceptance - no restoration should occur
2651        let canceled = OrderCanceledSpec::builder()
2652            .trader_id(spawned_order.trader_id())
2653            .strategy_id(spawned_order.strategy_id())
2654            .instrument_id(spawned_order.instrument_id())
2655            .client_order_id(spawned_order.client_order_id())
2656            .venue_order_id(VenueOrderId::from("V-123"))
2657            .account_id(AccountId::from("BINANCE-001"))
2658            .build();
2659
            // Same two-step pattern as for the accept: update the cache, then notify the
            // algorithm.
2660        {
2661            let cache_rc = algo.core.cache_rc();
2662            let mut cache = cache_rc.borrow_mut();
2663            cache
2664                .update_order(&OrderEventAny::Canceled(canceled))
2665                .unwrap();
2666        }
2667
2668        algo.handle_order_event(OrderEventAny::Canceled(canceled));
2669
            // Final check: the cached primary quantity is unchanged at 0.5 after the cancel.
2670        let final_primary = {
2671            let cache = algo.core.cache();
2672            cache
2673                .order(&ClientOrderId::from("O-001"))
2674                .map(|o| o.clone())
2675                .unwrap()
2676        };
2677        assert_eq!(final_primary.quantity(), Quantity::from("0.5"));
2678    }
2679
2680    #[rstest]
2681    #[should_panic(expected = "exceeds primary leaves_qty")]
        // Checks that spawning more than the primary's remaining `leaves_qty` panics.
        // `should_panic(expected = ...)` only passes when the panic message contains the given
        // text, so a failure of the intermediate `assert_eq!` checks below (which panic with a
        // different message) still fails the test.
2682    fn test_spawn_quantity_exceeds_leaves_qty_panics() {
2683        let mut algo = create_test_algorithm();
2684        register_algorithm(&mut algo);
2685
2686        let instrument_id = InstrumentId::from("BTC/USDT.BINANCE");
2687        let exec_algorithm_id = algo.id();
2688
            // Primary market BUY of 1.0 (GTC) that carries `Some(exec_algorithm_id)`,
            // i.e. the id returned by `algo.id()` above.
2689        let mut primary = OrderAny::Market(MarketOrder::new(
2690            TraderId::from("TRADER-001"),
2691            StrategyId::from("STRAT-001"),
2692            instrument_id,
2693            ClientOrderId::from("O-001"),
2694            OrderSide::Buy,
2695            Quantity::from("1.0"),
2696            TimeInForce::Gtc,
2697            UUID4::new(),
2698            0.into(),
2699            false,
2700            false,
2701            None,
2702            None,
2703            None,
2704            None,
2705            Some(exec_algorithm_id),
2706            None,
2707            None,
2708            None,
2709        ));
2710
            // Store a clone of the primary in the cache. The braces end the `borrow_mut()`
            // before `algo` is used again (presumably `algo` borrows the cache itself —
            // TODO confirm).
2711        {
2712            let cache_rc = algo.core.cache_rc();
2713            let mut cache = cache_rc.borrow_mut();
2714            cache.add_order(primary.clone(), None, None, false).unwrap();
2715        }
2716
            // First spawn takes 0.8 of the 1.0 primary; the returned child order is discarded.
            // NOTE(review): the trailing `false, None, true` presumably map to reduce_only,
            // tags and reduce_primary — confirm against `spawn_market`'s signature.
2717        let _ = algo.spawn_market(
2718            &mut primary,
2719            Quantity::from("0.8"),
2720            TimeInForce::Fok,
2721            false,
2722            None,
2723            true,
2724        );
2725
            // Precondition for the panic case: only 0.2 is left on the local primary.
2726        assert_eq!(primary.quantity(), Quantity::from("0.2"));
2727        assert_eq!(primary.leaves_qty(), Quantity::from("0.2"));
2728
2729        // Should panic - spawning 0.5 when only 0.2 leaves_qty remains
2730        let _ = algo.spawn_market(
2731            &mut primary,
2732            Quantity::from("0.5"),
2733            TimeInForce::Fok,
2734            false,
2735            None,
2736            true,
2737        );
2738    }
2739}