godrays 0.1.0

Peer Discovery library for Cluster Formation
Documentation
use std::sync::Arc;
use std::time::Duration;
use async_broadcast::{Receiver, Sender};
use async_std::stream::interval;
use async_std::sync::RwLock;
use crossbeam_skiplist::SkipSet;
use libp2p::{identity, noise, tcp, yamux, PeerId, Swarm};
use libp2p::core::transport::ListenerId;
use libp2p::futures::{select, StreamExt};
use crate::errors::GodraysError;
use libp2p::mdns::Event;
use libp2p::mdns::{async_io::Behaviour, Config};
use libp2p::mdns::async_io::AsyncIo;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::SwarmEvent;
use log::{debug, info};
use crate::core::clusteraddr::{ClusterAddresses, DiscoveredAddr};
use crate::errors::*;

/// mDNS-based peer discovery service.
///
/// Cloning is shallow for the shared parts: `discovered_nodes` and `swarm`
/// live behind `Arc`, so every clone refers to the same table and swarm.
#[derive(Clone)]
pub struct GodraysMdns {
    /// Identity keypair handed to [`GodraysMdns::new`]; also used to build the swarm.
    local_key: identity::Keypair,
    /// Peer id derived from the public half of `local_key`.
    local_peer_id: PeerId,
    /// Id of the listener opened on `/ip4/0.0.0.0/tcp/0` during construction.
    expected_listener_id: ListenerId,
    /// Textual IP addresses of the peers currently known through mDNS.
    pub discovered_nodes: Arc<SkipSet<String>>,
    /// The libp2p swarm running the mDNS behaviour, guarded by an async lock.
    swarm: Arc<RwLock<Swarm<libp2p::mdns::Behaviour<AsyncIo>>>>,
}

impl GodraysMdns {
    pub fn new(local_key: identity::Keypair, tx: Sender<DiscoveredAddr>) -> Result<Self> {
        let local_peer_id = PeerId::from(local_key.public());
        let behavior = Behaviour::new(Config::default(), local_peer_id)
            .map_err(|e| GodraysError::GeneralError(e.to_string()))?;

        let mut swarm: Swarm<libp2p::mdns::Behaviour<AsyncIo>> = libp2p::SwarmBuilder::with_existing_identity(local_key.clone())
            .with_async_std()
            .with_tcp(
                tcp::Config::default(),
                noise::Config::new,
                yamux::Config::default,
            )
            .map_err(|e| GodraysError::GeneralError(e.to_string()))?
            .with_behaviour(|key| behavior)
            .map_err(|e| GodraysError::GeneralError(e.to_string()))?
            .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
            .build();

        // Manually listen on all interfaces because mDNS only works for non-loopback addresses.
        let expected_listener_id = swarm
            .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
            .unwrap();

        Ok(GodraysMdns {
            local_key,
            local_peer_id,
            expected_listener_id,
            discovered_nodes: Arc::new(SkipSet::new()),
            swarm: Arc::new(RwLock::new(swarm))
        })
    }

    ///
    /// Launch MDNS service.
    pub async fn launch(&self) {
        let mut swarm = self.swarm.write().await;
        loop {
            Self::execute(&mut *swarm, self.discovered_nodes.clone()).await;
        }
    }

    pub async fn execute(swarm: &mut Swarm<libp2p::mdns::Behaviour<AsyncIo>>, disctable: Arc<SkipSet<String>>) {
        select! {
            event = swarm.select_next_some() => match event {
                SwarmEvent::Behaviour(Event::Discovered(peers)) => {
                    for (peerid, addr) in peers.into_iter() {
                        let components = addr.iter().collect::<Vec<_>>();
                        match components[0] {
                            Protocol::Ip4(ip4) if !disctable.contains(&ip4.to_string()) => {
                                info!("Appending nodes onto NodeTable with IPv4: {}", ip4.to_string());
                                disctable.insert(ip4.to_string());
                            }
                            Protocol::Ip6(ip6) if !disctable.contains(&ip6.to_string()) => {
                                info!("Appending nodes onto NodeTable with IPv6: {}", ip6.to_string());
                                disctable.insert(ip6.to_string());
                            }
                            _ => {}
                        }
                        info!("{:?} -> {:?}", peerid, addr);
                    }
                },
                SwarmEvent::Behaviour(Event::Expired(expireds)) => {
                    for (peerid, addr) in expireds.into_iter() {
                        let components = addr.iter().collect::<Vec<_>>();
                        match components[0] {
                            Protocol::Ip4(ip4) => {
                                info!("Removing nodes from NodeTable with IPv4: {}", ip4.to_string());
                                disctable.remove(&ip4.to_string());
                            }
                            Protocol::Ip6(ip6) => {
                                info!("Removing nodes from NodeTable with IPv6: {}", ip6.to_string());
                                disctable.remove(&ip6.to_string());
                            }
                            _ => {}
                        }
                        info!("EXPIRED {:?} -> {:?}", peerid, addr);
                    }
                },
                other => {
                    debug!("Unhandled {:?}", other);
                }
            }
        }
    }
}