strut_sync/
conduit.rs

1use std::future::pending;
2use std::sync::Arc;
3use std::time::Duration;
4use tokio::select;
5use tokio::sync::{mpsc, oneshot, Mutex as AsyncMutex, Notify};
6
7/// A conduit for a simplified request-response communication between asynchronous
8/// tasks that allows an **owner task** to listen for requests for a resource `T` from
9/// any number of **requester tasks**, and to then asynchronously serve such requests.
10///
/// The [`Conduit`] should belong to the owner task, as it allows listening for and
/// serving the requests. The conduit can spawn any number of linked [`Retriever`]s,
/// which the requester tasks may use to asynchronously retrieve the resource `T`.
14///
15/// ## Error handling
16///
17/// The conduit-retriever pair implements a communication method between asynchronous
18/// tasks, so it is only natural that communication errors may arise. Suppose two
19/// tasks are communicating: one owns a conduit, and the other owns a linked retriever.
/// The requester task then initiates retrieval of the resource `T` from the owner task.
21/// This process is not atomic:
22///
23/// - the requester task sends a request,
24/// - the owner task receives the request,
25/// - the owner task sends a response,
26/// - the requester task receives the response.
27///
28/// Anywhere in this process one of the two asynchronous tasks may stop existing.
29/// Furthermore, even if both tasks remain in existence, the owner task may choose
30/// to not listen to incoming requests, or to not serve them (e.g., there is an
31/// error producing the resource `T`).
32///
33/// This implementation chooses a largely error-less approach to dealing with these
34/// uncertainties:
35///
36/// 1. The requester task may choose to either
37/// [wait for the response indefinitely](Retriever::anticipate) or
38/// [to deal with lack of response](Retriever::request). Another option is to wait
39/// for a response [within a timeout](Retriever::request_with_timeout).
40/// 2. The owner task may choose whether to [listen to requests](Conduit::requested)
41/// or whether to send responses.
42///
43/// Any breakage of communication caused by either of the tasks exiting is consciously
44/// treated as a normal part of the application lifecycle: it does not cause panics
45/// and is not logged.
46///
47/// # Example
48///
49/// ```rust
50/// use strut_sync::Conduit;
51/// use strut_sync::Retriever;
52/// use tokio::sync::oneshot;
53///
54/// #[tokio::main]
55/// async fn main() {
56///     // Create a conduit and a related retriever
57///     let conduit: Conduit<String> = Conduit::new();
58///     let retriever: Retriever<String> = conduit.retriever();
59///
60///     // Spawn owner task
61///     let owner = tokio::spawn(async move {
62///         // Stand-in for a valuable resource
63///         let resource: String = "valuable_resource".to_string();
64///
65///         // Await a request
66///         let sender: oneshot::Sender<String> = conduit.requested().await;
67///
68///         // Send a response
69///         sender.send(resource).unwrap();
70///     });
71///
72///     // Spawn requester task
73///     let requester = tokio::spawn(async move {
74///         // Acquire the resource
75///         let acquired_resource: String = retriever.request().await.unwrap();
76///
77///         // Assert it is what is expected
78///         assert_eq!(&acquired_resource, "valuable_resource");
79///     });
80///
81///     requester.await.unwrap();
82///     owner.await.unwrap();
83/// }
84/// ```
#[derive(Debug)]
pub struct Conduit<T> {
    /// Receiving half of the request channel. Each incoming message is the
    /// one-off sender through which the response should be delivered. Wrapped
    /// in an async mutex so that [`Conduit::requested`] can receive through a
    /// shared reference.
    listener: AsyncMutex<mpsc::Receiver<oneshot::Sender<T>>>,
    /// Sending half of the request channel. Cloned into every [`Retriever`]
    /// handed out by [`Conduit::retriever`]; holding it here also means at
    /// least one sender exists for as long as this conduit exists.
    requester_template: mpsc::Sender<oneshot::Sender<T>>,
}
90
91/// Allows asynchronously retrieving the resource `T` from the owner of the
92/// linked [`Conduit`].
93///
94/// These retrievers are light-weight and may be freely cloned and passed
95/// between asynchronous tasks.
96#[derive(Debug, Clone)]
97pub struct Retriever<T> {
98    requester: mpsc::Sender<oneshot::Sender<T>>,
99}
100
101impl<T> Conduit<T> {
102    /// Creates and returns a new [`Conduit`] that may [spawn](Self::retriever)
103    /// any number of connected [`Retriever`]s. The retrievers may be used to
104    /// request the resource `T` from the owner of the conduit.
105    pub fn new() -> Self {
106        // There is no point buffering requests: we wait for response immediately
107        // after sending request
108        let (requester_template, listener): (
109            mpsc::Sender<oneshot::Sender<T>>,
110            mpsc::Receiver<oneshot::Sender<T>>,
111        ) = mpsc::channel(1);
112
113        Self {
114            listener: AsyncMutex::new(listener),
115            requester_template,
116        }
117    }
118
119    /// Spawns and returns a [`Retriever`] that is linked to this [`Conduit`] and
120    /// can be used to request the resource `T` from the conduit’s owner. The returned
121    /// retriever may be cloned and shared among multiple asynchronous tasks.
122    pub fn retriever(&self) -> Retriever<T> {
123        Retriever {
124            requester: self.requester_template.clone(),
125        }
126    }
127
128    /// Waits until the resource `T` is requested from any of the connected
129    /// [`Retriever`]s. Upon first such request, returns the one-off sender through
130    /// which the resource `T` should be sent back to whoever requested it.
131    ///
132    /// This method should be repeatedly awaited on by the asynchronous task that
133    /// owns the resource `T`. Only one asynchronous task may listen for and
134    /// serve requests at any given moment, which is the limitation that comes
135    /// from the nature of the `mpsc` channels (see [`mpsc::Receiver::recv`]).
136    ///
137    /// ## Return type
138    ///
139    /// It is notable that this method returns a [`oneshot::Sender`], not an [`Option`]
140    /// of it.
141    ///
142    /// Technically, it is possible to receive [`None`] from [`mpsc::Receiver::recv`].
143    /// This happens iff there are no buffered messages in the `mpsc` channel **and**
144    /// the `mpsc` channel is closed. The `mpsc` channel is closed when either all
145    /// senders are dropped, or when [`mpsc::Receiver::close`] is called.
146    ///
147    /// This conduit owns at least one copy of [`mpsc::Sender`] (`requester_template`),
148    /// so dropping all senders without also dropping this conduit is not possible.
149    ///
150    /// Then, this conduit does not call [`close`](mpsc::Receiver::close) and also
151    /// does not expose ways to call it externally.
152    ///
153    /// Thus, this method takes a calculated risk of unwrapping the [`Option`] before
154    /// returning.
155    pub async fn requested(&self) -> oneshot::Sender<T> {
156        let mut listener = self.listener.lock().await;
157
158        listener.recv().await.expect(concat!(
159            "it should not be possible for the mpsc channel of this conduit to be",
160            " closed while this conduit still exists: this conduit owns both the",
161            " receiver and at least one sender, and also precludes calling `close`",
162            " on the receiver",
163        ))
164    }
165}
166
167impl<T> Retriever<T> {
168    /// Requests and retrieves the resource `T` from the owner of the linked
169    /// [`Conduit`]. Waits for the response potentially indefinitely. For example,
170    /// if the linked conduit no longer exists, this method will never return.
171    ///
172    /// ## Use cases
173    ///
174    /// This method is useful when the requester of `T` is logically unable to proceed
175    /// without it, and when it can be expected that the owner of the linked conduit
176    /// has a good reason to not return a result.
177    ///
178    /// Examples may include the owner of the database connection struggling to
179    /// establish a connection because the remote server has gone away. Another
180    /// example would be the application entering the spindown phase before exiting,
181    /// prompting the conduit’s owner to stop listening for requests.
182    ///
183    /// In such cases, this method exerts useful backpressure that prevents unwanted
184    /// processing.
185    pub async fn anticipate(&self) -> T {
186        // Happy path: request and return
187        if let Some(value) = self.request().await {
188            return value;
189        }
190
191        // If we are here, there is no longer any hope of receiving the answer,
192        // but the caller accepted the risk of waiting forever.
193
194        // Wait forever (it is up to the caller to deal with this)
195        pending::<()>().await;
196
197        // We’ll never return in this case
198        unreachable!()
199    }
200
201    /// Requests and retrieves the resource `T` from the owner of the linked
202    /// [`Conduit`]. If any communication failure occurs (such as the linked
203    /// conduit no longer exists, or the request is dropped without responding),
204    /// this method returns [`None`].
205    ///
206    /// Note that if the linked conduit still hangs on to incoming requests
207    /// without ever responding to them, this method may still wait indefinitely.
208    /// [Request with a timeout](Retriever::request_with_timeout) if necessary.
209    pub async fn request(&self) -> Option<T> {
210        // Make a one-off channel for the owner task to send the resource `T` into
211        let (oneshot_sender, oneshot_receiver) = oneshot::channel();
212
213        // Send the request and silently give up if the linked conduit doesn’t exist anymore
214        if self.requester.send(oneshot_sender).await.is_err() {
215            return None;
216        }
217
218        // Return the result or nothing in case of error
219        oneshot_receiver.await.ok()
220    }
221
222    /// Performs a [normal request](Retriever::request), but within the given
223    /// timeout. If the request is not served in time, [`None`] is returned, and
224    /// the request is dropped.
225    pub async fn request_with_timeout(&self, timeout: Duration) -> Option<T> {
226        // Create a notification mechanism for the timeout
227        let notify_in = Arc::new(Notify::new());
228        let notify_out = Arc::clone(&notify_in);
229
230        // Start the spindown timeout
231        tokio::spawn(async move {
232            tokio::time::sleep(timeout).await;
233            notify_in.notify_one();
234        });
235
236        // Send the request, and wait for the response or for the timeout
237        select! {
238            biased;
239            response = self.request() => response,
240            _ = notify_out.notified() => None,
241        }
242    }
243}
244
245impl<T> From<&Retriever<T>> for Retriever<T>
246where
247    T: Clone,
248{
249    fn from(value: &Retriever<T>) -> Self {
250        value.clone()
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use futures::FutureExt;
    use pretty_assertions::assert_eq;
    use std::panic::AssertUnwindSafe;
    use tokio::task;

    /// Builds a raw request channel whose receiving half has already been
    /// closed, which makes every subsequent send fail.
    fn closed_channel() -> (
        mpsc::Sender<oneshot::Sender<usize>>,
        mpsc::Receiver<oneshot::Sender<usize>>,
    ) {
        let (sender, mut receiver) = mpsc::channel(1);
        receiver.close();
        (sender, receiver)
    }

    /// One `request` and one `anticipate` are both served, in order.
    #[tokio::test]
    async fn simple_request_response() {
        // Given: a conduit with one linked retriever
        let conduit = Conduit::new();
        let retriever = conduit.retriever();

        // When: the owner serves exactly two requests
        let owner = task::spawn(async move {
            for index in 0..2 {
                let responder = conduit.requested().await;
                responder.send(format!("response_{}", index)).unwrap();
            }
        });

        // When: the requester asks twice, through both entry points
        let via_request = retriever.request().await;
        let via_anticipate = retriever.anticipate().await;

        // Then: responses arrive in the order they were served
        assert_eq!(via_request.unwrap(), "response_0");
        assert_eq!(via_anticipate, "response_1");
        assert!(owner.await.is_ok());
    }

    /// A response that arrives after the timeout is never observed.
    #[tokio::test]
    async fn simple_request_with_timeout() {
        // Given: a conduit with one linked retriever
        let conduit = Conduit::new();
        let retriever = conduit.retriever();

        // When: the owner responds later than the requester is willing to wait
        let owner = task::spawn(async move {
            let responder = conduit.requested().await;
            tokio::time::sleep(Duration::from_millis(20)).await;
            responder.send("response").unwrap();
        });

        // When: the requester gives up after a shorter timeout
        let outcome = retriever
            .request_with_timeout(Duration::from_millis(10))
            .await;

        // Then: nothing is received, and the owner fails to send because the
        // request has been dropped by then
        assert_eq!(outcome, None);
        assert!(owner.await.is_err());
    }

    /// Several cloned retrievers on separate tasks are all served.
    #[tokio::test]
    async fn multiple_requests() {
        // Given: a conduit with one linked retriever
        let conduit = Conduit::new();
        let retriever = conduit.retriever();

        // When: the owner serves five requests
        let owner = task::spawn(async move {
            for _ in 0..5 {
                let responder = conduit.requested().await;
                responder.send("response").unwrap();
            }
        });

        // When: five requester tasks each use their own clone of the retriever
        let requesters: Vec<_> = (0..5)
            .map(|_| {
                let retriever = retriever.clone();
                task::spawn(async move {
                    let received = retriever.request().await;
                    assert_eq!(received.unwrap(), "response");
                })
            })
            .collect();

        // Then: every task finishes without panicking
        for requester in requesters {
            assert!(requester.await.is_ok());
        }
        assert!(owner.await.is_ok());
    }

    /// Sequential requests from one task receive payloads in serving order.
    #[tokio::test]
    async fn multiple_requests_in_order() {
        // Given: a conduit with one linked retriever
        let conduit = Conduit::new();
        let retriever = conduit.retriever();

        // When: the owner answers with consecutive numbers
        let owner = task::spawn(async move {
            for payload in 0..5 {
                let responder = conduit.requested().await;
                responder.send(payload).unwrap();
            }
        });

        // When: a single requester asks five times in a row
        let requester = task::spawn(async move {
            for expected in 0..5 {
                let received = retriever.request().await;
                assert_eq!(received.unwrap(), expected);
            }
        });

        // Then: both tasks finish without panicking
        assert!(requester.await.is_ok());
        assert!(owner.await.is_ok());
    }

    /// `request` yields `None` when the request itself cannot be sent.
    #[tokio::test]
    async fn retriever_request_send_error() {
        // Given: a retriever wired to a closed channel (the receiver is kept
        // alive for the duration of the test)
        let (requester, _listener) = closed_channel();
        let retriever = Retriever { requester };

        // When
        let outcome = retriever.request().await;

        // Then
        assert!(outcome.is_none());
    }

    /// `request` yields `None` when the owner drops the one-off sender.
    #[tokio::test]
    async fn oneshot_sender_dropped() {
        // Given: a conduit with one linked retriever
        let conduit: Conduit<usize> = Conduit::new();
        let retriever = conduit.retriever();

        // When: the owner accepts the request but never responds
        let owner = task::spawn(async move {
            // Intentionally drop the oneshot sender without sending a response
            let _responder = conduit.requested().await;
        });
        let outcome = retriever.request().await;

        // Then
        assert!(outcome.is_none());
        assert!(owner.await.is_ok());
    }

    /// `requested` panics if the channel is closed behind the conduit's back.
    #[tokio::test]
    async fn conduit_requested_none() {
        // Given: a conduit assembled by hand around a closed channel
        let (requester_template, listener) = closed_channel();
        let conduit = Conduit {
            listener: AsyncMutex::new(listener),
            requester_template,
        };

        // When: listening for a request, with the panic captured
        let guarded = AssertUnwindSafe(async {
            conduit.requested().await;
        });
        let outcome = guarded.catch_unwind().await;

        // Then
        assert!(outcome.is_err());
    }
}