pyridis_node/
node.rs

1use std::ffi::CString;
2
3use crate::prelude::{
4    thirdparty::{
5        ird::thirdparty::*,
6        pyo3::{ffi::c_str, prelude::*, types::*},
7    },
8    *,
9};
10
11#[derive(ird::Node)]
12pub struct PythonNode {
13    pub instance: PyObject,
14}
15
16impl ird::Node for PythonNode {
17    fn new(
18        inputs: ird::Inputs,
19        outputs: ird::Outputs,
20        queries: ird::Queries,
21        queryables: ird::Queryables,
22        configuration: serde_yml::Value,
23    ) -> tokio::task::JoinHandle<Result<Box<dyn ird::Node>>> {
24        pyo3::prepare_freethreaded_python();
25
26        let mut builder = tokio::runtime::Builder::new_current_thread();
27        builder.enable_all();
28
29        pyo3_async_runtimes::tokio::init(builder);
30
31        std::thread::spawn(|| {
32            pyo3_async_runtimes::tokio::get_runtime()
33                .block_on(pyo3_async_runtimes::tokio::re_exports::pending::<()>())
34        });
35
36        pyo3_async_runtimes::tokio::get_runtime().spawn_blocking(move || {
37            Python::with_gil(|py| {
38                pyo3_async_runtimes::tokio::run(py, async move {
39                    let file_path = configuration
40                        .get("python_file_path")
41                        .ok_or_eyre("Cannot find python file path inside configuration")?
42                        .as_str();
43
44                    let file_path = file_path
45                        .ok_or_eyre(format!("Invalid python file path: '{:?}'", file_path))?;
46
47                    let py_script = tokio::fs::read_to_string(file_path)
48                        .await
49                        .wrap_err(format!("Couldn't read path '{}'", file_path))?;
50
51                    let module: PyObject = Python::with_gil(|py| -> Result<PyObject> {
52                        Ok(PyModule::from_code(
53                            py,
54                            CString::new(py_script)?.as_c_str(),
55                            CString::new(file_path)?.as_c_str(),
56                            c_str!("pyridis_node"),
57                        )?
58                        .into())
59                    })?;
60
61                    let instance: PyObject = Python::with_gil(|py| -> PyResult<PyObject> {
62                        let class = module.call_method0(py, "pyridis_node")?;
63
64                        Ok(class.call0(py)?.into())
65                    })?;
66
67                    let configuration: PyObject = Python::with_gil(|py| -> PyResult<PyObject> {
68                        // TODO convert 'configuration' to a Python dictionary
69                        Ok(PyDict::new(py).into())
70                    })?;
71
72                    let inputs = Inputs(inputs);
73                    let outputs = Outputs(outputs);
74                    let queries = Queries(queries);
75                    let queryables = Queryables(queryables);
76
77                    let fut = Python::with_gil(|py| {
78                        pyo3_async_runtimes::tokio::into_future(
79                            instance
80                                .call_method1(
81                                    py,
82                                    "new",
83                                    (inputs, outputs, queries, queryables, configuration),
84                                )?
85                                .into_bound(py),
86                        )
87                    })?;
88
89                    fut.await
90                        .wrap_err("Couldn't await for the instance call to 'new'")?;
91
92                    Ok(Box::new(Self { instance }) as Box<dyn ird::Node>)
93                })
94                .map_err(|e| {
95                    e.print_and_set_sys_last_vars(py);
96
97                    eyre::eyre!(e)
98                })
99            })
100        })
101    }
102
103    fn start(self: Box<Self>) -> tokio::task::JoinHandle<Result<()>> {
104        pyo3_async_runtimes::tokio::get_runtime().spawn_blocking(move || {
105            Python::with_gil(|py| {
106                pyo3_async_runtimes::tokio::run(py, async move {
107                    {
108                        let fut = Python::with_gil(|py| {
109                            pyo3_async_runtimes::tokio::into_future(
110                                self.instance.call_method0(py, "start")?.into_bound(py),
111                            )
112                        })?;
113
114                        fut.await
115                            .wrap_err("Couldn't await for the instance call to 'start'")?;
116
117                        Ok(())
118                    }
119                })
120                .map_err(|e| {
121                    e.print_and_set_sys_last_vars(py);
122
123                    eyre::eyre!(e)
124                })
125            })
126        })
127    }
128}