iridis_api/io/
raw_output.rs1use std::sync::Arc;
2
3use crate::prelude::*;
4use thirdparty::arrow_data::ArrayData;
5
6pub struct RawOutput {
8 pub tx: Vec<MessageSender>,
10 pub clock: Arc<HLC>,
12
13 pub source: NodeLayout,
15 pub layout: OutputLayout,
17}
18
19impl RawOutput {
20 pub fn new(
22 tx: Vec<MessageSender>,
23 clock: Arc<HLC>,
24 source: NodeLayout,
25 layout: OutputLayout,
26 ) -> Self {
27 Self {
28 tx,
29 clock,
30 source,
31 layout,
32 }
33 }
34
35 pub async fn send(&self, data: ArrayData) -> Result<()> {
37 let data = DataflowMessage {
38 header: Header {
39 timestamp: self.clock.new_timestamp(),
40 source: (self.source.uuid, self.layout.uuid),
41 },
42 data,
43 };
44
45 let mut tasks = Vec::new();
46
47 for tx in &self.tx {
48 let tx = tx.clone();
49 let data = data.clone();
50
51 let source = self.source.clone();
52 let layout = self.layout.clone();
53
54 tasks.push(tokio::spawn(async move {
55 tx.send(data)
56 .await
57 .map_err(eyre::Report::msg)
58 .wrap_err(report_error_sending(&source, layout))
59 }));
60 }
61
62 let mut results = Vec::new();
63 for task in tasks {
64 match task.await {
65 Ok(result) => results.push(result),
66 Err(err) => results.push(Err(err.into())),
67 }
68 }
69
70 if results.iter().all(|r| r.is_ok()) {
71 Ok(())
72 } else {
73 let combined_report: eyre::Report = results
74 .into_iter()
75 .filter(Result::is_err)
76 .map(Result::unwrap_err)
77 .fold(
78 eyre::eyre!(
79 "Node '{}' (uuid: {}) encountered multiple errors",
80 self.source.label,
81 self.source.uuid
82 ),
83 |report, e| e.wrap_err(report),
84 );
85
86 Err(combined_report)
87 }
88 }
89}