// commonware_storage/metadata/storage.rs
use super::{Config, Error};
2use bytes::BufMut;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_cryptography::{crc32, Crc32};
5use commonware_runtime::{
6 telemetry::metrics::status::GaugeExt, Blob, Clock, Error as RError, Metrics, Storage,
7};
8use commonware_utils::Span;
9use futures::future::try_join_all;
10use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use tracing::{debug, warn};
13
/// Names of the two alternating backing blobs within the partition.
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
16
/// Location of one serialized value within a blob's in-memory image.
struct Info {
    // Byte offset where the encoded value starts.
    start: usize,
    // Encoded length of the value in bytes.
    length: usize,
}

impl Info {
    /// Creates a new value-location record.
    const fn new(start: usize, length: usize) -> Self {
        Self { start, length }
    }
}
29
/// In-memory state tracking one backing blob.
struct Wrapper<B: Blob, K: Span> {
    // Handle to the underlying blob.
    blob: B,
    // Version stamp currently stored in this blob (0 if empty).
    version: u64,
    // Per-key location of each encoded value within `data`.
    lengths: HashMap<K, Info>,
    // Keys whose values changed since this blob was last written.
    modified: BTreeSet<K>,
    // Full in-memory copy of the blob's contents.
    data: Vec<u8>,
}
38
39impl<B: Blob, K: Span> Wrapper<B, K> {
40 const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
42 Self {
43 blob,
44 version,
45 lengths,
46 modified: BTreeSet::new(),
47 data,
48 }
49 }
50
51 fn empty(blob: B) -> Self {
53 Self {
54 blob,
55 version: 0,
56 lengths: HashMap::new(),
57 modified: BTreeSet::new(),
58 data: Vec::new(),
59 }
60 }
61}
62
/// A versioned, checksummed key-value store kept fully in memory and
/// persisted to two alternating blobs on `sync`.
pub struct Metadata<E: Clock + Storage + Metrics, K: Span, V: Codec> {
    // Runtime handle used for storage access and metrics registration.
    context: E,

    // Authoritative in-memory view of all key-value pairs.
    map: BTreeMap<K, V>,
    // Index (0 or 1) of the blob holding the latest synced version.
    cursor: usize,
    // Version at which the set of keys last changed (insert/remove/clear);
    // `sync` uses this to decide whether in-place patching is safe.
    key_order_changed: u64,
    // Version the next sync will write.
    next_version: u64,
    // Storage partition containing both blobs.
    partition: String,
    // The two alternating backing blobs (left, right).
    blobs: [Wrapper<E::Blob, K>; 2],

    // Number of syncs that patched existing values in place.
    sync_overwrites: Counter,
    // Number of syncs that rewrote the entire target blob.
    sync_rewrites: Counter,
    // Current number of tracked keys.
    keys: Gauge,
}
78
79impl<E: Clock + Storage + Metrics, K: Span, V: Codec> Metadata<E, K, V> {
    /// Opens both backing blobs in `cfg.partition`, restores state from the
    /// blob with the higher version, and registers metrics on `context`.
    ///
    /// # Errors
    ///
    /// Returns an error if a blob cannot be opened, read, or is too large.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open (or create) the two alternating blobs.
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        // Parse both blobs; short or corrupt blobs come back truncated/empty.
        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Adopt whichever blob carries the higher version (ties go left).
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        // Register metrics for sync behavior and key count.
        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        let _ = keys.try_set(map.len());
        Ok(Self {
            context,

            map,
            cursor,
            // Starting `key_order_changed` at `next_version` means the first
            // sync cannot take the in-place patch path and will rewrite its
            // target blob in full.
            key_order_changed: next_version,
            next_version,
            partition: cfg.partition,
            blobs: [left_wrapper, right_wrapper],

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }
136
    /// Reads and validates one blob, returning the decoded key-value map and
    /// a `Wrapper` tracking the blob's on-disk image.
    ///
    /// A blob that is empty, too short to hold a version + checksum, or that
    /// fails its CRC32 check is truncated to zero length and returned as
    /// empty rather than treated as an error (the sibling blob may still hold
    /// a usable snapshot).
    ///
    /// # Panics
    ///
    /// Panics if a blob that passed the checksum contains an undecodable key
    /// or value (a checksum-valid blob is assumed well-formed).
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // A never-written blob has no state to restore.
        if len == 0 {
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read the entire blob into memory.
        let len = len.try_into().map_err(|_| Error::BlobTooLarge(len))?;
        let buf = blob.read_at(vec![0u8; len], 0).await?;

        // Layout: 8-byte version | entries | CRC32 checksum. Anything shorter
        // than version + checksum cannot be valid.
        if buf.len() < 8 + crc32::Digest::SIZE {
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Verify the trailing checksum over everything before it.
        let checksum_index = buf.len() - crc32::Digest::SIZE;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Leading 8 bytes are the big-endian version stamp.
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Decode (key, value) pairs sequentially, recording each value's
        // offset and length so later syncs can patch values in place.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        Ok((data, Wrapper::new(blob, version, lengths, buf.into())))
    }
213
    /// Returns a reference to the value for `key`, if present.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }
218
219 pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
221 let value = self.map.get_mut(key)?;
223
224 self.blobs[self.cursor].modified.insert(key.clone());
228 self.blobs[1 - self.cursor].modified.insert(key.clone());
229
230 Some(value)
231 }
232
233 pub fn clear(&mut self) {
236 self.map.clear();
238
239 self.key_order_changed = self.next_version;
241 self.keys.set(0);
242 }
243
244 pub fn put(&mut self, key: K, value: V) -> Option<V> {
250 let previous = self.map.insert(key.clone(), value);
252
253 if previous.is_some() {
257 self.blobs[self.cursor].modified.insert(key.clone());
258 self.blobs[1 - self.cursor].modified.insert(key);
259 } else {
260 self.key_order_changed = self.next_version;
261 }
262 let _ = self.keys.try_set(self.map.len());
263 previous
264 }
265
    /// Inserts `key`/`value` and immediately persists all state via `sync`.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }
271
272 pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
274 where
275 V: Default,
276 {
277 if let Some(value) = self.get_mut(&key) {
278 f(value);
280 } else {
281 let mut value = V::default();
283 f(&mut value);
284 self.put(key, value);
285 }
286 }
287
    /// Applies `f` to the value under `key` (inserting `V::default()` first
    /// if absent) and immediately persists all state via `sync`.
    pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
    where
        V: Default,
    {
        self.upsert(key, f);
        self.sync().await
    }
296
297 pub fn remove(&mut self, key: &K) -> Option<V> {
299 let past = self.map.remove(key);
301
302 if past.is_some() {
304 self.key_order_changed = self.next_version;
305 }
306 let _ = self.keys.try_set(self.map.len());
307
308 past
309 }
310
    /// Returns an iterator over all tracked keys in sorted order.
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.map.keys()
    }
315
316 pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
318 let old_len = self.map.len();
320 self.map.retain(|k, v| f(k, v));
321 let new_len = self.map.len();
322
323 if new_len != old_len {
325 self.key_order_changed = self.next_version;
326 let _ = self.keys.try_set(self.map.len());
327 }
328 }
329
    /// Persists the current state to the non-active blob, then makes that
    /// blob active and advances the version.
    ///
    /// Two paths:
    /// - **Overwrite** (cheap): if no keys were added/removed since the
    ///   target blob was written and every modified value re-encodes to the
    ///   same size, only the changed values, the version, and the checksum
    ///   are patched in place.
    /// - **Rewrite**: otherwise the target blob is fully re-serialized.
    ///
    /// # Errors
    ///
    /// Returns an error if any blob write, resize, or sync fails.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let past_version = self.blobs[self.cursor].version;
        let next_next_version = self.next_version.checked_add(1).expect("version overflow");

        // The non-active blob is the write target.
        let target_cursor = 1 - self.cursor;
        let target = &mut self.blobs[target_cursor];

        // Attempt the in-place overwrite path: only valid if the key layout
        // has not changed since before the active blob was written.
        let mut overwrite = true;
        let mut writes = Vec::with_capacity(target.modified.len());
        if self.key_order_changed < past_version {
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Same encoded size: patch the cached image and queue a
                    // (lazy) write of just this value.
                    let encoded = new_value.encode_mut();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(encoded, info.start as u64));
                } else {
                    // Size changed: fall back to a full rewrite. Queued write
                    // futures are lazy and are simply dropped un-executed;
                    // the rewrite below rebuilds `target.data` from scratch.
                    overwrite = false;
                    break;
                }
            }
        } else {
            overwrite = false;
        }

        // The target blob is about to reflect all pending changes either way.
        target.modified.clear();

        if overwrite {
            // Patch the leading version stamp.
            let version = self.next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(version.as_slice().into(), 0));

            // Recompute and patch the trailing checksum.
            let checksum_index = target.data.len() - crc32::Digest::SIZE;
            let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum.as_slice().into(), checksum_index as u64),
            );

            // Issue all patches concurrently, then durably sync the blob.
            try_join_all(writes).await?;
            target.blob.sync().await?;

            target.version = self.next_version;
            self.cursor = target_cursor;
            self.next_version = next_next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Rewrite path: re-serialize version | entries | checksum in full,
        // rebuilding the per-key offset table as we go.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(self.next_version);
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(Crc32::checksum(&next_data[..]));

        target.blob.write_at(next_data.clone(), 0).await?;
        // Trim any leftover bytes from a previously larger image.
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        target.version = self.next_version;
        target.lengths = lengths;
        target.data = next_data;
        self.cursor = target_cursor;
        self.next_version = next_next_version;
        self.sync_rewrites.inc();
        Ok(())
    }
426
    /// Removes both blobs and then the containing partition.
    ///
    /// A missing partition is tolerated; any other removal failure is
    /// returned as an error.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, wrapper) in self.blobs.into_iter().enumerate() {
            // Release the blob handle before asking storage to remove it.
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
445}