Skip to main content

nautilus_backtest/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a [`BacktestNode`] that orchestrates catalog-driven backtests.
17
18use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny};
23use nautilus_model::{
24    data::{
25        Bar, Data, HasTsInit, IndexPriceUpdate, InstrumentClose, MarkPriceUpdate, OrderBookDelta,
26        OrderBookDepth10, QuoteTick, TradeTick,
27    },
28    enums::BookType,
29    identifiers::{InstrumentId, Venue},
30    types::Money,
31};
32use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
33use rust_decimal::{Decimal, prelude::FromPrimitive};
34
35use crate::{
36    config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType},
37    engine::{BacktestEngine, BacktestResult},
38};
39
/// Orchestrates catalog-driven backtests from run configurations.
///
/// `BacktestNode` connects the [`ParquetDataCatalog`] with [`BacktestEngine`] to load
/// historical data and run backtests. Supports both oneshot and streaming modes.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
)]
pub struct BacktestNode {
    // Validated run configurations (currently at most one; see `validate_configs`)
    configs: Vec<BacktestRunConfig>,
    // Engines keyed by run config ID; populated by `build()`
    engines: AHashMap<String, BacktestEngine>,
}
53
impl BacktestNode {
    /// Creates a new [`BacktestNode`] instance.
    ///
    /// Validates that configs are non-empty and internally consistent:
    /// - All data config instrument venues must have a matching venue config.
    /// - L2/L3 book types require order book data in the data configs.
    /// - Data config time ranges must be valid (start <= end).
    ///
    /// # Errors
    ///
    /// Returns an error if `configs` is empty or validation fails.
    pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
        anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
        validate_configs(&configs)?;
        Ok(Self {
            configs,
            // Engines are created lazily in `build()` (or on first `run()`)
            engines: AHashMap::new(),
        })
    }

    /// Returns the run configurations.
    #[must_use]
    pub fn configs(&self) -> &[BacktestRunConfig] {
        &self.configs
    }

    /// Builds backtest engines from the run configurations.
    ///
    /// For each config, creates a [`BacktestEngine`], adds venues, and loads
    /// instruments from the catalog.
    ///
    /// # Errors
    ///
    /// Returns an error if engine creation, venue setup, or instrument loading fails.
    pub fn build(&mut self) -> anyhow::Result<()> {
        for config in &self.configs {
            // Idempotent: skip configs whose engine was already built
            if self.engines.contains_key(config.id()) {
                continue;
            }

            let engine_config = config.engine().clone();
            let mut engine = BacktestEngine::new(engine_config)?;

            for venue_config in config.venues() {
                // Parse configured balance strings into `Money` values,
                // failing fast on the first unparsable entry
                let starting_balances: Vec<Money> = venue_config
                    .starting_balances()
                    .iter()
                    .map(|s| s.parse::<Money>())
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;

                // NOTE(review): an unrepresentable default_leverage f64 is silently
                // dropped to `None` here rather than raising — confirm intended
                let default_leverage = venue_config.default_leverage().and_then(Decimal::from_f64);
                // Per-instrument leverages are stricter: any f64 that `Decimal`
                // cannot represent is an error
                let leverages: AHashMap<InstrumentId, Decimal> = venue_config
                    .leverages()
                    .map(|m| {
                        m.iter()
                            .map(|(k, v)| {
                                Decimal::from_f64(*v).map(|d| (*k, d)).ok_or_else(|| {
                                    anyhow::anyhow!("Invalid leverage {v} for instrument {k}")
                                })
                            })
                            .collect::<anyhow::Result<AHashMap<_, _>>>()
                    })
                    .transpose()?
                    .unwrap_or_default();

                engine.add_venue(
                    Venue::from(venue_config.name().as_str()),
                    venue_config.oms_type(),
                    venue_config.account_type(),
                    venue_config.book_type(),
                    starting_balances,
                    venue_config.base_currency(),
                    default_leverage,
                    leverages,
                    Vec::new(),
                    FillModelAny::default(),
                    FeeModelAny::default(),
                    None, // latency_model
                    Some(venue_config.routing()),
                    Some(venue_config.reject_stop_orders()),
                    Some(venue_config.support_gtd_orders()),
                    Some(venue_config.support_contingent_orders()),
                    Some(venue_config.use_position_ids()),
                    Some(venue_config.use_random_ids()),
                    Some(venue_config.use_reduce_only()),
                    None, // use_message_queue
                    Some(venue_config.use_market_order_acks()),
                    Some(venue_config.bar_execution()),
                    Some(venue_config.bar_adaptive_high_low_ordering()),
                    Some(venue_config.trade_execution()),
                    Some(venue_config.liquidity_consumption()),
                    Some(venue_config.allow_cash_borrowing()),
                    Some(venue_config.frozen_account()),
                    Some(venue_config.price_protection_points()),
                )?;
            }

            for data_config in config.data() {
                let catalog = create_catalog(data_config)?;
                let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
                // An empty filter (None) queries all instruments in the catalog
                let filter = if instr_ids.is_empty() {
                    None
                } else {
                    Some(instr_ids.iter().map(ToString::to_string).collect())
                };

                let instruments = catalog.query_instruments(filter)?;

                // Requested specific IDs but found none: surface the mismatch
                // instead of silently running a backtest without instruments
                if !instr_ids.is_empty() && instruments.is_empty() {
                    let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
                    anyhow::bail!(
                        "No instruments found in catalog for requested IDs: [{}]",
                        ids.join(", ")
                    );
                }

                for instrument in instruments {
                    engine.add_instrument(instrument)?;
                }
            }

            self.engines.insert(config.id().to_string(), engine);
        }

        Ok(())
    }

    /// Returns a mutable reference to the engine for the given run config ID.
    #[must_use]
    pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
        self.engines.get_mut(id)
    }

    /// Returns a reference to the engine for the given run config ID.
    #[must_use]
    pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
        self.engines.get(id)
    }

    /// Returns all created backtest engines.
    #[must_use]
    pub fn get_engines(&self) -> Vec<&BacktestEngine> {
        self.engines.values().collect()
    }

    /// Runs all configured backtests and returns results.
    ///
    /// Automatically calls [`build()`](Self::build) if engines have not been created yet.
    /// For each run config, loads data from the catalog and runs the engine.
    /// Supports both oneshot (`chunk_size = None`) and streaming modes.
    ///
    /// # Errors
    ///
    /// Returns an error if building, data loading, or engine execution fails.
    pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
        // Auto-build if not already done
        if self.engines.is_empty() {
            self.build()?;
        }

        let mut results = Vec::new();

        for config in &self.configs {
            let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
                anyhow::anyhow!(
                    "Engine not found for config '{}'. Call build() first.",
                    config.id()
                )
            })?;

            match config.chunk_size() {
                None => run_oneshot(engine, config)?,
                Some(chunk_size) => {
                    anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
                    run_streaming(engine, config, chunk_size)?;
                }
            }

            results.push(engine.get_result());

            // Either tear down the engine entirely or just drop loaded data
            // so the engine can be reused for another run.
            // NOTE(review): a disposed engine stays in `self.engines`, so a
            // subsequent `run()` would reuse it without rebuilding — confirm
            if config.dispose_on_completion() {
                engine.dispose();
            } else {
                engine.clear_data();
            }
        }

        Ok(results)
    }

    /// Creates a [`ParquetDataCatalog`] from a data config.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be created from the URI.
    pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
        create_catalog(config)
    }

    /// Loads data from the catalog for a specific data config.
    ///
    /// # Errors
    ///
    /// Returns an error if catalog creation or data querying fails.
    pub fn load_data_config(
        config: &BacktestDataConfig,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<Data>> {
        load_data(config, start, end)
    }

    /// Disposes all engines and releases resources.
    pub fn dispose(&mut self) {
        for engine in self.engines.values_mut() {
            engine.dispose();
        }
        self.engines.clear();
    }
}
275
276fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
277    // Kernel initialization sets a thread-local MessageBus that can only be
278    // initialized once per thread, so multiple engines cannot coexist
279    anyhow::ensure!(
280        configs.len() <= 1,
281        "Only one run config per BacktestNode is supported \
282         (kernel MessageBus is a thread-local singleton)"
283    );
284
285    let mut seen_ids = AHashSet::new();
286    for config in configs {
287        anyhow::ensure!(
288            seen_ids.insert(config.id()),
289            "Duplicate run config ID '{}'",
290            config.id()
291        );
292
293        let venue_names: Vec<String> = config
294            .venues()
295            .iter()
296            .map(|v| v.name().to_string())
297            .collect();
298
299        for data_config in config.data() {
300            if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
301                anyhow::ensure!(
302                    start <= end,
303                    "Data config start_time ({start}) must be <= end_time ({end})"
304                );
305            }
306
307            for instrument_id in data_config.get_instrument_ids()? {
308                let venue = instrument_id.venue.to_string();
309                anyhow::ensure!(
310                    venue_names.contains(&venue),
311                    "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
312                );
313            }
314        }
315
316        for venue_config in config.venues() {
317            let needs_book_data = matches!(
318                venue_config.book_type(),
319                BookType::L2_MBP | BookType::L3_MBO
320            );
321
322            if needs_book_data {
323                let venue_name = venue_config.name().to_string();
324                let has_book_data = config.data().iter().any(|dc| {
325                    let is_book_type = matches!(
326                        dc.data_type(),
327                        NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
328                    );
329
330                    if !is_book_type {
331                        return false;
332                    }
333
334                    // Unfiltered config (no instrument filter) covers all venues
335                    let ids = dc.get_instrument_ids().unwrap_or_default();
336                    ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
337                });
338                anyhow::ensure!(
339                    has_book_data,
340                    "Venue '{venue_name}' has book_type {:?} but no order book data configured",
341                    venue_config.book_type()
342                );
343            }
344        }
345    }
346    Ok(())
347}
348
349fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
350    for data_config in config.data() {
351        let data = load_data(data_config, config.start(), config.end())?;
352        if data.is_empty() {
353            log::warn!("No data found for config: {:?}", data_config.data_type());
354            continue;
355        }
356        engine.add_data(data, data_config.client_id(), false, false);
357    }
358
359    engine.sort_data();
360    engine.run(
361        config.start(),
362        config.end(),
363        Some(config.id().to_string()),
364        false,
365    )
366}
367
368fn run_streaming(
369    engine: &mut BacktestEngine,
370    config: &BacktestRunConfig,
371    chunk_size: usize,
372) -> anyhow::Result<()> {
373    let data_configs = config.data();
374
375    if data_configs.len() == 1 {
376        // Single config: stream directly from catalog iterator without
377        // materializing the full dataset, bounded by chunk_size
378        let data_config = &data_configs[0];
379        let mut catalog = create_catalog(data_config)?;
380        let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
381        stream_chunks(engine, config, result.peekable(), chunk_size)?;
382    } else {
383        // Multiple configs require loading all data to merge-sort across types
384        let all_data = load_and_merge_data(config)?;
385        stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
386    }
387
388    Ok(())
389}
390
// Feeds data from an iterator to the engine in timestamp-aligned chunks.
// Each chunk contains up to `chunk_size` events, extended to include all
// events sharing the boundary timestamp so timers flush correctly.
fn stream_chunks<I: Iterator<Item = Data>>(
    engine: &mut BacktestEngine,
    config: &BacktestRunConfig,
    mut iter: Peekable<I>,
    chunk_size: usize,
) -> anyhow::Result<()> {
    // No data at all: still finalize the engine so end-of-run handling fires
    if iter.peek().is_none() {
        engine.end();
        return Ok(());
    }

    let mut next_start = config.start();
    loop {
        let chunk = take_aligned_chunk(&mut iter, chunk_size);
        if chunk.is_empty() {
            break;
        }

        // The final chunk runs through the configured end time; intermediate
        // chunks stop exactly at their last event's timestamp
        let is_last = iter.peek().is_none();
        let end = if is_last {
            config.end()
        } else {
            chunk.last().map(HasTsInit::ts_init)
        };

        // Trailing boolean flags differ from the oneshot path — presumably
        // marking a streaming-mode run; data is cleared after each chunk to
        // bound memory (confirm against `BacktestEngine::run`/`add_data`)
        engine.add_data(chunk, None, false, true);
        engine.run(next_start, end, Some(config.id().to_string()), true)?;
        engine.clear_data();

        // Carry forward the end timestamp so the next chunk's run_impl
        // sets clocks contiguously and processes gap timers correctly
        next_start = end;
    }

    engine.end();
    Ok(())
}
431
432// Takes up to `chunk_size` items, then extends to include all remaining
433// items sharing the boundary timestamp to avoid splitting same-ts events.
434fn take_aligned_chunk<I: Iterator<Item = Data>>(
435    iter: &mut Peekable<I>,
436    chunk_size: usize,
437) -> Vec<Data> {
438    let mut chunk = Vec::with_capacity(chunk_size);
439
440    for _ in 0..chunk_size {
441        match iter.next() {
442            Some(item) => chunk.push(item),
443            None => return chunk,
444        }
445    }
446
447    if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
448        while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
449            chunk.push(iter.next().unwrap());
450        }
451    }
452
453    chunk
454}
455
456fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
457    let mut all_data = Vec::new();
458    for data_config in config.data() {
459        let data = load_data(data_config, config.start(), config.end())?;
460        if data.is_empty() {
461            log::warn!("No data found for config: {:?}", data_config.data_type());
462            continue;
463        }
464        all_data.extend(data);
465    }
466    all_data.sort_by_key(HasTsInit::ts_init);
467    Ok(all_data)
468}
469
470fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
471    let uri = match config.catalog_fs_protocol() {
472        Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
473        None => config.catalog_path().to_string(),
474    };
475    ParquetDataCatalog::from_uri(
476        &uri,
477        config.catalog_fs_storage_options().cloned(),
478        None,
479        None,
480        None,
481    )
482}
483
484fn load_data(
485    config: &BacktestDataConfig,
486    run_start: Option<UnixNanos>,
487    run_end: Option<UnixNanos>,
488) -> anyhow::Result<Vec<Data>> {
489    let mut catalog = create_catalog(config)?;
490    let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
491    Ok(result.collect())
492}
493
494fn dispatch_query(
495    catalog: &mut ParquetDataCatalog,
496    config: &BacktestDataConfig,
497    run_start: Option<UnixNanos>,
498    run_end: Option<UnixNanos>,
499) -> anyhow::Result<QueryResult> {
500    catalog.reset_session();
501
502    let identifiers = config.query_identifiers();
503    let start = max_opt(config.start_time(), run_start);
504    let end = min_opt(config.end_time(), run_end);
505    let filter = config.filter_expr();
506    let optimize = config.optimize_file_loading();
507
508    match config.data_type() {
509        NautilusDataType::QuoteTick => {
510            catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
511        }
512        NautilusDataType::TradeTick => {
513            catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
514        }
515        NautilusDataType::Bar => {
516            catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
517        }
518        NautilusDataType::OrderBookDelta => {
519            catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
520        }
521        NautilusDataType::OrderBookDepth10 => {
522            catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
523        }
524        NautilusDataType::MarkPriceUpdate => {
525            catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
526        }
527        NautilusDataType::IndexPriceUpdate => {
528            catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
529        }
530        NautilusDataType::InstrumentClose => {
531            catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
532        }
533    }
534}
535
536fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
537    match (a, b) {
538        (Some(a), Some(b)) => Some(a.max(b)),
539        (Some(a), None) => Some(a),
540        (None, Some(b)) => Some(b),
541        (None, None) => None,
542    }
543}
544
545fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
546    match (a, b) {
547        (Some(a), Some(b)) => Some(a.min(b)),
548        (Some(a), None) => Some(a),
549        (None, Some(b)) => Some(b),
550        (None, None) => None,
551    }
552}