use crate::{
config::{IncomingRequest, MultiaddrWithPeerId, NotificationHandshake, Params, SetConfig},
error::{self, Error},
event::Event,
network_state::NetworkState,
request_responses::{IfDisconnected, RequestFailure},
service::{metrics::NotificationMetrics, signature::Signature, PeerStoreProvider},
types::ProtocolName,
ReputationChange,
};
use futures::{channel::oneshot, Stream};
use prometheus_endpoint::Registry;
use sc_client_api::BlockBackend;
use sc_network_common::{role::ObservedRole, ExHashT};
pub use sc_network_types::{
kad::{Key as KademliaKey, Record},
multiaddr::Multiaddr,
PeerId,
};
use sp_runtime::traits::Block as BlockT;
use std::{
collections::HashSet,
fmt::Debug,
future::Future,
pin::Pin,
sync::Arc,
time::{Duration, Instant},
};
pub use libp2p::identity::SigningError;
pub trait NetworkService:
NetworkSigner
+ NetworkDHTProvider
+ NetworkStatusProvider
+ NetworkPeers
+ NetworkEventStream
+ NetworkStateInfo
+ NetworkRequest
+ Send
+ Sync
+ 'static
{
}
impl<T> NetworkService for T where
T: NetworkSigner
+ NetworkDHTProvider
+ NetworkStatusProvider
+ NetworkPeers
+ NetworkEventStream
+ NetworkStateInfo
+ NetworkRequest
+ Send
+ Sync
+ 'static
{
}
pub trait NotificationConfig: Debug {
fn set_config(&self) -> &SetConfig;
fn protocol_name(&self) -> &ProtocolName;
}
pub trait RequestResponseConfig: Debug {
fn protocol_name(&self) -> &ProtocolName;
}
#[async_trait::async_trait]
pub trait PeerStore {
fn handle(&self) -> Arc<dyn PeerStoreProvider>;
async fn run(self);
}
#[async_trait::async_trait]
pub trait NetworkBackend<B: BlockT + 'static, H: ExHashT>: Send + 'static {
type NotificationProtocolConfig: NotificationConfig;
type RequestResponseProtocolConfig: RequestResponseConfig;
type NetworkService<Block, Hash>: NetworkService + Clone;
type PeerStore: PeerStore;
type BitswapConfig;
fn new(params: Params<B, H, Self>) -> Result<Self, Error>
where
Self: Sized;
fn network_service(&self) -> Arc<dyn NetworkService>;
fn peer_store(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self::PeerStore;
fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics;
fn bitswap_server(
client: Arc<dyn BlockBackend<B> + Send + Sync>,
) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig);
fn notification_config(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_notification_size: u64,
handshake: Option<NotificationHandshake>,
set_config: SetConfig,
metrics: NotificationMetrics,
peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>);
fn request_response_config(
protocol_name: ProtocolName,
fallback_names: Vec<ProtocolName>,
max_request_size: u64,
max_response_size: u64,
request_timeout: Duration,
inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
) -> Self::RequestResponseProtocolConfig;
async fn run(mut self);
}
pub trait NetworkSigner {
fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError>;
fn verify(
&self,
peer_id: sc_network_types::PeerId,
public_key: &Vec<u8>,
signature: &Vec<u8>,
message: &Vec<u8>,
) -> Result<bool, String>;
}
impl<T> NetworkSigner for Arc<T>
where
T: ?Sized,
T: NetworkSigner,
{
fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
T::sign_with_local_identity(self, msg)
}
fn verify(
&self,
peer_id: sc_network_types::PeerId,
public_key: &Vec<u8>,
signature: &Vec<u8>,
message: &Vec<u8>,
) -> Result<bool, String> {
T::verify(self, peer_id, public_key, signature, message)
}
}
pub trait NetworkDHTProvider {
fn find_closest_peers(&self, target: PeerId);
fn get_value(&self, key: &KademliaKey);
fn put_value(&self, key: KademliaKey, value: Vec<u8>);
fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool);
fn store_record(
&self,
key: KademliaKey,
value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<Instant>,
);
fn start_providing(&self, key: KademliaKey);
fn stop_providing(&self, key: KademliaKey);
fn get_providers(&self, key: KademliaKey);
}
impl<T> NetworkDHTProvider for Arc<T>
where
T: ?Sized,
T: NetworkDHTProvider,
{
fn find_closest_peers(&self, target: PeerId) {
T::find_closest_peers(self, target)
}
fn get_value(&self, key: &KademliaKey) {
T::get_value(self, key)
}
fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
T::put_value(self, key, value)
}
fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
T::put_record_to(self, record, peers, update_local_storage)
}
fn store_record(
&self,
key: KademliaKey,
value: Vec<u8>,
publisher: Option<PeerId>,
expires: Option<Instant>,
) {
T::store_record(self, key, value, publisher, expires)
}
fn start_providing(&self, key: KademliaKey) {
T::start_providing(self, key)
}
fn stop_providing(&self, key: KademliaKey) {
T::stop_providing(self, key)
}
fn get_providers(&self, key: KademliaKey) {
T::get_providers(self, key)
}
}
pub trait NetworkSyncForkRequest<BlockHash, BlockNumber> {
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber);
}
impl<T, BlockHash, BlockNumber> NetworkSyncForkRequest<BlockHash, BlockNumber> for Arc<T>
where
T: ?Sized,
T: NetworkSyncForkRequest<BlockHash, BlockNumber>,
{
fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber) {
T::set_sync_fork_request(self, peers, hash, number)
}
}
#[derive(Clone)]
pub struct NetworkStatus {
pub num_connected_peers: usize,
pub total_bytes_inbound: u64,
pub total_bytes_outbound: u64,
}
#[async_trait::async_trait]
pub trait NetworkStatusProvider {
async fn status(&self) -> Result<NetworkStatus, ()>;
async fn network_state(&self) -> Result<NetworkState, ()>;
}
impl<T> NetworkStatusProvider for Arc<T>
where
T: ?Sized,
T: NetworkStatusProvider,
{
fn status<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<NetworkStatus, ()>> + Send + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait,
{
T::status(self)
}
fn network_state<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<NetworkState, ()>> + Send + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait,
{
T::network_state(self)
}
}
#[async_trait::async_trait]
pub trait NetworkPeers {
fn set_authorized_peers(&self, peers: HashSet<PeerId>);
fn set_authorized_only(&self, reserved_only: bool);
fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr);
fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange);
fn peer_reputation(&self, peer_id: &PeerId) -> i32;
fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName);
fn accept_unreserved_peers(&self);
fn deny_unreserved_peers(&self);
fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>;
fn remove_reserved_peer(&self, peer_id: PeerId);
fn set_reserved_peers(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String>;
fn add_peers_to_reserved_set(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String>;
fn remove_peers_from_reserved_set(
&self,
protocol: ProtocolName,
peers: Vec<PeerId>,
) -> Result<(), String>;
fn sync_num_connected(&self) -> usize;
fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole>;
async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()>;
}
#[async_trait::async_trait]
impl<T> NetworkPeers for Arc<T>
where
T: ?Sized,
T: NetworkPeers,
{
fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
T::set_authorized_peers(self, peers)
}
fn set_authorized_only(&self, reserved_only: bool) {
T::set_authorized_only(self, reserved_only)
}
fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) {
T::add_known_address(self, peer_id, addr)
}
fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
T::report_peer(self, peer_id, cost_benefit)
}
fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
T::peer_reputation(self, peer_id)
}
fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName) {
T::disconnect_peer(self, peer_id, protocol)
}
fn accept_unreserved_peers(&self) {
T::accept_unreserved_peers(self)
}
fn deny_unreserved_peers(&self) {
T::deny_unreserved_peers(self)
}
fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
T::add_reserved_peer(self, peer)
}
fn remove_reserved_peer(&self, peer_id: PeerId) {
T::remove_reserved_peer(self, peer_id)
}
fn set_reserved_peers(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String> {
T::set_reserved_peers(self, protocol, peers)
}
fn add_peers_to_reserved_set(
&self,
protocol: ProtocolName,
peers: HashSet<Multiaddr>,
) -> Result<(), String> {
T::add_peers_to_reserved_set(self, protocol, peers)
}
fn remove_peers_from_reserved_set(
&self,
protocol: ProtocolName,
peers: Vec<PeerId>,
) -> Result<(), String> {
T::remove_peers_from_reserved_set(self, protocol, peers)
}
fn sync_num_connected(&self) -> usize {
T::sync_num_connected(self)
}
fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
T::peer_role(self, peer_id, handshake)
}
fn reserved_peers<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<PeerId>, ()>> + Send + 'async_trait>>
where
'life0: 'async_trait,
Self: 'async_trait,
{
T::reserved_peers(self)
}
}
pub trait NetworkEventStream {
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>>;
}
impl<T> NetworkEventStream for Arc<T>
where
T: ?Sized,
T: NetworkEventStream,
{
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
T::event_stream(self, name)
}
}
pub trait NetworkStateInfo {
fn external_addresses(&self) -> Vec<Multiaddr>;
fn listen_addresses(&self) -> Vec<Multiaddr>;
fn local_peer_id(&self) -> PeerId;
}
impl<T> NetworkStateInfo for Arc<T>
where
T: ?Sized,
T: NetworkStateInfo,
{
fn external_addresses(&self) -> Vec<Multiaddr> {
T::external_addresses(self)
}
fn listen_addresses(&self) -> Vec<Multiaddr> {
T::listen_addresses(self)
}
fn local_peer_id(&self) -> PeerId {
T::local_peer_id(self)
}
}
pub trait NotificationSenderReady {
fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError>;
}
#[async_trait::async_trait]
pub trait NotificationSender: Send + Sync + 'static {
async fn ready(&self)
-> Result<Box<dyn NotificationSenderReady + '_>, NotificationSenderError>;
}
#[derive(Debug, thiserror::Error)]
pub enum NotificationSenderError {
#[error("The notification receiver has been closed")]
Closed,
#[error("Protocol name hasn't been registered")]
BadProtocol,
}
#[async_trait::async_trait]
pub trait NetworkRequest {
async fn request(
&self,
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Result<(Vec<u8>, ProtocolName), RequestFailure>;
fn start_request(
&self,
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
);
}
impl<T> NetworkRequest for Arc<T>
where
T: ?Sized,
T: NetworkRequest,
{
fn request<'life0, 'async_trait>(
&'life0 self,
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
connect: IfDisconnected,
) -> Pin<
Box<
dyn Future<Output = Result<(Vec<u8>, ProtocolName), RequestFailure>>
+ Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
T::request(self, target, protocol, request, fallback_request, connect)
}
fn start_request(
&self,
target: PeerId,
protocol: ProtocolName,
request: Vec<u8>,
fallback_request: Option<(Vec<u8>, ProtocolName)>,
tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
connect: IfDisconnected,
) {
T::start_request(self, target, protocol, request, fallback_request, tx, connect)
}
}
pub trait NetworkBlock<BlockHash, BlockNumber> {
fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>);
fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber);
}
impl<T, BlockHash, BlockNumber> NetworkBlock<BlockHash, BlockNumber> for Arc<T>
where
T: ?Sized,
T: NetworkBlock<BlockHash, BlockNumber>,
{
fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>) {
T::announce_block(self, hash, data)
}
fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber) {
T::new_best_block_imported(self, hash, number)
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum ValidationResult {
Accept,
Reject,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
Inbound,
Outbound,
}
impl From<litep2p::protocol::notification::Direction> for Direction {
fn from(direction: litep2p::protocol::notification::Direction) -> Self {
match direction {
litep2p::protocol::notification::Direction::Inbound => Direction::Inbound,
litep2p::protocol::notification::Direction::Outbound => Direction::Outbound,
}
}
}
impl Direction {
pub fn is_inbound(&self) -> bool {
std::matches!(self, Direction::Inbound)
}
}
#[derive(Debug)]
pub enum NotificationEvent {
ValidateInboundSubstream {
peer: PeerId,
handshake: Vec<u8>,
result_tx: tokio::sync::oneshot::Sender<ValidationResult>,
},
NotificationStreamOpened {
peer: PeerId,
direction: Direction,
handshake: Vec<u8>,
negotiated_fallback: Option<ProtocolName>,
},
NotificationStreamClosed {
peer: PeerId,
},
NotificationReceived {
peer: PeerId,
notification: Vec<u8>,
},
}
#[async_trait::async_trait]
pub trait NotificationService: Debug + Send {
async fn open_substream(&mut self, peer: PeerId) -> Result<(), ()>;
async fn close_substream(&mut self, peer: PeerId) -> Result<(), ()>;
fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>);
async fn send_async_notification(
&mut self,
peer: &PeerId,
notification: Vec<u8>,
) -> Result<(), error::Error>;
async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
async fn next_event(&mut self) -> Option<NotificationEvent>;
fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()>;
fn protocol(&self) -> &ProtocolName;
fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>>;
}
#[async_trait::async_trait]
pub trait MessageSink: Send + Sync {
fn send_sync_notification(&self, notification: Vec<u8>);
async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error>;
}
pub trait BandwidthSink: Send + Sync {
fn total_inbound(&self) -> u64;
fn total_outbound(&self) -> u64;
}