// flarrow_api/io/raw_output.rs
use std::sync::Arc;

use crate::prelude::*;
use thirdparty::arrow_data::ArrayData;
5
6pub struct RawOutput {
8 pub tx: Vec<MessageSender>,
10 pub clock: Arc<HLC>,
12
13 pub source: NodeLayout,
15 pub layout: OutputLayout,
17}
18
19impl RawOutput {
20 pub fn new(
22 tx: Vec<MessageSender>,
23 clock: Arc<HLC>,
24 source: NodeLayout,
25 layout: OutputLayout,
26 ) -> Self {
27 Self {
28 tx,
29 clock,
30 source,
31 layout,
32 }
33 }
34
35 pub fn blocking_send(&self, data: ArrayData) -> Result<()> {
38 let data = DataflowMessage {
39 header: Header {
40 timestamp: self.clock.new_timestamp(),
41 source: (self.source.uuid, self.layout.uuid),
42 },
43 data,
44 };
45
46 let results: Vec<Result<()>> = self
47 .tx
48 .iter()
49 .map(|tx| {
50 tx.blocking_send(data.clone())
51 .map_err(eyre::Report::msg)
52 .wrap_err(report_error_sending(&self.source, &self.layout))
53 })
54 .collect();
55
56 if results.iter().all(|r| r.is_ok()) {
57 Ok(())
58 } else {
59 let combined_report: eyre::Report = results
60 .into_iter()
61 .filter(Result::is_err)
62 .map(Result::unwrap_err)
63 .fold(
64 eyre::eyre!(
65 "Node '{}' (uuid: {}) encountered multiple errors",
66 self.source.label,
67 self.source.uuid
68 ),
69 |report, e| e.wrap_err(report),
70 );
71 Err(combined_report)
72 }
73 }
74
75 pub async fn send(&self, data: ArrayData) -> Result<()> {
77 let data = DataflowMessage {
78 header: Header {
79 timestamp: self.clock.new_timestamp(),
80 source: (self.source.uuid, self.layout.uuid),
81 },
82 data,
83 };
84
85 let mut tasks = Vec::new();
86
87 for tx in &self.tx {
88 let tx = tx.clone();
89 let data = data.clone();
90
91 let source = self.source.clone();
92 let layout = self.layout.clone();
93
94 tasks.push(tokio::spawn(async move {
95 tx.send(data)
96 .await
97 .map_err(eyre::Report::msg)
98 .wrap_err(report_error_sending(&source, layout))
99 }));
100 }
101
102 let mut results = Vec::new();
103 for task in tasks {
104 match task.await {
105 Ok(result) => results.push(result),
106 Err(err) => results.push(Err(err.into())),
107 }
108 }
109
110 if results.iter().all(|r| r.is_ok()) {
111 Ok(())
112 } else {
113 let combined_report: eyre::Report = results
114 .into_iter()
115 .filter(Result::is_err)
116 .map(Result::unwrap_err)
117 .fold(
118 eyre::eyre!(
119 "Node '{}' (uuid: {}) encountered multiple errors",
120 self.source.label,
121 self.source.uuid
122 ),
123 |report, e| e.wrap_err(report),
124 );
125
126 Err(combined_report)
127 }
128 }
129}