Skip to main content

commonware_storage/qmdb/current/ordered/
db.rs

1//! Shared implementation for ordered Current QMDB variants.
2//!
3//! This module contains impl blocks that are generic over `ValueEncoding`, allowing them to be
4//! used by both fixed and variable ordered QMDB implementations.
5
6use crate::{
7    index::ordered::Index,
8    journal::contiguous::{Contiguous, MutableContiguous},
9    kv::{self, Batchable},
10    mmr::{grafting::Storage as GraftingStorage, Location},
11    qmdb::{
12        any::{
13            ordered::{Operation, Update},
14            ValueEncoding,
15        },
16        current::{
17            db::{Merkleized, State, Unmerkleized},
18            proof::OperationProof,
19        },
20        store::{self, LogStore as _},
21        DurabilityState, Durable, Error, NonDurable,
22    },
23    translator::Translator,
24};
25use commonware_codec::Codec;
26use commonware_cryptography::{Digest, DigestOf, Hasher};
27use commonware_runtime::{Clock, Metrics, Storage};
28use commonware_utils::Array;
29use futures::stream::Stream;
30
/// Proof information for verifying a key has a particular value in the database.
///
/// Produced by `key_value_proof` and checked by `verify_key_value_proof`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct KeyValueProof<K: Array, D: Digest, const N: usize> {
    /// Proof for the update operation that assigns the key its current value.
    pub proof: OperationProof<D, N>,
    /// The `next_key` field of the proven update operation. The verifier combines it with the
    /// caller-supplied key and value to reconstruct the operation being authenticated.
    pub next_key: K,
}
37
/// The generic Db type for ordered Current QMDB variants.
///
/// This is an alias of `super::super::db::Db` instantiated with the ordered `Index<T, Location>`
/// and the ordered `Update<K, V>` type. When omitted, the state parameter `S` defaults to
/// `Merkleized<DigestOf<H>>` and the durability parameter `D` defaults to `Durable`.
pub type Db<E, C, K, V, H, T, const N: usize, S = Merkleized<DigestOf<H>>, D = Durable> =
    super::super::db::Db<E, C, Index<T, Location>, H, Update<K, V>, N, S, D>;
41
42// Functionality shared across all DB states, such as most non-mutating operations.
43impl<
44        E: Storage + Clock + Metrics,
45        C: Contiguous<Item = Operation<K, V>>,
46        K: Array,
47        V: ValueEncoding,
48        H: Hasher,
49        T: Translator,
50        const N: usize,
51        S: State<DigestOf<H>>,
52        D: DurabilityState,
53    > Db<E, C, K, V, H, T, N, S, D>
54where
55    Operation<K, V>: Codec,
56    V::Value: Send + Sync,
57{
58    /// Get the value of `key` in the db, or None if it has no value.
59    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
60        self.any.get(key).await
61    }
62
63    /// Return true if the proof authenticates that `key` currently has value `value` in the db with
64    /// the provided `root`.
65    pub fn verify_key_value_proof(
66        hasher: &mut H,
67        key: K,
68        value: V::Value,
69        proof: &KeyValueProof<K, H::Digest, N>,
70        root: &H::Digest,
71    ) -> bool {
72        let op = Operation::Update(Update {
73            key,
74            value,
75            next_key: proof.next_key.clone(),
76        });
77
78        proof
79            .proof
80            .verify(hasher, Self::grafting_height(), op, root)
81    }
82
83    /// Get the operation that currently defines the span whose range contains `key`, or None if the
84    /// DB is empty.
85    pub async fn get_span(&self, key: &K) -> Result<Option<(Location, Update<K, V>)>, Error> {
86        self.any.get_span(key).await
87    }
88
89    /// Streams all active (key, value) pairs in the database in key order, starting from the first
90    /// active key greater than or equal to `start`.
91    pub async fn stream_range<'a>(
92        &'a self,
93        start: K,
94    ) -> Result<impl Stream<Item = Result<(K, V::Value), Error>> + 'a, Error>
95    where
96        V: 'a,
97    {
98        self.any.stream_range(start).await
99    }
100
101    /// Return true if the proof authenticates that `key` does _not_ exist in the db with the
102    /// provided `root`.
103    pub fn verify_exclusion_proof(
104        hasher: &mut H,
105        key: &K,
106        proof: &super::ExclusionProof<K, V, H::Digest, N>,
107        root: &H::Digest,
108    ) -> bool {
109        let (op_proof, op) = match proof {
110            super::ExclusionProof::KeyValue(op_proof, data) => {
111                if data.key == *key {
112                    // The provided `key` is in the DB if it matches the start of the span.
113                    return false;
114                }
115                if !crate::qmdb::any::db::Db::<
116                    E,
117                    C,
118                    Index<T, Location>,
119                    H,
120                    Update<K, V>,
121                    S::AnyState,
122                    D,
123                >::span_contains(&data.key, &data.next_key, key)
124                {
125                    // If the key is not within the span, then this proof cannot prove its
126                    // exclusion.
127                    return false;
128                }
129
130                (op_proof, Operation::Update(data.clone()))
131            }
132            super::ExclusionProof::Commit(op_proof, metadata) => {
133                // Handle the case where the proof shows the db is empty, hence any key is proven
134                // excluded. For the db to be empty, the floor must equal the commit operation's
135                // location.
136                let floor_loc = op_proof.loc;
137                (
138                    op_proof,
139                    Operation::CommitFloor(metadata.clone(), floor_loc),
140                )
141            }
142        };
143
144        op_proof.verify(hasher, Self::grafting_height(), op, root)
145    }
146}
147
148// Functionality for any Merkleized state (both Durable and NonDurable).
149impl<
150        E: Storage + Clock + Metrics,
151        C: MutableContiguous<Item = Operation<K, V>>,
152        K: Array,
153        V: ValueEncoding,
154        H: Hasher,
155        T: Translator,
156        const N: usize,
157        D: store::State,
158    > Db<E, C, K, V, H, T, N, Merkleized<DigestOf<H>>, D>
159where
160    Operation<K, V>: Codec,
161    V::Value: Send + Sync,
162{
163    /// Generate and return a proof of the current value of `key`, along with the other
164    /// [KeyValueProof] required to verify the proof. Returns KeyNotFound error if the key is not
165    /// currently assigned any value.
166    ///
167    /// # Errors
168    ///
169    /// Returns [Error::KeyNotFound] if the key is not currently assigned any value.
170    pub async fn key_value_proof(
171        &self,
172        hasher: &mut H,
173        key: K,
174    ) -> Result<KeyValueProof<K, H::Digest, N>, Error> {
175        let op_loc = self.any.get_with_loc(&key).await?;
176        let Some((data, loc)) = op_loc else {
177            return Err(Error::KeyNotFound);
178        };
179        let height = Self::grafting_height();
180        let mmr = &self.any.log.mmr;
181        let proof =
182            OperationProof::<H::Digest, N>::new(hasher, &self.status, height, mmr, loc).await?;
183
184        Ok(KeyValueProof {
185            proof,
186            next_key: data.next_key,
187        })
188    }
189
190    /// Generate and return a proof that the specified `key` does not exist in the db.
191    ///
192    /// # Errors
193    ///
194    /// Returns [Error::KeyExists] if the key exists in the db.
195    pub async fn exclusion_proof(
196        &self,
197        hasher: &mut H,
198        key: &K,
199    ) -> Result<super::ExclusionProof<K, V, H::Digest, N>, Error> {
200        let height = Self::grafting_height();
201        let grafted_mmr =
202            GraftingStorage::<'_, H, _, _>::new(&self.status, &self.any.log.mmr, height);
203
204        let span = self.any.get_span(key).await?;
205        let loc = match &span {
206            Some((loc, key_data)) => {
207                if key_data.key == *key {
208                    // Cannot prove exclusion of a key that exists in the db.
209                    return Err(Error::KeyExists);
210                }
211                *loc
212            }
213            None => self.size().checked_sub(1).expect("db shouldn't be empty"),
214        };
215
216        let op_proof =
217            OperationProof::<H::Digest, N>::new(hasher, &self.status, height, &grafted_mmr, loc)
218                .await?;
219
220        Ok(match span {
221            Some((_, key_data)) => super::ExclusionProof::KeyValue(op_proof, key_data),
222            None => {
223                let value = match self.any.log.read(loc).await? {
224                    Operation::CommitFloor(value, _) => value,
225                    _ => unreachable!("last commit is not a CommitFloor operation"),
226                };
227                super::ExclusionProof::Commit(op_proof, value)
228            }
229        })
230    }
231}
232
233// Functionality for the Mutable state.
234impl<
235        E: Storage + Clock + Metrics,
236        C: MutableContiguous<Item = Operation<K, V>>,
237        K: Array,
238        V: ValueEncoding,
239        H: Hasher,
240        T: Translator,
241        const N: usize,
242    > Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
243where
244    Operation<K, V>: Codec,
245    V::Value: Send + Sync,
246{
247    /// Updates `key` to have value `value`. The operation is reflected in the snapshot, but will be
248    /// subject to rollback until the next successful `commit`.
249    pub async fn update(&mut self, key: K, value: V::Value) -> Result<(), Error> {
250        self.any
251            .update_with_callback(key, value, |loc| {
252                self.status.push(true);
253                if let Some(loc) = loc {
254                    self.status.set_bit(*loc, false);
255                }
256            })
257            .await
258    }
259
260    /// Creates a new key-value pair in the db. The operation is reflected in the snapshot, but will
261    /// be subject to rollback until the next successful `commit`. Returns true if the key was
262    /// created, false if it already existed.
263    pub async fn create(&mut self, key: K, value: V::Value) -> Result<bool, Error> {
264        self.any
265            .create_with_callback(key, value, |loc| {
266                self.status.push(true);
267                if let Some(loc) = loc {
268                    self.status.set_bit(*loc, false);
269                }
270            })
271            .await
272    }
273
274    /// Delete `key` and its value from the db. Deleting a key that already has no value is a no-op.
275    /// The operation is reflected in the snapshot, but will be subject to rollback until the next
276    /// successful `commit`. Returns true if the key was deleted, false if it was already inactive.
277    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
278        let mut r = false;
279        self.any
280            .delete_with_callback(key, |append, loc| {
281                if let Some(loc) = loc {
282                    self.status.set_bit(*loc, false);
283                }
284                self.status.push(append);
285                r = true;
286            })
287            .await?;
288
289        Ok(r)
290    }
291}
292
// kv::Gettable implementation for all states
impl<
        E: Storage + Clock + Metrics,
        C: Contiguous<Item = Operation<K, V>>,
        K: Array,
        V: ValueEncoding,
        H: Hasher,
        T: Translator,
        const N: usize,
        S: State<DigestOf<H>>,
        D: DurabilityState,
    > kv::Gettable for Db<E, C, K, V, H, T, N, S, D>
where
    Operation<K, V>: Codec,
    V::Value: Send + Sync,
{
    type Key = K;
    type Value = V::Value;
    type Error = Error;

    /// Returns the value of `key`, or `None` if it has no value.
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        // Resolves to the inherent `Db::get` (inherent methods take precedence over trait
        // methods), so this delegates rather than recursing.
        self.get(key).await
    }
}
317
// kv::Updatable for (Unmerkleized, NonDurable) (aka mutable) state
impl<
        E: Storage + Clock + Metrics,
        C: MutableContiguous<Item = Operation<K, V>>,
        K: Array,
        V: ValueEncoding,
        H: Hasher,
        T: Translator,
        const N: usize,
    > kv::Updatable for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
where
    Operation<K, V>: Codec,
    V::Value: Send + Sync,
{
    /// Updates `key` to have value `value`.
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        // Resolves to the inherent `Db::update` (inherent methods take precedence over trait
        // methods), so this delegates rather than recursing.
        self.update(key, value).await
    }
}
336
// kv::Deletable for (Unmerkleized, NonDurable) (aka mutable) state
impl<
        E: Storage + Clock + Metrics,
        C: MutableContiguous<Item = Operation<K, V>>,
        K: Array,
        V: ValueEncoding,
        H: Hasher,
        T: Translator,
        const N: usize,
    > kv::Deletable for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
where
    Operation<K, V>: Codec,
    V::Value: Send + Sync,
{
    /// Deletes `key` and its value. Returns true if the key was deleted, false if it was already
    /// inactive.
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        // Resolves to the inherent `Db::delete` (inherent methods take precedence over trait
        // methods), so this delegates rather than recursing.
        self.delete(key).await
    }
}
355
356// Batchable for (Unmerkleized, NonDurable) (aka mutable) state
357impl<E, C, K, V, T, H, const N: usize> Batchable
358    for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
359where
360    E: Storage + Clock + Metrics,
361    C: MutableContiguous<Item = Operation<K, V>>,
362    K: Array,
363    V: ValueEncoding,
364    T: Translator,
365    H: Hasher,
366    Operation<K, V>: Codec,
367    V::Value: Send + Sync,
368{
369    async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Error>
370    where
371        Iter: Iterator<Item = (K, Option<V::Value>)> + Send + 'a,
372    {
373        let status = &mut self.status;
374        self.any
375            .write_batch_with_callback(iter, move |append: bool, loc: Option<Location>| {
376                status.push(append);
377                if let Some(loc) = loc {
378                    status.set_bit(*loc, false);
379                }
380            })
381            .await
382    }
383}