pyridis_api/
io.rs

1use iridis_api::prelude::DataflowMessage;
2
3use crate::prelude::{
4    thirdparty::{
5        arrow::{array::*, pyarrow::*},
6        pyo3::prelude::*,
7    },
8    *,
9};
10
11#[pyclass]
12pub struct Inputs(pub ird::Inputs);
13
14#[pymethods]
15impl Inputs {
16    pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
17        let input = self.0.raw(input).await?;
18
19        Ok(Input(input))
20    }
21}
22
23#[pyclass]
24pub struct Outputs(pub ird::Outputs);
25
26#[pymethods]
27impl Outputs {
28    pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
29        let output = self.0.raw(output).await?;
30
31        Ok(Output(output))
32    }
33}
34
35#[pyclass]
36pub struct Queries(pub ird::Queries);
37
38#[pymethods]
39impl Queries {
40    pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
41        let query = self.0.raw(query).await?;
42
43        Ok(Query(query))
44    }
45}
46
47#[pyclass]
48pub struct Queryables(pub ird::Queryables);
49
50#[pymethods]
51impl Queryables {
52    pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
53        let queryable = self.0.raw(queryable).await?;
54
55        Ok(Queryable(queryable))
56    }
57}
58
59#[pyclass]
60pub struct Header(pub ird::Header);
61
62#[pyclass]
63pub struct PyDataflowMessage {
64    pub data: PyArrowType<ArrayData>,
65    pub header: Header,
66}
67
68#[pymethods]
69impl PyDataflowMessage {
70    #[getter]
71    pub fn data(&self) -> PyResult<PyArrowType<ArrayData>> {
72        let array = self.data.0.clone();
73
74        Ok(PyArrowType(array))
75    }
76}
77
78#[pyclass]
79pub struct Input(pub ird::RawInput);
80
81#[pymethods]
82impl Input {
83    pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
84        let DataflowMessage { header, data } = self.0.recv().await?;
85
86        Ok(PyDataflowMessage {
87            data: PyArrowType(data),
88            header: Header(header),
89        })
90    }
91}
92
93#[pyclass]
94pub struct Output(pub ird::RawOutput);
95
96#[pymethods]
97impl Output {
98    pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
99        self.0.send(data.0).await?;
100
101        Ok(())
102    }
103}
104
105#[pyclass]
106pub struct Query(pub ird::RawQuery);
107
108#[pymethods]
109impl Query {
110    pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
111        let DataflowMessage { header, data } = self.0.query(data.0).await?;
112
113        Ok(PyDataflowMessage {
114            data: PyArrowType(data),
115            header: Header(header),
116        })
117    }
118}
119
120#[pyclass]
121pub struct Queryable(pub ird::RawQueryable);
122
123// TODO: should accept async python callbacks
124#[pymethods]
125impl Queryable {
126    pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
127        self.0
128            .on_query(async |query: DataflowMessage| {
129                let DataflowMessage { header, data } = query;
130                let message = PyDataflowMessage {
131                    data: PyArrowType(data),
132                    header: Header(header),
133                };
134
135                let array = Python::with_gil(|py| -> PyResult<ArrayData> {
136                    let array = response.call1(py, (message,))?.into_bound(py);
137                    let array = ArrayData::from_pyarrow_bound(&array);
138
139                    array
140                })?;
141
142                Ok(array)
143            })
144            .await?;
145
146        Ok(())
147    }
148}