mod bucket;
mod entry;
mod filter;
mod key;
pub use entry::*;
pub use crate::handler::ConnectionDirection;
use arrayvec::{self, ArrayVec};
use bucket::KBucket;
pub use bucket::{
ConnectionState, FailureReason, InsertResult as BucketInsertResult, UpdateResult,
MAX_NODES_PER_BUCKET,
};
pub use filter::{Filter, IpBucketFilter, IpTableFilter};
use std::{
collections::VecDeque,
time::{Duration, Instant},
};
const NUM_BUCKETS: usize = 256;
pub struct ClosestValue<TNodeId, TVal> {
pub key: Key<TNodeId>,
pub value: TVal,
}
impl<TNodeId, TVal> AsRef<Key<TNodeId>> for ClosestValue<TNodeId, TVal> {
fn as_ref(&self) -> &Key<TNodeId> {
&self.key
}
}
pub struct PredicateKey<TNodeId> {
pub key: Key<TNodeId>,
pub predicate_match: bool,
}
impl<TNodeId> From<PredicateKey<TNodeId>> for Key<TNodeId> {
fn from(key: PredicateKey<TNodeId>) -> Self {
key.key
}
}
pub struct PredicateValue<TNodeId, TVal> {
pub key: Key<TNodeId>,
pub predicate_match: bool,
pub value: TVal,
}
impl<TNodeId, TVal> AsRef<Key<TNodeId>> for PredicateValue<TNodeId, TVal> {
fn as_ref(&self) -> &Key<TNodeId> {
&self.key
}
}
impl<TNodeId, TVal> PredicateValue<TNodeId, TVal> {
pub fn to_key_value(self) -> (PredicateKey<TNodeId>, TVal) {
let PredicateValue {
key,
predicate_match,
value,
} = self;
let key = PredicateKey {
key,
predicate_match,
};
(key, value)
}
}
#[derive(Clone)]
pub struct KBucketsTable<TNodeId, TVal: Eq> {
local_key: Key<TNodeId>,
buckets: Vec<KBucket<TNodeId, TVal>>,
applied_pending: VecDeque<AppliedPending<TNodeId, TVal>>,
table_filter: Option<Box<dyn Filter<TVal>>>,
}
#[must_use]
#[derive(Debug, Clone)]
pub enum InsertResult<TNodeId> {
Inserted,
Pending {
disconnected: Key<TNodeId>,
},
StatusUpdated {
promoted_to_connected: bool,
},
ValueUpdated,
Updated {
promoted_to_connected: bool,
},
UpdatedPending,
Failed(FailureReason),
}
#[derive(Copy, Clone)]
struct BucketIndex(usize);
impl BucketIndex {
fn new(d: &Distance) -> Option<BucketIndex> {
(NUM_BUCKETS - d.0.leading_zeros() as usize)
.checked_sub(1)
.map(BucketIndex)
}
fn get(self) -> usize {
self.0
}
}
impl<TNodeId, TVal> KBucketsTable<TNodeId, TVal>
where
TNodeId: Clone,
TVal: Eq,
{
pub fn new(
local_key: Key<TNodeId>,
pending_timeout: Duration,
max_incoming_per_bucket: usize,
table_filter: Option<Box<dyn Filter<TVal>>>,
bucket_filter: Option<Box<dyn Filter<TVal>>>,
) -> Self {
KBucketsTable {
local_key,
buckets: (0..NUM_BUCKETS)
.map(|_| {
KBucket::new(
pending_timeout,
max_incoming_per_bucket,
bucket_filter.clone(),
)
})
.collect(),
applied_pending: VecDeque::new(),
table_filter,
}
}
pub fn update_node_status(
&mut self,
key: &Key<TNodeId>,
state: ConnectionState,
direction: Option<ConnectionDirection>,
) -> UpdateResult {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
bucket.update_status(key, state, direction)
} else {
UpdateResult::NotModified }
}
pub fn update_node(
&mut self,
key: &Key<TNodeId>,
value: TVal,
state: Option<ConnectionState>,
) -> UpdateResult {
let mut passed_table_filter = true;
if let Some(table_filter) = self.table_filter.as_ref() {
let duplicate = {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(node) = bucket.get(key) {
node.value == value
} else {
false
}
} else {
false
}
};
if !duplicate && !table_filter.filter(&value, &mut self.table_iter()) {
passed_table_filter = false;
}
}
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
if !passed_table_filter {
bucket.remove(key);
return UpdateResult::Failed(FailureReason::TableFilter);
}
let update_result = bucket.update_value(key, value);
if let UpdateResult::Failed(_) = &update_result {
return update_result;
}
let status_result = if let Some(state) = state {
bucket.update_status(key, state, None)
} else {
UpdateResult::NotModified
};
match (&update_result, &status_result) {
(_, UpdateResult::Failed(_)) => status_result,
(UpdateResult::Failed(_), _) => update_result,
(_, UpdateResult::UpdatedAndPromoted) => UpdateResult::UpdatedAndPromoted,
(UpdateResult::UpdatedPending, _) => UpdateResult::UpdatedPending,
(_, UpdateResult::UpdatedPending) => UpdateResult::UpdatedPending,
(UpdateResult::NotModified, UpdateResult::NotModified) => UpdateResult::NotModified,
(_, _) => UpdateResult::Updated,
}
} else {
UpdateResult::NotModified }
}
pub fn insert_or_update(
&mut self,
key: &Key<TNodeId>,
value: TVal,
status: NodeStatus,
) -> InsertResult<TNodeId> {
let mut passed_table_filter = true;
if let Some(table_filter) = self.table_filter.as_ref() {
let duplicate = {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(node) = bucket.get(key) {
node.value == value
} else {
false
}
} else {
false
}
};
if !duplicate && !table_filter.filter(&value, &mut self.table_iter()) {
passed_table_filter = false;
}
}
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
if !passed_table_filter {
bucket.remove(key);
return InsertResult::Failed(FailureReason::TableFilter);
}
if bucket.position(key).is_none() {
let node = Node {
key: key.clone(),
value,
status,
};
match bucket.insert(node) {
bucket::InsertResult::NodeExists => unreachable!("Node must exist"),
bucket::InsertResult::Full => InsertResult::Failed(FailureReason::BucketFull),
bucket::InsertResult::TooManyIncoming => {
InsertResult::Failed(FailureReason::TooManyIncoming)
}
bucket::InsertResult::FailedFilter => {
InsertResult::Failed(FailureReason::BucketFilter)
}
bucket::InsertResult::Pending { disconnected } => {
InsertResult::Pending { disconnected }
}
bucket::InsertResult::Inserted => InsertResult::Inserted,
}
} else {
let update_status = bucket.update_status(key, status.state, Some(status.direction));
if update_status.failed() {
return InsertResult::Failed(FailureReason::TooManyIncoming);
}
let update_value = bucket.update_value(key, value);
match (update_value, update_status) {
(UpdateResult::Updated, UpdateResult::Updated) => InsertResult::Updated {
promoted_to_connected: false,
},
(UpdateResult::Updated, UpdateResult::UpdatedAndPromoted) => {
InsertResult::Updated {
promoted_to_connected: true,
}
}
(UpdateResult::Updated, UpdateResult::NotModified)
| (UpdateResult::Updated, UpdateResult::UpdatedPending) => {
InsertResult::ValueUpdated
}
(UpdateResult::NotModified, UpdateResult::Updated) => {
InsertResult::StatusUpdated {
promoted_to_connected: false,
}
}
(UpdateResult::NotModified, UpdateResult::UpdatedAndPromoted) => {
InsertResult::StatusUpdated {
promoted_to_connected: true,
}
}
(UpdateResult::NotModified, UpdateResult::NotModified) => {
InsertResult::Updated {
promoted_to_connected: false,
}
}
(UpdateResult::UpdatedPending, _) | (_, UpdateResult::UpdatedPending) => {
InsertResult::UpdatedPending
}
(UpdateResult::Failed(reason), _) => InsertResult::Failed(reason),
(_, UpdateResult::Failed(_)) => unreachable!("Status failure handled earlier."),
(UpdateResult::UpdatedAndPromoted, _) => {
unreachable!("Value update cannot promote a connection.")
}
}
}
} else {
InsertResult::Failed(FailureReason::InvalidSelfUpdate)
}
}
pub fn remove(&mut self, key: &Key<TNodeId>) -> bool {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
bucket.remove(key)
} else {
false
}
}
pub fn entry<'a>(&'a mut self, key: &'a Key<TNodeId>) -> Entry<'a, TNodeId, TVal> {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &mut self.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied)
}
Entry::new(bucket, key)
} else {
Entry::SelfEntry
}
}
pub fn iter(&mut self) -> impl Iterator<Item = EntryRefView<'_, TNodeId, TVal>> {
let applied_pending = &mut self.applied_pending;
self.buckets.iter_mut().flat_map(move |table| {
if let Some(applied) = table.apply_pending() {
applied_pending.push_back(applied)
}
table.iter().map(move |n| EntryRefView {
node: NodeRefView {
key: &n.key,
value: &n.value,
},
status: n.status,
})
})
}
pub fn buckets_iter(&self) -> impl Iterator<Item = &KBucket<TNodeId, TVal>> {
self.buckets.iter()
}
fn table_iter(&self) -> impl Iterator<Item = &TVal> {
self.buckets
.iter()
.flat_map(move |table| table.iter().map(|n| &n.value))
}
pub fn iter_ref(&self) -> impl Iterator<Item = EntryRefView<'_, TNodeId, TVal>> {
self.buckets.iter().flat_map(move |table| {
table.iter().map(move |n| EntryRefView {
node: NodeRefView {
key: &n.key,
value: &n.value,
},
status: n.status,
})
})
}
pub fn take_applied_pending(&mut self) -> Option<AppliedPending<TNodeId, TVal>> {
self.applied_pending.pop_front()
}
pub fn nodes_by_distances(
&mut self,
log2_distances: &[u64],
max_nodes: usize,
) -> Vec<EntryRefView<'_, TNodeId, TVal>> {
let distances = log2_distances
.iter()
.filter_map(|&d| {
if d > 0 && d <= (NUM_BUCKETS as u64) {
Some(d)
} else {
None
}
})
.collect::<Vec<_>>();
let mut node_count = 0;
for distance in &distances {
let bucket = &mut self.buckets[(distance - 1) as usize];
if let Some(applied) = bucket.apply_pending() {
self.applied_pending.push_back(applied);
node_count += bucket.num_entries();
if node_count >= max_nodes {
break;
}
}
}
let mut matching_nodes = Vec::new();
for distance in distances {
let bucket = &self.buckets[(distance - 1) as usize];
for node in bucket.iter().map(|n| {
let node = NodeRefView {
key: &n.key,
value: &n.value,
};
EntryRefView {
node,
status: n.status,
}
}) {
matching_nodes.push(node);
if matching_nodes.len() >= max_nodes {
return matching_nodes;
}
}
}
matching_nodes
}
pub fn closest_keys<'a, T>(
&'a mut self,
target: &'a Key<T>,
) -> impl Iterator<Item = Key<TNodeId>> + 'a
where
T: Clone,
{
let distance = self.local_key.distance(target);
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<TNodeId, TVal>| -> ArrayVec<_, MAX_NODES_PER_BUCKET> {
b.iter().map(|n| n.key.clone()).collect()
},
}
}
pub fn closest_values<'a, T>(
&'a mut self,
target: &'a Key<T>,
) -> impl Iterator<Item = ClosestValue<TNodeId, TVal>> + 'a
where
T: Clone,
TVal: Clone,
{
let distance = self.local_key.distance(target);
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: |b: &KBucket<TNodeId, TVal>| -> ArrayVec<_, MAX_NODES_PER_BUCKET> {
b.iter()
.map(|n| ClosestValue {
key: n.key.clone(),
value: n.value.clone(),
})
.collect()
},
}
}
pub fn closest_values_predicate<'a, T, F>(
&'a mut self,
target: &'a Key<T>,
predicate: F,
) -> impl Iterator<Item = PredicateValue<TNodeId, TVal>> + 'a
where
T: Clone,
F: Fn(&TVal) -> bool + 'a,
TVal: Clone,
{
let distance = self.local_key.distance(target);
ClosestIter {
target,
iter: None,
table: self,
buckets_iter: ClosestBucketsIter::new(distance),
fmap: move |b: &KBucket<TNodeId, TVal>| -> ArrayVec<_, MAX_NODES_PER_BUCKET> {
b.iter()
.map(|n| PredicateValue {
key: n.key.clone(),
predicate_match: predicate(&n.value),
value: n.value.clone(),
})
.collect()
},
}
}
pub fn get_bucket(&self, key: &Key<TNodeId>) -> Option<&KBucket<TNodeId, TVal>> {
let index = BucketIndex::new(&self.local_key.distance(key));
if let Some(i) = index {
let bucket = &self.buckets[i.get()];
Some(bucket)
} else {
None
}
}
pub fn get_index(&self, key: &Key<TNodeId>) -> Option<usize> {
let index = BucketIndex::new(&self.local_key.distance(key));
index.map(|i| i.get())
}
}
struct ClosestIter<'a, TTarget, TNodeId, TVal: Eq, TMap, TOut> {
target: &'a Key<TTarget>,
table: &'a mut KBucketsTable<TNodeId, TVal>,
buckets_iter: ClosestBucketsIter,
iter: Option<arrayvec::IntoIter<TOut, MAX_NODES_PER_BUCKET>>,
fmap: TMap,
}
struct ClosestBucketsIter {
distance: Distance,
state: ClosestBucketsIterState,
}
enum ClosestBucketsIterState {
Start(BucketIndex),
ZoomIn(BucketIndex),
ZoomOut(BucketIndex),
Done,
}
impl ClosestBucketsIter {
fn new(distance: Distance) -> Self {
let state = match BucketIndex::new(&distance) {
Some(i) => ClosestBucketsIterState::Start(i),
None => ClosestBucketsIterState::Start(BucketIndex(0)),
};
Self { distance, state }
}
fn next_in(&self, i: BucketIndex) -> Option<BucketIndex> {
(0..i.get()).rev().find_map(|i| {
if self.distance.0.bit(i) {
Some(BucketIndex(i))
} else {
None
}
})
}
fn next_out(&self, i: BucketIndex) -> Option<BucketIndex> {
(i.get() + 1..NUM_BUCKETS).find_map(|i| {
if !self.distance.0.bit(i) {
Some(BucketIndex(i))
} else {
None
}
})
}
}
impl Iterator for ClosestBucketsIter {
type Item = BucketIndex;
fn next(&mut self) -> Option<Self::Item> {
match self.state {
ClosestBucketsIterState::Start(i) => {
self.state = ClosestBucketsIterState::ZoomIn(i);
Some(i)
}
ClosestBucketsIterState::ZoomIn(i) => {
if let Some(i) = self.next_in(i) {
self.state = ClosestBucketsIterState::ZoomIn(i);
Some(i)
} else {
let i = BucketIndex(0);
self.state = ClosestBucketsIterState::ZoomOut(i);
Some(i)
}
}
ClosestBucketsIterState::ZoomOut(i) => {
if let Some(i) = self.next_out(i) {
self.state = ClosestBucketsIterState::ZoomOut(i);
Some(i)
} else {
self.state = ClosestBucketsIterState::Done;
None
}
}
ClosestBucketsIterState::Done => None,
}
}
}
impl<TTarget, TNodeId, TVal, TMap, TOut> Iterator
for ClosestIter<'_, TTarget, TNodeId, TVal, TMap, TOut>
where
TNodeId: Clone,
TVal: Eq,
TMap: Fn(&KBucket<TNodeId, TVal>) -> ArrayVec<TOut, MAX_NODES_PER_BUCKET>,
TOut: AsRef<Key<TNodeId>>,
{
type Item = TOut;
fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.iter {
Some(iter) => match iter.next() {
Some(k) => return Some(k),
None => self.iter = None,
},
None => {
if let Some(i) = self.buckets_iter.next() {
let bucket = &mut self.table.buckets[i.get()];
if let Some(applied) = bucket.apply_pending() {
self.table.applied_pending.push_back(applied)
}
let mut v = (self.fmap)(bucket);
v.sort_by(|a, b| {
self.target
.distance(a.as_ref())
.cmp(&self.target.distance(b.as_ref()))
});
self.iter = Some(v.into_iter());
} else {
return None;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::{bucket::InsertResult as BucketInsertResult, *};
use enr::NodeId;
fn connected_state() -> NodeStatus {
NodeStatus {
state: ConnectionState::Connected,
direction: ConnectionDirection::Outgoing,
}
}
fn disconnected_state() -> NodeStatus {
NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Outgoing,
}
}
#[test]
fn basic_closest() {
let local_key = Key::from(NodeId::random());
let other_id = Key::from(NodeId::random());
let mut table = KBucketsTable::<_, ()>::new(
local_key,
Duration::from_secs(5),
MAX_NODES_PER_BUCKET,
None,
None,
);
if let Entry::Absent(entry) = table.entry(&other_id) {
match entry.insert((), connected_state()) {
BucketInsertResult::Inserted => (),
_ => panic!(),
}
} else {
panic!()
}
let res = table.closest_keys(&other_id).collect::<Vec<_>>();
assert_eq!(res.len(), 1);
assert_eq!(res[0], other_id);
}
#[test]
fn update_local_id_fails() {
let local_key = Key::from(NodeId::random());
let mut table = KBucketsTable::<_, ()>::new(
local_key.clone(),
Duration::from_secs(5),
MAX_NODES_PER_BUCKET,
None,
None,
);
match table.entry(&local_key) {
Entry::SelfEntry => (),
_ => panic!(),
}
}
#[test]
fn closest() {
let local_key = Key::from(NodeId::random());
let mut table = KBucketsTable::<_, ()>::new(
local_key,
Duration::from_secs(5),
MAX_NODES_PER_BUCKET,
None,
None,
);
let mut count = 0;
loop {
if count == 100 {
break;
}
let key = Key::from(NodeId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), connected_state()) {
BucketInsertResult::Inserted => count += 1,
_ => continue,
}
} else {
panic!("entry exists")
}
}
let mut expected_keys: Vec<_> = table
.buckets
.iter()
.flat_map(|t| t.iter().map(|n| n.key.clone()))
.collect();
for _ in 0..10 {
let target_key = Key::from(NodeId::random());
let keys = table.closest_keys(&target_key).collect::<Vec<_>>();
expected_keys.sort_by_key(|k| k.distance(&target_key));
assert_eq!(keys, expected_keys);
}
}
#[test]
fn closest_local() {
let local_key = Key::from(NodeId::random());
let mut table = KBucketsTable::<_, ()>::new(
local_key,
Duration::from_secs(5),
MAX_NODES_PER_BUCKET,
None,
None,
);
let mut count = 0;
loop {
if count == 100 {
break;
}
let key = Key::from(NodeId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), connected_state()) {
BucketInsertResult::Inserted => count += 1,
_ => continue,
}
} else {
panic!("entry exists")
}
}
let local_key = table.local_key.clone();
assert_eq!(table.closest_keys(&local_key).count(), count);
}
#[test]
fn applied_pending() {
let local_key = Key::from(NodeId::random());
let mut table = KBucketsTable::<_, ()>::new(
local_key.clone(),
Duration::from_millis(1),
MAX_NODES_PER_BUCKET,
None,
None,
);
let expected_applied;
let full_bucket_index;
loop {
let key = Key::from(NodeId::random());
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), disconnected_state()) {
BucketInsertResult::Full => {
if let Entry::Absent(e) = table.entry(&key) {
match e.insert((), connected_state()) {
BucketInsertResult::Pending { disconnected } => {
expected_applied = AppliedPending {
inserted: key.clone(),
evicted: Some(Node {
key: disconnected,
value: (),
status: disconnected_state(),
}),
};
full_bucket_index = BucketIndex::new(&key.distance(&local_key));
break;
}
_ => panic!(),
}
} else {
panic!()
}
}
_ => continue,
}
} else {
panic!("entry exists")
}
}
let full_bucket = &mut table.buckets[full_bucket_index.unwrap().get()];
let elapsed = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
full_bucket.pending_mut().unwrap().set_ready_at(elapsed);
match table.entry(&expected_applied.inserted) {
Entry::Present(
_,
NodeStatus {
state: ConnectionState::Connected,
direction: _direction,
},
) => {}
x => panic!("Unexpected entry: {:?}", x),
}
match table.entry(&expected_applied.evicted.as_ref().unwrap().key) {
Entry::Absent(_) => {}
x => panic!("Unexpected entry: {:?}", x),
}
assert_eq!(Some(expected_applied), table.take_applied_pending());
assert_eq!(None, table.take_applied_pending());
}
}