#![allow(clippy::module_name_repetitions)]
use std::pin::Pin;
use futures::Stream;
#[cfg(async_channel_impl = "tokio")]
mod inner {
pub use tokio::sync::mpsc::error::{
SendError as UnboundedSendError, TryRecvError as UnboundedTryRecvError,
};
use tokio::sync::mpsc::{UnboundedReceiver as InnerReceiver, UnboundedSender as InnerSender};
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct UnboundedRecvError;
impl std::fmt::Display for UnboundedRecvError {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(fmt, stringify!(UnboundedRecvError))
}
}
impl std::error::Error for UnboundedRecvError {}
use tokio::sync::Mutex;
pub struct UnboundedSender<T>(pub(super) InnerSender<T>);
pub struct UnboundedReceiver<T>(pub(super) Mutex<InnerReceiver<T>>);
pub struct UnboundedStream<T>(pub(super) tokio_stream::wrappers::UnboundedReceiverStream<T>);
pub(super) fn try_recv_error_to_recv_error(
e: UnboundedTryRecvError,
) -> Option<UnboundedRecvError> {
match e {
UnboundedTryRecvError::Empty => None,
UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError),
}
}
#[must_use]
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let receiver = Mutex::new(receiver);
(UnboundedSender(sender), UnboundedReceiver(receiver))
}
}
#[cfg(async_channel_impl = "flume")]
mod inner {
use flume::{r#async::RecvStream, Receiver, Sender};
pub use flume::{
RecvError as UnboundedRecvError, SendError as UnboundedSendError,
TryRecvError as UnboundedTryRecvError,
};
pub struct UnboundedSender<T>(pub(super) Sender<T>);
pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
pub struct UnboundedStream<T: 'static>(pub(super) RecvStream<'static, T>);
pub(super) fn try_recv_error_to_recv_error(
e: UnboundedTryRecvError,
) -> Option<UnboundedRecvError> {
match e {
UnboundedTryRecvError::Empty => None,
UnboundedTryRecvError::Disconnected => Some(UnboundedRecvError::Disconnected),
}
}
#[must_use]
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (sender, receiver) = flume::unbounded();
(UnboundedSender(sender), UnboundedReceiver(receiver))
}
}
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
mod inner {
use async_std::channel::{Receiver, Sender};
pub use async_std::channel::{
RecvError as UnboundedRecvError, SendError as UnboundedSendError,
TryRecvError as UnboundedTryRecvError,
};
pub struct UnboundedSender<T>(pub(super) Sender<T>);
pub struct UnboundedReceiver<T>(pub(super) Receiver<T>);
pub struct UnboundedStream<T>(pub(super) Receiver<T>);
pub(super) fn try_recv_error_to_recv_error(
e: UnboundedTryRecvError,
) -> Option<UnboundedRecvError> {
match e {
UnboundedTryRecvError::Empty => None,
UnboundedTryRecvError::Closed => Some(UnboundedRecvError),
}
}
#[must_use]
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (sender, receiver) = async_std::channel::unbounded();
(UnboundedSender(sender), UnboundedReceiver(receiver))
}
}
pub use inner::*;
impl<T> UnboundedSender<T> {
#[allow(clippy::unused_async)] pub async fn send(&self, msg: T) -> Result<(), UnboundedSendError<T>> {
#[cfg(async_channel_impl = "flume")]
let result = self.0.send_async(msg).await;
#[cfg(async_channel_impl = "tokio")]
let result = self.0.send(msg);
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
let result = self.0.send(msg).await;
result
}
}
impl<T> UnboundedReceiver<T> {
pub async fn recv(&self) -> Result<T, UnboundedRecvError> {
#[cfg(async_channel_impl = "flume")]
let result = self.0.recv_async().await;
#[cfg(async_channel_impl = "tokio")]
let result = self.0.lock().await.recv().await.ok_or(UnboundedRecvError);
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
let result = self.0.recv().await;
result
}
pub fn into_stream(self) -> UnboundedStream<T> {
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
let result = self.0;
#[cfg(async_channel_impl = "tokio")]
let result = tokio_stream::wrappers::UnboundedReceiverStream::new(self.0.into_inner());
#[cfg(async_channel_impl = "flume")]
let result = self.0.into_stream();
UnboundedStream(result)
}
pub fn try_recv(&self) -> Result<T, UnboundedTryRecvError> {
#[cfg(async_channel_impl = "tokio")]
let result = self
.0
.try_lock()
.map_err(|_| UnboundedTryRecvError::Empty)?
.try_recv();
#[cfg(not(async_channel_impl = "tokio"))]
let result = self.0.try_recv();
result
}
pub async fn drain_at_least_one(&self) -> Result<Vec<T>, UnboundedRecvError> {
let first = self.recv().await?;
let mut ret = vec![first];
loop {
match self.try_recv() {
Ok(x) => ret.push(x),
Err(e) => {
if let Some(e) = try_recv_error_to_recv_error(e) {
tracing::error!(
"Tried to empty {:?} queue but it disconnected while we were emptying it ({} items are being dropped)",
std::any::type_name::<Self>(),
ret.len()
);
return Err(e);
}
break;
}
}
}
Ok(ret)
}
pub fn drain(&self) -> Result<Vec<T>, UnboundedRecvError> {
let mut result = Vec::new();
loop {
match self.try_recv() {
Ok(t) => result.push(t),
Err(e) => {
if let Some(e) = try_recv_error_to_recv_error(e) {
return Err(e);
}
break;
}
}
}
Ok(result)
}
#[allow(clippy::len_without_is_empty, clippy::unused_self)]
#[must_use]
pub fn len(&self) -> Option<usize> {
#[cfg(async_channel_impl = "tokio")]
let result = None;
#[cfg(not(all(async_channel_impl = "tokio")))]
let result = Some(self.0.len());
result
}
}
impl<T> Stream for UnboundedStream<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
#[cfg(async_channel_impl = "flume")]
return <flume::r#async::RecvStream<T>>::poll_next(Pin::new(&mut self.0), cx);
#[cfg(async_channel_impl = "tokio")]
return <tokio_stream::wrappers::UnboundedReceiverStream<T> as Stream>::poll_next(
Pin::new(&mut self.0),
cx,
);
#[cfg(not(any(async_channel_impl = "flume", async_channel_impl = "tokio")))]
return <async_std::channel::Receiver<T> as Stream>::poll_next(Pin::new(&mut self.0), cx);
}
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T> std::fmt::Debug for UnboundedSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnboundedSender").finish()
}
}
impl<T> std::fmt::Debug for UnboundedReceiver<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("UnboundedReceiver").finish()
}
}