// channel_sender/impl_async_channel.rs
pub use async_channel::Sender as AsyncChannelSender;
2use async_channel::TrySendError;
3
4mod multi_producer_impl {
6 use super::*;
7
8 use crate::{
9 error::{SendError, SendErrorWithoutFull},
10 multi_producer::{BoundedSender, UnboundedSender},
11 };
12
13 #[async_trait::async_trait]
14 impl<T> BoundedSender<T> for AsyncChannelSender<T> {
15 async fn send(&self, t: T) -> Result<(), SendErrorWithoutFull<T>>
16 where
17 T: Send,
18 {
19 AsyncChannelSender::send(self, t)
20 .await
21 .map_err(|err| SendErrorWithoutFull::Closed(err.0))
22 }
23
24 fn try_send(&self, t: T) -> Result<(), SendError<T>> {
25 AsyncChannelSender::try_send(self, t).map_err(Into::into)
26 }
27 }
28
29 impl<T> UnboundedSender<T> for AsyncChannelSender<T> {
30 fn send(&self, t: T) -> Result<(), SendErrorWithoutFull<T>> {
31 debug_assert!(
32 !self.is_full(),
33 "Unbounded channels are never full. Make sure you are using `async_channel::unbounded`."
34 );
35
36 match AsyncChannelSender::try_send(self, t) {
37 Ok(_) => Ok(()),
38 Err(err) => match err {
39 TrySendError::Full(v) => Err(SendErrorWithoutFull::UnreachableFull(v)),
40 TrySendError::Closed(v) => Err(SendErrorWithoutFull::Closed(v)),
41 },
42 }
43 }
44 }
45}
46
47mod generic_impl {
49 use super::*;
50
51 use crate::{
52 error::SendError,
53 generic::{CloneableSender, Sender},
54 };
55
56 impl<T> Sender<T> for AsyncChannelSender<T> {
57 fn send(&self, t: T) -> Result<(), SendError<T>> {
58 AsyncChannelSender::try_send(self, t).map_err(Into::into)
59 }
60 }
61
62 impl<T> CloneableSender<T> for AsyncChannelSender<T> {
63 fn send(&self, t: T) -> Result<(), SendError<T>> {
64 AsyncChannelSender::try_send(self, t).map_err(Into::into)
65 }
66 }
67}
68
69mod error_convert {
71 use super::*;
72
73 use crate::error::SendError;
74
75 impl<T> From<TrySendError<T>> for SendError<T> {
76 fn from(err: TrySendError<T>) -> Self {
77 match err {
78 TrySendError::Full(v) => Self::Full(v),
79 TrySendError::Closed(v) => Self::Closed(v),
80 }
81 }
82 }
83}
84
#[cfg(test)]
mod multi_producer_impl_tests {
    use crate::{
        error::{SendError, SendErrorWithoutFull},
        multi_producer::{BoundedSender, UnboundedSender},
    };

    /// Exercises a capacity-1 channel through a boxed `BoundedSender` trait object.
    #[tokio::test]
    async fn test_with_bounded() {
        let (tx, rx) = async_channel::bounded(1);
        let boxed: Box<dyn BoundedSender<usize>> = Box::new(tx);
        // All sends below go through a clone of the boxed trait object.
        let sender = boxed.clone();

        // The first value fills the single slot; a second non-waiting send is rejected.
        assert_eq!(sender.send(1).await, Ok(()));
        assert_eq!(sender.try_send(2), Err(SendError::Full(2)));

        // The awaiting send is expected not to finish within the timeout.
        let stalled_send = tokio::time::timeout(
            tokio::time::Duration::from_millis(200),
            sender.send(2),
        )
        .await;
        assert!(stalled_send.is_err());

        assert_eq!(rx.recv().await, Ok(1));

        // With the receiver gone, both send flavours report `Closed`.
        drop(rx);
        assert_eq!(sender.send(3).await, Err(SendErrorWithoutFull::Closed(3)));
        assert_eq!(sender.try_send(3), Err(SendError::Closed(3)));
    }

    /// Exercises an unbounded channel through a boxed `UnboundedSender` trait object.
    #[tokio::test]
    async fn test_with_unbounded() {
        let (tx, rx) = async_channel::unbounded();
        let boxed: Box<dyn UnboundedSender<usize>> = Box::new(tx);
        // All sends below go through a clone of the boxed trait object.
        let sender = boxed.clone();

        // Two sends succeed back to back and are received in order.
        assert_eq!(sender.send(1), Ok(()));
        assert_eq!(sender.send(2), Ok(()));
        assert_eq!(rx.recv().await, Ok(1));
        assert_eq!(rx.recv().await, Ok(2));

        // With the receiver gone, the send reports `Closed`.
        drop(rx);
        assert_eq!(sender.send(3), Err(SendErrorWithoutFull::Closed(3)));
    }
}
127
#[cfg(test)]
mod generic_impl_tests {
    use crate::{
        error::SendError,
        generic::{CloneableSender, Sender},
    };

    /// Capacity-1 channel driven through both generic sender trait objects.
    #[tokio::test]
    async fn test_with_bounded() {
        // Case 1: plain `Sender` trait object.
        {
            let (producer, consumer) = async_channel::bounded(1);
            let sender: Box<dyn Sender<usize>> = Box::new(producer);

            assert_eq!(sender.send(1), Ok(()));
            // The single slot is taken, so the next send is rejected as `Full`.
            assert_eq!(sender.send(2), Err(SendError::Full(2)));
            assert_eq!(consumer.recv().await, Ok(1));

            drop(consumer);
            assert_eq!(sender.send(3), Err(SendError::Closed(3)));
        }
        // Case 2: `CloneableSender` trait object, used through a clone.
        {
            let (producer, consumer) = async_channel::bounded(1);
            let boxed: Box<dyn CloneableSender<usize>> = Box::new(producer);
            let sender = boxed.clone();

            assert_eq!(sender.send(1), Ok(()));
            // The single slot is taken, so the next send is rejected as `Full`.
            assert_eq!(sender.send(2), Err(SendError::Full(2)));
            assert_eq!(consumer.recv().await, Ok(1));

            drop(consumer);
            assert_eq!(sender.send(3), Err(SendError::Closed(3)));
        }
    }

    /// Unbounded channel driven through both generic sender trait objects.
    #[tokio::test]
    async fn test_with_unbounded() {
        // Case 1: plain `Sender` trait object.
        {
            let (producer, consumer) = async_channel::unbounded();
            let sender: Box<dyn Sender<usize>> = Box::new(producer);

            assert_eq!(sender.send(1), Ok(()));
            assert_eq!(sender.send(2), Ok(()));
            // Values come out in the order they were sent.
            assert_eq!(consumer.recv().await, Ok(1));
            assert_eq!(consumer.recv().await, Ok(2));

            drop(consumer);
            assert_eq!(sender.send(3), Err(SendError::Closed(3)));
        }
        // Case 2: `CloneableSender` trait object, used through a clone.
        {
            let (producer, consumer) = async_channel::unbounded();
            let boxed: Box<dyn CloneableSender<usize>> = Box::new(producer);
            let sender = boxed.clone();

            assert_eq!(sender.send(1), Ok(()));
            assert_eq!(sender.send(2), Ok(()));
            // Values come out in the order they were sent.
            assert_eq!(consumer.recv().await, Ok(1));
            assert_eq!(consumer.recv().await, Ok(2));

            drop(consumer);
            assert_eq!(sender.send(3), Err(SendError::Closed(3)));
        }
    }
}