Skip to main content

soil_network/service/
traits.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Traits defined by `soil-network`.
8
9use crate::{
10	config::{IncomingRequest, MultiaddrWithPeerId, NotificationHandshake, Params, SetConfig},
11	error::{self, Error},
12	event::Event,
13	network_state::NetworkState,
14	request_responses::{IfDisconnected, RequestFailure},
15	service::{metrics::NotificationMetrics, signature::Signature, PeerStoreProvider},
16	types::ProtocolName,
17	ReputationChange,
18};
19
20use futures::{channel::oneshot, Stream};
21use soil_prometheus::Registry;
22
23use crate::common::{role::ObservedRole, ExHashT};
24pub use crate::types::{
25	kad::{Key as KademliaKey, Record},
26	multiaddr::Multiaddr,
27	PeerId,
28};
29use soil_client::client_api::BlockBackend;
30use subsoil::runtime::traits::Block as BlockT;
31
32use std::{
33	collections::HashSet,
34	fmt::Debug,
35	future::Future,
36	pin::Pin,
37	sync::Arc,
38	time::{Duration, Instant},
39};
40
41pub use libp2p::identity::SigningError;
42
43/// Supertrait defining the services provided by [`NetworkBackend`] service handle.
44pub trait NetworkService:
45	NetworkSigner
46	+ NetworkDHTProvider
47	+ NetworkStatusProvider
48	+ NetworkPeers
49	+ NetworkEventStream
50	+ NetworkStateInfo
51	+ NetworkRequest
52	+ Send
53	+ Sync
54	+ 'static
55{
56}
57
58impl<T> NetworkService for T where
59	T: NetworkSigner
60		+ NetworkDHTProvider
61		+ NetworkStatusProvider
62		+ NetworkPeers
63		+ NetworkEventStream
64		+ NetworkStateInfo
65		+ NetworkRequest
66		+ Send
67		+ Sync
68		+ 'static
69{
70}
71
72/// Trait defining the required functionality from a notification protocol configuration.
73pub trait NotificationConfig: Debug {
74	/// Get access to the `SetConfig` of the notification protocol.
75	fn set_config(&self) -> &SetConfig;
76
77	/// Get protocol name.
78	fn protocol_name(&self) -> &ProtocolName;
79}
80
81/// Trait defining the required functionality from a request-response protocol configuration.
82pub trait RequestResponseConfig: Debug {
83	/// Get protocol name.
84	fn protocol_name(&self) -> &ProtocolName;
85}
86
87/// Trait defining required functionality from `PeerStore`.
88#[async_trait::async_trait]
89pub trait PeerStore {
90	/// Get handle to `PeerStore`.
91	fn handle(&self) -> Arc<dyn PeerStoreProvider>;
92
93	/// Start running `PeerStore` event loop.
94	async fn run(self);
95}
96
97/// Networking backend.
98#[async_trait::async_trait]
99pub trait NetworkBackend<B: BlockT + 'static, H: ExHashT>: Send + 'static {
100	/// Type representing notification protocol-related configuration.
101	type NotificationProtocolConfig: NotificationConfig;
102
103	/// Type representing request-response protocol-related configuration.
104	type RequestResponseProtocolConfig: RequestResponseConfig;
105
106	/// Type implementing `NetworkService` for the networking backend.
107	///
108	/// `NetworkService` allows other subsystems of the blockchain to interact with `soil-network`
109	/// using `NetworkService`.
110	type NetworkService<Block, Hash>: NetworkService + Clone;
111
112	/// Type implementing [`PeerStore`].
113	type PeerStore: PeerStore;
114
115	/// Bitswap config.
116	type BitswapConfig;
117
118	/// Create new `NetworkBackend`.
119	fn new(params: Params<B, H, Self>) -> Result<Self, Error>
120	where
121		Self: Sized;
122
123	/// Get handle to `NetworkService` of the `NetworkBackend`.
124	fn network_service(&self) -> Arc<dyn NetworkService>;
125
126	/// Create [`PeerStore`].
127	fn peer_store(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self::PeerStore;
128
129	/// Register metrics that are used by the notification protocols.
130	fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics;
131
132	/// Create Bitswap server.
133	fn bitswap_server(
134		client: Arc<dyn BlockBackend<B> + Send + Sync>,
135	) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig);
136
137	/// Create notification protocol configuration and an associated `NotificationService`
138	/// for the protocol.
139	fn notification_config(
140		protocol_name: ProtocolName,
141		fallback_names: Vec<ProtocolName>,
142		max_notification_size: u64,
143		handshake: Option<NotificationHandshake>,
144		set_config: SetConfig,
145		metrics: NotificationMetrics,
146		peerstore_handle: Arc<dyn PeerStoreProvider>,
147	) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>);
148
149	/// Create request-response protocol configuration.
150	fn request_response_config(
151		protocol_name: ProtocolName,
152		fallback_names: Vec<ProtocolName>,
153		max_request_size: u64,
154		max_response_size: u64,
155		request_timeout: Duration,
156		inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
157	) -> Self::RequestResponseProtocolConfig;
158
159	/// Start [`NetworkBackend`] event loop.
160	async fn run(mut self);
161}
162
163/// Signer with network identity
164pub trait NetworkSigner {
165	/// Signs the message with the `KeyPair` that defines the local [`PeerId`].
166	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError>;
167
168	/// Verify signature using peer's public key.
169	///
170	/// `public_key` must be Protobuf-encoded ed25519 public key.
171	///
172	/// Returns `Err(())` if public cannot be parsed into a valid ed25519 public key.
173	fn verify(
174		&self,
175		peer_id: crate::types::PeerId,
176		public_key: &Vec<u8>,
177		signature: &Vec<u8>,
178		message: &Vec<u8>,
179	) -> Result<bool, String>;
180}
181
182impl<T> NetworkSigner for Arc<T>
183where
184	T: ?Sized,
185	T: NetworkSigner,
186{
187	fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
188		T::sign_with_local_identity(self, msg)
189	}
190
191	fn verify(
192		&self,
193		peer_id: crate::types::PeerId,
194		public_key: &Vec<u8>,
195		signature: &Vec<u8>,
196		message: &Vec<u8>,
197	) -> Result<bool, String> {
198		T::verify(self, peer_id, public_key, signature, message)
199	}
200}
201
202/// Provides access to the networking DHT.
203pub trait NetworkDHTProvider {
204	/// Start finding closest peers to the target.
205	fn find_closest_peers(&self, target: PeerId);
206
207	/// Start getting a value from the DHT.
208	fn get_value(&self, key: &KademliaKey);
209
210	/// Start putting a value in the DHT.
211	fn put_value(&self, key: KademliaKey, value: Vec<u8>);
212
213	/// Start putting the record to `peers`.
214	///
215	/// If `update_local_storage` is true the local storage is udpated as well.
216	fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool);
217
218	/// Store a record in the DHT memory store.
219	fn store_record(
220		&self,
221		key: KademliaKey,
222		value: Vec<u8>,
223		publisher: Option<PeerId>,
224		expires: Option<Instant>,
225	);
226
227	/// Register this node as a provider for `key` on the DHT.
228	fn start_providing(&self, key: KademliaKey);
229
230	/// Deregister this node as a provider for `key` on the DHT.
231	fn stop_providing(&self, key: KademliaKey);
232
233	/// Start getting the list of providers for `key` on the DHT.
234	fn get_providers(&self, key: KademliaKey);
235}
236
237impl<T> NetworkDHTProvider for Arc<T>
238where
239	T: ?Sized,
240	T: NetworkDHTProvider,
241{
242	fn find_closest_peers(&self, target: PeerId) {
243		T::find_closest_peers(self, target)
244	}
245
246	fn get_value(&self, key: &KademliaKey) {
247		T::get_value(self, key)
248	}
249
250	fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
251		T::put_value(self, key, value)
252	}
253
254	fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
255		T::put_record_to(self, record, peers, update_local_storage)
256	}
257
258	fn store_record(
259		&self,
260		key: KademliaKey,
261		value: Vec<u8>,
262		publisher: Option<PeerId>,
263		expires: Option<Instant>,
264	) {
265		T::store_record(self, key, value, publisher, expires)
266	}
267
268	fn start_providing(&self, key: KademliaKey) {
269		T::start_providing(self, key)
270	}
271
272	fn stop_providing(&self, key: KademliaKey) {
273		T::stop_providing(self, key)
274	}
275
276	fn get_providers(&self, key: KademliaKey) {
277		T::get_providers(self, key)
278	}
279}
280
281/// Provides an ability to set a fork sync request for a particular block.
282pub trait NetworkSyncForkRequest<BlockHash, BlockNumber> {
283	/// Notifies the sync service to try and sync the given block from the given
284	/// peers.
285	///
286	/// If the given vector of peers is empty then the underlying implementation
287	/// should make a best effort to fetch the block from any peers it is
288	/// connected to (NOTE: this assumption will change in the future #3629).
289	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber);
290}
291
292impl<T, BlockHash, BlockNumber> NetworkSyncForkRequest<BlockHash, BlockNumber> for Arc<T>
293where
294	T: ?Sized,
295	T: NetworkSyncForkRequest<BlockHash, BlockNumber>,
296{
297	fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber) {
298		T::set_sync_fork_request(self, peers, hash, number)
299	}
300}
301
/// Overview status of the network.
///
/// A plain snapshot of three counters; it is `Clone` and `Debug` so it can be copied around
/// and logged.
#[derive(Clone, Debug)]
pub struct NetworkStatus {
	/// Total number of connected peers.
	pub num_connected_peers: usize,
	/// The total number of bytes received.
	pub total_bytes_inbound: u64,
	/// The total number of bytes sent.
	pub total_bytes_outbound: u64,
}
312
313/// Provides high-level status information about network.
314#[async_trait::async_trait]
315pub trait NetworkStatusProvider {
316	/// High-level network status information.
317	///
318	/// Returns an error if the `NetworkWorker` is no longer running.
319	async fn status(&self) -> Result<NetworkStatus, ()>;
320
321	/// Get the network state.
322	///
323	/// Returns an error if the `NetworkWorker` is no longer running.
324	async fn network_state(&self) -> Result<NetworkState, ()>;
325}
326
327// Manual implementation to avoid extra boxing here
328impl<T> NetworkStatusProvider for Arc<T>
329where
330	T: ?Sized,
331	T: NetworkStatusProvider,
332{
333	fn status<'life0, 'async_trait>(
334		&'life0 self,
335	) -> Pin<Box<dyn Future<Output = Result<NetworkStatus, ()>> + Send + 'async_trait>>
336	where
337		'life0: 'async_trait,
338		Self: 'async_trait,
339	{
340		T::status(self)
341	}
342
343	fn network_state<'life0, 'async_trait>(
344		&'life0 self,
345	) -> Pin<Box<dyn Future<Output = Result<NetworkState, ()>> + Send + 'async_trait>>
346	where
347		'life0: 'async_trait,
348		Self: 'async_trait,
349	{
350		T::network_state(self)
351	}
352}
353
354/// Provides low-level API for manipulating network peers.
355#[async_trait::async_trait]
356pub trait NetworkPeers {
357	/// Set authorized peers.
358	///
359	/// Need a better solution to manage authorized peers, but now just use reserved peers for
360	/// prototyping.
361	fn set_authorized_peers(&self, peers: HashSet<PeerId>);
362
363	/// Set authorized_only flag.
364	///
365	/// Need a better solution to decide authorized_only, but now just use reserved_only flag for
366	/// prototyping.
367	fn set_authorized_only(&self, reserved_only: bool);
368
369	/// Adds an address known to a node.
370	fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr);
371
372	/// Report a given peer as either beneficial (+) or costly (-) according to the
373	/// given scalar.
374	fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange);
375
376	/// Get peer reputation.
377	fn peer_reputation(&self, peer_id: &PeerId) -> i32;
378
379	/// Disconnect from a node as soon as possible.
380	///
381	/// This triggers the same effects as if the connection had closed itself spontaneously.
382	fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName);
383
384	/// Connect to unreserved peers and allow unreserved peers to connect for syncing purposes.
385	fn accept_unreserved_peers(&self);
386
387	/// Disconnect from unreserved peers and deny new unreserved peers to connect for syncing
388	/// purposes.
389	fn deny_unreserved_peers(&self);
390
391	/// Adds a `PeerId` and its `Multiaddr` as reserved for a sync protocol (default peer set).
392	///
393	/// Returns an `Err` if the given string is not a valid multiaddress
394	/// or contains an invalid peer ID (which includes the local peer ID).
395	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>;
396
397	/// Removes a `PeerId` from the list of reserved peers for a sync protocol (default peer set).
398	fn remove_reserved_peer(&self, peer_id: PeerId);
399
400	/// Sets the reserved set of a protocol to the given set of peers.
401	///
402	/// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also
403	/// consist of only `/p2p/<peerid>`.
404	///
405	/// The node will start establishing/accepting connections and substreams to/from peers in this
406	/// set, if it doesn't have any substream open with them yet.
407	///
408	/// Note however, if a call to this function results in less peers on the reserved set, they
409	/// will not necessarily get disconnected (depending on available free slots in the peer set).
410	/// If you want to also disconnect those removed peers, you will have to call
411	/// `remove_from_peers_set` on those in addition to updating the reserved set. You can omit
412	/// this step if the peer set is in reserved only mode.
413	///
414	/// Returns an `Err` if one of the given addresses is invalid or contains an
415	/// invalid peer ID (which includes the local peer ID), or if `protocol` does not
416	/// refer to a known protocol.
417	fn set_reserved_peers(
418		&self,
419		protocol: ProtocolName,
420		peers: HashSet<Multiaddr>,
421	) -> Result<(), String>;
422
423	/// Add peers to a peer set.
424	///
425	/// Each `Multiaddr` must end with a `/p2p/` component containing the `PeerId`. It can also
426	/// consist of only `/p2p/<peerid>`.
427	///
428	/// Returns an `Err` if one of the given addresses is invalid or contains an
429	/// invalid peer ID (which includes the local peer ID), or if `protocol` does not
430	/// refer to a know protocol.
431	fn add_peers_to_reserved_set(
432		&self,
433		protocol: ProtocolName,
434		peers: HashSet<Multiaddr>,
435	) -> Result<(), String>;
436
437	/// Remove peers from a peer set.
438	///
439	/// Returns `Err` if `protocol` does not refer to a known protocol.
440	fn remove_peers_from_reserved_set(
441		&self,
442		protocol: ProtocolName,
443		peers: Vec<PeerId>,
444	) -> Result<(), String>;
445
446	/// Returns the number of peers in the sync peer set we're connected to.
447	fn sync_num_connected(&self) -> usize;
448
449	/// Attempt to get peer role.
450	///
451	/// Right now the peer role is decoded from the received handshake for all protocols
452	/// (`/block-announces/1` has other information as well). If the handshake cannot be
453	/// decoded into a role, the role queried from `PeerStore` and if the role is not stored
454	/// there either, `None` is returned and the peer should be discarded.
455	fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole>;
456
457	/// Get the list of reserved peers.
458	///
459	/// Returns an error if the `NetworkWorker` is no longer running.
460	async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()>;
461}
462
463// Manual implementation to avoid extra boxing here
464#[async_trait::async_trait]
465impl<T> NetworkPeers for Arc<T>
466where
467	T: ?Sized,
468	T: NetworkPeers,
469{
470	fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
471		T::set_authorized_peers(self, peers)
472	}
473
474	fn set_authorized_only(&self, reserved_only: bool) {
475		T::set_authorized_only(self, reserved_only)
476	}
477
478	fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) {
479		T::add_known_address(self, peer_id, addr)
480	}
481
482	fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
483		T::report_peer(self, peer_id, cost_benefit)
484	}
485
486	fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
487		T::peer_reputation(self, peer_id)
488	}
489
490	fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName) {
491		T::disconnect_peer(self, peer_id, protocol)
492	}
493
494	fn accept_unreserved_peers(&self) {
495		T::accept_unreserved_peers(self)
496	}
497
498	fn deny_unreserved_peers(&self) {
499		T::deny_unreserved_peers(self)
500	}
501
502	fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
503		T::add_reserved_peer(self, peer)
504	}
505
506	fn remove_reserved_peer(&self, peer_id: PeerId) {
507		T::remove_reserved_peer(self, peer_id)
508	}
509
510	fn set_reserved_peers(
511		&self,
512		protocol: ProtocolName,
513		peers: HashSet<Multiaddr>,
514	) -> Result<(), String> {
515		T::set_reserved_peers(self, protocol, peers)
516	}
517
518	fn add_peers_to_reserved_set(
519		&self,
520		protocol: ProtocolName,
521		peers: HashSet<Multiaddr>,
522	) -> Result<(), String> {
523		T::add_peers_to_reserved_set(self, protocol, peers)
524	}
525
526	fn remove_peers_from_reserved_set(
527		&self,
528		protocol: ProtocolName,
529		peers: Vec<PeerId>,
530	) -> Result<(), String> {
531		T::remove_peers_from_reserved_set(self, protocol, peers)
532	}
533
534	fn sync_num_connected(&self) -> usize {
535		T::sync_num_connected(self)
536	}
537
538	fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
539		T::peer_role(self, peer_id, handshake)
540	}
541
542	fn reserved_peers<'life0, 'async_trait>(
543		&'life0 self,
544	) -> Pin<Box<dyn Future<Output = Result<Vec<PeerId>, ()>> + Send + 'async_trait>>
545	where
546		'life0: 'async_trait,
547		Self: 'async_trait,
548	{
549		T::reserved_peers(self)
550	}
551}
552
553/// Provides access to network-level event stream.
554pub trait NetworkEventStream {
555	/// Returns a stream containing the events that happen on the network.
556	///
557	/// If this method is called multiple times, the events are duplicated.
558	///
559	/// The stream never ends (unless the `NetworkWorker` gets shut down).
560	///
561	/// The name passed is used to identify the channel in the Prometheus metrics. Note that the
562	/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
563	/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
564	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>>;
565}
566
567impl<T> NetworkEventStream for Arc<T>
568where
569	T: ?Sized,
570	T: NetworkEventStream,
571{
572	fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
573		T::event_stream(self, name)
574	}
575}
576
577/// Trait for providing information about the local network state
578pub trait NetworkStateInfo {
579	/// Returns the local external addresses.
580	fn external_addresses(&self) -> Vec<Multiaddr>;
581
582	/// Returns the listening addresses (without trailing `/p2p/` with our `PeerId`).
583	fn listen_addresses(&self) -> Vec<Multiaddr>;
584
585	/// Returns the local Peer ID.
586	fn local_peer_id(&self) -> PeerId;
587}
588
589impl<T> NetworkStateInfo for Arc<T>
590where
591	T: ?Sized,
592	T: NetworkStateInfo,
593{
594	fn external_addresses(&self) -> Vec<Multiaddr> {
595		T::external_addresses(self)
596	}
597
598	fn listen_addresses(&self) -> Vec<Multiaddr> {
599		T::listen_addresses(self)
600	}
601
602	fn local_peer_id(&self) -> PeerId {
603		T::local_peer_id(self)
604	}
605}
606
607/// Reserved slot in the notifications buffer, ready to accept data.
608pub trait NotificationSenderReady {
609	/// Consumes this slots reservation and actually queues the notification.
610	///
611	/// NOTE: Traits can't consume itself, but calling this method second time will return an error.
612	fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError>;
613}
614
615/// A `NotificationSender` allows for sending notifications to a peer with a chosen protocol.
616#[async_trait::async_trait]
617pub trait NotificationSender: Send + Sync + 'static {
618	/// Returns a future that resolves when the `NotificationSender` is ready to send a
619	/// notification.
620	async fn ready(&self)
621		-> Result<Box<dyn NotificationSenderReady + '_>, NotificationSenderError>;
622}
623
624/// Error returned by the notification sink.
625#[derive(Debug, thiserror::Error)]
626pub enum NotificationSenderError {
627	/// The notification receiver has been closed, usually because the underlying connection
628	/// closed.
629	///
630	/// Some of the notifications most recently sent may not have been received. However,
631	/// the peer may still be connected and a new notification sink for the same
632	/// protocol obtained from [`NotificationService::message_sink()`].
633	#[error("The notification receiver has been closed")]
634	Closed,
635	/// Protocol name hasn't been registered.
636	#[error("Protocol name hasn't been registered")]
637	BadProtocol,
638}
639
640/// Provides ability to send network requests.
641#[async_trait::async_trait]
642pub trait NetworkRequest {
643	/// Sends a single targeted request to a specific peer. On success, returns the response of
644	/// the peer.
645	///
646	/// Request-response protocols are a way to complement notifications protocols, but
647	/// notifications should remain the default ways of communicating information. For example, a
648	/// peer can announce something through a notification, after which the recipient can obtain
649	/// more information by performing a request.
650	/// As such, call this function with `IfDisconnected::ImmediateError` for `connect`. This way
651	/// you will get an error immediately for disconnected peers, instead of waiting for a
652	/// potentially very long connection attempt, which would suggest that something is wrong
653	/// anyway, as you are supposed to be connected because of the notification protocol.
654	///
655	/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
656	/// Such restrictions, if desired, need to be enforced at the call site(s).
657	///
658	/// The protocol must have been registered through
659	/// `NetworkConfiguration::request_response_protocols`.
660	async fn request(
661		&self,
662		target: PeerId,
663		protocol: ProtocolName,
664		request: Vec<u8>,
665		fallback_request: Option<(Vec<u8>, ProtocolName)>,
666		connect: IfDisconnected,
667	) -> Result<(Vec<u8>, ProtocolName), RequestFailure>;
668
669	/// Variation of `request` which starts a request whose response is delivered on a provided
670	/// channel.
671	///
672	/// Instead of blocking and waiting for a reply, this function returns immediately, sending
673	/// responses via the passed in sender. This alternative API exists to make it easier to
674	/// integrate with message passing APIs.
675	///
676	/// Keep in mind that the connected receiver might receive a `Canceled` event in case of a
677	/// closing connection. This is expected behaviour. With `request` you would get a
678	/// `RequestFailure::Network(OutboundFailure::ConnectionClosed)` in that case.
679	fn start_request(
680		&self,
681		target: PeerId,
682		protocol: ProtocolName,
683		request: Vec<u8>,
684		fallback_request: Option<(Vec<u8>, ProtocolName)>,
685		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
686		connect: IfDisconnected,
687	);
688}
689
690// Manual implementation to avoid extra boxing here
691impl<T> NetworkRequest for Arc<T>
692where
693	T: ?Sized,
694	T: NetworkRequest,
695{
696	fn request<'life0, 'async_trait>(
697		&'life0 self,
698		target: PeerId,
699		protocol: ProtocolName,
700		request: Vec<u8>,
701		fallback_request: Option<(Vec<u8>, ProtocolName)>,
702		connect: IfDisconnected,
703	) -> Pin<
704		Box<
705			dyn Future<Output = Result<(Vec<u8>, ProtocolName), RequestFailure>>
706				+ Send
707				+ 'async_trait,
708		>,
709	>
710	where
711		'life0: 'async_trait,
712		Self: 'async_trait,
713	{
714		T::request(self, target, protocol, request, fallback_request, connect)
715	}
716
717	fn start_request(
718		&self,
719		target: PeerId,
720		protocol: ProtocolName,
721		request: Vec<u8>,
722		fallback_request: Option<(Vec<u8>, ProtocolName)>,
723		tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
724		connect: IfDisconnected,
725	) {
726		T::start_request(self, target, protocol, request, fallback_request, tx, connect)
727	}
728}
729
/// Ability to announce blocks to the network.
pub trait NetworkBlock<BlockHash, BlockNumber> {
	/// Make sure an important block is propagated to peers.
	///
	/// In chain-based consensus it is often necessary to make sure non-best forks are at
	/// least temporarily synced. This function forces such an announcement.
	fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>);

	/// Inform the network service about a new best imported block.
	fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber);
}
741
742impl<T, BlockHash, BlockNumber> NetworkBlock<BlockHash, BlockNumber> for Arc<T>
743where
744	T: ?Sized,
745	T: NetworkBlock<BlockHash, BlockNumber>,
746{
747	fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>) {
748		T::announce_block(self, hash, data)
749	}
750
751	fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber) {
752		T::new_best_block_imported(self, hash, number)
753	}
754}
755
/// Substream acceptance result.
///
/// A fieldless verdict; it is `Copy` so it can be inspected, compared and forwarded without
/// being moved out of the holder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationResult {
	/// Accept inbound substream.
	Accept,

	/// Reject inbound substream.
	Reject,
}
765
/// Direction of a substream, seen from the local node.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
	/// The remote node opened the substream.
	Inbound,

	/// The local node opened the substream.
	Outbound,
}
775
776impl From<litep2p::protocol::notification::Direction> for Direction {
777	fn from(direction: litep2p::protocol::notification::Direction) -> Self {
778		match direction {
779			litep2p::protocol::notification::Direction::Inbound => Direction::Inbound,
780			litep2p::protocol::notification::Direction::Outbound => Direction::Outbound,
781		}
782	}
783}
784
785impl Direction {
786	/// Is the direction inbound.
787	pub fn is_inbound(&self) -> bool {
788		std::matches!(self, Direction::Inbound)
789	}
790}
791
792/// Events received by the protocol from `Notifications`.
793#[derive(Debug)]
794pub enum NotificationEvent {
795	/// Validate inbound substream.
796	ValidateInboundSubstream {
797		/// Peer ID.
798		peer: PeerId,
799
800		/// Received handshake.
801		handshake: Vec<u8>,
802
803		/// `oneshot::Sender` for sending validation result back to `Notifications`
804		result_tx: tokio::sync::oneshot::Sender<ValidationResult>,
805	},
806
807	/// Remote identified by `PeerId` opened a substream and sent `Handshake`.
808	/// Validate `Handshake` and report status (accept/reject) to `Notifications`.
809	NotificationStreamOpened {
810		/// Peer ID.
811		peer: PeerId,
812
813		/// Is the substream inbound or outbound.
814		direction: Direction,
815
816		/// Received handshake.
817		handshake: Vec<u8>,
818
819		/// Negotiated fallback.
820		negotiated_fallback: Option<ProtocolName>,
821	},
822
823	/// Substream was closed.
824	NotificationStreamClosed {
825		/// Peer Id.
826		peer: PeerId,
827	},
828
829	/// Notification was received from the substream.
830	NotificationReceived {
831		/// Peer ID.
832		peer: PeerId,
833
834		/// Received notification.
835		notification: Vec<u8>,
836	},
837}
838
839/// Notification service
840///
841/// Defines behaviors that both the protocol implementations and `Notifications` can expect from
842/// each other.
843///
844/// `Notifications` can send two different kinds of information to protocol:
845///  * substream-related information
846///  * notification-related information
847///
848/// When an unvalidated, inbound substream is received by `Notifications`, it sends the inbound
849/// stream information (peer ID, handshake) to protocol for validation. Protocol must then verify
850/// that the handshake is valid (and in the future that it has a slot it can allocate for the peer)
851/// and then report back the `ValidationResult` which is either `Accept` or `Reject`.
852///
853/// After the validation result has been received by `Notifications`, it prepares the
854/// substream for communication by initializing the necessary sinks and emits
855/// `NotificationStreamOpened` which informs the protocol that the remote peer is ready to receive
856/// notifications.
857///
858/// Two different flavors of sending options are provided:
859///  * synchronous sending ([`NotificationService::send_sync_notification()`])
860///  * asynchronous sending ([`NotificationService::send_async_notification()`])
861///
862/// The former is used by the protocols not ready to exercise backpressure and the latter by the
863/// protocols that can do it.
864///
865/// Both local and remote peer can close the substream at any time. Local peer can do so by calling
866/// [`NotificationService::close_substream()`] which instructs `Notifications` to close the
867/// substream. Remote closing the substream is indicated to the local peer by receiving
868/// [`NotificationEvent::NotificationStreamClosed`] event.
869///
870/// In case the protocol must update its handshake while it's operating (such as updating the best
871/// block information), it can do so by calling [`NotificationService::set_handshake()`]
872/// which instructs `Notifications` to update the handshake it stored during protocol
873/// initialization.
874///
875/// All peer events are multiplexed on the same incoming event stream from `Notifications` and thus
876/// each event carries a `PeerId` so the protocol knows whose information to update when receiving
877/// an event.
878#[async_trait::async_trait]
879pub trait NotificationService: Debug + Send {
880	/// Instruct `Notifications` to open a new substream for `peer`.
881	///
882	/// `dial_if_disconnected` informs `Notifications` whether to dial
883	// the peer if there is currently no active connection to it.
884	//
885	// NOTE: not offered by the current implementation
886	async fn open_substream(&mut self, peer: PeerId) -> Result<(), ()>;
887
888	/// Instruct `Notifications` to close substream for `peer`.
889	// NOTE: not offered by the current implementation
890	async fn close_substream(&mut self, peer: PeerId) -> Result<(), ()>;
891
892	/// Send synchronous `notification` to `peer`.
893	fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>);
894
895	/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
896	///
897	/// Returns an error if the peer doesn't exist.
898	async fn send_async_notification(
899		&mut self,
900		peer: &PeerId,
901		notification: Vec<u8>,
902	) -> Result<(), error::Error>;
903
904	/// Set handshake for the notification protocol replacing the old handshake.
905	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
906
907	/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
908	/// and returns an error if the channel is blocked.
909	///
910	/// Technically the function can return an error if the channel to `Notifications` is closed
911	/// but that doesn't happen under normal operation.
912	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;
913
914	/// Get next event from the `Notifications` event stream.
915	async fn next_event(&mut self) -> Option<NotificationEvent>;
916
917	/// Make a copy of the object so it can be shared between protocol components
918	/// who wish to have access to the same underlying notification protocol.
919	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()>;
920
921	/// Get protocol name of the `NotificationService`.
922	fn protocol(&self) -> &ProtocolName;
923
924	/// Get message sink of the peer.
925	fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>>;
926}
927
928/// Message sink for peers.
929///
930/// If protocol cannot use [`NotificationService`] to send notifications to peers and requires,
931/// e.g., notifications to be sent in another task, the protocol may acquire a [`MessageSink`]
932/// object for each peer by calling [`NotificationService::message_sink()`]. Calling this
933/// function returns an object which allows the protocol to send notifications to the remote peer.
934///
935/// Use of this API is discouraged as it's not as performant as sending notifications through
936/// [`NotificationService`] due to synchronization required to keep the underlying notification
937/// sink up to date with possible sink replacement events.
938#[async_trait::async_trait]
939pub trait MessageSink: Send + Sync {
940	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
941	fn send_sync_notification(&self, notification: Vec<u8>);
942
943	/// Send an asynchronous `notification` to to the peer associated with this [`MessageSink`],
944	/// allowing sender to exercise backpressure.
945	///
946	/// Returns an error if the peer does not exist.
947	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error>;
948}
949
/// Behaviour of a bandwidth sink: a source of inbound/outbound byte counters.
pub trait BandwidthSink: Send + Sync {
	/// Number of bytes received.
	fn total_inbound(&self) -> u64;

	/// Number of bytes sent.
	fn total_outbound(&self) -> u64;
}