1use crate::prelude::*;
2
3#[derive(Debug)]
5pub struct Input<T: ArrowMessage> {
6 pub raw: RawInput,
7 _phantom: std::marker::PhantomData<T>,
8}
9
10impl<T: ArrowMessage> Input<T> {
11 pub fn new(rx: MessageReceiver, source: NodeLayout, layout: InputLayout) -> Self {
13 Self {
14 raw: RawInput::new(rx, source, layout),
15 _phantom: std::marker::PhantomData,
16 }
17 }
18
19 pub fn blocking_recv(&mut self) -> Result<(Header, T)> {
22 let (header, data) = self
23 .raw
24 .blocking_recv()
25 .wrap_err(report_error_receiving(&self.raw.source, &self.raw.layout))?;
26
27 let message = T::try_from_arrow(data).wrap_err(
28 report_failed_conversion_from_arrow::<T>(&self.raw.source, &self.raw.layout),
29 )?;
30
31 Ok((header, message))
32 }
33
34 pub async fn recv(&mut self) -> Result<(Header, T)> {
36 let (header, data) = self
37 .raw
38 .recv()
39 .await
40 .wrap_err(report_error_receiving(&self.raw.source, &self.raw.layout))?;
41
42 let message = T::try_from_arrow(data).wrap_err(
43 report_failed_conversion_from_arrow::<T>(&self.raw.source, &self.raw.layout),
44 )?;
45
46 Ok((header, message))
47 }
48}