use core::pin::Pin;
use futures::{
task::{Context, Poll},
Stream,
};
use pin_project::pin_project;
use tokio::sync::mpsc;
use crate::Error;
/// Creates a new unbounded channel.
///
/// Returns the sending half wrapped in a [`ChannelTx`] and the receiving
/// half wrapped in a [`ChannelRx`], both backed by
/// [`mpsc::unbounded_channel`].
pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
    let (sender, receiver) = mpsc::unbounded_channel::<T>();
    let tx = ChannelTx(sender);
    let rx = ChannelRx(receiver);
    (tx, rx)
}
/// Sending half of an unbounded channel: a thin wrapper around
/// [`mpsc::UnboundedSender`].
#[derive(Debug)]
pub struct ChannelTx<T>(mpsc::UnboundedSender<T>);

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would add
// a `T: Clone` bound, but cloning the sender handle never clones a `T`, so the
// handle should be cloneable for every message type.
impl<T> Clone for ChannelTx<T> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}
impl<T> ChannelTx<T> {
pub fn send(&self, value: T) -> Result<(), Error> {
self.0.send(value).map_err(Error::send)
}
}
/// Receiving half of an unbounded channel: a thin wrapper around
/// [`mpsc::UnboundedReceiver`].
///
/// The inner receiver is marked `#[pin]` so that it can be reached through a
/// pinned reference to this wrapper (see the pin projection used when polling
/// it as a stream).
#[pin_project]
#[derive(Debug)]
pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);
impl<T> ChannelRx<T> {
#[allow(dead_code)]
pub async fn recv(&mut self) -> Option<T> {
self.0.recv().await
}
}
/// Lets a [`ChannelRx`] be consumed as a [`Stream`] of the values received on
/// the channel.
impl<T> Stream for ChannelRx<T> {
    type Item = T;

    /// Polls the wrapped receiver for the next value by delegating to its
    /// `poll_recv`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Project through the pin to the inner receiver, then unwrap the pin
        // (the receiver is `Unpin`) to get the plain `&mut` that `poll_recv` takes.
        let pinned_receiver = self.project().0;
        let receiver = Pin::get_mut(pinned_receiver);
        receiver.poll_recv(cx)
    }
}