iridis_node/primitives/
raw_output.rs

1//! This module contains implementations for this primitive.
2
3use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7/// Not typed Output to receive data from the dataflow
8pub struct RawOutput {
9    /// The senders parts of MPSC channels
10    pub tx: Vec<MessageSender>,
11    /// The shared clock of the runtime
12    pub clock: Arc<HLC>,
13
14    /// The source node layout, useful for debugging
15    pub source: NodeID,
16    /// The layout of the output, useful for debugging
17    pub layout: OutputID,
18}
19
20impl RawOutput {
21    /// Create a new RawOutput instance
22    pub fn new(tx: Vec<MessageSender>, clock: Arc<HLC>, source: NodeID, layout: OutputID) -> Self {
23        Self {
24            tx,
25            clock,
26            source,
27            layout,
28        }
29    }
30
31    /// Send a message asynchronously to all connected nodes.
32    pub async fn send(&self, data: ArrayData) -> Result<()> {
33        let data = DataflowMessage {
34            header: Header {
35                timestamp: self.clock.new_timestamp(),
36                source: (self.source.uuid, self.layout.uuid),
37            },
38            data,
39        };
40
41        let mut tasks = Vec::new();
42
43        for tx in &self.tx {
44            let tx = tx.clone();
45            let data = data.clone();
46
47            let source = self.source.clone();
48            let layout = self.layout.clone();
49
50            tasks.push(tokio::spawn(async move {
51                tx.send(data)
52                    .await
53                    .map_err(eyre::Report::msg)
54                    .wrap_err(report_error_sending(&source, layout))
55            }));
56        }
57
58        let mut results = Vec::new();
59        for task in tasks {
60            match task.await {
61                Ok(result) => results.push(result),
62                Err(err) => results.push(Err(err.into())),
63            }
64        }
65
66        if results.iter().all(|r| r.is_ok()) {
67            Ok(())
68        } else {
69            let combined_report: eyre::Report = results
70                .into_iter()
71                .filter(Result::is_err)
72                .map(Result::unwrap_err)
73                .fold(
74                    eyre::eyre!(
75                        "Node '{}' (uuid: {}) encountered multiple errors",
76                        self.source.label,
77                        self.source.uuid
78                    ),
79                    |report, e| e.wrap_err(report),
80                );
81
82            Err(combined_report)
83        }
84    }
85}