// commonware_storage/metadata/storage.rs

1use super::{Config, Error};
2use bytes::BufMut;
3use commonware_codec::{Codec, FixedSize, ReadExt};
4use commonware_runtime::{Blob, Clock, Error as RError, Metrics, Storage};
5use commonware_utils::Array;
6use futures::future::try_join_all;
7use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use tracing::{debug, warn};
10
/// The names of the two blobs that store metadata.
///
/// Writes alternate between them (see [Metadata::sync]) so the previously
/// persisted state is never modified while the new state is being written.
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];
13
/// Information about a value in a [Wrapper].
struct Info {
    /// Byte offset of the encoded value within [Wrapper::data].
    start: usize,
    /// Encoded size of the value, in bytes.
    length: usize,
}
19
20impl Info {
21    /// Create a new [Info].
22    fn new(start: usize, length: usize) -> Self {
23        Self { start, length }
24    }
25}
26
/// One of the two wrappers that store metadata.
struct Wrapper<B: Blob, K: Array> {
    /// Underlying storage blob.
    blob: B,
    /// Version persisted in this blob (the blob with the higher version is
    /// adopted at startup).
    version: u64,
    /// Location of each key's encoded value within `data`.
    lengths: HashMap<K, Info>,
    /// Keys whose values have changed since this blob was last written.
    modified: BTreeSet<K>,
    /// In-memory copy of the blob's full contents
    /// (version prefix + encoded entries + checksum suffix).
    data: Vec<u8>,
}
35
36impl<B: Blob, K: Array> Wrapper<B, K> {
37    /// Create a new [Wrapper].
38    fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
39        Self {
40            blob,
41            version,
42            lengths,
43            modified: BTreeSet::new(),
44            data,
45        }
46    }
47
48    /// Create a new empty [Wrapper].
49    fn empty(blob: B) -> Self {
50        Self {
51            blob,
52            version: 0,
53            lengths: HashMap::new(),
54            modified: BTreeSet::new(),
55            data: Vec::new(),
56        }
57    }
58}
59
/// Implementation of [Metadata] storage.
pub struct Metadata<E: Clock + Storage + Metrics, K: Array, V: Codec> {
    context: E,

    /// Current in-memory view of all key/value pairs.
    map: BTreeMap<K, V>,
    /// Index (0 or 1) of the blob holding the latest persisted state.
    cursor: usize,
    /// Version at which the set of keys last changed (insert/remove/clear);
    /// a blob whose data predates this version must be fully rewritten.
    key_order_changed: u64,
    /// Version to assign to the next sync.
    next_version: u64,
    /// Storage partition containing both blobs.
    partition: String,
    /// The two alternating blobs (indexed per [BLOB_NAMES]).
    blobs: [Wrapper<E::Blob, K>; 2],

    /// Number of syncs that modified existing data in place.
    sync_overwrites: Counter,
    /// Number of syncs that rewrote all data.
    sync_rewrites: Counter,
    /// Number of tracked keys.
    keys: Gauge,
}
75
impl<E: Clock + Storage + Metrics, K: Array, V: Codec> Metadata<E, K, V> {
    /// Initialize a new [Metadata] instance.
    ///
    /// Opens both blobs in `cfg.partition`, decodes each, and adopts the state
    /// of whichever carries the higher version. A corrupt or empty blob loads
    /// as empty, so one intact blob is sufficient for recovery.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open dedicated blobs
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        // Load both blobs (a blob that fails its integrity check is truncated
        // and loaded as empty)
        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Choose the blob with the higher version as the current state
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        // Create metrics
        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        // Return metadata
        keys.set(map.len() as i64);
        Ok(Self {
            context,

            map,
            cursor,
            key_order_changed: next_version, // rewrite on startup because we don't have a diff record
            next_version,
            partition: cfg.partition,
            blobs: [left_wrapper, right_wrapper],

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }

    /// Load one blob of length `len`, returning the decoded key/value map and
    /// a [Wrapper] describing its on-disk layout.
    ///
    /// Blob layout: an 8-byte big-endian version, followed by encoded
    /// (key, value) pairs, followed by a 4-byte big-endian CRC32 of everything
    /// preceding it. A blob that is too short or fails the checksum is
    /// truncated to zero length and returned as empty; `index` is only used
    /// to identify the blob in log messages.
    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // Get blob length
        if len == 0 {
            // Empty blob
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read blob
        let len = len.try_into().map_err(|_| Error::BlobTooLarge(len))?;
        let buf = blob.read_at(vec![0u8; len], 0).await?;

        // Verify integrity.
        //
        // 8 bytes for version + 4 bytes for checksum.
        if buf.len() < 12 {
            // Truncate and return an empty wrapper
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract checksum (last 4 bytes) and verify it against the rest
        let checksum_index = buf.len() - 4;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = crc32fast::hash(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            // Truncate and return an empty wrapper
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract the version stored at the head of the blob
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Extract data
        //
        // If the checksum is correct, we assume data is correctly packed and we don't perform
        // length checks on the cursor.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            // Read key
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            // Read value (cursor now points at the value's first byte, which
            // is the offset recorded in Info)
            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        // Return info
        Ok((data, Wrapper::new(blob, version, lengths, buf.into())))
    }

    /// Get a value from [Metadata] (if it exists).
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }

    /// Get a mutable reference to a value from [Metadata] (if it exists).
    ///
    /// The caller may mutate the value through the returned reference, so the
    /// key is conservatively marked modified even if no write occurs.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        // Get value
        let value = self.map.get_mut(key)?;

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        self.blobs[self.cursor].modified.insert(key.clone());
        self.blobs[1 - self.cursor].modified.insert(key.clone());

        Some(value)
    }

    /// Clear all values from [Metadata]. The new state will not be persisted until [Self::sync] is
    /// called.
    pub fn clear(&mut self) {
        // Clear map
        self.map.clear();

        // Mark key order as changed (removing keys invalidates the recorded
        // on-disk offsets, so the next sync must rewrite)
        self.key_order_changed = self.next_version;
        self.keys.set(0);
    }

    /// Put a value into [Metadata].
    ///
    /// If the key already exists, the value will be overwritten. The
    /// value stored will not be persisted until [Self::sync] is called.
    pub fn put(&mut self, key: K, value: V) {
        // Get value
        let exists = self.map.insert(key.clone(), value).is_some();

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        if exists {
            self.blobs[self.cursor].modified.insert(key.clone());
            self.blobs[1 - self.cursor].modified.insert(key.clone());
        } else {
            // A brand-new key changes the key set, so the next sync must
            // rewrite rather than overwrite in place
            self.key_order_changed = self.next_version;
        }
        self.keys.set(self.map.len() as i64);
    }

    /// Perform a [Self::put] and [Self::sync] in a single operation.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }

    /// Remove a value from [Metadata] (if it exists), returning the previous
    /// value. The removal is not persisted until [Self::sync] is called.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        // Get value
        let past = self.map.remove(key);

        // Mark key order as changed (a removed key invalidates the recorded
        // on-disk offsets).
        if past.is_some() {
            self.key_order_changed = self.next_version;
        }
        self.keys.set(self.map.len() as i64);

        past
    }

    /// Iterate over all keys in metadata, optionally filtered by prefix.
    ///
    /// If a prefix is provided, only keys that start with the prefix bytes will be returned.
    pub fn keys<'a>(&'a self, prefix: Option<&'a [u8]>) -> impl Iterator<Item = &'a K> + 'a {
        self.map.keys().filter(move |key| {
            if let Some(prefix_bytes) = prefix {
                key.as_ref().starts_with(prefix_bytes)
            } else {
                true
            }
        })
    }

    /// Remove all keys that start with the given prefix. The removals are not
    /// persisted until [Self::sync] is called.
    pub fn remove_prefix(&mut self, prefix: &[u8]) {
        // Retain only keys that do not start with the prefix
        self.map.retain(|key, _| !key.as_ref().starts_with(prefix));

        // Mark key order as changed since we may have removed keys
        self.key_order_changed = self.next_version;
        self.keys.set(self.map.len() as i64);
    }

    /// Atomically commit the current state of [Metadata].
    ///
    /// The write targets the blob NOT holding the latest state, so a crash
    /// mid-sync leaves the previously committed state recoverable from the
    /// other blob. When only existing values changed (and none changed
    /// encoded size) since the target blob was written, values are
    /// overwritten in place; otherwise the entire blob is rewritten.
    pub async fn sync(&mut self) -> Result<(), Error> {
        // Compute next version.
        //
        // While it is possible that extremely high-frequency updates to metadata could cause an eventual
        // overflow of version, syncing once per millisecond would overflow in 584,942,417 years.
        let past_version = self.blobs[self.cursor].version;
        let next_next_version = self.next_version.checked_add(1).expect("version overflow");

        // Get target blob (the one we will modify)
        let target_cursor = 1 - self.cursor;
        let target = &mut self.blobs[target_cursor];

        // Attempt to overwrite existing data if key order has not changed recently
        let mut overwrite = true;
        let mut writes = Vec::with_capacity(target.modified.len());
        if self.key_order_changed < past_version {
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Overwrite existing value (futures are lazy; nothing is
                    // written until the try_join_all below)
                    let encoded = new_value.encode();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(encoded, info.start as u64));
                } else {
                    // A size change invalidates recorded offsets: rewrite all
                    // (pending writes are dropped unexecuted; target.data is
                    // rebuilt from scratch below)
                    overwrite = false;
                    break;
                }
            }
        } else {
            // If the key order has changed, we need to rewrite all data
            overwrite = false;
        }

        // Clear modified keys to avoid writing the same data
        target.modified.clear();

        // Overwrite existing data
        if overwrite {
            // Update version
            let version = self.next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(version.as_slice().into(), 0));

            // Update checksum
            let checksum_index = target.data.len() - 4;
            let checksum = crc32fast::hash(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum.as_slice().into(), checksum_index as u64),
            );

            // Persist changes
            try_join_all(writes).await?;
            target.blob.sync().await?;

            // Update state: the target now holds the latest version
            target.version = self.next_version;
            self.cursor = target_cursor;
            self.next_version = next_next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Rewrite all data: version prefix, then each (key, value) pair in
        // BTreeMap (sorted) order, then the checksum suffix
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(self.next_version);
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(crc32fast::hash(&next_data[..]));

        // Persist changes (shrink the blob if the new encoding is smaller)
        target.blob.write_at(next_data.clone(), 0).await?;
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        // Update state: the target now holds the latest version and layout
        target.version = self.next_version;
        target.lengths = lengths;
        target.data = next_data;
        self.cursor = target_cursor;
        self.next_version = next_next_version;
        self.sync_rewrites.inc();
        Ok(())
    }

    /// Sync outstanding data and close [Metadata].
    pub async fn close(mut self) -> Result<(), Error> {
        // Sync and drop blobs
        self.sync().await?;
        for wrapper in self.blobs.into_iter() {
            wrapper.blob.sync().await?;
        }
        Ok(())
    }

    /// Remove the underlying blobs for this [Metadata].
    ///
    /// Drops each blob handle before removing its backing storage, then
    /// removes the (now empty) partition, tolerating a missing partition.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, wrapper) in self.blobs.into_iter().enumerate() {
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}