use std::{
collections::VecDeque,
task::{Context, Poll},
};
use derive_more::From;
use libp2p::{
core::PublicKey,
gossipsub::{Gossipsub, GossipsubEvent},
identify::{Identify, IdentifyEvent},
kad::{record::store::MemoryStore, Kademlia, KademliaEvent},
request_response::{RequestResponse, RequestResponseEvent},
swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters},
Multiaddr, NetworkBehaviour, PeerId,
};
use tracing::{info, trace, warn};
use super::{
gossip::{self, TOPIC},
one_way_messaging, peer_discovery, Config, GossipMessage, OneWayCodec, OneWayOutgoingMessage,
};
use crate::{components::chainspec_loader::Chainspec, types::NodeId};
/// Top-level event type emitted by [`Behavior`], which names it as its `out_event`.
///
/// Each variant wraps the event type of one of the sub-behaviors held by `Behavior`.  The derived
/// `From` impls allow every wrapped event type to be converted directly into this enum.
#[derive(Debug, From)]
pub(super) enum SwarmBehaviorEvent {
/// An event raised by the one-way messaging (request-response) behavior.
OneWayMessaging(RequestResponseEvent<Vec<u8>, ()>),
/// An event raised by the gossipsub behavior.
Gossiper(GossipsubEvent),
/// An event raised by the Kademlia behavior.
Kademlia(KademliaEvent),
/// An event raised by the identify behavior.
Identify(IdentifyEvent),
}
/// The composite network behavior: it bundles the one-way messaging, gossip, Kademlia and identify
/// sub-behaviors and surfaces their events as [`SwarmBehaviorEvent`]s.
///
/// Events injected by the sub-behaviors are buffered in `events` and handed out one at a time by
/// `custom_poll`, which is registered as the derive's `poll_method`.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "SwarmBehaviorEvent", poll_method = "custom_poll")]
pub(super) struct Behavior {
/// Sub-behavior used to send one-way messages to a specific peer.
one_way_message_behavior: RequestResponse<OneWayCodec>,
/// Sub-behavior used to publish gossip messages.
gossip_behavior: Gossipsub,
/// Sub-behavior holding the Kademlia routing table; used for bootstrapping and peer lookups.
kademlia_behavior: Kademlia<MemoryStore>,
/// The identify sub-behavior.
identify_behavior: Identify,
/// This node's ID, derived from its public key; used as the prefix of log messages.
#[behaviour(ignore)]
our_id: NodeId,
/// Events received from the sub-behaviors which have not yet been returned by `custom_poll`.
#[behaviour(ignore)]
events: VecDeque<SwarmBehaviorEvent>,
}
impl Behavior {
pub(super) fn new(config: &Config, chainspec: &Chainspec, our_public_key: PublicKey) -> Self {
let one_way_message_behavior = one_way_messaging::new_behavior(config, chainspec);
let gossip_behavior = gossip::new_behavior(config, chainspec, our_public_key.clone());
let (kademlia_behavior, identify_behavior) =
peer_discovery::new_behaviors(config, chainspec, our_public_key.clone());
Behavior {
one_way_message_behavior,
gossip_behavior,
kademlia_behavior,
identify_behavior,
our_id: NodeId::P2p(PeerId::from(our_public_key)),
events: VecDeque::new(),
}
}
pub(super) fn send_one_way_message(&mut self, outgoing_message: OneWayOutgoingMessage) {
let request_id = self
.one_way_message_behavior
.send_request(&outgoing_message.destination, outgoing_message.message);
trace!("{}: sent one-way message {}", self.our_id, request_id);
}
pub(super) fn add_discovered_peer(
&mut self,
peer_id: &PeerId,
listening_addresses: Vec<Multiaddr>,
) {
let should_bootstrap = self
.kademlia_behavior
.kbuckets()
.map(|k_bucket| k_bucket.num_entries())
.sum::<usize>()
== 1;
for address in listening_addresses {
self.kademlia_behavior.add_address(peer_id, address);
}
if should_bootstrap {
info!("{}: bootstrapping kademlia", self.our_id);
if self.kademlia_behavior.bootstrap().is_err() {
warn!(
"{}: could not bootstrap kademlia due to lost connection leaving no peers",
self.our_id
)
}
}
}
pub(super) fn discover_peers(&mut self) {
let random_address = PeerId::random();
let query_id = self
.kademlia_behavior
.get_closest_peers(random_address.clone());
info!(
"{}: random kademlia lookup for peers closest to {} with {:?}",
self.our_id, random_address, query_id
);
}
pub(super) fn gossip(&mut self, message: GossipMessage) {
if let Err(error) = self.gossip_behavior.publish(&*TOPIC, message) {
warn!(?error, "{}: failed to gossip new message", self.our_id);
}
}
fn custom_poll<T>(
&mut self,
_context: &mut Context,
_parameterss: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<T, SwarmBehaviorEvent>> {
if let Some(event) = self.events.pop_back() {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
} else {
Poll::Pending
}
}
}
impl NetworkBehaviourEventProcess<RequestResponseEvent<Vec<u8>, ()>> for Behavior {
    /// Buffers an event from the one-way messaging behavior until the next call to `custom_poll`.
    fn inject_event(&mut self, event: RequestResponseEvent<Vec<u8>, ()>) {
        let wrapped = SwarmBehaviorEvent::OneWayMessaging(event);
        self.events.push_front(wrapped);
    }
}
impl NetworkBehaviourEventProcess<GossipsubEvent> for Behavior {
    /// Buffers an event from the gossip behavior until the next call to `custom_poll`.
    fn inject_event(&mut self, event: GossipsubEvent) {
        let wrapped = SwarmBehaviorEvent::Gossiper(event);
        self.events.push_front(wrapped);
    }
}
impl NetworkBehaviourEventProcess<KademliaEvent> for Behavior {
    /// Buffers an event from the Kademlia behavior until the next call to `custom_poll`.
    fn inject_event(&mut self, event: KademliaEvent) {
        let wrapped = SwarmBehaviorEvent::Kademlia(event);
        self.events.push_front(wrapped);
    }
}
impl NetworkBehaviourEventProcess<IdentifyEvent> for Behavior {
    /// Buffers an event from the identify behavior until the next call to `custom_poll`.
    fn inject_event(&mut self, event: IdentifyEvent) {
        let wrapped = SwarmBehaviorEvent::Identify(event);
        self.events.push_front(wrapped);
    }
}