use crate::{Negotiated, NegotiationError};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use futures::{future::Either, prelude::*};
use std::{convert::TryFrom as _, io, iter, mem, pin::Pin, task::{Context, Poll}};
/// Starts a protocol negotiation on `inner` in the role of the dialer.
///
/// `protocols` is the list of protocol names this side is willing to use,
/// and `version` is the negotiation header version that gets sent to the
/// remote.
///
/// The returned future resolves to the selected protocol together with the
/// negotiated I/O stream, or to a [`NegotiationError`].
///
/// This function always picks the sequential strategy
/// ([`dialer_select_proto_serial`]); the `Right` (parallel) variant of the
/// returned [`Either`] is never constructed here.
pub fn dialer_select_proto<R, I>(
    inner: R,
    protocols: I,
    version: Version
) -> DialerSelectFuture<R, I::IntoIter>
where
    R: AsyncRead + AsyncWrite,
    I: IntoIterator,
    I::Item: AsRef<[u8]>
{
    let serial = dialer_select_proto_serial(inner, protocols.into_iter(), version);
    Either::Left(serial)
}
/// Future returned by `dialer_select_proto`.
///
/// It is either the sequential negotiation (`Left`, [`DialerSelectSeq`]) or
/// the parallel negotiation (`Right`, [`DialerSelectPar`]).
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;
/// Builds a dialer-side negotiation future that proposes the given
/// `protocols` one after another, in iteration order.
///
/// The future starts out in the `SendHeader` state with `inner` wrapped in a
/// fresh [`MessageIO`]. The protocol iterator is made peekable so that the
/// future can tell whether the proposal it is sending is the last one.
pub(crate) fn dialer_select_proto_serial<R, I>(
    inner: R,
    protocols: I,
    version: Version
) -> DialerSelectSeq<R, I::IntoIter>
where
    R: AsyncRead + AsyncWrite,
    I: IntoIterator,
    I::Item: AsRef<[u8]>
{
    DialerSelectSeq {
        version,
        protocols: protocols.into_iter().peekable(),
        state: SeqState::SendHeader { io: MessageIO::new(inner) },
    }
}
/// Builds a dialer-side negotiation future that first asks the remote for
/// the protocols it supports and then picks the first entry of `protocols`
/// that appears in the remote's answer.
///
/// The future starts out in the `SendHeader` state with `inner` wrapped in a
/// fresh [`MessageIO`].
pub(crate) fn dialer_select_proto_parallel<R, I>(
    inner: R,
    protocols: I,
    version: Version
) -> DialerSelectPar<R, I::IntoIter>
where
    R: AsyncRead + AsyncWrite,
    I: IntoIterator,
    I::Item: AsRef<[u8]>
{
    DialerSelectPar {
        version,
        protocols: protocols.into_iter(),
        state: ParState::SendHeader { io: MessageIO::new(inner) },
    }
}
/// A `Future` that negotiates a protocol as the dialer by proposing the
/// candidate protocols sequentially, one proposal at a time.
///
/// Resolves to the agreed protocol together with the negotiated I/O stream,
/// or to a `NegotiationError`.
#[pin_project::pin_project]
pub struct DialerSelectSeq<R, I>
where
    R: AsyncRead + AsyncWrite,
    I: Iterator,
    I::Item: AsRef<[u8]>
{
    // Remaining candidate protocols. Peekable so that `poll` can check
    // whether the proposal currently being sent is the last one.
    protocols: iter::Peekable<I>,
    // Current step of the negotiation state machine.
    state: SeqState<R, I::Item>,
    // Negotiation version announced in the header message.
    version: Version,
}
/// Steps of the sequential dialer negotiation driven by
/// `DialerSelectSeq::poll`.
///
/// Every variant except `Done` owns the message I/O object; the variants
/// past `SendHeader` also own the protocol currently being proposed.
enum SeqState<R, N>
where
    R: AsyncRead + AsyncWrite,
    N: AsRef<[u8]>
{
    /// Waiting for the sink to be ready so the header can be sent.
    SendHeader { io: MessageIO<R>, },
    /// Waiting for the sink to be ready so `protocol` can be proposed.
    SendProtocol { io: MessageIO<R>, protocol: N },
    /// Flushing the proposal of `protocol` to the remote.
    FlushProtocol { io: MessageIO<R>, protocol: N },
    /// Waiting for the remote to confirm or reject `protocol`.
    AwaitProtocol { io: MessageIO<R>, protocol: N },
    /// Placeholder left in place while a state is being processed; polling
    /// in this state panics.
    Done
}
impl<R, I> Future for DialerSelectSeq<R, I>
where
    // `Unpin` is relied upon below: the I/O object is moved between states by
    // value and re-pinned on the stack with `Pin::new`.
    R: AsyncRead + AsyncWrite + Unpin,
    I: Iterator,
    I::Item: AsRef<[u8]>
{
    type Output = Result<(I::Item, Negotiated<R>), NegotiationError>;

    /// Drives the sequential negotiation state machine.
    ///
    /// Sends the header followed by one protocol proposal at a time and waits
    /// for the remote to confirm or reject each proposal. Fails with
    /// `NegotiationError::Failed` once the candidate list is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        loop {
            // Take the current state by value, leaving `Done` behind. Every
            // path that does not finish the future must store a state again.
            let current = mem::replace(this.state, SeqState::Done);

            match current {
                SeqState::SendHeader { io: mut framed } => {
                    if Pin::new(&mut framed).poll_ready(cx)?.is_pending() {
                        *this.state = SeqState::SendHeader { io: framed };
                        return Poll::Pending;
                    }

                    Pin::new(&mut framed).start_send(Message::Header(*this.version))?;

                    // Without at least one candidate there is nothing to negotiate.
                    let protocol = match this.protocols.next() {
                        Some(first) => first,
                        None => return Poll::Ready(Err(NegotiationError::Failed)),
                    };

                    // No flush in between: the header and the first proposal
                    // are queued back to back.
                    *this.state = SeqState::SendProtocol { io: framed, protocol };
                }

                SeqState::SendProtocol { io: mut framed, protocol } => {
                    if Pin::new(&mut framed).poll_ready(cx)?.is_pending() {
                        *this.state = SeqState::SendProtocol { io: framed, protocol };
                        return Poll::Pending;
                    }

                    let p = Protocol::try_from(protocol.as_ref())?;
                    Pin::new(&mut framed).start_send(Message::Protocol(p.clone()))?;
                    log::debug!("Dialer: Proposed protocol: {}", p);

                    // Always peek, so the iterator is advanced exactly as often
                    // regardless of the version in use.
                    let is_last = this.protocols.peek().is_none();

                    match (*this.version, is_last) {
                        (Version::V1Lazy, true) => {
                            // Last candidate with the lazy version: finish right
                            // away instead of flushing and awaiting the reply here.
                            log::debug!("Dialer: Expecting proposed protocol: {}", p);
                            let negotiated =
                                Negotiated::expecting(framed.into_reader(), p, *this.version);
                            return Poll::Ready(Ok((protocol, negotiated)));
                        }
                        (Version::V1, _) | (Version::V1Lazy, false) => {
                            *this.state = SeqState::FlushProtocol { io: framed, protocol };
                        }
                    }
                }

                SeqState::FlushProtocol { io: mut framed, protocol } => {
                    if Pin::new(&mut framed).poll_flush(cx)?.is_pending() {
                        *this.state = SeqState::FlushProtocol { io: framed, protocol };
                        return Poll::Pending;
                    }
                    *this.state = SeqState::AwaitProtocol { io: framed, protocol };
                }

                SeqState::AwaitProtocol { io: mut framed, protocol } => {
                    let msg = match Pin::new(&mut framed).poll_next(cx)? {
                        Poll::Pending => {
                            *this.state = SeqState::AwaitProtocol { io: framed, protocol };
                            return Poll::Pending;
                        }
                        Poll::Ready(None) => {
                            // The stream ended before the remote answered.
                            let eof = io::Error::from(io::ErrorKind::UnexpectedEof);
                            return Poll::Ready(Err(NegotiationError::from(eof)));
                        }
                        Poll::Ready(Some(received)) => received,
                    };

                    match msg {
                        // The remote's header: keep waiting for the actual answer.
                        Message::Header(v) if v == *this.version => {
                            *this.state = SeqState::AwaitProtocol { io: framed, protocol };
                        }
                        Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
                            log::debug!("Dialer: Received confirmation for protocol: {}", p);
                            let negotiated = Negotiated::completed(framed.into_inner());
                            return Poll::Ready(Ok((protocol, negotiated)));
                        }
                        Message::NotAvailable => {
                            log::debug!("Dialer: Received rejection of protocol: {}",
                                String::from_utf8_lossy(protocol.as_ref()));
                            // Move on to the next candidate, if there is one.
                            let protocol = match this.protocols.next() {
                                Some(next) => next,
                                None => return Poll::Ready(Err(NegotiationError::Failed)),
                            };
                            *this.state = SeqState::SendProtocol { io: framed, protocol };
                        }
                        _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
                    }
                }

                SeqState::Done => panic!("SeqState::poll called after completion")
            }
        }
    }
}
/// A `Future` that negotiates a protocol as the dialer by first requesting
/// the list of protocols supported by the remote and then proposing the
/// first local candidate contained in that list.
///
/// Resolves to the chosen protocol together with the negotiated I/O stream,
/// or to a `NegotiationError`.
#[pin_project::pin_project]
pub struct DialerSelectPar<R, I>
where
    R: AsyncRead + AsyncWrite,
    I: Iterator,
    I::Item: AsRef<[u8]>
{
    // Local candidate protocols, searched in iteration order.
    protocols: I,
    // Current step of the negotiation state machine.
    state: ParState<R, I::Item>,
    // Negotiation version announced in the header message.
    version: Version,
}
/// Steps of the parallel dialer negotiation driven by
/// `DialerSelectPar::poll`.
///
/// Every variant except `Done` owns the message I/O object.
enum ParState<R, N>
where
    R: AsyncRead + AsyncWrite,
    N: AsRef<[u8]>
{
    /// Waiting for the sink to be ready so the header can be sent.
    SendHeader { io: MessageIO<R> },
    /// Waiting for the sink to be ready so the `ListProtocols` request can
    /// be sent.
    SendProtocolsRequest { io: MessageIO<R> },
    /// Flushing the header and the request to the remote.
    Flush { io: MessageIO<R> },
    /// Waiting for the remote's list of supported protocols.
    RecvProtocols { io: MessageIO<R> },
    /// Waiting for the sink to be ready so the chosen `protocol` can be
    /// proposed.
    SendProtocol { io: MessageIO<R>, protocol: N },
    /// Placeholder left in place while a state is being processed; polling
    /// in this state panics.
    Done
}
impl<R, I> Future for DialerSelectPar<R, I>
where
    // `Unpin` is relied upon below: the I/O object is moved between states by
    // value and re-pinned on the stack with `Pin::new`.
    R: AsyncRead + AsyncWrite + Unpin,
    I: Iterator,
    I::Item: AsRef<[u8]>
{
    type Output = Result<(I::Item, Negotiated<R>), NegotiationError>;

    /// Drives the parallel negotiation state machine.
    ///
    /// Sends the header and a `ListProtocols` request, waits for the remote's
    /// list and then proposes the first local candidate found in it. Fails
    /// with `NegotiationError::Failed` if no candidate is in the list.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        loop {
            // Take the current state by value, leaving `Done` behind. Every
            // path that does not finish the future must store a state again.
            let current = mem::replace(this.state, ParState::Done);

            match current {
                ParState::SendHeader { io: mut framed } => {
                    if Pin::new(&mut framed).poll_ready(cx)?.is_pending() {
                        *this.state = ParState::SendHeader { io: framed };
                        return Poll::Pending;
                    }

                    Pin::new(&mut framed).start_send(Message::Header(*this.version))?;
                    *this.state = ParState::SendProtocolsRequest { io: framed };
                }

                ParState::SendProtocolsRequest { io: mut framed } => {
                    if Pin::new(&mut framed).poll_ready(cx)?.is_pending() {
                        *this.state = ParState::SendProtocolsRequest { io: framed };
                        return Poll::Pending;
                    }

                    Pin::new(&mut framed).start_send(Message::ListProtocols)?;
                    log::debug!("Dialer: Requested supported protocols.");
                    *this.state = ParState::Flush { io: framed };
                }

                ParState::Flush { io: mut framed } => {
                    if Pin::new(&mut framed).poll_flush(cx)?.is_pending() {
                        *this.state = ParState::Flush { io: framed };
                        return Poll::Pending;
                    }
                    *this.state = ParState::RecvProtocols { io: framed };
                }

                ParState::RecvProtocols { io: mut framed } => {
                    let msg = match Pin::new(&mut framed).poll_next(cx)? {
                        Poll::Pending => {
                            *this.state = ParState::RecvProtocols { io: framed };
                            return Poll::Pending;
                        }
                        Poll::Ready(None) => {
                            // The stream ended before the remote answered.
                            let eof = io::Error::from(io::ErrorKind::UnexpectedEof);
                            return Poll::Ready(Err(NegotiationError::from(eof)));
                        }
                        Poll::Ready(Some(received)) => received,
                    };

                    match &msg {
                        // The remote's header: keep waiting for the protocol list.
                        Message::Header(v) if v == this.version => {
                            *this.state = ParState::RecvProtocols { io: framed }
                        }
                        Message::Protocols(supported) => {
                            // Local iteration order decides which common
                            // protocol is chosen.
                            let chosen = this.protocols.by_ref()
                                .find(|p| supported.iter().any(|s|
                                    s.as_ref() == p.as_ref()));
                            let protocol = match chosen {
                                Some(found) => found,
                                None => return Poll::Ready(Err(NegotiationError::Failed)),
                            };
                            log::debug!("Dialer: Found supported protocol: {}",
                                String::from_utf8_lossy(protocol.as_ref()));
                            *this.state = ParState::SendProtocol { io: framed, protocol };
                        }
                        _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
                    }
                }

                ParState::SendProtocol { io: mut framed, protocol } => {
                    if Pin::new(&mut framed).poll_ready(cx)?.is_pending() {
                        *this.state = ParState::SendProtocol { io: framed, protocol };
                        return Poll::Pending;
                    }

                    let p = Protocol::try_from(protocol.as_ref())?;
                    Pin::new(&mut framed).start_send(Message::Protocol(p.clone()))?;
                    log::debug!("Dialer: Expecting proposed protocol: {}", p);

                    // Finish without flushing or awaiting the remote's reply here.
                    let negotiated = Negotiated::expecting(framed.into_reader(), p, *this.version);
                    return Poll::Ready(Ok((protocol, negotiated)));
                }

                ParState::Done => panic!("ParState::poll called after completion")
            }
        }
    }
}