safina-sync 0.2.4

Safe structs for sharing or sending data between async tasks
Documentation
#![forbid(unsafe_code)]

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use std::any::type_name;
use std::cell::Cell;
use std::fmt::{Debug, Formatter};
use std::sync::mpsc::{RecvError, SendError, TryRecvError, TrySendError};
use std::sync::{Arc, Mutex};
use std::task::Waker;

pub struct Inner {
    sender_wakers: Vec<Waker>,
    receiver_waker: Option<Waker>,
}

pub struct OneSender<T: Send> {
    std_sender: Option<std::sync::mpsc::SyncSender<T>>,
    inner: Arc<Mutex<Inner>>,
}
impl<T: Send> OneSender<T> {
    /// Saves the value in the channel buffer and consumes the sender.
    ///
    /// Note that the receiver may drop before reading the value.
    ///
    /// # Errors
    /// When the receiver is already dropped, returns `SendError` and the value.
    #[allow(clippy::missing_panics_doc)]
    pub fn send(mut self, value: T) -> Result<(), SendError<T>> {
        self.std_sender.take().unwrap().send(value)
        // This method consumes self.  When self drops, it wakes any receiver.
    }
}
impl<T: Send> Drop for OneSender<T> {
    fn drop(&mut self) {
        let mut inner_guard = self.inner.lock().unwrap();
        self.std_sender.take();
        let opt_waker = inner_guard.receiver_waker.take();
        drop(inner_guard);
        if let Some(waker) = opt_waker {
            waker.wake();
        }
    }
}
impl<T: Send> Debug for OneSender<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "OneSender<{}>", type_name::<T>())
    }
}

pub struct SendFut<T: Send> {
    std_sender: std::sync::mpsc::SyncSender<T>,
    inner: Arc<Mutex<Inner>>,
    value: Cell<Option<T>>,
}
impl<T: Send> Future for SendFut<T> {
    type Output = Result<(), SendError<T>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // The enclosing Rust-generated `async_send future future prevents re-polling.
        // It panics at '`async fn` resumed after completion'.  So this can never panic:
        let value = self.value.take().take().unwrap();
        let mut inner_guard = self.inner.lock().unwrap();
        match self.std_sender.try_send(value) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(TrySendError::Disconnected(value)) => Poll::Ready(Err(SendError(value))),
            Err(TrySendError::Full(value)) => {
                self.value.set(Some(value));
                inner_guard.sender_wakers.push(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
impl<T: Send> PartialEq for OneSender<T> {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}
impl<T: Send> Eq for OneSender<T> {}

#[derive(Clone)]
pub struct SyncSender<T: Send> {
    std_sender: Option<std::sync::mpsc::SyncSender<T>>,
    inner: Arc<Mutex<Inner>>,
}
impl<T: Send + Clone> SyncSender<T> {
    /// Sends a value on this synchronous channel.
    ///
    /// This function will block until space in the internal buffer becomes available or a receiver
    /// is available to hand off the message to.
    ///
    /// Note that a successful send does not guarantee that the receiver will ever see the data.
    /// Items may be enqueued in the internal buffer for the receiver to receive at a later time.
    ///
    /// # Errors
    /// This function will never panic, but it may return Err if the Receiver has disconnected and
    /// is no longer able to receive information.
    #[allow(clippy::missing_panics_doc)]
    pub async fn async_send(&self, value: T) -> Result<(), SendError<T>> {
        self.wake_receiver_if_ok(
            SendFut {
                std_sender: self.std_sender.as_ref().unwrap().clone(),
                inner: self.inner.clone(),
                value: Cell::new(Some(value)),
            }
            .await,
        )
    }
}
impl<T: Send> SyncSender<T> {
    fn wake_receiver(&self) {
        let opt_waker = self.inner.lock().unwrap().receiver_waker.take();
        if let Some(waker) = opt_waker {
            waker.wake();
        }
    }

    fn wake_receiver_if_ok<E>(&self, result: Result<(), E>) -> Result<(), E> {
        if result.is_ok() {
            self.wake_receiver();
        }
        result
    }

    /// Sends a value on this synchronous channel.
    ///
    /// This function will block until space in the internal buffer becomes available or a receiver
    /// is available to hand off the message to.
    ///
    /// Note that a successful send does not guarantee that the receiver will ever see the data.
    /// Items may be enqueued in the internal buffer for the receiver to receive at a later time.
    ///
    /// # Errors
    /// This function will never panic, but it may return Err if the Receiver has disconnected and
    /// is no longer able to receive information.
    #[allow(clippy::missing_panics_doc)]
    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
        self.wake_receiver_if_ok(self.std_sender.as_ref().unwrap().send(value))
    }

    /// Attempts to send a value on this channel.  Returns immediately.
    ///
    /// # Errors
    /// Returns [`TrySendError::Full`] when the channel's buffer is full.
    ///
    /// Returns [`TrySendError::Disconnected`] when the channel's receiver has been dropped.
    ///
    /// [`TrySendError::Full`]: std::sync::mpsc::TrySendError
    /// [`TrySendError::Disconnected`]: std::sync::mpsc::TrySendError
    #[allow(clippy::missing_panics_doc)]
    pub fn try_send(&self, value: T) -> Result<(), std::sync::mpsc::TrySendError<T>> {
        self.wake_receiver_if_ok(self.std_sender.as_ref().unwrap().try_send(value))
    }
}
impl<T: Send> Drop for SyncSender<T> {
    fn drop(&mut self) {
        let mut inner_guard = self.inner.lock().unwrap();
        self.std_sender.take();
        if Arc::strong_count(&self.inner) < 3 {
            // Either the receiver is already dropped or we are the last sender.
            // Either way, it's safe to wake any receiver.
            let opt_waker = inner_guard.receiver_waker.take();
            drop(inner_guard);
            if let Some(waker) = opt_waker {
                waker.wake();
            }
        }
    }
}
impl<T: Send> Debug for SyncSender<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "SyncSender<{}>", type_name::<T>())
    }
}
impl<T: Send> PartialEq for SyncSender<T> {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.inner, &other.inner)
    }
}
impl<T: Send> Eq for SyncSender<T> {}

/// The receiving half of a channel.  This half can only be owned by one thread.
///
/// Ways to receive messages sent to the channel:
/// - await this struct (it implements `Future`)
/// - call [`async_recv`]
/// - call one of the blocking receive methods.
pub struct Receiver<T>
where
    T: Send,
{
    std_receiver: Option<std::sync::mpsc::Receiver<T>>,
    inner: Arc<Mutex<Inner>>,
}
impl<T: Send> Receiver<T> {
    fn wake_senders(&self) {
        let wakers: Vec<Waker> = std::mem::take(&mut self.inner.lock().unwrap().sender_wakers);
        for waker in wakers {
            waker.wake();
        }
    }

    fn wake_senders_if_ok<E>(&self, result: Result<T, E>) -> Result<T, E> {
        if result.is_ok() {
            self.wake_senders();
        }
        result
    }

    /// Gets the next value from the channel.
    /// When the channel is empty,
    /// waits for a sender to add a value to it,
    /// then returns that value.
    ///
    /// If all corresponding senders have disconnected,
    /// or disconnect while this call is waiting,
    /// this call will wake up and return Err to indicate that no more messages can ever be
    /// received on this channel.  However, since channels are buffered, messages sent before the
    /// disconnect will still be properly received.
    ///
    /// This struct implements `Future`.  This method just returns `self.await`.
    ///
    /// # Errors
    /// Returns `RecvError` when the channel is empty and all channel senders have disconnected.
    pub async fn async_recv(&mut self) -> Result<T, std::sync::mpsc::RecvError> {
        self.await
    }

    /// Attempts to return a pending value on this receiver without blocking.
    ///
    /// This method will never block the caller in order to wait for data to become available.
    /// Instead, this will always return immediately with a possible option of pending data on the
    /// channel.
    ///
    /// This is useful for a flavor of “optimistic check” before deciding to block on a receiver.
    ///
    /// Compared with `recv`, this function has two failure cases
    /// instead of one (one for disconnection, one for an empty buffer).
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn try_recv(&self) -> Result<T, std::sync::mpsc::TryRecvError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().try_recv())
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up.
    ///
    /// This function will always block the current thread if there is no data available
    /// and it’s possible for more data to be sent (at least one sender still exists).
    /// Once a message is sent to the corresponding sender,
    /// this receiver will wake up and return that message.
    ///
    /// If the corresponding sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be
    /// received on this channel.  However, since channels are buffered, messages sent before the
    /// disconnect will still be properly received.
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn recv(&self) -> Result<T, std::sync::mpsc::RecvError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv())
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up, or if it waits more than timeout.
    ///
    /// This function will always block the current thread if there is no data available and it’s
    /// possible for more data to be sent (at least one sender still exists).  Once a message is
    /// sent to the corresponding sender, this receiver will wake up and return that message.
    ///
    /// If the corresponding sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be received
    /// on this channel.  However, since channels are buffered, messages sent before the disconnect
    /// will still be properly received.
    ///
    /// # Known Issues
    /// There is currently a known issue in the inner `std::sync::mpsc::Receiver`
    /// that can cause `recv_timeout` to panic unexpectedly.  See the explanation at
    /// [`std::sync::mpsc::Receiver::recv_timeout`](https://doc.rust-lang.org/std/sync/mpsc/struct.Receiver.html#method.recv_timeout).
    #[allow(clippy::missing_errors_doc)]
    #[allow(clippy::missing_panics_doc)]
    pub fn recv_timeout(
        &self,
        timeout: core::time::Duration,
    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv_timeout(timeout))
    }

    /// Attempts to wait for a value on this receiver, returning an error if the corresponding
    /// channel has hung up, or if deadline is reached.
    ///
    /// This function will always block the current thread if there is no data available and it’s
    /// possible for more data to be sent.  Once a message is sent to the corresponding sender,
    /// then this receiver will wake up and return that message.
    ///
    /// If the corresponding Sender has disconnected, or it disconnects while this call is blocking,
    /// this call will wake up and return Err to indicate that no more messages can ever be received
    /// on this channel.  However, since channels are buffered, messages sent before the disconnect
    /// will still be properly received.
    #[cfg(unstble)]
    #[allow(clippy::missing_errors_doc)]
    pub fn recv_deadline(
        &self,
        deadline: std::time::Instant,
    ) -> Result<T, std::sync::mpsc::RecvTimeoutError> {
        self.wake_senders_if_ok(self.std_receiver.as_ref().unwrap().recv_deadline(deadline))
    }

    /// Returns an iterator that will block waiting for messages, but never panic.
    /// It will return `None` when the channel has hung up.
    pub fn iter(&self) -> Iter<'_, T> {
        Iter { rx: self }
    }

    /// Returns an iterator that will attempt to yield all pending values.
    /// It will return `None` if there are no more pending values or if the channel has hung up.
    /// The iterator will never panic or block the user by waiting for values.
    pub fn try_iter(&self) -> TryIter<'_, T> {
        TryIter { rx: self }
    }
}
impl<T: Send> Drop for Receiver<T> {
    fn drop(&mut self) {
        let mut inner_guard = self.inner.lock().unwrap();
        self.std_receiver.take();
        let receiver_waker = inner_guard.receiver_waker.take();
        let sender_wakers: Vec<Waker> = std::mem::take(&mut inner_guard.sender_wakers);
        drop(inner_guard);
        drop(receiver_waker);
        for waker in sender_wakers {
            waker.wake();
        }
    }
}
#[doc(hidden)]
impl<T: Send> Future for Receiver<T> {
    type Output = Result<T, std::sync::mpsc::RecvError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner_guard = self.inner.lock().unwrap();
        match self.std_receiver.as_ref().unwrap().try_recv() {
            Ok(value) => {
                drop(inner_guard);
                self.wake_senders();
                Poll::Ready(Ok(value))
            }
            Err(TryRecvError::Disconnected) => Poll::Ready(Err(RecvError)),
            Err(TryRecvError::Empty) => {
                let waker = cx.waker().clone();
                if Arc::strong_count(&self.inner) < 2 {
                    // Last sender dropped.
                    Poll::Ready(Err(RecvError))
                } else {
                    let opt_waker = inner_guard.receiver_waker.replace(waker);
                    drop(inner_guard);
                    drop(opt_waker);
                    Poll::Pending
                }
            }
        }
    }
}
impl<T: Send> Debug for Receiver<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Receiver<{}>", type_name::<T>())
    }
}
impl<T: Send> PartialEq for Receiver<T> {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}
impl<T: Send> Eq for Receiver<T> {}
impl<'a, T: Send> IntoIterator for &'a Receiver<T> {
    type Item = T;
    type IntoIter = Iter<'a, T>;

    fn into_iter(self) -> Iter<'a, T> {
        self.iter()
    }
}

/// An iterator over messages on a [`Receiver`], created by [`iter`].
///
/// This iterator will block whenever [`next`] is called,
/// waiting for a new message, and [`None`] will be returned
/// when the corresponding channel has hung up.
///
/// [`iter`]: Receiver::iter
/// [`next`]: Iterator::next
#[derive(Debug)]
pub struct Iter<'a, T: 'a + Send> {
    rx: &'a Receiver<T>,
}
impl<'a, T: Send> Iterator for Iter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}

/// An owning iterator over messages on a [`Receiver`],
/// created by [`into_iter`].
///
/// This iterator will block whenever [`next`]
/// is called, waiting for a new message, and [`None`] will be
/// returned if the corresponding channel has hung up.
///
/// [`into_iter`]: Receiver::into_iter
/// [`next`]: Iterator::next
#[derive(Debug)]
pub struct IntoIter<T: Send> {
    rx: Receiver<T>,
}
impl<T: Send> Iterator for IntoIter<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        self.rx.recv().ok()
    }
}
impl<T: Send> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    fn into_iter(self) -> IntoIter<T> {
        IntoIter { rx: self }
    }
}

/// An iterator that attempts to yield all pending values for a [`Receiver`],
/// created by [`try_iter`].
///
/// [`None`] will be returned when there are no pending values remaining or
/// if the corresponding channel has hung up.
///
/// This iterator will never block the caller in order to wait for data to
/// become available. Instead, it will return [`None`].
///
/// [`try_iter`]: Receiver::try_iter
#[derive(Debug)]
pub struct TryIter<'a, T: 'a + Send> {
    rx: &'a Receiver<T>,
}
impl<'a, T: Send> Iterator for TryIter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<T> {
        self.rx.try_recv().ok()
    }
}

/// Creates a channel that can be used to send a single value.
///
/// Use the returned `Receiver` to get the value.
#[must_use]
pub fn oneshot<T>() -> (OneSender<T>, Receiver<T>)
where
    T: Send,
{
    let (std_sender, std_receiver) = std::sync::mpsc::sync_channel(1);
    let inner = Arc::new(Mutex::new(Inner {
        sender_wakers: Vec::new(),
        receiver_waker: None,
    }));
    (
        OneSender {
            std_sender: Some(std_sender),
            inner: inner.clone(),
        },
        Receiver {
            std_receiver: Some(std_receiver),
            inner,
        },
    )
}

/// Creates a new synchronous, bounded channel.
/// All data sent on the [`SyncSender`] will become available on the [`Receiver`]
/// in the same order as it was sent.
/// The [`Receiver`] will block until a message becomes available.
///
/// This channel has an internal buffer on which messages will be queued.
/// `bound` specifies the buffer size. When the internal buffer becomes full,
/// future sends will wait for the buffer to open up.
///
/// The [`SyncSender`] can be cloned to [`send`] to the same channel multiple
/// times, but only one [`Receiver`] is supported.
///
/// Like asynchronous channels, if the [`Receiver`] is disconnected while trying
/// to [`send`] with the [`SyncSender`], the [`send`] method will return a
/// [`SendError`]. Similarly, If the [`SyncSender`] is disconnected while trying
/// to [`recv`], the [`recv`] method will return a [`RecvError`].
///
/// [`send`]: SyncSender::send
/// [`recv`]: Receiver::recv
///
/// # Panics
/// Panics if `bound` is zero.
#[must_use]
#[allow(clippy::module_name_repetitions)]
pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)
where
    T: Send,
{
    assert!(bound > 0, "bound must be greater than zero");
    let (std_sender, std_receiver) = std::sync::mpsc::sync_channel(bound);
    let inner = Arc::new(Mutex::new(Inner {
        sender_wakers: Vec::new(),
        receiver_waker: None,
    }));
    (
        SyncSender {
            std_sender: Some(std_sender),
            inner: inner.clone(),
        },
        Receiver {
            std_receiver: Some(std_receiver),
            inner,
        },
    )
}