1use crate::{
44 log,
45 platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49 borrow::ToOwned as _,
50 boxed::Box,
51 collections::BTreeMap,
52 format,
53 string::{String, ToString as _},
54 sync::Arc,
55 vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65 header,
66 informant::{BytesDisplay, HashDisplay},
67 libp2p::{
68 connection,
69 multiaddr::{self, Multiaddr},
70 peer_id,
71 },
72 network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78 ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
81#[derive(Debug, Clone)]
83pub struct StatementProtocolConfig {
84 max_seen_statements: NonZeroUsize,
86 false_positive_rate: f64,
87 bloom_seed: u128,
88 affinity_update_interval: Duration,
89}
90
91impl StatementProtocolConfig {
92 pub fn new(
93 max_seen_statements: NonZeroUsize,
94 false_positive_rate: f64,
95 bloom_seed: u128,
96 affinity_update_interval: Duration,
97 ) -> Self {
98 assert!(
99 false_positive_rate.is_finite()
100 && false_positive_rate > 0.0
101 && false_positive_rate < 1.0
102 );
103 assert!(!affinity_update_interval.is_zero());
104 StatementProtocolConfig {
105 max_seen_statements,
106 false_positive_rate,
107 bloom_seed,
108 affinity_update_interval,
109 }
110 }
111
112 pub fn max_seen_statements(&self) -> NonZeroUsize {
113 self.max_seen_statements
114 }
115
116 pub fn false_positive_rate(&self) -> f64 {
117 self.false_positive_rate
118 }
119
120 pub fn bloom_seed(&self) -> u128 {
121 self.bloom_seed
122 }
123
124 pub fn affinity_update_interval(&self) -> Duration {
125 self.affinity_update_interval
126 }
127}
128
129mod tasks;
130
131pub struct Config<TPlat> {
133 pub platform: TPlat,
135
136 pub identify_agent_version: String,
138
139 pub chains_capacity: usize,
141
142 pub connections_open_pool_size: u32,
146
147 pub connections_open_pool_restore_delay: Duration,
152}
153
154pub struct ConfigChain {
160 pub log_name: String,
162
163 pub num_out_slots: usize,
166
167 pub genesis_block_hash: [u8; 32],
173
174 pub best_block: (u64, [u8; 32]),
177
178 pub fork_id: Option<String>,
181
182 pub block_number_bytes: usize,
184
185 pub grandpa_protocol_finalized_block_height: Option<u64>,
188
189 pub statement_protocol_config: Option<StatementProtocolConfig>,
191}
192
193pub struct NetworkService<TPlat: PlatformRef> {
194 messages_tx: async_channel::Sender<ToBackground<TPlat>>,
196
197 platform: TPlat,
199}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202 pub fn new(config: Config<TPlat>) -> Arc<Self> {
204 let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206 let network = service::ChainNetwork::new(service::Config {
207 chains_capacity: config.chains_capacity,
208 connections_capacity: 32,
209 handshake_timeout: Duration::from_secs(8),
210 randomness_seed: {
211 let mut seed = [0; 32];
212 config.platform.fill_random_bytes(&mut seed);
213 seed
214 },
215 });
216
217 let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
219 let task = Box::pin(background_task(BackgroundTask {
220 randomness: rand_chacha::ChaCha20Rng::from_seed({
221 let mut seed = [0; 32];
222 config.platform.fill_random_bytes(&mut seed);
223 seed
224 }),
225 identify_agent_version: config.identify_agent_version,
226 tasks_messages_tx,
227 tasks_messages_rx: Box::pin(tasks_messages_rx),
228 peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
229 basic_peering_strategy::Config {
230 randomness_seed: {
231 let mut seed = [0; 32];
232 config.platform.fill_random_bytes(&mut seed);
233 seed
234 },
235 peers_capacity: 50, chains_capacity: config.chains_capacity,
237 },
238 ),
239 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
240 bitswap_peering_strategy::Config {
241 randomness_seed: {
242 let mut seed = [0; 32];
243 config.platform.fill_random_bytes(&mut seed);
244 seed
245 },
246 peers_capacity: 50, },
248 ),
249 network,
250 connections_open_pool_size: config.connections_open_pool_size,
251 connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
252 num_recent_connection_opening: 0,
253 next_recent_connection_restore: None,
254 platform: config.platform.clone(),
255 open_gossip_links: BTreeMap::new(),
256 v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
257 current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
258 event_pending_send: None,
259 event_senders: either::Left(Vec::new()),
260 pending_new_subscriptions: Vec::new(),
261 bitswap_event_pending_send: None,
262 bitswap_event_senders: either::Left(Vec::new()),
263 pending_new_bitswap_subscriptions: Vec::new(),
264 important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
265 main_messages_rx: Box::pin(main_messages_rx),
266 messages_rx: stream::SelectAll::new(),
267 blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
268 grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
269 storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
270 call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271 child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272 chains_by_next_discovery: BTreeMap::new(),
273 }));
274
275 config.platform.spawn_task("network-service".into(), {
276 let platform = config.platform.clone();
277 async move {
278 task.await;
279 log!(&platform, Debug, "network", "shutdown");
280 }
281 });
282
283 Arc::new(NetworkService {
284 messages_tx: main_messages_tx,
285 platform: config.platform,
286 })
287 }
288
289 pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
295 let (messages_tx, messages_rx) = async_channel::bounded(32);
296
297 self.platform.spawn_task("add-chain-message-send".into(), {
299 let config = service::ChainConfig {
300 grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
301 |commit_finalized_height| service::GrandpaState {
302 commit_finalized_height,
303 round_number: 1,
304 set_id: 0,
305 },
306 ),
307 enable_statement_protocol: config.statement_protocol_config.is_some(),
308 fork_id: config.fork_id.clone(),
309 block_number_bytes: config.block_number_bytes,
310 best_hash: config.best_block.1,
311 best_number: config.best_block.0,
312 genesis_hash: config.genesis_block_hash,
313 role: Role::Light,
314 allow_inbound_block_requests: false,
315 user_data: Chain {
316 log_name: config.log_name,
317 block_number_bytes: config.block_number_bytes,
318 num_out_slots: config.num_out_slots,
319 num_references: NonZero::<usize>::new(1).unwrap(),
320 next_discovery_period: Duration::from_secs(2),
321 next_discovery_when: self.platform.now(),
322 },
323 };
324
325 let messages_tx = self.messages_tx.clone();
326 async move {
327 let _ = messages_tx
328 .send(ToBackground::AddChain {
329 messages_rx,
330 config,
331 })
332 .await;
333 }
334 });
335
336 Arc::new(NetworkServiceChain {
337 _keep_alive_messages_tx: self.messages_tx.clone(),
338 messages_tx,
339 marker: core::marker::PhantomData,
340 })
341 }
342}
343
344pub struct NetworkServiceChain<TPlat: PlatformRef> {
345 _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
348
349 messages_tx: async_channel::Sender<ToBackgroundChain>,
351
352 marker: core::marker::PhantomData<TPlat>,
354}
355
356#[derive(Debug, Copy, Clone, PartialEq, Eq)]
358pub enum BanSeverity {
359 Low,
360 High,
361}
362
363impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
364 pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
381 let (tx, rx) = async_channel::bounded(128);
382
383 self.messages_tx
384 .send(ToBackgroundChain::Subscribe { sender: tx })
385 .await
386 .unwrap();
387
388 rx
389 }
390
391 pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
406 let (tx, rx) = async_channel::bounded(128);
407
408 self.messages_tx
409 .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
410 .await
411 .unwrap();
412
413 rx
414 }
415
416 pub async fn ban_and_disconnect(
428 &self,
429 peer_id: PeerId,
430 severity: BanSeverity,
431 reason: &'static str,
432 ) {
433 let _ = self
434 .messages_tx
435 .send(ToBackgroundChain::DisconnectAndBan {
436 peer_id,
437 severity,
438 reason,
439 })
440 .await;
441 }
442
443 pub async fn blocks_request(
446 self: Arc<Self>,
447 target: PeerId,
448 config: codec::BlocksRequestConfig,
449 timeout: Duration,
450 ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
451 let (tx, rx) = oneshot::channel();
452
453 self.messages_tx
454 .send(ToBackgroundChain::StartBlocksRequest {
455 target: target.clone(),
456 config,
457 timeout,
458 result: tx,
459 })
460 .await
461 .unwrap();
462
463 rx.await.unwrap()
464 }
465
466 pub async fn grandpa_warp_sync_request(
469 self: Arc<Self>,
470 target: PeerId,
471 begin_hash: [u8; 32],
472 timeout: Duration,
473 ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
474 let (tx, rx) = oneshot::channel();
475
476 self.messages_tx
477 .send(ToBackgroundChain::StartWarpSyncRequest {
478 target: target.clone(),
479 begin_hash,
480 timeout,
481 result: tx,
482 })
483 .await
484 .unwrap();
485
486 rx.await.unwrap()
487 }
488
489 pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
490 self.messages_tx
491 .send(ToBackgroundChain::SetLocalBestBlock {
492 best_hash,
493 best_number,
494 })
495 .await
496 .unwrap();
497 }
498
499 pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
500 self.messages_tx
501 .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
502 .await
503 .unwrap();
504 }
505
506 pub async fn storage_proof_request(
509 self: Arc<Self>,
510 target: PeerId, config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
512 timeout: Duration,
513 ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
514 let (tx, rx) = oneshot::channel();
515
516 self.messages_tx
517 .send(ToBackgroundChain::StartStorageProofRequest {
518 target: target.clone(),
519 config: codec::StorageProofRequestConfig {
520 block_hash: config.block_hash,
521 keys: config
522 .keys
523 .map(|key| key.as_ref().to_vec()) .collect::<Vec<_>>()
525 .into_iter(),
526 },
527 timeout,
528 result: tx,
529 })
530 .await
531 .unwrap();
532
533 rx.await.unwrap()
534 }
535
536 pub async fn call_proof_request(
541 self: Arc<Self>,
542 target: PeerId, config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
544 timeout: Duration,
545 ) -> Result<EncodedMerkleProof, CallProofRequestError> {
546 let (tx, rx) = oneshot::channel();
547
548 self.messages_tx
549 .send(ToBackgroundChain::StartCallProofRequest {
550 target: target.clone(),
551 config: codec::CallProofRequestConfig {
552 block_hash: config.block_hash,
553 method: config.method.into_owned().into(),
554 parameter_vectored: config
555 .parameter_vectored
556 .map(|v| v.as_ref().to_vec()) .collect::<Vec<_>>()
558 .into_iter(),
559 },
560 timeout,
561 result: tx,
562 })
563 .await
564 .unwrap();
565
566 rx.await.unwrap()
567 }
568
569 pub async fn child_storage_proof_request(
571 self: Arc<Self>,
572 target: PeerId,
573 config: codec::ChildStorageProofRequestConfig<
574 impl AsRef<[u8]> + Clone,
575 impl Iterator<Item = impl AsRef<[u8]> + Clone>,
576 >,
577 timeout: Duration,
578 ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
579 let (tx, rx) = oneshot::channel();
580
581 self.messages_tx
582 .send(ToBackgroundChain::StartChildStorageProofRequest {
583 target: target.clone(),
584 config: ChildStorageProofRequestConfigOwned {
585 block_hash: config.block_hash,
586 child_trie: config.child_trie.as_ref().to_vec(),
587 keys: config
588 .keys
589 .map(|key| key.as_ref().to_vec())
590 .collect::<Vec<_>>(),
591 },
592 timeout,
593 result: tx,
594 })
595 .await
596 .unwrap();
597
598 rx.await.unwrap()
599 }
600
601 pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
611 let (tx, rx) = oneshot::channel();
612
613 self.messages_tx
614 .send(ToBackgroundChain::AnnounceTransaction {
615 transaction: transaction.to_vec(), result: tx,
617 })
618 .await
619 .unwrap();
620
621 rx.await.unwrap()
622 }
623
624 pub async fn send_block_announce(
626 self: Arc<Self>,
627 target: &PeerId,
628 scale_encoded_header: &[u8],
629 is_best: bool,
630 ) -> Result<(), QueueNotificationError> {
631 let (tx, rx) = oneshot::channel();
632
633 self.messages_tx
634 .send(ToBackgroundChain::SendBlockAnnounce {
635 target: target.clone(), scale_encoded_header: scale_encoded_header.to_vec(), is_best,
638 result: tx,
639 })
640 .await
641 .unwrap();
642
643 rx.await.unwrap()
644 }
645
646 pub async fn send_bitswap_message(
648 &self,
649 target: PeerId,
650 message: Vec<u8>,
651 ) -> Result<(), SendBitswapMessageError> {
652 let (tx, rx) = oneshot::channel();
653
654 self.messages_tx
655 .send(ToBackgroundChain::SendBitswapMessage {
656 target,
657 message,
658 result: tx,
659 })
660 .await
661 .unwrap();
662
663 rx.await.unwrap()
664 }
665
666 pub async fn broadcast_bitswap_message(
674 &self,
675 message: Vec<u8>,
676 ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
677 let (tx, rx) = oneshot::channel();
678
679 self.messages_tx
680 .send(ToBackgroundChain::BroadcastBitswapMessage {
681 message,
682 result: tx,
683 })
684 .await
685 .unwrap();
686
687 rx.await.unwrap()
688 }
689
690 pub async fn broadcast_statement(
692 self: Arc<Self>,
693 statement: Vec<u8>,
694 ) -> BroadcastStatementResult {
695 let (tx, rx) = oneshot::channel();
696
697 self.messages_tx
698 .send(ToBackgroundChain::BroadcastStatement {
699 statement,
700 result: tx,
701 })
702 .await
703 .unwrap();
704
705 rx.await.unwrap()
706 }
707
708 pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
709 self.messages_tx
710 .send(ToBackgroundChain::UpdateTopicAffinity { filter })
711 .await
712 .unwrap();
713 }
714
715 pub async fn discover(
721 &self,
722 list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
723 important_nodes: bool,
724 ) {
725 self.messages_tx
726 .send(ToBackgroundChain::Discover {
727 list: list
729 .into_iter()
730 .map(|(peer_id, addrs)| {
731 (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
732 })
733 .collect::<Vec<_>>()
734 .into_iter(),
735 important_nodes,
736 })
737 .await
738 .unwrap();
739 }
740
741 pub async fn discovered_nodes(
748 &self,
749 ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
750 let (tx, rx) = oneshot::channel();
751
752 self.messages_tx
753 .send(ToBackgroundChain::DiscoveredNodes { result: tx })
754 .await
755 .unwrap();
756
757 rx.await
758 .unwrap()
759 .into_iter()
760 .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
761 }
762
763 pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
766 let (tx, rx) = oneshot::channel();
767 self.messages_tx
768 .send(ToBackgroundChain::PeersList { result: tx })
769 .await
770 .unwrap();
771 rx.await.unwrap().into_iter()
772 }
773}
774
775#[derive(Debug, Clone)]
776pub struct BroadcastStatementResult {
777 pub sent: usize,
778 pub total: usize,
779}
780
781#[derive(Debug, Clone)]
783pub enum Event {
784 Connected {
785 peer_id: PeerId,
786 role: Role,
787 best_block_number: u64,
788 best_block_hash: [u8; 32],
789 },
790 Disconnected {
791 peer_id: PeerId,
792 },
793 BlockAnnounce {
794 peer_id: PeerId,
795 announce: service::EncodedBlockAnnounce,
796 },
797 GrandpaNeighborPacket {
798 peer_id: PeerId,
799 finalized_block_height: u64,
800 },
801 GrandpaCommitMessage {
803 peer_id: PeerId,
804 message: service::EncodedGrandpaCommitMessage,
805 },
806 StatementsNotification {
808 peer_id: PeerId,
809 statements: Vec<([u8; 32], codec::Statement)>,
810 },
811}
812
813#[derive(Debug, Clone)]
817pub enum BitswapEvent {
818 BitswapMessage {
819 peer_id: PeerId,
820 message: service::EncodedBitswapMessage,
821 },
822}
823
824#[derive(Debug, derive_more::Display, derive_more::Error)]
826pub enum BlocksRequestError {
827 NoConnection,
829 #[display("{_0}")]
831 Request(service::BlocksRequestError),
832}
833
834#[derive(Debug, derive_more::Display, derive_more::Error)]
836pub enum WarpSyncRequestError {
837 NoConnection,
839 #[display("{_0}")]
841 Request(service::GrandpaWarpSyncRequestError),
842}
843
844#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
846pub enum StorageProofRequestError {
847 NoConnection,
849 RequestTooLarge,
851 #[display("{_0}")]
853 Request(service::StorageProofRequestError),
854}
855
856#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
858pub enum CallProofRequestError {
859 NoConnection,
861 RequestTooLarge,
863 #[display("{_0}")]
865 Request(service::CallProofRequestError),
866}
867
868impl CallProofRequestError {
869 pub fn is_network_problem(&self) -> bool {
872 match self {
873 CallProofRequestError::Request(err) => err.is_network_problem(),
874 CallProofRequestError::RequestTooLarge => false,
875 CallProofRequestError::NoConnection => true,
876 }
877 }
878}
879
880#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
882pub enum ChildStorageProofRequestError {
883 NoConnection,
885 RequestTooLarge,
887 #[display("{_0}")]
889 Request(service::StorageProofRequestError),
890}
891
892impl ChildStorageProofRequestError {
893 pub fn is_network_problem(&self) -> bool {
896 match self {
897 ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
898 ChildStorageProofRequestError::RequestTooLarge => false,
899 ChildStorageProofRequestError::NoConnection => true,
900 }
901 }
902}
903
904struct ChildStorageProofRequestConfigOwned {
906 block_hash: [u8; 32],
907 child_trie: Vec<u8>,
908 keys: Vec<Vec<u8>>,
909}
910
911enum ToBackground<TPlat: PlatformRef> {
912 AddChain {
913 messages_rx: async_channel::Receiver<ToBackgroundChain>,
914 config: service::ChainConfig<Chain<TPlat>>,
915 },
916}
917
918enum ToBackgroundChain {
919 RemoveChain,
920 Subscribe {
921 sender: async_channel::Sender<Event>,
922 },
923 SubscribeBitswap {
924 sender: async_channel::Sender<BitswapEvent>,
925 },
926 DisconnectAndBan {
927 peer_id: PeerId,
928 severity: BanSeverity,
929 reason: &'static str,
930 },
931 StartBlocksRequest {
933 target: PeerId, config: codec::BlocksRequestConfig,
935 timeout: Duration,
936 result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
937 },
938 StartWarpSyncRequest {
940 target: PeerId,
941 begin_hash: [u8; 32],
942 timeout: Duration,
943 result:
944 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
945 },
946 StartStorageProofRequest {
948 target: PeerId,
949 config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
950 timeout: Duration,
951 result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
952 },
953 StartCallProofRequest {
955 target: PeerId, config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
957 timeout: Duration,
958 result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
959 },
960 StartChildStorageProofRequest {
962 target: PeerId,
963 config: ChildStorageProofRequestConfigOwned,
964 timeout: Duration,
965 result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
966 },
967 SetLocalBestBlock {
968 best_hash: [u8; 32],
969 best_number: u64,
970 },
971 SetLocalGrandpaState {
972 grandpa_state: service::GrandpaState,
973 },
974 AnnounceTransaction {
975 transaction: Vec<u8>,
976 result: oneshot::Sender<Vec<PeerId>>,
977 },
978 SendBlockAnnounce {
979 target: PeerId,
980 scale_encoded_header: Vec<u8>,
981 is_best: bool,
982 result: oneshot::Sender<Result<(), QueueNotificationError>>,
983 },
984 SendBitswapMessage {
985 target: PeerId,
986 message: Vec<u8>,
987 result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
988 },
989 BroadcastBitswapMessage {
990 message: Vec<u8>,
991 result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
992 },
993 BroadcastStatement {
994 statement: Vec<u8>,
995 result: oneshot::Sender<BroadcastStatementResult>,
996 },
997 UpdateTopicAffinity {
998 filter: AffinityFilter,
999 },
1000 Discover {
1001 list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
1002 important_nodes: bool,
1003 },
1004 DiscoveredNodes {
1005 result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
1006 },
1007 PeersList {
1008 result: oneshot::Sender<Vec<PeerId>>,
1009 },
1010}
1011
1012struct BackgroundTask<TPlat: PlatformRef> {
1013 platform: TPlat,
1015
1016 randomness: rand_chacha::ChaCha20Rng,
1018
1019 identify_agent_version: String,
1021
1022 tasks_messages_tx:
1024 async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
1025
1026 tasks_messages_rx: Pin<
1028 Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
1029 >,
1030
1031 network: service::ChainNetwork<
1033 Chain<TPlat>,
1034 async_channel::Sender<service::CoordinatorToConnection>,
1035 TPlat::Instant,
1036 >,
1037
1038 peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
1040
1041 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,
1043
1044 connections_open_pool_size: u32,
1046
1047 connections_open_pool_restore_delay: Duration,
1049
1050 num_recent_connection_opening: u32,
1054
1055 next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
1057
1058 open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
1061
1062 v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1064
1065 current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,
1067
1068 important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
1071
1072 event_pending_send: Option<(ChainId, Event)>,
1074
1075 bitswap_event_pending_send: Option<BitswapEvent>,
1077
1078 event_senders: either::Either<
1084 Vec<(ChainId, async_channel::Sender<Event>)>,
1085 Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
1086 >,
1087
1088 pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
1091
1092 bitswap_event_senders: either::Either<
1102 Vec<async_channel::Sender<BitswapEvent>>,
1103 Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
1104 >,
1105
1106 pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,
1110
1111 main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
1112
1113 messages_rx:
1114 stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
1115
1116 blocks_requests: HashMap<
1117 service::SubstreamId,
1118 oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
1119 fnv::FnvBuildHasher,
1120 >,
1121
1122 grandpa_warp_sync_requests: HashMap<
1123 service::SubstreamId,
1124 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
1125 fnv::FnvBuildHasher,
1126 >,
1127
1128 storage_proof_requests: HashMap<
1129 service::SubstreamId,
1130 oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
1131 fnv::FnvBuildHasher,
1132 >,
1133
1134 call_proof_requests: HashMap<
1135 service::SubstreamId,
1136 oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
1137 fnv::FnvBuildHasher,
1138 >,
1139
1140 child_storage_proof_requests: HashMap<
1141 service::SubstreamId,
1142 oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
1143 fnv::FnvBuildHasher,
1144 >,
1145
1146 chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
1148}
1149
1150struct Chain<TPlat: PlatformRef> {
1151 log_name: String,
1152
1153 num_references: NonZero<usize>,
1155
1156 block_number_bytes: usize,
1159
1160 num_out_slots: usize,
1162
1163 next_discovery_when: TPlat::Instant,
1165
1166 next_discovery_period: Duration,
1169}
1170
1171#[derive(Clone)]
1172struct OpenGossipLinkState {
1173 role: Role,
1174 best_block_number: u64,
1175 best_block_hash: [u8; 32],
1176 finalized_block_height: Option<u64>,
1178}
1179
1180async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1181 loop {
1182 futures_lite::future::yield_now().await;
1184
1185 enum WakeUpReason<TPlat: PlatformRef> {
1186 ForegroundClosed,
1187 Message(ToBackground<TPlat>),
1188 MessageForChain(ChainId, ToBackgroundChain),
1189 NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1190 CanAssignSlot(PeerId, ChainId),
1191 CanAssignBitswapSlot(PeerId),
1192 NextRecentConnectionRestore,
1193 CanStartConnect(PeerId),
1194 CanOpenGossip(PeerId, ChainId),
1195 CanOpenBitswap(PeerId),
1196 MessageFromConnection {
1197 connection_id: service::ConnectionId,
1198 message: service::ConnectionToCoordinator,
1199 },
1200 MessageToConnection {
1201 connection_id: service::ConnectionId,
1202 message: service::CoordinatorToConnection,
1203 },
1204 EventSendersReady,
1205 BitswapEventSendersReady,
1206 StartDiscovery(ChainId),
1207 }
1208
1209 let wake_up_reason = {
1210 let message_received = async {
1211 task.main_messages_rx
1212 .next()
1213 .await
1214 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1215 };
1216 let message_for_chain_received = async {
1217 let Some((chain_id, message)) = task.messages_rx.next().await else {
1222 future::pending().await
1223 };
1224 WakeUpReason::MessageForChain(chain_id, message)
1225 };
1226 let message_from_task_received = async {
1227 let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1228 WakeUpReason::MessageFromConnection {
1229 connection_id,
1230 message,
1231 }
1232 };
1233 let service_event = async {
1234 if let Some(event) = (task.event_pending_send.is_none()
1235 && task.bitswap_event_pending_send.is_none()
1236 && task.pending_new_subscriptions.is_empty()
1237 && task.pending_new_bitswap_subscriptions.is_empty())
1238 .then(|| task.network.next_event())
1239 .flatten()
1240 {
1241 WakeUpReason::NetworkEvent(event)
1242 } else if let Some(start_connect) = {
1243 let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1244 .then(|| {
1245 task.network
1246 .unconnected_desired()
1247 .choose(&mut task.randomness)
1248 .cloned()
1249 })
1250 .flatten();
1251 x
1252 } {
1253 WakeUpReason::CanStartConnect(start_connect)
1254 } else if let Some((peer_id, chain_id)) = {
1255 let x = task
1256 .network
1257 .connected_unopened_gossip_desired()
1258 .choose(&mut task.randomness)
1259 .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1260 x
1261 } {
1262 WakeUpReason::CanOpenGossip(peer_id, chain_id)
1263 } else if let Some(peer_id) = {
1264 let x = task
1265 .network
1266 .connected_unopened_bitswap_desired()
1267 .choose(&mut task.randomness)
1268 .cloned();
1269 x
1270 } {
1271 WakeUpReason::CanOpenBitswap(peer_id)
1272 } else if let Some((connection_id, message)) =
1273 task.network.pull_message_to_connection()
1274 {
1275 WakeUpReason::MessageToConnection {
1276 connection_id,
1277 message,
1278 }
1279 } else {
1280 'search: loop {
1281 let mut earlier_unban = None;
1282
1283 for chain_id in task.network.chains().collect::<Vec<_>>() {
1284 if task.network.gossip_desired_num(
1285 chain_id,
1286 service::GossipKind::ConsensusTransactions,
1287 ) >= task.network[chain_id].num_out_slots
1288 {
1289 continue;
1290 }
1291
1292 match task
1293 .peering_strategy
1294 .pick_assignable_peer(&chain_id, &task.platform.now())
1295 {
1296 basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1297 break 'search WakeUpReason::CanAssignSlot(
1298 peer_id.clone(),
1299 chain_id,
1300 );
1301 }
1302 basic_peering_strategy::AssignablePeer::AllPeersBanned {
1303 next_unban,
1304 } => {
1305 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1306 earlier_unban = Some(next_unban.clone());
1307 }
1308 }
1309 basic_peering_strategy::AssignablePeer::NoPeer => continue,
1310 }
1311 }
1312
1313 match task
1314 .bitswap_peering_strategy
1315 .pick_assignable_peer(&task.platform.now())
1316 {
1317 bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1318 break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1319 }
1320 bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1321 next_unban,
1322 } => {
1323 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1324 earlier_unban = Some(next_unban.clone());
1325 }
1326 }
1327 bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1328 }
1329
1330 if let Some(earlier_unban) = earlier_unban {
1331 task.platform.sleep_until(earlier_unban).await;
1332 } else {
1333 future::pending::<()>().await;
1334 }
1335 }
1336 }
1337 };
1338 let next_recent_connection_restore = async {
1339 if task.num_recent_connection_opening != 0
1340 && task.next_recent_connection_restore.is_none()
1341 {
1342 task.next_recent_connection_restore = Some(Box::pin(
1343 task.platform
1344 .sleep(task.connections_open_pool_restore_delay),
1345 ));
1346 }
1347 if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1348 delay.await;
1349 task.next_recent_connection_restore = None;
1350 WakeUpReason::NextRecentConnectionRestore
1351 } else {
1352 future::pending().await
1353 }
1354 };
1355 let finished_sending_event = async {
1356 if let either::Right(event_sending_future) = &mut task.event_senders {
1357 let event_senders = event_sending_future.await;
1358 task.event_senders = either::Left(event_senders);
1359 WakeUpReason::EventSendersReady
1360 } else if task.event_pending_send.is_some()
1361 || !task.pending_new_subscriptions.is_empty()
1362 {
1363 WakeUpReason::EventSendersReady
1364 } else {
1365 future::pending().await
1366 }
1367 };
1368 let finished_sending_bitswap_event = async {
1369 if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1370 {
1371 let bitswap_event_senders = bitswap_event_sending_future.await;
1372 task.bitswap_event_senders = either::Left(bitswap_event_senders);
1373 WakeUpReason::BitswapEventSendersReady
1374 } else if task.bitswap_event_pending_send.is_some()
1375 || !task.pending_new_bitswap_subscriptions.is_empty()
1376 {
1377 WakeUpReason::BitswapEventSendersReady
1378 } else {
1379 future::pending().await
1380 }
1381 };
1382 let start_discovery = async {
1383 let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1384 future::pending().await
1385 };
1386 next_discovery.get_mut().await;
1387 let ((_, chain_id), _) = next_discovery.remove_entry();
1388 WakeUpReason::StartDiscovery(chain_id)
1389 };
1390
1391 message_for_chain_received
1392 .or(message_received)
1393 .or(message_from_task_received)
1394 .or(service_event)
1395 .or(next_recent_connection_restore)
1396 .or(finished_sending_event)
1397 .or(finished_sending_bitswap_event)
1398 .or(start_discovery)
1399 .await
1400 };
1401
1402 match wake_up_reason {
1403 WakeUpReason::ForegroundClosed => {
1404 return;
1406 }
1407 WakeUpReason::Message(ToBackground::AddChain {
1408 messages_rx,
1409 config,
1410 }) => {
1411 let chain_id = match task.network.add_chain(config) {
1413 Ok(id) => id,
1414 Err(service::AddChainError::Duplicate { existing_identical }) => {
1415 task.network[existing_identical].num_references = task.network
1416 [existing_identical]
1417 .num_references
1418 .checked_add(1)
1419 .unwrap();
1420 existing_identical
1421 }
1422 };
1423
1424 task.chains_by_next_discovery.insert(
1425 (task.network[chain_id].next_discovery_when.clone(), chain_id),
1426 Box::pin(
1427 task.platform
1428 .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1429 ),
1430 );
1431
1432 task.messages_rx
1433 .push(Box::pin(
1434 messages_rx
1435 .map(move |msg| (chain_id, msg))
1436 .chain(stream::once(future::ready((
1437 chain_id,
1438 ToBackgroundChain::RemoveChain,
1439 )))),
1440 ) as Pin<Box<_>>);
1441
1442 log!(
1443 &task.platform,
1444 Debug,
1445 "network",
1446 "chain-added",
1447 id = task.network[chain_id].log_name
1448 );
1449 }
1450 WakeUpReason::EventSendersReady => {
1451 let either::Left(event_senders) = &mut task.event_senders else {
1455 unreachable!()
1456 };
1457
1458 if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1459 task.event_pending_send.take()
1460 {
1461 let mut event_senders = mem::take(event_senders);
1462 task.event_senders = either::Right(Box::pin(async move {
1463 for index in (0..event_senders.len()).rev() {
1466 let (event_sender_chain_id, event_sender) =
1467 event_senders.swap_remove(index);
1468 if event_sender_chain_id == event_to_dispatch_chain_id {
1469 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1470 continue;
1471 }
1472 }
1473 event_senders.push((event_sender_chain_id, event_sender));
1474 }
1475 event_senders
1476 }));
1477 } else if !task.pending_new_subscriptions.is_empty() {
1478 let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1479 let mut event_senders = mem::take(event_senders);
1480 let open_gossip_links = task.open_gossip_links.clone();
1482 task.event_senders = either::Right(Box::pin(async move {
1483 for (chain_id, new_subscription) in pending_new_subscriptions {
1484 for ((link_chain_id, peer_id), state) in &open_gossip_links {
1485 if *link_chain_id != chain_id {
1487 continue;
1488 }
1489
1490 let _ = new_subscription
1491 .send(Event::Connected {
1492 peer_id: peer_id.clone(),
1493 role: state.role,
1494 best_block_number: state.best_block_number,
1495 best_block_hash: state.best_block_hash,
1496 })
1497 .await;
1498
1499 if let Some(finalized_block_height) = state.finalized_block_height {
1500 let _ = new_subscription
1501 .send(Event::GrandpaNeighborPacket {
1502 peer_id: peer_id.clone(),
1503 finalized_block_height,
1504 })
1505 .await;
1506 }
1507 }
1508
1509 event_senders.push((chain_id, new_subscription));
1510 }
1511
1512 event_senders
1513 }));
1514 }
1515 }
1516 WakeUpReason::BitswapEventSendersReady => {
1517 let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1519 unreachable!()
1520 };
1521
1522 if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1523 let mut bitswap_event_senders = mem::take(bitswap_event_senders);
1524 task.bitswap_event_senders = either::Right(Box::pin(async move {
1525 for index in (0..bitswap_event_senders.len()).rev() {
1528 let event_sender = bitswap_event_senders.swap_remove(index);
1529 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1530 continue;
1531 }
1532 bitswap_event_senders.push(event_sender);
1533 }
1534 bitswap_event_senders
1535 }));
1536 } else if !task.pending_new_bitswap_subscriptions.is_empty() {
1537 bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1538 }
1539 }
1540 WakeUpReason::MessageFromConnection {
1541 connection_id,
1542 message,
1543 } => {
1544 task.network
1545 .inject_connection_message(connection_id, message);
1546 }
1547 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1548 if let Some(new_ref) =
1549 NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1550 {
1551 task.network[chain_id].num_references = new_ref;
1552 continue;
1553 }
1554
1555 for peer_id in task
1556 .network
1557 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1558 .cloned()
1559 .collect::<Vec<_>>()
1560 {
1561 task.network
1562 .gossip_close(
1563 chain_id,
1564 &peer_id,
1565 service::GossipKind::ConsensusTransactions,
1566 )
1567 .unwrap();
1568
1569 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1570 debug_assert!(_was_in.is_some());
1571 }
1572
1573 let _was_in = task
1574 .chains_by_next_discovery
1575 .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1576 debug_assert!(_was_in.is_some());
1577
1578 log!(
1579 &task.platform,
1580 Debug,
1581 "network",
1582 "chain-removed",
1583 id = task.network[chain_id].log_name
1584 );
1585 task.v2_statement_peers.remove(&chain_id);
1586 task.current_affinity_filter.remove(&chain_id);
1587 task.network.remove_chain(chain_id).unwrap();
1588 task.peering_strategy.remove_chain_peers(&chain_id);
1589 }
1590 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1591 task.pending_new_subscriptions.push((chain_id, sender));
1592 }
1593 WakeUpReason::MessageForChain(
1594 _chain_id,
1595 ToBackgroundChain::SubscribeBitswap { sender },
1596 ) => {
1597 task.pending_new_bitswap_subscriptions.push(sender);
1598 }
1599 WakeUpReason::MessageForChain(
1600 chain_id,
1601 ToBackgroundChain::DisconnectAndBan {
1602 peer_id,
1603 severity,
1604 reason,
1605 },
1606 ) => {
1607 let ban_duration = Duration::from_secs(match severity {
1608 BanSeverity::Low => 10,
1609 BanSeverity::High => 40,
1610 });
1611
1612 let had_slot = matches!(
1613 task.peering_strategy.unassign_slot_and_ban(
1614 &chain_id,
1615 &peer_id,
1616 task.platform.now() + ban_duration,
1617 ),
1618 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1619 );
1620
1621 if had_slot {
1622 log!(
1623 &task.platform,
1624 Debug,
1625 "network",
1626 "slot-unassigned",
1627 chain = &task.network[chain_id].log_name,
1628 peer_id,
1629 ?ban_duration,
1630 reason = "user-ban",
1631 user_reason = reason
1632 );
1633 task.network.gossip_remove_desired(
1634 chain_id,
1635 &peer_id,
1636 service::GossipKind::ConsensusTransactions,
1637 );
1638 }
1639
1640 if task.network.gossip_is_connected(
1641 chain_id,
1642 &peer_id,
1643 service::GossipKind::ConsensusTransactions,
1644 ) {
1645 let _closed_result = task.network.gossip_close(
1646 chain_id,
1647 &peer_id,
1648 service::GossipKind::ConsensusTransactions,
1649 );
1650 debug_assert!(_closed_result.is_ok());
1651
1652 log!(
1653 &task.platform,
1654 Debug,
1655 "network",
1656 "gossip-closed",
1657 chain = &task.network[chain_id].log_name,
1658 peer_id,
1659 );
1660
1661 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1662 debug_assert!(_was_in.is_some());
1663
1664 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1665 peers.remove(&peer_id);
1666 }
1667
1668 debug_assert!(task.event_pending_send.is_none());
1669 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1670 }
1671 }
1672 WakeUpReason::MessageForChain(
1673 chain_id,
1674 ToBackgroundChain::StartBlocksRequest {
1675 target,
1676 config,
1677 timeout,
1678 result,
1679 },
1680 ) => {
1681 match &config.start {
1682 codec::BlocksRequestConfigStart::Hash(hash) => {
1683 log!(
1684 &task.platform,
1685 Debug,
1686 "network",
1687 "blocks-request-started",
1688 chain = task.network[chain_id].log_name, target,
1689 start = HashDisplay(hash),
1690 num = config.desired_count.get(),
1691 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1692 header = ?config.fields.header, body = ?config.fields.body,
1693 justifications = ?config.fields.justifications
1694 );
1695 }
1696 codec::BlocksRequestConfigStart::Number(number) => {
1697 log!(
1698 &task.platform,
1699 Debug,
1700 "network",
1701 "blocks-request-started",
1702 chain = task.network[chain_id].log_name, target, start = number,
1703 num = config.desired_count.get(),
1704 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1705 header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1706 );
1707 }
1708 }
1709
1710 match task
1711 .network
1712 .start_blocks_request(&target, chain_id, config.clone(), timeout)
1713 {
1714 Ok(substream_id) => {
1715 task.blocks_requests.insert(substream_id, result);
1716 }
1717 Err(service::StartRequestError::NoConnection) => {
1718 log!(
1719 &task.platform,
1720 Debug,
1721 "network",
1722 "blocks-request-error",
1723 chain = task.network[chain_id].log_name,
1724 target,
1725 error = "NoConnection"
1726 );
1727 let _ = result.send(Err(BlocksRequestError::NoConnection));
1728 }
1729 }
1730 }
1731 WakeUpReason::MessageForChain(
1732 chain_id,
1733 ToBackgroundChain::StartWarpSyncRequest {
1734 target,
1735 begin_hash,
1736 timeout,
1737 result,
1738 },
1739 ) => {
1740 log!(
1741 &task.platform,
1742 Debug,
1743 "network",
1744 "warp-sync-request-started",
1745 chain = task.network[chain_id].log_name,
1746 target,
1747 start = HashDisplay(&begin_hash)
1748 );
1749
1750 match task
1751 .network
1752 .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1753 {
1754 Ok(substream_id) => {
1755 task.grandpa_warp_sync_requests.insert(substream_id, result);
1756 }
1757 Err(service::StartRequestError::NoConnection) => {
1758 log!(
1759 &task.platform,
1760 Debug,
1761 "network",
1762 "warp-sync-request-error",
1763 chain = task.network[chain_id].log_name,
1764 target,
1765 error = "NoConnection"
1766 );
1767 let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1768 }
1769 }
1770 }
1771 WakeUpReason::MessageForChain(
1772 chain_id,
1773 ToBackgroundChain::StartStorageProofRequest {
1774 target,
1775 config,
1776 timeout,
1777 result,
1778 },
1779 ) => {
1780 log!(
1781 &task.platform,
1782 Debug,
1783 "network",
1784 "storage-proof-request-started",
1785 chain = task.network[chain_id].log_name,
1786 target,
1787 block_hash = HashDisplay(&config.block_hash)
1788 );
1789
1790 match task.network.start_storage_proof_request(
1791 &target,
1792 chain_id,
1793 config.clone(),
1794 timeout,
1795 ) {
1796 Ok(substream_id) => {
1797 task.storage_proof_requests.insert(substream_id, result);
1798 }
1799 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1800 log!(
1801 &task.platform,
1802 Debug,
1803 "network",
1804 "storage-proof-request-error",
1805 chain = task.network[chain_id].log_name,
1806 target,
1807 error = "NoConnection"
1808 );
1809 let _ = result.send(Err(StorageProofRequestError::NoConnection));
1810 }
1811 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1812 log!(
1813 &task.platform,
1814 Debug,
1815 "network",
1816 "storage-proof-request-error",
1817 chain = task.network[chain_id].log_name,
1818 target,
1819 error = "RequestTooLarge"
1820 );
1821 let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1822 }
1823 };
1824 }
1825 WakeUpReason::MessageForChain(
1826 chain_id,
1827 ToBackgroundChain::StartCallProofRequest {
1828 target,
1829 config,
1830 timeout,
1831 result,
1832 },
1833 ) => {
1834 log!(
1835 &task.platform,
1836 Debug,
1837 "network",
1838 "call-proof-request-started",
1839 chain = task.network[chain_id].log_name,
1840 target,
1841 block_hash = HashDisplay(&config.block_hash),
1842 function = config.method
1843 );
1844 match task.network.start_call_proof_request(
1847 &target,
1848 chain_id,
1849 config.clone(),
1850 timeout,
1851 ) {
1852 Ok(substream_id) => {
1853 task.call_proof_requests.insert(substream_id, result);
1854 }
1855 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1856 log!(
1857 &task.platform,
1858 Debug,
1859 "network",
1860 "call-proof-request-error",
1861 chain = task.network[chain_id].log_name,
1862 target,
1863 error = "NoConnection"
1864 );
1865 let _ = result.send(Err(CallProofRequestError::NoConnection));
1866 }
1867 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1868 log!(
1869 &task.platform,
1870 Debug,
1871 "network",
1872 "call-proof-request-error",
1873 chain = task.network[chain_id].log_name,
1874 target,
1875 error = "RequestTooLarge"
1876 );
1877 let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1878 }
1879 };
1880 }
1881 WakeUpReason::MessageForChain(
1882 chain_id,
1883 ToBackgroundChain::StartChildStorageProofRequest {
1884 target,
1885 config,
1886 timeout,
1887 result,
1888 },
1889 ) => {
1890 log!(
1891 &task.platform,
1892 Debug,
1893 "network",
1894 "child-storage-proof-request-started",
1895 chain = task.network[chain_id].log_name,
1896 target,
1897 block_hash = HashDisplay(&config.block_hash)
1898 );
1899
1900 match task.network.start_child_storage_proof_request(
1901 &target,
1902 chain_id,
1903 codec::ChildStorageProofRequestConfig {
1904 block_hash: config.block_hash,
1905 child_trie: &config.child_trie,
1906 keys: config.keys.iter().map(|k| k.as_slice()),
1907 },
1908 timeout,
1909 ) {
1910 Ok(substream_id) => {
1911 task.child_storage_proof_requests
1912 .insert(substream_id, result);
1913 }
1914 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1915 log!(
1916 &task.platform,
1917 Debug,
1918 "network",
1919 "child-storage-proof-request-error",
1920 chain = task.network[chain_id].log_name,
1921 target,
1922 error = "NoConnection"
1923 );
1924 let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1925 }
1926 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1927 log!(
1928 &task.platform,
1929 Debug,
1930 "network",
1931 "child-storage-proof-request-error",
1932 chain = task.network[chain_id].log_name,
1933 target,
1934 error = "RequestTooLarge"
1935 );
1936 let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1937 }
1938 };
1939 }
1940 WakeUpReason::MessageForChain(
1941 chain_id,
1942 ToBackgroundChain::SetLocalBestBlock {
1943 best_hash,
1944 best_number,
1945 },
1946 ) => {
1947 task.network
1948 .set_chain_local_best_block(chain_id, best_hash, best_number);
1949 }
1950 WakeUpReason::MessageForChain(
1951 chain_id,
1952 ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1953 ) => {
1954 log!(
1955 &task.platform,
1956 Debug,
1957 "network",
1958 "local-grandpa-state-announced",
1959 chain = task.network[chain_id].log_name,
1960 set_id = grandpa_state.set_id,
1961 commit_finalized_height = grandpa_state.commit_finalized_height,
1962 );
1963
1964 task.network
1967 .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1968 }
1969 WakeUpReason::MessageForChain(
1970 chain_id,
1971 ToBackgroundChain::AnnounceTransaction {
1972 transaction,
1973 result,
1974 },
1975 ) => {
1976 let peers_to_send = task
1979 .network
1980 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1981 .cloned()
1982 .collect::<Vec<_>>();
1983
1984 let mut peers_sent = Vec::with_capacity(peers_to_send.len());
1985 let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
1986 for peer in &peers_to_send {
1987 match task
1988 .network
1989 .gossip_send_transaction(peer, chain_id, &transaction)
1990 {
1991 Ok(()) => peers_sent.push(peer.to_base58()),
1992 Err(QueueNotificationError::QueueFull) => {
1993 peers_queue_full.push(peer.to_base58())
1994 }
1995 Err(QueueNotificationError::NoConnection) => unreachable!(),
1996 }
1997 }
1998
1999 log!(
2000 &task.platform,
2001 Debug,
2002 "network",
2003 "transaction-announced",
2004 chain = task.network[chain_id].log_name,
2005 transaction =
2006 hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2007 size = transaction.len(),
2008 peers_sent = peers_sent.join(", "),
2009 peers_queue_full = peers_queue_full.join(", "),
2010 );
2011
2012 let _ = result.send(peers_to_send);
2013 }
2014 WakeUpReason::MessageForChain(
2015 chain_id,
2016 ToBackgroundChain::SendBlockAnnounce {
2017 target,
2018 scale_encoded_header,
2019 is_best,
2020 result,
2021 },
2022 ) => {
2023 let _ = result.send(task.network.gossip_send_block_announce(
2025 &target,
2026 chain_id,
2027 &scale_encoded_header,
2028 is_best,
2029 ));
2030 }
2031 WakeUpReason::MessageForChain(
2032 _chain_id,
2033 ToBackgroundChain::SendBitswapMessage {
2034 target,
2035 message,
2036 result,
2037 },
2038 ) => {
2039 let _ = result.send(task.network.bitswap_send_message(&target, message));
2040 }
2041 WakeUpReason::MessageForChain(
2042 _chain_id,
2043 ToBackgroundChain::BroadcastBitswapMessage { message, result },
2044 ) => {
2045 let peers = task
2046 .network
2047 .established_bitswap_desired()
2048 .cloned()
2049 .collect::<Vec<_>>();
2050 let results = peers
2051 .iter()
2052 .map(|peer| {
2053 (
2054 peer,
2055 task.network.bitswap_send_message(peer, message.clone()),
2056 )
2057 })
2058 .collect::<Vec<_>>(); let succeeded_peers = results
2061 .iter()
2062 .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2063 .collect::<Vec<_>>();
2064
2065 let r = if !succeeded_peers.is_empty() {
2067 Ok(succeeded_peers)
2068 } else if results
2069 .iter()
2070 .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2071 {
2072 Err(SendBitswapMessageError::QueueFull)
2075 } else {
2076 Err(SendBitswapMessageError::NoConnection)
2079 };
2080
2081 let _ = result.send(r);
2082 }
2083 WakeUpReason::MessageForChain(
2084 chain_id,
2085 ToBackgroundChain::BroadcastStatement { statement, result },
2086 ) => {
2087 let peers_to_send = task
2088 .network
2089 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2090 .cloned()
2091 .collect::<Vec<_>>();
2092
2093 let total = peers_to_send.len();
2094 let mut sent = 0;
2095 for peer in &peers_to_send {
2096 if task
2097 .network
2098 .gossip_send_statement(peer, chain_id, statement.clone())
2099 .is_ok()
2100 {
2101 sent += 1;
2102 }
2103 }
2104
2105 log!(
2106 &task.platform,
2107 Debug,
2108 "network",
2109 "statement-broadcast",
2110 chain = task.network[chain_id].log_name,
2111 sent,
2112 total,
2113 );
2114
2115 let _ = result.send(BroadcastStatementResult { sent, total });
2116 }
2117 WakeUpReason::MessageForChain(
2118 chain_id,
2119 ToBackgroundChain::UpdateTopicAffinity { filter },
2120 ) => {
2121 task.current_affinity_filter
2122 .insert(chain_id, filter.clone());
2123 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2124 let mut to_remove = Vec::new();
2125 for peer_id in peers.iter() {
2126 if let Err(
2127 SendTopicAffinityError::NoConnection
2128 | SendTopicAffinityError::ProtocolV1,
2129 ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2130 {
2131 to_remove.push(peer_id.clone());
2132 }
2133 }
2134 for peer_id in &to_remove {
2135 peers.remove(peer_id);
2136 }
2137 }
2138 }
2139 WakeUpReason::MessageForChain(
2140 chain_id,
2141 ToBackgroundChain::Discover {
2142 list,
2143 important_nodes,
2144 },
2145 ) => {
2146 for (peer_id, addrs) in list {
2147 if important_nodes {
2148 task.important_nodes.insert(peer_id.clone());
2149 }
2150
2151 task.peering_strategy
2154 .insert_chain_peer(chain_id, peer_id.clone(), 30); for addr in addrs {
2157 let _ =
2158 task.peering_strategy
2159 .insert_address(&peer_id, addr.into_bytes(), 10);
2160 }
2162 }
2163 }
2164 WakeUpReason::MessageForChain(
2165 chain_id,
2166 ToBackgroundChain::DiscoveredNodes { result },
2167 ) => {
2168 let _ = result.send(
2170 task.peering_strategy
2171 .chain_peers_unordered(&chain_id)
2172 .map(|peer_id| {
2173 let addrs = task
2174 .peering_strategy
2175 .peer_addresses(peer_id)
2176 .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2177 .collect::<Vec<_>>();
2178 (peer_id.clone(), addrs)
2179 })
2180 .collect::<Vec<_>>(),
2181 );
2182 }
2183 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
2184 let _ = result.send(
2185 task.network
2186 .gossip_connected_peers(
2187 chain_id,
2188 service::GossipKind::ConsensusTransactions,
2189 )
2190 .cloned()
2191 .collect(),
2192 );
2193 }
2194 WakeUpReason::StartDiscovery(chain_id) => {
2195 let chain = &mut task.network[chain_id];
2197 chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
2198 chain.next_discovery_period =
2199 cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
2200 task.chains_by_next_discovery.insert(
2201 (chain.next_discovery_when.clone(), chain_id),
2202 Box::pin(
2203 task.platform
2204 .sleep(task.network[chain_id].next_discovery_period),
2205 ),
2206 );
2207
2208 let random_peer_id = {
2209 let mut pub_key = [0; 32];
2210 rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
2211 PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
2212 };
2213
2214 let target = task
2216 .network
2217 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2218 .next()
2219 .cloned();
2220
2221 if let Some(target) = target {
2222 match task.network.start_kademlia_find_node_request(
2223 &target,
2224 chain_id,
2225 &random_peer_id,
2226 Duration::from_secs(20),
2227 ) {
2228 Ok(_) => {}
2229 Err(service::StartRequestError::NoConnection) => unreachable!(),
2230 };
2231
2232 log!(
2233 &task.platform,
2234 Debug,
2235 "network",
2236 "discovery-find-node-started",
2237 chain = &task.network[chain_id].log_name,
2238 request_target = target,
2239 requested_peer_id = random_peer_id
2240 );
2241 } else {
2242 log!(
2243 &task.platform,
2244 Debug,
2245 "network",
2246 "discovery-skipped-no-peer",
2247 chain = &task.network[chain_id].log_name
2248 );
2249 }
2250 }
2251 WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2252 peer_id,
2253 expected_peer_id,
2254 id,
2255 }) => {
2256 let remote_addr =
2257 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2259 {
2260 log!(
2261 &task.platform,
2262 Debug,
2263 "network",
2264 "handshake-finished-peer-id-mismatch",
2265 remote_addr,
2266 expected_peer_id,
2267 actual_peer_id = peer_id
2268 );
2269
2270 let _was_in = task
2271 .peering_strategy
2272 .decrease_address_connections_and_remove_if_zero(
2273 expected_peer_id,
2274 remote_addr.as_ref(),
2275 );
2276 debug_assert!(_was_in.is_ok());
2277 let _ = task.peering_strategy.increase_address_connections(
2278 &peer_id,
2279 remote_addr.into_bytes().to_vec(),
2280 10,
2281 );
2282 } else {
2283 log!(
2284 &task.platform,
2285 Debug,
2286 "network",
2287 "handshake-finished",
2288 remote_addr,
2289 peer_id
2290 );
2291 }
2292
2293 task.bitswap_peering_strategy
2294 .increase_peer_connections(&peer_id);
2295 }
2296 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2297 expected_peer_id: Some(_),
2298 ..
2299 })
2300 | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
2301 let (address, peer_id, handshake_finished) = match wake_up_reason {
2302 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2303 address,
2304 expected_peer_id: Some(peer_id),
2305 ..
2306 }) => (address, peer_id, false),
2307 WakeUpReason::NetworkEvent(service::Event::Disconnected {
2308 address,
2309 peer_id,
2310 ..
2311 }) => (address, peer_id, true),
2312 _ => unreachable!(),
2313 };
2314
2315 task.peering_strategy
2316 .decrease_address_connections(&peer_id, &address)
2317 .unwrap();
2318 let address = Multiaddr::from_bytes(address).unwrap();
2319 log!(
2320 &task.platform,
2321 Debug,
2322 "network",
2323 "connection-shutdown",
2324 peer_id,
2325 address,
2326 ?handshake_finished
2327 );
2328
2329 let ban_duration = Duration::from_secs(5);
2337 task.network.gossip_remove_desired_all(
2338 &peer_id,
2339 service::GossipKind::ConsensusTransactions,
2340 );
2341 for (&chain_id, what_happened) in task
2342 .peering_strategy
2343 .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2344 {
2345 if matches!(
2346 what_happened,
2347 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2348 ) {
2349 log!(
2350 &task.platform,
2351 Debug,
2352 "network",
2353 "slot-unassigned",
2354 chain = &task.network[chain_id].log_name,
2355 peer_id,
2356 ?ban_duration,
2357 reason = "pre-handshake-disconnect"
2359 );
2360 }
2361 }
2362
2363 if handshake_finished {
2364 task.network.bitswap_remove_desired(&peer_id);
2365 let what_happened = task
2366 .bitswap_peering_strategy
2367 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2368 if matches!(
2369 what_happened,
2370 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2371 ) {
2372 log!(
2373 &task.platform,
2374 Debug,
2375 "network",
2376 "bitswap-slot-unassigned",
2377 peer_id,
2378 ?ban_duration,
2379 reason = "disconnect",
2380 );
2381 }
2382 let _ = task
2383 .bitswap_peering_strategy
2384 .decrease_peer_connections(&peer_id);
2385 }
2386 }
2387 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2388 expected_peer_id: None,
2389 ..
2390 }) => {
2391 debug_assert!(false);
2394 }
2395 WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
2396 id,
2397 peer_id,
2398 ping_time,
2399 }) => {
2400 let remote_addr =
2401 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); log!(
2403 &task.platform,
2404 Debug,
2405 "network",
2406 "pong",
2407 peer_id,
2408 remote_addr,
2409 ?ping_time
2410 );
2411 }
2412 WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2413 chain_id,
2414 peer_id,
2415 announce,
2416 }) => {
2417 log!(
2418 &task.platform,
2419 Debug,
2420 "network",
2421 "block-announce-received",
2422 chain = &task.network[chain_id].log_name,
2423 peer_id,
2424 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2425 announce.decode().scale_encoded_header
2426 )),
2427 is_best = announce.decode().is_best
2428 );
2429
2430 let decoded_announce = announce.decode();
2431 if decoded_announce.is_best {
2432 let link = task
2433 .open_gossip_links
2434 .get_mut(&(chain_id, peer_id.clone()))
2435 .unwrap();
2436 if let Ok(decoded) = header::decode(
2437 decoded_announce.scale_encoded_header,
2438 task.network[chain_id].block_number_bytes,
2439 ) {
2440 link.best_block_hash = header::hash_from_scale_encoded_header(
2441 decoded_announce.scale_encoded_header,
2442 );
2443 link.best_block_number = decoded.number;
2444 }
2445 }
2446
2447 debug_assert!(task.event_pending_send.is_none());
2448 task.event_pending_send =
2449 Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2450 }
2451 WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2452 peer_id,
2453 chain_id,
2454 role,
2455 best_number,
2456 best_hash,
2457 kind: service::GossipKind::ConsensusTransactions,
2458 }) => {
2459 log!(
2460 &task.platform,
2461 Debug,
2462 "network",
2463 "gossip-open-success",
2464 chain = &task.network[chain_id].log_name,
2465 peer_id,
2466 best_number,
2467 best_hash = HashDisplay(&best_hash)
2468 );
2469
2470 let _prev_value = task.open_gossip_links.insert(
2471 (chain_id, peer_id.clone()),
2472 OpenGossipLinkState {
2473 best_block_number: best_number,
2474 best_block_hash: best_hash,
2475 role,
2476 finalized_block_height: None,
2477 },
2478 );
2479 debug_assert!(_prev_value.is_none());
2480
2481 debug_assert!(task.event_pending_send.is_none());
2482 task.event_pending_send = Some((
2483 chain_id,
2484 Event::Connected {
2485 peer_id,
2486 role,
2487 best_block_number: best_number,
2488 best_block_hash: best_hash,
2489 },
2490 ));
2491 }
2492 WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2493 peer_id,
2494 chain_id,
2495 error,
2496 kind: service::GossipKind::ConsensusTransactions,
2497 }) => {
2498 log!(
2499 &task.platform,
2500 Debug,
2501 "network",
2502 "gossip-open-error",
2503 chain = &task.network[chain_id].log_name,
2504 peer_id,
2505 ?error,
2506 );
2507 let ban_duration = Duration::from_secs(15);
2508
2509 let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2512 matches!(
2513 task.peering_strategy
2514 .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2515 basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2516 )
2517 } else {
2518 matches!(
2519 task.peering_strategy.unassign_slot_and_ban(
2520 &chain_id,
2521 &peer_id,
2522 task.platform.now() + ban_duration,
2523 ),
2524 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2525 )
2526 };
2527
2528 if had_slot {
2529 log!(
2530 &task.platform,
2531 Debug,
2532 "network",
2533 "slot-unassigned",
2534 chain = &task.network[chain_id].log_name,
2535 peer_id,
2536 ?ban_duration,
2537 reason = "gossip-open-failed"
2538 );
2539 task.network.gossip_remove_desired(
2540 chain_id,
2541 &peer_id,
2542 service::GossipKind::ConsensusTransactions,
2543 );
2544 }
2545 }
2546 WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2547 peer_id,
2548 chain_id,
2549 kind: service::GossipKind::ConsensusTransactions,
2550 }) => {
2551 log!(
2552 &task.platform,
2553 Debug,
2554 "network",
2555 "gossip-closed",
2556 chain = &task.network[chain_id].log_name,
2557 peer_id,
2558 );
2559 let ban_duration = Duration::from_secs(10);
2560
2561 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2562 debug_assert!(_was_in.is_some());
2563
2564 if matches!(
2567 task.peering_strategy.unassign_slot_and_ban(
2568 &chain_id,
2569 &peer_id,
2570 task.platform.now() + ban_duration,
2571 ),
2572 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2573 ) {
2574 log!(
2575 &task.platform,
2576 Debug,
2577 "network",
2578 "slot-unassigned",
2579 chain = &task.network[chain_id].log_name,
2580 peer_id,
2581 ?ban_duration,
2582 reason = "gossip-closed"
2583 );
2584 task.network.gossip_remove_desired(
2585 chain_id,
2586 &peer_id,
2587 service::GossipKind::ConsensusTransactions,
2588 );
2589 }
2590
2591 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2592 peers.remove(&peer_id);
2593 }
2594
2595 debug_assert!(task.event_pending_send.is_none());
2596 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2597 }
2598 WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
2599 log!(
2600 &task.platform,
2601 Debug,
2602 "network",
2603 "bitswap-open-success",
2604 peer_id
2605 );
2606 }
2607 WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
2608 log!(
2609 &task.platform,
2610 Debug,
2611 "network",
2612 "bitswap-open-error",
2613 peer_id,
2614 ?error
2615 );
2616 let ban_duration = if error.is_protocol_not_available() {
2617 Duration::from_secs(600)
2618 } else {
2619 Duration::from_secs(15)
2620 };
2621 if matches!(
2622 task.bitswap_peering_strategy
2623 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2624 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2625 ) {
2626 log!(
2627 &task.platform,
2628 Debug,
2629 "network",
2630 "bitswap-slot-unassigned",
2631 peer_id,
2632 ?ban_duration,
2633 reason = "bitswap-open-failed"
2634 );
2635 task.network.bitswap_remove_desired(&peer_id);
2636 }
2637 }
2638 WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
2639 log!(
2640 &task.platform,
2641 Debug,
2642 "network",
2643 "bitswap-message-received",
2644 peer_id
2645 );
2646 debug_assert!(task.bitswap_event_pending_send.is_none());
2647 task.bitswap_event_pending_send =
2648 Some(BitswapEvent::BitswapMessage { peer_id, message });
2649 }
2650 WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
2651 log!(&task.platform, Debug, "network", "bitswap-closed", peer_id);
2652 let ban_duration = Duration::from_secs(10);
2653 if matches!(
2654 task.bitswap_peering_strategy
2655 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2656 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2657 ) {
2658 log!(
2659 &task.platform,
2660 Debug,
2661 "network",
2662 "bitswap-slot-unassigned",
2663 peer_id,
2664 ?ban_duration,
2665 reason = "bitswap-closed"
2666 );
2667 task.network.bitswap_remove_desired(&peer_id);
2668 }
2669 }
2670 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2671 substream_id,
2672 peer_id,
2673 chain_id,
2674 response: service::RequestResult::Blocks(response),
2675 }) => {
2676 match &response {
2677 Ok(blocks) => {
2678 log!(
2679 &task.platform,
2680 Debug,
2681 "network",
2682 "blocks-request-success",
2683 chain = task.network[chain_id].log_name,
2684 target = peer_id,
2685 num_blocks = blocks.len(),
2686 block_data_total_size =
2687 BytesDisplay(blocks.iter().fold(0, |sum, block| {
2688 let block_size = block.header.as_ref().map_or(0, |h| h.len())
2689 + block
2690 .body
2691 .as_ref()
2692 .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2693 + block
2694 .justifications
2695 .as_ref()
2696 .into_iter()
2697 .flat_map(|l| l.iter())
2698 .fold(0, |s, j| s + j.justification.len());
2699 sum + u64::try_from(block_size).unwrap()
2700 }))
2701 );
2702 }
2703 Err(error) => {
2704 log!(
2705 &task.platform,
2706 Debug,
2707 "network",
2708 "blocks-request-error",
2709 chain = task.network[chain_id].log_name,
2710 target = peer_id,
2711 ?error
2712 );
2713 }
2714 }
2715
2716 match &response {
2717 Ok(_) => {}
2718 Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2719 Err(err) => {
2720 log!(
2721 &task.platform,
2722 Debug,
2723 "network",
2724 format!(
2725 "Error in block request with {}. This might indicate an \
2726 incompatibility. Error: {}",
2727 peer_id, err
2728 )
2729 );
2730 }
2731 }
2732
2733 let _ = task
2734 .blocks_requests
2735 .remove(&substream_id)
2736 .unwrap()
2737 .send(response.map_err(BlocksRequestError::Request));
2738 }
2739 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2740 substream_id,
2741 peer_id,
2742 chain_id,
2743 response: service::RequestResult::GrandpaWarpSync(response),
2744 }) => {
2745 match &response {
2746 Ok(response) => {
2747 let decoded = response.decode();
2749 log!(
2750 &task.platform,
2751 Debug,
2752 "network",
2753 "warp-sync-request-success",
2754 chain = task.network[chain_id].log_name,
2755 target = peer_id,
2756 num_fragments = decoded.fragments.len(),
2757 is_finished = ?decoded.is_finished,
2758 );
2759 }
2760 Err(error) => {
2761 log!(
2762 &task.platform,
2763 Debug,
2764 "network",
2765 "warp-sync-request-error",
2766 chain = task.network[chain_id].log_name,
2767 target = peer_id,
2768 ?error,
2769 );
2770 }
2771 }
2772
2773 let _ = task
2774 .grandpa_warp_sync_requests
2775 .remove(&substream_id)
2776 .unwrap()
2777 .send(response.map_err(WarpSyncRequestError::Request));
2778 }
2779 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2780 substream_id,
2781 peer_id,
2782 chain_id,
2783 response: service::RequestResult::StorageProof(response),
2784 }) => {
2785 match &response {
2786 Ok(items) => {
2787 let decoded = items.decode();
2788 log!(
2789 &task.platform,
2790 Debug,
2791 "network",
2792 "storage-proof-request-success",
2793 chain = task.network[chain_id].log_name,
2794 target = peer_id,
2795 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2796 );
2797 }
2798 Err(error) => {
2799 log!(
2800 &task.platform,
2801 Debug,
2802 "network",
2803 "storage-proof-request-error",
2804 chain = task.network[chain_id].log_name,
2805 target = peer_id,
2806 ?error
2807 );
2808 }
2809 }
2810
2811 if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2814 let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2815 } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2816 {
2817 let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2818 } else {
2819 unreachable!()
2820 }
2821 }
2822 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2823 substream_id,
2824 peer_id,
2825 chain_id,
2826 response: service::RequestResult::CallProof(response),
2827 }) => {
2828 match &response {
2829 Ok(items) => {
2830 let decoded = items.decode();
2831 log!(
2832 &task.platform,
2833 Debug,
2834 "network",
2835 "call-proof-request-success",
2836 chain = task.network[chain_id].log_name,
2837 target = peer_id,
2838 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2839 );
2840 }
2841 Err(error) => {
2842 log!(
2843 &task.platform,
2844 Debug,
2845 "network",
2846 "call-proof-request-error",
2847 chain = task.network[chain_id].log_name,
2848 target = peer_id,
2849 ?error
2850 );
2851 }
2852 }
2853
2854 let _ = task
2855 .call_proof_requests
2856 .remove(&substream_id)
2857 .unwrap()
2858 .send(response.map_err(CallProofRequestError::Request));
2859 }
2860 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2861 peer_id: requestee_peer_id,
2862 chain_id,
2863 response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2864 ..
2865 }) => {
2866 for (peer_id, mut addrs) in nodes {
2867 if addrs.len() >= 10 {
2870 addrs.truncate(10);
2871 }
2872
2873 let mut valid_addrs = Vec::with_capacity(addrs.len());
2874 for addr in addrs {
2875 match Multiaddr::from_bytes(addr) {
2876 Ok(a) => {
2877 if platform::address_parse::multiaddr_to_address(&a)
2878 .ok()
2879 .map_or(false, |addr| {
2880 task.platform.supports_connection_type((&addr).into())
2881 })
2882 {
2883 valid_addrs.push(a)
2884 } else {
2885 log!(
2886 &task.platform,
2887 Debug,
2888 "network",
2889 "discovered-address-not-supported",
2890 chain = &task.network[chain_id].log_name,
2891 peer_id,
2892 addr = &a,
2893 obtained_from = requestee_peer_id
2894 );
2895 }
2896 }
2897 Err((error, addr)) => {
2898 log!(
2899 &task.platform,
2900 Debug,
2901 "network",
2902 "discovered-address-invalid",
2903 chain = &task.network[chain_id].log_name,
2904 peer_id,
2905 error,
2906 addr = hex::encode(&addr),
2907 obtained_from = requestee_peer_id
2908 );
2909 }
2910 }
2911 }
2912
2913 if !valid_addrs.is_empty() {
2914 let insert_outcome =
2917 task.peering_strategy
2918 .insert_chain_peer(chain_id, peer_id.clone(), 30); if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2921 peer_removed,
2922 } = insert_outcome
2923 {
2924 if let Some(peer_removed) = peer_removed {
2925 log!(
2926 &task.platform,
2927 Debug,
2928 "network",
2929 "peer-purged-from-address-book",
2930 chain = &task.network[chain_id].log_name,
2931 peer_id = peer_removed,
2932 );
2933 }
2934
2935 log!(
2936 &task.platform,
2937 Debug,
2938 "network",
2939 "peer-discovered",
2940 chain = &task.network[chain_id].log_name,
2941 peer_id,
2942 addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), obtained_from = requestee_peer_id
2944 );
2945 }
2946 }
2947
2948 for addr in valid_addrs {
2949 let _insert_result =
2950 task.peering_strategy
2951 .insert_address(&peer_id, addr.into_bytes(), 10); debug_assert!(!matches!(
2953 _insert_result,
2954 basic_peering_strategy::InsertAddressResult::UnknownPeer
2955 ));
2956 }
2957 }
2958 }
2959 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2960 peer_id,
2961 chain_id,
2962 response: service::RequestResult::KademliaFindNode(Err(error)),
2963 ..
2964 }) => {
2965 log!(
2966 &task.platform,
2967 Debug,
2968 "network",
2969 "discovery-find-node-error",
2970 chain = &task.network[chain_id].log_name,
2971 ?error,
2972 find_node_target = peer_id,
2973 );
2974
2975 match error {
2978 service::KademliaFindNodeError::RequestFailed(err)
2979 if !err.is_protocol_error() => {}
2980
2981 service::KademliaFindNodeError::RequestFailed(
2982 service::RequestError::Substream(
2983 connection::established::RequestError::ProtocolNotAvailable,
2984 ),
2985 ) => {
2986 log!(
2988 &task.platform,
2989 Warn,
2990 "network",
2991 format!(
2992 "Problem during discovery on {}: protocol not available. \
2993 This might indicate that the version of Substrate used by \
2994 the chain doesn't include \
2995 <https://github.com/paritytech/substrate/pull/12545>.",
2996 &task.network[chain_id].log_name
2997 )
2998 );
2999 }
3000 _ => {
3001 log!(
3002 &task.platform,
3003 Debug,
3004 "network",
3005 format!(
3006 "Problem during discovery on {}: {}",
3007 &task.network[chain_id].log_name, error
3008 )
3009 );
3010 }
3011 }
3012 }
3013 WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3014 unreachable!()
3016 }
3017 WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3018 peer_id,
3019 chain_id,
3020 kind: service::GossipKind::ConsensusTransactions,
3021 }) => {
3022 if task
3027 .network
3028 .opened_gossip_undesired_by_chain(chain_id)
3029 .count()
3030 < 4
3031 {
3032 log!(
3033 &task.platform,
3034 Debug,
3035 "network",
3036 "gossip-in-request",
3037 chain = &task.network[chain_id].log_name,
3038 peer_id,
3039 outcome = "accepted"
3040 );
3041 task.network
3042 .gossip_open(
3043 chain_id,
3044 &peer_id,
3045 service::GossipKind::ConsensusTransactions,
3046 )
3047 .unwrap();
3048 } else {
3049 log!(
3050 &task.platform,
3051 Debug,
3052 "network",
3053 "gossip-in-request",
3054 chain = &task.network[chain_id].log_name,
3055 peer_id,
3056 outcome = "rejected",
3057 );
3058 task.network
3059 .gossip_close(
3060 chain_id,
3061 &peer_id,
3062 service::GossipKind::ConsensusTransactions,
3063 )
3064 .unwrap();
3065 }
3066 }
3067 WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3068 unreachable!()
3070 }
3071 WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3072 peer_id,
3073 substream_id,
3074 }) => {
3075 log!(
3076 &task.platform,
3077 Debug,
3078 "network",
3079 "identify-request-received",
3080 peer_id,
3081 );
3082 task.network
3083 .respond_identify(substream_id, &task.identify_agent_version);
3084 }
3085 WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3086 WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3087 unreachable!()
3089 }
3090 WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3091 chain_id,
3092 peer_id,
3093 state,
3094 }) => {
3095 log!(
3096 &task.platform,
3097 Debug,
3098 "network",
3099 "grandpa-neighbor-packet-received",
3100 chain = &task.network[chain_id].log_name,
3101 peer_id,
3102 round_number = state.round_number,
3103 set_id = state.set_id,
3104 commit_finalized_height = state.commit_finalized_height,
3105 );
3106
3107 task.open_gossip_links
3108 .get_mut(&(chain_id, peer_id.clone()))
3109 .unwrap()
3110 .finalized_block_height = Some(state.commit_finalized_height);
3111
3112 debug_assert!(task.event_pending_send.is_none());
3113 task.event_pending_send = Some((
3114 chain_id,
3115 Event::GrandpaNeighborPacket {
3116 peer_id,
3117 finalized_block_height: state.commit_finalized_height,
3118 },
3119 ));
3120 }
3121 WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3122 chain_id,
3123 peer_id,
3124 message,
3125 }) => {
3126 log!(
3127 &task.platform,
3128 Debug,
3129 "network",
3130 "grandpa-commit-message-received",
3131 chain = &task.network[chain_id].log_name,
3132 peer_id,
3133 target_block_hash = HashDisplay(message.decode().target_hash),
3134 );
3135
3136 debug_assert!(task.event_pending_send.is_none());
3137 task.event_pending_send =
3138 Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3139 }
3140 WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3141 chain_id,
3142 peer_id,
3143 statements,
3144 }) => {
3145 debug_assert!(task.event_pending_send.is_none());
3146
3147 if statements.is_empty() {
3148 continue;
3149 }
3150
3151 task.event_pending_send = Some((
3152 chain_id,
3153 Event::StatementsNotification {
3154 peer_id,
3155 statements,
3156 },
3157 ));
3158 }
3159 WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3160 peer_id,
3161 chain_id,
3162 version,
3163 }) => {
3164 if matches!(version, codec::StatementProtocolVersion::V2) {
3165 task.v2_statement_peers
3166 .entry(chain_id)
3167 .or_insert_with(|| {
3168 HashSet::with_capacity_and_hasher(16, Default::default())
3169 })
3170 .insert(peer_id.clone());
3171 if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3172 if let Err(
3173 SendTopicAffinityError::NoConnection
3174 | SendTopicAffinityError::ProtocolV1,
3175 ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3176 {
3177 task.v2_statement_peers
3178 .get_mut(&chain_id)
3179 .unwrap()
3180 .remove(&peer_id);
3181 }
3182 }
3183 }
3184 }
3185 WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3187 ..
3188 }) => {}
3189 WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3190 log!(
3192 &task.platform,
3193 Warn,
3194 "network",
3195 "protocol-error",
3196 peer_id,
3197 ?error
3198 );
3199
3200 }
3202 WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3203 task.peering_strategy.assign_slot(&chain_id, &peer_id);
3204
3205 log!(
3206 &task.platform,
3207 Debug,
3208 "network",
3209 "slot-assigned",
3210 chain = &task.network[chain_id].log_name,
3211 peer_id
3212 );
3213
3214 task.network.gossip_insert_desired(
3215 chain_id,
3216 peer_id,
3217 service::GossipKind::ConsensusTransactions,
3218 );
3219 }
3220 WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3221 task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3222
3223 log!(
3224 &task.platform,
3225 Debug,
3226 "network",
3227 "bitswap-slot-assigned",
3228 peer_id
3229 );
3230
3231 task.network.bitswap_insert_desired(peer_id);
3232 }
3233 WakeUpReason::NextRecentConnectionRestore => {
3234 task.num_recent_connection_opening =
3235 task.num_recent_connection_opening.saturating_sub(1);
3236 }
3237 WakeUpReason::CanStartConnect(expected_peer_id) => {
3238 let Some(multiaddr) = task
3239 .peering_strategy
3240 .pick_address_and_add_connection(&expected_peer_id)
3241 else {
3242 task.network.gossip_remove_desired_all(
3244 &expected_peer_id,
3245 service::GossipKind::ConsensusTransactions,
3246 );
3247 let ban_duration = Duration::from_secs(10);
3248 for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3249 &expected_peer_id,
3250 task.platform.now() + ban_duration,
3251 ) {
3252 if matches!(
3253 what_happened,
3254 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3255 ) {
3256 log!(
3257 &task.platform,
3258 Debug,
3259 "network",
3260 "slot-unassigned",
3261 chain = &task.network[chain_id].log_name,
3262 peer_id = expected_peer_id,
3263 ?ban_duration,
3264 reason = "no-address"
3265 );
3266 }
3267 }
3268 continue;
3269 };
3270
3271 let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3272 Ok(a) => a,
3273 Err((multiaddr::FromBytesError, addr)) => {
3274 let _was_in = task
3276 .peering_strategy
3277 .decrease_address_connections_and_remove_if_zero(
3278 &expected_peer_id,
3279 &addr,
3280 );
3281 debug_assert!(_was_in.is_ok());
3282 continue;
3283 }
3284 };
3285
3286 let address = address_parse::multiaddr_to_address(&multiaddr)
3287 .ok()
3288 .filter(|addr| {
3289 task.platform.supports_connection_type(match &addr {
3290 address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3291 From::from(addr)
3292 }
3293 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3294 addr,
3295 ) => From::from(addr),
3296 })
3297 });
3298
3299 let Some(address) = address else {
3300 let _was_in = task
3302 .peering_strategy
3303 .decrease_address_connections_and_remove_if_zero(
3304 &expected_peer_id,
3305 multiaddr.as_ref(),
3306 );
3307 debug_assert!(_was_in.is_ok());
3308 continue;
3309 };
3310
3311 let noise_key = {
3313 let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3314 task.platform.fill_random_bytes(&mut *noise_static_key);
3315 let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3316 task.platform.fill_random_bytes(&mut *libp2p_key);
3317 connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3318 };
3319
3320 log!(
3321 &task.platform,
3322 Debug,
3323 "network",
3324 "connection-started",
3325 expected_peer_id,
3326 remote_addr = multiaddr,
3327 local_peer_id =
3328 peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3329 .into_peer_id(),
3330 );
3331
3332 task.num_recent_connection_opening += 1;
3333
3334 let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3335 async_channel::bounded(8);
3336 let task_name = format!("connection-{}", multiaddr);
3337
3338 match address {
3339 address_parse::AddressOrMultiStreamAddress::Address(address) => {
3340 let connection = task.platform.connect_stream(address).await;
3343
3344 let (connection_id, connection_task) =
3345 task.network.add_single_stream_connection(
3346 task.platform.now(),
3347 service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3348 is_initiator: true,
3349 noise_key: &noise_key,
3350 },
3351 multiaddr.clone().into_bytes(),
3352 Some(expected_peer_id.clone()),
3353 coordinator_to_connection_tx,
3354 );
3355
3356 task.platform.spawn_task(
3357 task_name.into(),
3358 tasks::single_stream_connection_task::<TPlat>(
3359 connection,
3360 multiaddr.to_string(),
3361 task.platform.clone(),
3362 connection_id,
3363 connection_task,
3364 coordinator_to_connection_rx,
3365 task.tasks_messages_tx.clone(),
3366 ),
3367 );
3368 }
3369 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3370 platform::MultiStreamAddress::WebRtc {
3371 ip,
3372 port,
3373 remote_certificate_sha256,
3374 },
3375 ) => {
3376 let connection = task
3381 .platform
3382 .connect_multistream(platform::MultiStreamAddress::WebRtc {
3383 ip,
3384 port,
3385 remote_certificate_sha256,
3386 })
3387 .await;
3388
3389 let local_tls_certificate_multihash = [18u8, 32]
3391 .into_iter()
3392 .chain(connection.local_tls_certificate_sha256.into_iter())
3393 .collect();
3394 let remote_tls_certificate_multihash = [18u8, 32]
3395 .into_iter()
3396 .chain(remote_certificate_sha256.iter().copied())
3397 .collect();
3398
3399 let (connection_id, connection_task) =
3400 task.network.add_multi_stream_connection(
3401 task.platform.now(),
3402 service::MultiStreamHandshakeKind::WebRtc {
3403 is_initiator: true,
3404 local_tls_certificate_multihash,
3405 remote_tls_certificate_multihash,
3406 noise_key: &noise_key,
3407 },
3408 multiaddr.clone().into_bytes(),
3409 Some(expected_peer_id.clone()),
3410 coordinator_to_connection_tx,
3411 );
3412
3413 task.platform.spawn_task(
3414 task_name.into(),
3415 tasks::webrtc_multi_stream_connection_task::<TPlat>(
3416 connection.connection,
3417 multiaddr.to_string(),
3418 task.platform.clone(),
3419 connection_id,
3420 connection_task,
3421 coordinator_to_connection_rx,
3422 task.tasks_messages_tx.clone(),
3423 ),
3424 );
3425 }
3426 }
3427 }
3428 WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3429 task.network
3430 .gossip_open(
3431 chain_id,
3432 &peer_id,
3433 service::GossipKind::ConsensusTransactions,
3434 )
3435 .unwrap();
3436
3437 log!(
3438 &task.platform,
3439 Debug,
3440 "network",
3441 "gossip-open-start",
3442 chain = &task.network[chain_id].log_name,
3443 peer_id,
3444 );
3445 }
3446 WakeUpReason::CanOpenBitswap(peer_id) => {
3447 task.network.bitswap_open(&peer_id).unwrap();
3448
3449 log!(
3450 &task.platform,
3451 Debug,
3452 "network",
3453 "bitswap-open-start",
3454 peer_id
3455 );
3456 }
3457 WakeUpReason::MessageToConnection {
3458 connection_id,
3459 message,
3460 } => {
3461 let _send_result = task.network[connection_id].send(message).await;
3469 debug_assert!(_send_result.is_ok());
3470 }
3471 }
3472 }
3473}