use std::sync::Arc;
use std::time::Duration;
use async_broadcast::{Receiver, Sender};
use async_std::stream::interval;
use async_std::sync::RwLock;
use crossbeam_skiplist::SkipSet;
use libp2p::{identity, noise, tcp, yamux, PeerId, Swarm};
use libp2p::core::transport::ListenerId;
use libp2p::futures::{select, StreamExt};
use crate::errors::GodraysError;
use libp2p::mdns::Event;
use libp2p::mdns::{async_io::Behaviour, Config};
use libp2p::mdns::async_io::AsyncIo;
use libp2p::multiaddr::Protocol;
use libp2p::swarm::SwarmEvent;
use log::{debug, info};
use crate::core::clusteraddr::{ClusterAddresses, DiscoveredAddr};
use crate::errors::*;
#[derive(Clone)]
pub struct GodraysMdns {
local_key: identity::Keypair,
local_peer_id: PeerId,
expected_listener_id: ListenerId,
pub discovered_nodes: Arc<SkipSet<String>>,
swarm: Arc<RwLock<Swarm<libp2p::mdns::Behaviour<AsyncIo>>>>
}
impl GodraysMdns {
pub fn new(local_key: identity::Keypair, tx: Sender<DiscoveredAddr>) -> Result<Self> {
let local_peer_id = PeerId::from(local_key.public());
let behavior = Behaviour::new(Config::default(), local_peer_id)
.map_err(|e| GodraysError::GeneralError(e.to_string()))?;
let mut swarm: Swarm<libp2p::mdns::Behaviour<AsyncIo>> = libp2p::SwarmBuilder::with_existing_identity(local_key.clone())
.with_async_std()
.with_tcp(
tcp::Config::default(),
noise::Config::new,
yamux::Config::default,
)
.map_err(|e| GodraysError::GeneralError(e.to_string()))?
.with_behaviour(|key| behavior)
.map_err(|e| GodraysError::GeneralError(e.to_string()))?
.with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(5)))
.build();
let expected_listener_id = swarm
.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap())
.unwrap();
Ok(GodraysMdns {
local_key,
local_peer_id,
expected_listener_id,
discovered_nodes: Arc::new(SkipSet::new()),
swarm: Arc::new(RwLock::new(swarm))
})
}
pub async fn launch(&self) {
let mut swarm = self.swarm.write().await;
loop {
Self::execute(&mut *swarm, self.discovered_nodes.clone()).await;
}
}
pub async fn execute(swarm: &mut Swarm<libp2p::mdns::Behaviour<AsyncIo>>, disctable: Arc<SkipSet<String>>) {
select! {
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(Event::Discovered(peers)) => {
for (peerid, addr) in peers.into_iter() {
let components = addr.iter().collect::<Vec<_>>();
match components[0] {
Protocol::Ip4(ip4) if !disctable.contains(&ip4.to_string()) => {
info!("Appending nodes onto NodeTable with IPv4: {}", ip4.to_string());
disctable.insert(ip4.to_string());
}
Protocol::Ip6(ip6) if !disctable.contains(&ip6.to_string()) => {
info!("Appending nodes onto NodeTable with IPv6: {}", ip6.to_string());
disctable.insert(ip6.to_string());
}
_ => {}
}
info!("{:?} -> {:?}", peerid, addr);
}
},
SwarmEvent::Behaviour(Event::Expired(expireds)) => {
for (peerid, addr) in expireds.into_iter() {
let components = addr.iter().collect::<Vec<_>>();
match components[0] {
Protocol::Ip4(ip4) => {
info!("Removing nodes from NodeTable with IPv4: {}", ip4.to_string());
disctable.remove(&ip4.to_string());
}
Protocol::Ip6(ip6) => {
info!("Removing nodes from NodeTable with IPv6: {}", ip6.to_string());
disctable.remove(&ip6.to_string());
}
_ => {}
}
info!("EXPIRED {:?} -> {:?}", peerid, addr);
}
},
other => {
debug!("Unhandled {:?}", other);
}
}
}
}
}