1use crate::{
10 config::{IncomingRequest, MultiaddrWithPeerId, NotificationHandshake, Params, SetConfig},
11 error::{self, Error},
12 event::Event,
13 network_state::NetworkState,
14 request_responses::{IfDisconnected, RequestFailure},
15 service::{metrics::NotificationMetrics, signature::Signature, PeerStoreProvider},
16 types::ProtocolName,
17 ReputationChange,
18};
19
20use futures::{channel::oneshot, Stream};
21use soil_prometheus::Registry;
22
23use crate::common::{role::ObservedRole, ExHashT};
24pub use crate::types::{
25 kad::{Key as KademliaKey, Record},
26 multiaddr::Multiaddr,
27 PeerId,
28};
29use soil_client::client_api::BlockBackend;
30use subsoil::runtime::traits::Block as BlockT;
31
32use std::{
33 collections::HashSet,
34 fmt::Debug,
35 future::Future,
36 pin::Pin,
37 sync::Arc,
38 time::{Duration, Instant},
39};
40
41pub use libp2p::identity::SigningError;
42
/// Umbrella trait bundling the capabilities expected from a network service handle.
///
/// It declares no methods of its own: it only requires all of the listed
/// supertraits together with `Send + Sync + 'static`.
pub trait NetworkService:
    NetworkSigner
    + NetworkDHTProvider
    + NetworkStatusProvider
    + NetworkPeers
    + NetworkEventStream
    + NetworkStateInfo
    + NetworkRequest
    + Send
    + Sync
    + 'static
{
}
57
58impl<T> NetworkService for T where
59 T: NetworkSigner
60 + NetworkDHTProvider
61 + NetworkStatusProvider
62 + NetworkPeers
63 + NetworkEventStream
64 + NetworkStateInfo
65 + NetworkRequest
66 + Send
67 + Sync
68 + 'static
69{
70}
71
/// Read-only view onto a notification protocol configuration.
pub trait NotificationConfig: Debug {
    /// Returns a reference to the [`SetConfig`] held by this configuration.
    fn set_config(&self) -> &SetConfig;

    /// Returns a reference to the protocol name held by this configuration.
    fn protocol_name(&self) -> &ProtocolName;
}
80
/// Read-only view onto a request-response protocol configuration.
pub trait RequestResponseConfig: Debug {
    /// Returns a reference to the protocol name held by this configuration.
    fn protocol_name(&self) -> &ProtocolName;
}
86
/// Contract for a peer store that is accessed through a shared handle and
/// driven by awaiting [`PeerStore::run`].
#[async_trait::async_trait]
pub trait PeerStore {
    /// Returns a shared [`PeerStoreProvider`] handle for this store.
    fn handle(&self) -> Arc<dyn PeerStoreProvider>;

    /// Consumes the store and runs it. When (or whether) the returned future
    /// completes is decided by the implementor.
    async fn run(self);
}
96
/// Abstraction over a networking backend, generic over the block type `B`
/// and the hash type `H`.
///
/// The trait ties together the backend's configuration types, its service
/// handle and its peer store, and exposes constructors for each of them.
/// Only the signatures are visible here; behavior is defined by implementors.
#[async_trait::async_trait]
pub trait NetworkBackend<B: BlockT + 'static, H: ExHashT>: Send + 'static {
    /// Notification protocol configuration type produced by
    /// [`NetworkBackend::notification_config`].
    type NotificationProtocolConfig: NotificationConfig;

    /// Request-response protocol configuration type produced by
    /// [`NetworkBackend::request_response_config`].
    type RequestResponseProtocolConfig: RequestResponseConfig;

    /// Service handle type of the backend; it must implement
    /// [`NetworkService`] and be `Clone`.
    type NetworkService<Block, Hash>: NetworkService + Clone;

    /// Peer store type produced by [`NetworkBackend::peer_store`].
    type PeerStore: PeerStore;

    /// Bitswap configuration type returned alongside the future from
    /// [`NetworkBackend::bitswap_server`].
    type BitswapConfig;

    /// Builds the backend from `params`, or returns an [`Error`] on failure.
    fn new(params: Params<B, H, Self>) -> Result<Self, Error>
    where
        Self: Sized;

    /// Returns the backend's service handle as a shared trait object.
    fn network_service(&self) -> Arc<dyn NetworkService>;

    /// Creates the backend's peer store from a list of bootnode peer ids and
    /// an optional metrics registry.
    fn peer_store(bootnodes: Vec<PeerId>, metrics_registry: Option<Registry>) -> Self::PeerStore;

    /// Creates [`NotificationMetrics`], given an optional registry reference.
    fn register_notification_metrics(registry: Option<&Registry>) -> NotificationMetrics;

    /// Creates a Bitswap server for `client`.
    ///
    /// Returns a boxed future together with the matching
    /// [`NetworkBackend::BitswapConfig`]. Presumably the future is the server
    /// task that the caller has to poll; confirm against an implementation.
    fn bitswap_server(
        client: Arc<dyn BlockBackend<B> + Send + Sync>,
    ) -> (Pin<Box<dyn Future<Output = ()> + Send>>, Self::BitswapConfig);

    /// Creates a notification protocol configuration and the
    /// [`NotificationService`] handle that belongs to it.
    fn notification_config(
        protocol_name: ProtocolName,
        fallback_names: Vec<ProtocolName>,
        max_notification_size: u64,
        handshake: Option<NotificationHandshake>,
        set_config: SetConfig,
        metrics: NotificationMetrics,
        peerstore_handle: Arc<dyn PeerStoreProvider>,
    ) -> (Self::NotificationProtocolConfig, Box<dyn NotificationService>);

    /// Creates a request-response protocol configuration.
    ///
    /// `inbound_queue`, when provided, is the sender half of a channel of
    /// [`IncomingRequest`]s; presumably inbound requests are pushed into it.
    fn request_response_config(
        protocol_name: ProtocolName,
        fallback_names: Vec<ProtocolName>,
        max_request_size: u64,
        max_response_size: u64,
        request_timeout: Duration,
        inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
    ) -> Self::RequestResponseProtocolConfig;

    /// Consumes the backend and runs it. When (or whether) the returned
    /// future completes is decided by the implementor.
    async fn run(mut self);
}
162
/// Signing and signature verification on behalf of the network layer.
pub trait NetworkSigner {
    /// Signs `msg`, returning a [`Signature`] or a [`SigningError`].
    ///
    /// As the name suggests, the key used is presumably the node's own
    /// network identity.
    fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError>;

    /// Checks `signature` over `message` for `peer_id` using `public_key`.
    ///
    /// Returns `Ok(true)` / `Ok(false)` for the verification outcome, or an
    /// error description as `Err(String)`. The expected encoding of
    /// `public_key` is not visible from this declaration.
    ///
    /// NOTE(review): the `&Vec<u8>` parameters would be more idiomatic as
    /// `&[u8]`, but changing them would break existing implementors.
    fn verify(
        &self,
        peer_id: crate::types::PeerId,
        public_key: &Vec<u8>,
        signature: &Vec<u8>,
        message: &Vec<u8>,
    ) -> Result<bool, String>;
}
181
182impl<T> NetworkSigner for Arc<T>
183where
184 T: ?Sized,
185 T: NetworkSigner,
186{
187 fn sign_with_local_identity(&self, msg: Vec<u8>) -> Result<Signature, SigningError> {
188 T::sign_with_local_identity(self, msg)
189 }
190
191 fn verify(
192 &self,
193 peer_id: crate::types::PeerId,
194 public_key: &Vec<u8>,
195 signature: &Vec<u8>,
196 message: &Vec<u8>,
197 ) -> Result<bool, String> {
198 T::verify(self, peer_id, public_key, signature, message)
199 }
200}
201
/// Operations against the DHT (keys and records are the Kademlia types
/// re-exported by this module).
///
/// None of the methods returns a value, so the outcome of an operation is not
/// reported through the call itself; presumably results arrive through some
/// other channel such as events. Confirm against an implementation.
pub trait NetworkDHTProvider {
    /// Starts a search for the peers closest to `target`.
    fn find_closest_peers(&self, target: PeerId);

    /// Starts a lookup of the value stored under `key`.
    fn get_value(&self, key: &KademliaKey);

    /// Starts storing `value` under `key`.
    fn put_value(&self, key: KademliaKey, value: Vec<u8>);

    /// Starts storing `record` on the given `peers`.
    ///
    /// `update_local_storage` presumably controls whether the local store is
    /// updated as well.
    fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool);

    /// Stores a record built from `key` and `value`, with an optional
    /// `publisher` and an optional expiry instant.
    fn store_record(
        &self,
        key: KademliaKey,
        value: Vec<u8>,
        publisher: Option<PeerId>,
        expires: Option<Instant>,
    );

    /// Starts providing `key`.
    fn start_providing(&self, key: KademliaKey);

    /// Stops providing `key`.
    fn stop_providing(&self, key: KademliaKey);

    /// Starts a lookup of the providers of `key`.
    fn get_providers(&self, key: KademliaKey);
}
236
237impl<T> NetworkDHTProvider for Arc<T>
238where
239 T: ?Sized,
240 T: NetworkDHTProvider,
241{
242 fn find_closest_peers(&self, target: PeerId) {
243 T::find_closest_peers(self, target)
244 }
245
246 fn get_value(&self, key: &KademliaKey) {
247 T::get_value(self, key)
248 }
249
250 fn put_value(&self, key: KademliaKey, value: Vec<u8>) {
251 T::put_value(self, key, value)
252 }
253
254 fn put_record_to(&self, record: Record, peers: HashSet<PeerId>, update_local_storage: bool) {
255 T::put_record_to(self, record, peers, update_local_storage)
256 }
257
258 fn store_record(
259 &self,
260 key: KademliaKey,
261 value: Vec<u8>,
262 publisher: Option<PeerId>,
263 expires: Option<Instant>,
264 ) {
265 T::store_record(self, key, value, publisher, expires)
266 }
267
268 fn start_providing(&self, key: KademliaKey) {
269 T::start_providing(self, key)
270 }
271
272 fn stop_providing(&self, key: KademliaKey) {
273 T::stop_providing(self, key)
274 }
275
276 fn get_providers(&self, key: KademliaKey) {
277 T::get_providers(self, key)
278 }
279}
280
/// Allows registering a fork sync request, generic over the block hash and
/// block number types.
pub trait NetworkSyncForkRequest<BlockHash, BlockNumber> {
    /// Registers a sync request for the fork identified by `hash` and
    /// `number`, naming `peers` as the peers concerned.
    ///
    /// How `peers` is used (for example, what an empty list means) is not
    /// visible from this declaration.
    fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber);
}
291
292impl<T, BlockHash, BlockNumber> NetworkSyncForkRequest<BlockHash, BlockNumber> for Arc<T>
293where
294 T: ?Sized,
295 T: NetworkSyncForkRequest<BlockHash, BlockNumber>,
296{
297 fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: BlockHash, number: BlockNumber) {
298 T::set_sync_fork_request(self, peers, hash, number)
299 }
300}
301
/// Snapshot of high-level network counters, as returned by
/// `NetworkStatusProvider::status`.
///
/// `Debug` is derived so the snapshot can be logged or inspected with `{:?}`.
#[derive(Clone, Debug)]
pub struct NetworkStatus {
    /// Number of currently connected peers.
    pub num_connected_peers: usize,
    /// Total number of bytes received (presumably cumulative; the reference
    /// point is not visible here).
    pub total_bytes_inbound: u64,
    /// Total number of bytes sent (presumably cumulative; the reference
    /// point is not visible here).
    pub total_bytes_outbound: u64,
}
312
/// Asynchronous access to status information about the network.
#[async_trait::async_trait]
pub trait NetworkStatusProvider {
    /// Resolves to a [`NetworkStatus`] snapshot, or to `Err(())` when none
    /// can be produced. The error carries no detail.
    async fn status(&self) -> Result<NetworkStatus, ()>;

    /// Resolves to a [`NetworkState`], or to `Err(())` when none can be
    /// produced. The error carries no detail.
    async fn network_state(&self) -> Result<NetworkState, ()>;
}
326
327impl<T> NetworkStatusProvider for Arc<T>
329where
330 T: ?Sized,
331 T: NetworkStatusProvider,
332{
333 fn status<'life0, 'async_trait>(
334 &'life0 self,
335 ) -> Pin<Box<dyn Future<Output = Result<NetworkStatus, ()>> + Send + 'async_trait>>
336 where
337 'life0: 'async_trait,
338 Self: 'async_trait,
339 {
340 T::status(self)
341 }
342
343 fn network_state<'life0, 'async_trait>(
344 &'life0 self,
345 ) -> Pin<Box<dyn Future<Output = Result<NetworkState, ()>> + Send + 'async_trait>>
346 where
347 'life0: 'async_trait,
348 Self: 'async_trait,
349 {
350 T::network_state(self)
351 }
352}
353
/// Peer management: authorization, reputation, reserved peers and
/// per-protocol reserved sets.
///
/// Only the signatures are visible here; the notes below describe what they
/// show and mark anything inferred from naming as an assumption.
#[async_trait::async_trait]
pub trait NetworkPeers {
    /// Replaces the set of authorized peers with `peers`.
    fn set_authorized_peers(&self, peers: HashSet<PeerId>);

    /// Toggles "authorized only" mode. The parameter is named
    /// `reserved_only`, so this presumably maps onto reserved-only mode;
    /// confirm against an implementation.
    fn set_authorized_only(&self, reserved_only: bool);

    /// Records `addr` as a known address of `peer_id`.
    fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr);

    /// Applies the reputation change `cost_benefit` to `peer_id`.
    fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange);

    /// Returns the reputation of `peer_id` as an `i32`.
    fn peer_reputation(&self, peer_id: &PeerId) -> i32;

    /// Disconnects `peer_id` on the given `protocol`.
    fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName);

    /// Allows connections from peers that are not reserved.
    fn accept_unreserved_peers(&self);

    /// Refuses connections from peers that are not reserved.
    fn deny_unreserved_peers(&self);

    /// Adds `peer` (address plus peer id) as a reserved peer.
    ///
    /// Returns an error description as `Err(String)` on failure; the failure
    /// conditions are defined by the implementor.
    fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>;

    /// Removes `peer_id` from the reserved peers.
    fn remove_reserved_peer(&self, peer_id: PeerId);

    /// Sets the reserved set of `protocol` to `peers`.
    ///
    /// Returns an error description as `Err(String)` on failure. Presumably
    /// each multiaddress has to contain a peer id, since no separate ids are
    /// passed; confirm against an implementation.
    fn set_reserved_peers(
        &self,
        protocol: ProtocolName,
        peers: HashSet<Multiaddr>,
    ) -> Result<(), String>;

    /// Adds `peers` to the reserved set of `protocol`.
    ///
    /// Returns an error description as `Err(String)` on failure.
    fn add_peers_to_reserved_set(
        &self,
        protocol: ProtocolName,
        peers: HashSet<Multiaddr>,
    ) -> Result<(), String>;

    /// Removes `peers` from the reserved set of `protocol`.
    ///
    /// Returns an error description as `Err(String)` on failure.
    fn remove_peers_from_reserved_set(
        &self,
        protocol: ProtocolName,
        peers: Vec<PeerId>,
    ) -> Result<(), String>;

    /// Returns a connected-peer count. Judging by the name this is the number
    /// of peers connected for syncing; confirm against an implementation.
    fn sync_num_connected(&self) -> usize;

    /// Determines the [`ObservedRole`] of `peer_id`, given the raw
    /// `handshake` bytes, or `None` when no role can be determined.
    fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole>;

    /// Resolves to the list of reserved peer ids, or to `Err(())` when it
    /// cannot be obtained. The error carries no detail.
    async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()>;
}
462
463#[async_trait::async_trait]
465impl<T> NetworkPeers for Arc<T>
466where
467 T: ?Sized,
468 T: NetworkPeers,
469{
470 fn set_authorized_peers(&self, peers: HashSet<PeerId>) {
471 T::set_authorized_peers(self, peers)
472 }
473
474 fn set_authorized_only(&self, reserved_only: bool) {
475 T::set_authorized_only(self, reserved_only)
476 }
477
478 fn add_known_address(&self, peer_id: PeerId, addr: Multiaddr) {
479 T::add_known_address(self, peer_id, addr)
480 }
481
482 fn report_peer(&self, peer_id: PeerId, cost_benefit: ReputationChange) {
483 T::report_peer(self, peer_id, cost_benefit)
484 }
485
486 fn peer_reputation(&self, peer_id: &PeerId) -> i32 {
487 T::peer_reputation(self, peer_id)
488 }
489
490 fn disconnect_peer(&self, peer_id: PeerId, protocol: ProtocolName) {
491 T::disconnect_peer(self, peer_id, protocol)
492 }
493
494 fn accept_unreserved_peers(&self) {
495 T::accept_unreserved_peers(self)
496 }
497
498 fn deny_unreserved_peers(&self) {
499 T::deny_unreserved_peers(self)
500 }
501
502 fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String> {
503 T::add_reserved_peer(self, peer)
504 }
505
506 fn remove_reserved_peer(&self, peer_id: PeerId) {
507 T::remove_reserved_peer(self, peer_id)
508 }
509
510 fn set_reserved_peers(
511 &self,
512 protocol: ProtocolName,
513 peers: HashSet<Multiaddr>,
514 ) -> Result<(), String> {
515 T::set_reserved_peers(self, protocol, peers)
516 }
517
518 fn add_peers_to_reserved_set(
519 &self,
520 protocol: ProtocolName,
521 peers: HashSet<Multiaddr>,
522 ) -> Result<(), String> {
523 T::add_peers_to_reserved_set(self, protocol, peers)
524 }
525
526 fn remove_peers_from_reserved_set(
527 &self,
528 protocol: ProtocolName,
529 peers: Vec<PeerId>,
530 ) -> Result<(), String> {
531 T::remove_peers_from_reserved_set(self, protocol, peers)
532 }
533
534 fn sync_num_connected(&self) -> usize {
535 T::sync_num_connected(self)
536 }
537
538 fn peer_role(&self, peer_id: PeerId, handshake: Vec<u8>) -> Option<ObservedRole> {
539 T::peer_role(self, peer_id, handshake)
540 }
541
542 fn reserved_peers<'life0, 'async_trait>(
543 &'life0 self,
544 ) -> Pin<Box<dyn Future<Output = Result<Vec<PeerId>, ()>> + Send + 'async_trait>>
545 where
546 'life0: 'async_trait,
547 Self: 'async_trait,
548 {
549 T::reserved_peers(self)
550 }
551}
552
/// Source of network [`Event`]s.
pub trait NetworkEventStream {
    /// Returns a boxed, `Send` stream of [`Event`]s.
    ///
    /// `name` is a static label for the stream; presumably it is used for
    /// diagnostics or metrics. Confirm against an implementation.
    fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>>;
}
566
567impl<T> NetworkEventStream for Arc<T>
568where
569 T: ?Sized,
570 T: NetworkEventStream,
571{
572 fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
573 T::event_stream(self, name)
574 }
575}
576
/// Basic information about the local node's network identity and addresses.
pub trait NetworkStateInfo {
    /// Returns the node's external addresses as an owned list.
    fn external_addresses(&self) -> Vec<Multiaddr>;

    /// Returns the node's listen addresses as an owned list.
    fn listen_addresses(&self) -> Vec<Multiaddr>;

    /// Returns the local [`PeerId`].
    fn local_peer_id(&self) -> PeerId;
}
588
589impl<T> NetworkStateInfo for Arc<T>
590where
591 T: ?Sized,
592 T: NetworkStateInfo,
593{
594 fn external_addresses(&self) -> Vec<Multiaddr> {
595 T::external_addresses(self)
596 }
597
598 fn listen_addresses(&self) -> Vec<Multiaddr> {
599 T::listen_addresses(self)
600 }
601
602 fn local_peer_id(&self) -> PeerId {
603 T::local_peer_id(self)
604 }
605}
606
/// Sending half obtained from [`NotificationSender::ready`].
pub trait NotificationSenderReady {
    /// Sends `notification`, or returns a [`NotificationSenderError`] when
    /// that is not possible.
    fn send(&mut self, notification: Vec<u8>) -> Result<(), NotificationSenderError>;
}
614
/// Handle from which a ready-to-send [`NotificationSenderReady`] is obtained
/// asynchronously.
#[async_trait::async_trait]
pub trait NotificationSender: Send + Sync + 'static {
    /// Resolves to a [`NotificationSenderReady`] that borrows from `self`,
    /// or to a [`NotificationSenderError`].
    ///
    /// Presumably the future completes once there is room to send; the exact
    /// readiness condition is defined by the implementor.
    async fn ready(&self)
        -> Result<Box<dyn NotificationSenderReady + '_>, NotificationSenderError>;
}
623
/// Errors reported by [`NotificationSender`] and [`NotificationSenderReady`].
#[derive(Debug, thiserror::Error)]
pub enum NotificationSenderError {
    /// The receiving side of the notification channel is closed.
    #[error("The notification receiver has been closed")]
    Closed,
    /// The protocol name is not registered.
    #[error("Protocol name hasn't been registered")]
    BadProtocol,
}
639
/// Sending requests to remote peers over a request-response protocol.
#[async_trait::async_trait]
pub trait NetworkRequest {
    /// Sends `request` to `target` on `protocol` and resolves to the outcome.
    ///
    /// On success the result holds the response bytes together with a
    /// [`ProtocolName`] (presumably the protocol that was actually used,
    /// which matters when `fallback_request` is given). On failure it holds a
    /// [`RequestFailure`].
    ///
    /// `fallback_request` is an optional alternative request paired with its
    /// protocol name. `connect` selects the [`IfDisconnected`] behavior.
    async fn request(
        &self,
        target: PeerId,
        protocol: ProtocolName,
        request: Vec<u8>,
        fallback_request: Option<(Vec<u8>, ProtocolName)>,
        connect: IfDisconnected,
    ) -> Result<(Vec<u8>, ProtocolName), RequestFailure>;

    /// Variant of [`NetworkRequest::request`] that returns immediately.
    ///
    /// Nothing is returned; the outcome has the same shape as the result of
    /// `request` and is presumably delivered through the `tx` oneshot sender.
    fn start_request(
        &self,
        target: PeerId,
        protocol: ProtocolName,
        request: Vec<u8>,
        fallback_request: Option<(Vec<u8>, ProtocolName)>,
        tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
        connect: IfDisconnected,
    );
}
689
690impl<T> NetworkRequest for Arc<T>
692where
693 T: ?Sized,
694 T: NetworkRequest,
695{
696 fn request<'life0, 'async_trait>(
697 &'life0 self,
698 target: PeerId,
699 protocol: ProtocolName,
700 request: Vec<u8>,
701 fallback_request: Option<(Vec<u8>, ProtocolName)>,
702 connect: IfDisconnected,
703 ) -> Pin<
704 Box<
705 dyn Future<Output = Result<(Vec<u8>, ProtocolName), RequestFailure>>
706 + Send
707 + 'async_trait,
708 >,
709 >
710 where
711 'life0: 'async_trait,
712 Self: 'async_trait,
713 {
714 T::request(self, target, protocol, request, fallback_request, connect)
715 }
716
717 fn start_request(
718 &self,
719 target: PeerId,
720 protocol: ProtocolName,
721 request: Vec<u8>,
722 fallback_request: Option<(Vec<u8>, ProtocolName)>,
723 tx: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
724 connect: IfDisconnected,
725 ) {
726 T::start_request(self, target, protocol, request, fallback_request, tx, connect)
727 }
728}
729
/// Block-related notifications passed to the network, generic over the block
/// hash and block number types.
pub trait NetworkBlock<BlockHash, BlockNumber> {
    /// Announces the block identified by `hash`, optionally attaching `data`.
    ///
    /// The meaning of `data` is not visible from this declaration.
    fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>);

    /// Informs the network that a new best block with the given `hash` and
    /// `number` was imported.
    fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber);
}
741
742impl<T, BlockHash, BlockNumber> NetworkBlock<BlockHash, BlockNumber> for Arc<T>
743where
744 T: ?Sized,
745 T: NetworkBlock<BlockHash, BlockNumber>,
746{
747 fn announce_block(&self, hash: BlockHash, data: Option<Vec<u8>>) {
748 T::announce_block(self, hash, data)
749 }
750
751 fn new_best_block_imported(&self, hash: BlockHash, number: BlockNumber) {
752 T::new_best_block_imported(self, hash, number)
753 }
754}
755
/// Verdict on an inbound substream, sent back through the `result_tx` field
/// of `NotificationEvent::ValidateInboundSubstream`.
///
/// The enum has no fields, so it derives `Clone` and `Copy` (like the sibling
/// `Direction` enum) and a verdict can be stored, compared and sent without
/// being rebuilt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationResult {
    /// Accept the inbound substream.
    Accept,

    /// Reject the inbound substream.
    Reject,
}
765
/// Direction of a substream.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
    /// Inbound direction.
    Inbound,

    /// Outbound direction.
    Outbound,
}
775
776impl From<litep2p::protocol::notification::Direction> for Direction {
777 fn from(direction: litep2p::protocol::notification::Direction) -> Self {
778 match direction {
779 litep2p::protocol::notification::Direction::Inbound => Direction::Inbound,
780 litep2p::protocol::notification::Direction::Outbound => Direction::Outbound,
781 }
782 }
783}
784
785impl Direction {
786 pub fn is_inbound(&self) -> bool {
788 std::matches!(self, Direction::Inbound)
789 }
790}
791
/// Events yielded by [`NotificationService::next_event`].
#[derive(Debug)]
pub enum NotificationEvent {
    /// An inbound substream awaits a verdict.
    ValidateInboundSubstream {
        /// Peer the substream belongs to.
        peer: PeerId,

        /// Raw handshake bytes received from the peer.
        handshake: Vec<u8>,

        /// Channel on which the [`ValidationResult`] verdict is sent back.
        result_tx: tokio::sync::oneshot::Sender<ValidationResult>,
    },

    /// A notification stream with a peer was opened.
    NotificationStreamOpened {
        /// Peer the stream belongs to.
        peer: PeerId,

        /// Direction of the stream.
        direction: Direction,

        /// Raw handshake bytes received from the peer.
        handshake: Vec<u8>,

        /// The fallback protocol name that was negotiated, if any.
        negotiated_fallback: Option<ProtocolName>,
    },

    /// A notification stream with a peer was closed.
    NotificationStreamClosed {
        /// Peer the stream belonged to.
        peer: PeerId,
    },

    /// A notification was received from a peer.
    NotificationReceived {
        /// Peer that sent the notification.
        peer: PeerId,

        /// Raw notification bytes.
        notification: Vec<u8>,
    },
}
838
/// Handle through which a notification protocol is driven: opening and
/// closing substreams, sending notifications, updating the handshake and
/// receiving [`NotificationEvent`]s.
///
/// Only the signatures are visible here; behavior is defined by implementors.
#[async_trait::async_trait]
pub trait NotificationService: Debug + Send {
    /// Asks for a substream to `peer` to be opened. Resolves to `Err(())` on
    /// failure; the error carries no detail.
    async fn open_substream(&mut self, peer: PeerId) -> Result<(), ()>;

    /// Asks for the substream to `peer` to be closed. Resolves to `Err(())`
    /// on failure; the error carries no detail.
    async fn close_substream(&mut self, peer: PeerId) -> Result<(), ()>;

    /// Sends `notification` to `peer` without awaiting. Nothing is returned,
    /// so a failure to send is not reported to the caller.
    fn send_sync_notification(&mut self, peer: &PeerId, notification: Vec<u8>);

    /// Sends `notification` to `peer`, resolving to an [`error::Error`] on
    /// failure.
    ///
    /// Presumably this waits for capacity instead of dropping the
    /// notification; confirm against an implementation.
    async fn send_async_notification(
        &mut self,
        peer: &PeerId,
        notification: Vec<u8>,
    ) -> Result<(), error::Error>;

    /// Replaces the handshake with `handshake`. Resolves to `Err(())` on
    /// failure.
    async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;

    /// Non-async counterpart of [`NotificationService::set_handshake`].
    /// Returns `Err(())` when the handshake cannot be set right away.
    fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()>;

    /// Resolves to the next [`NotificationEvent`], or to `None`; presumably
    /// `None` means that no further events will arrive.
    async fn next_event(&mut self) -> Option<NotificationEvent>;

    /// Creates another boxed [`NotificationService`] from this one, or
    /// returns `Err(())`.
    ///
    /// This is a trait method that is fallible and takes `&mut self`; it is
    /// not an implementation of `std::clone::Clone`.
    fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()>;

    /// Returns the name of the protocol this service belongs to.
    fn protocol(&self) -> &ProtocolName;

    /// Returns a [`MessageSink`] for `peer`, or `None` when there is none.
    fn message_sink(&self, peer: &PeerId) -> Option<Box<dyn MessageSink>>;
}
927
/// Per-peer sink for sending notifications, obtained from
/// [`NotificationService::message_sink`].
#[async_trait::async_trait]
pub trait MessageSink: Send + Sync {
    /// Sends `notification` without awaiting. Nothing is returned, so a
    /// failure to send is not reported to the caller.
    fn send_sync_notification(&self, notification: Vec<u8>);

    /// Sends `notification`, resolving to an [`error::Error`] on failure.
    async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error>;
}
949
/// Read access to bandwidth counters.
pub trait BandwidthSink: Send + Sync {
    /// Returns the total inbound byte count.
    fn total_inbound(&self) -> u64;

    /// Returns the total outbound byte count.
    fn total_outbound(&self) -> u64;
}