Skip to main content

smoldot_light/
network_service.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Background network service.
19//!
20//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21//! Importantly, its design is oriented towards the particular use case of the light client.
22//!
23//! The [`NetworkService`] spawns one background task (using [`PlatformRef::spawn_task`]) for
24//! each active connection.
25//!
26//! The objective of the [`NetworkService`] in general is to try stay connected as much as
27//! possible to the nodes of the peer-to-peer network of the chain, and maintain open substreams
28//! with them in order to send out requests (e.g. block requests) and notifications (e.g. block
29//! announces).
30//!
31//! Connectivity to the network is performed in the background as an implementation detail of
32//! the service. The public API only allows emitting requests and notifications towards the
33//! already-connected nodes.
34//!
35//! After a [`NetworkService`] is created, one can add chains using [`NetworkService::add_chain`].
36//! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically
37//! purged.
38//!
39//! An important part of the API is the list of channel receivers of [`Event`] returned by
40//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the
41//! network connectivity.
42
43use crate::{
44    log,
45    platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49    borrow::ToOwned as _,
50    boxed::Box,
51    collections::BTreeMap,
52    format,
53    string::{String, ToString as _},
54    sync::Arc,
55    vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, num::NonZeroUsize, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65    header,
66    informant::{BytesDisplay, HashDisplay},
67    libp2p::{
68        connection,
69        multiaddr::{self, Multiaddr},
70        peer_id,
71    },
72    network::{basic_peering_strategy, bitswap_peering_strategy, codec, service},
73};
74
75pub use codec::{AffinityFilter, CallProofRequestConfig, Role};
76use service::SendTopicAffinityError;
77pub use service::{
78    ChainId, EncodedMerkleProof, PeerId, QueueNotificationError, SendBitswapMessageError,
79};
80
81/// Configuration for the Statement Store protocol.
82#[derive(Debug, Clone)]
83pub struct StatementProtocolConfig {
84    /// Per-subscription LRU cache size used for deduplicating delivered statements.
85    max_seen_statements: NonZeroUsize,
86    false_positive_rate: f64,
87    bloom_seed: u128,
88    affinity_update_interval: Duration,
89}
90
91impl StatementProtocolConfig {
92    pub fn new(
93        max_seen_statements: NonZeroUsize,
94        false_positive_rate: f64,
95        bloom_seed: u128,
96        affinity_update_interval: Duration,
97    ) -> Self {
98        assert!(
99            false_positive_rate.is_finite()
100                && false_positive_rate > 0.0
101                && false_positive_rate < 1.0
102        );
103        assert!(!affinity_update_interval.is_zero());
104        StatementProtocolConfig {
105            max_seen_statements,
106            false_positive_rate,
107            bloom_seed,
108            affinity_update_interval,
109        }
110    }
111
112    pub fn max_seen_statements(&self) -> NonZeroUsize {
113        self.max_seen_statements
114    }
115
116    pub fn false_positive_rate(&self) -> f64 {
117        self.false_positive_rate
118    }
119
120    pub fn bloom_seed(&self) -> u128 {
121        self.bloom_seed
122    }
123
124    pub fn affinity_update_interval(&self) -> Duration {
125        self.affinity_update_interval
126    }
127}
128
129mod tasks;
130
131/// Configuration for a [`NetworkService`].
132pub struct Config<TPlat> {
133    /// Access to the platform's capabilities.
134    pub platform: TPlat,
135
136    /// Value sent back for the agent version when receiving an identification request.
137    pub identify_agent_version: String,
138
139    /// Capacity to allocate for the list of chains.
140    pub chains_capacity: usize,
141
142    /// Maximum number of connections that the service can open simultaneously. After this value
143    /// has been reached, a new connection can be opened after each
144    /// [`Config::connections_open_pool_restore_delay`].
145    pub connections_open_pool_size: u32,
146
147    /// Delay after which the service can open a new connection.
148    /// The delay is cumulative. If no connection has been opened for example for twice this
149    /// duration, then two connections can be opened at the same time, up to a maximum of
150    /// [`Config::connections_open_pool_size`].
151    pub connections_open_pool_restore_delay: Duration,
152}
153
154/// See [`NetworkService::add_chain`].
155///
156/// Note that this configuration is intentionally missing a field containing the bootstrap
157/// nodes of the chain. Bootstrap nodes are supposed to be added afterwards by calling
158/// [`NetworkServiceChain::discover`].
159pub struct ConfigChain {
160    /// Name of the chain, for logging purposes.
161    pub log_name: String,
162
163    /// Number of "out slots" of this chain. We establish simultaneously gossip substreams up to
164    /// this number of peers.
165    pub num_out_slots: usize,
166
167    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
168    /// the chains match.
169    ///
170    /// > **Note**: Be aware that this *must* be the *genesis* block, not any block known to be
171    /// >           in the chain.
172    pub genesis_block_hash: [u8; 32],
173
174    /// Number and hash of the current best block. Can later be updated with
175    /// [`NetworkServiceChain::set_local_best_block`].
176    pub best_block: (u64, [u8; 32]),
177
178    /// Optional identifier to insert into the networking protocol names. Used to differentiate
179    /// between chains with the same genesis hash.
180    pub fork_id: Option<String>,
181
182    /// Number of bytes of the block number in the networking protocol.
183    pub block_number_bytes: usize,
184
185    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
186    /// number of the finalized block at the time of the initialization.
187    pub grandpa_protocol_finalized_block_height: Option<u64>,
188
189    /// If `Some`, enables the statement store protocol.
190    pub statement_protocol_config: Option<StatementProtocolConfig>,
191}
192
193pub struct NetworkService<TPlat: PlatformRef> {
194    /// Channel connected to the background service.
195    messages_tx: async_channel::Sender<ToBackground<TPlat>>,
196
197    /// See [`Config::platform`].
198    platform: TPlat,
199}
200
201impl<TPlat: PlatformRef> NetworkService<TPlat> {
202    /// Initializes the network service with the given configuration.
203    pub fn new(config: Config<TPlat>) -> Arc<Self> {
204        let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
205
206        let network = service::ChainNetwork::new(service::Config {
207            chains_capacity: config.chains_capacity,
208            connections_capacity: 32,
209            handshake_timeout: Duration::from_secs(8),
210            randomness_seed: {
211                let mut seed = [0; 32];
212                config.platform.fill_random_bytes(&mut seed);
213                seed
214            },
215        });
216
217        // Spawn main task that processes the network service.
218        let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
219        let task = Box::pin(background_task(BackgroundTask {
220            randomness: rand_chacha::ChaCha20Rng::from_seed({
221                let mut seed = [0; 32];
222                config.platform.fill_random_bytes(&mut seed);
223                seed
224            }),
225            identify_agent_version: config.identify_agent_version,
226            tasks_messages_tx,
227            tasks_messages_rx: Box::pin(tasks_messages_rx),
228            peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
229                basic_peering_strategy::Config {
230                    randomness_seed: {
231                        let mut seed = [0; 32];
232                        config.platform.fill_random_bytes(&mut seed);
233                        seed
234                    },
235                    peers_capacity: 50, // TODO: ?
236                    chains_capacity: config.chains_capacity,
237                },
238            ),
239            bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy::new(
240                bitswap_peering_strategy::Config {
241                    randomness_seed: {
242                        let mut seed = [0; 32];
243                        config.platform.fill_random_bytes(&mut seed);
244                        seed
245                    },
246                    peers_capacity: 50, // TODO: hardcoded to the same value as `peering_strategy`.
247                },
248            ),
249            network,
250            connections_open_pool_size: config.connections_open_pool_size,
251            connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
252            num_recent_connection_opening: 0,
253            next_recent_connection_restore: None,
254            platform: config.platform.clone(),
255            open_gossip_links: BTreeMap::new(),
256            v2_statement_peers: HashMap::with_capacity_and_hasher(4, Default::default()),
257            current_affinity_filter: HashMap::with_capacity_and_hasher(4, Default::default()),
258            event_pending_send: None,
259            event_senders: either::Left(Vec::new()),
260            pending_new_subscriptions: Vec::new(),
261            bitswap_event_pending_send: None,
262            bitswap_event_senders: either::Left(Vec::new()),
263            pending_new_bitswap_subscriptions: Vec::new(),
264            important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
265            main_messages_rx: Box::pin(main_messages_rx),
266            messages_rx: stream::SelectAll::new(),
267            blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
268            grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
269            storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
270            call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
271            child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
272            chains_by_next_discovery: BTreeMap::new(),
273        }));
274
275        config.platform.spawn_task("network-service".into(), {
276            let platform = config.platform.clone();
277            async move {
278                task.await;
279                log!(&platform, Debug, "network", "shutdown");
280            }
281        });
282
283        Arc::new(NetworkService {
284            messages_tx: main_messages_tx,
285            platform: config.platform,
286        })
287    }
288
289    /// Adds a chain to the list of chains that the network service connects to.
290    ///
291    /// Returns an object representing the chain and that allows interacting with it. If all
292    /// references to [`NetworkServiceChain`] are destroyed, the network service automatically
293    /// purges that chain.
294    pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
295        let (messages_tx, messages_rx) = async_channel::bounded(32);
296
297        // TODO: this code is hacky because we don't want to make `add_chain` async at the moment, because it's not convenient for lib.rs
298        self.platform.spawn_task("add-chain-message-send".into(), {
299            let config = service::ChainConfig {
300                grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
301                    |commit_finalized_height| service::GrandpaState {
302                        commit_finalized_height,
303                        round_number: 1,
304                        set_id: 0,
305                    },
306                ),
307                enable_statement_protocol: config.statement_protocol_config.is_some(),
308                fork_id: config.fork_id.clone(),
309                block_number_bytes: config.block_number_bytes,
310                best_hash: config.best_block.1,
311                best_number: config.best_block.0,
312                genesis_hash: config.genesis_block_hash,
313                role: Role::Light,
314                allow_inbound_block_requests: false,
315                user_data: Chain {
316                    log_name: config.log_name,
317                    block_number_bytes: config.block_number_bytes,
318                    num_out_slots: config.num_out_slots,
319                    num_references: NonZero::<usize>::new(1).unwrap(),
320                    next_discovery_period: Duration::from_secs(2),
321                    next_discovery_when: self.platform.now(),
322                },
323            };
324
325            let messages_tx = self.messages_tx.clone();
326            async move {
327                let _ = messages_tx
328                    .send(ToBackground::AddChain {
329                        messages_rx,
330                        config,
331                    })
332                    .await;
333            }
334        });
335
336        Arc::new(NetworkServiceChain {
337            _keep_alive_messages_tx: self.messages_tx.clone(),
338            messages_tx,
339            marker: core::marker::PhantomData,
340        })
341    }
342}
343
344pub struct NetworkServiceChain<TPlat: PlatformRef> {
345    /// Copy of [`NetworkService::messages_tx`]. Used in order to maintain the network service
346    /// background task alive.
347    _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
348
349    /// Channel to send messages to the background task.
350    messages_tx: async_channel::Sender<ToBackgroundChain>,
351
352    /// Dummy to hold the `TPlat` type.
353    marker: core::marker::PhantomData<TPlat>,
354}
355
356/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
357#[derive(Debug, Copy, Clone, PartialEq, Eq)]
358pub enum BanSeverity {
359    Low,
360    High,
361}
362
363impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
364    /// Subscribes to the networking events that happen on the given chain.
365    ///
366    /// Calling this function returns a `Receiver` that receives events about the chain.
367    /// The new channel will immediately receive events about all the existing connections, so
368    /// that it is able to maintain a coherent view of the network.
369    ///
370    /// Note that this function is `async`, but it should return very quickly.
371    ///
372    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
373    /// connections will be back-pressured until the channel isn't full anymore.
374    ///
375    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
376    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
377    /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`.
378    ///
379    // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour
380    pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
381        let (tx, rx) = async_channel::bounded(128);
382
383        self.messages_tx
384            .send(ToBackgroundChain::Subscribe { sender: tx })
385            .await
386            .unwrap();
387
388        rx
389    }
390
391    /// Subscribes to the Bitswap events that happen on the network. Bitswap events subscription is
392    /// separate from other network service events, because Bitswap events are big and are not
393    /// needed by the most of subscribers.
394    ///
395    /// Note that this function is `async`, but it should return very quickly.
396    ///
397    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
398    /// connections will be back-pressured until the channel isn't full anymore.
399    ///
400    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
401    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
402    /// [`NetworkServiceChain::subscribe_bitswap`] again to obtain a new `Receiver`.
403    ///
404    // TODO: the last section of the doc seem to contradict itself.
405    pub async fn subscribe_bitswap(&self) -> async_channel::Receiver<BitswapEvent> {
406        let (tx, rx) = async_channel::bounded(128);
407
408        self.messages_tx
409            .send(ToBackgroundChain::SubscribeBitswap { sender: tx })
410            .await
411            .unwrap();
412
413        rx
414    }
415
416    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
417    /// generated. Prevents a new gossip link with the same peer from being reopened for a
418    /// little while.
419    ///
420    /// `reason` is a human-readable string printed in the logs.
421    ///
422    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
423    /// reconnection was already happening as the call to this function is still being processed.
424    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
425    /// words, this function guarantees that we will be disconnected in the future rather than
426    /// guarantees that we will disconnect.
427    pub async fn ban_and_disconnect(
428        &self,
429        peer_id: PeerId,
430        severity: BanSeverity,
431        reason: &'static str,
432    ) {
433        let _ = self
434            .messages_tx
435            .send(ToBackgroundChain::DisconnectAndBan {
436                peer_id,
437                severity,
438                reason,
439            })
440            .await;
441    }
442
443    /// Sends a blocks request to the given peer.
444    // TODO: more docs
445    pub async fn blocks_request(
446        self: Arc<Self>,
447        target: PeerId,
448        config: codec::BlocksRequestConfig,
449        timeout: Duration,
450    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
451        let (tx, rx) = oneshot::channel();
452
453        self.messages_tx
454            .send(ToBackgroundChain::StartBlocksRequest {
455                target: target.clone(),
456                config,
457                timeout,
458                result: tx,
459            })
460            .await
461            .unwrap();
462
463        rx.await.unwrap()
464    }
465
466    /// Sends a grandpa warp sync request to the given peer.
467    // TODO: more docs
468    pub async fn grandpa_warp_sync_request(
469        self: Arc<Self>,
470        target: PeerId,
471        begin_hash: [u8; 32],
472        timeout: Duration,
473    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
474        let (tx, rx) = oneshot::channel();
475
476        self.messages_tx
477            .send(ToBackgroundChain::StartWarpSyncRequest {
478                target: target.clone(),
479                begin_hash,
480                timeout,
481                result: tx,
482            })
483            .await
484            .unwrap();
485
486        rx.await.unwrap()
487    }
488
489    pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
490        self.messages_tx
491            .send(ToBackgroundChain::SetLocalBestBlock {
492                best_hash,
493                best_number,
494            })
495            .await
496            .unwrap();
497    }
498
499    pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
500        self.messages_tx
501            .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
502            .await
503            .unwrap();
504    }
505
506    /// Sends a storage proof request to the given peer.
507    // TODO: more docs
508    pub async fn storage_proof_request(
509        self: Arc<Self>,
510        target: PeerId, // TODO: takes by value because of futures longevity issue
511        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
512        timeout: Duration,
513    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
514        let (tx, rx) = oneshot::channel();
515
516        self.messages_tx
517            .send(ToBackgroundChain::StartStorageProofRequest {
518                target: target.clone(),
519                config: codec::StorageProofRequestConfig {
520                    block_hash: config.block_hash,
521                    keys: config
522                        .keys
523                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
524                        .collect::<Vec<_>>()
525                        .into_iter(),
526                },
527                timeout,
528                result: tx,
529            })
530            .await
531            .unwrap();
532
533        rx.await.unwrap()
534    }
535
536    /// Sends a call proof request to the given peer.
537    ///
538    /// See also [`NetworkServiceChain::call_proof_request`].
539    // TODO: more docs
540    pub async fn call_proof_request(
541        self: Arc<Self>,
542        target: PeerId, // TODO: takes by value because of futures longevity issue
543        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
544        timeout: Duration,
545    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
546        let (tx, rx) = oneshot::channel();
547
548        self.messages_tx
549            .send(ToBackgroundChain::StartCallProofRequest {
550                target: target.clone(),
551                config: codec::CallProofRequestConfig {
552                    block_hash: config.block_hash,
553                    method: config.method.into_owned().into(),
554                    parameter_vectored: config
555                        .parameter_vectored
556                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
557                        .collect::<Vec<_>>()
558                        .into_iter(),
559                },
560                timeout,
561                result: tx,
562            })
563            .await
564            .unwrap();
565
566        rx.await.unwrap()
567    }
568
569    /// Sends a child storage proof request to the given peer.
570    pub async fn child_storage_proof_request(
571        self: Arc<Self>,
572        target: PeerId,
573        config: codec::ChildStorageProofRequestConfig<
574            impl AsRef<[u8]> + Clone,
575            impl Iterator<Item = impl AsRef<[u8]> + Clone>,
576        >,
577        timeout: Duration,
578    ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
579        let (tx, rx) = oneshot::channel();
580
581        self.messages_tx
582            .send(ToBackgroundChain::StartChildStorageProofRequest {
583                target: target.clone(),
584                config: ChildStorageProofRequestConfigOwned {
585                    block_hash: config.block_hash,
586                    child_trie: config.child_trie.as_ref().to_vec(),
587                    keys: config
588                        .keys
589                        .map(|key| key.as_ref().to_vec())
590                        .collect::<Vec<_>>(),
591                },
592                timeout,
593                result: tx,
594            })
595            .await
596            .unwrap();
597
598        rx.await.unwrap()
599    }
600
601    /// Announces transaction to the peers we are connected to.
602    ///
603    /// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
604    /// if we didn't send the transaction to any peer.
605    ///
606    /// Note that the remote doesn't confirm that it has received the transaction. Because
607    /// networking is inherently unreliable, successfully sending a transaction to a peer doesn't
608    /// necessarily mean that the remote has received it. In practice, however, the likelihood of
609    /// a transaction not being received are extremely low. This can be considered as known flaw.
610    pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
611        let (tx, rx) = oneshot::channel();
612
613        self.messages_tx
614            .send(ToBackgroundChain::AnnounceTransaction {
615                transaction: transaction.to_vec(), // TODO: ovheread
616                result: tx,
617            })
618            .await
619            .unwrap();
620
621        rx.await.unwrap()
622    }
623
624    /// See [`service::ChainNetwork::gossip_send_block_announce`].
625    pub async fn send_block_announce(
626        self: Arc<Self>,
627        target: &PeerId,
628        scale_encoded_header: &[u8],
629        is_best: bool,
630    ) -> Result<(), QueueNotificationError> {
631        let (tx, rx) = oneshot::channel();
632
633        self.messages_tx
634            .send(ToBackgroundChain::SendBlockAnnounce {
635                target: target.clone(),                              // TODO: overhead
636                scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead
637                is_best,
638                result: tx,
639            })
640            .await
641            .unwrap();
642
643        rx.await.unwrap()
644    }
645
646    /// Send Bitswap message to the given peer.
647    pub async fn send_bitswap_message(
648        &self,
649        target: PeerId,
650        message: Vec<u8>,
651    ) -> Result<(), SendBitswapMessageError> {
652        let (tx, rx) = oneshot::channel();
653
654        self.messages_tx
655            .send(ToBackgroundChain::SendBitswapMessage {
656                target,
657                message,
658                result: tx,
659            })
660            .await
661            .unwrap();
662
663        rx.await.unwrap()
664    }
665
666    /// Broadcast Bitswap message to all [`service::ChainNetwork::established_bitswap_desired`]
667    /// peers.
668    ///
669    /// Returns the peers message was broadcast to or an error if the message cannot be sent
670    /// to at least one peer.
671    // TODO: better use a dedicated error type instead of reusing a lower-level
672    // `SendBitswapMessageErorr`.
673    pub async fn broadcast_bitswap_message(
674        &self,
675        message: Vec<u8>,
676    ) -> Result<Vec<PeerId>, SendBitswapMessageError> {
677        let (tx, rx) = oneshot::channel();
678
679        self.messages_tx
680            .send(ToBackgroundChain::BroadcastBitswapMessage {
681                message,
682                result: tx,
683            })
684            .await
685            .unwrap();
686
687        rx.await.unwrap()
688    }
689
690    /// Broadcast a statement notification to all gossip-connected peers.
691    pub async fn broadcast_statement(
692        self: Arc<Self>,
693        statement: Vec<u8>,
694    ) -> BroadcastStatementResult {
695        let (tx, rx) = oneshot::channel();
696
697        self.messages_tx
698            .send(ToBackgroundChain::BroadcastStatement {
699                statement,
700                result: tx,
701            })
702            .await
703            .unwrap();
704
705        rx.await.unwrap()
706    }
707
708    pub async fn update_topic_affinity(&self, filter: AffinityFilter) {
709        self.messages_tx
710            .send(ToBackgroundChain::UpdateTopicAffinity { filter })
711            .await
712            .unwrap();
713    }
714
715    /// Marks the given peers as belonging to the given chain, and adds some addresses to these
716    /// peers to the address book.
717    ///
718    /// The `important_nodes` parameter indicates whether these nodes are considered note-worthy
719    /// and should have additional logging.
720    pub async fn discover(
721        &self,
722        list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
723        important_nodes: bool,
724    ) {
725        self.messages_tx
726            .send(ToBackgroundChain::Discover {
727                // TODO: overhead
728                list: list
729                    .into_iter()
730                    .map(|(peer_id, addrs)| {
731                        (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
732                    })
733                    .collect::<Vec<_>>()
734                    .into_iter(),
735                important_nodes,
736            })
737            .await
738            .unwrap();
739    }
740
741    /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
742    /// the network.
743    ///
744    /// Nodes that are discovered might disappear over time. In other words, there is no guarantee
745    /// that a node that has been added through [`NetworkServiceChain::discover`] will later be
746    /// returned by [`NetworkServiceChain::discovered_nodes`].
747    pub async fn discovered_nodes(
748        &self,
749    ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
750        let (tx, rx) = oneshot::channel();
751
752        self.messages_tx
753            .send(ToBackgroundChain::DiscoveredNodes { result: tx })
754            .await
755            .unwrap();
756
757        rx.await
758            .unwrap()
759            .into_iter()
760            .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
761    }
762
763    /// Returns an iterator to the list of [`PeerId`]s that we have an established connection
764    /// with.
765    pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
766        let (tx, rx) = oneshot::channel();
767        self.messages_tx
768            .send(ToBackgroundChain::PeersList { result: tx })
769            .await
770            .unwrap();
771        rx.await.unwrap().into_iter()
772    }
773}
774
775#[derive(Debug, Clone)]
776pub struct BroadcastStatementResult {
777    pub sent: usize,
778    pub total: usize,
779}
780
781/// Event that can happen on the network service.
782#[derive(Debug, Clone)]
783pub enum Event {
784    Connected {
785        peer_id: PeerId,
786        role: Role,
787        best_block_number: u64,
788        best_block_hash: [u8; 32],
789    },
790    Disconnected {
791        peer_id: PeerId,
792    },
793    BlockAnnounce {
794        peer_id: PeerId,
795        announce: service::EncodedBlockAnnounce,
796    },
797    GrandpaNeighborPacket {
798        peer_id: PeerId,
799        finalized_block_height: u64,
800    },
801    /// Received a GrandPa commit message from the network.
802    GrandpaCommitMessage {
803        peer_id: PeerId,
804        message: service::EncodedGrandpaCommitMessage,
805    },
806    /// Received a statement notification from the network.
807    StatementsNotification {
808        peer_id: PeerId,
809        statements: Vec<([u8; 32], codec::Statement)>,
810    },
811}
812
813/// Bitswap event that can be generated by the network service. Because Bitswap messages are big
814/// (up to 2 MiB) and can be delivered at high rate, we use a dedicated subscriber to not copy them
815/// to all network service subscribers.
816#[derive(Debug, Clone)]
817pub enum BitswapEvent {
818    BitswapMessage {
819        peer_id: PeerId,
820        message: service::EncodedBitswapMessage,
821    },
822}
823
824/// Error returned by [`NetworkServiceChain::blocks_request`].
825#[derive(Debug, derive_more::Display, derive_more::Error)]
826pub enum BlocksRequestError {
827    /// No established connection with the target.
828    NoConnection,
829    /// Error during the request.
830    #[display("{_0}")]
831    Request(service::BlocksRequestError),
832}
833
834/// Error returned by [`NetworkServiceChain::grandpa_warp_sync_request`].
835#[derive(Debug, derive_more::Display, derive_more::Error)]
836pub enum WarpSyncRequestError {
837    /// No established connection with the target.
838    NoConnection,
839    /// Error during the request.
840    #[display("{_0}")]
841    Request(service::GrandpaWarpSyncRequestError),
842}
843
844/// Error returned by [`NetworkServiceChain::storage_proof_request`].
845#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
846pub enum StorageProofRequestError {
847    /// No established connection with the target.
848    NoConnection,
849    /// Storage proof request is too large and can't be sent.
850    RequestTooLarge,
851    /// Error during the request.
852    #[display("{_0}")]
853    Request(service::StorageProofRequestError),
854}
855
856/// Error returned by [`NetworkServiceChain::call_proof_request`].
857#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
858pub enum CallProofRequestError {
859    /// No established connection with the target.
860    NoConnection,
861    /// Call proof request is too large and can't be sent.
862    RequestTooLarge,
863    /// Error during the request.
864    #[display("{_0}")]
865    Request(service::CallProofRequestError),
866}
867
868impl CallProofRequestError {
869    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
870    /// issue.
871    pub fn is_network_problem(&self) -> bool {
872        match self {
873            CallProofRequestError::Request(err) => err.is_network_problem(),
874            CallProofRequestError::RequestTooLarge => false,
875            CallProofRequestError::NoConnection => true,
876        }
877    }
878}
879
880/// Error returned by [`NetworkServiceChain::child_storage_proof_request`].
881#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
882pub enum ChildStorageProofRequestError {
883    /// No established connection with the target.
884    NoConnection,
885    /// Child storage proof request is too large and can't be sent.
886    RequestTooLarge,
887    /// Error during the request.
888    #[display("{_0}")]
889    Request(service::StorageProofRequestError),
890}
891
892impl ChildStorageProofRequestError {
893    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
894    /// issue.
895    pub fn is_network_problem(&self) -> bool {
896        match self {
897            ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
898            ChildStorageProofRequestError::RequestTooLarge => false,
899            ChildStorageProofRequestError::NoConnection => true,
900        }
901    }
902}
903
904/// Owned version of [`codec::ChildStorageProofRequestConfig`] for sending across channel.
905struct ChildStorageProofRequestConfigOwned {
906    block_hash: [u8; 32],
907    child_trie: Vec<u8>,
908    keys: Vec<Vec<u8>>,
909}
910
911enum ToBackground<TPlat: PlatformRef> {
912    AddChain {
913        messages_rx: async_channel::Receiver<ToBackgroundChain>,
914        config: service::ChainConfig<Chain<TPlat>>,
915    },
916}
917
918enum ToBackgroundChain {
919    RemoveChain,
920    Subscribe {
921        sender: async_channel::Sender<Event>,
922    },
923    SubscribeBitswap {
924        sender: async_channel::Sender<BitswapEvent>,
925    },
926    DisconnectAndBan {
927        peer_id: PeerId,
928        severity: BanSeverity,
929        reason: &'static str,
930    },
931    // TODO: serialize the request before sending over channel
932    StartBlocksRequest {
933        target: PeerId, // TODO: takes by value because of future longevity issue
934        config: codec::BlocksRequestConfig,
935        timeout: Duration,
936        result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
937    },
938    // TODO: serialize the request before sending over channel
939    StartWarpSyncRequest {
940        target: PeerId,
941        begin_hash: [u8; 32],
942        timeout: Duration,
943        result:
944            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
945    },
946    // TODO: serialize the request before sending over channel
947    StartStorageProofRequest {
948        target: PeerId,
949        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
950        timeout: Duration,
951        result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
952    },
953    // TODO: serialize the request before sending over channel
954    StartCallProofRequest {
955        target: PeerId, // TODO: takes by value because of futures longevity issue
956        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
957        timeout: Duration,
958        result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
959    },
960    // TODO: serialize the request before sending over channel
961    StartChildStorageProofRequest {
962        target: PeerId,
963        config: ChildStorageProofRequestConfigOwned,
964        timeout: Duration,
965        result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
966    },
967    SetLocalBestBlock {
968        best_hash: [u8; 32],
969        best_number: u64,
970    },
971    SetLocalGrandpaState {
972        grandpa_state: service::GrandpaState,
973    },
974    AnnounceTransaction {
975        transaction: Vec<u8>,
976        result: oneshot::Sender<Vec<PeerId>>,
977    },
978    SendBlockAnnounce {
979        target: PeerId,
980        scale_encoded_header: Vec<u8>,
981        is_best: bool,
982        result: oneshot::Sender<Result<(), QueueNotificationError>>,
983    },
984    SendBitswapMessage {
985        target: PeerId,
986        message: Vec<u8>,
987        result: oneshot::Sender<Result<(), SendBitswapMessageError>>,
988    },
989    BroadcastBitswapMessage {
990        message: Vec<u8>,
991        result: oneshot::Sender<Result<Vec<PeerId>, SendBitswapMessageError>>,
992    },
993    BroadcastStatement {
994        statement: Vec<u8>,
995        result: oneshot::Sender<BroadcastStatementResult>,
996    },
997    UpdateTopicAffinity {
998        filter: AffinityFilter,
999    },
1000    Discover {
1001        list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
1002        important_nodes: bool,
1003    },
1004    DiscoveredNodes {
1005        result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
1006    },
1007    PeersList {
1008        result: oneshot::Sender<Vec<PeerId>>,
1009    },
1010}
1011
1012struct BackgroundTask<TPlat: PlatformRef> {
1013    /// See [`Config::platform`].
1014    platform: TPlat,
1015
1016    /// Random number generator.
1017    randomness: rand_chacha::ChaCha20Rng,
1018
1019    /// Value provided through [`Config::identify_agent_version`].
1020    identify_agent_version: String,
1021
1022    /// Channel to send messages to the background task.
1023    tasks_messages_tx:
1024        async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
1025
1026    /// Channel to receive messages destined to the background task.
1027    tasks_messages_rx: Pin<
1028        Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
1029    >,
1030
1031    /// Data structure holding the entire state of the networking.
1032    network: service::ChainNetwork<
1033        Chain<TPlat>,
1034        async_channel::Sender<service::CoordinatorToConnection>,
1035        TPlat::Instant,
1036    >,
1037
1038    /// All known peers and their addresses.
1039    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
1040
1041    /// Bitswap slot assignment strategy.
1042    bitswap_peering_strategy: bitswap_peering_strategy::BitswapPeeringStrategy<TPlat::Instant>,
1043
1044    /// See [`Config::connections_open_pool_size`].
1045    connections_open_pool_size: u32,
1046
1047    /// See [`Config::connections_open_pool_restore_delay`].
1048    connections_open_pool_restore_delay: Duration,
1049
1050    /// Every time a connection is opened, the value in this field is increased by one. After
1051    /// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
1052    /// one.
1053    num_recent_connection_opening: u32,
1054
1055    /// Delay after which [`BackgroundTask::num_recent_connection_opening`] is increased by one.
1056    next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
1057
1058    /// List of all open gossip links.
1059    // TODO: using this data structure unfortunately means that PeerIds are cloned a lot, maybe some user data in ChainNetwork is better? not sure
1060    open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
1061
1062    /// Connected peers using statement protocol V2, per chain.
1063    v2_statement_peers: HashMap<ChainId, HashSet<PeerId, fnv::FnvBuildHasher>, fnv::FnvBuildHasher>,
1064
1065    /// Current topic affinity filter per chain, sent to V2 peers on connect.
1066    current_affinity_filter: HashMap<ChainId, AffinityFilter, fnv::FnvBuildHasher>,
1067
1068    /// List of nodes that are considered as important for logging purposes.
1069    // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
1070    important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
1071
1072    /// Event about to be sent on the senders of [`BackgroundTask::event_senders`].
1073    event_pending_send: Option<(ChainId, Event)>,
1074
1075    /// Bitswap event about to be sent on the senders of [`BackgroundTask::bitswap_event_senders`].
1076    bitswap_event_pending_send: Option<BitswapEvent>,
1077
1078    /// Sending events through the public API.
1079    ///
1080    /// Contains either senders, or a `Future` that is currently sending an event and will yield
1081    /// the senders back once it is finished.
1082    // TODO: sort by ChainId instead of using a Vec?
1083    event_senders: either::Either<
1084        Vec<(ChainId, async_channel::Sender<Event>)>,
1085        Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
1086    >,
1087
1088    /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list.
1089    /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders.
1090    pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
1091
1092    /// Sending Bitswap events through the public API. We use separate channels for Bitswap events,
1093    /// because Bitswap messages are big and only few of event subscribers are interested in them.
1094    ///
1095    /// Contains either senders, or a `Future` that is currently sending an event and will yield
1096    /// the senders back once it is finished.
1097    ///
1098    /// Note that compared to `event_senders`, `bitswap_event_senders` are not associated with
1099    /// chains, because Bitswap messages coming from the network do not have the information about
1100    /// what chain they are coming from.
1101    bitswap_event_senders: either::Either<
1102        Vec<async_channel::Sender<BitswapEvent>>,
1103        Pin<Box<dyn Future<Output = Vec<async_channel::Sender<BitswapEvent>>> + Send>>,
1104    >,
1105
1106    /// Whenever [`NetworkServiceChain::subscribe_bitswap`] is called, the new sender is added to
1107    /// this list. Once [`BackgroundTask::bitswap_event_senders`] is ready, we properly initialize
1108    /// these senders.
1109    pending_new_bitswap_subscriptions: Vec<async_channel::Sender<BitswapEvent>>,
1110
1111    main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
1112
1113    messages_rx:
1114        stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
1115
1116    blocks_requests: HashMap<
1117        service::SubstreamId,
1118        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
1119        fnv::FnvBuildHasher,
1120    >,
1121
1122    grandpa_warp_sync_requests: HashMap<
1123        service::SubstreamId,
1124        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
1125        fnv::FnvBuildHasher,
1126    >,
1127
1128    storage_proof_requests: HashMap<
1129        service::SubstreamId,
1130        oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
1131        fnv::FnvBuildHasher,
1132    >,
1133
1134    call_proof_requests: HashMap<
1135        service::SubstreamId,
1136        oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
1137        fnv::FnvBuildHasher,
1138    >,
1139
1140    child_storage_proof_requests: HashMap<
1141        service::SubstreamId,
1142        oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
1143        fnv::FnvBuildHasher,
1144    >,
1145
1146    /// All chains, indexed by the value of [`Chain::next_discovery_when`].
1147    chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
1148}
1149
1150struct Chain<TPlat: PlatformRef> {
1151    log_name: String,
1152
1153    // TODO: this field is a hack due to the fact that `add_chain` can't be `async`; should eventually be fixed after a lib.rs refactor
1154    num_references: NonZero<usize>,
1155
1156    /// See [`ConfigChain::block_number_bytes`].
1157    // TODO: redundant with ChainNetwork? since we might not need to know this in the future i'm reluctant to add a getter to ChainNetwork
1158    block_number_bytes: usize,
1159
1160    /// See [`ConfigChain::num_out_slots`].
1161    num_out_slots: usize,
1162
1163    /// When the next discovery should be started for this chain.
1164    next_discovery_when: TPlat::Instant,
1165
1166    /// After [`Chain::next_discovery_when`] is reached, the following discovery happens after
1167    /// the given duration.
1168    next_discovery_period: Duration,
1169}
1170
1171#[derive(Clone)]
1172struct OpenGossipLinkState {
1173    role: Role,
1174    best_block_number: u64,
1175    best_block_hash: [u8; 32],
1176    /// `None` if unknown.
1177    finalized_block_height: Option<u64>,
1178}
1179
1180async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
1181    loop {
1182        // Yield at every loop in order to provide better tasks granularity.
1183        futures_lite::future::yield_now().await;
1184
1185        enum WakeUpReason<TPlat: PlatformRef> {
1186            ForegroundClosed,
1187            Message(ToBackground<TPlat>),
1188            MessageForChain(ChainId, ToBackgroundChain),
1189            NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
1190            CanAssignSlot(PeerId, ChainId),
1191            CanAssignBitswapSlot(PeerId),
1192            NextRecentConnectionRestore,
1193            CanStartConnect(PeerId),
1194            CanOpenGossip(PeerId, ChainId),
1195            CanOpenBitswap(PeerId),
1196            MessageFromConnection {
1197                connection_id: service::ConnectionId,
1198                message: service::ConnectionToCoordinator,
1199            },
1200            MessageToConnection {
1201                connection_id: service::ConnectionId,
1202                message: service::CoordinatorToConnection,
1203            },
1204            EventSendersReady,
1205            BitswapEventSendersReady,
1206            StartDiscovery(ChainId),
1207        }
1208
1209        let wake_up_reason = {
1210            let message_received = async {
1211                task.main_messages_rx
1212                    .next()
1213                    .await
1214                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
1215            };
1216            let message_for_chain_received = async {
1217                // Note that when the last entry of `messages_rx` yields `None`, `messages_rx`
1218                // itself will yield `None`. For this reason, we can't use
1219                // `task.messages_rx.is_empty()` to determine whether `messages_rx` will
1220                // yield `None`.
1221                let Some((chain_id, message)) = task.messages_rx.next().await else {
1222                    future::pending().await
1223                };
1224                WakeUpReason::MessageForChain(chain_id, message)
1225            };
1226            let message_from_task_received = async {
1227                let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
1228                WakeUpReason::MessageFromConnection {
1229                    connection_id,
1230                    message,
1231                }
1232            };
1233            let service_event = async {
1234                if let Some(event) = (task.event_pending_send.is_none()
1235                    && task.bitswap_event_pending_send.is_none()
1236                    && task.pending_new_subscriptions.is_empty()
1237                    && task.pending_new_bitswap_subscriptions.is_empty())
1238                .then(|| task.network.next_event())
1239                .flatten()
1240                {
1241                    WakeUpReason::NetworkEvent(event)
1242                } else if let Some(start_connect) = {
1243                    let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1244                        .then(|| {
1245                            task.network
1246                                .unconnected_desired()
1247                                .choose(&mut task.randomness)
1248                                .cloned()
1249                        })
1250                        .flatten();
1251                    x
1252                } {
1253                    WakeUpReason::CanStartConnect(start_connect)
1254                } else if let Some((peer_id, chain_id)) = {
1255                    let x = task
1256                        .network
1257                        .connected_unopened_gossip_desired()
1258                        .choose(&mut task.randomness)
1259                        .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1260                    x
1261                } {
1262                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
1263                } else if let Some(peer_id) = {
1264                    let x = task
1265                        .network
1266                        .connected_unopened_bitswap_desired()
1267                        .choose(&mut task.randomness)
1268                        .cloned();
1269                    x
1270                } {
1271                    WakeUpReason::CanOpenBitswap(peer_id)
1272                } else if let Some((connection_id, message)) =
1273                    task.network.pull_message_to_connection()
1274                {
1275                    WakeUpReason::MessageToConnection {
1276                        connection_id,
1277                        message,
1278                    }
1279                } else {
1280                    'search: loop {
1281                        let mut earlier_unban = None;
1282
1283                        for chain_id in task.network.chains().collect::<Vec<_>>() {
1284                            if task.network.gossip_desired_num(
1285                                chain_id,
1286                                service::GossipKind::ConsensusTransactions,
1287                            ) >= task.network[chain_id].num_out_slots
1288                            {
1289                                continue;
1290                            }
1291
1292                            match task
1293                                .peering_strategy
1294                                .pick_assignable_peer(&chain_id, &task.platform.now())
1295                            {
1296                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1297                                    break 'search WakeUpReason::CanAssignSlot(
1298                                        peer_id.clone(),
1299                                        chain_id,
1300                                    );
1301                                }
1302                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
1303                                    next_unban,
1304                                } => {
1305                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1306                                        earlier_unban = Some(next_unban.clone());
1307                                    }
1308                                }
1309                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
1310                            }
1311                        }
1312
1313                        match task
1314                            .bitswap_peering_strategy
1315                            .pick_assignable_peer(&task.platform.now())
1316                        {
1317                            bitswap_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1318                                break 'search WakeUpReason::CanAssignBitswapSlot(peer_id.clone());
1319                            }
1320                            bitswap_peering_strategy::AssignablePeer::AllPeersBanned {
1321                                next_unban,
1322                            } => {
1323                                if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1324                                    earlier_unban = Some(next_unban.clone());
1325                                }
1326                            }
1327                            bitswap_peering_strategy::AssignablePeer::NoPeer => {}
1328                        }
1329
1330                        if let Some(earlier_unban) = earlier_unban {
1331                            task.platform.sleep_until(earlier_unban).await;
1332                        } else {
1333                            future::pending::<()>().await;
1334                        }
1335                    }
1336                }
1337            };
1338            let next_recent_connection_restore = async {
1339                if task.num_recent_connection_opening != 0
1340                    && task.next_recent_connection_restore.is_none()
1341                {
1342                    task.next_recent_connection_restore = Some(Box::pin(
1343                        task.platform
1344                            .sleep(task.connections_open_pool_restore_delay),
1345                    ));
1346                }
1347                if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1348                    delay.await;
1349                    task.next_recent_connection_restore = None;
1350                    WakeUpReason::NextRecentConnectionRestore
1351                } else {
1352                    future::pending().await
1353                }
1354            };
1355            let finished_sending_event = async {
1356                if let either::Right(event_sending_future) = &mut task.event_senders {
1357                    let event_senders = event_sending_future.await;
1358                    task.event_senders = either::Left(event_senders);
1359                    WakeUpReason::EventSendersReady
1360                } else if task.event_pending_send.is_some()
1361                    || !task.pending_new_subscriptions.is_empty()
1362                {
1363                    WakeUpReason::EventSendersReady
1364                } else {
1365                    future::pending().await
1366                }
1367            };
1368            let finished_sending_bitswap_event = async {
1369                if let either::Right(bitswap_event_sending_future) = &mut task.bitswap_event_senders
1370                {
1371                    let bitswap_event_senders = bitswap_event_sending_future.await;
1372                    task.bitswap_event_senders = either::Left(bitswap_event_senders);
1373                    WakeUpReason::BitswapEventSendersReady
1374                } else if task.bitswap_event_pending_send.is_some()
1375                    || !task.pending_new_bitswap_subscriptions.is_empty()
1376                {
1377                    WakeUpReason::BitswapEventSendersReady
1378                } else {
1379                    future::pending().await
1380                }
1381            };
1382            let start_discovery = async {
1383                let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1384                    future::pending().await
1385                };
1386                next_discovery.get_mut().await;
1387                let ((_, chain_id), _) = next_discovery.remove_entry();
1388                WakeUpReason::StartDiscovery(chain_id)
1389            };
1390
1391            message_for_chain_received
1392                .or(message_received)
1393                .or(message_from_task_received)
1394                .or(service_event)
1395                .or(next_recent_connection_restore)
1396                .or(finished_sending_event)
1397                .or(finished_sending_bitswap_event)
1398                .or(start_discovery)
1399                .await
1400        };
1401
1402        match wake_up_reason {
1403            WakeUpReason::ForegroundClosed => {
1404                // End the task.
1405                return;
1406            }
1407            WakeUpReason::Message(ToBackground::AddChain {
1408                messages_rx,
1409                config,
1410            }) => {
1411                // TODO: this is not a completely clean way of handling duplicate chains, because the existing chain might have a different best block and role and all ; also, multiple sync services will call set_best_block and set_finalized_block
1412                let chain_id = match task.network.add_chain(config) {
1413                    Ok(id) => id,
1414                    Err(service::AddChainError::Duplicate { existing_identical }) => {
1415                        task.network[existing_identical].num_references = task.network
1416                            [existing_identical]
1417                            .num_references
1418                            .checked_add(1)
1419                            .unwrap();
1420                        existing_identical
1421                    }
1422                };
1423
1424                task.chains_by_next_discovery.insert(
1425                    (task.network[chain_id].next_discovery_when.clone(), chain_id),
1426                    Box::pin(
1427                        task.platform
1428                            .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1429                    ),
1430                );
1431
1432                task.messages_rx
1433                    .push(Box::pin(
1434                        messages_rx
1435                            .map(move |msg| (chain_id, msg))
1436                            .chain(stream::once(future::ready((
1437                                chain_id,
1438                                ToBackgroundChain::RemoveChain,
1439                            )))),
1440                    ) as Pin<Box<_>>);
1441
1442                log!(
1443                    &task.platform,
1444                    Debug,
1445                    "network",
1446                    "chain-added",
1447                    id = task.network[chain_id].log_name
1448                );
1449            }
1450            WakeUpReason::EventSendersReady => {
1451                // Dispatch the pending event, if any, to the various senders.
1452
1453                // We made sure that the senders were ready before generating an event.
1454                let either::Left(event_senders) = &mut task.event_senders else {
1455                    unreachable!()
1456                };
1457
1458                if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1459                    task.event_pending_send.take()
1460                {
1461                    let mut event_senders = mem::take(event_senders);
1462                    task.event_senders = either::Right(Box::pin(async move {
1463                        // Elements in `event_senders` are removed one by one and inserted
1464                        // back if the channel is still open.
1465                        for index in (0..event_senders.len()).rev() {
1466                            let (event_sender_chain_id, event_sender) =
1467                                event_senders.swap_remove(index);
1468                            if event_sender_chain_id == event_to_dispatch_chain_id {
1469                                if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1470                                    continue;
1471                                }
1472                            }
1473                            event_senders.push((event_sender_chain_id, event_sender));
1474                        }
1475                        event_senders
1476                    }));
1477                } else if !task.pending_new_subscriptions.is_empty() {
1478                    let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1479                    let mut event_senders = mem::take(event_senders);
1480                    // TODO: cloning :-/
1481                    let open_gossip_links = task.open_gossip_links.clone();
1482                    task.event_senders = either::Right(Box::pin(async move {
1483                        for (chain_id, new_subscription) in pending_new_subscriptions {
1484                            for ((link_chain_id, peer_id), state) in &open_gossip_links {
1485                                // TODO: optimize? this is O(n) by chain
1486                                if *link_chain_id != chain_id {
1487                                    continue;
1488                                }
1489
1490                                let _ = new_subscription
1491                                    .send(Event::Connected {
1492                                        peer_id: peer_id.clone(),
1493                                        role: state.role,
1494                                        best_block_number: state.best_block_number,
1495                                        best_block_hash: state.best_block_hash,
1496                                    })
1497                                    .await;
1498
1499                                if let Some(finalized_block_height) = state.finalized_block_height {
1500                                    let _ = new_subscription
1501                                        .send(Event::GrandpaNeighborPacket {
1502                                            peer_id: peer_id.clone(),
1503                                            finalized_block_height,
1504                                        })
1505                                        .await;
1506                                }
1507                            }
1508
1509                            event_senders.push((chain_id, new_subscription));
1510                        }
1511
1512                        event_senders
1513                    }));
1514                }
1515            }
1516            WakeUpReason::BitswapEventSendersReady => {
1517                // We made sure that the senders were ready before generating an event.
1518                let either::Left(bitswap_event_senders) = &mut task.bitswap_event_senders else {
1519                    unreachable!()
1520                };
1521
1522                if let Some(event_to_dispatch) = task.bitswap_event_pending_send.take() {
1523                    let mut bitswap_event_senders = mem::take(bitswap_event_senders);
1524                    task.bitswap_event_senders = either::Right(Box::pin(async move {
1525                        // Elements in `bitswap_event_senders` are removed one by one and
1526                        // inserted back if the channel is still open.
1527                        for index in (0..bitswap_event_senders.len()).rev() {
1528                            let event_sender = bitswap_event_senders.swap_remove(index);
1529                            if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1530                                continue;
1531                            }
1532                            bitswap_event_senders.push(event_sender);
1533                        }
1534                        bitswap_event_senders
1535                    }));
1536                } else if !task.pending_new_bitswap_subscriptions.is_empty() {
1537                    bitswap_event_senders.append(&mut task.pending_new_bitswap_subscriptions);
1538                }
1539            }
1540            WakeUpReason::MessageFromConnection {
1541                connection_id,
1542                message,
1543            } => {
1544                task.network
1545                    .inject_connection_message(connection_id, message);
1546            }
1547            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1548                if let Some(new_ref) =
1549                    NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1550                {
1551                    task.network[chain_id].num_references = new_ref;
1552                    continue;
1553                }
1554
1555                for peer_id in task
1556                    .network
1557                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1558                    .cloned()
1559                    .collect::<Vec<_>>()
1560                {
1561                    task.network
1562                        .gossip_close(
1563                            chain_id,
1564                            &peer_id,
1565                            service::GossipKind::ConsensusTransactions,
1566                        )
1567                        .unwrap();
1568
1569                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1570                    debug_assert!(_was_in.is_some());
1571                }
1572
1573                let _was_in = task
1574                    .chains_by_next_discovery
1575                    .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1576                debug_assert!(_was_in.is_some());
1577
1578                log!(
1579                    &task.platform,
1580                    Debug,
1581                    "network",
1582                    "chain-removed",
1583                    id = task.network[chain_id].log_name
1584                );
1585                task.v2_statement_peers.remove(&chain_id);
1586                task.current_affinity_filter.remove(&chain_id);
1587                task.network.remove_chain(chain_id).unwrap();
1588                task.peering_strategy.remove_chain_peers(&chain_id);
1589            }
1590            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1591                task.pending_new_subscriptions.push((chain_id, sender));
1592            }
1593            WakeUpReason::MessageForChain(
1594                _chain_id,
1595                ToBackgroundChain::SubscribeBitswap { sender },
1596            ) => {
1597                task.pending_new_bitswap_subscriptions.push(sender);
1598            }
1599            WakeUpReason::MessageForChain(
1600                chain_id,
1601                ToBackgroundChain::DisconnectAndBan {
1602                    peer_id,
1603                    severity,
1604                    reason,
1605                },
1606            ) => {
1607                let ban_duration = Duration::from_secs(match severity {
1608                    BanSeverity::Low => 10,
1609                    BanSeverity::High => 40,
1610                });
1611
1612                let had_slot = matches!(
1613                    task.peering_strategy.unassign_slot_and_ban(
1614                        &chain_id,
1615                        &peer_id,
1616                        task.platform.now() + ban_duration,
1617                    ),
1618                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1619                );
1620
1621                if had_slot {
1622                    log!(
1623                        &task.platform,
1624                        Debug,
1625                        "network",
1626                        "slot-unassigned",
1627                        chain = &task.network[chain_id].log_name,
1628                        peer_id,
1629                        ?ban_duration,
1630                        reason = "user-ban",
1631                        user_reason = reason
1632                    );
1633                    task.network.gossip_remove_desired(
1634                        chain_id,
1635                        &peer_id,
1636                        service::GossipKind::ConsensusTransactions,
1637                    );
1638                }
1639
1640                if task.network.gossip_is_connected(
1641                    chain_id,
1642                    &peer_id,
1643                    service::GossipKind::ConsensusTransactions,
1644                ) {
1645                    let _closed_result = task.network.gossip_close(
1646                        chain_id,
1647                        &peer_id,
1648                        service::GossipKind::ConsensusTransactions,
1649                    );
1650                    debug_assert!(_closed_result.is_ok());
1651
1652                    log!(
1653                        &task.platform,
1654                        Debug,
1655                        "network",
1656                        "gossip-closed",
1657                        chain = &task.network[chain_id].log_name,
1658                        peer_id,
1659                    );
1660
1661                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1662                    debug_assert!(_was_in.is_some());
1663
1664                    if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
1665                        peers.remove(&peer_id);
1666                    }
1667
1668                    debug_assert!(task.event_pending_send.is_none());
1669                    task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1670                }
1671            }
1672            WakeUpReason::MessageForChain(
1673                chain_id,
1674                ToBackgroundChain::StartBlocksRequest {
1675                    target,
1676                    config,
1677                    timeout,
1678                    result,
1679                },
1680            ) => {
1681                match &config.start {
1682                    codec::BlocksRequestConfigStart::Hash(hash) => {
1683                        log!(
1684                            &task.platform,
1685                            Debug,
1686                            "network",
1687                            "blocks-request-started",
1688                            chain = task.network[chain_id].log_name, target,
1689                            start = HashDisplay(hash),
1690                            num = config.desired_count.get(),
1691                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1692                            header = ?config.fields.header, body = ?config.fields.body,
1693                            justifications = ?config.fields.justifications
1694                        );
1695                    }
1696                    codec::BlocksRequestConfigStart::Number(number) => {
1697                        log!(
1698                            &task.platform,
1699                            Debug,
1700                            "network",
1701                            "blocks-request-started",
1702                            chain = task.network[chain_id].log_name, target, start = number,
1703                            num = config.desired_count.get(),
1704                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1705                            header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1706                        );
1707                    }
1708                }
1709
1710                match task
1711                    .network
1712                    .start_blocks_request(&target, chain_id, config.clone(), timeout)
1713                {
1714                    Ok(substream_id) => {
1715                        task.blocks_requests.insert(substream_id, result);
1716                    }
1717                    Err(service::StartRequestError::NoConnection) => {
1718                        log!(
1719                            &task.platform,
1720                            Debug,
1721                            "network",
1722                            "blocks-request-error",
1723                            chain = task.network[chain_id].log_name,
1724                            target,
1725                            error = "NoConnection"
1726                        );
1727                        let _ = result.send(Err(BlocksRequestError::NoConnection));
1728                    }
1729                }
1730            }
1731            WakeUpReason::MessageForChain(
1732                chain_id,
1733                ToBackgroundChain::StartWarpSyncRequest {
1734                    target,
1735                    begin_hash,
1736                    timeout,
1737                    result,
1738                },
1739            ) => {
1740                log!(
1741                    &task.platform,
1742                    Debug,
1743                    "network",
1744                    "warp-sync-request-started",
1745                    chain = task.network[chain_id].log_name,
1746                    target,
1747                    start = HashDisplay(&begin_hash)
1748                );
1749
1750                match task
1751                    .network
1752                    .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1753                {
1754                    Ok(substream_id) => {
1755                        task.grandpa_warp_sync_requests.insert(substream_id, result);
1756                    }
1757                    Err(service::StartRequestError::NoConnection) => {
1758                        log!(
1759                            &task.platform,
1760                            Debug,
1761                            "network",
1762                            "warp-sync-request-error",
1763                            chain = task.network[chain_id].log_name,
1764                            target,
1765                            error = "NoConnection"
1766                        );
1767                        let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1768                    }
1769                }
1770            }
1771            WakeUpReason::MessageForChain(
1772                chain_id,
1773                ToBackgroundChain::StartStorageProofRequest {
1774                    target,
1775                    config,
1776                    timeout,
1777                    result,
1778                },
1779            ) => {
1780                log!(
1781                    &task.platform,
1782                    Debug,
1783                    "network",
1784                    "storage-proof-request-started",
1785                    chain = task.network[chain_id].log_name,
1786                    target,
1787                    block_hash = HashDisplay(&config.block_hash)
1788                );
1789
1790                match task.network.start_storage_proof_request(
1791                    &target,
1792                    chain_id,
1793                    config.clone(),
1794                    timeout,
1795                ) {
1796                    Ok(substream_id) => {
1797                        task.storage_proof_requests.insert(substream_id, result);
1798                    }
1799                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1800                        log!(
1801                            &task.platform,
1802                            Debug,
1803                            "network",
1804                            "storage-proof-request-error",
1805                            chain = task.network[chain_id].log_name,
1806                            target,
1807                            error = "NoConnection"
1808                        );
1809                        let _ = result.send(Err(StorageProofRequestError::NoConnection));
1810                    }
1811                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1812                        log!(
1813                            &task.platform,
1814                            Debug,
1815                            "network",
1816                            "storage-proof-request-error",
1817                            chain = task.network[chain_id].log_name,
1818                            target,
1819                            error = "RequestTooLarge"
1820                        );
1821                        let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1822                    }
1823                };
1824            }
1825            WakeUpReason::MessageForChain(
1826                chain_id,
1827                ToBackgroundChain::StartCallProofRequest {
1828                    target,
1829                    config,
1830                    timeout,
1831                    result,
1832                },
1833            ) => {
1834                log!(
1835                    &task.platform,
1836                    Debug,
1837                    "network",
1838                    "call-proof-request-started",
1839                    chain = task.network[chain_id].log_name,
1840                    target,
1841                    block_hash = HashDisplay(&config.block_hash),
1842                    function = config.method
1843                );
1844                // TODO: log parameter
1845
1846                match task.network.start_call_proof_request(
1847                    &target,
1848                    chain_id,
1849                    config.clone(),
1850                    timeout,
1851                ) {
1852                    Ok(substream_id) => {
1853                        task.call_proof_requests.insert(substream_id, result);
1854                    }
1855                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1856                        log!(
1857                            &task.platform,
1858                            Debug,
1859                            "network",
1860                            "call-proof-request-error",
1861                            chain = task.network[chain_id].log_name,
1862                            target,
1863                            error = "NoConnection"
1864                        );
1865                        let _ = result.send(Err(CallProofRequestError::NoConnection));
1866                    }
1867                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1868                        log!(
1869                            &task.platform,
1870                            Debug,
1871                            "network",
1872                            "call-proof-request-error",
1873                            chain = task.network[chain_id].log_name,
1874                            target,
1875                            error = "RequestTooLarge"
1876                        );
1877                        let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1878                    }
1879                };
1880            }
1881            WakeUpReason::MessageForChain(
1882                chain_id,
1883                ToBackgroundChain::StartChildStorageProofRequest {
1884                    target,
1885                    config,
1886                    timeout,
1887                    result,
1888                },
1889            ) => {
1890                log!(
1891                    &task.platform,
1892                    Debug,
1893                    "network",
1894                    "child-storage-proof-request-started",
1895                    chain = task.network[chain_id].log_name,
1896                    target,
1897                    block_hash = HashDisplay(&config.block_hash)
1898                );
1899
1900                match task.network.start_child_storage_proof_request(
1901                    &target,
1902                    chain_id,
1903                    codec::ChildStorageProofRequestConfig {
1904                        block_hash: config.block_hash,
1905                        child_trie: &config.child_trie,
1906                        keys: config.keys.iter().map(|k| k.as_slice()),
1907                    },
1908                    timeout,
1909                ) {
1910                    Ok(substream_id) => {
1911                        task.child_storage_proof_requests
1912                            .insert(substream_id, result);
1913                    }
1914                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1915                        log!(
1916                            &task.platform,
1917                            Debug,
1918                            "network",
1919                            "child-storage-proof-request-error",
1920                            chain = task.network[chain_id].log_name,
1921                            target,
1922                            error = "NoConnection"
1923                        );
1924                        let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1925                    }
1926                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1927                        log!(
1928                            &task.platform,
1929                            Debug,
1930                            "network",
1931                            "child-storage-proof-request-error",
1932                            chain = task.network[chain_id].log_name,
1933                            target,
1934                            error = "RequestTooLarge"
1935                        );
1936                        let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1937                    }
1938                };
1939            }
1940            WakeUpReason::MessageForChain(
1941                chain_id,
1942                ToBackgroundChain::SetLocalBestBlock {
1943                    best_hash,
1944                    best_number,
1945                },
1946            ) => {
1947                task.network
1948                    .set_chain_local_best_block(chain_id, best_hash, best_number);
1949            }
1950            WakeUpReason::MessageForChain(
1951                chain_id,
1952                ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1953            ) => {
1954                log!(
1955                    &task.platform,
1956                    Debug,
1957                    "network",
1958                    "local-grandpa-state-announced",
1959                    chain = task.network[chain_id].log_name,
1960                    set_id = grandpa_state.set_id,
1961                    commit_finalized_height = grandpa_state.commit_finalized_height,
1962                );
1963
1964                // TODO: log the list of peers we sent the packet to
1965
1966                task.network
1967                    .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1968            }
1969            WakeUpReason::MessageForChain(
1970                chain_id,
1971                ToBackgroundChain::AnnounceTransaction {
1972                    transaction,
1973                    result,
1974                },
1975            ) => {
1976                // TODO: keep track of which peer knows about which transaction, and don't send it again
1977
1978                let peers_to_send = task
1979                    .network
1980                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1981                    .cloned()
1982                    .collect::<Vec<_>>();
1983
1984                let mut peers_sent = Vec::with_capacity(peers_to_send.len());
1985                let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
1986                for peer in &peers_to_send {
1987                    match task
1988                        .network
1989                        .gossip_send_transaction(peer, chain_id, &transaction)
1990                    {
1991                        Ok(()) => peers_sent.push(peer.to_base58()),
1992                        Err(QueueNotificationError::QueueFull) => {
1993                            peers_queue_full.push(peer.to_base58())
1994                        }
1995                        Err(QueueNotificationError::NoConnection) => unreachable!(),
1996                    }
1997                }
1998
1999                log!(
2000                    &task.platform,
2001                    Debug,
2002                    "network",
2003                    "transaction-announced",
2004                    chain = task.network[chain_id].log_name,
2005                    transaction =
2006                        hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
2007                    size = transaction.len(),
2008                    peers_sent = peers_sent.join(", "),
2009                    peers_queue_full = peers_queue_full.join(", "),
2010                );
2011
2012                let _ = result.send(peers_to_send);
2013            }
2014            WakeUpReason::MessageForChain(
2015                chain_id,
2016                ToBackgroundChain::SendBlockAnnounce {
2017                    target,
2018                    scale_encoded_header,
2019                    is_best,
2020                    result,
2021                },
2022            ) => {
2023                // TODO: log who the announce was sent to
2024                let _ = result.send(task.network.gossip_send_block_announce(
2025                    &target,
2026                    chain_id,
2027                    &scale_encoded_header,
2028                    is_best,
2029                ));
2030            }
2031            WakeUpReason::MessageForChain(
2032                _chain_id,
2033                ToBackgroundChain::SendBitswapMessage {
2034                    target,
2035                    message,
2036                    result,
2037                },
2038            ) => {
2039                let _ = result.send(task.network.bitswap_send_message(&target, message));
2040            }
2041            WakeUpReason::MessageForChain(
2042                _chain_id,
2043                ToBackgroundChain::BroadcastBitswapMessage { message, result },
2044            ) => {
2045                let peers = task
2046                    .network
2047                    .established_bitswap_desired()
2048                    .cloned()
2049                    .collect::<Vec<_>>();
2050                let results = peers
2051                    .iter()
2052                    .map(|peer| {
2053                        (
2054                            peer,
2055                            task.network.bitswap_send_message(peer, message.clone()),
2056                        )
2057                    })
2058                    .collect::<Vec<_>>(); // we must collect first to send all messages
2059
2060                let succeeded_peers = results
2061                    .iter()
2062                    .filter_map(|(peer, r)| r.is_ok().then(|| (*peer).clone()))
2063                    .collect::<Vec<_>>();
2064
2065                // TODO: introspecting a third-party error type below doesn't seem good.
2066                let r = if !succeeded_peers.is_empty() {
2067                    Ok(succeeded_peers)
2068                } else if results
2069                    .iter()
2070                    .any(|(_peer, r)| matches!(r, Err(SendBitswapMessageError::QueueFull)))
2071                {
2072                    // `QueueFull` has higher priority than `NoConnection` for possible
2073                    // back-pressure in higher level code.
2074                    Err(SendBitswapMessageError::QueueFull)
2075                } else {
2076                    // This is only emitted if all peers fail with `NoConnection` or there is no
2077                    // peers at all.
2078                    Err(SendBitswapMessageError::NoConnection)
2079                };
2080
2081                let _ = result.send(r);
2082            }
2083            WakeUpReason::MessageForChain(
2084                chain_id,
2085                ToBackgroundChain::BroadcastStatement { statement, result },
2086            ) => {
2087                let peers_to_send = task
2088                    .network
2089                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2090                    .cloned()
2091                    .collect::<Vec<_>>();
2092
2093                let total = peers_to_send.len();
2094                let mut sent = 0;
2095                for peer in &peers_to_send {
2096                    if task
2097                        .network
2098                        .gossip_send_statement(peer, chain_id, statement.clone())
2099                        .is_ok()
2100                    {
2101                        sent += 1;
2102                    }
2103                }
2104
2105                log!(
2106                    &task.platform,
2107                    Debug,
2108                    "network",
2109                    "statement-broadcast",
2110                    chain = task.network[chain_id].log_name,
2111                    sent,
2112                    total,
2113                );
2114
2115                let _ = result.send(BroadcastStatementResult { sent, total });
2116            }
2117            WakeUpReason::MessageForChain(
2118                chain_id,
2119                ToBackgroundChain::UpdateTopicAffinity { filter },
2120            ) => {
2121                task.current_affinity_filter
2122                    .insert(chain_id, filter.clone());
2123                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2124                    let mut to_remove = Vec::new();
2125                    for peer_id in peers.iter() {
2126                        if let Err(
2127                            SendTopicAffinityError::NoConnection
2128                            | SendTopicAffinityError::ProtocolV1,
2129                        ) = task.network.send_topic_affinity(peer_id, chain_id, &filter)
2130                        {
2131                            to_remove.push(peer_id.clone());
2132                        }
2133                    }
2134                    for peer_id in &to_remove {
2135                        peers.remove(peer_id);
2136                    }
2137                }
2138            }
2139            WakeUpReason::MessageForChain(
2140                chain_id,
2141                ToBackgroundChain::Discover {
2142                    list,
2143                    important_nodes,
2144                },
2145            ) => {
2146                for (peer_id, addrs) in list {
2147                    if important_nodes {
2148                        task.important_nodes.insert(peer_id.clone());
2149                    }
2150
2151                    // Note that we must call this function before `insert_address`, as documented
2152                    // in `basic_peering_strategy`.
2153                    task.peering_strategy
2154                        .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
2155
2156                    for addr in addrs {
2157                        let _ =
2158                            task.peering_strategy
2159                                .insert_address(&peer_id, addr.into_bytes(), 10);
2160                        // TODO: constant
2161                    }
2162                }
2163            }
2164            WakeUpReason::MessageForChain(
2165                chain_id,
2166                ToBackgroundChain::DiscoveredNodes { result },
2167            ) => {
2168                // TODO: consider returning Vec<u8>s for the addresses?
2169                let _ = result.send(
2170                    task.peering_strategy
2171                        .chain_peers_unordered(&chain_id)
2172                        .map(|peer_id| {
2173                            let addrs = task
2174                                .peering_strategy
2175                                .peer_addresses(peer_id)
2176                                .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
2177                                .collect::<Vec<_>>();
2178                            (peer_id.clone(), addrs)
2179                        })
2180                        .collect::<Vec<_>>(),
2181                );
2182            }
2183            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
2184                let _ = result.send(
2185                    task.network
2186                        .gossip_connected_peers(
2187                            chain_id,
2188                            service::GossipKind::ConsensusTransactions,
2189                        )
2190                        .cloned()
2191                        .collect(),
2192                );
2193            }
2194            WakeUpReason::StartDiscovery(chain_id) => {
2195                // Re-insert the chain in `chains_by_next_discovery`.
2196                let chain = &mut task.network[chain_id];
2197                chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
2198                chain.next_discovery_period =
2199                    cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
2200                task.chains_by_next_discovery.insert(
2201                    (chain.next_discovery_when.clone(), chain_id),
2202                    Box::pin(
2203                        task.platform
2204                            .sleep(task.network[chain_id].next_discovery_period),
2205                    ),
2206                );
2207
2208                let random_peer_id = {
2209                    let mut pub_key = [0; 32];
2210                    rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
2211                    PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
2212                };
2213
2214                // TODO: select target closest to the random peer instead
2215                let target = task
2216                    .network
2217                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
2218                    .next()
2219                    .cloned();
2220
2221                if let Some(target) = target {
2222                    match task.network.start_kademlia_find_node_request(
2223                        &target,
2224                        chain_id,
2225                        &random_peer_id,
2226                        Duration::from_secs(20),
2227                    ) {
2228                        Ok(_) => {}
2229                        Err(service::StartRequestError::NoConnection) => unreachable!(),
2230                    };
2231
2232                    log!(
2233                        &task.platform,
2234                        Debug,
2235                        "network",
2236                        "discovery-find-node-started",
2237                        chain = &task.network[chain_id].log_name,
2238                        request_target = target,
2239                        requested_peer_id = random_peer_id
2240                    );
2241                } else {
2242                    log!(
2243                        &task.platform,
2244                        Debug,
2245                        "network",
2246                        "discovery-skipped-no-peer",
2247                        chain = &task.network[chain_id].log_name
2248                    );
2249                }
2250            }
2251            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
2252                peer_id,
2253                expected_peer_id,
2254                id,
2255            }) => {
2256                let remote_addr =
2257                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
2258                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
2259                {
2260                    log!(
2261                        &task.platform,
2262                        Debug,
2263                        "network",
2264                        "handshake-finished-peer-id-mismatch",
2265                        remote_addr,
2266                        expected_peer_id,
2267                        actual_peer_id = peer_id
2268                    );
2269
2270                    let _was_in = task
2271                        .peering_strategy
2272                        .decrease_address_connections_and_remove_if_zero(
2273                            expected_peer_id,
2274                            remote_addr.as_ref(),
2275                        );
2276                    debug_assert!(_was_in.is_ok());
2277                    let _ = task.peering_strategy.increase_address_connections(
2278                        &peer_id,
2279                        remote_addr.into_bytes().to_vec(),
2280                        10,
2281                    );
2282                } else {
2283                    log!(
2284                        &task.platform,
2285                        Debug,
2286                        "network",
2287                        "handshake-finished",
2288                        remote_addr,
2289                        peer_id
2290                    );
2291                }
2292
2293                task.bitswap_peering_strategy
2294                    .increase_peer_connections(&peer_id);
2295            }
2296            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2297                expected_peer_id: Some(_),
2298                ..
2299            })
2300            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
2301                let (address, peer_id, handshake_finished) = match wake_up_reason {
2302                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2303                        address,
2304                        expected_peer_id: Some(peer_id),
2305                        ..
2306                    }) => (address, peer_id, false),
2307                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
2308                        address,
2309                        peer_id,
2310                        ..
2311                    }) => (address, peer_id, true),
2312                    _ => unreachable!(),
2313                };
2314
2315                task.peering_strategy
2316                    .decrease_address_connections(&peer_id, &address)
2317                    .unwrap();
2318                let address = Multiaddr::from_bytes(address).unwrap();
2319                log!(
2320                    &task.platform,
2321                    Debug,
2322                    "network",
2323                    "connection-shutdown",
2324                    peer_id,
2325                    address,
2326                    ?handshake_finished
2327                );
2328
2329                // Ban the peer in order to avoid trying over and over again the same address(es).
2330                // Even if the handshake was finished, it is possible that the peer simply shuts
2331                // down connections immediately after it has been opened, hence the ban.
2332                // Due to race conditions and peerid mismatches, it is possible that there is
2333                // another existing connection or connection attempt with that same peer. However,
2334                // it is not possible to be sure that we will reach 0 connections or connection
2335                // attempts, and thus we ban the peer every time.
2336                let ban_duration = Duration::from_secs(5);
2337                task.network.gossip_remove_desired_all(
2338                    &peer_id,
2339                    service::GossipKind::ConsensusTransactions,
2340                );
2341                for (&chain_id, what_happened) in task
2342                    .peering_strategy
2343                    .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
2344                {
2345                    if matches!(
2346                        what_happened,
2347                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2348                    ) {
2349                        log!(
2350                            &task.platform,
2351                            Debug,
2352                            "network",
2353                            "slot-unassigned",
2354                            chain = &task.network[chain_id].log_name,
2355                            peer_id,
2356                            ?ban_duration,
2357                            // TODO: `reason` might be wrong, `handshake_finished` is not checked.
2358                            reason = "pre-handshake-disconnect"
2359                        );
2360                    }
2361                }
2362
2363                if handshake_finished {
2364                    task.network.bitswap_remove_desired(&peer_id);
2365                    let what_happened = task
2366                        .bitswap_peering_strategy
2367                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration);
2368                    if matches!(
2369                        what_happened,
2370                        bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true },
2371                    ) {
2372                        log!(
2373                            &task.platform,
2374                            Debug,
2375                            "network",
2376                            "bitswap-slot-unassigned",
2377                            peer_id,
2378                            ?ban_duration,
2379                            reason = "disconnect",
2380                        );
2381                    }
2382                    let _ = task
2383                        .bitswap_peering_strategy
2384                        .decrease_peer_connections(&peer_id);
2385                }
2386            }
2387            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
2388                expected_peer_id: None,
2389                ..
2390            }) => {
2391                // This path can't be reached as we always set an expected peer id when creating
2392                // a connection.
2393                debug_assert!(false);
2394            }
2395            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
2396                id,
2397                peer_id,
2398                ping_time,
2399            }) => {
2400                let remote_addr =
2401                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
2402                log!(
2403                    &task.platform,
2404                    Debug,
2405                    "network",
2406                    "pong",
2407                    peer_id,
2408                    remote_addr,
2409                    ?ping_time
2410                );
2411            }
2412            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
2413                chain_id,
2414                peer_id,
2415                announce,
2416            }) => {
2417                log!(
2418                    &task.platform,
2419                    Debug,
2420                    "network",
2421                    "block-announce-received",
2422                    chain = &task.network[chain_id].log_name,
2423                    peer_id,
2424                    block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
2425                        announce.decode().scale_encoded_header
2426                    )),
2427                    is_best = announce.decode().is_best
2428                );
2429
2430                let decoded_announce = announce.decode();
2431                if decoded_announce.is_best {
2432                    let link = task
2433                        .open_gossip_links
2434                        .get_mut(&(chain_id, peer_id.clone()))
2435                        .unwrap();
2436                    if let Ok(decoded) = header::decode(
2437                        decoded_announce.scale_encoded_header,
2438                        task.network[chain_id].block_number_bytes,
2439                    ) {
2440                        link.best_block_hash = header::hash_from_scale_encoded_header(
2441                            decoded_announce.scale_encoded_header,
2442                        );
2443                        link.best_block_number = decoded.number;
2444                    }
2445                }
2446
2447                debug_assert!(task.event_pending_send.is_none());
2448                task.event_pending_send =
2449                    Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2450            }
2451            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2452                peer_id,
2453                chain_id,
2454                role,
2455                best_number,
2456                best_hash,
2457                kind: service::GossipKind::ConsensusTransactions,
2458            }) => {
2459                log!(
2460                    &task.platform,
2461                    Debug,
2462                    "network",
2463                    "gossip-open-success",
2464                    chain = &task.network[chain_id].log_name,
2465                    peer_id,
2466                    best_number,
2467                    best_hash = HashDisplay(&best_hash)
2468                );
2469
2470                let _prev_value = task.open_gossip_links.insert(
2471                    (chain_id, peer_id.clone()),
2472                    OpenGossipLinkState {
2473                        best_block_number: best_number,
2474                        best_block_hash: best_hash,
2475                        role,
2476                        finalized_block_height: None,
2477                    },
2478                );
2479                debug_assert!(_prev_value.is_none());
2480
2481                debug_assert!(task.event_pending_send.is_none());
2482                task.event_pending_send = Some((
2483                    chain_id,
2484                    Event::Connected {
2485                        peer_id,
2486                        role,
2487                        best_block_number: best_number,
2488                        best_block_hash: best_hash,
2489                    },
2490                ));
2491            }
2492            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2493                peer_id,
2494                chain_id,
2495                error,
2496                kind: service::GossipKind::ConsensusTransactions,
2497            }) => {
2498                log!(
2499                    &task.platform,
2500                    Debug,
2501                    "network",
2502                    "gossip-open-error",
2503                    chain = &task.network[chain_id].log_name,
2504                    peer_id,
2505                    ?error,
2506                );
2507                let ban_duration = Duration::from_secs(15);
2508
2509                // Note that peer doesn't necessarily have an out slot, as this event might happen
2510                // as a result of an inbound gossip connection.
2511                let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2512                    matches!(
2513                        task.peering_strategy
2514                            .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2515                        basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2516                    )
2517                } else {
2518                    matches!(
2519                        task.peering_strategy.unassign_slot_and_ban(
2520                            &chain_id,
2521                            &peer_id,
2522                            task.platform.now() + ban_duration,
2523                        ),
2524                        basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2525                    )
2526                };
2527
2528                if had_slot {
2529                    log!(
2530                        &task.platform,
2531                        Debug,
2532                        "network",
2533                        "slot-unassigned",
2534                        chain = &task.network[chain_id].log_name,
2535                        peer_id,
2536                        ?ban_duration,
2537                        reason = "gossip-open-failed"
2538                    );
2539                    task.network.gossip_remove_desired(
2540                        chain_id,
2541                        &peer_id,
2542                        service::GossipKind::ConsensusTransactions,
2543                    );
2544                }
2545            }
2546            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2547                peer_id,
2548                chain_id,
2549                kind: service::GossipKind::ConsensusTransactions,
2550            }) => {
2551                log!(
2552                    &task.platform,
2553                    Debug,
2554                    "network",
2555                    "gossip-closed",
2556                    chain = &task.network[chain_id].log_name,
2557                    peer_id,
2558                );
2559                let ban_duration = Duration::from_secs(10);
2560
2561                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2562                debug_assert!(_was_in.is_some());
2563
2564                // Note that peer doesn't necessarily have an out slot, as this event might happen
2565                // as a result of an inbound gossip connection.
2566                if matches!(
2567                    task.peering_strategy.unassign_slot_and_ban(
2568                        &chain_id,
2569                        &peer_id,
2570                        task.platform.now() + ban_duration,
2571                    ),
2572                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2573                ) {
2574                    log!(
2575                        &task.platform,
2576                        Debug,
2577                        "network",
2578                        "slot-unassigned",
2579                        chain = &task.network[chain_id].log_name,
2580                        peer_id,
2581                        ?ban_duration,
2582                        reason = "gossip-closed"
2583                    );
2584                    task.network.gossip_remove_desired(
2585                        chain_id,
2586                        &peer_id,
2587                        service::GossipKind::ConsensusTransactions,
2588                    );
2589                }
2590
2591                if let Some(peers) = task.v2_statement_peers.get_mut(&chain_id) {
2592                    peers.remove(&peer_id);
2593                }
2594
2595                debug_assert!(task.event_pending_send.is_none());
2596                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2597            }
2598            WakeUpReason::NetworkEvent(service::Event::BitswapConnected { peer_id }) => {
2599                log!(
2600                    &task.platform,
2601                    Debug,
2602                    "network",
2603                    "bitswap-open-success",
2604                    peer_id
2605                );
2606            }
2607            WakeUpReason::NetworkEvent(service::Event::BitswapOpenFailed { peer_id, error }) => {
2608                log!(
2609                    &task.platform,
2610                    Debug,
2611                    "network",
2612                    "bitswap-open-error",
2613                    peer_id,
2614                    ?error
2615                );
2616                let ban_duration = if error.is_protocol_not_available() {
2617                    Duration::from_secs(600)
2618                } else {
2619                    Duration::from_secs(15)
2620                };
2621                if matches!(
2622                    task.bitswap_peering_strategy
2623                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2624                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2625                ) {
2626                    log!(
2627                        &task.platform,
2628                        Debug,
2629                        "network",
2630                        "bitswap-slot-unassigned",
2631                        peer_id,
2632                        ?ban_duration,
2633                        reason = "bitswap-open-failed"
2634                    );
2635                    task.network.bitswap_remove_desired(&peer_id);
2636                }
2637            }
2638            WakeUpReason::NetworkEvent(service::Event::BitswapMessage { peer_id, message }) => {
2639                log!(
2640                    &task.platform,
2641                    Debug,
2642                    "network",
2643                    "bitswap-message-received",
2644                    peer_id
2645                );
2646                debug_assert!(task.bitswap_event_pending_send.is_none());
2647                task.bitswap_event_pending_send =
2648                    Some(BitswapEvent::BitswapMessage { peer_id, message });
2649            }
2650            WakeUpReason::NetworkEvent(service::Event::BitswapDisconnected { peer_id }) => {
2651                log!(&task.platform, Debug, "network", "bitswap-closed", peer_id);
2652                let ban_duration = Duration::from_secs(10);
2653                if matches!(
2654                    task.bitswap_peering_strategy
2655                        .unassign_slot_and_ban(&peer_id, task.platform.now() + ban_duration,),
2656                    bitswap_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2657                ) {
2658                    log!(
2659                        &task.platform,
2660                        Debug,
2661                        "network",
2662                        "bitswap-slot-unassigned",
2663                        peer_id,
2664                        ?ban_duration,
2665                        reason = "bitswap-closed"
2666                    );
2667                    task.network.bitswap_remove_desired(&peer_id);
2668                }
2669            }
2670            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2671                substream_id,
2672                peer_id,
2673                chain_id,
2674                response: service::RequestResult::Blocks(response),
2675            }) => {
2676                match &response {
2677                    Ok(blocks) => {
2678                        log!(
2679                            &task.platform,
2680                            Debug,
2681                            "network",
2682                            "blocks-request-success",
2683                            chain = task.network[chain_id].log_name,
2684                            target = peer_id,
2685                            num_blocks = blocks.len(),
2686                            block_data_total_size =
2687                                BytesDisplay(blocks.iter().fold(0, |sum, block| {
2688                                    let block_size = block.header.as_ref().map_or(0, |h| h.len())
2689                                        + block
2690                                            .body
2691                                            .as_ref()
2692                                            .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2693                                        + block
2694                                            .justifications
2695                                            .as_ref()
2696                                            .into_iter()
2697                                            .flat_map(|l| l.iter())
2698                                            .fold(0, |s, j| s + j.justification.len());
2699                                    sum + u64::try_from(block_size).unwrap()
2700                                }))
2701                        );
2702                    }
2703                    Err(error) => {
2704                        log!(
2705                            &task.platform,
2706                            Debug,
2707                            "network",
2708                            "blocks-request-error",
2709                            chain = task.network[chain_id].log_name,
2710                            target = peer_id,
2711                            ?error
2712                        );
2713                    }
2714                }
2715
2716                match &response {
2717                    Ok(_) => {}
2718                    Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2719                    Err(err) => {
2720                        log!(
2721                            &task.platform,
2722                            Debug,
2723                            "network",
2724                            format!(
2725                                "Error in block request with {}. This might indicate an \
2726                                incompatibility. Error: {}",
2727                                peer_id, err
2728                            )
2729                        );
2730                    }
2731                }
2732
2733                let _ = task
2734                    .blocks_requests
2735                    .remove(&substream_id)
2736                    .unwrap()
2737                    .send(response.map_err(BlocksRequestError::Request));
2738            }
2739            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2740                substream_id,
2741                peer_id,
2742                chain_id,
2743                response: service::RequestResult::GrandpaWarpSync(response),
2744            }) => {
2745                match &response {
2746                    Ok(response) => {
2747                        // TODO: print total bytes size
2748                        let decoded = response.decode();
2749                        log!(
2750                            &task.platform,
2751                            Debug,
2752                            "network",
2753                            "warp-sync-request-success",
2754                            chain = task.network[chain_id].log_name,
2755                            target = peer_id,
2756                            num_fragments = decoded.fragments.len(),
2757                            is_finished = ?decoded.is_finished,
2758                        );
2759                    }
2760                    Err(error) => {
2761                        log!(
2762                            &task.platform,
2763                            Debug,
2764                            "network",
2765                            "warp-sync-request-error",
2766                            chain = task.network[chain_id].log_name,
2767                            target = peer_id,
2768                            ?error,
2769                        );
2770                    }
2771                }
2772
2773                let _ = task
2774                    .grandpa_warp_sync_requests
2775                    .remove(&substream_id)
2776                    .unwrap()
2777                    .send(response.map_err(WarpSyncRequestError::Request));
2778            }
2779            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2780                substream_id,
2781                peer_id,
2782                chain_id,
2783                response: service::RequestResult::StorageProof(response),
2784            }) => {
2785                match &response {
2786                    Ok(items) => {
2787                        let decoded = items.decode();
2788                        log!(
2789                            &task.platform,
2790                            Debug,
2791                            "network",
2792                            "storage-proof-request-success",
2793                            chain = task.network[chain_id].log_name,
2794                            target = peer_id,
2795                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2796                        );
2797                    }
2798                    Err(error) => {
2799                        log!(
2800                            &task.platform,
2801                            Debug,
2802                            "network",
2803                            "storage-proof-request-error",
2804                            chain = task.network[chain_id].log_name,
2805                            target = peer_id,
2806                            ?error
2807                        );
2808                    }
2809                }
2810
2811                // Both regular storage proof and child storage proof use the same protocol,
2812                // so check both HashMaps for the request.
2813                if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2814                    let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2815                } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2816                {
2817                    let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2818                } else {
2819                    unreachable!()
2820                }
2821            }
2822            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2823                substream_id,
2824                peer_id,
2825                chain_id,
2826                response: service::RequestResult::CallProof(response),
2827            }) => {
2828                match &response {
2829                    Ok(items) => {
2830                        let decoded = items.decode();
2831                        log!(
2832                            &task.platform,
2833                            Debug,
2834                            "network",
2835                            "call-proof-request-success",
2836                            chain = task.network[chain_id].log_name,
2837                            target = peer_id,
2838                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2839                        );
2840                    }
2841                    Err(error) => {
2842                        log!(
2843                            &task.platform,
2844                            Debug,
2845                            "network",
2846                            "call-proof-request-error",
2847                            chain = task.network[chain_id].log_name,
2848                            target = peer_id,
2849                            ?error
2850                        );
2851                    }
2852                }
2853
2854                let _ = task
2855                    .call_proof_requests
2856                    .remove(&substream_id)
2857                    .unwrap()
2858                    .send(response.map_err(CallProofRequestError::Request));
2859            }
2860            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2861                peer_id: requestee_peer_id,
2862                chain_id,
2863                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2864                ..
2865            }) => {
2866                for (peer_id, mut addrs) in nodes {
2867                    // Make sure to not insert too many address for a single peer.
2868                    // While the .
2869                    if addrs.len() >= 10 {
2870                        addrs.truncate(10);
2871                    }
2872
2873                    let mut valid_addrs = Vec::with_capacity(addrs.len());
2874                    for addr in addrs {
2875                        match Multiaddr::from_bytes(addr) {
2876                            Ok(a) => {
2877                                if platform::address_parse::multiaddr_to_address(&a)
2878                                    .ok()
2879                                    .map_or(false, |addr| {
2880                                        task.platform.supports_connection_type((&addr).into())
2881                                    })
2882                                {
2883                                    valid_addrs.push(a)
2884                                } else {
2885                                    log!(
2886                                        &task.platform,
2887                                        Debug,
2888                                        "network",
2889                                        "discovered-address-not-supported",
2890                                        chain = &task.network[chain_id].log_name,
2891                                        peer_id,
2892                                        addr = &a,
2893                                        obtained_from = requestee_peer_id
2894                                    );
2895                                }
2896                            }
2897                            Err((error, addr)) => {
2898                                log!(
2899                                    &task.platform,
2900                                    Debug,
2901                                    "network",
2902                                    "discovered-address-invalid",
2903                                    chain = &task.network[chain_id].log_name,
2904                                    peer_id,
2905                                    error,
2906                                    addr = hex::encode(&addr),
2907                                    obtained_from = requestee_peer_id
2908                                );
2909                            }
2910                        }
2911                    }
2912
2913                    if !valid_addrs.is_empty() {
2914                        // Note that we must call this function before `insert_address`,
2915                        // as documented in `basic_peering_strategy`.
2916                        let insert_outcome =
2917                            task.peering_strategy
2918                                .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant
2919
2920                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2921                            peer_removed,
2922                        } = insert_outcome
2923                        {
2924                            if let Some(peer_removed) = peer_removed {
2925                                log!(
2926                                    &task.platform,
2927                                    Debug,
2928                                    "network",
2929                                    "peer-purged-from-address-book",
2930                                    chain = &task.network[chain_id].log_name,
2931                                    peer_id = peer_removed,
2932                                );
2933                            }
2934
2935                            log!(
2936                                &task.platform,
2937                                Debug,
2938                                "network",
2939                                "peer-discovered",
2940                                chain = &task.network[chain_id].log_name,
2941                                peer_id,
2942                                addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), // TODO: better formatting?
2943                                obtained_from = requestee_peer_id
2944                            );
2945                        }
2946                    }
2947
2948                    for addr in valid_addrs {
2949                        let _insert_result =
2950                            task.peering_strategy
2951                                .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant
2952                        debug_assert!(!matches!(
2953                            _insert_result,
2954                            basic_peering_strategy::InsertAddressResult::UnknownPeer
2955                        ));
2956                    }
2957                }
2958            }
2959            WakeUpReason::NetworkEvent(service::Event::RequestResult {
2960                peer_id,
2961                chain_id,
2962                response: service::RequestResult::KademliaFindNode(Err(error)),
2963                ..
2964            }) => {
2965                log!(
2966                    &task.platform,
2967                    Debug,
2968                    "network",
2969                    "discovery-find-node-error",
2970                    chain = &task.network[chain_id].log_name,
2971                    ?error,
2972                    find_node_target = peer_id,
2973                );
2974
2975                // No error is printed if the request fails due to a benign networking error such
2976                // as an unresponsive peer.
2977                match error {
2978                    service::KademliaFindNodeError::RequestFailed(err)
2979                        if !err.is_protocol_error() => {}
2980
2981                    service::KademliaFindNodeError::RequestFailed(
2982                        service::RequestError::Substream(
2983                            connection::established::RequestError::ProtocolNotAvailable,
2984                        ),
2985                    ) => {
2986                        // TODO: remove this warning in a long time
2987                        log!(
2988                            &task.platform,
2989                            Warn,
2990                            "network",
2991                            format!(
2992                                "Problem during discovery on {}: protocol not available. \
2993                                This might indicate that the version of Substrate used by \
2994                                the chain doesn't include \
2995                                <https://github.com/paritytech/substrate/pull/12545>.",
2996                                &task.network[chain_id].log_name
2997                            )
2998                        );
2999                    }
3000                    _ => {
3001                        log!(
3002                            &task.platform,
3003                            Debug,
3004                            "network",
3005                            format!(
3006                                "Problem during discovery on {}: {}",
3007                                &task.network[chain_id].log_name, error
3008                            )
3009                        );
3010                    }
3011                }
3012            }
3013            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
3014                // We never start any other kind of requests.
3015                unreachable!()
3016            }
3017            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
3018                peer_id,
3019                chain_id,
3020                kind: service::GossipKind::ConsensusTransactions,
3021            }) => {
3022                // The networking state machine guarantees that `GossipInDesired`
3023                // can't happen if we are already opening an out slot, which we do
3024                // immediately.
3025                // TODO: add debug_assert! ^
3026                if task
3027                    .network
3028                    .opened_gossip_undesired_by_chain(chain_id)
3029                    .count()
3030                    < 4
3031                {
3032                    log!(
3033                        &task.platform,
3034                        Debug,
3035                        "network",
3036                        "gossip-in-request",
3037                        chain = &task.network[chain_id].log_name,
3038                        peer_id,
3039                        outcome = "accepted"
3040                    );
3041                    task.network
3042                        .gossip_open(
3043                            chain_id,
3044                            &peer_id,
3045                            service::GossipKind::ConsensusTransactions,
3046                        )
3047                        .unwrap();
3048                } else {
3049                    log!(
3050                        &task.platform,
3051                        Debug,
3052                        "network",
3053                        "gossip-in-request",
3054                        chain = &task.network[chain_id].log_name,
3055                        peer_id,
3056                        outcome = "rejected",
3057                    );
3058                    task.network
3059                        .gossip_close(
3060                            chain_id,
3061                            &peer_id,
3062                            service::GossipKind::ConsensusTransactions,
3063                        )
3064                        .unwrap();
3065                }
3066            }
3067            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
3068                // Can't happen as we already instantaneously accept or reject gossip in requests.
3069                unreachable!()
3070            }
3071            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
3072                peer_id,
3073                substream_id,
3074            }) => {
3075                log!(
3076                    &task.platform,
3077                    Debug,
3078                    "network",
3079                    "identify-request-received",
3080                    peer_id,
3081                );
3082                task.network
3083                    .respond_identify(substream_id, &task.identify_agent_version);
3084            }
3085            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
3086            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
3087                // All incoming requests are immediately answered.
3088                unreachable!()
3089            }
3090            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
3091                chain_id,
3092                peer_id,
3093                state,
3094            }) => {
3095                log!(
3096                    &task.platform,
3097                    Debug,
3098                    "network",
3099                    "grandpa-neighbor-packet-received",
3100                    chain = &task.network[chain_id].log_name,
3101                    peer_id,
3102                    round_number = state.round_number,
3103                    set_id = state.set_id,
3104                    commit_finalized_height = state.commit_finalized_height,
3105                );
3106
3107                task.open_gossip_links
3108                    .get_mut(&(chain_id, peer_id.clone()))
3109                    .unwrap()
3110                    .finalized_block_height = Some(state.commit_finalized_height);
3111
3112                debug_assert!(task.event_pending_send.is_none());
3113                task.event_pending_send = Some((
3114                    chain_id,
3115                    Event::GrandpaNeighborPacket {
3116                        peer_id,
3117                        finalized_block_height: state.commit_finalized_height,
3118                    },
3119                ));
3120            }
3121            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
3122                chain_id,
3123                peer_id,
3124                message,
3125            }) => {
3126                log!(
3127                    &task.platform,
3128                    Debug,
3129                    "network",
3130                    "grandpa-commit-message-received",
3131                    chain = &task.network[chain_id].log_name,
3132                    peer_id,
3133                    target_block_hash = HashDisplay(message.decode().target_hash),
3134                );
3135
3136                debug_assert!(task.event_pending_send.is_none());
3137                task.event_pending_send =
3138                    Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
3139            }
3140            WakeUpReason::NetworkEvent(service::Event::StatementsNotification {
3141                chain_id,
3142                peer_id,
3143                statements,
3144            }) => {
3145                debug_assert!(task.event_pending_send.is_none());
3146
3147                if statements.is_empty() {
3148                    continue;
3149                }
3150
3151                task.event_pending_send = Some((
3152                    chain_id,
3153                    Event::StatementsNotification {
3154                        peer_id,
3155                        statements,
3156                    },
3157                ));
3158            }
3159            WakeUpReason::NetworkEvent(service::Event::StatementProtocolConnected {
3160                peer_id,
3161                chain_id,
3162                version,
3163            }) => {
3164                if matches!(version, codec::StatementProtocolVersion::V2) {
3165                    task.v2_statement_peers
3166                        .entry(chain_id)
3167                        .or_insert_with(|| {
3168                            HashSet::with_capacity_and_hasher(16, Default::default())
3169                        })
3170                        .insert(peer_id.clone());
3171                    if let Some(filter) = task.current_affinity_filter.get(&chain_id) {
3172                        if let Err(
3173                            SendTopicAffinityError::NoConnection
3174                            | SendTopicAffinityError::ProtocolV1,
3175                        ) = task.network.send_topic_affinity(&peer_id, chain_id, filter)
3176                        {
3177                            task.v2_statement_peers
3178                                .get_mut(&chain_id)
3179                                .unwrap()
3180                                .remove(&peer_id);
3181                        }
3182                    }
3183                }
3184            }
3185            // TODO: we don't filter outbound statements yet
3186            WakeUpReason::NetworkEvent(service::Event::StatementTopicAffinityReceived {
3187                ..
3188            }) => {}
3189            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
3190                // TODO: handle properly?
3191                log!(
3192                    &task.platform,
3193                    Warn,
3194                    "network",
3195                    "protocol-error",
3196                    peer_id,
3197                    ?error
3198                );
3199
3200                // TODO: disconnect peer
3201            }
3202            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
3203                task.peering_strategy.assign_slot(&chain_id, &peer_id);
3204
3205                log!(
3206                    &task.platform,
3207                    Debug,
3208                    "network",
3209                    "slot-assigned",
3210                    chain = &task.network[chain_id].log_name,
3211                    peer_id
3212                );
3213
3214                task.network.gossip_insert_desired(
3215                    chain_id,
3216                    peer_id,
3217                    service::GossipKind::ConsensusTransactions,
3218                );
3219            }
3220            WakeUpReason::CanAssignBitswapSlot(peer_id) => {
3221                task.bitswap_peering_strategy.assign_slot(&peer_id).unwrap();
3222
3223                log!(
3224                    &task.platform,
3225                    Debug,
3226                    "network",
3227                    "bitswap-slot-assigned",
3228                    peer_id
3229                );
3230
3231                task.network.bitswap_insert_desired(peer_id);
3232            }
3233            WakeUpReason::NextRecentConnectionRestore => {
3234                task.num_recent_connection_opening =
3235                    task.num_recent_connection_opening.saturating_sub(1);
3236            }
3237            WakeUpReason::CanStartConnect(expected_peer_id) => {
3238                let Some(multiaddr) = task
3239                    .peering_strategy
3240                    .pick_address_and_add_connection(&expected_peer_id)
3241                else {
3242                    // There is no address for that peer in the address book.
3243                    task.network.gossip_remove_desired_all(
3244                        &expected_peer_id,
3245                        service::GossipKind::ConsensusTransactions,
3246                    );
3247                    let ban_duration = Duration::from_secs(10);
3248                    for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
3249                        &expected_peer_id,
3250                        task.platform.now() + ban_duration,
3251                    ) {
3252                        if matches!(
3253                            what_happened,
3254                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
3255                        ) {
3256                            log!(
3257                                &task.platform,
3258                                Debug,
3259                                "network",
3260                                "slot-unassigned",
3261                                chain = &task.network[chain_id].log_name,
3262                                peer_id = expected_peer_id,
3263                                ?ban_duration,
3264                                reason = "no-address"
3265                            );
3266                        }
3267                    }
3268                    continue;
3269                };
3270
3271                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
3272                    Ok(a) => a,
3273                    Err((multiaddr::FromBytesError, addr)) => {
3274                        // Address is in an invalid format.
3275                        let _was_in = task
3276                            .peering_strategy
3277                            .decrease_address_connections_and_remove_if_zero(
3278                                &expected_peer_id,
3279                                &addr,
3280                            );
3281                        debug_assert!(_was_in.is_ok());
3282                        continue;
3283                    }
3284                };
3285
3286                let address = address_parse::multiaddr_to_address(&multiaddr)
3287                    .ok()
3288                    .filter(|addr| {
3289                        task.platform.supports_connection_type(match &addr {
3290                            address_parse::AddressOrMultiStreamAddress::Address(addr) => {
3291                                From::from(addr)
3292                            }
3293                            address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3294                                addr,
3295                            ) => From::from(addr),
3296                        })
3297                    });
3298
3299                let Some(address) = address else {
3300                    // Address is in an invalid format or isn't supported by the platform.
3301                    let _was_in = task
3302                        .peering_strategy
3303                        .decrease_address_connections_and_remove_if_zero(
3304                            &expected_peer_id,
3305                            multiaddr.as_ref(),
3306                        );
3307                    debug_assert!(_was_in.is_ok());
3308                    continue;
3309                };
3310
3311                // Each connection has its own individual Noise key.
3312                let noise_key = {
3313                    let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
3314                    task.platform.fill_random_bytes(&mut *noise_static_key);
3315                    let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
3316                    task.platform.fill_random_bytes(&mut *libp2p_key);
3317                    connection::NoiseKey::new(&libp2p_key, &noise_static_key)
3318                };
3319
3320                log!(
3321                    &task.platform,
3322                    Debug,
3323                    "network",
3324                    "connection-started",
3325                    expected_peer_id,
3326                    remote_addr = multiaddr,
3327                    local_peer_id =
3328                        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
3329                            .into_peer_id(),
3330                );
3331
3332                task.num_recent_connection_opening += 1;
3333
3334                let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
3335                    async_channel::bounded(8);
3336                let task_name = format!("connection-{}", multiaddr);
3337
3338                match address {
3339                    address_parse::AddressOrMultiStreamAddress::Address(address) => {
3340                        // As documented in the `PlatformRef` trait, `connect_stream` must
3341                        // return as soon as possible.
3342                        let connection = task.platform.connect_stream(address).await;
3343
3344                        let (connection_id, connection_task) =
3345                            task.network.add_single_stream_connection(
3346                                task.platform.now(),
3347                                service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
3348                                    is_initiator: true,
3349                                    noise_key: &noise_key,
3350                                },
3351                                multiaddr.clone().into_bytes(),
3352                                Some(expected_peer_id.clone()),
3353                                coordinator_to_connection_tx,
3354                            );
3355
3356                        task.platform.spawn_task(
3357                            task_name.into(),
3358                            tasks::single_stream_connection_task::<TPlat>(
3359                                connection,
3360                                multiaddr.to_string(),
3361                                task.platform.clone(),
3362                                connection_id,
3363                                connection_task,
3364                                coordinator_to_connection_rx,
3365                                task.tasks_messages_tx.clone(),
3366                            ),
3367                        );
3368                    }
3369                    address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
3370                        platform::MultiStreamAddress::WebRtc {
3371                            ip,
3372                            port,
3373                            remote_certificate_sha256,
3374                        },
3375                    ) => {
3376                        // We need to know the local TLS certificate in order to insert the
3377                        // connection, and as such we need to call `connect_multistream` here.
3378                        // As documented in the `PlatformRef` trait, `connect_multistream` must
3379                        // return as soon as possible.
3380                        let connection = task
3381                            .platform
3382                            .connect_multistream(platform::MultiStreamAddress::WebRtc {
3383                                ip,
3384                                port,
3385                                remote_certificate_sha256,
3386                            })
3387                            .await;
3388
3389                        // Convert the SHA256 hashes into multihashes.
3390                        let local_tls_certificate_multihash = [18u8, 32]
3391                            .into_iter()
3392                            .chain(connection.local_tls_certificate_sha256.into_iter())
3393                            .collect();
3394                        let remote_tls_certificate_multihash = [18u8, 32]
3395                            .into_iter()
3396                            .chain(remote_certificate_sha256.iter().copied())
3397                            .collect();
3398
3399                        let (connection_id, connection_task) =
3400                            task.network.add_multi_stream_connection(
3401                                task.platform.now(),
3402                                service::MultiStreamHandshakeKind::WebRtc {
3403                                    is_initiator: true,
3404                                    local_tls_certificate_multihash,
3405                                    remote_tls_certificate_multihash,
3406                                    noise_key: &noise_key,
3407                                },
3408                                multiaddr.clone().into_bytes(),
3409                                Some(expected_peer_id.clone()),
3410                                coordinator_to_connection_tx,
3411                            );
3412
3413                        task.platform.spawn_task(
3414                            task_name.into(),
3415                            tasks::webrtc_multi_stream_connection_task::<TPlat>(
3416                                connection.connection,
3417                                multiaddr.to_string(),
3418                                task.platform.clone(),
3419                                connection_id,
3420                                connection_task,
3421                                coordinator_to_connection_rx,
3422                                task.tasks_messages_tx.clone(),
3423                            ),
3424                        );
3425                    }
3426                }
3427            }
3428            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
3429                task.network
3430                    .gossip_open(
3431                        chain_id,
3432                        &peer_id,
3433                        service::GossipKind::ConsensusTransactions,
3434                    )
3435                    .unwrap();
3436
3437                log!(
3438                    &task.platform,
3439                    Debug,
3440                    "network",
3441                    "gossip-open-start",
3442                    chain = &task.network[chain_id].log_name,
3443                    peer_id,
3444                );
3445            }
3446            WakeUpReason::CanOpenBitswap(peer_id) => {
3447                task.network.bitswap_open(&peer_id).unwrap();
3448
3449                log!(
3450                    &task.platform,
3451                    Debug,
3452                    "network",
3453                    "bitswap-open-start",
3454                    peer_id
3455                );
3456            }
3457            WakeUpReason::MessageToConnection {
3458                connection_id,
3459                message,
3460            } => {
3461                // Note that it is critical for the sending to not take too long here, in order to
3462                // not block the process of the network service.
3463                // In particular, if sending the message to the connection is blocked due to
3464                // sending a message on the connection-to-coordinator channel, this will result
3465                // in a deadlock.
3466                // For this reason, the connection task is always ready to immediately accept a
3467                // message on the coordinator-to-connection channel.
3468                let _send_result = task.network[connection_id].send(message).await;
3469                debug_assert!(_send_result.is_ok());
3470            }
3471        }
3472    }
3473}