1#![allow(dead_code)]
32
33use super::*;
34use tracing::{debug, error};
35
36pub const MAX_NODES_PER_BUCKET: usize = 16;
38
39#[derive(Debug, Clone)]
41pub struct PendingNode<TNodeId, TVal: Eq> {
42 node: Node<TNodeId, TVal>,
44
45 replace: Instant,
47}
48
49#[derive(PartialEq, Eq, Debug, Copy, Clone)]
55pub struct NodeStatus {
56 pub direction: ConnectionDirection,
59 pub state: ConnectionState,
61}
62
63#[derive(PartialEq, Eq, Debug, Copy, Clone)]
65pub enum ConnectionState {
66 Connected,
68 Disconnected,
70}
71
72impl NodeStatus {
73 pub fn is_connected(&self) -> bool {
74 match self.state {
75 ConnectionState::Connected => true,
76 ConnectionState::Disconnected => false,
77 }
78 }
79
80 pub fn is_incoming(&self) -> bool {
81 match self.direction {
82 ConnectionDirection::Outgoing => false,
83 ConnectionDirection::Incoming => true,
84 }
85 }
86}
87
88impl<TNodeId, TVal: Eq> PendingNode<TNodeId, TVal> {
89 pub fn status(&self) -> NodeStatus {
90 self.node.status
91 }
92
93 pub fn value_mut(&mut self) -> &mut TVal {
94 &mut self.node.value
95 }
96
97 pub fn set_ready_at(&mut self, t: Instant) {
98 self.replace = t;
99 }
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct Node<TNodeId, TVal: Eq> {
107 pub key: Key<TNodeId>,
109 pub value: TVal,
111 pub status: NodeStatus,
113}
114
115#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
118pub struct Position(usize);
119
120#[derive(Clone)]
123pub struct KBucket<TNodeId, TVal: Eq> {
124 nodes: ArrayVec<Node<TNodeId, TVal>, MAX_NODES_PER_BUCKET>,
126
127 first_connected_pos: Option<usize>,
140
141 pending: Option<PendingNode<TNodeId, TVal>>,
145
146 pending_timeout: Duration,
150
151 filter: Option<Box<dyn Filter<TVal>>>,
154
155 max_incoming: usize,
158}
159
160#[must_use]
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub enum InsertResult<TNodeId> {
164 Inserted,
166 Pending {
171 disconnected: Key<TNodeId>,
176 },
177 FailedFilter,
179 TooManyIncoming,
181 Full,
183 NodeExists,
185}
186
187#[must_use]
189#[derive(Debug, Clone, PartialEq, Eq)]
190pub enum UpdateResult {
191 Updated,
193 UpdatedAndPromoted,
195 UpdatedPending,
197 Failed(FailureReason),
199 NotModified,
201}
202
203impl UpdateResult {
204 pub fn failed(&self) -> bool {
206 matches!(self, UpdateResult::Failed(_))
207 }
208}
209
210#[derive(Debug, Clone, PartialEq, Eq)]
212pub enum FailureReason {
213 TooManyIncoming,
215 BucketFilter,
217 TableFilter,
219 KeyNonExistent,
221 BucketFull,
223 InvalidSelfUpdate,
225}
226
227#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct AppliedPending<TNodeId, TVal: Eq> {
231 pub inserted: Key<TNodeId>,
233 pub evicted: Option<Node<TNodeId, TVal>>,
236}
237
238impl<TNodeId, TVal> KBucket<TNodeId, TVal>
239where
240 TNodeId: Clone,
241 TVal: Eq,
242{
243 pub fn new(
245 pending_timeout: Duration,
246 max_incoming: usize,
247 filter: Option<Box<dyn Filter<TVal>>>,
248 ) -> Self {
249 KBucket {
250 nodes: ArrayVec::new(),
251 first_connected_pos: None,
252 pending: None,
253 pending_timeout,
254 filter,
255 max_incoming,
256 }
257 }
258
259 pub fn pending(&self) -> Option<&PendingNode<TNodeId, TVal>> {
261 self.pending.as_ref()
262 }
263
264 pub fn pending_mut(&mut self) -> Option<&mut PendingNode<TNodeId, TVal>> {
266 self.pending.as_mut()
267 }
268
269 pub fn as_pending(&self, key: &Key<TNodeId>) -> Option<&PendingNode<TNodeId, TVal>> {
272 self.pending().filter(|p| &p.node.key == key)
273 }
274
275 pub fn iter(&self) -> impl Iterator<Item = &Node<TNodeId, TVal>> {
277 self.nodes.iter()
278 }
279
280 pub fn apply_pending(&mut self) -> Option<AppliedPending<TNodeId, TVal>> {
287 if let Some(pending) = self.pending.take() {
288 if pending.replace <= Instant::now() {
289 if self.nodes.is_full() {
291 if self.nodes[0].status.is_connected() {
294 return None;
296 }
297 if let Some(filter) = self.filter.as_ref() {
299 if !filter.filter(
300 &pending.node.value,
301 &mut self.iter().map(|node| &node.value),
302 ) {
303 return None;
306 }
307 }
308 if pending.status().is_connected() && pending.status().is_incoming() {
310 if self.is_max_incoming() {
312 return None;
315 }
316 }
317
318 let inserted = pending.node.key.clone();
320 if pending.status().is_connected() {
323 let evicted = Some(self.nodes.remove(0));
324 self.first_connected_pos = self
325 .first_connected_pos
326 .map_or_else(|| Some(self.nodes.len()), |p| p.checked_sub(1));
327 self.nodes.push(pending.node);
328 return Some(AppliedPending { inserted, evicted });
329 }
330 else if let Some(p) = self.first_connected_pos {
333 if let Some(insert_pos) = p.checked_sub(1) {
334 let evicted = Some(self.nodes.remove(0));
335 self.nodes.insert(insert_pos, pending.node);
336 return Some(AppliedPending { inserted, evicted });
337 }
338 } else {
339 let evicted = Some(self.nodes.remove(0));
342 self.nodes.push(pending.node);
343 return Some(AppliedPending { inserted, evicted });
344 }
345 } else {
346 let inserted = pending.node.key.clone();
348 match self.insert(pending.node) {
349 InsertResult::Inserted => {
350 return Some(AppliedPending {
351 inserted,
352 evicted: None,
353 })
354 }
355 InsertResult::Full => unreachable!("Bucket cannot be full"),
356 InsertResult::Pending { .. } | InsertResult::NodeExists => {
357 error!("Bucket is not full or double node")
358 }
359 InsertResult::FailedFilter => debug!("Pending node failed filter"),
360 InsertResult::TooManyIncoming => {
361 debug!("Pending node failed incoming filter")
362 }
363 }
364 }
365 } else {
366 self.pending = Some(pending);
367 }
368 }
369
370 None
371 }
372
373 pub fn update_pending(&mut self, status: NodeStatus) {
375 if let Some(pending) = &mut self.pending {
376 pending.node.status = status
377 }
378 }
379
380 pub fn update_status(
386 &mut self,
387 key: &Key<TNodeId>,
388 state: ConnectionState,
389 direction: Option<ConnectionDirection>,
390 ) -> UpdateResult {
391 if let Some(pos) = self.position(key) {
397 let mut node = self.nodes.remove(pos.0);
399 let old_status = node.status;
400 node.status.state = state;
401 if let Some(direction) = direction {
402 node.status.direction = direction;
403 }
404
405 let not_modified = old_status == node.status;
407 let is_connected = matches!(state, ConnectionState::Connected);
409
410 match old_status.state {
412 ConnectionState::Connected => {
413 if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
414 self.first_connected_pos = None
416 }
417 }
418 ConnectionState::Disconnected => {
419 self.first_connected_pos =
420 self.first_connected_pos.and_then(|p| p.checked_sub(1))
421 }
422 }
423 if pos == Position(0) && is_connected {
426 self.pending = None
427 }
428 match self.insert(node) {
430 InsertResult::Inserted => {
431 if not_modified {
432 UpdateResult::NotModified
433 } else if !old_status.is_connected() && is_connected {
434 UpdateResult::UpdatedAndPromoted
437 } else {
438 UpdateResult::Updated
439 }
440 }
441 InsertResult::TooManyIncoming => {
442 UpdateResult::Failed(FailureReason::TooManyIncoming)
443 }
444 InsertResult::FailedFilter => {
446 UpdateResult::Failed(FailureReason::BucketFilter)
449 }
450 InsertResult::NodeExists => {
451 unreachable!("The node was removed and shouldn't already exist")
452 }
453 InsertResult::Full => {
454 unreachable!("The node was removed so the bucket cannot be full")
455 }
456 InsertResult::Pending { .. } => {
457 unreachable!("The node was removed so can't be added as pending")
458 }
459 }
460 } else if let Some(pending) = &mut self.pending {
461 if &pending.node.key == key {
462 pending.node.status.state = state;
463 if let Some(direction) = direction {
464 pending.node.status.direction = direction;
465 }
466 UpdateResult::UpdatedPending
467 } else {
468 UpdateResult::Failed(FailureReason::KeyNonExistent)
469 }
470 } else {
471 UpdateResult::Failed(FailureReason::KeyNonExistent)
472 }
473 }
474
475 pub fn update_value(&mut self, key: &Key<TNodeId>, value: TVal) -> UpdateResult {
481 if let Some(Position(pos)) = self.position(key) {
483 let mut node = self.nodes.remove(pos);
485 if node.value == value {
486 self.nodes.insert(pos, node);
487 UpdateResult::NotModified
488 } else {
489 if let Some(filter) = self.filter.as_ref() {
491 if !filter.filter(&value, &mut self.iter().map(|node| &node.value)) {
492 self.update_first_connected_pos_for_removal(pos);
494
495 return UpdateResult::Failed(FailureReason::BucketFilter);
496 }
497 }
498 node.value = value;
499 self.nodes.insert(pos, node);
500 UpdateResult::Updated
501 }
502 } else if let Some(pending) = &mut self.pending {
503 if &pending.node.key == key {
504 pending.node.value = value;
505 UpdateResult::UpdatedPending
506 } else {
507 UpdateResult::Failed(FailureReason::KeyNonExistent)
508 }
509 } else {
510 UpdateResult::Failed(FailureReason::KeyNonExistent)
511 }
512 }
513
514 pub fn insert(&mut self, node: Node<TNodeId, TVal>) -> InsertResult<TNodeId> {
536 if self.position(&node.key).is_some() {
538 return InsertResult::NodeExists;
539 }
540
541 if let Some(filter) = self.filter.as_ref() {
543 if !filter.filter(&node.value, &mut self.iter().map(|node| &node.value)) {
544 return InsertResult::FailedFilter;
545 }
546 }
547
548 let inserting_pending = self
549 .pending
550 .as_ref()
551 .map(|pending| pending.node.key == node.key)
552 .unwrap_or_default();
553
554 let insert_result = match node.status.state {
555 ConnectionState::Connected => {
556 if node.status.is_incoming() {
557 if self.is_max_incoming() {
559 return InsertResult::TooManyIncoming;
560 }
561 }
562 if self.nodes.is_full() {
563 if self.first_connected_pos == Some(0) || self.pending.is_some() {
564 return InsertResult::Full;
565 } else {
566 self.pending = Some(PendingNode {
567 node,
568 replace: Instant::now() + self.pending_timeout,
569 });
570 return InsertResult::Pending {
571 disconnected: self.nodes[0].key.clone(),
572 };
573 }
574 }
575
576 let pos = self.nodes.len();
577 self.first_connected_pos = self.first_connected_pos.or(Some(pos));
578 self.nodes.push(node);
579 InsertResult::Inserted
580 }
581 ConnectionState::Disconnected => {
582 if self.nodes.is_full() {
583 return InsertResult::Full;
584 }
585
586 if let Some(ref mut first_connected_pos) = self.first_connected_pos {
587 self.nodes.insert(*first_connected_pos, node);
588 *first_connected_pos += 1;
589 } else {
590 self.nodes.push(node);
591 }
592 InsertResult::Inserted
593 }
594 };
595
596 if matches!(insert_result, InsertResult::Inserted) && inserting_pending {
600 self.pending = None
601 }
602 insert_result
603 }
604
605 pub fn remove(&mut self, key: &Key<TNodeId>) -> bool {
607 if let Some(Position(position)) = self.position(key) {
608 self.nodes.remove(position);
609 self.update_first_connected_pos_for_removal(position);
610 self.apply_pending();
611 true
612 } else {
613 false
614 }
615 }
616
617 pub fn num_entries(&self) -> usize {
619 self.nodes.len()
620 }
621
622 pub fn num_connected(&self) -> usize {
624 self.first_connected_pos.map_or(0, |i| self.nodes.len() - i)
625 }
626
627 pub fn num_disconnected(&self) -> usize {
629 self.nodes.len() - self.num_connected()
630 }
631
632 pub fn position(&self, key: &Key<TNodeId>) -> Option<Position> {
634 self.nodes.iter().position(|p| &p.key == key).map(Position)
635 }
636
637 pub fn status(&self, pos: Position) -> NodeStatus {
639 if let Some(node) = self.nodes.get(pos.0) {
640 node.status
641 } else {
642 NodeStatus {
644 state: ConnectionState::Disconnected,
645 direction: ConnectionDirection::Incoming,
646 }
647 }
648 }
649
650 pub fn get_mut(&mut self, key: &Key<TNodeId>) -> Option<&mut Node<TNodeId, TVal>> {
655 self.nodes.iter_mut().find(move |p| &p.key == key)
656 }
657
658 pub fn get(&self, key: &Key<TNodeId>) -> Option<&Node<TNodeId, TVal>> {
663 self.nodes.iter().find(move |p| &p.key == key)
664 }
665
666 fn is_max_incoming(&self) -> bool {
669 self.nodes
670 .iter()
671 .filter(|node| node.status.is_connected() && node.status.is_incoming())
672 .count()
673 >= self.max_incoming
674 }
675
676 fn update_first_connected_pos_for_removal(&mut self, removed_pos: usize) {
681 self.first_connected_pos = self.first_connected_pos.and_then(|fcp| {
682 if removed_pos < fcp {
683 Some(fcp - 1)
685 } else {
686 Some(fcp).filter(|_| fcp < self.nodes.len())
688 }
689 });
690 }
691}
692
693impl<TNodeId: std::fmt::Debug, TVal: Eq + std::fmt::Debug> std::fmt::Debug
694 for KBucket<TNodeId, TVal>
695{
696 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697 f.debug_struct("KBucket")
698 .field("nodes", &self.nodes)
699 .field("first_connected_pos", &self.first_connected_pos)
700 .field("pending", &self.pending)
701 .field("pending_timeout", &self.pending_timeout)
702 .field("filter", &self.filter.is_some())
703 .field("max_incoming", &self.max_incoming)
704 .finish()
705 }
706}
707
708impl std::fmt::Display for ConnectionDirection {
709 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710 match &self {
711 ConnectionDirection::Incoming => write!(f, "Incoming"),
712 ConnectionDirection::Outgoing => write!(f, "Outgoing"),
713 }
714 }
715}
716
717#[cfg(test)]
718pub mod tests {
719 use super::*;
720 use enr::NodeId;
721 use quickcheck::*;
722 use rand_07::Rng;
723 use std::{
724 collections::{HashSet, VecDeque},
725 hash::Hash,
726 };
727
728 fn connected_state() -> NodeStatus {
729 NodeStatus {
730 state: ConnectionState::Connected,
731 direction: ConnectionDirection::Outgoing,
732 }
733 }
734
735 fn disconnected_state() -> NodeStatus {
736 NodeStatus {
737 state: ConnectionState::Disconnected,
738 direction: ConnectionDirection::Outgoing,
739 }
740 }
741
742 pub fn arbitrary_node_id<G: Gen>(g: &mut G) -> NodeId {
743 let mut node_id = [0u8; 32];
744 g.fill_bytes(&mut node_id);
745 NodeId::new(&node_id)
746 }
747
748 impl<V> KBucket<NodeId, V>
749 where
750 V: Eq + std::fmt::Debug,
751 {
752 fn check_invariants(&self) {
754 self.check_first_connected_pos();
755 self.check_status_ordering();
756 self.check_max_incoming_nodes();
757 }
758
759 fn check_first_connected_pos(&self) {
761 let first_connected_pos = self
762 .nodes
763 .iter()
764 .position(|node| node.status.is_connected());
765 assert_eq!(self.first_connected_pos, first_connected_pos);
766 }
767
768 fn check_status_ordering(&self) {
770 let first_connected_pos = self.first_connected_pos.unwrap_or(self.nodes.len());
771 assert!(self.nodes[..first_connected_pos]
772 .iter()
773 .all(|n| !n.status.is_connected()));
774 assert!(self.nodes[first_connected_pos..]
775 .iter()
776 .all(|n| n.status.is_connected()));
777 }
778
779 fn check_max_incoming_nodes(&self) {
781 let number_of_incoming_nodes = self
782 .nodes
783 .iter()
784 .filter(|n| n.status.is_connected() && n.status.is_incoming())
785 .count();
786 assert!(number_of_incoming_nodes <= self.max_incoming);
787 }
788 }
789
790 impl<V> Arbitrary for KBucket<NodeId, V>
791 where
792 V: Arbitrary + Eq,
793 {
794 fn arbitrary<G: Gen>(g: &mut G) -> KBucket<NodeId, V> {
795 let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64));
796 let mut bucket = KBucket::<NodeId, V>::new(timeout, MAX_NODES_PER_BUCKET, None);
797 let num_nodes = g.gen_range(1, MAX_NODES_PER_BUCKET + 1);
798 for _ in 0..num_nodes {
799 loop {
800 let node = Node::arbitrary(g);
801 match bucket.insert(node) {
802 InsertResult::Inserted => break,
803 InsertResult::TooManyIncoming => {}
804 _ => panic!(),
805 }
806 }
807 }
808 bucket
809 }
810 }
811
812 impl<V> Arbitrary for Node<NodeId, V>
813 where
814 V: Arbitrary + Eq,
815 {
816 fn arbitrary<G: Gen>(g: &mut G) -> Self {
817 let key = Key::from(arbitrary_node_id(g));
818 Node {
819 key,
820 value: V::arbitrary(g),
821 status: NodeStatus::arbitrary(g),
822 }
823 }
824 }
825
826 impl Arbitrary for NodeStatus {
827 fn arbitrary<G: Gen>(g: &mut G) -> NodeStatus {
828 match g.gen_range(1, 4) {
829 1 => NodeStatus {
830 direction: ConnectionDirection::Incoming,
831 state: ConnectionState::Connected,
832 },
833 2 => NodeStatus {
834 direction: ConnectionDirection::Outgoing,
835 state: ConnectionState::Connected,
836 },
837 3 => NodeStatus {
838 direction: ConnectionDirection::Incoming,
839 state: ConnectionState::Disconnected,
840 },
841 4 => NodeStatus {
842 direction: ConnectionDirection::Outgoing,
843 state: ConnectionState::Disconnected,
844 },
845 x => unreachable!("Should not generate numbers out of this range {}", x),
846 }
847 }
848 }
849
850 impl Arbitrary for Position {
851 fn arbitrary<G: Gen>(g: &mut G) -> Position {
852 Position(g.gen_range(0, MAX_NODES_PER_BUCKET))
853 }
854 }
855
856 fn fill_bucket(bucket: &mut KBucket<NodeId, ()>, status: NodeStatus) {
858 let num_entries_start = bucket.num_entries();
859 for i in 0..MAX_NODES_PER_BUCKET - num_entries_start {
860 let key = Key::from(NodeId::random());
861 let node = Node {
862 key,
863 value: (),
864 status,
865 };
866 assert_eq!(InsertResult::Inserted, bucket.insert(node));
867 assert_eq!(bucket.num_entries(), num_entries_start + i + 1);
868 }
869 }
870
871 #[derive(Debug, Clone)]
873 pub struct SetFilter<T> {
874 set: HashSet<T>,
875 }
876
877 impl<T> Filter<T> for SetFilter<T>
878 where
879 T: Clone + Hash + Eq + Send + Sync + 'static,
880 {
881 fn filter(&self, value: &T, _: &mut dyn Iterator<Item = &T>) -> bool {
882 self.set.contains(value)
883 }
884 }
885
886 #[derive(Debug, Clone)]
888 pub enum Action<TVal>
889 where
890 TVal: Eq,
891 {
892 Insert(Node<NodeId, TVal>),
893 Remove(usize),
894 UpdatePending(NodeStatus),
895 ApplyPending,
896 UpdateStatus(usize, NodeStatus),
897 UpdateValue(usize, TVal),
898 }
899
900 impl<V> Arbitrary for Action<V>
901 where
902 V: Arbitrary + Eq,
903 {
904 fn arbitrary<G: Gen>(g: &mut G) -> Self {
905 match g.gen_range(0, 6) {
906 0 => Action::Insert(<_>::arbitrary(g)),
907 1 => Action::Remove(<_>::arbitrary(g)),
908 2 => Action::UpdatePending(<_>::arbitrary(g)),
909 3 => Action::ApplyPending,
910 4 => Action::UpdateStatus(<_>::arbitrary(g), <_>::arbitrary(g)),
911 5 => Action::UpdateValue(<_>::arbitrary(g), <_>::arbitrary(g)),
912 _ => panic!("wrong number of action variants"),
913 }
914 }
915 }
916
917 impl<V> KBucket<NodeId, V>
918 where
919 V: Eq + std::fmt::Debug,
920 {
921 fn apply_action(&mut self, action: Action<V>) -> Result<(), FailureReason> {
922 match action {
923 Action::Insert(node) => match self.insert(node) {
924 InsertResult::FailedFilter => Err(FailureReason::BucketFilter),
925 InsertResult::TooManyIncoming => Err(FailureReason::TooManyIncoming),
926 InsertResult::Full => Err(FailureReason::BucketFull),
927 _ => Ok(()),
928 },
929 Action::Remove(pos) => {
930 if let Some(key) = self.key_of_pos(pos) {
931 self.remove(&key);
932 }
933 Ok(())
934 }
935 Action::UpdatePending(status) => {
936 self.update_pending(status);
937 Ok(())
938 }
939 Action::ApplyPending => {
940 self.apply_pending();
941 Ok(())
942 }
943 Action::UpdateStatus(pos, status) => {
944 if let Some(key) = self.key_of_pos(pos) {
945 match self.update_status(&key, status.state, Some(status.direction)) {
946 UpdateResult::Failed(reason) => Err(reason),
947 _ => Ok(()),
948 }
949 } else {
950 Ok(())
951 }
952 }
953 Action::UpdateValue(pos, value) => {
954 if let Some(key) = self.key_of_pos(pos) {
955 match self.update_value(&key, value) {
956 UpdateResult::Failed(reason) => Err(reason),
957 _ => Ok(()),
958 }
959 } else {
960 Ok(())
961 }
962 }
963 }
964 }
965
966 fn key_of_pos(&self, pos: usize) -> Option<Key<NodeId>> {
967 let num_nodes = self.num_entries();
968 if num_nodes > 0 {
969 let pos = pos % num_nodes;
970 let key = self.nodes[pos].key.clone();
971 Some(key)
972 } else {
973 None
974 }
975 }
976 }
977
978 #[test]
979 fn ordering() {
980 fn prop(status: Vec<NodeStatus>) -> bool {
981 let mut bucket =
982 KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
983
984 let mut connected = VecDeque::new();
986 let mut disconnected = VecDeque::new();
987
988 for status in status {
990 let key = Key::from(NodeId::random());
991 let node = Node {
992 key: key.clone(),
993 value: (),
994 status,
995 };
996 let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
997 if let InsertResult::Inserted = bucket.insert(node) {
998 let vec = if status.is_connected() {
999 &mut connected
1000 } else {
1001 &mut disconnected
1002 };
1003 if full {
1004 vec.pop_front();
1005 }
1006 vec.push_back((status, key.clone()));
1007 }
1008 }
1009
1010 let mut nodes = bucket
1012 .iter()
1013 .map(|n| (n.status, n.key.clone()))
1014 .collect::<Vec<_>>();
1015
1016 let first_connected_pos = nodes.iter().position(|(status, _)| status.is_connected());
1018 assert_eq!(bucket.first_connected_pos, first_connected_pos);
1019 let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));
1020
1021 disconnected == nodes && connected == tail
1025 }
1026
1027 quickcheck(prop as fn(_) -> _);
1028 }
1029
1030 #[test]
1031 fn full_bucket() {
1032 let mut bucket =
1033 KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1034
1035 let disconnected_status = NodeStatus {
1036 state: ConnectionState::Disconnected,
1037 direction: ConnectionDirection::Outgoing,
1038 };
1039 fill_bucket(&mut bucket, disconnected_status);
1041
1042 let key = Key::from(NodeId::random());
1044 let node = Node {
1045 key,
1046 value: (),
1047 status: disconnected_status,
1048 };
1049 match bucket.insert(node) {
1050 InsertResult::Full => {}
1051 x => panic!("{:?}", x),
1052 }
1053
1054 for i in 0..MAX_NODES_PER_BUCKET {
1056 let first = bucket.iter().next().unwrap();
1057 let first_disconnected = first.clone();
1058 assert_eq!(first.status, disconnected_status);
1059
1060 let key = Key::from(NodeId::random());
1063 let node = Node {
1064 key: key.clone(),
1065 value: (),
1066 status: connected_state(),
1067 };
1068 match bucket.insert(node.clone()) {
1069 InsertResult::Pending { disconnected } => {
1070 assert_eq!(disconnected, first_disconnected.key)
1071 }
1072 x => panic!("{:?}", x),
1073 }
1074
1075 match bucket.insert(node.clone()) {
1077 InsertResult::Full => {}
1078 x => panic!("{:?}", x),
1079 }
1080
1081 assert!(bucket.pending().is_some());
1082
1083 let pending = bucket.pending_mut().expect("No pending node.");
1085 pending.set_ready_at(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
1086 let result = bucket.apply_pending();
1087 assert_eq!(
1088 result,
1089 Some(AppliedPending {
1090 inserted: key.clone(),
1091 evicted: Some(first_disconnected)
1092 })
1093 );
1094 assert_eq!(
1095 Some(connected_state()),
1096 bucket.iter().map(|v| v.status).last()
1097 );
1098 assert!(bucket.pending().is_none());
1099 assert_eq!(
1100 Some(MAX_NODES_PER_BUCKET - (i + 1)),
1101 bucket.first_connected_pos
1102 );
1103 }
1104
1105 assert!(bucket.pending().is_none());
1106 assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries());
1107
1108 let key = Key::from(NodeId::random());
1110 let node = Node {
1111 key,
1112 value: (),
1113 status: connected_state(),
1114 };
1115 match bucket.insert(node) {
1116 InsertResult::Full => {}
1117 x => panic!("{:?}", x),
1118 }
1119 }
1120
1121 #[test]
1122 fn full_bucket_discard_pending() {
1123 let mut bucket =
1124 KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1125 fill_bucket(&mut bucket, disconnected_state());
1126 let first = bucket.iter().next().unwrap();
1127 let first_disconnected = first.clone();
1128
1129 let key = Key::from(NodeId::random());
1131 let node = Node {
1132 key: key.clone(),
1133 value: (),
1134 status: connected_state(),
1135 };
1136 if let InsertResult::Pending { disconnected } = bucket.insert(node) {
1137 assert_eq!(&disconnected, &first_disconnected.key);
1138 } else {
1139 panic!()
1140 }
1141 assert!(bucket.pending().is_some());
1142
1143 let _ = bucket.update_status(&first_disconnected.key, ConnectionState::Connected, None);
1145
1146 assert!(bucket.pending().is_none());
1148 assert!(bucket.iter().all(|n| n.key != key));
1149
1150 assert_eq!(
1152 Some((&first_disconnected.key, connected_state())),
1153 bucket.iter().map(|v| (&v.key, v.status)).last()
1154 );
1155 assert_eq!(
1156 bucket.position(&first_disconnected.key).map(|p| p.0),
1157 bucket.first_connected_pos
1158 );
1159 assert_eq!(1, bucket.num_connected());
1160 assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected());
1161 }
1162
1163 #[test]
1165 fn full_bucket_applied_no_duplicates() {
1166 let mut bucket =
1168 KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1169 fill_bucket(&mut bucket, connected_state());
1170
1171 let first = bucket.iter().next().unwrap().clone();
1172
1173 let third = bucket.iter().nth(2).unwrap().clone();
1174
1175 assert_eq!(
1178 bucket.update_status(&first.key, ConnectionState::Disconnected, None),
1179 UpdateResult::Updated
1180 );
1181
1182 let key = Key::from(NodeId::random());
1184 let node = Node {
1185 key,
1186 value: (),
1187 status: connected_state(),
1188 };
1189
1190 if let InsertResult::Pending { disconnected } = bucket.insert(node.clone()) {
1192 assert_eq!(&disconnected, &first.key);
1193 } else {
1194 panic!()
1195 }
1196 assert!(bucket.pending().is_some());
1197
1198 bucket.remove(&third.key);
1201
1202 assert_eq!(bucket.apply_pending(), None);
1206 assert_eq!(bucket.insert(node.clone()), InsertResult::Inserted);
1207 assert!(bucket.pending.is_none());
1208
1209 if let Some(pending) = bucket.pending.as_mut() {
1211 pending.replace = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
1212 }
1213
1214 assert_eq!(bucket.apply_pending(), None);
1216 assert_eq!(
1218 bucket.update_status(&node.key, ConnectionState::Connected, None),
1219 UpdateResult::NotModified
1220 );
1221 }
1222
1223 #[test]
1224 fn bucket_update_status() {
1225 fn prop(mut bucket: KBucket<NodeId, ()>, pos: Position, status: NodeStatus) -> bool {
1226 let num_nodes = bucket.num_entries();
1227
1228 let pos = pos.0 % num_nodes;
1230 let key = bucket.nodes[pos].key.clone();
1231
1232 let mut expected = bucket
1234 .iter()
1235 .map(|n| (n.key.clone(), n.status))
1236 .collect::<Vec<_>>();
1237
1238 let _ = bucket.update_status(&key, status.state, Some(status.direction));
1240
1241 let expected_pos = if status.is_connected() {
1244 num_nodes - 1
1245 } else {
1246 bucket.first_connected_pos.unwrap_or(num_nodes) - 1
1247 };
1248 expected.remove(pos);
1249 expected.insert(expected_pos, (key, status));
1250 let actual = bucket
1251 .iter()
1252 .map(|n| (n.key.clone(), n.status))
1253 .collect::<Vec<_>>();
1254 expected == actual
1255 }
1256
1257 quickcheck(prop as fn(_, _, _) -> _);
1258 }
1259
1260 #[test]
1261 fn bucket_update_value_with_filtering() {
1262 fn prop(
1263 mut bucket: KBucket<NodeId, u8>,
1264 pos: Position,
1265 value: u8,
1266 value_matches_filter: bool,
1267 ) -> bool {
1268 let filter = SetFilter {
1270 set: value_matches_filter.then_some(value).into_iter().collect(),
1271 };
1272 bucket.filter = Some(Box::new(filter));
1273
1274 let num_nodes = bucket.num_entries();
1275
1276 let pos = pos.0 % num_nodes;
1278 let key = bucket.nodes[pos].key.clone();
1279
1280 let mut expected = bucket
1282 .iter()
1283 .map(|n| (n.key.clone(), n.value))
1284 .collect::<Vec<_>>();
1285
1286 let _ = bucket.update_value(&key, value);
1288
1289 bucket.check_invariants();
1290
1291 if value_matches_filter || expected[pos].1 == value {
1294 expected[pos].1 = value;
1295 } else {
1296 expected.remove(pos);
1297 }
1298 let actual = bucket
1299 .iter()
1300 .map(|n| (n.key.clone(), n.value))
1301 .collect::<Vec<_>>();
1302 expected == actual
1303 }
1304
1305 quickcheck(prop as fn(_, _, _, _) -> _);
1306 }
1307
1308 #[test]
1310 fn random_actions_with_filtering() {
1311 fn prop(
1312 initial_nodes: Vec<Node<NodeId, u8>>,
1313 pending_timeout_millis: u64,
1314 max_incoming: usize,
1315 filter_set: HashSet<u8>,
1316 actions: Vec<Action<u8>>,
1317 ) -> bool {
1318 let filter = SetFilter { set: filter_set };
1319 let pending_timeout = Duration::from_millis(pending_timeout_millis);
1320 let mut kbucket =
1321 KBucket::<NodeId, u8>::new(pending_timeout, max_incoming, Some(Box::new(filter)));
1322
1323 for node in initial_nodes {
1324 let _ = kbucket.insert(node);
1325 }
1326
1327 for action in actions {
1328 let _ = kbucket.apply_action(action);
1332 kbucket.check_invariants();
1333 }
1334 true
1335 }
1336
1337 quickcheck(prop as fn(_, _, _, _, _) -> _);
1338 }
1339
1340 #[test]
1341 fn table_update_status_connection() {
1342 let max_incoming = 7;
1343 let mut bucket = KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming, None);
1344
1345 let mut incoming_connected = 0;
1346 let mut keys = Vec::new();
1347 for _ in 0..MAX_NODES_PER_BUCKET {
1348 let key = Key::from(NodeId::random());
1349 keys.push(key.clone());
1350 incoming_connected += 1;
1351 let direction = if incoming_connected <= max_incoming {
1352 ConnectionDirection::Incoming
1353 } else {
1354 ConnectionDirection::Outgoing
1355 };
1356 let status = NodeStatus {
1357 state: ConnectionState::Connected,
1358 direction,
1359 };
1360 let node = Node {
1361 key: key.clone(),
1362 value: (),
1363 status,
1364 };
1365 assert_eq!(InsertResult::Inserted, bucket.insert(node));
1366 }
1367
1368 let result = bucket.update_status(
1371 &keys[max_incoming],
1372 ConnectionState::Disconnected,
1373 Some(ConnectionDirection::Incoming),
1374 );
1375 assert_eq!(result, UpdateResult::Updated);
1376 let result = bucket.update_status(
1377 &keys[max_incoming],
1378 ConnectionState::Connected,
1379 Some(ConnectionDirection::Outgoing),
1380 );
1381 assert_eq!(result, UpdateResult::UpdatedAndPromoted);
1382 let result = bucket.update_status(
1383 &keys[max_incoming],
1384 ConnectionState::Connected,
1385 Some(ConnectionDirection::Outgoing),
1386 );
1387 assert_eq!(result, UpdateResult::NotModified);
1388 let result = bucket.update_status(
1389 &keys[max_incoming],
1390 ConnectionState::Connected,
1391 Some(ConnectionDirection::Incoming),
1392 );
1393 assert_eq!(result, UpdateResult::Failed(FailureReason::TooManyIncoming));
1394 }
1395
1396 #[test]
1397 fn bucket_max_incoming_nodes() {
1398 fn prop(status: Vec<NodeStatus>) -> bool {
1399 let max_incoming_nodes = 5;
1400 let mut bucket =
1401 KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming_nodes, None);
1402
1403 let mut connected = VecDeque::new();
1405 let mut disconnected = VecDeque::new();
1406
1407 for status in status {
1409 let key = Key::from(NodeId::random());
1410 let node = Node {
1411 key: key.clone(),
1412 value: (),
1413 status,
1414 };
1415 let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
1416 match bucket.insert(node) {
1417 InsertResult::Inserted => {
1418 let vec = if status.is_connected() {
1419 &mut connected
1420 } else {
1421 &mut disconnected
1422 };
1423 if full {
1424 vec.pop_front();
1425 }
1426 vec.push_back((status, key.clone()));
1427 }
1428 InsertResult::FailedFilter => break,
1429 _ => {}
1430 }
1431 }
1432
1433 bucket.check_invariants();
1435
1436 let mut nodes = bucket
1438 .iter()
1439 .map(|n| (n.status, n.key.clone()))
1440 .collect::<Vec<_>>();
1441
1442 let tail = bucket
1444 .first_connected_pos
1445 .map_or(Vec::new(), |p| nodes.split_off(p));
1446
1447 disconnected == nodes && connected == tail
1452 }
1453
1454 quickcheck(prop as fn(_) -> _);
1455 }
1456}