channel_receiver/
impl_async_channel.rs

1pub use async_channel::Receiver as AsyncChannelReceiver;
2use async_channel::TryRecvError as TryRecvErrorInner;
3
4//
5mod multi_consumer_impl {
6    use super::*;
7
8    use crate::{error::TryRecvError, multi_consumer::AsyncReceiver};
9
10    #[async_trait::async_trait]
11    impl<T> AsyncReceiver<T> for AsyncChannelReceiver<T> {
12        async fn recv(&mut self) -> Option<T>
13        where
14            T: Send,
15        {
16            AsyncChannelReceiver::recv(self).await.ok()
17        }
18
19        fn try_recv(&mut self) -> Result<T, TryRecvError> {
20            AsyncChannelReceiver::try_recv(self).map_err(Into::into)
21        }
22    }
23}
24
25//
26mod generic_impl {
27    use super::*;
28
29    use crate::{error::TryRecvError, generic::AsyncReceiver};
30
31    #[async_trait::async_trait]
32    impl<T> AsyncReceiver<T> for AsyncChannelReceiver<T> {
33        async fn recv(&mut self) -> Option<T>
34        where
35            T: Send,
36        {
37            AsyncChannelReceiver::recv(self).await.ok()
38        }
39
40        fn try_recv(&mut self) -> Result<T, TryRecvError> {
41            AsyncChannelReceiver::try_recv(self).map_err(Into::into)
42        }
43    }
44}
45
46//
47mod error_convert {
48    use super::*;
49
50    use crate::error::TryRecvError;
51
52    impl From<TryRecvErrorInner> for TryRecvError {
53        fn from(err: TryRecvErrorInner) -> Self {
54            match err {
55                TryRecvErrorInner::Empty => Self::Empty,
56                TryRecvErrorInner::Closed => Self::Closed,
57            }
58        }
59    }
60}
61
62#[cfg(test)]
63mod multi_consumer_impl_tests {
64    use crate::{error::TryRecvError, multi_consumer::AsyncReceiver};
65
66    #[tokio::test]
67    async fn test_with_bounded() {
68        {
69            let (tx, rx) = async_channel::bounded(1);
70            let receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
71            let mut receiver = receiver.clone();
72            assert_eq!(tx.send(1).await, Ok(()));
73            assert_eq!(receiver.recv().await, Some(1));
74            assert_eq!(tx.try_send(2), Ok(()));
75            assert_eq!(receiver.recv().await, Some(2));
76            assert!(
77                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
78                    .await
79                    .is_err()
80            );
81            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
82            drop(tx);
83            assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
84            assert_eq!(
85                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
86                    .await,
87                Ok(None)
88            );
89        }
90    }
91
92    #[tokio::test]
93    async fn test_with_unbounded() {
94        {
95            let (tx, rx) = async_channel::unbounded();
96            let receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
97            let mut receiver = receiver.clone();
98            assert_eq!(tx.send(1).await, Ok(()));
99            assert_eq!(tx.send(2).await, Ok(()));
100            assert_eq!(receiver.recv().await, Some(1));
101            assert_eq!(receiver.recv().await, Some(2));
102            assert!(
103                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
104                    .await
105                    .is_err()
106            );
107            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
108            drop(tx);
109            assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
110            assert_eq!(
111                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
112                    .await,
113                Ok(None)
114            );
115        }
116    }
117}
118
119#[cfg(test)]
120mod generic_impl_tests {
121    use crate::{error::TryRecvError, generic::AsyncReceiver};
122
123    #[tokio::test]
124    async fn test_with_bounded() {
125        {
126            let (tx, rx) = async_channel::bounded(1);
127            let mut receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
128            assert_eq!(tx.send(1).await, Ok(()));
129            assert_eq!(receiver.recv().await, Some(1));
130            assert_eq!(tx.try_send(2), Ok(()));
131            assert_eq!(receiver.recv().await, Some(2));
132            assert!(
133                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
134                    .await
135                    .is_err()
136            );
137            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
138            drop(tx);
139            assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
140            assert_eq!(
141                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
142                    .await,
143                Ok(None)
144            );
145        }
146    }
147
148    #[tokio::test]
149    async fn test_with_unbounded() {
150        {
151            let (tx, rx) = async_channel::unbounded();
152            let mut receiver: Box<dyn AsyncReceiver<usize>> = Box::new(rx);
153            assert_eq!(tx.send(1).await, Ok(()));
154            assert_eq!(tx.send(2).await, Ok(()));
155            assert_eq!(receiver.recv().await, Some(1));
156            assert_eq!(receiver.recv().await, Some(2));
157            assert!(
158                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
159                    .await
160                    .is_err()
161            );
162            assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
163            drop(tx);
164            assert_eq!(receiver.try_recv(), Err(TryRecvError::Closed));
165            assert_eq!(
166                tokio::time::timeout(tokio::time::Duration::from_millis(200), receiver.recv())
167                    .await,
168                Ok(None)
169            );
170        }
171    }
172}