1use std::iter::Peekable;
19
20use ahash::{AHashMap, AHashSet};
21use nautilus_core::UnixNanos;
22use nautilus_model::{
23 data::{
24 Bar, Data, FundingRateUpdate, HasTsInit, IndexPriceUpdate, InstrumentClose,
25 InstrumentStatus, MarkPriceUpdate, OptionGreeks, OrderBookDelta, OrderBookDepth10,
26 QuoteTick, TradeTick,
27 },
28 enums::{BookType, OtoTriggerMode},
29 identifiers::{InstrumentId, Venue},
30 instruments::Instrument,
31 types::Money,
32};
33use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
34
35use crate::{
36 config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType, SimulatedVenueConfig},
37 engine::BacktestEngine,
38 result::BacktestResult,
39};
40
41#[derive(Debug)]
46#[cfg_attr(
47 feature = "python",
48 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
49)]
50#[cfg_attr(
51 feature = "python",
52 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.backtest")
53)]
54pub struct BacktestNode {
55 configs: Vec<BacktestRunConfig>,
56 engines: AHashMap<String, BacktestEngine>,
57}
58
59impl BacktestNode {
60 pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
71 anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
72 validate_configs(&configs)?;
73 Ok(Self {
74 configs,
75 engines: AHashMap::new(),
76 })
77 }
78
79 #[must_use]
81 pub fn configs(&self) -> &[BacktestRunConfig] {
82 &self.configs
83 }
84
85 pub fn build(&mut self) -> anyhow::Result<()> {
94 for config in &self.configs {
95 if self.engines.contains_key(config.id()) {
96 continue;
97 }
98
99 let engine_config = config.engine().clone();
100 let mut engine = BacktestEngine::new(engine_config)?;
101
102 for venue_config in config.venues() {
103 let starting_balances: Vec<Money> = venue_config
104 .starting_balances()
105 .iter()
106 .map(|s| s.parse::<Money>())
107 .collect::<Result<Vec<_>, _>>()
108 .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
109
110 let default_leverage = venue_config.default_leverage();
111 let leverages = venue_config.leverages().cloned().unwrap_or_default();
112 let margin_model = venue_config.margin_model().cloned();
113 let modules = venue_config
114 .modules()
115 .iter()
116 .cloned()
117 .map(Into::into)
118 .collect();
119 let fill_model = venue_config.fill_model().cloned().unwrap_or_default();
120 let fee_model = venue_config.fee_model().cloned().unwrap_or_default();
121 let latency_model = venue_config.latency_model().cloned().map(Into::into);
122 let sim_config = SimulatedVenueConfig::builder()
123 .venue(Venue::from(venue_config.name().as_str()))
124 .oms_type(venue_config.oms_type())
125 .account_type(venue_config.account_type())
126 .book_type(venue_config.book_type())
127 .starting_balances(starting_balances)
128 .maybe_base_currency(venue_config.base_currency())
129 .default_leverage(default_leverage)
130 .leverages(leverages)
131 .maybe_margin_model(margin_model)
132 .modules(modules)
133 .fill_model(fill_model)
134 .fee_model(fee_model)
135 .maybe_latency_model(latency_model)
136 .routing(venue_config.routing())
137 .reject_stop_orders(venue_config.reject_stop_orders())
138 .support_gtd_orders(venue_config.support_gtd_orders())
139 .support_contingent_orders(venue_config.support_contingent_orders())
140 .use_position_ids(venue_config.use_position_ids())
141 .use_random_ids(venue_config.use_random_ids())
142 .use_reduce_only(venue_config.use_reduce_only())
143 .use_market_order_acks(venue_config.use_market_order_acks())
144 .bar_execution(venue_config.bar_execution())
145 .bar_adaptive_high_low_ordering(venue_config.bar_adaptive_high_low_ordering())
146 .trade_execution(venue_config.trade_execution())
147 .liquidity_consumption(venue_config.liquidity_consumption())
148 .allow_cash_borrowing(venue_config.allow_cash_borrowing())
149 .frozen_account(venue_config.frozen_account())
150 .queue_position(venue_config.queue_position())
151 .oto_full_trigger(venue_config.oto_trigger_mode() == OtoTriggerMode::Full)
152 .price_protection_points(venue_config.price_protection_points())
153 .liquidation_enabled(venue_config.liquidation_enabled())
154 .liquidation_trigger_ratio(venue_config.liquidation_trigger_ratio())
155 .liquidation_cancel_open_orders(venue_config.liquidation_cancel_open_orders())
156 .build();
157 engine.add_venue(sim_config)?;
158 }
159
160 for data_config in config.data() {
161 let catalog = create_catalog(data_config)?;
162 let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
163 let filter: Option<Vec<String>> = if instr_ids.is_empty() {
164 None
165 } else {
166 Some(instr_ids.iter().map(ToString::to_string).collect())
167 };
168
169 let instruments = catalog.query_instruments(filter.as_deref())?;
170
171 if !instr_ids.is_empty() && instruments.is_empty() {
172 let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
173 anyhow::bail!(
174 "No instruments found in catalog for requested IDs: [{}]",
175 ids.join(", ")
176 );
177 }
178
179 for instrument in instruments {
180 engine.add_instrument(&instrument)?;
181 }
182 }
183
184 for venue_config in config.venues() {
185 let Some(settlement_prices) = venue_config.settlement_prices() else {
186 continue;
187 };
188 let venue = Venue::from(venue_config.name().as_str());
189
190 for (instrument_id, raw_price) in settlement_prices {
191 let price = {
192 let cache = engine.kernel().cache.borrow();
193 let instrument = cache.instrument(instrument_id).ok_or_else(|| {
194 anyhow::anyhow!(
195 "No instrument found for settlement price configuration: {instrument_id}"
196 )
197 })?;
198 instrument.make_price(*raw_price)
199 };
200 engine.set_settlement_price(venue, *instrument_id, price)?;
201 }
202 }
203
204 self.engines.insert(config.id().to_string(), engine);
205 }
206
207 Ok(())
208 }
209
210 #[must_use]
212 pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
213 self.engines.get_mut(id)
214 }
215
216 #[must_use]
218 pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
219 self.engines.get(id)
220 }
221
222 #[must_use]
224 pub fn get_engines(&self) -> Vec<&BacktestEngine> {
225 self.engines.values().collect()
226 }
227
228 pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
238 if self.engines.is_empty() {
240 self.build()?;
241 }
242
243 let mut results = Vec::new();
244
245 for config in &self.configs {
246 let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
247 anyhow::anyhow!(
248 "Engine not found for config '{}'. Call build() first.",
249 config.id()
250 )
251 })?;
252
253 match config.chunk_size() {
254 None => run_oneshot(engine, config)?,
255 Some(chunk_size) => {
256 anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
257 run_streaming(engine, config, chunk_size)?;
258 }
259 }
260
261 results.push(engine.get_result());
262
263 if config.dispose_on_completion() {
264 engine.dispose();
265 } else {
266 engine.clear_data();
267 }
268 }
269
270 Ok(results)
271 }
272
273 pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
279 create_catalog(config)
280 }
281
282 pub fn load_data_config(
288 config: &BacktestDataConfig,
289 start: Option<UnixNanos>,
290 end: Option<UnixNanos>,
291 ) -> anyhow::Result<Vec<Data>> {
292 load_data(config, start, end)
293 }
294
295 pub fn dispose(&mut self) {
297 for engine in self.engines.values_mut() {
298 engine.dispose();
299 }
300 self.engines.clear();
301 }
302}
303
304fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
305 anyhow::ensure!(
308 configs.len() <= 1,
309 "Only one run config per BacktestNode is supported \
310 (kernel MessageBus is a thread-local singleton)"
311 );
312
313 let mut seen_ids = AHashSet::new();
314
315 for config in configs {
316 anyhow::ensure!(
317 seen_ids.insert(config.id()),
318 "Duplicate run config ID '{}'",
319 config.id()
320 );
321
322 let venue_names: Vec<String> = config
323 .venues()
324 .iter()
325 .map(|v| v.name().to_string())
326 .collect();
327
328 for data_config in config.data() {
329 if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
330 anyhow::ensure!(
331 start <= end,
332 "Data config start_time ({start}) must be <= end_time ({end})"
333 );
334 }
335
336 for instrument_id in data_config.get_instrument_ids()? {
337 let venue = instrument_id.venue.to_string();
338 anyhow::ensure!(
339 venue_names.contains(&venue),
340 "No venue config found for venue '{venue}' (required by instrument {instrument_id})"
341 );
342 }
343 }
344
345 for venue_config in config.venues() {
346 let needs_book_data = matches!(
347 venue_config.book_type(),
348 BookType::L2_MBP | BookType::L3_MBO
349 );
350
351 if needs_book_data {
352 let venue_name = venue_config.name().to_string();
353 let has_book_data = config.data().iter().any(|dc| {
354 let is_book_type = matches!(
355 dc.data_type(),
356 NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
357 );
358
359 if !is_book_type {
360 return false;
361 }
362
363 let ids = dc.get_instrument_ids().unwrap_or_default();
365 ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
366 });
367 anyhow::ensure!(
368 has_book_data,
369 "Venue '{venue_name}' has book_type {:?} but no order book data configured",
370 venue_config.book_type()
371 );
372 }
373 }
374 }
375 Ok(())
376}
377
378fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
379 for data_config in config.data() {
380 let data = load_data(data_config, config.start(), config.end())?;
381 if data.is_empty() {
382 log::warn!("No data found for config: {:?}", data_config.data_type());
383 continue;
384 }
385 engine.add_data(data, data_config.client_id(), false, false)?;
386 }
387
388 engine.sort_data();
389 engine.run(
390 config.start(),
391 config.end(),
392 Some(config.id().to_string()),
393 false,
394 )
395}
396
397fn run_streaming(
398 engine: &mut BacktestEngine,
399 config: &BacktestRunConfig,
400 chunk_size: usize,
401) -> anyhow::Result<()> {
402 let data_configs = config.data();
403
404 if data_configs.len() == 1 {
405 let data_config = &data_configs[0];
408 let mut catalog = create_catalog(data_config)?;
409 let result = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
410 stream_chunks(engine, config, result.peekable(), chunk_size)?;
411 } else {
412 let all_data = load_and_merge_data(config)?;
414 stream_chunks(engine, config, all_data.into_iter().peekable(), chunk_size)?;
415 }
416
417 Ok(())
418}
419
420fn stream_chunks<I: Iterator<Item = Data>>(
424 engine: &mut BacktestEngine,
425 config: &BacktestRunConfig,
426 mut iter: Peekable<I>,
427 chunk_size: usize,
428) -> anyhow::Result<()> {
429 if iter.peek().is_none() {
430 engine.end();
431 return Ok(());
432 }
433
434 let mut next_start = config.start();
435
436 loop {
437 let chunk = take_aligned_chunk(&mut iter, chunk_size);
438 if chunk.is_empty() {
439 break;
440 }
441
442 let is_last = iter.peek().is_none();
443 let end = if is_last {
444 config.end()
445 } else {
446 chunk.last().map(HasTsInit::ts_init)
447 };
448
449 engine.add_data(chunk, None, false, true)?;
450 engine.run(next_start, end, Some(config.id().to_string()), true)?;
451 engine.clear_data();
452
453 if engine.kernel().is_shutdown_requested() {
456 return Ok(());
457 }
458
459 next_start = end;
462 }
463
464 engine.end();
465 Ok(())
466}
467
468fn take_aligned_chunk<I: Iterator<Item = Data>>(
471 iter: &mut Peekable<I>,
472 chunk_size: usize,
473) -> Vec<Data> {
474 let mut chunk = Vec::with_capacity(chunk_size);
475
476 for _ in 0..chunk_size {
477 match iter.next() {
478 Some(item) => chunk.push(item),
479 None => return chunk,
480 }
481 }
482
483 if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
484 while iter.peek().is_some_and(|d| d.ts_init() == boundary_ts) {
485 chunk.push(iter.next().unwrap());
486 }
487 }
488
489 chunk
490}
491
492fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
493 let mut all_data = Vec::new();
494
495 for data_config in config.data() {
496 let data = load_data(data_config, config.start(), config.end())?;
497 if data.is_empty() {
498 log::warn!("No data found for config: {:?}", data_config.data_type());
499 continue;
500 }
501 all_data.extend(data);
502 }
503 all_data.sort_by_key(HasTsInit::ts_init);
504 Ok(all_data)
505}
506
507fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
508 let uri = match config.catalog_fs_protocol() {
509 Some(protocol) => format!("{protocol}://{}", config.catalog_path()),
510 None => config.catalog_path().to_string(),
511 };
512 let storage_options = config
513 .catalog_fs_rust_storage_options()
514 .cloned()
515 .or_else(|| config.catalog_fs_storage_options().cloned());
516 ParquetDataCatalog::from_uri(&uri, storage_options, None, None, None)
517}
518
519fn load_data(
520 config: &BacktestDataConfig,
521 run_start: Option<UnixNanos>,
522 run_end: Option<UnixNanos>,
523) -> anyhow::Result<Vec<Data>> {
524 let mut catalog = create_catalog(config)?;
525 let result = dispatch_query(&mut catalog, config, run_start, run_end)?;
526 Ok(result.collect())
527}
528
529fn dispatch_query(
530 catalog: &mut ParquetDataCatalog,
531 config: &BacktestDataConfig,
532 run_start: Option<UnixNanos>,
533 run_end: Option<UnixNanos>,
534) -> anyhow::Result<QueryResult> {
535 catalog.reset_session();
536
537 let identifiers = config.query_identifiers();
538 let start = max_opt(config.start_time(), run_start);
539 let end = min_opt(config.end_time(), run_end);
540 let filter = config.filter_expr();
541 let optimize = config.optimize_file_loading();
542
543 match config.data_type() {
544 NautilusDataType::QuoteTick => {
545 catalog.query::<QuoteTick>(identifiers, start, end, filter, None, optimize)
546 }
547 NautilusDataType::TradeTick => {
548 catalog.query::<TradeTick>(identifiers, start, end, filter, None, optimize)
549 }
550 NautilusDataType::Bar => {
551 catalog.query::<Bar>(identifiers, start, end, filter, None, optimize)
552 }
553 NautilusDataType::OrderBookDelta => {
554 catalog.query::<OrderBookDelta>(identifiers, start, end, filter, None, optimize)
555 }
556 NautilusDataType::OrderBookDepth10 => {
557 catalog.query::<OrderBookDepth10>(identifiers, start, end, filter, None, optimize)
558 }
559 NautilusDataType::MarkPriceUpdate => {
560 catalog.query::<MarkPriceUpdate>(identifiers, start, end, filter, None, optimize)
561 }
562 NautilusDataType::IndexPriceUpdate => {
563 catalog.query::<IndexPriceUpdate>(identifiers, start, end, filter, None, optimize)
564 }
565 NautilusDataType::FundingRateUpdate => {
566 catalog.query::<FundingRateUpdate>(identifiers, start, end, filter, None, optimize)
567 }
568 NautilusDataType::InstrumentStatus => {
569 catalog.query::<InstrumentStatus>(identifiers, start, end, filter, None, optimize)
570 }
571 NautilusDataType::OptionGreeks => {
572 catalog.query::<OptionGreeks>(identifiers, start, end, filter, None, optimize)
573 }
574 NautilusDataType::InstrumentClose => {
575 catalog.query::<InstrumentClose>(identifiers, start, end, filter, None, optimize)
576 }
577 }
578}
579
580fn max_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
581 match (a, b) {
582 (Some(a), Some(b)) => Some(a.max(b)),
583 (Some(a), None) => Some(a),
584 (None, Some(b)) => Some(b),
585 (None, None) => None,
586 }
587}
588
589fn min_opt(a: Option<UnixNanos>, b: Option<UnixNanos>) -> Option<UnixNanos> {
590 match (a, b) {
591 (Some(a), Some(b)) => Some(a.min(b)),
592 (Some(a), None) => Some(a),
593 (None, Some(b)) => Some(b),
594 (None, None) => None,
595 }
596}