1use crate::{
44 log,
45 platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49 borrow::ToOwned as _,
50 boxed::Box,
51 collections::BTreeMap,
52 format,
53 string::{String, ToString as _},
54 sync::Arc,
55 vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65 header,
66 informant::{BytesDisplay, HashDisplay},
67 libp2p::{
68 connection,
69 multiaddr::{self, Multiaddr},
70 peer_id,
71 },
72 network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78 ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
81#[derive(Debug, Clone)]
83pub struct StatementProtocolConfig {
84 max_seen_statements: NonZeroUsize,
86 false_positive_rate: f64,
87 bloom_seed: u128,
88 affinity_update_interval: Duration,
89}
90
91impl StatementProtocolConfig {
92 pub fn new(
93 max_seen_statements: NonZeroUsize,
94 false_positive_rate: f64,
95 bloom_seed: u128,
96 affinity_update_interval: Duration,
97 ) -> Self {
98 assert!(
99 false_positive_rate.is_finite()
100 && false_positive_rate > 0.0
101 && false_positive_rate < 1.0
102 );
103 assert!(!affinity_update_interval.is_zero());
104 StatementProtocolConfig {
105 max_seen_statements,
106 false_positive_rate,
107 bloom_seed,
108 affinity_update_interval,
109 }
110 }
111
112 pub fn max_seen_statements(&self) -> NonZeroUsize {
113 self.max_seen_statements
114 }
115
116 pub fn false_positive_rate(&self) -> f64 {
117 self.false_positive_rate
118 }
119
120 pub fn bloom_seed(&self) -> u128 {
121 self.bloom_seed
122 }
123
124 pub fn affinity_update_interval(&self) -> Duration {
125 self.affinity_update_interval
126 }
127}
128
129mod tasks;
130
131pub struct Config<TPlat> {
133 pub platform: TPlat,
135
136 pub identify_agent_version: String,
138
139 pub chains_capacity: usize,
141
142 pub connections_open_pool_size: u32,
146
147 pub connections_open_pool_restore_delay: Duration,
152}
153
154pub struct ConfigChain {
160 pub log_name: String,
162
163 pub num_out_slots: usize,
166
167 pub genesis_block_hash: [u8; 32],
173
174 pub best_block: (u64, [u8; 32]),
177
178 pub fork_id: Option<String>,
181
182 pub block_number_bytes: usize,
184
185 pub grandpa_protocol_finalized_block_height: Option<u64>,
188
189 pub statement_protocol_config: Option<StatementProtocolConfig>,
191}
192
193pub struct NetworkService<TPlat: PlatformRef> {
194 messages_tx: async_channel::Sender<ToBackground<TPlat>>,
196
197 platform: TPlat,
199}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202 pub fn new(config: Config<TPlat>) -> Arc<Self> {
204 let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206 let network = service::ChainNetwork::new(service::Config {
207 chains_capacity: config.chains_capacity,
208 connections_capacity: 32,
209 handshake_timeout: Duration::from_secs(4),
211 randomness_seed: {
212 let mut seed = [0; 32];
213 config.platform.fill_random_bytes(&mut seed);
214 seed
215 },
216 });
217
218 let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
220 let task = Box::pin(background_task(BackgroundTask {
221 randomness: rand_chacha::ChaCha20Rng::from_seed({
222 let mut seed = [0; 32];
223 config.platform.fill_random_bytes(&mut seed);
224 seed
225 }),
226 identify_agent_version: config.identify_agent_version,
227 tasks_messages_tx,
228 tasks_messages_rx: Box::pin(tasks_messages_rx),
229 peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
230 basic_peering_strategy::Config {
231 randomness_seed: {
232 let mut seed = [0; 32];
233 config.platform.fill_random_bytes(&mut seed);
234 seed
235 },
236 peers_capacity: 50, chains_capacity: config.chains_capacity,
238 },
239 ),
240 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
241 bitswap_peering_strategy::Config {
242 randomness_seed: {
243 let mut seed = [0; 32];
244 config.platform.fill_random_bytes(&mut seed);
245 seed
246 },
247 peers_capacity: 50, },
249 ),
250 network,
251 connections_open_pool_size: config.connections_open_pool_size,
252 connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
253 num_recent_connection_opening: 0,
254 next_recent_connection_restore: None,
255 platform: config.platform.clone(),
256 open_gossip_links: BTreeMap::new(),
257 chains_ever_gossip_connected: HashSet::with_capacity_and_hasher(4, Default::default()),
258 v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
259 current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
260 event_pending_send: None,
261 event_senders: either::Left(Vec::new()),
262 pending_new_subscriptions: Vec::new(),
263 bitswap_event_pending_send: None,
264 bitswap_event_senders: either::Left(Vec::new()),
265 pending_new_bitswap_subscriptions: Vec::new(),
266 important_nodes: HashMap::with_capacity_and_hasher(16, Default::default()),
267 main_messages_rx: Box::pin(main_messages_rx),
268 messages_rx: stream::SelectAll::new(),
269 blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
270 grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271 storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272 call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
273 child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
274 chains_by_next_discovery: BTreeMap::new(),
275 }));
276
277 config.platform.spawn_task("network-service".into(), {
278 let platform = config.platform.clone();
279 async move {
280 task.await;
281 log!(&platform, Debug, "network", "shutdown");
282 }
283 });
284
285 Arc::new(NetworkService {
286 messages_tx: main_messages_tx,
287 platform: config.platform,
288 })
289 }
290
291 pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
297 let (messages_tx, messages_rx) = async_channel::bounded(32);
298
299 self.platform.spawn_task("add-chain-message-send".into(), {
301 let config = service::ChainConfig {
302 grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
303 |commit_finalized_height| service::GrandpaState {
304 commit_finalized_height,
305 round_number: 1,
306 set_id: 0,
307 },
308 ),
309 enable_statement_protocol: config.statement_protocol_config.is_some(),
310 fork_id: config.fork_id.clone(),
311 block_number_bytes: config.block_number_bytes,
312 best_hash: config.best_block.1,
313 best_number: config.best_block.0,
314 genesis_hash: config.genesis_block_hash,
315 role: Role::Light,
316 allow_inbound_block_requests: false,
317 user_data: Chain {
318 log_name: config.log_name,
319 block_number_bytes: config.block_number_bytes,
320 num_out_slots: config.num_out_slots,
321 num_references: NonZero::<usize>::new(1).unwrap(),
322 next_discovery_period: Duration::from_secs(2),
323 next_discovery_when: self.platform.now(),
324 },
325 };
326
327 let messages_tx = self.messages_tx.clone();
328 async move {
329 let _ = messages_tx
330 .send(ToBackground::AddChain {
331 messages_rx,
332 config,
333 })
334 .await;
335 }
336 });
337
338 Arc::new(NetworkServiceChain {
339 _keep_alive_messages_tx: self.messages_tx.clone(),
340 messages_tx,
341 marker: core::marker::PhantomData,
342 })
343 }
344}
345
346pub struct NetworkServiceChain<TPlat: PlatformRef> {
347 _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
350
351 messages_tx: async_channel::Sender<ToBackgroundChain>,
353
354 marker: core::marker::PhantomData<TPlat>,
356}
357
358#[derive(Debug, Copy, Clone, PartialEq, Eq)]
360pub enum BanSeverity {
361 Low,
362 High,
363}
364
365impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
366 pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
383 let (tx, rx) = async_channel::bounded(128);
384
385 self.messages_tx
386 .send(ToBackgroundChain::Subscribe { sender: tx })
387 .await
388 .unwrap();
389
390 rx
391 }
392
393 pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
408 let (tx, rx) = async_channel::bounded(128);
409
410 self.messages_tx
411 .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
412 .await
413 .unwrap();
414
415 rx
416 }
417
418 pub async fn ban_and_disconnect(
430 &self,
431 peer_id: PeerId,
432 severity: BanSeverity,
433 reason: &'static str,
434 ) {
435 let _ = self
436 .messages_tx
437 .send(ToBackgroundChain::DisconnectAndBan {
438 peer_id,
439 severity,
440 reason,
441 })
442 .await;
443 }
444
445 pub async fn blocks_request(
448 self: Arc<Self>,
449 target: PeerId,
450 config: codec::BlocksRequestConfig,
451 timeout: Duration,
452 ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
453 let (tx, rx) = oneshot::channel();
454
455 self.messages_tx
456 .send(ToBackgroundChain::StartBlocksRequest {
457 target: target.clone(),
458 config,
459 timeout,
460 result: tx,
461 })
462 .await
463 .unwrap();
464
465 rx.await.unwrap()
466 }
467
468 pub async fn grandpa_warp_sync_request(
471 self: Arc<Self>,
472 target: PeerId,
473 begin_hash: [u8; 32],
474 timeout: Duration,
475 ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
476 let (tx, rx) = oneshot::channel();
477
478 self.messages_tx
479 .send(ToBackgroundChain::StartWarpSyncRequest {
480 target: target.clone(),
481 begin_hash,
482 timeout,
483 result: tx,
484 })
485 .await
486 .unwrap();
487
488 rx.await.unwrap()
489 }
490
491 pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
492 self.messages_tx
493 .send(ToBackgroundChain::SetLocalBestBlock {
494 best_hash,
495 best_number,
496 })
497 .await
498 .unwrap();
499 }
500
501 pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
502 self.messages_tx
503 .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
504 .await
505 .unwrap();
506 }
507
508 pub async fn storage_proof_request(
511 self: Arc<Self>,
512 target: PeerId, config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
514 timeout: Duration,
515 ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
516 let (tx, rx) = oneshot::channel();
517
518 self.messages_tx
519 .send(ToBackgroundChain::StartStorageProofRequest {
520 target: target.clone(),
521 config: codec::StorageProofRequestConfig {
522 block_hash: config.block_hash,
523 keys: config
524 .keys
525 .map(|key| key.as_ref().to_vec()) .collect::<Vec<_>>()
527 .into_iter(),
528 },
529 timeout,
530 result: tx,
531 })
532 .await
533 .unwrap();
534
535 rx.await.unwrap()
536 }
537
538 pub async fn call_proof_request(
543 self: Arc<Self>,
544 target: PeerId, config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
546 timeout: Duration,
547 ) -> Result<EncodedMerkleProof, CallProofRequestError> {
548 let (tx, rx) = oneshot::channel();
549
550 self.messages_tx
551 .send(ToBackgroundChain::StartCallProofRequest {
552 target: target.clone(),
553 config: codec::CallProofRequestConfig {
554 block_hash: config.block_hash,
555 method: config.method.into_owned().into(),
556 parameter_vectored: config
557 .parameter_vectored
558 .map(|v| v.as_ref().to_vec()) .collect::<Vec<_>>()
560 .into_iter(),
561 },
562 timeout,
563 result: tx,
564 })
565 .await
566 .unwrap();
567
568 rx.await.unwrap()
569 }
570
571 pub async fn child_storage_proof_request(
573 self: Arc<Self>,
574 target: PeerId,
575 config: codec::ChildStorageProofRequestConfig<
576 impl AsRef<[u8]> + Clone,
577 impl Iterator<Item = impl AsRef<[u8]> + Clone>,
578 >,
579 timeout: Duration,
580 ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
581 let (tx, rx) = oneshot::channel();
582
583 self.messages_tx
584 .send(ToBackgroundChain::StartChildStorageProofRequest {
585 target: target.clone(),
586 config: ChildStorageProofRequestConfigOwned {
587 block_hash: config.block_hash,
588 child_trie: config.child_trie.as_ref().to_vec(),
589 keys: config
590 .keys
591 .map(|key| key.as_ref().to_vec())
592 .collect::<Vec<_>>(),
593 },
594 timeout,
595 result: tx,
596 })
597 .await
598 .unwrap();
599
600 rx.await.unwrap()
601 }
602
603 pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
613 let (tx, rx) = oneshot::channel();
614
615 self.messages_tx
616 .send(ToBackgroundChain::AnnounceTransaction {
617 transaction: transaction.to_vec(), result: tx,
619 })
620 .await
621 .unwrap();
622
623 rx.await.unwrap()
624 }
625
626 pub async fn send_block_announce(
628 self: Arc<Self>,
629 target: &PeerId,
630 scale_encoded_header: &[u8],
631 is_best: bool,
632 ) -> Result<(), QueueNotificationError> {
633 let (tx, rx) = oneshot::channel();
634
635 self.messages_tx
636 .send(ToBackgroundChain::SendBlockAnnounce {
637 target: target.clone(), scale_encoded_header: scale_encoded_header.to_vec(), is_best,
640 result: tx,
641 })
642 .await
643 .unwrap();
644
645 rx.await.unwrap()
646 }
647
648 pub async fn send_bitswap_message(
650 &self,
651 target: PeerId,
652 message: Vec<u8>,
653 ) -> Result<(), SendBitswapMessageError> {
654 let (tx, rx) = oneshot::channel();
655
656 self.messages_tx
657 .send(ToBackgroundChain::SendBitswapMessage {
658 target,
659 message,
660 result: tx,
661 })
662 .await
663 .unwrap();
664
665 rx.await.unwrap()
666 }
667
668 pub async fn broadcast_bitswap_message(
676 &self,
677 message: Vec<u8>,
678 ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
679 let (tx, rx) = oneshot::channel();
680
681 self.messages_tx
682 .send(ToBackgroundChain::BroadcastBitswapMessage {
683 message,
684 result: tx,
685 })
686 .await
687 .unwrap();
688
689 rx.await.unwrap()
690 }
691
692 pub async fn broadcast_statement(
694 self: Arc<Self>,
695 statement: Vec<u8>,
696 ) -> BroadcastStatementResult {
697 let (tx, rx) = oneshot::channel();
698
699 self.messages_tx
700 .send(ToBackgroundChain::BroadcastStatement {
701 statement,
702 result: tx,
703 })
704 .await
705 .unwrap();
706
707 rx.await.unwrap()
708 }
709
710 pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
711 self.messages_tx
712 .send(ToBackgroundChain::UpdateTopicAffinity { filter })
713 .await
714 .unwrap();
715 }
716
717 pub async fn discover(
723 &self,
724 list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
725 important_nodes: bool,
726 ) {
727 self.messages_tx
728 .send(ToBackgroundChain::Discover {
729 list: list
731 .into_iter()
732 .map(|(peer_id, addrs)| {
733 (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
734 })
735 .collect::<Vec<_>>()
736 .into_iter(),
737 important_nodes,
738 })
739 .await
740 .unwrap();
741 }
742
743 pub async fn discovered_nodes(
750 &self,
751 ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
752 let (tx, rx) = oneshot::channel();
753
754 self.messages_tx
755 .send(ToBackgroundChain::DiscoveredNodes { result: tx })
756 .await
757 .unwrap();
758
759 rx.await
760 .unwrap()
761 .into_iter()
762 .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
763 }
764
765 pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
768 let (tx, rx) = oneshot::channel();
769 self.messages_tx
770 .send(ToBackgroundChain::PeersList { result: tx })
771 .await
772 .unwrap();
773 rx.await.unwrap().into_iter()
774 }
775}
776
777#[derive(Debug, Clone)]
778pub struct BroadcastStatementResult {
779 pub sent: usize,
780 pub total: usize,
781}
782
783#[derive(Debug, Clone)]
785pub enum Event {
786 Connected {
787 peer_id: PeerId,
788 role: Role,
789 best_block_number: u64,
790 best_block_hash: [u8; 32],
791 },
792 Disconnected {
793 peer_id: PeerId,
794 },
795 BlockAnnounce {
796 peer_id: PeerId,
797 announce: service::EncodedBlockAnnounce,
798 },
799 GrandpaNeighborPacket {
800 peer_id: PeerId,
801 finalized_block_height: u64,
802 },
803 GrandpaCommitMessage {
805 peer_id: PeerId,
806 message: service::EncodedGrandpaCommitMessage,
807 },
808 StatementsNotification {
810 peer_id: PeerId,
811 statements: Vec<([u8; 32], codec::Statement)>,
812 },
813}
814
815#[derive(Debug, Clone)]
819pub enum BitswapEvent {
820 BitswapMessage {
821 peer_id: PeerId,
822 message: service::EncodedBitswapMessage,
823 },
824}
825
826#[derive(Debug, derive_more::Display, derive_more::Error)]
828pub enum BlocksRequestError {
829 NoConnection,
831 #[display("{_0}")]
833 Request(service::BlocksRequestError),
834}
835
836#[derive(Debug, derive_more::Display, derive_more::Error)]
838pub enum WarpSyncRequestError {
839 NoConnection,
841 #[display("{_0}")]
843 Request(service::GrandpaWarpSyncRequestError),
844}
845
846#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
848pub enum StorageProofRequestError {
849 NoConnection,
851 RequestTooLarge,
853 #[display("{_0}")]
855 Request(service::StorageProofRequestError),
856}
857
858#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
860pub enum CallProofRequestError {
861 NoConnection,
863 RequestTooLarge,
865 #[display("{_0}")]
867 Request(service::CallProofRequestError),
868}
869
870impl CallProofRequestError {
871 pub fn is_network_problem(&self) -> bool {
874 match self {
875 CallProofRequestError::Request(err) => err.is_network_problem(),
876 CallProofRequestError::RequestTooLarge => false,
877 CallProofRequestError::NoConnection => true,
878 }
879 }
880}
881
882#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
884pub enum ChildStorageProofRequestError {
885 NoConnection,
887 RequestTooLarge,
889 #[display("{_0}")]
891 Request(service::StorageProofRequestError),
892}
893
894impl ChildStorageProofRequestError {
895 pub fn is_network_problem(&self) -> bool {
898 match self {
899 ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
900 ChildStorageProofRequestError::RequestTooLarge => false,
901 ChildStorageProofRequestError::NoConnection => true,
902 }
903 }
904}
905
906struct ChildStorageProofRequestConfigOwned {
908 block_hash: [u8; 32],
909 child_trie: Vec<u8>,
910 keys: Vec<Vec<u8>>,
911}
912
913enum ToBackground<TPlat: PlatformRef> {
914 AddChain {
915 messages_rx: async_channel::Receiver<ToBackgroundChain>,
916 config: service::ChainConfig<Chain<TPlat>>,
917 },
918}
919
920enum ToBackgroundChain {
921 RemoveChain,
922 Subscribe {
923 sender: async_channel::Sender<Event>,
924 },
925 SubscribeBitswap {
926 sender: async_channel::Sender<BitswapEvent>,
927 },
928 DisconnectAndBan {
929 peer_id: PeerId,
930 severity: BanSeverity,
931 reason: &'static str,
932 },
933 StartBlocksRequest {
935 target: PeerId, config: codec::BlocksRequestConfig,
937 timeout: Duration,
938 result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
939 },
940 StartWarpSyncRequest {
942 target: PeerId,
943 begin_hash: [u8; 32],
944 timeout: Duration,
945 result:
946 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
947 },
948 StartStorageProofRequest {
950 target: PeerId,
951 config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
952 timeout: Duration,
953 result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
954 },
955 StartCallProofRequest {
957 target: PeerId, config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
959 timeout: Duration,
960 result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
961 },
962 StartChildStorageProofRequest {
964 target: PeerId,
965 config: ChildStorageProofRequestConfigOwned,
966 timeout: Duration,
967 result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
968 },
969 SetLocalBestBlock {
970 best_hash: [u8; 32],
971 best_number: u64,
972 },
973 SetLocalGrandpaState {
974 grandpa_state: service::GrandpaState,
975 },
976 AnnounceTransaction {
977 transaction: Vec<u8>,
978 result: oneshot::Sender<Vec<PeerId>>,
979 },
980 SendBlockAnnounce {
981 target: PeerId,
982 scale_encoded_header: Vec<u8>,
983 is_best: bool,
984 result: oneshot::Sender<Result<(), QueueNotificationError>>,
985 },
986 SendBitswapMessage {
987 target: PeerId,
988 message: Vec<u8>,
989 result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
990 },
991 BroadcastBitswapMessage {
992 message: Vec<u8>,
993 result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
994 },
995 BroadcastStatement {
996 statement: Vec<u8>,
997 result: oneshot::Sender<BroadcastStatementResult>,
998 },
999 UpdateTopicAffinity {
1000 filter: AffinityFilter,
1001 },
1002 Discover {
1003 list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
1004 important_nodes: bool,
1005 },
1006 DiscoveredNodes {
1007 result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
1008 },
1009 PeersList {
1010 result: oneshot::Sender<Vec<PeerId>>,
1011 },
1012}
1013
1014struct BackgroundTask<TPlat: PlatformRef> {
1015 platform: TPlat,
1017
1018 randomness: rand_chacha::ChaCha20Rng,
1020
1021 identify_agent_version: String,
1023
1024 tasks_messages_tx:
1026 async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
1027
1028 tasks_messages_rx: Pin<
1030 Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
1031 >,
1032
1033 network: service::ChainNetwork<
1035 Chain<TPlat>,
1036 async_channel::Sender<service::CoordinatorToConnection>,
1037 TPlat::Instant,
1038 >,
1039
1040 peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
1042
1043 bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,
1045
1046 connections_open_pool_size: u32,
1048
1049 connections_open_pool_restore_delay: Duration,
1051
1052 num_recent_connection_opening: u32,
1056
1057 next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
1059
1060 open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
1063
1064 chains_ever_gossip_connected: HashSet<ChainId, fnv::FnvBuildHasher>,
1067
1068 v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1070
1071 current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,
1073
1074 important_nodes: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1078
1079 event_pending_send: Option<(ChainId, Event)>,
1081
1082 bitswap_event_pending_send: Option<BitswapEvent>,
1084
1085 event_senders: either::Either<
1091 Vec<(ChainId, async_channel::Sender<Event>)>,
1092 Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
1093 >,
1094
1095 pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
1098
1099 bitswap_event_senders: either::Either<
1109 Vec<async_channel::Sender<BitswapEvent>>,
1110 Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
1111 >,
1112
1113 pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,
1117
1118 main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
1119
1120 messages_rx:
1121 stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
1122
1123 blocks_requests: HashMap<
1124 service::SubstreamId,
1125 oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
1126 fnv::FnvBuildHasher,
1127 >,
1128
1129 grandpa_warp_sync_requests: HashMap<
1130 service::SubstreamId,
1131 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
1132 fnv::FnvBuildHasher,
1133 >,
1134
1135 storage_proof_requests: HashMap<
1136 service::SubstreamId,
1137 oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
1138 fnv::FnvBuildHasher,
1139 >,
1140
1141 call_proof_requests: HashMap<
1142 service::SubstreamId,
1143 oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
1144 fnv::FnvBuildHasher,
1145 >,
1146
1147 child_storage_proof_requests: HashMap<
1148 service::SubstreamId,
1149 oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
1150 fnv::FnvBuildHasher,
1151 >,
1152
1153 chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
1155}
1156
1157struct Chain<TPlat: PlatformRef> {
1158 log_name: String,
1159
1160 num_references: NonZero<usize>,
1162
1163 block_number_bytes: usize,
1166
1167 num_out_slots: usize,
1169
1170 next_discovery_when: TPlat::Instant,
1172
1173 next_discovery_period: Duration,
1176}
1177
1178#[derive(Clone)]
1179struct OpenGossipLinkState {
1180 role: Role,
1181 best_block_number: u64,
1182 best_block_hash: [u8; 32],
1183 finalized_block_height: Option<u64>,
1185}
1186
1187async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1188 loop {
1189 futures_lite::future::yield_now().await;
1191
1192 enum WakeUpReason<TPlat: PlatformRef> {
1193 ForegroundClosed,
1194 Message(ToBackground<TPlat>),
1195 MessageForChain(ChainId, ToBackgroundChain),
1196 NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1197 CanAssignSlot(PeerId, ChainId),
1198 CanAssignBitswapSlot(PeerId),
1199 NextRecentConnectionRestore,
1200 CanStartConnect(PeerId),
1201 CanOpenGossip(PeerId, ChainId),
1202 CanOpenBitswap(PeerId),
1203 MessageFromConnection {
1204 connection_id: service::ConnectionId,
1205 message: service::ConnectionToCoordinator,
1206 },
1207 MessageToConnection {
1208 connection_id: service::ConnectionId,
1209 message: service::CoordinatorToConnection,
1210 },
1211 EventSendersReady,
1212 BitswapEventSendersReady,
1213 StartDiscovery(ChainId),
1214 }
1215
1216 let wake_up_reason = {
1217 let message_received = async {
1218 task.main_messages_rx
1219 .next()
1220 .await
1221 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1222 };
1223 let message_for_chain_received = async {
1224 let Some((chain_id, message)) = task.messages_rx.next().await else {
1229 future::pending().await
1230 };
1231 WakeUpReason::MessageForChain(chain_id, message)
1232 };
1233 let message_from_task_received = async {
1234 let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1235 WakeUpReason::MessageFromConnection {
1236 connection_id,
1237 message,
1238 }
1239 };
1240 let service_event = async {
1241 if let Some(event) = (task.event_pending_send.is_none()
1242 && task.bitswap_event_pending_send.is_none()
1243 && task.pending_new_subscriptions.is_empty()
1244 && task.pending_new_bitswap_subscriptions.is_empty())
1245 .then(|| task.network.next_event())
1246 .flatten()
1247 {
1248 WakeUpReason::NetworkEvent(event)
1249 } else if let Some(start_connect) = {
1250 let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1251 .then(|| {
1252 task.network
1253 .unconnected_desired()
1254 .choose(&mut task.randomness)
1255 .cloned()
1256 })
1257 .flatten();
1258 x
1259 } {
1260 WakeUpReason::CanStartConnect(start_connect)
1261 } else if let Some((peer_id, chain_id)) = {
1262 let x = task
1263 .network
1264 .connected_unopened_gossip_desired()
1265 .choose(&mut task.randomness)
1266 .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1267 x
1268 } {
1269 WakeUpReason::CanOpenGossip(peer_id, chain_id)
1270 } else if let Some(peer_id) = {
1271 let x = task
1272 .network
1273 .connected_unopened_bitswap_desired()
1274 .choose(&mut task.randomness)
1275 .cloned();
1276 x
1277 } {
1278 WakeUpReason::CanOpenBitswap(peer_id)
1279 } else if let Some((connection_id, message)) =
1280 task.network.pull_message_to_connection()
1281 {
1282 WakeUpReason::MessageToConnection {
1283 connection_id,
1284 message,
1285 }
1286 } else {
1287 'search: loop {
1288 let mut earlier_unban = None;
1289
1290 for chain_id in task.network.chains().collect::<Vec<_>>() {
1291 if task.network.gossip_desired_num(
1292 chain_id,
1293 service::GossipKind::ConsensusTransactions,
1294 ) >= task.network[chain_id].num_out_slots
1295 {
1296 continue;
1297 }
1298
1299 let now = task.platform.now();
1300
1301 if !task.chains_ever_gossip_connected.contains(&chain_id) {
1304 if let basic_peering_strategy::AssignablePeer::Assignable(peer_id) =
1305 task.peering_strategy.pick_assignable_peer_filtered(
1306 &chain_id,
1307 &now,
1308 |peer_id| {
1309 task.important_nodes
1310 .get(&chain_id)
1311 .map_or(false, |nodes| nodes.contains(peer_id))
1312 },
1313 )
1314 {
1315 break 'search WakeUpReason::CanAssignSlot(
1316 peer_id.clone(),
1317 chain_id,
1318 );
1319 }
1320 }
1321
1322 match task.peering_strategy.pick_assignable_peer(&chain_id, &now) {
1323 basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1324 break 'search WakeUpReason::CanAssignSlot(
1325 peer_id.clone(),
1326 chain_id,
1327 );
1328 }
1329 basic_peering_strategy::AssignablePeer::AllPeersBanned {
1330 next_unban,
1331 } => {
1332 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1333 earlier_unban = Some(next_unban.clone());
1334 }
1335 }
1336 basic_peering_strategy::AssignablePeer::NoPeer => continue,
1337 }
1338 }
1339
1340 match task
1341 .bitswap_peering_strategy
1342 .pick_assignable_peer(&task.platform.now())
1343 {
1344 bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1345 break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1346 }
1347 bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1348 next_unban,
1349 } => {
1350 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1351 earlier_unban = Some(next_unban.clone());
1352 }
1353 }
1354 bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1355 }
1356
1357 if let Some(earlier_unban) = earlier_unban {
1358 task.platform.sleep_until(earlier_unban).await;
1359 } else {
1360 future::pending::<()>().await;
1361 }
1362 }
1363 }
1364 };
1365 let next_recent_connection_restore = async {
1366 if task.num_recent_connection_opening != 0
1367 && task.next_recent_connection_restore.is_none()
1368 {
1369 task.next_recent_connection_restore = Some(Box::pin(
1370 task.platform
1371 .sleep(task.connections_open_pool_restore_delay),
1372 ));
1373 }
1374 if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1375 delay.await;
1376 task.next_recent_connection_restore = None;
1377 WakeUpReason::NextRecentConnectionRestore
1378 } else {
1379 future::pending().await
1380 }
1381 };
1382 let finished_sending_event = async {
1383 if let either::Right(event_sending_future) = &mut task.event_senders {
1384 let event_senders = event_sending_future.await;
1385 task.event_senders = either::Left(event_senders);
1386 WakeUpReason::EventSendersReady
1387 } else if task.event_pending_send.is_some()
1388 || !task.pending_new_subscriptions.is_empty()
1389 {
1390 WakeUpReason::EventSendersReady
1391 } else {
1392 future::pending().await
1393 }
1394 };
1395 let finished_sending_bitswap_event = async {
1396 if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1397 {
1398 let bitswap_event_senders = bitswap_event_sending_future.await;
1399 task.bitswap_event_senders = either::Left(bitswap_event_senders);
1400 WakeUpReason::BitswapEventSendersReady
1401 } else if task.bitswap_event_pending_send.is_some()
1402 || !task.pending_new_bitswap_subscriptions.is_empty()
1403 {
1404 WakeUpReason::BitswapEventSendersReady
1405 } else {
1406 future::pending().await
1407 }
1408 };
1409 let start_discovery = async {
1410 let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1411 future::pending().await
1412 };
1413 next_discovery.get_mut().await;
1414 let ((_, chain_id), _) = next_discovery.remove_entry();
1415 WakeUpReason::StartDiscovery(chain_id)
1416 };
1417
1418 message_for_chain_received
1419 .or(message_received)
1420 .or(message_from_task_received)
1421 .or(service_event)
1422 .or(next_recent_connection_restore)
1423 .or(finished_sending_event)
1424 .or(finished_sending_bitswap_event)
1425 .or(start_discovery)
1426 .await
1427 };
1428
1429 match wake_up_reason {
1430 WakeUpReason::ForegroundClosed => {
1431 return;
1433 }
1434 WakeUpReason::Message(ToBackground::AddChain {
1435 messages_rx,
1436 config,
1437 }) => {
1438 let chain_id = match task.network.add_chain(config) {
1440 Ok(id) => id,
1441 Err(service::AddChainError::Duplicate { existing_identical }) => {
1442 task.network[existing_identical].num_references = task.network
1443 [existing_identical]
1444 .num_references
1445 .checked_add(1)
1446 .unwrap();
1447 existing_identical
1448 }
1449 };
1450
1451 task.chains_by_next_discovery.insert(
1452 (task.network[chain_id].next_discovery_when.clone(), chain_id),
1453 Box::pin(
1454 task.platform
1455 .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1456 ),
1457 );
1458
1459 task.messages_rx
1460 .push(Box::pin(
1461 messages_rx
1462 .map(move |msg| (chain_id, msg))
1463 .chain(stream::once(future::ready((
1464 chain_id,
1465 ToBackgroundChain::RemoveChain,
1466 )))),
1467 ) as Pin<Box<_>>);
1468
1469 log!(
1470 &task.platform,
1471 Debug,
1472 "network",
1473 "chain-added",
1474 id = task.network[chain_id].log_name
1475 );
1476 }
1477 WakeUpReason::EventSendersReady => {
1478 let either::Left(event_senders) = &mut task.event_senders else {
1482 unreachable!()
1483 };
1484
1485 if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1486 task.event_pending_send.take()
1487 {
1488 let mut event_senders = mem::take(event_senders);
1489 task.event_senders = either::Right(Box::pin(async move {
1490 for index in (0..event_senders.len()).rev() {
1493 let (event_sender_chain_id, event_sender) =
1494 event_senders.swap_remove(index);
1495 if event_sender_chain_id == event_to_dispatch_chain_id {
1496 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1497 continue;
1498 }
1499 }
1500 event_senders.push((event_sender_chain_id, event_sender));
1501 }
1502 event_senders
1503 }));
1504 } else if !task.pending_new_subscriptions.is_empty() {
1505 let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1506 let mut event_senders = mem::take(event_senders);
1507 let open_gossip_links = task.open_gossip_links.clone();
1509 task.event_senders = either::Right(Box::pin(async move {
1510 for (chain_id, new_subscription) in pending_new_subscriptions {
1511 for ((link_chain_id, peer_id), state) in &open_gossip_links {
1512 if *link_chain_id != chain_id {
1514 continue;
1515 }
1516
1517 let _ = new_subscription
1518 .send(Event::Connected {
1519 peer_id: peer_id.clone(),
1520 role: state.role,
1521 best_block_number: state.best_block_number,
1522 best_block_hash: state.best_block_hash,
1523 })
1524 .await;
1525
1526 if let Some(finalized_block_height) = state.finalized_block_height {
1527 let _ = new_subscription
1528 .send(Event::GrandpaNeighborPacket {
1529 peer_id: peer_id.clone(),
1530 finalized_block_height,
1531 })
1532 .await;
1533 }
1534 }
1535
1536 event_senders.push((chain_id, new_subscription));
1537 }
1538
1539 event_senders
1540 }));
1541 }
1542 }
1543 WakeUpReason::BitswapEventSendersReady => {
1544 let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1546 unreachable!()
1547 };
1548
1549 if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1550 let mut bitswap_event_senders = mem::take(bitswap_event_senders);
1551 task.bitswap_event_senders = either::Right(Box::pin(async move {
1552 for index in (0..bitswap_event_senders.len()).rev() {
1555 let event_sender = bitswap_event_senders.swap_remove(index);
1556 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1557 continue;
1558 }
1559 bitswap_event_senders.push(event_sender);
1560 }
1561 bitswap_event_senders
1562 }));
1563 } else if !task.pending_new_bitswap_subscriptions.is_empty() {
1564 bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1565 }
1566 }
1567 WakeUpReason::MessageFromConnection {
1568 connection_id,
1569 message,
1570 } => {
1571 task.network
1572 .inject_connection_message(connection_id, message);
1573 }
1574 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1575 if let Some(new_ref) =
1576 NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1577 {
1578 task.network[chain_id].num_references = new_ref;
1579 continue;
1580 }
1581
1582 for peer_id in task
1583 .network
1584 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1585 .cloned()
1586 .collect::<Vec<_>>()
1587 {
1588 task.network
1589 .gossip_close(
1590 chain_id,
1591 &peer_id,
1592 service::GossipKind::ConsensusTransactions,
1593 )
1594 .unwrap();
1595
1596 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1597 debug_assert!(_was_in.is_some());
1598 }
1599
1600 let _was_in = task
1601 .chains_by_next_discovery
1602 .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1603 debug_assert!(_was_in.is_some());
1604
1605 log!(
1606 &task.platform,
1607 Debug,
1608 "network",
1609 "chain-removed",
1610 id = task.network[chain_id].log_name
1611 );
1612 task.v2_statement_peers.remove(&chain_id);
1613 task.current_affinity_filter.remove(&chain_id);
1614 task.important_nodes.remove(&chain_id);
1615 task.chains_ever_gossip_connected.remove(&chain_id);
1616 task.network.remove_chain(chain_id).unwrap();
1617 task.peering_strategy.remove_chain_peers(&chain_id);
1618 }
1619 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1620 task.pending_new_subscriptions.push((chain_id, sender));
1621 }
1622 WakeUpReason::MessageForChain(
1623 _chain_id,
1624 ToBackgroundChain::SubscribeBitswap { sender },
1625 ) => {
1626 task.pending_new_bitswap_subscriptions.push(sender);
1627 }
1628 WakeUpReason::MessageForChain(
1629 chain_id,
1630 ToBackgroundChain::DisconnectAndBan {
1631 peer_id,
1632 severity,
1633 reason,
1634 },
1635 ) => {
1636 let ban_duration = Duration::from_secs(match severity {
1637 BanSeverity::Low => 10,
1638 BanSeverity::High => 40,
1639 });
1640
1641 let had_slot = matches!(
1642 task.peering_strategy.unassign_slot_and_ban(
1643 &chain_id,
1644 &peer_id,
1645 task.platform.now() + ban_duration,
1646 ),
1647 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1648 );
1649
1650 if had_slot {
1651 log!(
1652 &task.platform,
1653 Debug,
1654 "network",
1655 "slot-unassigned",
1656 chain = &task.network[chain_id].log_name,
1657 peer_id,
1658 ?ban_duration,
1659 reason = "user-ban",
1660 user_reason = reason
1661 );
1662 task.network.gossip_remove_desired(
1663 chain_id,
1664 &peer_id,
1665 service::GossipKind::ConsensusTransactions,
1666 );
1667 }
1668
1669 if task.network.gossip_is_connected(
1670 chain_id,
1671 &peer_id,
1672 service::GossipKind::ConsensusTransactions,
1673 ) {
1674 let _closed_result = task.network.gossip_close(
1675 chain_id,
1676 &peer_id,
1677 service::GossipKind::ConsensusTransactions,
1678 );
1679 debug_assert!(_closed_result.is_ok());
1680
1681 log!(
1682 &task.platform,
1683 Debug,
1684 "network",
1685 "gossip-closed",
1686 chain = &task.network[chain_id].log_name,
1687 peer_id,
1688 );
1689
1690 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1691 debug_assert!(_was_in.is_some());
1692
1693 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1694 peers.remove(&peer_id);
1695 }
1696
1697 debug_assert!(task.event_pending_send.is_none());
1698 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1699 }
1700 }
1701 WakeUpReason::MessageForChain(
1702 chain_id,
1703 ToBackgroundChain::StartBlocksRequest {
1704 target,
1705 config,
1706 timeout,
1707 result,
1708 },
1709 ) => {
1710 match &config.start {
1711 codec::BlocksRequestConfigStart::Hash(hash) => {
1712 log!(
1713 &task.platform,
1714 Debug,
1715 "network",
1716 "blocks-request-started",
1717 chain = task.network[chain_id].log_name, target,
1718 start = HashDisplay(hash),
1719 num = config.desired_count.get(),
1720 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1721 header = ?config.fields.header, body = ?config.fields.body,
1722 justifications = ?config.fields.justifications
1723 );
1724 }
1725 codec::BlocksRequestConfigStart::Number(number) => {
1726 log!(
1727 &task.platform,
1728 Debug,
1729 "network",
1730 "blocks-request-started",
1731 chain = task.network[chain_id].log_name, target, start = number,
1732 num = config.desired_count.get(),
1733 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1734 header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1735 );
1736 }
1737 }
1738
1739 match task
1740 .network
1741 .start_blocks_request(&target, chain_id, config.clone(), timeout)
1742 {
1743 Ok(substream_id) => {
1744 task.blocks_requests.insert(substream_id, result);
1745 }
1746 Err(service::StartRequestError::NoConnection) => {
1747 log!(
1748 &task.platform,
1749 Debug,
1750 "network",
1751 "blocks-request-error",
1752 chain = task.network[chain_id].log_name,
1753 target,
1754 error = "NoConnection"
1755 );
1756 let _ = result.send(Err(BlocksRequestError::NoConnection));
1757 }
1758 }
1759 }
1760 WakeUpReason::MessageForChain(
1761 chain_id,
1762 ToBackgroundChain::StartWarpSyncRequest {
1763 target,
1764 begin_hash,
1765 timeout,
1766 result,
1767 },
1768 ) => {
1769 log!(
1770 &task.platform,
1771 Debug,
1772 "network",
1773 "warp-sync-request-started",
1774 chain = task.network[chain_id].log_name,
1775 target,
1776 start = HashDisplay(&begin_hash)
1777 );
1778
1779 match task
1780 .network
1781 .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1782 {
1783 Ok(substream_id) => {
1784 task.grandpa_warp_sync_requests.insert(substream_id, result);
1785 }
1786 Err(service::StartRequestError::NoConnection) => {
1787 log!(
1788 &task.platform,
1789 Debug,
1790 "network",
1791 "warp-sync-request-error",
1792 chain = task.network[chain_id].log_name,
1793 target,
1794 error = "NoConnection"
1795 );
1796 let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1797 }
1798 }
1799 }
1800 WakeUpReason::MessageForChain(
1801 chain_id,
1802 ToBackgroundChain::StartStorageProofRequest {
1803 target,
1804 config,
1805 timeout,
1806 result,
1807 },
1808 ) => {
1809 log!(
1810 &task.platform,
1811 Debug,
1812 "network",
1813 "storage-proof-request-started",
1814 chain = task.network[chain_id].log_name,
1815 target,
1816 block_hash = HashDisplay(&config.block_hash)
1817 );
1818
1819 match task.network.start_storage_proof_request(
1820 &target,
1821 chain_id,
1822 config.clone(),
1823 timeout,
1824 ) {
1825 Ok(substream_id) => {
1826 task.storage_proof_requests.insert(substream_id, result);
1827 }
1828 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1829 log!(
1830 &task.platform,
1831 Debug,
1832 "network",
1833 "storage-proof-request-error",
1834 chain = task.network[chain_id].log_name,
1835 target,
1836 error = "NoConnection"
1837 );
1838 let _ = result.send(Err(StorageProofRequestError::NoConnection));
1839 }
1840 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1841 log!(
1842 &task.platform,
1843 Debug,
1844 "network",
1845 "storage-proof-request-error",
1846 chain = task.network[chain_id].log_name,
1847 target,
1848 error = "RequestTooLarge"
1849 );
1850 let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1851 }
1852 };
1853 }
1854 WakeUpReason::MessageForChain(
1855 chain_id,
1856 ToBackgroundChain::StartCallProofRequest {
1857 target,
1858 config,
1859 timeout,
1860 result,
1861 },
1862 ) => {
1863 log!(
1864 &task.platform,
1865 Debug,
1866 "network",
1867 "call-proof-request-started",
1868 chain = task.network[chain_id].log_name,
1869 target,
1870 block_hash = HashDisplay(&config.block_hash),
1871 function = config.method
1872 );
1873 match task.network.start_call_proof_request(
1876 &target,
1877 chain_id,
1878 config.clone(),
1879 timeout,
1880 ) {
1881 Ok(substream_id) => {
1882 task.call_proof_requests.insert(substream_id, result);
1883 }
1884 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1885 log!(
1886 &task.platform,
1887 Debug,
1888 "network",
1889 "call-proof-request-error",
1890 chain = task.network[chain_id].log_name,
1891 target,
1892 error = "NoConnection"
1893 );
1894 let _ = result.send(Err(CallProofRequestError::NoConnection));
1895 }
1896 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1897 log!(
1898 &task.platform,
1899 Debug,
1900 "network",
1901 "call-proof-request-error",
1902 chain = task.network[chain_id].log_name,
1903 target,
1904 error = "RequestTooLarge"
1905 );
1906 let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1907 }
1908 };
1909 }
1910 WakeUpReason::MessageForChain(
1911 chain_id,
1912 ToBackgroundChain::StartChildStorageProofRequest {
1913 target,
1914 config,
1915 timeout,
1916 result,
1917 },
1918 ) => {
1919 log!(
1920 &task.platform,
1921 Debug,
1922 "network",
1923 "child-storage-proof-request-started",
1924 chain = task.network[chain_id].log_name,
1925 target,
1926 block_hash = HashDisplay(&config.block_hash)
1927 );
1928
1929 match task.network.start_child_storage_proof_request(
1930 &target,
1931 chain_id,
1932 codec::ChildStorageProofRequestConfig {
1933 block_hash: config.block_hash,
1934 child_trie: &config.child_trie,
1935 keys: config.keys.iter().map(|k| k.as_slice()),
1936 },
1937 timeout,
1938 ) {
1939 Ok(substream_id) => {
1940 task.child_storage_proof_requests
1941 .insert(substream_id, result);
1942 }
1943 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1944 log!(
1945 &task.platform,
1946 Debug,
1947 "network",
1948 "child-storage-proof-request-error",
1949 chain = task.network[chain_id].log_name,
1950 target,
1951 error = "NoConnection"
1952 );
1953 let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1954 }
1955 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1956 log!(
1957 &task.platform,
1958 Debug,
1959 "network",
1960 "child-storage-proof-request-error",
1961 chain = task.network[chain_id].log_name,
1962 target,
1963 error = "RequestTooLarge"
1964 );
1965 let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1966 }
1967 };
1968 }
1969 WakeUpReason::MessageForChain(
1970 chain_id,
1971 ToBackgroundChain::SetLocalBestBlock {
1972 best_hash,
1973 best_number,
1974 },
1975 ) => {
1976 task.network
1977 .set_chain_local_best_block(chain_id, best_hash, best_number);
1978 }
1979 WakeUpReason::MessageForChain(
1980 chain_id,
1981 ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1982 ) => {
1983 log!(
1984 &task.platform,
1985 Debug,
1986 "network",
1987 "local-grandpa-state-announced",
1988 chain = task.network[chain_id].log_name,
1989 set_id = grandpa_state.set_id,
1990 commit_finalized_height = grandpa_state.commit_finalized_height,
1991 );
1992
1993 task.network
1996 .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1997 }
1998 WakeUpReason::MessageForChain(
1999 chain_id,
2000 ToBackgroundChain::AnnounceTransaction {
2001 transaction,
2002 result,
2003 },
2004 ) => {
2005 let peers_to_send = task
2008 .network
2009 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2010 .cloned()
2011 .collect::<Vec<_>>();
2012
2013 let mut peers_sent = Vec::with_capacity(peers_to_send.len());
2014 let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
2015 for peer in &peers_to_send {
2016 match task
2017 .network
2018 .gossip_send_transaction(peer, chain_id, &transaction)
2019 {
2020 Ok(()) => peers_sent.push(peer.to_base58()),
2021 Err(QueueNotificationError::QueueFull) => {
2022 peers_queue_full.push(peer.to_base58())
2023 }
2024 Err(QueueNotificationError::NoConnection) => unreachable!(),
2025 }
2026 }
2027
2028 log!(
2029 &task.platform,
2030 Debug,
2031 "network",
2032 "transaction-announced",
2033 chain = task.network[chain_id].log_name,
2034 transaction =
2035 hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2036 size = transaction.len(),
2037 peers_sent = peers_sent.join(", "),
2038 peers_queue_full = peers_queue_full.join(", "),
2039 );
2040
2041 let _ = result.send(peers_to_send);
2042 }
2043 WakeUpReason::MessageForChain(
2044 chain_id,
2045 ToBackgroundChain::SendBlockAnnounce {
2046 target,
2047 scale_encoded_header,
2048 is_best,
2049 result,
2050 },
2051 ) => {
2052 let _ = result.send(task.network.gossip_send_block_announce(
2054 &target,
2055 chain_id,
2056 &scale_encoded_header,
2057 is_best,
2058 ));
2059 }
2060 WakeUpReason::MessageForChain(
2061 _chain_id,
2062 ToBackgroundChain::SendBitswapMessage {
2063 target,
2064 message,
2065 result,
2066 },
2067 ) => {
2068 let _ = result.send(task.network.bitswap_send_message(&target, message));
2069 }
2070 WakeUpReason::MessageForChain(
2071 _chain_id,
2072 ToBackgroundChain::BroadcastBitswapMessage { message, result },
2073 ) => {
2074 let peers = task
2075 .network
2076 .established_bitswap_desired()
2077 .cloned()
2078 .collect::<Vec<_>>();
2079 let results = peers
2080 .iter()
2081 .map(|peer| {
2082 (
2083 peer,
2084 task.network.bitswap_send_message(peer, message.clone()),
2085 )
2086 })
2087 .collect::<Vec<_>>(); let succeeded_peers = results
2090 .iter()
2091 .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2092 .collect::<Vec<_>>();
2093
2094 let r = if !succeeded_peers.is_empty() {
2096 Ok(succeeded_peers)
2097 } else if results
2098 .iter()
2099 .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2100 {
2101 Err(SendBitswapMessageError::QueueFull)
2104 } else {
2105 Err(SendBitswapMessageError::NoConnection)
2108 };
2109
2110 let _ = result.send(r);
2111 }
2112 WakeUpReason::MessageForChain(
2113 chain_id,
2114 ToBackgroundChain::BroadcastStatement { statement, result },
2115 ) => {
2116 let peers_to_send = task
2117 .network
2118 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2119 .cloned()
2120 .collect::<Vec<_>>();
2121
2122 let total = peers_to_send.len();
2123 let mut sent = 0;
2124 for peer in &peers_to_send {
2125 if task
2126 .network
2127 .gossip_send_statement(peer, chain_id, statement.clone())
2128 .is_ok()
2129 {
2130 sent += 1;
2131 }
2132 }
2133
2134 log!(
2135 &task.platform,
2136 Debug,
2137 "network",
2138 "statement-broadcast",
2139 chain = task.network[chain_id].log_name,
2140 sent,
2141 total,
2142 );
2143
2144 let _ = result.send(BroadcastStatementResult { sent, total });
2145 }
2146 WakeUpReason::MessageForChain(
2147 chain_id,
2148 ToBackgroundChain::UpdateTopicAffinity { filter },
2149 ) => {
2150 task.current_affinity_filter
2151 .insert(chain_id, filter.clone());
2152 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2153 let mut to_remove = Vec::new();
2154 for peer_id in peers.iter() {
2155 if let Err(
2156 SendTopicAffinityError::NoConnection
2157 | SendTopicAffinityError::ProtocolV1,
2158 ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2159 {
2160 to_remove.push(peer_id.clone());
2161 }
2162 }
2163 for peer_id in &to_remove {
2164 peers.remove(peer_id);
2165 }
2166 }
2167 }
2168 WakeUpReason::MessageForChain(
2169 chain_id,
2170 ToBackgroundChain::Discover {
2171 list,
2172 important_nodes,
2173 },
2174 ) => {
2175 for (peer_id, addrs) in list {
2176 if important_nodes {
2177 task.important_nodes
2178 .entry(chain_id)
2179 .or_default()
2180 .insert(peer_id.clone());
2181 }
2182
2183 task.peering_strategy
2186 .insert_chain_peer(chain_id, peer_id.clone(), 30); for addr in addrs {
2189 let _ =
2190 task.peering_strategy
2191 .insert_address(&peer_id, addr.into_bytes(), 10);
2192 }
2194 }
2195 }
2196 WakeUpReason::MessageForChain(
2197 chain_id,
2198 ToBackgroundChain::DiscoveredNodes { result },
2199 ) => {
2200 let _ = result.send(
2202 task.peering_strategy
2203 .chain_peers_unordered(&chain_id)
2204 .map(|peer_id| {
2205 let addrs = task
2206 .peering_strategy
2207 .peer_addresses(peer_id)
2208 .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2209 .collect::<Vec<_>>();
2210 (peer_id.clone(), addrs)
2211 })
2212 .collect::<Vec<_>>(),
2213 );
2214 }
2215 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
2216 let _ = result.send(
2217 task.network
2218 .gossip_connected_peers(
2219 chain_id,
2220 service::GossipKind::ConsensusTransactions,
2221 )
2222 .cloned()
2223 .collect(),
2224 );
2225 }
2226 WakeUpReason::StartDiscovery(chain_id) => {
2227 let chain = &mut task.network[chain_id];
2229 chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
2230 chain.next_discovery_period =
2231 cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
2232 task.chains_by_next_discovery.insert(
2233 (chain.next_discovery_when.clone(), chain_id),
2234 Box::pin(
2235 task.platform
2236 .sleep(task.network[chain_id].next_discovery_period),
2237 ),
2238 );
2239
2240 let random_peer_id = {
2241 let mut pub_key = [0; 32];
2242 rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
2243 PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
2244 };
2245
2246 let target = task
2248 .network
2249 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2250 .next()
2251 .cloned();
2252
2253 if let Some(target) = target {
2254 match task.network.start_kademlia_find_node_request(
2255 &target,
2256 chain_id,
2257 &random_peer_id,
2258 Duration::from_secs(20),
2259 ) {
2260 Ok(_) => {}
2261 Err(service::StartRequestError::NoConnection) => unreachable!(),
2262 };
2263
2264 log!(
2265 &task.platform,
2266 Debug,
2267 "network",
2268 "discovery-find-node-started",
2269 chain = &task.network[chain_id].log_name,
2270 request_target = target,
2271 requested_peer_id = random_peer_id
2272 );
2273 } else {
2274 log!(
2275 &task.platform,
2276 Debug,
2277 "network",
2278 "discovery-skipped-no-peer",
2279 chain = &task.network[chain_id].log_name
2280 );
2281 }
2282 }
2283 WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2284 peer_id,
2285 expected_peer_id,
2286 id,
2287 }) => {
2288 let remote_addr =
2289 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2291 {
2292 log!(
2293 &task.platform,
2294 Debug,
2295 "network",
2296 "handshake-finished-peer-id-mismatch",
2297 remote_addr,
2298 expected_peer_id,
2299 actual_peer_id = peer_id
2300 );
2301
2302 let _was_in = task
2303 .peering_strategy
2304 .decrease_address_connections_and_remove_if_zero(
2305 expected_peer_id,
2306 remote_addr.as_ref(),
2307 );
2308 debug_assert!(_was_in.is_ok());
2309 let _ = task.peering_strategy.increase_address_connections(
2310 &peer_id,
2311 remote_addr.into_bytes().to_vec(),
2312 10,
2313 );
2314 } else {
2315 log!(
2316 &task.platform,
2317 Debug,
2318 "network",
2319 "handshake-finished",
2320 remote_addr,
2321 peer_id
2322 );
2323 }
2324
2325 task.bitswap_peering_strategy
2326 .increase_peer_connections(&peer_id);
2327 }
2328 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2329 expected_peer_id: Some(_),
2330 ..
2331 })
2332 | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
2333 let (address, peer_id, handshake_finished) = match wake_up_reason {
2334 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2335 address,
2336 expected_peer_id: Some(peer_id),
2337 ..
2338 }) => (address, peer_id, false),
2339 WakeUpReason::NetworkEvent(service::Event::Disconnected {
2340 address,
2341 peer_id,
2342 ..
2343 }) => (address, peer_id, true),
2344 _ => unreachable!(),
2345 };
2346
2347 task.peering_strategy
2348 .decrease_address_connections(&peer_id, &address)
2349 .unwrap();
2350 let address = Multiaddr::from_bytes(address).unwrap();
2351 log!(
2352 &task.platform,
2353 Debug,
2354 "network",
2355 "connection-shutdown",
2356 peer_id,
2357 address,
2358 ?handshake_finished
2359 );
2360
2361 let ban_duration = if handshake_finished {
2372 Duration::from_secs(5)
2373 } else {
2374 Duration::from_secs(2)
2375 };
2376 task.network.gossip_remove_desired_all(
2377 &peer_id,
2378 service::GossipKind::ConsensusTransactions,
2379 );
2380 for (&chain_id, what_happened) in task
2381 .peering_strategy
2382 .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2383 {
2384 if matches!(
2385 what_happened,
2386 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2387 ) {
2388 log!(
2389 &task.platform,
2390 Debug,
2391 "network",
2392 "slot-unassigned",
2393 chain = &task.network[chain_id].log_name,
2394 peer_id,
2395 ?ban_duration,
2396 reason = "pre-handshake-disconnect"
2398 );
2399 }
2400 }
2401
2402 if handshake_finished {
2403 task.network.bitswap_remove_desired(&peer_id);
2404 let what_happened = task
2405 .bitswap_peering_strategy
2406 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2407 if matches!(
2408 what_happened,
2409 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2410 ) {
2411 log!(
2412 &task.platform,
2413 Debug,
2414 "network",
2415 "bitswap-slot-unassigned",
2416 peer_id,
2417 ?ban_duration,
2418 reason = "disconnect",
2419 );
2420 }
2421 let _ = task
2422 .bitswap_peering_strategy
2423 .decrease_peer_connections(&peer_id);
2424 }
2425 }
2426 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2427 expected_peer_id: None,
2428 ..
2429 }) => {
2430 debug_assert!(false);
2433 }
2434 WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
2435 id,
2436 peer_id,
2437 ping_time,
2438 }) => {
2439 let remote_addr =
2440 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); log!(
2442 &task.platform,
2443 Debug,
2444 "network",
2445 "pong",
2446 peer_id,
2447 remote_addr,
2448 ?ping_time
2449 );
2450 }
2451 WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2452 chain_id,
2453 peer_id,
2454 announce,
2455 }) => {
2456 log!(
2457 &task.platform,
2458 Debug,
2459 "network",
2460 "block-announce-received",
2461 chain = &task.network[chain_id].log_name,
2462 peer_id,
2463 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2464 announce.decode().scale_encoded_header
2465 )),
2466 is_best = announce.decode().is_best
2467 );
2468
2469 let decoded_announce = announce.decode();
2470 if decoded_announce.is_best {
2471 let link = task
2472 .open_gossip_links
2473 .get_mut(&(chain_id, peer_id.clone()))
2474 .unwrap();
2475 if let Ok(decoded) = header::decode(
2476 decoded_announce.scale_encoded_header,
2477 task.network[chain_id].block_number_bytes,
2478 ) {
2479 link.best_block_hash = header::hash_from_scale_encoded_header(
2480 decoded_announce.scale_encoded_header,
2481 );
2482 link.best_block_number = decoded.number;
2483 }
2484 }
2485
2486 debug_assert!(task.event_pending_send.is_none());
2487 task.event_pending_send =
2488 Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2489 }
2490 WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2491 peer_id,
2492 chain_id,
2493 role,
2494 best_number,
2495 best_hash,
2496 kind: service::GossipKind::ConsensusTransactions,
2497 }) => {
2498 log!(
2499 &task.platform,
2500 Debug,
2501 "network",
2502 "gossip-open-success",
2503 chain = &task.network[chain_id].log_name,
2504 peer_id,
2505 best_number,
2506 best_hash = HashDisplay(&best_hash)
2507 );
2508
2509 let _prev_value = task.open_gossip_links.insert(
2510 (chain_id, peer_id.clone()),
2511 OpenGossipLinkState {
2512 best_block_number: best_number,
2513 best_block_hash: best_hash,
2514 role,
2515 finalized_block_height: None,
2516 },
2517 );
2518 debug_assert!(_prev_value.is_none());
2519
2520 task.chains_ever_gossip_connected.insert(chain_id);
2521
2522 debug_assert!(task.event_pending_send.is_none());
2523 task.event_pending_send = Some((
2524 chain_id,
2525 Event::Connected {
2526 peer_id,
2527 role,
2528 best_block_number: best_number,
2529 best_block_hash: best_hash,
2530 },
2531 ));
2532 }
2533 WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2534 peer_id,
2535 chain_id,
2536 error,
2537 kind: service::GossipKind::ConsensusTransactions,
2538 }) => {
2539 log!(
2540 &task.platform,
2541 Debug,
2542 "network",
2543 "gossip-open-error",
2544 chain = &task.network[chain_id].log_name,
2545 peer_id,
2546 ?error,
2547 );
2548 let ban_duration = Duration::from_millis(5500);
2552
2553 let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2556 matches!(
2557 task.peering_strategy
2558 .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2559 basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2560 )
2561 } else {
2562 matches!(
2563 task.peering_strategy.unassign_slot_and_ban(
2564 &chain_id,
2565 &peer_id,
2566 task.platform.now() + ban_duration,
2567 ),
2568 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2569 )
2570 };
2571
2572 if had_slot {
2573 log!(
2574 &task.platform,
2575 Debug,
2576 "network",
2577 "slot-unassigned",
2578 chain = &task.network[chain_id].log_name,
2579 peer_id,
2580 ?ban_duration,
2581 reason = "gossip-open-failed"
2582 );
2583 task.network.gossip_remove_desired(
2584 chain_id,
2585 &peer_id,
2586 service::GossipKind::ConsensusTransactions,
2587 );
2588 }
2589 }
2590 WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2591 peer_id,
2592 chain_id,
2593 kind: service::GossipKind::ConsensusTransactions,
2594 }) => {
2595 log!(
2596 &task.platform,
2597 Debug,
2598 "network",
2599 "gossip-closed",
2600 chain = &task.network[chain_id].log_name,
2601 peer_id,
2602 );
2603 let ban_duration = Duration::from_secs(10);
2604
2605 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2606 debug_assert!(_was_in.is_some());
2607
2608 if matches!(
2611 task.peering_strategy.unassign_slot_and_ban(
2612 &chain_id,
2613 &peer_id,
2614 task.platform.now() + ban_duration,
2615 ),
2616 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2617 ) {
2618 log!(
2619 &task.platform,
2620 Debug,
2621 "network",
2622 "slot-unassigned",
2623 chain = &task.network[chain_id].log_name,
2624 peer_id,
2625 ?ban_duration,
2626 reason = "gossip-closed"
2627 );
2628 task.network.gossip_remove_desired(
2629 chain_id,
2630 &peer_id,
2631 service::GossipKind::ConsensusTransactions,
2632 );
2633 }
2634
2635 if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2636 peers.remove(&peer_id);
2637 }
2638
2639 debug_assert!(task.event_pending_send.is_none());
2640 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2641 }
2642 WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
2643 log!(
2644 &task.platform,
2645 Debug,
2646 "network",
2647 "bitswap-open-success",
2648 peer_id
2649 );
2650 }
2651 WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
2652 log!(
2653 &task.platform,
2654 Debug,
2655 "network",
2656 "bitswap-open-error",
2657 peer_id,
2658 ?error
2659 );
2660 let ban_duration = if error.is_protocol_not_available() {
2661 Duration::from_secs(600)
2662 } else {
2663 Duration::from_secs(15)
2664 };
2665 if matches!(
2666 task.bitswap_peering_strategy
2667 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2668 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2669 ) {
2670 log!(
2671 &task.platform,
2672 Debug,
2673 "network",
2674 "bitswap-slot-unassigned",
2675 peer_id,
2676 ?ban_duration,
2677 reason = "bitswap-open-failed"
2678 );
2679 task.network.bitswap_remove_desired(&peer_id);
2680 }
2681 }
2682 WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
2683 log!(
2684 &task.platform,
2685 Debug,
2686 "network",
2687 "bitswap-message-received",
2688 peer_id
2689 );
2690 debug_assert!(task.bitswap_event_pending_send.is_none());
2691 task.bitswap_event_pending_send =
2692 Some(BitswapEvent::BitswapMessage { peer_id, message });
2693 }
2694 WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
2695 log!(&task.platform, Debug, "network", "bitswap-closed", peer_id);
2696 let ban_duration = Duration::from_secs(10);
2697 if matches!(
2698 task.bitswap_peering_strategy
2699 .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2700 bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2701 ) {
2702 log!(
2703 &task.platform,
2704 Debug,
2705 "network",
2706 "bitswap-slot-unassigned",
2707 peer_id,
2708 ?ban_duration,
2709 reason = "bitswap-closed"
2710 );
2711 task.network.bitswap_remove_desired(&peer_id);
2712 }
2713 }
2714 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2715 substream_id,
2716 peer_id,
2717 chain_id,
2718 response: service::RequestResult::Blocks(response),
2719 }) => {
2720 match &response {
2721 Ok(blocks) => {
2722 log!(
2723 &task.platform,
2724 Debug,
2725 "network",
2726 "blocks-request-success",
2727 chain = task.network[chain_id].log_name,
2728 target = peer_id,
2729 num_blocks = blocks.len(),
2730 block_data_total_size =
2731 BytesDisplay(blocks.iter().fold(0, |sum, block| {
2732 let block_size = block.header.as_ref().map_or(0, |h| h.len())
2733 + block
2734 .body
2735 .as_ref()
2736 .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2737 + block
2738 .justifications
2739 .as_ref()
2740 .into_iter()
2741 .flat_map(|l| l.iter())
2742 .fold(0, |s, j| s + j.justification.len());
2743 sum + u64::try_from(block_size).unwrap()
2744 }))
2745 );
2746 }
2747 Err(error) => {
2748 log!(
2749 &task.platform,
2750 Debug,
2751 "network",
2752 "blocks-request-error",
2753 chain = task.network[chain_id].log_name,
2754 target = peer_id,
2755 ?error
2756 );
2757 }
2758 }
2759
2760 match &response {
2761 Ok(_) => {}
2762 Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2763 Err(err) => {
2764 log!(
2765 &task.platform,
2766 Debug,
2767 "network",
2768 format!(
2769 "Error in block request with {}. This might indicate an \
2770 incompatibility. Error: {}",
2771 peer_id, err
2772 )
2773 );
2774 }
2775 }
2776
2777 let _ = task
2778 .blocks_requests
2779 .remove(&substream_id)
2780 .unwrap()
2781 .send(response.map_err(BlocksRequestError::Request));
2782 }
2783 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2784 substream_id,
2785 peer_id,
2786 chain_id,
2787 response: service::RequestResult::GrandpaWarpSync(response),
2788 }) => {
2789 match &response {
2790 Ok(response) => {
2791 let decoded = response.decode();
2793 log!(
2794 &task.platform,
2795 Debug,
2796 "network",
2797 "warp-sync-request-success",
2798 chain = task.network[chain_id].log_name,
2799 target = peer_id,
2800 num_fragments = decoded.fragments.len(),
2801 is_finished = ?decoded.is_finished,
2802 );
2803 }
2804 Err(error) => {
2805 log!(
2806 &task.platform,
2807 Debug,
2808 "network",
2809 "warp-sync-request-error",
2810 chain = task.network[chain_id].log_name,
2811 target = peer_id,
2812 ?error,
2813 );
2814 }
2815 }
2816
2817 let _ = task
2818 .grandpa_warp_sync_requests
2819 .remove(&substream_id)
2820 .unwrap()
2821 .send(response.map_err(WarpSyncRequestError::Request));
2822 }
2823 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2824 substream_id,
2825 peer_id,
2826 chain_id,
2827 response: service::RequestResult::StorageProof(response),
2828 }) => {
2829 match &response {
2830 Ok(items) => {
2831 let decoded = items.decode();
2832 log!(
2833 &task.platform,
2834 Debug,
2835 "network",
2836 "storage-proof-request-success",
2837 chain = task.network[chain_id].log_name,
2838 target = peer_id,
2839 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2840 );
2841 }
2842 Err(error) => {
2843 log!(
2844 &task.platform,
2845 Debug,
2846 "network",
2847 "storage-proof-request-error",
2848 chain = task.network[chain_id].log_name,
2849 target = peer_id,
2850 ?error
2851 );
2852 }
2853 }
2854
2855 if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2858 let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2859 } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2860 {
2861 let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2862 } else {
2863 unreachable!()
2864 }
2865 }
2866 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2867 substream_id,
2868 peer_id,
2869 chain_id,
2870 response: service::RequestResult::CallProof(response),
2871 }) => {
2872 match &response {
2873 Ok(items) => {
2874 let decoded = items.decode();
2875 log!(
2876 &task.platform,
2877 Debug,
2878 "network",
2879 "call-proof-request-success",
2880 chain = task.network[chain_id].log_name,
2881 target = peer_id,
2882 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2883 );
2884 }
2885 Err(error) => {
2886 log!(
2887 &task.platform,
2888 Debug,
2889 "network",
2890 "call-proof-request-error",
2891 chain = task.network[chain_id].log_name,
2892 target = peer_id,
2893 ?error
2894 );
2895 }
2896 }
2897
2898 let _ = task
2899 .call_proof_requests
2900 .remove(&substream_id)
2901 .unwrap()
2902 .send(response.map_err(CallProofRequestError::Request));
2903 }
2904 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2905 peer_id: requestee_peer_id,
2906 chain_id,
2907 response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2908 ..
2909 }) => {
2910 for (peer_id, mut addrs) in nodes {
2911 if addrs.len() >= 10 {
2914 addrs.truncate(10);
2915 }
2916
2917 let mut valid_addrs = Vec::with_capacity(addrs.len());
2918 for addr in addrs {
2919 match Multiaddr::from_bytes(addr) {
2920 Ok(mut a) => {
2921 if !pop_p2p_if_matches(&mut a, &peer_id) {
2922 log!(
2923 &task.platform,
2924 Debug,
2925 "network",
2926 "discovered-address-peer-id-mismatch",
2927 chain = &task.network[chain_id].log_name,
2928 announced_peer_id = peer_id,
2929 addr = &a,
2930 obtained_from = requestee_peer_id
2931 );
2932 continue;
2933 }
2934 if platform::address_parse::multiaddr_to_address(&a)
2935 .ok()
2936 .map_or(false, |addr| {
2937 task.platform.supports_connection_type((&addr).into())
2938 })
2939 {
2940 valid_addrs.push(a)
2941 } else {
2942 log!(
2943 &task.platform,
2944 Debug,
2945 "network",
2946 "discovered-address-not-supported",
2947 chain = &task.network[chain_id].log_name,
2948 peer_id,
2949 addr = &a,
2950 obtained_from = requestee_peer_id
2951 );
2952 }
2953 }
2954 Err((error, addr)) => {
2955 log!(
2956 &task.platform,
2957 Debug,
2958 "network",
2959 "discovered-address-invalid",
2960 chain = &task.network[chain_id].log_name,
2961 peer_id,
2962 error,
2963 addr = hex::encode(&addr),
2964 obtained_from = requestee_peer_id
2965 );
2966 }
2967 }
2968 }
2969
2970 if !valid_addrs.is_empty() {
2971 let insert_outcome =
2974 task.peering_strategy
2975 .insert_chain_peer(chain_id, peer_id.clone(), 30); if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2978 peer_removed,
2979 } = insert_outcome
2980 {
2981 if let Some(peer_removed) = peer_removed {
2982 log!(
2983 &task.platform,
2984 Debug,
2985 "network",
2986 "peer-purged-from-address-book",
2987 chain = &task.network[chain_id].log_name,
2988 peer_id = peer_removed,
2989 );
2990 }
2991
2992 log!(
2993 &task.platform,
2994 Debug,
2995 "network",
2996 "peer-discovered",
2997 chain = &task.network[chain_id].log_name,
2998 peer_id,
2999 addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), obtained_from = requestee_peer_id
3001 );
3002 }
3003 }
3004
3005 for addr in valid_addrs {
3006 let _insert_result =
3007 task.peering_strategy
3008 .insert_address(&peer_id, addr.into_bytes(), 10); debug_assert!(!matches!(
3010 _insert_result,
3011 basic_peering_strategy::InsertAddressResult::UnknownPeer
3012 ));
3013 }
3014 }
3015 }
3016 WakeUpReason::NetworkEvent(service::Event::RequestResult {
3017 peer_id,
3018 chain_id,
3019 response: service::RequestResult::KademliaFindNode(Err(error)),
3020 ..
3021 }) => {
3022 log!(
3023 &task.platform,
3024 Debug,
3025 "network",
3026 "discovery-find-node-error",
3027 chain = &task.network[chain_id].log_name,
3028 ?error,
3029 find_node_target = peer_id,
3030 );
3031
3032 match error {
3035 service::KademliaFindNodeError::RequestFailed(err)
3036 if !err.is_protocol_error() => {}
3037
3038 service::KademliaFindNodeError::RequestFailed(
3039 service::RequestError::Substream(
3040 connection::established::RequestError::ProtocolNotAvailable,
3041 ),
3042 ) => {
3043 log!(
3045 &task.platform,
3046 Warn,
3047 "network",
3048 format!(
3049 "Problem during discovery on {}: protocol not available. \
3050 This might indicate that the version of Substrate used by \
3051 the chain doesn't include \
3052 <https://github.com/paritytech/substrate/pull/12545>.",
3053 &task.network[chain_id].log_name
3054 )
3055 );
3056 }
3057 _ => {
3058 log!(
3059 &task.platform,
3060 Debug,
3061 "network",
3062 format!(
3063 "Problem during discovery on {}: {}",
3064 &task.network[chain_id].log_name, error
3065 )
3066 );
3067 }
3068 }
3069 }
3070 WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3071 unreachable!()
3073 }
3074 WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3075 peer_id,
3076 chain_id,
3077 kind: service::GossipKind::ConsensusTransactions,
3078 }) => {
3079 if task
3084 .network
3085 .opened_gossip_undesired_by_chain(chain_id)
3086 .count()
3087 < 4
3088 {
3089 log!(
3090 &task.platform,
3091 Debug,
3092 "network",
3093 "gossip-in-request",
3094 chain = &task.network[chain_id].log_name,
3095 peer_id,
3096 outcome = "accepted"
3097 );
3098 task.network
3099 .gossip_open(
3100 chain_id,
3101 &peer_id,
3102 service::GossipKind::ConsensusTransactions,
3103 )
3104 .unwrap();
3105 } else {
3106 log!(
3107 &task.platform,
3108 Debug,
3109 "network",
3110 "gossip-in-request",
3111 chain = &task.network[chain_id].log_name,
3112 peer_id,
3113 outcome = "rejected",
3114 );
3115 task.network
3116 .gossip_close(
3117 chain_id,
3118 &peer_id,
3119 service::GossipKind::ConsensusTransactions,
3120 )
3121 .unwrap();
3122 }
3123 }
3124 WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3125 unreachable!()
3127 }
3128 WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3129 peer_id,
3130 substream_id,
3131 }) => {
3132 log!(
3133 &task.platform,
3134 Debug,
3135 "network",
3136 "identify-request-received",
3137 peer_id,
3138 );
3139 task.network
3140 .respond_identify(substream_id, &task.identify_agent_version);
3141 }
3142 WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3143 WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3144 unreachable!()
3146 }
3147 WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3148 chain_id,
3149 peer_id,
3150 state,
3151 }) => {
3152 log!(
3153 &task.platform,
3154 Debug,
3155 "network",
3156 "grandpa-neighbor-packet-received",
3157 chain = &task.network[chain_id].log_name,
3158 peer_id,
3159 round_number = state.round_number,
3160 set_id = state.set_id,
3161 commit_finalized_height = state.commit_finalized_height,
3162 );
3163
3164 task.open_gossip_links
3165 .get_mut(&(chain_id, peer_id.clone()))
3166 .unwrap()
3167 .finalized_block_height = Some(state.commit_finalized_height);
3168
3169 debug_assert!(task.event_pending_send.is_none());
3170 task.event_pending_send = Some((
3171 chain_id,
3172 Event::GrandpaNeighborPacket {
3173 peer_id,
3174 finalized_block_height: state.commit_finalized_height,
3175 },
3176 ));
3177 }
3178 WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3179 chain_id,
3180 peer_id,
3181 message,
3182 }) => {
3183 log!(
3184 &task.platform,
3185 Debug,
3186 "network",
3187 "grandpa-commit-message-received",
3188 chain = &task.network[chain_id].log_name,
3189 peer_id,
3190 target_block_hash = HashDisplay(message.decode().target_hash),
3191 );
3192
3193 debug_assert!(task.event_pending_send.is_none());
3194 task.event_pending_send =
3195 Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3196 }
3197 WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3198 chain_id,
3199 peer_id,
3200 statements,
3201 }) => {
3202 debug_assert!(task.event_pending_send.is_none());
3203
3204 if statements.is_empty() {
3205 continue;
3206 }
3207
3208 task.event_pending_send = Some((
3209 chain_id,
3210 Event::StatementsNotification {
3211 peer_id,
3212 statements,
3213 },
3214 ));
3215 }
3216 WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3217 peer_id,
3218 chain_id,
3219 version,
3220 }) => {
3221 log!(
3222 &task.platform,
3223 Trace,
3224 "network",
3225 "statement-protocol-open-success",
3226 chain = &task.network[chain_id].log_name,
3227 peer_id,
3228 ?version,
3229 );
3230
3231 if matches!(version, codec::StatementProtocolVersion::V2) {
3232 task.v2_statement_peers
3233 .entry(chain_id)
3234 .or_insert_with(|| {
3235 HashSet::with_capacity_and_hasher(16, Default::default())
3236 })
3237 .insert(peer_id.clone());
3238 if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3239 if let Err(
3240 SendTopicAffinityError::NoConnection
3241 | SendTopicAffinityError::ProtocolV1,
3242 ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3243 {
3244 task.v2_statement_peers
3245 .get_mut(&chain_id)
3246 .unwrap()
3247 .remove(&peer_id);
3248 }
3249 }
3250 }
3251 }
3252 WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3254 ..
3255 }) => {}
3256 WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3257 log!(
3259 &task.platform,
3260 Warn,
3261 "network",
3262 "protocol-error",
3263 peer_id,
3264 ?error
3265 );
3266
3267 }
3269 WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3270 task.peering_strategy.assign_slot(&chain_id, &peer_id);
3271
3272 log!(
3273 &task.platform,
3274 Debug,
3275 "network",
3276 "slot-assigned",
3277 chain = &task.network[chain_id].log_name,
3278 peer_id
3279 );
3280
3281 task.network.gossip_insert_desired(
3282 chain_id,
3283 peer_id,
3284 service::GossipKind::ConsensusTransactions,
3285 );
3286 }
3287 WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3288 task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3289
3290 log!(
3291 &task.platform,
3292 Debug,
3293 "network",
3294 "bitswap-slot-assigned",
3295 peer_id
3296 );
3297
3298 task.network.bitswap_insert_desired(peer_id);
3299 }
3300 WakeUpReason::NextRecentConnectionRestore => {
3301 task.num_recent_connection_opening =
3302 task.num_recent_connection_opening.saturating_sub(1);
3303 }
3304 WakeUpReason::CanStartConnect(expected_peer_id) => {
3305 let Some(multiaddr) = task
3306 .peering_strategy
3307 .pick_address_and_add_connection(&expected_peer_id)
3308 else {
3309 task.network.gossip_remove_desired_all(
3311 &expected_peer_id,
3312 service::GossipKind::ConsensusTransactions,
3313 );
3314 let ban_duration = Duration::from_secs(10);
3315 for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3316 &expected_peer_id,
3317 task.platform.now() + ban_duration,
3318 ) {
3319 if matches!(
3320 what_happened,
3321 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3322 ) {
3323 log!(
3324 &task.platform,
3325 Debug,
3326 "network",
3327 "slot-unassigned",
3328 chain = &task.network[chain_id].log_name,
3329 peer_id = expected_peer_id,
3330 ?ban_duration,
3331 reason = "no-address"
3332 );
3333 }
3334 }
3335 continue;
3336 };
3337
3338 let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3339 Ok(a) => a,
3340 Err((multiaddr::FromBytesError, addr)) => {
3341 let _was_in = task
3343 .peering_strategy
3344 .decrease_address_connections_and_remove_if_zero(
3345 &expected_peer_id,
3346 &addr,
3347 );
3348 debug_assert!(_was_in.is_ok());
3349 continue;
3350 }
3351 };
3352
3353 let address = address_parse::multiaddr_to_address(&multiaddr)
3354 .ok()
3355 .filter(|addr| {
3356 task.platform.supports_connection_type(match &addr {
3357 address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3358 From::from(addr)
3359 }
3360 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3361 addr,
3362 ) => From::from(addr),
3363 })
3364 });
3365
3366 let Some(address) = address else {
3367 let _was_in = task
3369 .peering_strategy
3370 .decrease_address_connections_and_remove_if_zero(
3371 &expected_peer_id,
3372 multiaddr.as_ref(),
3373 );
3374 debug_assert!(_was_in.is_ok());
3375 continue;
3376 };
3377
3378 let noise_key = {
3380 let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3381 task.platform.fill_random_bytes(&mut *noise_static_key);
3382 let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3383 task.platform.fill_random_bytes(&mut *libp2p_key);
3384 connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3385 };
3386
3387 log!(
3388 &task.platform,
3389 Debug,
3390 "network",
3391 "connection-started",
3392 expected_peer_id,
3393 remote_addr = multiaddr,
3394 local_peer_id =
3395 peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3396 .into_peer_id(),
3397 );
3398
3399 task.num_recent_connection_opening += 1;
3400
3401 let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3402 async_channel::bounded(8);
3403 let task_name = format!("connection-{}", multiaddr);
3404
3405 match address {
3406 address_parse::AddressOrMultiStreamAddress::Address(address) => {
3407 let connection = task.platform.connect_stream(address).await;
3410
3411 let (connection_id, connection_task) =
3412 task.network.add_single_stream_connection(
3413 task.platform.now(),
3414 service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3415 is_initiator: true,
3416 noise_key: &noise_key,
3417 },
3418 multiaddr.clone().into_bytes(),
3419 Some(expected_peer_id.clone()),
3420 coordinator_to_connection_tx,
3421 );
3422
3423 task.platform.spawn_task(
3424 task_name.into(),
3425 tasks::single_stream_connection_task::<TPlat>(
3426 connection,
3427 multiaddr.to_string(),
3428 task.platform.clone(),
3429 connection_id,
3430 connection_task,
3431 coordinator_to_connection_rx,
3432 task.tasks_messages_tx.clone(),
3433 ),
3434 );
3435 }
3436 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3437 platform::MultiStreamAddress::WebRtc {
3438 ip,
3439 port,
3440 remote_certificate_sha256,
3441 },
3442 ) => {
3443 let connection = task
3448 .platform
3449 .connect_multistream(platform::MultiStreamAddress::WebRtc {
3450 ip,
3451 port,
3452 remote_certificate_sha256,
3453 })
3454 .await;
3455
3456 let local_tls_certificate_multihash = [18u8, 32]
3458 .into_iter()
3459 .chain(connection.local_tls_certificate_sha256.into_iter())
3460 .collect();
3461 let remote_tls_certificate_multihash = [18u8, 32]
3462 .into_iter()
3463 .chain(remote_certificate_sha256.iter().copied())
3464 .collect();
3465
3466 let (connection_id, connection_task) =
3467 task.network.add_multi_stream_connection(
3468 task.platform.now(),
3469 service::MultiStreamHandshakeKind::WebRtc {
3470 is_initiator: true,
3471 local_tls_certificate_multihash,
3472 remote_tls_certificate_multihash,
3473 noise_key: &noise_key,
3474 },
3475 multiaddr.clone().into_bytes(),
3476 Some(expected_peer_id.clone()),
3477 coordinator_to_connection_tx,
3478 );
3479
3480 task.platform.spawn_task(
3481 task_name.into(),
3482 tasks::webrtc_multi_stream_connection_task::<TPlat>(
3483 connection.connection,
3484 multiaddr.to_string(),
3485 task.platform.clone(),
3486 connection_id,
3487 connection_task,
3488 coordinator_to_connection_rx,
3489 task.tasks_messages_tx.clone(),
3490 ),
3491 );
3492 }
3493 }
3494 }
3495 WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3496 task.network
3497 .gossip_open(
3498 chain_id,
3499 &peer_id,
3500 service::GossipKind::ConsensusTransactions,
3501 )
3502 .unwrap();
3503
3504 log!(
3505 &task.platform,
3506 Debug,
3507 "network",
3508 "gossip-open-start",
3509 chain = &task.network[chain_id].log_name,
3510 peer_id,
3511 );
3512 }
3513 WakeUpReason::CanOpenBitswap(peer_id) => {
3514 task.network.bitswap_open(&peer_id).unwrap();
3515
3516 log!(
3517 &task.platform,
3518 Debug,
3519 "network",
3520 "bitswap-open-start",
3521 peer_id
3522 );
3523 }
3524 WakeUpReason::MessageToConnection {
3525 connection_id,
3526 message,
3527 } => {
3528 let _send_result = task.network[connection_id].send(message).await;
3536 debug_assert!(_send_result.is_ok());
3537 }
3538 }
3539 }
3540}
3541
3542fn pop_p2p_if_matches(
3545 addr: &mut smoldot::libp2p::multiaddr::Multiaddr,
3546 expected_peer: &smoldot::libp2p::peer_id::PeerId,
3547) -> bool {
3548 use smoldot::libp2p::multiaddr::Protocol;
3549 match addr.iter().last() {
3550 Some(Protocol::P2p(mh)) => {
3551 if mh.into_bytes() == expected_peer.as_bytes() {
3552 addr.pop();
3553 true
3554 } else {
3555 false
3556 }
3557 }
3558 _ => true,
3559 }
3560}
3561
3562#[cfg(test)]
3563mod tests {
3564 use super::pop_p2p_if_matches;
3565 use smoldot::libp2p::{multiaddr::Multiaddr, peer_id::PeerId};
3566
3567 const PEER_A: &str = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN";
3571 const PEER_B: &str = "12D3KooWQk1yQtG1YugyKjiQf6KNk8VjGGAT5xy1FWcnRKN4yXYJ";
3572
3573 fn peer(s: &str) -> PeerId {
3574 PeerId::from_bytes(bs58::decode(s).into_vec().unwrap()).unwrap()
3575 }
3576
3577 #[test]
3578 fn no_suffix_passes_through_unchanged() {
3579 let mut addr: Multiaddr = "/ip4/127.0.0.1/tcp/30333/ws".parse().unwrap();
3580 let before = addr.clone();
3581 assert!(pop_p2p_if_matches(&mut addr, &peer(PEER_A)));
3582 assert_eq!(addr, before);
3583 }
3584
3585 #[test]
3586 fn matching_suffix_is_stripped() {
3587 let mut addr: Multiaddr = format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}")
3588 .parse()
3589 .unwrap();
3590 assert!(pop_p2p_if_matches(&mut addr, &peer(PEER_A)));
3591 let expected: Multiaddr = "/ip4/127.0.0.1/tcp/30333/ws".parse().unwrap();
3592 assert_eq!(addr, expected);
3593 }
3594
3595 #[test]
3596 fn mismatched_suffix_rejects_and_keeps_addr() {
3597 let original: Multiaddr = format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}")
3598 .parse()
3599 .unwrap();
3600 let mut addr = original.clone();
3601 assert!(!pop_p2p_if_matches(&mut addr, &peer(PEER_B)));
3602 assert_eq!(addr, original);
3603 }
3604}