use super::{
    exchange::{Recv, Send},
    Session,
};
use futures::{Future, FutureExt, Stream};
use std::{
    marker,
    pin::Pin,
    task::{Context, Poll},
};
#[must_use]
pub struct Dequeue<T, S: Session = ()> {
    deq: Recv<Queue<T, S>>,
}
#[must_use]
pub struct Enqueue<T, S: Session = ()> {
    enq: Send<Queue<T, S::Dual>>,
}
pub enum Queue<T, S: Session = ()> {
    Item(T, Dequeue<T, S>),
    Closed(S),
}
impl<T, S: Session> Session for Dequeue<T, S>
where
    T: marker::Send + 'static,
{
    type Dual = Enqueue<T, S::Dual>;
    fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
        Self {
            deq: Recv::fork_sync(|send| f(Enqueue { enq: send })),
        }
    }
    fn link(self, dual: Self::Dual) {
        self.deq.link(dual.enq)
    }
}
impl<T, S: Session> Session for Enqueue<T, S>
where
    T: marker::Send + 'static,
{
    type Dual = Dequeue<T, S::Dual>;
    fn fork_sync(f: impl FnOnce(Self::Dual)) -> Self {
        Self {
            enq: Send::fork_sync(|recv| f(Dequeue { deq: recv })),
        }
    }
    fn link(self, dual: Self::Dual) {
        self.enq.link(dual.deq)
    }
}
impl<T, S: Session> Dequeue<T, S>
where
    T: marker::Send + 'static,
{
            #[must_use]
    pub async fn pop(self) -> Queue<T, S> {
        self.deq.recv1().await
    }
                #[must_use]
    pub async fn fold<A, F>(mut self, init: A, mut f: impl FnMut(A, T) -> F) -> (A, S)
    where
        F: Future<Output = A>,
    {
        let mut accum = init;
        loop {
            match self.pop().await {
                Queue::Item(item, rest) => {
                    accum = f(accum, item).await;
                    self = rest;
                }
                Queue::Closed(session) => return (accum, session),
            }
        }
    }
            #[must_use]
    pub async fn for_each<F>(self, mut f: impl FnMut(T) -> F) -> S
    where
        F: Future<Output = ()>,
    {
        self.fold((), |(), item| f(item)).await.1
    }
            #[must_use]
    pub fn into_stream(self) -> DequeueStream<T, S> {
        DequeueStream {
            future: Box::pin(self.pop()),
        }
    }
}
impl<T> Dequeue<T, ()>
where
    T: marker::Send + 'static,
{
            pub async fn fold1<A, F>(self, init: A, f: impl FnMut(A, T) -> F) -> A
    where
        F: Future<Output = A>,
    {
        self.fold(init, f).await.0
    }
            pub async fn for_each1<F>(self, f: impl FnMut(T) -> F)
    where
        F: Future<Output = ()>,
    {
        self.for_each(f).await
    }
        #[must_use]
    pub fn into_stream1(self) -> DequeueStream1<T> {
        DequeueStream1 {
            future: Box::pin(self.pop()),
        }
    }
}
impl<T, S: Session> Enqueue<T, S>
where
    T: marker::Send + 'static,
{
            #[must_use]
    pub fn close(self) -> S {
        S::fork_sync(|dual| self.enq.send1(Queue::Closed(dual)))
    }
            pub fn push(self, item: T) -> Self {
        Self::fork_sync(|dual| self.enq.send1(Queue::Item(item, dual)))
    }
}
impl<T> Enqueue<T, ()>
where
    T: marker::Send + 'static,
{
        pub fn close1(self) {
        self.close()
    }
}
pub struct DequeueStream<T, S: Session> {
    future: Pin<Box<dyn Future<Output = Queue<T, S>> + marker::Send + 'static>>,
}
pub enum Next<T, S: Session> {
    Item(T),
    Closed(S),
}
impl<T, S: Session> Stream for DequeueStream<T, S>
where
    T: marker::Send + 'static,
{
    type Item = Next<T, S>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(Queue::Item(value, next)) => {
                self.future = Box::pin(next.pop());
                Poll::Ready(Some(Next::Item(value)))
            }
            Poll::Ready(Queue::Closed(session)) => Poll::Ready(Some(Next::Closed(session))),
            Poll::Pending => Poll::Pending,
        }
    }
}
pub struct DequeueStream1<T> {
    future: Pin<Box<dyn Future<Output = Queue<T, ()>> + marker::Send + 'static>>,
}
impl<T> Stream for DequeueStream1<T>
where
    T: marker::Send + 'static,
{
    type Item = T;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match self.future.poll_unpin(cx) {
            Poll::Ready(Queue::Item(value, next)) => {
                self.future = Box::pin(next.pop());
                Poll::Ready(Some(value))
            }
            Poll::Ready(Queue::Closed(())) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}