use arrayvec::ArrayVec;
use bigint::U512;
use libp2p_core::PeerId;
use multihash::Multihash;
use std::slice::IterMut as SliceIterMut;
use std::time::{Duration, Instant};
use std::vec::IntoIter as VecIntoIter;
pub const MAX_NODES_PER_BUCKET: usize = 20;
#[derive(Debug, Clone)]
pub struct KBucketsTable<TPeerId, TVal> {
my_id: TPeerId,
tables: Vec<KBucket<TPeerId, TVal>>,
unresponsive_timeout: Duration,
}
#[derive(Debug, Clone)]
struct KBucket<TPeerId, TVal> {
nodes: ArrayVec<[Node<TPeerId, TVal>; MAX_NODES_PER_BUCKET]>,
first_connected_pos: usize,
pending_node: Option<(Node<TPeerId, TVal>, Instant)>,
latest_update: Instant,
}
#[derive(Debug, Clone)]
struct Node<TPeerId, TVal> {
id: TPeerId,
value: TVal,
}
impl<TPeerId, TVal> KBucket<TPeerId, TVal> {
fn flush(&mut self, timeout: Duration) {
if let Some((pending_node, instant)) = self.pending_node.take() {
if instant.elapsed() >= timeout {
let _ = self.nodes.remove(0);
self.nodes.push(pending_node);
} else {
self.pending_node = Some((pending_node, instant));
}
}
}
}
pub trait KBucketsPeerId<TOther = Self>: PartialEq<TOther> + Clone {
fn distance_with(&self, other: &TOther) -> u32;
fn max_distance() -> usize;
}
impl KBucketsPeerId for PeerId {
#[inline]
fn distance_with(&self, other: &Self) -> u32 {
Multihash::distance_with(self.as_ref(), other.as_ref())
}
#[inline]
fn max_distance() -> usize {
<Multihash as KBucketsPeerId>::max_distance()
}
}
impl KBucketsPeerId<Multihash> for PeerId {
#[inline]
fn distance_with(&self, other: &Multihash) -> u32 {
Multihash::distance_with(self.as_ref(), other)
}
#[inline]
fn max_distance() -> usize {
<Multihash as KBucketsPeerId>::max_distance()
}
}
impl KBucketsPeerId for Multihash {
#[inline]
fn distance_with(&self, other: &Self) -> u32 {
let my_hash = U512::from(self.digest());
let other_hash = U512::from(other.digest());
let xor = my_hash ^ other_hash;
512 - xor.leading_zeros()
}
#[inline]
fn max_distance() -> usize {
512
}
}
impl<TPeerId, TVal> KBucketsTable<TPeerId, TVal>
where
TPeerId: KBucketsPeerId,
{
pub fn new(my_id: TPeerId, unresponsive_timeout: Duration) -> Self {
KBucketsTable {
my_id,
tables: (0..TPeerId::max_distance())
.map(|_| KBucket {
nodes: ArrayVec::new(),
first_connected_pos: 0,
pending_node: None,
latest_update: Instant::now(),
})
.collect(),
unresponsive_timeout,
}
}
#[inline]
fn bucket_num(&self, id: &TPeerId) -> Option<usize> {
(self.my_id.distance_with(id) as usize).checked_sub(1)
}
#[inline]
pub fn buckets(&mut self) -> BucketsIter<TPeerId, TVal> {
BucketsIter(self.tables.iter_mut(), self.unresponsive_timeout)
}
#[inline]
pub fn my_id(&self) -> &TPeerId {
&self.my_id
}
pub fn get(&self, id: &TPeerId) -> Option<&TVal> {
let table = match self.bucket_num(&id) {
Some(n) => &self.tables[n],
None => return None,
};
for elem in &table.nodes {
if elem.id == *id {
return Some(&elem.value);
}
}
None
}
pub fn get_mut(&mut self, id: &TPeerId) -> Option<&mut TVal> {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return None,
};
table.flush(self.unresponsive_timeout);
for elem in &mut table.nodes {
if elem.id == *id {
return Some(&mut elem.value);
}
}
None
}
pub fn entry_mut(&mut self, id: &TPeerId) -> Option<&mut TVal>
where
TVal: Default,
{
if let Some((bucket, entry)) = self.entry_mut_inner(id) {
Some(&mut self.tables[bucket].nodes[entry].value)
} else {
None
}
}
fn entry_mut_inner(&mut self, id: &TPeerId) -> Option<(usize, usize)>
where
TVal: Default,
{
let (bucket_num, table) = match self.bucket_num(&id) {
Some(n) => (n, &mut self.tables[n]),
None => return None,
};
table.flush(self.unresponsive_timeout);
if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) {
return Some((bucket_num, pos));
}
if !table.nodes.is_full() {
table.nodes.insert(table.first_connected_pos, Node {
id: id.clone(),
value: Default::default(),
});
table.first_connected_pos += 1;
table.latest_update = Instant::now();
return Some((bucket_num, table.first_connected_pos - 1));
}
None
}
pub fn set_connected(&mut self, id: &TPeerId) -> Update<TPeerId>
where
TVal: Default,
{
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return Update::FailSelfUpdate,
};
table.flush(self.unresponsive_timeout);
if let Some(pos) = table.nodes.iter().position(|elem| elem.id == *id) {
if pos < table.first_connected_pos.saturating_sub(1) {
let elem = table.nodes.remove(pos);
table.first_connected_pos -= 1;
table.nodes.insert(table.first_connected_pos, elem);
}
table.latest_update = Instant::now();
Update::Updated
} else if !table.nodes.is_full() {
table.nodes.insert(table.first_connected_pos, Node {
id: id.clone(),
value: Default::default(),
});
table.latest_update = Instant::now();
Update::Added
} else if table.first_connected_pos > 0 && table.pending_node.is_none() {
let pending_node = Node {
id: id.clone(),
value: Default::default(),
};
table.pending_node = Some((pending_node, Instant::now()));
Update::Pending(&table.nodes[0].id)
} else {
debug_assert!(table.first_connected_pos == 0 || table.pending_node.is_some());
Update::Discarded
}
}
pub fn set_disconnected(&mut self, id: &TPeerId) {
let table = match self.bucket_num(&id) {
Some(n) => &mut self.tables[n],
None => return,
};
table.flush(self.unresponsive_timeout);
let pos = match table.nodes.iter().position(|elem| elem.id == *id) {
Some(pos) => pos,
None => return,
};
if pos > table.first_connected_pos {
let elem = table.nodes.remove(pos);
table.nodes.insert(table.first_connected_pos, elem);
table.first_connected_pos += 1;
} else if pos == table.first_connected_pos {
table.first_connected_pos += 1;
}
}
pub fn find_closest<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
TPeerId: Clone + KBucketsPeerId<TOther>,
{
let mut out = Vec::new();
for table in self.tables.iter_mut() {
table.flush(self.unresponsive_timeout);
if table.latest_update.elapsed() > self.unresponsive_timeout {
continue;
}
for node in table.nodes.iter() {
out.push(node.id.clone());
}
}
out.sort_by(|a, b| b.distance_with(id).cmp(&a.distance_with(id)));
out.into_iter()
}
pub fn find_closest_with_self<TOther>(&mut self, id: &TOther) -> VecIntoIter<TPeerId>
where
TPeerId: Clone + KBucketsPeerId<TOther>,
{
let mut intermediate: Vec<_> = self.find_closest(id).collect();
if let Some(pos) = intermediate
.iter()
.position(|e| e.distance_with(id) >= self.my_id.distance_with(id))
{
if intermediate[pos] != self.my_id {
intermediate.insert(pos, self.my_id.clone());
}
} else {
intermediate.push(self.my_id.clone());
}
intermediate.into_iter()
}
}
#[derive(Debug)]
#[must_use]
pub enum Update<'a, TPeerId> {
Added,
Updated,
Pending(&'a TPeerId),
Discarded,
FailSelfUpdate,
}
pub struct BucketsIter<'a, TPeerId: 'a, TVal: 'a>(SliceIterMut<'a, KBucket<TPeerId, TVal>>, Duration);
impl<'a, TPeerId: 'a, TVal: 'a> Iterator for BucketsIter<'a, TPeerId, TVal> {
type Item = Bucket<'a, TPeerId, TVal>;
#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|bucket| {
bucket.flush(self.1);
Bucket(bucket)
})
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}
impl<'a, TPeerId: 'a, TVal: 'a> ExactSizeIterator for BucketsIter<'a, TPeerId, TVal> {}
pub struct Bucket<'a, TPeerId: 'a, TVal: 'a>(&'a mut KBucket<TPeerId, TVal>);
impl<'a, TPeerId: 'a, TVal: 'a> Bucket<'a, TPeerId, TVal> {
#[inline]
pub fn num_entries(&self) -> usize {
self.0.nodes.len()
}
#[inline]
pub fn has_pending(&self) -> bool {
self.0.pending_node.is_some()
}
#[inline]
pub fn latest_update(&self) -> Instant {
self.0.latest_update
}
}
#[cfg(test)]
mod tests {
extern crate rand;
use self::rand::random;
use crate::kbucket::{KBucketsPeerId, KBucketsTable, Update, MAX_NODES_PER_BUCKET};
use multihash::{Multihash, Hash};
use std::thread;
use std::time::Duration;
#[test]
fn basic_closest() {
let my_id = Multihash::random(Hash::SHA2256);
let other_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
table.entry_mut(&other_id);
let res = table.find_closest(&other_id).collect::<Vec<_>>();
assert_eq!(res.len(), 1);
assert_eq!(res[0], other_id);
}
#[test]
fn update_local_id_fails() {
let my_id = Multihash::random(Hash::SHA2256);
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(5));
assert!(table.entry_mut(&my_id).is_none());
match table.set_connected(&my_id) {
Update::FailSelfUpdate => (),
_ => panic!(),
}
}
#[test]
fn update_time_last_refresh() {
let my_id = Multihash::random(Hash::SHA2256);
let other_ids = (0..random::<usize>() % 20)
.map(|_| {
let bit_num = random::<usize>() % 256;
let mut id = my_id.as_bytes().to_vec().clone();
id[33 - (bit_num / 8)] ^= 1 << (bit_num % 8);
(Multihash::from_bytes(id).unwrap(), bit_num)
})
.collect::<Vec<_>>();
let mut table = KBucketsTable::<_, ()>::new(my_id, Duration::from_secs(5));
let before_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
thread::sleep(Duration::from_secs(2));
for &(ref id, _) in &other_ids {
table.entry_mut(&id);
}
let after_update = table.buckets().map(|b| b.latest_update()).collect::<Vec<_>>();
for (offset, (bef, aft)) in before_update.iter().zip(after_update.iter()).enumerate() {
if other_ids.iter().any(|&(_, bucket)| bucket == offset) {
assert_ne!(bef, aft);
} else {
assert_eq!(bef, aft);
}
}
}
#[test]
fn full_kbucket() {
let my_id = Multihash::random(Hash::SHA2256);
assert!(MAX_NODES_PER_BUCKET <= 251);
let mut fill_ids = (0..MAX_NODES_PER_BUCKET + 3)
.map(|n| {
let mut id = my_id.clone().into_bytes();
id[2] ^= 0x80;
id[33] = id[33].wrapping_add(n as u8);
Multihash::from_bytes(id).unwrap()
})
.collect::<Vec<_>>();
let first_node = fill_ids[0].clone();
let second_node = fill_ids[1].clone();
let mut table = KBucketsTable::<_, ()>::new(my_id.clone(), Duration::from_secs(1));
for (num, id) in fill_ids.drain(..MAX_NODES_PER_BUCKET).enumerate() {
match table.set_connected(&id) {
Update::Added => (),
_ => panic!()
}
table.set_disconnected(&id);
assert_eq!(table.buckets().nth(255).unwrap().num_entries(), num + 1);
}
assert_eq!(
table.buckets().nth(255).unwrap().num_entries(),
MAX_NODES_PER_BUCKET
);
assert!(!table.buckets().nth(255).unwrap().has_pending());
match table.set_connected(&fill_ids.remove(0)) {
Update::Pending(to_ping) => {
assert_eq!(*to_ping, first_node);
},
_ => panic!()
}
assert_eq!(
table.buckets().nth(255).unwrap().num_entries(),
MAX_NODES_PER_BUCKET
);
assert!(table.buckets().nth(255).unwrap().has_pending());
match table.set_connected(&fill_ids.remove(0)) {
Update::Discarded => (),
_ => panic!()
}
thread::sleep(Duration::from_secs(2));
assert!(!table.buckets().nth(255).unwrap().has_pending());
match table.set_connected(&fill_ids.remove(0)) {
Update::Pending(to_ping) => {
assert_eq!(*to_ping, second_node);
},
_ => panic!()
}
}
#[test]
fn self_distance_zero() {
let a = Multihash::random(Hash::SHA2256);
assert_eq!(a.distance_with(&a), 0);
}
#[test]
fn distance_correct_order() {
let a = Multihash::random(Hash::SHA2256);
let b = Multihash::random(Hash::SHA2256);
assert!(a.distance_with(&a) < b.distance_with(&a));
assert!(a.distance_with(&b) > b.distance_with(&b));
}
}