nautilus_backtest/python/
node.rs1use std::collections::HashMap;
19
20use nautilus_common::{actor::data_actor::ImportableActorConfig, python::actor::PyDataActor};
21#[cfg(feature = "examples")]
22use nautilus_core::python::to_pytype_err;
23use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
24use nautilus_model::identifiers::{ActorId, ComponentId, StrategyId};
25#[cfg(feature = "examples")]
26use nautilus_trading::examples::strategies::{
27 CompositeMarketMaker, CompositeMarketMakerConfig, DeltaNeutralVol, DeltaNeutralVolConfig,
28 EmaCross, EmaCrossConfig, GridMarketMaker, GridMarketMakerConfig, HurstVpinDirectional,
29 HurstVpinDirectionalConfig,
30};
31use nautilus_trading::{
32 ImportableStrategyConfig,
33 python::strategy::{PyStrategy, PyStrategyInner},
34};
35use pyo3::{prelude::*, types::PyDict};
36
37#[cfg(feature = "examples")]
38use crate::engine::BacktestEngine;
39use crate::{config::BacktestRunConfig, node::BacktestNode, result::BacktestResult};
40
41#[pyo3_stub_gen::derive::gen_stub_pymethods]
42#[pymethods]
43impl BacktestNode {
44 #[new]
49 fn py_new(configs: Vec<BacktestRunConfig>) -> PyResult<Self> {
50 Self::new(configs).map_err(to_pyruntime_err)
51 }
52
53 #[pyo3(name = "build")]
62 fn py_build(&mut self) -> PyResult<()> {
63 self.build().map_err(to_pyruntime_err)
64 }
65
66 #[pyo3(name = "run")]
76 fn py_run(&mut self) -> PyResult<Vec<BacktestResult>> {
77 self.run().map_err(to_pyruntime_err)
78 }
79
80 #[pyo3(name = "dispose")]
82 fn py_dispose(&mut self) {
83 self.dispose();
84 }
85
86 #[allow(
87 unsafe_code,
88 reason = "Required for Python actor component registration"
89 )]
90 #[pyo3(name = "add_actor_from_config")]
91 #[expect(clippy::needless_pass_by_value)]
92 fn py_add_actor_from_config(
93 &mut self,
94 _py: Python,
95 run_config_id: &str,
96 config: ImportableActorConfig,
97 ) -> PyResult<()> {
98 log::debug!("`add_actor_from_config` with: {config:?}");
99
100 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
101 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
102 })?;
103
104 let parts: Vec<&str> = config.actor_path.split(':').collect();
105 if parts.len() != 2 {
106 return Err(to_pyvalue_err(
107 "actor_path must be in format 'module.path:ClassName'",
108 ));
109 }
110 let (module_name, class_name) = (parts[0], parts[1]);
111
112 log::info!("Importing actor from module: {module_name} class: {class_name}");
113
114 let (python_actor, actor_id) =
116 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
117 let actor_module = py
118 .import(module_name)
119 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
120 let actor_class = actor_module
121 .getattr(class_name)
122 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
123
124 let config_instance =
125 create_config_instance(py, &config.config_path, &config.config)?;
126
127 let python_actor = if let Some(config_obj) = config_instance.clone() {
128 actor_class.call1((config_obj,))?
129 } else {
130 actor_class.call0()?
131 };
132
133 log::debug!("Created Python actor instance: {python_actor:?}");
134
135 let mut py_data_actor_ref = python_actor
136 .extract::<PyRefMut<PyDataActor>>()
137 .map_err(Into::<PyErr>::into)
138 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
139
140 if let Some(config_obj) = config_instance.as_ref() {
142 if let Ok(actor_id) = config_obj.getattr("actor_id")
143 && !actor_id.is_none()
144 {
145 let actor_id_val = if let Ok(actor_id_val) = actor_id.extract::<ActorId>() {
146 actor_id_val
147 } else if let Ok(actor_id_str) = actor_id.extract::<String>() {
148 ActorId::new_checked(&actor_id_str)?
149 } else {
150 anyhow::bail!("Invalid `actor_id` type");
151 };
152 py_data_actor_ref.set_actor_id(actor_id_val);
153 }
154
155 if let Ok(log_events) = config_obj.getattr("log_events")
156 && let Ok(log_events_val) = log_events.extract::<bool>()
157 {
158 py_data_actor_ref.set_log_events(log_events_val);
159 }
160
161 if let Ok(log_commands) = config_obj.getattr("log_commands")
162 && let Ok(log_commands_val) = log_commands.extract::<bool>()
163 {
164 py_data_actor_ref.set_log_commands(log_commands_val);
165 }
166 }
167
168 py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
169
170 let actor_id = py_data_actor_ref.actor_id();
171
172 Ok((python_actor.unbind(), actor_id))
173 })
174 .map_err(to_pyruntime_err)?;
175
176 if engine
178 .kernel()
179 .trader
180 .borrow()
181 .actor_ids()
182 .contains(&actor_id)
183 {
184 return Err(to_pyruntime_err(format!(
185 "Actor '{actor_id}' is already registered"
186 )));
187 }
188
189 let trader_id = engine.kernel().config.trader_id();
192 let cache = engine.kernel().cache.clone();
193 let component_id = ComponentId::new(actor_id.inner().as_str());
194 let clock = engine
195 .kernel_mut()
196 .trader
197 .borrow_mut()
198 .create_component_clock(component_id);
199
200 Python::attach(|py| -> anyhow::Result<()> {
202 let py_actor = python_actor.bind(py);
203 let mut py_data_actor_ref = py_actor
204 .extract::<PyRefMut<PyDataActor>>()
205 .map_err(Into::<PyErr>::into)
206 .map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
207
208 py_data_actor_ref
209 .register(trader_id, clock, cache)
210 .map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
211
212 log::debug!(
213 "Internal PyDataActor registered: {}, state: {:?}",
214 py_data_actor_ref.is_registered(),
215 py_data_actor_ref.state()
216 );
217
218 Ok(())
219 })
220 .map_err(to_pyruntime_err)?;
221
222 Python::attach(|py| -> anyhow::Result<()> {
224 let py_actor = python_actor.bind(py);
225 let py_data_actor_ref = py_actor
226 .cast::<PyDataActor>()
227 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
228 py_data_actor_ref.borrow().register_in_global_registries();
229 Ok(())
230 })
231 .map_err(to_pyruntime_err)?;
232
233 engine
234 .kernel_mut()
235 .trader
236 .borrow_mut()
237 .add_actor_id_for_lifecycle(actor_id)
238 .map_err(to_pyruntime_err)?;
239
240 log::info!("Registered Python actor {actor_id}");
241 Ok(())
242 }
243
244 #[allow(
245 unsafe_code,
246 reason = "Required for Python strategy component registration"
247 )]
248 #[pyo3(name = "add_strategy_from_config")]
249 #[expect(clippy::needless_pass_by_value)]
250 fn py_add_strategy_from_config(
251 &mut self,
252 _py: Python,
253 run_config_id: &str,
254 config: ImportableStrategyConfig,
255 ) -> PyResult<()> {
256 log::debug!("`add_strategy_from_config` with: {config:?}");
257
258 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
259 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
260 })?;
261
262 let parts: Vec<&str> = config.strategy_path.split(':').collect();
263 if parts.len() != 2 {
264 return Err(to_pyvalue_err(
265 "strategy_path must be in format 'module.path:ClassName'",
266 ));
267 }
268 let (module_name, class_name) = (parts[0], parts[1]);
269
270 log::info!("Importing strategy from module: {module_name} class: {class_name}");
271
272 let (python_strategy, strategy_id) =
274 Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
275 let strategy_module = py
276 .import(module_name)
277 .map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
278 let strategy_class = strategy_module
279 .getattr(class_name)
280 .map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
281
282 let config_instance =
283 create_config_instance(py, &config.config_path, &config.config)?;
284
285 let python_strategy = if let Some(config_obj) = config_instance.clone() {
286 strategy_class.call1((config_obj,))?
287 } else {
288 strategy_class.call0()?
289 };
290
291 log::debug!("Created Python strategy instance: {python_strategy:?}");
292
293 let mut py_strategy_ref = python_strategy
294 .extract::<PyRefMut<PyStrategy>>()
295 .map_err(Into::<PyErr>::into)
296 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
297
298 if let Some(config_obj) = config_instance.as_ref() {
300 if let Ok(strategy_id) = config_obj.getattr("strategy_id")
301 && !strategy_id.is_none()
302 {
303 let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
304 sid
305 } else if let Ok(sid_str) = strategy_id.extract::<String>() {
306 StrategyId::new_checked(&sid_str)?
307 } else {
308 anyhow::bail!("Invalid `strategy_id` type");
309 };
310 py_strategy_ref.set_strategy_id(strategy_id_val)?;
311 }
312
313 if let Ok(order_id_tag) = config_obj.getattr("order_id_tag")
314 && !order_id_tag.is_none()
315 {
316 let order_id_tag_val = order_id_tag
317 .extract::<String>()
318 .map_err(|e| anyhow::anyhow!("Invalid `order_id_tag` type: {e}"))?;
319 py_strategy_ref.set_order_id_tag(&order_id_tag_val)?;
320 }
321
322 if let Ok(log_events) = config_obj.getattr("log_events")
323 && let Ok(log_events_val) = log_events.extract::<bool>()
324 {
325 py_strategy_ref.set_log_events(log_events_val);
326 }
327
328 if let Ok(log_commands) = config_obj.getattr("log_commands")
329 && let Ok(log_commands_val) = log_commands.extract::<bool>()
330 {
331 py_strategy_ref.set_log_commands(log_commands_val);
332 }
333 }
334
335 py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
336
337 let strategy_id = py_strategy_ref.strategy_id();
338
339 Ok((python_strategy.unbind(), strategy_id))
340 })
341 .map_err(to_pyruntime_err)?;
342
343 if engine
345 .kernel()
346 .trader
347 .borrow()
348 .strategy_ids()
349 .contains(&strategy_id)
350 {
351 return Err(to_pyruntime_err(format!(
352 "Strategy '{strategy_id}' is already registered"
353 )));
354 }
355
356 let trader_id = engine.kernel().config.trader_id();
359 let cache = engine.kernel().cache.clone();
360 let portfolio = engine.kernel().portfolio.clone();
361 let component_id = ComponentId::new(strategy_id.inner().as_str());
362 let clock = engine
363 .kernel_mut()
364 .trader
365 .borrow_mut()
366 .create_component_clock(component_id);
367
368 Python::attach(|py| -> anyhow::Result<()> {
370 let py_strategy = python_strategy.bind(py);
371 let mut py_strategy_ref = py_strategy
372 .extract::<PyRefMut<PyStrategy>>()
373 .map_err(Into::<PyErr>::into)
374 .map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
375
376 py_strategy_ref
377 .register(trader_id, clock, cache, portfolio)
378 .map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
379
380 log::debug!(
381 "Internal PyStrategy registered: {}",
382 py_strategy_ref.is_registered()
383 );
384
385 Ok(())
386 })
387 .map_err(to_pyruntime_err)?;
388
389 Python::attach(|py| -> anyhow::Result<()> {
391 let py_strategy = python_strategy.bind(py);
392 let py_strategy_ref = py_strategy
393 .cast::<PyStrategy>()
394 .map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
395 py_strategy_ref.borrow().register_in_global_registries();
396 Ok(())
397 })
398 .map_err(to_pyruntime_err)?;
399
400 engine
401 .kernel_mut()
402 .trader
403 .borrow_mut()
404 .add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
405 .map_err(to_pyruntime_err)?;
406
407 log::info!("Registered Python strategy {strategy_id}");
408 Ok(())
409 }
410
411 #[pyo3(name = "add_native_strategy")]
416 fn py_add_native_strategy(
417 &mut self,
418 run_config_id: &str,
419 type_name: &str,
420 config: &Bound<'_, PyAny>,
421 ) -> PyResult<()> {
422 #[cfg(feature = "examples")]
423 {
424 let engine = self.get_engine_mut(run_config_id).ok_or_else(|| {
425 to_pyruntime_err(format!("No engine for run config '{run_config_id}'"))
426 })?;
427
428 let register = native_strategy_register(type_name).ok_or_else(|| {
429 to_pytype_err(format!("Unsupported native strategy type: {type_name}"))
430 })?;
431 register(engine, config)
432 }
433
434 #[cfg(not(feature = "examples"))]
435 {
436 let _ = (run_config_id, type_name, config);
437 Err(to_pyruntime_err(
438 "add_native_strategy requires the `examples` feature",
439 ))
440 }
441 }
442
443 fn __repr__(&self) -> String {
444 format!("{self:?}")
445 }
446}
447
448#[cfg(feature = "examples")]
449type NativeStrategyRegister = for<'py> fn(&mut BacktestEngine, &Bound<'py, PyAny>) -> PyResult<()>;
450
451#[cfg(feature = "examples")]
452fn native_strategy_register(type_name: &str) -> Option<NativeStrategyRegister> {
453 match type_name {
454 "CompositeMarketMaker" => Some(register_composite_market_maker),
455 "DeltaNeutralVol" => Some(register_delta_neutral_vol),
456 "EmaCross" => Some(register_ema_cross),
457 "GridMarketMaker" => Some(register_grid_market_maker),
458 "HurstVpinDirectional" => Some(register_hurst_vpin_directional),
459 _ => None,
460 }
461}
462
463#[cfg(feature = "examples")]
464fn register_composite_market_maker(
465 engine: &mut BacktestEngine,
466 config: &Bound<'_, PyAny>,
467) -> PyResult<()> {
468 let config = config.extract::<CompositeMarketMakerConfig>()?;
469 engine
470 .add_strategy(CompositeMarketMaker::new(config))
471 .map_err(to_pyruntime_err)
472}
473
474#[cfg(feature = "examples")]
475fn register_delta_neutral_vol(
476 engine: &mut BacktestEngine,
477 config: &Bound<'_, PyAny>,
478) -> PyResult<()> {
479 let config = config.extract::<DeltaNeutralVolConfig>()?;
480 engine
481 .add_strategy(DeltaNeutralVol::new(config))
482 .map_err(to_pyruntime_err)
483}
484
485#[cfg(feature = "examples")]
486fn register_ema_cross(engine: &mut BacktestEngine, config: &Bound<'_, PyAny>) -> PyResult<()> {
487 let config = config.extract::<EmaCrossConfig>()?;
488 engine
489 .add_strategy(EmaCross::from_config(config))
490 .map_err(to_pyruntime_err)
491}
492
493#[cfg(feature = "examples")]
494fn register_grid_market_maker(
495 engine: &mut BacktestEngine,
496 config: &Bound<'_, PyAny>,
497) -> PyResult<()> {
498 let config = config.extract::<GridMarketMakerConfig>()?;
499 engine
500 .add_strategy(GridMarketMaker::new(config))
501 .map_err(to_pyruntime_err)
502}
503
504#[cfg(feature = "examples")]
505fn register_hurst_vpin_directional(
506 engine: &mut BacktestEngine,
507 config: &Bound<'_, PyAny>,
508) -> PyResult<()> {
509 let config = config.extract::<HurstVpinDirectionalConfig>()?;
510 engine
511 .add_strategy(HurstVpinDirectional::new(config))
512 .map_err(to_pyruntime_err)
513}
514
515#[cfg(all(test, feature = "examples"))]
516mod tests {
517 use pyo3::{Python, types::PyDict};
518 use rstest::rstest;
519
520 use crate::{config::BacktestEngineConfig, engine::BacktestEngine};
521
522 #[rstest]
523 #[case("CompositeMarketMaker")]
524 #[case("DeltaNeutralVol")]
525 #[case("EmaCross")]
526 #[case("GridMarketMaker")]
527 #[case("HurstVpinDirectional")]
528 fn test_native_strategy_register_accepts_supported_names(#[case] type_name: &str) {
529 assert!(super::native_strategy_register(type_name).is_some());
530 }
531
532 #[rstest]
533 fn test_native_strategy_register_rejects_unknown_name() {
534 assert!(super::native_strategy_register("UnknownStrategy").is_none());
535 }
536
537 #[rstest]
538 fn test_native_strategy_register_rejects_mismatched_config() {
539 Python::initialize();
540
541 let mut engine = BacktestEngine::new(BacktestEngineConfig::default()).unwrap();
542 Python::attach(|py| {
543 let register = super::native_strategy_register("EmaCross").unwrap();
544 let config = PyDict::new(py);
545 let error = register(&mut engine, config.as_any()).unwrap_err();
546
547 assert!(error.is_instance_of::<pyo3::exceptions::PyTypeError>(py));
548 });
549 }
550}
551
552pub(crate) fn create_config_instance<'py>(
553 py: Python<'py>,
554 config_path: &str,
555 config: &HashMap<String, serde_json::Value>,
556) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
557 if config_path.is_empty() && config.is_empty() {
558 log::debug!("No config_path or empty config, using None");
559 return Ok(None);
560 }
561
562 let config_parts: Vec<&str> = config_path.split(':').collect();
563 if config_parts.len() != 2 {
564 anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
565 }
566 let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
567
568 log::debug!(
569 "Importing config class from module: {config_module_name} class: {config_class_name}"
570 );
571
572 let config_module = py
573 .import(config_module_name)
574 .map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
575 let config_class = config_module
576 .getattr(config_class_name)
577 .map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
578
579 let py_dict = PyDict::new(py);
581
582 for (key, value) in config {
583 let json_str = serde_json::to_string(value)
584 .map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
585 let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
586 py_dict.set_item(key, py_value)?;
587 }
588
589 log::debug!("Created config dict: {py_dict:?}");
590
591 let config_instance = match config_class.call((), Some(&py_dict)) {
593 Ok(instance) => {
594 log::debug!("Created config instance with kwargs");
595 instance
596 }
597 Err(kwargs_err) => {
598 log::debug!("Failed to create config with kwargs: {kwargs_err}");
599
600 match config_class.call0() {
601 Ok(instance) => {
602 log::debug!("Created default config instance, setting attributes");
603 for (key, value) in config {
604 let json_str = serde_json::to_string(value).map_err(|e| {
605 anyhow::anyhow!("Failed to serialize config value: {e}")
606 })?;
607 let py_value = PyModule::import(py, "json")?.call_method(
608 "loads",
609 (json_str,),
610 None,
611 )?;
612
613 if let Err(setattr_err) = instance.setattr(key, py_value) {
614 log::warn!("Failed to set attribute {key}: {setattr_err}");
615 }
616 }
617
618 if instance.hasattr("__post_init__")? {
621 instance.call_method0("__post_init__")?;
622 }
623
624 instance
625 }
626 Err(default_err) => {
627 anyhow::bail!(
628 "Failed to create config instance. \
629 Tried kwargs: {kwargs_err}, default: {default_err}"
630 );
631 }
632 }
633 }
634 };
635
636 log::debug!("Created config instance: {config_instance:?}");
637
638 Ok(Some(config_instance))
639}