1use std::{
17 cell::RefCell,
18 cmp::min,
19 fmt::Debug,
20 ops::{Add, Sub},
21 rc::Rc,
22};
23
24use chrono::TimeDelta;
25use indexmap::{IndexMap, IndexSet};
26use nautilus_common::{
27 cache::Cache,
28 clock::Clock,
29 messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
30 msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_model::{
34 data::{
35 Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
36 QuoteTick, TradeTick, order::BookOrder,
37 },
38 enums::{
39 AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
40 InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OptionKind,
41 OrderSide, OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType,
42 TimeInForce, TriggerType,
43 },
44 events::{
45 OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46 OrderFilled, OrderModifyRejected, OrderRejected, OrderSubmitted, OrderTriggered,
47 OrderUpdated,
48 },
49 identifiers::{
50 AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId, Venue,
51 VenueOrderId,
52 },
53 instruments::{Instrument, InstrumentAny},
54 orderbook::OrderBook,
55 orders::{MarketOrder, Order, OrderAny, OrderCore},
56 position::Position,
57 types::{
58 Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
59 quantity::QuantityRaw,
60 },
61};
62use ustr::Ustr;
63
64use crate::{
65 matching_core::{MatchAction, OrderMatchingCore, RestingOrder},
66 matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
67 models::{
68 fee::{FeeModel, FeeModelAny},
69 fill::{FillModel, FillModelAny},
70 },
71 protection::protection_price_calculate,
72 trailing::trailing_stop_calculate,
73};
74
75pub struct OrderMatchingEngine {
77 pub venue: Venue,
79 pub instrument: InstrumentAny,
81 pub raw_id: u32,
83 pub book_type: BookType,
85 pub oms_type: OmsType,
87 pub account_type: AccountType,
89 pub market_status: MarketStatus,
91 pub config: OrderMatchingEngineConfig,
93 core: OrderMatchingCore,
94 clock: Rc<RefCell<dyn Clock>>,
95 cache: Rc<RefCell<Cache>>,
96 book: OrderBook,
97 fill_model: FillModelAny,
98 fee_model: FeeModelAny,
99 event_handler: Option<Rc<dyn Fn(OrderEventAny)>>,
100 target_bid: Option<Price>,
101 target_ask: Option<Price>,
102 target_last: Option<Price>,
103 last_bar_bid: Option<Bar>,
104 last_bar_ask: Option<Bar>,
105 fill_at_market: bool,
106 execution_bar_types: IndexMap<InstrumentId, BarType>,
107 execution_bar_deltas: IndexMap<BarType, TimeDelta>,
108 account_ids: IndexMap<TraderId, AccountId>,
109 cached_filled_qty: IndexMap<ClientOrderId, Quantity>,
110 ids_generator: IdsGenerator,
111 last_trade_size: Option<Quantity>,
112 bid_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
113 ask_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
114 trade_consumption: QuantityRaw,
115 queue_ahead: IndexMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
116 queue_excess: IndexMap<ClientOrderId, QuantityRaw>,
117 queue_pending: IndexMap<ClientOrderId, PriceRaw>,
118 prev_bid_price_raw: PriceRaw,
119 prev_bid_size_raw: QuantityRaw,
120 prev_ask_price_raw: PriceRaw,
121 prev_ask_size_raw: QuantityRaw,
122 last_quote_bid: Option<Price>,
123 last_quote_ask: Option<Price>,
124 precision_mismatch_streak: u32,
125 tob_initialized: bool,
126 instrument_close: Option<InstrumentClose>,
127 pending_resolution: bool,
128 settlement_price: Option<Price>,
129 expiration_processed: bool,
130}
131
132impl Debug for OrderMatchingEngine {
133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134 f.debug_struct(stringify!(OrderMatchingEngine))
135 .field("venue", &self.venue)
136 .field("instrument", &self.instrument.id())
137 .finish()
138 }
139}
140
141impl OrderMatchingEngine {
142 #[expect(clippy::too_many_arguments)]
144 pub fn new(
145 instrument: InstrumentAny,
146 raw_id: u32,
147 fill_model: FillModelAny,
148 fee_model: FeeModelAny,
149 book_type: BookType,
150 oms_type: OmsType,
151 account_type: AccountType,
152 clock: Rc<RefCell<dyn Clock>>,
153 cache: Rc<RefCell<Cache>>,
154 config: OrderMatchingEngineConfig,
155 ) -> Self {
156 let book = OrderBook::new(instrument.id(), book_type);
157 let mut core = OrderMatchingCore::new(instrument.id(), instrument.price_increment());
158 core.set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
159 let ids_generator = IdsGenerator::new(
160 instrument.id().venue,
161 oms_type,
162 raw_id,
163 config.use_random_ids,
164 config.use_position_ids,
165 cache.clone(),
166 );
167
168 Self {
169 venue: instrument.id().venue,
170 instrument,
171 raw_id,
172 fill_model,
173 fee_model,
174 event_handler: None,
175 book_type,
176 oms_type,
177 account_type,
178 clock,
179 cache,
180 book,
181 market_status: MarketStatus::Open,
182 config,
183 core,
184 target_bid: None,
185 target_ask: None,
186 target_last: None,
187 last_bar_bid: None,
188 last_bar_ask: None,
189 fill_at_market: true,
190 execution_bar_types: IndexMap::new(),
191 execution_bar_deltas: IndexMap::new(),
192 account_ids: IndexMap::new(),
193 cached_filled_qty: IndexMap::new(),
194 ids_generator,
195 last_trade_size: None,
196 bid_consumption: IndexMap::new(),
197 ask_consumption: IndexMap::new(),
198 trade_consumption: 0,
199 queue_ahead: IndexMap::new(),
200 queue_excess: IndexMap::new(),
201 queue_pending: IndexMap::new(),
202 prev_bid_price_raw: 0,
203 prev_bid_size_raw: 0,
204 prev_ask_price_raw: 0,
205 prev_ask_size_raw: 0,
206 last_quote_bid: None,
207 last_quote_ask: None,
208 precision_mismatch_streak: 0,
209 tob_initialized: false,
210 instrument_close: None,
211 pending_resolution: false,
212 settlement_price: None,
213 expiration_processed: false,
214 }
215 }
216
217 pub fn set_event_handler(&mut self, handler: Rc<dyn Fn(OrderEventAny)>) {
224 self.event_handler = Some(handler);
225 }
226
227 fn dispatch_order_event(&self, event: OrderEventAny) {
228 if let Some(handler) = &self.event_handler {
229 handler(event);
230 } else {
231 let endpoint = MessagingSwitchboard::exec_engine_process();
232 msgbus::send_order_event(endpoint, event);
233 }
234 }
235
236 pub fn reset(&mut self) {
242 self.book.reset();
243 self.execution_bar_types.clear();
244 self.execution_bar_deltas.clear();
245 self.account_ids.clear();
246 self.cached_filled_qty.clear();
247 self.core.reset();
248 self.target_bid = None;
249 self.target_ask = None;
250 self.target_last = None;
251 self.last_trade_size = None;
252 self.bid_consumption.clear();
253 self.ask_consumption.clear();
254 self.trade_consumption = 0;
255 self.queue_ahead.clear();
256 self.queue_excess.clear();
257 self.queue_pending.clear();
258 self.prev_bid_price_raw = 0;
259 self.prev_bid_size_raw = 0;
260 self.prev_ask_price_raw = 0;
261 self.prev_ask_size_raw = 0;
262 self.last_quote_bid = None;
263 self.last_quote_ask = None;
264 self.precision_mismatch_streak = 0;
265 self.tob_initialized = false;
266 self.instrument_close = None;
267 self.pending_resolution = false;
268 self.settlement_price = None;
269 self.expiration_processed = false;
270 self.fill_at_market = true;
271 self.ids_generator.reset();
272
273 log::info!("Reset {}", self.instrument.id());
274 }
275
276 fn apply_liquidity_consumption(
277 &mut self,
278 fills: Vec<(Price, Quantity)>,
279 order_side: OrderSide,
280 leaves_qty: Quantity,
281 book_prices: Option<&[Price]>,
282 ) -> Vec<(Price, Quantity)> {
283 if !self.config.liquidity_consumption {
284 return fills;
285 }
286
287 let consumption = match order_side {
288 OrderSide::Buy => &mut self.ask_consumption,
289 OrderSide::Sell => &mut self.bid_consumption,
290 _ => return fills,
291 };
292
293 let mut adjusted_fills = Vec::with_capacity(fills.len());
294 let mut remaining_qty = leaves_qty.raw;
295
296 for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
297 if remaining_qty == 0 {
298 break;
299 }
300
301 let book_price = book_prices
304 .and_then(|bp| bp.get(fill_idx).copied())
305 .unwrap_or(price);
306
307 let book_price_raw = book_price.raw;
308 let level_size = self
309 .book
310 .get_quantity_at_level(book_price, order_side, qty.precision);
311
312 let (original_size, consumed) = consumption
313 .entry(book_price_raw)
314 .or_insert((level_size.raw, 0));
315
316 if *original_size != level_size.raw {
318 *original_size = level_size.raw;
319 *consumed = 0;
320 }
321
322 let available = original_size.saturating_sub(*consumed);
323 if available == 0 {
324 continue;
325 }
326
327 let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
328 if adjusted_qty_raw == 0 {
329 continue;
330 }
331
332 *consumed += adjusted_qty_raw;
333 remaining_qty -= adjusted_qty_raw;
334
335 let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
336 adjusted_fills.push((price, adjusted_qty));
337 }
338
339 adjusted_fills
340 }
341
342 fn seed_trade_consumption(
343 &mut self,
344 trade_price_raw: PriceRaw,
345 trade_size_raw: QuantityRaw,
346 trade_ts_event: UnixNanos,
347 aggressor_side: AggressorSide,
348 ) {
349 if trade_size_raw == 0 {
350 return;
351 }
352
353 if self.book.ts_last > trade_ts_event {
356 return;
357 }
358
359 let consumption = match aggressor_side {
360 AggressorSide::Buyer => &mut self.ask_consumption,
361 AggressorSide::Seller => &mut self.bid_consumption,
362 AggressorSide::NoAggressor => return,
363 };
364
365 let levels: Vec<_> = match aggressor_side {
366 AggressorSide::Buyer => self
367 .book
368 .asks(None)
369 .take_while(|l| l.price.value.raw <= trade_price_raw)
370 .collect(),
371 AggressorSide::Seller => self
372 .book
373 .bids(None)
374 .take_while(|l| l.price.value.raw >= trade_price_raw)
375 .collect(),
376 _ => unreachable!(),
377 };
378
379 let mut remaining = trade_size_raw;
380 for level in &levels {
381 if remaining == 0 {
382 break;
383 }
384 let level_size = level.size_raw();
385 let entry = consumption
386 .entry(level.price.value.raw)
387 .or_insert((level_size, 0));
388
389 if entry.0 != level_size {
391 entry.0 = level_size;
392 entry.1 = 0;
393 }
394
395 let available = level_size.saturating_sub(entry.1);
396 let consume = min(remaining, available);
397 entry.1 += consume;
398 remaining -= consume;
399 }
400 }
401
402 pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
404 self.core
405 .set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
406 self.fill_model = fill_model;
407 }
408
409 pub fn set_settlement_price(&mut self, price: Price) {
410 self.settlement_price = Some(price);
411 }
412
413 fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
414 if !self.config.queue_position {
415 return;
416 }
417 let size_prec = self.instrument.size_precision();
418
419 let qty_ahead = self.book.get_quantity_at_level(
422 price,
423 OrderCore::opposite_side(order.order_side()),
424 size_prec,
425 );
426
427 let client_order_id = order.client_order_id();
428
429 self.queue_pending.shift_remove(&client_order_id);
431 self.queue_ahead.shift_remove(&client_order_id);
432
433 if self.book_type == BookType::L1_MBP && qty_ahead.raw == 0 {
438 let behind_bbo = match order.order_side() {
439 OrderSide::Buy => self.book.best_bid_price().is_some_and(|bid| price < bid),
440 OrderSide::Sell => self.book.best_ask_price().is_some_and(|ask| price > ask),
441 _ => false,
442 };
443
444 if behind_bbo {
445 self.queue_pending.insert(client_order_id, price.raw);
446 return;
447 }
448 }
449
450 self.queue_ahead
451 .insert(client_order_id, (price.raw, qty_ahead.raw));
452 }
453
454 fn decrement_queue_on_trade(
455 &mut self,
456 price_raw: PriceRaw,
457 trade_size_raw: QuantityRaw,
458 aggressor_side: AggressorSide,
459 ) {
460 if !self.config.queue_position {
461 return;
462 }
463
464 self.queue_excess.clear();
465
466 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
467 let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
468 let mut stale: Vec<ClientOrderId> = Vec::new();
469
470 for client_order_id in keys {
471 let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
472 {
473 Some(v) => v,
474 None => continue,
475 };
476
477 let cache = self.cache.borrow();
478 let order_info = cache.order(&client_order_id).and_then(|order| {
479 if order.is_closed() {
480 None
481 } else {
482 Some((order.order_side(), order.leaves_qty().raw))
483 }
484 });
485 drop(cache);
486
487 let Some((order_side, leaves_raw)) = order_info else {
488 stale.push(client_order_id);
489 continue;
490 };
491
492 if order_price_raw != price_raw || ahead_raw == 0 {
493 continue;
494 }
495
496 let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
497 || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
498 || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);
499
500 if should_decrement {
501 entries.push((client_order_id, ahead_raw, leaves_raw));
502 }
503 }
504
505 for id in stale {
506 self.queue_ahead.shift_remove(&id);
507 }
508
509 entries.sort_by_key(|&(_, ahead, _)| ahead);
511
512 let mut remaining = trade_size_raw;
513 let mut prev_position: QuantityRaw = 0;
514
515 for (client_order_id, ahead_raw, leaves_raw) in &entries {
516 if remaining == 0 {
517 let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
518 self.queue_ahead
519 .insert(*client_order_id, (price_raw, new_ahead));
520 if new_ahead == 0 {
521 self.queue_excess.insert(*client_order_id, 0);
523 }
524 continue;
525 }
526
527 let gap = ahead_raw.saturating_sub(prev_position);
529 let queue_consumed = remaining.min(gap);
530 remaining -= queue_consumed;
531
532 if remaining == 0 && queue_consumed < gap {
533 let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
534 self.queue_ahead
535 .insert(*client_order_id, (price_raw, new_ahead));
536 continue;
537 }
538
539 self.queue_ahead.insert(*client_order_id, (price_raw, 0));
540 let excess = remaining.min(*leaves_raw);
541 self.queue_excess.insert(*client_order_id, excess);
542 remaining -= excess;
543 prev_position = ahead_raw + excess;
544 }
545 }
546
547 fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
548 if !self.config.queue_position {
549 return Some(order.leaves_qty().raw);
550 }
551
552 let client_order_id = order.client_order_id();
553
554 if self.queue_pending.contains_key(&client_order_id) {
556 return None;
557 }
558
559 if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
560 && let Some(order_price) = order.price()
561 && order_price.raw == tracked_price_raw
562 && ahead_raw > 0
563 {
564 return None;
565 }
566
567 let leaves_raw = order.leaves_qty().raw;
568 if leaves_raw == 0 {
569 return None;
570 }
571
572 let mut available_raw = leaves_raw;
573
574 if let Some(trade_size) = self.last_trade_size {
576 let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
577 available_raw = available_raw.min(remaining);
578
579 if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
580 if excess_raw == 0 {
581 return None;
582 }
583 available_raw = available_raw.min(excess_raw);
584 }
585 }
586
587 if available_raw == 0 {
588 return None;
589 }
590
591 Some(available_raw)
592 }
593
594 fn clear_all_queue_positions(&mut self) {
595 for (_, (_, ahead_raw)) in &mut self.queue_ahead {
596 *ahead_raw = 0;
597 }
598 }
599
600 fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
601 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
602 for client_order_id in keys {
603 if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
604 && order_price_raw == deleted_price_raw
605 {
606 let matches_side = self
607 .cache
608 .borrow()
609 .order(&client_order_id)
610 .is_some_and(|o| o.order_side() == deleted_side);
611 if matches_side {
612 self.queue_ahead
613 .insert(client_order_id, (order_price_raw, 0));
614 }
615 }
616 }
617 }
618
619 fn cap_queue_ahead(
620 &mut self,
621 price_raw: PriceRaw,
622 size_raw: QuantityRaw,
623 order_side: OrderSide,
624 ) {
625 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
626 let mut stale: Vec<ClientOrderId> = Vec::new();
627
628 for client_order_id in keys {
629 let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
630 {
631 Some(v) => v,
632 None => continue,
633 };
634
635 if order_price_raw != price_raw || ahead_raw <= size_raw {
636 continue;
637 }
638
639 let cache = self.cache.borrow();
640 let order_info = cache.order(&client_order_id).and_then(|order| {
641 if order.is_closed() {
642 None
643 } else {
644 Some(order.order_side())
645 }
646 });
647 drop(cache);
648
649 let Some(side) = order_info else {
650 stale.push(client_order_id);
651 continue;
652 };
653
654 if side != order_side {
655 continue;
656 }
657
658 self.queue_ahead
659 .insert(client_order_id, (order_price_raw, size_raw));
660 }
661
662 for id in stale {
663 self.queue_ahead.shift_remove(&id);
664 }
665 }
666
667 fn seed_tob_baseline(&mut self) {
668 let bid = self.book.best_bid_price();
669 let ask = self.book.best_ask_price();
670 self.prev_bid_price_raw = bid.map_or(0, |p| p.raw);
671 self.prev_bid_size_raw = self.book.best_bid_size().map_or(0, |q| q.raw);
672 self.prev_ask_price_raw = ask.map_or(0, |p| p.raw);
673 self.prev_ask_size_raw = self.book.best_ask_size().map_or(0, |q| q.raw);
674 self.tob_initialized = bid.is_some() || ask.is_some();
675 }
676
677 fn decrement_l1_queue_on_quote(
678 &mut self,
679 bid_price_raw: PriceRaw,
680 bid_size_raw: QuantityRaw,
681 ask_price_raw: PriceRaw,
682 ask_size_raw: QuantityRaw,
683 ) {
684 if !self.config.queue_position {
685 return;
686 }
687
688 if self.tob_initialized {
690 if bid_price_raw < self.prev_bid_price_raw {
692 self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
693 }
694
695 if ask_price_raw > self.prev_ask_price_raw {
697 self.adjust_l1_queue_on_price_move(ask_price_raw, ask_size_raw, OrderSide::Sell);
698 }
699 }
700
701 self.resolve_pending_l1_snapshots(bid_price_raw, bid_size_raw, ask_price_raw, ask_size_raw);
703 }
704
705 fn adjust_l1_queue_on_price_move(
706 &mut self,
707 new_price_raw: PriceRaw,
708 new_size_raw: QuantityRaw,
709 order_side: OrderSide,
710 ) {
711 let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
712 let mut stale: Vec<ClientOrderId> = Vec::new();
713
714 for client_order_id in keys {
715 let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id) else {
716 continue;
717 };
718
719 let cache = self.cache.borrow();
720 let order_info = cache.order(&client_order_id).and_then(|order| {
721 if order.is_closed() {
722 None
723 } else {
724 Some(order.order_side())
725 }
726 });
727 drop(cache);
728
729 let Some(side) = order_info else {
730 stale.push(client_order_id);
731 continue;
732 };
733
734 if side != order_side {
735 continue;
736 }
737
738 let crossed = match order_side {
741 OrderSide::Buy => order_price_raw > new_price_raw,
742 _ => order_price_raw < new_price_raw,
743 };
744
745 if crossed {
746 self.queue_ahead
747 .insert(client_order_id, (order_price_raw, 0));
748 } else if order_price_raw == new_price_raw && ahead_raw > new_size_raw {
749 self.queue_ahead
750 .insert(client_order_id, (order_price_raw, new_size_raw));
751 }
752 }
753
754 for id in stale {
755 self.queue_ahead.shift_remove(&id);
756 }
757
758 let pending_keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
760 let mut pending_stale: Vec<ClientOrderId> = Vec::new();
761
762 for client_order_id in pending_keys {
763 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
764 continue;
765 };
766
767 let cache = self.cache.borrow();
768 let order_info = cache.order(&client_order_id).and_then(|order| {
769 if order.is_closed() {
770 None
771 } else {
772 Some(order.order_side())
773 }
774 });
775 drop(cache);
776
777 let Some(side) = order_info else {
778 pending_stale.push(client_order_id);
779 continue;
780 };
781
782 if side != order_side {
783 continue;
784 }
785
786 let crossed = match order_side {
787 OrderSide::Buy => order_price_raw > new_price_raw,
788 _ => order_price_raw < new_price_raw,
789 };
790
791 if crossed {
792 self.queue_pending.shift_remove(&client_order_id);
793 self.queue_ahead
794 .insert(client_order_id, (order_price_raw, 0));
795 } else if order_price_raw == new_price_raw {
796 self.queue_pending.shift_remove(&client_order_id);
797 self.queue_ahead
798 .insert(client_order_id, (order_price_raw, new_size_raw));
799 }
800 }
801
802 for id in pending_stale {
803 self.queue_pending.shift_remove(&id);
804 }
805 }
806
807 fn resolve_pending_l1_snapshots(
808 &mut self,
809 bid_price_raw: PriceRaw,
810 bid_size_raw: QuantityRaw,
811 ask_price_raw: PriceRaw,
812 ask_size_raw: QuantityRaw,
813 ) {
814 let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
815 let mut stale: Vec<ClientOrderId> = Vec::new();
816
817 for client_order_id in keys {
818 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
819 continue;
820 };
821
822 let cache = self.cache.borrow();
823 let order_info = cache.order(&client_order_id).and_then(|order| {
824 if order.is_closed() {
825 None
826 } else {
827 Some(order.order_side())
828 }
829 });
830 drop(cache);
831
832 let Some(side) = order_info else {
833 stale.push(client_order_id);
834 continue;
835 };
836
837 let matched_size = match side {
839 OrderSide::Buy if order_price_raw == bid_price_raw => Some(bid_size_raw),
840 OrderSide::Sell if order_price_raw == ask_price_raw => Some(ask_size_raw),
841 _ => None,
842 };
843
844 if let Some(size) = matched_size {
845 self.queue_pending.shift_remove(&client_order_id);
846 self.queue_ahead
847 .insert(client_order_id, (order_price_raw, size));
848 }
849 }
850
851 for id in stale {
852 self.queue_pending.shift_remove(&id);
853 }
854 }
855
856 fn resolve_pending_on_trade(&mut self, trade_price_raw: PriceRaw) {
857 let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
858 let mut stale: Vec<ClientOrderId> = Vec::new();
859
860 for client_order_id in keys {
861 let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
862 continue;
863 };
864
865 let cache = self.cache.borrow();
866 let order_side = cache.order(&client_order_id).and_then(|order| {
867 if order.is_closed() {
868 None
869 } else {
870 Some(order.order_side())
871 }
872 });
873 drop(cache);
874
875 let Some(side) = order_side else {
876 stale.push(client_order_id);
877 continue;
878 };
879
880 let crossed = match side {
882 OrderSide::Buy => trade_price_raw < order_price_raw,
883 OrderSide::Sell => trade_price_raw > order_price_raw,
884 _ => false,
885 };
886
887 if crossed {
888 self.queue_pending.shift_remove(&client_order_id);
889 self.queue_ahead
890 .insert(client_order_id, (order_price_raw, 0));
891 }
892 }
893
894 for id in stale {
895 self.queue_pending.shift_remove(&id);
896 }
897 }
898
899 #[must_use]
900 pub fn best_bid_price(&self) -> Option<Price> {
902 self.book.best_bid_price()
903 }
904
905 #[must_use]
906 pub fn best_ask_price(&self) -> Option<Price> {
908 self.book.best_ask_price()
909 }
910
911 #[must_use]
912 pub const fn get_book(&self) -> &OrderBook {
914 &self.book
915 }
916
917 #[must_use]
918 pub fn get_open_bid_orders(&self) -> Vec<RestingOrder> {
920 self.core.get_orders_bid()
921 }
922
923 #[must_use]
924 pub fn get_open_ask_orders(&self) -> Vec<RestingOrder> {
926 self.core.get_orders_ask()
927 }
928
929 #[must_use]
930 pub fn get_open_orders(&self) -> Vec<RestingOrder> {
932 self.core.get_orders()
933 }
934
935 #[must_use]
936 pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
938 self.core.order_exists(client_order_id)
939 }
940
941 #[must_use]
942 pub fn cached_filled_qty_len(&self) -> usize {
944 self.cached_filled_qty.len()
945 }
946
947 #[must_use]
948 pub const fn get_core(&self) -> &OrderMatchingCore {
949 &self.core
950 }
951
952 pub fn set_fill_at_market(&mut self, value: bool) {
953 self.fill_at_market = value;
954 }
955
956 pub fn update_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
962 if instrument.id() != self.instrument.id() {
963 anyhow::bail!(
964 "Cannot update instrument {} with {}",
965 self.instrument.id(),
966 instrument.id()
967 );
968 }
969
970 let changed = instrument.price_increment() != self.instrument.price_increment()
971 || instrument.price_precision() != self.instrument.price_precision()
972 || instrument.size_precision() != self.instrument.size_precision();
973
974 if changed {
975 self.core
976 .update_price_increment(instrument.price_increment());
977 self.book.reset();
978 self.bid_consumption.clear();
979 self.ask_consumption.clear();
980 self.trade_consumption = 0;
981 self.queue_ahead.clear();
982 self.queue_excess.clear();
983 self.queue_pending.clear();
984 self.prev_bid_price_raw = 0;
985 self.prev_bid_size_raw = 0;
986 self.prev_ask_price_raw = 0;
987 self.prev_ask_size_raw = 0;
988 self.last_quote_bid = None;
989 self.last_quote_ask = None;
990 self.precision_mismatch_streak = 0;
991 self.tob_initialized = false;
992 self.target_bid = None;
993 self.target_ask = None;
994 self.target_last = None;
995 self.last_bar_bid = None;
996 self.last_bar_ask = None;
997 self.core.bid = None;
998 self.core.ask = None;
999 self.core.last = None;
1000 log::info!(
1001 "Updated instrument {} (price_precision={} size_precision={})",
1002 instrument.id(),
1003 instrument.price_precision(),
1004 instrument.size_precision()
1005 );
1006 }
1007
1008 self.instrument = instrument;
1009
1010 if changed {
1011 self.drop_incompatible_core_orders();
1012 }
1013 Ok(())
1014 }
1015
1016 fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
1017 let expected = self.instrument.price_precision();
1018 if actual != expected {
1019 anyhow::bail!(
1020 "Invalid {field} precision {actual}, expected {expected} for {}",
1021 self.instrument.id()
1022 );
1023 }
1024 Ok(())
1025 }
1026
1027 fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
1028 let expected = self.instrument.size_precision();
1029 if actual != expected {
1030 anyhow::bail!(
1031 "Invalid {field} precision {actual}, expected {expected} for {}",
1032 self.instrument.id()
1033 );
1034 }
1035 Ok(())
1036 }
1037
1038 fn log_precision_mismatch(
1039 &mut self,
1040 data_type: &str,
1041 instrument_id: InstrumentId,
1042 err: &anyhow::Error,
1043 ) {
1044 self.precision_mismatch_streak = self.precision_mismatch_streak.saturating_add(1);
1045 let streak = self.precision_mismatch_streak;
1046
1047 if streak <= 3 || streak.is_multiple_of(100) {
1048 log::warn!(
1049 "Skipping {data_type} for {instrument_id}: {err} \
1050 (consecutive_precision_mismatches={streak})"
1051 );
1052 }
1053
1054 if streak == 20 {
1055 log::error!(
1056 "Precision mismatches reached {streak} consecutive events for \
1057 {instrument_id}; check instrument update flow and upstream market data"
1058 );
1059 }
1060 }
1061
1062 fn drop_incompatible_core_orders(&mut self) {
1063 let client_order_ids: Vec<ClientOrderId> = self
1064 .core
1065 .iter_orders()
1066 .filter(|order| {
1067 !self.resting_order_matches_current_instrument(order)
1068 || !self.cached_order_matches_current_instrument(order.client_order_id)
1069 })
1070 .map(|order| order.client_order_id)
1071 .collect();
1072
1073 for client_order_id in client_order_ids {
1074 let order = self
1075 .cache
1076 .borrow()
1077 .order(&client_order_id)
1078 .map(|o| o.clone());
1079 if let Some(order) = order
1080 && (order.is_inflight() || order.is_open())
1081 {
1082 log::warn!(
1083 "Canceling order {client_order_id} after instrument update: \
1084 price, trigger price, or quantity is not compatible with {}",
1085 self.instrument.id()
1086 );
1087 self.cancel_order(&order, None);
1088 } else {
1089 let _ = self.core.delete_order(client_order_id);
1090 self.cached_filled_qty.swap_remove(&client_order_id);
1091 }
1092 }
1093 }
1094
1095 fn cached_order_matches_current_instrument(&self, client_order_id: ClientOrderId) -> bool {
1096 self.cache
1097 .borrow()
1098 .order(&client_order_id)
1099 .is_none_or(|order| {
1100 Self::quantity_matches_precision(order.quantity(), self.instrument.size_precision())
1101 })
1102 }
1103
1104 fn resting_order_matches_current_instrument(&self, order: &RestingOrder) -> bool {
1105 order
1106 .limit_price
1107 .is_none_or(|price| self.price_matches_current_instrument(price))
1108 && order
1109 .trigger_price
1110 .is_none_or(|price| self.price_matches_current_instrument(price))
1111 }
1112
1113 fn price_matches_current_instrument(&self, price: Price) -> bool {
1114 Self::price_matches_precision(price, self.instrument.price_precision())
1115 && Self::price_matches_tick(price, self.instrument.price_increment())
1116 }
1117
1118 fn price_matches_precision(price: Price, precision: u8) -> bool {
1119 let precision_diff = FIXED_PRECISION.saturating_sub(precision);
1120 let scale = PriceRaw::pow(10, u32::from(precision_diff));
1121 price.raw % scale == 0
1122 }
1123
1124 fn price_matches_tick(price: Price, increment: Price) -> bool {
1125 let increment_raw = increment.raw.abs();
1126 increment_raw == 0 || price.raw % increment_raw == 0
1127 }
1128
1129 fn quantity_matches_precision(quantity: Quantity, precision: u8) -> bool {
1130 let precision_diff = FIXED_PRECISION.saturating_sub(precision);
1131 let scale = QuantityRaw::pow(10, u32::from(precision_diff));
1132 quantity.raw.is_multiple_of(scale)
1133 }
1134
1135 fn normalize_price_for_current_instrument(&self, price: Price) -> Option<Price> {
1136 if !self.price_matches_current_instrument(price) {
1137 return None;
1138 }
1139
1140 Some(Price::from_raw(
1141 price.raw,
1142 self.instrument.price_precision(),
1143 ))
1144 }
1145
1146 fn normalize_quantity_for_current_instrument(&self, quantity: Quantity) -> Option<Quantity> {
1147 let precision = self.instrument.size_precision();
1148 if !Self::quantity_matches_precision(quantity, precision) {
1149 return None;
1150 }
1151
1152 Some(Quantity::from_raw(quantity.raw, precision))
1153 }
1154
1155 pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
1163 log::debug!("Processing {delta}");
1164
1165 if matches!(delta.action, BookAction::Add | BookAction::Update) {
1167 self.check_price_precision(delta.order.price.precision, "delta order price")?;
1168 self.check_size_precision(delta.order.size.precision, "delta order size")?;
1169 }
1170
1171 if self.book_type == BookType::L1_MBP {
1173 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1174 return Ok(());
1175 }
1176
1177 self.book.apply_delta(delta)?;
1178
1179 let delta_snapshot_or_clear = (delta.flags & 32) != 0 || delta.action == BookAction::Clear;
1180
1181 if self.config.queue_position {
1182 if delta_snapshot_or_clear {
1183 self.clear_all_queue_positions();
1184 } else if delta.action == BookAction::Delete {
1185 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1186 } else if delta.action == BookAction::Update {
1187 self.cap_queue_ahead(
1188 delta.order.price.raw,
1189 delta.order.size.raw,
1190 delta.order.side,
1191 );
1192 }
1193 }
1194
1195 if self.config.queue_position && delta_snapshot_or_clear {
1196 self.seed_tob_baseline();
1197 }
1198
1199 self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1200 Ok(())
1201 }
1202
1203 pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1211 log::debug!("Processing {deltas}");
1212
1213 for delta in &deltas.deltas {
1215 if matches!(delta.action, BookAction::Add | BookAction::Update) {
1216 self.check_price_precision(delta.order.price.precision, "delta order price")?;
1217 self.check_size_precision(delta.order.size.precision, "delta order size")?;
1218 }
1219 }
1220
1221 if self.book_type == BookType::L1_MBP {
1223 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1224 return Ok(());
1225 }
1226
1227 self.book.apply_deltas(deltas)?;
1228
1229 let mut has_snapshot_or_clear = false;
1230
1231 if self.config.queue_position {
1232 for delta in &deltas.deltas {
1233 if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
1234 self.clear_all_queue_positions();
1235 has_snapshot_or_clear = true;
1236 break;
1237 } else if delta.action == BookAction::Delete {
1238 self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1239 } else if delta.action == BookAction::Update {
1240 self.cap_queue_ahead(
1241 delta.order.price.raw,
1242 delta.order.size.raw,
1243 delta.order.side,
1244 );
1245 }
1246 }
1247 }
1248
1249 if self.config.queue_position && has_snapshot_or_clear {
1250 self.seed_tob_baseline();
1251 }
1252
1253 self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1254 Ok(())
1255 }
1256
1257 pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
1266 log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
1267
1268 for order in &depth.bids {
1270 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1271 continue;
1272 }
1273 self.check_price_precision(order.price.precision, "bid price")?;
1274 self.check_size_precision(order.size.precision, "bid size")?;
1275 }
1276
1277 for order in &depth.asks {
1278 if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1279 continue;
1280 }
1281 self.check_price_precision(order.price.precision, "ask price")?;
1282 self.check_size_precision(order.size.precision, "ask size")?;
1283 }
1284
1285 if self.book_type == BookType::L1_MBP {
1288 let quote = QuoteTick::new(
1289 depth.instrument_id,
1290 depth.bids[0].price,
1291 depth.asks[0].price,
1292 depth.bids[0].size,
1293 depth.asks[0].size,
1294 depth.ts_event,
1295 depth.ts_init,
1296 );
1297 self.book.update_quote_tick("e)?;
1298 self.last_quote_bid = Some(depth.bids[0].price);
1299 self.last_quote_ask = Some(depth.asks[0].price);
1300 } else {
1301 self.book.apply_depth(depth)?;
1302 }
1303
1304 if self.config.queue_position {
1306 self.clear_all_queue_positions();
1307 let bid_price_raw = depth.bids[0].price.raw;
1308 let bid_size_raw = depth.bids[0].size.raw;
1309 let ask_price_raw = depth.asks[0].price.raw;
1310 let ask_size_raw = depth.asks[0].size.raw;
1311
1312 if self.tob_initialized {
1314 if bid_price_raw < self.prev_bid_price_raw {
1315 self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
1316 }
1317
1318 if ask_price_raw > self.prev_ask_price_raw {
1319 self.adjust_l1_queue_on_price_move(
1320 ask_price_raw,
1321 ask_size_raw,
1322 OrderSide::Sell,
1323 );
1324 }
1325 }
1326
1327 self.resolve_pending_l1_snapshots(
1328 bid_price_raw,
1329 bid_size_raw,
1330 ask_price_raw,
1331 ask_size_raw,
1332 );
1333
1334 self.prev_bid_price_raw = bid_price_raw;
1335 self.prev_bid_size_raw = bid_size_raw;
1336 self.prev_ask_price_raw = ask_price_raw;
1337 self.prev_ask_size_raw = ask_size_raw;
1338 self.tob_initialized = true;
1339 }
1340
1341 self.iterate(depth.ts_init, AggressorSide::NoAggressor);
1342 Ok(())
1343 }
1344
1345 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
1347 log::debug!("Processing {quote}");
1348
1349 if let Err(e) = self.check_price_precision(quote.bid_price.precision, "bid_price") {
1350 self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1351 return;
1352 }
1353
1354 if let Err(e) = self.check_price_precision(quote.ask_price.precision, "ask_price") {
1355 self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1356 return;
1357 }
1358
1359 if let Err(e) = self.check_size_precision(quote.bid_size.precision, "bid_size") {
1360 self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1361 return;
1362 }
1363
1364 if let Err(e) = self.check_size_precision(quote.ask_size.precision, "ask_size") {
1365 self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1366 return;
1367 }
1368
1369 self.precision_mismatch_streak = 0;
1370
1371 if self.book_type == BookType::L1_MBP {
1372 if quote.ts_event < self.book.ts_last {
1374 log::warn!(
1375 "Skipping stale quote: ts_event {} < book.ts_last {} for {}",
1376 quote.ts_event,
1377 self.book.ts_last,
1378 self.book.instrument_id,
1379 );
1380 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1381 return;
1382 }
1383
1384 if !self.update_quote_tick_or_skip(quote, "quote tick") {
1385 return;
1386 }
1387
1388 if self.config.queue_position {
1389 self.decrement_l1_queue_on_quote(
1390 quote.bid_price.raw,
1391 quote.bid_size.raw,
1392 quote.ask_price.raw,
1393 quote.ask_size.raw,
1394 );
1395 self.prev_bid_price_raw = quote.bid_price.raw;
1396 self.prev_bid_size_raw = quote.bid_size.raw;
1397 self.prev_ask_price_raw = quote.ask_price.raw;
1398 self.prev_ask_size_raw = quote.ask_size.raw;
1399 self.tob_initialized = true;
1400 }
1401 self.last_quote_bid = Some(quote.bid_price);
1402 self.last_quote_ask = Some(quote.ask_price);
1403 }
1404
1405 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1406 }
1407
1408 pub fn process_bar(&mut self, bar: &Bar) {
1417 log::debug!("Processing {bar}");
1418
1419 if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
1421 return;
1422 }
1423
1424 let bar_type = bar.bar_type;
1425 if bar_type.aggregation_source() == AggregationSource::Internal {
1427 return;
1428 }
1429
1430 if let Err(e) = self.check_price_precision(bar.open.precision, "bar open") {
1431 self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1432 return;
1433 }
1434
1435 if let Err(e) = self.check_price_precision(bar.high.precision, "bar high") {
1436 self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1437 return;
1438 }
1439
1440 if let Err(e) = self.check_price_precision(bar.low.precision, "bar low") {
1441 self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1442 return;
1443 }
1444
1445 if let Err(e) = self.check_price_precision(bar.close.precision, "bar close") {
1446 self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1447 return;
1448 }
1449
1450 if let Err(e) = self.check_size_precision(bar.volume.precision, "bar volume") {
1451 self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1452 return;
1453 }
1454
1455 self.precision_mismatch_streak = 0;
1456
1457 let price_type = bar_type.spec().price_type;
1458 if price_type == PriceType::Mark {
1459 log::warn!(
1460 "Cannot process bar for {} with `PriceType::Mark`, mark price bars are not supported for bar execution",
1461 bar.instrument_id(),
1462 );
1463 return;
1464 }
1465
1466 let execution_bar_type =
1467 if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
1468 execution_bar_type.to_owned()
1469 } else {
1470 self.execution_bar_types
1471 .insert(bar.instrument_id(), bar_type);
1472 self.execution_bar_deltas
1473 .insert(bar_type, bar_type.spec().timedelta());
1474 bar_type
1475 };
1476
1477 if execution_bar_type != bar_type {
1478 let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
1479 if bar_type_timedelta.is_none() {
1480 bar_type_timedelta = Some(bar_type.spec().timedelta());
1481 self.execution_bar_deltas
1482 .insert(bar_type, bar_type_timedelta.unwrap());
1483 }
1484
1485 if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
1486 >= &bar_type_timedelta.unwrap()
1487 {
1488 self.execution_bar_types
1489 .insert(bar_type.instrument_id(), bar_type);
1490 } else {
1491 return;
1492 }
1493 }
1494
1495 match price_type {
1496 PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
1497 PriceType::Bid => {
1498 self.last_bar_bid = Some(bar.to_owned());
1499 self.process_quote_ticks_from_bar();
1500 }
1501 PriceType::Ask => {
1502 self.last_bar_ask = Some(bar.to_owned());
1503 self.process_quote_ticks_from_bar();
1504 }
1505 PriceType::Mark => {
1506 unreachable!("PriceType::Mark bars return before execution bar state updates")
1507 }
1508 }
1509 }
1510
1511 fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
1512 let sizes = BarTickSizes::from_volume(bar.volume, self.instrument.size_increment());
1513
1514 let aggressor_side = if self.core.last.is_none_or(|last| bar.open > last) {
1515 AggressorSide::Buyer
1516 } else {
1517 AggressorSide::Seller
1518 };
1519
1520 if self.core.last.is_none() {
1522 self.fill_at_market = true;
1523
1524 if !self.process_bar_trade_tick(
1525 bar,
1526 bar.open,
1527 sizes.open,
1528 aggressor_side,
1529 "bar open trade tick",
1530 ) {
1531 return;
1532 }
1533 self.core.set_last_raw(bar.open);
1534 } else if self.core.last.is_some_and(|last| bar.open != last) {
1535 self.fill_at_market = true;
1537
1538 if !self.process_bar_trade_tick(
1539 bar,
1540 bar.open,
1541 sizes.open,
1542 aggressor_side,
1543 "bar gap-open trade tick",
1544 ) {
1545 return;
1546 }
1547 self.core.set_last_raw(bar.open);
1548 }
1549
1550 let high_first = !self.config.bar_adaptive_high_low_ordering
1553 || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();
1554
1555 if high_first {
1556 self.process_bar_high(bar, sizes.high);
1557 self.process_bar_low(bar, sizes.low);
1558 } else {
1559 self.process_bar_low(bar, sizes.low);
1560 self.process_bar_high(bar, sizes.high);
1561 }
1562
1563 if self.core.last.is_some_and(|last| bar.close != last) {
1565 self.fill_at_market = false;
1566
1567 let aggressor_side = if bar.close > self.core.last.unwrap() {
1568 AggressorSide::Buyer
1569 } else {
1570 AggressorSide::Seller
1571 };
1572
1573 if !self.process_bar_trade_tick(
1574 bar,
1575 bar.close,
1576 sizes.close,
1577 aggressor_side,
1578 "bar close trade tick",
1579 ) {
1580 return;
1581 }
1582
1583 self.core.set_last_raw(bar.close);
1584 }
1585
1586 self.fill_at_market = true;
1587 }
1588
1589 fn process_bar_high(&mut self, bar: &Bar, size: Quantity) {
1590 if self.core.last.is_some_and(|last| bar.high > last) {
1591 self.fill_at_market = false;
1592
1593 if !self.process_bar_trade_tick(
1594 bar,
1595 bar.high,
1596 size,
1597 AggressorSide::Buyer,
1598 "bar high trade tick",
1599 ) {
1600 return;
1601 }
1602
1603 self.core.set_last_raw(bar.high);
1604 }
1605 }
1606
1607 fn process_bar_low(&mut self, bar: &Bar, size: Quantity) {
1608 if self.core.last.is_some_and(|last| bar.low < last) {
1609 self.fill_at_market = false;
1610
1611 if !self.process_bar_trade_tick(
1612 bar,
1613 bar.low,
1614 size,
1615 AggressorSide::Seller,
1616 "bar low trade tick",
1617 ) {
1618 return;
1619 }
1620
1621 self.core.set_last_raw(bar.low);
1622 }
1623 }
1624
1625 fn process_bar_trade_tick(
1626 &mut self,
1627 bar: &Bar,
1628 price: Price,
1629 size: Quantity,
1630 aggressor_side: AggressorSide,
1631 context: &str,
1632 ) -> bool {
1633 if size.is_zero() {
1634 return true;
1635 }
1636
1637 let trade_tick = TradeTick::new(
1638 bar.instrument_id(),
1639 price,
1640 size,
1641 aggressor_side,
1642 self.ids_generator.generate_trade_id(bar.ts_init),
1643 bar.ts_init,
1644 bar.ts_init,
1645 );
1646
1647 if !self.update_trade_tick_or_skip(&trade_tick, context) {
1648 return false;
1649 }
1650
1651 self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1652 true
1653 }
1654
1655 fn process_quote_ticks_from_bar(&mut self) {
1656 if self.last_bar_bid.is_none()
1658 || self.last_bar_ask.is_none()
1659 || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
1660 {
1661 return;
1662 }
1663 let bid_bar = self.last_bar_bid.unwrap();
1664 let ask_bar = self.last_bar_ask.unwrap();
1665
1666 let size_increment = self.instrument.size_increment();
1667 let bid_sizes = BarTickSizes::from_volume(bid_bar.volume, size_increment);
1668 let ask_sizes = BarTickSizes::from_volume(ask_bar.volume, size_increment);
1669 let mut has_current_bid = false;
1670 let mut has_current_ask = false;
1671
1672 let mut quote_tick = QuoteTick::new(
1673 self.book.instrument_id,
1674 bid_bar.open,
1675 ask_bar.open,
1676 bid_sizes.open,
1677 ask_sizes.open,
1678 bid_bar.ts_init,
1679 bid_bar.ts_init,
1680 );
1681
1682 self.fill_at_market = true;
1684
1685 if !self.process_bar_quote_tick(
1686 "e_tick,
1687 "bar open quote tick",
1688 &mut has_current_bid,
1689 &mut has_current_ask,
1690 ) {
1691 return;
1692 }
1693
1694 self.fill_at_market = false;
1696 quote_tick.bid_price = bid_bar.high;
1697 quote_tick.ask_price = ask_bar.high;
1698 quote_tick.bid_size = bid_sizes.high;
1699 quote_tick.ask_size = ask_sizes.high;
1700
1701 if !self.process_bar_quote_tick(
1702 "e_tick,
1703 "bar high quote tick",
1704 &mut has_current_bid,
1705 &mut has_current_ask,
1706 ) {
1707 return;
1708 }
1709
1710 self.fill_at_market = false;
1712 quote_tick.bid_price = bid_bar.low;
1713 quote_tick.ask_price = ask_bar.low;
1714 quote_tick.bid_size = bid_sizes.low;
1715 quote_tick.ask_size = ask_sizes.low;
1716
1717 if !self.process_bar_quote_tick(
1718 "e_tick,
1719 "bar low quote tick",
1720 &mut has_current_bid,
1721 &mut has_current_ask,
1722 ) {
1723 return;
1724 }
1725
1726 self.fill_at_market = false;
1728 quote_tick.bid_price = bid_bar.close;
1729 quote_tick.ask_price = ask_bar.close;
1730 quote_tick.bid_size = bid_sizes.close;
1731 quote_tick.ask_size = ask_sizes.close;
1732
1733 if !self.process_bar_quote_tick(
1734 "e_tick,
1735 "bar close quote tick",
1736 &mut has_current_bid,
1737 &mut has_current_ask,
1738 ) {
1739 return;
1740 }
1741
1742 self.last_bar_bid = None;
1743 self.last_bar_ask = None;
1744 self.fill_at_market = true;
1745 }
1746
1747 fn process_bar_quote_tick(
1748 &mut self,
1749 quote: &QuoteTick,
1750 context: &str,
1751 has_current_bid: &mut bool,
1752 has_current_ask: &mut bool,
1753 ) -> bool {
1754 let has_bid_size = !quote.bid_size.is_zero();
1755 let has_ask_size = !quote.ask_size.is_zero();
1756 let mut book_changed = false;
1757 let mut bid_cleared = false;
1758 let mut ask_cleared = false;
1759
1760 match (has_bid_size, has_ask_size) {
1761 (true, true) => {
1762 if !self.update_quote_tick_or_skip(quote, context) {
1763 return false;
1764 }
1765 *has_current_bid = true;
1766 *has_current_ask = true;
1767 book_changed = true;
1768 }
1769 _ => {
1770 if has_bid_size {
1771 self.update_bar_quote_bid(quote);
1772 *has_current_bid = true;
1773 book_changed = true;
1774 } else if !*has_current_bid {
1775 self.clear_bar_quote_bid(quote);
1776 *has_current_bid = true;
1777 book_changed = true;
1778 bid_cleared = true;
1779 }
1780
1781 if has_ask_size {
1782 self.update_bar_quote_ask(quote);
1783 *has_current_ask = true;
1784 book_changed = true;
1785 } else if !*has_current_ask {
1786 self.clear_bar_quote_ask(quote);
1787 *has_current_ask = true;
1788 book_changed = true;
1789 ask_cleared = true;
1790 }
1791 }
1792 }
1793
1794 if book_changed
1795 && let (Some(best_bid), Some(best_ask)) =
1796 (self.book.best_bid_price(), self.book.best_ask_price())
1797 && best_bid > best_ask
1798 {
1799 if has_bid_size && !has_ask_size {
1800 self.clear_bar_quote_ask(quote);
1801 ask_cleared = true;
1802 } else if has_ask_size && !has_bid_size {
1803 self.clear_bar_quote_bid(quote);
1804 bid_cleared = true;
1805 }
1806 }
1807
1808 if has_bid_size {
1809 self.last_quote_bid = Some(quote.bid_price);
1810 } else if bid_cleared {
1811 self.last_quote_bid = None;
1812 }
1813
1814 if has_ask_size {
1815 self.last_quote_ask = Some(quote.ask_price);
1816 } else if ask_cleared {
1817 self.last_quote_ask = None;
1818 }
1819
1820 if !book_changed {
1821 return true;
1822 }
1823
1824 self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1825 true
1826 }
1827
1828 fn update_bar_quote_bid(&mut self, quote: &QuoteTick) {
1829 let bid = BookOrder::new(
1830 OrderSide::Buy,
1831 quote.bid_price,
1832 quote.bid_size,
1833 OrderSide::Buy as u64,
1834 );
1835 self.book
1836 .add(bid, 0, self.book.sequence.saturating_add(1), quote.ts_event);
1837 }
1838
1839 fn clear_bar_quote_bid(&mut self, quote: &QuoteTick) {
1840 self.book
1841 .clear_bids(self.book.sequence.saturating_add(1), quote.ts_event);
1842 }
1843
1844 fn update_bar_quote_ask(&mut self, quote: &QuoteTick) {
1845 let ask = BookOrder::new(
1846 OrderSide::Sell,
1847 quote.ask_price,
1848 quote.ask_size,
1849 OrderSide::Sell as u64,
1850 );
1851 self.book
1852 .add(ask, 0, self.book.sequence.saturating_add(1), quote.ts_event);
1853 }
1854
1855 fn clear_bar_quote_ask(&mut self, quote: &QuoteTick) {
1856 self.book
1857 .clear_asks(self.book.sequence.saturating_add(1), quote.ts_event);
1858 }
1859
1860 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
1867 log::debug!("Processing {trade}");
1868
1869 if let Err(e) = self.check_price_precision(trade.price.precision, "trade price") {
1870 self.log_precision_mismatch("trade tick", trade.instrument_id, &e);
1871 return;
1872 }
1873
1874 if let Err(e) = self.check_size_precision(trade.size.precision, "trade size") {
1875 self.log_precision_mismatch("trade tick", trade.instrument_id, &e);
1876 return;
1877 }
1878
1879 self.precision_mismatch_streak = 0;
1880
1881 let price_raw = trade.price.raw;
1882
1883 if self.book_type == BookType::L1_MBP {
1884 if trade.ts_event < self.book.ts_last {
1886 log::warn!(
1887 "Skipping stale trade: ts_event {} < book.ts_last {} for {}",
1888 trade.ts_event,
1889 self.book.ts_last,
1890 self.book.instrument_id,
1891 );
1892 self.iterate(trade.ts_init, AggressorSide::NoAggressor);
1893 return;
1894 }
1895
1896 if !self.update_trade_tick_or_skip(trade, "trade tick") {
1897 return;
1898 }
1899 }
1900
1901 self.core.set_last_raw(trade.price);
1902
1903 if !self.config.trade_execution {
1904 if self.book_type == BookType::L1_MBP {
1906 if let Some(bid) = self.book.best_bid_price() {
1907 self.core.set_bid_raw(bid);
1908 }
1909
1910 if let Some(ask) = self.book.best_ask_price() {
1911 self.core.set_ask_raw(ask);
1912 }
1913 }
1914 return;
1915 }
1916
1917 let aggressor_side = trade.aggressor_side;
1918
1919 match aggressor_side {
1920 AggressorSide::Buyer => {
1921 if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
1924 self.core.set_ask_raw(trade.price);
1925 }
1926
1927 if self.core.bid.is_none() {
1929 self.core.set_bid_raw(trade.price);
1930 }
1931 }
1932 AggressorSide::Seller => {
1933 if self.core.bid.is_none()
1936 || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1937 {
1938 self.core.set_bid_raw(trade.price);
1939 }
1940
1941 if self.core.ask.is_none() {
1943 self.core.set_ask_raw(trade.price);
1944 }
1945 }
1946 AggressorSide::NoAggressor => {
1947 if self.core.bid.is_none()
1948 || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1949 {
1950 self.core.set_bid_raw(trade.price);
1951 }
1952
1953 if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
1954 self.core.set_ask_raw(trade.price);
1955 }
1956 }
1957 }
1958
1959 let original_bid = self.core.bid;
1960 let original_ask = self.core.ask;
1961
1962 match aggressor_side {
1963 AggressorSide::Seller => {
1964 if original_ask.is_some_and(|ask| price_raw < ask.raw) {
1965 self.core.set_ask_raw(trade.price);
1966 }
1967 }
1968 AggressorSide::Buyer => {
1969 if original_bid.is_some_and(|bid| price_raw > bid.raw) {
1970 self.core.set_bid_raw(trade.price);
1971 }
1972 }
1973 AggressorSide::NoAggressor => {
1974 self.core.set_bid_raw(trade.price);
1976 self.core.set_ask_raw(trade.price);
1977 }
1978 }
1979
1980 self.last_trade_size = Some(trade.size);
1981 self.trade_consumption = 0;
1982
1983 if self.config.liquidity_consumption && self.book_type != BookType::L1_MBP {
1984 self.seed_trade_consumption(price_raw, trade.size.raw, trade.ts_event, aggressor_side);
1985 }
1986
1987 self.resolve_pending_on_trade(price_raw);
1988 self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
1989
1990 self.iterate(trade.ts_init, aggressor_side);
1991
1992 self.last_trade_size = None;
1993 self.trade_consumption = 0;
1994
1995 if self.book_type == BookType::L1_MBP {
2001 match aggressor_side {
2002 AggressorSide::Seller => {
2003 if let Some(ask) = self.last_quote_ask {
2004 self.core.ask = Some(ask);
2005 }
2006 }
2007 AggressorSide::Buyer => {
2008 if let Some(bid) = self.last_quote_bid {
2009 self.core.bid = Some(bid);
2010 }
2011 }
2012 AggressorSide::NoAggressor => {}
2013 }
2014 } else {
2015 match aggressor_side {
2016 AggressorSide::Seller => {
2017 if let Some(ask) = original_ask
2018 && price_raw < ask.raw
2019 {
2020 self.core.ask = Some(ask);
2021 }
2022 }
2023 AggressorSide::Buyer => {
2024 if let Some(bid) = original_bid
2025 && price_raw > bid.raw
2026 {
2027 self.core.bid = Some(bid);
2028 }
2029 }
2030 AggressorSide::NoAggressor => {}
2031 }
2032 }
2033 }
2034
2035 fn update_quote_tick_or_skip(&mut self, quote: &QuoteTick, context: &str) -> bool {
2036 if let Err(e) = self.book.update_quote_tick(quote) {
2037 log::warn!(
2038 "Skipping {context} for {}: update_quote_tick failed: {e}",
2039 quote.instrument_id,
2040 );
2041 return false;
2042 }
2043 true
2044 }
2045
2046 fn update_trade_tick_or_skip(&mut self, trade: &TradeTick, context: &str) -> bool {
2047 if let Err(e) = self.book.update_trade_tick(trade) {
2048 log::warn!(
2049 "Skipping {context} for {}: update_trade_tick failed: {e}",
2050 trade.instrument_id,
2051 );
2052 return false;
2053 }
2054 true
2055 }
2056
2057 pub fn process_status(&mut self, action: MarketStatusAction) {
2059 log::debug!("Processing {action}");
2060
2061 if self.market_status == MarketStatus::Closed
2063 && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
2064 {
2065 self.market_status = MarketStatus::Open;
2066 }
2067 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
2069 self.market_status = MarketStatus::Paused;
2070 }
2071 if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
2073 self.market_status = MarketStatus::Suspended;
2074 }
2075 if self.market_status == MarketStatus::Open
2077 && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
2078 {
2079 self.market_status = MarketStatus::Closed;
2080 }
2081 }
2082
2083 pub fn process_instrument_close(&mut self, close: InstrumentClose) {
2088 if close.instrument_id != self.instrument.id() {
2089 log::warn!(
2090 "Received instrument close for unknown instrument_id: {}",
2091 close.instrument_id
2092 );
2093 return;
2094 }
2095
2096 if close.close_type == InstrumentCloseType::ContractExpired {
2097 self.instrument_close = Some(close);
2098 self.iterate(close.ts_init, AggressorSide::NoAggressor);
2099 }
2100 }
2101
2102 pub fn process_instrument_expiration(&mut self, timestamp_ns: UnixNanos) {
2104 self.check_instrument_expiration(timestamp_ns);
2105 }
2106
2107 #[must_use]
2109 pub const fn is_expiration_processed(&self) -> bool {
2110 self.expiration_processed
2111 }
2112
2113 fn requires_pending_resolution(&self) -> bool {
2114 matches!(self.instrument, InstrumentAny::BinaryOption(_))
2115 }
2116
2117 fn cancel_open_orders_for_expiration(&mut self) {
2118 let instrument_id = self.instrument.id();
2123 let expiration_order_ids: IndexSet<ClientOrderId> = {
2124 let cache = self.cache.borrow();
2125 let mut order_ids = IndexSet::new();
2126
2127 for order_info in self.get_open_orders() {
2128 order_ids.insert(order_info.client_order_id);
2129 }
2130
2131 for order in cache.orders(None, Some(&instrument_id), None, None, None) {
2132 if order.is_open() || order.is_inflight() {
2133 order_ids.insert(order.client_order_id());
2134 }
2135 }
2136
2137 order_ids
2138 };
2139
2140 for client_order_id in expiration_order_ids {
2141 let order = {
2142 let cache = self.cache.borrow();
2143 cache.order(&client_order_id).map(|order| order.clone())
2144 };
2145
2146 if let Some(order) = order {
2147 self.cancel_order(&order, None);
2148 }
2149 }
2150 }
2151
2152 fn enter_pending_resolution(&mut self) {
2153 if self.pending_resolution {
2154 return;
2155 }
2156
2157 self.pending_resolution = true;
2158 self.market_status = MarketStatus::Closed;
2159 self.cancel_open_orders_for_expiration();
2160 log::info!(
2161 "{} expired and is now pending resolution; open orders canceled and new orders blocked",
2162 self.instrument.id()
2163 );
2164 }
2165
2166 fn check_instrument_expiration(&mut self, timestamp_ns: UnixNanos) {
2167 if self.expiration_processed {
2168 return;
2169 }
2170
2171 let timestamp_triggered = self
2172 .instrument
2173 .expiration_ns()
2174 .is_some_and(|ns| timestamp_ns >= ns);
2175
2176 if !timestamp_triggered && self.instrument_close.is_none() {
2177 return;
2178 }
2179
2180 if self.instrument_close.is_none()
2181 && timestamp_triggered
2182 && self.requires_pending_resolution()
2183 {
2184 self.enter_pending_resolution();
2185 return;
2186 }
2187
2188 self.expiration_processed = true;
2189 self.pending_resolution = false;
2190 let close = self.instrument_close.take();
2191 log::info!("{} reached expiration", self.instrument.id());
2192 self.cancel_open_orders_for_expiration();
2193
2194 if matches!(
2195 self.instrument,
2196 InstrumentAny::OptionContract(_) | InstrumentAny::CryptoOption(_)
2197 ) {
2198 self.process_option_expiry(timestamp_ns);
2199 return;
2200 }
2201
2202 let instrument_id = self.instrument.id();
2203 let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
2204 let cache = self.cache.borrow();
2205 cache
2206 .positions_open(None, Some(&instrument_id), None, None, None)
2207 .into_iter()
2208 .map(|pos| {
2209 let closing_side = match pos.side {
2210 PositionSide::Long => OrderSide::Sell,
2211 PositionSide::Short => OrderSide::Buy,
2212 _ => OrderSide::NoOrderSide,
2213 };
2214 (
2215 pos.trader_id,
2216 pos.strategy_id,
2217 pos.id,
2218 closing_side,
2219 pos.quantity,
2220 )
2221 })
2222 .collect()
2223 };
2224
2225 let ts_now = self.clock.borrow().timestamp_ns();
2226 let close_price_fallback = close.as_ref().map(|c| c.close_price);
2227
2228 for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
2229 let client_order_id =
2230 ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
2231 let mut order = OrderAny::Market(MarketOrder::new(
2232 trader_id,
2233 strategy_id,
2234 instrument_id,
2235 client_order_id,
2236 closing_side,
2237 quantity,
2238 TimeInForce::Gtc,
2239 UUID4::new(),
2240 ts_now,
2241 true, false,
2243 None,
2244 None,
2245 None,
2246 None,
2247 None,
2248 None,
2249 None,
2250 Some(vec![Ustr::from(&format!(
2251 "EXPIRATION_{}_CLOSE",
2252 self.venue
2253 ))]),
2254 ));
2255 order.set_liquidity_side(LiquiditySide::Taker);
2256
2257 let add_result =
2258 self.cache
2259 .borrow_mut()
2260 .add_order(order.clone(), Some(position_id), None, false);
2261 if add_result.is_err() {
2262 log::debug!("Expiration order already in cache: {client_order_id}");
2263 } else {
2264 self.publish_order_initialized(&order);
2265 }
2266
2267 let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
2268 self.generate_order_accepted(&order, venue_order_id);
2269
2270 let fill_price = self.settlement_price.or(close_price_fallback);
2271 if let Some(fill_price) = fill_price {
2272 self.apply_fills(
2273 &order,
2274 &[(fill_price, quantity)],
2275 LiquiditySide::Taker,
2276 Some(position_id),
2277 None,
2278 None,
2279 );
2280 } else {
2281 self.fill_market_order(client_order_id);
2282 }
2283 }
2284 }
2285
2286 fn process_option_expiry(&mut self, ts_now: UnixNanos) {
2287 let instrument_id = self.instrument.id();
2288
2289 let positions: Vec<Position> = {
2290 let cache = self.cache.borrow();
2291 cache
2292 .positions_open(None, Some(&instrument_id), None, None, None)
2293 .into_iter()
2294 .map(|p| p.cloned())
2295 .collect()
2296 };
2297
2298 if positions.is_empty() {
2299 return;
2300 }
2301
2302 let underlying = match self.instrument.underlying() {
2303 Some(u) => u,
2304 None => {
2305 log::error!("No underlying for option {instrument_id}");
2306 return;
2307 }
2308 };
2309 let underlying_id = InstrumentId::from(format!("{underlying}.{}", self.venue).as_str());
2310
2311 let (underlying_instrument, underlying_price) = {
2312 let cache = self.cache.borrow();
2313 (
2314 cache.instrument(&underlying_id).cloned(),
2315 cache.price(&underlying_id, PriceType::Last),
2316 )
2317 };
2318
2319 let underlying_instrument = match underlying_instrument {
2320 Some(u) => u,
2321 None => {
2322 log::error!("No underlying instrument for option {instrument_id}");
2323 return;
2324 }
2325 };
2326
2327 let underlying_price = match underlying_price {
2328 Some(p) => p,
2329 None => {
2330 log::error!("No underlying price for option {instrument_id}");
2331 return;
2332 }
2333 };
2334
2335 let custom_option_price = self.settlement_price;
2336 let should_exercise = self.option_should_exercise(underlying_price);
2337
2338 for position in positions {
2339 self.account_ids
2340 .insert(position.trader_id, position.account_id);
2341
2342 if should_exercise {
2343 self.option_exercise_position(
2344 &position,
2345 &underlying_instrument,
2346 underlying_price,
2347 ts_now,
2348 custom_option_price,
2349 );
2350 } else {
2351 self.option_otm_expiry(&position, ts_now, custom_option_price);
2352 }
2353 }
2354 }
2355
2356 fn option_should_exercise(&self, underlying_price: Price) -> bool {
2357 let strike = match self.instrument.strike_price() {
2358 Some(p) => p.as_f64(),
2359 None => return false,
2360 };
2361 let spot = underlying_price.as_f64();
2362 match self.instrument.option_kind() {
2363 Some(OptionKind::Call) => spot > strike,
2364 Some(OptionKind::Put) => strike > spot,
2365 None => false,
2366 }
2367 }
2368
2369 fn option_settlement_price(&self, underlying_price: Price, cash_settled: bool) -> Price {
2370 let strike = self
2371 .instrument
2372 .strike_price()
2373 .expect("option must have strike");
2374 if !cash_settled {
2375 return strike;
2376 }
2377
2378 let spot = underlying_price.as_f64();
2379 let strike_f = strike.as_f64();
2380 let value = match self.instrument.option_kind() {
2381 Some(OptionKind::Call) => (spot - strike_f).max(0.0),
2382 _ => (strike_f - spot).max(0.0),
2383 };
2384 Price::new(value, strike.precision)
2385 }
2386
2387 fn option_exercise_position(
2388 &self,
2389 position: &Position,
2390 underlying_instrument: &InstrumentAny,
2391 underlying_price: Price,
2392 ts_now: UnixNanos,
2393 custom_option_price: Option<Price>,
2394 ) {
2395 if matches!(underlying_instrument, InstrumentAny::IndexInstrument(_)) {
2396 self.option_cash_settlement(position, underlying_price, ts_now, custom_option_price);
2397 } else {
2398 self.option_physical_settlement(
2399 position,
2400 underlying_instrument,
2401 underlying_price,
2402 ts_now,
2403 custom_option_price,
2404 );
2405 }
2406 }
2407
2408 fn option_cash_settlement(
2409 &self,
2410 position: &Position,
2411 underlying_price: Price,
2412 ts_now: UnixNanos,
2413 custom_option_price: Option<Price>,
2414 ) {
2415 let venue = self.venue;
2416 let trade_id = format!("{venue}-LEG-CASH-{}", &UUID4::new().to_string()[..8]);
2417 let close_px = custom_option_price
2418 .unwrap_or_else(|| self.option_settlement_price(underlying_price, true));
2419 let close_side = OrderCore::closing_side(position.side);
2420 self.option_register_settlement_order(
2421 position,
2422 self.instrument.id(),
2423 close_side,
2424 position.quantity,
2425 ClientOrderId::from(trade_id.as_str()),
2426 VenueOrderId::from(trade_id.as_str()),
2427 Some(position.id),
2428 true,
2429 &format!("EXPIRATION_{venue}_CASH"),
2430 );
2431 let fill = self.option_create_close_fill(position, close_px, &trade_id, ts_now);
2432 self.dispatch_order_event(OrderEventAny::Filled(fill));
2433 }
2434
2435 fn option_physical_settlement(
2436 &self,
2437 position: &Position,
2438 underlying_instrument: &InstrumentAny,
2439 underlying_price: Price,
2440 ts_now: UnixNanos,
2441 custom_option_price: Option<Price>,
2442 ) {
2443 let multiplier = self.instrument.multiplier();
2444 let underlying_qty = Quantity::new(
2445 position.quantity.as_f64() * multiplier.as_f64(),
2446 underlying_instrument.size_precision(),
2447 );
2448
2449 let underlying_side = if self.instrument.option_kind() == Some(OptionKind::Call) {
2450 position.side
2451 } else {
2452 match position.side {
2453 PositionSide::Long => PositionSide::Short,
2454 PositionSide::Short => PositionSide::Long,
2455 other => other,
2456 }
2457 };
2458
2459 let venue = self.venue;
2460 let trade_base = format!("{venue}-LEG-EX-{}", &UUID4::new().to_string()[..8]);
2461 let close_trade_id = format!("{trade_base}-CLOSE");
2462 let open_trade_id = format!("{trade_base}-OPEN");
2463 let settlement_px = self.option_settlement_price(underlying_price, false);
2464 let option_close_px = custom_option_price
2465 .unwrap_or_else(|| Price::new(0.0, self.instrument.price_precision()));
2466 let close_side = OrderCore::closing_side(position.side);
2467 let underlying_order_side = match underlying_side {
2468 PositionSide::Long => OrderSide::Buy,
2469 _ => OrderSide::Sell,
2470 };
2471
2472 self.option_register_settlement_order(
2473 position,
2474 self.instrument.id(),
2475 close_side,
2476 position.quantity,
2477 ClientOrderId::from(close_trade_id.as_str()),
2478 VenueOrderId::from(close_trade_id.as_str()),
2479 Some(position.id),
2480 true,
2481 &format!("EXPIRATION_{venue}_PHYSICAL_CLOSE"),
2482 );
2483 self.option_register_settlement_order(
2484 position,
2485 underlying_instrument.id(),
2486 underlying_order_side,
2487 underlying_qty,
2488 ClientOrderId::from(open_trade_id.as_str()),
2489 VenueOrderId::from(open_trade_id.as_str()),
2490 None,
2491 false,
2492 &format!("EXPIRATION_{venue}_PHYSICAL_OPEN"),
2493 );
2494
2495 let option_fill =
2496 self.option_create_close_fill(position, option_close_px, &close_trade_id, ts_now);
2497 let underlying_fill = self.option_create_underlying_fill(
2498 position,
2499 underlying_instrument,
2500 underlying_qty,
2501 underlying_side,
2502 settlement_px,
2503 &open_trade_id,
2504 ts_now,
2505 );
2506 self.dispatch_order_event(OrderEventAny::Filled(option_fill));
2507 self.dispatch_order_event(OrderEventAny::Filled(underlying_fill));
2508 }
2509
2510 fn option_otm_expiry(
2511 &self,
2512 position: &Position,
2513 ts_now: UnixNanos,
2514 custom_option_price: Option<Price>,
2515 ) {
2516 let venue = self.venue;
2517 let trade_id = format!("{venue}-LEG-OTM-{}", &UUID4::new().to_string()[..8]);
2518 let close_px = custom_option_price
2519 .unwrap_or_else(|| Price::new(0.0, self.instrument.price_precision()));
2520 let close_side = OrderCore::closing_side(position.side);
2521 self.option_register_settlement_order(
2522 position,
2523 self.instrument.id(),
2524 close_side,
2525 position.quantity,
2526 ClientOrderId::from(trade_id.as_str()),
2527 VenueOrderId::from(trade_id.as_str()),
2528 Some(position.id),
2529 true,
2530 &format!("EXPIRATION_{venue}_OTM"),
2531 );
2532 let fill = self.option_create_close_fill(position, close_px, &trade_id, ts_now);
2533 self.dispatch_order_event(OrderEventAny::Filled(fill));
2534 }
2535
2536 #[expect(clippy::too_many_arguments)]
2537 fn option_register_settlement_order(
2538 &self,
2539 position: &Position,
2540 instrument_id: InstrumentId,
2541 order_side: OrderSide,
2542 quantity: Quantity,
2543 client_order_id: ClientOrderId,
2544 venue_order_id: VenueOrderId,
2545 position_id: Option<PositionId>,
2546 reduce_only: bool,
2547 tag: &str,
2548 ) {
2549 let ts_now = self.clock.borrow().timestamp_ns();
2550 let order = OrderAny::Market(MarketOrder::new(
2551 position.trader_id,
2552 position.strategy_id,
2553 instrument_id,
2554 client_order_id,
2555 order_side,
2556 quantity,
2557 TimeInForce::Gtc,
2558 UUID4::new(),
2559 ts_now,
2560 reduce_only,
2561 false,
2562 None,
2563 None,
2564 None,
2565 None,
2566 None,
2567 None,
2568 None,
2569 Some(vec![Ustr::from(tag)]),
2570 ));
2571
2572 {
2573 let mut cache = self.cache.borrow_mut();
2574 if let Err(e) = cache.add_order(order.clone(), position_id, None, false) {
2575 log::debug!("Settlement order already in cache: {e}");
2576 } else {
2577 drop(cache);
2578 self.publish_order_initialized(&order);
2579 self.cache
2580 .borrow_mut()
2581 .add_venue_order_id(&client_order_id, &venue_order_id, false)
2582 .ok();
2583 }
2584 }
2585
2586 self.generate_order_accepted(&order, venue_order_id);
2587 }
2588
2589 fn option_create_close_fill(
2590 &self,
2591 position: &Position,
2592 price: Price,
2593 trade_id_str: &str,
2594 ts_now: UnixNanos,
2595 ) -> OrderFilled {
2596 let close_side = OrderCore::closing_side(position.side);
2597 OrderFilled::new(
2598 position.trader_id,
2599 position.strategy_id,
2600 self.instrument.id(),
2601 ClientOrderId::from(trade_id_str),
2602 VenueOrderId::from(trade_id_str),
2603 position.account_id,
2604 TradeId::from(trade_id_str),
2605 close_side,
2606 OrderType::Market,
2607 position.quantity,
2608 price,
2609 self.instrument.quote_currency(),
2610 LiquiditySide::Taker,
2611 UUID4::new(),
2612 ts_now,
2613 ts_now,
2614 false,
2615 Some(position.id),
2616 Some(Money::new(0.0, self.instrument.quote_currency())),
2617 )
2618 }
2619
2620 #[expect(clippy::too_many_arguments)]
2621 fn option_create_underlying_fill(
2622 &self,
2623 position: &Position,
2624 underlying_instrument: &InstrumentAny,
2625 quantity: Quantity,
2626 side: PositionSide,
2627 price: Price,
2628 trade_id_str: &str,
2629 ts_now: UnixNanos,
2630 ) -> OrderFilled {
2631 let order_side = match side {
2632 PositionSide::Long => OrderSide::Buy,
2633 _ => OrderSide::Sell,
2634 };
2635 OrderFilled::new(
2636 position.trader_id,
2637 position.strategy_id,
2638 underlying_instrument.id(),
2639 ClientOrderId::from(trade_id_str),
2640 VenueOrderId::from(trade_id_str),
2641 position.account_id,
2642 TradeId::from(trade_id_str),
2643 order_side,
2644 OrderType::Market,
2645 quantity,
2646 price,
2647 underlying_instrument.quote_currency(),
2648 LiquiditySide::Taker,
2649 UUID4::new(),
2650 ts_now,
2651 ts_now,
2652 false,
2653 None,
2654 Some(Money::new(0.0, underlying_instrument.quote_currency())),
2655 )
2656 }
2657
2658 pub fn liquidate_open_positions(
2673 &mut self,
2674 ts_now: UnixNanos,
2675 cancel_open_orders: bool,
2676 settlement_currency: Currency,
2677 ) {
2678 if self.instrument.settlement_currency() != settlement_currency {
2680 return;
2681 }
2682
2683 if cancel_open_orders {
2684 let open_orders: Vec<RestingOrder> = self.get_open_orders();
2685 for order_info in &open_orders {
2686 let order = {
2687 let cache = self.cache.borrow();
2688 cache.order_owned(&order_info.client_order_id)
2689 };
2690
2691 if let Some(order) = order {
2692 self.cancel_order(&order, None);
2693 }
2694 }
2695 }
2696
2697 let instrument_id = self.instrument.id();
2698 let positions: Vec<(
2699 TraderId,
2700 StrategyId,
2701 AccountId,
2702 PositionId,
2703 OrderSide,
2704 Quantity,
2705 )> = {
2706 let cache = self.cache.borrow();
2707 cache
2708 .positions_open(None, Some(&instrument_id), None, None, None)
2709 .into_iter()
2710 .map(|pos| {
2711 (
2712 pos.trader_id,
2713 pos.strategy_id,
2714 pos.account_id,
2715 pos.id,
2716 OrderCore::closing_side(pos.side),
2717 pos.quantity,
2718 )
2719 })
2720 .collect()
2721 };
2722
2723 for (trader_id, strategy_id, account_id, position_id, closing_side, quantity) in positions {
2724 let has_price = if closing_side == OrderSide::Sell {
2726 self.best_bid_price().is_some() || self.settlement_price.is_some()
2727 } else {
2728 self.best_ask_price().is_some() || self.settlement_price.is_some()
2729 };
2730
2731 if !has_price {
2732 log::warn!(
2733 "LIQUIDATION: no price available for {instrument_id} position {position_id}, skipping"
2734 );
2735 continue;
2736 }
2737
2738 let client_order_id = ClientOrderId::from(
2739 format!("LIQUIDATION-{}-{}", self.venue, UUID4::new()).as_str(),
2740 );
2741 let order = OrderAny::Market(MarketOrder::new(
2742 trader_id,
2743 strategy_id,
2744 instrument_id,
2745 client_order_id,
2746 closing_side,
2747 quantity,
2748 TimeInForce::Ioc,
2749 UUID4::new(),
2750 ts_now,
2751 true, false,
2753 None,
2754 None,
2755 None,
2756 None,
2757 None,
2758 None,
2759 None,
2760 Some(vec![Ustr::from(&format!(
2761 "LIQUIDATION_{}_CLOSE",
2762 self.venue
2763 ))]),
2764 ));
2765
2766 let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
2767 {
2768 let mut cache = self.cache.borrow_mut();
2769 if let Err(e) = cache.add_order(order.clone(), Some(position_id), None, false) {
2770 log::debug!("Liquidation order already in cache: {e}");
2771 } else {
2772 drop(cache);
2773 self.publish_order_initialized(&order);
2774 self.cache
2775 .borrow_mut()
2776 .add_venue_order_id(&client_order_id, &venue_order_id, false)
2777 .ok();
2778 }
2779 }
2780
2781 self.account_ids.insert(trader_id, account_id);
2784 self.generate_order_submitted(&order, account_id);
2785 self.generate_order_accepted(&order, venue_order_id);
2786 self.fill_market_order(client_order_id);
2787 }
2788 }
2789
2790 pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
2799 if self.core.order_exists(order.client_order_id()) {
2801 return;
2802 }
2803
2804 let ts_now = self.clock.borrow().timestamp_ns();
2807 self.check_instrument_expiration(ts_now);
2808
2809 let reject_reason: Option<Ustr> = 'validate: {
2814 let cache_borrow = self.cache.as_ref().borrow();
2815
2816 self.account_ids.insert(order.trader_id(), account_id);
2818
2819 if self.pending_resolution {
2820 break 'validate Some(
2821 format!(
2822 "Contract {} has expired and is pending resolution",
2823 self.instrument.id()
2824 )
2825 .into(),
2826 );
2827 }
2828
2829 if self.instrument.has_expiration() {
2831 if let Some(activation_ns) = self.instrument.activation_ns()
2832 && self.clock.borrow().timestamp_ns() < activation_ns
2833 {
2834 break 'validate Some(
2835 format!(
2836 "Contract {} is not yet active, activation {activation_ns}",
2837 self.instrument.id(),
2838 )
2839 .into(),
2840 );
2841 }
2842
2843 if let Some(expiration_ns) = self.instrument.expiration_ns()
2844 && self.clock.borrow().timestamp_ns() >= expiration_ns
2845 {
2846 break 'validate Some(
2847 format!(
2848 "Contract {} has expired, expiration {expiration_ns}",
2849 self.instrument.id(),
2850 )
2851 .into(),
2852 );
2853 }
2854 }
2855
2856 if self.config.support_contingent_orders {
2858 if let Some(parent_order_id) = order.parent_order_id() {
2859 let parent_order = match cache_borrow.order(&parent_order_id) {
2860 Some(o) if o.contingency_type().unwrap() == ContingencyType::Oto => o,
2861 _ => panic!("OTO parent not found"),
2862 };
2863
2864 if parent_order.status() == OrderStatus::Rejected && order.is_open() {
2865 break 'validate Some(
2866 format!("Rejected OTO order from {parent_order_id}").into(),
2867 );
2868 } else if parent_order.status() == OrderStatus::Accepted
2869 || parent_order.status() == OrderStatus::Triggered
2870 || (self.config.oto_full_trigger
2871 && parent_order.status() == OrderStatus::PartiallyFilled)
2872 {
2873 log::info!(
2874 "Pending OTO order {} triggers from {parent_order_id}",
2875 order.client_order_id(),
2876 );
2877 return;
2878 }
2879 }
2880
2881 if let Some(linked_order_ids) = order.linked_order_ids() {
2882 for client_order_id in linked_order_ids {
2883 match cache_borrow.order(client_order_id) {
2884 Some(contingent_order)
2885 if (order.contingency_type().unwrap() == ContingencyType::Oco
2886 || order.contingency_type().unwrap()
2887 == ContingencyType::Ouo)
2888 && !order.is_closed()
2889 && contingent_order.is_closed() =>
2890 {
2891 break 'validate Some(
2892 format!("Contingent order {client_order_id} already closed")
2893 .into(),
2894 );
2895 }
2896 None => panic!("Cannot find contingent order for {client_order_id}"),
2897 _ => {}
2898 }
2899 }
2900 }
2901 }
2902
2903 if order.quantity().precision != self.instrument.size_precision() {
2905 break 'validate Some(
2906 format!(
2907 "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
2908 order.client_order_id(),
2909 order.quantity().precision,
2910 self.instrument.id(),
2911 self.instrument.size_precision()
2912 )
2913 .into(),
2914 );
2915 }
2916
2917 if let Some(price) = order.price()
2919 && price.precision != self.instrument.price_precision()
2920 {
2921 break 'validate Some(
2922 format!(
2923 "Invalid order price precision for order {}, was {} when {} price precision is {}",
2924 order.client_order_id(),
2925 price.precision,
2926 self.instrument.id(),
2927 self.instrument.price_precision()
2928 )
2929 .into(),
2930 );
2931 }
2932
2933 if let Some(trigger_price) = order.trigger_price()
2935 && trigger_price.precision != self.instrument.price_precision()
2936 {
2937 break 'validate Some(
2938 format!(
2939 "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
2940 order.client_order_id(),
2941 trigger_price.precision,
2942 self.instrument.id(),
2943 self.instrument.price_precision()
2944 )
2945 .into(),
2946 );
2947 }
2948
2949 let position = cache_borrow
2951 .position_for_order(&order.client_order_id())
2952 .or_else(|| {
2953 if self.oms_type == OmsType::Netting {
2954 let position_id = PositionId::new(
2955 format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
2956 );
2957 cache_borrow.position(&position_id)
2958 } else {
2959 None
2960 }
2961 });
2962
2963 if order.order_side() == OrderSide::Sell
2965 && self.account_type != AccountType::Margin
2966 && matches!(self.instrument, InstrumentAny::Equity(_))
2967 && position
2968 .as_ref()
2969 .is_none_or(|pos| !order.would_reduce_only(pos.side, pos.quantity))
2970 {
2971 let position_string = position
2972 .as_ref()
2973 .map_or("None".to_string(), |pos| pos.id.to_string());
2974 break 'validate Some(
2975 format!(
2976 "Short selling not permitted on a CASH account with position {position_string} and order {order}",
2977 )
2978 .into(),
2979 );
2980 }
2981
2982 if self.config.use_reduce_only
2984 && order.is_reduce_only()
2985 && !order.is_closed()
2986 && position.as_ref().is_none_or(|pos| {
2987 pos.is_closed()
2988 || (order.is_buy() && pos.is_long())
2989 || (order.is_sell() && pos.is_short())
2990 })
2991 {
2992 break 'validate Some(
2993 format!(
2994 "Reduce-only order {} ({}-{}) would have increased position",
2995 order.client_order_id(),
2996 order.order_type().to_string().to_uppercase(),
2997 order.order_side().to_string().to_uppercase()
2998 )
2999 .into(),
3000 );
3001 }
3002
3003 None
3004 };
3005
3006 if let Some(reason) = reject_reason {
3007 self.generate_order_rejected(order, reason);
3008 return;
3009 }
3010
3011 if order.is_quote_quantity()
3019 && !self.instrument.is_inverse()
3020 && !matches!(
3021 order.order_type(),
3022 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket,
3023 )
3024 && (order.price().is_some()
3025 || matches!(
3026 order.order_type(),
3027 OrderType::Market | OrderType::MarketToLimit,
3028 ))
3029 && !self.convert_quote_to_base_quantity(order)
3030 {
3031 return;
3032 }
3033
3034 match order.order_type() {
3035 OrderType::Market => self.process_market_order(order),
3036 OrderType::Limit => self.process_limit_order(order),
3037 OrderType::MarketToLimit => self.process_market_to_limit_order(order),
3038 OrderType::StopMarket => self.process_stop_market_order(order),
3039 OrderType::StopLimit => self.process_stop_limit_order(order),
3040 OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
3041 OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
3042 OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
3043 OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
3044 }
3045 }
3046
3047 fn convert_quote_to_base_quantity(&self, order: &mut OrderAny) -> bool {
3048 let reference_price = if let Some(price) = order.price() {
3052 Some(price)
3053 } else {
3054 match order.order_side() {
3055 OrderSide::Buy => self.core.ask,
3056 OrderSide::Sell => self.core.bid,
3057 OrderSide::NoOrderSide => None,
3058 }
3059 };
3060
3061 let Some(reference_price) = reference_price else {
3062 self.generate_order_rejected(
3063 order,
3064 format!(
3065 "No market for {} to convert quote quantity to base",
3066 order.instrument_id(),
3067 )
3068 .into(),
3069 );
3070 return false;
3071 };
3072
3073 let base_quantity = self
3074 .instrument
3075 .calculate_base_quantity(order.quantity(), reference_price);
3076
3077 let ts_now = self.clock.borrow().timestamp_ns();
3078 let event = OrderEventAny::Updated(OrderUpdated::new(
3079 order.trader_id(),
3080 order.strategy_id(),
3081 order.instrument_id(),
3082 order.client_order_id(),
3083 base_quantity,
3084 UUID4::new(),
3085 ts_now,
3086 ts_now,
3087 false,
3088 order.venue_order_id(),
3089 order.account_id(),
3090 None,
3091 None,
3092 None,
3093 false,
3094 ));
3095
3096 if let Err(e) = order.apply(event.clone()) {
3100 log::error!(
3101 "Failed to apply quote-to-base update for {}: {e}",
3102 order.client_order_id(),
3103 );
3104 return false;
3105 }
3106 self.dispatch_order_event(event);
3107 true
3108 }
3109
3110 pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
3112 if !self.core.order_exists(command.client_order_id) {
3113 self.generate_order_modify_rejected(
3114 command.trader_id,
3115 command.strategy_id,
3116 command.instrument_id,
3117 command.client_order_id,
3118 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
3119 command.venue_order_id,
3120 Some(account_id),
3121 );
3122 return;
3123 }
3124
3125 let mut order = match self
3126 .cache
3127 .borrow()
3128 .order(&command.client_order_id)
3129 .map(|o| o.clone())
3130 {
3131 Some(order) => order,
3132 None => {
3133 log::error!(
3134 "Cannot modify order: order {} not found in cache",
3135 command.client_order_id
3136 );
3137 return;
3138 }
3139 };
3140
3141 let update_success = self.update_order(
3142 &mut order,
3143 command.quantity,
3144 command.price,
3145 command.trigger_price,
3146 None,
3147 );
3148
3149 if !update_success {
3150 return;
3151 }
3152
3153 let Some(refreshed) = self.resync_core_entry(command.client_order_id) else {
3155 return;
3156 };
3157
3158 let price_changed = refreshed.price() != order.price()
3160 || refreshed.trigger_price() != order.trigger_price();
3161
3162 if price_changed
3163 && refreshed.is_open()
3164 && self.config.queue_position
3165 && let Some(new_price) = refreshed.price()
3166 {
3167 self.snapshot_queue_position(&refreshed, new_price);
3168 self.queue_excess.swap_remove(&refreshed.client_order_id());
3169 }
3170 }
3171
3172 pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
3174 if !self.core.order_exists(command.client_order_id) {
3175 self.generate_order_cancel_rejected(
3176 command.trader_id,
3177 command.strategy_id,
3178 account_id,
3179 command.instrument_id,
3180 command.client_order_id,
3181 command.venue_order_id,
3182 Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
3183 );
3184 return;
3185 }
3186
3187 let order = match self
3188 .cache
3189 .borrow()
3190 .order(&command.client_order_id)
3191 .map(|o| o.clone())
3192 {
3193 Some(order) => order,
3194 None => {
3195 log::error!(
3196 "Cannot cancel order: order {} not found in cache",
3197 command.client_order_id
3198 );
3199 return;
3200 }
3201 };
3202
3203 if !order.is_inflight() && !order.is_open() {
3204 self.purge_stale_core_entry(command.client_order_id);
3205 return;
3206 }
3207
3208 self.cancel_order(&order, None);
3209 }
3210
3211 pub fn process_cancel_all(&mut self, command: &CancelAllOrders, _account_id: AccountId) {
3213 let instrument_id = command.instrument_id;
3214 let order_side = if command.order_side == OrderSide::NoOrderSide {
3215 None
3216 } else {
3217 Some(command.order_side)
3218 };
3219
3220 let client_order_ids: Vec<ClientOrderId> = self
3221 .cache
3222 .borrow()
3223 .orders_open(None, Some(&instrument_id), None, None, order_side)
3224 .iter()
3225 .map(|o| o.client_order_id())
3226 .collect();
3227
3228 for client_order_id in client_order_ids {
3229 let order = match self
3230 .cache
3231 .borrow()
3232 .order(&client_order_id)
3233 .map(|o| o.clone())
3234 {
3235 Some(order) => order,
3236 None => continue,
3237 };
3238
3239 if !order.is_inflight() && !order.is_open() {
3240 self.purge_stale_core_entry(client_order_id);
3241 continue;
3242 }
3243
3244 self.cancel_order(&order, None);
3245 }
3246 }
3247
3248 fn purge_stale_core_entry(&mut self, client_order_id: ClientOrderId) {
3251 if self.core.order_exists(client_order_id) {
3252 let _ = self.core.delete_order(client_order_id);
3253 }
3254 self.cached_filled_qty.swap_remove(&client_order_id);
3255 }
3256
3257 fn resync_core_entry(&mut self, client_order_id: ClientOrderId) -> Option<OrderAny> {
3258 let order = self
3259 .cache
3260 .borrow()
3261 .order(&client_order_id)
3262 .map(|o| o.clone())?;
3263
3264 if order.is_closed() {
3267 let _ = self.core.delete_order(client_order_id);
3268 return Some(order);
3269 }
3270
3271 let new_match_info = Self::matching_core_entry(&order);
3272
3273 let unchanged = self
3275 .core
3276 .get_order(client_order_id)
3277 .is_some_and(|existing| *existing == new_match_info);
3278
3279 if unchanged {
3280 return Some(order);
3281 }
3282
3283 let _ = self.core.delete_order(client_order_id);
3284 self.core.add_order(new_match_info);
3285 Some(order)
3286 }
3287
3288 pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
3290 for order in &command.cancels {
3291 self.process_cancel(order, account_id);
3292 }
3293 }
3294
3295 fn process_market_order(&mut self, order: &OrderAny) {
3296 if order.time_in_force() == TimeInForce::AtTheOpen
3297 || order.time_in_force() == TimeInForce::AtTheClose
3298 {
3299 self.generate_order_rejected(
3300 order,
3301 format!(
3302 "time in force {} is not currently supported",
3303 order.time_in_force()
3304 )
3305 .into(),
3306 );
3307 return;
3308 }
3309
3310 if (order.order_side() == OrderSide::Buy && self.core.ask.is_none())
3312 || (order.order_side() == OrderSide::Sell && self.core.bid.is_none())
3313 {
3314 self.generate_order_rejected(
3315 order,
3316 format!("No market for {}", order.instrument_id()).into(),
3317 );
3318 return;
3319 }
3320
3321 if self.config.use_market_order_acks {
3322 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3323 self.generate_order_accepted(order, venue_order_id);
3324 }
3325
3326 if let Err(e) = self
3328 .cache
3329 .borrow_mut()
3330 .add_order(order.clone(), None, None, false)
3331 {
3332 log::debug!("Order already in cache: {e}");
3333 }
3334
3335 self.fill_market_order(order.client_order_id());
3336 }
3337
3338 fn process_limit_order(&mut self, order: &mut OrderAny) {
3339 if order.time_in_force() == TimeInForce::AtTheOpen
3340 || order.time_in_force() == TimeInForce::AtTheClose
3341 {
3342 self.generate_order_rejected(
3343 order,
3344 format!(
3345 "time in force {} is not currently supported",
3346 order.time_in_force()
3347 )
3348 .into(),
3349 );
3350 return;
3351 }
3352
3353 let limit_px = order.price().expect("Limit order must have a price");
3354 if order.is_post_only()
3355 && self
3356 .core
3357 .is_limit_matched(order.order_side_specified(), limit_px)
3358 {
3359 self.generate_order_rejected(
3360 order,
3361 format!(
3362 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
3363 order.order_type(),
3364 order.order_side(),
3365 order.price().unwrap(),
3366 self.core
3367 .bid
3368 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3369 self.core
3370 .ask
3371 .map_or_else(|| "None".to_string(), |p| p.to_string())
3372 )
3373 .into(),
3374 );
3375 return;
3376 }
3377
3378 self.accept_order(order);
3380
3381 if self
3383 .core
3384 .is_limit_matched(order.order_side_specified(), limit_px)
3385 {
3386 order.set_liquidity_side(LiquiditySide::Taker);
3388
3389 if self
3390 .cache
3391 .borrow_mut()
3392 .add_order(order.clone(), None, None, false)
3393 .is_err()
3394 && let Err(e) = self.cache.borrow_mut().replace_order(order)
3395 {
3396 log::debug!("Failed to update order in cache: {e}");
3397 }
3398 self.fill_limit_order(order.client_order_id());
3399
3400 if self.core.order_exists(order.client_order_id())
3403 && let Some(mut order) = self.cache.borrow_mut().order_mut(&order.client_order_id())
3404 {
3405 order.set_liquidity_side(LiquiditySide::Maker);
3406 }
3407 } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
3408 self.cancel_order(order, None);
3409 } else {
3410 order.set_liquidity_side(LiquiditySide::Maker);
3412
3413 if let Some(price) = order.price() {
3414 self.snapshot_queue_position(order, price);
3415 }
3416
3417 let add_result = self
3418 .cache
3419 .borrow_mut()
3420 .add_order(order.clone(), None, None, false);
3421
3422 if let Err(e) = add_result {
3423 log::debug!("Failed to add order to cache: {e}");
3424
3425 if let Some(mut order) = self.cache.borrow_mut().order_mut(&order.client_order_id())
3428 && !matches!(
3429 order.liquidity_side(),
3430 Some(LiquiditySide::Maker | LiquiditySide::Taker)
3431 )
3432 {
3433 order.set_liquidity_side(LiquiditySide::Maker);
3434 }
3435 }
3436 }
3437 }
3438
3439 fn process_market_to_limit_order(&mut self, order: &OrderAny) {
3440 if (order.order_side() == OrderSide::Buy && self.core.ask.is_none())
3442 || (order.order_side() == OrderSide::Sell && self.core.bid.is_none())
3443 {
3444 self.generate_order_rejected(
3445 order,
3446 format!("No market for {}", order.instrument_id()).into(),
3447 );
3448 return;
3449 }
3450
3451 if self.config.use_market_order_acks {
3452 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3453 self.generate_order_accepted(order, venue_order_id);
3454 }
3455
3456 if let Err(e) = self
3458 .cache
3459 .borrow_mut()
3460 .add_order(order.clone(), None, None, false)
3461 {
3462 log::debug!("Order already in cache: {e}");
3463 }
3464 let client_order_id = order.client_order_id();
3465 self.fill_market_order(client_order_id);
3466
3467 let filled_qty = self
3469 .cached_filled_qty
3470 .get(&client_order_id)
3471 .copied()
3472 .unwrap_or_default();
3473 let leaves_qty = order.quantity().saturating_sub(filled_qty);
3474 if leaves_qty.is_zero() {
3475 self.purge_cached_filled_qty_if_closed(client_order_id);
3476 return;
3477 }
3478
3479 let updated_order = self
3480 .cache
3481 .borrow()
3482 .order(&client_order_id)
3483 .map(|o| o.clone());
3484 if let Some(mut updated_order) = updated_order {
3485 self.accept_order(&mut updated_order);
3486 }
3487 }
3488
3489 fn process_stop_market_order(&mut self, order: &mut OrderAny) {
3490 let stop_px = order
3491 .trigger_price()
3492 .expect("Stop order must have a trigger price");
3493
3494 if self
3495 .core
3496 .is_stop_matched(order.order_side_specified(), stop_px)
3497 {
3498 if self.config.reject_stop_orders {
3499 self.generate_order_rejected(
3500 order,
3501 format!(
3502 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3503 order.order_type(),
3504 order.order_side(),
3505 order.trigger_price().unwrap(),
3506 self.core
3507 .bid
3508 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3509 self.core
3510 .ask
3511 .map_or_else(|| "None".to_string(), |p| p.to_string())
3512 ).into(),
3513 );
3514 return;
3515 }
3516
3517 if let Err(e) = self
3518 .cache
3519 .borrow_mut()
3520 .add_order(order.clone(), None, None, false)
3521 {
3522 log::debug!("Order already in cache: {e}");
3523 }
3524 self.fill_market_order(order.client_order_id());
3525 return;
3526 }
3527
3528 self.accept_order(order);
3530
3531 order.set_liquidity_side(LiquiditySide::Maker);
3533
3534 if let Err(e) = self
3535 .cache
3536 .borrow_mut()
3537 .add_order(order.clone(), None, None, false)
3538 {
3539 log::debug!("Order already in cache: {e}");
3540 }
3541 }
3542
3543 fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
3544 let stop_px = order
3545 .trigger_price()
3546 .expect("Stop order must have a trigger price");
3547
3548 if self
3549 .core
3550 .is_stop_matched(order.order_side_specified(), stop_px)
3551 {
3552 if self.config.reject_stop_orders {
3553 self.generate_order_rejected(
3554 order,
3555 format!(
3556 "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3557 order.order_type(),
3558 order.order_side(),
3559 order.trigger_price().unwrap(),
3560 self.core
3561 .bid
3562 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3563 self.core
3564 .ask
3565 .map_or_else(|| "None".to_string(), |p| p.to_string())
3566 ).into(),
3567 );
3568 return;
3569 }
3570
3571 self.accept_order(order);
3572 self.generate_order_triggered(order);
3573
3574 let limit_px = order.price().expect("Stop limit order must have a price");
3576
3577 if self
3578 .core
3579 .is_limit_matched(order.order_side_specified(), limit_px)
3580 {
3581 order.set_liquidity_side(LiquiditySide::Taker);
3582
3583 if let Err(e) = self
3584 .cache
3585 .borrow_mut()
3586 .add_order(order.clone(), None, None, false)
3587 {
3588 log::debug!("Order already in cache: {e}");
3589 }
3590 self.fill_limit_order(order.client_order_id());
3591 }
3592
3593 return;
3595 }
3596
3597 self.accept_order(order);
3598
3599 order.set_liquidity_side(LiquiditySide::Maker);
3601
3602 if let Err(e) = self
3603 .cache
3604 .borrow_mut()
3605 .add_order(order.clone(), None, None, false)
3606 {
3607 log::debug!("Order already in cache: {e}");
3608 }
3609 }
3610
3611 fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
3612 if self
3613 .core
3614 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
3615 {
3616 if self.config.reject_stop_orders {
3617 self.generate_order_rejected(
3618 order,
3619 format!(
3620 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3621 order.order_type(),
3622 order.order_side(),
3623 order.trigger_price().unwrap(),
3624 self.core
3625 .bid
3626 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3627 self.core
3628 .ask
3629 .map_or_else(|| "None".to_string(), |p| p.to_string())
3630 ).into(),
3631 );
3632 return;
3633 }
3634
3635 if let Err(e) = self
3636 .cache
3637 .borrow_mut()
3638 .add_order(order.clone(), None, None, false)
3639 {
3640 log::debug!("Order already in cache: {e}");
3641 }
3642 self.fill_market_order(order.client_order_id());
3643 return;
3644 }
3645
3646 self.accept_order(order);
3648
3649 order.set_liquidity_side(LiquiditySide::Maker);
3651
3652 if let Err(e) = self
3653 .cache
3654 .borrow_mut()
3655 .add_order(order.clone(), None, None, false)
3656 {
3657 log::debug!("Order already in cache: {e}");
3658 }
3659 }
3660
3661 fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
3662 if self
3663 .core
3664 .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
3665 {
3666 if self.config.reject_stop_orders {
3667 self.generate_order_rejected(
3668 order,
3669 format!(
3670 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3671 order.order_type(),
3672 order.order_side(),
3673 order.trigger_price().unwrap(),
3674 self.core
3675 .bid
3676 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3677 self.core
3678 .ask
3679 .map_or_else(|| "None".to_string(), |p| p.to_string())
3680 ).into(),
3681 );
3682 return;
3683 }
3684 self.accept_order(order);
3685 self.generate_order_triggered(order);
3686
3687 if self
3689 .core
3690 .is_limit_matched(order.order_side_specified(), order.price().unwrap())
3691 {
3692 order.set_liquidity_side(LiquiditySide::Taker);
3693
3694 if let Err(e) = self
3695 .cache
3696 .borrow_mut()
3697 .add_order(order.clone(), None, None, false)
3698 {
3699 log::debug!("Order already in cache: {e}");
3700 }
3701 self.fill_limit_order(order.client_order_id());
3702 }
3703 return;
3704 }
3705
3706 self.accept_order(order);
3708
3709 order.set_liquidity_side(LiquiditySide::Maker);
3711
3712 if let Err(e) = self
3713 .cache
3714 .borrow_mut()
3715 .add_order(order.clone(), None, None, false)
3716 {
3717 log::debug!("Order already in cache: {e}");
3718 }
3719 }
3720
3721 fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
3722 if let Some(trigger_price) = order.trigger_price()
3723 && self
3724 .core
3725 .is_stop_matched(order.order_side_specified(), trigger_price)
3726 {
3727 self.generate_order_rejected(
3728 order,
3729 format!(
3730 "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3731 order.order_type(),
3732 order.order_side(),
3733 trigger_price,
3734 self.core
3735 .bid
3736 .map_or_else(|| "None".to_string(), |p| p.to_string()),
3737 self.core
3738 .ask
3739 .map_or_else(|| "None".to_string(), |p| p.to_string())
3740 ).into(),
3741 );
3742 return;
3743 }
3744
3745 order.set_liquidity_side(LiquiditySide::Maker);
3749
3750 self.accept_order(order);
3751
3752 if let Err(e) = self
3753 .cache
3754 .borrow_mut()
3755 .add_order(order.clone(), None, None, false)
3756 {
3757 log::debug!("Order already in cache: {e}");
3758 }
3759 }
3760
3761 pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
3768 self.purge_closed_cached_filled_qty();
3770
3771 if aggressor_side == AggressorSide::NoAggressor && self.last_trade_size.is_none() {
3777 if let Some(bid) = self.book.best_bid_price() {
3778 self.core.set_bid_raw(bid);
3779 }
3780
3781 if let Some(ask) = self.book.best_ask_price() {
3782 self.core.set_ask_raw(ask);
3783 }
3784 }
3785
3786 for action in self.core.iterate_bids() {
3789 match action {
3790 MatchAction::FillLimit(id) => self.fill_limit_order(id),
3791 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
3792 }
3793 }
3794
3795 for action in self.core.iterate_asks() {
3796 match action {
3797 MatchAction::FillLimit(id) => self.fill_limit_order(id),
3798 MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
3799 }
3800 }
3801
3802 let order_ids: Vec<ClientOrderId> =
3803 self.core.iter_orders().map(|m| m.client_order_id).collect();
3804
3805 for client_order_id in order_ids {
3806 let order = match self
3807 .cache
3808 .borrow()
3809 .order(&client_order_id)
3810 .map(|o| o.clone())
3811 {
3812 Some(order) => order,
3813 None => continue,
3814 };
3815
3816 if order.is_closed() {
3817 let _ = self.core.delete_order(client_order_id);
3818 self.cached_filled_qty.swap_remove(&client_order_id);
3819 continue;
3820 }
3821
3822 if self.config.support_gtd_orders
3823 && let Some(expire_ns) = order.expire_time()
3824 && timestamp_ns >= expire_ns
3825 {
3826 let _ = self.core.delete_order(client_order_id);
3827 self.cached_filled_qty.swap_remove(&client_order_id);
3828 self.expire_order(&order);
3829 continue;
3830 }
3831
3832 if matches!(
3833 order.order_type(),
3834 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
3835 ) {
3836 let mut any = order;
3837 if self.maybe_activate_trailing_stop(
3838 &mut any,
3839 self.core.bid,
3840 self.core.ask,
3841 self.core.last,
3842 ) {
3843 self.update_trailing_stop_order(&any);
3844 self.resync_core_entry(client_order_id);
3845 }
3846 }
3847
3848 if self.target_bid.is_some() || self.target_ask.is_some() || self.target_last.is_some()
3851 {
3852 if let Some(t) = self.target_bid.take() {
3853 self.core.bid = Some(t);
3854 }
3855
3856 if let Some(t) = self.target_ask.take() {
3857 self.core.ask = Some(t);
3858 }
3859
3860 if let Some(t) = self.target_last.take() {
3861 self.core.last = Some(t);
3862 }
3863 }
3864 }
3865
3866 if let Some(t) = self.target_bid.take() {
3870 self.core.bid = Some(t);
3871 }
3872
3873 if let Some(t) = self.target_ask.take() {
3874 self.core.ask = Some(t);
3875 }
3876
3877 if let Some(t) = self.target_last.take() {
3878 self.core.last = Some(t);
3879 }
3880
3881 self.core.bid = self.book.best_bid_price();
3884 self.core.ask = self.book.best_ask_price();
3885
3886 self.check_instrument_expiration(timestamp_ns);
3889 self.purge_closed_cached_filled_qty();
3890 }
3891
3892 fn get_trailing_activation_price(
3893 &self,
3894 trigger_type: TriggerType,
3895 order_side: OrderSide,
3896 bid: Option<Price>,
3897 ask: Option<Price>,
3898 last: Option<Price>,
3899 ) -> Option<Price> {
3900 match trigger_type {
3901 TriggerType::LastPrice => last,
3902 TriggerType::LastOrBidAsk => last.or(match order_side {
3903 OrderSide::Buy => ask,
3904 OrderSide::Sell => bid,
3905 _ => None,
3906 }),
3907 _ => match order_side {
3909 OrderSide::Buy => ask,
3910 OrderSide::Sell => bid,
3911 _ => None,
3912 },
3913 }
3914 }
3915
3916 fn maybe_activate_trailing_stop(
3917 &self,
3918 order: &mut OrderAny,
3919 bid: Option<Price>,
3920 ask: Option<Price>,
3921 last: Option<Price>,
3922 ) -> bool {
3923 match order {
3924 OrderAny::TrailingStopMarket(inner) => {
3925 if inner.is_activated {
3926 return true;
3927 }
3928
3929 if inner.activation_price.is_none() {
3930 let px = self.get_trailing_activation_price(
3931 inner.trigger_type,
3932 inner.order_side(),
3933 bid,
3934 ask,
3935 last,
3936 );
3937
3938 if let Some(p) = px {
3939 inner.activation_price = Some(p);
3940 inner.set_activated();
3941
3942 if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3943 log::error!("Failed to update order: {e}");
3944 }
3945 return true;
3946 }
3947 return false;
3948 }
3949
3950 let activation_price = inner.activation_price.unwrap();
3951 let hit = match inner.order_side() {
3952 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
3953 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
3954 _ => false,
3955 };
3956
3957 if hit {
3958 inner.set_activated();
3959
3960 if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3961 log::error!("Failed to update order: {e}");
3962 }
3963 }
3964 hit
3965 }
3966 OrderAny::TrailingStopLimit(inner) => {
3967 if inner.is_activated {
3968 return true;
3969 }
3970
3971 if inner.activation_price.is_none() {
3972 let px = self.get_trailing_activation_price(
3973 inner.trigger_type,
3974 inner.order_side(),
3975 bid,
3976 ask,
3977 last,
3978 );
3979
3980 if let Some(p) = px {
3981 inner.activation_price = Some(p);
3982 inner.set_activated();
3983
3984 if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3985 log::error!("Failed to update order: {e}");
3986 }
3987 return true;
3988 }
3989 return false;
3990 }
3991
3992 let activation_price = inner.activation_price.unwrap();
3993 let hit = match inner.order_side() {
3994 OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
3995 OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
3996 _ => false,
3997 };
3998
3999 if hit {
4000 inner.set_activated();
4001
4002 if let Err(e) = self.cache.borrow_mut().replace_order(order) {
4003 log::error!("Failed to update order: {e}");
4004 }
4005 }
4006 hit
4007 }
4008 _ => true,
4009 }
4010 }
4011
4012 fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
4013 match order.price() {
4014 Some(order_price) => {
4015 let mut fills = if self.config.liquidity_consumption {
4020 let size_prec = self.instrument.size_precision();
4021 self.book
4022 .get_all_crossed_levels(order.order_side(), order_price, size_prec)
4023 } else {
4024 let book_order =
4025 BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
4026 self.book.simulate_fills(&book_order)
4027 };
4028
4029 if let Some(trade_size) = self.last_trade_size
4031 && let Some(trade_price) = self.core.last
4032 {
4033 let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
4034
4035 if !fills_at_trade_price
4036 && self
4037 .core
4038 .is_limit_matched(order.order_side_specified(), order_price)
4039 {
4040 let leaves_qty = order.leaves_qty();
4043 let available_qty = if self.config.liquidity_consumption {
4044 let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
4045 Quantity::from_raw(remaining, trade_size.precision)
4046 } else {
4047 trade_size
4048 };
4049
4050 let fill_qty = min(leaves_qty, available_qty);
4051
4052 if !fill_qty.is_zero() {
4053 log::debug!(
4054 "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
4055 fill_qty,
4056 order_price,
4057 trade_price,
4058 available_qty,
4059 fills.len()
4060 );
4061
4062 if self.config.liquidity_consumption {
4063 self.trade_consumption += fill_qty.raw;
4064 }
4065
4066 return vec![(order_price, fill_qty)];
4071 }
4072 }
4073 }
4074
4075 if fills.is_empty() {
4077 return fills;
4078 }
4079
4080 let book_prices: Vec<Price> = if self.config.liquidity_consumption {
4084 fills.iter().map(|(px, _)| *px).collect()
4085 } else {
4086 Vec::new()
4087 };
4088 let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
4089 None
4090 } else {
4091 Some(&book_prices)
4092 };
4093
4094 if let Some(triggered_price) = order.trigger_price() {
4096 if order
4098 .liquidity_side()
4099 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
4100 {
4101 if order.order_side() == OrderSide::Sell && order_price > triggered_price {
4102 let first_fill = fills.first().unwrap();
4104 let triggered_qty = first_fill.1;
4105 fills[0] = (triggered_price, triggered_qty);
4106 self.target_bid = self.core.bid;
4107 self.target_ask = self.core.ask;
4108 self.target_last = self.core.last;
4109 self.core.set_ask_raw(order_price);
4110 self.core.set_last_raw(order_price);
4111 } else if order.order_side() == OrderSide::Buy
4112 && order_price < triggered_price
4113 {
4114 let first_fill = fills.first().unwrap();
4116 let triggered_qty = first_fill.1;
4117 fills[0] = (triggered_price, triggered_qty);
4118 self.target_bid = self.core.bid;
4119 self.target_ask = self.core.ask;
4120 self.target_last = self.core.last;
4121 self.core.set_bid_raw(order_price);
4122 self.core.set_last_raw(order_price);
4123 }
4124 }
4125 }
4126
4127 if order
4129 .liquidity_side()
4130 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
4131 {
4132 match order.order_side().as_specified() {
4133 OrderSideSpecified::Buy => {
4134 let target_price = if order
4135 .trigger_price()
4136 .is_some_and(|trigger_price| order_price > trigger_price)
4137 {
4138 order.trigger_price().unwrap()
4139 } else {
4140 order_price
4141 };
4142
4143 for fill in &mut fills {
4144 let last_px = fill.0;
4145 if last_px < order_price {
4146 self.target_bid = self.core.bid;
4148 self.target_ask = self.core.ask;
4149 self.target_last = self.core.last;
4150 self.core.set_ask_raw(target_price);
4151 self.core.set_last_raw(target_price);
4152 fill.0 = target_price;
4153 }
4154 }
4155 }
4156 OrderSideSpecified::Sell => {
4157 let target_price = if order
4158 .trigger_price()
4159 .is_some_and(|trigger_price| order_price < trigger_price)
4160 {
4161 order.trigger_price().unwrap()
4162 } else {
4163 order_price
4164 };
4165
4166 for fill in &mut fills {
4167 let last_px = fill.0;
4168 if last_px > order_price {
4169 self.target_bid = self.core.bid;
4171 self.target_ask = self.core.ask;
4172 self.target_last = self.core.last;
4173 self.core.set_bid_raw(target_price);
4174 self.core.set_last_raw(target_price);
4175 fill.0 = target_price;
4176 }
4177 }
4178 }
4179 }
4180 }
4181
4182 self.apply_liquidity_consumption(
4183 fills,
4184 order.order_side(),
4185 order.leaves_qty(),
4186 book_prices_ref,
4187 )
4188 }
4189 None => panic!("Limit order must have a price"),
4190 }
4191 }
4192
4193 fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
4194 let price = match order.order_side().as_specified() {
4195 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
4196 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
4197 };
4198
4199 let mut fills = if self.config.liquidity_consumption {
4202 let size_prec = self.instrument.size_precision();
4203 self.book
4204 .get_all_crossed_levels(order.order_side(), price, size_prec)
4205 } else {
4206 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
4207 self.book.simulate_fills(&book_order)
4208 };
4209
4210 if !self.fill_at_market
4213 && self.book_type == BookType::L1_MBP
4214 && !fills.is_empty()
4215 && matches!(
4216 order.order_type(),
4217 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
4218 )
4219 && let Some(trigger_price) = order.trigger_price()
4220 {
4221 fills[0] = (trigger_price, fills[0].1);
4222
4223 let mut remaining_qty = order.leaves_qty().raw;
4225 let mut capped_fills = Vec::with_capacity(fills.len());
4226
4227 for (price, qty) in fills {
4228 if remaining_qty == 0 {
4229 break;
4230 }
4231
4232 let capped_qty_raw = min(qty.raw, remaining_qty);
4233 if capped_qty_raw == 0 {
4234 continue;
4235 }
4236
4237 remaining_qty -= capped_qty_raw;
4238 capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
4239 }
4240
4241 return capped_fills;
4242 }
4243
4244 fills
4245 }
4246
4247 fn determine_market_fill_model_price_and_volume(
4248 &mut self,
4249 order: &OrderAny,
4250 ) -> (Vec<(Price, Quantity)>, bool) {
4251 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
4252 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
4253 &self.instrument,
4254 order,
4255 best_bid,
4256 best_ask,
4257 )
4258 {
4259 let price = match order.order_side().as_specified() {
4260 OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
4261 OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
4262 };
4263 let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
4264 let fills = book.simulate_fills(&book_order);
4265 if !fills.is_empty() {
4266 return (fills, true);
4267 }
4268 }
4269 (self.determine_market_price_and_volume(order), false)
4270 }
4271
4272 fn determine_limit_fill_model_price_and_volume(
4273 &mut self,
4274 order: &OrderAny,
4275 ) -> Vec<(Price, Quantity)> {
4276 if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
4277 && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
4278 &self.instrument,
4279 order,
4280 best_bid,
4281 best_ask,
4282 )
4283 && let Some(limit_price) = order.price()
4284 {
4285 let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
4286 let fills = book.simulate_fills(&book_order);
4287 if !fills.is_empty() {
4288 return fills;
4289 }
4290 }
4291 self.determine_limit_price_and_volume(order)
4292 }
4293
4294 pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
4299 let mut order = match self
4300 .cache
4301 .borrow()
4302 .order(&client_order_id)
4303 .map(|o| o.clone())
4304 {
4305 Some(order) => order,
4306 None => {
4307 log::error!("Cannot fill market order: order {client_order_id} not found in cache");
4308 return;
4309 }
4310 };
4311
4312 if order.is_closed() {
4313 self.purge_stale_core_entry(client_order_id);
4314 return;
4315 }
4316
4317 if order.is_quote_quantity()
4321 && !self.instrument.is_inverse()
4322 && !self.convert_quote_to_base_quantity(&mut order)
4323 {
4324 return;
4325 }
4326
4327 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
4328 && filled_qty >= &order.quantity()
4329 {
4330 log::debug!(
4331 "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
4332 filled_qty,
4333 order.quantity(),
4334 order.filled_qty(),
4335 order.quantity()
4336 );
4337 return;
4338 }
4339
4340 let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
4341 let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
4342 let cache = self.cache.as_ref().borrow();
4343 cache.position_owned(&venue_position_id)
4344 } else {
4345 None
4346 };
4347
4348 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
4349 log::warn!(
4350 "Canceling REDUCE_ONLY {} as would increase position",
4351 order.order_type()
4352 );
4353 self.cancel_order(&order, None);
4354 return;
4355 }
4356
4357 order.set_liquidity_side(LiquiditySide::Taker);
4358 let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
4359
4360 let protection_price: Option<Price> = if let Some(protection_points) =
4362 self.config.price_protection_points
4363 && matches!(
4364 order.order_type(),
4365 OrderType::Market | OrderType::StopMarket
4366 ) {
4367 protection_price_calculate(
4368 self.instrument.price_increment(),
4369 &order,
4370 protection_points,
4371 self.core.bid,
4372 self.core.ask,
4373 )
4374 .ok()
4375 } else {
4376 None
4377 };
4378
4379 if let Some(protection_price) = protection_price {
4380 fills = self.filter_fills_by_protection(fills, &order, protection_price);
4381 }
4382
4383 let is_trigger_price_fill = !self.fill_at_market
4386 && self.book_type == BookType::L1_MBP
4387 && matches!(
4388 order.order_type(),
4389 OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
4390 )
4391 && order.trigger_price().is_some();
4392
4393 if !from_synthetic && !is_trigger_price_fill {
4394 fills = self.apply_liquidity_consumption(
4395 fills,
4396 order.order_side(),
4397 order.leaves_qty(),
4398 None,
4399 );
4400 }
4401
4402 self.apply_fills(
4403 &order,
4404 &fills,
4405 LiquiditySide::Taker,
4406 None,
4407 position.as_ref(),
4408 protection_price,
4409 );
4410 }
4411
4412 fn filter_fills_by_protection(
4413 &self,
4414 fills: Vec<(Price, Quantity)>,
4415 order: &OrderAny,
4416 protection_price: Price,
4417 ) -> Vec<(Price, Quantity)> {
4418 let protection_raw = protection_price.raw;
4419 fills
4420 .into_iter()
4421 .filter(|(fill_price, _)| {
4422 match order.order_side() {
4423 OrderSide::Buy => fill_price.raw <= protection_raw,
4425 OrderSide::Sell => fill_price.raw >= protection_raw,
4427 OrderSide::NoOrderSide => false,
4428 }
4429 })
4430 .collect()
4431 }
4432
4433 pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
4442 let mut order = match self
4443 .cache
4444 .borrow()
4445 .order(&client_order_id)
4446 .map(|o| o.clone())
4447 {
4448 Some(order) => order,
4449 None => {
4450 log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
4451 return;
4452 }
4453 };
4454
4455 if order.is_closed() {
4456 self.purge_stale_core_entry(client_order_id);
4457 return;
4458 }
4459
4460 if order.is_quote_quantity()
4464 && !self.instrument.is_inverse()
4465 && !self.convert_quote_to_base_quantity(&mut order)
4466 {
4467 return;
4468 }
4469
4470 match order.price() {
4471 Some(order_price) => {
4472 let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
4473 if let Some(&qty) = cached_filled_qty
4474 && qty >= order.quantity()
4475 {
4476 log::debug!(
4477 "Ignoring fill as already filled pending application of events: {}, {}, {}, {}",
4478 qty,
4479 order.quantity(),
4480 order.filled_qty(),
4481 order.leaves_qty(),
4482 );
4483 return;
4484 }
4485
4486 if order
4488 .liquidity_side()
4489 .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
4490 {
4491 let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
4494 self.core.last.is_some_and(|last| last == order_price)
4495 } else if order.order_side() == OrderSide::Buy {
4496 self.core.bid.is_some_and(|bid| bid == order_price)
4497 } else {
4498 self.core.ask.is_some_and(|ask| ask == order_price)
4499 };
4500
4501 if at_limit && !self.fill_model.is_limit_filled() {
4502 return; }
4504 }
4505
4506 let queue_allowed_raw = if self.config.queue_position {
4507 match self.determine_trade_fill_qty(&order) {
4508 None | Some(0) => {
4509 if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc)
4510 {
4511 self.cancel_order(&order, None);
4512 }
4513 return;
4514 }
4515 Some(allowed) => Some(allowed),
4516 }
4517 } else {
4518 None
4519 };
4520
4521 let venue_position_id = self.ids_generator.get_position_id(&order, None);
4522 let position = if let Some(venue_position_id) = venue_position_id {
4523 let cache = self.cache.as_ref().borrow();
4524 cache.position_owned(&venue_position_id)
4525 } else {
4526 None
4527 };
4528
4529 if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
4530 log::warn!(
4531 "Canceling REDUCE_ONLY {} as would increase position",
4532 order.order_type()
4533 );
4534 self.cancel_order(&order, None);
4535 return;
4536 }
4537
4538 let tc_before = self.trade_consumption;
4539 let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
4540
4541 if let Some(allowed_raw) = queue_allowed_raw {
4542 let size_prec = self.instrument.size_precision();
4543 let mut remaining = allowed_raw;
4544 fills = fills
4545 .into_iter()
4546 .filter_map(|(price, qty)| {
4547 if remaining == 0 {
4548 return None;
4549 }
4550 let capped = qty.raw.min(remaining);
4551 remaining -= capped;
4552 Some((price, Quantity::from_raw(capped, size_prec)))
4553 })
4554 .collect();
4555
4556 let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
4558
4559 if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
4560 *excess = excess.saturating_sub(consumed);
4561 }
4562 self.trade_consumption = tc_before + consumed;
4563 }
4564
4565 if fills.is_empty() && self.config.liquidity_consumption {
4569 log::debug!(
4570 "Skipping fill for {}: no liquidity available after consumption",
4571 order.client_order_id()
4572 );
4573
4574 if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
4575 self.cancel_order(&order, None);
4576 }
4577
4578 return;
4579 }
4580
4581 let liquidity_side = order.liquidity_side().unwrap();
4582 self.apply_fills(
4583 &order,
4584 &fills,
4585 liquidity_side,
4586 venue_position_id,
4587 position.as_ref(),
4588 None,
4589 );
4590 }
4591 None => panic!("Limit order must have a price"),
4592 }
4593 }
4594
4595 fn apply_fills(
4596 &mut self,
4597 order: &OrderAny,
4598 fills: &[(Price, Quantity)],
4599 liquidity_side: LiquiditySide,
4600 venue_position_id: Option<PositionId>,
4601 position: Option<&Position>,
4602 protection_price: Option<Price>,
4603 ) {
4604 if order.time_in_force() == TimeInForce::Fok {
4605 let mut total_size = Quantity::zero(order.quantity().precision);
4606
4607 for &(fill_px, fill_qty) in fills {
4608 if self
4609 .normalize_price_for_current_instrument(fill_px)
4610 .is_some()
4611 && let Some(fill_qty) = self.normalize_quantity_for_current_instrument(fill_qty)
4612 {
4613 total_size = total_size.add(fill_qty);
4614 }
4615 }
4616
4617 if order.leaves_qty() > total_size {
4618 self.cancel_order(order, None);
4619 return;
4620 }
4621 }
4622
4623 if fills.is_empty() {
4624 if order.status() == OrderStatus::Submitted {
4625 self.generate_order_rejected(
4626 order,
4627 format!("No market for {}", order.instrument_id()).into(),
4628 );
4629 } else {
4630 log::error!(
4631 "Cannot fill order: no fills from book when fills were expected (check size in data)"
4632 );
4633 return;
4634 }
4635 }
4636
4637 let venue_position_id = if self.oms_type == OmsType::Netting {
4639 None
4640 } else {
4641 venue_position_id
4642 };
4643
4644 let mut initial_market_to_limit_fill = false;
4645 let mut total_filled = self
4646 .cached_filled_qty
4647 .get(&order.client_order_id())
4648 .copied()
4649 .unwrap_or_else(|| order.filled_qty());
4650 let initial_total_filled = total_filled;
4651 let mut last_fill_px: Option<Price> = None;
4652
4653 for &(fill_px, fill_qty) in fills {
4654 let Some(mut fill_px) = self.normalize_fill_price(fill_px, order.client_order_id())
4655 else {
4656 continue;
4657 };
4658
4659 let Some(fill_qty) = self.normalize_fill_quantity(fill_qty, order.client_order_id())
4660 else {
4661 continue;
4662 };
4663
4664 if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
4665 && order.order_type() == OrderType::MarketToLimit
4666 {
4667 self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
4668 initial_market_to_limit_fill = true;
4669 }
4670
4671 if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
4672 fill_px = match order.order_side().as_specified() {
4673 OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
4674 OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
4675 }
4676 }
4677
4678 let mut effective_fill_qty = fill_qty;
4682
4683 if self.config.use_reduce_only
4684 && order.is_reduce_only()
4685 && let Some(position) = &position
4686 && fill_qty > position.quantity
4687 {
4688 if position.quantity == Quantity::zero(position.quantity.precision) {
4689 return;
4691 }
4692
4693 let adjusted_fill_qty =
4695 Quantity::from_raw(position.quantity.raw, fill_qty.precision);
4696
4697 effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
4699
4700 if order.quantity() != adjusted_fill_qty {
4702 self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
4703 }
4704 }
4705
4706 if fill_qty.is_zero() {
4707 if fills.len() == 1 && order.status() == OrderStatus::Submitted {
4708 self.generate_order_rejected(
4709 order,
4710 format!("No market for {}", order.instrument_id()).into(),
4711 );
4712 }
4713 return;
4714 }
4715
4716 let capped_fill_qty = min(
4718 effective_fill_qty,
4719 order.quantity().saturating_sub(total_filled),
4720 );
4721 total_filled = total_filled.add(capped_fill_qty);
4722
4723 self.fill_order(
4724 order,
4725 fill_px,
4726 effective_fill_qty,
4727 liquidity_side,
4728 venue_position_id,
4729 position,
4730 );
4731 last_fill_px = Some(fill_px);
4732
4733 if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
4734 return;
4736 }
4737 }
4738
4739 let leaves_remaining = total_filled < order.quantity();
4740 let filled_in_loop = total_filled > initial_total_filled;
4741
4742 if order.time_in_force() == TimeInForce::Ioc && leaves_remaining {
4743 self.cancel_order(order, None);
4744 return;
4745 }
4746
4747 if leaves_remaining
4750 && (order.is_open() || filled_in_loop)
4751 && self.book_type == BookType::L1_MBP
4752 && matches!(
4753 order.order_type(),
4754 OrderType::Market
4755 | OrderType::MarketIfTouched
4756 | OrderType::StopMarket
4757 | OrderType::TrailingStopMarket
4758 )
4759 {
4760 let Some(last_fill_px) = last_fill_px else {
4762 return;
4763 };
4764
4765 let side = order.order_side().as_specified();
4766 let slip_fill_px = match side {
4767 OrderSideSpecified::Buy => last_fill_px.add(self.instrument.price_increment()),
4768 OrderSideSpecified::Sell => last_fill_px.sub(self.instrument.price_increment()),
4769 };
4770
4771 if let Some(protection_price) = protection_price {
4772 let exceeds_boundary = match side {
4773 OrderSideSpecified::Buy => slip_fill_px.raw > protection_price.raw,
4774 OrderSideSpecified::Sell => slip_fill_px.raw < protection_price.raw,
4775 };
4776
4777 if exceeds_boundary {
4778 return;
4779 }
4780 }
4781
4782 let leaves_qty = order.quantity().saturating_sub(total_filled);
4783
4784 self.fill_order(
4785 order,
4786 slip_fill_px,
4787 leaves_qty,
4788 liquidity_side,
4789 venue_position_id,
4790 position,
4791 );
4792 }
4793 }
4794
4795 fn normalize_fill_price(
4796 &self,
4797 fill_px: Price,
4798 client_order_id: ClientOrderId,
4799 ) -> Option<Price> {
4800 let normalized = self.normalize_price_for_current_instrument(fill_px);
4801 if normalized.is_none() {
4802 log::warn!(
4803 "Skipping fill for {client_order_id}: fill price {fill_px} is not compatible \
4804 with {} price_precision={} price_increment={}",
4805 self.instrument.id(),
4806 self.instrument.price_precision(),
4807 self.instrument.price_increment()
4808 );
4809 }
4810 normalized
4811 }
4812
4813 fn normalize_fill_quantity(
4814 &self,
4815 fill_qty: Quantity,
4816 client_order_id: ClientOrderId,
4817 ) -> Option<Quantity> {
4818 let normalized = self.normalize_quantity_for_current_instrument(fill_qty);
4819 if normalized.is_none() {
4820 log::warn!(
4821 "Skipping fill for {client_order_id}: fill quantity {fill_qty} is not compatible \
4822 with {} size_precision={}",
4823 self.instrument.id(),
4824 self.instrument.size_precision()
4825 );
4826 }
4827 normalized
4828 }
4829
4830 fn fill_order(
4831 &mut self,
4832 order: &OrderAny,
4833 last_px: Price,
4834 last_qty: Quantity,
4835 liquidity_side: LiquiditySide,
4836 venue_position_id: Option<PositionId>,
4837 _position: Option<&Position>,
4838 ) {
4839 self.check_size_precision(last_qty.precision, "fill quantity")
4840 .unwrap();
4841
4842 let (last_qty, new_filled_qty) =
4843 if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
4844 let leaves_qty = order.quantity().saturating_sub(*filled_qty);
4845 let last_qty = min(last_qty, leaves_qty);
4846 (last_qty, *filled_qty + last_qty)
4847 } else {
4848 let last_qty = min(last_qty, order.quantity());
4849 (last_qty, last_qty)
4850 };
4851
4852 self.cached_filled_qty
4853 .insert(order.client_order_id(), new_filled_qty);
4854
4855 if last_qty.is_zero() {
4856 return;
4857 }
4858
4859 let underlying_px = self.fee_underlying_price();
4860 let commission = self
4861 .fee_model
4862 .get_commission_with_context(order, last_qty, last_px, &self.instrument, underlying_px)
4863 .unwrap_or_else(|e| {
4864 panic!(
4865 "Failed to compute commission for {}: {}",
4866 order.client_order_id(),
4867 e
4868 );
4869 });
4870
4871 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4872 self.generate_order_filled(
4873 order,
4874 venue_order_id,
4875 venue_position_id,
4876 last_qty,
4877 last_px,
4878 self.instrument.quote_currency(),
4879 commission,
4880 liquidity_side,
4881 );
4882
4883 let post_fill_filled_qty = self
4884 .cached_filled_qty
4885 .get(&order.client_order_id())
4886 .copied()
4887 .unwrap_or(order.filled_qty());
4888 let post_fill_leaves_qty = order.quantity().saturating_sub(post_fill_filled_qty);
4889 let fully_filled = post_fill_leaves_qty.is_zero();
4890
4891 if order.is_closed() || fully_filled {
4892 if self.core.order_exists(order.client_order_id()) {
4893 let _ = self.core.delete_order(order.client_order_id());
4894 }
4895 if order.order_type() != OrderType::MarketToLimit {
4898 self.purge_cached_filled_qty_if_closed(order.client_order_id());
4899 }
4900 }
4901
4902 if !self.config.support_contingent_orders {
4903 return;
4904 }
4905
4906 if let Some(contingency_type) = order.contingency_type() {
4907 match contingency_type {
4908 ContingencyType::Oto => {
4909 if let Some(linked_orders_ids) = order.linked_order_ids() {
4910 for client_order_id in linked_orders_ids {
4911 let mut child_order = match self.cache.borrow().order(client_order_id) {
4912 Some(child_order) => child_order.clone(),
4913 None => panic!("Order {client_order_id} not found in cache"),
4914 };
4915
4916 if child_order.is_closed() || child_order.is_active_local() {
4917 continue;
4918 }
4919
4920 if let (None, Some(position_id)) =
4922 (child_order.position_id(), order.position_id())
4923 {
4924 self.cache
4925 .borrow_mut()
4926 .add_position_id(
4927 &position_id,
4928 &self.venue,
4929 client_order_id,
4930 &child_order.strategy_id(),
4931 )
4932 .unwrap();
4933 log::debug!(
4934 "Added position id {position_id} to cache for order {client_order_id}"
4935 );
4936 }
4937
4938 if (!child_order.is_open())
4939 || (matches!(child_order.status(), OrderStatus::PendingUpdate)
4940 && child_order
4941 .previous_status()
4942 .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
4943 {
4944 let account_id = order.account_id().unwrap_or_else(|| {
4945 *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
4946 panic!(
4947 "Account ID not found for trader {}",
4948 order.trader_id()
4949 )
4950 })
4951 });
4952 self.process_order(&mut child_order, account_id);
4953 }
4954 }
4955 } else {
4956 log::error!(
4957 "OTO order {} does not have linked orders",
4958 order.client_order_id()
4959 );
4960 }
4961 }
4962 ContingencyType::Oco => {
4963 if let Some(linked_orders_ids) = order.linked_order_ids() {
4964 for client_order_id in linked_orders_ids {
4965 let child_order = match self.cache.borrow().order(client_order_id) {
4966 Some(child_order) => child_order.clone(),
4967 None => panic!("Order {client_order_id} not found in cache"),
4968 };
4969
4970 if child_order.is_closed() || child_order.is_active_local() {
4971 continue;
4972 }
4973
4974 self.cancel_order(&child_order, None);
4975 }
4976 } else {
4977 log::error!(
4978 "OCO order {} does not have linked orders",
4979 order.client_order_id()
4980 );
4981 }
4982 }
4983 ContingencyType::Ouo => {
4984 if let Some(linked_orders_ids) = order.linked_order_ids() {
4985 for client_order_id in linked_orders_ids {
4986 let mut child_order = match self.cache.borrow().order(client_order_id) {
4987 Some(child_order) => child_order.clone(),
4988 None => panic!("Order {client_order_id} not found in cache"),
4989 };
4990
4991 if child_order.is_active_local() {
4992 continue;
4993 }
4994
4995 let child_filled_qty = self
4996 .cached_filled_qty
4997 .get(&child_order.client_order_id())
4998 .copied()
4999 .unwrap_or(child_order.filled_qty());
5000
5001 if post_fill_leaves_qty.is_zero() && child_order.is_open() {
5002 self.cancel_order(&child_order, None);
5003 } else if child_order.is_open()
5004 && child_filled_qty >= post_fill_leaves_qty
5005 {
5006 self.cancel_order(&child_order, Some(false));
5007 } else if !post_fill_leaves_qty.is_zero()
5008 && post_fill_leaves_qty != child_order.leaves_qty()
5009 {
5010 let price = child_order.price();
5011 let trigger_price = child_order.trigger_price();
5012 self.update_order(
5013 &mut child_order,
5014 Some(post_fill_leaves_qty),
5015 price,
5016 trigger_price,
5017 Some(false),
5018 );
5019 }
5020 }
5021 } else {
5022 log::error!(
5023 "OUO order {} does not have linked orders",
5024 order.client_order_id()
5025 );
5026 }
5027 }
5028 _ => {}
5029 }
5030 }
5031 }
5032
5033 fn fee_underlying_price(&self) -> Option<Price> {
5034 if !matches!(
5035 self.instrument,
5036 InstrumentAny::CryptoOption(_) | InstrumentAny::OptionContract(_)
5037 ) {
5038 return None;
5039 }
5040
5041 let underlying = self.instrument.underlying()?;
5042 let underlying_id = InstrumentId::from(format!("{underlying}.{}", self.venue).as_str());
5043 let instrument_id = self.instrument.id();
5044 let cache = self.cache.borrow();
5045 cache
5046 .price(&underlying_id, PriceType::Last)
5047 .or_else(|| cache.price(&underlying_id, PriceType::Mark))
5048 .or_else(|| cache.price(&underlying_id, PriceType::Mid))
5049 .or_else(|| {
5050 cache
5051 .option_greeks(&instrument_id)
5052 .and_then(|greeks| greeks.underlying_price)
5053 .map(|price| Price::new(price, FIXED_PRECISION))
5054 })
5055 }
5056
5057 fn cached_order_is_closed(&self, client_order_id: ClientOrderId) -> bool {
5058 self.cache
5059 .borrow()
5060 .order(&client_order_id)
5061 .is_none_or(|order| order.is_closed())
5062 }
5063
5064 fn purge_cached_filled_qty_if_closed(&mut self, client_order_id: ClientOrderId) {
5065 if self.cached_order_is_closed(client_order_id) {
5066 self.cached_filled_qty.swap_remove(&client_order_id);
5067 }
5068 }
5069
5070 fn purge_closed_cached_filled_qty(&mut self) {
5071 let client_order_ids: Vec<ClientOrderId> = self.cached_filled_qty.keys().copied().collect();
5072
5073 for client_order_id in client_order_ids {
5074 self.purge_cached_filled_qty_if_closed(client_order_id);
5075 }
5076 }
5077
5078 fn update_limit_order(&mut self, order: &OrderAny, quantity: Quantity, price: Price) {
5079 if self
5080 .core
5081 .is_limit_matched(order.order_side_specified(), price)
5082 {
5083 if order.is_post_only() {
5084 self.generate_order_modify_rejected(
5085 order.trader_id(),
5086 order.strategy_id(),
5087 order.instrument_id(),
5088 order.client_order_id(),
5089 Ustr::from(format!(
5090 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5091 order.order_type(),
5092 order.order_side(),
5093 price,
5094 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5095 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5096 ).as_str()),
5097 order.venue_order_id(),
5098 order.account_id(),
5099 );
5100 return;
5101 }
5102
5103 self.generate_order_updated(order, quantity, Some(price), None, None);
5104
5105 let client_order_id = order.client_order_id();
5107 if let Some(mut order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5108 order.set_liquidity_side(LiquiditySide::Taker);
5109 }
5110 self.fill_limit_order(client_order_id);
5111 return;
5112 }
5113 self.generate_order_updated(order, quantity, Some(price), None, None);
5114 }
5115
5116 fn update_stop_market_order(&self, order: &OrderAny, quantity: Quantity, trigger_price: Price) {
5117 if self
5118 .core
5119 .is_stop_matched(order.order_side_specified(), trigger_price)
5120 {
5121 self.generate_order_modify_rejected(
5122 order.trader_id(),
5123 order.strategy_id(),
5124 order.instrument_id(),
5125 order.client_order_id(),
5126 Ustr::from(
5127 format!(
5128 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
5129 order.order_type(),
5130 order.order_side(),
5131 trigger_price,
5132 self.core
5133 .bid
5134 .map_or_else(|| "None".to_string(), |p| p.to_string()),
5135 self.core
5136 .ask
5137 .map_or_else(|| "None".to_string(), |p| p.to_string())
5138 )
5139 .as_str(),
5140 ),
5141 order.venue_order_id(),
5142 order.account_id(),
5143 );
5144 return;
5145 }
5146
5147 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
5148 }
5149
5150 fn update_stop_limit_order(
5151 &mut self,
5152 order: &mut OrderAny,
5153 quantity: Quantity,
5154 price: Price,
5155 trigger_price: Price,
5156 ) {
5157 if order.is_triggered().is_some_and(|t| t) {
5158 if self
5160 .core
5161 .is_limit_matched(order.order_side_specified(), price)
5162 {
5163 if order.is_post_only() {
5164 self.generate_order_modify_rejected(
5165 order.trader_id(),
5166 order.strategy_id(),
5167 order.instrument_id(),
5168 order.client_order_id(),
5169 Ustr::from(format!(
5170 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5171 order.order_type(),
5172 order.order_side(),
5173 price,
5174 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5175 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5176 ).as_str()),
5177 order.venue_order_id(),
5178 order.account_id(),
5179 );
5180 return;
5181 }
5182 self.generate_order_updated(order, quantity, Some(price), None, None);
5183 order.set_liquidity_side(LiquiditySide::Taker);
5184
5185 if let Err(e) = self
5186 .cache
5187 .borrow_mut()
5188 .add_order(order.clone(), None, None, false)
5189 {
5190 log::debug!("Order already in cache: {e}");
5191 }
5192 self.fill_limit_order(order.client_order_id());
5193 return; }
5195 } else {
5196 if self
5198 .core
5199 .is_stop_matched(order.order_side_specified(), trigger_price)
5200 {
5201 self.generate_order_modify_rejected(
5202 order.trader_id(),
5203 order.strategy_id(),
5204 order.instrument_id(),
5205 order.client_order_id(),
5206 Ustr::from(
5207 format!(
5208 "{} {} order new stop px of {} was in the market: bid={}, ask={}",
5209 order.order_type(),
5210 order.order_side(),
5211 trigger_price,
5212 self.core
5213 .bid
5214 .map_or_else(|| "None".to_string(), |p| p.to_string()),
5215 self.core
5216 .ask
5217 .map_or_else(|| "None".to_string(), |p| p.to_string())
5218 )
5219 .as_str(),
5220 ),
5221 order.venue_order_id(),
5222 order.account_id(),
5223 );
5224 return;
5225 }
5226 }
5227
5228 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
5229 }
5230
5231 fn update_market_if_touched_order(
5232 &self,
5233 order: &OrderAny,
5234 quantity: Quantity,
5235 trigger_price: Price,
5236 ) {
5237 if self
5238 .core
5239 .is_touch_triggered(order.order_side_specified(), trigger_price)
5240 {
5241 self.generate_order_modify_rejected(
5242 order.trader_id(),
5243 order.strategy_id(),
5244 order.instrument_id(),
5245 order.client_order_id(),
5246 Ustr::from(
5247 format!(
5248 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
5249 order.order_type(),
5250 order.order_side(),
5251 trigger_price,
5252 self.core
5253 .bid
5254 .map_or_else(|| "None".to_string(), |p| p.to_string()),
5255 self.core
5256 .ask
5257 .map_or_else(|| "None".to_string(), |p| p.to_string())
5258 )
5259 .as_str(),
5260 ),
5261 order.venue_order_id(),
5262 order.account_id(),
5263 );
5264 return;
5266 }
5267
5268 self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
5269 }
5270
5271 fn update_limit_if_touched_order(
5272 &mut self,
5273 order: &mut OrderAny,
5274 quantity: Quantity,
5275 price: Price,
5276 trigger_price: Price,
5277 ) {
5278 if order.is_triggered().is_some_and(|t| t) {
5279 if self
5281 .core
5282 .is_limit_matched(order.order_side_specified(), price)
5283 {
5284 if order.is_post_only() {
5285 self.generate_order_modify_rejected(
5286 order.trader_id(),
5287 order.strategy_id(),
5288 order.instrument_id(),
5289 order.client_order_id(),
5290 Ustr::from(format!(
5291 "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5292 order.order_type(),
5293 order.order_side(),
5294 price,
5295 self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5296 self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5297 ).as_str()),
5298 order.venue_order_id(),
5299 order.account_id(),
5300 );
5301 return;
5303 }
5304 self.generate_order_updated(order, quantity, Some(price), None, None);
5305 order.set_liquidity_side(LiquiditySide::Taker);
5306 self.fill_limit_order(order.client_order_id());
5307 return;
5308 }
5309 } else {
5310 if self
5312 .core
5313 .is_touch_triggered(order.order_side_specified(), trigger_price)
5314 {
5315 self.generate_order_modify_rejected(
5316 order.trader_id(),
5317 order.strategy_id(),
5318 order.instrument_id(),
5319 order.client_order_id(),
5320 Ustr::from(
5321 format!(
5322 "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
5323 order.order_type(),
5324 order.order_side(),
5325 trigger_price,
5326 self.core
5327 .bid
5328 .map_or_else(|| "None".to_string(), |p| p.to_string()),
5329 self.core
5330 .ask
5331 .map_or_else(|| "None".to_string(), |p| p.to_string())
5332 )
5333 .as_str(),
5334 ),
5335 order.venue_order_id(),
5336 order.account_id(),
5337 );
5338 return;
5339 }
5340 }
5341
5342 self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
5343 }
5344
5345 fn update_trailing_stop_order(&self, order: &OrderAny) {
5346 let (new_trigger_price, new_price) = trailing_stop_calculate(
5347 self.instrument.price_increment(),
5348 order.trigger_price(),
5349 order.activation_price(),
5350 order,
5351 self.core.bid,
5352 self.core.ask,
5353 self.core.last,
5354 )
5355 .unwrap();
5356
5357 if new_trigger_price.is_none() && new_price.is_none() {
5358 return;
5359 }
5360
5361 self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
5362 }
5363
5364 fn accept_order(&mut self, order: &mut OrderAny) {
5365 if order.is_closed() {
5366 return;
5368 }
5369
5370 if order.status() != OrderStatus::Accepted {
5371 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
5372 let event = self.create_order_accepted(order, venue_order_id);
5373 if let Err(e) = order.apply(event.clone()) {
5376 log::warn!(
5377 "Skipping local apply of accepted event for {}: {e}",
5378 order.client_order_id(),
5379 );
5380 }
5381 self.dispatch_order_event(event);
5382
5383 if matches!(
5386 order.order_type(),
5387 OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
5388 ) && order.trigger_price().is_none()
5389 && self.maybe_activate_trailing_stop(
5390 order,
5391 self.core.bid,
5392 self.core.ask,
5393 self.core.last,
5394 )
5395 {
5396 self.update_trailing_stop_order(order);
5397 }
5398 }
5399
5400 let match_info = Self::matching_core_entry(order);
5401 self.core.add_order(match_info);
5402 }
5403
5404 fn matching_core_entry(order: &OrderAny) -> RestingOrder {
5405 let triggered_limit_style = matches!(
5406 order.order_type(),
5407 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit
5408 ) && order.is_triggered().is_some_and(|triggered| triggered);
5409
5410 RestingOrder::new(
5411 order.client_order_id(),
5412 order.order_side().as_specified(),
5413 order.order_type(),
5414 if triggered_limit_style {
5415 None
5416 } else {
5417 order.trigger_price()
5418 },
5419 order.price(),
5420 match order {
5421 OrderAny::TrailingStopMarket(o) => o.is_activated,
5422 OrderAny::TrailingStopLimit(o) => o.is_activated,
5423 _ => true,
5424 },
5425 )
5426 }
5427
5428 fn expire_order(&mut self, order: &OrderAny) {
5429 if self.config.support_contingent_orders
5430 && order
5431 .contingency_type()
5432 .is_some_and(|c| c != ContingencyType::NoContingency)
5433 {
5434 self.cancel_contingent_orders(order);
5435 }
5436
5437 self.generate_order_expired(order);
5438 }
5439
5440 fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
5441 let cancel_contingencies = cancel_contingencies.unwrap_or(true);
5442
5443 if order.is_active_local() {
5444 log::error!(
5445 "Cannot cancel an order with {} from the matching engine",
5446 order.status()
5447 );
5448 return;
5449 }
5450
5451 if self.core.order_exists(order.client_order_id()) {
5453 let _ = self.core.delete_order(order.client_order_id());
5454 }
5455 self.cached_filled_qty.swap_remove(&order.client_order_id());
5456
5457 let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
5458 self.generate_order_canceled(order, venue_order_id);
5459
5460 if self.config.support_contingent_orders
5461 && order.contingency_type().is_some()
5462 && order.contingency_type().unwrap() != ContingencyType::NoContingency
5463 && cancel_contingencies
5464 {
5465 self.cancel_contingent_orders(order);
5466 }
5467 }
5468
5469 fn update_order(
5470 &mut self,
5471 order: &mut OrderAny,
5472 quantity: Option<Quantity>,
5473 price: Option<Price>,
5474 trigger_price: Option<Price>,
5475 update_contingencies: Option<bool>,
5476 ) -> bool {
5477 let update_contingencies = update_contingencies.unwrap_or(true);
5478 let quantity = quantity.unwrap_or(order.quantity());
5479
5480 let price_prec = self.instrument.price_precision();
5481 let size_prec = self.instrument.size_precision();
5482 let instrument_id = self.instrument.id();
5483
5484 if quantity.precision != size_prec {
5485 self.generate_order_modify_rejected(
5486 order.trader_id(),
5487 order.strategy_id(),
5488 order.instrument_id(),
5489 order.client_order_id(),
5490 Ustr::from(&format!(
5491 "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
5492 quantity.precision
5493 )),
5494 order.venue_order_id(),
5495 order.account_id(),
5496 );
5497 return false;
5498 }
5499
5500 if let Some(px) = price
5501 && px.precision != price_prec
5502 {
5503 self.generate_order_modify_rejected(
5504 order.trader_id(),
5505 order.strategy_id(),
5506 order.instrument_id(),
5507 order.client_order_id(),
5508 Ustr::from(&format!(
5509 "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
5510 px.precision
5511 )),
5512 order.venue_order_id(),
5513 order.account_id(),
5514 );
5515 return false;
5516 }
5517
5518 if let Some(tp) = trigger_price
5519 && tp.precision != price_prec
5520 {
5521 self.generate_order_modify_rejected(
5522 order.trader_id(),
5523 order.strategy_id(),
5524 order.instrument_id(),
5525 order.client_order_id(),
5526 Ustr::from(&format!(
5527 "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
5528 tp.precision
5529 )),
5530 order.venue_order_id(),
5531 order.account_id(),
5532 );
5533 return false;
5534 }
5535
5536 let filled_qty = self
5538 .cached_filled_qty
5539 .get(&order.client_order_id())
5540 .copied()
5541 .unwrap_or(order.filled_qty());
5542 if quantity < filled_qty {
5543 self.generate_order_modify_rejected(
5544 order.trader_id(),
5545 order.strategy_id(),
5546 order.instrument_id(),
5547 order.client_order_id(),
5548 Ustr::from(&format!(
5549 "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
5550 )),
5551 order.venue_order_id(),
5552 order.account_id(),
5553 );
5554 return false;
5555 }
5556
5557 match order {
5558 OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
5559 let price = price.unwrap_or(order.price().unwrap());
5560 self.update_limit_order(order, quantity, price);
5561 }
5562 OrderAny::StopMarket(_) => {
5563 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5564 self.update_stop_market_order(order, quantity, trigger_price);
5565 }
5566 OrderAny::StopLimit(_) => {
5567 let price = price.unwrap_or(order.price().unwrap());
5568 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5569 self.update_stop_limit_order(order, quantity, price, trigger_price);
5570 }
5571 OrderAny::MarketIfTouched(_) => {
5572 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5573 self.update_market_if_touched_order(order, quantity, trigger_price);
5574 }
5575 OrderAny::LimitIfTouched(_) => {
5576 let price = price.unwrap_or(order.price().unwrap());
5577 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5578 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
5579 }
5580 OrderAny::TrailingStopMarket(_) => {
5581 let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5582 self.update_market_if_touched_order(order, quantity, trigger_price);
5583 }
5584 OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
5585 let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
5586 let trigger_price =
5587 trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
5588 self.update_limit_if_touched_order(order, quantity, price, trigger_price);
5589 }
5590 _ => {
5591 panic!(
5592 "Unsupported order type {} for update_order",
5593 order.order_type()
5594 );
5595 }
5596 }
5597
5598 let new_leaves_qty = quantity.saturating_sub(filled_qty);
5600 if new_leaves_qty.is_zero() {
5601 if self.config.support_contingent_orders
5602 && order
5603 .contingency_type()
5604 .is_some_and(|c| c != ContingencyType::NoContingency)
5605 && update_contingencies
5606 {
5607 self.update_contingent_order(order, quantity);
5608 }
5609 self.cancel_order(order, Some(false));
5611 return true;
5612 }
5613
5614 if self.config.support_contingent_orders
5615 && order
5616 .contingency_type()
5617 .is_some_and(|c| c != ContingencyType::NoContingency)
5618 && update_contingencies
5619 {
5620 self.update_contingent_order(order, quantity);
5621 }
5622
5623 true
5624 }
5625
5626 pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
5628 let order = match self
5629 .cache
5630 .borrow()
5631 .order(&client_order_id)
5632 .map(|o| o.clone())
5633 {
5634 Some(order) => order,
5635 None => {
5636 log::error!(
5637 "Cannot trigger stop order: order {client_order_id} not found in cache"
5638 );
5639 return;
5640 }
5641 };
5642
5643 match order.order_type() {
5644 OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
5645 self.trigger_limit_style_stop_order(client_order_id, order);
5646 }
5647 OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
5648 self.fill_market_order(client_order_id);
5649 }
5650 _ => {
5651 log::error!(
5652 "Cannot trigger stop order: invalid order type {}",
5653 order.order_type()
5654 );
5655 }
5656 }
5657 }
5658
5659 fn trigger_limit_style_stop_order(&mut self, client_order_id: ClientOrderId, order: OrderAny) {
5660 if order.is_triggered().is_some_and(|triggered| triggered) {
5661 let liquidity_side = match (order.price(), order.trigger_price()) {
5662 (Some(price), Some(trigger_price)) => Self::determine_triggered_limit_liquidity(
5663 order.order_side(),
5664 price,
5665 trigger_price,
5666 ),
5667 _ => LiquiditySide::Maker,
5668 };
5669
5670 if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id)
5671 && !matches!(
5672 cached_order.liquidity_side(),
5673 Some(LiquiditySide::Maker | LiquiditySide::Taker)
5674 )
5675 {
5676 cached_order.set_liquidity_side(liquidity_side);
5677 }
5678 self.fill_limit_order(client_order_id);
5679 return;
5680 }
5681
5682 let event = self.create_order_triggered(&order);
5683 let order = match self.cache.borrow_mut().update_order(&event) {
5684 Ok(order) => order,
5685 Err(e) => {
5686 log::debug!(
5687 "Failed to apply triggered event for {} before fill: {e}",
5688 order.client_order_id(),
5689 );
5690 order
5691 }
5692 };
5693 self.dispatch_order_event(event);
5694
5695 let trigger_price = order
5696 .trigger_price()
5697 .expect("Limit-style stop order must have a trigger price");
5698 let price = order
5699 .price()
5700 .expect("Limit-style stop order must have a price");
5701
5702 let maker_inside = match order.order_side() {
5703 OrderSide::Buy => self
5704 .core
5705 .ask
5706 .is_some_and(|ask| trigger_price > price && price > ask),
5707 OrderSide::Sell => self
5708 .core
5709 .bid
5710 .is_some_and(|bid| trigger_price < price && price < bid),
5711 OrderSide::NoOrderSide => false,
5712 };
5713
5714 if maker_inside {
5715 if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5716 cached_order.set_liquidity_side(LiquiditySide::Maker);
5717 }
5718 self.resync_core_entry(client_order_id);
5719 self.fill_limit_order(client_order_id);
5720 return;
5721 }
5722
5723 if self
5724 .core
5725 .is_limit_matched(order.order_side_specified(), price)
5726 {
5727 if order.is_post_only() {
5728 let _ = self.core.delete_order(client_order_id);
5729 self.cached_filled_qty.swap_remove(&client_order_id);
5730 let event = self.create_order_rejected(
5731 &order,
5732 format!(
5733 "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
5734 order.order_type(),
5735 order.order_side(),
5736 price,
5737 self.core
5738 .bid
5739 .map_or_else(|| "None".to_string(), |p| p.to_string()),
5740 self.core
5741 .ask
5742 .map_or_else(|| "None".to_string(), |p| p.to_string())
5743 )
5744 .into(),
5745 );
5746
5747 if let Err(e) = self.cache.borrow_mut().update_order(&event) {
5748 log::debug!(
5749 "Failed to apply rejected event for {} after post-only trigger: {e}",
5750 order.client_order_id(),
5751 );
5752 }
5753 self.dispatch_order_event(event);
5754 return;
5755 }
5756
5757 if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5758 cached_order.set_liquidity_side(LiquiditySide::Taker);
5759 }
5760 self.resync_core_entry(client_order_id);
5761 self.fill_limit_order(client_order_id);
5762 return;
5763 }
5764
5765 if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5766 cached_order.set_liquidity_side(Self::determine_triggered_limit_liquidity(
5767 order.order_side(),
5768 price,
5769 trigger_price,
5770 ));
5771 }
5772 self.resync_core_entry(client_order_id);
5773 }
5774
5775 fn determine_triggered_limit_liquidity(
5776 side: OrderSide,
5777 price: Price,
5778 trigger_price: Price,
5779 ) -> LiquiditySide {
5780 if (side == OrderSide::Buy && trigger_price > price)
5781 || (side == OrderSide::Sell && trigger_price < price)
5782 {
5783 LiquiditySide::Maker
5784 } else {
5785 LiquiditySide::Taker
5786 }
5787 }
5788
5789 fn update_contingent_order(&mut self, order: &OrderAny, parent_quantity: Quantity) {
5790 log::debug!(
5791 "Updating contingent orders from {}",
5792 order.client_order_id()
5793 );
5794
5795 if let Some(linked_order_ids) = order.linked_order_ids() {
5796 let parent_filled_qty = self
5797 .cached_filled_qty
5798 .get(&order.client_order_id())
5799 .copied()
5800 .unwrap_or(order.filled_qty());
5801 let parent_leaves_qty = parent_quantity.saturating_sub(parent_filled_qty);
5802
5803 for client_order_id in linked_order_ids {
5804 let mut child_order = match self.cache.borrow().order(client_order_id) {
5805 Some(order) => order.clone(),
5806 None => panic!("Order {client_order_id} not found in cache."),
5807 };
5808
5809 if child_order.is_active_local() {
5810 continue;
5811 }
5812
5813 let child_filled_qty = self
5814 .cached_filled_qty
5815 .get(&child_order.client_order_id())
5816 .copied()
5817 .unwrap_or(child_order.filled_qty());
5818
5819 if parent_leaves_qty.is_zero() {
5820 self.cancel_order(&child_order, Some(false));
5821 } else if child_filled_qty >= parent_leaves_qty {
5822 self.cancel_order(&child_order, Some(false));
5824 } else {
5825 let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
5826 if child_leaves_qty != parent_leaves_qty {
5827 let price = child_order.price();
5828 let trigger_price = child_order.trigger_price();
5829 self.update_order(
5830 &mut child_order,
5831 Some(parent_leaves_qty),
5832 price,
5833 trigger_price,
5834 Some(false),
5835 );
5836 }
5837 }
5838 }
5839 }
5840 }
5841
5842 fn cancel_contingent_orders(&mut self, order: &OrderAny) {
5843 if let Some(linked_order_ids) = order.linked_order_ids() {
5844 for client_order_id in linked_order_ids {
5845 let contingent_order = match self.cache.borrow().order(client_order_id) {
5846 Some(order) => order.clone(),
5847 None => panic!("Cannot find contingent order for {client_order_id}"),
5848 };
5849
5850 if contingent_order.is_active_local() {
5851 continue;
5853 }
5854
5855 if !contingent_order.is_closed() {
5856 self.cancel_order(&contingent_order, Some(false));
5857 }
5858 }
5859 }
5860 }
5861
5862 fn generate_order_submitted(&self, order: &OrderAny, account_id: AccountId) {
5863 let ts_now = self.clock.borrow().timestamp_ns();
5864 let event = OrderEventAny::Submitted(OrderSubmitted::new(
5865 order.trader_id(),
5866 order.strategy_id(),
5867 order.instrument_id(),
5868 order.client_order_id(),
5869 account_id,
5870 UUID4::new(),
5871 ts_now,
5872 ts_now,
5873 ));
5874 self.dispatch_order_event(event);
5875 }
5876
5877 fn create_order_rejected(&self, order: &OrderAny, reason: Ustr) -> OrderEventAny {
5878 let ts_now = self.clock.borrow().timestamp_ns();
5879 let account_id = order
5880 .account_id()
5881 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
5882
5883 let due_post_only = reason.as_str().starts_with("POST_ONLY");
5884
5885 OrderEventAny::Rejected(OrderRejected::new(
5886 order.trader_id(),
5887 order.strategy_id(),
5888 order.instrument_id(),
5889 order.client_order_id(),
5890 account_id,
5891 reason,
5892 UUID4::new(),
5893 ts_now,
5894 ts_now,
5895 false,
5896 due_post_only,
5897 ))
5898 }
5899
5900 fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
5901 let event = self.create_order_rejected(order, reason);
5902 self.dispatch_order_event(event);
5903 }
5904
5905 fn publish_order_initialized(&self, order: &OrderAny) {
5906 let event = OrderEventAny::Initialized(order.init_event().clone());
5907 msgbus::publish_order_event(
5908 format!("events.order.{}", order.strategy_id()).into(),
5909 &event,
5910 );
5911 }
5912
5913 fn create_order_accepted(
5914 &self,
5915 order: &OrderAny,
5916 venue_order_id: VenueOrderId,
5917 ) -> OrderEventAny {
5918 let ts_now = self.clock.borrow().timestamp_ns();
5919 let account_id = order
5920 .account_id()
5921 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
5922 OrderEventAny::Accepted(OrderAccepted::new(
5923 order.trader_id(),
5924 order.strategy_id(),
5925 order.instrument_id(),
5926 order.client_order_id(),
5927 venue_order_id,
5928 account_id,
5929 UUID4::new(),
5930 ts_now,
5931 ts_now,
5932 false,
5933 ))
5934 }
5935
5936 fn generate_order_accepted(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
5937 let event = self.create_order_accepted(order, venue_order_id);
5938 self.dispatch_order_event(event);
5939 }
5940
5941 #[expect(clippy::too_many_arguments)]
5942 fn generate_order_modify_rejected(
5943 &self,
5944 trader_id: TraderId,
5945 strategy_id: StrategyId,
5946 instrument_id: InstrumentId,
5947 client_order_id: ClientOrderId,
5948 reason: Ustr,
5949 venue_order_id: Option<VenueOrderId>,
5950 account_id: Option<AccountId>,
5951 ) {
5952 let ts_now = self.clock.borrow().timestamp_ns();
5953 let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
5954 trader_id,
5955 strategy_id,
5956 instrument_id,
5957 client_order_id,
5958 reason,
5959 UUID4::new(),
5960 ts_now,
5961 ts_now,
5962 false,
5963 venue_order_id,
5964 account_id,
5965 ));
5966 self.dispatch_order_event(event);
5967 }
5968
5969 #[expect(clippy::too_many_arguments)]
5970 fn generate_order_cancel_rejected(
5971 &self,
5972 trader_id: TraderId,
5973 strategy_id: StrategyId,
5974 account_id: AccountId,
5975 instrument_id: InstrumentId,
5976 client_order_id: ClientOrderId,
5977 venue_order_id: Option<VenueOrderId>,
5978 reason: Ustr,
5979 ) {
5980 let ts_now = self.clock.borrow().timestamp_ns();
5981 let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
5982 trader_id,
5983 strategy_id,
5984 instrument_id,
5985 client_order_id,
5986 reason,
5987 UUID4::new(),
5988 ts_now,
5989 ts_now,
5990 false,
5991 venue_order_id,
5992 Some(account_id),
5993 ));
5994 self.dispatch_order_event(event);
5995 }
5996
5997 fn generate_order_updated(
5998 &self,
5999 order: &OrderAny,
6000 quantity: Quantity,
6001 price: Option<Price>,
6002 trigger_price: Option<Price>,
6003 protection_price: Option<Price>,
6004 ) {
6005 let ts_now = self.clock.borrow().timestamp_ns();
6006 let event = OrderEventAny::Updated(OrderUpdated::new(
6007 order.trader_id(),
6008 order.strategy_id(),
6009 order.instrument_id(),
6010 order.client_order_id(),
6011 quantity,
6012 UUID4::new(),
6013 ts_now,
6014 ts_now,
6015 false,
6016 order.venue_order_id(),
6017 order.account_id(),
6018 price,
6019 trigger_price,
6020 protection_price,
6021 order.is_quote_quantity(),
6022 ));
6023
6024 self.dispatch_order_event(event);
6025 }
6026
6027 fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
6028 let ts_now = self.clock.borrow().timestamp_ns();
6029 let event = OrderEventAny::Canceled(OrderCanceled::new(
6030 order.trader_id(),
6031 order.strategy_id(),
6032 order.instrument_id(),
6033 order.client_order_id(),
6034 UUID4::new(),
6035 ts_now,
6036 ts_now,
6037 false,
6038 Some(venue_order_id),
6039 order.account_id(),
6040 ));
6041 self.dispatch_order_event(event);
6042 }
6043
6044 fn create_order_triggered(&self, order: &OrderAny) -> OrderEventAny {
6045 let ts_now = self.clock.borrow().timestamp_ns();
6046 OrderEventAny::Triggered(OrderTriggered::new(
6047 order.trader_id(),
6048 order.strategy_id(),
6049 order.instrument_id(),
6050 order.client_order_id(),
6051 UUID4::new(),
6052 ts_now,
6053 ts_now,
6054 false,
6055 order.venue_order_id(),
6056 order.account_id(),
6057 ))
6058 }
6059
6060 fn generate_order_triggered(&self, order: &OrderAny) {
6061 let event = self.create_order_triggered(order);
6062 self.dispatch_order_event(event);
6063 }
6064
6065 fn generate_order_expired(&self, order: &OrderAny) {
6066 let ts_now = self.clock.borrow().timestamp_ns();
6067 let event = OrderEventAny::Expired(OrderExpired::new(
6068 order.trader_id(),
6069 order.strategy_id(),
6070 order.instrument_id(),
6071 order.client_order_id(),
6072 UUID4::new(),
6073 ts_now,
6074 ts_now,
6075 false,
6076 order.venue_order_id(),
6077 order.account_id(),
6078 ));
6079 self.dispatch_order_event(event);
6080 }
6081
6082 #[expect(clippy::too_many_arguments)]
6083 fn generate_order_filled(
6084 &mut self,
6085 order: &OrderAny,
6086 venue_order_id: VenueOrderId,
6087 venue_position_id: Option<PositionId>,
6088 last_qty: Quantity,
6089 last_px: Price,
6090 quote_currency: Currency,
6091 commission: Money,
6092 liquidity_side: LiquiditySide,
6093 ) {
6094 debug_assert!(
6095 last_qty <= order.quantity(),
6096 "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
6097 order_qty = order.quantity(),
6098 client_order_id = order.client_order_id()
6099 );
6100
6101 let ts_now = self.clock.borrow().timestamp_ns();
6102 let account_id = order
6103 .account_id()
6104 .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
6105 let event = OrderEventAny::Filled(OrderFilled::new(
6106 order.trader_id(),
6107 order.strategy_id(),
6108 order.instrument_id(),
6109 order.client_order_id(),
6110 venue_order_id,
6111 account_id,
6112 self.ids_generator.generate_trade_id(ts_now),
6113 order.order_side(),
6114 order.order_type(),
6115 last_qty,
6116 last_px,
6117 quote_currency,
6118 liquidity_side,
6119 UUID4::new(),
6120 ts_now,
6121 ts_now,
6122 false,
6123 venue_position_id,
6124 Some(commission),
6125 ));
6126
6127 self.dispatch_order_event(event);
6128 }
6129}
6130
6131#[derive(Debug, Clone, Copy)]
6132struct BarTickSizes {
6133 open: Quantity,
6134 high: Quantity,
6135 low: Quantity,
6136 close: Quantity,
6137}
6138
6139impl BarTickSizes {
6140 fn from_volume(volume: Quantity, size_increment: Quantity) -> Self {
6141 let precision_diff = FIXED_PRECISION.saturating_sub(volume.precision);
6142 let scale = QuantityRaw::pow(10, u32::from(precision_diff));
6143 let units = volume.raw / scale;
6144 let increment_units = (size_increment.raw / scale).max(1);
6145 let rounded_units = (units / increment_units) * increment_units;
6146 let increments = rounded_units / increment_units;
6147 let zero = Quantity::zero(volume.precision);
6148 let size =
6149 |increments| Quantity::from_raw(increments * increment_units * scale, volume.precision);
6150
6151 match increments {
6152 0 => Self {
6153 open: zero,
6154 high: zero,
6155 low: zero,
6156 close: zero,
6157 },
6158 1 => Self {
6160 open: zero,
6161 high: zero,
6162 low: zero,
6163 close: size(1),
6164 },
6165 2 => Self {
6166 open: zero,
6167 high: size(1),
6168 low: size(1),
6169 close: zero,
6170 },
6171 3 => {
6172 let path_size = size(1);
6173
6174 Self {
6175 open: path_size,
6176 high: path_size,
6177 low: path_size,
6178 close: zero,
6179 }
6180 }
6181 _ => {
6182 let path_increments = increments / 4;
6183 let close_increments = increments - (path_increments * 3);
6184 let path_size = size(path_increments);
6185
6186 Self {
6187 open: path_size,
6188 high: path_size,
6189 low: path_size,
6190 close: size(close_increments),
6191 }
6192 }
6193 }
6194 }
6195}
6196
6197#[cfg(test)]
6201mod tests {
6202 use nautilus_model::types::{Quantity, fixed::FIXED_PRECISION, quantity::QuantityRaw};
6203 use rstest::rstest;
6204
6205 use super::{BarTickSizes, OrderMatchingEngine};
6206
6207 fn assert_valid_bar_tick_sizes(volume: Quantity, size_increment: Quantity) {
6208 let sizes = BarTickSizes::from_volume(volume, size_increment);
6209 let total_raw = sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw;
6210 assert!(total_raw <= volume.raw);
6211
6212 for quantity in [sizes.open, sizes.high, sizes.low, sizes.close] {
6213 assert_eq!(quantity.precision, volume.precision);
6214 assert!(
6215 OrderMatchingEngine::quantity_matches_precision(quantity, volume.precision),
6216 "bar tick quantity {quantity} not aligned to precision {}",
6217 volume.precision,
6218 );
6219 assert!(
6220 size_increment.raw == 0 || quantity.raw.is_multiple_of(size_increment.raw),
6221 "bar tick quantity {quantity} not aligned to increment {size_increment}",
6222 );
6223 }
6224
6225 if size_increment.raw > 0 {
6226 assert!(
6227 volume.raw - total_raw < size_increment.raw,
6228 "bar tick split left {} raw units from volume {volume} and increment {size_increment}",
6229 volume.raw - total_raw,
6230 );
6231 }
6232 }
6233
6234 #[rstest]
6235 fn test_bar_tick_sizes_divisible() {
6236 let volume = Quantity::from("100.000");
6238 let increment = Quantity::from("0.001");
6239 let sizes = BarTickSizes::from_volume(volume, increment);
6240 assert_eq!(sizes.open, Quantity::from("25.000"));
6241 assert_eq!(sizes.high, Quantity::from("25.000"));
6242 assert_eq!(sizes.low, Quantity::from("25.000"));
6243 assert_eq!(sizes.close, Quantity::from("25.000"));
6244 assert_valid_bar_tick_sizes(volume, increment);
6245 }
6246
6247 #[rstest]
6248 fn test_bar_tick_sizes_indivisible_with_remainder() {
6249 let volume = Quantity::from("0.05");
6251 let increment = Quantity::from("0.01");
6252 let sizes = BarTickSizes::from_volume(volume, increment);
6253 assert_eq!(sizes.open, Quantity::from("0.01"));
6254 assert_eq!(sizes.high, Quantity::from("0.01"));
6255 assert_eq!(sizes.low, Quantity::from("0.01"));
6256 assert_eq!(sizes.close, Quantity::from("0.02"));
6257 assert_valid_bar_tick_sizes(volume, increment);
6258 assert_eq!(
6259 sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw,
6260 volume.raw
6261 );
6262 }
6263
6264 #[rstest]
6265 #[case("1", "0", "0", "0", "1")]
6266 #[case("2", "0", "1", "1", "0")]
6267 #[case("3", "1", "1", "1", "0")]
6268 fn test_bar_tick_sizes_units_less_than_four_preserves_volume(
6269 #[case] volume: &str,
6270 #[case] open_size: &str,
6271 #[case] high_size: &str,
6272 #[case] low_size: &str,
6273 #[case] close_size: &str,
6274 ) {
6275 let volume = Quantity::from(volume);
6276 let increment = Quantity::from("1");
6277 let sizes = BarTickSizes::from_volume(volume, increment);
6278
6279 assert_eq!(sizes.open, Quantity::from(open_size));
6280 assert_eq!(sizes.high, Quantity::from(high_size));
6281 assert_eq!(sizes.low, Quantity::from(low_size));
6282 assert_eq!(sizes.close, Quantity::from(close_size));
6283 assert_valid_bar_tick_sizes(volume, increment);
6284 assert_eq!(
6285 sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw,
6286 volume.raw
6287 );
6288 }
6289
6290 #[rstest]
6291 fn test_bar_tick_sizes_zero_volume_remains_zero() {
6292 let volume = Quantity::zero(3);
6293 let increment = Quantity::from("0.001");
6294 let sizes = BarTickSizes::from_volume(volume, increment);
6295 assert_eq!(sizes.open, Quantity::zero(3));
6296 assert_eq!(sizes.high, Quantity::zero(3));
6297 assert_eq!(sizes.low, Quantity::zero(3));
6298 assert_eq!(sizes.close, Quantity::zero(3));
6299 assert_valid_bar_tick_sizes(volume, increment);
6300 }
6301
6302 #[rstest]
6303 fn test_bar_tick_sizes_rounds_down_to_size_increment() {
6304 let volume = Quantity::from("1.07");
6305 let increment = Quantity::from("0.10");
6306 let sizes = BarTickSizes::from_volume(volume, increment);
6307 assert_eq!(sizes.open, Quantity::from("0.20"));
6308 assert_eq!(sizes.high, Quantity::from("0.20"));
6309 assert_eq!(sizes.low, Quantity::from("0.20"));
6310 assert_eq!(sizes.close, Quantity::from("0.40"));
6311 assert_valid_bar_tick_sizes(volume, increment);
6312 }
6313
6314 #[rstest]
6315 fn test_bar_tick_sizes_at_fixed_precision() {
6316 let units: QuantityRaw = 17;
6319 let volume = Quantity::from_raw(units, FIXED_PRECISION);
6320 let increment = Quantity::from_raw(1, FIXED_PRECISION);
6321 let sizes = BarTickSizes::from_volume(volume, increment);
6322 assert_eq!(sizes.open.raw, 4);
6323 assert_eq!(sizes.high.raw, 4);
6324 assert_eq!(sizes.low.raw, 4);
6325 assert_eq!(sizes.close.raw, 5);
6326 assert_valid_bar_tick_sizes(volume, increment);
6327 }
6328}