commonware-storage 2026.4.0

Persist and retrieve data from an abstract store.
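
A minimal usage sketch of the metadata store implemented below (illustrative only, not part of the module source; the import paths, the `commonware_storage::Context` trait path, and the extra bounds on `V` are assumptions, and the context, key, and value types are placeholders):

    use commonware_codec::Codec;
    use commonware_storage::metadata::{Config, Error, Metadata};
    use commonware_storage::Context;
    use commonware_utils::Span;

    /// Stage values in memory, persist them atomically, and read them back.
    /// Clone/PartialEq/Debug on V are only needed by this example's assert.
    async fn example<E, K, V>(ctx: E, key: K, value: V, cfg: Config<V::Cfg>) -> Result<(), Error>
    where
        E: Context,
        K: Span,
        V: Codec + Clone + PartialEq + std::fmt::Debug,
    {
        // Open (or recover) the metadata store for the configured partition.
        let mut metadata = Metadata::<E, K, V>::init(ctx, cfg).await?;

        // Stage a value in memory; nothing is persisted yet.
        metadata.put(key.clone(), value.clone());

        // Atomically persist the current state.
        metadata.sync().await?;

        // Reads are served from the in-memory map.
        assert_eq!(metadata.get(&key), Some(&value));

        // Removals also take effect on the next sync.
        metadata.remove(&key);
        metadata.sync().await?;
        Ok(())
    }
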
use super::{Config, Error};
use crate::Context;
use commonware_codec::{Codec, FixedSize, ReadExt};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{telemetry::metrics::status::GaugeExt, Blob, BufMut, Error as RError};
use commonware_utils::{sync::AsyncMutex, Span};
use futures::future::try_join_all;
use prometheus_client::metrics::{counter::Counter, gauge::Gauge};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use tracing::{debug, warn};

/// The names of the two blobs that store metadata.
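///
/// Syncs alternate between the two, so the most recently committed state is
/// never touched while the other blob is being written.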
const BLOB_NAMES: [&[u8]; 2] = [b"left", b"right"];

/// Information about a value in a [Wrapper].
struct Info {
    /// Byte offset of the value within the wrapper's `data`.
    start: usize,
    /// Encoded length of the value in bytes.
    length: usize,
}

impl Info {
    /// Create a new [Info].
    const fn new(start: usize, length: usize) -> Self {
        Self { start, length }
    }
}

/// One of the two wrappers that store metadata.
struct Wrapper<B: Blob, K: Span> {
    /// The underlying blob.
    blob: B,
    /// Version of the state currently persisted in the blob.
    version: u64,
    /// Byte offset and length of each value within `data`.
    lengths: HashMap<K, Info>,
    /// Keys whose values have changed since this blob was last written.
    modified: BTreeSet<K>,
    /// In-memory copy of the blob's contents.
    data: Vec<u8>,
}

impl<B: Blob, K: Span> Wrapper<B, K> {
    /// Create a new [Wrapper].
    const fn new(blob: B, version: u64, lengths: HashMap<K, Info>, data: Vec<u8>) -> Self {
        Self {
            blob,
            version,
            lengths,
            modified: BTreeSet::new(),
            data,
        }
    }

    /// Create a new empty [Wrapper].
    fn empty(blob: B) -> Self {
        Self {
            blob,
            version: 0,
            lengths: HashMap::new(),
            modified: BTreeSet::new(),
            data: Vec::new(),
        }
    }
}

/// State used during [Metadata::sync] operations.
struct State<B: Blob, K: Span> {
    /// Index (0 or 1) of the blob holding the most recently committed state.
    cursor: usize,
    /// Version to assign to the next sync.
    next_version: u64,
    /// Version at which the key set last changed (forces full rewrites until
    /// both blobs reflect the new layout).
    key_order_changed: u64,
    /// The two alternating blobs.
    blobs: [Wrapper<B, K>; 2],
}

/// Implementation of [Metadata] storage.
pub struct Metadata<E: Context, K: Span, V: Codec> {
    context: E,

    map: BTreeMap<K, V>,
    partition: String,
    state: AsyncMutex<State<E::Blob, K>>,

    sync_overwrites: Counter,
    sync_rewrites: Counter,
    keys: Gauge,
}

impl<E: Context, K: Span, V: Codec> Metadata<E, K, V> {
    /// Initialize a new [Metadata] instance.
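    ///
    /// Both underlying blobs are opened and parsed; whichever carries the higher
    /// version becomes the initial in-memory state. A blob that is too short or
    /// fails its checksum is truncated and treated as empty.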
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        // Open dedicated blobs
        let (left_blob, left_len) = context.open(&cfg.partition, BLOB_NAMES[0]).await?;
        let (right_blob, right_len) = context.open(&cfg.partition, BLOB_NAMES[1]).await?;

        // Parse both blobs
        let (left_map, left_wrapper) =
            Self::load(&cfg.codec_config, 0, left_blob, left_len).await?;
        let (right_map, right_wrapper) =
            Self::load(&cfg.codec_config, 1, right_blob, right_len).await?;

        // Choose latest blob
        let mut map = left_map;
        let mut cursor = 0;
        let mut version = left_wrapper.version;
        if right_wrapper.version > left_wrapper.version {
            cursor = 1;
            map = right_map;
            version = right_wrapper.version;
        }
        let next_version = version.checked_add(1).expect("version overflow");

        // Create metrics
        let sync_rewrites = Counter::default();
        let sync_overwrites = Counter::default();
        let keys = Gauge::default();
        context.register(
            "sync_rewrites",
            "number of syncs that rewrote all data",
            sync_rewrites.clone(),
        );
        context.register(
            "sync_overwrites",
            "number of syncs that modified existing data",
            sync_overwrites.clone(),
        );
        context.register("keys", "number of tracked keys", keys.clone());

        // Return metadata
        let _ = keys.try_set(map.len());
        Ok(Self {
            context,

            map,
            partition: cfg.partition,
            state: AsyncMutex::new(State {
                cursor,
                next_version,
                key_order_changed: next_version, // rewrite on startup because we don't have a diff record
                blobs: [left_wrapper, right_wrapper],
            }),

            sync_rewrites,
            sync_overwrites,
            keys,
        })
    }

    async fn load(
        codec_config: &V::Cfg,
        index: usize,
        blob: E::Blob,
        len: u64,
    ) -> Result<(BTreeMap<K, V>, Wrapper<E::Blob, K>), Error> {
        // Get blob length
        if len == 0 {
            // Empty blob
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Read blob
        let len: usize = len.try_into().expect("blob too large for platform");
        let buf = blob.read_at(0, len).await?.coalesce();

        // Verify integrity.
        //
        // 8 bytes for version + 4 bytes for checksum.
        if buf.len() < 8 + crc32::Digest::SIZE {
            // Truncate and return none
            warn!(
                blob = index,
                len = buf.len(),
                "blob is too short: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract checksum
        let checksum_index = buf.len() - crc32::Digest::SIZE;
        let stored_checksum =
            u32::from_be_bytes(buf.as_ref()[checksum_index..].try_into().unwrap());
        let computed_checksum = Crc32::checksum(&buf.as_ref()[..checksum_index]);
        if stored_checksum != computed_checksum {
            // Truncate and return none
            warn!(
                blob = index,
                stored = stored_checksum,
                computed = computed_checksum,
                "checksum mismatch: truncating"
            );
            blob.resize(0).await?;
            blob.sync().await?;
            return Ok((BTreeMap::new(), Wrapper::empty(blob)));
        }

        // Extract version
        let version = u64::from_be_bytes(buf.as_ref()[..8].try_into().unwrap());

        // Extract data
        //
        // If the checksum is correct, we assume data is correctly packed and we don't perform
        // length checks on the cursor.
        let mut data = BTreeMap::new();
        let mut lengths = HashMap::new();
        let mut cursor = u64::SIZE;
        while cursor < checksum_index {
            // Read key
            let key = K::read(&mut buf.as_ref()[cursor..].as_ref())
                .expect("unable to read key from blob");
            cursor += key.encode_size();

            // Read value
            let value = V::read_cfg(&mut buf.as_ref()[cursor..].as_ref(), codec_config)
                .expect("unable to read value from blob");
            lengths.insert(key.clone(), Info::new(cursor, value.encode_size()));
            cursor += value.encode_size();
            data.insert(key, value);
        }

        // Return info
        Ok((
            data,
            Wrapper::new(blob, version, lengths, buf.freeze().into()),
        ))
    }

    /// Get a value from [Metadata] (if it exists).
    pub fn get(&self, key: &K) -> Option<&V> {
        self.map.get(key)
    }

    /// Get a mutable reference to a value from [Metadata] (if it exists).
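    ///
    /// The key is flagged as modified so the change can be persisted on the next
    /// [Self::sync].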
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        // Get value
        let value = self.map.get_mut(key)?;

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        let state = self.state.get_mut();
        state.blobs[state.cursor].modified.insert(key.clone());
        state.blobs[1 - state.cursor].modified.insert(key.clone());

        Some(value)
    }

    /// Clear all values from [Metadata]. The new state will not be persisted until [Self::sync] is
    /// called.
    pub fn clear(&mut self) {
        // Clear map
        self.map.clear();

        // Mark key order as changed
        let state = self.state.get_mut();
        state.key_order_changed = state.next_version;
        self.keys.set(0);
    }

    /// Put a value into [Metadata].
    ///
    /// If the key already exists, the value will be overwritten and the previous
    /// value is returned. The value stored will not be persisted until [Self::sync]
    /// is called.
    pub fn put(&mut self, key: K, value: V) -> Option<V> {
        // Insert value, getting previous value if it existed
        let previous = self.map.insert(key.clone(), value);

        // Mark key as modified.
        //
        // We need to mark both blobs as modified because we may need to update both files.
        let state = self.state.get_mut();
        if previous.is_some() {
            state.blobs[state.cursor].modified.insert(key.clone());
            state.blobs[1 - state.cursor].modified.insert(key);
        } else {
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());
        previous
    }

    /// Perform a [Self::put] and [Self::sync] in a single operation.
    pub async fn put_sync(&mut self, key: K, value: V) -> Result<(), Error> {
        self.put(key, value);
        self.sync().await
    }

    /// Update (or insert) a value in [Metadata] using a closure.
    pub fn upsert(&mut self, key: K, f: impl FnOnce(&mut V))
    where
        V: Default,
    {
        if let Some(value) = self.get_mut(&key) {
            // Update existing value
            f(value);
        } else {
            // Insert new value
            let mut value = V::default();
            f(&mut value);
            self.put(key, value);
        }
    }

    /// Update (or insert) a value in [Metadata] using a closure and sync immediately.
    pub async fn upsert_sync(&mut self, key: K, f: impl FnOnce(&mut V)) -> Result<(), Error>
    where
        V: Default,
    {
        self.upsert(key, f);
        self.sync().await
    }

    /// Remove a value from [Metadata] (if it exists).
    pub fn remove(&mut self, key: &K) -> Option<V> {
        // Get value
        let past = self.map.remove(key);

        // Mark key as modified.
        if past.is_some() {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
        }
        let _ = self.keys.try_set(self.map.len());

        past
    }

    /// Iterate over all keys in metadata.
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.map.keys()
    }

    /// Retain only the keys that satisfy the predicate.
    pub fn retain(&mut self, mut f: impl FnMut(&K, &V) -> bool) {
        // Retain only keys that satisfy the predicate
        let old_len = self.map.len();
        self.map.retain(|k, v| f(k, v));
        let new_len = self.map.len();

        // If the number of keys has changed, mark the key order as changed
        if new_len != old_len {
            let state = self.state.get_mut();
            state.key_order_changed = state.next_version;
            let _ = self.keys.try_set(self.map.len());
        }
    }

    /// Atomically commit the current state of [Metadata].
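    ///
    /// Each sync targets the blob not holding the most recently committed state,
    /// so an interrupted write never damages that state. When only existing
    /// values changed and their encoded lengths are unchanged, the affected bytes
    /// (plus version and checksum) are overwritten in place; otherwise the entire
    /// target blob is rewritten from the in-memory map.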
    pub async fn sync(&self) -> Result<(), Error> {
        // Acquire lock on sync state which will prevent concurrent sync calls while not blocking
        // reads from the metadata map.
        let mut state = self.state.lock().await;

        // Extract values we need
        let cursor = state.cursor;
        let next_version = state.next_version;
        let key_order_changed = state.key_order_changed;

        // Compute next version.
        //
        // While it is possible that extremely high-frequency updates to metadata could cause an
        // eventual overflow of version, syncing once per millisecond would overflow in 584,942,417
        // years.
        let past_version = state.blobs[cursor].version;
        let next_next_version = next_version.checked_add(1).expect("version overflow");

        // Get target blob (the one we will modify)
        let target_cursor = 1 - cursor;

        // Update the state.
        state.cursor = target_cursor;
        state.next_version = next_next_version;

        // Get a mutable reference to the target blob.
        let target = &mut state.blobs[target_cursor];

        // Determine if we can overwrite existing data in place, and prepare the list of data to
        // write in that event.
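        //
        // Overwriting is only safe when the key set has not changed since the target blob's
        // layout was last written. Because syncs alternate between the blobs, that holds whenever
        // the last key-order change is older than the other blob's version.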
        let mut overwrite = true;
        let mut writes = vec![];
        if key_order_changed < past_version {
            let write_capacity = target.modified.len() + 2;
            writes.reserve(write_capacity);
            for key in target.modified.iter() {
                let info = target.lengths.get(key).expect("key must exist");
                let new_value = self.map.get(key).expect("key must exist");
                if info.length == new_value.encode_size() {
                    // Overwrite existing value
                    let encoded = new_value.encode_mut();
                    target.data[info.start..info.start + info.length].copy_from_slice(&encoded);
                    writes.push(target.blob.write_at(info.start as u64, encoded));
                } else {
                    // Rewrite all
                    overwrite = false;
                    break;
                }
            }
        } else {
            // If the key order has changed, we need to rewrite all data
            overwrite = false;
        }

        // Clear modified keys to avoid writing the same data
        target.modified.clear();

        // Overwrite existing data
        if overwrite {
            // Update version
            let version = next_version.to_be_bytes();
            target.data[0..8].copy_from_slice(&version);
            writes.push(target.blob.write_at(0, version.as_slice().into()));

            // Update checksum
            let checksum_index = target.data.len() - crc32::Digest::SIZE;
            let checksum = Crc32::checksum(&target.data[..checksum_index]).to_be_bytes();
            target.data[checksum_index..].copy_from_slice(&checksum);
            writes.push(
                target
                    .blob
                    .write_at(checksum_index as u64, checksum.as_slice().into()),
            );

            // Persist changes
            try_join_all(writes).await?;
            target.blob.sync().await?;

            // Update state
            target.version = next_version;
            self.sync_overwrites.inc();
            return Ok(());
        }

        // Since we can't overwrite in place, we rewrite the entire blob.
        let mut lengths = HashMap::new();
        let mut next_data = Vec::with_capacity(target.data.len());
        next_data.put_u64(next_version);

        // Build new data
        for (key, value) in &self.map {
            key.write(&mut next_data);
            let start = next_data.len();
            value.write(&mut next_data);
            lengths.insert(key.clone(), Info::new(start, value.encode_size()));
        }
        next_data.put_u32(Crc32::checksum(&next_data[..]));

        // Write and persist the new data
        target.blob.write_at(0, next_data.clone()).await?;
        if next_data.len() < target.data.len() {
            target.blob.resize(next_data.len() as u64).await?;
        }
        target.blob.sync().await?;

        // Update blob state
        target.version = next_version;
        target.lengths = lengths;
        target.data = next_data;

        self.sync_rewrites.inc();
        Ok(())
    }

    /// Remove the underlying blobs for this [Metadata].
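    ///
    /// Both blobs are removed, then the partition itself (a missing partition is
    /// ignored).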
    pub async fn destroy(self) -> Result<(), Error> {
        let state = self.state.into_inner();
        for (i, wrapper) in state.blobs.into_iter().enumerate() {
            drop(wrapper.blob);
            self.context
                .remove(&self.partition, Some(BLOB_NAMES[i]))
                .await?;
            debug!(blob = i, "destroyed blob");
        }
        match self.context.remove(&self.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // Partition already removed or never existed.
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}