#![forbid(unsafe_code)]
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::any::type_name;
use std::cell::Cell;
use std::fmt::{Debug, Formatter};
use std::sync::mpsc::{RecvError, SendError, TryRecvError, TrySendError};
use std::sync::{Arc, Mutex};
use std::task::Waker;
/// Waker state shared by every sender and the receiver of one channel.
pub struct Inner {
/// Wakers of pending `SendFut`s that found the buffer full. The receiver drains and wakes
/// them after it takes a value, and from its `Drop` impl.
sender_wakers: Vec<Waker>,
/// Waker stored by `Receiver::poll` when the channel is empty. Senders take and wake it
/// after a successful send, and from their `Drop` impls.
receiver_waker: Option<Waker>,
}
/// The sending half of a [`oneshot`] channel.
///
/// It can send at most one value because [`OneSender::send`] consumes it.
pub struct OneSender<T: Send> {
// Wrapped in `Option` so that `send` and `drop` can move the sender out with `take()`.
std_sender: Option<std::sync::mpsc::SyncSender<T>>,
// Waker state shared with the `Receiver`.
inner: Arc<Mutex<Inner>>,
}
impl<T: Send> OneSender<T> {
    /// Stores `value` in the channel buffer, consuming this sender.
    ///
    /// A successful send does not guarantee delivery: the receiver may be dropped
    /// before it reads the value.
    ///
    /// # Errors
    /// Returns `SendError` carrying the value back when the receiver is already dropped.
    #[allow(clippy::missing_panics_doc)]
    pub fn send(mut self, value: T) -> Result<(), SendError<T>> {
        // Only `drop` also empties this slot, and it cannot run before this call.
        let std_sender = self.std_sender.take().unwrap();
        let outcome = std_sender.send(value);
        // Disconnect the std channel before `self` is dropped.
        drop(std_sender);
        outcome
        // `self` is dropped on return; its `Drop` impl wakes any waiting receiver.
    }
}
impl<T: Send> Drop for OneSender<T> {
    /// Disconnects the underlying std sender and wakes the receiver's stored waker, if any.
    fn drop(&mut self) {
        let pending_waker = {
            let mut state = self.inner.lock().unwrap();
            // Drop the std sender (if `send` has not already taken it) while holding the lock.
            drop(self.std_sender.take());
            state.receiver_waker.take()
        };
        // The lock is released by now, so the waker runs without holding it.
        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }
}
impl<T: Send> Debug for OneSender<T> {
    /// Writes `OneSender<T>`, with `T` replaced by the payload type's name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let payload_type = type_name::<T>();
        f.write_fmt(format_args!("OneSender<{}>", payload_type))
    }
}
/// Future that tries to push one value into the channel each time it is polled.
///
/// `SyncSender::async_send` creates and awaits it.
pub struct SendFut<T: Send> {
// Clone of the channel's std sender, used for the non-blocking `try_send` in `poll`.
std_sender: std::sync::mpsc::SyncSender<T>,
// Shared waker state; `poll` registers the task's waker here when the buffer is full.
inner: Arc<Mutex<Inner>>,
// The value to send. `poll` takes it out and puts it back when the buffer is full.
value: Cell<Option<T>>,
}
impl<T: Send> Future for SendFut<T> {
    type Output = Result<(), SendError<T>>;

    /// Attempts a non-blocking send of the stored value.
    ///
    /// - Returns `Ready(Ok(()))` when the value was placed in the buffer.
    /// - Returns `Ready(Err(SendError(value)))` when the receiver has disconnected.
    /// - Returns `Pending` when the buffer is full. The value is kept for the next poll and
    ///   the task's waker is registered so the receiver can wake this task.
    ///
    /// # Panics
    /// Panics when polled again after returning `Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The enclosing Rust-generated `async_send` future prevents re-polling after
        // completion (it panics at '`async fn` resumed after completion'),
        // so the value is always present here.
        let value = self
            .value
            .take()
            .expect("SendFut polled after completion");
        // Hold the lock across `try_send` and the waker registration so that a receiver
        // that frees a slot in between cannot miss this task's waker.
        let mut inner_guard = self.inner.lock().unwrap();
        match self.std_sender.try_send(value) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(SendError(value))),
            Err(TrySendError::Full(value)) => {
                self.value.set(Some(value));
                // Register the waker only once. A pending future may be polled many times
                // before the receiver drains the list, and pushing a clone every time would
                // let the list grow without bound.
                let waker = cx.waker();
                let already_registered = inner_guard
                    .sender_wakers
                    .iter()
                    .any(|registered| registered.will_wake(waker));
                if !already_registered {
                    inner_guard.sender_wakers.push(waker.clone());
                }
                Poll::Pending
            }
        }
    }
}
impl<T: Send> PartialEq for OneSender<T> {
    /// Two senders are equal only when they share the same channel state.
    ///
    /// Each channel has exactly one `OneSender`, so in practice a sender equals only itself.
    /// Comparing by identity (rather than always returning `false`) keeps equality
    /// reflexive, which the `Eq` impl below requires.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
impl<T: Send> Eq for OneSender<T> {}
/// The sending half of a channel created by [`sync_channel`].
///
/// Clone it to get additional senders for the same channel.
pub struct SyncSender<T: Send> {
    // Wrapped in `Option` so that `drop` can move the sender out with `take()`.
    std_sender: Option<std::sync::mpsc::SyncSender<T>>,
    // Waker state shared with the `Receiver` and all other senders.
    inner: Arc<Mutex<Inner>>,
}

// Implemented by hand because `#[derive(Clone)]` would add a `T: Clone` bound.
// Cloning a sender never clones a `T`, so senders of non-`Clone` payloads must be cloneable too.
impl<T: Send> Clone for SyncSender<T> {
    /// Returns another sender for the same channel, sharing the same waker state.
    fn clone(&self) -> Self {
        Self {
            std_sender: self.std_sender.clone(),
            inner: Arc::clone(&self.inner),
        }
    }
}
// No `T: Clone` bound: the method clones only the std sender handle and the shared `Arc`,
// never the value itself.
impl<T: Send> SyncSender<T> {
    /// Sends a value on this channel, waiting asynchronously while the buffer is full.
    ///
    /// Unlike [`send`](SyncSender::send), this method does not block the thread: while the
    /// buffer is full the returned future is pending, and the receiver wakes it after
    /// taking a value or being dropped.
    /// After a successful send it wakes the receiver's task, if one is waiting.
    ///
    /// Note that a successful send does not guarantee that the receiver will ever see the data.
    /// Items may be enqueued in the internal buffer for the receiver to receive at a later time.
    ///
    /// # Errors
    /// Returns `SendError` carrying the value back when the receiver has disconnected
    /// and is no longer able to receive information.
    #[allow(clippy::missing_panics_doc)]
    pub async fn async_send(&self, value: T) -> Result<(), SendError<T>> {
        // `std_sender` is only emptied by `drop`, so it is always present here.
        let send_fut = SendFut {
            std_sender: self.std_sender.as_ref().unwrap().clone(),
            inner: self.inner.clone(),
            value: Cell::new(Some(value)),
        };
        self.wake_receiver_if_ok(send_fut.await)
    }
}
impl<T: Send> SyncSender<T> {
    /// Takes the receiver's stored waker, if any, and wakes it.
    fn wake_receiver(&self) {
        // Scope the guard so the lock is released before the waker runs.
        let pending_waker = {
            let mut state = self.inner.lock().unwrap();
            state.receiver_waker.take()
        };
        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }

    /// Wakes the receiver when `result` is `Ok`, then hands `result` back unchanged.
    fn wake_receiver_if_ok<E>(&self, result: Result<(), E>) -> Result<(), E> {
        match result {
            Ok(()) => {
                self.wake_receiver();
                Ok(())
            }
            Err(error) => Err(error),
        }
    }

    /// Sends a value on this synchronous channel, blocking the current thread while the
    /// internal buffer is full.
    ///
    /// After a successful send it wakes the receiver's task, if one is waiting.
    ///
    /// A successful send does not guarantee that the receiver will ever see the data:
    /// the value may sit in the internal buffer until the receiver takes it later.
    ///
    /// # Errors
    /// Returns `SendError` carrying the value back when the receiver has disconnected
    /// and is no longer able to receive information.
    #[allow(clippy::missing_panics_doc)]
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        // `std_sender` is only emptied by `drop`, so it is always present here.
        let std_sender = self.std_sender.as_ref().unwrap();
        let outcome = std_sender.send(value);
        self.wake_receiver_if_ok(outcome)
    }

    /// Attempts to send a value on this channel without blocking.
    ///
    /// After a successful send it wakes the receiver's task, if one is waiting.
    ///
    /// # Errors
    /// Returns [`TrySendError::Full`] when the channel's buffer is full.
    ///
    /// Returns [`TrySendError::Disconnected`] when the channel's receiver has been dropped.
    ///
    /// [`TrySendError::Full`]: std::sync::mpsc::TrySendError
    /// [`TrySendError::Disconnected`]: std::sync::mpsc::TrySendError
    #[allow(clippy::missing_panics_doc)]
    pub fn try_send(&self, value: T) -> Result<(), std::sync::mpsc::TrySendError<T>> {
        let std_sender = self.std_sender.as_ref().unwrap();
        let outcome = std_sender.try_send(value);
        self.wake_receiver_if_ok(outcome)
    }
}
impl<T: Send> Drop for SyncSender<T> {
    /// Disconnects this sender's std handle and, when no other sender can be left,
    /// wakes the receiver's stored waker.
    fn drop(&mut self) {
        let mut state = self.inner.lock().unwrap();
        drop(self.std_sender.take());
        if Arc::strong_count(&self.inner) >= 3 {
            // Other holders of the shared state remain besides this sender and one more,
            // so leave the receiver's waker in place. The guard unlocks on return.
            return;
        }
        // Either the receiver is already dropped or we are the last sender.
        // Either way, it's safe to wake any receiver.
        let pending_waker = state.receiver_waker.take();
        // Unlock before running the waker.
        drop(state);
        if let Some(waker) = pending_waker {
            waker.wake();
        }
    }
}
impl<T: Send> Debug for SyncSender<T> {
    /// Writes `SyncSender<T>`, with `T` replaced by the payload type's name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let payload_type = type_name::<T>();
        f.write_fmt(format_args!("SyncSender<{}>", payload_type))
    }
}
impl<T: Send> PartialEq for SyncSender<T> {
    /// Two senders are equal when they point at the same shared channel state.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.inner, &other.inner);
        Arc::ptr_eq(mine, theirs)
    }
}
impl<T: Send> Eq for SyncSender<T> {}
/// The receiving half of a channel. Only one thread can own it at a time.
///
/// Ways to receive messages sent to the channel:
/// - await this struct (it implements `Future`)
/// - call [`async_recv`]
/// - call one of the blocking receive methods.
///
/// [`async_recv`]: Receiver::async_recv
pub struct Receiver<T: Send> {
    // Wrapped in `Option` so that `drop` can move the receiver out with `take()`.
    std_receiver: Option<std::sync::mpsc::Receiver<T>>,
    // Waker state shared with every sender of this channel.
    inner: Arc<Mutex<Inner>>,
}
impl<T: Send> Receiver<T> {
    /// Drains the registered sender wakers and wakes each one.
    fn wake_senders(&self) {
        // The guard is a temporary of this statement, so the lock is released
        // before any waker runs.
        let wakers: Vec<Waker> = std::mem::take(&mut self.inner.lock().unwrap().sender_wakers);
        for waker in wakers {
            waker.wake();
        }
    }

    /// Wakes the pending senders when `result` is `Ok`, then returns `result` unchanged.
    fn wake_senders_if_ok<E>(&self, result: Result<T, E>) -> Result<T, E> {
        if result.is_ok() {
            self.wake_senders();
        }
        result
    }

    /// Gets the next value from the channel.
    /// When the channel is empty,
    /// waits for a sender to add a value to it,
    /// then returns that value.
    ///
    /// If all corresponding senders have disconnected,
    /// or disconnect while this call is waiting,
    /// this call will wake up and return Err to indicate that no more messages can ever be
    /// received on this channel. However, since channels are buffered, messages sent before the
    /// disconnect will still be properly received.
    ///
    /// This struct implements `Future`. This method just returns `self.await`.
    ///
    /// # Errors
    /// Returns `RecvError` when the channel is empty and all channel senders have disconnected.
    pub async fn async_recv(&mut self) -> Result<T, std::sync::mpsc::RecvError> {
        self.await
    }

    /// Attempts to return a pending value on this receiver without blocking.
    ///
    /// This method will never block the caller in order to wait for data to become available.
    /// Instead, this will always return immediately with a possible option of pending data on the
    /// channel. After taking a value it wakes any senders waiting for buffer space.
    ///
    /// This is useful for a flavor of “optimistic check” before deciding to block on a receiver.
    ///
    /// # Errors
    /// Compared with `recv`, this function has two failure cases instead of one:
    /// `TryRecvError::Empty` when the buffer is empty and
    /// `TryRecvError::Disconnected` when the channel has hung up.
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn try_recv(&self) -> Result<T, std::sync::mpsc::TryRecvError> {
        // `std_receiver` is only emptied by `drop`, so it is always present here.
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().try_recv())
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up.
    ///
    /// This function will always block the current thread if there is no data available
    /// and it’s possible for more data to be sent (at least one sender still exists).
    /// Once a message is sent to the corresponding sender,
    /// this receiver will wake up and return that message.
    /// After taking a value it wakes any senders waiting for buffer space.
    ///
    /// If the corresponding sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be
    /// received on this channel. However, since channels are buffered, messages sent before the
    /// disconnect will still be properly received.
    ///
    /// # Errors
    /// Returns `RecvError` when the channel is empty and has hung up.
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn recv(&self) -> Result<T, std::sync::mpsc::RecvError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv())
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up, or if it waits more than timeout.
    ///
    /// This function will always block the current thread if there is no data available and it’s
    /// possible for more data to be sent (at least one sender still exists). Once a message is
    /// sent to the corresponding sender, this receiver will wake up and return that message.
    /// After taking a value it wakes any senders waiting for buffer space.
    ///
    /// If the corresponding sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be received
    /// on this channel. However, since channels are buffered, messages sent before the disconnect
    /// will still be properly received.
    ///
    /// # Errors
    /// Returns `RecvTimeoutError::Timeout` when no value arrived within `timeout` and
    /// `RecvTimeoutError::Disconnected` when the channel has hung up.
    ///
    /// # Known Issues
    /// There is currently a known issue in the inner `std::sync::mpsc::Receiver`
    /// that can cause `recv_timeout` to panic unexpectedly. See the explanation at
    /// [`std::sync::mpsc::Receiver::recv_timeout`](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_timeout).
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn recv_timeout(
        &self,
        timeout: core::time::Duration,
    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv_timeout(timeout))
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up, or if deadline is reached.
    ///
    /// This function will always block the current thread if there is no data available and it’s
    /// possible for more data to be sent. Once a message is sent to the corresponding sender,
    /// then this receiver will wake up and return that message.
    ///
    /// If the corresponding Sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be received
    /// on this channel. However, since channels are buffered, messages sent before the disconnect
    /// will still be properly received.
    ///
    /// Only compiled when the crate is built with `--cfg unstable`.
    // NOTE(review): the inner `std::sync::mpsc::Receiver::recv_deadline` is a nightly-only API;
    // building with `--cfg unstable` presumably also needs the crate root to enable the
    // matching feature gate — confirm against the crate's build configuration.
    #[cfg(unstable)]
    #[allow(clippy::missing_errors_doc)]
    pub fn recv_deadline(
        &self,
        deadline: std::time::Instant,
    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv_deadline(deadline))
    }

    /// Returns an iterator that will block waiting for messages, but never panic.
    /// It will return `None` when the channel has hung up.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { rx: self }
    }

    /// Returns an iterator that will attempt to yield all pending values.
    /// It will return `None` if there are no more pending values or if the channel has hung up.
    /// The iterator will never panic or block the user by waiting for values.
    pub fn try_iter(&self) -> TryIter<'_, T> {
        TryIter { rx: self }
    }
}
impl<T: Send> Drop for Receiver<T> {
    /// Disconnects the std receiver, discards the receiver's own stored waker and wakes
    /// every sender that was waiting for buffer space.
    fn drop(&mut self) {
        let (stale_receiver_waker, blocked_senders) = {
            let mut state = self.inner.lock().unwrap();
            // Drop the std receiver while holding the lock.
            drop(self.std_receiver.take());
            let own_waker = state.receiver_waker.take();
            let sender_wakers: Vec<Waker> = std::mem::take(&mut state.sender_wakers);
            (own_waker, sender_wakers)
        };
        // The lock is released by now; wakers are dropped and woken without holding it.
        drop(stale_receiver_waker);
        blocked_senders.into_iter().for_each(Waker::wake);
    }
}
#[doc(hidden)]
impl<T: Send> Future for Receiver<T> {
    type Output = Result<T, std::sync::mpsc::RecvError>;

    /// Tries to take a value without blocking.
    ///
    /// - A buffered value yields `Ready(Ok(value))` after waking the waiting senders.
    /// - A disconnected channel yields `Ready(Err(RecvError))`.
    /// - An empty channel stores the task's waker and yields `Pending`, unless no sender
    ///   shares the channel state any more, in which case it yields `Ready(Err(RecvError))`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The lock is held across `try_recv` and the waker registration so that a sender
        // finishing in between cannot miss the waker.
        let mut state = self.inner.lock().unwrap();
        let std_receiver = self.std_receiver.as_ref().unwrap();
        match std_receiver.try_recv() {
            Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError)),
            Ok(value) => {
                // `wake_senders` takes the lock itself, so release it first.
                drop(state);
                self.wake_senders();
                return Poll::Ready(Ok(value));
            }
            Err(TryRecvError::Empty) => {}
        }
        let waker = cx.waker().clone();
        if Arc::strong_count(&self.inner) >= 2 {
            let previous_waker = state.receiver_waker.replace(waker);
            // Unlock before dropping the replaced waker.
            drop(state);
            drop(previous_waker);
            return Poll::Pending;
        }
        // Last sender dropped.
        Poll::Ready(Err(RecvError))
    }
}
impl<T: Send> Debug for Receiver<T> {
    /// Writes `Receiver<T>`, with `T` replaced by the payload type's name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let payload_type = type_name::<T>();
        f.write_fmt(format_args!("Receiver<{}>", payload_type))
    }
}
impl<T: Send> PartialEq for Receiver<T> {
    /// Two receivers are equal only when they share the same channel state.
    ///
    /// Each channel has exactly one `Receiver`, so in practice a receiver equals only itself.
    /// Comparing by identity (rather than always returning `false`) keeps equality
    /// reflexive, which the `Eq` impl below requires.
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
impl<T: Send> Eq for Receiver<T> {}
impl<'a, T: Send> IntoIterator for &'a Receiver<T> {
type Item = T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Iter<'a, T> {
self.iter()
}
}
/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
/// This iterator will block whenever [`next`] is called,
/// waiting for a new message, and [`None`] will be returned
/// when the corresponding channel has hung up.
///
/// [`iter`]: Receiver::iter
/// [`next`]: Iterator::next
#[derive(Debug)]
pub struct Iter<'a, T: 'a + Send> {
// The borrowed receiver; `next` calls its blocking `recv`.
rx: &'a Receiver<T>,
}
impl<'a, T: Send> Iterator for Iter<'a, T> {
    type Item = T;

    /// Blocks for the next message; yields `None` once the receive fails.
    fn next(&mut self) -> Option<T> {
        match self.rx.recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
/// An owning iterator over messages on a [`Receiver`],
/// created by [`into_iter`].
///
/// This iterator will block whenever [`next`]
/// is called, waiting for a new message, and [`None`] will be
/// returned if the corresponding channel has hung up.
///
/// [`into_iter`]: Receiver::into_iter
/// [`next`]: Iterator::next
#[derive(Debug)]
pub struct IntoIter<T: Send> {
// The owned receiver; `next` calls its blocking `recv`.
rx: Receiver<T>,
}
impl<T: Send> Iterator for IntoIter<T> {
    type Item = T;

    /// Blocks for the next message; yields `None` once the receive fails.
    fn next(&mut self) -> Option<T> {
        match self.rx.recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
impl<T: Send> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    /// Moves the receiver into an owning, blocking iterator.
    fn into_iter(self) -> Self::IntoIter {
        let rx = self;
        IntoIter { rx }
    }
}
/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
/// This iterator will never block the caller in order to wait for data to
/// become available. Instead, it will return [`None`].
///
/// [`try_iter`]: Receiver::try_iter
#[derive(Debug)]
pub struct TryIter<'a, T: 'a + Send> {
// The borrowed receiver; `next` calls its non-blocking `try_recv`.
rx: &'a Receiver<T>,
}
impl<'a, T: Send> Iterator for TryIter<'a, T> {
    type Item = T;

    /// Takes the next pending message without blocking; yields `None` when the receive fails.
    fn next(&mut self) -> Option<T> {
        match self.rx.try_recv() {
            Ok(message) => Some(message),
            Err(_) => None,
        }
    }
}
/// Creates a channel for sending exactly one value.
///
/// The returned `OneSender` is consumed by its `send` method.
/// Use the returned `Receiver` to get the value.
#[must_use]
pub fn oneshot<T>() -> (OneSender<T>, Receiver<T>)
where
    T: Send,
{
    // Waker state shared by both halves.
    let shared = Arc::new(Mutex::new(Inner {
        sender_wakers: Vec::new(),
        receiver_waker: None,
    }));
    // A buffer of one slot holds the single value.
    let (std_sender, std_receiver) = std::sync::mpsc::sync_channel(1);
    let sender = OneSender {
        std_sender: Some(std_sender),
        inner: Arc::clone(&shared),
    };
    let receiver = Receiver {
        std_receiver: Some(std_receiver),
        inner: shared,
    };
    (sender, receiver)
}
/// Creates a new synchronous, bounded channel.
/// Values sent on the [`SyncSender`] become available on the [`Receiver`]
/// in the order they were sent.
/// The [`Receiver`] will block until a message becomes available.
///
/// The channel queues messages in an internal buffer of `bound` slots.
/// When the buffer is full, further sends wait for a slot to open up.
///
/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
/// times, but only one [`Receiver`] is supported.
///
/// If the [`Receiver`] is disconnected while trying to [`send`] with the
/// [`SyncSender`], the [`send`] method will return a [`SendError`].
/// Similarly, if the [`SyncSender`] is disconnected while trying
/// to [`recv`], the [`recv`] method will return a [`RecvError`].
///
/// [`send`]: SyncSender::send
/// [`recv`]: Receiver::recv
///
/// # Panics
/// Panics if `bound` is zero.
#[must_use]
#[allow(clippy::module_name_repetitions)]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)
where
    T: Send,
{
    assert!(bound > 0, "bound must be greater than zero");
    // Waker state shared by the receiver and every sender.
    let shared = Arc::new(Mutex::new(Inner {
        sender_wakers: Vec::new(),
        receiver_waker: None,
    }));
    let (std_sender, std_receiver) = std::sync::mpsc::sync_channel(bound);
    let sender = SyncSender {
        std_sender: Some(std_sender),
        inner: Arc::clone(&shared),
    };
    let receiver = Receiver {
        std_receiver: Some(std_receiver),
        inner: shared,
    };
    (sender, receiver)
}