pyridis_api/
io.rs

1use iridis_api::prelude::DataflowMessage;
2use pyo3::IntoPyObjectExt;
3
4use crate::prelude::{
5    thirdparty::{
6        arrow::{array::*, pyarrow::*},
7        pyo3::prelude::*,
8    },
9    *,
10};
11
12#[pyclass]
13pub struct Inputs(pub ird::Inputs);
14
15#[pymethods]
16impl Inputs {
17    pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
18        let input = self.0.raw(input).await?;
19
20        Ok(Input(input))
21    }
22}
23
24#[pyclass]
25pub struct Outputs(pub ird::Outputs);
26
27#[pymethods]
28impl Outputs {
29    pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
30        let output = self.0.raw(output).await?;
31
32        Ok(Output(output))
33    }
34}
35
36#[pyclass]
37pub struct Queries(pub ird::Queries);
38
39#[pymethods]
40impl Queries {
41    pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
42        let query = self.0.raw(query).await?;
43
44        Ok(Query(query))
45    }
46}
47
48#[pyclass]
49pub struct Queryables(pub ird::Queryables);
50
51#[pymethods]
52impl Queryables {
53    pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
54        let queryable = self.0.raw(queryable).await?;
55
56        Ok(Queryable(queryable))
57    }
58}
59
60#[pyclass]
61pub struct Header(pub ird::Header);
62
63#[pymethods]
64impl Header {
65    #[getter]
66    pub fn source_node(&self) -> String {
67        let (a, _) = self.0.source;
68        a.to_string()
69    }
70
71    #[getter]
72    pub fn source_io(&self) -> String {
73        let (_, b) = self.0.source;
74        b.to_string()
75    }
76
77    #[getter]
78    pub fn timestamp(&self, py: Python) -> PyResult<PyObject> {
79        self.0.timestamp.get_time().to_system_time().into_py_any(py)
80    }
81
82    #[getter]
83    pub fn elapsed(&self) -> u128 {
84        let elapsed = self
85            .0
86            .timestamp
87            .get_time()
88            .to_system_time()
89            .elapsed()
90            .unwrap_or_default()
91            .as_nanos();
92
93        elapsed
94    }
95}
96
97#[pyclass]
98pub struct PyDataflowMessage {
99    pub data: PyArrowType<ArrayData>,
100    pub header: Header,
101}
102
103#[pymethods]
104impl PyDataflowMessage {
105    #[getter]
106    pub fn data(&self) -> PyArrowType<ArrayData> {
107        let array = self.data.0.clone();
108
109        PyArrowType(array)
110    }
111
112    #[getter]
113    pub fn header(&self) -> Header {
114        let header = self.header.0.clone();
115
116        Header(header)
117    }
118}
119
120#[pyclass]
121pub struct Input(pub ird::RawInput);
122
123#[pymethods]
124impl Input {
125    pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
126        let DataflowMessage { header, data } = self.0.recv().await?;
127
128        Ok(PyDataflowMessage {
129            data: PyArrowType(data),
130            header: Header(header),
131        })
132    }
133}
134
135#[pyclass]
136pub struct Output(pub ird::RawOutput);
137
138#[pymethods]
139impl Output {
140    pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
141        self.0.send(data.0).await?;
142
143        Ok(())
144    }
145}
146
147#[pyclass]
148pub struct Query(pub ird::RawQuery);
149
150#[pymethods]
151impl Query {
152    pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
153        let DataflowMessage { header, data } = self.0.query(data.0).await?;
154
155        Ok(PyDataflowMessage {
156            data: PyArrowType(data),
157            header: Header(header),
158        })
159    }
160}
161
162#[pyclass]
163pub struct Queryable(pub ird::RawQueryable);
164
165// TODO: should accept async python callbacks
166#[pymethods]
167impl Queryable {
168    pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
169        self.0
170            .on_query(async |query: DataflowMessage| {
171                let DataflowMessage { header, data } = query;
172                let message = PyDataflowMessage {
173                    data: PyArrowType(data),
174                    header: Header(header),
175                };
176
177                let array = Python::with_gil(|py| -> PyResult<ArrayData> {
178                    let array = response.call1(py, (message,))?.into_bound(py);
179
180                    ArrayData::from_pyarrow_bound(&array)
181                })?;
182
183                Ok(array)
184            })
185            .await?;
186
187        Ok(())
188    }
189}