Skip to main content

nautilus_backtest/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `BacktestEngine` for backtesting on historical data.
17
18use std::{any::Any, cell::RefCell, collections::HashMap, fmt::Debug, rc::Rc, sync::Arc};
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_analysis::analyzer::PortfolioAnalyzer;
22use nautilus_common::{
23    actor::DataActor,
24    cache::Cache,
25    clock::{Clock, TestClock},
26    component::Component,
27    enums::LogColor,
28    log_info,
29    logging::{
30        logging_clock_set_realtime_mode, logging_clock_set_static_mode,
31        logging_clock_set_static_time,
32    },
33    runner::{
34        SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
35        drain_data_cmd_queue, drain_trading_cmd_queue, init_data_cmd_sender, init_exec_cmd_sender,
36        trading_cmd_queue_is_empty,
37    },
38};
39use nautilus_core::{UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, formatting::Separable};
40use nautilus_data::client::DataClientAdapter;
41use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny, latency::LatencyModel};
42use nautilus_model::{
43    accounts::{Account, AccountAny},
44    data::{Data, HasTsInit},
45    enums::{AccountType, BookType, OmsType},
46    identifiers::{AccountId, ClientId, InstrumentId, Venue},
47    instruments::{Instrument, InstrumentAny},
48    orders::Order,
49    position::Position,
50    types::{Currency, Money},
51};
52use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
53use nautilus_trading::strategy::Strategy;
54use rust_decimal::Decimal;
55
56use crate::{
57    accumulator::TimeEventAccumulator, config::BacktestEngineConfig,
58    data_client::BacktestDataClient, data_iterator::BacktestDataIterator,
59    exchange::SimulatedExchange, execution_client::BacktestExecutionClient,
60    modules::SimulationModule,
61};
62
/// Results from a completed backtest run.
///
/// A snapshot assembled by `BacktestEngine::get_result`, combining run
/// identifiers, timing information, order/position counts and the performance
/// statistics reported by the portfolio analyzer.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(
        module = "nautilus_trader.core.nautilus_pyo3.backtest",
        skip_from_py_object
    )
)]
pub struct BacktestResult {
    /// Trader ID from the engine configuration, rendered as a string.
    pub trader_id: String,
    /// Machine ID reported by the kernel.
    pub machine_id: String,
    /// Instance ID of the kernel backing the engine.
    pub instance_id: UUID4,
    /// Run configuration ID supplied to the first `run` call, if any.
    pub run_config_id: Option<String>,
    /// ID generated when the run first started (`None` if never run).
    pub run_id: Option<UUID4>,
    /// Wall-clock time at which the run first started.
    pub run_started: Option<UnixNanos>,
    /// Wall-clock time at which the run was finalized.
    pub run_finished: Option<UnixNanos>,
    /// Simulated start timestamp of the backtest.
    pub backtest_start: Option<UnixNanos>,
    /// Simulated (kernel clock) timestamp at which the backtest was finalized.
    pub backtest_end: Option<UnixNanos>,
    /// `backtest_end - backtest_start` in seconds; `0.0` when either bound is unset.
    pub elapsed_time_secs: f64,
    /// Value of the engine's iteration counter (incremented once per processed data element).
    pub iterations: usize,
    /// Sum of the event counts of all orders in the cache.
    pub total_events: usize,
    /// Number of orders in the cache.
    pub total_orders: usize,
    /// Number of positions in the cache.
    pub total_positions: usize,
    /// PnL statistics from the analyzer, keyed by currency code.
    pub stats_pnls: AHashMap<String, AHashMap<String, f64>>,
    /// Returns statistics from the analyzer.
    pub stats_returns: AHashMap<String, f64>,
    /// General statistics from the analyzer.
    pub stats_general: AHashMap<String, f64>,
}
91
// Python-facing accessors for `BacktestResult`. Plain `//` comments are used on
// purpose so the Python-visible docstrings of the generated properties stay as-is.
#[cfg(feature = "python")]
#[pyo3::pymethods]
impl BacktestResult {
    // Trader ID as a borrowed string.
    #[getter]
    #[pyo3(name = "trader_id")]
    fn py_trader_id(&self) -> &str {
        self.trader_id.as_str()
    }

    // Machine ID as a borrowed string.
    #[getter]
    #[pyo3(name = "machine_id")]
    fn py_machine_id(&self) -> &str {
        self.machine_id.as_str()
    }

    // Kernel instance ID (copied out).
    #[getter]
    #[pyo3(name = "instance_id")]
    const fn py_instance_id(&self) -> UUID4 {
        self.instance_id
    }

    // Run configuration ID, or `None` when the run was started without one.
    #[getter]
    #[pyo3(name = "run_config_id")]
    fn py_run_config_id(&self) -> Option<&str> {
        self.run_config_id.as_deref()
    }

    // Elapsed backtest time in seconds.
    #[getter]
    #[pyo3(name = "elapsed_time_secs")]
    const fn py_elapsed_time_secs(&self) -> f64 {
        self.elapsed_time_secs
    }

    // Number of engine iterations.
    #[getter]
    #[pyo3(name = "iterations")]
    const fn py_iterations(&self) -> usize {
        self.iterations
    }

    // Total number of order events.
    #[getter]
    #[pyo3(name = "total_events")]
    const fn py_total_events(&self) -> usize {
        self.total_events
    }

    // Total number of orders.
    #[getter]
    #[pyo3(name = "total_orders")]
    const fn py_total_orders(&self) -> usize {
        self.total_orders
    }

    // Total number of positions.
    #[getter]
    #[pyo3(name = "total_positions")]
    const fn py_total_positions(&self) -> usize {
        self.total_positions
    }

    // Copies the nested PnL statistics into std `HashMap`s for conversion to Python.
    #[getter]
    #[pyo3(name = "stats_pnls")]
    fn py_stats_pnls(&self) -> HashMap<String, HashMap<String, f64>> {
        let mut by_currency = HashMap::with_capacity(self.stats_pnls.len());
        for (currency, stats) in self.stats_pnls.iter() {
            let mut copied = HashMap::with_capacity(stats.len());
            for (name, value) in stats.iter() {
                copied.insert(name.clone(), *value);
            }
            by_currency.insert(currency.clone(), copied);
        }
        by_currency
    }

    // Copies the returns statistics into a std `HashMap`.
    #[getter]
    #[pyo3(name = "stats_returns")]
    fn py_stats_returns(&self) -> HashMap<String, f64> {
        let mut copied = HashMap::with_capacity(self.stats_returns.len());
        for (name, value) in self.stats_returns.iter() {
            copied.insert(name.clone(), *value);
        }
        copied
    }

    // Copies the general statistics into a std `HashMap`.
    #[getter]
    #[pyo3(name = "stats_general")]
    fn py_stats_general(&self) -> HashMap<String, f64> {
        let mut copied = HashMap::with_capacity(self.stats_general.len());
        for (name, value) in self.stats_general.iter() {
            copied.insert(name.clone(), *value);
        }
        copied
    }

    // Short summary: trader, elapsed seconds (2 decimals) and the main counters.
    fn __repr__(&self) -> String {
        let Self {
            trader_id,
            elapsed_time_secs,
            iterations,
            total_orders,
            total_positions,
            ..
        } = self;
        format!(
            "BacktestResult(trader_id='{}', elapsed={:.2}s, iterations={}, orders={}, positions={})",
            trader_id, elapsed_time_secs, iterations, total_orders, total_positions,
        )
    }
}
192
/// Core backtesting engine for running event-driven strategy backtests on historical data.
///
/// The `BacktestEngine` provides a high-fidelity simulation environment that processes
/// historical market data chronologically through an event-driven architecture. It maintains
/// simulated exchanges with realistic order matching and execution, allowing strategies
/// to be tested exactly as they would run in live trading:
///
/// - Event-driven data replay with configurable latency models.
/// - Multi-venue and multi-asset support.
/// - Realistic order matching and execution simulation.
/// - Strategy and portfolio performance analysis.
/// - Seamless transition from backtesting to live trading.
pub struct BacktestEngine {
    /// Instance ID copied from the kernel at construction.
    instance_id: UUID4,
    /// Engine configuration passed to `new`.
    config: BacktestEngineConfig,
    /// Underlying kernel (cache, clock, engines, trader).
    kernel: NautilusKernel,
    /// Time event accumulator; cleared on `reset`.
    accumulator: TimeEventAccumulator,
    /// Run configuration ID recorded on the first iteration of a run.
    run_config_id: Option<String>,
    /// Run ID generated on the first iteration of a run.
    run_id: Option<UUID4>,
    /// Simulated exchanges keyed by venue, populated by `add_venue`.
    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
    /// Execution clients, one pushed per `add_venue` call.
    exec_clients: Vec<BacktestExecutionClient>,
    /// Instruments seen in data added with validation enabled.
    has_data: AHashSet<InstrumentId>,
    /// Subset of `has_data` for which order book data was seen.
    has_book_data: AHashSet<InstrumentId>,
    /// Iterator over all data streams registered through `add_data`.
    data_iterator: BacktestDataIterator,
    /// Total number of data elements added since the last `clear_data`.
    data_len: usize,
    /// Counter used to give each `add_data` batch a unique stream name.
    data_stream_counter: usize,
    /// Earliest data timestamp tracked so far (default run start).
    ts_first: Option<UnixNanos>,
    /// Latest data timestamp tracked so far (default run end).
    ts_last_data: Option<UnixNanos>,
    /// Number of data elements processed; `0` marks a run that has not started.
    iteration: usize,
    /// When set, the run loop exits at its next iteration.
    force_stop: bool,
    /// Timestamp the run loop has most recently advanced to.
    last_ns: UnixNanos,
    /// Reset to `None` at the start of each run.
    /// NOTE(review): presumably the last timestamp venue modules ran at — it is
    /// written outside the code visible here; confirm.
    last_module_ns: Option<UnixNanos>,
    /// End boundary of the current run; zero when no run is active.
    end_ns: UnixNanos,
    /// Wall-clock time at which the run first started.
    run_started: Option<UnixNanos>,
    /// Wall-clock time at which `end` finalized the run.
    run_finished: Option<UnixNanos>,
    /// Simulated start timestamp of the run.
    backtest_start: Option<UnixNanos>,
    /// Kernel clock timestamp captured when `end` finalized the run.
    backtest_end: Option<UnixNanos>,
}
231
232impl Debug for BacktestEngine {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct(stringify!(BacktestEngine))
235            .field("instance_id", &self.instance_id)
236            .field("run_config_id", &self.run_config_id)
237            .field("run_id", &self.run_id)
238            .finish()
239    }
240}
241
242impl BacktestEngine {
243    /// Create a new [`BacktestEngine`] instance.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the core `NautilusKernel` fails to initialize.
248    pub fn new(config: BacktestEngineConfig) -> anyhow::Result<Self> {
249        let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
250        Ok(Self {
251            instance_id: kernel.instance_id,
252            config,
253            accumulator: TimeEventAccumulator::new(),
254            kernel,
255            run_config_id: None,
256            run_id: None,
257            venues: AHashMap::new(),
258            exec_clients: Vec::new(),
259            has_data: AHashSet::new(),
260            has_book_data: AHashSet::new(),
261            data_iterator: BacktestDataIterator::new(),
262            data_len: 0,
263            data_stream_counter: 0,
264            ts_first: None,
265            ts_last_data: None,
266            iteration: 0,
267            force_stop: false,
268            last_ns: UnixNanos::default(),
269            last_module_ns: None,
270            end_ns: UnixNanos::default(),
271            run_started: None,
272            run_finished: None,
273            backtest_start: None,
274            backtest_end: None,
275        })
276    }
277
    /// Returns a reference to the underlying kernel.
    ///
    /// Gives read-only access to the [`NautilusKernel`] owned by this engine
    /// (for example its cache, clock and engines).
    #[must_use]
    pub const fn kernel(&self) -> &NautilusKernel {
        &self.kernel
    }
283
    /// Returns a mutable reference to the underlying kernel.
    ///
    /// Gives exclusive access to the [`NautilusKernel`] owned by this engine.
    pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
        &mut self.kernel
    }
288
289    /// # Errors
290    ///
291    /// Returns an error if initializing the simulated exchange for the venue fails.
292    #[allow(clippy::too_many_arguments)]
293    pub fn add_venue(
294        &mut self,
295        venue: Venue,
296        oms_type: OmsType,
297        account_type: AccountType,
298        book_type: BookType,
299        starting_balances: Vec<Money>,
300        base_currency: Option<Currency>,
301        default_leverage: Option<Decimal>,
302        leverages: AHashMap<InstrumentId, Decimal>,
303        modules: Vec<Box<dyn SimulationModule>>,
304        fill_model: FillModelAny,
305        fee_model: FeeModelAny,
306        latency_model: Option<Box<dyn LatencyModel>>,
307        routing: Option<bool>,
308        reject_stop_orders: Option<bool>,
309        support_gtd_orders: Option<bool>,
310        support_contingent_orders: Option<bool>,
311        use_position_ids: Option<bool>,
312        use_random_ids: Option<bool>,
313        use_reduce_only: Option<bool>,
314        use_message_queue: Option<bool>,
315        use_market_order_acks: Option<bool>,
316        bar_execution: Option<bool>,
317        bar_adaptive_high_low_ordering: Option<bool>,
318        trade_execution: Option<bool>,
319        liquidity_consumption: Option<bool>,
320        allow_cash_borrowing: Option<bool>,
321        frozen_account: Option<bool>,
322        price_protection_points: Option<u32>,
323    ) -> anyhow::Result<()> {
324        let default_leverage: Decimal = default_leverage.unwrap_or_else(|| {
325            if account_type == AccountType::Margin {
326                Decimal::from(10)
327            } else {
328                Decimal::from(0)
329            }
330        });
331
332        let exchange = SimulatedExchange::new(
333            venue,
334            oms_type,
335            account_type,
336            starting_balances,
337            base_currency,
338            default_leverage,
339            leverages,
340            modules,
341            self.kernel.cache.clone(),
342            self.kernel.clock.clone(),
343            fill_model,
344            fee_model,
345            book_type,
346            latency_model,
347            bar_execution,
348            bar_adaptive_high_low_ordering,
349            trade_execution,
350            liquidity_consumption,
351            reject_stop_orders,
352            support_gtd_orders,
353            support_contingent_orders,
354            use_position_ids,
355            use_random_ids,
356            use_reduce_only,
357            use_message_queue,
358            use_market_order_acks,
359            allow_cash_borrowing,
360            frozen_account,
361            price_protection_points,
362        )?;
363        let exchange = Rc::new(RefCell::new(exchange));
364        self.venues.insert(venue, exchange.clone());
365
366        let account_id = AccountId::from(format!("{venue}-001").as_str());
367
368        let exec_client = BacktestExecutionClient::new(
369            self.config.trader_id(),
370            account_id,
371            exchange.clone(),
372            self.kernel.cache.clone(),
373            self.kernel.clock.clone(),
374            routing,
375            frozen_account,
376        );
377
378        exchange
379            .borrow_mut()
380            .register_client(Rc::new(exec_client.clone()));
381
382        self.exec_clients.push(exec_client.clone());
383
384        self.kernel
385            .exec_engine
386            .borrow_mut()
387            .register_client(Box::new(exec_client))?;
388
389        log::info!("Adding exchange {venue} to engine");
390
391        Ok(())
392    }
393
394    /// Changes the fill model for the specified venue.
395    pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
396        if let Some(exchange) = self.venues.get_mut(&venue) {
397            exchange.borrow_mut().set_fill_model(fill_model);
398        } else {
399            log::warn!(
400                "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
401            );
402        }
403    }
404
405    /// Adds an instrument to the backtest engine for the specified venue.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if:
410    /// - The instrument's associated venue has not been added via `add_venue`.
411    /// - Attempting to add a `CurrencyPair` instrument for a single-currency CASH account.
412    ///
413    pub fn add_instrument(&mut self, instrument: InstrumentAny) -> anyhow::Result<()> {
414        let instrument_id = instrument.id();
415        if let Some(exchange) = self.venues.get_mut(&instrument.id().venue) {
416            if matches!(instrument, InstrumentAny::CurrencyPair(_))
417                && exchange.borrow().account_type != AccountType::Margin
418                && exchange.borrow().base_currency.is_some()
419            {
420                anyhow::bail!(
421                    "Cannot add a `CurrencyPair` instrument {instrument_id} for a venue with a single-currency CASH account"
422                )
423            }
424            exchange.borrow_mut().add_instrument(instrument.clone())?;
425        } else {
426            anyhow::bail!(
427                "Cannot add an `Instrument` object without first adding its associated venue {}",
428                instrument.id().venue
429            )
430        }
431
432        self.add_market_data_client_if_not_exists(instrument.id().venue);
433
434        self.kernel
435            .data_engine
436            .borrow_mut()
437            .process(&instrument as &dyn Any);
438        log::info!(
439            "Added instrument {} to exchange {}",
440            instrument_id,
441            instrument_id.venue
442        );
443        Ok(())
444    }
445
446    /// Adds market data to the engine for replay during the backtest run.
447    pub fn add_data(
448        &mut self,
449        data: Vec<Data>,
450        _client_id: Option<ClientId>,
451        validate: bool,
452        sort: bool,
453    ) {
454        if data.is_empty() {
455            log::warn!("add_data called with empty data slice – ignoring");
456            return;
457        }
458
459        let count = data.len();
460
461        let mut to_add = data;
462
463        if sort {
464            to_add.sort_by_key(HasTsInit::ts_init);
465        }
466
467        if validate {
468            for item in &to_add {
469                let instr_id = item.instrument_id();
470                self.has_data.insert(instr_id);
471
472                if item.is_order_book_data() {
473                    self.has_book_data.insert(instr_id);
474                }
475
476                self.add_market_data_client_if_not_exists(instr_id.venue);
477            }
478        }
479
480        // Track time bounds for start/end defaults
481        if let Some(first) = to_add.first() {
482            let ts = first.ts_init();
483            if self.ts_first.is_none_or(|t| ts < t) {
484                self.ts_first = Some(ts);
485            }
486        }
487
488        if let Some(last) = to_add.last() {
489            let ts = last.ts_init();
490            if self.ts_last_data.is_none_or(|t| ts > t) {
491                self.ts_last_data = Some(ts);
492            }
493        }
494
495        self.data_len += count;
496        let stream_name = format!("backtest_data_{}", self.data_stream_counter);
497        self.data_stream_counter += 1;
498        self.data_iterator.add_data(&stream_name, to_add, true);
499
500        log::info!(
501            "Added {count} data element{} to BacktestEngine ({} total)",
502            if count == 1 { "" } else { "s" },
503            self.data_len,
504        );
505    }
506
507    /// Adds a strategy to the backtest engine.
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if the strategy is already registered or the trader is running.
512    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
513    where
514        T: Strategy + Component + Debug + 'static,
515    {
516        self.kernel.trader.add_strategy(strategy)
517    }
518
519    /// Adds an actor to the backtest engine.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the actor is already registered or the trader is running.
524    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
525    where
526        T: DataActor + Component + Debug + 'static,
527    {
528        self.kernel.trader.add_actor(actor)
529    }
530
531    /// Adds an execution algorithm to the backtest engine.
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if the algorithm is already registered or the trader is running.
536    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
537    where
538        T: DataActor + Component + Debug + 'static,
539    {
540        self.kernel.trader.add_exec_algorithm(exec_algorithm)
541    }
542
543    /// Run a backtest.
544    ///
545    /// Processes all data chronologically. When `streaming` is false (default),
546    /// finalizes the run via [`end`](Self::end). When `streaming` is true, the
547    /// run pauses without finalizing so additional data batches can be loaded.
548    /// Timer advancement stops at data exhaustion to avoid producing synthetic
549    /// events (e.g. zero-volume bars) past the current batch.
550    ///
551    /// Streaming workflow:
552    /// 1. Add initial data and strategies
553    /// 2. Loop: call `run(streaming=true)`, `clear_data()`, `add_data(next_batch)`
554    /// 3. After all batches: call `end()` to finalize
555    ///
556    /// # Errors
557    ///
558    /// Returns an error if the backtest encounters an unrecoverable state.
559    pub fn run(
560        &mut self,
561        start: Option<UnixNanos>,
562        end: Option<UnixNanos>,
563        run_config_id: Option<String>,
564        streaming: bool,
565    ) -> anyhow::Result<()> {
566        self.run_impl(start, end, run_config_id, streaming)?;
567
568        if !streaming {
569            self.end();
570        }
571
572        Ok(())
573    }
574
575    fn run_impl(
576        &mut self,
577        start: Option<UnixNanos>,
578        end: Option<UnixNanos>,
579        run_config_id: Option<String>,
580        streaming: bool,
581    ) -> anyhow::Result<()> {
582        // Determine time boundaries
583        let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
584        let end_ns = end.unwrap_or_else(|| {
585            self.ts_last_data
586                .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
587        });
588        anyhow::ensure!(start_ns <= end_ns, "start was > end");
589        self.end_ns = end_ns;
590        self.last_ns = start_ns;
591        self.last_module_ns = None;
592
593        // Set all component clocks to start
594        let clocks = self.collect_all_clocks();
595        Self::set_all_clocks_time(&clocks, start_ns);
596
597        // First-iteration initialization
598        if self.iteration == 0 {
599            self.run_config_id = run_config_id;
600            self.run_id = Some(UUID4::new());
601            self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
602            self.backtest_start = Some(start_ns);
603
604            // Initialize exchange accounts
605            for exchange in self.venues.values() {
606                exchange.borrow_mut().initialize_account();
607            }
608
609            // Re-set clocks after account init
610            Self::set_all_clocks_time(&clocks, start_ns);
611
612            // Reset force stop flag
613            self.force_stop = false;
614
615            // Initialize sync command senders (once per thread)
616            Self::init_command_senders();
617
618            // Set logging to static clock mode for deterministic timestamps
619            logging_clock_set_static_mode();
620            logging_clock_set_static_time(start_ns.as_u64());
621
622            // Start kernel (engines + trader init + clients)
623            self.kernel.start();
624            self.kernel.start_trader();
625
626            self.log_pre_run();
627        }
628
629        self.log_run();
630
631        // Skip data before start_ns
632        let mut data = self.data_iterator.next();
633        while let Some(ref d) = data {
634            if d.ts_init() >= start_ns {
635                break;
636            }
637            data = self.data_iterator.next();
638        }
639
640        // Initialize last_ns before first data point
641        if let Some(ref d) = data {
642            let ts = d.ts_init();
643            self.last_ns = if ts.as_u64() > 0 {
644                UnixNanos::from(ts.as_u64() - 1)
645            } else {
646                UnixNanos::default()
647            };
648        } else {
649            self.last_ns = start_ns;
650        }
651
652        loop {
653            if self.force_stop {
654                log::error!("Force stop triggered, ending backtest");
655                break;
656            }
657
658            if data.is_none() {
659                if streaming {
660                    // In streaming mode, don't advance timers past the
661                    // current batch. The next batch will provide more data
662                    // and timers will fire naturally as time advances.
663                    break;
664                }
665                let done = self.process_next_timer(&clocks);
666                data = self.data_iterator.next();
667                if data.is_none() && done {
668                    break;
669                }
670                continue;
671            }
672
673            let d = data.as_ref().unwrap();
674            let ts_init = d.ts_init();
675
676            if ts_init > end_ns {
677                break;
678            }
679
680            if ts_init > self.last_ns {
681                self.last_ns = ts_init;
682                self.advance_time_impl(ts_init, &clocks);
683            }
684
685            // Route data to exchange
686            self.route_data_to_exchange(d);
687
688            // Process through data engine (may trigger strategy callbacks
689            // which queue trading commands via the sync senders)
690            self.kernel.data_engine.borrow_mut().process_data(d.clone());
691
692            // Drain deferred commands, then process exchange queues
693            self.drain_command_queues();
694            self.settle_venues(ts_init);
695
696            let prev_last_ns = self.last_ns;
697            data = self.data_iterator.next();
698
699            // If timestamp changed (or exhausted), flush timers then run modules
700            if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
701                self.flush_accumulator_events(&clocks, prev_last_ns);
702                self.run_venue_modules(prev_last_ns);
703            }
704
705            self.iteration += 1;
706        }
707
708        // Process remaining exchange messages
709        let ts_now = self.kernel.clock.borrow().timestamp_ns();
710        self.settle_venues(ts_now);
711        self.run_venue_modules(ts_now);
712
713        // Flush remaining timer events. In streaming mode only flush to the
714        // last data timestamp to avoid advancing timers past the current batch.
715        // The final flush to end_ns happens in end() or a non-streaming run.
716        if streaming {
717            self.flush_accumulator_events(&clocks, self.last_ns);
718        } else {
719            self.flush_accumulator_events(&clocks, end_ns);
720        }
721
722        Ok(())
723    }
724
725    /// Manually end the backtest.
726    pub fn end(&mut self) {
727        // Flush remaining timer events to the backtest end boundary so that
728        // tail alerts/expiries scheduled after the last data point still fire.
729        // Must run before stopping engines since DataEngine::stop() cancels
730        // bar aggregator timers.
731        if self.end_ns.as_u64() > 0 {
732            let clocks = self.collect_all_clocks();
733            self.flush_accumulator_events(&clocks, self.end_ns);
734        }
735
736        // Stop trader
737        self.kernel.stop_trader();
738
739        // Stop engines
740        self.kernel.data_engine.borrow_mut().stop();
741        self.kernel.risk_engine.borrow_mut().stop();
742        self.kernel.exec_engine.borrow_mut().stop();
743
744        // Process remaining exchange messages
745        let ts_now = self.kernel.clock.borrow().timestamp_ns();
746        self.settle_venues(ts_now);
747        self.run_venue_modules(ts_now);
748
749        self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
750        self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
751
752        // Switch logging back to realtime mode
753        logging_clock_set_realtime_mode();
754
755        self.log_post_run();
756    }
757
758    /// Reset the backtest engine.
759    ///
760    /// All stateful fields are reset to their initial value. Data and instruments
761    /// persist across resets to enable repeated runs with different strategies.
762    pub fn reset(&mut self) {
763        log::debug!("Resetting");
764
765        if self.kernel.trader.is_running() {
766            self.end();
767        }
768
769        // Stop and reset engines
770        self.kernel.data_engine.borrow_mut().stop();
771        self.kernel.data_engine.borrow_mut().reset();
772
773        self.kernel.exec_engine.borrow_mut().stop();
774        self.kernel.exec_engine.borrow_mut().reset();
775
776        self.kernel.risk_engine.borrow_mut().stop();
777        self.kernel.risk_engine.borrow_mut().reset();
778
779        // Reset trader
780        if let Err(e) = self.kernel.trader.reset() {
781            log::error!("Error resetting trader: {e:?}");
782        }
783
784        // Reset all exchanges
785        for exchange in self.venues.values() {
786            exchange.borrow_mut().reset();
787        }
788
789        // Clear run state
790        self.run_config_id = None;
791        self.run_id = None;
792        self.run_started = None;
793        self.run_finished = None;
794        self.backtest_start = None;
795        self.backtest_end = None;
796        self.iteration = 0;
797        self.force_stop = false;
798        self.last_ns = UnixNanos::default();
799        self.last_module_ns = None;
800        self.end_ns = UnixNanos::default();
801
802        self.accumulator.clear();
803
804        // Reset all iterator cursors to beginning (data persists)
805        self.data_iterator.reset_all_cursors();
806
807        log::info!("Reset");
808    }
809
    /// Sort the engine's internal data stream by timestamp.
    ///
    /// Useful when data has been added with `sort=false` for batch performance,
    /// then sorted once before running.
    ///
    /// Currently this only logs the request; no data is reordered here.
    pub fn sort_data(&mut self) {
        // Each `add_data` call registers its batch with the data iterator as a
        // separate `backtest_data_<n>` stream. The iterator is expected to keep
        // its data in timestamp order on insert, so nothing is re-sorted here.
        // NOTE(review): confirm against `BacktestDataIterator::add_data`, which
        // is not visible from this method.
        log::info!("Data sort requested (iterator maintains sort order)");
    }
821
822    /// Clear the engine's internal data stream. Does not clear instruments.
823    pub fn clear_data(&mut self) {
824        self.has_data.clear();
825        self.has_book_data.clear();
826        self.data_iterator = BacktestDataIterator::new();
827        self.data_len = 0;
828        self.data_stream_counter = 0;
829        self.ts_first = None;
830        self.ts_last_data = None;
831    }
832
833    /// Clear all trading strategies from the engine's internal trader.
834    ///
835    /// # Errors
836    ///
837    /// Returns an error if any strategy fails to dispose.
838    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
839        self.kernel.trader.clear_strategies()
840    }
841
842    /// Clear all execution algorithms from the engine's internal trader.
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if any execution algorithm fails to dispose.
847    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
848        self.kernel.trader.clear_exec_algorithms()
849    }
850
    /// Dispose of the backtest engine, releasing all resources.
    ///
    /// Clears all loaded data via [`clear_data`](Self::clear_data) and then
    /// disposes the underlying kernel.
    pub fn dispose(&mut self) {
        // Drop the engine-held data first, then let the kernel dispose itself.
        self.clear_data();
        self.kernel.dispose();
    }
856
857    /// Return the backtest result from the last run.
858    #[must_use]
859    pub fn get_result(&self) -> BacktestResult {
860        let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
861            (Some(start), Some(end)) => {
862                (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
863            }
864            _ => 0.0,
865        };
866
867        let cache = self.kernel.cache.borrow();
868        let orders = cache.orders(None, None, None, None, None);
869        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
870        let total_orders = orders.len();
871        let positions = cache.positions(None, None, None, None, None);
872        let total_positions = positions.len();
873
874        let analyzer = self.build_analyzer(&cache, &positions);
875        let mut stats_pnls = AHashMap::new();
876        for currency in analyzer.currencies() {
877            if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
878                stats_pnls.insert(currency.code.to_string(), pnls);
879            }
880        }
881
882        let stats_returns = analyzer.get_performance_stats_returns();
883        let stats_general = analyzer.get_performance_stats_general();
884
885        BacktestResult {
886            trader_id: self.config.trader_id().to_string(),
887            machine_id: self.kernel.machine_id.clone(),
888            instance_id: self.instance_id,
889            run_config_id: self.run_config_id.clone(),
890            run_id: self.run_id,
891            run_started: self.run_started,
892            run_finished: self.run_finished,
893            backtest_start: self.backtest_start,
894            backtest_end: self.backtest_end,
895            elapsed_time_secs,
896            iterations: self.iteration,
897            total_events,
898            total_orders,
899            total_positions,
900            stats_pnls,
901            stats_returns,
902            stats_general,
903        }
904    }
905
906    fn build_analyzer(&self, cache: &Cache, positions: &[&Position]) -> PortfolioAnalyzer {
907        let mut analyzer = PortfolioAnalyzer::default();
908        let positions_owned: Vec<_> = positions.iter().map(|p| (*p).clone()).collect();
909
910        // Aggregate starting and current balances across all venue accounts
911        for venue in self.venues.keys() {
912            if let Some(account) = cache.account_for_venue(venue) {
913                let account_ref: &dyn Account = match account {
914                    AccountAny::Cash(cash) => cash,
915                    AccountAny::Margin(margin) => margin,
916                };
917                for (currency, money) in account_ref.starting_balances() {
918                    analyzer
919                        .account_balances_starting
920                        .entry(currency)
921                        .and_modify(|existing| *existing = *existing + money)
922                        .or_insert(money);
923                }
924                for (currency, money) in account_ref.balances_total() {
925                    analyzer
926                        .account_balances
927                        .entry(currency)
928                        .and_modify(|existing| *existing = *existing + money)
929                        .or_insert(money);
930                }
931            }
932        }
933
934        analyzer.add_positions(&positions_owned);
935        analyzer
936    }
937
938    fn route_data_to_exchange(&self, data: &Data) {
939        let venue = data.instrument_id().venue;
940        if let Some(exchange) = self.venues.get(&venue) {
941            let mut ex = exchange.borrow_mut();
942            match data {
943                Data::Delta(delta) => ex.process_order_book_delta(*delta),
944                Data::Deltas(deltas) => ex.process_order_book_deltas((**deltas).clone()),
945                Data::Quote(quote) => ex.process_quote_tick(quote),
946                Data::Trade(trade) => ex.process_trade_tick(trade),
947                Data::Bar(bar) => ex.process_bar(*bar),
948                Data::InstrumentClose(close) => ex.process_instrument_close(*close),
949                Data::Depth10(depth) => ex.process_order_book_depth10(depth),
950                Data::MarkPriceUpdate(_) | Data::IndexPriceUpdate(_) => {
951                    // Not routed to exchange — processed by data engine only
952                }
953            }
954        } else {
955            log::warn!("No exchange found for venue {venue}, data not routed");
956        }
957    }
958
959    fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
960        // Advance all clocks to ts_now via accumulator
961        for clock in clocks {
962            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
963        }
964
965        // Process events with ts_event < ts_now
966        let ts_before = if ts_now.as_u64() > 0 {
967            UnixNanos::from(ts_now.as_u64() - 1)
968        } else {
969            UnixNanos::default()
970        };
971
972        let mut ts_last: Option<UnixNanos> = None;
973
974        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
975            let ts_event = handler.event.ts_event;
976
977            // Settle previous timestamp batch before advancing
978            if let Some(ts) = ts_last
979                && ts != ts_event
980            {
981                self.settle_venues(ts);
982                self.run_venue_modules(ts);
983            }
984
985            ts_last = Some(ts_event);
986            Self::set_all_clocks_time(clocks, ts_event);
987            logging_clock_set_static_time(ts_event.as_u64());
988
989            handler.run();
990            self.drain_command_queues();
991
992            // Re-advance clocks to capture chained timers
993            for clock in clocks {
994                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
995            }
996        }
997
998        // Settle the last timestamp batch
999        if let Some(ts) = ts_last {
1000            self.settle_venues(ts);
1001            self.run_venue_modules(ts);
1002        }
1003
1004        Self::set_all_clocks_time(clocks, ts_now);
1005        logging_clock_set_static_time(ts_now.as_u64());
1006    }
1007
1008    fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
1009        for clock in clocks {
1010            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1011        }
1012
1013        let mut ts_last: Option<UnixNanos> = None;
1014
1015        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
1016            let ts_event = handler.event.ts_event;
1017
1018            // Settle previous timestamp batch before advancing
1019            if let Some(ts) = ts_last
1020                && ts != ts_event
1021            {
1022                self.settle_venues(ts);
1023                self.run_venue_modules(ts);
1024            }
1025
1026            ts_last = Some(ts_event);
1027            Self::set_all_clocks_time(clocks, ts_event);
1028            logging_clock_set_static_time(ts_event.as_u64());
1029
1030            handler.run();
1031            self.drain_command_queues();
1032
1033            // Re-advance clocks to capture chained timers
1034            for clock in clocks {
1035                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1036            }
1037        }
1038
1039        // Settle the last timestamp batch
1040        if let Some(ts) = ts_last {
1041            self.settle_venues(ts);
1042            self.run_venue_modules(ts);
1043        }
1044    }
1045
1046    fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1047        self.flush_accumulator_events(clocks, self.last_ns);
1048
1049        // Find minimum next timer time across all component clocks
1050        let mut min_next_time: Option<UnixNanos> = None;
1051
1052        for clock in clocks {
1053            let clock_ref = clock.borrow();
1054            for name in clock_ref.timer_names() {
1055                if let Some(next_time) = clock_ref.next_time_ns(name)
1056                    && next_time > self.last_ns
1057                {
1058                    min_next_time = Some(match min_next_time {
1059                        Some(current_min) => next_time.min(current_min),
1060                        None => next_time,
1061                    });
1062                }
1063            }
1064        }
1065
1066        match min_next_time {
1067            None => true,
1068            Some(t) if t > self.end_ns => true,
1069            Some(t) => {
1070                self.last_ns = t;
1071                self.flush_accumulator_events(clocks, t);
1072                false
1073            }
1074        }
1075    }
1076
1077    fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
1078        let mut clocks = vec![self.kernel.clock.clone()];
1079        clocks.extend(self.kernel.trader.get_component_clocks());
1080        clocks
1081    }
1082
1083    fn settle_venues(&self, ts_now: UnixNanos) {
1084        // Advance venue clocks so modules and event generators see the
1085        // correct timestamp even when no commands are pending
1086        for exchange in self.venues.values() {
1087            exchange.borrow().set_clock_time(ts_now);
1088        }
1089
1090        // Drain commands then iterate matching engines to fill newly added
1091        // orders. Fills may enqueue further commands (e.g. hedge orders
1092        // submitted from on_order_filled), so loop until quiescent.
1093        // Only process and iterate venues that had pending commands each
1094        // pass, to avoid extra fill-model rolls on untouched venues.
1095        loop {
1096            let active_venues: Vec<Venue> = self
1097                .venues
1098                .iter()
1099                .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
1100                .map(|(id, _)| *id)
1101                .collect();
1102
1103            if active_venues.is_empty() {
1104                break;
1105            }
1106
1107            for venue_id in &active_venues {
1108                self.venues[venue_id].borrow_mut().process(ts_now);
1109            }
1110            self.drain_command_queues();
1111
1112            for venue_id in &active_venues {
1113                self.venues[venue_id]
1114                    .borrow_mut()
1115                    .iterate_matching_engines(ts_now);
1116            }
1117
1118            // Drain again so fill-triggered commands (e.g. hedge orders
1119            // from on_order_filled) are visible to has_pending_commands
1120            self.drain_command_queues();
1121        }
1122    }
1123
1124    fn run_venue_modules(&mut self, ts_now: UnixNanos) {
1125        if self.last_module_ns == Some(ts_now) {
1126            return;
1127        }
1128        self.last_module_ns = Some(ts_now);
1129
1130        // Pre-settle handler-generated work so modules see final state
1131        self.drain_command_queues();
1132        self.settle_venues(ts_now);
1133
1134        for exchange in self.venues.values() {
1135            exchange.borrow().process_modules(ts_now);
1136        }
1137
1138        // Post-settle any commands emitted by modules
1139        self.drain_command_queues();
1140        self.settle_venues(ts_now);
1141    }
1142
1143    fn drain_exec_client_events(&self) {
1144        for client in &self.exec_clients {
1145            client.drain_queued_events();
1146        }
1147    }
1148
1149    fn drain_command_queues(&self) {
1150        // Drain trading commands, exec client events, and data commands
1151        // in a loop until all queues settle. Handles cascading re-entrancy
1152        // (e.g. strategy submits order from on_order_filled).
1153        loop {
1154            drain_trading_cmd_queue();
1155            drain_data_cmd_queue();
1156            self.drain_exec_client_events();
1157
1158            if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1159                break;
1160            }
1161        }
1162    }
1163
1164    fn init_command_senders() {
1165        init_data_cmd_sender(Arc::new(SyncDataCommandSender));
1166        init_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
1167    }
1168
1169    fn advance_clock_on_accumulator(
1170        accumulator: &mut TimeEventAccumulator,
1171        clock: &Rc<RefCell<dyn Clock>>,
1172        to_time_ns: UnixNanos,
1173        set_time: bool,
1174    ) {
1175        let mut clock_ref = clock.borrow_mut();
1176        let test_clock = clock_ref
1177            .as_any_mut()
1178            .downcast_mut::<TestClock>()
1179            .expect("BacktestEngine requires TestClock");
1180        accumulator.advance_clock(test_clock, to_time_ns, set_time);
1181    }
1182
1183    fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
1184        for clock in clocks {
1185            let mut clock_ref = clock.borrow_mut();
1186            let test_clock = clock_ref
1187                .as_any_mut()
1188                .downcast_mut::<TestClock>()
1189                .expect("BacktestEngine requires TestClock");
1190            test_clock.set_time(ts);
1191        }
1192    }
1193
1194    #[rustfmt::skip]
1195    fn log_pre_run(&self) {
1196        log_info!("=================================================================", color = LogColor::Cyan);
1197        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
1198        log_info!("=================================================================", color = LogColor::Cyan);
1199
1200        for exchange in self.venues.values() {
1201            let ex = exchange.borrow();
1202            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
1203        }
1204
1205        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1206    }
1207
1208    #[rustfmt::skip]
1209    fn log_run(&self) {
1210        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1211        let id = format_optional_uuid(self.run_id.as_ref());
1212        let start = format_optional_nanos(self.backtest_start);
1213
1214        log_info!("=================================================================", color = LogColor::Cyan);
1215        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
1216        log_info!("=================================================================", color = LogColor::Cyan);
1217        log::info!("Run config ID:  {config_id}");
1218        log::info!("Run ID:         {id}");
1219        log::info!("Backtest start: {start}");
1220        log::info!("Data elements:  {}", self.data_len);
1221        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1222    }
1223
1224    #[rustfmt::skip]
1225    fn log_post_run(&self) {
1226        let cache = self.kernel.cache.borrow();
1227        let orders = cache.orders(None, None, None, None, None);
1228        let total_events: usize = orders.iter().map(|o| o.event_count()).sum();
1229        let total_orders = orders.len();
1230        let positions = cache.positions(None, None, None, None, None);
1231        let total_positions = positions.len();
1232
1233        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1234        let id = format_optional_uuid(self.run_id.as_ref());
1235        let started = format_optional_nanos(self.run_started);
1236        let finished = format_optional_nanos(self.run_finished);
1237        let elapsed = format_optional_duration(self.run_started, self.run_finished);
1238        let bt_start = format_optional_nanos(self.backtest_start);
1239        let bt_end = format_optional_nanos(self.backtest_end);
1240        let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1241        let iterations = self.iteration.separate_with_underscores();
1242        let events = total_events.separate_with_underscores();
1243        let num_orders = total_orders.separate_with_underscores();
1244        let num_positions = total_positions.separate_with_underscores();
1245
1246        log_info!("=================================================================", color = LogColor::Cyan);
1247        log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1248        log_info!("=================================================================", color = LogColor::Cyan);
1249        log::info!("Run config ID:  {config_id}");
1250        log::info!("Run ID:         {id}");
1251        log::info!("Run started:    {started}");
1252        log::info!("Run finished:   {finished}");
1253        log::info!("Elapsed time:   {elapsed}");
1254        log::info!("Backtest start: {bt_start}");
1255        log::info!("Backtest end:   {bt_end}");
1256        log::info!("Backtest range: {bt_range}");
1257        log::info!("Iterations: {iterations}");
1258        log::info!("Total events: {events}");
1259        log::info!("Total orders: {num_orders}");
1260        log::info!("Total positions: {num_positions}");
1261
1262        if !self.config.run_analysis {
1263            return;
1264        }
1265
1266        let analyzer = self.build_analyzer(&cache, &positions);
1267        log_portfolio_performance(&analyzer);
1268    }
1269
1270    /// Registers a data client for the given `client_id` if one does not already exist.
1271    pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1272        if self
1273            .kernel
1274            .data_engine
1275            .borrow()
1276            .registered_clients()
1277            .contains(&client_id)
1278        {
1279            return;
1280        }
1281
1282        let venue = Venue::from(client_id.as_str());
1283        let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1284        let data_client_adapter = DataClientAdapter::new(
1285            backtest_client.client_id,
1286            None,
1287            false,
1288            false,
1289            Box::new(backtest_client),
1290        );
1291
1292        self.kernel
1293            .data_engine
1294            .borrow_mut()
1295            .register_client(data_client_adapter, None);
1296    }
1297
1298    /// Registers a market data client for the given `venue` if one does not already exist.
1299    pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1300        let client_id = ClientId::from(venue.as_str());
1301
1302        if !self
1303            .kernel
1304            .data_engine
1305            .borrow()
1306            .registered_clients()
1307            .contains(&client_id)
1308        {
1309            let backtest_client =
1310                BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1311            let data_client_adapter = DataClientAdapter::new(
1312                client_id,
1313                Some(venue),
1314                false,
1315                false,
1316                Box::new(backtest_client),
1317            );
1318            self.kernel
1319                .data_engine
1320                .borrow_mut()
1321                .register_client(data_client_adapter, Some(venue));
1322        }
1323    }
1324}
1325
1326fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1327    nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1328}
1329
1330fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1331    uuid.map_or("None".to_string(), |id| id.to_string())
1332}
1333
1334fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1335    match (start, end) {
1336        (Some(s), Some(e)) => {
1337            let delta = e.to_datetime_utc() - s.to_datetime_utc();
1338            let days = delta.num_days().abs();
1339            let hours = delta.num_hours().abs() % 24;
1340            let minutes = delta.num_minutes().abs() % 60;
1341            let seconds = delta.num_seconds().abs() % 60;
1342            let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1343            format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1344        }
1345        _ => "None".to_string(),
1346    }
1347}
1348
1349#[rustfmt::skip]
1350fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1351    log_info!("=================================================================", color = LogColor::Cyan);
1352    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1353    log_info!("=================================================================", color = LogColor::Cyan);
1354
1355    for currency in analyzer.currencies() {
1356        log::info!(" PnL Statistics ({})", currency.code);
1357        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1358
1359        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1360            for line in &pnl_lines {
1361                log::info!("{line}");
1362            }
1363        }
1364
1365        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1366    }
1367
1368    log::info!(" Returns Statistics");
1369    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1370    for line in &analyzer.get_stats_returns_formatted() {
1371        log::info!("{line}");
1372    }
1373    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1374
1375    log::info!(" General Statistics");
1376    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1377    for line in &analyzer.get_stats_general_formatted() {
1378        log::info!("{line}");
1379    }
1380    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1381}