Skip to main content

commonware_storage/metadata/
storage.rs

1use super::{Config, Error};
2use crate::Context;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_cryptography::{crc32, Crc32};
5use commonware_runtime::{
6    telemetry::metrics::{Counter, Gauge, GaugeExt, MetricsExt as _},
7    Blob, BufMut, Error as RError,
8};
9use commonware_utils::{sync::AsyncMutex, Span};
10use futures::future::try_join_all;
11use std::collections::{BTreeMap, BTreeSet, HashMap};
12use tracing::{debug, warn};
13
/// The names of the two blobs that store metadata.
///
/// Index `0` is the "left" blob and index `1` is the "right" blob; the same indices are used
/// for the `blobs` array in [State].
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
16
/// Location of an encoded value inside the `data` buffer of a [Wrapper].
struct Info {
    /// Byte offset at which the encoded value begins.
    start: usize,
    /// Number of bytes occupied by the encoded value.
    length: usize,
}

impl Info {
    /// Build an [Info] from a starting offset and a byte length.
    const fn new(start: usize, length: usize) -> Self {
        Info { start, length }
    }
}
29
30/// One of the two wrappers that store metadata.
31struct Wrapper<B: Blob, K: Span> {
32    blob: B,
33    version: u64,
34    lengths: HashMap<K, Info>,
35    modified: BTreeSet<K>,
36    data: Vec<u8>,
37}
38
39impl<B: Blob, K: Span> Wrapper<B, K> {
40    /// Create a new [Wrapper].
41    const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
42        Self {
43            blob,
44            version,
45            lengths,
46            modified: BTreeSet::new(),
47            data,
48        }
49    }
50
51    /// Create a new empty [Wrapper].
52    fn empty(blob: B) -> Self {
53        Self {
54            blob,
55            version: 0,
56            lengths: HashMap::new(),
57            modified: BTreeSet::new(),
58            data: Vec::new(),
59        }
60    }
61}
62
/// State used during [Metadata::sync] operations.
struct State<B: Blob, K: Span> {
    /// Index into `blobs` of the blob holding the latest version. Each sync writes to the
    /// other blob and then flips this index.
    cursor: usize,
    /// Version that the next sync will write.
    next_version: u64,
    /// Value of `next_version` at the moment the set of keys last changed (a new key was
    /// inserted, a key was removed, or the map was cleared). Sync compares this against the
    /// latest blob's version to decide whether values can be overwritten in place.
    key_order_changed: u64,
    /// The two blob wrappers, in the same order as `BLOB_NAMES` (left, right).
    blobs: [Wrapper<B, K>; 2],
}
70
/// Implementation of [Metadata] storage.
///
/// All key/value pairs are held in memory in `map`; [Metadata::sync] persists them by
/// alternating writes between two blobs.
pub struct Metadata<E: Context, K: Span, V: Codec> {
    /// Runtime context used to open and remove blobs and to register metrics.
    context: E,

    /// In-memory key/value pairs. Reads are served from here and sync persists from here.
    map: BTreeMap<K, V>,
    /// Name of the partition that holds both blobs.
    partition: String,
    /// Sync bookkeeping, guarded by an async mutex so that sync can take `&self`.
    state: AsyncMutex<State<E::Blob, K>>,

    /// Incremented each time a sync overwrites values in place.
    sync_overwrites: Counter,
    /// Incremented each time a sync rewrites an entire blob.
    sync_rewrites: Counter,
    /// Number of keys currently tracked in `map`.
    keys: Gauge,
}
83
84impl<E: Context, K: Span, V: Codec> Metadata<E, K, V> {
85    /// Initialize a new [Metadata] instance.
86    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
87        // Open dedicated blobs
88        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
89        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;
90
91        // Find latest blob (check which includes a hash of the other)
92        let (left_map, left_wrapper) =
93            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
94        let (right_map, right_wrapper) =
95            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;
96
97        // Choose latest blob
98        let mut map = left_map;
99        let mut cursor = 0;
100        let mut version = left_wrapper.version;
101        if right_wrapper.version > left_wrapper.version {
102            cursor = 1;
103            map = right_map;
104            version = right_wrapper.version;
105        }
106        let next_version = version.checked_add(1).expect("version overflow");
107
108        // Create metrics
109        let sync_rewrites =
110            context.counter("sync_rewrites", "number of syncs that rewrote all data");
111        let sync_overwrites = context.counter(
112            "sync_overwrites",
113            "number of syncs that modified existing data",
114        );
115        let keys = context.gauge("keys", "number of tracked keys");
116
117        // Return metadata
118        let _ = keys.try_set(map.len());
119        Ok(Self {
120            context,
121
122            map,
123            partition: cfg.partition,
124            state: AsyncMutex::new(State {
125                cursor,
126                next_version,
127                key_order_changed: next_version, // rewrite on startup because we don't have a diff record
128                blobs: [left_wrapper, right_wrapper],
129            }),
130
131            sync_rewrites,
132            sync_overwrites,
133            keys,
134        })
135    }
136
    /// Load the contents of a single blob.
    ///
    /// The blob is read as an 8-byte big-endian version, followed by encoded key/value pairs,
    /// followed by a big-endian CRC32 of everything that precedes it.
    ///
    /// Returns the decoded map together with a [Wrapper] describing the blob. If the blob is
    /// empty, an empty map and an empty wrapper are returned. If the blob is too short or its
    /// checksum does not match, the blob is resized to zero, synced, and treated as empty.
    ///
    /// `index` is only used to label log messages.
    ///
    /// # Errors
    ///
    /// Returns an error if reading, resizing, or syncing the blob fails.
    ///
    /// # Panics
    ///
    /// Panics if `len` does not fit in `usize`, or if a key or value cannot be decoded from a
    /// blob whose checksum is valid.
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // Nothing to decode if the blob has no contents
        if len == 0 {
            // Empty blob
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read the entire blob into memory
        let len: usize = len.try_into().expect("blob too large for platform");
        let buf = blob.read_at(0, len).await?.coalesce();

        // Verify integrity.
        //
        // 8 bytes for version + 4 bytes for checksum.
        if buf.len() < 8 + crc32::Digest::SIZE {
            // Truncate the blob and treat it as empty
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract the stored checksum (the trailing bytes) and compare it against a checksum
        // computed over everything before it
        let checksum_index = buf.len() - crc32::Digest::SIZE;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            // Truncate the blob and treat it as empty
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read the version from the first 8 bytes
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Extract data
        //
        // If the checksum is correct, we assume data is correctly packed and we don't perform
        // length checks on the cursor.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            // Read key
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            // Read value, recording where its encoding starts (just past the key) and how
            // long it is so that it can later be overwritten in place
            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        // Return the decoded map and a wrapper that keeps the raw bytes
        Ok((
            data,
            Wrapper::new(blob, version, lengths, buf.freeze().into()),
        ))
    }
216
217    /// Get a value from [Metadata] (if it exists).
218    pub fn get(&self, key: &K) -> Option<&V> {
219        self.map.get(key)
220    }
221
222    /// Get a mutable reference to a value from [Metadata] (if it exists).
223    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
224        // Get value
225        let value = self.map.get_mut(key)?;
226
227        // Mark key as modified.
228        //
229        // We need to mark both blobs as modified because we may need to update both files.
230        let state = self.state.get_mut();
231        state.blobs[state.cursor].modified.insert(key.clone());
232        state.blobs[1 - state.cursor].modified.insert(key.clone());
233
234        Some(value)
235    }
236
237    /// Clear all values from [Metadata]. The new state will not be persisted until [Self::sync] is
238    /// called.
239    pub fn clear(&mut self) {
240        // Clear map
241        self.map.clear();
242
243        // Mark key order as changed
244        let state = self.state.get_mut();
245        state.key_order_changed = state.next_version;
246        self.keys.set(0);
247    }
248
249    /// Put a value into [Metadata].
250    ///
251    /// If the key already exists, the value will be overwritten and the previous
252    /// value is returned. The value stored will not be persisted until [Self::sync]
253    /// is called.
254    pub fn put(&mut self, key: K, value: V) -> Option<V> {
255        // Insert value, getting previous value if it existed
256        let previous = self.map.insert(key.clone(), value);
257
258        // Mark key as modified.
259        //
260        // We need to mark both blobs as modified because we may need to update both files.
261        let state = self.state.get_mut();
262        if previous.is_some() {
263            state.blobs[state.cursor].modified.insert(key.clone());
264            state.blobs[1 - state.cursor].modified.insert(key);
265        } else {
266            state.key_order_changed = state.next_version;
267        }
268        let _ = self.keys.try_set(self.map.len());
269        previous
270    }
271
272    /// Perform a [Self::put] and [Self::sync] in a single operation.
273    ///
274    /// Like calling [Self::sync] directly, this commits all pending metadata
275    /// changes, not just the provided key.
276    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
277        self.put(key, value);
278        self.sync().await
279    }
280
281    /// Update (or insert) a value in [Metadata] using a closure.
282    pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
283    where
284        V: Default,
285    {
286        if let Some(value) = self.get_mut(&key) {
287            // Update existing value
288            f(value);
289        } else {
290            // Insert new value
291            let mut value = V::default();
292            f(&mut value);
293            self.put(key, value);
294        }
295    }
296
297    /// Update (or insert) a value in [Metadata] using a closure and sync immediately.
298    pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
299    where
300        V: Default,
301    {
302        self.upsert(key, f);
303        self.sync().await
304    }
305
306    /// Remove a value from [Metadata] (if it exists).
307    pub fn remove(&mut self, key: &K) -> Option<V> {
308        // Get value
309        let past = self.map.remove(key);
310
311        // Mark key as modified.
312        if past.is_some() {
313            let state = self.state.get_mut();
314            state.key_order_changed = state.next_version;
315        }
316        let _ = self.keys.try_set(self.map.len());
317
318        past
319    }
320
321    /// Iterate over all keys in metadata.
322    pub fn keys(&self) -> impl Iterator<Item = &K> {
323        self.map.keys()
324    }
325
326    /// Retain only the keys that satisfy the predicate.
327    pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
328        // Retain only keys that satisfy the predicate
329        let old_len = self.map.len();
330        self.map.retain(|k, v| f(k, v));
331        let new_len = self.map.len();
332
333        // If the number of keys has changed, mark the key order as changed
334        if new_len != old_len {
335            let state = self.state.get_mut();
336            state.key_order_changed = state.next_version;
337            let _ = self.keys.try_set(self.map.len());
338        }
339    }
340
    /// Atomically commit the current state of [Metadata].
    ///
    /// Each call writes to the blob that does NOT hold the latest version, then makes it the
    /// latest. Two strategies are used:
    ///
    /// - Overwrite: if the set of keys has not changed since before the latest blob's version
    ///   and every modified value encodes to the same length as before, only the modified
    ///   values, the version, and the checksum are written in place.
    /// - Rewrite: otherwise the whole blob is re-encoded from the in-memory map.
    ///
    /// # Errors
    ///
    /// Returns an error if any write, resize, or sync on the target blob fails.
    ///
    /// # Panics
    ///
    /// Panics on version overflow, or if a key recorded as modified is missing from the
    /// target blob's index or from the in-memory map.
    pub async fn sync(&self) -> Result<(), Error> {
        // Acquire lock on sync state which will prevent concurrent sync calls while not blocking
        // reads from the metadata map.
        let mut state = self.state.lock().await;

        // Extract values we need
        let cursor = state.cursor;
        let next_version = state.next_version;
        let key_order_changed = state.key_order_changed;

        // Compute next version.
        //
        // While it is possible that extremely high-frequency updates to metadata could cause an
        // eventual overflow of version, syncing once per millisecond would overflow in 584,942,417
        // years.
        let past_version = state.blobs[cursor].version;
        let next_next_version = next_version.checked_add(1).expect("version overflow");

        // Get target blob (the one we will modify)
        let target_cursor = 1 - cursor;

        // Update the state.
        //
        // NOTE(review): `cursor` and `next_version` are advanced here, before any of the writes
        // below have been awaited. If a write or sync returns an error, the in-memory state has
        // already moved on -- confirm that callers treat a failed sync as fatal.
        state.cursor = target_cursor;
        state.next_version = next_next_version;

        // Get a mutable reference to the target blob.
        let target = &mut state.blobs[target_cursor];

        // Determine if we can overwrite existing data in place, and prepare the list of data to
        // write in that event.
        //
        // The target blob is one sync behind the latest blob, so an in-place overwrite is only
        // possible if the set of keys last changed strictly before the latest blob's version
        // (i.e. the target has already been rewritten with the current set of keys).
        let mut overwrite = true;
        let mut writes = vec![];
        if key_order_changed < past_version {
            // One write per modified key, plus one each for the version and the checksum.
            let write_capacity = target.modified.len() + 2;
            writes.reserve(write_capacity);
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Overwrite existing value (both in the in-memory copy and in the blob)
                    let encoded = new_value.encode_mut();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(info.start as u64, encoded));
                } else {
                    // The encoded length changed, so later offsets would shift: rewrite all
                    overwrite = false;
                    break;
                }
            }
        } else {
            // If the key order has changed, we need to rewrite all data
            overwrite = false;
        }

        // Clear modified keys to avoid writing the same data
        target.modified.clear();

        // Overwrite existing data
        if overwrite {
            // Update version
            let version = next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(0, version.as_slice().into()));

            // Update checksum (computed over the in-memory copy, which already reflects the
            // new values and the new version)
            let checksum_index = target.data.len() - crc32::Digest::SIZE;
            let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum_index as u64, checksum.as_slice().into()),
            );

            // Persist changes
            try_join_all(writes).await?;
            target.blob.sync().await?;

            // Update state
            target.version = next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Since we can't overwrite in place, we rewrite the entire blob.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(next_version);

        // Build new data, recording where each value's encoding starts (just past its key)
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        // Append the checksum of everything written so far
        next_data.put_u32(Crc32::checksum(&next_data[..]));

        // Shrinking rewrites must also persist the resize, so they need a full sync.
        if next_data.len() < target.data.len() {
            target.blob.write_at(0, next_data.clone()).await?;
            target.blob.resize(next_data.len() as u64).await?;
            target.blob.sync().await?;
        } else {
            // Non-shrinking rewrites are a single write and can use range-scoped
            // durability.
            target.blob.write_at_sync(0, next_data.clone()).await?;
        }

        // Update blob state
        target.version = next_version;
        target.lengths = lengths;
        target.data = next_data;

        self.sync_rewrites.inc();
        Ok(())
    }
459
460    /// Remove the underlying blobs for this [Metadata].
461    pub async fn destroy(self) -> Result<(), Error> {
462        let state = self.state.into_inner();
463        for (i, wrapper) in state.blobs.into_iter().enumerate() {
464            drop(wrapper.blob);
465            self.context
466                .remove(&self.partition, Some(BLOB_NAMES[i]))
467                .await?;
468            debug!(blob = i, "destroyed blob");
469        }
470        match self.context.remove(&self.partition, None).await {
471            Ok(()) => {}
472            Err(RError::PartitionMissing(_)) => {
473                // Partition already removed or never existed.
474            }
475            Err(err) => return Err(Error::Runtime(err)),
476        }
477        Ok(())
478    }
479}