nautilus_backtest/python/
node.rs1use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
23use nautilus_trading::{
24 ImportableStrategyConfig,
25 python::strategy::{PyStrategy, PyStrategyInner},
26};
27use pyo3::{prelude::*, types::PyDict};
28
29use crate::{config::BacktestRunConfig, engine::BacktestResult, node::BacktestNode};
30
31#[pymethods]
32impl BacktestNode {
33 #[new]
34 fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
35 Self::new(configs).map_err(to_pyruntime_err)
36 }
37
38 #[pyo3(name = "build")]
39 fn py_build(&mut self) -> PyResult<()> {
40 self.build().map_err(to_pyruntime_err)
41 }
42
43 #[pyo3(name = "run")]
44 fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
45 self.run().map_err(to_pyruntime_err)
46 }
47
48 #[pyo3(name = "dispose")]
49 fn py_dispose(&mut self) {
50 self.dispose();
51 }
52
53 #[allow(
54 unsafe_code,
55 reason = "Required for Python actor component registration"
56 )]
57 #[pyo3(name = "add_actor_from_config")]
58 fn py_add_actor_from_config(
59 &mut self,
60 _py: Python,
61 run_config_id: &str,
62 config: ImportableActorConfig,
63 ) -> PyResult<()> {
64 log::debug!("`add_actor_from_config` with: {config:?}");
65
66 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
67 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
68 })?;
69
70 let parts: Vec<&str> = config.actor_path.split(':').collect();
71 if parts.len() != 2 {
72 return Err(to_pyvalue_err(
73 "actor_path must be in format 'module.path:ClassName'",
74 ));
75 }
76 let (module_name, class_name) = (parts[0], parts[1]);
77
78 log::info!("Importing actor from module: {module_name} class: {class_name}");
79
80 let (python_actor, actor_id) =
82 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
83 let actor_module = py
84 .import(module_name)
85 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
86 let actor_class = actor_module
87 .getattr(class_name)
88 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
89
90 let config_instance =
91 create_config_instance(py, &config.config_path, &config.config)?;
92
93 let python_actor = if let Some(config_obj) = config_instance.clone() {
94 actor_class.call1((config_obj,))?
95 } else {
96 actor_class.call0()?
97 };
98
99 log::debug!("Created Python actor instance: {python_actor:?}");
100
101 let mut py_data_actor_ref = python_actor
102 .extract::<PyRefMut<PyDataActor>>()
103 .map_err(Into::<PyErr>::into)
104 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
105
106 if let Some(config_obj) = config_instance.as_ref() {
108 if let Ok(actor_id) = config_obj.getattr("actor_id")
109 && !actor_id.is_none()
110 {
111 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
112 actor_id_val
113 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
114 ActorId::new_checked(&actor_id_str)?
115 } else {
116 anyhow::bail!("Invalid `actor_id` type");
117 };
118 py_data_actor_ref.set_actor_id(actor_id_val);
119 }
120
121 if let Ok(log_events) = config_obj.getattr("log_events")
122 && let Ok(log_events_val) = log_events.extract::<bool>()
123 {
124 py_data_actor_ref.set_log_events(log_events_val);
125 }
126
127 if let Ok(log_commands) = config_obj.getattr("log_commands")
128 && let Ok(log_commands_val) = log_commands.extract::<bool>()
129 {
130 py_data_actor_ref.set_log_commands(log_commands_val);
131 }
132 }
133
134 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
135
136 let actor_id = py_data_actor_ref.actor_id();
137
138 Ok((python_actor.unbind(), actor_id))
139 })
140 .map_err(to_pyruntime_err)?;
141
142 if engine.kernel().trader.actor_ids().contains(&actor_id) {
144 return Err(to_pyruntime_err(format!(
145 "Actor '{actor_id}' is already registered"
146 )));
147 }
148
149 let trader_id = engine.kernel().config.trader_id();
152 let cache = engine.kernel().cache.clone();
153 let component_id = ComponentId::new(actor_id.inner().as_str());
154 let clock = engine
155 .kernel_mut()
156 .trader
157 .create_component_clock(component_id);
158
159 Python::attach(|py| -> anyhow::Result<()> {
161 let py_actor = python_actor.bind(py);
162 let mut py_data_actor_ref = py_actor
163 .extract::<PyRefMut<PyDataActor>>()
164 .map_err(Into::<PyErr>::into)
165 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
166
167 py_data_actor_ref
168 .register(trader_id, clock, cache)
169 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
170
171 log::debug!(
172 "Internal PyDataActor registered: {}, state: {:?}",
173 py_data_actor_ref.is_registered(),
174 py_data_actor_ref.state()
175 );
176
177 Ok(())
178 })
179 .map_err(to_pyruntime_err)?;
180
181 Python::attach(|py| -> anyhow::Result<()> {
183 let py_actor = python_actor.bind(py);
184 let py_data_actor_ref = py_actor
185 .cast::<PyDataActor>()
186 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
187 py_data_actor_ref.borrow().register_in_global_registries();
188 Ok(())
189 })
190 .map_err(to_pyruntime_err)?;
191
192 engine
193 .kernel_mut()
194 .trader
195 .add_actor_id_for_lifecycle(actor_id)
196 .map_err(to_pyruntime_err)?;
197
198 log::info!("Registered Python actor {actor_id}");
199 Ok(())
200 }
201
202 #[allow(
203 unsafe_code,
204 reason = "Required for Python strategy component registration"
205 )]
206 #[pyo3(name = "add_strategy_from_config")]
207 fn py_add_strategy_from_config(
208 &mut self,
209 _py: Python,
210 run_config_id: &str,
211 config: ImportableStrategyConfig,
212 ) -> PyResult<()> {
213 log::debug!("`add_strategy_from_config` with: {config:?}");
214
215 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
216 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
217 })?;
218
219 let parts: Vec<&str> = config.strategy_path.split(':').collect();
220 if parts.len() != 2 {
221 return Err(to_pyvalue_err(
222 "strategy_path must be in format 'module.path:ClassName'",
223 ));
224 }
225 let (module_name, class_name) = (parts[0], parts[1]);
226
227 log::info!("Importing strategy from module: {module_name} class: {class_name}");
228
229 let (python_strategy, strategy_id) =
231 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
232 let strategy_module = py
233 .import(module_name)
234 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
235 let strategy_class = strategy_module
236 .getattr(class_name)
237 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
238
239 let config_instance =
240 create_config_instance(py, &config.config_path, &config.config)?;
241
242 let python_strategy = if let Some(config_obj) = config_instance.clone() {
243 strategy_class.call1((config_obj,))?
244 } else {
245 strategy_class.call0()?
246 };
247
248 log::debug!("Created Python strategy instance: {python_strategy:?}");
249
250 let mut py_strategy_ref = python_strategy
251 .extract::<PyRefMut<PyStrategy>>()
252 .map_err(Into::<PyErr>::into)
253 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
254
255 if let Some(config_obj) = config_instance.as_ref() {
257 if let Ok(strategy_id) = config_obj.getattr("strategy_id")
258 && !strategy_id.is_none()
259 {
260 let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
261 sid
262 } else if let Ok(sid_str) = strategy_id.extract::<String>() {
263 StrategyId::new_checked(&sid_str)?
264 } else {
265 anyhow::bail!("Invalid `strategy_id` type");
266 };
267 py_strategy_ref.set_strategy_id(strategy_id_val);
268 }
269
270 if let Ok(log_events) = config_obj.getattr("log_events")
271 && let Ok(log_events_val) = log_events.extract::<bool>()
272 {
273 py_strategy_ref.set_log_events(log_events_val);
274 }
275
276 if let Ok(log_commands) = config_obj.getattr("log_commands")
277 && let Ok(log_commands_val) = log_commands.extract::<bool>()
278 {
279 py_strategy_ref.set_log_commands(log_commands_val);
280 }
281 }
282
283 py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
284
285 let strategy_id = py_strategy_ref.strategy_id();
286
287 Ok((python_strategy.unbind(), strategy_id))
288 })
289 .map_err(to_pyruntime_err)?;
290
291 if engine.kernel().trader.strategy_ids().contains(&strategy_id) {
293 return Err(to_pyruntime_err(format!(
294 "Strategy '{strategy_id}' is already registered"
295 )));
296 }
297
298 let trader_id = engine.kernel().config.trader_id();
301 let cache = engine.kernel().cache.clone();
302 let portfolio = engine.kernel().portfolio.clone();
303 let component_id = ComponentId::new(strategy_id.inner().as_str());
304 let clock = engine
305 .kernel_mut()
306 .trader
307 .create_component_clock(component_id);
308
309 Python::attach(|py| -> anyhow::Result<()> {
311 let py_strategy = python_strategy.bind(py);
312 let mut py_strategy_ref = py_strategy
313 .extract::<PyRefMut<PyStrategy>>()
314 .map_err(Into::<PyErr>::into)
315 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
316
317 py_strategy_ref
318 .register(trader_id, clock, cache, portfolio)
319 .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
320
321 log::debug!(
322 "Internal PyStrategy registered: {}",
323 py_strategy_ref.is_registered()
324 );
325
326 Ok(())
327 })
328 .map_err(to_pyruntime_err)?;
329
330 Python::attach(|py| -> anyhow::Result<()> {
332 let py_strategy = python_strategy.bind(py);
333 let py_strategy_ref = py_strategy
334 .cast::<PyStrategy>()
335 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
336 py_strategy_ref.borrow().register_in_global_registries();
337 Ok(())
338 })
339 .map_err(to_pyruntime_err)?;
340
341 engine
342 .kernel_mut()
343 .trader
344 .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
345 .map_err(to_pyruntime_err)?;
346
347 log::info!("Registered Python strategy {strategy_id}");
348 Ok(())
349 }
350
351 fn __repr__(&self) -> String {
352 format!("{self:?}")
353 }
354}
355
356fn create_config_instance<'py>(
357 py: Python<'py>,
358 config_path: &str,
359 config: &HashMap<String, serde_json::Value>,
360) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
361 if config_path.is_empty() && config.is_empty() {
362 log::debug!("No config_path or empty config, using None");
363 return Ok(None);
364 }
365
366 let config_parts: Vec<&str> = config_path.split(':').collect();
367 if config_parts.len() != 2 {
368 anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
369 }
370 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
371
372 log::debug!(
373 "Importing config class from module: {config_module_name} class: {config_class_name}"
374 );
375
376 let config_module = py
377 .import(config_module_name)
378 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
379 let config_class = config_module
380 .getattr(config_class_name)
381 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
382
383 let py_dict = PyDict::new(py);
385 for (key, value) in config {
386 let json_str = serde_json::to_string(value)
387 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
388 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
389 py_dict.set_item(key, py_value)?;
390 }
391
392 log::debug!("Created config dict: {py_dict:?}");
393
394 let config_instance = match config_class.call((), Some(&py_dict)) {
396 Ok(instance) => {
397 log::debug!("Created config instance with kwargs");
398 instance
399 }
400 Err(kwargs_err) => {
401 log::debug!("Failed to create config with kwargs: {kwargs_err}");
402
403 match config_class.call0() {
404 Ok(instance) => {
405 log::debug!("Created default config instance, setting attributes");
406 for (key, value) in config {
407 let json_str = serde_json::to_string(value).map_err(|e| {
408 anyhow::anyhow!("Failed to serialize config value: {e}")
409 })?;
410 let py_value = PyModule::import(py, "json")?.call_method(
411 "loads",
412 (json_str,),
413 None,
414 )?;
415
416 if let Err(setattr_err) = instance.setattr(key, py_value) {
417 log::warn!("Failed to set attribute {key}: {setattr_err}");
418 }
419 }
420
421 if instance.hasattr("__post_init__")? {
424 instance.call_method0("__post_init__")?;
425 }
426
427 instance
428 }
429 Err(default_err) => {
430 anyhow::bail!(
431 "Failed to create config instance. \
432 Tried kwargs: {kwargs_err}, default: {default_err}"
433 );
434 }
435 }
436 }
437 };
438
439 log::debug!("Created config instance: {config_instance:?}");
440
441 Ok(Some(config_instance))
442}