// commonware_storage/metadata/storage.rs
use super::{Config, Error};
2use bytes::BufMut;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_runtime::{Blob, Clock, Error as RError, Metrics, Storage};
5use commonware_utils::Array;
6use futures::future::try_join_all;
7use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use tracing::{debug, warn};
10
// Names of the two backing blobs. Writes alternate between them ("left" and
// "right") so the previous snapshot is never overwritten in place.
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
/// Location of a single encoded value inside a blob's serialized buffer.
///
/// Recorded at load/rewrite time so a later sync can patch a value in place
/// when its re-encoded size is unchanged.
#[derive(Debug, Clone, Copy)]
struct Info {
    /// Byte offset of the encoded value within the buffer.
    start: usize,
    /// Length of the encoded value in bytes.
    length: usize,
}

impl Info {
    /// Creates an `Info` for a value beginning at `start` and spanning
    /// `length` bytes.
    const fn new(start: usize, length: usize) -> Self {
        Self { start, length }
    }
}
26
// In-memory state tracked for one of the two backing blobs.
struct Wrapper<B: Blob, K: Array> {
    // Handle to the underlying blob.
    blob: B,
    // Version from this blob's 8-byte header (0 for an empty blob).
    version: u64,
    // Offset/length of each value inside `data`, keyed by entry key.
    lengths: HashMap<K, Info>,
    // Keys whose values changed since this blob last held a full snapshot;
    // consulted by `sync` when attempting an in-place overwrite.
    modified: BTreeSet<K>,
    // Cached copy of the blob's full serialized contents
    // (version header + entries + trailing checksum).
    data: Vec<u8>,
}
35
36impl<B: Blob, K: Array> Wrapper<B, K> {
37 fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
39 Self {
40 blob,
41 version,
42 lengths,
43 modified: BTreeSet::new(),
44 data,
45 }
46 }
47
48 fn empty(blob: B) -> Self {
50 Self {
51 blob,
52 version: 0,
53 lengths: HashMap::new(),
54 modified: BTreeSet::new(),
55 data: Vec::new(),
56 }
57 }
58}
59
// Crash-safe key/value metadata store persisted across two alternating blobs.
pub struct Metadata<E: Clock + Storage + Metrics, K: Array, V: Codec> {
    // Runtime context providing blob storage and metrics registration.
    context: E,

    // In-memory, key-ordered view of all entries.
    map: BTreeMap<K, V>,
    // Index (0 or 1) of the blob holding the most recently synced snapshot.
    cursor: usize,
    // Version at which the set of keys last changed; `sync` compares this
    // against the active blob's version to decide whether an in-place
    // overwrite of the target blob is still valid.
    key_order_changed: u64,
    // Version that will be assigned to the next synced snapshot.
    next_version: u64,
    // Storage partition containing both blobs.
    partition: String,
    // The two backing blobs ("left" at index 0, "right" at index 1).
    blobs: [Wrapper<E::Blob, K>; 2],

    // Count of syncs that patched existing data in place.
    sync_overwrites: Counter,
    // Count of syncs that rewrote the target blob from scratch.
    sync_rewrites: Counter,
    // Number of keys currently tracked.
    keys: Gauge,
}
75
impl<E: Clock + Storage + Metrics, K: Array, V: Codec> Metadata<E, K, V> {
    /// Opens (or creates) both backing blobs in `cfg.partition`, adopts the
    /// snapshot with the higher version, and registers metrics.
    ///
    /// Corrupt or truncated blobs are reset to empty during load rather than
    /// treated as errors, so init succeeds as long as the storage layer does.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open both blobs (a missing blob opens empty with length 0).
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        // Parse each blob into a map plus retained raw state.
        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Adopt whichever snapshot carries the higher version (ties favor left).
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        // Register metrics for sync behavior and the tracked key count.
        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        keys.set(map.len() as i64);
        Ok(Self {
            context,

            map,
            cursor,
            // Initializing `key_order_changed` to `next_version` means the
            // overwrite check in `sync` (`key_order_changed < past_version`)
            // fails on the first sync, forcing a full rewrite before any
            // in-place patching is attempted.
            key_order_changed: next_version, next_version,
            partition: cfg.partition,
            blobs: [left_wrapper, right_wrapper],

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }

    /// Parses one blob's contents into an ordered map plus a `Wrapper`
    /// retaining the raw bytes and per-value offsets.
    ///
    /// On-disk layout: an 8-byte big-endian version, repeated (key, value)
    /// encodings, then a 4-byte big-endian CRC32 over everything before it.
    /// A blob that is too short or fails the checksum is truncated to empty
    /// (the sibling blob still holds the previous snapshot).
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // Nothing persisted yet.
        if len == 0 {
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read the entire blob into memory (errors if len exceeds usize).
        let len = len.try_into().map_err(|_| Error::BlobTooLarge(len))?;
        let buf = blob.read_at(vec![0u8; len], 0).await?;

        // Must hold at least the 8-byte version and the 4-byte checksum.
        if buf.len() < 12 {
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Verify the trailing CRC32 before trusting any contents.
        let checksum_index = buf.len() - 4;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = crc32fast::hash(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Header: version of this snapshot.
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Decode (key, value) pairs, recording where each value lives so a
        // later sync can overwrite same-length values in place.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            // Decoding failures panic: the checksum above already validated
            // these bytes, so a failure here indicates a codec-level bug.
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        Ok((data, Wrapper::new(blob, version, lengths, buf.into())))
    }

    /// Returns the value stored under `key`, if any.
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }

    /// Returns a mutable reference to the value under `key`, if any.
    ///
    /// The key is conservatively marked modified in BOTH blobs (the caller
    /// may mutate through the returned reference), so each blob persists the
    /// new bytes the next time it becomes the sync target.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        let value = self.map.get_mut(key)?;

        self.blobs[self.cursor].modified.insert(key.clone());
        self.blobs[1 - self.cursor].modified.insert(key.clone());

        Some(value)
    }

    /// Removes all entries (in memory only; call `sync` to persist).
    pub fn clear(&mut self) {
        self.map.clear();

        // The key set changed, so in-place overwrites are no longer valid.
        self.key_order_changed = self.next_version;
        self.keys.set(0);
    }

    /// Inserts or replaces the value under `key` (in memory only; call
    /// `sync` to persist).
    pub fn put(&mut self, key: K, value: V) {
        let exists = self.map.insert(key.clone(), value).is_some();

        if exists {
            // Key set unchanged: both blobs may still be patched in place.
            self.blobs[self.cursor].modified.insert(key.clone());
            self.blobs[1 - self.cursor].modified.insert(key.clone());
        } else {
            // New key: the serialized layout changes, forcing full rewrites.
            self.key_order_changed = self.next_version;
        }
        self.keys.set(self.map.len() as i64);
    }

    /// Inserts or replaces the value under `key`, then immediately syncs.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }

    /// Removes `key`, returning its previous value (if any). In memory only;
    /// call `sync` to persist.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        let past = self.map.remove(key);

        if past.is_some() {
            // The key set changed: subsequent syncs must rewrite.
            self.key_order_changed = self.next_version;
        }
        self.keys.set(self.map.len() as i64);

        past
    }

    /// Iterates over all keys (in `BTreeMap` sorted order), optionally
    /// restricted to those whose byte representation starts with `prefix`.
    pub fn keys<'a>(&'a self, prefix: Option<&'a [u8]>) -> impl Iterator<Item = &'a K> + 'a {
        self.map.keys().filter(move |key| {
            if let Some(prefix_bytes) = prefix {
                key.as_ref().starts_with(prefix_bytes)
            } else {
                true
            }
        })
    }

    /// Removes every key whose byte representation starts with `prefix`.
    /// In memory only; call `sync` to persist.
    pub fn remove_prefix(&mut self, prefix: &[u8]) {
        self.map.retain(|key, _| !key.as_ref().starts_with(prefix));

        // Marked unconditionally (even if nothing matched), so the next sync
        // performs a full rewrite.
        self.key_order_changed = self.next_version;
        self.keys.set(self.map.len() as i64);
    }

    /// Persists the current state to the non-active blob and flips `cursor`.
    ///
    /// Fast path: if the key set has not changed recently enough for the
    /// target blob's layout to be stale, and every modified value re-encodes
    /// to its previous length, values are patched in place (plus new version
    /// header and checksum). Otherwise the target blob is fully rewritten.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let past_version = self.blobs[self.cursor].version;
        let next_next_version = self.next_version.checked_add(1).expect("version overflow");

        // Always write to the blob NOT holding the latest snapshot, so a
        // crash mid-write leaves the previous snapshot intact.
        let target_cursor = 1 - self.cursor;
        let target = &mut self.blobs[target_cursor];

        // Attempt an in-place overwrite. Versions advance by exactly 1 per
        // sync, so `key_order_changed < past_version` implies the key set has
        // not changed since the target blob's snapshot was written.
        let mut overwrite = true;
        let mut writes = Vec::with_capacity(target.modified.len());
        if self.key_order_changed < past_version {
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Same size: patch the cached buffer and stage a write.
                    let encoded = new_value.encode();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(encoded, info.start as u64));
                } else {
                    // Size changed: later offsets would shift, so fall back to
                    // a full rewrite. The staged write futures are lazy and
                    // simply dropped; `target.data` is rebuilt below.
                    overwrite = false;
                    break;
                }
            }
        } else {
            overwrite = false;
        }

        // Either path below leaves the target fully consistent with `map`.
        target.modified.clear();

        if overwrite {
            // Patch the 8-byte version header.
            let version = self.next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(version.as_slice().into(), 0));

            // Recompute and patch the trailing checksum.
            let checksum_index = target.data.len() - 4;
            let checksum = crc32fast::hash(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum.as_slice().into(), checksum_index as u64),
            );

            // Issue all staged writes concurrently, then durably sync.
            try_join_all(writes).await?;
            target.blob.sync().await?;

            // The target now holds the newest snapshot: make it active.
            target.version = self.next_version;
            self.cursor = target_cursor;
            self.next_version = next_next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Full rewrite: serialize the version header, all entries (in key
        // order), and the checksum into a fresh buffer, rebuilding `lengths`.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(self.next_version);
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(crc32fast::hash(&next_data[..]));

        // Write everything at once, shrink the blob if it got smaller, sync.
        target.blob.write_at(next_data.clone(), 0).await?;
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        // The target now holds the newest snapshot: make it active.
        target.version = self.next_version;
        target.lengths = lengths;
        target.data = next_data;
        self.cursor = target_cursor;
        self.next_version = next_next_version;
        self.sync_rewrites.inc();
        Ok(())
    }

    /// Syncs any outstanding changes and flushes both blobs before dropping.
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        for wrapper in self.blobs.into_iter() {
            wrapper.blob.sync().await?;
        }
        Ok(())
    }

    /// Removes both blobs and then the partition itself. A missing partition
    /// is tolerated (nothing left to remove); other errors propagate.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, wrapper) in self.blobs.into_iter().enumerate() {
            // Drop the handle before removing the underlying blob.
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already gone; treat as success.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}