flarrow_api/io/
output.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_array::Array;
5
6/// Typed Output to receive data from the dataflow
7pub struct Output<T: ArrowMessage> {
8    pub raw: RawOutput,
9    _phantom: std::marker::PhantomData<T>,
10}
11
12impl<T: ArrowMessage> Output<T> {
13    /// Create a new typed Output from a MessageSender, NodeLayout, and OutputLayout
14    pub fn new(
15        tx: Vec<MessageSender>,
16        clock: Arc<HLC>,
17        source: NodeLayout,
18        layout: OutputLayout,
19    ) -> Self {
20        Self {
21            raw: RawOutput::new(tx, clock, source, layout),
22            _phantom: std::marker::PhantomData,
23        }
24    }
25
26    /// Send a message to the output, blocking the current thread until the message is sent.
27    /// Don't use in async context
28    pub fn blocking_send(&self, data: T) -> Result<()> {
29        self.raw.blocking_send(
30            data.try_into_arrow()
31                .wrap_err(report_failed_conversion_to_arrow::<T>(
32                    &self.raw.source,
33                    &self.raw.layout,
34                ))?
35                .into_data(),
36        )
37    }
38
39    /// Send a message to the output asynchronously.
40    pub async fn send(&self, data: T) -> Result<()> {
41        self.raw
42            .send(
43                data.try_into_arrow()
44                    .wrap_err(report_failed_conversion_to_arrow::<T>(
45                        &self.raw.source,
46                        &self.raw.layout,
47                    ))?
48                    .into_data(),
49            )
50            .await
51    }
52}