use super::{
error::{Error, Result},
record_store::DiskBackedRecordStore,
SwarmDriver,
};
use crate::{
multiaddr_is_global, multiaddr_strip_p2p, sort_peers_by_address, CLOSE_GROUP_SIZE,
IDENTIFY_AGENT_STR,
};
use itertools::Itertools;
#[cfg(feature = "local-discovery")]
use libp2p::mdns;
use libp2p::{
autonat::{self, NatStatus},
kad::{
GetRecordOk, InboundRequest, Kademlia, KademliaEvent, QueryResult, Record, RecordKey,
K_VALUE,
},
multiaddr::Protocol,
request_response::{self, ResponseChannel as PeerResponseChannel},
swarm::{behaviour::toggle::Toggle, NetworkBehaviour, SwarmEvent},
Multiaddr, PeerId,
};
use sn_protocol::{
messages::{Request, Response},
NetworkAddress,
};
use std::collections::HashSet;
use tokio::sync::oneshot;
use tracing::{info, warn};
/// The composite libp2p behaviour driven by the swarm.
///
/// `#[derive(NetworkBehaviour)]` combines the sub-behaviours below, and the
/// `to_swarm` attribute makes every sub-behaviour event surface as a
/// [`NodeEvent`] (via the `From` impls in this file).
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "NodeEvent")]
pub(super) struct NodeBehaviour {
/// CBOR-encoded request/response protocol carrying `Request` / `Response` messages.
pub(super) request_response: request_response::cbor::Behaviour<Request, Response>,
/// Kademlia DHT whose records are kept in a `DiskBackedRecordStore`.
pub(super) kademlia: Kademlia<DiskBackedRecordStore>,
/// mDNS discovery; only compiled in with the `local-discovery` feature.
#[cfg(feature = "local-discovery")]
pub(super) mdns: mdns::tokio::Behaviour,
/// Identify protocol, used to learn a peer's agent version, protocols and listen addresses.
pub(super) identify: libp2p::identify::Behaviour,
/// AutoNAT behaviour wrapped in a `Toggle`, so it may be absent at runtime.
pub(super) autonat: Toggle<autonat::Behaviour>,
}
/// Event type emitted by [`NodeBehaviour`]; one variant per sub-behaviour.
#[derive(Debug)]
pub(super) enum NodeEvent {
/// An event from the request/response behaviour.
MsgReceived(request_response::Event<Request, Response>),
/// An event from the Kademlia behaviour.
Kademlia(KademliaEvent),
/// An event from mDNS (boxed); only with the `local-discovery` feature.
#[cfg(feature = "local-discovery")]
Mdns(Box<mdns::Event>),
/// An event from the identify behaviour (boxed — presumably to keep the enum small).
Identify(Box<libp2p::identify::Event>),
/// An event from the AutoNAT behaviour.
Autonat(autonat::Event),
}
/// Wraps a request/response event so the derived behaviour can emit it as a `NodeEvent`.
impl From<request_response::Event<Request, Response>> for NodeEvent {
    fn from(event: request_response::Event<Request, Response>) -> Self {
        Self::MsgReceived(event)
    }
}
impl From<KademliaEvent> for NodeEvent {
fn from(event: KademliaEvent) -> Self {
NodeEvent::Kademlia(event)
}
}
/// Boxes an mDNS event and wraps it as a `NodeEvent` (only with `local-discovery`).
#[cfg(feature = "local-discovery")]
impl From<mdns::Event> for NodeEvent {
    fn from(event: mdns::Event) -> Self {
        let boxed = Box::new(event);
        Self::Mdns(boxed)
    }
}
/// Boxes an identify event and wraps it as a `NodeEvent`.
impl From<libp2p::identify::Event> for NodeEvent {
    fn from(event: libp2p::identify::Event) -> Self {
        let boxed = Box::new(event);
        Self::Identify(boxed)
    }
}
/// Wraps an AutoNAT event so the derived behaviour can emit it as a `NodeEvent`.
impl From<autonat::Event> for NodeEvent {
    fn from(event: autonat::Event) -> Self {
        Self::Autonat(event)
    }
}
/// Handle used to answer a received request; carried as the `channel` of
/// `NetworkEvent::RequestReceived`.
#[derive(Debug)]
pub enum MsgResponder {
/// Optional one-shot sender for the `Result<Response>`.
/// Presumably used when the request originated from this node itself — confirm against callers.
FromSelf(Option<oneshot::Sender<Result<Response>>>),
/// libp2p request/response channel on which the `Response` is sent back to a peer.
FromPeer(PeerResponseChannel<Response>),
}
/// Events that the swarm driver forwards to the upper layers via `send_event`.
// The variants differ a lot in size; the lint is silenced rather than boxing them.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum NetworkEvent {
/// A `Request` was received, together with the responder used to answer it.
RequestReceived {
/// The received request.
req: Request,
/// Where the response has to be delivered.
channel: MsgResponder,
},
/// A `Response` was received.
ResponseReceived {
/// The received response.
res: Response,
},
/// A new peer was inserted into the Kademlia routing table.
PeerAdded(PeerId),
/// A peer was dropped from the Kademlia routing table
/// (e.g. after an outgoing connection to it failed).
PeerRemoved(PeerId),
/// Our close group changed; carries the peers that are new to the group.
CloseGroupUpdated(Vec<PeerId>),
/// Record keys paired with an optional peer.
/// NOTE(review): not emitted in this part of the file; presumably keys to replicate
/// and the peer holding them — confirm against the emitter.
KeysForReplication(Vec<(RecordKey, Option<PeerId>)>),
/// The local node started listening on this address (the address includes our `/p2p/` peer id).
NewListenAddr(Multiaddr),
/// AutoNAT reported a new NAT status.
NatStatusChanged(NatStatus),
/// A Kademlia `Record`.
/// NOTE(review): not emitted in this part of the file; presumably a record awaiting
/// validation — confirm against the emitter.
UnverifiedRecord(Record),
}
impl SwarmDriver {
pub(super) fn handle_swarm_events<EventError: std::error::Error>(
&mut self,
event: SwarmEvent<NodeEvent, EventError>,
) -> Result<()> {
match event {
SwarmEvent::Behaviour(NodeEvent::MsgReceived(event)) => {
if let Err(e) = self.handle_msg(event) {
warn!("MsgReceivedError: {e:?}");
}
}
SwarmEvent::Behaviour(NodeEvent::Kademlia(kad_event)) => {
self.handle_kad_event(kad_event)?;
}
SwarmEvent::Behaviour(NodeEvent::Identify(iden)) => {
match *iden {
libp2p::identify::Event::Received { peer_id, info } => {
debug!(%peer_id, ?info, "identify: received info");
if (self.local || self.dialed_peers.contains(&peer_id))
&& info.agent_version.starts_with(IDENTIFY_AGENT_STR)
{
let addrs = match self.local {
true => info.listen_addrs,
false => info
.listen_addrs
.into_iter()
.filter(multiaddr_is_global)
.collect(),
};
let addrs: Vec<_> = addrs
.into_iter()
.map(|addr| multiaddr_strip_p2p(&addr))
.unique()
.collect();
debug!(%peer_id, ?addrs, "identify: adding addresses to routing table");
for multiaddr in addrs.clone() {
let _routing_update = self
.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, multiaddr);
}
if info.protocols.iter().any(|protocol| {
protocol.to_string().starts_with("/libp2p/autonat/")
}) {
let a = &mut self.swarm.behaviour_mut().autonat;
if let Some(autonat) = a.as_mut() {
for multiaddr in addrs {
autonat.add_server(peer_id, Some(multiaddr));
}
}
}
}
}
libp2p::identify::Event::Sent { .. } => trace!("identify: {iden:?}"),
libp2p::identify::Event::Pushed { .. } => trace!("identify: {iden:?}"),
libp2p::identify::Event::Error { .. } => trace!("identify: {iden:?}"),
}
}
#[cfg(feature = "local-discovery")]
SwarmEvent::Behaviour(NodeEvent::Mdns(mdns_event)) => match *mdns_event {
mdns::Event::Discovered(list) => {
if self.local {
for (peer_id, addr) in list {
let addr = addr.with(Protocol::P2p(peer_id));
info!(%addr, "mDNS node discovered and dialing");
if let Err(err) = self.dial(addr.clone()) {
warn!(%addr, "mDNS node dial error: {err:?}");
}
}
}
}
mdns::Event::Expired(peer) => {
debug!("mdns peer {peer:?} expired");
}
},
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
let address = address.with(Protocol::P2p(local_peer_id));
if !self.is_client {
if self.local {
self.swarm.add_external_address(address.clone());
} else {
if multiaddr_is_global(&address) {
self.swarm.add_external_address(address.clone());
}
}
}
self.send_event(NetworkEvent::NewListenAddr(address.clone()));
info!("Local node is listening on {address:?}");
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
..
} => {
debug!(%peer_id, num_established, "ConnectionEstablished: {}", endpoint_str(&endpoint));
if endpoint.is_dialer() {
self.dialed_peers.push(peer_id);
}
}
SwarmEvent::ConnectionClosed {
peer_id,
endpoint,
cause,
num_established,
connection_id,
} => {
debug!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
}
SwarmEvent::OutgoingConnectionError {
peer_id: Some(failed_peer_id),
error,
connection_id,
} => {
error!("OutgoingConnectionError to {failed_peer_id:?} on {connection_id:?} - {error:?}");
if let Some(dead_peer) = self
.swarm
.behaviour_mut()
.kademlia
.remove_peer(&failed_peer_id)
{
self.send_event(NetworkEvent::PeerRemoved(*dead_peer.node.key.preimage()));
self.log_kbuckets(&failed_peer_id);
if let Some(new_members) = self.check_for_change_in_our_close_group() {
self.send_event(NetworkEvent::CloseGroupUpdated(new_members));
}
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing {
peer_id,
connection_id,
} => trace!("Dialing {peer_id:?} on {connection_id:?}"),
SwarmEvent::Behaviour(NodeEvent::Autonat(event)) => match event {
autonat::Event::InboundProbe(e) => debug!("AutoNAT inbound probe: {e:?}"),
autonat::Event::OutboundProbe(e) => debug!("AutoNAT outbound probe: {e:?}"),
autonat::Event::StatusChanged { old, new } => {
info!("AutoNAT status changed: {old:?} -> {new:?}");
self.send_event(NetworkEvent::NatStatusChanged(new.clone()));
match new {
NatStatus::Public(_addr) => {
}
NatStatus::Private => {
}
NatStatus::Unknown => {}
};
}
},
other => debug!("SwarmEvent has been ignored: {other:?}"),
}
Ok(())
}
fn handle_kad_event(&mut self, kad_event: KademliaEvent) -> Result<()> {
match kad_event {
ref event @ KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetClosestPeers(Ok(ref closest_peers)),
ref stats,
ref step,
} => {
trace!(
"Query task {id:?} returned with peers {closest_peers:?}, {stats:?} - {step:?}"
);
let (sender, mut current_closest) =
self.pending_get_closest_peers.remove(&id).ok_or_else(|| {
trace!("Can't locate query task {id:?}, shall be completed already.");
Error::ReceivedKademliaEventDropped(event.clone())
})?;
let new_peers: HashSet<PeerId> = closest_peers.peers.clone().into_iter().collect();
current_closest.extend(new_peers);
if current_closest.len() >= usize::from(K_VALUE) || step.last {
sender
.send(current_closest)
.map_err(|_| Error::InternalMsgChannelDropped)?;
} else {
let _ = self
.pending_get_closest_peers
.insert(id, (sender, current_closest));
}
}
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(peer_record))),
stats,
step,
} => {
if let Some(sender) = self.pending_query.remove(&id) {
trace!(
"Query task {id:?} returned with record {:?} from peer {:?}, {stats:?} - {step:?}",
peer_record.record.key,
peer_record.peer
);
sender
.send(Ok(peer_record.record))
.map_err(|_| Error::InternalMsgChannelDropped)?;
}
}
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetRecord(Err(err)),
stats,
step,
} => {
warn!("Query task {id:?} failed to get record with error: {err:?}, {stats:?} - {step:?}");
if step.last {
if let Some(sender) = self.pending_query.remove(&id) {
sender
.send(Err(Error::RecordNotFound))
.map_err(|_| Error::InternalMsgChannelDropped)?;
}
}
}
KademliaEvent::RoutingUpdated {
peer,
is_new_peer,
old_peer,
..
} => {
if is_new_peer {
self.log_kbuckets(&peer);
self.send_event(NetworkEvent::PeerAdded(peer));
let connected_peers = self.swarm.connected_peers().count();
info!("Connected peers: {connected_peers}");
}
if old_peer.is_some() {
info!("Evicted old peer on new peer join: {old_peer:?}");
self.send_event(NetworkEvent::PeerRemoved(peer));
self.log_kbuckets(&peer);
}
if let Some(new_members) = self.check_for_change_in_our_close_group() {
self.send_event(NetworkEvent::CloseGroupUpdated(new_members));
}
}
KademliaEvent::InboundRequest {
request: InboundRequest::PutRecord { .. },
} => {
}
KademliaEvent::InboundRequest {
request:
InboundRequest::GetRecord {
num_closer_peers,
present_locally,
},
} => {
if !present_locally && num_closer_peers < CLOSE_GROUP_SIZE {
trace!("InboundRequest::GetRecord doesn't have local record, with {num_closer_peers:?} closer_peers");
}
}
other => {
trace!("KademliaEvent ignored: {other:?}");
}
}
Ok(())
}
/// Returns every peer currently held in the local Kademlia routing table,
/// followed by our own peer id as the last element.
pub(super) fn get_all_local_peers(&mut self) -> Vec<PeerId> {
    let mut known_peers: Vec<PeerId> = Vec::new();
    for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
        known_peers.extend(
            kbucket
                .iter()
                .map(|entry| entry.node.key.clone().into_preimage()),
        );
    }
    // We are always part of the returned list ourselves.
    known_peers.push(self.self_peer_id);
    known_peers
}
/// Recomputes the peers closest to our own address and compares them with the
/// stored `close_group`.
///
/// Returns the peers that are part of the freshly computed group but were not in
/// the stored one, and stores the new group in that case. Returns `None` when no
/// new member showed up (the stored group is left untouched) or when sorting the
/// peers failed.
fn check_for_change_in_our_close_group(&mut self) -> Option<Vec<PeerId>> {
    let candidates = self.get_all_local_peers();
    let our_address = NetworkAddress::from_peer(self.self_peer_id);
    // NOTE(review): the candidates already contain ourselves, so this keeps
    // `CLOSE_GROUP_SIZE + 1` entries including us — confirm the `+ 1` is intended.
    let closest = sort_peers_by_address(candidates, &our_address, CLOSE_GROUP_SIZE + 1).ok()?;

    let previous = self.close_group.iter().cloned().collect::<HashSet<_>>();
    let new_members = closest
        .iter()
        .filter(|p| !previous.contains(p))
        .cloned()
        .collect::<Vec<_>>();

    if new_members.is_empty() {
        return None;
    }
    debug!("The close group has been updated. The new members are {new_members:?}");
    self.close_group = closest;
    Some(new_members)
}
fn log_kbuckets(&mut self, peer: &PeerId) {
let distance = NetworkAddress::from_peer(self.self_peer_id)
.distance(&NetworkAddress::from_peer(*peer));
info!("Peer {peer:?} has a {:?} distance to us", distance.ilog2());
let mut kbucket_table_stats = vec![];
let mut index = 0;
let mut total_peers = 0;
for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() {
let range = kbucket.range();
total_peers += kbucket.num_entries();
if let Some(distance) = range.0.ilog2() {
kbucket_table_stats.push((index, kbucket.num_entries(), distance));
} else {
error!("bucket #{index:?} is ourself ???!!!");
}
index += 1;
}
info!("kBucketTable has {index:?} kbuckets {total_peers:?} peers, {kbucket_table_stats:?}");
}
}
/// Renders a connection endpoint for log output: `outgoing (<address>)` for
/// connections we dialed, `incoming (<send back address>)` for the others.
fn endpoint_str(endpoint: &libp2p::core::ConnectedPoint) -> String {
    use libp2p::core::ConnectedPoint;

    match endpoint {
        ConnectedPoint::Dialer { address, .. } => format!("outgoing ({address})"),
        ConnectedPoint::Listener { send_back_addr, .. } => format!("incoming ({send_back_addr})"),
    }
}