commonware_storage/metadata/
storage.rs1use super::{Config, Error};
2use crate::Context;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_cryptography::{crc32, Crc32};
5use commonware_runtime::{telemetry::metrics::status::GaugeExt, Blob, BufMut, Error as RError};
6use commonware_utils::{sync::AsyncMutex, Span};
7use futures::future::try_join_all;
8use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use tracing::{debug, warn};
11
/// Names of the two alternating blobs within the partition.
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
14
/// Byte range of one encoded value inside a blob's in-memory image.
struct Info {
    // Byte offset of the encoded value (the key's bytes precede it).
    start: usize,
    // Encoded length of the value in bytes.
    length: usize,
}
20
21impl Info {
22 const fn new(start: usize, length: usize) -> Self {
24 Self { start, length }
25 }
26}
27
/// One of the two on-disk blobs plus the bookkeeping needed to update it.
struct Wrapper<B: Blob, K: Span> {
    // Underlying storage blob.
    blob: B,
    // Version stamp held in the blob's first 8 bytes (0 when empty).
    version: u64,
    // Byte range of each key's encoded value within `data`.
    lengths: HashMap<K, Info>,
    // Keys whose values changed since this blob was last written.
    modified: BTreeSet<K>,
    // Full in-memory copy of the blob's contents.
    data: Vec<u8>,
}
36
37impl<B: Blob, K: Span> Wrapper<B, K> {
38 const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
40 Self {
41 blob,
42 version,
43 lengths,
44 modified: BTreeSet::new(),
45 data,
46 }
47 }
48
49 fn empty(blob: B) -> Self {
51 Self {
52 blob,
53 version: 0,
54 lengths: HashMap::new(),
55 modified: BTreeSet::new(),
56 data: Vec::new(),
57 }
58 }
59}
60
/// Mutable sync state shared behind the [Metadata] mutex.
struct State<B: Blob, K: Span> {
    // Index (0 or 1) of the blob holding the most recently synced data.
    cursor: usize,
    // Version that the next sync will write.
    next_version: u64,
    // Version at which the key set last changed (put of a new key, remove,
    // clear, retain); used by sync to decide if an in-place overwrite is safe.
    key_order_changed: u64,
    // The two alternating blobs ("left" and "right").
    blobs: [Wrapper<B, K>; 2],
}
68
/// A keyed metadata store persisted to two alternating blobs; on init the
/// blob with the higher valid version wins.
pub struct Metadata<E: Context, K: Span, V: Codec> {
    // Runtime context used to open/remove blobs and register metrics.
    context: E,

    // In-memory view of all key/value pairs (sorted by key).
    map: BTreeMap<K, V>,
    // Storage partition containing the two blobs.
    partition: String,
    // Sync state, behind an async mutex so `sync` can take `&self`.
    state: AsyncMutex<State<E::Blob, K>>,

    // Number of syncs that patched existing bytes in place.
    sync_overwrites: Counter,
    // Number of syncs that rewrote a full blob.
    sync_rewrites: Counter,
    // Current number of tracked keys.
    keys: Gauge,
}
81
impl<E: Context, K: Span, V: Codec> Metadata<E, K, V> {
    /// Opens (or creates) the "left" and "right" blobs in `cfg.partition`,
    /// adopts whichever holds the higher valid version, and registers metrics.
    ///
    /// Blobs that are too short or fail their checksum are truncated during
    /// load and treated as empty, so a partially written blob is discarded in
    /// favor of its sibling.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Adopt the blob with the higher version; ties go to the left blob.
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        let _ = keys.try_set(map.len());
        Ok(Self {
            context,

            map,
            partition: cfg.partition,
            state: AsyncMutex::new(State {
                cursor,
                next_version,
                // No blob reflects the current key order yet, so the first
                // sync is forced down the full-rewrite path.
                key_order_changed: next_version, blobs: [left_wrapper, right_wrapper],
            }),

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }

    /// Reads and validates one blob, returning its decoded map and wrapper.
    ///
    /// Blob layout: 8-byte big-endian version, concatenated (key, value)
    /// encodings, and a trailing `crc32::Digest::SIZE`-byte big-endian CRC32
    /// over everything before it. A short or checksum-mismatched blob is
    /// truncated to empty (with a warning) rather than failing init.
    ///
    /// Panics if the blob length exceeds `usize` on this platform or if an
    /// entry inside a checksum-valid blob fails to decode.
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        if len == 0 {
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        let len: usize = len.try_into().expect("blob too large for platform");
        let buf = blob.read_at(0, len).await?.coalesce();

        // Must at least hold the version header and the trailing checksum.
        if buf.len() < 8 + crc32::Digest::SIZE {
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Verify the trailing checksum before trusting any decoded content.
        let checksum_index = buf.len() - crc32::Digest::SIZE;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Decode (key, value) pairs sequentially, recording each value's byte
        // range so sync can later patch it in place.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        Ok((
            data,
            Wrapper::new(blob, version, lengths, buf.freeze().into()),
        ))
    }

    /// Returns a reference to the value for `key`, if tracked.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }

    /// Returns a mutable reference to the value for `key`, if tracked.
    ///
    /// The key is marked dirty in both blobs so the (potentially) changed
    /// bytes are persisted when each blob next serves as sync target.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        let value = self.map.get_mut(key)?;

        let state = self.state.get_mut();
        state.blobs[state.cursor].modified.insert(key.clone());
        state.blobs[1 - state.cursor].modified.insert(key.clone());

        Some(value)
    }

    /// Removes all keys and values.
    ///
    /// Records a key-set change so the next syncs perform full rewrites.
    pub fn clear(&mut self) {
        self.map.clear();

        let state = self.state.get_mut();
        state.key_order_changed = state.next_version;
        self.keys.set(0);
    }

    /// Inserts a value, returning the previous value for the key (if any).
    ///
    /// Replacing an existing key marks it dirty in both blobs; inserting a
    /// brand-new key records a key-set change instead (forcing rewrites).
    pub fn put(&mut self, key: K, value: V) -> Option<V> {
        let previous = self.map.insert(key.clone(), value);

        let state = self.state.get_mut();
        if previous.is_some() {
            state.blobs[state.cursor].modified.insert(key.clone());
            state.blobs[1 - state.cursor].modified.insert(key);
        } else {
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());
        previous
    }

    /// Inserts a value and immediately syncs to disk.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }

    /// Mutates the existing value for `key` with `f`, or inserts `f` applied
    /// to `V::default()` if the key is absent.
    pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
    where
        V: Default,
    {
        if let Some(value) = self.get_mut(&key) {
            f(value);
        } else {
            let mut value = V::default();
            f(&mut value);
            self.put(key, value);
        }
    }

    /// [Self::upsert] followed by an immediate sync to disk.
    pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
    where
        V: Default,
    {
        self.upsert(key, f);
        self.sync().await
    }

    /// Removes a key, returning its value if it was tracked.
    ///
    /// An actual removal records a key-set change (forcing rewrites).
    pub fn remove(&mut self, key: &K) -> Option<V> {
        let past = self.map.remove(key);

        if past.is_some() {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());

        past
    }

    /// Returns an iterator over all tracked keys (in sorted order, since the
    /// backing store is a [BTreeMap]).
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.map.keys()
    }

    /// Keeps only the entries for which `f` returns true.
    ///
    /// If anything was dropped, records a key-set change (forcing rewrites).
    pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
        let old_len = self.map.len();
        self.map.retain(|k, v| f(k, v));
        let new_len = self.map.len();

        if new_len != old_len {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
            let _ = self.keys.try_set(self.map.len());
        }
    }

    /// Persists the current map to the non-active blob and makes it active.
    ///
    /// Fast path: if the key set has not changed recently enough to affect the
    /// target blob's layout and every modified value re-encodes to exactly its
    /// previous length, only the changed byte ranges (plus version header and
    /// trailing checksum) are overwritten in place. Otherwise the entire blob
    /// image is re-encoded and rewritten.
    pub async fn sync(&self) -> Result<(), Error> {
        let mut state = self.state.lock().await;

        // Capture the pre-sync state before flipping the cursor.
        let cursor = state.cursor;
        let next_version = state.next_version;
        let key_order_changed = state.key_order_changed;

        let past_version = state.blobs[cursor].version;
        let next_next_version = next_version.checked_add(1).expect("version overflow");

        let target_cursor = 1 - cursor;

        // The target becomes the active blob once this sync completes.
        state.cursor = target_cursor;
        state.next_version = next_next_version;

        let target = &mut state.blobs[target_cursor];

        // Attempt the in-place overwrite only if the last key-set change
        // predates the currently-active blob's version (the target blob was
        // written even earlier, so its layout still matches the key order).
        let mut overwrite = true;
        let mut writes = vec![];
        if key_order_changed < past_version {
            let write_capacity = target.modified.len() + 2;
            writes.reserve(write_capacity);
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Same encoded length: patch the in-memory image and
                    // queue the matching blob write.
                    let encoded = new_value.encode_mut();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(info.start as u64, encoded));
                } else {
                    // Length changed: fall back to a full rewrite. Any queued
                    // writes are dropped un-awaited (futures are lazy) and the
                    // partially patched image is replaced below.
                    overwrite = false;
                    break;
                }
            }
        } else {
            overwrite = false;
        }

        // Both paths leave the target fully consistent with `self.map`.
        target.modified.clear();

        if overwrite {
            // Patch the version header and trailing checksum, then flush all
            // queued range writes together.
            let version = next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(0, version.as_slice().into()));

            let checksum_index = target.data.len() - crc32::Digest::SIZE;
            let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum_index as u64, checksum.as_slice().into()),
            );

            try_join_all(writes).await?;
            target.blob.sync().await?;

            target.version = next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Full rewrite: re-encode every entry in key order, recording each
        // value's byte range for future in-place overwrites.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(next_version);

        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(Crc32::checksum(&next_data[..]));

        target.blob.write_at(0, next_data.clone()).await?;
        // Shrink the blob if the new image is smaller than the previous one.
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        target.version = next_version;
        target.lengths = lengths;
        target.data = next_data;

        self.sync_rewrites.inc();
        Ok(())
    }

    /// Removes both blobs and then the partition itself, consuming the store.
    ///
    /// A missing partition at the final removal step is tolerated; any other
    /// runtime error is propagated.
    pub async fn destroy(self) -> Result<(), Error> {
        let state = self.state.into_inner();
        for (i, wrapper) in state.blobs.into_iter().enumerate() {
            // Release the handle before asking the runtime to remove the blob.
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already gone; nothing left to remove.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}