use crate::{
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
event::DhtEvent,
peer_info,
peer_store::PeerStoreHandle,
protocol::{CustomMessageOutcome, NotificationsSink, Protocol},
request_responses::{self, IfDisconnected, ProtocolConfig, RequestFailure},
types::ProtocolName,
ReputationChange,
};
use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{
core::Multiaddr, identify::Info as IdentifyInfo, identity::PublicKey, kad::RecordKey,
swarm::NetworkBehaviour, PeerId,
};
use parking_lot::Mutex;
use sc_network_common::role::{ObservedRole, Roles};
use sp_runtime::traits::Block as BlockT;
use std::{collections::HashSet, sync::Arc, time::Duration};
pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, ResponseFailure};
/// Top-level network behaviour that bundles the sub-behaviours listed below.
///
/// The `out_event` attribute names [`BehaviourOut`] as the event type of the combined
/// behaviour. This file provides `From` conversions into [`BehaviourOut`] for
/// `CustomMessageOutcome`, `request_responses::Event`, `peer_info::PeerInfoEvent` and
/// `DiscoveryOut`.
///
/// NOTE(review): with the libp2p derive, the order of the fields may influence the order in
/// which the sub-behaviours are polled — confirm before reordering them.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut")]
pub struct Behaviour<B: BlockT> {
/// Substrate protocol behaviour; exposed through `user_protocol()` and `user_protocol_mut()`.
substrate: Protocol<B>,
/// Per-peer information; queried by `node()`.
peer_info: peer_info::PeerInfoBehaviour,
/// Discovery behaviour; backs the known-peers, known-address, k-bucket/record statistics and
/// DHT `get_value` / `put_value` methods.
discovery: DiscoveryBehaviour,
/// Request-response behaviour; used by `send_request()`.
request_responses: request_responses::RequestResponsesBehaviour,
}
/// Event emitted by [`Behaviour`].
///
/// Every variant is produced by one of the `From` conversions in this file, which translate the
/// events of the individual sub-behaviours into this common type.
pub enum BehaviourOut {
/// Converted from `DiscoveryOut::RandomKademliaStarted`.
RandomKademliaStarted,
/// Forwarded field-for-field from `request_responses::Event::InboundRequest`.
InboundRequest {
/// Peer associated with the inbound request.
peer: PeerId,
/// Name of the request-response protocol involved.
protocol: ProtocolName,
/// Outcome of handling the request.
///
/// NOTE(review): the `Ok` duration presumably measures how long building the response
/// took — confirm against `request_responses`.
result: Result<Duration, ResponseFailure>,
},
/// Forwarded field-for-field from `request_responses::Event::RequestFinished`.
RequestFinished {
/// Peer associated with the request.
peer: PeerId,
/// Name of the request-response protocol involved.
protocol: ProtocolName,
/// Duration reported by the request-response behaviour for this request.
duration: Duration,
/// `Ok(())` on success, otherwise the reported failure.
result: Result<(), RequestFailure>,
},
/// Forwarded field-for-field from `request_responses::Event::ReputationChanges`.
ReputationChanges { peer: PeerId, changes: Vec<ReputationChange> },
/// Converted from `CustomMessageOutcome::NotificationStreamOpened`.
NotificationStreamOpened {
/// Peer the stream is associated with.
remote: PeerId,
/// Name of the notifications protocol.
protocol: ProtocolName,
/// Forwarded unchanged from the source event.
///
/// NOTE(review): presumably the fallback protocol name that was actually negotiated, if
/// any — confirm.
negotiated_fallback: Option<ProtocolName>,
/// Sink forwarded unchanged from the source event.
notifications_sink: NotificationsSink,
/// Role computed from the `roles` of the source event by
/// `reported_roles_to_observed_role`.
role: ObservedRole,
/// Handshake bytes forwarded unchanged from the source event.
received_handshake: Vec<u8>,
},
/// Converted from `CustomMessageOutcome::NotificationStreamReplaced`.
NotificationStreamReplaced {
/// Peer the stream is associated with.
remote: PeerId,
/// Name of the notifications protocol.
protocol: ProtocolName,
/// Sink forwarded unchanged from the source event.
notifications_sink: NotificationsSink,
},
/// Converted from `CustomMessageOutcome::NotificationStreamClosed`.
NotificationStreamClosed {
/// Peer the stream was associated with.
remote: PeerId,
/// Name of the notifications protocol.
protocol: ProtocolName,
},
/// Converted from `CustomMessageOutcome::NotificationsReceived`.
NotificationsReceived {
/// Peer the messages are associated with.
remote: PeerId,
/// Messages, each paired with a protocol name.
messages: Vec<(ProtocolName, Bytes)>,
},
/// Converted from `peer_info::PeerInfoEvent::Identified`.
PeerIdentify {
/// Peer the information belongs to.
peer_id: PeerId,
/// Identify information forwarded unchanged from the source event.
info: IdentifyInfo,
},
/// Converted from `DiscoveryOut::Discovered`.
Discovered(PeerId),
/// A DHT event together with the duration reported by the discovery behaviour.
///
/// Produced from the `ValueFound`, `ValueNotFound`, `ValuePut` and `ValuePutFailed` variants
/// of `DiscoveryOut`.
Dht(DhtEvent, Duration),
/// Event without payload.
///
/// Produced for `CustomMessageOutcome::None` and for `DiscoveryOut::UnroutablePeer`.
None,
}
impl<B: BlockT> Behaviour<B> {
    /// Builds a new `Behaviour` from its already-configured parts.
    ///
    /// `user_agent`, `local_public_key` and `external_addresses` are handed to the peer-info
    /// behaviour, `disco_config` is finalized into the discovery behaviour, and
    /// `request_response_protocols` together with `peer_store_handle` are used to set up the
    /// request-response behaviour.
    ///
    /// # Errors
    ///
    /// Returns the `RegisterError` reported by `RequestResponsesBehaviour::new`.
    pub fn new(
        substrate: Protocol<B>,
        user_agent: String,
        local_public_key: PublicKey,
        disco_config: DiscoveryConfig,
        request_response_protocols: Vec<ProtocolConfig>,
        peer_store_handle: PeerStoreHandle,
        external_addresses: Arc<Mutex<HashSet<Multiaddr>>>,
    ) -> Result<Self, request_responses::RegisterError> {
        // The sub-behaviours are built in the same order as the struct fields.
        let peer_info_behaviour =
            peer_info::PeerInfoBehaviour::new(user_agent, local_public_key, external_addresses);
        let discovery_behaviour = disco_config.finish();
        let request_response_behaviour = request_responses::RequestResponsesBehaviour::new(
            request_response_protocols.into_iter(),
            Box::new(peer_store_handle),
        )?;

        Ok(Self {
            substrate,
            peer_info: peer_info_behaviour,
            discovery: discovery_behaviour,
            request_responses: request_response_behaviour,
        })
    }

    /// Returns the set of peers reported as known by the discovery behaviour.
    pub fn known_peers(&mut self) -> HashSet<PeerId> {
        let discovery = &mut self.discovery;
        discovery.known_peers()
    }

    /// Registers `addr` as a known address of `peer_id` with the discovery behaviour.
    pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
        self.discovery.add_known_address(peer_id, addr);
    }

    /// Returns the per-k-bucket entry counts reported by the discovery behaviour.
    ///
    /// The result, including `None`, is passed through unchanged.
    pub fn num_entries_per_kbucket(&mut self) -> Option<Vec<(u32, usize)>> {
        let discovery = &mut self.discovery;
        discovery.num_entries_per_kbucket()
    }

    /// Returns the number of Kademlia records reported by the discovery behaviour.
    pub fn num_kademlia_records(&mut self) -> Option<usize> {
        let discovery = &mut self.discovery;
        discovery.num_kademlia_records()
    }

    /// Returns the total size of the Kademlia records reported by the discovery behaviour.
    pub fn kademlia_records_total_size(&mut self) -> Option<usize> {
        let discovery = &mut self.discovery;
        discovery.kademlia_records_total_size()
    }

    /// Looks up `peer_id` in the peer-info behaviour.
    ///
    /// Returns whatever `PeerInfoBehaviour::node` returns for that peer.
    pub fn node(&self, peer_id: &PeerId) -> Option<peer_info::Node> {
        let peer_info = &self.peer_info;
        peer_info.node(peer_id)
    }

    /// Hands a request over to the request-response behaviour.
    ///
    /// All arguments are forwarded unchanged; the outcome is delivered through
    /// `pending_response`.
    pub fn send_request(
        &mut self,
        target: &PeerId,
        protocol: &str,
        request: Vec<u8>,
        pending_response: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
        connect: IfDisconnected,
    ) {
        let request_responses = &mut self.request_responses;
        request_responses.send_request(target, protocol, request, pending_response, connect);
    }

    /// Shared access to the wrapped Substrate protocol behaviour.
    pub fn user_protocol(&self) -> &Protocol<B> {
        let Self { substrate, .. } = self;
        substrate
    }

    /// Exclusive access to the wrapped Substrate protocol behaviour.
    pub fn user_protocol_mut(&mut self) -> &mut Protocol<B> {
        let Self { substrate, .. } = self;
        substrate
    }

    /// Forwards an address that `peer_id` reported about itself, along with the protocols it
    /// supports, to the discovery behaviour.
    pub fn add_self_reported_address_to_dht(
        &mut self,
        peer_id: &PeerId,
        supported_protocols: &[impl AsRef<[u8]>],
        addr: Multiaddr,
    ) {
        let discovery = &mut self.discovery;
        discovery.add_self_reported_address(peer_id, supported_protocols, addr);
    }

    /// Asks the discovery behaviour to look up `key` in the DHT.
    ///
    /// Nothing is returned here; this file maps the `ValueFound` / `ValueNotFound` discovery
    /// events to `BehaviourOut::Dht`.
    pub fn get_value(&mut self, key: RecordKey) {
        let discovery = &mut self.discovery;
        discovery.get_value(key);
    }

    /// Asks the discovery behaviour to store `value` under `key` in the DHT.
    ///
    /// Nothing is returned here; this file maps the `ValuePut` / `ValuePutFailed` discovery
    /// events to `BehaviourOut::Dht`.
    pub fn put_value(&mut self, key: RecordKey, value: Vec<u8>) {
        let discovery = &mut self.discovery;
        discovery.put_value(key, value);
    }
}
/// Maps the roles reported by a peer to the single role we observe for it.
///
/// The checks run in priority order: authority first, then full; anything else is treated as
/// a light node.
fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole {
    if roles.is_authority() {
        return ObservedRole::Authority;
    }
    if roles.is_full() {
        return ObservedRole::Full;
    }
    ObservedRole::Light
}
impl From<CustomMessageOutcome> for BehaviourOut {
fn from(event: CustomMessageOutcome) -> Self {
match event {
CustomMessageOutcome::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
roles,
received_handshake,
notifications_sink,
} => BehaviourOut::NotificationStreamOpened {
remote,
protocol,
negotiated_fallback,
role: reported_roles_to_observed_role(roles),
received_handshake,
notifications_sink,
},
CustomMessageOutcome::NotificationStreamReplaced {
remote,
protocol,
notifications_sink,
} => BehaviourOut::NotificationStreamReplaced { remote, protocol, notifications_sink },
CustomMessageOutcome::NotificationStreamClosed { remote, protocol } =>
BehaviourOut::NotificationStreamClosed { remote, protocol },
CustomMessageOutcome::NotificationsReceived { remote, messages } =>
BehaviourOut::NotificationsReceived { remote, messages },
CustomMessageOutcome::None => BehaviourOut::None,
}
}
}
impl From<request_responses::Event> for BehaviourOut {
fn from(event: request_responses::Event) -> Self {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } =>
BehaviourOut::InboundRequest { peer, protocol, result },
request_responses::Event::RequestFinished { peer, protocol, duration, result } =>
BehaviourOut::RequestFinished { peer, protocol, duration, result },
request_responses::Event::ReputationChanges { peer, changes } =>
BehaviourOut::ReputationChanges { peer, changes },
}
}
}
impl From<peer_info::PeerInfoEvent> for BehaviourOut {
fn from(event: peer_info::PeerInfoEvent) -> Self {
let peer_info::PeerInfoEvent::Identified { peer_id, info } = event;
BehaviourOut::PeerIdentify { peer_id, info }
}
}
impl From<DiscoveryOut> for BehaviourOut {
    /// Translates a discovery event into a `BehaviourOut`.
    ///
    /// The four DHT value events become `BehaviourOut::Dht`, carrying the matching `DhtEvent`
    /// and the duration attached to the discovery event. `UnroutablePeer` is deliberately
    /// dropped and yields `BehaviourOut::None`.
    fn from(event: DiscoveryOut) -> Self {
        // Non-DHT events return immediately; DHT events are reduced to a pair and wrapped once.
        let (dht_event, elapsed) = match event {
            DiscoveryOut::UnroutablePeer(_) => return Self::None,
            DiscoveryOut::Discovered(peer_id) => return Self::Discovered(peer_id),
            DiscoveryOut::RandomKademliaStarted => return Self::RandomKademliaStarted,
            DiscoveryOut::ValueFound(results, elapsed) => (DhtEvent::ValueFound(results), elapsed),
            DiscoveryOut::ValueNotFound(key, elapsed) => (DhtEvent::ValueNotFound(key), elapsed),
            DiscoveryOut::ValuePut(key, elapsed) => (DhtEvent::ValuePut(key), elapsed),
            DiscoveryOut::ValuePutFailed(key, elapsed) => {
                (DhtEvent::ValuePutFailed(key), elapsed)
            },
        };
        Self::Dht(dht_event, elapsed)
    }
}