1#![allow(dead_code)]
20#![allow(unused_variables)]
21
22use std::{
23 cell::RefCell,
24 collections::{BinaryHeap, VecDeque},
25 fmt::Debug,
26 rc::Rc,
27};
28
29use ahash::AHashMap;
30use nautilus_common::{cache::Cache, clock::Clock, messages::execution::TradingCommand};
31use nautilus_core::{
32 UnixNanos,
33 correctness::{FAILED, check_equal},
34};
35use nautilus_execution::{
36 client::ExecutionClient,
37 matching_engine::{config::OrderMatchingEngineConfig, engine::OrderMatchingEngine},
38 models::{fee::FeeModelAny, fill::FillModel, latency::LatencyModel},
39};
40use nautilus_model::{
41 accounts::AccountAny,
42 data::{
43 Bar, Data, InstrumentStatus, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API,
44 QuoteTick, TradeTick,
45 },
46 enums::{AccountType, BookType, OmsType},
47 identifiers::{InstrumentId, Venue},
48 instruments::{Instrument, InstrumentAny},
49 orderbook::OrderBook,
50 orders::PassiveOrderAny,
51 types::{AccountBalance, Currency, Money, Price},
52};
53use rust_decimal::Decimal;
54
55use crate::modules::SimulationModule;
56
57#[derive(Debug, Eq, PartialEq)]
61struct InflightCommand {
62 timestamp: UnixNanos,
63 counter: u32,
64 command: TradingCommand,
65}
66
67impl InflightCommand {
68 const fn new(timestamp: UnixNanos, counter: u32, command: TradingCommand) -> Self {
69 Self {
70 timestamp,
71 counter,
72 command,
73 }
74 }
75}
76
77impl Ord for InflightCommand {
78 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
79 other
81 .timestamp
82 .cmp(&self.timestamp)
83 .then_with(|| other.counter.cmp(&self.counter))
84 }
85}
86
87impl PartialOrd for InflightCommand {
88 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
89 Some(self.cmp(other))
90 }
91}
92
93pub struct SimulatedExchange {
109 pub id: Venue,
110 pub oms_type: OmsType,
111 pub account_type: AccountType,
112 starting_balances: Vec<Money>,
113 book_type: BookType,
114 default_leverage: Decimal,
115 exec_client: Option<Rc<dyn ExecutionClient>>,
116 pub base_currency: Option<Currency>,
117 fee_model: FeeModelAny,
118 fill_model: FillModel,
119 latency_model: Option<Box<dyn LatencyModel>>,
120 instruments: AHashMap<InstrumentId, InstrumentAny>,
121 matching_engines: AHashMap<InstrumentId, OrderMatchingEngine>,
122 leverages: AHashMap<InstrumentId, Decimal>,
123 modules: Vec<Box<dyn SimulationModule>>,
124 clock: Rc<RefCell<dyn Clock>>,
125 cache: Rc<RefCell<Cache>>,
126 message_queue: VecDeque<TradingCommand>,
127 inflight_queue: BinaryHeap<InflightCommand>,
128 inflight_counter: AHashMap<UnixNanos, u32>,
129 bar_execution: bool,
130 trade_execution: bool,
131 liquidity_consumption: bool,
132 reject_stop_orders: bool,
133 support_gtd_orders: bool,
134 support_contingent_orders: bool,
135 use_position_ids: bool,
136 use_random_ids: bool,
137 use_reduce_only: bool,
138 use_message_queue: bool,
139 allow_cash_borrowing: bool,
140 frozen_account: bool,
141 price_protection_points: u32,
142}
143
144impl Debug for SimulatedExchange {
145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146 f.debug_struct(stringify!(SimulatedExchange))
147 .field("id", &self.id)
148 .field("account_type", &self.account_type)
149 .finish()
150 }
151}
152
153impl SimulatedExchange {
154 #[allow(clippy::too_many_arguments)]
162 pub fn new(
163 venue: Venue,
164 oms_type: OmsType,
165 account_type: AccountType,
166 starting_balances: Vec<Money>,
167 base_currency: Option<Currency>,
168 default_leverage: Decimal,
169 leverages: AHashMap<InstrumentId, Decimal>,
170 modules: Vec<Box<dyn SimulationModule>>,
171 cache: Rc<RefCell<Cache>>,
172 clock: Rc<RefCell<dyn Clock>>,
173 fill_model: FillModel,
174 fee_model: FeeModelAny,
175 book_type: BookType,
176 latency_model: Option<Box<dyn LatencyModel>>,
177 bar_execution: Option<bool>,
178 trade_execution: Option<bool>,
179 liquidity_consumption: Option<bool>,
180 reject_stop_orders: Option<bool>,
181 support_gtd_orders: Option<bool>,
182 support_contingent_orders: Option<bool>,
183 use_position_ids: Option<bool>,
184 use_random_ids: Option<bool>,
185 use_reduce_only: Option<bool>,
186 use_message_queue: Option<bool>,
187 allow_cash_borrowing: Option<bool>,
188 frozen_account: Option<bool>,
189 price_protection_points: Option<u32>,
190 ) -> anyhow::Result<Self> {
191 if starting_balances.is_empty() {
192 anyhow::bail!("Starting balances must be provided")
193 }
194 if base_currency.is_some() && starting_balances.len() > 1 {
195 anyhow::bail!("single-currency account has multiple starting currencies")
196 }
197 Ok(Self {
199 id: venue,
200 oms_type,
201 account_type,
202 starting_balances,
203 book_type,
204 default_leverage,
205 exec_client: None,
206 base_currency,
207 fee_model,
208 fill_model,
209 latency_model,
210 instruments: AHashMap::new(),
211 matching_engines: AHashMap::new(),
212 leverages,
213 modules,
214 clock,
215 cache,
216 message_queue: VecDeque::new(),
217 inflight_queue: BinaryHeap::new(),
218 inflight_counter: AHashMap::new(),
219 bar_execution: bar_execution.unwrap_or(true),
220 trade_execution: trade_execution.unwrap_or(true),
221 liquidity_consumption: liquidity_consumption.unwrap_or(true),
222 reject_stop_orders: reject_stop_orders.unwrap_or(true),
223 support_gtd_orders: support_gtd_orders.unwrap_or(true),
224 support_contingent_orders: support_contingent_orders.unwrap_or(true),
225 use_position_ids: use_position_ids.unwrap_or(true),
226 use_random_ids: use_random_ids.unwrap_or(false),
227 use_reduce_only: use_reduce_only.unwrap_or(true),
228 use_message_queue: use_message_queue.unwrap_or(true),
229 allow_cash_borrowing: allow_cash_borrowing.unwrap_or(false),
230 frozen_account: frozen_account.unwrap_or(false),
231 price_protection_points: price_protection_points.unwrap_or(0),
232 })
233 }
234
235 pub fn register_client(&mut self, client: Rc<dyn ExecutionClient>) {
236 self.exec_client = Some(client);
237 }
238
239 pub fn set_fill_model(&mut self, fill_model: FillModel) {
240 for matching_engine in self.matching_engines.values_mut() {
241 matching_engine.set_fill_model(fill_model.clone());
242 log::info!(
243 "Setting fill model for {} to {}",
244 matching_engine.venue,
245 self.fill_model
246 );
247 }
248 self.fill_model = fill_model;
249 }
250
251 pub fn set_latency_model(&mut self, latency_model: Box<dyn LatencyModel>) {
252 self.latency_model = Some(latency_model);
253 }
254
255 pub fn initialize_account(&mut self) {
256 self.generate_fresh_account_state();
257 }
258
259 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
269 check_equal(
270 &instrument.id().venue,
271 &self.id,
272 "Venue of instrument id",
273 "Venue of simulated exchange",
274 )
275 .expect(FAILED);
276
277 if self.account_type == AccountType::Cash
278 && (matches!(instrument, InstrumentAny::CryptoPerpetual(_))
279 || matches!(instrument, InstrumentAny::CryptoFuture(_)))
280 {
281 anyhow::bail!("Cash account cannot trade futures or perpetuals")
282 }
283
284 self.instruments.insert(instrument.id(), instrument.clone());
285
286 let price_protection = if self.price_protection_points == 0 {
287 None
288 } else {
289 Some(self.price_protection_points)
290 };
291
292 let matching_engine_config = OrderMatchingEngineConfig::new(
293 self.bar_execution,
294 self.trade_execution,
295 self.liquidity_consumption,
296 self.reject_stop_orders,
297 self.support_gtd_orders,
298 self.support_contingent_orders,
299 self.use_position_ids,
300 self.use_random_ids,
301 self.use_reduce_only,
302 )
303 .with_price_protection_points(price_protection);
304 let instrument_id = instrument.id();
305 let matching_engine = OrderMatchingEngine::new(
306 instrument,
307 self.instruments.len() as u32,
308 self.fill_model.clone(),
309 self.fee_model.clone(),
310 self.book_type,
311 self.oms_type,
312 self.account_type,
313 self.clock.clone(),
314 Rc::clone(&self.cache),
315 matching_engine_config,
316 );
317 self.matching_engines.insert(instrument_id, matching_engine);
318
319 log::info!("Added instrument {instrument_id} and created matching engine");
320 Ok(())
321 }
322
323 #[must_use]
324 pub fn best_bid_price(&self, instrument_id: InstrumentId) -> Option<Price> {
325 self.matching_engines
326 .get(&instrument_id)
327 .and_then(OrderMatchingEngine::best_bid_price)
328 }
329
330 #[must_use]
331 pub fn best_ask_price(&self, instrument_id: InstrumentId) -> Option<Price> {
332 self.matching_engines
333 .get(&instrument_id)
334 .and_then(OrderMatchingEngine::best_ask_price)
335 }
336
337 pub fn get_book(&self, instrument_id: InstrumentId) -> Option<&OrderBook> {
338 self.matching_engines
339 .get(&instrument_id)
340 .map(OrderMatchingEngine::get_book)
341 }
342
343 #[must_use]
344 pub fn get_matching_engine(
345 &self,
346 instrument_id: &InstrumentId,
347 ) -> Option<&OrderMatchingEngine> {
348 self.matching_engines.get(instrument_id)
349 }
350
351 #[must_use]
352 pub const fn get_matching_engines(&self) -> &AHashMap<InstrumentId, OrderMatchingEngine> {
353 &self.matching_engines
354 }
355
356 #[must_use]
357 pub fn get_books(&self) -> AHashMap<InstrumentId, OrderBook> {
358 let mut books = AHashMap::new();
359 for (instrument_id, matching_engine) in &self.matching_engines {
360 books.insert(*instrument_id, matching_engine.get_book().clone());
361 }
362 books
363 }
364
365 #[must_use]
366 pub fn get_open_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
367 instrument_id
368 .and_then(|id| {
369 self.matching_engines
370 .get(&id)
371 .map(OrderMatchingEngine::get_open_orders)
372 })
373 .unwrap_or_else(|| {
374 self.matching_engines
375 .values()
376 .flat_map(OrderMatchingEngine::get_open_orders)
377 .collect()
378 })
379 }
380
381 #[must_use]
382 pub fn get_open_bid_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
383 instrument_id
384 .and_then(|id| {
385 self.matching_engines
386 .get(&id)
387 .map(|engine| engine.get_open_bid_orders().to_vec())
388 })
389 .unwrap_or_else(|| {
390 self.matching_engines
391 .values()
392 .flat_map(|engine| engine.get_open_bid_orders().to_vec())
393 .collect()
394 })
395 }
396
397 #[must_use]
398 pub fn get_open_ask_orders(&self, instrument_id: Option<InstrumentId>) -> Vec<PassiveOrderAny> {
399 instrument_id
400 .and_then(|id| {
401 self.matching_engines
402 .get(&id)
403 .map(|engine| engine.get_open_ask_orders().to_vec())
404 })
405 .unwrap_or_else(|| {
406 self.matching_engines
407 .values()
408 .flat_map(|engine| engine.get_open_ask_orders().to_vec())
409 .collect()
410 })
411 }
412
413 #[must_use]
417 pub fn get_account(&self) -> Option<AccountAny> {
418 self.exec_client
419 .as_ref()
420 .map(|client| client.get_account().unwrap())
421 }
422
423 pub fn adjust_account(&mut self, adjustment: Money) {
427 if self.frozen_account {
428 return;
430 }
431
432 if let Some(exec_client) = &self.exec_client {
433 let venue = exec_client.venue();
434 println!("Adjusting account for venue {venue}");
435 if let Some(account) = self.cache.borrow().account_for_venue(&venue) {
436 match account.balance(Some(adjustment.currency)) {
437 Some(balance) => {
438 let mut current_balance = *balance;
439 current_balance.total += adjustment;
440 current_balance.free += adjustment;
441
442 let margins = match account {
443 AccountAny::Margin(margin_account) => margin_account.margins.clone(),
444 _ => AHashMap::new(),
445 };
446
447 if let Some(exec_client) = &self.exec_client {
448 exec_client
449 .generate_account_state(
450 vec![current_balance],
451 margins.values().copied().collect(),
452 true,
453 self.clock.borrow().timestamp_ns(),
454 )
455 .unwrap();
456 }
457 }
458 None => {
459 log::error!(
460 "Cannot adjust account: no balance for currency {}",
461 adjustment.currency
462 );
463 }
464 }
465 } else {
466 log::error!("Cannot adjust account: no account for venue {venue}");
467 }
468 }
469 }
470
471 pub fn send(&mut self, command: TradingCommand) {
472 if !self.use_message_queue {
473 self.process_trading_command(command);
474 } else if self.latency_model.is_none() {
475 self.message_queue.push_back(command);
476 } else {
477 let (timestamp, counter) = self.generate_inflight_command(&command);
478 self.inflight_queue
479 .push(InflightCommand::new(timestamp, counter, command));
480 }
481 }
482
483 pub fn generate_inflight_command(&mut self, command: &TradingCommand) -> (UnixNanos, u32) {
487 if let Some(latency_model) = &self.latency_model {
488 let ts = match command {
489 TradingCommand::SubmitOrder(_) | TradingCommand::SubmitOrderList(_) => {
490 command.ts_init() + latency_model.get_insert_latency()
491 }
492 TradingCommand::ModifyOrder(_) => {
493 command.ts_init() + latency_model.get_update_latency()
494 }
495 TradingCommand::CancelOrder(_)
496 | TradingCommand::CancelAllOrders(_)
497 | TradingCommand::BatchCancelOrders(_) => {
498 command.ts_init() + latency_model.get_delete_latency()
499 }
500 _ => panic!("Cannot handle command: {command:?}"),
501 };
502
503 let counter = self
504 .inflight_counter
505 .entry(ts)
506 .and_modify(|e| *e += 1)
507 .or_insert(1);
508
509 (ts, *counter)
510 } else {
511 panic!("Latency model should be initialized");
512 }
513 }
514
515 pub fn process_order_book_delta(&mut self, delta: OrderBookDelta) {
519 for module in &self.modules {
520 module.pre_process(Data::Delta(delta));
521 }
522
523 if !self.matching_engines.contains_key(&delta.instrument_id) {
524 let instrument = {
525 let cache = self.cache.as_ref().borrow();
526 cache.instrument(&delta.instrument_id).cloned()
527 };
528
529 if let Some(instrument) = instrument {
530 self.add_instrument(instrument).unwrap();
531 } else {
532 panic!(
533 "No matching engine found for instrument {}",
534 delta.instrument_id
535 );
536 }
537 }
538
539 if let Some(matching_engine) = self.matching_engines.get_mut(&delta.instrument_id) {
540 matching_engine.process_order_book_delta(&delta).unwrap();
541 } else {
542 panic!("Matching engine should be initialized");
543 }
544 }
545
546 pub fn process_order_book_deltas(&mut self, deltas: OrderBookDeltas) {
550 for module in &self.modules {
551 module.pre_process(Data::Deltas(OrderBookDeltas_API::new(deltas.clone())));
552 }
553
554 if !self.matching_engines.contains_key(&deltas.instrument_id) {
555 let instrument = {
556 let cache = self.cache.as_ref().borrow();
557 cache.instrument(&deltas.instrument_id).cloned()
558 };
559
560 if let Some(instrument) = instrument {
561 self.add_instrument(instrument).unwrap();
562 } else {
563 panic!(
564 "No matching engine found for instrument {}",
565 deltas.instrument_id
566 );
567 }
568 }
569
570 if let Some(matching_engine) = self.matching_engines.get_mut(&deltas.instrument_id) {
571 matching_engine.process_order_book_deltas(&deltas).unwrap();
572 } else {
573 panic!("Matching engine should be initialized");
574 }
575 }
576
577 pub fn process_quote_tick(&mut self, quote: &QuoteTick) {
581 for module in &self.modules {
582 module.pre_process(Data::Quote(quote.to_owned()));
583 }
584
585 if !self.matching_engines.contains_key("e.instrument_id) {
586 let instrument = {
587 let cache = self.cache.as_ref().borrow();
588 cache.instrument("e.instrument_id).cloned()
589 };
590
591 if let Some(instrument) = instrument {
592 self.add_instrument(instrument).unwrap();
593 } else {
594 panic!(
595 "No matching engine found for instrument {}",
596 quote.instrument_id
597 );
598 }
599 }
600
601 if let Some(matching_engine) = self.matching_engines.get_mut("e.instrument_id) {
602 matching_engine.process_quote_tick(quote);
603 } else {
604 panic!("Matching engine should be initialized");
605 }
606 }
607
608 pub fn process_trade_tick(&mut self, trade: &TradeTick) {
612 for module in &self.modules {
613 module.pre_process(Data::Trade(trade.to_owned()));
614 }
615
616 if !self.matching_engines.contains_key(&trade.instrument_id) {
617 let instrument = {
618 let cache = self.cache.as_ref().borrow();
619 cache.instrument(&trade.instrument_id).cloned()
620 };
621
622 if let Some(instrument) = instrument {
623 self.add_instrument(instrument).unwrap();
624 } else {
625 panic!(
626 "No matching engine found for instrument {}",
627 trade.instrument_id
628 );
629 }
630 }
631
632 if let Some(matching_engine) = self.matching_engines.get_mut(&trade.instrument_id) {
633 matching_engine.process_trade_tick(trade);
634 } else {
635 panic!("Matching engine should be initialized");
636 }
637 }
638
639 pub fn process_bar(&mut self, bar: Bar) {
643 for module in &self.modules {
644 module.pre_process(Data::Bar(bar));
645 }
646
647 if !self.matching_engines.contains_key(&bar.instrument_id()) {
648 let instrument = {
649 let cache = self.cache.as_ref().borrow();
650 cache.instrument(&bar.instrument_id()).cloned()
651 };
652
653 if let Some(instrument) = instrument {
654 self.add_instrument(instrument).unwrap();
655 } else {
656 panic!(
657 "No matching engine found for instrument {}",
658 bar.instrument_id()
659 );
660 }
661 }
662
663 if let Some(matching_engine) = self.matching_engines.get_mut(&bar.instrument_id()) {
664 matching_engine.process_bar(&bar);
665 } else {
666 panic!("Matching engine should be initialized");
667 }
668 }
669
670 pub fn process_instrument_status(&mut self, status: InstrumentStatus) {
674 if !self.matching_engines.contains_key(&status.instrument_id) {
677 let instrument = {
678 let cache = self.cache.as_ref().borrow();
679 cache.instrument(&status.instrument_id).cloned()
680 };
681
682 if let Some(instrument) = instrument {
683 self.add_instrument(instrument).unwrap();
684 } else {
685 panic!(
686 "No matching engine found for instrument {}",
687 status.instrument_id
688 );
689 }
690 }
691
692 if let Some(matching_engine) = self.matching_engines.get_mut(&status.instrument_id) {
693 matching_engine.process_status(status.action);
694 } else {
695 panic!("Matching engine should be initialized");
696 }
697 }
698
699 pub fn process(&mut self, ts_now: UnixNanos) {
703 while let Some(inflight) = self.inflight_queue.peek() {
707 if inflight.timestamp > ts_now {
708 break;
710 }
711 let inflight = self.inflight_queue.pop().unwrap();
713 self.process_trading_command(inflight.command);
714 }
715
716 while let Some(command) = self.message_queue.pop_front() {
718 self.process_trading_command(command);
719 }
720 }
721
722 pub fn reset(&mut self) {
723 for module in &self.modules {
724 module.reset();
725 }
726
727 self.generate_fresh_account_state();
728
729 for matching_engine in self.matching_engines.values_mut() {
730 matching_engine.reset();
731 }
732
733 log::info!("Resetting exchange state");
735 }
736
737 pub fn process_trading_command(&mut self, command: TradingCommand) {
741 if let Some(matching_engine) = self.matching_engines.get_mut(&command.instrument_id()) {
742 let account_id = if let Some(exec_client) = &self.exec_client {
743 exec_client.account_id()
744 } else {
745 panic!("Execution client should be initialized");
746 };
747 match command {
748 TradingCommand::SubmitOrder(mut command) => {
749 matching_engine.process_order(&mut command.order, account_id);
750 }
751 TradingCommand::ModifyOrder(ref command) => {
752 matching_engine.process_modify(command, account_id);
753 }
754 TradingCommand::CancelOrder(ref command) => {
755 matching_engine.process_cancel(command, account_id);
756 }
757 TradingCommand::CancelAllOrders(ref command) => {
758 matching_engine.process_cancel_all(command, account_id);
759 }
760 TradingCommand::BatchCancelOrders(ref command) => {
761 matching_engine.process_batch_cancel(command, account_id);
762 }
763 TradingCommand::SubmitOrderList(mut command) => {
764 for order in &mut command.order_list.orders {
765 matching_engine.process_order(order, account_id);
766 }
767 }
768 _ => {}
769 }
770 } else {
771 panic!(
772 "Matching engine not found for instrument {}",
773 command.instrument_id()
774 );
775 }
776 }
777
778 pub fn generate_fresh_account_state(&self) {
782 let balances: Vec<AccountBalance> = self
783 .starting_balances
784 .iter()
785 .map(|money| AccountBalance::new(*money, Money::zero(money.currency), *money))
786 .collect();
787
788 if let Some(exec_client) = &self.exec_client {
789 exec_client
790 .generate_account_state(balances, vec![], true, self.clock.borrow().timestamp_ns())
791 .unwrap();
792 }
793
794 if let Some(AccountAny::Margin(mut margin_account)) = self.get_account() {
796 margin_account.set_default_leverage(self.default_leverage);
797
798 for (instrument_id, leverage) in &self.leverages {
800 margin_account.set_leverage(*instrument_id, *leverage);
801 }
802 }
803 }
804}
805
806#[cfg(test)]
807mod tests {
808 use std::{cell::RefCell, collections::BinaryHeap, rc::Rc};
809
810 use ahash::AHashMap;
811 use nautilus_common::{
812 cache::Cache,
813 clock::TestClock,
814 messages::execution::{SubmitOrder, TradingCommand},
815 msgbus::{
816 self,
817 stubs::{get_message_saving_handler, get_saved_messages},
818 },
819 };
820 use nautilus_core::{UUID4, UnixNanos};
821 use nautilus_execution::models::{
822 fee::{FeeModelAny, MakerTakerFeeModel},
823 fill::FillModel,
824 latency::StaticLatencyModel,
825 };
826 use nautilus_model::{
827 accounts::{AccountAny, MarginAccount},
828 data::{
829 Bar, BarType, BookOrder, InstrumentStatus, OrderBookDelta, OrderBookDeltas, QuoteTick,
830 TradeTick,
831 },
832 enums::{
833 AccountType, AggressorSide, BookAction, BookType, MarketStatus, MarketStatusAction,
834 OmsType, OrderSide, OrderType,
835 },
836 events::AccountState,
837 identifiers::{AccountId, InstrumentId, StrategyId, TradeId, TraderId, Venue},
838 instruments::{CryptoPerpetual, InstrumentAny, stubs::crypto_perpetual_ethusdt},
839 orders::OrderTestBuilder,
840 stubs::TestDefault,
841 types::{AccountBalance, Currency, Money, Price, Quantity},
842 };
843 use rstest::rstest;
844
845 use crate::{
846 exchange::{InflightCommand, SimulatedExchange},
847 execution_client::BacktestExecutionClient,
848 };
849
850 fn get_exchange(
851 venue: Venue,
852 account_type: AccountType,
853 book_type: BookType,
854 cache: Option<Rc<RefCell<Cache>>>,
855 ) -> Rc<RefCell<SimulatedExchange>> {
856 let cache = cache.unwrap_or(Rc::new(RefCell::new(Cache::default())));
857 let clock = Rc::new(RefCell::new(TestClock::new()));
858 let exchange = Rc::new(RefCell::new(
859 SimulatedExchange::new(
860 venue,
861 OmsType::Netting,
862 account_type,
863 vec![Money::new(1000.0, Currency::USD())],
864 None,
865 1.into(),
866 AHashMap::new(),
867 vec![],
868 cache.clone(),
869 clock,
870 FillModel::default(),
871 FeeModelAny::MakerTaker(MakerTakerFeeModel),
872 book_type,
873 None, None, None, None, None, None, None, None, None, None, None, None, None, None, )
888 .unwrap(),
889 ));
890
891 let clock = TestClock::new();
892 let execution_client = BacktestExecutionClient::new(
893 TraderId::test_default(),
894 AccountId::test_default(),
895 exchange.clone(),
896 cache,
897 Rc::new(RefCell::new(clock)),
898 None,
899 None,
900 );
901 exchange
902 .borrow_mut()
903 .register_client(Rc::new(execution_client));
904
905 exchange
906 }
907
908 fn create_submit_order_command(ts_init: UnixNanos) -> TradingCommand {
909 let instrument_id = InstrumentId::from("ETHUSDT-PERP.BINANCE");
910 let order = OrderTestBuilder::new(OrderType::Market)
911 .instrument_id(instrument_id)
912 .quantity(Quantity::from(1))
913 .build();
914 TradingCommand::SubmitOrder(SubmitOrder::new(
915 TraderId::test_default(),
916 None,
917 StrategyId::test_default(),
918 instrument_id,
919 order,
920 None,
921 None,
922 None, UUID4::default(),
924 ts_init,
925 ))
926 }
927
928 #[rstest]
929 #[should_panic(
930 expected = "Condition failed: 'Venue of instrument id' value of BINANCE was not equal to 'Venue of simulated exchange' value of SIM"
931 )]
932 fn test_venue_mismatch_between_exchange_and_instrument(
933 crypto_perpetual_ethusdt: CryptoPerpetual,
934 ) {
935 let exchange = get_exchange(
936 Venue::new("SIM"),
937 AccountType::Margin,
938 BookType::L1_MBP,
939 None,
940 );
941 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
942 exchange.borrow_mut().add_instrument(instrument).unwrap();
943 }
944
945 #[rstest]
946 #[should_panic(expected = "Cash account cannot trade futures or perpetuals")]
947 fn test_cash_account_trading_futures_or_perpetuals(crypto_perpetual_ethusdt: CryptoPerpetual) {
948 let exchange = get_exchange(
949 Venue::new("BINANCE"),
950 AccountType::Cash,
951 BookType::L1_MBP,
952 None,
953 );
954 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
955 exchange.borrow_mut().add_instrument(instrument).unwrap();
956 }
957
958 #[rstest]
959 fn test_exchange_process_quote_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
960 let exchange = get_exchange(
961 Venue::new("BINANCE"),
962 AccountType::Margin,
963 BookType::L1_MBP,
964 None,
965 );
966 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
967
968 exchange.borrow_mut().add_instrument(instrument).unwrap();
970
971 let quote_tick = QuoteTick::new(
973 crypto_perpetual_ethusdt.id,
974 Price::from("1000.00"),
975 Price::from("1001.00"),
976 Quantity::from("1.000"),
977 Quantity::from("1.000"),
978 UnixNanos::default(),
979 UnixNanos::default(),
980 );
981 exchange.borrow_mut().process_quote_tick("e_tick);
982
983 let best_bid_price = exchange
984 .borrow()
985 .best_bid_price(crypto_perpetual_ethusdt.id);
986 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
987 let best_ask_price = exchange
988 .borrow()
989 .best_ask_price(crypto_perpetual_ethusdt.id);
990 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
991 }
992
993 #[rstest]
994 fn test_exchange_process_trade_tick(crypto_perpetual_ethusdt: CryptoPerpetual) {
995 let exchange = get_exchange(
996 Venue::new("BINANCE"),
997 AccountType::Margin,
998 BookType::L1_MBP,
999 None,
1000 );
1001 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1002
1003 exchange.borrow_mut().add_instrument(instrument).unwrap();
1005
1006 let trade_tick = TradeTick::new(
1008 crypto_perpetual_ethusdt.id,
1009 Price::from("1000.00"),
1010 Quantity::from("1.000"),
1011 AggressorSide::Buyer,
1012 TradeId::from("1"),
1013 UnixNanos::default(),
1014 UnixNanos::default(),
1015 );
1016 exchange.borrow_mut().process_trade_tick(&trade_tick);
1017
1018 let best_bid_price = exchange
1019 .borrow()
1020 .best_bid_price(crypto_perpetual_ethusdt.id);
1021 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1022 let best_ask = exchange
1023 .borrow()
1024 .best_ask_price(crypto_perpetual_ethusdt.id);
1025 assert_eq!(best_ask, Some(Price::from("1000.00")));
1026 }
1027
1028 #[rstest]
1029 fn test_exchange_process_bar_last_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1030 let exchange = get_exchange(
1031 Venue::new("BINANCE"),
1032 AccountType::Margin,
1033 BookType::L1_MBP,
1034 None,
1035 );
1036 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1037
1038 exchange.borrow_mut().add_instrument(instrument).unwrap();
1040
1041 let bar = Bar::new(
1043 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-LAST-EXTERNAL"),
1044 Price::from("1500.00"),
1045 Price::from("1505.00"),
1046 Price::from("1490.00"),
1047 Price::from("1502.00"),
1048 Quantity::from("100.000"),
1049 UnixNanos::default(),
1050 UnixNanos::default(),
1051 );
1052 exchange.borrow_mut().process_bar(bar);
1053
1054 let best_bid_price = exchange
1056 .borrow()
1057 .best_bid_price(crypto_perpetual_ethusdt.id);
1058 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1059 let best_ask_price = exchange
1060 .borrow()
1061 .best_ask_price(crypto_perpetual_ethusdt.id);
1062 assert_eq!(best_ask_price, Some(Price::from("1502.00")));
1063 }
1064
1065 #[rstest]
1066 fn test_exchange_process_bar_bid_ask_bar_spec(crypto_perpetual_ethusdt: CryptoPerpetual) {
1067 let exchange = get_exchange(
1068 Venue::new("BINANCE"),
1069 AccountType::Margin,
1070 BookType::L1_MBP,
1071 None,
1072 );
1073 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1074
1075 exchange.borrow_mut().add_instrument(instrument).unwrap();
1077
1078 let bar_bid = Bar::new(
1081 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-BID-EXTERNAL"),
1082 Price::from("1500.00"),
1083 Price::from("1505.00"),
1084 Price::from("1490.00"),
1085 Price::from("1502.00"),
1086 Quantity::from("100.000"),
1087 UnixNanos::from(1),
1088 UnixNanos::from(1),
1089 );
1090 let bar_ask = Bar::new(
1091 BarType::from("ETHUSDT-PERP.BINANCE-1-MINUTE-ASK-EXTERNAL"),
1092 Price::from("1501.00"),
1093 Price::from("1506.00"),
1094 Price::from("1491.00"),
1095 Price::from("1503.00"),
1096 Quantity::from("100.000"),
1097 UnixNanos::from(1),
1098 UnixNanos::from(1),
1099 );
1100
1101 exchange.borrow_mut().process_bar(bar_bid);
1103 exchange.borrow_mut().process_bar(bar_ask);
1104
1105 let best_bid_price = exchange
1107 .borrow()
1108 .best_bid_price(crypto_perpetual_ethusdt.id);
1109 assert_eq!(best_bid_price, Some(Price::from("1502.00")));
1110 let best_ask_price = exchange
1111 .borrow()
1112 .best_ask_price(crypto_perpetual_ethusdt.id);
1113 assert_eq!(best_ask_price, Some(Price::from("1503.00")));
1114 }
1115
1116 #[rstest]
1117 fn test_exchange_process_orderbook_delta(crypto_perpetual_ethusdt: CryptoPerpetual) {
1118 let exchange = get_exchange(
1119 Venue::new("BINANCE"),
1120 AccountType::Margin,
1121 BookType::L2_MBP,
1122 None,
1123 );
1124 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1125
1126 exchange.borrow_mut().add_instrument(instrument).unwrap();
1128
1129 let delta_buy = OrderBookDelta::new(
1131 crypto_perpetual_ethusdt.id,
1132 BookAction::Add,
1133 BookOrder::new(
1134 OrderSide::Buy,
1135 Price::from("1000.00"),
1136 Quantity::from("1.000"),
1137 1,
1138 ),
1139 0,
1140 0,
1141 UnixNanos::from(1),
1142 UnixNanos::from(1),
1143 );
1144 let delta_sell = OrderBookDelta::new(
1145 crypto_perpetual_ethusdt.id,
1146 BookAction::Add,
1147 BookOrder::new(
1148 OrderSide::Sell,
1149 Price::from("1001.00"),
1150 Quantity::from("1.000"),
1151 1,
1152 ),
1153 0,
1154 1,
1155 UnixNanos::from(2),
1156 UnixNanos::from(2),
1157 );
1158
1159 exchange.borrow_mut().process_order_book_delta(delta_buy);
1161 exchange.borrow_mut().process_order_book_delta(delta_sell);
1162
1163 let book = exchange
1164 .borrow()
1165 .get_book(crypto_perpetual_ethusdt.id)
1166 .unwrap()
1167 .clone();
1168 assert_eq!(book.update_count, 2);
1169 assert_eq!(book.sequence, 1);
1170 assert_eq!(book.ts_last, UnixNanos::from(2));
1171 let best_bid_price = exchange
1172 .borrow()
1173 .best_bid_price(crypto_perpetual_ethusdt.id);
1174 assert_eq!(best_bid_price, Some(Price::from("1000.00")));
1175 let best_ask_price = exchange
1176 .borrow()
1177 .best_ask_price(crypto_perpetual_ethusdt.id);
1178 assert_eq!(best_ask_price, Some(Price::from("1001.00")));
1179 }
1180
1181 #[rstest]
1182 fn test_exchange_process_orderbook_deltas(crypto_perpetual_ethusdt: CryptoPerpetual) {
1183 let exchange = get_exchange(
1184 Venue::new("BINANCE"),
1185 AccountType::Margin,
1186 BookType::L2_MBP,
1187 None,
1188 );
1189 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1190
1191 exchange.borrow_mut().add_instrument(instrument).unwrap();
1193
1194 let delta_sell_1 = OrderBookDelta::new(
1196 crypto_perpetual_ethusdt.id,
1197 BookAction::Add,
1198 BookOrder::new(
1199 OrderSide::Sell,
1200 Price::from("1000.00"),
1201 Quantity::from("3.000"),
1202 1,
1203 ),
1204 0,
1205 0,
1206 UnixNanos::from(1),
1207 UnixNanos::from(1),
1208 );
1209 let delta_sell_2 = OrderBookDelta::new(
1210 crypto_perpetual_ethusdt.id,
1211 BookAction::Add,
1212 BookOrder::new(
1213 OrderSide::Sell,
1214 Price::from("1001.00"),
1215 Quantity::from("1.000"),
1216 1,
1217 ),
1218 0,
1219 1,
1220 UnixNanos::from(1),
1221 UnixNanos::from(1),
1222 );
1223 let orderbook_deltas = OrderBookDeltas::new(
1224 crypto_perpetual_ethusdt.id,
1225 vec![delta_sell_1, delta_sell_2],
1226 );
1227
1228 exchange
1230 .borrow_mut()
1231 .process_order_book_deltas(orderbook_deltas);
1232
1233 let book = exchange
1234 .borrow()
1235 .get_book(crypto_perpetual_ethusdt.id)
1236 .unwrap()
1237 .clone();
1238 assert_eq!(book.update_count, 2);
1239 assert_eq!(book.sequence, 1);
1240 assert_eq!(book.ts_last, UnixNanos::from(1));
1241 let best_bid_price = exchange
1242 .borrow()
1243 .best_bid_price(crypto_perpetual_ethusdt.id);
1244 assert_eq!(best_bid_price, None);
1246 let best_ask_price = exchange
1247 .borrow()
1248 .best_ask_price(crypto_perpetual_ethusdt.id);
1249 assert_eq!(best_ask_price, Some(Price::from("1000.00")));
1251 }
1252
1253 #[rstest]
1254 fn test_exchange_process_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
1255 let exchange = get_exchange(
1256 Venue::new("BINANCE"),
1257 AccountType::Margin,
1258 BookType::L2_MBP,
1259 None,
1260 );
1261 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1262
1263 exchange.borrow_mut().add_instrument(instrument).unwrap();
1265
1266 let instrument_status = InstrumentStatus::new(
1267 crypto_perpetual_ethusdt.id,
1268 MarketStatusAction::Close, UnixNanos::from(1),
1270 UnixNanos::from(1),
1271 None,
1272 None,
1273 None,
1274 None,
1275 None,
1276 );
1277
1278 exchange
1279 .borrow_mut()
1280 .process_instrument_status(instrument_status);
1281
1282 let market_status = exchange
1283 .borrow()
1284 .get_matching_engine(&crypto_perpetual_ethusdt.id)
1285 .unwrap()
1286 .market_status;
1287 assert_eq!(market_status, MarketStatus::Closed);
1288 }
1289
1290 #[rstest]
1291 fn test_accounting() {
1292 let account_type = AccountType::Margin;
1293 let mut cache = Cache::default();
1294 let handler = get_message_saving_handler::<AccountState>(None);
1295 msgbus::register("Portfolio.update_account".into(), handler.clone());
1296 let margin_account = MarginAccount::new(
1297 AccountState::new(
1298 AccountId::from("SIM-001"),
1299 account_type,
1300 vec![AccountBalance::new(
1301 Money::from("1000 USD"),
1302 Money::from("0 USD"),
1303 Money::from("1000 USD"),
1304 )],
1305 vec![],
1306 false,
1307 UUID4::default(),
1308 UnixNanos::default(),
1309 UnixNanos::default(),
1310 None,
1311 ),
1312 false,
1313 );
1314 let () = cache
1315 .add_account(AccountAny::Margin(margin_account))
1316 .unwrap();
1317 cache.build_index();
1319
1320 let exchange = get_exchange(
1321 Venue::new("SIM"),
1322 account_type,
1323 BookType::L2_MBP,
1324 Some(Rc::new(RefCell::new(cache))),
1325 );
1326 exchange.borrow_mut().initialize_account();
1327
1328 exchange.borrow_mut().adjust_account(Money::from("500 USD"));
1330
1331 let messages = get_saved_messages::<AccountState>(handler);
1333 assert_eq!(messages.len(), 2);
1334 let account_state_first = messages.first().unwrap();
1335 let account_state_second = messages.last().unwrap();
1336
1337 assert_eq!(account_state_first.balances.len(), 1);
1338 let current_balance = account_state_first.balances[0];
1339 assert_eq!(current_balance.free, Money::new(1000.0, Currency::USD()));
1340 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1341 assert_eq!(current_balance.total, Money::new(1000.0, Currency::USD()));
1342
1343 assert_eq!(account_state_second.balances.len(), 1);
1344 let current_balance = account_state_second.balances[0];
1345 assert_eq!(current_balance.free, Money::new(1500.0, Currency::USD()));
1346 assert_eq!(current_balance.locked, Money::new(0.0, Currency::USD()));
1347 assert_eq!(current_balance.total, Money::new(1500.0, Currency::USD()));
1348 }
1349
1350 #[rstest]
1351 fn test_inflight_commands_binary_heap_ordering_respecting_timestamp_counter() {
1352 let inflight1 = InflightCommand::new(
1354 UnixNanos::from(100),
1355 1,
1356 create_submit_order_command(UnixNanos::from(100)),
1357 );
1358 let inflight2 = InflightCommand::new(
1359 UnixNanos::from(200),
1360 2,
1361 create_submit_order_command(UnixNanos::from(200)),
1362 );
1363 let inflight3 = InflightCommand::new(
1364 UnixNanos::from(100),
1365 2,
1366 create_submit_order_command(UnixNanos::from(100)),
1367 );
1368
1369 let mut inflight_heap = BinaryHeap::new();
1371 inflight_heap.push(inflight1);
1372 inflight_heap.push(inflight2);
1373 inflight_heap.push(inflight3);
1374
1375 let first = inflight_heap.pop().unwrap();
1378 let second = inflight_heap.pop().unwrap();
1379 let third = inflight_heap.pop().unwrap();
1380
1381 assert_eq!(first.timestamp, UnixNanos::from(100));
1382 assert_eq!(first.counter, 1);
1383 assert_eq!(second.timestamp, UnixNanos::from(100));
1384 assert_eq!(second.counter, 2);
1385 assert_eq!(third.timestamp, UnixNanos::from(200));
1386 assert_eq!(third.counter, 2);
1387 }
1388
1389 #[rstest]
1390 fn test_process_without_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1391 let exchange = get_exchange(
1392 Venue::new("BINANCE"),
1393 AccountType::Margin,
1394 BookType::L2_MBP,
1395 None,
1396 );
1397
1398 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1399 exchange.borrow_mut().add_instrument(instrument).unwrap();
1400
1401 let command1 = create_submit_order_command(UnixNanos::from(100));
1402 let command2 = create_submit_order_command(UnixNanos::from(200));
1403
1404 exchange.borrow_mut().send(command1);
1405 exchange.borrow_mut().send(command2);
1406
1407 assert_eq!(exchange.borrow().message_queue.len(), 2);
1410 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1411
1412 exchange.borrow_mut().process(UnixNanos::from(300));
1414 assert_eq!(exchange.borrow().message_queue.len(), 0);
1415 assert_eq!(exchange.borrow().inflight_queue.len(), 0);
1416 }
1417
1418 #[rstest]
1419 fn test_process_with_latency_model(crypto_perpetual_ethusdt: CryptoPerpetual) {
1420 let latency_model = StaticLatencyModel::new(
1423 UnixNanos::from(100),
1424 UnixNanos::from(200),
1425 UnixNanos::from(300),
1426 UnixNanos::from(100),
1427 );
1428 let exchange = get_exchange(
1429 Venue::new("BINANCE"),
1430 AccountType::Margin,
1431 BookType::L2_MBP,
1432 None,
1433 );
1434 exchange
1435 .borrow_mut()
1436 .set_latency_model(Box::new(latency_model));
1437
1438 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
1439 exchange.borrow_mut().add_instrument(instrument).unwrap();
1440
1441 let command1 = create_submit_order_command(UnixNanos::from(100));
1442 let command2 = create_submit_order_command(UnixNanos::from(150));
1443 exchange.borrow_mut().send(command1);
1444 exchange.borrow_mut().send(command2);
1445
1446 assert_eq!(exchange.borrow().message_queue.len(), 0);
1448 assert_eq!(exchange.borrow().inflight_queue.len(), 2);
1449 assert_eq!(
1451 exchange
1452 .borrow()
1453 .inflight_queue
1454 .iter()
1455 .next()
1456 .unwrap()
1457 .timestamp,
1458 UnixNanos::from(400)
1459 );
1460 assert_eq!(
1462 exchange
1463 .borrow()
1464 .inflight_queue
1465 .iter()
1466 .nth(1)
1467 .unwrap()
1468 .timestamp,
1469 UnixNanos::from(450)
1470 );
1471
1472 exchange.borrow_mut().process(UnixNanos::from(420));
1474 assert_eq!(exchange.borrow().message_queue.len(), 0);
1475 assert_eq!(exchange.borrow().inflight_queue.len(), 1);
1476 assert_eq!(
1477 exchange
1478 .borrow()
1479 .inflight_queue
1480 .iter()
1481 .next()
1482 .unwrap()
1483 .timestamp,
1484 UnixNanos::from(450)
1485 );
1486 }
1487}