use crate::{
    PeerId,
    muxing::StreamMuxer,
    nodes::{
        handled_node::{HandledNode, HandledNodeError, NodeHandler},
        node::Substream
    }
};
use fnv::FnvHashMap;
use futures::{prelude::*, stream, sync::mpsc};
use smallvec::SmallVec;
use std::{
    collections::hash_map::{Entry, OccupiedEntry},
    error,
    fmt,
    mem
};
use tokio_executor;
use void::Void;
mod tests;
pub struct HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
    
    
    
    tasks: FnvHashMap<TaskId, mpsc::UnboundedSender<TInEvent>>,
    
    next_task_id: TaskId,
    
    
    to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
    
    events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
    
    events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>, TaskId)>,
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
    HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
{
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.debug_list()
            .entries(self.tasks.keys().cloned())
            .finish()
    }
}
#[derive(Debug)]
pub enum TaskClosedEvent<TReachErr, THandlerErr> {
    
    Reach(TReachErr),
    
    Node(HandledNodeError<THandlerErr>),
}
impl<TReachErr, THandlerErr> fmt::Display for TaskClosedEvent<TReachErr, THandlerErr>
where
    TReachErr: fmt::Display,
    THandlerErr: fmt::Display,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            TaskClosedEvent::Reach(err) => write!(f, "{}", err),
            TaskClosedEvent::Node(err) => write!(f, "{}", err),
        }
    }
}
impl<TReachErr, THandlerErr> error::Error for TaskClosedEvent<TReachErr, THandlerErr>
where
    TReachErr: error::Error + 'static,
    THandlerErr: error::Error + 'static
{
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            TaskClosedEvent::Reach(err) => Some(err),
            TaskClosedEvent::Node(err) => Some(err),
        }
    }
}
pub trait IntoNodeHandler<TPeerId = PeerId> {
    
    type Handler: NodeHandler;
    
    
    
    fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler;
}
impl<T, TPeerId> IntoNodeHandler<TPeerId> for T
where T: NodeHandler
{
    type Handler = Self;
    #[inline]
    fn into_handler(self, _: &TPeerId) -> Self {
        self
    }
}
#[derive(Debug)]
pub enum HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId = PeerId> {
    
    
    
    
    TaskClosed {
        
        id: TaskId,
        
        result: Result<(), TaskClosedEvent<TReachErr, THandlerErr>>,
        
        
        handler: Option<TIntoHandler>,
    },
    
    NodeReached {
        
        id: TaskId,
        
        peer_id: TPeerId,
    },
    
    NodeEvent {
        
        id: TaskId,
        
        event: TOutEvent,
    },
}
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct TaskId(usize);
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
    HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
{
    
    #[inline]
    pub fn new() -> Self {
        let (events_tx, events_rx) = mpsc::unbounded();
        HandledNodesTasks {
            tasks: Default::default(),
            next_task_id: TaskId(0),
            to_spawn: SmallVec::new(),
            events_tx,
            events_rx,
        }
    }
    
    
    
    
    pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: TIntoHandler) -> TaskId
    where
        TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
        TIntoHandler: IntoNodeHandler<TPeerId> + Send + 'static,
        TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
        TReachErr: error::Error + Send + 'static,
        THandlerErr: error::Error + Send + 'static,
        TInEvent: Send + 'static,
        TOutEvent: Send + 'static,
        <TIntoHandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,     
        TMuxer: StreamMuxer + Send + Sync + 'static,  
        TMuxer::OutboundSubstream: Send + 'static,  
        TPeerId: Send + 'static,
    {
        let task_id = self.next_task_id;
        self.next_task_id.0 += 1;
        let (tx, rx) = mpsc::unbounded();
        self.tasks.insert(task_id, tx);
        let task = Box::new(NodeTask {
            inner: NodeTaskInner::Future {
                future,
                handler,
                events_buffer: Vec::new(),
            },
            events_tx: self.events_tx.clone(),
            in_events_rx: rx.fuse(),
            id: task_id,
        });
        self.to_spawn.push(task);
        task_id
    }
    
    pub fn broadcast_event(&mut self, event: &TInEvent)
    where TInEvent: Clone,
    {
        for sender in self.tasks.values() {
            
            
            
            let _ = sender.unbounded_send(event.clone());
        }
    }
    
    
    
    #[inline]
    pub fn task(&mut self, id: TaskId) -> Option<Task<TInEvent>> {
        match self.tasks.entry(id) {
            Entry::Occupied(inner) => Some(Task { inner }),
            Entry::Vacant(_) => None,
        }
    }
    
    #[inline]
    pub fn tasks<'a>(&'a self) -> impl Iterator<Item = TaskId> + 'a {
        self.tasks.keys().cloned()
    }
    
    pub fn poll(&mut self) -> Async<HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>> {
        for to_spawn in self.to_spawn.drain() {
            tokio_executor::spawn(to_spawn);
        }
        loop {
            match self.events_rx.poll() {
                Ok(Async::Ready(Some((message, task_id)))) => {
                    
                    
                    
                    if !self.tasks.contains_key(&task_id) {
                        continue;
                    };
                    match message {
                        InToExtMessage::NodeEvent(event) => {
                            break Async::Ready(HandledNodesEvent::NodeEvent {
                                id: task_id,
                                event,
                            });
                        },
                        InToExtMessage::NodeReached(peer_id) => {
                            break Async::Ready(HandledNodesEvent::NodeReached {
                                id: task_id,
                                peer_id,
                            });
                        },
                        InToExtMessage::TaskClosed(result, handler) => {
                            let _ = self.tasks.remove(&task_id);
                            break Async::Ready(HandledNodesEvent::TaskClosed {
                                id: task_id, result, handler
                            });
                        },
                    }
                }
                Ok(Async::NotReady) => {
                    break Async::NotReady;
                }
                Ok(Async::Ready(None)) => {
                    unreachable!("The sender is in self as well, therefore the receiver never \
                                  closes.")
                },
                Err(()) => unreachable!("An unbounded receiver never errors"),
            }
        }
    }
}
pub struct Task<'a, TInEvent: 'a> {
    inner: OccupiedEntry<'a, TaskId, mpsc::UnboundedSender<TInEvent>>,
}
impl<'a, TInEvent> Task<'a, TInEvent> {
    
    
    #[inline]
    pub fn send_event(&mut self, event: TInEvent) {
        
        
        
        let _ = self.inner.get_mut().unbounded_send(event);
    }
    
    #[inline]
    pub fn id(&self) -> TaskId {
        *self.inner.key()
    }
    
    
    
    pub fn close(self) {
        self.inner.remove();
    }
}
impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.debug_tuple("Task")
            .field(&self.id())
            .finish()
    }
}
impl<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> Stream for
    HandledNodesTasks<TInEvent, TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>
{
    type Item = HandledNodesEvent<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId>;
    type Error = Void; 
    #[inline]
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        Ok(self.poll().map(Option::Some))
    }
}
#[derive(Debug)]
enum InToExtMessage<TOutEvent, TIntoHandler, TReachErr, THandlerErr, TPeerId> {
    
    NodeReached(TPeerId),
    
    TaskClosed(Result<(), TaskClosedEvent<TReachErr, THandlerErr>>, Option<TIntoHandler>),
    
    NodeEvent(TOutEvent),
}
struct NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
where
    TMuxer: StreamMuxer,
    TIntoHandler: IntoNodeHandler<TPeerId>,
    TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
    
    events_tx: mpsc::UnboundedSender<(InToExtMessage<TOutEvent, TIntoHandler, TReachErr, <TIntoHandler::Handler as NodeHandler>::Error, TPeerId>, TaskId)>,
    
    in_events_rx: stream::Fuse<mpsc::UnboundedReceiver<TInEvent>>,
    
    inner: NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>,
    
    id: TaskId,
}
enum NodeTaskInner<TFut, TMuxer, TIntoHandler, TInEvent, TPeerId>
where
    TMuxer: StreamMuxer,
    TIntoHandler: IntoNodeHandler<TPeerId>,
    TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>>,
{
    
    Future {
        
        future: TFut,
        
        handler: TIntoHandler,
        
        
        
        events_buffer: Vec<TInEvent>,
    },
    
    Node(HandledNode<TMuxer, TIntoHandler::Handler>),
    
    Poisoned,
}
impl<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId> Future for
    NodeTask<TFut, TMuxer, TIntoHandler, TInEvent, TOutEvent, TReachErr, TPeerId>
where
    TMuxer: StreamMuxer,
    TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr>,
    TIntoHandler: IntoNodeHandler<TPeerId>,
    TIntoHandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent>,
{
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<(), ()> {
        loop {
            match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) {
                
                NodeTaskInner::Future { mut future, handler, mut events_buffer } => {
                    
                    loop {
                        match self.in_events_rx.poll() {
                            Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
                            Ok(Async::Ready(Some(event))) => events_buffer.push(event),
                            Ok(Async::NotReady) => break,
                            Err(_) => unreachable!("An UnboundedReceiver never errors"),
                        }
                    }
                    
                    match future.poll() {
                        Ok(Async::Ready((peer_id, muxer))) => {
                            let mut node = HandledNode::new(muxer, handler.into_handler(&peer_id));
                            let event = InToExtMessage::NodeReached(peer_id);
                            for event in events_buffer {
                                node.inject_event(event);
                            }
                            if self.events_tx.unbounded_send((event, self.id)).is_err() {
                                node.shutdown();
                            }
                            self.inner = NodeTaskInner::Node(node);
                        }
                        Ok(Async::NotReady) => {
                            self.inner = NodeTaskInner::Future { future, handler, events_buffer };
                            return Ok(Async::NotReady);
                        },
                        Err(err) => {
                            
                            let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Reach(err)), Some(handler));
                            let _ = self.events_tx.unbounded_send((event, self.id));
                            return Ok(Async::Ready(()));
                        }
                    }
                },
                
                NodeTaskInner::Node(mut node) => {
                    
                    if !self.in_events_rx.is_done() {
                        loop {
                            match self.in_events_rx.poll() {
                                Ok(Async::NotReady) => break,
                                Ok(Async::Ready(Some(event))) => {
                                    node.inject_event(event)
                                },
                                Ok(Async::Ready(None)) => {
                                    
                                    node.shutdown();
                                    break;
                                }
                                Err(()) => unreachable!("An unbounded receiver never errors"),
                            }
                        }
                    }
                    
                    loop {
                        match node.poll() {
                            Ok(Async::NotReady) => {
                                self.inner = NodeTaskInner::Node(node);
                                return Ok(Async::NotReady);
                            },
                            Ok(Async::Ready(Some(event))) => {
                                let event = InToExtMessage::NodeEvent(event);
                                if self.events_tx.unbounded_send((event, self.id)).is_err() {
                                    node.shutdown();
                                }
                            }
                            Ok(Async::Ready(None)) => {
                                let event = InToExtMessage::TaskClosed(Ok(()), None);
                                let _ = self.events_tx.unbounded_send((event, self.id));
                                return Ok(Async::Ready(())); 
                            }
                            Err(err) => {
                                let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Node(err)), None);
                                let _ = self.events_tx.unbounded_send((event, self.id));
                                return Ok(Async::Ready(())); 
                            }
                        }
                    }
                },
                
                
                NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier")
            }
        }
    }
}