channel_sender/
impl_async_channel.rs

1pub use async_channel::Sender as AsyncChannelSender;
2use async_channel::TrySendError;
3
4//
5mod multi_producer_impl {
6    use super::*;
7
8    use crate::{
9        error::{SendError, SendErrorWithoutFull},
10        multi_producer::{BoundedSender, UnboundedSender},
11    };
12
13    #[async_trait::async_trait]
14    impl<T> BoundedSender<T> for AsyncChannelSender<T> {
15        async fn send(&self, t: T) -> Result<(), SendErrorWithoutFull<T>>
16        where
17            T: Send,
18        {
19            AsyncChannelSender::send(self, t)
20                .await
21                .map_err(|err| SendErrorWithoutFull::Closed(err.0))
22        }
23
24        fn try_send(&self, t: T) -> Result<(), SendError<T>> {
25            AsyncChannelSender::try_send(self, t).map_err(Into::into)
26        }
27    }
28
29    impl<T> UnboundedSender<T> for AsyncChannelSender<T> {
30        fn send(&self, t: T) -> Result<(), SendErrorWithoutFull<T>> {
31            debug_assert!(
32                !self.is_full(),
33                "Unbounded channels are never full. Make sure you are using `async_channel::unbounded`."
34            );
35
36            match AsyncChannelSender::try_send(self, t) {
37                Ok(_) => Ok(()),
38                Err(err) => match err {
39                    TrySendError::Full(v) => Err(SendErrorWithoutFull::UnreachableFull(v)),
40                    TrySendError::Closed(v) => Err(SendErrorWithoutFull::Closed(v)),
41                },
42            }
43        }
44    }
45}
46
47//
48mod generic_impl {
49    use super::*;
50
51    use crate::{
52        error::SendError,
53        generic::{CloneableSender, Sender},
54    };
55
56    impl<T> Sender<T> for AsyncChannelSender<T> {
57        fn send(&self, t: T) -> Result<(), SendError<T>> {
58            AsyncChannelSender::try_send(self, t).map_err(Into::into)
59        }
60    }
61
62    impl<T> CloneableSender<T> for AsyncChannelSender<T> {
63        fn send(&self, t: T) -> Result<(), SendError<T>> {
64            AsyncChannelSender::try_send(self, t).map_err(Into::into)
65        }
66    }
67}
68
69//
70mod error_convert {
71    use super::*;
72
73    use crate::error::SendError;
74
75    impl<T> From<TrySendError<T>> for SendError<T> {
76        fn from(err: TrySendError<T>) -> Self {
77            match err {
78                TrySendError::Full(v) => Self::Full(v),
79                TrySendError::Closed(v) => Self::Closed(v),
80            }
81        }
82    }
83}
84
#[cfg(test)]
mod multi_producer_impl_tests {
    use crate::{
        error::{SendError, SendErrorWithoutFull},
        multi_producer::{BoundedSender, UnboundedSender},
    };

    /// Exercises the `BoundedSender` impl through a cloned trait object on a
    /// capacity-1 channel: success, full, pending send, then closed.
    #[tokio::test]
    async fn test_with_bounded() {
        use tokio::time::{timeout, Duration};

        let (tx, rx) = async_channel::bounded(1);
        let boxed: Box<dyn BoundedSender<usize>> = Box::new(tx);
        let sender = boxed.clone();

        // The first message fills the single slot.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(sender.try_send(2), Err(SendError::Full(2)));

        // With the slot occupied, the async send must still be pending when the
        // timeout elapses.
        let pending_send = timeout(Duration::from_millis(200), sender.send(2)).await;
        assert!(pending_send.is_err());

        assert_eq!(rx.recv().await, Ok(1));

        // After the receiver is dropped, both send flavours report `Closed`.
        drop(rx);
        assert_eq!(sender.send(3).await, Err(SendErrorWithoutFull::Closed(3)));
        assert_eq!(sender.try_send(3), Err(SendError::Closed(3)));
    }

    /// Exercises the `UnboundedSender` impl through a cloned trait object:
    /// two successful sends, in-order receipt, then closed.
    #[tokio::test]
    async fn test_with_unbounded() {
        let (tx, rx) = async_channel::unbounded();
        let boxed: Box<dyn UnboundedSender<usize>> = Box::new(tx);
        let sender = boxed.clone();

        assert_eq!(sender.send(1), Ok(()));
        assert_eq!(sender.send(2), Ok(()));

        // Messages come out in the order they were sent.
        assert_eq!(rx.recv().await, Ok(1));
        assert_eq!(rx.recv().await, Ok(2));

        drop(rx);
        assert_eq!(sender.send(3), Err(SendErrorWithoutFull::Closed(3)));
    }
}
127
#[cfg(test)]
mod generic_impl_tests {
    use crate::{
        error::SendError,
        generic::{CloneableSender, Sender},
    };

    /// Checks `Sender` and `CloneableSender` on a capacity-1 channel:
    /// success, full, receipt, then closed.
    #[tokio::test]
    async fn test_with_bounded() {
        // Scenario 1: plain `Sender` trait object.
        {
            let (tx, rx) = async_channel::bounded(1);
            let plain: Box<dyn Sender<usize>> = Box::new(tx);

            assert_eq!(plain.send(1), Ok(()));
            assert_eq!(plain.send(2), Err(SendError::Full(2)));
            assert_eq!(rx.recv().await, Ok(1));

            drop(rx);
            assert_eq!(plain.send(3), Err(SendError::Closed(3)));
        }

        // Scenario 2: `CloneableSender` trait object, used through a clone.
        {
            let (tx, rx) = async_channel::bounded(1);
            let boxed: Box<dyn CloneableSender<usize>> = Box::new(tx);
            let cloned = boxed.clone();

            assert_eq!(cloned.send(1), Ok(()));
            assert_eq!(cloned.send(2), Err(SendError::Full(2)));
            assert_eq!(rx.recv().await, Ok(1));

            drop(rx);
            assert_eq!(cloned.send(3), Err(SendError::Closed(3)));
        }
    }

    /// Checks `Sender` and `CloneableSender` on an unbounded channel:
    /// two successful sends, in-order receipt, then closed.
    #[tokio::test]
    async fn test_with_unbounded() {
        // Scenario 1: plain `Sender` trait object.
        {
            let (tx, rx) = async_channel::unbounded();
            let plain: Box<dyn Sender<usize>> = Box::new(tx);

            assert_eq!(plain.send(1), Ok(()));
            assert_eq!(plain.send(2), Ok(()));
            assert_eq!(rx.recv().await, Ok(1));
            assert_eq!(rx.recv().await, Ok(2));

            drop(rx);
            assert_eq!(plain.send(3), Err(SendError::Closed(3)));
        }

        // Scenario 2: `CloneableSender` trait object, used through a clone.
        {
            let (tx, rx) = async_channel::unbounded();
            let boxed: Box<dyn CloneableSender<usize>> = Box::new(tx);
            let cloned = boxed.clone();

            assert_eq!(cloned.send(1), Ok(()));
            assert_eq!(cloned.send(2), Ok(()));
            assert_eq!(rx.recv().await, Ok(1));
            assert_eq!(rx.recv().await, Ok(2));

            drop(rx);
            assert_eq!(cloned.send(3), Err(SendError::Closed(3)));
        }
    }
}