cs_mwc_libp2p_swarm/registry.rs
1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use mwc_libp2p_core::Multiaddr;
22use smallvec::SmallVec;
23use std::{collections::VecDeque, cmp::Ordering, num::NonZeroUsize};
24use std::ops::{Add, Sub};
25
26/// A ranked collection of [`Multiaddr`] values.
27///
28/// Every address has an associated [score](`AddressScore`) and iterating
29/// over the addresses will return them in order from highest to lowest score.
30///
31/// In addition to the currently held addresses and their score, the collection
32/// keeps track of a limited history of the most-recently added addresses.
33/// This history determines how address scores are reduced over time as old
34/// scores expire in the context of new addresses being added:
35///
36/// * An address's score is increased by a given amount whenever it is
37/// [(re-)added](Addresses::add) to the collection.
38/// * An address's score is decreased by the same amount used when it
39/// was added when the least-recently seen addition is (as per the
40/// limited history) for this address in the context of [`Addresses::add`].
41/// * If an address's score reaches 0 in the context of [`Addresses::add`],
42/// it is removed from the collection.
43///
44#[derive(Debug, Clone)]
45pub struct Addresses {
46 /// The ranked sequence of addresses, from highest to lowest score.
47 ///
48 /// By design, the number of finitely scored addresses stored here is
49 /// never larger (but may be smaller) than the number of historic `reports`
50 /// at any time.
51 registry: SmallVec<[AddressRecord; 8]>,
52 /// The configured limit of the `reports` history of added addresses,
53 /// and thus also of the size of the `registry` w.r.t. finitely scored
54 /// addresses.
55 limit: NonZeroUsize,
56 /// The limited history of added addresses. If the queue reaches the `limit`,
57 /// the first record, i.e. the least-recently added, is removed in the
58 /// context of [`Addresses::add`] and the corresponding record in the
59 /// `registry` has its score reduced accordingly.
60 reports: VecDeque<Report>,
61}
62
63/// An record in a prioritised list of addresses.
64#[derive(Clone, Debug, PartialEq, Eq)]
65#[non_exhaustive]
66pub struct AddressRecord {
67 pub addr: Multiaddr,
68 pub score: AddressScore,
69}
70
71/// A report tracked for a finitely scored address.
72#[derive(Debug, Clone)]
73struct Report {
74 addr: Multiaddr,
75 score: u32,
76}
77
78impl AddressRecord {
79 fn new(addr: Multiaddr, score: AddressScore) -> Self {
80 AddressRecord {
81 addr, score,
82 }
83 }
84}
85
/// The "score" of an address within an ordered collection of addresses.
///
/// A score measures the trustworthiness of one particular observation of
/// an address. The same address may be reported many times, with equal or
/// differing scores.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AddressScore {
    /// An "infinite" score: it compares greater than every finite score,
    /// and adding to or subtracting from it leaves it infinite.
    Infinite,
    /// A finite score, which grows and shrinks through saturating
    /// addition and subtraction.
    Finite(u32),
}

impl AddressScore {
    /// Returns `true` only for a finite score of exactly 0.
    fn is_zero(&self) -> bool {
        matches!(self, AddressScore::Finite(0))
    }
}

impl PartialOrd for AddressScore {
    /// Scores are totally ordered, so this always defers to [`Ord::cmp`].
    fn partial_cmp(&self, other: &AddressScore) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for AddressScore {
    /// Orders scores like cardinal numbers with one infinite cardinal:
    /// finite scores compare numerically and `Infinite` is the maximum.
    fn cmp(&self, other: &AddressScore) -> Ordering {
        use AddressScore::{Finite, Infinite};

        match (self, other) {
            (Finite(lhs), Finite(rhs)) => lhs.cmp(rhs),
            (Finite(_), Infinite) => Ordering::Less,
            (Infinite, Finite(_)) => Ordering::Greater,
            (Infinite, Infinite) => Ordering::Equal,
        }
    }
}

impl Add for AddressScore {
    type Output = AddressScore;

    /// Adds two scores; any `Infinite` operand makes the sum `Infinite`,
    /// and finite sums saturate at `u32::MAX` instead of overflowing.
    fn add(self, rhs: AddressScore) -> Self::Output {
        use AddressScore::{Finite, Infinite};

        match (self, rhs) {
            (Finite(lhs), Finite(rhs)) => Finite(lhs.saturating_add(rhs)),
            (Infinite, _) | (_, Infinite) => Infinite,
        }
    }
}

impl Sub<u32> for AddressScore {
    type Output = AddressScore;

    /// Subtracts a finite amount; `Infinite` stays `Infinite`, and finite
    /// scores saturate at 0 instead of underflowing.
    fn sub(self, rhs: u32) -> Self::Output {
        if let AddressScore::Finite(current) = self {
            AddressScore::Finite(current.saturating_sub(rhs))
        } else {
            AddressScore::Infinite
        }
    }
}
161
162impl Default for Addresses {
163 fn default() -> Self {
164 Addresses::new(NonZeroUsize::new(200).expect("200 > 0"))
165 }
166}
167
/// The outcome of adding an address to an ordered list of addresses with
/// associated scores.
///
/// The common value traits are derived so that callers can log, copy and
/// compare the outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddAddressResult {
    /// The address was not in the collection yet; a new record was created.
    Inserted,
    /// The address was already in the collection; the reported score was
    /// added to its existing score.
    Updated,
}
174
175impl Addresses {
176 /// Create a new ranked address collection with the given size limit
177 /// for [finitely scored](AddressScore::Finite) addresses.
178 pub fn new(limit: NonZeroUsize) -> Self {
179 Addresses {
180 registry: SmallVec::new(),
181 limit,
182 reports: VecDeque::with_capacity(limit.get()),
183 }
184 }
185
186 /// Add a [`Multiaddr`] to the collection.
187 ///
188 /// If the given address already exists in the collection,
189 /// the given score is added to the current score of the address.
190 ///
191 /// If the collection has already observed the configured
192 /// number of address additions, the least-recently added address
193 /// as per this limited history has its score reduced by the amount
194 /// used in this prior report, with removal from the collection
195 /// occurring when the score drops to 0.
196 pub fn add(&mut self, addr: Multiaddr, score: AddressScore) -> AddAddressResult {
197 // If enough reports (i.e. address additions) occurred, reduce
198 // the score of the least-recently added address.
199 if self.reports.len() == self.limit.get() {
200 let old_report = self.reports.pop_front().expect("len = limit > 0");
201 // If the address is still in the collection, decrease its score.
202 if let Some(record) = self.registry.iter_mut().find(|r| r.addr == old_report.addr) {
203 record.score = record.score - old_report.score;
204 isort(&mut self.registry);
205 }
206 }
207
208 // Remove addresses that have a score of 0.
209 while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
210 self.registry.pop();
211 }
212
213 // If the address score is finite, remember this report.
214 if let AddressScore::Finite(score) = score {
215 self.reports.push_back(Report { addr: addr.clone(), score });
216 }
217
218 // If the address is already in the collection, increase its score.
219 for r in &mut self.registry {
220 if r.addr == addr {
221 r.score = r.score + score;
222 isort(&mut self.registry);
223 return AddAddressResult::Updated
224 }
225 }
226
227 // It is a new record.
228 self.registry.push(AddressRecord::new(addr, score));
229 AddAddressResult::Inserted
230 }
231
232 /// Explicitly remove an address from the collection.
233 ///
234 /// Returns `true` if the address existed in the collection
235 /// and was thus removed, false otherwise.
236 pub fn remove(&mut self, addr: &Multiaddr) -> bool {
237 if let Some(pos) = self.registry.iter().position(|r| &r.addr == addr) {
238 self.registry.remove(pos);
239 true
240 } else {
241 false
242 }
243 }
244
245 /// Return an iterator over all [`Multiaddr`] values.
246 ///
247 /// The iteration is ordered by descending score.
248 pub fn iter(&self) -> AddressIter<'_> {
249 AddressIter { items: &self.registry, offset: 0 }
250 }
251
252 /// Return an iterator over all [`Multiaddr`] values.
253 ///
254 /// The iteration is ordered by descending score.
255 pub fn into_iter(self) -> AddressIntoIter {
256 AddressIntoIter { items: self.registry }
257 }
258}
259
260/// An iterator over [`Multiaddr`] values.
261#[derive(Clone)]
262pub struct AddressIter<'a> {
263 items: &'a [AddressRecord],
264 offset: usize
265}
266
267impl<'a> Iterator for AddressIter<'a> {
268 type Item = &'a AddressRecord;
269
270 fn next(&mut self) -> Option<Self::Item> {
271 if self.offset == self.items.len() {
272 return None
273 }
274 let item = &self.items[self.offset];
275 self.offset += 1;
276 Some(&item)
277 }
278
279 fn size_hint(&self) -> (usize, Option<usize>) {
280 let n = self.items.len() - self.offset;
281 (n, Some(n))
282 }
283}
284
285impl<'a> ExactSizeIterator for AddressIter<'a> {}
286
287/// An iterator over [`Multiaddr`] values.
288#[derive(Clone)]
289pub struct AddressIntoIter {
290 items: SmallVec<[AddressRecord; 8]>,
291}
292
293impl Iterator for AddressIntoIter {
294 type Item = AddressRecord;
295
296 fn next(&mut self) -> Option<Self::Item> {
297 if !self.items.is_empty() {
298 Some(self.items.remove(0))
299 } else {
300 None
301 }
302 }
303
304 fn size_hint(&self) -> (usize, Option<usize>) {
305 let n = self.items.len();
306 (n, Some(n))
307 }
308}
309
310impl ExactSizeIterator for AddressIntoIter {}
311
312// Reverse insertion sort.
313fn isort(xs: &mut [AddressRecord]) {
314 for i in 1 .. xs.len() {
315 for j in (1 ..= i).rev() {
316 if xs[j].score <= xs[j - 1].score {
317 break
318 }
319 xs.swap(j, j - 1)
320 }
321 }
322}
323
#[cfg(test)]
mod tests {
    use mwc_libp2p_core::multiaddr::{Multiaddr, Protocol};
    use quickcheck::*;
    use rand::Rng;
    use std::num::{NonZeroUsize, NonZeroU8};
    use super::*;

    impl Arbitrary for AddressScore {
        /// Generates an `Infinite` score in roughly 10% of the cases and a
        /// random finite score otherwise.
        fn arbitrary<G: Gen>(g: &mut G) -> AddressScore {
            match g.gen_range(0, 10) {
                0 => AddressScore::Infinite,
                _ => AddressScore::Finite(g.gen()),
            }
        }
    }

    impl Arbitrary for AddressRecord {
        /// Generates a record for one of 256 TCP-port addresses, so that
        /// repeated reports of the same address are likely.
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let port = g.gen::<u16>() % 256;
            let addr = Multiaddr::from(Protocol::Tcp(port));
            let score = AddressScore::arbitrary(g);
            AddressRecord::new(addr, score)
        }
    }

    /// `isort` leaves the records ordered by descending score.
    #[test]
    fn isort_sorts() {
        fn property(scores: Vec<AddressScore>) {
            let mut records: Vec<AddressRecord> = scores
                .into_iter()
                .map(|score| AddressRecord::new(Multiaddr::empty(), score))
                .collect();

            isort(&mut records);

            assert!(records.windows(2).all(|pair| pair[0].score >= pair[1].score));
        }

        quickcheck(property as fn(_));
    }

    /// An address is pushed out by later reports only if both its own score
    /// and the score of the later reports are finite.
    #[test]
    fn score_retention() {
        fn prop(first: AddressRecord, other: AddressRecord) -> TestResult {
            if first.addr == other.addr {
                return TestResult::discard();
            }

            let mut addresses = Addresses::default();

            // The first address is present right after it was added.
            addresses.add(first.addr.clone(), first.score);
            assert!(addresses.iter().any(|a| a.addr == first.addr));

            // Report the other address often enough that the single report
            // of the first address may expire and the address be removed.
            let rounds = addresses.limit.get() + 1;
            for _ in 0..rounds {
                addresses.add(other.addr.clone(), other.score);
            }

            let exists = addresses.iter().any(|a| a.addr == first.addr);

            // Only finite scores push out other finite scores.
            let both_finite = matches!(
                (first.score, other.score),
                (AddressScore::Finite(_), AddressScore::Finite(_))
            );
            assert_eq!(exists, !both_finite);

            TestResult::passed()
        }

        quickcheck(prop as fn(_, _) -> _);
    }

    /// The number of finitely scored addresses never exceeds the limit.
    #[test]
    fn finitely_scored_address_limit() {
        fn prop(reports: Vec<AddressRecord>, limit: NonZeroU8) {
            let mut addresses = Addresses::new(NonZeroUsize::from(limit));

            // Feed in every report.
            for report in reports {
                addresses.add(report.addr, report.score);
            }

            // Count the records that carry a finite score.
            let num_finite = addresses
                .iter()
                .filter(|record| matches!(record.score, AddressScore::Finite(_)))
                .count();

            assert!(num_finite <= usize::from(limit.get()));
        }

        quickcheck(prop as fn(_, _));
    }

    /// With a history large enough for all reports, each held address has
    /// the sum of all scores it was reported with.
    #[test]
    fn record_score_sum() {
        fn prop(records: Vec<AddressRecord>) -> bool {
            // Size the history so that no report can expire.
            let capacity = records.len().max(1);
            let mut addresses = Addresses::new(NonZeroUsize::new(capacity).unwrap());

            for report in &records {
                addresses.add(report.addr.clone(), report.score);
            }

            // Every held record must carry the sum of its reports.
            addresses.registry.iter().all(|held| {
                let expected = records
                    .iter()
                    .filter(|report| report.addr == held.addr)
                    .map(|report| report.score)
                    .fold(None, |sum: Option<AddressScore>, score| {
                        Some(sum.map_or(score, |acc| acc + score))
                    });

                expected == Some(held.score)
            })
        }

        quickcheck(prop as fn(_) -> _)
    }
}