commonware_storage/index/storage.rs
1use crate::translator::Translator;
2use commonware_runtime::Metrics;
3use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
4use std::collections::{
5 hash_map::{Entry, OccupiedEntry, VacantEntry},
6 HashMap,
7};
8
9/// The initial capacity of the internal hashmap. This is a guess at the number of unique keys we will
10/// encounter. The hashmap will grow as needed, but this is a good starting point (covering
11/// the entire [crate::translator::OneCap] range).
12const INITIAL_CAPACITY: usize = 256;
13
14/// Panic message shown when `next()` is not called after `Cursor` creation or after `insert()` or ``delete()`.
15const MUST_CALL_NEXT: &str = "must call Cursor::next()";
16
17/// Panic message shown when `update()` is called after `Cursor` has returned `None` or after `insert()`
18/// or `delete()` (but before `next()`).
19const NO_ACTIVE_ITEM: &str = "no active item in Cursor";
20
21/// Each key is mapped to a `Record` that contains a linked list of potential values for that key.
22///
23/// We avoid using a `Vec` to store values because the common case (where there are no collisions) would
24/// require an additional 24 bytes of memory for each value (the `len`, `capacity`, and `ptr` fields).
25///
26/// Again optimizing for the common case, we store the first value directly in the `Record` to avoid
27/// indirection (heap jumping).
28#[derive(PartialEq, Eq)]
29struct Record<V: Eq> {
30 value: V,
31 next: Option<Box<Record<V>>>,
32}
33
34/// Phases of the `Cursor` during iteration.
35#[derive(PartialEq, Eq)]
36enum Phase<V: Eq> {
37 /// Before iteration starts.
38 Initial,
39
40 /// The current entry.
41 Entry,
42 /// Some item after the current entry.
43 Next(Box<Record<V>>),
44
45 /// Iteration is done.
46 Done,
47 /// The current entry has no valid item.
48 EntryDeleted,
49
50 /// The current entry has been deleted and we've updated its value in-place
51 /// to be the value of the next record.
52 PostDeleteEntry,
53 /// The item has been deleted and we may be pointing to the next item.
54 PostDeleteNext(Option<Box<Record<V>>>),
55 /// An item has been inserted.
56 PostInsert(Box<Record<V>>),
57}
58
59/// A mutable iterator over the values associated with a translated key, allowing in-place modifications.
60///
61/// The `Cursor` provides a way to traverse and modify the linked list of `Record`s while maintaining its
62/// structure. It supports:
63///
64/// - Iteration via `next()` to access values.
65/// - Modification via `update()` to change the current value.
66/// - Insertion via `insert()` to add new values.
67/// - Deletion via `delete()` to remove values.
68///
69/// # Safety
70///
71/// - Must call `next()` before `update()`, `insert()`, or `delete()` to establish a valid position.
72/// - Once `next()` returns `None`, only `insert()` can be called.
73/// - Dropping the `Cursor` automatically restores the list structure by reattaching any detached `next` nodes.
74///
75/// _If you don't need advanced functionality, just use `insert()`, `insert_and_prune()`, or `remove()` instead._
76pub struct Cursor<'a, T: Translator, V: Eq> {
77 // The current phase of the cursor.
78 phase: Phase<V>,
79
80 // The current entry.
81 entry: Option<OccupiedEntry<'a, T::Key, Record<V>>>,
82
83 // The head of the linked list of previously visited records.
84 past: Option<Box<Record<V>>>,
85 // The tail of the linked list of previously visited records.
86 past_tail: Option<*mut Record<V>>,
87 // Whether we've pushed a record with a populated `next` field to `past` (invalidates `past_tail`).
88 past_pushed_list: bool,
89
90 // Metrics.
91 keys: &'a Gauge,
92 items: &'a Gauge,
93 pruned: &'a Counter,
94}
95
96impl<'a, T: Translator, V: Eq> Cursor<'a, T, V> {
97 /// Creates a new `Cursor` from a mutable record reference, detaching its `next` chain for iteration.
98 fn new(
99 entry: OccupiedEntry<'a, T::Key, Record<V>>,
100 keys: &'a Gauge,
101 items: &'a Gauge,
102 pruned: &'a Counter,
103 ) -> Self {
104 Self {
105 phase: Phase::Initial,
106
107 entry: Some(entry),
108
109 past: None,
110 past_tail: None,
111 past_pushed_list: false,
112
113 keys,
114 items,
115 pruned,
116 }
117 }
118
119 /// Pushes a `Record` to the end of `past`.
120 ///
121 /// If the record has a `next`, this function cannot be called again.
122 fn past_push(&mut self, next: Box<Record<V>>) {
123 // Ensure we only push a list once (`past_tail` becomes stale).
124 assert!(!self.past_pushed_list);
125 self.past_pushed_list = next.next.is_some();
126
127 // Add `next` to the tail of `past`.
128 if self.past_tail.is_none() {
129 self.past = Some(next);
130 self.past_tail = self.past.as_mut().map(|b| &mut **b as *mut Record<V>);
131 } else {
132 unsafe {
133 assert!((*self.past_tail.unwrap()).next.is_none());
134 (*self.past_tail.unwrap()).next = Some(next);
135 let tail_next = (*self.past_tail.unwrap()).next.as_mut().unwrap();
136 self.past_tail = Some(&mut **tail_next as *mut Record<V>);
137 }
138 }
139 }
140
141 /// Updates the value at the current position in the iteration.
142 ///
143 /// Panics if called before `next()` or after iteration is complete (`Status::Done` phase).
144 pub fn update(&mut self, v: V) {
145 match &mut self.phase {
146 Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
147 Phase::Entry => {
148 self.entry.as_mut().unwrap().get_mut().value = v;
149 }
150 Phase::Next(next) => {
151 next.value = v;
152 }
153 Phase::Done
154 | Phase::EntryDeleted
155 | Phase::PostDeleteEntry
156 | Phase::PostDeleteNext(_)
157 | Phase::PostInsert(_) => unreachable!("{NO_ACTIVE_ITEM}"),
158 }
159 }
160
161 /// If we are in a phase where we could return a value, return it.
162 fn value(&self) -> Option<&V> {
163 match &self.phase {
164 Phase::Initial => unreachable!(),
165 Phase::Entry => self.entry.as_ref().map(|r| &r.get().value),
166 Phase::Next(current) => Some(¤t.value),
167 Phase::Done | Phase::EntryDeleted => None,
168 Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
169 unreachable!()
170 }
171 }
172 }
173
174 /// Advances the cursor to the next value in the chain, returning a reference to it.
175 ///
176 /// This method must be called before any other operations (`insert()`, `delete()`, etc.). If
177 /// either `insert()` or `delete()` is called, `next()` must be called to set a new active
178 /// item. If after `insert()`, the next active item is the item after the inserted item. If after
179 /// `delete()`, the next active item is the item after the deleted item.
180 ///
181 /// Handles transitions between phases and adjusts for deletions. Returns `None` when the list is exhausted.
182 /// It is safe to call `next()` even after it returns `None`.
183 #[allow(clippy::should_implement_trait)]
184 pub fn next(&mut self) -> Option<&V> {
185 match std::mem::replace(&mut self.phase, Phase::Done) {
186 Phase::Initial | Phase::PostDeleteEntry => {
187 // We must start with some entry, so this will always be some non-None value.
188 self.phase = Phase::Entry;
189 }
190 Phase::Entry => {
191 // If there is a record after, we set it to be the current record.
192 if let Some(next) = self.entry.as_mut().unwrap().get_mut().next.take() {
193 self.phase = Phase::Next(next);
194 }
195 }
196 Phase::Next(mut current) | Phase::PostInsert(mut current) => {
197 // Take the next record and push the current one to the past list.
198 let next = current.next.take();
199 self.past_push(current);
200
201 // Set the next record to be the current record.
202 if let Some(next) = next {
203 self.phase = Phase::Next(next);
204 }
205 }
206 Phase::Done => {}
207 Phase::EntryDeleted => {
208 self.phase = Phase::EntryDeleted;
209 }
210 Phase::PostDeleteNext(current) => {
211 // If the stale value is some, we set it to be the current record.
212 if let Some(current) = current {
213 self.phase = Phase::Next(current);
214 }
215 }
216 }
217 self.value()
218 }
219
220 /// Inserts a new value at the current position.
221 pub fn insert(&mut self, v: V) {
222 self.items.inc();
223 match std::mem::replace(&mut self.phase, Phase::Done) {
224 Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
225 Phase::Entry => {
226 // Create a new record that points to entry's next.
227 let new = Box::new(Record {
228 value: v,
229 next: self.entry.as_mut().unwrap().get_mut().next.take(),
230 });
231
232 // Set the phase to the new record.
233 self.phase = Phase::PostInsert(new);
234 }
235 Phase::Next(mut current) => {
236 // Take next.
237 let next = current.next.take();
238
239 // Add current to the past list.
240 self.past_push(current);
241
242 // Create a new record that points to the next's next.
243 let new = Box::new(Record { value: v, next });
244 self.phase = Phase::PostInsert(new);
245 }
246 Phase::Done => {
247 // If we are done, we need to create a new record and
248 // immediately push it to the past list.
249 let new = Box::new(Record {
250 value: v,
251 next: None,
252 });
253 self.past_push(new);
254 }
255 Phase::EntryDeleted => {
256 // If entry is deleted, we need to update it.
257 self.entry.as_mut().unwrap().get_mut().value = v;
258
259 // We don't consider overwriting a deleted entry a collision.
260 }
261 Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
262 unreachable!("{MUST_CALL_NEXT}")
263 }
264 }
265 }
266
267 /// Deletes the current value, adjusting the list structure.
268 pub fn delete(&mut self) {
269 self.pruned.inc();
270 self.items.dec();
271 match std::mem::replace(&mut self.phase, Phase::Done) {
272 Phase::Initial => unreachable!("{MUST_CALL_NEXT}"),
273 Phase::Entry => {
274 // Attempt to overwrite the entry with the next value.
275 let entry = self.entry.as_mut().unwrap().get_mut();
276 if let Some(next) = entry.next.take() {
277 entry.value = next.value;
278 entry.next = next.next;
279 self.phase = Phase::PostDeleteEntry;
280 return;
281 }
282
283 // If there is no next, we consider the entry deleted.
284 self.phase = Phase::EntryDeleted;
285 // We wait to update metrics until `drop()`.
286 }
287 Phase::Next(mut current) => {
288 // Drop current instead of pushing it to the past list.
289 let next = current.next.take();
290 self.phase = Phase::PostDeleteNext(next);
291 }
292 Phase::Done | Phase::EntryDeleted => unreachable!("{NO_ACTIVE_ITEM}"),
293 Phase::PostDeleteEntry | Phase::PostDeleteNext(_) | Phase::PostInsert(_) => {
294 unreachable!("{MUST_CALL_NEXT}")
295 }
296 }
297 }
298}
299
300unsafe impl<'a, T, V> Send for Cursor<'a, T, V>
301where
302 T: Translator + Send,
303 T::Key: Send,
304 V: Eq + Send,
305{
306 // SAFETY: `Send` is safe because the raw pointer `past_tail` only ever points
307 // to heap memory owned by `self.past`. Since the pointer's referent is moved
308 // along with the `Cursor`, no data races can occur. The `where` clause
309 // ensures all generic parameters are also `Send`.
310}
311
312impl<T: Translator, V: Eq> Drop for Cursor<'_, T, V> {
313 fn drop(&mut self) {
314 // Take the entry.
315 let mut entry = self.entry.take().unwrap();
316
317 // If there is a dangling next, we should add it to past.
318 match std::mem::replace(&mut self.phase, Phase::Done) {
319 Phase::Initial | Phase::Entry => {
320 // No action needed.
321 }
322 Phase::Next(next) => {
323 // If there is a next, we should add it to past.
324 self.past_push(next);
325 }
326 Phase::Done => {
327 // No action needed.
328 }
329 Phase::EntryDeleted => {
330 // If the entry is deleted, we should remove it.
331 self.keys.dec();
332 entry.remove();
333 return;
334 }
335 Phase::PostDeleteEntry => {
336 // No action needed.
337 }
338 Phase::PostDeleteNext(Some(next)) => {
339 // If there is a stale record, we should add it to past.
340 self.past_push(next);
341 }
342 Phase::PostDeleteNext(None) => {
343 // No action needed.
344 }
345 Phase::PostInsert(next) => {
346 // If there is a current record, we should add it to past.
347 self.past_push(next);
348 }
349 }
350
351 // Attach the tip of past to the entry.
352 if let Some(past) = self.past.take() {
353 entry.get_mut().next = Some(past);
354 }
355 }
356}
357
358/// An immutable iterator over the values associated with a translated key.
359pub struct ImmutableCursor<'a, V: Eq> {
360 current: Option<&'a Record<V>>,
361}
362
363impl<'a, V: Eq> ImmutableCursor<'a, V> {
364 /// Creates a new `ImmutableCursor` from a `Record`.
365 fn new(record: &'a Record<V>) -> Self {
366 Self {
367 current: Some(record),
368 }
369 }
370}
371
372impl<'a, V: Eq> Iterator for ImmutableCursor<'a, V> {
373 type Item = &'a V;
374
375 fn next(&mut self) -> Option<Self::Item> {
376 self.current.map(|record| {
377 let value = &record.value;
378 self.current = record.next.as_deref();
379 value
380 })
381 }
382}
383
384/// A memory-efficient index that maps translated keys to arbitrary values.
385pub struct Index<T: Translator, V: Eq> {
386 translator: T,
387 map: HashMap<T::Key, Record<V>, T>,
388
389 keys: Gauge,
390 items: Gauge,
391 pruned: Counter,
392}
393
394impl<T: Translator, V: Eq> Index<T, V> {
395 /// Create a new index with the given translator.
396 pub fn init(ctx: impl Metrics, tr: T) -> Self {
397 let s = Self {
398 translator: tr.clone(),
399 map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, tr),
400
401 keys: Gauge::default(),
402 items: Gauge::default(),
403 pruned: Counter::default(),
404 };
405 ctx.register(
406 "keys",
407 "Number of translated keys in the index",
408 s.keys.clone(),
409 );
410 ctx.register("items", "Number of items in the index", s.items.clone());
411 ctx.register("pruned", "Number of items pruned", s.pruned.clone());
412 s
413 }
414
415 #[inline]
416 /// Returns the number of translated keys in the index.
417 pub fn keys(&self) -> usize {
418 self.map.len()
419 }
420
421 /// Returns the number of items in the index, for use in testing. The number of items is always
422 /// at least as large as the number of keys, but may be larger in the case of collisions.
423 #[cfg(test)]
424 pub fn items(&self) -> usize {
425 self.items.get() as usize
426 }
427
428 /// Returns an iterator over all values associated with a translated key.
429 pub fn get(&self, key: &[u8]) -> impl Iterator<Item = &V> {
430 let k = self.translator.transform(key);
431 self.map
432 .get(&k)
433 .map(|record| ImmutableCursor::new(record))
434 .into_iter()
435 .flatten()
436 }
437
438 /// Provides mutable access to the values associated with a translated key, if the key exists.
439 pub fn get_mut(&mut self, key: &[u8]) -> Option<Cursor<'_, T, V>> {
440 let k = self.translator.transform(key);
441 match self.map.entry(k) {
442 Entry::Occupied(entry) => Some(Cursor::<'_, T, V>::new(
443 entry,
444 &self.keys,
445 &self.items,
446 &self.pruned,
447 )),
448 Entry::Vacant(_) => None,
449 }
450 }
451
452 /// Create a new entry in the index.
453 fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<T::Key, Record<V>>, v: V) {
454 keys.inc();
455 items.inc();
456 vacant.insert(Record {
457 value: v,
458 next: None,
459 });
460 }
461
462 /// Provides mutable access to the values associated with a translated key (if the key exists), otherwise
463 /// inserts a new value and returns `None`.
464 pub fn get_mut_or_insert(&mut self, key: &[u8], v: V) -> Option<Cursor<'_, T, V>> {
465 let k = self.translator.transform(key);
466 match self.map.entry(k) {
467 Entry::Occupied(entry) => Some(Cursor::<'_, T, V>::new(
468 entry,
469 &self.keys,
470 &self.items,
471 &self.pruned,
472 )),
473 Entry::Vacant(entry) => {
474 Self::create(&self.keys, &self.items, entry, v);
475 None
476 }
477 }
478 }
479
480 /// Remove all values at the given translated key.
481 pub fn remove(&mut self, key: &[u8]) {
482 // To ensure metrics are accurate, we iterate over all
483 // conflicting values and remove them one-by-one (rather
484 // than just removing the entire entry).
485 self.prune(key, |_| true);
486 }
487
488 /// Insert a value at the given translated key.
489 pub fn insert(&mut self, key: &[u8], v: V) {
490 let k = self.translator.transform(key);
491 match self.map.entry(k) {
492 Entry::Occupied(entry) => {
493 let mut cursor =
494 Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
495 cursor.next();
496 cursor.insert(v);
497 }
498 Entry::Vacant(entry) => {
499 Self::create(&self.keys, &self.items, entry, v);
500 }
501 }
502 }
503
504 /// Insert a value at the given translated key, and prune any values that are no longer valid.
505 ///
506 /// If the value is prunable, it will not be inserted.
507 pub fn insert_and_prune(&mut self, key: &[u8], v: V, prune: impl Fn(&V) -> bool) {
508 let k = self.translator.transform(key);
509 match self.map.entry(k) {
510 Entry::Occupied(entry) => {
511 // Get entry
512 let mut cursor =
513 Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
514
515 // Remove anything that is prunable.
516 loop {
517 let Some(old) = cursor.next() else {
518 break;
519 };
520 if prune(old) {
521 cursor.delete();
522 }
523 }
524
525 // Add our new value (if not prunable).
526 if !prune(&v) {
527 cursor.insert(v);
528 }
529 }
530 Entry::Vacant(entry) => {
531 Self::create(&self.keys, &self.items, entry, v);
532 }
533 }
534 }
535
536 /// Remove all values associated with a translated key that match the `prune` predicate.
537 pub fn prune(&mut self, key: &[u8], prune: impl Fn(&V) -> bool) {
538 let k = self.translator.transform(key);
539 match self.map.entry(k) {
540 Entry::Occupied(entry) => {
541 // Get cursor
542 let mut cursor =
543 Cursor::<'_, T, V>::new(entry, &self.keys, &self.items, &self.pruned);
544
545 // Remove anything that is prunable.
546 loop {
547 let Some(old) = cursor.next() else {
548 break;
549 };
550 if prune(old) {
551 cursor.delete();
552 }
553 }
554 }
555 Entry::Vacant(_) => {}
556 }
557 }
558}
559
560impl<T: Translator, V: Eq> Drop for Index<T, V> {
561 /// To avoid stack overflow on keys with many collisions, we implement an iterative
562 /// drop (in lieu of Rust's default recursive drop).
563 fn drop(&mut self) {
564 for (_, mut record) in self.map.drain() {
565 let mut next = record.next.take();
566 while let Some(mut record) = next {
567 next = record.next.take();
568 }
569 }
570 }
571}