flarrow_api/
io.rs

1use eyre::{Context, OptionExt};
2use std::{collections::HashMap, sync::Arc};
3
4use tokio::sync::{
5    Mutex,
6    broadcast::{Receiver, Sender},
7};
8
9use arrow_array::Array;
10use arrow_data::ArrayData;
11
12use crate::prelude::*;
13
14pub struct RawOutput {
15    clock: Arc<uhlc::HLC>,
16
17    pub tx: Sender<DataflowMessage>,
18}
19
20impl RawOutput {
21    pub fn new(clock: Arc<uhlc::HLC>, tx: Sender<DataflowMessage>) -> Self {
22        Self { clock, tx }
23    }
24
25    pub fn send(&self, data: ArrayData) -> eyre::Result<()> {
26        let data = DataflowMessage {
27            header: Header {
28                timestamp: self.clock.new_timestamp(),
29            },
30            data,
31        };
32
33        self.tx
34            .send(data)
35            .map(|_| ())
36            .map_err(eyre::Report::msg)
37            .wrap_err("Failed to send the message")
38    }
39}
40
41pub struct Output<T: ArrowMessage> {
42    pub raw: RawOutput,
43
44    _phantom: std::marker::PhantomData<T>,
45}
46
47impl<T: ArrowMessage> Output<T> {
48    pub fn new(clock: Arc<uhlc::HLC>, tx: Sender<DataflowMessage>) -> Self {
49        Self {
50            raw: RawOutput::new(clock, tx),
51            _phantom: std::marker::PhantomData,
52        }
53    }
54
55    pub fn send(&self, data: T) -> eyre::Result<()> {
56        self.raw.send(
57            data.try_into_arrow()
58                .wrap_err("Failed to convert arrow 'data' to message T")?
59                .into_data(),
60        )
61    }
62}
63
64pub struct RawInput {
65    pub rx: Receiver<DataflowMessage>,
66}
67
68impl RawInput {
69    pub fn new(rx: Receiver<DataflowMessage>) -> Self {
70        Self { rx }
71    }
72
73    pub fn recv(&mut self) -> eyre::Result<(Header, ArrayData)> {
74        let DataflowMessage { header, data } = self
75            .rx
76            .blocking_recv()
77            .map_err(eyre::Report::msg)
78            .wrap_err("Failed to receive from this input")?;
79
80        Ok((header, data))
81    }
82
83    pub async fn recv_async(&mut self) -> eyre::Result<(Header, ArrayData)> {
84        let DataflowMessage { header, data } = self
85            .rx
86            .recv()
87            .await
88            .map_err(eyre::Report::msg)
89            .wrap_err("Failed to receive from this input")?;
90
91        Ok((header, data))
92    }
93}
94
95pub struct Input<T: ArrowMessage> {
96    pub raw: RawInput,
97
98    _phantom: std::marker::PhantomData<T>,
99}
100
101impl<T: ArrowMessage> Input<T> {
102    pub fn new(rx: Receiver<DataflowMessage>) -> Self {
103        Self {
104            raw: RawInput::new(rx),
105            _phantom: std::marker::PhantomData,
106        }
107    }
108
109    pub fn recv(&mut self) -> eyre::Result<(Header, T)> {
110        let (header, data) = self.raw.recv()?;
111
112        Ok((
113            header,
114            T::try_from_arrow(data).wrap_err("Failed to convert arrow 'data' to message T")?,
115        ))
116    }
117
118    pub async fn recv_async(&mut self) -> eyre::Result<(Header, T)> {
119        let (header, data) = self.raw.recv_async().await?;
120
121        Ok((
122            header,
123            T::try_from_arrow(data).wrap_err("Failed to convert arrow 'data' to message T")?,
124        ))
125    }
126}
127
128pub struct Inputs {
129    node: NodeID,
130    receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
131}
132
133impl Inputs {
134    pub fn new(
135        node: NodeID,
136        receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
137    ) -> Self {
138        Self { node, receivers }
139    }
140
141    pub async fn raw(&mut self, input: impl Into<String>) -> eyre::Result<RawInput> {
142        let id = self.node.input(input);
143
144        let receiver = self
145            .receivers
146            .lock()
147            .await
148            .remove(&id)
149            .ok_or_eyre(format!("Input {} not found", id.0))?;
150
151        Ok(RawInput::new(receiver))
152    }
153
154    pub async fn with<T: ArrowMessage>(
155        &mut self,
156        input: impl Into<String>,
157    ) -> eyre::Result<Input<T>> {
158        let id = self.node.input(input);
159
160        let receiver = self
161            .receivers
162            .lock()
163            .await
164            .remove(&id)
165            .ok_or_eyre(format!("Input {} not found", id.0))?;
166
167        Ok(Input::new(receiver))
168    }
169}
170
171pub struct Outputs {
172    node: NodeID,
173    clock: Arc<uhlc::HLC>,
174    senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
175}
176
177impl Outputs {
178    pub fn new(
179        node: NodeID,
180        clock: Arc<uhlc::HLC>,
181        senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
182    ) -> Self {
183        Self {
184            node,
185            clock,
186            senders,
187        }
188    }
189
190    pub async fn raw(&mut self, output: impl Into<String>) -> eyre::Result<RawOutput> {
191        let id = self.node.output(output);
192
193        let sender = self
194            .senders
195            .lock()
196            .await
197            .remove(&id)
198            .ok_or_eyre(format!("Output {} not found", id.0))?;
199
200        Ok(RawOutput::new(self.clock.clone(), sender))
201    }
202
203    pub async fn with<T: ArrowMessage>(
204        &mut self,
205        output: impl Into<String>,
206    ) -> eyre::Result<Output<T>> {
207        let id = self.node.output(output);
208
209        let sender = self
210            .senders
211            .lock()
212            .await
213            .remove(&id)
214            .ok_or_eyre(format!("Output {} not found", id.0))?;
215
216        Ok(Output::new(self.clock.clone(), sender))
217    }
218}