smoldot/network/
basic_peering_strategy.rs

1// Smoldot
2// Copyright (C) 2023  Pierre Krieger
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Basic address book and slots assignments algorithm.
19//!
20//! The [`BasicPeeringStrategy`] contains a collection of network identities, identified by
21//! a [`PeerId`]. Each network identity is associated to one or more chains, identified by a
22//! `TChainId`.
23//!
24//! Each network-identity-chain association can be in one of these three states:
25//!
26//! - Normal.
27//! - Banned until a certain instant represented by `TInstant`.
28//! - Has a slot.
29//!
30//! "Normal" and "banned" network-identity-chain associations represent the potential peers to
31//! connect to, while "slot" represent pending or established gossip slots.
32//!
33//! Use [`BasicPeeringStrategy::pick_assignable_peer`] in order to assign a slot to a
34//! randomly-chosen network-identity that doesn't currently have one.
35//!
36//! If a gossip slot fails to be established with a certain peer, or if the peer misbehaves,
37//! use [`BasicPeeringStrategy::unassign_slot_and_ban`] to ban the peer, preventing it from
38//! obtaining a slot for a certain amount of time.
39//!
40//! Each network identity that is associated with at least one chain is associated with zero or
41//! more addresses. It is not possible to insert addresses to peers that aren't associated to at
42//! least one chain. The number of active connections of each address is also tracked.
43//!
44//! There exists a limit to the number of peers per chain and the number of addresses per peer,
45//! guaranteeing that the data structure only uses a bounded amount of memory. If these limits
46//! are reached, peers and addresses are removed randomly. Peers that have a slot and at least one
47//! connected address are never removed.
48//!
49
50use crate::util;
51use alloc::{
52    borrow::ToOwned as _,
53    collections::{BTreeMap, BTreeSet, btree_map},
54    vec::Vec,
55};
56use core::{hash::Hash, iter, ops};
57use rand::seq::IteratorRandom as _;
58use rand_chacha::{
59    ChaCha20Rng,
60    rand_core::{RngCore as _, SeedableRng as _},
61};
62
63pub use crate::libp2p::PeerId;
64
/// Address book of peers, chains and addresses, together with the state of each peer-chain
/// association. See the module-level documentation for an overview.
///
/// `PeerId`s and `TChainId`s are stored once and referred to everywhere else through their
/// index in [`BasicPeeringStrategy::peer_ids`] and [`BasicPeeringStrategy::chains`].
#[derive(Debug)]
pub struct BasicPeeringStrategy<TChainId, TInstant> {
    /// Contains all the `PeerId`s used throughout the collection.
    peer_ids: slab::Slab<PeerId>,

    /// Contains all the keys of [`BasicPeeringStrategy::peer_ids`] indexed differently.
    peer_ids_indices: hashbrown::HashMap<PeerId, usize, util::SipHasherBuild>,

    /// List of all known addresses, indexed by `peer_id_index`, with the number of connections to
    /// each address. The addresses are not intended to be in a particular order.
    ///
    /// All the addresses of a given peer are obtained by querying the range
    /// `(peer_id_index, [])..(peer_id_index + 1, [])`, upper bound excluded.
    addresses: BTreeMap<(usize, Vec<u8>), u32>,

    /// List of all chains throughout the collection.
    ///
    /// > **Note**: In principle this field is completely unnecessary. In practice, however, we
    /// >           can't use `BTreeMap::range` with `TChainId`s because we don't know the minimum
    /// >           and maximum values of a `TChainId`. In order to bypass this problem,
    /// >           `TChainId`s are instead referred to as a `usize`.
    chains: slab::Slab<TChainId>,

    /// Contains all the keys of [`BasicPeeringStrategy::chains`] indexed differently.
    /// While a dumber hasher is in principle enough, we use a `SipHasherBuild` "just in case"
    /// as we don't know the properties of `TChainId`.
    chains_indices: hashbrown::HashMap<TChainId, usize, util::SipHasherBuild>,

    /// Collection of all the peer-chain associations, with the state of each association.
    /// Keys are `(peer_id_index, chain_id_index)`.
    peers_chains: BTreeMap<(usize, usize), PeerChainState<TInstant>>,

    /// Same associations as in [`BasicPeeringStrategy::peers_chains`], but ordered by chain then
    /// by state, which makes it possible to query all the peers of a chain that are in a
    /// certain range of states.
    /// Entries are `(chain_id_index, state, peer_id_index)`.
    peers_chains_by_state: BTreeSet<(usize, PeerChainState<TInstant>, usize)>,

    /// Random number generator used to select peers to assign slots to and remove addresses/peers.
    randomness: ChaCha20Rng,
}
100
/// State of a peer-chain association.
///
/// The order in which the variants are declared matters: the derived `Ord` implementation
/// sorts `Assignable` first, then `Banned` (by increasing `expires`), then `Slot`. The range
/// queries performed on `peers_chains_by_state` rely on this ordering.
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
enum PeerChainState<TInstant> {
    /// The peer can be assigned a slot.
    Assignable,
    /// The peer is banned; the ban is considered over once `expires` is reached.
    Banned { expires: TInstant },
    /// The peer currently has a slot.
    Slot,
}
107
/// Configuration passed to [`BasicPeeringStrategy::new`].
pub struct Config {
    /// Seed used for the randomness for choosing peers and addresses to connect to or remove.
    ///
    /// The keys of the hashers of the internal hash maps are also derived from this seed.
    pub randomness_seed: [u8; 32],

    /// Number of peers, all chains together, to initially reserve memory for.
    pub peers_capacity: usize,

    /// Number of chains to initially reserve memory for.
    pub chains_capacity: usize,
}
119
120impl<TChainId, TInstant> BasicPeeringStrategy<TChainId, TInstant>
121where
122    TChainId: PartialOrd + Ord + Eq + Hash + Clone,
123    TInstant: PartialOrd + Ord + Eq + Clone,
124{
125    /// Creates a new empty [`BasicPeeringStrategy`].
126    ///
127    /// Must be passed a seed for randomness used
128    /// in [`BasicPeeringStrategy::pick_assignable_peer`].
129    pub fn new(config: Config) -> Self {
130        let mut randomness = ChaCha20Rng::from_seed(config.randomness_seed);
131
132        BasicPeeringStrategy {
133            peer_ids: slab::Slab::with_capacity(config.peers_capacity),
134            peer_ids_indices: hashbrown::HashMap::with_capacity_and_hasher(
135                config.peers_capacity,
136                util::SipHasherBuild::new({
137                    let mut seed = [0; 16];
138                    randomness.fill_bytes(&mut seed);
139                    seed
140                }),
141            ),
142            addresses: BTreeMap::new(),
143            chains: slab::Slab::with_capacity(config.chains_capacity),
144            chains_indices: hashbrown::HashMap::with_capacity_and_hasher(
145                config.chains_capacity,
146                util::SipHasherBuild::new({
147                    let mut seed = [0; 16];
148                    randomness.fill_bytes(&mut seed);
149                    seed
150                }),
151            ),
152            peers_chains: BTreeMap::new(),
153            peers_chains_by_state: BTreeSet::new(),
154            randomness,
155        }
156    }
157
158    /// Removes all the chain assignments for the given chain.
159    ///
160    /// If a peer isn't assigned to any chain anymore and doesn't have any connected address,
161    /// all of its addresses are also removed from the collection.
162    pub fn remove_chain_peers(&mut self, chain: &TChainId) {
163        let Some(chain_index) = self.chains_indices.remove(chain) else {
164            // Chain didn't exist.
165            return;
166        };
167        self.chains.remove(chain_index);
168
169        let chain_peers = {
170            let mut in_chain_and_after_chain = self.peers_chains_by_state.split_off(&(
171                chain_index,
172                PeerChainState::Assignable,
173                usize::MIN,
174            ));
175            let mut after_chain = in_chain_and_after_chain.split_off(&(
176                chain_index + 1,
177                PeerChainState::Assignable,
178                usize::MIN,
179            ));
180            self.peers_chains_by_state.append(&mut after_chain);
181            in_chain_and_after_chain
182        };
183
184        for (_, _, peer_id_index) in chain_peers {
185            let _was_in = self.peers_chains.remove(&(peer_id_index, chain_index));
186            debug_assert!(_was_in.is_some());
187            self.try_clean_up_peer_id(peer_id_index);
188        }
189    }
190
191    /// Inserts a chain-peer combination to the collection, indicating that the given peer belongs
192    /// to the given chain.
193    ///
194    /// Has no effect if the peer is already assigned to the given chain, in which case
195    /// [`InsertChainPeerResult::Duplicate`] is returned.
196    ///
197    /// A maximum number of peers per chain must be provided. If the peer is inserted and the
198    /// limit is exceeded, a peer (other than the one that has just been inserted) that belongs
199    /// to the given chain is randomly chosen and removed. Peers that have slots assigned to them
200    /// are never removed.
201    pub fn insert_chain_peer(
202        &mut self,
203        chain: TChainId,
204        peer_id: PeerId,
205        max_peers_per_chain: usize,
206    ) -> InsertChainPeerResult {
207        let peer_id_index = self.get_or_insert_peer_index(&peer_id);
208        let chain_index = self.get_or_insert_chain_index(&chain);
209
210        if let btree_map::Entry::Vacant(entry) =
211            self.peers_chains.entry((peer_id_index, chain_index))
212        {
213            let peer_to_remove = if self
214                .peers_chains_by_state
215                .range(
216                    (chain_index, PeerChainState::Assignable, usize::MIN)
217                        ..=(chain_index, PeerChainState::Slot, usize::MAX),
218                )
219                .count()
220                >= max_peers_per_chain
221            {
222                self.peers_chains_by_state
223                    .range(
224                        (chain_index, PeerChainState::Assignable, usize::MIN)
225                            ..(chain_index, PeerChainState::Slot, usize::MIN),
226                    )
227                    .choose(&mut self.randomness)
228                    .map(|(_, _, peer_index)| *peer_index)
229            } else {
230                None
231            };
232
233            let _was_inserted = self.peers_chains_by_state.insert((
234                chain_index,
235                PeerChainState::Assignable,
236                peer_id_index,
237            ));
238            debug_assert!(_was_inserted);
239
240            entry.insert(PeerChainState::Assignable);
241
242            let peer_removed = if let Some(peer_to_remove) = peer_to_remove {
243                let peer_id_to_remove = self.peer_ids[peer_to_remove].clone();
244                let state = self
245                    .peers_chains
246                    .remove(&(peer_to_remove, chain_index))
247                    .unwrap_or_else(|| unreachable!());
248                debug_assert!(!matches!(state, PeerChainState::Slot));
249                let _was_removed =
250                    self.peers_chains_by_state
251                        .remove(&(chain_index, state, peer_to_remove));
252                debug_assert!(_was_removed);
253                self.try_clean_up_peer_id(peer_to_remove);
254                Some(peer_id_to_remove)
255            } else {
256                None
257            };
258
259            InsertChainPeerResult::Inserted { peer_removed }
260        } else {
261            InsertChainPeerResult::Duplicate
262        }
263    }
264
265    /// Removes a peer-chain associated previously inserted with
266    /// [`BasicPeeringStrategy::insert_chain_peer`].
267    ///
268    /// Has no effect if the peer-chain association didn't exist.
269    ///
270    /// If the peer isn't assigned to any chain anymore and doesn't have any connected address,
271    /// all of its addresses are also removed from the collection.
272    pub fn unassign_slot_and_remove_chain_peer(
273        &mut self,
274        chain: &TChainId,
275        peer_id: &PeerId,
276    ) -> UnassignSlotAndRemoveChainPeer<TInstant> {
277        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
278            // If the `PeerId` is unknown, it means it wasn't assigned in the first place.
279            return UnassignSlotAndRemoveChainPeer::NotAssigned;
280        };
281
282        let Some(&chain_index) = self.chains_indices.get(chain) else {
283            // If the `TChainId` is unknown, it means the peer wasn't assigned in the first place.
284            return UnassignSlotAndRemoveChainPeer::NotAssigned;
285        };
286
287        if let Some(state) = self.peers_chains.remove(&(peer_id_index, chain_index)) {
288            let _was_removed =
289                self.peers_chains_by_state
290                    .remove(&(chain_index, state.clone(), peer_id_index));
291            debug_assert!(_was_removed);
292
293            self.try_clean_up_peer_id(peer_id_index);
294            self.try_clean_up_chain(chain_index);
295
296            match state {
297                PeerChainState::Assignable => UnassignSlotAndRemoveChainPeer::Assigned {
298                    ban_expiration: None,
299                },
300                PeerChainState::Banned { expires } => UnassignSlotAndRemoveChainPeer::Assigned {
301                    ban_expiration: Some(expires),
302                },
303                PeerChainState::Slot => UnassignSlotAndRemoveChainPeer::HadSlot,
304            }
305        } else {
306            UnassignSlotAndRemoveChainPeer::NotAssigned
307        }
308    }
309
310    /// Returns the list of all peers that are known to belong to the given chain, in other
311    /// words peers added through [`BasicPeeringStrategy::insert_chain_peer`].
312    ///
313    /// The order of the yielded elements is unspecified.
314    pub fn chain_peers_unordered(&self, chain: &TChainId) -> impl Iterator<Item = &PeerId> {
315        let Some(&chain_index) = self.chains_indices.get(chain) else {
316            // If the `TChainId` is unknown, it means that it doesn't have any peer.
317            return either::Right(iter::empty());
318        };
319
320        either::Left(
321            self.peers_chains_by_state
322                .range(
323                    (chain_index, PeerChainState::Assignable, usize::MIN)
324                        ..=(chain_index, PeerChainState::Slot, usize::MAX),
325                )
326                .map(|(_, _, p)| &self.peer_ids[*p]),
327        )
328    }
329
330    /// Inserts a new address for the given peer.
331    ///
332    /// If the address wasn't known yet, its number of connections is set to zero.
333    ///
334    /// If the peer doesn't belong to any chain (see [`BasicPeeringStrategy::insert_chain_peer`]),
335    /// then this function has no effect, unless the peer has at least one connected address. This
336    /// is to avoid accidentally collecting addresses for peers that will never be removed and
337    /// create a memory leak. For this reason, you most likely want to call
338    /// [`BasicPeeringStrategy::insert_chain_peer`] before calling this function.
339    ///
340    /// A maximum number of addresses that are maintained for this peer must be passed as
341    /// parameter. If this number is exceeded, an address with zero connections (other than
342    /// the one passed as parameter) is randomly removed.
343    pub fn insert_address(
344        &mut self,
345        peer_id: &PeerId,
346        address: Vec<u8>,
347        max_addresses: usize,
348    ) -> InsertAddressResult {
349        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
350            return InsertAddressResult::UnknownPeer;
351        };
352
353        match self.insert_address_inner(peer_id_index, address, max_addresses, 0, false) {
354            InsertAddressConnectionsResult::AlreadyKnown => InsertAddressResult::AlreadyKnown,
355            InsertAddressConnectionsResult::Inserted { address_removed } => {
356                InsertAddressResult::Inserted { address_removed }
357            }
358        }
359    }
360
    /// Increases the number of connections of the given address. If the address isn't known, it
    /// is inserted.
    ///
    /// Contrary to [`BasicPeeringStrategy::insert_address`], the address is inserted anyway if
    /// the `PeerId` isn't known.
    ///
    /// If the address is inserted, its number of connections is initially one, and the return
    /// value is [`InsertAddressConnectionsResult::Inserted`]. If the address was already known,
    /// its number of connections is incremented by one and
    /// [`InsertAddressConnectionsResult::AlreadyKnown`] is returned.
    ///
    /// > **Note**: Use this function if you establish a connection and accidentally reach a
    /// >           certain [`PeerId`].
    ///
    /// # Panic
    ///
    /// Panics if the number of connections is equal to `u32::MAX`.
    ///
    pub fn increase_address_connections(
        &mut self,
        peer_id: &PeerId,
        address: Vec<u8>,
        max_addresses: usize,
    ) -> InsertAddressConnectionsResult {
        // The `PeerId` is registered on the fly if necessary.
        let peer_id_index = self.get_or_insert_peer_index(peer_id);
        // One initial connection if the address is new, otherwise increment the existing counter.
        self.insert_address_inner(peer_id_index, address, max_addresses, 1, true)
    }
383
384    fn insert_address_inner(
385        &mut self,
386        peer_id_index: usize,
387        address: Vec<u8>,
388        max_addresses: usize,
389        initial_num_connections: u32,
390        increase_if_present: bool,
391    ) -> InsertAddressConnectionsResult {
392        match self.addresses.entry((peer_id_index, address.clone())) {
393            btree_map::Entry::Vacant(entry) => {
394                entry.insert(initial_num_connections);
395
396                let address_removed = {
397                    let num_addresses = self
398                        .addresses
399                        .range((peer_id_index, Vec::new())..=(peer_id_index + 1, Vec::new()))
400                        .count();
401
402                    if num_addresses >= max_addresses {
403                        // TODO: is it a good idea to choose the address randomly to remove? maybe there should be a sorting system with best addresses first?
404                        self.addresses
405                            .range((peer_id_index, Vec::new())..=(peer_id_index + 1, Vec::new()))
406                            .filter(|((_, a), n)| **n == 0 && *a != address)
407                            .choose(&mut self.randomness)
408                            .map(|((_, a), _)| a.clone())
409                    } else {
410                        None
411                    }
412                };
413
414                if let Some(address_removed) = address_removed.as_ref() {
415                    self.addresses
416                        .remove(&(peer_id_index, address_removed.clone()));
417                }
418
419                InsertAddressConnectionsResult::Inserted { address_removed }
420            }
421            btree_map::Entry::Occupied(entry) => {
422                let entry = entry.into_mut();
423                if increase_if_present {
424                    *entry = entry
425                        .checked_add(1)
426                        .unwrap_or_else(|| panic!("overflow in number of connections"));
427                }
428
429                InsertAddressConnectionsResult::AlreadyKnown
430            }
431        }
432    }
433
434    /// Returns the list of all addresses that have been inserted for the given peer.
435    pub fn peer_addresses(&self, peer_id: &PeerId) -> impl Iterator<Item = &[u8]> {
436        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
437            // If the `PeerId` is unknown, it means it doesn't have any address.
438            return either::Right(iter::empty());
439        };
440
441        either::Left(
442            self.addresses
443                .range((peer_id_index, Vec::new())..(peer_id_index + 1, Vec::new()))
444                .map(|((_, a), _)| &a[..]),
445        )
446    }
447
448    /// Chooses a [`PeerId`] that is known to belong to the given chain, that is not banned, and
449    /// that doesn't have a slot assigned to it.
450    ///
451    /// A `TInstant` must be provided in order to determine whether past bans have expired.
452    ///
453    /// If multiple peers can be assigned a slot, the one returned is chosen randomly. Calling
454    /// this function multiple times might return different peers.
455    /// For this reason, this function requires `&mut self`.
456    ///
457    /// Note that this function might return a peer for which no address is present. While this is
458    /// often not desirable, it is preferable to keep the API simple and straight-forward rather
459    /// than try to be smart about function behaviours.
460    pub fn pick_assignable_peer(
461        &mut self,
462        chain: &TChainId,
463        now: &TInstant,
464    ) -> AssignablePeer<'_, TInstant> {
465        let Some(&chain_index) = self.chains_indices.get(chain) else {
466            return AssignablePeer::NoPeer;
467        };
468
469        if let Some((_, _, peer_id_index)) = self
470            .peers_chains_by_state
471            .range(
472                (chain_index, PeerChainState::Assignable, usize::MIN)
473                    ..=(
474                        chain_index,
475                        PeerChainState::Banned {
476                            expires: now.clone(),
477                        },
478                        usize::MAX,
479                    ),
480            )
481            .choose(&mut self.randomness)
482        {
483            return AssignablePeer::Assignable(&self.peer_ids[*peer_id_index]);
484        }
485
486        if let Some((_, state, _)) = self
487            .peers_chains_by_state
488            .range((
489                ops::Bound::Excluded((
490                    chain_index,
491                    PeerChainState::Banned {
492                        expires: now.clone(),
493                    },
494                    usize::MAX,
495                )),
496                ops::Bound::Excluded((chain_index, PeerChainState::Slot, usize::MIN)),
497            ))
498            .next()
499        {
500            let PeerChainState::Banned { expires } = state else {
501                unreachable!()
502            };
503            AssignablePeer::AllPeersBanned {
504                next_unban: expires,
505            }
506        } else {
507            AssignablePeer::NoPeer
508        }
509    }
510
511    /// Assigns a slot to the given peer on the given chain.
512    ///
513    /// Acts as an implicit call to [`BasicPeeringStrategy::insert_chain_peer`].
514    ///
515    /// A slot is assigned even if the peer is banned. API users that call this function are
516    /// expected to be aware of that.
517    pub fn assign_slot(&mut self, chain: &TChainId, peer_id: &PeerId) {
518        let peer_id_index = self.get_or_insert_peer_index(peer_id);
519        let chain_index = self.get_or_insert_chain_index(chain);
520
521        match self.peers_chains.entry((peer_id_index, chain_index)) {
522            btree_map::Entry::Occupied(e) => {
523                let _was_removed = self.peers_chains_by_state.remove(&(
524                    chain_index,
525                    e.get().clone(),
526                    peer_id_index,
527                ));
528                debug_assert!(_was_removed);
529                *e.into_mut() = PeerChainState::Slot;
530            }
531            btree_map::Entry::Vacant(e) => {
532                e.insert(PeerChainState::Slot);
533            }
534        }
535
536        let _was_inserted =
537            self.peers_chains_by_state
538                .insert((chain_index, PeerChainState::Slot, peer_id_index));
539        debug_assert!(_was_inserted);
540    }
541
542    /// Unassign the slot that has been assigned to the given peer and bans the peer, preventing
543    /// it from being assigned a slot on this chain for a certain amount of time.
544    ///
545    /// Has no effect if the peer isn't assigned to the given chain.
546    ///
547    /// If the peer was already banned, the new ban expiration is `max(existing_ban, when_unban)`.
548    ///
549    /// Returns what this function did.
550    pub fn unassign_slot_and_ban(
551        &mut self,
552        chain: &TChainId,
553        peer_id: &PeerId,
554        when_unban: TInstant,
555    ) -> UnassignSlotAndBan<TInstant> {
556        let (Some(&peer_id_index), Some(&chain_index)) = (
557            self.peer_ids_indices.get(peer_id),
558            self.chains_indices.get(chain),
559        ) else {
560            return UnassignSlotAndBan::NotAssigned;
561        };
562
563        if let Some(state) = self.peers_chains.get_mut(&(peer_id_index, chain_index)) {
564            let return_value = match state {
565                PeerChainState::Banned { expires } if *expires >= when_unban => {
566                    // Ban is already long enough. Nothing to do.
567                    return UnassignSlotAndBan::AlreadyBanned {
568                        when_unban: expires.clone(),
569                        ban_extended: false,
570                    };
571                }
572                PeerChainState::Banned { .. } => UnassignSlotAndBan::AlreadyBanned {
573                    when_unban: when_unban.clone(),
574                    ban_extended: true,
575                },
576                PeerChainState::Assignable => UnassignSlotAndBan::Banned { had_slot: false },
577                PeerChainState::Slot => UnassignSlotAndBan::Banned { had_slot: true },
578            };
579
580            let _was_in =
581                self.peers_chains_by_state
582                    .remove(&(chain_index, state.clone(), peer_id_index));
583            debug_assert!(_was_in);
584
585            *state = PeerChainState::Banned {
586                expires: when_unban,
587            };
588
589            let _was_inserted =
590                self.peers_chains_by_state
591                    .insert((chain_index, state.clone(), peer_id_index));
592            debug_assert!(_was_inserted);
593
594            return_value
595        } else {
596            UnassignSlotAndBan::NotAssigned
597        }
598    }
599
600    /// Unassigns all the slots that have been assigned to the given peer and bans the peer,
601    /// preventing it from being assigned a slot for all of the chains it had a slot on for a
602    /// certain amount of time.
603    ///
604    /// Has no effect on chains the peer isn't assigned to.
605    ///
606    /// If the peer was already banned, the new ban expiration is `max(existing_ban, when_unban)`.
607    ///
608    /// Returns an iterator to the list of chains where the peer is now banned, and the details
609    /// of what has happened.
610    ///
611    /// > **Note**: This function is a shortcut for calling
612    /// >           [`BasicPeeringStrategy::unassign_slot_and_ban`] for all existing chains.
613    pub fn unassign_slots_and_ban(
614        &'_ mut self,
615        peer_id: &PeerId,
616        when_unban: TInstant,
617    ) -> UnassignSlotsAndBanIter<'_, TChainId, TInstant> {
618        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
619            return UnassignSlotsAndBanIter {
620                chains: &self.chains,
621                peers_chains_by_state: &mut self.peers_chains_by_state,
622                inner_iter: None,
623                peer_id_index: 0,
624                when_unban,
625            };
626        };
627
628        UnassignSlotsAndBanIter {
629            chains: &self.chains,
630            peers_chains_by_state: &mut self.peers_chains_by_state,
631            inner_iter: Some(
632                self.peers_chains
633                    .range_mut((peer_id_index, usize::MIN)..=(peer_id_index, usize::MAX))
634                    .fuse(),
635            ),
636            peer_id_index,
637            when_unban,
638        }
639    }
640
641    /// Picks an address from the list with zero connections, and sets the number of connections
642    /// to one. Returns `None` if no such address is available.
643    pub fn pick_address_and_add_connection(&mut self, peer_id: &PeerId) -> Option<&[u8]> {
644        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
645            // If the `PeerId` is unknown, it means it doesn't have any address.
646            return None;
647        };
648
649        // TODO: could be optimized further by removing filter() and adjusting the set
650        if let Some(((_, address), num_connections)) = self
651            .addresses
652            .range_mut((peer_id_index, Vec::new())..(peer_id_index + 1, Vec::new()))
653            .filter(|(_, num_connections)| **num_connections == 0)
654            .choose(&mut self.randomness)
655        {
656            *num_connections = 1;
657            return Some(address);
658        }
659
660        None
661    }
662
    /// Removes one connection from the given address.
    ///
    /// The address is kept in the collection even if its number of connections reaches zero.
    /// See also [`BasicPeeringStrategy::decrease_address_connections_and_remove_if_zero`].
    ///
    /// Returns an error if the address isn't known to the data structure, or if there was no
    /// connection.
    pub fn decrease_address_connections(
        &mut self,
        peer_id: &PeerId,
        address: &[u8],
    ) -> Result<(), DecreaseAddressConnectionsError> {
        // `false`: don't remove the address when its number of connections reaches zero.
        self.decrease_address_connections_inner(peer_id, address, false)
    }
674
    /// Removes one connection from the given address. If this decreases the number of connections
    /// from one to zero, the address is removed entirely.
    ///
    /// See also [`BasicPeeringStrategy::decrease_address_connections`], which keeps the address.
    ///
    /// Returns an error if the address isn't known to the data structure, or if there was no
    /// connection.
    pub fn decrease_address_connections_and_remove_if_zero(
        &mut self,
        peer_id: &PeerId,
        address: &[u8],
    ) -> Result<(), DecreaseAddressConnectionsError> {
        // `true`: remove the address when its number of connections reaches zero.
        self.decrease_address_connections_inner(peer_id, address, true)
    }
687
688    fn decrease_address_connections_inner(
689        &mut self,
690        peer_id: &PeerId,
691        address: &[u8],
692        remove_if_reaches_zero: bool,
693    ) -> Result<(), DecreaseAddressConnectionsError> {
694        let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
695            // If the `PeerId` is unknown, it means it doesn't have any address.
696            return Err(DecreaseAddressConnectionsError::UnknownAddress);
697        };
698
699        let Some(num_connections) = self.addresses.get_mut(&(peer_id_index, address.to_owned()))
700        else {
701            return Err(DecreaseAddressConnectionsError::UnknownAddress);
702        };
703
704        if *num_connections == 0 {
705            return Err(DecreaseAddressConnectionsError::NotConnected);
706        }
707
708        *num_connections -= 1;
709
710        if *num_connections != 0 {
711            return Ok(());
712        }
713
714        if remove_if_reaches_zero {
715            self.addresses.remove(&(peer_id_index, address.to_owned()));
716        }
717
718        self.try_clean_up_peer_id(peer_id_index);
719        Ok(())
720    }
721
722    /// Finds the index of the given `TChainId` in [`BasicPeeringStrategy::chains`], or inserts
723    /// one if there is none.
724    fn get_or_insert_chain_index(&mut self, chain: &TChainId) -> usize {
725        debug_assert_eq!(self.chains.len(), self.chains_indices.len());
726
727        match self.chains_indices.raw_entry_mut().from_key(chain) {
728            hashbrown::hash_map::RawEntryMut::Occupied(occupied_entry) => *occupied_entry.get(),
729            hashbrown::hash_map::RawEntryMut::Vacant(vacant_entry) => {
730                let idx = self.chains.insert(chain.clone());
731                vacant_entry.insert(chain.clone(), idx);
732                idx
733            }
734        }
735    }
736
737    /// Check if the given `TChainId` is still used within the collection. If no, removes it from
738    /// [`BasicPeeringStrategy::chains`].
739    fn try_clean_up_chain(&mut self, chain_index: usize) {
740        if self
741            .peers_chains_by_state
742            .range(
743                (chain_index, PeerChainState::Assignable, usize::MIN)
744                    ..=(chain_index, PeerChainState::Slot, usize::MAX),
745            )
746            .next()
747            .is_some()
748        {
749            return;
750        }
751
752        // Chain is unused. We can remove it.
753        let chain_id = self.chains.remove(chain_index);
754        let _was_in = self.chains_indices.remove(&chain_id);
755        debug_assert_eq!(_was_in, Some(chain_index));
756    }
757
758    /// Finds the index of the given [`PeerId`] in [`BasicPeeringStrategy::peer_ids`], or inserts
759    /// one if there is none.
760    fn get_or_insert_peer_index(&mut self, peer_id: &PeerId) -> usize {
761        debug_assert_eq!(self.peer_ids.len(), self.peer_ids_indices.len());
762
763        match self.peer_ids_indices.raw_entry_mut().from_key(peer_id) {
764            hashbrown::hash_map::RawEntryMut::Occupied(occupied_entry) => *occupied_entry.get(),
765            hashbrown::hash_map::RawEntryMut::Vacant(vacant_entry) => {
766                let idx = self.peer_ids.insert(peer_id.clone());
767                vacant_entry.insert(peer_id.clone(), idx);
768                idx
769            }
770        }
771    }
772
773    /// Check if the given [`PeerId`] is still used within the collection. If no, removes it from
774    /// [`BasicPeeringStrategy::peer_ids`].
775    fn try_clean_up_peer_id(&mut self, peer_id_index: usize) {
776        if self
777            .peers_chains
778            .range((peer_id_index, usize::MIN)..=(peer_id_index, usize::MAX))
779            .next()
780            .is_some()
781        {
782            return;
783        }
784
785        if self
786            .addresses
787            .range((peer_id_index, Vec::new())..(peer_id_index + 1, Vec::new()))
788            .any(|(_, num_connections)| *num_connections >= 1)
789        {
790            return;
791        }
792
793        // PeerId is unused. We can remove it.
794        let peer_id = self.peer_ids.remove(peer_id_index);
795        let _was_in = self.peer_ids_indices.remove(&peer_id);
796        debug_assert_eq!(_was_in, Some(peer_id_index));
797        for address in self
798            .addresses
799            .range((peer_id_index, Vec::new())..(peer_id_index + 1, Vec::new()))
800            .map(|((_, a), _)| a.clone())
801            .collect::<Vec<_>>()
802        {
803            let _was_removed = self.addresses.remove(&(peer_id_index, address));
804            debug_assert!(_was_removed.is_some());
805        }
806    }
807}
808
809/// See [`BasicPeeringStrategy::decrease_address_connections`].
810#[derive(Debug, derive_more::Display, derive_more::Error)]
811pub enum DecreaseAddressConnectionsError {
812    /// Address isn't known to the collection.
813    UnknownAddress,
814    /// The address didn't have any connection.
815    NotConnected,
816}
817
818/// See [`BasicPeeringStrategy::pick_assignable_peer`].
819pub enum AssignablePeer<'a, TInstant> {
820    /// An assignal peer was found. Note that the peer wasn't assigned yet.
821    Assignable(&'a PeerId),
822    /// No peer was found as all known un-assigned peers are currently in the "banned" state.
823    AllPeersBanned {
824        /// Instant when the first peer will be unbanned.
825        next_unban: &'a TInstant,
826    },
827    /// No un-assigned peer was found.
828    NoPeer,
829}
830
831/// See [`BasicPeeringStrategy::insert_chain_peer`].
832pub enum InsertChainPeerResult {
833    /// Peer-chain association has been successfully inserted.
834    Inserted {
835        /// If the maximum number of peers is reached, an old peer might have been removed. If so,
836        /// this contains the peer.
837        peer_removed: Option<PeerId>,
838    },
839    /// Peer-chain association was already inserted.
840    Duplicate,
841}
842
/// See [`BasicPeeringStrategy::insert_address`].
pub enum InsertAddressResult {
    /// Address has been successfully inserted.
    Inserted {
        /// If the maximum number of addresses is reached, an old address might have been
        /// removed in order to make room. If so, this contains the address that was removed.
        address_removed: Option<Vec<u8>>,
    },
    /// Address was already known.
    AlreadyKnown,
    /// The peer isn't associated to any chain, and as such the address was not inserted.
    UnknownPeer,
}
856
/// See [`BasicPeeringStrategy::increase_address_connections`].
pub enum InsertAddressConnectionsResult {
    /// Address has been inserted.
    Inserted {
        /// If the maximum number of addresses is reached, an old address might have been
        /// removed in order to make room. If so, this contains the address that was removed.
        address_removed: Option<Vec<u8>>,
    },
    /// Address was already known.
    AlreadyKnown,
}
868
/// See [`BasicPeeringStrategy::unassign_slot_and_ban`].
pub enum UnassignSlotAndBan<TInstant> {
    /// Peer wasn't assigned to the given chain.
    NotAssigned,
    /// Peer was already banned.
    AlreadyBanned {
        /// When the peer is unbanned.
        when_unban: TInstant,
        /// `true` if the ban has been extended, in other words if the requested unban instant
        /// was later than the expiration of the existing ban.
        ban_extended: bool,
    },
    /// Peer wasn't banned and is now banned.
    Banned {
        /// `true` if the peer had a slot on the chain.
        had_slot: bool,
    },
}

impl<TInstant> UnassignSlotAndBan<TInstant> {
    /// Returns `true` for [`UnassignSlotAndBan::Banned`] where `had_slot` is `true`.
    ///
    /// Returns `false` for every other variant, including [`UnassignSlotAndBan::Banned`] where
    /// `had_slot` is `false`.
    pub fn had_slot(&self) -> bool {
        matches!(self, UnassignSlotAndBan::Banned { had_slot: true })
    }
}
894
/// See [`BasicPeeringStrategy::unassign_slot_and_remove_chain_peer`].
pub enum UnassignSlotAndRemoveChainPeer<TInstant> {
    /// Peer wasn't assigned to the given chain.
    NotAssigned,
    /// Peer was assigned to the given chain but didn't have a slot. It might have been banned.
    Assigned {
        /// `Some` if the peer was banned. Contains the ban expiration.
        ban_expiration: Option<TInstant>,
    },
    /// Peer was assigned to the given chain and had a slot.
    HadSlot,
}
907
908/// See [`BasicPeeringStrategy::unassign_slots_and_ban`].
909pub struct UnassignSlotsAndBanIter<'a, TChainId, TInstant>
910where
911    TInstant: PartialOrd + Ord + Eq + Clone,
912{
913    /// Same field as in [`BasicPeeringStrategy`].
914    chains: &'a slab::Slab<TChainId>,
915    /// Same field as in [`BasicPeeringStrategy`].
916    peers_chains_by_state: &'a mut BTreeSet<(usize, PeerChainState<TInstant>, usize)>,
917    /// Iterator within [`BasicPeeringStrategy::peers_chains`].
918    inner_iter:
919        Option<iter::Fuse<btree_map::RangeMut<'a, (usize, usize), PeerChainState<TInstant>>>>,
920    /// Parameter passed to [`BasicPeeringStrategy::unassign_slots_and_ban`]. Dummy value when
921    /// [`UnassignSlotsAndBanIter::inner_iter`] is `None`.
922    peer_id_index: usize,
923    /// Parameter passed to [`BasicPeeringStrategy::unassign_slots_and_ban`].
924    when_unban: TInstant,
925}
926
/// See [`BasicPeeringStrategy::unassign_slots_and_ban`].
pub enum UnassignSlotsAndBan<TInstant> {
    /// Peer was already banned.
    AlreadyBanned {
        /// When the peer is unbanned. This is the expiration of the existing ban if
        /// `ban_extended` is `false`, and the newly-requested unban instant otherwise.
        when_unban: TInstant,
        /// `true` if the ban has been extended, in other words if the requested unban instant
        /// was later than the expiration of the existing ban.
        ban_extended: bool,
    },
    /// Peer wasn't banned and is now banned.
    Banned {
        /// `true` if the peer had a slot on the chain.
        had_slot: bool,
    },
}
943
944impl<'a, TChainId, TInstant> Iterator for UnassignSlotsAndBanIter<'a, TChainId, TInstant>
945where
946    TInstant: PartialOrd + Ord + Eq + Clone,
947{
948    type Item = (&'a TChainId, UnassignSlotsAndBan<TInstant>);
949
950    fn next(&mut self) -> Option<Self::Item> {
951        let inner_iter = self.inner_iter.as_mut()?;
952        let (&(_, chain_index), state) = inner_iter.next()?;
953
954        let return_value = match state {
955            PeerChainState::Banned { expires } if *expires >= self.when_unban => {
956                // Ban is already long enough. Nothing to do.
957                return Some((
958                    &self.chains[chain_index],
959                    UnassignSlotsAndBan::AlreadyBanned {
960                        when_unban: expires.clone(),
961                        ban_extended: false,
962                    },
963                ));
964            }
965            PeerChainState::Banned { .. } => UnassignSlotsAndBan::AlreadyBanned {
966                when_unban: self.when_unban.clone(),
967                ban_extended: true,
968            },
969            PeerChainState::Assignable => UnassignSlotsAndBan::Banned { had_slot: false },
970            PeerChainState::Slot => UnassignSlotsAndBan::Banned { had_slot: true },
971        };
972
973        let _was_in =
974            self.peers_chains_by_state
975                .remove(&(chain_index, state.clone(), self.peer_id_index));
976        debug_assert!(_was_in);
977
978        *state = PeerChainState::Banned {
979            expires: self.when_unban.clone(),
980        };
981
982        let _was_inserted =
983            self.peers_chains_by_state
984                .insert((chain_index, state.clone(), self.peer_id_index));
985        debug_assert!(_was_inserted);
986
987        Some((&self.chains[chain_index], return_value))
988    }
989
990    fn size_hint(&self) -> (usize, Option<usize>) {
991        self.inner_iter
992            .as_ref()
993            .map_or((0, Some(0)), |inner| inner.size_hint())
994    }
995}
996
997impl<'a, TChainId, TInstant> iter::FusedIterator for UnassignSlotsAndBanIter<'a, TChainId, TInstant> where
998    TInstant: PartialOrd + Ord + Eq + Clone
999{
1000}
1001
1002impl<'a, TChainId, TInstant> Drop for UnassignSlotsAndBanIter<'a, TChainId, TInstant>
1003where
1004    TInstant: PartialOrd + Ord + Eq + Clone,
1005{
1006    fn drop(&mut self) {
1007        // Note that this is safe because `UnassignSlotsAndBanIter` is a `FusedIterator`.
1008        while let Some(_) = self.next() {}
1009    }
1010}
1011
#[cfg(test)]
mod tests {
    use super::{
        BasicPeeringStrategy, Config, InsertAddressConnectionsResult, InsertAddressResult,
        InsertChainPeerResult,
    };
    use crate::network::service::{PeerId, peer_id::PublicKey};
    use core::time::Duration;

    /// `Assignable` must be ordered before any `Banned`, bans must be ordered by expiration,
    /// and `Slot` must be ordered after any `Banned`.
    #[test]
    fn peer_state_ordering() {
        // The implementation above relies on the properties tested here.
        use super::PeerChainState;
        assert!(PeerChainState::Assignable < PeerChainState::Banned { expires: 0 });
        assert!(PeerChainState::Banned { expires: 5 } < PeerChainState::Banned { expires: 7 });
        assert!(PeerChainState::Banned { expires: u32::MAX } < PeerChainState::Slot);
    }

    /// Removing the last chain association of a peer must also remove its addresses if none
    /// of them is connected.
    #[test]
    fn addresses_removed_when_peer_has_no_chain_association() {
        let mut bps = BasicPeeringStrategy::<u32, Duration>::new(Config {
            randomness_seed: [0; 32],
            peers_capacity: 0,
            chains_capacity: 0,
        });

        let peer_id = PeerId::from_public_key(&PublicKey::Ed25519([0; 32]));

        assert!(matches!(
            bps.insert_chain_peer(0, peer_id.clone(), usize::MAX),
            InsertChainPeerResult::Inserted { peer_removed: None }
        ));

        assert!(matches!(
            bps.insert_address(&peer_id, Vec::new(), usize::MAX),
            InsertAddressResult::Inserted {
                address_removed: None
            }
        ));

        assert_eq!(bps.peer_addresses(&peer_id).count(), 1);
        bps.unassign_slot_and_remove_chain_peer(&0, &peer_id);
        assert_eq!(bps.peer_addresses(&peer_id).count(), 0);
    }

    /// As long as one address of a peer is connected, all the addresses of the peer must be
    /// kept even if the peer has no chain association anymore. They must be removed once the
    /// last connection is gone.
    #[test]
    fn addresses_not_removed_if_connected_when_peer_has_no_chain_association() {
        let mut bps = BasicPeeringStrategy::<u32, Duration>::new(Config {
            randomness_seed: [0; 32],
            peers_capacity: 0,
            chains_capacity: 0,
        });

        let peer_id = PeerId::from_public_key(&PublicKey::Ed25519([0; 32]));

        assert!(matches!(
            bps.insert_chain_peer(0, peer_id.clone(), usize::MAX),
            InsertChainPeerResult::Inserted { peer_removed: None }
        ));

        assert!(matches!(
            bps.increase_address_connections(&peer_id, Vec::new(), usize::MAX),
            InsertAddressConnectionsResult::Inserted {
                address_removed: None
            }
        ));

        assert!(matches!(
            bps.insert_address(&peer_id, vec![1], usize::MAX),
            InsertAddressResult::Inserted {
                address_removed: None
            }
        ));

        assert_eq!(bps.peer_addresses(&peer_id).count(), 2);
        bps.unassign_slot_and_remove_chain_peer(&0, &peer_id);
        assert_eq!(bps.peer_addresses(&peer_id).count(), 2);

        bps.decrease_address_connections(&peer_id, &[]).unwrap();
        assert_eq!(bps.peer_addresses(&peer_id).count(), 0);
    }

    /// Inserting an address for a peer that has no chain association must be refused.
    #[test]
    fn address_not_inserted_when_peer_has_no_chain_association() {
        let mut bps = BasicPeeringStrategy::<u32, Duration>::new(Config {
            randomness_seed: [0; 32],
            peers_capacity: 0,
            chains_capacity: 0,
        });

        let peer_id = PeerId::from_public_key(&PublicKey::Ed25519([0; 32]));

        assert!(matches!(
            bps.insert_address(&peer_id, Vec::new(), usize::MAX),
            InsertAddressResult::UnknownPeer
        ));

        assert_eq!(bps.peer_addresses(&peer_id).count(), 0);
    }

    /// Contrary to a plain address insertion, increasing the number of connections of an
    /// address must insert the address even if the peer has no chain association.
    #[test]
    fn address_connections_inserted_when_peer_has_no_chain_association() {
        let mut bps = BasicPeeringStrategy::<u32, Duration>::new(Config {
            randomness_seed: [0; 32],
            peers_capacity: 0,
            chains_capacity: 0,
        });

        let peer_id = PeerId::from_public_key(&PublicKey::Ed25519([0; 32]));

        assert!(matches!(
            bps.increase_address_connections(&peer_id, Vec::new(), usize::MAX),
            InsertAddressConnectionsResult::Inserted { .. }
        ));

        assert_eq!(bps.peer_addresses(&peer_id).count(), 1);
    }

    // TODO: more tests
}