commonware_storage/metadata/
storage.rs1use super::{Config, Error};
2use crate::Context;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_cryptography::{crc32, Crc32};
5use commonware_runtime::{
6 telemetry::metrics::{Counter, Gauge, GaugeExt, MetricsExt as _},
7 Blob, BufMut, Error as RError,
8};
9use commonware_utils::{sync::AsyncMutex, Span};
10use futures::future::try_join_all;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use tracing::{debug, warn};
13
/// Names of the two blobs (opened inside the configured partition) that hold the alternating
/// copies of the metadata. Index 0 is the "left" blob and index 1 the "right" blob.
const BLOB_NAMES: [&[u8]; 2] = ["left".as_bytes(), "right".as_bytes()];
16
/// Location of one encoded value inside a blob's in-memory byte image.
struct Info {
    /// Number of bytes the encoded value occupies.
    length: usize,
    /// Byte offset at which the encoded value begins.
    start: usize,
}

impl Info {
    /// Describe a value that occupies `length` bytes starting at offset `start`.
    const fn new(start: usize, length: usize) -> Self {
        Info { length, start }
    }
}
29
30struct Wrapper<B: Blob, K: Span> {
32 blob: B,
33 version: u64,
34 lengths: HashMap<K, Info>,
35 modified: BTreeSet<K>,
36 data: Vec<u8>,
37}
38
39impl<B: Blob, K: Span> Wrapper<B, K> {
40 const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
42 Self {
43 blob,
44 version,
45 lengths,
46 modified: BTreeSet::new(),
47 data,
48 }
49 }
50
51 fn empty(blob: B) -> Self {
53 Self {
54 blob,
55 version: 0,
56 lengths: HashMap::new(),
57 modified: BTreeSet::new(),
58 data: Vec::new(),
59 }
60 }
61}
62
63struct State<B: Blob, K: Span> {
65 cursor: usize,
66 next_version: u64,
67 key_order_changed: u64,
68 blobs: [Wrapper<B, K>; 2],
69}
70
71pub struct Metadata<E: Context, K: Span, V: Codec> {
73 context: E,
74
75 map: BTreeMap<K, V>,
76 partition: String,
77 state: AsyncMutex<State<E::Blob, K>>,
78
79 sync_overwrites: Counter,
80 sync_rewrites: Counter,
81 keys: Gauge,
82}
83
84impl<E: Context, K: Span, V: Codec> Metadata<E, K, V> {
85 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
87 let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
89 let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;
90
91 let (left_map, left_wrapper) =
93 Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
94 let (right_map, right_wrapper) =
95 Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;
96
97 let mut map = left_map;
99 let mut cursor = 0;
100 let mut version = left_wrapper.version;
101 if right_wrapper.version > left_wrapper.version {
102 cursor = 1;
103 map = right_map;
104 version = right_wrapper.version;
105 }
106 let next_version = version.checked_add(1).expect("version overflow");
107
108 let sync_rewrites =
110 context.counter("sync_rewrites", "number of syncs that rewrote all data");
111 let sync_overwrites = context.counter(
112 "sync_overwrites",
113 "number of syncs that modified existing data",
114 );
115 let keys = context.gauge("keys", "number of tracked keys");
116
117 let _ = keys.try_set(map.len());
119 Ok(Self {
120 context,
121
122 map,
123 partition: cfg.partition,
124 state: AsyncMutex::new(State {
125 cursor,
126 next_version,
127 key_order_changed: next_version, blobs: [left_wrapper, right_wrapper],
129 }),
130
131 sync_rewrites,
132 sync_overwrites,
133 keys,
134 })
135 }
136
137 async fn load(
138 codec_config: &V::Cfg,
139 index: usize,
140 blob: E::Blob,
141 len: u64,
142 ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
143 if len == 0 {
145 return Ok((BTreeMap::new(), Wrapper::empty(blob)));
147 }
148
149 let len: usize = len.try_into().expect("blob too large for platform");
151 let buf = blob.read_at(0, len).await?.coalesce();
152
153 if buf.len() < 8 + crc32::Digest::SIZE {
157 warn!(
159 blob = index,
160 len = buf.len(),
161 "blob is too short: truncating"
162 );
163 blob.resize(0).await?;
164 blob.sync().await?;
165 return Ok((BTreeMap::new(), Wrapper::empty(blob)));
166 }
167
168 let checksum_index = buf.len() - crc32::Digest::SIZE;
170 let stored_checksum =
171 u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
172 let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
173 if stored_checksum != computed_checksum {
174 warn!(
176 blob = index,
177 stored = stored_checksum,
178 computed = computed_checksum,
179 "checksum mismatch: truncating"
180 );
181 blob.resize(0).await?;
182 blob.sync().await?;
183 return Ok((BTreeMap::new(), Wrapper::empty(blob)));
184 }
185
186 let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());
188
189 let mut data = BTreeMap::new();
194 let mut lengths = HashMap::new();
195 let mut cursor = u64::SIZE;
196 while cursor < checksum_index {
197 let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
199 .expect("unable to read key from blob");
200 cursor += key.encode_size();
201
202 let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
204 .expect("unable to read value from blob");
205 lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
206 cursor += value.encode_size();
207 data.insert(key, value);
208 }
209
210 Ok((
212 data,
213 Wrapper::new(blob, version, lengths, buf.freeze().into()),
214 ))
215 }
216
217 pub fn get(&self, key: &K) -> Option<&V> {
219 self.map.get(key)
220 }
221
222 pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
224 let value = self.map.get_mut(key)?;
226
227 let state = self.state.get_mut();
231 state.blobs[state.cursor].modified.insert(key.clone());
232 state.blobs[1 - state.cursor].modified.insert(key.clone());
233
234 Some(value)
235 }
236
237 pub fn clear(&mut self) {
240 self.map.clear();
242
243 let state = self.state.get_mut();
245 state.key_order_changed = state.next_version;
246 self.keys.set(0);
247 }
248
249 pub fn put(&mut self, key: K, value: V) -> Option<V> {
255 let previous = self.map.insert(key.clone(), value);
257
258 let state = self.state.get_mut();
262 if previous.is_some() {
263 state.blobs[state.cursor].modified.insert(key.clone());
264 state.blobs[1 - state.cursor].modified.insert(key);
265 } else {
266 state.key_order_changed = state.next_version;
267 }
268 let _ = self.keys.try_set(self.map.len());
269 previous
270 }
271
272 pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
277 self.put(key, value);
278 self.sync().await
279 }
280
281 pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
283 where
284 V: Default,
285 {
286 if let Some(value) = self.get_mut(&key) {
287 f(value);
289 } else {
290 let mut value = V::default();
292 f(&mut value);
293 self.put(key, value);
294 }
295 }
296
297 pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
299 where
300 V: Default,
301 {
302 self.upsert(key, f);
303 self.sync().await
304 }
305
306 pub fn remove(&mut self, key: &K) -> Option<V> {
308 let past = self.map.remove(key);
310
311 if past.is_some() {
313 let state = self.state.get_mut();
314 state.key_order_changed = state.next_version;
315 }
316 let _ = self.keys.try_set(self.map.len());
317
318 past
319 }
320
321 pub fn keys(&self) -> impl Iterator<Item = &K> {
323 self.map.keys()
324 }
325
326 pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
328 let old_len = self.map.len();
330 self.map.retain(|k, v| f(k, v));
331 let new_len = self.map.len();
332
333 if new_len != old_len {
335 let state = self.state.get_mut();
336 state.key_order_changed = state.next_version;
337 let _ = self.keys.try_set(self.map.len());
338 }
339 }
340
341 pub async fn sync(&self) -> Result<(), Error> {
343 let mut state = self.state.lock().await;
346
347 let cursor = state.cursor;
349 let next_version = state.next_version;
350 let key_order_changed = state.key_order_changed;
351
352 let past_version = state.blobs[cursor].version;
358 let next_next_version = next_version.checked_add(1).expect("version overflow");
359
360 let target_cursor = 1 - cursor;
362
363 state.cursor = target_cursor;
365 state.next_version = next_next_version;
366
367 let target = &mut state.blobs[target_cursor];
369
370 let mut overwrite = true;
373 let mut writes = vec![];
374 if key_order_changed < past_version {
375 let write_capacity = target.modified.len() + 2;
376 writes.reserve(write_capacity);
377 for key in target.modified.iter() {
378 let info = target.lengths.get(key).expect("key must exist");
379 let new_value = self.map.get(key).expect("key must exist");
380 if info.length == new_value.encode_size() {
381 let encoded = new_value.encode_mut();
383 target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
384 writes.push(target.blob.write_at(info.start as u64, encoded));
385 } else {
386 overwrite = false;
388 break;
389 }
390 }
391 } else {
392 overwrite = false;
394 }
395
396 target.modified.clear();
398
399 if overwrite {
401 let version = next_version.to_be_bytes();
403 target.data[0..8].copy_from_slice(&version);
404 writes.push(target.blob.write_at(0, version.as_slice().into()));
405
406 let checksum_index = target.data.len() - crc32::Digest::SIZE;
408 let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
409 target.data[checksum_index..].copy_from_slice(&checksum);
410 writes.push(
411 target
412 .blob
413 .write_at(checksum_index as u64, checksum.as_slice().into()),
414 );
415
416 try_join_all(writes).await?;
418 target.blob.sync().await?;
419
420 target.version = next_version;
422 self.sync_overwrites.inc();
423 return Ok(());
424 }
425
426 let mut lengths = HashMap::new();
428 let mut next_data = Vec::with_capacity(target.data.len());
429 next_data.put_u64(next_version);
430
431 for (key, value) in &self.map {
433 key.write(&mut next_data);
434 let start = next_data.len();
435 value.write(&mut next_data);
436 lengths.insert(key.clone(), Info::new(start, value.encode_size()));
437 }
438 next_data.put_u32(Crc32::checksum(&next_data[..]));
439
440 if next_data.len() < target.data.len() {
442 target.blob.write_at(0, next_data.clone()).await?;
443 target.blob.resize(next_data.len() as u64).await?;
444 target.blob.sync().await?;
445 } else {
446 target.blob.write_at_sync(0, next_data.clone()).await?;
449 }
450
451 target.version = next_version;
453 target.lengths = lengths;
454 target.data = next_data;
455
456 self.sync_rewrites.inc();
457 Ok(())
458 }
459
460 pub async fn destroy(self) -> Result<(), Error> {
462 let state = self.state.into_inner();
463 for (i, wrapper) in state.blobs.into_iter().enumerate() {
464 drop(wrapper.blob);
465 self.context
466 .remove(&self.partition, Some(BLOB_NAMES[i]))
467 .await?;
468 debug!(blob = i, "destroyed blob");
469 }
470 match self.context.remove(&self.partition, None).await {
471 Ok(()) => {}
472 Err(RError::PartitionMissing(_)) => {
473 }
475 Err(err) => return Err(Error::Runtime(err)),
476 }
477 Ok(())
478 }
479}