1use iridis_api::prelude::DataflowMessage;
2
3use crate::prelude::{
4 thirdparty::{
5 arrow::{array::*, pyarrow::*},
6 pyo3::prelude::*,
7 },
8 *,
9};
10
11#[pyclass]
12pub struct Inputs(pub ird::Inputs);
13
14#[pymethods]
15impl Inputs {
16 pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
17 let input = self.0.raw(input).await?;
18
19 Ok(Input(input))
20 }
21}
22
23#[pyclass]
24pub struct Outputs(pub ird::Outputs);
25
26#[pymethods]
27impl Outputs {
28 pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
29 let output = self.0.raw(output).await?;
30
31 Ok(Output(output))
32 }
33}
34
35#[pyclass]
36pub struct Queries(pub ird::Queries);
37
38#[pymethods]
39impl Queries {
40 pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
41 let query = self.0.raw(query).await?;
42
43 Ok(Query(query))
44 }
45}
46
47#[pyclass]
48pub struct Queryables(pub ird::Queryables);
49
50#[pymethods]
51impl Queryables {
52 pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
53 let queryable = self.0.raw(queryable).await?;
54
55 Ok(Queryable(queryable))
56 }
57}
58
59#[pyclass]
60pub struct Header(pub ird::Header);
61
62#[pyclass]
63pub struct PyDataflowMessage {
64 pub data: PyArrowType<ArrayData>,
65 pub header: Header,
66}
67
68#[pymethods]
69impl PyDataflowMessage {
70 #[getter]
71 pub fn data(&self) -> PyResult<PyArrowType<ArrayData>> {
72 let array = self.data.0.clone();
73
74 Ok(PyArrowType(array))
75 }
76}
77
78#[pyclass]
79pub struct Input(pub ird::RawInput);
80
81#[pymethods]
82impl Input {
83 pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
84 let DataflowMessage { header, data } = self.0.recv().await?;
85
86 Ok(PyDataflowMessage {
87 data: PyArrowType(data),
88 header: Header(header),
89 })
90 }
91}
92
93#[pyclass]
94pub struct Output(pub ird::RawOutput);
95
96#[pymethods]
97impl Output {
98 pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
99 self.0.send(data.0).await?;
100
101 Ok(())
102 }
103}
104
105#[pyclass]
106pub struct Query(pub ird::RawQuery);
107
108#[pymethods]
109impl Query {
110 pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
111 let DataflowMessage { header, data } = self.0.query(data.0).await?;
112
113 Ok(PyDataflowMessage {
114 data: PyArrowType(data),
115 header: Header(header),
116 })
117 }
118}
119
120#[pyclass]
121pub struct Queryable(pub ird::RawQueryable);
122
123#[pymethods]
125impl Queryable {
126 pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
127 self.0
128 .on_query(async |query: DataflowMessage| {
129 let DataflowMessage { header, data } = query;
130 let message = PyDataflowMessage {
131 data: PyArrowType(data),
132 header: Header(header),
133 };
134
135 let array = Python::with_gil(|py| -> PyResult<ArrayData> {
136 let array = response.call1(py, (message,))?.into_bound(py);
137 let array = ArrayData::from_pyarrow_bound(&array);
138
139 array
140 })?;
141
142 Ok(array)
143 })
144 .await?;
145
146 Ok(())
147 }
148}