1use eyre::{Context, OptionExt};
2use std::{collections::HashMap, sync::Arc};
3
4use tokio::sync::{
5 Mutex,
6 broadcast::{Receiver, Sender},
7};
8
9use arrow_array::Array;
10use arrow_data::ArrayData;
11
12use crate::prelude::*;
13
14pub struct RawOutput {
15 clock: Arc<uhlc::HLC>,
16
17 pub tx: Sender<DataflowMessage>,
18}
19
20impl RawOutput {
21 pub fn new(clock: Arc<uhlc::HLC>, tx: Sender<DataflowMessage>) -> Self {
22 Self { clock, tx }
23 }
24
25 pub fn send(&self, data: ArrayData) -> eyre::Result<()> {
26 let data = DataflowMessage {
27 header: Header {
28 timestamp: self.clock.new_timestamp(),
29 },
30 data,
31 };
32
33 self.tx
34 .send(data)
35 .map(|_| ())
36 .map_err(eyre::Report::msg)
37 .wrap_err("Failed to send the message")
38 }
39}
40
41pub struct Output<T: ArrowMessage> {
42 pub raw: RawOutput,
43
44 _phantom: std::marker::PhantomData<T>,
45}
46
47impl<T: ArrowMessage> Output<T> {
48 pub fn new(clock: Arc<uhlc::HLC>, tx: Sender<DataflowMessage>) -> Self {
49 Self {
50 raw: RawOutput::new(clock, tx),
51 _phantom: std::marker::PhantomData,
52 }
53 }
54
55 pub fn send(&self, data: T) -> eyre::Result<()> {
56 self.raw.send(
57 data.try_into_arrow()
58 .wrap_err("Failed to convert arrow 'data' to message T")?
59 .into_data(),
60 )
61 }
62}
63
64pub struct RawInput {
65 pub rx: Receiver<DataflowMessage>,
66}
67
68impl RawInput {
69 pub fn new(rx: Receiver<DataflowMessage>) -> Self {
70 Self { rx }
71 }
72
73 pub fn recv(&mut self) -> eyre::Result<(Header, ArrayData)> {
74 let DataflowMessage { header, data } = self
75 .rx
76 .blocking_recv()
77 .map_err(eyre::Report::msg)
78 .wrap_err("Failed to receive from this input")?;
79
80 Ok((header, data))
81 }
82
83 pub async fn recv_async(&mut self) -> eyre::Result<(Header, ArrayData)> {
84 let DataflowMessage { header, data } = self
85 .rx
86 .recv()
87 .await
88 .map_err(eyre::Report::msg)
89 .wrap_err("Failed to receive from this input")?;
90
91 Ok((header, data))
92 }
93}
94
95pub struct Input<T: ArrowMessage> {
96 pub raw: RawInput,
97
98 _phantom: std::marker::PhantomData<T>,
99}
100
101impl<T: ArrowMessage> Input<T> {
102 pub fn new(rx: Receiver<DataflowMessage>) -> Self {
103 Self {
104 raw: RawInput::new(rx),
105 _phantom: std::marker::PhantomData,
106 }
107 }
108
109 pub fn recv(&mut self) -> eyre::Result<(Header, T)> {
110 let (header, data) = self.raw.recv()?;
111
112 Ok((
113 header,
114 T::try_from_arrow(data).wrap_err("Failed to convert arrow 'data' to message T")?,
115 ))
116 }
117
118 pub async fn recv_async(&mut self) -> eyre::Result<(Header, T)> {
119 let (header, data) = self.raw.recv_async().await?;
120
121 Ok((
122 header,
123 T::try_from_arrow(data).wrap_err("Failed to convert arrow 'data' to message T")?,
124 ))
125 }
126}
127
128pub struct Inputs {
129 node: NodeID,
130 receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
131}
132
133impl Inputs {
134 pub fn new(
135 node: NodeID,
136 receivers: Arc<Mutex<HashMap<InputID, Receiver<DataflowMessage>>>>,
137 ) -> Self {
138 Self { node, receivers }
139 }
140
141 pub async fn raw(&mut self, input: impl Into<String>) -> eyre::Result<RawInput> {
142 let id = self.node.input(input);
143
144 let receiver = self
145 .receivers
146 .lock()
147 .await
148 .remove(&id)
149 .ok_or_eyre(format!("Input {} not found", id.0))?;
150
151 Ok(RawInput::new(receiver))
152 }
153
154 pub async fn with<T: ArrowMessage>(
155 &mut self,
156 input: impl Into<String>,
157 ) -> eyre::Result<Input<T>> {
158 let id = self.node.input(input);
159
160 let receiver = self
161 .receivers
162 .lock()
163 .await
164 .remove(&id)
165 .ok_or_eyre(format!("Input {} not found", id.0))?;
166
167 Ok(Input::new(receiver))
168 }
169}
170
171pub struct Outputs {
172 node: NodeID,
173 clock: Arc<uhlc::HLC>,
174 senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
175}
176
177impl Outputs {
178 pub fn new(
179 node: NodeID,
180 clock: Arc<uhlc::HLC>,
181 senders: Arc<Mutex<HashMap<OutputID, Sender<DataflowMessage>>>>,
182 ) -> Self {
183 Self {
184 node,
185 clock,
186 senders,
187 }
188 }
189
190 pub async fn raw(&mut self, output: impl Into<String>) -> eyre::Result<RawOutput> {
191 let id = self.node.output(output);
192
193 let sender = self
194 .senders
195 .lock()
196 .await
197 .remove(&id)
198 .ok_or_eyre(format!("Output {} not found", id.0))?;
199
200 Ok(RawOutput::new(self.clock.clone(), sender))
201 }
202
203 pub async fn with<T: ArrowMessage>(
204 &mut self,
205 output: impl Into<String>,
206 ) -> eyre::Result<Output<T>> {
207 let id = self.node.output(output);
208
209 let sender = self
210 .senders
211 .lock()
212 .await
213 .remove(&id)
214 .ok_or_eyre(format!("Output {} not found", id.0))?;
215
216 Ok(Output::new(self.clock.clone(), sender))
217 }
218}