libp2p_kad/kbucket/
sub_bucket.rs

/*
 * Copyright 2019 Fluence Labs Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17use std::time::Instant;
18use log::debug;
19
/// Describes a structural change to a bucket so that the marker pointing at
/// the first connected node can be adjusted accordingly.
enum ChangePosition {
    /// A disconnected node is being added to the bucket.
    AddDisconnected,
    /// A connected node is being appended to the end of the bucket.
    ///
    /// `num_entries` is the number of nodes in the bucket BEFORE the append.
    AppendConnected { num_entries: usize },
    /// A connected node is being removed from the bucket.
    RemoveConnected,
    /// A disconnected node is being removed from the bucket.
    RemoveDisconnected,
}
27
/// Connection status of a node stored in a bucket.
///
/// Together with the time of the last status change, the status
/// determines where in the bucket the node is positioned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeStatus {
    /// The node is regarded as connected.
    Connected,
    /// The node is regarded as disconnected.
    Disconnected,
}
40
41/// A `PendingNode` is a `Node` that is pending insertion into a `KBucket`.
42#[derive(Debug, Clone)]
43pub struct PendingNode<TKey, TVal> {
44    /// The pending node to insert.
45    pub node: Node<TKey, TVal>,
46
47    /// The status of the pending node.
48    pub status: NodeStatus,
49
50    /// The instant at which the pending node is eligible for insertion into a bucket.
51    pub replace: Instant,
52}
53
54impl<TKey, TVal> PendingNode<TKey, TVal> {
55    pub fn key(&self) -> &TKey {
56        &self.node.key
57    }
58
59    pub fn status(&self) -> NodeStatus {
60        self.status
61    }
62
63    pub fn value_mut(&mut self) -> &mut TVal {
64        &mut self.node.value
65    }
66
67    pub fn is_ready(&self) -> bool {
68        Instant::now() >= self.replace
69    }
70
71    pub fn set_ready_at(&mut self, t: Instant) {
72        self.replace = t;
73    }
74
75    pub fn into_node(self) -> Node<TKey, TVal> {
76        self.node
77    }
78}
79
/// An entry of a bucket: a peer taking part in the Kademlia DHT, paired with
/// a value attached to it (e.g. contact information).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Node<TKey, TVal> {
    /// Key that identifies the peer.
    pub key: TKey,
    /// Value associated with the peer.
    pub value: TVal,
    /// Weight assigned to the node.
    // NOTE(review): nothing in this file reads `weight`; its exact meaning
    // should be confirmed against the callers.
    pub weight: u32,
}
91
/// Index of a node inside a `KBucket`: a non-negative integer in the range
/// `[0, K_VALUE)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Position(pub usize);
96
/// A list of nodes in which disconnected nodes precede connected ones.
///
/// The methods of this type keep that order by inserting disconnected nodes
/// just before the first connected node and appending connected nodes at the
/// end.
#[derive(Clone, Debug)]
pub struct SubBucket<Node> {
    /// The stored nodes: disconnected ones first, connected ones after them.
    pub nodes: Vec<Node>,
    /// Index into `nodes` of the first connected node, or `None` when no
    /// stored node is connected.
    pub first_connected_pos: Option<usize>,
    /// Number of nodes at which the bucket reports itself as full.
    pub capacity: usize,
}
103
104impl<Node> SubBucket<Node> {
105    pub fn new(capacity: usize) -> Self {
106        Self {
107            nodes: Vec::with_capacity(capacity + 1),
108            first_connected_pos: None,
109            capacity,
110        }
111    }
112
113    pub fn status(&self, pos: Position) -> NodeStatus {
114        if self.first_connected_pos.map_or(false, |i| pos.0 >= i) {
115            NodeStatus::Connected
116        } else {
117            NodeStatus::Disconnected
118        }
119    }
120
121    /// Returns an iterator over the nodes in the bucket, together with their status.
122    pub fn iter(&self) -> impl Iterator<Item = (&Node, NodeStatus)> {
123        self.nodes
124            .iter()
125            .enumerate()
126            .map(move |(p, n)| (n, self.status(Position(p))))
127    }
128
129    pub fn is_full(&self) -> bool {
130        self.nodes.len() >= self.capacity
131    }
132
133    pub fn all_nodes_connected(&self) -> bool {
134        self.first_connected_pos == Some(0)
135    }
136
137    pub fn append_connected_node(&mut self, node: Node) {
138        // `num_entries` MUST be calculated BEFORE insertion
139        self.change_connected_pos(ChangePosition::AppendConnected {
140            num_entries: self.num_entries(),
141        });
142        self.nodes.push(node);
143    }
144
145    pub fn insert_disconnected_node(&mut self, node: Node) {
146        let current_position = self.first_connected_pos;
147        self.change_connected_pos(ChangePosition::AddDisconnected);
148        match current_position {
149            Some(p) => self.nodes.insert(p, node), // Insert disconnected node just before the first connected node
150            None => self.nodes.push(node),         // Or simply append disconnected node
151        }
152    }
153
154    fn change_connected_pos(&mut self, action: ChangePosition) {
155        match action {
156            ChangePosition::AddDisconnected => {
157                // New disconnected node added => position of the first connected node moved by 1
158                self.first_connected_pos = self.first_connected_pos.map(|p| p + 1)
159            }
160            ChangePosition::AppendConnected { num_entries } => {
161                // If there were no previously connected nodes – set mark to the given one (usually the last one)
162                // Otherwise – keep it the same
163                self.first_connected_pos = self.first_connected_pos.or(Some(num_entries));
164            }
165            ChangePosition::RemoveConnected => {
166                if self.num_connected() == 1 {
167                    // If it was the last connected node
168                    self.first_connected_pos = None // Then mark there is no connected nodes left
169                }
170                // Otherwise – keep mark the same
171            }
172            ChangePosition::RemoveDisconnected => {
173                // If there are connected nodes – lower mark
174                // Otherwise – keep it None
175                self.first_connected_pos = self
176                    .first_connected_pos
177                    .map(|p| p.checked_sub(1).unwrap_or(0))
178            }
179        }
180    }
181
182    pub fn evict_node(&mut self, position: Position) -> Option<Node> {
183        match self.status(position) {
184            NodeStatus::Connected => self.change_connected_pos(ChangePosition::RemoveConnected),
185            NodeStatus::Disconnected => {
186                self.change_connected_pos(ChangePosition::RemoveDisconnected)
187            }
188        }
189        if position.0 >= self.nodes.len() {
190            debug!(
191                "WARNING: tried to evict node at {} while there's only {} nodes",
192                position.0,
193                self.nodes.len()
194            );
195            None
196        } else {
197            Some(self.nodes.remove(position.0))
198        }
199    }
200
201    pub fn pop_node(&mut self) -> Option<Node> {
202        self.evict_node(Position(0))
203    }
204
205    pub fn least_recently_connected(&self) -> Option<&Node> {
206        self.nodes.get(0)
207    }
208
209    pub fn is_least_recently_connected(&self, pos: Position) -> bool {
210        pos.0 == 0
211    }
212
213    /// Checks whether the given position refers to a connected node.
214    pub fn is_connected(&self, pos: Position) -> bool {
215        self.status(pos) == NodeStatus::Connected
216    }
217
218    /// Gets the number of entries currently in the bucket.
219    pub fn num_entries(&self) -> usize {
220        self.nodes.len()
221    }
222
223    /// Gets the number of entries in the bucket that are considered connected.
224    pub fn num_connected(&self) -> usize {
225        self.first_connected_pos
226            .map_or(0, |i| self.num_entries() - i)
227    }
228
229    /// Gets the number of entries in the bucket that are considered disconnected.
230    pub fn num_disconnected(&self) -> usize {
231        self.num_entries() - self.num_connected()
232    }
233
234    /// Gets the position of an node in the bucket.
235    pub fn position<P>(&self, pred: P) -> Option<Position>
236    where
237        P: Fn(&Node) -> bool,
238    {
239        self.nodes.iter().position(pred).map(Position)
240    }
241
242    pub fn get_mut(&mut self, position: Position) -> Option<&mut Node> {
243        self.nodes.get_mut(position.0)
244    }
245}