commonware-storage 2026.5.0

Persist and retrieve data from an abstract store.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
//! Shared machinery for the compact-db compact-sync witness.
//!
//! The witness is the encoded last-commit operation together with its single-leaf inclusion proof
//! against the current root. Persisting that witness alongside the compact Merkle frontier lets a
//! compact database serve compact sync for its latest committed state without retaining the full
//! historical operation log.
//!
//! This state lives at the db layer rather than the Merkle layer because only the db knows how to
//! encode and decode the typed commit operation. Both [`crate::qmdb::immutable::CompactDb`] and
//! [`crate::qmdb::keyless::CompactDb`] store the witness in the same ping-pong slots as the
//! frontier state, then re-verify it against the current root on every reopen and rewind. If those
//! bytes no longer describe a valid witness for the stored root, reopening fails with
//! [`Error::DataCorrupted`].
//!
//! The lifecycle in this file is:
//!
//! 1. Build a [`ServeState`] from the current in-memory tip.
//! 2. Persist its witness bytes into the same active/inactive slot scheme used by the compact
//!    Merkle frontier.
//! 3. Reload that witness on reopen or rewind, re-verify it against the currently loaded root, and
//!    rebuild the cache.
//! 4. Use the cache to answer compact-sync requests without re-encoding the commit or recomputing
//!    its proof on every serve.

use crate::{
    merkle::{compact, Family, Location, Proof},
    metadata::Metadata,
    qmdb::{self, sync::compact::Target, Error},
    Context,
};
use commonware_codec::{Decode as _, Encode as _, FixedSize, Read};
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::{sequence::prefixed_u64::U64, sync::RwLock};

// Per-slot db-extra metadata layout. A "slot" is one side of the compact Merkle's ping-pong
// persistence scheme: each sync writes the next committed state into the inactive slot and then
// flips which slot is active. Metadata key prefixes 0-4 belong to the Merkle layer; prefixes 5-8
// mirror the two-slot scheme for the db-level compact-sync witness. Only the active slot is ever
// consulted on reopen, rewind, or serving; stale witness bytes left behind in the inactive slot
// are harmless until the next sync overwrites them.
//   (5, 0) slot A last commit op bytes     (7, 0) slot B last commit op bytes
//   (6, 0) slot A last commit proof bytes  (8, 0) slot B last commit proof bytes
const SLOT_A_LAST_COMMIT_OP_PREFIX: u8 = 5;
const SLOT_A_LAST_COMMIT_PROOF_PREFIX: u8 = 6;
const SLOT_B_LAST_COMMIT_OP_PREFIX: u8 = 7;
const SLOT_B_LAST_COMMIT_PROOF_PREFIX: u8 = 8;

/// Metadata prefix holding the encoded last-commit operation for ping-pong `slot`.
///
/// Slot `0` selects slot A; any other value selects slot B.
const fn last_commit_op_prefix(slot: u8) -> u8 {
    match slot {
        0 => SLOT_A_LAST_COMMIT_OP_PREFIX,
        _ => SLOT_B_LAST_COMMIT_OP_PREFIX,
    }
}

/// Metadata prefix holding the encoded last-commit inclusion proof for ping-pong `slot`.
///
/// Slot `0` selects slot A; any other value selects slot B.
const fn last_commit_proof_prefix(slot: u8) -> u8 {
    match slot {
        0 => SLOT_A_LAST_COMMIT_PROOF_PREFIX,
        _ => SLOT_B_LAST_COMMIT_PROOF_PREFIX,
    }
}

/// Metadata key under which `slot` stores the encoded last-commit operation.
///
/// The key is the slot's op prefix with a zero suffix.
pub(crate) const fn last_commit_op_key(slot: u8) -> U64 {
    let prefix = last_commit_op_prefix(slot);
    U64::new(prefix, 0)
}

/// Metadata key under which `slot` stores the encoded last-commit inclusion proof.
///
/// The key is the slot's proof prefix with a zero suffix.
pub(crate) const fn last_commit_proof_key(slot: u8) -> U64 {
    let prefix = last_commit_proof_prefix(slot);
    U64::new(prefix, 0)
}

/// Snapshot of the witness tied to the active compact state.
///
/// Serving compact sync requires the root, the frontier pins, the encoded last commit and its
/// inclusion proof to agree with each other, so they are held together in one value. Caching the
/// bundle avoids re-encoding the commit and regenerating its proof on every serve, while reopen
/// and rewind can still rebuild it from the persisted bytes.
#[derive(Clone)]
pub(crate) struct ServeState<F: Family, D: Digest> {
    /// Root that the persisted frontier/witness pair commits to.
    pub(crate) root: D,
    /// Number of leaves in the committed Merkle; the tip commit sits at `leaf_count - 1`.
    pub(crate) leaf_count: Location<F>,
    /// Frontier nodes pinned by compact sync (the load and persist paths read them at the
    /// positions given by `F::nodes_to_pin(leaf_count)`); these are distinct from the proof path.
    pub(crate) pinned_nodes: Vec<D>,
    /// Encoded bytes of the last commit operation, used both for root verification and serving.
    pub(crate) last_commit_op_bytes: Vec<u8>,
    /// Inclusion proof of the last-commit leaf against `root`.
    pub(crate) last_commit_proof: Proof<F, D>,
}

impl<F: Family, D: Digest> ServeState<F, D> {
    /// The compact-sync target described by this witness.
    ///
    /// A compact source can only serve its current committed tip, so the target consists of the
    /// root and the committed leaf count and nothing else.
    pub(crate) const fn target(&self) -> Target<F, D> {
        let root = self.root;
        let leaf_count = self.leaf_count;
        Target { root, leaf_count }
    }
}

/// Lock-protected holder for the compact witness that is currently safe to serve.
///
/// This type does nothing beyond wrapping the lock that guards reads and replacements of the
/// witness. Persistence and serving decisions remain visible at the call sites.
pub(crate) struct Cache<F: Family, D: Digest> {
    witness: RwLock<ServeState<F, D>>,
}

impl<F: Family, D: Digest> Cache<F, D> {
    /// Wrap the witness produced while the db was being loaded or bootstrapped.
    pub(crate) const fn new(witness: ServeState<F, D>) -> Self {
        Self {
            witness: RwLock::new(witness),
        }
    }

    /// Run `f` against the cached witness under a read lock and return its result.
    ///
    /// The lock is released as soon as `f` returns.
    pub(crate) fn with<R>(&self, f: impl FnOnce(&ServeState<F, D>) -> R) -> R {
        let guard = self.witness.read();
        f(&guard)
    }

    /// Swap in a new witness once the matching compact Merkle state has been persisted or loaded.
    pub(crate) fn replace(&self, witness: ServeState<F, D>) {
        let mut guard = self.witness.write();
        *guard = witness;
    }

    /// Let tests edit the cached witness in place, e.g. to simulate corruption.
    #[cfg(test)]
    pub(crate) fn mutate(&self, f: impl FnOnce(&mut ServeState<F, D>)) {
        let mut guard = self.witness.write();
        f(&mut guard);
    }
}

/// Store the db-owned part of `witness` under `slot`'s metadata keys.
///
/// The frontier itself is persisted by the compact Merkle layer. This function writes the encoded
/// last-commit operation and its encoded inclusion proof, which must travel together with that
/// frontier.
pub(crate) fn write_witness_metadata<E, F, D>(
    metadata: &mut Metadata<E, U64, Vec<u8>>,
    slot: u8,
    witness: &ServeState<F, D>,
) where
    E: Context,
    F: Family,
    D: Digest,
{
    let op_bytes = witness.last_commit_op_bytes.clone();
    metadata.put(last_commit_op_key(slot), op_bytes);

    let proof_bytes = witness.last_commit_proof.encode().to_vec();
    metadata.put(last_commit_proof_key(slot), proof_bytes);
}

/// Validate that a decoded commit floor does not point past the commit it authenticates.
///
/// The inactivity floor of a commit must sit at or below the commit's own location. A higher
/// floor would reference operations that do not exist yet, which indicates either disk corruption
/// when reloading a persisted witness or malformed compact-sync input when validating reconstructed
/// state from a remote source.
pub(crate) fn validate_inactivity_floor<F: Family>(
    inactivity_floor_loc: Location<F>,
    last_commit_loc: Location<F>,
) -> Result<(), Error<F>> {
    if inactivity_floor_loc > last_commit_loc {
        return Err(Error::DataCorrupted("invalid compact witness"));
    }
    Ok(())
}

/// Assemble a witness from compact state that the caller has already authenticated.
///
/// The leaf count comes from `merkle`, and the last commit is taken to be the final leaf. The
/// remaining pieces are trusted as given.
///
/// # Errors
///
/// Returns [`Error::DataCorrupted`] when `merkle` has no leaves, or when `inactivity_floor_loc`
/// lies past the last commit.
// The `Result<(Location, ServeState), Error>` return type trips clippy's `type_complexity` lint;
// it is allowed here deliberately.
#[allow(clippy::type_complexity)]
pub(crate) fn witness_from_authenticated_state<F, E, D, S>(
    merkle: &compact::Merkle<F, E, D, S>,
    root: D,
    inactivity_floor_loc: Location<F>,
    last_commit_op_bytes: Vec<u8>,
    last_commit_proof: Proof<F, D>,
    pinned_nodes: Vec<D>,
) -> Result<(Location<F>, ServeState<F, D>), Error<F>>
where
    F: Family,
    E: Context,
    D: Digest,
    S: Strategy,
{
    let leaf_count = merkle.leaves();
    if leaf_count == 0 {
        return Err(Error::DataCorrupted("missing final commit"));
    }

    // The tip commit is always the last leaf.
    let last_commit_loc = Location::<F>::new(*leaf_count - 1);
    validate_inactivity_floor(inactivity_floor_loc, last_commit_loc)?;

    Ok((
        last_commit_loc,
        ServeState {
            root,
            leaf_count,
            pinned_nodes,
            last_commit_op_bytes,
            last_commit_proof,
        },
    ))
}

/// Rebuild the in-memory witness cache from the active slot's persisted witness.
///
/// This is the authoritative recovery path after reopen and rewind. It:
///
/// 1. reads the active slot's commit bytes and proof bytes,
/// 2. decodes the proof and checks that its leaf count matches the Merkle loaded in memory,
/// 3. decodes the typed last commit operation and validates its inactivity floor,
/// 4. re-verifies the proof against the root of the Merkle currently loaded in memory, and
/// 5. reconstructs the frontier pins for serving.
///
/// Returns the rebuilt witness together with the decoded last commit operation.
///
/// # Errors
///
/// Any missing metadata, decode failure, leaf-count mismatch, or proof/root mismatch is reported
/// as [`Error::DataCorrupted`], because the persisted frontier and witness no longer describe the
/// same committed state.
pub(crate) async fn load_active_witness<F, E, H, S, C, Op, LastCommitFloor>(
    merkle: &compact::Merkle<F, E, H::Digest, S>,
    commit_codec_config: &C,
    last_commit_floor: LastCommitFloor,
) -> Result<(ServeState<F, H::Digest>, Op), Error<F>>
where
    F: Family,
    E: Context,
    H: Hasher,
    S: Strategy,
    Op: Read<Cfg = C>,
    LastCommitFloor: FnOnce(&Op) -> Option<Location<F>>,
{
    let slot = merkle.active_slot();
    let last_commit_op_bytes = merkle
        .read_metadata_key(&last_commit_op_key(slot))
        .await
        .ok_or(Error::DataCorrupted("missing compact witness"))?;
    let last_commit_proof_bytes = merkle
        .read_metadata_key(&last_commit_proof_key(slot))
        .await
        .ok_or(Error::DataCorrupted("missing compact witness"))?;
    // Every encoded digest is at least `D::SIZE` bytes on the wire, so `proof_bytes.len() /
    // D::SIZE` is a hard upper bound on the digest count. Using this as the decode cap prevents a
    // malformed length prefix from forcing a large preallocation.
    let max_digests = last_commit_proof_bytes.len() / H::Digest::SIZE;
    let last_commit_proof =
        Proof::<F, H::Digest>::decode_cfg(last_commit_proof_bytes.as_ref(), &max_digests)
            .map_err(|_| Error::DataCorrupted("invalid compact witness"))?;
    let leaf_count = last_commit_proof.leaves;
    if leaf_count == 0 {
        return Err(Error::DataCorrupted("invalid compact witness"));
    }
    // The persisted proof must describe the same tree the frontier loaded into memory. Rejecting
    // a mismatch here reports it as corruption before `leaf_count` is used to compute the root or
    // to index `get_node_unchecked` below, rather than relying on proof verification to catch it.
    if leaf_count != merkle.leaves() {
        return Err(Error::DataCorrupted("invalid compact witness"));
    }

    // Decode the commit op to get the inactivity floor, which determines the inactive peak
    // boundary used for root computation.
    let last_commit_loc = Location::new(*leaf_count - 1);
    let last_commit_op = Op::decode_cfg(last_commit_op_bytes.as_ref(), commit_codec_config)
        .map_err(|_| Error::DataCorrupted("invalid commit operation"))?;
    let inactivity_floor_loc = last_commit_floor(&last_commit_op)
        .ok_or(Error::DataCorrupted("last operation was not a commit"))?;
    validate_inactivity_floor(inactivity_floor_loc, last_commit_loc)?;

    let inactive_peaks =
        F::inactive_peaks(F::location_to_position(leaf_count), inactivity_floor_loc);
    let hasher = qmdb::hasher::<H>();
    let root = merkle
        .root(&hasher, inactive_peaks)
        .map_err(|_| Error::DataCorrupted("failed to compute compact witness root"))?;
    if !last_commit_proof.verify_range_inclusion(
        &hasher,
        &[last_commit_op_bytes.as_slice()],
        last_commit_loc,
        &root,
    ) {
        return Err(Error::DataCorrupted("invalid compact witness"));
    }
    let pinned_nodes = merkle.with_mem(|mem| {
        F::nodes_to_pin(leaf_count)
            .map(|pos| *mem.get_node_unchecked(pos))
            .collect::<Vec<_>>()
    });
    let witness = ServeState {
        root,
        leaf_count,
        pinned_nodes,
        last_commit_op_bytes,
        last_commit_proof,
    };
    Ok((witness, last_commit_op))
}

/// Bootstrap the first persisted witness for a brand-new compact db.
///
/// Fresh compact databases begin with exactly one committed operation: the initial commit. This
/// helper appends that commit to the compact Merkle, builds its inclusion proof, and persists the
/// resulting witness into the slot being activated. Later reopen and rewind paths can then use the
/// same recovery logic as every subsequent commit.
///
/// The leaf count and commit location are taken from the Merkle after the append rather than
/// assumed. The witness is built the same way `persist_witness` builds one, including frontier
/// pins.
///
/// # Errors
///
/// Propagates failures from applying the batch, computing the root or proof, or syncing the
/// Merkle and its witness metadata.
pub(crate) async fn bootstrap_initial_commit<F, E, H, S>(
    merkle: &mut compact::Merkle<F, E, H::Digest, S>,
    last_commit_op_bytes: Vec<u8>,
) -> Result<(), Error<F>>
where
    F: Family,
    E: Context,
    H: Hasher,
    S: Strategy,
{
    let hasher = qmdb::hasher::<H>();
    let batch = {
        let batch = merkle.new_batch().add(&hasher, &last_commit_op_bytes);
        merkle.with_mem(|mem| batch.merkleize(mem, &hasher))
    };
    merkle.apply_batch(&batch)?;

    // The initial commit has an inactivity floor of 0. The commit just appended is the last leaf,
    // so its location is `leaf_count - 1` (location 0 for a fresh db).
    let inactivity_floor_loc = Location::<F>::new(0);
    let leaf_count = merkle.leaves();
    let last_commit_loc = Location::<F>::new(*leaf_count - 1);
    let inactive_peaks =
        F::inactive_peaks(F::location_to_position(leaf_count), inactivity_floor_loc);
    merkle
        .sync_with_witness(
            |mem| {
                let root = mem.root(&hasher, inactive_peaks)?;
                // Pin the same frontier nodes `persist_witness` pins, so the witness is coherent
                // with `leaf_count`.
                let pinned_nodes = F::nodes_to_pin(leaf_count)
                    .map(|pos| *mem.get_node_unchecked(pos))
                    .collect::<Vec<_>>();
                let last_commit_proof = mem.proof(&hasher, last_commit_loc, inactive_peaks)?;
                Ok(ServeState {
                    root,
                    leaf_count,
                    pinned_nodes,
                    last_commit_op_bytes: last_commit_op_bytes.clone(),
                    last_commit_proof,
                })
            },
            |metadata, slot, witness| {
                write_witness_metadata(metadata, slot, &witness);
                Ok(())
            },
        )
        .await?;
    Ok(())
}

/// Persist the compact witness for the state currently being synced.
///
/// When the cached witness already describes the leaf count being synced, it is copied as-is into
/// the slot being activated. Otherwise a fresh witness is computed from the unpruned Merkle before
/// the sync prunes it. The cache lookup happens inside `sync_with_witness` so that concurrent
/// syncs observe whatever witness the previous persisted slot flip left in the cache. After
/// writing the witness metadata, the cache is replaced with the persisted witness.
///
/// # Errors
///
/// Propagates failures from root/proof computation and from the sync itself.
pub(crate) async fn persist_witness<F, E, H, S>(
    merkle: &compact::Merkle<F, E, H::Digest, S>,
    cache: &Cache<F, H::Digest>,
    last_commit_loc: Location<F>,
    inactivity_floor_loc: Location<F>,
    last_commit_op_bytes: Vec<u8>,
) -> Result<(), Error<F>>
where
    F: Family,
    E: Context,
    H: Hasher,
    S: Strategy,
{
    let hasher = qmdb::hasher::<H>();
    merkle
        .sync_with_witness(
            |mem| {
                let leaves = mem.leaves();

                // Reuse the cached witness when it already covers exactly these leaves.
                let reusable = cache.with(|current| {
                    if current.leaf_count == leaves {
                        Some(current.clone())
                    } else {
                        None
                    }
                });
                if let Some(witness) = reusable {
                    return Ok(witness);
                }

                // Otherwise derive root, pins and proof from the in-memory Merkle.
                let inactive_peaks =
                    F::inactive_peaks(F::location_to_position(leaves), inactivity_floor_loc);
                let root = mem.root(&hasher, inactive_peaks)?;
                let pinned_nodes: Vec<_> = F::nodes_to_pin(leaves)
                    .map(|pos| *mem.get_node_unchecked(pos))
                    .collect();
                let last_commit_proof = mem.proof(&hasher, last_commit_loc, inactive_peaks)?;
                Ok(ServeState {
                    root,
                    leaf_count: leaves,
                    pinned_nodes,
                    last_commit_op_bytes: last_commit_op_bytes.clone(),
                    last_commit_proof,
                })
            },
            |metadata, slot, persisted| {
                write_witness_metadata(metadata, slot, &persisted);
                cache.replace(persisted);
                Ok(())
            },
        )
        .await?;
    Ok(())
}

/// Write the already-verified cached witness into the newly activated slot.
///
/// Used after a compact db has reconstructed and verified its state from persisted data (for
/// example, root verification during compact sync initialization) and only has to move that
/// known-good witness into the Merkle's slot layout. No proof is recomputed.
///
/// # Errors
///
/// Propagates failures from the underlying sync.
pub(crate) async fn persist_cached_witness<F, E, H, S>(
    merkle: &compact::Merkle<F, E, H::Digest, S>,
    cache: &Cache<F, H::Digest>,
) -> Result<(), Error<F>>
where
    F: Family,
    E: Context,
    H: Hasher,
    S: Strategy,
{
    // Take a snapshot so the cache lock is not held across the sync.
    let snapshot = cache.with(|current| current.clone());
    merkle
        .sync_with_witness(
            |_| Ok(snapshot),
            |metadata, slot, persisted| {
                write_witness_metadata(metadata, slot, &persisted);
                Ok(())
            },
        )
        .await?;
    Ok(())
}