commonware_utils/channel/
fallible.rs1use super::{
20 mpsc, oneshot,
21 reservation::{Reservation, ReservationExt},
22};
23use std::future::Future;
24
25pub trait FallibleExt<T> {
30 fn send_lossy(&self, msg: T) -> bool;
37
38 fn request<R, F>(&self, make_msg: F) -> impl Future<Output = Option<R>> + Send
52 where
53 R: Send,
54 F: FnOnce(oneshot::Sender<R>) -> T + Send;
55
56 fn request_or<R, F>(&self, make_msg: F, default: R) -> impl Future<Output = R> + Send
61 where
62 R: Send,
63 F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65 fn request_or_default<R, F>(&self, make_msg: F) -> impl Future<Output = R> + Send
70 where
71 R: Default + Send,
72 F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76 fn send_lossy(&self, msg: T) -> bool {
77 self.send(msg).is_ok()
78 }
79
80 async fn request<R, F>(&self, make_msg: F) -> Option<R>
81 where
82 R: Send,
83 F: FnOnce(oneshot::Sender<R>) -> T + Send,
84 {
85 let (tx, rx) = oneshot::channel();
86 if self.send(make_msg(tx)).is_err() {
87 return None;
88 }
89 rx.await.ok()
90 }
91
92 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93 where
94 R: Send,
95 F: FnOnce(oneshot::Sender<R>) -> T + Send,
96 {
97 self.request(make_msg).await.unwrap_or(default)
98 }
99
100 async fn request_or_default<R, F>(&self, make_msg: F) -> R
101 where
102 R: Default + Send,
103 F: FnOnce(oneshot::Sender<R>) -> T + Send,
104 {
105 self.request(make_msg).await.unwrap_or_default()
106 }
107}
108
109pub trait AsyncFallibleExt<T> {
113 fn send_lossy(&self, msg: T) -> impl Future<Output = bool> + Send;
120
121 fn try_send_lossy(&self, msg: T) -> bool;
127
128 #[must_use = "await and send any reservation"]
132 fn send_or_reserve_lossy(&self, msg: T) -> Option<Reservation<T>>
133 where
134 T: 'static;
135
136 fn request<R, F>(&self, make_msg: F) -> impl Future<Output = Option<R>> + Send
142 where
143 R: Send,
144 F: FnOnce(oneshot::Sender<R>) -> T + Send;
145
146 fn request_or<R, F>(&self, make_msg: F, default: R) -> impl Future<Output = R> + Send
148 where
149 R: Send,
150 F: FnOnce(oneshot::Sender<R>) -> T + Send;
151
152 fn request_or_default<R, F>(&self, make_msg: F) -> impl Future<Output = R> + Send
154 where
155 R: Default + Send,
156 F: FnOnce(oneshot::Sender<R>) -> T + Send;
157}
158
159impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
160 async fn send_lossy(&self, msg: T) -> bool {
161 self.send(msg).await.is_ok()
162 }
163
164 fn try_send_lossy(&self, msg: T) -> bool {
165 self.try_send(msg).is_ok()
166 }
167
168 fn send_or_reserve_lossy(&self, msg: T) -> Option<Reservation<T>>
169 where
170 T: 'static,
171 {
172 self.send_or_reserve(msg).ok().flatten()
173 }
174
175 async fn request<R, F>(&self, make_msg: F) -> Option<R>
176 where
177 R: Send,
178 F: FnOnce(oneshot::Sender<R>) -> T + Send,
179 {
180 let (tx, rx) = oneshot::channel();
181 if self.send(make_msg(tx)).await.is_err() {
182 return None;
183 }
184 rx.await.ok()
185 }
186
187 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
188 where
189 R: Send,
190 F: FnOnce(oneshot::Sender<R>) -> T + Send,
191 {
192 self.request(make_msg).await.unwrap_or(default)
193 }
194
195 async fn request_or_default<R, F>(&self, make_msg: F) -> R
196 where
197 R: Default + Send,
198 F: FnOnce(oneshot::Sender<R>) -> T + Send,
199 {
200 self.request(make_msg).await.unwrap_or_default()
201 }
202}
203
204pub trait OneshotExt<T> {
209 fn send_lossy(self, msg: T) -> bool;
218}
219
220impl<T> OneshotExt<T> for oneshot::Sender<T> {
221 fn send_lossy(self, msg: T) -> bool {
222 self.send(msg).is_ok()
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229 use commonware_macros::test_async;
230
231 #[derive(Debug)]
232 #[allow(dead_code)]
233 enum TestMessage {
234 FireAndForget(u32),
235 Request {
236 responder: oneshot::Sender<String>,
237 },
238 RequestBool {
239 responder: oneshot::Sender<bool>,
240 },
241 RequestVec {
242 responder: oneshot::Sender<Vec<u32>>,
243 },
244 }
245
246 #[test]
247 fn test_send_lossy_success() {
248 let (tx, mut rx) = mpsc::unbounded_channel();
249 assert!(tx.send_lossy(TestMessage::FireAndForget(42)));
250
251 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
253 }
254
255 #[test]
256 fn test_send_lossy_disconnected() {
257 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
258 drop(rx);
259
260 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
262 }
263
264 #[test_async]
265 async fn test_request_send_disconnected() {
266 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
267 drop(rx);
268
269 let result: Option<String> = tx
270 .request(|responder| TestMessage::Request { responder })
271 .await;
272
273 assert_eq!(result, None);
274 }
275
276 #[test_async]
277 async fn test_request_or_disconnected() {
278 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
279 drop(rx);
280
281 let result = tx
282 .request_or(|responder| TestMessage::RequestBool { responder }, false)
283 .await;
284
285 assert!(!result);
286 }
287
288 #[test_async]
289 async fn test_request_or_default_disconnected() {
290 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
291 drop(rx);
292
293 let result: Vec<u32> = tx
294 .request_or_default(|responder| TestMessage::RequestVec { responder })
295 .await;
296
297 assert!(result.is_empty());
298 }
299
300 #[test_async]
303 async fn test_async_send_lossy_success() {
304 let (tx, mut rx) = mpsc::channel(1);
305 assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);
306
307 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
309 }
310
311 #[test_async]
312 async fn test_async_send_lossy_disconnected() {
313 let (tx, rx) = mpsc::channel::<TestMessage>(1);
314 drop(rx);
315
316 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
318 }
319
320 #[test_async]
321 async fn test_async_request_send_disconnected() {
322 let (tx, rx) = mpsc::channel::<TestMessage>(1);
323 drop(rx);
324
325 let result: Option<String> =
326 AsyncFallibleExt::request(&tx, |responder| TestMessage::Request { responder }).await;
327
328 assert_eq!(result, None);
329 }
330
331 #[test_async]
332 async fn test_async_request_or_disconnected() {
333 let (tx, rx) = mpsc::channel::<TestMessage>(1);
334 drop(rx);
335
336 let result = AsyncFallibleExt::request_or(
337 &tx,
338 |responder| TestMessage::RequestBool { responder },
339 false,
340 )
341 .await;
342
343 assert!(!result);
344 }
345
346 #[test_async]
347 async fn test_async_request_or_default_disconnected() {
348 let (tx, rx) = mpsc::channel::<TestMessage>(1);
349 drop(rx);
350
351 let result: Vec<u32> = AsyncFallibleExt::request_or_default(&tx, |responder| {
352 TestMessage::RequestVec { responder }
353 })
354 .await;
355
356 assert!(result.is_empty());
357 }
358
359 #[test]
362 fn test_try_send_lossy_success() {
363 let (tx, mut rx) = mpsc::channel(1);
364 assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));
365
366 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
368 }
369
370 #[test]
371 fn test_try_send_lossy_disconnected() {
372 let (tx, rx) = mpsc::channel::<TestMessage>(1);
373 drop(rx);
374
375 assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
377 }
378
379 #[test]
382 fn test_send_or_reserve_lossy_success() {
383 let (tx, mut rx) = mpsc::channel(1);
384
385 assert!(tx
386 .send_or_reserve_lossy(TestMessage::FireAndForget(42))
387 .is_none());
388 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
389 }
390
391 #[test]
392 fn test_send_or_reserve_lossy_disconnected() {
393 let (tx, rx) = mpsc::channel::<TestMessage>(1);
394 drop(rx);
395
396 assert!(tx
397 .send_or_reserve_lossy(TestMessage::FireAndForget(42))
398 .is_none());
399 }
400
401 #[test_async]
402 async fn test_send_or_reserve_lossy_reserves_when_full() {
403 let (tx, mut rx) = mpsc::channel(1);
404 tx.try_send(TestMessage::FireAndForget(1)).unwrap();
405
406 let reservation = tx
407 .send_or_reserve_lossy(TestMessage::FireAndForget(2))
408 .expect("receiver should be open");
409
410 assert!(matches!(
411 rx.recv().await,
412 Some(TestMessage::FireAndForget(1))
413 ));
414 reservation.await.unwrap().send();
415 assert!(matches!(
416 rx.recv().await,
417 Some(TestMessage::FireAndForget(2))
418 ));
419 }
420
421 #[test_async]
422 async fn test_send_or_reserve_lossy_reserved_disconnected() {
423 let (tx, rx) = mpsc::channel(1);
424 tx.try_send(TestMessage::FireAndForget(1)).unwrap();
425
426 let reservation = tx
427 .send_or_reserve_lossy(TestMessage::FireAndForget(2))
428 .expect("receiver should be open");
429 drop(rx);
430
431 assert!(reservation.await.is_err());
432 }
433
434 #[test]
437 fn test_oneshot_send_lossy_success() {
438 let (tx, mut rx) = oneshot::channel::<u32>();
439 assert!(tx.send_lossy(42));
440 assert_eq!(rx.try_recv(), Ok(42));
441 }
442
443 #[test]
444 fn test_oneshot_send_lossy_disconnected() {
445 let (tx, rx) = oneshot::channel::<u32>();
446 drop(rx);
447 assert!(!tx.send_lossy(42));
448 }
449}