use std::{
pin::Pin,
task::{Context, Poll},
};
use futures::{Sink, SinkExt, Stream};
use tokio::sync::mpsc::{
self, Receiver,
error::{TryRecvError, TrySendError},
};
use tokio_util::sync::{PollSendError, PollSender};
/// One endpoint of a two-way message channel.
///
/// An endpoint sends values of type `S` through `tx` and receives values of
/// type `R` through `rx`.
pub struct Channel<S, R> {
/// Outgoing half: a `PollSender` wrapping the sender for `S` values.
tx: PollSender<S>,
/// Incoming half: the receiver yielding `R` values.
rx: Receiver<R>,
}
/// Builds a connected pair of [`Channel`] endpoints.
///
/// Values of type `S` sent by the first endpoint are received by the second,
/// and values of type `R` sent by the second are received by the first.
///
/// * `tx_buffer` - capacity of the queue carrying `S` from the first endpoint
///   to the second.
/// * `rx_buffer` - capacity of the queue carrying `R` from the second endpoint
///   back to the first.
///
/// Both capacities are passed unchanged to `mpsc::channel`; tokio documents
/// that function as panicking when the capacity is zero.
pub fn channel<S, R>(tx_buffer: usize, rx_buffer: usize) -> (Channel<S, R>, Channel<R, S>)
where
    S: Send,
    R: Send,
{
    // One bounded queue per direction.
    let (forward_tx, forward_rx) = mpsc::channel(tx_buffer);
    let (backward_tx, backward_rx) = mpsc::channel(rx_buffer);

    // Each endpoint owns the sender of one queue and the receiver of the other.
    let near = Channel {
        tx: PollSender::new(forward_tx),
        rx: backward_rx,
    };
    let far = Channel {
        tx: PollSender::new(backward_tx),
        rx: forward_rx,
    };

    (near, far)
}
/// Send-side methods. These go through the `PollSender`, so they keep the
/// `S: Send + 'static` bound.
impl<S: Send + 'static, R> Channel<S, R> {
    /// Sends `msg` on the outgoing half by awaiting `SinkExt::send` on the
    /// inner `PollSender`.
    ///
    /// # Errors
    ///
    /// Returns the `PollSendError<S>` reported by the inner sink.
    pub async fn send(&mut self, msg: S) -> Result<(), PollSendError<S>> {
        self.tx.send(msg).await
    }

    /// Attempts to send `msg` without waiting.
    ///
    /// The message is handed to `try_send` on the underlying sender obtained
    /// from `PollSender::get_ref`.
    ///
    /// # Errors
    ///
    /// Returns `TrySendError::Closed(msg)` when `get_ref` yields `None`;
    /// otherwise returns whatever the underlying `try_send` reports.
    pub fn try_send(&mut self, msg: S) -> Result<(), TrySendError<S>> {
        match self.tx.get_ref() {
            Some(sender) => sender.try_send(msg),
            None => Err(TrySendError::Closed(msg)),
        }
    }
}

/// Receive-side methods. They only touch the receiver, so they place no
/// bounds on the outgoing type `S`.
impl<S, R> Channel<S, R> {
    /// Awaits the next incoming message by delegating to `Receiver::recv`.
    ///
    /// Per tokio's documentation, `None` means the channel is closed and no
    /// buffered messages remain.
    pub async fn recv(&mut self) -> Option<R> {
        self.rx.recv().await
    }

    /// Attempts to take an incoming message without waiting, delegating to
    /// `Receiver::try_recv`.
    ///
    /// # Errors
    ///
    /// Returns the `TryRecvError` produced by the underlying receiver.
    pub fn try_recv(&mut self) -> Result<R, TryRecvError> {
        self.rx.try_recv()
    }

    /// Polls the receiver for the next incoming message, delegating to
    /// `Receiver::poll_recv` with the supplied task context.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<R>> {
        self.rx.poll_recv(cx)
    }
}
/// Exposes the incoming half of a [`Channel`] as a [`Stream`] of `R` values.
impl<S, R> Stream for Channel<S, R> {
    type Item = R;

    /// Polls the inner receiver for the next message.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Unpin the endpoint to reach the receiver by plain mutable reference.
        let this = self.get_mut();
        this.rx.poll_recv(cx)
    }
}
/// Exposes the outgoing half of a [`Channel`] as a [`Sink`] of `S` values.
///
/// Every method forwards to the matching `Sink` method of the inner
/// `PollSender`, re-pinned from a plain mutable reference.
impl<S: Send + 'static, R> Sink<S> for Channel<S, R> {
    type Error = PollSendError<S>;

    /// Forwards the readiness check to the inner sender.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let sender = &mut self.get_mut().tx;
        Pin::new(sender).poll_ready(cx)
    }

    /// Hands `item` to the inner sender.
    fn start_send(self: Pin<&mut Self>, item: S) -> Result<(), Self::Error> {
        let sender = &mut self.get_mut().tx;
        Pin::new(sender).start_send(item)
    }

    /// Forwards the flush request to the inner sender.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let sender = &mut self.get_mut().tx;
        Pin::new(sender).poll_flush(cx)
    }

    /// Forwards the close request to the inner sender.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let sender = &mut self.get_mut().tx;
        Pin::new(sender).poll_close(cx)
    }
}