commonware_utils/channels/
fallible.rs

1//! Utilities for graceful channel shutdown handling.
2//!
3//! When actors communicate via channels, senders may outlive receivers during shutdown.
4//! This module provides extension traits and helpers that handle disconnection gracefully
5//! rather than panicking.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use commonware_utils::channels::fallible::FallibleExt;
11//!
12//! // Fire-and-forget: silently ignore disconnection
13//! sender.send_lossy(Message::Shutdown);
14//!
15//! // Request-response: return None on disconnection
16//! let result = sender.request(|tx| Message::Query { responder: tx }).await;
17//! ```
18
19use futures::channel::{mpsc, oneshot};
20
21/// Extension trait for channel operations that may fail due to disconnection.
22///
23/// Use these methods when the receiver may be dropped during shutdown
24/// and you want to handle that gracefully rather than panicking.
25pub trait FallibleExt<T> {
26    /// Send a message, returning `true` if successful.
27    ///
28    /// Use this for fire-and-forget messages where the receiver
29    /// may have been dropped during shutdown. The return value can
30    /// be ignored if the caller doesn't need to know whether the
31    /// send succeeded.
32    fn send_lossy(&self, msg: T) -> bool;
33
34    /// Send a request message containing a oneshot responder and await the response.
35    ///
36    /// Returns `None` if:
37    /// - The receiver has been dropped (send fails)
38    /// - The responder is dropped without sending (receive fails)
39    ///
40    /// # Example
41    ///
42    /// ```ignore
43    /// let dialable: Option<Vec<PublicKey>> = sender
44    ///     .request(|tx| Message::Dialable { responder: tx })
45    ///     .await;
46    /// ```
47    fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48    where
49        R: Send,
50        F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52    /// Send a request and return the provided default on failure.
53    ///
54    /// This is a convenience wrapper around [`request`](Self::request) for cases
55    /// where you have a sensible default value.
56    fn request_or<R, F>(
57        &self,
58        make_msg: F,
59        default: R,
60    ) -> impl std::future::Future<Output = R> + Send
61    where
62        R: Send,
63        F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65    /// Send a request and return `R::default()` on failure.
66    ///
67    /// This is a convenience wrapper around [`request`](Self::request) for types
68    /// that implement [`Default`].
69    fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70    where
71        R: Default + Send,
72        F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76    fn send_lossy(&self, msg: T) -> bool {
77        self.unbounded_send(msg).is_ok()
78    }
79
80    async fn request<R, F>(&self, make_msg: F) -> Option<R>
81    where
82        R: Send,
83        F: FnOnce(oneshot::Sender<R>) -> T + Send,
84    {
85        let (tx, rx) = oneshot::channel();
86        if self.unbounded_send(make_msg(tx)).is_err() {
87            return None;
88        }
89        rx.await.ok()
90    }
91
92    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93    where
94        R: Send,
95        F: FnOnce(oneshot::Sender<R>) -> T + Send,
96    {
97        self.request(make_msg).await.unwrap_or(default)
98    }
99
100    async fn request_or_default<R, F>(&self, make_msg: F) -> R
101    where
102        R: Default + Send,
103        F: FnOnce(oneshot::Sender<R>) -> T + Send,
104    {
105        self.request(make_msg).await.unwrap_or_default()
106    }
107}
108
109/// Extension trait for bounded channel operations that may fail due to disconnection.
110///
111/// Similar to [`FallibleExt`] but for bounded channels where send operations are async.
112pub trait AsyncFallibleExt<T> {
113    /// Send a message asynchronously, returning `true` if successful.
114    ///
115    /// Use this for fire-and-forget messages where the receiver
116    /// may have been dropped during shutdown. The return value can
117    /// be ignored if the caller doesn't need to know whether the
118    /// send succeeded.
119    fn send_lossy(&mut self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121    /// Try to send a message without blocking, returning `true` if successful.
122    ///
123    /// Use this for fire-and-forget messages where you don't want to wait
124    /// if the channel is full. Returns `false` if the channel is full or
125    /// disconnected.
126    fn try_send_lossy(&mut self, msg: T) -> bool;
127
128    /// Send a request message containing a oneshot responder and await the response.
129    ///
130    /// Returns `None` if:
131    /// - The receiver has been dropped (send fails)
132    /// - The responder is dropped without sending (receive fails)
133    fn request<R, F>(&mut self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134    where
135        R: Send,
136        F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138    /// Send a request and return the provided default on failure.
139    fn request_or<R, F>(
140        &mut self,
141        make_msg: F,
142        default: R,
143    ) -> impl std::future::Future<Output = R> + Send
144    where
145        R: Send,
146        F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148    /// Send a request and return `R::default()` on failure.
149    fn request_or_default<R, F>(
150        &mut self,
151        make_msg: F,
152    ) -> impl std::future::Future<Output = R> + Send
153    where
154        R: Default + Send,
155        F: FnOnce(oneshot::Sender<R>) -> T + Send;
156}
157
158impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
159    async fn send_lossy(&mut self, msg: T) -> bool {
160        futures::SinkExt::send(self, msg).await.is_ok()
161    }
162
163    fn try_send_lossy(&mut self, msg: T) -> bool {
164        self.try_send(msg).is_ok()
165    }
166
167    async fn request<R, F>(&mut self, make_msg: F) -> Option<R>
168    where
169        R: Send,
170        F: FnOnce(oneshot::Sender<R>) -> T + Send,
171    {
172        let (tx, rx) = oneshot::channel();
173        if futures::SinkExt::send(self, make_msg(tx)).await.is_err() {
174            return None;
175        }
176        rx.await.ok()
177    }
178
179    async fn request_or<R, F>(&mut self, make_msg: F, default: R) -> R
180    where
181        R: Send,
182        F: FnOnce(oneshot::Sender<R>) -> T + Send,
183    {
184        self.request(make_msg).await.unwrap_or(default)
185    }
186
187    async fn request_or_default<R, F>(&mut self, make_msg: F) -> R
188    where
189        R: Default + Send,
190        F: FnOnce(oneshot::Sender<R>) -> T + Send,
191    {
192        self.request(make_msg).await.unwrap_or_default()
193    }
194}
195
196/// Extension trait for oneshot sender operations that may fail due to disconnection.
197///
198/// Use this when the receiver may have been dropped during shutdown
199/// and you want to handle that gracefully rather than panicking.
200pub trait OneshotExt<T> {
201    /// Send a value, returning `true` if successful.
202    ///
203    /// Use this for fire-and-forget responses where the requester
204    /// may have been dropped during shutdown. The return value can
205    /// be ignored if the caller doesn't need to know whether the
206    /// send succeeded.
207    ///
208    /// Consumes the sender.
209    fn send_lossy(self, msg: T) -> bool;
210}
211
212impl<T> OneshotExt<T> for oneshot::Sender<T> {
213    fn send_lossy(self, msg: T) -> bool {
214        self.send(msg).is_ok()
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221    use commonware_macros::test_async;
222
223    #[derive(Debug)]
224    #[allow(dead_code)]
225    enum TestMessage {
226        FireAndForget(u32),
227        Request {
228            responder: oneshot::Sender<String>,
229        },
230        RequestBool {
231            responder: oneshot::Sender<bool>,
232        },
233        RequestVec {
234            responder: oneshot::Sender<Vec<u32>>,
235        },
236    }
237
238    #[test]
239    fn test_send_lossy_success() {
240        let (tx, mut rx) = mpsc::unbounded();
241        assert!(tx.send_lossy(TestMessage::FireAndForget(42)));
242
243        // Message should be received
244        assert!(matches!(
245            rx.try_next(),
246            Ok(Some(TestMessage::FireAndForget(42)))
247        ));
248    }
249
250    #[test]
251    fn test_send_lossy_disconnected() {
252        let (tx, rx) = mpsc::unbounded::<TestMessage>();
253        drop(rx);
254
255        // Should not panic, returns false
256        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
257    }
258
259    #[test_async]
260    async fn test_request_send_disconnected() {
261        let (tx, rx) = mpsc::unbounded::<TestMessage>();
262        drop(rx);
263
264        let result: Option<String> = tx
265            .request(|responder| TestMessage::Request { responder })
266            .await;
267
268        assert_eq!(result, None);
269    }
270
271    #[test_async]
272    async fn test_request_or_disconnected() {
273        let (tx, rx) = mpsc::unbounded::<TestMessage>();
274        drop(rx);
275
276        let result = tx
277            .request_or(|responder| TestMessage::RequestBool { responder }, false)
278            .await;
279
280        assert!(!result);
281    }
282
283    #[test_async]
284    async fn test_request_or_default_disconnected() {
285        let (tx, rx) = mpsc::unbounded::<TestMessage>();
286        drop(rx);
287
288        let result: Vec<u32> = tx
289            .request_or_default(|responder| TestMessage::RequestVec { responder })
290            .await;
291
292        assert!(result.is_empty());
293    }
294
295    // AsyncFallibleExt tests for bounded channels
296
297    #[test_async]
298    async fn test_async_send_lossy_success() {
299        let (mut tx, mut rx) = mpsc::channel(1);
300        assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);
301
302        // Message should be received
303        assert!(matches!(
304            rx.try_next(),
305            Ok(Some(TestMessage::FireAndForget(42)))
306        ));
307    }
308
309    #[test_async]
310    async fn test_async_send_lossy_disconnected() {
311        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
312        drop(rx);
313
314        // Should not panic, returns false
315        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
316    }
317
318    #[test_async]
319    async fn test_async_request_send_disconnected() {
320        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
321        drop(rx);
322
323        let result: Option<String> =
324            AsyncFallibleExt::request(&mut tx, |responder| TestMessage::Request { responder })
325                .await;
326
327        assert_eq!(result, None);
328    }
329
330    #[test_async]
331    async fn test_async_request_or_disconnected() {
332        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
333        drop(rx);
334
335        let result = AsyncFallibleExt::request_or(
336            &mut tx,
337            |responder| TestMessage::RequestBool { responder },
338            false,
339        )
340        .await;
341
342        assert!(!result);
343    }
344
345    #[test_async]
346    async fn test_async_request_or_default_disconnected() {
347        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
348        drop(rx);
349
350        let result: Vec<u32> = AsyncFallibleExt::request_or_default(&mut tx, |responder| {
351            TestMessage::RequestVec { responder }
352        })
353        .await;
354
355        assert!(result.is_empty());
356    }
357
358    // try_send_lossy tests
359
360    #[test]
361    fn test_try_send_lossy_success() {
362        let (mut tx, mut rx) = mpsc::channel(1);
363        assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));
364
365        // Message should be received
366        assert!(matches!(
367            rx.try_next(),
368            Ok(Some(TestMessage::FireAndForget(42)))
369        ));
370    }
371
372    #[test]
373    fn test_try_send_lossy_disconnected() {
374        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
375        drop(rx);
376
377        // Should not panic, returns false
378        assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
379    }
380
381    // OneshotExt tests
382
383    #[test]
384    fn test_oneshot_send_lossy_success() {
385        use futures::FutureExt;
386        let (tx, rx) = oneshot::channel::<u32>();
387        assert!(tx.send_lossy(42));
388        assert_eq!(rx.now_or_never(), Some(Ok(42)));
389    }
390
391    #[test]
392    fn test_oneshot_send_lossy_disconnected() {
393        let (tx, rx) = oneshot::channel::<u32>();
394        drop(rx);
395        assert!(!tx.send_lossy(42));
396    }
397}