use arrayvec::ArrayVec;
use bigint::{U512, U256};
use crate::kad_hash::KadHash;
use libp2p_core::PeerId;
use multihash::Multihash;
use std::num::NonZeroUsize;
use std::slice::IterMut as SliceIterMut;
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
pub const MAX_NODES_PER_BUCKET: usize = 20;
#[derive(Debug, Clone)]
pub struct KBucketsTable<TPeerId, TVal> {
my_id: TPeerId,
tables: Vec<KBucket<TPeerId, TVal>>,
unresponsive_timeout: Duration,
}
#[derive(Debug, Clone)]
struct KBucket<TPeerId, TVal> {
nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,
first_connected_pos: usize,
pending_node: Option<PendingNode<TPeerId, TVal>>,
}
#[derive(Debug, Clone)]
struct PendingNode<TPeerId, TVal> {
node: Node<TPeerId, TVal>,
connected: bool,
replace: Instant,
}
#[derive(Debug, Clone)]
struct Node<TPeerId, TVal> {
id: TPeerId,
value: TVal,
}
pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> {
fn distance_with(&self, other: &TOther) -> u32;
fn max_distance() -> NonZeroUsize;
}
impl KBucketsPeerId for PeerId {
fn distance_with(&self, other: &Self) -> u32 {
<Multihash as KBucketsPeerId<Multihash>>::distance_with(self.as_ref(), other.as_ref())
}
fn max_distance() -> NonZeroUsize {
<Multihash as KBucketsPeerId>::max_distance()
}
}
impl KBucketsPeerId<PeerId> for Multihash {
fn distance_with(&self, other: &PeerId) -> u32 {
<Multihash as KBucketsPeerId<Multihash>>::distance_with(self, other.as_ref())
}
fn max_distance() -> NonZeroUsize {
<PeerId as KBucketsPeerId>::max_distance()
}
}
impl KBucketsPeerId for KadHash {
fn distance_with(&self, other: &Self) -> u32 {
let my_hash = U256::from(self.hash());
let other_hash = U256::from(other.hash());
let xor = my_hash ^ other_hash;
256 - xor.leading_zeros()
}
fn max_distance() -> NonZeroUsize {
NonZeroUsize::new(256).expect("256 is not zero; QED")
}
}
impl KBucketsPeerId<KadHash> for Multihash {
fn distance_with(&self, other: &KadHash) -> u32 {
let my_hash = U512::from(self.digest());
let other_hash = U512::from(U256::from(other.hash()));
let xor = my_hash ^ other_hash;
512 - xor.leading_zeros()
}
fn max_distance() -> NonZeroUsize {
NonZeroUsize::new(512).expect("512 is not zero; QED")
}
}
impl KBucketsPeerId for Multihash {
fn distance_with(&self, other: &Self) -> u32 {
let my_hash = U512::from(self.digest());
let other_hash = U512::from(other.digest());
let xor = my_hash ^ other_hash;
512 - xor.leading_zeros()
}
fn max_distance() -> NonZeroUsize {
NonZeroUsize::new(512).expect("512 is not zero; QED")
}
}
impl<A, B> KBucketsPeerId for (A, B)
where
A: KBucketsPeerId + PartialEq,
B: KBucketsPeerId + PartialEq,
{
fn distance_with(&self, other: &(A, B)) -> u32 {
A::distance_with(&self.0, &other.0) + B::distance_with(&self.1, &other.1)
}
fn max_distance() -> NonZeroUsize {
let n = <A as KBucketsPeerId<A>>::max_distance().get()
.saturating_add(<B as KBucketsPeerId<B>>::max_distance().get());
NonZeroUsize::new(n).expect("Saturating-add of two non-zeros can't be zero; QED")
}
}
impl<'a, T> KBucketsPeerId for &'a T
where
T: KBucketsPeerId,
{
fn distance_with(&self, other: &&'a T) -> u32 {
T::distance_with(*self, *other)
}
fn max_distance() -> NonZeroUsize {
<T as KBucketsPeerId>::max_distance()
}
}
impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
KBucketsTable {
my_id,
tables: (0..TPeerId::max_distance().get())
.map(|_| KBucket {
nodes: ArrayVec::new(),
first_connected_pos: 0,
pending_node: None,
})
.collect(),
unresponsive_timeout,
}
}
pub fn my_id(&self) -> &TPeerId {
&self.my_id
}
fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
(self.my_id.distance_with(id) as usize).checked_sub(1)
}
pub fn entry<'a>(&'a mut self, peer_id: &'a TPeerId) -> Entry<'a, TPeerId, TVal> {
let bucket_num = if let Some(num) = self.bucket_num(peer_id) {
num
} else {
return Entry::SelfEntry;
};
if let Some(pending) = self.tables[bucket_num].pending_node.take() {
if pending.replace < Instant::now() {
let table = &mut self.tables[bucket_num];
let first_connected_pos = &mut table.first_connected_pos;
debug_assert!(*first_connected_pos >= 1);
table.nodes.remove(0);
if pending.connected {
*first_connected_pos -= 1;
table.nodes.insert(*first_connected_pos, pending.node);
} else {
table.nodes.insert(*first_connected_pos - 1, pending.node);
}
} else {
self.tables[bucket_num].pending_node = Some(pending);
}
}
if let Some(pos) = self.tables[bucket_num].nodes.iter().position(|p| p.id == *peer_id) {
if pos >= self.tables[bucket_num].first_connected_pos {
Entry::InKbucketConnected(EntryInKbucketConn {
parent: self,
peer_id,
})
} else {
Entry::InKbucketDisconnected(EntryInKbucketDisc {
parent: self,
peer_id,
})
}
} else if self.tables[bucket_num].pending_node.as_ref().map(|p| p.node.id == *peer_id).unwrap_or(false) {
if self.tables[bucket_num].pending_node.as_ref().map(|p| p.connected).unwrap_or(false) {
Entry::InKbucketConnectedPending(EntryInKbucketConnPending {
parent: self,
peer_id,
})
} else {
Entry::InKbucketDisconnectedPending(EntryInKbucketDiscPending {
parent: self,
peer_id,
})
}
} else {
Entry::NotInKbucket(EntryNotInKbucket {
parent: self,
peer_id,
})
}
}
pub fn entries_not_pending(&self) -> impl Iterator<Item = (&TPeerId, &TVal)> {
self.tables
.iter()
.flat_map(|table| table.nodes.iter())
.map(|node| (&node.id, &node.value))
}
pub fn buckets(&mut self) -> BucketsIter<'_, TPeerId, TVal> {
BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout)
}
pub fn find_closest(&mut self, id: &impl KBucketsPeerId<TPeerId>) -> VecIntoIter<TPeerId> {
let mut out = Vec::new();
for table in self.tables.iter_mut() {
for node in table.nodes.iter() {
out.push(node.id.clone());
}
if let Some(ref pending) = table.pending_node {
if pending.replace <= Instant::now() && pending.connected {
out.pop();
out.push(pending.node.id.clone());
}
}
}
out.sort_by(|a, b| id.distance_with(a).cmp(&id.distance_with(b)));
out.into_iter()
}
}
pub enum Entry<'a, TPeerId, TVal> {
InKbucketConnected(EntryInKbucketConn<'a, TPeerId, TVal>),
InKbucketConnectedPending(EntryInKbucketConnPending<'a, TPeerId, TVal>),
InKbucketDisconnected(EntryInKbucketDisc<'a, TPeerId, TVal>),
InKbucketDisconnectedPending(EntryInKbucketDiscPending<'a, TPeerId, TVal>),
NotInKbucket(EntryNotInKbucket<'a, TPeerId, TVal>),
SelfEntry,
}
impl<'a, TPeerId, TVal> Entry<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn value(&mut self) -> Option<&mut TVal> {
match self {
Entry::InKbucketConnected(entry) => Some(entry.value()),
Entry::InKbucketConnectedPending(entry) => Some(entry.value()),
Entry::InKbucketDisconnected(entry) => Some(entry.value()),
Entry::InKbucketDisconnectedPending(entry) => Some(entry.value()),
Entry::NotInKbucket(_entry) => None,
Entry::SelfEntry => None,
}
}
pub fn value_not_pending(&mut self) -> Option<&mut TVal> {
match self {
Entry::InKbucketConnected(entry) => Some(entry.value()),
Entry::InKbucketConnectedPending(_entry) => None,
Entry::InKbucketDisconnected(entry) => Some(entry.value()),
Entry::InKbucketDisconnectedPending(_entry) => None,
Entry::NotInKbucket(_entry) => None,
Entry::SelfEntry => None,
}
}
}
pub struct EntryInKbucketConn<'a, TPeerId, TVal> {
parent: &'a mut KBucketsTable<TPeerId, TVal>,
peer_id: &'a TPeerId,
}
impl<'a, TPeerId, TVal> EntryInKbucketConn<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn value(&mut self) -> &mut TVal {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketConn if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let peer_id = self.peer_id;
&mut table.nodes.iter_mut()
.find(move |p| p.id == *peer_id)
.expect("We can only build a EntryInKbucketConn if we know that the peer is in its \
bucket; QED")
.value
}
pub fn set_disconnected(self) -> SetDisconnectedOutcome<'a, TPeerId, TVal> {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketConn if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let peer_id = self.peer_id;
let pos = table.nodes.iter().position(move |elem| elem.id == *peer_id)
.expect("we can only build a EntryInKbucketConn if the node is in its bucket; QED");
debug_assert!(table.first_connected_pos <= pos);
if let Some(pending) = table.pending_node.take() {
if pending.connected {
let removed = table.nodes.remove(pos);
let ret = SetDisconnectedOutcome::Replaced {
replacement: pending.node.id.clone(),
old_val: removed.value,
};
table.nodes.insert(table.first_connected_pos, pending.node);
return ret;
} else {
table.pending_node = Some(pending);
}
}
if pos != table.first_connected_pos {
let elem = table.nodes.remove(pos);
table.nodes.insert(table.first_connected_pos, elem);
}
table.first_connected_pos += 1;
debug_assert!(table.nodes.iter()
.position(move |e| e.id == *peer_id)
.map(|p| p < table.first_connected_pos)
.unwrap_or(false));
SetDisconnectedOutcome::Kept(EntryInKbucketDisc {
parent: self.parent,
peer_id: self.peer_id,
})
}
}
#[must_use]
pub enum SetDisconnectedOutcome<'a, TPeerId, TVal> {
Kept(EntryInKbucketDisc<'a, TPeerId, TVal>),
Replaced {
replacement: TPeerId,
old_val: TVal,
},
}
pub struct EntryInKbucketConnPending<'a, TPeerId, TVal> {
parent: &'a mut KBucketsTable<TPeerId, TVal>,
peer_id: &'a TPeerId,
}
impl<'a, TPeerId, TVal> EntryInKbucketConnPending<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn value(&mut self) -> &mut TVal {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED");
&mut self.parent.tables[num]
};
assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id));
&mut table.pending_node
.as_mut()
.expect("we can only build a EntryInKbucketConnPending if the node is pending; QED")
.node.value
}
pub fn set_disconnected(self) -> EntryInKbucketDiscPending<'a, TPeerId, TVal> {
{
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketConnPending if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let mut pending = table.pending_node.as_mut()
.expect("we can only build a EntryInKbucketConnPending if there's a pending node; QED");
debug_assert!(pending.connected);
pending.connected = false;
}
EntryInKbucketDiscPending {
parent: self.parent,
peer_id: self.peer_id,
}
}
}
pub struct EntryInKbucketDiscPending<'a, TPeerId, TVal> {
parent: &'a mut KBucketsTable<TPeerId, TVal>,
peer_id: &'a TPeerId,
}
impl<'a, TPeerId, TVal> EntryInKbucketDiscPending<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn value(&mut self) -> &mut TVal {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED");
&mut self.parent.tables[num]
};
assert!(table.pending_node.as_ref().map(|n| &n.node.id) == Some(self.peer_id));
&mut table.pending_node
.as_mut()
.expect("we can only build a EntryInKbucketDiscPending if the node is pending; QED")
.node.value
}
pub fn set_connected(self) -> EntryInKbucketConnPending<'a, TPeerId, TVal> {
{
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketDiscPending if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let mut pending = table.pending_node.as_mut()
.expect("we can only build a EntryInKbucketDiscPending if there's a pending node; QED");
debug_assert!(!pending.connected);
pending.connected = true;
}
EntryInKbucketConnPending {
parent: self.parent,
peer_id: self.peer_id,
}
}
}
pub struct EntryInKbucketDisc<'a, TPeerId, TVal> {
parent: &'a mut KBucketsTable<TPeerId, TVal>,
peer_id: &'a TPeerId,
}
impl<'a, TPeerId, TVal> EntryInKbucketDisc<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn value(&mut self) -> &mut TVal {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let peer_id = self.peer_id;
&mut table.nodes.iter_mut()
.find(move |p| p.id == *peer_id)
.expect("We can only build a EntryInKbucketDisc if we know that the peer is in its \
bucket; QED")
.value
}
pub fn set_connected(self) -> EntryInKbucketConn<'a, TPeerId, TVal> {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryInKbucketDisc if we know of a bucket; QED");
&mut self.parent.tables[num]
};
let pos = {
let peer_id = self.peer_id;
table.nodes.iter().position(move |p| p.id == *peer_id)
.expect("We can only build a EntryInKbucketDisc if we know that the peer is in \
its bucket; QED")
};
if pos == 0 {
table.pending_node = None;
}
debug_assert!(pos < table.first_connected_pos);
table.first_connected_pos -= 1;
if pos != table.first_connected_pos {
let entry = table.nodes.remove(pos);
table.nodes.insert(table.first_connected_pos, entry);
}
debug_assert!(!(table.first_connected_pos == 0 && table.pending_node.is_some()));
EntryInKbucketConn {
parent: self.parent,
peer_id: self.peer_id,
}
}
}
pub struct EntryNotInKbucket<'a, TPeerId, TVal> {
parent: &'a mut KBucketsTable<TPeerId, TVal>,
peer_id: &'a TPeerId,
}
impl<'a, TPeerId, TVal> EntryNotInKbucket<'a, TPeerId, TVal>
where
TPeerId: KBucketsPeerId + Clone,
{
pub fn insert_connected(self, value: TVal) -> InsertOutcome<TPeerId> {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryNotInKbucket if we know of a bucket; QED");
&mut self.parent.tables[num]
};
if table.nodes.is_full() {
if table.first_connected_pos == 0 || table.pending_node.is_some() {
InsertOutcome::Full
} else {
table.pending_node = Some(PendingNode {
node: Node { id: self.peer_id.clone(), value },
replace: Instant::now() + self.parent.unresponsive_timeout,
connected: true,
});
InsertOutcome::Pending {
to_ping: table.nodes[0].id.clone()
}
}
} else {
table.nodes.insert(table.first_connected_pos, Node {
id: self.peer_id.clone(),
value,
});
InsertOutcome::Inserted
}
}
pub fn insert_disconnected(self, value: TVal) -> InsertOutcome<TPeerId> {
let table = {
let num = self.parent.bucket_num(&self.peer_id)
.expect("we can only build a EntryNotInKbucket if we know of a bucket; QED");
&mut self.parent.tables[num]
};
if table.nodes.is_full() {
InsertOutcome::Full
} else {
table.nodes.insert(table.first_connected_pos, Node {
id: self.peer_id.clone(),
value,
});
table.first_connected_pos += 1;
InsertOutcome::Inserted
}
}
}
#[must_use]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum InsertOutcome<TPeerId> {
Inserted,
Pending {
to_ping: TPeerId,
},
Full,
}
pub struct BucketsIter<'a, TPeerId, TVal>(SliceIterMut<'a, KBucket<TPeerId, TVal>>, Duration);
impl<'a, TPeerId, TVal> Iterator for BucketsIter<'a, TPeerId, TVal> {
type Item = Bucket<'a, TPeerId, TVal>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|bucket| {
Bucket(bucket)
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}
impl<'a, TPeerId, TVal> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {}
pub struct Bucket<'a, TPeerId, TVal>(&'a mut KBucket<TPeerId, TVal>);
impl<'a, TPeerId, TVal> Bucket<'a, TPeerId, TVal> {
pub fn num_entries(&self) -> usize {
self.0.nodes.len()
}
pub fn has_pending(&self) -> bool {
if let Some(ref node) = self.0.pending_node {
node.replace > Instant::now()
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use crate::kbucket::{Entry, InsertOutcome, KBucketsPeerId, KBucketsTable, MAX_NODES_PER_BUCKET};
use multihash::{Multihash, Hash};
use std::thread;
use std::time::Duration;
#[test]
fn basic_closest() {
let my_id = Multihash::random(Hash::SHA2256);
let other_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
if let Entry::NotInKbucket(entry) = table.entry(&other_id) {
match entry.insert_connected(()) {
InsertOutcome::Inserted => (),
_ => panic!()
}
} else {
panic!()
}
let res = table.find_closest(&other_id).collect::<Vec<_>>();
assert_eq!(res.len(), 1);
assert_eq!(res[0], other_id);
}
#[test]
fn update_local_id_fails() {
let my_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
match table.entry(&my_id) {
Entry::SelfEntry => (),
_ => panic!(),
}
}
#[test]
fn full_kbucket() {
let my_id = Multihash::random(Hash::SHA2256);
assert!(MAX_NODES_PER_BUCKET <= 251);
let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3)
.map(|n| {
let mut id = my_id.clone().into_bytes();
id[2] ^= 0x80;
id[33] = id[33].wrapping_add(n as u8);
Multihash::from_bytes(id).unwrap()
})
.collect::<Vec<_>>();
let first_node = fill_ids[0].clone();
let second_node = fill_ids[1].clone();
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1));
for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
if let Entry::NotInKbucket(entry) = table.entry(&id) {
match entry.insert_disconnected(()) {
InsertOutcome::Inserted => (),
_ => panic!()
}
} else {
panic!()
}
assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
}
assert_eq!(
table.buckets().nth(255).unwrap().num_entries(),
MAX_NODES_PER_BUCKET
);
assert!(!table.buckets().nth(255).unwrap().has_pending());
if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
match entry.insert_connected(()) {
InsertOutcome::Pending { ref to_ping } if *to_ping == first_node => (),
_ => panic!()
}
} else {
panic!()
}
assert_eq!(
table.buckets().nth(255).unwrap().num_entries(),
MAX_NODES_PER_BUCKET
);
assert!(table.buckets().nth(255).unwrap().has_pending());
if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
match entry.insert_connected(()) {
InsertOutcome::Full => (),
_ => panic!()
}
} else {
panic!()
}
thread::sleep(Duration::from_secs(2));
assert!(!table.buckets().nth(255).unwrap().has_pending());
if let Entry::NotInKbucket(entry) = table.entry(&fill_ids.remove(0)) {
match entry.insert_connected(()) {
InsertOutcome::Pending { ref to_ping } if *to_ping == second_node => (),
_ => panic!()
}
} else {
panic!()
}
}
#[test]
fn self_distance_zero() {
let a = Multihash::random(Hash::SHA2256);
assert_eq!(a.distance_with(&a), 0);
}
#[test]
fn distance_correct_order() {
let a = Multihash::random(Hash::SHA2256);
let b = Multihash::random(Hash::SHA2256);
assert!(a.distance_with(&a) < b.distance_with(&a));
assert!(a.distance_with(&b) > b.distance_with(&b));
}
}