Skip to main content

commonware_storage/index/
unordered.rs

//! A memory-efficient index that uses an unordered map internally to map translated keys to
//! arbitrary values. If you require ordering over the map's keys, consider
//! [crate::index::ordered::Index] instead.
4
5use crate::{
6    index::{
7        storage::{insert_front, Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
8        Cursor as CursorTrait, Unordered,
9    },
10    translator::Translator,
11};
12use commonware_runtime::{
13    telemetry::metrics::{Counter, Gauge, MetricsExt as _},
14    Metrics,
15};
16use std::collections::{
17    hash_map::{Entry, OccupiedEntry, VacantEntry},
18    HashMap,
19};
20
/// Number of slots pre-allocated in the internal hashmap.
///
/// This is only a starting guess at how many unique translated keys will be encountered; the
/// hashmap grows on demand beyond it. The value is the count of distinct single-byte values
/// (256), which covers the entire [crate::translator::OneCap] range.
const INITIAL_CAPACITY: usize = u8::MAX as usize + 1;
25
26/// Implementation of [IndexEntry] for [OccupiedEntry].
27impl<K: Send + Sync, V: Send + Sync> IndexEntry<V> for OccupiedEntry<'_, K, Record<V>> {
28    fn get_mut(&mut self) -> &mut Record<V> {
29        self.get_mut()
30    }
31    fn remove(self) {
32        OccupiedEntry::remove(self);
33    }
34}
35
36/// A [crate::index::Cursor] over the values associated with a translated key.
37pub type Cursor<'a, K, V> = CursorImpl<'a, V, OccupiedEntry<'a, K, Record<V>>>;
38
39/// A memory-efficient index that uses an unordered map internally to map translated keys to
40/// arbitrary values.
41pub struct Index<T: Translator, V: Send + Sync> {
42    translator: T,
43    map: HashMap<T::Key, Record<V>, T>,
44
45    keys: Gauge,
46    items: Gauge,
47    pruned: Counter,
48}
49
50impl<T: Translator, V: Send + Sync> Index<T, V> {
51    /// Create a new entry in the index.
52    fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<'_, T::Key, Record<V>>, v: V) {
53        keys.inc();
54        items.inc();
55        vacant.insert(Record {
56            value: v,
57            next: None,
58        });
59    }
60
61    /// Create a new index with the given translator and metrics registry.
62    pub fn new(ctx: impl Metrics, translator: T) -> Self {
63        Self {
64            translator: translator.clone(),
65            map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, translator),
66            keys: ctx.gauge("keys", "Number of translated keys in the index"),
67            items: ctx.gauge("items", "Number of items in the index"),
68            pruned: ctx.counter("pruned", "Number of items pruned"),
69        }
70    }
71}
72
73impl<T: Translator, V: Send + Sync> super::Factory<T> for Index<T, V> {
74    fn new(ctx: impl commonware_runtime::Metrics, translator: T) -> Self {
75        Self::new(ctx, translator)
76    }
77}
78
79impl<T: Translator, V: Send + Sync> Unordered for Index<T, V> {
80    type Value = V;
81    type Cursor<'a>
82        = Cursor<'a, T::Key, V>
83    where
84        Self: 'a;
85
86    fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
87    where
88        V: 'a,
89    {
90        let k = self.translator.transform(key);
91        self.map
92            .get(&k)
93            .map(|record| ImmutableCursor::new(record))
94            .into_iter()
95            .flatten()
96    }
97
98    fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
99        let k = self.translator.transform(key);
100        match self.map.entry(k) {
101            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
102                entry,
103                &self.keys,
104                &self.items,
105                &self.pruned,
106            )),
107            Entry::Vacant(_) => None,
108        }
109    }
110
111    fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
112        let k = self.translator.transform(key);
113        match self.map.entry(k) {
114            Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
115                entry,
116                &self.keys,
117                &self.items,
118                &self.pruned,
119            )),
120            Entry::Vacant(entry) => {
121                Self::create(&self.keys, &self.items, entry, value);
122                None
123            }
124        }
125    }
126
127    fn insert(&mut self, key: &[u8], v: V) {
128        let k = self.translator.transform(key);
129        match self.map.entry(k) {
130            Entry::Occupied(mut entry) => {
131                insert_front(entry.get_mut(), v);
132                self.items.inc();
133            }
134            Entry::Vacant(entry) => {
135                Self::create(&self.keys, &self.items, entry, v);
136            }
137        }
138    }
139
140    fn insert_and_retain(&mut self, key: &[u8], value: V, should_retain: impl Fn(&V) -> bool) {
141        let k = self.translator.transform(key);
142        match self.map.entry(k) {
143            Entry::Occupied(entry) => {
144                let mut cursor =
145                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
146
147                // Drop anything that should not be retained.
148                cursor.retain(&should_retain);
149
150                // Add the new value only if it should be retained.
151                if should_retain(&value) {
152                    cursor.insert(value);
153                }
154            }
155            Entry::Vacant(entry) => {
156                // Create the entry only if the value should be retained.
157                if should_retain(&value) {
158                    Self::create(&self.keys, &self.items, entry, value);
159                }
160            }
161        }
162    }
163
164    fn remove(&mut self, key: &[u8]) {
165        let k = self.translator.transform(key);
166        if let Some(mut record) = self.map.remove(&k) {
167            // To ensure metrics are accurate, account for all conflicting values in the chain.
168            self.keys.dec();
169            self.items.dec();
170            self.pruned.inc();
171            while let Some(next) = record.next.take() {
172                self.items.dec();
173                self.pruned.inc();
174                record = *next;
175            }
176        }
177    }
178
179    #[cfg(test)]
180    fn keys(&self) -> usize {
181        self.map.len()
182    }
183
184    #[cfg(test)]
185    fn items(&self) -> usize {
186        self.items.get() as usize
187    }
188
189    #[cfg(test)]
190    fn pruned(&self) -> usize {
191        self.pruned.get() as usize
192    }
193}
194
195impl<T: Translator, V: Send + Sync> Drop for Index<T, V> {
196    /// To avoid stack overflow on keys with many collisions, we implement an iterative drop (in
197    /// lieu of Rust's default recursive drop).
198    fn drop(&mut self) {
199        for (_, mut record) in self.map.drain() {
200            let mut next = record.next.take();
201            while let Some(mut record) = next {
202                next = record.next.take();
203            }
204        }
205    }
206}