discv5/kbucket/
bucket.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21// This basis of this file has been taken from the rust-libp2p codebase:
22// https://github.com/libp2p/rust-libp2p
23
24//! The internal API for a single `KBucket` in a `KBucketsTable`.
25//!
26//! > **Note**: Uniqueness of entries w.r.t. a `Key` in a `KBucket` is not
27//! > checked in this module. This is an invariant that must hold across all
28//! > buckets in a `KBucketsTable` and hence is enforced by the public API
29//! > of the `KBucketsTable` and in particular the public `Entry` API.
30
31#![allow(dead_code)]
32
33use super::*;
34use tracing::{debug, error};
35
36/// Maximum number of nodes in a bucket, i.e. the (fixed) `k` parameter.
37pub const MAX_NODES_PER_BUCKET: usize = 16;
38
39/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
40#[derive(Debug, Clone)]
41pub struct PendingNode<TNodeId, TVal: Eq> {
42    /// The pending node to insert.
43    node: Node<TNodeId, TVal>,
44
45    /// The instant at which the pending node is eligible for insertion into a bucket.
46    replace: Instant,
47}
48
49/// The status of a node in a bucket.
50///
51/// The status of a node in a bucket together with the time of the
52/// last status change determines the position of the node in a
53/// bucket.
54#[derive(PartialEq, Eq, Debug, Copy, Clone)]
55pub struct NodeStatus {
56    /// The direction (incoming or outgoing) for the node. If in the disconnected state, this
57    /// represents the last connection status.
58    pub direction: ConnectionDirection,
59    /// The connection state, connected or disconnected.
60    pub state: ConnectionState,
61}
62
63/// The connection state of a node.
64#[derive(PartialEq, Eq, Debug, Copy, Clone)]
65pub enum ConnectionState {
66    /// The node is connected.
67    Connected,
68    /// The node is considered disconnected.
69    Disconnected,
70}
71
72impl NodeStatus {
73    pub fn is_connected(&self) -> bool {
74        match self.state {
75            ConnectionState::Connected => true,
76            ConnectionState::Disconnected => false,
77        }
78    }
79
80    pub fn is_incoming(&self) -> bool {
81        match self.direction {
82            ConnectionDirection::Outgoing => false,
83            ConnectionDirection::Incoming => true,
84        }
85    }
86}
87
88impl<TNodeId, TVal: Eq> PendingNode<TNodeId, TVal> {
89    pub fn status(&self) -> NodeStatus {
90        self.node.status
91    }
92
93    pub fn value_mut(&mut self) -> &mut TVal {
94        &mut self.node.value
95    }
96
97    pub fn set_ready_at(&mut self, t: Instant) {
98        self.replace = t;
99    }
100}
101
102/// A `Node` in a bucket, representing a peer participating
103/// in the Kademlia DHT together with an associated value (e.g. contact
104/// information).
105#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct Node<TNodeId, TVal: Eq> {
107    /// The key of the node, identifying the peer.
108    pub key: Key<TNodeId>,
109    /// The associated value.
110    pub value: TVal,
111    /// The status of the node.
112    pub status: NodeStatus,
113}
114
115/// The position of a node in a `KBucket`, i.e. a non-negative integer
116/// in the range `[0, MAX_NODES_PER_BUCKET)`.
117#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
118pub struct Position(usize);
119
120/// A `KBucket` is a list of up to `MAX_NODES_PER_BUCKET` `Key`s and associated values,
121/// ordered from least-recently connected to most-recently connected.
122#[derive(Clone)]
123pub struct KBucket<TNodeId, TVal: Eq> {
124    /// The nodes contained in the bucket.
125    nodes: ArrayVec<Node<TNodeId, TVal>, MAX_NODES_PER_BUCKET>,
126
127    /// The position (index) in `nodes` that marks the first connected node.
128    ///
129    /// Since the entries in `nodes` are ordered from least-recently connected to
130    /// most-recently connected, all entries above this index are also considered
131    /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries
132    /// that are considered disconnected and the range
133    /// `[first_connected_pos, MAX_NODES_PER_BUCKET)` marks sub-list of entries that are
134    /// considered connected.
135    ///
136    /// `None` indicates that there are no connected entries in the bucket, i.e.
137    /// the bucket is either empty, or contains only entries for peers that are
138    /// considered disconnected.
139    first_connected_pos: Option<usize>,
140
141    /// A node that is pending to be inserted into a full bucket, should the
142    /// least-recently connected (and currently disconnected) node not be
143    /// marked as connected within `unresponsive_timeout`.
144    pending: Option<PendingNode<TNodeId, TVal>>,
145
146    /// The timeout window before a new pending node is eligible for insertion,
147    /// if the least-recently connected node is not updated as being connected
148    /// in the meantime.
149    pending_timeout: Duration,
150
151    /// An optional filter that filters new entries given an iterator over current entries in
152    /// the bucket.
153    filter: Option<Box<dyn Filter<TVal>>>,
154
155    /// The maximum number of incoming connections allowed per bucket. Setting this to
156    /// MAX_NODES_PER_BUCKET means there is no restriction on incoming nodes.
157    max_incoming: usize,
158}
159
160/// The result of inserting an entry into a bucket.
161#[must_use]
162#[derive(Debug, Clone, PartialEq, Eq)]
163pub enum InsertResult<TNodeId> {
164    /// The entry has been successfully inserted.
165    Inserted,
166    /// The entry is pending insertion because the relevant bucket is currently full.
167    /// The entry is inserted after a timeout elapsed, if the status of the
168    /// least-recently connected (and currently disconnected) node in the bucket
169    /// is not updated before the timeout expires.
170    Pending {
171        /// The key of the least-recently connected entry that is currently considered
172        /// disconnected and whose corresponding peer should be checked for connectivity
173        /// in order to prevent it from being evicted. If connectivity to the peer is
174        /// re-established, the corresponding entry should be updated with a connected status.
175        disconnected: Key<TNodeId>,
176    },
177    /// The attempted entry failed to pass the filter.
178    FailedFilter,
179    /// There were too many incoming nodes for this bucket.
180    TooManyIncoming,
181    /// The entry was not inserted because the relevant bucket is full.
182    Full,
183    /// The entry already exists.
184    NodeExists,
185}
186
187/// The result of performing an update on a kbucket/table.
188#[must_use]
189#[derive(Debug, Clone, PartialEq, Eq)]
190pub enum UpdateResult {
191    /// The node was updated successfully,
192    Updated,
193    /// The update promoted the node to a connected state from a disconnected state.
194    UpdatedAndPromoted,
195    /// The pending entry was updated.
196    UpdatedPending,
197    /// The update removed the node because it would violate the incoming peers condition.
198    Failed(FailureReason),
199    /// There were no changes made to the value of the node.
200    NotModified,
201}
202
203impl UpdateResult {
204    // The update failed.
205    pub fn failed(&self) -> bool {
206        matches!(self, UpdateResult::Failed(_))
207    }
208}
209
210/// A reason for failing to update or insert a node into the bucket.
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub enum FailureReason {
213    /// Too many incoming nodes already in the bucket.
214    TooManyIncoming,
215    /// The node didn't pass the bucket filter.
216    BucketFilter,
217    /// The node didn't pass the table filter.
218    TableFilter,
219    /// The node didn't exist.
220    KeyNonExistent,
221    /// The bucket was full.
222    BucketFull,
223    /// Cannot update self,
224    InvalidSelfUpdate,
225}
226
227/// The result of applying a pending node to a bucket, possibly
228/// replacing an existing node.
229#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct AppliedPending<TNodeId, TVal: Eq> {
231    /// The key of the inserted pending node.
232    pub inserted: Key<TNodeId>,
233    /// The node that has been evicted from the bucket to make room for the
234    /// pending node, if any.
235    pub evicted: Option<Node<TNodeId, TVal>>,
236}
237
238impl<TNodeId, TVal> KBucket<TNodeId, TVal>
239where
240    TNodeId: Clone,
241    TVal: Eq,
242{
243    /// Creates a new `KBucket` with the given timeout for pending entries.
244    pub fn new(
245        pending_timeout: Duration,
246        max_incoming: usize,
247        filter: Option<Box<dyn Filter<TVal>>>,
248    ) -> Self {
249        KBucket {
250            nodes: ArrayVec::new(),
251            first_connected_pos: None,
252            pending: None,
253            pending_timeout,
254            filter,
255            max_incoming,
256        }
257    }
258
259    /// Returns a reference to the pending node of the bucket, if there is any.
260    pub fn pending(&self) -> Option<&PendingNode<TNodeId, TVal>> {
261        self.pending.as_ref()
262    }
263
264    /// Returns a mutable reference to the pending node of the bucket, if there is any.
265    pub fn pending_mut(&mut self) -> Option<&mut PendingNode<TNodeId, TVal>> {
266        self.pending.as_mut()
267    }
268
269    /// Returns a reference to the pending node of the bucket, if there is any
270    /// with a matching key.
271    pub fn as_pending(&self, key: &Key<TNodeId>) -> Option<&PendingNode<TNodeId, TVal>> {
272        self.pending().filter(|p| &p.node.key == key)
273    }
274
275    /// Returns an iterator over the nodes in the bucket, together with their status.
276    pub fn iter(&self) -> impl Iterator<Item = &Node<TNodeId, TVal>> {
277        self.nodes.iter()
278    }
279
280    /// Inserts the pending node into the bucket, if its timeout has elapsed,
281    /// replacing the least-recently connected node.
282    ///
283    /// If a pending node has been inserted, its key is returned together with
284    /// the node that was replaced. `None` indicates that the nodes in the
285    /// bucket remained unchanged.
286    pub fn apply_pending(&mut self) -> Option<AppliedPending<TNodeId, TVal>> {
287        if let Some(pending) = self.pending.take() {
288            if pending.replace <= Instant::now() {
289                // Check if the bucket is full
290                if self.nodes.is_full() {
291                    // Apply bucket filters
292
293                    if self.nodes[0].status.is_connected() {
294                        // The bucket is full with connected nodes. Drop the pending node.
295                        return None;
296                    }
297                    // Check the custom filter
298                    if let Some(filter) = self.filter.as_ref() {
299                        if !filter.filter(
300                            &pending.node.value,
301                            &mut self.iter().map(|node| &node.value),
302                        ) {
303                            // The pending node doesn't satisfy the bucket filter. Drop the pending
304                            // node.
305                            return None;
306                        }
307                    }
308                    // Check the incoming node restriction
309                    if pending.status().is_connected() && pending.status().is_incoming() {
310                        // Make sure this doesn't violate the incoming conditions
311                        if self.is_max_incoming() {
312                            // The pending node doesn't satisfy the incoming/outgoing limits. Drop
313                            // the pending node.
314                            return None;
315                        }
316                    }
317
318                    // The pending node will be inserted.
319                    let inserted = pending.node.key.clone();
320                    // A connected pending node goes at the end of the list for
321                    // the connected peers, removing the least-recently connected.
322                    if pending.status().is_connected() {
323                        let evicted = Some(self.nodes.remove(0));
324                        self.first_connected_pos = self
325                            .first_connected_pos
326                            .map_or_else(|| Some(self.nodes.len()), |p| p.checked_sub(1));
327                        self.nodes.push(pending.node);
328                        return Some(AppliedPending { inserted, evicted });
329                    }
330                    // A disconnected pending node goes at the end of the list
331                    // for the disconnected peers.
332                    else if let Some(p) = self.first_connected_pos {
333                        if let Some(insert_pos) = p.checked_sub(1) {
334                            let evicted = Some(self.nodes.remove(0));
335                            self.nodes.insert(insert_pos, pending.node);
336                            return Some(AppliedPending { inserted, evicted });
337                        }
338                    } else {
339                        // All nodes are disconnected. Insert the new node as the most
340                        // recently disconnected, removing the least-recently disconnected.
341                        let evicted = Some(self.nodes.remove(0));
342                        self.nodes.push(pending.node);
343                        return Some(AppliedPending { inserted, evicted });
344                    }
345                } else {
346                    // There is room in the bucket, so just insert the pending node.
347                    let inserted = pending.node.key.clone();
348                    match self.insert(pending.node) {
349                        InsertResult::Inserted => {
350                            return Some(AppliedPending {
351                                inserted,
352                                evicted: None,
353                            })
354                        }
355                        InsertResult::Full => unreachable!("Bucket cannot be full"),
356                        InsertResult::Pending { .. } | InsertResult::NodeExists => {
357                            error!("Bucket is not full or double node")
358                        }
359                        InsertResult::FailedFilter => debug!("Pending node failed filter"),
360                        InsertResult::TooManyIncoming => {
361                            debug!("Pending node failed incoming filter")
362                        }
363                    }
364                }
365            } else {
366                self.pending = Some(pending);
367            }
368        }
369
370        None
371    }
372
373    /// Updates the status of the pending node, if any.
374    pub fn update_pending(&mut self, status: NodeStatus) {
375        if let Some(pending) = &mut self.pending {
376            pending.node.status = status
377        }
378    }
379
380    /// Updates the status of the node referred to by the given key, if it is
381    /// in the bucket. If the node is not in the bucket, or the update would violate a bucket
382    /// filter or incoming limits, returns an update result indicating the outcome.
383    /// An optional connection state can be given. If this is omitted the connection state will not
384    /// be modified.
385    pub fn update_status(
386        &mut self,
387        key: &Key<TNodeId>,
388        state: ConnectionState,
389        direction: Option<ConnectionDirection>,
390    ) -> UpdateResult {
391        // Remove the node from its current position and then reinsert it
392        // with the desired status, which puts it at the end of either the
393        // prefix list of disconnected nodes or the suffix list of connected
394        // nodes (i.e. most-recently disconnected or most-recently connected,
395        // respectively).
396        if let Some(pos) = self.position(key) {
397            // Remove the node from its current position.
398            let mut node = self.nodes.remove(pos.0);
399            let old_status = node.status;
400            node.status.state = state;
401            if let Some(direction) = direction {
402                node.status.direction = direction;
403            }
404
405            // Flag indicating if this update modified the entry.
406            let not_modified = old_status == node.status;
407            // Flag indicating we are upgrading to a connected status
408            let is_connected = matches!(state, ConnectionState::Connected);
409
410            // Adjust `first_connected_pos` accordingly.
411            match old_status.state {
412                ConnectionState::Connected => {
413                    if self.first_connected_pos == Some(pos.0) && pos.0 == self.nodes.len() {
414                        // It was the last connected node.
415                        self.first_connected_pos = None
416                    }
417                }
418                ConnectionState::Disconnected => {
419                    self.first_connected_pos =
420                        self.first_connected_pos.and_then(|p| p.checked_sub(1))
421                }
422            }
423            // If the least-recently connected node re-establishes its
424            // connected status, drop the pending node.
425            if pos == Position(0) && is_connected {
426                self.pending = None
427            }
428            // Reinsert the node with the desired status.
429            match self.insert(node) {
430                InsertResult::Inserted => {
431                    if not_modified {
432                        UpdateResult::NotModified
433                    } else if !old_status.is_connected() && is_connected {
434                        // This means the status was updated from a disconnected state to connected
435                        // state
436                        UpdateResult::UpdatedAndPromoted
437                    } else {
438                        UpdateResult::Updated
439                    }
440                }
441                InsertResult::TooManyIncoming => {
442                    UpdateResult::Failed(FailureReason::TooManyIncoming)
443                }
444                // Node could not be inserted. None of these should be possible.
445                InsertResult::FailedFilter => {
446                    // If the filter is non-deterministic, potentially a re-insertion of the same
447                    // node can fail the filter.
448                    UpdateResult::Failed(FailureReason::BucketFilter)
449                }
450                InsertResult::NodeExists => {
451                    unreachable!("The node was removed and shouldn't already exist")
452                }
453                InsertResult::Full => {
454                    unreachable!("The node was removed so the bucket cannot be full")
455                }
456                InsertResult::Pending { .. } => {
457                    unreachable!("The node was removed so can't be added as pending")
458                }
459            }
460        } else if let Some(pending) = &mut self.pending {
461            if &pending.node.key == key {
462                pending.node.status.state = state;
463                if let Some(direction) = direction {
464                    pending.node.status.direction = direction;
465                }
466                UpdateResult::UpdatedPending
467            } else {
468                UpdateResult::Failed(FailureReason::KeyNonExistent)
469            }
470        } else {
471            UpdateResult::Failed(FailureReason::KeyNonExistent)
472        }
473    }
474
475    /// Updates the value of the node referred to by the given key, if it is
476    /// in the bucket. If the node is not in the bucket, or the update would violate a bucket
477    /// filter or incoming limits, returns false and removes the node from the bucket.
478    /// NOTE: This does not update the position of the node in the table. It node will be removed
479    /// if it fails the filter however.
480    pub fn update_value(&mut self, key: &Key<TNodeId>, value: TVal) -> UpdateResult {
481        // Remove the node from its current position, check the filter and add it back in.
482        if let Some(Position(pos)) = self.position(key) {
483            // Remove the node from its current position.
484            let mut node = self.nodes.remove(pos);
485            if node.value == value {
486                self.nodes.insert(pos, node);
487                UpdateResult::NotModified
488            } else {
489                // Check bucket filter
490                if let Some(filter) = self.filter.as_ref() {
491                    if !filter.filter(&value, &mut self.iter().map(|node| &node.value)) {
492                        // Node is removed, update the `first_connected_pos` accordingly.
493                        self.update_first_connected_pos_for_removal(pos);
494
495                        return UpdateResult::Failed(FailureReason::BucketFilter);
496                    }
497                }
498                node.value = value;
499                self.nodes.insert(pos, node);
500                UpdateResult::Updated
501            }
502        } else if let Some(pending) = &mut self.pending {
503            if &pending.node.key == key {
504                pending.node.value = value;
505                UpdateResult::UpdatedPending
506            } else {
507                UpdateResult::Failed(FailureReason::KeyNonExistent)
508            }
509        } else {
510            UpdateResult::Failed(FailureReason::KeyNonExistent)
511        }
512    }
513
514    /// Inserts a new node into the bucket with the given status.
515    ///
516    /// The status of the node to insert determines the result as follows:
517    ///
518    ///   * [`ConnectionState::Connected`] for both directions: If the bucket is full and either all nodes are connected
519    ///     or there is already a pending node, insertion fails with [`InsertResult::Full`].
520    ///     If the bucket is full but at least one node is disconnected and there is no pending
521    ///     node, the new node is inserted as pending, yielding [`InsertResult::Pending`].
522    ///     Otherwise the bucket has free slots and the new node is added to the end of the
523    ///     bucket as the most-recently connected node.
524    ///
525    ///   * [`ConnectionState::Disconnected`]: If the bucket is full, insertion fails with
526    ///     [`InsertResult::Full`]. Otherwise the bucket has free slots and the new node
527    ///     is inserted at the position preceding the first connected node,
528    ///     i.e. as the most-recently disconnected node. If there are no connected nodes,
529    ///     the new node is added as the last element of the bucket.
530    ///
531    /// The insert can fail if a provided bucket filter does not pass. If a node is attempted
532    /// to be inserted that doesn't pass the bucket filter, [`InsertResult::FailedFilter`] will be
533    /// returned. Similarly, if the inserted node would violate the `max_incoming` value, the
534    /// result will return [`InsertResult::TooManyIncoming`].
535    pub fn insert(&mut self, node: Node<TNodeId, TVal>) -> InsertResult<TNodeId> {
536        // Prevent inserting duplicate nodes.
537        if self.position(&node.key).is_some() {
538            return InsertResult::NodeExists;
539        }
540
541        // check bucket filter
542        if let Some(filter) = self.filter.as_ref() {
543            if !filter.filter(&node.value, &mut self.iter().map(|node| &node.value)) {
544                return InsertResult::FailedFilter;
545            }
546        }
547
548        let inserting_pending = self
549            .pending
550            .as_ref()
551            .map(|pending| pending.node.key == node.key)
552            .unwrap_or_default();
553
554        let insert_result = match node.status.state {
555            ConnectionState::Connected => {
556                if node.status.is_incoming() {
557                    // check the maximum counter
558                    if self.is_max_incoming() {
559                        return InsertResult::TooManyIncoming;
560                    }
561                }
562                if self.nodes.is_full() {
563                    if self.first_connected_pos == Some(0) || self.pending.is_some() {
564                        return InsertResult::Full;
565                    } else {
566                        self.pending = Some(PendingNode {
567                            node,
568                            replace: Instant::now() + self.pending_timeout,
569                        });
570                        return InsertResult::Pending {
571                            disconnected: self.nodes[0].key.clone(),
572                        };
573                    }
574                }
575
576                let pos = self.nodes.len();
577                self.first_connected_pos = self.first_connected_pos.or(Some(pos));
578                self.nodes.push(node);
579                InsertResult::Inserted
580            }
581            ConnectionState::Disconnected => {
582                if self.nodes.is_full() {
583                    return InsertResult::Full;
584                }
585
586                if let Some(ref mut first_connected_pos) = self.first_connected_pos {
587                    self.nodes.insert(*first_connected_pos, node);
588                    *first_connected_pos += 1;
589                } else {
590                    self.nodes.push(node);
591                }
592                InsertResult::Inserted
593            }
594        };
595
596        // If we inserted the node, make sure there is no pending node of the same key. This can
597        // happen when a pending node is inserted, a node gets removed from the bucket, freeing up
598        // space and then re-inserted here.
599        if matches!(insert_result, InsertResult::Inserted) && inserting_pending {
600            self.pending = None
601        }
602        insert_result
603    }
604
605    /// Removes a node from the bucket.
606    pub fn remove(&mut self, key: &Key<TNodeId>) -> bool {
607        if let Some(Position(position)) = self.position(key) {
608            self.nodes.remove(position);
609            self.update_first_connected_pos_for_removal(position);
610            self.apply_pending();
611            true
612        } else {
613            false
614        }
615    }
616
617    /// Gets the number of entries currently in the bucket.
618    pub fn num_entries(&self) -> usize {
619        self.nodes.len()
620    }
621
622    /// Gets the number of entries in the bucket that are considered connected.
623    pub fn num_connected(&self) -> usize {
624        self.first_connected_pos.map_or(0, |i| self.nodes.len() - i)
625    }
626
627    /// Gets the number of entries in the bucket that are considered disconnected.
628    pub fn num_disconnected(&self) -> usize {
629        self.nodes.len() - self.num_connected()
630    }
631
632    /// Gets the position of an node in the bucket.
633    pub fn position(&self, key: &Key<TNodeId>) -> Option<Position> {
634        self.nodes.iter().position(|p| &p.key == key).map(Position)
635    }
636
637    /// Returns the state of the node at the given position.
638    pub fn status(&self, pos: Position) -> NodeStatus {
639        if let Some(node) = self.nodes.get(pos.0) {
640            node.status
641        } else {
642            // If the node isn't in the bucket, return the worst kind of state.
643            NodeStatus {
644                state: ConnectionState::Disconnected,
645                direction: ConnectionDirection::Incoming,
646            }
647        }
648    }
649
650    /// Gets a mutable reference to the node identified by the given key.
651    ///
652    /// Returns `None` if the given key does not refer to an node in the
653    /// bucket.
654    pub fn get_mut(&mut self, key: &Key<TNodeId>) -> Option<&mut Node<TNodeId, TVal>> {
655        self.nodes.iter_mut().find(move |p| &p.key == key)
656    }
657
658    /// Gets a reference to the node identified by the given key.
659    ///
660    /// Returns `None` if the given key does not refer to an node in the
661    /// bucket.
662    pub fn get(&self, key: &Key<TNodeId>) -> Option<&Node<TNodeId, TVal>> {
663        self.nodes.iter().find(move |p| &p.key == key)
664    }
665
666    /// Returns whether the bucket has reached its maximum capacity of incoming nodes. This is used
667    /// to determine if new nodes can be added to the bucket or not.
668    fn is_max_incoming(&self) -> bool {
669        self.nodes
670            .iter()
671            .filter(|node| node.status.is_connected() && node.status.is_incoming())
672            .count()
673            >= self.max_incoming
674    }
675
676    /// Update the `first_connected_pos` for the removal of a node at position `removed_pos`.
677    ///
678    /// This function should be called *after* removing the node. It has the ability to destroy
679    /// the bucket's internal consistency invariants if misused.
680    fn update_first_connected_pos_for_removal(&mut self, removed_pos: usize) {
681        self.first_connected_pos = self.first_connected_pos.and_then(|fcp| {
682            if removed_pos < fcp {
683                // Remove node is before the first connected position, decrement it.
684                Some(fcp - 1)
685            } else {
686                // FCP is unchanged, unless there are no nodes following the removed node.
687                Some(fcp).filter(|_| fcp < self.nodes.len())
688            }
689        });
690    }
691}
692
693impl<TNodeId: std::fmt::Debug, TVal: Eq + std::fmt::Debug> std::fmt::Debug
694    for KBucket<TNodeId, TVal>
695{
696    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
697        f.debug_struct("KBucket")
698            .field("nodes", &self.nodes)
699            .field("first_connected_pos", &self.first_connected_pos)
700            .field("pending", &self.pending)
701            .field("pending_timeout", &self.pending_timeout)
702            .field("filter", &self.filter.is_some())
703            .field("max_incoming", &self.max_incoming)
704            .finish()
705    }
706}
707
708impl std::fmt::Display for ConnectionDirection {
709    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710        match &self {
711            ConnectionDirection::Incoming => write!(f, "Incoming"),
712            ConnectionDirection::Outgoing => write!(f, "Outgoing"),
713        }
714    }
715}
716
717#[cfg(test)]
718pub mod tests {
719    use super::*;
720    use enr::NodeId;
721    use quickcheck::*;
722    use rand_07::Rng;
723    use std::{
724        collections::{HashSet, VecDeque},
725        hash::Hash,
726    };
727
728    fn connected_state() -> NodeStatus {
729        NodeStatus {
730            state: ConnectionState::Connected,
731            direction: ConnectionDirection::Outgoing,
732        }
733    }
734
735    fn disconnected_state() -> NodeStatus {
736        NodeStatus {
737            state: ConnectionState::Disconnected,
738            direction: ConnectionDirection::Outgoing,
739        }
740    }
741
742    pub fn arbitrary_node_id<G: Gen>(g: &mut G) -> NodeId {
743        let mut node_id = [0u8; 32];
744        g.fill_bytes(&mut node_id);
745        NodeId::new(&node_id)
746    }
747
748    impl<V> KBucket<NodeId, V>
749    where
750        V: Eq + std::fmt::Debug,
751    {
752        /// Check invariants that must hold on the `KBucket`.
753        fn check_invariants(&self) {
754            self.check_first_connected_pos();
755            self.check_status_ordering();
756            self.check_max_incoming_nodes();
757        }
758
759        /// Check that the cached `first_connected_pos` field matches the list of nodes.
760        fn check_first_connected_pos(&self) {
761            let first_connected_pos = self
762                .nodes
763                .iter()
764                .position(|node| node.status.is_connected());
765            assert_eq!(self.first_connected_pos, first_connected_pos);
766        }
767
768        /// Check that disconnected nodes are listed first, follow by connected nodes.
769        fn check_status_ordering(&self) {
770            let first_connected_pos = self.first_connected_pos.unwrap_or(self.nodes.len());
771            assert!(self.nodes[..first_connected_pos]
772                .iter()
773                .all(|n| !n.status.is_connected()));
774            assert!(self.nodes[first_connected_pos..]
775                .iter()
776                .all(|n| n.status.is_connected()));
777        }
778
779        /// Check that the limit on incoming connections is respected.
780        fn check_max_incoming_nodes(&self) {
781            let number_of_incoming_nodes = self
782                .nodes
783                .iter()
784                .filter(|n| n.status.is_connected() && n.status.is_incoming())
785                .count();
786            assert!(number_of_incoming_nodes <= self.max_incoming);
787        }
788    }
789
790    impl<V> Arbitrary for KBucket<NodeId, V>
791    where
792        V: Arbitrary + Eq,
793    {
794        fn arbitrary<G: Gen>(g: &mut G) -> KBucket<NodeId, V> {
795            let timeout = Duration::from_secs(g.gen_range(1, g.size() as u64));
796            let mut bucket = KBucket::<NodeId, V>::new(timeout, MAX_NODES_PER_BUCKET, None);
797            let num_nodes = g.gen_range(1, MAX_NODES_PER_BUCKET + 1);
798            for _ in 0..num_nodes {
799                loop {
800                    let node = Node::arbitrary(g);
801                    match bucket.insert(node) {
802                        InsertResult::Inserted => break,
803                        InsertResult::TooManyIncoming => {}
804                        _ => panic!(),
805                    }
806                }
807            }
808            bucket
809        }
810    }
811
812    impl<V> Arbitrary for Node<NodeId, V>
813    where
814        V: Arbitrary + Eq,
815    {
816        fn arbitrary<G: Gen>(g: &mut G) -> Self {
817            let key = Key::from(arbitrary_node_id(g));
818            Node {
819                key,
820                value: V::arbitrary(g),
821                status: NodeStatus::arbitrary(g),
822            }
823        }
824    }
825
826    impl Arbitrary for NodeStatus {
827        fn arbitrary<G: Gen>(g: &mut G) -> NodeStatus {
828            match g.gen_range(1, 4) {
829                1 => NodeStatus {
830                    direction: ConnectionDirection::Incoming,
831                    state: ConnectionState::Connected,
832                },
833                2 => NodeStatus {
834                    direction: ConnectionDirection::Outgoing,
835                    state: ConnectionState::Connected,
836                },
837                3 => NodeStatus {
838                    direction: ConnectionDirection::Incoming,
839                    state: ConnectionState::Disconnected,
840                },
841                4 => NodeStatus {
842                    direction: ConnectionDirection::Outgoing,
843                    state: ConnectionState::Disconnected,
844                },
845                x => unreachable!("Should not generate numbers out of this range {}", x),
846            }
847        }
848    }
849
850    impl Arbitrary for Position {
851        fn arbitrary<G: Gen>(g: &mut G) -> Position {
852            Position(g.gen_range(0, MAX_NODES_PER_BUCKET))
853        }
854    }
855
856    // Fill a bucket with random nodes with the given status.
857    fn fill_bucket(bucket: &mut KBucket<NodeId, ()>, status: NodeStatus) {
858        let num_entries_start = bucket.num_entries();
859        for i in 0..MAX_NODES_PER_BUCKET - num_entries_start {
860            let key = Key::from(NodeId::random());
861            let node = Node {
862                key,
863                value: (),
864                status,
865            };
866            assert_eq!(InsertResult::Inserted, bucket.insert(node));
867            assert_eq!(bucket.num_entries(), num_entries_start + i + 1);
868        }
869    }
870
871    /// Filter for testing that returns true if the value is in `self.set`.
872    #[derive(Debug, Clone)]
873    pub struct SetFilter<T> {
874        set: HashSet<T>,
875    }
876
877    impl<T> Filter<T> for SetFilter<T>
878    where
879        T: Clone + Hash + Eq + Send + Sync + 'static,
880    {
881        fn filter(&self, value: &T, _: &mut dyn Iterator<Item = &T>) -> bool {
882            self.set.contains(value)
883        }
884    }
885
886    /// Enum encoding mutable method calls on KBucket, implements Arbitrary.
887    #[derive(Debug, Clone)]
888    pub enum Action<TVal>
889    where
890        TVal: Eq,
891    {
892        Insert(Node<NodeId, TVal>),
893        Remove(usize),
894        UpdatePending(NodeStatus),
895        ApplyPending,
896        UpdateStatus(usize, NodeStatus),
897        UpdateValue(usize, TVal),
898    }
899
900    impl<V> Arbitrary for Action<V>
901    where
902        V: Arbitrary + Eq,
903    {
904        fn arbitrary<G: Gen>(g: &mut G) -> Self {
905            match g.gen_range(0, 6) {
906                0 => Action::Insert(<_>::arbitrary(g)),
907                1 => Action::Remove(<_>::arbitrary(g)),
908                2 => Action::UpdatePending(<_>::arbitrary(g)),
909                3 => Action::ApplyPending,
910                4 => Action::UpdateStatus(<_>::arbitrary(g), <_>::arbitrary(g)),
911                5 => Action::UpdateValue(<_>::arbitrary(g), <_>::arbitrary(g)),
912                _ => panic!("wrong number of action variants"),
913            }
914        }
915    }
916
917    impl<V> KBucket<NodeId, V>
918    where
919        V: Eq + std::fmt::Debug,
920    {
921        fn apply_action(&mut self, action: Action<V>) -> Result<(), FailureReason> {
922            match action {
923                Action::Insert(node) => match self.insert(node) {
924                    InsertResult::FailedFilter => Err(FailureReason::BucketFilter),
925                    InsertResult::TooManyIncoming => Err(FailureReason::TooManyIncoming),
926                    InsertResult::Full => Err(FailureReason::BucketFull),
927                    _ => Ok(()),
928                },
929                Action::Remove(pos) => {
930                    if let Some(key) = self.key_of_pos(pos) {
931                        self.remove(&key);
932                    }
933                    Ok(())
934                }
935                Action::UpdatePending(status) => {
936                    self.update_pending(status);
937                    Ok(())
938                }
939                Action::ApplyPending => {
940                    self.apply_pending();
941                    Ok(())
942                }
943                Action::UpdateStatus(pos, status) => {
944                    if let Some(key) = self.key_of_pos(pos) {
945                        match self.update_status(&key, status.state, Some(status.direction)) {
946                            UpdateResult::Failed(reason) => Err(reason),
947                            _ => Ok(()),
948                        }
949                    } else {
950                        Ok(())
951                    }
952                }
953                Action::UpdateValue(pos, value) => {
954                    if let Some(key) = self.key_of_pos(pos) {
955                        match self.update_value(&key, value) {
956                            UpdateResult::Failed(reason) => Err(reason),
957                            _ => Ok(()),
958                        }
959                    } else {
960                        Ok(())
961                    }
962                }
963            }
964        }
965
966        fn key_of_pos(&self, pos: usize) -> Option<Key<NodeId>> {
967            let num_nodes = self.num_entries();
968            if num_nodes > 0 {
969                let pos = pos % num_nodes;
970                let key = self.nodes[pos].key.clone();
971                Some(key)
972            } else {
973                None
974            }
975        }
976    }
977
978    #[test]
979    fn ordering() {
980        fn prop(status: Vec<NodeStatus>) -> bool {
981            let mut bucket =
982                KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
983
984            // The expected lists of connected and disconnected nodes.
985            let mut connected = VecDeque::new();
986            let mut disconnected = VecDeque::new();
987
988            // Fill the bucket, thereby populating the expected lists in insertion order.
989            for status in status {
990                let key = Key::from(NodeId::random());
991                let node = Node {
992                    key: key.clone(),
993                    value: (),
994                    status,
995                };
996                let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
997                if let InsertResult::Inserted = bucket.insert(node) {
998                    let vec = if status.is_connected() {
999                        &mut connected
1000                    } else {
1001                        &mut disconnected
1002                    };
1003                    if full {
1004                        vec.pop_front();
1005                    }
1006                    vec.push_back((status, key.clone()));
1007                }
1008            }
1009
1010            // Get all nodes from the bucket, together with their status.
1011            let mut nodes = bucket
1012                .iter()
1013                .map(|n| (n.status, n.key.clone()))
1014                .collect::<Vec<_>>();
1015
1016            // Split the list of nodes at the first connected node.
1017            let first_connected_pos = nodes.iter().position(|(status, _)| status.is_connected());
1018            assert_eq!(bucket.first_connected_pos, first_connected_pos);
1019            let tail = first_connected_pos.map_or(Vec::new(), |p| nodes.split_off(p));
1020
1021            // All nodes before the first connected node must be disconnected and
1022            // in insertion order. Similarly, all remaining nodes must be connected
1023            // and in insertion order.
1024            disconnected == nodes && connected == tail
1025        }
1026
1027        quickcheck(prop as fn(_) -> _);
1028    }
1029
1030    #[test]
1031    fn full_bucket() {
1032        let mut bucket =
1033            KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1034
1035        let disconnected_status = NodeStatus {
1036            state: ConnectionState::Disconnected,
1037            direction: ConnectionDirection::Outgoing,
1038        };
1039        // Fill the bucket with disconnected nodes.
1040        fill_bucket(&mut bucket, disconnected_status);
1041
1042        // Trying to insert another disconnected node fails.
1043        let key = Key::from(NodeId::random());
1044        let node = Node {
1045            key,
1046            value: (),
1047            status: disconnected_status,
1048        };
1049        match bucket.insert(node) {
1050            InsertResult::Full => {}
1051            x => panic!("{:?}", x),
1052        }
1053
1054        // One-by-one fill the bucket with connected nodes, replacing the disconnected ones.
1055        for i in 0..MAX_NODES_PER_BUCKET {
1056            let first = bucket.iter().next().unwrap();
1057            let first_disconnected = first.clone();
1058            assert_eq!(first.status, disconnected_status);
1059
1060            // Add a connected node, which is expected to be pending, scheduled to
1061            // replace the first (i.e. least-recently connected) node.
1062            let key = Key::from(NodeId::random());
1063            let node = Node {
1064                key: key.clone(),
1065                value: (),
1066                status: connected_state(),
1067            };
1068            match bucket.insert(node.clone()) {
1069                InsertResult::Pending { disconnected } => {
1070                    assert_eq!(disconnected, first_disconnected.key)
1071                }
1072                x => panic!("{:?}", x),
1073            }
1074
1075            // Trying to insert another connected node fails.
1076            match bucket.insert(node.clone()) {
1077                InsertResult::Full => {}
1078                x => panic!("{:?}", x),
1079            }
1080
1081            assert!(bucket.pending().is_some());
1082
1083            // Apply the pending node.
1084            let pending = bucket.pending_mut().expect("No pending node.");
1085            pending.set_ready_at(Instant::now().checked_sub(Duration::from_secs(1)).unwrap());
1086            let result = bucket.apply_pending();
1087            assert_eq!(
1088                result,
1089                Some(AppliedPending {
1090                    inserted: key.clone(),
1091                    evicted: Some(first_disconnected)
1092                })
1093            );
1094            assert_eq!(
1095                Some(connected_state()),
1096                bucket.iter().map(|v| v.status).last()
1097            );
1098            assert!(bucket.pending().is_none());
1099            assert_eq!(
1100                Some(MAX_NODES_PER_BUCKET - (i + 1)),
1101                bucket.first_connected_pos
1102            );
1103        }
1104
1105        assert!(bucket.pending().is_none());
1106        assert_eq!(MAX_NODES_PER_BUCKET, bucket.num_entries());
1107
1108        // Trying to insert another connected node fails.
1109        let key = Key::from(NodeId::random());
1110        let node = Node {
1111            key,
1112            value: (),
1113            status: connected_state(),
1114        };
1115        match bucket.insert(node) {
1116            InsertResult::Full => {}
1117            x => panic!("{:?}", x),
1118        }
1119    }
1120
1121    #[test]
1122    fn full_bucket_discard_pending() {
1123        let mut bucket =
1124            KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1125        fill_bucket(&mut bucket, disconnected_state());
1126        let first = bucket.iter().next().unwrap();
1127        let first_disconnected = first.clone();
1128
1129        // Add a connected pending node.
1130        let key = Key::from(NodeId::random());
1131        let node = Node {
1132            key: key.clone(),
1133            value: (),
1134            status: connected_state(),
1135        };
1136        if let InsertResult::Pending { disconnected } = bucket.insert(node) {
1137            assert_eq!(&disconnected, &first_disconnected.key);
1138        } else {
1139            panic!()
1140        }
1141        assert!(bucket.pending().is_some());
1142
1143        // Update the status of the first disconnected node to be connected.
1144        let _ = bucket.update_status(&first_disconnected.key, ConnectionState::Connected, None);
1145
1146        // The pending node has been discarded.
1147        assert!(bucket.pending().is_none());
1148        assert!(bucket.iter().all(|n| n.key != key));
1149
1150        // The initially disconnected node is now the most-recently connected.
1151        assert_eq!(
1152            Some((&first_disconnected.key, connected_state())),
1153            bucket.iter().map(|v| (&v.key, v.status)).last()
1154        );
1155        assert_eq!(
1156            bucket.position(&first_disconnected.key).map(|p| p.0),
1157            bucket.first_connected_pos
1158        );
1159        assert_eq!(1, bucket.num_connected());
1160        assert_eq!(MAX_NODES_PER_BUCKET - 1, bucket.num_disconnected());
1161    }
1162
1163    /// No duplicate nodes can be inserted via the apply_pending function.
1164    #[test]
1165    fn full_bucket_applied_no_duplicates() {
1166        // First fill the bucket with connected nodes.
1167        let mut bucket =
1168            KBucket::<NodeId, ()>::new(Duration::from_secs(1), MAX_NODES_PER_BUCKET, None);
1169        fill_bucket(&mut bucket, connected_state());
1170
1171        let first = bucket.iter().next().unwrap().clone();
1172
1173        let third = bucket.iter().nth(2).unwrap().clone();
1174
1175        // Set the first connected node as disconnected
1176
1177        assert_eq!(
1178            bucket.update_status(&first.key, ConnectionState::Disconnected, None),
1179            UpdateResult::Updated
1180        );
1181
1182        // Add a connected pending node.
1183        let key = Key::from(NodeId::random());
1184        let node = Node {
1185            key,
1186            value: (),
1187            status: connected_state(),
1188        };
1189
1190        // Add a pending node
1191        if let InsertResult::Pending { disconnected } = bucket.insert(node.clone()) {
1192            assert_eq!(&disconnected, &first.key);
1193        } else {
1194            panic!()
1195        }
1196        assert!(bucket.pending().is_some());
1197
1198        // A misc node gets dropped, because it may not pass a filter when updating its connection
1199        // status.
1200        bucket.remove(&third.key);
1201
1202        // The pending nodes status gets updated
1203        // Apply pending gets called within kbuckets, so we mimic here.
1204        // The pending time hasn't elapsed so nothing should occur.
1205        assert_eq!(bucket.apply_pending(), None);
1206        assert_eq!(bucket.insert(node.clone()), InsertResult::Inserted);
1207        assert!(bucket.pending.is_none());
1208
1209        // Speed up the pending time
1210        if let Some(pending) = bucket.pending.as_mut() {
1211            pending.replace = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
1212        }
1213
1214        // At some later time apply pending
1215        assert_eq!(bucket.apply_pending(), None);
1216        // And try and update the status of the pending node
1217        assert_eq!(
1218            bucket.update_status(&node.key, ConnectionState::Connected, None),
1219            UpdateResult::NotModified
1220        );
1221    }
1222
1223    #[test]
1224    fn bucket_update_status() {
1225        fn prop(mut bucket: KBucket<NodeId, ()>, pos: Position, status: NodeStatus) -> bool {
1226            let num_nodes = bucket.num_entries();
1227
1228            // Capture position and key of the random node to update.
1229            let pos = pos.0 % num_nodes;
1230            let key = bucket.nodes[pos].key.clone();
1231
1232            // Record the (ordered) list of status of all nodes in the bucket.
1233            let mut expected = bucket
1234                .iter()
1235                .map(|n| (n.key.clone(), n.status))
1236                .collect::<Vec<_>>();
1237
1238            // Update the node in the bucket.
1239            let _ = bucket.update_status(&key, status.state, Some(status.direction));
1240
1241            // Check that the bucket now contains the node with the new status,
1242            // preserving the status and relative order of all other nodes.
1243            let expected_pos = if status.is_connected() {
1244                num_nodes - 1
1245            } else {
1246                bucket.first_connected_pos.unwrap_or(num_nodes) - 1
1247            };
1248            expected.remove(pos);
1249            expected.insert(expected_pos, (key, status));
1250            let actual = bucket
1251                .iter()
1252                .map(|n| (n.key.clone(), n.status))
1253                .collect::<Vec<_>>();
1254            expected == actual
1255        }
1256
1257        quickcheck(prop as fn(_, _, _) -> _);
1258    }
1259
1260    #[test]
1261    fn bucket_update_value_with_filtering() {
1262        fn prop(
1263            mut bucket: KBucket<NodeId, u8>,
1264            pos: Position,
1265            value: u8,
1266            value_matches_filter: bool,
1267        ) -> bool {
1268            // Initialise filter.
1269            let filter = SetFilter {
1270                set: value_matches_filter.then_some(value).into_iter().collect(),
1271            };
1272            bucket.filter = Some(Box::new(filter));
1273
1274            let num_nodes = bucket.num_entries();
1275
1276            // Capture position and key of the random node to update.
1277            let pos = pos.0 % num_nodes;
1278            let key = bucket.nodes[pos].key.clone();
1279
1280            // Record the (ordered) list of values of all nodes in the bucket.
1281            let mut expected = bucket
1282                .iter()
1283                .map(|n| (n.key.clone(), n.value))
1284                .collect::<Vec<_>>();
1285
1286            // Update the node in the bucket.
1287            let _ = bucket.update_value(&key, value);
1288
1289            bucket.check_invariants();
1290
1291            // Check that the bucket now contains the node with the new value, or that the node
1292            // has been removed.
1293            if value_matches_filter || expected[pos].1 == value {
1294                expected[pos].1 = value;
1295            } else {
1296                expected.remove(pos);
1297            }
1298            let actual = bucket
1299                .iter()
1300                .map(|n| (n.key.clone(), n.value))
1301                .collect::<Vec<_>>();
1302            expected == actual
1303        }
1304
1305        quickcheck(prop as fn(_, _, _, _) -> _);
1306    }
1307
1308    /// Hammer a bucket with random mutations to ensure invariants are always maintained.
1309    #[test]
1310    fn random_actions_with_filtering() {
1311        fn prop(
1312            initial_nodes: Vec<Node<NodeId, u8>>,
1313            pending_timeout_millis: u64,
1314            max_incoming: usize,
1315            filter_set: HashSet<u8>,
1316            actions: Vec<Action<u8>>,
1317        ) -> bool {
1318            let filter = SetFilter { set: filter_set };
1319            let pending_timeout = Duration::from_millis(pending_timeout_millis);
1320            let mut kbucket =
1321                KBucket::<NodeId, u8>::new(pending_timeout, max_incoming, Some(Box::new(filter)));
1322
1323            for node in initial_nodes {
1324                let _ = kbucket.insert(node);
1325            }
1326
1327            for action in actions {
1328                // Throwing random nodes into a bucket will likely cause some actions to fail as
1329                // they don't pass the filter. We ignore these errors and rely on the
1330                // `check_invariants()` to ensure the insert/update action failed appropriately.
1331                let _ = kbucket.apply_action(action);
1332                kbucket.check_invariants();
1333            }
1334            true
1335        }
1336
1337        quickcheck(prop as fn(_, _, _, _, _) -> _);
1338    }
1339
1340    #[test]
1341    fn table_update_status_connection() {
1342        let max_incoming = 7;
1343        let mut bucket = KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming, None);
1344
1345        let mut incoming_connected = 0;
1346        let mut keys = Vec::new();
1347        for _ in 0..MAX_NODES_PER_BUCKET {
1348            let key = Key::from(NodeId::random());
1349            keys.push(key.clone());
1350            incoming_connected += 1;
1351            let direction = if incoming_connected <= max_incoming {
1352                ConnectionDirection::Incoming
1353            } else {
1354                ConnectionDirection::Outgoing
1355            };
1356            let status = NodeStatus {
1357                state: ConnectionState::Connected,
1358                direction,
1359            };
1360            let node = Node {
1361                key: key.clone(),
1362                value: (),
1363                status,
1364            };
1365            assert_eq!(InsertResult::Inserted, bucket.insert(node));
1366        }
1367
1368        // Bucket is full
1369        // Attempt to modify a new state
1370        let result = bucket.update_status(
1371            &keys[max_incoming],
1372            ConnectionState::Disconnected,
1373            Some(ConnectionDirection::Incoming),
1374        );
1375        assert_eq!(result, UpdateResult::Updated);
1376        let result = bucket.update_status(
1377            &keys[max_incoming],
1378            ConnectionState::Connected,
1379            Some(ConnectionDirection::Outgoing),
1380        );
1381        assert_eq!(result, UpdateResult::UpdatedAndPromoted);
1382        let result = bucket.update_status(
1383            &keys[max_incoming],
1384            ConnectionState::Connected,
1385            Some(ConnectionDirection::Outgoing),
1386        );
1387        assert_eq!(result, UpdateResult::NotModified);
1388        let result = bucket.update_status(
1389            &keys[max_incoming],
1390            ConnectionState::Connected,
1391            Some(ConnectionDirection::Incoming),
1392        );
1393        assert_eq!(result, UpdateResult::Failed(FailureReason::TooManyIncoming));
1394    }
1395
1396    #[test]
1397    fn bucket_max_incoming_nodes() {
1398        fn prop(status: Vec<NodeStatus>) -> bool {
1399            let max_incoming_nodes = 5;
1400            let mut bucket =
1401                KBucket::<NodeId, ()>::new(Duration::from_secs(1), max_incoming_nodes, None);
1402
1403            // The expected lists of connected and disconnected nodes.
1404            let mut connected = VecDeque::new();
1405            let mut disconnected = VecDeque::new();
1406
1407            // Fill the bucket, thereby populating the expected lists in insertion order.
1408            for status in status {
1409                let key = Key::from(NodeId::random());
1410                let node = Node {
1411                    key: key.clone(),
1412                    value: (),
1413                    status,
1414                };
1415                let full = bucket.num_entries() == MAX_NODES_PER_BUCKET;
1416                match bucket.insert(node) {
1417                    InsertResult::Inserted => {
1418                        let vec = if status.is_connected() {
1419                            &mut connected
1420                        } else {
1421                            &mut disconnected
1422                        };
1423                        if full {
1424                            vec.pop_front();
1425                        }
1426                        vec.push_back((status, key.clone()));
1427                    }
1428                    InsertResult::FailedFilter => break,
1429                    _ => {}
1430                }
1431            }
1432
1433            // Check all invariants.
1434            bucket.check_invariants();
1435
1436            // Get all nodes from the bucket, together with their status.
1437            let mut nodes = bucket
1438                .iter()
1439                .map(|n| (n.status, n.key.clone()))
1440                .collect::<Vec<_>>();
1441
1442            // Split the list of nodes at the first connected node.
1443            let tail = bucket
1444                .first_connected_pos
1445                .map_or(Vec::new(), |p| nodes.split_off(p));
1446
1447            // All nodes before the first connected node must be disconnected and
1448            // in insertion order. Similarly, all remaining nodes must be connected
1449            // and in insertion order.
1450            // The number of incoming nodes does not exceed the maximum limit.
1451            disconnected == nodes && connected == tail
1452        }
1453
1454        quickcheck(prop as fn(_) -> _);
1455    }
1456}