commonware_storage/index/
storage.rs

1use crate::translator::Translator;
2use commonware_runtime::Metrics;
3use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
4use std::collections::{
5    hash_map::{Entry, OccupiedEntry, VacantEntry},
6    HashMap,
7};
8
9/// The initial capacity of the internal hashmap. This is a guess at the number of unique keys we will
10/// encounter. The hashmap will grow as needed, but this is a good starting point (covering
11/// the entire [crate::translator::OneCap] range).
12const INITIAL_CAPACITY: usize = 256;
13
14/// Panic message shown when `next()` is not called after `Cursor` creation or after `insert()` or ``delete()`.
15const MUST_CALL_NEXT: &str = "must call Cursor::next()";
16
17/// Panic message shown when `update()` is called after `Cursor` has returned `None` or after `insert()`
18/// or `delete()` (but before `next()`).
19const NO_ACTIVE_ITEM: &str = "no active item in Cursor";
20
21/// Each key is mapped to a `Record` that contains a linked list of potential values for that key.
22///
23/// We avoid using a `Vec` to store values because the common case (where there are no collisions) would
24/// require an additional 24 bytes of memory for each value (the `len`, `capacity`, and `ptr` fields).
25///
26/// Again optimizing for the common case, we store the first value directly in the `Record` to avoid
27/// indirection (heap jumping).
28#[derive(PartialEq, Eq)]
29struct Record<V: Eq> {
30    value: V,
31    next: Option<Box<Record<V>>>,
32}
33
34/// Phases of the `Cursor` during iteration.
35#[derive(PartialEq, Eq)]
36enum Phase<V: Eq> {
37    /// Before iteration starts.
38    Initial,
39
40    /// The current entry.
41    Entry,
42    /// Some item after the current entry.
43    Next(Box<Record<V>>),
44
45    /// Iteration is done.
46    Done,
47    /// The current entry has no valid item.
48    EntryDeleted,
49
50    /// The current entry has been deleted and we've updated its value in-place
51    /// to be the value of the next record.
52    PostDeleteEntry,
53    /// The item has been deleted and we may be pointing to the next item.
54    PostDeleteNext(Option<Box<Record<V>>>),
55    /// An item has been inserted.
56    PostInsert(Box<Record<V>>),
57}
58
59/// A mutable iterator over the values associated with a translated key, allowing in-place modifications.
60///
61/// The `Cursor` provides a way to traverse and modify the linked list of `Record`s while maintaining its
62/// structure. It supports:
63///
64/// - Iteration via `next()` to access values.
65/// - Modification via `update()` to change the current value.
66/// - Insertion via `insert()` to add new values.
67/// - Deletion via `delete()` to remove values.
68///
69/// # Safety
70///
71/// - Must call `next()` before `update()`, `insert()`, or `delete()` to establish a valid position.
72/// - Once `next()` returns `None`, only `insert()` can be called.
73/// - Dropping the `Cursor` automatically restores the list structure by reattaching any detached `next` nodes.
74///
75/// _If you don't need advanced functionality, just use `insert()`, `insert_and_prune()`, or `remove()` instead._
76pub struct Cursor<'a, T: Translator, V: Eq> {
77    // The current phase of the cursor.
78    phase: Phase<V>,
79
80    // The current entry.
81    entry: Option<OccupiedEntry<'a, T::Key, Record<V>>>,
82
83    // The head of the linked list of previously visited records.
84    past: Option<Box<Record<V>>>,
85    // The tail of the linked list of previously visited records.
86    past_tail: Option<*mut Record<V>>,
87    // Whether we've pushed a record with a populated `next` field to `past` (invalidates `past_tail`).
88    past_pushed_list: bool,
89
90    // Metrics.
91    keys: &'a Gauge,
92    items: &'a Gauge,
93    pruned: &'a Counter,
94}
95
96impl<'a, T: Translator, V: Eq> Cursor<'a, T, V> {
97    /// Creates a new `Cursor` from a mutable record reference, detaching its `next` chain for iteration.
98    fn new(
99        entry: OccupiedEntry<'a, T::Key, Record<V>>,
100        keys: &'a Gauge,
101        items: &'a Gauge,
102        pruned: &'a Counter,
103    ) -> Self {
104        Self {
105            phase: Phase::Initial,
106
107            entry: Some(entry),
108
109            past: None,
110            past_tail: None,
111            past_pushed_list: false,
112
113            keys,
114            items,
115            pruned,
116        }
117    }
118
119    /// Pushes a `Record` to the end of `past`.
120    ///
121    /// If the record has a `next`, this function cannot be called again.
122    fn past_push(&mut self, next: Box<Record<V>>) {
123        // Ensure we only push a list once (`past_tail` becomes stale).
124        assert!(!self.past_pushed_list);
125        self.past_pushed_list = next.next.is_some();
126
127        // Add `next` to the tail of `past`.
128        if self.past_tail.is_none() {
129            self.past = Some(next);
130            self.past_tail = self.past.as_mut().map(|b| &mut **b as *mut Record<V>);
131        } else {
132            unsafe {
133                assert!((*self.past_tail.unwrap()).next.is_none());
134                (*self.past_tail.unwrap()).next = Some(next);
135                let tail_next = (*self.past_tail.unwrap()).next.as_mut().unwrap();
136                self.past_tail = Some(&mut **tail_next as *mut Record<V>);
137            }
138        }
139    }
140
141    /// Updates the value at the current position in the iteration.
142    ///
143    /// Panics if called before `next()` or after iteration is complete (`Status::Done` phase).
144    pub fn update(&mut self, v: V) {
145        match &mut self.phase {
146            Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
147            Phase::Entry => {
148                self.entry.as_mut().unwrap().get_mut().value = v;
149            }
150            Phase::Next(next) => {
151                next.value = v;
152            }
153            Phase::Done
154            | Phase::EntryDeleted
155            | Phase::PostDeleteEntry
156            | Phase::PostDeleteNext(_)
157            | Phase::PostInsert(_) => unreachable!("{NO_ACTIVE_ITEM}"),
158        }
159    }
160
161    /// If we are in a phase where we could return a value, return it.
162    fn value(&self) -> Option<&V> {
163        match &self.phase {
164            Phase::Initial => unreachable!(),
165            Phase::Entry => self.entry.as_ref().map(|r| &r.get().value),
166            Phase::Next(current) => Some(&current.value),
167            Phase::Done | Phase::EntryDeleted => None,
168            Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
169                unreachable!()
170            }
171        }
172    }
173
174    /// Advances the cursor to the next value in the chain, returning a reference to it.
175    ///
176    /// This method must be called before any other operations (`insert()`, `delete()`, etc.). If
177    /// either `insert()` or `delete()` is called, `next()` must be called to set a new active
178    /// item. If after `insert()`, the next active item is the item after the inserted item. If after
179    /// `delete()`, the next active item is the item after the deleted item.
180    ///
181    /// Handles transitions between phases and adjusts for deletions. Returns `None` when the list is exhausted.
182    /// It is safe to call `next()` even after it returns `None`.
183    #[allow(clippy::should_implement_trait)]
184    pub fn next(&mut self) -> Option<&V> {
185        match std::mem::replace(&mut self.phase, Phase::Done) {
186            Phase::Initial | Phase::PostDeleteEntry => {
187                // We must start with some entry, so this will always be some non-None value.
188                self.phase = Phase::Entry;
189            }
190            Phase::Entry => {
191                // If there is a record after, we set it to be the current record.
192                if let Some(next) = self.entry.as_mut().unwrap().get_mut().next.take() {
193                    self.phase = Phase::Next(next);
194                }
195            }
196            Phase::Next(mut current) | Phase::PostInsert(mut current) => {
197                // Take the next record and push the current one to the past list.
198                let next = current.next.take();
199                self.past_push(current);
200
201                // Set the next record to be the current record.
202                if let Some(next) = next {
203                    self.phase = Phase::Next(next);
204                }
205            }
206            Phase::Done => {}
207            Phase::EntryDeleted => {
208                self.phase = Phase::EntryDeleted;
209            }
210            Phase::PostDeleteNext(current) => {
211                // If the stale value is some, we set it to be the current record.
212                if let Some(current) = current {
213                    self.phase = Phase::Next(current);
214                }
215            }
216        }
217        self.value()
218    }
219
220    /// Inserts a new value at the current position.
221    pub fn insert(&mut self, v: V) {
222        self.items.inc();
223        match std::mem::replace(&mut self.phase, Phase::Done) {
224            Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
225            Phase::Entry => {
226                // Create a new record that points to entry's next.
227                let new = Box::new(Record {
228                    value: v,
229                    next: self.entry.as_mut().unwrap().get_mut().next.take(),
230                });
231
232                // Set the phase to the new record.
233                self.phase = Phase::PostInsert(new);
234            }
235            Phase::Next(mut current) => {
236                // Take next.
237                let next = current.next.take();
238
239                // Add current to the past list.
240                self.past_push(current);
241
242                // Create a new record that points to the next's next.
243                let new = Box::new(Record { value: v, next });
244                self.phase = Phase::PostInsert(new);
245            }
246            Phase::Done => {
247                // If we are done, we need to create a new record and
248                // immediately push it to the past list.
249                let new = Box::new(Record {
250                    value: v,
251                    next: None,
252                });
253                self.past_push(new);
254            }
255            Phase::EntryDeleted => {
256                // If entry is deleted, we need to update it.
257                self.entry.as_mut().unwrap().get_mut().value = v;
258
259                // We don't consider overwriting a deleted entry a collision.
260            }
261            Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
262                unreachable!("{MUST_CALL_NEXT}")
263            }
264        }
265    }
266
267    /// Deletes the current value, adjusting the list structure.
268    pub fn delete(&mut self) {
269        self.pruned.inc();
270        self.items.dec();
271        match std::mem::replace(&mut self.phase, Phase::Done) {
272            Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
273            Phase::Entry => {
274                // Attempt to overwrite the entry with the next value.
275                let entry = self.entry.as_mut().unwrap().get_mut();
276                if let Some(next) = entry.next.take() {
277                    entry.value = next.value;
278                    entry.next = next.next;
279                    self.phase = Phase::PostDeleteEntry;
280                    return;
281                }
282
283                // If there is no next, we consider the entry deleted.
284                self.phase = Phase::EntryDeleted;
285                // We wait to update metrics until `drop()`.
286            }
287            Phase::Next(mut current) => {
288                // Drop current instead of pushing it to the past list.
289                let next = current.next.take();
290                self.phase = Phase::PostDeleteNext(next);
291            }
292            Phase::Done | Phase::EntryDeleted => unreachable!("{NO_ACTIVE_ITEM}"),
293            Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
294                unreachable!("{MUST_CALL_NEXT}")
295            }
296        }
297    }
298}
299
300unsafe impl<'a, T, V> Send for Cursor<'a, T, V>
301where
302    T: Translator + Send,
303    T::Key: Send,
304    V: Eq + Send,
305{
306    // SAFETY: `Send` is safe because the raw pointer `past_tail` only ever points
307    // to heap memory owned by `self.past`. Since the pointer's referent is moved
308    // along with the `Cursor`, no data races can occur. The `where` clause
309    // ensures all generic parameters are also `Send`.
310}
311
312impl<T: Translator, V: Eq> Drop for Cursor<'_, T, V> {
313    fn drop(&mut self) {
314        // Take the entry.
315        let mut entry = self.entry.take().unwrap();
316
317        // If there is a dangling next, we should add it to past.
318        match std::mem::replace(&mut self.phase, Phase::Done) {
319            Phase::Initial | Phase::Entry => {
320                // No action needed.
321            }
322            Phase::Next(next) => {
323                // If there is a next, we should add it to past.
324                self.past_push(next);
325            }
326            Phase::Done => {
327                // No action needed.
328            }
329            Phase::EntryDeleted => {
330                // If the entry is deleted, we should remove it.
331                self.keys.dec();
332                entry.remove();
333                return;
334            }
335            Phase::PostDeleteEntry => {
336                // No action needed.
337            }
338            Phase::PostDeleteNext(Some(next)) => {
339                // If there is a stale record, we should add it to past.
340                self.past_push(next);
341            }
342            Phase::PostDeleteNext(None) => {
343                // No action needed.
344            }
345            Phase::PostInsert(next) => {
346                // If there is a current record, we should add it to past.
347                self.past_push(next);
348            }
349        }
350
351        // Attach the tip of past to the entry.
352        if let Some(past) = self.past.take() {
353            entry.get_mut().next = Some(past);
354        }
355    }
356}
357
358/// An immutable iterator over the values associated with a translated key.
359pub struct ImmutableCursor<'a, V: Eq> {
360    current: Option<&'a Record<V>>,
361}
362
363impl<'a, V: Eq> ImmutableCursor<'a, V> {
364    /// Creates a new `ImmutableCursor` from a `Record`.
365    fn new(record: &'a Record<V>) -> Self {
366        Self {
367            current: Some(record),
368        }
369    }
370}
371
372impl<'a, V: Eq> Iterator for ImmutableCursor<'a, V> {
373    type Item = &'a V;
374
375    fn next(&mut self) -> Option<Self::Item> {
376        self.current.map(|record| {
377            let value = &record.value;
378            self.current = record.next.as_deref();
379            value
380        })
381    }
382}
383
384/// A memory-efficient index that maps translated keys to arbitrary values.
385pub struct Index<T: Translator, V: Eq> {
386    translator: T,
387    map: HashMap<T::Key, Record<V>, T>,
388
389    keys: Gauge,
390    items: Gauge,
391    pruned: Counter,
392}
393
394impl<T: Translator, V: Eq> Index<T, V> {
395    /// Create a new index with the given translator.
396    pub fn init(ctx: impl Metrics, tr: T) -> Self {
397        let s = Self {
398            translator: tr.clone(),
399            map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, tr),
400
401            keys: Gauge::default(),
402            items: Gauge::default(),
403            pruned: Counter::default(),
404        };
405        ctx.register(
406            "keys",
407            "Number of translated keys in the index",
408            s.keys.clone(),
409        );
410        ctx.register("items", "Number of items in the index", s.items.clone());
411        ctx.register("pruned", "Number of items pruned", s.pruned.clone());
412        s
413    }
414
415    #[inline]
416    /// Returns the number of translated keys in the index.
417    pub fn keys(&self) -> usize {
418        self.map.len()
419    }
420
421    /// Returns the number of items in the index, for use in testing. The number of items is always
422    /// at least as large as the number of keys, but may be larger in the case of collisions.
423    #[cfg(test)]
424    pub fn items(&self) -> usize {
425        self.items.get() as usize
426    }
427
428    /// Returns an iterator over all values associated with a translated key.
429    pub fn get(&self, key: &[u8]) -> impl Iterator<Item = &V> {
430        let k = self.translator.transform(key);
431        self.map
432            .get(&k)
433            .map(|record| ImmutableCursor::new(record))
434            .into_iter()
435            .flatten()
436    }
437
438    /// Provides mutable access to the values associated with a translated key, if the key exists.
439    pub fn get_mut(&mut self, key: &[u8]) -> Option<Cursor<'_, T, V>> {
440        let k = self.translator.transform(key);
441        match self.map.entry(k) {
442            Entry::Occupied(entry) => Some(Cursor::<'_, T, V>::new(
443                entry,
444                &self.keys,
445                &self.items,
446                &self.pruned,
447            )),
448            Entry::Vacant(_) => None,
449        }
450    }
451
452    /// Create a new entry in the index.
453    fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<T::Key, Record<V>>, v: V) {
454        keys.inc();
455        items.inc();
456        vacant.insert(Record {
457            value: v,
458            next: None,
459        });
460    }
461
462    /// Provides mutable access to the values associated with a translated key (if the key exists), otherwise
463    /// inserts a new value and returns `None`.
464    pub fn get_mut_or_insert(&mut self, key: &[u8], v: V) -> Option<Cursor<'_, T, V>> {
465        let k = self.translator.transform(key);
466        match self.map.entry(k) {
467            Entry::Occupied(entry) => Some(Cursor::<'_, T, V>::new(
468                entry,
469                &self.keys,
470                &self.items,
471                &self.pruned,
472            )),
473            Entry::Vacant(entry) => {
474                Self::create(&self.keys, &self.items, entry, v);
475                None
476            }
477        }
478    }
479
480    /// Remove all values at the given translated key.
481    pub fn remove(&mut self, key: &[u8]) {
482        // To ensure metrics are accurate, we iterate over all
483        // conflicting values and remove them one-by-one (rather
484        // than just removing the entire entry).
485        self.prune(key, |_| true);
486    }
487
488    /// Insert a value at the given translated key.
489    pub fn insert(&mut self, key: &[u8], v: V) {
490        let k = self.translator.transform(key);
491        match self.map.entry(k) {
492            Entry::Occupied(entry) => {
493                let mut cursor =
494                    Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
495                cursor.next();
496                cursor.insert(v);
497            }
498            Entry::Vacant(entry) => {
499                Self::create(&self.keys, &self.items, entry, v);
500            }
501        }
502    }
503
504    /// Insert a value at the given translated key, and prune any values that are no longer valid.
505    ///
506    /// If the value is prunable, it will not be inserted.
507    pub fn insert_and_prune(&mut self, key: &[u8], v: V, prune: impl Fn(&V) -> bool) {
508        let k = self.translator.transform(key);
509        match self.map.entry(k) {
510            Entry::Occupied(entry) => {
511                // Get entry
512                let mut cursor =
513                    Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
514
515                // Remove anything that is prunable.
516                loop {
517                    let Some(old) = cursor.next() else {
518                        break;
519                    };
520                    if prune(old) {
521                        cursor.delete();
522                    }
523                }
524
525                // Add our new value (if not prunable).
526                if !prune(&v) {
527                    cursor.insert(v);
528                }
529            }
530            Entry::Vacant(entry) => {
531                Self::create(&self.keys, &self.items, entry, v);
532            }
533        }
534    }
535
536    /// Remove all values associated with a translated key that match the `prune` predicate.
537    pub fn prune(&mut self, key: &[u8], prune: impl Fn(&V) -> bool) {
538        let k = self.translator.transform(key);
539        match self.map.entry(k) {
540            Entry::Occupied(entry) => {
541                // Get cursor
542                let mut cursor =
543                    Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
544
545                // Remove anything that is prunable.
546                loop {
547                    let Some(old) = cursor.next() else {
548                        break;
549                    };
550                    if prune(old) {
551                        cursor.delete();
552                    }
553                }
554            }
555            Entry::Vacant(_) => {}
556        }
557    }
558}
559
560impl<T: Translator, V: Eq> Drop for Index<T, V> {
561    /// To avoid stack overflow on keys with many collisions, we implement an iterative
562    /// drop (in lieu of Rust's default recursive drop).
563    fn drop(&mut self) {
564        for (_, mut record) in self.map.drain() {
565            let mut next = record.next.take();
566            while let Some(mut record) = next {
567                next = record.next.take();
568            }
569        }
570    }
571}