Skip to main content

commonware_storage/index/
ordered.rs

//! Implementation of [Ordered] that uses an ordered map internally to map translated keys to
//! arbitrary values. Beyond the standard [Unordered] implementation, this variant adds the
//! capability to retrieve the values associated with the next and previous translated keys of a
//! given key. No ordering is guaranteed among the values associated with a single key; ordering
//! applies only to the _translated_ key space.
6
7use crate::{
8    index::{
9        storage::{Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
10        Cursor as CursorTrait, Ordered, Unordered,
11    },
12    translator::Translator,
13};
14use commonware_runtime::Metrics;
15use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
16use std::{
17    collections::{
18        btree_map::{
19            Entry as BTreeEntry, OccupiedEntry as BTreeOccupiedEntry,
20            VacantEntry as BTreeVacantEntry,
21        },
22        BTreeMap,
23    },
24    ops::Bound::{Excluded, Unbounded},
25};
26
27/// Implementation of [IndexEntry] for [BTreeOccupiedEntry].
28impl<K: Ord + Send + Sync, V: Eq + Send + Sync> IndexEntry<V>
29    for BTreeOccupiedEntry<'_, K, Record<V>>
30{
31    fn get(&self) -> &V {
32        &self.get().value
33    }
34    fn get_mut(&mut self) -> &mut Record<V> {
35        self.get_mut()
36    }
37    fn remove(self) {
38        self.remove_entry();
39    }
40}
41
42/// A cursor for the ordered [Index] that wraps the shared implementation.
43pub struct Cursor<'a, K: Ord + Send + Sync, V: Eq + Send + Sync> {
44    inner: CursorImpl<'a, V, BTreeOccupiedEntry<'a, K, Record<V>>>,
45}
46
47impl<'a, K: Ord + Send + Sync, V: Eq + Send + Sync> Cursor<'a, K, V> {
48    const fn new(
49        entry: BTreeOccupiedEntry<'a, K, Record<V>>,
50        keys: &'a Gauge,
51        items: &'a Gauge,
52        pruned: &'a Counter,
53    ) -> Self {
54        Self {
55            inner: CursorImpl::<'a, V, BTreeOccupiedEntry<'a, K, Record<V>>>::new(
56                entry, keys, items, pruned,
57            ),
58        }
59    }
60}
61
62impl<K: Ord + Send + Sync, V: Eq + Send + Sync> CursorTrait for Cursor<'_, K, V> {
63    type Value = V;
64
65    fn update(&mut self, v: V) {
66        self.inner.update(v)
67    }
68
69    fn next(&mut self) -> Option<&V> {
70        self.inner.next()
71    }
72
73    fn insert(&mut self, v: V) {
74        self.inner.insert(v)
75    }
76
77    fn delete(&mut self) {
78        self.inner.delete()
79    }
80
81    fn prune(&mut self, predicate: &impl Fn(&V) -> bool) {
82        self.inner.prune(predicate)
83    }
84}
85
86/// A memory-efficient index that uses an ordered map internally to map translated keys to arbitrary
87/// values.
88pub struct Index<T: Translator, V: Eq + Send + Sync> {
89    translator: T,
90    map: BTreeMap<T::Key, Record<V>>,
91
92    keys: Gauge,
93    items: Gauge,
94    pruned: Counter,
95}
96
97impl<T: Translator, V: Eq + Send + Sync> Index<T, V> {
98    /// Create a new entry in the index.
99    fn create(keys: &Gauge, items: &Gauge, vacant: BTreeVacantEntry<'_, T::Key, Record<V>>, v: V) {
100        keys.inc();
101        items.inc();
102        vacant.insert(Record {
103            value: v,
104            next: None,
105        });
106    }
107
108    /// Create a new [Index] with the given translator and metrics registry.
109    pub fn new(ctx: impl Metrics, translator: T) -> Self {
110        let s = Self {
111            translator,
112            map: BTreeMap::new(),
113            keys: Gauge::default(),
114            items: Gauge::default(),
115            pruned: Counter::default(),
116        };
117        ctx.register(
118            "keys",
119            "Number of translated keys in the index",
120            s.keys.clone(),
121        );
122        ctx.register("items", "Number of items in the index", s.items.clone());
123        ctx.register("pruned", "Number of items pruned", s.pruned.clone());
124        s
125    }
126
127    /// Like [Ordered::next_translated_key] but without cycling around to the first key if there is
128    /// no next key.
129    pub(super) fn next_translated_key_no_cycle<'a>(
130        &'a self,
131        key: &[u8],
132    ) -> Option<ImmutableCursor<'a, V>> {
133        let k = self.translator.transform(key);
134        self.map
135            .range((Excluded(k), Unbounded))
136            .next()
137            .map(|(_, record)| ImmutableCursor::new(record))
138    }
139
140    /// Like [Ordered::prev_translated_key] but without cycling around to the last key if there is
141    /// no previous key.
142    pub(super) fn prev_translated_key_no_cycle<'a>(
143        &'a self,
144        key: &[u8],
145    ) -> Option<ImmutableCursor<'a, V>> {
146        let k = self.translator.transform(key);
147        self.map
148            .range(..k)
149            .next_back()
150            .map(|(_, record)| ImmutableCursor::new(record))
151    }
152}
153
154impl<T: Translator, V: Eq + Send + Sync> Ordered for Index<T, V> {
155    type Iterator<'a>
156        = ImmutableCursor<'a, V>
157    where
158        Self: 'a;
159
160    fn prev_translated_key<'a>(&'a self, key: &[u8]) -> Option<(Self::Iterator<'a>, bool)>
161    where
162        Self::Value: 'a,
163    {
164        let res = self.prev_translated_key_no_cycle(key);
165        if let Some(res) = res {
166            return Some((res, false));
167        }
168
169        self.last_translated_key().map(|res| (res, true))
170    }
171
172    fn next_translated_key<'a>(&'a self, key: &[u8]) -> Option<(Self::Iterator<'a>, bool)>
173    where
174        Self::Value: 'a,
175    {
176        let res = self.next_translated_key_no_cycle(key);
177        if let Some(res) = res {
178            return Some((res, false));
179        }
180
181        self.first_translated_key().map(|res| (res, true))
182    }
183
184    fn first_translated_key<'a>(&'a self) -> Option<Self::Iterator<'a>>
185    where
186        Self::Value: 'a,
187    {
188        self.map
189            .first_key_value()
190            .map(|(_, record)| ImmutableCursor::new(record))
191    }
192
193    fn last_translated_key<'a>(&'a self) -> Option<Self::Iterator<'a>>
194    where
195        Self::Value: 'a,
196    {
197        self.map
198            .last_key_value()
199            .map(|(_, record)| ImmutableCursor::new(record))
200    }
201}
202
203impl<T: Translator, V: Eq + Send + Sync> super::Factory<T> for Index<T, V> {
204    fn new(ctx: impl commonware_runtime::Metrics, translator: T) -> Self {
205        Self::new(ctx, translator)
206    }
207}
208
209impl<T: Translator, V: Eq + Send + Sync> Unordered for Index<T, V> {
210    type Value = V;
211    type Cursor<'a>
212        = Cursor<'a, T::Key, V>
213    where
214        Self: 'a;
215
216    fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
217    where
218        V: 'a,
219    {
220        let k = self.translator.transform(key);
221        self.map
222            .get(&k)
223            .map(|record| ImmutableCursor::new(record))
224            .into_iter()
225            .flatten()
226    }
227
228    fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
229        let k = self.translator.transform(key);
230        match self.map.entry(k) {
231            BTreeEntry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
232                entry,
233                &self.keys,
234                &self.items,
235                &self.pruned,
236            )),
237            BTreeEntry::Vacant(_) => None,
238        }
239    }
240
241    fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
242        let k = self.translator.transform(key);
243        match self.map.entry(k) {
244            BTreeEntry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
245                entry,
246                &self.keys,
247                &self.items,
248                &self.pruned,
249            )),
250            BTreeEntry::Vacant(entry) => {
251                Self::create(&self.keys, &self.items, entry, value);
252                None
253            }
254        }
255    }
256
257    fn insert(&mut self, key: &[u8], value: V) {
258        let k = self.translator.transform(key);
259        match self.map.entry(k) {
260            BTreeEntry::Occupied(entry) => {
261                let mut cursor =
262                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
263                cursor.next();
264                cursor.insert(value);
265            }
266            BTreeEntry::Vacant(entry) => {
267                Self::create(&self.keys, &self.items, entry, value);
268            }
269        }
270    }
271
272    fn insert_and_prune(&mut self, key: &[u8], value: V, predicate: impl Fn(&V) -> bool) {
273        let k = self.translator.transform(key);
274        match self.map.entry(k) {
275            BTreeEntry::Occupied(entry) => {
276                // Get entry
277                let mut cursor =
278                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
279
280                // Remove anything that is prunable.
281                cursor.prune(&predicate);
282
283                // Add our new value (if not prunable).
284                if !predicate(&value) {
285                    cursor.insert(value);
286                }
287            }
288            BTreeEntry::Vacant(entry) => {
289                Self::create(&self.keys, &self.items, entry, value);
290            }
291        }
292    }
293
294    fn prune(&mut self, key: &[u8], predicate: impl Fn(&V) -> bool) {
295        let k = self.translator.transform(key);
296        match self.map.entry(k) {
297            BTreeEntry::Occupied(entry) => {
298                // Get cursor
299                let mut cursor =
300                    Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
301
302                // Remove anything that is prunable.
303                cursor.prune(&predicate);
304            }
305            BTreeEntry::Vacant(_) => {}
306        }
307    }
308
309    fn remove(&mut self, key: &[u8]) {
310        // To ensure metrics are accurate, we iterate over all conflicting values and remove them
311        // one-by-one (rather than just removing the entire entry).
312        self.prune(key, |_| true);
313    }
314
315    #[cfg(test)]
316    fn keys(&self) -> usize {
317        self.map.len()
318    }
319
320    #[cfg(test)]
321    fn items(&self) -> usize {
322        self.items.get() as usize
323    }
324
325    #[cfg(test)]
326    fn pruned(&self) -> usize {
327        self.pruned.get() as usize
328    }
329}
330
331impl<T: Translator, V: Eq + Send + Sync> Drop for Index<T, V> {
332    /// To avoid stack overflow on keys with many collisions, we implement an iterative drop (in
333    /// lieu of Rust's default recursive drop).
334    fn drop(&mut self) {
335        for (_, record) in self.map.iter_mut() {
336            let mut next = record.next.take();
337            while let Some(mut record) = next {
338                next = record.next.take();
339            }
340        }
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::translator::OneCap;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::hex;

    /// Drains `cursor` and returns the values it yielded, in iteration order.
    fn values<'a>(cursor: impl IntoIterator<Item = &'a u64>) -> Vec<u64> {
        cursor.into_iter().copied().collect()
    }

    /// Every ordered lookup on an empty index returns nothing.
    #[test_traced]
    fn test_ordered_empty_index() {
        deterministic::Runner::default().start(|context| async move {
            let index = Index::<_, u64>::new(context, OneCap);

            assert!(index.first_translated_key().is_none());
            assert!(index.last_translated_key().is_none());
            assert!(index.prev_translated_key(b"key").is_none());
            assert!(index.next_translated_key(b"key").is_none());
        });
    }

    /// Exercises first/last/next/prev lookups, including wrap-around, over three translated keys.
    #[test_traced]
    fn test_ordered_index_ordering() {
        deterministic::Runner::default().start(|context| async move {
            let mut index = Index::<_, u64>::new(context, OneCap);
            assert_eq!(index.keys(), 0);

            let k1 = &hex!("0x0b02AA"); // translated key 0b
            let k2 = &hex!("0x1c04CC"); // translated key 1c
            let k2_collides = &hex!("0x1c0311");
            let k3 = &hex!("0x2d06EE"); // translated key 2d
            index.insert(k1, 1);
            index.insert(k2, 21);
            index.insert(k2_collides, 22);
            index.insert(k3, 3);
            assert_eq!(index.keys(), 3);

            // First translated key is 0b.
            let first = index.first_translated_key().unwrap();
            assert_eq!(values(first), [1]);

            // Next translated key to 0x00 is 0b.
            let (next, wrapped) = index.next_translated_key(&[0x00]).unwrap();
            assert!(!wrapped);
            assert_eq!(values(next), [1]);

            // Next translated key to 0x0b is 1c.
            let (next, wrapped) = index.next_translated_key(&hex!("0x0b0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(next), [21, 22]);

            // Next translated key to 0x1b is 1c.
            let (next, wrapped) = index.next_translated_key(&hex!("0x1b010203")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(next), [21, 22]);

            // Next translated key to 0x2a is 2d.
            let (next, wrapped) = index.next_translated_key(&hex!("0x2a01020304")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(next), [3]);

            // Next translated key to 0x2d cycles around to 0x0b.
            let (next, wrapped) = index.next_translated_key(k3).unwrap();
            assert!(wrapped);
            assert_eq!(values(next), [1]);

            // Another cycle-around case.
            let (next, wrapped) = index.next_translated_key(&hex!("0x2eFF")).unwrap();
            assert!(wrapped);
            assert_eq!(values(next), [1]);

            // Previous translated key of first key is the last key.
            let (prev, wrapped) = index.prev_translated_key(k1).unwrap();
            assert!(wrapped);
            assert_eq!(values(prev), [3]);

            // Previous translated key is 0b.
            let (prev, wrapped) = index.prev_translated_key(&hex!("0x0c0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(prev), [1]);

            // Previous translated key is 1c.
            let (prev, wrapped) = index.prev_translated_key(&hex!("0x1d0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(prev), [21, 22]);

            // Previous translated key is 2d.
            let (prev, wrapped) = index.prev_translated_key(&hex!("0xCC0102")).unwrap();
            assert!(!wrapped);
            assert_eq!(values(prev), [3]);

            // Last translated key is 2d.
            let last = index.last_translated_key().unwrap();
            assert_eq!(values(last), [3]);
        });
    }
}
456}