1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
//! On-the-fly modification of the pipeline.
use crate::pipeline::control::messages::RequestMessage;
use crate::pipeline::error::PipelineError;
use crate::pipeline::elements::{output, source, transform};
use anyhow::anyhow;
use tokio::runtime;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use super::messages::SpecificBody;
use super::{messages, AnonymousControlHandle};
/// Encapsulates sources, transforms and outputs control.
///
/// Holds the control structure of each stage of the measurement pipeline.
/// [`PipelineControl::start`] takes it by value and moves it into the spawned control task.
pub(crate) struct PipelineControl {
    /// Control of the source elements (stopped first during shutdown).
    sources: source::control::SourceControl,
    /// Control of the transform elements (stopped after the sources).
    transforms: transform::control::TransformControl,
    /// Control of the output elements (stopped last during shutdown).
    outputs: output::control::OutputControl,
}
impl PipelineControl {
    /// Creates a new `PipelineControl` from the control structure of each pipeline stage.
    pub fn new(
        sources: source::control::SourceControl,
        transforms: transform::control::TransformControl,
        outputs: output::control::OutputControl,
    ) -> Self {
        Self {
            sources,
            transforms,
            outputs,
        }
    }

    /// Spawns the control loop (see [`PipelineControl::run`]) on the runtime `on`.
    ///
    /// Returns:
    /// - a handle that sends control messages to the loop and carries the `shutdown` token;
    /// - the [`JoinHandle`] of the spawned control task, which resolves to the most recent
    ///   error observed while the pipeline was running (or `Ok(())`).
    ///
    /// `finalize_shutdown` is cancelled by the control task once every element has stopped.
    pub fn start(
        self,
        shutdown: CancellationToken,
        finalize_shutdown: CancellationToken,
        on: &runtime::Handle,
    ) -> (AnonymousControlHandle, JoinHandle<Result<(), PipelineError>>) {
        /// Capacity of the bounded channel that carries control messages to the control loop.
        const CONTROL_CHANNEL_CAPACITY: usize = 256;

        let (tx, rx) = mpsc::channel(CONTROL_CHANNEL_CAPACITY);
        let task = self.run(shutdown.clone(), finalize_shutdown, rx);
        let control_handle = AnonymousControlHandle {
            tx,
            shutdown_token: shutdown,
        };
        let task_handle = on.spawn(task);
        (control_handle, task_handle)
    }

    /// Forwards a single message to the control structure of the stage it targets.
    async fn handle_specific_msg(&mut self, body: SpecificBody) -> anyhow::Result<()> {
        match body {
            messages::SpecificBody::Source(msg) => self.sources.handle_message(msg).await,
            messages::SpecificBody::Transform(msg) => self.transforms.handle_message(msg),
            messages::SpecificBody::Output(msg) => self.outputs.handle_message(msg).await,
        }
    }

    /// Handles one control request and, if the sender provided a response channel,
    /// sends the outcome back through it.
    ///
    /// # Errors
    /// Returns an error if the response could not be delivered. Also returns an error if the
    /// request failed and the sender did not provide a response channel; otherwise that error
    /// would be lost.
    async fn handle_message(&mut self, msg: messages::ControlRequest) -> Result<(), PipelineError> {
        /// Responds to a message with a value of type `Result<R, PipelineError>`.
        fn send_response<R>(
            result: Result<R, PipelineError>,
            response_tx: Option<messages::ResponseSender<R>>,
        ) -> Result<(), PipelineError> {
            match response_tx {
                Some(tx) => tx
                    .send(result)
                    .map_err(|_| PipelineError::internal(anyhow!("failed to send control response"))),
                None => {
                    // The sender does not care about the response: discard the value but
                    // still propagate a failure, so that it gets logged by the control loop.
                    result.map(|_| ())
                }
            }
        }

        // ControlRequest uses variants for each response type.
        match msg {
            messages::ControlRequest::NoResult(RequestMessage { response_tx, body }) => {
                let result = match body {
                    messages::EmptyResponseBody::Single(msg) => self.handle_specific_msg(msg).await,
                    messages::EmptyResponseBody::Mixed(msgs) => {
                        // Apply the messages in order and stop at the first failure.
                        // Messages applied before the failure are NOT rolled back.
                        // The error is NOT returned early with `?`: it must reach `send_response`
                        // below, otherwise the requester would never get a response.
                        // TODO handle multiple errors in a smarter way
                        // "atomic transactions" with rollback?
                        let mut outcome = Ok(());
                        for msg in msgs {
                            if let Err(e) = self.handle_specific_msg(msg).await {
                                outcome = Err(e);
                                break;
                            }
                        }
                        outcome
                    }
                };
                send_response(result.map_err(PipelineError::internal), response_tx)
            }
            messages::ControlRequest::Introspect(RequestMessage { response_tx, body }) => {
                let result = match body {
                    messages::IntrospectionBody::ListElements(filter) => {
                        let mut buf = Vec::new();
                        self.sources.list_elements(&mut buf, &filter);
                        self.transforms.list_elements(&mut buf, &filter);
                        self.outputs.list_elements(&mut buf, &filter);
                        Ok(buf)
                    }
                };
                send_response(result, response_tx)
            }
        }
    }

    /// Main control loop of the measurement pipeline.
    ///
    /// The role of this function is to "oversee" the operation of the pipeline by:
    /// - checking if the pipeline should be shut down
    /// - receiving control messages and forwarding them to the appropriate handling code
    /// - polling the async tasks to cleanup the tasks that have finished
    ///
    /// When the pipeline is requested to shut down, `run` exits from the control loop and
    /// waits for the elements to finish: first the sources, then the transforms, then the
    /// outputs. It then cancels `finalize_shutdown`. This can take an arbitrarily long time to
    /// complete (e.g. because of a bug in an element), therefore `run` should be wrapped in
    /// [`tokio::time::timeout`].
    ///
    /// If the control channel gets closed (every sender has been dropped), the loop stops
    /// receiving messages but keeps overseeing the pipeline until shutdown is requested.
    ///
    /// # Errors
    /// Returns the most recent error observed, either while handling a control message or
    /// reported by an element task that finished.
    async fn run(
        mut self,
        init_shutdown: CancellationToken,
        finalize_shutdown: CancellationToken,
        mut rx: messages::Receiver,
    ) -> Result<(), PipelineError> {
        /// Logs the outcome of a finished element task and records it in `result` if it failed.
        fn task_finished(
            res: Result<Result<(), PipelineError>, tokio::task::JoinError>,
            kind: &'static str,
            result: &mut Result<(), PipelineError>,
        ) {
            match res {
                Ok(Ok(())) => log::debug!("One {kind} task finished without error."),
                Ok(Err(e_normal)) => {
                    log::error!("One {kind} task finished with error: {e_normal}");
                    *result = Err(e_normal);
                }
                Err(e) if e.is_cancelled() => {
                    log::error!("{kind} cancelled: {e:?}");
                    *result = Err(PipelineError::internal(e.into()));
                }
                Err(e_panic) => {
                    log::error!("One {kind} task panicked with error: {e_panic:?}");
                    *result = Err(PipelineError::internal(e_panic.into()));
                }
            }
        }

        // Keep track of the most recent error, so we can propagate it to the agent.
        // It is particularly useful in tests, to assert that no error occurred.
        let mut last_error: Result<(), PipelineError> = Ok(());

        // `false` once every sender of the control channel has been dropped. After that point,
        // `rx.recv()` would return `None` immediately on every iteration, so its branch must
        // be disabled to avoid a busy loop.
        let mut rx_open = true;

        // `false` if listening for Ctrl+C failed. A failed `ctrl_c()` resolves immediately,
        // so its branch must be disabled to avoid a busy loop (and a spurious shutdown).
        let mut listen_ctrl_c = true;

        loop {
            tokio::select! {
                _ = init_shutdown.cancelled() => {
                    // The main way to shutdown the pipeline is to cancel the `shutdown` token.
                    // Stop the control loop and shut every element down.
                    break;
                },
                signal = tokio::signal::ctrl_c(), if listen_ctrl_c => {
                    match signal {
                        Ok(()) => {
                            // Another way to shutdown the pipeline is to send SIGINT, usually with Ctrl+C.
                            // Tokio's ctrl_c() also handles Ctrl+C on Windows.
                            log::info!("Ctrl+C received, shutting down...");
                            // The token can have child tokens, therefore we need to cancel it instead of simply breaking.
                            // The next iteration sees the cancelled token and breaks out of the loop.
                            init_shutdown.cancel();
                        }
                        Err(e) => {
                            log::error!("unable to listen for Ctrl+C, this shutdown method is disabled: {e}");
                            listen_ctrl_c = false;
                        }
                    }
                },
                message = rx.recv(), if rx_open => {
                    // A control message has been received, or the channel has been closed.
                    match message {
                        Some(msg) => {
                            log::trace!("handling {msg:?}");
                            if let Err(e) = self.handle_message(msg).await {
                                log::error!("error in message handling: {e:?}");
                                last_error = Err(e);
                            }
                        },
                        None => {
                            log::warn!("pipeline control channel closed: no more control messages can be received");
                            rx_open = false;
                        },
                    }
                },
                // Below we asynchronously poll the source, transform and output tasks, in order to detect
                // when one of them finishes before the entire pipeline is shut down.
                //
                // IMPORTANT: if a JoinSet is empty, `join_next_task` will immediately return
                // `Poll::Ready(None)` when polled, which will cause an infinite loop.
                //
                // The solution is to NOT poll `join_next_task` if there is no task in the set.
                // The condition `has_task` reads a single boolean variable, hence it's very cheap.
                //
                // Since the only way to add new tasks to the JoinSet is to send a control message,
                // and this is handled by a separate branch, we are good.
                // NOTE: if the above paragraph becomes untrue, another solution needs to be found.
                //
                // Example scenario:
                // - loop, sources JoinSet empty => branch disabled, we only poll cancelled(), ctrl_c() and rx.recv()
                // - we receive a message, add a source to the JoinSet
                // - loop, sources JoinSet not empty => branch enabled
                res = self.sources.join_next_task(), if self.sources.has_task() => {
                    task_finished(res, "source", &mut last_error);
                },
                res = self.transforms.join_next_task(), if self.transforms.has_task() => {
                    task_finished(res, "transform", &mut last_error);
                },
                res = self.outputs.join_next_task(), if self.outputs.has_task() => {
                    task_finished(res, "output", &mut last_error);
                },
            }
        }

        log::debug!("Pipeline control task shutting down...");

        // Stop the elements, waiting for each step of the pipeline to finish before stopping the next one.
        log::trace!("waiting for sources to finish");
        self.sources
            .shutdown(|res| task_finished(res, "source", &mut last_error))
            .await;

        log::trace!("waiting for transforms to finish");
        self.transforms
            .shutdown(|res| task_finished(res, "transform", &mut last_error))
            .await;

        log::trace!("waiting for outputs to finish");
        self.outputs
            .shutdown(|res| task_finished(res, "output", &mut last_error))
            .await;

        // Finalize the shutdown sequence by cancelling the remaining things.
        finalize_shutdown.cancel();
        last_error
    }
}