// commonware_storage/qmdb/any/ordered/mod.rs

1#[cfg(any(test, feature = "test-traits"))]
2use crate::qmdb::any::traits::PersistableMutableLog;
3use crate::{
4    index::Ordered as Index,
5    journal::contiguous::{Contiguous, Reader},
6    merkle::{Family, Location},
7    qmdb::{
8        any::{db::Db, ValueEncoding},
9        operation::{Key, Operation as OperationTrait},
10    },
11    Context,
12};
13use commonware_codec::Codec;
14use commonware_cryptography::Hasher;
15use futures::{
16    future::try_join_all,
17    stream::{self, Stream},
18};
19use std::{
20    collections::{BTreeMap, BTreeSet},
21    ops::Bound,
22};
23
24pub mod fixed;
25pub mod variable;
26
27pub use crate::qmdb::any::operation::{update::Ordered as Update, Ordered as Operation};
28
/// An optional `(location, update)` pair: the location of an update operation in the log
/// together with that operation's key data. `None` means no matching operation was found.
type LocatedKey<F, K, V> = Option<(Location<F>, Update<K, V>)>;
31
32impl<
33        F: Family,
34        E: Context,
35        K: Key,
36        V: ValueEncoding,
37        C: Contiguous<Item = Operation<F, K, V>>,
38        I: Index<Value = Location<F>>,
39        H: Hasher,
40    > Db<F, E, C, I, H, Update<K, V>>
41where
42    Operation<F, K, V>: Codec,
43{
44    async fn get_update_op(
45        reader: &impl Reader<Item = Operation<F, K, V>>,
46        loc: Location<F>,
47    ) -> Result<Update<K, V>, crate::qmdb::Error<F>> {
48        match reader.read(*loc).await? {
49            Operation::Update(key_data) => Ok(key_data),
50            _ => unreachable!("expected update operation at location {}", loc),
51        }
52    }
53
54    /// Whether the span defined by `span_start` and `span_end` contains `key`.
55    pub fn span_contains(span_start: &K, span_end: &K, key: &K) -> bool {
56        if span_start >= span_end {
57            // cyclic span case
58            if key >= span_start || key < span_end {
59                return true;
60            }
61        } else {
62            // normal span case
63            if key >= span_start && key < span_end {
64                return true;
65            }
66        }
67
68        false
69    }
70
71    /// Find the span produced by the provided locations that contains `key`, if any.
72    async fn find_span(
73        &self,
74        locs: impl IntoIterator<Item = Location<F>>,
75        key: &K,
76    ) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
77        let reader = self.log.reader().await;
78        for loc in locs {
79            // Iterate over conflicts in the snapshot entry to find the span.
80            let data = Self::get_update_op(&reader, loc).await?;
81            if Self::span_contains(&data.key, &data.next_key, key) {
82                return Ok(Some((loc, data)));
83            }
84        }
85
86        Ok(None)
87    }
88
89    /// Get the operation that defines the span whose range contains `key`, or None if the DB is
90    /// empty.
91    pub async fn get_span(&self, key: &K) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
92        if self.is_empty() {
93            return Ok(None);
94        }
95
96        // If the translated key is in the snapshot, get a cursor to look for the key.
97        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
98        let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
99        let span = self.find_span(locs, key).await?;
100        if let Some(span) = span {
101            return Ok(Some(span));
102        }
103
104        let Some((iter, _)) = self.snapshot.prev_translated_key(key) else {
105            // DB is empty.
106            return Ok(None);
107        };
108
109        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
110        let locs: Vec<Location<F>> = iter.copied().collect();
111        let span = self
112            .find_span(locs, key)
113            .await?
114            .expect("a span that includes any given key should always exist if db is non-empty");
115
116        Ok(Some(span))
117    }
118
119    /// Get the (value, next-key) pair of `key` in the db, or None if it has no value.
120    pub async fn get_all(&self, key: &K) -> Result<Option<(V::Value, K)>, crate::qmdb::Error<F>> {
121        self.get_with_loc(key)
122            .await
123            .map(|res| res.map(|(data, _)| (data.value, data.next_key)))
124    }
125
126    /// Returns the key data for `key` with its location, or None if the key is not active.
127    pub(crate) async fn get_with_loc(
128        &self,
129        key: &K,
130    ) -> Result<Option<(Update<K, V>, Location<F>)>, crate::qmdb::Error<F>> {
131        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
132        let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
133        let reader = self.log.reader().await;
134        for loc in locs {
135            let op = reader.read(*loc).await?;
136            assert!(
137                op.is_update(),
138                "location does not reference update operation. loc={loc}"
139            );
140            if op.key().expect("update operation must have key") == key {
141                let Operation::Update(data) = op else {
142                    unreachable!("expected update operation");
143                };
144                return Ok(Some((data, loc)));
145            }
146        }
147
148        Ok(None)
149    }
150
151    /// Streams all active (key, value) pairs in the database in key order, starting from the first
152    /// active key greater than or equal to `start`.
153    pub async fn stream_range<'a>(
154        &'a self,
155        start: K,
156    ) -> Result<
157        impl Stream<Item = Result<(K, V::Value), crate::qmdb::Error<F>>> + 'a,
158        crate::qmdb::Error<F>,
159    >
160    where
161        V: 'a,
162    {
163        let start_iter = self.snapshot.get(&start);
164        let mut init_pending = self.fetch_all_updates(start_iter).await?;
165        init_pending.retain(|x| x.key >= start);
166
167        Ok(stream::unfold(
168            (start, init_pending),
169            move |(driver_key, mut pending): (K, Vec<Update<K, V>>)| async move {
170                if !pending.is_empty() {
171                    let item = pending.pop().expect("pending is not empty");
172                    return Some((Ok((item.key, item.value)), (driver_key, pending)));
173                }
174
175                let Some((iter, wrapped)) = self.snapshot.next_translated_key(&driver_key) else {
176                    return None; // DB is empty
177                };
178                if wrapped {
179                    return None; // End of DB
180                }
181
182                // TODO(https://github.com/commonwarexyz/monorepo/issues/2527): concurrently
183                // fetch a much larger batch of "pending" keys.
184                match self.fetch_all_updates(iter).await {
185                    Ok(mut pending) => {
186                        let item = pending.pop().expect("pending is not empty");
187                        let key = item.key.clone();
188                        Some((Ok((item.key, item.value)), (key, pending)))
189                    }
190                    Err(e) => Some((Err(e), (driver_key, pending))),
191                }
192            },
193        ))
194    }
195
196    /// Fetches all update operations corresponding to the input locations, returning the result in
197    /// reverse order of the keys.
198    async fn fetch_all_updates(
199        &self,
200        locs: impl IntoIterator<Item = &Location<F>>,
201    ) -> Result<Vec<Update<K, V>>, crate::qmdb::Error<F>> {
202        let reader = self.log.reader().await;
203        let futures = locs
204            .into_iter()
205            .map(|loc| Self::get_update_op(&reader, *loc));
206        let mut updates = try_join_all(futures).await?;
207        updates.sort_by(|a, b| b.key.cmp(&a.key));
208
209        Ok(updates)
210    }
211}
212
/// Returns the smallest key in `possible_next` that is strictly greater than `key`.
///
/// If no such key exists (`key` is at or beyond the last key), the search wraps around and the
/// smallest key in the set is returned instead. `key` does not have to be a member of
/// `possible_next`.
///
/// # Panics
///
/// Panics if `possible_next` is empty.
pub(crate) fn find_next_key<K: Ord + Clone>(key: &K, possible_next: &BTreeSet<K>) -> K {
    let mut strictly_after = possible_next.range((Bound::Excluded(key), Bound::Unbounded));
    let successor = strictly_after.next().or_else(|| possible_next.first());
    successor.expect("possible_next should not be empty").clone()
}
231
/// Returns the entry of `possible_previous` whose key is the largest one strictly less than
/// `key`.
///
/// If no such entry exists (`key` is at or before the first key), the search wraps around and
/// the entry with the largest key overall is returned instead. `key` does not have to be
/// present in the map.
///
/// # Panics
///
/// Panics if `possible_previous` is empty.
pub(crate) fn find_prev_key<'a, K: Ord, V>(
    key: &K,
    possible_previous: &'a BTreeMap<K, V>,
) -> (&'a K, &'a V) {
    match possible_previous
        .range((Bound::Unbounded, Bound::Excluded(key)))
        .next_back()
    {
        Some(entry) => entry,
        None => possible_previous
            .last_key_value()
            .expect("possible_previous should not be empty"),
    }
}
253
// Wires the ordered `Db` over the MMR merkle family into the shared `any` test traits via
// `impl_db_any!`, exposing `Key = K`, `Value = V::Value` and `Digest = H::Digest`. Presumably
// this provides the `DbAny` impl that the generic test helpers are written against.
// Only compiled for tests or with the `test-traits` feature.
// The index value type uses the same fully-qualified form as the MMB invocations. The inherent
// `Db` impl already requires `I: Index<Value = Location<F>>`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
    [E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
    where {
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
        I: Index<Value = crate::merkle::Location<crate::merkle::mmr::Family>> + 'static,
        H: Hasher,
        Operation<crate::merkle::mmr::Family, K, V>: Codec,
        V::Value: Send + Sync,
    }
    Family = crate::merkle::mmr::Family, Key = K, Value = V::Value, Digest = H::Digest
}
269
// Wires the ordered `Db` over the MMR merkle family into the shared provable test trait via
// `impl_provable!`, with `Operation = Operation<mmr::Family, K, V>` (the log's item type).
// Only compiled for tests or with the `test-traits` feature.
// The index value type uses the same fully-qualified form as the MMB invocations. The inherent
// `Db` impl already requires `I: Index<Value = Location<F>>`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
    [E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
    where {
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
        I: Index<Value = crate::merkle::Location<crate::merkle::mmr::Family>> + 'static,
        H: Hasher,
        Operation<crate::merkle::mmr::Family, K, V>: Codec,
        V::Value: Send + Sync,
    }
    Family = crate::merkle::mmr::Family, Operation = Operation<crate::merkle::mmr::Family, K, V>
}
285
// Wires the ordered `Db` over the MMB merkle family into the shared `any` test traits via
// `impl_db_any!`, exposing `Key = K`, `Value = V::Value` and `Digest = H::Digest`. This mirrors
// the MMR invocation with `crate::merkle::mmb::Family` substituted.
// Only compiled for tests or with the `test-traits` feature.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
    [E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
    where {
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
        I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
        H: Hasher,
        Operation<crate::merkle::mmb::Family, K, V>: Codec,
        V::Value: Send + Sync,
    }
    Family = crate::merkle::mmb::Family, Key = K, Value = V::Value, Digest = H::Digest
}
301
// Wires the ordered `Db` over the MMB merkle family into the shared provable test trait via
// `impl_provable!`, with `Operation = Operation<mmb::Family, K, V>` (the log's item type).
// Only compiled for tests or with the `test-traits` feature.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
    [E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
    where {
        E: Context,
        K: Key,
        V: ValueEncoding + 'static,
        C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
        I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
        H: Hasher,
        Operation<crate::merkle::mmb::Family, K, V>: Codec,
        V::Value: Send + Sync,
    }
    Family = crate::merkle::mmb::Family, Operation = Operation<crate::merkle::mmb::Family, K, V>
}
317
// Generic test bodies for ordered `any` databases over the MMR family. They are `pub(crate)` and
// generic over any `DbAny` implementation; presumably they are driven from the concrete
// variants' test modules (e.g. `fixed` / `variable`) — TODO confirm.
#[cfg(test)]
mod test {
    use super::*;
    use crate::{
        merkle::mmr,
        qmdb::any::traits::{DbAny, UnmerkleizedBatch as _},
    };
    use commonware_cryptography::{sha256::Digest, Sha256};
    use commonware_runtime::{deterministic::Context, Metrics};
    use commonware_utils::sequence::FixedBytes;
    use core::{future::Future, pin::Pin};

    /// Exercises a freshly created (empty) db. It checks, in order:
    /// - metadata and pruning on an empty db;
    /// - that an unapplied batch is lost across a reopen;
    /// - that applying an empty batch with metadata starts at location 1 and persists the
    ///   metadata;
    /// - recovery of root and metadata after reopening;
    /// - a long run of consecutive empty commits.
    ///
    /// `reopen_db` is expected to reopen the same underlying storage under the given context.
    /// The root and metadata assertions after each reopen rely on it.
    pub(crate) async fn test_ordered_any_db_empty<
        D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
    >(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    ) {
        assert!(db.get_metadata().await.unwrap().is_none());
        assert!(matches!(
            db.prune(db.inactivity_floor_loc().await).await,
            Ok(())
        ));

        // Make sure closing/reopening gets us back to the same state, even after adding an
        // uncommitted op, and even without a clean shutdown.
        let d1 = FixedBytes::from([1u8; 4]);
        let d2 = Sha256::fill(2u8);
        let root = db.root();
        // Write without applying (unapplied batch should be lost on reopen).
        {
            let _batch = db.new_batch().write(d1, Some(d2));
            // Don't merkleize/apply -- simulates uncommitted write
        }
        // The previous `db` binding is shadowed, not closed, so this reopen happens without a
        // clean shutdown of the original handle.
        let mut db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.root(), root);

        // Test applying an empty batch on an empty db.
        let metadata = Sha256::fill(3u8);
        let merkleized = db.new_batch().merkleize(&db, Some(metadata)).await.unwrap();
        let range = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        // The first applied batch is expected to start at location 1; presumably location 0 is
        // occupied by an initial operation written on creation — TODO confirm.
        assert_eq!(range.start, Location::new(1));
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
        let root = db.root();
        assert!(matches!(
            db.prune(db.inactivity_floor_loc().await).await,
            Ok(())
        ));

        // Re-opening the DB without a clean shutdown should still recover the correct state.
        let mut db = reopen_db(context.with_label("reopen2")).await;
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
        assert_eq!(db.root(), root);

        // Confirm the inactivity floor doesn't fall endlessly behind with multiple commits.
        // (99 loop iterations plus the commit below: 100 empty commits in total.)
        for _ in 1..100 {
            let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
            let _ = db.apply_batch(merkleized).await.unwrap();
            db.commit().await.unwrap();
        }
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        db.destroy().await.unwrap();
    }

    /// Exercises inserts, updates and deletes of two keys, interleaved empty commits, recovery
    /// across two reopens, and the effect on the root of a commit and of pruning.
    ///
    /// `reopen_db` is expected to reopen the same underlying storage under the given context.
    /// The op-count and root assertions after each reopen rely on it.
    pub(crate) async fn test_ordered_any_db_basic<
        D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
    >(
        context: Context,
        mut db: D,
        reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    ) {
        // Build a db with 2 keys and make sure updates and deletions of those keys work as
        // expected.
        let key1 = FixedBytes::from([1u8; 4]);
        let key2 = FixedBytes::from([2u8; 4]);
        let val1 = Sha256::fill(3u8);
        let val2 = Sha256::fill(4u8);

        assert!(db.get(&key1).await.unwrap().is_none());
        assert!(db.get(&key2).await.unwrap().is_none());

        // Insert key1.
        assert!(db.get(&key1).await.unwrap().is_none());
        let merkleized = db
            .new_batch()
            .write(key1.clone(), Some(val1))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
        assert!(db.get(&key2).await.unwrap().is_none());

        // Insert key2; key1 must be unaffected.
        assert!(db.get(&key2).await.unwrap().is_none());
        let merkleized = db
            .new_batch()
            .write(key2.clone(), Some(val2))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
        assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);

        // Delete key1 (a `None` value); key2 must be unaffected.
        let merkleized = db
            .new_batch()
            .write(key1.clone(), None)
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert!(db.get(&key1).await.unwrap().is_none());
        assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);

        // Re-insert the deleted key1 with a new value.
        let new_val = Sha256::fill(5u8);
        let merkleized = db
            .new_batch()
            .write(key1.clone(), Some(new_val))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&key1).await.unwrap().unwrap(), new_val);

        // Overwrite the still-active key2.
        let merkleized = db
            .new_batch()
            .write(key2.clone(), Some(new_val))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.get(&key2).await.unwrap().unwrap(), new_val);

        // Empty commit batch (no preceding uncommitted writes).
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Make sure key1 is already active.
        assert!(db.get(&key1).await.unwrap().is_some());

        // Delete all keys.
        assert!(db.get(&key1).await.unwrap().is_some());
        let merkleized = db
            .new_batch()
            .write(key1.clone(), None)
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert!(db.get(&key2).await.unwrap().is_some());
        let merkleized = db
            .new_batch()
            .write(key2.clone(), None)
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert!(db.get(&key1).await.unwrap().is_none());
        assert!(db.get(&key2).await.unwrap().is_none());

        // Empty commit batch.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Multiple deletions of the same key should be a no-op.
        // NOTE(review): no second deletion is actually issued here; this only re-checks that
        // key1 is still absent.
        assert!(db.get(&key1).await.unwrap().is_none());

        // Deletions of non-existent keys should be a no-op.
        // NOTE(review): no deletion of key3 is issued; this only checks that key3 was never
        // present.
        let key3 = FixedBytes::from([6u8; 4]);
        assert!(db.get(&key3).await.unwrap().is_none());

        // Make sure closing/reopening gets us back to the same state.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        let op_count = db.bounds().await.end;
        let root = db.root();
        let mut db = reopen_db(context.with_label("reopen1")).await;
        assert_eq!(db.bounds().await.end, op_count);
        assert_eq!(db.root(), root);

        // Re-activate the keys by updating them.
        let merkleized = db
            .new_batch()
            .write(key1.clone(), Some(val1))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        let merkleized = db
            .new_batch()
            .write(key2.clone(), Some(val2))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        let merkleized = db
            .new_batch()
            .write(key1.clone(), None)
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        let merkleized = db
            .new_batch()
            .write(key2.clone(), Some(val1))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        let merkleized = db
            .new_batch()
            .write(key1.clone(), Some(val2))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Empty commit batch.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        // Confirm close/reopen gets us back to the same state.
        let op_count = db.bounds().await.end;
        let root = db.root();
        let mut db = reopen_db(context.with_label("reopen2")).await;

        assert_eq!(db.root(), root);
        assert_eq!(db.bounds().await.end, op_count);

        // Commit will raise the inactivity floor, which won't affect state but will affect the
        // root.
        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();

        assert!(db.root() != root);

        // Pruning inactive ops should not affect current state or root.
        let root = db.root();
        db.prune(db.inactivity_floor_loc().await).await.unwrap();
        assert_eq!(db.root(), root);

        db.destroy().await.unwrap();
    }

    /// Builds a db with colliding keys to make sure the "cycle around when there are translated
    /// key collisions" edge case is exercised.
    ///
    /// All three keys share the same two-byte prefix. The batch is applied and the keys are read
    /// back before any commit. An empty batch is then applied and committed, and the db is
    /// destroyed.
    pub(crate) async fn test_ordered_any_update_collision_edge_case<
        D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
    >(
        mut db: D,
    ) {
        // This DB uses a TwoCap so we use equivalent two byte prefixes for each key to ensure
        // collisions.
        let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
        let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
        // Our last key must precede the others to trigger previous-key cycle around.
        let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 0u8, 0u8]);
        let val = Sha256::fill(1u8);

        let merkleized = db
            .new_batch()
            .write(key1.clone(), Some(val))
            .write(key2.clone(), Some(val))
            .write(key3.clone(), Some(val))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();

        assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
        assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
        assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);

        let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
        let _ = db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        db.destroy().await.unwrap();
    }
}
620}