1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
use std::time::Duration;
use thiserror::Error;
use tokio::{
sync::mpsc::error::{SendTimeoutError, TrySendError},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
use crate::pipeline::{error::PipelineError, naming::PluginName};
use super::{messages, request};
/// A control handle that is not tied to a particular plugin.
///
/// Unlike [`ScopedControlHandle`], `AnonymousControlHandle` does not provide any method
/// that register new pipeline elements. You can call [`AnonymousControlHandle::scoped`] to turn an anonymous handle
/// into a scoped one.
#[derive(Clone)]
pub struct AnonymousControlHandle {
pub(super) tx: messages::Sender,
pub(super) shutdown_token: CancellationToken,
}
#[derive(Clone)]
pub struct PluginControlHandle {
pub(super) inner: AnonymousControlHandle,
pub(super) plugin: PluginName,
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DispatchError {
/// The pipeline controller was not available.
/// This happens when the pipeline is shut down before dispatching the request.
#[error("dispatch failed: pipeline controller not available")]
NotAvailable,
/// The deadline has expired.
#[error("dispatch failed: timeout expired")]
Timeout,
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SendWaitError {
/// The pipeline controlled was not available.
/// This happens when the pipeline is shut down before processing the request.
#[error("send_wait failed: pipeline controller not available")]
NotAvailable,
/// The deadline has expired.
#[error("send_wait failed: timeout expired")]
Timeout,
/// The request was processed by the pipeline controller, but it returned an error.
///
/// This does not always mean that the entire operation failed.
/// It could be a partial failure. For instance, if your requested the creation of
/// multiple elements, some of them may have been created successfully while others
/// have failed.
#[error("send_wait failed: processing the request returned an error")]
Operation(#[source] PipelineError),
}
pub enum OnBackgroundError {
Log,
}
impl AnonymousControlHandle {
pub fn with_plugin(self, plugin: PluginName) -> PluginControlHandle {
PluginControlHandle { inner: self, plugin }
}
/// Shuts the pipeline down.
pub fn shutdown(&self) {
self.shutdown_token.cancel();
}
/// Sends a control request to the pipeline, without waiting for a response.
///
/// # Errors
/// If the pipeline has been shut down, returns a `NotAvailable` error.
#[allow(private_bounds)] // intended: only us should be able to implement request traits
pub async fn dispatch(
&self,
request: impl request::AnonymousControlRequest,
timeout: impl Into<Option<Duration>>,
) -> Result<(), DispatchError> {
self.impl_dispatch(request.serialize(), timeout.into()).await
}
/// Sends a control request to the pipeline, and waits for a response.
///
/// Unlike [`dispatch`], `send_wait` waits for the request to be processed
/// by the pipeline and returns its result.
///
/// # Errors
/// If the pipeline is shut down before the request is processed, the function
/// returns a `NotAvailable` error.
#[allow(private_bounds)]
pub async fn send_wait<R>(
&self,
request: impl request::AnonymousControlRequest<OkResponse = R>,
timeout: impl Into<Option<Duration>>,
) -> Result<R, SendWaitError> {
let (msg, rx) = request.serialize_with_response();
self.impl_send_wait(msg, rx, timeout.into()).await
}
/// Sends a request without waiting for a response and without blocking.
///
/// If the request cannot be sent immediately, spawns a background task on the current Tokio runtime.
/// Background task failures are handled according to the `on_error` strategy.
///
/// # Errors
/// If the pipeline has been shut down, returns a `NotAvailable` error.
///
/// # Panics
/// Panics if not called in the context of a Tokio runtime.
#[allow(private_bounds)]
pub fn dispatch_in_current_runtime(
&self,
request: impl request::AnonymousControlRequest,
timeout: impl Into<Option<Duration>>,
on_error: OnBackgroundError,
) -> Result<(), DispatchError> {
let _ = on_error; // there is only one strategy, it's hardcoded in impl
self.impl_dispatch_in_current_runtime(request.serialize(), timeout.into())
.map(|_| ())
}
async fn impl_dispatch(
&self,
msg: messages::ControlRequest,
timeout: Option<Duration>,
) -> Result<(), DispatchError> {
match timeout {
Some(timeout) => self.tx.send_timeout(msg, timeout).await.map_err(|e| match e {
SendTimeoutError::Timeout(_) => DispatchError::Timeout,
SendTimeoutError::Closed(_) => DispatchError::NotAvailable,
}),
None => self.tx.send(msg).await.map_err(|_| DispatchError::NotAvailable),
}
}
async fn impl_send_wait<R>(
&self,
msg: messages::ControlRequest,
rx: impl request::ResponseReceiver<Ok = R>,
timeout: Option<Duration>,
) -> Result<R, SendWaitError> {
// send the message
match timeout {
Some(timeout) => self.tx.send_timeout(msg, timeout).await.map_err(|e| match e {
SendTimeoutError::Timeout(_) => SendWaitError::Timeout,
SendTimeoutError::Closed(_) => SendWaitError::NotAvailable,
}),
None => self.tx.send(msg).await.map_err(|_| SendWaitError::NotAvailable),
}?;
// wait for a response
match rx.recv().await {
Ok(result) => match result {
Ok(ret) => Ok(ret),
Err(err) => Err(SendWaitError::Operation(err)),
},
Err(_recv_error) => Err(SendWaitError::NotAvailable),
}
}
fn impl_dispatch_in_current_runtime(
&self,
msg: messages::ControlRequest,
timeout: Option<Duration>,
) -> Result<Option<JoinHandle<Result<(), DispatchError>>>, DispatchError> {
// get the handle to the current runtime
let current = tokio::runtime::Handle::try_current()
.expect("dispatch_in_current_runtime must be called within a Tokio runtime. If you are not in a thread that is managed by Alumet, a potential solution is to create a runtime yourself.");
// attempt to send the message right now
match self.tx.try_send(msg) {
Ok(()) => Ok(None),
Err(TrySendError::Closed(_)) => Err(DispatchError::NotAvailable),
Err(TrySendError::Full(msg)) => {
// the message queue is full, we need to wait in an async task
let control_handle = self.clone();
let task_handle = current.spawn(async move {
let res = control_handle.impl_dispatch(msg, timeout).await;
if let Err(e) = &res {
log::error!("dispacth failed in background: {e:?}");
}
res
});
Ok(Some(task_handle))
}
}
}
}
impl PluginControlHandle {
pub fn anonymous(self) -> AnonymousControlHandle {
self.inner
}
/// Sends a control request to the pipeline, without waiting for a response.
///
/// # Errors
/// If the pipeline has been shut down, returns a `NotAvailable` error.
#[allow(private_bounds)]
pub async fn dispatch(
&self,
request: impl request::PluginControlRequest,
timeout: impl Into<Option<Duration>>,
) -> Result<(), DispatchError> {
let body = request.serialize(&self.plugin);
self.inner.impl_dispatch(body, timeout.into()).await
}
/// Sends a control request to the pipeline, and waits for a response.
///
/// Unlike [`dispatch`], `send_wait` waits for the request to be processed
/// by the pipeline and returns its result.
///
/// # Errors
/// If the pipeline is shut down before the request is processed, the function
/// returns a `NotAvailable` error.
#[allow(private_bounds)]
pub async fn send_wait<R>(
&self,
request: impl request::PluginControlRequest<OkResponse = R>,
timeout: impl Into<Option<Duration>>,
) -> Result<R, SendWaitError> {
let (msg, rx) = request.serialize_with_response(&self.plugin);
self.inner.impl_send_wait(msg, rx, timeout.into()).await
}
/// Shuts the pipeline down.
pub fn shutdown(&self) {
self.inner.shutdown();
}
/// Sends a request without waiting for a response and without blocking.
///
/// If the request cannot be sent immediately, spawns a background task on the current Tokio runtime.
/// Background task failures are handled according to the `on_error` strategy.
///
/// # Errors
/// If the pipeline has been shut down, returns a `NotAvailable` error.
///
/// # Panics
/// Panics if not called in the context of a Tokio runtime.
#[allow(private_bounds)]
pub fn dispatch_in_current_runtime(
&self,
request: impl request::PluginControlRequest,
timeout: impl Into<Option<Duration>>,
on_error: OnBackgroundError,
) -> Result<(), DispatchError> {
let _ = on_error;
let request = request.serialize(&self.plugin);
self.inner
.impl_dispatch_in_current_runtime(request, timeout.into())
.map(|_| ())
}
}
#[cfg(test)]
mod tests {
use crate::pipeline::util::assert_send;
use super::{AnonymousControlHandle, PluginControlHandle};
#[test]
fn types() {
assert_send::<AnonymousControlHandle>();
assert_send::<PluginControlHandle>();
}
}