commonware_utils/channels/
fallible.rs1use futures::channel::{mpsc, oneshot};
20
21pub trait FallibleExt<T> {
26 fn send_lossy(&self, msg: T) -> bool;
33
34 fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48 where
49 R: Send,
50 F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52 fn request_or<R, F>(
57 &self,
58 make_msg: F,
59 default: R,
60 ) -> impl std::future::Future<Output = R> + Send
61 where
62 R: Send,
63 F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65 fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70 where
71 R: Default + Send,
72 F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76 fn send_lossy(&self, msg: T) -> bool {
77 self.unbounded_send(msg).is_ok()
78 }
79
80 async fn request<R, F>(&self, make_msg: F) -> Option<R>
81 where
82 R: Send,
83 F: FnOnce(oneshot::Sender<R>) -> T + Send,
84 {
85 let (tx, rx) = oneshot::channel();
86 if self.unbounded_send(make_msg(tx)).is_err() {
87 return None;
88 }
89 rx.await.ok()
90 }
91
92 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93 where
94 R: Send,
95 F: FnOnce(oneshot::Sender<R>) -> T + Send,
96 {
97 self.request(make_msg).await.unwrap_or(default)
98 }
99
100 async fn request_or_default<R, F>(&self, make_msg: F) -> R
101 where
102 R: Default + Send,
103 F: FnOnce(oneshot::Sender<R>) -> T + Send,
104 {
105 self.request(make_msg).await.unwrap_or_default()
106 }
107}
108
109pub trait AsyncFallibleExt<T> {
113 fn send_lossy(&mut self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121 fn try_send_lossy(&mut self, msg: T) -> bool;
127
128 fn request<R, F>(&mut self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134 where
135 R: Send,
136 F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138 fn request_or<R, F>(
140 &mut self,
141 make_msg: F,
142 default: R,
143 ) -> impl std::future::Future<Output = R> + Send
144 where
145 R: Send,
146 F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148 fn request_or_default<R, F>(
150 &mut self,
151 make_msg: F,
152 ) -> impl std::future::Future<Output = R> + Send
153 where
154 R: Default + Send,
155 F: FnOnce(oneshot::Sender<R>) -> T + Send;
156}
157
158impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
159 async fn send_lossy(&mut self, msg: T) -> bool {
160 futures::SinkExt::send(self, msg).await.is_ok()
161 }
162
163 fn try_send_lossy(&mut self, msg: T) -> bool {
164 self.try_send(msg).is_ok()
165 }
166
167 async fn request<R, F>(&mut self, make_msg: F) -> Option<R>
168 where
169 R: Send,
170 F: FnOnce(oneshot::Sender<R>) -> T + Send,
171 {
172 let (tx, rx) = oneshot::channel();
173 if futures::SinkExt::send(self, make_msg(tx)).await.is_err() {
174 return None;
175 }
176 rx.await.ok()
177 }
178
179 async fn request_or<R, F>(&mut self, make_msg: F, default: R) -> R
180 where
181 R: Send,
182 F: FnOnce(oneshot::Sender<R>) -> T + Send,
183 {
184 self.request(make_msg).await.unwrap_or(default)
185 }
186
187 async fn request_or_default<R, F>(&mut self, make_msg: F) -> R
188 where
189 R: Default + Send,
190 F: FnOnce(oneshot::Sender<R>) -> T + Send,
191 {
192 self.request(make_msg).await.unwrap_or_default()
193 }
194}
195
196pub trait OneshotExt<T> {
201 fn send_lossy(self, msg: T) -> bool;
210}
211
212impl<T> OneshotExt<T> for oneshot::Sender<T> {
213 fn send_lossy(self, msg: T) -> bool {
214 self.send(msg).is_ok()
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use commonware_macros::test_async;
222
223 #[derive(Debug)]
224 #[allow(dead_code)]
225 enum TestMessage {
226 FireAndForget(u32),
227 Request {
228 responder: oneshot::Sender<String>,
229 },
230 RequestBool {
231 responder: oneshot::Sender<bool>,
232 },
233 RequestVec {
234 responder: oneshot::Sender<Vec<u32>>,
235 },
236 }
237
238 #[test]
239 fn test_send_lossy_success() {
240 let (tx, mut rx) = mpsc::unbounded();
241 assert!(tx.send_lossy(TestMessage::FireAndForget(42)));
242
243 assert!(matches!(
245 rx.try_next(),
246 Ok(Some(TestMessage::FireAndForget(42)))
247 ));
248 }
249
250 #[test]
251 fn test_send_lossy_disconnected() {
252 let (tx, rx) = mpsc::unbounded::<TestMessage>();
253 drop(rx);
254
255 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
257 }
258
259 #[test_async]
260 async fn test_request_send_disconnected() {
261 let (tx, rx) = mpsc::unbounded::<TestMessage>();
262 drop(rx);
263
264 let result: Option<String> = tx
265 .request(|responder| TestMessage::Request { responder })
266 .await;
267
268 assert_eq!(result, None);
269 }
270
271 #[test_async]
272 async fn test_request_or_disconnected() {
273 let (tx, rx) = mpsc::unbounded::<TestMessage>();
274 drop(rx);
275
276 let result = tx
277 .request_or(|responder| TestMessage::RequestBool { responder }, false)
278 .await;
279
280 assert!(!result);
281 }
282
283 #[test_async]
284 async fn test_request_or_default_disconnected() {
285 let (tx, rx) = mpsc::unbounded::<TestMessage>();
286 drop(rx);
287
288 let result: Vec<u32> = tx
289 .request_or_default(|responder| TestMessage::RequestVec { responder })
290 .await;
291
292 assert!(result.is_empty());
293 }
294
295 #[test_async]
298 async fn test_async_send_lossy_success() {
299 let (mut tx, mut rx) = mpsc::channel(1);
300 assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);
301
302 assert!(matches!(
304 rx.try_next(),
305 Ok(Some(TestMessage::FireAndForget(42)))
306 ));
307 }
308
309 #[test_async]
310 async fn test_async_send_lossy_disconnected() {
311 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
312 drop(rx);
313
314 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
316 }
317
318 #[test_async]
319 async fn test_async_request_send_disconnected() {
320 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
321 drop(rx);
322
323 let result: Option<String> =
324 AsyncFallibleExt::request(&mut tx, |responder| TestMessage::Request { responder })
325 .await;
326
327 assert_eq!(result, None);
328 }
329
330 #[test_async]
331 async fn test_async_request_or_disconnected() {
332 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
333 drop(rx);
334
335 let result = AsyncFallibleExt::request_or(
336 &mut tx,
337 |responder| TestMessage::RequestBool { responder },
338 false,
339 )
340 .await;
341
342 assert!(!result);
343 }
344
345 #[test_async]
346 async fn test_async_request_or_default_disconnected() {
347 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
348 drop(rx);
349
350 let result: Vec<u32> = AsyncFallibleExt::request_or_default(&mut tx, |responder| {
351 TestMessage::RequestVec { responder }
352 })
353 .await;
354
355 assert!(result.is_empty());
356 }
357
358 #[test]
361 fn test_try_send_lossy_success() {
362 let (mut tx, mut rx) = mpsc::channel(1);
363 assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));
364
365 assert!(matches!(
367 rx.try_next(),
368 Ok(Some(TestMessage::FireAndForget(42)))
369 ));
370 }
371
372 #[test]
373 fn test_try_send_lossy_disconnected() {
374 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
375 drop(rx);
376
377 assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
379 }
380
381 #[test]
384 fn test_oneshot_send_lossy_success() {
385 use futures::FutureExt;
386 let (tx, rx) = oneshot::channel::<u32>();
387 assert!(tx.send_lossy(42));
388 assert_eq!(rx.now_or_never(), Some(Ok(42)));
389 }
390
391 #[test]
392 fn test_oneshot_send_lossy_disconnected() {
393 let (tx, rx) = oneshot::channel::<u32>();
394 drop(rx);
395 assert!(!tx.send_lossy(42));
396 }
397}