Skip to main content

nautilus_backtest/
engine.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! The core `BacktestEngine` for backtesting on historical data.
17
18use std::{
19    any::Any,
20    cell::RefCell,
21    fmt::Debug,
22    rc::{Rc, Weak},
23    sync::Arc,
24};
25
26use ahash::{AHashMap, AHashSet};
27use nautilus_analysis::analyzer::PortfolioAnalyzer;
28use nautilus_common::{
29    actor::DataActor,
30    cache::Cache,
31    clock::{Clock, TestClock},
32    component::Component,
33    enums::LogColor,
34    log_info,
35    logging::{
36        logging_clock_set_realtime_mode, logging_clock_set_static_mode,
37        logging_clock_set_static_time,
38    },
39    runner::{
40        SyncDataCommandSender, SyncTradingCommandSender, data_cmd_queue_is_empty,
41        drain_data_cmd_queue, drain_trading_cmd_queue, replace_data_cmd_sender,
42        replace_exec_cmd_sender, trading_cmd_queue_is_empty,
43    },
44    timer::{TimeEvent, TimeEventCallback},
45};
46use nautilus_core::{
47    UUID4, UnixNanos, datetime::unix_nanos_to_iso8601, string::formatting::Separable,
48};
49use nautilus_data::client::DataClientAdapter;
50use nautilus_execution::models::fill::FillModelAny;
51use nautilus_model::{
52    accounts::{Account, AccountAny},
53    data::{Data, HasTsInit},
54    enums::{AccountType, AggregationSource, BookType},
55    identifiers::{AccountId, ClientId, InstrumentId, TraderId, Venue},
56    instruments::{Instrument, InstrumentAny},
57    position::Position,
58    types::Price,
59};
60use nautilus_system::{config::NautilusKernelConfig, kernel::NautilusKernel};
61use nautilus_trading::{ExecutionAlgorithm, strategy::Strategy};
62
63use crate::{
64    accumulator::TimeEventAccumulator,
65    config::{BacktestEngineConfig, SimulatedVenueConfig},
66    data_client::BacktestDataClient,
67    data_iterator::BacktestDataIterator,
68    exchange::SimulatedExchange,
69    execution_client::BacktestExecutionClient,
70    result::BacktestResult,
71};
72
73/// Core backtesting engine for running event-driven strategy backtests on historical data.
74///
75/// The `BacktestEngine` provides a high-fidelity simulation environment that processes
76/// historical market data chronologically through an event-driven architecture. It maintains
77/// simulated exchanges with realistic order matching and execution, allowing strategies
78/// to be tested exactly as they would run in live trading:
79///
80/// - Event-driven data replay with configurable latency models.
81/// - Multi-venue and multi-asset support.
82/// - Realistic order matching and execution simulation.
83/// - Strategy and portfolio performance analysis.
84/// - Transition from backtesting to live trading.
85pub struct BacktestEngine {
86    instance_id: UUID4,
87    config: BacktestEngineConfig,
88    kernel: NautilusKernel,
89    accumulator: TimeEventAccumulator,
90    run_config_id: Option<String>,
91    run_id: Option<UUID4>,
92    venues: AHashMap<Venue, Rc<RefCell<SimulatedExchange>>>,
93    exec_clients: Vec<BacktestExecutionClient>,
94    has_data: AHashSet<InstrumentId>,
95    has_book_data: AHashSet<InstrumentId>,
96    data_iterator: BacktestDataIterator,
97    data_len: usize,
98    data_stream_counter: usize,
99    ts_first: Option<UnixNanos>,
100    ts_last_data: Option<UnixNanos>,
101    sorted: bool,
102    iteration: usize,
103    force_stop: bool,
104    last_ns: UnixNanos,
105    last_module_ns: Option<UnixNanos>,
106    last_liquidation_ns: Option<UnixNanos>,
107    end_ns: UnixNanos,
108    run_started: Option<UnixNanos>,
109    run_finished: Option<UnixNanos>,
110    backtest_start: Option<UnixNanos>,
111    backtest_end: Option<UnixNanos>,
112}
113
114impl Debug for BacktestEngine {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct(stringify!(BacktestEngine))
117            .field("instance_id", &self.instance_id)
118            .field("run_config_id", &self.run_config_id)
119            .field("run_id", &self.run_id)
120            .finish()
121    }
122}
123
124impl BacktestEngine {
125    /// Create a new [`BacktestEngine`] instance.
126    ///
127    /// # Errors
128    ///
129    /// Returns an error if the core `NautilusKernel` fails to initialize.
130    pub fn new(mut config: BacktestEngineConfig) -> anyhow::Result<Self> {
131        // The engine does not replay `add_instrument` on reset, so reruns rely
132        // on the cache retaining instruments regardless of the caller's config.
133        let mut cache_config = config.cache.unwrap_or_default();
134        cache_config.drop_instruments_on_reset = false;
135        config.cache = Some(cache_config);
136        let kernel = NautilusKernel::new("BacktestEngine".to_string(), config.clone())?;
137        Ok(Self {
138            instance_id: kernel.instance_id,
139            config,
140            accumulator: TimeEventAccumulator::new(),
141            kernel,
142            run_config_id: None,
143            run_id: None,
144            venues: AHashMap::new(),
145            exec_clients: Vec::new(),
146            has_data: AHashSet::new(),
147            has_book_data: AHashSet::new(),
148            data_iterator: BacktestDataIterator::new(),
149            data_len: 0,
150            data_stream_counter: 0,
151            ts_first: None,
152            ts_last_data: None,
153            sorted: true,
154            iteration: 0,
155            force_stop: false,
156            last_ns: UnixNanos::default(),
157            last_module_ns: None,
158            last_liquidation_ns: None,
159            end_ns: UnixNanos::default(),
160            run_started: None,
161            run_finished: None,
162            backtest_start: None,
163            backtest_end: None,
164        })
165    }
166
167    /// Returns a reference to the underlying kernel.
168    #[must_use]
169    pub const fn kernel(&self) -> &NautilusKernel {
170        &self.kernel
171    }
172
173    /// Returns a mutable reference to the underlying kernel.
174    pub fn kernel_mut(&mut self) -> &mut NautilusKernel {
175        &mut self.kernel
176    }
177
178    /// Returns the trader ID for this engine.
179    #[must_use]
180    pub fn trader_id(&self) -> TraderId {
181        self.kernel.trader_id()
182    }
183
184    /// Returns the machine ID for this engine.
185    #[must_use]
186    pub fn machine_id(&self) -> &str {
187        self.kernel.machine_id()
188    }
189
190    /// Returns the unique instance ID for this engine.
191    #[must_use]
192    pub fn instance_id(&self) -> UUID4 {
193        self.instance_id
194    }
195
196    /// Returns the current iteration count.
197    #[must_use]
198    pub fn iteration(&self) -> usize {
199        self.iteration
200    }
201
202    /// Returns the last run config ID, if any.
203    #[must_use]
204    pub fn run_config_id(&self) -> Option<&str> {
205        self.run_config_id.as_deref()
206    }
207
208    /// Returns the last run ID, if any.
209    #[must_use]
210    pub const fn run_id(&self) -> Option<UUID4> {
211        self.run_id
212    }
213
214    /// Returns when the last run started, if any.
215    #[must_use]
216    pub const fn run_started(&self) -> Option<UnixNanos> {
217        self.run_started
218    }
219
220    /// Returns when the last run finished, if any.
221    #[must_use]
222    pub const fn run_finished(&self) -> Option<UnixNanos> {
223        self.run_finished
224    }
225
226    /// Returns the last backtest range start, if any.
227    #[must_use]
228    pub const fn backtest_start(&self) -> Option<UnixNanos> {
229        self.backtest_start
230    }
231
232    /// Returns the last backtest range end, if any.
233    #[must_use]
234    pub const fn backtest_end(&self) -> Option<UnixNanos> {
235        self.backtest_end
236    }
237
238    /// Returns the list of registered venue identifiers.
239    #[must_use]
240    pub fn list_venues(&self) -> Vec<Venue> {
241        self.venues.keys().copied().collect()
242    }
243
244    /// # Errors
245    ///
246    /// Returns an error if initializing the simulated exchange for the venue fails.
247    pub fn add_venue(&mut self, config: SimulatedVenueConfig) -> anyhow::Result<()> {
248        // `routing` and `frozen_account` flow to the exec client, so capture
249        // them before the config is consumed by the exchange constructor.
250        let venue = config.venue;
251        let routing = Some(config.routing);
252        let frozen_account = Some(config.frozen_account);
253
254        let exchange =
255            SimulatedExchange::new(config, self.kernel.cache.clone(), self.kernel.clock.clone())?;
256        let exchange = Rc::new(RefCell::new(exchange));
257        SimulatedExchange::register_spread_quote_endpoint(&exchange);
258        self.venues.insert(venue, exchange.clone());
259
260        let account_id = AccountId::from(format!("{venue}-001").as_str());
261
262        let exec_client = BacktestExecutionClient::new(
263            self.config.trader_id(),
264            account_id,
265            &exchange,
266            self.kernel.cache.clone(),
267            self.kernel.clock.clone(),
268            routing,
269            frozen_account,
270        );
271
272        exchange
273            .borrow_mut()
274            .register_client(Rc::new(exec_client.clone()));
275
276        self.exec_clients.push(exec_client.clone());
277
278        self.kernel
279            .exec_engine
280            .borrow_mut()
281            .register_client(Box::new(exec_client))?;
282
283        log::info!("Adding exchange {venue} to engine");
284
285        Ok(())
286    }
287
288    /// Sets the settlement price for the specified venue instrument.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the venue has not been added to the engine.
293    pub fn set_settlement_price(
294        &mut self,
295        venue: Venue,
296        instrument_id: InstrumentId,
297        price: Price,
298    ) -> anyhow::Result<()> {
299        let exchange = self
300            .venues
301            .get_mut(&venue)
302            .ok_or_else(|| anyhow::anyhow!("Unknown venue {venue}"))?;
303        exchange
304            .borrow_mut()
305            .set_settlement_price(instrument_id, price);
306        Ok(())
307    }
308
309    /// Changes the fill model for the specified venue.
310    pub fn change_fill_model(&mut self, venue: Venue, fill_model: FillModelAny) {
311        if let Some(exchange) = self.venues.get_mut(&venue) {
312            exchange.borrow_mut().set_fill_model(fill_model);
313        } else {
314            log::warn!(
315                "BacktestEngine::change_fill_model called for unknown venue {venue}, ignoring"
316            );
317        }
318    }
319
320    /// Adds an instrument to the backtest engine for the specified venue.
321    ///
322    /// # Errors
323    ///
324    /// Returns an error if:
325    /// - The instrument's associated venue has not been added via `add_venue`.
326    /// - Attempting to add a `CurrencyPair` instrument for a single-currency CASH account.
327    ///
328    pub fn add_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
329        let instrument_id = instrument.id();
330        if let Some(exchange) = self.venues.get(&instrument.id().venue) {
331            if matches!(
332                instrument,
333                InstrumentAny::CurrencyPair(_) | InstrumentAny::TokenizedAsset(_)
334            ) && exchange.borrow().account_type != AccountType::Margin
335                && exchange.borrow().base_currency.is_some()
336            {
337                anyhow::bail!(
338                    "Cannot add a multi-currency spot instrument {instrument_id} for a venue with a single-currency CASH account"
339                )
340            }
341            exchange.borrow_mut().add_instrument(instrument.clone())?;
342            if let Some(expiration_ns) = instrument.expiration_ns() {
343                self.set_instrument_expiration_timer(exchange, instrument_id, expiration_ns)?;
344            }
345        } else {
346            anyhow::bail!(
347                "Cannot add an `Instrument` object without first adding its associated venue {}",
348                instrument.id().venue
349            )
350        }
351
352        self.add_market_data_client_if_not_exists(instrument.id().venue);
353
354        self.kernel
355            .data_engine
356            .borrow_mut()
357            .process(instrument as &dyn Any);
358        log::info!(
359            "Added instrument {} to exchange {}",
360            instrument_id,
361            instrument_id.venue
362        );
363        Ok(())
364    }
365
366    /// Adds market data to the engine for replay during the backtest run.
367    ///
368    /// # Errors
369    ///
370    /// Returns an error if:
371    /// - `data` is empty.
372    /// - `validate` is `true` and the instrument for the first element has not been
373    ///   added to the cache via [`add_instrument`](Self::add_instrument).
374    /// - `validate` is `true` and the first element is a [`Data::Bar`] whose
375    ///   `aggregation_source` is not [`AggregationSource::External`].
376    pub fn add_data(
377        &mut self,
378        data: Vec<Data>,
379        _client_id: Option<ClientId>,
380        validate: bool,
381        sort: bool,
382    ) -> anyhow::Result<()> {
383        anyhow::ensure!(!data.is_empty(), "data was empty");
384
385        let count = data.len();
386        let mut to_add = data;
387
388        if sort {
389            to_add.sort_by_key(HasTsInit::ts_init);
390        }
391
392        if validate {
393            // Mirror Cython: validate against the first element only and assume the
394            // batch is homogeneous (documented contract on add_data).
395            let first = &to_add[0];
396            #[cfg(feature = "defi")]
397            let first_is_defi = matches!(first, Data::Defi(_));
398            #[cfg(not(feature = "defi"))]
399            let first_is_defi = false;
400
401            if !first_is_defi {
402                let first_instrument_id = first.instrument_id();
403                anyhow::ensure!(
404                    self.kernel
405                        .cache
406                        .borrow()
407                        .instrument(&first_instrument_id)
408                        .is_some(),
409                    "Instrument {first_instrument_id} for the given data not found in the cache. \
410                     Add the instrument through `add_instrument()` prior to adding related data."
411                );
412
413                if let Data::Bar(bar) = first {
414                    anyhow::ensure!(
415                        bar.bar_type.aggregation_source() == AggregationSource::External,
416                        "bar_type.aggregation_source must be External, was {:?}",
417                        bar.bar_type.aggregation_source(),
418                    );
419                }
420            }
421        }
422
423        // Track has_data / has_book_data unconditionally so the depth-vs-data
424        // run-time check still fires for callers that pass validate=false
425        // (e.g. node.rs run_oneshot loading from a catalog). Time bounds are
426        // also tracked here so start/end defaults are correct even when the
427        // batch was added with sort=false.
428        let mut batch_min_ts: Option<UnixNanos> = None;
429        let mut batch_max_ts: Option<UnixNanos> = None;
430
431        #[cfg(feature = "defi")]
432        if to_add.iter().any(|item| matches!(item, Data::Defi(_))) {
433            self.add_defi_data_client_if_not_exists(_client_id);
434        }
435
436        for item in &to_add {
437            #[cfg(feature = "defi")]
438            if matches!(item, Data::Defi(_)) {
439                let ts = item.ts_init();
440                batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
441                batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
442                continue;
443            }
444
445            let instr_id = item.instrument_id();
446            self.has_data.insert(instr_id);
447
448            if item.is_order_book_data() {
449                self.has_book_data.insert(instr_id);
450            }
451
452            self.add_market_data_client_if_not_exists(instr_id.venue);
453
454            let ts = item.ts_init();
455            batch_min_ts = Some(batch_min_ts.map_or(ts, |cur| cur.min(ts)));
456            batch_max_ts = Some(batch_max_ts.map_or(ts, |cur| cur.max(ts)));
457        }
458
459        if let Some(ts) = batch_min_ts
460            && self.ts_first.is_none_or(|t| ts < t)
461        {
462            self.ts_first = Some(ts);
463        }
464
465        if let Some(ts) = batch_max_ts
466            && self.ts_last_data.is_none_or(|t| ts > t)
467        {
468            self.ts_last_data = Some(ts);
469        }
470
471        self.data_len += count;
472        let stream_name = format!("backtest_data_{}", self.data_stream_counter);
473        self.data_stream_counter += 1;
474        self.data_iterator.add_data(&stream_name, to_add, true);
475
476        self.sorted = sort;
477
478        log::info!(
479            "Added {count} data element{} to BacktestEngine ({} total)",
480            if count == 1 { "" } else { "s" },
481            self.data_len,
482        );
483
484        Ok(())
485    }
486
487    /// Adds a strategy to the backtest engine.
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if the strategy is already registered or the trader is in an invalid
492    /// state for strategy registration.
493    pub fn add_strategy<T>(&mut self, strategy: T) -> anyhow::Result<()>
494    where
495        T: Strategy + Component + Debug + 'static,
496    {
497        self.kernel.trader.borrow_mut().add_strategy(strategy)
498    }
499
500    /// Adds the given strategies to the backtest engine. Stops at the first error.
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if any strategy fails to register; preceding strategies remain registered.
505    pub fn add_strategies<T>(&mut self, strategies: Vec<T>) -> anyhow::Result<()>
506    where
507        T: Strategy + Component + Debug + 'static,
508    {
509        for strategy in strategies {
510            self.add_strategy(strategy)?;
511        }
512        Ok(())
513    }
514
515    /// Adds an actor to the backtest engine.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the actor is already registered or the trader is in an invalid
520    /// state for actor registration.
521    pub fn add_actor<T>(&mut self, actor: T) -> anyhow::Result<()>
522    where
523        T: DataActor + Component + Debug + 'static,
524    {
525        self.kernel.trader.borrow_mut().add_actor(actor)
526    }
527
528    /// Adds the given actors to the backtest engine. Stops at the first error.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if any actor fails to register; preceding actors remain registered.
533    pub fn add_actors<T>(&mut self, actors: Vec<T>) -> anyhow::Result<()>
534    where
535        T: DataActor + Component + Debug + 'static,
536    {
537        for actor in actors {
538            self.add_actor(actor)?;
539        }
540        Ok(())
541    }
542
543    /// Adds an execution algorithm to the backtest engine.
544    ///
545    /// # Errors
546    ///
547    /// Returns an error if the algorithm is already registered or the trader is running.
548    pub fn add_exec_algorithm<T>(&mut self, exec_algorithm: T) -> anyhow::Result<()>
549    where
550        T: ExecutionAlgorithm + Component + Debug + 'static,
551    {
552        self.kernel
553            .trader
554            .borrow_mut()
555            .add_exec_algorithm(exec_algorithm)
556    }
557
558    /// Adds the given execution algorithms to the backtest engine. Stops at the first error.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if any execution algorithm fails to register; preceding algorithms remain
563    /// registered.
564    pub fn add_exec_algorithms<T>(&mut self, exec_algorithms: Vec<T>) -> anyhow::Result<()>
565    where
566        T: ExecutionAlgorithm + Component + Debug + 'static,
567    {
568        for exec_algorithm in exec_algorithms {
569            self.add_exec_algorithm(exec_algorithm)?;
570        }
571        Ok(())
572    }
573
574    /// Run a backtest.
575    ///
576    /// Processes all data chronologically. When `streaming` is false (default),
577    /// finalizes the run via [`end`](Self::end). When `streaming` is true, the
578    /// run pauses without finalizing so additional data batches can be loaded.
579    /// Timer advancement stops at data exhaustion to avoid producing synthetic
580    /// events (e.g. zero-volume bars) past the current batch.
581    ///
582    /// Streaming workflow:
583    /// 1. Add initial data and strategies
584    /// 2. Loop: call `run(streaming=true)`, `clear_data()`, `add_data(next_batch)`
585    /// 3. After all batches: call `end()` to finalize
586    ///
587    /// # Errors
588    ///
589    /// Returns an error if the backtest encounters an unrecoverable state.
590    pub fn run(
591        &mut self,
592        start: Option<UnixNanos>,
593        end: Option<UnixNanos>,
594        run_config_id: Option<String>,
595        streaming: bool,
596    ) -> anyhow::Result<()> {
597        self.run_impl(start, end, run_config_id, streaming)?;
598
599        // Finalize on non-streaming runs, or when a shutdown was triggered
600        // at any point during the run (including the trailing settle, module,
601        // and flush callbacks that execute after the main data loop) so the
602        // trader and engines actually stop.
603        if !streaming || self.force_stop || self.kernel.is_shutdown_requested() {
604            self.end();
605        }
606
607        Ok(())
608    }
609
610    fn run_impl(
611        &mut self,
612        start: Option<UnixNanos>,
613        end: Option<UnixNanos>,
614        run_config_id: Option<String>,
615        streaming: bool,
616    ) -> anyhow::Result<()> {
617        anyhow::ensure!(
618            self.sorted,
619            "Data has been added but not sorted, call `engine.sort_data()` or use \
620             `engine.add_data(..., sort=true)` before running"
621        );
622
623        for exchange in self.venues.values() {
624            let exchange = exchange.borrow();
625            let book_type_has_depth = exchange.book_type() as u8 > BookType::L1_MBP as u8;
626            if !book_type_has_depth {
627                continue;
628            }
629
630            for instrument_id in exchange.instrument_ids() {
631                let has_data = self.has_data.contains(instrument_id);
632                let missing_book_data = !self.has_book_data.contains(instrument_id);
633                if has_data && missing_book_data {
634                    anyhow::bail!(
635                        "No order book data found for instrument '{instrument_id}' when `book_type` \
636                         is '{:?}'. Set the venue `book_type` to 'L1_MBP' (for top-of-book data \
637                         like quotes, trades, and bars) or provide order book data for this \
638                         instrument.",
639                        exchange.book_type()
640                    );
641                }
642            }
643        }
644
645        // Determine time boundaries
646        let start_ns = start.unwrap_or_else(|| self.ts_first.unwrap_or_default());
647        let end_ns = end.unwrap_or_else(|| {
648            self.ts_last_data
649                .unwrap_or(UnixNanos::from(4_102_444_800_000_000_000u64))
650        });
651        anyhow::ensure!(start_ns <= end_ns, "start was > end");
652        self.end_ns = end_ns;
653        self.last_ns = start_ns;
654        self.last_module_ns = None;
655
656        // Set all component clocks to start
657        let clocks = self.collect_all_clocks();
658        Self::set_all_clocks_time(&clocks, start_ns);
659
660        // First-iteration initialization
661        if self.iteration == 0 {
662            self.set_instrument_expiration_timers()?;
663
664            self.run_config_id = run_config_id;
665            self.run_id = Some(UUID4::new());
666            self.run_started = Some(UnixNanos::from(std::time::SystemTime::now()));
667            self.backtest_start = Some(start_ns);
668
669            for exchange in self.venues.values() {
670                let mut ex = exchange.borrow_mut();
671                ex.initialize_account();
672                ex.load_open_orders();
673            }
674
675            // Re-set clocks after account init
676            Self::set_all_clocks_time(&clocks, start_ns);
677
678            // Reset force stop flag
679            self.force_stop = false;
680            self.kernel.reset_shutdown_flag();
681
682            // Initialize sync command senders (once per thread)
683            Self::init_command_senders();
684
685            // Set logging to static clock mode for deterministic timestamps
686            logging_clock_set_static_mode();
687            logging_clock_set_static_time(start_ns.as_u64());
688
689            // Start kernel, then stop before trader startup for event-store replay
690            self.kernel.start();
691            if self.kernel.is_event_store_replay() {
692                self.log_pre_run();
693                return Ok(());
694            }
695
696            if self.kernel.is_event_store_replay_configured() {
697                anyhow::bail!("event-store replay did not start");
698            }
699            self.kernel.start_trader();
700
701            self.log_pre_run();
702        }
703
704        self.log_run();
705
706        // Skip data before start_ns
707        let mut data = self.data_iterator.next_item();
708        while let Some(ref d) = data {
709            if d.ts_init() >= start_ns {
710                break;
711            }
712            data = self.data_iterator.next_item();
713        }
714
715        // Initialize last_ns before first data point
716        if let Some(ref d) = data {
717            let ts = d.ts_init();
718            self.last_ns = if ts.as_u64() > 0 {
719                UnixNanos::from(ts.as_u64() - 1)
720            } else {
721                UnixNanos::default()
722            };
723        } else {
724            self.last_ns = start_ns;
725        }
726
727        loop {
728            if self.kernel.is_shutdown_requested() {
729                log::info!("Shutdown requested via ShutdownSystem, ending backtest");
730                self.force_stop = true;
731            }
732
733            if self.force_stop {
734                log::error!("Force stop triggered, ending backtest");
735                break;
736            }
737
738            if data.is_none() {
739                if streaming {
740                    // In streaming mode, don't advance timers past the
741                    // current batch. The next batch will provide more data
742                    // and timers will fire naturally as time advances.
743                    break;
744                }
745                let done = self.process_next_timer(&clocks);
746                data = self.data_iterator.next_item();
747                if data.is_none() && done {
748                    break;
749                }
750                continue;
751            }
752
753            let d = data.as_ref().unwrap();
754            let ts_init = d.ts_init();
755
756            if ts_init > end_ns {
757                break;
758            }
759
760            if ts_init > self.last_ns {
761                self.last_ns = ts_init;
762                self.advance_time_impl(ts_init, &clocks);
763            }
764
765            // A timer fired during clock advance may have requested shutdown,
766            // skip delivering this data point in that case
767            if self.kernel.is_shutdown_requested() {
768                self.force_stop = true;
769                break;
770            }
771
772            self.route_data_to_exchange(d);
773            self.kernel.data_engine.borrow_mut().process_data(d.clone());
774
775            // Drain deferred commands, then process exchange queues
776            self.drain_command_queues();
777            self.settle_venues(ts_init);
778
779            let prev_last_ns = self.last_ns;
780            data = self.data_iterator.next_item();
781
782            // If timestamp changed (or exhausted), flush timers then run modules
783            if data.is_none() || data.as_ref().unwrap().ts_init() > prev_last_ns {
784                self.flush_accumulator_events(&clocks, prev_last_ns);
785                self.run_venue_modules(prev_last_ns);
786                self.run_venue_liquidations(prev_last_ns);
787            }
788
789            self.iteration += 1;
790        }
791
792        // Process remaining exchange messages
793        let ts_now = self.kernel.clock.borrow().timestamp_ns();
794        self.settle_venues(ts_now);
795        self.run_venue_modules(ts_now);
796        self.run_venue_liquidations(ts_now);
797
798        // Cap at last_ns when streaming or after shutdown to avoid firing
799        // timers past the current batch or the graceful stop
800        let flush_ts = if streaming || self.force_stop || self.kernel.is_shutdown_requested() {
801            self.last_ns
802        } else {
803            end_ns
804        };
805        self.flush_accumulator_events(&clocks, flush_ts);
806
807        Ok(())
808    }
809
810    /// Manually end the backtest.
811    pub fn end(&mut self) {
812        // Flush remaining timer events to the backtest end boundary so that
813        // tail alerts/expiries scheduled after the last data point still fire.
814        // Must run before stopping engines since DataEngine::stop() cancels
815        // bar aggregator timers. When a shutdown was requested, cap the flush
816        // at the last processed timestamp so timers scheduled past the stop
817        // point do not fire extra callbacks after the graceful stop request.
818        if self.end_ns.as_u64() > 0 {
819            let clocks = self.collect_all_clocks();
820            let flush_ts = if self.force_stop || self.kernel.is_shutdown_requested() {
821                self.last_ns
822            } else {
823                self.end_ns
824            };
825
826            self.flush_accumulator_events(&clocks, flush_ts);
827        }
828
829        self.kernel.stop_trader();
830
831        // Settle residual on_stop commands before stopping engines. Venue modules are
832        // not re-run; process_modules is once per timestamp.
833        let mut ts_now = self.kernel.clock.borrow().timestamp_ns();
834
835        // Drain first so latency-deferred commands reach venue inflight queues
836        self.drain_command_queues();
837
838        // Advance the clock to the latest inflight arrival; otherwise commands deferred
839        // by a LatencyModel sit past ts_now and never settle.
840        if let Some(max_inflight_ts) = self.max_inflight_command_ts()
841            && max_inflight_ts > ts_now
842        {
843            ts_now = max_inflight_ts;
844            let clocks = self.collect_all_clocks();
845            Self::set_all_clocks_time(&clocks, ts_now);
846        }
847
848        self.settle_venues(ts_now);
849
850        // Stop engines
851        self.kernel.data_engine.borrow_mut().stop();
852        self.kernel.risk_engine.borrow_mut().stop();
853        self.kernel.exec_engine.borrow_mut().stop();
854
855        self.run_finished = Some(UnixNanos::from(std::time::SystemTime::now()));
856        self.backtest_end = Some(self.kernel.clock.borrow().timestamp_ns());
857
858        // Switch logging back to realtime mode
859        logging_clock_set_realtime_mode();
860
861        self.log_post_run();
862    }
863
864    /// Reset the backtest engine.
865    ///
866    /// All stateful fields are reset to their initial value. Data and instruments
867    /// persist across resets to enable repeated runs with different strategies.
868    pub fn reset(&mut self) {
869        log::debug!("Resetting");
870
871        if self.kernel.trader.borrow().is_running() {
872            self.end();
873        }
874
875        // Stop and reset engines
876        self.kernel.data_engine.borrow_mut().stop();
877        self.kernel.data_engine.borrow_mut().reset();
878
879        self.kernel.exec_engine.borrow_mut().stop();
880
881        // Reset exchanges before the exec engine wipes the cache so
882        // exchange.reset() can see the prior run's account.
883        for exchange in self.venues.values() {
884            exchange.borrow_mut().reset();
885        }
886        self.kernel.exec_engine.borrow_mut().reset();
887
888        self.kernel.risk_engine.borrow_mut().stop();
889        self.kernel.risk_engine.borrow_mut().reset();
890
891        // Reset trader
892        if let Err(e) = self.kernel.trader.borrow_mut().reset() {
893            log::error!("Error resetting trader: {e:?}");
894        }
895
896        self.kernel.portfolio.borrow_mut().reset();
897
898        // Clear run state
899        self.run_config_id = None;
900        self.run_id = None;
901        self.run_started = None;
902        self.run_finished = None;
903        self.backtest_start = None;
904        self.backtest_end = None;
905        self.iteration = 0;
906        self.force_stop = false;
907        self.last_ns = UnixNanos::default();
908        self.last_module_ns = None;
909        self.last_liquidation_ns = None;
910        self.end_ns = UnixNanos::default();
911
912        self.accumulator.clear();
913
914        // Reset all iterator cursors to beginning (data persists)
915        self.data_iterator.reset_all_cursors();
916
917        log::info!("Reset");
918    }
919
920    /// Sort the engine's internal data stream by timestamp.
921    ///
922    /// Useful when data has been added with `sort=false` for batch performance,
923    /// then sorted once before running.
924    pub fn sort_data(&mut self) {
925        // Each add call creates its own stream; the iterator merges streams by
926        // replay timestamp across streams. Mark the engine as sorted so `run`
927        // no longer rejects it.
928        self.sorted = true;
929        log::info!("Data sort requested (iterator merges streams by replay timestamp)");
930    }
931
932    /// Clear the engine's internal data stream. Does not clear instruments.
933    pub fn clear_data(&mut self) {
934        self.has_data.clear();
935        self.has_book_data.clear();
936        self.data_iterator = BacktestDataIterator::new();
937        self.data_len = 0;
938        self.data_stream_counter = 0;
939        self.ts_first = None;
940        self.ts_last_data = None;
941        self.sorted = true;
942    }
943
944    /// Clear all actors from the engine's internal trader.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if any actor fails to dispose.
949    pub fn clear_actors(&mut self) -> anyhow::Result<()> {
950        self.kernel.trader.borrow_mut().clear_actors()
951    }
952
953    /// Clear all trading strategies from the engine's internal trader.
954    ///
955    /// # Errors
956    ///
957    /// Returns an error if any strategy fails to dispose.
958    pub fn clear_strategies(&mut self) -> anyhow::Result<()> {
959        self.kernel.trader.borrow_mut().clear_strategies()
960    }
961
962    /// Clear all execution algorithms from the engine's internal trader.
963    ///
964    /// # Errors
965    ///
966    /// Returns an error if any execution algorithm fails to dispose.
967    pub fn clear_exec_algorithms(&mut self) -> anyhow::Result<()> {
968        self.kernel.trader.borrow_mut().clear_exec_algorithms()
969    }
970
971    /// Dispose of the backtest engine, releasing all resources.
972    pub fn dispose(&mut self) {
973        self.clear_data();
974        self.accumulator.clear();
975        self.kernel.dispose();
976    }
977
978    /// Return the backtest result from the last run.
979    #[must_use]
980    pub fn get_result(&self) -> BacktestResult {
981        let elapsed_time_secs = match (self.backtest_start, self.backtest_end) {
982            (Some(start), Some(end)) => {
983                (end.as_u64() as f64 - start.as_u64() as f64) / 1_000_000_000.0
984            }
985            _ => 0.0,
986        };
987
988        let cache = self.kernel.cache.borrow();
989        let orders = cache.orders(None, None, None, None, None);
990        let total_events = self.kernel.exec_engine.borrow().event_count() as usize;
991        let total_orders = orders.len();
992        let positions: Vec<Position> = cache
993            .positions(None, None, None, None, None)
994            .into_iter()
995            .map(|p| p.cloned())
996            .collect();
997        let cached_positions_count = positions.len();
998        let snapshot_positions = cache.position_snapshots(None, None).len();
999        let total_positions = Self::total_positions_with_snapshots(&cache, cached_positions_count);
1000        let summary = self.build_result_summary(
1001            &cache,
1002            total_events,
1003            total_orders,
1004            cached_positions_count,
1005            snapshot_positions,
1006        );
1007
1008        let analyzer = self.build_analyzer(&cache, &positions);
1009        let mut stats_pnls = AHashMap::new();
1010
1011        for currency in analyzer.currencies() {
1012            if let Ok(pnls) = analyzer.get_performance_stats_pnls(Some(currency), None) {
1013                stats_pnls.insert(currency.code.to_string(), pnls);
1014            }
1015        }
1016
1017        let stats_returns = analyzer.get_performance_stats_returns();
1018        let stats_general = analyzer.get_performance_stats_general();
1019
1020        BacktestResult {
1021            trader_id: self.config.trader_id().to_string(),
1022            machine_id: self.kernel.machine_id.clone(),
1023            instance_id: self.instance_id,
1024            run_config_id: self.run_config_id.clone(),
1025            run_id: self.run_id,
1026            run_started: self.run_started,
1027            run_finished: self.run_finished,
1028            backtest_start: self.backtest_start,
1029            backtest_end: self.backtest_end,
1030            elapsed_time_secs,
1031            iterations: self.iteration,
1032            total_events,
1033            total_orders,
1034            total_positions,
1035            summary,
1036            stats_pnls,
1037            stats_returns,
1038            stats_general,
1039        }
1040    }
1041
1042    fn build_result_summary(
1043        &self,
1044        cache: &Cache,
1045        total_events: usize,
1046        total_orders: usize,
1047        cached_positions_count: usize,
1048        snapshot_positions: usize,
1049    ) -> AHashMap<String, String> {
1050        let mut summary = AHashMap::new();
1051        summary.insert("iterations".to_string(), self.iteration.to_string());
1052        summary.insert("total_events".to_string(), total_events.to_string());
1053        summary.insert("orders.total".to_string(), total_orders.to_string());
1054        summary.insert(
1055            "orders.open".to_string(),
1056            cache
1057                .orders_open_count(None, None, None, None, None)
1058                .to_string(),
1059        );
1060        summary.insert(
1061            "orders.closed".to_string(),
1062            cache
1063                .orders_closed_count(None, None, None, None, None)
1064                .to_string(),
1065        );
1066        summary.insert(
1067            "orders.emulated".to_string(),
1068            cache
1069                .orders_emulated_count(None, None, None, None, None)
1070                .to_string(),
1071        );
1072        summary.insert(
1073            "orders.inflight".to_string(),
1074            cache
1075                .orders_inflight_count(None, None, None, None, None)
1076                .to_string(),
1077        );
1078        summary.insert(
1079            "positions.total".to_string(),
1080            cached_positions_count.to_string(),
1081        );
1082        summary.insert(
1083            "positions.open".to_string(),
1084            cache
1085                .positions_open_count(None, None, None, None, None)
1086                .to_string(),
1087        );
1088        summary.insert(
1089            "positions.closed".to_string(),
1090            cache
1091                .positions_closed_count(None, None, None, None, None)
1092                .to_string(),
1093        );
1094        summary.insert(
1095            "positions.snapshots".to_string(),
1096            snapshot_positions.to_string(),
1097        );
1098        summary.insert(
1099            "positions.total_with_snapshots".to_string(),
1100            (cached_positions_count + snapshot_positions).to_string(),
1101        );
1102
1103        let mut venues: Vec<Venue> = self.venues.keys().copied().collect();
1104        venues.sort_by_key(ToString::to_string);
1105        summary.insert("venues.total".to_string(), venues.len().to_string());
1106
1107        for venue in venues {
1108            let Some(account) = cache.account_for_venue(&venue) else {
1109                continue;
1110            };
1111
1112            let venue_key = venue.to_string();
1113            let account_key = format!("account.{venue_key}");
1114            summary.insert(format!("{account_key}.id"), account.id().to_string());
1115            summary.insert(
1116                format!("{account_key}.type"),
1117                account.account_type().to_string(),
1118            );
1119            summary.insert(
1120                format!("{account_key}.base_currency"),
1121                account
1122                    .base_currency()
1123                    .map_or_else(|| "None".to_string(), |currency| currency.code.to_string()),
1124            );
1125            summary.insert(
1126                format!("{account_key}.event_count"),
1127                account.event_count().to_string(),
1128            );
1129
1130            let mut balances: Vec<_> = account.balances().into_iter().collect();
1131            balances.sort_by_key(|(currency, _)| currency.code.to_string());
1132
1133            for (currency, balance) in balances {
1134                let balance_key = format!("{account_key}.balance.{}", currency.code);
1135                summary.insert(format!("{balance_key}.total"), balance.total.to_string());
1136                summary.insert(format!("{balance_key}.free"), balance.free.to_string());
1137                summary.insert(format!("{balance_key}.locked"), balance.locked.to_string());
1138            }
1139        }
1140
1141        summary
1142    }
1143
1144    fn build_analyzer(&self, cache: &Cache, positions: &[Position]) -> PortfolioAnalyzer {
1145        let mut analyzer = PortfolioAnalyzer::default();
1146        let mut snapshot_positions = Vec::new();
1147
1148        for position in positions {
1149            snapshot_positions.extend(cache.position_snapshots(Some(&position.id), None));
1150        }
1151
1152        // Aggregate starting and current balances across all venue accounts
1153        for venue in self.venues.keys() {
1154            if let Some(account) = cache.account_for_venue(venue) {
1155                let account_ref: &dyn Account = match &*account {
1156                    AccountAny::Margin(margin) => margin,
1157                    AccountAny::Cash(cash) => cash,
1158                    AccountAny::Betting(betting) => betting,
1159                };
1160
1161                for (currency, money) in account_ref.starting_balances() {
1162                    analyzer
1163                        .account_balances_starting
1164                        .entry(currency)
1165                        .and_modify(|existing| *existing = *existing + money)
1166                        .or_insert(money);
1167                }
1168
1169                for (currency, money) in account_ref.balances_total() {
1170                    analyzer
1171                        .account_balances
1172                        .entry(currency)
1173                        .and_modify(|existing| *existing = *existing + money)
1174                        .or_insert(money);
1175                }
1176            }
1177        }
1178
1179        analyzer.add_positions(positions);
1180        analyzer.add_positions(&snapshot_positions);
1181        analyzer
1182    }
1183
1184    fn route_data_to_exchange(&self, data: &Data) {
1185        if matches!(
1186            data,
1187            Data::MarkPriceUpdate(_)
1188                | Data::IndexPriceUpdate(_)
1189                | Data::OptionGreeks(_)
1190                | Data::Custom(_)
1191        ) {
1192            return;
1193        }
1194        #[cfg(feature = "defi")]
1195        if matches!(data, Data::Defi(_)) {
1196            return;
1197        }
1198
1199        let venue = data.instrument_id().venue;
1200        if let Some(exchange) = self.venues.get(&venue) {
1201            let mut exchange_ref = exchange.borrow_mut();
1202
1203            match data {
1204                Data::Delta(delta) => exchange_ref.process_order_book_delta(*delta),
1205                Data::Deltas(deltas) => exchange_ref.process_order_book_deltas(deltas),
1206                Data::Depth10(depth) => exchange_ref.process_order_book_depth10(depth),
1207                Data::Quote(quote) => exchange_ref.process_quote_tick(quote),
1208                Data::Trade(trade) => exchange_ref.process_trade_tick(trade),
1209                Data::Bar(bar) => exchange_ref.process_bar(*bar),
1210                Data::InstrumentStatus(status) => exchange_ref.process_instrument_status(*status),
1211                Data::InstrumentClose(close) => exchange_ref.process_instrument_close(*close),
1212                Data::FundingRateUpdate(funding) => {
1213                    let settlement_ns = exchange_ref.process_funding_rate(*funding);
1214                    drop(exchange_ref);
1215                    self.schedule_funding_settlement_if_required(
1216                        exchange,
1217                        funding.instrument_id,
1218                        settlement_ns,
1219                    );
1220                }
1221                _ => {}
1222            }
1223        } else {
1224            log::warn!("No exchange found for venue {venue}, data not routed");
1225        }
1226    }
1227
1228    fn advance_time_impl(&mut self, ts_now: UnixNanos, clocks: &[Rc<RefCell<dyn Clock>>]) {
1229        for clock in clocks {
1230            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1231        }
1232
1233        // Process events with ts_event < ts_now
1234        let ts_before = if ts_now.as_u64() > 0 {
1235            UnixNanos::from(ts_now.as_u64() - 1)
1236        } else {
1237            UnixNanos::default()
1238        };
1239
1240        let mut ts_last: Option<UnixNanos> = None;
1241        let mut shutdown_at: Option<UnixNanos> = None;
1242
1243        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_before) {
1244            let ts_event = handler.event.ts_event;
1245
1246            // Settle previous timestamp batch before advancing
1247            if let Some(ts) = ts_last
1248                && ts != ts_event
1249            {
1250                self.settle_venues(ts);
1251                self.run_venue_modules(ts);
1252                self.run_venue_liquidations(ts);
1253            }
1254
1255            ts_last = Some(ts_event);
1256            Self::set_all_clocks_time(clocks, ts_event);
1257            logging_clock_set_static_time(ts_event.as_u64());
1258
1259            handler.run();
1260            self.drain_command_queues();
1261
1262            // Drop queued events on a handler-triggered shutdown so no later
1263            // timer fires after the graceful stop
1264            if self.kernel.is_shutdown_requested() {
1265                self.accumulator.clear();
1266                shutdown_at = Some(ts_event);
1267                break;
1268            }
1269
1270            // Re-advance clocks to capture chained timers
1271            for clock in clocks {
1272                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1273            }
1274        }
1275
1276        // Settle the last timestamp batch
1277        if let Some(ts) = ts_last {
1278            self.settle_venues(ts);
1279            self.run_venue_modules(ts);
1280            self.run_venue_liquidations(ts);
1281        }
1282
1283        // On a mid-drain shutdown, anchor state at the firing timer's ts so
1284        // post-run settlement and backtest_end reflect the graceful stop
1285        if let Some(ts_event) = shutdown_at {
1286            self.last_ns = ts_event;
1287        } else {
1288            Self::set_all_clocks_time(clocks, ts_now);
1289            logging_clock_set_static_time(ts_now.as_u64());
1290        }
1291    }
1292
1293    fn flush_accumulator_events(&mut self, clocks: &[Rc<RefCell<dyn Clock>>], ts_now: UnixNanos) {
1294        // Bail after shutdown so handler-scheduled alerts do not fire post-stop
1295        if self.kernel.is_shutdown_requested() {
1296            self.accumulator.clear();
1297            return;
1298        }
1299
1300        for clock in clocks {
1301            Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1302        }
1303
1304        let mut ts_last: Option<UnixNanos> = None;
1305
1306        while let Some(handler) = self.accumulator.pop_next_at_or_before(ts_now) {
1307            let ts_event = handler.event.ts_event;
1308
1309            // Settle previous timestamp batch before advancing
1310            if let Some(ts) = ts_last
1311                && ts != ts_event
1312            {
1313                self.settle_venues(ts);
1314                self.run_venue_modules(ts);
1315                self.run_venue_liquidations(ts);
1316            }
1317
1318            ts_last = Some(ts_event);
1319            Self::set_all_clocks_time(clocks, ts_event);
1320            logging_clock_set_static_time(ts_event.as_u64());
1321
1322            handler.run();
1323            self.drain_command_queues();
1324
1325            // Drop queued events on a handler-triggered shutdown so no later
1326            // timer fires after the graceful stop
1327            if self.kernel.is_shutdown_requested() {
1328                self.accumulator.clear();
1329                break;
1330            }
1331
1332            // Re-advance clocks to capture chained timers
1333            for clock in clocks {
1334                Self::advance_clock_on_accumulator(&mut self.accumulator, clock, ts_now, false);
1335            }
1336        }
1337
1338        // Settle the last timestamp batch
1339        if let Some(ts) = ts_last {
1340            self.settle_venues(ts);
1341            self.run_venue_modules(ts);
1342            self.run_venue_liquidations(ts);
1343        }
1344    }
1345
1346    fn process_next_timer(&mut self, clocks: &[Rc<RefCell<dyn Clock>>]) -> bool {
1347        self.flush_accumulator_events(clocks, self.last_ns);
1348
1349        // Find minimum next timer time across all component clocks
1350        let mut min_next_time: Option<UnixNanos> = None;
1351
1352        for clock in clocks {
1353            let clock_ref = clock.borrow();
1354            for name in clock_ref.timer_names() {
1355                if let Some(next_time) = clock_ref.next_time_ns(name)
1356                    && next_time > self.last_ns
1357                {
1358                    min_next_time = Some(match min_next_time {
1359                        Some(current_min) => next_time.min(current_min),
1360                        None => next_time,
1361                    });
1362                }
1363            }
1364        }
1365
1366        match min_next_time {
1367            None => true,
1368            Some(t) if t > self.end_ns => true,
1369            Some(t) => {
1370                self.last_ns = t;
1371                self.flush_accumulator_events(clocks, t);
1372                false
1373            }
1374        }
1375    }
1376
1377    fn set_instrument_expiration_timers(&self) -> anyhow::Result<()> {
1378        for exchange in self.venues.values() {
1379            let expirations = exchange.borrow().instrument_expirations();
1380            for (instrument_id, expiration_ns) in expirations {
1381                self.set_instrument_expiration_timer(exchange, instrument_id, expiration_ns)?;
1382            }
1383        }
1384
1385        Ok(())
1386    }
1387
1388    fn set_instrument_expiration_timer(
1389        &self,
1390        exchange: &Rc<RefCell<SimulatedExchange>>,
1391        instrument_id: InstrumentId,
1392        expiration_ns: UnixNanos,
1393    ) -> anyhow::Result<()> {
1394        if expiration_ns == UnixNanos::default() {
1395            return Ok(());
1396        }
1397
1398        let timer_name = Self::instrument_expiration_timer_name(instrument_id);
1399        let exchange: Weak<RefCell<SimulatedExchange>> = Rc::downgrade(exchange);
1400        let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event: TimeEvent| {
1401            if let Some(exchange) = exchange.upgrade() {
1402                exchange
1403                    .borrow_mut()
1404                    .process_instrument_expirations(event.ts_event);
1405            }
1406        });
1407        let timer_key = ustr::Ustr::from(timer_name.as_str());
1408        let mut clock = self.kernel.clock.borrow_mut();
1409        if clock.timer_exists(&timer_key) {
1410            clock.cancel_timer(&timer_name);
1411        }
1412
1413        clock.set_time_alert_ns(
1414            &timer_name,
1415            expiration_ns,
1416            Some(TimeEventCallback::from(callback)),
1417            None,
1418        )?;
1419
1420        Ok(())
1421    }
1422
1423    fn instrument_expiration_timer_name(instrument_id: InstrumentId) -> String {
1424        format!("INSTRUMENT-EXPIRATION:{instrument_id}")
1425    }
1426
1427    fn schedule_funding_settlement_if_required(
1428        &self,
1429        exchange: &Rc<RefCell<SimulatedExchange>>,
1430        instrument_id: InstrumentId,
1431        settlement_ns: Option<UnixNanos>,
1432    ) {
1433        let Some(settlement_ns) = settlement_ns else {
1434            return;
1435        };
1436
1437        if let Err(e) = self.set_funding_settlement_timer(exchange, instrument_id, settlement_ns) {
1438            log::error!("Cannot schedule funding settlement for {instrument_id}: {e}");
1439        }
1440    }
1441
1442    fn set_funding_settlement_timer(
1443        &self,
1444        exchange: &Rc<RefCell<SimulatedExchange>>,
1445        instrument_id: InstrumentId,
1446        settlement_ns: UnixNanos,
1447    ) -> anyhow::Result<()> {
1448        let timer_name = Self::funding_settlement_timer_name(instrument_id);
1449        let exchange: Weak<RefCell<SimulatedExchange>> = Rc::downgrade(exchange);
1450        let callback: Rc<dyn Fn(TimeEvent)> = Rc::new(move |event: TimeEvent| {
1451            if let Some(exchange) = exchange.upgrade() {
1452                exchange
1453                    .borrow_mut()
1454                    .process_funding_settlement(instrument_id, event.ts_event);
1455            }
1456        });
1457        let timer_key = ustr::Ustr::from(timer_name.as_str());
1458        let mut clock = self.kernel.clock.borrow_mut();
1459        if clock.timer_exists(&timer_key) {
1460            clock.cancel_timer(&timer_name);
1461        }
1462
1463        clock.set_time_alert_ns(
1464            &timer_name,
1465            settlement_ns,
1466            Some(TimeEventCallback::from(callback)),
1467            None,
1468        )?;
1469
1470        Ok(())
1471    }
1472
1473    fn funding_settlement_timer_name(instrument_id: InstrumentId) -> String {
1474        format!("FUNDING-SETTLEMENT:{instrument_id}")
1475    }
1476
1477    fn collect_all_clocks(&self) -> Vec<Rc<RefCell<dyn Clock>>> {
1478        let mut clocks = vec![self.kernel.clock.clone()];
1479        clocks.extend(self.kernel.trader.borrow().get_component_clocks());
1480        clocks
1481    }
1482
1483    fn max_inflight_command_ts(&self) -> Option<UnixNanos> {
1484        self.venues
1485            .values()
1486            .filter_map(|v| v.borrow().max_inflight_command_ts())
1487            .max()
1488    }
1489
1490    fn settle_venues(&self, ts_now: UnixNanos) {
1491        // Advance venue clocks so modules and event generators see the
1492        // correct timestamp even when no commands are pending
1493        for exchange in self.venues.values() {
1494            exchange.borrow().set_clock_time(ts_now);
1495        }
1496
1497        // Drain commands then iterate matching engines to fill newly added
1498        // orders. Fills may enqueue further commands (e.g. hedge orders
1499        // submitted from on_order_filled), so loop until quiescent.
1500        // Only process and iterate venues that had pending commands each
1501        // pass, to avoid extra fill-model rolls on untouched venues.
1502        loop {
1503            // Drain first so commands buffered in the trading queue (e.g. from
1504            // on_stop handlers) reach the venues before we check for activity.
1505            self.drain_command_queues();
1506
1507            let active_venues: Vec<Venue> = self
1508                .venues
1509                .iter()
1510                .filter(|(_, ex)| ex.borrow().has_pending_commands(ts_now))
1511                .map(|(id, _)| *id)
1512                .collect();
1513
1514            if active_venues.is_empty() {
1515                break;
1516            }
1517
1518            for venue_id in &active_venues {
1519                self.venues[venue_id].borrow_mut().process(ts_now);
1520            }
1521            self.drain_command_queues();
1522
1523            for venue_id in &active_venues {
1524                self.venues[venue_id]
1525                    .borrow_mut()
1526                    .iterate_matching_engines(ts_now);
1527            }
1528
1529            // Drain again so fill-triggered commands (e.g. hedge orders
1530            // from on_order_filled) are visible to has_pending_commands
1531            self.drain_command_queues();
1532        }
1533    }
1534
1535    fn run_venue_modules(&mut self, ts_now: UnixNanos) {
1536        if self.last_module_ns == Some(ts_now) {
1537            return;
1538        }
1539        self.last_module_ns = Some(ts_now);
1540
1541        // Pre-settle handler-generated work so modules see final state
1542        self.drain_command_queues();
1543        self.settle_venues(ts_now);
1544
1545        for exchange in self.venues.values() {
1546            exchange.borrow_mut().process_modules(ts_now);
1547        }
1548
1549        // Post-settle any commands emitted by modules
1550        self.drain_command_queues();
1551        self.settle_venues(ts_now);
1552    }
1553
1554    fn run_venue_liquidations(&mut self, ts_now: UnixNanos) {
1555        if self.last_liquidation_ns == Some(ts_now) {
1556            return;
1557        }
1558        self.last_liquidation_ns = Some(ts_now);
1559
1560        for exchange in self.venues.values() {
1561            exchange.borrow_mut().process_liquidations(ts_now);
1562        }
1563
1564        self.drain_command_queues();
1565        self.settle_venues(ts_now);
1566    }
1567
1568    fn drain_exec_client_events(&self) {
1569        for client in &self.exec_clients {
1570            client.drain_queued_events();
1571        }
1572    }
1573
1574    fn drain_command_queues(&self) {
1575        // Drain trading commands, exec client events, and data commands
1576        // in a loop until all queues settle. Handles cascading re-entrancy
1577        // (e.g. strategy submits order from on_order_filled).
1578        loop {
1579            drain_trading_cmd_queue();
1580            drain_data_cmd_queue();
1581            self.drain_exec_client_events();
1582
1583            if trading_cmd_queue_is_empty() && data_cmd_queue_is_empty() {
1584                break;
1585            }
1586        }
1587    }
1588
1589    fn init_command_senders() {
1590        replace_data_cmd_sender(Arc::new(SyncDataCommandSender));
1591        replace_exec_cmd_sender(Arc::new(SyncTradingCommandSender));
1592    }
1593
1594    fn advance_clock_on_accumulator(
1595        accumulator: &mut TimeEventAccumulator,
1596        clock: &Rc<RefCell<dyn Clock>>,
1597        to_time_ns: UnixNanos,
1598        set_time: bool,
1599    ) {
1600        let mut clock_ref = clock.borrow_mut();
1601        let test_clock = clock_ref
1602            .as_any_mut()
1603            .downcast_mut::<TestClock>()
1604            .expect("BacktestEngine requires TestClock");
1605        accumulator.advance_clock(test_clock, to_time_ns, set_time);
1606    }
1607
1608    fn set_all_clocks_time(clocks: &[Rc<RefCell<dyn Clock>>], ts: UnixNanos) {
1609        for clock in clocks {
1610            let mut clock_ref = clock.borrow_mut();
1611            let test_clock = clock_ref
1612                .as_any_mut()
1613                .downcast_mut::<TestClock>()
1614                .expect("BacktestEngine requires TestClock");
1615            test_clock.set_time(ts);
1616        }
1617    }
1618
1619    #[rustfmt::skip]
1620    fn log_pre_run(&self) {
1621        log_info!("=================================================================", color = LogColor::Cyan);
1622        log_info!(" BACKTEST PRE-RUN", color = LogColor::Cyan);
1623        log_info!("=================================================================", color = LogColor::Cyan);
1624
1625        let cache = self.kernel.cache.borrow();
1626        for exchange in self.venues.values() {
1627            let ex = exchange.borrow();
1628            log_info!("=================================================================", color = LogColor::Cyan);
1629            log::info!(" SimulatedVenue {} ({})", ex.id, ex.account_type);
1630            log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1631
1632            if let Some(account) = cache.account_for_venue(&ex.id) {
1633                log::info!("Balances starting:");
1634                let account_ref: &dyn Account = match &*account {
1635                    AccountAny::Margin(margin) => margin,
1636                    AccountAny::Cash(cash) => cash,
1637                    AccountAny::Betting(betting) => betting,
1638                };
1639
1640                for balance in account_ref.starting_balances().values() {
1641                    log::info!("  {balance}");
1642                }
1643            }
1644        }
1645
1646        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1647    }
1648
1649    #[rustfmt::skip]
1650    fn log_run(&self) {
1651        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1652        let id = format_optional_uuid(self.run_id.as_ref());
1653        let start = format_optional_nanos(self.backtest_start);
1654
1655        log_info!("=================================================================", color = LogColor::Cyan);
1656        log_info!(" BACKTEST RUN", color = LogColor::Cyan);
1657        log_info!("=================================================================", color = LogColor::Cyan);
1658        log::info!("Run config ID:  {config_id}");
1659        log::info!("Run ID:         {id}");
1660        log::info!("Backtest start: {start}");
1661        log::info!("Data elements:  {}", self.data_len);
1662        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1663    }
1664
1665    #[rustfmt::skip]
1666    fn log_post_run(&self) {
1667        let cache = self.kernel.cache.borrow();
1668        let orders = cache.orders(None, None, None, None, None);
1669        let total_events = self.kernel.exec_engine.borrow().event_count() as usize;
1670        let total_orders = orders.len();
1671        let positions: Vec<Position> = cache
1672            .positions(None, None, None, None, None)
1673            .into_iter()
1674            .map(|p| p.cloned())
1675            .collect();
1676        let total_positions = Self::total_positions_with_snapshots(&cache, positions.len());
1677
1678        let config_id = self.run_config_id.as_deref().unwrap_or("None");
1679        let id = format_optional_uuid(self.run_id.as_ref());
1680        let started = format_optional_nanos(self.run_started);
1681        let finished = format_optional_nanos(self.run_finished);
1682        let elapsed = format_optional_duration(self.run_started, self.run_finished);
1683        let bt_start = format_optional_nanos(self.backtest_start);
1684        let bt_end = format_optional_nanos(self.backtest_end);
1685        let bt_range = format_optional_duration(self.backtest_start, self.backtest_end);
1686        let iterations = self.iteration.separate_with_underscores();
1687        let events = total_events.separate_with_underscores();
1688        let num_orders = total_orders.separate_with_underscores();
1689        let num_positions = total_positions.separate_with_underscores();
1690
1691        log_info!("=================================================================", color = LogColor::Cyan);
1692        log_info!(" BACKTEST POST-RUN", color = LogColor::Cyan);
1693        log_info!("=================================================================", color = LogColor::Cyan);
1694        log::info!("Run config ID:  {config_id}");
1695        log::info!("Run ID:         {id}");
1696        log::info!("Run started:    {started}");
1697        log::info!("Run finished:   {finished}");
1698        log::info!("Elapsed time:   {elapsed}");
1699        log::info!("Backtest start: {bt_start}");
1700        log::info!("Backtest end:   {bt_end}");
1701        log::info!("Backtest range: {bt_range}");
1702        log::info!("Iterations: {iterations}");
1703        log::info!("Total events: {events}");
1704        log::info!("Total orders: {num_orders}");
1705        log::info!("Total positions: {num_positions}");
1706
1707        if !self.config.run_analysis {
1708            return;
1709        }
1710
1711        let analyzer = self.build_analyzer(&cache, &positions);
1712        log_portfolio_performance(&analyzer);
1713    }
1714
1715    fn total_positions_with_snapshots(cache: &Cache, cached_positions_count: usize) -> usize {
1716        cached_positions_count + cache.position_snapshots(None, None).len()
1717    }
1718
1719    /// Registers a data client for the given `client_id` if one does not already exist.
1720    pub fn add_data_client_if_not_exists(&mut self, client_id: ClientId) {
1721        if self
1722            .kernel
1723            .data_engine
1724            .borrow()
1725            .registered_clients()
1726            .contains(&client_id)
1727        {
1728            return;
1729        }
1730
1731        let venue = Venue::from(client_id.as_str());
1732        let backtest_client = BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1733        let data_client_adapter = DataClientAdapter::new(
1734            backtest_client.client_id,
1735            None,
1736            false,
1737            false,
1738            Box::new(backtest_client),
1739        );
1740
1741        self.kernel
1742            .data_engine
1743            .borrow_mut()
1744            .register_client(data_client_adapter, None);
1745    }
1746
1747    /// Registers a market data client for the given `venue` if one does not already exist.
1748    pub fn add_market_data_client_if_not_exists(&mut self, venue: Venue) {
1749        let client_id = ClientId::from(venue.as_str());
1750
1751        if !self
1752            .kernel
1753            .data_engine
1754            .borrow()
1755            .registered_clients()
1756            .contains(&client_id)
1757        {
1758            let backtest_client =
1759                BacktestDataClient::new(client_id, venue, self.kernel.cache.clone());
1760            let data_client_adapter = DataClientAdapter::new(
1761                client_id,
1762                Some(venue),
1763                false,
1764                false,
1765                Box::new(backtest_client),
1766            );
1767            self.kernel
1768                .data_engine
1769                .borrow_mut()
1770                .register_client(data_client_adapter, Some(venue));
1771        }
1772    }
1773}
1774
1775fn format_optional_nanos(nanos: Option<UnixNanos>) -> String {
1776    nanos.map_or("None".to_string(), unix_nanos_to_iso8601)
1777}
1778
1779fn format_optional_uuid(uuid: Option<&UUID4>) -> String {
1780    uuid.map_or("None".to_string(), |id| id.to_string())
1781}
1782
1783fn format_optional_duration(start: Option<UnixNanos>, end: Option<UnixNanos>) -> String {
1784    match (start, end) {
1785        (Some(s), Some(e)) => {
1786            let delta = e.to_datetime_utc() - s.to_datetime_utc();
1787            let days = delta.num_days().abs();
1788            let hours = delta.num_hours().abs() % 24;
1789            let minutes = delta.num_minutes().abs() % 60;
1790            let seconds = delta.num_seconds().abs() % 60;
1791            let micros = delta.subsec_nanos().unsigned_abs() / 1_000;
1792            format!("{days} days {hours:02}:{minutes:02}:{seconds:02}.{micros:06}")
1793        }
1794        _ => "None".to_string(),
1795    }
1796}
1797
1798#[rustfmt::skip]
1799fn log_portfolio_performance(analyzer: &PortfolioAnalyzer) {
1800    log_info!("=================================================================", color = LogColor::Cyan);
1801    log_info!(" PORTFOLIO PERFORMANCE", color = LogColor::Cyan);
1802    log_info!("=================================================================", color = LogColor::Cyan);
1803
1804    for currency in analyzer.currencies() {
1805        log::info!(" PnL Statistics ({})", currency.code);
1806        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1807
1808        if let Ok(pnl_lines) = analyzer.get_stats_pnls_formatted(Some(currency), None) {
1809            for line in &pnl_lines {
1810                log::info!("{line}");
1811            }
1812        }
1813
1814        log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1815    }
1816
1817    log::info!(" Returns Statistics");
1818    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1819
1820    for line in &analyzer.get_stats_returns_formatted() {
1821        log::info!("{line}");
1822    }
1823    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1824
1825    log::info!(" General Statistics");
1826    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1827
1828    for line in &analyzer.get_stats_general_formatted() {
1829        log::info!("{line}");
1830    }
1831    log_info!("-----------------------------------------------------------------", color = LogColor::Cyan);
1832}
1833
1834#[cfg(test)]
1835mod tests {
1836    use nautilus_common::enums::Environment;
1837    use nautilus_execution::engine::SnapshotAnchorer;
1838    use nautilus_model::{
1839        data::{Data, InstrumentStatus},
1840        enums::{AccountType, BookType, MarketStatus, MarketStatusAction, OmsType},
1841        identifiers::Venue,
1842        instruments::{
1843            CryptoPerpetual, Instrument, InstrumentAny, stubs::crypto_perpetual_ethusdt,
1844        },
1845        types::Money,
1846    };
1847    use nautilus_system::{KernelEventStore, RegisteredComponents};
1848    use rstest::*;
1849
1850    use super::*;
1851
1852    #[derive(Debug)]
1853    struct BacktestReplayKernelEventStore {
1854        fail_restore: bool,
1855    }
1856
1857    impl KernelEventStore for BacktestReplayKernelEventStore {
1858        fn restore_parent_cache(
1859            &mut self,
1860            _instance_id: UUID4,
1861            _cache: &mut Cache,
1862        ) -> anyhow::Result<()> {
1863            if self.fail_restore {
1864                anyhow::bail!("replay restore failed");
1865            }
1866
1867            Ok(())
1868        }
1869
1870        fn open(
1871            &mut self,
1872            _instance_id: UUID4,
1873            _components: &RegisteredComponents,
1874            _environment: Environment,
1875        ) -> anyhow::Result<()> {
1876            Ok(())
1877        }
1878
1879        fn snapshot_anchorer(&self) -> Option<SnapshotAnchorer> {
1880            None
1881        }
1882
1883        fn seal(&mut self, _ts_init: UnixNanos) {}
1884
1885        fn run_id(&self) -> Option<&str> {
1886            Some("replay-child")
1887        }
1888
1889        fn parent_run_id(&self) -> Option<&str> {
1890            Some("seed-run")
1891        }
1892
1893        fn is_event_store_replay_configured(&self) -> bool {
1894            true
1895        }
1896
1897        fn is_halted(&self) -> bool {
1898            false
1899        }
1900    }
1901
1902    fn create_engine() -> BacktestEngine {
1903        let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
1904        let venue_config = SimulatedVenueConfig::builder()
1905            .venue(Venue::from("BINANCE"))
1906            .oms_type(OmsType::Netting)
1907            .account_type(AccountType::Margin)
1908            .book_type(BookType::L1_MBP)
1909            .starting_balances(vec![Money::from("1_000_000 USDT")])
1910            .build();
1911        engine.add_venue(venue_config).unwrap();
1912        engine
1913    }
1914
1915    fn create_engine_with_replay_store(fail_restore: bool) -> BacktestEngine {
1916        let config = BacktestEngineConfig {
1917            load_state: true,
1918            run_analysis: false,
1919            ..Default::default()
1920        };
1921        let mut engine = BacktestEngine::new(config.clone()).unwrap();
1922        let event_store_factory = move |_instance_id: UUID4, _clock: Rc<RefCell<dyn Clock>>| {
1923            Ok::<_, anyhow::Error>(Box::new(BacktestReplayKernelEventStore { fail_restore })
1924                as Box<dyn KernelEventStore>)
1925        };
1926
1927        engine.kernel = NautilusKernel::new_with(
1928            "BacktestEngine".to_string(),
1929            config,
1930            None,
1931            Some(Box::new(event_store_factory)),
1932        )
1933        .unwrap();
1934        engine.instance_id = engine.kernel.instance_id;
1935        engine
1936    }
1937
1938    #[rstest]
1939    fn test_run_impl_event_store_replay_skips_trader_start() {
1940        let mut engine = create_engine_with_replay_store(false);
1941
1942        engine
1943            .run_impl(
1944                Some(UnixNanos::from(0)),
1945                Some(UnixNanos::from(1)),
1946                None,
1947                true,
1948            )
1949            .unwrap();
1950
1951        assert!(engine.kernel.is_event_store_replay_configured());
1952        assert!(engine.kernel.is_event_store_replay());
1953        assert!(!engine.kernel.trader.borrow().is_running());
1954    }
1955
1956    #[rstest]
1957    fn test_run_impl_event_store_replay_config_failure_errors() {
1958        let mut engine = create_engine_with_replay_store(true);
1959
1960        let error = engine
1961            .run_impl(
1962                Some(UnixNanos::from(0)),
1963                Some(UnixNanos::from(1)),
1964                None,
1965                true,
1966            )
1967            .unwrap_err();
1968
1969        assert_eq!(error.to_string(), "event-store replay did not start");
1970        assert!(engine.kernel.is_event_store_replay_configured());
1971        assert!(!engine.kernel.is_event_store_replay());
1972        assert!(!engine.kernel.trader.borrow().is_running());
1973    }
1974
1975    #[rstest]
1976    #[case(None)]
1977    #[case(Some(true))]
1978    #[case(Some(false))]
1979    fn test_new_forces_drop_instruments_on_reset_false(
1980        crypto_perpetual_ethusdt: CryptoPerpetual,
1981        #[case] user_value: Option<bool>,
1982    ) {
1983        use nautilus_common::cache::CacheConfig;
1984
1985        let config = match user_value {
1986            None => BacktestEngineConfig::builder().build(),
1987            Some(value) => BacktestEngineConfig::builder()
1988                .cache(
1989                    CacheConfig::builder()
1990                        .drop_instruments_on_reset(value)
1991                        .build(),
1992                )
1993                .build(),
1994        };
1995        let mut engine = BacktestEngine::new(config).unwrap();
1996
1997        let venue_config = SimulatedVenueConfig::builder()
1998            .venue(Venue::from("BINANCE"))
1999            .oms_type(OmsType::Netting)
2000            .account_type(AccountType::Margin)
2001            .book_type(BookType::L1_MBP)
2002            .starting_balances(vec![Money::from("1_000_000 USDT")])
2003            .build();
2004        engine.add_venue(venue_config).unwrap();
2005
2006        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2007        let instrument_id = instrument.id();
2008        engine.add_instrument(&instrument).unwrap();
2009
2010        engine.reset();
2011
2012        assert!(
2013            engine
2014                .kernel()
2015                .cache
2016                .borrow()
2017                .instrument(&instrument_id)
2018                .is_some(),
2019            "instrument must survive engine.reset(); user-supplied \
2020             drop_instruments_on_reset={user_value:?} must not leak through",
2021        );
2022    }
2023
2024    #[rstest]
2025    fn test_route_data_to_exchange_instrument_status(crypto_perpetual_ethusdt: CryptoPerpetual) {
2026        let mut engine = create_engine();
2027        let instrument = InstrumentAny::CryptoPerpetual(crypto_perpetual_ethusdt);
2028        let instrument_id = instrument.id();
2029        engine.add_instrument(&instrument).unwrap();
2030
2031        let status = InstrumentStatus::new(
2032            instrument_id,
2033            MarketStatusAction::Close,
2034            UnixNanos::from(1),
2035            UnixNanos::from(1),
2036            None,
2037            None,
2038            None,
2039            None,
2040            None,
2041        );
2042
2043        engine.route_data_to_exchange(&Data::InstrumentStatus(status));
2044
2045        let exchange = engine.venues.get(&instrument_id.venue).unwrap().borrow();
2046        let market_status = exchange
2047            .get_matching_engine(&instrument_id)
2048            .unwrap()
2049            .market_status;
2050        assert_eq!(market_status, MarketStatus::Closed);
2051    }
2052}