1pub mod config;
24pub mod stubs;
25
26#[cfg(test)]
27mod tests;
28
29use std::{
30 cell::{RefCell, RefMut},
31 collections::{HashMap, HashSet},
32 fmt::Debug,
33 rc::Rc,
34 time::SystemTime,
35};
36
37use ahash::{AHashMap, AHashSet};
38use config::ExecutionEngineConfig;
39use futures::future::join_all;
40use nautilus_common::{
41 cache::Cache,
42 clock::Clock,
43 generators::position_id::PositionIdGenerator,
44 logging::{CMD, EVT, RECV, SEND},
45 messages::execution::{
46 BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder, QueryAccount, QueryOrder,
47 SubmitOrder, SubmitOrderList, TradingCommand,
48 },
49 msgbus::{
50 self, get_message_bus,
51 switchboard::{self},
52 },
53};
54use nautilus_core::UUID4;
55use nautilus_model::{
56 enums::{ContingencyType, OmsType, OrderSide, PositionSide},
57 events::{
58 OrderDenied, OrderEvent, OrderEventAny, OrderFilled, PositionChanged, PositionClosed,
59 PositionEvent, PositionOpened,
60 },
61 identifiers::{ClientId, ClientOrderId, InstrumentId, PositionId, StrategyId, Venue},
62 instruments::{Instrument, InstrumentAny},
63 orderbook::own::{OwnOrderBook, should_handle_own_book_order},
64 orders::{Order, OrderAny, OrderError},
65 position::Position,
66 reports::ExecutionMassStatus,
67 types::{Money, Price, Quantity},
68};
69
70use crate::client::{ExecutionClient, ExecutionClientAdapter};
71
72pub struct ExecutionEngine {
79 clock: Rc<RefCell<dyn Clock>>,
80 cache: Rc<RefCell<Cache>>,
81 clients: AHashMap<ClientId, ExecutionClientAdapter>,
82 default_client: Option<ExecutionClientAdapter>,
83 routing_map: HashMap<Venue, ClientId>,
84 oms_overrides: HashMap<StrategyId, OmsType>,
85 external_order_claims: HashMap<InstrumentId, StrategyId>,
86 external_clients: HashSet<ClientId>,
87 pos_id_generator: PositionIdGenerator,
88 config: ExecutionEngineConfig,
89}
90
91impl Debug for ExecutionEngine {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct(stringify!(ExecutionEngine))
94 .field("client_count", &self.clients.len())
95 .finish()
96 }
97}
98
99impl ExecutionEngine {
100 pub fn new(
102 clock: Rc<RefCell<dyn Clock>>,
103 cache: Rc<RefCell<Cache>>,
104 config: Option<ExecutionEngineConfig>,
105 ) -> Self {
106 let trader_id = get_message_bus().borrow().trader_id;
107 Self {
108 clock: clock.clone(),
109 cache,
110 clients: AHashMap::new(),
111 default_client: None,
112 routing_map: HashMap::new(),
113 oms_overrides: HashMap::new(),
114 external_order_claims: HashMap::new(),
115 external_clients: config
116 .as_ref()
117 .and_then(|c| c.external_clients.clone())
118 .unwrap_or_default()
119 .into_iter()
120 .collect(),
121 pos_id_generator: PositionIdGenerator::new(trader_id, clock),
122 config: config.unwrap_or_default(),
123 }
124 }
125
126 #[must_use]
127 pub fn position_id_count(&self, strategy_id: StrategyId) -> usize {
129 self.pos_id_generator.count(strategy_id)
130 }
131
132 #[must_use]
133 pub fn check_integrity(&self) -> bool {
135 self.cache.borrow_mut().check_integrity()
136 }
137
138 #[must_use]
139 pub fn check_connected(&self) -> bool {
141 let clients_connected = self.clients.values().all(|c| c.is_connected());
142 let default_connected = self
143 .default_client
144 .as_ref()
145 .is_none_or(|c| c.is_connected());
146 clients_connected && default_connected
147 }
148
149 #[must_use]
150 pub fn check_disconnected(&self) -> bool {
152 let clients_disconnected = self.clients.values().all(|c| !c.is_connected());
153 let default_disconnected = self
154 .default_client
155 .as_ref()
156 .is_none_or(|c| !c.is_connected());
157 clients_disconnected && default_disconnected
158 }
159
160 #[must_use]
161 pub fn check_residuals(&self) -> bool {
163 self.cache.borrow().check_residuals()
164 }
165
166 #[must_use]
167 pub fn get_external_order_claims_instruments(&self) -> HashSet<InstrumentId> {
169 self.external_order_claims.keys().copied().collect()
170 }
171
172 #[must_use]
173 pub fn get_external_client_ids(&self) -> HashSet<ClientId> {
175 self.external_clients.clone()
176 }
177
178 #[must_use]
179 pub fn get_external_order_claim(&self, instrument_id: &InstrumentId) -> Option<StrategyId> {
181 self.external_order_claims.get(instrument_id).copied()
182 }
183
184 pub fn register_client(&mut self, client: Box<dyn ExecutionClient>) -> anyhow::Result<()> {
192 let client_id = client.client_id();
193 let venue = client.venue();
194
195 if self.clients.contains_key(&client_id) {
196 anyhow::bail!("Client already registered with ID {client_id}");
197 }
198
199 let adapter = ExecutionClientAdapter::new(client);
200
201 self.routing_map.insert(venue, client_id);
202
203 log::debug!("Registered client {client_id}");
204 self.clients.insert(client_id, adapter);
205 Ok(())
206 }
207
208 pub fn register_default_client(&mut self, client: Box<dyn ExecutionClient>) {
210 let client_id = client.client_id();
211 let adapter = ExecutionClientAdapter::new(client);
212
213 log::debug!("Registered default client {client_id}");
214 self.default_client = Some(adapter);
215 }
216
217 #[must_use]
218 pub fn get_client(&self, client_id: &ClientId) -> Option<&dyn ExecutionClient> {
220 self.clients.get(client_id).map(|a| a.client.as_ref())
221 }
222
223 #[must_use]
224 pub fn get_client_adapter_mut(
226 &mut self,
227 client_id: &ClientId,
228 ) -> Option<&mut ExecutionClientAdapter> {
229 if let Some(default) = &self.default_client
230 && &default.client_id == client_id
231 {
232 return self.default_client.as_mut();
233 }
234 self.clients.get_mut(client_id)
235 }
236
237 pub async fn generate_mass_status(
243 &mut self,
244 client_id: &ClientId,
245 lookback_mins: Option<u64>,
246 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
247 if let Some(client) = self.get_client_adapter_mut(client_id) {
248 client.generate_mass_status(lookback_mins).await
249 } else {
250 anyhow::bail!("Client {client_id} not found")
251 }
252 }
253
254 #[must_use]
255 pub fn client_ids(&self) -> Vec<ClientId> {
257 let mut ids: Vec<_> = self.clients.keys().copied().collect();
258
259 if let Some(default) = &self.default_client {
260 ids.push(default.client_id);
261 }
262 ids
263 }
264
265 #[must_use]
266 pub fn get_clients_mut(&mut self) -> Vec<&mut ExecutionClientAdapter> {
268 let mut adapters: Vec<_> = self.clients.values_mut().collect();
269 if let Some(default) = &mut self.default_client {
270 adapters.push(default);
271 }
272 adapters
273 }
274
275 #[must_use]
276 pub fn get_clients_for_orders(&self, orders: &[OrderAny]) -> Vec<&dyn ExecutionClient> {
281 let mut client_ids: AHashSet<ClientId> = AHashSet::new();
282 let mut venues: AHashSet<Venue> = AHashSet::new();
283
284 for order in orders {
286 venues.insert(order.instrument_id().venue);
287 if let Some(client_id) = self.cache.borrow().client_id(&order.client_order_id()) {
288 client_ids.insert(*client_id);
289 }
290 }
291
292 let mut clients: Vec<&dyn ExecutionClient> = Vec::new();
293
294 for client_id in &client_ids {
296 if let Some(adapter) = self.clients.get(client_id)
297 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
298 {
299 clients.push(adapter.client.as_ref());
300 }
301 }
302
303 for venue in &venues {
305 if let Some(client_id) = self.routing_map.get(venue) {
306 if let Some(adapter) = self.clients.get(client_id)
307 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
308 {
309 clients.push(adapter.client.as_ref());
310 }
311 } else if let Some(adapter) = &self.default_client
312 && !clients.iter().any(|c| c.client_id() == adapter.client_id)
313 {
314 clients.push(adapter.client.as_ref());
315 }
316 }
317
318 clients
319 }
320
321 pub fn register_venue_routing(
327 &mut self,
328 client_id: ClientId,
329 venue: Venue,
330 ) -> anyhow::Result<()> {
331 if !self.clients.contains_key(&client_id) {
332 anyhow::bail!("No client registered with ID {client_id}");
333 }
334
335 self.routing_map.insert(venue, client_id);
336 log::info!("Set client {client_id} routing for {venue}");
337 Ok(())
338 }
339
340 pub fn register_oms_type(&mut self, strategy_id: StrategyId, oms_type: OmsType) {
344 self.oms_overrides.insert(strategy_id, oms_type);
345 log::info!("Registered OMS::{oms_type:?} for {strategy_id}");
346 }
347
348 pub fn register_external_order_claims(
356 &mut self,
357 strategy_id: StrategyId,
358 instrument_ids: HashSet<InstrumentId>,
359 ) -> anyhow::Result<()> {
360 for instrument_id in &instrument_ids {
362 if let Some(existing) = self.external_order_claims.get(instrument_id) {
363 anyhow::bail!(
364 "External order claim for {instrument_id} already exists for {existing}"
365 );
366 }
367 }
368
369 for instrument_id in &instrument_ids {
371 self.external_order_claims
372 .insert(*instrument_id, strategy_id);
373 }
374
375 if !instrument_ids.is_empty() {
376 log::info!("Registered external order claims for {strategy_id}: {instrument_ids:?}");
377 }
378
379 Ok(())
380 }
381
382 pub fn deregister_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
386 if self.clients.remove(&client_id).is_some() {
387 self.routing_map
389 .retain(|_, mapped_id| mapped_id != &client_id);
390 log::info!("Deregistered client {client_id}");
391 Ok(())
392 } else {
393 anyhow::bail!("No client registered with ID {client_id}")
394 }
395 }
396
397 pub async fn connect(&mut self) -> anyhow::Result<()> {
403 let futures: Vec<_> = self
404 .get_clients_mut()
405 .into_iter()
406 .map(|client| client.connect())
407 .collect();
408
409 let results = join_all(futures).await;
410 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
411
412 if errors.is_empty() {
413 Ok(())
414 } else {
415 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
416 anyhow::bail!(
417 "Failed to connect execution clients: {}",
418 error_msgs.join("; ")
419 )
420 }
421 }
422
423 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
429 let futures: Vec<_> = self
430 .get_clients_mut()
431 .into_iter()
432 .map(|client| client.disconnect())
433 .collect();
434
435 let results = join_all(futures).await;
436 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
437
438 if errors.is_empty() {
439 Ok(())
440 } else {
441 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
442 anyhow::bail!(
443 "Failed to disconnect execution clients: {}",
444 error_msgs.join("; ")
445 )
446 }
447 }
448
449 pub fn set_manage_own_order_books(&mut self, value: bool) {
451 self.config.manage_own_order_books = value;
452 }
453
454 pub fn set_convert_quote_qty_to_base(&mut self, value: bool) {
456 self.config.convert_quote_qty_to_base = value;
457 }
458
459 pub fn start_snapshot_timer(&mut self) {
463 if let Some(interval_secs) = self.config.snapshot_positions_interval_secs {
464 log::info!("Starting position snapshots timer at {interval_secs} second intervals");
465 }
466 }
467
468 pub fn stop_snapshot_timer(&mut self) {
470 if self.config.snapshot_positions_interval_secs.is_some() {
471 log::info!("Canceling position snapshots timer");
472 }
473 }
474
475 pub fn snapshot_open_position_states(&self) {
477 let positions: Vec<Position> = self
478 .cache
479 .borrow()
480 .positions_open(None, None, None, None)
481 .into_iter()
482 .cloned()
483 .collect();
484
485 for position in positions {
486 self.create_position_state_snapshot(&position);
487 }
488 }
489
490 #[allow(clippy::await_holding_refcell_ref)]
493 pub async fn load_cache(&mut self) -> anyhow::Result<()> {
499 let ts = SystemTime::now();
500
501 {
502 let mut cache = self.cache.borrow_mut();
503 cache.clear_index();
504 cache.cache_general()?;
505 self.cache.borrow_mut().cache_all().await?;
506 cache.build_index();
507 let _ = cache.check_integrity();
508
509 if self.config.manage_own_order_books {
510 for order in cache.orders(None, None, None, None) {
511 if order.is_closed() || !should_handle_own_book_order(order) {
512 continue;
513 }
514 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
515 own_book.add(order.to_own_book_order());
516 }
517 }
518 }
519
520 self.set_position_id_counts();
521
522 log::info!(
523 "Loaded cache in {}ms",
524 SystemTime::now()
525 .duration_since(ts)
526 .map_err(|e| anyhow::anyhow!("Failed to calculate duration: {e}"))?
527 .as_millis()
528 );
529
530 Ok(())
531 }
532
533 pub fn flush_db(&self) {
535 self.cache.borrow_mut().flush_db();
536 }
537
538 pub fn process(&mut self, event: &OrderEventAny) {
540 self.handle_event(event);
541 }
542
543 pub fn execute(&self, command: &TradingCommand) {
545 self.execute_command(command);
546 }
547
548 fn execute_command(&self, command: &TradingCommand) {
551 if self.config.debug {
552 log::debug!("{RECV}{CMD} {command:?}");
553 }
554
555 if let Some(cid) = command.client_id()
556 && self.external_clients.contains(&cid)
557 {
558 if self.config.debug {
559 log::debug!("Skipping execution command for external client {cid}: {command:?}");
560 }
561 return;
562 }
563
564 let client = if let Some(adapter) = command
565 .client_id()
566 .and_then(|cid| self.clients.get(&cid))
567 .or_else(|| {
568 self.routing_map
569 .get(&command.instrument_id().venue)
570 .and_then(|client_id| self.clients.get(client_id))
571 })
572 .or(self.default_client.as_ref())
573 {
574 adapter.client.as_ref()
575 } else {
576 log::error!(
577 "No execution client found for command: client_id={:?}, venue={}, command={command:?}",
578 command.client_id(),
579 command.instrument_id().venue,
580 );
581 return;
582 };
583
584 match command {
585 TradingCommand::SubmitOrder(cmd) => self.handle_submit_order(client, cmd),
586 TradingCommand::SubmitOrderList(cmd) => self.handle_submit_order_list(client, cmd),
587 TradingCommand::ModifyOrder(cmd) => self.handle_modify_order(client, cmd),
588 TradingCommand::CancelOrder(cmd) => self.handle_cancel_order(client, cmd),
589 TradingCommand::CancelAllOrders(cmd) => self.handle_cancel_all_orders(client, cmd),
590 TradingCommand::BatchCancelOrders(cmd) => self.handle_batch_cancel_orders(client, cmd),
591 TradingCommand::QueryOrder(cmd) => self.handle_query_order(client, cmd),
592 TradingCommand::QueryAccount(cmd) => self.handle_query_account(client, cmd),
593 }
594 }
595
596 fn handle_submit_order(&self, client: &dyn ExecutionClient, cmd: &SubmitOrder) {
597 let mut order = cmd.order.clone();
598 let client_order_id = order.client_order_id();
599 let instrument_id = order.instrument_id();
600
601 if !self.cache.borrow().order_exists(&client_order_id) {
603 {
605 let mut cache = self.cache.borrow_mut();
606 if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
607 {
608 log::error!("Error adding order to cache: {e}");
609 return;
610 }
611 }
612
613 if self.config.snapshot_orders {
614 self.create_order_state_snapshot(&order);
615 }
616 }
617
618 let instrument = {
620 let cache = self.cache.borrow();
621 if let Some(instrument) = cache.instrument(&instrument_id) {
622 instrument.clone()
623 } else {
624 log::error!(
625 "Cannot handle submit order: no instrument found for {instrument_id}, {cmd}",
626 );
627 return;
628 }
629 };
630
631 if self.config.convert_quote_qty_to_base
633 && !instrument.is_inverse()
634 && order.is_quote_quantity()
635 {
636 log::warn!(
637 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
638 );
639 let last_px = self.last_px_for_conversion(&instrument_id, order.order_side());
640
641 if let Some(price) = last_px {
642 let base_qty = instrument.get_base_quantity(order.quantity(), price);
643 self.set_order_base_qty(&mut order, base_qty);
644 } else {
645 self.deny_order(
646 &order,
647 &format!("no-price-to-convert-quote-qty {instrument_id}"),
648 );
649 return;
650 }
651 }
652
653 if self.config.manage_own_order_books && should_handle_own_book_order(&order) {
654 let mut own_book = self.get_or_init_own_order_book(&order.instrument_id());
655 own_book.add(order.to_own_book_order());
656 }
657
658 if let Err(e) = client.submit_order(cmd) {
660 log::error!("Error submitting order to client: {e}");
661 self.deny_order(
662 &cmd.order,
663 &format!("failed-to-submit-order-to-client: {e}"),
664 );
665 }
666 }
667
668 fn handle_submit_order_list(&self, client: &dyn ExecutionClient, cmd: &SubmitOrderList) {
669 let orders = cmd.order_list.orders.clone();
670
671 let mut cache = self.cache.borrow_mut();
672 for order in &orders {
673 if !cache.order_exists(&order.client_order_id()) {
674 if let Err(e) = cache.add_order(order.clone(), cmd.position_id, cmd.client_id, true)
675 {
676 log::error!("Error adding order to cache: {e}");
677 return;
678 }
679
680 if self.config.snapshot_orders {
681 self.create_order_state_snapshot(order);
682 }
683 }
684 }
685 drop(cache);
686
687 let instrument = {
688 let cache = self.cache.borrow();
689 if let Some(instrument) = cache.instrument(&cmd.instrument_id) {
690 instrument.clone()
691 } else {
692 log::error!(
693 "Cannot handle submit order list: no instrument found for {}, {cmd}",
694 cmd.instrument_id,
695 );
696 return;
697 }
698 };
699
700 if self.config.convert_quote_qty_to_base && !instrument.is_inverse() {
702 let mut conversions: Vec<(ClientOrderId, Quantity)> =
703 Vec::with_capacity(cmd.order_list.orders.len());
704
705 for order in &cmd.order_list.orders {
706 if !order.is_quote_quantity() {
707 continue; }
709
710 let last_px =
711 self.last_px_for_conversion(&order.instrument_id(), order.order_side());
712
713 if let Some(px) = last_px {
714 let base_qty = instrument.get_base_quantity(order.quantity(), px);
715 conversions.push((order.client_order_id(), base_qty));
716 } else {
717 for order in &cmd.order_list.orders {
718 self.deny_order(
719 order,
720 &format!("no-price-to-convert-quote-qty {}", order.instrument_id()),
721 );
722 }
723 return; }
725 }
726
727 if !conversions.is_empty() {
728 log::warn!(
729 "`convert_quote_qty_to_base` is deprecated; set `convert_quote_qty_to_base=false` to maintain consistent behavior"
730 );
731
732 let mut cache = self.cache.borrow_mut();
733 for (client_order_id, base_qty) in conversions {
734 if let Some(mut_order) = cache.mut_order(&client_order_id) {
735 self.set_order_base_qty(mut_order, base_qty);
736 }
737 }
738 }
739 }
740
741 if self.config.manage_own_order_books {
742 let mut own_book = self.get_or_init_own_order_book(&cmd.instrument_id);
743 for order in &cmd.order_list.orders {
744 if should_handle_own_book_order(order) {
745 own_book.add(order.to_own_book_order());
746 }
747 }
748 }
749
750 if let Err(e) = client.submit_order_list(cmd) {
752 log::error!("Error submitting order list to client: {e}");
753 for order in &orders {
754 self.deny_order(
755 order,
756 &format!("failed-to-submit-order-list-to-client: {e}"),
757 );
758 }
759 }
760 }
761
762 fn handle_modify_order(&self, client: &dyn ExecutionClient, cmd: &ModifyOrder) {
763 if let Err(e) = client.modify_order(cmd) {
764 log::error!("Error modifying order: {e}");
765 }
766 }
767
768 fn handle_cancel_order(&self, client: &dyn ExecutionClient, cmd: &CancelOrder) {
769 if let Err(e) = client.cancel_order(cmd) {
770 log::error!("Error canceling order: {e}");
771 }
772 }
773
774 fn handle_cancel_all_orders(&self, client: &dyn ExecutionClient, cmd: &CancelAllOrders) {
775 if let Err(e) = client.cancel_all_orders(cmd) {
776 log::error!("Error canceling all orders: {e}");
777 }
778 }
779
780 fn handle_batch_cancel_orders(&self, client: &dyn ExecutionClient, cmd: &BatchCancelOrders) {
781 if let Err(e) = client.batch_cancel_orders(cmd) {
782 log::error!("Error batch canceling orders: {e}");
783 }
784 }
785
786 fn handle_query_account(&self, client: &dyn ExecutionClient, cmd: &QueryAccount) {
787 if let Err(e) = client.query_account(cmd) {
788 log::error!("Error querying account: {e}");
789 }
790 }
791
792 fn handle_query_order(&self, client: &dyn ExecutionClient, cmd: &QueryOrder) {
793 if let Err(e) = client.query_order(cmd) {
794 log::error!("Error querying order: {e}");
795 }
796 }
797
798 fn create_order_state_snapshot(&self, order: &OrderAny) {
799 if self.config.debug {
800 log::debug!("Creating order state snapshot for {order}");
801 }
802
803 if self.cache.borrow().has_backing()
804 && let Err(e) = self.cache.borrow().snapshot_order_state(order)
805 {
806 log::error!("Failed to snapshot order state: {e}");
807 return;
808 }
809
810 if get_message_bus().borrow().has_backing {
811 let topic = switchboard::get_order_snapshots_topic(order.client_order_id());
812 msgbus::publish(topic, order);
813 }
814 }
815
816 fn create_position_state_snapshot(&self, position: &Position) {
817 if self.config.debug {
818 log::debug!("Creating position state snapshot for {position}");
819 }
820
821 let topic = switchboard::get_positions_snapshots_topic(position.id);
827 msgbus::publish(topic, position);
828 }
829
830 fn handle_event(&mut self, event: &OrderEventAny) {
833 if self.config.debug {
834 log::debug!("{RECV}{EVT} {event:?}");
835 }
836
837 let client_order_id = event.client_order_id();
838 let cache = self.cache.borrow();
839 let mut order = if let Some(order) = cache.order(&client_order_id) {
840 order.clone()
841 } else {
842 log::warn!(
843 "Order with {} not found in the cache to apply {}",
844 event.client_order_id(),
845 event
846 );
847
848 let venue_order_id = if let Some(id) = event.venue_order_id() {
850 id
851 } else {
852 log::error!(
853 "Cannot apply event to any order: {} not found in the cache with no VenueOrderId",
854 event.client_order_id()
855 );
856 return;
857 };
858
859 let client_order_id = if let Some(id) = cache.client_order_id(&venue_order_id) {
861 id
862 } else {
863 log::error!(
864 "Cannot apply event to any order: {} and {venue_order_id} not found in the cache",
865 event.client_order_id(),
866 );
867 return;
868 };
869
870 if let Some(order) = cache.order(client_order_id) {
872 log::info!("Order with {client_order_id} was found in the cache");
873 order.clone()
874 } else {
875 log::error!(
876 "Cannot apply event to any order: {client_order_id} and {venue_order_id} not found in cache",
877 );
878 return;
879 }
880 };
881
882 drop(cache);
883
884 match event {
885 OrderEventAny::Filled(fill) => {
886 let oms_type = self.determine_oms_type(fill);
887 let position_id = self.determine_position_id(*fill, oms_type, Some(&order));
888
889 let mut fill = *fill;
890 if fill.position_id.is_none() {
891 fill.position_id = Some(position_id);
892 }
893
894 if self.apply_fill_to_order(&mut order, fill).is_ok() {
895 self.handle_order_fill(&order, fill, oms_type);
896 }
897 }
898 _ => {
899 let _ = self.apply_event_to_order(&mut order, event.clone());
900 }
901 }
902 }
903
904 fn determine_oms_type(&self, fill: &OrderFilled) -> OmsType {
905 if let Some(oms_type) = self.oms_overrides.get(&fill.strategy_id) {
907 return *oms_type;
908 }
909
910 if let Some(client_id) = self.routing_map.get(&fill.instrument_id.venue)
912 && let Some(client) = self.clients.get(client_id)
913 {
914 return client.oms_type();
915 }
916
917 if let Some(client) = &self.default_client {
918 return client.oms_type();
919 }
920
921 OmsType::Netting }
923
924 fn determine_position_id(
925 &mut self,
926 fill: OrderFilled,
927 oms_type: OmsType,
928 order: Option<&OrderAny>,
929 ) -> PositionId {
930 match oms_type {
931 OmsType::Hedging => self.determine_hedging_position_id(fill, order),
932 OmsType::Netting => self.determine_netting_position_id(fill),
933 _ => self.determine_netting_position_id(fill), }
935 }
936
937 fn determine_hedging_position_id(
938 &mut self,
939 fill: OrderFilled,
940 order: Option<&OrderAny>,
941 ) -> PositionId {
942 if let Some(position_id) = fill.position_id {
944 if self.config.debug {
945 log::debug!("Already had a position ID of: {position_id}");
946 }
947 return position_id;
948 }
949
950 let cache = self.cache.borrow();
951
952 let order = if let Some(o) = order {
953 o
954 } else {
955 match cache.order(&fill.client_order_id()) {
956 Some(o) => o,
957 None => {
958 panic!(
959 "Order for {} not found to determine position ID",
960 fill.client_order_id()
961 );
962 }
963 }
964 };
965
966 if let Some(spawn_id) = order.exec_spawn_id() {
968 let spawn_orders = cache.orders_for_exec_spawn(&spawn_id);
969 for spawned_order in spawn_orders {
970 if let Some(pos_id) = spawned_order.position_id() {
971 if self.config.debug {
972 log::debug!("Found spawned {} for {}", pos_id, fill.client_order_id());
973 }
974 return pos_id;
975 }
976 }
977 }
978
979 let position_id = self.pos_id_generator.generate(fill.strategy_id, false);
981 if self.config.debug {
982 log::debug!("Generated {} for {}", position_id, fill.client_order_id());
983 }
984 position_id
985 }
986
987 fn determine_netting_position_id(&self, fill: OrderFilled) -> PositionId {
988 PositionId::new(format!("{}-{}", fill.instrument_id, fill.strategy_id))
989 }
990
991 fn apply_fill_to_order(&self, order: &mut OrderAny, fill: OrderFilled) -> anyhow::Result<()> {
992 if order.is_duplicate_fill(&fill) {
993 log::warn!(
994 "Duplicate fill: {} trade_id={} already applied, skipping",
995 order.client_order_id(),
996 fill.trade_id
997 );
998 anyhow::bail!("Duplicate fill");
999 }
1000
1001 self.check_overfill(order, &fill)?;
1002 let event = OrderEventAny::Filled(fill);
1003 self.apply_order_event(order, event)
1004 }
1005
1006 fn apply_event_to_order(
1007 &self,
1008 order: &mut OrderAny,
1009 event: OrderEventAny,
1010 ) -> anyhow::Result<()> {
1011 self.apply_order_event(order, event)
1012 }
1013
1014 fn apply_order_event(&self, order: &mut OrderAny, event: OrderEventAny) -> anyhow::Result<()> {
1015 if let Err(e) = order.apply(event.clone()) {
1016 match e {
1017 OrderError::InvalidStateTransition => {
1018 log::warn!("InvalidStateTrigger: {e}, did not apply {event}");
1021 }
1022 OrderError::DuplicateFill(trade_id) => {
1023 log::warn!(
1025 "Duplicate fill rejected at order level: trade_id={trade_id}, did not apply {event}"
1026 );
1027 anyhow::bail!("{e}");
1028 }
1029 _ => {
1030 log::error!("Error applying event: {e}, did not apply {event}");
1032 if should_handle_own_book_order(order) {
1033 self.cache.borrow_mut().update_own_order_book(order);
1034 }
1035 anyhow::bail!("{e}");
1036 }
1037 }
1038 }
1039
1040 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1041 log::error!("Error updating order in cache: {e}");
1042 }
1043
1044 if self.config.debug {
1045 log::debug!("{SEND}{EVT} {event}");
1046 }
1047
1048 let topic = switchboard::get_event_orders_topic(event.strategy_id());
1049 msgbus::publish(topic, &event);
1050
1051 if self.config.snapshot_orders {
1052 self.create_order_state_snapshot(order);
1053 }
1054
1055 Ok(())
1056 }
1057
1058 fn check_overfill(&self, order: &OrderAny, fill: &OrderFilled) -> anyhow::Result<()> {
1059 let potential_overfill = order.calculate_overfill(fill.last_qty);
1060
1061 if potential_overfill.is_positive() {
1062 if self.config.allow_overfills {
1063 log::warn!(
1064 "Order overfill detected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}",
1065 order.client_order_id(),
1066 potential_overfill,
1067 order.filled_qty(),
1068 fill.last_qty,
1069 order.quantity()
1070 );
1071 } else {
1072 let msg = format!(
1073 "Order overfill rejected: {} potential_overfill={}, current_filled={}, last_qty={}, quantity={}. \
1074 Set `allow_overfills=true` in ExecutionEngineConfig to allow overfills.",
1075 order.client_order_id(),
1076 potential_overfill,
1077 order.filled_qty(),
1078 fill.last_qty,
1079 order.quantity()
1080 );
1081 anyhow::bail!("{msg}");
1082 }
1083 }
1084
1085 Ok(())
1086 }
1087
1088 fn handle_order_fill(&mut self, order: &OrderAny, fill: OrderFilled, oms_type: OmsType) {
1089 let instrument =
1090 if let Some(instrument) = self.cache.borrow().instrument(&fill.instrument_id) {
1091 instrument.clone()
1092 } else {
1093 log::error!(
1094 "Cannot handle order fill: no instrument found for {}, {fill}",
1095 fill.instrument_id,
1096 );
1097 return;
1098 };
1099
1100 if self.cache.borrow().account(&fill.account_id).is_none() {
1101 log::error!(
1102 "Cannot handle order fill: no account found for {}, {fill}",
1103 fill.instrument_id.venue,
1104 );
1105 return;
1106 }
1107
1108 let position = if instrument.is_spread() {
1111 None
1112 } else {
1113 self.handle_position_update(instrument.clone(), fill, oms_type);
1114 let position_id = fill.position_id.unwrap();
1115 self.cache.borrow().position(&position_id).cloned()
1116 };
1117
1118 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1121 if !instrument.is_spread()
1123 && let Some(ref pos) = position
1124 && pos.is_open()
1125 {
1126 let position_id = pos.id;
1127 for client_order_id in order.linked_order_ids().unwrap_or_default() {
1128 let mut cache = self.cache.borrow_mut();
1129 let contingent_order = cache.mut_order(client_order_id);
1130 if let Some(contingent_order) = contingent_order
1131 && contingent_order.position_id().is_none()
1132 {
1133 contingent_order.set_position_id(Some(position_id));
1134
1135 if let Err(e) = self.cache.borrow_mut().add_position_id(
1136 &position_id,
1137 &contingent_order.instrument_id().venue,
1138 &contingent_order.client_order_id(),
1139 &contingent_order.strategy_id(),
1140 ) {
1141 log::error!("Failed to add position ID: {e}");
1142 }
1143 }
1144 }
1145 }
1146 }
1149 }
1150
1151 fn handle_position_update(
1155 &mut self,
1156 instrument: InstrumentAny,
1157 fill: OrderFilled,
1158 oms_type: OmsType,
1159 ) {
1160 let position_id = if let Some(position_id) = fill.position_id {
1161 position_id
1162 } else {
1163 log::error!("Cannot handle position update: no position ID found for fill {fill}");
1164 return;
1165 };
1166
1167 let position_opt = self.cache.borrow().position(&position_id).cloned();
1168
1169 match position_opt {
1170 None => {
1171 if self.open_position(instrument, None, fill, oms_type).is_ok() {
1173 }
1175 }
1176 Some(pos) if pos.is_closed() => {
1177 if self
1179 .open_position(instrument, Some(&pos), fill, oms_type)
1180 .is_ok()
1181 {
1182 }
1184 }
1185 Some(mut pos) => {
1186 if self.will_flip_position(&pos, fill) {
1187 self.flip_position(instrument, &mut pos, fill, oms_type);
1189 } else {
1190 self.update_position(&mut pos, fill);
1192 }
1193 }
1194 }
1195 }
1196
1197 fn open_position(
1198 &self,
1199 instrument: InstrumentAny,
1200 position: Option<&Position>,
1201 fill: OrderFilled,
1202 oms_type: OmsType,
1203 ) -> anyhow::Result<()> {
1204 if let Some(position) = position {
1205 if Self::is_duplicate_closed_fill(position, &fill) {
1206 log::warn!(
1207 "Ignoring duplicate fill {} for closed position {}; no position reopened (side={:?}, qty={}, px={})",
1208 fill.trade_id,
1209 position.id,
1210 fill.order_side,
1211 fill.last_qty,
1212 fill.last_px
1213 );
1214 return Ok(());
1215 }
1216 self.reopen_position(position, oms_type)?;
1217 }
1218
1219 let position = Position::new(&instrument, fill);
1220 self.cache
1221 .borrow_mut()
1222 .add_position(position.clone(), oms_type)?; if self.config.snapshot_positions {
1225 self.create_position_state_snapshot(&position);
1226 }
1227
1228 let ts_init = self.clock.borrow().timestamp_ns();
1229 let event = PositionOpened::create(&position, &fill, UUID4::new(), ts_init);
1230 let topic = switchboard::get_event_positions_topic(event.strategy_id);
1231 msgbus::publish(topic, &PositionEvent::PositionOpened(event));
1232
1233 Ok(())
1234 }
1235
1236 fn is_duplicate_closed_fill(position: &Position, fill: &OrderFilled) -> bool {
1237 position.events.iter().any(|event| {
1238 event.trade_id == fill.trade_id
1239 && event.order_side == fill.order_side
1240 && event.last_px == fill.last_px
1241 && event.last_qty == fill.last_qty
1242 })
1243 }
1244
1245 fn reopen_position(&self, position: &Position, oms_type: OmsType) -> anyhow::Result<()> {
1246 if oms_type == OmsType::Netting {
1247 if position.is_open() {
1248 anyhow::bail!(
1249 "Cannot reopen position {} (oms_type=NETTING): reopening is only valid for closed positions in NETTING mode",
1250 position.id
1251 );
1252 }
1253 self.cache.borrow_mut().snapshot_position(position)?;
1255 } else {
1256 log::warn!(
1258 "Received fill for closed position {} in HEDGING mode; creating new position and ignoring previous state",
1259 position.id
1260 );
1261 }
1262 Ok(())
1263 }
1264
1265 fn update_position(&self, position: &mut Position, fill: OrderFilled) {
1266 position.apply(&fill);
1268
1269 let is_closed = position.is_closed();
1271
1272 if let Err(e) = self.cache.borrow_mut().update_position(position) {
1274 log::error!("Failed to update position: {e:?}");
1275 return;
1276 }
1277
1278 let cache = self.cache.borrow();
1280
1281 drop(cache);
1282
1283 if self.config.snapshot_positions {
1285 self.create_position_state_snapshot(position);
1286 }
1287
1288 let topic = switchboard::get_event_positions_topic(position.strategy_id);
1290 let ts_init = self.clock.borrow().timestamp_ns();
1291
1292 if is_closed {
1293 let event = PositionClosed::create(position, &fill, UUID4::new(), ts_init);
1294 msgbus::publish(topic, &PositionEvent::PositionClosed(event));
1295 } else {
1296 let event = PositionChanged::create(position, &fill, UUID4::new(), ts_init);
1297 msgbus::publish(topic, &PositionEvent::PositionChanged(event));
1298 }
1299 }
1300
1301 fn will_flip_position(&self, position: &Position, fill: OrderFilled) -> bool {
1302 position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
1303 }
1304
1305 fn flip_position(
1306 &mut self,
1307 instrument: InstrumentAny,
1308 position: &mut Position,
1309 fill: OrderFilled,
1310 oms_type: OmsType,
1311 ) {
1312 let difference = match position.side {
1313 PositionSide::Long => Quantity::from_raw(
1314 fill.last_qty.raw - position.quantity.raw,
1315 position.size_precision,
1316 ),
1317 PositionSide::Short => Quantity::from_raw(
1318 position.quantity.raw.abs_diff(fill.last_qty.raw), position.size_precision,
1320 ),
1321 _ => fill.last_qty,
1322 };
1323
1324 let fill_percent = position.quantity.as_f64() / fill.last_qty.as_f64();
1326 let (commission1, commission2) = if let Some(commission) = fill.commission {
1327 let commission_currency = commission.currency;
1328 let commission1 = Money::new(commission * fill_percent, commission_currency);
1329 let commission2 = commission - commission1;
1330 (Some(commission1), Some(commission2))
1331 } else {
1332 log::error!("Commission is not available.");
1333 (None, None)
1334 };
1335
1336 let mut fill_split1: Option<OrderFilled> = None;
1337 if position.is_open() {
1338 fill_split1 = Some(OrderFilled::new(
1339 fill.trader_id,
1340 fill.strategy_id,
1341 fill.instrument_id,
1342 fill.client_order_id,
1343 fill.venue_order_id,
1344 fill.account_id,
1345 fill.trade_id,
1346 fill.order_side,
1347 fill.order_type,
1348 position.quantity,
1349 fill.last_px,
1350 fill.currency,
1351 fill.liquidity_side,
1352 UUID4::new(),
1353 fill.ts_event,
1354 fill.ts_init,
1355 fill.reconciliation,
1356 fill.position_id,
1357 commission1,
1358 ));
1359
1360 self.update_position(position, fill_split1.unwrap());
1361
1362 if oms_type == OmsType::Netting
1364 && let Err(e) = self.cache.borrow_mut().snapshot_position(position)
1365 {
1366 log::error!("Failed to snapshot position during flip: {e:?}");
1367 }
1368 }
1369
1370 if difference.raw == 0 {
1372 log::warn!(
1373 "Zero fill size during position flip calculation, this could be caused by a mismatch between instrument `size_precision` and a quantity `size_precision`"
1374 );
1375 return;
1376 }
1377
1378 let position_id_flip = if oms_type == OmsType::Hedging
1379 && let Some(position_id) = fill.position_id
1380 && position_id.is_virtual()
1381 {
1382 Some(self.pos_id_generator.generate(fill.strategy_id, true))
1384 } else {
1385 fill.position_id
1387 };
1388
1389 let fill_split2 = OrderFilled::new(
1390 fill.trader_id,
1391 fill.strategy_id,
1392 fill.instrument_id,
1393 fill.client_order_id,
1394 fill.venue_order_id,
1395 fill.account_id,
1396 fill.trade_id,
1397 fill.order_side,
1398 fill.order_type,
1399 difference,
1400 fill.last_px,
1401 fill.currency,
1402 fill.liquidity_side,
1403 UUID4::new(),
1404 fill.ts_event,
1405 fill.ts_init,
1406 fill.reconciliation,
1407 position_id_flip,
1408 commission2,
1409 );
1410
1411 if oms_type == OmsType::Hedging
1412 && let Some(position_id) = fill.position_id
1413 && position_id.is_virtual()
1414 {
1415 log::warn!("Closing position {fill_split1:?}");
1416 log::warn!("Flipping position {fill_split2:?}");
1417 }
1418
1419 if let Err(e) = self.open_position(instrument, None, fill_split2, oms_type) {
1421 log::error!("Failed to open flipped position: {e:?}");
1422 }
1423 }
1424
1425 fn set_position_id_counts(&mut self) {
1428 let cache = self.cache.borrow();
1430 let positions = cache.positions(None, None, None, None);
1431
1432 let mut counts: HashMap<StrategyId, usize> = HashMap::new();
1434
1435 for position in positions {
1436 *counts.entry(position.strategy_id).or_insert(0) += 1;
1437 }
1438
1439 self.pos_id_generator.reset();
1440
1441 for (strategy_id, count) in counts {
1442 self.pos_id_generator.set_count(count, strategy_id);
1443 log::info!("Set PositionId count for {strategy_id} to {count}");
1444 }
1445 }
1446
1447 fn last_px_for_conversion(
1448 &self,
1449 instrument_id: &InstrumentId,
1450 side: OrderSide,
1451 ) -> Option<Price> {
1452 let cache = self.cache.borrow();
1453
1454 if let Some(trade) = cache.trade(instrument_id) {
1456 return Some(trade.price);
1457 }
1458
1459 if let Some(quote) = cache.quote(instrument_id) {
1461 match side {
1462 OrderSide::Buy => Some(quote.ask_price),
1463 OrderSide::Sell => Some(quote.bid_price),
1464 OrderSide::NoOrderSide => None,
1465 }
1466 } else {
1467 None
1468 }
1469 }
1470
1471 fn set_order_base_qty(&self, order: &mut OrderAny, base_qty: Quantity) {
1472 log::info!(
1473 "Setting {} order quote quantity {} to base quantity {}",
1474 order.instrument_id(),
1475 order.quantity(),
1476 base_qty
1477 );
1478
1479 let original_qty = order.quantity();
1480 order.set_quantity(base_qty);
1481 order.set_leaves_qty(base_qty);
1482 order.set_is_quote_quantity(false);
1483
1484 if matches!(order.contingency_type(), Some(ContingencyType::Oto)) {
1485 return;
1486 }
1487
1488 if let Some(linked_order_ids) = order.linked_order_ids() {
1489 for client_order_id in linked_order_ids {
1490 match self.cache.borrow_mut().mut_order(client_order_id) {
1491 Some(contingent_order) => {
1492 if !contingent_order.is_quote_quantity() {
1493 continue; }
1495
1496 if contingent_order.quantity() != original_qty {
1497 log::warn!(
1498 "Contingent order quantity {} was not equal to the OTO parent original quantity {} when setting to base quantity of {}",
1499 contingent_order.quantity(),
1500 original_qty,
1501 base_qty
1502 );
1503 }
1504
1505 log::info!(
1506 "Setting {} order quote quantity {} to base quantity {}",
1507 contingent_order.instrument_id(),
1508 contingent_order.quantity(),
1509 base_qty
1510 );
1511
1512 contingent_order.set_quantity(base_qty);
1513 contingent_order.set_leaves_qty(base_qty);
1514 contingent_order.set_is_quote_quantity(false);
1515 }
1516 None => {
1517 log::error!("Contingency order {client_order_id} not found");
1518 }
1519 }
1520 }
1521 } else {
1522 log::warn!(
1523 "No linked order IDs found for order {}",
1524 order.client_order_id()
1525 );
1526 }
1527 }
1528
1529 fn deny_order(&self, order: &OrderAny, reason: &str) {
1530 log::error!(
1531 "Order denied: {reason}, order ID: {}",
1532 order.client_order_id()
1533 );
1534
1535 let denied = OrderDenied::new(
1536 order.trader_id(),
1537 order.strategy_id(),
1538 order.instrument_id(),
1539 order.client_order_id(),
1540 reason.into(),
1541 UUID4::new(),
1542 self.clock.borrow().timestamp_ns(),
1543 self.clock.borrow().timestamp_ns(),
1544 );
1545
1546 let mut order = order.clone();
1547
1548 if let Err(e) = order.apply(OrderEventAny::Denied(denied)) {
1549 log::error!("Failed to apply denied event to order: {e}");
1550 return;
1551 }
1552
1553 if let Err(e) = self.cache.borrow_mut().update_order(&order) {
1554 log::error!("Failed to update order in cache: {e}");
1555 return;
1556 }
1557
1558 let topic = switchboard::get_event_orders_topic(order.strategy_id());
1559 msgbus::publish(topic, &OrderEventAny::Denied(denied));
1560
1561 if self.config.snapshot_orders {
1562 self.create_order_state_snapshot(&order);
1563 }
1564 }
1565
1566 fn get_or_init_own_order_book(&self, instrument_id: &InstrumentId) -> RefMut<'_, OwnOrderBook> {
1567 let mut cache = self.cache.borrow_mut();
1568 if cache.own_order_book_mut(instrument_id).is_none() {
1569 let own_book = OwnOrderBook::new(*instrument_id);
1570 cache.add_own_order_book(own_book).unwrap();
1571 }
1572
1573 RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
1574 }
1575}