1use iridis_api::prelude::DataflowMessage;
2
3use crate::prelude::{
4 thirdparty::{
5 arrow::{array::*, pyarrow::*},
6 pyo3::prelude::*,
7 },
8 *,
9};
10
11#[pyclass]
12pub struct Inputs(pub ird::Inputs);
13
14#[pymethods]
15impl Inputs {
16 pub async fn with_input(&mut self, input: String) -> PyResult<Input> {
17 let input = self.0.raw(input).await?;
18
19 Ok(Input(input))
20 }
21}
22
23#[pyclass]
24pub struct Outputs(pub ird::Outputs);
25
26#[pymethods]
27impl Outputs {
28 pub async fn with_output(&mut self, output: String) -> PyResult<Output> {
29 let output = self.0.raw(output).await?;
30
31 Ok(Output(output))
32 }
33}
34
35#[pyclass]
36pub struct Queries(pub ird::Queries);
37
38#[pymethods]
39impl Queries {
40 pub async fn with_query(&mut self, query: String) -> PyResult<Query> {
41 let query = self.0.raw(query).await?;
42
43 Ok(Query(query))
44 }
45}
46
47#[pyclass]
48pub struct Queryables(pub ird::Queryables);
49
50#[pymethods]
51impl Queryables {
52 pub async fn with_queryable(&mut self, queryable: String) -> PyResult<Queryable> {
53 let queryable = self.0.raw(queryable).await?;
54
55 Ok(Queryable(queryable))
56 }
57}
58
59#[pyclass]
60pub struct Header(pub ird::Header);
61
62#[pymethods]
63impl Header {
64 #[getter]
65 pub fn source_node(&self) -> String {
66 let (a, _) = self.0.source;
67 a.to_string()
68 }
69
70 #[getter]
71 pub fn source_io(&self) -> String {
72 let (_, b) = self.0.source;
73 b.to_string()
74 }
75
76 #[getter]
77 pub fn elapsed(&self) -> u128 {
78 let elapsed = self
79 .0
80 .timestamp
81 .get_time()
82 .to_system_time()
83 .elapsed()
84 .unwrap_or_default()
85 .as_millis();
86
87 elapsed
88 }
89}
90
91#[pyclass]
92pub struct PyDataflowMessage {
93 pub data: PyArrowType<ArrayData>,
94 pub header: Header,
95}
96
97#[pymethods]
98impl PyDataflowMessage {
99 #[getter]
100 pub fn data(&self) -> PyArrowType<ArrayData> {
101 let array = self.data.0.clone();
102
103 PyArrowType(array)
104 }
105
106 #[getter]
107 pub fn header(&self) -> Header {
108 let header = self.header.0.clone();
109
110 Header(header)
111 }
112}
113
114#[pyclass]
115pub struct Input(pub ird::RawInput);
116
117#[pymethods]
118impl Input {
119 pub async fn recv(&mut self) -> PyResult<PyDataflowMessage> {
120 let DataflowMessage { header, data } = self.0.recv().await?;
121
122 Ok(PyDataflowMessage {
123 data: PyArrowType(data),
124 header: Header(header),
125 })
126 }
127}
128
129#[pyclass]
130pub struct Output(pub ird::RawOutput);
131
132#[pymethods]
133impl Output {
134 pub async fn send(&mut self, data: PyArrowType<ArrayData>) -> PyResult<()> {
135 self.0.send(data.0).await?;
136
137 Ok(())
138 }
139}
140
141#[pyclass]
142pub struct Query(pub ird::RawQuery);
143
144#[pymethods]
145impl Query {
146 pub async fn query(&mut self, data: PyArrowType<ArrayData>) -> PyResult<PyDataflowMessage> {
147 let DataflowMessage { header, data } = self.0.query(data.0).await?;
148
149 Ok(PyDataflowMessage {
150 data: PyArrowType(data),
151 header: Header(header),
152 })
153 }
154}
155
156#[pyclass]
157pub struct Queryable(pub ird::RawQueryable);
158
159#[pymethods]
161impl Queryable {
162 pub async fn on_query(&mut self, response: PyObject) -> PyResult<()> {
163 self.0
164 .on_query(async |query: DataflowMessage| {
165 let DataflowMessage { header, data } = query;
166 let message = PyDataflowMessage {
167 data: PyArrowType(data),
168 header: Header(header),
169 };
170
171 let array = Python::with_gil(|py| -> PyResult<ArrayData> {
172 let array = response.call1(py, (message,))?.into_bound(py);
173
174 ArrayData::from_pyarrow_bound(&array)
175 })?;
176
177 Ok(array)
178 })
179 .await?;
180
181 Ok(())
182 }
183}