1pub mod config;
24pub mod stubs;
25
26use std::{
27 cell::{Cell, RefCell, RefMut},
28 collections::{HashMap, HashSet},
29 fmt::{Debug, Display},
30 rc::Rc,
31 time::SystemTime,
32};
33
34use ahash::AHashSet;
35use config::ExecutionEngineConfig;
36use futures::future::join_all;
37use indexmap::{IndexMap, IndexSet};
38use nautilus_common::{
39 cache::{Cache, CacheSnapshotRef, PositionRef},
40 clients::ExecutionClient,
41 clock::Clock,
42 generators::position_id::PositionIdGenerator,
43 logging::{CMD, EVT, RECV, SEND},
44 messages::{
45 ExecutionReport,
46 execution::{
47 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
48 SubmitOrder, SubmitOrderList, TradingCommand,
49 },
50 },
51 msgbus::{
52 self, MessagingSwitchboard, TypedHandler, TypedIntoHandler, get_message_bus,
53 switchboard::{self},
54 },
55 runner::try_get_trading_cmd_sender,
56 timer::{TimeEvent, TimeEventCallback},
57};
58use nautilus_core::{
59 UUID4, UnixNanos, WeakCell,
60 datetime::{mins_to_nanos, mins_to_secs},
61};
62use nautilus_model::{
63 accounts::Account,
64 enums::{
65 ContingencyType, OmsType, OrderStatus, OrderType, PositionSide, TimeInForce,
66 TrailingOffsetType,
67 },
68 events::{
69 OrderAccepted, OrderCanceled, OrderDenied, OrderEvent, OrderEventAny, OrderExpired,
70 OrderFilled, OrderInitialized, PositionChanged, PositionClosed, PositionEvent,
71 PositionOpened,
72 },
73 identifiers::{
74 AccountId, ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, Venue,
75 VenueOrderId,
76 },
77 instruments::{Instrument, InstrumentAny},
78 orderbook::own::{OwnBookOrder, OwnOrderBook, should_handle_own_book_order},
79 orders::{Order, OrderAny, OrderError},
80 position::Position,
81 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
82 types::{Money, Quantity},
83};
84use rust_decimal::Decimal;
85
86use crate::{
87 client::ExecutionClientAdapter,
88 reconciliation::{
89 check_position_reconciliation, create_incremental_inferred_fill,
90 generate_external_order_status_events, generate_reconciliation_order_events,
91 reconcile_fill_report as reconcile_fill,
92 },
93};
94
95const TIMER_PURGE_CLOSED_ORDERS: &str = "ExecEngine_PURGE_CLOSED_ORDERS";
96const TIMER_PURGE_CLOSED_POSITIONS: &str = "ExecEngine_PURGE_CLOSED_POSITIONS";
97const TIMER_PURGE_ACCOUNT_EVENTS: &str = "ExecEngine_PURGE_ACCOUNT_EVENTS";
98
99pub type SnapshotAnchorer = Rc<dyn Fn(CacheSnapshotRef) -> anyhow::Result<()>>;
101
102pub struct ExecutionEngine {
109 clock: Rc<RefCell<dyn Clock>>,
110 cache: Rc<RefCell<Cache>>,
111 clients: IndexMap<ClientId, ExecutionClientAdapter>,
112 default_client: Option<ExecutionClientAdapter>,
113 routing_map: HashMap<Venue, ClientId>,
114 oms_overrides: HashMap<StrategyId, OmsType>,
115 external_order_claims: HashMap<InstrumentId, StrategyId>,
116 external_clients: HashSet<ClientId>,
117 pos_id_generator: PositionIdGenerator,
118 config: ExecutionEngineConfig,
119 command_count: Cell<u64>,
120 event_count: u64,
121 report_count: u64,
122 filtered_unclaimed_external_order_count: u64,
123 snapshot_anchorer: Option<SnapshotAnchorer>,
124}
125
126impl Debug for ExecutionEngine {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct(stringify!(ExecutionEngine))
129 .field("client_count", &self.clients.len())
130 .finish()
131 }
132}
133
134impl ExecutionEngine {
135 pub fn new(
137 clock: Rc<RefCell<dyn Clock>>,
138 cache: Rc<RefCell<Cache>>,
139 config: Option<ExecutionEngineConfig>,
140 ) -> Self {
141 let trader_id = get_message_bus().borrow().trader_id;
142 Self {
143 clock: clock.clone(),
144 cache,
145 clients: IndexMap::new(),
146 default_client: None,
147 routing_map: HashMap::new(),
148 oms_overrides: HashMap::new(),
149 external_order_claims: HashMap::new(),
150 external_clients: config
151 .as_ref()
152 .and_then(|c| c.external_clients.clone())
153 .unwrap_or_default()
154 .into_iter()
155 .collect(),
156 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
157 config: config.unwrap_or_default(),
158 command_count: Cell::new(0),
159 event_count: 0,
160 report_count: 0,
161 filtered_unclaimed_external_order_count: 0,
162 snapshot_anchorer: None,
163 }
164 }
165
166 pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
168 let weak = WeakCell::from(Rc::downgrade(engine));
169
170 let weak1 = weak.clone();
171 msgbus::register_trading_command_endpoint(
172 MessagingSwitchboard::exec_engine_execute(),
173 TypedIntoHandler::from(move |cmd: TradingCommand| {
174 if let Some(rc) = weak1.upgrade() {
175 rc.borrow().execute(cmd);
176 }
177 }),
178 );
179
180 msgbus::register_trading_command_endpoint(
183 MessagingSwitchboard::exec_engine_queue_execute(),
184 TypedIntoHandler::from(move |cmd: TradingCommand| {
185 if let Some(sender) = try_get_trading_cmd_sender() {
186 sender.execute(cmd);
187 } else {
188 let endpoint = MessagingSwitchboard::exec_engine_execute();
189 msgbus::send_trading_command(endpoint, cmd);
190 }
191 }),
192 );
193
194 let weak2 = weak.clone();
195 msgbus::register_order_event_endpoint(
196 MessagingSwitchboard::exec_engine_process(),
197 TypedIntoHandler::from(move |event: OrderEventAny| {
198 if let Some(rc) = weak2.upgrade() {
199 rc.borrow_mut().process(&event);
200 }
201 }),
202 );
203
204 let weak3 = weak;
205 msgbus::register_execution_report_endpoint(
206 MessagingSwitchboard::exec_engine_reconcile_execution_report(),
207 TypedIntoHandler::from(move |report: ExecutionReport| {
208 if let Some(rc) = weak3.upgrade() {
209 rc.borrow_mut().reconcile_execution_report(&report);
210 }
211 }),
212 );
213 }
214
215 #[must_use]
217 pub fn command_count(&self) -> u64 {
218 self.command_count.get()
219 }
220
221 #[must_use]
223 pub const fn event_count(&self) -> u64 {
224 self.event_count
225 }
226
227 #[must_use]
229 pub const fn report_count(&self) -> u64 {
230 self.report_count
231 }
232
233 #[must_use]
235 pub const fn filtered_unclaimed_external_order_count(&self) -> u64 {
236 self.filtered_unclaimed_external_order_count
237 }
238
239 pub fn subscribe_venue_instruments(engine: &Rc<RefCell<Self>>, venue: Venue) {
244 let weak = WeakCell::from(Rc::downgrade(engine));
245 let pattern = switchboard::get_instruments_pattern(venue);
246
247 let handler = TypedHandler::from(move |instrument: &InstrumentAny| {
248 if let Some(rc) = weak.upgrade() {
249 let venue = instrument.id().venue;
250 let client_id = rc.borrow().routing_map.get(&venue).copied();
251 if let Some(client_id) = client_id {
252 let mut engine = rc.borrow_mut();
253 if let Some(adapter) = engine.get_client_adapter_mut(&client_id) {
254 adapter.on_instrument(instrument.clone());
255 }
256 }
257 }
258 });
259
260 msgbus::subscribe_instruments(pattern, handler, None);
261 log::info!("Subscribed to instrument updates for venue {venue}");
262 }
263
264 #[must_use]
265 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
267 self.pos_id_generator.count(strategy_id)
268 }
269
270 #[must_use]
271 pub fn cache(&self) -> &Rc<RefCell<Cache>> {
273 &self.cache
274 }
275
276 #[must_use]
277 pub const fn config(&self) -> &ExecutionEngineConfig {
279 &self.config
280 }
281
282 pub fn set_snapshot_anchorer(&mut self, anchorer: Option<SnapshotAnchorer>) {
287 self.snapshot_anchorer = anchorer;
288 }
289
290 #[must_use]
291 pub fn check_integrity(&self) -> bool {
293 self.cache.borrow_mut().check_integrity()
294 }
295
296 #[must_use]
297 pub fn check_connected(&self) -> bool {
299 let clients_connected = self.clients.values().all(|c| c.is_connected());
300 let default_connected = self
301 .default_client
302 .as_ref()
303 .is_none_or(|c| c.is_connected());
304 clients_connected && default_connected
305 }
306
307 #[must_use]
308 pub fn check_disconnected(&self) -> bool {
310 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
311 let default_disconnected = self
312 .default_client
313 .as_ref()
314 .is_none_or(|c| !c.is_connected());
315 clients_disconnected && default_disconnected
316 }
317
318 #[must_use]
320 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
321 let mut status: Vec<_> = self
322 .clients
323 .values()
324 .map(|c| (c.client_id(), c.is_connected()))
325 .collect();
326
327 if let Some(default) = &self.default_client {
328 status.push((default.client_id(), default.is_connected()));
329 }
330
331 status
332 }
333
334 #[must_use]
335 pub fn check_residuals(&self) -> bool {
337 self.cache.borrow().check_residuals()
338 }
339
340 #[must_use]
341 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
343 self.external_order_claims.keys().copied().collect()
344 }
345
346 #[must_use]
347 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
349 self.external_clients.clone()
350 }
351
352 #[must_use]
353 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
355 self.external_order_claims.get(instrument_id).copied()
356 }
357
358 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
364 let client_id = client.client_id();
365 let venue = client.venue();
366
367 if self.clients.contains_key(&client_id) {
368 anyhow::bail!("Client already registered with ID {client_id}");
369 }
370
371 let adapter = ExecutionClientAdapter::new(client);
372
373 if let Some(existing_client_id) = self.routing_map.get(&venue) {
374 anyhow::bail!(
375 "Venue {venue} already routed to {existing_client_id}, \
376 cannot register {client_id} for the same venue"
377 );
378 }
379
380 self.routing_map.insert(venue, client_id);
381 log::debug!("Registered client {client_id}");
382 self.clients.insert(client_id, adapter);
383 Ok(())
384 }
385
386 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
388 let client_id = client.client_id();
389 let adapter = ExecutionClientAdapter::new(client);
390
391 log::debug!("Registered default client {client_id}");
392 self.default_client = Some(adapter);
393 }
394
395 #[must_use]
396 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
398 self.clients.get(client_id).map(|a| a.client.as_ref())
399 }
400
401 #[must_use]
402 pub fn get_client_adapter_mut(
404 &mut self,
405 client_id: &ClientId,
406 ) -> Option<&mut ExecutionClientAdapter> {
407 if let Some(default) = &self.default_client
408 && &default.client_id == client_id
409 {
410 return self.default_client.as_mut();
411 }
412 self.clients.get_mut(client_id)
413 }
414
415 pub async fn generate_mass_status(
421 &mut self,
422 client_id: &ClientId,
423 lookback_mins: Option<u64>,
424 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
425 if let Some(client) = self.get_client_adapter_mut(client_id) {
426 client.generate_mass_status(lookback_mins).await
427 } else {
428 anyhow::bail!("Client {client_id} not found")
429 }
430 }
431
432 pub fn register_external_order(
437 &self,
438 client_order_id: ClientOrderId,
439 venue_order_id: VenueOrderId,
440 instrument_id: InstrumentId,
441 strategy_id: StrategyId,
442 ts_init: UnixNanos,
443 ) {
444 let venue = instrument_id.venue;
445 if let Some(client_id) = self.routing_map.get(&venue) {
446 if let Some(client) = self.clients.get(client_id) {
447 client.register_external_order(
448 client_order_id,
449 venue_order_id,
450 instrument_id,
451 strategy_id,
452 ts_init,
453 );
454 }
455 } else if let Some(default) = &self.default_client {
456 default.register_external_order(
457 client_order_id,
458 venue_order_id,
459 instrument_id,
460 strategy_id,
461 ts_init,
462 );
463 }
464 }
465
466 #[must_use]
467 pub fn client_ids(&self) -> Vec<ClientId> {
469 let mut ids: Vec<_> = self.clients.keys().copied().collect();
470
471 if let Some(default) = &self.default_client {
472 ids.push(default.client_id);
473 }
474 ids
475 }
476
477 #[must_use]
478 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
480 let mut adapters: Vec<_> = self.clients.values_mut().collect();
481
482 if let Some(default) = &mut self.default_client {
483 adapters.push(default);
484 }
485 adapters
486 }
487
488 #[must_use]
490 pub fn get_all_clients(&self) -> Vec<&dyn ExecutionClient> {
491 let mut clients: Vec<&dyn ExecutionClient> =
492 self.clients.values().map(|a| a.client.as_ref()).collect();
493
494 if let Some(default) = &self.default_client {
495 clients.push(default.client.as_ref());
496 }
497
498 clients
499 }
500
501 #[must_use]
502 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
507 let mut client_ids: IndexSet<ClientId> = IndexSet::new();
508 let mut venues: IndexSet<Venue> = IndexSet::new();
509
510 for order in orders {
512 venues.insert(order.instrument_id().venue);
513 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
514 client_ids.insert(*client_id);
515 }
516 }
517
518 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
519
520 for client_id in &client_ids {
522 if let Some(adapter) = self.clients.get(client_id)
523 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
524 {
525 clients.push(adapter.client.as_ref());
526 }
527 }
528
529 for venue in &venues {
531 if let Some(client_id) = self.routing_map.get(venue) {
532 if let Some(adapter) = self.clients.get(client_id)
533 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
534 {
535 clients.push(adapter.client.as_ref());
536 }
537 } else if let Some(adapter) = &self.default_client
538 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
539 {
540 clients.push(adapter.client.as_ref());
541 }
542 }
543
544 clients
545 }
546
547 pub fn register_venue_routing(
553 &mut self,
554 client_id: ClientId,
555 venue: Venue,
556 ) -> anyhow::Result<()> {
557 if !self.clients.contains_key(&client_id) {
558 anyhow::bail!("No client registered with ID {client_id}");
559 }
560
561 if let Some(existing_client_id) = self.routing_map.get(&venue)
562 && *existing_client_id != client_id
563 {
564 anyhow::bail!(
565 "Venue {venue} already routed to {existing_client_id}, \
566 cannot re-route to {client_id}"
567 );
568 }
569
570 self.routing_map.insert(venue, client_id);
571 log::info!("Set client {client_id} routing for {venue}");
572 Ok(())
573 }
574
575 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
579 self.oms_overrides.insert(strategy_id, oms_type);
580 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
581 }
582
583 pub fn register_external_order_claims(
594 &mut self,
595 strategy_id: StrategyId,
596 instrument_ids: &HashSet<InstrumentId>,
597 ) -> anyhow::Result<()> {
598 for instrument_id in instrument_ids {
600 if let Some(existing) = self.external_order_claims.get(instrument_id) {
601 anyhow::bail!(
602 "External order claim for {instrument_id} already exists for {existing}"
603 );
604 }
605 }
606
607 for instrument_id in instrument_ids {
609 self.external_order_claims
610 .insert(*instrument_id, strategy_id);
611 }
612
613 if !instrument_ids.is_empty() {
614 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
615 }
616
617 Ok(())
618 }
619
620 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
624 if self.clients.shift_remove(&client_id).is_some() {
625 self.routing_map
627 .retain(|_, mapped_id| mapped_id != &client_id);
628 log::info!("Deregistered client {client_id}");
629 Ok(())
630 } else {
631 anyhow::bail!("No client registered with ID {client_id}")
632 }
633 }
634
635 pub async fn connect(&mut self) {
639 let futures: Vec<_> = self
640 .get_clients_mut()
641 .into_iter()
642 .map(ExecutionClientAdapter::connect)
643 .collect();
644
645 let results = join_all(futures).await;
646
647 for error in results.into_iter().filter_map(Result::err) {
648 log::error!("Failed to connect execution client: {error:#}");
649 }
650 }
651
652 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
658 let futures: Vec<_> = self
659 .get_clients_mut()
660 .into_iter()
661 .map(ExecutionClientAdapter::disconnect)
662 .collect();
663
664 let results = join_all(futures).await;
665 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
666
667 if errors.is_empty() {
668 Ok(())
669 } else {
670 let error_msgs: Vec<_> = errors.iter().map(ToString::to_string).collect();
671 anyhow::bail!(
672 "Failed to disconnect execution clients: {}",
673 error_msgs.join("; ")
674 )
675 }
676 }
677
678 pub fn set_manage_own_order_books(&mut self, value: bool) {
680 self.config.manage_own_order_books = value;
681 }
682
683 pub fn start_snapshot_timer(&mut self) {
687 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
688 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
689 }
690 }
691
692 pub fn stop_snapshot_timer(&mut self) {
694 if self.config.snapshot_positions_interval_secs.is_some() {
695 log::info!("Canceling position snapshots timer");
696 }
697 }
698
699 #[expect(
701 clippy::missing_panics_doc,
702 reason = "timer registration is not expected to fail"
703 )]
704 pub fn start_purge_timers(&mut self) {
705 if let Some(interval_mins) = self
706 .config
707 .purge_closed_orders_interval_mins
708 .filter(|&m| m > 0)
709 && !self
710 .clock
711 .borrow()
712 .timer_names()
713 .contains(&TIMER_PURGE_CLOSED_ORDERS)
714 {
715 let interval_ns = mins_to_nanos(u64::from(interval_mins));
716 let buffer_mins = self.config.purge_closed_orders_buffer_mins.unwrap_or(0);
717 let buffer_secs = mins_to_secs(u64::from(buffer_mins));
718 let cache = self.cache.clone();
719 let clock = self.clock.clone();
720
721 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
722 let ts_now = clock.borrow().timestamp_ns();
723 cache.borrow_mut().purge_closed_orders(ts_now, buffer_secs);
724 });
725 let callback = TimeEventCallback::from(callback_fn);
726
727 log::info!("Starting purge closed orders timer at {interval_mins} minute intervals");
728 self.clock
729 .borrow_mut()
730 .set_timer_ns(
731 TIMER_PURGE_CLOSED_ORDERS,
732 interval_ns,
733 None,
734 None,
735 Some(callback),
736 None,
737 None,
738 )
739 .expect("Failed to set purge closed orders timer");
740 }
741
742 if let Some(interval_mins) = self
743 .config
744 .purge_closed_positions_interval_mins
745 .filter(|&m| m > 0)
746 && !self
747 .clock
748 .borrow()
749 .timer_names()
750 .contains(&TIMER_PURGE_CLOSED_POSITIONS)
751 {
752 let interval_ns = mins_to_nanos(u64::from(interval_mins));
753 let buffer_mins = self.config.purge_closed_positions_buffer_mins.unwrap_or(0);
754 let buffer_secs = mins_to_secs(u64::from(buffer_mins));
755 let cache = self.cache.clone();
756 let clock = self.clock.clone();
757
758 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
759 let ts_now = clock.borrow().timestamp_ns();
760 cache
761 .borrow_mut()
762 .purge_closed_positions(ts_now, buffer_secs);
763 });
764 let callback = TimeEventCallback::from(callback_fn);
765
766 log::info!("Starting purge closed positions timer at {interval_mins} minute intervals");
767 self.clock
768 .borrow_mut()
769 .set_timer_ns(
770 TIMER_PURGE_CLOSED_POSITIONS,
771 interval_ns,
772 None,
773 None,
774 Some(callback),
775 None,
776 None,
777 )
778 .expect("Failed to set purge closed positions timer");
779 }
780
781 if let Some(interval_mins) = self
782 .config
783 .purge_account_events_interval_mins
784 .filter(|&m| m > 0)
785 && !self
786 .clock
787 .borrow()
788 .timer_names()
789 .contains(&TIMER_PURGE_ACCOUNT_EVENTS)
790 {
791 let interval_ns = mins_to_nanos(u64::from(interval_mins));
792 let lookback_mins = self.config.purge_account_events_lookback_mins.unwrap_or(0);
793 let lookback_secs = mins_to_secs(u64::from(lookback_mins));
794 let cache = self.cache.clone();
795 let clock = self.clock.clone();
796
797 let callback_fn: Rc<dyn Fn(TimeEvent)> = Rc::new(move |_event| {
798 let ts_now = clock.borrow().timestamp_ns();
799 cache
800 .borrow_mut()
801 .purge_account_events(ts_now, lookback_secs);
802 });
803 let callback = TimeEventCallback::from(callback_fn);
804
805 log::info!("Starting purge account events timer at {interval_mins} minute intervals");
806 self.clock
807 .borrow_mut()
808 .set_timer_ns(
809 TIMER_PURGE_ACCOUNT_EVENTS,
810 interval_ns,
811 None,
812 None,
813 Some(callback),
814 None,
815 None,
816 )
817 .expect("Failed to set purge account events timer");
818 }
819 }
820
821 pub fn stop_purge_timers(&mut self) {
823 let timer_names: Vec<String> = self
824 .clock
825 .borrow()
826 .timer_names()
827 .into_iter()
828 .map(String::from)
829 .collect();
830
831 if timer_names.iter().any(|n| n == TIMER_PURGE_CLOSED_ORDERS) {
832 log::info!("Canceling purge closed orders timer");
833 self.clock
834 .borrow_mut()
835 .cancel_timer(TIMER_PURGE_CLOSED_ORDERS);
836 }
837
838 if timer_names
839 .iter()
840 .any(|n| n == TIMER_PURGE_CLOSED_POSITIONS)
841 {
842 log::info!("Canceling purge closed positions timer");
843 self.clock
844 .borrow_mut()
845 .cancel_timer(TIMER_PURGE_CLOSED_POSITIONS);
846 }
847
848 if timer_names.iter().any(|n| n == TIMER_PURGE_ACCOUNT_EVENTS) {
849 log::info!("Canceling purge account events timer");
850 self.clock
851 .borrow_mut()
852 .cancel_timer(TIMER_PURGE_ACCOUNT_EVENTS);
853 }
854 }
855
856 pub fn snapshot_open_position_states(&self) {
858 let positions: Vec<Position> = self
859 .cache
860 .borrow()
861 .positions_open(None, None, None, None, None)
862 .into_iter()
863 .map(|p| p.cloned())
864 .collect();
865
866 for position in positions {
867 self.create_position_state_snapshot(&position);
868 }
869 }
870
871 #[expect(clippy::await_holding_refcell_ref)]
872 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
878 let ts = SystemTime::now(); {
881 let mut cache = self.cache.borrow_mut();
882 cache.clear_index();
883 cache.cache_general()?;
884 }
885
886 self.cache.borrow_mut().cache_all().await?;
887
888 let own_book_entries: Vec<(InstrumentId, OwnBookOrder)> = {
890 let mut cache = self.cache.borrow_mut();
891 cache.build_index();
892 let _ = cache.check_integrity();
893
894 if self.config.manage_own_order_books {
895 cache
896 .orders(None, None, None, None, None)
897 .into_iter()
898 .filter(|o| !o.is_closed() && should_handle_own_book_order(o))
899 .map(|o| (o.instrument_id(), o.to_own_book_order()))
900 .collect()
901 } else {
902 Vec::new()
903 }
904 };
905
906 for (instrument_id, own_order) in own_book_entries {
907 let mut own_book = self.get_or_init_own_order_book(&instrument_id);
908 own_book.add(own_order);
909 }
910
911 self.set_position_id_counts();
912
913 log::info!(
914 "Loaded cache in {}ms",
915 SystemTime::now() .duration_since(ts)
917 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
918 .as_millis()
919 );
920
921 Ok(())
922 }
923
924 pub fn flush_db(&self) {
926 self.cache.borrow_mut().flush_db();
927 }
928
929 pub fn reconcile_execution_report(&mut self, report: &ExecutionReport) {
931 if !matches!(report, ExecutionReport::MassStatus(_)) {
932 self.report_count += 1;
933 }
934
935 match report {
936 ExecutionReport::Order(order_report) => {
937 self.reconcile_order_status_report(order_report);
938 }
939 ExecutionReport::Fill(fill_report) => {
940 self.reconcile_fill_report(fill_report);
941 }
942 ExecutionReport::OrderWithFills(order_report, fills) => {
943 self.reconcile_order_with_fills(order_report, fills);
944 }
945 ExecutionReport::Position(position_report) => {
946 self.reconcile_position_report(position_report);
947 }
948 ExecutionReport::MassStatus(mass_status) => {
949 self.reconcile_execution_mass_status(mass_status);
950 }
951 }
952 }
953
954 pub fn reconcile_order_status_report(&mut self, report: &OrderStatusReport) {
964 msgbus::publish_any(
965 MessagingSwitchboard::reconciliation_raw_order_status_report_topic(),
966 report,
967 );
968
969 let cache = self.cache.borrow();
970
971 let order = report
972 .client_order_id
973 .and_then(|id| cache.order(&id).map(|o| o.clone()))
974 .or_else(|| {
975 cache
976 .client_order_id(&report.venue_order_id)
977 .and_then(|cid| cache.order(cid).map(|o| o.clone()))
978 });
979
980 let instrument = cache.instrument(&report.instrument_id).cloned();
981
982 drop(cache);
983
984 if let Some(order) = order {
985 let ts_now = self.clock.borrow().timestamp_ns();
986 let events =
987 generate_reconciliation_order_events(&order, report, instrument.as_ref(), ts_now);
988
989 for event in &events {
990 self.handle_event(event);
991 }
992 } else {
993 self.create_external_order(report, instrument.as_ref());
994 }
995 }
996
997 fn create_external_order(
998 &mut self,
999 report: &OrderStatusReport,
1000 instrument: Option<&InstrumentAny>,
1001 ) {
1002 let Some(instrument) = instrument else {
1003 log::warn!(
1004 "Cannot create external order for venue_order_id={}: instrument {} not found",
1005 report.venue_order_id,
1006 report.instrument_id
1007 );
1008 return;
1009 };
1010
1011 let Some(order) = self.materialize_external_order_from_status(report) else {
1012 return;
1013 };
1014
1015 let ts_now = self.clock.borrow().timestamp_ns();
1016 let events = generate_external_order_status_events(
1017 &order,
1018 report,
1019 &report.account_id,
1020 instrument,
1021 ts_now,
1022 );
1023
1024 for event in &events {
1025 self.handle_event(event);
1026 }
1027 }
1028
1029 fn materialize_external_order_from_status(
1032 &mut self,
1033 report: &OrderStatusReport,
1034 ) -> Option<OrderAny> {
1035 let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1036 if self.should_filter_unclaimed_external_order(strategy_id) {
1037 self.filtered_unclaimed_external_order_count += 1;
1038
1039 if self.filtered_unclaimed_external_order_count == 1 {
1040 let external_order_id = report
1041 .client_order_id
1042 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1043 log::info!(
1044 "Filtering unclaimed external orders; first filtered order {} ({}) for {}",
1045 external_order_id,
1046 report.venue_order_id,
1047 report.instrument_id,
1048 );
1049 } else {
1050 let external_order_id = report
1051 .client_order_id
1052 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1053 log::debug!(
1054 "Filtered unclaimed external order {} ({}) for {}",
1055 external_order_id,
1056 report.venue_order_id,
1057 report.instrument_id,
1058 );
1059 }
1060
1061 return None;
1062 }
1063
1064 self.materialize_external_order_from_status_with_strategy(report, strategy_id)
1065 }
1066
1067 fn materialize_external_order_from_status_with_strategy(
1068 &self,
1069 report: &OrderStatusReport,
1070 strategy_id: StrategyId,
1071 ) -> Option<OrderAny> {
1072 let client_order_id = report
1073 .client_order_id
1074 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1075
1076 let trader_id = get_message_bus().borrow().trader_id;
1077 let ts_now = self.clock.borrow().timestamp_ns();
1078
1079 let initialized = OrderInitialized::new(
1080 trader_id,
1081 strategy_id,
1082 report.instrument_id,
1083 client_order_id,
1084 report.order_side,
1085 report.order_type,
1086 report.quantity,
1087 report.time_in_force,
1088 report.post_only,
1089 report.reduce_only,
1090 false, true, UUID4::new(),
1093 ts_now,
1094 ts_now,
1095 report.price,
1096 report.trigger_price,
1097 report.trigger_type,
1098 report.limit_offset,
1099 report.trailing_offset,
1100 Some(report.trailing_offset_type),
1101 report.expire_time,
1102 report.display_qty,
1103 None, None, Some(report.contingency_type),
1106 report.order_list_id,
1107 report.linked_order_ids.clone(),
1108 report.parent_order_id,
1109 None, None, None, None, );
1114
1115 self.materialize_external_order(
1116 initialized,
1117 client_order_id,
1118 report.venue_order_id,
1119 report.instrument_id,
1120 strategy_id,
1121 ts_now,
1122 Some(report.order_status),
1123 )
1124 }
1125
1126 fn materialize_external_order_from_fill(&mut self, report: &FillReport) -> Option<OrderAny> {
1134 let strategy_id = self.resolve_external_strategy(&report.instrument_id);
1135 if self.should_filter_unclaimed_external_order(strategy_id) {
1136 self.filtered_unclaimed_external_order_count += 1;
1137
1138 let external_order_id = report
1139 .client_order_id
1140 .map_or_else(|| report.venue_order_id.to_string(), |id| id.to_string());
1141
1142 if self.filtered_unclaimed_external_order_count == 1 {
1143 log::info!(
1144 "Filtering unclaimed external orders; first filtered fill {} ({}) for {}",
1145 external_order_id,
1146 report.venue_order_id,
1147 report.instrument_id,
1148 );
1149 } else {
1150 log::debug!(
1151 "Filtered unclaimed external fill {} ({}) for {}",
1152 external_order_id,
1153 report.venue_order_id,
1154 report.instrument_id,
1155 );
1156 }
1157
1158 return None;
1159 }
1160
1161 let client_order_id = report
1162 .client_order_id
1163 .unwrap_or_else(|| ClientOrderId::from(report.venue_order_id.as_str()));
1164
1165 let trader_id = get_message_bus().borrow().trader_id;
1166 let ts_now = self.clock.borrow().timestamp_ns();
1167
1168 let initialized = OrderInitialized::new(
1169 trader_id,
1170 strategy_id,
1171 report.instrument_id,
1172 client_order_id,
1173 report.order_side,
1174 OrderType::Market,
1175 report.last_qty,
1176 TimeInForce::Ioc,
1177 false, true, false, true, UUID4::new(),
1182 ts_now,
1183 ts_now,
1184 None, None, None, None, None, Some(TrailingOffsetType::NoTrailingOffset),
1190 None, None, None, None, Some(ContingencyType::NoContingency),
1195 None, None, None, None, None, None, None, );
1203
1204 self.materialize_external_order(
1205 initialized,
1206 client_order_id,
1207 report.venue_order_id,
1208 report.instrument_id,
1209 strategy_id,
1210 ts_now,
1211 None,
1212 )
1213 }
1214
1215 fn resolve_external_strategy(&self, instrument_id: &InstrumentId) -> StrategyId {
1216 self.external_order_claims
1217 .get(instrument_id)
1218 .copied()
1219 .unwrap_or_else(StrategyId::external)
1220 }
1221
1222 fn should_filter_unclaimed_external_order(&self, strategy_id: StrategyId) -> bool {
1223 self.config.filter_unclaimed_external_orders && strategy_id.is_external()
1224 }
1225
1226 #[allow(
1229 clippy::too_many_arguments,
1230 reason = "external order materialisation threads several ids and a timestamp"
1231 )]
1232 fn materialize_external_order(
1233 &self,
1234 initialized: OrderInitialized,
1235 client_order_id: ClientOrderId,
1236 venue_order_id: VenueOrderId,
1237 instrument_id: InstrumentId,
1238 strategy_id: StrategyId,
1239 ts_now: UnixNanos,
1240 order_status: Option<OrderStatus>,
1241 ) -> Option<OrderAny> {
1242 let initialized = OrderEventAny::Initialized(initialized);
1243 let order = match OrderAny::from_events(vec![initialized.clone()]) {
1244 Ok(order) => order,
1245 Err(e) => {
1246 log::error!("Failed to create external order from report: {e}");
1247 return None;
1248 }
1249 };
1250
1251 {
1252 let mut cache = self.cache.borrow_mut();
1253 if let Err(e) = cache.add_order(order.clone(), None, None, false) {
1254 log::error!("Failed to add external order to cache: {e}");
1255 return None;
1256 }
1257
1258 if let Err(e) = cache.add_venue_order_id(&client_order_id, &venue_order_id, false) {
1259 log::warn!("Failed to add venue order ID index: {e}");
1260 }
1261 }
1262
1263 self.publish_order_event(&initialized);
1264
1265 match order_status {
1266 Some(status) => log::info!(
1267 "Created external order {client_order_id} ({venue_order_id}) for {instrument_id} [{status}]",
1268 ),
1269 None => log::info!(
1270 "Created external order {client_order_id} ({venue_order_id}) for {instrument_id}",
1271 ),
1272 }
1273
1274 self.register_external_order(
1275 client_order_id,
1276 venue_order_id,
1277 instrument_id,
1278 strategy_id,
1279 ts_now,
1280 );
1281
1282 Some(order)
1283 }
1284
1285 pub fn reconcile_fill_report(&mut self, report: &FillReport) {
1293 msgbus::publish_any(
1294 MessagingSwitchboard::reconciliation_raw_fill_report_topic(),
1295 report,
1296 );
1297
1298 let cache = self.cache.borrow();
1299
1300 let order = report
1301 .client_order_id
1302 .and_then(|id| cache.order(&id).map(|o| o.clone()))
1303 .or_else(|| {
1304 cache
1305 .client_order_id(&report.venue_order_id)
1306 .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1307 });
1308
1309 let instrument = cache.instrument(&report.instrument_id).cloned();
1310
1311 drop(cache);
1312
1313 let Some(instrument) = instrument else {
1314 log::debug!(
1315 "Cannot reconcile fill report for venue_order_id={}: instrument {} not found",
1316 report.venue_order_id,
1317 report.instrument_id
1318 );
1319 return;
1320 };
1321
1322 let order = match order {
1323 Some(order) => order,
1324 None => {
1325 let Some(order) = self.materialize_external_order_from_fill(report) else {
1326 return;
1327 };
1328 let ts_now = self.clock.borrow().timestamp_ns();
1329 let accepted = OrderAccepted::new(
1330 order.trader_id(),
1331 order.strategy_id(),
1332 order.instrument_id(),
1333 order.client_order_id(),
1334 report.venue_order_id,
1335 report.account_id,
1336 UUID4::new(),
1337 report.ts_event,
1338 ts_now,
1339 true, );
1341 self.handle_event(&OrderEventAny::Accepted(accepted));
1342 self.cache
1343 .borrow()
1344 .order(&order.client_order_id())
1345 .map(|o| o.clone())
1346 .unwrap_or(order)
1347 }
1348 };
1349
1350 let ts_now = self.clock.borrow().timestamp_ns();
1351
1352 if let Some(event) = reconcile_fill(
1353 &order,
1354 report,
1355 &instrument,
1356 ts_now,
1357 self.config.allow_overfills,
1358 ) {
1359 self.handle_event(&event);
1360 }
1361 }
1362
1363 pub fn reconcile_order_with_fills(&mut self, report: &OrderStatusReport, fills: &[FillReport]) {
1372 msgbus::publish_any(
1373 MessagingSwitchboard::reconciliation_raw_order_status_report_topic(),
1374 report,
1375 );
1376
1377 let fill_report_topic = MessagingSwitchboard::reconciliation_raw_fill_report_topic();
1378 for fill in fills {
1379 msgbus::publish_any(fill_report_topic, fill);
1380 }
1381
1382 let cache = self.cache.borrow();
1383 let order = report
1384 .client_order_id
1385 .and_then(|id| cache.order(&id).map(|o| o.clone()))
1386 .or_else(|| {
1387 cache
1388 .client_order_id(&report.venue_order_id)
1389 .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1390 });
1391 let instrument = cache.instrument(&report.instrument_id).cloned();
1392 drop(cache);
1393
1394 let Some(instrument) = instrument else {
1395 log::debug!(
1396 "Cannot reconcile bundled report for venue_order_id={}: instrument {} not found",
1397 report.venue_order_id,
1398 report.instrument_id,
1399 );
1400 return;
1401 };
1402
1403 let mut order = match order {
1406 Some(order) => order,
1407 None => {
1408 let Some(order) = self.materialize_external_order_from_status(report) else {
1409 return;
1410 };
1411 let ts_now = self.clock.borrow().timestamp_ns();
1412 let accepted = OrderAccepted::new(
1413 order.trader_id(),
1414 order.strategy_id(),
1415 order.instrument_id(),
1416 order.client_order_id(),
1417 report.venue_order_id,
1418 report.account_id,
1419 UUID4::new(),
1420 report.ts_accepted,
1421 ts_now,
1422 true, );
1424 self.handle_event(&OrderEventAny::Accepted(accepted));
1425 order
1426 }
1427 };
1428
1429 let client_order_id = order.client_order_id();
1430
1431 for fill in fills {
1432 let ts_now = self.clock.borrow().timestamp_ns();
1433
1434 if let Some(event) = reconcile_fill(
1435 &order,
1436 fill,
1437 &instrument,
1438 ts_now,
1439 self.config.allow_overfills,
1440 ) {
1441 self.handle_event(&event);
1442 }
1443
1444 if let Some(refreshed) = self
1446 .cache
1447 .borrow()
1448 .order(&client_order_id)
1449 .map(|o| o.clone())
1450 {
1451 order = refreshed;
1452 }
1453 }
1454
1455 if matches!(
1458 report.order_status,
1459 OrderStatus::PartiallyFilled | OrderStatus::Filled,
1460 ) && report.filled_qty > order.filled_qty()
1461 {
1462 let ts_now = self.clock.borrow().timestamp_ns();
1463
1464 if let Some(event) = create_incremental_inferred_fill(
1465 &order,
1466 report,
1467 &report.account_id,
1468 &instrument,
1469 ts_now,
1470 None,
1471 ) {
1472 self.handle_event(&event);
1473
1474 if let Some(refreshed) = self
1475 .cache
1476 .borrow()
1477 .order(&client_order_id)
1478 .map(|o| o.clone())
1479 {
1480 order = refreshed;
1481 }
1482 }
1483 }
1484
1485 match report.order_status {
1487 OrderStatus::Canceled if !order.is_closed() => {
1488 let ts_now = self.clock.borrow().timestamp_ns();
1489 let canceled = OrderCanceled::new(
1490 order.trader_id(),
1491 order.strategy_id(),
1492 order.instrument_id(),
1493 order.client_order_id(),
1494 UUID4::new(),
1495 report.ts_last,
1496 ts_now,
1497 true,
1498 Some(report.venue_order_id),
1499 Some(report.account_id),
1500 );
1501 self.handle_event(&OrderEventAny::Canceled(canceled));
1502 }
1503 OrderStatus::Expired if !order.is_closed() => {
1504 let ts_now = self.clock.borrow().timestamp_ns();
1505 let expired = OrderExpired::new(
1506 order.trader_id(),
1507 order.strategy_id(),
1508 order.instrument_id(),
1509 order.client_order_id(),
1510 UUID4::new(),
1511 report.ts_last,
1512 ts_now,
1513 true,
1514 Some(report.venue_order_id),
1515 Some(report.account_id),
1516 );
1517 self.handle_event(&OrderEventAny::Expired(expired));
1518 }
1519 _ => {}
1520 }
1521 }
1522
1523 pub fn reconcile_position_report(&mut self, report: &PositionStatusReport) {
1528 msgbus::publish_any(
1529 MessagingSwitchboard::reconciliation_raw_position_status_report_topic(),
1530 report,
1531 );
1532
1533 let cache = self.cache.borrow();
1534
1535 let size_precision = cache
1536 .instrument(&report.instrument_id)
1537 .map(InstrumentAny::size_precision);
1538
1539 if report.venue_position_id.is_some() {
1540 self.reconcile_position_report_hedging(report, &cache);
1541 } else {
1542 self.reconcile_position_report_netting(report, &cache, size_precision);
1543 }
1544 }
1545
1546 fn reconcile_position_report_hedging(&self, report: &PositionStatusReport, cache: &Cache) {
1547 let venue_position_id = report.venue_position_id.as_ref().unwrap();
1548
1549 log::debug!(
1550 "Reconciling HEDGE position for {}, venue_position_id={}",
1551 report.instrument_id,
1552 venue_position_id
1553 );
1554
1555 let Some(position) = cache.position(venue_position_id) else {
1556 log::error!("Cannot reconcile position: {venue_position_id} not found in cache");
1557 return;
1558 };
1559
1560 let cached_signed_qty = match position.side {
1561 PositionSide::Long => position.quantity.as_decimal(),
1562 PositionSide::Short => -position.quantity.as_decimal(),
1563 _ => Decimal::ZERO,
1564 };
1565 let venue_signed_qty = report.signed_decimal_qty;
1566
1567 if cached_signed_qty != venue_signed_qty {
1568 log::error!(
1569 "Position mismatch for {} {}: cached={}, venue={}",
1570 report.instrument_id,
1571 venue_position_id,
1572 cached_signed_qty,
1573 venue_signed_qty
1574 );
1575 }
1576 }
1577
1578 fn reconcile_position_report_netting(
1579 &self,
1580 report: &PositionStatusReport,
1581 cache: &Cache,
1582 size_precision: Option<u8>,
1583 ) {
1584 log::debug!("Reconciling NET position for {}", report.instrument_id);
1585
1586 let positions_open = Self::netting_positions_open_for_report(cache, report);
1587
1588 let position_refs = positions_open
1589 .iter()
1590 .map(|position| &**position)
1591 .collect::<Vec<_>>();
1592
1593 if let Some(message) =
1594 Self::netting_split_position_ownership_message(report, &position_refs)
1595 {
1596 log::warn!("{message}");
1597 }
1598
1599 let cached_signed_qty: Decimal = positions_open
1601 .iter()
1602 .map(|position| Self::position_signed_decimal_qty(position))
1603 .sum();
1604
1605 log::debug!(
1606 "Position report: venue_signed_qty={}, cached_signed_qty={}",
1607 report.signed_decimal_qty,
1608 cached_signed_qty
1609 );
1610
1611 let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
1612 }
1613
1614 fn netting_positions_open_for_report<'a>(
1615 cache: &'a Cache,
1616 report: &PositionStatusReport,
1617 ) -> Vec<PositionRef<'a>> {
1618 cache.positions_open(
1619 None,
1620 Some(&report.instrument_id),
1621 None,
1622 Some(&report.account_id),
1623 None,
1624 )
1625 }
1626
1627 fn netting_split_position_ownership_message(
1628 report: &PositionStatusReport,
1629 positions_open: &[&Position],
1630 ) -> Option<String> {
1631 let mut strategy_ids = positions_open
1632 .iter()
1633 .map(|position| position.strategy_id.to_string())
1634 .collect::<Vec<_>>();
1635 strategy_ids.sort();
1636 strategy_ids.dedup();
1637
1638 if strategy_ids.len() <= 1 {
1639 return None;
1640 }
1641
1642 let position_details = Self::position_details(positions_open.iter().copied());
1643
1644 Some(format!(
1645 "NETTING reconciliation found split ownership for account_id={}, instrument_id={}: \
1646 strategies=[{}], positions=[{}]",
1647 report.account_id,
1648 report.instrument_id,
1649 strategy_ids.join(", "),
1650 position_details
1651 ))
1652 }
1653
1654 pub fn reconcile_execution_mass_status(&mut self, mass_status: &ExecutionMassStatus) {
1660 self.report_count += 1;
1661
1662 log::info!(
1663 "Reconciling mass status for client={}, account={}, venue={}",
1664 mass_status.client_id,
1665 mass_status.account_id,
1666 mass_status.venue
1667 );
1668
1669 let mut external_venue_ids = AHashSet::new();
1670 let mut filtered_venue_ids = AHashSet::new();
1671
1672 for order_report in mass_status.order_reports().values() {
1673 let existed = {
1674 let cache = self.cache.borrow();
1675 order_report
1676 .client_order_id
1677 .and_then(|id| cache.order(&id).map(|o| o.clone()))
1678 .or_else(|| {
1679 cache
1680 .client_order_id(&order_report.venue_order_id)
1681 .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1682 })
1683 .is_some()
1684 };
1685 let filtered_count = self.filtered_unclaimed_external_order_count;
1686
1687 self.reconcile_order_status_report(order_report);
1688
1689 if !existed {
1690 if self.filtered_unclaimed_external_order_count > filtered_count {
1691 filtered_venue_ids.insert(order_report.venue_order_id);
1692 } else {
1693 let exists_after = {
1694 let cache = self.cache.borrow();
1695 order_report
1696 .client_order_id
1697 .and_then(|id| cache.order(&id).map(|o| o.clone()))
1698 .or_else(|| {
1699 cache
1700 .client_order_id(&order_report.venue_order_id)
1701 .and_then(|cid| cache.order(cid).map(|o| o.clone()))
1702 })
1703 .is_some()
1704 };
1705
1706 if exists_after {
1707 external_venue_ids.insert(order_report.venue_order_id);
1708 }
1709 }
1710 }
1711 }
1712
1713 let raw_fill_topic = MessagingSwitchboard::reconciliation_raw_fill_report_topic();
1714
1715 for fill_reports in mass_status.fill_reports().values() {
1716 for fill_report in fill_reports {
1717 if external_venue_ids.contains(&fill_report.venue_order_id) {
1718 msgbus::publish_any(raw_fill_topic, fill_report);
1722
1723 log::debug!(
1724 "Skipping fill report for external order {}: covered by inferred fill",
1725 fill_report.venue_order_id
1726 );
1727 continue;
1728 }
1729
1730 if filtered_venue_ids.contains(&fill_report.venue_order_id) {
1731 msgbus::publish_any(raw_fill_topic, fill_report);
1732
1733 log::debug!(
1734 "Skipping fill report for filtered unclaimed external order {}",
1735 fill_report.venue_order_id
1736 );
1737 continue;
1738 }
1739
1740 self.reconcile_fill_report(fill_report);
1741 }
1742 }
1743
1744 for position_reports in mass_status.position_reports().values() {
1745 for position_report in position_reports {
1746 self.reconcile_position_report(position_report);
1747 }
1748 }
1749
1750 log::info!(
1751 "Mass status reconciliation complete: {} orders, {} fills, {} positions",
1752 mass_status.order_reports().len(),
1753 mass_status
1754 .fill_reports()
1755 .values()
1756 .map(Vec::len)
1757 .sum::<usize>(),
1758 mass_status
1759 .position_reports()
1760 .values()
1761 .map(Vec::len)
1762 .sum::<usize>()
1763 );
1764 }
1765
1766 pub fn execute(&self, command: TradingCommand) {
1768 self.execute_command(command);
1769 }
1770
1771 pub fn process(&mut self, event: &OrderEventAny) {
1773 self.handle_event(event);
1774 }
1775
1776 pub fn start(&mut self) {
1778 for client in self.get_clients_mut() {
1779 if let Err(e) = client.start() {
1780 log::error!("{e}");
1781 }
1782 }
1783
1784 self.start_snapshot_timer();
1785 self.start_purge_timers();
1786
1787 log::info!("Started");
1788 }
1789
1790 pub fn stop(&mut self) {
1796 for client in self.get_clients_mut() {
1797 if let Err(e) = client.stop() {
1798 log::error!("{e}");
1799 }
1800 }
1801
1802 self.stop_snapshot_timer();
1803 self.stop_purge_timers();
1804
1805 log::info!("Stopped");
1806 }
1807
1808 pub fn stop_clients(&mut self) {
1810 for client in self.get_clients_mut() {
1811 if let Err(e) = client.stop() {
1812 log::error!("{e}");
1813 }
1814 }
1815 }
1816
1817 pub fn reset(&mut self) {
1822 for client in self.get_clients_mut() {
1823 if let Err(e) = client.reset() {
1824 log::error!("{e}");
1825 }
1826 }
1827
1828 self.cache.borrow_mut().reset();
1829 self.pos_id_generator.reset();
1830
1831 self.stop_snapshot_timer();
1832 self.stop_purge_timers();
1833
1834 self.command_count.set(0);
1835 self.event_count = 0;
1836 self.report_count = 0;
1837 self.filtered_unclaimed_external_order_count = 0;
1838
1839 log::info!("Reset");
1840 }
1841
1842 pub fn dispose(&mut self) {
1847 for client in self.get_clients_mut() {
1848 if let Err(e) = client.dispose() {
1849 log::error!("{e}");
1850 }
1851 }
1852
1853 self.stop_snapshot_timer();
1854 self.stop_purge_timers();
1855
1856 log::info!("Disposed");
1857 }
1858
1859 fn execute_command(&self, command: TradingCommand) {
1860 self.command_count.set(self.command_count.get() + 1);
1861
1862 if self.config.debug {
1863 log::debug!("{RECV}{CMD} {command:?}");
1864 }
1865
1866 if let Some(cid) = command.client_id()
1867 && self.external_clients.contains(&cid)
1868 {
1869 let topic = format!("commands.trading.{cid}");
1870 msgbus::publish_any(topic.into(), &command);
1871
1872 if self.config.debug {
1873 log::debug!("Skipping execution command for external client {cid}: {command:?}");
1874 }
1875 return;
1876 }
1877
1878 let client = if let Some(adapter) = self.find_client_for_command(&command) {
1879 adapter.client.as_ref()
1880 } else {
1881 let routing_context = Self::routing_context_for_command(&command);
1882
1883 log::error!(
1884 "No execution client found for command: client_id={:?}, {routing_context}, command={command:?}",
1885 command.client_id(),
1886 );
1887
1888 let reason = format!(
1889 "No execution client found for client_id={:?}, {routing_context}",
1890 command.client_id(),
1891 );
1892
1893 match command {
1894 TradingCommand::SubmitOrder(cmd) => {
1895 let order = self
1896 .cache
1897 .borrow()
1898 .order(&cmd.client_order_id)
1899 .map(|o| o.clone());
1900 if let Some(order) = order {
1901 self.deny_order(&order, &reason);
1902 }
1903 }
1904 TradingCommand::SubmitOrderList(cmd) => {
1905 let orders: Vec<OrderAny> = self
1906 .cache
1907 .borrow()
1908 .orders_for_ids(&cmd.order_list.client_order_ids, &cmd);
1909
1910 for order in &orders {
1911 self.deny_order(order, &reason);
1912 }
1913 }
1914 _ => {}
1915 }
1916
1917 return;
1918 };
1919
1920 match command {
1921 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
1922 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
1923 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
1924 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
1925 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
1926 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
1927 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
1928 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
1929 }
1930 }
1931
1932 fn routing_context_for_command(command: &TradingCommand) -> String {
1933 match command {
1934 TradingCommand::SubmitOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1935 TradingCommand::SubmitOrderList(cmd) => format!("venue={}", cmd.instrument_id.venue),
1936 TradingCommand::ModifyOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1937 TradingCommand::CancelOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1938 TradingCommand::CancelAllOrders(cmd) => format!("venue={}", cmd.instrument_id.venue),
1939 TradingCommand::BatchCancelOrders(cmd) => format!("venue={}", cmd.instrument_id.venue),
1940 TradingCommand::QueryOrder(cmd) => format!("venue={}", cmd.instrument_id.venue),
1941 TradingCommand::QueryAccount(cmd) => {
1942 let issuer = cmd.account_id.get_issuer();
1943 format!("account_id={}, issuer={issuer}", cmd.account_id)
1944 }
1945 }
1946 }
1947
1948 fn find_client_for_command(&self, command: &TradingCommand) -> Option<&ExecutionClientAdapter> {
1949 if let Some(client_id) = command.client_id()
1950 && let Some(adapter) = self.clients.get(&client_id)
1951 {
1952 return Some(adapter);
1953 }
1954
1955 if let Some(account_id) = self.account_id_for_command(command) {
1956 let issuer = account_id.get_issuer();
1957 let issuer_client_id = ClientId::from(issuer.as_str());
1958
1959 if let Some(adapter) = self.clients.get(&issuer_client_id) {
1960 return Some(adapter);
1961 }
1962
1963 if let Some(client_id) = self.routing_map.get(&issuer)
1964 && let Some(adapter) = self.clients.get(client_id)
1965 {
1966 return Some(adapter);
1967 }
1968 }
1969
1970 if let Some(instrument_id) = Self::instrument_id_for_command(command)
1971 && let Some(client_id) = self.routing_map.get(&instrument_id.venue)
1972 && let Some(adapter) = self.clients.get(client_id)
1973 {
1974 return Some(adapter);
1975 }
1976
1977 self.default_client.as_ref()
1978 }
1979
1980 fn account_id_for_command(&self, command: &TradingCommand) -> Option<AccountId> {
1981 match command {
1982 TradingCommand::QueryAccount(cmd) => Some(cmd.account_id),
1983 TradingCommand::SubmitOrder(cmd) => self
1984 .cache
1985 .borrow()
1986 .order(&cmd.client_order_id)
1987 .and_then(|order| order.account_id()),
1988 TradingCommand::ModifyOrder(cmd) => self
1989 .cache
1990 .borrow()
1991 .order(&cmd.client_order_id)
1992 .and_then(|order| order.account_id()),
1993 TradingCommand::CancelOrder(cmd) => self
1994 .cache
1995 .borrow()
1996 .order(&cmd.client_order_id)
1997 .and_then(|order| order.account_id()),
1998 TradingCommand::SubmitOrderList(_)
1999 | TradingCommand::CancelAllOrders(_)
2000 | TradingCommand::BatchCancelOrders(_)
2001 | TradingCommand::QueryOrder(_) => None,
2002 }
2003 }
2004
2005 const fn instrument_id_for_command(command: &TradingCommand) -> Option<InstrumentId> {
2006 match command {
2007 TradingCommand::SubmitOrder(cmd) => Some(cmd.instrument_id),
2008 TradingCommand::SubmitOrderList(cmd) => Some(cmd.instrument_id),
2009 TradingCommand::ModifyOrder(cmd) => Some(cmd.instrument_id),
2010 TradingCommand::CancelOrder(cmd) => Some(cmd.instrument_id),
2011 TradingCommand::CancelAllOrders(cmd) => Some(cmd.instrument_id),
2012 TradingCommand::BatchCancelOrders(cmd) => Some(cmd.instrument_id),
2013 TradingCommand::QueryOrder(cmd) => Some(cmd.instrument_id),
2014 TradingCommand::QueryAccount(_) => None,
2015 }
2016 }
2017
2018 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: SubmitOrder) {
2019 let client_order_id = cmd.client_order_id;
2020 let cached_order = { self.cache.borrow().order_owned(&client_order_id) };
2021
2022 let (order, added_to_cache) = match cached_order {
2023 Some(order) => (order, false),
2024 None => {
2025 let Some(order) =
2026 self.add_order_from_init(&cmd.order_init, cmd.position_id, cmd.client_id, &cmd)
2027 else {
2028 return;
2029 };
2030
2031 (order, true)
2032 }
2033 };
2034
2035 if added_to_cache && self.config.snapshot_orders {
2036 self.create_order_state_snapshot(&order);
2037 }
2038
2039 let order_venue = order.instrument_id().venue;
2040 let client_venue = client.venue();
2041 if !client.handles_order_venue(order_venue) {
2042 let client_id = client.client_id();
2043 self.deny_order(
2044 &order,
2045 &format!(
2046 "Client {client_id} does not handle order venue {order_venue} (client venue {client_venue})"
2047 ),
2048 );
2049 return;
2050 }
2051
2052 if let Some(reason) = self.check_position_id_against_oms(
2053 cmd.instrument_id,
2054 cmd.strategy_id,
2055 cmd.position_id,
2056 client,
2057 ) {
2058 self.deny_order(&order, &reason);
2059 return;
2060 }
2061
2062 let instrument_id = order.instrument_id();
2063
2064 if !added_to_cache && self.config.snapshot_orders {
2065 self.create_order_state_snapshot(&order);
2066 }
2067
2068 {
2069 let cache = self.cache.borrow();
2070 if cache.instrument(&instrument_id).is_none() {
2071 log::error!(
2072 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
2073 );
2074 return;
2075 }
2076 }
2077
2078 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
2079 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
2080 own_book.add(order.to_own_book_order());
2081 }
2082
2083 if let Err(e) = client.submit_order(cmd) {
2084 self.deny_order(&order, &format!("failed-to-submit-order-to-client: {e}"));
2085 }
2086 }
2087
2088 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: SubmitOrderList) {
2089 let mut orders = Vec::with_capacity(cmd.order_list.client_order_ids.len());
2090 let mut added_client_order_ids = AHashSet::new();
2091
2092 for client_order_id in &cmd.order_list.client_order_ids {
2093 let cached_order = { self.cache.borrow().order_owned(client_order_id) };
2094
2095 if let Some(order) = cached_order {
2096 orders.push(order);
2097 continue;
2098 }
2099
2100 let Some(order_init) = cmd
2101 .order_inits
2102 .iter()
2103 .find(|init| init.client_order_id == *client_order_id)
2104 else {
2105 log::error!(
2106 "Cannot handle submit order list: order not found in cache and no initialization event for {client_order_id}, {cmd}"
2107 );
2108 continue;
2109 };
2110
2111 let Some(order) =
2112 self.add_order_from_init(order_init, cmd.position_id, cmd.client_id, &cmd)
2113 else {
2114 continue;
2115 };
2116
2117 added_client_order_ids.insert(order.client_order_id());
2118 orders.push(order);
2119 }
2120
2121 if self.config.snapshot_orders {
2122 for order in &orders {
2123 if added_client_order_ids.contains(&order.client_order_id()) {
2124 self.create_order_state_snapshot(order);
2125 }
2126 }
2127 }
2128
2129 if orders.len() != cmd.order_list.client_order_ids.len() {
2130 for order in &orders {
2131 self.deny_order(
2132 order,
2133 &format!("Incomplete order list: missing orders in cache for {cmd}"),
2134 );
2135 }
2136 return;
2137 }
2138
2139 let order_list_venue = cmd.instrument_id.venue;
2140 let client_venue = client.venue();
2141 if !client.handles_order_venue(order_list_venue) {
2142 let client_id = client.client_id();
2143
2144 for order in &orders {
2145 self.deny_order(
2146 order,
2147 &format!(
2148 "Client {client_id} does not handle order list venue {order_list_venue} (client venue {client_venue})"
2149 ),
2150 );
2151 }
2152 return;
2153 }
2154
2155 let is_uniform_instrument = orders
2156 .iter()
2157 .all(|o| o.instrument_id() == cmd.instrument_id);
2158
2159 if let Some(position_id) = cmd.position_id
2160 && !is_uniform_instrument
2161 {
2162 let reason = format!(
2163 "`position_id` {position_id} is not valid for a mixed-instrument order list; \
2164 a position belongs to a single instrument",
2165 );
2166
2167 for order in &orders {
2168 self.deny_order(order, &reason);
2169 }
2170 return;
2171 }
2172
2173 if let Some(reason) = self.check_position_id_against_oms(
2174 cmd.instrument_id,
2175 cmd.strategy_id,
2176 cmd.position_id,
2177 client,
2178 ) {
2179 for order in &orders {
2180 self.deny_order(order, &reason);
2181 }
2182 return;
2183 }
2184
2185 if self.config.snapshot_orders {
2186 for order in &orders {
2187 if !added_client_order_ids.contains(&order.client_order_id()) {
2188 self.create_order_state_snapshot(order);
2189 }
2190 }
2191 }
2192
2193 {
2194 let cache = self.cache.borrow();
2195 if cache.instrument(&cmd.instrument_id).is_none() {
2196 log::error!(
2197 "Cannot handle submit order list: no instrument found for {}, {cmd}",
2198 cmd.instrument_id,
2199 );
2200 return;
2201 }
2202 }
2203
2204 if self.config.manage_own_order_books {
2205 for order in &orders {
2206 if should_handle_own_book_order(order) {
2207 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
2208 own_book.add(order.to_own_book_order());
2209 }
2210 }
2211 }
2212
2213 if let Err(e) = client.submit_order_list(cmd) {
2214 log::error!("Error submitting order list to client: {e}");
2215 for order in &orders {
2216 self.deny_order(
2217 order,
2218 &format!("failed-to-submit-order-list-to-client: {e}"),
2219 );
2220 }
2221 }
2222 }
2223
2224 fn add_order_from_init(
2225 &self,
2226 order_init: &OrderInitialized,
2227 position_id: Option<PositionId>,
2228 client_id: Option<ClientId>,
2229 context: &dyn Display,
2230 ) -> Option<OrderAny> {
2231 let client_order_id = order_init.client_order_id;
2232 let order = match OrderAny::from_events(vec![OrderEventAny::Initialized(
2233 order_init.clone(),
2234 )]) {
2235 Ok(order) => order,
2236 Err(e) => {
2237 log::error!(
2238 "Cannot reconstruct order from initialization event for {client_order_id}: {e}, {context}"
2239 );
2240 return None;
2241 }
2242 };
2243
2244 if let Err(e) =
2245 self.cache
2246 .borrow_mut()
2247 .add_order(order.clone(), position_id, client_id, true)
2248 {
2249 log::error!(
2250 "Cannot add reconstructed order to cache for {client_order_id}: {e}, {context}"
2251 );
2252 return None;
2253 }
2254
2255 Some(order)
2256 }
2257
2258 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: ModifyOrder) {
2259 if let Err(e) = client.modify_order(cmd) {
2260 log::error!("Error modifying order: {e}");
2261 }
2262 }
2263
2264 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: CancelOrder) {
2265 if let Err(e) = client.cancel_order(cmd) {
2266 log::error!("Error canceling order: {e}");
2267 }
2268 }
2269
2270 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: CancelAllOrders) {
2271 if let Err(e) = client.cancel_all_orders(cmd) {
2272 log::error!("Error canceling all orders: {e}");
2273 }
2274 }
2275
2276 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: BatchCancelOrders) {
2277 if let Err(e) = client.batch_cancel_orders(cmd) {
2278 log::error!("Error batch canceling orders: {e}");
2279 }
2280 }
2281
2282 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: QueryAccount) {
2283 if let Err(e) = client.query_account(cmd) {
2284 log::error!("Error querying account: {e}");
2285 }
2286 }
2287
2288 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: QueryOrder) {
2289 if let Err(e) = client.query_order(cmd) {
2290 log::error!("Error querying order: {e}");
2291 }
2292 }
2293
2294 fn create_order_state_snapshot(&self, order: &OrderAny) {
2295 if self.config.debug {
2296 log::debug!("Creating order state snapshot for {order}");
2297 }
2298
2299 if self.cache.borrow().has_backing()
2300 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
2301 {
2302 log::error!("Failed to snapshot order state: {e}");
2303 }
2304 }
2305
2306 fn create_position_state_snapshot(&self, position: &Position) {
2307 if self.config.debug {
2308 log::debug!("Creating position state snapshot for {position}");
2309 }
2310
2311 }
2316
2317 fn handle_event(&mut self, event: &OrderEventAny) {
2318 self.event_count += 1;
2319
2320 if self.config.debug {
2321 log::debug!("{RECV}{EVT} {event:?}");
2322 }
2323
2324 let event_client_order_id = event.client_order_id();
2325 let cache = self.cache.borrow();
2326 let client_order_id = if cache.order_exists(&event_client_order_id) {
2327 event_client_order_id
2328 } else {
2329 let is_leg_fill =
2330 matches!(event, OrderEventAny::Filled(fill) if self.is_leg_fill(fill));
2331 if !is_leg_fill {
2332 log::warn!(
2333 "Order with {} not found in the cache to apply {}",
2334 event.client_order_id(),
2335 event
2336 );
2337 }
2338
2339 let venue_order_id = if let Some(id) = event.venue_order_id() {
2341 id
2342 } else {
2343 log::error!(
2344 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
2345 event.client_order_id()
2346 );
2347 return;
2348 };
2349
2350 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
2352 *id
2353 } else {
2354 if let OrderEventAny::Filled(fill) = event
2355 && is_leg_fill
2356 {
2357 log::info!(
2358 "Processing leg fill without corresponding order: {} for instrument {}",
2359 fill.client_order_id,
2360 fill.instrument_id
2361 );
2362 drop(cache);
2363 self.handle_leg_fill_without_order(*fill);
2364 return;
2365 }
2366
2367 log::error!(
2368 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
2369 event.client_order_id(),
2370 );
2371 return;
2372 };
2373
2374 if cache.order_exists(&client_order_id) {
2376 log::info!("Order with {client_order_id} was found in the cache");
2377 client_order_id
2378 } else {
2379 if let OrderEventAny::Filled(fill) = event
2380 && is_leg_fill
2381 {
2382 log::info!(
2383 "Processing leg fill without corresponding order: {} for instrument {}",
2384 fill.client_order_id,
2385 fill.instrument_id
2386 );
2387 drop(cache);
2388 self.handle_leg_fill_without_order(*fill);
2389 return;
2390 }
2391
2392 log::error!(
2393 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
2394 );
2395 return;
2396 }
2397 };
2398 let order_before_fill = if matches!(event, OrderEventAny::Filled(_)) {
2399 cache.order(&client_order_id).map(|o| o.clone())
2400 } else {
2401 None
2402 };
2403
2404 drop(cache);
2405
2406 let event = if event_client_order_id == client_order_id {
2407 event.clone()
2408 } else {
2409 event.clone().with_client_order_id(client_order_id)
2410 };
2411
2412 match &event {
2413 OrderEventAny::Filled(fill) => {
2414 let Some(order_before_fill) = order_before_fill else {
2415 log::error!(
2416 "Cannot apply fill: order {} not found in the cache",
2417 fill.client_order_id()
2418 );
2419 return;
2420 };
2421 let oms_type = self.determine_oms_type(fill);
2422 let position_id =
2423 self.determine_position_id(*fill, oms_type, Some(&order_before_fill));
2424
2425 let mut fill = *fill;
2426 fill.position_id = Some(position_id);
2427
2428 if self
2429 .validate_fill_for_order(&order_before_fill, &fill)
2430 .is_ok()
2431 {
2432 let event = OrderEventAny::Filled(fill);
2433 let Some(order) = self.update_cached_order(client_order_id, &event) else {
2434 return;
2435 };
2436
2437 let position_events = self.handle_order_fill(&order, fill, oms_type);
2438 self.publish_order_event(&event);
2439 self.publish_position_events(position_events);
2440 }
2441 }
2442 _ => {
2443 if self.update_cached_order(client_order_id, &event).is_some() {
2444 self.publish_order_event(&event);
2445 }
2446 }
2447 }
2448 }
2449
2450 fn handle_leg_fill_without_order(&mut self, mut fill: OrderFilled) {
2451 let instrument =
2452 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2453 instrument.clone()
2454 } else {
2455 log::error!(
2456 "Cannot handle leg fill: no instrument found for {}, {fill}",
2457 fill.instrument_id,
2458 );
2459 return;
2460 };
2461
2462 if self.cache.borrow().account(&fill.account_id).is_none() {
2463 log::error!(
2464 "Cannot handle leg fill: no account found for {}, {fill}",
2465 fill.instrument_id.venue,
2466 );
2467 return;
2468 }
2469
2470 let oms_type = self.determine_oms_type(&fill);
2471 let position_id = self.determine_leg_fill_position_id(fill, oms_type);
2472 fill.position_id = Some(position_id);
2473 let duplicate_position_fill = self.position_contains_trade_id(position_id, fill.trade_id);
2474
2475 let event = OrderEventAny::Filled(fill);
2476 let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2477 msgbus::send_order_event(portfolio_endpoint, event.clone());
2478
2479 let position_events = if duplicate_position_fill {
2480 log::warn!(
2481 "Duplicate leg fill: {} trade_id={} already applied to position {}, skipping position update",
2482 fill.client_order_id,
2483 fill.trade_id,
2484 position_id
2485 );
2486 Vec::new()
2487 } else {
2488 self.handle_position_update(&instrument, fill, oms_type)
2489 };
2490 self.publish_order_event(&event);
2491 self.publish_position_events(position_events);
2492 }
2493
2494 fn determine_leg_fill_position_id(
2495 &mut self,
2496 fill: OrderFilled,
2497 oms_type: OmsType,
2498 ) -> PositionId {
2499 let cache = self.cache.borrow();
2500 let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
2501 drop(cache);
2502
2503 if let Some(position_id) = cached_position_id {
2504 if let Some(fill_position_id) = fill.position_id
2505 && fill_position_id != position_id
2506 {
2507 log::warn!(
2508 "Incorrect position ID assigned to leg fill: \
2509 cached={position_id}, assigned={fill_position_id}; \
2510 re-assigning from cache",
2511 );
2512 }
2513
2514 return position_id;
2515 }
2516
2517 match oms_type {
2518 OmsType::Hedging => fill
2519 .position_id
2520 .unwrap_or_else(|| self.pos_id_generator.generate(fill.strategy_id, false)),
2521 OmsType::Netting => self.determine_netting_position_id(fill),
2522 _ => self.determine_netting_position_id(fill),
2523 }
2524 }
2525
2526 fn is_leg_fill(&self, fill: &OrderFilled) -> bool {
2527 if !fill.client_order_id.as_str().contains("-LEG-")
2528 && !fill.venue_order_id.as_str().contains("-LEG-")
2529 {
2530 return false;
2531 }
2532
2533 self.cache
2534 .borrow()
2535 .instrument(&fill.instrument_id)
2536 .is_some_and(|instrument| !instrument.is_spread())
2537 }
2538
2539 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
2540 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id)
2541 && *oms_type != OmsType::Unspecified
2542 {
2543 return *oms_type;
2544 }
2545
2546 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
2547 && let Some(client) = self.clients.get(client_id)
2548 {
2549 return client.oms_type;
2550 }
2551
2552 if let Some(client) = &self.default_client {
2553 return client.oms_type;
2554 }
2555
2556 OmsType::Netting }
2558
2559 fn resolve_oms_type_for_client(
2560 &self,
2561 strategy_id: StrategyId,
2562 client: &dyn ExecutionClient,
2563 ) -> OmsType {
2564 if let Some(oms_type) = self.oms_overrides.get(&strategy_id)
2565 && *oms_type != OmsType::Unspecified
2566 {
2567 return *oms_type;
2568 }
2569
2570 client.oms_type()
2571 }
2572
2573 fn check_position_id_against_oms(
2574 &self,
2575 instrument_id: InstrumentId,
2576 strategy_id: StrategyId,
2577 position_id: Option<PositionId>,
2578 client: &dyn ExecutionClient,
2579 ) -> Option<String> {
2580 let position_id = position_id?;
2581
2582 if self.resolve_oms_type_for_client(strategy_id, client) != OmsType::Netting {
2583 return None;
2584 }
2585
2586 let expected = format!("{instrument_id}-{strategy_id}");
2587 if position_id.as_str() == expected {
2588 return None;
2589 }
2590
2591 Some(format!(
2592 "`position_id` {position_id} is not valid for NETTING OMS; \
2593 expected '{expected}' (use HEDGING for custom position IDs)"
2594 ))
2595 }
2596
2597 fn determine_position_id(
2598 &mut self,
2599 fill: OrderFilled,
2600 oms_type: OmsType,
2601 order: Option<&OrderAny>,
2602 ) -> PositionId {
2603 let cache = self.cache.borrow();
2604 let cached_position_id = cache.position_id(&fill.client_order_id()).copied();
2605 drop(cache);
2606
2607 if self.config.debug {
2608 log::debug!(
2609 "Determining position ID for {}, position_id={:?}",
2610 fill.client_order_id(),
2611 cached_position_id,
2612 );
2613 }
2614
2615 if let Some(position_id) = cached_position_id {
2616 if let Some(fill_position_id) = fill.position_id
2617 && fill_position_id != position_id
2618 {
2619 log::warn!(
2620 "Incorrect position ID assigned to fill: \
2621 cached={position_id}, assigned={fill_position_id}; \
2622 re-assigning from cache",
2623 );
2624 }
2625
2626 if self.config.debug {
2627 log::debug!("Assigned {position_id} to {}", fill.client_order_id());
2628 }
2629
2630 return position_id;
2631 }
2632
2633 let position_id = match oms_type {
2634 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
2635 OmsType::Netting => self.determine_netting_position_id(fill),
2636 _ => self.determine_netting_position_id(fill),
2637 };
2638
2639 let order = if let Some(o) = order {
2640 o.clone()
2641 } else {
2642 let cache = self.cache.borrow();
2643 cache.order(&fill.client_order_id()).map_or_else(
2644 || {
2645 panic!(
2646 "Order for {} not found to determine position ID",
2647 fill.client_order_id()
2648 )
2649 },
2650 |o| o.clone(),
2651 )
2652 };
2653
2654 if order.exec_algorithm_id().is_some()
2655 && let Some(exec_spawn_id) = order.exec_spawn_id()
2656 {
2657 let cache = self.cache.borrow();
2658 let primary = if let Some(p) = cache.order(&exec_spawn_id) {
2659 p.clone()
2660 } else {
2661 log::warn!(
2662 "Primary exec spawn order {exec_spawn_id} not found, \
2663 skipping position ID propagation"
2664 );
2665 return position_id;
2666 };
2667 let primary_already_indexed = cache.position_id(&primary.client_order_id()).is_some();
2668 drop(cache);
2669
2670 if primary.position_id().is_none() && !primary_already_indexed {
2671 if let Some(mut primary_mut) = self.cache.borrow_mut().order_mut(&exec_spawn_id) {
2672 primary_mut.set_position_id(Some(position_id));
2673 }
2674 let _ = self.cache.borrow_mut().add_position_id(
2675 &position_id,
2676 &primary.instrument_id().venue,
2677 &primary.client_order_id(),
2678 &primary.strategy_id(),
2679 );
2680 log::debug!("Assigned primary order {position_id}");
2681 }
2682 }
2683
2684 position_id
2685 }
2686
2687 fn determine_hedging_position_id(
2688 &mut self,
2689 fill: OrderFilled,
2690 order: Option<&OrderAny>,
2691 ) -> PositionId {
2692 if let Some(position_id) = fill.position_id {
2694 if self.config.debug {
2695 log::debug!("Already had a position ID of: {position_id}");
2696 }
2697 return position_id;
2698 }
2699
2700 let cache = self.cache.borrow();
2701
2702 let exec_spawn_id = if let Some(o) = order {
2703 o.exec_spawn_id()
2704 } else {
2705 match cache.order(&fill.client_order_id()) {
2706 Some(o) => o.exec_spawn_id(),
2707 None => {
2708 panic!(
2709 "Order for {} not found to determine position ID",
2710 fill.client_order_id()
2711 );
2712 }
2713 }
2714 };
2715
2716 if let Some(spawn_id) = exec_spawn_id {
2718 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
2719 for spawned_order in spawn_orders {
2720 if let Some(pos_id) = spawned_order.position_id() {
2721 if self.config.debug {
2722 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
2723 }
2724 return pos_id;
2725 }
2726 }
2727 }
2728
2729 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
2731
2732 if self.config.debug {
2733 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
2734 }
2735 position_id
2736 }
2737
2738 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
2739 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
2740 }
2741
2742 fn validate_fill_for_order(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2743 if order.is_duplicate_fill(fill) {
2744 log::warn!(
2745 "Duplicate fill: {} trade_id={} already applied, skipping",
2746 order.client_order_id(),
2747 fill.trade_id
2748 );
2749 anyhow::bail!("Duplicate fill");
2750 }
2751
2752 if let Some(position_id) = fill.position_id
2753 && self.position_contains_trade_id(position_id, fill.trade_id)
2754 {
2755 log::warn!(
2756 "Duplicate fill: {} trade_id={} already applied to position {}, skipping",
2757 order.client_order_id(),
2758 fill.trade_id,
2759 position_id
2760 );
2761 anyhow::bail!("Duplicate position fill");
2762 }
2763
2764 self.check_overfill(order, fill)
2765 }
2766
2767 fn position_contains_trade_id(&self, position_id: PositionId, trade_id: TradeId) -> bool {
2768 self.cache
2769 .borrow()
2770 .position(&position_id)
2771 .is_some_and(|position| position.trade_ids.contains(&trade_id))
2772 }
2773
2774 fn update_cached_order(
2775 &self,
2776 client_order_id: ClientOrderId,
2777 event: &OrderEventAny,
2778 ) -> Option<OrderAny> {
2779 let result = { self.cache.borrow_mut().update_order(event) };
2780
2781 let order = match result {
2782 Ok(order) => order,
2783 Err(e) => {
2784 if matches!(
2785 e.downcast_ref::<OrderError>(),
2786 Some(OrderError::InvalidStateTransition)
2787 ) {
2788 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
2789 return None;
2790 }
2791
2792 if let Some(OrderError::DuplicateFill(trade_id)) = e.downcast_ref::<OrderError>() {
2793 log::warn!(
2794 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
2795 );
2796 return None;
2797 }
2798
2799 log::error!("Error applying event: {e}, did not apply {event}");
2800
2801 if matches!(
2802 event,
2803 OrderEventAny::Denied(_)
2804 | OrderEventAny::Rejected(_)
2805 | OrderEventAny::Canceled(_)
2806 | OrderEventAny::Expired(_)
2807 ) {
2808 log::warn!(
2809 "Terminal event {event} failed to apply to {client_order_id}, forcing cleanup from own book"
2810 );
2811 self.cache
2812 .borrow_mut()
2813 .force_remove_from_own_order_book(&client_order_id);
2814 } else {
2815 let order = self
2816 .cache
2817 .borrow()
2818 .order(&client_order_id)
2819 .map(|o| o.clone());
2820 if let Some(order) = order {
2821 let should_update_own_book = {
2822 let cache = self.cache.borrow();
2823 let own_book = cache.own_order_book(&order.instrument_id());
2824 (own_book.is_some() && order.is_closed())
2825 || should_handle_own_book_order(&order)
2826 };
2827
2828 if should_update_own_book {
2829 self.cache.borrow_mut().update_own_order_book(&order);
2830 }
2831 }
2832 }
2833 return None;
2834 }
2835 };
2836
2837 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
2838 let needs_own_book = {
2839 self.cache
2840 .borrow()
2841 .own_order_book(&order.instrument_id())
2842 .is_none()
2843 };
2844
2845 if needs_own_book {
2846 self.cache.borrow_mut().update_own_order_book(&order);
2847 }
2848 }
2849
2850 if self.config.debug {
2851 log::debug!("{SEND}{EVT} {event}");
2852 }
2853
2854 if self.config.snapshot_orders {
2855 self.create_order_state_snapshot(&order);
2856 }
2857
2858 self.send_order_update_to_portfolio(event);
2859
2860 Some(order)
2861 }
2862
2863 fn send_order_update_to_portfolio(&self, event: &OrderEventAny) {
2864 let send_to_portfolio = match event {
2865 OrderEventAny::Filled(fill) => self
2866 .cache
2867 .borrow()
2868 .account(&fill.account_id)
2869 .is_none_or(|account| !account.is_margin_account()),
2870 OrderEventAny::Accepted(_)
2871 | OrderEventAny::Canceled(_)
2872 | OrderEventAny::Expired(_)
2873 | OrderEventAny::Rejected(_)
2874 | OrderEventAny::Updated(_) => true,
2875 _ => false,
2876 };
2877
2878 if send_to_portfolio {
2879 let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2880 msgbus::send_order_event(portfolio_endpoint, event.clone());
2881 }
2882 }
2883
2884 fn publish_order_event(&self, event: &OrderEventAny) {
2885 let topic = switchboard::get_event_orders_topic(event.strategy_id());
2886 msgbus::publish_order_event(topic, event);
2887
2888 if let OrderEventAny::Canceled(_) = event {
2889 let cancels_topic = switchboard::get_order_cancels_topic(event.instrument_id());
2890 msgbus::publish_order_event(cancels_topic, event);
2891 }
2892 }
2893
2894 fn publish_position_events(&self, events: Vec<PositionEvent>) {
2895 for event in events {
2896 let strategy_id = match &event {
2897 PositionEvent::PositionOpened(event) => event.strategy_id,
2898 PositionEvent::PositionChanged(event) => event.strategy_id,
2899 PositionEvent::PositionClosed(event) => event.strategy_id,
2900 PositionEvent::PositionAdjusted(event) => event.strategy_id,
2901 };
2902 let topic = switchboard::get_event_positions_topic(strategy_id);
2903 msgbus::publish_position_event(topic, &event);
2904 }
2905 }
2906
2907 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
2908 let potential_overfill = order.calculate_overfill(fill.last_qty);
2909
2910 if potential_overfill.is_positive() {
2911 if self.config.allow_overfills {
2912 log::warn!(
2913 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
2914 order.client_order_id(),
2915 potential_overfill,
2916 order.filled_qty(),
2917 fill.last_qty,
2918 order.quantity()
2919 );
2920 } else {
2921 let msg = format!(
2922 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
2923 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
2924 order.client_order_id(),
2925 potential_overfill,
2926 order.filled_qty(),
2927 fill.last_qty,
2928 order.quantity()
2929 );
2930 anyhow::bail!("{msg}");
2931 }
2932 }
2933
2934 Ok(())
2935 }
2936
2937 fn handle_order_fill(
2938 &mut self,
2939 order: &OrderAny,
2940 fill: OrderFilled,
2941 oms_type: OmsType,
2942 ) -> Vec<PositionEvent> {
2943 let instrument =
2944 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
2945 instrument.clone()
2946 } else {
2947 log::error!(
2948 "Cannot handle order fill: no instrument found for {}, {fill}",
2949 fill.instrument_id,
2950 );
2951 return Vec::new();
2952 };
2953
2954 let is_margin_account = {
2955 let cache = self.cache.borrow();
2956 let Some(account) = cache.account(&fill.account_id) else {
2957 log::error!(
2958 "Cannot handle order fill: no account found for {}, {fill}",
2959 fill.instrument_id.venue,
2960 );
2961 return Vec::new();
2962 };
2963
2964 account.is_margin_account()
2965 };
2966
2967 if !instrument.is_spread() && is_margin_account {
2970 let portfolio_endpoint = MessagingSwitchboard::portfolio_update_order();
2971 msgbus::send_order_event(portfolio_endpoint, OrderEventAny::Filled(fill));
2972 }
2973
2974 let (position, position_events) = if instrument.is_spread() {
2975 (None, Vec::new())
2976 } else {
2977 let position_events = self.handle_position_update(&instrument, fill, oms_type);
2978 let position_id = fill.position_id.unwrap();
2979 (
2980 self.cache.borrow().position_owned(&position_id),
2981 position_events,
2982 )
2983 };
2984
2985 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
2988 if !instrument.is_spread()
2990 && let Some(ref pos) = position
2991 && pos.is_open()
2992 {
2993 let position_id = pos.id;
2994
2995 for client_order_id in order.linked_order_ids().unwrap_or_default() {
2996 let link = self.cache.borrow_mut().order_mut(client_order_id).and_then(
3000 |mut contingent_order| {
3001 if contingent_order.position_id().is_none() {
3002 contingent_order.set_position_id(Some(position_id));
3003 Some((
3004 contingent_order.instrument_id().venue,
3005 contingent_order.client_order_id(),
3006 contingent_order.strategy_id(),
3007 ))
3008 } else {
3009 None
3010 }
3011 },
3012 );
3013
3014 if let Some((venue, contingent_id, strategy_id)) = link
3015 && let Err(e) = self.cache.borrow_mut().add_position_id(
3016 &position_id,
3017 &venue,
3018 &contingent_id,
3019 &strategy_id,
3020 )
3021 {
3022 log::error!("Failed to add position ID: {e}");
3023 }
3024 }
3025 }
3026 }
3029
3030 let event = OrderEventAny::Filled(fill);
3031 let fills_topic = switchboard::get_order_fills_topic(fill.instrument_id);
3032 msgbus::publish_order_event(fills_topic, &event);
3033
3034 position_events
3035 }
3036
3037 fn handle_position_update(
3041 &mut self,
3042 instrument: &InstrumentAny,
3043 fill: OrderFilled,
3044 oms_type: OmsType,
3045 ) -> Vec<PositionEvent> {
3046 let position_id = if let Some(position_id) = fill.position_id {
3047 position_id
3048 } else {
3049 log::error!("Cannot handle position update: no position ID found for fill {fill}");
3050 return Vec::new();
3051 };
3052
3053 let position_opt = self.cache.borrow().position_owned(&position_id);
3054
3055 match position_opt {
3056 None => {
3057 if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
3058 return Vec::new();
3059 }
3060
3061 self.open_position(instrument, None, fill, oms_type)
3062 .unwrap_or_default()
3063 }
3064 Some(pos) if pos.is_closed() => {
3065 if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
3066 return Vec::new();
3067 }
3068
3069 self.open_position(instrument, Some(&pos), fill, oms_type)
3070 .unwrap_or_default()
3071 }
3072 Some(mut pos) => {
3073 if self.will_flip_position(&pos, fill) {
3074 self.flip_position(instrument, &mut pos, fill, oms_type)
3075 } else {
3076 self.update_position(&mut pos, fill).into_iter().collect()
3077 }
3078 }
3079 }
3080 }
3081
3082 fn reject_reduce_only_netting_position_open(
3083 &self,
3084 fill: &OrderFilled,
3085 oms_type: OmsType,
3086 ) -> bool {
3087 if oms_type != OmsType::Netting {
3088 return false;
3089 }
3090
3091 let cache = self.cache.borrow();
3092 let Some(order) = cache.order_owned(&fill.client_order_id) else {
3093 return false;
3094 };
3095
3096 if !order.is_reduce_only() {
3097 return false;
3098 }
3099
3100 let positions_open = cache.positions_open(
3101 None,
3102 Some(&fill.instrument_id),
3103 None,
3104 Some(&fill.account_id),
3105 None,
3106 );
3107 let position_id = fill
3108 .position_id
3109 .map_or_else(|| "None".to_string(), |position_id| position_id.to_string());
3110 let matching_position_details = Self::position_details(
3111 positions_open
3112 .iter()
3113 .filter(|position| position.is_opposite_side(fill.order_side))
3114 .map(|position| &**position),
3115 );
3116 let open_position_details =
3117 Self::position_details(positions_open.iter().map(|position| &**position));
3118
3119 log::error!(
3120 "Cannot open NETTING position {position_id} from reduce-only fill {} for {}; \
3121 matching_reduce_positions=[{}], open_positions=[{}]",
3122 fill.trade_id,
3123 fill.instrument_id,
3124 matching_position_details,
3125 open_position_details
3126 );
3127
3128 true
3129 }
3130
3131 fn open_position(
3132 &self,
3133 instrument: &InstrumentAny,
3134 position: Option<&Position>,
3135 fill: OrderFilled,
3136 oms_type: OmsType,
3137 ) -> anyhow::Result<Vec<PositionEvent>> {
3138 if let Some(position) = position {
3139 if Self::is_duplicate_closed_fill(position, &fill) {
3140 log::warn!(
3141 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
3142 fill.trade_id,
3143 position.id,
3144 fill.order_side,
3145 fill.last_qty,
3146 fill.last_px
3147 );
3148 return Ok(Vec::new());
3149 }
3150 self.reopen_position(position, oms_type)?;
3151 }
3152
3153 let position = Position::new(instrument, fill);
3154 self.cache.borrow_mut().add_position(&position, oms_type)?;
3155
3156 if self.config.snapshot_positions {
3157 self.create_position_state_snapshot(&position);
3158 }
3159
3160 let ts_init = self.clock.borrow().timestamp_ns();
3161 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
3162
3163 Ok(vec![PositionEvent::PositionOpened(event)])
3164 }
3165
3166 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
3167 position.events.iter().any(|event| {
3168 event.trade_id == fill.trade_id
3169 && event.order_side == fill.order_side
3170 && event.last_px == fill.last_px
3171 && event.last_qty == fill.last_qty
3172 })
3173 }
3174
3175 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
3176 if oms_type == OmsType::Netting {
3177 if position.is_open() {
3178 anyhow::bail!(
3179 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
3180 position.id
3181 );
3182 }
3183 let snapshot_ref = self.cache.borrow_mut().snapshot_position(position)?;
3185 self.anchor_snapshot(snapshot_ref);
3186 } else {
3187 log::warn!(
3189 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
3190 position.id
3191 );
3192 }
3193 Ok(())
3194 }
3195
3196 fn anchor_snapshot(&self, snapshot_ref: CacheSnapshotRef) {
3197 let Some(anchorer) = &self.snapshot_anchorer else {
3198 return;
3199 };
3200
3201 if let Err(e) = anchorer(snapshot_ref) {
3202 log::error!("Failed to record cache snapshot anchor: {e}");
3203 }
3204 }
3205
3206 fn update_position(&self, position: &mut Position, fill: OrderFilled) -> Option<PositionEvent> {
3207 position.apply(&fill);
3209
3210 let is_closed = position.is_closed();
3212
3213 if let Err(e) = self.cache.borrow_mut().update_position(position) {
3215 log::error!("Failed to update position: {e:?}");
3216 return None;
3217 }
3218
3219 let cache = self.cache.borrow();
3221
3222 drop(cache);
3223
3224 if self.config.snapshot_positions {
3226 self.create_position_state_snapshot(position);
3227 }
3228
3229 let ts_init = self.clock.borrow().timestamp_ns();
3230
3231 if is_closed {
3232 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
3233 Some(PositionEvent::PositionClosed(event))
3234 } else {
3235 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
3236 Some(PositionEvent::PositionChanged(event))
3237 }
3238 }
3239
3240 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
3241 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
3242 }
3243
3244 fn position_signed_decimal_qty(position: &Position) -> Decimal {
3245 match position.side {
3246 PositionSide::Long => position.quantity.as_decimal(),
3247 PositionSide::Short => -position.quantity.as_decimal(),
3248 _ => Decimal::ZERO,
3249 }
3250 }
3251
3252 fn position_details<'a>(positions: impl IntoIterator<Item = &'a Position>) -> String {
3253 positions
3254 .into_iter()
3255 .map(|position| {
3256 format!(
3257 "{} strategy_id={} signed_qty={}",
3258 position.id,
3259 position.strategy_id,
3260 Self::position_signed_decimal_qty(position)
3261 )
3262 })
3263 .collect::<Vec<_>>()
3264 .join(", ")
3265 }
3266
3267 fn flip_position(
3268 &mut self,
3269 instrument: &InstrumentAny,
3270 position: &mut Position,
3271 fill: OrderFilled,
3272 oms_type: OmsType,
3273 ) -> Vec<PositionEvent> {
3274 let mut position_events = Vec::new();
3275 let difference = match position.side {
3276 PositionSide::Long => Quantity::from_raw(
3277 fill.last_qty.raw - position.quantity.raw,
3278 position.size_precision,
3279 ),
3280 PositionSide::Short => Quantity::from_raw(
3281 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
3283 ),
3284 _ => fill.last_qty,
3285 };
3286
3287 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
3289 let (commission1, commission2) = if let Some(commission) = fill.commission {
3290 let commission_currency = commission.currency;
3291 let commission1 = Money::new(commission * fill_percent, commission_currency);
3292 let commission2 = commission - commission1;
3293 (Some(commission1), Some(commission2))
3294 } else {
3295 log::error!("Commission is not available");
3296 (None, None)
3297 };
3298
3299 let mut fill_split1: Option<OrderFilled> = None;
3300
3301 if position.is_open() {
3302 fill_split1 = Some(OrderFilled::new(
3303 fill.trader_id,
3304 fill.strategy_id,
3305 fill.instrument_id,
3306 fill.client_order_id,
3307 fill.venue_order_id,
3308 fill.account_id,
3309 fill.trade_id,
3310 fill.order_side,
3311 fill.order_type,
3312 position.quantity,
3313 fill.last_px,
3314 fill.currency,
3315 fill.liquidity_side,
3316 fill.event_id,
3317 fill.ts_event,
3318 fill.ts_init,
3319 fill.reconciliation,
3320 fill.position_id,
3321 commission1,
3322 ));
3323
3324 if let Some(position_event) = self.update_position(position, fill_split1.unwrap()) {
3325 position_events.push(position_event);
3326 }
3327
3328 if oms_type == OmsType::Netting {
3330 match self.cache.borrow_mut().snapshot_position(position) {
3331 Ok(snapshot_ref) => self.anchor_snapshot(snapshot_ref),
3332 Err(e) => log::error!("Failed to snapshot position during flip: {e:?}"),
3333 }
3334 }
3335 }
3336
3337 if difference.raw == 0 {
3339 log::warn!(
3340 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
3341 );
3342 return position_events;
3343 }
3344
3345 let position_id_flip = if oms_type == OmsType::Hedging
3346 && let Some(position_id) = fill.position_id
3347 && position_id.is_virtual()
3348 {
3349 Some(self.pos_id_generator.generate(fill.strategy_id, true))
3351 } else {
3352 fill.position_id
3354 };
3355
3356 let fill_split2 = OrderFilled::new(
3357 fill.trader_id,
3358 fill.strategy_id,
3359 fill.instrument_id,
3360 fill.client_order_id,
3361 fill.venue_order_id,
3362 fill.account_id,
3363 fill.trade_id,
3364 fill.order_side,
3365 fill.order_type,
3366 difference,
3367 fill.last_px,
3368 fill.currency,
3369 fill.liquidity_side,
3370 UUID4::new(),
3371 fill.ts_event,
3372 fill.ts_init,
3373 fill.reconciliation,
3374 position_id_flip,
3375 commission2,
3376 );
3377
3378 if oms_type == OmsType::Hedging
3379 && let Some(position_id) = fill.position_id
3380 && position_id.is_virtual()
3381 {
3382 log::warn!("Closing position {fill_split1:?}");
3383 log::warn!("Flipping position {fill_split2:?}");
3384 }
3385
3386 match self.open_position(instrument, None, fill_split2, oms_type) {
3388 Ok(opened_events) => position_events.extend(opened_events),
3389 Err(e) => log::error!("Failed to open flipped position: {e:?}"),
3390 }
3391
3392 position_events
3393 }
3394
3395 pub fn set_position_id_counts(&mut self) {
3397 let cache = self.cache.borrow();
3398 let positions = cache.positions(None, None, None, None, None);
3399
3400 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
3402
3403 for position in positions {
3404 *counts.entry(position.strategy_id).or_insert(0) += 1;
3405 }
3406
3407 self.pos_id_generator.reset();
3408
3409 for (strategy_id, count) in counts {
3410 self.pos_id_generator.set_count(count, strategy_id);
3411 log::info!("Set PositionId count for {strategy_id} to {count}");
3412 }
3413 }
3414
3415 fn deny_order(&self, order: &OrderAny, reason: &str) {
3416 let denied = OrderDenied::new(
3417 order.trader_id(),
3418 order.strategy_id(),
3419 order.instrument_id(),
3420 order.client_order_id(),
3421 reason.into(),
3422 UUID4::new(),
3423 self.clock.borrow().timestamp_ns(),
3424 self.clock.borrow().timestamp_ns(),
3425 );
3426
3427 let event = OrderEventAny::Denied(denied);
3428 let order = match self.cache.borrow_mut().update_order(&event) {
3429 Ok(order) => order,
3430 Err(e) => {
3431 log::error!("Failed to apply denied event to order: {e}");
3432 return;
3433 }
3434 };
3435
3436 let topic = switchboard::get_event_orders_topic(order.strategy_id());
3437 msgbus::publish_order_event(topic, &event);
3438
3439 if self.config.snapshot_orders {
3440 self.create_order_state_snapshot(&order);
3441 }
3442 }
3443
3444 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
3445 let mut cache = self.cache.borrow_mut();
3446 if cache.own_order_book_mut(instrument_id).is_none() {
3447 let own_book = OwnOrderBook::new(*instrument_id);
3448 cache.add_own_order_book(own_book).unwrap();
3449 }
3450
3451 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
3452 }
3453}
3454
3455#[cfg(test)]
3456mod tests {
3457 use nautilus_model::{
3458 enums::{LiquiditySide, OrderSide, PositionSideSpecified},
3459 events::order::spec::OrderFilledSpec,
3460 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
3461 instruments::{InstrumentAny, stubs::audusd_sim},
3462 types::Price,
3463 };
3464 use rstest::*;
3465
3466 use super::*;
3467
3468 #[rstest]
3469 fn netting_positions_open_for_report_scopes_positions_by_account() {
3470 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3471 let account1_id = AccountId::from("SIM-001");
3472 let account2_id = AccountId::from("SIM-002");
3473 let position1 = position_for_account(
3474 &instrument,
3475 account1_id,
3476 StrategyId::from("S-001"),
3477 PositionId::from("P-ACC-1"),
3478 OrderSide::Buy,
3479 Quantity::from(1_000),
3480 );
3481 let position2 = position_for_account(
3482 &instrument,
3483 account2_id,
3484 StrategyId::from("S-002"),
3485 PositionId::from("P-ACC-2"),
3486 OrderSide::Buy,
3487 Quantity::from(2_000),
3488 );
3489 let mut cache = Cache::default();
3490 cache.add_position(&position1, OmsType::Netting).unwrap();
3491 cache.add_position(&position2, OmsType::Netting).unwrap();
3492
3493 let report = PositionStatusReport::new(
3494 account1_id,
3495 instrument.id(),
3496 PositionSideSpecified::Long,
3497 Quantity::from(1_000),
3498 UnixNanos::from(1_000_000),
3499 UnixNanos::from(1_000_000),
3500 None,
3501 None,
3502 None,
3503 );
3504
3505 let positions_open = ExecutionEngine::netting_positions_open_for_report(&cache, &report);
3506 let signed_qty: Decimal = positions_open
3507 .iter()
3508 .map(|position| ExecutionEngine::position_signed_decimal_qty(position))
3509 .sum();
3510
3511 assert_eq!(positions_open.len(), 1);
3512 assert_eq!(positions_open[0].id, position1.id);
3513 assert_eq!(signed_qty, Decimal::from(1_000));
3514 }
3515
3516 #[rstest]
3517 fn netting_split_position_ownership_message_reports_only_split_ownership() {
3518 let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3519 let account_id = AccountId::from("SIM-001");
3520 let external_position = position_for_account(
3521 &instrument,
3522 account_id,
3523 StrategyId::from("EXTERNAL"),
3524 PositionId::from("P-EXTERNAL"),
3525 OrderSide::Buy,
3526 Quantity::from(1_000),
3527 );
3528 let strategy_position = position_for_account(
3529 &instrument,
3530 account_id,
3531 StrategyId::from("S-001"),
3532 PositionId::from("P-STRATEGY"),
3533 OrderSide::Buy,
3534 Quantity::from(500),
3535 );
3536 let same_strategy_position = position_for_account(
3537 &instrument,
3538 account_id,
3539 StrategyId::from("EXTERNAL"),
3540 PositionId::from("P-EXTERNAL-2"),
3541 OrderSide::Buy,
3542 Quantity::from(250),
3543 );
3544 let report = PositionStatusReport::new(
3545 account_id,
3546 instrument.id(),
3547 PositionSideSpecified::Long,
3548 Quantity::from(1_500),
3549 UnixNanos::from(1_000_000),
3550 UnixNanos::from(1_000_000),
3551 None,
3552 None,
3553 None,
3554 );
3555
3556 let message = ExecutionEngine::netting_split_position_ownership_message(
3557 &report,
3558 &[&external_position, &strategy_position],
3559 )
3560 .expect("split ownership should produce a warning message");
3561
3562 assert!(message.contains("account_id=SIM-001"));
3563 assert!(message.contains(&format!("instrument_id={}", instrument.id())));
3564 assert!(message.contains("EXTERNAL"));
3565 assert!(message.contains("S-001"));
3566 assert!(message.contains("P-EXTERNAL"));
3567 assert!(message.contains("P-STRATEGY"));
3568 assert!(message.contains("signed_qty=1000"));
3569 assert!(message.contains("signed_qty=500"));
3570 assert!(
3571 ExecutionEngine::netting_split_position_ownership_message(
3572 &report,
3573 &[&external_position, &same_strategy_position],
3574 )
3575 .is_none()
3576 );
3577 }
3578
3579 fn position_for_account(
3580 instrument: &InstrumentAny,
3581 account_id: AccountId,
3582 strategy_id: StrategyId,
3583 position_id: PositionId,
3584 order_side: OrderSide,
3585 quantity: Quantity,
3586 ) -> Position {
3587 let client_order_id = ClientOrderId::from(format!("O-{position_id}"));
3588 let fill = OrderFilledSpec::builder()
3589 .strategy_id(strategy_id)
3590 .instrument_id(instrument.id())
3591 .client_order_id(client_order_id)
3592 .venue_order_id(VenueOrderId::from(format!("V-{position_id}")))
3593 .account_id(account_id)
3594 .trade_id(TradeId::new(format!("T-{position_id}")))
3595 .order_side(order_side)
3596 .last_qty(quantity)
3597 .last_px(Price::from("1.0"))
3598 .currency(instrument.quote_currency())
3599 .liquidity_side(LiquiditySide::Maker)
3600 .position_id(position_id)
3601 .commission(Money::from("2 USD"))
3602 .build();
3603
3604 Position::new(instrument, fill)
3605 }
3606}