flarrow_api/io/
input.rs

1use crate::prelude::*;
2
3/// Typed Input to receive data from the dataflow
4#[derive(Debug)]
5pub struct Input<T: ArrowMessage> {
6    pub raw: RawInput,
7    _phantom: std::marker::PhantomData<T>,
8}
9
10impl<T: ArrowMessage> Input<T> {
11    /// Create a new typed Input from a MessageReceiver, NodeLayout, and InputLayout
12    pub fn new(rx: MessageReceiver, source: NodeLayout, layout: InputLayout) -> Self {
13        Self {
14            raw: RawInput::new(rx, source, layout),
15            _phantom: std::marker::PhantomData,
16        }
17    }
18
19    /// Receive a message from the channel and converting it from Arrow format, blocking until one is available, don't use it
20    /// in async context
21    pub fn blocking_recv(&mut self) -> Result<(Header, T)> {
22        let (header, data) = self
23            .raw
24            .blocking_recv()
25            .wrap_err(report_error_receiving(&self.raw.source, &self.raw.layout))?;
26
27        let message = T::try_from_arrow(data).wrap_err(
28            report_failed_conversion_from_arrow::<T>(&self.raw.source, &self.raw.layout),
29        )?;
30
31        Ok((header, message))
32    }
33
34    /// Receive a message from the channel and converting it from Arrow format, asyncronously
35    pub async fn recv(&mut self) -> Result<(Header, T)> {
36        let (header, data) = self
37            .raw
38            .recv()
39            .await
40            .wrap_err(report_error_receiving(&self.raw.source, &self.raw.layout))?;
41
42        let message = T::try_from_arrow(data).wrap_err(
43            report_failed_conversion_from_arrow::<T>(&self.raw.source, &self.raw.layout),
44        )?;
45
46        Ok((header, message))
47    }
48}