// commonware_utils/channel/fallible.rs
use super::{mpsc, oneshot};
20
21pub trait FallibleExt<T> {
26 fn send_lossy(&self, msg: T) -> bool;
33
34 fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48 where
49 R: Send,
50 F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52 fn request_or<R, F>(
57 &self,
58 make_msg: F,
59 default: R,
60 ) -> impl std::future::Future<Output = R> + Send
61 where
62 R: Send,
63 F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65 fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70 where
71 R: Default + Send,
72 F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76 fn send_lossy(&self, msg: T) -> bool {
77 self.send(msg).is_ok()
78 }
79
80 async fn request<R, F>(&self, make_msg: F) -> Option<R>
81 where
82 R: Send,
83 F: FnOnce(oneshot::Sender<R>) -> T + Send,
84 {
85 let (tx, rx) = oneshot::channel();
86 if self.send(make_msg(tx)).is_err() {
87 return None;
88 }
89 rx.await.ok()
90 }
91
92 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93 where
94 R: Send,
95 F: FnOnce(oneshot::Sender<R>) -> T + Send,
96 {
97 self.request(make_msg).await.unwrap_or(default)
98 }
99
100 async fn request_or_default<R, F>(&self, make_msg: F) -> R
101 where
102 R: Default + Send,
103 F: FnOnce(oneshot::Sender<R>) -> T + Send,
104 {
105 self.request(make_msg).await.unwrap_or_default()
106 }
107}
108
109pub trait AsyncFallibleExt<T> {
113 fn send_lossy(&self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121 fn try_send_lossy(&self, msg: T) -> bool;
127
128 fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134 where
135 R: Send,
136 F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138 fn request_or<R, F>(
140 &self,
141 make_msg: F,
142 default: R,
143 ) -> impl std::future::Future<Output = R> + Send
144 where
145 R: Send,
146 F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148 fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
150 where
151 R: Default + Send,
152 F: FnOnce(oneshot::Sender<R>) -> T + Send;
153}
154
155impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
156 async fn send_lossy(&self, msg: T) -> bool {
157 self.send(msg).await.is_ok()
158 }
159
160 fn try_send_lossy(&self, msg: T) -> bool {
161 self.try_send(msg).is_ok()
162 }
163
164 async fn request<R, F>(&self, make_msg: F) -> Option<R>
165 where
166 R: Send,
167 F: FnOnce(oneshot::Sender<R>) -> T + Send,
168 {
169 let (tx, rx) = oneshot::channel();
170 if self.send(make_msg(tx)).await.is_err() {
171 return None;
172 }
173 rx.await.ok()
174 }
175
176 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
177 where
178 R: Send,
179 F: FnOnce(oneshot::Sender<R>) -> T + Send,
180 {
181 self.request(make_msg).await.unwrap_or(default)
182 }
183
184 async fn request_or_default<R, F>(&self, make_msg: F) -> R
185 where
186 R: Default + Send,
187 F: FnOnce(oneshot::Sender<R>) -> T + Send,
188 {
189 self.request(make_msg).await.unwrap_or_default()
190 }
191}
192
/// Extension method for one-time senders whose send may fail.
pub trait OneshotExt<T> {
    /// Consumes the sender to send `msg`, reporting whether the send succeeded.
    ///
    /// Returns `true` on success and `false` if the send failed; the failure itself is not
    /// surfaced to the caller.
    fn send_lossy(self, msg: T) -> bool;
}
208
209impl<T> OneshotExt<T> for oneshot::Sender<T> {
210 fn send_lossy(self, msg: T) -> bool {
211 self.send(msg).is_ok()
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_async;

    /// Message type used to exercise the extension traits.
    #[derive(Debug)]
    // The `responder` fields are only ever constructed by these tests, never read.
    #[allow(dead_code)]
    enum TestMessage {
        FireAndForget(u32),
        Request {
            responder: oneshot::Sender<String>,
        },
        RequestBool {
            responder: oneshot::Sender<bool>,
        },
        RequestVec {
            responder: oneshot::Sender<Vec<u32>>,
        },
    }

    // FallibleExt on mpsc::UnboundedSender

    #[test]
    fn test_send_lossy_success() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        assert!(tx.send_lossy(TestMessage::FireAndForget(42)));

        // The message must actually have been delivered.
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_send_lossy_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        // With the receiver gone the send fails and is reported as `false`.
        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
    }

    #[test_async]
    async fn test_request_send_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result: Option<String> = tx
            .request(|responder| TestMessage::Request { responder })
            .await;

        assert_eq!(result, None);
    }

    #[test_async]
    async fn test_request_or_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result = tx
            .request_or(|responder| TestMessage::RequestBool { responder }, false)
            .await;

        assert!(!result);
    }

    #[test_async]
    async fn test_request_or_default_disconnected() {
        let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
        drop(rx);

        let result: Vec<u32> = tx
            .request_or_default(|responder| TestMessage::RequestVec { responder })
            .await;

        assert!(result.is_empty());
    }

    // AsyncFallibleExt on mpsc::Sender

    #[test_async]
    async fn test_async_send_lossy_success() {
        let (tx, mut rx) = mpsc::channel(1);
        assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);

        // The message must actually have been delivered.
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test_async]
    async fn test_async_send_lossy_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        // With the receiver gone the send fails and is reported as `false`.
        assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
    }

    #[test_async]
    async fn test_async_request_send_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result: Option<String> =
            AsyncFallibleExt::request(&tx, |responder| TestMessage::Request { responder }).await;

        assert_eq!(result, None);
    }

    #[test_async]
    async fn test_async_request_or_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result = AsyncFallibleExt::request_or(
            &tx,
            |responder| TestMessage::RequestBool { responder },
            false,
        )
        .await;

        assert!(!result);
    }

    #[test_async]
    async fn test_async_request_or_default_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        let result: Vec<u32> = AsyncFallibleExt::request_or_default(&tx, |responder| {
            TestMessage::RequestVec { responder }
        })
        .await;

        assert!(result.is_empty());
    }

    // AsyncFallibleExt::try_send_lossy

    #[test]
    fn test_try_send_lossy_success() {
        let (tx, mut rx) = mpsc::channel(1);
        assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));

        // The message must actually have been delivered.
        assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
    }

    #[test]
    fn test_try_send_lossy_disconnected() {
        let (tx, rx) = mpsc::channel::<TestMessage>(1);
        drop(rx);

        // With the receiver gone the attempt fails and is reported as `false`.
        assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
    }

    // OneshotExt on oneshot::Sender

    #[test]
    fn test_oneshot_send_lossy_success() {
        let (tx, mut rx) = oneshot::channel::<u32>();
        assert!(tx.send_lossy(42));
        assert_eq!(rx.try_recv(), Ok(42));
    }

    #[test]
    fn test_oneshot_send_lossy_disconnected() {
        let (tx, rx) = oneshot::channel::<u32>();
        drop(rx);
        assert!(!tx.send_lossy(42));
    }
}