use std::collections::VecDeque;
use std::future::poll_fn;
use std::{fmt, panic::UnwindSafe, pin::Pin, task::Context, task::Poll};
use futures_core::{FusedStream, Stream};
use futures_sink::Sink;
use super::cell::{Cell, WeakCell};
use crate::task::LocalWaker;
/// Creates a new channel and returns its sending and receiving halves.
///
/// The channel starts open with an empty buffer. Both halves point at the
/// same shared state; additional senders can be created by cloning the
/// returned [`Sender`] or via [`Receiver::sender`].
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    // Build the receiver around freshly allocated state, then hand a second
    // handle to the same state to the sender.
    let receiver = Receiver {
        shared: Cell::new(Shared {
            buffer: VecDeque::new(),
            blocked_recv: LocalWaker::new(),
            has_receiver: true,
        }),
    };
    let sender = Sender {
        shared: receiver.shared.clone(),
    };
    (sender, receiver)
}
/// State shared by every `Sender` and the `Receiver` of a single channel.
#[derive(Debug)]
struct Shared<T> {
    /// Messages that were sent but not yet received; senders push at the
    /// back and the receiver pops from the front (FIFO).
    buffer: VecDeque<T>,
    /// Waker registered by `Receiver::poll_recv` when it finds the buffer
    /// empty; woken by senders on send, close, and when the last sender drops.
    blocked_recv: LocalWaker,
    /// `true` while the channel accepts new messages. Cleared by `close` on
    /// either half and when the receiver is dropped.
    has_receiver: bool,
}
/// The sending half of the channel.
///
/// Cloning a `Sender` produces another handle to the same channel.
#[derive(Debug)]
pub struct Sender<T> {
    /// Handle to the state shared with the receiver and the other senders.
    shared: Cell<Shared<T>>,
}
// `Sender` is declared `Unpin` for every `T`, so it can be wrapped with
// `Pin::new(&mut sender)` when driving it through the `Sink` interface.
impl<T> Unpin for Sender<T> {}
impl<T> Sender<T> {
    /// Queues `item` for the receiver and wakes it if it is waiting.
    ///
    /// # Errors
    ///
    /// Returns [`SendError`] carrying `item` back when the channel no longer
    /// accepts messages (it was closed or the receiver was dropped).
    pub fn send(&self, item: T) -> Result<(), SendError<T>> {
        let state = self.shared.get_mut();
        if state.has_receiver {
            state.buffer.push_back(item);
            state.blocked_recv.wake();
            Ok(())
        } else {
            Err(SendError(item))
        }
    }

    /// Closes the channel for every sender and wakes the receiver.
    ///
    /// Messages already in the buffer are left in place, so the receiver can
    /// still drain them; further `send` calls fail.
    pub fn close(&self) {
        let state = self.shared.get_mut();
        state.has_receiver = false;
        state.blocked_recv.wake();
    }

    /// Returns `true` if the channel no longer accepts messages.
    ///
    /// That is the case when this sender holds the only remaining strong
    /// handle to the shared state, or when the channel was closed.
    pub fn is_closed(&self) -> bool {
        let sole_handle = self.shared.strong_count() == 1;
        sole_handle || !self.shared.get_ref().has_receiver
    }

    /// Converts this sender into a [`WeakSender`], consuming it.
    ///
    /// The strong handle held by `self` is released when this call returns.
    pub fn downgrade(self) -> WeakSender<T> {
        let shared = self.shared.downgrade();
        WeakSender { shared }
    }
}
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender {
            shared: self.shared.clone(),
        }
    }
}
impl<T> Sink<T> for Sender<T> {
    type Error = SendError<T>;

    /// Always ready: sending never waits for capacity.
    fn poll_ready(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), SendError<T>>> {
        Poll::Ready(Ok(()))
    }

    /// Forwards to [`Sender::send`]; fails if the channel is closed.
    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        let sender: &Self = self.as_ref().get_ref();
        sender.send(item)
    }

    /// Nothing to flush: `start_send` places items directly in the buffer.
    fn poll_flush(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Closes the whole channel via [`Sender::close`] and completes at once.
    fn poll_close(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> Poll<Result<(), SendError<T>>> {
        let sender: &Self = self.as_ref().get_ref();
        sender.close();
        Poll::Ready(Ok(()))
    }
}
impl<T> Drop for Sender<T> {
    /// Wakes the receiver when the last sender goes away.
    fn drop(&mut self) {
        // Read the count before touching the state: two strong handles while
        // the channel is still open means this sender plus the receiver.
        let is_last_sender = self.shared.strong_count() == 2;
        let state = self.shared.get_mut();
        if is_last_sender && state.has_receiver {
            // Let a pending receiver observe that the stream has ended.
            state.blocked_recv.wake();
        }
    }
}
/// A weak handle to a channel, obtained from `Sender::downgrade`.
///
/// It cannot send by itself; call `upgrade` to try to get a `Sender` back.
#[derive(Debug)]
pub struct WeakSender<T> {
    /// Weak reference to the channel's shared state.
    shared: WeakCell<Shared<T>>,
}
impl<T> WeakSender<T> {
    /// Attempts to turn this weak handle back into a [`Sender`].
    ///
    /// Returns `None` when the underlying weak reference can no longer be
    /// upgraded.
    pub fn upgrade(&self) -> Option<Sender<T>> {
        let shared = self.shared.upgrade()?;
        Some(Sender { shared })
    }
}
/// The receiving half of the channel.
///
/// Messages can be taken with `recv`/`poll_recv` or through the `Stream`
/// implementation.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Handle to the state shared with the senders.
    shared: Cell<Shared<T>>,
}
impl<T> Receiver<T> {
    /// Creates a new [`Sender`] attached to this receiver's channel.
    pub fn sender(&self) -> Sender<T> {
        let shared = self.shared.clone();
        Sender { shared }
    }

    /// Stops the channel from accepting new messages without dropping the
    /// receiver; messages already buffered can still be received.
    pub fn close(&self) {
        let state = self.shared.get_mut();
        state.has_receiver = false;
    }

    /// Returns `true` if no further messages can be sent.
    ///
    /// That is the case when the receiver holds the only remaining strong
    /// handle (every sender is gone) or when the channel was closed.
    pub fn is_closed(&self) -> bool {
        let no_senders = self.shared.strong_count() == 1;
        no_senders || !self.shared.get_ref().has_receiver
    }

    /// Waits for the next message.
    ///
    /// Resolves to `None` once the channel is closed or every sender is gone
    /// and the buffer is empty.
    pub async fn recv(&self) -> Option<T> {
        let next_message = poll_fn(|cx| self.poll_recv(cx));
        next_message.await
    }

    /// Polls for the next message.
    ///
    /// Buffered messages are always delivered first, even after the channel
    /// was closed. With an empty buffer the result is `Ready(None)` when the
    /// channel is closed or no sender remains, and `Pending` otherwise.
    pub fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let state = self.shared.get_mut();

        if let Some(msg) = state.buffer.pop_front() {
            return Poll::Ready(Some(msg));
        }
        if !state.has_receiver {
            // Closed and drained: the stream is over.
            return Poll::Ready(None);
        }

        // Register before inspecting the handle count, as senders wake this
        // waker on send, close, and when the last of them is dropped.
        state.blocked_recv.register(cx.waker());
        match self.shared.strong_count() {
            // Only the receiver itself is left: nobody can send anymore.
            1 => Poll::Ready(None),
            _ => Poll::Pending,
        }
    }
}
// `Receiver` is declared `Unpin` for every `T`, so it can be wrapped with
// `Pin::new(&mut receiver)` when polling it as a `Stream`.
impl<T> Unpin for Receiver<T> {}
impl<T> Stream for Receiver<T> {
    type Item = T;

    /// Delegates to [`Receiver::poll_recv`].
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let receiver: &Self = self.as_ref().get_ref();
        receiver.poll_recv(cx)
    }
}
impl<T> FusedStream for Receiver<T> {
    /// Returns `true` once the stream can yield nothing more.
    ///
    /// A closed channel (or one without senders) is only terminated after
    /// its buffer has been drained: `poll_recv` keeps returning buffered
    /// messages after closure, so reporting termination earlier would make
    /// `FusedStream`-aware consumers stop polling and lose those messages.
    fn is_terminated(&self) -> bool {
        self.is_closed() && self.shared.get_ref().buffer.is_empty()
    }
}
// Explicitly marks `Receiver` as unwind-safe for every `T`.
// NOTE(review): whether this holds depends on the internals of `Cell` and
// `LocalWaker`, which are not visible in this file — confirm.
impl<T> UnwindSafe for Receiver<T> {}
impl<T> Drop for Receiver<T> {
    /// Discards undelivered messages and marks the channel as closed so that
    /// subsequent sends fail.
    fn drop(&mut self) {
        let state = self.shared.get_mut();
        // Nobody can receive these anymore; drop them right away.
        state.buffer.clear();
        // From now on `Sender::send` returns the item back as an error.
        state.has_receiver = false;
    }
}
/// Error returned when a message could not be sent because the channel is
/// closed or the receiver is gone. It carries the rejected message.
pub struct SendError<T>(T);

impl<T> SendError<T> {
    /// Returns the message that failed to send.
    pub fn into_inner(self) -> T {
        let SendError(item) = self;
        item
    }
}

impl<T> fmt::Debug for SendError<T> {
    /// Formats as `SendError("...")`; the payload is elided so that `T` does
    /// not have to implement `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("SendError");
        tuple.field(&"...");
        tuple.finish()
    }
}

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("send failed because receiver is gone")
    }
}

impl<T> std::error::Error for SendError<T> {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{future::lazy, future::stream_recv, Stream};
    use futures_sink::Sink;
    // End-to-end checks: sending through cloned senders, pending state while
    // senders are alive, closing, weak handles, send failures and `SendError`.
    #[ntex_macros::rt_test2]
    async fn test_mpsc() {
        let (tx, mut rx) = channel();
        assert!(format!("{:?}", tx).contains("Sender"));
        assert!(format!("{:?}", rx).contains("Receiver"));
        // Messages sent through any sender clone arrive in order.
        tx.send("test").unwrap();
        assert_eq!(stream_recv(&mut rx).await.unwrap(), "test");
        let tx2 = tx.clone();
        tx2.send("test2").unwrap();
        assert_eq!(stream_recv(&mut rx).await.unwrap(), "test2");
        // Empty buffer with live senders: the stream is pending.
        assert_eq!(
            lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
            Poll::Pending
        );
        // Dropping one of two senders keeps the stream pending.
        drop(tx2);
        assert_eq!(
            lazy(|cx| Pin::new(&mut rx).poll_next(cx)).await,
            Poll::Pending
        );
        drop(tx);
        // Closing from the sender side ends the stream.
        let (tx, mut rx) = channel::<String>();
        tx.close();
        assert_eq!(stream_recv(&mut rx).await, None);
        // A weak sender can be upgraded while the receiver keeps the state alive.
        let (tx, _rx) = channel::<String>();
        let weak_tx = tx.downgrade();
        assert!(weak_tx.upgrade().is_some());
        // Sending fails once the receiver has been dropped.
        let (tx, rx) = channel();
        tx.send("test").unwrap();
        drop(rx);
        assert!(tx.send("test").is_err());
        // After `close`, sending fails on every clone of the sender.
        let (tx, _) = channel();
        let tx2 = tx.clone();
        tx.close();
        assert!(tx.send("test").is_err());
        assert!(tx2.send("test").is_err());
        // `SendError` formatting and payload recovery.
        let err = SendError("test");
        assert!(format!("{:?}", err).contains("SendError"));
        assert!(format!("{}", err).contains("send failed because receiver is gone"));
        assert_eq!(err.into_inner(), "test");
    }
    // Drives the sender through the `Sink` interface: the item sent before
    // `poll_close` is still delivered, then the stream ends.
    #[ntex_macros::rt_test2]
    async fn test_sink() {
        let (mut tx, mut rx) = channel();
        lazy(|cx| {
            assert!(Pin::new(&mut tx).poll_ready(cx).is_ready());
            assert!(Pin::new(&mut tx).start_send("test").is_ok());
            assert!(Pin::new(&mut tx).poll_flush(cx).is_ready());
            assert!(Pin::new(&mut tx).poll_close(cx).is_ready());
        })
        .await;
        assert_eq!(stream_recv(&mut rx).await.unwrap(), "test");
        assert_eq!(stream_recv(&mut rx).await, None);
    }
    // Checks `is_closed`/`is_terminated` for every way a channel can close,
    // and that a new sender from `Receiver::sender` reopens the no-sender case.
    #[ntex_macros::rt_test2]
    async fn test_close() {
        let (tx, rx) = channel::<()>();
        assert!(!tx.is_closed());
        assert!(!rx.is_closed());
        assert!(!rx.is_terminated());
        // Closed by the sender.
        tx.close();
        assert!(tx.is_closed());
        assert!(rx.is_closed());
        assert!(rx.is_terminated());
        // Closed by the receiver.
        let (tx, rx) = channel::<()>();
        rx.close();
        assert!(tx.is_closed());
        // Closed because the only sender was dropped.
        let (tx, rx) = channel::<()>();
        drop(tx);
        assert!(rx.is_closed());
        assert!(rx.is_terminated());
        // Creating a fresh sender makes the channel usable again.
        let _tx = rx.sender();
        assert!(!rx.is_closed());
        assert!(!rx.is_terminated());
    }
}