Skip to main content

nautilus_execution/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a generic `ExecutionEngine` for all environments.
17//!
18//! The execution engines primary responsibility is to orchestrate interactions
19//! between the `ExecutionClient` instances, and the rest of the platform. This
20//! includes sending commands to, and receiving events from, the trading venue
21//! endpoints via its registered execution clients.
22
23pub mod config;
24pub mod stubs;
25
26use std::{
27    cell::{Cell, RefCell, RefMut},
28    collections::{HashMap, HashSet},
29    fmt::{Debug, Display},
30    rc::Rc,
31    time::SystemTime,
32};
33
34use ahash::AHashSet;
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use indexmap::{IndexMap, IndexSet};
38use nautilus_common::{
39    cache::{Cache, CacheSnapshotRef, PositionRef},
40    clients::ExecutionClient,
41    clock::Clock,
42    generators::position_id::PositionIdGenerator,
43    logging::{CMD, EVT, RECV, SEND},
44    messages::{
45        ExecutionReport,
46        execution::{
47            BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
48            SubmitOrder, SubmitOrderList, TradingCommand,
49        },
50    },
51    msgbus::{
52        self, MessagingSwitchboard, TypedHandler, TypedIntoHandler, get_message_bus,
53        switchboard::{self},
54    },
55    runner::try_get_trading_cmd_sender,
56    timer::{TimeEvent, TimeEventCallback},
57};
58use nautilus_core::{
59    UUID4, UnixNanos, WeakCell,
60    datetime::{mins_to_nanos, mins_to_secs},
61};
62use nautilus_model::{
63    accounts::Account,
64    enums::{
65        ContingencyType, OmsType, OrderStatus, OrderType, PositionSide, TimeInForce,
66        TrailingOffsetType,
67    },
68    events::{
69        OrderAccepted, OrderCanceled, OrderDenied, OrderEvent, OrderEventAny, OrderExpired,
70        OrderFilled, OrderInitialized, PositionChanged, PositionClosed, PositionEvent,
71        PositionOpened,
72    },
73    identifiers::{
74        AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, Venue,
75        VenueOrderId,
76    },
77    instruments::{Instrument, InstrumentAny},
78    orderbook::own::{OwnBookOrder, OwnOrderBook, should_handle_own_book_order},
79    orders::{Order, OrderAny, OrderError},
80    position::Position,
81    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
82    types::{Money, Quantity},
83};
84use rust_decimal::Decimal;
85
86use crate::{
87    client::ExecutionClientAdapter,
88    reconciliation::{
89        check_position_reconciliation, create_incremental_inferred_fill,
90        generate_external_order_status_events, generate_reconciliation_order_events,
91        reconcile_fill_report as reconcile_fill,
92    },
93};
94
95const TIMER_PURGE_CLOSED_ORDERS: &str = "ExecEngine_PURGE_CLOSED_ORDERS";
96const TIMER_PURGE_CLOSED_POSITIONS: &str = "ExecEngine_PURGE_CLOSED_POSITIONS";
97const TIMER_PURGE_ACCOUNT_EVENTS: &str = "ExecEngine_PURGE_ACCOUNT_EVENTS";
98
99/// Callback that anchors cache snapshot metadata in an external store.
100pub type SnapshotAnchorer = Rc<dyn Fn(CacheSnapshotRef) -> anyhow::Result<()>>;
101
102/// Central execution engine responsible for orchestrating order routing and execution.
103///
104/// The execution engine manages the entire order lifecycle from submission to completion,
105/// handling routing to appropriate execution clients, position management, and event
106/// processing. It supports multiple execution venues through registered clients and
107/// provides sophisticated order management capabilities.
108pub struct ExecutionEngine {
109    clock: Rc<RefCell<dyn Clock>>,
110    cache: Rc<RefCell<Cache>>,
111    clients: IndexMap<ClientId, ExecutionClientAdapter>,
112    default_client: Option<ExecutionClientAdapter>,
113    routing_map: HashMap<Venue, ClientId>,
114    oms_overrides: HashMap<StrategyId, OmsType>,
115    external_order_claims: HashMap<InstrumentId, StrategyId>,
116    external_clients: HashSet<ClientId>,
117    pos_id_generator: PositionIdGenerator,
118    config: ExecutionEngineConfig,
119    command_count: Cell<u64>,
120    event_count: u64,
121    report_count: u64,
122    filtered_unclaimed_external_order_count: u64,
123    snapshot_anchorer: Option<SnapshotAnchorer>,
124}
125
126impl Debug for ExecutionEngine {
127    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128        f.debug_struct(stringify!(ExecutionEngine))
129            .field("client_count", &self.clients.len())
130            .finish()
131    }
132}
133
134impl ExecutionEngine {
135    /// Creates a new [`ExecutionEngine`] instance.
136    pub fn new(
137        clock: Rc<RefCell<dyn Clock>>,
138        cache: Rc<RefCell<Cache>>,
139        config: Option<ExecutionEngineConfig>,
140    ) -> Self {
141        let trader_id = get_message_bus().borrow().trader_id;
142        Self {
143            clock: clock.clone(),
144            cache,
145            clients: IndexMap::new(),
146            default_client: None,
147            routing_map: HashMap::new(),
148            oms_overrides: HashMap::new(),
149            external_order_claims: HashMap::new(),
150            external_clients: config
151                .as_ref()
152                .and_then(|c| c.external_clients.clone())
153                .unwrap_or_default()
154                .into_iter()
155                .collect(),
156            pos_id_generator: PositionIdGenerator::new(trader_id, clock),
157            config: config.unwrap_or_default(),
158            command_count: Cell::new(0),
159            event_count: 0,
160            report_count: 0,
161            filtered_unclaimed_external_order_count: 0,
162            snapshot_anchorer: None,
163        }
164    }
165
166    /// Registers all message bus handlers for the execution engine.
167    pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
168        let weak = WeakCell::from(Rc::downgrade(engine));
169
170        let weak1 = weak.clone();
171        msgbus::register_trading_command_endpoint(
172            MessagingSwitchboard::exec_engine_execute(),
173            TypedIntoHandler::from(move |cmd: TradingCommand| {
174                if let Some(rc) = weak1.upgrade() {
175                    rc.borrow().execute(cmd);
176                }
177            }),
178        );
179
180        // Queued endpoint for deferred command execution (re-entrancy safe),
181        // falls back to direct endpoint if no sender is initialized (e.g., backtest/test).
182        msgbus::register_trading_command_endpoint(
183            MessagingSwitchboard::exec_engine_queue_execute(),
184            TypedIntoHandler::from(move |cmd: TradingCommand| {
185                if let Some(sender) = try_get_trading_cmd_sender() {
186                    sender.execute(cmd);
187                } else {
188                    let endpoint = MessagingSwitchboard::exec_engine_execute();
189                    msgbus::send_trading_command(endpoint, cmd);
190                }
191            }),
192        );
193
194        let weak2 = weak.clone();
195        msgbus::register_order_event_endpoint(
196            MessagingSwitchboard::exec_engine_process(),
197            TypedIntoHandler::from(move |event: OrderEventAny| {
198                if let Some(rc) = weak2.upgrade() {
199                    rc.borrow_mut().process(&event);
200                }
201            }),
202        );
203
204        let weak3 = weak;
205        msgbus::register_execution_report_endpoint(
206            MessagingSwitchboard::exec_engine_reconcile_execution_report(),
207            TypedIntoHandler::from(move |report: ExecutionReport| {
208                if let Some(rc) = weak3.upgrade() {
209                    rc.borrow_mut().reconcile_execution_report(&report);
210                }
211            }),
212        );
213    }
214
215    /// Returns the total count of trading commands received by the engine.
216    #[must_use]
217    pub fn command_count(&self) -> u64 {
218        self.command_count.get()
219    }
220
221    /// Returns the total count of order events received by the engine.
222    #[must_use]
223    pub const fn event_count(&self) -> u64 {
224        self.event_count
225    }
226
227    /// Returns the total count of execution reports received by the engine.
228    #[must_use]
229    pub const fn report_count(&self) -> u64 {
230        self.report_count
231    }
232
233    /// Returns the count of unclaimed external venue orders filtered by execution reconciliation.
234    #[must_use]
235    pub const fn filtered_unclaimed_external_order_count(&self) -> u64 {
236        self.filtered_unclaimed_external_order_count
237    }
238
239    /// Subscribes to instrument updates for a venue via the message bus.
240    ///
241    /// When instruments are published by the `DataEngine`, the handler routes
242    /// them to the execution client registered for that venue.
243    pub fn subscribe_venue_instruments(engine: &Rc<RefCell<Self>>, venue: Venue) {
244        let weak = WeakCell::from(Rc::downgrade(engine));
245        let pattern = switchboard::get_instruments_pattern(venue);
246
247        let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
248            if let Some(rc) = weak.upgrade() {
249                let venue = instrument.id().venue;
250                let client_id = rc.borrow().routing_map.get(&venue).copied();
251                if let Some(client_id) = client_id {
252                    let mut engine = rc.borrow_mut();
253                    if let Some(adapter) = engine.get_client_adapter_mut(&client_id) {
254                        adapter.on_instrument(instrument.clone());
255                    }
256                }
257            }
258        });
259
260        msgbus::subscribe_instruments(pattern, handler, None);
261        log::info!("Subscribed to instrument updates for venue {venue}");
262    }
263
264    #[must_use]
265    /// Returns the position ID count for the specified strategy.
266    pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
267        self.pos_id_generator.count(strategy_id)
268    }
269
270    #[must_use]
271    /// Returns a reference to the cache.
272    pub fn cache(&self) -> &Rc<RefCell<Cache>> {
273        &self.cache
274    }
275
276    #[must_use]
277    /// Returns a reference to the configuration.
278    pub const fn config(&self) -> &ExecutionEngineConfig {
279        &self.config
280    }
281
282    /// Sets the cache snapshot anchorer.
283    ///
284    /// The system event-store integration installs this while a run is open. Passing
285    /// `None` disables anchor recording for later cache snapshots.
286    pub fn set_snapshot_anchorer(&mut self, anchorer: Option<SnapshotAnchorer>) {
287        self.snapshot_anchorer = anchorer;
288    }
289
290    #[must_use]
291    /// Checks the integrity of cached execution data.
292    pub fn check_integrity(&self) -> bool {
293        self.cache.borrow_mut().check_integrity()
294    }
295
296    #[must_use]
297    /// Returns true if all registered execution clients are connected.
298    pub fn check_connected(&self) -> bool {
299        let clients_connected = self.clients.values().all(|c| c.is_connected());
300        let default_connected = self
301            .default_client
302            .as_ref()
303            .is_none_or(|c| c.is_connected());
304        clients_connected && default_connected
305    }
306
307    #[must_use]
308    /// Returns true if all registered execution clients are disconnected.
309    pub fn check_disconnected(&self) -> bool {
310        let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
311        let default_disconnected = self
312            .default_client
313            .as_ref()
314            .is_none_or(|c| !c.is_connected());
315        clients_disconnected && default_disconnected
316    }
317
318    /// Returns connection status for each registered client.
319    #[must_use]
320    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
321        let mut status: Vec<_> = self
322            .clients
323            .values()
324            .map(|c| (c.client_id(), c.is_connected()))
325            .collect();
326
327        if let Some(default) = &self.default_client {
328            status.push((default.client_id(), default.is_connected()));
329        }
330
331        status
332    }
333
334    #[must_use]
335    /// Checks for residual positions and orders in the cache.
336    pub fn check_residuals(&self) -> bool {
337        self.cache.borrow().check_residuals()
338    }
339
340    #[must_use]
341    /// Returns the set of instruments that have external order claims.
342    pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
343        self.external_order_claims.keys().copied().collect()
344    }
345
346    #[must_use]
347    /// Returns the configured external client IDs.
348    pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
349        self.external_clients.clone()
350    }
351
352    #[must_use]
353    /// Returns any external order claim for the given instrument ID.
354    pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
355        self.external_order_claims.get(instrument_id).copied()
356    }
357
358    /// Registers a new execution client.
359    ///
360    /// # Errors
361    ///
362    /// Returns an error if a client with the same ID is already registered.
363    pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
364        let client_id = client.client_id();
365        let venue = client.venue();
366
367        if self.clients.contains_key(&client_id) {
368            anyhow::bail!("Client already registered with ID {client_id}");
369        }
370
371        let adapter = ExecutionClientAdapter::new(client);
372
373        if let Some(existing_client_id) = self.routing_map.get(&venue) {
374            anyhow::bail!(
375                "Venue {venue} already routed to {existing_client_id}, \
376                 cannot register {client_id} for the same venue"
377            );
378        }
379
380        self.routing_map.insert(venue, client_id);
381        log::debug!("Registered client {client_id}");
382        self.clients.insert(client_id, adapter);
383        Ok(())
384    }
385
386    /// Registers a default execution client for fallback routing.
387    pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
388        let client_id = client.client_id();
389        let adapter = ExecutionClientAdapter::new(client);
390
391        log::debug!("Registered default client {client_id}");
392        self.default_client = Some(adapter);
393    }
394
395    #[must_use]
396    /// Returns a reference to the execution client registered with the given ID.
397    pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
398        self.clients.get(client_id).map(|a| a.client.as_ref())
399    }
400
401    #[must_use]
402    /// Returns a mutable reference to the execution client adapter registered with the given ID.
403    pub fn get_client_adapter_mut(
404        &mut self,
405        client_id: &ClientId,
406    ) -> Option<&mut ExecutionClientAdapter> {
407        if let Some(default) = &self.default_client
408            && &default.client_id == client_id
409        {
410            return self.default_client.as_mut();
411        }
412        self.clients.get_mut(client_id)
413    }
414
415    /// Generates mass status for the given client.
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the client is not found or mass status generation fails.
420    pub async fn generate_mass_status(
421        &mut self,
422        client_id: &ClientId,
423        lookback_mins: Option<u64>,
424    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
425        if let Some(client) = self.get_client_adapter_mut(client_id) {
426            client.generate_mass_status(lookback_mins).await
427        } else {
428            anyhow::bail!("Client {client_id} not found")
429        }
430    }
431
432    /// Registers an external order with the execution client for tracking.
433    ///
434    /// This is called after reconciliation creates an external order, allowing the
435    /// execution client to track it for subsequent events (e.g., cancellations).
436    pub fn register_external_order(
437        &self,
438        client_order_id: ClientOrderId,
439        venue_order_id: VenueOrderId,
440        instrument_id: InstrumentId,
441        strategy_id: StrategyId,
442        ts_init: UnixNanos,
443    ) {
444        let venue = instrument_id.venue;
445        if let Some(client_id) = self.routing_map.get(&venue) {
446            if let Some(client) = self.clients.get(client_id) {
447                client.register_external_order(
448                    client_order_id,
449                    venue_order_id,
450                    instrument_id,
451                    strategy_id,
452                    ts_init,
453                );
454            }
455        } else if let Some(default) = &self.default_client {
456            default.register_external_order(
457                client_order_id,
458                venue_order_id,
459                instrument_id,
460                strategy_id,
461                ts_init,
462            );
463        }
464    }
465
466    #[must_use]
467    /// Returns all registered execution client IDs.
468    pub fn client_ids(&self) -> Vec<ClientId> {
469        let mut ids: Vec<_> = self.clients.keys().copied().collect();
470
471        if let Some(default) = &self.default_client {
472            ids.push(default.client_id);
473        }
474        ids
475    }
476
477    #[must_use]
478    /// Returns mutable access to all registered execution clients.
479    pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
480        let mut adapters: Vec<_> = self.clients.values_mut().collect();
481
482        if let Some(default) = &mut self.default_client {
483            adapters.push(default);
484        }
485        adapters
486    }
487
488    /// Returns all registered execution clients.
489    #[must_use]
490    pub fn get_all_clients(&self) -> Vec<&dyn ExecutionClient> {
491        let mut clients: Vec<&dyn ExecutionClient> =
492            self.clients.values().map(|a| a.client.as_ref()).collect();
493
494        if let Some(default) = &self.default_client {
495            clients.push(default.client.as_ref());
496        }
497
498        clients
499    }
500
501    #[must_use]
502    /// Returns execution clients that would handle the given orders.
503    ///
504    /// This method first attempts to resolve each order's originating client from the cache,
505    /// then falls back to venue routing for any orders without a cached client.
506    pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
507        let mut client_ids: IndexSet<ClientId> = IndexSet::new();
508        let mut venues: IndexSet<Venue> = IndexSet::new();
509
510        // Collect client IDs from cache and venues for fallback
511        for order in orders {
512            venues.insert(order.instrument_id().venue);
513            if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
514                client_ids.insert(*client_id);
515            }
516        }
517
518        let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
519
520        // Add clients for cached client IDs (orders go back to originating client)
521        for client_id in &client_ids {
522            if let Some(adapter) = self.clients.get(client_id)
523                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
524            {
525                clients.push(adapter.client.as_ref());
526            }
527        }
528
529        // Add clients for venue routing (for orders not in cache)
530        for venue in &venues {
531            if let Some(client_id) = self.routing_map.get(venue) {
532                if let Some(adapter) = self.clients.get(client_id)
533                    && !clients.iter().any(|c| c.client_id() == adapter.client_id)
534                {
535                    clients.push(adapter.client.as_ref());
536                }
537            } else if let Some(adapter) = &self.default_client
538                && !clients.iter().any(|c| c.client_id() == adapter.client_id)
539            {
540                clients.push(adapter.client.as_ref());
541            }
542        }
543
544        clients
545    }
546
547    /// Sets routing for a specific venue to a given client ID.
548    ///
549    /// # Errors
550    ///
551    /// Returns an error if the client ID is not registered.
552    pub fn register_venue_routing(
553        &mut self,
554        client_id: ClientId,
555        venue: Venue,
556    ) -> anyhow::Result<()> {
557        if !self.clients.contains_key(&client_id) {
558            anyhow::bail!("No client registered with ID {client_id}");
559        }
560
561        if let Some(existing_client_id) = self.routing_map.get(&venue)
562            && *existing_client_id != client_id
563        {
564            anyhow::bail!(
565                "Venue {venue} already routed to {existing_client_id}, \
566                 cannot re-route to {client_id}"
567            );
568        }
569
570        self.routing_map.insert(venue, client_id);
571        log::info!("Set client {client_id} routing for {venue}");
572        Ok(())
573    }
574
575    /// Registers the OMS (Order Management System) type for a strategy.
576    ///
577    /// If an OMS type is already registered for this strategy, it will be overridden.
578    pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
579        self.oms_overrides.insert(strategy_id, oms_type);
580        log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
581    }
582
583    /// Registers external order claims for a strategy.
584    ///
585    /// Venue-sourced external orders, fills, and materialized reconciliation activity for matching
586    /// instruments will be associated with the strategy.
587    ///
588    /// This operation is atomic: either all instruments are registered or none are.
589    ///
590    /// # Errors
591    ///
592    /// Returns an error if any instrument already has a registered claim.
593    pub fn register_external_order_claims(
594        &mut self,
595        strategy_id: StrategyId,
596        instrument_ids: &HashSet<InstrumentId>,
597    ) -> anyhow::Result<()> {
598        // Validate all instruments first
599        for instrument_id in instrument_ids {
600            if let Some(existing) = self.external_order_claims.get(instrument_id) {
601                anyhow::bail!(
602                    "External order claim for {instrument_id} already exists for {existing}"
603                );
604            }
605        }
606
607        // If validation passed, insert all claims
608        for instrument_id in instrument_ids {
609            self.external_order_claims
610                .insert(*instrument_id, strategy_id);
611        }
612
613        if !instrument_ids.is_empty() {
614            log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
615        }
616
617        Ok(())
618    }
619
620    /// # Errors
621    ///
622    /// Returns an error if no client is registered with the given ID.
623    pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
624        if self.clients.shift_remove(&client_id).is_some() {
625            // Remove from routing map if present
626            self.routing_map
627                .retain(|_, mapped_id| mapped_id != &client_id);
628            log::info!("Deregistered client {client_id}");
629            Ok(())
630        } else {
631            anyhow::bail!("No client registered with ID {client_id}")
632        }
633    }
634
635    /// Connects all registered execution clients concurrently.
636    ///
637    /// Connection failures are logged but do not prevent the node from running.
638    pub async fn connect(&mut self) {
639        let futures: Vec<_> = self
640            .get_clients_mut()
641            .into_iter()
642            .map(ExecutionClientAdapter::connect)
643            .collect();
644
645        let results = join_all(futures).await;
646
647        for error in results.into_iter().filter_map(Result::err) {
648            log::error!("Failed to connect execution client: {error:#}");
649        }
650    }
651
652    /// Disconnects all registered execution clients concurrently.
653    ///
654    /// # Errors
655    ///
656    /// Returns an error if any client fails to disconnect.
657    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
658        let futures: Vec<_> = self
659            .get_clients_mut()
660            .into_iter()
661            .map(ExecutionClientAdapter::disconnect)
662            .collect();
663
664        let results = join_all(futures).await;
665        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
666
667        if errors.is_empty() {
668            Ok(())
669        } else {
670            let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
671            anyhow::bail!(
672                "Failed to disconnect execution clients: {}",
673                error_msgs.join("; ")
674            )
675        }
676    }
677
678    /// Sets the `manage_own_order_books` configuration option.
679    pub fn set_manage_own_order_books(&mut self, value: bool) {
680        self.config.manage_own_order_books = value;
681    }
682
683    /// Starts the position snapshot timer if configured.
684    ///
685    /// Timer functionality requires a live execution context with an active clock.
686    pub fn start_snapshot_timer(&mut self) {
687        if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
688            log::info!("Starting position snapshots timer at {interval_secs} second intervals");
689        }
690    }
691
692    /// Stops the position snapshot timer if running.
693    pub fn stop_snapshot_timer(&mut self) {
694        if self.config.snapshot_positions_interval_secs.is_some() {
695            log::info!("Canceling position snapshots timer");
696        }
697    }
698
699    /// Starts the purge timers if configured.
700    #[expect(
701        clippy::missing_panics_doc,
702        reason = "timer registration is not expected to fail"
703    )]
704    pub fn start_purge_timers(&mut self) {
705        if let Some(interval_mins) = self
706            .config
707            .purge_closed_orders_interval_mins
708            .filter(|&m| m > 0)
709            && !self
710                .clock
711                .borrow()
712                .timer_names()
713                .contains(&TIMER_PURGE_CLOSED_ORDERS)
714        {
715            let interval_ns = mins_to_nanos(u64::from(interval_mins));
716            let buffer_mins = self.config.purge_closed_orders_buffer_mins.unwrap_or(0);
717            let buffer_secs = mins_to_secs(u64::from(buffer_mins));
718            let cache = self.cache.clone();
719            let clock = self.clock.clone();
720
721            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
722                let ts_now = clock.borrow().timestamp_ns();
723                cache.borrow_mut().purge_closed_orders(ts_now, buffer_secs);
724            });
725            let callback = TimeEventCallback::from(callback_fn);
726
727            log::info!("Starting purge closed orders timer at {interval_mins} minute intervals");
728            self.clock
729                .borrow_mut()
730                .set_timer_ns(
731                    TIMER_PURGE_CLOSED_ORDERS,
732                    interval_ns,
733                    None,
734                    None,
735                    Some(callback),
736                    None,
737                    None,
738                )
739                .expect("Failed to set purge closed orders timer");
740        }
741
742        if let Some(interval_mins) = self
743            .config
744            .purge_closed_positions_interval_mins
745            .filter(|&m| m > 0)
746            && !self
747                .clock
748                .borrow()
749                .timer_names()
750                .contains(&TIMER_PURGE_CLOSED_POSITIONS)
751        {
752            let interval_ns = mins_to_nanos(u64::from(interval_mins));
753            let buffer_mins = self.config.purge_closed_positions_buffer_mins.unwrap_or(0);
754            let buffer_secs = mins_to_secs(u64::from(buffer_mins));
755            let cache = self.cache.clone();
756            let clock = self.clock.clone();
757
758            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
759                let ts_now = clock.borrow().timestamp_ns();
760                cache
761                    .borrow_mut()
762                    .purge_closed_positions(ts_now, buffer_secs);
763            });
764            let callback = TimeEventCallback::from(callback_fn);
765
766            log::info!("Starting purge closed positions timer at {interval_mins} minute intervals");
767            self.clock
768                .borrow_mut()
769                .set_timer_ns(
770                    TIMER_PURGE_CLOSED_POSITIONS,
771                    interval_ns,
772                    None,
773                    None,
774                    Some(callback),
775                    None,
776                    None,
777                )
778                .expect("Failed to set purge closed positions timer");
779        }
780
781        if let Some(interval_mins) = self
782            .config
783            .purge_account_events_interval_mins
784            .filter(|&m| m > 0)
785            && !self
786                .clock
787                .borrow()
788                .timer_names()
789                .contains(&TIMER_PURGE_ACCOUNT_EVENTS)
790        {
791            let interval_ns = mins_to_nanos(u64::from(interval_mins));
792            let lookback_mins = self.config.purge_account_events_lookback_mins.unwrap_or(0);
793            let lookback_secs = mins_to_secs(u64::from(lookback_mins));
794            let cache = self.cache.clone();
795            let clock = self.clock.clone();
796
797            let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
798                let ts_now = clock.borrow().timestamp_ns();
799                cache
800                    .borrow_mut()
801                    .purge_account_events(ts_now, lookback_secs);
802            });
803            let callback = TimeEventCallback::from(callback_fn);
804
805            log::info!("Starting purge account events timer at {interval_mins} minute intervals");
806            self.clock
807                .borrow_mut()
808                .set_timer_ns(
809                    TIMER_PURGE_ACCOUNT_EVENTS,
810                    interval_ns,
811                    None,
812                    None,
813                    Some(callback),
814                    None,
815                    None,
816                )
817                .expect("Failed to set purge account events timer");
818        }
819    }
820
821    /// Stops the purge timers if running.
822    pub fn stop_purge_timers(&mut self) {
823        let timer_names: Vec<String> = self
824            .clock
825            .borrow()
826            .timer_names()
827            .into_iter()
828            .map(String::from)
829            .collect();
830
831        if timer_names.iter().any(|n| n == TIMER_PURGE_CLOSED_ORDERS) {
832            log::info!("Canceling purge closed orders timer");
833            self.clock
834                .borrow_mut()
835                .cancel_timer(TIMER_PURGE_CLOSED_ORDERS);
836        }
837
838        if timer_names
839            .iter()
840            .any(|n| n == TIMER_PURGE_CLOSED_POSITIONS)
841        {
842            log::info!("Canceling purge closed positions timer");
843            self.clock
844                .borrow_mut()
845                .cancel_timer(TIMER_PURGE_CLOSED_POSITIONS);
846        }
847
848        if timer_names.iter().any(|n| n == TIMER_PURGE_ACCOUNT_EVENTS) {
849            log::info!("Canceling purge account events timer");
850            self.clock
851                .borrow_mut()
852                .cancel_timer(TIMER_PURGE_ACCOUNT_EVENTS);
853        }
854    }
855
856    /// Creates snapshots of all open positions.
857    pub fn snapshot_open_position_states(&self) {
858        let positions: Vec<Position> = self
859            .cache
860            .borrow()
861            .positions_open(None, None, None, None, None)
862            .into_iter()
863            .map(|p| p.cloned())
864            .collect();
865
866        for position in positions {
867            self.create_position_state_snapshot(&position);
868        }
869    }
870
871    #[expect(clippy::await_holding_refcell_ref)]
872    /// Loads persistent state into cache and rebuilds indices.
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if any cache operation fails.
877    pub async fn load_cache(&mut self) -> anyhow::Result<()> {
878        let ts = SystemTime::now(); // dst-ok: init-time log timing, not on DST state path
879
880        {
881            let mut cache = self.cache.borrow_mut();
882            cache.clear_index();
883            cache.cache_general()?;
884        }
885
886        self.cache.borrow_mut().cache_all().await?;
887
888        // Snapshot before iterating: `get_or_init_own_order_book` re-enters `self.cache.borrow_mut()`.
889        let own_book_entries: Vec<(InstrumentId, OwnBookOrder)> = {
890            let mut cache = self.cache.borrow_mut();
891            cache.build_index();
892            let _ = cache.check_integrity();
893
894            if self.config.manage_own_order_books {
895                cache
896                    .orders(None, None, None, None, None)
897                    .into_iter()
898                    .filter(|o| !o.is_closed() && should_handle_own_book_order(o))
899                    .map(|o| (o.instrument_id(), o.to_own_book_order()))
900                    .collect()
901            } else {
902                Vec::new()
903            }
904        };
905
906        for (instrument_id, own_order) in own_book_entries {
907            let mut own_book = self.get_or_init_own_order_book(&instrument_id);
908            own_book.add(own_order);
909        }
910
911        self.set_position_id_counts();
912
913        log::info!(
914            "Loaded cache in {}ms",
915            SystemTime::now() // dst-ok: init-time log timing, not on DST state path
916                .duration_since(ts)
917                .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
918                .as_millis()
919        );
920
921        Ok(())
922    }
923
924    /// Flushes the database to persist all cached data.
925    pub fn flush_db(&self) {
926        self.cache.borrow_mut().flush_db();
927    }
928
929    /// Reconciles an execution report.
930    pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
931        if !matches!(report, ExecutionReport::MassStatus(_)) {
932            self.report_count += 1;
933        }
934
935        match report {
936            ExecutionReport::Order(order_report) => {
937                self.reconcile_order_status_report(order_report);
938            }
939            ExecutionReport::Fill(fill_report) => {
940                self.reconcile_fill_report(fill_report);
941            }
942            ExecutionReport::OrderWithFills(order_report, fills) => {
943                self.reconcile_order_with_fills(order_report, fills);
944            }
945            ExecutionReport::Position(position_report) => {
946                self.reconcile_position_report(position_report);
947            }
948            ExecutionReport::MassStatus(mass_status) => {
949                self.reconcile_execution_mass_status(mass_status);
950            }
951        }
952    }
953
954    /// Reconciles an order status report received at runtime.
955    ///
956    /// Handles order status transitions by generating appropriate events when the venue
957    /// reports a different status than our local state. Supports all order states including
958    /// fills with inferred fill generation when instruments are available.
959    ///
960    /// When the order is not found in cache, creates an external order from the report.
961    /// This handles exchange-generated orders (liquidation, ADL, settlement) that were
962    /// not submitted locally.
963    pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
964        msgbus::publish_any(
965            MessagingSwitchboard::reconciliation_raw_order_status_report_topic(),
966            report,
967        );
968
969        let cache = self.cache.borrow();
970
971        let order = report
972            .client_order_id
973            .and_then(|id| cache.order(&id).map(|o| o.clone()))
974            .or_else(|| {
975                cache
976                    .client_order_id(&report.venue_order_id)
977                    .and_then(|cid| cache.order(cid).map(|o| o.clone()))
978            });
979
980        let instrument = cache.instrument(&report.instrument_id).cloned();
981
982        drop(cache);
983
984        if let Some(order) = order {
985            let ts_now = self.clock.borrow().timestamp_ns();
986            let events =
987                generate_reconciliation_order_events(&order, report, instrument.as_ref(), ts_now);
988
989            for event in &events {
990                self.handle_event(event);
991            }
992        } else {
993            self.create_external_order(report, instrument.as_ref());
994        }
995    }
996
997    fn create_external_order(
998        &mut self,
999        report: &OrderStatusReport,
1000        instrument: Option<&InstrumentAny>,
1001    ) {
1002        let Some(instrument) = instrument else {
1003            log::warn!(
1004                "Cannot create external order for venue_order_id={}: instrument {} not found",
1005                report.venue_order_id,
1006                report.instrument_id
1007            );
1008            return;
1009        };
1010
1011        let Some(order) = self.materialize_external_order_from_status(report) else {
1012            return;
1013        };
1014
1015        let ts_now = self.clock.borrow().timestamp_ns();
1016        let events = generate_external_order_status_events(
1017            &order,
1018            report,
1019            &report.account_id,
1020            instrument,
1021            ts_now,
1022        );
1023
1024        for event in &events {
1025            self.handle_event(event);
1026        }
1027    }
1028
1029    /// Builds and registers an external order from an [`OrderStatusReport`] without
1030    /// emitting status events. Returns the registered order.
1031    fn materialize_external_order_from_status(
1032        &mut self,
1033        report: &OrderStatusReport,
1034    ) -> Option<OrderAny> {
1035        let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1036        if self.should_filter_unclaimed_external_order(strategy_id) {
1037            self.filtered_unclaimed_external_order_count += 1;
1038
1039            if self.filtered_unclaimed_external_order_count == 1 {
1040                let external_order_id = report
1041                    .client_order_id
1042                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1043                log::info!(
1044                    "Filtering unclaimed external orders; first filtered order {} ({}) for {}",
1045                    external_order_id,
1046                    report.venue_order_id,
1047                    report.instrument_id,
1048                );
1049            } else {
1050                let external_order_id = report
1051                    .client_order_id
1052                    .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1053                log::debug!(
1054                    "Filtered unclaimed external order {} ({}) for {}",
1055                    external_order_id,
1056                    report.venue_order_id,
1057                    report.instrument_id,
1058                );
1059            }
1060
1061            return None;
1062        }
1063
1064        self.materialize_external_order_from_status_with_strategy(report, strategy_id)
1065    }
1066
1067    fn materialize_external_order_from_status_with_strategy(
1068        &self,
1069        report: &OrderStatusReport,
1070        strategy_id: StrategyId,
1071    ) -> Option<OrderAny> {
1072        let client_order_id = report
1073            .client_order_id
1074            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1075
1076        let trader_id = get_message_bus().borrow().trader_id;
1077        let ts_now = self.clock.borrow().timestamp_ns();
1078
1079        let initialized = OrderInitialized::new(
1080            trader_id,
1081            strategy_id,
1082            report.instrument_id,
1083            client_order_id,
1084            report.order_side,
1085            report.order_type,
1086            report.quantity,
1087            report.time_in_force,
1088            report.post_only,
1089            report.reduce_only,
1090            false, // quote_quantity
1091            true,  // reconciliation
1092            UUID4::new(),
1093            ts_now,
1094            ts_now,
1095            report.price,
1096            report.trigger_price,
1097            report.trigger_type,
1098            report.limit_offset,
1099            report.trailing_offset,
1100            Some(report.trailing_offset_type),
1101            report.expire_time,
1102            report.display_qty,
1103            None, // emulation_trigger
1104            None, // trigger_instrument_id
1105            Some(report.contingency_type),
1106            report.order_list_id,
1107            report.linked_order_ids.clone(),
1108            report.parent_order_id,
1109            None, // exec_algorithm_id
1110            None, // exec_algorithm_params
1111            None, // exec_spawn_id
1112            None, // tags
1113        );
1114
1115        self.materialize_external_order(
1116            initialized,
1117            client_order_id,
1118            report.venue_order_id,
1119            report.instrument_id,
1120            strategy_id,
1121            ts_now,
1122            Some(report.order_status),
1123        )
1124    }
1125
1126    /// Builds and registers an external order from a [`FillReport`] when no matching
1127    /// order exists in cache. The order is created with `OrderType::Market` and a
1128    /// quantity equal to the fill's `last_qty`, so the fill consumes the entire
1129    /// order on application.
1130    ///
1131    /// This handles venue-initiated fills (most commonly Hyperliquid liquidations)
1132    /// where the venue does not surface a user-level order on its order channel.
1133    fn materialize_external_order_from_fill(&mut self, report: &FillReport) -> Option<OrderAny> {
1134        let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1135        if self.should_filter_unclaimed_external_order(strategy_id) {
1136            self.filtered_unclaimed_external_order_count += 1;
1137
1138            let external_order_id = report
1139                .client_order_id
1140                .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1141
1142            if self.filtered_unclaimed_external_order_count == 1 {
1143                log::info!(
1144                    "Filtering unclaimed external orders; first filtered fill {} ({}) for {}",
1145                    external_order_id,
1146                    report.venue_order_id,
1147                    report.instrument_id,
1148                );
1149            } else {
1150                log::debug!(
1151                    "Filtered unclaimed external fill {} ({}) for {}",
1152                    external_order_id,
1153                    report.venue_order_id,
1154                    report.instrument_id,
1155                );
1156            }
1157
1158            return None;
1159        }
1160
1161        let client_order_id = report
1162            .client_order_id
1163            .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1164
1165        let trader_id = get_message_bus().borrow().trader_id;
1166        let ts_now = self.clock.borrow().timestamp_ns();
1167
1168        let initialized = OrderInitialized::new(
1169            trader_id,
1170            strategy_id,
1171            report.instrument_id,
1172            client_order_id,
1173            report.order_side,
1174            OrderType::Market,
1175            report.last_qty,
1176            TimeInForce::Ioc,
1177            false, // post_only
1178            true,  // reduce_only: venue-initiated closes always reduce
1179            false, // quote_quantity
1180            true,  // reconciliation
1181            UUID4::new(),
1182            ts_now,
1183            ts_now,
1184            None, // price
1185            None, // trigger_price
1186            None, // trigger_type
1187            None, // limit_offset
1188            None, // trailing_offset
1189            Some(TrailingOffsetType::NoTrailingOffset),
1190            None, // expire_time
1191            None, // display_qty
1192            None, // emulation_trigger
1193            None, // trigger_instrument_id
1194            Some(ContingencyType::NoContingency),
1195            None, // order_list_id
1196            None, // linked_order_ids
1197            None, // parent_order_id
1198            None, // exec_algorithm_id
1199            None, // exec_algorithm_params
1200            None, // exec_spawn_id
1201            None, // tags
1202        );
1203
1204        self.materialize_external_order(
1205            initialized,
1206            client_order_id,
1207            report.venue_order_id,
1208            report.instrument_id,
1209            strategy_id,
1210            ts_now,
1211            None,
1212        )
1213    }
1214
1215    fn resolve_external_strategy(&self, instrument_id: &InstrumentId) -> StrategyId {
1216        self.external_order_claims
1217            .get(instrument_id)
1218            .copied()
1219            .unwrap_or_else(StrategyId::external)
1220    }
1221
1222    fn should_filter_unclaimed_external_order(&self, strategy_id: StrategyId) -> bool {
1223        self.config.filter_unclaimed_external_orders && strategy_id.is_external()
1224    }
1225
1226    /// Adds an external order to the cache and registers it for adapter routing.
1227    /// Returns the registered order on success.
1228    #[allow(
1229        clippy::too_many_arguments,
1230        reason = "external order materialisation threads several ids and a timestamp"
1231    )]
1232    fn materialize_external_order(
1233        &self,
1234        initialized: OrderInitialized,
1235        client_order_id: ClientOrderId,
1236        venue_order_id: VenueOrderId,
1237        instrument_id: InstrumentId,
1238        strategy_id: StrategyId,
1239        ts_now: UnixNanos,
1240        order_status: Option<OrderStatus>,
1241    ) -> Option<OrderAny> {
1242        let initialized = OrderEventAny::Initialized(initialized);
1243        let order = match OrderAny::from_events(vec![initialized.clone()]) {
1244            Ok(order) => order,
1245            Err(e) => {
1246                log::error!("Failed to create external order from report: {e}");
1247                return None;
1248            }
1249        };
1250
1251        {
1252            let mut cache = self.cache.borrow_mut();
1253            if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1254                log::error!("Failed to add external order to cache: {e}");
1255                return None;
1256            }
1257
1258            if let Err(e) = cache.add_venue_order_id(&client_order_id, &venue_order_id, false) {
1259                log::warn!("Failed to add venue order ID index: {e}");
1260            }
1261        }
1262
1263        self.publish_order_event(&initialized);
1264
1265        match order_status {
1266            Some(status) => log::info!(
1267                "Created external order {client_order_id} ({venue_order_id}) for {instrument_id} [{status}]",
1268            ),
1269            None => log::info!(
1270                "Created external order {client_order_id} ({venue_order_id}) for {instrument_id}",
1271            ),
1272        }
1273
1274        self.register_external_order(
1275            client_order_id,
1276            venue_order_id,
1277            instrument_id,
1278            strategy_id,
1279            ts_now,
1280        );
1281
1282        Some(order)
1283    }
1284
1285    /// Reconciles a fill report received at runtime.
1286    ///
1287    /// Finds the associated order, validates the fill, and generates an `OrderFilled` event
1288    /// if the fill is not a duplicate and won't cause an overfill. When the order is not
1289    /// in cache, an external order is bootstrapped from the fill so that venue-initiated
1290    /// closures (e.g. Hyperliquid liquidations) that arrive without a companion order
1291    /// status report still update the local position.
1292    pub fn reconcile_fill_report(&mut self, report: &FillReport) {
1293        msgbus::publish_any(
1294            MessagingSwitchboard::reconciliation_raw_fill_report_topic(),
1295            report,
1296        );
1297
1298        let cache = self.cache.borrow();
1299
1300        let order = report
1301            .client_order_id
1302            .and_then(|id| cache.order(&id).map(|o| o.clone()))
1303            .or_else(|| {
1304                cache
1305                    .client_order_id(&report.venue_order_id)
1306                    .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1307            });
1308
1309        let instrument = cache.instrument(&report.instrument_id).cloned();
1310
1311        drop(cache);
1312
1313        let Some(instrument) = instrument else {
1314            log::debug!(
1315                "Cannot reconcile fill report for venue_order_id={}: instrument {} not found",
1316                report.venue_order_id,
1317                report.instrument_id
1318            );
1319            return;
1320        };
1321
1322        let order = match order {
1323            Some(order) => order,
1324            None => {
1325                let Some(order) = self.materialize_external_order_from_fill(report) else {
1326                    return;
1327                };
1328                let ts_now = self.clock.borrow().timestamp_ns();
1329                let accepted = OrderAccepted::new(
1330                    order.trader_id(),
1331                    order.strategy_id(),
1332                    order.instrument_id(),
1333                    order.client_order_id(),
1334                    report.venue_order_id,
1335                    report.account_id,
1336                    UUID4::new(),
1337                    report.ts_event,
1338                    ts_now,
1339                    true, // reconciliation
1340                );
1341                self.handle_event(&OrderEventAny::Accepted(accepted));
1342                self.cache
1343                    .borrow()
1344                    .order(&order.client_order_id())
1345                    .map(|o| o.clone())
1346                    .unwrap_or(order)
1347            }
1348        };
1349
1350        let ts_now = self.clock.borrow().timestamp_ns();
1351
1352        if let Some(event) = reconcile_fill(
1353            &order,
1354            report,
1355            &instrument,
1356            ts_now,
1357            self.config.allow_overfills,
1358        ) {
1359            self.handle_event(&event);
1360        }
1361    }
1362
1363    /// Reconciles an [`OrderStatusReport`] paired with companion [`FillReport`]s
1364    /// for the same venue event.
1365    ///
1366    /// Real fills supplied by the adapter are applied first so their `trade_id` and
1367    /// `commission` are preserved; any residual quantity not covered by the fills is
1368    /// then synthesised as an inferred fill from the status report's `avg_px`.
1369    /// Adapters use this to emit ADL / liquidation / settlement events without
1370    /// losing real fill metadata.
1371    pub fn reconcile_order_with_fills(&mut self, report: &OrderStatusReport, fills: &[FillReport]) {
1372        msgbus::publish_any(
1373            MessagingSwitchboard::reconciliation_raw_order_status_report_topic(),
1374            report,
1375        );
1376
1377        let fill_report_topic = MessagingSwitchboard::reconciliation_raw_fill_report_topic();
1378        for fill in fills {
1379            msgbus::publish_any(fill_report_topic, fill);
1380        }
1381
1382        let cache = self.cache.borrow();
1383        let order = report
1384            .client_order_id
1385            .and_then(|id| cache.order(&id).map(|o| o.clone()))
1386            .or_else(|| {
1387                cache
1388                    .client_order_id(&report.venue_order_id)
1389                    .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1390            });
1391        let instrument = cache.instrument(&report.instrument_id).cloned();
1392        drop(cache);
1393
1394        let Some(instrument) = instrument else {
1395            log::debug!(
1396                "Cannot reconcile bundled report for venue_order_id={}: instrument {} not found",
1397                report.venue_order_id,
1398                report.instrument_id,
1399            );
1400            return;
1401        };
1402
1403        // Bootstrap the external order with only OrderAccepted; defer fill events to
1404        // the per-fill loop so real fill metadata is preserved.
1405        let mut order = match order {
1406            Some(order) => order,
1407            None => {
1408                let Some(order) = self.materialize_external_order_from_status(report) else {
1409                    return;
1410                };
1411                let ts_now = self.clock.borrow().timestamp_ns();
1412                let accepted = OrderAccepted::new(
1413                    order.trader_id(),
1414                    order.strategy_id(),
1415                    order.instrument_id(),
1416                    order.client_order_id(),
1417                    report.venue_order_id,
1418                    report.account_id,
1419                    UUID4::new(),
1420                    report.ts_accepted,
1421                    ts_now,
1422                    true, // reconciliation
1423                );
1424                self.handle_event(&OrderEventAny::Accepted(accepted));
1425                order
1426            }
1427        };
1428
1429        let client_order_id = order.client_order_id();
1430
1431        for fill in fills {
1432            let ts_now = self.clock.borrow().timestamp_ns();
1433
1434            if let Some(event) = reconcile_fill(
1435                &order,
1436                fill,
1437                &instrument,
1438                ts_now,
1439                self.config.allow_overfills,
1440            ) {
1441                self.handle_event(&event);
1442            }
1443
1444            // Refresh order after fill to keep filled_qty accurate for the next iteration.
1445            if let Some(refreshed) = self
1446                .cache
1447                .borrow()
1448                .order(&client_order_id)
1449                .map(|o| o.clone())
1450            {
1451                order = refreshed;
1452            }
1453        }
1454
1455        // Cover any quantity gap between the status report and the real fills with
1456        // an inferred fill so the order reaches the venue-reported terminal state.
1457        if matches!(
1458            report.order_status,
1459            OrderStatus::PartiallyFilled | OrderStatus::Filled,
1460        ) && report.filled_qty > order.filled_qty()
1461        {
1462            let ts_now = self.clock.borrow().timestamp_ns();
1463
1464            if let Some(event) = create_incremental_inferred_fill(
1465                &order,
1466                report,
1467                &report.account_id,
1468                &instrument,
1469                ts_now,
1470                None,
1471            ) {
1472                self.handle_event(&event);
1473
1474                if let Some(refreshed) = self
1475                    .cache
1476                    .borrow()
1477                    .order(&client_order_id)
1478                    .map(|o| o.clone())
1479                {
1480                    order = refreshed;
1481                }
1482            }
1483        }
1484
1485        // Apply terminal events when the venue reports a non-fill closure.
1486        match report.order_status {
1487            OrderStatus::Canceled if !order.is_closed() => {
1488                let ts_now = self.clock.borrow().timestamp_ns();
1489                let canceled = OrderCanceled::new(
1490                    order.trader_id(),
1491                    order.strategy_id(),
1492                    order.instrument_id(),
1493                    order.client_order_id(),
1494                    UUID4::new(),
1495                    report.ts_last,
1496                    ts_now,
1497                    true,
1498                    Some(report.venue_order_id),
1499                    Some(report.account_id),
1500                );
1501                self.handle_event(&OrderEventAny::Canceled(canceled));
1502            }
1503            OrderStatus::Expired if !order.is_closed() => {
1504                let ts_now = self.clock.borrow().timestamp_ns();
1505                let expired = OrderExpired::new(
1506                    order.trader_id(),
1507                    order.strategy_id(),
1508                    order.instrument_id(),
1509                    order.client_order_id(),
1510                    UUID4::new(),
1511                    report.ts_last,
1512                    ts_now,
1513                    true,
1514                    Some(report.venue_order_id),
1515                    Some(report.account_id),
1516                );
1517                self.handle_event(&OrderEventAny::Expired(expired));
1518            }
1519            _ => {}
1520        }
1521    }
1522
1523    /// Reconciles a position status report received at runtime.
1524    ///
1525    /// Compares the venue-reported position with cached positions and logs any discrepancies.
1526    /// Handles both hedging (with `venue_position_id`) and netting (without) modes.
1527    pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
1528        msgbus::publish_any(
1529            MessagingSwitchboard::reconciliation_raw_position_status_report_topic(),
1530            report,
1531        );
1532
1533        let cache = self.cache.borrow();
1534
1535        let size_precision = cache
1536            .instrument(&report.instrument_id)
1537            .map(InstrumentAny::size_precision);
1538
1539        if report.venue_position_id.is_some() {
1540            self.reconcile_position_report_hedging(report, &cache);
1541        } else {
1542            self.reconcile_position_report_netting(report, &cache, size_precision);
1543        }
1544    }
1545
1546    fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
1547        let venue_position_id = report.venue_position_id.as_ref().unwrap();
1548
1549        log::debug!(
1550            "Reconciling HEDGE position for {}, venue_position_id={}",
1551            report.instrument_id,
1552            venue_position_id
1553        );
1554
1555        let Some(position) = cache.position(venue_position_id) else {
1556            log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
1557            return;
1558        };
1559
1560        let cached_signed_qty = match position.side {
1561            PositionSide::Long => position.quantity.as_decimal(),
1562            PositionSide::Short => -position.quantity.as_decimal(),
1563            _ => Decimal::ZERO,
1564        };
1565        let venue_signed_qty = report.signed_decimal_qty;
1566
1567        if cached_signed_qty != venue_signed_qty {
1568            log::error!(
1569                "Position mismatch for {} {}: cached={}, venue={}",
1570                report.instrument_id,
1571                venue_position_id,
1572                cached_signed_qty,
1573                venue_signed_qty
1574            );
1575        }
1576    }
1577
1578    fn reconcile_position_report_netting(
1579        &self,
1580        report: &PositionStatusReport,
1581        cache: &Cache,
1582        size_precision: Option<u8>,
1583    ) {
1584        log::debug!("Reconciling NET position for {}", report.instrument_id);
1585
1586        let positions_open = Self::netting_positions_open_for_report(cache, report);
1587
1588        let position_refs = positions_open
1589            .iter()
1590            .map(|position| &**position)
1591            .collect::<Vec<_>>();
1592
1593        if let Some(message) =
1594            Self::netting_split_position_ownership_message(report, &position_refs)
1595        {
1596            log::warn!("{message}");
1597        }
1598
1599        // Sum up cached position quantities using domain types to avoid f64 precision loss
1600        let cached_signed_qty: Decimal = positions_open
1601            .iter()
1602            .map(|position| Self::position_signed_decimal_qty(position))
1603            .sum();
1604
1605        log::debug!(
1606            "Position report: venue_signed_qty={}, cached_signed_qty={}",
1607            report.signed_decimal_qty,
1608            cached_signed_qty
1609        );
1610
1611        let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
1612    }
1613
1614    fn netting_positions_open_for_report<'a>(
1615        cache: &'a Cache,
1616        report: &PositionStatusReport,
1617    ) -> Vec<PositionRef<'a>> {
1618        cache.positions_open(
1619            None,
1620            Some(&report.instrument_id),
1621            None,
1622            Some(&report.account_id),
1623            None,
1624        )
1625    }
1626
1627    fn netting_split_position_ownership_message(
1628        report: &PositionStatusReport,
1629        positions_open: &[&Position],
1630    ) -> Option<String> {
1631        let mut strategy_ids = positions_open
1632            .iter()
1633            .map(|position| position.strategy_id.to_string())
1634            .collect::<Vec<_>>();
1635        strategy_ids.sort();
1636        strategy_ids.dedup();
1637
1638        if strategy_ids.len() <= 1 {
1639            return None;
1640        }
1641
1642        let position_details = Self::position_details(positions_open.iter().copied());
1643
1644        Some(format!(
1645            "NETTING reconciliation found split ownership for account_id={}, instrument_id={}: \
1646             strategies=[{}], positions=[{}]",
1647            report.account_id,
1648            report.instrument_id,
1649            strategy_ids.join(", "),
1650            position_details
1651        ))
1652    }
1653
1654    /// Reconciles an execution mass status report.
1655    ///
1656    /// Processes all order reports, fill reports, and position reports contained
1657    /// in the mass status. Orders created as external during this pass already receive
1658    /// inferred fills, so their companion fill reports are skipped to avoid double-fills.
1659    pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
1660        self.report_count += 1;
1661
1662        log::info!(
1663            "Reconciling mass status for client={}, account={}, venue={}",
1664            mass_status.client_id,
1665            mass_status.account_id,
1666            mass_status.venue
1667        );
1668
1669        let mut external_venue_ids = AHashSet::new();
1670        let mut filtered_venue_ids = AHashSet::new();
1671
1672        for order_report in mass_status.order_reports().values() {
1673            let existed = {
1674                let cache = self.cache.borrow();
1675                order_report
1676                    .client_order_id
1677                    .and_then(|id| cache.order(&id).map(|o| o.clone()))
1678                    .or_else(|| {
1679                        cache
1680                            .client_order_id(&order_report.venue_order_id)
1681                            .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1682                    })
1683                    .is_some()
1684            };
1685            let filtered_count = self.filtered_unclaimed_external_order_count;
1686
1687            self.reconcile_order_status_report(order_report);
1688
1689            if !existed {
1690                if self.filtered_unclaimed_external_order_count > filtered_count {
1691                    filtered_venue_ids.insert(order_report.venue_order_id);
1692                } else {
1693                    let exists_after = {
1694                        let cache = self.cache.borrow();
1695                        order_report
1696                            .client_order_id
1697                            .and_then(|id| cache.order(&id).map(|o| o.clone()))
1698                            .or_else(|| {
1699                                cache
1700                                    .client_order_id(&order_report.venue_order_id)
1701                                    .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1702                            })
1703                            .is_some()
1704                    };
1705
1706                    if exists_after {
1707                        external_venue_ids.insert(order_report.venue_order_id);
1708                    }
1709                }
1710            }
1711        }
1712
1713        let raw_fill_topic = MessagingSwitchboard::reconciliation_raw_fill_report_topic();
1714
1715        for fill_reports in mass_status.fill_reports().values() {
1716            for fill_report in fill_reports {
1717                if external_venue_ids.contains(&fill_report.venue_order_id) {
1718                    // Skipped fills still arrived from the venue; capture them
1719                    // for forensic replay even though reconciliation is covered
1720                    // by the inferred fill generated above.
1721                    msgbus::publish_any(raw_fill_topic, fill_report);
1722
1723                    log::debug!(
1724                        "Skipping fill report for external order {}: covered by inferred fill",
1725                        fill_report.venue_order_id
1726                    );
1727                    continue;
1728                }
1729
1730                if filtered_venue_ids.contains(&fill_report.venue_order_id) {
1731                    msgbus::publish_any(raw_fill_topic, fill_report);
1732
1733                    log::debug!(
1734                        "Skipping fill report for filtered unclaimed external order {}",
1735                        fill_report.venue_order_id
1736                    );
1737                    continue;
1738                }
1739
1740                self.reconcile_fill_report(fill_report);
1741            }
1742        }
1743
1744        for position_reports in mass_status.position_reports().values() {
1745            for position_report in position_reports {
1746                self.reconcile_position_report(position_report);
1747            }
1748        }
1749
1750        log::info!(
1751            "Mass status reconciliation complete: {} orders, {} fills, {} positions",
1752            mass_status.order_reports().len(),
1753            mass_status
1754                .fill_reports()
1755                .values()
1756                .map(Vec::len)
1757                .sum::<usize>(),
1758            mass_status
1759                .position_reports()
1760                .values()
1761                .map(Vec::len)
1762                .sum::<usize>()
1763        );
1764    }
1765
1766    /// Executes a trading command by routing it to the appropriate execution client.
1767    pub fn execute(&self, command: TradingCommand) {
1768        self.execute_command(command);
1769    }
1770
1771    /// Processes an order event, updating internal state and routing as needed.
1772    pub fn process(&mut self, event: &OrderEventAny) {
1773        self.handle_event(event);
1774    }
1775
1776    /// Starts the execution engine and all registered execution clients.
1777    pub fn start(&mut self) {
1778        for client in self.get_clients_mut() {
1779            if let Err(e) = client.start() {
1780                log::error!("{e}");
1781            }
1782        }
1783
1784        self.start_snapshot_timer();
1785        self.start_purge_timers();
1786
1787        log::info!("Started");
1788    }
1789
1790    /// Stops the execution engine and all registered execution clients.
1791    ///
1792    /// Adapters are expected to be idempotent on repeated `stop()` calls
1793    /// (e.g. via an internal `is_stopped` guard); the backtest teardown
1794    /// sequence calls `stop()` more than once per run.
1795    pub fn stop(&mut self) {
1796        for client in self.get_clients_mut() {
1797            if let Err(e) = client.stop() {
1798                log::error!("{e}");
1799            }
1800        }
1801
1802        self.stop_snapshot_timer();
1803        self.stop_purge_timers();
1804
1805        log::info!("Stopped");
1806    }
1807
1808    /// Stops all registered execution clients without stopping the engine itself.
1809    pub fn stop_clients(&mut self) {
1810        for client in self.get_clients_mut() {
1811            if let Err(e) = client.stop() {
1812                log::error!("{e}");
1813            }
1814        }
1815    }
1816
1817    /// Resets the execution engine and all registered execution clients to initial state.
1818    ///
1819    /// Cancels engine-owned timers (snapshot, purge) but leaves timers owned by
1820    /// other components on the shared clock untouched.
1821    pub fn reset(&mut self) {
1822        for client in self.get_clients_mut() {
1823            if let Err(e) = client.reset() {
1824                log::error!("{e}");
1825            }
1826        }
1827
1828        self.cache.borrow_mut().reset();
1829        self.pos_id_generator.reset();
1830
1831        self.stop_snapshot_timer();
1832        self.stop_purge_timers();
1833
1834        self.command_count.set(0);
1835        self.event_count = 0;
1836        self.report_count = 0;
1837        self.filtered_unclaimed_external_order_count = 0;
1838
1839        log::info!("Reset");
1840    }
1841
1842    /// Disposes of the execution engine, releasing resources from all clients and timers.
1843    ///
1844    /// Cancels engine-owned timers (snapshot, purge) but leaves timers owned by
1845    /// other components on the shared clock untouched.
1846    pub fn dispose(&mut self) {
1847        for client in self.get_clients_mut() {
1848            if let Err(e) = client.dispose() {
1849                log::error!("{e}");
1850            }
1851        }
1852
1853        self.stop_snapshot_timer();
1854        self.stop_purge_timers();
1855
1856        log::info!("Disposed");
1857    }
1858
1859    fn execute_command(&self, command: TradingCommand) {
1860        self.command_count.set(self.command_count.get() + 1);
1861
1862        if self.config.debug {
1863            log::debug!("{RECV}{CMD} {command:?}");
1864        }
1865
1866        if let Some(cid) = command.client_id()
1867            && self.external_clients.contains(&cid)
1868        {
1869            let topic = format!("commands.trading.{cid}");
1870            msgbus::publish_any(topic.into(), &command);
1871
1872            if self.config.debug {
1873                log::debug!("Skipping execution command for external client {cid}: {command:?}");
1874            }
1875            return;
1876        }
1877
1878        let client = if let Some(adapter) = self.find_client_for_command(&command) {
1879            adapter.client.as_ref()
1880        } else {
1881            let routing_context = Self::routing_context_for_command(&command);
1882
1883            log::error!(
1884                "No execution client found for command: client_id={:?}, {routing_context}, command={command:?}",
1885                command.client_id(),
1886            );
1887
1888            let reason = format!(
1889                "No execution client found for client_id={:?}, {routing_context}",
1890                command.client_id(),
1891            );
1892
1893            match command {
1894                TradingCommand::SubmitOrder(cmd) => {
1895                    let order = self
1896                        .cache
1897                        .borrow()
1898                        .order(&cmd.client_order_id)
1899                        .map(|o| o.clone());
1900                    if let Some(order) = order {
1901                        self.deny_order(&order, &reason);
1902                    }
1903                }
1904                TradingCommand::SubmitOrderList(cmd) => {
1905                    let orders: Vec<OrderAny> = self
1906                        .cache
1907                        .borrow()
1908                        .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1909
1910                    for order in &orders {
1911                        self.deny_order(order, &reason);
1912                    }
1913                }
1914                _ => {}
1915            }
1916
1917            return;
1918        };
1919
1920        match command {
1921            TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
1922            TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
1923            TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
1924            TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
1925            TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
1926            TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
1927            TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
1928            TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
1929        }
1930    }
1931
1932    fn routing_context_for_command(command: &TradingCommand) -> String {
1933        match command {
1934            TradingCommand::SubmitOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1935            TradingCommand::SubmitOrderList(cmd) => format!("venue={}", cmd.instrument_id.venue),
1936            TradingCommand::ModifyOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1937            TradingCommand::CancelOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1938            TradingCommand::CancelAllOrders(cmd) => format!("venue={}", cmd.instrument_id.venue),
1939            TradingCommand::BatchCancelOrders(cmd) => format!("venue={}", cmd.instrument_id.venue),
1940            TradingCommand::QueryOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1941            TradingCommand::QueryAccount(cmd) => {
1942                let issuer = cmd.account_id.get_issuer();
1943                format!("account_id={}, issuer={issuer}", cmd.account_id)
1944            }
1945        }
1946    }
1947
1948    fn find_client_for_command(&self, command: &TradingCommand) -> Option<&ExecutionClientAdapter> {
1949        if let Some(client_id) = command.client_id()
1950            && let Some(adapter) = self.clients.get(&client_id)
1951        {
1952            return Some(adapter);
1953        }
1954
1955        if let Some(account_id) = self.account_id_for_command(command) {
1956            let issuer = account_id.get_issuer();
1957            let issuer_client_id = ClientId::from(issuer.as_str());
1958
1959            if let Some(adapter) = self.clients.get(&issuer_client_id) {
1960                return Some(adapter);
1961            }
1962
1963            if let Some(client_id) = self.routing_map.get(&issuer)
1964                && let Some(adapter) = self.clients.get(client_id)
1965            {
1966                return Some(adapter);
1967            }
1968        }
1969
1970        if let Some(instrument_id) = Self::instrument_id_for_command(command)
1971            && let Some(client_id) = self.routing_map.get(&instrument_id.venue)
1972            && let Some(adapter) = self.clients.get(client_id)
1973        {
1974            return Some(adapter);
1975        }
1976
1977        self.default_client.as_ref()
1978    }
1979
1980    fn account_id_for_command(&self, command: &TradingCommand) -> Option<AccountId> {
1981        match command {
1982            TradingCommand::QueryAccount(cmd) => Some(cmd.account_id),
1983            TradingCommand::SubmitOrder(cmd) => self
1984                .cache
1985                .borrow()
1986                .order(&cmd.client_order_id)
1987                .and_then(|order| order.account_id()),
1988            TradingCommand::ModifyOrder(cmd) => self
1989                .cache
1990                .borrow()
1991                .order(&cmd.client_order_id)
1992                .and_then(|order| order.account_id()),
1993            TradingCommand::CancelOrder(cmd) => self
1994                .cache
1995                .borrow()
1996                .order(&cmd.client_order_id)
1997                .and_then(|order| order.account_id()),
1998            TradingCommand::SubmitOrderList(_)
1999            | TradingCommand::CancelAllOrders(_)
2000            | TradingCommand::BatchCancelOrders(_)
2001            | TradingCommand::QueryOrder(_) => None,
2002        }
2003    }
2004
2005    const fn instrument_id_for_command(command: &TradingCommand) -> Option<InstrumentId> {
2006        match command {
2007            TradingCommand::SubmitOrder(cmd) => Some(cmd.instrument_id),
2008            TradingCommand::SubmitOrderList(cmd) => Some(cmd.instrument_id),
2009            TradingCommand::ModifyOrder(cmd) => Some(cmd.instrument_id),
2010            TradingCommand::CancelOrder(cmd) => Some(cmd.instrument_id),
2011            TradingCommand::CancelAllOrders(cmd) => Some(cmd.instrument_id),
2012            TradingCommand::BatchCancelOrders(cmd) => Some(cmd.instrument_id),
2013            TradingCommand::QueryOrder(cmd) => Some(cmd.instrument_id),
2014            TradingCommand::QueryAccount(_) => None,
2015        }
2016    }
2017
2018    fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: SubmitOrder) {
2019        let client_order_id = cmd.client_order_id;
2020        let cached_order = { self.cache.borrow().order_owned(&client_order_id) };
2021
2022        let (order, added_to_cache) = match cached_order {
2023            Some(order) => (order, false),
2024            None => {
2025                let Some(order) =
2026                    self.add_order_from_init(&cmd.order_init, cmd.position_id, cmd.client_id, &cmd)
2027                else {
2028                    return;
2029                };
2030
2031                (order, true)
2032            }
2033        };
2034
2035        if added_to_cache && self.config.snapshot_orders {
2036            self.create_order_state_snapshot(&order);
2037        }
2038
2039        let order_venue = order.instrument_id().venue;
2040        let client_venue = client.venue();
2041        if !client.handles_order_venue(order_venue) {
2042            let client_id = client.client_id();
2043            self.deny_order(
2044                &order,
2045                &format!(
2046                    "Client {client_id} does not handle order venue {order_venue} (client venue {client_venue})"
2047                ),
2048            );
2049            return;
2050        }
2051
2052        if let Some(reason) = self.check_position_id_against_oms(
2053            cmd.instrument_id,
2054            cmd.strategy_id,
2055            cmd.position_id,
2056            client,
2057        ) {
2058            self.deny_order(&order, &reason);
2059            return;
2060        }
2061
2062        let instrument_id = order.instrument_id();
2063
2064        if !added_to_cache && self.config.snapshot_orders {
2065            self.create_order_state_snapshot(&order);
2066        }
2067
2068        {
2069            let cache = self.cache.borrow();
2070            if cache.instrument(&instrument_id).is_none() {
2071                log::error!(
2072                    "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
2073                );
2074                return;
2075            }
2076        }
2077
2078        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
2079            let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
2080            own_book.add(order.to_own_book_order());
2081        }
2082
2083        if let Err(e) = client.submit_order(cmd) {
2084            self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
2085        }
2086    }
2087
2088    fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: SubmitOrderList) {
2089        let mut orders = Vec::with_capacity(cmd.order_list.client_order_ids.len());
2090        let mut added_client_order_ids = AHashSet::new();
2091
2092        for client_order_id in &cmd.order_list.client_order_ids {
2093            let cached_order = { self.cache.borrow().order_owned(client_order_id) };
2094
2095            if let Some(order) = cached_order {
2096                orders.push(order);
2097                continue;
2098            }
2099
2100            let Some(order_init) = cmd
2101                .order_inits
2102                .iter()
2103                .find(|init| init.client_order_id == *client_order_id)
2104            else {
2105                log::error!(
2106                    "Cannot handle submit order list: order not found in cache and no initialization event for {client_order_id}, {cmd}"
2107                );
2108                continue;
2109            };
2110
2111            let Some(order) =
2112                self.add_order_from_init(order_init, cmd.position_id, cmd.client_id, &cmd)
2113            else {
2114                continue;
2115            };
2116
2117            added_client_order_ids.insert(order.client_order_id());
2118            orders.push(order);
2119        }
2120
2121        if self.config.snapshot_orders {
2122            for order in &orders {
2123                if added_client_order_ids.contains(&order.client_order_id()) {
2124                    self.create_order_state_snapshot(order);
2125                }
2126            }
2127        }
2128
2129        if orders.len() != cmd.order_list.client_order_ids.len() {
2130            for order in &orders {
2131                self.deny_order(
2132                    order,
2133                    &format!("Incomplete order list: missing orders in cache for {cmd}"),
2134                );
2135            }
2136            return;
2137        }
2138
2139        let order_list_venue = cmd.instrument_id.venue;
2140        let client_venue = client.venue();
2141        if !client.handles_order_venue(order_list_venue) {
2142            let client_id = client.client_id();
2143
2144            for order in &orders {
2145                self.deny_order(
2146                    order,
2147                    &format!(
2148                        "Client {client_id} does not handle order list venue {order_list_venue} (client venue {client_venue})"
2149                    ),
2150                );
2151            }
2152            return;
2153        }
2154
2155        let is_uniform_instrument = orders
2156            .iter()
2157            .all(|o| o.instrument_id() == cmd.instrument_id);
2158
2159        if let Some(position_id) = cmd.position_id
2160            && !is_uniform_instrument
2161        {
2162            let reason = format!(
2163                "`position_id` {position_id} is not valid for a mixed-instrument order list; \
2164                 a position belongs to a single instrument",
2165            );
2166
2167            for order in &orders {
2168                self.deny_order(order, &reason);
2169            }
2170            return;
2171        }
2172
2173        if let Some(reason) = self.check_position_id_against_oms(
2174            cmd.instrument_id,
2175            cmd.strategy_id,
2176            cmd.position_id,
2177            client,
2178        ) {
2179            for order in &orders {
2180                self.deny_order(order, &reason);
2181            }
2182            return;
2183        }
2184
2185        if self.config.snapshot_orders {
2186            for order in &orders {
2187                if !added_client_order_ids.contains(&order.client_order_id()) {
2188                    self.create_order_state_snapshot(order);
2189                }
2190            }
2191        }
2192
2193        {
2194            let cache = self.cache.borrow();
2195            if cache.instrument(&cmd.instrument_id).is_none() {
2196                log::error!(
2197                    "Cannot handle submit order list: no instrument found for {}, {cmd}",
2198                    cmd.instrument_id,
2199                );
2200                return;
2201            }
2202        }
2203
2204        if self.config.manage_own_order_books {
2205            for order in &orders {
2206                if should_handle_own_book_order(order) {
2207                    let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
2208                    own_book.add(order.to_own_book_order());
2209                }
2210            }
2211        }
2212
2213        if let Err(e) = client.submit_order_list(cmd) {
2214            log::error!("Error submitting order list to client: {e}");
2215            for order in &orders {
2216                self.deny_order(
2217                    order,
2218                    &format!("failed-to-submit-order-list-to-client: {e}"),
2219                );
2220            }
2221        }
2222    }
2223
2224    fn add_order_from_init(
2225        &self,
2226        order_init: &OrderInitialized,
2227        position_id: Option<PositionId>,
2228        client_id: Option<ClientId>,
2229        context: &dyn Display,
2230    ) -> Option<OrderAny> {
2231        let client_order_id = order_init.client_order_id;
2232        let order = match OrderAny::from_events(vec![OrderEventAny::Initialized(
2233            order_init.clone(),
2234        )]) {
2235            Ok(order) => order,
2236            Err(e) => {
2237                log::error!(
2238                    "Cannot reconstruct order from initialization event for {client_order_id}: {e}, {context}"
2239                );
2240                return None;
2241            }
2242        };
2243
2244        if let Err(e) =
2245            self.cache
2246                .borrow_mut()
2247                .add_order(order.clone(), position_id, client_id, true)
2248        {
2249            log::error!(
2250                "Cannot add reconstructed order to cache for {client_order_id}: {e}, {context}"
2251            );
2252            return None;
2253        }
2254
2255        Some(order)
2256    }
2257
2258    fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: ModifyOrder) {
2259        if let Err(e) = client.modify_order(cmd) {
2260            log::error!("Error modifying order: {e}");
2261        }
2262    }
2263
2264    fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: CancelOrder) {
2265        if let Err(e) = client.cancel_order(cmd) {
2266            log::error!("Error canceling order: {e}");
2267        }
2268    }
2269
2270    fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: CancelAllOrders) {
2271        if let Err(e) = client.cancel_all_orders(cmd) {
2272            log::error!("Error canceling all orders: {e}");
2273        }
2274    }
2275
2276    fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: BatchCancelOrders) {
2277        if let Err(e) = client.batch_cancel_orders(cmd) {
2278            log::error!("Error batch canceling orders: {e}");
2279        }
2280    }
2281
2282    fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: QueryAccount) {
2283        if let Err(e) = client.query_account(cmd) {
2284            log::error!("Error querying account: {e}");
2285        }
2286    }
2287
2288    fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: QueryOrder) {
2289        if let Err(e) = client.query_order(cmd) {
2290            log::error!("Error querying order: {e}");
2291        }
2292    }
2293
2294    fn create_order_state_snapshot(&self, order: &OrderAny) {
2295        if self.config.debug {
2296            log::debug!("Creating order state snapshot for {order}");
2297        }
2298
2299        if self.cache.borrow().has_backing()
2300            && let Err(e) = self.cache.borrow().snapshot_order_state(order)
2301        {
2302            log::error!("Failed to snapshot order state: {e}");
2303        }
2304    }
2305
2306    fn create_position_state_snapshot(&self, position: &Position) {
2307        if self.config.debug {
2308            log::debug!("Creating position state snapshot for {position}");
2309        }
2310
2311        // let mut position: Position = position.clone();
2312        // if let Some(pnl) = self.cache.borrow().calculate_unrealized_pnl(&position) {
2313        //     position.unrealized_pnl(last)
2314        // }
2315    }
2316
2317    fn handle_event(&mut self, event: &OrderEventAny) {
2318        self.event_count += 1;
2319
2320        if self.config.debug {
2321            log::debug!("{RECV}{EVT} {event:?}");
2322        }
2323
2324        let event_client_order_id = event.client_order_id();
2325        let cache = self.cache.borrow();
2326        let client_order_id = if cache.order_exists(&event_client_order_id) {
2327            event_client_order_id
2328        } else {
2329            let is_leg_fill =
2330                matches!(event, OrderEventAny::Filled(fill) if self.is_leg_fill(fill));
2331            if !is_leg_fill {
2332                log::warn!(
2333                    "Order with {} not found in the cache to apply {}",
2334                    event.client_order_id(),
2335                    event
2336                );
2337            }
2338
2339            // Try to find order by venue order ID if available
2340            let venue_order_id = if let Some(id) = event.venue_order_id() {
2341                id
2342            } else {
2343                log::error!(
2344                    "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
2345                    event.client_order_id()
2346                );
2347                return;
2348            };
2349
2350            // Look up client order ID from venue order ID
2351            let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
2352                *id
2353            } else {
2354                if let OrderEventAny::Filled(fill) = event
2355                    && is_leg_fill
2356                {
2357                    log::info!(
2358                        "Processing leg fill without corresponding order: {} for instrument {}",
2359                        fill.client_order_id,
2360                        fill.instrument_id
2361                    );
2362                    drop(cache);
2363                    self.handle_leg_fill_without_order(*fill);
2364                    return;
2365                }
2366
2367                log::error!(
2368                    "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
2369                    event.client_order_id(),
2370                );
2371                return;
2372            };
2373
2374            // Get order using found client order ID
2375            if cache.order_exists(&client_order_id) {
2376                log::info!("Order with {client_order_id} was found in the cache");
2377                client_order_id
2378            } else {
2379                if let OrderEventAny::Filled(fill) = event
2380                    && is_leg_fill
2381                {
2382                    log::info!(
2383                        "Processing leg fill without corresponding order: {} for instrument {}",
2384                        fill.client_order_id,
2385                        fill.instrument_id
2386                    );
2387                    drop(cache);
2388                    self.handle_leg_fill_without_order(*fill);
2389                    return;
2390                }
2391
2392                log::error!(
2393                    "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
2394                );
2395                return;
2396            }
2397        };
2398        let order_before_fill = if matches!(event, OrderEventAny::Filled(_)) {
2399            cache.order(&client_order_id).map(|o| o.clone())
2400        } else {
2401            None
2402        };
2403
2404        drop(cache);
2405
2406        let event = if event_client_order_id == client_order_id {
2407            event.clone()
2408        } else {
2409            event.clone().with_client_order_id(client_order_id)
2410        };
2411
2412        match &event {
2413            OrderEventAny::Filled(fill) => {
2414                let Some(order_before_fill) = order_before_fill else {
2415                    log::error!(
2416                        "Cannot apply fill: order {} not found in the cache",
2417                        fill.client_order_id()
2418                    );
2419                    return;
2420                };
2421                let oms_type = self.determine_oms_type(fill);
2422                let position_id =
2423                    self.determine_position_id(*fill, oms_type, Some(&order_before_fill));
2424
2425                let mut fill = *fill;
2426                fill.position_id = Some(position_id);
2427
2428                if self
2429                    .validate_fill_for_order(&order_before_fill, &fill)
2430                    .is_ok()
2431                {
2432                    let event = OrderEventAny::Filled(fill);
2433                    let Some(order) = self.update_cached_order(client_order_id, &event) else {
2434                        return;
2435                    };
2436
2437                    let position_events = self.handle_order_fill(&order, fill, oms_type);
2438                    self.publish_order_event(&event);
2439                    self.publish_position_events(position_events);
2440                }
2441            }
2442            _ => {
2443                if self.update_cached_order(client_order_id, &event).is_some() {
2444                    self.publish_order_event(&event);
2445                }
2446            }
2447        }
2448    }
2449
2450    fn handle_leg_fill_without_order(&mut self, mut fill: OrderFilled) {
2451        let instrument =
2452            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2453                instrument.clone()
2454            } else {
2455                log::error!(
2456                    "Cannot handle leg fill: no instrument found for {}, {fill}",
2457                    fill.instrument_id,
2458                );
2459                return;
2460            };
2461
2462        if self.cache.borrow().account(&fill.account_id).is_none() {
2463            log::error!(
2464                "Cannot handle leg fill: no account found for {}, {fill}",
2465                fill.instrument_id.venue,
2466            );
2467            return;
2468        }
2469
2470        let oms_type = self.determine_oms_type(&fill);
2471        let position_id = self.determine_leg_fill_position_id(fill, oms_type);
2472        fill.position_id = Some(position_id);
2473        let duplicate_position_fill = self.position_contains_trade_id(position_id, fill.trade_id);
2474
2475        let event = OrderEventAny::Filled(fill);
2476        let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2477        msgbus::send_order_event(portfolio_endpoint, event.clone());
2478
2479        let position_events = if duplicate_position_fill {
2480            log::warn!(
2481                "Duplicate leg fill: {} trade_id={} already applied to position {}, skipping position update",
2482                fill.client_order_id,
2483                fill.trade_id,
2484                position_id
2485            );
2486            Vec::new()
2487        } else {
2488            self.handle_position_update(&instrument, fill, oms_type)
2489        };
2490        self.publish_order_event(&event);
2491        self.publish_position_events(position_events);
2492    }
2493
2494    fn determine_leg_fill_position_id(
2495        &mut self,
2496        fill: OrderFilled,
2497        oms_type: OmsType,
2498    ) -> PositionId {
2499        let cache = self.cache.borrow();
2500        let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
2501        drop(cache);
2502
2503        if let Some(position_id) = cached_position_id {
2504            if let Some(fill_position_id) = fill.position_id
2505                && fill_position_id != position_id
2506            {
2507                log::warn!(
2508                    "Incorrect position ID assigned to leg fill: \
2509                     cached={position_id}, assigned={fill_position_id}; \
2510                     re-assigning from cache",
2511                );
2512            }
2513
2514            return position_id;
2515        }
2516
2517        match oms_type {
2518            OmsType::Hedging => fill
2519                .position_id
2520                .unwrap_or_else(|| self.pos_id_generator.generate(fill.strategy_id, false)),
2521            OmsType::Netting => self.determine_netting_position_id(fill),
2522            _ => self.determine_netting_position_id(fill),
2523        }
2524    }
2525
2526    fn is_leg_fill(&self, fill: &OrderFilled) -> bool {
2527        if !fill.client_order_id.as_str().contains("-LEG-")
2528            && !fill.venue_order_id.as_str().contains("-LEG-")
2529        {
2530            return false;
2531        }
2532
2533        self.cache
2534            .borrow()
2535            .instrument(&fill.instrument_id)
2536            .is_some_and(|instrument| !instrument.is_spread())
2537    }
2538
2539    fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
2540        if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id)
2541            && *oms_type != OmsType::Unspecified
2542        {
2543            return *oms_type;
2544        }
2545
2546        if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
2547            && let Some(client) = self.clients.get(client_id)
2548        {
2549            return client.oms_type;
2550        }
2551
2552        if let Some(client) = &self.default_client {
2553            return client.oms_type;
2554        }
2555
2556        OmsType::Netting // Default fallback
2557    }
2558
2559    fn resolve_oms_type_for_client(
2560        &self,
2561        strategy_id: StrategyId,
2562        client: &dyn ExecutionClient,
2563    ) -> OmsType {
2564        if let Some(oms_type) = self.oms_overrides.get(&strategy_id)
2565            && *oms_type != OmsType::Unspecified
2566        {
2567            return *oms_type;
2568        }
2569
2570        client.oms_type()
2571    }
2572
2573    fn check_position_id_against_oms(
2574        &self,
2575        instrument_id: InstrumentId,
2576        strategy_id: StrategyId,
2577        position_id: Option<PositionId>,
2578        client: &dyn ExecutionClient,
2579    ) -> Option<String> {
2580        let position_id = position_id?;
2581
2582        if self.resolve_oms_type_for_client(strategy_id, client) != OmsType::Netting {
2583            return None;
2584        }
2585
2586        let expected = format!("{instrument_id}-{strategy_id}");
2587        if position_id.as_str() == expected {
2588            return None;
2589        }
2590
2591        Some(format!(
2592            "`position_id` {position_id} is not valid for NETTING OMS; \
2593             expected '{expected}' (use HEDGING for custom position IDs)"
2594        ))
2595    }
2596
2597    fn determine_position_id(
2598        &mut self,
2599        fill: OrderFilled,
2600        oms_type: OmsType,
2601        order: Option<&OrderAny>,
2602    ) -> PositionId {
2603        let cache = self.cache.borrow();
2604        let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
2605        drop(cache);
2606
2607        if self.config.debug {
2608            log::debug!(
2609                "Determining position ID for {}, position_id={:?}",
2610                fill.client_order_id(),
2611                cached_position_id,
2612            );
2613        }
2614
2615        if let Some(position_id) = cached_position_id {
2616            if let Some(fill_position_id) = fill.position_id
2617                && fill_position_id != position_id
2618            {
2619                log::warn!(
2620                    "Incorrect position ID assigned to fill: \
2621                     cached={position_id}, assigned={fill_position_id}; \
2622                     re-assigning from cache",
2623                );
2624            }
2625
2626            if self.config.debug {
2627                log::debug!("Assigned {position_id} to {}", fill.client_order_id());
2628            }
2629
2630            return position_id;
2631        }
2632
2633        let position_id = match oms_type {
2634            OmsType::Hedging => self.determine_hedging_position_id(fill, order),
2635            OmsType::Netting => self.determine_netting_position_id(fill),
2636            _ => self.determine_netting_position_id(fill),
2637        };
2638
2639        let order = if let Some(o) = order {
2640            o.clone()
2641        } else {
2642            let cache = self.cache.borrow();
2643            cache.order(&fill.client_order_id()).map_or_else(
2644                || {
2645                    panic!(
2646                        "Order for {} not found to determine position ID",
2647                        fill.client_order_id()
2648                    )
2649                },
2650                |o| o.clone(),
2651            )
2652        };
2653
2654        if order.exec_algorithm_id().is_some()
2655            && let Some(exec_spawn_id) = order.exec_spawn_id()
2656        {
2657            let cache = self.cache.borrow();
2658            let primary = if let Some(p) = cache.order(&exec_spawn_id) {
2659                p.clone()
2660            } else {
2661                log::warn!(
2662                    "Primary exec spawn order {exec_spawn_id} not found, \
2663                     skipping position ID propagation"
2664                );
2665                return position_id;
2666            };
2667            let primary_already_indexed = cache.position_id(&primary.client_order_id()).is_some();
2668            drop(cache);
2669
2670            if primary.position_id().is_none() && !primary_already_indexed {
2671                if let Some(mut primary_mut) = self.cache.borrow_mut().order_mut(&exec_spawn_id) {
2672                    primary_mut.set_position_id(Some(position_id));
2673                }
2674                let _ = self.cache.borrow_mut().add_position_id(
2675                    &position_id,
2676                    &primary.instrument_id().venue,
2677                    &primary.client_order_id(),
2678                    &primary.strategy_id(),
2679                );
2680                log::debug!("Assigned primary order {position_id}");
2681            }
2682        }
2683
2684        position_id
2685    }
2686
2687    fn determine_hedging_position_id(
2688        &mut self,
2689        fill: OrderFilled,
2690        order: Option<&OrderAny>,
2691    ) -> PositionId {
2692        // Check if position ID already exists
2693        if let Some(position_id) = fill.position_id {
2694            if self.config.debug {
2695                log::debug!("Already had a position ID of: {position_id}");
2696            }
2697            return position_id;
2698        }
2699
2700        let cache = self.cache.borrow();
2701
2702        let exec_spawn_id = if let Some(o) = order {
2703            o.exec_spawn_id()
2704        } else {
2705            match cache.order(&fill.client_order_id()) {
2706                Some(o) => o.exec_spawn_id(),
2707                None => {
2708                    panic!(
2709                        "Order for {} not found to determine position ID",
2710                        fill.client_order_id()
2711                    );
2712                }
2713            }
2714        };
2715
2716        // Check execution spawn orders
2717        if let Some(spawn_id) = exec_spawn_id {
2718            let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
2719            for spawned_order in spawn_orders {
2720                if let Some(pos_id) = spawned_order.position_id() {
2721                    if self.config.debug {
2722                        log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
2723                    }
2724                    return pos_id;
2725                }
2726            }
2727        }
2728
2729        // Generate new position ID
2730        let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
2731
2732        if self.config.debug {
2733            log::debug!("Generated {} for {}", position_id, fill.client_order_id());
2734        }
2735        position_id
2736    }
2737
2738    fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
2739        PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
2740    }
2741
2742    fn validate_fill_for_order(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2743        if order.is_duplicate_fill(fill) {
2744            log::warn!(
2745                "Duplicate fill: {} trade_id={} already applied, skipping",
2746                order.client_order_id(),
2747                fill.trade_id
2748            );
2749            anyhow::bail!("Duplicate fill");
2750        }
2751
2752        if let Some(position_id) = fill.position_id
2753            && self.position_contains_trade_id(position_id, fill.trade_id)
2754        {
2755            log::warn!(
2756                "Duplicate fill: {} trade_id={} already applied to position {}, skipping",
2757                order.client_order_id(),
2758                fill.trade_id,
2759                position_id
2760            );
2761            anyhow::bail!("Duplicate position fill");
2762        }
2763
2764        self.check_overfill(order, fill)
2765    }
2766
2767    fn position_contains_trade_id(&self, position_id: PositionId, trade_id: TradeId) -> bool {
2768        self.cache
2769            .borrow()
2770            .position(&position_id)
2771            .is_some_and(|position| position.trade_ids.contains(&trade_id))
2772    }
2773
2774    fn update_cached_order(
2775        &self,
2776        client_order_id: ClientOrderId,
2777        event: &OrderEventAny,
2778    ) -> Option<OrderAny> {
2779        let result = { self.cache.borrow_mut().update_order(event) };
2780
2781        let order = match result {
2782            Ok(order) => order,
2783            Err(e) => {
2784                if matches!(
2785                    e.downcast_ref::<OrderError>(),
2786                    Some(OrderError::InvalidStateTransition)
2787                ) {
2788                    log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
2789                    return None;
2790                }
2791
2792                if let Some(OrderError::DuplicateFill(trade_id)) = e.downcast_ref::<OrderError>() {
2793                    log::warn!(
2794                        "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
2795                    );
2796                    return None;
2797                }
2798
2799                log::error!("Error applying event: {e}, did not apply {event}");
2800
2801                if matches!(
2802                    event,
2803                    OrderEventAny::Denied(_)
2804                        | OrderEventAny::Rejected(_)
2805                        | OrderEventAny::Canceled(_)
2806                        | OrderEventAny::Expired(_)
2807                ) {
2808                    log::warn!(
2809                        "Terminal event {event} failed to apply to {client_order_id}, forcing cleanup from own book"
2810                    );
2811                    self.cache
2812                        .borrow_mut()
2813                        .force_remove_from_own_order_book(&client_order_id);
2814                } else {
2815                    let order = self
2816                        .cache
2817                        .borrow()
2818                        .order(&client_order_id)
2819                        .map(|o| o.clone());
2820                    if let Some(order) = order {
2821                        let should_update_own_book = {
2822                            let cache = self.cache.borrow();
2823                            let own_book = cache.own_order_book(&order.instrument_id());
2824                            (own_book.is_some() && order.is_closed())
2825                                || should_handle_own_book_order(&order)
2826                        };
2827
2828                        if should_update_own_book {
2829                            self.cache.borrow_mut().update_own_order_book(&order);
2830                        }
2831                    }
2832                }
2833                return None;
2834            }
2835        };
2836
2837        if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
2838            let needs_own_book = {
2839                self.cache
2840                    .borrow()
2841                    .own_order_book(&order.instrument_id())
2842                    .is_none()
2843            };
2844
2845            if needs_own_book {
2846                self.cache.borrow_mut().update_own_order_book(&order);
2847            }
2848        }
2849
2850        if self.config.debug {
2851            log::debug!("{SEND}{EVT} {event}");
2852        }
2853
2854        if self.config.snapshot_orders {
2855            self.create_order_state_snapshot(&order);
2856        }
2857
2858        self.send_order_update_to_portfolio(event);
2859
2860        Some(order)
2861    }
2862
2863    fn send_order_update_to_portfolio(&self, event: &OrderEventAny) {
2864        let send_to_portfolio = match event {
2865            OrderEventAny::Filled(fill) => self
2866                .cache
2867                .borrow()
2868                .account(&fill.account_id)
2869                .is_none_or(|account| !account.is_margin_account()),
2870            OrderEventAny::Accepted(_)
2871            | OrderEventAny::Canceled(_)
2872            | OrderEventAny::Expired(_)
2873            | OrderEventAny::Rejected(_)
2874            | OrderEventAny::Updated(_) => true,
2875            _ => false,
2876        };
2877
2878        if send_to_portfolio {
2879            let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2880            msgbus::send_order_event(portfolio_endpoint, event.clone());
2881        }
2882    }
2883
2884    fn publish_order_event(&self, event: &OrderEventAny) {
2885        let topic = switchboard::get_event_orders_topic(event.strategy_id());
2886        msgbus::publish_order_event(topic, event);
2887
2888        if let OrderEventAny::Canceled(_) = event {
2889            let cancels_topic = switchboard::get_order_cancels_topic(event.instrument_id());
2890            msgbus::publish_order_event(cancels_topic, event);
2891        }
2892    }
2893
2894    fn publish_position_events(&self, events: Vec<PositionEvent>) {
2895        for event in events {
2896            let strategy_id = match &event {
2897                PositionEvent::PositionOpened(event) => event.strategy_id,
2898                PositionEvent::PositionChanged(event) => event.strategy_id,
2899                PositionEvent::PositionClosed(event) => event.strategy_id,
2900                PositionEvent::PositionAdjusted(event) => event.strategy_id,
2901            };
2902            let topic = switchboard::get_event_positions_topic(strategy_id);
2903            msgbus::publish_position_event(topic, &event);
2904        }
2905    }
2906
2907    fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2908        let potential_overfill = order.calculate_overfill(fill.last_qty);
2909
2910        if potential_overfill.is_positive() {
2911            if self.config.allow_overfills {
2912                log::warn!(
2913                    "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
2914                    order.client_order_id(),
2915                    potential_overfill,
2916                    order.filled_qty(),
2917                    fill.last_qty,
2918                    order.quantity()
2919                );
2920            } else {
2921                let msg = format!(
2922                    "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
2923                Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
2924                    order.client_order_id(),
2925                    potential_overfill,
2926                    order.filled_qty(),
2927                    fill.last_qty,
2928                    order.quantity()
2929                );
2930                anyhow::bail!("{msg}");
2931            }
2932        }
2933
2934        Ok(())
2935    }
2936
2937    fn handle_order_fill(
2938        &mut self,
2939        order: &OrderAny,
2940        fill: OrderFilled,
2941        oms_type: OmsType,
2942    ) -> Vec<PositionEvent> {
2943        let instrument =
2944            if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2945                instrument.clone()
2946            } else {
2947                log::error!(
2948                    "Cannot handle order fill: no instrument found for {}, {fill}",
2949                    fill.instrument_id,
2950                );
2951                return Vec::new();
2952            };
2953
2954        let is_margin_account = {
2955            let cache = self.cache.borrow();
2956            let Some(account) = cache.account(&fill.account_id) else {
2957                log::error!(
2958                    "Cannot handle order fill: no account found for {}, {fill}",
2959                    fill.instrument_id.venue,
2960                );
2961                return Vec::new();
2962            };
2963
2964            account.is_margin_account()
2965        };
2966
2967        // Skip portfolio position updates for combo fills (spread instruments)
2968        // Combo fills are only used for order management, not portfolio updates
2969        if !instrument.is_spread() && is_margin_account {
2970            let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2971            msgbus::send_order_event(portfolio_endpoint, OrderEventAny::Filled(fill));
2972        }
2973
2974        let (position, position_events) = if instrument.is_spread() {
2975            (None, Vec::new())
2976        } else {
2977            let position_events = self.handle_position_update(&instrument, fill, oms_type);
2978            let position_id = fill.position_id.unwrap();
2979            (
2980                self.cache.borrow().position_owned(&position_id),
2981                position_events,
2982            )
2983        };
2984
2985        // Handle contingent orders for both spread and non-spread instruments
2986        // For spread instruments, contingent orders work without position linkage
2987        if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
2988            // For non-spread instruments, link to position if available
2989            if !instrument.is_spread()
2990                && let Some(ref pos) = position
2991                && pos.is_open()
2992            {
2993                let position_id = pos.id;
2994
2995                for client_order_id in order.linked_order_ids().unwrap_or_default() {
2996                    // Take a scoped write borrow on the contingent's cell. The borrow drops at
2997                    // the end of `and_then` so the subsequent `add_position_id` on the cache is
2998                    // free to take `&mut Cache`.
2999                    let link = self.cache.borrow_mut().order_mut(client_order_id).and_then(
3000                        |mut contingent_order| {
3001                            if contingent_order.position_id().is_none() {
3002                                contingent_order.set_position_id(Some(position_id));
3003                                Some((
3004                                    contingent_order.instrument_id().venue,
3005                                    contingent_order.client_order_id(),
3006                                    contingent_order.strategy_id(),
3007                                ))
3008                            } else {
3009                                None
3010                            }
3011                        },
3012                    );
3013
3014                    if let Some((venue, contingent_id, strategy_id)) = link
3015                        && let Err(e) = self.cache.borrow_mut().add_position_id(
3016                            &position_id,
3017                            &venue,
3018                            &contingent_id,
3019                            &strategy_id,
3020                        )
3021                    {
3022                        log::error!("Failed to add position ID: {e}");
3023                    }
3024                }
3025            }
3026            // For spread instruments, contingent orders can still be triggered
3027            // but without position linkage (since no position is created for spreads)
3028        }
3029
3030        let event = OrderEventAny::Filled(fill);
3031        let fills_topic = switchboard::get_order_fills_topic(fill.instrument_id);
3032        msgbus::publish_order_event(fills_topic, &event);
3033
3034        position_events
3035    }
3036
3037    /// Handle position creation or update for a fill.
3038    ///
3039    /// This function mirrors the Python `_handle_position_update` method.
3040    fn handle_position_update(
3041        &mut self,
3042        instrument: &InstrumentAny,
3043        fill: OrderFilled,
3044        oms_type: OmsType,
3045    ) -> Vec<PositionEvent> {
3046        let position_id = if let Some(position_id) = fill.position_id {
3047            position_id
3048        } else {
3049            log::error!("Cannot handle position update: no position ID found for fill {fill}");
3050            return Vec::new();
3051        };
3052
3053        let position_opt = self.cache.borrow().position_owned(&position_id);
3054
3055        match position_opt {
3056            None => {
3057                if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
3058                    return Vec::new();
3059                }
3060
3061                self.open_position(instrument, None, fill, oms_type)
3062                    .unwrap_or_default()
3063            }
3064            Some(pos) if pos.is_closed() => {
3065                if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
3066                    return Vec::new();
3067                }
3068
3069                self.open_position(instrument, Some(&pos), fill, oms_type)
3070                    .unwrap_or_default()
3071            }
3072            Some(mut pos) => {
3073                if self.will_flip_position(&pos, fill) {
3074                    self.flip_position(instrument, &mut pos, fill, oms_type)
3075                } else {
3076                    self.update_position(&mut pos, fill).into_iter().collect()
3077                }
3078            }
3079        }
3080    }
3081
3082    fn reject_reduce_only_netting_position_open(
3083        &self,
3084        fill: &OrderFilled,
3085        oms_type: OmsType,
3086    ) -> bool {
3087        if oms_type != OmsType::Netting {
3088            return false;
3089        }
3090
3091        let cache = self.cache.borrow();
3092        let Some(order) = cache.order_owned(&fill.client_order_id) else {
3093            return false;
3094        };
3095
3096        if !order.is_reduce_only() {
3097            return false;
3098        }
3099
3100        let positions_open = cache.positions_open(
3101            None,
3102            Some(&fill.instrument_id),
3103            None,
3104            Some(&fill.account_id),
3105            None,
3106        );
3107        let position_id = fill
3108            .position_id
3109            .map_or_else(|| "None".to_string(), |position_id| position_id.to_string());
3110        let matching_position_details = Self::position_details(
3111            positions_open
3112                .iter()
3113                .filter(|position| position.is_opposite_side(fill.order_side))
3114                .map(|position| &**position),
3115        );
3116        let open_position_details =
3117            Self::position_details(positions_open.iter().map(|position| &**position));
3118
3119        log::error!(
3120            "Cannot open NETTING position {position_id} from reduce-only fill {} for {}; \
3121             matching_reduce_positions=[{}], open_positions=[{}]",
3122            fill.trade_id,
3123            fill.instrument_id,
3124            matching_position_details,
3125            open_position_details
3126        );
3127
3128        true
3129    }
3130
3131    fn open_position(
3132        &self,
3133        instrument: &InstrumentAny,
3134        position: Option<&Position>,
3135        fill: OrderFilled,
3136        oms_type: OmsType,
3137    ) -> anyhow::Result<Vec<PositionEvent>> {
3138        if let Some(position) = position {
3139            if Self::is_duplicate_closed_fill(position, &fill) {
3140                log::warn!(
3141                    "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
3142                    fill.trade_id,
3143                    position.id,
3144                    fill.order_side,
3145                    fill.last_qty,
3146                    fill.last_px
3147                );
3148                return Ok(Vec::new());
3149            }
3150            self.reopen_position(position, oms_type)?;
3151        }
3152
3153        let position = Position::new(instrument, fill);
3154        self.cache.borrow_mut().add_position(&position, oms_type)?;
3155
3156        if self.config.snapshot_positions {
3157            self.create_position_state_snapshot(&position);
3158        }
3159
3160        let ts_init = self.clock.borrow().timestamp_ns();
3161        let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
3162
3163        Ok(vec![PositionEvent::PositionOpened(event)])
3164    }
3165
3166    fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
3167        position.events.iter().any(|event| {
3168            event.trade_id == fill.trade_id
3169                && event.order_side == fill.order_side
3170                && event.last_px == fill.last_px
3171                && event.last_qty == fill.last_qty
3172        })
3173    }
3174
3175    fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
3176        if oms_type == OmsType::Netting {
3177            if position.is_open() {
3178                anyhow::bail!(
3179                    "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
3180                    position.id
3181                );
3182            }
3183            // Snapshot closed position if reopening (NETTING mode)
3184            let snapshot_ref = self.cache.borrow_mut().snapshot_position(position)?;
3185            self.anchor_snapshot(snapshot_ref);
3186        } else {
3187            // HEDGING mode
3188            log::warn!(
3189                "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
3190                position.id
3191            );
3192        }
3193        Ok(())
3194    }
3195
3196    fn anchor_snapshot(&self, snapshot_ref: CacheSnapshotRef) {
3197        let Some(anchorer) = &self.snapshot_anchorer else {
3198            return;
3199        };
3200
3201        if let Err(e) = anchorer(snapshot_ref) {
3202            log::error!("Failed to record cache snapshot anchor: {e}");
3203        }
3204    }
3205
3206    fn update_position(&self, position: &mut Position, fill: OrderFilled) -> Option<PositionEvent> {
3207        // Apply the fill to the position
3208        position.apply(&fill);
3209
3210        // Check if position is closed after applying the fill
3211        let is_closed = position.is_closed();
3212
3213        // Update position in cache - this should handle the closed state tracking
3214        if let Err(e) = self.cache.borrow_mut().update_position(position) {
3215            log::error!("Failed to update position: {e:?}");
3216            return None;
3217        }
3218
3219        // Verify cache state after update
3220        let cache = self.cache.borrow();
3221
3222        drop(cache);
3223
3224        // Create position state snapshot if enabled
3225        if self.config.snapshot_positions {
3226            self.create_position_state_snapshot(position);
3227        }
3228
3229        let ts_init = self.clock.borrow().timestamp_ns();
3230
3231        if is_closed {
3232            let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
3233            Some(PositionEvent::PositionClosed(event))
3234        } else {
3235            let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
3236            Some(PositionEvent::PositionChanged(event))
3237        }
3238    }
3239
3240    fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
3241        position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
3242    }
3243
3244    fn position_signed_decimal_qty(position: &Position) -> Decimal {
3245        match position.side {
3246            PositionSide::Long => position.quantity.as_decimal(),
3247            PositionSide::Short => -position.quantity.as_decimal(),
3248            _ => Decimal::ZERO,
3249        }
3250    }
3251
3252    fn position_details<'a>(positions: impl IntoIterator<Item = &'a Position>) -> String {
3253        positions
3254            .into_iter()
3255            .map(|position| {
3256                format!(
3257                    "{} strategy_id={} signed_qty={}",
3258                    position.id,
3259                    position.strategy_id,
3260                    Self::position_signed_decimal_qty(position)
3261                )
3262            })
3263            .collect::<Vec<_>>()
3264            .join(", ")
3265    }
3266
3267    fn flip_position(
3268        &mut self,
3269        instrument: &InstrumentAny,
3270        position: &mut Position,
3271        fill: OrderFilled,
3272        oms_type: OmsType,
3273    ) -> Vec<PositionEvent> {
3274        let mut position_events = Vec::new();
3275        let difference = match position.side {
3276            PositionSide::Long => Quantity::from_raw(
3277                fill.last_qty.raw - position.quantity.raw,
3278                position.size_precision,
3279            ),
3280            PositionSide::Short => Quantity::from_raw(
3281                position.quantity.raw.abs_diff(fill.last_qty.raw), // Equivalent to Python's abs(position.quantity - fill.last_qty)
3282                position.size_precision,
3283            ),
3284            _ => fill.last_qty,
3285        };
3286
3287        // Split commission between two positions
3288        let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
3289        let (commission1, commission2) = if let Some(commission) = fill.commission {
3290            let commission_currency = commission.currency;
3291            let commission1 = Money::new(commission * fill_percent, commission_currency);
3292            let commission2 = commission - commission1;
3293            (Some(commission1), Some(commission2))
3294        } else {
3295            log::error!("Commission is not available");
3296            (None, None)
3297        };
3298
3299        let mut fill_split1: Option<OrderFilled> = None;
3300
3301        if position.is_open() {
3302            fill_split1 = Some(OrderFilled::new(
3303                fill.trader_id,
3304                fill.strategy_id,
3305                fill.instrument_id,
3306                fill.client_order_id,
3307                fill.venue_order_id,
3308                fill.account_id,
3309                fill.trade_id,
3310                fill.order_side,
3311                fill.order_type,
3312                position.quantity,
3313                fill.last_px,
3314                fill.currency,
3315                fill.liquidity_side,
3316                fill.event_id,
3317                fill.ts_event,
3318                fill.ts_init,
3319                fill.reconciliation,
3320                fill.position_id,
3321                commission1,
3322            ));
3323
3324            if let Some(position_event) = self.update_position(position, fill_split1.unwrap()) {
3325                position_events.push(position_event);
3326            }
3327
3328            // Snapshot closed position before reusing ID (NETTING mode)
3329            if oms_type == OmsType::Netting {
3330                match self.cache.borrow_mut().snapshot_position(position) {
3331                    Ok(snapshot_ref) => self.anchor_snapshot(snapshot_ref),
3332                    Err(e) => log::error!("Failed to snapshot position during flip: {e:?}"),
3333                }
3334            }
3335        }
3336
3337        // Guard against flipping a position with a zero fill size
3338        if difference.raw == 0 {
3339            log::warn!(
3340                "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
3341            );
3342            return position_events;
3343        }
3344
3345        let position_id_flip = if oms_type == OmsType::Hedging
3346            && let Some(position_id) = fill.position_id
3347            && position_id.is_virtual()
3348        {
3349            // Generate new position ID for flipped virtual position (Hedging OMS only)
3350            Some(self.pos_id_generator.generate(fill.strategy_id, true))
3351        } else {
3352            // Default: use the same position ID as the fill (Python behavior)
3353            fill.position_id
3354        };
3355
3356        let fill_split2 = OrderFilled::new(
3357            fill.trader_id,
3358            fill.strategy_id,
3359            fill.instrument_id,
3360            fill.client_order_id,
3361            fill.venue_order_id,
3362            fill.account_id,
3363            fill.trade_id,
3364            fill.order_side,
3365            fill.order_type,
3366            difference,
3367            fill.last_px,
3368            fill.currency,
3369            fill.liquidity_side,
3370            UUID4::new(),
3371            fill.ts_event,
3372            fill.ts_init,
3373            fill.reconciliation,
3374            position_id_flip,
3375            commission2,
3376        );
3377
3378        if oms_type == OmsType::Hedging
3379            && let Some(position_id) = fill.position_id
3380            && position_id.is_virtual()
3381        {
3382            log::warn!("Closing position {fill_split1:?}");
3383            log::warn!("Flipping position {fill_split2:?}");
3384        }
3385
3386        // Open flipped position
3387        match self.open_position(instrument, None, fill_split2, oms_type) {
3388            Ok(opened_events) => position_events.extend(opened_events),
3389            Err(e) => log::error!("Failed to open flipped position: {e:?}"),
3390        }
3391
3392        position_events
3393    }
3394
3395    /// Sets the internal position ID generator counts based on existing cached positions.
3396    pub fn set_position_id_counts(&mut self) {
3397        let cache = self.cache.borrow();
3398        let positions = cache.positions(None, None, None, None, None);
3399
3400        // Count positions per instrument_id using a HashMap
3401        let mut counts: HashMap<StrategyId, usize> = HashMap::new();
3402
3403        for position in positions {
3404            *counts.entry(position.strategy_id).or_insert(0) += 1;
3405        }
3406
3407        self.pos_id_generator.reset();
3408
3409        for (strategy_id, count) in counts {
3410            self.pos_id_generator.set_count(count, strategy_id);
3411            log::info!("Set PositionId count for {strategy_id} to {count}");
3412        }
3413    }
3414
3415    fn deny_order(&self, order: &OrderAny, reason: &str) {
3416        let denied = OrderDenied::new(
3417            order.trader_id(),
3418            order.strategy_id(),
3419            order.instrument_id(),
3420            order.client_order_id(),
3421            reason.into(),
3422            UUID4::new(),
3423            self.clock.borrow().timestamp_ns(),
3424            self.clock.borrow().timestamp_ns(),
3425        );
3426
3427        let event = OrderEventAny::Denied(denied);
3428        let order = match self.cache.borrow_mut().update_order(&event) {
3429            Ok(order) => order,
3430            Err(e) => {
3431                log::error!("Failed to apply denied event to order: {e}");
3432                return;
3433            }
3434        };
3435
3436        let topic = switchboard::get_event_orders_topic(order.strategy_id());
3437        msgbus::publish_order_event(topic, &event);
3438
3439        if self.config.snapshot_orders {
3440            self.create_order_state_snapshot(&order);
3441        }
3442    }
3443
3444    fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
3445        let mut cache = self.cache.borrow_mut();
3446        if cache.own_order_book_mut(instrument_id).is_none() {
3447            let own_book = OwnOrderBook::new(*instrument_id);
3448            cache.add_own_order_book(own_book).unwrap();
3449        }
3450
3451        RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
3452    }
3453}
3454
3455#[cfg(test)]
3456mod tests {
3457    use nautilus_model::{
3458        enums::{LiquiditySide, OrderSide, PositionSideSpecified},
3459        events::order::spec::OrderFilledSpec,
3460        identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
3461        instruments::{InstrumentAny, stubs::audusd_sim},
3462        types::Price,
3463    };
3464    use rstest::*;
3465
3466    use super::*;
3467
3468    #[rstest]
3469    fn netting_positions_open_for_report_scopes_positions_by_account() {
3470        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3471        let account1_id = AccountId::from("SIM-001");
3472        let account2_id = AccountId::from("SIM-002");
3473        let position1 = position_for_account(
3474            &instrument,
3475            account1_id,
3476            StrategyId::from("S-001"),
3477            PositionId::from("P-ACC-1"),
3478            OrderSide::Buy,
3479            Quantity::from(1_000),
3480        );
3481        let position2 = position_for_account(
3482            &instrument,
3483            account2_id,
3484            StrategyId::from("S-002"),
3485            PositionId::from("P-ACC-2"),
3486            OrderSide::Buy,
3487            Quantity::from(2_000),
3488        );
3489        let mut cache = Cache::default();
3490        cache.add_position(&position1, OmsType::Netting).unwrap();
3491        cache.add_position(&position2, OmsType::Netting).unwrap();
3492
3493        let report = PositionStatusReport::new(
3494            account1_id,
3495            instrument.id(),
3496            PositionSideSpecified::Long,
3497            Quantity::from(1_000),
3498            UnixNanos::from(1_000_000),
3499            UnixNanos::from(1_000_000),
3500            None,
3501            None,
3502            None,
3503        );
3504
3505        let positions_open = ExecutionEngine::netting_positions_open_for_report(&cache, &report);
3506        let signed_qty: Decimal = positions_open
3507            .iter()
3508            .map(|position| ExecutionEngine::position_signed_decimal_qty(position))
3509            .sum();
3510
3511        assert_eq!(positions_open.len(), 1);
3512        assert_eq!(positions_open[0].id, position1.id);
3513        assert_eq!(signed_qty, Decimal::from(1_000));
3514    }
3515
3516    #[rstest]
3517    fn netting_split_position_ownership_message_reports_only_split_ownership() {
3518        let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3519        let account_id = AccountId::from("SIM-001");
3520        let external_position = position_for_account(
3521            &instrument,
3522            account_id,
3523            StrategyId::from("EXTERNAL"),
3524            PositionId::from("P-EXTERNAL"),
3525            OrderSide::Buy,
3526            Quantity::from(1_000),
3527        );
3528        let strategy_position = position_for_account(
3529            &instrument,
3530            account_id,
3531            StrategyId::from("S-001"),
3532            PositionId::from("P-STRATEGY"),
3533            OrderSide::Buy,
3534            Quantity::from(500),
3535        );
3536        let same_strategy_position = position_for_account(
3537            &instrument,
3538            account_id,
3539            StrategyId::from("EXTERNAL"),
3540            PositionId::from("P-EXTERNAL-2"),
3541            OrderSide::Buy,
3542            Quantity::from(250),
3543        );
3544        let report = PositionStatusReport::new(
3545            account_id,
3546            instrument.id(),
3547            PositionSideSpecified::Long,
3548            Quantity::from(1_500),
3549            UnixNanos::from(1_000_000),
3550            UnixNanos::from(1_000_000),
3551            None,
3552            None,
3553            None,
3554        );
3555
3556        let message = ExecutionEngine::netting_split_position_ownership_message(
3557            &report,
3558            &[&external_position, &strategy_position],
3559        )
3560        .expect("split ownership should produce a warning message");
3561
3562        assert!(message.contains("account_id=SIM-001"));
3563        assert!(message.contains(&format!("instrument_id={}", instrument.id())));
3564        assert!(message.contains("EXTERNAL"));
3565        assert!(message.contains("S-001"));
3566        assert!(message.contains("P-EXTERNAL"));
3567        assert!(message.contains("P-STRATEGY"));
3568        assert!(message.contains("signed_qty=1000"));
3569        assert!(message.contains("signed_qty=500"));
3570        assert!(
3571            ExecutionEngine::netting_split_position_ownership_message(
3572                &report,
3573                &[&external_position, &same_strategy_position],
3574            )
3575            .is_none()
3576        );
3577    }
3578
3579    fn position_for_account(
3580        instrument: &InstrumentAny,
3581        account_id: AccountId,
3582        strategy_id: StrategyId,
3583        position_id: PositionId,
3584        order_side: OrderSide,
3585        quantity: Quantity,
3586    ) -> Position {
3587        let client_order_id = ClientOrderId::from(format!("O-{position_id}"));
3588        let fill = OrderFilledSpec::builder()
3589            .strategy_id(strategy_id)
3590            .instrument_id(instrument.id())
3591            .client_order_id(client_order_id)
3592            .venue_order_id(VenueOrderId::from(format!("V-{position_id}")))
3593            .account_id(account_id)
3594            .trade_id(TradeId::new(format!("T-{position_id}")))
3595            .order_side(order_side)
3596            .last_qty(quantity)
3597            .last_px(Price::from("1.0"))
3598            .currency(instrument.quote_currency())
3599            .liquidity_side(LiquiditySide::Maker)
3600            .position_id(position_id)
3601            .commission(Money::from("2 USD"))
3602            .build();
3603
3604        Position::new(instrument, fill)
3605    }
3606}