use crate::util;
use alloc::collections::BTreeSet;
use core::ops;
use rand::seq::IteratorRandom as _;
use rand_chacha::{
ChaCha20Rng,
rand_core::{RngCore as _, SeedableRng as _},
};
pub use crate::libp2p::PeerId;
#[derive(Debug)]
pub struct BitswapPeeringStrategy<TInstant> {
peer_ids: slab::Slab<PeerId>,
peer_ids_indices: hashbrown::HashMap<PeerId, usize, util::SipHasherBuild>,
peers: hashbrown::HashMap<usize, (PeerState<TInstant>, u32), fnv::FnvBuildHasher>,
peers_by_state: BTreeSet<(PeerState<TInstant>, usize)>,
randomness: ChaCha20Rng,
}
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
enum PeerState<TInstant> {
Assignable,
Banned { expires: TInstant },
Slot,
}
pub struct Config {
pub randomness_seed: [u8; 32],
pub peers_capacity: usize,
}
impl<TInstant> BitswapPeeringStrategy<TInstant>
where
TInstant: PartialOrd + Ord + Eq + Clone,
{
pub fn new(config: Config) -> Self {
let mut randomness = ChaCha20Rng::from_seed(config.randomness_seed);
BitswapPeeringStrategy {
peer_ids: slab::Slab::with_capacity(config.peers_capacity),
peer_ids_indices: hashbrown::HashMap::with_capacity_and_hasher(
config.peers_capacity,
util::SipHasherBuild::new({
let mut seed = [0; 16];
randomness.fill_bytes(&mut seed);
seed
}),
),
peers: hashbrown::HashMap::with_hasher(fnv::FnvBuildHasher::default()),
peers_by_state: BTreeSet::new(),
randomness,
}
}
pub fn increase_peer_connections(&mut self, peer_id: &PeerId) {
let peer_id_index = self.get_or_insert_peer_index(peer_id);
match self.peers.get_mut(&peer_id_index) {
Some((_, num_connections)) => {
*num_connections = num_connections
.checked_add(1)
.unwrap_or_else(|| panic!("overflow in number of connections"));
}
None => {
self.peers.insert(peer_id_index, (PeerState::Assignable, 1));
let _was_inserted = self
.peers_by_state
.insert((PeerState::Assignable, peer_id_index));
debug_assert!(_was_inserted);
}
}
}
pub fn decrease_peer_connections(
&mut self,
peer_id: &PeerId,
) -> Result<(), DecreasePeerConnectionsError> {
let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
return Err(DecreasePeerConnectionsError::UnknownPeer);
};
let (state, num_connections) = self
.peers
.get_mut(&peer_id_index)
.unwrap_or_else(|| unreachable!());
*num_connections -= 1;
if *num_connections == 0 {
let state = state.clone();
self.peers.remove(&peer_id_index);
let _was_removed = self.peers_by_state.remove(&(state, peer_id_index));
debug_assert!(_was_removed);
let peer_id = self.peer_ids.remove(peer_id_index);
let _was_in = self.peer_ids_indices.remove(&peer_id);
debug_assert_eq!(_was_in, Some(peer_id_index));
}
Ok(())
}
pub fn pick_assignable_peer(&mut self, now: &TInstant) -> AssignablePeer<'_, TInstant> {
if let Some((_, peer_id_index)) = self
.peers_by_state
.range(
(PeerState::Assignable, usize::MIN)
..=(
PeerState::Banned {
expires: now.clone(),
},
usize::MAX,
),
)
.choose(&mut self.randomness)
{
return AssignablePeer::Assignable(&self.peer_ids[*peer_id_index]);
}
if let Some((state, _)) = self
.peers_by_state
.range((
ops::Bound::Excluded((
PeerState::Banned {
expires: now.clone(),
},
usize::MAX,
)),
ops::Bound::Excluded((PeerState::Slot, usize::MIN)),
))
.next()
{
let PeerState::Banned { expires } = state else {
unreachable!()
};
AssignablePeer::AllPeersBanned {
next_unban: expires,
}
} else {
AssignablePeer::NoPeer
}
}
pub fn assign_slot(&mut self, peer_id: &PeerId) -> Result<(), AssignSlotError> {
let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
return Err(AssignSlotError::UnknownPeer);
};
let (state, _) = self
.peers
.get_mut(&peer_id_index)
.unwrap_or_else(|| unreachable!());
let _was_removed = self.peers_by_state.remove(&(state.clone(), peer_id_index));
debug_assert!(_was_removed);
*state = PeerState::Slot;
let _was_inserted = self.peers_by_state.insert((PeerState::Slot, peer_id_index));
debug_assert!(_was_inserted);
Ok(())
}
pub fn unassign_slot_and_ban(
&mut self,
peer_id: &PeerId,
when_unban: TInstant,
) -> UnassignSlotAndBan {
let Some(&peer_id_index) = self.peer_ids_indices.get(peer_id) else {
return UnassignSlotAndBan::UnknownPeer;
};
let (state, _) = self
.peers
.get_mut(&peer_id_index)
.unwrap_or_else(|| unreachable!());
let return_value = match state {
PeerState::Banned { expires } if *expires >= when_unban => {
return UnassignSlotAndBan::Banned { had_slot: false };
}
PeerState::Banned { .. } => UnassignSlotAndBan::Banned { had_slot: false },
PeerState::Assignable => UnassignSlotAndBan::Banned { had_slot: false },
PeerState::Slot => UnassignSlotAndBan::Banned { had_slot: true },
};
let _was_in = self.peers_by_state.remove(&(state.clone(), peer_id_index));
debug_assert!(_was_in);
*state = PeerState::Banned {
expires: when_unban,
};
let _was_inserted = self.peers_by_state.insert((state.clone(), peer_id_index));
debug_assert!(_was_inserted);
return_value
}
fn get_or_insert_peer_index(&mut self, peer_id: &PeerId) -> usize {
debug_assert_eq!(self.peer_ids.len(), self.peer_ids_indices.len());
match self.peer_ids_indices.raw_entry_mut().from_key(peer_id) {
hashbrown::hash_map::RawEntryMut::Occupied(occupied_entry) => *occupied_entry.get(),
hashbrown::hash_map::RawEntryMut::Vacant(vacant_entry) => {
let idx = self.peer_ids.insert(peer_id.clone());
vacant_entry.insert(peer_id.clone(), idx);
idx
}
}
}
}
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum DecreasePeerConnectionsError {
UnknownPeer,
}
pub enum AssignablePeer<'a, TInstant> {
Assignable(&'a PeerId),
AllPeersBanned {
next_unban: &'a TInstant,
},
NoPeer,
}
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum AssignSlotError {
UnknownPeer,
}
pub enum UnassignSlotAndBan {
UnknownPeer,
Banned {
had_slot: bool,
},
}
#[cfg(test)]
mod tests {
use super::*;
use crate::libp2p::peer_id::PublicKey;
fn make_peer(n: u8) -> PeerId {
PeerId::from_public_key(&PublicKey::Ed25519([n; 32]))
}
fn new_strategy() -> BitswapPeeringStrategy<u64> {
BitswapPeeringStrategy::new(Config {
randomness_seed: [0; 32],
peers_capacity: 0,
})
}
#[test]
fn new_strategy_is_empty() {
let mut s = new_strategy();
assert!(matches!(s.pick_assignable_peer(&0), AssignablePeer::NoPeer));
}
#[test]
fn increase_adds_peer() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
assert!(matches!(
s.pick_assignable_peer(&0),
AssignablePeer::Assignable(_)
));
}
#[test]
fn increase_twice_same_peer() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.increase_peer_connections(&peer);
s.decrease_peer_connections(&peer).unwrap();
assert!(matches!(
s.pick_assignable_peer(&0),
AssignablePeer::Assignable(_)
));
}
#[test]
fn decrease_unknown_peer_errors() {
let mut s = new_strategy();
let peer = make_peer(1);
assert!(matches!(
s.decrease_peer_connections(&peer),
Err(DecreasePeerConnectionsError::UnknownPeer)
));
}
#[test]
fn decrease_to_zero_removes_peer() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.decrease_peer_connections(&peer).unwrap();
assert!(matches!(s.pick_assignable_peer(&0), AssignablePeer::NoPeer));
}
#[test]
fn assign_slot() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.assign_slot(&peer).unwrap();
assert!(matches!(s.pick_assignable_peer(&0), AssignablePeer::NoPeer));
}
#[test]
fn assign_slot_unknown_peer_errors() {
let mut s = new_strategy();
let peer = make_peer(1);
assert!(matches!(
s.assign_slot(&peer),
Err(AssignSlotError::UnknownPeer)
));
}
#[test]
fn unassign_slot_and_ban_with_slot() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.assign_slot(&peer).unwrap();
assert!(matches!(
s.unassign_slot_and_ban(&peer, 100),
UnassignSlotAndBan::Banned { had_slot: true }
));
}
#[test]
fn ban_assignable_peer() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
assert!(matches!(
s.unassign_slot_and_ban(&peer, 100),
UnassignSlotAndBan::Banned { had_slot: false }
));
}
#[test]
fn ban_unknown_peer() {
let mut s = new_strategy();
let peer = make_peer(1);
assert!(matches!(
s.unassign_slot_and_ban(&peer, 100),
UnassignSlotAndBan::UnknownPeer
));
}
#[test]
fn ban_extension_keeps_longer_ban() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.unassign_slot_and_ban(&peer, 200);
s.unassign_slot_and_ban(&peer, 100);
assert!(matches!(
s.pick_assignable_peer(&150),
AssignablePeer::AllPeersBanned { .. }
));
}
#[test]
fn ban_extension_extends_shorter_ban() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.unassign_slot_and_ban(&peer, 100);
s.unassign_slot_and_ban(&peer, 200);
assert!(matches!(
s.pick_assignable_peer(&150),
AssignablePeer::AllPeersBanned { .. }
));
}
#[test]
fn banned_peer_not_picked() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.unassign_slot_and_ban(&peer, 100);
assert!(matches!(
s.pick_assignable_peer(&50),
AssignablePeer::AllPeersBanned { .. }
));
}
#[test]
fn banned_peer_picked_after_expiry() {
let mut s = new_strategy();
let peer = make_peer(1);
s.increase_peer_connections(&peer);
s.unassign_slot_and_ban(&peer, 100);
assert!(matches!(
s.pick_assignable_peer(&100),
AssignablePeer::Assignable(_)
));
}
#[test]
fn all_peers_banned_returns_next_unban() {
let mut s = new_strategy();
let peer1 = make_peer(1);
let peer2 = make_peer(2);
s.increase_peer_connections(&peer1);
s.increase_peer_connections(&peer2);
s.unassign_slot_and_ban(&peer1, 200);
s.unassign_slot_and_ban(&peer2, 300);
match s.pick_assignable_peer(&150) {
AssignablePeer::AllPeersBanned { next_unban } => {
assert_eq!(*next_unban, 200);
}
_ => panic!("expected AllPeersBanned"),
}
}
#[test]
fn multiple_peers_picks_only_assignable() {
let mut s = new_strategy();
let peer1 = make_peer(1);
let peer2 = make_peer(2);
let peer3 = make_peer(3);
s.increase_peer_connections(&peer1);
s.increase_peer_connections(&peer2);
s.increase_peer_connections(&peer3);
s.unassign_slot_and_ban(&peer1, 1000);
s.assign_slot(&peer2).unwrap();
match s.pick_assignable_peer(&0) {
AssignablePeer::Assignable(p) => assert_eq!(*p, peer3),
_ => panic!("expected Assignable(peer3)"),
}
}
#[test]
fn peer_state_ordering() {
assert!(PeerState::<u64>::Assignable < PeerState::Banned { expires: 0 });
assert!(PeerState::<u64>::Banned { expires: 5 } < PeerState::Banned { expires: 7 });
assert!(PeerState::<u64>::Banned { expires: u64::MAX } < PeerState::Slot);
}
}