// channel_receiver/impl_async_channel.rs
pub use async_channel::Receiver as AsyncChannelReceiver;
use async_channel::TryRecvError as TryRecvErrorInner;
3
4mod multi_consumer_impl {
6 use super::*;
7
8 use crate::{error::TryRecvError, multi_consumer::AsyncReceiver};
9
10 #[async_trait::async_trait]
11 impl<T> AsyncReceiver<T> for AsyncChannelReceiver<T> {
12 async fn recv(&mut self) -> Option<T>
13 where
14 T: Send,
15 {
16 AsyncChannelReceiver::recv(self).await.ok()
17 }
18
19 fn try_recv(&mut self) -> Result<T, TryRecvError> {
20 AsyncChannelReceiver::try_recv(self).map_err(Into::into)
21 }
22 }
23}
24
25mod generic_impl {
27 use super::*;
28
29 use crate::{error::TryRecvError, generic::AsyncReceiver};
30
31 #[async_trait::async_trait]
32 impl<T> AsyncReceiver<T> for AsyncChannelReceiver<T> {
33 async fn recv(&mut self) -> Option<T>
34 where
35 T: Send,
36 {
37 AsyncChannelReceiver::recv(self).await.ok()
38 }
39
40 fn try_recv(&mut self) -> Result<T, TryRecvError> {
41 AsyncChannelReceiver::try_recv(self).map_err(Into::into)
42 }
43 }
44}
45
46mod error_convert {
48 use super::*;
49
50 use crate::error::TryRecvError;
51
52 impl From<TryRecvErrorInner> for TryRecvError {
53 fn from(err: TryRecvErrorInner) -> Self {
54 match err {
55 TryRecvErrorInner::Empty => Self::Empty,
56 TryRecvErrorInner::Closed => Self::Closed,
57 }
58 }
59 }
60}
61
#[cfg(test)]
mod multi_consumer_impl_tests {
    use tokio::time::{timeout, Duration};

    use crate::{error::TryRecvError, multi_consumer::AsyncReceiver};

    /// Upper bound granted to a single `recv` call before the test treats it
    /// as still pending.
    const RECV_WAIT: Duration = Duration::from_millis(200);

    /// Exercises a capacity-1 bounded channel through a cloned, boxed
    /// multi-consumer `AsyncReceiver`.
    #[tokio::test]
    async fn test_with_bounded() {
        let (sender, rx) = async_channel::bounded(1);
        let boxed: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
        // All receiving goes through a clone of the boxed trait object.
        let mut receiver = boxed.clone();

        // Capacity is 1, so each message is drained before the next is sent.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(sender.try_send(2), Ok(()));
        assert_eq!(receiver.recv().await, Some(2));

        // Nothing queued while the sender is alive: `recv` must time out.
        let pending = timeout(RECV_WAIT, receiver.recv()).await;
        assert!(pending.is_err());
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        // Once the only sender is gone the channel reports closure.
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        let after_close = timeout(RECV_WAIT, receiver.recv()).await;
        assert_eq!(after_close, Ok(None));
    }

    /// Exercises an unbounded channel through a cloned, boxed multi-consumer
    /// `AsyncReceiver`.
    #[tokio::test]
    async fn test_with_unbounded() {
        let (sender, rx) = async_channel::unbounded();
        let boxed: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
        // All receiving goes through a clone of the boxed trait object.
        let mut receiver = boxed.clone();

        // Both messages are queued up front, then read back in send order.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(sender.send(2).await, Ok(()));
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(receiver.recv().await, Some(2));

        // Nothing queued while the sender is alive: `recv` must time out.
        let pending = timeout(RECV_WAIT, receiver.recv()).await;
        assert!(pending.is_err());
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        // Once the only sender is gone the channel reports closure.
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        let after_close = timeout(RECV_WAIT, receiver.recv()).await;
        assert_eq!(after_close, Ok(None));
    }
}
118
#[cfg(test)]
mod generic_impl_tests {
    use tokio::time::{timeout, Duration};

    use crate::{error::TryRecvError, generic::AsyncReceiver};

    /// Upper bound granted to a single `recv` call before the test treats it
    /// as still pending.
    const RECV_WAIT: Duration = Duration::from_millis(200);

    /// Exercises a capacity-1 bounded channel through a boxed generic
    /// `AsyncReceiver`.
    #[tokio::test]
    async fn test_with_bounded() {
        let (sender, rx) = async_channel::bounded(1);
        let mut receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);

        // Capacity is 1, so each message is drained before the next is sent.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(sender.try_send(2), Ok(()));
        assert_eq!(receiver.recv().await, Some(2));

        // Nothing queued while the sender is alive: `recv` must time out.
        let pending = timeout(RECV_WAIT, receiver.recv()).await;
        assert!(pending.is_err());
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        // Once the only sender is gone the channel reports closure.
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        let after_close = timeout(RECV_WAIT, receiver.recv()).await;
        assert_eq!(after_close, Ok(None));
    }

    /// Exercises an unbounded channel through a boxed generic
    /// `AsyncReceiver`.
    #[tokio::test]
    async fn test_with_unbounded() {
        let (sender, rx) = async_channel::unbounded();
        let mut receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);

        // Both messages are queued up front, then read back in send order.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(sender.send(2).await, Ok(()));
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(receiver.recv().await, Some(2));

        // Nothing queued while the sender is alive: `recv` must time out.
        let pending = timeout(RECV_WAIT, receiver.recv()).await;
        assert!(pending.is_err());
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        // Once the only sender is gone the channel reports closure.
        drop(sender);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
        let after_close = timeout(RECV_WAIT, receiver.recv()).await;
        assert_eq!(after_close, Ok(None));
    }
}