use std::{cell::RefCell, collections::HashMap, rc::Rc};
use nautilus_common::{
actor::data_actor::ImportableActorConfig, enums::Environment, live::get_runtime,
logging::logger::LoggerConfig, python::actor::PyDataActor,
};
use nautilus_core::{
UUID4,
python::{to_pyruntime_err, to_pyvalue_err},
};
use nautilus_model::identifiers::{ActorId, ComponentId, ExecAlgorithmId, StrategyId, TraderId};
use nautilus_system::get_global_pyo3_registry;
use nautilus_trading::{
ImportableExecAlgorithmConfig, ImportableStrategyConfig,
python::strategy::{PyStrategy, PyStrategyInner},
};
use pyo3::{
prelude::*,
types::{PyDict, PyTuple},
};
use serde_json;
use crate::{builder::LiveNodeBuilder, node::LiveNode};
#[pyo3_stub_gen::derive::gen_stub_pymethods]
#[pymethods]
impl LiveNode {
#[staticmethod]
#[pyo3(name = "builder")]
fn py_builder(
name: String,
trader_id: TraderId,
environment: Environment,
) -> PyResult<LiveNodeBuilderPy> {
match Self::builder(trader_id, environment) {
Ok(builder) => Ok(LiveNodeBuilderPy {
inner: Rc::new(RefCell::new(Some(builder.with_name(name)))),
}),
Err(e) => Err(to_pyruntime_err(e)),
}
}
#[getter]
#[pyo3(name = "environment")]
fn py_environment(&self) -> Environment {
self.environment()
}
#[getter]
#[pyo3(name = "trader_id")]
fn py_trader_id(&self) -> TraderId {
self.trader_id()
}
#[getter]
#[pyo3(name = "instance_id")]
const fn py_instance_id(&self) -> UUID4 {
self.instance_id()
}
#[getter]
#[pyo3(name = "is_running")]
fn py_is_running(&self) -> bool {
self.is_running()
}
#[pyo3(name = "start")]
fn py_start(&mut self) -> PyResult<()> {
if self.is_running() {
return Err(to_pyruntime_err("LiveNode is already running"));
}
get_runtime().block_on(async { self.start().await.map_err(to_pyruntime_err) })
}
#[pyo3(name = "run")]
fn py_run(&mut self, py: Python) -> PyResult<()> {
if self.is_running() {
return Err(to_pyruntime_err("LiveNode is already running"));
}
let handle = self.handle();
let signal_module = py.import("signal")?;
let original_handler =
signal_module.call_method1("signal", (2, signal_module.getattr("SIG_DFL")?))?;
let handle_for_signal = handle;
let signal_callback = pyo3::types::PyCFunction::new_closure(
py,
None,
None,
move |_args: &pyo3::Bound<'_, PyTuple>,
_kwargs: Option<&pyo3::Bound<'_, PyDict>>|
-> PyResult<()> {
log::info!("Python signal handler called");
handle_for_signal.stop();
Ok(())
},
)?;
signal_module.call_method1("signal", (2, signal_callback))?;
let result =
{ get_runtime().block_on(async { self.run().await.map_err(to_pyruntime_err) }) };
signal_module.call_method1("signal", (2, original_handler))?;
result
}
#[pyo3(name = "stop")]
fn py_stop(&self) -> PyResult<()> {
if !self.is_running() {
return Err(to_pyruntime_err("LiveNode is not running"));
}
self.handle().stop();
Ok(())
}
#[allow(
unsafe_code,
reason = "Required for Python actor component registration"
)]
#[pyo3(name = "add_actor_from_config")]
#[allow(clippy::needless_pass_by_value)]
fn py_add_actor_from_config(
&mut self,
_py: Python,
config: ImportableActorConfig,
) -> PyResult<()> {
log::debug!("`add_actor_from_config` with: {config:?}");
let parts: Vec<&str> = config.actor_path.split(':').collect();
if parts.len() != 2 {
return Err(to_pyvalue_err(
"actor_path must be in format 'module.path:ClassName'",
));
}
let (module_name, class_name) = (parts[0], parts[1]);
log::info!("Importing actor from module: {module_name} class: {class_name}");
let (python_actor, actor_id) =
Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
let actor_module = py
.import(module_name)
.map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
let actor_class = actor_module
.getattr(class_name)
.map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
let config_instance =
create_config_instance(py, &config.config_path, &config.config)?;
let python_actor = if let Some(config_obj) = config_instance.clone() {
actor_class.call1((config_obj,))?
} else {
actor_class.call0()?
};
log::debug!("Created Python actor instance: {python_actor:?}");
let mut py_data_actor_ref = python_actor
.extract::<PyRefMut<PyDataActor>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
if let Some(config_obj) = config_instance.as_ref() {
if let Ok(actor_id) = config_obj.getattr("actor_id")
&& !actor_id.is_none()
{
let actor_id_val = if let Ok(aid) = actor_id.extract::<ActorId>() {
aid
} else if let Ok(aid_str) = actor_id.extract::<String>() {
ActorId::new_checked(&aid_str)?
} else {
anyhow::bail!("Invalid `actor_id` type");
};
py_data_actor_ref.set_actor_id(actor_id_val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
py_data_actor_ref.set_log_events(val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
py_data_actor_ref.set_log_commands(val);
}
}
py_data_actor_ref.set_python_instance(python_actor.clone().unbind());
let actor_id = py_data_actor_ref.actor_id();
Ok((python_actor.unbind(), actor_id))
})
.map_err(to_pyruntime_err)?;
if self
.kernel()
.trader
.borrow()
.actor_ids()
.contains(&actor_id)
{
return Err(to_pyruntime_err(format!(
"Actor '{actor_id}' is already registered"
)));
}
let trader_id = self.kernel().trader_id();
let cache = self.kernel().cache();
let component_id = ComponentId::new(actor_id.inner().as_str());
let clock = self
.kernel_mut()
.trader
.borrow_mut()
.create_component_clock(component_id);
Python::attach(|py| -> anyhow::Result<()> {
let py_actor = python_actor.bind(py);
let mut py_data_actor_ref = py_actor
.extract::<PyRefMut<PyDataActor>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
py_data_actor_ref
.register(trader_id, clock, cache)
.map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
log::debug!(
"Internal PyDataActor registered: {}, state: {:?}",
py_data_actor_ref.is_registered(),
py_data_actor_ref.state()
);
Ok(())
})
.map_err(to_pyruntime_err)?;
Python::attach(|py| -> anyhow::Result<()> {
let py_actor = python_actor.bind(py);
let py_data_actor_ref = py_actor
.cast::<PyDataActor>()
.map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
py_data_actor_ref.borrow().register_in_global_registries();
Ok(())
})
.map_err(to_pyruntime_err)?;
self.kernel_mut()
.trader
.borrow_mut()
.add_actor_id_for_lifecycle(actor_id)
.map_err(to_pyruntime_err)?;
log::info!("Registered Python actor {actor_id}");
Ok(())
}
#[allow(
unsafe_code,
reason = "Required for Python strategy component registration"
)]
#[pyo3(name = "add_strategy_from_config")]
#[allow(clippy::needless_pass_by_value)]
fn py_add_strategy_from_config(
&mut self,
_py: Python,
config: ImportableStrategyConfig,
) -> PyResult<()> {
log::debug!("`add_strategy_from_config` with: {config:?}");
let parts: Vec<&str> = config.strategy_path.split(':').collect();
if parts.len() != 2 {
return Err(to_pyvalue_err(
"strategy_path must be in format 'module.path:ClassName'",
));
}
let (module_name, class_name) = (parts[0], parts[1]);
log::info!("Importing strategy from module: {module_name} class: {class_name}");
let (python_strategy, strategy_id) =
Python::attach(|py| -> anyhow::Result<(Py<PyAny>, StrategyId)> {
let strategy_module = py
.import(module_name)
.map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
let strategy_class = strategy_module
.getattr(class_name)
.map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
let config_instance =
create_config_instance(py, &config.config_path, &config.config)?;
let python_strategy = if let Some(config_obj) = config_instance.clone() {
strategy_class.call1((config_obj,))?
} else {
strategy_class.call0()?
};
log::debug!("Created Python strategy instance: {python_strategy:?}");
let mut py_strategy_ref = python_strategy
.extract::<PyRefMut<PyStrategy>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
if let Some(config_obj) = config_instance.as_ref() {
if let Ok(strategy_id) = config_obj.getattr("strategy_id")
&& !strategy_id.is_none()
{
let strategy_id_val = if let Ok(sid) = strategy_id.extract::<StrategyId>() {
sid
} else if let Ok(sid_str) = strategy_id.extract::<String>() {
StrategyId::new_checked(&sid_str)?
} else {
anyhow::bail!("Invalid `strategy_id` type");
};
py_strategy_ref.set_strategy_id(strategy_id_val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
py_strategy_ref.set_log_events(val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
py_strategy_ref.set_log_commands(val);
}
}
py_strategy_ref.set_python_instance(python_strategy.clone().unbind());
let strategy_id = py_strategy_ref.strategy_id();
Ok((python_strategy.unbind(), strategy_id))
})
.map_err(to_pyruntime_err)?;
if self
.kernel()
.trader
.borrow()
.strategy_ids()
.contains(&strategy_id)
{
return Err(to_pyruntime_err(format!(
"Strategy '{strategy_id}' is already registered"
)));
}
let trader_id = self.kernel().trader_id();
let cache = self.kernel().cache();
let portfolio = self.kernel().portfolio.clone();
let component_id = ComponentId::new(strategy_id.inner().as_str());
let clock = self
.kernel_mut()
.trader
.borrow_mut()
.create_component_clock(component_id);
Python::attach(|py| -> anyhow::Result<()> {
let py_strategy = python_strategy.bind(py);
let mut py_strategy_ref = py_strategy
.extract::<PyRefMut<PyStrategy>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyStrategy: {e}"))?;
py_strategy_ref
.register(trader_id, clock, cache, portfolio)
.map_err(|e| anyhow::anyhow!("Failed to register PyStrategy: {e}"))?;
log::debug!(
"Internal PyStrategy registered: {}",
py_strategy_ref.is_registered()
);
Ok(())
})
.map_err(to_pyruntime_err)?;
Python::attach(|py| -> anyhow::Result<()> {
let py_strategy = python_strategy.bind(py);
let py_strategy_ref = py_strategy
.cast::<PyStrategy>()
.map_err(|e| anyhow::anyhow!("Failed to downcast to PyStrategy: {e}"))?;
py_strategy_ref.borrow().register_in_global_registries();
Ok(())
})
.map_err(to_pyruntime_err)?;
Python::attach(|py| {
let py_strategy = python_strategy.bind(py);
if let Ok(claims) = py_strategy.getattr("external_order_claims")
&& !claims.is_none()
&& claims.len().unwrap_or(0) > 0
{
log::warn!(
"Strategy '{strategy_id}' has external_order_claims configured, \
but these are not yet supported for Python strategies on the Rust backend"
);
}
});
self.kernel_mut()
.trader
.borrow_mut()
.add_strategy_id_with_subscriptions::<PyStrategyInner>(strategy_id)
.map_err(to_pyruntime_err)?;
log::info!("Registered Python strategy {strategy_id}");
Ok(())
}
#[allow(
unsafe_code,
reason = "Required for Python exec algorithm component registration"
)]
#[pyo3(name = "add_exec_algorithm_from_config")]
#[allow(clippy::needless_pass_by_value)]
fn py_add_exec_algorithm_from_config(
&mut self,
_py: Python,
config: ImportableExecAlgorithmConfig,
) -> PyResult<()> {
if self.is_running() {
return Err(to_pyruntime_err(
"Cannot add exec algorithm while node is running",
));
}
log::debug!("`add_exec_algorithm_from_config` with: {config:?}");
let parts: Vec<&str> = config.exec_algorithm_path.split(':').collect();
if parts.len() != 2 {
return Err(to_pyvalue_err(
"exec_algorithm_path must be in format 'module.path:ClassName'",
));
}
let (module_name, class_name) = (parts[0], parts[1]);
log::info!("Importing exec algorithm from module: {module_name} class: {class_name}");
let (python_exec_algorithm, actor_id) =
Python::attach(|py| -> anyhow::Result<(Py<PyAny>, ActorId)> {
let algo_module = py
.import(module_name)
.map_err(|e| anyhow::anyhow!("Failed to import module {module_name}: {e}"))?;
let algo_class = algo_module
.getattr(class_name)
.map_err(|e| anyhow::anyhow!("Failed to get class {class_name}: {e}"))?;
let config_instance =
create_config_instance(py, &config.config_path, &config.config)?;
let python_exec_algorithm = if let Some(config_obj) = config_instance.clone() {
algo_class.call1((config_obj,))?
} else {
algo_class.call0()?
};
log::debug!("Created Python exec algorithm instance: {python_exec_algorithm:?}");
let mut py_data_actor_ref = python_exec_algorithm
.extract::<PyRefMut<PyDataActor>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
if let Some(config_obj) = config_instance.as_ref() {
let id_attr = config_obj
.getattr("exec_algorithm_id")
.ok()
.filter(|v| !v.is_none())
.or_else(|| config_obj.getattr("actor_id").ok().filter(|v| !v.is_none()));
if let Some(id_value) = id_attr {
let actor_id_val = if let Ok(eaid) = id_value.extract::<ExecAlgorithmId>() {
ActorId::new(eaid.inner().as_str())
} else if let Ok(aid) = id_value.extract::<ActorId>() {
aid
} else if let Ok(aid_str) = id_value.extract::<String>() {
ActorId::new_checked(&aid_str)?
} else {
anyhow::bail!("Invalid `exec_algorithm_id`/`actor_id` type");
};
py_data_actor_ref.set_actor_id(actor_id_val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_events") {
py_data_actor_ref.set_log_events(val);
}
if let Some(val) = extract_bool_config_attr(config_obj, "log_commands") {
py_data_actor_ref.set_log_commands(val);
}
}
py_data_actor_ref.set_python_instance(python_exec_algorithm.clone().unbind());
let actor_id = py_data_actor_ref.actor_id();
Ok((python_exec_algorithm.unbind(), actor_id))
})
.map_err(to_pyruntime_err)?;
let exec_algorithm_id = ExecAlgorithmId::from(actor_id.inner().as_str());
if self
.kernel()
.trader
.borrow()
.exec_algorithm_ids()
.contains(&exec_algorithm_id)
{
return Err(to_pyruntime_err(format!(
"Execution algorithm '{exec_algorithm_id}' is already registered"
)));
}
let trader_id = self.kernel().trader_id();
let cache = self.kernel().cache();
let component_id = ComponentId::new(actor_id.inner().as_str());
let clock = self
.kernel_mut()
.trader
.borrow_mut()
.create_component_clock(component_id);
Python::attach(|py| -> anyhow::Result<()> {
let py_algo = python_exec_algorithm.bind(py);
let mut py_data_actor_ref = py_algo
.extract::<PyRefMut<PyDataActor>>()
.map_err(Into::<PyErr>::into)
.map_err(|e| anyhow::anyhow!("Failed to extract PyDataActor: {e}"))?;
py_data_actor_ref
.register(trader_id, clock, cache)
.map_err(|e| anyhow::anyhow!("Failed to register PyDataActor: {e}"))?;
log::debug!(
"Internal PyDataActor registered: {}, state: {:?}",
py_data_actor_ref.is_registered(),
py_data_actor_ref.state()
);
Ok(())
})
.map_err(to_pyruntime_err)?;
Python::attach(|py| -> anyhow::Result<()> {
let py_algo = python_exec_algorithm.bind(py);
let py_data_actor_ref = py_algo
.cast::<PyDataActor>()
.map_err(|e| anyhow::anyhow!("Failed to downcast to PyDataActor: {e}"))?;
py_data_actor_ref.borrow().register_in_global_registries();
Ok(())
})
.map_err(to_pyruntime_err)?;
self.kernel_mut()
.trader
.borrow_mut()
.add_exec_algorithm_id_for_lifecycle(exec_algorithm_id)
.map_err(to_pyruntime_err)?;
log::info!("Registered Python exec algorithm {exec_algorithm_id}");
Ok(())
}
fn __repr__(&self) -> String {
format!(
"LiveNode(trader_id={}, environment={:?}, running={})",
self.trader_id(),
self.environment(),
self.is_running()
)
}
}
#[derive(Debug)]
#[pyclass(name = "LiveNodeBuilder", module = "nautilus_trader.live", unsendable)]
#[pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.live")]
pub struct LiveNodeBuilderPy {
inner: Rc<RefCell<Option<LiveNodeBuilder>>>,
}
#[pyo3_stub_gen::derive::gen_stub_pymethods]
#[pymethods]
impl LiveNodeBuilderPy {
#[pyo3(name = "with_instance_id")]
fn py_with_instance_id(&self, instance_id: UUID4) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_instance_id(instance_id));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_load_state")]
fn py_with_load_state(&self, load_state: bool) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_load_state(load_state));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_save_state")]
fn py_with_save_state(&self, save_state: bool) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_save_state(save_state));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_timeout_connection")]
fn py_with_timeout_connection(&self, timeout_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_timeout_connection(timeout_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_timeout_reconciliation")]
fn py_with_timeout_reconciliation(&self, timeout_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_timeout_reconciliation(timeout_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_timeout_portfolio")]
fn py_with_timeout_portfolio(&self, timeout_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_timeout_portfolio(timeout_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_timeout_disconnection_secs")]
fn py_with_timeout_disconnection_secs(&self, timeout_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_timeout_disconnection_secs(timeout_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_delay_post_stop_secs")]
fn py_with_delay_post_stop_secs(&self, delay_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_delay_post_stop_secs(delay_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_delay_shutdown_secs")]
fn py_with_delay_shutdown_secs(&self, delay_secs: u64) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_delay_shutdown_secs(delay_secs));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_reconciliation")]
fn py_with_reconciliation(&self, reconciliation: bool) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_reconciliation(reconciliation));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_reconciliation_lookback_mins")]
fn py_with_reconciliation_lookback_mins(&self, mins: u32) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_reconciliation_lookback_mins(mins));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "with_logging")]
fn py_with_logging(&self, logging: LoggerConfig) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
*inner_ref = Some(builder.with_logging(logging));
Ok(Self {
inner: self.inner.clone(),
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "add_data_client")]
#[allow(clippy::needless_pass_by_value)]
fn py_add_data_client(
&self,
name: Option<String>,
factory: Py<PyAny>,
config: Py<PyAny>,
) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
Python::attach(|py| -> PyResult<Self> {
let registry = get_global_pyo3_registry();
let boxed_factory = registry.extract_factory(py, factory.clone_ref(py))?;
let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
let factory_name = factory
.getattr(py, "name")?
.call0(py)?
.extract::<String>(py)?;
let client_name = name.unwrap_or(factory_name);
match builder.add_data_client(Some(client_name), boxed_factory, boxed_config) {
Ok(updated_builder) => {
*inner_ref = Some(updated_builder);
Ok(Self {
inner: self.inner.clone(),
})
}
Err(e) => Err(to_pyruntime_err(format!("Failed to add data client: {e}"))),
}
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "add_exec_client")]
#[allow(clippy::needless_pass_by_value)]
fn py_add_exec_client(
&self,
name: Option<String>,
factory: Py<PyAny>,
config: Py<PyAny>,
) -> PyResult<Self> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
Python::attach(|py| -> PyResult<Self> {
let registry = get_global_pyo3_registry();
let boxed_factory = registry.extract_exec_factory(py, factory.clone_ref(py))?;
let boxed_config = registry.extract_config(py, config.clone_ref(py))?;
let factory_name = factory
.getattr(py, "name")?
.call0(py)?
.extract::<String>(py)?;
let client_name = name.unwrap_or(factory_name);
match builder.add_exec_client(Some(client_name), boxed_factory, boxed_config) {
Ok(updated_builder) => {
*inner_ref = Some(updated_builder);
Ok(Self {
inner: self.inner.clone(),
})
}
Err(e) => Err(to_pyruntime_err(format!("Failed to add exec client: {e}"))),
}
})
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
#[pyo3(name = "build")]
fn py_build(&self) -> PyResult<LiveNode> {
let mut inner_ref = self.inner.borrow_mut();
if let Some(builder) = inner_ref.take() {
match builder.build() {
Ok(node) => Ok(node),
Err(e) => Err(to_pyruntime_err(e)),
}
} else {
Err(to_pyruntime_err("Builder already consumed"))
}
}
fn __repr__(&self) -> String {
format!("{self:?}")
}
}
fn create_config_instance<'py>(
py: Python<'py>,
config_path: &str,
config: &HashMap<String, serde_json::Value>,
) -> anyhow::Result<Option<Bound<'py, PyAny>>> {
if config_path.is_empty() && config.is_empty() {
log::debug!("No config_path or empty config, using None");
return Ok(None);
}
let config_parts: Vec<&str> = config_path.split(':').collect();
if config_parts.len() != 2 {
anyhow::bail!("config_path must be in format 'module.path:ClassName', was {config_path}");
}
let (config_module_name, config_class_name) = (config_parts[0], config_parts[1]);
log::debug!(
"Importing config class from module: {config_module_name} class: {config_class_name}"
);
let config_module = py
.import(config_module_name)
.map_err(|e| anyhow::anyhow!("Failed to import config module {config_module_name}: {e}"))?;
let config_class = config_module
.getattr(config_class_name)
.map_err(|e| anyhow::anyhow!("Failed to get config class {config_class_name}: {e}"))?;
let py_dict = PyDict::new(py);
for (key, value) in config {
let json_str = serde_json::to_string(value)
.map_err(|e| anyhow::anyhow!("Failed to serialize config value: {e}"))?;
let py_value = PyModule::import(py, "json")?.call_method("loads", (json_str,), None)?;
py_dict.set_item(key, py_value)?;
}
log::debug!("Created config dict: {py_dict:?}");
let config_instance = match config_class.call((), Some(&py_dict)) {
Ok(instance) => {
log::debug!("Created config instance with kwargs");
instance
}
Err(kwargs_err) => {
log::debug!("Failed to create config with kwargs: {kwargs_err}");
match config_class.call0() {
Ok(instance) => {
log::debug!("Created default config instance, setting attributes");
for (key, value) in config {
let json_str = serde_json::to_string(value).map_err(|e| {
anyhow::anyhow!("Failed to serialize config value: {e}")
})?;
let py_value = PyModule::import(py, "json")?.call_method(
"loads",
(json_str,),
None,
)?;
if let Err(setattr_err) = instance.setattr(key, py_value) {
log::warn!("Failed to set attribute {key}: {setattr_err}");
}
}
if instance.hasattr("__post_init__")? {
instance.call_method0("__post_init__")?;
}
instance
}
Err(default_err) => {
anyhow::bail!(
"Failed to create config instance. \
Tried kwargs: {kwargs_err}, default: {default_err}"
);
}
}
}
};
log::debug!("Created config instance: {config_instance:?}");
Ok(Some(config_instance))
}
fn extract_bool_config_attr(config_obj: &Bound<'_, PyAny>, attr: &str) -> Option<bool> {
config_obj
.getattr(attr)
.ok()
.and_then(|val| val.extract::<bool>().ok())
}