1use crate::prelude::*;
2
3#[derive(Debug)]
5pub struct Input<T: ArrowMessage> {
6    pub raw: RawInput,
7    _phantom: std::marker::PhantomData<T>,
8}
9
10impl<T: ArrowMessage> Input<T> {
11    pub fn new(rx: MessageReceiver, source: NodeLayout, layout: InputLayout) -> Self {
13        Self {
14            raw: RawInput::new(rx, source, layout),
15            _phantom: std::marker::PhantomData,
16        }
17    }
18
19    pub async fn recv(&mut self) -> Result<(Header, T)> {
21        let DataflowMessage { header, data } = self
22            .raw
23            .recv()
24            .await
25            .wrap_err(report_error_receiving(&self.raw.source, &self.raw.layout))?;
26
27        let message = T::try_from_arrow(data).wrap_err(
28            report_failed_conversion_from_arrow::<T>(&self.raw.source, &self.raw.layout),
29        )?;
30
31        Ok((header, message))
32    }
33}