1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
use super::*;
use core::sync::atomic::Ordering;
impl_veilid_log_facility!("rtab");
/// Routing Table Bucket
/// Stores map of public keys to entries, which may be in multiple routing tables per crypto kind
/// Keeps entries at a particular 'hash coordinate distance' from this cryptokind's node id
/// Helps to keep managed lists at particular distances so we can evict nodes by priority
/// where the priority comes from liveness and age of the entry (older is better)
pub struct Bucket {
/// Component registry accessor
registry: VeilidComponentRegistry,
/// Map of keys to entries for this bucket
/// Entries are shared (`Arc`), so the same entry may also be held elsewhere
entries: BTreeMap<BareNodeId, Arc<BucketEntry>>,
/// The crypto kind in use for the public keys in this bucket
kind: CryptoKind,
}
/// Borrowing iterator over a bucket's `(node id key, entry)` pairs,
/// as returned by `Bucket::entries`
pub(super) type EntriesIter<'a> =
alloc::collections::btree_map::Iter<'a, BareNodeId, Arc<BucketEntry>>;
/// Serialized form of one bucket slot: a node id key plus a reference,
/// by position, to an entry stored in a separate shared entries list
#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketEntryData {
/// Node id key the entry is filed under in this bucket
key: BareNodeId,
value: u32, // index into serialized entries list
}
/// Serialized form of a whole bucket, as written by `Bucket::save_bucket`
/// and read back by `Bucket::load_bucket`
#[derive(Debug, Serialize, Deserialize)]
struct SerializedBucketData {
/// One record per key held in the bucket
entries: Vec<SerializedBucketEntryData>,
}
impl_veilid_component_accessors!(Bucket);
impl Bucket {
/// Create an empty bucket for public keys of the given crypto kind
pub fn new(registry: VeilidComponentRegistry, kind: CryptoKind) -> Self {
    // A fresh bucket starts with no entries
    let entries = BTreeMap::new();
    Self {
        registry,
        entries,
        kind,
    }
}
pub(super) fn load_bucket(
&mut self,
data: Vec<u8>,
all_entries: &[Arc<BucketEntry>],
) -> EyreResult<()> {
let bucket_data: SerializedBucketData = deserialize_json_bytes(&data)?;
for e in bucket_data.entries {
self.entries
.insert(e.key, all_entries[e.value as usize].clone());
}
Ok(())
}
pub(super) fn save_bucket(
&self,
all_entries: &mut Vec<Arc<BucketEntry>>,
entry_map: &mut HashMap<*const BucketEntry, u32>,
) -> Vec<u8> {
let mut entries = Vec::new();
for (k, v) in &self.entries {
let entry_index = entry_map.entry(Arc::as_ptr(v)).or_insert_with(|| {
let entry_index = all_entries.len();
all_entries.push(v.clone());
entry_index as u32
});
entries.push(SerializedBucketEntryData {
key: k.clone(),
value: *entry_index,
});
}
let bucket_data = SerializedBucketData { entries };
serialize_json_bytes(bucket_data)
}
/// Create a fresh entry for `node_id_key` under this bucket's crypto kind,
/// store it in the bucket, and hand back a shared reference to it
pub(super) fn add_new_entry(&mut self, node_id_key: BareNodeId) -> Arc<BucketEntry> {
    veilid_log!(self trace "Node added: {}:{}", self.kind, node_id_key);

    // Build the entry around the full (kind-qualified) node id
    let node_id = NodeId::new(self.kind, node_id_key.clone());
    let new_entry = Arc::new(BucketEntry::new(node_id));

    // File it under the bare key and return the caller's handle
    self.entries.insert(node_id_key, Arc::clone(&new_entry));
    new_entry
}
/// File an already-existing entry in this bucket under `node_id_key`,
/// a node id of this bucket's crypto kind
pub(super) fn add_existing_entry(&mut self, node_id_key: BareNodeId, entry: Arc<BucketEntry>) {
    veilid_log!(self trace "Existing node added: {}:{}", self.kind, node_id_key);

    // Any entry previously stored under this key is replaced and released
    let _previous = self.entries.insert(node_id_key, entry);
}
/// Drop the entry stored under `node_id_key` from this bucket, if there is one
pub(super) fn remove_entry(&mut self, node_id_key: &BareNodeId) {
    veilid_log!(self trace "Node removed: {}:{}", self.kind, node_id_key);

    // Only this bucket's reference is released here
    drop(self.entries.remove(node_id_key));
}
/// Look up the entry stored under `key`, returning a new shared handle to it,
/// or `None` if the bucket holds no entry for that key
pub(super) fn entry(&self, key: &BareNodeId) -> Option<Arc<BucketEntry>> {
    self.entries.get(key).map(Arc::clone)
}
pub(super) fn entries(&self) -> EntriesIter<'_> {
self.entries.iter()
}
/// Evict surplus entries once the bucket holds more than `bucket_depth` nodes
///
/// Entries that currently have a non-zero `ref_count`, or whose key is in
/// `exempt_peers`, are never candidates. The remaining entries are ranked by
/// `BucketEntryState::ordering()` (lowest first), then by `time_added` (most
/// recently added first), and at most `len - bucket_depth` entries are evicted
/// from the front of that ranking.
///
/// Each evicted entry has the node id for this bucket's crypto kind removed from
/// it and is then removed from this bucket.
///
/// Returns the set of evicted keys, or `None` if the bucket was not over its
/// depth or nothing could be evicted.
pub(super) fn kick(
    &mut self,
    bucket_depth: usize,
    exempt_peers: &BTreeSet<BareNodeId>,
) -> Option<BTreeSet<BareNodeId>> {
    use core::cmp::Ordering as CmpOrdering;

    // How many entries exceed the allowed depth; nothing to do if none
    let overflow = match self.entries.len().checked_sub(bucket_depth) {
        Some(surplus) if surplus > 0 => surplus,
        _ => return None,
    };

    // An entry may be kicked only if nothing references it and it is not exempt
    let is_kickable = |id: &BareNodeId, entry: &Arc<BucketEntry>| -> bool {
        let in_use = entry.ref_count.load(Ordering::Acquire) > 0;
        !in_use && !exempt_peers.contains(id)
    };

    // Snapshot state and time added for each candidate up front, so the sort
    // comparator works on fixed values and stays a consistent total order even
    // if the entries change while sorting (Rust 1.81+ driftsort)
    let cur_ts = Timestamp::now();
    let mut candidates: Vec<(BareNodeId, Arc<BucketEntry>, BucketEntryState, Timestamp)> = self
        .entries
        .iter()
        .filter(|&(id, entry)| is_kickable(id, entry))
        .map(|(id, entry)| {
            let (state, time_added) =
                entry.with(|e| (e.state(cur_ts), e.peer_stats().time_added));
            (id.clone(), Arc::clone(entry), state, time_added)
        })
        .collect();

    // Kick order: lowest state ordering first, and within the same state
    // the most recently added entries first
    candidates.sort_by(|lhs, rhs| {
        if lhs.0 == rhs.0 {
            return CmpOrdering::Equal;
        }
        match lhs.2.ordering().partial_cmp(&rhs.2.ordering()) {
            Some(CmpOrdering::Less) => CmpOrdering::Less,
            Some(CmpOrdering::Greater) => CmpOrdering::Greater,
            _ => rhs.3.cmp(&lhs.3),
        }
    });

    // Evict from the front of the kick order until the overflow is covered
    // or we run out of candidates
    let mut dead_node_ids: BTreeSet<BareNodeId> = BTreeSet::new();
    for (bare_node_id, entry, _state, _time_added) in candidates.into_iter().take(overflow) {
        dead_node_ids.insert(bare_node_id);
        // Detach this crypto kind's node id from the entry itself
        entry.with_mut(|e| e.remove_node_id(self.kind));
    }

    // Remove the evicted keys from this bucket. The entry may not be completely
    // gone after this, because it may still be in another bucket for a
    // different CryptoKind
    for id in &dead_node_ids {
        self.remove_entry(id);
    }

    (!dead_node_ids.is_empty()).then_some(dead_node_ids)
}
}