1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
//! Simple async/sync channels used in various parts of this crate.

use std::{
    pin::Pin,
    task::{Context, Poll},
};

/// An error on send

#[derive(Debug)]
pub enum TrySendError<T> {
    /// The receiver was closed

    Closed(T),
    /// The receiver was full

    Full(T),
}

/// Async and Sync MPMP Sender.

#[derive(Clone)]
pub struct Sender<T> {
    inner: async_channel::Sender<T>,
}

impl<T> std::fmt::Debug for Sender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Sender").finish()
    }
}

impl<T> Sender<T> {
    /// Send this item asynchronously.

    ///

    /// On failure, return the sent item.

    pub async fn send(&self, item: T) -> Result<(), T>
    where
        T: Send,
    {
        self.inner.send(item).await.map_err(|e| e.into_inner())
    }

    /// Send this item synchronously.

    ///

    /// On failure, returns why and the item.

    pub fn try_send(&self, item: T) -> Result<(), TrySendError<T>> {
        self.inner.try_send(item).map_err(|e| match e {
            async_channel::TrySendError::Full(t) => TrySendError::Full(t),
            async_channel::TrySendError::Closed(t) => TrySendError::Closed(t),
        })
    }
}

pin_project_lite::pin_project! {
    /// Async and Sync MPMP Receiver.

    #[derive(Clone)]
    pub struct Receiver<T> {
        #[pin]
        inner: async_channel::Receiver<T>,
    }
}

impl<T> Receiver<T> {
    /// Asynchronously receives an item

    ///

    /// If this returns None, the Sender was closed

    pub async fn recv(&self) -> Option<T>
    where
        T: Send,
    {
        self.inner.recv().await.ok()
    }

    /// Close the receiver

    pub fn close(&self) -> bool {
        self.inner.close()
    }

    /// Synchronously receives an item

    ///

    /// If this returns None, the Sender was closed

    pub fn try_recv(&self) -> Option<T> {
        self.inner.try_recv().ok()
    }
}

impl<T> futures_lite::Stream for Receiver<T> {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.inner.poll_next(ctx)
    }
}

/// Create a bounded channel

pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = async_channel::bounded(cap);
    (Sender { inner: tx }, Receiver { inner: rx })
}

/// Create an unbounded channel

pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
    let (tx, rx) = async_channel::unbounded();
    (Sender { inner: tx }, Receiver { inner: rx })
}