async_compatibility_layer/channel/
unbounded.rs1#![allow(clippy::module_name_repetitions)]
2
3use std::pin::Pin;
4
5use futures::Stream;
6
7#[cfg(async_channel_impl = "tokio")]
9mod inner {
10 pub use tokio::sync::mpsc::error::{
11 SendError as UnboundedSendError, TryRecvError as UnboundedTryRecvError,
12 };
13 use tokio::sync::mpsc::{UnboundedReceiver as InnerReceiver, UnboundedSender as InnerSender};
14
15 #[derive(Debug, PartialEq, Eq, Clone)]
17 pub struct UnboundedRecvError;
18
19 impl std::fmt::Display for UnboundedRecvError {
20 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 write!(fmt, stringify!(UnboundedRecvError))
22 }
23 }
24
25 impl std::error::Error for UnboundedRecvError {}
26
27 use tokio::sync::Mutex;
28
29 pub struct UnboundedSender<T>(pub(super) InnerSender<T>);
31 pub struct UnboundedReceiver<T>(pub(super) Mutex<InnerReceiver<T>>);
33
34 pub struct UnboundedStream<T>(pub(super) tokio_stream::wrappers::UnboundedReceiverStream<T>);
36
37 pub(super) fn try_recv_error_to_recv_error(
39 e: UnboundedTryRecvError,
40 ) -> Option<UnboundedRecvError> {
41 match e {
42 UnboundedTryRecvError::Empty => None,
43 UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError),
44 }
45 }
46
47 #[must_use]
51 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
52 let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
53 let receiver = Mutex::new(receiver);
54 (UnboundedSender(sender), UnboundedReceiver(receiver))
55 }
56}
57
58#[cfg(async_channel_impl = "flume")]
60mod inner {
61 use flume::{r#async::RecvStream, Receiver, Sender};
62 pub use flume::{
63 RecvError as UnboundedRecvError, SendError as UnboundedSendError,
64 TryRecvError as UnboundedTryRecvError,
65 };
66
67 pub struct UnboundedSender<T>(pub(super) Sender<T>);
69 pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
71 pub struct UnboundedStream<T: 'static>(pub(super) RecvStream<'static, T>);
73
74 pub(super) fn try_recv_error_to_recv_error(
76 e: UnboundedTryRecvError,
77 ) -> Option<UnboundedRecvError> {
78 match e {
79 UnboundedTryRecvError::Empty => None,
80 UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError::Disconnected),
81 }
82 }
83
84 #[must_use]
88 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
89 let (sender, receiver) = flume::unbounded();
90 (UnboundedSender(sender), UnboundedReceiver(receiver))
91 }
92}
93
94#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
96mod inner {
97 use async_std::channel::{Receiver, Sender};
98 pub use async_std::channel::{
99 RecvError as UnboundedRecvError, SendError as UnboundedSendError,
100 TryRecvError as UnboundedTryRecvError,
101 };
102
103 pub struct UnboundedSender<T>(pub(super) Sender<T>);
105 pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
107 pub struct UnboundedStream<T>(pub(super) Receiver<T>);
109
110 pub(super) fn try_recv_error_to_recv_error(
112 e: UnboundedTryRecvError,
113 ) -> Option<UnboundedRecvError> {
114 match e {
115 UnboundedTryRecvError::Empty => None,
116 UnboundedTryRecvError::Closed => Some(UnboundedRecvError),
117 }
118 }
119
120 #[must_use]
124 pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
125 let (sender, receiver) = async_std::channel::unbounded();
126 (UnboundedSender(sender), UnboundedReceiver(receiver))
127 }
128}
129
130pub use inner::*;
131
132impl<T> UnboundedSender<T> {
133 #[allow(clippy::unused_async)] pub async fn send(&self, msg: T) -> Result<(), UnboundedSendError<T>> {
140 #[cfg(async_channel_impl = "flume")]
141 let result = self.0.send_async(msg).await;
142 #[cfg(async_channel_impl = "tokio")]
143 let result = self.0.send(msg);
144 #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
145 let result = self.0.send(msg).await;
146 result
147 }
148}
149
150impl<T> UnboundedReceiver<T> {
151 pub async fn recv(&self) -> Result<T, UnboundedRecvError> {
159 #[cfg(async_channel_impl = "flume")]
160 let result = self.0.recv_async().await;
161 #[cfg(async_channel_impl = "tokio")]
162 let result = self.0.lock().await.recv().await.ok_or(UnboundedRecvError);
163 #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
164 let result = self.0.recv().await;
165 result
166 }
167 pub fn into_stream(self) -> UnboundedStream<T> {
169 #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
170 let result = self.0;
171 #[cfg(async_channel_impl = "tokio")]
172 let result = tokio_stream::wrappers::UnboundedReceiverStream::new(self.0.into_inner());
173 #[cfg(async_channel_impl = "flume")]
174 let result = self.0.into_stream();
175
176 UnboundedStream(result)
177 }
178 pub fn try_recv(&self) -> Result<T, UnboundedTryRecvError> {
184 #[cfg(async_channel_impl = "tokio")]
185 let result = self
186 .0
187 .try_lock()
188 .map_err(|_| UnboundedTryRecvError::Empty)?
189 .try_recv();
190
191 #[cfg(not(async_channel_impl = "tokio"))]
192 let result = self.0.try_recv();
193
194 result
195 }
196 pub async fn drain_at_least_one(&self) -> Result<Vec<T>, UnboundedRecvError> {
204 let first = self.recv().await?;
206 let mut ret = vec![first];
207 loop {
208 match self.try_recv() {
209 Ok(x) => ret.push(x),
210 Err(e) => {
211 if let Some(e) = try_recv_error_to_recv_error(e) {
212 tracing::error!(
213 "Tried to empty {:?} queue but it disconnected while we were emptying it ({} items are being dropped)",
214 std::any::type_name::<Self>(),
215 ret.len()
216 );
217 return Err(e);
218 }
219 break;
220 }
221 }
222 }
223 Ok(ret)
224 }
225 pub fn drain(&self) -> Result<Vec<T>, UnboundedRecvError> {
231 let mut result = Vec::new();
232 loop {
233 match self.try_recv() {
234 Ok(t) => result.push(t),
235 Err(e) => {
236 if let Some(e) = try_recv_error_to_recv_error(e) {
237 return Err(e);
238 }
239 break;
240 }
241 }
242 }
243 Ok(result)
244 }
245 #[allow(clippy::len_without_is_empty, clippy::unused_self)]
249 #[must_use]
250 pub fn len(&self) -> Option<usize> {
251 #[cfg(async_channel_impl = "tokio")]
252 let result = None;
253 #[cfg(not(all(async_channel_impl = "tokio")))]
254 let result = Some(self.0.len());
255 result
256 }
257}
258
259impl<T> Stream for UnboundedStream<T> {
260 type Item = T;
261
262 fn poll_next(
263 mut self: std::pin::Pin<&mut Self>,
264 cx: &mut std::task::Context<'_>,
265 ) -> std::task::Poll<Option<Self::Item>> {
266 #[cfg(async_channel_impl = "flume")]
267 return <flume::r#async::RecvStream<T>>::poll_next(Pin::new(&mut self.0), cx);
268 #[cfg(async_channel_impl = "tokio")]
269 return <tokio_stream::wrappers::UnboundedReceiverStream<T> as Stream>::poll_next(
270 Pin::new(&mut self.0),
271 cx,
272 );
273 #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
274 return <async_std::channel::Receiver<T> as Stream>::poll_next(Pin::new(&mut self.0), cx);
275 }
276}
277
278impl<T> Clone for UnboundedSender<T> {
280 fn clone(&self) -> Self {
281 Self(self.0.clone())
282 }
283}
284
285impl<T> std::fmt::Debug for UnboundedSender<T> {
287 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288 f.debug_struct("UnboundedSender").finish()
289 }
290}
291impl<T> std::fmt::Debug for UnboundedReceiver<T> {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 f.debug_struct("UnboundedReceiver").finish()
294 }
295}