Skip to main content

nautilus_backtest/python/
node.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for backtest node.
17
18use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
23use nautilus_trading::{
24    ImportableStrategyConfig,
25    python::strategy::{PyStrategy, PyStrategyInner},
26};
27use pyo3::{prelude::*, types::PyDict};
28
29use crate::{config::BacktestRunConfig, engine::BacktestResult, node::BacktestNode};
30
31#[pymethods]
32impl BacktestNode {
33    #[new]
34    fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
35        Self::new(configs).map_err(to_pyruntime_err)
36    }
37
38    #[pyo3(name = "build")]
39    fn py_build(&mut self) -> PyResult<()> {
40        self.build().map_err(to_pyruntime_err)
41    }
42
43    #[pyo3(name = "run")]
44    fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
45        self.run().map_err(to_pyruntime_err)
46    }
47
48    #[pyo3(name = "dispose")]
49    fn py_dispose(&mut self) {
50        self.dispose();
51    }
52
53    #[allow(
54        unsafe_code,
55        reason = "Required for Python actor component registration"
56    )]
57    #[pyo3(name = "add_actor_from_config")]
58    fn py_add_actor_from_config(
59        &mut self,
60        _py: Python,
61        run_config_id: &str,
62        config: ImportableActorConfig,
63    ) -> PyResult<()> {
64        log::debug!("`add_actor_from_config` with: {config:?}");
65
66        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
67            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
68        })?;
69
70        let parts: Vec<&str> = config.actor_path.split(':').collect();
71        if parts.len() != 2 {
72            return Err(to_pyvalue_err(
73                "actor_path must be in format 'module.path:ClassName'",
74            ));
75        }
76        let (module_name, class_name) = (parts[0], parts[1]);
77
78        log::info!("Importing actor from module: {module_name} class: {class_name}");
79
80        // Phase 1: Create and configure the Python actor, extract its actor_id
81        let (python_actor, actor_id) =
82            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
83                let actor_module = py
84                    .import(module_name)
85                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
86                let actor_class = actor_module
87                    .getattr(class_name)
88                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
89
90                let config_instance =
91                    create_config_instance(py, &config.config_path, &config.config)?;
92
93                let python_actor = if let Some(config_obj) = config_instance.clone() {
94                    actor_class.call1((config_obj,))?
95                } else {
96                    actor_class.call0()?
97                };
98
99                log::debug!("Created Python actor instance: {python_actor:?}");
100
101                let mut py_data_actor_ref = python_actor
102                    .extract::<PyRefMut<PyDataActor>>()
103                    .map_err(Into::<PyErr>::into)
104                    .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
105
106                // Extract inherited config fields from the Python config
107                if let Some(config_obj) = config_instance.as_ref() {
108                    if let Ok(actor_id) = config_obj.getattr("actor_id")
109                        && !actor_id.is_none()
110                    {
111                        let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
112                            actor_id_val
113                        } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
114                            ActorId::new_checked(&actor_id_str)?
115                        } else {
116                            anyhow::bail!("Invalid `actor_id` type");
117                        };
118                        py_data_actor_ref.set_actor_id(actor_id_val);
119                    }
120
121                    if let Ok(log_events) = config_obj.getattr("log_events")
122                        && let Ok(log_events_val) = log_events.extract::<bool>()
123                    {
124                        py_data_actor_ref.set_log_events(log_events_val);
125                    }
126
127                    if let Ok(log_commands) = config_obj.getattr("log_commands")
128                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
129                    {
130                        py_data_actor_ref.set_log_commands(log_commands_val);
131                    }
132                }
133
134                py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
135
136                let actor_id = py_data_actor_ref.actor_id();
137
138                Ok((python_actor.unbind(), actor_id))
139            })
140            .map_err(to_pyruntime_err)?;
141
142        // Validate no duplicate before any mutations
143        if engine.kernel().trader.actor_ids().contains(&actor_id) {
144            return Err(to_pyruntime_err(format!(
145                "Actor '{actor_id}' is already registered"
146            )));
147        }
148
149        // Phase 2: Create per-component clock via the trader (individual
150        // TestClock in backtest so each actor gets its own default timer handler)
151        let trader_id = engine.kernel().config.trader_id();
152        let cache = engine.kernel().cache.clone();
153        let component_id = ComponentId::new(actor_id.inner().as_str());
154        let clock = engine
155            .kernel_mut()
156            .trader
157            .create_component_clock(component_id);
158
159        // Phase 3: Register the actor with its dedicated clock
160        Python::attach(|py| -> anyhow::Result<()> {
161            let py_actor = python_actor.bind(py);
162            let mut py_data_actor_ref = py_actor
163                .extract::<PyRefMut<PyDataActor>>()
164                .map_err(Into::<PyErr>::into)
165                .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
166
167            py_data_actor_ref
168                .register(trader_id, clock, cache)
169                .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
170
171            log::debug!(
172                "Internal PyDataActor registered: {}, state: {:?}",
173                py_data_actor_ref.is_registered(),
174                py_data_actor_ref.state()
175            );
176
177            Ok(())
178        })
179        .map_err(to_pyruntime_err)?;
180
181        // Phase 4: Register in global registries and track for lifecycle
182        Python::attach(|py| -> anyhow::Result<()> {
183            let py_actor = python_actor.bind(py);
184            let py_data_actor_ref = py_actor
185                .cast::<PyDataActor>()
186                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
187            py_data_actor_ref.borrow().register_in_global_registries();
188            Ok(())
189        })
190        .map_err(to_pyruntime_err)?;
191
192        engine
193            .kernel_mut()
194            .trader
195            .add_actor_id_for_lifecycle(actor_id)
196            .map_err(to_pyruntime_err)?;
197
198        log::info!("Registered Python actor {actor_id}");
199        Ok(())
200    }
201
202    #[allow(
203        unsafe_code,
204        reason = "Required for Python strategy component registration"
205    )]
206    #[pyo3(name = "add_strategy_from_config")]
207    fn py_add_strategy_from_config(
208        &mut self,
209        _py: Python,
210        run_config_id: &str,
211        config: ImportableStrategyConfig,
212    ) -> PyResult<()> {
213        log::debug!("`add_strategy_from_config` with: {config:?}");
214
215        let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
216            to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
217        })?;
218
219        let parts: Vec<&str> = config.strategy_path.split(':').collect();
220        if parts.len() != 2 {
221            return Err(to_pyvalue_err(
222                "strategy_path must be in format 'module.path:ClassName'",
223            ));
224        }
225        let (module_name, class_name) = (parts[0], parts[1]);
226
227        log::info!("Importing strategy from module: {module_name} class: {class_name}");
228
229        // Phase 1: Create and configure the Python strategy, extract its strategy_id
230        let (python_strategy, strategy_id) =
231            Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
232                let strategy_module = py
233                    .import(module_name)
234                    .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
235                let strategy_class = strategy_module
236                    .getattr(class_name)
237                    .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
238
239                let config_instance =
240                    create_config_instance(py, &config.config_path, &config.config)?;
241
242                let python_strategy = if let Some(config_obj) = config_instance.clone() {
243                    strategy_class.call1((config_obj,))?
244                } else {
245                    strategy_class.call0()?
246                };
247
248                log::debug!("Created Python strategy instance: {python_strategy:?}");
249
250                let mut py_strategy_ref = python_strategy
251                    .extract::<PyRefMut<PyStrategy>>()
252                    .map_err(Into::<PyErr>::into)
253                    .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
254
255                // Extract inherited config fields from the Python config
256                if let Some(config_obj) = config_instance.as_ref() {
257                    if let Ok(strategy_id) = config_obj.getattr("strategy_id")
258                        && !strategy_id.is_none()
259                    {
260                        let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
261                            sid
262                        } else if let Ok(sid_str) = strategy_id.extract::<String>() {
263                            StrategyId::new_checked(&sid_str)?
264                        } else {
265                            anyhow::bail!("Invalid `strategy_id` type");
266                        };
267                        py_strategy_ref.set_strategy_id(strategy_id_val);
268                    }
269
270                    if let Ok(log_events) = config_obj.getattr("log_events")
271                        && let Ok(log_events_val) = log_events.extract::<bool>()
272                    {
273                        py_strategy_ref.set_log_events(log_events_val);
274                    }
275
276                    if let Ok(log_commands) = config_obj.getattr("log_commands")
277                        && let Ok(log_commands_val) = log_commands.extract::<bool>()
278                    {
279                        py_strategy_ref.set_log_commands(log_commands_val);
280                    }
281                }
282
283                py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
284
285                let strategy_id = py_strategy_ref.strategy_id();
286
287                Ok((python_strategy.unbind(), strategy_id))
288            })
289            .map_err(to_pyruntime_err)?;
290
291        // Validate no duplicate before any mutations
292        if engine.kernel().trader.strategy_ids().contains(&strategy_id) {
293            return Err(to_pyruntime_err(format!(
294                "Strategy '{strategy_id}' is already registered"
295            )));
296        }
297
298        // Phase 2: Create per-component clock via the trader (individual
299        // TestClock in backtest so each strategy gets its own default timer handler)
300        let trader_id = engine.kernel().config.trader_id();
301        let cache = engine.kernel().cache.clone();
302        let portfolio = engine.kernel().portfolio.clone();
303        let component_id = ComponentId::new(strategy_id.inner().as_str());
304        let clock = engine
305            .kernel_mut()
306            .trader
307            .create_component_clock(component_id);
308
309        // Phase 3: Register the strategy with its dedicated clock
310        Python::attach(|py| -> anyhow::Result<()> {
311            let py_strategy = python_strategy.bind(py);
312            let mut py_strategy_ref = py_strategy
313                .extract::<PyRefMut<PyStrategy>>()
314                .map_err(Into::<PyErr>::into)
315                .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
316
317            py_strategy_ref
318                .register(trader_id, clock, cache, portfolio)
319                .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
320
321            log::debug!(
322                "Internal PyStrategy registered: {}",
323                py_strategy_ref.is_registered()
324            );
325
326            Ok(())
327        })
328        .map_err(to_pyruntime_err)?;
329
330        // Phase 4: Register in global registries and install event subscriptions
331        Python::attach(|py| -> anyhow::Result<()> {
332            let py_strategy = python_strategy.bind(py);
333            let py_strategy_ref = py_strategy
334                .cast::<PyStrategy>()
335                .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
336            py_strategy_ref.borrow().register_in_global_registries();
337            Ok(())
338        })
339        .map_err(to_pyruntime_err)?;
340
341        engine
342            .kernel_mut()
343            .trader
344            .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
345            .map_err(to_pyruntime_err)?;
346
347        log::info!("Registered Python strategy {strategy_id}");
348        Ok(())
349    }
350
351    fn __repr__(&self) -> String {
352        format!("{self:?}")
353    }
354}
355
356fn create_config_instance<'py>(
357    py: Python<'py>,
358    config_path: &str,
359    config: &HashMap<String, serde_json::Value>,
360) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
361    if config_path.is_empty() && config.is_empty() {
362        log::debug!("No config_path or empty config, using None");
363        return Ok(None);
364    }
365
366    let config_parts: Vec<&str> = config_path.split(':').collect();
367    if config_parts.len() != 2 {
368        anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
369    }
370    let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
371
372    log::debug!(
373        "Importing config class from module: {config_module_name} class: {config_class_name}"
374    );
375
376    let config_module = py
377        .import(config_module_name)
378        .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
379    let config_class = config_module
380        .getattr(config_class_name)
381        .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
382
383    // Convert config dict to Python dict
384    let py_dict = PyDict::new(py);
385    for (key, value) in config {
386        let json_str = serde_json::to_string(value)
387            .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
388        let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
389        py_dict.set_item(key, py_value)?;
390    }
391
392    log::debug!("Created config dict: {py_dict:?}");
393
394    // Try kwargs first, then default constructor with setattr
395    let config_instance = match config_class.call((), Some(&py_dict)) {
396        Ok(instance) => {
397            log::debug!("Created config instance with kwargs");
398            instance
399        }
400        Err(kwargs_err) => {
401            log::debug!("Failed to create config with kwargs: {kwargs_err}");
402
403            match config_class.call0() {
404                Ok(instance) => {
405                    log::debug!("Created default config instance, setting attributes");
406                    for (key, value) in config {
407                        let json_str = serde_json::to_string(value).map_err(|e| {
408                            anyhow::anyhow!("Failed to serialize config value: {e}")
409                        })?;
410                        let py_value = PyModule::import(py, "json")?.call_method(
411                            "loads",
412                            (json_str,),
413                            None,
414                        )?;
415
416                        if let Err(setattr_err) = instance.setattr(key, py_value) {
417                            log::warn!("Failed to set attribute {key}: {setattr_err}");
418                        }
419                    }
420
421                    // Only call __post_init__ if it exists (setattr path
422                    // needs it, kwargs path already triggered it via __init__)
423                    if instance.hasattr("__post_init__")? {
424                        instance.call_method0("__post_init__")?;
425                    }
426
427                    instance
428                }
429                Err(default_err) => {
430                    anyhow::bail!(
431                        "Failed to create config instance. \
432                         Tried kwargs: {kwargs_err}, default: {default_err}"
433                    );
434                }
435            }
436        }
437    };
438
439    log::debug!("Created config instance: {config_instance:?}");
440
441    Ok(Some(config_instance))
442}