1use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny};
23use nautilus_model::{
24 data::{
25 Bar, Data, HasTsInit, IndexPriceUpdate, InstrumentClose, MarkPriceUpdate, OrderBookDelta,
26 OrderBookDepth10, QuoteTick, TradeTick,
27 },
28 enums::BookType,
29 identifiers::{InstrumentId, Venue},
30 types::Money,
31};
32use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
33use rust_decimal::{Decimal, prelude::FromPrimitive};
34
35use crate::{
36 config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType},
37 engine::{BacktestEngine, BacktestResult},
38};
39
/// Orchestrates backtest runs described by [`BacktestRunConfig`]s.
///
/// The node owns its run configurations and the [`BacktestEngine`] built for each of them.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
)]
pub struct BacktestNode {
    // Run configurations; validated by `new` before the node is created.
    configs: Vec<BacktestRunConfig>,
    // Engines created by `build`, keyed by the run config ID (`config.id()`).
    engines: AHashMap<String, BacktestEngine>,
}
53
54impl BacktestNode {
55 pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
66 anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
67 validate_configs(&configs)?;
68 Ok(Self {
69 configs,
70 engines: AHashMap::new(),
71 })
72 }
73
74 #[must_use]
76 pub fn configs(&self) -> &[BacktestRunConfig] {
77 &self.configs
78 }
79
80 pub fn build(&mut self) -> anyhow::Result<()> {
89 for config in &self.configs {
90 if self.engines.contains_key(config.id()) {
91 continue;
92 }
93
94 let engine_config = config.engine().clone();
95 let mut engine = BacktestEngine::new(engine_config)?;
96
97 for venue_config in config.venues() {
98 let starting_balances: Vec<Money> = venue_config
99 .starting_balances()
100 .iter()
101 .map(|s| s.parse::<Money>())
102 .collect::<Result<Vec<_>, _>>()
103 .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
104
105 let default_leverage = venue_config.default_leverage().and_then(Decimal::from_f64);
106 let leverages: AHashMap<InstrumentId, Decimal> = venue_config
107 .leverages()
108 .map(|m| {
109 m.iter()
110 .map(|(k, v)| {
111 Decimal::from_f64(*v).map(|d| (*k, d)).ok_or_else(|| {
112 anyhow::anyhow!("Invalid leverage {v} for instrument {k}")
113 })
114 })
115 .collect::<anyhow::Result<AHashMap<_, _>>>()
116 })
117 .transpose()?
118 .unwrap_or_default();
119
120 engine.add_venue(
121 Venue::from(venue_config.name().as_str()),
122 venue_config.oms_type(),
123 venue_config.account_type(),
124 venue_config.book_type(),
125 starting_balances,
126 venue_config.base_currency(),
127 default_leverage,
128 leverages,
129 Vec::new(),
130 FillModelAny::default(),
131 FeeModelAny::default(),
132 None, Some(venue_config.routing()),
134 Some(venue_config.reject_stop_orders()),
135 Some(venue_config.support_gtd_orders()),
136 Some(venue_config.support_contingent_orders()),
137 Some(venue_config.use_position_ids()),
138 Some(venue_config.use_random_ids()),
139 Some(venue_config.use_reduce_only()),
140 None, Some(venue_config.use_market_order_acks()),
142 Some(venue_config.bar_execution()),
143 Some(venue_config.bar_adaptive_high_low_ordering()),
144 Some(venue_config.trade_execution()),
145 Some(venue_config.liquidity_consumption()),
146 Some(venue_config.allow_cash_borrowing()),
147 Some(venue_config.frozen_account()),
148 Some(venue_config.price_protection_points()),
149 )?;
150 }
151
152 for data_config in config.data() {
153 let catalog = create_catalog(data_config)?;
154 let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
155 let filter = if instr_ids.is_empty() {
156 None
157 } else {
158 Some(instr_ids.iter().map(ToString::to_string).collect())
159 };
160
161 let instruments = catalog.query_instruments(filter)?;
162
163 if !instr_ids.is_empty() && instruments.is_empty() {
164 let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
165 anyhow::bail!(
166 "No instruments found in catalog for requested IDs: [{}]",
167 ids.join(", ")
168 );
169 }
170
171 for instrument in instruments {
172 engine.add_instrument(instrument)?;
173 }
174 }
175
176 self.engines.insert(config.id().to_string(), engine);
177 }
178
179 Ok(())
180 }
181
182 #[must_use]
184 pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
185 self.engines.get_mut(id)
186 }
187
188 #[must_use]
190 pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
191 self.engines.get(id)
192 }
193
194 #[must_use]
196 pub fn get_engines(&self) -> Vec<&BacktestEngine> {
197 self.engines.values().collect()
198 }
199
200 pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
210 if self.engines.is_empty() {
212 self.build()?;
213 }
214
215 let mut results = Vec::new();
216
217 for config in &self.configs {
218 let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
219 anyhow::anyhow!(
220 "Engine not found for config '{}'. Call build() first.",
221 config.id()
222 )
223 })?;
224
225 match config.chunk_size() {
226 None => run_oneshot(engine, config)?,
227 Some(chunk_size) => {
228 anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
229 run_streaming(engine, config, chunk_size)?;
230 }
231 }
232
233 results.push(engine.get_result());
234
235 if config.dispose_on_completion() {
236 engine.dispose();
237 } else {
238 engine.clear_data();
239 }
240 }
241
242 Ok(results)
243 }
244
245 pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
251 create_catalog(config)
252 }
253
254 pub fn load_data_config(
260 config: &BacktestDataConfig,
261 start: Option<UnixNanos>,
262 end: Option<UnixNanos>,
263 ) -> anyhow::Result<Vec<Data>> {
264 load_data(config, start, end)
265 }
266
267 pub fn dispose(&mut self) {
269 for engine in self.engines.values_mut() {
270 engine.dispose();
271 }
272 self.engines.clear();
273 }
274}
275
276fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
277 anyhow::ensure!(
280 configs.len() <= 1,
281 "Only one run config per BacktestNode is supported \
282 (kernel MessageBus is a thread-local singleton)"
283 );
284
285 let mut seen_ids = AHashSet::new();
286 for config in configs {
287 anyhow::ensure!(
288 seen_ids.insert(config.id()),
289 "Duplicate run config ID '{}'",
290 config.id()
291 );
292
293 let venue_names: Vec<String> = config
294 .venues()
295 .iter()
296 .map(|v| v.name().to_string())
297 .collect();
298
299 for data_config in config.data() {
300 if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
301 anyhow::ensure!(
302 start <= end,
303 "Data config start_time ({start}) must be <= end_time ({end})"
304 );
305 }
306
307 for instrument_id in data_config.get_instrument_ids()? {
308 let venue = instrument_id.venue.to_string();
309 anyhow::ensure!(
310 venue_names.contains(&venue),
311 "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
312 );
313 }
314 }
315
316 for venue_config in config.venues() {
317 let needs_book_data = matches!(
318 venue_config.book_type(),
319 BookType::L2_MBP | BookType::L3_MBO
320 );
321
322 if needs_book_data {
323 let venue_name = venue_config.name().to_string();
324 let has_book_data = config.data().iter().any(|dc| {
325 let is_book_type = matches!(
326 dc.data_type(),
327 NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
328 );
329
330 if !is_book_type {
331 return false;
332 }
333
334 let ids = dc.get_instrument_ids().unwrap_or_default();
336 ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
337 });
338 anyhow::ensure!(
339 has_book_data,
340 "Venue '{venue_name}' has book_type {:?} but no order book data configured",
341 venue_config.book_type()
342 );
343 }
344 }
345 }
346 Ok(())
347}
348
349fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
350 for data_config in config.data() {
351 let data = load_data(data_config, config.start(), config.end())?;
352 if data.is_empty() {
353 log::warn!("No data found for config: {:?}", data_config.data_type());
354 continue;
355 }
356 engine.add_data(data, data_config.client_id(), false, false);
357 }
358
359 engine.sort_data();
360 engine.run(
361 config.start(),
362 config.end(),
363 Some(config.id().to_string()),
364 false,
365 )
366}
367
368fn run_streaming(
369 engine: &mut BacktestEngine,
370 config: &BacktestRunConfig,
371 chunk_size: usize,
372) -> anyhow::Result<()> {
373 let data_configs = config.data();
374
375 if data_configs.len() == 1 {
376 let data_config = &data_configs[0];
379 let mut catalog = create_catalog(data_config)?;
380 let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
381 stream_chunks(engine, config, result.peekable(), chunk_size)?;
382 } else {
383 let all_data = load_and_merge_data(config)?;
385 stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
386 }
387
388 Ok(())
389}
390
391fn stream_chunks<I: Iterator<Item = Data>>(
395 engine: &mut BacktestEngine,
396 config: &BacktestRunConfig,
397 mut iter: Peekable<I>,
398 chunk_size: usize,
399) -> anyhow::Result<()> {
400 if iter.peek().is_none() {
401 engine.end();
402 return Ok(());
403 }
404
405 let mut next_start = config.start();
406 loop {
407 let chunk = take_aligned_chunk(&mut iter, chunk_size);
408 if chunk.is_empty() {
409 break;
410 }
411
412 let is_last = iter.peek().is_none();
413 let end = if is_last {
414 config.end()
415 } else {
416 chunk.last().map(HasTsInit::ts_init)
417 };
418
419 engine.add_data(chunk, None, false, true);
420 engine.run(next_start, end, Some(config.id().to_string()), true)?;
421 engine.clear_data();
422
423 next_start = end;
426 }
427
428 engine.end();
429 Ok(())
430}
431
432fn take_aligned_chunk<I: Iterator<Item = Data>>(
435 iter: &mut Peekable<I>,
436 chunk_size: usize,
437) -> Vec<Data> {
438 let mut chunk = Vec::with_capacity(chunk_size);
439
440 for _ in 0..chunk_size {
441 match iter.next() {
442 Some(item) => chunk.push(item),
443 None => return chunk,
444 }
445 }
446
447 if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
448 while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
449 chunk.push(iter.next().unwrap());
450 }
451 }
452
453 chunk
454}
455
456fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
457 let mut all_data = Vec::new();
458 for data_config in config.data() {
459 let data = load_data(data_config, config.start(), config.end())?;
460 if data.is_empty() {
461 log::warn!("No data found for config: {:?}", data_config.data_type());
462 continue;
463 }
464 all_data.extend(data);
465 }
466 all_data.sort_by_key(HasTsInit::ts_init);
467 Ok(all_data)
468}
469
470fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
471 let uri = match config.catalog_fs_protocol() {
472 Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
473 None => config.catalog_path().to_string(),
474 };
475 ParquetDataCatalog::from_uri(
476 &uri,
477 config.catalog_fs_storage_options().cloned(),
478 None,
479 None,
480 None,
481 )
482}
483
484fn load_data(
485 config: &BacktestDataConfig,
486 run_start: Option<UnixNanos>,
487 run_end: Option<UnixNanos>,
488) -> anyhow::Result<Vec<Data>> {
489 let mut catalog = create_catalog(config)?;
490 let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
491 Ok(result.collect())
492}
493
494fn dispatch_query(
495 catalog: &mut ParquetDataCatalog,
496 config: &BacktestDataConfig,
497 run_start: Option<UnixNanos>,
498 run_end: Option<UnixNanos>,
499) -> anyhow::Result<QueryResult> {
500 catalog.reset_session();
501
502 let identifiers = config.query_identifiers();
503 let start = max_opt(config.start_time(), run_start);
504 let end = min_opt(config.end_time(), run_end);
505 let filter = config.filter_expr();
506 let optimize = config.optimize_file_loading();
507
508 match config.data_type() {
509 NautilusDataType::QuoteTick => {
510 catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
511 }
512 NautilusDataType::TradeTick => {
513 catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
514 }
515 NautilusDataType::Bar => {
516 catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
517 }
518 NautilusDataType::OrderBookDelta => {
519 catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
520 }
521 NautilusDataType::OrderBookDepth10 => {
522 catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
523 }
524 NautilusDataType::MarkPriceUpdate => {
525 catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
526 }
527 NautilusDataType::IndexPriceUpdate => {
528 catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
529 }
530 NautilusDataType::InstrumentClose => {
531 catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
532 }
533 }
534}
535
536fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
537 match (a, b) {
538 (Some(a), Some(b)) => Some(a.max(b)),
539 (Some(a), None) => Some(a),
540 (None, Some(b)) => Some(b),
541 (None, None) => None,
542 }
543}
544
545fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
546 match (a, b) {
547 (Some(a), Some(b)) => Some(a.min(b)),
548 (Some(a), None) => Some(a),
549 (None, Some(b)) => Some(b),
550 (None, None) => None,
551 }
552}