1use std::ffi::CString;
2
3use crate::prelude::{
4 thirdparty::{
5 ird::thirdparty::*,
6 pyo3::{ffi::c_str, prelude::*, types::*},
7 },
8 *,
9};
10
11#[derive(ird::Node)]
12pub struct PythonNode {
13 pub instance: PyObject,
14}
15
16impl ird::Node for PythonNode {
17 fn new(
18 inputs: ird::Inputs,
19 outputs: ird::Outputs,
20 queries: ird::Queries,
21 queryables: ird::Queryables,
22 configuration: serde_yml::Value,
23 ) -> tokio::task::JoinHandle<Result<Box<dyn ird::Node>>> {
24 pyo3::prepare_freethreaded_python();
25
26 let mut builder = tokio::runtime::Builder::new_current_thread();
27 builder.enable_all();
28
29 pyo3_async_runtimes::tokio::init(builder);
30
31 std::thread::spawn(|| {
32 pyo3_async_runtimes::tokio::get_runtime()
33 .block_on(pyo3_async_runtimes::tokio::re_exports::pending::<()>())
34 });
35
36 pyo3_async_runtimes::tokio::get_runtime().spawn_blocking(move || {
37 Python::with_gil(|py| {
38 pyo3_async_runtimes::tokio::run(py, async move {
39 let file_path = configuration
40 .get("python_file_path")
41 .ok_or_eyre("Cannot find python file path inside configuration")?
42 .as_str();
43
44 let file_path = file_path
45 .ok_or_eyre(format!("Invalid python file path: '{:?}'", file_path))?;
46
47 let py_script = tokio::fs::read_to_string(file_path)
48 .await
49 .wrap_err(format!("Couldn't read path '{}'", file_path))?;
50
51 let module: PyObject = Python::with_gil(|py| -> Result<PyObject> {
52 Ok(PyModule::from_code(
53 py,
54 CString::new(py_script)?.as_c_str(),
55 CString::new(file_path)?.as_c_str(),
56 c_str!("pyridis_node"),
57 )?
58 .into())
59 })?;
60
61 let instance: PyObject = Python::with_gil(|py| -> PyResult<PyObject> {
62 let class = module.call_method0(py, "pyridis_node")?;
63
64 Ok(class.call0(py)?.into())
65 })?;
66
67 let configuration: PyObject = Python::with_gil(|py| -> PyResult<PyObject> {
68 Ok(PyDict::new(py).into())
70 })?;
71
72 let inputs = Inputs(inputs);
73 let outputs = Outputs(outputs);
74 let queries = Queries(queries);
75 let queryables = Queryables(queryables);
76
77 let fut = Python::with_gil(|py| {
78 pyo3_async_runtimes::tokio::into_future(
79 instance
80 .call_method1(
81 py,
82 "new",
83 (inputs, outputs, queries, queryables, configuration),
84 )?
85 .into_bound(py),
86 )
87 })?;
88
89 fut.await
90 .wrap_err("Couldn't await for the instance call to 'new'")?;
91
92 Ok(Box::new(Self { instance }) as Box<dyn ird::Node>)
93 })
94 .map_err(|e| {
95 e.print_and_set_sys_last_vars(py);
96
97 eyre::eyre!(e)
98 })
99 })
100 })
101 }
102
103 fn start(self: Box<Self>) -> tokio::task::JoinHandle<Result<()>> {
104 pyo3_async_runtimes::tokio::get_runtime().spawn_blocking(move || {
105 Python::with_gil(|py| {
106 pyo3_async_runtimes::tokio::run(py, async move {
107 {
108 let fut = Python::with_gil(|py| {
109 pyo3_async_runtimes::tokio::into_future(
110 self.instance.call_method0(py, "start")?.into_bound(py),
111 )
112 })?;
113
114 fut.await
115 .wrap_err("Couldn't await for the instance call to 'start'")?;
116
117 Ok(())
118 }
119 })
120 .map_err(|e| {
121 e.print_and_set_sys_last_vars(py);
122
123 eyre::eyre!(e)
124 })
125 })
126 })
127 }
128}