libp2p_pubsub_common/ttl_cache/cache.rs
1use std::hash::Hash;
2use std::time::Duration;
3
4use hashlink::linked_hash_map::{LinkedHashMap, RawEntryMut};
5use instant::Instant;
6
7struct CacheEntry<M> {
8 /// The timestamp at which the message was received.
9 pub timestamp: Instant,
10 /// The message.
11 pub message: M,
12}
13
14/// Cache of messages that we have already seen.
15///
16/// This is used to avoid sending the same message multiple times.
17pub struct Cache<K, V> {
18 /// Maximum number of messages in the cache.
19 capacity: usize,
20
21 /// Time-to-live of messages in the cache.
22 ttl: Duration,
23
24 /// The internal cache data structure.
25 ///
26 /// A `LinkedHashMap` is used to keep track of the insertion order of the messages. The
27 /// oldest insertions are at the front of the map, and the newest insertions are at the back of
28 /// the map.
29 cache: LinkedHashMap<K, CacheEntry<V>>,
30}
31
32impl<K, V> Default for Cache<K, V> {
33 /// Creates a new empty cache.
34 ///
35 /// Capacity defaults to 1024 messages and a time-to-live to 5 seconds.
36 fn default() -> Self {
37 Self::with_capacity_and_ttl(1024, Duration::from_secs(5))
38 }
39}
40
41impl<K, V> Cache<K, V> {
42 /// Creates a new empty cache with the given capacity and time-to-live.
43 #[must_use]
44 pub fn with_capacity_and_ttl(capacity: usize, ttl: Duration) -> Self {
45 Self {
46 capacity,
47 ttl,
48 cache: LinkedHashMap::with_capacity(capacity),
49 }
50 }
51}
52
53impl<K, V> Cache<K, V>
54where
55 K: Eq + Hash + Clone,
56{
57 /// Inserts a message in the cache.
58 ///
59 /// Returns `true` if the message was not already in the cache. Returns `false` if the message
60 /// was already in the cache.
61 ///
62 /// If the source is `None`, then the message is assumed to have been sent by us.
63 pub fn put(&mut self, id: K, message: V) -> bool {
64 let result = match self.cache.raw_entry_mut().from_key(&id) {
65 RawEntryMut::Occupied(mut entry) => {
66 // If the entry has expired but it is still present, update the timestamp
67 // and pretend that the entry was not already in the cache.
68 let was_expired = entry.get().timestamp.elapsed() > self.ttl;
69
70 // Update the insertion time of the entry and push it to the back of the map.
71 entry.get_mut().timestamp = Instant::now();
72 entry.to_back();
73
74 // If entry was expired but it has been refreshed, return `true`.
75 was_expired
76 }
77 RawEntryMut::Vacant(entry) => {
78 let timestamp = Instant::now();
79 entry.insert(id, CacheEntry { timestamp, message });
80
81 true
82 }
83 };
84
85 // If the cache is full, remove the oldest message.
86 if self.cache.len() > self.capacity {
87 self.cache.pop_front();
88 }
89
90 result
91 }
92
93 /// Returns an iterator over all the entries of the cache (expired and not-expired).
94 #[cfg(test)]
95 pub fn iter(&self) -> impl Iterator<Item = (&K, &V)> {
96 self.cache.iter().map(|(id, entry)| (id, &entry.message))
97 }
98
99 /// Returns the number of non-expired messages in the cache.
100 #[must_use]
101 pub fn len(&self) -> usize {
102 self.cache
103 .iter()
104 .skip_while(|(_, entry)| entry.timestamp.elapsed() > self.ttl)
105 .count()
106 }
107
108 /// Returns if the cache is empty.
109 #[must_use]
110 pub fn is_empty(&self) -> bool {
111 self.len() == 0
112 }
113
114 /// Returns `true` if the cache contains a non-expired message with the given ID.
115 #[must_use]
116 pub fn contains_key(&self, id: &K) -> bool {
117 self.cache
118 .get(id)
119 .map(|entry| entry.timestamp.elapsed() <= self.ttl)
120 .unwrap_or(false)
121 }
122
123 /// Returns a reference to the message with the given ID, if it exists in the cache and has not
124 /// expired.
125 #[must_use]
126 pub fn get(&self, id: &K) -> Option<&V> {
127 self.cache
128 .get(id)
129 .filter(|entry| entry.timestamp.elapsed() <= self.ttl)
130 .map(|entry| &entry.message)
131 }
132
133 /// Removes the message with the given ID from the cache.
134 ///
135 /// Returns the removed cache entry, if it existed in the cache and had not expired.
136 pub fn remove(&mut self, id: &K) -> Option<V> {
137 self.cache
138 .remove(id)
139 .filter(|entry| entry.timestamp.elapsed() <= self.ttl)
140 .map(|entry| entry.message)
141 }
142
143 /// Remove all expired messages from the cache.
144 ///
145 /// An entry is considered expired if the elapsed time since the insertion of the entry is
146 /// greater than the time-to-live of the cache, then the entry is considered expired.
147 pub fn clear_expired_entries(&mut self) {
148 let mut to_remove = Vec::new();
149
150 for (id, entry) in self.cache.iter() {
151 if entry.timestamp.elapsed() <= self.ttl {
152 break;
153 }
154
155 to_remove.push(id.clone());
156 }
157
158 for id in to_remove {
159 self.cache.remove(&id);
160 }
161 }
162}