use crate::protocol::{FloodsubProtocol, FloodsubMessage, FloodsubRpc, FloodsubSubscription, FloodsubSubscriptionAction};
use crate::topic::Topic;
use crate::FloodsubConfig;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
    NetworkBehaviour,
    NetworkBehaviourAction,
    PollParameters,
    ProtocolsHandler,
    OneShotHandler,
    NotifyHandler,
    DialPeerCondition,
};
use rand;
use smallvec::SmallVec;
use std::{collections::VecDeque, iter};
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::task::{Context, Poll};
/// Network behaviour implementing a flooding publish/subscribe layer.
///
/// All work is expressed as queued `NetworkBehaviourAction`s that are handed
/// out one at a time from `poll`.
pub struct Floodsub {
    /// Actions waiting to be returned from `poll`, in FIFO order.
    events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
    /// Behaviour configuration; supplies the local peer id and the
    /// `subscribe_local_messages` flag read when publishing.
    config: FloodsubConfig,
    /// Peers added through `add_node_to_partial_view`. These peers are dialed,
    /// re-dialed after a disconnect, and sent our subscriptions on connect.
    target_peers: FnvHashSet<PeerId>,
    /// Currently connected peers, each with the topics that peer has told us
    /// it is subscribed to.
    ///
    /// Entries are inserted in `inject_connected` and removed in
    /// `inject_disconnected`.
    connected_peers: HashMap<PeerId, SmallVec<[Topic; 8]>>,
    /// Topics the local node is subscribed to. Used to decide whether a
    /// message is reported locally and whether `publish` may send.
    subscribed_topics: SmallVec<[Topic; 16]>,
    /// Filter recording messages that were already seen, so that a message
    /// arriving more than once is only processed and forwarded once.
    received: CuckooFilter<DefaultHasher>,
}
impl Floodsub {
    
    pub fn new(local_peer_id: PeerId) -> Self {
        Self::from_config(FloodsubConfig::new(local_peer_id))
    }
    
    pub fn from_config(config: FloodsubConfig) -> Self {
        Floodsub {
            events: VecDeque::new(),
            config,
            target_peers: FnvHashSet::default(),
            connected_peers: HashMap::new(),
            subscribed_topics: SmallVec::new(),
            received: CuckooFilter::new(),
        }
    }
    
    #[inline]
    pub fn add_node_to_partial_view(&mut self, peer_id: PeerId) {
        
        if self.connected_peers.contains_key(&peer_id) {
            for topic in self.subscribed_topics.iter().cloned() {
                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: peer_id.clone(),
                    handler: NotifyHandler::Any,
                    event: FloodsubRpc {
                        messages: Vec::new(),
                        subscriptions: vec![FloodsubSubscription {
                            topic,
                            action: FloodsubSubscriptionAction::Subscribe,
                        }],
                    },
                });
            }
        }
        if self.target_peers.insert(peer_id.clone()) {
            self.events.push_back(NetworkBehaviourAction::DialPeer {
                peer_id, condition: DialPeerCondition::Disconnected
            });
        }
    }
    
    #[inline]
    pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
        self.target_peers.remove(peer_id);
    }
    
    
    
    pub fn subscribe(&mut self, topic: Topic) -> bool {
        if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
            return false;
        }
        for peer in self.connected_peers.keys() {
            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id: peer.clone(),
                handler: NotifyHandler::Any,
                event: FloodsubRpc {
                    messages: Vec::new(),
                    subscriptions: vec![FloodsubSubscription {
                        topic: topic.clone(),
                        action: FloodsubSubscriptionAction::Subscribe,
                    }],
                },
            });
        }
        self.subscribed_topics.push(topic);
        true
    }
    
    
    
    
    
    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
        let pos = match self.subscribed_topics.iter().position(|t| *t == topic) {
            Some(pos) => pos,
            None => return false
        };
        self.subscribed_topics.remove(pos);
        for peer in self.connected_peers.keys() {
            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id: peer.clone(),
                handler: NotifyHandler::Any,
                event: FloodsubRpc {
                    messages: Vec::new(),
                    subscriptions: vec![FloodsubSubscription {
                        topic: topic.clone(),
                        action: FloodsubSubscriptionAction::Unsubscribe,
                    }],
                },
            });
        }
        true
    }
    
    pub fn publish(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
        self.publish_many(iter::once(topic), data)
    }
    
    pub fn publish_any(&mut self, topic: impl Into<Topic>, data: impl Into<Vec<u8>>) {
        self.publish_many_any(iter::once(topic), data)
    }
    
    
    
    
    pub fn publish_many(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
        self.publish_many_inner(topic, data, true)
    }
    
    pub fn publish_many_any(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>) {
        self.publish_many_inner(topic, data, false)
    }
    fn publish_many_inner(&mut self, topic: impl IntoIterator<Item = impl Into<Topic>>, data: impl Into<Vec<u8>>, check_self_subscriptions: bool) {
        let message = FloodsubMessage {
            source: self.config.local_peer_id.clone(),
            data: data.into(),
            
            
            
            sequence_number: rand::random::<[u8; 20]>().to_vec(),
            topics: topic.into_iter().map(Into::into).collect(),
        };
        let self_subscribed = self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u));
        if self_subscribed {
            self.received.add(&message);
            if self.config.subscribe_local_messages {
                self.events.push_back(
                    NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Message(message.clone())));
            }
        }
        
        
        if check_self_subscriptions && !self_subscribed {
            return
        }
        
        for (peer_id, sub_topic) in self.connected_peers.iter() {
            if !sub_topic.iter().any(|t| message.topics.iter().any(|u| t == u)) {
                continue;
            }
            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id: peer_id.clone(),
                handler: NotifyHandler::Any,
                event: FloodsubRpc {
                    subscriptions: Vec::new(),
                    messages: vec![message.clone()],
                }
            });
        }
    }
}
impl NetworkBehaviour for Floodsub {
    type ProtocolsHandler = OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>;
    type OutEvent = FloodsubEvent;
    fn new_handler(&mut self) -> Self::ProtocolsHandler {
        Default::default()
    }
    fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
        Vec::new()
    }
    fn inject_connected(&mut self, id: &PeerId) {
        
        if self.target_peers.contains(id) {
            for topic in self.subscribed_topics.iter().cloned() {
                self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                    peer_id: id.clone(),
                    handler: NotifyHandler::Any,
                    event: FloodsubRpc {
                        messages: Vec::new(),
                        subscriptions: vec![FloodsubSubscription {
                            topic,
                            action: FloodsubSubscriptionAction::Subscribe,
                        }],
                    },
                });
            }
        }
        self.connected_peers.insert(id.clone(), SmallVec::new());
    }
    fn inject_disconnected(&mut self, id: &PeerId) {
        let was_in = self.connected_peers.remove(id);
        debug_assert!(was_in.is_some());
        
        
        if self.target_peers.contains(id) {
            self.events.push_back(NetworkBehaviourAction::DialPeer {
                peer_id: id.clone(),
                condition: DialPeerCondition::Disconnected
            });
        }
    }
    fn inject_event(
        &mut self,
        propagation_source: PeerId,
        _connection: ConnectionId,
        event: InnerMessage,
    ) {
        
        let event = match event {
            InnerMessage::Rx(event) => event,
            InnerMessage::Sent => return,
        };
        
        for subscription in event.subscriptions {
            let remote_peer_topics = self.connected_peers
                .get_mut(&propagation_source)
                .expect("connected_peers is kept in sync with the peers we are connected to; we are guaranteed to only receive events from connected peers; QED");
            match subscription.action {
                FloodsubSubscriptionAction::Subscribe => {
                    if !remote_peer_topics.contains(&subscription.topic) {
                        remote_peer_topics.push(subscription.topic.clone());
                    }
                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Subscribed {
                        peer_id: propagation_source.clone(),
                        topic: subscription.topic,
                    }));
                }
                FloodsubSubscriptionAction::Unsubscribe => {
                    if let Some(pos) = remote_peer_topics.iter().position(|t| t == &subscription.topic ) {
                        remote_peer_topics.remove(pos);
                    }
                    self.events.push_back(NetworkBehaviourAction::GenerateEvent(FloodsubEvent::Unsubscribed {
                        peer_id: propagation_source.clone(),
                        topic: subscription.topic,
                    }));
                }
            }
        }
        
        let mut rpcs_to_dispatch: Vec<(PeerId, FloodsubRpc)> = Vec::new();
        for message in event.messages {
            
            
            if !self.received.test_and_add(&message) {
                continue;
            }
            
            if self.subscribed_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
                let event = FloodsubEvent::Message(message.clone());
                self.events.push_back(NetworkBehaviourAction::GenerateEvent(event));
            }
            
            for (peer_id, subscr_topics) in self.connected_peers.iter() {
                if peer_id == &propagation_source {
                    continue;
                }
                if !subscr_topics.iter().any(|t| message.topics.iter().any(|u| t == u)) {
                    continue;
                }
                if let Some(pos) = rpcs_to_dispatch.iter().position(|(p, _)| p == peer_id) {
                    rpcs_to_dispatch[pos].1.messages.push(message.clone());
                } else {
                    rpcs_to_dispatch.push((peer_id.clone(), FloodsubRpc {
                        subscriptions: Vec::new(),
                        messages: vec![message.clone()],
                    }));
                }
            }
        }
        for (peer_id, rpc) in rpcs_to_dispatch {
            self.events.push_back(NetworkBehaviourAction::NotifyHandler {
                peer_id,
                handler: NotifyHandler::Any,
                event: rpc,
            });
        }
    }
    fn poll(
        &mut self,
        _: &mut Context<'_>,
        _: &mut impl PollParameters,
    ) -> Poll<
        NetworkBehaviourAction<
            <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
            Self::OutEvent,
        >,
    > {
        if let Some(event) = self.events.pop_front() {
            return Poll::Ready(event);
        }
        Poll::Pending
    }
}
/// Event passed from the per-connection `OneShotHandler` to the `Floodsub`
/// behaviour.
pub enum InnerMessage {
    /// An RPC was received from the remote.
    Rx(FloodsubRpc),
    /// Produced from the unit value `()`; the behaviour ignores it.
    /// NOTE(review): presumably signals that an outbound RPC was sent — confirm
    /// against `OneShotHandler`.
    Sent,
}
impl From<FloodsubRpc> for InnerMessage {
    #[inline]
    fn from(rpc: FloodsubRpc) -> InnerMessage {
        InnerMessage::Rx(rpc)
    }
}
impl From<()> for InnerMessage {
    #[inline]
    fn from(_: ()) -> InnerMessage {
        InnerMessage::Sent
    }
}
/// Event that the `Floodsub` behaviour reports to its owner.
#[derive(Debug)]
pub enum FloodsubEvent {
    /// A message on a topic the local node is subscribed to. Emitted for
    /// messages received from a peer and, when `subscribe_local_messages` is
    /// set in the configuration, for messages published locally.
    Message(FloodsubMessage),

    /// A remote peer announced a subscription to a topic.
    Subscribed {
        /// The peer that subscribed.
        peer_id: PeerId,
        /// The topic it subscribed to.
        topic: Topic,
    },

    /// A remote peer announced that it unsubscribed from a topic.
    Unsubscribed {
        /// The peer that unsubscribed.
        peer_id: PeerId,
        /// The topic it unsubscribed from.
        topic: Topic,
    },
}