use std::{
collections::{HashMap, VecDeque},
task::{Context, Poll},
};
use libp2p::{
core::{connection::ConnectionId, ConnectedPoint},
swarm::{IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters},
Multiaddr, PeerId,
};
use log::debug;
use super::{
event::{IotaGossipEvent, IotaGossipHandlerEvent},
handler::{GossipProtocolHandler, IotaGossipHandlerInEvent},
id::IotaGossipIdentifier,
};
use crate::{alias, init::global::network_id, network::origin::Origin};
/// Protocol name handed to `IotaGossipIdentifier::new` when the gossip protocol id is built.
const IOTA_GOSSIP_NAME: &str = "iota-gossip";
/// Protocol version handed to `IotaGossipIdentifier::new` together with the name and network id.
const IOTA_GOSSIP_VERSION: &str = "1.0.0";
/// Shorthand for the swarm action type that this behaviour queues and returns from `poll`.
type GossipBehaviourAction = NetworkBehaviourAction<IotaGossipEvent, GossipProtocolHandler, IotaGossipHandlerInEvent>;
/// Address and direction recorded for a peer at the moment a connection to it is established.
struct ConnectionInfo {
// The dialed address for `ConnectedPoint::Dialer`, the send-back address for
// `ConnectedPoint::Listener`.
addr: Multiaddr,
// `Origin::Outbound` for a dialer endpoint, `Origin::Inbound` for a listener endpoint.
origin: Origin,
}
/// Network behaviour state for the IOTA gossip protocol.
///
/// Holds the protocol identifier given to every new connection handler, a few counters that are
/// only reported through debug logging, the queue of pending swarm actions, and the connection
/// info of peers whose connection has been established.
pub struct IotaGossipProtocol {
// Identifier cloned into each handler created by `new_handler`.
id: IotaGossipIdentifier,
// Number of handlers created so far; incremented in `new_handler`.
num_handlers: usize,
// Number of inbound connections established so far.
num_inbounds: usize,
// Number of outbound connections established so far.
num_outbounds: usize,
// Pending actions, pushed by the `inject_*` callbacks and popped front-first by `poll`.
events: VecDeque<GossipBehaviourAction>,
// Connection info per peer, inserted when a connection is established and consumed when the
// handler reports a completed upgrade.
peers: HashMap<PeerId, ConnectionInfo>,
}
impl IotaGossipProtocol {
    /// Creates a gossip protocol behaviour.
    ///
    /// This is the same value that the `Default` implementation produces.
    pub fn new() -> Self {
        <Self as Default>::default()
    }
}
impl Default for IotaGossipProtocol {
    /// Builds a protocol instance with zeroed counters and empty, pre-sized collections.
    ///
    /// The protocol identifier is created from the gossip protocol name, the value returned by
    /// `network_id()`, and the gossip protocol version.
    fn default() -> Self {
        // Initial capacities only; both collections grow on demand.
        const EVENT_QUEUE_CAPACITY: usize = 16;
        const PEER_MAP_CAPACITY: usize = 8;

        let id = IotaGossipIdentifier::new(IOTA_GOSSIP_NAME, network_id(), IOTA_GOSSIP_VERSION);
        let events = VecDeque::with_capacity(EVENT_QUEUE_CAPACITY);
        let peers = HashMap::with_capacity(PEER_MAP_CAPACITY);

        Self {
            id,
            num_handlers: 0,
            num_inbounds: 0,
            num_outbounds: 0,
            events,
            peers,
        }
    }
}
/// `NetworkBehaviour` implementation for the IOTA gossip protocol.
///
/// For every established connection the behaviour records the peer's address and the connection
/// origin, and tells that connection's handler which side initiated it. Handler events are
/// translated into `IotaGossipEvent`s, queued, and handed to the swarm one at a time by `poll`.
impl NetworkBehaviour for IotaGossipProtocol {
    type ConnectionHandler = GossipProtocolHandler;
    type OutEvent = IotaGossipEvent;

    /// Creates a handler that carries a clone of this protocol's identifier.
    ///
    /// Also bumps the handler counter, which is only used for debug logging.
    fn new_handler(&mut self) -> Self::ConnectionHandler {
        self.num_handlers += 1;
        debug!("gossip protocol: new handler ({}).", self.num_handlers);

        GossipProtocolHandler::new(self.id.clone())
    }

    /// Returns the recorded address of `peer_id`, or an empty list if none is stored.
    fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
        let addrs = self
            .peers
            .get(peer_id)
            .map(|conn_info| vec![conn_info.addr.clone()])
            .unwrap_or_default();

        debug!("gossip protocol: addresses of peer {}: {:?}.", alias!(peer_id), addrs);

        addrs
    }

    /// Records the new connection and informs its handler about the connection origin.
    ///
    /// The stored info is keyed by peer id, so a later connection to the same peer replaces the
    /// info of an earlier one.
    fn inject_connection_established(
        &mut self,
        peer_id: &PeerId,
        conn_id: &ConnectionId,
        endpoint: &ConnectedPoint,
        _failed_addresses: Option<&Vec<Multiaddr>>,
        _other_established: usize,
    ) {
        // A dialer endpoint is an outbound connection, a listener endpoint an inbound one.
        let (peer_addr, origin) = match endpoint {
            ConnectedPoint::Dialer {
                address,
                role_override: _,
            } => (address.clone(), Origin::Outbound),
            ConnectedPoint::Listener { send_back_addr, .. } => (send_back_addr.clone(), Origin::Inbound),
        };

        match origin {
            Origin::Inbound => self.num_inbounds += 1,
            Origin::Outbound => self.num_outbounds += 1,
        }

        debug!(
            "gossip protocol: connection established: inbound/outbound: {}/{}",
            self.num_inbounds, self.num_outbounds
        );

        self.peers.insert(
            *peer_id,
            ConnectionInfo {
                addr: peer_addr,
                origin,
            },
        );

        // Only the handler of this particular connection needs to learn the origin.
        self.events.push_back(NetworkBehaviourAction::NotifyHandler {
            peer_id: *peer_id,
            handler: NotifyHandler::One(*conn_id),
            event: IotaGossipHandlerInEvent { origin },
        });
    }

    /// Translates a handler event into a behaviour event and queues it for `poll`.
    ///
    /// A completed upgrade consumes the connection info stored for the peer. If no info is
    /// stored, the event is dropped and the drop is logged. Handler events not listed below are
    /// ignored.
    fn inject_event(&mut self, peer_id: PeerId, _: ConnectionId, event: IotaGossipHandlerEvent) {
        debug!("gossip protocol: handler event: {:?}", event);

        let ev = match event {
            IotaGossipHandlerEvent::SentUpgradeRequest { to } => {
                NetworkBehaviourAction::GenerateEvent(IotaGossipEvent::SentUpgradeRequest { to })
            }
            IotaGossipHandlerEvent::UpgradeCompleted { substream } => match self.peers.remove(&peer_id) {
                Some(conn_info) => NetworkBehaviourAction::GenerateEvent(IotaGossipEvent::UpgradeCompleted {
                    peer_id,
                    peer_addr: conn_info.addr,
                    origin: conn_info.origin,
                    substream,
                }),
                None => {
                    // Without the address and origin no complete event can be built; leave a
                    // trace instead of discarding the substream silently.
                    debug!(
                        "gossip protocol: upgrade completed for {} without connection info; dropping.",
                        peer_id
                    );
                    return;
                }
            },
            IotaGossipHandlerEvent::UpgradeError { peer_id, error } => {
                NetworkBehaviourAction::GenerateEvent(IotaGossipEvent::UpgradeError { peer_id, error })
            }
            _ => return,
        };

        self.events.push_back(ev);
    }

    /// Logs the closed connection and discards connection info that can no longer be used.
    ///
    /// Connection info is normally consumed when the upgrade completes. If the last connection
    /// to a peer closes before that happens, the entry would otherwise stay in the map forever,
    /// so it is removed here once no established connection to the peer remains.
    fn inject_connection_closed(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        _: &ConnectedPoint,
        _: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
        remaining_established: usize,
    ) {
        debug!("gossip behaviour: connection with {} closed.", alias!(peer_id));

        // Keep the entry while another connection to this peer may still complete its upgrade.
        if remaining_established == 0 {
            self.peers.remove(peer_id);
        }
    }

    /// Logs that the address of a connected peer changed; the stored info is left untouched.
    fn inject_address_change(
        &mut self,
        peer_id: &PeerId,
        _: &ConnectionId,
        _old: &ConnectedPoint,
        _new: &ConnectedPoint,
    ) {
        debug!("gossip behaviour: address of {} changed.", alias!(peer_id));
    }

    /// Hands out the oldest queued action, or reports `Pending` when the queue is empty.
    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll<GossipBehaviourAction> {
        self.events.pop_front().map_or(Poll::Pending, Poll::Ready)
    }
}