byor 1.0.0

Bring your own runtime!
Documentation
use crate::{channel::mpsc::*, runtime::Tokio};
use tokio::sync::mpsc::{Sender as TokioSender, UnboundedSender as TokioUnboundedSender};

impl<T: 'static> Sender<T> for TokioSender<T> {
    type SendError = tokio::sync::mpsc::error::SendError<T>;

    fn is_closed(&self) -> bool {
        self.is_closed()
    }
}

// Extra sender capabilities (`closed`, `same_channel`) for tokio's bounded `Sender`.
impl<T: 'static> SenderExt<T> for TokioSender<T> {
    /// Returns the future produced by the tokio sender's own `closed`.
    ///
    /// The call is type-qualified so that tokio's inherent method is
    /// selected, not this trait method of the same name.
    fn closed(&mut self) -> impl Future<Output = ()> {
        <TokioSender<T>>::closed(self)
    }

    /// Asks the tokio sender whether `other` is a handle to the same channel.
    fn same_channel(&self, other: &Self) -> bool {
        TokioSender::same_channel(self, other)
    }
}

// Bounded-channel sending for tokio's `Sender`.
impl<T: 'static> BoundedSender<T> for TokioSender<T> {
    /// Non-blocking send failures are reported with tokio's `TrySendError<T>`.
    type TrySendError = tokio::sync::mpsc::error::TrySendError<T>;

    /// Returns the future produced by the tokio sender's own `send`.
    ///
    /// Type-qualified so tokio's inherent method is selected rather than
    /// this trait method of the same name.
    fn send(&mut self, message: T) -> impl Future<Output = Result<(), Self::SendError>> {
        <TokioSender<T>>::send(self, message)
    }

    /// Forwards `message` to the tokio sender's own `try_send` and returns
    /// its result unchanged.
    fn try_send(&mut self, message: T) -> Result<(), Self::TrySendError> {
        <TokioSender<T>>::try_send(self, message)
    }
}

// Bounded receiving side: tokio's `Receiver` wrapped in a `ReceiverStream`.
impl<T: 'static> Receiver<T> for tokio_stream::wrappers::ReceiverStream<T> {
    /// Non-blocking receive failures use tokio's `TryRecvError`.
    type TryRecvError = tokio::sync::mpsc::error::TryRecvError;

    /// Forwards to the stream wrapper's inherent `close`.
    fn close(&mut self) {
        tokio_stream::wrappers::ReceiverStream::close(self)
    }

    /// Attempts to take one message without waiting.
    ///
    /// Returns `Ok(Some(message))` when a message was available, `Ok(None)`
    /// when tokio reports `Empty`, and `Err(Disconnected)` when tokio reports
    /// `Disconnected`.
    fn try_recv(&mut self) -> Result<Option<T>, Self::TryRecvError> {
        // The stream wrapper exposes the underlying tokio receiver via `AsMut`.
        let inner: &mut tokio::sync::mpsc::Receiver<T> = self.as_mut();

        match inner.try_recv() {
            Ok(message) => Ok(Some(message)),
            // An empty queue is not an error for callers of this trait.
            Err(Self::TryRecvError::Empty) => Ok(None),
            Err(disconnected @ Self::TryRecvError::Disconnected) => Err(disconnected),
        }
    }
}

impl<T: 'static> Sender<T> for TokioUnboundedSender<T> {
    type SendError = <TokioSender<T> as Sender<T>>::SendError;

    fn is_closed(&self) -> bool {
        self.is_closed()
    }
}

// Extra sender capabilities (`closed`, `same_channel`) for tokio's `UnboundedSender`.
impl<T: 'static> SenderExt<T> for TokioUnboundedSender<T> {
    /// Returns the future produced by the tokio sender's own `closed`.
    ///
    /// The call is type-qualified so that tokio's inherent method is
    /// selected, not this trait method of the same name.
    fn closed(&mut self) -> impl Future<Output = ()> {
        <TokioUnboundedSender<T>>::closed(self)
    }

    /// Asks the tokio sender whether `other` is a handle to the same channel.
    fn same_channel(&self, other: &Self) -> bool {
        TokioUnboundedSender::same_channel(self, other)
    }
}

impl<T: 'static> UnboundedSender<T> for TokioUnboundedSender<T> {
    fn send(&self, message: T) -> Result<(), Self::SendError> {
        self.send(message)
    }
}

// Unbounded receiving side: tokio's `UnboundedReceiver` wrapped in an
// `UnboundedReceiverStream`.
impl<T: 'static> Receiver<T> for tokio_stream::wrappers::UnboundedReceiverStream<T> {
    /// Non-blocking receive failures use tokio's `TryRecvError`.
    type TryRecvError = tokio::sync::mpsc::error::TryRecvError;

    /// Forwards to the stream wrapper's inherent `close`.
    fn close(&mut self) {
        tokio_stream::wrappers::UnboundedReceiverStream::close(self)
    }

    /// Attempts to take one message without waiting.
    ///
    /// Returns `Ok(Some(message))` when a message was available, `Ok(None)`
    /// when tokio reports `Empty`, and `Err(Disconnected)` when tokio reports
    /// `Disconnected`.
    fn try_recv(&mut self) -> Result<Option<T>, Self::TryRecvError> {
        // The stream wrapper exposes the underlying tokio receiver via `AsMut`.
        let inner: &mut tokio::sync::mpsc::UnboundedReceiver<T> = self.as_mut();

        match inner.try_recv() {
            Ok(message) => Ok(Some(message)),
            // An empty queue is not an error for callers of this trait.
            Err(Self::TryRecvError::Empty) => Ok(None),
            Err(disconnected @ Self::TryRecvError::Disconnected) => Err(disconnected),
        }
    }
}

// Wires the `Tokio` runtime marker to tokio's mpsc channels. Senders are the
// raw tokio senders; receivers are the `tokio_stream` wrapper types.
impl RuntimeMpsc for Tokio {
    /// Sending half of a bounded channel: tokio's `Sender`.
    type BoundedSender<T: 'static> = tokio::sync::mpsc::Sender<T>;
    /// Receiving half of a bounded channel: tokio's `Receiver` wrapped in a
    /// `ReceiverStream`.
    type BoundedReceiver<T: 'static> = tokio_stream::wrappers::ReceiverStream<T>;

    /// Creates a bounded tokio channel with room for `buffer` messages and
    /// wraps the receiving half in a `ReceiverStream`.
    ///
    /// # Panics
    ///
    /// `tokio::sync::mpsc::channel` is documented to panic when `buffer` is
    /// `0`; that panic is not intercepted here.
    fn bounded_channel<T: 'static>(
        buffer: usize,
    ) -> (Self::BoundedSender<T>, Self::BoundedReceiver<T>) {
        let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver);

        (sender, stream)
    }

    /// Sending half of an unbounded channel: tokio's `UnboundedSender`.
    type UnboundedSender<T: 'static> = tokio::sync::mpsc::UnboundedSender<T>;
    /// Receiving half of an unbounded channel: tokio's `UnboundedReceiver`
    /// wrapped in an `UnboundedReceiverStream`.
    type UnboundedReceiver<T: 'static> = tokio_stream::wrappers::UnboundedReceiverStream<T>;

    /// Creates an unbounded tokio channel and wraps the receiving half in an
    /// `UnboundedReceiverStream`.
    fn unbounded_channel<T: 'static>() -> (Self::UnboundedSender<T>, Self::UnboundedReceiver<T>) {
        let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver);

        (sender, stream)
    }
}