commonware_storage/index/
unordered.rs

1//! A memory-efficient index that uses an unordered map internally to map translated keys to
2//! arbitrary values. If you require ordering over the map's keys, consider
3//! [crate::index::ordered::Index] instead.
4
5use crate::{
6    index::{
7        storage::{Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
8        Cursor as CursorTrait, Unordered,
9    },
10    translator::Translator,
11};
12use commonware_runtime::Metrics;
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{
15    hash_map::{Entry, OccupiedEntry, VacantEntry},
16    HashMap,
17};
18
/// Number of slots the internal hashmap is created with.
///
/// This is only an estimate of how many distinct translated keys will show up; the map grows on
/// demand if the estimate is too low. The chosen value (256, i.e. one slot for every value a
/// single byte can take) is enough to hold the entire [crate::translator::OneCap] range.
const INITIAL_CAPACITY: usize = u8::MAX as usize + 1;
23
24/// Implementation of [IndexEntry] for [OccupiedEntry].
25impl<K: Send + Sync, V: Eq + Send + Sync> IndexEntry<V> for OccupiedEntry<'_, K, Record<V>> {
26    fn get(&self) -> &V {
27        &self.get().value
28    }
29    fn get_mut(&mut self) -> &mut Record<V> {
30        self.get_mut()
31    }
32    fn remove(self) {
33        OccupiedEntry::remove(self);
34    }
35}
36
37/// A cursor for the unordered [Index] that wraps the shared implementation.
38pub struct Cursor<'a, K: Send + Sync, V: Eq + Send + Sync> {
39    inner: CursorImpl<'a, V, OccupiedEntry<'a, K, Record<V>>>,
40}
41
42impl<'a, K: Send + Sync, V: Eq + Send + Sync> Cursor<'a, K, V> {
43    const fn new(
44        entry: OccupiedEntry<'a, K, Record<V>>,
45        keys: &'a Gauge,
46        items: &'a Gauge,
47        pruned: &'a Counter,
48    ) -> Self {
49        Self {
50            inner: CursorImpl::<'a, V, OccupiedEntry<'a, K, Record<V>>>::new(
51                entry, keys, items, pruned,
52            ),
53        }
54    }
55}
56
57impl<K: Send + Sync, V: Eq + Send + Sync> CursorTrait for Cursor<'_, K, V> {
58    type Value = V;
59
60    fn next(&mut self) -> Option<&V> {
61        self.inner.next()
62    }
63
64    fn insert(&mut self, value: V) {
65        self.inner.insert(value)
66    }
67
68    fn delete(&mut self) {
69        self.inner.delete()
70    }
71
72    fn update(&mut self, value: V) {
73        self.inner.update(value)
74    }
75
76    fn prune(&mut self, predicate: &impl Fn(&V) -> bool) {
77        self.inner.prune(predicate)
78    }
79}
80
81/// A memory-efficient index that uses an unordered map internally to map translated keys to
82/// arbitrary values.
83pub struct Index<T: Translator, V: Eq + Send + Sync> {
84    translator: T,
85    map: HashMap<T::Key, Record<V>, T>,
86
87    keys: Gauge,
88    items: Gauge,
89    pruned: Counter,
90}
91
92impl<T: Translator, V: Eq + Send + Sync> Index<T, V> {
93    /// Create a new entry in the index.
94    fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<'_, T::Key, Record<V>>, v: V) {
95        keys.inc();
96        items.inc();
97        vacant.insert(Record {
98            value: v,
99            next: None,
100        });
101    }
102
103    /// Create a new index with the given translator and metrics registry.
104    pub fn new(ctx: impl Metrics, translator: T) -> Self {
105        let s = Self {
106            translator: translator.clone(),
107            map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, translator),
108            keys: Gauge::default(),
109            items: Gauge::default(),
110            pruned: Counter::default(),
111        };
112        ctx.register(
113            "keys",
114            "Number of translated keys in the index",
115            s.keys.clone(),
116        );
117        ctx.register("items", "Number of items in the index", s.items.clone());
118        ctx.register("pruned", "Number of items pruned", s.pruned.clone());
119        s
120    }
121}
122
123impl<T: Translator, V: Eq + Send + Sync> Unordered for Index<T, V> {
124    type Value = V;
125    type Cursor<'a>
126        = Cursor<'a, T::Key, V>
127    where
128        Self: 'a;
129
130    fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
131    where
132        V: 'a,
133    {
134        let k = self.translator.transform(key);
135        self.map
136            .get(&k)
137            .map(|record| ImmutableCursor::new(record))
138            .into_iter()
139            .flatten()
140    }
141
142    fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
143        let k = self.translator.transform(key);
144        match self.map.entry(k) {
145            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
146                entry,
147                &self.keys,
148                &self.items,
149                &self.pruned,
150            )),
151            Entry::Vacant(_) => None,
152        }
153    }
154
155    fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
156        let k = self.translator.transform(key);
157        match self.map.entry(k) {
158            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
159                entry,
160                &self.keys,
161                &self.items,
162                &self.pruned,
163            )),
164            Entry::Vacant(entry) => {
165                Self::create(&self.keys, &self.items, entry, value);
166                None
167            }
168        }
169    }
170
171    fn insert(&mut self, key: &[u8], v: V) {
172        let k = self.translator.transform(key);
173        match self.map.entry(k) {
174            Entry::Occupied(entry) => {
175                let mut cursor =
176                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
177                cursor.next();
178                cursor.insert(v);
179            }
180            Entry::Vacant(entry) => {
181                Self::create(&self.keys, &self.items, entry, v);
182            }
183        }
184    }
185
186    fn insert_and_prune(&mut self, key: &[u8], value: V, predicate: impl Fn(&V) -> bool) {
187        let k = self.translator.transform(key);
188        match self.map.entry(k) {
189            Entry::Occupied(entry) => {
190                // Get entry
191                let mut cursor =
192                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
193
194                cursor.prune(&predicate);
195
196                // Add our new value (if not prunable).
197                if !predicate(&value) {
198                    cursor.insert(value);
199                }
200            }
201            Entry::Vacant(entry) => {
202                Self::create(&self.keys, &self.items, entry, value);
203            }
204        }
205    }
206
207    fn prune(&mut self, key: &[u8], predicate: impl Fn(&V) -> bool) {
208        let k = self.translator.transform(key);
209        match self.map.entry(k) {
210            Entry::Occupied(entry) => {
211                // Get cursor
212                let mut cursor =
213                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
214
215                cursor.prune(&predicate);
216            }
217            Entry::Vacant(_) => {}
218        }
219    }
220
221    fn remove(&mut self, key: &[u8]) {
222        // To ensure metrics are accurate, we iterate over all conflicting values and remove them
223        // one-by-one (rather than just removing the entire entry).
224        self.prune(key, |_| true);
225    }
226
227    #[cfg(test)]
228    fn keys(&self) -> usize {
229        self.map.len()
230    }
231
232    #[cfg(test)]
233    fn items(&self) -> usize {
234        self.items.get() as usize
235    }
236
237    #[cfg(test)]
238    fn pruned(&self) -> usize {
239        self.pruned.get() as usize
240    }
241}
242
243impl<T: Translator, V: Eq + Send + Sync> Drop for Index<T, V> {
244    /// To avoid stack overflow on keys with many collisions, we implement an iterative drop (in
245    /// lieu of Rust's default recursive drop).
246    fn drop(&mut self) {
247        for (_, mut record) in self.map.drain() {
248            let mut next = record.next.take();
249            while let Some(mut record) = next {
250                next = record.next.take();
251            }
252        }
253    }
254}