Skip to main content

nautilus_backtest/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for backtest node.
17
18use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21#[cfg(feature = "examples")]
22use nautilus_core::python::to_pytype_err;
23use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
24use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
25#[cfg(feature = "examples")]
26use nautilus_trading::examples::strategies::{
27    CompositeMarketMaker, CompositeMarketMakerConfig, DeltaNeutralVol, DeltaNeutralVolConfig,
28    EmaCross, EmaCrossConfig, GridMarketMaker, GridMarketMakerConfig, HurstVpinDirectional,
29    HurstVpinDirectionalConfig,
30};
31use nautilus_trading::{
32    ImportableStrategyConfig,
33    python::strategy::{PyStrategy, PyStrategyInner},
34};
35use pyo3::{prelude::*, types::PyDict};
36
37#[cfg(feature = "examples")]
38use crate::engine::BacktestEngine;
39use crate::{config::BacktestRunConfig, node::BacktestNode, result::BacktestResult};
40
41#[pyo3_stub_gen::derive::gen_stub_pymethods]
42#[pymethods]
43impl BacktestNode {
44    /// Orchestrates catalog-driven backtests from run configurations.
45    ///
46    /// `BacktestNode` connects the `ParquetDataCatalog` with `BacktestEngine` to load
47    /// historical data and run backtests. Supports both oneshot and streaming modes.
48    #[new]
49    fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
50        Self::new(configs).map_err(to_pyruntime_err)
51    }
52
53    /// Builds backtest engines from the run configurations.
54    ///
55    /// For each config, creates a `BacktestEngine`, adds venues, and loads
56    /// instruments from the catalog.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if engine creation, venue setup, or instrument loading fails.
61    #[pyo3(name = "build")]
62    fn py_build(&mut self) -> PyResult<()> {
63        self.build().map_err(to_pyruntime_err)
64    }
65
66    /// Runs all configured backtests and returns results.
67    ///
68    /// Automatically calls `build()` if engines have not been created yet.
69    /// For each run config, loads data from the catalog and runs the engine.
70    /// Supports both oneshot (`chunk_size = None`) and streaming modes.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if building, data loading, or engine execution fails.
75    #[pyo3(name = "run")]
76    fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
77        self.run().map_err(to_pyruntime_err)
78    }
79
80    /// Disposes all engines and releases resources.
81    #[pyo3(name = "dispose")]
82    fn py_dispose(&mut self) {
83        self.dispose();
84    }
85
86    #[allow(
87        unsafe_code,
88        reason = "Required for Python actor component registration"
89    )]
90    #[pyo3(name = "add_actor_from_config")]
91    #[expect(clippy::needless_pass_by_value)]
92    fn py_add_actor_from_config(
93        &mut self,
94        _py: Python,
95        run_config_id: &str,
96        config: ImportableActorConfig,
97    ) -> PyResult<()> {
98        log::debug!("`add_actor_from_config` with: {config:?}");
99
100        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
101            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
102        })?;
103
104        let parts: Vec<&str> = config.actor_path.split(':').collect();
105        if parts.len() != 2 {
106            return Err(to_pyvalue_err(
107                "actor_path must be in format 'module.path:ClassName'",
108            ));
109        }
110        let (module_name, class_name) = (parts[0], parts[1]);
111
112        log::info!("Importing actor from module: {module_name} class: {class_name}");
113
114        // Phase 1: Create and configure the Python actor, extract its actor_id
115        let (python_actor, actor_id) =
116            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
117                let actor_module = py
118                    .import(module_name)
119                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
120                let actor_class = actor_module
121                    .getattr(class_name)
122                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
123
124                let config_instance =
125                    create_config_instance(py, &config.config_path, &config.config)?;
126
127                let python_actor = if let Some(config_obj) = config_instance.clone() {
128                    actor_class.call1((config_obj,))?
129                } else {
130                    actor_class.call0()?
131                };
132
133                log::debug!("Created Python actor instance: {python_actor:?}");
134
135                let mut py_data_actor_ref = python_actor
136                    .extract::<PyRefMut<PyDataActor>>()
137                    .map_err(Into::<PyErr>::into)
138                    .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
139
140                // Extract inherited config fields from the Python config
141                if let Some(config_obj) = config_instance.as_ref() {
142                    if let Ok(actor_id) = config_obj.getattr("actor_id")
143                        && !actor_id.is_none()
144                    {
145                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
146                            actor_id_val
147                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
148                            ActorId::new_checked(&actor_id_str)?
149                        } else {
150                            anyhow::bail!("Invalid `actor_id` type");
151                        };
152                        py_data_actor_ref.set_actor_id(actor_id_val);
153                    }
154
155                    if let Ok(log_events) = config_obj.getattr("log_events")
156                        && let Ok(log_events_val) = log_events.extract::<bool>()
157                    {
158                        py_data_actor_ref.set_log_events(log_events_val);
159                    }
160
161                    if let Ok(log_commands) = config_obj.getattr("log_commands")
162                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
163                    {
164                        py_data_actor_ref.set_log_commands(log_commands_val);
165                    }
166                }
167
168                py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
169
170                let actor_id = py_data_actor_ref.actor_id();
171
172                Ok((python_actor.unbind(), actor_id))
173            })
174            .map_err(to_pyruntime_err)?;
175
176        // Validate no duplicate before any mutations
177        if engine
178            .kernel()
179            .trader
180            .borrow()
181            .actor_ids()
182            .contains(&actor_id)
183        {
184            return Err(to_pyruntime_err(format!(
185                "Actor '{actor_id}' is already registered"
186            )));
187        }
188
189        // Phase 2: Create per-component clock via the trader (individual
190        // TestClock in backtest so each actor gets its own default timer handler)
191        let trader_id = engine.kernel().config.trader_id();
192        let cache = engine.kernel().cache.clone();
193        let component_id = ComponentId::new(actor_id.inner().as_str());
194        let clock = engine
195            .kernel_mut()
196            .trader
197            .borrow_mut()
198            .create_component_clock(component_id);
199
200        // Phase 3: Register the actor with its dedicated clock
201        Python::attach(|py| -> anyhow::Result<()> {
202            let py_actor = python_actor.bind(py);
203            let mut py_data_actor_ref = py_actor
204                .extract::<PyRefMut<PyDataActor>>()
205                .map_err(Into::<PyErr>::into)
206                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
207
208            py_data_actor_ref
209                .register(trader_id, clock, cache)
210                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
211
212            log::debug!(
213                "Internal PyDataActor registered: {}, state: {:?}",
214                py_data_actor_ref.is_registered(),
215                py_data_actor_ref.state()
216            );
217
218            Ok(())
219        })
220        .map_err(to_pyruntime_err)?;
221
222        // Phase 4: Register in global registries and track for lifecycle
223        Python::attach(|py| -> anyhow::Result<()> {
224            let py_actor = python_actor.bind(py);
225            let py_data_actor_ref = py_actor
226                .cast::<PyDataActor>()
227                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
228            py_data_actor_ref.borrow().register_in_global_registries();
229            Ok(())
230        })
231        .map_err(to_pyruntime_err)?;
232
233        engine
234            .kernel_mut()
235            .trader
236            .borrow_mut()
237            .add_actor_id_for_lifecycle(actor_id)
238            .map_err(to_pyruntime_err)?;
239
240        log::info!("Registered Python actor {actor_id}");
241        Ok(())
242    }
243
244    #[allow(
245        unsafe_code,
246        reason = "Required for Python strategy component registration"
247    )]
248    #[pyo3(name = "add_strategy_from_config")]
249    #[expect(clippy::needless_pass_by_value)]
250    fn py_add_strategy_from_config(
251        &mut self,
252        _py: Python,
253        run_config_id: &str,
254        config: ImportableStrategyConfig,
255    ) -> PyResult<()> {
256        log::debug!("`add_strategy_from_config` with: {config:?}");
257
258        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
259            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
260        })?;
261
262        let parts: Vec<&str> = config.strategy_path.split(':').collect();
263        if parts.len() != 2 {
264            return Err(to_pyvalue_err(
265                "strategy_path must be in format 'module.path:ClassName'",
266            ));
267        }
268        let (module_name, class_name) = (parts[0], parts[1]);
269
270        log::info!("Importing strategy from module: {module_name} class: {class_name}");
271
272        // Phase 1: Create and configure the Python strategy, extract its strategy_id
273        let (python_strategy, strategy_id) =
274            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
275                let strategy_module = py
276                    .import(module_name)
277                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
278                let strategy_class = strategy_module
279                    .getattr(class_name)
280                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
281
282                let config_instance =
283                    create_config_instance(py, &config.config_path, &config.config)?;
284
285                let python_strategy = if let Some(config_obj) = config_instance.clone() {
286                    strategy_class.call1((config_obj,))?
287                } else {
288                    strategy_class.call0()?
289                };
290
291                log::debug!("Created Python strategy instance: {python_strategy:?}");
292
293                let mut py_strategy_ref = python_strategy
294                    .extract::<PyRefMut<PyStrategy>>()
295                    .map_err(Into::<PyErr>::into)
296                    .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
297
298                // Extract inherited config fields from the Python config
299                if let Some(config_obj) = config_instance.as_ref() {
300                    if let Ok(strategy_id) = config_obj.getattr("strategy_id")
301                        && !strategy_id.is_none()
302                    {
303                        let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
304                            sid
305                        } else if let Ok(sid_str) = strategy_id.extract::<String>() {
306                            StrategyId::new_checked(&sid_str)?
307                        } else {
308                            anyhow::bail!("Invalid `strategy_id` type");
309                        };
310                        py_strategy_ref.set_strategy_id(strategy_id_val)?;
311                    }
312
313                    if let Ok(order_id_tag) = config_obj.getattr("order_id_tag")
314                        && !order_id_tag.is_none()
315                    {
316                        let order_id_tag_val = order_id_tag
317                            .extract::<String>()
318                            .map_err(|e| anyhow::anyhow!("Invalid `order_id_tag` type: {e}"))?;
319                        py_strategy_ref.set_order_id_tag(&order_id_tag_val)?;
320                    }
321
322                    if let Ok(log_events) = config_obj.getattr("log_events")
323                        && let Ok(log_events_val) = log_events.extract::<bool>()
324                    {
325                        py_strategy_ref.set_log_events(log_events_val);
326                    }
327
328                    if let Ok(log_commands) = config_obj.getattr("log_commands")
329                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
330                    {
331                        py_strategy_ref.set_log_commands(log_commands_val);
332                    }
333                }
334
335                py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
336
337                let strategy_id = py_strategy_ref.strategy_id();
338
339                Ok((python_strategy.unbind(), strategy_id))
340            })
341            .map_err(to_pyruntime_err)?;
342
343        // Validate no duplicate before any mutations
344        if engine
345            .kernel()
346            .trader
347            .borrow()
348            .strategy_ids()
349            .contains(&strategy_id)
350        {
351            return Err(to_pyruntime_err(format!(
352                "Strategy '{strategy_id}' is already registered"
353            )));
354        }
355
356        // Phase 2: Create per-component clock via the trader (individual
357        // TestClock in backtest so each strategy gets its own default timer handler)
358        let trader_id = engine.kernel().config.trader_id();
359        let cache = engine.kernel().cache.clone();
360        let portfolio = engine.kernel().portfolio.clone();
361        let component_id = ComponentId::new(strategy_id.inner().as_str());
362        let clock = engine
363            .kernel_mut()
364            .trader
365            .borrow_mut()
366            .create_component_clock(component_id);
367
368        // Phase 3: Register the strategy with its dedicated clock
369        Python::attach(|py| -> anyhow::Result<()> {
370            let py_strategy = python_strategy.bind(py);
371            let mut py_strategy_ref = py_strategy
372                .extract::<PyRefMut<PyStrategy>>()
373                .map_err(Into::<PyErr>::into)
374                .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
375
376            py_strategy_ref
377                .register(trader_id, clock, cache, portfolio)
378                .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
379
380            log::debug!(
381                "Internal PyStrategy registered: {}",
382                py_strategy_ref.is_registered()
383            );
384
385            Ok(())
386        })
387        .map_err(to_pyruntime_err)?;
388
389        // Phase 4: Register in global registries and install event subscriptions
390        Python::attach(|py| -> anyhow::Result<()> {
391            let py_strategy = python_strategy.bind(py);
392            let py_strategy_ref = py_strategy
393                .cast::<PyStrategy>()
394                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
395            py_strategy_ref.borrow().register_in_global_registries();
396            Ok(())
397        })
398        .map_err(to_pyruntime_err)?;
399
400        engine
401            .kernel_mut()
402            .trader
403            .borrow_mut()
404            .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
405            .map_err(to_pyruntime_err)?;
406
407        log::info!("Registered Python strategy {strategy_id}");
408        Ok(())
409    }
410
411    /// Adds a compiled-in native Rust strategy to the engine for the given run config.
412    ///
413    /// The type name determines which built-in strategy is constructed.
414    /// All execution happens in Rust; Python is the configuration layer.
415    #[pyo3(name = "add_native_strategy")]
416    fn py_add_native_strategy(
417        &mut self,
418        run_config_id: &str,
419        type_name: &str,
420        config: &Bound<'_, PyAny>,
421    ) -> PyResult<()> {
422        #[cfg(feature = "examples")]
423        {
424            let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
425                to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
426            })?;
427
428            let register = native_strategy_register(type_name).ok_or_else(|| {
429                to_pytype_err(format!("Unsupported native strategy type: {type_name}"))
430            })?;
431            register(engine, config)
432        }
433
434        #[cfg(not(feature = "examples"))]
435        {
436            let _ = (run_config_id, type_name, config);
437            Err(to_pyruntime_err(
438                "add_native_strategy requires the `examples` feature",
439            ))
440        }
441    }
442
443    fn __repr__(&self) -> String {
444        format!("{self:?}")
445    }
446}
447
448#[cfg(feature = "examples")]
449type NativeStrategyRegister = for<'py> fn(&mut BacktestEngine, &Bound<'py, PyAny>) -> PyResult<()>;
450
451#[cfg(feature = "examples")]
452fn native_strategy_register(type_name: &str) -> Option<NativeStrategyRegister> {
453    match type_name {
454        "CompositeMarketMaker" => Some(register_composite_market_maker),
455        "DeltaNeutralVol" => Some(register_delta_neutral_vol),
456        "EmaCross" => Some(register_ema_cross),
457        "GridMarketMaker" => Some(register_grid_market_maker),
458        "HurstVpinDirectional" => Some(register_hurst_vpin_directional),
459        _ => None,
460    }
461}
462
463#[cfg(feature = "examples")]
464fn register_composite_market_maker(
465    engine: &mut BacktestEngine,
466    config: &Bound<'_, PyAny>,
467) -> PyResult<()> {
468    let config = config.extract::<CompositeMarketMakerConfig>()?;
469    engine
470        .add_strategy(CompositeMarketMaker::new(config))
471        .map_err(to_pyruntime_err)
472}
473
474#[cfg(feature = "examples")]
475fn register_delta_neutral_vol(
476    engine: &mut BacktestEngine,
477    config: &Bound<'_, PyAny>,
478) -> PyResult<()> {
479    let config = config.extract::<DeltaNeutralVolConfig>()?;
480    engine
481        .add_strategy(DeltaNeutralVol::new(config))
482        .map_err(to_pyruntime_err)
483}
484
485#[cfg(feature = "examples")]
486fn register_ema_cross(engine: &mut BacktestEngine, config: &Bound<'_, PyAny>) -> PyResult<()> {
487    let config = config.extract::<EmaCrossConfig>()?;
488    engine
489        .add_strategy(EmaCross::from_config(config))
490        .map_err(to_pyruntime_err)
491}
492
493#[cfg(feature = "examples")]
494fn register_grid_market_maker(
495    engine: &mut BacktestEngine,
496    config: &Bound<'_, PyAny>,
497) -> PyResult<()> {
498    let config = config.extract::<GridMarketMakerConfig>()?;
499    engine
500        .add_strategy(GridMarketMaker::new(config))
501        .map_err(to_pyruntime_err)
502}
503
504#[cfg(feature = "examples")]
505fn register_hurst_vpin_directional(
506    engine: &mut BacktestEngine,
507    config: &Bound<'_, PyAny>,
508) -> PyResult<()> {
509    let config = config.extract::<HurstVpinDirectionalConfig>()?;
510    engine
511        .add_strategy(HurstVpinDirectional::new(config))
512        .map_err(to_pyruntime_err)
513}
514
515#[cfg(all(test, feature = "examples"))]
516mod tests {
517    use pyo3::{Python, types::PyDict};
518    use rstest::rstest;
519
520    use crate::{config::BacktestEngineConfig, engine::BacktestEngine};
521
522    #[rstest]
523    #[case("CompositeMarketMaker")]
524    #[case("DeltaNeutralVol")]
525    #[case("EmaCross")]
526    #[case("GridMarketMaker")]
527    #[case("HurstVpinDirectional")]
528    fn test_native_strategy_register_accepts_supported_names(#[case] type_name: &str) {
529        assert!(super::native_strategy_register(type_name).is_some());
530    }
531
532    #[rstest]
533    fn test_native_strategy_register_rejects_unknown_name() {
534        assert!(super::native_strategy_register("UnknownStrategy").is_none());
535    }
536
537    #[rstest]
538    fn test_native_strategy_register_rejects_mismatched_config() {
539        Python::initialize();
540
541        let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
542        Python::attach(|py| {
543            let register = super::native_strategy_register("EmaCross").unwrap();
544            let config = PyDict::new(py);
545            let error = register(&mut engine, config.as_any()).unwrap_err();
546
547            assert!(error.is_instance_of::<pyo3::exceptions::PyTypeError>(py));
548        });
549    }
550}
551
552pub(crate) fn create_config_instance<'py>(
553    py: Python<'py>,
554    config_path: &str,
555    config: &HashMap<String, serde_json::Value>,
556) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
557    if config_path.is_empty() && config.is_empty() {
558        log::debug!("No config_path or empty config, using None");
559        return Ok(None);
560    }
561
562    let config_parts: Vec<&str> = config_path.split(':').collect();
563    if config_parts.len() != 2 {
564        anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
565    }
566    let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
567
568    log::debug!(
569        "Importing config class from module: {config_module_name} class: {config_class_name}"
570    );
571
572    let config_module = py
573        .import(config_module_name)
574        .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
575    let config_class = config_module
576        .getattr(config_class_name)
577        .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
578
579    // Convert config dict to Python dict
580    let py_dict = PyDict::new(py);
581
582    for (key, value) in config {
583        let json_str = serde_json::to_string(value)
584            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
585        let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
586        py_dict.set_item(key, py_value)?;
587    }
588
589    log::debug!("Created config dict: {py_dict:?}");
590
591    // Try kwargs first, then default constructor with setattr
592    let config_instance = match config_class.call((), Some(&py_dict)) {
593        Ok(instance) => {
594            log::debug!("Created config instance with kwargs");
595            instance
596        }
597        Err(kwargs_err) => {
598            log::debug!("Failed to create config with kwargs: {kwargs_err}");
599
600            match config_class.call0() {
601                Ok(instance) => {
602                    log::debug!("Created default config instance, setting attributes");
603                    for (key, value) in config {
604                        let json_str = serde_json::to_string(value).map_err(|e| {
605                            anyhow::anyhow!("Failed to serialize config value: {e}")
606                        })?;
607                        let py_value = PyModule::import(py, "json")?.call_method(
608                            "loads",
609                            (json_str,),
610                            None,
611                        )?;
612
613                        if let Err(setattr_err) = instance.setattr(key, py_value) {
614                            log::warn!("Failed to set attribute {key}: {setattr_err}");
615                        }
616                    }
617
618                    // Only call __post_init__ if it exists (setattr path
619                    // needs it, kwargs path already triggered it via __init__)
620                    if instance.hasattr("__post_init__")? {
621                        instance.call_method0("__post_init__")?;
622                    }
623
624                    instance
625                }
626                Err(default_err) => {
627                    anyhow::bail!(
628                        "Failed to create config instance. \
629                         Tried kwargs: {kwargs_err}, default: {default_err}"
630                    );
631                }
632            }
633        }
634    };
635
636    log::debug!("Created config instance: {config_instance:?}");
637
638    Ok(Some(config_instance))
639}