flarrow_api/io/
raw_output.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6/// Not typed Output to receive data from the dataflow
7pub struct RawOutput {
8    /// The senders parts of MPSC channels
9    pub tx: Vec<MessageSender>,
10    /// The shared clock of the runtime
11    pub clock: Arc<HLC>,
12
13    /// The source node layout, useful for debugging
14    pub source: NodeLayout,
15    /// The layout of the output, useful for debugging
16    pub layout: OutputLayout,
17}
18
19impl RawOutput {
20    /// Create a new RawOutput instance
21    pub fn new(
22        tx: Vec<MessageSender>,
23        clock: Arc<HLC>,
24        source: NodeLayout,
25        layout: OutputLayout,
26    ) -> Self {
27        Self {
28            tx,
29            clock,
30            source,
31            layout,
32        }
33    }
34
35    /// Send a message from the channel, blocking until it's possible, don't use it
36    /// in async context
37    pub fn blocking_send(&self, data: ArrayData) -> Result<()> {
38        let data = DataflowMessage {
39            header: Header {
40                timestamp: self.clock.new_timestamp(),
41                source: (self.source.uuid, self.layout.uuid),
42            },
43            data,
44        };
45
46        let results: Vec<Result<()>> = self
47            .tx
48            .iter()
49            .map(|tx| {
50                tx.blocking_send(data.clone())
51                    .map_err(eyre::Report::msg)
52                    .wrap_err(report_error_sending(&self.source, &self.layout))
53            })
54            .collect();
55
56        if results.iter().all(|r| r.is_ok()) {
57            Ok(())
58        } else {
59            let combined_report: eyre::Report = results
60                .into_iter()
61                .filter(Result::is_err)
62                .map(Result::unwrap_err)
63                .fold(
64                    eyre::eyre!(
65                        "Node '{}' (uuid: {}) encountered multiple errors",
66                        self.source.label,
67                        self.source.uuid
68                    ),
69                    |report, e| e.wrap_err(report),
70                );
71            Err(combined_report)
72        }
73    }
74
75    /// Send a message asynchronously to all connected nodes.
76    pub async fn send(&self, data: ArrayData) -> Result<()> {
77        let data = DataflowMessage {
78            header: Header {
79                timestamp: self.clock.new_timestamp(),
80                source: (self.source.uuid, self.layout.uuid),
81            },
82            data,
83        };
84
85        let mut tasks = Vec::new();
86
87        for tx in &self.tx {
88            let tx = tx.clone();
89            let data = data.clone();
90
91            let source = self.source.clone();
92            let layout = self.layout.clone();
93
94            tasks.push(tokio::spawn(async move {
95                tx.send(data)
96                    .await
97                    .map_err(eyre::Report::msg)
98                    .wrap_err(report_error_sending(&source, layout))
99            }));
100        }
101
102        let mut results = Vec::new();
103        for task in tasks {
104            match task.await {
105                Ok(result) => results.push(result),
106                Err(err) => results.push(Err(err.into())),
107            }
108        }
109
110        if results.iter().all(|r| r.is_ok()) {
111            Ok(())
112        } else {
113            let combined_report: eyre::Report = results
114                .into_iter()
115                .filter(Result::is_err)
116                .map(Result::unwrap_err)
117                .fold(
118                    eyre::eyre!(
119                        "Node '{}' (uuid: {}) encountered multiple errors",
120                        self.source.label,
121                        self.source.uuid
122                    ),
123                    |report, e| e.wrap_err(report),
124                );
125
126            Err(combined_report)
127        }
128    }
129}