1#![allow(dead_code)]
18#![allow(unused_variables)]
19
20use std::{
21 any::Any,
22 cell::RefCell,
23 cmp::min,
24 fmt::Debug,
25 ops::{Add, Sub},
26 rc::Rc,
27};
28
29use ahash::AHashMap;
30use chrono::TimeDelta;
31use nautilus_common::{
32 cache::Cache,
33 clock::Clock,
34 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
35 msgbus,
36};
37use nautilus_core::{UUID4, UnixNanos};
38use nautilus_model::{
39 data::{Bar, BarType, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick, order::BookOrder},
40 enums::{
41 AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
42 LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OrderSide, OrderSideSpecified,
43 OrderStatus, OrderType, PriceType, TimeInForce,
44 },
45 events::{
46 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
47 OrderFilled, OrderModifyRejected, OrderRejected, OrderTriggered, OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{EXPIRING_INSTRUMENT_CLASSES, Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{Order, OrderAny, PassiveOrderAny, StopOrderAny},
56 position::Position,
57 types::{
58 Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
59 quantity::QuantityRaw,
60 },
61};
62use ustr::Ustr;
63
64use crate::{
65 matching_core::OrderMatchingCore,
66 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
67 models::{
68 fee::{FeeModel, FeeModelAny},
69 fill::FillModel,
70 },
71 protection::protection_price_calculate,
72 trailing::trailing_stop_calculate,
73};
74
/// Order matching engine state for a single instrument.
///
/// Bundles the instrument definition, its order book, the matching core that
/// tracks bid/ask/last and resting orders, the fill and fee models, and the
/// bookkeeping used while processing market data and order commands.
pub struct OrderMatchingEngine {
    /// Venue taken from the instrument ID at construction.
    pub venue: Venue,
    /// Instrument this engine matches orders for.
    pub instrument: InstrumentAny,
    /// Raw numeric ID supplied at construction and handed to the IDs generator.
    pub raw_id: u32,
    /// Book type the internal order book was created with; it also decides
    /// which market data kinds update the book.
    pub book_type: BookType,
    /// Order management system type (consulted when resolving netting positions).
    pub oms_type: OmsType,
    /// Account type (consulted by the short-selling check in `process_order`).
    pub account_type: AccountType,
    /// Current market status; starts as `Open` and is changed by `process_status`.
    pub market_status: MarketStatus,
    /// Engine configuration flags.
    pub config: OrderMatchingEngineConfig,
    /// Shared clock, read for the current timestamp.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache, read for orders and positions.
    cache: Rc<RefCell<Cache>>,
    /// Order book maintained from incoming market data.
    book: OrderBook,
    /// Matching core holding bid/ask/last prices and resting passive orders.
    pub core: OrderMatchingCore,
    /// Fill model; replaceable through `set_fill_model`.
    fill_model: FillModel,
    /// Fee model supplied at construction.
    fee_model: FeeModelAny,
    /// Target bid price; only reset to `None` in the code visible here.
    target_bid: Option<Price>,
    /// Target ask price; only reset to `None` in the code visible here.
    target_ask: Option<Price>,
    /// Target last price; only reset to `None` in the code visible here.
    target_last: Option<Price>,
    /// Most recent BID bar waiting to be paired with an ASK bar.
    last_bar_bid: Option<Bar>,
    /// Most recent ASK bar waiting to be paired with a BID bar.
    last_bar_ask: Option<Bar>,
    /// Bar type currently selected for bar execution, per instrument.
    execution_bar_types: AHashMap<InstrumentId, BarType>,
    /// Cached time delta of each bar type seen by `process_bar`.
    execution_bar_deltas: AHashMap<BarType, TimeDelta>,
    /// Account ID recorded per trader when an order is processed.
    account_ids: AHashMap<TraderId, AccountId>,
    /// Filled quantity cached per client order ID.
    // NOTE(review): writers are outside this view - confirm exact semantics.
    cached_filled_qty: AHashMap<ClientOrderId, Quantity>,
    /// Generator for venue-side identifiers (e.g. trade IDs).
    ids_generator: IdsGenerator,
    /// Size of the trade currently being processed when trade execution is enabled.
    last_trade_size: Option<Quantity>,
    /// Per bid price level: (book size snapshot, quantity consumed), both raw.
    bid_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Per ask price level: (book size snapshot, quantity consumed), both raw.
    ask_consumption: AHashMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
    /// Raw quantity consumed against the current trade; zeroed around trade processing.
    trade_consumption: QuantityRaw,
}
114
115impl Debug for OrderMatchingEngine {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 f.debug_struct(stringify!(OrderMatchingEngine))
118 .field("venue", &self.venue)
119 .field("instrument", &self.instrument.id())
120 .finish()
121 }
122}
123
124impl OrderMatchingEngine {
125 #[allow(clippy::too_many_arguments)]
127 pub fn new(
128 instrument: InstrumentAny,
129 raw_id: u32,
130 fill_model: FillModel,
131 fee_model: FeeModelAny,
132 book_type: BookType,
133 oms_type: OmsType,
134 account_type: AccountType,
135 clock: Rc<RefCell<dyn Clock>>,
136 cache: Rc<RefCell<Cache>>,
137 config: OrderMatchingEngineConfig,
138 ) -> Self {
139 let book = OrderBook::new(instrument.id(), book_type);
140 let core = OrderMatchingCore::new(
141 instrument.id(),
142 instrument.price_increment(),
143 None, None, None, );
147 let ids_generator = IdsGenerator::new(
148 instrument.id().venue,
149 oms_type,
150 raw_id,
151 config.use_random_ids,
152 config.use_position_ids,
153 cache.clone(),
154 );
155
156 Self {
157 venue: instrument.id().venue,
158 instrument,
159 raw_id,
160 fill_model,
161 fee_model,
162 book_type,
163 oms_type,
164 account_type,
165 clock,
166 cache,
167 book,
168 core,
169 market_status: MarketStatus::Open,
170 config,
171 target_bid: None,
172 target_ask: None,
173 target_last: None,
174 last_bar_bid: None,
175 last_bar_ask: None,
176 execution_bar_types: AHashMap::new(),
177 execution_bar_deltas: AHashMap::new(),
178 account_ids: AHashMap::new(),
179 cached_filled_qty: AHashMap::new(),
180 ids_generator,
181 last_trade_size: None,
182 bid_consumption: AHashMap::new(),
183 ask_consumption: AHashMap::new(),
184 trade_consumption: 0,
185 }
186 }
187
188 pub fn reset(&mut self) {
194 self.book.clear(0, UnixNanos::default());
195 self.execution_bar_types.clear();
196 self.execution_bar_deltas.clear();
197 self.account_ids.clear();
198 self.cached_filled_qty.clear();
199 self.core.reset();
200 self.target_bid = None;
201 self.target_ask = None;
202 self.target_last = None;
203 self.last_trade_size = None;
204 self.bid_consumption.clear();
205 self.ask_consumption.clear();
206 self.trade_consumption = 0;
207 self.ids_generator.reset();
208
209 log::info!("Reset {}", self.instrument.id());
210 }
211
212 fn apply_liquidity_consumption(
213 &mut self,
214 fills: Vec<(Price, Quantity)>,
215 order_side: OrderSide,
216 leaves_qty: Quantity,
217 ) -> Vec<(Price, Quantity)> {
218 if !self.config.liquidity_consumption {
219 return fills;
220 }
221
222 let consumption = match order_side {
223 OrderSide::Buy => &mut self.ask_consumption,
224 OrderSide::Sell => &mut self.bid_consumption,
225 _ => return fills,
226 };
227
228 let mut adjusted_fills = Vec::with_capacity(fills.len());
229 let mut remaining_qty = leaves_qty.raw;
230
231 for (price, qty) in fills {
232 if remaining_qty == 0 {
233 break;
234 }
235
236 let price_raw = price.raw;
237 let book_size_f64 = self.book.get_quantity_for_price(price, order_side);
238 let book_size_raw = (book_size_f64 * 10f64.powi(FIXED_PRECISION as i32)) as QuantityRaw;
239
240 let (original_size, consumed) =
241 consumption.entry(price_raw).or_insert((book_size_raw, 0));
242
243 if *original_size != book_size_raw {
245 *original_size = book_size_raw;
246 *consumed = 0;
247 }
248
249 let available = original_size.saturating_sub(*consumed);
250 if available == 0 {
251 continue;
252 }
253
254 let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
255 if adjusted_qty_raw == 0 {
256 continue;
257 }
258
259 *consumed += adjusted_qty_raw;
260 remaining_qty -= adjusted_qty_raw;
261
262 let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
263 adjusted_fills.push((price, adjusted_qty));
264 }
265
266 adjusted_fills
267 }
268
    /// Replaces the engine's fill model with `fill_model`.
    pub const fn set_fill_model(&mut self, fill_model: FillModel) {
        self.fill_model = fill_model;
    }
273
    /// Returns the best bid price reported by the order book, if any.
    #[must_use]
    pub fn best_bid_price(&self) -> Option<Price> {
        self.book.best_bid_price()
    }
279
    /// Returns the best ask price reported by the order book, if any.
    #[must_use]
    pub fn best_ask_price(&self) -> Option<Price> {
        self.book.best_ask_price()
    }
285
    /// Returns a reference to the engine's internal order book.
    #[must_use]
    pub const fn get_book(&self) -> &OrderBook {
        &self.book
    }
291
    /// Returns the bid-side passive orders held by the matching core.
    #[must_use]
    pub const fn get_open_bid_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_bid()
    }
297
    /// Returns the ask-side passive orders held by the matching core.
    #[must_use]
    pub const fn get_open_ask_orders(&self) -> &[PassiveOrderAny] {
        self.core.get_orders_ask()
    }
303
304 #[must_use]
305 pub fn get_open_orders(&self) -> Vec<PassiveOrderAny> {
307 let mut orders = Vec::new();
309 orders.extend_from_slice(self.core.get_orders_bid());
310 orders.extend_from_slice(self.core.get_orders_ask());
311 orders
312 }
313
    /// Returns whether the matching core reports an order with `client_order_id`.
    #[must_use]
    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
        self.core.order_exists(client_order_id)
    }
319
320 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
330 log::debug!("Processing {delta}");
331
332 if matches!(delta.action, BookAction::Add | BookAction::Update) {
334 let price_prec = self.instrument.price_precision();
335 let size_prec = self.instrument.size_precision();
336 let instrument_id = self.instrument.id();
337
338 if delta.order.price.precision != price_prec {
339 anyhow::bail!(
340 "Invalid delta order price precision {prec}, expected {price_prec} for {instrument_id}",
341 prec = delta.order.price.precision
342 );
343 }
344 if delta.order.size.precision != size_prec {
345 anyhow::bail!(
346 "Invalid delta order size precision {prec}, expected {size_prec} for {instrument_id}",
347 prec = delta.order.size.precision
348 );
349 }
350 }
351
352 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
353 self.book.apply_delta(delta)?;
354 }
355
356 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
357 Ok(())
358 }
359
360 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
368 log::debug!("Processing {deltas}");
369
370 let price_prec = self.instrument.price_precision();
372 let size_prec = self.instrument.size_precision();
373 let instrument_id = self.instrument.id();
374
375 for delta in &deltas.deltas {
376 if matches!(delta.action, BookAction::Add | BookAction::Update) {
377 if delta.order.price.precision != price_prec {
378 anyhow::bail!(
379 "Invalid delta order price precision {prec}, expected {price_prec} for {instrument_id}",
380 prec = delta.order.price.precision
381 );
382 }
383 if delta.order.size.precision != size_prec {
384 anyhow::bail!(
385 "Invalid delta order size precision {prec}, expected {size_prec} for {instrument_id}",
386 prec = delta.order.size.precision
387 );
388 }
389 }
390 }
391
392 if self.book_type == BookType::L2_MBP || self.book_type == BookType::L3_MBO {
393 self.book.apply_deltas(deltas)?;
394 }
395
396 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
397 Ok(())
398 }
399
400 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
406 log::debug!("Processing {quote}");
407
408 let price_prec = self.instrument.price_precision();
409 let size_prec = self.instrument.size_precision();
410 let instrument_id = self.instrument.id();
411
412 assert!(
413 quote.bid_price.precision == price_prec,
414 "Invalid bid_price precision {}, expected {price_prec} for {instrument_id}",
415 quote.bid_price.precision
416 );
417 assert!(
418 quote.ask_price.precision == price_prec,
419 "Invalid ask_price precision {}, expected {price_prec} for {instrument_id}",
420 quote.ask_price.precision
421 );
422 assert!(
423 quote.bid_size.precision == size_prec,
424 "Invalid bid_size precision {}, expected {size_prec} for {instrument_id}",
425 quote.bid_size.precision
426 );
427 assert!(
428 quote.ask_size.precision == size_prec,
429 "Invalid ask_size precision {}, expected {size_prec} for {instrument_id}",
430 quote.ask_size.precision
431 );
432
433 if self.book_type == BookType::L1_MBP {
434 self.book.update_quote_tick(quote).unwrap();
435 }
436
437 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
438 }
439
440 pub fn process_bar(&mut self, bar: &Bar) {
451 log::debug!("Processing {bar}");
452
453 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
455 return;
456 }
457
458 let bar_type = bar.bar_type;
459 if bar_type.aggregation_source() == AggregationSource::Internal {
461 return;
462 }
463
464 let price_prec = self.instrument.price_precision();
465 let size_prec = self.instrument.size_precision();
466 let instrument_id = self.instrument.id();
467
468 assert!(
469 bar.open.precision == price_prec,
470 "Invalid bar open precision {}, expected {price_prec} for {instrument_id}",
471 bar.open.precision
472 );
473 assert!(
474 bar.high.precision == price_prec,
475 "Invalid bar high precision {}, expected {price_prec} for {instrument_id}",
476 bar.high.precision
477 );
478 assert!(
479 bar.low.precision == price_prec,
480 "Invalid bar low precision {}, expected {price_prec} for {instrument_id}",
481 bar.low.precision
482 );
483 assert!(
484 bar.close.precision == price_prec,
485 "Invalid bar close precision {}, expected {price_prec} for {instrument_id}",
486 bar.close.precision
487 );
488 assert!(
489 bar.volume.precision == size_prec,
490 "Invalid bar volume precision {}, expected {size_prec} for {instrument_id}",
491 bar.volume.precision
492 );
493
494 let execution_bar_type =
495 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
496 execution_bar_type.to_owned()
497 } else {
498 self.execution_bar_types
499 .insert(bar.instrument_id(), bar_type);
500 self.execution_bar_deltas
501 .insert(bar_type, bar_type.spec().timedelta());
502 bar_type
503 };
504
505 if execution_bar_type != bar_type {
506 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
507 if bar_type_timedelta.is_none() {
508 bar_type_timedelta = Some(bar_type.spec().timedelta());
509 self.execution_bar_deltas
510 .insert(bar_type, bar_type_timedelta.unwrap());
511 }
512 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
513 >= &bar_type_timedelta.unwrap()
514 {
515 self.execution_bar_types
516 .insert(bar_type.instrument_id(), bar_type);
517 } else {
518 return;
519 }
520 }
521
522 match bar_type.spec().price_type {
523 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
524 PriceType::Bid => {
525 self.last_bar_bid = Some(bar.to_owned());
526 self.process_quote_ticks_from_bar(bar);
527 }
528 PriceType::Ask => {
529 self.last_bar_ask = Some(bar.to_owned());
530 self.process_quote_ticks_from_bar(bar);
531 }
532 PriceType::Mark => panic!("Not implemented"),
533 }
534 }
535
    /// Expands a bar into up to four synthetic trade ticks (open, high, low,
    /// close) and runs the matching core after each one.
    ///
    /// Each step is conditional on the core's current last price, which is
    /// updated after every emitted tick, so the statement order matters.
    ///
    /// # Panics
    ///
    /// Panics if updating the book with a synthetic trade tick fails, or if
    /// the core reports last as initialized while `core.last` is `None`.
    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
        // Split the bar volume into quarters; the close tick also carries the remainder.
        let quarter_raw = bar.volume.raw / 4;
        let remainder_raw = bar.volume.raw % 4;
        let size = Quantity::from_raw(quarter_raw, bar.volume.precision);
        let close_size = Quantity::from_raw(quarter_raw + remainder_raw, bar.volume.precision);

        // Buyer when there is no last price yet or the open is above it, seller otherwise.
        let aggressor_side = if !self.core.is_last_initialized || bar.open > self.core.last.unwrap()
        {
            AggressorSide::Buyer
        } else {
            AggressorSide::Seller
        };

        // Template tick at the open price; a trade ID is generated here even
        // if the open tick ends up not being processed below.
        let mut trade_tick = TradeTick::new(
            bar.instrument_id(),
            bar.open,
            size,
            aggressor_side,
            self.ids_generator.generate_trade_id(),
            bar.ts_init,
            bar.ts_init,
        );

        // Open: only processed when no last price has been set yet.
        if !self.core.is_last_initialized {
            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
            self.core.set_last_raw(trade_tick.price);
        }

        // High: processed when above the current last price.
        if self.core.last.is_some_and(|last| bar.high > last) {
            trade_tick.price = bar.high;
            trade_tick.aggressor_side = AggressorSide::Buyer;
            trade_tick.trade_id = self.ids_generator.generate_trade_id();

            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);

            self.core.set_last_raw(trade_tick.price);
        }

        // Low: processed when below the current last price.
        if self.core.last.is_some_and(|last| bar.low < last) {
            trade_tick.price = bar.low;
            trade_tick.aggressor_side = AggressorSide::Seller;
            trade_tick.trade_id = self.ids_generator.generate_trade_id();

            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);

            self.core.set_last_raw(trade_tick.price);
        }

        // Close: processed when different from the current last price, using
        // the quarter size plus the remainder.
        if self.core.last.is_some_and(|last| bar.close != last) {
            trade_tick.price = bar.close;
            trade_tick.size = close_size;
            if bar.close > self.core.last.unwrap() {
                trade_tick.aggressor_side = AggressorSide::Buyer;
            } else {
                trade_tick.aggressor_side = AggressorSide::Seller;
            }
            trade_tick.trade_id = self.ids_generator.generate_trade_id();

            self.book.update_trade_tick(&trade_tick).unwrap();
            self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);

            self.core.set_last_raw(trade_tick.price);
        }
    }
616
617 fn process_quote_ticks_from_bar(&mut self, bar: &Bar) {
618 if self.last_bar_bid.is_none()
620 || self.last_bar_ask.is_none()
621 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
622 {
623 return;
624 }
625 let bid_bar = self.last_bar_bid.unwrap();
626 let ask_bar = self.last_bar_ask.unwrap();
627
628 let bid_quarter = bid_bar.volume.raw / 4;
630 let bid_remainder = bid_bar.volume.raw % 4;
631 let ask_quarter = ask_bar.volume.raw / 4;
632 let ask_remainder = ask_bar.volume.raw % 4;
633
634 let bid_size = Quantity::from_raw(bid_quarter, bar.volume.precision);
635 let ask_size = Quantity::from_raw(ask_quarter, bar.volume.precision);
636 let bid_close_size = Quantity::from_raw(bid_quarter + bid_remainder, bar.volume.precision);
637 let ask_close_size = Quantity::from_raw(ask_quarter + ask_remainder, bar.volume.precision);
638
639 let mut quote_tick = QuoteTick::new(
641 self.book.instrument_id,
642 bid_bar.open,
643 ask_bar.open,
644 bid_size,
645 ask_size,
646 bid_bar.ts_init,
647 bid_bar.ts_init,
648 );
649
650 self.book.update_quote_tick("e_tick).unwrap();
652 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
653
654 quote_tick.bid_price = bid_bar.high;
656 quote_tick.ask_price = ask_bar.high;
657 self.book.update_quote_tick("e_tick).unwrap();
658 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
659
660 quote_tick.bid_price = bid_bar.low;
662 quote_tick.ask_price = ask_bar.low;
663 self.book.update_quote_tick("e_tick).unwrap();
664 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
665
666 quote_tick.bid_price = bid_bar.close;
668 quote_tick.ask_price = ask_bar.close;
669 quote_tick.bid_size = bid_close_size;
670 quote_tick.ask_size = ask_close_size;
671 self.book.update_quote_tick("e_tick).unwrap();
672 self.iterate(quote_tick.ts_init, AggressorSide::NoAggressor);
673
674 self.last_bar_bid = None;
676 self.last_bar_ask = None;
677 }
678
    /// Processes a trade tick.
    ///
    /// The trade updates the book only for an `L1_MBP` book and always sets
    /// the core's last price. When `config.trade_execution` is enabled, the
    /// core's bid/ask are adjusted around the trade price before iterating,
    /// and any transient override is restored afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the trade's price or size precision differs from the
    /// instrument's, or if updating the book with the trade fails.
    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
        log::debug!("Processing {trade}");

        let price_prec = self.instrument.price_precision();
        let size_prec = self.instrument.size_precision();
        let instrument_id = self.instrument.id();

        assert!(
            trade.price.precision == price_prec,
            "Invalid trade price precision {}, expected {price_prec} for {instrument_id}",
            trade.price.precision
        );
        assert!(
            trade.size.precision == size_prec,
            "Invalid trade size precision {}, expected {size_prec} for {instrument_id}",
            trade.size.precision
        );

        if self.book_type == BookType::L1_MBP {
            self.book.update_trade_tick(trade).unwrap();
        }

        let price_raw = trade.price.raw;
        self.core.set_last_raw(trade.price);

        // Snapshots of the core's bid/ask, used to undo the transient override below.
        let mut original_bid: Option<Price> = None;
        let mut original_ask: Option<Price> = None;

        // Stays `NoAggressor` unless trade execution is enabled.
        let mut aggressor_side = AggressorSide::NoAggressor;

        if self.config.trade_execution {
            aggressor_side = trade.aggressor_side;

            // Widen bid/ask so the trade price lies within them: the ask is
            // raised when unset or below the trade price, the bid is lowered
            // when unset or above it. Without an aggressor, equality also
            // triggers the update.
            match aggressor_side {
                AggressorSide::Buyer => {
                    if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                        self.core.set_ask_raw(trade.price);
                    }
                    if self.core.bid.is_none()
                        || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                    {
                        self.core.set_bid_raw(trade.price);
                    }
                }
                AggressorSide::Seller => {
                    if self.core.bid.is_none()
                        || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                    {
                        self.core.set_bid_raw(trade.price);
                    }
                    if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
                        self.core.set_ask_raw(trade.price);
                    }
                }
                AggressorSide::NoAggressor => {
                    if self.core.bid.is_none()
                        || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
                    {
                        self.core.set_bid_raw(trade.price);
                    }
                    if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
                        self.core.set_ask_raw(trade.price);
                    }
                }
            }

            original_bid = self.core.bid;
            original_ask = self.core.ask;

            // Transient override for the duration of `iterate`: a seller trade
            // below the ask pulls the ask down to the trade price, a buyer
            // trade above the bid pushes the bid up to it.
            match aggressor_side {
                AggressorSide::Seller => {
                    if original_ask.is_some_and(|ask| price_raw < ask.raw) {
                        self.core.set_ask_raw(trade.price);
                    }
                }
                AggressorSide::Buyer => {
                    if original_bid.is_some_and(|bid| price_raw > bid.raw) {
                        self.core.set_bid_raw(trade.price);
                    }
                }
                AggressorSide::NoAggressor => {}
            }

            // Record the trade size and start consumption tracking from zero.
            self.last_trade_size = Some(trade.size);
            self.trade_consumption = 0;
        }

        self.iterate(trade.ts_init, aggressor_side);

        if self.config.trade_execution {
            self.last_trade_size = None;
            self.trade_consumption = 0;

            // Undo the transient override applied before `iterate`.
            match aggressor_side {
                AggressorSide::Seller => {
                    if let Some(ask) = original_ask
                        && price_raw < ask.raw
                    {
                        self.core.ask = Some(ask);
                    }
                }
                AggressorSide::Buyer => {
                    if let Some(bid) = original_bid
                        && price_raw > bid.raw
                    {
                        self.core.bid = Some(bid);
                    }
                }
                AggressorSide::NoAggressor => {}
            }
        }
    }
803
804 pub fn process_status(&mut self, action: MarketStatusAction) {
806 log::debug!("Processing {action}");
807
808 if self.market_status == MarketStatus::Closed
810 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
811 {
812 self.market_status = MarketStatus::Open;
813 }
814 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
816 self.market_status = MarketStatus::Paused;
817 }
818 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
820 self.market_status = MarketStatus::Suspended;
821 }
822 if self.market_status == MarketStatus::Open
824 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
825 {
826 self.market_status = MarketStatus::Closed;
827 }
828 }
829
830 #[allow(clippy::needless_return)]
841 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
842 {
844 let cache_borrow = self.cache.as_ref().borrow();
845
846 if self.core.order_exists(order.client_order_id()) {
847 self.generate_order_rejected(order, "Order already exists".into());
848 return;
849 }
850
851 self.account_ids.insert(order.trader_id(), account_id);
853
854 if EXPIRING_INSTRUMENT_CLASSES.contains(&self.instrument.instrument_class()) {
856 if let Some(activation_ns) = self.instrument.activation_ns()
857 && self.clock.borrow().timestamp_ns() < activation_ns
858 {
859 self.generate_order_rejected(
860 order,
861 format!(
862 "Contract {} is not yet active, activation {}",
863 self.instrument.id(),
864 self.instrument.activation_ns().unwrap()
865 )
866 .into(),
867 );
868 return;
869 }
870 if let Some(expiration_ns) = self.instrument.expiration_ns()
871 && self.clock.borrow().timestamp_ns() >= expiration_ns
872 {
873 self.generate_order_rejected(
874 order,
875 format!(
876 "Contract {} has expired, expiration {}",
877 self.instrument.id(),
878 self.instrument.expiration_ns().unwrap()
879 )
880 .into(),
881 );
882 return;
883 }
884 }
885
886 if self.config.support_contingent_orders {
888 if let Some(parent_order_id) = order.parent_order_id() {
889 let parent_order = cache_borrow.order(&parent_order_id);
890 if parent_order.is_none()
891 || parent_order.unwrap().contingency_type().unwrap() != ContingencyType::Oto
892 {
893 panic!("OTO parent not found");
894 }
895 if let Some(parent_order) = parent_order {
896 let parent_order_status = parent_order.status();
897 let order_is_open = order.is_open();
898 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
899 self.generate_order_rejected(
900 order,
901 format!("Rejected OTO order from {parent_order_id}").into(),
902 );
903 return;
904 } else if parent_order.status() == OrderStatus::Accepted
905 && parent_order.status() == OrderStatus::Triggered
906 {
907 log::info!(
908 "Pending OTO order {} triggers from {parent_order_id}",
909 order.client_order_id(),
910 );
911 return;
912 }
913 }
914 }
915
916 if let Some(linked_order_ids) = order.linked_order_ids() {
917 for client_order_id in linked_order_ids {
918 match cache_borrow.order(client_order_id) {
919 Some(contingent_order)
920 if (order.contingency_type().unwrap() == ContingencyType::Oco
921 || order.contingency_type().unwrap()
922 == ContingencyType::Ouo)
923 && !order.is_closed()
924 && contingent_order.is_closed() =>
925 {
926 self.generate_order_rejected(
927 order,
928 format!("Contingent order {client_order_id} already closed")
929 .into(),
930 );
931 return;
932 }
933 None => panic!("Cannot find contingent order for {client_order_id}"),
934 _ => {}
935 }
936 }
937 }
938 }
939
940 if order.quantity().precision != self.instrument.size_precision() {
942 self.generate_order_rejected(
943 order,
944 format!(
945 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
946 order.client_order_id(),
947 order.quantity().precision,
948 self.instrument.id(),
949 self.instrument.size_precision()
950 )
951 .into(),
952 );
953 return;
954 }
955
956 if let Some(price) = order.price()
958 && price.precision != self.instrument.price_precision()
959 {
960 self.generate_order_rejected(
961 order,
962 format!(
963 "Invalid order price precision for order {}, was {} when {} price precision is {}",
964 order.client_order_id(),
965 price.precision,
966 self.instrument.id(),
967 self.instrument.price_precision()
968 )
969 .into(),
970 );
971 return;
972 }
973
974 if let Some(trigger_price) = order.trigger_price()
976 && trigger_price.precision != self.instrument.price_precision()
977 {
978 self.generate_order_rejected(
979 order,
980 format!(
981 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
982 order.client_order_id(),
983 trigger_price.precision,
984 self.instrument.id(),
985 self.instrument.price_precision()
986 )
987 .into(),
988 );
989 return;
990 }
991
992 let position: Option<&Position> = cache_borrow
994 .position_for_order(&order.client_order_id())
995 .or_else(|| {
996 if self.oms_type == OmsType::Netting {
997 let position_id = PositionId::new(
998 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
999 );
1000 cache_borrow.position(&position_id)
1001 } else {
1002 None
1003 }
1004 });
1005
1006 if order.order_side() == OrderSide::Sell
1008 && self.account_type != AccountType::Margin
1009 && matches!(self.instrument, InstrumentAny::Equity(_))
1010 && (position.is_none()
1011 || !order.would_reduce_only(position.unwrap().side, position.unwrap().quantity))
1012 {
1013 let position_string = position.map_or("None".to_string(), |pos| pos.id.to_string());
1014 self.generate_order_rejected(
1015 order,
1016 format!(
1017 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
1018 )
1019 .into(),
1020 );
1021 return;
1022 }
1023
1024 if self.config.use_reduce_only
1026 && order.is_reduce_only()
1027 && !order.is_closed()
1028 && position.is_none_or(|pos| {
1029 pos.is_closed()
1030 || (order.is_buy() && pos.is_long())
1031 || (order.is_sell() && pos.is_short())
1032 })
1033 {
1034 self.generate_order_rejected(
1035 order,
1036 format!(
1037 "Reduce-only order {} ({}-{}) would have increased position",
1038 order.client_order_id(),
1039 order.order_type().to_string().to_uppercase(),
1040 order.order_side().to_string().to_uppercase()
1041 )
1042 .into(),
1043 );
1044 return;
1045 }
1046 }
1047
1048 match order.order_type() {
1049 OrderType::Market if self.config.price_protection_points.is_some() => {
1050 self.process_market_order_with_protection(order);
1051 }
1052 OrderType::Market => self.process_market_order(order),
1053 OrderType::Limit => self.process_limit_order(order),
1054 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
1055 OrderType::StopMarket if self.config.price_protection_points.is_some() => {
1056 self.process_stop_market_order_with_protection(order);
1057 }
1058 OrderType::StopMarket => self.process_stop_market_order(order),
1059 OrderType::StopLimit => self.process_stop_limit_order(order),
1060 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
1061 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
1062 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
1063 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
1064 }
1065 }
1066
1067 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
1069 if let Some(order) = self.core.get_order(command.client_order_id) {
1070 self.update_order(
1071 &mut order.to_any(),
1072 command.quantity,
1073 command.price,
1074 command.trigger_price,
1075 None,
1076 );
1077 } else {
1078 self.generate_order_modify_rejected(
1079 command.trader_id,
1080 command.strategy_id,
1081 command.instrument_id,
1082 command.client_order_id,
1083 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1084 command.venue_order_id,
1085 Some(account_id),
1086 );
1087 }
1088 }
1089
1090 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
1092 match self.core.get_order(command.client_order_id) {
1093 Some(passive_order) => {
1094 if passive_order.is_inflight() || passive_order.is_open() {
1095 self.cancel_order(&OrderAny::from(passive_order.to_owned()), None);
1096 }
1097 }
1098 None => self.generate_order_cancel_rejected(
1099 command.trader_id,
1100 command.strategy_id,
1101 account_id,
1102 command.instrument_id,
1103 command.client_order_id,
1104 command.venue_order_id,
1105 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
1106 ),
1107 }
1108 }
1109
1110 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, account_id: AccountId) {
1112 let instrument_id = command.instrument_id;
1113 let open_orders = self
1114 .cache
1115 .borrow()
1116 .orders_open(None, Some(&instrument_id), None, None)
1117 .into_iter()
1118 .cloned()
1119 .collect::<Vec<OrderAny>>();
1120 for order in open_orders {
1121 if command.order_side != OrderSide::NoOrderSide
1122 && command.order_side != order.order_side()
1123 {
1124 continue;
1125 }
1126 if order.is_inflight() || order.is_open() {
1127 self.cancel_order(&order, None);
1128 }
1129 }
1130 }
1131
1132 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
1134 for order in &command.cancels {
1135 self.process_cancel(order, account_id);
1136 }
1137 }
1138
1139 fn process_market_order(&mut self, order: &mut OrderAny) {
1140 if order.time_in_force() == TimeInForce::AtTheOpen
1141 || order.time_in_force() == TimeInForce::AtTheClose
1142 {
1143 log::error!(
1144 "Market auction for the time in force {} is currently not supported",
1145 order.time_in_force()
1146 );
1147 return;
1148 }
1149
1150 let order_side = order.order_side();
1152 let is_ask_initialized = self.core.is_ask_initialized;
1153 let is_bid_initialized = self.core.is_bid_initialized;
1154 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1155 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1156 {
1157 self.generate_order_rejected(
1158 order,
1159 format!("No market for {}", order.instrument_id()).into(),
1160 );
1161 return;
1162 }
1163
1164 self.fill_market_order(order);
1165 }
1166
1167 fn process_market_order_with_protection(&mut self, order: &mut OrderAny) {
1168 if order.time_in_force() == TimeInForce::AtTheOpen
1169 || order.time_in_force() == TimeInForce::AtTheClose
1170 {
1171 log::error!(
1172 "Market auction for the time in force {} is currently not supported",
1173 order.time_in_force()
1174 );
1175 return;
1176 }
1177
1178 let order_side = order.order_side();
1180 let is_ask_initialized = self.core.is_ask_initialized;
1181 let is_bid_initialized = self.core.is_bid_initialized;
1182 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
1183 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
1184 {
1185 self.generate_order_rejected(
1186 order,
1187 format!("No market for {}", order.instrument_id()).into(),
1188 );
1189 return;
1190 }
1191
1192 self.update_protection_price(order);
1193
1194 let protection_price = order
1195 .price()
1196 .expect("Market order with protection must have a protection price");
1197
1198 self.accept_order(order);
1200
1201 if self
1203 .core
1204 .is_limit_matched(order.order_side_specified(), protection_price)
1205 {
1206 if order.liquidity_side().is_some()
1208 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
1209 {
1210 order.set_liquidity_side(LiquiditySide::Taker);
1211 }
1212 self.fill_limit_order(order);
1213 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1214 self.cancel_order(order, None);
1215 }
1216 }
1217
1218 fn process_limit_order(&mut self, order: &mut OrderAny) {
1219 let limit_px = order.price().expect("Limit order must have a price");
1220 if order.is_post_only()
1221 && self
1222 .core
1223 .is_limit_matched(order.order_side_specified(), limit_px)
1224 {
1225 self.generate_order_rejected(
1226 order,
1227 format!(
1228 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
1229 order.order_type(),
1230 order.order_side(),
1231 order.price().unwrap(),
1232 self.core
1233 .bid
1234 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1235 self.core
1236 .ask
1237 .map_or_else(|| "None".to_string(), |p| p.to_string())
1238 )
1239 .into(),
1240 );
1241 return;
1242 }
1243
1244 self.accept_order(order);
1246
1247 if self
1249 .core
1250 .is_limit_matched(order.order_side_specified(), limit_px)
1251 {
1252 if order.liquidity_side().is_some()
1254 && order.liquidity_side().unwrap() == LiquiditySide::NoLiquiditySide
1255 {
1256 order.set_liquidity_side(LiquiditySide::Taker);
1257 }
1258 self.fill_limit_order(order);
1259 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
1260 self.cancel_order(order, None);
1261 }
1262 }
1263
1264 fn process_market_to_limit_order(&mut self, order: &mut OrderAny) {
1265 if (order.order_side() == OrderSide::Buy && !self.core.is_ask_initialized)
1267 || (order.order_side() == OrderSide::Sell && !self.core.is_bid_initialized)
1268 {
1269 self.generate_order_rejected(
1270 order,
1271 format!("No market for {}", order.instrument_id()).into(),
1272 );
1273 return;
1274 }
1275
1276 self.fill_market_order(order);
1278
1279 if order.is_open() {
1280 self.accept_order(order);
1281 }
1282 }
1283
1284 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
1285 let stop_px = order
1286 .trigger_price()
1287 .expect("Stop order must have a trigger price");
1288 if self
1289 .core
1290 .is_stop_matched(order.order_side_specified(), stop_px)
1291 {
1292 if self.config.reject_stop_orders {
1293 self.generate_order_rejected(
1294 order,
1295 format!(
1296 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1297 order.order_type(),
1298 order.order_side(),
1299 order.trigger_price().unwrap(),
1300 self.core
1301 .bid
1302 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1303 self.core
1304 .ask
1305 .map_or_else(|| "None".to_string(), |p| p.to_string())
1306 ).into(),
1307 );
1308 return;
1309 }
1310 self.fill_market_order(order);
1311 return;
1312 }
1313
1314 self.accept_order(order);
1316 }
1317
1318 fn process_stop_market_order_with_protection(&mut self, order: &mut OrderAny) {
1319 let stop_px = order
1320 .trigger_price()
1321 .expect("Stop order must have a trigger price");
1322
1323 let order_side = order.order_side();
1324 let is_ask_initialized = self.core.is_ask_initialized;
1325 let is_bid_initialized = self.core.is_bid_initialized;
1326 if (order_side == OrderSide::Buy && !self.core.is_ask_initialized)
1327 || (order_side == OrderSide::Sell && !self.core.is_bid_initialized)
1328 {
1329 self.generate_order_rejected(
1330 order,
1331 format!("No market for {}", order.instrument_id()).into(),
1332 );
1333 return;
1334 }
1335
1336 self.update_protection_price(order);
1337 let protection_price = order
1338 .price()
1339 .expect("Market order with protection must have a protection price");
1340
1341 if self
1342 .core
1343 .is_stop_matched(order.order_side_specified(), stop_px)
1344 {
1345 if self.config.reject_stop_orders {
1346 self.generate_order_rejected(
1347 order,
1348 format!(
1349 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1350 order.order_type(),
1351 order.order_side(),
1352 order.trigger_price().unwrap(),
1353 self.core
1354 .bid
1355 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1356 self.core
1357 .ask
1358 .map_or_else(|| "None".to_string(), |p| p.to_string())
1359 ).into(),
1360 );
1361 return;
1362 } else {
1363 self.accept_order(order);
1365 }
1366
1367 if self
1368 .core
1369 .is_limit_matched(order.order_side_specified(), protection_price)
1370 {
1371 self.fill_limit_order(order);
1372 }
1373 return;
1374 }
1375 self.accept_order(order);
1377 }
1378
1379 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
1380 let stop_px = order
1381 .trigger_price()
1382 .expect("Stop order must have a trigger price");
1383 if self
1384 .core
1385 .is_stop_matched(order.order_side_specified(), stop_px)
1386 {
1387 if self.config.reject_stop_orders {
1388 self.generate_order_rejected(
1389 order,
1390 format!(
1391 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1392 order.order_type(),
1393 order.order_side(),
1394 order.trigger_price().unwrap(),
1395 self.core
1396 .bid
1397 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1398 self.core
1399 .ask
1400 .map_or_else(|| "None".to_string(), |p| p.to_string())
1401 ).into(),
1402 );
1403 return;
1404 }
1405
1406 self.accept_order(order);
1407 self.generate_order_triggered(order);
1408
1409 let limit_px = order.price().expect("Stop limit order must have a price");
1411 if self
1412 .core
1413 .is_limit_matched(order.order_side_specified(), limit_px)
1414 {
1415 order.set_liquidity_side(LiquiditySide::Taker);
1416 self.fill_limit_order(order);
1417 }
1418 }
1419
1420 self.accept_order(order);
1422 }
1423
1424 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
1425 if self
1426 .core
1427 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1428 {
1429 if self.config.reject_stop_orders {
1430 self.generate_order_rejected(
1431 order,
1432 format!(
1433 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1434 order.order_type(),
1435 order.order_side(),
1436 order.trigger_price().unwrap(),
1437 self.core
1438 .bid
1439 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1440 self.core
1441 .ask
1442 .map_or_else(|| "None".to_string(), |p| p.to_string())
1443 ).into(),
1444 );
1445 return;
1446 }
1447 self.fill_market_order(order);
1448 return;
1449 }
1450
1451 self.accept_order(order);
1453 }
1454
1455 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
1456 if self
1457 .core
1458 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
1459 {
1460 if self.config.reject_stop_orders {
1461 self.generate_order_rejected(
1462 order,
1463 format!(
1464 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1465 order.order_type(),
1466 order.order_side(),
1467 order.trigger_price().unwrap(),
1468 self.core
1469 .bid
1470 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1471 self.core
1472 .ask
1473 .map_or_else(|| "None".to_string(), |p| p.to_string())
1474 ).into(),
1475 );
1476 return;
1477 }
1478 self.accept_order(order);
1479 self.generate_order_triggered(order);
1480
1481 if self
1483 .core
1484 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
1485 {
1486 order.set_liquidity_side(LiquiditySide::Taker);
1487 self.fill_limit_order(order);
1488 }
1489 return;
1490 }
1491
1492 self.accept_order(order);
1494 }
1495
1496 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
1497 if let Some(trigger_price) = order.trigger_price()
1498 && self
1499 .core
1500 .is_stop_matched(order.order_side_specified(), trigger_price)
1501 {
1502 self.generate_order_rejected(
1503 order,
1504 format!(
1505 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
1506 order.order_type(),
1507 order.order_side(),
1508 trigger_price,
1509 self.core
1510 .bid
1511 .map_or_else(|| "None".to_string(), |p| p.to_string()),
1512 self.core
1513 .ask
1514 .map_or_else(|| "None".to_string(), |p| p.to_string())
1515 ).into(),
1516 );
1517 return;
1518 }
1519
1520 self.accept_order(order);
1522 }
1523
1524 pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
1537 if aggressor_side == AggressorSide::NoAggressor {
1542 if self.book.has_bid() {
1543 self.core.set_bid_raw(self.book.best_bid_price().unwrap());
1544 }
1545 if self.book.has_ask() {
1546 self.core.set_ask_raw(self.book.best_ask_price().unwrap());
1547 }
1548 }
1549
1550 self.core.iterate();
1551
1552 let orders_bid = self.core.get_orders_bid().to_vec();
1553 let orders_ask = self.core.get_orders_ask().to_vec();
1554
1555 self.iterate_orders(timestamp_ns, &orders_bid);
1556 self.iterate_orders(timestamp_ns, &orders_ask);
1557
1558 self.core.bid = self.book.best_bid_price();
1561 self.core.ask = self.book.best_ask_price();
1562 }
1563
1564 fn maybe_activate_trailing_stop(
1565 &mut self,
1566 order: &mut OrderAny,
1567 bid: Option<Price>,
1568 ask: Option<Price>,
1569 ) -> bool {
1570 match order {
1571 OrderAny::TrailingStopMarket(inner) => {
1572 if inner.is_activated {
1573 return true;
1574 }
1575
1576 if inner.activation_price.is_none() {
1577 let px = match inner.order_side() {
1578 OrderSide::Buy => ask,
1579 OrderSide::Sell => bid,
1580 _ => None,
1581 };
1582 if let Some(p) = px {
1583 inner.activation_price = Some(p);
1584 inner.set_activated();
1585 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1586 log::error!("Failed to update order: {e}");
1587 }
1588 return true;
1589 }
1590 return false;
1591 }
1592
1593 let activation_price = inner.activation_price.unwrap();
1594 let hit = match inner.order_side() {
1595 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1596 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1597 _ => false,
1598 };
1599 if hit {
1600 inner.set_activated();
1601 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1602 log::error!("Failed to update order: {e}");
1603 }
1604 }
1605 hit
1606 }
1607 OrderAny::TrailingStopLimit(inner) => {
1608 if inner.is_activated {
1609 return true;
1610 }
1611
1612 if inner.activation_price.is_none() {
1613 let px = match inner.order_side() {
1614 OrderSide::Buy => ask,
1615 OrderSide::Sell => bid,
1616 _ => None,
1617 };
1618 if let Some(p) = px {
1619 inner.activation_price = Some(p);
1620 inner.set_activated();
1621 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1622 log::error!("Failed to update order: {e}");
1623 }
1624 return true;
1625 }
1626 return false;
1627 }
1628
1629 let activation_price = inner.activation_price.unwrap();
1630 let hit = match inner.order_side() {
1631 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
1632 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
1633 _ => false,
1634 };
1635 if hit {
1636 inner.set_activated();
1637 if let Err(e) = self.cache.borrow_mut().update_order(order) {
1638 log::error!("Failed to update order: {e}");
1639 }
1640 }
1641 hit
1642 }
1643 _ => true,
1644 }
1645 }
1646
1647 fn iterate_orders(&mut self, timestamp_ns: UnixNanos, orders: &[PassiveOrderAny]) {
1648 for order in orders {
1649 if order.is_closed() {
1650 continue;
1651 }
1652
1653 if self.config.support_gtd_orders
1654 && order
1655 .expire_time()
1656 .is_some_and(|expire_timestamp_ns| timestamp_ns >= expire_timestamp_ns)
1657 {
1658 let _ = self.core.delete_order(order);
1659 self.cached_filled_qty.remove(&order.client_order_id());
1660 self.expire_order(order);
1661 continue;
1662 }
1663
1664 if matches!(
1665 order,
1666 PassiveOrderAny::Stop(
1667 StopOrderAny::TrailingStopMarket(_) | StopOrderAny::TrailingStopLimit(_)
1668 )
1669 ) {
1670 let mut any = OrderAny::from(order.clone());
1671
1672 if !self.maybe_activate_trailing_stop(&mut any, self.core.bid, self.core.ask) {
1673 continue;
1674 }
1675
1676 self.update_trailing_stop_order(&mut any);
1677 }
1678
1679 if let Some(target_bid) = self.target_bid {
1681 self.core.bid = Some(target_bid);
1682 self.target_bid = None;
1683 }
1684 if let Some(target_bid) = self.target_bid.take() {
1685 self.core.bid = Some(target_bid);
1686 self.target_bid = None;
1687 }
1688 if let Some(target_ask) = self.target_ask.take() {
1689 self.core.ask = Some(target_ask);
1690 self.target_ask = None;
1691 }
1692 if let Some(target_last) = self.target_last.take() {
1693 self.core.last = Some(target_last);
1694 self.target_last = None;
1695 }
1696 }
1697
1698 self.target_bid = None;
1700 self.target_ask = None;
1701 self.target_last = None;
1702 }
1703
    /// Determines the `(price, quantity)` fills for an order that has a limit price.
    ///
    /// The fills start from the book (all crossed levels when
    /// `config.liquidity_consumption` is enabled, otherwise a simulated fill of a
    /// book order at the limit price), may be replaced by a single fill at the last
    /// trade price, may have the first fill re-priced to the trigger price, and are
    /// finally passed through `apply_liquidity_consumption`.
    ///
    /// As a side effect this may save the current core bid/ask/last into the
    /// `target_*` fields and overwrite core prices, and may increase
    /// `trade_consumption`.
    ///
    /// # Panics
    ///
    /// Panics if the order has no price.
    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
        match order.price() {
            Some(order_price) => {
                // Initial fills come from the book.
                let mut fills = if self.config.liquidity_consumption {
                    let size_prec = self.instrument.size_precision();
                    self.book
                        .get_all_crossed_levels(order.order_side(), order_price, size_prec)
                } else {
                    let book_order =
                        BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
                    self.book.simulate_fills(&book_order)
                };

                // Trade-driven fill: only considered when both a last trade size
                // and a last trade price are known.
                if let Some(trade_size) = self.last_trade_size
                    && let Some(trade_price) = self.core.last
                {
                    let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);

                    // The book produced no fill at the trade price, yet the limit
                    // price is matched according to the core.
                    if !fills_at_trade_price
                        && self
                            .core
                            .is_limit_matched(order.order_side_specified(), order_price)
                    {
                        let leaves_qty = order.leaves_qty();

                        // With liquidity consumption only the part of the trade
                        // not yet consumed is available.
                        let available_qty = if self.config.liquidity_consumption {
                            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
                            Quantity::from_raw(remaining, trade_size.precision)
                        } else {
                            trade_size
                        };

                        let fill_qty = min(leaves_qty, available_qty);

                        if !fill_qty.is_zero() {
                            log::debug!(
                                "Trade execution fill: {} @ {} (available: {}, book had {} fills)",
                                fill_qty,
                                trade_price,
                                available_qty,
                                fills.len()
                            );

                            // Replaces any book fills with one fill at the trade price.
                            fills = vec![(trade_price, fill_qty)];

                            if self.config.liquidity_consumption {
                                self.trade_consumption += fill_qty.raw;
                            }
                        }
                    }
                }

                if fills.is_empty() {
                    return fills;
                }

                // Taker order with a trigger price: when the limit price lies on
                // the far side of the trigger price (above for sells, below for
                // buys), the first fill is re-priced to the trigger price. The
                // current core prices are saved as targets before the core
                // ask (sell) or bid (buy) and last are set to the limit price.
                if let Some(triggered_price) = order.trigger_price() {
                    if order
                        .liquidity_side()
                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
                    {
                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
                            let first_fill = fills.first().unwrap();
                            let triggered_qty = first_fill.1;
                            fills[0] = (triggered_price, triggered_qty);
                            self.target_bid = self.core.bid;
                            self.target_ask = self.core.ask;
                            self.target_last = self.core.last;
                            self.core.set_ask_raw(order_price);
                            self.core.set_last_raw(order_price);
                        } else if order.order_side() == OrderSide::Buy
                            && order_price < triggered_price
                        {
                            let first_fill = fills.first().unwrap();
                            let triggered_qty = first_fill.1;
                            fills[0] = (triggered_price, triggered_qty);
                            self.target_bid = self.core.bid;
                            self.target_ask = self.core.ask;
                            self.target_last = self.core.last;
                            self.core.set_bid_raw(order_price);
                            self.core.set_last_raw(order_price);
                        }
                    }
                }

                // Maker order: for every fill priced through the limit price
                // (below it for buys, above it for sells) the current core prices
                // are saved as targets and the core is moved to `target_price`.
                // The fill prices themselves are not modified here.
                if order
                    .liquidity_side()
                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
                {
                    match order.order_side().as_specified() {
                        OrderSideSpecified::Buy => {
                            // Trigger price when the limit price is above it,
                            // otherwise the limit price.
                            let target_price = if order
                                .trigger_price()
                                .is_some_and(|trigger_price| order_price > trigger_price)
                            {
                                order.trigger_price().unwrap()
                            } else {
                                order_price
                            };
                            for fill in &fills {
                                let last_px = fill.0;
                                if last_px < order_price {
                                    self.target_bid = self.core.bid;
                                    self.target_ask = self.core.ask;
                                    self.target_last = self.core.last;
                                    self.core.set_ask_raw(target_price);
                                    self.core.set_last_raw(target_price);
                                }
                            }
                        }
                        OrderSideSpecified::Sell => {
                            // Trigger price when the limit price is below it,
                            // otherwise the limit price.
                            let target_price = if order
                                .trigger_price()
                                .is_some_and(|trigger_price| order_price < trigger_price)
                            {
                                order.trigger_price().unwrap()
                            } else {
                                order_price
                            };
                            for fill in &fills {
                                let last_px = fill.0;
                                if last_px > order_price {
                                    self.target_bid = self.core.bid;
                                    self.target_ask = self.core.ask;
                                    self.target_last = self.core.last;
                                    self.core.set_bid_raw(target_price);
                                    self.core.set_last_raw(target_price);
                                }
                            }
                        }
                    }
                }

                self.apply_liquidity_consumption(fills, order.order_side(), order.leaves_qty())
            }
            None => panic!("Limit order must have a price"),
        }
    }
1856
1857 fn determine_market_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
1858 let price = match order.order_side().as_specified() {
1859 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
1860 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
1861 };
1862
1863 let fills = if self.config.liquidity_consumption {
1866 let size_prec = self.instrument.size_precision();
1867 self.book
1868 .get_all_crossed_levels(order.order_side(), price, size_prec)
1869 } else {
1870 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
1871 self.book.simulate_fills(&book_order)
1872 };
1873
1874 self.apply_liquidity_consumption(fills, order.order_side(), order.leaves_qty())
1875 }
1876
1877 pub fn fill_market_order(&mut self, order: &mut OrderAny) {
1882 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
1883 && filled_qty >= &order.quantity()
1884 {
1885 log::info!(
1886 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
1887 filled_qty,
1888 order.quantity(),
1889 order.filled_qty(),
1890 order.quantity()
1891 );
1892 return;
1893 }
1894
1895 let venue_position_id = self.ids_generator.get_position_id(order, Some(true));
1896 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
1897 let cache = self.cache.as_ref().borrow();
1898 cache.position(&venue_position_id).cloned()
1899 } else {
1900 None
1901 };
1902
1903 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1904 log::warn!(
1905 "Canceling REDUCE_ONLY {} as would increase position",
1906 order.order_type()
1907 );
1908 self.cancel_order(order, None);
1909 return;
1910 }
1911 order.set_liquidity_side(LiquiditySide::Taker);
1913 let fills = self.determine_market_price_and_volume(order);
1914 self.apply_fills(order, fills, LiquiditySide::Taker, None, position);
1915 }
1916
1917 pub fn fill_limit_order(&mut self, order: &mut OrderAny) {
1926 match order.price() {
1927 Some(order_price) => {
1928 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
1929 if let Some(&qty) = cached_filled_qty
1930 && qty >= order.quantity()
1931 {
1932 log::debug!(
1933 "Ignoring fill as already filled pending pending application of events: {}, {}, {}, {}",
1934 qty,
1935 order.quantity(),
1936 order.filled_qty(),
1937 order.leaves_qty(),
1938 );
1939 return;
1940 }
1941
1942 if order
1943 .liquidity_side()
1944 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
1945 {
1946 if order.order_side() == OrderSide::Buy
1947 && self.core.bid.is_some_and(|bid| bid == order_price)
1948 && !self.fill_model.is_limit_filled()
1949 {
1950 return;
1952 }
1953 if order.order_side() == OrderSide::Sell
1954 && self.core.ask.is_some_and(|ask| ask == order_price)
1955 && !self.fill_model.is_limit_filled()
1956 {
1957 return;
1959 }
1960 }
1961
1962 let venue_position_id = self.ids_generator.get_position_id(order, None);
1963 let position = if let Some(venue_position_id) = venue_position_id {
1964 let cache = self.cache.as_ref().borrow();
1965 cache.position(&venue_position_id).cloned()
1966 } else {
1967 None
1968 };
1969
1970 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
1971 log::warn!(
1972 "Canceling REDUCE_ONLY {} as would increase position",
1973 order.order_type()
1974 );
1975 self.cancel_order(order, None);
1976 return;
1977 }
1978
1979 let fills = self.determine_limit_price_and_volume(order);
1980
1981 if fills.is_empty() {
1985 return;
1986 }
1987
1988 self.apply_fills(
1989 order,
1990 fills,
1991 order.liquidity_side().unwrap(),
1992 venue_position_id,
1993 position,
1994 );
1995 }
1996 None => panic!("Limit order must have a price"),
1997 }
1998 }
1999
2000 fn apply_fills(
2001 &mut self,
2002 order: &mut OrderAny,
2003 fills: Vec<(Price, Quantity)>,
2004 liquidity_side: LiquiditySide,
2005 venue_position_id: Option<PositionId>,
2006 position: Option<Position>,
2007 ) {
2008 if order.time_in_force() == TimeInForce::Fok {
2009 let mut total_size = Quantity::zero(order.quantity().precision);
2010 for (fill_px, fill_qty) in &fills {
2011 total_size = total_size.add(*fill_qty);
2012 }
2013
2014 if order.leaves_qty() > total_size {
2015 self.cancel_order(order, None);
2016 return;
2017 }
2018 }
2019
2020 if fills.is_empty() {
2021 if order.status() == OrderStatus::Submitted {
2022 self.generate_order_rejected(
2023 order,
2024 format!("No market for {}", order.instrument_id()).into(),
2025 );
2026 } else {
2027 log::error!(
2028 "Cannot fill order: no fills from book when fills were expected (check size in data)"
2029 );
2030 return;
2031 }
2032 }
2033
2034 if self.oms_type == OmsType::Netting {
2035 let venue_position_id: Option<PositionId> = None;
2036 }
2037
2038 let mut initial_market_to_limit_fill = false;
2039
2040 for &(mut fill_px, ref fill_qty) in &fills {
2041 assert!(
2042 (fill_px.precision == self.instrument.price_precision()),
2043 "Invalid price precision for fill price {} when instrument price precision is {}.\
2044 Check that the data price precision matches the {} instrument",
2045 fill_px.precision,
2046 self.instrument.price_precision(),
2047 self.instrument.id()
2048 );
2049
2050 assert!(
2051 (fill_qty.precision == self.instrument.size_precision()),
2052 "Invalid quantity precision for fill quantity {} when instrument size precision is {}.\
2053 Check that the data quantity precision matches the {} instrument",
2054 fill_qty.precision,
2055 self.instrument.size_precision(),
2056 self.instrument.id()
2057 );
2058
2059 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
2060 && order.order_type() == OrderType::MarketToLimit
2061 {
2062 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
2063 initial_market_to_limit_fill = true;
2064 }
2065
2066 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
2067 fill_px = match order.order_side().as_specified() {
2068 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
2069 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
2070 }
2071 }
2072
2073 let mut effective_fill_qty = *fill_qty;
2077
2078 if self.config.use_reduce_only
2079 && order.is_reduce_only()
2080 && let Some(position) = &position
2081 && *fill_qty > position.quantity
2082 {
2083 if position.quantity == Quantity::zero(position.quantity.precision) {
2084 return;
2086 }
2087
2088 let adjusted_fill_qty =
2090 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
2091
2092 effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
2094
2095 if order.quantity() != adjusted_fill_qty {
2097 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
2098 }
2099 }
2100
2101 if fill_qty.is_zero() {
2102 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
2103 self.generate_order_rejected(
2104 order,
2105 format!("No market for {}", order.instrument_id()).into(),
2106 );
2107 }
2108 return;
2109 }
2110
2111 self.fill_order(
2112 order,
2113 fill_px,
2114 effective_fill_qty,
2115 liquidity_side,
2116 venue_position_id,
2117 position.clone(),
2118 );
2119
2120 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
2121 return;
2123 }
2124 }
2125
2126 if order.time_in_force() == TimeInForce::Ioc && order.is_open() {
2127 self.cancel_order(order, None);
2129 return;
2130 }
2131
2132 if order.is_open()
2133 && self.book_type == BookType::L1_MBP
2134 && matches!(
2135 order.order_type(),
2136 OrderType::Market
2137 | OrderType::MarketIfTouched
2138 | OrderType::StopMarket
2139 | OrderType::TrailingStopMarket
2140 )
2141 {
2142 todo!("Exhausted simulated book volume")
2146 }
2147 }
2148
2149 fn fill_order(
2150 &mut self,
2151 order: &mut OrderAny,
2152 last_px: Price,
2153 last_qty: Quantity,
2154 liquidity_side: LiquiditySide,
2155 venue_position_id: Option<PositionId>,
2156 position: Option<Position>,
2157 ) {
2158 let size_prec = self.instrument.size_precision();
2159 let instrument_id = self.instrument.id();
2160 assert!(
2161 last_qty.precision == size_prec,
2162 "Invalid fill quantity precision {}, expected {size_prec} for {instrument_id}",
2163 last_qty.precision
2164 );
2165
2166 match self.cached_filled_qty.get(&order.client_order_id()) {
2167 Some(filled_qty) => {
2168 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
2170 let last_qty = min(last_qty, leaves_qty);
2171 let new_filled_qty = *filled_qty + last_qty;
2172 self.cached_filled_qty
2174 .insert(order.client_order_id(), new_filled_qty);
2175 }
2176 None => {
2177 self.cached_filled_qty
2178 .insert(order.client_order_id(), last_qty);
2179 }
2180 }
2181
2182 let commission = self
2184 .fee_model
2185 .get_commission(order, last_qty, last_px, &self.instrument)
2186 .unwrap();
2187
2188 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2189 self.generate_order_filled(
2190 order,
2191 venue_order_id,
2192 venue_position_id,
2193 last_qty,
2194 last_px,
2195 self.instrument.quote_currency(),
2196 commission,
2197 liquidity_side,
2198 );
2199
2200 if order.is_passive() && order.is_closed() {
2201 if self.core.order_exists(order.client_order_id()) {
2203 let _ = self.core.delete_order(
2204 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
2205 );
2206 }
2207 self.cached_filled_qty.remove(&order.client_order_id());
2208 }
2209
2210 if !self.config.support_contingent_orders {
2211 return;
2212 }
2213
2214 if let Some(contingency_type) = order.contingency_type() {
2215 match contingency_type {
2216 ContingencyType::Oto => {
2217 if let Some(linked_orders_ids) = order.linked_order_ids() {
2218 for client_order_id in linked_orders_ids {
2219 let mut child_order = match self.cache.borrow().order(client_order_id) {
2220 Some(child_order) => child_order.clone(),
2221 None => panic!("Order {client_order_id} not found in cache"),
2222 };
2223
2224 if child_order.is_closed() || child_order.is_active_local() {
2225 continue;
2226 }
2227
2228 if let (None, Some(position_id)) =
2230 (child_order.position_id(), order.position_id())
2231 {
2232 self.cache
2233 .borrow_mut()
2234 .add_position_id(
2235 &position_id,
2236 &self.venue,
2237 client_order_id,
2238 &child_order.strategy_id(),
2239 )
2240 .unwrap();
2241 log::debug!(
2242 "Added position id {position_id} to cache for order {client_order_id}"
2243 );
2244 }
2245
2246 if (!child_order.is_open())
2247 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
2248 && child_order
2249 .previous_status()
2250 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
2251 {
2252 let account_id = order.account_id().unwrap_or_else(|| {
2253 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
2254 panic!(
2255 "Account ID not found for trader {}",
2256 order.trader_id()
2257 )
2258 })
2259 });
2260 self.process_order(&mut child_order, account_id);
2261 }
2262 }
2263 } else {
2264 log::error!(
2265 "OTO order {} does not have linked orders",
2266 order.client_order_id()
2267 );
2268 }
2269 }
2270 ContingencyType::Oco => {
2271 if let Some(linked_orders_ids) = order.linked_order_ids() {
2272 for client_order_id in linked_orders_ids {
2273 let child_order = match self.cache.borrow().order(client_order_id) {
2274 Some(child_order) => child_order.clone(),
2275 None => panic!("Order {client_order_id} not found in cache"),
2276 };
2277
2278 if child_order.is_closed() || child_order.is_active_local() {
2279 continue;
2280 }
2281
2282 self.cancel_order(&child_order, None);
2283 }
2284 } else {
2285 log::error!(
2286 "OCO order {} does not have linked orders",
2287 order.client_order_id()
2288 );
2289 }
2290 }
2291 ContingencyType::Ouo => {
2292 if let Some(linked_orders_ids) = order.linked_order_ids() {
2293 for client_order_id in linked_orders_ids {
2294 let mut child_order = match self.cache.borrow().order(client_order_id) {
2295 Some(child_order) => child_order.clone(),
2296 None => panic!("Order {client_order_id} not found in cache"),
2297 };
2298
2299 if child_order.is_active_local() {
2300 continue;
2301 }
2302
2303 if order.is_closed() && child_order.is_open() {
2304 self.cancel_order(&child_order, None);
2305 } else if !order.leaves_qty().is_zero()
2306 && order.leaves_qty() != child_order.leaves_qty()
2307 {
2308 let price = child_order.price();
2309 let trigger_price = child_order.trigger_price();
2310 self.update_order(
2311 &mut child_order,
2312 Some(order.leaves_qty()),
2313 price,
2314 trigger_price,
2315 Some(false),
2316 );
2317 }
2318 }
2319 } else {
2320 log::error!(
2321 "OUO order {} does not have linked orders",
2322 order.client_order_id()
2323 );
2324 }
2325 }
2326 _ => {}
2327 }
2328 }
2329 }
2330
2331 fn update_limit_order(&mut self, order: &mut OrderAny, quantity: Quantity, price: Price) {
2332 if self
2333 .core
2334 .is_limit_matched(order.order_side_specified(), price)
2335 {
2336 if order.is_post_only() {
2337 self.generate_order_modify_rejected(
2338 order.trader_id(),
2339 order.strategy_id(),
2340 order.instrument_id(),
2341 order.client_order_id(),
2342 Ustr::from(format!(
2343 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2344 order.order_type(),
2345 order.order_side(),
2346 price,
2347 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2348 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2349 ).as_str()),
2350 order.venue_order_id(),
2351 order.account_id(),
2352 );
2353 return;
2354 }
2355
2356 self.generate_order_updated(order, quantity, Some(price), None, None);
2357 order.set_liquidity_side(LiquiditySide::Taker);
2358 self.fill_limit_order(order);
2359 return;
2360 }
2361 self.generate_order_updated(order, quantity, Some(price), None, None);
2362 }
2363
2364 fn update_stop_market_order(
2365 &mut self,
2366 order: &mut OrderAny,
2367 quantity: Quantity,
2368 trigger_price: Price,
2369 ) {
2370 if self
2371 .core
2372 .is_stop_matched(order.order_side_specified(), trigger_price)
2373 {
2374 self.generate_order_modify_rejected(
2375 order.trader_id(),
2376 order.strategy_id(),
2377 order.instrument_id(),
2378 order.client_order_id(),
2379 Ustr::from(
2380 format!(
2381 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2382 order.order_type(),
2383 order.order_side(),
2384 trigger_price,
2385 self.core
2386 .bid
2387 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2388 self.core
2389 .ask
2390 .map_or_else(|| "None".to_string(), |p| p.to_string())
2391 )
2392 .as_str(),
2393 ),
2394 order.venue_order_id(),
2395 order.account_id(),
2396 );
2397 return;
2398 }
2399
2400 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2401 }
2402
2403 fn update_stop_limit_order(
2404 &mut self,
2405 order: &mut OrderAny,
2406 quantity: Quantity,
2407 price: Price,
2408 trigger_price: Price,
2409 ) {
2410 if order.is_triggered().is_some_and(|t| t) {
2411 if self
2413 .core
2414 .is_limit_matched(order.order_side_specified(), price)
2415 {
2416 if order.is_post_only() {
2417 self.generate_order_modify_rejected(
2418 order.trader_id(),
2419 order.strategy_id(),
2420 order.instrument_id(),
2421 order.client_order_id(),
2422 Ustr::from(format!(
2423 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2424 order.order_type(),
2425 order.order_side(),
2426 price,
2427 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2428 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2429 ).as_str()),
2430 order.venue_order_id(),
2431 order.account_id(),
2432 );
2433 return;
2434 }
2435 self.generate_order_updated(order, quantity, Some(price), None, None);
2436 order.set_liquidity_side(LiquiditySide::Taker);
2437 self.fill_limit_order(order);
2438 return; }
2440 } else {
2441 if self
2443 .core
2444 .is_stop_matched(order.order_side_specified(), trigger_price)
2445 {
2446 self.generate_order_modify_rejected(
2447 order.trader_id(),
2448 order.strategy_id(),
2449 order.instrument_id(),
2450 order.client_order_id(),
2451 Ustr::from(
2452 format!(
2453 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
2454 order.order_type(),
2455 order.order_side(),
2456 trigger_price,
2457 self.core
2458 .bid
2459 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2460 self.core
2461 .ask
2462 .map_or_else(|| "None".to_string(), |p| p.to_string())
2463 )
2464 .as_str(),
2465 ),
2466 order.venue_order_id(),
2467 order.account_id(),
2468 );
2469 return;
2470 }
2471 }
2472
2473 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2474 }
2475
2476 fn update_market_if_touched_order(
2477 &mut self,
2478 order: &mut OrderAny,
2479 quantity: Quantity,
2480 trigger_price: Price,
2481 ) {
2482 if self
2483 .core
2484 .is_touch_triggered(order.order_side_specified(), trigger_price)
2485 {
2486 self.generate_order_modify_rejected(
2487 order.trader_id(),
2488 order.strategy_id(),
2489 order.instrument_id(),
2490 order.client_order_id(),
2491 Ustr::from(
2492 format!(
2493 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2494 order.order_type(),
2495 order.order_side(),
2496 trigger_price,
2497 self.core
2498 .bid
2499 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2500 self.core
2501 .ask
2502 .map_or_else(|| "None".to_string(), |p| p.to_string())
2503 )
2504 .as_str(),
2505 ),
2506 order.venue_order_id(),
2507 order.account_id(),
2508 );
2509 return;
2511 }
2512
2513 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
2514 }
2515
2516 fn update_limit_if_touched_order(
2517 &mut self,
2518 order: &mut OrderAny,
2519 quantity: Quantity,
2520 price: Price,
2521 trigger_price: Price,
2522 ) {
2523 if order.is_triggered().is_some_and(|t| t) {
2524 if self
2526 .core
2527 .is_limit_matched(order.order_side_specified(), price)
2528 {
2529 if order.is_post_only() {
2530 self.generate_order_modify_rejected(
2531 order.trader_id(),
2532 order.strategy_id(),
2533 order.instrument_id(),
2534 order.client_order_id(),
2535 Ustr::from(format!(
2536 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
2537 order.order_type(),
2538 order.order_side(),
2539 price,
2540 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
2541 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
2542 ).as_str()),
2543 order.venue_order_id(),
2544 order.account_id(),
2545 );
2546 return;
2548 }
2549 self.generate_order_updated(order, quantity, Some(price), None, None);
2550 order.set_liquidity_side(LiquiditySide::Taker);
2551 self.fill_limit_order(order);
2552 return;
2553 }
2554 } else {
2555 if self
2557 .core
2558 .is_touch_triggered(order.order_side_specified(), trigger_price)
2559 {
2560 self.generate_order_modify_rejected(
2561 order.trader_id(),
2562 order.strategy_id(),
2563 order.instrument_id(),
2564 order.client_order_id(),
2565 Ustr::from(
2566 format!(
2567 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
2568 order.order_type(),
2569 order.order_side(),
2570 trigger_price,
2571 self.core
2572 .bid
2573 .map_or_else(|| "None".to_string(), |p| p.to_string()),
2574 self.core
2575 .ask
2576 .map_or_else(|| "None".to_string(), |p| p.to_string())
2577 )
2578 .as_str(),
2579 ),
2580 order.venue_order_id(),
2581 order.account_id(),
2582 );
2583 return;
2584 }
2585 }
2586
2587 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
2588 }
2589
2590 fn update_trailing_stop_order(&mut self, order: &mut OrderAny) {
2591 let (new_trigger_price, new_price) = trailing_stop_calculate(
2592 self.instrument.price_increment(),
2593 order.trigger_price(),
2594 order.activation_price(),
2595 order,
2596 self.core.bid,
2597 self.core.ask,
2598 self.core.last,
2599 )
2600 .unwrap();
2601
2602 if new_trigger_price.is_none() && new_price.is_none() {
2603 return;
2604 }
2605
2606 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
2607 }
2608
2609 fn update_protection_price(&mut self, order: &mut OrderAny) {
2610 let protection_price = protection_price_calculate(
2611 self.instrument.price_increment(),
2612 order,
2613 self.config.price_protection_points,
2614 self.core.bid,
2615 self.core.ask,
2616 );
2617
2618 if let Ok(protection_price) = protection_price {
2619 self.generate_order_updated(
2620 order,
2621 order.quantity(),
2622 None,
2623 None,
2624 Some(protection_price),
2625 );
2626 }
2627 }
2628
2629 fn accept_order(&mut self, order: &mut OrderAny) {
2632 if order.is_closed() {
2633 return;
2635 }
2636 if order.status() != OrderStatus::Accepted {
2637 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2638 self.generate_order_accepted(order, venue_order_id);
2639
2640 if matches!(
2641 order.order_type(),
2642 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
2643 ) && order.trigger_price().is_none()
2644 {
2645 self.update_trailing_stop_order(order);
2646 }
2647 }
2648
2649 let _ = self.core.add_order(
2650 PassiveOrderAny::try_from(order.to_owned()).expect("passive order conversion"),
2651 );
2652 }
2653
2654 fn expire_order(&mut self, order: &PassiveOrderAny) {
2655 if self.config.support_contingent_orders
2656 && order
2657 .contingency_type()
2658 .is_some_and(|c| c != ContingencyType::NoContingency)
2659 {
2660 self.cancel_contingent_orders(&OrderAny::from(order.clone()));
2661 }
2662
2663 self.generate_order_expired(&order.to_any());
2664 }
2665
2666 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
2667 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
2668 if order.is_active_local() {
2669 log::error!(
2670 "Cannot cancel an order with {} from the matching engine",
2671 order.status()
2672 );
2673 return;
2674 }
2675
2676 if self.core.order_exists(order.client_order_id()) {
2678 let _ = self.core.delete_order(
2679 &PassiveOrderAny::try_from(order.clone()).expect("passive order conversion"),
2680 );
2681 }
2682 self.cached_filled_qty.remove(&order.client_order_id());
2683
2684 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
2685 self.generate_order_canceled(order, venue_order_id);
2686
2687 if self.config.support_contingent_orders
2688 && order.contingency_type().is_some()
2689 && order.contingency_type().unwrap() != ContingencyType::NoContingency
2690 && cancel_contingencies
2691 {
2692 self.cancel_contingent_orders(order);
2693 }
2694 }
2695
2696 fn update_order(
2697 &mut self,
2698 order: &mut OrderAny,
2699 quantity: Option<Quantity>,
2700 price: Option<Price>,
2701 trigger_price: Option<Price>,
2702 update_contingencies: Option<bool>,
2703 ) {
2704 let update_contingencies = update_contingencies.unwrap_or(true);
2705 let quantity = quantity.unwrap_or(order.quantity());
2706
2707 let price_prec = self.instrument.price_precision();
2708 let size_prec = self.instrument.size_precision();
2709 let instrument_id = self.instrument.id();
2710 if quantity.precision != size_prec {
2711 self.generate_order_modify_rejected(
2712 order.trader_id(),
2713 order.strategy_id(),
2714 order.instrument_id(),
2715 order.client_order_id(),
2716 Ustr::from(&format!(
2717 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
2718 quantity.precision
2719 )),
2720 order.venue_order_id(),
2721 order.account_id(),
2722 );
2723 return;
2724 }
2725 if let Some(px) = price
2726 && px.precision != price_prec
2727 {
2728 self.generate_order_modify_rejected(
2729 order.trader_id(),
2730 order.strategy_id(),
2731 order.instrument_id(),
2732 order.client_order_id(),
2733 Ustr::from(&format!(
2734 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
2735 px.precision
2736 )),
2737 order.venue_order_id(),
2738 order.account_id(),
2739 );
2740 return;
2741 }
2742 if let Some(tp) = trigger_price
2743 && tp.precision != price_prec
2744 {
2745 self.generate_order_modify_rejected(
2746 order.trader_id(),
2747 order.strategy_id(),
2748 order.instrument_id(),
2749 order.client_order_id(),
2750 Ustr::from(&format!(
2751 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
2752 tp.precision
2753 )),
2754 order.venue_order_id(),
2755 order.account_id(),
2756 );
2757 return;
2758 }
2759
2760 let filled_qty = self
2762 .cached_filled_qty
2763 .get(&order.client_order_id())
2764 .copied()
2765 .unwrap_or(order.filled_qty());
2766 if quantity < filled_qty {
2767 self.generate_order_modify_rejected(
2768 order.trader_id(),
2769 order.strategy_id(),
2770 order.instrument_id(),
2771 order.client_order_id(),
2772 Ustr::from(&format!(
2773 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
2774 )),
2775 order.venue_order_id(),
2776 order.account_id(),
2777 );
2778 return;
2779 }
2780
2781 match order {
2782 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
2783 let price = price.unwrap_or(order.price().unwrap());
2784 self.update_limit_order(order, quantity, price);
2785 }
2786 OrderAny::StopMarket(_) => {
2787 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2788 self.update_stop_market_order(order, quantity, trigger_price);
2789 }
2790 OrderAny::StopLimit(_) => {
2791 let price = price.unwrap_or(order.price().unwrap());
2792 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2793 self.update_stop_limit_order(order, quantity, price, trigger_price);
2794 }
2795 OrderAny::MarketIfTouched(_) => {
2796 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2797 self.update_market_if_touched_order(order, quantity, trigger_price);
2798 }
2799 OrderAny::LimitIfTouched(_) => {
2800 let price = price.unwrap_or(order.price().unwrap());
2801 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2802 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2803 }
2804 OrderAny::TrailingStopMarket(_) => {
2805 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
2806 self.update_market_if_touched_order(order, quantity, trigger_price);
2807 }
2808 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
2809 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
2810 let trigger_price =
2811 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
2812 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
2813 }
2814 _ => {
2815 panic!(
2816 "Unsupported order type {} for update_order",
2817 order.order_type()
2818 );
2819 }
2820 }
2821
2822 let new_leaves_qty = quantity.saturating_sub(filled_qty);
2824 if new_leaves_qty.is_zero() {
2825 if self.config.support_contingent_orders
2826 && order
2827 .contingency_type()
2828 .is_some_and(|c| c != ContingencyType::NoContingency)
2829 && update_contingencies
2830 {
2831 self.update_contingent_order(order);
2832 }
2833 self.cancel_order(order, Some(false));
2835 return;
2836 }
2837
2838 if self.config.support_contingent_orders
2839 && order
2840 .contingency_type()
2841 .is_some_and(|c| c != ContingencyType::NoContingency)
2842 && update_contingencies
2843 {
2844 self.update_contingent_order(order);
2845 }
2846 }
2847
    /// Placeholder for triggering a stop order; not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics via `todo!` when called.
    pub fn trigger_stop_order(&mut self, order: &mut OrderAny) {
        todo!("trigger_stop_order")
    }
2852
2853 fn update_contingent_order(&mut self, order: &OrderAny) {
2854 log::debug!("Updating OUO orders from {}", order.client_order_id());
2855 if let Some(linked_order_ids) = order.linked_order_ids() {
2856 let parent_filled_qty = self
2857 .cached_filled_qty
2858 .get(&order.client_order_id())
2859 .copied()
2860 .unwrap_or(order.filled_qty());
2861 let parent_leaves_qty = order.quantity().saturating_sub(parent_filled_qty);
2862
2863 for client_order_id in linked_order_ids {
2864 let mut child_order = match self.cache.borrow().order(client_order_id) {
2865 Some(order) => order.clone(),
2866 None => panic!("Order {client_order_id} not found in cache."),
2867 };
2868
2869 if child_order.is_active_local() {
2870 continue;
2871 }
2872
2873 let child_filled_qty = self
2874 .cached_filled_qty
2875 .get(&child_order.client_order_id())
2876 .copied()
2877 .unwrap_or(child_order.filled_qty());
2878
2879 if parent_leaves_qty.is_zero() {
2880 self.cancel_order(&child_order, Some(false));
2881 } else if child_filled_qty >= parent_leaves_qty {
2882 self.cancel_order(&child_order, Some(false));
2884 } else {
2885 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
2886 if child_leaves_qty != parent_leaves_qty {
2887 let price = child_order.price();
2888 let trigger_price = child_order.trigger_price();
2889 self.update_order(
2890 &mut child_order,
2891 Some(parent_leaves_qty),
2892 price,
2893 trigger_price,
2894 Some(false),
2895 );
2896 }
2897 }
2898 }
2899 }
2900 }
2901
2902 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
2903 if let Some(linked_order_ids) = order.linked_order_ids() {
2904 for client_order_id in linked_order_ids {
2905 let contingent_order = match self.cache.borrow().order(client_order_id) {
2906 Some(order) => order.clone(),
2907 None => panic!("Cannot find contingent order for {client_order_id}"),
2908 };
2909 if contingent_order.is_active_local() {
2910 continue;
2912 }
2913 if !contingent_order.is_closed() {
2914 self.cancel_order(&contingent_order, Some(false));
2915 }
2916 }
2917 }
2918 }
2919
2920 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
2923 let ts_now = self.clock.borrow().timestamp_ns();
2924 let account_id = order
2925 .account_id()
2926 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2927
2928 let due_post_only = reason.as_str().starts_with("POST_ONLY");
2930
2931 let event = OrderEventAny::Rejected(OrderRejected::new(
2932 order.trader_id(),
2933 order.strategy_id(),
2934 order.instrument_id(),
2935 order.client_order_id(),
2936 account_id,
2937 reason,
2938 UUID4::new(),
2939 ts_now,
2940 ts_now,
2941 false,
2942 due_post_only,
2943 ));
2944 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2945 }
2946
2947 fn generate_order_accepted(&self, order: &mut OrderAny, venue_order_id: VenueOrderId) {
2948 let ts_now = self.clock.borrow().timestamp_ns();
2949 let account_id = order
2950 .account_id()
2951 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
2952 let event = OrderEventAny::Accepted(OrderAccepted::new(
2953 order.trader_id(),
2954 order.strategy_id(),
2955 order.instrument_id(),
2956 order.client_order_id(),
2957 venue_order_id,
2958 account_id,
2959 UUID4::new(),
2960 ts_now,
2961 ts_now,
2962 false,
2963 ));
2964 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2965
2966 order.apply(event).expect("Failed to apply order event");
2968 }
2969
2970 #[allow(clippy::too_many_arguments)]
2971 fn generate_order_modify_rejected(
2972 &self,
2973 trader_id: TraderId,
2974 strategy_id: StrategyId,
2975 instrument_id: InstrumentId,
2976 client_order_id: ClientOrderId,
2977 reason: Ustr,
2978 venue_order_id: Option<VenueOrderId>,
2979 account_id: Option<AccountId>,
2980 ) {
2981 let ts_now = self.clock.borrow().timestamp_ns();
2982 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
2983 trader_id,
2984 strategy_id,
2985 instrument_id,
2986 client_order_id,
2987 reason,
2988 UUID4::new(),
2989 ts_now,
2990 ts_now,
2991 false,
2992 venue_order_id,
2993 account_id,
2994 ));
2995 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
2996 }
2997
2998 #[allow(clippy::too_many_arguments)]
2999 fn generate_order_cancel_rejected(
3000 &self,
3001 trader_id: TraderId,
3002 strategy_id: StrategyId,
3003 account_id: AccountId,
3004 instrument_id: InstrumentId,
3005 client_order_id: ClientOrderId,
3006 venue_order_id: Option<VenueOrderId>,
3007 reason: Ustr,
3008 ) {
3009 let ts_now = self.clock.borrow().timestamp_ns();
3010 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
3011 trader_id,
3012 strategy_id,
3013 instrument_id,
3014 client_order_id,
3015 reason,
3016 UUID4::new(),
3017 ts_now,
3018 ts_now,
3019 false,
3020 venue_order_id,
3021 Some(account_id),
3022 ));
3023 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3024 }
3025
3026 fn generate_order_updated(
3027 &self,
3028 order: &mut OrderAny,
3029 quantity: Quantity,
3030 price: Option<Price>,
3031 trigger_price: Option<Price>,
3032 protection_price: Option<Price>,
3033 ) {
3034 let ts_now = self.clock.borrow().timestamp_ns();
3035 let event = OrderEventAny::Updated(OrderUpdated::new(
3036 order.trader_id(),
3037 order.strategy_id(),
3038 order.instrument_id(),
3039 order.client_order_id(),
3040 quantity,
3041 UUID4::new(),
3042 ts_now,
3043 ts_now,
3044 false,
3045 order.venue_order_id(),
3046 order.account_id(),
3047 price,
3048 trigger_price,
3049 protection_price,
3050 ));
3051 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3052
3053 order.apply(event).expect("Failed to apply order event");
3055 }
3056
3057 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
3058 let ts_now = self.clock.borrow().timestamp_ns();
3059 let event = OrderEventAny::Canceled(OrderCanceled::new(
3060 order.trader_id(),
3061 order.strategy_id(),
3062 order.instrument_id(),
3063 order.client_order_id(),
3064 UUID4::new(),
3065 ts_now,
3066 ts_now,
3067 false,
3068 Some(venue_order_id),
3069 order.account_id(),
3070 ));
3071 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3072 }
3073
3074 fn generate_order_triggered(&self, order: &OrderAny) {
3075 let ts_now = self.clock.borrow().timestamp_ns();
3076 let event = OrderEventAny::Triggered(OrderTriggered::new(
3077 order.trader_id(),
3078 order.strategy_id(),
3079 order.instrument_id(),
3080 order.client_order_id(),
3081 UUID4::new(),
3082 ts_now,
3083 ts_now,
3084 false,
3085 order.venue_order_id(),
3086 order.account_id(),
3087 ));
3088 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3089 }
3090
3091 fn generate_order_expired(&self, order: &OrderAny) {
3092 let ts_now = self.clock.borrow().timestamp_ns();
3093 let event = OrderEventAny::Expired(OrderExpired::new(
3094 order.trader_id(),
3095 order.strategy_id(),
3096 order.instrument_id(),
3097 order.client_order_id(),
3098 UUID4::new(),
3099 ts_now,
3100 ts_now,
3101 false,
3102 order.venue_order_id(),
3103 order.account_id(),
3104 ));
3105 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3106 }
3107
3108 #[allow(clippy::too_many_arguments)]
3109 fn generate_order_filled(
3110 &mut self,
3111 order: &mut OrderAny,
3112 venue_order_id: VenueOrderId,
3113 venue_position_id: Option<PositionId>,
3114 last_qty: Quantity,
3115 last_px: Price,
3116 quote_currency: Currency,
3117 commission: Money,
3118 liquidity_side: LiquiditySide,
3119 ) {
3120 debug_assert!(
3121 last_qty <= order.quantity(),
3122 "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
3123 order_qty = order.quantity(),
3124 client_order_id = order.client_order_id()
3125 );
3126
3127 let ts_now = self.clock.borrow().timestamp_ns();
3128 let account_id = order
3129 .account_id()
3130 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
3131 let event = OrderEventAny::Filled(OrderFilled::new(
3132 order.trader_id(),
3133 order.strategy_id(),
3134 order.instrument_id(),
3135 order.client_order_id(),
3136 venue_order_id,
3137 account_id,
3138 self.ids_generator.generate_trade_id(),
3139 order.order_side(),
3140 order.order_type(),
3141 last_qty,
3142 last_px,
3143 quote_currency,
3144 liquidity_side,
3145 UUID4::new(),
3146 ts_now,
3147 ts_now,
3148 false,
3149 venue_position_id,
3150 Some(commission),
3151 ));
3152 msgbus::send_any("ExecEngine.process".into(), &event as &dyn Any);
3153
3154 order.apply(event).expect("Failed to apply order event");
3156 }
3157}