use crate::message::Message;
use crate::transport::AsyncTransport;
use futures_channel::mpsc;
use futures_core::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use thiserror::Error;
pub fn bounded(fifo_size: usize) -> (Bounded, Bounded) {
let (sender1, receiver1) = mpsc::channel(fifo_size);
let (sender2, receiver2) = mpsc::channel(fifo_size);
(
Bounded::new(receiver1, sender2),
Bounded::new(receiver2, sender1),
)
}
#[derive(Debug)]
pub struct Bounded {
receiver: mpsc::Receiver<Message>,
sender: mpsc::Sender<Message>,
}
impl Bounded {
fn new(receiver: mpsc::Receiver<Message>, sender: mpsc::Sender<Message>) -> Self {
Self { receiver, sender }
}
}
#[derive(Error, Debug, Copy, Clone, PartialEq, Eq)]
#[error("disconnected")]
pub struct Disconnected;
impl AsyncTransport for Bounded {
type Error = Disconnected;
fn receive_poll(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Message, Disconnected>> {
match Pin::new(&mut self.receiver).poll_next(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
Poll::Pending => Poll::Pending,
}
}
fn send_poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Disconnected>> {
match self.sender.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
Poll::Pending => Poll::Pending,
}
}
fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
self.sender.start_send(msg).map_err(|_| Disconnected)
}
fn send_poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<(), Disconnected>> {
match self.sender.poll_ready(cx) {
Poll::Ready(_) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}
}
pub fn unbounded() -> (Unbounded, Unbounded) {
let (sender1, receiver1) = mpsc::unbounded();
let (sender2, receiver2) = mpsc::unbounded();
(
Unbounded::new(receiver1, sender2),
Unbounded::new(receiver2, sender1),
)
}
#[derive(Debug)]
pub struct Unbounded {
receiver: mpsc::UnboundedReceiver<Message>,
sender: mpsc::UnboundedSender<Message>,
}
impl Unbounded {
fn new(
receiver: mpsc::UnboundedReceiver<Message>,
sender: mpsc::UnboundedSender<Message>,
) -> Self {
Self { receiver, sender }
}
}
impl AsyncTransport for Unbounded {
type Error = Disconnected;
fn receive_poll(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<Message, Disconnected>> {
match Pin::new(&mut self.receiver).poll_next(cx) {
Poll::Ready(Some(msg)) => Poll::Ready(Ok(msg)),
Poll::Ready(None) => Poll::Ready(Err(Disconnected)),
Poll::Pending => Poll::Pending,
}
}
fn send_poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
match self.sender.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(_)) => Poll::Ready(Err(Disconnected)),
Poll::Pending => Poll::Pending,
}
}
fn send_start(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Disconnected> {
self.sender.start_send(msg).map_err(|_| Disconnected)
}
fn send_poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Disconnected>> {
match self.sender.poll_ready(cx) {
Poll::Ready(_) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
}
}