Skip to main content

commonware_storage/index/
unordered.rs

1//! A memory-efficient index that uses an unordered map internally to map translated keys to
2//! arbitrary values. If you require ordering over the map's keys, consider
3//! [crate::index::ordered::Index] instead.
4
5use crate::{
6    index::{
7        storage::{Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
8        Cursor as CursorTrait, Unordered,
9    },
10    translator::Translator,
11};
12use commonware_runtime::Metrics;
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{
15    hash_map::{Entry, OccupiedEntry, VacantEntry},
16    HashMap,
17};
18
19/// The initial capacity of the internal hashmap. This is a guess at the number of unique keys we
20/// will encounter. The hashmap will grow as needed, but this is a good starting point (covering the
21/// entire [crate::translator::OneCap] range).
22const INITIAL_CAPACITY: usize = 256;
23
24/// Implementation of [IndexEntry] for [OccupiedEntry].
25impl<K: Send + Sync, V: Eq + Send + Sync> IndexEntry<V> for OccupiedEntry<'_, K, Record<V>> {
26    fn get(&self) -> &V {
27        &self.get().value
28    }
29    fn get_mut(&mut self) -> &mut Record<V> {
30        self.get_mut()
31    }
32    fn remove(self) {
33        OccupiedEntry::remove(self);
34    }
35}
36
37/// A cursor for the unordered [Index] that wraps the shared implementation.
38pub struct Cursor<'a, K: Send + Sync, V: Eq + Send + Sync> {
39    inner: CursorImpl<'a, V, OccupiedEntry<'a, K, Record<V>>>,
40}
41
42impl<'a, K: Send + Sync, V: Eq + Send + Sync> Cursor<'a, K, V> {
43    const fn new(
44        entry: OccupiedEntry<'a, K, Record<V>>,
45        keys: &'a Gauge,
46        items: &'a Gauge,
47        pruned: &'a Counter,
48    ) -> Self {
49        Self {
50            inner: CursorImpl::<'a, V, OccupiedEntry<'a, K, Record<V>>>::new(
51                entry, keys, items, pruned,
52            ),
53        }
54    }
55}
56
57impl<K: Send + Sync, V: Eq + Send + Sync> CursorTrait for Cursor<'_, K, V> {
58    type Value = V;
59
60    fn next(&mut self) -> Option<&V> {
61        self.inner.next()
62    }
63
64    fn insert(&mut self, value: V) {
65        self.inner.insert(value)
66    }
67
68    fn delete(&mut self) {
69        self.inner.delete()
70    }
71
72    fn update(&mut self, value: V) {
73        self.inner.update(value)
74    }
75
76    fn prune(&mut self, predicate: &impl Fn(&V) -> bool) {
77        self.inner.prune(predicate)
78    }
79}
80
81/// A memory-efficient index that uses an unordered map internally to map translated keys to
82/// arbitrary values.
83pub struct Index<T: Translator, V: Eq + Send + Sync> {
84    translator: T,
85    map: HashMap<T::Key, Record<V>, T>,
86
87    keys: Gauge,
88    items: Gauge,
89    pruned: Counter,
90}
91
92impl<T: Translator, V: Eq + Send + Sync> Index<T, V> {
93    /// Create a new entry in the index.
94    fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<'_, T::Key, Record<V>>, v: V) {
95        keys.inc();
96        items.inc();
97        vacant.insert(Record {
98            value: v,
99            next: None,
100        });
101    }
102
103    /// Create a new index with the given translator and metrics registry.
104    pub fn new(ctx: impl Metrics, translator: T) -> Self {
105        let s = Self {
106            translator: translator.clone(),
107            map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, translator),
108            keys: Gauge::default(),
109            items: Gauge::default(),
110            pruned: Counter::default(),
111        };
112        ctx.register(
113            "keys",
114            "Number of translated keys in the index",
115            s.keys.clone(),
116        );
117        ctx.register("items", "Number of items in the index", s.items.clone());
118        ctx.register("pruned", "Number of items pruned", s.pruned.clone());
119        s
120    }
121}
122
123impl<T: Translator, V: Eq + Send + Sync> super::Factory<T> for Index<T, V> {
124    fn new(ctx: impl commonware_runtime::Metrics, translator: T) -> Self {
125        Self::new(ctx, translator)
126    }
127}
128
129impl<T: Translator, V: Eq + Send + Sync> Unordered for Index<T, V> {
130    type Value = V;
131    type Cursor<'a>
132        = Cursor<'a, T::Key, V>
133    where
134        Self: 'a;
135
136    fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
137    where
138        V: 'a,
139    {
140        let k = self.translator.transform(key);
141        self.map
142            .get(&k)
143            .map(|record| ImmutableCursor::new(record))
144            .into_iter()
145            .flatten()
146    }
147
148    fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
149        let k = self.translator.transform(key);
150        match self.map.entry(k) {
151            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
152                entry,
153                &self.keys,
154                &self.items,
155                &self.pruned,
156            )),
157            Entry::Vacant(_) => None,
158        }
159    }
160
161    fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
162        let k = self.translator.transform(key);
163        match self.map.entry(k) {
164            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
165                entry,
166                &self.keys,
167                &self.items,
168                &self.pruned,
169            )),
170            Entry::Vacant(entry) => {
171                Self::create(&self.keys, &self.items, entry, value);
172                None
173            }
174        }
175    }
176
177    fn insert(&mut self, key: &[u8], v: V) {
178        let k = self.translator.transform(key);
179        match self.map.entry(k) {
180            Entry::Occupied(entry) => {
181                let mut cursor =
182                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
183                cursor.next();
184                cursor.insert(v);
185            }
186            Entry::Vacant(entry) => {
187                Self::create(&self.keys, &self.items, entry, v);
188            }
189        }
190    }
191
192    fn insert_and_prune(&mut self, key: &[u8], value: V, predicate: impl Fn(&V) -> bool) {
193        let k = self.translator.transform(key);
194        match self.map.entry(k) {
195            Entry::Occupied(entry) => {
196                // Get entry
197                let mut cursor =
198                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
199
200                cursor.prune(&predicate);
201
202                // Add our new value (if not prunable).
203                if !predicate(&value) {
204                    cursor.insert(value);
205                }
206            }
207            Entry::Vacant(entry) => {
208                Self::create(&self.keys, &self.items, entry, value);
209            }
210        }
211    }
212
213    fn prune(&mut self, key: &[u8], predicate: impl Fn(&V) -> bool) {
214        let k = self.translator.transform(key);
215        match self.map.entry(k) {
216            Entry::Occupied(entry) => {
217                // Get cursor
218                let mut cursor =
219                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
220
221                cursor.prune(&predicate);
222            }
223            Entry::Vacant(_) => {}
224        }
225    }
226
227    fn remove(&mut self, key: &[u8]) {
228        // To ensure metrics are accurate, we iterate over all conflicting values and remove them
229        // one-by-one (rather than just removing the entire entry).
230        self.prune(key, |_| true);
231    }
232
233    #[cfg(test)]
234    fn keys(&self) -> usize {
235        self.map.len()
236    }
237
238    #[cfg(test)]
239    fn items(&self) -> usize {
240        self.items.get() as usize
241    }
242
243    #[cfg(test)]
244    fn pruned(&self) -> usize {
245        self.pruned.get() as usize
246    }
247}
248
249impl<T: Translator, V: Eq + Send + Sync> Drop for Index<T, V> {
250    /// To avoid stack overflow on keys with many collisions, we implement an iterative drop (in
251    /// lieu of Rust's default recursive drop).
252    fn drop(&mut self) {
253        for (_, mut record) in self.map.drain() {
254            let mut next = record.next.take();
255            while let Some(mut record) = next {
256                next = record.next.take();
257            }
258        }
259    }
260}