1use std::{any::Any, cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23 actor::DataActor,
24 cache::Cache,
25 clock::{Clock, TestClock},
26 component::Component,
27 enums::LogColor,
28 log_info,
29 logging::{
30 logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31 logging_clock_set_static_time,
32 },
33 runner::{
34 SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35 drain_data_cmd_queue, drain_trading_cmd_queue, init_data_cmd_sender, init_exec_cmd_sender,
36 trading_cmd_queue_is_empty,
37 },
38};
39use nautilus_core::{UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable};
40use nautilus_data::client::DataClientAdapter;
41use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel};
42use nautilus_model::{
43 accounts::{Account, AccountAny},
44 data::{Data, HasTsInit},
45 enums::{AccountType, BookType, OmsType},
46 identifiers::{AccountId, ClientId, InstrumentId, Venue},
47 instruments::{Instrument, InstrumentAny},
48 orders::Order,
49 position::Position,
50 types::{Currency, Money},
51};
52use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
53use nautilus_trading::strategy::Strategy;
54use rust_decimal::Decimal;
55
56use crate::{
57 accumulator::TimeEventAccumulator, config::BacktestEngineConfig,
58 data_client::BacktestDataClient, data_iterator::BacktestDataIterator,
59 exchange::SimulatedExchange, execution_client::BacktestExecutionClient,
60 modules::SimulationModule,
61};
62
/// Summary of a backtest run, as assembled by `BacktestEngine::get_result`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.backtest",
        skip_from_py_object
    )
)]
pub struct BacktestResult {
    /// Trader ID taken from the engine configuration, rendered as a string.
    pub trader_id: String,
    /// Machine ID copied from the kernel.
    pub machine_id: String,
    /// Instance ID of the engine that produced this result.
    pub instance_id: UUID4,
    /// Caller-supplied run configuration ID, if one was given to `run`.
    pub run_config_id: Option<String>,
    /// ID generated for the run, if a run has started.
    pub run_id: Option<UUID4>,
    /// Wall-clock time at which the run started.
    pub run_started: Option<UnixNanos>,
    /// Wall-clock time at which the run finished.
    pub run_finished: Option<UnixNanos>,
    /// Timestamp at which the backtest began.
    pub backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp recorded when the backtest ended.
    pub backtest_end: Option<UnixNanos>,
    /// Seconds between `backtest_start` and `backtest_end` (0.0 when either is missing).
    pub elapsed_time_secs: f64,
    /// Number of data-loop iterations performed by the engine.
    pub iterations: usize,
    /// Sum of the event counts of all orders in the cache.
    pub total_events: usize,
    /// Number of orders in the cache.
    pub total_orders: usize,
    /// Number of positions in the cache.
    pub total_positions: usize,
    /// PnL statistics keyed by currency code; the inner map comes from the portfolio analyzer.
    pub stats_pnls: AHashMap<String, AHashMap<String, f64>>,
    /// Returns statistics produced by the portfolio analyzer.
    pub stats_returns: AHashMap<String, f64>,
    /// General statistics produced by the portfolio analyzer.
    pub stats_general: AHashMap<String, f64>,
}
91
/// Python-facing read-only accessors for [`BacktestResult`].
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl BacktestResult {
    /// Returns the trader ID.
    #[getter]
    #[pyo3(name = "trader_id")]
    fn py_trader_id(&self) -> &str {
        self.trader_id.as_str()
    }

    /// Returns the machine ID.
    #[getter]
    #[pyo3(name = "machine_id")]
    fn py_machine_id(&self) -> &str {
        self.machine_id.as_str()
    }

    /// Returns the engine instance ID.
    #[getter]
    #[pyo3(name = "instance_id")]
    const fn py_instance_id(&self) -> UUID4 {
        self.instance_id
    }

    /// Returns the run configuration ID, if any.
    #[getter]
    #[pyo3(name = "run_config_id")]
    fn py_run_config_id(&self) -> Option<&str> {
        self.run_config_id.as_deref()
    }

    /// Returns the elapsed time in seconds.
    #[getter]
    #[pyo3(name = "elapsed_time_secs")]
    const fn py_elapsed_time_secs(&self) -> f64 {
        self.elapsed_time_secs
    }

    /// Returns the number of iterations.
    #[getter]
    #[pyo3(name = "iterations")]
    const fn py_iterations(&self) -> usize {
        self.iterations
    }

    /// Returns the total number of events.
    #[getter]
    #[pyo3(name = "total_events")]
    const fn py_total_events(&self) -> usize {
        self.total_events
    }

    /// Returns the total number of orders.
    #[getter]
    #[pyo3(name = "total_orders")]
    const fn py_total_orders(&self) -> usize {
        self.total_orders
    }

    /// Returns the total number of positions.
    #[getter]
    #[pyo3(name = "total_positions")]
    const fn py_total_positions(&self) -> usize {
        self.total_positions
    }

    /// Returns the PnL statistics copied into standard-library maps.
    #[getter]
    #[pyo3(name = "stats_pnls")]
    fn py_stats_pnls(&self) -> HashMap<String, HashMap<String, f64>> {
        let mut outer = HashMap::with_capacity(self.stats_pnls.len());
        for (outer_key, stats) in self.stats_pnls.iter() {
            let mut inner = HashMap::with_capacity(stats.len());
            for (name, value) in stats.iter() {
                inner.insert(name.clone(), *value);
            }
            outer.insert(outer_key.clone(), inner);
        }
        outer
    }

    /// Returns the returns statistics copied into a standard-library map.
    #[getter]
    #[pyo3(name = "stats_returns")]
    fn py_stats_returns(&self) -> HashMap<String, f64> {
        let mut copied = HashMap::with_capacity(self.stats_returns.len());
        for (name, value) in self.stats_returns.iter() {
            copied.insert(name.clone(), *value);
        }
        copied
    }

    /// Returns the general statistics copied into a standard-library map.
    #[getter]
    #[pyo3(name = "stats_general")]
    fn py_stats_general(&self) -> HashMap<String, f64> {
        let mut copied = HashMap::with_capacity(self.stats_general.len());
        for (name, value) in self.stats_general.iter() {
            copied.insert(name.clone(), *value);
        }
        copied
    }

    /// Returns a short summary string for the Python `repr()`.
    fn __repr__(&self) -> String {
        let Self {
            trader_id,
            elapsed_time_secs,
            iterations,
            total_orders,
            total_positions,
            ..
        } = self;
        format!(
            "BacktestResult(trader_id='{}', elapsed={:.2}s, iterations={}, orders={}, positions={})",
            trader_id, elapsed_time_secs, iterations, total_orders, total_positions,
        )
    }
}
192
/// Backtest engine that replays added data through simulated venues and the kernel's engines.
pub struct BacktestEngine {
    /// Instance ID copied from the kernel at construction.
    instance_id: UUID4,
    /// Engine configuration.
    config: BacktestEngineConfig,
    /// Kernel owning the cache, clock, engines and trader used during the run.
    kernel: NautilusKernel,
    /// Accumulator from which pending time-event handlers are popped in timestamp order.
    accumulator: TimeEventAccumulator,
    /// Run configuration ID supplied on the first call to `run`.
    run_config_id: Option<String>,
    /// ID generated when a run starts.
    run_id: Option<UUID4>,
    /// Simulated exchanges keyed by venue.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    /// Execution clients created by `add_venue`; their queued events are drained with the command queues.
    exec_clients: Vec<BacktestExecutionClient>,
    /// Instruments seen by `add_data` when validation is enabled.
    has_data: AHashSet<InstrumentId>,
    /// Instruments for which order book data was seen by `add_data` when validation is enabled.
    has_book_data: AHashSet<InstrumentId>,
    /// Iterator yielding the added data during a run.
    data_iterator: BacktestDataIterator,
    /// Total number of data elements added.
    data_len: usize,
    /// Counter used to give each added data batch a unique stream name.
    data_stream_counter: usize,
    /// Earliest `ts_init` recorded across added data batches.
    ts_first: Option<UnixNanos>,
    /// Latest `ts_init` recorded across added data batches.
    ts_last_data: Option<UnixNanos>,
    /// Number of data elements processed by the run loop.
    iteration: usize,
    /// When set, the run loop stops at the start of its next iteration.
    force_stop: bool,
    /// Timestamp the run loop last advanced to.
    last_ns: UnixNanos,
    /// Timestamp at which venue modules were last run (guards against re-running for the same timestamp).
    last_module_ns: Option<UnixNanos>,
    /// End timestamp of the current run.
    end_ns: UnixNanos,
    /// Wall-clock time at which the run started.
    run_started: Option<UnixNanos>,
    /// Wall-clock time at which the run finished.
    run_finished: Option<UnixNanos>,
    /// Timestamp at which the backtest began.
    backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp recorded by `end`.
    backtest_end: Option<UnixNanos>,
}
231
232impl Debug for BacktestEngine {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct(stringify!(BacktestEngine))
235 .field("instance_id", &self.instance_id)
236 .field("run_config_id", &self.run_config_id)
237 .field("run_id", &self.run_id)
238 .finish()
239 }
240}
241
242impl BacktestEngine {
243 pub fn new(config: BacktestEngineConfig) -> anyhow::Result<Self> {
249 let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
250 Ok(Self {
251 instance_id: kernel.instance_id,
252 config,
253 accumulator: TimeEventAccumulator::new(),
254 kernel,
255 run_config_id: None,
256 run_id: None,
257 venues: AHashMap::new(),
258 exec_clients: Vec::new(),
259 has_data: AHashSet::new(),
260 has_book_data: AHashSet::new(),
261 data_iterator: BacktestDataIterator::new(),
262 data_len: 0,
263 data_stream_counter: 0,
264 ts_first: None,
265 ts_last_data: None,
266 iteration: 0,
267 force_stop: false,
268 last_ns: UnixNanos::default(),
269 last_module_ns: None,
270 end_ns: UnixNanos::default(),
271 run_started: None,
272 run_finished: None,
273 backtest_start: None,
274 backtest_end: None,
275 })
276 }
277
    /// Returns a shared reference to the underlying kernel.
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
283
    /// Returns a mutable reference to the underlying kernel.
    pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
288
289 #[allow(clippy::too_many_arguments)]
293 pub fn add_venue(
294 &mut self,
295 venue: Venue,
296 oms_type: OmsType,
297 account_type: AccountType,
298 book_type: BookType,
299 starting_balances: Vec<Money>,
300 base_currency: Option<Currency>,
301 default_leverage: Option<Decimal>,
302 leverages: AHashMap<InstrumentId, Decimal>,
303 modules: Vec<Box<dyn SimulationModule>>,
304 fill_model: FillModelAny,
305 fee_model: FeeModelAny,
306 latency_model: Option<Box<dyn LatencyModel>>,
307 routing: Option<bool>,
308 reject_stop_orders: Option<bool>,
309 support_gtd_orders: Option<bool>,
310 support_contingent_orders: Option<bool>,
311 use_position_ids: Option<bool>,
312 use_random_ids: Option<bool>,
313 use_reduce_only: Option<bool>,
314 use_message_queue: Option<bool>,
315 use_market_order_acks: Option<bool>,
316 bar_execution: Option<bool>,
317 bar_adaptive_high_low_ordering: Option<bool>,
318 trade_execution: Option<bool>,
319 liquidity_consumption: Option<bool>,
320 allow_cash_borrowing: Option<bool>,
321 frozen_account: Option<bool>,
322 price_protection_points: Option<u32>,
323 ) -> anyhow::Result<()> {
324 let default_leverage: Decimal = default_leverage.unwrap_or_else(|| {
325 if account_type == AccountType::Margin {
326 Decimal::from(10)
327 } else {
328 Decimal::from(0)
329 }
330 });
331
332 let exchange = SimulatedExchange::new(
333 venue,
334 oms_type,
335 account_type,
336 starting_balances,
337 base_currency,
338 default_leverage,
339 leverages,
340 modules,
341 self.kernel.cache.clone(),
342 self.kernel.clock.clone(),
343 fill_model,
344 fee_model,
345 book_type,
346 latency_model,
347 bar_execution,
348 bar_adaptive_high_low_ordering,
349 trade_execution,
350 liquidity_consumption,
351 reject_stop_orders,
352 support_gtd_orders,
353 support_contingent_orders,
354 use_position_ids,
355 use_random_ids,
356 use_reduce_only,
357 use_message_queue,
358 use_market_order_acks,
359 allow_cash_borrowing,
360 frozen_account,
361 price_protection_points,
362 )?;
363 let exchange = Rc::new(RefCell::new(exchange));
364 self.venues.insert(venue, exchange.clone());
365
366 let account_id = AccountId::from(format!("{venue}-001").as_str());
367
368 let exec_client = BacktestExecutionClient::new(
369 self.config.trader_id(),
370 account_id,
371 exchange.clone(),
372 self.kernel.cache.clone(),
373 self.kernel.clock.clone(),
374 routing,
375 frozen_account,
376 );
377
378 exchange
379 .borrow_mut()
380 .register_client(Rc::new(exec_client.clone()));
381
382 self.exec_clients.push(exec_client.clone());
383
384 self.kernel
385 .exec_engine
386 .borrow_mut()
387 .register_client(Box::new(exec_client))?;
388
389 log::info!("Adding exchange {venue} to engine");
390
391 Ok(())
392 }
393
394 pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
396 if let Some(exchange) = self.venues.get_mut(&venue) {
397 exchange.borrow_mut().set_fill_model(fill_model);
398 } else {
399 log::warn!(
400 "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
401 );
402 }
403 }
404
405 pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
414 let instrument_id = instrument.id();
415 if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
416 if matches!(instrument, InstrumentAny::CurrencyPair(_))
417 && exchange.borrow().account_type != AccountType::Margin
418 && exchange.borrow().base_currency.is_some()
419 {
420 anyhow::bail!(
421 "Cannot add a `CurrencyPair` instrument {instrument_id} for a venue with a single-currency CASH account"
422 )
423 }
424 exchange.borrow_mut().add_instrument(instrument.clone())?;
425 } else {
426 anyhow::bail!(
427 "Cannot add an `Instrument` object without first adding its associated venue {}",
428 instrument.id().venue
429 )
430 }
431
432 self.add_market_data_client_if_not_exists(instrument.id().venue);
433
434 self.kernel
435 .data_engine
436 .borrow_mut()
437 .process(&instrument as &dyn Any);
438 log::info!(
439 "Added instrument {} to exchange {}",
440 instrument_id,
441 instrument_id.venue
442 );
443 Ok(())
444 }
445
446 pub fn add_data(
448 &mut self,
449 data: Vec<Data>,
450 _client_id: Option<ClientId>,
451 validate: bool,
452 sort: bool,
453 ) {
454 if data.is_empty() {
455 log::warn!("add_data called with empty data slice – ignoring");
456 return;
457 }
458
459 let count = data.len();
460
461 let mut to_add = data;
462
463 if sort {
464 to_add.sort_by_key(HasTsInit::ts_init);
465 }
466
467 if validate {
468 for item in &to_add {
469 let instr_id = item.instrument_id();
470 self.has_data.insert(instr_id);
471
472 if item.is_order_book_data() {
473 self.has_book_data.insert(instr_id);
474 }
475
476 self.add_market_data_client_if_not_exists(instr_id.venue);
477 }
478 }
479
480 if let Some(first) = to_add.first() {
482 let ts = first.ts_init();
483 if self.ts_first.is_none_or(|t| ts < t) {
484 self.ts_first = Some(ts);
485 }
486 }
487
488 if let Some(last) = to_add.last() {
489 let ts = last.ts_init();
490 if self.ts_last_data.is_none_or(|t| ts > t) {
491 self.ts_last_data = Some(ts);
492 }
493 }
494
495 self.data_len += count;
496 let stream_name = format!("backtest_data_{}", self.data_stream_counter);
497 self.data_stream_counter += 1;
498 self.data_iterator.add_data(&stream_name, to_add, true);
499
500 log::info!(
501 "Added {count} data element{} to BacktestEngine ({} total)",
502 if count == 1 { "" } else { "s" },
503 self.data_len,
504 );
505 }
506
507 pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
513 where
514 T: Strategy + Component + Debug + 'static,
515 {
516 self.kernel.trader.add_strategy(strategy)
517 }
518
519 pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
525 where
526 T: DataActor + Component + Debug + 'static,
527 {
528 self.kernel.trader.add_actor(actor)
529 }
530
531 pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
537 where
538 T: DataActor + Component + Debug + 'static,
539 {
540 self.kernel.trader.add_exec_algorithm(exec_algorithm)
541 }
542
543 pub fn run(
560 &mut self,
561 start: Option<UnixNanos>,
562 end: Option<UnixNanos>,
563 run_config_id: Option<String>,
564 streaming: bool,
565 ) -> anyhow::Result<()> {
566 self.run_impl(start, end, run_config_id, streaming)?;
567
568 if !streaming {
569 self.end();
570 }
571
572 Ok(())
573 }
574
575 fn run_impl(
576 &mut self,
577 start: Option<UnixNanos>,
578 end: Option<UnixNanos>,
579 run_config_id: Option<String>,
580 streaming: bool,
581 ) -> anyhow::Result<()> {
582 let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
584 let end_ns = end.unwrap_or_else(|| {
585 self.ts_last_data
586 .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
587 });
588 anyhow::ensure!(start_ns <= end_ns, "start was > end");
589 self.end_ns = end_ns;
590 self.last_ns = start_ns;
591 self.last_module_ns = None;
592
593 let clocks = self.collect_all_clocks();
595 Self::set_all_clocks_time(&clocks, start_ns);
596
597 if self.iteration == 0 {
599 self.run_config_id = run_config_id;
600 self.run_id = Some(UUID4::new());
601 self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
602 self.backtest_start = Some(start_ns);
603
604 for exchange in self.venues.values() {
606 exchange.borrow_mut().initialize_account();
607 }
608
609 Self::set_all_clocks_time(&clocks, start_ns);
611
612 self.force_stop = false;
614
615 Self::init_command_senders();
617
618 logging_clock_set_static_mode();
620 logging_clock_set_static_time(start_ns.as_u64());
621
622 self.kernel.start();
624 self.kernel.start_trader();
625
626 self.log_pre_run();
627 }
628
629 self.log_run();
630
631 let mut data = self.data_iterator.next();
633 while let Some(ref d) = data {
634 if d.ts_init() >= start_ns {
635 break;
636 }
637 data = self.data_iterator.next();
638 }
639
640 if let Some(ref d) = data {
642 let ts = d.ts_init();
643 self.last_ns = if ts.as_u64() > 0 {
644 UnixNanos::from(ts.as_u64() - 1)
645 } else {
646 UnixNanos::default()
647 };
648 } else {
649 self.last_ns = start_ns;
650 }
651
652 loop {
653 if self.force_stop {
654 log::error!("Force stop triggered, ending backtest");
655 break;
656 }
657
658 if data.is_none() {
659 if streaming {
660 break;
664 }
665 let done = self.process_next_timer(&clocks);
666 data = self.data_iterator.next();
667 if data.is_none() && done {
668 break;
669 }
670 continue;
671 }
672
673 let d = data.as_ref().unwrap();
674 let ts_init = d.ts_init();
675
676 if ts_init > end_ns {
677 break;
678 }
679
680 if ts_init > self.last_ns {
681 self.last_ns = ts_init;
682 self.advance_time_impl(ts_init, &clocks);
683 }
684
685 self.route_data_to_exchange(d);
687
688 self.kernel.data_engine.borrow_mut().process_data(d.clone());
691
692 self.drain_command_queues();
694 self.settle_venues(ts_init);
695
696 let prev_last_ns = self.last_ns;
697 data = self.data_iterator.next();
698
699 if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
701 self.flush_accumulator_events(&clocks, prev_last_ns);
702 self.run_venue_modules(prev_last_ns);
703 }
704
705 self.iteration += 1;
706 }
707
708 let ts_now = self.kernel.clock.borrow().timestamp_ns();
710 self.settle_venues(ts_now);
711 self.run_venue_modules(ts_now);
712
713 if streaming {
717 self.flush_accumulator_events(&clocks, self.last_ns);
718 } else {
719 self.flush_accumulator_events(&clocks, end_ns);
720 }
721
722 Ok(())
723 }
724
725 pub fn end(&mut self) {
727 if self.end_ns.as_u64() > 0 {
732 let clocks = self.collect_all_clocks();
733 self.flush_accumulator_events(&clocks, self.end_ns);
734 }
735
736 self.kernel.stop_trader();
738
739 self.kernel.data_engine.borrow_mut().stop();
741 self.kernel.risk_engine.borrow_mut().stop();
742 self.kernel.exec_engine.borrow_mut().stop();
743
744 let ts_now = self.kernel.clock.borrow().timestamp_ns();
746 self.settle_venues(ts_now);
747 self.run_venue_modules(ts_now);
748
749 self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
750 self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
751
752 logging_clock_set_realtime_mode();
754
755 self.log_post_run();
756 }
757
758 pub fn reset(&mut self) {
763 log::debug!("Resetting");
764
765 if self.kernel.trader.is_running() {
766 self.end();
767 }
768
769 self.kernel.data_engine.borrow_mut().stop();
771 self.kernel.data_engine.borrow_mut().reset();
772
773 self.kernel.exec_engine.borrow_mut().stop();
774 self.kernel.exec_engine.borrow_mut().reset();
775
776 self.kernel.risk_engine.borrow_mut().stop();
777 self.kernel.risk_engine.borrow_mut().reset();
778
779 if let Err(e) = self.kernel.trader.reset() {
781 log::error!("Error resetting trader: {e:?}");
782 }
783
784 for exchange in self.venues.values() {
786 exchange.borrow_mut().reset();
787 }
788
789 self.run_config_id = None;
791 self.run_id = None;
792 self.run_started = None;
793 self.run_finished = None;
794 self.backtest_start = None;
795 self.backtest_end = None;
796 self.iteration = 0;
797 self.force_stop = false;
798 self.last_ns = UnixNanos::default();
799 self.last_module_ns = None;
800 self.end_ns = UnixNanos::default();
801
802 self.accumulator.clear();
803
804 self.data_iterator.reset_all_cursors();
806
807 log::info!("Reset");
808 }
809
    /// Logs that a sort was requested; no sorting is performed here.
    ///
    /// The log message states that the data iterator maintains sort order.
    pub fn sort_data(&mut self) {
        log::info!("Data sort requested (iterator maintains sort order)");
    }
821
822 pub fn clear_data(&mut self) {
824 self.has_data.clear();
825 self.has_book_data.clear();
826 self.data_iterator = BacktestDataIterator::new();
827 self.data_len = 0;
828 self.data_stream_counter = 0;
829 self.ts_first = None;
830 self.ts_last_data = None;
831 }
832
    /// Clears all strategies from the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the trader.
    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
        self.kernel.trader.clear_strategies()
    }
841
    /// Clears all execution algorithms from the kernel's trader.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the trader.
    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
        self.kernel.trader.clear_exec_algorithms()
    }
850
    /// Clears all added data and disposes of the kernel.
    pub fn dispose(&mut self) {
        self.clear_data();
        self.kernel.dispose();
    }
856
857 #[must_use]
859 pub fn get_result(&self) -> BacktestResult {
860 let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
861 (Some(start), Some(end)) => {
862 (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
863 }
864 _ => 0.0,
865 };
866
867 let cache = self.kernel.cache.borrow();
868 let orders = cache.orders(None, None, None, None, None);
869 let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
870 let total_orders = orders.len();
871 let positions = cache.positions(None, None, None, None, None);
872 let total_positions = positions.len();
873
874 let analyzer = self.build_analyzer(&cache, &positions);
875 let mut stats_pnls = AHashMap::new();
876 for currency in analyzer.currencies() {
877 if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
878 stats_pnls.insert(currency.code.to_string(), pnls);
879 }
880 }
881
882 let stats_returns = analyzer.get_performance_stats_returns();
883 let stats_general = analyzer.get_performance_stats_general();
884
885 BacktestResult {
886 trader_id: self.config.trader_id().to_string(),
887 machine_id: self.kernel.machine_id.clone(),
888 instance_id: self.instance_id,
889 run_config_id: self.run_config_id.clone(),
890 run_id: self.run_id,
891 run_started: self.run_started,
892 run_finished: self.run_finished,
893 backtest_start: self.backtest_start,
894 backtest_end: self.backtest_end,
895 elapsed_time_secs,
896 iterations: self.iteration,
897 total_events,
898 total_orders,
899 total_positions,
900 stats_pnls,
901 stats_returns,
902 stats_general,
903 }
904 }
905
906 fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
907 let mut analyzer = PortfolioAnalyzer::default();
908 let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
909
910 for venue in self.venues.keys() {
912 if let Some(account) = cache.account_for_venue(venue) {
913 let account_ref: &dyn Account = match account {
914 AccountAny::Cash(cash) => cash,
915 AccountAny::Margin(margin) => margin,
916 };
917 for (currency, money) in account_ref.starting_balances() {
918 analyzer
919 .account_balances_starting
920 .entry(currency)
921 .and_modify(|existing| *existing = *existing + money)
922 .or_insert(money);
923 }
924 for (currency, money) in account_ref.balances_total() {
925 analyzer
926 .account_balances
927 .entry(currency)
928 .and_modify(|existing| *existing = *existing + money)
929 .or_insert(money);
930 }
931 }
932 }
933
934 analyzer.add_positions(&positions_owned);
935 analyzer
936 }
937
938 fn route_data_to_exchange(&self, data: &Data) {
939 let venue = data.instrument_id().venue;
940 if let Some(exchange) = self.venues.get(&venue) {
941 let mut ex = exchange.borrow_mut();
942 match data {
943 Data::Delta(delta) => ex.process_order_book_delta(*delta),
944 Data::Deltas(deltas) => ex.process_order_book_deltas((**deltas).clone()),
945 Data::Quote(quote) => ex.process_quote_tick(quote),
946 Data::Trade(trade) => ex.process_trade_tick(trade),
947 Data::Bar(bar) => ex.process_bar(*bar),
948 Data::InstrumentClose(close) => ex.process_instrument_close(*close),
949 Data::Depth10(depth) => ex.process_order_book_depth10(depth),
950 Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) => {
951 }
953 }
954 } else {
955 log::warn!("No exchange found for venue {venue}, data not routed");
956 }
957 }
958
959 fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
960 for clock in clocks {
962 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
963 }
964
965 let ts_before = if ts_now.as_u64() > 0 {
967 UnixNanos::from(ts_now.as_u64() - 1)
968 } else {
969 UnixNanos::default()
970 };
971
972 let mut ts_last: Option<UnixNanos> = None;
973
974 while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
975 let ts_event = handler.event.ts_event;
976
977 if let Some(ts) = ts_last
979 && ts != ts_event
980 {
981 self.settle_venues(ts);
982 self.run_venue_modules(ts);
983 }
984
985 ts_last = Some(ts_event);
986 Self::set_all_clocks_time(clocks, ts_event);
987 logging_clock_set_static_time(ts_event.as_u64());
988
989 handler.run();
990 self.drain_command_queues();
991
992 for clock in clocks {
994 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
995 }
996 }
997
998 if let Some(ts) = ts_last {
1000 self.settle_venues(ts);
1001 self.run_venue_modules(ts);
1002 }
1003
1004 Self::set_all_clocks_time(clocks, ts_now);
1005 logging_clock_set_static_time(ts_now.as_u64());
1006 }
1007
1008 fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
1009 for clock in clocks {
1010 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1011 }
1012
1013 let mut ts_last: Option<UnixNanos> = None;
1014
1015 while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
1016 let ts_event = handler.event.ts_event;
1017
1018 if let Some(ts) = ts_last
1020 && ts != ts_event
1021 {
1022 self.settle_venues(ts);
1023 self.run_venue_modules(ts);
1024 }
1025
1026 ts_last = Some(ts_event);
1027 Self::set_all_clocks_time(clocks, ts_event);
1028 logging_clock_set_static_time(ts_event.as_u64());
1029
1030 handler.run();
1031 self.drain_command_queues();
1032
1033 for clock in clocks {
1035 Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1036 }
1037 }
1038
1039 if let Some(ts) = ts_last {
1041 self.settle_venues(ts);
1042 self.run_venue_modules(ts);
1043 }
1044 }
1045
1046 fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1047 self.flush_accumulator_events(clocks, self.last_ns);
1048
1049 let mut min_next_time: Option<UnixNanos> = None;
1051
1052 for clock in clocks {
1053 let clock_ref = clock.borrow();
1054 for name in clock_ref.timer_names() {
1055 if let Some(next_time) = clock_ref.next_time_ns(name)
1056 && next_time > self.last_ns
1057 {
1058 min_next_time = Some(match min_next_time {
1059 Some(current_min) => next_time.min(current_min),
1060 None => next_time,
1061 });
1062 }
1063 }
1064 }
1065
1066 match min_next_time {
1067 None => true,
1068 Some(t) if t > self.end_ns => true,
1069 Some(t) => {
1070 self.last_ns = t;
1071 self.flush_accumulator_events(clocks, t);
1072 false
1073 }
1074 }
1075 }
1076
1077 fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
1078 let mut clocks = vec![self.kernel.clock.clone()];
1079 clocks.extend(self.kernel.trader.get_component_clocks());
1080 clocks
1081 }
1082
1083 fn settle_venues(&self, ts_now: UnixNanos) {
1084 for exchange in self.venues.values() {
1087 exchange.borrow().set_clock_time(ts_now);
1088 }
1089
1090 loop {
1096 let active_venues: Vec<Venue> = self
1097 .venues
1098 .iter()
1099 .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
1100 .map(|(id, _)| *id)
1101 .collect();
1102
1103 if active_venues.is_empty() {
1104 break;
1105 }
1106
1107 for venue_id in &active_venues {
1108 self.venues[venue_id].borrow_mut().process(ts_now);
1109 }
1110 self.drain_command_queues();
1111
1112 for venue_id in &active_venues {
1113 self.venues[venue_id]
1114 .borrow_mut()
1115 .iterate_matching_engines(ts_now);
1116 }
1117
1118 self.drain_command_queues();
1121 }
1122 }
1123
1124 fn run_venue_modules(&mut self, ts_now: UnixNanos) {
1125 if self.last_module_ns == Some(ts_now) {
1126 return;
1127 }
1128 self.last_module_ns = Some(ts_now);
1129
1130 self.drain_command_queues();
1132 self.settle_venues(ts_now);
1133
1134 for exchange in self.venues.values() {
1135 exchange.borrow().process_modules(ts_now);
1136 }
1137
1138 self.drain_command_queues();
1140 self.settle_venues(ts_now);
1141 }
1142
1143 fn drain_exec_client_events(&self) {
1144 for client in &self.exec_clients {
1145 client.drain_queued_events();
1146 }
1147 }
1148
1149 fn drain_command_queues(&self) {
1150 loop {
1154 drain_trading_cmd_queue();
1155 drain_data_cmd_queue();
1156 self.drain_exec_client_events();
1157
1158 if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1159 break;
1160 }
1161 }
1162 }
1163
    /// Installs the synchronous data and trading command senders.
    fn init_command_senders() {
        init_data_cmd_sender(Arc::new(SyncDataCommandSender));
        init_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
    }
1168
1169 fn advance_clock_on_accumulator(
1170 accumulator: &mut TimeEventAccumulator,
1171 clock: &Rc<RefCell<dyn Clock>>,
1172 to_time_ns: UnixNanos,
1173 set_time: bool,
1174 ) {
1175 let mut clock_ref = clock.borrow_mut();
1176 let test_clock = clock_ref
1177 .as_any_mut()
1178 .downcast_mut::<TestClock>()
1179 .expect("BacktestEngine requires TestClock");
1180 accumulator.advance_clock(test_clock, to_time_ns, set_time);
1181 }
1182
1183 fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
1184 for clock in clocks {
1185 let mut clock_ref = clock.borrow_mut();
1186 let test_clock = clock_ref
1187 .as_any_mut()
1188 .downcast_mut::<TestClock>()
1189 .expect("BacktestEngine requires TestClock");
1190 test_clock.set_time(ts);
1191 }
1192 }
1193
    /// Logs the pre-run banner followed by one line per simulated venue (ID and account type).
    #[rustfmt::skip]
    fn log_pre_run(&self) {
        log_info!("=================================================================", color = LogColor::Cyan);
        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
        log_info!("=================================================================", color = LogColor::Cyan);

        for exchange in self.venues.values() {
            let ex = exchange.borrow();
            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
        }

        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }
1207
    /// Logs the run banner with the run configuration ID, run ID, backtest start and data count.
    ///
    /// Missing optional values are rendered as `"None"`.
    #[rustfmt::skip]
    fn log_run(&self) {
        let config_id = self.run_config_id.as_deref().unwrap_or("None");
        let id = format_optional_uuid(self.run_id.as_ref());
        let start = format_optional_nanos(self.backtest_start);

        log_info!("=================================================================", color = LogColor::Cyan);
        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
        log_info!("=================================================================", color = LogColor::Cyan);
        log::info!("Run config ID: {config_id}");
        log::info!("Run ID: {id}");
        log::info!("Backtest start: {start}");
        log::info!("Data elements: {}", self.data_len);
        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }
1223
1224 #[rustfmt::skip]
1225 fn log_post_run(&self) {
1226 let cache = self.kernel.cache.borrow();
1227 let orders = cache.orders(None, None, None, None, None);
1228 let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
1229 let total_orders = orders.len();
1230 let positions = cache.positions(None, None, None, None, None);
1231 let total_positions = positions.len();
1232
1233 let config_id = self.run_config_id.as_deref().unwrap_or("None");
1234 let id = format_optional_uuid(self.run_id.as_ref());
1235 let started = format_optional_nanos(self.run_started);
1236 let finished = format_optional_nanos(self.run_finished);
1237 let elapsed = format_optional_duration(self.run_started, self.run_finished);
1238 let bt_start = format_optional_nanos(self.backtest_start);
1239 let bt_end = format_optional_nanos(self.backtest_end);
1240 let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1241 let iterations = self.iteration.separate_with_underscores();
1242 let events = total_events.separate_with_underscores();
1243 let num_orders = total_orders.separate_with_underscores();
1244 let num_positions = total_positions.separate_with_underscores();
1245
1246 log_info!("=================================================================", color = LogColor::Cyan);
1247 log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1248 log_info!("=================================================================", color = LogColor::Cyan);
1249 log::info!("Run config ID: {config_id}");
1250 log::info!("Run ID: {id}");
1251 log::info!("Run started: {started}");
1252 log::info!("Run finished: {finished}");
1253 log::info!("Elapsed time: {elapsed}");
1254 log::info!("Backtest start: {bt_start}");
1255 log::info!("Backtest end: {bt_end}");
1256 log::info!("Backtest range: {bt_range}");
1257 log::info!("Iterations: {iterations}");
1258 log::info!("Total events: {events}");
1259 log::info!("Total orders: {num_orders}");
1260 log::info!("Total positions: {num_positions}");
1261
1262 if !self.config.run_analysis {
1263 return;
1264 }
1265
1266 let analyzer = self.build_analyzer(&cache, &positions);
1267 log_portfolio_performance(&analyzer);
1268 }
1269
1270 pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1272 if self
1273 .kernel
1274 .data_engine
1275 .borrow()
1276 .registered_clients()
1277 .contains(&client_id)
1278 {
1279 return;
1280 }
1281
1282 let venue = Venue::from(client_id.as_str());
1283 let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1284 let data_client_adapter = DataClientAdapter::new(
1285 backtest_client.client_id,
1286 None,
1287 false,
1288 false,
1289 Box::new(backtest_client),
1290 );
1291
1292 self.kernel
1293 .data_engine
1294 .borrow_mut()
1295 .register_client(data_client_adapter, None);
1296 }
1297
1298 pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1300 let client_id = ClientId::from(venue.as_str());
1301
1302 if !self
1303 .kernel
1304 .data_engine
1305 .borrow()
1306 .registered_clients()
1307 .contains(&client_id)
1308 {
1309 let backtest_client =
1310 BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1311 let data_client_adapter = DataClientAdapter::new(
1312 client_id,
1313 Some(venue),
1314 false,
1315 false,
1316 Box::new(backtest_client),
1317 );
1318 self.kernel
1319 .data_engine
1320 .borrow_mut()
1321 .register_client(data_client_adapter, Some(venue));
1322 }
1323 }
1324}
1325
1326fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1327 nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1328}
1329
1330fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1331 uuid.map_or("None".to_string(), |id| id.to_string())
1332}
1333
1334fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1335 match (start, end) {
1336 (Some(s), Some(e)) => {
1337 let delta = e.to_datetime_utc() - s.to_datetime_utc();
1338 let days = delta.num_days().abs();
1339 let hours = delta.num_hours().abs() % 24;
1340 let minutes = delta.num_minutes().abs() % 60;
1341 let seconds = delta.num_seconds().abs() % 60;
1342 let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1343 format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1344 }
1345 _ => "None".to_string(),
1346 }
1347}
1348
/// Logs the portfolio performance report produced by `analyzer`.
///
/// Emits one PnL statistics section per analyzer currency, followed by the returns
/// statistics and the general statistics. A currency whose formatted PnL statistics
/// cannot be obtained gets its section header and separators but no statistic lines.
#[rustfmt::skip]
fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
    log_info!("=================================================================", color = LogColor::Cyan);
    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
    log_info!("=================================================================", color = LogColor::Cyan);

    for currency in analyzer.currencies() {
        log::info!(" PnL Statistics ({})", currency.code);
        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);

        // Errors from the analyzer are ignored here: the section is simply left empty
        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
            for line in &pnl_lines {
                log::info!("{line}");
            }
        }

        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    }

    log::info!(" Returns Statistics");
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    for line in &analyzer.get_stats_returns_formatted() {
        log::info!("{line}");
    }
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);

    log::info!(" General Statistics");
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
    for line in &analyzer.get_stats_general_formatted() {
        log::info!("{line}");
    }
    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
}