use std::cmp::{Ord, Ordering};
use std::convert::{From, Into};
use tox_crypto::*;
use tox_packet::dht::packed_node::*;
/// Calculates the index of the first bit in which the two public keys differ.
///
/// Bits are counted from the most significant bit of the first byte, so the
/// result is `byte_index * 8 + bit_index_within_byte`.
///
/// Returns `None` when both keys are byte-for-byte equal.
pub fn kbucket_index(&PublicKey(ref own_pk): &PublicKey,
                     &PublicKey(ref other_pk): &PublicKey) -> Option<u8> {
    debug!(target: "KBucketIndex", "Calculating KBucketIndex for PKs.");
    trace!(target: "KBucketIndex", "With PK1: {:?}; PK2: {:?}", own_pk, other_pk);

    own_pk.iter()
        .zip(other_pk.iter())
        .map(|(own_byte, other_byte)| own_byte ^ other_byte)
        .enumerate()
        // The first non-zero XOR byte holds the first differing bit.
        .find(|&(_, diff)| diff != 0)
        // The number of leading zero bits is the position of the highest set
        // bit counted from the most significant side.
        .map(|(byte_index, diff)| byte_index as u8 * 8 + diff.leading_zeros() as u8)
}
/// Comparison of two public keys relative to `self`.
///
/// The `PublicKey` implementation in this file orders the two keys by their
/// XOR distance to `self`.
pub trait Distance {
/// Returns how `pk1` compares to `pk2` with respect to `self`.
///
/// For the `PublicKey` implementation `Ordering::Less` means that `pk1` is
/// closer to `self` than `pk2`, `Ordering::Greater` that it is farther and
/// `Ordering::Equal` that `pk1` and `pk2` are the same key.
fn distance(&self, pk1: &PublicKey, pk2: &PublicKey) -> Ordering;
}
/// XOR-metric comparison of two keys relative to `self`.
impl Distance for PublicKey {
    /// Compares `pk1` and `pk2` by their XOR distance to `self`.
    ///
    /// Only the first byte position where `pk1` and `pk2` differ decides the
    /// result: every earlier position contributes the same XOR value for both
    /// keys. Identical keys compare as `Ordering::Equal`.
    fn distance(&self,
                &PublicKey(ref pk1): &PublicKey,
                &PublicKey(ref pk2): &PublicKey) -> Ordering {
        trace!(target: "Distance", "Comparing distance between PKs.");
        let PublicKey(ref own) = *self;

        own.iter()
            .zip(pk1.iter().zip(pk2.iter()))
            .find(|&(_, (first, second))| first != second)
            .map_or(Ordering::Equal, |(base, (first, second))| {
                (base ^ first).cmp(&(base ^ second))
            })
    }
}
/// Anything that can report a public key.
pub trait HasPK {
/// Returns the public key associated with this value.
fn pk(&self) -> PublicKey;
}
/// A `PackedNode` reports the key stored in its `pk` field.
impl HasPK for PackedNode {
/// Returns a copy of the node's `pk` field.
fn pk(&self) -> PublicKey {
self.pk
}
}
/// Node that can be stored in a `Kbucket`.
pub trait KbucketNode : Sized + HasPK {
    /// Type accepted when adding a node to a bucket or updating a stored one.
    type NewNode: HasPK;
    /// Type accepted when only checking whether a node could be added.
    type CheckNode: HasPK;

    /// Whether the stored node is out of date compared to `other`.
    fn is_outdated(&self, other: &Self::CheckNode) -> bool;

    /// Refreshes the stored node with the data carried by `other`.
    fn update(&mut self, other: &Self::NewNode);

    /// Whether the node may be removed to make room for another one.
    fn is_evictable(&self) -> bool;

    /// Index of the node that should be evicted from `nodes`, if any.
    ///
    /// The default implementation picks the last node in the slice for which
    /// `is_evictable` returns `true`.
    fn eviction_index(nodes: &[Self]) -> Option<usize> {
        nodes.iter()
            .enumerate()
            .rev()
            .find(|(_, node)| node.is_evictable())
            .map(|(index, _)| index)
    }
}
/// `PackedNode` as a bucket entry. It is never reported as evictable.
impl KbucketNode for PackedNode {
// Both adding and checking operate on plain `PackedNode`s.
type NewNode = PackedNode;
type CheckNode = PackedNode;
/// A stored node is outdated when its socket address differs from `other`'s.
fn is_outdated(&self, other: &PackedNode) -> bool {
self.saddr != other.saddr
}
/// Takes over the socket address of `other`; no other field is modified.
fn update(&mut self, other: &PackedNode) {
self.saddr = other.saddr;
}
/// Always `false`.
fn is_evictable(&self) -> bool {
false
}
/// Always `None`, consistent with `is_evictable` always returning `false`.
fn eviction_index(_nodes: &[Self]) -> Option<usize> {
None
}
}
/// Bucket holding a bounded list of nodes.
///
/// The methods of `Kbucket` locate nodes with a binary search keyed on the
/// distance to a base public key, so `nodes` is expected to stay sorted by
/// that distance. Code that mutates the public `nodes` field directly has to
/// preserve that order itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Kbucket<Node> {
    /// Maximum number of nodes the bucket is meant to hold.
    pub capacity: u8,
    /// Nodes currently stored in the bucket.
    pub nodes: Vec<Node>,
}

/// Unwraps the bucket into its list of nodes.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl still
/// provides `Into<Vec<Node>> for Kbucket<Node>`, so existing `.into()` calls
/// keep working, and `Vec::from(kbucket)` becomes available as well.
impl<Node> From<Kbucket<Node>> for Vec<Node> {
    fn from(kbucket: Kbucket<Node>) -> Self {
        kbucket.nodes
    }
}
/// Default `Kbucket` capacity (number of nodes), intended to be passed to
/// `Kbucket::new`.
pub const KBUCKET_DEFAULT_SIZE: u8 = 8;
impl<NewNode, CheckNode, Node> Kbucket<Node>
where
    NewNode: HasPK,
    CheckNode: HasPK,
    Node: KbucketNode<NewNode = NewNode, CheckNode = CheckNode> + From<NewNode>
{
    /// Creates an empty bucket that is considered full once it holds
    /// `capacity` nodes.
    pub fn new(capacity: u8) -> Self {
        trace!("Creating a new Kbucket with capacity: {}", capacity);
        let nodes = Vec::with_capacity(usize::from(capacity));
        Kbucket { capacity, nodes }
    }

    /// Position of the node with public key `pk`, if it is stored.
    ///
    /// Relies on `nodes` being sorted by distance to `base_pk`.
    fn find(&self, base_pk: &PublicKey, pk: &PublicKey) -> Option<usize> {
        let position = self.nodes
            .binary_search_by(|node| base_pk.distance(&node.pk(), pk));
        position.ok()
    }

    /// Returns a reference to the node with public key `pk`, if it is stored.
    pub fn get_node(&self, base_pk: &PublicKey, pk: &PublicKey) -> Option<&Node> {
        let node_index = self.find(base_pk, pk)?;
        Some(&self.nodes[node_index])
    }

    /// Returns a mutable reference to the node with public key `pk`, if it is
    /// stored.
    pub fn get_node_mut(&mut self, base_pk: &PublicKey, pk: &PublicKey) -> Option<&mut Node> {
        let node_index = self.find(base_pk, pk)?;
        Some(&mut self.nodes[node_index])
    }

    /// Tries to add `new_node` to the bucket, keeping it sorted by distance to
    /// `base_pk`. Returns `true` when the node was added or updated.
    ///
    /// * A node with the same public key is updated in place.
    /// * If the bucket is not full the node is inserted at its sorted position.
    /// * If the bucket is full and `evict` is `false`, or the new node would
    ///   become the last element, the node is added only when
    ///   `Node::eviction_index` names a node to remove; otherwise `false` is
    ///   returned.
    /// * If the bucket is full, `evict` is `true` and the new node sorts
    ///   before the end, room is always made: the node named by
    ///   `Node::eviction_index` is removed, or the last node when there is
    ///   none.
    pub fn try_add(&mut self, base_pk: &PublicKey, new_node: NewNode, evict: bool) -> bool {
        trace!(target: "Kbucket", "Trying to add PackedNode: {:?}.", new_node.pk());

        let search = self.nodes
            .binary_search_by(|n| base_pk.distance(&n.pk(), &new_node.pk()));
        let index = match search {
            Ok(existing) => {
                debug!(target: "Kbucket",
                    "Updated: the node was already in the kbucket.");
                self.nodes[existing].update(&new_node);
                return true;
            },
            Err(index) => index,
        };

        // When this holds, a full bucket only accepts the node in place of an
        // evictable one.
        let only_replace_evictable = !evict || index == self.nodes.len();
        let full = self.is_full();

        if only_replace_evictable {
            if !full {
                debug!(target: "Kbucket",
                    "Node inserted inside the kbucket.");
                self.nodes.insert(index, new_node.into());
                return true;
            }
            match Node::eviction_index(&self.nodes) {
                Some(eviction_index) => {
                    debug!(target: "Kbucket",
                        "No free space left in the kbucket, the last bad node removed.");
                    self.replace_node(eviction_index, index, new_node);
                    true
                },
                None => {
                    debug!(target: "Kbucket",
                        "Node can't be added to the kbucket.");
                    false
                },
            }
        } else if full {
            debug!(target: "Kbucket",
                "No free space left in the kbucket, the last (bad) node removed.");
            // `index < len` on this path, so the bucket is non-empty and the
            // subtraction cannot underflow.
            let eviction_index = Node::eviction_index(&self.nodes)
                .unwrap_or_else(|| self.nodes.len() - 1);
            self.replace_node(eviction_index, index, new_node);
            true
        } else {
            self.nodes.insert(index, new_node.into());
            debug!(target: "Kbucket", "Node inserted inside the kbucket.");
            true
        }
    }

    /// Removes the node at `eviction_index` and inserts `new_node` at the
    /// sorted position `index` that was computed before the removal.
    fn replace_node(&mut self, eviction_index: usize, index: usize, new_node: NewNode) {
        self.nodes.remove(eviction_index);
        // Removing an earlier element shifts the insertion point left by one.
        let index = if eviction_index < index { index - 1 } else { index };
        self.nodes.insert(index, new_node.into());
    }

    /// Removes and returns the node with public key `node_pk`, or returns
    /// `None` when no such node is stored.
    pub fn remove(&mut self, base_pk: &PublicKey, node_pk: &PublicKey) -> Option<Node> {
        trace!(target: "Kbucket", "Removing KbucketNode with PK: {:?}", node_pk);
        if let Some(index) = self.find(base_pk, node_pk) {
            Some(self.nodes.remove(index))
        } else {
            trace!("No KbucketNode to remove with PK: {:?}", node_pk);
            None
        }
    }

    /// Whether a node with public key `pk` is stored in the bucket.
    pub fn contains(&self, base_pk: &PublicKey, pk: &PublicKey) -> bool {
        self.find(base_pk, pk).is_some()
    }

    /// Number of nodes currently stored.
    pub fn len(&self) -> usize {
        self.nodes.len()
    }

    /// The `capacity` field widened to `usize`.
    pub fn capacity(&self) -> usize {
        usize::from(self.capacity)
    }

    /// Whether the bucket holds no nodes.
    pub fn is_empty(&self) -> bool {
        self.nodes.is_empty()
    }

    /// Whether the number of stored nodes equals the capacity.
    pub fn is_full(&self) -> bool {
        self.capacity() == self.len()
    }

    /// Checks, without modifying the bucket, whether adding `new_node` would
    /// be worthwhile.
    ///
    /// * For an already stored key: `true` when the stored node is evictable
    ///   or outdated compared to `new_node`.
    /// * For a new key with `evict` set that sorts before the end: always
    ///   `true`.
    /// * Otherwise: `true` when the bucket has free space or contains at
    ///   least one evictable node.
    pub fn can_add(&self, base_pk: &PublicKey, new_node: &CheckNode, evict: bool) -> bool {
        match self.nodes.binary_search_by(|n| base_pk.distance(&n.pk(), &new_node.pk())) {
            Ok(index) => {
                let stored = &self.nodes[index];
                stored.is_evictable() || stored.is_outdated(new_node)
            },
            Err(index) => {
                let may_displace = evict && index != self.nodes.len();
                may_displace
                    || !self.is_full()
                    || self.nodes.iter().any(|n| n.is_evictable())
            },
        }
    }

    /// Iterator over the stored nodes in their stored order.
    pub fn iter(&self) -> impl Iterator<Item = &Node> + Clone {
        self.nodes.iter()
    }

    /// Mutable iterator over the stored nodes in their stored order.
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
        self.nodes.iter_mut()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    use std::net::{
        Ipv4Addr,
        SocketAddr,
        SocketAddrV4,
    };
    use std::time::Duration;

    use crate::dht::dht_node::*;

    /// Public key with every byte set to `byte`.
    fn filled_pk(byte: u8) -> PublicKey {
        PublicKey([byte; PUBLICKEYBYTES])
    }

    /// `PackedNode` with the given textual socket address and public key.
    fn make_node(saddr: &str, pk: &PublicKey) -> PackedNode {
        PackedNode::new(saddr.parse().unwrap(), pk)
    }

    /// Adds every node in order with eviction enabled, ignoring the results.
    fn add_all(kbucket: &mut Kbucket<DhtNode>, base_pk: &PublicKey, nodes: &[PackedNode]) {
        for &node in nodes {
            kbucket.try_add(base_pk, node, true);
        }
    }

    // PublicKey::distance()

    #[test]
    fn public_key_distance() {
        let pk_0 = filled_pk(0);
        let pk_1 = filled_pk(1);
        let pk_2 = filled_pk(2);
        let pk_ff = filled_pk(0xff);
        let pk_fe = filled_pk(0xfe);

        assert_eq!(Ordering::Less, pk_0.distance(&pk_1, &pk_2));
        assert_eq!(Ordering::Equal, pk_2.distance(&pk_2, &pk_2));
        assert_eq!(Ordering::Less, pk_2.distance(&pk_0, &pk_1));
        assert_eq!(Ordering::Greater, pk_2.distance(&pk_ff, &pk_fe));
        // Swapping the arguments must flip the ordering:
        // 0x02 ^ 0xfe = 0xfc is smaller than 0x02 ^ 0xff = 0xfd.
        assert_eq!(Ordering::Less, pk_2.distance(&pk_fe, &pk_ff));
        assert_eq!(Ordering::Less, pk_fe.distance(&pk_ff, &pk_2));
    }

    // kbucket_index()

    #[test]
    fn kbucket_index_test() {
        let pk1 = filled_pk(0b10_10_10_10);
        let pk2 = filled_pk(0);
        let pk3 = filled_pk(0b00_10_10_10);

        assert_eq!(None, kbucket_index(&pk1, &pk1));
        assert_eq!(Some(0), kbucket_index(&pk1, &pk2));
        assert_eq!(Some(2), kbucket_index(&pk2, &pk3));
    }

    // Kbucket::try_add()

    #[test]
    fn kbucket_try_add() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        // Fill the bucket with keys 2..=9.
        for i in 0u8 .. 8 {
            let addr = SocketAddr::new("1.2.3.4".parse().unwrap(), 12345 + u16::from(i));
            let node = PackedNode::new(addr, &filled_pk(i + 2));
            assert!(kbucket.try_add(&pk, node, false));
        }

        let closer_node = make_node("1.2.3.5:12345", &filled_pk(1));
        let farther_node = make_node("1.2.3.5:12346", &filled_pk(10));
        let existing_node = make_node("1.2.3.5:12347", &filled_pk(2));

        // Can't add a new farther node.
        assert!(!kbucket.try_add(&pk, farther_node, false));
        assert!(!kbucket.try_add(&pk, farther_node, true));
        // Can't add a new closer node without eviction.
        assert!(!kbucket.try_add(&pk, closer_node, false));
        // Can add a new closer node with eviction.
        assert!(kbucket.try_add(&pk, closer_node, true));
        // Can update a node that is already stored.
        assert!(kbucket.try_add(&pk, existing_node, false));
    }

    #[tokio::test]
    async fn kbucket_try_add_should_replace_bad_nodes() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(1);

        let node_1 = make_node("1.2.3.4:12345", &filled_pk(1));
        let node_2 = make_node("1.2.3.4:12346", &filled_pk(2));

        assert!(kbucket.try_add(&pk, node_2, false));
        assert!(!kbucket.try_add(&pk, node_1, false));

        tokio::time::pause();
        tokio::time::advance(BAD_NODE_TIMEOUT + Duration::from_secs(1)).await;

        // The stored node is expected to be replaceable after the timeout.
        assert!(kbucket.try_add(&pk, node_1, false));
    }

    #[tokio::test]
    async fn kbucket_try_add_should_replace_bad_nodes_in_the_middle() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(3);

        let pk_1 = filled_pk(1);
        let pk_2 = filled_pk(2);
        let pk_3 = filled_pk(3);
        let pk_4 = filled_pk(4);

        let node_1 = make_node("1.2.3.4:12345", &pk_1);
        let node_2 = make_node("1.2.3.4:12346", &pk_2);
        let node_3 = make_node("1.2.3.4:12346", &pk_3);
        let node_4 = make_node("1.2.3.4:12346", &pk_4);

        assert!(kbucket.try_add(&pk, node_2, false));

        tokio::time::pause();
        tokio::time::advance(BAD_NODE_TIMEOUT + Duration::from_secs(1)).await;

        assert!(kbucket.try_add(&pk, node_3, false));
        assert!(kbucket.try_add(&pk, node_4, false));
        // node_2 was added before the clock advanced and should be replaced.
        assert!(kbucket.try_add(&pk, node_1, false));

        assert!(!kbucket.contains(&pk, &pk_2));
        assert!(kbucket.contains(&pk, &pk_1));
        assert!(kbucket.contains(&pk, &pk_3));
        assert!(kbucket.contains(&pk, &pk_4));
    }

    #[tokio::test]
    async fn kbucket_try_add_evict_should_replace_bad_nodes() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(1);

        let node_1 = make_node("1.2.3.4:12345", &filled_pk(1));
        let node_2 = make_node("1.2.3.4:12346", &filled_pk(2));

        assert!(kbucket.try_add(&pk, node_1, true));
        assert!(!kbucket.try_add(&pk, node_2, true));

        tokio::time::pause();
        tokio::time::advance(BAD_NODE_TIMEOUT + Duration::from_secs(1)).await;

        // The stored node is expected to be replaceable after the timeout.
        assert!(kbucket.try_add(&pk, node_2, true));
    }

    #[tokio::test]
    async fn kbucket_try_add_evict_should_replace_bad_nodes_first() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(2);

        let pk_1 = filled_pk(1);
        let pk_2 = filled_pk(2);
        let pk_3 = filled_pk(3);

        let node_1 = make_node("1.2.3.4:12345", &pk_1);
        let node_2 = make_node("1.2.3.4:12346", &pk_2);
        let node_3 = make_node("1.2.3.4:12346", &pk_3);

        assert!(kbucket.try_add(&pk, node_1, true));

        tokio::time::pause();
        tokio::time::advance(BAD_NODE_TIMEOUT + Duration::from_secs(1)).await;

        assert!(kbucket.try_add(&pk, node_3, true));
        // node_1 is closer than node_2 but was added before the clock
        // advanced, so it is the one expected to go.
        assert!(kbucket.try_add(&pk, node_2, true));

        assert!(!kbucket.contains(&pk, &pk_1));
        assert!(kbucket.contains(&pk, &pk_2));
        assert!(kbucket.contains(&pk, &pk_3));
    }

    // Kbucket::remove()

    #[test]
    fn kbucket_remove() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        let node = make_node("1.2.3.4:12345", &filled_pk(1));

        // Removing from an empty bucket yields nothing.
        assert!(kbucket.remove(&pk, &node.pk).is_none());
        assert!(kbucket.is_empty());

        assert!(kbucket.try_add(&pk, node, true));
        assert!(!kbucket.is_empty());

        assert!(kbucket.remove(&pk, &node.pk).is_some());
        assert!(kbucket.is_empty());
    }

    // Kbucket::is_empty()

    #[test]
    fn kbucket_is_empty() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        assert!(kbucket.is_empty());

        let node = make_node("1.2.3.4:12345", &filled_pk(1));

        assert!(kbucket.try_add(&pk, node, true));
        assert!(!kbucket.is_empty());
    }

    // Kbucket::get_node()

    #[test]
    fn kbucket_get_node() {
        crypto_init().unwrap();
        let (pk, _) = gen_keypair();
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        let node_pk = gen_keypair().0;
        let pn = PackedNode {
            pk: node_pk,
            saddr: "127.0.0.1:33445".parse().unwrap(),
        };

        assert!(kbucket.try_add(&pk, pn, true));
        assert!(kbucket.get_node(&pk, &node_pk).is_some());
    }

    // Kbucket::get_node_mut()

    #[test]
    fn kbucket_get_node_mut() {
        crypto_init().unwrap();
        let (pk, _) = gen_keypair();
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        let node_pk = gen_keypair().0;
        let pn = PackedNode {
            pk: node_pk,
            saddr: "127.0.0.1:33445".parse().unwrap(),
        };

        assert!(kbucket.try_add(&pk, pn, true));
        assert!(kbucket.get_node_mut(&pk, &node_pk).is_some());
    }

    // Kbucket positions

    /// Returns a base key and three nodes `(n1, n2, n3)` for the position
    /// tests below.
    ///
    /// The asserts confirm that the plain `PublicKey` ordering differs from
    /// the distance-based order (`n1`, `n2`, `n3`) the tests expect.
    fn position_test_data() -> (PublicKey, PackedNode, PackedNode, PackedNode) {
        let mut pk_bytes = [3; PUBLICKEYBYTES];

        pk_bytes[0] = 1;
        let base_pk = PublicKey(pk_bytes);

        let saddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0));

        pk_bytes[5] = 1;
        let pk1 = PublicKey(pk_bytes);
        let n1 = PackedNode::new(saddr, &pk1);

        pk_bytes[10] = 2;
        let pk2 = PublicKey(pk_bytes);
        let n2 = PackedNode::new(saddr, &pk2);

        pk_bytes[14] = 4;
        let pk3 = PublicKey(pk_bytes);
        let n3 = PackedNode::new(saddr, &pk3);

        assert!(pk1 > pk2);
        assert!(pk2 < pk3);
        assert!(pk1 > pk3);

        (base_pk, n1, n2, n3)
    }

    #[test]
    fn kbucket_position_straight_insertion() {
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);
        let (base_pk, n1, n2, n3) = position_test_data();

        // Insert order: n1, n2, n3.
        add_all(&mut kbucket, &base_pk, &[n1, n2, n3]);

        assert_eq!(kbucket.find(&base_pk, &n1.pk), Some(0));
        assert_eq!(kbucket.find(&base_pk, &n2.pk), Some(1));
        assert_eq!(kbucket.find(&base_pk, &n3.pk), Some(2));
    }

    #[test]
    fn kbucket_position_reverse_insertion() {
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);
        let (base_pk, n1, n2, n3) = position_test_data();

        // Insert order: n3, n2, n1; positions must not depend on it.
        add_all(&mut kbucket, &base_pk, &[n3, n2, n1]);

        assert_eq!(kbucket.find(&base_pk, &n1.pk), Some(0));
        assert_eq!(kbucket.find(&base_pk, &n2.pk), Some(1));
        assert_eq!(kbucket.find(&base_pk, &n3.pk), Some(2));
    }

    // Removing a node must keep the remaining nodes in order.

    #[test]
    fn kbucket_position_remove_first() {
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);
        let (base_pk, n1, n2, n3) = position_test_data();

        add_all(&mut kbucket, &base_pk, &[n1, n2, n3]);
        kbucket.remove(&base_pk, &n1.pk);

        assert_eq!(kbucket.find(&base_pk, &n1.pk), None);
        assert_eq!(kbucket.find(&base_pk, &n2.pk), Some(0));
        assert_eq!(kbucket.find(&base_pk, &n3.pk), Some(1));
    }

    #[test]
    fn kbucket_position_remove_second() {
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);
        let (base_pk, n1, n2, n3) = position_test_data();

        add_all(&mut kbucket, &base_pk, &[n1, n2, n3]);
        kbucket.remove(&base_pk, &n2.pk);

        assert_eq!(kbucket.find(&base_pk, &n1.pk), Some(0));
        assert_eq!(kbucket.find(&base_pk, &n2.pk), None);
        assert_eq!(kbucket.find(&base_pk, &n3.pk), Some(1));
    }

    #[test]
    fn kbucket_position_remove_third() {
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);
        let (base_pk, n1, n2, n3) = position_test_data();

        add_all(&mut kbucket, &base_pk, &[n1, n2, n3]);
        kbucket.remove(&base_pk, &n3.pk);

        assert_eq!(kbucket.find(&base_pk, &n1.pk), Some(0));
        assert_eq!(kbucket.find(&base_pk, &n2.pk), Some(1));
        assert_eq!(kbucket.find(&base_pk, &n3.pk), None);
    }

    // Kbucket::len()

    #[test]
    fn kbucket_len() {
        let pk = filled_pk(0);
        let mut kbucket = Kbucket::<DhtNode>::new(KBUCKET_DEFAULT_SIZE);

        assert_eq!(kbucket.len(), 0);

        let node = make_node("1.2.3.4:12345", &filled_pk(1));

        assert!(kbucket.try_add(&pk, node, true));
        assert_eq!(kbucket.len(), 1);
    }
}