use async_channel::{Receiver, Sender};
pub use async_channel::{RecvError, SendError, TryRecvError, TrySendError};
use futures::Stream;
use super::*;
/// Sending half of the packet channel created by [`packet_stream`].
///
/// This is a thin wrapper around an `async_channel::Sender` that carries
/// [`Packet`] values.
#[derive(Debug)]
#[repr(transparent)]
pub struct PacketProducer<I, A> {
    sender: Sender<Packet<I, A>>,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would
// require `I: Clone` and `A: Clone`, yet cloning only duplicates the channel
// handle and never touches a packet. This mirrors `StreamProducer`.
impl<I, A> Clone for PacketProducer<I, A> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}
/// Receiving half of the packet channel created by [`packet_stream`].
///
/// This is a thin wrapper around an `async_channel::Receiver` that carries
/// [`Packet`] values. The receiver is structurally pinned so the wrapper can
/// forward `Stream::poll_next` to it.
#[pin_project::pin_project]
#[derive(Debug)]
#[repr(transparent)]
pub struct PacketSubscriber<I, A> {
    #[pin]
    receiver: Receiver<Packet<I, A>>,
}

// `Clone` is written by hand instead of derived: `#[derive(Clone)]` would
// require `I: Clone` and `A: Clone`, yet cloning only duplicates the channel
// handle and never touches a packet. This mirrors `StreamSubscriber`.
impl<I, A> Clone for PacketSubscriber<I, A> {
    fn clone(&self) -> Self {
        Self {
            receiver: self.receiver.clone(),
        }
    }
}
/// Creates a connected [`PacketProducer`] / [`PacketSubscriber`] pair for the
/// transport `T`.
///
/// The pair is backed by an unbounded `async_channel` channel, so the
/// capacity is not limited by this function.
pub fn packet_stream<T: Transport>() -> (
    PacketProducer<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
    PacketSubscriber<T::Id, <T::Resolver as AddressResolver>::ResolvedAddress>,
) {
    let (tx, rx) = async_channel::unbounded();
    let producer = PacketProducer { sender: tx };
    let subscriber = PacketSubscriber { receiver: rx };
    (producer, subscriber)
}
impl<I, A> PacketProducer<I, A> {
pub async fn send(&self, packet: Packet<I, A>) -> Result<(), SendError<Packet<I, A>>> {
self.sender.send(packet).await
}
pub fn try_send(&self, packet: Packet<I, A>) -> Result<(), TrySendError<Packet<I, A>>> {
self.sender.try_send(packet)
}
pub fn is_empty(&self) -> bool {
self.sender.is_empty()
}
pub fn len(&self) -> usize {
self.sender.len()
}
pub fn is_full(&self) -> bool {
self.sender.is_full()
}
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
pub fn close(&self) -> bool {
self.sender.close()
}
}
impl<I, A> PacketSubscriber<I, A> {
pub async fn recv(&self) -> Result<Packet<I, A>, RecvError> {
self.receiver.recv().await
}
pub fn try_recv(&self) -> Result<Packet<I, A>, TryRecvError> {
self.receiver.try_recv()
}
pub fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
pub fn len(&self) -> usize {
self.receiver.len()
}
pub fn is_full(&self) -> bool {
self.receiver.is_full()
}
pub fn is_closed(&self) -> bool {
self.receiver.is_closed()
}
pub fn close(&self) -> bool {
self.receiver.close()
}
}
impl<I, A> Stream for PacketSubscriber<I, A> {
type Item = <Receiver<Packet<I, A>> as Stream>::Item;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
<Receiver<_> as Stream>::poll_next(self.project().receiver, cx)
}
}
/// Creates a connected [`StreamProducer`] / [`StreamSubscriber`] pair for the
/// transport `T`.
///
/// The pair is backed by a bounded `async_channel` channel with a capacity
/// of one `(address, stream)` entry.
pub fn promised_stream<T: Transport>() -> (
    StreamProducer<<T::Resolver as AddressResolver>::ResolvedAddress, T::Stream>,
    StreamSubscriber<<T::Resolver as AddressResolver>::ResolvedAddress, T::Stream>,
) {
    let (tx, rx) = async_channel::bounded(1);
    let producer = StreamProducer { sender: tx };
    let subscriber = StreamSubscriber { receiver: rx };
    (producer, subscriber)
}
#[derive(Debug)]
#[repr(transparent)]
pub struct StreamProducer<A, S> {
sender: Sender<(A, S)>,
}
impl<A, S> Clone for StreamProducer<A, S> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
#[pin_project::pin_project]
#[derive(Debug)]
#[repr(transparent)]
pub struct StreamSubscriber<A, S> {
#[pin]
receiver: Receiver<(A, S)>,
}
impl<A, S> Clone for StreamSubscriber<A, S> {
fn clone(&self) -> Self {
Self {
receiver: self.receiver.clone(),
}
}
}
impl<A, S> StreamProducer<A, S> {
pub async fn send(&self, addr: A, conn: S) -> Result<(), SendError<(A, S)>> {
self.sender.send((addr, conn)).await
}
pub fn try_send(&self, addr: A, conn: S) -> Result<(), TrySendError<(A, S)>> {
self.sender.try_send((addr, conn))
}
pub fn is_empty(&self) -> bool {
self.sender.is_empty()
}
pub fn len(&self) -> usize {
self.sender.len()
}
pub fn is_full(&self) -> bool {
self.sender.is_full()
}
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
pub fn close(&self) -> bool {
self.sender.close()
}
}
impl<A, S> StreamSubscriber<A, S> {
pub async fn recv(&self) -> Result<(A, S), RecvError> {
self.receiver.recv().await
}
pub fn try_recv(&self) -> Result<(A, S), TryRecvError> {
self.receiver.try_recv()
}
pub fn is_empty(&self) -> bool {
self.receiver.is_empty()
}
pub fn len(&self) -> usize {
self.receiver.len()
}
pub fn is_full(&self) -> bool {
self.receiver.is_full()
}
pub fn is_closed(&self) -> bool {
self.receiver.is_closed()
}
pub fn close(&self) -> bool {
self.receiver.close()
}
}
impl<A, S> Stream for StreamSubscriber<A, S> {
type Item = <Receiver<(A, S)> as Stream>::Item;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
<Receiver<_> as Stream>::poll_next(self.project().receiver, cx)
}
}