#![allow(dead_code)]
use super::*;
use tracing::{debug, error};
/// Maximum number of nodes a single bucket can hold; used as the fixed capacity of the
/// `ArrayVec` backing `KBucket::nodes`.
pub const MAX_NODES_PER_BUCKET: usize = 16;
/// A node that is waiting to be inserted into a full bucket, together with the instant
/// from which the insertion may be attempted.
#[derive(Debug, Clone)]
pub struct PendingNode<TNodeId, TVal: Eq> {
/// The node awaiting insertion.
node: Node<TNodeId, TVal>,
/// Earliest instant at which `KBucket::apply_pending` will try to insert `node`.
replace: Instant,
}
/// The connection status of a node: the direction of the connection and whether the node
/// is currently connected.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub struct NodeStatus {
/// Direction (incoming or outgoing) of the connection.
pub direction: ConnectionDirection,
/// Whether the node is connected or disconnected.
pub state: ConnectionState,
}
/// Connection state of a node, stored in `NodeStatus::state`.
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub enum ConnectionState {
/// The node is connected.
Connected,
/// The node is not connected.
Disconnected,
}
impl NodeStatus {
    /// Returns `true` when `state` is `ConnectionState::Connected`.
    pub fn is_connected(&self) -> bool {
        matches!(self.state, ConnectionState::Connected)
    }

    /// Returns `true` when `direction` is `ConnectionDirection::Incoming`.
    pub fn is_incoming(&self) -> bool {
        matches!(self.direction, ConnectionDirection::Incoming)
    }
}
impl<TNodeId, TVal: Eq> PendingNode<TNodeId, TVal> {
    /// Returns a copy of the pending node's status.
    pub fn status(&self) -> NodeStatus {
        let Self { node, .. } = self;
        node.status
    }

    /// Returns a mutable reference to the pending node's value.
    pub fn value_mut(&mut self) -> &mut TVal {
        let Self { node, .. } = self;
        &mut node.value
    }

    /// Overwrites the instant from which the pending node may be applied to the bucket.
    pub fn set_ready_at(&mut self, t: Instant) {
        let Self { replace, .. } = self;
        *replace = t;
    }
}
/// An entry of a bucket: a key, an associated value and the node's connection status.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Node<TNodeId, TVal: Eq> {
/// The key identifying the node; bucket lookups compare against this field.
pub key: Key<TNodeId>,
/// The value associated with the key; this is what the bucket filter inspects.
pub value: TVal,
/// The connection status of the node.
pub status: NodeStatus,
}
/// Index of a node inside a bucket's node list, as returned by `KBucket::position`.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(usize);
/// A bucket holding at most `MAX_NODES_PER_BUCKET` nodes, with disconnected nodes stored
/// before connected nodes, plus at most one pending node waiting for a slot.
#[derive(Clone)]
pub struct KBucket<TNodeId, TVal: Eq> {
/// The nodes in the bucket. `insert` places disconnected nodes just before
/// `first_connected_pos` and appends connected nodes at the end.
nodes: ArrayVec<Node<TNodeId, TVal>, MAX_NODES_PER_BUCKET>,
/// Index in `nodes` of the first connected node, or `None` when no node is connected.
first_connected_pos: Option<usize>,
/// A node waiting to replace the node at index 0 once its timeout has elapsed.
pending: Option<PendingNode<TNodeId, TVal>>,
/// Delay added to `Instant::now()` when a node becomes pending to compute the instant
/// from which it may be applied.
pending_timeout: Duration,
/// Optional filter; it is given the candidate value and an iterator over the values
/// currently in the bucket, and a `false` result rejects the candidate.
filter: Option<Box<dyn Filter<TVal>>>,
/// Limit on connected incoming nodes: once the count reaches this value, further
/// connected incoming nodes are rejected.
max_incoming: usize,
}
/// Outcome of `KBucket::insert`.
#[must_use]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InsertResult<TNodeId> {
/// The node was added to the bucket.
Inserted,
/// The bucket was full, so the (connected) node was stored as the pending node.
Pending {
/// Key of the node currently at index 0 of the bucket, i.e. the node that would be
/// evicted if the pending node is applied.
disconnected: Key<TNodeId>,
},
/// The bucket filter rejected the node's value.
FailedFilter,
/// The node is connected and incoming, and the bucket already holds the maximum number
/// of connected incoming nodes.
TooManyIncoming,
/// The bucket is full and the node could not be stored as pending.
Full,
/// A node with the same key is already in the bucket.
NodeExists,
}
/// Outcome of `KBucket::update_status` and `KBucket::update_value`.
#[must_use]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpdateResult {
/// The node in the bucket was updated.
Updated,
/// The node was updated and went from disconnected to connected.
UpdatedAndPromoted,
/// The key matched the pending node, which was updated.
UpdatedPending,
/// The update failed for the given reason.
Failed(FailureReason),
/// The new status/value was equal to the existing one.
NotModified,
}
impl UpdateResult {
    /// Returns `true` if this result is the `Failed` variant, whatever the reason.
    pub fn failed(&self) -> bool {
        // Listing every variant makes the compiler flag this method when a new one is added.
        match self {
            UpdateResult::Failed(_) => true,
            UpdateResult::Updated
            | UpdateResult::UpdatedAndPromoted
            | UpdateResult::UpdatedPending
            | UpdateResult::NotModified => false,
        }
    }
}
/// Reason carried by `UpdateResult::Failed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FailureReason {
/// The bucket already holds the maximum number of connected incoming nodes.
TooManyIncoming,
/// The bucket filter rejected the value.
BucketFilter,
/// NOTE(review): not produced in this file; presumably reported by a table-level filter
/// — confirm against the caller.
TableFilter,
/// The key is neither in the bucket nor the key of the pending node.
KeyNonExistent,
/// The bucket is full.
BucketFull,
/// NOTE(review): not produced in this file; presumably an attempt to update the local
/// node's own entry — confirm against the caller.
InvalidSelfUpdate,
}
/// Result of successfully applying a pending node via `KBucket::apply_pending`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppliedPending<TNodeId, TVal: Eq> {
/// Key of the pending node that was inserted into the bucket.
pub inserted: Key<TNodeId>,
/// The node removed from index 0 to make room, or `None` if the bucket had free space.
pub evicted: Option<Node<TNodeId, TVal>>,
}
impl<TNodeId, TVal> KBucket<TNodeId, TVal>
where
TNodeId: Clone,
TVal: Eq,
{
pub fn new(
pending_timeout: Duration,
max_incoming: usize,
filter: Option<Box<dyn Filter<TVal>>>,
) -> Self {
KBucket {
nodes: ArrayVec::new(),
first_connected_pos: None,
pending: None,
pending_timeout,
filter,
max_incoming,
}
}
/// Returns a reference to the pending node, if there is one.
pub fn pending(&self) -> Option<&PendingNode<TNodeId, TVal>> {
self.pending.as_ref()
}
/// Returns a mutable reference to the pending node, if there is one.
pub fn pending_mut(&mut self) -> Option<&mut PendingNode<TNodeId, TVal>> {
self.pending.as_mut()
}
/// Returns the pending node if, and only if, its key equals `key`.
pub fn as_pending(&self, key: &Key<TNodeId>) -> Option<&PendingNode<TNodeId, TVal>> {
    self.pending
        .as_ref()
        .filter(|candidate| &candidate.node.key == key)
}
/// Returns an iterator over the nodes in the bucket, in storage order (the pending node is
/// not included).
pub fn iter(&self) -> impl Iterator<Item = &Node<TNodeId, TVal>> {
self.nodes.iter()
}
/// Attempts to move the pending node, if any, into the bucket.
///
/// Returns `Some(AppliedPending)` when the pending node was inserted, including the evicted
/// node if one had to be removed. Returns `None` when there is no pending node, when its
/// timeout has not elapsed yet (the pending node is kept), or when the pending node could
/// not be inserted (the pending node is dropped).
pub fn apply_pending(&mut self) -> Option<AppliedPending<TNodeId, TVal>> {
// Take the pending node out; it is only put back if its timeout has not elapsed.
if let Some(pending) = self.pending.take() {
if pending.replace <= Instant::now() {
if self.nodes.is_full() {
// The node at index 0 is connected, so there is no disconnected node to evict.
// The pending node is dropped.
if self.nodes[0].status.is_connected() {
return None;
}
// The pending node's value must pass the bucket filter, otherwise it is dropped.
if let Some(filter) = self.filter.as_ref() {
if !filter.filter(
&pending.node.value,
&mut self.iter().map(|node| &node.value),
) {
return None;
}
}
// A connected incoming pending node must respect the incoming limit, otherwise
// it is dropped.
if pending.status().is_connected() && pending.status().is_incoming() {
if self.is_max_incoming() {
return None;
}
}
let inserted = pending.node.key.clone();
// Connected pending node: evict index 0 and append the node at the end.
if pending.status().is_connected() {
let evicted = Some(self.nodes.remove(0));
// The eviction shifts every index down by one; if no node was connected, the
// appended node becomes the first connected one.
self.first_connected_pos = self
.first_connected_pos
.map_or_else(|| Some(self.nodes.len()), |p| p.checked_sub(1));
self.nodes.push(pending.node);
return Some(AppliedPending { inserted, evicted });
}
// Disconnected pending node with connected nodes present: evict index 0 and insert
// the node right before the connected nodes (`first_connected_pos` is unchanged).
else if let Some(p) = self.first_connected_pos {
if let Some(insert_pos) = p.checked_sub(1) {
let evicted = Some(self.nodes.remove(0));
self.nodes.insert(insert_pos, pending.node);
return Some(AppliedPending { inserted, evicted });
}
} else {
// Disconnected pending node and no connected nodes: evict index 0 and append.
let evicted = Some(self.nodes.remove(0));
self.nodes.push(pending.node);
return Some(AppliedPending { inserted, evicted });
}
} else {
// The bucket has free space: use the regular insertion path.
let inserted = pending.node.key.clone();
match self.insert(pending.node) {
InsertResult::Inserted => {
return Some(AppliedPending {
inserted,
evicted: None,
})
}
InsertResult::Full => unreachable!("Bucket cannot be full"),
InsertResult::Pending { .. } | InsertResult::NodeExists => {
error!("Bucket is not full or double node")
}
InsertResult::FailedFilter => debug!("Pending node failed filter"),
InsertResult::TooManyIncoming => {
debug!("Pending node failed incoming filter")
}
}
}
} else {
// The timeout has not elapsed: keep the pending node for a later attempt.
self.pending = Some(pending);
}
}
None
}
/// Overwrites the status of the pending node; does nothing when there is no pending node.
pub fn update_pending(&mut self, status: NodeStatus) {
    if let Some(waiting) = self.pending.as_mut() {
        waiting.node.status = status;
    }
}
/// Updates the connection state (and optionally the direction) of the node with `key`.
///
/// A node in the bucket is removed and re-inserted with its new status, which moves it to
/// the end of the disconnected or connected group. If the re-insertion is rejected
/// (`TooManyIncoming` or `FailedFilter`), the node is NOT put back and a `Failed` result is
/// returned. If the key matches the pending node instead, the pending node's status is
/// updated in place. Otherwise `Failed(KeyNonExistent)` is returned.
pub fn update_status(
&mut self,
key: &Key<TNodeId>,
state: ConnectionState,
direction: Option<ConnectionDirection>,
) -> UpdateResult {
if let Some(pos) = self.position(key) {
// Remove the node from its current position and compute its new status.
let mut node = self.nodes.remove(pos.0);
let old_status = node.status;
node.status.state = state;
if let Some(direction) = direction {
node.status.direction = direction;
}
let not_modified = old_status == node.status;
let is_connected = matches!(state, ConnectionState::Connected);
// Fix up `first_connected_pos` for the removal performed above.
match old_status.state {
ConnectionState::Connected => {
// The removed node was the only connected node (first connected and last in
// the list), so no connected node remains.
if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
self.first_connected_pos = None
}
}
ConnectionState::Disconnected => {
// A disconnected node was removed, so the first connected index (if any)
// shifts down by one.
self.first_connected_pos =
self.first_connected_pos.and_then(|p| p.checked_sub(1))
}
}
// The node at index 0 is the one a pending node would evict; if it is (re)connected,
// the pending node is discarded.
if pos == Position(0) && is_connected {
self.pending = None
}
// Re-insert the node with its new status.
match self.insert(node) {
InsertResult::Inserted => {
if not_modified {
UpdateResult::NotModified
} else if !old_status.is_connected() && is_connected {
UpdateResult::UpdatedAndPromoted
} else {
UpdateResult::Updated
}
}
InsertResult::TooManyIncoming => {
UpdateResult::Failed(FailureReason::TooManyIncoming)
}
InsertResult::FailedFilter => {
UpdateResult::Failed(FailureReason::BucketFilter)
}
InsertResult::NodeExists => {
unreachable!("The node was removed and shouldn't already exist")
}
InsertResult::Full => {
unreachable!("The node was removed so the bucket cannot be full")
}
InsertResult::Pending { .. } => {
unreachable!("The node was removed so can't be added as pending")
}
}
} else if let Some(pending) = &mut self.pending {
// The key is not in the bucket; update the pending node if it has this key.
if &pending.node.key == key {
pending.node.status.state = state;
if let Some(direction) = direction {
pending.node.status.direction = direction;
}
UpdateResult::UpdatedPending
} else {
UpdateResult::Failed(FailureReason::KeyNonExistent)
}
} else {
UpdateResult::Failed(FailureReason::KeyNonExistent)
}
}
/// Replaces the value of the node with `key`, keeping its position in the bucket.
///
/// Returns `NotModified` when the stored value already equals `value`, and `Updated` when
/// it was replaced. If the bucket filter rejects the new value, the node is removed from
/// the bucket and `Failed(BucketFilter)` is returned. If the key matches the pending node
/// instead, its value is replaced (without consulting the filter) and `UpdatedPending` is
/// returned. Otherwise `Failed(KeyNonExistent)` is returned.
pub fn update_value(&mut self, key: &Key<TNodeId>, value: TVal) -> UpdateResult {
    match self.position(key) {
        Some(Position(index)) => {
            // Take the node out so the filter only sees the other values in the bucket.
            let mut entry = self.nodes.remove(index);
            if entry.value == value {
                self.nodes.insert(index, entry);
                return UpdateResult::NotModified;
            }

            let accepted = match self.filter.as_ref() {
                Some(filter) => {
                    filter.filter(&value, &mut self.iter().map(|other| &other.value))
                }
                None => true,
            };
            if !accepted {
                // The node stays removed; account for the removal.
                self.update_first_connected_pos_for_removal(index);
                return UpdateResult::Failed(FailureReason::BucketFilter);
            }

            entry.value = value;
            self.nodes.insert(index, entry);
            UpdateResult::Updated
        }
        None => match self.pending.as_mut() {
            Some(waiting) if &waiting.node.key == key => {
                waiting.node.value = value;
                UpdateResult::UpdatedPending
            }
            _ => UpdateResult::Failed(FailureReason::KeyNonExistent),
        },
    }
}
/// Inserts `node` into the bucket.
///
/// Connected nodes are appended at the end of the list; disconnected nodes are inserted
/// just before the first connected node. When the bucket is full, a connected node becomes
/// the pending node if the node at index 0 is not connected and no other node is pending;
/// in every other full-bucket case `InsertResult::Full` is returned. See `InsertResult` for
/// the remaining outcomes.
pub fn insert(&mut self, node: Node<TNodeId, TVal>) -> InsertResult<TNodeId> {
// Reject duplicates.
if self.position(&node.key).is_some() {
return InsertResult::NodeExists;
}
// The value must pass the bucket filter, if one is set.
if let Some(filter) = self.filter.as_ref() {
if !filter.filter(&node.value, &mut self.iter().map(|node| &node.value)) {
return InsertResult::FailedFilter;
}
}
// Remember whether the node being inserted is the current pending node, so that the
// pending slot can be cleared after a successful insertion.
let inserting_pending = self
.pending
.as_ref()
.map(|pending| pending.node.key == node.key)
.unwrap_or_default();
let insert_result = match node.status.state {
ConnectionState::Connected => {
// Enforce the limit on connected incoming nodes.
if node.status.is_incoming() {
if self.is_max_incoming() {
return InsertResult::TooManyIncoming;
}
}
if self.nodes.is_full() {
// Every node is connected (the first connected node is at index 0) or the
// pending slot is taken: nothing can be replaced.
if self.first_connected_pos == Some(0) || self.pending.is_some() {
return InsertResult::Full;
} else {
// Store the node as pending; it may later replace the node at index 0.
self.pending = Some(PendingNode {
node,
replace: Instant::now() + self.pending_timeout,
});
return InsertResult::Pending {
disconnected: self.nodes[0].key.clone(),
};
}
}
// Append the connected node; it becomes the first connected node if there was none.
let pos = self.nodes.len();
self.first_connected_pos = self.first_connected_pos.or(Some(pos));
self.nodes.push(node);
InsertResult::Inserted
}
ConnectionState::Disconnected => {
if self.nodes.is_full() {
return InsertResult::Full;
}
// Insert right before the connected nodes, which shifts them up by one; with no
// connected nodes the node is simply appended.
if let Some(ref mut first_connected_pos) = self.first_connected_pos {
self.nodes.insert(*first_connected_pos, node);
*first_connected_pos += 1;
} else {
self.nodes.push(node);
}
InsertResult::Inserted
}
};
if matches!(insert_result, InsertResult::Inserted) && inserting_pending {
self.pending = None
}
insert_result
}
/// Removes the node with `key` from the bucket and then tries to apply the pending node.
///
/// Returns `true` if a node was removed, `false` if the key is not in the bucket.
pub fn remove(&mut self, key: &Key<TNodeId>) -> bool {
    match self.position(key) {
        None => false,
        Some(Position(index)) => {
            self.nodes.remove(index);
            self.update_first_connected_pos_for_removal(index);
            // A slot was freed; the outcome of applying the pending node is not reported.
            self.apply_pending();
            true
        }
    }
}
/// Returns the number of nodes in the bucket (the pending node is not counted).
pub fn num_entries(&self) -> usize {
self.nodes.len()
}
/// Returns the number of connected nodes, i.e. the nodes from `first_connected_pos` to the
/// end of the list.
pub fn num_connected(&self) -> usize {
    match self.first_connected_pos {
        Some(first) => self.nodes.len() - first,
        None => 0,
    }
}
/// Returns the number of disconnected nodes, i.e. all nodes that are not connected.
pub fn num_disconnected(&self) -> usize {
    let total = self.nodes.len();
    let connected = self.num_connected();
    total - connected
}
pub fn position(&self, key: &Key<TNodeId>) -> Option<Position> {
self.nodes.iter().position(|p| &p.key == key).map(Position)
}
pub fn status(&self, pos: Position) -> NodeStatus {
if let Some(node) = self.nodes.get(pos.0) {
node.status
} else {
NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Incoming,
}
}
}
pub fn get_mut(&mut self, key: &Key<TNodeId>) -> Option<&mut Node<TNodeId, TVal>> {
self.nodes.iter_mut().find(move |p| &p.key == key)
}
pub fn get(&self, key: &Key<TNodeId>) -> Option<&Node<TNodeId, TVal>> {
self.nodes.iter().find(move |p| &p.key == key)
}
/// Returns `true` when the number of connected incoming nodes in the bucket has reached
/// (or exceeds) `max_incoming`.
fn is_max_incoming(&self) -> bool {
    let connected_incoming = self
        .nodes
        .iter()
        .map(|entry| entry.status)
        .filter(|status| status.is_connected() && status.is_incoming())
        .count();
    connected_incoming >= self.max_incoming
}
/// Adjusts `first_connected_pos` after the node at `removed_pos` has already been removed
/// from `nodes`.
fn update_first_connected_pos_for_removal(&mut self, removed_pos: usize) {
    let remaining = self.nodes.len();
    self.first_connected_pos = match self.first_connected_pos {
        // A node before the first connected one was removed: the index shifts down.
        Some(first) if removed_pos < first => Some(first - 1),
        // The index is unchanged as long as a node still exists at that position.
        Some(first) if first < remaining => Some(first),
        // Either nothing was connected, or the last connected node was removed.
        _ => None,
    };
}
}
/// Manual `Debug` implementation: the filter is shown only as a flag telling whether one is
/// set, every other field is printed as-is.
impl<TNodeId: std::fmt::Debug, TVal: Eq + std::fmt::Debug> std::fmt::Debug
    for KBucket<TNodeId, TVal>
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_filter = self.filter.is_some();
        let mut out = f.debug_struct("KBucket");
        out.field("nodes", &self.nodes);
        out.field("first_connected_pos", &self.first_connected_pos);
        out.field("pending", &self.pending);
        out.field("pending_timeout", &self.pending_timeout);
        out.field("filter", &has_filter);
        out.field("max_incoming", &self.max_incoming);
        out.finish()
    }
}
/// Displays the direction as the plain variant name.
impl std::fmt::Display for ConnectionDirection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ConnectionDirection::Incoming => "Incoming",
            ConnectionDirection::Outgoing => "Outgoing",
        };
        f.write_str(label)
    }
}
#[cfg(test)]
pub mod tests {
use super::*;
use enr::NodeId;
use quickcheck::*;
use rand_07::Rng;
use std::{
collections::{HashSet, VecDeque},
hash::Hash,
};
/// Test helper: the status of a connected node with an outgoing connection.
fn connected_state() -> NodeStatus {
    let state = ConnectionState::Connected;
    let direction = ConnectionDirection::Outgoing;
    NodeStatus { direction, state }
}
/// Test helper: the status of a disconnected node with an outgoing connection.
fn disconnected_state() -> NodeStatus {
    let state = ConnectionState::Disconnected;
    let direction = ConnectionDirection::Outgoing;
    NodeStatus { direction, state }
}
/// Builds a `NodeId` from 32 bytes drawn from the quickcheck generator `g`.
pub fn arbitrary_node_id<G: Gen>(g: &mut G) -> NodeId {
    let mut raw = [0u8; 32];
    g.fill_bytes(&mut raw);
    NodeId::new(&raw)
}
/// Test-only invariant checks; each one panics when the invariant is violated.
impl<V> KBucket<NodeId, V>
where
    V: Eq + std::fmt::Debug,
{
    /// Runs all invariant checks, in order.
    fn check_invariants(&self) {
        self.check_first_connected_pos();
        self.check_status_ordering();
        self.check_max_incoming_nodes();
    }

    /// `first_connected_pos` must equal the index of the first connected node.
    fn check_first_connected_pos(&self) {
        let actual = self
            .nodes
            .iter()
            .position(|entry| entry.status.is_connected());
        assert_eq!(self.first_connected_pos, actual);
    }

    /// Every node before `first_connected_pos` must be disconnected and every node from it
    /// onwards must be connected.
    fn check_status_ordering(&self) {
        let boundary = self.first_connected_pos.unwrap_or(self.nodes.len());
        let disconnected_prefix = &self.nodes[..boundary];
        assert!(!disconnected_prefix
            .iter()
            .any(|entry| entry.status.is_connected()));
        let connected_suffix = &self.nodes[boundary..];
        assert!(connected_suffix
            .iter()
            .all(|entry| entry.status.is_connected()));
    }

    /// The number of connected incoming nodes must not exceed `max_incoming`.
    fn check_max_incoming_nodes(&self) {
        let connected_incoming = self
            .nodes
            .iter()
            .map(|entry| entry.status)
            .filter(|status| status.is_connected() && status.is_incoming())
            .count();
        assert!(connected_incoming <= self.max_incoming);
    }
}
/// Generates a bucket without a filter, with a random pending timeout and between 1 and
/// `MAX_NODES_PER_BUCKET` arbitrary nodes.
impl<V> Arbitrary for KBucket<NodeId, V>
where
    V: Arbitrary + Eq,
{
    fn arbitrary<G: Gen>(g: &mut G) -> KBucket<NodeId, V> {
        let timeout_secs = g.gen_range(1, g.size() as u64);
        let mut bucket = KBucket::<NodeId, V>::new(
            Duration::from_secs(timeout_secs),
            MAX_NODES_PER_BUCKET,
            None,
        );
        let target = g.gen_range(1, MAX_NODES_PER_BUCKET + 1);
        let mut inserted = 0;
        // Keep generating nodes until `target` of them have been accepted.
        while inserted < target {
            match bucket.insert(Node::arbitrary(g)) {
                InsertResult::Inserted => inserted += 1,
                // Retry with a fresh node.
                InsertResult::TooManyIncoming => {}
                _ => panic!(),
            }
        }
        bucket
    }
}
/// Generates a node with an arbitrary key, value and status (drawn in that order).
impl<V> Arbitrary for Node<NodeId, V>
where
    V: Arbitrary + Eq,
{
    fn arbitrary<G: Gen>(g: &mut G) -> Self {
        let key = Key::from(arbitrary_node_id(g));
        let value = V::arbitrary(g);
        let status = NodeStatus::arbitrary(g);
        Node { key, value, status }
    }
}
/// Generates one of the four direction/state combinations.
impl Arbitrary for NodeStatus {
    fn arbitrary<G: Gen>(g: &mut G) -> NodeStatus {
        // `gen_range(low, high)` excludes `high`, so the upper bound has to be 5 for the
        // fourth combination (outgoing + disconnected) to ever be generated.
        match g.gen_range(1, 5) {
            1 => NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
            2 => NodeStatus {
                direction: ConnectionDirection::Outgoing,
                state: ConnectionState::Connected,
            },
            3 => NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Disconnected,
            },
            4 => NodeStatus {
                direction: ConnectionDirection::Outgoing,
                state: ConnectionState::Disconnected,
            },
            x => unreachable!("Should not generate numbers out of this range {}", x),
        }
    }
}
/// Generates a position in the range `0..MAX_NODES_PER_BUCKET`.
impl Arbitrary for Position {
    fn arbitrary<G: Gen>(g: &mut G) -> Position {
        let index = g.gen_range(0, MAX_NODES_PER_BUCKET);
        Position(index)
    }
}
/// Fills the remaining free slots of `bucket` with nodes that have random keys and the given
/// `status`, asserting that every insertion succeeds and grows the bucket by one.
fn fill_bucket(bucket: &mut KBucket<NodeId, ()>, status: NodeStatus) {
    let initial = bucket.num_entries();
    let missing = MAX_NODES_PER_BUCKET - initial;
    for added in 1..=missing {
        let node = Node {
            key: Key::from(NodeId::random()),
            value: (),
            status,
        };
        assert_eq!(InsertResult::Inserted, bucket.insert(node));
        assert_eq!(bucket.num_entries(), initial + added);
    }
}
/// Test filter that accepts a value only if it is a member of `set`.
#[derive(Debug, Clone)]
pub struct SetFilter<T> {
/// The accepted values.
set: HashSet<T>,
}
impl<T> Filter<T> for SetFilter<T>
where
T: Clone + Hash + Eq + Send + Sync + 'static,
{
fn filter(&self, value: &T, _: &mut dyn Iterator<Item = &T>) -> bool {
self.set.contains(value)
}
}
/// A randomly generated operation on a bucket, executed by `apply_action`. The `usize`
/// payloads select a node by index (taken modulo the number of entries).
#[derive(Debug, Clone)]
pub enum Action<TVal>
where
TVal: Eq,
{
/// Insert the given node.
Insert(Node<NodeId, TVal>),
/// Remove the node selected by the index.
Remove(usize),
/// Overwrite the status of the pending node.
UpdatePending(NodeStatus),
/// Try to apply the pending node.
ApplyPending,
/// Update the status of the node selected by the index.
UpdateStatus(usize, NodeStatus),
/// Update the value of the node selected by the index.
UpdateValue(usize, TVal),
}
/// Picks one of the six action variants and fills it with arbitrary payloads.
impl<V> Arbitrary for Action<V>
where
    V: Arbitrary + Eq,
{
    fn arbitrary<G: Gen>(g: &mut G) -> Self {
        let variant = g.gen_range(0, 6);
        match variant {
            0 => Action::Insert(<_>::arbitrary(g)),
            1 => Action::Remove(<_>::arbitrary(g)),
            2 => Action::UpdatePending(<_>::arbitrary(g)),
            3 => Action::ApplyPending,
            4 => Action::UpdateStatus(<_>::arbitrary(g), <_>::arbitrary(g)),
            5 => Action::UpdateValue(<_>::arbitrary(g), <_>::arbitrary(g)),
            _ => panic!("wrong number of action variants"),
        }
    }
}
impl<V> KBucket<NodeId, V>
where
V: Eq + std::fmt::Debug,
{
fn apply_action(&mut self, action: Action<V>) -> Result<(), FailureReason> {
match action {
Action::Insert(node) => match self.insert(node) {
InsertResult::FailedFilter => Err(FailureReason::BucketFilter),
InsertResult::TooManyIncoming => Err(FailureReason::TooManyIncoming),
InsertResult::Full => Err(FailureReason::BucketFull),
_ => Ok(()),
},
Action::Remove(pos) => {
if let Some(key) = self.key_of_pos(pos) {
self.remove(&key);
}
Ok(())
}
Action::UpdatePending(status) => {
self.update_pending(status);
Ok(())
}
Action::ApplyPending => {
self.apply_pending();
Ok(())
}
Action::UpdateStatus(pos, status) => {
if let Some(key) = self.key_of_pos(pos) {
match self.update_status(&key, status.state, Some(status.direction)) {
UpdateResult::Failed(reason) => Err(reason),
_ => Ok(()),
}
} else {
Ok(())
}
}
Action::UpdateValue(pos, value) => {
if let Some(key) = self.key_of_pos(pos) {
match self.update_value(&key, value) {
UpdateResult::Failed(reason) => Err(reason),
_ => Ok(()),
}
} else {
Ok(())
}
}
}
}
fn key_of_pos(&self, pos: usize) -> Option<Key<NodeId>> {
let num_nodes = self.num_entries();
if num_nodes > 0 {
let pos = pos % num_nodes;
let key = self.nodes[pos].key.clone();
Some(key)
} else {
None
}
}
}
// Property: after inserting nodes with arbitrary statuses, the bucket stores the disconnected
// nodes followed by the connected nodes, each group in insertion order, and
// `first_connected_pos` points at the first connected node.
#[test]
fn ordering() {
fn prop(status: Vec<NodeStatus>) -> bool {
let mut bucket =
KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
// Model of the expected bucket content, one queue per connection state.
let mut connected = VecDeque::new();
let mut disconnected = VecDeque::new();
for status in status {
let key = Key::from(NodeId::random());
let node = Node {
key: key.clone(),
value: (),
status,
};
let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
// Only nodes that were actually inserted are recorded in the model.
if let InsertResult::Inserted = bucket.insert(node) {
let vec = if status.is_connected() {
&mut connected
} else {
&mut disconnected
};
// If an insertion into a full bucket reports `Inserted`, the model drops its
// oldest entry of the same group.
if full {
vec.pop_front();
}
vec.push_back((status, key.clone()));
}
}
let mut nodes = bucket
.iter()
.map(|n| (n.status, n.key.clone()))
.collect::<Vec<_>>();
let first_connected_pos = nodes.iter().position(|(status, _)| status.is_connected());
assert_eq!(bucket.first_connected_pos, first_connected_pos);
// Split the bucket content into the disconnected prefix and the connected tail.
let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));
disconnected == nodes && connected == tail
}
quickcheck(prop as fn(_) -> _);
}
// A bucket full of disconnected nodes rejects further disconnected nodes, accepts connected
// nodes as pending and replaces its first node once the pending node is applied. Once every
// node is connected, the bucket rejects connected nodes too.
#[test]
fn full_bucket() {
let mut bucket =
KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
let disconnected_status = NodeStatus {
state: ConnectionState::Disconnected,
direction: ConnectionDirection::Outgoing,
};
fill_bucket(&mut bucket, disconnected_status);
// A further disconnected node is rejected because the bucket is full.
let key = Key::from(NodeId::random());
let node = Node {
key,
value: (),
status: disconnected_status,
};
match bucket.insert(node) {
InsertResult::Full => {}
x => panic!("{:?}", x),
}
// Replace every disconnected node, one at a time, with a connected node.
for i in 0..MAX_NODES_PER_BUCKET {
let first = bucket.iter().next().unwrap();
let first_disconnected = first.clone();
assert_eq!(first.status, disconnected_status);
let key = Key::from(NodeId::random());
let node = Node {
key: key.clone(),
value: (),
status: connected_state(),
};
// The connected node becomes pending and names the first node as the one to replace.
match bucket.insert(node.clone()) {
InsertResult::Pending { disconnected } => {
assert_eq!(disconnected, first_disconnected.key)
}
x => panic!("{:?}", x),
}
// While a node is pending, inserting again reports a full bucket.
match bucket.insert(node.clone()) {
InsertResult::Full => {}
x => panic!("{:?}", x),
}
assert!(bucket.pending().is_some());
// Move the ready time into the past so that the pending node can be applied now.
let pending = bucket.pending_mut().expect("No pending node.");
pending.set_ready_at(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
let result = bucket.apply_pending();
assert_eq!(
result,
Some(AppliedPending {
inserted: key.clone(),
evicted: Some(first_disconnected)
})
);
// The applied node is the last node of the bucket.
assert_eq!(
Some(connected_state()),
bucket.iter().map(|v| v.status).last()
);
assert!(bucket.pending().is_none());
assert_eq!(
Some(MAX_NODES_PER_BUCKET - (i + 1)),
bucket.first_connected_pos
);
}
assert!(bucket.pending().is_none());
assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries());
// Every node is connected now, so another connected node is rejected.
let key = Key::from(NodeId::random());
let node = Node {
key,
value: (),
status: connected_state(),
};
match bucket.insert(node) {
InsertResult::Full => {}
x => panic!("{:?}", x),
}
}
// When the node that a pending node would replace becomes connected, the pending node is
// discarded and the re-connected node moves to the end of the bucket.
#[test]
fn full_bucket_discard_pending() {
let mut bucket =
KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
fill_bucket(&mut bucket, disconnected_state());
let first = bucket.iter().next().unwrap();
let first_disconnected = first.clone();
// Insert a connected node into the full bucket; it becomes pending.
let key = Key::from(NodeId::random());
let node = Node {
key: key.clone(),
value: (),
status: connected_state(),
};
if let InsertResult::Pending { disconnected } = bucket.insert(node) {
assert_eq!(&disconnected, &first_disconnected.key);
} else {
panic!()
}
assert!(bucket.pending().is_some());
// Connecting the first node drops the pending node, which never enters the bucket.
let _ = bucket.update_status(&first_disconnected.key, ConnectionState::Connected, None);
assert!(bucket.pending().is_none());
assert!(bucket.iter().all(|n| n.key != key));
// The re-connected node is now the last node and the only connected one.
assert_eq!(
Some((&first_disconnected.key, connected_state())),
bucket.iter().map(|v| (&v.key, v.status)).last()
);
assert_eq!(
bucket.position(&first_disconnected.key).map(|p| p.0),
bucket.first_connected_pos
);
assert_eq!(1, bucket.num_connected());
assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected());
}
// Inserting a node that is currently pending (after a slot was freed) clears the pending
// slot, so applying the pending node afterwards cannot insert it a second time.
#[test]
fn full_bucket_applied_no_duplicates() {
let mut bucket =
KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
fill_bucket(&mut bucket, connected_state());
let first = bucket.iter().next().unwrap().clone();
let third = bucket.iter().nth(2).unwrap().clone();
// Disconnect one node so that a connected node can become pending.
assert_eq!(
bucket.update_status(&first.key, ConnectionState::Disconnected, None),
UpdateResult::Updated
);
let key = Key::from(NodeId::random());
let node = Node {
key,
value: (),
status: connected_state(),
};
if let InsertResult::Pending { disconnected } = bucket.insert(node.clone()) {
assert_eq!(&disconnected, &first.key);
} else {
panic!()
}
assert!(bucket.pending().is_some());
// Free a slot; the pending node's timeout has not elapsed, so it is not applied yet.
bucket.remove(&third.key);
assert_eq!(bucket.apply_pending(), None);
// Inserting the pending node directly succeeds and clears the pending slot.
assert_eq!(bucket.insert(node.clone()), InsertResult::Inserted);
assert!(bucket.pending.is_none());
// NOTE(review): the assertion above establishes that `pending` is `None`, so this branch
// is never taken.
if let Some(pending) = bucket.pending.as_mut() {
pending.replace = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
}
assert_eq!(bucket.apply_pending(), None);
// The node is in the bucket exactly as inserted, so the update changes nothing.
assert_eq!(
bucket.update_status(&node.key, ConnectionState::Connected, None),
UpdateResult::NotModified
);
}
// Property: updating the status of a node moves it to the end of the group matching its new
// state and leaves the relative order and status of all other nodes unchanged.
#[test]
fn bucket_update_status() {
fn prop(mut bucket: KBucket<NodeId, ()>, pos: Position, status: NodeStatus) -> bool {
let num_nodes = bucket.num_entries();
// Select the node to update.
let pos = pos.0 % num_nodes;
let key = bucket.nodes[pos].key.clone();
// Record the bucket content before the update.
let mut expected = bucket
.iter()
.map(|n| (n.key.clone(), n.status))
.collect::<Vec<_>>();
let _ = bucket.update_status(&key, status.state, Some(status.direction));
// A connected node is expected at the very end, a disconnected node right before the
// first connected node (or at the end when no node is connected).
let expected_pos = if status.is_connected() {
num_nodes - 1
} else {
bucket.first_connected_pos.unwrap_or(num_nodes) - 1
};
expected.remove(pos);
expected.insert(expected_pos, (key, status));
let actual = bucket
.iter()
.map(|n| (n.key.clone(), n.status))
.collect::<Vec<_>>();
expected == actual
}
quickcheck(prop as fn(_, _, _) -> _);
}
// Property: updating a value keeps the node in place when the filter accepts the new value
// (or the value is unchanged) and removes the node when the filter rejects it.
#[test]
fn bucket_update_value_with_filtering() {
fn prop(
mut bucket: KBucket<NodeId, u8>,
pos: Position,
value: u8,
value_matches_filter: bool,
) -> bool {
// The filter set contains `value` only when `value_matches_filter` is true; otherwise
// it is empty and rejects everything.
let filter = SetFilter {
set: value_matches_filter.then_some(value).into_iter().collect(),
};
bucket.filter = Some(Box::new(filter));
let num_nodes = bucket.num_entries();
// Select the node to update.
let pos = pos.0 % num_nodes;
let key = bucket.nodes[pos].key.clone();
// Record the bucket content before the update.
let mut expected = bucket
.iter()
.map(|n| (n.key.clone(), n.value))
.collect::<Vec<_>>();
let _ = bucket.update_value(&key, value);
bucket.check_invariants();
if value_matches_filter || expected[pos].1 == value {
expected[pos].1 = value;
} else {
expected.remove(pos);
}
let actual = bucket
.iter()
.map(|n| (n.key.clone(), n.value))
.collect::<Vec<_>>();
expected == actual
}
quickcheck(prop as fn(_, _, _, _) -> _);
}
// Property: the bucket invariants hold after every step of an arbitrary sequence of actions,
// for an arbitrary filter set, pending timeout and incoming limit.
#[test]
fn random_actions_with_filtering() {
fn prop(
initial_nodes: Vec<Node<NodeId, u8>>,
pending_timeout_millis: u64,
max_incoming: usize,
filter_set: HashSet<u8>,
actions: Vec<Action<u8>>,
) -> bool {
let filter = SetFilter { set: filter_set };
let pending_timeout = Duration::from_millis(pending_timeout_millis);
let mut kbucket =
KBucket::<NodeId, u8>::new(pending_timeout, max_incoming, Some(Box::new(filter)));
// Seed the bucket; rejected insertions are ignored.
for node in initial_nodes {
let _ = kbucket.insert(node);
}
// Failures of individual actions are ignored; only the invariants matter.
for action in actions {
let _ = kbucket.apply_action(action);
kbucket.check_invariants();
}
true
}
quickcheck(prop as fn(_, _, _, _, _) -> _);
}
// Checks the results reported by `update_status` on a full bucket that already holds the
// maximum number of connected incoming nodes.
#[test]
fn table_update_status_connection() {
let max_incoming = 7;
let mut bucket = KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming, None);
let mut incoming_connected = 0;
let mut keys = Vec::new();
// Fill the bucket with connected nodes: the first `max_incoming` are incoming, the rest
// are outgoing.
for _ in 0..MAX_NODES_PER_BUCKET {
let key = Key::from(NodeId::random());
keys.push(key.clone());
incoming_connected += 1;
let direction = if incoming_connected <= max_incoming {
ConnectionDirection::Incoming
} else {
ConnectionDirection::Outgoing
};
let status = NodeStatus {
state: ConnectionState::Connected,
direction,
};
let node = Node {
key: key.clone(),
value: (),
status,
};
assert_eq!(InsertResult::Inserted, bucket.insert(node));
}
// `keys[max_incoming]` is the first outgoing node. Disconnecting it is a plain update.
let result = bucket.update_status(
&keys[max_incoming],
ConnectionState::Disconnected,
Some(ConnectionDirection::Incoming),
);
assert_eq!(result, UpdateResult::Updated);
// Re-connecting it as outgoing promotes it.
let result = bucket.update_status(
&keys[max_incoming],
ConnectionState::Connected,
Some(ConnectionDirection::Outgoing),
);
assert_eq!(result, UpdateResult::UpdatedAndPromoted);
// Repeating the same update changes nothing.
let result = bucket.update_status(
&keys[max_incoming],
ConnectionState::Connected,
Some(ConnectionDirection::Outgoing),
);
assert_eq!(result, UpdateResult::NotModified);
// Switching it to incoming would exceed the incoming limit.
let result = bucket.update_status(
&keys[max_incoming],
ConnectionState::Connected,
Some(ConnectionDirection::Incoming),
);
assert_eq!(result, UpdateResult::Failed(FailureReason::TooManyIncoming));
}
// Property: with an incoming limit of 5, inserting nodes with arbitrary statuses never
// violates the bucket invariants, and the bucket content matches the model of accepted
// insertions (disconnected nodes first, then connected nodes, each in insertion order).
#[test]
fn bucket_max_incoming_nodes() {
fn prop(status: Vec<NodeStatus>) -> bool {
let max_incoming_nodes = 5;
let mut bucket =
KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming_nodes, None);
// Model of the expected bucket content, one queue per connection state.
let mut connected = VecDeque::new();
let mut disconnected = VecDeque::new();
for status in status {
let key = Key::from(NodeId::random());
let node = Node {
key: key.clone(),
value: (),
status,
};
let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
match bucket.insert(node) {
// Only nodes that were actually inserted are recorded in the model.
InsertResult::Inserted => {
let vec = if status.is_connected() {
&mut connected
} else {
&mut disconnected
};
if full {
vec.pop_front();
}
vec.push_back((status, key.clone()));
}
InsertResult::FailedFilter => break,
// Any other rejection leaves both the bucket and the model unchanged.
_ => {}
}
}
bucket.check_invariants();
let mut nodes = bucket
.iter()
.map(|n| (n.status, n.key.clone()))
.collect::<Vec<_>>();
// Split the bucket content into the disconnected prefix and the connected tail.
let tail = bucket
.first_connected_pos
.map_or(Vec::new(), |p| nodes.split_off(p));
disconnected == nodes && connected == tail
}
quickcheck(prop as fn(_) -> _);
}
}