commonware_storage/index/
unordered.rs1use crate::{
6 index::{
7 storage::{Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
8 Cursor as CursorTrait, Unordered,
9 },
10 translator::Translator,
11};
12use commonware_runtime::Metrics;
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{
15 hash_map::{Entry, OccupiedEntry, VacantEntry},
16 HashMap,
17};
18
19const INITIAL_CAPACITY: usize = 256;
23
24impl<K: Send + Sync, V: Eq + Send + Sync> IndexEntry<V> for OccupiedEntry<'_, K, Record<V>> {
26 fn get(&self) -> &V {
27 &self.get().value
28 }
29 fn get_mut(&mut self) -> &mut Record<V> {
30 self.get_mut()
31 }
32 fn remove(self) {
33 OccupiedEntry::remove(self);
34 }
35}
36
37pub struct Cursor<'a, K: Send + Sync, V: Eq + Send + Sync> {
39 inner: CursorImpl<'a, V, OccupiedEntry<'a, K, Record<V>>>,
40}
41
42impl<'a, K: Send + Sync, V: Eq + Send + Sync> Cursor<'a, K, V> {
43 const fn new(
44 entry: OccupiedEntry<'a, K, Record<V>>,
45 keys: &'a Gauge,
46 items: &'a Gauge,
47 pruned: &'a Counter,
48 ) -> Self {
49 Self {
50 inner: CursorImpl::<'a, V, OccupiedEntry<'a, K, Record<V>>>::new(
51 entry, keys, items, pruned,
52 ),
53 }
54 }
55}
56
57impl<K: Send + Sync, V: Eq + Send + Sync> CursorTrait for Cursor<'_, K, V> {
58 type Value = V;
59
60 fn next(&mut self) -> Option<&V> {
61 self.inner.next()
62 }
63
64 fn insert(&mut self, value: V) {
65 self.inner.insert(value)
66 }
67
68 fn delete(&mut self) {
69 self.inner.delete()
70 }
71
72 fn update(&mut self, value: V) {
73 self.inner.update(value)
74 }
75
76 fn prune(&mut self, predicate: &impl Fn(&V) -> bool) {
77 self.inner.prune(predicate)
78 }
79}
80
81pub struct Index<T: Translator, V: Eq + Send + Sync> {
84 translator: T,
85 map: HashMap<T::Key, Record<V>, T>,
86
87 keys: Gauge,
88 items: Gauge,
89 pruned: Counter,
90}
91
92impl<T: Translator, V: Eq + Send + Sync> Index<T, V> {
93 fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<'_, T::Key, Record<V>>, v: V) {
95 keys.inc();
96 items.inc();
97 vacant.insert(Record {
98 value: v,
99 next: None,
100 });
101 }
102
103 pub fn new(ctx: impl Metrics, translator: T) -> Self {
105 let s = Self {
106 translator: translator.clone(),
107 map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, translator),
108 keys: Gauge::default(),
109 items: Gauge::default(),
110 pruned: Counter::default(),
111 };
112 ctx.register(
113 "keys",
114 "Number of translated keys in the index",
115 s.keys.clone(),
116 );
117 ctx.register("items", "Number of items in the index", s.items.clone());
118 ctx.register("pruned", "Number of items pruned", s.pruned.clone());
119 s
120 }
121}
122
123impl<T: Translator, V: Eq + Send + Sync> Unordered for Index<T, V> {
124 type Value = V;
125 type Cursor<'a>
126 = Cursor<'a, T::Key, V>
127 where
128 Self: 'a;
129
130 fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
131 where
132 V: 'a,
133 {
134 let k = self.translator.transform(key);
135 self.map
136 .get(&k)
137 .map(|record| ImmutableCursor::new(record))
138 .into_iter()
139 .flatten()
140 }
141
142 fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
143 let k = self.translator.transform(key);
144 match self.map.entry(k) {
145 Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
146 entry,
147 &self.keys,
148 &self.items,
149 &self.pruned,
150 )),
151 Entry::Vacant(_) => None,
152 }
153 }
154
155 fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
156 let k = self.translator.transform(key);
157 match self.map.entry(k) {
158 Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
159 entry,
160 &self.keys,
161 &self.items,
162 &self.pruned,
163 )),
164 Entry::Vacant(entry) => {
165 Self::create(&self.keys, &self.items, entry, value);
166 None
167 }
168 }
169 }
170
171 fn insert(&mut self, key: &[u8], v: V) {
172 let k = self.translator.transform(key);
173 match self.map.entry(k) {
174 Entry::Occupied(entry) => {
175 let mut cursor =
176 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
177 cursor.next();
178 cursor.insert(v);
179 }
180 Entry::Vacant(entry) => {
181 Self::create(&self.keys, &self.items, entry, v);
182 }
183 }
184 }
185
186 fn insert_and_prune(&mut self, key: &[u8], value: V, predicate: impl Fn(&V) -> bool) {
187 let k = self.translator.transform(key);
188 match self.map.entry(k) {
189 Entry::Occupied(entry) => {
190 let mut cursor =
192 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
193
194 cursor.prune(&predicate);
195
196 if !predicate(&value) {
198 cursor.insert(value);
199 }
200 }
201 Entry::Vacant(entry) => {
202 Self::create(&self.keys, &self.items, entry, value);
203 }
204 }
205 }
206
207 fn prune(&mut self, key: &[u8], predicate: impl Fn(&V) -> bool) {
208 let k = self.translator.transform(key);
209 match self.map.entry(k) {
210 Entry::Occupied(entry) => {
211 let mut cursor =
213 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
214
215 cursor.prune(&predicate);
216 }
217 Entry::Vacant(_) => {}
218 }
219 }
220
221 fn remove(&mut self, key: &[u8]) {
222 self.prune(key, |_| true);
225 }
226
227 #[cfg(test)]
228 fn keys(&self) -> usize {
229 self.map.len()
230 }
231
232 #[cfg(test)]
233 fn items(&self) -> usize {
234 self.items.get() as usize
235 }
236
237 #[cfg(test)]
238 fn pruned(&self) -> usize {
239 self.pruned.get() as usize
240 }
241}
242
243impl<T: Translator, V: Eq + Send + Sync> Drop for Index<T, V> {
244 fn drop(&mut self) {
247 for (_, mut record) in self.map.drain() {
248 let mut next = record.next.take();
249 while let Some(mut record) = next {
250 next = record.next.take();
251 }
252 }
253 }
254}