Skip to main content

commonware_utils/channel/
fallible.rs

1//! Utilities for graceful channel shutdown handling.
2//!
3//! When actors communicate via channels, senders may outlive receivers during shutdown.
4//! This module provides extension traits and helpers that handle disconnection gracefully
5//! rather than panicking.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use commonware_utils::channel::fallible::FallibleExt;
11//!
12//! // Fire-and-forget: silently ignore disconnection
13//! sender.send_lossy(Message::Shutdown);
14//!
15//! // Request-response: return None on disconnection
16//! let result = sender.request(|tx| Message::Query { responder: tx }).await;
17//! ```
18
19use super::{
20    mpsc, oneshot,
21    reservation::{Reservation, ReservationExt},
22};
23use std::future::Future;
24
25/// Extension trait for channel operations that may fail due to disconnection.
26///
27/// Use these methods when the receiver may be dropped during shutdown
28/// and you want to handle that gracefully rather than panicking.
29pub trait FallibleExt<T> {
30    /// Send a message, returning `true` if successful.
31    ///
32    /// Use this for fire-and-forget messages where the receiver
33    /// may have been dropped during shutdown. The return value can
34    /// be ignored if the caller doesn't need to know whether the
35    /// send succeeded.
36    fn send_lossy(&self, msg: T) -> bool;
37
38    /// Send a request message containing a oneshot responder and await the response.
39    ///
40    /// Returns `None` if:
41    /// - The receiver has been dropped (send fails)
42    /// - The responder is dropped without sending (receive fails)
43    ///
44    /// # Example
45    ///
46    /// ```ignore
47    /// let dialable: Option<Vec<PublicKey>> = sender
48    ///     .request(|tx| Message::Dialable { responder: tx })
49    ///     .await;
50    /// ```
51    fn request<R, F>(&self, make_msg: F) -> impl Future<Output = Option<R>> + Send
52    where
53        R: Send,
54        F: FnOnce(oneshot::Sender<R>) -> T + Send;
55
56    /// Send a request and return the provided default on failure.
57    ///
58    /// This is a convenience wrapper around [`request`](Self::request) for cases
59    /// where you have a sensible default value.
60    fn request_or<R, F>(&self, make_msg: F, default: R) -> impl Future<Output = R> + Send
61    where
62        R: Send,
63        F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65    /// Send a request and return `R::default()` on failure.
66    ///
67    /// This is a convenience wrapper around [`request`](Self::request) for types
68    /// that implement [`Default`].
69    fn request_or_default<R, F>(&self, make_msg: F) -> impl Future<Output = R> + Send
70    where
71        R: Default + Send,
72        F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76    fn send_lossy(&self, msg: T) -> bool {
77        self.send(msg).is_ok()
78    }
79
80    async fn request<R, F>(&self, make_msg: F) -> Option<R>
81    where
82        R: Send,
83        F: FnOnce(oneshot::Sender<R>) -> T + Send,
84    {
85        let (tx, rx) = oneshot::channel();
86        if self.send(make_msg(tx)).is_err() {
87            return None;
88        }
89        rx.await.ok()
90    }
91
92    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93    where
94        R: Send,
95        F: FnOnce(oneshot::Sender<R>) -> T + Send,
96    {
97        self.request(make_msg).await.unwrap_or(default)
98    }
99
100    async fn request_or_default<R, F>(&self, make_msg: F) -> R
101    where
102        R: Default + Send,
103        F: FnOnce(oneshot::Sender<R>) -> T + Send,
104    {
105        self.request(make_msg).await.unwrap_or_default()
106    }
107}
108
109/// Extension trait for bounded channel operations that may fail due to disconnection.
110///
111/// Similar to [`FallibleExt`] but for bounded channels where send operations are async.
112pub trait AsyncFallibleExt<T> {
113    /// Send a message asynchronously, returning `true` if successful.
114    ///
115    /// Use this for fire-and-forget messages where the receiver
116    /// may have been dropped during shutdown. The return value can
117    /// be ignored if the caller doesn't need to know whether the
118    /// send succeeded.
119    fn send_lossy(&self, msg: T) -> impl Future<Output = bool> + Send;
120
121    /// Try to send a message without blocking, returning `true` if successful.
122    ///
123    /// Use this for fire-and-forget messages where you don't want to wait
124    /// if the channel is full. Returns `false` if the channel is full or
125    /// disconnected.
126    fn try_send_lossy(&self, msg: T) -> bool;
127
128    /// Attempts to send immediately, reserving the message when the channel is full.
129    ///
130    /// Returns `None` if the value was sent immediately or the receiver has been dropped.
131    #[must_use = "await and send any reservation"]
132    fn send_or_reserve_lossy(&self, msg: T) -> Option<Reservation<T>>
133    where
134        T: 'static;
135
136    /// Send a request message containing a oneshot responder and await the response.
137    ///
138    /// Returns `None` if:
139    /// - The receiver has been dropped (send fails)
140    /// - The responder is dropped without sending (receive fails)
141    fn request<R, F>(&self, make_msg: F) -> impl Future<Output = Option<R>> + Send
142    where
143        R: Send,
144        F: FnOnce(oneshot::Sender<R>) -> T + Send;
145
146    /// Send a request and return the provided default on failure.
147    fn request_or<R, F>(&self, make_msg: F, default: R) -> impl Future<Output = R> + Send
148    where
149        R: Send,
150        F: FnOnce(oneshot::Sender<R>) -> T + Send;
151
152    /// Send a request and return `R::default()` on failure.
153    fn request_or_default<R, F>(&self, make_msg: F) -> impl Future<Output = R> + Send
154    where
155        R: Default + Send,
156        F: FnOnce(oneshot::Sender<R>) -> T + Send;
157}
158
159impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
160    async fn send_lossy(&self, msg: T) -> bool {
161        self.send(msg).await.is_ok()
162    }
163
164    fn try_send_lossy(&self, msg: T) -> bool {
165        self.try_send(msg).is_ok()
166    }
167
168    fn send_or_reserve_lossy(&self, msg: T) -> Option<Reservation<T>>
169    where
170        T: 'static,
171    {
172        self.send_or_reserve(msg).ok().flatten()
173    }
174
175    async fn request<R, F>(&self, make_msg: F) -> Option<R>
176    where
177        R: Send,
178        F: FnOnce(oneshot::Sender<R>) -> T + Send,
179    {
180        let (tx, rx) = oneshot::channel();
181        if self.send(make_msg(tx)).await.is_err() {
182            return None;
183        }
184        rx.await.ok()
185    }
186
187    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
188    where
189        R: Send,
190        F: FnOnce(oneshot::Sender<R>) -> T + Send,
191    {
192        self.request(make_msg).await.unwrap_or(default)
193    }
194
195    async fn request_or_default<R, F>(&self, make_msg: F) -> R
196    where
197        R: Default + Send,
198        F: FnOnce(oneshot::Sender<R>) -> T + Send,
199    {
200        self.request(make_msg).await.unwrap_or_default()
201    }
202}
203
/// Disconnection-tolerant sending for oneshot senders.
///
/// The receiving side of a oneshot channel may already have been dropped
/// during shutdown; this trait lets the sender cope with that without
/// panicking.
pub trait OneshotExt<T> {
    /// Sends `msg`, consuming the sender, and reports whether it succeeded.
    ///
    /// Intended for fire-and-forget responses whose requester may have gone
    /// away during shutdown. Callers that do not care about the outcome are
    /// free to ignore the returned flag.
    fn send_lossy(self, msg: T) -> bool;
}
219
220impl<T> OneshotExt<T> for oneshot::Sender<T> {
221    fn send_lossy(self, msg: T) -> bool {
222        self.send(msg).is_ok()
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_async;

    /// Message type used to exercise both fire-and-forget and
    /// request/response paths.
    #[derive(Debug)]
    #[allow(dead_code)]
    enum TestMessage {
        FireAndForget(u32),
        Request {
            responder: oneshot::Sender<String>,
        },
        RequestBool {
            responder: oneshot::Sender<bool>,
        },
        RequestVec {
            responder: oneshot::Sender<Vec<u32>>,
        },
    }

    // FallibleExt tests for unbounded channels

    #[test]
    fn test_send_lossy_success() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        assert!(tx.send_lossy(TestMessage::FireAndForget(42)));

        // Message should be received
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_send_lossy_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        // Should not panic, returns false
        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
    }

    #[test_async]
    async fn test_request_success() {
        let (tx, mut rx) = mpsc::unbounded_channel::<TestMessage>();

        // Reply from inside the closure so that no concurrent task is needed:
        // the response is already waiting when `request` awaits it.
        let result: Option<String> = tx
            .request(|responder: oneshot::Sender<String>| {
                assert!(responder.send_lossy("pong".to_string()));
                TestMessage::FireAndForget(7)
            })
            .await;

        assert_eq!(result, Some("pong".to_string()));

        // The request message itself must have been delivered
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(7))));
    }

    #[test_async]
    async fn test_request_responder_dropped() {
        let (tx, mut rx) = mpsc::unbounded_channel::<TestMessage>();

        // The send succeeds, but the responder is dropped without replying.
        let result: Option<String> = tx
            .request(|responder: oneshot::Sender<String>| {
                drop(responder);
                TestMessage::FireAndForget(7)
            })
            .await;

        assert_eq!(result, None);
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(7))));
    }

    #[test_async]
    async fn test_request_send_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result: Option<String> = tx
            .request(|responder| TestMessage::Request { responder })
            .await;

        assert_eq!(result, None);
    }

    #[test_async]
    async fn test_request_or_success() {
        let (tx, _rx) = mpsc::unbounded_channel::<TestMessage>();

        // The reply (`true`) must win over the default (`false`).
        let result = tx
            .request_or(
                |responder: oneshot::Sender<bool>| {
                    assert!(responder.send_lossy(true));
                    TestMessage::FireAndForget(7)
                },
                false,
            )
            .await;

        assert!(result);
    }

    #[test_async]
    async fn test_request_or_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result = tx
            .request_or(|responder| TestMessage::RequestBool { responder }, false)
            .await;

        assert!(!result);
    }

    #[test_async]
    async fn test_request_or_default_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result: Vec<u32> = tx
            .request_or_default(|responder| TestMessage::RequestVec { responder })
            .await;

        assert!(result.is_empty());
    }

    // AsyncFallibleExt tests for bounded channels

    #[test_async]
    async fn test_async_send_lossy_success() {
        let (tx, mut rx) = mpsc::channel(1);
        assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);

        // Message should be received
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test_async]
    async fn test_async_send_lossy_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        // Should not panic, returns false
        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
    }

    #[test_async]
    async fn test_async_request_success() {
        let (tx, mut rx) = mpsc::channel::<TestMessage>(1);

        // Reply from inside the closure so that no concurrent task is needed.
        let result: Option<String> =
            AsyncFallibleExt::request(&tx, |responder: oneshot::Sender<String>| {
                assert!(responder.send_lossy("pong".to_string()));
                TestMessage::FireAndForget(7)
            })
            .await;

        assert_eq!(result, Some("pong".to_string()));
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(7))));
    }

    #[test_async]
    async fn test_async_request_responder_dropped() {
        let (tx, mut rx) = mpsc::channel::<TestMessage>(1);

        // The send succeeds, but the responder is dropped without replying.
        let result: Option<String> =
            AsyncFallibleExt::request(&tx, |responder: oneshot::Sender<String>| {
                drop(responder);
                TestMessage::FireAndForget(7)
            })
            .await;

        assert_eq!(result, None);
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(7))));
    }

    #[test_async]
    async fn test_async_request_send_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result: Option<String> =
            AsyncFallibleExt::request(&tx, |responder| TestMessage::Request { responder }).await;

        assert_eq!(result, None);
    }

    #[test_async]
    async fn test_async_request_or_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result = AsyncFallibleExt::request_or(
            &tx,
            |responder| TestMessage::RequestBool { responder },
            false,
        )
        .await;

        assert!(!result);
    }

    #[test_async]
    async fn test_async_request_or_default_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result: Vec<u32> = AsyncFallibleExt::request_or_default(&tx, |responder| {
            TestMessage::RequestVec { responder }
        })
        .await;

        assert!(result.is_empty());
    }

    // try_send_lossy tests

    #[test]
    fn test_try_send_lossy_success() {
        let (tx, mut rx) = mpsc::channel(1);
        assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));

        // Message should be received
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_try_send_lossy_full() {
        let (tx, mut rx) = mpsc::channel(1);
        assert!(tx.try_send_lossy(TestMessage::FireAndForget(1)));

        // Capacity is exhausted: the second message is dropped, not queued
        assert!(!tx.try_send_lossy(TestMessage::FireAndForget(2)));

        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(1))));
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_try_send_lossy_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        // Should not panic, returns false
        assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
    }

    // send_or_reserve_lossy tests

    #[test]
    fn test_send_or_reserve_lossy_success() {
        let (tx, mut rx) = mpsc::channel(1);

        assert!(tx
            .send_or_reserve_lossy(TestMessage::FireAndForget(42))
            .is_none());
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_send_or_reserve_lossy_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        assert!(tx
            .send_or_reserve_lossy(TestMessage::FireAndForget(42))
            .is_none());
    }

    #[test_async]
    async fn test_send_or_reserve_lossy_reserves_when_full() {
        let (tx, mut rx) = mpsc::channel(1);
        tx.try_send(TestMessage::FireAndForget(1)).unwrap();

        let reservation = tx
            .send_or_reserve_lossy(TestMessage::FireAndForget(2))
            .expect("receiver should be open");

        assert!(matches!(
            rx.recv().await,
            Some(TestMessage::FireAndForget(1))
        ));
        reservation.await.unwrap().send();
        assert!(matches!(
            rx.recv().await,
            Some(TestMessage::FireAndForget(2))
        ));
    }

    #[test_async]
    async fn test_send_or_reserve_lossy_reserved_disconnected() {
        let (tx, rx) = mpsc::channel(1);
        tx.try_send(TestMessage::FireAndForget(1)).unwrap();

        let reservation = tx
            .send_or_reserve_lossy(TestMessage::FireAndForget(2))
            .expect("receiver should be open");
        drop(rx);

        assert!(reservation.await.is_err());
    }

    // OneshotExt tests

    #[test]
    fn test_oneshot_send_lossy_success() {
        let (tx, mut rx) = oneshot::channel::<u32>();
        assert!(tx.send_lossy(42));
        assert_eq!(rx.try_recv(), Ok(42));
    }

    #[test]
    fn test_oneshot_send_lossy_disconnected() {
        let (tx, rx) = oneshot::channel::<u32>();
        drop(rx);
        assert!(!tx.send_lossy(42));
    }
}