1use iridis_api::prelude::DataflowMessage;
2use pyo3::IntoPyObjectExt;
3
4use crate::prelude::{
5 thirdparty::{
6 arrow::{array::*, pyarrow::*},
7 pyo3::prelude::*,
8 },
9 *,
10};
11
12#[pyclass]
13pub struct Inputs(pub ird::Inputs);
14
15#[pymethods]
16impl Inputs {
17 pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
18 let input = self.0.raw(input).await?;
19
20 Ok(Input(input))
21 }
22}
23
24#[pyclass]
25pub struct Outputs(pub ird::Outputs);
26
27#[pymethods]
28impl Outputs {
29 pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
30 let output = self.0.raw(output).await?;
31
32 Ok(Output(output))
33 }
34}
35
36#[pyclass]
37pub struct Queries(pub ird::Queries);
38
39#[pymethods]
40impl Queries {
41 pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
42 let query = self.0.raw(query).await?;
43
44 Ok(Query(query))
45 }
46}
47
48#[pyclass]
49pub struct Queryables(pub ird::Queryables);
50
51#[pymethods]
52impl Queryables {
53 pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
54 let queryable = self.0.raw(queryable).await?;
55
56 Ok(Queryable(queryable))
57 }
58}
59
60#[pyclass]
61pub struct Header(pub ird::Header);
62
63#[pymethods]
64impl Header {
65 #[getter]
66 pub fn source_node(&self) -> String {
67 let (a, _) = self.0.source;
68 a.to_string()
69 }
70
71 #[getter]
72 pub fn source_io(&self) -> String {
73 let (_, b) = self.0.source;
74 b.to_string()
75 }
76
77 #[getter]
78 pub fn timestamp(&self, py: Python) -> PyResult<PyObject> {
79 self.0.timestamp.get_time().to_system_time().into_py_any(py)
80 }
81
82 #[getter]
83 pub fn elapsed(&self) -> u128 {
84 let elapsed = self
85 .0
86 .timestamp
87 .get_time()
88 .to_system_time()
89 .elapsed()
90 .unwrap_or_default()
91 .as_nanos();
92
93 elapsed
94 }
95}
96
97#[pyclass]
98pub struct PyDataflowMessage {
99 pub data: PyArrowType<ArrayData>,
100 pub header: Header,
101}
102
103#[pymethods]
104impl PyDataflowMessage {
105 #[getter]
106 pub fn data(&self) -> PyArrowType<ArrayData> {
107 let array = self.data.0.clone();
108
109 PyArrowType(array)
110 }
111
112 #[getter]
113 pub fn header(&self) -> Header {
114 let header = self.header.0.clone();
115
116 Header(header)
117 }
118}
119
120#[pyclass]
121pub struct Input(pub ird::RawInput);
122
123#[pymethods]
124impl Input {
125 pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
126 let DataflowMessage { header, data } = self.0.recv().await?;
127
128 Ok(PyDataflowMessage {
129 data: PyArrowType(data),
130 header: Header(header),
131 })
132 }
133}
134
135#[pyclass]
136pub struct Output(pub ird::RawOutput);
137
138#[pymethods]
139impl Output {
140 pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
141 self.0.send(data.0).await?;
142
143 Ok(())
144 }
145}
146
147#[pyclass]
148pub struct Query(pub ird::RawQuery);
149
150#[pymethods]
151impl Query {
152 pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
153 let DataflowMessage { header, data } = self.0.query(data.0).await?;
154
155 Ok(PyDataflowMessage {
156 data: PyArrowType(data),
157 header: Header(header),
158 })
159 }
160}
161
162#[pyclass]
163pub struct Queryable(pub ird::RawQueryable);
164
165#[pymethods]
167impl Queryable {
168 pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
169 self.0
170 .on_query(async |query: DataflowMessage| {
171 let DataflowMessage { header, data } = query;
172 let message = PyDataflowMessage {
173 data: PyArrowType(data),
174 header: Header(header),
175 };
176
177 let array = Python::with_gil(|py| -> PyResult<ArrayData> {
178 let array = response.call1(py, (message,))?.into_bound(py);
179
180 ArrayData::from_pyarrow_bound(&array)
181 })?;
182
183 Ok(array)
184 })
185 .await?;
186
187 Ok(())
188 }
189}