async_compatibility_layer/channel/
unbounded.rs

1#![allow(clippy::module_name_repetitions)]
2
3use std::pin::Pin;
4
5use futures::Stream;
6
7/// inner module, used to group feature-specific imports
8#[cfg(async_channel_impl = "tokio")]
9mod inner {
10    pub use tokio::sync::mpsc::error::{
11        SendError as UnboundedSendError, TryRecvError as UnboundedTryRecvError,
12    };
13    use tokio::sync::mpsc::{UnboundedReceiver as InnerReceiver, UnboundedSender as InnerSender};
14
15    /// A receiver error returned from [`UnboundedReceiver`]'s `recv`
16    #[derive(Debug, PartialEq, Eq, Clone)]
17    pub struct UnboundedRecvError;
18
19    impl std::fmt::Display for UnboundedRecvError {
20        fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21            write!(fmt, stringify!(UnboundedRecvError))
22        }
23    }
24
25    impl std::error::Error for UnboundedRecvError {}
26
27    use tokio::sync::Mutex;
28
29    /// An unbounded sender, created with [`unbounded`]
30    pub struct UnboundedSender<T>(pub(super) InnerSender<T>);
31    /// An unbounded receiver, created with [`unbounded`]
32    pub struct UnboundedReceiver<T>(pub(super) Mutex<InnerReceiver<T>>);
33
34    /// An unbounded stream, created with a channel
35    pub struct UnboundedStream<T>(pub(super) tokio_stream::wrappers::UnboundedReceiverStream<T>);
36
37    /// Turn a `TryRecvError` into a `RecvError` if it's not `Empty`
38    pub(super) fn try_recv_error_to_recv_error(
39        e: UnboundedTryRecvError,
40    ) -> Option<UnboundedRecvError> {
41        match e {
42            UnboundedTryRecvError::Empty => None,
43            UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError),
44        }
45    }
46
47    /// Create an unbounded channel. This will dynamically allocate whenever the internal buffer is full and a new message is added.
48    ///
49    /// The names of the [`UnboundedSender`] and [`UnboundedReceiver`] are specifically chosen to be less ergonomic than the [`bounded`] channels. Please consider using a bounded channel instead for performance reasons.
50    #[must_use]
51    pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
52        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
53        let receiver = Mutex::new(receiver);
54        (UnboundedSender(sender), UnboundedReceiver(receiver))
55    }
56}
57
58/// inner module, used to group feature-specific imports
59#[cfg(async_channel_impl = "flume")]
60mod inner {
61    use flume::{r#async::RecvStream, Receiver, Sender};
62    pub use flume::{
63        RecvError as UnboundedRecvError, SendError as UnboundedSendError,
64        TryRecvError as UnboundedTryRecvError,
65    };
66
67    /// An unbounded sender, created with [`unbounded`]
68    pub struct UnboundedSender<T>(pub(super) Sender<T>);
69    /// An unbounded receiver, created with [`unbounded`]
70    pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
71    /// A bounded stream, created with a channel
72    pub struct UnboundedStream<T: 'static>(pub(super) RecvStream<'static, T>);
73
74    /// Turn a `TryRecvError` into a `RecvError` if it's not `Empty`
75    pub(super) fn try_recv_error_to_recv_error(
76        e: UnboundedTryRecvError,
77    ) -> Option<UnboundedRecvError> {
78        match e {
79            UnboundedTryRecvError::Empty => None,
80            UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError::Disconnected),
81        }
82    }
83
84    /// Create an unbounded channel. This will dynamically allocate whenever the internal buffer is full and a new message is added.
85    ///
86    /// The names of the [`UnboundedSender`] and [`UnboundedReceiver`] are specifically chosen to be less ergonomic than the [`bounded`] channels. Please consider using a bounded channel instead for performance reasons.
87    #[must_use]
88    pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
89        let (sender, receiver) = flume::unbounded();
90        (UnboundedSender(sender), UnboundedReceiver(receiver))
91    }
92}
93
94/// inner module, used to group feature-specific imports
95#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
96mod inner {
97    use async_std::channel::{Receiver, Sender};
98    pub use async_std::channel::{
99        RecvError as UnboundedRecvError, SendError as UnboundedSendError,
100        TryRecvError as UnboundedTryRecvError,
101    };
102
103    /// An unbounded sender, created with [`unbounded`]
104    pub struct UnboundedSender<T>(pub(super) Sender<T>);
105    /// An unbounded receiver, created with [`unbounded`]
106    pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
107    /// An unbounded stream, created with a channel
108    pub struct UnboundedStream<T>(pub(super) Receiver<T>);
109
110    /// Turn a `TryRecvError` into a `RecvError` if it's not `Empty`
111    pub(super) fn try_recv_error_to_recv_error(
112        e: UnboundedTryRecvError,
113    ) -> Option<UnboundedRecvError> {
114        match e {
115            UnboundedTryRecvError::Empty => None,
116            UnboundedTryRecvError::Closed => Some(UnboundedRecvError),
117        }
118    }
119
120    /// Create an unbounded channel. This will dynamically allocate whenever the internal buffer is full and a new message is added.
121    ///
122    /// The names of the [`UnboundedSender`] and [`UnboundedReceiver`] are specifically chosen to be less ergonomic than the [`bounded`] channels. Please consider using a bounded channel instead for performance reasons.
123    #[must_use]
124    pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
125        let (sender, receiver) = async_std::channel::unbounded();
126        (UnboundedSender(sender), UnboundedReceiver(receiver))
127    }
128}
129
130pub use inner::*;
131
132impl<T> UnboundedSender<T> {
133    /// Send a value to the sender half of this channel.
134    ///
135    /// # Errors
136    ///
137    /// This may fail if the receiver is dropped.
138    #[allow(clippy::unused_async)] // under tokio this function is actually sync
139    pub async fn send(&self, msg: T) -> Result<(), UnboundedSendError<T>> {
140        #[cfg(async_channel_impl = "flume")]
141        let result = self.0.send_async(msg).await;
142        #[cfg(async_channel_impl = "tokio")]
143        let result = self.0.send(msg);
144        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
145        let result = self.0.send(msg).await;
146        result
147    }
148}
149
150impl<T> UnboundedReceiver<T> {
151    /// Receive a value from the receiver half of this channel.
152    ///
153    /// Will block until a value is received
154    ///
155    /// # Errors
156    ///
157    /// Will produce an error if all senders are dropped
158    pub async fn recv(&self) -> Result<T, UnboundedRecvError> {
159        #[cfg(async_channel_impl = "flume")]
160        let result = self.0.recv_async().await;
161        #[cfg(async_channel_impl = "tokio")]
162        let result = self.0.lock().await.recv().await.ok_or(UnboundedRecvError);
163        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
164        let result = self.0.recv().await;
165        result
166    }
167    /// Turn this receiver into a stream.
168    pub fn into_stream(self) -> UnboundedStream<T> {
169        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
170        let result = self.0;
171        #[cfg(async_channel_impl = "tokio")]
172        let result = tokio_stream::wrappers::UnboundedReceiverStream::new(self.0.into_inner());
173        #[cfg(async_channel_impl = "flume")]
174        let result = self.0.into_stream();
175
176        UnboundedStream(result)
177    }
178    /// Try to receive a value from the receiver.
179    ///
180    /// # Errors
181    ///
182    /// Will return an error if no value is currently queued. This function will not block.
183    pub fn try_recv(&self) -> Result<T, UnboundedTryRecvError> {
184        #[cfg(async_channel_impl = "tokio")]
185        let result = self
186            .0
187            .try_lock()
188            .map_err(|_| UnboundedTryRecvError::Empty)?
189            .try_recv();
190
191        #[cfg(not(async_channel_impl = "tokio"))]
192        let result = self.0.try_recv();
193
194        result
195    }
196    /// Asynchronously wait for at least 1 value to show up, then will greedily try to receive values until this receiver would block. The resulting values are returned.
197    ///
198    /// It is guaranteed that the returning vec contains at least 1 value
199    ///
200    /// # Errors
201    ///
202    /// Will return an error if there was an error retrieving the first value.
203    pub async fn drain_at_least_one(&self) -> Result<Vec<T>, UnboundedRecvError> {
204        // Wait for the first message to come up
205        let first = self.recv().await?;
206        let mut ret = vec![first];
207        loop {
208            match self.try_recv() {
209                Ok(x) => ret.push(x),
210                Err(e) => {
211                    if let Some(e) = try_recv_error_to_recv_error(e) {
212                        tracing::error!(
213                            "Tried to empty {:?} queue but it disconnected while we were emptying it ({} items are being dropped)",
214                            std::any::type_name::<Self>(),
215                            ret.len()
216                        );
217                        return Err(e);
218                    }
219                    break;
220                }
221            }
222        }
223        Ok(ret)
224    }
225    /// Drains the receiver from all messages in the queue, but will not poll for more messages
226    ///
227    /// # Errors
228    ///
229    /// Will return an error if all the senders get dropped before this ends.
230    pub fn drain(&self) -> Result<Vec<T>, UnboundedRecvError> {
231        let mut result = Vec::new();
232        loop {
233            match self.try_recv() {
234                Ok(t) => result.push(t),
235                Err(e) => {
236                    if let Some(e) = try_recv_error_to_recv_error(e) {
237                        return Err(e);
238                    }
239                    break;
240                }
241            }
242        }
243        Ok(result)
244    }
245    /// Attempt to load the length of the messages in queue.
246    ///
247    /// On some implementations this value does not exist, and this will return `None`.
248    #[allow(clippy::len_without_is_empty, clippy::unused_self)]
249    #[must_use]
250    pub fn len(&self) -> Option<usize> {
251        #[cfg(async_channel_impl = "tokio")]
252        let result = None;
253        #[cfg(not(all(async_channel_impl = "tokio")))]
254        let result = Some(self.0.len());
255        result
256    }
257}
258
259impl<T> Stream for UnboundedStream<T> {
260    type Item = T;
261
262    fn poll_next(
263        mut self: std::pin::Pin<&mut Self>,
264        cx: &mut std::task::Context<'_>,
265    ) -> std::task::Poll<Option<Self::Item>> {
266        #[cfg(async_channel_impl = "flume")]
267        return <flume::r#async::RecvStream<T>>::poll_next(Pin::new(&mut self.0), cx);
268        #[cfg(async_channel_impl = "tokio")]
269        return <tokio_stream::wrappers::UnboundedReceiverStream<T> as Stream>::poll_next(
270            Pin::new(&mut self.0),
271            cx,
272        );
273        #[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
274        return <async_std::channel::Receiver<T> as Stream>::poll_next(Pin::new(&mut self.0), cx);
275    }
276}
277
278// Clone impl
279impl<T> Clone for UnboundedSender<T> {
280    fn clone(&self) -> Self {
281        Self(self.0.clone())
282    }
283}
284
285// Debug impl
286impl<T> std::fmt::Debug for UnboundedSender<T> {
287    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
288        f.debug_struct("UnboundedSender").finish()
289    }
290}
291impl<T> std::fmt::Debug for UnboundedReceiver<T> {
292    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293        f.debug_struct("UnboundedReceiver").finish()
294    }
295}