1use std::{
19 any::Any,
20 cell::RefCell,
21 fmt::Debug,
22 rc::{Rc, Weak},
23 sync::Arc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29 actor::DataActor,
30 cache::Cache,
31 clock::{Clock, TestClock},
32 component::Component,
33 enums::LogColor,
34 log_info,
35 logging::{
36 logging_clock_set_realtime_mode, logging_clock_set_static_mode,
37 logging_clock_set_static_time,
38 },
39 runner::{
40 SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
41 drain_data_cmd_queue, drain_trading_cmd_queue, replace_data_cmd_sender,
42 replace_exec_cmd_sender, trading_cmd_queue_is_empty,
43 },
44 timer::{TimeEvent, TimeEventCallback},
45};
46use nautilus_core::{
47 UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable,
48};
49use nautilus_data::client::DataClientAdapter;
50use nautilus_execution::models::fill::FillModelAny;
51use nautilus_model::{
52 accounts::{Account, AccountAny},
53 data::{Data, HasTsInit},
54 enums::{AccountType, AggregationSource, BookType},
55 identifiers::{AccountId, ClientId, InstrumentId, TraderId, Venue},
56 instruments::{Instrument, InstrumentAny},
57 position::Position,
58 types::Price,
59};
60use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
61use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
62
63use crate::{
64 accumulator::TimeEventAccumulator,
65 config::{BacktestEngineConfig, SimulatedVenueConfig},
66 data_client::BacktestDataClient,
67 data_iterator::BacktestDataIterator,
68 exchange::SimulatedExchange,
69 execution_client::BacktestExecutionClient,
70 result::BacktestResult,
71};
72
/// Backtest engine state: a [`NautilusKernel`], the simulated venues registered through
/// `add_venue`, the data streams queued through `add_data`, and the bookkeeping for the
/// current run.
pub struct BacktestEngine {
    /// Instance identifier, copied from `kernel.instance_id` at construction.
    instance_id: UUID4,
    /// Engine configuration as stored by `new` (with `drop_instruments_on_reset` forced off).
    config: BacktestEngineConfig,
    /// Kernel built in `new`; provides the cache, clock, engines, trader and portfolio.
    kernel: NautilusKernel,
    /// Accumulates timer events; drained by `advance_time_impl` and `flush_accumulator_events`.
    accumulator: TimeEventAccumulator,
    /// Run configuration ID supplied to the first `run` call; cleared by `reset`.
    run_config_id: Option<String>,
    /// Run ID generated on the first iteration of a run; cleared by `reset`.
    run_id: Option<UUID4>,
    /// Simulated exchanges keyed by venue, populated by `add_venue`.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    /// Execution clients created by `add_venue`, one per venue.
    exec_clients: Vec<BacktestExecutionClient>,
    /// Instruments for which `add_data` has seen (non-DeFi) data.
    has_data: AHashSet<InstrumentId>,
    /// Instruments for which `add_data` has seen order book data.
    has_book_data: AHashSet<InstrumentId>,
    /// Iterator over the data streams registered by `add_data`.
    data_iterator: BacktestDataIterator,
    /// Total number of data elements added so far.
    data_len: usize,
    /// Counter used to give each added data batch a unique stream name.
    data_stream_counter: usize,
    /// Smallest `ts_init` seen across added data; default run start.
    ts_first: Option<UnixNanos>,
    /// Largest `ts_init` seen across added data; default run end.
    ts_last_data: Option<UnixNanos>,
    /// Flag checked by `run_impl`; set from the `sort` argument of `add_data` and by `sort_data`.
    sorted: bool,
    /// Number of data elements processed by the run loop; `0` marks a run that has not started.
    iteration: usize,
    /// Set when a shutdown has been requested so the run loop exits.
    force_stop: bool,
    /// Latest timestamp the run loop has advanced to.
    last_ns: UnixNanos,
    /// Reset to `None` at the start of each run and by `reset`.
    /// NOTE(review): presumably the last time venue modules ran — confirm in `run_venue_modules`.
    last_module_ns: Option<UnixNanos>,
    /// Reset to `None` by `reset`.
    /// NOTE(review): presumably the last time liquidations ran — confirm in `run_venue_liquidations`.
    last_liquidation_ns: Option<UnixNanos>,
    /// End timestamp of the current run, set by `run_impl`.
    end_ns: UnixNanos,
    /// Wall-clock time at which the run started.
    run_started: Option<UnixNanos>,
    /// Wall-clock time at which `end` completed the run.
    run_finished: Option<UnixNanos>,
    /// Backtest (kernel clock) time at which the run started.
    backtest_start: Option<UnixNanos>,
    /// Kernel clock time recorded by `end`.
    backtest_end: Option<UnixNanos>,
}
113
114impl Debug for BacktestEngine {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct(stringify!(BacktestEngine))
117 .field("instance_id", &self.instance_id)
118 .field("run_config_id", &self.run_config_id)
119 .field("run_id", &self.run_id)
120 .finish()
121 }
122}
123
124impl BacktestEngine {
125 pub fn new(mut config: BacktestEngineConfig) -> anyhow::Result<Self> {
131 let mut cache_config = config.cache.unwrap_or_default();
134 cache_config.drop_instruments_on_reset = false;
135 config.cache = Some(cache_config);
136 let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
137 Ok(Self {
138 instance_id: kernel.instance_id,
139 config,
140 accumulator: TimeEventAccumulator::new(),
141 kernel,
142 run_config_id: None,
143 run_id: None,
144 venues: AHashMap::new(),
145 exec_clients: Vec::new(),
146 has_data: AHashSet::new(),
147 has_book_data: AHashSet::new(),
148 data_iterator: BacktestDataIterator::new(),
149 data_len: 0,
150 data_stream_counter: 0,
151 ts_first: None,
152 ts_last_data: None,
153 sorted: true,
154 iteration: 0,
155 force_stop: false,
156 last_ns: UnixNanos::default(),
157 last_module_ns: None,
158 last_liquidation_ns: None,
159 end_ns: UnixNanos::default(),
160 run_started: None,
161 run_finished: None,
162 backtest_start: None,
163 backtest_end: None,
164 })
165 }
166
/// Returns a shared reference to the underlying [`NautilusKernel`].
#[must_use]
pub const fn kernel(&self) -> &NautilusKernel {
    &self.kernel
}
172
/// Returns a mutable reference to the underlying [`NautilusKernel`].
pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
    &mut self.kernel
}
177
/// Returns the trader ID reported by the kernel.
#[must_use]
pub fn trader_id(&self) -> TraderId {
    self.kernel.trader_id()
}
183
/// Returns the machine ID reported by the kernel.
#[must_use]
pub fn machine_id(&self) -> &str {
    self.kernel.machine_id()
}
189
/// Returns the instance ID captured from the kernel when the engine was created.
#[must_use]
pub fn instance_id(&self) -> UUID4 {
    self.instance_id
}
195
/// Returns the iteration counter, which the run loop increments once per processed
/// data element and `reset` sets back to zero.
#[must_use]
pub fn iteration(&self) -> usize {
    self.iteration
}
201
/// Returns the run configuration ID of the current run, if one was supplied.
#[must_use]
pub fn run_config_id(&self) -> Option<&str> {
    self.run_config_id.as_deref()
}
207
/// Returns the ID generated for the current run, or `None` before the first run
/// and after `reset`.
#[must_use]
pub const fn run_id(&self) -> Option<UUID4> {
    self.run_id
}
213
/// Returns the wall-clock time at which the current run started, if any.
#[must_use]
pub const fn run_started(&self) -> Option<UnixNanos> {
    self.run_started
}
219
/// Returns the wall-clock time at which `end` finished the run, if any.
#[must_use]
pub const fn run_finished(&self) -> Option<UnixNanos> {
    self.run_finished
}
225
/// Returns the backtest (simulated) start timestamp of the current run, if any.
#[must_use]
pub const fn backtest_start(&self) -> Option<UnixNanos> {
    self.backtest_start
}
231
/// Returns the kernel clock timestamp recorded when `end` finished the run, if any.
#[must_use]
pub const fn backtest_end(&self) -> Option<UnixNanos> {
    self.backtest_end
}
237
238 #[must_use]
240 pub fn list_venues(&self) -> Vec<Venue> {
241 self.venues.keys().copied().collect()
242 }
243
244 pub fn add_venue(&mut self, config: SimulatedVenueConfig) -> anyhow::Result<()> {
248 let venue = config.venue;
251 let routing = Some(config.routing);
252 let frozen_account = Some(config.frozen_account);
253
254 let exchange =
255 SimulatedExchange::new(config, self.kernel.cache.clone(), self.kernel.clock.clone())?;
256 let exchange = Rc::new(RefCell::new(exchange));
257 SimulatedExchange::register_spread_quote_endpoint(&exchange);
258 self.venues.insert(venue, exchange.clone());
259
260 let account_id = AccountId::from(format!("{venue}-001").as_str());
261
262 let exec_client = BacktestExecutionClient::new(
263 self.config.trader_id(),
264 account_id,
265 &exchange,
266 self.kernel.cache.clone(),
267 self.kernel.clock.clone(),
268 routing,
269 frozen_account,
270 );
271
272 exchange
273 .borrow_mut()
274 .register_client(Rc::new(exec_client.clone()));
275
276 self.exec_clients.push(exec_client.clone());
277
278 self.kernel
279 .exec_engine
280 .borrow_mut()
281 .register_client(Box::new(exec_client))?;
282
283 log::info!("Adding exchange {venue} to engine");
284
285 Ok(())
286 }
287
288 pub fn set_settlement_price(
294 &mut self,
295 venue: Venue,
296 instrument_id: InstrumentId,
297 price: Price,
298 ) -> anyhow::Result<()> {
299 let exchange = self
300 .venues
301 .get_mut(&venue)
302 .ok_or_else(|| anyhow::anyhow!("Unknown venue {venue}"))?;
303 exchange
304 .borrow_mut()
305 .set_settlement_price(instrument_id, price);
306 Ok(())
307 }
308
309 pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
311 if let Some(exchange) = self.venues.get_mut(&venue) {
312 exchange.borrow_mut().set_fill_model(fill_model);
313 } else {
314 log::warn!(
315 "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
316 );
317 }
318 }
319
320 pub fn add_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
329 let instrument_id = instrument.id();
330 if let Some(exchange) = self.venues.get(&instrument.id().venue) {
331 if matches!(
332 instrument,
333 InstrumentAny::CurrencyPair(_) | InstrumentAny::TokenizedAsset(_)
334 ) && exchange.borrow().account_type != AccountType::Margin
335 && exchange.borrow().base_currency.is_some()
336 {
337 anyhow::bail!(
338 "Cannot add a multi-currency spot instrument {instrument_id} for a venue with a single-currency CASH account"
339 )
340 }
341 exchange.borrow_mut().add_instrument(instrument.clone())?;
342 if let Some(expiration_ns) = instrument.expiration_ns() {
343 self.set_instrument_expiration_timer(exchange, instrument_id, expiration_ns)?;
344 }
345 } else {
346 anyhow::bail!(
347 "Cannot add an `Instrument` object without first adding its associated venue {}",
348 instrument.id().venue
349 )
350 }
351
352 self.add_market_data_client_if_not_exists(instrument.id().venue);
353
354 self.kernel
355 .data_engine
356 .borrow_mut()
357 .process(instrument as &dyn Any);
358 log::info!(
359 "Added instrument {} to exchange {}",
360 instrument_id,
361 instrument_id.venue
362 );
363 Ok(())
364 }
365
366 pub fn add_data(
377 &mut self,
378 data: Vec<Data>,
379 _client_id: Option<ClientId>,
380 validate: bool,
381 sort: bool,
382 ) -> anyhow::Result<()> {
383 anyhow::ensure!(!data.is_empty(), "data was empty");
384
385 let count = data.len();
386 let mut to_add = data;
387
388 if sort {
389 to_add.sort_by_key(HasTsInit::ts_init);
390 }
391
392 if validate {
393 let first = &to_add[0];
396 #[cfg(feature = "defi")]
397 let first_is_defi = matches!(first, Data::Defi(_));
398 #[cfg(not(feature = "defi"))]
399 let first_is_defi = false;
400
401 if !first_is_defi {
402 let first_instrument_id = first.instrument_id();
403 anyhow::ensure!(
404 self.kernel
405 .cache
406 .borrow()
407 .instrument(&first_instrument_id)
408 .is_some(),
409 "Instrument {first_instrument_id} for the given data not found in the cache. \
410 Add the instrument through `add_instrument()` prior to adding related data."
411 );
412
413 if let Data::Bar(bar) = first {
414 anyhow::ensure!(
415 bar.bar_type.aggregation_source() == AggregationSource::External,
416 "bar_type.aggregation_source must be External, was {:?}",
417 bar.bar_type.aggregation_source(),
418 );
419 }
420 }
421 }
422
423 let mut batch_min_ts: Option<UnixNanos> = None;
429 let mut batch_max_ts: Option<UnixNanos> = None;
430
431 #[cfg(feature = "defi")]
432 if to_add.iter().any(|item| matches!(item, Data::Defi(_))) {
433 self.add_defi_data_client_if_not_exists(_client_id);
434 }
435
436 for item in &to_add {
437 #[cfg(feature = "defi")]
438 if matches!(item, Data::Defi(_)) {
439 let ts = item.ts_init();
440 batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
441 batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
442 continue;
443 }
444
445 let instr_id = item.instrument_id();
446 self.has_data.insert(instr_id);
447
448 if item.is_order_book_data() {
449 self.has_book_data.insert(instr_id);
450 }
451
452 self.add_market_data_client_if_not_exists(instr_id.venue);
453
454 let ts = item.ts_init();
455 batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
456 batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
457 }
458
459 if let Some(ts) = batch_min_ts
460 && self.ts_first.is_none_or(|t| ts < t)
461 {
462 self.ts_first = Some(ts);
463 }
464
465 if let Some(ts) = batch_max_ts
466 && self.ts_last_data.is_none_or(|t| ts > t)
467 {
468 self.ts_last_data = Some(ts);
469 }
470
471 self.data_len += count;
472 let stream_name = format!("backtest_data_{}", self.data_stream_counter);
473 self.data_stream_counter += 1;
474 self.data_iterator.add_data(&stream_name, to_add, true);
475
476 self.sorted = sort;
477
478 log::info!(
479 "Added {count} data element{} to BacktestEngine ({} total)",
480 if count == 1 { "" } else { "s" },
481 self.data_len,
482 );
483
484 Ok(())
485 }
486
487 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
494 where
495 T: Strategy + Component + Debug + 'static,
496 {
497 self.kernel.trader.borrow_mut().add_strategy(strategy)
498 }
499
500 pub fn add_strategies<T>(&mut self, strategies: Vec<T>) -> anyhow::Result<()>
506 where
507 T: Strategy + Component + Debug + 'static,
508 {
509 for strategy in strategies {
510 self.add_strategy(strategy)?;
511 }
512 Ok(())
513 }
514
515 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
522 where
523 T: DataActor + Component + Debug + 'static,
524 {
525 self.kernel.trader.borrow_mut().add_actor(actor)
526 }
527
528 pub fn add_actors<T>(&mut self, actors: Vec<T>) -> anyhow::Result<()>
534 where
535 T: DataActor + Component + Debug + 'static,
536 {
537 for actor in actors {
538 self.add_actor(actor)?;
539 }
540 Ok(())
541 }
542
543 pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
549 where
550 T: ExecutionAlgorithm + Component + Debug + 'static,
551 {
552 self.kernel
553 .trader
554 .borrow_mut()
555 .add_exec_algorithm(exec_algorithm)
556 }
557
558 pub fn add_exec_algorithms<T>(&mut self, exec_algorithms: Vec<T>) -> anyhow::Result<()>
565 where
566 T: ExecutionAlgorithm + Component + Debug + 'static,
567 {
568 for exec_algorithm in exec_algorithms {
569 self.add_exec_algorithm(exec_algorithm)?;
570 }
571 Ok(())
572 }
573
574 pub fn run(
591 &mut self,
592 start: Option<UnixNanos>,
593 end: Option<UnixNanos>,
594 run_config_id: Option<String>,
595 streaming: bool,
596 ) -> anyhow::Result<()> {
597 self.run_impl(start, end, run_config_id, streaming)?;
598
599 if !streaming || self.force_stop || self.kernel.is_shutdown_requested() {
604 self.end();
605 }
606
607 Ok(())
608 }
609
/// Core run loop: validates the setup, initializes the run on its first iteration,
/// then replays data in `ts_init` order while firing timer events in between.
///
/// `start` defaults to the earliest timestamp of the added data (or the default
/// `UnixNanos` when no data was added); `end` defaults to the latest data timestamp,
/// or to a far-future constant when no data was added.
///
/// # Errors
///
/// Returns an error if:
/// - data was added without sorting,
/// - a venue with a book type deeper than `L1_MBP` has an instrument with data but
///   no order book data,
/// - `start` is after `end`,
/// - setting instrument expiration timers fails, or
/// - event-store replay is configured but did not start.
fn run_impl(
    &mut self,
    start: Option<UnixNanos>,
    end: Option<UnixNanos>,
    run_config_id: Option<String>,
    streaming: bool,
) -> anyhow::Result<()> {
    anyhow::ensure!(
        self.sorted,
        "Data has been added but not sorted, call `engine.sort_data()` or use \
         `engine.add_data(..., sort=true)` before running"
    );

    // Venues whose book type is deeper than L1 need order book data for every
    // instrument that has any data at all.
    for exchange in self.venues.values() {
        let exchange = exchange.borrow();
        let book_type_has_depth = exchange.book_type() as u8 > BookType::L1_MBP as u8;
        if !book_type_has_depth {
            continue;
        }

        for instrument_id in exchange.instrument_ids() {
            let has_data = self.has_data.contains(instrument_id);
            let missing_book_data = !self.has_book_data.contains(instrument_id);
            if has_data && missing_book_data {
                anyhow::bail!(
                    "No order book data found for instrument '{instrument_id}' when `book_type` \
                     is '{:?}'. Set the venue `book_type` to 'L1_MBP' (for top-of-book data \
                     like quotes, trades, and bars) or provide order book data for this \
                     instrument.",
                    exchange.book_type()
                );
            }
        }
    }

    // Resolve the time range of this run.
    let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
    // Without data the end falls back to 4_102_444_800 s after the Unix epoch
    // (2100-01-01T00:00:00Z), expressed in nanoseconds.
    let end_ns = end.unwrap_or_else(|| {
        self.ts_last_data
            .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
    });
    anyhow::ensure!(start_ns <= end_ns, "start was > end");
    self.end_ns = end_ns;
    self.last_ns = start_ns;
    self.last_module_ns = None;

    // Move every clock to the start of the run.
    let clocks = self.collect_all_clocks();
    Self::set_all_clocks_time(&clocks, start_ns);

    // One-time initialization, performed only while no data has been processed yet.
    if self.iteration == 0 {
        self.set_instrument_expiration_timers()?;

        self.run_config_id = run_config_id;
        self.run_id = Some(UUID4::new());
        self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
        self.backtest_start = Some(start_ns);

        for exchange in self.venues.values() {
            let mut ex = exchange.borrow_mut();
            ex.initialize_account();
            ex.load_open_orders();
        }

        // Set the clocks again after the venues have been initialized.
        Self::set_all_clocks_time(&clocks, start_ns);

        // Clear any stop state left over from an earlier run.
        self.force_stop = false;
        self.kernel.reset_shutdown_flag();

        Self::init_command_senders();

        // Log timestamps follow backtest time from here on.
        logging_clock_set_static_mode();
        logging_clock_set_static_time(start_ns.as_u64());

        self.kernel.start();
        // When event-store replay is active the data loop below is not entered.
        if self.kernel.is_event_store_replay() {
            self.log_pre_run();
            return Ok(());
        }

        if self.kernel.is_event_store_replay_configured() {
            anyhow::bail!("event-store replay did not start");
        }
        self.kernel.start_trader();

        self.log_pre_run();
    }

    self.log_run();

    // Skip data that lies before the requested start.
    let mut data = self.data_iterator.next_item();
    while let Some(ref d) = data {
        if d.ts_init() >= start_ns {
            break;
        }
        data = self.data_iterator.next_item();
    }

    if let Some(ref d) = data {
        // Position `last_ns` one nanosecond before the first element so that the
        // `ts_init > self.last_ns` check below advances time for it.
        let ts = d.ts_init();
        self.last_ns = if ts.as_u64() > 0 {
            UnixNanos::from(ts.as_u64() - 1)
        } else {
            UnixNanos::default()
        };
    } else {
        self.last_ns = start_ns;
    }

    loop {
        if self.kernel.is_shutdown_requested() {
            log::info!("Shutdown requested via ShutdownSystem, ending backtest");
            self.force_stop = true;
        }

        if self.force_stop {
            log::error!("Force stop triggered, ending backtest");
            break;
        }

        if data.is_none() {
            if streaming {
                // Streaming runs stop once the current data is exhausted.
                break;
            }
            // No data left: keep firing timers until none remain within the run.
            let done = self.process_next_timer(&clocks);
            // A timer callback may have made more data available.
            data = self.data_iterator.next_item();
            if data.is_none() && done {
                break;
            }
            continue;
        }

        let d = data.as_ref().unwrap();
        let ts_init = d.ts_init();

        if ts_init > end_ns {
            break;
        }

        // Advance time (and fire timers before `ts_init`) only when the timestamp moves.
        if ts_init > self.last_ns {
            self.last_ns = ts_init;
            self.advance_time_impl(ts_init, &clocks);
        }

        // A timer fired while advancing may have requested a shutdown.
        if self.kernel.is_shutdown_requested() {
            self.force_stop = true;
            break;
        }

        // The exchange sees the data first, then the data engine.
        self.route_data_to_exchange(d);
        self.kernel.data_engine.borrow_mut().process_data(d.clone());

        self.drain_command_queues();
        self.settle_venues(ts_init);

        let prev_last_ns = self.last_ns;
        data = self.data_iterator.next_item();

        // Once the last element at this timestamp has been processed, fire the
        // timers due at it and run the venue modules and liquidations.
        if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
            self.flush_accumulator_events(&clocks, prev_last_ns);
            self.run_venue_modules(prev_last_ns);
            self.run_venue_liquidations(prev_last_ns);
        }

        self.iteration += 1;
    }

    // Final pass at the current clock time.
    let ts_now = self.kernel.clock.borrow().timestamp_ns();
    self.settle_venues(ts_now);
    self.run_venue_modules(ts_now);
    self.run_venue_liquidations(ts_now);

    // Flush remaining timers up to the run end, unless the run was cut short or is
    // streaming, in which case only up to the last processed timestamp.
    let flush_ts = if streaming || self.force_stop || self.kernel.is_shutdown_requested() {
        self.last_ns
    } else {
        end_ns
    };
    self.flush_accumulator_events(&clocks, flush_ts);

    Ok(())
}
809
810 pub fn end(&mut self) {
812 if self.end_ns.as_u64() > 0 {
819 let clocks = self.collect_all_clocks();
820 let flush_ts = if self.force_stop || self.kernel.is_shutdown_requested() {
821 self.last_ns
822 } else {
823 self.end_ns
824 };
825
826 self.flush_accumulator_events(&clocks, flush_ts);
827 }
828
829 self.kernel.stop_trader();
830
831 let mut ts_now = self.kernel.clock.borrow().timestamp_ns();
834
835 self.drain_command_queues();
837
838 if let Some(max_inflight_ts) = self.max_inflight_command_ts()
841 && max_inflight_ts > ts_now
842 {
843 ts_now = max_inflight_ts;
844 let clocks = self.collect_all_clocks();
845 Self::set_all_clocks_time(&clocks, ts_now);
846 }
847
848 self.settle_venues(ts_now);
849
850 self.kernel.data_engine.borrow_mut().stop();
852 self.kernel.risk_engine.borrow_mut().stop();
853 self.kernel.exec_engine.borrow_mut().stop();
854
855 self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
856 self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
857
858 logging_clock_set_realtime_mode();
860
861 self.log_post_run();
862 }
863
864 pub fn reset(&mut self) {
869 log::debug!("Resetting");
870
871 if self.kernel.trader.borrow().is_running() {
872 self.end();
873 }
874
875 self.kernel.data_engine.borrow_mut().stop();
877 self.kernel.data_engine.borrow_mut().reset();
878
879 self.kernel.exec_engine.borrow_mut().stop();
880
881 for exchange in self.venues.values() {
884 exchange.borrow_mut().reset();
885 }
886 self.kernel.exec_engine.borrow_mut().reset();
887
888 self.kernel.risk_engine.borrow_mut().stop();
889 self.kernel.risk_engine.borrow_mut().reset();
890
891 if let Err(e) = self.kernel.trader.borrow_mut().reset() {
893 log::error!("Error resetting trader: {e:?}");
894 }
895
896 self.kernel.portfolio.borrow_mut().reset();
897
898 self.run_config_id = None;
900 self.run_id = None;
901 self.run_started = None;
902 self.run_finished = None;
903 self.backtest_start = None;
904 self.backtest_end = None;
905 self.iteration = 0;
906 self.force_stop = false;
907 self.last_ns = UnixNanos::default();
908 self.last_module_ns = None;
909 self.last_liquidation_ns = None;
910 self.end_ns = UnixNanos::default();
911
912 self.accumulator.clear();
913
914 self.data_iterator.reset_all_cursors();
916
917 log::info!("Reset");
918 }
919
/// Marks the engine's data as sorted so that `run` will accept it.
///
/// No data is reordered here: the method only sets the `sorted` flag and logs that
/// the data iterator merges streams by replay timestamp.
pub fn sort_data(&mut self) {
    self.sorted = true;
    log::info!("Data sort requested (iterator merges streams by replay timestamp)");
}
931
932 pub fn clear_data(&mut self) {
934 self.has_data.clear();
935 self.has_book_data.clear();
936 self.data_iterator = BacktestDataIterator::new();
937 self.data_len = 0;
938 self.data_stream_counter = 0;
939 self.ts_first = None;
940 self.ts_last_data = None;
941 self.sorted = true;
942 }
943
944 pub fn clear_actors(&mut self) -> anyhow::Result<()> {
950 self.kernel.trader.borrow_mut().clear_actors()
951 }
952
953 pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
959 self.kernel.trader.borrow_mut().clear_strategies()
960 }
961
962 pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
968 self.kernel.trader.borrow_mut().clear_exec_algorithms()
969 }
970
/// Disposes of the engine: clears all added data, empties the timer accumulator,
/// and disposes of the kernel.
pub fn dispose(&mut self) {
    self.clear_data();
    self.accumulator.clear();
    self.kernel.dispose();
}
977
978 #[must_use]
980 pub fn get_result(&self) -> BacktestResult {
981 let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
982 (Some(start), Some(end)) => {
983 (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
984 }
985 _ => 0.0,
986 };
987
988 let cache = self.kernel.cache.borrow();
989 let orders = cache.orders(None, None, None, None, None);
990 let total_events = self.kernel.exec_engine.borrow().event_count() as usize;
991 let total_orders = orders.len();
992 let positions: Vec<Position> = cache
993 .positions(None, None, None, None, None)
994 .into_iter()
995 .map(|p| p.cloned())
996 .collect();
997 let cached_positions_count = positions.len();
998 let snapshot_positions = cache.position_snapshots(None, None).len();
999 let total_positions = Self::total_positions_with_snapshots(&cache, cached_positions_count);
1000 let summary = self.build_result_summary(
1001 &cache,
1002 total_events,
1003 total_orders,
1004 cached_positions_count,
1005 snapshot_positions,
1006 );
1007
1008 let analyzer = self.build_analyzer(&cache, &positions);
1009 let mut stats_pnls = AHashMap::new();
1010
1011 for currency in analyzer.currencies() {
1012 if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
1013 stats_pnls.insert(currency.code.to_string(), pnls);
1014 }
1015 }
1016
1017 let stats_returns = analyzer.get_performance_stats_returns();
1018 let stats_general = analyzer.get_performance_stats_general();
1019
1020 BacktestResult {
1021 trader_id: self.config.trader_id().to_string(),
1022 machine_id: self.kernel.machine_id.clone(),
1023 instance_id: self.instance_id,
1024 run_config_id: self.run_config_id.clone(),
1025 run_id: self.run_id,
1026 run_started: self.run_started,
1027 run_finished: self.run_finished,
1028 backtest_start: self.backtest_start,
1029 backtest_end: self.backtest_end,
1030 elapsed_time_secs,
1031 iterations: self.iteration,
1032 total_events,
1033 total_orders,
1034 total_positions,
1035 summary,
1036 stats_pnls,
1037 stats_returns,
1038 stats_general,
1039 }
1040 }
1041
1042 fn build_result_summary(
1043 &self,
1044 cache: &Cache,
1045 total_events: usize,
1046 total_orders: usize,
1047 cached_positions_count: usize,
1048 snapshot_positions: usize,
1049 ) -> AHashMap<String, String> {
1050 let mut summary = AHashMap::new();
1051 summary.insert("iterations".to_string(), self.iteration.to_string());
1052 summary.insert("total_events".to_string(), total_events.to_string());
1053 summary.insert("orders.total".to_string(), total_orders.to_string());
1054 summary.insert(
1055 "orders.open".to_string(),
1056 cache
1057 .orders_open_count(None, None, None, None, None)
1058 .to_string(),
1059 );
1060 summary.insert(
1061 "orders.closed".to_string(),
1062 cache
1063 .orders_closed_count(None, None, None, None, None)
1064 .to_string(),
1065 );
1066 summary.insert(
1067 "orders.emulated".to_string(),
1068 cache
1069 .orders_emulated_count(None, None, None, None, None)
1070 .to_string(),
1071 );
1072 summary.insert(
1073 "orders.inflight".to_string(),
1074 cache
1075 .orders_inflight_count(None, None, None, None, None)
1076 .to_string(),
1077 );
1078 summary.insert(
1079 "positions.total".to_string(),
1080 cached_positions_count.to_string(),
1081 );
1082 summary.insert(
1083 "positions.open".to_string(),
1084 cache
1085 .positions_open_count(None, None, None, None, None)
1086 .to_string(),
1087 );
1088 summary.insert(
1089 "positions.closed".to_string(),
1090 cache
1091 .positions_closed_count(None, None, None, None, None)
1092 .to_string(),
1093 );
1094 summary.insert(
1095 "positions.snapshots".to_string(),
1096 snapshot_positions.to_string(),
1097 );
1098 summary.insert(
1099 "positions.total_with_snapshots".to_string(),
1100 (cached_positions_count + snapshot_positions).to_string(),
1101 );
1102
1103 let mut venues: Vec<Venue> = self.venues.keys().copied().collect();
1104 venues.sort_by_key(ToString::to_string);
1105 summary.insert("venues.total".to_string(), venues.len().to_string());
1106
1107 for venue in venues {
1108 let Some(account) = cache.account_for_venue(&venue) else {
1109 continue;
1110 };
1111
1112 let venue_key = venue.to_string();
1113 let account_key = format!("account.{venue_key}");
1114 summary.insert(format!("{account_key}.id"), account.id().to_string());
1115 summary.insert(
1116 format!("{account_key}.type"),
1117 account.account_type().to_string(),
1118 );
1119 summary.insert(
1120 format!("{account_key}.base_currency"),
1121 account
1122 .base_currency()
1123 .map_or_else(|| "None".to_string(), |currency| currency.code.to_string()),
1124 );
1125 summary.insert(
1126 format!("{account_key}.event_count"),
1127 account.event_count().to_string(),
1128 );
1129
1130 let mut balances: Vec<_> = account.balances().into_iter().collect();
1131 balances.sort_by_key(|(currency, _)| currency.code.to_string());
1132
1133 for (currency, balance) in balances {
1134 let balance_key = format!("{account_key}.balance.{}", currency.code);
1135 summary.insert(format!("{balance_key}.total"), balance.total.to_string());
1136 summary.insert(format!("{balance_key}.free"), balance.free.to_string());
1137 summary.insert(format!("{balance_key}.locked"), balance.locked.to_string());
1138 }
1139 }
1140
1141 summary
1142 }
1143
1144 fn build_analyzer(&self, cache: &Cache, positions: &[Position]) -> PortfolioAnalyzer {
1145 let mut analyzer = PortfolioAnalyzer::default();
1146 let mut snapshot_positions = Vec::new();
1147
1148 for position in positions {
1149 snapshot_positions.extend(cache.position_snapshots(Some(&position.id), None));
1150 }
1151
1152 for venue in self.venues.keys() {
1154 if let Some(account) = cache.account_for_venue(venue) {
1155 let account_ref: &dyn Account = match &*account {
1156 AccountAny::Margin(margin) => margin,
1157 AccountAny::Cash(cash) => cash,
1158 AccountAny::Betting(betting) => betting,
1159 };
1160
1161 for (currency, money) in account_ref.starting_balances() {
1162 analyzer
1163 .account_balances_starting
1164 .entry(currency)
1165 .and_modify(|existing| *existing = *existing + money)
1166 .or_insert(money);
1167 }
1168
1169 for (currency, money) in account_ref.balances_total() {
1170 analyzer
1171 .account_balances
1172 .entry(currency)
1173 .and_modify(|existing| *existing = *existing + money)
1174 .or_insert(money);
1175 }
1176 }
1177 }
1178
1179 analyzer.add_positions(positions);
1180 analyzer.add_positions(&snapshot_positions);
1181 analyzer
1182 }
1183
1184 fn route_data_to_exchange(&self, data: &Data) {
1185 if matches!(
1186 data,
1187 Data::MarkPriceUpdate(_)
1188 | Data::IndexPriceUpdate(_)
1189 | Data::OptionGreeks(_)
1190 | Data::Custom(_)
1191 ) {
1192 return;
1193 }
1194 #[cfg(feature = "defi")]
1195 if matches!(data, Data::Defi(_)) {
1196 return;
1197 }
1198
1199 let venue = data.instrument_id().venue;
1200 if let Some(exchange) = self.venues.get(&venue) {
1201 let mut exchange_ref = exchange.borrow_mut();
1202
1203 match data {
1204 Data::Delta(delta) => exchange_ref.process_order_book_delta(*delta),
1205 Data::Deltas(deltas) => exchange_ref.process_order_book_deltas(deltas),
1206 Data::Depth10(depth) => exchange_ref.process_order_book_depth10(depth),
1207 Data::Quote(quote) => exchange_ref.process_quote_tick(quote),
1208 Data::Trade(trade) => exchange_ref.process_trade_tick(trade),
1209 Data::Bar(bar) => exchange_ref.process_bar(*bar),
1210 Data::InstrumentStatus(status) => exchange_ref.process_instrument_status(*status),
1211 Data::InstrumentClose(close) => exchange_ref.process_instrument_close(*close),
1212 Data::FundingRateUpdate(funding) => {
1213 let settlement_ns = exchange_ref.process_funding_rate(*funding);
1214 drop(exchange_ref);
1215 self.schedule_funding_settlement_if_required(
1216 exchange,
1217 funding.instrument_id,
1218 settlement_ns,
1219 );
1220 }
1221 _ => {}
1222 }
1223 } else {
1224 log::warn!("No exchange found for venue {venue}, data not routed");
1225 }
1226 }
1227
1228 fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
1229 for clock in clocks {
1230 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1231 }
1232
1233 let ts_before = if ts_now.as_u64() > 0 {
1235 UnixNanos::from(ts_now.as_u64() - 1)
1236 } else {
1237 UnixNanos::default()
1238 };
1239
1240 let mut ts_last: Option<UnixNanos> = None;
1241 let mut shutdown_at: Option<UnixNanos> = None;
1242
1243 while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
1244 let ts_event = handler.event.ts_event;
1245
1246 if let Some(ts) = ts_last
1248 && ts != ts_event
1249 {
1250 self.settle_venues(ts);
1251 self.run_venue_modules(ts);
1252 self.run_venue_liquidations(ts);
1253 }
1254
1255 ts_last = Some(ts_event);
1256 Self::set_all_clocks_time(clocks, ts_event);
1257 logging_clock_set_static_time(ts_event.as_u64());
1258
1259 handler.run();
1260 self.drain_command_queues();
1261
1262 if self.kernel.is_shutdown_requested() {
1265 self.accumulator.clear();
1266 shutdown_at = Some(ts_event);
1267 break;
1268 }
1269
1270 for clock in clocks {
1272 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1273 }
1274 }
1275
1276 if let Some(ts) = ts_last {
1278 self.settle_venues(ts);
1279 self.run_venue_modules(ts);
1280 self.run_venue_liquidations(ts);
1281 }
1282
1283 if let Some(ts_event) = shutdown_at {
1286 self.last_ns = ts_event;
1287 } else {
1288 Self::set_all_clocks_time(clocks, ts_now);
1289 logging_clock_set_static_time(ts_now.as_u64());
1290 }
1291 }
1292
1293 fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
1294 if self.kernel.is_shutdown_requested() {
1296 self.accumulator.clear();
1297 return;
1298 }
1299
1300 for clock in clocks {
1301 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1302 }
1303
1304 let mut ts_last: Option<UnixNanos> = None;
1305
1306 while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
1307 let ts_event = handler.event.ts_event;
1308
1309 if let Some(ts) = ts_last
1311 && ts != ts_event
1312 {
1313 self.settle_venues(ts);
1314 self.run_venue_modules(ts);
1315 self.run_venue_liquidations(ts);
1316 }
1317
1318 ts_last = Some(ts_event);
1319 Self::set_all_clocks_time(clocks, ts_event);
1320 logging_clock_set_static_time(ts_event.as_u64());
1321
1322 handler.run();
1323 self.drain_command_queues();
1324
1325 if self.kernel.is_shutdown_requested() {
1328 self.accumulator.clear();
1329 break;
1330 }
1331
1332 for clock in clocks {
1334 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1335 }
1336 }
1337
1338 if let Some(ts) = ts_last {
1340 self.settle_venues(ts);
1341 self.run_venue_modules(ts);
1342 self.run_venue_liquidations(ts);
1343 }
1344 }
1345
1346 fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1347 self.flush_accumulator_events(clocks, self.last_ns);
1348
1349 let mut min_next_time: Option<UnixNanos> = None;
1351
1352 for clock in clocks {
1353 let clock_ref = clock.borrow();
1354 for name in clock_ref.timer_names() {
1355 if let Some(next_time) = clock_ref.next_time_ns(name)
1356 && next_time > self.last_ns
1357 {
1358 min_next_time = Some(match min_next_time {
1359 Some(current_min) => next_time.min(current_min),
1360 None => next_time,
1361 });
1362 }
1363 }
1364 }
1365
1366 match min_next_time {
1367 None => true,
1368 Some(t) if t > self.end_ns => true,
1369 Some(t) => {
1370 self.last_ns = t;
1371 self.flush_accumulator_events(clocks, t);
1372 false
1373 }
1374 }
1375 }
1376
1377 fn set_instrument_expiration_timers(&self) -> anyhow::Result<()> {
1378 for exchange in self.venues.values() {
1379 let expirations = exchange.borrow().instrument_expirations();
1380 for (instrument_id, expiration_ns) in expirations {
1381 self.set_instrument_expiration_timer(exchange, instrument_id, expiration_ns)?;
1382 }
1383 }
1384
1385 Ok(())
1386 }
1387
1388 fn set_instrument_expiration_timer(
1389 &self,
1390 exchange: &Rc<RefCell<SimulatedExchange>>,
1391 instrument_id: InstrumentId,
1392 expiration_ns: UnixNanos,
1393 ) -> anyhow::Result<()> {
1394 if expiration_ns == UnixNanos::default() {
1395 return Ok(());
1396 }
1397
1398 let timer_name = Self::instrument_expiration_timer_name(instrument_id);
1399 let exchange: Weak<RefCell<SimulatedExchange>> = Rc::downgrade(exchange);
1400 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event: TimeEvent| {
1401 if let Some(exchange) = exchange.upgrade() {
1402 exchange
1403 .borrow_mut()
1404 .process_instrument_expirations(event.ts_event);
1405 }
1406 });
1407 let timer_key = ustr::Ustr::from(timer_name.as_str());
1408 let mut clock = self.kernel.clock.borrow_mut();
1409 if clock.timer_exists(&timer_key) {
1410 clock.cancel_timer(&timer_name);
1411 }
1412
1413 clock.set_time_alert_ns(
1414 &timer_name,
1415 expiration_ns,
1416 Some(TimeEventCallback::from(callback)),
1417 None,
1418 )?;
1419
1420 Ok(())
1421 }
1422
1423 fn instrument_expiration_timer_name(instrument_id: InstrumentId) -> String {
1424 format!("INSTRUMENT-EXPIRATION:{instrument_id}")
1425 }
1426
1427 fn schedule_funding_settlement_if_required(
1428 &self,
1429 exchange: &Rc<RefCell<SimulatedExchange>>,
1430 instrument_id: InstrumentId,
1431 settlement_ns: Option<UnixNanos>,
1432 ) {
1433 let Some(settlement_ns) = settlement_ns else {
1434 return;
1435 };
1436
1437 if let Err(e) = self.set_funding_settlement_timer(exchange, instrument_id, settlement_ns) {
1438 log::error!("Cannot schedule funding settlement for {instrument_id}: {e}");
1439 }
1440 }
1441
1442 fn set_funding_settlement_timer(
1443 &self,
1444 exchange: &Rc<RefCell<SimulatedExchange>>,
1445 instrument_id: InstrumentId,
1446 settlement_ns: UnixNanos,
1447 ) -> anyhow::Result<()> {
1448 let timer_name = Self::funding_settlement_timer_name(instrument_id);
1449 let exchange: Weak<RefCell<SimulatedExchange>> = Rc::downgrade(exchange);
1450 let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event: TimeEvent| {
1451 if let Some(exchange) = exchange.upgrade() {
1452 exchange
1453 .borrow_mut()
1454 .process_funding_settlement(instrument_id, event.ts_event);
1455 }
1456 });
1457 let timer_key = ustr::Ustr::from(timer_name.as_str());
1458 let mut clock = self.kernel.clock.borrow_mut();
1459 if clock.timer_exists(&timer_key) {
1460 clock.cancel_timer(&timer_name);
1461 }
1462
1463 clock.set_time_alert_ns(
1464 &timer_name,
1465 settlement_ns,
1466 Some(TimeEventCallback::from(callback)),
1467 None,
1468 )?;
1469
1470 Ok(())
1471 }
1472
1473 fn funding_settlement_timer_name(instrument_id: InstrumentId) -> String {
1474 format!("FUNDING-SETTLEMENT:{instrument_id}")
1475 }
1476
1477 fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
1478 let mut clocks = vec![self.kernel.clock.clone()];
1479 clocks.extend(self.kernel.trader.borrow().get_component_clocks());
1480 clocks
1481 }
1482
1483 fn max_inflight_command_ts(&self) -> Option<UnixNanos> {
1484 self.venues
1485 .values()
1486 .filter_map(|v| v.borrow().max_inflight_command_ts())
1487 .max()
1488 }
1489
1490 fn settle_venues(&self, ts_now: UnixNanos) {
1491 for exchange in self.venues.values() {
1494 exchange.borrow().set_clock_time(ts_now);
1495 }
1496
1497 loop {
1503 self.drain_command_queues();
1506
1507 let active_venues: Vec<Venue> = self
1508 .venues
1509 .iter()
1510 .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
1511 .map(|(id, _)| *id)
1512 .collect();
1513
1514 if active_venues.is_empty() {
1515 break;
1516 }
1517
1518 for venue_id in &active_venues {
1519 self.venues[venue_id].borrow_mut().process(ts_now);
1520 }
1521 self.drain_command_queues();
1522
1523 for venue_id in &active_venues {
1524 self.venues[venue_id]
1525 .borrow_mut()
1526 .iterate_matching_engines(ts_now);
1527 }
1528
1529 self.drain_command_queues();
1532 }
1533 }
1534
1535 fn run_venue_modules(&mut self, ts_now: UnixNanos) {
1536 if self.last_module_ns == Some(ts_now) {
1537 return;
1538 }
1539 self.last_module_ns = Some(ts_now);
1540
1541 self.drain_command_queues();
1543 self.settle_venues(ts_now);
1544
1545 for exchange in self.venues.values() {
1546 exchange.borrow_mut().process_modules(ts_now);
1547 }
1548
1549 self.drain_command_queues();
1551 self.settle_venues(ts_now);
1552 }
1553
1554 fn run_venue_liquidations(&mut self, ts_now: UnixNanos) {
1555 if self.last_liquidation_ns == Some(ts_now) {
1556 return;
1557 }
1558 self.last_liquidation_ns = Some(ts_now);
1559
1560 for exchange in self.venues.values() {
1561 exchange.borrow_mut().process_liquidations(ts_now);
1562 }
1563
1564 self.drain_command_queues();
1565 self.settle_venues(ts_now);
1566 }
1567
1568 fn drain_exec_client_events(&self) {
1569 for client in &self.exec_clients {
1570 client.drain_queued_events();
1571 }
1572 }
1573
1574 fn drain_command_queues(&self) {
1575 loop {
1579 drain_trading_cmd_queue();
1580 drain_data_cmd_queue();
1581 self.drain_exec_client_events();
1582
1583 if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1584 break;
1585 }
1586 }
1587 }
1588
1589 fn init_command_senders() {
1590 replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
1591 replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
1592 }
1593
1594 fn advance_clock_on_accumulator(
1595 accumulator: &mut TimeEventAccumulator,
1596 clock: &Rc<RefCell<dyn Clock>>,
1597 to_time_ns: UnixNanos,
1598 set_time: bool,
1599 ) {
1600 let mut clock_ref = clock.borrow_mut();
1601 let test_clock = clock_ref
1602 .as_any_mut()
1603 .downcast_mut::<TestClock>()
1604 .expect("BacktestEngine requires TestClock");
1605 accumulator.advance_clock(test_clock, to_time_ns, set_time);
1606 }
1607
1608 fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
1609 for clock in clocks {
1610 let mut clock_ref = clock.borrow_mut();
1611 let test_clock = clock_ref
1612 .as_any_mut()
1613 .downcast_mut::<TestClock>()
1614 .expect("BacktestEngine requires TestClock");
1615 test_clock.set_time(ts);
1616 }
1617 }
1618
1619 #[rustfmt::skip]
1620 fn log_pre_run(&self) {
1621 log_info!("=================================================================", color = LogColor::Cyan);
1622 log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
1623 log_info!("=================================================================", color = LogColor::Cyan);
1624
1625 let cache = self.kernel.cache.borrow();
1626 for exchange in self.venues.values() {
1627 let ex = exchange.borrow();
1628 log_info!("=================================================================", color = LogColor::Cyan);
1629 log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
1630 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1631
1632 if let Some(account) = cache.account_for_venue(&ex.id) {
1633 log::info!("Balances starting:");
1634 let account_ref: &dyn Account = match &*account {
1635 AccountAny::Margin(margin) => margin,
1636 AccountAny::Cash(cash) => cash,
1637 AccountAny::Betting(betting) => betting,
1638 };
1639
1640 for balance in account_ref.starting_balances().values() {
1641 log::info!(" {balance}");
1642 }
1643 }
1644 }
1645
1646 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1647 }
1648
1649 #[rustfmt::skip]
1650 fn log_run(&self) {
1651 let config_id = self.run_config_id.as_deref().unwrap_or("None");
1652 let id = format_optional_uuid(self.run_id.as_ref());
1653 let start = format_optional_nanos(self.backtest_start);
1654
1655 log_info!("=================================================================", color = LogColor::Cyan);
1656 log_info!(" BACKTEST RUN", color = LogColor::Cyan);
1657 log_info!("=================================================================", color = LogColor::Cyan);
1658 log::info!("Run config ID: {config_id}");
1659 log::info!("Run ID: {id}");
1660 log::info!("Backtest start: {start}");
1661 log::info!("Data elements: {}", self.data_len);
1662 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1663 }
1664
1665 #[rustfmt::skip]
1666 fn log_post_run(&self) {
1667 let cache = self.kernel.cache.borrow();
1668 let orders = cache.orders(None, None, None, None, None);
1669 let total_events = self.kernel.exec_engine.borrow().event_count() as usize;
1670 let total_orders = orders.len();
1671 let positions: Vec<Position> = cache
1672 .positions(None, None, None, None, None)
1673 .into_iter()
1674 .map(|p| p.cloned())
1675 .collect();
1676 let total_positions = Self::total_positions_with_snapshots(&cache, positions.len());
1677
1678 let config_id = self.run_config_id.as_deref().unwrap_or("None");
1679 let id = format_optional_uuid(self.run_id.as_ref());
1680 let started = format_optional_nanos(self.run_started);
1681 let finished = format_optional_nanos(self.run_finished);
1682 let elapsed = format_optional_duration(self.run_started, self.run_finished);
1683 let bt_start = format_optional_nanos(self.backtest_start);
1684 let bt_end = format_optional_nanos(self.backtest_end);
1685 let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1686 let iterations = self.iteration.separate_with_underscores();
1687 let events = total_events.separate_with_underscores();
1688 let num_orders = total_orders.separate_with_underscores();
1689 let num_positions = total_positions.separate_with_underscores();
1690
1691 log_info!("=================================================================", color = LogColor::Cyan);
1692 log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1693 log_info!("=================================================================", color = LogColor::Cyan);
1694 log::info!("Run config ID: {config_id}");
1695 log::info!("Run ID: {id}");
1696 log::info!("Run started: {started}");
1697 log::info!("Run finished: {finished}");
1698 log::info!("Elapsed time: {elapsed}");
1699 log::info!("Backtest start: {bt_start}");
1700 log::info!("Backtest end: {bt_end}");
1701 log::info!("Backtest range: {bt_range}");
1702 log::info!("Iterations: {iterations}");
1703 log::info!("Total events: {events}");
1704 log::info!("Total orders: {num_orders}");
1705 log::info!("Total positions: {num_positions}");
1706
1707 if !self.config.run_analysis {
1708 return;
1709 }
1710
1711 let analyzer = self.build_analyzer(&cache, &positions);
1712 log_portfolio_performance(&analyzer);
1713 }
1714
1715 fn total_positions_with_snapshots(cache: &Cache, cached_positions_count: usize) -> usize {
1716 cached_positions_count + cache.position_snapshots(None, None).len()
1717 }
1718
1719 pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1721 if self
1722 .kernel
1723 .data_engine
1724 .borrow()
1725 .registered_clients()
1726 .contains(&client_id)
1727 {
1728 return;
1729 }
1730
1731 let venue = Venue::from(client_id.as_str());
1732 let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1733 let data_client_adapter = DataClientAdapter::new(
1734 backtest_client.client_id,
1735 None,
1736 false,
1737 false,
1738 Box::new(backtest_client),
1739 );
1740
1741 self.kernel
1742 .data_engine
1743 .borrow_mut()
1744 .register_client(data_client_adapter, None);
1745 }
1746
1747 pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1749 let client_id = ClientId::from(venue.as_str());
1750
1751 if !self
1752 .kernel
1753 .data_engine
1754 .borrow()
1755 .registered_clients()
1756 .contains(&client_id)
1757 {
1758 let backtest_client =
1759 BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1760 let data_client_adapter = DataClientAdapter::new(
1761 client_id,
1762 Some(venue),
1763 false,
1764 false,
1765 Box::new(backtest_client),
1766 );
1767 self.kernel
1768 .data_engine
1769 .borrow_mut()
1770 .register_client(data_client_adapter, Some(venue));
1771 }
1772 }
1773}
1774
1775fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1776 nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1777}
1778
1779fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1780 uuid.map_or("None".to_string(), |id| id.to_string())
1781}
1782
1783fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1784 match (start, end) {
1785 (Some(s), Some(e)) => {
1786 let delta = e.to_datetime_utc() - s.to_datetime_utc();
1787 let days = delta.num_days().abs();
1788 let hours = delta.num_hours().abs() % 24;
1789 let minutes = delta.num_minutes().abs() % 60;
1790 let seconds = delta.num_seconds().abs() % 60;
1791 let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1792 format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1793 }
1794 _ => "None".to_string(),
1795 }
1796}
1797
1798#[rustfmt::skip]
1799fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1800 log_info!("=================================================================", color = LogColor::Cyan);
1801 log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1802 log_info!("=================================================================", color = LogColor::Cyan);
1803
1804 for currency in analyzer.currencies() {
1805 log::info!(" PnL Statistics ({})", currency.code);
1806 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1807
1808 if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1809 for line in &pnl_lines {
1810 log::info!("{line}");
1811 }
1812 }
1813
1814 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1815 }
1816
1817 log::info!(" Returns Statistics");
1818 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1819
1820 for line in &analyzer.get_stats_returns_formatted() {
1821 log::info!("{line}");
1822 }
1823 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1824
1825 log::info!(" General Statistics");
1826 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1827
1828 for line in &analyzer.get_stats_general_formatted() {
1829 log::info!("{line}");
1830 }
1831 log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1832}
1833
1834#[cfg(test)]
1835mod tests {
1836 use nautilus_common::enums::Environment;
1837 use nautilus_execution::engine::SnapshotAnchorer;
1838 use nautilus_model::{
1839 data::{Data, InstrumentStatus},
1840 enums::{AccountType, BookType, MarketStatus, MarketStatusAction, OmsType},
1841 identifiers::Venue,
1842 instruments::{
1843 CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1844 },
1845 types::Money,
1846 };
1847 use nautilus_system::{KernelEventStore, RegisteredComponents};
1848 use rstest::*;
1849
1850 use super::*;
1851
/// Test double for [`KernelEventStore`] that always reports event-store replay as
/// configured and can be told to fail the parent-cache restore.
#[derive(Debug)]
struct BacktestReplayKernelEventStore {
    /// When `true`, `restore_parent_cache` returns an error instead of `Ok(())`.
    fail_restore: bool,
}
1856
1857 impl KernelEventStore for BacktestReplayKernelEventStore {
1858 fn restore_parent_cache(
1859 &mut self,
1860 _instance_id: UUID4,
1861 _cache: &mut Cache,
1862 ) -> anyhow::Result<()> {
1863 if self.fail_restore {
1864 anyhow::bail!("replay restore failed");
1865 }
1866
1867 Ok(())
1868 }
1869
1870 fn open(
1871 &mut self,
1872 _instance_id: UUID4,
1873 _components: &RegisteredComponents,
1874 _environment: Environment,
1875 ) -> anyhow::Result<()> {
1876 Ok(())
1877 }
1878
1879 fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
1880 None
1881 }
1882
1883 fn seal(&mut self, _ts_init: UnixNanos) {}
1884
1885 fn run_id(&self) -> Option<&str> {
1886 Some("replay-child")
1887 }
1888
1889 fn parent_run_id(&self) -> Option<&str> {
1890 Some("seed-run")
1891 }
1892
1893 fn is_event_store_replay_configured(&self) -> bool {
1894 true
1895 }
1896
1897 fn is_halted(&self) -> bool {
1898 false
1899 }
1900 }
1901
1902 fn create_engine() -> BacktestEngine {
1903 let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
1904 let venue_config = SimulatedVenueConfig::builder()
1905 .venue(Venue::from("BINANCE"))
1906 .oms_type(OmsType::Netting)
1907 .account_type(AccountType::Margin)
1908 .book_type(BookType::L1_MBP)
1909 .starting_balances(vec![Money::from("1_000_000 USDT")])
1910 .build();
1911 engine.add_venue(venue_config).unwrap();
1912 engine
1913 }
1914
/// Builds an engine whose kernel uses a [`BacktestReplayKernelEventStore`].
///
/// The engine is created with `load_state` enabled and `run_analysis` disabled, then its
/// kernel is replaced by one built through `NautilusKernel::new_with` with a factory that
/// yields the test store. `fail_restore` is forwarded to that store.
fn create_engine_with_replay_store(fail_restore: bool) -> BacktestEngine {
    let config = BacktestEngineConfig {
        load_state: true,
        run_analysis: false,
        ..Default::default()
    };
    let mut engine = BacktestEngine::new(config.clone()).unwrap();
    // Factory handed to the kernel; it ignores its arguments and always returns the test store.
    let event_store_factory = move |_instance_id: UUID4, _clock: Rc<RefCell<dyn Clock>>| {
        Ok::<_, anyhow::Error>(Box::new(BacktestReplayKernelEventStore { fail_restore })
            as Box<dyn KernelEventStore>)
    };

    // Swap in a kernel built with the custom event store factory.
    engine.kernel = NautilusKernel::new_with(
        "BacktestEngine".to_string(),
        config,
        None,
        Some(Box::new(event_store_factory)),
    )
    .unwrap();
    // Keep the engine's instance ID in step with the replacement kernel.
    engine.instance_id = engine.kernel.instance_id;
    engine
}
1937
1938 #[rstest]
1939 fn test_run_impl_event_store_replay_skips_trader_start() {
1940 let mut engine = create_engine_with_replay_store(false);
1941
1942 engine
1943 .run_impl(
1944 Some(UnixNanos::from(0)),
1945 Some(UnixNanos::from(1)),
1946 None,
1947 true,
1948 )
1949 .unwrap();
1950
1951 assert!(engine.kernel.is_event_store_replay_configured());
1952 assert!(engine.kernel.is_event_store_replay());
1953 assert!(!engine.kernel.trader.borrow().is_running());
1954 }
1955
1956 #[rstest]
1957 fn test_run_impl_event_store_replay_config_failure_errors() {
1958 let mut engine = create_engine_with_replay_store(true);
1959
1960 let error = engine
1961 .run_impl(
1962 Some(UnixNanos::from(0)),
1963 Some(UnixNanos::from(1)),
1964 None,
1965 true,
1966 )
1967 .unwrap_err();
1968
1969 assert_eq!(error.to_string(), "event-store replay did not start");
1970 assert!(engine.kernel.is_event_store_replay_configured());
1971 assert!(!engine.kernel.is_event_store_replay());
1972 assert!(!engine.kernel.trader.borrow().is_running());
1973 }
1974
1975 #[rstest]
1976 #[case(None)]
1977 #[case(Some(true))]
1978 #[case(Some(false))]
1979 fn test_new_forces_drop_instruments_on_reset_false(
1980 crypto_perpetual_ethusdt: CryptoPerpetual,
1981 #[case] user_value: Option<bool>,
1982 ) {
1983 use nautilus_common::cache::CacheConfig;
1984
1985 let config = match user_value {
1986 None => BacktestEngineConfig::builder().build(),
1987 Some(value) => BacktestEngineConfig::builder()
1988 .cache(
1989 CacheConfig::builder()
1990 .drop_instruments_on_reset(value)
1991 .build(),
1992 )
1993 .build(),
1994 };
1995 let mut engine = BacktestEngine::new(config).unwrap();
1996
1997 let venue_config = SimulatedVenueConfig::builder()
1998 .venue(Venue::from("BINANCE"))
1999 .oms_type(OmsType::Netting)
2000 .account_type(AccountType::Margin)
2001 .book_type(BookType::L1_MBP)
2002 .starting_balances(vec![Money::from("1_000_000 USDT")])
2003 .build();
2004 engine.add_venue(venue_config).unwrap();
2005
2006 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2007 let instrument_id = instrument.id();
2008 engine.add_instrument(&instrument).unwrap();
2009
2010 engine.reset();
2011
2012 assert!(
2013 engine
2014 .kernel()
2015 .cache
2016 .borrow()
2017 .instrument(&instrument_id)
2018 .is_some(),
2019 "instrument must survive engine.reset(); user-supplied \
2020 drop_instruments_on_reset={user_value:?} must not leak through",
2021 );
2022 }
2023
2024 #[rstest]
2025 fn test_route_data_to_exchange_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
2026 let mut engine = create_engine();
2027 let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2028 let instrument_id = instrument.id();
2029 engine.add_instrument(&instrument).unwrap();
2030
2031 let status = InstrumentStatus::new(
2032 instrument_id,
2033 MarketStatusAction::Close,
2034 UnixNanos::from(1),
2035 UnixNanos::from(1),
2036 None,
2037 None,
2038 None,
2039 None,
2040 None,
2041 );
2042
2043 engine.route_data_to_exchange(&Data::InstrumentStatus(status));
2044
2045 let exchange = engine.venues.get(&instrument_id.venue).unwrap().borrow();
2046 let market_status = exchange
2047 .get_matching_engine(&instrument_id)
2048 .unwrap()
2049 .market_status;
2050 assert_eq!(market_status, MarketStatus::Closed);
2051 }
2052}