iridis_api/io/
output.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6/// Typed Output to receive data from the dataflow
7pub struct Output<T: ArrowMessage> {
8    pub raw: RawOutput,
9    _phantom: std::marker::PhantomData<T>,
10}
11
12impl<T: ArrowMessage> Output<T> {
13    /// Create a new typed Output from a MessageSender, NodeLayout, and OutputLayout
14    pub fn new(
15        tx: Vec<MessageSender>,
16        clock: Arc<HLC>,
17        source: NodeLayout,
18        layout: OutputLayout,
19    ) -> Self {
20        Self {
21            raw: RawOutput::new(tx, clock, source, layout),
22            _phantom: std::marker::PhantomData,
23        }
24    }
25
26    /// Send a message to the output asynchronously.
27    pub async fn send(&self, data: T) -> Result<()> {
28        self.raw
29            .send(
30                data.try_into_arrow()
31                    .wrap_err(report_failed_conversion_to_arrow::<T>(
32                        &self.raw.source,
33                        &self.raw.layout,
34                    ))?
35                    .into_data(),
36            )
37            .await
38    }
39}