cs_mwc_libp2p_gossipsub/
time_cache.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! This implements a time-based (TTL) cache for checking gossipsub message duplicates. Entries
//! expire a fixed duration after insertion; they are not evicted by least-recent use.
22
23use fnv::FnvHashMap;
24use std::collections::hash_map::{
25    self,
26    Entry::{Occupied, Vacant},
27};
28use std::collections::VecDeque;
29use std::time::{Duration, Instant};
30
/// Pairs an item with the instant at which it stops being valid.
///
/// The cache uses this wrapper both for the values stored in its map and for the keys queued in
/// its expiry list.
struct ExpiringElement<Element> {
    /// The wrapped item.
    element: Element,
    /// Instant from which the item counts as expired.
    expires: Instant,
}
37
38pub struct TimeCache<Key, Value> {
39    /// Mapping a key to its value together with its latest expire time (can be updated through
40    /// reinserts).
41    map: FnvHashMap<Key, ExpiringElement<Value>>,
42    /// An ordered list of keys by expires time.
43    list: VecDeque<ExpiringElement<Key>>,
44    /// The time elements remain in the cache.
45    ttl: Duration,
46}
47
48pub struct OccupiedEntry<'a, K, V> {
49    expiration: Instant,
50    entry: hash_map::OccupiedEntry<'a, K, ExpiringElement<V>>,
51    list: &'a mut VecDeque<ExpiringElement<K>>,
52}
53
54impl<'a, K, V> OccupiedEntry<'a, K, V>
55where
56    K: Eq + std::hash::Hash + Clone,
57{
58    pub fn into_mut(self) -> &'a mut V {
59        &mut self.entry.into_mut().element
60    }
61
62    pub fn insert_without_updating_expiration(&mut self, value: V) -> V {
63        //keep old expiration, only replace value of element
64        ::std::mem::replace(&mut self.entry.get_mut().element, value)
65    }
66
67    pub fn insert_and_update_expiration(&mut self, value: V) -> V {
68        //We push back an additional element, the first reference in the list will be ignored
69        // since we also updated the expires in the map, see below.
70        self.list.push_back(ExpiringElement {
71            element: self.entry.key().clone(),
72            expires: self.expiration,
73        });
74        self.entry
75            .insert(ExpiringElement {
76                element: value,
77                expires: self.expiration,
78            })
79            .element
80    }
81}
82
83pub struct VacantEntry<'a, K, V> {
84    expiration: Instant,
85    entry: hash_map::VacantEntry<'a, K, ExpiringElement<V>>,
86    list: &'a mut VecDeque<ExpiringElement<K>>,
87}
88
89impl<'a, K, V> VacantEntry<'a, K, V>
90where
91    K: Eq + std::hash::Hash + Clone,
92{
93    pub fn insert(self, value: V) -> &'a mut V {
94        self.list.push_back(ExpiringElement {
95            element: self.entry.key().clone(),
96            expires: self.expiration,
97        });
98        &mut self
99            .entry
100            .insert(ExpiringElement {
101                element: value,
102                expires: self.expiration,
103            })
104            .element
105    }
106}
107
108pub enum Entry<'a, K: 'a, V: 'a> {
109    Occupied(OccupiedEntry<'a, K, V>),
110    Vacant(VacantEntry<'a, K, V>),
111}
112
113impl<'a, K: 'a, V: 'a> Entry<'a, K, V>
114where
115    K: Eq + std::hash::Hash + Clone,
116{
117    pub fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> &'a mut V {
118        match self {
119            Entry::Occupied(entry) => entry.into_mut(),
120            Entry::Vacant(entry) => entry.insert(default()),
121        }
122    }
123}
124
125impl<Key, Value> TimeCache<Key, Value>
126where
127    Key: Eq + std::hash::Hash + Clone,
128{
129    pub fn new(ttl: Duration) -> Self {
130        TimeCache {
131            map: FnvHashMap::default(),
132            list: VecDeque::new(),
133            ttl,
134        }
135    }
136
137    fn remove_expired_keys(&mut self, now: Instant) {
138        while let Some(element) = self.list.pop_front() {
139            if element.expires > now {
140                self.list.push_front(element);
141                break;
142            }
143            if let Occupied(entry) = self.map.entry(element.element.clone()) {
144                if entry.get().expires <= now {
145                    entry.remove();
146                }
147            }
148        }
149    }
150
151    pub fn entry(&mut self, key: Key) -> Entry<Key, Value> {
152        let now = Instant::now();
153        self.remove_expired_keys(now);
154        match self.map.entry(key) {
155            Occupied(entry) => Entry::Occupied(OccupiedEntry {
156                expiration: now + self.ttl,
157                entry,
158                list: &mut self.list,
159            }),
160            Vacant(entry) => Entry::Vacant(VacantEntry {
161                expiration: now + self.ttl,
162                entry,
163                list: &mut self.list,
164            }),
165        }
166    }
167
168    /// Empties the entire cache.
169    pub fn clear(&mut self) {
170        self.map.clear();
171        self.list.clear();
172    }
173
174    pub fn contains_key(&mut self, key: &Key) -> bool {
175        self.map.contains_key(key)
176    }
177
178    pub fn get(&self, key: &Key) -> Option<&Value> {
179        self.map.get(key).map(|e| &e.element)
180    }
181}
182
183pub struct DuplicateCache<Key>(TimeCache<Key, ()>);
184
185impl<Key> DuplicateCache<Key>
186where
187    Key: Eq + std::hash::Hash + Clone,
188{
189    pub fn new(ttl: Duration) -> Self {
190        Self(TimeCache::new(ttl))
191    }
192
193    // Inserts new elements and removes any expired elements.
194    //
195    // If the key was not present this returns `true`. If the value was already present this
196    // returns `false`.
197    pub fn insert(&mut self, key: Key) -> bool {
198        if let Entry::Vacant(entry) = self.0.entry(key) {
199            entry.insert(());
200            true
201        } else {
202            false
203        }
204    }
205
206    pub fn contains(&mut self, key: &Key) -> bool {
207        self.0.contains_key(key)
208    }
209}
210
#[cfg(test)]
mod test {
    use super::*;

    /// Keys re-inserted within the time-to-live are reported as already present.
    #[test]
    fn cache_added_entries_exist() {
        let mut cache = DuplicateCache::new(Duration::from_secs(10));

        // First sightings are reported as new.
        assert!(cache.insert("t"));
        assert!(cache.insert("e"));

        // Should report that 't' and 'e' already exist.
        assert!(!cache.insert("t"));
        assert!(!cache.insert("e"));
    }

    /// Keys are forgotten once their time-to-live has elapsed.
    #[test]
    fn cache_entries_expire() {
        let mut cache = DuplicateCache::new(Duration::from_millis(100));

        assert!(cache.insert("t"));
        assert!(!cache.insert("t"));
        assert!(cache.insert("e"));
        assert!(!cache.insert("e"));
        // Sleep until the cached entries have outlived the time-to-live.
        std::thread::sleep(Duration::from_millis(101));
        // Inserting purges expired entries before the key is looked up.
        assert!(cache.insert("s"));

        // "t" has expired and been purged, so it counts as new again.
        assert!(cache.insert("t"));
    }
}