Skip to main content

commonware_utils/channel/
fallible.rs

1//! Utilities for graceful channel shutdown handling.
2//!
3//! When actors communicate via channels, senders may outlive receivers during shutdown.
4//! This module provides extension traits and helpers that handle disconnection gracefully
5//! rather than panicking.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use commonware_utils::channel::fallible::FallibleExt;
11//!
12//! // Fire-and-forget: silently ignore disconnection
13//! sender.send_lossy(Message::Shutdown);
14//!
15//! // Request-response: return None on disconnection
16//! let result = sender.request(|tx| Message::Query { responder: tx }).await;
17//! ```
18
19use super::{mpsc, oneshot};
20
21/// Extension trait for channel operations that may fail due to disconnection.
22///
23/// Use these methods when the receiver may be dropped during shutdown
24/// and you want to handle that gracefully rather than panicking.
25pub trait FallibleExt<T> {
26    /// Send a message, returning `true` if successful.
27    ///
28    /// Use this for fire-and-forget messages where the receiver
29    /// may have been dropped during shutdown. The return value can
30    /// be ignored if the caller doesn't need to know whether the
31    /// send succeeded.
32    fn send_lossy(&self, msg: T) -> bool;
33
34    /// Send a request message containing a oneshot responder and await the response.
35    ///
36    /// Returns `None` if:
37    /// - The receiver has been dropped (send fails)
38    /// - The responder is dropped without sending (receive fails)
39    ///
40    /// # Example
41    ///
42    /// ```ignore
43    /// let dialable: Option<Vec<PublicKey>> = sender
44    ///     .request(|tx| Message::Dialable { responder: tx })
45    ///     .await;
46    /// ```
47    fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48    where
49        R: Send,
50        F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52    /// Send a request and return the provided default on failure.
53    ///
54    /// This is a convenience wrapper around [`request`](Self::request) for cases
55    /// where you have a sensible default value.
56    fn request_or<R, F>(
57        &self,
58        make_msg: F,
59        default: R,
60    ) -> impl std::future::Future<Output = R> + Send
61    where
62        R: Send,
63        F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65    /// Send a request and return `R::default()` on failure.
66    ///
67    /// This is a convenience wrapper around [`request`](Self::request) for types
68    /// that implement [`Default`].
69    fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70    where
71        R: Default + Send,
72        F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76    fn send_lossy(&self, msg: T) -> bool {
77        self.send(msg).is_ok()
78    }
79
80    async fn request<R, F>(&self, make_msg: F) -> Option<R>
81    where
82        R: Send,
83        F: FnOnce(oneshot::Sender<R>) -> T + Send,
84    {
85        let (tx, rx) = oneshot::channel();
86        if self.send(make_msg(tx)).is_err() {
87            return None;
88        }
89        rx.await.ok()
90    }
91
92    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93    where
94        R: Send,
95        F: FnOnce(oneshot::Sender<R>) -> T + Send,
96    {
97        self.request(make_msg).await.unwrap_or(default)
98    }
99
100    async fn request_or_default<R, F>(&self, make_msg: F) -> R
101    where
102        R: Default + Send,
103        F: FnOnce(oneshot::Sender<R>) -> T + Send,
104    {
105        self.request(make_msg).await.unwrap_or_default()
106    }
107}
108
109/// Extension trait for bounded channel operations that may fail due to disconnection.
110///
111/// Similar to [`FallibleExt`] but for bounded channels where send operations are async.
112pub trait AsyncFallibleExt<T> {
113    /// Send a message asynchronously, returning `true` if successful.
114    ///
115    /// Use this for fire-and-forget messages where the receiver
116    /// may have been dropped during shutdown. The return value can
117    /// be ignored if the caller doesn't need to know whether the
118    /// send succeeded.
119    fn send_lossy(&mut self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121    /// Try to send a message without blocking, returning `true` if successful.
122    ///
123    /// Use this for fire-and-forget messages where you don't want to wait
124    /// if the channel is full. Returns `false` if the channel is full or
125    /// disconnected.
126    fn try_send_lossy(&mut self, msg: T) -> bool;
127
128    /// Send a request message containing a oneshot responder and await the response.
129    ///
130    /// Returns `None` if:
131    /// - The receiver has been dropped (send fails)
132    /// - The responder is dropped without sending (receive fails)
133    fn request<R, F>(&mut self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134    where
135        R: Send,
136        F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138    /// Send a request and return the provided default on failure.
139    fn request_or<R, F>(
140        &mut self,
141        make_msg: F,
142        default: R,
143    ) -> impl std::future::Future<Output = R> + Send
144    where
145        R: Send,
146        F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148    /// Send a request and return `R::default()` on failure.
149    fn request_or_default<R, F>(
150        &mut self,
151        make_msg: F,
152    ) -> impl std::future::Future<Output = R> + Send
153    where
154        R: Default + Send,
155        F: FnOnce(oneshot::Sender<R>) -> T + Send;
156}
157
158impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
159    async fn send_lossy(&mut self, msg: T) -> bool {
160        self.send(msg).await.is_ok()
161    }
162
163    fn try_send_lossy(&mut self, msg: T) -> bool {
164        self.try_send(msg).is_ok()
165    }
166
167    async fn request<R, F>(&mut self, make_msg: F) -> Option<R>
168    where
169        R: Send,
170        F: FnOnce(oneshot::Sender<R>) -> T + Send,
171    {
172        let (tx, rx) = oneshot::channel();
173        if self.send(make_msg(tx)).await.is_err() {
174            return None;
175        }
176        rx.await.ok()
177    }
178
179    async fn request_or<R, F>(&mut self, make_msg: F, default: R) -> R
180    where
181        R: Send,
182        F: FnOnce(oneshot::Sender<R>) -> T + Send,
183    {
184        self.request(make_msg).await.unwrap_or(default)
185    }
186
187    async fn request_or_default<R, F>(&mut self, make_msg: F) -> R
188    where
189        R: Default + Send,
190        F: FnOnce(oneshot::Sender<R>) -> T + Send,
191    {
192        self.request(make_msg).await.unwrap_or_default()
193    }
194}
195
196/// Extension trait for oneshot sender operations that may fail due to disconnection.
197///
198/// Use this when the receiver may have been dropped during shutdown
199/// and you want to handle that gracefully rather than panicking.
200pub trait OneshotExt<T> {
201    /// Send a value, returning `true` if successful.
202    ///
203    /// Use this for fire-and-forget responses where the requester
204    /// may have been dropped during shutdown. The return value can
205    /// be ignored if the caller doesn't need to know whether the
206    /// send succeeded.
207    ///
208    /// Consumes the sender.
209    fn send_lossy(self, msg: T) -> bool;
210}
211
212impl<T> OneshotExt<T> for oneshot::Sender<T> {
213    fn send_lossy(self, msg: T) -> bool {
214        self.send(msg).is_ok()
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221    use commonware_macros::test_async;
222
223    #[derive(Debug)]
224    #[allow(dead_code)]
225    enum TestMessage {
226        FireAndForget(u32),
227        Request {
228            responder: oneshot::Sender<String>,
229        },
230        RequestBool {
231            responder: oneshot::Sender<bool>,
232        },
233        RequestVec {
234            responder: oneshot::Sender<Vec<u32>>,
235        },
236    }
237
238    #[test]
239    fn test_send_lossy_success() {
240        let (tx, mut rx) = mpsc::unbounded_channel();
241        assert!(tx.send_lossy(TestMessage::FireAndForget(42)));
242
243        // Message should be received
244        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
245    }
246
247    #[test]
248    fn test_send_lossy_disconnected() {
249        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
250        drop(rx);
251
252        // Should not panic, returns false
253        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
254    }
255
256    #[test_async]
257    async fn test_request_send_disconnected() {
258        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
259        drop(rx);
260
261        let result: Option<String> = tx
262            .request(|responder| TestMessage::Request { responder })
263            .await;
264
265        assert_eq!(result, None);
266    }
267
268    #[test_async]
269    async fn test_request_or_disconnected() {
270        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
271        drop(rx);
272
273        let result = tx
274            .request_or(|responder| TestMessage::RequestBool { responder }, false)
275            .await;
276
277        assert!(!result);
278    }
279
280    #[test_async]
281    async fn test_request_or_default_disconnected() {
282        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
283        drop(rx);
284
285        let result: Vec<u32> = tx
286            .request_or_default(|responder| TestMessage::RequestVec { responder })
287            .await;
288
289        assert!(result.is_empty());
290    }
291
292    // AsyncFallibleExt tests for bounded channels
293
294    #[test_async]
295    async fn test_async_send_lossy_success() {
296        let (mut tx, mut rx) = mpsc::channel(1);
297        assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);
298
299        // Message should be received
300        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
301    }
302
303    #[test_async]
304    async fn test_async_send_lossy_disconnected() {
305        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
306        drop(rx);
307
308        // Should not panic, returns false
309        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
310    }
311
312    #[test_async]
313    async fn test_async_request_send_disconnected() {
314        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
315        drop(rx);
316
317        let result: Option<String> =
318            AsyncFallibleExt::request(&mut tx, |responder| TestMessage::Request { responder })
319                .await;
320
321        assert_eq!(result, None);
322    }
323
324    #[test_async]
325    async fn test_async_request_or_disconnected() {
326        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
327        drop(rx);
328
329        let result = AsyncFallibleExt::request_or(
330            &mut tx,
331            |responder| TestMessage::RequestBool { responder },
332            false,
333        )
334        .await;
335
336        assert!(!result);
337    }
338
339    #[test_async]
340    async fn test_async_request_or_default_disconnected() {
341        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
342        drop(rx);
343
344        let result: Vec<u32> = AsyncFallibleExt::request_or_default(&mut tx, |responder| {
345            TestMessage::RequestVec { responder }
346        })
347        .await;
348
349        assert!(result.is_empty());
350    }
351
352    // try_send_lossy tests
353
354    #[test]
355    fn test_try_send_lossy_success() {
356        let (mut tx, mut rx) = mpsc::channel(1);
357        assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));
358
359        // Message should be received
360        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
361    }
362
363    #[test]
364    fn test_try_send_lossy_disconnected() {
365        let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
366        drop(rx);
367
368        // Should not panic, returns false
369        assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
370    }
371
372    // OneshotExt tests
373
374    #[test]
375    fn test_oneshot_send_lossy_success() {
376        let (tx, mut rx) = oneshot::channel::<u32>();
377        assert!(tx.send_lossy(42));
378        assert_eq!(rx.try_recv(), Ok(42));
379    }
380
381    #[test]
382    fn test_oneshot_send_lossy_disconnected() {
383        let (tx, rx) = oneshot::channel::<u32>();
384        drop(rx);
385        assert!(!tx.send_lossy(42));
386    }
387}