iridis_api/io/
raw_output.rs

1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6/// Not typed Output to receive data from the dataflow
7pub struct RawOutput {
8    /// The senders parts of MPSC channels
9    pub tx: Vec<MessageSender>,
10    /// The shared clock of the runtime
11    pub clock: Arc<HLC>,
12
13    /// The source node layout, useful for debugging
14    pub source: NodeLayout,
15    /// The layout of the output, useful for debugging
16    pub layout: OutputLayout,
17}
18
19impl RawOutput {
20    /// Create a new RawOutput instance
21    pub fn new(
22        tx: Vec<MessageSender>,
23        clock: Arc<HLC>,
24        source: NodeLayout,
25        layout: OutputLayout,
26    ) -> Self {
27        Self {
28            tx,
29            clock,
30            source,
31            layout,
32        }
33    }
34
35    /// Send a message asynchronously to all connected nodes.
36    pub async fn send(&self, data: ArrayData) -> Result<()> {
37        let data = DataflowMessage {
38            header: Header {
39                timestamp: self.clock.new_timestamp(),
40                source: (self.source.uuid, self.layout.uuid),
41            },
42            data,
43        };
44
45        let mut tasks = Vec::new();
46
47        for tx in &self.tx {
48            let tx = tx.clone();
49            let data = data.clone();
50
51            let source = self.source.clone();
52            let layout = self.layout.clone();
53
54            tasks.push(tokio::spawn(async move {
55                tx.send(data)
56                    .await
57                    .map_err(eyre::Report::msg)
58                    .wrap_err(report_error_sending(&source, layout))
59            }));
60        }
61
62        let mut results = Vec::new();
63        for task in tasks {
64            match task.await {
65                Ok(result) => results.push(result),
66                Err(err) => results.push(Err(err.into())),
67            }
68        }
69
70        if results.iter().all(|r| r.is_ok()) {
71            Ok(())
72        } else {
73            let combined_report: eyre::Report = results
74                .into_iter()
75                .filter(Result::is_err)
76                .map(Result::unwrap_err)
77                .fold(
78                    eyre::eyre!(
79                        "Node '{}' (uuid: {}) encountered multiple errors",
80                        self.source.label,
81                        self.source.uuid
82                    ),
83                    |report, e| e.wrap_err(report),
84                );
85
86            Err(combined_report)
87        }
88    }
89}