use std::iter::Peekable;
use ahash::{AHashMap, AHashSet};
use nautilus_core::UnixNanos;
use nautilus_execution::models::{fee::FeeModelAny, fill::FillModelAny};
use nautilus_model::{
data::{
Bar, Data, HasTsInit, IndexPriceUpdate, InstrumentClose, MarkPriceUpdate, OrderBookDelta,
OrderBookDepth10, QuoteTick, TradeTick,
},
enums::{BookType, OtoTriggerMode},
identifiers::{InstrumentId, Venue},
types::Money,
};
use nautilus_persistence::backend::{catalog::ParquetDataCatalog, session::QueryResult};
use rust_decimal::{Decimal, prelude::FromPrimitive};
use crate::{
config::{BacktestDataConfig, BacktestRunConfig, NautilusDataType},
engine::BacktestEngine,
result::BacktestResult,
};
/// Orchestrates one or more backtest runs, owning one [`BacktestEngine`] per
/// [`BacktestRunConfig`], keyed by the config's ID.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.backtest", unsendable)
)]
#[cfg_attr(
    feature = "python",
    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.backtest")
)]
pub struct BacktestNode {
    // Run configurations, in execution order.
    configs: Vec<BacktestRunConfig>,
    // Engines keyed by run config ID; populated lazily by `build()`.
    engines: AHashMap<String, BacktestEngine>,
}
impl BacktestNode {
    /// Creates a new node from the given run configurations.
    ///
    /// # Errors
    ///
    /// Returns an error if `configs` is empty or fails `validate_configs`.
    pub fn new(configs: Vec<BacktestRunConfig>) -> anyhow::Result<Self> {
        anyhow::ensure!(!configs.is_empty(), "At least one run config is required");
        validate_configs(&configs)?;
        Ok(Self {
            configs,
            engines: AHashMap::new(),
        })
    }

    /// Returns the run configurations held by this node.
    #[must_use]
    pub fn configs(&self) -> &[BacktestRunConfig] {
        &self.configs
    }

    /// Builds one engine per run config: registers venues and loads instruments
    /// from each data config's catalog.
    ///
    /// Safe to call repeatedly: configs whose engine already exists are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if engine construction, balance/leverage parsing, venue
    /// registration, catalog access, or instrument loading fails.
    pub fn build(&mut self) -> anyhow::Result<()> {
        for config in &self.configs {
            // Idempotency: skip configs that already have an engine.
            if self.engines.contains_key(config.id()) {
                continue;
            }
            let engine_config = config.engine().clone();
            let mut engine = BacktestEngine::new(engine_config)?;
            for venue_config in config.venues() {
                // Parse the configured starting-balance strings into Money values,
                // failing the whole build on the first invalid entry.
                let starting_balances: Vec<Money> = venue_config
                    .starting_balances()
                    .iter()
                    .map(|s| s.parse::<Money>())
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| anyhow::anyhow!("Invalid starting balance: {e}"))?;
                // `Decimal::from_f64` yields None for values it cannot represent,
                // so an unrepresentable default leverage silently becomes None here.
                let default_leverage = venue_config.default_leverage().and_then(Decimal::from_f64);
                // Per-instrument leverages: unlike the default, an unrepresentable
                // value here is an explicit error.
                let leverages: AHashMap<InstrumentId, Decimal> = venue_config
                    .leverages()
                    .map(|m| {
                        m.iter()
                            .map(|(k, v)| {
                                Decimal::from_f64(*v).map(|d| (*k, d)).ok_or_else(|| {
                                    anyhow::anyhow!("Invalid leverage {v} for instrument {k}")
                                })
                            })
                            .collect::<anyhow::Result<AHashMap<_, _>>>()
                    })
                    .transpose()?
                    .unwrap_or_default();
                // NOTE: positional argument order below must match
                // `BacktestEngine::add_venue` exactly.
                engine.add_venue(
                    Venue::from(venue_config.name().as_str()),
                    venue_config.oms_type(),
                    venue_config.account_type(),
                    venue_config.book_type(),
                    starting_balances,
                    venue_config.base_currency(),
                    default_leverage,
                    leverages,
                    None,
                    Vec::new(),
                    FillModelAny::default(),
                    FeeModelAny::default(),
                    None,
                    Some(venue_config.routing()),
                    Some(venue_config.reject_stop_orders()),
                    Some(venue_config.support_gtd_orders()),
                    Some(venue_config.support_contingent_orders()),
                    Some(venue_config.use_position_ids()),
                    Some(venue_config.use_random_ids()),
                    Some(venue_config.use_reduce_only()),
                    None,
                    Some(venue_config.use_market_order_acks()),
                    Some(venue_config.bar_execution()),
                    Some(venue_config.bar_adaptive_high_low_ordering()),
                    Some(venue_config.trade_execution()),
                    Some(venue_config.liquidity_consumption()),
                    Some(venue_config.allow_cash_borrowing()),
                    Some(venue_config.frozen_account()),
                    Some(venue_config.queue_position()),
                    Some(venue_config.oto_trigger_mode() == OtoTriggerMode::Full),
                    Some(venue_config.price_protection_points()),
                )?;
            }
            for data_config in config.data() {
                let catalog = create_catalog(data_config)?;
                let instr_ids: Vec<InstrumentId> = data_config.get_instrument_ids()?;
                // An empty ID list means "no filter" (query all instruments).
                let filter: Option<Vec<String>> = if instr_ids.is_empty() {
                    None
                } else {
                    Some(instr_ids.iter().map(ToString::to_string).collect())
                };
                let instruments = catalog.query_instruments(filter.as_deref())?;
                // Specific IDs were requested but none resolved: fail fast with
                // the full requested list for diagnostics.
                if !instr_ids.is_empty() && instruments.is_empty() {
                    let ids: Vec<String> = instr_ids.iter().map(ToString::to_string).collect();
                    anyhow::bail!(
                        "No instruments found in catalog for requested IDs: [{}]",
                        ids.join(", ")
                    );
                }
                for instrument in instruments {
                    engine.add_instrument(&instrument)?;
                }
            }
            self.engines.insert(config.id().to_string(), engine);
        }
        Ok(())
    }

    /// Returns a mutable reference to the engine for `id`, if built.
    #[must_use]
    pub fn get_engine_mut(&mut self, id: &str) -> Option<&mut BacktestEngine> {
        self.engines.get_mut(id)
    }

    /// Returns a shared reference to the engine for `id`, if built.
    #[must_use]
    pub fn get_engine(&self, id: &str) -> Option<&BacktestEngine> {
        self.engines.get(id)
    }

    /// Returns references to all built engines (iteration order is unspecified).
    #[must_use]
    pub fn get_engines(&self) -> Vec<&BacktestEngine> {
        self.engines.values().collect()
    }

    /// Runs every configured backtest and returns one result per run config.
    ///
    /// Builds engines first if none exist yet. Each run executes either in a
    /// single pass or in streamed chunks, depending on the config's `chunk_size`.
    ///
    /// # Errors
    ///
    /// Returns an error if building fails, an engine is missing for a config,
    /// `chunk_size` is zero, or the run itself fails.
    pub fn run(&mut self) -> anyhow::Result<Vec<BacktestResult>> {
        // Lazily build if the caller skipped the explicit build() step.
        if self.engines.is_empty() {
            self.build()?;
        }
        let mut results = Vec::new();
        for config in &self.configs {
            let engine = self.engines.get_mut(config.id()).ok_or_else(|| {
                anyhow::anyhow!(
                    "Engine not found for config '{}'. Call build() first.",
                    config.id()
                )
            })?;
            match config.chunk_size() {
                None => run_oneshot(engine, config)?,
                Some(chunk_size) => {
                    anyhow::ensure!(chunk_size > 0, "chunk_size must be > 0");
                    run_streaming(engine, config, chunk_size)?;
                }
            }
            results.push(engine.get_result());
            // Either tear the engine down entirely or just drop its data so it
            // can be reused for another run.
            if config.dispose_on_completion() {
                engine.dispose();
            } else {
                engine.clear_data();
            }
        }
        Ok(results)
    }

    /// Opens the Parquet data catalog described by `config`.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be created from the configured URI.
    pub fn load_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
        create_catalog(config)
    }

    /// Loads all data for `config`, clamped to the optional `[start, end]` window.
    ///
    /// # Errors
    ///
    /// Returns an error if the catalog cannot be opened or the query fails.
    pub fn load_data_config(
        config: &BacktestDataConfig,
        start: Option<UnixNanos>,
        end: Option<UnixNanos>,
    ) -> anyhow::Result<Vec<Data>> {
        load_data(config, start, end)
    }

    /// Disposes every engine and removes them from the node.
    pub fn dispose(&mut self) {
        for engine in self.engines.values_mut() {
            engine.dispose();
        }
        self.engines.clear();
    }
}
fn validate_configs(configs: &[BacktestRunConfig]) -> anyhow::Result<()> {
anyhow::ensure!(
configs.len() <= 1,
"Only one run config per BacktestNode is supported \
(kernel MessageBus is a thread-local singleton)"
);
let mut seen_ids = AHashSet::new();
for config in configs {
anyhow::ensure!(
seen_ids.insert(config.id()),
"Duplicate run config ID '{}'",
config.id()
);
let venue_names: Vec<String> = config
.venues()
.iter()
.map(|v| v.name().to_string())
.collect();
for data_config in config.data() {
if let (Some(start), Some(end)) = (data_config.start_time(), data_config.end_time()) {
anyhow::ensure!(
start <= end,
"Data config start_time ({start}) must be <= end_time ({end})"
);
}
for instrument_id in data_config.get_instrument_ids()? {
let venue = instrument_id.venue.to_string();
anyhow::ensure!(
venue_names.contains(&venue),
"No venue config found for venue '{venue}' (required by instrument {instrument_id})"
);
}
}
for venue_config in config.venues() {
let needs_book_data = matches!(
venue_config.book_type(),
BookType::L2_MBP | BookType::L3_MBO
);
if needs_book_data {
let venue_name = venue_config.name().to_string();
let has_book_data = config.data().iter().any(|dc| {
let is_book_type = matches!(
dc.data_type(),
NautilusDataType::OrderBookDelta | NautilusDataType::OrderBookDepth10
);
if !is_book_type {
return false;
}
let ids = dc.get_instrument_ids().unwrap_or_default();
ids.is_empty() || ids.iter().any(|id| id.venue.to_string() == venue_name)
});
anyhow::ensure!(
has_book_data,
"Venue '{venue_name}' has book_type {:?} but no order book data configured",
venue_config.book_type()
);
}
}
}
Ok(())
}
/// Loads all configured data into the engine up front and executes one full run.
fn run_oneshot(engine: &mut BacktestEngine, config: &BacktestRunConfig) -> anyhow::Result<()> {
    let (start, end) = (config.start(), config.end());
    for data_config in config.data() {
        let data = load_data(data_config, start, end)?;
        if data.is_empty() {
            // Missing data is tolerated per-config; warn and move on.
            log::warn!("No data found for config: {:?}", data_config.data_type());
        } else {
            engine.add_data(data, data_config.client_id(), false, false);
        }
    }
    engine.sort_data();
    engine.run(start, end, Some(config.id().to_string()), false)
}
/// Executes a backtest by feeding data to the engine in bounded chunks.
fn run_streaming(
    engine: &mut BacktestEngine,
    config: &BacktestRunConfig,
    chunk_size: usize,
) -> anyhow::Result<()> {
    let data_configs = config.data();
    match data_configs.len() {
        // Single source: stream straight from the catalog query iterator
        // without materializing everything in memory.
        1 => {
            let data_config = &data_configs[0];
            let mut catalog = create_catalog(data_config)?;
            let query = dispatch_query(&mut catalog, data_config, config.start(), config.end())?;
            stream_chunks(engine, config, query.peekable(), chunk_size)
        }
        // Multiple sources: load everything, merge into one sorted stream,
        // then stream the merged data.
        _ => {
            let merged = load_and_merge_data(config)?;
            stream_chunks(engine, config, merged.into_iter().peekable(), chunk_size)
        }
    }
}
/// Feeds `iter` to the engine in timestamp-aligned chunks of roughly `chunk_size`.
///
/// Each chunk is its own engine pass: the chunk is added, the engine runs over
/// the chunk's time span, then the data is cleared before the next chunk.
fn stream_chunks<I: Iterator<Item = Data>>(
    engine: &mut BacktestEngine,
    config: &BacktestRunConfig,
    mut iter: Peekable<I>,
    chunk_size: usize,
) -> anyhow::Result<()> {
    // No data at all: still finalize the engine so the run terminates cleanly.
    if iter.peek().is_none() {
        engine.end();
        return Ok(());
    }
    let mut next_start = config.start();
    loop {
        let chunk = take_aligned_chunk(&mut iter, chunk_size);
        if chunk.is_empty() {
            break;
        }
        let is_last = iter.peek().is_none();
        // The final chunk runs through to the configured end; intermediate
        // chunks stop at their last data timestamp so the next pass resumes
        // exactly where this one ended.
        let end = if is_last {
            config.end()
        } else {
            chunk.last().map(HasTsInit::ts_init)
        };
        engine.add_data(chunk, None, false, true);
        engine.run(next_start, end, Some(config.id().to_string()), true)?;
        engine.clear_data();
        next_start = end;
    }
    engine.end();
    Ok(())
}
/// Pulls up to `chunk_size` items from `iter`, then keeps pulling while the
/// next item shares the chunk's final `ts_init`, so that equal timestamps are
/// never split across a chunk boundary.
fn take_aligned_chunk<I: Iterator<Item = Data>>(
    iter: &mut Peekable<I>,
    chunk_size: usize,
) -> Vec<Data> {
    let mut chunk: Vec<Data> = Vec::with_capacity(chunk_size);
    chunk.extend(iter.by_ref().take(chunk_size));
    // A short chunk means the iterator is exhausted, so there is nothing
    // left to align against the boundary timestamp.
    if chunk.len() == chunk_size {
        if let Some(boundary_ts) = chunk.last().map(HasTsInit::ts_init) {
            // Absorb every immediately-following item with the boundary timestamp.
            while let Some(item) = iter.next_if(|d| d.ts_init() == boundary_ts) {
                chunk.push(item);
            }
        }
    }
    chunk
}
/// Loads data for every data config and returns it merged into a single
/// stream ordered by `ts_init`.
fn load_and_merge_data(config: &BacktestRunConfig) -> anyhow::Result<Vec<Data>> {
    let mut merged: Vec<Data> = Vec::new();
    for data_config in config.data() {
        let data = load_data(data_config, config.start(), config.end())?;
        if data.is_empty() {
            // Missing data is tolerated per-config; warn and move on.
            log::warn!("No data found for config: {:?}", data_config.data_type());
        } else {
            merged.extend(data);
        }
    }
    // Stable sort preserves per-source insertion order for equal timestamps.
    merged.sort_by_key(HasTsInit::ts_init);
    Ok(merged)
}
/// Opens the Parquet catalog for `config`, prefixing the path with the
/// filesystem protocol when one is configured.
fn create_catalog(config: &BacktestDataConfig) -> anyhow::Result<ParquetDataCatalog> {
    let path = config.catalog_path();
    let uri = config.catalog_fs_protocol().map_or_else(
        || path.to_string(),
        |protocol| format!("{protocol}://{path}"),
    );
    ParquetDataCatalog::from_uri(
        &uri,
        config.catalog_fs_storage_options().cloned(),
        None,
        None,
        None,
    )
}
/// Creates a catalog for `config` and collects the full query result into memory,
/// clamped to the optional run window.
fn load_data(
    config: &BacktestDataConfig,
    run_start: Option<UnixNanos>,
    run_end: Option<UnixNanos>,
) -> anyhow::Result<Vec<Data>> {
    let mut catalog = create_catalog(config)?;
    dispatch_query(&mut catalog, config, run_start, run_end).map(|result| result.collect())
}
/// Resets the catalog session and runs a typed query for the configured data
/// type, intersecting the config's time window with the run's window.
fn dispatch_query(
    catalog: &mut ParquetDataCatalog,
    config: &BacktestDataConfig,
    run_start: Option<UnixNanos>,
    run_end: Option<UnixNanos>,
) -> anyhow::Result<QueryResult> {
    catalog.reset_session();
    let identifiers = config.query_identifiers();
    // Clamp the query window to the tighter of the config and run bounds.
    let start = max_opt(config.start_time(), run_start);
    let end = min_opt(config.end_time(), run_end);
    let filter = config.filter_expr();
    let optimize = config.optimize_file_loading();
    // One typed query per data type; the local macro keeps the arms uniform.
    macro_rules! run_query {
        ($ty:ty) => {
            catalog.query::<$ty>(identifiers, start, end, filter, None, optimize)
        };
    }
    match config.data_type() {
        NautilusDataType::QuoteTick => run_query!(QuoteTick),
        NautilusDataType::TradeTick => run_query!(TradeTick),
        NautilusDataType::Bar => run_query!(Bar),
        NautilusDataType::OrderBookDelta => run_query!(OrderBookDelta),
        NautilusDataType::OrderBookDepth10 => run_query!(OrderBookDepth10),
        NautilusDataType::MarkPriceUpdate => run_query!(MarkPriceUpdate),
        NautilusDataType::IndexPriceUpdate => run_query!(IndexPriceUpdate),
        NautilusDataType::InstrumentClose => run_query!(InstrumentClose),
    }
}
/// Returns the larger of two optional values, treating `None` as "no bound":
/// with both sides present the maximum wins, otherwise whichever side is
/// `Some` (or `None` if neither).
///
/// Generalized over any `Ord` type (backward compatible with the original
/// `Option<UnixNanos>` signature).
fn max_opt<T: Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a.max(b)),
        (a, b) => a.or(b),
    }
}
/// Returns the smaller of two optional values, treating `None` as "no bound":
/// with both sides present the minimum wins, otherwise whichever side is
/// `Some` (or `None` if neither).
///
/// Generalized over any `Ord` type (backward compatible with the original
/// `Option<UnixNanos>` signature).
fn min_opt<T: Ord>(a: Option<T>, b: Option<T>) -> Option<T> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (a, b) => a.or(b),
    }
}