Skip to main content

commonware_storage/qmdb/current/ordered/
db.rs

1//! Shared implementation for ordered Current QMDB variants.
2//!
3//! This module contains impl blocks that are generic over `ValueEncoding`, allowing them to be
4//! used by both fixed and variable ordered QMDB implementations.
5
6use crate::{
7    index::Ordered as OrderedIndex,
8    journal::contiguous::{Contiguous, Mutable, Reader},
9    merkle::{self, hasher::Standard as StandardHasher, Location},
10    qmdb::{
11        any::{
12            ordered::{Operation, Update},
13            ValueEncoding,
14        },
15        current::proof::OperationProof,
16        operation::Key,
17        Error,
18    },
19    Context,
20};
21use bytes::{Buf, BufMut};
22use commonware_codec::{Codec, EncodeSize, Read, Write};
23use commonware_cryptography::{Digest, Hasher};
24use commonware_parallel::Strategy;
25use futures::stream::Stream;
26
27/// Proof information for verifying a key has a particular value in the database.
28#[derive(Clone, Eq, PartialEq, Debug)]
29pub struct KeyValueProof<F: merkle::Graftable, K: Key, D: Digest, const N: usize> {
30    pub proof: OperationProof<F, D, N>,
31    pub next_key: K,
32}
33
34impl<F: merkle::Graftable, K: Key, D: Digest, const N: usize> Write for KeyValueProof<F, K, D, N> {
35    fn write(&self, buf: &mut impl BufMut) {
36        self.proof.write(buf);
37        self.next_key.write(buf);
38    }
39}
40
41impl<F: merkle::Graftable, K: Key, D: Digest, const N: usize> EncodeSize
42    for KeyValueProof<F, K, D, N>
43{
44    fn encode_size(&self) -> usize {
45        self.proof.encode_size() + self.next_key.encode_size()
46    }
47}
48
49impl<F: merkle::Graftable, K: Key, D: Digest, const N: usize> Read for KeyValueProof<F, K, D, N> {
50    /// `(max_digests, key_cfg)`: the Merkle digest cap forwarded to the embedded operation
51    /// proof and the read configuration for the key type.
52    type Cfg = (usize, <K as Read>::Cfg);
53
54    fn read_cfg(
55        buf: &mut impl Buf,
56        (max_digests, key_cfg): &Self::Cfg,
57    ) -> Result<Self, commonware_codec::Error> {
58        let proof = OperationProof::<F, D, N>::read_cfg(buf, max_digests)?;
59        let next_key = K::read_cfg(buf, key_cfg)?;
60        Ok(Self { proof, next_key })
61    }
62}
63
64#[cfg(feature = "arbitrary")]
65impl<F: merkle::Graftable, K: Key, D: Digest, const N: usize> arbitrary::Arbitrary<'_>
66    for KeyValueProof<F, K, D, N>
67where
68    K: for<'a> arbitrary::Arbitrary<'a>,
69    D: for<'a> arbitrary::Arbitrary<'a>,
70    F::PendingChunk<D>: for<'a> arbitrary::Arbitrary<'a>,
71{
72    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
73        Ok(Self {
74            proof: u.arbitrary()?,
75            next_key: u.arbitrary()?,
76        })
77    }
78}
79
80/// The generic Db type for ordered Current QMDB variants.
81///
82/// This type is generic over the index type `I`, allowing it to be used with both regular
83/// and partitioned indices.
84pub type Db<F, E, C, K, V, I, H, const N: usize, S> =
85    crate::qmdb::current::db::Db<F, E, C, I, H, Update<K, V>, N, S>;
86
87// Shared read-only functionality.
88impl<
89        F: merkle::Graftable,
90        E: Context,
91        C: Contiguous<Item = Operation<F, K, V>>,
92        K: Key,
93        V: ValueEncoding,
94        I: OrderedIndex<Value = Location<F>>,
95        H: Hasher,
96        const N: usize,
97        S: Strategy,
98    > Db<F, E, C, K, V, I, H, N, S>
99where
100    Operation<F, K, V>: Codec,
101{
102    /// Get the value of `key` in the db, or None if it has no value.
103    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
104        self.any.get(key).await
105    }
106
107    /// Return true if the proof authenticates that `key` currently has value `value` in the db with
108    /// the provided `root`.
109    pub fn verify_key_value_proof(
110        hasher: &StandardHasher<H>,
111        key: K,
112        value: V::Value,
113        proof: &KeyValueProof<F, K, H::Digest, N>,
114        root: &H::Digest,
115    ) -> bool {
116        let op = Operation::Update(Update {
117            key,
118            value,
119            next_key: proof.next_key.clone(),
120        });
121
122        proof.proof.verify(hasher, op, root)
123    }
124
125    /// Get the operation that currently defines the span whose range contains `key`, or None if the
126    /// DB is empty.
127    pub async fn get_span(&self, key: &K) -> Result<Option<(Location<F>, Update<K, V>)>, Error<F>> {
128        self.any.get_span(key).await
129    }
130
131    /// Streams all active (key, value) pairs in the database in key order, starting from the first
132    /// active key greater than or equal to `start`.
133    pub async fn stream_range<'a>(
134        &'a self,
135        start: K,
136    ) -> Result<impl Stream<Item = Result<(K, V::Value), Error<F>>> + 'a, Error<F>>
137    where
138        V: 'a,
139    {
140        self.any.stream_range(start).await
141    }
142
143    /// Return true if the proof authenticates that `key` does _not_ exist in the db with the
144    /// provided `root`.
145    pub fn verify_exclusion_proof(
146        hasher: &StandardHasher<H>,
147        key: &K,
148        proof: &super::ExclusionProof<F, K, V, H::Digest, N>,
149        root: &H::Digest,
150    ) -> bool {
151        let (op_proof, op) = match proof {
152            super::ExclusionProof::KeyValue(op_proof, data) => {
153                if data.key == *key {
154                    // The provided `key` is in the DB if it matches the start of the span.
155                    return false;
156                }
157                if !crate::qmdb::any::db::Db::<F, E, C, I, H, Update<K, V>, N, S>::span_contains(
158                    &data.key,
159                    &data.next_key,
160                    key,
161                ) {
162                    // If the key is not within the span, then this proof cannot prove its
163                    // exclusion.
164                    return false;
165                }
166
167                (op_proof, Operation::Update(data.clone()))
168            }
169            super::ExclusionProof::Commit(op_proof, metadata) => {
170                // Handle the case where the proof shows the db is empty, hence any key is proven
171                // excluded. For the db to be empty, the floor must equal the commit operation's
172                // location.
173                let floor_loc = op_proof.loc;
174                (
175                    op_proof,
176                    Operation::CommitFloor(metadata.clone(), floor_loc),
177                )
178            }
179        };
180
181        op_proof.verify(hasher, op, root)
182    }
183}
184
185impl<
186        F: merkle::Graftable,
187        E: Context,
188        C: Mutable<Item = Operation<F, K, V>>,
189        K: Key,
190        V: ValueEncoding,
191        I: OrderedIndex<Value = Location<F>>,
192        H: Hasher,
193        const N: usize,
194        S: Strategy,
195    > Db<F, E, C, K, V, I, H, N, S>
196where
197    Operation<F, K, V>: Codec,
198{
199    /// Generate and return a proof of the current value of `key`, along with the other
200    /// [KeyValueProof] required to verify the proof. Returns KeyNotFound error if the key is not
201    /// currently assigned any value.
202    ///
203    /// # Errors
204    ///
205    /// Returns [Error::KeyNotFound] if the key is not currently assigned any value.
206    pub async fn key_value_proof(
207        &self,
208        hasher: &StandardHasher<H>,
209        key: K,
210    ) -> Result<KeyValueProof<F, K, H::Digest, N>, Error<F>> {
211        let op_loc = self.any.get_with_loc(&key).await?;
212        let Some((data, loc)) = op_loc else {
213            return Err(Error::<F>::KeyNotFound);
214        };
215        let proof = self.operation_proof(hasher, loc).await?;
216
217        Ok(KeyValueProof {
218            proof,
219            next_key: data.next_key,
220        })
221    }
222
223    /// Generate and return a proof that the specified `key` does not exist in the db.
224    ///
225    /// # Errors
226    ///
227    /// Returns [Error::KeyExists] if the key exists in the db.
228    pub async fn exclusion_proof(
229        &self,
230        hasher: &StandardHasher<H>,
231        key: &K,
232    ) -> Result<super::ExclusionProof<F, K, V, H::Digest, N>, Error<F>> {
233        match self.any.get_span(key).await? {
234            Some((loc, key_data)) => {
235                if key_data.key == *key {
236                    // Cannot prove exclusion of a key that exists in the db.
237                    return Err(Error::<F>::KeyExists);
238                }
239                let op_proof = self.operation_proof(hasher, loc).await?;
240                Ok(super::ExclusionProof::KeyValue(op_proof, key_data))
241            }
242            None => {
243                // The DB is empty. Use the last CommitFloor to prove emptiness. The Commit proof
244                // variant requires the CommitFloor's floor to equal its own location (genuinely
245                // empty at commit time). If this doesn't hold, the persisted state is inconsistent.
246                let op = self
247                    .any
248                    .log
249                    .reader()
250                    .await
251                    .read(*self.any.last_commit_loc)
252                    .await?;
253                let Operation::CommitFloor(value, floor) = op else {
254                    unreachable!("last_commit_loc should always point to a CommitFloor");
255                };
256                assert_eq!(
257                    floor, self.any.last_commit_loc,
258                    "inconsistent commit floor: expected last_commit_loc={}, got floor={}",
259                    self.any.last_commit_loc, floor
260                );
261                let op_proof = self
262                    .operation_proof(hasher, self.any.last_commit_loc)
263                    .await?;
264                Ok(super::ExclusionProof::Commit(op_proof, value))
265            }
266        }
267    }
268}