Skip to main content

nautilus_execution/matching_engine/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::RefCell,
18    cmp::min,
19    fmt::Debug,
20    ops::{Add, Sub},
21    rc::Rc,
22};
23
24use chrono::TimeDelta;
25use indexmap::{IndexMap, IndexSet};
26use nautilus_common::{
27    cache::Cache,
28    clock::Clock,
29    messages::execution::{BatchCancelOrders, CancelAllOrders, CancelOrder, ModifyOrder},
30    msgbus::{self, MessagingSwitchboard},
31};
32use nautilus_core::{UUID4, UnixNanos};
33use nautilus_model::{
34    data::{
35        Bar, BarType, InstrumentClose, OrderBookDelta, OrderBookDeltas, OrderBookDepth10,
36        QuoteTick, TradeTick, order::BookOrder,
37    },
38    enums::{
39        AccountType, AggregationSource, AggressorSide, BookAction, BookType, ContingencyType,
40        InstrumentCloseType, LiquiditySide, MarketStatus, MarketStatusAction, OmsType, OptionKind,
41        OrderSide, OrderSideSpecified, OrderStatus, OrderType, PositionSide, PriceType,
42        TimeInForce, TriggerType,
43    },
44    events::{
45        OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderExpired,
46        OrderFilled, OrderModifyRejected, OrderRejected, OrderSubmitted, OrderTriggered,
47        OrderUpdated,
48    },
49    identifiers::{
50        AccountId, ClientOrderId, InstrumentId, PositionId, StrategyId, TradeId, TraderId, Venue,
51        VenueOrderId,
52    },
53    instruments::{Instrument, InstrumentAny},
54    orderbook::OrderBook,
55    orders::{MarketOrder, Order, OrderAny, OrderCore},
56    position::Position,
57    types::{
58        Currency, Money, Price, Quantity, fixed::FIXED_PRECISION, price::PriceRaw,
59        quantity::QuantityRaw,
60    },
61};
62use ustr::Ustr;
63
64use crate::{
65    matching_core::{MatchAction, OrderMatchingCore, RestingOrder},
66    matching_engine::{config::OrderMatchingEngineConfig, ids_generator::IdsGenerator},
67    models::{
68        fee::{FeeModel, FeeModelAny},
69        fill::{FillModel, FillModelAny},
70    },
71    protection::protection_price_calculate,
72    trailing::trailing_stop_calculate,
73};
74
75/// An order matching engine for a single market.
76pub struct OrderMatchingEngine {
77    /// The venue for the matching engine.
78    pub venue: Venue,
79    /// The instrument for the matching engine.
80    pub instrument: InstrumentAny,
81    /// The instruments raw integer ID for the venue.
82    pub raw_id: u32,
83    /// The order book type for the matching engine.
84    pub book_type: BookType,
85    /// The order management system (OMS) type for the matching engine.
86    pub oms_type: OmsType,
87    /// The account type for the matching engine.
88    pub account_type: AccountType,
89    /// The market status for the matching engine.
90    pub market_status: MarketStatus,
91    /// The config for the matching engine.
92    pub config: OrderMatchingEngineConfig,
93    core: OrderMatchingCore,
94    clock: Rc<RefCell<dyn Clock>>,
95    cache: Rc<RefCell<Cache>>,
96    book: OrderBook,
97    fill_model: FillModelAny,
98    fee_model: FeeModelAny,
99    event_handler: Option<Rc<dyn Fn(OrderEventAny)>>,
100    target_bid: Option<Price>,
101    target_ask: Option<Price>,
102    target_last: Option<Price>,
103    last_bar_bid: Option<Bar>,
104    last_bar_ask: Option<Bar>,
105    fill_at_market: bool,
106    execution_bar_types: IndexMap<InstrumentId, BarType>,
107    execution_bar_deltas: IndexMap<BarType, TimeDelta>,
108    account_ids: IndexMap<TraderId, AccountId>,
109    cached_filled_qty: IndexMap<ClientOrderId, Quantity>,
110    ids_generator: IdsGenerator,
111    last_trade_size: Option<Quantity>,
112    bid_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
113    ask_consumption: IndexMap<PriceRaw, (QuantityRaw, QuantityRaw)>,
114    trade_consumption: QuantityRaw,
115    queue_ahead: IndexMap<ClientOrderId, (PriceRaw, QuantityRaw)>,
116    queue_excess: IndexMap<ClientOrderId, QuantityRaw>,
117    queue_pending: IndexMap<ClientOrderId, PriceRaw>,
118    prev_bid_price_raw: PriceRaw,
119    prev_bid_size_raw: QuantityRaw,
120    prev_ask_price_raw: PriceRaw,
121    prev_ask_size_raw: QuantityRaw,
122    last_quote_bid: Option<Price>,
123    last_quote_ask: Option<Price>,
124    precision_mismatch_streak: u32,
125    tob_initialized: bool,
126    instrument_close: Option<InstrumentClose>,
127    pending_resolution: bool,
128    settlement_price: Option<Price>,
129    expiration_processed: bool,
130}
131
132impl Debug for OrderMatchingEngine {
133    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
134        f.debug_struct(stringify!(OrderMatchingEngine))
135            .field("venue", &self.venue)
136            .field("instrument", &self.instrument.id())
137            .finish()
138    }
139}
140
141impl OrderMatchingEngine {
142    /// Creates a new [`OrderMatchingEngine`] instance.
143    #[expect(clippy::too_many_arguments)]
144    pub fn new(
145        instrument: InstrumentAny,
146        raw_id: u32,
147        fill_model: FillModelAny,
148        fee_model: FeeModelAny,
149        book_type: BookType,
150        oms_type: OmsType,
151        account_type: AccountType,
152        clock: Rc<RefCell<dyn Clock>>,
153        cache: Rc<RefCell<Cache>>,
154        config: OrderMatchingEngineConfig,
155    ) -> Self {
156        let book = OrderBook::new(instrument.id(), book_type);
157        let mut core = OrderMatchingCore::new(instrument.id(), instrument.price_increment());
158        core.set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
159        let ids_generator = IdsGenerator::new(
160            instrument.id().venue,
161            oms_type,
162            raw_id,
163            config.use_random_ids,
164            config.use_position_ids,
165            cache.clone(),
166        );
167
168        Self {
169            venue: instrument.id().venue,
170            instrument,
171            raw_id,
172            fill_model,
173            fee_model,
174            event_handler: None,
175            book_type,
176            oms_type,
177            account_type,
178            clock,
179            cache,
180            book,
181            market_status: MarketStatus::Open,
182            config,
183            core,
184            target_bid: None,
185            target_ask: None,
186            target_last: None,
187            last_bar_bid: None,
188            last_bar_ask: None,
189            fill_at_market: true,
190            execution_bar_types: IndexMap::new(),
191            execution_bar_deltas: IndexMap::new(),
192            account_ids: IndexMap::new(),
193            cached_filled_qty: IndexMap::new(),
194            ids_generator,
195            last_trade_size: None,
196            bid_consumption: IndexMap::new(),
197            ask_consumption: IndexMap::new(),
198            trade_consumption: 0,
199            queue_ahead: IndexMap::new(),
200            queue_excess: IndexMap::new(),
201            queue_pending: IndexMap::new(),
202            prev_bid_price_raw: 0,
203            prev_bid_size_raw: 0,
204            prev_ask_price_raw: 0,
205            prev_ask_size_raw: 0,
206            last_quote_bid: None,
207            last_quote_ask: None,
208            precision_mismatch_streak: 0,
209            tob_initialized: false,
210            instrument_close: None,
211            pending_resolution: false,
212            settlement_price: None,
213            expiration_processed: false,
214        }
215    }
216
217    /// Sets the event handler for dispatching order events.
218    ///
219    /// When set, events are routed through the handler instead of directly
220    /// through the message bus. This allows sandbox execution clients to
221    /// dispatch events through the async runner channel, avoiding `RefCell`
222    /// re-entrancy panics.
223    pub fn set_event_handler(&mut self, handler: Rc<dyn Fn(OrderEventAny)>) {
224        self.event_handler = Some(handler);
225    }
226
227    fn dispatch_order_event(&self, event: OrderEventAny) {
228        if let Some(handler) = &self.event_handler {
229            handler(event);
230        } else {
231            let endpoint = MessagingSwitchboard::exec_engine_process();
232            msgbus::send_order_event(endpoint, event);
233        }
234    }
235
236    /// Resets the matching engine to its initial state.
237    ///
238    /// Clears the order book, execution state, cached data, and resets all
239    /// internal components. This is typically used for backtesting scenarios
240    /// where the engine needs to be reset between test runs.
241    pub fn reset(&mut self) {
242        self.book.reset();
243        self.execution_bar_types.clear();
244        self.execution_bar_deltas.clear();
245        self.account_ids.clear();
246        self.cached_filled_qty.clear();
247        self.core.reset();
248        self.target_bid = None;
249        self.target_ask = None;
250        self.target_last = None;
251        self.last_trade_size = None;
252        self.bid_consumption.clear();
253        self.ask_consumption.clear();
254        self.trade_consumption = 0;
255        self.queue_ahead.clear();
256        self.queue_excess.clear();
257        self.queue_pending.clear();
258        self.prev_bid_price_raw = 0;
259        self.prev_bid_size_raw = 0;
260        self.prev_ask_price_raw = 0;
261        self.prev_ask_size_raw = 0;
262        self.last_quote_bid = None;
263        self.last_quote_ask = None;
264        self.precision_mismatch_streak = 0;
265        self.tob_initialized = false;
266        self.instrument_close = None;
267        self.pending_resolution = false;
268        self.settlement_price = None;
269        self.expiration_processed = false;
270        self.fill_at_market = true;
271        self.ids_generator.reset();
272
273        log::info!("Reset {}", self.instrument.id());
274    }
275
276    fn apply_liquidity_consumption(
277        &mut self,
278        fills: Vec<(Price, Quantity)>,
279        order_side: OrderSide,
280        leaves_qty: Quantity,
281        book_prices: Option<&[Price]>,
282    ) -> Vec<(Price, Quantity)> {
283        if !self.config.liquidity_consumption {
284            return fills;
285        }
286
287        let consumption = match order_side {
288            OrderSide::Buy => &mut self.ask_consumption,
289            OrderSide::Sell => &mut self.bid_consumption,
290            _ => return fills,
291        };
292
293        let mut adjusted_fills = Vec::with_capacity(fills.len());
294        let mut remaining_qty = leaves_qty.raw;
295
296        for (fill_idx, (price, qty)) in fills.into_iter().enumerate() {
297            if remaining_qty == 0 {
298                break;
299            }
300
301            // Use book_price for consumption tracking (original price before MAKER adjustment),
302            // but use price (potentially adjusted) for the output fill.
303            let book_price = book_prices
304                .and_then(|bp| bp.get(fill_idx).copied())
305                .unwrap_or(price);
306
307            let book_price_raw = book_price.raw;
308            let level_size = self
309                .book
310                .get_quantity_at_level(book_price, order_side, qty.precision);
311
312            let (original_size, consumed) = consumption
313                .entry(book_price_raw)
314                .or_insert((level_size.raw, 0));
315
316            // Reset consumption when book size changes (fresh data)
317            if *original_size != level_size.raw {
318                *original_size = level_size.raw;
319                *consumed = 0;
320            }
321
322            let available = original_size.saturating_sub(*consumed);
323            if available == 0 {
324                continue;
325            }
326
327            let adjusted_qty_raw = min(min(qty.raw, available), remaining_qty);
328            if adjusted_qty_raw == 0 {
329                continue;
330            }
331
332            *consumed += adjusted_qty_raw;
333            remaining_qty -= adjusted_qty_raw;
334
335            let adjusted_qty = Quantity::from_raw(adjusted_qty_raw, qty.precision);
336            adjusted_fills.push((price, adjusted_qty));
337        }
338
339        adjusted_fills
340    }
341
342    fn seed_trade_consumption(
343        &mut self,
344        trade_price_raw: PriceRaw,
345        trade_size_raw: QuantityRaw,
346        trade_ts_event: UnixNanos,
347        aggressor_side: AggressorSide,
348    ) {
349        if trade_size_raw == 0 {
350            return;
351        }
352
353        // If the book was updated after the trade's event time, depth deltas
354        // already reflect this trade's consumed volume, skip to avoid double-counting
355        if self.book.ts_last > trade_ts_event {
356            return;
357        }
358
359        let consumption = match aggressor_side {
360            AggressorSide::Buyer => &mut self.ask_consumption,
361            AggressorSide::Seller => &mut self.bid_consumption,
362            AggressorSide::NoAggressor => return,
363        };
364
365        let levels: Vec<_> = match aggressor_side {
366            AggressorSide::Buyer => self
367                .book
368                .asks(None)
369                .take_while(|l| l.price.value.raw <= trade_price_raw)
370                .collect(),
371            AggressorSide::Seller => self
372                .book
373                .bids(None)
374                .take_while(|l| l.price.value.raw >= trade_price_raw)
375                .collect(),
376            _ => unreachable!(),
377        };
378
379        let mut remaining = trade_size_raw;
380        for level in &levels {
381            if remaining == 0 {
382                break;
383            }
384            let level_size = level.size_raw();
385            let entry = consumption
386                .entry(level.price.value.raw)
387                .or_insert((level_size, 0));
388
389            // Reconcile stale level size to prevent reset in apply_liquidity_consumption
390            if entry.0 != level_size {
391                entry.0 = level_size;
392                entry.1 = 0;
393            }
394
395            let available = level_size.saturating_sub(entry.1);
396            let consume = min(remaining, available);
397            entry.1 += consume;
398            remaining -= consume;
399        }
400    }
401
402    /// Sets the fill model for the matching engine.
403    pub fn set_fill_model(&mut self, fill_model: FillModelAny) {
404        self.core
405            .set_fill_limit_inside_spread(fill_model.fill_limit_inside_spread());
406        self.fill_model = fill_model;
407    }
408
409    pub fn set_settlement_price(&mut self, price: Price) {
410        self.settlement_price = Some(price);
411    }
412
413    fn snapshot_queue_position(&mut self, order: &OrderAny, price: Price) {
414        if !self.config.queue_position {
415            return;
416        }
417        let size_prec = self.instrument.size_precision();
418
419        // Pass opposite side because get_quantity_at_level flips internally
420        // (BUY reads asks, SELL reads bids). We want the resting side depth.
421        let qty_ahead = self.book.get_quantity_at_level(
422            price,
423            OrderCore::opposite_side(order.order_side()),
424            size_prec,
425        );
426
427        let client_order_id = order.client_order_id();
428
429        // Clear stale entries from both maps (e.g. order modified to new price)
430        self.queue_pending.shift_remove(&client_order_id);
431        self.queue_ahead.shift_remove(&client_order_id);
432
433        // For L1 books, levels behind the BBO have no visible depth. Track
434        // these orders separately so fills are blocked until the BBO reaches
435        // this price. Only truly behind-BBO prices are pending (BUY below
436        // best bid / SELL above best ask); inside-spread and no-book keep 0.
437        if self.book_type == BookType::L1_MBP && qty_ahead.raw == 0 {
438            let behind_bbo = match order.order_side() {
439                OrderSide::Buy => self.book.best_bid_price().is_some_and(|bid| price < bid),
440                OrderSide::Sell => self.book.best_ask_price().is_some_and(|ask| price > ask),
441                _ => false,
442            };
443
444            if behind_bbo {
445                self.queue_pending.insert(client_order_id, price.raw);
446                return;
447            }
448        }
449
450        self.queue_ahead
451            .insert(client_order_id, (price.raw, qty_ahead.raw));
452    }
453
454    fn decrement_queue_on_trade(
455        &mut self,
456        price_raw: PriceRaw,
457        trade_size_raw: QuantityRaw,
458        aggressor_side: AggressorSide,
459    ) {
460        if !self.config.queue_position {
461            return;
462        }
463
464        self.queue_excess.clear();
465
466        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
467        let mut entries: Vec<(ClientOrderId, QuantityRaw, QuantityRaw)> = Vec::new();
468        let mut stale: Vec<ClientOrderId> = Vec::new();
469
470        for client_order_id in keys {
471            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
472            {
473                Some(v) => v,
474                None => continue,
475            };
476
477            let cache = self.cache.borrow();
478            let order_info = cache.order(&client_order_id).and_then(|order| {
479                if order.is_closed() {
480                    None
481                } else {
482                    Some((order.order_side(), order.leaves_qty().raw))
483                }
484            });
485            drop(cache);
486
487            let Some((order_side, leaves_raw)) = order_info else {
488                stale.push(client_order_id);
489                continue;
490            };
491
492            if order_price_raw != price_raw || ahead_raw == 0 {
493                continue;
494            }
495
496            let should_decrement = matches!(aggressor_side, AggressorSide::NoAggressor)
497                || (aggressor_side == AggressorSide::Buyer && order_side == OrderSide::Sell)
498                || (aggressor_side == AggressorSide::Seller && order_side == OrderSide::Buy);
499
500            if should_decrement {
501                entries.push((client_order_id, ahead_raw, leaves_raw));
502            }
503        }
504
505        for id in stale {
506            self.queue_ahead.shift_remove(&id);
507        }
508
509        // Sort by queue position (earliest first) for shared budget allocation
510        entries.sort_by_key(|&(_, ahead, _)| ahead);
511
512        let mut remaining = trade_size_raw;
513        let mut prev_position: QuantityRaw = 0;
514
515        for (client_order_id, ahead_raw, leaves_raw) in &entries {
516            if remaining == 0 {
517                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
518                self.queue_ahead
519                    .insert(*client_order_id, (price_raw, new_ahead));
520                if new_ahead == 0 {
521                    // Queue cleared but no trade volume left for this order
522                    self.queue_excess.insert(*client_order_id, 0);
523                }
524                continue;
525            }
526
527            // Consume the gap between previous position and this order's depth
528            let gap = ahead_raw.saturating_sub(prev_position);
529            let queue_consumed = remaining.min(gap);
530            remaining -= queue_consumed;
531
532            if remaining == 0 && queue_consumed < gap {
533                let new_ahead = ahead_raw.saturating_sub(trade_size_raw);
534                self.queue_ahead
535                    .insert(*client_order_id, (price_raw, new_ahead));
536                continue;
537            }
538
539            self.queue_ahead.insert(*client_order_id, (price_raw, 0));
540            let excess = remaining.min(*leaves_raw);
541            self.queue_excess.insert(*client_order_id, excess);
542            remaining -= excess;
543            prev_position = ahead_raw + excess;
544        }
545    }
546
547    fn determine_trade_fill_qty(&self, order: &OrderAny) -> Option<QuantityRaw> {
548        if !self.config.queue_position {
549            return Some(order.leaves_qty().raw);
550        }
551
552        let client_order_id = order.client_order_id();
553
554        // Block fills for L1 orders pending a deferred snapshot
555        if self.queue_pending.contains_key(&client_order_id) {
556            return None;
557        }
558
559        if let Some(&(tracked_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id)
560            && let Some(order_price) = order.price()
561            && order_price.raw == tracked_price_raw
562            && ahead_raw > 0
563        {
564            return None;
565        }
566
567        let leaves_raw = order.leaves_qty().raw;
568        if leaves_raw == 0 {
569            return None;
570        }
571
572        let mut available_raw = leaves_raw;
573
574        // Cap by remaining trade volume and queue excess (only during trade processing)
575        if let Some(trade_size) = self.last_trade_size {
576            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
577            available_raw = available_raw.min(remaining);
578
579            if let Some(&excess_raw) = self.queue_excess.get(&client_order_id) {
580                if excess_raw == 0 {
581                    return None;
582                }
583                available_raw = available_raw.min(excess_raw);
584            }
585        }
586
587        if available_raw == 0 {
588            return None;
589        }
590
591        Some(available_raw)
592    }
593
594    fn clear_all_queue_positions(&mut self) {
595        for (_, (_, ahead_raw)) in &mut self.queue_ahead {
596            *ahead_raw = 0;
597        }
598    }
599
600    fn clear_queue_on_delete(&mut self, deleted_price_raw: PriceRaw, deleted_side: OrderSide) {
601        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
602        for client_order_id in keys {
603            if let Some(&(order_price_raw, _)) = self.queue_ahead.get(&client_order_id)
604                && order_price_raw == deleted_price_raw
605            {
606                let matches_side = self
607                    .cache
608                    .borrow()
609                    .order(&client_order_id)
610                    .is_some_and(|o| o.order_side() == deleted_side);
611                if matches_side {
612                    self.queue_ahead
613                        .insert(client_order_id, (order_price_raw, 0));
614                }
615            }
616        }
617    }
618
619    fn cap_queue_ahead(
620        &mut self,
621        price_raw: PriceRaw,
622        size_raw: QuantityRaw,
623        order_side: OrderSide,
624    ) {
625        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
626        let mut stale: Vec<ClientOrderId> = Vec::new();
627
628        for client_order_id in keys {
629            let (order_price_raw, ahead_raw) = match self.queue_ahead.get(&client_order_id).copied()
630            {
631                Some(v) => v,
632                None => continue,
633            };
634
635            if order_price_raw != price_raw || ahead_raw <= size_raw {
636                continue;
637            }
638
639            let cache = self.cache.borrow();
640            let order_info = cache.order(&client_order_id).and_then(|order| {
641                if order.is_closed() {
642                    None
643                } else {
644                    Some(order.order_side())
645                }
646            });
647            drop(cache);
648
649            let Some(side) = order_info else {
650                stale.push(client_order_id);
651                continue;
652            };
653
654            if side != order_side {
655                continue;
656            }
657
658            self.queue_ahead
659                .insert(client_order_id, (order_price_raw, size_raw));
660        }
661
662        for id in stale {
663            self.queue_ahead.shift_remove(&id);
664        }
665    }
666
667    fn seed_tob_baseline(&mut self) {
668        let bid = self.book.best_bid_price();
669        let ask = self.book.best_ask_price();
670        self.prev_bid_price_raw = bid.map_or(0, |p| p.raw);
671        self.prev_bid_size_raw = self.book.best_bid_size().map_or(0, |q| q.raw);
672        self.prev_ask_price_raw = ask.map_or(0, |p| p.raw);
673        self.prev_ask_size_raw = self.book.best_ask_size().map_or(0, |q| q.raw);
674        self.tob_initialized = bid.is_some() || ask.is_some();
675    }
676
677    fn decrement_l1_queue_on_quote(
678        &mut self,
679        bid_price_raw: PriceRaw,
680        bid_size_raw: QuantityRaw,
681        ask_price_raw: PriceRaw,
682        ask_size_raw: QuantityRaw,
683    ) {
684        if !self.config.queue_position {
685            return;
686        }
687
688        // Price-move detection requires a valid prior TOB snapshot
689        if self.tob_initialized {
690            // BID side (BUY limit orders): handle price drops (crossed/snapshot)
691            if bid_price_raw < self.prev_bid_price_raw {
692                self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
693            }
694
695            // ASK side (SELL limit orders): handle price rises (crossed/snapshot)
696            if ask_price_raw > self.prev_ask_price_raw {
697                self.adjust_l1_queue_on_price_move(ask_price_raw, ask_size_raw, OrderSide::Sell);
698            }
699        }
700
701        // Resolve pending snapshots when BBO reaches a tracked order's price
702        self.resolve_pending_l1_snapshots(bid_price_raw, bid_size_raw, ask_price_raw, ask_size_raw);
703    }
704
705    fn adjust_l1_queue_on_price_move(
706        &mut self,
707        new_price_raw: PriceRaw,
708        new_size_raw: QuantityRaw,
709        order_side: OrderSide,
710    ) {
711        let keys: Vec<ClientOrderId> = self.queue_ahead.keys().copied().collect();
712        let mut stale: Vec<ClientOrderId> = Vec::new();
713
714        for client_order_id in keys {
715            let Some(&(order_price_raw, ahead_raw)) = self.queue_ahead.get(&client_order_id) else {
716                continue;
717            };
718
719            let cache = self.cache.borrow();
720            let order_info = cache.order(&client_order_id).and_then(|order| {
721                if order.is_closed() {
722                    None
723                } else {
724                    Some(order.order_side())
725                }
726            });
727            drop(cache);
728
729            let Some(side) = order_info else {
730                stale.push(client_order_id);
731                continue;
732            };
733
734            if side != order_side {
735                continue;
736            }
737
738            // BUY orders crossed when bid drops below order price
739            // SELL orders crossed when ask rises above order price
740            let crossed = match order_side {
741                OrderSide::Buy => order_price_raw > new_price_raw,
742                _ => order_price_raw < new_price_raw,
743            };
744
745            if crossed {
746                self.queue_ahead
747                    .insert(client_order_id, (order_price_raw, 0));
748            } else if order_price_raw == new_price_raw && ahead_raw > new_size_raw {
749                self.queue_ahead
750                    .insert(client_order_id, (order_price_raw, new_size_raw));
751            }
752        }
753
754        for id in stale {
755            self.queue_ahead.shift_remove(&id);
756        }
757
758        // Also resolve pending L1 orders affected by this price move
759        let pending_keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
760        let mut pending_stale: Vec<ClientOrderId> = Vec::new();
761
762        for client_order_id in pending_keys {
763            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
764                continue;
765            };
766
767            let cache = self.cache.borrow();
768            let order_info = cache.order(&client_order_id).and_then(|order| {
769                if order.is_closed() {
770                    None
771                } else {
772                    Some(order.order_side())
773                }
774            });
775            drop(cache);
776
777            let Some(side) = order_info else {
778                pending_stale.push(client_order_id);
779                continue;
780            };
781
782            if side != order_side {
783                continue;
784            }
785
786            let crossed = match order_side {
787                OrderSide::Buy => order_price_raw > new_price_raw,
788                _ => order_price_raw < new_price_raw,
789            };
790
791            if crossed {
792                self.queue_pending.shift_remove(&client_order_id);
793                self.queue_ahead
794                    .insert(client_order_id, (order_price_raw, 0));
795            } else if order_price_raw == new_price_raw {
796                self.queue_pending.shift_remove(&client_order_id);
797                self.queue_ahead
798                    .insert(client_order_id, (order_price_raw, new_size_raw));
799            }
800        }
801
802        for id in pending_stale {
803            self.queue_pending.shift_remove(&id);
804        }
805    }
806
807    fn resolve_pending_l1_snapshots(
808        &mut self,
809        bid_price_raw: PriceRaw,
810        bid_size_raw: QuantityRaw,
811        ask_price_raw: PriceRaw,
812        ask_size_raw: QuantityRaw,
813    ) {
814        let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
815        let mut stale: Vec<ClientOrderId> = Vec::new();
816
817        for client_order_id in keys {
818            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
819                continue;
820            };
821
822            let cache = self.cache.borrow();
823            let order_info = cache.order(&client_order_id).and_then(|order| {
824                if order.is_closed() {
825                    None
826                } else {
827                    Some(order.order_side())
828                }
829            });
830            drop(cache);
831
832            let Some(side) = order_info else {
833                stale.push(client_order_id);
834                continue;
835            };
836
837            // Initialize snapshot when BBO reaches the order's price level
838            let matched_size = match side {
839                OrderSide::Buy if order_price_raw == bid_price_raw => Some(bid_size_raw),
840                OrderSide::Sell if order_price_raw == ask_price_raw => Some(ask_size_raw),
841                _ => None,
842            };
843
844            if let Some(size) = matched_size {
845                self.queue_pending.shift_remove(&client_order_id);
846                self.queue_ahead
847                    .insert(client_order_id, (order_price_raw, size));
848            }
849        }
850
851        for id in stale {
852            self.queue_pending.shift_remove(&id);
853        }
854    }
855
856    fn resolve_pending_on_trade(&mut self, trade_price_raw: PriceRaw) {
857        let keys: Vec<ClientOrderId> = self.queue_pending.keys().copied().collect();
858        let mut stale: Vec<ClientOrderId> = Vec::new();
859
860        for client_order_id in keys {
861            let Some(&order_price_raw) = self.queue_pending.get(&client_order_id) else {
862                continue;
863            };
864
865            let cache = self.cache.borrow();
866            let order_side = cache.order(&client_order_id).and_then(|order| {
867                if order.is_closed() {
868                    None
869                } else {
870                    Some(order.order_side())
871                }
872            });
873            drop(cache);
874
875            let Some(side) = order_side else {
876                stale.push(client_order_id);
877                continue;
878            };
879
880            // Trade through a pending level proves the queue was crossed
881            let crossed = match side {
882                OrderSide::Buy => trade_price_raw < order_price_raw,
883                OrderSide::Sell => trade_price_raw > order_price_raw,
884                _ => false,
885            };
886
887            if crossed {
888                self.queue_pending.shift_remove(&client_order_id);
889                self.queue_ahead
890                    .insert(client_order_id, (order_price_raw, 0));
891            }
892        }
893
894        for id in stale {
895            self.queue_pending.shift_remove(&id);
896        }
897    }
898
899    #[must_use]
900    /// Returns the best bid price from the order book.
901    pub fn best_bid_price(&self) -> Option<Price> {
902        self.book.best_bid_price()
903    }
904
905    #[must_use]
906    /// Returns the best ask price from the order book.
907    pub fn best_ask_price(&self) -> Option<Price> {
908        self.book.best_ask_price()
909    }
910
911    #[must_use]
912    /// Returns a reference to the internal order book.
913    pub const fn get_book(&self) -> &OrderBook {
914        &self.book
915    }
916
917    #[must_use]
918    /// Returns all open bid orders managed by the matching core.
919    pub fn get_open_bid_orders(&self) -> Vec<RestingOrder> {
920        self.core.get_orders_bid()
921    }
922
923    #[must_use]
924    /// Returns all open ask orders managed by the matching core.
925    pub fn get_open_ask_orders(&self) -> Vec<RestingOrder> {
926        self.core.get_orders_ask()
927    }
928
929    #[must_use]
930    /// Returns all open orders from both bid and ask sides.
931    pub fn get_open_orders(&self) -> Vec<RestingOrder> {
932        self.core.get_orders()
933    }
934
935    #[must_use]
936    /// Returns true if an order with the given client order ID exists in the matching engine.
937    pub fn order_exists(&self, client_order_id: ClientOrderId) -> bool {
938        self.core.order_exists(client_order_id)
939    }
940
941    #[must_use]
942    /// Returns the number of partial-fill counters tracked by the engine.
943    pub fn cached_filled_qty_len(&self) -> usize {
944        self.cached_filled_qty.len()
945    }
946
947    #[must_use]
948    pub const fn get_core(&self) -> &OrderMatchingCore {
949        &self.core
950    }
951
952    pub fn set_fill_at_market(&mut self, value: bool) {
953        self.fill_at_market = value;
954    }
955
956    /// Updates the instrument definition used by this matching engine.
957    ///
958    /// # Errors
959    ///
960    /// Returns an error if `instrument.id()` does not match this engines instrument ID.
961    pub fn update_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
962        if instrument.id() != self.instrument.id() {
963            anyhow::bail!(
964                "Cannot update instrument {} with {}",
965                self.instrument.id(),
966                instrument.id()
967            );
968        }
969
970        let changed = instrument.price_increment() != self.instrument.price_increment()
971            || instrument.price_precision() != self.instrument.price_precision()
972            || instrument.size_precision() != self.instrument.size_precision();
973
974        if changed {
975            self.core
976                .update_price_increment(instrument.price_increment());
977            self.book.reset();
978            self.bid_consumption.clear();
979            self.ask_consumption.clear();
980            self.trade_consumption = 0;
981            self.queue_ahead.clear();
982            self.queue_excess.clear();
983            self.queue_pending.clear();
984            self.prev_bid_price_raw = 0;
985            self.prev_bid_size_raw = 0;
986            self.prev_ask_price_raw = 0;
987            self.prev_ask_size_raw = 0;
988            self.last_quote_bid = None;
989            self.last_quote_ask = None;
990            self.precision_mismatch_streak = 0;
991            self.tob_initialized = false;
992            self.target_bid = None;
993            self.target_ask = None;
994            self.target_last = None;
995            self.last_bar_bid = None;
996            self.last_bar_ask = None;
997            self.core.bid = None;
998            self.core.ask = None;
999            self.core.last = None;
1000            log::info!(
1001                "Updated instrument {} (price_precision={} size_precision={})",
1002                instrument.id(),
1003                instrument.price_precision(),
1004                instrument.size_precision()
1005            );
1006        }
1007
1008        self.instrument = instrument;
1009
1010        if changed {
1011            self.drop_incompatible_core_orders();
1012        }
1013        Ok(())
1014    }
1015
1016    fn check_price_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
1017        let expected = self.instrument.price_precision();
1018        if actual != expected {
1019            anyhow::bail!(
1020                "Invalid {field} precision {actual}, expected {expected} for {}",
1021                self.instrument.id()
1022            );
1023        }
1024        Ok(())
1025    }
1026
1027    fn check_size_precision(&self, actual: u8, field: &str) -> anyhow::Result<()> {
1028        let expected = self.instrument.size_precision();
1029        if actual != expected {
1030            anyhow::bail!(
1031                "Invalid {field} precision {actual}, expected {expected} for {}",
1032                self.instrument.id()
1033            );
1034        }
1035        Ok(())
1036    }
1037
1038    fn log_precision_mismatch(
1039        &mut self,
1040        data_type: &str,
1041        instrument_id: InstrumentId,
1042        err: &anyhow::Error,
1043    ) {
1044        self.precision_mismatch_streak = self.precision_mismatch_streak.saturating_add(1);
1045        let streak = self.precision_mismatch_streak;
1046
1047        if streak <= 3 || streak.is_multiple_of(100) {
1048            log::warn!(
1049                "Skipping {data_type} for {instrument_id}: {err} \
1050                 (consecutive_precision_mismatches={streak})"
1051            );
1052        }
1053
1054        if streak == 20 {
1055            log::error!(
1056                "Precision mismatches reached {streak} consecutive events for \
1057                 {instrument_id}; check instrument update flow and upstream market data"
1058            );
1059        }
1060    }
1061
1062    fn drop_incompatible_core_orders(&mut self) {
1063        let client_order_ids: Vec<ClientOrderId> = self
1064            .core
1065            .iter_orders()
1066            .filter(|order| {
1067                !self.resting_order_matches_current_instrument(order)
1068                    || !self.cached_order_matches_current_instrument(order.client_order_id)
1069            })
1070            .map(|order| order.client_order_id)
1071            .collect();
1072
1073        for client_order_id in client_order_ids {
1074            let order = self
1075                .cache
1076                .borrow()
1077                .order(&client_order_id)
1078                .map(|o| o.clone());
1079            if let Some(order) = order
1080                && (order.is_inflight() || order.is_open())
1081            {
1082                log::warn!(
1083                    "Canceling order {client_order_id} after instrument update: \
1084                     price, trigger price, or quantity is not compatible with {}",
1085                    self.instrument.id()
1086                );
1087                self.cancel_order(&order, None);
1088            } else {
1089                let _ = self.core.delete_order(client_order_id);
1090                self.cached_filled_qty.swap_remove(&client_order_id);
1091            }
1092        }
1093    }
1094
1095    fn cached_order_matches_current_instrument(&self, client_order_id: ClientOrderId) -> bool {
1096        self.cache
1097            .borrow()
1098            .order(&client_order_id)
1099            .is_none_or(|order| {
1100                Self::quantity_matches_precision(order.quantity(), self.instrument.size_precision())
1101            })
1102    }
1103
1104    fn resting_order_matches_current_instrument(&self, order: &RestingOrder) -> bool {
1105        order
1106            .limit_price
1107            .is_none_or(|price| self.price_matches_current_instrument(price))
1108            && order
1109                .trigger_price
1110                .is_none_or(|price| self.price_matches_current_instrument(price))
1111    }
1112
1113    fn price_matches_current_instrument(&self, price: Price) -> bool {
1114        Self::price_matches_precision(price, self.instrument.price_precision())
1115            && Self::price_matches_tick(price, self.instrument.price_increment())
1116    }
1117
1118    fn price_matches_precision(price: Price, precision: u8) -> bool {
1119        let precision_diff = FIXED_PRECISION.saturating_sub(precision);
1120        let scale = PriceRaw::pow(10, u32::from(precision_diff));
1121        price.raw % scale == 0
1122    }
1123
1124    fn price_matches_tick(price: Price, increment: Price) -> bool {
1125        let increment_raw = increment.raw.abs();
1126        increment_raw == 0 || price.raw % increment_raw == 0
1127    }
1128
1129    fn quantity_matches_precision(quantity: Quantity, precision: u8) -> bool {
1130        let precision_diff = FIXED_PRECISION.saturating_sub(precision);
1131        let scale = QuantityRaw::pow(10, u32::from(precision_diff));
1132        quantity.raw.is_multiple_of(scale)
1133    }
1134
1135    fn normalize_price_for_current_instrument(&self, price: Price) -> Option<Price> {
1136        if !self.price_matches_current_instrument(price) {
1137            return None;
1138        }
1139
1140        Some(Price::from_raw(
1141            price.raw,
1142            self.instrument.price_precision(),
1143        ))
1144    }
1145
1146    fn normalize_quantity_for_current_instrument(&self, quantity: Quantity) -> Option<Quantity> {
1147        let precision = self.instrument.size_precision();
1148        if !Self::quantity_matches_precision(quantity, precision) {
1149            return None;
1150        }
1151
1152        Some(Quantity::from_raw(quantity.raw, precision))
1153    }
1154
1155    /// Process the venues market for the given order book delta.
1156    ///
1157    /// # Errors
1158    ///
1159    /// - If delta order price precision does not match the instrument (for Add/Update actions).
1160    /// - If delta order size precision does not match the instrument (for Add/Update actions).
1161    /// - If applying the delta to the book fails.
1162    pub fn process_order_book_delta(&mut self, delta: &OrderBookDelta) -> anyhow::Result<()> {
1163        log::debug!("Processing {delta}");
1164
1165        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
1166        if matches!(delta.action, BookAction::Add | BookAction::Update) {
1167            self.check_price_precision(delta.order.price.precision, "delta order price")?;
1168            self.check_size_precision(delta.order.size.precision, "delta order size")?;
1169        }
1170
1171        // L1 books are driven by top-of-book data only, ignore deltas
1172        if self.book_type == BookType::L1_MBP {
1173            self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1174            return Ok(());
1175        }
1176
1177        self.book.apply_delta(delta)?;
1178
1179        let delta_snapshot_or_clear = (delta.flags & 32) != 0 || delta.action == BookAction::Clear;
1180
1181        if self.config.queue_position {
1182            if delta_snapshot_or_clear {
1183                self.clear_all_queue_positions();
1184            } else if delta.action == BookAction::Delete {
1185                self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1186            } else if delta.action == BookAction::Update {
1187                self.cap_queue_ahead(
1188                    delta.order.price.raw,
1189                    delta.order.size.raw,
1190                    delta.order.side,
1191                );
1192            }
1193        }
1194
1195        if self.config.queue_position && delta_snapshot_or_clear {
1196            self.seed_tob_baseline();
1197        }
1198
1199        self.iterate(delta.ts_init, AggressorSide::NoAggressor);
1200        Ok(())
1201    }
1202
1203    /// Process the venues market for the given order book deltas.
1204    ///
1205    /// # Errors
1206    ///
1207    /// - If any delta order price precision does not match the instrument (for Add/Update actions).
1208    /// - If any delta order size precision does not match the instrument (for Add/Update actions).
1209    /// - If applying the deltas to the book fails.
1210    pub fn process_order_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
1211        log::debug!("Processing {deltas}");
1212
1213        // Validate precision for Add and Update actions (Delete/Clear may have NULL_ORDER)
1214        for delta in &deltas.deltas {
1215            if matches!(delta.action, BookAction::Add | BookAction::Update) {
1216                self.check_price_precision(delta.order.price.precision, "delta order price")?;
1217                self.check_size_precision(delta.order.size.precision, "delta order size")?;
1218            }
1219        }
1220
1221        // L1 books are driven by top-of-book data only, ignore deltas
1222        if self.book_type == BookType::L1_MBP {
1223            self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1224            return Ok(());
1225        }
1226
1227        self.book.apply_deltas(deltas)?;
1228
1229        let mut has_snapshot_or_clear = false;
1230
1231        if self.config.queue_position {
1232            for delta in &deltas.deltas {
1233                if (delta.flags & 32) != 0 || delta.action == BookAction::Clear {
1234                    self.clear_all_queue_positions();
1235                    has_snapshot_or_clear = true;
1236                    break;
1237                } else if delta.action == BookAction::Delete {
1238                    self.clear_queue_on_delete(delta.order.price.raw, delta.order.side);
1239                } else if delta.action == BookAction::Update {
1240                    self.cap_queue_ahead(
1241                        delta.order.price.raw,
1242                        delta.order.size.raw,
1243                        delta.order.side,
1244                    );
1245                }
1246            }
1247        }
1248
1249        if self.config.queue_position && has_snapshot_or_clear {
1250            self.seed_tob_baseline();
1251        }
1252
1253        self.iterate(deltas.ts_init, AggressorSide::NoAggressor);
1254        Ok(())
1255    }
1256
1257    /// Process the venues market for the given order book depth10.
1258    ///
1259    /// # Errors
1260    ///
1261    /// - If any bid/ask price precision does not match the instrument.
1262    /// - If any bid/ask size precision does not match the instrument.
1263    /// - If applying the depth to the book fails.
1264    /// - If updating the L1 order book with the top-of-book quote fails.
1265    pub fn process_order_book_depth10(&mut self, depth: &OrderBookDepth10) -> anyhow::Result<()> {
1266        log::debug!("Processing OrderBookDepth10 for {}", depth.instrument_id);
1267
1268        // Validate precision for non-padding entries
1269        for order in &depth.bids {
1270            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1271                continue;
1272            }
1273            self.check_price_precision(order.price.precision, "bid price")?;
1274            self.check_size_precision(order.size.precision, "bid size")?;
1275        }
1276
1277        for order in &depth.asks {
1278            if order.side == OrderSide::NoOrderSide || !order.size.is_positive() {
1279                continue;
1280            }
1281            self.check_price_precision(order.price.precision, "ask price")?;
1282            self.check_size_precision(order.size.precision, "ask size")?;
1283        }
1284
1285        // For L1 books, only apply top-of-book to avoid mispricing
1286        // against worst-level entries when full depth is applied
1287        if self.book_type == BookType::L1_MBP {
1288            let quote = QuoteTick::new(
1289                depth.instrument_id,
1290                depth.bids[0].price,
1291                depth.asks[0].price,
1292                depth.bids[0].size,
1293                depth.asks[0].size,
1294                depth.ts_event,
1295                depth.ts_init,
1296            );
1297            self.book.update_quote_tick(&quote)?;
1298            self.last_quote_bid = Some(depth.bids[0].price);
1299            self.last_quote_ask = Some(depth.asks[0].price);
1300        } else {
1301            self.book.apply_depth(depth)?;
1302        }
1303
1304        // Depth10 always replaces the full book via apply_depth regardless of flags
1305        if self.config.queue_position {
1306            self.clear_all_queue_positions();
1307            let bid_price_raw = depth.bids[0].price.raw;
1308            let bid_size_raw = depth.bids[0].size.raw;
1309            let ask_price_raw = depth.asks[0].price.raw;
1310            let ask_size_raw = depth.asks[0].size.raw;
1311
1312            // Handle crossed/matched pending orders (same as quote path)
1313            if self.tob_initialized {
1314                if bid_price_raw < self.prev_bid_price_raw {
1315                    self.adjust_l1_queue_on_price_move(bid_price_raw, bid_size_raw, OrderSide::Buy);
1316                }
1317
1318                if ask_price_raw > self.prev_ask_price_raw {
1319                    self.adjust_l1_queue_on_price_move(
1320                        ask_price_raw,
1321                        ask_size_raw,
1322                        OrderSide::Sell,
1323                    );
1324                }
1325            }
1326
1327            self.resolve_pending_l1_snapshots(
1328                bid_price_raw,
1329                bid_size_raw,
1330                ask_price_raw,
1331                ask_size_raw,
1332            );
1333
1334            self.prev_bid_price_raw = bid_price_raw;
1335            self.prev_bid_size_raw = bid_size_raw;
1336            self.prev_ask_price_raw = ask_price_raw;
1337            self.prev_ask_size_raw = ask_size_raw;
1338            self.tob_initialized = true;
1339        }
1340
1341        self.iterate(depth.ts_init, AggressorSide::NoAggressor);
1342        Ok(())
1343    }
1344
1345    /// Processes a quote tick to update the market state.
1346    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
1347        log::debug!("Processing {quote}");
1348
1349        if let Err(e) = self.check_price_precision(quote.bid_price.precision, "bid_price") {
1350            self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1351            return;
1352        }
1353
1354        if let Err(e) = self.check_price_precision(quote.ask_price.precision, "ask_price") {
1355            self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1356            return;
1357        }
1358
1359        if let Err(e) = self.check_size_precision(quote.bid_size.precision, "bid_size") {
1360            self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1361            return;
1362        }
1363
1364        if let Err(e) = self.check_size_precision(quote.ask_size.precision, "ask_size") {
1365            self.log_precision_mismatch("quote tick", quote.instrument_id, &e);
1366            return;
1367        }
1368
1369        self.precision_mismatch_streak = 0;
1370
1371        if self.book_type == BookType::L1_MBP {
1372            // Stale update: skip book mutation and cache updates
1373            if quote.ts_event < self.book.ts_last {
1374                log::warn!(
1375                    "Skipping stale quote: ts_event {} < book.ts_last {} for {}",
1376                    quote.ts_event,
1377                    self.book.ts_last,
1378                    self.book.instrument_id,
1379                );
1380                self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1381                return;
1382            }
1383
1384            if !self.update_quote_tick_or_skip(quote, "quote tick") {
1385                return;
1386            }
1387
1388            if self.config.queue_position {
1389                self.decrement_l1_queue_on_quote(
1390                    quote.bid_price.raw,
1391                    quote.bid_size.raw,
1392                    quote.ask_price.raw,
1393                    quote.ask_size.raw,
1394                );
1395                self.prev_bid_price_raw = quote.bid_price.raw;
1396                self.prev_bid_size_raw = quote.bid_size.raw;
1397                self.prev_ask_price_raw = quote.ask_price.raw;
1398                self.prev_ask_size_raw = quote.ask_size.raw;
1399                self.tob_initialized = true;
1400            }
1401            self.last_quote_bid = Some(quote.bid_price);
1402            self.last_quote_ask = Some(quote.ask_price);
1403        }
1404
1405        self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1406    }
1407
1408    /// Processes a bar and simulates market dynamics by creating synthetic ticks.
1409    ///
1410    /// For L1 books with bar execution enabled, generates synthetic trade or quote
1411    /// ticks from bar OHLC data to drive order matching.
1412    ///
1413    /// # Panics
1414    ///
1415    /// - If the bar type configuration is missing a time delta.
1416    pub fn process_bar(&mut self, bar: &Bar) {
1417        log::debug!("Processing {bar}");
1418
1419        // Check if configured for bar execution can only process an L1 book with bars
1420        if !self.config.bar_execution || self.book_type != BookType::L1_MBP {
1421            return;
1422        }
1423
1424        let bar_type = bar.bar_type;
1425        // Do not process internally aggregated bars
1426        if bar_type.aggregation_source() == AggregationSource::Internal {
1427            return;
1428        }
1429
1430        if let Err(e) = self.check_price_precision(bar.open.precision, "bar open") {
1431            self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1432            return;
1433        }
1434
1435        if let Err(e) = self.check_price_precision(bar.high.precision, "bar high") {
1436            self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1437            return;
1438        }
1439
1440        if let Err(e) = self.check_price_precision(bar.low.precision, "bar low") {
1441            self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1442            return;
1443        }
1444
1445        if let Err(e) = self.check_price_precision(bar.close.precision, "bar close") {
1446            self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1447            return;
1448        }
1449
1450        if let Err(e) = self.check_size_precision(bar.volume.precision, "bar volume") {
1451            self.log_precision_mismatch("bar", bar.instrument_id(), &e);
1452            return;
1453        }
1454
1455        self.precision_mismatch_streak = 0;
1456
1457        let price_type = bar_type.spec().price_type;
1458        if price_type == PriceType::Mark {
1459            log::warn!(
1460                "Cannot process bar for {} with `PriceType::Mark`, mark price bars are not supported for bar execution",
1461                bar.instrument_id(),
1462            );
1463            return;
1464        }
1465
1466        let execution_bar_type =
1467            if let Some(execution_bar_type) = self.execution_bar_types.get(&bar.instrument_id()) {
1468                execution_bar_type.to_owned()
1469            } else {
1470                self.execution_bar_types
1471                    .insert(bar.instrument_id(), bar_type);
1472                self.execution_bar_deltas
1473                    .insert(bar_type, bar_type.spec().timedelta());
1474                bar_type
1475            };
1476
1477        if execution_bar_type != bar_type {
1478            let mut bar_type_timedelta = self.execution_bar_deltas.get(&bar_type).copied();
1479            if bar_type_timedelta.is_none() {
1480                bar_type_timedelta = Some(bar_type.spec().timedelta());
1481                self.execution_bar_deltas
1482                    .insert(bar_type, bar_type_timedelta.unwrap());
1483            }
1484
1485            if self.execution_bar_deltas.get(&execution_bar_type).unwrap()
1486                >= &bar_type_timedelta.unwrap()
1487            {
1488                self.execution_bar_types
1489                    .insert(bar_type.instrument_id(), bar_type);
1490            } else {
1491                return;
1492            }
1493        }
1494
1495        match price_type {
1496            PriceType::Last | PriceType::Mid => self.process_trade_ticks_from_bar(bar),
1497            PriceType::Bid => {
1498                self.last_bar_bid = Some(bar.to_owned());
1499                self.process_quote_ticks_from_bar();
1500            }
1501            PriceType::Ask => {
1502                self.last_bar_ask = Some(bar.to_owned());
1503                self.process_quote_ticks_from_bar();
1504            }
1505            PriceType::Mark => {
1506                unreachable!("PriceType::Mark bars return before execution bar state updates")
1507            }
1508        }
1509    }
1510
1511    fn process_trade_ticks_from_bar(&mut self, bar: &Bar) {
1512        let sizes = BarTickSizes::from_volume(bar.volume, self.instrument.size_increment());
1513
1514        let aggressor_side = if self.core.last.is_none_or(|last| bar.open > last) {
1515            AggressorSide::Buyer
1516        } else {
1517            AggressorSide::Seller
1518        };
1519
1520        // Open: fill at market price (gap from previous bar)
1521        if self.core.last.is_none() {
1522            self.fill_at_market = true;
1523
1524            if !self.process_bar_trade_tick(
1525                bar,
1526                bar.open,
1527                sizes.open,
1528                aggressor_side,
1529                "bar open trade tick",
1530            ) {
1531                return;
1532            }
1533            self.core.set_last_raw(bar.open);
1534        } else if self.core.last.is_some_and(|last| bar.open != last) {
1535            // Gap between previous close and this bar's open
1536            self.fill_at_market = true;
1537
1538            if !self.process_bar_trade_tick(
1539                bar,
1540                bar.open,
1541                sizes.open,
1542                aggressor_side,
1543                "bar gap-open trade tick",
1544            ) {
1545                return;
1546            }
1547            self.core.set_last_raw(bar.open);
1548        }
1549
1550        // Determine high/low processing order.
1551        // Default: O > H > L > C. With adaptive ordering, swap if low is closer to open.
1552        let high_first = !self.config.bar_adaptive_high_low_ordering
1553            || (bar.high.raw - bar.open.raw).abs() < (bar.low.raw - bar.open.raw).abs();
1554
1555        if high_first {
1556            self.process_bar_high(bar, sizes.high);
1557            self.process_bar_low(bar, sizes.low);
1558        } else {
1559            self.process_bar_low(bar, sizes.low);
1560            self.process_bar_high(bar, sizes.high);
1561        }
1562
1563        // Close: fill at trigger price (market moving through prices)
1564        if self.core.last.is_some_and(|last| bar.close != last) {
1565            self.fill_at_market = false;
1566
1567            let aggressor_side = if bar.close > self.core.last.unwrap() {
1568                AggressorSide::Buyer
1569            } else {
1570                AggressorSide::Seller
1571            };
1572
1573            if !self.process_bar_trade_tick(
1574                bar,
1575                bar.close,
1576                sizes.close,
1577                aggressor_side,
1578                "bar close trade tick",
1579            ) {
1580                return;
1581            }
1582
1583            self.core.set_last_raw(bar.close);
1584        }
1585
1586        self.fill_at_market = true;
1587    }
1588
1589    fn process_bar_high(&mut self, bar: &Bar, size: Quantity) {
1590        if self.core.last.is_some_and(|last| bar.high > last) {
1591            self.fill_at_market = false;
1592
1593            if !self.process_bar_trade_tick(
1594                bar,
1595                bar.high,
1596                size,
1597                AggressorSide::Buyer,
1598                "bar high trade tick",
1599            ) {
1600                return;
1601            }
1602
1603            self.core.set_last_raw(bar.high);
1604        }
1605    }
1606
1607    fn process_bar_low(&mut self, bar: &Bar, size: Quantity) {
1608        if self.core.last.is_some_and(|last| bar.low < last) {
1609            self.fill_at_market = false;
1610
1611            if !self.process_bar_trade_tick(
1612                bar,
1613                bar.low,
1614                size,
1615                AggressorSide::Seller,
1616                "bar low trade tick",
1617            ) {
1618                return;
1619            }
1620
1621            self.core.set_last_raw(bar.low);
1622        }
1623    }
1624
1625    fn process_bar_trade_tick(
1626        &mut self,
1627        bar: &Bar,
1628        price: Price,
1629        size: Quantity,
1630        aggressor_side: AggressorSide,
1631        context: &str,
1632    ) -> bool {
1633        if size.is_zero() {
1634            return true;
1635        }
1636
1637        let trade_tick = TradeTick::new(
1638            bar.instrument_id(),
1639            price,
1640            size,
1641            aggressor_side,
1642            self.ids_generator.generate_trade_id(bar.ts_init),
1643            bar.ts_init,
1644            bar.ts_init,
1645        );
1646
1647        if !self.update_trade_tick_or_skip(&trade_tick, context) {
1648            return false;
1649        }
1650
1651        self.iterate(trade_tick.ts_init, AggressorSide::NoAggressor);
1652        true
1653    }
1654
1655    fn process_quote_ticks_from_bar(&mut self) {
1656        // Wait for next bar
1657        if self.last_bar_bid.is_none()
1658            || self.last_bar_ask.is_none()
1659            || self.last_bar_bid.unwrap().ts_init != self.last_bar_ask.unwrap().ts_init
1660        {
1661            return;
1662        }
1663        let bid_bar = self.last_bar_bid.unwrap();
1664        let ask_bar = self.last_bar_ask.unwrap();
1665
1666        let size_increment = self.instrument.size_increment();
1667        let bid_sizes = BarTickSizes::from_volume(bid_bar.volume, size_increment);
1668        let ask_sizes = BarTickSizes::from_volume(ask_bar.volume, size_increment);
1669        let mut has_current_bid = false;
1670        let mut has_current_ask = false;
1671
1672        let mut quote_tick = QuoteTick::new(
1673            self.book.instrument_id,
1674            bid_bar.open,
1675            ask_bar.open,
1676            bid_sizes.open,
1677            ask_sizes.open,
1678            bid_bar.ts_init,
1679            bid_bar.ts_init,
1680        );
1681
1682        // Open: fill at market price (gap from previous bar)
1683        self.fill_at_market = true;
1684
1685        if !self.process_bar_quote_tick(
1686            &quote_tick,
1687            "bar open quote tick",
1688            &mut has_current_bid,
1689            &mut has_current_ask,
1690        ) {
1691            return;
1692        }
1693
1694        // High: fill at trigger price (market moving through prices)
1695        self.fill_at_market = false;
1696        quote_tick.bid_price = bid_bar.high;
1697        quote_tick.ask_price = ask_bar.high;
1698        quote_tick.bid_size = bid_sizes.high;
1699        quote_tick.ask_size = ask_sizes.high;
1700
1701        if !self.process_bar_quote_tick(
1702            &quote_tick,
1703            "bar high quote tick",
1704            &mut has_current_bid,
1705            &mut has_current_ask,
1706        ) {
1707            return;
1708        }
1709
1710        // Low: fill at trigger price (market moving through prices)
1711        self.fill_at_market = false;
1712        quote_tick.bid_price = bid_bar.low;
1713        quote_tick.ask_price = ask_bar.low;
1714        quote_tick.bid_size = bid_sizes.low;
1715        quote_tick.ask_size = ask_sizes.low;
1716
1717        if !self.process_bar_quote_tick(
1718            &quote_tick,
1719            "bar low quote tick",
1720            &mut has_current_bid,
1721            &mut has_current_ask,
1722        ) {
1723            return;
1724        }
1725
1726        // Close: fill at trigger price (market moving through prices)
1727        self.fill_at_market = false;
1728        quote_tick.bid_price = bid_bar.close;
1729        quote_tick.ask_price = ask_bar.close;
1730        quote_tick.bid_size = bid_sizes.close;
1731        quote_tick.ask_size = ask_sizes.close;
1732
1733        if !self.process_bar_quote_tick(
1734            &quote_tick,
1735            "bar close quote tick",
1736            &mut has_current_bid,
1737            &mut has_current_ask,
1738        ) {
1739            return;
1740        }
1741
1742        self.last_bar_bid = None;
1743        self.last_bar_ask = None;
1744        self.fill_at_market = true;
1745    }
1746
1747    fn process_bar_quote_tick(
1748        &mut self,
1749        quote: &QuoteTick,
1750        context: &str,
1751        has_current_bid: &mut bool,
1752        has_current_ask: &mut bool,
1753    ) -> bool {
1754        let has_bid_size = !quote.bid_size.is_zero();
1755        let has_ask_size = !quote.ask_size.is_zero();
1756        let mut book_changed = false;
1757        let mut bid_cleared = false;
1758        let mut ask_cleared = false;
1759
1760        match (has_bid_size, has_ask_size) {
1761            (true, true) => {
1762                if !self.update_quote_tick_or_skip(quote, context) {
1763                    return false;
1764                }
1765                *has_current_bid = true;
1766                *has_current_ask = true;
1767                book_changed = true;
1768            }
1769            _ => {
1770                if has_bid_size {
1771                    self.update_bar_quote_bid(quote);
1772                    *has_current_bid = true;
1773                    book_changed = true;
1774                } else if !*has_current_bid {
1775                    self.clear_bar_quote_bid(quote);
1776                    *has_current_bid = true;
1777                    book_changed = true;
1778                    bid_cleared = true;
1779                }
1780
1781                if has_ask_size {
1782                    self.update_bar_quote_ask(quote);
1783                    *has_current_ask = true;
1784                    book_changed = true;
1785                } else if !*has_current_ask {
1786                    self.clear_bar_quote_ask(quote);
1787                    *has_current_ask = true;
1788                    book_changed = true;
1789                    ask_cleared = true;
1790                }
1791            }
1792        }
1793
1794        if book_changed
1795            && let (Some(best_bid), Some(best_ask)) =
1796                (self.book.best_bid_price(), self.book.best_ask_price())
1797            && best_bid > best_ask
1798        {
1799            if has_bid_size && !has_ask_size {
1800                self.clear_bar_quote_ask(quote);
1801                ask_cleared = true;
1802            } else if has_ask_size && !has_bid_size {
1803                self.clear_bar_quote_bid(quote);
1804                bid_cleared = true;
1805            }
1806        }
1807
1808        if has_bid_size {
1809            self.last_quote_bid = Some(quote.bid_price);
1810        } else if bid_cleared {
1811            self.last_quote_bid = None;
1812        }
1813
1814        if has_ask_size {
1815            self.last_quote_ask = Some(quote.ask_price);
1816        } else if ask_cleared {
1817            self.last_quote_ask = None;
1818        }
1819
1820        if !book_changed {
1821            return true;
1822        }
1823
1824        self.iterate(quote.ts_init, AggressorSide::NoAggressor);
1825        true
1826    }
1827
1828    fn update_bar_quote_bid(&mut self, quote: &QuoteTick) {
1829        let bid = BookOrder::new(
1830            OrderSide::Buy,
1831            quote.bid_price,
1832            quote.bid_size,
1833            OrderSide::Buy as u64,
1834        );
1835        self.book
1836            .add(bid, 0, self.book.sequence.saturating_add(1), quote.ts_event);
1837    }
1838
1839    fn clear_bar_quote_bid(&mut self, quote: &QuoteTick) {
1840        self.book
1841            .clear_bids(self.book.sequence.saturating_add(1), quote.ts_event);
1842    }
1843
1844    fn update_bar_quote_ask(&mut self, quote: &QuoteTick) {
1845        let ask = BookOrder::new(
1846            OrderSide::Sell,
1847            quote.ask_price,
1848            quote.ask_size,
1849            OrderSide::Sell as u64,
1850        );
1851        self.book
1852            .add(ask, 0, self.book.sequence.saturating_add(1), quote.ts_event);
1853    }
1854
1855    fn clear_bar_quote_ask(&mut self, quote: &QuoteTick) {
1856        self.book
1857            .clear_asks(self.book.sequence.saturating_add(1), quote.ts_event);
1858    }
1859
1860    /// Processes a trade tick to update the market state.
1861    ///
1862    /// For L1 books, always updates the order book with the trade tick to maintain
1863    /// market state. When `trade_execution` is disabled, order matching and maintenance
1864    /// operations (GTD order expiry, trailing stop activation, instrument expiration)
1865    /// are skipped. These maintenance operations will run on the next quote tick or bar.
1866    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
1867        log::debug!("Processing {trade}");
1868
1869        if let Err(e) = self.check_price_precision(trade.price.precision, "trade price") {
1870            self.log_precision_mismatch("trade tick", trade.instrument_id, &e);
1871            return;
1872        }
1873
1874        if let Err(e) = self.check_size_precision(trade.size.precision, "trade size") {
1875            self.log_precision_mismatch("trade tick", trade.instrument_id, &e);
1876            return;
1877        }
1878
1879        self.precision_mismatch_streak = 0;
1880
1881        let price_raw = trade.price.raw;
1882
1883        if self.book_type == BookType::L1_MBP {
1884            // Stale update: skip book mutation and trade execution
1885            if trade.ts_event < self.book.ts_last {
1886                log::warn!(
1887                    "Skipping stale trade: ts_event {} < book.ts_last {} for {}",
1888                    trade.ts_event,
1889                    self.book.ts_last,
1890                    self.book.instrument_id,
1891                );
1892                self.iterate(trade.ts_init, AggressorSide::NoAggressor);
1893                return;
1894            }
1895
1896            if !self.update_trade_tick_or_skip(trade, "trade tick") {
1897                return;
1898            }
1899        }
1900
1901        self.core.set_last_raw(trade.price);
1902
1903        if !self.config.trade_execution {
1904            // Sync core to L1 book, skip order matching
1905            if self.book_type == BookType::L1_MBP {
1906                if let Some(bid) = self.book.best_bid_price() {
1907                    self.core.set_bid_raw(bid);
1908                }
1909
1910                if let Some(ask) = self.book.best_ask_price() {
1911                    self.core.set_ask_raw(ask);
1912                }
1913            }
1914            return;
1915        }
1916
1917        let aggressor_side = trade.aggressor_side;
1918
1919        match aggressor_side {
1920            AggressorSide::Buyer => {
1921                // Buyer lifted the ask: ask was at trade.price, post-trade
1922                // ask is at least this level (only widen)
1923                if self.core.ask.is_none() || price_raw > self.core.ask.map_or(0, |p| p.raw) {
1924                    self.core.set_ask_raw(trade.price);
1925                }
1926
1927                // Initialize bid from first trade if needed
1928                if self.core.bid.is_none() {
1929                    self.core.set_bid_raw(trade.price);
1930                }
1931            }
1932            AggressorSide::Seller => {
1933                // Seller hit the bid: bid was at trade.price, post-trade
1934                // bid is at most this level (only narrow)
1935                if self.core.bid.is_none()
1936                    || price_raw < self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1937                {
1938                    self.core.set_bid_raw(trade.price);
1939                }
1940
1941                // Initialize ask from first trade if needed
1942                if self.core.ask.is_none() {
1943                    self.core.set_ask_raw(trade.price);
1944                }
1945            }
1946            AggressorSide::NoAggressor => {
1947                if self.core.bid.is_none()
1948                    || price_raw <= self.core.bid.map_or(PriceRaw::MAX, |p| p.raw)
1949                {
1950                    self.core.set_bid_raw(trade.price);
1951                }
1952
1953                if self.core.ask.is_none() || price_raw >= self.core.ask.map_or(0, |p| p.raw) {
1954                    self.core.set_ask_raw(trade.price);
1955                }
1956            }
1957        }
1958
1959        let original_bid = self.core.bid;
1960        let original_ask = self.core.ask;
1961
1962        match aggressor_side {
1963            AggressorSide::Seller => {
1964                if original_ask.is_some_and(|ask| price_raw < ask.raw) {
1965                    self.core.set_ask_raw(trade.price);
1966                }
1967            }
1968            AggressorSide::Buyer => {
1969                if original_bid.is_some_and(|bid| price_raw > bid.raw) {
1970                    self.core.set_bid_raw(trade.price);
1971                }
1972            }
1973            AggressorSide::NoAggressor => {
1974                // Force both sides to trade price (parity with Cython)
1975                self.core.set_bid_raw(trade.price);
1976                self.core.set_ask_raw(trade.price);
1977            }
1978        }
1979
1980        self.last_trade_size = Some(trade.size);
1981        self.trade_consumption = 0;
1982
1983        if self.config.liquidity_consumption && self.book_type != BookType::L1_MBP {
1984            self.seed_trade_consumption(price_raw, trade.size.raw, trade.ts_event, aggressor_side);
1985        }
1986
1987        self.resolve_pending_on_trade(price_raw);
1988        self.decrement_queue_on_trade(price_raw, trade.size.raw, aggressor_side);
1989
1990        self.iterate(trade.ts_init, aggressor_side);
1991
1992        self.last_trade_size = None;
1993        self.trade_consumption = 0;
1994
1995        // Restore the non-aggressor side after temporary trade price override.
1996        // For L2/L3 books the book has independent depth so restore from originals.
1997        // For L1_MBP restore from the last quote values (not originals, which are
1998        // polluted by iterate's L1 book sync). Without quotes, skip the restore
1999        // so the core tracks the latest trade price.
2000        if self.book_type == BookType::L1_MBP {
2001            match aggressor_side {
2002                AggressorSide::Seller => {
2003                    if let Some(ask) = self.last_quote_ask {
2004                        self.core.ask = Some(ask);
2005                    }
2006                }
2007                AggressorSide::Buyer => {
2008                    if let Some(bid) = self.last_quote_bid {
2009                        self.core.bid = Some(bid);
2010                    }
2011                }
2012                AggressorSide::NoAggressor => {}
2013            }
2014        } else {
2015            match aggressor_side {
2016                AggressorSide::Seller => {
2017                    if let Some(ask) = original_ask
2018                        && price_raw < ask.raw
2019                    {
2020                        self.core.ask = Some(ask);
2021                    }
2022                }
2023                AggressorSide::Buyer => {
2024                    if let Some(bid) = original_bid
2025                        && price_raw > bid.raw
2026                    {
2027                        self.core.bid = Some(bid);
2028                    }
2029                }
2030                AggressorSide::NoAggressor => {}
2031            }
2032        }
2033    }
2034
2035    fn update_quote_tick_or_skip(&mut self, quote: &QuoteTick, context: &str) -> bool {
2036        if let Err(e) = self.book.update_quote_tick(quote) {
2037            log::warn!(
2038                "Skipping {context} for {}: update_quote_tick failed: {e}",
2039                quote.instrument_id,
2040            );
2041            return false;
2042        }
2043        true
2044    }
2045
2046    fn update_trade_tick_or_skip(&mut self, trade: &TradeTick, context: &str) -> bool {
2047        if let Err(e) = self.book.update_trade_tick(trade) {
2048            log::warn!(
2049                "Skipping {context} for {}: update_trade_tick failed: {e}",
2050                trade.instrument_id,
2051            );
2052            return false;
2053        }
2054        true
2055    }
2056
2057    /// Processes a market status action to update the market state.
2058    pub fn process_status(&mut self, action: MarketStatusAction) {
2059        log::debug!("Processing {action}");
2060
2061        // Check if market is closed and market opens with trading or pre-open status
2062        if self.market_status == MarketStatus::Closed
2063            && (action == MarketStatusAction::Trading || action == MarketStatusAction::PreOpen)
2064        {
2065            self.market_status = MarketStatus::Open;
2066        }
2067        // Check if market is open and market pauses
2068        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Pause {
2069            self.market_status = MarketStatus::Paused;
2070        }
2071        // Check if market is open and market suspends
2072        if self.market_status == MarketStatus::Open && action == MarketStatusAction::Suspend {
2073            self.market_status = MarketStatus::Suspended;
2074        }
2075        // Check if market is open and we halt or close
2076        if self.market_status == MarketStatus::Open
2077            && (action == MarketStatusAction::Halt || action == MarketStatusAction::Close)
2078        {
2079            self.market_status = MarketStatus::Closed;
2080        }
2081    }
2082
2083    /// Processes an instrument close event.
2084    ///
2085    /// For `ContractExpired` close types, stores the close and triggers expiration
2086    /// processing which cancels all open orders and closes all open positions.
2087    pub fn process_instrument_close(&mut self, close: InstrumentClose) {
2088        if close.instrument_id != self.instrument.id() {
2089            log::warn!(
2090                "Received instrument close for unknown instrument_id: {}",
2091                close.instrument_id
2092            );
2093            return;
2094        }
2095
2096        if close.close_type == InstrumentCloseType::ContractExpired {
2097            self.instrument_close = Some(close);
2098            self.iterate(close.ts_init, AggressorSide::NoAggressor);
2099        }
2100    }
2101
2102    /// Processes instrument expiration at the given timestamp.
2103    pub fn process_instrument_expiration(&mut self, timestamp_ns: UnixNanos) {
2104        self.check_instrument_expiration(timestamp_ns);
2105    }
2106
2107    /// Returns whether instrument expiration has already been processed.
2108    #[must_use]
2109    pub const fn is_expiration_processed(&self) -> bool {
2110        self.expiration_processed
2111    }
2112
2113    fn requires_pending_resolution(&self) -> bool {
2114        matches!(self.instrument, InstrumentAny::BinaryOption(_))
2115    }
2116
2117    fn cancel_open_orders_for_expiration(&mut self) {
2118        // Build a single de-duplicated cancellation set across the matching
2119        // core and cache. Resting orders may still only be represented in the
2120        // core while inflight orders can remain cache-only during the
2121        // submitted/pending transition window.
2122        let instrument_id = self.instrument.id();
2123        let expiration_order_ids: IndexSet<ClientOrderId> = {
2124            let cache = self.cache.borrow();
2125            let mut order_ids = IndexSet::new();
2126
2127            for order_info in self.get_open_orders() {
2128                order_ids.insert(order_info.client_order_id);
2129            }
2130
2131            for order in cache.orders(None, Some(&instrument_id), None, None, None) {
2132                if order.is_open() || order.is_inflight() {
2133                    order_ids.insert(order.client_order_id());
2134                }
2135            }
2136
2137            order_ids
2138        };
2139
2140        for client_order_id in expiration_order_ids {
2141            let order = {
2142                let cache = self.cache.borrow();
2143                cache.order(&client_order_id).map(|order| order.clone())
2144            };
2145
2146            if let Some(order) = order {
2147                self.cancel_order(&order, None);
2148            }
2149        }
2150    }
2151
2152    fn enter_pending_resolution(&mut self) {
2153        if self.pending_resolution {
2154            return;
2155        }
2156
2157        self.pending_resolution = true;
2158        self.market_status = MarketStatus::Closed;
2159        self.cancel_open_orders_for_expiration();
2160        log::info!(
2161            "{} expired and is now pending resolution; open orders canceled and new orders blocked",
2162            self.instrument.id()
2163        );
2164    }
2165
2166    fn check_instrument_expiration(&mut self, timestamp_ns: UnixNanos) {
2167        if self.expiration_processed {
2168            return;
2169        }
2170
2171        let timestamp_triggered = self
2172            .instrument
2173            .expiration_ns()
2174            .is_some_and(|ns| timestamp_ns >= ns);
2175
2176        if !timestamp_triggered && self.instrument_close.is_none() {
2177            return;
2178        }
2179
2180        if self.instrument_close.is_none()
2181            && timestamp_triggered
2182            && self.requires_pending_resolution()
2183        {
2184            self.enter_pending_resolution();
2185            return;
2186        }
2187
2188        self.expiration_processed = true;
2189        self.pending_resolution = false;
2190        let close = self.instrument_close.take();
2191        log::info!("{} reached expiration", self.instrument.id());
2192        self.cancel_open_orders_for_expiration();
2193
2194        if matches!(
2195            self.instrument,
2196            InstrumentAny::OptionContract(_) | InstrumentAny::CryptoOption(_)
2197        ) {
2198            self.process_option_expiry(timestamp_ns);
2199            return;
2200        }
2201
2202        let instrument_id = self.instrument.id();
2203        let positions: Vec<(TraderId, StrategyId, PositionId, OrderSide, Quantity)> = {
2204            let cache = self.cache.borrow();
2205            cache
2206                .positions_open(None, Some(&instrument_id), None, None, None)
2207                .into_iter()
2208                .map(|pos| {
2209                    let closing_side = match pos.side {
2210                        PositionSide::Long => OrderSide::Sell,
2211                        PositionSide::Short => OrderSide::Buy,
2212                        _ => OrderSide::NoOrderSide,
2213                    };
2214                    (
2215                        pos.trader_id,
2216                        pos.strategy_id,
2217                        pos.id,
2218                        closing_side,
2219                        pos.quantity,
2220                    )
2221                })
2222                .collect()
2223        };
2224
2225        let ts_now = self.clock.borrow().timestamp_ns();
2226        let close_price_fallback = close.as_ref().map(|c| c.close_price);
2227
2228        for (trader_id, strategy_id, position_id, closing_side, quantity) in positions {
2229            let client_order_id =
2230                ClientOrderId::from(format!("EXPIRATION-{}-{}", self.venue, UUID4::new()).as_str());
2231            let mut order = OrderAny::Market(MarketOrder::new(
2232                trader_id,
2233                strategy_id,
2234                instrument_id,
2235                client_order_id,
2236                closing_side,
2237                quantity,
2238                TimeInForce::Gtc,
2239                UUID4::new(),
2240                ts_now,
2241                true, // reduce_only
2242                false,
2243                None,
2244                None,
2245                None,
2246                None,
2247                None,
2248                None,
2249                None,
2250                Some(vec![Ustr::from(&format!(
2251                    "EXPIRATION_{}_CLOSE",
2252                    self.venue
2253                ))]),
2254            ));
2255            order.set_liquidity_side(LiquiditySide::Taker);
2256
2257            let add_result =
2258                self.cache
2259                    .borrow_mut()
2260                    .add_order(order.clone(), Some(position_id), None, false);
2261            if add_result.is_err() {
2262                log::debug!("Expiration order already in cache: {client_order_id}");
2263            } else {
2264                self.publish_order_initialized(&order);
2265            }
2266
2267            let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
2268            self.generate_order_accepted(&order, venue_order_id);
2269
2270            let fill_price = self.settlement_price.or(close_price_fallback);
2271            if let Some(fill_price) = fill_price {
2272                self.apply_fills(
2273                    &order,
2274                    &[(fill_price, quantity)],
2275                    LiquiditySide::Taker,
2276                    Some(position_id),
2277                    None,
2278                    None,
2279                );
2280            } else {
2281                self.fill_market_order(client_order_id);
2282            }
2283        }
2284    }
2285
2286    fn process_option_expiry(&mut self, ts_now: UnixNanos) {
2287        let instrument_id = self.instrument.id();
2288
2289        let positions: Vec<Position> = {
2290            let cache = self.cache.borrow();
2291            cache
2292                .positions_open(None, Some(&instrument_id), None, None, None)
2293                .into_iter()
2294                .map(|p| p.cloned())
2295                .collect()
2296        };
2297
2298        if positions.is_empty() {
2299            return;
2300        }
2301
2302        let underlying = match self.instrument.underlying() {
2303            Some(u) => u,
2304            None => {
2305                log::error!("No underlying for option {instrument_id}");
2306                return;
2307            }
2308        };
2309        let underlying_id = InstrumentId::from(format!("{underlying}.{}", self.venue).as_str());
2310
2311        let (underlying_instrument, underlying_price) = {
2312            let cache = self.cache.borrow();
2313            (
2314                cache.instrument(&underlying_id).cloned(),
2315                cache.price(&underlying_id, PriceType::Last),
2316            )
2317        };
2318
2319        let underlying_instrument = match underlying_instrument {
2320            Some(u) => u,
2321            None => {
2322                log::error!("No underlying instrument for option {instrument_id}");
2323                return;
2324            }
2325        };
2326
2327        let underlying_price = match underlying_price {
2328            Some(p) => p,
2329            None => {
2330                log::error!("No underlying price for option {instrument_id}");
2331                return;
2332            }
2333        };
2334
2335        let custom_option_price = self.settlement_price;
2336        let should_exercise = self.option_should_exercise(underlying_price);
2337
2338        for position in positions {
2339            self.account_ids
2340                .insert(position.trader_id, position.account_id);
2341
2342            if should_exercise {
2343                self.option_exercise_position(
2344                    &position,
2345                    &underlying_instrument,
2346                    underlying_price,
2347                    ts_now,
2348                    custom_option_price,
2349                );
2350            } else {
2351                self.option_otm_expiry(&position, ts_now, custom_option_price);
2352            }
2353        }
2354    }
2355
2356    fn option_should_exercise(&self, underlying_price: Price) -> bool {
2357        let strike = match self.instrument.strike_price() {
2358            Some(p) => p.as_f64(),
2359            None => return false,
2360        };
2361        let spot = underlying_price.as_f64();
2362        match self.instrument.option_kind() {
2363            Some(OptionKind::Call) => spot > strike,
2364            Some(OptionKind::Put) => strike > spot,
2365            None => false,
2366        }
2367    }
2368
2369    fn option_settlement_price(&self, underlying_price: Price, cash_settled: bool) -> Price {
2370        let strike = self
2371            .instrument
2372            .strike_price()
2373            .expect("option must have strike");
2374        if !cash_settled {
2375            return strike;
2376        }
2377
2378        let spot = underlying_price.as_f64();
2379        let strike_f = strike.as_f64();
2380        let value = match self.instrument.option_kind() {
2381            Some(OptionKind::Call) => (spot - strike_f).max(0.0),
2382            _ => (strike_f - spot).max(0.0),
2383        };
2384        Price::new(value, strike.precision)
2385    }
2386
2387    fn option_exercise_position(
2388        &self,
2389        position: &Position,
2390        underlying_instrument: &InstrumentAny,
2391        underlying_price: Price,
2392        ts_now: UnixNanos,
2393        custom_option_price: Option<Price>,
2394    ) {
2395        if matches!(underlying_instrument, InstrumentAny::IndexInstrument(_)) {
2396            self.option_cash_settlement(position, underlying_price, ts_now, custom_option_price);
2397        } else {
2398            self.option_physical_settlement(
2399                position,
2400                underlying_instrument,
2401                underlying_price,
2402                ts_now,
2403                custom_option_price,
2404            );
2405        }
2406    }
2407
2408    fn option_cash_settlement(
2409        &self,
2410        position: &Position,
2411        underlying_price: Price,
2412        ts_now: UnixNanos,
2413        custom_option_price: Option<Price>,
2414    ) {
2415        let venue = self.venue;
2416        let trade_id = format!("{venue}-LEG-CASH-{}", &UUID4::new().to_string()[..8]);
2417        let close_px = custom_option_price
2418            .unwrap_or_else(|| self.option_settlement_price(underlying_price, true));
2419        let close_side = OrderCore::closing_side(position.side);
2420        self.option_register_settlement_order(
2421            position,
2422            self.instrument.id(),
2423            close_side,
2424            position.quantity,
2425            ClientOrderId::from(trade_id.as_str()),
2426            VenueOrderId::from(trade_id.as_str()),
2427            Some(position.id),
2428            true,
2429            &format!("EXPIRATION_{venue}_CASH"),
2430        );
2431        let fill = self.option_create_close_fill(position, close_px, &trade_id, ts_now);
2432        self.dispatch_order_event(OrderEventAny::Filled(fill));
2433    }
2434
2435    fn option_physical_settlement(
2436        &self,
2437        position: &Position,
2438        underlying_instrument: &InstrumentAny,
2439        underlying_price: Price,
2440        ts_now: UnixNanos,
2441        custom_option_price: Option<Price>,
2442    ) {
2443        let multiplier = self.instrument.multiplier();
2444        let underlying_qty = Quantity::new(
2445            position.quantity.as_f64() * multiplier.as_f64(),
2446            underlying_instrument.size_precision(),
2447        );
2448
2449        let underlying_side = if self.instrument.option_kind() == Some(OptionKind::Call) {
2450            position.side
2451        } else {
2452            match position.side {
2453                PositionSide::Long => PositionSide::Short,
2454                PositionSide::Short => PositionSide::Long,
2455                other => other,
2456            }
2457        };
2458
2459        let venue = self.venue;
2460        let trade_base = format!("{venue}-LEG-EX-{}", &UUID4::new().to_string()[..8]);
2461        let close_trade_id = format!("{trade_base}-CLOSE");
2462        let open_trade_id = format!("{trade_base}-OPEN");
2463        let settlement_px = self.option_settlement_price(underlying_price, false);
2464        let option_close_px = custom_option_price
2465            .unwrap_or_else(|| Price::new(0.0, self.instrument.price_precision()));
2466        let close_side = OrderCore::closing_side(position.side);
2467        let underlying_order_side = match underlying_side {
2468            PositionSide::Long => OrderSide::Buy,
2469            _ => OrderSide::Sell,
2470        };
2471
2472        self.option_register_settlement_order(
2473            position,
2474            self.instrument.id(),
2475            close_side,
2476            position.quantity,
2477            ClientOrderId::from(close_trade_id.as_str()),
2478            VenueOrderId::from(close_trade_id.as_str()),
2479            Some(position.id),
2480            true,
2481            &format!("EXPIRATION_{venue}_PHYSICAL_CLOSE"),
2482        );
2483        self.option_register_settlement_order(
2484            position,
2485            underlying_instrument.id(),
2486            underlying_order_side,
2487            underlying_qty,
2488            ClientOrderId::from(open_trade_id.as_str()),
2489            VenueOrderId::from(open_trade_id.as_str()),
2490            None,
2491            false,
2492            &format!("EXPIRATION_{venue}_PHYSICAL_OPEN"),
2493        );
2494
2495        let option_fill =
2496            self.option_create_close_fill(position, option_close_px, &close_trade_id, ts_now);
2497        let underlying_fill = self.option_create_underlying_fill(
2498            position,
2499            underlying_instrument,
2500            underlying_qty,
2501            underlying_side,
2502            settlement_px,
2503            &open_trade_id,
2504            ts_now,
2505        );
2506        self.dispatch_order_event(OrderEventAny::Filled(option_fill));
2507        self.dispatch_order_event(OrderEventAny::Filled(underlying_fill));
2508    }
2509
2510    fn option_otm_expiry(
2511        &self,
2512        position: &Position,
2513        ts_now: UnixNanos,
2514        custom_option_price: Option<Price>,
2515    ) {
2516        let venue = self.venue;
2517        let trade_id = format!("{venue}-LEG-OTM-{}", &UUID4::new().to_string()[..8]);
2518        let close_px = custom_option_price
2519            .unwrap_or_else(|| Price::new(0.0, self.instrument.price_precision()));
2520        let close_side = OrderCore::closing_side(position.side);
2521        self.option_register_settlement_order(
2522            position,
2523            self.instrument.id(),
2524            close_side,
2525            position.quantity,
2526            ClientOrderId::from(trade_id.as_str()),
2527            VenueOrderId::from(trade_id.as_str()),
2528            Some(position.id),
2529            true,
2530            &format!("EXPIRATION_{venue}_OTM"),
2531        );
2532        let fill = self.option_create_close_fill(position, close_px, &trade_id, ts_now);
2533        self.dispatch_order_event(OrderEventAny::Filled(fill));
2534    }
2535
2536    #[expect(clippy::too_many_arguments)]
2537    fn option_register_settlement_order(
2538        &self,
2539        position: &Position,
2540        instrument_id: InstrumentId,
2541        order_side: OrderSide,
2542        quantity: Quantity,
2543        client_order_id: ClientOrderId,
2544        venue_order_id: VenueOrderId,
2545        position_id: Option<PositionId>,
2546        reduce_only: bool,
2547        tag: &str,
2548    ) {
2549        let ts_now = self.clock.borrow().timestamp_ns();
2550        let order = OrderAny::Market(MarketOrder::new(
2551            position.trader_id,
2552            position.strategy_id,
2553            instrument_id,
2554            client_order_id,
2555            order_side,
2556            quantity,
2557            TimeInForce::Gtc,
2558            UUID4::new(),
2559            ts_now,
2560            reduce_only,
2561            false,
2562            None,
2563            None,
2564            None,
2565            None,
2566            None,
2567            None,
2568            None,
2569            Some(vec![Ustr::from(tag)]),
2570        ));
2571
2572        {
2573            let mut cache = self.cache.borrow_mut();
2574            if let Err(e) = cache.add_order(order.clone(), position_id, None, false) {
2575                log::debug!("Settlement order already in cache: {e}");
2576            } else {
2577                drop(cache);
2578                self.publish_order_initialized(&order);
2579                self.cache
2580                    .borrow_mut()
2581                    .add_venue_order_id(&client_order_id, &venue_order_id, false)
2582                    .ok();
2583            }
2584        }
2585
2586        self.generate_order_accepted(&order, venue_order_id);
2587    }
2588
2589    fn option_create_close_fill(
2590        &self,
2591        position: &Position,
2592        price: Price,
2593        trade_id_str: &str,
2594        ts_now: UnixNanos,
2595    ) -> OrderFilled {
2596        let close_side = OrderCore::closing_side(position.side);
2597        OrderFilled::new(
2598            position.trader_id,
2599            position.strategy_id,
2600            self.instrument.id(),
2601            ClientOrderId::from(trade_id_str),
2602            VenueOrderId::from(trade_id_str),
2603            position.account_id,
2604            TradeId::from(trade_id_str),
2605            close_side,
2606            OrderType::Market,
2607            position.quantity,
2608            price,
2609            self.instrument.quote_currency(),
2610            LiquiditySide::Taker,
2611            UUID4::new(),
2612            ts_now,
2613            ts_now,
2614            false,
2615            Some(position.id),
2616            Some(Money::new(0.0, self.instrument.quote_currency())),
2617        )
2618    }
2619
2620    #[expect(clippy::too_many_arguments)]
2621    fn option_create_underlying_fill(
2622        &self,
2623        position: &Position,
2624        underlying_instrument: &InstrumentAny,
2625        quantity: Quantity,
2626        side: PositionSide,
2627        price: Price,
2628        trade_id_str: &str,
2629        ts_now: UnixNanos,
2630    ) -> OrderFilled {
2631        let order_side = match side {
2632            PositionSide::Long => OrderSide::Buy,
2633            _ => OrderSide::Sell,
2634        };
2635        OrderFilled::new(
2636            position.trader_id,
2637            position.strategy_id,
2638            underlying_instrument.id(),
2639            ClientOrderId::from(trade_id_str),
2640            VenueOrderId::from(trade_id_str),
2641            position.account_id,
2642            TradeId::from(trade_id_str),
2643            order_side,
2644            OrderType::Market,
2645            quantity,
2646            price,
2647            underlying_instrument.quote_currency(),
2648            LiquiditySide::Taker,
2649            UUID4::new(),
2650            ts_now,
2651            ts_now,
2652            false,
2653            None,
2654            Some(Money::new(0.0, underlying_instrument.quote_currency())),
2655        )
2656    }
2657
2658    /// Liquidates all open positions for this instrument.
2659    ///
2660    /// Cancels open orders if `cancel_open_orders` is true, then closes every open
2661    /// position at best bid/ask or the settlement price, emitting accepted and filled
2662    /// events for each synthetic close order.
2663    ///
2664    /// # Panics
2665    ///
2666    /// Panics if the venue order ID generator cannot produce an ID for the synthetic
2667    /// liquidation order (internal state inconsistency).
2668    ///
2669    /// Only positions whose instrument settles in `settlement_currency` are closed.
2670    /// Matching engines for other settlement currencies are skipped, scoping
2671    /// liquidation to the currency whose margin account breached the threshold.
2672    pub fn liquidate_open_positions(
2673        &mut self,
2674        ts_now: UnixNanos,
2675        cancel_open_orders: bool,
2676        settlement_currency: Currency,
2677    ) {
2678        // Only liquidate positions settled in the breached currency.
2679        if self.instrument.settlement_currency() != settlement_currency {
2680            return;
2681        }
2682
2683        if cancel_open_orders {
2684            let open_orders: Vec<RestingOrder> = self.get_open_orders();
2685            for order_info in &open_orders {
2686                let order = {
2687                    let cache = self.cache.borrow();
2688                    cache.order_owned(&order_info.client_order_id)
2689                };
2690
2691                if let Some(order) = order {
2692                    self.cancel_order(&order, None);
2693                }
2694            }
2695        }
2696
2697        let instrument_id = self.instrument.id();
2698        let positions: Vec<(
2699            TraderId,
2700            StrategyId,
2701            AccountId,
2702            PositionId,
2703            OrderSide,
2704            Quantity,
2705        )> = {
2706            let cache = self.cache.borrow();
2707            cache
2708                .positions_open(None, Some(&instrument_id), None, None, None)
2709                .into_iter()
2710                .map(|pos| {
2711                    (
2712                        pos.trader_id,
2713                        pos.strategy_id,
2714                        pos.account_id,
2715                        pos.id,
2716                        OrderCore::closing_side(pos.side),
2717                        pos.quantity,
2718                    )
2719                })
2720                .collect()
2721        };
2722
2723        for (trader_id, strategy_id, account_id, position_id, closing_side, quantity) in positions {
2724            // Pre-check: ensure a price source is available before emitting events.
2725            let has_price = if closing_side == OrderSide::Sell {
2726                self.best_bid_price().is_some() || self.settlement_price.is_some()
2727            } else {
2728                self.best_ask_price().is_some() || self.settlement_price.is_some()
2729            };
2730
2731            if !has_price {
2732                log::warn!(
2733                    "LIQUIDATION: no price available for {instrument_id} position {position_id}, skipping"
2734                );
2735                continue;
2736            }
2737
2738            let client_order_id = ClientOrderId::from(
2739                format!("LIQUIDATION-{}-{}", self.venue, UUID4::new()).as_str(),
2740            );
2741            let order = OrderAny::Market(MarketOrder::new(
2742                trader_id,
2743                strategy_id,
2744                instrument_id,
2745                client_order_id,
2746                closing_side,
2747                quantity,
2748                TimeInForce::Ioc,
2749                UUID4::new(),
2750                ts_now,
2751                true, // reduce_only
2752                false,
2753                None,
2754                None,
2755                None,
2756                None,
2757                None,
2758                None,
2759                None,
2760                Some(vec![Ustr::from(&format!(
2761                    "LIQUIDATION_{}_CLOSE",
2762                    self.venue
2763                ))]),
2764            ));
2765
2766            let venue_order_id = self.ids_generator.get_venue_order_id(&order).unwrap();
2767            {
2768                let mut cache = self.cache.borrow_mut();
2769                if let Err(e) = cache.add_order(order.clone(), Some(position_id), None, false) {
2770                    log::debug!("Liquidation order already in cache: {e}");
2771                } else {
2772                    drop(cache);
2773                    self.publish_order_initialized(&order);
2774                    self.cache
2775                        .borrow_mut()
2776                        .add_venue_order_id(&client_order_id, &venue_order_id, false)
2777                        .ok();
2778                }
2779            }
2780
2781            // Route through the normal market-order fill machinery (fill model,
2782            // book depth consumption, slippage) instead of apply_fills directly.
2783            self.account_ids.insert(trader_id, account_id);
2784            self.generate_order_submitted(&order, account_id);
2785            self.generate_order_accepted(&order, venue_order_id);
2786            self.fill_market_order(client_order_id);
2787        }
2788    }
2789
2790    /// Processes a new order submission.
2791    ///
2792    /// Validates the order against instrument precision, expiration, and contingency
2793    /// rules before accepting or rejecting it.
2794    ///
2795    /// # Panics
2796    ///
2797    /// Panics if an OTO child order references a missing or non-OTO parent.
2798    pub fn process_order(&mut self, order: &mut OrderAny, account_id: AccountId) {
2799        // Idempotent: OTO children may be re-routed via `fill_order`
2800        if self.core.order_exists(order.client_order_id()) {
2801            return;
2802        }
2803
2804        // Ensure expiration semantics are enforced even when no fresh market-data
2805        // tick arrives for this instrument after expiry (e.g. after rotation).
2806        let ts_now = self.clock.borrow().timestamp_ns();
2807        self.check_instrument_expiration(ts_now);
2808
2809        // Validate inside a cache borrow scope, collecting any rejection
2810        // reason rather than emitting events while the borrow is held.
2811        // This avoids RefCell re-entrancy panics from synchronous event
2812        // dispatch that calls back into the execution engine.
2813        let reject_reason: Option<Ustr> = 'validate: {
2814            let cache_borrow = self.cache.as_ref().borrow();
2815
2816            // Index identifiers
2817            self.account_ids.insert(order.trader_id(), account_id);
2818
2819            if self.pending_resolution {
2820                break 'validate Some(
2821                    format!(
2822                        "Contract {} has expired and is pending resolution",
2823                        self.instrument.id()
2824                    )
2825                    .into(),
2826                );
2827            }
2828
2829            // Check for instrument expiration or activation
2830            if self.instrument.has_expiration() {
2831                if let Some(activation_ns) = self.instrument.activation_ns()
2832                    && self.clock.borrow().timestamp_ns() < activation_ns
2833                {
2834                    break 'validate Some(
2835                        format!(
2836                            "Contract {} is not yet active, activation {activation_ns}",
2837                            self.instrument.id(),
2838                        )
2839                        .into(),
2840                    );
2841                }
2842
2843                if let Some(expiration_ns) = self.instrument.expiration_ns()
2844                    && self.clock.borrow().timestamp_ns() >= expiration_ns
2845                {
2846                    break 'validate Some(
2847                        format!(
2848                            "Contract {} has expired, expiration {expiration_ns}",
2849                            self.instrument.id(),
2850                        )
2851                        .into(),
2852                    );
2853                }
2854            }
2855
2856            // Contingent orders checks
2857            if self.config.support_contingent_orders {
2858                if let Some(parent_order_id) = order.parent_order_id() {
2859                    let parent_order = match cache_borrow.order(&parent_order_id) {
2860                        Some(o) if o.contingency_type().unwrap() == ContingencyType::Oto => o,
2861                        _ => panic!("OTO parent not found"),
2862                    };
2863
2864                    if parent_order.status() == OrderStatus::Rejected && order.is_open() {
2865                        break 'validate Some(
2866                            format!("Rejected OTO order from {parent_order_id}").into(),
2867                        );
2868                    } else if parent_order.status() == OrderStatus::Accepted
2869                        || parent_order.status() == OrderStatus::Triggered
2870                        || (self.config.oto_full_trigger
2871                            && parent_order.status() == OrderStatus::PartiallyFilled)
2872                    {
2873                        log::info!(
2874                            "Pending OTO order {} triggers from {parent_order_id}",
2875                            order.client_order_id(),
2876                        );
2877                        return;
2878                    }
2879                }
2880
2881                if let Some(linked_order_ids) = order.linked_order_ids() {
2882                    for client_order_id in linked_order_ids {
2883                        match cache_borrow.order(client_order_id) {
2884                            Some(contingent_order)
2885                                if (order.contingency_type().unwrap() == ContingencyType::Oco
2886                                    || order.contingency_type().unwrap()
2887                                        == ContingencyType::Ouo)
2888                                    && !order.is_closed()
2889                                    && contingent_order.is_closed() =>
2890                            {
2891                                break 'validate Some(
2892                                    format!("Contingent order {client_order_id} already closed")
2893                                        .into(),
2894                                );
2895                            }
2896                            None => panic!("Cannot find contingent order for {client_order_id}"),
2897                            _ => {}
2898                        }
2899                    }
2900                }
2901            }
2902
2903            // Check for valid order quantity precision
2904            if order.quantity().precision != self.instrument.size_precision() {
2905                break 'validate Some(
2906                    format!(
2907                        "Invalid order quantity precision for order {}, was {} when {} size precision is {}",
2908                        order.client_order_id(),
2909                        order.quantity().precision,
2910                        self.instrument.id(),
2911                        self.instrument.size_precision()
2912                    )
2913                    .into(),
2914                );
2915            }
2916
2917            // Check for valid order price precision
2918            if let Some(price) = order.price()
2919                && price.precision != self.instrument.price_precision()
2920            {
2921                break 'validate Some(
2922                    format!(
2923                        "Invalid order price precision for order {}, was {} when {} price precision is {}",
2924                        order.client_order_id(),
2925                        price.precision,
2926                        self.instrument.id(),
2927                        self.instrument.price_precision()
2928                    )
2929                    .into(),
2930                );
2931            }
2932
2933            // Check for valid order trigger price precision
2934            if let Some(trigger_price) = order.trigger_price()
2935                && trigger_price.precision != self.instrument.price_precision()
2936            {
2937                break 'validate Some(
2938                    format!(
2939                        "Invalid order trigger price precision for order {}, was {} when {} price precision is {}",
2940                        order.client_order_id(),
2941                        trigger_price.precision,
2942                        self.instrument.id(),
2943                        self.instrument.price_precision()
2944                    )
2945                    .into(),
2946                );
2947            }
2948
2949            // Get position if exists
2950            let position = cache_borrow
2951                .position_for_order(&order.client_order_id())
2952                .or_else(|| {
2953                    if self.oms_type == OmsType::Netting {
2954                        let position_id = PositionId::new(
2955                            format!("{}-{}", order.instrument_id(), order.strategy_id()).as_str(),
2956                        );
2957                        cache_borrow.position(&position_id)
2958                    } else {
2959                        None
2960                    }
2961                });
2962
2963            // Check not shorting an equity without a MARGIN account
2964            if order.order_side() == OrderSide::Sell
2965                && self.account_type != AccountType::Margin
2966                && matches!(self.instrument, InstrumentAny::Equity(_))
2967                && position
2968                    .as_ref()
2969                    .is_none_or(|pos| !order.would_reduce_only(pos.side, pos.quantity))
2970            {
2971                let position_string = position
2972                    .as_ref()
2973                    .map_or("None".to_string(), |pos| pos.id.to_string());
2974                break 'validate Some(
2975                    format!(
2976                        "Short selling not permitted on a CASH account with position {position_string} and order {order}",
2977                    )
2978                    .into(),
2979                );
2980            }
2981
2982            // Check reduce-only instruction
2983            if self.config.use_reduce_only
2984                && order.is_reduce_only()
2985                && !order.is_closed()
2986                && position.as_ref().is_none_or(|pos| {
2987                    pos.is_closed()
2988                        || (order.is_buy() && pos.is_long())
2989                        || (order.is_sell() && pos.is_short())
2990                })
2991            {
2992                break 'validate Some(
2993                    format!(
2994                        "Reduce-only order {} ({}-{}) would have increased position",
2995                        order.client_order_id(),
2996                        order.order_type().to_string().to_uppercase(),
2997                        order.order_side().to_string().to_uppercase()
2998                    )
2999                    .into(),
3000                );
3001            }
3002
3003            None
3004        };
3005
3006        if let Some(reason) = reject_reason {
3007            self.generate_order_rejected(order, reason);
3008            return;
3009        }
3010
3011        // Convert quote-denominated quantity to base quantity for non-inverse instruments.
3012        // Mirrors live venue semantics where the quote notional is settled into a base
3013        // quantity before the order enters normal fill and state handling. Without this
3014        // conversion the book simulation would treat the quote notional as base size.
3015        // Only applies to order types with a reliable reference price at submission;
3016        // trigger-style market orders and trailing orders are left untouched so they
3017        // convert at fill time from the actual (possibly-trailed) price.
3018        if order.is_quote_quantity()
3019            && !self.instrument.is_inverse()
3020            && !matches!(
3021                order.order_type(),
3022                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket,
3023            )
3024            && (order.price().is_some()
3025                || matches!(
3026                    order.order_type(),
3027                    OrderType::Market | OrderType::MarketToLimit,
3028                ))
3029            && !self.convert_quote_to_base_quantity(order)
3030        {
3031            return;
3032        }
3033
3034        match order.order_type() {
3035            OrderType::Market => self.process_market_order(order),
3036            OrderType::Limit => self.process_limit_order(order),
3037            OrderType::MarketToLimit => self.process_market_to_limit_order(order),
3038            OrderType::StopMarket => self.process_stop_market_order(order),
3039            OrderType::StopLimit => self.process_stop_limit_order(order),
3040            OrderType::MarketIfTouched => self.process_market_if_touched_order(order),
3041            OrderType::LimitIfTouched => self.process_limit_if_touched_order(order),
3042            OrderType::TrailingStopMarket => self.process_trailing_stop_order(order),
3043            OrderType::TrailingStopLimit => self.process_trailing_stop_order(order),
3044        }
3045    }
3046
3047    fn convert_quote_to_base_quantity(&self, order: &mut OrderAny) -> bool {
3048        // Pick a reference price to convert the quote notional into a base quantity.
3049        // Priced orders use their own price (worst-case execution); marketable orders
3050        // use the best opposing book level.
3051        let reference_price = if let Some(price) = order.price() {
3052            Some(price)
3053        } else {
3054            match order.order_side() {
3055                OrderSide::Buy => self.core.ask,
3056                OrderSide::Sell => self.core.bid,
3057                OrderSide::NoOrderSide => None,
3058            }
3059        };
3060
3061        let Some(reference_price) = reference_price else {
3062            self.generate_order_rejected(
3063                order,
3064                format!(
3065                    "No market for {} to convert quote quantity to base",
3066                    order.instrument_id(),
3067                )
3068                .into(),
3069            );
3070            return false;
3071        };
3072
3073        let base_quantity = self
3074            .instrument
3075            .calculate_base_quantity(order.quantity(), reference_price);
3076
3077        let ts_now = self.clock.borrow().timestamp_ns();
3078        let event = OrderEventAny::Updated(OrderUpdated::new(
3079            order.trader_id(),
3080            order.strategy_id(),
3081            order.instrument_id(),
3082            order.client_order_id(),
3083            base_quantity,
3084            UUID4::new(),
3085            ts_now,
3086            ts_now,
3087            false,
3088            order.venue_order_id(),
3089            order.account_id(),
3090            None,
3091            None,
3092            None,
3093            false,
3094        ));
3095
3096        // Apply the update to the local order so subsequent dispatch uses the base
3097        // quantity immediately (the event is also dispatched to the execution engine
3098        // for cache reconciliation).
3099        if let Err(e) = order.apply(event.clone()) {
3100            log::error!(
3101                "Failed to apply quote-to-base update for {}: {e}",
3102                order.client_order_id(),
3103            );
3104            return false;
3105        }
3106        self.dispatch_order_event(event);
3107        true
3108    }
3109
3110    /// Processes an order modify command to update quantity, price, or trigger price.
3111    pub fn process_modify(&mut self, command: &ModifyOrder, account_id: AccountId) {
3112        if !self.core.order_exists(command.client_order_id) {
3113            self.generate_order_modify_rejected(
3114                command.trader_id,
3115                command.strategy_id,
3116                command.instrument_id,
3117                command.client_order_id,
3118                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
3119                command.venue_order_id,
3120                Some(account_id),
3121            );
3122            return;
3123        }
3124
3125        let mut order = match self
3126            .cache
3127            .borrow()
3128            .order(&command.client_order_id)
3129            .map(|o| o.clone())
3130        {
3131            Some(order) => order,
3132            None => {
3133                log::error!(
3134                    "Cannot modify order: order {} not found in cache",
3135                    command.client_order_id
3136                );
3137                return;
3138            }
3139        };
3140
3141        let update_success = self.update_order(
3142            &mut order,
3143            command.quantity,
3144            command.price,
3145            command.trigger_price,
3146            None,
3147        );
3148
3149        if !update_success {
3150            return;
3151        }
3152
3153        // Local `order` is pre-event; resync from the cache for fresh state
3154        let Some(refreshed) = self.resync_core_entry(command.client_order_id) else {
3155            return;
3156        };
3157
3158        // Skip queue reset on rejected modifies to preserve accrued position
3159        let price_changed = refreshed.price() != order.price()
3160            || refreshed.trigger_price() != order.trigger_price();
3161
3162        if price_changed
3163            && refreshed.is_open()
3164            && self.config.queue_position
3165            && let Some(new_price) = refreshed.price()
3166        {
3167            self.snapshot_queue_position(&refreshed, new_price);
3168            self.queue_excess.swap_remove(&refreshed.client_order_id());
3169        }
3170    }
3171
3172    /// Processes an order cancel command.
3173    pub fn process_cancel(&mut self, command: &CancelOrder, account_id: AccountId) {
3174        if !self.core.order_exists(command.client_order_id) {
3175            self.generate_order_cancel_rejected(
3176                command.trader_id,
3177                command.strategy_id,
3178                account_id,
3179                command.instrument_id,
3180                command.client_order_id,
3181                command.venue_order_id,
3182                Ustr::from(format!("Order {} not found", command.client_order_id).as_str()),
3183            );
3184            return;
3185        }
3186
3187        let order = match self
3188            .cache
3189            .borrow()
3190            .order(&command.client_order_id)
3191            .map(|o| o.clone())
3192        {
3193            Some(order) => order,
3194            None => {
3195                log::error!(
3196                    "Cannot cancel order: order {} not found in cache",
3197                    command.client_order_id
3198                );
3199                return;
3200            }
3201        };
3202
3203        if !order.is_inflight() && !order.is_open() {
3204            self.purge_stale_core_entry(command.client_order_id);
3205            return;
3206        }
3207
3208        self.cancel_order(&order, None);
3209    }
3210
3211    /// Processes a cancel all orders command for an instrument.
3212    pub fn process_cancel_all(&mut self, command: &CancelAllOrders, _account_id: AccountId) {
3213        let instrument_id = command.instrument_id;
3214        let order_side = if command.order_side == OrderSide::NoOrderSide {
3215            None
3216        } else {
3217            Some(command.order_side)
3218        };
3219
3220        let client_order_ids: Vec<ClientOrderId> = self
3221            .cache
3222            .borrow()
3223            .orders_open(None, Some(&instrument_id), None, None, order_side)
3224            .iter()
3225            .map(|o| o.client_order_id())
3226            .collect();
3227
3228        for client_order_id in client_order_ids {
3229            let order = match self
3230                .cache
3231                .borrow()
3232                .order(&client_order_id)
3233                .map(|o| o.clone())
3234            {
3235                Some(order) => order,
3236                None => continue,
3237            };
3238
3239            if !order.is_inflight() && !order.is_open() {
3240                self.purge_stale_core_entry(client_order_id);
3241                continue;
3242            }
3243
3244            self.cancel_order(&order, None);
3245        }
3246    }
3247
3248    // Removes a closed order's stale entry from the matching core so the next
3249    // `iterate_bids/asks` does not produce a spurious fill action.
3250    fn purge_stale_core_entry(&mut self, client_order_id: ClientOrderId) {
3251        if self.core.order_exists(client_order_id) {
3252            let _ = self.core.delete_order(client_order_id);
3253        }
3254        self.cached_filled_qty.swap_remove(&client_order_id);
3255    }
3256
3257    fn resync_core_entry(&mut self, client_order_id: ClientOrderId) -> Option<OrderAny> {
3258        let order = self
3259            .cache
3260            .borrow()
3261            .order(&client_order_id)
3262            .map(|o| o.clone())?;
3263
3264        // Gate on `is_closed`, not `is_open`: cache may transiently hold the
3265        // order in `Submitted` (process_limit_order accepts before cache add)
3266        if order.is_closed() {
3267            let _ = self.core.delete_order(client_order_id);
3268            return Some(order);
3269        }
3270
3271        let new_match_info = Self::matching_core_entry(&order);
3272
3273        // Skip the delete+add when unchanged to preserve FIFO at the level
3274        let unchanged = self
3275            .core
3276            .get_order(client_order_id)
3277            .is_some_and(|existing| *existing == new_match_info);
3278
3279        if unchanged {
3280            return Some(order);
3281        }
3282
3283        let _ = self.core.delete_order(client_order_id);
3284        self.core.add_order(new_match_info);
3285        Some(order)
3286    }
3287
3288    /// Processes a batch cancel orders command.
3289    pub fn process_batch_cancel(&mut self, command: &BatchCancelOrders, account_id: AccountId) {
3290        for order in &command.cancels {
3291            self.process_cancel(order, account_id);
3292        }
3293    }
3294
3295    fn process_market_order(&mut self, order: &OrderAny) {
3296        if order.time_in_force() == TimeInForce::AtTheOpen
3297            || order.time_in_force() == TimeInForce::AtTheClose
3298        {
3299            self.generate_order_rejected(
3300                order,
3301                format!(
3302                    "time in force {} is not currently supported",
3303                    order.time_in_force()
3304                )
3305                .into(),
3306            );
3307            return;
3308        }
3309
3310        // Check if market exists
3311        if (order.order_side() == OrderSide::Buy && self.core.ask.is_none())
3312            || (order.order_side() == OrderSide::Sell && self.core.bid.is_none())
3313        {
3314            self.generate_order_rejected(
3315                order,
3316                format!("No market for {}", order.instrument_id()).into(),
3317            );
3318            return;
3319        }
3320
3321        if self.config.use_market_order_acks {
3322            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3323            self.generate_order_accepted(order, venue_order_id);
3324        }
3325
3326        // Add order to cache for fill_market_order to fetch
3327        if let Err(e) = self
3328            .cache
3329            .borrow_mut()
3330            .add_order(order.clone(), None, None, false)
3331        {
3332            log::debug!("Order already in cache: {e}");
3333        }
3334
3335        self.fill_market_order(order.client_order_id());
3336    }
3337
3338    fn process_limit_order(&mut self, order: &mut OrderAny) {
3339        if order.time_in_force() == TimeInForce::AtTheOpen
3340            || order.time_in_force() == TimeInForce::AtTheClose
3341        {
3342            self.generate_order_rejected(
3343                order,
3344                format!(
3345                    "time in force {} is not currently supported",
3346                    order.time_in_force()
3347                )
3348                .into(),
3349            );
3350            return;
3351        }
3352
3353        let limit_px = order.price().expect("Limit order must have a price");
3354        if order.is_post_only()
3355            && self
3356                .core
3357                .is_limit_matched(order.order_side_specified(), limit_px)
3358        {
3359            self.generate_order_rejected(
3360                order,
3361                format!(
3362                    "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
3363                    order.order_type(),
3364                    order.order_side(),
3365                    order.price().unwrap(),
3366                    self.core
3367                        .bid
3368                        .map_or_else(|| "None".to_string(), |p| p.to_string()),
3369                    self.core
3370                        .ask
3371                        .map_or_else(|| "None".to_string(), |p| p.to_string())
3372                )
3373                .into(),
3374            );
3375            return;
3376        }
3377
3378        // Order is valid and accepted
3379        self.accept_order(order);
3380
3381        // Check for immediate fill
3382        if self
3383            .core
3384            .is_limit_matched(order.order_side_specified(), limit_px)
3385        {
3386            // Filling as liquidity taker
3387            order.set_liquidity_side(LiquiditySide::Taker);
3388
3389            if self
3390                .cache
3391                .borrow_mut()
3392                .add_order(order.clone(), None, None, false)
3393                .is_err()
3394                && let Err(e) = self.cache.borrow_mut().replace_order(order)
3395            {
3396                log::debug!("Failed to update order in cache: {e}");
3397            }
3398            self.fill_limit_order(order.client_order_id());
3399
3400            // If fill didn't execute (e.g. all liquidity consumed), revert to
3401            // maker so the fill model check applies on subsequent iterations
3402            if self.core.order_exists(order.client_order_id())
3403                && let Some(mut order) = self.cache.borrow_mut().order_mut(&order.client_order_id())
3404            {
3405                order.set_liquidity_side(LiquiditySide::Maker);
3406            }
3407        } else if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
3408            self.cancel_order(order, None);
3409        } else {
3410            // Add passive order to cache for later modify/cancel operations
3411            order.set_liquidity_side(LiquiditySide::Maker);
3412
3413            if let Some(price) = order.price() {
3414                self.snapshot_queue_position(order, price);
3415            }
3416
3417            let add_result = self
3418                .cache
3419                .borrow_mut()
3420                .add_order(order.clone(), None, None, false);
3421
3422            if let Err(e) = add_result {
3423                log::debug!("Failed to add order to cache: {e}");
3424
3425                // Persist Maker side on the cached copy when exec engine
3426                // already cached the order (only if not already Maker/Taker)
3427                if let Some(mut order) = self.cache.borrow_mut().order_mut(&order.client_order_id())
3428                    && !matches!(
3429                        order.liquidity_side(),
3430                        Some(LiquiditySide::Maker | LiquiditySide::Taker)
3431                    )
3432                {
3433                    order.set_liquidity_side(LiquiditySide::Maker);
3434                }
3435            }
3436        }
3437    }
3438
3439    fn process_market_to_limit_order(&mut self, order: &OrderAny) {
3440        // Check that market exists
3441        if (order.order_side() == OrderSide::Buy && self.core.ask.is_none())
3442            || (order.order_side() == OrderSide::Sell && self.core.bid.is_none())
3443        {
3444            self.generate_order_rejected(
3445                order,
3446                format!("No market for {}", order.instrument_id()).into(),
3447            );
3448            return;
3449        }
3450
3451        if self.config.use_market_order_acks {
3452            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
3453            self.generate_order_accepted(order, venue_order_id);
3454        }
3455
3456        // Immediately fill marketable order
3457        if let Err(e) = self
3458            .cache
3459            .borrow_mut()
3460            .add_order(order.clone(), None, None, false)
3461        {
3462            log::debug!("Order already in cache: {e}");
3463        }
3464        let client_order_id = order.client_order_id();
3465        self.fill_market_order(client_order_id);
3466
3467        // Check for remaining quantity to rest as limit order
3468        let filled_qty = self
3469            .cached_filled_qty
3470            .get(&client_order_id)
3471            .copied()
3472            .unwrap_or_default();
3473        let leaves_qty = order.quantity().saturating_sub(filled_qty);
3474        if leaves_qty.is_zero() {
3475            self.purge_cached_filled_qty_if_closed(client_order_id);
3476            return;
3477        }
3478
3479        let updated_order = self
3480            .cache
3481            .borrow()
3482            .order(&client_order_id)
3483            .map(|o| o.clone());
3484        if let Some(mut updated_order) = updated_order {
3485            self.accept_order(&mut updated_order);
3486        }
3487    }
3488
3489    fn process_stop_market_order(&mut self, order: &mut OrderAny) {
3490        let stop_px = order
3491            .trigger_price()
3492            .expect("Stop order must have a trigger price");
3493
3494        if self
3495            .core
3496            .is_stop_matched(order.order_side_specified(), stop_px)
3497        {
3498            if self.config.reject_stop_orders {
3499                self.generate_order_rejected(
3500                    order,
3501                    format!(
3502                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3503                        order.order_type(),
3504                        order.order_side(),
3505                        order.trigger_price().unwrap(),
3506                        self.core
3507                            .bid
3508                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3509                        self.core
3510                            .ask
3511                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3512                    ).into(),
3513                );
3514                return;
3515            }
3516
3517            if let Err(e) = self
3518                .cache
3519                .borrow_mut()
3520                .add_order(order.clone(), None, None, false)
3521            {
3522                log::debug!("Order already in cache: {e}");
3523            }
3524            self.fill_market_order(order.client_order_id());
3525            return;
3526        }
3527
3528        // order is not matched but is valid and we accept it
3529        self.accept_order(order);
3530
3531        // Add passive order to cache for later modify/cancel operations
3532        order.set_liquidity_side(LiquiditySide::Maker);
3533
3534        if let Err(e) = self
3535            .cache
3536            .borrow_mut()
3537            .add_order(order.clone(), None, None, false)
3538        {
3539            log::debug!("Order already in cache: {e}");
3540        }
3541    }
3542
3543    fn process_stop_limit_order(&mut self, order: &mut OrderAny) {
3544        let stop_px = order
3545            .trigger_price()
3546            .expect("Stop order must have a trigger price");
3547
3548        if self
3549            .core
3550            .is_stop_matched(order.order_side_specified(), stop_px)
3551        {
3552            if self.config.reject_stop_orders {
3553                self.generate_order_rejected(
3554                    order,
3555                    format!(
3556                        "{} {} order stop px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3557                        order.order_type(),
3558                        order.order_side(),
3559                        order.trigger_price().unwrap(),
3560                        self.core
3561                            .bid
3562                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3563                        self.core
3564                            .ask
3565                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3566                    ).into(),
3567                );
3568                return;
3569            }
3570
3571            self.accept_order(order);
3572            self.generate_order_triggered(order);
3573
3574            // Check for immediate fill
3575            let limit_px = order.price().expect("Stop limit order must have a price");
3576
3577            if self
3578                .core
3579                .is_limit_matched(order.order_side_specified(), limit_px)
3580            {
3581                order.set_liquidity_side(LiquiditySide::Taker);
3582
3583                if let Err(e) = self
3584                    .cache
3585                    .borrow_mut()
3586                    .add_order(order.clone(), None, None, false)
3587                {
3588                    log::debug!("Order already in cache: {e}");
3589                }
3590                self.fill_limit_order(order.client_order_id());
3591            }
3592
3593            // Order was triggered (and possibly filled), don't accept again
3594            return;
3595        }
3596
3597        self.accept_order(order);
3598
3599        // Add passive order to cache for later modify/cancel operations
3600        order.set_liquidity_side(LiquiditySide::Maker);
3601
3602        if let Err(e) = self
3603            .cache
3604            .borrow_mut()
3605            .add_order(order.clone(), None, None, false)
3606        {
3607            log::debug!("Order already in cache: {e}");
3608        }
3609    }
3610
3611    fn process_market_if_touched_order(&mut self, order: &mut OrderAny) {
3612        if self
3613            .core
3614            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
3615        {
3616            if self.config.reject_stop_orders {
3617                self.generate_order_rejected(
3618                    order,
3619                    format!(
3620                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3621                        order.order_type(),
3622                        order.order_side(),
3623                        order.trigger_price().unwrap(),
3624                        self.core
3625                            .bid
3626                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3627                        self.core
3628                            .ask
3629                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3630                    ).into(),
3631                );
3632                return;
3633            }
3634
3635            if let Err(e) = self
3636                .cache
3637                .borrow_mut()
3638                .add_order(order.clone(), None, None, false)
3639            {
3640                log::debug!("Order already in cache: {e}");
3641            }
3642            self.fill_market_order(order.client_order_id());
3643            return;
3644        }
3645
3646        // Order is valid and accepted
3647        self.accept_order(order);
3648
3649        // Add passive order to cache for later modify/cancel operations
3650        order.set_liquidity_side(LiquiditySide::Maker);
3651
3652        if let Err(e) = self
3653            .cache
3654            .borrow_mut()
3655            .add_order(order.clone(), None, None, false)
3656        {
3657            log::debug!("Order already in cache: {e}");
3658        }
3659    }
3660
3661    fn process_limit_if_touched_order(&mut self, order: &mut OrderAny) {
3662        if self
3663            .core
3664            .is_touch_triggered(order.order_side_specified(), order.trigger_price().unwrap())
3665        {
3666            if self.config.reject_stop_orders {
3667                self.generate_order_rejected(
3668                    order,
3669                    format!(
3670                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3671                        order.order_type(),
3672                        order.order_side(),
3673                        order.trigger_price().unwrap(),
3674                        self.core
3675                            .bid
3676                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3677                        self.core
3678                            .ask
3679                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3680                    ).into(),
3681                );
3682                return;
3683            }
3684            self.accept_order(order);
3685            self.generate_order_triggered(order);
3686
3687            // Check if immediate marketable
3688            if self
3689                .core
3690                .is_limit_matched(order.order_side_specified(), order.price().unwrap())
3691            {
3692                order.set_liquidity_side(LiquiditySide::Taker);
3693
3694                if let Err(e) = self
3695                    .cache
3696                    .borrow_mut()
3697                    .add_order(order.clone(), None, None, false)
3698                {
3699                    log::debug!("Order already in cache: {e}");
3700                }
3701                self.fill_limit_order(order.client_order_id());
3702            }
3703            return;
3704        }
3705
3706        // Order is valid and accepted
3707        self.accept_order(order);
3708
3709        // Add passive order to cache for later modify/cancel operations
3710        order.set_liquidity_side(LiquiditySide::Maker);
3711
3712        if let Err(e) = self
3713            .cache
3714            .borrow_mut()
3715            .add_order(order.clone(), None, None, false)
3716        {
3717            log::debug!("Order already in cache: {e}");
3718        }
3719    }
3720
3721    fn process_trailing_stop_order(&mut self, order: &mut OrderAny) {
3722        if let Some(trigger_price) = order.trigger_price()
3723            && self
3724                .core
3725                .is_stop_matched(order.order_side_specified(), trigger_price)
3726        {
3727            self.generate_order_rejected(
3728                    order,
3729                    format!(
3730                        "{} {} order trigger px of {} was in the market: bid={}, ask={}, but rejected because of configuration",
3731                        order.order_type(),
3732                        order.order_side(),
3733                        trigger_price,
3734                        self.core
3735                            .bid
3736                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
3737                        self.core
3738                            .ask
3739                            .map_or_else(|| "None".to_string(), |p| p.to_string())
3740                    ).into(),
3741                );
3742            return;
3743        }
3744
3745        // Set Maker before `accept_order` so trail-on-accept's cache write
3746        // captures it (a later `set_liquidity_side` would be dropped by the
3747        // `add_order` no-op below).
3748        order.set_liquidity_side(LiquiditySide::Maker);
3749
3750        self.accept_order(order);
3751
3752        if let Err(e) = self
3753            .cache
3754            .borrow_mut()
3755            .add_order(order.clone(), None, None, false)
3756        {
3757            log::debug!("Order already in cache: {e}");
3758        }
3759    }
3760
3761    /// Iterate the matching engine by processing the bid and ask order sides
3762    /// and advancing time up to the given UNIX `timestamp_ns`.
3763    ///
3764    /// The `aggressor_side` parameter is used for trade execution processing.
3765    /// When not `NoAggressor`, the book-based bid/ask reset is skipped to preserve
3766    /// transient trade price overrides.
3767    pub fn iterate(&mut self, timestamp_ns: UnixNanos, aggressor_side: AggressorSide) {
3768        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
3769        self.purge_closed_cached_filled_qty();
3770
3771        // Only reset bid/ask from book when not processing trade execution
3772        // (preserves transient trade price override for L2/L3 books). The
3773        // `last_trade_size` gate covers the no-aggressor trade-tick path
3774        // where `process_trade_tick` overrides both sides to the trade
3775        // price; without it the override is undone here.
3776        if aggressor_side == AggressorSide::NoAggressor && self.last_trade_size.is_none() {
3777            if let Some(bid) = self.book.best_bid_price() {
3778                self.core.set_bid_raw(bid);
3779            }
3780
3781            if let Some(ask) = self.book.best_ask_price() {
3782                self.core.set_ask_raw(ask);
3783            }
3784        }
3785
3786        // Process bid actions before snapshotting asks so cross-side
3787        // contingencies (OCO/OUO) mutate state between sides
3788        for action in self.core.iterate_bids() {
3789            match action {
3790                MatchAction::FillLimit(id) => self.fill_limit_order(id),
3791                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
3792            }
3793        }
3794
3795        for action in self.core.iterate_asks() {
3796            match action {
3797                MatchAction::FillLimit(id) => self.fill_limit_order(id),
3798                MatchAction::TriggerStop(id) => self.trigger_stop_order(id),
3799            }
3800        }
3801
3802        let order_ids: Vec<ClientOrderId> =
3803            self.core.iter_orders().map(|m| m.client_order_id).collect();
3804
3805        for client_order_id in order_ids {
3806            let order = match self
3807                .cache
3808                .borrow()
3809                .order(&client_order_id)
3810                .map(|o| o.clone())
3811            {
3812                Some(order) => order,
3813                None => continue,
3814            };
3815
3816            if order.is_closed() {
3817                let _ = self.core.delete_order(client_order_id);
3818                self.cached_filled_qty.swap_remove(&client_order_id);
3819                continue;
3820            }
3821
3822            if self.config.support_gtd_orders
3823                && let Some(expire_ns) = order.expire_time()
3824                && timestamp_ns >= expire_ns
3825            {
3826                let _ = self.core.delete_order(client_order_id);
3827                self.cached_filled_qty.swap_remove(&client_order_id);
3828                self.expire_order(&order);
3829                continue;
3830            }
3831
3832            if matches!(
3833                order.order_type(),
3834                OrderType::TrailingStopMarket | OrderType::TrailingStopLimit
3835            ) {
3836                let mut any = order;
3837                if self.maybe_activate_trailing_stop(
3838                    &mut any,
3839                    self.core.bid,
3840                    self.core.ask,
3841                    self.core.last,
3842                ) {
3843                    self.update_trailing_stop_order(&any);
3844                    self.resync_core_entry(client_order_id);
3845                }
3846            }
3847
3848            // Single-shot: only the first order after a trigger fill sees
3849            // the mutated core; the restore clears the override here.
3850            if self.target_bid.is_some() || self.target_ask.is_some() || self.target_last.is_some()
3851            {
3852                if let Some(t) = self.target_bid.take() {
3853                    self.core.bid = Some(t);
3854                }
3855
3856                if let Some(t) = self.target_ask.take() {
3857                    self.core.ask = Some(t);
3858                }
3859
3860                if let Some(t) = self.target_last.take() {
3861                    self.core.last = Some(t);
3862                }
3863            }
3864        }
3865
3866        // Fallback for when the per-order loop hit no eligible order (e.g.,
3867        // all closed by the matching pass) so the fill override on
3868        // `core.last` cannot leak into the next iterate.
3869        if let Some(t) = self.target_bid.take() {
3870            self.core.bid = Some(t);
3871        }
3872
3873        if let Some(t) = self.target_ask.take() {
3874            self.core.ask = Some(t);
3875        }
3876
3877        if let Some(t) = self.target_last.take() {
3878            self.core.last = Some(t);
3879        }
3880
3881        // Restore core bid/ask to book values after iteration
3882        // (during trade execution, transient override was used for matching)
3883        self.core.bid = self.book.best_bid_price();
3884        self.core.ask = self.book.best_ask_price();
3885
3886        // Process instrument expiration last so orders at the expiration tick
3887        // get a chance to fill before positions are closed.
3888        self.check_instrument_expiration(timestamp_ns);
3889        self.purge_closed_cached_filled_qty();
3890    }
3891
3892    fn get_trailing_activation_price(
3893        &self,
3894        trigger_type: TriggerType,
3895        order_side: OrderSide,
3896        bid: Option<Price>,
3897        ask: Option<Price>,
3898        last: Option<Price>,
3899    ) -> Option<Price> {
3900        match trigger_type {
3901            TriggerType::LastPrice => last,
3902            TriggerType::LastOrBidAsk => last.or(match order_side {
3903                OrderSide::Buy => ask,
3904                OrderSide::Sell => bid,
3905                _ => None,
3906            }),
3907            // Default, BidAsk, DoubleBidAsk, DoubleLastPrice, IndexPrice, MarkPrice
3908            _ => match order_side {
3909                OrderSide::Buy => ask,
3910                OrderSide::Sell => bid,
3911                _ => None,
3912            },
3913        }
3914    }
3915
3916    fn maybe_activate_trailing_stop(
3917        &self,
3918        order: &mut OrderAny,
3919        bid: Option<Price>,
3920        ask: Option<Price>,
3921        last: Option<Price>,
3922    ) -> bool {
3923        match order {
3924            OrderAny::TrailingStopMarket(inner) => {
3925                if inner.is_activated {
3926                    return true;
3927                }
3928
3929                if inner.activation_price.is_none() {
3930                    let px = self.get_trailing_activation_price(
3931                        inner.trigger_type,
3932                        inner.order_side(),
3933                        bid,
3934                        ask,
3935                        last,
3936                    );
3937
3938                    if let Some(p) = px {
3939                        inner.activation_price = Some(p);
3940                        inner.set_activated();
3941
3942                        if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3943                            log::error!("Failed to update order: {e}");
3944                        }
3945                        return true;
3946                    }
3947                    return false;
3948                }
3949
3950                let activation_price = inner.activation_price.unwrap();
3951                let hit = match inner.order_side() {
3952                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
3953                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
3954                    _ => false,
3955                };
3956
3957                if hit {
3958                    inner.set_activated();
3959
3960                    if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3961                        log::error!("Failed to update order: {e}");
3962                    }
3963                }
3964                hit
3965            }
3966            OrderAny::TrailingStopLimit(inner) => {
3967                if inner.is_activated {
3968                    return true;
3969                }
3970
3971                if inner.activation_price.is_none() {
3972                    let px = self.get_trailing_activation_price(
3973                        inner.trigger_type,
3974                        inner.order_side(),
3975                        bid,
3976                        ask,
3977                        last,
3978                    );
3979
3980                    if let Some(p) = px {
3981                        inner.activation_price = Some(p);
3982                        inner.set_activated();
3983
3984                        if let Err(e) = self.cache.borrow_mut().replace_order(order) {
3985                            log::error!("Failed to update order: {e}");
3986                        }
3987                        return true;
3988                    }
3989                    return false;
3990                }
3991
3992                let activation_price = inner.activation_price.unwrap();
3993                let hit = match inner.order_side() {
3994                    OrderSide::Buy => ask.is_some_and(|a| a <= activation_price),
3995                    OrderSide::Sell => bid.is_some_and(|b| b >= activation_price),
3996                    _ => false,
3997                };
3998
3999                if hit {
4000                    inner.set_activated();
4001
4002                    if let Err(e) = self.cache.borrow_mut().replace_order(order) {
4003                        log::error!("Failed to update order: {e}");
4004                    }
4005                }
4006                hit
4007            }
4008            _ => true,
4009        }
4010    }
4011
4012    fn determine_limit_price_and_volume(&mut self, order: &OrderAny) -> Vec<(Price, Quantity)> {
4013        match order.price() {
4014            Some(order_price) => {
4015                // When liquidity consumption is enabled, get ALL crossed levels so that
4016                // consumed levels can be filtered out while still finding valid ones.
4017                // Otherwise simulate_fills only returns enough levels to satisfy leaves_qty,
4018                // which may all be consumed, missing other valid crossed levels.
4019                let mut fills = if self.config.liquidity_consumption {
4020                    let size_prec = self.instrument.size_precision();
4021                    self.book
4022                        .get_all_crossed_levels(order.order_side(), order_price, size_prec)
4023                } else {
4024                    let book_order =
4025                        BookOrder::new(order.order_side(), order_price, order.quantity(), 1);
4026                    self.book.simulate_fills(&book_order)
4027                };
4028
4029                // Trade execution: use trade-driven fill when book doesn't reflect trade price
4030                if let Some(trade_size) = self.last_trade_size
4031                    && let Some(trade_price) = self.core.last
4032                {
4033                    let fills_at_trade_price = fills.iter().any(|(px, _)| *px == trade_price);
4034
4035                    if !fills_at_trade_price
4036                        && self
4037                            .core
4038                            .is_limit_matched(order.order_side_specified(), order_price)
4039                    {
4040                        // Fill model check for MAKER at limit is already handled in fill_limit_order,
4041                        // don't re-check here to avoid calling is_limit_filled() twice (p² probability).
4042                        let leaves_qty = order.leaves_qty();
4043                        let available_qty = if self.config.liquidity_consumption {
4044                            let remaining = trade_size.raw.saturating_sub(self.trade_consumption);
4045                            Quantity::from_raw(remaining, trade_size.precision)
4046                        } else {
4047                            trade_size
4048                        };
4049
4050                        let fill_qty = min(leaves_qty, available_qty);
4051
4052                        if !fill_qty.is_zero() {
4053                            log::debug!(
4054                                "Trade execution fill: {} @ {} (trade_price={}, available: {}, book had {} fills)",
4055                                fill_qty,
4056                                order_price,
4057                                trade_price,
4058                                available_qty,
4059                                fills.len()
4060                            );
4061
4062                            if self.config.liquidity_consumption {
4063                                self.trade_consumption += fill_qty.raw;
4064                            }
4065
4066                            // Fill at the limit price (conservative) rather than the trade price.
4067                            // Trade execution fills already account for consumption via trade_consumption,
4068                            // return early to bypass apply_liquidity_consumption which would incorrectly
4069                            // discard these fills when the trade price isn't in the order book.
4070                            return vec![(order_price, fill_qty)];
4071                        }
4072                    }
4073                }
4074
4075                // Return immediately if no fills
4076                if fills.is_empty() {
4077                    return fills;
4078                }
4079
4080                // Save original book prices BEFORE any fill price modifications for consumption tracking,
4081                // since the TAKER and MAKER loops below may adjust fill prices. Consumption should be
4082                // tracked against the original book price levels where liquidity was sourced from.
4083                let book_prices: Vec<Price> = if self.config.liquidity_consumption {
4084                    fills.iter().map(|(px, _)| *px).collect()
4085                } else {
4086                    Vec::new()
4087                };
4088                let book_prices_ref: Option<&[Price]> = if book_prices.is_empty() {
4089                    None
4090                } else {
4091                    Some(&book_prices)
4092                };
4093
4094                // check if trigger price exists
4095                if let Some(triggered_price) = order.trigger_price() {
4096                    // Filling as TAKER from trigger
4097                    if order
4098                        .liquidity_side()
4099                        .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Taker)
4100                    {
4101                        if order.order_side() == OrderSide::Sell && order_price > triggered_price {
4102                            // manually change the fills index 0
4103                            let first_fill = fills.first().unwrap();
4104                            let triggered_qty = first_fill.1;
4105                            fills[0] = (triggered_price, triggered_qty);
4106                            self.target_bid = self.core.bid;
4107                            self.target_ask = self.core.ask;
4108                            self.target_last = self.core.last;
4109                            self.core.set_ask_raw(order_price);
4110                            self.core.set_last_raw(order_price);
4111                        } else if order.order_side() == OrderSide::Buy
4112                            && order_price < triggered_price
4113                        {
4114                            // manually change the fills index 0
4115                            let first_fill = fills.first().unwrap();
4116                            let triggered_qty = first_fill.1;
4117                            fills[0] = (triggered_price, triggered_qty);
4118                            self.target_bid = self.core.bid;
4119                            self.target_ask = self.core.ask;
4120                            self.target_last = self.core.last;
4121                            self.core.set_bid_raw(order_price);
4122                            self.core.set_last_raw(order_price);
4123                        }
4124                    }
4125                }
4126
4127                // Filling as MAKER from trigger
4128                if order
4129                    .liquidity_side()
4130                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
4131                {
4132                    match order.order_side().as_specified() {
4133                        OrderSideSpecified::Buy => {
4134                            let target_price = if order
4135                                .trigger_price()
4136                                .is_some_and(|trigger_price| order_price > trigger_price)
4137                            {
4138                                order.trigger_price().unwrap()
4139                            } else {
4140                                order_price
4141                            };
4142
4143                            for fill in &mut fills {
4144                                let last_px = fill.0;
4145                                if last_px < order_price {
4146                                    // Marketable BUY would have filled at limit
4147                                    self.target_bid = self.core.bid;
4148                                    self.target_ask = self.core.ask;
4149                                    self.target_last = self.core.last;
4150                                    self.core.set_ask_raw(target_price);
4151                                    self.core.set_last_raw(target_price);
4152                                    fill.0 = target_price;
4153                                }
4154                            }
4155                        }
4156                        OrderSideSpecified::Sell => {
4157                            let target_price = if order
4158                                .trigger_price()
4159                                .is_some_and(|trigger_price| order_price < trigger_price)
4160                            {
4161                                order.trigger_price().unwrap()
4162                            } else {
4163                                order_price
4164                            };
4165
4166                            for fill in &mut fills {
4167                                let last_px = fill.0;
4168                                if last_px > order_price {
4169                                    // Marketable SELL would have filled at limit
4170                                    self.target_bid = self.core.bid;
4171                                    self.target_ask = self.core.ask;
4172                                    self.target_last = self.core.last;
4173                                    self.core.set_bid_raw(target_price);
4174                                    self.core.set_last_raw(target_price);
4175                                    fill.0 = target_price;
4176                                }
4177                            }
4178                        }
4179                    }
4180                }
4181
4182                self.apply_liquidity_consumption(
4183                    fills,
4184                    order.order_side(),
4185                    order.leaves_qty(),
4186                    book_prices_ref,
4187                )
4188            }
4189            None => panic!("Limit order must have a price"),
4190        }
4191    }
4192
4193    fn determine_market_price_and_volume(&self, order: &OrderAny) -> Vec<(Price, Quantity)> {
4194        let price = match order.order_side().as_specified() {
4195            OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
4196            OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
4197        };
4198
4199        // When liquidity consumption is enabled, get ALL crossed levels so that
4200        // consumed levels can be filtered out while still finding valid ones.
4201        let mut fills = if self.config.liquidity_consumption {
4202            let size_prec = self.instrument.size_precision();
4203            self.book
4204                .get_all_crossed_levels(order.order_side(), price, size_prec)
4205        } else {
4206            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
4207            self.book.simulate_fills(&book_order)
4208        };
4209
4210        // For stop market and market-if-touched orders during bar H/L/C processing, fill at trigger price
4211        // (market moved through the trigger). For gaps/immediate triggers, fill at market.
4212        if !self.fill_at_market
4213            && self.book_type == BookType::L1_MBP
4214            && !fills.is_empty()
4215            && matches!(
4216                order.order_type(),
4217                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
4218            )
4219            && let Some(trigger_price) = order.trigger_price()
4220        {
4221            fills[0] = (trigger_price, fills[0].1);
4222
4223            // Skip liquidity consumption for trigger price fills (gap price may not exist in book).
4224            let mut remaining_qty = order.leaves_qty().raw;
4225            let mut capped_fills = Vec::with_capacity(fills.len());
4226
4227            for (price, qty) in fills {
4228                if remaining_qty == 0 {
4229                    break;
4230                }
4231
4232                let capped_qty_raw = min(qty.raw, remaining_qty);
4233                if capped_qty_raw == 0 {
4234                    continue;
4235                }
4236
4237                remaining_qty -= capped_qty_raw;
4238                capped_fills.push((price, Quantity::from_raw(capped_qty_raw, qty.precision)));
4239            }
4240
4241            return capped_fills;
4242        }
4243
4244        fills
4245    }
4246
4247    fn determine_market_fill_model_price_and_volume(
4248        &mut self,
4249        order: &OrderAny,
4250    ) -> (Vec<(Price, Quantity)>, bool) {
4251        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
4252            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
4253                &self.instrument,
4254                order,
4255                best_bid,
4256                best_ask,
4257            )
4258        {
4259            let price = match order.order_side().as_specified() {
4260                OrderSideSpecified::Buy => Price::max(FIXED_PRECISION),
4261                OrderSideSpecified::Sell => Price::min(FIXED_PRECISION),
4262            };
4263            let book_order = BookOrder::new(order.order_side(), price, order.quantity(), 0);
4264            let fills = book.simulate_fills(&book_order);
4265            if !fills.is_empty() {
4266                return (fills, true);
4267            }
4268        }
4269        (self.determine_market_price_and_volume(order), false)
4270    }
4271
4272    fn determine_limit_fill_model_price_and_volume(
4273        &mut self,
4274        order: &OrderAny,
4275    ) -> Vec<(Price, Quantity)> {
4276        if let (Some(best_bid), Some(best_ask)) = (self.core.bid, self.core.ask)
4277            && let Some(book) = self.fill_model.get_orderbook_for_fill_simulation(
4278                &self.instrument,
4279                order,
4280                best_bid,
4281                best_ask,
4282            )
4283            && let Some(limit_price) = order.price()
4284        {
4285            let book_order = BookOrder::new(order.order_side(), limit_price, order.quantity(), 0);
4286            let fills = book.simulate_fills(&book_order);
4287            if !fills.is_empty() {
4288                return fills;
4289            }
4290        }
4291        self.determine_limit_price_and_volume(order)
4292    }
4293
4294    /// Fills a market order against the current order book.
4295    ///
4296    /// The order is filled as a taker against available liquidity.
4297    /// Reduce-only orders are canceled if no position exists.
4298    pub fn fill_market_order(&mut self, client_order_id: ClientOrderId) {
4299        let mut order = match self
4300            .cache
4301            .borrow()
4302            .order(&client_order_id)
4303            .map(|o| o.clone())
4304        {
4305            Some(order) => order,
4306            None => {
4307                log::error!("Cannot fill market order: order {client_order_id} not found in cache");
4308                return;
4309            }
4310        };
4311
4312        if order.is_closed() {
4313            self.purge_stale_core_entry(client_order_id);
4314            return;
4315        }
4316
4317        // Convert quote-denominated quantity at fill time for trigger-style market
4318        // orders that skipped conversion at submission. Idempotent: orders already
4319        // converted have `is_quote_quantity == false`.
4320        if order.is_quote_quantity()
4321            && !self.instrument.is_inverse()
4322            && !self.convert_quote_to_base_quantity(&mut order)
4323        {
4324            return;
4325        }
4326
4327        if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id())
4328            && filled_qty >= &order.quantity()
4329        {
4330            log::debug!(
4331                "Ignoring fill as already filled pending application of events: {:?}, {:?}, {:?}, {:?}",
4332                filled_qty,
4333                order.quantity(),
4334                order.filled_qty(),
4335                order.quantity()
4336            );
4337            return;
4338        }
4339
4340        let venue_position_id = self.ids_generator.get_position_id(&order, Some(true));
4341        let position: Option<Position> = if let Some(venue_position_id) = venue_position_id {
4342            let cache = self.cache.as_ref().borrow();
4343            cache.position_owned(&venue_position_id)
4344        } else {
4345            None
4346        };
4347
4348        if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
4349            log::warn!(
4350                "Canceling REDUCE_ONLY {} as would increase position",
4351                order.order_type()
4352            );
4353            self.cancel_order(&order, None);
4354            return;
4355        }
4356
4357        order.set_liquidity_side(LiquiditySide::Taker);
4358        let (mut fills, from_synthetic) = self.determine_market_fill_model_price_and_volume(&order);
4359
4360        // Apply protection price filtering at fill time (trigger-time semantics for stops)
4361        let protection_price: Option<Price> = if let Some(protection_points) =
4362            self.config.price_protection_points
4363            && matches!(
4364                order.order_type(),
4365                OrderType::Market | OrderType::StopMarket
4366            ) {
4367            protection_price_calculate(
4368                self.instrument.price_increment(),
4369                &order,
4370                protection_points,
4371                self.core.bid,
4372                self.core.ask,
4373            )
4374            .ok()
4375        } else {
4376            None
4377        };
4378
4379        if let Some(protection_price) = protection_price {
4380            fills = self.filter_fills_by_protection(fills, &order, protection_price);
4381        }
4382
4383        // Skip consumption for synthetic fill-model books (prices may not exist
4384        // in the real book) and trigger price fills (gap price may not exist)
4385        let is_trigger_price_fill = !self.fill_at_market
4386            && self.book_type == BookType::L1_MBP
4387            && matches!(
4388                order.order_type(),
4389                OrderType::StopMarket | OrderType::TrailingStopMarket | OrderType::MarketIfTouched
4390            )
4391            && order.trigger_price().is_some();
4392
4393        if !from_synthetic && !is_trigger_price_fill {
4394            fills = self.apply_liquidity_consumption(
4395                fills,
4396                order.order_side(),
4397                order.leaves_qty(),
4398                None,
4399            );
4400        }
4401
4402        self.apply_fills(
4403            &order,
4404            &fills,
4405            LiquiditySide::Taker,
4406            None,
4407            position.as_ref(),
4408            protection_price,
4409        );
4410    }
4411
4412    fn filter_fills_by_protection(
4413        &self,
4414        fills: Vec<(Price, Quantity)>,
4415        order: &OrderAny,
4416        protection_price: Price,
4417    ) -> Vec<(Price, Quantity)> {
4418        let protection_raw = protection_price.raw;
4419        fills
4420            .into_iter()
4421            .filter(|(fill_price, _)| {
4422                match order.order_side() {
4423                    // BUY: only fill at prices <= protection_price
4424                    OrderSide::Buy => fill_price.raw <= protection_raw,
4425                    // SELL: only fill at prices >= protection_price
4426                    OrderSide::Sell => fill_price.raw >= protection_raw,
4427                    OrderSide::NoOrderSide => false,
4428                }
4429            })
4430            .collect()
4431    }
4432
4433    /// Attempts to fill a limit order against the current order book.
4434    ///
4435    /// Determines fill prices and quantities based on available liquidity,
4436    /// then applies the fills to the order.
4437    ///
4438    /// # Panics
4439    ///
4440    /// Panics if the order has no price (design error).
4441    pub fn fill_limit_order(&mut self, client_order_id: ClientOrderId) {
4442        let mut order = match self
4443            .cache
4444            .borrow()
4445            .order(&client_order_id)
4446            .map(|o| o.clone())
4447        {
4448            Some(order) => order,
4449            None => {
4450                log::error!("Cannot fill limit order: order {client_order_id} not found in cache");
4451                return;
4452            }
4453        };
4454
4455        if order.is_closed() {
4456            self.purge_stale_core_entry(client_order_id);
4457            return;
4458        }
4459
4460        // Convert quote-denominated quantity at fill time for orders that entered
4461        // this path still carrying a quote notional (e.g. trailing-stop-limit with
4462        // a late-assigned price). Idempotent for already-converted orders.
4463        if order.is_quote_quantity()
4464            && !self.instrument.is_inverse()
4465            && !self.convert_quote_to_base_quantity(&mut order)
4466        {
4467            return;
4468        }
4469
4470        match order.price() {
4471            Some(order_price) => {
4472                let cached_filled_qty = self.cached_filled_qty.get(&order.client_order_id());
4473                if let Some(&qty) = cached_filled_qty
4474                    && qty >= order.quantity()
4475                {
4476                    log::debug!(
4477                        "Ignoring fill as already filled pending application of events: {}, {}, {}, {}",
4478                        qty,
4479                        order.quantity(),
4480                        order.filled_qty(),
4481                        order.leaves_qty(),
4482                    );
4483                    return;
4484                }
4485
4486                // Check fill model for MAKER orders at the limit price
4487                if order
4488                    .liquidity_side()
4489                    .is_some_and(|liquidity_side| liquidity_side == LiquiditySide::Maker)
4490                {
4491                    // For trade execution: check if trade price equals order price
4492                    // For quote updates: check if bid/ask equals order price
4493                    let at_limit = if self.last_trade_size.is_some() && self.core.last.is_some() {
4494                        self.core.last.is_some_and(|last| last == order_price)
4495                    } else if order.order_side() == OrderSide::Buy {
4496                        self.core.bid.is_some_and(|bid| bid == order_price)
4497                    } else {
4498                        self.core.ask.is_some_and(|ask| ask == order_price)
4499                    };
4500
4501                    if at_limit && !self.fill_model.is_limit_filled() {
4502                        return; // Not filled (simulates queue position)
4503                    }
4504                }
4505
4506                let queue_allowed_raw = if self.config.queue_position {
4507                    match self.determine_trade_fill_qty(&order) {
4508                        None | Some(0) => {
4509                            if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc)
4510                            {
4511                                self.cancel_order(&order, None);
4512                            }
4513                            return;
4514                        }
4515                        Some(allowed) => Some(allowed),
4516                    }
4517                } else {
4518                    None
4519                };
4520
4521                let venue_position_id = self.ids_generator.get_position_id(&order, None);
4522                let position = if let Some(venue_position_id) = venue_position_id {
4523                    let cache = self.cache.as_ref().borrow();
4524                    cache.position_owned(&venue_position_id)
4525                } else {
4526                    None
4527                };
4528
4529                if self.config.use_reduce_only && order.is_reduce_only() && position.is_none() {
4530                    log::warn!(
4531                        "Canceling REDUCE_ONLY {} as would increase position",
4532                        order.order_type()
4533                    );
4534                    self.cancel_order(&order, None);
4535                    return;
4536                }
4537
4538                let tc_before = self.trade_consumption;
4539                let mut fills = self.determine_limit_fill_model_price_and_volume(&order);
4540
4541                if let Some(allowed_raw) = queue_allowed_raw {
4542                    let size_prec = self.instrument.size_precision();
4543                    let mut remaining = allowed_raw;
4544                    fills = fills
4545                        .into_iter()
4546                        .filter_map(|(price, qty)| {
4547                            if remaining == 0 {
4548                                return None;
4549                            }
4550                            let capped = qty.raw.min(remaining);
4551                            remaining -= capped;
4552                            Some((price, Quantity::from_raw(capped, size_prec)))
4553                        })
4554                        .collect();
4555
4556                    // Consume excess and reconcile trade budget after capping
4557                    let consumed: QuantityRaw = fills.iter().map(|(_, qty)| qty.raw).sum();
4558
4559                    if let Some(excess) = self.queue_excess.get_mut(&order.client_order_id()) {
4560                        *excess = excess.saturating_sub(consumed);
4561                    }
4562                    self.trade_consumption = tc_before + consumed;
4563                }
4564
4565                // Skip apply_fills when consumed-liquidity adjustment produces no fills.
4566                // This occurs for partially filled orders when an unrelated delta arrives
4567                // and no new liquidity is available at the order's price level.
4568                if fills.is_empty() && self.config.liquidity_consumption {
4569                    log::debug!(
4570                        "Skipping fill for {}: no liquidity available after consumption",
4571                        order.client_order_id()
4572                    );
4573
4574                    if matches!(order.time_in_force(), TimeInForce::Fok | TimeInForce::Ioc) {
4575                        self.cancel_order(&order, None);
4576                    }
4577
4578                    return;
4579                }
4580
4581                let liquidity_side = order.liquidity_side().unwrap();
4582                self.apply_fills(
4583                    &order,
4584                    &fills,
4585                    liquidity_side,
4586                    venue_position_id,
4587                    position.as_ref(),
4588                    None,
4589                );
4590            }
4591            None => panic!("Limit order must have a price"),
4592        }
4593    }
4594
4595    fn apply_fills(
4596        &mut self,
4597        order: &OrderAny,
4598        fills: &[(Price, Quantity)],
4599        liquidity_side: LiquiditySide,
4600        venue_position_id: Option<PositionId>,
4601        position: Option<&Position>,
4602        protection_price: Option<Price>,
4603    ) {
4604        if order.time_in_force() == TimeInForce::Fok {
4605            let mut total_size = Quantity::zero(order.quantity().precision);
4606
4607            for &(fill_px, fill_qty) in fills {
4608                if self
4609                    .normalize_price_for_current_instrument(fill_px)
4610                    .is_some()
4611                    && let Some(fill_qty) = self.normalize_quantity_for_current_instrument(fill_qty)
4612                {
4613                    total_size = total_size.add(fill_qty);
4614                }
4615            }
4616
4617            if order.leaves_qty() > total_size {
4618                self.cancel_order(order, None);
4619                return;
4620            }
4621        }
4622
4623        if fills.is_empty() {
4624            if order.status() == OrderStatus::Submitted {
4625                self.generate_order_rejected(
4626                    order,
4627                    format!("No market for {}", order.instrument_id()).into(),
4628                );
4629            } else {
4630                log::error!(
4631                    "Cannot fill order: no fills from book when fills were expected (check size in data)"
4632                );
4633                return;
4634            }
4635        }
4636
4637        // For netting mode, don't use venue position ID (use None instead)
4638        let venue_position_id = if self.oms_type == OmsType::Netting {
4639            None
4640        } else {
4641            venue_position_id
4642        };
4643
4644        let mut initial_market_to_limit_fill = false;
4645        let mut total_filled = self
4646            .cached_filled_qty
4647            .get(&order.client_order_id())
4648            .copied()
4649            .unwrap_or_else(|| order.filled_qty());
4650        let initial_total_filled = total_filled;
4651        let mut last_fill_px: Option<Price> = None;
4652
4653        for &(fill_px, fill_qty) in fills {
4654            let Some(mut fill_px) = self.normalize_fill_price(fill_px, order.client_order_id())
4655            else {
4656                continue;
4657            };
4658
4659            let Some(fill_qty) = self.normalize_fill_quantity(fill_qty, order.client_order_id())
4660            else {
4661                continue;
4662            };
4663
4664            if order.filled_qty() == Quantity::zero(order.filled_qty().precision)
4665                && order.order_type() == OrderType::MarketToLimit
4666            {
4667                self.generate_order_updated(order, order.quantity(), Some(fill_px), None, None);
4668                initial_market_to_limit_fill = true;
4669            }
4670
4671            if self.book_type == BookType::L1_MBP && self.fill_model.is_slipped() {
4672                fill_px = match order.order_side().as_specified() {
4673                    OrderSideSpecified::Buy => fill_px.add(self.instrument.price_increment()),
4674                    OrderSideSpecified::Sell => fill_px.sub(self.instrument.price_increment()),
4675                }
4676            }
4677
4678            // Check reduce only order
4679            // If the incoming simulated fill would exceed the position when reduce-only is honored,
4680            // clamp the effective fill size to the adjusted (remaining position) quantity.
4681            let mut effective_fill_qty = fill_qty;
4682
4683            if self.config.use_reduce_only
4684                && order.is_reduce_only()
4685                && let Some(position) = &position
4686                && fill_qty > position.quantity
4687            {
4688                if position.quantity == Quantity::zero(position.quantity.precision) {
4689                    // Done
4690                    return;
4691                }
4692
4693                // Adjusted target quantity equals the remaining position size
4694                let adjusted_fill_qty =
4695                    Quantity::from_raw(position.quantity.raw, fill_qty.precision);
4696
4697                // Determine the effective fill size for this iteration first
4698                effective_fill_qty = min(effective_fill_qty, adjusted_fill_qty);
4699
4700                // Only emit an update if the order quantity actually changes
4701                if order.quantity() != adjusted_fill_qty {
4702                    self.generate_order_updated(order, adjusted_fill_qty, None, None, None);
4703                }
4704            }
4705
4706            if fill_qty.is_zero() {
4707                if fills.len() == 1 && order.status() == OrderStatus::Submitted {
4708                    self.generate_order_rejected(
4709                        order,
4710                        format!("No market for {}", order.instrument_id()).into(),
4711                    );
4712                }
4713                return;
4714            }
4715
4716            // Mirror `fill_order`'s leaves cap
4717            let capped_fill_qty = min(
4718                effective_fill_qty,
4719                order.quantity().saturating_sub(total_filled),
4720            );
4721            total_filled = total_filled.add(capped_fill_qty);
4722
4723            self.fill_order(
4724                order,
4725                fill_px,
4726                effective_fill_qty,
4727                liquidity_side,
4728                venue_position_id,
4729                position,
4730            );
4731            last_fill_px = Some(fill_px);
4732
4733            if order.order_type() == OrderType::MarketToLimit && initial_market_to_limit_fill {
4734                // Filled initial level
4735                return;
4736            }
4737        }
4738
4739        let leaves_remaining = total_filled < order.quantity();
4740        let filled_in_loop = total_filled > initial_total_filled;
4741
4742        if order.time_in_force() == TimeInForce::Ioc && leaves_remaining {
4743            self.cancel_order(order, None);
4744            return;
4745        }
4746
4747        // `filled_in_loop` covers the just-partially-filled case where the
4748        // local clone's status has not seen the fill events yet.
4749        if leaves_remaining
4750            && (order.is_open() || filled_in_loop)
4751            && self.book_type == BookType::L1_MBP
4752            && matches!(
4753                order.order_type(),
4754                OrderType::Market
4755                    | OrderType::MarketIfTouched
4756                    | OrderType::StopMarket
4757                    | OrderType::TrailingStopMarket
4758            )
4759        {
4760            // Exhausted L1 volume: slip remainder by a single price increment
4761            let Some(last_fill_px) = last_fill_px else {
4762                return;
4763            };
4764
4765            let side = order.order_side().as_specified();
4766            let slip_fill_px = match side {
4767                OrderSideSpecified::Buy => last_fill_px.add(self.instrument.price_increment()),
4768                OrderSideSpecified::Sell => last_fill_px.sub(self.instrument.price_increment()),
4769            };
4770
4771            if let Some(protection_price) = protection_price {
4772                let exceeds_boundary = match side {
4773                    OrderSideSpecified::Buy => slip_fill_px.raw > protection_price.raw,
4774                    OrderSideSpecified::Sell => slip_fill_px.raw < protection_price.raw,
4775                };
4776
4777                if exceeds_boundary {
4778                    return;
4779                }
4780            }
4781
4782            let leaves_qty = order.quantity().saturating_sub(total_filled);
4783
4784            self.fill_order(
4785                order,
4786                slip_fill_px,
4787                leaves_qty,
4788                liquidity_side,
4789                venue_position_id,
4790                position,
4791            );
4792        }
4793    }
4794
4795    fn normalize_fill_price(
4796        &self,
4797        fill_px: Price,
4798        client_order_id: ClientOrderId,
4799    ) -> Option<Price> {
4800        let normalized = self.normalize_price_for_current_instrument(fill_px);
4801        if normalized.is_none() {
4802            log::warn!(
4803                "Skipping fill for {client_order_id}: fill price {fill_px} is not compatible \
4804                 with {} price_precision={} price_increment={}",
4805                self.instrument.id(),
4806                self.instrument.price_precision(),
4807                self.instrument.price_increment()
4808            );
4809        }
4810        normalized
4811    }
4812
4813    fn normalize_fill_quantity(
4814        &self,
4815        fill_qty: Quantity,
4816        client_order_id: ClientOrderId,
4817    ) -> Option<Quantity> {
4818        let normalized = self.normalize_quantity_for_current_instrument(fill_qty);
4819        if normalized.is_none() {
4820            log::warn!(
4821                "Skipping fill for {client_order_id}: fill quantity {fill_qty} is not compatible \
4822                 with {} size_precision={}",
4823                self.instrument.id(),
4824                self.instrument.size_precision()
4825            );
4826        }
4827        normalized
4828    }
4829
4830    fn fill_order(
4831        &mut self,
4832        order: &OrderAny,
4833        last_px: Price,
4834        last_qty: Quantity,
4835        liquidity_side: LiquiditySide,
4836        venue_position_id: Option<PositionId>,
4837        _position: Option<&Position>,
4838    ) {
4839        self.check_size_precision(last_qty.precision, "fill quantity")
4840            .unwrap();
4841
4842        let (last_qty, new_filled_qty) =
4843            if let Some(filled_qty) = self.cached_filled_qty.get(&order.client_order_id()) {
4844                let leaves_qty = order.quantity().saturating_sub(*filled_qty);
4845                let last_qty = min(last_qty, leaves_qty);
4846                (last_qty, *filled_qty + last_qty)
4847            } else {
4848                let last_qty = min(last_qty, order.quantity());
4849                (last_qty, last_qty)
4850            };
4851
4852        self.cached_filled_qty
4853            .insert(order.client_order_id(), new_filled_qty);
4854
4855        if last_qty.is_zero() {
4856            return;
4857        }
4858
4859        let underlying_px = self.fee_underlying_price();
4860        let commission = self
4861            .fee_model
4862            .get_commission_with_context(order, last_qty, last_px, &self.instrument, underlying_px)
4863            .unwrap_or_else(|e| {
4864                panic!(
4865                    "Failed to compute commission for {}: {}",
4866                    order.client_order_id(),
4867                    e
4868                );
4869            });
4870
4871        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
4872        self.generate_order_filled(
4873            order,
4874            venue_order_id,
4875            venue_position_id,
4876            last_qty,
4877            last_px,
4878            self.instrument.quote_currency(),
4879            commission,
4880            liquidity_side,
4881        );
4882
4883        let post_fill_filled_qty = self
4884            .cached_filled_qty
4885            .get(&order.client_order_id())
4886            .copied()
4887            .unwrap_or(order.filled_qty());
4888        let post_fill_leaves_qty = order.quantity().saturating_sub(post_fill_filled_qty);
4889        let fully_filled = post_fill_leaves_qty.is_zero();
4890
4891        if order.is_closed() || fully_filled {
4892            if self.core.order_exists(order.client_order_id()) {
4893                let _ = self.core.delete_order(order.client_order_id());
4894            }
4895            // MarketToLimit reads `cached_filled_qty` in its caller to compute leaves;
4896            // its own cleanup happens there after the read.
4897            if order.order_type() != OrderType::MarketToLimit {
4898                self.purge_cached_filled_qty_if_closed(order.client_order_id());
4899            }
4900        }
4901
4902        if !self.config.support_contingent_orders {
4903            return;
4904        }
4905
4906        if let Some(contingency_type) = order.contingency_type() {
4907            match contingency_type {
4908                ContingencyType::Oto => {
4909                    if let Some(linked_orders_ids) = order.linked_order_ids() {
4910                        for client_order_id in linked_orders_ids {
4911                            let mut child_order = match self.cache.borrow().order(client_order_id) {
4912                                Some(child_order) => child_order.clone(),
4913                                None => panic!("Order {client_order_id} not found in cache"),
4914                            };
4915
4916                            if child_order.is_closed() || child_order.is_active_local() {
4917                                continue;
4918                            }
4919
4920                            // Check if we need to index position id
4921                            if let (None, Some(position_id)) =
4922                                (child_order.position_id(), order.position_id())
4923                            {
4924                                self.cache
4925                                    .borrow_mut()
4926                                    .add_position_id(
4927                                        &position_id,
4928                                        &self.venue,
4929                                        client_order_id,
4930                                        &child_order.strategy_id(),
4931                                    )
4932                                    .unwrap();
4933                                log::debug!(
4934                                    "Added position id {position_id} to cache for order {client_order_id}"
4935                                );
4936                            }
4937
4938                            if (!child_order.is_open())
4939                                || (matches!(child_order.status(), OrderStatus::PendingUpdate)
4940                                    && child_order
4941                                        .previous_status()
4942                                        .is_some_and(|s| matches!(s, OrderStatus::Submitted)))
4943                            {
4944                                let account_id = order.account_id().unwrap_or_else(|| {
4945                                    *self.account_ids.get(&order.trader_id()).unwrap_or_else(|| {
4946                                        panic!(
4947                                            "Account ID not found for trader {}",
4948                                            order.trader_id()
4949                                        )
4950                                    })
4951                                });
4952                                self.process_order(&mut child_order, account_id);
4953                            }
4954                        }
4955                    } else {
4956                        log::error!(
4957                            "OTO order {} does not have linked orders",
4958                            order.client_order_id()
4959                        );
4960                    }
4961                }
4962                ContingencyType::Oco => {
4963                    if let Some(linked_orders_ids) = order.linked_order_ids() {
4964                        for client_order_id in linked_orders_ids {
4965                            let child_order = match self.cache.borrow().order(client_order_id) {
4966                                Some(child_order) => child_order.clone(),
4967                                None => panic!("Order {client_order_id} not found in cache"),
4968                            };
4969
4970                            if child_order.is_closed() || child_order.is_active_local() {
4971                                continue;
4972                            }
4973
4974                            self.cancel_order(&child_order, None);
4975                        }
4976                    } else {
4977                        log::error!(
4978                            "OCO order {} does not have linked orders",
4979                            order.client_order_id()
4980                        );
4981                    }
4982                }
4983                ContingencyType::Ouo => {
4984                    if let Some(linked_orders_ids) = order.linked_order_ids() {
4985                        for client_order_id in linked_orders_ids {
4986                            let mut child_order = match self.cache.borrow().order(client_order_id) {
4987                                Some(child_order) => child_order.clone(),
4988                                None => panic!("Order {client_order_id} not found in cache"),
4989                            };
4990
4991                            if child_order.is_active_local() {
4992                                continue;
4993                            }
4994
4995                            let child_filled_qty = self
4996                                .cached_filled_qty
4997                                .get(&child_order.client_order_id())
4998                                .copied()
4999                                .unwrap_or(child_order.filled_qty());
5000
5001                            if post_fill_leaves_qty.is_zero() && child_order.is_open() {
5002                                self.cancel_order(&child_order, None);
5003                            } else if child_order.is_open()
5004                                && child_filled_qty >= post_fill_leaves_qty
5005                            {
5006                                self.cancel_order(&child_order, Some(false));
5007                            } else if !post_fill_leaves_qty.is_zero()
5008                                && post_fill_leaves_qty != child_order.leaves_qty()
5009                            {
5010                                let price = child_order.price();
5011                                let trigger_price = child_order.trigger_price();
5012                                self.update_order(
5013                                    &mut child_order,
5014                                    Some(post_fill_leaves_qty),
5015                                    price,
5016                                    trigger_price,
5017                                    Some(false),
5018                                );
5019                            }
5020                        }
5021                    } else {
5022                        log::error!(
5023                            "OUO order {} does not have linked orders",
5024                            order.client_order_id()
5025                        );
5026                    }
5027                }
5028                _ => {}
5029            }
5030        }
5031    }
5032
5033    fn fee_underlying_price(&self) -> Option<Price> {
5034        if !matches!(
5035            self.instrument,
5036            InstrumentAny::CryptoOption(_) | InstrumentAny::OptionContract(_)
5037        ) {
5038            return None;
5039        }
5040
5041        let underlying = self.instrument.underlying()?;
5042        let underlying_id = InstrumentId::from(format!("{underlying}.{}", self.venue).as_str());
5043        let instrument_id = self.instrument.id();
5044        let cache = self.cache.borrow();
5045        cache
5046            .price(&underlying_id, PriceType::Last)
5047            .or_else(|| cache.price(&underlying_id, PriceType::Mark))
5048            .or_else(|| cache.price(&underlying_id, PriceType::Mid))
5049            .or_else(|| {
5050                cache
5051                    .option_greeks(&instrument_id)
5052                    .and_then(|greeks| greeks.underlying_price)
5053                    .map(|price| Price::new(price, FIXED_PRECISION))
5054            })
5055    }
5056
5057    fn cached_order_is_closed(&self, client_order_id: ClientOrderId) -> bool {
5058        self.cache
5059            .borrow()
5060            .order(&client_order_id)
5061            .is_none_or(|order| order.is_closed())
5062    }
5063
5064    fn purge_cached_filled_qty_if_closed(&mut self, client_order_id: ClientOrderId) {
5065        if self.cached_order_is_closed(client_order_id) {
5066            self.cached_filled_qty.swap_remove(&client_order_id);
5067        }
5068    }
5069
5070    fn purge_closed_cached_filled_qty(&mut self) {
5071        let client_order_ids: Vec<ClientOrderId> = self.cached_filled_qty.keys().copied().collect();
5072
5073        for client_order_id in client_order_ids {
5074            self.purge_cached_filled_qty_if_closed(client_order_id);
5075        }
5076    }
5077
5078    fn update_limit_order(&mut self, order: &OrderAny, quantity: Quantity, price: Price) {
5079        if self
5080            .core
5081            .is_limit_matched(order.order_side_specified(), price)
5082        {
5083            if order.is_post_only() {
5084                self.generate_order_modify_rejected(
5085                    order.trader_id(),
5086                    order.strategy_id(),
5087                    order.instrument_id(),
5088                    order.client_order_id(),
5089                    Ustr::from(format!(
5090                        "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5091                        order.order_type(),
5092                        order.order_side(),
5093                        price,
5094                        self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5095                        self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5096                    ).as_str()),
5097                    order.venue_order_id(),
5098                    order.account_id(),
5099                );
5100                return;
5101            }
5102
5103            self.generate_order_updated(order, quantity, Some(price), None, None);
5104
5105            // Re-read from cache to get the order with events applied
5106            let client_order_id = order.client_order_id();
5107            if let Some(mut order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5108                order.set_liquidity_side(LiquiditySide::Taker);
5109            }
5110            self.fill_limit_order(client_order_id);
5111            return;
5112        }
5113        self.generate_order_updated(order, quantity, Some(price), None, None);
5114    }
5115
5116    fn update_stop_market_order(&self, order: &OrderAny, quantity: Quantity, trigger_price: Price) {
5117        if self
5118            .core
5119            .is_stop_matched(order.order_side_specified(), trigger_price)
5120        {
5121            self.generate_order_modify_rejected(
5122                order.trader_id(),
5123                order.strategy_id(),
5124                order.instrument_id(),
5125                order.client_order_id(),
5126                Ustr::from(
5127                    format!(
5128                        "{} {} order new stop px of {} was in the market: bid={}, ask={}",
5129                        order.order_type(),
5130                        order.order_side(),
5131                        trigger_price,
5132                        self.core
5133                            .bid
5134                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
5135                        self.core
5136                            .ask
5137                            .map_or_else(|| "None".to_string(), |p| p.to_string())
5138                    )
5139                    .as_str(),
5140                ),
5141                order.venue_order_id(),
5142                order.account_id(),
5143            );
5144            return;
5145        }
5146
5147        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
5148    }
5149
5150    fn update_stop_limit_order(
5151        &mut self,
5152        order: &mut OrderAny,
5153        quantity: Quantity,
5154        price: Price,
5155        trigger_price: Price,
5156    ) {
5157        if order.is_triggered().is_some_and(|t| t) {
5158            // Update limit price
5159            if self
5160                .core
5161                .is_limit_matched(order.order_side_specified(), price)
5162            {
5163                if order.is_post_only() {
5164                    self.generate_order_modify_rejected(
5165                        order.trader_id(),
5166                        order.strategy_id(),
5167                        order.instrument_id(),
5168                        order.client_order_id(),
5169                        Ustr::from(format!(
5170                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5171                            order.order_type(),
5172                            order.order_side(),
5173                            price,
5174                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5175                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5176                        ).as_str()),
5177                        order.venue_order_id(),
5178                        order.account_id(),
5179                    );
5180                    return;
5181                }
5182                self.generate_order_updated(order, quantity, Some(price), None, None);
5183                order.set_liquidity_side(LiquiditySide::Taker);
5184
5185                if let Err(e) = self
5186                    .cache
5187                    .borrow_mut()
5188                    .add_order(order.clone(), None, None, false)
5189                {
5190                    log::debug!("Order already in cache: {e}");
5191                }
5192                self.fill_limit_order(order.client_order_id());
5193                return; // Filled
5194            }
5195        } else {
5196            // Update stop price
5197            if self
5198                .core
5199                .is_stop_matched(order.order_side_specified(), trigger_price)
5200            {
5201                self.generate_order_modify_rejected(
5202                    order.trader_id(),
5203                    order.strategy_id(),
5204                    order.instrument_id(),
5205                    order.client_order_id(),
5206                    Ustr::from(
5207                        format!(
5208                            "{} {} order new stop px of {} was in the market: bid={}, ask={}",
5209                            order.order_type(),
5210                            order.order_side(),
5211                            trigger_price,
5212                            self.core
5213                                .bid
5214                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
5215                            self.core
5216                                .ask
5217                                .map_or_else(|| "None".to_string(), |p| p.to_string())
5218                        )
5219                        .as_str(),
5220                    ),
5221                    order.venue_order_id(),
5222                    order.account_id(),
5223                );
5224                return;
5225            }
5226        }
5227
5228        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
5229    }
5230
5231    fn update_market_if_touched_order(
5232        &self,
5233        order: &OrderAny,
5234        quantity: Quantity,
5235        trigger_price: Price,
5236    ) {
5237        if self
5238            .core
5239            .is_touch_triggered(order.order_side_specified(), trigger_price)
5240        {
5241            self.generate_order_modify_rejected(
5242                order.trader_id(),
5243                order.strategy_id(),
5244                order.instrument_id(),
5245                order.client_order_id(),
5246                Ustr::from(
5247                    format!(
5248                        "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
5249                        order.order_type(),
5250                        order.order_side(),
5251                        trigger_price,
5252                        self.core
5253                            .bid
5254                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
5255                        self.core
5256                            .ask
5257                            .map_or_else(|| "None".to_string(), |p| p.to_string())
5258                    )
5259                    .as_str(),
5260                ),
5261                order.venue_order_id(),
5262                order.account_id(),
5263            );
5264            // Cannot update order
5265            return;
5266        }
5267
5268        self.generate_order_updated(order, quantity, None, Some(trigger_price), None);
5269    }
5270
5271    fn update_limit_if_touched_order(
5272        &mut self,
5273        order: &mut OrderAny,
5274        quantity: Quantity,
5275        price: Price,
5276        trigger_price: Price,
5277    ) {
5278        if order.is_triggered().is_some_and(|t| t) {
5279            // Update limit price
5280            if self
5281                .core
5282                .is_limit_matched(order.order_side_specified(), price)
5283            {
5284                if order.is_post_only() {
5285                    self.generate_order_modify_rejected(
5286                        order.trader_id(),
5287                        order.strategy_id(),
5288                        order.instrument_id(),
5289                        order.client_order_id(),
5290                        Ustr::from(format!(
5291                            "POST_ONLY {} {} order with new limit px of {} would have been a TAKER: bid={}, ask={}",
5292                            order.order_type(),
5293                            order.order_side(),
5294                            price,
5295                            self.core.bid.map_or_else(|| "None".to_string(), |p| p.to_string()),
5296                            self.core.ask.map_or_else(|| "None".to_string(), |p| p.to_string())
5297                        ).as_str()),
5298                        order.venue_order_id(),
5299                        order.account_id(),
5300                    );
5301                    // Cannot update order
5302                    return;
5303                }
5304                self.generate_order_updated(order, quantity, Some(price), None, None);
5305                order.set_liquidity_side(LiquiditySide::Taker);
5306                self.fill_limit_order(order.client_order_id());
5307                return;
5308            }
5309        } else {
5310            // Update trigger price
5311            if self
5312                .core
5313                .is_touch_triggered(order.order_side_specified(), trigger_price)
5314            {
5315                self.generate_order_modify_rejected(
5316                    order.trader_id(),
5317                    order.strategy_id(),
5318                    order.instrument_id(),
5319                    order.client_order_id(),
5320                    Ustr::from(
5321                        format!(
5322                            "{} {} order new trigger px of {} was in the market: bid={}, ask={}",
5323                            order.order_type(),
5324                            order.order_side(),
5325                            trigger_price,
5326                            self.core
5327                                .bid
5328                                .map_or_else(|| "None".to_string(), |p| p.to_string()),
5329                            self.core
5330                                .ask
5331                                .map_or_else(|| "None".to_string(), |p| p.to_string())
5332                        )
5333                        .as_str(),
5334                    ),
5335                    order.venue_order_id(),
5336                    order.account_id(),
5337                );
5338                return;
5339            }
5340        }
5341
5342        self.generate_order_updated(order, quantity, Some(price), Some(trigger_price), None);
5343    }
5344
5345    fn update_trailing_stop_order(&self, order: &OrderAny) {
5346        let (new_trigger_price, new_price) = trailing_stop_calculate(
5347            self.instrument.price_increment(),
5348            order.trigger_price(),
5349            order.activation_price(),
5350            order,
5351            self.core.bid,
5352            self.core.ask,
5353            self.core.last,
5354        )
5355        .unwrap();
5356
5357        if new_trigger_price.is_none() && new_price.is_none() {
5358            return;
5359        }
5360
5361        self.generate_order_updated(order, order.quantity(), new_price, new_trigger_price, None);
5362    }
5363
5364    fn accept_order(&mut self, order: &mut OrderAny) {
5365        if order.is_closed() {
5366            // Temporary guard to prevent invalid processing
5367            return;
5368        }
5369
5370        if order.status() != OrderStatus::Accepted {
5371            let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
5372            let event = self.create_order_accepted(order, venue_order_id);
5373            // Apply locally so `cancel_order` sees `Accepted`,
5374            // dispatch on apply failure so `Released` still registers with the core.
5375            if let Err(e) = order.apply(event.clone()) {
5376                log::warn!(
5377                    "Skipping local apply of accepted event for {}: {e}",
5378                    order.client_order_id(),
5379                );
5380            }
5381            self.dispatch_order_event(event);
5382
5383            // Activate before emitting `OrderUpdated` so `match_info` below
5384            // carries the activation flag.
5385            if matches!(
5386                order.order_type(),
5387                OrderType::TrailingStopLimit | OrderType::TrailingStopMarket
5388            ) && order.trigger_price().is_none()
5389                && self.maybe_activate_trailing_stop(
5390                    order,
5391                    self.core.bid,
5392                    self.core.ask,
5393                    self.core.last,
5394                )
5395            {
5396                self.update_trailing_stop_order(order);
5397            }
5398        }
5399
5400        let match_info = Self::matching_core_entry(order);
5401        self.core.add_order(match_info);
5402    }
5403
5404    fn matching_core_entry(order: &OrderAny) -> RestingOrder {
5405        let triggered_limit_style = matches!(
5406            order.order_type(),
5407            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit
5408        ) && order.is_triggered().is_some_and(|triggered| triggered);
5409
5410        RestingOrder::new(
5411            order.client_order_id(),
5412            order.order_side().as_specified(),
5413            order.order_type(),
5414            if triggered_limit_style {
5415                None
5416            } else {
5417                order.trigger_price()
5418            },
5419            order.price(),
5420            match order {
5421                OrderAny::TrailingStopMarket(o) => o.is_activated,
5422                OrderAny::TrailingStopLimit(o) => o.is_activated,
5423                _ => true,
5424            },
5425        )
5426    }
5427
5428    fn expire_order(&mut self, order: &OrderAny) {
5429        if self.config.support_contingent_orders
5430            && order
5431                .contingency_type()
5432                .is_some_and(|c| c != ContingencyType::NoContingency)
5433        {
5434            self.cancel_contingent_orders(order);
5435        }
5436
5437        self.generate_order_expired(order);
5438    }
5439
5440    fn cancel_order(&mut self, order: &OrderAny, cancel_contingencies: Option<bool>) {
5441        let cancel_contingencies = cancel_contingencies.unwrap_or(true);
5442
5443        if order.is_active_local() {
5444            log::error!(
5445                "Cannot cancel an order with {} from the matching engine",
5446                order.status()
5447            );
5448            return;
5449        }
5450
5451        // Check if order exists in OrderMatching core, and delete it if it does
5452        if self.core.order_exists(order.client_order_id()) {
5453            let _ = self.core.delete_order(order.client_order_id());
5454        }
5455        self.cached_filled_qty.swap_remove(&order.client_order_id());
5456
5457        let venue_order_id = self.ids_generator.get_venue_order_id(order).unwrap();
5458        self.generate_order_canceled(order, venue_order_id);
5459
5460        if self.config.support_contingent_orders
5461            && order.contingency_type().is_some()
5462            && order.contingency_type().unwrap() != ContingencyType::NoContingency
5463            && cancel_contingencies
5464        {
5465            self.cancel_contingent_orders(order);
5466        }
5467    }
5468
5469    fn update_order(
5470        &mut self,
5471        order: &mut OrderAny,
5472        quantity: Option<Quantity>,
5473        price: Option<Price>,
5474        trigger_price: Option<Price>,
5475        update_contingencies: Option<bool>,
5476    ) -> bool {
5477        let update_contingencies = update_contingencies.unwrap_or(true);
5478        let quantity = quantity.unwrap_or(order.quantity());
5479
5480        let price_prec = self.instrument.price_precision();
5481        let size_prec = self.instrument.size_precision();
5482        let instrument_id = self.instrument.id();
5483
5484        if quantity.precision != size_prec {
5485            self.generate_order_modify_rejected(
5486                order.trader_id(),
5487                order.strategy_id(),
5488                order.instrument_id(),
5489                order.client_order_id(),
5490                Ustr::from(&format!(
5491                    "Invalid update quantity precision {}, expected {size_prec} for {instrument_id}",
5492                    quantity.precision
5493                )),
5494                order.venue_order_id(),
5495                order.account_id(),
5496            );
5497            return false;
5498        }
5499
5500        if let Some(px) = price
5501            && px.precision != price_prec
5502        {
5503            self.generate_order_modify_rejected(
5504                order.trader_id(),
5505                order.strategy_id(),
5506                order.instrument_id(),
5507                order.client_order_id(),
5508                Ustr::from(&format!(
5509                    "Invalid update price precision {}, expected {price_prec} for {instrument_id}",
5510                    px.precision
5511                )),
5512                order.venue_order_id(),
5513                order.account_id(),
5514            );
5515            return false;
5516        }
5517
5518        if let Some(tp) = trigger_price
5519            && tp.precision != price_prec
5520        {
5521            self.generate_order_modify_rejected(
5522                order.trader_id(),
5523                order.strategy_id(),
5524                order.instrument_id(),
5525                order.client_order_id(),
5526                Ustr::from(&format!(
5527                    "Invalid update trigger_price precision {}, expected {price_prec} for {instrument_id}",
5528                    tp.precision
5529                )),
5530                order.venue_order_id(),
5531                order.account_id(),
5532            );
5533            return false;
5534        }
5535
5536        // Use cached_filled_qty since PassiveOrderAny in core is not updated with fills
5537        let filled_qty = self
5538            .cached_filled_qty
5539            .get(&order.client_order_id())
5540            .copied()
5541            .unwrap_or(order.filled_qty());
5542        if quantity < filled_qty {
5543            self.generate_order_modify_rejected(
5544                order.trader_id(),
5545                order.strategy_id(),
5546                order.instrument_id(),
5547                order.client_order_id(),
5548                Ustr::from(&format!(
5549                    "Cannot reduce order quantity {quantity} below filled quantity {filled_qty}",
5550                )),
5551                order.venue_order_id(),
5552                order.account_id(),
5553            );
5554            return false;
5555        }
5556
5557        match order {
5558            OrderAny::Limit(_) | OrderAny::MarketToLimit(_) => {
5559                let price = price.unwrap_or(order.price().unwrap());
5560                self.update_limit_order(order, quantity, price);
5561            }
5562            OrderAny::StopMarket(_) => {
5563                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5564                self.update_stop_market_order(order, quantity, trigger_price);
5565            }
5566            OrderAny::StopLimit(_) => {
5567                let price = price.unwrap_or(order.price().unwrap());
5568                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5569                self.update_stop_limit_order(order, quantity, price, trigger_price);
5570            }
5571            OrderAny::MarketIfTouched(_) => {
5572                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5573                self.update_market_if_touched_order(order, quantity, trigger_price);
5574            }
5575            OrderAny::LimitIfTouched(_) => {
5576                let price = price.unwrap_or(order.price().unwrap());
5577                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5578                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
5579            }
5580            OrderAny::TrailingStopMarket(_) => {
5581                let trigger_price = trigger_price.unwrap_or(order.trigger_price().unwrap());
5582                self.update_market_if_touched_order(order, quantity, trigger_price);
5583            }
5584            OrderAny::TrailingStopLimit(trailing_stop_limit_order) => {
5585                let price = price.unwrap_or(trailing_stop_limit_order.price().unwrap());
5586                let trigger_price =
5587                    trigger_price.unwrap_or(trailing_stop_limit_order.trigger_price().unwrap());
5588                self.update_limit_if_touched_order(order, quantity, price, trigger_price);
5589            }
5590            _ => {
5591                panic!(
5592                    "Unsupported order type {} for update_order",
5593                    order.order_type()
5594                );
5595            }
5596        }
5597
5598        // If order now has zero leaves after update, cancel it
5599        let new_leaves_qty = quantity.saturating_sub(filled_qty);
5600        if new_leaves_qty.is_zero() {
5601            if self.config.support_contingent_orders
5602                && order
5603                    .contingency_type()
5604                    .is_some_and(|c| c != ContingencyType::NoContingency)
5605                && update_contingencies
5606            {
5607                self.update_contingent_order(order, quantity);
5608            }
5609            // Pass false since we already handled contingents above
5610            self.cancel_order(order, Some(false));
5611            return true;
5612        }
5613
5614        if self.config.support_contingent_orders
5615            && order
5616                .contingency_type()
5617                .is_some_and(|c| c != ContingencyType::NoContingency)
5618            && update_contingencies
5619        {
5620            self.update_contingent_order(order, quantity);
5621        }
5622
5623        true
5624    }
5625
5626    /// Triggers a stop order, converting it to an active market or limit order.
5627    pub fn trigger_stop_order(&mut self, client_order_id: ClientOrderId) {
5628        let order = match self
5629            .cache
5630            .borrow()
5631            .order(&client_order_id)
5632            .map(|o| o.clone())
5633        {
5634            Some(order) => order,
5635            None => {
5636                log::error!(
5637                    "Cannot trigger stop order: order {client_order_id} not found in cache"
5638                );
5639                return;
5640            }
5641        };
5642
5643        match order.order_type() {
5644            OrderType::StopLimit | OrderType::LimitIfTouched | OrderType::TrailingStopLimit => {
5645                self.trigger_limit_style_stop_order(client_order_id, order);
5646            }
5647            OrderType::StopMarket | OrderType::MarketIfTouched | OrderType::TrailingStopMarket => {
5648                self.fill_market_order(client_order_id);
5649            }
5650            _ => {
5651                log::error!(
5652                    "Cannot trigger stop order: invalid order type {}",
5653                    order.order_type()
5654                );
5655            }
5656        }
5657    }
5658
5659    fn trigger_limit_style_stop_order(&mut self, client_order_id: ClientOrderId, order: OrderAny) {
5660        if order.is_triggered().is_some_and(|triggered| triggered) {
5661            let liquidity_side = match (order.price(), order.trigger_price()) {
5662                (Some(price), Some(trigger_price)) => Self::determine_triggered_limit_liquidity(
5663                    order.order_side(),
5664                    price,
5665                    trigger_price,
5666                ),
5667                _ => LiquiditySide::Maker,
5668            };
5669
5670            if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id)
5671                && !matches!(
5672                    cached_order.liquidity_side(),
5673                    Some(LiquiditySide::Maker | LiquiditySide::Taker)
5674                )
5675            {
5676                cached_order.set_liquidity_side(liquidity_side);
5677            }
5678            self.fill_limit_order(client_order_id);
5679            return;
5680        }
5681
5682        let event = self.create_order_triggered(&order);
5683        let order = match self.cache.borrow_mut().update_order(&event) {
5684            Ok(order) => order,
5685            Err(e) => {
5686                log::debug!(
5687                    "Failed to apply triggered event for {} before fill: {e}",
5688                    order.client_order_id(),
5689                );
5690                order
5691            }
5692        };
5693        self.dispatch_order_event(event);
5694
5695        let trigger_price = order
5696            .trigger_price()
5697            .expect("Limit-style stop order must have a trigger price");
5698        let price = order
5699            .price()
5700            .expect("Limit-style stop order must have a price");
5701
5702        let maker_inside = match order.order_side() {
5703            OrderSide::Buy => self
5704                .core
5705                .ask
5706                .is_some_and(|ask| trigger_price > price && price > ask),
5707            OrderSide::Sell => self
5708                .core
5709                .bid
5710                .is_some_and(|bid| trigger_price < price && price < bid),
5711            OrderSide::NoOrderSide => false,
5712        };
5713
5714        if maker_inside {
5715            if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5716                cached_order.set_liquidity_side(LiquiditySide::Maker);
5717            }
5718            self.resync_core_entry(client_order_id);
5719            self.fill_limit_order(client_order_id);
5720            return;
5721        }
5722
5723        if self
5724            .core
5725            .is_limit_matched(order.order_side_specified(), price)
5726        {
5727            if order.is_post_only() {
5728                let _ = self.core.delete_order(client_order_id);
5729                self.cached_filled_qty.swap_remove(&client_order_id);
5730                let event = self.create_order_rejected(
5731                    &order,
5732                    format!(
5733                        "POST_ONLY {} {} order limit px of {} would have been a TAKER: bid={}, ask={}",
5734                        order.order_type(),
5735                        order.order_side(),
5736                        price,
5737                        self.core
5738                            .bid
5739                            .map_or_else(|| "None".to_string(), |p| p.to_string()),
5740                        self.core
5741                            .ask
5742                            .map_or_else(|| "None".to_string(), |p| p.to_string())
5743                    )
5744                    .into(),
5745                );
5746
5747                if let Err(e) = self.cache.borrow_mut().update_order(&event) {
5748                    log::debug!(
5749                        "Failed to apply rejected event for {} after post-only trigger: {e}",
5750                        order.client_order_id(),
5751                    );
5752                }
5753                self.dispatch_order_event(event);
5754                return;
5755            }
5756
5757            if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5758                cached_order.set_liquidity_side(LiquiditySide::Taker);
5759            }
5760            self.resync_core_entry(client_order_id);
5761            self.fill_limit_order(client_order_id);
5762            return;
5763        }
5764
5765        if let Some(mut cached_order) = self.cache.borrow_mut().order_mut(&client_order_id) {
5766            cached_order.set_liquidity_side(Self::determine_triggered_limit_liquidity(
5767                order.order_side(),
5768                price,
5769                trigger_price,
5770            ));
5771        }
5772        self.resync_core_entry(client_order_id);
5773    }
5774
5775    fn determine_triggered_limit_liquidity(
5776        side: OrderSide,
5777        price: Price,
5778        trigger_price: Price,
5779    ) -> LiquiditySide {
5780        if (side == OrderSide::Buy && trigger_price > price)
5781            || (side == OrderSide::Sell && trigger_price < price)
5782        {
5783            LiquiditySide::Maker
5784        } else {
5785            LiquiditySide::Taker
5786        }
5787    }
5788
5789    fn update_contingent_order(&mut self, order: &OrderAny, parent_quantity: Quantity) {
5790        log::debug!(
5791            "Updating contingent orders from {}",
5792            order.client_order_id()
5793        );
5794
5795        if let Some(linked_order_ids) = order.linked_order_ids() {
5796            let parent_filled_qty = self
5797                .cached_filled_qty
5798                .get(&order.client_order_id())
5799                .copied()
5800                .unwrap_or(order.filled_qty());
5801            let parent_leaves_qty = parent_quantity.saturating_sub(parent_filled_qty);
5802
5803            for client_order_id in linked_order_ids {
5804                let mut child_order = match self.cache.borrow().order(client_order_id) {
5805                    Some(order) => order.clone(),
5806                    None => panic!("Order {client_order_id} not found in cache."),
5807                };
5808
5809                if child_order.is_active_local() {
5810                    continue;
5811                }
5812
5813                let child_filled_qty = self
5814                    .cached_filled_qty
5815                    .get(&child_order.client_order_id())
5816                    .copied()
5817                    .unwrap_or(child_order.filled_qty());
5818
5819                if parent_leaves_qty.is_zero() {
5820                    self.cancel_order(&child_order, Some(false));
5821                } else if child_filled_qty >= parent_leaves_qty {
5822                    // Child already filled beyond parent's remaining qty, cancel it
5823                    self.cancel_order(&child_order, Some(false));
5824                } else {
5825                    let child_leaves_qty = child_order.quantity().saturating_sub(child_filled_qty);
5826                    if child_leaves_qty != parent_leaves_qty {
5827                        let price = child_order.price();
5828                        let trigger_price = child_order.trigger_price();
5829                        self.update_order(
5830                            &mut child_order,
5831                            Some(parent_leaves_qty),
5832                            price,
5833                            trigger_price,
5834                            Some(false),
5835                        );
5836                    }
5837                }
5838            }
5839        }
5840    }
5841
5842    fn cancel_contingent_orders(&mut self, order: &OrderAny) {
5843        if let Some(linked_order_ids) = order.linked_order_ids() {
5844            for client_order_id in linked_order_ids {
5845                let contingent_order = match self.cache.borrow().order(client_order_id) {
5846                    Some(order) => order.clone(),
5847                    None => panic!("Cannot find contingent order for {client_order_id}"),
5848                };
5849
5850                if contingent_order.is_active_local() {
5851                    // order is not on the exchange yet
5852                    continue;
5853                }
5854
5855                if !contingent_order.is_closed() {
5856                    self.cancel_order(&contingent_order, Some(false));
5857                }
5858            }
5859        }
5860    }
5861
5862    fn generate_order_submitted(&self, order: &OrderAny, account_id: AccountId) {
5863        let ts_now = self.clock.borrow().timestamp_ns();
5864        let event = OrderEventAny::Submitted(OrderSubmitted::new(
5865            order.trader_id(),
5866            order.strategy_id(),
5867            order.instrument_id(),
5868            order.client_order_id(),
5869            account_id,
5870            UUID4::new(),
5871            ts_now,
5872            ts_now,
5873        ));
5874        self.dispatch_order_event(event);
5875    }
5876
5877    fn create_order_rejected(&self, order: &OrderAny, reason: Ustr) -> OrderEventAny {
5878        let ts_now = self.clock.borrow().timestamp_ns();
5879        let account_id = order
5880            .account_id()
5881            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
5882
5883        let due_post_only = reason.as_str().starts_with("POST_ONLY");
5884
5885        OrderEventAny::Rejected(OrderRejected::new(
5886            order.trader_id(),
5887            order.strategy_id(),
5888            order.instrument_id(),
5889            order.client_order_id(),
5890            account_id,
5891            reason,
5892            UUID4::new(),
5893            ts_now,
5894            ts_now,
5895            false,
5896            due_post_only,
5897        ))
5898    }
5899
5900    fn generate_order_rejected(&self, order: &OrderAny, reason: Ustr) {
5901        let event = self.create_order_rejected(order, reason);
5902        self.dispatch_order_event(event);
5903    }
5904
5905    fn publish_order_initialized(&self, order: &OrderAny) {
5906        let event = OrderEventAny::Initialized(order.init_event().clone());
5907        msgbus::publish_order_event(
5908            format!("events.order.{}", order.strategy_id()).into(),
5909            &event,
5910        );
5911    }
5912
5913    fn create_order_accepted(
5914        &self,
5915        order: &OrderAny,
5916        venue_order_id: VenueOrderId,
5917    ) -> OrderEventAny {
5918        let ts_now = self.clock.borrow().timestamp_ns();
5919        let account_id = order
5920            .account_id()
5921            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
5922        OrderEventAny::Accepted(OrderAccepted::new(
5923            order.trader_id(),
5924            order.strategy_id(),
5925            order.instrument_id(),
5926            order.client_order_id(),
5927            venue_order_id,
5928            account_id,
5929            UUID4::new(),
5930            ts_now,
5931            ts_now,
5932            false,
5933        ))
5934    }
5935
5936    fn generate_order_accepted(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
5937        let event = self.create_order_accepted(order, venue_order_id);
5938        self.dispatch_order_event(event);
5939    }
5940
5941    #[expect(clippy::too_many_arguments)]
5942    fn generate_order_modify_rejected(
5943        &self,
5944        trader_id: TraderId,
5945        strategy_id: StrategyId,
5946        instrument_id: InstrumentId,
5947        client_order_id: ClientOrderId,
5948        reason: Ustr,
5949        venue_order_id: Option<VenueOrderId>,
5950        account_id: Option<AccountId>,
5951    ) {
5952        let ts_now = self.clock.borrow().timestamp_ns();
5953        let event = OrderEventAny::ModifyRejected(OrderModifyRejected::new(
5954            trader_id,
5955            strategy_id,
5956            instrument_id,
5957            client_order_id,
5958            reason,
5959            UUID4::new(),
5960            ts_now,
5961            ts_now,
5962            false,
5963            venue_order_id,
5964            account_id,
5965        ));
5966        self.dispatch_order_event(event);
5967    }
5968
5969    #[expect(clippy::too_many_arguments)]
5970    fn generate_order_cancel_rejected(
5971        &self,
5972        trader_id: TraderId,
5973        strategy_id: StrategyId,
5974        account_id: AccountId,
5975        instrument_id: InstrumentId,
5976        client_order_id: ClientOrderId,
5977        venue_order_id: Option<VenueOrderId>,
5978        reason: Ustr,
5979    ) {
5980        let ts_now = self.clock.borrow().timestamp_ns();
5981        let event = OrderEventAny::CancelRejected(OrderCancelRejected::new(
5982            trader_id,
5983            strategy_id,
5984            instrument_id,
5985            client_order_id,
5986            reason,
5987            UUID4::new(),
5988            ts_now,
5989            ts_now,
5990            false,
5991            venue_order_id,
5992            Some(account_id),
5993        ));
5994        self.dispatch_order_event(event);
5995    }
5996
5997    fn generate_order_updated(
5998        &self,
5999        order: &OrderAny,
6000        quantity: Quantity,
6001        price: Option<Price>,
6002        trigger_price: Option<Price>,
6003        protection_price: Option<Price>,
6004    ) {
6005        let ts_now = self.clock.borrow().timestamp_ns();
6006        let event = OrderEventAny::Updated(OrderUpdated::new(
6007            order.trader_id(),
6008            order.strategy_id(),
6009            order.instrument_id(),
6010            order.client_order_id(),
6011            quantity,
6012            UUID4::new(),
6013            ts_now,
6014            ts_now,
6015            false,
6016            order.venue_order_id(),
6017            order.account_id(),
6018            price,
6019            trigger_price,
6020            protection_price,
6021            order.is_quote_quantity(),
6022        ));
6023
6024        self.dispatch_order_event(event);
6025    }
6026
6027    fn generate_order_canceled(&self, order: &OrderAny, venue_order_id: VenueOrderId) {
6028        let ts_now = self.clock.borrow().timestamp_ns();
6029        let event = OrderEventAny::Canceled(OrderCanceled::new(
6030            order.trader_id(),
6031            order.strategy_id(),
6032            order.instrument_id(),
6033            order.client_order_id(),
6034            UUID4::new(),
6035            ts_now,
6036            ts_now,
6037            false,
6038            Some(venue_order_id),
6039            order.account_id(),
6040        ));
6041        self.dispatch_order_event(event);
6042    }
6043
6044    fn create_order_triggered(&self, order: &OrderAny) -> OrderEventAny {
6045        let ts_now = self.clock.borrow().timestamp_ns();
6046        OrderEventAny::Triggered(OrderTriggered::new(
6047            order.trader_id(),
6048            order.strategy_id(),
6049            order.instrument_id(),
6050            order.client_order_id(),
6051            UUID4::new(),
6052            ts_now,
6053            ts_now,
6054            false,
6055            order.venue_order_id(),
6056            order.account_id(),
6057        ))
6058    }
6059
6060    fn generate_order_triggered(&self, order: &OrderAny) {
6061        let event = self.create_order_triggered(order);
6062        self.dispatch_order_event(event);
6063    }
6064
6065    fn generate_order_expired(&self, order: &OrderAny) {
6066        let ts_now = self.clock.borrow().timestamp_ns();
6067        let event = OrderEventAny::Expired(OrderExpired::new(
6068            order.trader_id(),
6069            order.strategy_id(),
6070            order.instrument_id(),
6071            order.client_order_id(),
6072            UUID4::new(),
6073            ts_now,
6074            ts_now,
6075            false,
6076            order.venue_order_id(),
6077            order.account_id(),
6078        ));
6079        self.dispatch_order_event(event);
6080    }
6081
6082    #[expect(clippy::too_many_arguments)]
6083    fn generate_order_filled(
6084        &mut self,
6085        order: &OrderAny,
6086        venue_order_id: VenueOrderId,
6087        venue_position_id: Option<PositionId>,
6088        last_qty: Quantity,
6089        last_px: Price,
6090        quote_currency: Currency,
6091        commission: Money,
6092        liquidity_side: LiquiditySide,
6093    ) {
6094        debug_assert!(
6095            last_qty <= order.quantity(),
6096            "Fill quantity {last_qty} exceeds order quantity {order_qty} for {client_order_id}",
6097            order_qty = order.quantity(),
6098            client_order_id = order.client_order_id()
6099        );
6100
6101        let ts_now = self.clock.borrow().timestamp_ns();
6102        let account_id = order
6103            .account_id()
6104            .unwrap_or(self.account_ids.get(&order.trader_id()).unwrap().to_owned());
6105        let event = OrderEventAny::Filled(OrderFilled::new(
6106            order.trader_id(),
6107            order.strategy_id(),
6108            order.instrument_id(),
6109            order.client_order_id(),
6110            venue_order_id,
6111            account_id,
6112            self.ids_generator.generate_trade_id(ts_now),
6113            order.order_side(),
6114            order.order_type(),
6115            last_qty,
6116            last_px,
6117            quote_currency,
6118            liquidity_side,
6119            UUID4::new(),
6120            ts_now,
6121            ts_now,
6122            false,
6123            venue_position_id,
6124            Some(commission),
6125        ));
6126
6127        self.dispatch_order_event(event);
6128    }
6129}
6130
6131#[derive(Debug, Clone, Copy)]
6132struct BarTickSizes {
6133    open: Quantity,
6134    high: Quantity,
6135    low: Quantity,
6136    close: Quantity,
6137}
6138
6139impl BarTickSizes {
6140    fn from_volume(volume: Quantity, size_increment: Quantity) -> Self {
6141        let precision_diff = FIXED_PRECISION.saturating_sub(volume.precision);
6142        let scale = QuantityRaw::pow(10, u32::from(precision_diff));
6143        let units = volume.raw / scale;
6144        let increment_units = (size_increment.raw / scale).max(1);
6145        let rounded_units = (units / increment_units) * increment_units;
6146        let increments = rounded_units / increment_units;
6147        let zero = Quantity::zero(volume.precision);
6148        let size =
6149            |increments| Quantity::from_raw(increments * increment_units * scale, volume.precision);
6150
6151        match increments {
6152            0 => Self {
6153                open: zero,
6154                high: zero,
6155                low: zero,
6156                close: zero,
6157            },
6158            // One increment cannot cover both high and low without exceeding the bar volume.
6159            1 => Self {
6160                open: zero,
6161                high: zero,
6162                low: zero,
6163                close: size(1),
6164            },
6165            2 => Self {
6166                open: zero,
6167                high: size(1),
6168                low: size(1),
6169                close: zero,
6170            },
6171            3 => {
6172                let path_size = size(1);
6173
6174                Self {
6175                    open: path_size,
6176                    high: path_size,
6177                    low: path_size,
6178                    close: zero,
6179                }
6180            }
6181            _ => {
6182                let path_increments = increments / 4;
6183                let close_increments = increments - (path_increments * 3);
6184                let path_size = size(path_increments);
6185
6186                Self {
6187                    open: path_size,
6188                    high: path_size,
6189                    low: path_size,
6190                    close: size(close_increments),
6191                }
6192            }
6193        }
6194    }
6195}
6196
6197////////////////////////////////////////////////////////////////////////////////
6198// Tests
6199////////////////////////////////////////////////////////////////////////////////
6200#[cfg(test)]
6201mod tests {
6202    use nautilus_model::types::{Quantity, fixed::FIXED_PRECISION, quantity::QuantityRaw};
6203    use rstest::rstest;
6204
6205    use super::{BarTickSizes, OrderMatchingEngine};
6206
6207    fn assert_valid_bar_tick_sizes(volume: Quantity, size_increment: Quantity) {
6208        let sizes = BarTickSizes::from_volume(volume, size_increment);
6209        let total_raw = sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw;
6210        assert!(total_raw <= volume.raw);
6211
6212        for quantity in [sizes.open, sizes.high, sizes.low, sizes.close] {
6213            assert_eq!(quantity.precision, volume.precision);
6214            assert!(
6215                OrderMatchingEngine::quantity_matches_precision(quantity, volume.precision),
6216                "bar tick quantity {quantity} not aligned to precision {}",
6217                volume.precision,
6218            );
6219            assert!(
6220                size_increment.raw == 0 || quantity.raw.is_multiple_of(size_increment.raw),
6221                "bar tick quantity {quantity} not aligned to increment {size_increment}",
6222            );
6223        }
6224
6225        if size_increment.raw > 0 {
6226            assert!(
6227                volume.raw - total_raw < size_increment.raw,
6228                "bar tick split left {} raw units from volume {volume} and increment {size_increment}",
6229                volume.raw - total_raw,
6230            );
6231        }
6232    }
6233
6234    #[rstest]
6235    fn test_bar_tick_sizes_divisible() {
6236        // precision=3, units=100_000: exactly divisible by 4, no rounding.
6237        let volume = Quantity::from("100.000");
6238        let increment = Quantity::from("0.001");
6239        let sizes = BarTickSizes::from_volume(volume, increment);
6240        assert_eq!(sizes.open, Quantity::from("25.000"));
6241        assert_eq!(sizes.high, Quantity::from("25.000"));
6242        assert_eq!(sizes.low, Quantity::from("25.000"));
6243        assert_eq!(sizes.close, Quantity::from("25.000"));
6244        assert_valid_bar_tick_sizes(volume, increment);
6245    }
6246
6247    #[rstest]
6248    fn test_bar_tick_sizes_indivisible_with_remainder() {
6249        // precision=2, units=5: quarter_units=1, remainder=1; close carries 2 units.
6250        let volume = Quantity::from("0.05");
6251        let increment = Quantity::from("0.01");
6252        let sizes = BarTickSizes::from_volume(volume, increment);
6253        assert_eq!(sizes.open, Quantity::from("0.01"));
6254        assert_eq!(sizes.high, Quantity::from("0.01"));
6255        assert_eq!(sizes.low, Quantity::from("0.01"));
6256        assert_eq!(sizes.close, Quantity::from("0.02"));
6257        assert_valid_bar_tick_sizes(volume, increment);
6258        assert_eq!(
6259            sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw,
6260            volume.raw
6261        );
6262    }
6263
6264    #[rstest]
6265    #[case("1", "0", "0", "0", "1")]
6266    #[case("2", "0", "1", "1", "0")]
6267    #[case("3", "1", "1", "1", "0")]
6268    fn test_bar_tick_sizes_units_less_than_four_preserves_volume(
6269        #[case] volume: &str,
6270        #[case] open_size: &str,
6271        #[case] high_size: &str,
6272        #[case] low_size: &str,
6273        #[case] close_size: &str,
6274    ) {
6275        let volume = Quantity::from(volume);
6276        let increment = Quantity::from("1");
6277        let sizes = BarTickSizes::from_volume(volume, increment);
6278
6279        assert_eq!(sizes.open, Quantity::from(open_size));
6280        assert_eq!(sizes.high, Quantity::from(high_size));
6281        assert_eq!(sizes.low, Quantity::from(low_size));
6282        assert_eq!(sizes.close, Quantity::from(close_size));
6283        assert_valid_bar_tick_sizes(volume, increment);
6284        assert_eq!(
6285            sizes.open.raw + sizes.high.raw + sizes.low.raw + sizes.close.raw,
6286            volume.raw
6287        );
6288    }
6289
6290    #[rstest]
6291    fn test_bar_tick_sizes_zero_volume_remains_zero() {
6292        let volume = Quantity::zero(3);
6293        let increment = Quantity::from("0.001");
6294        let sizes = BarTickSizes::from_volume(volume, increment);
6295        assert_eq!(sizes.open, Quantity::zero(3));
6296        assert_eq!(sizes.high, Quantity::zero(3));
6297        assert_eq!(sizes.low, Quantity::zero(3));
6298        assert_eq!(sizes.close, Quantity::zero(3));
6299        assert_valid_bar_tick_sizes(volume, increment);
6300    }
6301
6302    #[rstest]
6303    fn test_bar_tick_sizes_rounds_down_to_size_increment() {
6304        let volume = Quantity::from("1.07");
6305        let increment = Quantity::from("0.10");
6306        let sizes = BarTickSizes::from_volume(volume, increment);
6307        assert_eq!(sizes.open, Quantity::from("0.20"));
6308        assert_eq!(sizes.high, Quantity::from("0.20"));
6309        assert_eq!(sizes.low, Quantity::from("0.20"));
6310        assert_eq!(sizes.close, Quantity::from("0.40"));
6311        assert_valid_bar_tick_sizes(volume, increment);
6312    }
6313
6314    #[rstest]
6315    fn test_bar_tick_sizes_at_fixed_precision() {
6316        // When volume.precision == FIXED_PRECISION the scale is 1 and the formula
6317        // degenerates to a plain raw-space quartering.
6318        let units: QuantityRaw = 17;
6319        let volume = Quantity::from_raw(units, FIXED_PRECISION);
6320        let increment = Quantity::from_raw(1, FIXED_PRECISION);
6321        let sizes = BarTickSizes::from_volume(volume, increment);
6322        assert_eq!(sizes.open.raw, 4);
6323        assert_eq!(sizes.high.raw, 4);
6324        assert_eq!(sizes.low.raw, 4);
6325        assert_eq!(sizes.close.raw, 5);
6326        assert_valid_bar_tick_sizes(volume, increment);
6327    }
6328}