// commonware_storage/metadata/storage.rs

1use super::{Config, Error};
2use crate::Context;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_cryptography::{crc32, Crc32};
5use commonware_runtime::{telemetry::metrics::status::GaugeExt, Blob, BufMut, Error as RError};
6use commonware_utils::{sync::AsyncMutex, Span};
7use futures::future::try_join_all;
8use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
9use std::collections::{BTreeMap, BTreeSet, HashMap};
10use tracing::{debug, warn};
11
/// The names of the two blobs that store metadata.
///
/// Writes alternate between the two so one valid copy always survives a crash
/// mid-write; the blob with the higher embedded version wins at init.
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
14
/// Information about a value in a [Wrapper].
struct Info {
    // Byte offset of the encoded value within the wrapper's `data` buffer
    start: usize,
    // Encoded length of the value in bytes
    length: usize,
}
20
21impl Info {
22    /// Create a new [Info].
23    const fn new(start: usize, length: usize) -> Self {
24        Self { start, length }
25    }
26}
27
/// One of the two wrappers that store metadata.
struct Wrapper<B: Blob, K: Span> {
    // Underlying storage blob
    blob: B,
    // Version embedded in the blob's first 8 bytes; higher wins at init
    version: u64,
    // Per-key location/length of each encoded value within `data`
    lengths: HashMap<K, Info>,
    // Keys whose values changed since this blob was last written
    modified: BTreeSet<K>,
    // In-memory copy of the blob's full contents (version + entries + checksum)
    data: Vec<u8>,
}
36
37impl<B: Blob, K: Span> Wrapper<B, K> {
38    /// Create a new [Wrapper].
39    const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
40        Self {
41            blob,
42            version,
43            lengths,
44            modified: BTreeSet::new(),
45            data,
46        }
47    }
48
49    /// Create a new empty [Wrapper].
50    fn empty(blob: B) -> Self {
51        Self {
52            blob,
53            version: 0,
54            lengths: HashMap::new(),
55            modified: BTreeSet::new(),
56            data: Vec::new(),
57        }
58    }
59}
60
/// State used during [Metadata::sync] operations.
struct State<B: Blob, K: Span> {
    // Index (0 or 1) of the blob holding the latest committed data
    cursor: usize,
    // Version that the next sync will write
    next_version: u64,
    // Version at which the key set last changed; forces a full rewrite for any
    // blob whose contents predate that change
    key_order_changed: u64,
    // The two alternating storage blobs
    blobs: [Wrapper<B, K>; 2],
}
68
/// Implementation of [Metadata] storage.
pub struct Metadata<E: Context, K: Span, V: Codec> {
    context: E,

    // Authoritative in-memory key/value map; reads never touch disk
    map: BTreeMap<K, V>,
    // Storage partition holding both blobs
    partition: String,
    // Sync state, locked so concurrent syncs serialize without blocking reads
    state: AsyncMutex<State<E::Blob, K>>,

    // Counts syncs that patched existing data in place
    sync_overwrites: Counter,
    // Counts syncs that rewrote the whole blob
    sync_rewrites: Counter,
    // Number of keys currently tracked
    keys: Gauge,
}
81
impl<E: Context, K: Span, V: Codec> Metadata<E, K, V> {
    /// Initialize a new [Metadata] instance.
    ///
    /// Loads both blobs, keeps the contents of whichever carries the higher
    /// version, and registers sync/key metrics with the runtime. A corrupt or
    /// truncated blob is truncated to empty rather than failing init.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open dedicated blobs
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        // Load and validate both blobs
        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Choose latest blob (the one with the higher embedded version wins)
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        // Create metrics
        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        // Return metadata
        let _ = keys.try_set(map.len());
        Ok(Self {
            context,

            map,
            partition: cfg.partition,
            state: AsyncMutex::new(State {
                cursor,
                next_version,
                key_order_changed: next_version, // rewrite on startup because we don't have a diff record
                blobs: [left_wrapper, right_wrapper],
            }),

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }

    /// Read and validate one blob, returning its decoded key/value map and a
    /// [Wrapper] describing its on-disk layout.
    ///
    /// Layout: 8-byte big-endian version, then packed key/value entries, then
    /// a 4-byte big-endian CRC32 over everything before it. Any blob that is
    /// empty, too short, or fails its checksum is resized to zero and treated
    /// as empty (version 0).
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // Get blob length
        if len == 0 {
            // Empty blob
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read blob
        let len: usize = len.try_into().expect("blob too large for platform");
        let buf = blob.read_at(0, len).await?.coalesce();

        // Verify integrity.
        //
        // 8 bytes for version + 4 bytes for checksum.
        if buf.len() < 8 + crc32::Digest::SIZE {
            // Truncate and return none
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract checksum (trailing 4 bytes) and compare against a checksum
        // computed over everything before it
        let checksum_index = buf.len() - crc32::Digest::SIZE;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            // Truncate and return none
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Get version (first 8 bytes)
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Extract data
        //
        // If the checksum is correct, we assume data is correctly packed and we don't perform
        // length checks on the cursor.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            // Read key
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            // Read value (recording its offset/length for later in-place
            // overwrites during sync)
            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        // Return info
        Ok((
            data,
            Wrapper::new(blob, version, lengths, buf.freeze().into()),
        ))
    }

    /// Get a value from [Metadata] (if it exists).
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }

    /// Get a mutable reference to a value from [Metadata] (if it exists).
    ///
    /// The key is conservatively marked modified even if the caller never
    /// writes through the returned reference.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        // Get value
        let value = self.map.get_mut(key)?;

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        let state = self.state.get_mut();
        state.blobs[state.cursor].modified.insert(key.clone());
        state.blobs[1 - state.cursor].modified.insert(key.clone());

        Some(value)
    }

    /// Clear all values from [Metadata]. The new state will not be persisted until [Self::sync] is
    /// called.
    pub fn clear(&mut self) {
        // Clear map
        self.map.clear();

        // Mark key order as changed so the next syncs rewrite both blobs
        let state = self.state.get_mut();
        state.key_order_changed = state.next_version;
        self.keys.set(0);
    }

    /// Put a value into [Metadata].
    ///
    /// If the key already exists, the value will be overwritten and the previous
    /// value is returned. The value stored will not be persisted until [Self::sync]
    /// is called.
    pub fn put(&mut self, key: K, value: V) -> Option<V> {
        // Insert value, getting previous value if it existed
        let previous = self.map.insert(key.clone(), value);

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        let state = self.state.get_mut();
        if previous.is_some() {
            state.blobs[state.cursor].modified.insert(key.clone());
            state.blobs[1 - state.cursor].modified.insert(key);
        } else {
            // A brand-new key changes the key order, forcing a full rewrite
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());
        previous
    }

    /// Perform a [Self::put] and [Self::sync] in a single operation.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }

    /// Update (or insert) a value in [Metadata] using a closure.
    ///
    /// If the key is absent, `f` is applied to `V::default()` and the result
    /// inserted. Not persisted until [Self::sync] is called.
    pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
    where
        V: Default,
    {
        if let Some(value) = self.get_mut(&key) {
            // Update existing value
            f(value);
        } else {
            // Insert new value
            let mut value = V::default();
            f(&mut value);
            self.put(key, value);
        }
    }

    /// Update (or insert) a value in [Metadata] using a closure and sync immediately.
    pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
    where
        V: Default,
    {
        self.upsert(key, f);
        self.sync().await
    }

    /// Remove a value from [Metadata] (if it exists).
    pub fn remove(&mut self, key: &K) -> Option<V> {
        // Get value
        let past = self.map.remove(key);

        // Mark key order as changed (removal reshapes the packed layout, so a
        // full rewrite is required).
        if past.is_some() {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());

        past
    }

    /// Iterate over all keys in metadata.
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.map.keys()
    }

    /// Retain only the keys that satisfy the predicate.
    pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
        // Retain only keys that satisfy the predicate
        let old_len = self.map.len();
        self.map.retain(|k, v| f(k, v));
        let new_len = self.map.len();

        // If the number of keys has changed, mark the key order as changed
        if new_len != old_len {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
            let _ = self.keys.try_set(self.map.len());
        }
    }

    /// Atomically commit the current state of [Metadata].
    ///
    /// Writes go to the blob NOT holding the latest version, so a crash
    /// mid-sync leaves the previous version intact. If only value bytes
    /// changed (same key set, same encoded lengths), the target blob is
    /// patched in place; otherwise it is rewritten from scratch.
    pub async fn sync(&self) -> Result<(), Error> {
        // Acquire lock on sync state which will prevent concurrent sync calls while not blocking
        // reads from the metadata map.
        let mut state = self.state.lock().await;

        // Extract values we need
        let cursor = state.cursor;
        let next_version = state.next_version;
        let key_order_changed = state.key_order_changed;

        // Compute next version.
        //
        // While it is possible that extremely high-frequency updates to metadata could cause an
        // eventual overflow of version, syncing once per millisecond would overflow in 584,942,417
        // years.
        let past_version = state.blobs[cursor].version;
        let next_next_version = next_version.checked_add(1).expect("version overflow");

        // Get target blob (the one we will modify)
        let target_cursor = 1 - cursor;

        // Update the state.
        state.cursor = target_cursor;
        state.next_version = next_next_version;

        // Get a mutable reference to the target blob.
        let target = &mut state.blobs[target_cursor];

        // Determine if we can overwrite existing data in place, and prepare the list of data to
        // write in that event. Overwriting is only safe if the key set hasn't
        // changed since the latest committed version AND every modified value
        // encodes to exactly its previous length.
        let mut overwrite = true;
        let mut writes = vec![];
        if key_order_changed < past_version {
            let write_capacity = target.modified.len() + 2;
            writes.reserve(write_capacity);
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Overwrite existing value (offsets in `lengths` stay
                    // valid because the encoded size is unchanged)
                    let encoded = new_value.encode_mut();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(info.start as u64, encoded));
                } else {
                    // Rewrite all (any in-place patches applied so far are
                    // harmless: the rewrite path rebuilds `data` from `map`)
                    overwrite = false;
                    break;
                }
            }
        } else {
            // If the key order has changed, we need to rewrite all data
            overwrite = false;
        }

        // Clear modified keys to avoid writing the same data
        target.modified.clear();

        // Overwrite existing data
        if overwrite {
            // Update version
            let version = next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(0, version.as_slice().into()));

            // Update checksum (recomputed over the patched buffer)
            let checksum_index = target.data.len() - crc32::Digest::SIZE;
            let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum_index as u64, checksum.as_slice().into()),
            );

            // Persist changes (writes issued concurrently, then one sync)
            try_join_all(writes).await?;
            target.blob.sync().await?;

            // Update state
            target.version = next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Since we can't overwrite in place, we rewrite the entire blob.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(next_version);

        // Build new data (BTreeMap iteration gives deterministic key order)
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(Crc32::checksum(&next_data[..]));

        // Write and persist the new data, shrinking the blob if the new
        // contents are smaller than the old
        target.blob.write_at(0, next_data.clone()).await?;
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        // Update blob state
        target.version = next_version;
        target.lengths = lengths;
        target.data = next_data;

        self.sync_rewrites.inc();
        Ok(())
    }

    /// Remove the underlying blobs for this [Metadata].
    ///
    /// Drops each blob handle before removing it, then removes the partition
    /// itself (tolerating a partition that is already gone).
    pub async fn destroy(self) -> Result<(), Error> {
        let state = self.state.into_inner();
        for (i, wrapper) in state.blobs.into_iter().enumerate() {
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}