commonware_storage/index/
unordered.rs1use crate::{
6 index::{
7 storage::{Cursor as CursorImpl, ImmutableCursor, IndexEntry, Record},
8 Cursor as CursorTrait, Unordered,
9 },
10 translator::Translator,
11};
12use commonware_runtime::Metrics;
13use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
14use std::collections::{
15 hash_map::{Entry, OccupiedEntry, VacantEntry},
16 HashMap,
17};
18
19const INITIAL_CAPACITY: usize = 256;
23
24impl<K: Send + Sync, V: Eq + Send + Sync> IndexEntry<V> for OccupiedEntry<'_, K, Record<V>> {
26 fn get(&self) -> &V {
27 &self.get().value
28 }
29 fn get_mut(&mut self) -> &mut Record<V> {
30 self.get_mut()
31 }
32 fn remove(self) {
33 OccupiedEntry::remove(self);
34 }
35}
36
37pub struct Cursor<'a, K: Send + Sync, V: Eq + Send + Sync> {
39 inner: CursorImpl<'a, V, OccupiedEntry<'a, K, Record<V>>>,
40}
41
42impl<'a, K: Send + Sync, V: Eq + Send + Sync> Cursor<'a, K, V> {
43 const fn new(
44 entry: OccupiedEntry<'a, K, Record<V>>,
45 keys: &'a Gauge,
46 items: &'a Gauge,
47 pruned: &'a Counter,
48 ) -> Self {
49 Self {
50 inner: CursorImpl::<'a, V, OccupiedEntry<'a, K, Record<V>>>::new(
51 entry, keys, items, pruned,
52 ),
53 }
54 }
55}
56
57impl<K: Send + Sync, V: Eq + Send + Sync> CursorTrait for Cursor<'_, K, V> {
58 type Value = V;
59
60 fn next(&mut self) -> Option<&V> {
61 self.inner.next()
62 }
63
64 fn insert(&mut self, value: V) {
65 self.inner.insert(value)
66 }
67
68 fn delete(&mut self) {
69 self.inner.delete()
70 }
71
72 fn update(&mut self, value: V) {
73 self.inner.update(value)
74 }
75
76 fn prune(&mut self, predicate: &impl Fn(&V) -> bool) {
77 self.inner.prune(predicate)
78 }
79}
80
81pub struct Index<T: Translator, V: Eq + Send + Sync> {
84 translator: T,
85 map: HashMap<T::Key, Record<V>, T>,
86
87 keys: Gauge,
88 items: Gauge,
89 pruned: Counter,
90}
91
92impl<T: Translator, V: Eq + Send + Sync> Index<T, V> {
93 fn create(keys: &Gauge, items: &Gauge, vacant: VacantEntry<'_, T::Key, Record<V>>, v: V) {
95 keys.inc();
96 items.inc();
97 vacant.insert(Record {
98 value: v,
99 next: None,
100 });
101 }
102
103 pub fn new(ctx: impl Metrics, translator: T) -> Self {
105 let s = Self {
106 translator: translator.clone(),
107 map: HashMap::with_capacity_and_hasher(INITIAL_CAPACITY, translator),
108 keys: Gauge::default(),
109 items: Gauge::default(),
110 pruned: Counter::default(),
111 };
112 ctx.register(
113 "keys",
114 "Number of translated keys in the index",
115 s.keys.clone(),
116 );
117 ctx.register("items", "Number of items in the index", s.items.clone());
118 ctx.register("pruned", "Number of items pruned", s.pruned.clone());
119 s
120 }
121}
122
123impl<T: Translator, V: Eq + Send + Sync> super::Factory<T> for Index<T, V> {
124 fn new(ctx: impl commonware_runtime::Metrics, translator: T) -> Self {
125 Self::new(ctx, translator)
126 }
127}
128
129impl<T: Translator, V: Eq + Send + Sync> Unordered for Index<T, V> {
130 type Value = V;
131 type Cursor<'a>
132 = Cursor<'a, T::Key, V>
133 where
134 Self: 'a;
135
136 fn get<'a>(&'a self, key: &[u8]) -> impl Iterator<Item = &'a V> + 'a
137 where
138 V: 'a,
139 {
140 let k = self.translator.transform(key);
141 self.map
142 .get(&k)
143 .map(|record| ImmutableCursor::new(record))
144 .into_iter()
145 .flatten()
146 }
147
148 fn get_mut<'a>(&'a mut self, key: &[u8]) -> Option<Self::Cursor<'a>> {
149 let k = self.translator.transform(key);
150 match self.map.entry(k) {
151 Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
152 entry,
153 &self.keys,
154 &self.items,
155 &self.pruned,
156 )),
157 Entry::Vacant(_) => None,
158 }
159 }
160
161 fn get_mut_or_insert<'a>(&'a mut self, key: &[u8], value: V) -> Option<Self::Cursor<'a>> {
162 let k = self.translator.transform(key);
163 match self.map.entry(k) {
164 Entry::Occupied(entry) => Some(Cursor::<'_, T::Key, V>::new(
165 entry,
166 &self.keys,
167 &self.items,
168 &self.pruned,
169 )),
170 Entry::Vacant(entry) => {
171 Self::create(&self.keys, &self.items, entry, value);
172 None
173 }
174 }
175 }
176
177 fn insert(&mut self, key: &[u8], v: V) {
178 let k = self.translator.transform(key);
179 match self.map.entry(k) {
180 Entry::Occupied(entry) => {
181 let mut cursor =
182 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
183 cursor.next();
184 cursor.insert(v);
185 }
186 Entry::Vacant(entry) => {
187 Self::create(&self.keys, &self.items, entry, v);
188 }
189 }
190 }
191
192 fn insert_and_prune(&mut self, key: &[u8], value: V, predicate: impl Fn(&V) -> bool) {
193 let k = self.translator.transform(key);
194 match self.map.entry(k) {
195 Entry::Occupied(entry) => {
196 let mut cursor =
198 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
199
200 cursor.prune(&predicate);
201
202 if !predicate(&value) {
204 cursor.insert(value);
205 }
206 }
207 Entry::Vacant(entry) => {
208 Self::create(&self.keys, &self.items, entry, value);
209 }
210 }
211 }
212
213 fn prune(&mut self, key: &[u8], predicate: impl Fn(&V) -> bool) {
214 let k = self.translator.transform(key);
215 match self.map.entry(k) {
216 Entry::Occupied(entry) => {
217 let mut cursor =
219 Cursor::<'_, T::Key, V>::new(entry, &self.keys, &self.items, &self.pruned);
220
221 cursor.prune(&predicate);
222 }
223 Entry::Vacant(_) => {}
224 }
225 }
226
227 fn remove(&mut self, key: &[u8]) {
228 self.prune(key, |_| true);
231 }
232
233 #[cfg(test)]
234 fn keys(&self) -> usize {
235 self.map.len()
236 }
237
238 #[cfg(test)]
239 fn items(&self) -> usize {
240 self.items.get() as usize
241 }
242
243 #[cfg(test)]
244 fn pruned(&self) -> usize {
245 self.pruned.get() as usize
246 }
247}
248
249impl<T: Translator, V: Eq + Send + Sync> Drop for Index<T, V> {
250 fn drop(&mut self) {
253 for (_, mut record) in self.map.drain() {
254 let mut next = record.next.take();
255 while let Some(mut record) = next {
256 next = record.next.take();
257 }
258 }
259 }
260}