Skip to main content

smoldot_light/
network_service.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Background network service.
19//!
20//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21//! Importantly, its design is oriented towards the particular use case of the light client.
22//!
23//! The [`NetworkService`] spawns one background task (using [`PlatformRef::spawn_task`]) for
24//! each active connection.
25//!
//! The objective of the [`NetworkService`] in general is to try to stay connected as much as
27//! possible to the nodes of the peer-to-peer network of the chain, and maintain open substreams
28//! with them in order to send out requests (e.g. block requests) and notifications (e.g. block
29//! announces).
30//!
31//! Connectivity to the network is performed in the background as an implementation detail of
32//! the service. The public API only allows emitting requests and notifications towards the
33//! already-connected nodes.
34//!
35//! After a [`NetworkService`] is created, one can add chains using [`NetworkService::add_chain`].
36//! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically
37//! purged.
38//!
39//! An important part of the API is the list of channel receivers of [`Event`] returned by
40//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the
41//! network connectivity.
42
43use crate::{
44    log,
45    platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49    borrow::ToOwned as _,
50    boxed::Box,
51    collections::BTreeMap,
52    format,
53    string::{String, ToString as _},
54    sync::Arc,
55    vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65    header,
66    informant::{BytesDisplay, HashDisplay},
67    libp2p::{
68        connection,
69        multiaddr::{self, Multiaddr},
70        peer_id,
71    },
72    network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78    ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
/// Configuration for the Statement Store protocol.
///
/// All fields are private: the values are validated once by
/// [`StatementProtocolConfig::new`] and are afterwards only readable through the accessors.
#[derive(Debug, Clone)]
pub struct StatementProtocolConfig {
    /// Per-subscription LRU cache size used for deduplicating delivered statements.
    max_seen_statements: NonZeroUsize,
    /// Always finite and strictly between `0.0` and `1.0`.
    // NOTE(review): presumably the false-positive rate of the topic affinity filter — confirm.
    false_positive_rate: f64,
    // NOTE(review): presumably the seed of the bloom filter behind the affinity filter — confirm.
    bloom_seed: u128,
    /// Never zero.
    affinity_update_interval: Duration,
}

impl StatementProtocolConfig {
    /// Validates the parameters and bundles them into a configuration.
    ///
    /// # Panics
    ///
    /// Panics if `false_positive_rate` is not a finite value strictly between `0.0` and `1.0`,
    /// or if `affinity_update_interval` is zero.
    pub fn new(
        max_seen_statements: NonZeroUsize,
        false_positive_rate: f64,
        bloom_seed: u128,
        affinity_update_interval: Duration,
    ) -> Self {
        // The rate check comes first, so an invalid rate is reported before an invalid interval.
        assert!(
            false_positive_rate.is_finite()
                && false_positive_rate > 0.0
                && false_positive_rate < 1.0
        );
        assert!(!affinity_update_interval.is_zero());

        Self {
            max_seen_statements,
            false_positive_rate,
            bloom_seed,
            affinity_update_interval,
        }
    }

    /// Value of `max_seen_statements` passed to [`StatementProtocolConfig::new`].
    pub fn max_seen_statements(&self) -> NonZeroUsize {
        let Self {
            max_seen_statements,
            ..
        } = self;
        *max_seen_statements
    }

    /// Value of `false_positive_rate` passed to [`StatementProtocolConfig::new`].
    pub fn false_positive_rate(&self) -> f64 {
        let Self {
            false_positive_rate,
            ..
        } = self;
        *false_positive_rate
    }

    /// Value of `bloom_seed` passed to [`StatementProtocolConfig::new`].
    pub fn bloom_seed(&self) -> u128 {
        let Self { bloom_seed, .. } = self;
        *bloom_seed
    }

    /// Value of `affinity_update_interval` passed to [`StatementProtocolConfig::new`].
    pub fn affinity_update_interval(&self) -> Duration {
        let Self {
            affinity_update_interval,
            ..
        } = self;
        *affinity_update_interval
    }
}
128
129mod tasks;
130
/// Configuration for a [`NetworkService`].
///
/// Consumed by [`NetworkService::new`].
pub struct Config<TPlat> {
    /// Access to the platform's capabilities.
    pub platform: TPlat,

    /// Agent version string sent back when an identification request is received.
    pub identify_agent_version: String,

    /// Capacity to allocate for the list of chains.
    pub chains_capacity: usize,

    /// Maximum number of connections that the service can open simultaneously. Once this
    /// value has been reached, one more connection can be opened after each
    /// [`Config::connections_open_pool_restore_delay`].
    pub connections_open_pool_size: u32,

    /// Delay after which the service is allowed to open one more connection.
    ///
    /// The delay is cumulative: if, for example, no connection has been opened for twice this
    /// duration, then two connections can be opened at the same time, up to a maximum of
    /// [`Config::connections_open_pool_size`].
    pub connections_open_pool_restore_delay: Duration,
}
153
154/// See [`NetworkService::add_chain`].
155///
156/// Note that this configuration is intentionally missing a field containing the bootstrap
157/// nodes of the chain. Bootstrap nodes are supposed to be added afterwards by calling
158/// [`NetworkServiceChain::discover`].
159pub struct ConfigChain {
160    /// Name of the chain, for logging purposes.
161    pub log_name: String,
162
163    /// Number of "out slots" of this chain. We establish simultaneously gossip substreams up to
164    /// this number of peers.
165    pub num_out_slots: usize,
166
167    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
168    /// the chains match.
169    ///
170    /// > **Note**: Be aware that this *must* be the *genesis* block, not any block known to be
171    /// >           in the chain.
172    pub genesis_block_hash: [u8; 32],
173
174    /// Number and hash of the current best block. Can later be updated with
175    /// [`NetworkServiceChain::set_local_best_block`].
176    pub best_block: (u64, [u8; 32]),
177
178    /// Optional identifier to insert into the networking protocol names. Used to differentiate
179    /// between chains with the same genesis hash.
180    pub fork_id: Option<String>,
181
182    /// Number of bytes of the block number in the networking protocol.
183    pub block_number_bytes: usize,
184
185    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
186    /// number of the finalized block at the time of the initialization.
187    pub grandpa_protocol_finalized_block_height: Option<u64>,
188
189    /// If `Some`, enables the statement store protocol.
190    pub statement_protocol_config: Option<StatementProtocolConfig>,
191}
192
193pub struct NetworkService<TPlat: PlatformRef> {
194    /// Channel connected to the background service.
195    messages_tx: async_channel::Sender<ToBackground<TPlat>>,
196
197    /// See [`Config::platform`].
198    platform: TPlat,
199}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202    /// Initializes the network service with the given configuration.
203    pub fn new(config: Config<TPlat>) -> Arc<Self> {
204        let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206        let network = service::ChainNetwork::new(service::Config {
207            chains_capacity: config.chains_capacity,
208            connections_capacity: 32,
209            // Shortened from 8s: parallel dials hold slots until this fires.
210            handshake_timeout: Duration::from_secs(4),
211            randomness_seed: {
212                let mut seed = [0; 32];
213                config.platform.fill_random_bytes(&mut seed);
214                seed
215            },
216        });
217
218        // Spawn main task that processes the network service.
219        let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
220        let task = Box::pin(background_task(BackgroundTask {
221            randomness: rand_chacha::ChaCha20Rng::from_seed({
222                let mut seed = [0; 32];
223                config.platform.fill_random_bytes(&mut seed);
224                seed
225            }),
226            identify_agent_version: config.identify_agent_version,
227            tasks_messages_tx,
228            tasks_messages_rx: Box::pin(tasks_messages_rx),
229            peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
230                basic_peering_strategy::Config {
231                    randomness_seed: {
232                        let mut seed = [0; 32];
233                        config.platform.fill_random_bytes(&mut seed);
234                        seed
235                    },
236                    peers_capacity: 50, // TODO: ?
237                    chains_capacity: config.chains_capacity,
238                },
239            ),
240            bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
241                bitswap_peering_strategy::Config {
242                    randomness_seed: {
243                        let mut seed = [0; 32];
244                        config.platform.fill_random_bytes(&mut seed);
245                        seed
246                    },
247                    peers_capacity: 50, // TODO: hardcoded to the same value as `peering_strategy`.
248                },
249            ),
250            network,
251            connections_open_pool_size: config.connections_open_pool_size,
252            connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
253            num_recent_connection_opening: 0,
254            next_recent_connection_restore: None,
255            platform: config.platform.clone(),
256            open_gossip_links: BTreeMap::new(),
257            chains_ever_gossip_connected: HashSet::with_capacity_and_hasher(4, Default::default()),
258            v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
259            current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
260            event_pending_send: None,
261            event_senders: either::Left(Vec::new()),
262            pending_new_subscriptions: Vec::new(),
263            bitswap_event_pending_send: None,
264            bitswap_event_senders: either::Left(Vec::new()),
265            pending_new_bitswap_subscriptions: Vec::new(),
266            important_nodes: HashMap::with_capacity_and_hasher(16, Default::default()),
267            main_messages_rx: Box::pin(main_messages_rx),
268            messages_rx: stream::SelectAll::new(),
269            blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
270            grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271            storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272            call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
273            child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
274            chains_by_next_discovery: BTreeMap::new(),
275        }));
276
277        config.platform.spawn_task("network-service".into(), {
278            let platform = config.platform.clone();
279            async move {
280                task.await;
281                log!(&platform, Debug, "network", "shutdown");
282            }
283        });
284
285        Arc::new(NetworkService {
286            messages_tx: main_messages_tx,
287            platform: config.platform,
288        })
289    }
290
291    /// Adds a chain to the list of chains that the network service connects to.
292    ///
293    /// Returns an object representing the chain and that allows interacting with it. If all
294    /// references to [`NetworkServiceChain`] are destroyed, the network service automatically
295    /// purges that chain.
296    pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
297        let (messages_tx, messages_rx) = async_channel::bounded(32);
298
299        // TODO: this code is hacky because we don't want to make `add_chain` async at the moment, because it's not convenient for lib.rs
300        self.platform.spawn_task("add-chain-message-send".into(), {
301            let config = service::ChainConfig {
302                grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
303                    |commit_finalized_height| service::GrandpaState {
304                        commit_finalized_height,
305                        round_number: 1,
306                        set_id: 0,
307                    },
308                ),
309                enable_statement_protocol: config.statement_protocol_config.is_some(),
310                fork_id: config.fork_id.clone(),
311                block_number_bytes: config.block_number_bytes,
312                best_hash: config.best_block.1,
313                best_number: config.best_block.0,
314                genesis_hash: config.genesis_block_hash,
315                role: Role::Light,
316                allow_inbound_block_requests: false,
317                user_data: Chain {
318                    log_name: config.log_name,
319                    block_number_bytes: config.block_number_bytes,
320                    num_out_slots: config.num_out_slots,
321                    num_references: NonZero::<usize>::new(1).unwrap(),
322                    next_discovery_period: Duration::from_secs(2),
323                    next_discovery_when: self.platform.now(),
324                },
325            };
326
327            let messages_tx = self.messages_tx.clone();
328            async move {
329                let _ = messages_tx
330                    .send(ToBackground::AddChain {
331                        messages_rx,
332                        config,
333                    })
334                    .await;
335            }
336        });
337
338        Arc::new(NetworkServiceChain {
339            _keep_alive_messages_tx: self.messages_tx.clone(),
340            messages_tx,
341            marker: core::marker::PhantomData,
342        })
343    }
344}
345
346pub struct NetworkServiceChain<TPlat: PlatformRef> {
347    /// Copy of [`NetworkService::messages_tx`]. Used in order to maintain the network service
348    /// background task alive.
349    _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
350
351    /// Channel to send messages to the background task.
352    messages_tx: async_channel::Sender<ToBackgroundChain>,
353
354    /// Dummy to hold the `TPlat` type.
355    marker: core::marker::PhantomData<TPlat>,
356}
357
/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BanSeverity {
    /// The lower of the two severities.
    Low,
    /// The higher of the two severities.
    High,
}
364
365impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
366    /// Subscribes to the networking events that happen on the given chain.
367    ///
368    /// Calling this function returns a `Receiver` that receives events about the chain.
369    /// The new channel will immediately receive events about all the existing connections, so
370    /// that it is able to maintain a coherent view of the network.
371    ///
372    /// Note that this function is `async`, but it should return very quickly.
373    ///
374    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
375    /// connections will be back-pressured until the channel isn't full anymore.
376    ///
377    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
378    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
379    /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`.
380    ///
381    // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour
382    pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
383        let (tx, rx) = async_channel::bounded(128);
384
385        self.messages_tx
386            .send(ToBackgroundChain::Subscribe { sender: tx })
387            .await
388            .unwrap();
389
390        rx
391    }
392
393    /// Subscribes to the Bitswap events that happen on the network. Bitswap events subscription is
394    /// separate from other network service events, because Bitswap events are big and are not
395    /// needed by the most of subscribers.
396    ///
397    /// Note that this function is `async`, but it should return very quickly.
398    ///
399    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
400    /// connections will be back-pressured until the channel isn't full anymore.
401    ///
402    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
403    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
404    /// [`NetworkServiceChain::subscribe_bitswap`] again to obtain a new `Receiver`.
405    ///
406    // TODO: the last section of the doc seem to contradict itself.
407    pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
408        let (tx, rx) = async_channel::bounded(128);
409
410        self.messages_tx
411            .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
412            .await
413            .unwrap();
414
415        rx
416    }
417
418    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
419    /// generated. Prevents a new gossip link with the same peer from being reopened for a
420    /// little while.
421    ///
422    /// `reason` is a human-readable string printed in the logs.
423    ///
424    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
425    /// reconnection was already happening as the call to this function is still being processed.
426    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
427    /// words, this function guarantees that we will be disconnected in the future rather than
428    /// guarantees that we will disconnect.
429    pub async fn ban_and_disconnect(
430        &self,
431        peer_id: PeerId,
432        severity: BanSeverity,
433        reason: &'static str,
434    ) {
435        let _ = self
436            .messages_tx
437            .send(ToBackgroundChain::DisconnectAndBan {
438                peer_id,
439                severity,
440                reason,
441            })
442            .await;
443    }
444
445    /// Sends a blocks request to the given peer.
446    // TODO: more docs
447    pub async fn blocks_request(
448        self: Arc<Self>,
449        target: PeerId,
450        config: codec::BlocksRequestConfig,
451        timeout: Duration,
452    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
453        let (tx, rx) = oneshot::channel();
454
455        self.messages_tx
456            .send(ToBackgroundChain::StartBlocksRequest {
457                target: target.clone(),
458                config,
459                timeout,
460                result: tx,
461            })
462            .await
463            .unwrap();
464
465        rx.await.unwrap()
466    }
467
468    /// Sends a grandpa warp sync request to the given peer.
469    // TODO: more docs
470    pub async fn grandpa_warp_sync_request(
471        self: Arc<Self>,
472        target: PeerId,
473        begin_hash: [u8; 32],
474        timeout: Duration,
475    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
476        let (tx, rx) = oneshot::channel();
477
478        self.messages_tx
479            .send(ToBackgroundChain::StartWarpSyncRequest {
480                target: target.clone(),
481                begin_hash,
482                timeout,
483                result: tx,
484            })
485            .await
486            .unwrap();
487
488        rx.await.unwrap()
489    }
490
491    pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
492        self.messages_tx
493            .send(ToBackgroundChain::SetLocalBestBlock {
494                best_hash,
495                best_number,
496            })
497            .await
498            .unwrap();
499    }
500
501    pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
502        self.messages_tx
503            .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
504            .await
505            .unwrap();
506    }
507
508    /// Sends a storage proof request to the given peer.
509    // TODO: more docs
510    pub async fn storage_proof_request(
511        self: Arc<Self>,
512        target: PeerId, // TODO: takes by value because of futures longevity issue
513        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
514        timeout: Duration,
515    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
516        let (tx, rx) = oneshot::channel();
517
518        self.messages_tx
519            .send(ToBackgroundChain::StartStorageProofRequest {
520                target: target.clone(),
521                config: codec::StorageProofRequestConfig {
522                    block_hash: config.block_hash,
523                    keys: config
524                        .keys
525                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
526                        .collect::<Vec<_>>()
527                        .into_iter(),
528                },
529                timeout,
530                result: tx,
531            })
532            .await
533            .unwrap();
534
535        rx.await.unwrap()
536    }
537
538    /// Sends a call proof request to the given peer.
539    ///
540    /// See also [`NetworkServiceChain::call_proof_request`].
541    // TODO: more docs
542    pub async fn call_proof_request(
543        self: Arc<Self>,
544        target: PeerId, // TODO: takes by value because of futures longevity issue
545        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
546        timeout: Duration,
547    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
548        let (tx, rx) = oneshot::channel();
549
550        self.messages_tx
551            .send(ToBackgroundChain::StartCallProofRequest {
552                target: target.clone(),
553                config: codec::CallProofRequestConfig {
554                    block_hash: config.block_hash,
555                    method: config.method.into_owned().into(),
556                    parameter_vectored: config
557                        .parameter_vectored
558                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
559                        .collect::<Vec<_>>()
560                        .into_iter(),
561                },
562                timeout,
563                result: tx,
564            })
565            .await
566            .unwrap();
567
568        rx.await.unwrap()
569    }
570
571    /// Sends a child storage proof request to the given peer.
572    pub async fn child_storage_proof_request(
573        self: Arc<Self>,
574        target: PeerId,
575        config: codec::ChildStorageProofRequestConfig<
576            impl AsRef<[u8]> + Clone,
577            impl Iterator<Item = impl AsRef<[u8]> + Clone>,
578        >,
579        timeout: Duration,
580    ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
581        let (tx, rx) = oneshot::channel();
582
583        self.messages_tx
584            .send(ToBackgroundChain::StartChildStorageProofRequest {
585                target: target.clone(),
586                config: ChildStorageProofRequestConfigOwned {
587                    block_hash: config.block_hash,
588                    child_trie: config.child_trie.as_ref().to_vec(),
589                    keys: config
590                        .keys
591                        .map(|key| key.as_ref().to_vec())
592                        .collect::<Vec<_>>(),
593                },
594                timeout,
595                result: tx,
596            })
597            .await
598            .unwrap();
599
600        rx.await.unwrap()
601    }
602
603    /// Announces transaction to the peers we are connected to.
604    ///
605    /// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
606    /// if we didn't send the transaction to any peer.
607    ///
608    /// Note that the remote doesn't confirm that it has received the transaction. Because
609    /// networking is inherently unreliable, successfully sending a transaction to a peer doesn't
610    /// necessarily mean that the remote has received it. In practice, however, the likelihood of
611    /// a transaction not being received are extremely low. This can be considered as known flaw.
612    pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
613        let (tx, rx) = oneshot::channel();
614
615        self.messages_tx
616            .send(ToBackgroundChain::AnnounceTransaction {
617                transaction: transaction.to_vec(), // TODO: ovheread
618                result: tx,
619            })
620            .await
621            .unwrap();
622
623        rx.await.unwrap()
624    }
625
626    /// See [`service::ChainNetwork::gossip_send_block_announce`].
627    pub async fn send_block_announce(
628        self: Arc<Self>,
629        target: &PeerId,
630        scale_encoded_header: &[u8],
631        is_best: bool,
632    ) -> Result<(), QueueNotificationError> {
633        let (tx, rx) = oneshot::channel();
634
635        self.messages_tx
636            .send(ToBackgroundChain::SendBlockAnnounce {
637                target: target.clone(),                              // TODO: overhead
638                scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead
639                is_best,
640                result: tx,
641            })
642            .await
643            .unwrap();
644
645        rx.await.unwrap()
646    }
647
648    /// Send Bitswap message to the given peer.
649    pub async fn send_bitswap_message(
650        &self,
651        target: PeerId,
652        message: Vec<u8>,
653    ) -> Result<(), SendBitswapMessageError> {
654        let (tx, rx) = oneshot::channel();
655
656        self.messages_tx
657            .send(ToBackgroundChain::SendBitswapMessage {
658                target,
659                message,
660                result: tx,
661            })
662            .await
663            .unwrap();
664
665        rx.await.unwrap()
666    }
667
668    /// Broadcast Bitswap message to all [`service::ChainNetwork::established_bitswap_desired`]
669    /// peers.
670    ///
671    /// Returns the peers message was broadcast to or an error if the message cannot be sent
672    /// to at least one peer.
673    // TODO: better use a dedicated error type instead of reusing a lower-level
674    // `SendBitswapMessageErorr`.
675    pub async fn broadcast_bitswap_message(
676        &self,
677        message: Vec<u8>,
678    ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
679        let (tx, rx) = oneshot::channel();
680
681        self.messages_tx
682            .send(ToBackgroundChain::BroadcastBitswapMessage {
683                message,
684                result: tx,
685            })
686            .await
687            .unwrap();
688
689        rx.await.unwrap()
690    }
691
692    /// Broadcast a statement notification to all gossip-connected peers.
693    pub async fn broadcast_statement(
694        self: Arc<Self>,
695        statement: Vec<u8>,
696    ) -> BroadcastStatementResult {
697        let (tx, rx) = oneshot::channel();
698
699        self.messages_tx
700            .send(ToBackgroundChain::BroadcastStatement {
701                statement,
702                result: tx,
703            })
704            .await
705            .unwrap();
706
707        rx.await.unwrap()
708    }
709
710    pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
711        self.messages_tx
712            .send(ToBackgroundChain::UpdateTopicAffinity { filter })
713            .await
714            .unwrap();
715    }
716
717    /// Marks the given peers as belonging to the given chain, and adds some addresses to these
718    /// peers to the address book.
719    ///
720    /// The `important_nodes` parameter indicates whether these nodes are considered note-worthy
721    /// and should have additional logging.
722    pub async fn discover(
723        &self,
724        list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
725        important_nodes: bool,
726    ) {
727        self.messages_tx
728            .send(ToBackgroundChain::Discover {
729                // TODO: overhead
730                list: list
731                    .into_iter()
732                    .map(|(peer_id, addrs)| {
733                        (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
734                    })
735                    .collect::<Vec<_>>()
736                    .into_iter(),
737                important_nodes,
738            })
739            .await
740            .unwrap();
741    }
742
743    /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
744    /// the network.
745    ///
746    /// Nodes that are discovered might disappear over time. In other words, there is no guarantee
747    /// that a node that has been added through [`NetworkServiceChain::discover`] will later be
748    /// returned by [`NetworkServiceChain::discovered_nodes`].
749    pub async fn discovered_nodes(
750        &self,
751    ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
752        let (tx, rx) = oneshot::channel();
753
754        self.messages_tx
755            .send(ToBackgroundChain::DiscoveredNodes { result: tx })
756            .await
757            .unwrap();
758
759        rx.await
760            .unwrap()
761            .into_iter()
762            .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
763    }
764
765    /// Returns an iterator to the list of [`PeerId`]s that we have an established connection
766    /// with.
767    pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
768        let (tx, rx) = oneshot::channel();
769        self.messages_tx
770            .send(ToBackgroundChain::PeersList { result: tx })
771            .await
772            .unwrap();
773        rx.await.unwrap().into_iter()
774    }
775}
776
/// Outcome of [`NetworkServiceChain::broadcast_statement`].
///
/// Plain counters; two results are equal when both counters are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BroadcastStatementResult {
    // NOTE(review): presumably the number of peers the statement was actually queued for —
    // confirm against the background task.
    pub sent: usize,
    // NOTE(review): presumably the number of peers the broadcast was attempted on — confirm
    // against the background task.
    pub total: usize,
}
782
783/// Event that can happen on the network service.
784#[derive(Debug, Clone)]
785pub enum Event {
786    Connected {
787        peer_id: PeerId,
788        role: Role,
789        best_block_number: u64,
790        best_block_hash: [u8; 32],
791    },
792    Disconnected {
793        peer_id: PeerId,
794    },
795    BlockAnnounce {
796        peer_id: PeerId,
797        announce: service::EncodedBlockAnnounce,
798    },
799    GrandpaNeighborPacket {
800        peer_id: PeerId,
801        finalized_block_height: u64,
802    },
803    /// Received a GrandPa commit message from the network.
804    GrandpaCommitMessage {
805        peer_id: PeerId,
806        message: service::EncodedGrandpaCommitMessage,
807    },
808    /// Received a statement notification from the network.
809    StatementsNotification {
810        peer_id: PeerId,
811        statements: Vec<([u8; 32], codec::Statement)>,
812    },
813}
814
815/// Bitswap event that can be generated by the network service. Because Bitswap messages are big
816/// (up to 2 MiB) and can be delivered at high rate, we use a dedicated subscriber to not copy them
817/// to all network service subscribers.
818#[derive(Debug, Clone)]
819pub enum BitswapEvent {
820    BitswapMessage {
821        peer_id: PeerId,
822        message: service::EncodedBitswapMessage,
823    },
824}
825
826/// Error returned by [`NetworkServiceChain::blocks_request`].
827#[derive(Debug, derive_more::Display, derive_more::Error)]
828pub enum BlocksRequestError {
829    /// No established connection with the target.
830    NoConnection,
831    /// Error during the request.
832    #[display("{_0}")]
833    Request(service::BlocksRequestError),
834}
835
836/// Error returned by [`NetworkServiceChain::grandpa_warp_sync_request`].
837#[derive(Debug, derive_more::Display, derive_more::Error)]
838pub enum WarpSyncRequestError {
839    /// No established connection with the target.
840    NoConnection,
841    /// Error during the request.
842    #[display("{_0}")]
843    Request(service::GrandpaWarpSyncRequestError),
844}
845
846/// Error returned by [`NetworkServiceChain::storage_proof_request`].
847#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
848pub enum StorageProofRequestError {
849    /// No established connection with the target.
850    NoConnection,
851    /// Storage proof request is too large and can't be sent.
852    RequestTooLarge,
853    /// Error during the request.
854    #[display("{_0}")]
855    Request(service::StorageProofRequestError),
856}
857
858/// Error returned by [`NetworkServiceChain::call_proof_request`].
859#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
860pub enum CallProofRequestError {
861    /// No established connection with the target.
862    NoConnection,
863    /// Call proof request is too large and can't be sent.
864    RequestTooLarge,
865    /// Error during the request.
866    #[display("{_0}")]
867    Request(service::CallProofRequestError),
868}
869
870impl CallProofRequestError {
871    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
872    /// issue.
873    pub fn is_network_problem(&self) -> bool {
874        match self {
875            CallProofRequestError::Request(err) => err.is_network_problem(),
876            CallProofRequestError::RequestTooLarge => false,
877            CallProofRequestError::NoConnection => true,
878        }
879    }
880}
881
882/// Error returned by [`NetworkServiceChain::child_storage_proof_request`].
883#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
884pub enum ChildStorageProofRequestError {
885    /// No established connection with the target.
886    NoConnection,
887    /// Child storage proof request is too large and can't be sent.
888    RequestTooLarge,
889    /// Error during the request.
890    #[display("{_0}")]
891    Request(service::StorageProofRequestError),
892}
893
894impl ChildStorageProofRequestError {
895    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
896    /// issue.
897    pub fn is_network_problem(&self) -> bool {
898        match self {
899            ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
900            ChildStorageProofRequestError::RequestTooLarge => false,
901            ChildStorageProofRequestError::NoConnection => true,
902        }
903    }
904}
905
/// Owned version of [`codec::ChildStorageProofRequestConfig`] for sending across channel.
///
/// Every field owns its data, so a value of this type can be moved into a
/// `ToBackgroundChain::StartChildStorageProofRequest` message.
struct ChildStorageProofRequestConfigOwned {
    /// 32-byte block hash (presumably the block whose storage is being queried — confirm).
    block_hash: [u8; 32],
    /// Bytes identifying the child trie (exact encoding to be confirmed against the codec).
    child_trie: Vec<u8>,
    /// List of keys, each an arbitrary byte string (presumably the keys to prove — confirm).
    keys: Vec<Vec<u8>>,
}
912
913enum ToBackground<TPlat: PlatformRef> {
914    AddChain {
915        messages_rx: async_channel::Receiver<ToBackgroundChain>,
916        config: service::ChainConfig<Chain<TPlat>>,
917    },
918}
919
920enum ToBackgroundChain {
921    RemoveChain,
922    Subscribe {
923        sender: async_channel::Sender<Event>,
924    },
925    SubscribeBitswap {
926        sender: async_channel::Sender<BitswapEvent>,
927    },
928    DisconnectAndBan {
929        peer_id: PeerId,
930        severity: BanSeverity,
931        reason: &'static str,
932    },
933    // TODO: serialize the request before sending over channel
934    StartBlocksRequest {
935        target: PeerId, // TODO: takes by value because of future longevity issue
936        config: codec::BlocksRequestConfig,
937        timeout: Duration,
938        result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
939    },
940    // TODO: serialize the request before sending over channel
941    StartWarpSyncRequest {
942        target: PeerId,
943        begin_hash: [u8; 32],
944        timeout: Duration,
945        result:
946            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
947    },
948    // TODO: serialize the request before sending over channel
949    StartStorageProofRequest {
950        target: PeerId,
951        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
952        timeout: Duration,
953        result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
954    },
955    // TODO: serialize the request before sending over channel
956    StartCallProofRequest {
957        target: PeerId, // TODO: takes by value because of futures longevity issue
958        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
959        timeout: Duration,
960        result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
961    },
962    // TODO: serialize the request before sending over channel
963    StartChildStorageProofRequest {
964        target: PeerId,
965        config: ChildStorageProofRequestConfigOwned,
966        timeout: Duration,
967        result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
968    },
969    SetLocalBestBlock {
970        best_hash: [u8; 32],
971        best_number: u64,
972    },
973    SetLocalGrandpaState {
974        grandpa_state: service::GrandpaState,
975    },
976    AnnounceTransaction {
977        transaction: Vec<u8>,
978        result: oneshot::Sender<Vec<PeerId>>,
979    },
980    SendBlockAnnounce {
981        target: PeerId,
982        scale_encoded_header: Vec<u8>,
983        is_best: bool,
984        result: oneshot::Sender<Result<(), QueueNotificationError>>,
985    },
986    SendBitswapMessage {
987        target: PeerId,
988        message: Vec<u8>,
989        result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
990    },
991    BroadcastBitswapMessage {
992        message: Vec<u8>,
993        result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
994    },
995    BroadcastStatement {
996        statement: Vec<u8>,
997        result: oneshot::Sender<BroadcastStatementResult>,
998    },
999    UpdateTopicAffinity {
1000        filter: AffinityFilter,
1001    },
1002    Discover {
1003        list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
1004        important_nodes: bool,
1005    },
1006    DiscoveredNodes {
1007        result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
1008    },
1009    PeersList {
1010        result: oneshot::Sender<Vec<PeerId>>,
1011    },
1012}
1013
1014struct BackgroundTask<TPlat: PlatformRef> {
1015    /// See [`Config::platform`].
1016    platform: TPlat,
1017
1018    /// Random number generator.
1019    randomness: rand_chacha::ChaCha20Rng,
1020
1021    /// Value provided through [`Config::identify_agent_version`].
1022    identify_agent_version: String,
1023
1024    /// Channel to send messages to the background task.
1025    tasks_messages_tx:
1026        async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
1027
1028    /// Channel to receive messages destined to the background task.
1029    tasks_messages_rx: Pin<
1030        Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
1031    >,
1032
1033    /// Data structure holding the entire state of the networking.
1034    network: service::ChainNetwork<
1035        Chain<TPlat>,
1036        async_channel::Sender<service::CoordinatorToConnection>,
1037        TPlat::Instant,
1038    >,
1039
1040    /// All known peers and their addresses.
1041    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
1042
1043    /// Bitswap slot assignment strategy.
1044    bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,
1045
1046    /// See [`Config::connections_open_pool_size`].
1047    connections_open_pool_size: u32,
1048
1049    /// See [`Config::connections_open_pool_restore_delay`].
1050    connections_open_pool_restore_delay: Duration,
1051
1052    /// Every time a connection is opened, the value in this field is increased by one. After
1053    /// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
1054    /// one.
1055    num_recent_connection_opening: u32,
1056
1057    /// Delay after which [`BackgroundTask::num_recent_connection_opening`] is increased by one.
1058    next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
1059
1060    /// List of all open gossip links.
1061    // TODO: using this data structure unfortunately means that PeerIds are cloned a lot, maybe some user data in ChainNetwork is better? not sure
1062    open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
1063
1064    /// Chains for which a gossip link has been opened at least once. Used to prefer bootnodes for
1065    /// out slots only until the chain first connects.
1066    chains_ever_gossip_connected: HashSet<ChainId, fnv::FnvBuildHasher>,
1067
1068    /// Connected peers using statement protocol V2, per chain.
1069    v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1070
1071    /// Current topic affinity filter per chain, sent to V2 peers on connect.
1072    current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,
1073
1074    /// Important nodes per chain (in practice the bootnodes; see [`NetworkServiceChain::discover`]).
1075    /// They get extra logging, and slot preference until the chain first connects.
1076    // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
1077    important_nodes: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1078
1079    /// Event about to be sent on the senders of [`BackgroundTask::event_senders`].
1080    event_pending_send: Option<(ChainId, Event)>,
1081
1082    /// Bitswap event about to be sent on the senders of [`BackgroundTask::bitswap_event_senders`].
1083    bitswap_event_pending_send: Option<BitswapEvent>,
1084
1085    /// Sending events through the public API.
1086    ///
1087    /// Contains either senders, or a `Future` that is currently sending an event and will yield
1088    /// the senders back once it is finished.
1089    // TODO: sort by ChainId instead of using a Vec?
1090    event_senders: either::Either<
1091        Vec<(ChainId, async_channel::Sender<Event>)>,
1092        Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
1093    >,
1094
1095    /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list.
1096    /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders.
1097    pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
1098
1099    /// Sending Bitswap events through the public API. We use separate channels for Bitswap events,
1100    /// because Bitswap messages are big and only few of event subscribers are interested in them.
1101    ///
1102    /// Contains either senders, or a `Future` that is currently sending an event and will yield
1103    /// the senders back once it is finished.
1104    ///
1105    /// Note that compared to `event_senders`, `bitswap_event_senders` are not associated with
1106    /// chains, because Bitswap messages coming from the network do not have the information about
1107    /// what chain they are coming from.
1108    bitswap_event_senders: either::Either<
1109        Vec<async_channel::Sender<BitswapEvent>>,
1110        Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
1111    >,
1112
1113    /// Whenever [`NetworkServiceChain::subscribe_bitswap`] is called, the new sender is added to
1114    /// this list. Once [`BackgroundTask::bitswap_event_senders`] is ready, we properly initialize
1115    /// these senders.
1116    pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,
1117
1118    main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
1119
1120    messages_rx:
1121        stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
1122
1123    blocks_requests: HashMap<
1124        service::SubstreamId,
1125        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
1126        fnv::FnvBuildHasher,
1127    >,
1128
1129    grandpa_warp_sync_requests: HashMap<
1130        service::SubstreamId,
1131        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
1132        fnv::FnvBuildHasher,
1133    >,
1134
1135    storage_proof_requests: HashMap<
1136        service::SubstreamId,
1137        oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
1138        fnv::FnvBuildHasher,
1139    >,
1140
1141    call_proof_requests: HashMap<
1142        service::SubstreamId,
1143        oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
1144        fnv::FnvBuildHasher,
1145    >,
1146
1147    child_storage_proof_requests: HashMap<
1148        service::SubstreamId,
1149        oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
1150        fnv::FnvBuildHasher,
1151    >,
1152
1153    /// All chains, indexed by the value of [`Chain::next_discovery_when`].
1154    chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
1155}
1156
1157struct Chain<TPlat: PlatformRef> {
1158    log_name: String,
1159
1160    // TODO: this field is a hack due to the fact that `add_chain` can't be `async`; should eventually be fixed after a lib.rs refactor
1161    num_references: NonZero<usize>,
1162
1163    /// See [`ConfigChain::block_number_bytes`].
1164    // TODO: redundant with ChainNetwork? since we might not need to know this in the future i'm reluctant to add a getter to ChainNetwork
1165    block_number_bytes: usize,
1166
1167    /// See [`ConfigChain::num_out_slots`].
1168    num_out_slots: usize,
1169
1170    /// When the next discovery should be started for this chain.
1171    next_discovery_when: TPlat::Instant,
1172
1173    /// After [`Chain::next_discovery_when`] is reached, the following discovery happens after
1174    /// the given duration.
1175    next_discovery_period: Duration,
1176}
1177
1178#[derive(Clone)]
1179struct OpenGossipLinkState {
1180    role: Role,
1181    best_block_number: u64,
1182    best_block_hash: [u8; 32],
1183    /// `None` if unknown.
1184    finalized_block_height: Option<u64>,
1185}
1186
1187async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1188    loop {
1189        // Yield at every loop in order to provide better tasks granularity.
1190        futures_lite::future::yield_now().await;
1191
1192        enum WakeUpReason<TPlat: PlatformRef> {
1193            ForegroundClosed,
1194            Message(ToBackground<TPlat>),
1195            MessageForChain(ChainId, ToBackgroundChain),
1196            NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1197            CanAssignSlot(PeerId, ChainId),
1198            CanAssignBitswapSlot(PeerId),
1199            NextRecentConnectionRestore,
1200            CanStartConnect(PeerId),
1201            CanOpenGossip(PeerId, ChainId),
1202            CanOpenBitswap(PeerId),
1203            MessageFromConnection {
1204                connection_id: service::ConnectionId,
1205                message: service::ConnectionToCoordinator,
1206            },
1207            MessageToConnection {
1208                connection_id: service::ConnectionId,
1209                message: service::CoordinatorToConnection,
1210            },
1211            EventSendersReady,
1212            BitswapEventSendersReady,
1213            StartDiscovery(ChainId),
1214        }
1215
1216        let wake_up_reason = {
1217            let message_received = async {
1218                task.main_messages_rx
1219                    .next()
1220                    .await
1221                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1222            };
1223            let message_for_chain_received = async {
1224                // Note that when the last entry of `messages_rx` yields `None`, `messages_rx`
1225                // itself will yield `None`. For this reason, we can't use
1226                // `task.messages_rx.is_empty()` to determine whether `messages_rx` will
1227                // yield `None`.
1228                let Some((chain_id, message)) = task.messages_rx.next().await else {
1229                    future::pending().await
1230                };
1231                WakeUpReason::MessageForChain(chain_id, message)
1232            };
1233            let message_from_task_received = async {
1234                let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1235                WakeUpReason::MessageFromConnection {
1236                    connection_id,
1237                    message,
1238                }
1239            };
1240            let service_event = async {
1241                if let Some(event) = (task.event_pending_send.is_none()
1242                    && task.bitswap_event_pending_send.is_none()
1243                    && task.pending_new_subscriptions.is_empty()
1244                    && task.pending_new_bitswap_subscriptions.is_empty())
1245                .then(|| task.network.next_event())
1246                .flatten()
1247                {
1248                    WakeUpReason::NetworkEvent(event)
1249                } else if let Some(start_connect) = {
1250                    let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1251                        .then(|| {
1252                            task.network
1253                                .unconnected_desired()
1254                                .choose(&mut task.randomness)
1255                                .cloned()
1256                        })
1257                        .flatten();
1258                    x
1259                } {
1260                    WakeUpReason::CanStartConnect(start_connect)
1261                } else if let Some((peer_id, chain_id)) = {
1262                    let x = task
1263                        .network
1264                        .connected_unopened_gossip_desired()
1265                        .choose(&mut task.randomness)
1266                        .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1267                    x
1268                } {
1269                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
1270                } else if let Some(peer_id) = {
1271                    let x = task
1272                        .network
1273                        .connected_unopened_bitswap_desired()
1274                        .choose(&mut task.randomness)
1275                        .cloned();
1276                    x
1277                } {
1278                    WakeUpReason::CanOpenBitswap(peer_id)
1279                } else if let Some((connection_id, message)) =
1280                    task.network.pull_message_to_connection()
1281                {
1282                    WakeUpReason::MessageToConnection {
1283                        connection_id,
1284                        message,
1285                    }
1286                } else {
1287                    'search: loop {
1288                        let mut earlier_unban = None;
1289
1290                        for chain_id in task.network.chains().collect::<Vec<_>>() {
1291                            if task.network.gossip_desired_num(
1292                                chain_id,
1293                                service::GossipKind::ConsensusTransactions,
1294                            ) >= task.network[chain_id].num_out_slots
1295                            {
1296                                continue;
1297                            }
1298
1299                            let now = task.platform.now();
1300
1301                            // Until the chain first connects, prefer slots for important nodes
1302                            // (the bootnodes); otherwise use the general pool.
1303                            if !task.chains_ever_gossip_connected.contains(&chain_id) {
1304                                if let basic_peering_strategy::AssignablePeer::Assignable(peer_id) =
1305                                    task.peering_strategy.pick_assignable_peer_filtered(
1306                                        &chain_id,
1307                                        &now,
1308                                        |peer_id| {
1309                                            task.important_nodes
1310                                                .get(&chain_id)
1311                                                .map_or(false, |nodes| nodes.contains(peer_id))
1312                                        },
1313                                    )
1314                                {
1315                                    break 'search WakeUpReason::CanAssignSlot(
1316                                        peer_id.clone(),
1317                                        chain_id,
1318                                    );
1319                                }
1320                            }
1321
1322                            match task.peering_strategy.pick_assignable_peer(&chain_id, &now) {
1323                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1324                                    break 'search WakeUpReason::CanAssignSlot(
1325                                        peer_id.clone(),
1326                                        chain_id,
1327                                    );
1328                                }
1329                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
1330                                    next_unban,
1331                                } => {
1332                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1333                                        earlier_unban = Some(next_unban.clone());
1334                                    }
1335                                }
1336                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
1337                            }
1338                        }
1339
1340                        match task
1341                            .bitswap_peering_strategy
1342                            .pick_assignable_peer(&task.platform.now())
1343                        {
1344                            bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1345                                break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1346                            }
1347                            bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1348                                next_unban,
1349                            } => {
1350                                if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1351                                    earlier_unban = Some(next_unban.clone());
1352                                }
1353                            }
1354                            bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1355                        }
1356
1357                        if let Some(earlier_unban) = earlier_unban {
1358                            task.platform.sleep_until(earlier_unban).await;
1359                        } else {
1360                            future::pending::<()>().await;
1361                        }
1362                    }
1363                }
1364            };
1365            let next_recent_connection_restore = async {
1366                if task.num_recent_connection_opening != 0
1367                    && task.next_recent_connection_restore.is_none()
1368                {
1369                    task.next_recent_connection_restore = Some(Box::pin(
1370                        task.platform
1371                            .sleep(task.connections_open_pool_restore_delay),
1372                    ));
1373                }
1374                if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1375                    delay.await;
1376                    task.next_recent_connection_restore = None;
1377                    WakeUpReason::NextRecentConnectionRestore
1378                } else {
1379                    future::pending().await
1380                }
1381            };
1382            let finished_sending_event = async {
1383                if let either::Right(event_sending_future) = &mut task.event_senders {
1384                    let event_senders = event_sending_future.await;
1385                    task.event_senders = either::Left(event_senders);
1386                    WakeUpReason::EventSendersReady
1387                } else if task.event_pending_send.is_some()
1388                    || !task.pending_new_subscriptions.is_empty()
1389                {
1390                    WakeUpReason::EventSendersReady
1391                } else {
1392                    future::pending().await
1393                }
1394            };
1395            let finished_sending_bitswap_event = async {
1396                if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1397                {
1398                    let bitswap_event_senders = bitswap_event_sending_future.await;
1399                    task.bitswap_event_senders = either::Left(bitswap_event_senders);
1400                    WakeUpReason::BitswapEventSendersReady
1401                } else if task.bitswap_event_pending_send.is_some()
1402                    || !task.pending_new_bitswap_subscriptions.is_empty()
1403                {
1404                    WakeUpReason::BitswapEventSendersReady
1405                } else {
1406                    future::pending().await
1407                }
1408            };
1409            let start_discovery = async {
1410                let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1411                    future::pending().await
1412                };
1413                next_discovery.get_mut().await;
1414                let ((_, chain_id), _) = next_discovery.remove_entry();
1415                WakeUpReason::StartDiscovery(chain_id)
1416            };
1417
1418            message_for_chain_received
1419                .or(message_received)
1420                .or(message_from_task_received)
1421                .or(service_event)
1422                .or(next_recent_connection_restore)
1423                .or(finished_sending_event)
1424                .or(finished_sending_bitswap_event)
1425                .or(start_discovery)
1426                .await
1427        };
1428
1429        match wake_up_reason {
1430            WakeUpReason::ForegroundClosed => {
1431                // End the task.
1432                return;
1433            }
1434            WakeUpReason::Message(ToBackground::AddChain {
1435                messages_rx,
1436                config,
1437            }) => {
1438                // TODO: this is not a completely clean way of handling duplicate chains, because the existing chain might have a different best block and role and all ; also, multiple sync services will call set_best_block and set_finalized_block
1439                let chain_id = match task.network.add_chain(config) {
1440                    Ok(id) => id,
1441                    Err(service::AddChainError::Duplicate { existing_identical }) => {
1442                        task.network[existing_identical].num_references = task.network
1443                            [existing_identical]
1444                            .num_references
1445                            .checked_add(1)
1446                            .unwrap();
1447                        existing_identical
1448                    }
1449                };
1450
1451                task.chains_by_next_discovery.insert(
1452                    (task.network[chain_id].next_discovery_when.clone(), chain_id),
1453                    Box::pin(
1454                        task.platform
1455                            .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1456                    ),
1457                );
1458
1459                task.messages_rx
1460                    .push(Box::pin(
1461                        messages_rx
1462                            .map(move |msg| (chain_id, msg))
1463                            .chain(stream::once(future::ready((
1464                                chain_id,
1465                                ToBackgroundChain::RemoveChain,
1466                            )))),
1467                    ) as Pin<Box<_>>);
1468
1469                log!(
1470                    &task.platform,
1471                    Debug,
1472                    "network",
1473                    "chain-added",
1474                    id = task.network[chain_id].log_name
1475                );
1476            }
1477            WakeUpReason::EventSendersReady => {
1478                // Dispatch the pending event, if any, to the various senders.
                    // If there is no pending event but subscriptions are waiting in
                    // `pending_new_subscriptions`, these are processed instead. In both cases
                    // `task.event_senders` is switched to its `Right` variant: a future that owns
                    // the list of senders while it runs and yields the list back when it finishes.
1479
1480                // We made sure that the senders were ready before generating an event.
1481                let either::Left(event_senders) = &mut task.event_senders else {
1482                    unreachable!()
1483                };
1484
1485                if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1486                    task.event_pending_send.take()
1487                {
1488                    let mut event_senders = mem::take(event_senders);
1489                    task.event_senders = either::Right(Box::pin(async move {
1490                        // Elements in `event_senders` are removed one by one and inserted
1491                        // back if the channel is still open.
                            // Going through the indices in reverse keeps this correct:
                            // `swap_remove` only touches `index` and the last position, and `push`
                            // appends, so positions up to `index` still hold unvisited elements.
1492                        for index in (0..event_senders.len()).rev() {
1493                            let (event_sender_chain_id, event_sender) =
1494                                event_senders.swap_remove(index);
                                // Only the subscribers of the chain the event belongs to receive
                                // it. A sender whose `send` fails is dropped rather than put back.
1495                            if event_sender_chain_id == event_to_dispatch_chain_id {
1496                                if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1497                                    continue;
1498                                }
1499                            }
1500                            event_senders.push((event_sender_chain_id, event_sender));
1501                        }
1502                        event_senders
1503                    }));
1504                } else if !task.pending_new_subscriptions.is_empty() {
                        // No event to dispatch: process the subscriptions that are waiting.
                        // Each new subscriber is first sent one `Event::Connected` per gossip link
                        // currently open on its chain, followed by an
                        // `Event::GrandpaNeighborPacket` when a finalized block height is known
                        // for that link. Only then does it join `event_senders`. Errors returned
                        // by `send` are ignored here (`let _ =`).
1505                    let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1506                    let mut event_senders = mem::take(event_senders);
1507                    // TODO: cloning :-/
1508                    let open_gossip_links = task.open_gossip_links.clone();
1509                    task.event_senders = either::Right(Box::pin(async move {
1510                        for (chain_id, new_subscription) in pending_new_subscriptions {
1511                            for ((link_chain_id, peer_id), state) in &open_gossip_links {
1512                                // TODO: optimize? this is O(n) by chain
1513                                if *link_chain_id != chain_id {
1514                                    continue;
1515                                }
1516
1517                                let _ = new_subscription
1518                                    .send(Event::Connected {
1519                                        peer_id: peer_id.clone(),
1520                                        role: state.role,
1521                                        best_block_number: state.best_block_number,
1522                                        best_block_hash: state.best_block_hash,
1523                                    })
1524                                    .await;
1525
1526                                if let Some(finalized_block_height) = state.finalized_block_height {
1527                                    let _ = new_subscription
1528                                        .send(Event::GrandpaNeighborPacket {
1529                                            peer_id: peer_id.clone(),
1530                                            finalized_block_height,
1531                                        })
1532                                        .await;
1533                                }
1534                            }
1535
1536                            event_senders.push((chain_id, new_subscription));
1537                        }
1538
1539                        event_senders
1540                    }));
1541                }
1542            }
1543            WakeUpReason::BitswapEventSendersReady => {
                    // Send the pending Bitswap event, if any, to every Bitswap subscriber;
                    // otherwise register the subscriptions waiting in
                    // `pending_new_bitswap_subscriptions`.
                    //
1544                // We made sure that the senders were ready before generating an event.
1545                let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1546                    unreachable!()
1547                };
1548
1549                if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1550                    let mut bitswap_event_senders = mem::take(bitswap_event_senders);
                        // While the future below runs it owns the list of senders; the list is the
                        // future's output.
1551                    task.bitswap_event_senders = either::Right(Box::pin(async move {
1552                        // Elements in `bitswap_event_senders` are removed one by one and
1553                        // inserted back if the channel is still open.
                            // Unlike regular events there is no per-chain filtering: every sender
                            // receives the event. A sender whose `send` fails is dropped.
1554                        for index in (0..bitswap_event_senders.len()).rev() {
1555                            let event_sender = bitswap_event_senders.swap_remove(index);
1556                            if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1557                                continue;
1558                            }
1559                            bitswap_event_senders.push(event_sender);
1560                        }
1561                        bitswap_event_senders
1562                    }));
1563                } else if !task.pending_new_bitswap_subscriptions.is_empty() {
                        // New Bitswap subscribers are appended directly to the list of senders;
                        // nothing is sent to them at registration time.
1564                    bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1565                }
1566            }
1567            WakeUpReason::MessageFromConnection {
1568                connection_id,
1569                message,
1570            } => {
                    // Forward the message, unchanged, to `task.network`, tagged with the id of
                    // the connection it relates to.
1571                task.network
1572                    .inject_connection_message(connection_id, message);
1573            }
1574            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
                    // Drop one reference to the chain. `NonZero::new` returns `None` when the
                    // decremented count is zero, i.e. when this was the last reference. As long as
                    // other references remain, only the counter is updated and the rest of the
                    // arm is skipped.
1575                if let Some(new_ref) =
1576                    NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1577                {
1578                    task.network[chain_id].num_references = new_ref;
1579                    continue;
1580                }
1581
                    // Last reference gone: close every open gossip link of the chain and forget it
                    // in `open_gossip_links`. The peers are collected into a `Vec` first, which
                    // releases the iterator's borrow of `task.network` before `gossip_close` is
                    // called (presumably required because `gossip_close` needs mutable access).
1582                for peer_id in task
1583                    .network
1584                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1585                    .cloned()
1586                    .collect::<Vec<_>>()
1587                {
1588                    task.network
1589                        .gossip_close(
1590                            chain_id,
1591                            &peer_id,
1592                            service::GossipKind::ConsensusTransactions,
1593                        )
1594                        .unwrap();
1595
1596                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1597                    debug_assert!(_was_in.is_some());
1598                }
1599
                    // Remove the chain's entry from `chains_by_next_discovery`, whose keys are
                    // `(next_discovery_when, chain_id)` pairs.
1600                let _was_in = task
1601                    .chains_by_next_discovery
1602                    .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1603                debug_assert!(_was_in.is_some());
1604
1605                log!(
1606                    &task.platform,
1607                    Debug,
1608                    "network",
1609                    "chain-removed",
1610                    id = task.network[chain_id].log_name
1611                );
                    // Purge the remaining per-chain state. The log line above reads
                    // `task.network[chain_id]`, so it has to come before `remove_chain`.
1612                task.v2_statement_peers.remove(&chain_id);
1613                task.current_affinity_filter.remove(&chain_id);
1614                task.important_nodes.remove(&chain_id);
1615                task.chains_ever_gossip_connected.remove(&chain_id);
1616                task.network.remove_chain(chain_id).unwrap();
1617                task.peering_strategy.remove_chain_peers(&chain_id);
1618            }
1619            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
                    // Only queue the subscription here. The `EventSendersReady` arm later takes it
                    // out of `pending_new_subscriptions` and adds it to the list of senders.
1620                task.pending_new_subscriptions.push((chain_id, sender));
1621            }
1622            WakeUpReason::MessageForChain(
1623                _chain_id,
1624                ToBackgroundChain::SubscribeBitswap { sender },
1625            ) => {
                    // Only queue the subscription here. The `BitswapEventSendersReady` arm later
                    // moves it into the list of Bitswap senders. The chain id is not recorded:
                    // Bitswap subscriptions are stored without any chain association.
1626                task.pending_new_bitswap_subscriptions.push(sender);
1627            }
1628            WakeUpReason::MessageForChain(
1629                chain_id,
1630                ToBackgroundChain::DisconnectAndBan {
1631                    peer_id,
1632                    severity,
1633                    reason,
1634                },
1635            ) => {
                    // Ban `peer_id` on this chain for a duration that depends on `severity`
                    // (10 seconds for `Low`, 40 seconds for `High`), then close its gossip link if
                    // one is open and queue the corresponding `Disconnected` event.
1636                let ban_duration = Duration::from_secs(match severity {
1637                    BanSeverity::Low => 10,
1638                    BanSeverity::High => 40,
1639                });
1640
                    // The ban lasts until `now + ban_duration`. `had_slot` is `true` only if the
                    // peering strategy reports `Banned { had_slot: true }`; every other outcome
                    // counts as `false`.
1641                let had_slot = matches!(
1642                    task.peering_strategy.unassign_slot_and_ban(
1643                        &chain_id,
1644                        &peer_id,
1645                        task.platform.now() + ban_duration,
1646                    ),
1647                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1648                );
1649
                    // A peer that had a slot is also removed from the desired gossip peers.
1650                if had_slot {
1651                    log!(
1652                        &task.platform,
1653                        Debug,
1654                        "network",
1655                        "slot-unassigned",
1656                        chain = &task.network[chain_id].log_name,
1657                        peer_id,
1658                        ?ban_duration,
1659                        reason = "user-ban",
1660                        user_reason = reason
1661                    );
1662                    task.network.gossip_remove_desired(
1663                        chain_id,
1664                        &peer_id,
1665                        service::GossipKind::ConsensusTransactions,
1666                    );
1667                }
1668
                    // Closing the link doesn't depend on `had_slot`, only on whether a gossip link
                    // with the peer is currently connected.
1669                if task.network.gossip_is_connected(
1670                    chain_id,
1671                    &peer_id,
1672                    service::GossipKind::ConsensusTransactions,
1673                ) {
1674                    let _closed_result = task.network.gossip_close(
1675                        chain_id,
1676                        &peer_id,
1677                        service::GossipKind::ConsensusTransactions,
1678                    );
1679                    debug_assert!(_closed_result.is_ok());
1680
1681                    log!(
1682                        &task.platform,
1683                        Debug,
1684                        "network",
1685                        "gossip-closed",
1686                        chain = &task.network[chain_id].log_name,
1687                        peer_id,
1688                    );
1689
                        // Keep the local bookkeeping in sync with the closed link.
1690                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1691                    debug_assert!(_was_in.is_some());
1692
1693                    if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1694                        peers.remove(&peer_id);
1695                    }
1696
                        // `event_pending_send` holds at most one event; it is expected to be empty
                        // at this point, which the assertion checks in debug builds.
1697                    debug_assert!(task.event_pending_send.is_none());
1698                    task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1699                }
1700            }
1701            WakeUpReason::MessageForChain(
1702                chain_id,
1703                ToBackgroundChain::StartBlocksRequest {
1704                    target,
1705                    config,
1706                    timeout,
1707                    result,
1708                },
1709            ) => {
                    // Log the request. The two branches only differ in how the starting point of
                    // the request (a block hash or a block number) is displayed.
1710                match &config.start {
1711                    codec::BlocksRequestConfigStart::Hash(hash) => {
1712                        log!(
1713                            &task.platform,
1714                            Debug,
1715                            "network",
1716                            "blocks-request-started",
1717                            chain = task.network[chain_id].log_name, target,
1718                            start = HashDisplay(hash),
1719                            num = config.desired_count.get(),
1720                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1721                            header = ?config.fields.header, body = ?config.fields.body,
1722                            justifications = ?config.fields.justifications
1723                        );
1724                    }
1725                    codec::BlocksRequestConfigStart::Number(number) => {
1726                        log!(
1727                            &task.platform,
1728                            Debug,
1729                            "network",
1730                            "blocks-request-started",
1731                            chain = task.network[chain_id].log_name, target, start = number,
1732                            num = config.desired_count.get(),
1733                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1734                            header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1735                        );
1736                    }
1737                }
1738
                    // Start the request towards `target`. On success, `result` is stored under the
                    // substream id in `task.blocks_requests` (presumably to be completed once the
                    // response arrives; that code is not part of this arm). If there is no
                    // connection, the error is logged and reported through `result` right away.
1739                match task
1740                    .network
1741                    .start_blocks_request(&target, chain_id, config.clone(), timeout)
1742                {
1743                    Ok(substream_id) => {
1744                        task.blocks_requests.insert(substream_id, result);
1745                    }
1746                    Err(service::StartRequestError::NoConnection) => {
1747                        log!(
1748                            &task.platform,
1749                            Debug,
1750                            "network",
1751                            "blocks-request-error",
1752                            chain = task.network[chain_id].log_name,
1753                            target,
1754                            error = "NoConnection"
1755                        );
1756                        let _ = result.send(Err(BlocksRequestError::NoConnection));
1757                    }
1758                }
1759            }
1760            WakeUpReason::MessageForChain(
1761                chain_id,
1762                ToBackgroundChain::StartWarpSyncRequest {
1763                    target,
1764                    begin_hash,
1765                    timeout,
1766                    result,
1767                },
1768            ) => {
                    // Log, then start a GrandPa warp sync request towards `target`, starting at
                    // `begin_hash`.
1769                log!(
1770                    &task.platform,
1771                    Debug,
1772                    "network",
1773                    "warp-sync-request-started",
1774                    chain = task.network[chain_id].log_name,
1775                    target,
1776                    start = HashDisplay(&begin_hash)
1777                );
1778
                    // On success, `result` is stored under the substream id in
                    // `task.grandpa_warp_sync_requests` (presumably to be completed once the
                    // response arrives; that code is not part of this arm). If there is no
                    // connection, the error is logged and reported through `result` right away.
1779                match task
1780                    .network
1781                    .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1782                {
1783                    Ok(substream_id) => {
1784                        task.grandpa_warp_sync_requests.insert(substream_id, result);
1785                    }
1786                    Err(service::StartRequestError::NoConnection) => {
1787                        log!(
1788                            &task.platform,
1789                            Debug,
1790                            "network",
1791                            "warp-sync-request-error",
1792                            chain = task.network[chain_id].log_name,
1793                            target,
1794                            error = "NoConnection"
1795                        );
1796                        let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1797                    }
1798                }
1799            }
1800            WakeUpReason::MessageForChain(
1801                chain_id,
1802                ToBackgroundChain::StartStorageProofRequest {
1803                    target,
1804                    config,
1805                    timeout,
1806                    result,
1807                },
1808            ) => {
                    // Log, then start a storage proof request towards `target` for the block
                    // designated by `config.block_hash`.
1809                log!(
1810                    &task.platform,
1811                    Debug,
1812                    "network",
1813                    "storage-proof-request-started",
1814                    chain = task.network[chain_id].log_name,
1815                    target,
1816                    block_hash = HashDisplay(&config.block_hash)
1817                );
1818
                    // On success, `result` is stored under the substream id in
                    // `task.storage_proof_requests` (presumably to be completed once the response
                    // arrives; that code is not part of this arm). Both possible start errors
                    // (`NoConnection` and `RequestTooLarge`) are logged and reported through
                    // `result` right away.
1819                match task.network.start_storage_proof_request(
1820                    &target,
1821                    chain_id,
1822                    config.clone(),
1823                    timeout,
1824                ) {
1825                    Ok(substream_id) => {
1826                        task.storage_proof_requests.insert(substream_id, result);
1827                    }
1828                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1829                        log!(
1830                            &task.platform,
1831                            Debug,
1832                            "network",
1833                            "storage-proof-request-error",
1834                            chain = task.network[chain_id].log_name,
1835                            target,
1836                            error = "NoConnection"
1837                        );
1838                        let _ = result.send(Err(StorageProofRequestError::NoConnection));
1839                    }
1840                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1841                        log!(
1842                            &task.platform,
1843                            Debug,
1844                            "network",
1845                            "storage-proof-request-error",
1846                            chain = task.network[chain_id].log_name,
1847                            target,
1848                            error = "RequestTooLarge"
1849                        );
1850                        let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1851                    }
1852                };
1853            }
1854            WakeUpReason::MessageForChain(
1855                chain_id,
1856                ToBackgroundChain::StartCallProofRequest {
1857                    target,
1858                    config,
1859                    timeout,
1860                    result,
1861                },
1862            ) => {
                    // Log, then start a call proof request towards `target` for the function
                    // `config.method` at the block designated by `config.block_hash`.
1863                log!(
1864                    &task.platform,
1865                    Debug,
1866                    "network",
1867                    "call-proof-request-started",
1868                    chain = task.network[chain_id].log_name,
1869                    target,
1870                    block_hash = HashDisplay(&config.block_hash),
1871                    function = config.method
1872                );
1873                // TODO: log parameter
1874
                    // On success, `result` is stored under the substream id in
                    // `task.call_proof_requests` (presumably to be completed once the response
                    // arrives; that code is not part of this arm). Both possible start errors
                    // (`NoConnection` and `RequestTooLarge`) are logged and reported through
                    // `result` right away.
1875                match task.network.start_call_proof_request(
1876                    &target,
1877                    chain_id,
1878                    config.clone(),
1879                    timeout,
1880                ) {
1881                    Ok(substream_id) => {
1882                        task.call_proof_requests.insert(substream_id, result);
1883                    }
1884                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1885                        log!(
1886                            &task.platform,
1887                            Debug,
1888                            "network",
1889                            "call-proof-request-error",
1890                            chain = task.network[chain_id].log_name,
1891                            target,
1892                            error = "NoConnection"
1893                        );
1894                        let _ = result.send(Err(CallProofRequestError::NoConnection));
1895                    }
1896                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1897                        log!(
1898                            &task.platform,
1899                            Debug,
1900                            "network",
1901                            "call-proof-request-error",
1902                            chain = task.network[chain_id].log_name,
1903                            target,
1904                            error = "RequestTooLarge"
1905                        );
1906                        let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1907                    }
1908                };
1909            }
1910            WakeUpReason::MessageForChain(
1911                chain_id,
1912                ToBackgroundChain::StartChildStorageProofRequest {
1913                    target,
1914                    config,
1915                    timeout,
1916                    result,
1917                },
1918            ) => {
                    // Log, then start a child storage proof request towards `target` for the block
                    // designated by `config.block_hash`.
1919                log!(
1920                    &task.platform,
1921                    Debug,
1922                    "network",
1923                    "child-storage-proof-request-started",
1924                    chain = task.network[chain_id].log_name,
1925                    target,
1926                    block_hash = HashDisplay(&config.block_hash)
1927                );
1928
                    // The request is built as a `codec::ChildStorageProofRequestConfig` that
                    // borrows the child trie and the keys from the `config` received in the
                    // message.
                    //
                    // On success, `result` is stored under the substream id in
                    // `task.child_storage_proof_requests` (presumably to be completed once the
                    // response arrives; that code is not part of this arm). Both possible start
                    // errors (`NoConnection` and `RequestTooLarge`) are logged and reported
                    // through `result` right away.
1929                match task.network.start_child_storage_proof_request(
1930                    &target,
1931                    chain_id,
1932                    codec::ChildStorageProofRequestConfig {
1933                        block_hash: config.block_hash,
1934                        child_trie: &config.child_trie,
1935                        keys: config.keys.iter().map(|k| k.as_slice()),
1936                    },
1937                    timeout,
1938                ) {
1939                    Ok(substream_id) => {
1940                        task.child_storage_proof_requests
1941                            .insert(substream_id, result);
1942                    }
1943                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1944                        log!(
1945                            &task.platform,
1946                            Debug,
1947                            "network",
1948                            "child-storage-proof-request-error",
1949                            chain = task.network[chain_id].log_name,
1950                            target,
1951                            error = "NoConnection"
1952                        );
1953                        let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1954                    }
1955                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1956                        log!(
1957                            &task.platform,
1958                            Debug,
1959                            "network",
1960                            "child-storage-proof-request-error",
1961                            chain = task.network[chain_id].log_name,
1962                            target,
1963                            error = "RequestTooLarge"
1964                        );
1965                        let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1966                    }
1967                };
1968            }
1969            WakeUpReason::MessageForChain(
1970                chain_id,
1971                ToBackgroundChain::SetLocalBestBlock {
1972                    best_hash,
1973                    best_number,
1974                },
1975            ) => {
                    // Pass the new local best block (hash and number) of this chain on to
                    // `task.network`. Nothing is logged and nothing is reported back.
1976                task.network
1977                    .set_chain_local_best_block(chain_id, best_hash, best_number);
1978            }
1979            WakeUpReason::MessageForChain(
1980                chain_id,
1981                ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1982            ) => {
                    // Log the new local GrandPa state (set id and finalized height of the commit),
                    // then hand it to `gossip_broadcast_grandpa_state_and_update` for this chain.
1983                log!(
1984                    &task.platform,
1985                    Debug,
1986                    "network",
1987                    "local-grandpa-state-announced",
1988                    chain = task.network[chain_id].log_name,
1989                    set_id = grandpa_state.set_id,
1990                    commit_finalized_height = grandpa_state.commit_finalized_height,
1991                );
1992
1993                // TODO: log the list of peers we sent the packet to
1994
1995                task.network
1996                    .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1997            }
1998            WakeUpReason::MessageForChain(
1999                chain_id,
2000                ToBackgroundChain::AnnounceTransaction {
2001                    transaction,
2002                    result,
2003                },
2004            ) => {
2005                // TODO: keep track of which peer knows about which transaction, and don't send it again
2006
                    // Take a snapshot of the peers that currently have a gossip link open on this
                    // chain, then try to send the transaction to each of them.
2007                let peers_to_send = task
2008                    .network
2009                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2010                    .cloned()
2011                    .collect::<Vec<_>>();
2012
                    // `peers_sent` and `peers_queue_full` hold base58 peer ids and are only used
                    // for the log line below.
2013                let mut peers_sent = Vec::with_capacity(peers_to_send.len());
2014                let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
2015                for peer in &peers_to_send {
2016                    match task
2017                        .network
2018                        .gossip_send_transaction(peer, chain_id, &transaction)
2019                    {
2020                        Ok(()) => peers_sent.push(peer.to_base58()),
2021                        Err(QueueNotificationError::QueueFull) => {
2022                            peers_queue_full.push(peer.to_base58())
2023                        }
                            // The peers were obtained from `gossip_connected_peers` just above, so
                            // `NoConnection` is treated as impossible.
2024                        Err(QueueNotificationError::NoConnection) => unreachable!(),
2025                    }
2026                }
2027
                    // The transaction is identified in the logs by the hexadecimal encoding of its
                    // 32-byte BLAKE2b hash.
2028                log!(
2029                    &task.platform,
2030                    Debug,
2031                    "network",
2032                    "transaction-announced",
2033                    chain = task.network[chain_id].log_name,
2034                    transaction =
2035                        hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2036                    size = transaction.len(),
2037                    peers_sent = peers_sent.join(", "),
2038                    peers_queue_full = peers_queue_full.join(", "),
2039                );
2040
                    // NOTE(review): the reply contains every peer of the snapshot, including the
                    // ones whose queue was full - confirm that callers expect this.
2041                let _ = result.send(peers_to_send);
2042            }
2043            WakeUpReason::MessageForChain(
2044                chain_id,
2045                ToBackgroundChain::SendBlockAnnounce {
2046                    target,
2047                    scale_encoded_header,
2048                    is_best,
2049                    result,
2050                },
2051            ) => {
                    // Send a block announce to `target` and forward the outcome of
                    // `gossip_send_block_announce` as-is through `result`. A failure to deliver
                    // that outcome is ignored (`let _ =`).
2052                // TODO: log who the announce was sent to
2053                let _ = result.send(task.network.gossip_send_block_announce(
2054                    &target,
2055                    chain_id,
2056                    &scale_encoded_header,
2057                    is_best,
2058                ));
2059            }
2060            WakeUpReason::MessageForChain(
2061                _chain_id,
2062                ToBackgroundChain::SendBitswapMessage {
2063                    target,
2064                    message,
2065                    result,
2066                },
2067            ) => {
                    // Send the Bitswap message to `target` and forward the outcome of
                    // `bitswap_send_message` as-is through `result`. The chain id plays no role
                    // here, and a failure to deliver the outcome is ignored (`let _ =`).
2068                let _ = result.send(task.network.bitswap_send_message(&target, message));
2069            }
2070            WakeUpReason::MessageForChain(
2071                _chain_id,
2072                ToBackgroundChain::BroadcastBitswapMessage { message, result },
2073            ) => {
                    // Send `message` to every peer yielded by `established_bitswap_desired` and
                    // reply with the peers for which sending succeeded, or with a single error
                    // if it succeeded for none of them. The chain id plays no role here.
2074                let peers = task
2075                    .network
2076                    .established_bitswap_desired()
2077                    .cloned()
2078                    .collect::<Vec<_>>();
2079                let results = peers
2080                    .iter()
2081                    .map(|peer| {
2082                        (
2083                            peer,
2084                            task.network.bitswap_send_message(peer, message.clone()),
2085                        )
2086                    })
2087                    .collect::<Vec<_>>(); // we must collect first to send all messages
2088
2089                let succeeded_peers = results
2090                    .iter()
2091                    .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2092                    .collect::<Vec<_>>();
2093
                    // At least one success is reported as `Ok`, even if sending failed for some
                    // of the other peers.
2094                // TODO: introspecting a third-party error type below doesn't seem good.
2095                let r = if !succeeded_peers.is_empty() {
2096                    Ok(succeeded_peers)
2097                } else if results
2098                    .iter()
2099                    .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2100                {
2101                    // `QueueFull` has higher priority than `NoConnection` for possible
2102                    // back-pressure in higher level code.
2103                    Err(SendBitswapMessageError::QueueFull)
2104                } else {
2105                    // This is only emitted if all peers fail with `NoConnection` or there is no
2106                    // peers at all.
2107                    Err(SendBitswapMessageError::NoConnection)
2108                };
2109
2110                let _ = result.send(r);
2111            }
2112            WakeUpReason::MessageForChain(
2113                chain_id,
2114                ToBackgroundChain::BroadcastStatement { statement, result },
2115            ) => {
                    // Send the statement to every peer that currently has a gossip link open on
                    // this chain, and report how many sends succeeded out of how many peers.
2116                let peers_to_send = task
2117                    .network
2118                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2119                    .cloned()
2120                    .collect::<Vec<_>>();
2121
                    // `total` is the number of peers tried; `sent` counts the successful sends.
                    // Errors are not told apart: any failure simply isn't counted.
2122                let total = peers_to_send.len();
2123                let mut sent = 0;
2124                for peer in &peers_to_send {
2125                    if task
2126                        .network
2127                        .gossip_send_statement(peer, chain_id, statement.clone())
2128                        .is_ok()
2129                    {
2130                        sent += 1;
2131                    }
2132                }
2133
2134                log!(
2135                    &task.platform,
2136                    Debug,
2137                    "network",
2138                    "statement-broadcast",
2139                    chain = task.network[chain_id].log_name,
2140                    sent,
2141                    total,
2142                );
2143
2144                let _ = result.send(BroadcastStatementResult { sent, total });
2145            }
2146            WakeUpReason::MessageForChain(
2147                chain_id,
2148                ToBackgroundChain::UpdateTopicAffinity { filter },
2149            ) => {
                    // Remember the filter as the current one for this chain, then send it to every
                    // peer tracked in `v2_statement_peers` for the chain (if there is such an
                    // entry).
2150                task.current_affinity_filter
2151                    .insert(chain_id, filter.clone());
2152                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
                        // Peers to forget are collected first and removed afterwards, as `peers`
                        // can't be modified while it is being iterated.
2153                    let mut to_remove = Vec::new();
2154                    for peer_id in peers.iter() {
                            // Only `NoConnection` and `ProtocolV1` cause the peer to be dropped
                            // from the set; any other outcome leaves it in place.
2155                        if let Err(
2156                            SendTopicAffinityError::NoConnection
2157                            | SendTopicAffinityError::ProtocolV1,
2158                        ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2159                        {
2160                            to_remove.push(peer_id.clone());
2161                        }
2162                    }
2163                    for peer_id in &to_remove {
2164                        peers.remove(peer_id);
2165                    }
2166                }
2167            }
2168            WakeUpReason::MessageForChain(
2169                chain_id,
2170                ToBackgroundChain::Discover {
2171                    list,
2172                    important_nodes,
2173                },
2174            ) => {
                    // Register the given peers and their addresses with the peering strategy.
                    // When `important_nodes` is set, every peer of the list is additionally
                    // recorded in `task.important_nodes` for this chain.
                    //
                    // NOTE(review): the meaning of the literals `30` and `10` below isn't visible
                    // from here (the TODOs ask for named constants) - check the documentation of
                    // `insert_chain_peer` and `insert_address`.
2175                for (peer_id, addrs) in list {
2176                    if important_nodes {
2177                        task.important_nodes
2178                            .entry(chain_id)
2179                            .or_default()
2180                            .insert(peer_id.clone());
2181                    }
2182
2183                    // Note that we must call this function before `insert_address`, as documented
2184                    // in `basic_peering_strategy`.
2185                    task.peering_strategy
2186                        .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
2187
                        // Addresses are stored as raw bytes. The value returned by
                        // `insert_address` is deliberately ignored.
2188                    for addr in addrs {
2189                        let _ =
2190                            task.peering_strategy
2191                                .insert_address(&peer_id, addr.into_bytes(), 10);
2192                        // TODO: constant
2193                    }
2194                }
2195            }
2196            WakeUpReason::MessageForChain(
2197                chain_id,
2198                ToBackgroundChain::DiscoveredNodes { result },
2199            ) => {
                    // Answers the request by sending, through `result`, every peer returned by
                    // the peering strategy for `chain_id` together with that peer's addresses.
                    // A failure to send the answer is ignored.
2200                // TODO: consider returning Vec<u8>s for the addresses?
2201                let _ = result.send(
2202                    task.peering_strategy
2203                        .chain_peers_unordered(&chain_id)
2204                        .map(|peer_id| {
2205                            let addrs = task
2206                                .peering_strategy
2207                                .peer_addresses(peer_id)
                                    // NOTE(review): panics if a stored address doesn't parse as a
                                    // `Multiaddr`; presumably addresses are validated before being
                                    // stored -- confirm.
2208                                .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2209                                .collect::<Vec<_>>();
2210                            (peer_id.clone(), addrs)
2211                        })
2212                        .collect::<Vec<_>>(),
2213                );
2214            }
2215            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
                    // Replies through `result` with a cloned collection of the peers returned by
                    // `gossip_connected_peers` for `chain_id` and the `ConsensusTransactions`
                    // gossip kind. A failure to send the reply is ignored.
2216                let _ = result.send(
2217                    task.network
2218                        .gossip_connected_peers(
2219                            chain_id,
2220                            service::GossipKind::ConsensusTransactions,
2221                        )
2222                        .cloned()
2223                        .collect(),
2224                );
2225            }
2226            WakeUpReason::StartDiscovery(chain_id) => {
                    // Runs one discovery round for `chain_id`: schedules the next round, then
                    // starts a Kademlia "find node" request for a random peer id towards one
                    // gossip-connected peer, if there is any.
2227                // Re-insert the chain in `chains_by_next_discovery`.
2228                let chain = &mut task.network[chain_id];
2229                chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
                    // The period doubles after every round, capped at 120 seconds.
2230                chain.next_discovery_period =
2231                    cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
                    // NOTE(review): the key uses `next_discovery_when`, computed from the period
                    // before doubling, while the sleep uses the already-updated period -- confirm
                    // that this difference is intended.
2232                task.chains_by_next_discovery.insert(
2233                    (chain.next_discovery_when.clone(), chain_id),
2234                    Box::pin(
2235                        task.platform
2236                            .sleep(task.network[chain_id].next_discovery_period),
2237                    ),
2238                );
2239
                    // Build a peer id from 32 random bytes interpreted as an Ed25519 public key.
2240                let random_peer_id = {
2241                    let mut pub_key = [0; 32];
2242                    rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
2243                    PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
2244                };
2245
                    // The request target is simply the first peer yielded by
                    // `gossip_connected_peers`.
2246                // TODO: select target closest to the random peer instead
2247                let target = task
2248                    .network
2249                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2250                    .next()
2251                    .cloned();
2252
2253                if let Some(target) = target {
                        // NOTE(review): the 20 seconds value is presumably the request timeout --
                        // confirm against `start_kademlia_find_node_request`.
2254                    match task.network.start_kademlia_find_node_request(
2255                        &target,
2256                        chain_id,
2257                        &random_peer_id,
2258                        Duration::from_secs(20),
2259                    ) {
2260                        Ok(_) => {}
                            // `target` was just taken from the list of gossip-connected peers, so
                            // a missing connection is treated as impossible.
2261                        Err(service::StartRequestError::NoConnection) => unreachable!(),
2262                    };
2263
2264                    log!(
2265                        &task.platform,
2266                        Debug,
2267                        "network",
2268                        "discovery-find-node-started",
2269                        chain = &task.network[chain_id].log_name,
2270                        request_target = target,
2271                        requested_peer_id = random_peer_id
2272                    );
2273                } else {
                        // No connected peer to query: the round is skipped, but the next round
                        // has already been scheduled above.
2274                    log!(
2275                        &task.platform,
2276                        Debug,
2277                        "network",
2278                        "discovery-skipped-no-peer",
2279                        chain = &task.network[chain_id].log_name
2280                    );
2281                }
2282            }
2283            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2284                peer_id,
2285                expected_peer_id,
2286                id,
2287            }) => {
                    // A connection finished its handshake. If the remote turned out to be a
                    // different peer than the expected one, the address bookkeeping in the
                    // peering strategy is moved from the expected peer to the actual peer.
2288                let remote_addr =
2289                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                    // The `filter` keeps `expected_peer_id` only when it differs from `peer_id`.
2290                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2291                {
2292                    log!(
2293                        &task.platform,
2294                        Debug,
2295                        "network",
2296                        "handshake-finished-peer-id-mismatch",
2297                        remote_addr,
2298                        expected_peer_id,
2299                        actual_peer_id = peer_id
2300                    );
2301
                        // Release the connection counted against the expected peer's address...
2302                    let _was_in = task
2303                        .peering_strategy
2304                        .decrease_address_connections_and_remove_if_zero(
2305                            expected_peer_id,
2306                            remote_addr.as_ref(),
2307                        );
2308                    debug_assert!(_was_in.is_ok());
                        // ...and count it against the actual peer instead. The result is ignored.
2309                    let _ = task.peering_strategy.increase_address_connections(
2310                        &peer_id,
2311                        remote_addr.into_bytes().to_vec(),
2312                        10,
2313                    );
2314                } else {
2315                    log!(
2316                        &task.platform,
2317                        Debug,
2318                        "network",
2319                        "handshake-finished",
2320                        remote_addr,
2321                        peer_id
2322                    );
2323                }
2324
                    // In both cases, the bitswap peering strategy is told about one more
                    // connection with the actual peer.
2325                task.bitswap_peering_strategy
2326                    .increase_peer_connections(&peer_id);
2327            }
2328            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2329                expected_peer_id: Some(_),
2330                ..
2331            })
2332            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
                    // Shared handling for a connection that shut down, either before its handshake
                    // finished (`PreHandshakeDisconnected`) or after (`Disconnected`).
2333                let (address, peer_id, handshake_finished) = match wake_up_reason {
2334                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2335                        address,
2336                        expected_peer_id: Some(peer_id),
2337                        ..
2338                    }) => (address, peer_id, false),
2339                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
2340                        address,
2341                        peer_id,
2342                        ..
2343                    }) => (address, peer_id, true),
                        // The enclosing arm only matches the two variants handled above.
2344                    _ => unreachable!(),
2345                };
2346
2347                task.peering_strategy
2348                    .decrease_address_connections(&peer_id, &address)
2349                    .unwrap();
2350                let address = Multiaddr::from_bytes(address).unwrap();
2351                log!(
2352                    &task.platform,
2353                    Debug,
2354                    "network",
2355                    "connection-shutdown",
2356                    peer_id,
2357                    address,
2358                    ?handshake_finished
2359                );
2360
2361                // Ban the peer in order to avoid trying over and over again the same address(es).
2362                // Even if the handshake was finished, it is possible that the peer simply shuts
2363                // down connections immediately after it has been opened, hence the ban.
2364                // Due to race conditions and peerid mismatches, it is possible that there is
2365                // another existing connection or connection attempt with that same peer. However,
2366                // it is not possible to be sure that we will reach 0 connections or connection
2367                // attempts, and thus we ban the peer every time.
2368                // Pre-handshake failures get a shorter ban: many parallel dials time out
2369                // before any handshake completes, and a long slot-hold there dominates
2370                // peer-discovery latency on restarts.
                    // The reason reported in the "slot-unassigned" log is chosen alongside the
                    // ban duration so that it reflects whether the handshake had finished.
2371                let (ban_duration, unassign_reason) = if handshake_finished {
2372                    (Duration::from_secs(5), "disconnect")
2373                } else {
2374                    (Duration::from_secs(2), "pre-handshake-disconnect")
2375                };
2376                task.network.gossip_remove_desired_all(
2377                    &peer_id,
2378                    service::GossipKind::ConsensusTransactions,
2379                );
2380                for (&chain_id, what_happened) in task
2381                    .peering_strategy
2382                    .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2383                {
                        // Only chains on which the peer actually held a slot are logged.
2384                    if matches!(
2385                        what_happened,
2386                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2387                    ) {
2388                        log!(
2389                            &task.platform,
2390                            Debug,
2391                            "network",
2392                            "slot-unassigned",
2393                            chain = &task.network[chain_id].log_name,
2394                            peer_id,
2395                            ?ban_duration,
2396                            // "disconnect" after a finished handshake, otherwise pre-handshake.
2397                            reason = unassign_reason
2398                        );
2399                    }
2400                }
2401
                    // Bitswap bookkeeping is only touched when the handshake had finished.
2402                if handshake_finished {
2403                    task.network.bitswap_remove_desired(&peer_id);
2404                    let what_happened = task
2405                        .bitswap_peering_strategy
2406                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2407                    if matches!(
2408                        what_happened,
2409                        bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2410                    ) {
2411                        log!(
2412                            &task.platform,
2413                            Debug,
2414                            "network",
2415                            "bitswap-slot-unassigned",
2416                            peer_id,
2417                            ?ban_duration,
2418                            reason = "disconnect",
2419                        );
2420                    }
                        // The result of decreasing the connection count is ignored.
2421                    let _ = task
2422                        .bitswap_peering_strategy
2423                        .decrease_peer_connections(&peer_id);
2424                }
2425            }
2426            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2427                expected_peer_id: None,
2428                ..
2429            }) => {
2430                // This path can't be reached as we always set an expected peer id when creating
2431                // a connection.
                    // The check below is a `debug_assert!`, so the event is silently ignored in
                    // builds where debug assertions are disabled.
2432                debug_assert!(false);
2433            }
2434            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
2435                id,
2436                peer_id,
2437                ping_time,
2438            }) => {
                    // Purely informational: logs the peer, its remote address and the ping time.
2439                let remote_addr =
2440                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
2441                log!(
2442                    &task.platform,
2443                    Debug,
2444                    "network",
2445                    "pong",
2446                    peer_id,
2447                    remote_addr,
2448                    ?ping_time
2449                );
2450            }
2451            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2452                chain_id,
2453                peer_id,
2454                announce,
2455            }) => {
                    // Logs the announce, updates the locally-tracked best block of the peer when
                    // the announce is flagged as best, then queues the event for the chain.
2456                log!(
2457                    &task.platform,
2458                    Debug,
2459                    "network",
2460                    "block-announce-received",
2461                    chain = &task.network[chain_id].log_name,
2462                    peer_id,
2463                    block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2464                        announce.decode().scale_encoded_header
2465                    )),
2466                    is_best = announce.decode().is_best
2467                );
2468
2469                let decoded_announce = announce.decode();
2470                if decoded_announce.is_best {
                        // NOTE(review): the `unwrap` assumes an entry exists in
                        // `open_gossip_links` for this `(chain_id, peer_id)` whenever an announce
                        // is received -- confirm.
2471                    let link = task
2472                        .open_gossip_links
2473                        .get_mut(&(chain_id, peer_id.clone()))
2474                        .unwrap();
                        // If the header fails to decode, the stored best block is left untouched.
2475                    if let Ok(decoded) = header::decode(
2476                        decoded_announce.scale_encoded_header,
2477                        task.network[chain_id].block_number_bytes,
2478                    ) {
2479                        link.best_block_hash = header::hash_from_scale_encoded_header(
2480                            decoded_announce.scale_encoded_header,
2481                        );
2482                        link.best_block_number = decoded.number;
2483                    }
2484                }
2485
                    // The announce is forwarded regardless of `is_best` and of whether the header
                    // could be decoded.
2486                debug_assert!(task.event_pending_send.is_none());
2487                task.event_pending_send =
2488                    Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2489            }
2490            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2491                peer_id,
2492                chain_id,
2493                role,
2494                best_number,
2495                best_hash,
2496                kind: service::GossipKind::ConsensusTransactions,
2497            }) => {
                    // A gossip link of kind `ConsensusTransactions` has been opened: record its
                    // state locally and queue an `Event::Connected` for the chain.
2498                log!(
2499                    &task.platform,
2500                    Debug,
2501                    "network",
2502                    "gossip-open-success",
2503                    chain = &task.network[chain_id].log_name,
2504                    peer_id,
2505                    best_number,
2506                    best_hash = HashDisplay(&best_hash)
2507                );
2508
                    // The link starts with the best block reported in the event and no known
                    // finalized block height.
2509                let _prev_value = task.open_gossip_links.insert(
2510                    (chain_id, peer_id.clone()),
2511                    OpenGossipLinkState {
2512                        best_block_number: best_number,
2513                        best_block_hash: best_hash,
2514                        role,
2515                        finalized_block_height: None,
2516                    },
2517                );
                    // There must not already be a link for this `(chain_id, peer_id)` pair.
2518                debug_assert!(_prev_value.is_none());
2519
2520                task.chains_ever_gossip_connected.insert(chain_id);
2521
2522                debug_assert!(task.event_pending_send.is_none());
2523                task.event_pending_send = Some((
2524                    chain_id,
2525                    Event::Connected {
2526                        peer_id,
2527                        role,
2528                        best_block_number: best_number,
2529                        best_block_hash: best_hash,
2530                    },
2531                ));
2532            }
2533            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2534                peer_id,
2535                chain_id,
2536                error,
2537                kind: service::GossipKind::ConsensusTransactions,
2538            }) => {
                    // Opening a `ConsensusTransactions` gossip link failed. Depending on the
                    // error, the peer is either removed from the chain or temporarily banned.
2539                log!(
2540                    &task.platform,
2541                    Debug,
2542                    "network",
2543                    "gossip-open-error",
2544                    chain = &task.network[chain_id].log_name,
2545                    peer_id,
2546                    ?error,
2547                );
2548                // Must exceed polkadot-sdk's 5s notification-reject ban; otherwise we retry
2549                // into a still-active remote ban. 0.5s margin covers network delay and
2550                // clock skew between the two sides' ban timers.
2551                let ban_duration = Duration::from_millis(5500);
2552
2553                // Note that peer doesn't necessarily have an out slot, as this event might happen
2554                // as a result of an inbound gossip connection.
                    // On a genesis mismatch the peer is removed from the chain altogether instead
                    // of being banned; any other error leads to a ban of `ban_duration`.
2555                let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2556                    matches!(
2557                        task.peering_strategy
2558                            .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2559                        basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2560                    )
2561                } else {
2562                    matches!(
2563                        task.peering_strategy.unassign_slot_and_ban(
2564                            &chain_id,
2565                            &peer_id,
2566                            task.platform.now() + ban_duration,
2567                        ),
2568                        basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2569                    )
2570                };
2571
2572                if had_slot {
                        // NOTE(review): `ban_duration` is logged here even on the genesis-mismatch
                        // path, where the peer was removed rather than banned.
2573                    log!(
2574                        &task.platform,
2575                        Debug,
2576                        "network",
2577                        "slot-unassigned",
2578                        chain = &task.network[chain_id].log_name,
2579                        peer_id,
2580                        ?ban_duration,
2581                        reason = "gossip-open-failed"
2582                    );
2583                    task.network.gossip_remove_desired(
2584                        chain_id,
2585                        &peer_id,
2586                        service::GossipKind::ConsensusTransactions,
2587                    );
2588                }
2589            }
2590            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2591                peer_id,
2592                chain_id,
2593                kind: service::GossipKind::ConsensusTransactions,
2594            }) => {
                    // A `ConsensusTransactions` gossip link was closed: forget its local state,
                    // ban the peer on this chain for 10 seconds, and queue an
                    // `Event::Disconnected` for the chain.
2595                log!(
2596                    &task.platform,
2597                    Debug,
2598                    "network",
2599                    "gossip-closed",
2600                    chain = &task.network[chain_id].log_name,
2601                    peer_id,
2602                );
2603                let ban_duration = Duration::from_secs(10);
2604
                    // A link must have been recorded for this `(chain_id, peer_id)` pair.
2605                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2606                debug_assert!(_was_in.is_some());
2607
2608                // Note that peer doesn't necessarily have an out slot, as this event might happen
2609                // as a result of an inbound gossip connection.
2610                if matches!(
2611                    task.peering_strategy.unassign_slot_and_ban(
2612                        &chain_id,
2613                        &peer_id,
2614                        task.platform.now() + ban_duration,
2615                    ),
2616                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2617                ) {
2618                    log!(
2619                        &task.platform,
2620                        Debug,
2621                        "network",
2622                        "slot-unassigned",
2623                        chain = &task.network[chain_id].log_name,
2624                        peer_id,
2625                        ?ban_duration,
2626                        reason = "gossip-closed"
2627                    );
2628                    task.network.gossip_remove_desired(
2629                        chain_id,
2630                        &peer_id,
2631                        service::GossipKind::ConsensusTransactions,
2632                    );
2633                }
2634
                    // Also drop the peer from the per-chain set in `v2_statement_peers`, if any.
2635                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2636                    peers.remove(&peer_id);
2637                }
2638
2639                debug_assert!(task.event_pending_send.is_none());
2640                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2641            }
2642            WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
                    // Nothing to update here: the event is only logged.
2643                log!(
2644                    &task.platform,
2645                    Debug,
2646                    "network",
2647                    "bitswap-open-success",
2648                    peer_id
2649                );
2650            }
2651            WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
                    // Opening the bitswap substream failed: ban the peer in the bitswap peering
                    // strategy and, if it held a slot, stop desiring a bitswap link with it.
2652                log!(
2653                    &task.platform,
2654                    Debug,
2655                    "network",
2656                    "bitswap-open-error",
2657                    peer_id,
2658                    ?error
2659                );
                    // Peers for which `is_protocol_not_available()` is true get a much longer ban
                    // (600 seconds) than peers failing for another reason (15 seconds).
2660                let ban_duration = if error.is_protocol_not_available() {
2661                    Duration::from_secs(600)
2662                } else {
2663                    Duration::from_secs(15)
2664                };
2665                if matches!(
2666                    task.bitswap_peering_strategy
2667                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2668                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2669                ) {
2670                    log!(
2671                        &task.platform,
2672                        Debug,
2673                        "network",
2674                        "bitswap-slot-unassigned",
2675                        peer_id,
2676                        ?ban_duration,
2677                        reason = "bitswap-open-failed"
2678                    );
2679                    task.network.bitswap_remove_desired(&peer_id);
2680                }
2681            }
2682            WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
                    // Logs the reception and stores the message in `bitswap_event_pending_send`,
                    // which is expected to be empty at this point.
2683                log!(
2684                    &task.platform,
2685                    Debug,
2686                    "network",
2687                    "bitswap-message-received",
2688                    peer_id
2689                );
2690                debug_assert!(task.bitswap_event_pending_send.is_none());
2691                task.bitswap_event_pending_send =
2692                    Some(BitswapEvent::BitswapMessage { peer_id, message });
2693            }
2694            WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
                    // The bitswap substream was closed: ban the peer for 10 seconds in the bitswap
                    // peering strategy and, if it held a slot, stop desiring a bitswap link.
2695                log!(&task.platform, Debug, "network", "bitswap-closed", peer_id);
2696                let ban_duration = Duration::from_secs(10);
2697                if matches!(
2698                    task.bitswap_peering_strategy
2699                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2700                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2701                ) {
2702                    log!(
2703                        &task.platform,
2704                        Debug,
2705                        "network",
2706                        "bitswap-slot-unassigned",
2707                        peer_id,
2708                        ?ban_duration,
2709                        reason = "bitswap-closed"
2710                    );
2711                    task.network.bitswap_remove_desired(&peer_id);
2712                }
2713            }
2714            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2715                substream_id,
2716                peer_id,
2717                chain_id,
2718                response: service::RequestResult::Blocks(response),
2719            }) => {
                    // A blocks request finished: log the outcome, then hand the response to the
                    // sender registered in `task.blocks_requests` for this substream.
2720                match &response {
2721                    Ok(blocks) => {
2722                        log!(
2723                            &task.platform,
2724                            Debug,
2725                            "network",
2726                            "blocks-request-success",
2727                            chain = task.network[chain_id].log_name,
2728                            target = peer_id,
2729                            num_blocks = blocks.len(),
                                // Sum of the header, body and justification lengths of every
                                // block of the response.
2730                            block_data_total_size =
2731                                BytesDisplay(blocks.iter().fold(0, |sum, block| {
2732                                    let block_size = block.header.as_ref().map_or(0, |h| h.len())
2733                                        + block
2734                                            .body
2735                                            .as_ref()
2736                                            .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2737                                        + block
2738                                            .justifications
2739                                            .as_ref()
2740                                            .into_iter()
2741                                            .flat_map(|l| l.iter())
2742                                            .fold(0, |s, j| s + j.justification.len());
2743                                    sum + u64::try_from(block_size).unwrap()
2744                                }))
2745                        );
2746                    }
2747                    Err(error) => {
2748                        log!(
2749                            &task.platform,
2750                            Debug,
2751                            "network",
2752                            "blocks-request-error",
2753                            chain = task.network[chain_id].log_name,
2754                            target = peer_id,
2755                            ?error
2756                        );
2757                    }
2758                }
2759
                    // A second message is logged for every error except request errors for which
                    // `is_protocol_error()` is false.
2760                match &response {
2761                    Ok(_) => {}
2762                    Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2763                    Err(err) => {
2764                        log!(
2765                            &task.platform,
2766                            Debug,
2767                            "network",
2768                            format!(
2769                                "Error in block request with {}. This might indicate an \
2770                                incompatibility. Error: {}",
2771                                peer_id, err
2772                            )
2773                        );
2774                    }
2775                }
2776
                    // NOTE(review): the `unwrap` assumes every blocks request has a sender
                    // registered under its substream id -- confirm. A failure to send is ignored.
2777                let _ = task
2778                    .blocks_requests
2779                    .remove(&substream_id)
2780                    .unwrap()
2781                    .send(response.map_err(BlocksRequestError::Request));
2782            }
2783            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2784                substream_id,
2785                peer_id,
2786                chain_id,
2787                response: service::RequestResult::GrandpaWarpSync(response),
2788            }) => {
                    // A GrandPa warp sync request finished: log the outcome, then hand the
                    // response to the sender registered in `task.grandpa_warp_sync_requests`.
2789                match &response {
2790                    Ok(response) => {
2791                        // TODO: print total bytes size
2792                        let decoded = response.decode();
2793                        log!(
2794                            &task.platform,
2795                            Debug,
2796                            "network",
2797                            "warp-sync-request-success",
2798                            chain = task.network[chain_id].log_name,
2799                            target = peer_id,
2800                            num_fragments = decoded.fragments.len(),
2801                            is_finished = ?decoded.is_finished,
2802                        );
2803                    }
2804                    Err(error) => {
2805                        log!(
2806                            &task.platform,
2807                            Debug,
2808                            "network",
2809                            "warp-sync-request-error",
2810                            chain = task.network[chain_id].log_name,
2811                            target = peer_id,
2812                            ?error,
2813                        );
2814                    }
2815                }
2816
                    // NOTE(review): the `unwrap` assumes every warp sync request has a sender
                    // registered under its substream id -- confirm. A failure to send is ignored.
2817                let _ = task
2818                    .grandpa_warp_sync_requests
2819                    .remove(&substream_id)
2820                    .unwrap()
2821                    .send(response.map_err(WarpSyncRequestError::Request));
2822            }
2823            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2824                substream_id,
2825                peer_id,
2826                chain_id,
2827                response: service::RequestResult::StorageProof(response),
2828            }) => {
                    // A storage proof request finished: log the outcome, then hand the response
                    // to whichever of the two pending-request maps holds this substream id.
2829                match &response {
2830                    Ok(items) => {
2831                        let decoded = items.decode();
2832                        log!(
2833                            &task.platform,
2834                            Debug,
2835                            "network",
2836                            "storage-proof-request-success",
2837                            chain = task.network[chain_id].log_name,
2838                            target = peer_id,
2839                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2840                        );
2841                    }
2842                    Err(error) => {
2843                        log!(
2844                            &task.platform,
2845                            Debug,
2846                            "network",
2847                            "storage-proof-request-error",
2848                            chain = task.network[chain_id].log_name,
2849                            target = peer_id,
2850                            ?error
2851                        );
2852                    }
2853                }
2854
2855                // Both regular storage proof and child storage proof use the same protocol,
2856                // so check both HashMaps for the request.
                    // The error is wrapped in the error type matching the map the sender was
                    // found in. A failure to send is ignored.
2857                if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2858                    let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2859                } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2860                {
2861                    let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2862                } else {
                        // NOTE(review): assumes every storage proof request is registered in one
                        // of the two maps -- confirm.
2863                    unreachable!()
2864                }
2865            }
2866            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2867                substream_id,
2868                peer_id,
2869                chain_id,
2870                response: service::RequestResult::CallProof(response),
2871            }) => {
                    // A call proof request finished: log the outcome, then hand the response to
                    // the sender registered in `task.call_proof_requests` for this substream.
2872                match &response {
2873                    Ok(items) => {
2874                        let decoded = items.decode();
2875                        log!(
2876                            &task.platform,
2877                            Debug,
2878                            "network",
2879                            "call-proof-request-success",
2880                            chain = task.network[chain_id].log_name,
2881                            target = peer_id,
2882                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2883                        );
2884                    }
2885                    Err(error) => {
2886                        log!(
2887                            &task.platform,
2888                            Debug,
2889                            "network",
2890                            "call-proof-request-error",
2891                            chain = task.network[chain_id].log_name,
2892                            target = peer_id,
2893                            ?error
2894                        );
2895                    }
2896                }
2897
                    // NOTE(review): the `unwrap` assumes every call proof request has a sender
                    // registered under its substream id -- confirm. A failure to send is ignored.
2898                let _ = task
2899                    .call_proof_requests
2900                    .remove(&substream_id)
2901                    .unwrap()
2902                    .send(response.map_err(CallProofRequestError::Request));
2903            }
2904            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2905                peer_id: requestee_peer_id,
2906                chain_id,
2907                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2908                ..
2909            }) => {
2910                for (peer_id, mut addrs) in nodes {
2911                    // Make sure to not insert too many address for a single peer.
2912                    // While the .
2913                    if addrs.len() >= 10 {
2914                        addrs.truncate(10);
2915                    }
2916
2917                    let mut valid_addrs = Vec::with_capacity(addrs.len());
2918                    for addr in addrs {
2919                        match Multiaddr::from_bytes(addr) {
2920                            Ok(mut a) => {
2921                                if !pop_p2p_if_matches(&mut a, &peer_id) {
2922                                    log!(
2923                                        &task.platform,
2924                                        Debug,
2925                                        "network",
2926                                        "discovered-address-peer-id-mismatch",
2927                                        chain = &task.network[chain_id].log_name,
2928                                        announced_peer_id = peer_id,
2929                                        addr = &a,
2930                                        obtained_from = requestee_peer_id
2931                                    );
2932                                    continue;
2933                                }
2934                                if platform::address_parse::multiaddr_to_address(&a)
2935                                    .ok()
2936                                    .map_or(false, |addr| {
2937                                        task.platform.supports_connection_type((&addr).into())
2938                                    })
2939                                {
2940                                    valid_addrs.push(a)
2941                                } else {
2942                                    log!(
2943                                        &task.platform,
2944                                        Debug,
2945                                        "network",
2946                                        "discovered-address-not-supported",
2947                                        chain = &task.network[chain_id].log_name,
2948                                        peer_id,
2949                                        addr = &a,
2950                                        obtained_from = requestee_peer_id
2951                                    );
2952                                }
2953                            }
2954                            Err((error, addr)) => {
2955                                log!(
2956                                    &task.platform,
2957                                    Debug,
2958                                    "network",
2959                                    "discovered-address-invalid",
2960                                    chain = &task.network[chain_id].log_name,
2961                                    peer_id,
2962                                    error,
2963                                    addr = hex::encode(&addr),
2964                                    obtained_from = requestee_peer_id
2965                                );
2966                            }
2967                        }
2968                    }
2969
2970                    if !valid_addrs.is_empty() {
2971                        // Note that we must call this function before `insert_address`,
2972                        // as documented in `basic_peering_strategy`.
2973                        let insert_outcome =
2974                            task.peering_strategy
2975                                .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
2976
2977                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2978                            peer_removed,
2979                        } = insert_outcome
2980                        {
2981                            if let Some(peer_removed) = peer_removed {
2982                                log!(
2983                                    &task.platform,
2984                                    Debug,
2985                                    "network",
2986                                    "peer-purged-from-address-book",
2987                                    chain = &task.network[chain_id].log_name,
2988                                    peer_id = peer_removed,
2989                                );
2990                            }
2991
2992                            log!(
2993                                &task.platform,
2994                                Debug,
2995                                "network",
2996                                "peer-discovered",
2997                                chain = &task.network[chain_id].log_name,
2998                                peer_id,
2999                                addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), // TODO: better formatting?
3000                                obtained_from = requestee_peer_id
3001                            );
3002                        }
3003                    }
3004
3005                    for addr in valid_addrs {
3006                        let _insert_result =
3007                            task.peering_strategy
3008                                .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant
3009                        debug_assert!(!matches!(
3010                            _insert_result,
3011                            basic_peering_strategy::InsertAddressResult::UnknownPeer
3012                        ));
3013                    }
3014                }
3015            }
3016            WakeUpReason::NetworkEvent(service::Event::RequestResult {
3017                peer_id,
3018                chain_id,
3019                response: service::RequestResult::KademliaFindNode(Err(error)),
3020                ..
3021            }) => {
3022                log!(
3023                    &task.platform,
3024                    Debug,
3025                    "network",
3026                    "discovery-find-node-error",
3027                    chain = &task.network[chain_id].log_name,
3028                    ?error,
3029                    find_node_target = peer_id,
3030                );
3031
3032                // No error is printed if the request fails due to a benign networking error such
3033                // as an unresponsive peer.
3034                match error {
3035                    service::KademliaFindNodeError::RequestFailed(err)
3036                        if !err.is_protocol_error() => {}
3037
3038                    service::KademliaFindNodeError::RequestFailed(
3039                        service::RequestError::Substream(
3040                            connection::established::RequestError::ProtocolNotAvailable,
3041                        ),
3042                    ) => {
3043                        // TODO: remove this warning in a long time
3044                        log!(
3045                            &task.platform,
3046                            Warn,
3047                            "network",
3048                            format!(
3049                                "Problem during discovery on {}: protocol not available. \
3050                                This might indicate that the version of Substrate used by \
3051                                the chain doesn't include \
3052                                <https://github.com/paritytech/substrate/pull/12545>.",
3053                                &task.network[chain_id].log_name
3054                            )
3055                        );
3056                    }
3057                    _ => {
3058                        log!(
3059                            &task.platform,
3060                            Debug,
3061                            "network",
3062                            format!(
3063                                "Problem during discovery on {}: {}",
3064                                &task.network[chain_id].log_name, error
3065                            )
3066                        );
3067                    }
3068                }
3069            }
3070            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3071                // We never start any other kind of requests.
3072                unreachable!()
3073            }
3074            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3075                peer_id,
3076                chain_id,
3077                kind: service::GossipKind::ConsensusTransactions,
3078            }) => {
3079                // The networking state machine guarantees that `GossipInDesired`
3080                // can't happen if we are already opening an out slot, which we do
3081                // immediately.
3082                // TODO: add debug_assert! ^
3083                if task
3084                    .network
3085                    .opened_gossip_undesired_by_chain(chain_id)
3086                    .count()
3087                    < 4
3088                {
3089                    log!(
3090                        &task.platform,
3091                        Debug,
3092                        "network",
3093                        "gossip-in-request",
3094                        chain = &task.network[chain_id].log_name,
3095                        peer_id,
3096                        outcome = "accepted"
3097                    );
3098                    task.network
3099                        .gossip_open(
3100                            chain_id,
3101                            &peer_id,
3102                            service::GossipKind::ConsensusTransactions,
3103                        )
3104                        .unwrap();
3105                } else {
3106                    log!(
3107                        &task.platform,
3108                        Debug,
3109                        "network",
3110                        "gossip-in-request",
3111                        chain = &task.network[chain_id].log_name,
3112                        peer_id,
3113                        outcome = "rejected",
3114                    );
3115                    task.network
3116                        .gossip_close(
3117                            chain_id,
3118                            &peer_id,
3119                            service::GossipKind::ConsensusTransactions,
3120                        )
3121                        .unwrap();
3122                }
3123            }
3124            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3125                // Can't happen as we already instantaneously accept or reject gossip in requests.
3126                unreachable!()
3127            }
3128            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3129                peer_id,
3130                substream_id,
3131            }) => {
3132                log!(
3133                    &task.platform,
3134                    Debug,
3135                    "network",
3136                    "identify-request-received",
3137                    peer_id,
3138                );
3139                task.network
3140                    .respond_identify(substream_id, &task.identify_agent_version);
3141            }
3142            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3143            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3144                // All incoming requests are immediately answered.
3145                unreachable!()
3146            }
3147            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3148                chain_id,
3149                peer_id,
3150                state,
3151            }) => {
3152                log!(
3153                    &task.platform,
3154                    Debug,
3155                    "network",
3156                    "grandpa-neighbor-packet-received",
3157                    chain = &task.network[chain_id].log_name,
3158                    peer_id,
3159                    round_number = state.round_number,
3160                    set_id = state.set_id,
3161                    commit_finalized_height = state.commit_finalized_height,
3162                );
3163
3164                task.open_gossip_links
3165                    .get_mut(&(chain_id, peer_id.clone()))
3166                    .unwrap()
3167                    .finalized_block_height = Some(state.commit_finalized_height);
3168
3169                debug_assert!(task.event_pending_send.is_none());
3170                task.event_pending_send = Some((
3171                    chain_id,
3172                    Event::GrandpaNeighborPacket {
3173                        peer_id,
3174                        finalized_block_height: state.commit_finalized_height,
3175                    },
3176                ));
3177            }
3178            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3179                chain_id,
3180                peer_id,
3181                message,
3182            }) => {
3183                log!(
3184                    &task.platform,
3185                    Debug,
3186                    "network",
3187                    "grandpa-commit-message-received",
3188                    chain = &task.network[chain_id].log_name,
3189                    peer_id,
3190                    target_block_hash = HashDisplay(message.decode().target_hash),
3191                );
3192
3193                debug_assert!(task.event_pending_send.is_none());
3194                task.event_pending_send =
3195                    Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3196            }
3197            WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3198                chain_id,
3199                peer_id,
3200                statements,
3201            }) => {
3202                debug_assert!(task.event_pending_send.is_none());
3203
3204                if statements.is_empty() {
3205                    continue;
3206                }
3207
3208                task.event_pending_send = Some((
3209                    chain_id,
3210                    Event::StatementsNotification {
3211                        peer_id,
3212                        statements,
3213                    },
3214                ));
3215            }
3216            WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3217                peer_id,
3218                chain_id,
3219                version,
3220            }) => {
3221                log!(
3222                    &task.platform,
3223                    Trace,
3224                    "network",
3225                    "statement-protocol-open-success",
3226                    chain = &task.network[chain_id].log_name,
3227                    peer_id,
3228                    ?version,
3229                );
3230
3231                if matches!(version, codec::StatementProtocolVersion::V2) {
3232                    task.v2_statement_peers
3233                        .entry(chain_id)
3234                        .or_insert_with(|| {
3235                            HashSet::with_capacity_and_hasher(16, Default::default())
3236                        })
3237                        .insert(peer_id.clone());
3238                    if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3239                        if let Err(
3240                            SendTopicAffinityError::NoConnection
3241                            | SendTopicAffinityError::ProtocolV1,
3242                        ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3243                        {
3244                            task.v2_statement_peers
3245                                .get_mut(&chain_id)
3246                                .unwrap()
3247                                .remove(&peer_id);
3248                        }
3249                    }
3250                }
3251            }
3252            // TODO: we don't filter outbound statements yet
3253            WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3254                ..
3255            }) => {}
3256            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3257                // TODO: handle properly?
3258                log!(
3259                    &task.platform,
3260                    Warn,
3261                    "network",
3262                    "protocol-error",
3263                    peer_id,
3264                    ?error
3265                );
3266
3267                // TODO: disconnect peer
3268            }
3269            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3270                task.peering_strategy.assign_slot(&chain_id, &peer_id);
3271
3272                log!(
3273                    &task.platform,
3274                    Debug,
3275                    "network",
3276                    "slot-assigned",
3277                    chain = &task.network[chain_id].log_name,
3278                    peer_id
3279                );
3280
3281                task.network.gossip_insert_desired(
3282                    chain_id,
3283                    peer_id,
3284                    service::GossipKind::ConsensusTransactions,
3285                );
3286            }
3287            WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3288                task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3289
3290                log!(
3291                    &task.platform,
3292                    Debug,
3293                    "network",
3294                    "bitswap-slot-assigned",
3295                    peer_id
3296                );
3297
3298                task.network.bitswap_insert_desired(peer_id);
3299            }
3300            WakeUpReason::NextRecentConnectionRestore => {
3301                task.num_recent_connection_opening =
3302                    task.num_recent_connection_opening.saturating_sub(1);
3303            }
3304            WakeUpReason::CanStartConnect(expected_peer_id) => {
3305                let Some(multiaddr) = task
3306                    .peering_strategy
3307                    .pick_address_and_add_connection(&expected_peer_id)
3308                else {
3309                    // There is no address for that peer in the address book.
3310                    task.network.gossip_remove_desired_all(
3311                        &expected_peer_id,
3312                        service::GossipKind::ConsensusTransactions,
3313                    );
3314                    let ban_duration = Duration::from_secs(10);
3315                    for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3316                        &expected_peer_id,
3317                        task.platform.now() + ban_duration,
3318                    ) {
3319                        if matches!(
3320                            what_happened,
3321                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3322                        ) {
3323                            log!(
3324                                &task.platform,
3325                                Debug,
3326                                "network",
3327                                "slot-unassigned",
3328                                chain = &task.network[chain_id].log_name,
3329                                peer_id = expected_peer_id,
3330                                ?ban_duration,
3331                                reason = "no-address"
3332                            );
3333                        }
3334                    }
3335                    continue;
3336                };
3337
3338                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3339                    Ok(a) => a,
3340                    Err((multiaddr::FromBytesError, addr)) => {
3341                        // Address is in an invalid format.
3342                        let _was_in = task
3343                            .peering_strategy
3344                            .decrease_address_connections_and_remove_if_zero(
3345                                &expected_peer_id,
3346                                &addr,
3347                            );
3348                        debug_assert!(_was_in.is_ok());
3349                        continue;
3350                    }
3351                };
3352
3353                let address = address_parse::multiaddr_to_address(&multiaddr)
3354                    .ok()
3355                    .filter(|addr| {
3356                        task.platform.supports_connection_type(match &addr {
3357                            address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3358                                From::from(addr)
3359                            }
3360                            address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3361                                addr,
3362                            ) => From::from(addr),
3363                        })
3364                    });
3365
3366                let Some(address) = address else {
3367                    // Address is in an invalid format or isn't supported by the platform.
3368                    let _was_in = task
3369                        .peering_strategy
3370                        .decrease_address_connections_and_remove_if_zero(
3371                            &expected_peer_id,
3372                            multiaddr.as_ref(),
3373                        );
3374                    debug_assert!(_was_in.is_ok());
3375                    continue;
3376                };
3377
3378                // Each connection has its own individual Noise key.
3379                let noise_key = {
3380                    let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3381                    task.platform.fill_random_bytes(&mut *noise_static_key);
3382                    let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3383                    task.platform.fill_random_bytes(&mut *libp2p_key);
3384                    connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3385                };
3386
3387                log!(
3388                    &task.platform,
3389                    Debug,
3390                    "network",
3391                    "connection-started",
3392                    expected_peer_id,
3393                    remote_addr = multiaddr,
3394                    local_peer_id =
3395                        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3396                            .into_peer_id(),
3397                );
3398
3399                task.num_recent_connection_opening += 1;
3400
3401                let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3402                    async_channel::bounded(8);
3403                let task_name = format!("connection-{}", multiaddr);
3404
3405                match address {
3406                    address_parse::AddressOrMultiStreamAddress::Address(address) => {
3407                        // As documented in the `PlatformRef` trait, `connect_stream` must
3408                        // return as soon as possible.
3409                        let connection = task.platform.connect_stream(address).await;
3410
3411                        let (connection_id, connection_task) =
3412                            task.network.add_single_stream_connection(
3413                                task.platform.now(),
3414                                service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3415                                    is_initiator: true,
3416                                    noise_key: &noise_key,
3417                                },
3418                                multiaddr.clone().into_bytes(),
3419                                Some(expected_peer_id.clone()),
3420                                coordinator_to_connection_tx,
3421                            );
3422
3423                        task.platform.spawn_task(
3424                            task_name.into(),
3425                            tasks::single_stream_connection_task::<TPlat>(
3426                                connection,
3427                                multiaddr.to_string(),
3428                                task.platform.clone(),
3429                                connection_id,
3430                                connection_task,
3431                                coordinator_to_connection_rx,
3432                                task.tasks_messages_tx.clone(),
3433                            ),
3434                        );
3435                    }
3436                    address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3437                        platform::MultiStreamAddress::WebRtc {
3438                            ip,
3439                            port,
3440                            remote_certificate_sha256,
3441                        },
3442                    ) => {
3443                        // We need to know the local TLS certificate in order to insert the
3444                        // connection, and as such we need to call `connect_multistream` here.
3445                        // As documented in the `PlatformRef` trait, `connect_multistream` must
3446                        // return as soon as possible.
3447                        let connection = task
3448                            .platform
3449                            .connect_multistream(platform::MultiStreamAddress::WebRtc {
3450                                ip,
3451                                port,
3452                                remote_certificate_sha256,
3453                            })
3454                            .await;
3455
3456                        // Convert the SHA256 hashes into multihashes.
3457                        let local_tls_certificate_multihash = [18u8, 32]
3458                            .into_iter()
3459                            .chain(connection.local_tls_certificate_sha256.into_iter())
3460                            .collect();
3461                        let remote_tls_certificate_multihash = [18u8, 32]
3462                            .into_iter()
3463                            .chain(remote_certificate_sha256.iter().copied())
3464                            .collect();
3465
3466                        let (connection_id, connection_task) =
3467                            task.network.add_multi_stream_connection(
3468                                task.platform.now(),
3469                                service::MultiStreamHandshakeKind::WebRtc {
3470                                    is_initiator: true,
3471                                    local_tls_certificate_multihash,
3472                                    remote_tls_certificate_multihash,
3473                                    noise_key: &noise_key,
3474                                },
3475                                multiaddr.clone().into_bytes(),
3476                                Some(expected_peer_id.clone()),
3477                                coordinator_to_connection_tx,
3478                            );
3479
3480                        task.platform.spawn_task(
3481                            task_name.into(),
3482                            tasks::webrtc_multi_stream_connection_task::<TPlat>(
3483                                connection.connection,
3484                                multiaddr.to_string(),
3485                                task.platform.clone(),
3486                                connection_id,
3487                                connection_task,
3488                                coordinator_to_connection_rx,
3489                                task.tasks_messages_tx.clone(),
3490                            ),
3491                        );
3492                    }
3493                }
3494            }
3495            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3496                task.network
3497                    .gossip_open(
3498                        chain_id,
3499                        &peer_id,
3500                        service::GossipKind::ConsensusTransactions,
3501                    )
3502                    .unwrap();
3503
3504                log!(
3505                    &task.platform,
3506                    Debug,
3507                    "network",
3508                    "gossip-open-start",
3509                    chain = &task.network[chain_id].log_name,
3510                    peer_id,
3511                );
3512            }
3513            WakeUpReason::CanOpenBitswap(peer_id) => {
3514                task.network.bitswap_open(&peer_id).unwrap();
3515
3516                log!(
3517                    &task.platform,
3518                    Debug,
3519                    "network",
3520                    "bitswap-open-start",
3521                    peer_id
3522                );
3523            }
3524            WakeUpReason::MessageToConnection {
3525                connection_id,
3526                message,
3527            } => {
3528                // Note that it is critical for the sending to not take too long here, in order to
3529                // not block the process of the network service.
3530                // In particular, if sending the message to the connection is blocked due to
3531                // sending a message on the connection-to-coordinator channel, this will result
3532                // in a deadlock.
3533                // For this reason, the connection task is always ready to immediately accept a
3534                // message on the coordinator-to-connection channel.
3535                let _send_result = task.network[connection_id].send(message).await;
3536                debug_assert!(_send_result.is_ok());
3537            }
3538        }
3539    }
3540}
3541
3542/// Pops a trailing `/p2p/<peer_id>` from `addr` if it matches `expected_peer`. Returns `false`
3543/// (caller must discard the address) on mismatch.
3544fn pop_p2p_if_matches(
3545    addr: &mut smoldot::libp2p::multiaddr::Multiaddr,
3546    expected_peer: &smoldot::libp2p::peer_id::PeerId,
3547) -> bool {
3548    use smoldot::libp2p::multiaddr::Protocol;
3549    match addr.iter().last() {
3550        Some(Protocol::P2p(mh)) => {
3551            if mh.into_bytes() == expected_peer.as_bytes() {
3552                addr.pop();
3553                true
3554            } else {
3555                false
3556            }
3557        }
3558        _ => true,
3559    }
3560}
3561
#[cfg(test)]
mod tests {
    use super::pop_p2p_if_matches;
    use smoldot::libp2p::{multiaddr::Multiaddr, peer_id::PeerId};

    // `PEER_A` and `PEER_B` are two different, valid peer ids in base58 form. `PEER_A` is
    // borrowed from the existing smoldot tests (`lib/src/libp2p/multiaddr.rs:629`), while
    // `PEER_B` is the bootnode peer id seen in the test environment that motivated this change.
    const PEER_A: &str = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN";
    const PEER_B: &str = "12D3KooWQk1yQtG1YugyKjiQf6KNk8VjGGAT5xy1FWcnRKN4yXYJ";

    /// Decodes a base58-encoded peer id. Panics if the input is invalid.
    fn decode_peer_id(base58: &str) -> PeerId {
        let raw = bs58::decode(base58).into_vec().unwrap();
        PeerId::from_bytes(raw).unwrap()
    }

    /// Parses the textual form of a multiaddress. Panics if the input is invalid.
    fn multiaddr(text: &str) -> Multiaddr {
        text.parse().unwrap()
    }

    /// An address that doesn't end with `/p2p/...` is accepted and not modified.
    #[test]
    fn no_suffix_passes_through_unchanged() {
        let original = multiaddr("/ip4/127.0.0.1/tcp/30333/ws");
        let mut addr = original.clone();

        let accepted = pop_p2p_if_matches(&mut addr, &decode_peer_id(PEER_A));

        assert!(accepted);
        assert_eq!(addr, original);
    }

    /// A trailing `/p2p/...` naming the expected peer is accepted and removed.
    #[test]
    fn matching_suffix_is_stripped() {
        let mut addr = multiaddr(&format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}"));

        let accepted = pop_p2p_if_matches(&mut addr, &decode_peer_id(PEER_A));

        assert!(accepted);
        assert_eq!(addr, multiaddr("/ip4/127.0.0.1/tcp/30333/ws"));
    }

    /// A trailing `/p2p/...` naming another peer is rejected and the address is left intact.
    #[test]
    fn mismatched_suffix_rejects_and_keeps_addr() {
        let original = multiaddr(&format!("/ip4/127.0.0.1/tcp/30333/ws/p2p/{PEER_A}"));
        let mut addr = original.clone();

        let accepted = pop_p2p_if_matches(&mut addr, &decode_peer_id(PEER_B));

        assert!(!accepted);
        assert_eq!(addr, original);
    }
}