nautilus_backtest/
exchange.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a `SimulatedExchange` venue for backtesting on historical data.
17
18// Under development
19#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23    cell::RefCell,
24    collections::{BinaryHeap, VecDeque},
25    fmt::Debug,
26    rc::Rc,
27};
28
29use ahash::AHashMap;
30use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
31use nautilus_core::{
32    UnixNanos,
33    correctness::{FAILED, check_equal},
34};
35use nautilus_execution::{
36    client::ExecutionClient,
37    matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
38    models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
39};
40use nautilus_model::{
41    accounts::AccountAny,
42    data::{
43        Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
44        QuoteTick, TradeTick,
45    },
46    enums::{AccountType, BookType, OmsType},
47    identifiers::{InstrumentId, Venue},
48    instruments::{Instrument, InstrumentAny},
49    orderbook::OrderBook,
50    orders::PassiveOrderAny,
51    types::{AccountBalance, Currency, Money, Price},
52};
53use rust_decimal::Decimal;
54
55use crate::modules::SimulationModule;
56
57/// Represents commands with simulated network latency in a min-heap priority queue.
58/// The commands are ordered by timestamp for FIFO processing, with the
59/// earliest timestamp having the highest priority in the queue.
60#[derive(Debug, Eq, PartialEq)]
61struct InflightCommand {
62    timestamp: UnixNanos,
63    counter: u32,
64    command: TradingCommand,
65}
66
67impl InflightCommand {
68    const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
69        Self {
70            timestamp,
71            counter,
72            command,
73        }
74    }
75}
76
77impl Ord for InflightCommand {
78    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
79        // Reverse ordering for min-heap (earliest timestamp first then lowest counter)
80        other
81            .timestamp
82            .cmp(&self.timestamp)
83            .then_with(|| other.counter.cmp(&self.counter))
84    }
85}
86
87impl PartialOrd for InflightCommand {
88    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
89        Some(self.cmp(other))
90    }
91}
92
93/// Simulated exchange venue for realistic trading execution during backtesting.
94///
95/// The `SimulatedExchange` provides a comprehensive simulation of a trading venue,
96/// including order matching engines, account management, and realistic execution
97/// models. It maintains order books, processes market data, and executes trades
98/// with configurable latency and fill models to accurately simulate real market
99/// conditions during backtesting.
100///
101/// Key features:
102/// - Multi-instrument order matching with realistic execution
103/// - Configurable fee, fill, and latency models
104/// - Support for various order types and execution options
105/// - Account balance and position management
106/// - Market data processing and order book maintenance
107/// - Simulation modules for custom venue behaviors
108pub struct SimulatedExchange {
109    pub id: Venue,
110    pub oms_type: OmsType,
111    pub account_type: AccountType,
112    starting_balances: Vec<Money>,
113    book_type: BookType,
114    default_leverage: Decimal,
115    exec_client: Option<Rc<dyn ExecutionClient>>,
116    pub base_currency: Option<Currency>,
117    fee_model: FeeModelAny,
118    fill_model: FillModel,
119    latency_model: Option<Box<dyn LatencyModel>>,
120    instruments: AHashMap<InstrumentId, InstrumentAny>,
121    matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
122    leverages: AHashMap<InstrumentId, Decimal>,
123    modules: Vec<Box<dyn SimulationModule>>,
124    clock: Rc<RefCell<dyn Clock>>,
125    cache: Rc<RefCell<Cache>>,
126    message_queue: VecDeque<TradingCommand>,
127    inflight_queue: BinaryHeap<InflightCommand>,
128    inflight_counter: AHashMap<UnixNanos, u32>,
129    bar_execution: bool,
130    trade_execution: bool,
131    liquidity_consumption: bool,
132    reject_stop_orders: bool,
133    support_gtd_orders: bool,
134    support_contingent_orders: bool,
135    use_position_ids: bool,
136    use_random_ids: bool,
137    use_reduce_only: bool,
138    use_message_queue: bool,
139    allow_cash_borrowing: bool,
140    frozen_account: bool,
141    price_protection_points: u32,
142}
143
144impl Debug for SimulatedExchange {
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        f.debug_struct(stringify!(SimulatedExchange))
147            .field("id", &self.id)
148            .field("account_type", &self.account_type)
149            .finish()
150    }
151}
152
153impl SimulatedExchange {
154    /// Creates a new [`SimulatedExchange`] instance.
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if:
159    /// - `starting_balances` is empty.
160    /// - `base_currency` is `Some` but `starting_balances` contains multiple currencies.
161    #[allow(clippy::too_many_arguments)]
162    pub fn new(
163        venue: Venue,
164        oms_type: OmsType,
165        account_type: AccountType,
166        starting_balances: Vec<Money>,
167        base_currency: Option<Currency>,
168        default_leverage: Decimal,
169        leverages: AHashMap<InstrumentId, Decimal>,
170        modules: Vec<Box<dyn SimulationModule>>,
171        cache: Rc<RefCell<Cache>>,
172        clock: Rc<RefCell<dyn Clock>>,
173        fill_model: FillModel,
174        fee_model: FeeModelAny,
175        book_type: BookType,
176        latency_model: Option<Box<dyn LatencyModel>>,
177        bar_execution: Option<bool>,
178        trade_execution: Option<bool>,
179        liquidity_consumption: Option<bool>,
180        reject_stop_orders: Option<bool>,
181        support_gtd_orders: Option<bool>,
182        support_contingent_orders: Option<bool>,
183        use_position_ids: Option<bool>,
184        use_random_ids: Option<bool>,
185        use_reduce_only: Option<bool>,
186        use_message_queue: Option<bool>,
187        allow_cash_borrowing: Option<bool>,
188        frozen_account: Option<bool>,
189        price_protection_points: Option<u32>,
190    ) -> anyhow::Result<Self> {
191        if starting_balances.is_empty() {
192            anyhow::bail!("Starting balances must be provided")
193        }
194        if base_currency.is_some() && starting_balances.len() > 1 {
195            anyhow::bail!("single-currency account has multiple starting currencies")
196        }
197        // TODO register and load modules
198        Ok(Self {
199            id: venue,
200            oms_type,
201            account_type,
202            starting_balances,
203            book_type,
204            default_leverage,
205            exec_client: None,
206            base_currency,
207            fee_model,
208            fill_model,
209            latency_model,
210            instruments: AHashMap::new(),
211            matching_engines: AHashMap::new(),
212            leverages,
213            modules,
214            clock,
215            cache,
216            message_queue: VecDeque::new(),
217            inflight_queue: BinaryHeap::new(),
218            inflight_counter: AHashMap::new(),
219            bar_execution: bar_execution.unwrap_or(true),
220            trade_execution: trade_execution.unwrap_or(true),
221            liquidity_consumption: liquidity_consumption.unwrap_or(true),
222            reject_stop_orders: reject_stop_orders.unwrap_or(true),
223            support_gtd_orders: support_gtd_orders.unwrap_or(true),
224            support_contingent_orders: support_contingent_orders.unwrap_or(true),
225            use_position_ids: use_position_ids.unwrap_or(true),
226            use_random_ids: use_random_ids.unwrap_or(false),
227            use_reduce_only: use_reduce_only.unwrap_or(true),
228            use_message_queue: use_message_queue.unwrap_or(true),
229            allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
230            frozen_account: frozen_account.unwrap_or(false),
231            price_protection_points: price_protection_points.unwrap_or(0),
232        })
233    }
234
235    pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
236        self.exec_client = Some(client);
237    }
238
239    pub fn set_fill_model(&mut self, fill_model: FillModel) {
240        for matching_engine in self.matching_engines.values_mut() {
241            matching_engine.set_fill_model(fill_model.clone());
242            log::info!(
243                "Setting fill model for {} to {}",
244                matching_engine.venue,
245                self.fill_model
246            );
247        }
248        self.fill_model = fill_model;
249    }
250
251    pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
252        self.latency_model = Some(latency_model);
253    }
254
255    pub fn initialize_account(&mut self) {
256        self.generate_fresh_account_state();
257    }
258
259    /// Adds an instrument to the simulated exchange and initializes its matching engine.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the exchange account type is `Cash` and the instrument is a `CryptoPerpetual` or `CryptoFuture`.
264    ///
265    /// # Panics
266    ///
267    /// Panics if the instrument cannot be added to the exchange.
268    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
269        check_equal(
270            &instrument.id().venue,
271            &self.id,
272            "Venue of instrument id",
273            "Venue of simulated exchange",
274        )
275        .expect(FAILED);
276
277        if self.account_type == AccountType::Cash
278            && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
279                || matches!(instrument, InstrumentAny::CryptoFuture(_)))
280        {
281            anyhow::bail!("Cash account cannot trade futures or perpetuals")
282        }
283
284        self.instruments.insert(instrument.id(), instrument.clone());
285
286        let price_protection = if self.price_protection_points == 0 {
287            None
288        } else {
289            Some(self.price_protection_points)
290        };
291
292        let matching_engine_config = OrderMatchingEngineConfig::new(
293            self.bar_execution,
294            self.trade_execution,
295            self.liquidity_consumption,
296            self.reject_stop_orders,
297            self.support_gtd_orders,
298            self.support_contingent_orders,
299            self.use_position_ids,
300            self.use_random_ids,
301            self.use_reduce_only,
302        )
303        .with_price_protection_points(price_protection);
304        let instrument_id = instrument.id();
305        let matching_engine = OrderMatchingEngine::new(
306            instrument,
307            self.instruments.len() as u32,
308            self.fill_model.clone(),
309            self.fee_model.clone(),
310            self.book_type,
311            self.oms_type,
312            self.account_type,
313            self.clock.clone(),
314            Rc::clone(&self.cache),
315            matching_engine_config,
316        );
317        self.matching_engines.insert(instrument_id, matching_engine);
318
319        log::info!("Added instrument {instrument_id} and created matching engine");
320        Ok(())
321    }
322
323    #[must_use]
324    pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
325        self.matching_engines
326            .get(&instrument_id)
327            .and_then(OrderMatchingEngine::best_bid_price)
328    }
329
330    #[must_use]
331    pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
332        self.matching_engines
333            .get(&instrument_id)
334            .and_then(OrderMatchingEngine::best_ask_price)
335    }
336
337    pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
338        self.matching_engines
339            .get(&instrument_id)
340            .map(OrderMatchingEngine::get_book)
341    }
342
343    #[must_use]
344    pub fn get_matching_engine(
345        &self,
346        instrument_id: &InstrumentId,
347    ) -> Option<&OrderMatchingEngine> {
348        self.matching_engines.get(instrument_id)
349    }
350
351    #[must_use]
352    pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
353        &self.matching_engines
354    }
355
356    #[must_use]
357    pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
358        let mut books = AHashMap::new();
359        for (instrument_id, matching_engine) in &self.matching_engines {
360            books.insert(*instrument_id, matching_engine.get_book().clone());
361        }
362        books
363    }
364
365    #[must_use]
366    pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
367        instrument_id
368            .and_then(|id| {
369                self.matching_engines
370                    .get(&id)
371                    .map(OrderMatchingEngine::get_open_orders)
372            })
373            .unwrap_or_else(|| {
374                self.matching_engines
375                    .values()
376                    .flat_map(OrderMatchingEngine::get_open_orders)
377                    .collect()
378            })
379    }
380
381    #[must_use]
382    pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
383        instrument_id
384            .and_then(|id| {
385                self.matching_engines
386                    .get(&id)
387                    .map(|engine| engine.get_open_bid_orders().to_vec())
388            })
389            .unwrap_or_else(|| {
390                self.matching_engines
391                    .values()
392                    .flat_map(|engine| engine.get_open_bid_orders().to_vec())
393                    .collect()
394            })
395    }
396
397    #[must_use]
398    pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
399        instrument_id
400            .and_then(|id| {
401                self.matching_engines
402                    .get(&id)
403                    .map(|engine| engine.get_open_ask_orders().to_vec())
404            })
405            .unwrap_or_else(|| {
406                self.matching_engines
407                    .values()
408                    .flat_map(|engine| engine.get_open_ask_orders().to_vec())
409                    .collect()
410            })
411    }
412
413    /// # Panics
414    ///
415    /// Panics if retrieving the account from the execution client fails.
416    #[must_use]
417    pub fn get_account(&self) -> Option<AccountAny> {
418        self.exec_client
419            .as_ref()
420            .map(|client| client.get_account().unwrap())
421    }
422
423    /// # Panics
424    ///
425    /// Panics if generating account state fails during adjustment.
426    pub fn adjust_account(&mut self, adjustment: Money) {
427        if self.frozen_account {
428            // Nothing to adjust
429            return;
430        }
431
432        if let Some(exec_client) = &self.exec_client {
433            let venue = exec_client.venue();
434            println!("Adjusting account for venue {venue}");
435            if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
436                match account.balance(Some(adjustment.currency)) {
437                    Some(balance) => {
438                        let mut current_balance = *balance;
439                        current_balance.total += adjustment;
440                        current_balance.free += adjustment;
441
442                        let margins = match account {
443                            AccountAny::Margin(margin_account) => margin_account.margins.clone(),
444                            _ => AHashMap::new(),
445                        };
446
447                        if let Some(exec_client) = &self.exec_client {
448                            exec_client
449                                .generate_account_state(
450                                    vec![current_balance],
451                                    margins.values().copied().collect(),
452                                    true,
453                                    self.clock.borrow().timestamp_ns(),
454                                )
455                                .unwrap();
456                        }
457                    }
458                    None => {
459                        log::error!(
460                            "Cannot adjust account: no balance for currency {}",
461                            adjustment.currency
462                        );
463                    }
464                }
465            } else {
466                log::error!("Cannot adjust account: no account for venue {venue}");
467            }
468        }
469    }
470
471    pub fn send(&mut self, command: TradingCommand) {
472        if !self.use_message_queue {
473            self.process_trading_command(command);
474        } else if self.latency_model.is_none() {
475            self.message_queue.push_back(command);
476        } else {
477            let (timestamp, counter) = self.generate_inflight_command(&command);
478            self.inflight_queue
479                .push(InflightCommand::new(timestamp, counter, command));
480        }
481    }
482
483    /// # Panics
484    ///
485    /// Panics if the command is invalid when generating inflight command.
486    pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
487        if let Some(latency_model) = &self.latency_model {
488            let ts = match command {
489                TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
490                    command.ts_init() + latency_model.get_insert_latency()
491                }
492                TradingCommand::ModifyOrder(_) => {
493                    command.ts_init() + latency_model.get_update_latency()
494                }
495                TradingCommand::CancelOrder(_)
496                | TradingCommand::CancelAllOrders(_)
497                | TradingCommand::BatchCancelOrders(_) => {
498                    command.ts_init() + latency_model.get_delete_latency()
499                }
500                _ => panic!("Cannot handle command: {command:?}"),
501            };
502
503            let counter = self
504                .inflight_counter
505                .entry(ts)
506                .and_modify(|e| *e += 1)
507                .or_insert(1);
508
509            (ts, *counter)
510        } else {
511            panic!("Latency model should be initialized");
512        }
513    }
514
515    /// # Panics
516    ///
517    /// Panics if adding a missing instrument during delta processing fails.
518    pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
519        for module in &self.modules {
520            module.pre_process(Data::Delta(delta));
521        }
522
523        if !self.matching_engines.contains_key(&delta.instrument_id) {
524            let instrument = {
525                let cache = self.cache.as_ref().borrow();
526                cache.instrument(&delta.instrument_id).cloned()
527            };
528
529            if let Some(instrument) = instrument {
530                self.add_instrument(instrument).unwrap();
531            } else {
532                panic!(
533                    "No matching engine found for instrument {}",
534                    delta.instrument_id
535                );
536            }
537        }
538
539        if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
540            matching_engine.process_order_book_delta(&delta).unwrap();
541        } else {
542            panic!("Matching engine should be initialized");
543        }
544    }
545
546    /// # Panics
547    ///
548    /// Panics if adding a missing instrument during deltas processing fails.
549    pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
550        for module in &self.modules {
551            module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
552        }
553
554        if !self.matching_engines.contains_key(&deltas.instrument_id) {
555            let instrument = {
556                let cache = self.cache.as_ref().borrow();
557                cache.instrument(&deltas.instrument_id).cloned()
558            };
559
560            if let Some(instrument) = instrument {
561                self.add_instrument(instrument).unwrap();
562            } else {
563                panic!(
564                    "No matching engine found for instrument {}",
565                    deltas.instrument_id
566                );
567            }
568        }
569
570        if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
571            matching_engine.process_order_book_deltas(&deltas).unwrap();
572        } else {
573            panic!("Matching engine should be initialized");
574        }
575    }
576
577    /// # Panics
578    ///
579    /// Panics if adding a missing instrument during quote tick processing fails.
580    pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
581        for module in &self.modules {
582            module.pre_process(Data::Quote(quote.to_owned()));
583        }
584
585        if !self.matching_engines.contains_key(&quote.instrument_id) {
586            let instrument = {
587                let cache = self.cache.as_ref().borrow();
588                cache.instrument(&quote.instrument_id).cloned()
589            };
590
591            if let Some(instrument) = instrument {
592                self.add_instrument(instrument).unwrap();
593            } else {
594                panic!(
595                    "No matching engine found for instrument {}",
596                    quote.instrument_id
597                );
598            }
599        }
600
601        if let Some(matching_engine) = self.matching_engines.get_mut(&quote.instrument_id) {
602            matching_engine.process_quote_tick(quote);
603        } else {
604            panic!("Matching engine should be initialized");
605        }
606    }
607
608    /// # Panics
609    ///
610    /// Panics if adding a missing instrument during trade tick processing fails.
611    pub fn process_trade_tick(&mut self, trade: &TradeTick) {
612        for module in &self.modules {
613            module.pre_process(Data::Trade(trade.to_owned()));
614        }
615
616        if !self.matching_engines.contains_key(&trade.instrument_id) {
617            let instrument = {
618                let cache = self.cache.as_ref().borrow();
619                cache.instrument(&trade.instrument_id).cloned()
620            };
621
622            if let Some(instrument) = instrument {
623                self.add_instrument(instrument).unwrap();
624            } else {
625                panic!(
626                    "No matching engine found for instrument {}",
627                    trade.instrument_id
628                );
629            }
630        }
631
632        if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
633            matching_engine.process_trade_tick(trade);
634        } else {
635            panic!("Matching engine should be initialized");
636        }
637    }
638
639    /// # Panics
640    ///
641    /// Panics if adding a missing instrument during bar processing fails.
642    pub fn process_bar(&mut self, bar: Bar) {
643        for module in &self.modules {
644            module.pre_process(Data::Bar(bar));
645        }
646
647        if !self.matching_engines.contains_key(&bar.instrument_id()) {
648            let instrument = {
649                let cache = self.cache.as_ref().borrow();
650                cache.instrument(&bar.instrument_id()).cloned()
651            };
652
653            if let Some(instrument) = instrument {
654                self.add_instrument(instrument).unwrap();
655            } else {
656                panic!(
657                    "No matching engine found for instrument {}",
658                    bar.instrument_id()
659                );
660            }
661        }
662
663        if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
664            matching_engine.process_bar(&bar);
665        } else {
666            panic!("Matching engine should be initialized");
667        }
668    }
669
670    /// # Panics
671    ///
672    /// Panics if adding a missing instrument during instrument status processing fails.
673    pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
674        // TODO add module preprocessing
675
676        if !self.matching_engines.contains_key(&status.instrument_id) {
677            let instrument = {
678                let cache = self.cache.as_ref().borrow();
679                cache.instrument(&status.instrument_id).cloned()
680            };
681
682            if let Some(instrument) = instrument {
683                self.add_instrument(instrument).unwrap();
684            } else {
685                panic!(
686                    "No matching engine found for instrument {}",
687                    status.instrument_id
688                );
689            }
690        }
691
692        if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
693            matching_engine.process_status(status.action);
694        } else {
695            panic!("Matching engine should be initialized");
696        }
697    }
698
699    /// # Panics
700    ///
701    /// Panics if popping an inflight command fails during processing.
702    pub fn process(&mut self, ts_now: UnixNanos) {
703        // TODO implement correct clock fixed time setting self.clock.set_time(ts_now);
704
705        // Process inflight commands
706        while let Some(inflight) = self.inflight_queue.peek() {
707            if inflight.timestamp > ts_now {
708                // Future commands remain in the queue
709                break;
710            }
711            // We get the inflight command, remove it from the queue and process it
712            let inflight = self.inflight_queue.pop().unwrap();
713            self.process_trading_command(inflight.command);
714        }
715
716        // Process regular message queue
717        while let Some(command) = self.message_queue.pop_front() {
718            self.process_trading_command(command);
719        }
720    }
721
722    pub fn reset(&mut self) {
723        for module in &self.modules {
724            module.reset();
725        }
726
727        self.generate_fresh_account_state();
728
729        for matching_engine in self.matching_engines.values_mut() {
730            matching_engine.reset();
731        }
732
733        // TODO Clear the inflight and message queues
734        log::info!("Resetting exchange state");
735    }
736
737    /// # Panics
738    ///
739    /// Panics if execution client is uninitialized when processing trading command.
740    pub fn process_trading_command(&mut self, command: TradingCommand) {
741        if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
742            let account_id = if let Some(exec_client) = &self.exec_client {
743                exec_client.account_id()
744            } else {
745                panic!("Execution client should be initialized");
746            };
747            match command {
748                TradingCommand::SubmitOrder(mut command) => {
749                    matching_engine.process_order(&mut command.order, account_id);
750                }
751                TradingCommand::ModifyOrder(ref command) => {
752                    matching_engine.process_modify(command, account_id);
753                }
754                TradingCommand::CancelOrder(ref command) => {
755                    matching_engine.process_cancel(command, account_id);
756                }
757                TradingCommand::CancelAllOrders(ref command) => {
758                    matching_engine.process_cancel_all(command, account_id);
759                }
760                TradingCommand::BatchCancelOrders(ref command) => {
761                    matching_engine.process_batch_cancel(command, account_id);
762                }
763                TradingCommand::SubmitOrderList(mut command) => {
764                    for order in &mut command.order_list.orders {
765                        matching_engine.process_order(order, account_id);
766                    }
767                }
768                _ => {}
769            }
770        } else {
771            panic!(
772                "Matching engine not found for instrument {}",
773                command.instrument_id()
774            );
775        }
776    }
777
778    /// # Panics
779    ///
780    /// Panics if generating fresh account state fails.
781    pub fn generate_fresh_account_state(&self) {
782        let balances: Vec<AccountBalance> = self
783            .starting_balances
784            .iter()
785            .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
786            .collect();
787
788        if let Some(exec_client) = &self.exec_client {
789            exec_client
790                .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
791                .unwrap();
792        }
793
794        // Set leverages
795        if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
796            margin_account.set_default_leverage(self.default_leverage);
797
798            // Set instrument specific leverages
799            for (instrument_id, leverage) in &self.leverages {
800                margin_account.set_leverage(*instrument_id, *leverage);
801            }
802        }
803    }
804}
805
806#[cfg(test)]
807mod tests {
808    use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
809
810    use ahash::AHashMap;
811    use nautilus_common::{
812        cache::Cache,
813        clock::TestClock,
814        messages::execution::{SubmitOrder, TradingCommand},
815        msgbus::{
816            self,
817            stubs::{get_message_saving_handler, get_saved_messages},
818        },
819    };
820    use nautilus_core::{UUID4, UnixNanos};
821    use nautilus_execution::models::{
822        fee::{FeeModelAny, MakerTakerFeeModel},
823        fill::FillModel,
824        latency::StaticLatencyModel,
825    };
826    use nautilus_model::{
827        accounts::{AccountAny, MarginAccount},
828        data::{
829            Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
830            TradeTick,
831        },
832        enums::{
833            AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
834            OmsType, OrderSide, OrderType,
835        },
836        events::AccountState,
837        identifiers::{AccountId, InstrumentId, StrategyId, TradeId, TraderId, Venue},
838        instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
839        orders::OrderTestBuilder,
840        stubs::TestDefault,
841        types::{AccountBalance, Currency, Money, Price, Quantity},
842    };
843    use rstest::rstest;
844
845    use crate::{
846        exchange::{InflightCommand, SimulatedExchange},
847        execution_client::BacktestExecutionClient,
848    };
849
850    fn get_exchange(
851        venue: Venue,
852        account_type: AccountType,
853        book_type: BookType,
854        cache: Option<Rc<RefCell<Cache>>>,
855    ) -> Rc<RefCell<SimulatedExchange>> {
856        let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
857        let clock = Rc::new(RefCell::new(TestClock::new()));
858        let exchange = Rc::new(RefCell::new(
859            SimulatedExchange::new(
860                venue,
861                OmsType::Netting,
862                account_type,
863                vec![Money::new(1000.0, Currency::USD())],
864                None,
865                1.into(),
866                AHashMap::new(),
867                vec![],
868                cache.clone(),
869                clock,
870                FillModel::default(),
871                FeeModelAny::MakerTaker(MakerTakerFeeModel),
872                book_type,
873                None, // latency_model
874                None, // bar_execution
875                None, // trade_execution
876                None, // liquidity_consumption
877                None, // reject_stop_orders
878                None, // support_gtd_orders
879                None, // support_contingent_orders
880                None, // use_position_ids
881                None, // use_random_ids
882                None, // use_reduce_only
883                None, // use_message_queue
884                None, // allow_cash_borrowing
885                None, // frozen_account
886                None, // price_protection_points
887            )
888            .unwrap(),
889        ));
890
891        let clock = TestClock::new();
892        let execution_client = BacktestExecutionClient::new(
893            TraderId::test_default(),
894            AccountId::test_default(),
895            exchange.clone(),
896            cache,
897            Rc::new(RefCell::new(clock)),
898            None,
899            None,
900        );
901        exchange
902            .borrow_mut()
903            .register_client(Rc::new(execution_client));
904
905        exchange
906    }
907
908    fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
909        let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
910        let order = OrderTestBuilder::new(OrderType::Market)
911            .instrument_id(instrument_id)
912            .quantity(Quantity::from(1))
913            .build();
914        TradingCommand::SubmitOrder(SubmitOrder::new(
915            TraderId::test_default(),
916            None,
917            StrategyId::test_default(),
918            instrument_id,
919            order,
920            None,
921            None,
922            None, // params
923            UUID4::default(),
924            ts_init,
925        ))
926    }
927
928    #[rstest]
929    #[should_panic(
930        expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
931    )]
932    fn test_venue_mismatch_between_exchange_and_instrument(
933        crypto_perpetual_ethusdt: CryptoPerpetual,
934    ) {
935        let exchange = get_exchange(
936            Venue::new("SIM"),
937            AccountType::Margin,
938            BookType::L1_MBP,
939            None,
940        );
941        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
942        exchange.borrow_mut().add_instrument(instrument).unwrap();
943    }
944
945    #[rstest]
946    #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
947    fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
948        let exchange = get_exchange(
949            Venue::new("BINANCE"),
950            AccountType::Cash,
951            BookType::L1_MBP,
952            None,
953        );
954        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
955        exchange.borrow_mut().add_instrument(instrument).unwrap();
956    }
957
958    #[rstest]
959    fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
960        let exchange = get_exchange(
961            Venue::new("BINANCE"),
962            AccountType::Margin,
963            BookType::L1_MBP,
964            None,
965        );
966        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
967
968        // register instrument
969        exchange.borrow_mut().add_instrument(instrument).unwrap();
970
971        // process tick
972        let quote_tick = QuoteTick::new(
973            crypto_perpetual_ethusdt.id,
974            Price::from("1000.00"),
975            Price::from("1001.00"),
976            Quantity::from("1.000"),
977            Quantity::from("1.000"),
978            UnixNanos::default(),
979            UnixNanos::default(),
980        );
981        exchange.borrow_mut().process_quote_tick(&quote_tick);
982
983        let best_bid_price = exchange
984            .borrow()
985            .best_bid_price(crypto_perpetual_ethusdt.id);
986        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
987        let best_ask_price = exchange
988            .borrow()
989            .best_ask_price(crypto_perpetual_ethusdt.id);
990        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
991    }
992
993    #[rstest]
994    fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
995        let exchange = get_exchange(
996            Venue::new("BINANCE"),
997            AccountType::Margin,
998            BookType::L1_MBP,
999            None,
1000        );
1001        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1002
1003        // register instrument
1004        exchange.borrow_mut().add_instrument(instrument).unwrap();
1005
1006        // process tick
1007        let trade_tick = TradeTick::new(
1008            crypto_perpetual_ethusdt.id,
1009            Price::from("1000.00"),
1010            Quantity::from("1.000"),
1011            AggressorSide::Buyer,
1012            TradeId::from("1"),
1013            UnixNanos::default(),
1014            UnixNanos::default(),
1015        );
1016        exchange.borrow_mut().process_trade_tick(&trade_tick);
1017
1018        let best_bid_price = exchange
1019            .borrow()
1020            .best_bid_price(crypto_perpetual_ethusdt.id);
1021        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1022        let best_ask = exchange
1023            .borrow()
1024            .best_ask_price(crypto_perpetual_ethusdt.id);
1025        assert_eq!(best_ask, Some(Price::from("1000.00")));
1026    }
1027
1028    #[rstest]
1029    fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1030        let exchange = get_exchange(
1031            Venue::new("BINANCE"),
1032            AccountType::Margin,
1033            BookType::L1_MBP,
1034            None,
1035        );
1036        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1037
1038        // register instrument
1039        exchange.borrow_mut().add_instrument(instrument).unwrap();
1040
1041        // process bar
1042        let bar = Bar::new(
1043            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1044            Price::from("1500.00"),
1045            Price::from("1505.00"),
1046            Price::from("1490.00"),
1047            Price::from("1502.00"),
1048            Quantity::from("100.000"),
1049            UnixNanos::default(),
1050            UnixNanos::default(),
1051        );
1052        exchange.borrow_mut().process_bar(bar);
1053
1054        // this will be processed as ticks so both bid and ask will be the same as close of the bar
1055        let best_bid_price = exchange
1056            .borrow()
1057            .best_bid_price(crypto_perpetual_ethusdt.id);
1058        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1059        let best_ask_price = exchange
1060            .borrow()
1061            .best_ask_price(crypto_perpetual_ethusdt.id);
1062        assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1063    }
1064
1065    #[rstest]
1066    fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1067        let exchange = get_exchange(
1068            Venue::new("BINANCE"),
1069            AccountType::Margin,
1070            BookType::L1_MBP,
1071            None,
1072        );
1073        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1074
1075        // register instrument
1076        exchange.borrow_mut().add_instrument(instrument).unwrap();
1077
1078        // create both bid and ask based bars
1079        // add +1 on ask to make sure it is different from bid
1080        let bar_bid = Bar::new(
1081            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1082            Price::from("1500.00"),
1083            Price::from("1505.00"),
1084            Price::from("1490.00"),
1085            Price::from("1502.00"),
1086            Quantity::from("100.000"),
1087            UnixNanos::from(1),
1088            UnixNanos::from(1),
1089        );
1090        let bar_ask = Bar::new(
1091            BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1092            Price::from("1501.00"),
1093            Price::from("1506.00"),
1094            Price::from("1491.00"),
1095            Price::from("1503.00"),
1096            Quantity::from("100.000"),
1097            UnixNanos::from(1),
1098            UnixNanos::from(1),
1099        );
1100
1101        // process them
1102        exchange.borrow_mut().process_bar(bar_bid);
1103        exchange.borrow_mut().process_bar(bar_ask);
1104
1105        // current bid and ask prices will be the corresponding close of the ask and bid bar
1106        let best_bid_price = exchange
1107            .borrow()
1108            .best_bid_price(crypto_perpetual_ethusdt.id);
1109        assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1110        let best_ask_price = exchange
1111            .borrow()
1112            .best_ask_price(crypto_perpetual_ethusdt.id);
1113        assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1114    }
1115
1116    #[rstest]
1117    fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1118        let exchange = get_exchange(
1119            Venue::new("BINANCE"),
1120            AccountType::Margin,
1121            BookType::L2_MBP,
1122            None,
1123        );
1124        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1125
1126        // register instrument
1127        exchange.borrow_mut().add_instrument(instrument).unwrap();
1128
1129        // create order book delta at both bid and ask with incremented ts init and sequence
1130        let delta_buy = OrderBookDelta::new(
1131            crypto_perpetual_ethusdt.id,
1132            BookAction::Add,
1133            BookOrder::new(
1134                OrderSide::Buy,
1135                Price::from("1000.00"),
1136                Quantity::from("1.000"),
1137                1,
1138            ),
1139            0,
1140            0,
1141            UnixNanos::from(1),
1142            UnixNanos::from(1),
1143        );
1144        let delta_sell = OrderBookDelta::new(
1145            crypto_perpetual_ethusdt.id,
1146            BookAction::Add,
1147            BookOrder::new(
1148                OrderSide::Sell,
1149                Price::from("1001.00"),
1150                Quantity::from("1.000"),
1151                1,
1152            ),
1153            0,
1154            1,
1155            UnixNanos::from(2),
1156            UnixNanos::from(2),
1157        );
1158
1159        // process both deltas
1160        exchange.borrow_mut().process_order_book_delta(delta_buy);
1161        exchange.borrow_mut().process_order_book_delta(delta_sell);
1162
1163        let book = exchange
1164            .borrow()
1165            .get_book(crypto_perpetual_ethusdt.id)
1166            .unwrap()
1167            .clone();
1168        assert_eq!(book.update_count, 2);
1169        assert_eq!(book.sequence, 1);
1170        assert_eq!(book.ts_last, UnixNanos::from(2));
1171        let best_bid_price = exchange
1172            .borrow()
1173            .best_bid_price(crypto_perpetual_ethusdt.id);
1174        assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1175        let best_ask_price = exchange
1176            .borrow()
1177            .best_ask_price(crypto_perpetual_ethusdt.id);
1178        assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1179    }
1180
1181    #[rstest]
1182    fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1183        let exchange = get_exchange(
1184            Venue::new("BINANCE"),
1185            AccountType::Margin,
1186            BookType::L2_MBP,
1187            None,
1188        );
1189        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1190
1191        // register instrument
1192        exchange.borrow_mut().add_instrument(instrument).unwrap();
1193
1194        // create two sell order book deltas with same timestamps and higher sequence
1195        let delta_sell_1 = OrderBookDelta::new(
1196            crypto_perpetual_ethusdt.id,
1197            BookAction::Add,
1198            BookOrder::new(
1199                OrderSide::Sell,
1200                Price::from("1000.00"),
1201                Quantity::from("3.000"),
1202                1,
1203            ),
1204            0,
1205            0,
1206            UnixNanos::from(1),
1207            UnixNanos::from(1),
1208        );
1209        let delta_sell_2 = OrderBookDelta::new(
1210            crypto_perpetual_ethusdt.id,
1211            BookAction::Add,
1212            BookOrder::new(
1213                OrderSide::Sell,
1214                Price::from("1001.00"),
1215                Quantity::from("1.000"),
1216                1,
1217            ),
1218            0,
1219            1,
1220            UnixNanos::from(1),
1221            UnixNanos::from(1),
1222        );
1223        let orderbook_deltas = OrderBookDeltas::new(
1224            crypto_perpetual_ethusdt.id,
1225            vec![delta_sell_1, delta_sell_2],
1226        );
1227
1228        // process both deltas
1229        exchange
1230            .borrow_mut()
1231            .process_order_book_deltas(orderbook_deltas);
1232
1233        let book = exchange
1234            .borrow()
1235            .get_book(crypto_perpetual_ethusdt.id)
1236            .unwrap()
1237            .clone();
1238        assert_eq!(book.update_count, 2);
1239        assert_eq!(book.sequence, 1);
1240        assert_eq!(book.ts_last, UnixNanos::from(1));
1241        let best_bid_price = exchange
1242            .borrow()
1243            .best_bid_price(crypto_perpetual_ethusdt.id);
1244        // no bid orders in orderbook deltas
1245        assert_eq!(best_bid_price, None);
1246        let best_ask_price = exchange
1247            .borrow()
1248            .best_ask_price(crypto_perpetual_ethusdt.id);
1249        // best ask price is the first order in orderbook deltas
1250        assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1251    }
1252
1253    #[rstest]
1254    fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1255        let exchange = get_exchange(
1256            Venue::new("BINANCE"),
1257            AccountType::Margin,
1258            BookType::L2_MBP,
1259            None,
1260        );
1261        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1262
1263        // register instrument
1264        exchange.borrow_mut().add_instrument(instrument).unwrap();
1265
1266        let instrument_status = InstrumentStatus::new(
1267            crypto_perpetual_ethusdt.id,
1268            MarketStatusAction::Close, // close the market
1269            UnixNanos::from(1),
1270            UnixNanos::from(1),
1271            None,
1272            None,
1273            None,
1274            None,
1275            None,
1276        );
1277
1278        exchange
1279            .borrow_mut()
1280            .process_instrument_status(instrument_status);
1281
1282        let market_status = exchange
1283            .borrow()
1284            .get_matching_engine(&crypto_perpetual_ethusdt.id)
1285            .unwrap()
1286            .market_status;
1287        assert_eq!(market_status, MarketStatus::Closed);
1288    }
1289
1290    #[rstest]
1291    fn test_accounting() {
1292        let account_type = AccountType::Margin;
1293        let mut cache = Cache::default();
1294        let handler = get_message_saving_handler::<AccountState>(None);
1295        msgbus::register("Portfolio.update_account".into(), handler.clone());
1296        let margin_account = MarginAccount::new(
1297            AccountState::new(
1298                AccountId::from("SIM-001"),
1299                account_type,
1300                vec![AccountBalance::new(
1301                    Money::from("1000 USD"),
1302                    Money::from("0 USD"),
1303                    Money::from("1000 USD"),
1304                )],
1305                vec![],
1306                false,
1307                UUID4::default(),
1308                UnixNanos::default(),
1309                UnixNanos::default(),
1310                None,
1311            ),
1312            false,
1313        );
1314        let () = cache
1315            .add_account(AccountAny::Margin(margin_account))
1316            .unwrap();
1317        // build indexes
1318        cache.build_index();
1319
1320        let exchange = get_exchange(
1321            Venue::new("SIM"),
1322            account_type,
1323            BookType::L2_MBP,
1324            Some(Rc::new(RefCell::new(cache))),
1325        );
1326        exchange.borrow_mut().initialize_account();
1327
1328        // Test adjust account, increase balance by 500 USD
1329        exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1330
1331        // Check if we received two messages, one for initial account state and one for adjusted account state
1332        let messages = get_saved_messages::<AccountState>(handler);
1333        assert_eq!(messages.len(), 2);
1334        let account_state_first = messages.first().unwrap();
1335        let account_state_second = messages.last().unwrap();
1336
1337        assert_eq!(account_state_first.balances.len(), 1);
1338        let current_balance = account_state_first.balances[0];
1339        assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1340        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1341        assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1342
1343        assert_eq!(account_state_second.balances.len(), 1);
1344        let current_balance = account_state_second.balances[0];
1345        assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1346        assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1347        assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1348    }
1349
1350    #[rstest]
1351    fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1352        // Create 3 inflight commands with different timestamps and counters
1353        let inflight1 = InflightCommand::new(
1354            UnixNanos::from(100),
1355            1,
1356            create_submit_order_command(UnixNanos::from(100)),
1357        );
1358        let inflight2 = InflightCommand::new(
1359            UnixNanos::from(200),
1360            2,
1361            create_submit_order_command(UnixNanos::from(200)),
1362        );
1363        let inflight3 = InflightCommand::new(
1364            UnixNanos::from(100),
1365            2,
1366            create_submit_order_command(UnixNanos::from(100)),
1367        );
1368
1369        // Create a binary heap and push the inflight commands
1370        let mut inflight_heap = BinaryHeap::new();
1371        inflight_heap.push(inflight1);
1372        inflight_heap.push(inflight2);
1373        inflight_heap.push(inflight3);
1374
1375        // Pop the inflight commands and check if they are in the correct order
1376        // by our custom ordering with counter and timestamp
1377        let first = inflight_heap.pop().unwrap();
1378        let second = inflight_heap.pop().unwrap();
1379        let third = inflight_heap.pop().unwrap();
1380
1381        assert_eq!(first.timestamp, UnixNanos::from(100));
1382        assert_eq!(first.counter, 1);
1383        assert_eq!(second.timestamp, UnixNanos::from(100));
1384        assert_eq!(second.counter, 2);
1385        assert_eq!(third.timestamp, UnixNanos::from(200));
1386        assert_eq!(third.counter, 2);
1387    }
1388
1389    #[rstest]
1390    fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1391        let exchange = get_exchange(
1392            Venue::new("BINANCE"),
1393            AccountType::Margin,
1394            BookType::L2_MBP,
1395            None,
1396        );
1397
1398        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1399        exchange.borrow_mut().add_instrument(instrument).unwrap();
1400
1401        let command1 = create_submit_order_command(UnixNanos::from(100));
1402        let command2 = create_submit_order_command(UnixNanos::from(200));
1403
1404        exchange.borrow_mut().send(command1);
1405        exchange.borrow_mut().send(command2);
1406
1407        // Verify that message queue has 2 commands and inflight queue is empty
1408        // as we are not using latency model
1409        assert_eq!(exchange.borrow().message_queue.len(), 2);
1410        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1411
1412        // Process command and check that queues is empty
1413        exchange.borrow_mut().process(UnixNanos::from(300));
1414        assert_eq!(exchange.borrow().message_queue.len(), 0);
1415        assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1416    }
1417
1418    #[rstest]
1419    fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1420        // StaticLatencyModel adds base_latency to each operation latency
1421        // base=100, insert=200 -> effective insert latency = 300
1422        let latency_model = StaticLatencyModel::new(
1423            UnixNanos::from(100),
1424            UnixNanos::from(200),
1425            UnixNanos::from(300),
1426            UnixNanos::from(100),
1427        );
1428        let exchange = get_exchange(
1429            Venue::new("BINANCE"),
1430            AccountType::Margin,
1431            BookType::L2_MBP,
1432            None,
1433        );
1434        exchange
1435            .borrow_mut()
1436            .set_latency_model(Box::new(latency_model));
1437
1438        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1439        exchange.borrow_mut().add_instrument(instrument).unwrap();
1440
1441        let command1 = create_submit_order_command(UnixNanos::from(100));
1442        let command2 = create_submit_order_command(UnixNanos::from(150));
1443        exchange.borrow_mut().send(command1);
1444        exchange.borrow_mut().send(command2);
1445
1446        // Verify that inflight queue has 2 commands and message queue is empty
1447        assert_eq!(exchange.borrow().message_queue.len(), 0);
1448        assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1449        // First inflight command: ts_init=100 + effective_insert_latency=300 = 400
1450        assert_eq!(
1451            exchange
1452                .borrow()
1453                .inflight_queue
1454                .iter()
1455                .next()
1456                .unwrap()
1457                .timestamp,
1458            UnixNanos::from(400)
1459        );
1460        // Second inflight command: ts_init=150 + effective_insert_latency=300 = 450
1461        assert_eq!(
1462            exchange
1463                .borrow()
1464                .inflight_queue
1465                .iter()
1466                .nth(1)
1467                .unwrap()
1468                .timestamp,
1469            UnixNanos::from(450)
1470        );
1471
1472        // Process at timestamp 420, and test that only first command is processed
1473        exchange.borrow_mut().process(UnixNanos::from(420));
1474        assert_eq!(exchange.borrow().message_queue.len(), 0);
1475        assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1476        assert_eq!(
1477            exchange
1478                .borrow()
1479                .inflight_queue
1480                .iter()
1481                .next()
1482                .unwrap()
1483                .timestamp,
1484            UnixNanos::from(450)
1485        );
1486    }
1487}