strut_sync/conduit.rs
1use std::future::pending;
2use std::sync::Arc;
3use std::time::Duration;
4use tokio::select;
5use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify};
6
7/// A conduit for a simplified request-response communication between asynchronous
8/// tasks that allows an **owner task** to listen for requests for a resource `T` from
9/// any number of **requester tasks**, and to then asynchronously serve such requests.
10///
11/// The [`Conduit`] should belong to the owner task, as it allows to listen for and
12/// serve the requests. The conduit can spawn any number of linked [`Retriever`]s,
13/// which the requester tasks may use to asynchronously retrieve the resource `T`.
14///
15/// ## Error handling
16///
17/// The conduit-retriever pair implements a communication method between asynchronous
18/// tasks, so it is only natural that communication errors may arise. Suppose two
19/// tasks are communicating: one owns a conduit, and the other owns a linked retriever.
20/// Requester task then initiates retrieving of the resource `T` from the owner task.
21/// This process is not atomic:
22///
23/// - the requester task sends a request,
24/// - the owner task receives the request,
25/// - the owner task sends a response,
26/// - the requester task receives the response.
27///
28/// Anywhere in this process one of the two asynchronous tasks may stop existing.
29/// Furthermore, even if both tasks remain in existence, the owner task may choose
30/// to not listen to incoming requests, or to not serve them (e.g., there is an
31/// error producing the resource `T`).
32///
33/// This implementation chooses a largely error-less approach to dealing with these
34/// uncertainties:
35///
36/// 1. The requester task may choose to either
37/// [wait for the response indefinitely](Retriever::anticipate) or
38/// [to deal with lack of response](Retriever::request). Another option is to wait
39/// for a response [within a timeout](Retriever::request_with_timeout).
40/// 2. The owner task may choose whether to [listen to requests](Conduit::requested)
41/// or whether to send responses.
42///
43/// Any breakage of communication caused by either of the tasks exiting is consciously
44/// treated as a normal part of the application lifecycle: it does not cause panics
45/// and is not logged.
46///
47/// # Example
48///
49/// ```rust
50/// use strut_sync::Conduit;
51/// use strut_sync::Retriever;
52/// use tokio::sync::oneshot;
53///
54/// #[tokio::main]
55/// async fn main() {
56/// // Create a conduit and a related retriever
57/// let conduit: Conduit<String> = Conduit::new();
58/// let retriever: Retriever<String> = conduit.retriever();
59///
60/// // Spawn owner task
61/// let owner = tokio::spawn(async move {
62/// // Stand-in for a valuable resource
63/// let resource: String = "valuable_resource".to_string();
64///
65/// // Await a request
66/// let sender: oneshot::Sender<String> = conduit.requested().await;
67///
68/// // Send a response
69/// sender.send(resource).unwrap();
70/// });
71///
72/// // Spawn requester task
73/// let requester = tokio::spawn(async move {
74/// // Acquire the resource
75/// let acquired_resource: String = retriever.request().await.unwrap();
76///
77/// // Assert it is what is expected
78/// assert_eq!(&acquired_resource, "valuable_resource");
79/// });
80///
81/// requester.await.unwrap();
82/// owner.await.unwrap();
83/// }
84/// ```
85#[derive(Debug)]
86pub struct Conduit<T> {
87 listener: AsyncMutex<mpsc::Receiver<oneshot::Sender<T>>>,
88 requester_template: mpsc::Sender<oneshot::Sender<T>>,
89}
90
91/// Allows asynchronously retrieving the resource `T` from the owner of the
92/// linked [`Conduit`].
93///
94/// These retrievers are light-weight and may be freely cloned and passed
95/// between asynchronous tasks.
96#[derive(Debug, Clone)]
97pub struct Retriever<T> {
98 requester: mpsc::Sender<oneshot::Sender<T>>,
99}
100
101impl<T> Conduit<T> {
102 /// Creates and returns a new [`Conduit`] that may [spawn](Self::retriever)
103 /// any number of connected [`Retriever`]s. The retrievers may be used to
104 /// request the resource `T` from the owner of the conduit.
105 pub fn new() -> Self {
106 // There is no point buffering requests: we wait for response immediately
107 // after sending request
108 let (requester_template, listener): (
109 mpsc::Sender<oneshot::Sender<T>>,
110 mpsc::Receiver<oneshot::Sender<T>>,
111 ) = mpsc::channel(1);
112
113 Self {
114 listener: AsyncMutex::new(listener),
115 requester_template,
116 }
117 }
118
119 /// Spawns and returns a [`Retriever`] that is linked to this [`Conduit`] and
120 /// can be used to request the resource `T` from the conduit’s owner. The returned
121 /// retriever may be cloned and shared among multiple asynchronous tasks.
122 pub fn retriever(&self) -> Retriever<T> {
123 Retriever {
124 requester: self.requester_template.clone(),
125 }
126 }
127
128 /// Waits until the resource `T` is requested from any of the connected
129 /// [`Retriever`]s. Upon first such request, returns the one-off sender through
130 /// which the resource `T` should be sent back to whoever requested it.
131 ///
132 /// This method should be repeatedly awaited on by the asynchronous task that
133 /// owns the resource `T`. Only one asynchronous task may listen for and
134 /// serve requests at any given moment, which is the limitation that comes
135 /// from the nature of the `mpsc` channels (see [`mpsc::Receiver::recv`]).
136 ///
137 /// ## Return type
138 ///
139 /// It is notable that this method returns a [`oneshot::Sender`], not an [`Option`]
140 /// of it.
141 ///
142 /// Technically, it is possible to receive [`None`] from [`mpsc::Receiver::recv`].
143 /// This happens iff there are no buffered messages in the `mpsc` channel **and**
144 /// the `mpsc` channel is closed. The `mpsc` channel is closed when either all
145 /// senders are dropped, or when [`mpsc::Receiver::close`] is called.
146 ///
147 /// This conduit owns at least one copy of [`mpsc::Sender`] (`requester_template`),
148 /// so dropping all senders without also dropping this conduit is not possible.
149 ///
150 /// Then, this conduit does not call [`close`](mpsc::Receiver::close) and also
151 /// does not expose ways to call it externally.
152 ///
153 /// Thus, this method takes a calculated risk of unwrapping the [`Option`] before
154 /// returning.
155 pub async fn requested(&self) -> oneshot::Sender<T> {
156 let mut listener = self.listener.lock().await;
157
158 listener.recv().await.expect(concat!(
159 "it should not be possible for the mpsc channel of this conduit to be",
160 " closed while this conduit still exists: this conduit owns both the",
161 " receiver and at least one sender, and also precludes calling `close`",
162 " on the receiver",
163 ))
164 }
165}
166
167impl<T> Retriever<T> {
168 /// Requests and retrieves the resource `T` from the owner of the linked
169 /// [`Conduit`]. Waits for the response potentially indefinitely. For example,
170 /// if the linked conduit no longer exists, this method will never return.
171 ///
172 /// ## Use cases
173 ///
174 /// This method is useful when the requester of `T` is logically unable to proceed
175 /// without it, and when it can be expected that the owner of the linked conduit
176 /// has a good reason to not return a result.
177 ///
178 /// Examples may include the owner of the database connection struggling to
179 /// establish a connection because the remote server has gone away. Another
180 /// example would be the application entering the spindown phase before exiting,
181 /// prompting the conduit’s owner to stop listening for requests.
182 ///
183 /// In such cases, this method exerts useful backpressure that prevents unwanted
184 /// processing.
185 pub async fn anticipate(&self) -> T {
186 // Happy path: request and return
187 if let Some(value) = self.request().await {
188 return value;
189 }
190
191 // If we are here, there is no longer any hope of receiving the answer,
192 // but the caller accepted the risk of waiting forever.
193
194 // Wait forever (it is up to the caller to deal with this)
195 pending::<()>().await;
196
197 // We’ll never return in this case
198 unreachable!()
199 }
200
201 /// Requests and retrieves the resource `T` from the owner of the linked
202 /// [`Conduit`]. If any communication failure occurs (such as the linked
203 /// conduit no longer exists, or the request is dropped without responding),
204 /// this method returns [`None`].
205 ///
206 /// Note that if the linked conduit still hangs on to incoming requests
207 /// without ever responding to them, this method may still wait indefinitely.
208 /// [Request with a timeout](Retriever::request_with_timeout) if necessary.
209 pub async fn request(&self) -> Option<T> {
210 // Make a one-off channel for the owner task to send the resource `T` into
211 let (oneshot_sender, oneshot_receiver) = oneshot::channel();
212
213 // Send the request and silently give up if the linked conduit doesn’t exist anymore
214 if self.requester.send(oneshot_sender).await.is_err() {
215 return None;
216 }
217
218 // Return the result or nothing in case of error
219 oneshot_receiver.await.ok()
220 }
221
222 /// Performs a [normal request](Retriever::request), but within the given
223 /// timeout. If the request is not served in time, [`None`] is returned, and
224 /// the request is dropped.
225 pub async fn request_with_timeout(&self, timeout: Duration) -> Option<T> {
226 // Create a notification mechanism for the timeout
227 let notify_in = Arc::new(Notify::new());
228 let notify_out = Arc::clone(¬ify_in);
229
230 // Start the spindown timeout
231 tokio::spawn(async move {
232 tokio::time::sleep(timeout).await;
233 notify_in.notify_one();
234 });
235
236 // Send the request, and wait for the response or for the timeout
237 select! {
238 biased;
239 response = self.request() => response,
240 _ = notify_out.notified() => None,
241 }
242 }
243}
244
245impl<T> From<&Retriever<T>> for Retriever<T>
246where
247 T: Clone,
248{
249 fn from(value: &Retriever<T>) -> Self {
250 value.clone()
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use futures::FutureExt;
258 use pretty_assertions::assert_eq;
259 use std::panic::AssertUnwindSafe;
260 use tokio::task;
261
262 #[tokio::test]
263 async fn simple_request_response() {
264 // Given
265 let conduit = Conduit::new();
266 let retriever = conduit.retriever();
267
268 // When
269 let owner_task = task::spawn(async move {
270 for i in 0..2 {
271 let request = conduit.requested().await;
272 request.send(format!("response_{}", i)).unwrap();
273 }
274 });
275
276 // When
277 let requested_value = retriever.request().await;
278 let anticipated_value = retriever.anticipate().await;
279
280 // Then
281 assert_eq!(requested_value.unwrap(), "response_0");
282 assert_eq!(anticipated_value, "response_1");
283 assert!(owner_task.await.is_ok());
284 }
285
286 #[tokio::test]
287 async fn simple_request_with_timeout() {
288 // Given
289 let conduit = Conduit::new();
290 let retriever = conduit.retriever();
291
292 // When
293 let owner_task = task::spawn(async move {
294 let request = conduit.requested().await;
295 tokio::time::sleep(Duration::from_millis(20)).await;
296 request.send("response").unwrap();
297 });
298
299 // When
300 let requested_value = retriever
301 .request_with_timeout(Duration::from_millis(10))
302 .await;
303
304 // Then
305 assert_eq!(requested_value, None);
306 assert!(owner_task.await.is_err()); // owner errors out because the request is dropped
307 }
308
309 #[tokio::test]
310 async fn multiple_requests() {
311 // Given
312 let conduit = Conduit::new();
313 let retriever = conduit.retriever();
314
315 // When
316 let owner_task = task::spawn(async move {
317 for _ in 0..5 {
318 let request = conduit.requested().await;
319 request.send("response").unwrap();
320 }
321 });
322
323 // Then
324 let mut requester_tasks = vec![];
325 for _ in 0..5 {
326 let retriever = retriever.clone();
327 let task = task::spawn(async move {
328 let result = retriever.request().await;
329 assert_eq!(result.unwrap(), "response");
330 });
331 requester_tasks.push(task);
332 }
333
334 // Then
335 for requester_task in requester_tasks {
336 assert!(requester_task.await.is_ok());
337 }
338 assert!(owner_task.await.is_ok());
339 }
340
341 #[tokio::test]
342 async fn multiple_requests_in_order() {
343 // Given
344 let conduit = Conduit::new();
345 let retriever = conduit.retriever();
346
347 // When
348 let owner_task = task::spawn(async move {
349 for scheduled_payload in 0..5 {
350 let request = conduit.requested().await;
351 request.send(scheduled_payload).unwrap();
352 }
353 });
354
355 // Then
356 let requester_task = task::spawn(async move {
357 for expected_payload in 0..5 {
358 let result = retriever.request().await;
359 assert_eq!(result.unwrap(), expected_payload);
360 }
361 });
362
363 // Then
364 assert!(requester_task.await.is_ok());
365 assert!(owner_task.await.is_ok());
366 }
367
368 #[tokio::test]
369 async fn retriever_request_send_error() {
370 // Given
371 let (requester_template, mut listener): (
372 mpsc::Sender<oneshot::Sender<usize>>,
373 mpsc::Receiver<oneshot::Sender<usize>>,
374 ) = mpsc::channel(1);
375 listener.close();
376
377 // Given
378 let retriever = Retriever {
379 requester: requester_template,
380 };
381
382 // When
383 let result = retriever.request().await;
384
385 // Then
386 assert!(result.is_none());
387 }
388
389 #[tokio::test]
390 async fn oneshot_sender_dropped() {
391 // Given
392 let conduit: Conduit<usize> = Conduit::new();
393 let retriever = conduit.retriever();
394
395 // When
396 let owner_task = task::spawn(async move {
397 // Intentionally drop the oneshot sender without sending a response
398 let _request = conduit.requested().await;
399 });
400 let result = retriever.request().await;
401
402 // Then
403 assert!(result.is_none());
404 assert!(owner_task.await.is_ok());
405 }
406
407 #[tokio::test]
408 async fn conduit_requested_none() {
409 // Given
410 let (requester_template, mut listener): (
411 mpsc::Sender<oneshot::Sender<usize>>,
412 mpsc::Receiver<oneshot::Sender<usize>>,
413 ) = mpsc::channel(1);
414 listener.close();
415
416 let conduit = Conduit {
417 listener: AsyncMutex::new(listener),
418 requester_template,
419 };
420
421 // When
422 let result = AssertUnwindSafe(async {
423 conduit.requested().await;
424 });
425 let result = result.catch_unwind().await;
426
427 assert!(result.is_err());
428 }
429}