iridis_node/primitives/
raw_output.rs1use std::sync::Arc;
4
5use crate::prelude::{thirdparty::arrow_data::ArrayData, *};
6
7pub struct RawOutput {
9 pub tx: Vec<MessageSender>,
11 pub clock: Arc<HLC>,
13
14 pub source: NodeID,
16 pub layout: OutputID,
18}
19
20impl RawOutput {
21 pub fn new(tx: Vec<MessageSender>, clock: Arc<HLC>, source: NodeID, layout: OutputID) -> Self {
23 Self {
24 tx,
25 clock,
26 source,
27 layout,
28 }
29 }
30
31 pub async fn send(&self, data: ArrayData) -> Result<()> {
33 let data = DataflowMessage {
34 header: Header {
35 timestamp: self.clock.new_timestamp(),
36 source: (self.source.uuid, self.layout.uuid),
37 },
38 data,
39 };
40
41 let mut tasks = Vec::new();
42
43 for tx in &self.tx {
44 let tx = tx.clone();
45 let data = data.clone();
46
47 let source = self.source.clone();
48 let layout = self.layout.clone();
49
50 tasks.push(tokio::spawn(async move {
51 tx.send(data)
52 .await
53 .map_err(eyre::Report::msg)
54 .wrap_err(report_error_sending(&source, layout))
55 }));
56 }
57
58 let mut results = Vec::new();
59 for task in tasks {
60 match task.await {
61 Ok(result) => results.push(result),
62 Err(err) => results.push(Err(err.into())),
63 }
64 }
65
66 if results.iter().all(|r| r.is_ok()) {
67 Ok(())
68 } else {
69 let combined_report: eyre::Report = results
70 .into_iter()
71 .filter(Result::is_err)
72 .map(Result::unwrap_err)
73 .fold(
74 eyre::eyre!(
75 "Node '{}' (uuid: {}) encountered multiple errors",
76 self.source.label,
77 self.source.uuid
78 ),
79 |report, e| e.wrap_err(report),
80 );
81
82 Err(combined_report)
83 }
84 }
85}