use async_broadcast::TrySendError;
use futures::channel::mpsc::{self as channel};
use futures::stream::{FusedStream, Stream};
use libp2p::gossipsub::PublishError;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tracing::debug;
use libp2p::core::{Endpoint, Multiaddr};
use libp2p::identity::PeerId;
use libp2p::gossipsub::{
    Behaviour as Gossipsub, Event as GossipsubEvent, IdentTopic as Topic,
    Message as GossipsubMessage, MessageId, TopicHash,
};
use libp2p::swarm::{
    ConnectionDenied, ConnectionId, NetworkBehaviour, THandler, THandlerInEvent, ToSwarm,
};
pub struct GossipsubStream {
    streams: HashMap<TopicHash, async_broadcast::Sender<GossipsubMessage>>,
    active_streams: HashMap<TopicHash, Arc<AtomicUsize>>,
    gossipsub: Gossipsub,
    unsubscriptions: (
        channel::UnboundedSender<TopicHash>,
        channel::UnboundedReceiver<TopicHash>,
    ),
}
impl core::ops::Deref for GossipsubStream {
    type Target = Gossipsub;
    fn deref(&self) -> &Self::Target {
        &self.gossipsub
    }
}
impl core::ops::DerefMut for GossipsubStream {
    fn deref_mut(&mut self) -> &mut Gossipsub {
        &mut self.gossipsub
    }
}
pub struct SubscriptionStream {
    on_drop: Option<channel::UnboundedSender<TopicHash>>,
    topic: Option<TopicHash>,
    inner: async_broadcast::Receiver<GossipsubMessage>,
    counter: Arc<AtomicUsize>,
}
impl Clone for SubscriptionStream {
    fn clone(&self) -> Self {
        self.counter.fetch_add(1, Ordering::SeqCst);
        Self {
            on_drop: self.on_drop.clone(),
            topic: self.topic.clone(),
            inner: self.inner.clone(),
            counter: self.counter.clone(),
        }
    }
}
impl Drop for SubscriptionStream {
    fn drop(&mut self) {
        if self.counter.load(Ordering::SeqCst) == 1 {
            if let Some(sender) = self.on_drop.take() {
                if let Some(topic) = self.topic.take() {
                    let _ = sender.unbounded_send(topic);
                }
            }
        } else {
            self.counter.fetch_sub(1, Ordering::SeqCst);
        }
    }
}
impl fmt::Debug for SubscriptionStream {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if let Some(topic) = self.topic.as_ref() {
            write!(
                fmt,
                "SubscriptionStream {{ topic: {:?}, is_terminated: {} }}",
                topic,
                self.is_terminated()
            )
        } else {
            write!(
                fmt,
                "SubscriptionStream {{ is_terminated: {} }}",
                self.is_terminated()
            )
        }
    }
}
impl Stream for SubscriptionStream {
    type Item = GossipsubMessage;
    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
        use futures::stream::StreamExt;
        let inner = &mut self.as_mut().inner;
        match inner.poll_next_unpin(ctx) {
            Poll::Ready(None) => {
                self.on_drop.take();
                Poll::Ready(None)
            }
            other => other,
        }
    }
}
impl FusedStream for SubscriptionStream {
    fn is_terminated(&self) -> bool {
        self.on_drop.is_none()
    }
}
impl From<Gossipsub> for GossipsubStream {
    fn from(gossipsub: Gossipsub) -> Self {
        let (tx, rx) = channel::unbounded();
        GossipsubStream {
            streams: HashMap::new(),
            gossipsub,
            unsubscriptions: (tx, rx),
            active_streams: Default::default(),
        }
    }
}
impl GossipsubStream {
    pub fn subscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<SubscriptionStream> {
        use std::collections::hash_map::Entry;
        let topic = Topic::new(topic);
        match self.streams.entry(topic.hash()) {
            Entry::Vacant(ve) => {
                match self.gossipsub.subscribe(&topic) {
                    Ok(true) => {
                        let counter = Arc::new(AtomicUsize::new(1));
                        self.active_streams
                            .insert(topic.hash(), Arc::clone(&counter));
                        let (tx, rx) = async_broadcast::broadcast(15000);
                        let key = ve.key().clone();
                        ve.insert(tx);
                        Ok(SubscriptionStream {
                            on_drop: Some(self.unsubscriptions.0.clone()),
                            topic: Some(key),
                            inner: rx,
                            counter,
                        })
                    }
                    Ok(false) => anyhow::bail!("Already subscribed to topic; shouldnt reach this"),
                    Err(e) => {
                        debug!("{}", e); Err(anyhow::Error::from(e))
                    }
                }
            }
            Entry::Occupied(entry) => {
                let rx = entry.get().clone().new_receiver();
                let key = entry.key().clone();
                let counter = self
                    .active_streams
                    .get(&key)
                    .cloned()
                    .ok_or(anyhow::anyhow!("No active stream"))?;
                counter.fetch_add(1, Ordering::SeqCst);
                Ok(SubscriptionStream {
                    on_drop: Some(self.unsubscriptions.0.clone()),
                    topic: Some(key),
                    inner: rx,
                    counter,
                })
            }
        }
    }
    pub fn unsubscribe(&mut self, topic: impl Into<String>) -> anyhow::Result<bool> {
        let topic = Topic::new(topic.into());
        if let Some(sender) = self.streams.remove(&topic.hash()) {
            sender.close();
            self.active_streams.remove(&topic.hash());
            Ok(self.gossipsub.unsubscribe(&topic)?)
        } else {
            anyhow::bail!("Unable to unsubscribe from topic.")
        }
    }
    pub fn publish(
        &mut self,
        topic: impl Into<String>,
        data: impl Into<Vec<u8>>,
    ) -> Result<MessageId, PublishError> {
        self.gossipsub.publish(Topic::new(topic), data)
    }
    pub fn known_peers(&self) -> Vec<PeerId> {
        self.all_peers().map(|(peer, _)| *peer).collect()
    }
    pub fn subscribed_peers(&self, topic: &str) -> Vec<PeerId> {
        let topic = Topic::new(topic);
        self.all_peers()
            .filter(|(_, list)| list.contains(&&topic.hash()))
            .map(|(peer_id, _)| *peer_id)
            .collect()
    }
    pub fn subscribed_topics(&self) -> Vec<String> {
        self.streams.keys().map(|t| t.to_string()).collect()
    }
}
impl NetworkBehaviour for GossipsubStream {
    type ConnectionHandler = <Gossipsub as NetworkBehaviour>::ConnectionHandler;
    type ToSwarm = GossipsubEvent;
    fn handle_pending_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        maybe_peer: Option<PeerId>,
        addresses: &[Multiaddr],
        effective_role: Endpoint,
    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
        self.gossipsub.handle_pending_outbound_connection(
            connection_id,
            maybe_peer,
            addresses,
            effective_role,
        )
    }
    fn on_swarm_event(&mut self, event: libp2p::swarm::FromSwarm) {
        self.gossipsub.on_swarm_event(event)
    }
    fn on_connection_handler_event(
        &mut self,
        peer_id: PeerId,
        connection_id: libp2p::swarm::ConnectionId,
        event: libp2p::swarm::THandlerOutEvent<Self>,
    ) {
        self.gossipsub
            .on_connection_handler_event(peer_id, connection_id, event)
    }
    fn handle_established_inbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        local_addr: &Multiaddr,
        remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.gossipsub.handle_established_inbound_connection(
            connection_id,
            peer,
            local_addr,
            remote_addr,
        )
    }
    fn handle_established_outbound_connection(
        &mut self,
        connection_id: ConnectionId,
        peer: PeerId,
        addr: &Multiaddr,
        role_override: Endpoint,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        self.gossipsub.handle_established_outbound_connection(
            connection_id,
            peer,
            addr,
            role_override,
        )
    }
    fn poll(
        &mut self,
        ctx: &mut Context,
    ) -> Poll<ToSwarm<libp2p::gossipsub::Event, THandlerInEvent<Self>>> {
        use futures::stream::StreamExt;
        use std::collections::hash_map::Entry;
        loop {
            match self.unsubscriptions.1.poll_next_unpin(ctx) {
                Poll::Ready(Some(dropped)) => {
                    if let Some(sender) = self.streams.remove(&dropped) {
                        sender.close();
                        debug!("unsubscribing via drop from {:?}", dropped);
                        assert!(
                            self.gossipsub
                                .unsubscribe(&Topic::new(dropped.to_string()))
                                .unwrap_or_default(),
                            "Failed to unsubscribe a dropped subscription"
                        );
                        self.active_streams.remove(&dropped);
                    }
                }
                Poll::Ready(None) => unreachable!("we own the sender"),
                Poll::Pending => break,
            }
        }
        loop {
            match futures::ready!(self.gossipsub.poll(ctx)) {
                ToSwarm::GenerateEvent(GossipsubEvent::Message { message, .. }) => {
                    let topic = message.topic.clone();
                    if let Entry::Occupied(oe) = self.streams.entry(topic) {
                        if let Err(TrySendError::Closed(_)) =
                            oe.get().try_broadcast(message.clone())
                        {
                            let (topic, _) = oe.remove_entry();
                            debug!("unsubscribing via SendError from {:?}", &topic);
                            assert!(
                                self.gossipsub
                                    .unsubscribe(&Topic::new(topic.to_string()))
                                    .unwrap_or_default(),
                                "Failed to unsubscribe following SendError"
                            );
                            self.active_streams.remove(&topic);
                        }
                    }
                    continue;
                }
                ToSwarm::GenerateEvent(GossipsubEvent::Subscribed { peer_id, topic }) => {
                    return Poll::Ready(ToSwarm::GenerateEvent(GossipsubEvent::Subscribed {
                        peer_id,
                        topic,
                    }));
                }
                ToSwarm::GenerateEvent(GossipsubEvent::Unsubscribed { peer_id, topic }) => {
                    return Poll::Ready(ToSwarm::GenerateEvent(GossipsubEvent::Unsubscribed {
                        peer_id,
                        topic,
                    }));
                }
                action => {
                    return Poll::Ready(action);
                }
            }
        }
    }
}