iridis_node/primitives/
input.rs

1//! This module contains implementations for this primitive.
2
3use crate::prelude::*;
4
5/// Typed Input to receive data from the dataflow
6#[derive(Debug)]
7pub struct Input<T: ArrowMessage> {
8    pub raw: RawInput,
9    _phantom: std::marker::PhantomData<T>,
10}
11
12impl<T: ArrowMessage> Input<T> {
13    /// Create a new typed Input from a MessageReceiver, NodeID, and InputID
14    pub fn new(rx: MessageReceiver, source: NodeID, layout: InputID) -> Self {
15        Self {
16            raw: RawInput::new(rx, source, layout),
17            _phantom: std::marker::PhantomData,
18        }
19    }
20
21    /// Receive a message from the channel and converting it from Arrow format, asyncronously
22    pub async fn recv(&mut self) -> Result<TypedDataflowMessage<T>> {
23        self.raw
24            .recv()
25            .await?
26            .try_into()
27            .wrap_err(report_failed_conversion_from_arrow::<T>(
28                &self.raw.source,
29                &self.raw.layout,
30            ))
31    }
32}