flarrow_api/io/
raw_input.rs

1use crate::prelude::*;
2use thirdparty::arrow_data::ArrayData;
3
4/// Not typed Input to receive data from the dataflow
5#[derive(Debug)]
6pub struct RawInput {
7    /// The receiver part of the MPSC channel
8    pub rx: MessageReceiver,
9
10    /// The source node layout, useful for debugging
11    pub source: NodeLayout,
12    /// The layout of the input, useful for debugging
13    pub layout: InputLayout,
14}
15
16impl RawInput {
17    /// Create a new RawInput instance
18    pub fn new(rx: MessageReceiver, source: NodeLayout, layout: InputLayout) -> Self {
19        Self { rx, source, layout }
20    }
21
22    /// Receive a message from the channel, blocking until one is available, don't use it
23    /// in async context
24    pub fn blocking_recv(&mut self) -> Result<(Header, ArrayData)> {
25        let DataflowMessage { header, data } = self
26            .rx
27            .blocking_recv()
28            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
29
30        Ok((header, data))
31    }
32
33    /// Receive a message from the channel, asynchronously
34    pub async fn recv(&mut self) -> Result<(Header, ArrayData)> {
35        let DataflowMessage { header, data } = self
36            .rx
37            .recv()
38            .await
39            .ok_or_eyre(report_error_receiving(&self.source, &self.layout))?;
40
41        Ok((header, data))
42    }
43}