pyridis_api/
io.rs

1use iridis_api::prelude::DataflowMessage;
2
3use crate::prelude::{
4    thirdparty::{
5        arrow::{array::*, pyarrow::*},
6        pyo3::prelude::*,
7    },
8    *,
9};
10
11#[pyclass]
12pub struct Inputs(pub ird::Inputs);
13
14#[pymethods]
15impl Inputs {
16    pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
17        let input = self.0.raw(input).await?;
18
19        Ok(Input(input))
20    }
21}
22
23#[pyclass]
24pub struct Outputs(pub ird::Outputs);
25
26#[pymethods]
27impl Outputs {
28    pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
29        let output = self.0.raw(output).await?;
30
31        Ok(Output(output))
32    }
33}
34
35#[pyclass]
36pub struct Queries(pub ird::Queries);
37
38#[pymethods]
39impl Queries {
40    pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
41        let query = self.0.raw(query).await?;
42
43        Ok(Query(query))
44    }
45}
46
47#[pyclass]
48pub struct Queryables(pub ird::Queryables);
49
50#[pymethods]
51impl Queryables {
52    pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
53        let queryable = self.0.raw(queryable).await?;
54
55        Ok(Queryable(queryable))
56    }
57}
58
59#[pyclass]
60pub struct Header(pub ird::Header);
61
62#[pymethods]
63impl Header {
64    #[getter]
65    pub fn source_node(&self) -> String {
66        let (a, _) = self.0.source;
67        a.to_string()
68    }
69
70    #[getter]
71    pub fn source_io(&self) -> String {
72        let (_, b) = self.0.source;
73        b.to_string()
74    }
75
76    #[getter]
77    pub fn elapsed(&self) -> u128 {
78        let elapsed = self
79            .0
80            .timestamp
81            .get_time()
82            .to_system_time()
83            .elapsed()
84            .unwrap_or_default()
85            .as_nanos();
86
87        elapsed
88    }
89}
90
91#[pyclass]
92pub struct PyDataflowMessage {
93    pub data: PyArrowType<ArrayData>,
94    pub header: Header,
95}
96
97#[pymethods]
98impl PyDataflowMessage {
99    #[getter]
100    pub fn data(&self) -> PyArrowType<ArrayData> {
101        let array = self.data.0.clone();
102
103        PyArrowType(array)
104    }
105
106    #[getter]
107    pub fn header(&self) -> Header {
108        let header = self.header.0.clone();
109
110        Header(header)
111    }
112}
113
114#[pyclass]
115pub struct Input(pub ird::RawInput);
116
117#[pymethods]
118impl Input {
119    pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
120        let DataflowMessage { header, data } = self.0.recv().await?;
121
122        Ok(PyDataflowMessage {
123            data: PyArrowType(data),
124            header: Header(header),
125        })
126    }
127}
128
129#[pyclass]
130pub struct Output(pub ird::RawOutput);
131
132#[pymethods]
133impl Output {
134    pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
135        self.0.send(data.0).await?;
136
137        Ok(())
138    }
139}
140
141#[pyclass]
142pub struct Query(pub ird::RawQuery);
143
144#[pymethods]
145impl Query {
146    pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
147        let DataflowMessage { header, data } = self.0.query(data.0).await?;
148
149        Ok(PyDataflowMessage {
150            data: PyArrowType(data),
151            header: Header(header),
152        })
153    }
154}
155
156#[pyclass]
157pub struct Queryable(pub ird::RawQueryable);
158
159// TODO: should accept async python callbacks
160#[pymethods]
161impl Queryable {
162    pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
163        self.0
164            .on_query(async |query: DataflowMessage| {
165                let DataflowMessage { header, data } = query;
166                let message = PyDataflowMessage {
167                    data: PyArrowType(data),
168                    header: Header(header),
169                };
170
171                let array = Python::with_gil(|py| -> PyResult<ArrayData> {
172                    let array = response.call1(py, (message,))?.into_bound(py);
173
174                    ArrayData::from_pyarrow_bound(&array)
175                })?;
176
177                Ok(array)
178            })
179            .await?;
180
181        Ok(())
182    }
183}