use super::*;
pub(crate) use crate::K_VALUE;
/// A node that is waiting to be inserted into a bucket.
///
/// `KBucket::insert` creates one when a connected node is offered to a full
/// bucket; `KBucket::apply_pending` later decides whether it enters the bucket.
#[derive(Clone, Debug)]
pub(crate) struct PendingNode<TKey, TVal> {
    /// The node that is waiting for a slot.
    node: Node<TKey, TVal>,
    /// The connection status the node is inserted with once it is applied.
    status: NodeStatus,
    /// The instant from which the node is considered ready to be applied.
    replace: Instant,
}
/// The connection status of a node, as tracked by the bucket holding it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node is considered connected.
    Connected,
    /// The node is considered disconnected.
    Disconnected,
}
impl<TKey, TVal> PendingNode<TKey, TVal> {
pub(crate) fn status(&self) -> NodeStatus {
self.status
}
pub(crate) fn value_mut(&mut self) -> &mut TVal {
&mut self.node.value
}
pub(crate) fn is_ready(&self) -> bool {
Instant::now() >= self.replace
}
#[cfg(test)]
pub(crate) fn set_ready_at(&mut self, t: Instant) {
self.replace = t;
}
pub(crate) fn into_node(self) -> Node<TKey, TVal> {
self.node
}
}
/// A key/value pair stored in a bucket.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Node<TKey, TVal> {
    /// The key identifying the node; bucket lookups compare it via `AsRef<KeyBytes>`.
    pub key: TKey,
    /// The value associated with `key`.
    pub value: TVal,
}
/// The index of a node inside a bucket's node list.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct Position(usize);
/// A bounded list of nodes together with at most one pending replacement.
///
/// Disconnected nodes are kept in front of connected nodes;
/// `first_connected_pos` marks the boundary between the two groups.
#[derive(Clone, Debug)]
pub(crate) struct KBucket<TKey, TVal> {
    /// The nodes in the bucket: disconnected nodes first, connected nodes after.
    nodes: Vec<Node<TKey, TVal>>,
    /// The number of nodes at which the bucket counts as full.
    capacity: usize,
    /// Index of the first connected node in `nodes`, or `None` when no node
    /// is connected. Every index at or beyond it is reported as connected.
    first_connected_pos: Option<usize>,
    /// A node waiting to replace the node at index 0, if any.
    pending: Option<PendingNode<TKey, TVal>>,
    /// Delay added to the current time when a node becomes pending; the
    /// result is the instant from which that node may be applied.
    pending_timeout: Duration,
}
/// The outcome of `KBucket::insert`.
#[must_use]
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum InsertResult<TKey> {
    /// The node was added to the bucket.
    Inserted,
    /// The bucket was full and the node was stored as the pending node.
    Pending {
        /// Key of the node at index 0, which the pending node may replace.
        disconnected: TKey,
    },
    /// The bucket was full and the node was not accepted.
    Full,
}
/// The effect of applying a pending node to a bucket, as returned by
/// `KBucket::apply_pending`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct AppliedPending<TKey, TVal> {
    /// The formerly pending node that is now part of the bucket.
    pub(crate) inserted: Node<TKey, TVal>,
    /// The node removed from index 0 to make room, or `None` when the bucket
    /// had free space and nothing was removed.
    pub(crate) evicted: Option<Node<TKey, TVal>>,
}
impl<TKey, TVal> Default for KBucket<TKey, TVal> {
    /// Creates an empty bucket with a capacity of `K_VALUE` nodes and a
    /// pending timeout of 60 seconds.
    fn default() -> Self {
        let capacity = K_VALUE.get();
        Self {
            nodes: Vec::with_capacity(capacity),
            capacity,
            first_connected_pos: None,
            pending: None,
            pending_timeout: Duration::from_secs(60),
        }
    }
}
impl<TKey, TVal> KBucket<TKey, TVal>
where
TKey: Clone + AsRef<KeyBytes>,
TVal: Clone,
{
pub(crate) fn new(config: KBucketConfig) -> Self {
KBucket {
nodes: Vec::with_capacity(config.bucket_size),
capacity: config.bucket_size,
first_connected_pos: None,
pending: None,
pending_timeout: config.pending_timeout,
}
}
/// Returns a shared reference to the bucket's pending node, or `None` when
/// no node is pending.
pub(crate) fn pending(&self) -> Option<&PendingNode<TKey, TVal>> {
self.pending.as_ref()
}
/// Returns a mutable reference to the bucket's pending node, or `None` when
/// no node is pending.
pub(crate) fn pending_mut(&mut self) -> Option<&mut PendingNode<TKey, TVal>> {
self.pending.as_mut()
}
/// Returns the pending node if there is one and its key equals `key`.
pub(crate) fn as_pending(&self, key: &TKey) -> Option<&PendingNode<TKey, TVal>> {
    let candidate = self.pending.as_ref()?;
    let same_key = candidate.node.key.as_ref() == key.as_ref();
    same_key.then_some(candidate)
}
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Node<TKey, TVal>, NodeStatus)> {
self.nodes
.iter()
.enumerate()
.map(move |(p, n)| (n, self.status(Position(p))))
}
pub(crate) fn apply_pending(&mut self) -> Option<AppliedPending<TKey, TVal>> {
if let Some(pending) = self.pending.take() {
if pending.replace <= Instant::now() {
if self.nodes.len() >= self.capacity {
if self.status(Position(0)) == NodeStatus::Connected {
return None;
}
debug_assert!(self.first_connected_pos.is_none_or(|p| p > 0)); let inserted = pending.node.clone();
if pending.status == NodeStatus::Connected {
let evicted = Some(self.nodes.remove(0));
self.first_connected_pos = self
.first_connected_pos
.map_or_else(|| Some(self.nodes.len()), |p| p.checked_sub(1));
self.nodes.push(pending.node);
return Some(AppliedPending { inserted, evicted });
}
else if let Some(p) = self.first_connected_pos {
let insert_pos = p.checked_sub(1).expect("by (*)");
let evicted = Some(self.nodes.remove(0));
self.nodes.insert(insert_pos, pending.node);
return Some(AppliedPending { inserted, evicted });
} else {
let evicted = Some(self.nodes.remove(0));
self.nodes.push(pending.node);
return Some(AppliedPending { inserted, evicted });
}
} else {
let inserted = pending.node.clone();
match self.insert(pending.node, pending.status) {
InsertResult::Inserted => {
return Some(AppliedPending {
inserted,
evicted: None,
})
}
_ => unreachable!("Bucket is not full."),
}
}
} else {
self.pending = Some(pending);
}
}
None
}
/// Overwrites the status of the pending node; does nothing when no node is
/// pending.
pub(crate) fn update_pending(&mut self, status: NodeStatus) {
    if let Some(pending) = self.pending.as_mut() {
        pending.status = status;
    }
}
/// Removes the pending node from the bucket and returns it, leaving the
/// bucket without a pending node. Returns `None` when no node was pending.
pub(crate) fn remove_pending(&mut self) -> Option<PendingNode<TKey, TVal>> {
self.pending.take()
}
/// Sets the status of the node with the given key by removing it and
/// inserting it again with `status`.
///
/// Does nothing when the key is not in the bucket. When the node came from
/// index 0 and is now connected, the pending node is dropped.
pub(crate) fn update(&mut self, key: &TKey, status: NodeStatus) {
    let Some((node, _previous, pos)) = self.remove(key) else {
        return;
    };

    let head_reconnected = pos == Position(0) && status == NodeStatus::Connected;
    if head_reconnected {
        self.pending = None;
    }

    // The removal above freed a slot, so the insertion is expected to succeed.
    let outcome = self.insert(node, status);
    if !matches!(outcome, InsertResult::Inserted) {
        unreachable!("The node is removed before being (re)inserted.")
    }
}
/// Inserts `node` with the given status.
///
/// With free space, a connected node is appended to the end of the list and
/// a disconnected node is placed directly in front of the connected nodes.
///
/// When the bucket is full, a connected node becomes the pending node if the
/// node at index 0 is not connected and nothing is pending yet
/// (`InsertResult::Pending`). Every other insertion into a full bucket
/// returns `InsertResult::Full`.
pub(crate) fn insert(
    &mut self,
    node: Node<TKey, TVal>,
    status: NodeStatus,
) -> InsertResult<TKey> {
    if self.nodes.len() >= self.capacity {
        let head_connected = self.first_connected_pos == Some(0);
        let may_queue = status == NodeStatus::Connected
            && !head_connected
            && self.pending.is_none();
        if !may_queue {
            return InsertResult::Full;
        }
        self.pending = Some(PendingNode {
            node,
            status: NodeStatus::Connected,
            replace: Instant::now() + self.pending_timeout,
        });
        return InsertResult::Pending {
            disconnected: self.nodes[0].key.clone(),
        };
    }

    match (status, self.first_connected_pos) {
        // Connected nodes go to the end; the first one defines the boundary.
        (NodeStatus::Connected, boundary) => {
            self.first_connected_pos = boundary.or(Some(self.nodes.len()));
            self.nodes.push(node);
        }
        // Disconnected nodes go right before the connected ones, which
        // shifts the boundary up by one.
        (NodeStatus::Disconnected, Some(boundary)) => {
            self.nodes.insert(boundary, node);
            self.first_connected_pos = Some(boundary + 1);
        }
        (NodeStatus::Disconnected, None) => self.nodes.push(node),
    }
    InsertResult::Inserted
}
/// Removes the node with the given key.
///
/// Returns the node, the status it had, and the position it occupied, or
/// `None` when the key is not in the bucket.
pub(crate) fn remove(
    &mut self,
    key: &TKey,
) -> Option<(Node<TKey, TVal>, NodeStatus, Position)> {
    let pos = self.position(key)?;
    let status = self.status(pos);
    let node = self.nodes.remove(pos.0);

    match status {
        // One fewer node in front of the connected ones: shift the boundary.
        NodeStatus::Disconnected => {
            if let Some(boundary) = self.first_connected_pos.as_mut() {
                *boundary -= 1;
            }
        }
        // The removed node was the first connected one and nothing follows
        // it, so no connected node is left.
        NodeStatus::Connected => {
            let was_boundary = self.first_connected_pos == Some(pos.0);
            if was_boundary && pos.0 == self.nodes.len() {
                self.first_connected_pos = None;
            }
        }
    }

    Some((node, status, pos))
}
/// Returns the status implied by a position: connected when the position is
/// at or beyond `first_connected_pos`, disconnected otherwise (including
/// when no node is connected).
pub(crate) fn status(&self, pos: Position) -> NodeStatus {
    match self.first_connected_pos {
        Some(boundary) if pos.0 >= boundary => NodeStatus::Connected,
        _ => NodeStatus::Disconnected,
    }
}
/// Returns the number of nodes currently stored in the bucket; the pending
/// node is not counted.
pub(crate) fn num_entries(&self) -> usize {
self.nodes.len()
}
/// Test helper: the number of nodes at or beyond `first_connected_pos`.
#[cfg(test)]
pub(crate) fn num_connected(&self) -> usize {
    match self.first_connected_pos {
        Some(boundary) => self.nodes.len() - boundary,
        None => 0,
    }
}
/// Test helper: the number of stored nodes that are not counted as connected.
#[cfg(test)]
pub(crate) fn num_disconnected(&self) -> usize {
    let total = self.nodes.len();
    let connected = self.num_connected();
    total - connected
}
pub(crate) fn position(&self, key: &TKey) -> Option<Position> {
self.nodes
.iter()
.position(|p| p.key.as_ref() == key.as_ref())
.map(Position)
}
pub(crate) fn get_mut(&mut self, key: &TKey) -> Option<&mut Node<TKey, TVal>> {
self.nodes
.iter_mut()
.find(move |p| p.key.as_ref() == key.as_ref())
}
}
#[cfg(test)]
mod tests {
use libp2p_identity::PeerId;
use quickcheck::*;
use super::*;
impl Arbitrary for KBucket<Key<PeerId>, ()> {
    /// Generates a bucket with a random pending timeout (whole seconds) that
    /// holds between one and `capacity` nodes with random keys and statuses.
    fn arbitrary(g: &mut Gen) -> KBucket<Key<PeerId>, ()> {
        let timeout_secs = g.gen_range(1..g.size()) as u64;
        let mut config = KBucketConfig::default();
        config.set_pending_timeout(Duration::from_secs(timeout_secs));

        let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
        let num_nodes = g.gen_range(1..bucket.capacity + 1);
        for _ in 0..num_nodes {
            let node = Node {
                key: Key::from(PeerId::random()),
                value: (),
            };
            let status = NodeStatus::arbitrary(g);
            // At most `capacity` nodes are inserted, so each one must fit.
            let outcome = bucket.insert(node, status);
            if !matches!(outcome, InsertResult::Inserted) {
                panic!()
            }
        }
        bucket
    }
}
impl Arbitrary for NodeStatus {
    /// Picks one of the two statuses based on an arbitrary boolean.
    fn arbitrary(g: &mut Gen) -> NodeStatus {
        match bool::arbitrary(g) {
            true => NodeStatus::Connected,
            false => NodeStatus::Disconnected,
        }
    }
}
impl Arbitrary for Position {
    /// Generates a position in the range `0..K_VALUE`.
    fn arbitrary(g: &mut Gen) -> Position {
        let index = g.gen_range(0..K_VALUE.get());
        Position(index)
    }
}
/// Inserts nodes with random keys and the given status until the bucket
/// holds `capacity` entries, asserting that every insertion succeeds and
/// grows the entry count by one.
fn fill_bucket(bucket: &mut KBucket<Key<PeerId>, ()>, status: NodeStatus) {
    let initial = bucket.num_entries();
    let missing = bucket.capacity - initial;
    for added in 1..=missing {
        let node = Node {
            key: Key::from(PeerId::random()),
            value: (),
        };
        assert_eq!(InsertResult::Inserted, bucket.insert(node, status));
        assert_eq!(bucket.num_entries(), initial + added);
    }
}
// Property test: after inserting nodes with arbitrary statuses into a default
// bucket, the bucket lists all disconnected nodes first and all connected
// nodes last, each group in the order in which its nodes were inserted.
#[test]
fn ordering() {
fn prop(status: Vec<NodeStatus>) -> bool {
let mut bucket = KBucket::<Key<PeerId>, ()>::default();
// Expected contents per status, mirrored alongside the bucket.
let mut connected = VecDeque::new();
let mut disconnected = VecDeque::new();
for status in status {
let key = Key::from(PeerId::random());
let node = Node { key, value: () };
// Captured before the insertion attempt changes the entry count.
let full = bucket.num_entries() == bucket.capacity;
// Only nodes the bucket reports as inserted are mirrored.
if let InsertResult::Inserted = bucket.insert(node, status) {
let vec = match status {
NodeStatus::Connected => &mut connected,
NodeStatus::Disconnected => &mut disconnected,
};
if full {
vec.pop_front();
}
vec.push_back((status, key));
}
}
let mut nodes = bucket.iter().map(|(n, s)| (s, n.key)).collect::<Vec<_>>();
// The recorded boundary must be the index of the first connected entry.
let first_connected_pos = nodes.iter().position(|(s, _)| *s == NodeStatus::Connected);
assert_eq!(bucket.first_connected_pos, first_connected_pos);
// Split at the boundary: `nodes` keeps the disconnected prefix, `tail`
// receives the connected suffix.
let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));
disconnected == nodes && connected == tail
}
quickcheck(prop as fn(_) -> _);
}
// Scenario test: a bucket full of disconnected nodes is turned, one pending
// replacement at a time, into a bucket full of connected nodes.
#[test]
fn full_bucket() {
let mut bucket = KBucket::<Key<PeerId>, ()>::default();
// Fill the bucket with disconnected nodes.
fill_bucket(&mut bucket, NodeStatus::Disconnected);
// A further disconnected node is rejected by the full bucket.
let key = Key::from(PeerId::random());
let node = Node { key, value: () };
match bucket.insert(node, NodeStatus::Disconnected) {
InsertResult::Full => {}
x => panic!("{x:?}"),
}
// Replace the disconnected nodes one by one with connected nodes.
for i in 0..bucket.capacity {
let (first, first_status) = bucket.iter().next().unwrap();
let first_disconnected = first.clone();
assert_eq!(first_status, NodeStatus::Disconnected);
// A connected node becomes pending and names the first node as the
// one it is scheduled to replace.
let key = Key::from(PeerId::random());
let node = Node { key, value: () };
match bucket.insert(node.clone(), NodeStatus::Connected) {
InsertResult::Pending { disconnected } => {
assert_eq!(disconnected, first_disconnected.key)
}
x => panic!("{x:?}"),
}
// While a node is pending, another connected insertion is rejected.
match bucket.insert(node.clone(), NodeStatus::Connected) {
InsertResult::Full => {}
x => panic!("{x:?}"),
}
assert!(bucket.pending().is_some());
// Move the pending node's ready instant into the past, then apply it.
let pending = bucket.pending_mut().expect("No pending node.");
pending.set_ready_at(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
let result = bucket.apply_pending();
assert_eq!(
result,
Some(AppliedPending {
inserted: node.clone(),
evicted: Some(first_disconnected)
})
);
// The applied node is now the last entry and counts as connected.
assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last());
assert!(bucket.pending().is_none());
// Each round moves the connected boundary one slot towards the front.
assert_eq!(Some(bucket.capacity - (i + 1)), bucket.first_connected_pos);
}
assert!(bucket.pending().is_none());
assert_eq!(bucket.capacity, bucket.num_entries());
// With only connected nodes left, a new connected node is rejected.
let key = Key::from(PeerId::random());
let node = Node { key, value: () };
match bucket.insert(node, NodeStatus::Connected) {
InsertResult::Full => {}
x => panic!("{x:?}"),
}
}
// Scenario test: when the node at the front of a full bucket is updated to
// connected while another node is pending, the pending node is discarded and
// the updated node moves to the end of the bucket.
#[test]
fn full_bucket_discard_pending() {
    let mut bucket = KBucket::<Key<PeerId>, ()>::default();
    fill_bucket(&mut bucket, NodeStatus::Disconnected);
    let first_disconnected = bucket.iter().next().map(|(n, _)| n.clone()).unwrap();

    // A connected node becomes pending, scheduled to replace the first node.
    let key = Key::from(PeerId::random());
    let node = Node { key, value: () };
    match bucket.insert(node, NodeStatus::Connected) {
        InsertResult::Pending { disconnected } => {
            assert_eq!(&disconnected, &first_disconnected.key);
        }
        _ => panic!(),
    }
    assert!(bucket.pending().is_some());

    // Updating the first node to connected drops the pending node.
    bucket.update(&first_disconnected.key, NodeStatus::Connected);
    assert!(bucket.pending().is_none());
    assert!(!bucket.iter().any(|(n, _)| n.key == key));

    // The updated node is now the last entry and the only connected one.
    assert_eq!(
        Some((&first_disconnected, NodeStatus::Connected)),
        bucket.iter().last()
    );
    assert_eq!(
        bucket.position(&first_disconnected.key).map(|p| p.0),
        bucket.first_connected_pos
    );
    assert_eq!(1, bucket.num_connected());
    assert_eq!(bucket.capacity - 1, bucket.num_disconnected());
}
// Property test: updating the status of a node moves it to the end of the
// group for its new status and leaves the relative order of all other nodes
// unchanged.
#[test]
fn bucket_update() {
fn prop(mut bucket: KBucket<Key<PeerId>, ()>, pos: Position, status: NodeStatus) -> bool {
let num_nodes = bucket.num_entries();
// Map the arbitrary position onto an index that exists in this bucket.
let pos = pos.0 % num_nodes;
let key = bucket.nodes[pos].key;
// Snapshot of the bucket contents before the update.
let mut expected = bucket.iter().map(|(n, s)| (n.key, s)).collect::<Vec<_>>();
bucket.update(&key, status);
// A connected node ends up last; a disconnected node ends up directly in
// front of the first connected node (or last if none is connected).
let expected_pos = match status {
NodeStatus::Connected => num_nodes - 1,
NodeStatus::Disconnected => bucket.first_connected_pos.unwrap_or(num_nodes) - 1,
};
// Replay the move on the snapshot and compare with the real bucket.
expected.remove(pos);
expected.insert(expected_pos, (key, status));
let actual = bucket.iter().map(|(n, s)| (n.key, s)).collect::<Vec<_>>();
expected == actual
}
quickcheck(prop as fn(_, _, _) -> _);
}
// Checks that a bucket built from a config with a custom bucket size can be
// filled to exactly that many entries.
#[test]
fn test_custom_bucket_size() {
    for raw_size in [2usize, 20, 200, 2000] {
        let size = NonZeroUsize::new(raw_size).unwrap();
        let mut config = KBucketConfig::default();
        config.set_bucket_size(size);

        let mut bucket = KBucket::<Key<PeerId>, ()>::new(config);
        fill_bucket(&mut bucket, NodeStatus::Disconnected);
        assert_eq!(size.get(), bucket.num_entries());
    }
}
}