iridis_node/primitives/
output.rs

//! This module contains the implementation of the typed `Output` primitive.
2
3use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_array::Array, *};
6
7/// Typed Output to receive data from the dataflow
8pub struct Output<T: ArrowMessage> {
9    pub raw: RawOutput,
10    _phantom: std::marker::PhantomData<T>,
11}
12
13impl<T: ArrowMessage> Output<T> {
14    /// Create a new typed Output from a MessageSender, NodeID, and OutputID
15    pub fn new(tx: Vec<MessageSender>, clock: Arc<HLC>, source: NodeID, layout: OutputID) -> Self {
16        Self {
17            raw: RawOutput::new(tx, clock, source, layout),
18            _phantom: std::marker::PhantomData,
19        }
20    }
21
22    /// Send a message to the output asynchronously.
23    pub async fn send(&self, data: T) -> Result<()> {
24        self.raw
25            .send(
26                data.try_into_arrow()
27                    .wrap_err(report_failed_conversion_to_arrow::<T>(
28                        &self.raw.source,
29                        &self.raw.layout,
30                    ))?
31                    .into_data(),
32            )
33            .await
34    }
35}