libp2p_pubsub_common/ttl_cache/
cache.rs

1use std::hash::Hash;
2use std::time::Duration;
3
4use hashlink::linked_hash_map::{LinkedHashMap, RawEntryMut};
5use instant::Instant;
6
/// A single cached message together with the instant it was stored.
struct CacheEntry<T> {
    /// Moment at which the message was recorded in the cache.
    pub timestamp: Instant,
    /// The cached message itself.
    pub message: T,
}
13
/// Cache of messages that we have already seen.
///
/// This is used to avoid sending the same message multiple times.
///
/// Messages of type `V` are stored under an identifier of type `K`. Every entry records the
/// instant at which it was stored; lookups treat an entry as absent once more than `ttl` has
/// elapsed since that instant, even if it is still physically present in the map.
pub struct Cache<K, V> {
    /// Maximum number of messages in the cache.
    ///
    /// Enforced on insertion: when an insertion leaves the map with more than `capacity`
    /// entries, the entry at the front of the map (the oldest one) is dropped.
    capacity: usize,

    /// Time-to-live of messages in the cache.
    ///
    /// An entry is considered expired once strictly more than this duration has elapsed since
    /// its timestamp was set.
    ttl: Duration,

    /// The internal cache data structure.
    ///
    /// A `LinkedHashMap` is used to keep track of the insertion order of the messages. The
    /// oldest insertions are at the front of the map, and the newest insertions are at the back of
    /// the map.
    cache: LinkedHashMap<K, CacheEntry<V>>,
}
31
32impl<K, V> Default for Cache<K, V> {
33    /// Creates a new empty cache.
34    ///
35    /// Capacity defaults to 1024 messages and a time-to-live to 5 seconds.
36    fn default() -> Self {
37        Self::with_capacity_and_ttl(1024, Duration::from_secs(5))
38    }
39}
40
41impl<K, V> Cache<K, V> {
42    /// Creates a new empty cache with the given capacity and time-to-live.
43    #[must_use]
44    pub fn with_capacity_and_ttl(capacity: usize, ttl: Duration) -> Self {
45        Self {
46            capacity,
47            ttl,
48            cache: LinkedHashMap::with_capacity(capacity),
49        }
50    }
51}
52
53impl<K, V> Cache<K, V>
54where
55    K: Eq + Hash + Clone,
56{
57    /// Inserts a message in the cache.
58    ///
59    /// Returns `true` if the message was not already in the cache. Returns `false` if the message
60    /// was already in the cache.
61    ///
62    /// If the source is `None`, then the message is assumed to have been sent by us.
63    pub fn put(&mut self, id: K, message: V) -> bool {
64        let result = match self.cache.raw_entry_mut().from_key(&id) {
65            RawEntryMut::Occupied(mut entry) => {
66                // If the entry has expired but it is still present, update the timestamp
67                // and pretend that the entry was not already in the cache.
68                let was_expired = entry.get().timestamp.elapsed() > self.ttl;
69
70                // Update the insertion time of the entry and push it to the back of the map.
71                entry.get_mut().timestamp = Instant::now();
72                entry.to_back();
73
74                // If entry was expired but it has been refreshed, return `true`.
75                was_expired
76            }
77            RawEntryMut::Vacant(entry) => {
78                let timestamp = Instant::now();
79                entry.insert(id, CacheEntry { timestamp, message });
80
81                true
82            }
83        };
84
85        // If the cache is full, remove the oldest message.
86        if self.cache.len() > self.capacity {
87            self.cache.pop_front();
88        }
89
90        result
91    }
92
93    /// Returns an iterator over all the entries of the cache (expired and not-expired).
94    #[cfg(test)]
95    pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
96        self.cache.iter().map(|(id, entry)| (id, &entry.message))
97    }
98
99    /// Returns the number of non-expired messages in the cache.
100    #[must_use]
101    pub fn len(&self) -> usize {
102        self.cache
103            .iter()
104            .skip_while(|(_, entry)| entry.timestamp.elapsed() > self.ttl)
105            .count()
106    }
107
108    /// Returns if the cache is empty.
109    #[must_use]
110    pub fn is_empty(&self) -> bool {
111        self.len() == 0
112    }
113
114    /// Returns `true` if the cache contains a non-expired message with the given ID.
115    #[must_use]
116    pub fn contains_key(&self, id: &K) -> bool {
117        self.cache
118            .get(id)
119            .map(|entry| entry.timestamp.elapsed() <= self.ttl)
120            .unwrap_or(false)
121    }
122
123    /// Returns a reference to the message with the given ID, if it exists in the cache and has not
124    /// expired.
125    #[must_use]
126    pub fn get(&self, id: &K) -> Option<&V> {
127        self.cache
128            .get(id)
129            .filter(|entry| entry.timestamp.elapsed() <= self.ttl)
130            .map(|entry| &entry.message)
131    }
132
133    /// Removes the message with the given ID from the cache.
134    ///
135    /// Returns the removed cache entry, if it existed in the cache and had not expired.
136    pub fn remove(&mut self, id: &K) -> Option<V> {
137        self.cache
138            .remove(id)
139            .filter(|entry| entry.timestamp.elapsed() <= self.ttl)
140            .map(|entry| entry.message)
141    }
142
143    /// Remove all expired messages from the cache.
144    ///
145    /// An entry is considered expired if the elapsed time since the insertion of the entry is
146    /// greater than the time-to-live of the cache, then the entry is considered expired.
147    pub fn clear_expired_entries(&mut self) {
148        let mut to_remove = Vec::new();
149
150        for (id, entry) in self.cache.iter() {
151            if entry.timestamp.elapsed() <= self.ttl {
152                break;
153            }
154
155            to_remove.push(id.clone());
156        }
157
158        for id in to_remove {
159            self.cache.remove(&id);
160        }
161    }
162}