cs_mwc_libp2p_swarm/
registry.rs

1// Copyright 2019 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use mwc_libp2p_core::Multiaddr;
22use smallvec::SmallVec;
23use std::{collections::VecDeque, cmp::Ordering, num::NonZeroUsize};
24use std::ops::{Add, Sub};
25
/// A ranked collection of [`Multiaddr`] values.
///
/// Every address has an associated [score](`AddressScore`) and iterating
/// over the addresses will return them in order from highest to lowest score.
///
/// In addition to the currently held addresses and their score, the collection
/// keeps track of a limited history of the most-recently added addresses.
/// This history determines how address scores are reduced over time as old
/// scores expire in the context of new addresses being added:
///
///   * An address's score is increased by a given amount whenever it is
///     [(re-)added](Addresses::add) to the collection.
///   * An address's score is decreased by the same amount used when it
///     was added when the least-recently seen addition is (as per the
///     limited history) for this address in the context of [`Addresses::add`].
///   * If an address's score reaches 0 in the context of [`Addresses::add`],
///     it is removed from the collection.
///
/// Only [finite](AddressScore::Finite) scores are recorded in the history;
/// additions with an [infinite](AddressScore::Infinite) score never expire.
#[derive(Debug, Clone)]
pub struct Addresses {
    /// The ranked sequence of addresses, from highest to lowest score.
    ///
    /// By design, the number of finitely scored addresses stored here is
    /// never larger (but may be smaller) than the number of historic `reports`
    /// at any time.
    registry: SmallVec<[AddressRecord; 8]>,
    /// The configured limit of the `reports` history of added addresses,
    /// and thus also of the size of the `registry` w.r.t. finitely scored
    /// addresses.
    limit: NonZeroUsize,
    /// The limited history of added addresses. If the queue reaches the `limit`,
    /// the first record, i.e. the least-recently added, is removed in the
    /// context of [`Addresses::add`] and the corresponding record in the
    /// `registry` has its score reduced accordingly.
    reports: VecDeque<Report>,
}
62
/// A record in a prioritised list of addresses.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct AddressRecord {
    /// The address this record refers to.
    pub addr: Multiaddr,
    /// The score currently associated with the address.
    pub score: AddressScore,
}
70
/// A report tracked for a finitely scored address.
///
/// One report is remembered per finitely scored addition so that the
/// same amount can later be subtracted again when the report expires.
#[derive(Debug, Clone)]
struct Report {
    /// The address that was reported.
    addr: Multiaddr,
    /// The finite score the address was reported with.
    score: u32,
}
77
78impl AddressRecord {
79    fn new(addr: Multiaddr, score: AddressScore) -> Self {
80        AddressRecord {
81            addr, score,
82        }
83    }
84}
85
/// The "score" of an address w.r.t. an ordered collection of addresses.
///
/// A score is a measure of the trustworthiness of a particular
/// observation of an address. The same address may be repeatedly
/// reported with the same or differing scores.
#[derive(PartialEq, Eq, Debug, Clone, Copy, Hash)]
pub enum AddressScore {
    /// The score is "infinite", i.e. an address with this score is never
    /// purged from the associated address records and remains sorted at
    /// the beginning (possibly with other `Infinite`ly scored addresses).
    Infinite,
    /// The score is finite, i.e. an address with this score has
    /// its score increased and decreased as per the frequency of
    /// reports (i.e. additions) of the same address relative to
    /// the reports of other addresses.
    Finite(u32),
}
103
104impl AddressScore {
105    fn is_zero(&self) -> bool {
106        &AddressScore::Finite(0) == self
107    }
108}
109
110impl PartialOrd for AddressScore {
111    fn partial_cmp(&self, other: &AddressScore) -> Option<Ordering> {
112        Some(self.cmp(other))
113    }
114}
115
116impl Ord for AddressScore {
117    fn cmp(&self, other: &AddressScore) -> Ordering {
118        // Semantics of cardinal numbers with a single infinite cardinal.
119        match (self, other) {
120            (AddressScore::Infinite, AddressScore::Infinite) =>
121                Ordering::Equal,
122            (AddressScore::Infinite, AddressScore::Finite(_)) =>
123                Ordering::Greater,
124            (AddressScore::Finite(_), AddressScore::Infinite) =>
125                Ordering::Less,
126            (AddressScore::Finite(a), AddressScore::Finite(b)) =>
127                a.cmp(b),
128        }
129    }
130}
131
132impl Add for AddressScore {
133    type Output = AddressScore;
134
135    fn add(self, rhs: AddressScore) -> Self::Output {
136        // Semantics of cardinal numbers with a single infinite cardinal.
137        match (self, rhs) {
138            (AddressScore::Infinite, AddressScore::Infinite) =>
139                AddressScore::Infinite,
140            (AddressScore::Infinite, AddressScore::Finite(_)) =>
141                AddressScore::Infinite,
142            (AddressScore::Finite(_), AddressScore::Infinite) =>
143                AddressScore::Infinite,
144            (AddressScore::Finite(a), AddressScore::Finite(b)) =>
145                AddressScore::Finite(a.saturating_add(b))
146        }
147    }
148}
149
150impl Sub<u32> for AddressScore {
151    type Output = AddressScore;
152
153    fn sub(self, rhs: u32) -> Self::Output {
154        // Semantics of cardinal numbers with a single infinite cardinal.
155        match self {
156            AddressScore::Infinite => AddressScore::Infinite,
157            AddressScore::Finite(score) => AddressScore::Finite(score.saturating_sub(rhs))
158        }
159    }
160}
161
162impl Default for Addresses {
163    fn default() -> Self {
164        Addresses::new(NonZeroUsize::new(200).expect("200 > 0"))
165    }
166}
167
/// The result of adding an address to an ordered list of
/// addresses with associated scores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddAddressResult {
    /// The address was not yet part of the collection and a new record
    /// was created for it.
    Inserted,
    /// The address was already part of the collection and the score of
    /// its existing record was increased.
    Updated,
}
174
175impl Addresses {
176    /// Create a new ranked address collection with the given size limit
177    /// for [finitely scored](AddressScore::Finite) addresses.
178    pub fn new(limit: NonZeroUsize) -> Self {
179        Addresses {
180            registry: SmallVec::new(),
181            limit,
182            reports: VecDeque::with_capacity(limit.get()),
183        }
184    }
185
186    /// Add a [`Multiaddr`] to the collection.
187    ///
188    /// If the given address already exists in the collection,
189    /// the given score is added to the current score of the address.
190    ///
191    /// If the collection has already observed the configured
192    /// number of address additions, the least-recently added address
193    /// as per this limited history has its score reduced by the amount
194    /// used in this prior report, with removal from the collection
195    /// occurring when the score drops to 0.
196    pub fn add(&mut self, addr: Multiaddr, score: AddressScore) -> AddAddressResult {
197        // If enough reports (i.e. address additions) occurred, reduce
198        // the score of the least-recently added address.
199        if self.reports.len() == self.limit.get() {
200            let old_report = self.reports.pop_front().expect("len = limit > 0");
201            // If the address is still in the collection, decrease its score.
202            if let Some(record) = self.registry.iter_mut().find(|r| r.addr == old_report.addr) {
203                record.score = record.score - old_report.score;
204                isort(&mut self.registry);
205            }
206        }
207
208        // Remove addresses that have a score of 0.
209        while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
210            self.registry.pop();
211        }
212
213        // If the address score is finite, remember this report.
214        if let AddressScore::Finite(score) = score {
215            self.reports.push_back(Report { addr: addr.clone(), score });
216        }
217
218        // If the address is already in the collection, increase its score.
219        for r in &mut self.registry {
220            if r.addr == addr {
221                r.score = r.score + score;
222                isort(&mut self.registry);
223                return AddAddressResult::Updated
224            }
225        }
226
227        // It is a new record.
228        self.registry.push(AddressRecord::new(addr, score));
229        AddAddressResult::Inserted
230    }
231
232    /// Explicitly remove an address from the collection.
233    ///
234    /// Returns `true` if the address existed in the collection
235    /// and was thus removed, false otherwise.
236    pub fn remove(&mut self, addr: &Multiaddr) -> bool {
237        if let Some(pos) = self.registry.iter().position(|r| &r.addr == addr) {
238            self.registry.remove(pos);
239            true
240        } else {
241            false
242        }
243    }
244
245    /// Return an iterator over all [`Multiaddr`] values.
246    ///
247    /// The iteration is ordered by descending score.
248    pub fn iter(&self) -> AddressIter<'_> {
249        AddressIter { items: &self.registry, offset: 0 }
250    }
251
252    /// Return an iterator over all [`Multiaddr`] values.
253    ///
254    /// The iteration is ordered by descending score.
255    pub fn into_iter(self) -> AddressIntoIter {
256        AddressIntoIter { items: self.registry }
257    }
258}
259
/// A borrowing iterator over the [`AddressRecord`]s of an address
/// collection.
#[derive(Clone)]
pub struct AddressIter<'a> {
    /// The records being iterated over.
    items: &'a [AddressRecord],
    /// The index into `items` of the next record to yield.
    offset: usize
}
266
267impl<'a> Iterator for AddressIter<'a> {
268    type Item = &'a AddressRecord;
269
270    fn next(&mut self) -> Option<Self::Item> {
271        if self.offset == self.items.len() {
272            return None
273        }
274        let item = &self.items[self.offset];
275        self.offset += 1;
276        Some(&item)
277    }
278
279    fn size_hint(&self) -> (usize, Option<usize>) {
280        let n = self.items.len() - self.offset;
281        (n, Some(n))
282    }
283}
284
// `size_hint` reports the exact number of remaining records, so the
// default `ExactSizeIterator` methods can be relied upon.
impl<'a> ExactSizeIterator for AddressIter<'a> {}
286
/// An owning iterator over the [`AddressRecord`]s of an address
/// collection.
#[derive(Clone)]
pub struct AddressIntoIter {
    /// The records that have not been yielded yet, in iteration order.
    items: SmallVec<[AddressRecord; 8]>,
}
292
293impl Iterator for AddressIntoIter {
294    type Item = AddressRecord;
295
296    fn next(&mut self) -> Option<Self::Item> {
297        if !self.items.is_empty() {
298            Some(self.items.remove(0))
299        } else {
300            None
301        }
302    }
303
304    fn size_hint(&self) -> (usize, Option<usize>) {
305        let n = self.items.len();
306        (n, Some(n))
307    }
308}
309
// `size_hint` reports the exact number of remaining records, so the
// default `ExactSizeIterator` methods can be relied upon.
impl ExactSizeIterator for AddressIntoIter {}
311
312// Reverse insertion sort.
313fn isort(xs: &mut [AddressRecord]) {
314    for i in 1 .. xs.len() {
315        for j in (1 ..= i).rev() {
316            if xs[j].score <= xs[j - 1].score {
317                break
318            }
319            xs.swap(j, j - 1)
320        }
321    }
322}
323
#[cfg(test)]
mod tests {
    use mwc_libp2p_core::multiaddr::{Multiaddr, Protocol};
    use quickcheck::*;
    use rand::Rng;
    use std::num::{NonZeroUsize, NonZeroU8};
    use super::*;

    impl Arbitrary for AddressScore {
        /// Generates ~10% "infinitely" scored values; the remaining ones
        /// are finite with a random magnitude.
        fn arbitrary<G: Gen>(g: &mut G) -> AddressScore {
            match g.gen_range(0, 10) {
                0 => AddressScore::Infinite,
                _ => AddressScore::Finite(g.gen()),
            }
        }
    }

    impl Arbitrary for AddressRecord {
        /// Generates a record for a TCP address with a port below 256, so
        /// that the same address is likely to be generated repeatedly.
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let port = g.gen::<u16>() % 256;
            let addr: Multiaddr = Protocol::Tcp(port).into();
            AddressRecord::new(addr, AddressScore::arbitrary(g))
        }
    }

    /// After `isort`, no record is outranked by one of its successors.
    #[test]
    fn isort_sorts() {
        fn property(scores: Vec<AddressScore>) {
            let mut records: Vec<AddressRecord> = scores
                .into_iter()
                .map(|score| AddressRecord::new(Multiaddr::empty(), score))
                .collect();

            isort(&mut records);

            for pair in records.windows(2) {
                assert!(pair[0].score >= pair[1].score)
            }
        }

        quickcheck(property as fn(_));
    }

    /// An address is only pushed out of the collection if both it and the
    /// address displacing it are finitely scored.
    #[test]
    fn score_retention() {
        fn prop(first: AddressRecord, other: AddressRecord) -> TestResult {
            if first.addr == other.addr {
                return TestResult::discard()
            }

            let mut addresses = Addresses::default();

            // Add the first address.
            addresses.add(first.addr.clone(), first.score);
            assert!(addresses.iter().any(|a| a.addr == first.addr));

            // Add another address so often that the initial report of
            // the first address may be purged and, since it was the
            // only report, the address removed.
            for _ in 0 ..= addresses.limit.get() {
                addresses.add(other.addr.clone(), other.score);
            }

            let exists = addresses.iter().any(|a| a.addr == first.addr);

            // Only finite scores push out other finite scores.
            let both_finite = matches!(
                (first.score, other.score),
                (AddressScore::Finite(_), AddressScore::Finite(_))
            );
            if both_finite {
                assert!(!exists)
            } else {
                assert!(exists)
            }

            TestResult::passed()
        }

        quickcheck(prop as fn(_,_) -> _);
    }

    /// The number of finitely scored addresses never exceeds the limit.
    #[test]
    fn finitely_scored_address_limit() {
        fn prop(reports: Vec<AddressRecord>, limit: NonZeroU8) {
            let mut addresses = Addresses::new(limit.into());

            // Add all reports.
            for report in reports {
                addresses.add(report.addr, report.score);
            }

            // Count the finitely scored addresses.
            let num_finite = addresses
                .iter()
                .filter(|r| matches!(r.score, AddressScore::Finite(_)))
                .count();

            // Check against the limit.
            assert!(num_finite <= usize::from(limit.get()));
        }

        quickcheck(prop as fn(_,_));
    }

    /// While no report expires, the score of an address is the sum of the
    /// scores of all of its reports.
    #[test]
    fn record_score_sum() {
        fn prop(records: Vec<AddressRecord>) -> bool {
            // Make sure the address collection can hold all reports.
            let capacity = NonZeroUsize::new(records.len().max(1)).unwrap();
            let mut addresses = Addresses::new(capacity);

            // Add all address reports to the collection.
            for rec in &records {
                addresses.add(rec.addr.clone(), rec.score);
            }

            // Check that each address in the registry has the expected score.
            addresses.registry.iter().all(|held| {
                let expected_score = records
                    .iter()
                    .filter(|rec| rec.addr == held.addr)
                    .map(|rec| rec.score)
                    .fold(None::<AddressScore>, |sum, score| {
                        Some(sum.map_or(score, |acc| acc + score))
                    });

                Some(held.score) == expected_score
            })
        }

        quickcheck(prop as fn(_) -> _)
    }
}