Skip to main content

nautilus_backtest/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a [`BacktestNode`] that orchestrates catalog-driven backtests.
17
18use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23    data::{
24        Bar, Data, FundingRateUpdate, HasTsInit, IndexPriceUpdate, InstrumentClose,
25        InstrumentStatus, MarkPriceUpdate, OptionGreeks, OrderBookDelta, OrderBookDepth10,
26        QuoteTick, TradeTick,
27    },
28    enums::{BookType, OtoTriggerMode},
29    identifiers::{InstrumentId, Venue},
30    instruments::Instrument,
31    types::Money,
32};
33use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
34
35use crate::{
36    config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType, SimulatedVenueConfig},
37    engine::BacktestEngine,
38    result::BacktestResult,
39};
40
/// Orchestrates catalog-driven backtests from run configurations.
///
/// `BacktestNode` connects the [`ParquetDataCatalog`] with [`BacktestEngine`] to load
/// historical data and run backtests. Supports both oneshot and streaming modes.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.backtest")
)]
pub struct BacktestNode {
    /// Run configurations supplied at construction (validated by `new`).
    configs: Vec<BacktestRunConfig>,
    /// Engines created by `build`, keyed by run config ID.
    engines: AHashMap<String, BacktestEngine>,
}
58
59impl BacktestNode {
60    /// Creates a new [`BacktestNode`] instance.
61    ///
62    /// Validates that configs are non-empty and internally consistent:
63    /// - All data config instrument venues must have a matching venue config.
64    /// - L2/L3 book types require order book data in the data configs.
65    /// - Data config time ranges must be valid (start <= end).
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if `configs` is empty or validation fails.
70    pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
71        anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
72        validate_configs(&configs)?;
73        Ok(Self {
74            configs,
75            engines: AHashMap::new(),
76        })
77    }
78
79    /// Returns the run configurations.
80    #[must_use]
81    pub fn configs(&self) -> &[BacktestRunConfig] {
82        &self.configs
83    }
84
85    /// Builds backtest engines from the run configurations.
86    ///
87    /// For each config, creates a [`BacktestEngine`], adds venues, and loads
88    /// instruments from the catalog.
89    ///
90    /// # Errors
91    ///
92    /// Returns an error if engine creation, venue setup, or instrument loading fails.
93    pub fn build(&mut self) -> anyhow::Result<()> {
94        for config in &self.configs {
95            if self.engines.contains_key(config.id()) {
96                continue;
97            }
98
99            let engine_config = config.engine().clone();
100            let mut engine = BacktestEngine::new(engine_config)?;
101
102            for venue_config in config.venues() {
103                let starting_balances: Vec<Money> = venue_config
104                    .starting_balances()
105                    .iter()
106                    .map(|s| s.parse::<Money>())
107                    .collect::<Result<Vec<_>, _>>()
108                    .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
109
110                let default_leverage = venue_config.default_leverage();
111                let leverages = venue_config.leverages().cloned().unwrap_or_default();
112                let margin_model = venue_config.margin_model().cloned();
113                let modules = venue_config
114                    .modules()
115                    .iter()
116                    .cloned()
117                    .map(Into::into)
118                    .collect();
119                let fill_model = venue_config.fill_model().cloned().unwrap_or_default();
120                let fee_model = venue_config.fee_model().cloned().unwrap_or_default();
121                let latency_model = venue_config.latency_model().cloned().map(Into::into);
122                let sim_config = SimulatedVenueConfig::builder()
123                    .venue(Venue::from(venue_config.name().as_str()))
124                    .oms_type(venue_config.oms_type())
125                    .account_type(venue_config.account_type())
126                    .book_type(venue_config.book_type())
127                    .starting_balances(starting_balances)
128                    .maybe_base_currency(venue_config.base_currency())
129                    .default_leverage(default_leverage)
130                    .leverages(leverages)
131                    .maybe_margin_model(margin_model)
132                    .modules(modules)
133                    .fill_model(fill_model)
134                    .fee_model(fee_model)
135                    .maybe_latency_model(latency_model)
136                    .routing(venue_config.routing())
137                    .reject_stop_orders(venue_config.reject_stop_orders())
138                    .support_gtd_orders(venue_config.support_gtd_orders())
139                    .support_contingent_orders(venue_config.support_contingent_orders())
140                    .use_position_ids(venue_config.use_position_ids())
141                    .use_random_ids(venue_config.use_random_ids())
142                    .use_reduce_only(venue_config.use_reduce_only())
143                    .use_market_order_acks(venue_config.use_market_order_acks())
144                    .bar_execution(venue_config.bar_execution())
145                    .bar_adaptive_high_low_ordering(venue_config.bar_adaptive_high_low_ordering())
146                    .trade_execution(venue_config.trade_execution())
147                    .liquidity_consumption(venue_config.liquidity_consumption())
148                    .allow_cash_borrowing(venue_config.allow_cash_borrowing())
149                    .frozen_account(venue_config.frozen_account())
150                    .queue_position(venue_config.queue_position())
151                    .oto_full_trigger(venue_config.oto_trigger_mode() == OtoTriggerMode::Full)
152                    .price_protection_points(venue_config.price_protection_points())
153                    .liquidation_enabled(venue_config.liquidation_enabled())
154                    .liquidation_trigger_ratio(venue_config.liquidation_trigger_ratio())
155                    .liquidation_cancel_open_orders(venue_config.liquidation_cancel_open_orders())
156                    .build();
157                engine.add_venue(sim_config)?;
158            }
159
160            for data_config in config.data() {
161                let catalog = create_catalog(data_config)?;
162                let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
163                let filter: Option<Vec<String>> = if instr_ids.is_empty() {
164                    None
165                } else {
166                    Some(instr_ids.iter().map(ToString::to_string).collect())
167                };
168
169                let instruments = catalog.query_instruments(filter.as_deref())?;
170
171                if !instr_ids.is_empty() && instruments.is_empty() {
172                    let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
173                    anyhow::bail!(
174                        "No instruments found in catalog for requested IDs: [{}]",
175                        ids.join(", ")
176                    );
177                }
178
179                for instrument in instruments {
180                    engine.add_instrument(&instrument)?;
181                }
182            }
183
184            for venue_config in config.venues() {
185                let Some(settlement_prices) = venue_config.settlement_prices() else {
186                    continue;
187                };
188                let venue = Venue::from(venue_config.name().as_str());
189
190                for (instrument_id, raw_price) in settlement_prices {
191                    let price = {
192                        let cache = engine.kernel().cache.borrow();
193                        let instrument = cache.instrument(instrument_id).ok_or_else(|| {
194                            anyhow::anyhow!(
195                                "No instrument found for settlement price configuration: {instrument_id}"
196                            )
197                        })?;
198                        instrument.make_price(*raw_price)
199                    };
200                    engine.set_settlement_price(venue, *instrument_id, price)?;
201                }
202            }
203
204            self.engines.insert(config.id().to_string(), engine);
205        }
206
207        Ok(())
208    }
209
210    /// Returns a mutable reference to the engine for the given run config ID.
211    #[must_use]
212    pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
213        self.engines.get_mut(id)
214    }
215
216    /// Returns a reference to the engine for the given run config ID.
217    #[must_use]
218    pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
219        self.engines.get(id)
220    }
221
222    /// Returns all created backtest engines.
223    #[must_use]
224    pub fn get_engines(&self) -> Vec<&BacktestEngine> {
225        self.engines.values().collect()
226    }
227
228    /// Runs all configured backtests and returns results.
229    ///
230    /// Automatically calls [`build()`](Self::build) if engines have not been created yet.
231    /// For each run config, loads data from the catalog and runs the engine.
232    /// Supports both oneshot (`chunk_size = None`) and streaming modes.
233    ///
234    /// # Errors
235    ///
236    /// Returns an error if building, data loading, or engine execution fails.
237    pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
238        // Auto-build if not already done
239        if self.engines.is_empty() {
240            self.build()?;
241        }
242
243        let mut results = Vec::new();
244
245        for config in &self.configs {
246            let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
247                anyhow::anyhow!(
248                    "Engine not found for config '{}'. Call build() first.",
249                    config.id()
250                )
251            })?;
252
253            match config.chunk_size() {
254                None => run_oneshot(engine, config)?,
255                Some(chunk_size) => {
256                    anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
257                    run_streaming(engine, config, chunk_size)?;
258                }
259            }
260
261            results.push(engine.get_result());
262
263            if config.dispose_on_completion() {
264                engine.dispose();
265            } else {
266                engine.clear_data();
267            }
268        }
269
270        Ok(results)
271    }
272
273    /// Creates a [`ParquetDataCatalog`] from a data config.
274    ///
275    /// # Errors
276    ///
277    /// Returns an error if the catalog cannot be created from the URI.
278    pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
279        create_catalog(config)
280    }
281
282    /// Loads data from the catalog for a specific data config.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if catalog creation or data querying fails.
287    pub fn load_data_config(
288        config: &BacktestDataConfig,
289        start: Option<UnixNanos>,
290        end: Option<UnixNanos>,
291    ) -> anyhow::Result<Vec<Data>> {
292        load_data(config, start, end)
293    }
294
295    /// Disposes all engines and releases resources.
296    pub fn dispose(&mut self) {
297        for engine in self.engines.values_mut() {
298            engine.dispose();
299        }
300        self.engines.clear();
301    }
302}
303
304fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
305    // Kernel initialization sets a thread-local MessageBus that can only be
306    // initialized once per thread, so multiple engines cannot coexist
307    anyhow::ensure!(
308        configs.len() <= 1,
309        "Only one run config per BacktestNode is supported \
310         (kernel MessageBus is a thread-local singleton)"
311    );
312
313    let mut seen_ids = AHashSet::new();
314
315    for config in configs {
316        anyhow::ensure!(
317            seen_ids.insert(config.id()),
318            "Duplicate run config ID '{}'",
319            config.id()
320        );
321
322        let venue_names: Vec<String> = config
323            .venues()
324            .iter()
325            .map(|v| v.name().to_string())
326            .collect();
327
328        for data_config in config.data() {
329            if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
330                anyhow::ensure!(
331                    start <= end,
332                    "Data config start_time ({start}) must be <= end_time ({end})"
333                );
334            }
335
336            for instrument_id in data_config.get_instrument_ids()? {
337                let venue = instrument_id.venue.to_string();
338                anyhow::ensure!(
339                    venue_names.contains(&venue),
340                    "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
341                );
342            }
343        }
344
345        for venue_config in config.venues() {
346            let needs_book_data = matches!(
347                venue_config.book_type(),
348                BookType::L2_MBP | BookType::L3_MBO
349            );
350
351            if needs_book_data {
352                let venue_name = venue_config.name().to_string();
353                let has_book_data = config.data().iter().any(|dc| {
354                    let is_book_type = matches!(
355                        dc.data_type(),
356                        NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
357                    );
358
359                    if !is_book_type {
360                        return false;
361                    }
362
363                    // Unfiltered config (no instrument filter) covers all venues
364                    let ids = dc.get_instrument_ids().unwrap_or_default();
365                    ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
366                });
367                anyhow::ensure!(
368                    has_book_data,
369                    "Venue '{venue_name}' has book_type {:?} but no order book data configured",
370                    venue_config.book_type()
371                );
372            }
373        }
374    }
375    Ok(())
376}
377
378fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
379    for data_config in config.data() {
380        let data = load_data(data_config, config.start(), config.end())?;
381        if data.is_empty() {
382            log::warn!("No data found for config: {:?}", data_config.data_type());
383            continue;
384        }
385        engine.add_data(data, data_config.client_id(), false, false)?;
386    }
387
388    engine.sort_data();
389    engine.run(
390        config.start(),
391        config.end(),
392        Some(config.id().to_string()),
393        false,
394    )
395}
396
397fn run_streaming(
398    engine: &mut BacktestEngine,
399    config: &BacktestRunConfig,
400    chunk_size: usize,
401) -> anyhow::Result<()> {
402    let data_configs = config.data();
403
404    if data_configs.len() == 1 {
405        // Single config: stream directly from catalog iterator without
406        // materializing the full dataset, bounded by chunk_size
407        let data_config = &data_configs[0];
408        let mut catalog = create_catalog(data_config)?;
409        let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
410        stream_chunks(engine, config, result.peekable(), chunk_size)?;
411    } else {
412        // Multiple configs require loading all data to merge-sort across types
413        let all_data = load_and_merge_data(config)?;
414        stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
415    }
416
417    Ok(())
418}
419
420// Feeds data from an iterator to the engine in timestamp-aligned chunks.
421// Each chunk contains up to `chunk_size` events, extended to include all
422// events sharing the boundary timestamp so timers flush correctly.
423fn stream_chunks<I: Iterator<Item = Data>>(
424    engine: &mut BacktestEngine,
425    config: &BacktestRunConfig,
426    mut iter: Peekable<I>,
427    chunk_size: usize,
428) -> anyhow::Result<()> {
429    if iter.peek().is_none() {
430        engine.end();
431        return Ok(());
432    }
433
434    let mut next_start = config.start();
435
436    loop {
437        let chunk = take_aligned_chunk(&mut iter, chunk_size);
438        if chunk.is_empty() {
439            break;
440        }
441
442        let is_last = iter.peek().is_none();
443        let end = if is_last {
444            config.end()
445        } else {
446            chunk.last().map(HasTsInit::ts_init)
447        };
448
449        engine.add_data(chunk, None, false, true)?;
450        engine.run(next_start, end, Some(config.id().to_string()), true)?;
451        engine.clear_data();
452
453        // A shutdown request during the chunk already triggered end() inside
454        // engine.run(); stop loading further chunks so later data is not processed
455        if engine.kernel().is_shutdown_requested() {
456            return Ok(());
457        }
458
459        // Carry forward the end timestamp so the next chunk's run_impl
460        // sets clocks contiguously and processes gap timers correctly
461        next_start = end;
462    }
463
464    engine.end();
465    Ok(())
466}
467
468// Takes up to `chunk_size` items, then extends to include all remaining
469// items sharing the boundary timestamp to avoid splitting same-ts events.
470fn take_aligned_chunk<I: Iterator<Item = Data>>(
471    iter: &mut Peekable<I>,
472    chunk_size: usize,
473) -> Vec<Data> {
474    let mut chunk = Vec::with_capacity(chunk_size);
475
476    for _ in 0..chunk_size {
477        match iter.next() {
478            Some(item) => chunk.push(item),
479            None => return chunk,
480        }
481    }
482
483    if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
484        while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
485            chunk.push(iter.next().unwrap());
486        }
487    }
488
489    chunk
490}
491
492fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
493    let mut all_data = Vec::new();
494
495    for data_config in config.data() {
496        let data = load_data(data_config, config.start(), config.end())?;
497        if data.is_empty() {
498            log::warn!("No data found for config: {:?}", data_config.data_type());
499            continue;
500        }
501        all_data.extend(data);
502    }
503    all_data.sort_by_key(HasTsInit::ts_init);
504    Ok(all_data)
505}
506
507fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
508    let uri = match config.catalog_fs_protocol() {
509        Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
510        None => config.catalog_path().to_string(),
511    };
512    let storage_options = config
513        .catalog_fs_rust_storage_options()
514        .cloned()
515        .or_else(|| config.catalog_fs_storage_options().cloned());
516    ParquetDataCatalog::from_uri(&uri, storage_options, None, None, None)
517}
518
519fn load_data(
520    config: &BacktestDataConfig,
521    run_start: Option<UnixNanos>,
522    run_end: Option<UnixNanos>,
523) -> anyhow::Result<Vec<Data>> {
524    let mut catalog = create_catalog(config)?;
525    let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
526    Ok(result.collect())
527}
528
529fn dispatch_query(
530    catalog: &mut ParquetDataCatalog,
531    config: &BacktestDataConfig,
532    run_start: Option<UnixNanos>,
533    run_end: Option<UnixNanos>,
534) -> anyhow::Result<QueryResult> {
535    catalog.reset_session();
536
537    let identifiers = config.query_identifiers();
538    let start = max_opt(config.start_time(), run_start);
539    let end = min_opt(config.end_time(), run_end);
540    let filter = config.filter_expr();
541    let optimize = config.optimize_file_loading();
542
543    match config.data_type() {
544        NautilusDataType::QuoteTick => {
545            catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
546        }
547        NautilusDataType::TradeTick => {
548            catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
549        }
550        NautilusDataType::Bar => {
551            catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
552        }
553        NautilusDataType::OrderBookDelta => {
554            catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
555        }
556        NautilusDataType::OrderBookDepth10 => {
557            catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
558        }
559        NautilusDataType::MarkPriceUpdate => {
560            catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
561        }
562        NautilusDataType::IndexPriceUpdate => {
563            catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
564        }
565        NautilusDataType::FundingRateUpdate => {
566            catalog.query::<FundingRateUpdate>(identifiers, start, end, filter, None, optimize)
567        }
568        NautilusDataType::InstrumentStatus => {
569            catalog.query::<InstrumentStatus>(identifiers, start, end, filter, None, optimize)
570        }
571        NautilusDataType::OptionGreeks => {
572            catalog.query::<OptionGreeks>(identifiers, start, end, filter, None, optimize)
573        }
574        NautilusDataType::InstrumentClose => {
575            catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
576        }
577    }
578}
579
580fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
581    match (a, b) {
582        (Some(a), Some(b)) => Some(a.max(b)),
583        (Some(a), None) => Some(a),
584        (None, Some(b)) => Some(b),
585        (None, None) => None,
586    }
587}
588
589fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
590    match (a, b) {
591        (Some(a), Some(b)) => Some(a.min(b)),
592        (Some(a), None) => Some(a),
593        (None, Some(b)) => Some(b),
594        (None, None) => None,
595    }
596}