commonware_utils/channel/
fallible.rs1use super::{mpsc, oneshot};
20
21pub trait FallibleExt<T> {
26 fn send_lossy(&self, msg: T) -> bool;
33
34 fn request<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
48 where
49 R: Send,
50 F: FnOnce(oneshot::Sender<R>) -> T + Send;
51
52 fn request_or<R, F>(
57 &self,
58 make_msg: F,
59 default: R,
60 ) -> impl std::future::Future<Output = R> + Send
61 where
62 R: Send,
63 F: FnOnce(oneshot::Sender<R>) -> T + Send;
64
65 fn request_or_default<R, F>(&self, make_msg: F) -> impl std::future::Future<Output = R> + Send
70 where
71 R: Default + Send,
72 F: FnOnce(oneshot::Sender<R>) -> T + Send;
73}
74
75impl<T: Send> FallibleExt<T> for mpsc::UnboundedSender<T> {
76 fn send_lossy(&self, msg: T) -> bool {
77 self.send(msg).is_ok()
78 }
79
80 async fn request<R, F>(&self, make_msg: F) -> Option<R>
81 where
82 R: Send,
83 F: FnOnce(oneshot::Sender<R>) -> T + Send,
84 {
85 let (tx, rx) = oneshot::channel();
86 if self.send(make_msg(tx)).is_err() {
87 return None;
88 }
89 rx.await.ok()
90 }
91
92 async fn request_or<R, F>(&self, make_msg: F, default: R) -> R
93 where
94 R: Send,
95 F: FnOnce(oneshot::Sender<R>) -> T + Send,
96 {
97 self.request(make_msg).await.unwrap_or(default)
98 }
99
100 async fn request_or_default<R, F>(&self, make_msg: F) -> R
101 where
102 R: Default + Send,
103 F: FnOnce(oneshot::Sender<R>) -> T + Send,
104 {
105 self.request(make_msg).await.unwrap_or_default()
106 }
107}
108
109pub trait AsyncFallibleExt<T> {
113 fn send_lossy(&mut self, msg: T) -> impl std::future::Future<Output = bool> + Send;
120
121 fn try_send_lossy(&mut self, msg: T) -> bool;
127
128 fn request<R, F>(&mut self, make_msg: F) -> impl std::future::Future<Output = Option<R>> + Send
134 where
135 R: Send,
136 F: FnOnce(oneshot::Sender<R>) -> T + Send;
137
138 fn request_or<R, F>(
140 &mut self,
141 make_msg: F,
142 default: R,
143 ) -> impl std::future::Future<Output = R> + Send
144 where
145 R: Send,
146 F: FnOnce(oneshot::Sender<R>) -> T + Send;
147
148 fn request_or_default<R, F>(
150 &mut self,
151 make_msg: F,
152 ) -> impl std::future::Future<Output = R> + Send
153 where
154 R: Default + Send,
155 F: FnOnce(oneshot::Sender<R>) -> T + Send;
156}
157
158impl<T: Send> AsyncFallibleExt<T> for mpsc::Sender<T> {
159 async fn send_lossy(&mut self, msg: T) -> bool {
160 self.send(msg).await.is_ok()
161 }
162
163 fn try_send_lossy(&mut self, msg: T) -> bool {
164 self.try_send(msg).is_ok()
165 }
166
167 async fn request<R, F>(&mut self, make_msg: F) -> Option<R>
168 where
169 R: Send,
170 F: FnOnce(oneshot::Sender<R>) -> T + Send,
171 {
172 let (tx, rx) = oneshot::channel();
173 if self.send(make_msg(tx)).await.is_err() {
174 return None;
175 }
176 rx.await.ok()
177 }
178
179 async fn request_or<R, F>(&mut self, make_msg: F, default: R) -> R
180 where
181 R: Send,
182 F: FnOnce(oneshot::Sender<R>) -> T + Send,
183 {
184 self.request(make_msg).await.unwrap_or(default)
185 }
186
187 async fn request_or_default<R, F>(&mut self, make_msg: F) -> R
188 where
189 R: Default + Send,
190 F: FnOnce(oneshot::Sender<R>) -> T + Send,
191 {
192 self.request(make_msg).await.unwrap_or_default()
193 }
194}
195
196pub trait OneshotExt<T> {
201 fn send_lossy(self, msg: T) -> bool;
210}
211
212impl<T> OneshotExt<T> for oneshot::Sender<T> {
213 fn send_lossy(self, msg: T) -> bool {
214 self.send(msg).is_ok()
215 }
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221 use commonware_macros::test_async;
222
223 #[derive(Debug)]
224 #[allow(dead_code)]
225 enum TestMessage {
226 FireAndForget(u32),
227 Request {
228 responder: oneshot::Sender<String>,
229 },
230 RequestBool {
231 responder: oneshot::Sender<bool>,
232 },
233 RequestVec {
234 responder: oneshot::Sender<Vec<u32>>,
235 },
236 }
237
238 #[test]
239 fn test_send_lossy_success() {
240 let (tx, mut rx) = mpsc::unbounded_channel();
241 assert!(tx.send_lossy(TestMessage::FireAndForget(42)));
242
243 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
245 }
246
247 #[test]
248 fn test_send_lossy_disconnected() {
249 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
250 drop(rx);
251
252 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)));
254 }
255
256 #[test_async]
257 async fn test_request_send_disconnected() {
258 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
259 drop(rx);
260
261 let result: Option<String> = tx
262 .request(|responder| TestMessage::Request { responder })
263 .await;
264
265 assert_eq!(result, None);
266 }
267
268 #[test_async]
269 async fn test_request_or_disconnected() {
270 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
271 drop(rx);
272
273 let result = tx
274 .request_or(|responder| TestMessage::RequestBool { responder }, false)
275 .await;
276
277 assert!(!result);
278 }
279
280 #[test_async]
281 async fn test_request_or_default_disconnected() {
282 let (tx, rx) = mpsc::unbounded_channel::<TestMessage>();
283 drop(rx);
284
285 let result: Vec<u32> = tx
286 .request_or_default(|responder| TestMessage::RequestVec { responder })
287 .await;
288
289 assert!(result.is_empty());
290 }
291
292 #[test_async]
295 async fn test_async_send_lossy_success() {
296 let (mut tx, mut rx) = mpsc::channel(1);
297 assert!(tx.send_lossy(TestMessage::FireAndForget(42)).await);
298
299 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
301 }
302
303 #[test_async]
304 async fn test_async_send_lossy_disconnected() {
305 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
306 drop(rx);
307
308 assert!(!tx.send_lossy(TestMessage::FireAndForget(42)).await);
310 }
311
312 #[test_async]
313 async fn test_async_request_send_disconnected() {
314 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
315 drop(rx);
316
317 let result: Option<String> =
318 AsyncFallibleExt::request(&mut tx, |responder| TestMessage::Request { responder })
319 .await;
320
321 assert_eq!(result, None);
322 }
323
324 #[test_async]
325 async fn test_async_request_or_disconnected() {
326 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
327 drop(rx);
328
329 let result = AsyncFallibleExt::request_or(
330 &mut tx,
331 |responder| TestMessage::RequestBool { responder },
332 false,
333 )
334 .await;
335
336 assert!(!result);
337 }
338
339 #[test_async]
340 async fn test_async_request_or_default_disconnected() {
341 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
342 drop(rx);
343
344 let result: Vec<u32> = AsyncFallibleExt::request_or_default(&mut tx, |responder| {
345 TestMessage::RequestVec { responder }
346 })
347 .await;
348
349 assert!(result.is_empty());
350 }
351
352 #[test]
355 fn test_try_send_lossy_success() {
356 let (mut tx, mut rx) = mpsc::channel(1);
357 assert!(tx.try_send_lossy(TestMessage::FireAndForget(42)));
358
359 assert!(matches!(rx.try_recv(), Ok(TestMessage::FireAndForget(42))));
361 }
362
363 #[test]
364 fn test_try_send_lossy_disconnected() {
365 let (mut tx, rx) = mpsc::channel::<TestMessage>(1);
366 drop(rx);
367
368 assert!(!tx.try_send_lossy(TestMessage::FireAndForget(42)));
370 }
371
372 #[test]
375 fn test_oneshot_send_lossy_success() {
376 let (tx, mut rx) = oneshot::channel::<u32>();
377 assert!(tx.send_lossy(42));
378 assert_eq!(rx.try_recv(), Ok(42));
379 }
380
381 #[test]
382 fn test_oneshot_send_lossy_disconnected() {
383 let (tx, rx) = oneshot::channel::<u32>();
384 drop(rx);
385 assert!(!tx.send_lossy(42));
386 }
387}