Skip to main content

commonware_utils/channel/
fallible.rs

1//! Utilities for graceful channel shutdown handling.
2//!
3//! When actors communicate via channels, senders may outlive receivers during shutdown.
4//! This module provides extension traits and helpers that handle disconnection gracefully
5//! rather than panicking.
6//!
7//! # Example
8//!
9//! ```ignore
10//! use commonware_utils::channel::fallible::FallibleExt;
11//!
12//! // Fire-and-forget: silently ignore disconnection
13//! sender.send_lossy(Message::Shutdown);
14//!
15//! // Request-response: return None on disconnection
16//! let result = sender.request(|tx| Message::Query { responder: tx }).await;
17//! ```
18
19use super::{mpsc, oneshot};
20
21/// Extension trait for channel operations that may fail due to disconnection.
22///
23/// Use these methods when the receiver may be dropped during shutdown
24/// and you want to handle that gracefully rather than panicking.
25pub trait FallibleExt<T> {
26    /// Send a message, returning `true` if successful.
27    ///
28    /// Use this for fire-and-forget messages where the receiver
29    /// may have been dropped during shutdown. The return value can
30    /// be ignored if the caller doesn't need to know whether the
31    /// send succeeded.
32    fn send_lossy(&self, msg: T) -> bool;
33
34    /// Send a request message containing a oneshot responder and await the response.
35    ///
36    /// Returns `None` if:
37    /// - The receiver has been dropped (send fails)
38    /// - The responder is dropped without sending (receive fails)
39    ///
40    /// # Example
41    ///
42    /// ```ignore
43    /// let dialable: Option<Vec<PublicKey>> = sender
44    ///     .request(|tx| Message::Dialable { responder: tx })
45    ///     .await;
46    /// ```
47    fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48    where
49        R: Send,
50        F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52    /// Send a request and return the provided default on failure.
53    ///
54    /// This is a convenience wrapper around [`request`](Self::request) for cases
55    /// where you have a sensible default value.
56    fn request_or<R, F>(
57        &self,
58        make_msg: F,
59        default: R,
60    ) -> impl std::future::Future<Output = R> + Send
61    where
62        R: Send,
63        F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65    /// Send a request and return `R::default()` on failure.
66    ///
67    /// This is a convenience wrapper around [`request`](Self::request) for types
68    /// that implement [`Default`].
69    fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70    where
71        R: Default + Send,
72        F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76    fn send_lossy(&self, msg: T) -> bool {
77        self.send(msg).is_ok()
78    }
79
80    async fn request<R, F>(&self, make_msg: F) -> Option<R>
81    where
82        R: Send,
83        F: FnOnce(oneshot::Sender<R>) -> T + Send,
84    {
85        let (tx, rx) = oneshot::channel();
86        if self.send(make_msg(tx)).is_err() {
87            return None;
88        }
89        rx.await.ok()
90    }
91
92    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93    where
94        R: Send,
95        F: FnOnce(oneshot::Sender<R>) -> T + Send,
96    {
97        self.request(make_msg).await.unwrap_or(default)
98    }
99
100    async fn request_or_default<R, F>(&self, make_msg: F) -> R
101    where
102        R: Default + Send,
103        F: FnOnce(oneshot::Sender<R>) -> T + Send,
104    {
105        self.request(make_msg).await.unwrap_or_default()
106    }
107}
108
109/// Extension trait for bounded channel operations that may fail due to disconnection.
110///
111/// Similar to [`FallibleExt`] but for bounded channels where send operations are async.
112pub trait AsyncFallibleExt<T> {
113    /// Send a message asynchronously, returning `true` if successful.
114    ///
115    /// Use this for fire-and-forget messages where the receiver
116    /// may have been dropped during shutdown. The return value can
117    /// be ignored if the caller doesn't need to know whether the
118    /// send succeeded.
119    fn send_lossy(&self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121    /// Try to send a message without blocking, returning `true` if successful.
122    ///
123    /// Use this for fire-and-forget messages where you don't want to wait
124    /// if the channel is full. Returns `false` if the channel is full or
125    /// disconnected.
126    fn try_send_lossy(&self, msg: T) -> bool;
127
128    /// Send a request message containing a oneshot responder and await the response.
129    ///
130    /// Returns `None` if:
131    /// - The receiver has been dropped (send fails)
132    /// - The responder is dropped without sending (receive fails)
133    fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134    where
135        R: Send,
136        F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138    /// Send a request and return the provided default on failure.
139    fn request_or<R, F>(
140        &self,
141        make_msg: F,
142        default: R,
143    ) -> impl std::future::Future<Output = R> + Send
144    where
145        R: Send,
146        F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148    /// Send a request and return `R::default()` on failure.
149    fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
150    where
151        R: Default + Send,
152        F: FnOnce(oneshot::Sender<R>) -> T + Send;
153}
154
155impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
156    async fn send_lossy(&self, msg: T) -> bool {
157        self.send(msg).await.is_ok()
158    }
159
160    fn try_send_lossy(&self, msg: T) -> bool {
161        self.try_send(msg).is_ok()
162    }
163
164    async fn request<R, F>(&self, make_msg: F) -> Option<R>
165    where
166        R: Send,
167        F: FnOnce(oneshot::Sender<R>) -> T + Send,
168    {
169        let (tx, rx) = oneshot::channel();
170        if self.send(make_msg(tx)).await.is_err() {
171            return None;
172        }
173        rx.await.ok()
174    }
175
176    async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
177    where
178        R: Send,
179        F: FnOnce(oneshot::Sender<R>) -> T + Send,
180    {
181        self.request(make_msg).await.unwrap_or(default)
182    }
183
184    async fn request_or_default<R, F>(&self, make_msg: F) -> R
185    where
186        R: Default + Send,
187        F: FnOnce(oneshot::Sender<R>) -> T + Send,
188    {
189        self.request(make_msg).await.unwrap_or_default()
190    }
191}
192
/// Disconnection-tolerant helper for oneshot senders.
///
/// Useful when the receiving half may already have been dropped during
/// shutdown and that should be handled gracefully rather than by panicking.
pub trait OneshotExt<T> {
    /// Deliver `msg`, yielding `true` when the send succeeded.
    ///
    /// Meant for fire-and-forget responses whose requester may have gone
    /// away during shutdown. Callers that do not care whether the value was
    /// delivered are free to discard the result.
    ///
    /// The sender is consumed by this call.
    fn send_lossy(self, msg: T) -> bool;
}
208
209impl<T> OneshotExt<T> for oneshot::Sender<T> {
210    fn send_lossy(self, msg: T) -> bool {
211        self.send(msg).is_ok()
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_async;

    /// Message type exercised by the tests below.
    #[derive(Debug)]
    // The responder fields are only ever constructed in these tests, never read.
    #[allow(dead_code)]
    enum TestMessage {
        FireAndForget(u32),
        Request {
            responder: oneshot::Sender<String>,
        },
        RequestBool {
            responder: oneshot::Sender<bool>,
        },
        RequestVec {
            responder: oneshot::Sender<Vec<u32>>,
        },
    }

    /// Unbounded sender whose receiving half has already been dropped.
    fn disconnected_unbounded() -> mpsc::UnboundedSender<TestMessage> {
        let (sender, receiver) = mpsc::unbounded_channel::<TestMessage>();
        drop(receiver);
        sender
    }

    /// Bounded (capacity 1) sender whose receiving half has already been dropped.
    fn disconnected_bounded() -> mpsc::Sender<TestMessage> {
        let (sender, receiver) = mpsc::channel::<TestMessage>(1);
        drop(receiver);
        sender
    }

    // FallibleExt tests for unbounded channels

    #[test]
    fn test_send_lossy_success() {
        let (sender, mut receiver) = mpsc::unbounded_channel::<TestMessage>();
        assert!(sender.send_lossy(TestMessage::FireAndForget(42)));

        // The message must be waiting on the receiving side.
        let received = receiver.try_recv();
        assert!(matches!(received, Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_send_lossy_disconnected() {
        let sender = disconnected_unbounded();

        // No panic: the failure is reported as `false`.
        assert!(!sender.send_lossy(TestMessage::FireAndForget(42)));
    }

    #[test_async]
    async fn test_request_send_disconnected() {
        let sender = disconnected_unbounded();

        let reply: Option<String> = sender
            .request(|responder| TestMessage::Request { responder })
            .await;

        assert_eq!(reply, None);
    }

    #[test_async]
    async fn test_request_or_disconnected() {
        let sender = disconnected_unbounded();

        let reply = sender
            .request_or(|responder| TestMessage::RequestBool { responder }, false)
            .await;

        assert!(!reply);
    }

    #[test_async]
    async fn test_request_or_default_disconnected() {
        let sender = disconnected_unbounded();

        let reply: Vec<u32> = sender
            .request_or_default(|responder| TestMessage::RequestVec { responder })
            .await;

        assert!(reply.is_empty());
    }

    // AsyncFallibleExt tests for bounded channels

    #[test_async]
    async fn test_async_send_lossy_success() {
        let (sender, mut receiver) = mpsc::channel::<TestMessage>(1);
        assert!(sender.send_lossy(TestMessage::FireAndForget(42)).await);

        // The message must be waiting on the receiving side.
        let received = receiver.try_recv();
        assert!(matches!(received, Ok(TestMessage::FireAndForget(42))));
    }

    #[test_async]
    async fn test_async_send_lossy_disconnected() {
        let sender = disconnected_bounded();

        // No panic: the failure is reported as `false`.
        assert!(!sender.send_lossy(TestMessage::FireAndForget(42)).await);
    }

    #[test_async]
    async fn test_async_request_send_disconnected() {
        let sender = disconnected_bounded();

        let reply: Option<String> =
            AsyncFallibleExt::request(&sender, |responder| TestMessage::Request { responder })
                .await;

        assert_eq!(reply, None);
    }

    #[test_async]
    async fn test_async_request_or_disconnected() {
        let sender = disconnected_bounded();

        let reply = AsyncFallibleExt::request_or(
            &sender,
            |responder| TestMessage::RequestBool { responder },
            false,
        )
        .await;

        assert!(!reply);
    }

    #[test_async]
    async fn test_async_request_or_default_disconnected() {
        let sender = disconnected_bounded();

        let reply: Vec<u32> = AsyncFallibleExt::request_or_default(&sender, |responder| {
            TestMessage::RequestVec { responder }
        })
        .await;

        assert!(reply.is_empty());
    }

    // try_send_lossy tests

    #[test]
    fn test_try_send_lossy_success() {
        let (sender, mut receiver) = mpsc::channel::<TestMessage>(1);
        assert!(sender.try_send_lossy(TestMessage::FireAndForget(42)));

        // The message must be waiting on the receiving side.
        let received = receiver.try_recv();
        assert!(matches!(received, Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_try_send_lossy_disconnected() {
        let sender = disconnected_bounded();

        // No panic: the failure is reported as `false`.
        assert!(!sender.try_send_lossy(TestMessage::FireAndForget(42)));
    }

    // OneshotExt tests

    #[test]
    fn test_oneshot_send_lossy_success() {
        let (sender, mut receiver) = oneshot::channel::<u32>();
        assert!(sender.send_lossy(42));
        assert_eq!(receiver.try_recv(), Ok(42));
    }

    #[test]
    fn test_oneshot_send_lossy_disconnected() {
        let (sender, receiver) = oneshot::channel::<u32>();
        drop(receiver);
        assert!(!sender.send_lossy(42));
    }
}
383}