1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::sync::Arc;
use crate::prelude::*;
use thirdparty::arrow_data::ArrayData;
/// Untyped output of a node: it forwards raw `ArrayData` payloads into the dataflow.
///
/// Each payload passed to `send` / `blocking_send` is wrapped in a `DataflowMessage`
/// stamped with a fresh timestamp from `clock` and with the `(source.uuid, layout.uuid)`
/// pair, then delivered to every sender stored in `tx`.
pub struct RawOutput {
/// The sender halves of the MPSC channels; every outgoing message is sent on each of them
pub tx: Vec<MessageSender>,
/// The shared clock of the runtime, used to timestamp outgoing messages
pub clock: Arc<HLC>,
/// The layout of the source node; its `uuid` is written into message headers and its
/// `label` / `uuid` appear in error reports
pub source: NodeLayout,
/// The layout of this output; its `uuid` is written into message headers
pub layout: OutputLayout,
}
impl RawOutput {
/// Create a new RawOutput instance
pub fn new(
tx: Vec<MessageSender>,
clock: Arc<HLC>,
source: NodeLayout,
layout: OutputLayout,
) -> Self {
Self {
tx,
clock,
source,
layout,
}
}
/// Send a message from the channel, blocking until it's possible, don't use it
/// in async context
pub fn blocking_send(&self, data: ArrayData) -> Result<()> {
let data = DataflowMessage {
header: Header {
timestamp: self.clock.new_timestamp(),
source: (self.source.uuid, self.layout.uuid),
},
data,
};
let results: Vec<Result<()>> = self
.tx
.iter()
.map(|tx| {
tx.blocking_send(data.clone())
.map_err(eyre::Report::msg)
.wrap_err(report_error_sending(&self.source, &self.layout))
})
.collect();
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let combined_report: eyre::Report = results
.into_iter()
.filter(Result::is_err)
.map(Result::unwrap_err)
.fold(
eyre::eyre!(
"Node '{}' (uuid: {}) encountered multiple errors",
self.source.label,
self.source.uuid
),
|report, e| e.wrap_err(report),
);
Err(combined_report)
}
}
/// Send a message asynchronously to all connected nodes.
pub async fn send(&self, data: ArrayData) -> Result<()> {
let data = DataflowMessage {
header: Header {
timestamp: self.clock.new_timestamp(),
source: (self.source.uuid, self.layout.uuid),
},
data,
};
let mut tasks = Vec::new();
for tx in &self.tx {
let tx = tx.clone();
let data = data.clone();
let source = self.source.clone();
let layout = self.layout.clone();
tasks.push(tokio::spawn(async move {
tx.send(data)
.await
.map_err(eyre::Report::msg)
.wrap_err(report_error_sending(&source, layout))
}));
}
let mut results = Vec::new();
for task in tasks {
match task.await {
Ok(result) => results.push(result),
Err(err) => results.push(Err(err.into())),
}
}
if results.iter().all(|r| r.is_ok()) {
Ok(())
} else {
let combined_report: eyre::Report = results
.into_iter()
.filter(Result::is_err)
.map(Result::unwrap_err)
.fold(
eyre::eyre!(
"Node '{}' (uuid: {}) encountered multiple errors",
self.source.label,
self.source.uuid
),
|report, e| e.wrap_err(report),
);
Err(combined_report)
}
}
}