1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
use core::task::{Context, Poll}; use futures_core::Stream; use tokio::macros::support::Pin; #[derive(Copy, Clone)] pub struct TryRecvError(()); pub enum TrySendError<T> { Full(T), Closed(T), } pub struct SendError<T>(pub T); #[cfg(feature = "tokio_asyncs")] pub mod mpsc_impl { use crate::asyncs::sync::mpsc::{SendError, TryRecvError, TrySendError}; use core::task::{Context, Poll}; pub struct ReceiverImpl<T>(tokio::sync::mpsc::Receiver<T>); impl<T> ReceiverImpl<T> { pub async fn recv(&mut self) -> Option<T> { self.0.recv().await } pub fn try_recv(&mut self) -> Result<T, TryRecvError> { self.0.try_recv().map_err(|_| TryRecvError(())) } pub fn close(&mut self) { self.0.close(); } pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.0.poll_recv(cx) } } impl<T> From<tokio::sync::mpsc::Receiver<T>> for ReceiverImpl<T> { fn from(r: tokio::sync::mpsc::Receiver<T>) -> Self { Self(r) } } impl<T> From<tokio::sync::mpsc::error::TrySendError<T>> for TrySendError<T> { fn from(e: tokio::sync::mpsc::error::TrySendError<T>) -> Self { match e { tokio::sync::mpsc::error::TrySendError::Full(t) => TrySendError::Full(t), tokio::sync::mpsc::error::TrySendError::Closed(t) => TrySendError::Closed(t), } } } pub struct SenderImpl<T>(tokio::sync::mpsc::Sender<T>); impl<T> Clone for SenderImpl<T> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<T> SenderImpl<T> { pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { self.0.try_send(message).map_err(TrySendError::from) } pub async fn send(&mut self, message: T) -> Result<(), SendError<T>> { self.0.send(message).await.map_err(|e| SendError(e.0)) } } impl<T> From<tokio::sync::mpsc::Sender<T>> for SenderImpl<T> { fn from(s: tokio::sync::mpsc::Sender<T>) -> Self { Self(s) } } pub fn channel<T>(buffer_size: usize) -> (SenderImpl<T>, ReceiverImpl<T>) { let (tx, rx) = tokio::sync::mpsc::channel(buffer_size); (SenderImpl(tx), ReceiverImpl(rx)) } } pub struct Receiver<T>(mpsc_impl::ReceiverImpl<T>); impl<T> Receiver<T> { pub async fn recv(&mut self) -> Option<T> { self.0.recv().await } pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { self.0.poll_recv(cx) } pub fn try_recv(&mut self) -> Result<T, TryRecvError> { self.0.try_recv() } } impl<T> Stream for Receiver<T> { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { self.poll_recv(cx) } } pub struct Sender<T>(mpsc_impl::SenderImpl<T>); impl<T> Clone for Sender<T> { fn clone(&self) -> Self { Self(self.0.clone()) } } impl<T> Sender<T> { pub async fn send(&mut self, message: T) -> Result<(), SendError<T>> { self.0.send(message).await } pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> { self.0.try_send(message) } } pub fn channel<T>(buffer_size: usize) -> (Sender<T>, Receiver<T>) { let (tx, rx) = mpsc_impl::channel(buffer_size); (Sender(tx), Receiver(rx)) }