libdd_telemetry/worker/
store.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::VecDeque, hash::Hash};
5
6mod queuehashmap {
7    use hashbrown::{hash_table::HashTable, DefaultHashBuilder};
8    use std::{
9        collections::VecDeque,
10        hash::{BuildHasher, Hash},
11    };
12
13    #[derive(Debug)]
14    pub struct QueueHashMap<K, V> {
15        table: HashTable<usize>,
16        hash_builder: DefaultHashBuilder,
17        items: VecDeque<(K, V)>,
18        popped: usize,
19    }
20
21    impl<K, V> QueueHashMap<K, V>
22    where
23        K: PartialEq + Eq + Hash,
24    {
25        pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
26            self.items.iter()
27        }
28
29        pub fn iter_idx(&self) -> impl Iterator<Item = usize> {
30            self.popped..(self.popped + self.items.len())
31        }
32
33        pub fn len(&self) -> usize {
34            self.items.len()
35        }
36
37        pub fn is_empty(&self) -> bool {
38            self.items.is_empty()
39        }
40
41        // Remove the oldest item in the queue and return it
42        pub fn pop_front(&mut self) -> Option<(K, V)> {
43            let (k, v) = self.items.pop_front()?;
44            let hash = make_hash(&self.hash_builder, &k);
45            if let Ok(entry) = self.table.find_entry(hash, |&other| other == self.popped) {
46                entry.remove();
47            }
48            debug_assert!(self.items.len() == self.table.len());
49            self.popped += 1;
50            Some((k, v))
51        }
52
53        pub fn get(&self, k: &K) -> Option<&V> {
54            let hash = make_hash(&self.hash_builder, k);
55            let idx = self
56                .table
57                .find(hash, |other| &self.items[other - self.popped].0 == k)?;
58            Some(&self.items[*idx - self.popped].1)
59        }
60
61        pub fn get_idx(&self, idx: usize) -> Option<&(K, V)> {
62            self.items.get(idx - self.popped)
63        }
64
65        pub fn get_mut_or_insert(&mut self, key: K, default: V) -> (&mut V, bool) {
66            let hash = make_hash(&self.hash_builder, &key);
67            if let Some(idx) = self
68                .table
69                .find(hash, |other| self.items[other - self.popped].0 == key)
70            {
71                return (&mut self.items[*idx - self.popped].1, false);
72            }
73            self.insert_nocheck(hash, key, default);
74
75            #[allow(clippy::unwrap_used)]
76            (&mut self.items.back_mut().unwrap().1, true)
77        }
78
79        // Insert a new item at the back if the queue if it doesn't yet exist.
80        //
81        // If the key already exists, replace the previous value
82        pub fn insert(&mut self, key: K, value: V) -> (usize, bool) {
83            let hash = make_hash(&self.hash_builder, &key);
84            if let Some(idx) = self
85                .table
86                .find(hash, |other| self.items[other - self.popped].0 == key)
87            {
88                self.items[*idx - self.popped].1 = value;
89                (*idx, false)
90            } else {
91                (self.insert_nocheck(hash, key, value), true)
92            }
93        }
94
95        /// # Safety
96        ///
97        /// This function inserts a new item in the store unconditionally
98        /// If the item already exists, it's drop implementation will not be called, and memory
99        /// might leak
100        ///
101        /// The hash needs to be precomputed too
102        fn insert_nocheck(&mut self, hash: u64, key: K, value: V) -> usize {
103            let item_index = self.items.len() + self.popped;
104
105            // Separate set and items since set is mutably borrowed, while items is immutable
106            let Self {
107                table,
108                items,
109                popped,
110                hash_builder,
111                ..
112            } = self;
113            table.insert_unique(hash, item_index, |i| {
114                make_hash(hash_builder, &items[i - *popped].0)
115            });
116            self.items.push_back((key, value));
117            item_index
118        }
119    }
120
121    impl<K, V> Default for QueueHashMap<K, V> {
122        fn default() -> Self {
123            Self {
124                table: HashTable::new(),
125                hash_builder: DefaultHashBuilder::default(),
126                items: VecDeque::new(),
127                popped: 0,
128            }
129        }
130    }
131
132    fn make_hash<T: Hash>(h: &DefaultHashBuilder, i: &T) -> u64 {
133        h.hash_one(i)
134    }
135}
136
137pub use queuehashmap::QueueHashMap;
138
139#[derive(Debug, Default)]
140/// Stores telemetry data item, like dependencies and integrations
141///
142/// * Bounds the length of the collection it uses to prevent memory leaks
143/// * Tries to keep a list of items that it has seen (within max number of items)
144/// * Tries to keep a list of items that haven't been sent to datadog yet
145/// * Deduplicates items, to make sure we don't send the item twice
146pub struct Store<T> {
147    // unflushed and set contain indices into
148    unflushed: VecDeque<usize>,
149    items: QueueHashMap<T, ()>,
150    max_items: usize,
151}
152
153impl<T> Store<T>
154where
155    T: PartialEq + Eq + Hash,
156{
157    pub fn new(max_items: usize) -> Self {
158        Self {
159            unflushed: VecDeque::new(),
160            items: QueueHashMap::default(),
161            max_items,
162        }
163    }
164
165    pub fn insert(&mut self, item: T) {
166        if self.items.get(&item).is_some() {
167            return;
168        }
169        if self.items.len() == self.max_items {
170            self.items.pop_front();
171        }
172        let (idx, _) = self.items.insert(item, ());
173        if self.unflushed.len() == self.max_items {
174            self.unflushed.pop_front();
175        }
176        self.unflushed.push_back(idx);
177    }
178
179    // Reinsert all already flushed items in the flush queue
180    pub fn unflush_stored(&mut self) {
181        self.unflushed.clear();
182        for i in self.items.iter_idx() {
183            self.unflushed.push_back(i);
184        }
185    }
186
187    // Remove the first `count` items in the queue
188    pub fn removed_flushed(&mut self, count: usize) {
189        for _ in 0..count {
190            self.unflushed.pop_front();
191        }
192    }
193
194    pub fn flush_not_empty(&self) -> bool {
195        !self.unflushed.is_empty()
196    }
197
198    pub fn unflushed(&self) -> impl Iterator<Item = &T> {
199        self.unflushed
200            .iter()
201            .flat_map(|i| Some(&self.items.get_idx(*i)?.0))
202    }
203
204    pub fn len_unflushed(&self) -> usize {
205        self.unflushed.len()
206    }
207
208    pub fn len_stored(&self) -> usize {
209        self.items.len()
210    }
211}
212
213impl<T> Extend<T> for Store<T>
214where
215    T: PartialEq + Eq + Hash,
216{
217    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
218        for i in iter {
219            self.insert(i)
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    // Duplicates are ignored, and flushing shrinks the pending queue without forgetting items
    #[test]
    fn test_smoke_insert() {
        let mut store = Store::new(10);
        for word in vec!["hello", "world", "world"] {
            store.insert(word);
        }

        assert_eq!(2, store.unflushed.len());
        assert_eq!(2, store.items.len());
        let pending: Vec<_> = store.unflushed().collect();
        assert_eq!(pending, vec![&"hello", &"world"]);

        store.removed_flushed(1);
        assert_eq!(2, store.items.len());
        let pending: Vec<_> = store.unflushed().collect();
        assert_eq!(pending, vec![&"world"]);

        store.removed_flushed(1);
        assert_eq!(2, store.items.len());
        assert_eq!(0, store.unflushed().count());

        // An item that is still remembered is not queued a second time
        store.insert("hello");
        assert_eq!(0, store.unflushed().count());
    }

    // Inserting past the limit evicts the oldest items from both collections
    #[test]
    fn test_insert_spill() {
        let mut store = Store::new(5);
        (2..15).for_each(|value| store.insert(value));

        assert_eq!(5, store.unflushed.len());
        assert_eq!(5, store.items.len());

        let pending: Vec<_> = store.unflushed().collect();
        assert_eq!(pending, vec![&10, &11, &12, &13, &14]);
    }

    // Items evicted from the store after having been flushed leave the pending queue intact
    #[test]
    fn test_insert_spill_no_unflush() {
        let mut store = Store::new(5);
        (2..7).for_each(|value| store.insert(value));
        assert_eq!(5, store.unflushed.len());

        let pending: Vec<_> = store.unflushed().collect();
        assert_eq!(pending, vec![&2, &3, &4, &5, &6]);
        store.removed_flushed(4);

        (7..10).for_each(|value| store.insert(value));

        assert_eq!(4, store.unflushed.len());
        let pending: Vec<_> = store.unflushed().collect();
        assert_eq!(pending, vec![&6, &7, &8, &9]);
    }

    // Re-queuing the stored items yields the same pending list when nothing was flushed
    #[test]
    fn test_unflush_stored() {
        let mut store = Store::new(5);
        (2..7).for_each(|value| store.insert(value));
        assert_eq!(5, store.unflushed.len());

        let expected = vec![&2, &3, &4, &5, &6];
        let before: Vec<_> = store.unflushed().collect();
        assert_eq!(before, expected);

        store.unflush_stored();
        let after: Vec<_> = store.unflushed().collect();
        assert_eq!(after, expected);
    }
}
297}