1use crate::{
67 index::Factory as IndexFactory,
68 journal::{
69 authenticated::Inner,
70 contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
71 },
72 merkle::{full::Config as MerkleConfig, Family, Location},
73 qmdb::{
74 any::operation::{Operation, Update},
75 bitmap::Shared,
76 operation::Committable,
77 ROOT_BAGGING,
78 },
79 translator::Translator,
80 Context,
81};
82use commonware_codec::CodecShared;
83use commonware_cryptography::Hasher;
84use commonware_parallel::Strategy;
85use std::sync::Arc;
86use tracing::warn;
87
88pub mod batch;
89pub mod db;
90pub mod operation;
91#[cfg(any(test, feature = "test-traits"))]
92pub mod traits;
93pub mod value;
94pub use value::{FixedValue, ValueEncoding, VariableValue};
95pub mod ordered;
96pub(crate) mod sync;
97pub mod unordered;
98
/// Bitmap chunk size used by databases opened through [`init`], which passes this constant as
/// the `N` const generic of [`init_with_bitmap`] (and therefore of `Shared<N>` and `db::Db`).
pub(crate) const BITMAP_CHUNK_BYTES: usize = 64;
100
/// Configuration consumed by [`init`] and [`init_with_bitmap`] when opening a database.
///
/// `J` is the configuration type of the underlying journal; [`FixedConfig`] and
/// [`VariableConfig`] are the two specializations defined in this module.
#[derive(Clone)]
pub struct Config<T: Translator, J, S: Strategy> {
    /// Merkle configuration forwarded to the authenticated log's `init`.
    pub merkle_config: MerkleConfig<S>,

    /// Journal configuration forwarded to the authenticated log's `init`.
    pub journal_config: J,

    /// Translator handed to the index factory when the index is created.
    pub translator: T,
}
113
/// [`Config`] whose journal configuration is the fixed contiguous journal's `Config`
/// (imported here as `FConfig`).
pub type FixedConfig<T, S> = Config<T, FConfig, S>;

/// [`Config`] whose journal configuration is the variable contiguous journal's `Config`
/// (imported here as `VConfig`), parameterized by `C`.
// NOTE(review): `C` looks like the codec configuration type (the test helpers set
// `codec_config` to a value of this type) — confirm against `VConfig`.
pub type VariableConfig<T, C, S> = Config<T, VConfig<C>, S>;
119
120pub async fn init<F, E, U, H, T, I, J, S>(
122 context: E,
123 cfg: Config<T, J::Config, S>,
124) -> Result<db::Db<F, E, J, I, H, U, BITMAP_CHUNK_BYTES, S>, crate::qmdb::Error<F>>
125where
126 F: Family,
127 E: Context,
128 U: Update + Send + Sync,
129 H: Hasher,
130 T: Translator,
131 I: IndexFactory<T, Value = Location<F>>,
132 J: Inner<E, Item = Operation<F, U>>,
133 S: Strategy,
134 Operation<F, U>: Committable + CodecShared,
135{
136 init_with_bitmap::<F, E, U, H, T, I, J, S, BITMAP_CHUNK_BYTES>(context, cfg, None).await
137}
138
139pub(crate) async fn init_with_bitmap<F, E, U, H, T, I, J, S, const N: usize>(
142 context: E,
143 cfg: Config<T, J::Config, S>,
144 bitmap: Option<Arc<Shared<N>>>,
145) -> Result<db::Db<F, E, J, I, H, U, N, S>, crate::qmdb::Error<F>>
146where
147 F: Family,
148 E: Context,
149 U: Update + Send + Sync,
150 H: Hasher,
151 T: Translator,
152 I: IndexFactory<T, Value = Location<F>>,
153 J: Inner<E, Item = Operation<F, U>>,
154 S: Strategy,
155 Operation<F, U>: Committable + CodecShared,
156{
157 let mut log = J::init::<F, H, S>(
158 context.child("log"),
159 cfg.merkle_config,
160 cfg.journal_config,
161 Operation::is_commit,
162 ROOT_BAGGING,
163 )
164 .await?;
165
166 if log.size().await == 0 {
167 warn!("Authenticated log is empty, initializing new db");
168 let commit_floor = Operation::CommitFloor(None, Location::new(0));
169 log.append(&commit_floor).await?;
170 log.sync().await?;
171 }
172
173 let index = I::new(context.child("index"), cfg.translator);
174 let metrics = db::Metrics::new(context);
175 db::Db::init_from_log(index, log, bitmap, metrics).await
176}
177
178#[cfg(test)]
179pub(crate) mod test {
181 use super::*;
182 use crate::{
183 journal::contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
184 qmdb::{
185 self,
186 any::{FixedConfig, MerkleConfig, VariableConfig},
187 },
188 translator::OneCap,
189 };
190 use commonware_codec::{Codec, CodecShared};
191 use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
192 use commonware_runtime::{
193 buffer::paged::CacheRef, deterministic::Context, BufferPooler, Supervisor as _,
194 };
195 use commonware_utils::{NZUsize, NZU16, NZU64};
196 use core::{future::Future, pin::Pin};
197 use std::{
198 collections::HashMap,
199 num::{NonZeroU16, NonZeroUsize},
200 };
201
202 pub(crate) fn colliding_digest(prefix: u8, suffix: u64) -> Digest {
203 let mut bytes = [0u8; 32];
204 bytes[0] = prefix;
205 bytes[24..].copy_from_slice(&suffix.to_be_bytes());
206 Digest::from(bytes)
207 }
208
/// Page size passed to `CacheRef::from_pooler` by the test configuration helpers.
// NOTE(review): the small, odd value is presumably chosen to stress page boundaries — confirm.
const PAGE_SIZE: NonZeroU16 = NZU16!(101);
/// Page cache size passed to `CacheRef::from_pooler` by the test configuration helpers.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
212
213 pub(crate) fn fixed_db_config<T: Translator + Default>(
214 suffix: &str,
215 pooler: &impl BufferPooler,
216 ) -> FixedConfig<T, Sequential> {
217 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
218 FixedConfig {
219 merkle_config: MerkleConfig {
220 journal_partition: format!("journal-{suffix}"),
221 metadata_partition: format!("metadata-{suffix}"),
222 items_per_blob: NZU64!(11),
223 write_buffer: NZUsize!(1024),
224 strategy: Sequential,
225 page_cache: page_cache.clone(),
226 },
227 journal_config: FConfig {
228 partition: format!("log-journal-{suffix}"),
229 items_per_blob: NZU64!(7),
230 page_cache,
231 write_buffer: NZUsize!(1024),
232 },
233 translator: T::default(),
234 }
235 }
236
237 pub(crate) fn variable_db_config<T: Translator + Default>(
238 suffix: &str,
239 pooler: &impl BufferPooler,
240 ) -> VariableConfig<T, ((), ()), Sequential> {
241 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
242 VariableConfig {
243 merkle_config: MerkleConfig {
244 journal_partition: format!("journal-{suffix}"),
245 metadata_partition: format!("metadata-{suffix}"),
246 items_per_blob: NZU64!(11),
247 write_buffer: NZUsize!(1024),
248 strategy: Sequential,
249 page_cache: page_cache.clone(),
250 },
251 journal_config: VConfig {
252 partition: format!("log-journal-{suffix}"),
253 items_per_section: NZU64!(7),
254 compression: None,
255 codec_config: ((), ()),
256 page_cache,
257 write_buffer: NZUsize!(1024),
258 },
259 translator: T::default(),
260 }
261 }
262
263 use crate::{
264 index::Unordered as UnorderedIndex,
265 journal::contiguous::Mutable,
266 merkle::mmr,
267 qmdb::any::{
268 db::Db as AnyDb,
269 operation::{update::Update as UpdateTrait, Operation as AnyOperation},
270 traits::{DbAny, Provable, UnmerkleizedBatch as _},
271 },
272 };
273
/// qmdb error type specialized to the MMR family, used by the MMR-only test helpers below.
type Error = crate::qmdb::Error<mmr::Family>;
/// MMR location type; within this module it takes precedence over the glob-imported generic
/// `Location`.
type Location = mmr::Location;
276
/// Test-only abstraction over databases that can be rolled back to an earlier size.
pub(crate) trait RewindableDb {
    /// Rewinds the database to `size`, returning an error if the rewind fails.
    // NOTE(review): the precise rewind semantics come from the implementor's `rewind`
    // method, which is not visible here.
    fn rewind_to_size(
        &mut self,
        size: Location,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
283
/// Implements [`RewindableDb`] for MMR-family `AnyDb` instances whose journal is `Mutable`.
impl<E, U, C, I, H, const N: usize, S> RewindableDb for AnyDb<mmr::Family, E, C, I, H, U, N, S>
where
    E: crate::Context,
    U: UpdateTrait,
    C: Mutable<Item = AnyOperation<mmr::Family, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    AnyOperation<mmr::Family, U>: Codec,
    S: Strategy,
{
    /// Delegates to the database's own `rewind`, discarding its success value and propagating
    /// any error.
    async fn rewind_to_size(&mut self, size: Location) -> Result<(), Error> {
        self.rewind(size).await?;
        Ok(())
    }
}
299
300 pub(crate) async fn test_any_db_non_empty_recovery<F: Family, D, V: Clone + CodecShared>(
302 context: Context,
303 mut db: D,
304 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
305 make_value: impl Fn(u64) -> V,
306 ) where
307 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
308 {
309 const ELEMENTS: u64 = 1000;
310
311 {
313 let mut batch = db.new_batch();
314 for i in 0u64..ELEMENTS {
315 let k = Sha256::hash(&i.to_be_bytes());
316 let v = make_value(i * 1000);
317 batch = batch.write(k, Some(v));
318 }
319 let merkleized = batch.merkleize(&db, None).await.unwrap();
320 db.apply_batch(merkleized).await.unwrap();
321 }
322 db.commit().await.unwrap();
323 db.prune(db.sync_boundary().await).await.unwrap();
324 let root = db.root();
325 let op_count = db.size().await;
326 let inactivity_floor_loc = db.inactivity_floor_loc().await;
327
328 let db = reopen_db(context.child("reopen").with_attribute("index", 1)).await;
329 assert_eq!(db.size().await, op_count);
330 assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
331 assert_eq!(db.root(), root);
332
333 {
335 let mut batch = db.new_batch();
336 for i in 0u64..ELEMENTS {
337 let k = Sha256::hash(&i.to_be_bytes());
338 let v = make_value((i + 1) * 10000);
339 batch = batch.write(k, Some(v));
340 }
341 let _merkleized = batch.merkleize(&db, None).await.unwrap();
342 }
343 let db = reopen_db(context.child("reopen").with_attribute("index", 2)).await;
344 assert_eq!(db.size().await, op_count);
345 assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
346 assert_eq!(db.root(), root);
347
348 {
350 let mut batch = db.new_batch();
351 for i in 0u64..ELEMENTS {
352 let k = Sha256::hash(&i.to_be_bytes());
353 let v = make_value((i + 1) * 10000);
354 batch = batch.write(k, Some(v));
355 }
356 let _merkleized = batch.merkleize(&db, None).await.unwrap();
357 }
358 let db = reopen_db(context.child("reopen").with_attribute("index", 3)).await;
359 assert_eq!(db.size().await, op_count);
360 assert_eq!(db.root(), root);
361
362 for _ in 0..3 {
364 let mut batch = db.new_batch();
365 for i in 0u64..ELEMENTS {
366 let k = Sha256::hash(&i.to_be_bytes());
367 let v = make_value((i + 1) * 10000);
368 batch = batch.write(k, Some(v));
369 }
370 let _merkleized = batch.merkleize(&db, None).await.unwrap();
371 }
372 let mut db = reopen_db(context.child("reopen").with_attribute("index", 4)).await;
373 assert_eq!(db.size().await, op_count);
374 assert_eq!(db.root(), root);
375
376 {
378 let mut batch = db.new_batch();
379 for i in 0u64..ELEMENTS {
380 let k = Sha256::hash(&i.to_be_bytes());
381 let v = make_value((i + 1) * 10000);
382 batch = batch.write(k, Some(v));
383 }
384 let merkleized = batch.merkleize(&db, None).await.unwrap();
385 db.apply_batch(merkleized).await.unwrap();
386 }
387 db.commit().await.unwrap();
388 let db = reopen_db(context.child("reopen").with_attribute("index", 5)).await;
389 assert!(db.size().await > op_count);
390 assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
391 assert_ne!(db.root(), root);
392
393 db.destroy().await.unwrap();
394 }
395
396 pub(crate) async fn test_any_db_empty_recovery<F: Family, D, V: Clone + CodecShared>(
398 context: Context,
399 db: D,
400 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
401 make_value: impl Fn(u64) -> V,
402 ) where
403 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
404 {
405 let root = db.root();
406
407 let db = reopen_db(context.child("reopen").with_attribute("index", 1)).await;
408 assert_eq!(db.size().await, 1);
409 assert_eq!(db.root(), root);
410
411 {
413 let mut batch = db.new_batch();
414 for i in 0u64..1000 {
415 let k = Sha256::hash(&i.to_be_bytes());
416 let v = make_value((i + 1) * 10000);
417 batch = batch.write(k, Some(v));
418 }
419 let _merkleized = batch.merkleize(&db, None).await.unwrap();
420 }
421 let db = reopen_db(context.child("reopen").with_attribute("index", 2)).await;
422 assert_eq!(db.size().await, 1);
423 assert_eq!(db.root(), root);
424
425 {
427 let mut batch = db.new_batch();
428 for i in 0u64..1000 {
429 let k = Sha256::hash(&i.to_be_bytes());
430 let v = make_value((i + 1) * 10000);
431 batch = batch.write(k, Some(v));
432 }
433 let _merkleized = batch.merkleize(&db, None).await.unwrap();
434 }
435 drop(db);
436 let db = reopen_db(context.child("reopen").with_attribute("index", 3)).await;
437 assert_eq!(db.size().await, 1);
438 assert_eq!(db.root(), root);
439
440 for _ in 0..3 {
442 let mut batch = db.new_batch();
443 for i in 0u64..1000 {
444 let k = Sha256::hash(&i.to_be_bytes());
445 let v = make_value((i + 1) * 10000);
446 batch = batch.write(k, Some(v));
447 }
448 let _merkleized = batch.merkleize(&db, None).await.unwrap();
449 }
450 drop(db);
451 let mut db = reopen_db(context.child("reopen").with_attribute("index", 4)).await;
452 assert_eq!(db.size().await, 1);
453 assert_eq!(db.root(), root);
454
455 {
457 let mut batch = db.new_batch();
458 for i in 0u64..1000 {
459 let k = Sha256::hash(&i.to_be_bytes());
460 let v = make_value((i + 1) * 10000);
461 batch = batch.write(k, Some(v));
462 }
463 let merkleized = batch.merkleize(&db, None).await.unwrap();
464 db.apply_batch(merkleized).await.unwrap();
465 }
466 db.commit().await.unwrap();
467 drop(db);
468 let db = reopen_db(context.child("reopen").with_attribute("index", 5)).await;
469 assert!(db.size().await > 1);
470 assert_ne!(db.root(), root);
471
472 db.destroy().await.unwrap();
473 }
474
/// Exercises `rewind_to_size` and verifies that rewound state survives a reopen.
///
/// The test applies several committed batches (A, B, C), rewinds to the size recorded after
/// batch A, and checks root, size, inactivity floor, metadata and key contents against the
/// values captured at that point — both in memory and after reopening via `reopen_db`. It then
/// writes a new batch on top of the rewound state, reopens again, and finally rewinds all the
/// way back to the initial size.
///
/// `make_value` maps an integer seed to a value of type `V`.
pub(crate) async fn test_any_db_rewind_recovery<D, V>(
    context: Context,
    mut db: D,
    reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + RewindableDb,
    V: Clone + CodecShared + Eq + std::fmt::Debug,
{
    let key0 = Sha256::hash(&0u64.to_be_bytes());
    let key1 = Sha256::hash(&1u64.to_be_bytes());
    let key2 = Sha256::hash(&2u64.to_be_bytes());
    // Baseline state captured before any batch is applied.
    let initial_root = db.root();
    let initial_size = db.size().await;
    let initial_floor = db.inactivity_floor_loc().await;

    // Apply and commit a batch with no writes, then rewind it away again.
    let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
    let empty_range = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(empty_range.start, initial_size);
    assert_eq!(db.size().await, empty_range.end);
    db.rewind_to_size(initial_size).await.unwrap();
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);

    // Batch A: create key0 and key1, with metadata.
    let value0_a = make_value(10);
    let value1_a = make_value(11);
    let metadata_a = make_value(12);

    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_a.clone()))
        .write(key1, Some(value1_a.clone()))
        .merkleize(&db, Some(metadata_a.clone()))
        .await
        .unwrap();
    let range_a = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();

    // State after batch A: this is the rewind target used below.
    let root_a = db.root();
    let size_a = db.size().await;
    let floor_a = db.inactivity_floor_loc().await;
    assert_eq!(size_a, range_a.end);

    // Batch B: update key0, delete key1, create key2.
    let value0_b = make_value(20);
    let value2_b = make_value(21);
    let metadata_b = make_value(22);

    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_b))
        .write(key1, None)
        .write(key2, Some(value2_b))
        .merkleize(&db, Some(metadata_b))
        .await
        .unwrap();
    let range_b = db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(range_b.start, size_a);
    assert_ne!(db.root(), root_a);

    // Batch C: update key0, re-create key1, delete key2.
    let value0_c = make_value(30);
    let value1_c = make_value(31);
    let metadata_c = make_value(32);
    let merkleized = db
        .new_batch()
        .write(key0, Some(value0_c))
        .write(key1, Some(value1_c))
        .write(key2, None)
        .merkleize(&db, Some(metadata_c))
        .await
        .unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();

    // Rewind past batches B and C; everything must match the state captured after batch A.
    db.rewind_to_size(size_a).await.unwrap();
    assert_eq!(db.root(), root_a);
    assert_eq!(db.size().await, size_a);
    assert_eq!(db.inactivity_floor_loc().await, floor_a);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
    assert_eq!(db.get(&key0).await.unwrap(), Some(value0_a));
    assert_eq!(db.get(&key1).await.unwrap(), Some(value1_a));
    assert_eq!(db.get(&key2).await.unwrap(), None);

    // Commit the rewound state and confirm it is what a reopened database sees.
    db.commit().await.unwrap();
    drop(db);
    let mut db = reopen_db(context.child("reopen_after_rewind")).await;
    assert_eq!(db.root(), root_a);
    assert_eq!(db.size().await, size_a);
    assert_eq!(db.inactivity_floor_loc().await, floor_a);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), None);

    // Batch D: new writes on top of the rewound state must work normally.
    let value2_d = make_value(40);
    let metadata_d = make_value(41);
    let merkleized = db
        .new_batch()
        .write(key2, Some(value2_d.clone()))
        .merkleize(&db, Some(metadata_d.clone()))
        .await
        .unwrap();
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d.clone()));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d.clone()));

    // Batch D must also be visible after a reopen.
    drop(db);
    let mut db = reopen_db(context.child("reopen_after_rewind_new_writes")).await;
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d));
    assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
    assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d));

    // Rewind all the way back to the initial size: no keys and no metadata remain.
    db.rewind_to_size(initial_size).await.unwrap();
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert_eq!(db.get(&key0).await.unwrap(), None);
    assert_eq!(db.get(&key1).await.unwrap(), None);
    assert_eq!(db.get(&key2).await.unwrap(), None);

    // The fully rewound state must also survive a commit and reopen.
    db.commit().await.unwrap();
    drop(db);
    let db = reopen_db(context.child("reopen_initial_boundary")).await;
    assert_eq!(db.root(), initial_root);
    assert_eq!(db.size().await, initial_size);
    assert_eq!(db.inactivity_floor_loc().await, initial_floor);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    assert_eq!(db.get(&key0).await.unwrap(), None);
    assert_eq!(db.get(&key1).await.unwrap(), None);
    assert_eq!(db.get(&key2).await.unwrap(), None);

    db.destroy().await.unwrap();
}
624
625 pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
627 context: Context,
628 mut db: D,
629 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
630 make_value: impl Fn(u64) -> V,
631 ) where
632 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
633 V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
634 <D as Provable<mmr::Family>>::Operation: Codec,
635 {
636 use crate::qmdb::verify_proof;
637
638 const ELEMENTS: u64 = 1000;
639
640 let mut map = HashMap::<Digest, V>::default();
641 {
642 let mut batch = db.new_batch();
643 for i in 0u64..ELEMENTS {
644 let k = Sha256::hash(&i.to_be_bytes());
645 let v = make_value(i * 1000);
646 batch = batch.write(k, Some(v.clone()));
647 map.insert(k, v);
648 }
649
650 for i in 0u64..ELEMENTS {
652 if i % 3 != 0 {
653 continue;
654 }
655 let k = Sha256::hash(&i.to_be_bytes());
656 let v = make_value((i + 1) * 10000);
657 batch = batch.write(k, Some(v.clone()));
658 map.insert(k, v);
659 }
660
661 for i in 0u64..ELEMENTS {
663 if i % 7 != 1 {
664 continue;
665 }
666 let k = Sha256::hash(&i.to_be_bytes());
667 batch = batch.write(k, None);
668 map.remove(&k);
669 }
670
671 let merkleized = batch.merkleize(&db, None).await.unwrap();
672 db.apply_batch(merkleized).await.unwrap();
673 }
674 db.sync().await.unwrap();
676 db.prune(db.sync_boundary().await).await.unwrap();
677
678 let root = db.root();
680 db.sync().await.unwrap();
681 drop(db);
682 let db = reopen_db(context.child("reopened")).await;
683 assert_eq!(root, db.root());
684
685 for i in 0u64..ELEMENTS {
687 let k = Sha256::hash(&i.to_be_bytes());
688 if let Some(map_value) = map.get(&k) {
689 let Some(db_value) = db.get(&k).await.unwrap() else {
690 panic!("key not found in db: {k}");
691 };
692 assert_eq!(*map_value, db_value);
693 } else {
694 assert!(db.get(&k).await.unwrap().is_none());
695 }
696 }
697
698 let hasher = qmdb::hasher::<Sha256>();
699 let bounds = db.bounds().await;
700 let inactivity_floor = db.inactivity_floor_loc().await;
701 for loc in *inactivity_floor..*bounds.end {
702 let loc = Location::new(loc);
703 let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
704 assert!(verify_proof(&hasher, &proof, loc, &ops, &root));
705 }
706
707 db.destroy().await.unwrap();
708 }
709
710 pub(crate) async fn test_any_db_log_replay<
712 F: Family,
713 D,
714 V: Clone + CodecShared + PartialEq + std::fmt::Debug,
715 >(
716 context: Context,
717 mut db: D,
718 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
719 make_value: impl Fn(u64) -> V,
720 ) where
721 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
722 {
723 const UPDATES: u64 = 100;
725 let k = Sha256::hash(&UPDATES.to_be_bytes());
726 let mut last_value = None;
727 {
728 let mut batch = db.new_batch();
729 for i in 0u64..UPDATES {
730 let v = make_value(i * 1000);
731 last_value = Some(v.clone());
732 batch = batch.write(k, Some(v));
733 }
734 let merkleized = batch.merkleize(&db, None).await.unwrap();
735 db.apply_batch(merkleized).await.unwrap();
736 }
737 db.commit().await.unwrap();
738 let root = db.root();
739
740 drop(db);
742 let db = reopen_db(context.child("reopened")).await;
743 assert_eq!(db.root(), root);
744 assert_eq!(db.get(&k).await.unwrap(), last_value);
745
746 db.destroy().await.unwrap();
747 }
748
749 pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
751 _context: Context,
752 mut db: D,
753 make_value: impl Fn(u64) -> V,
754 ) where
755 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
756 <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
757 {
758 use crate::qmdb::verify_proof;
759 use commonware_utils::NZU64;
760
761 const OPS: u64 = 20;
763 {
764 let mut batch = db.new_batch();
765 for i in 0u64..OPS {
766 let k = Sha256::hash(&i.to_be_bytes());
767 let v = make_value(i * 1000);
768 batch = batch.write(k, Some(v));
769 }
770 let merkleized = batch.merkleize(&db, None).await.unwrap();
771 db.apply_batch(merkleized).await.unwrap();
772 }
773 let root_hash = db.root();
774 let original_op_count = db.size().await;
775
776 let max_ops = NZU64!(10);
778 let start_loc = Location::new(5);
779 let (historical_proof, historical_ops) = db
780 .historical_proof(original_op_count, start_loc, max_ops)
781 .await
782 .unwrap();
783 let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();
784
785 assert_eq!(historical_proof.leaves, regular_proof.leaves);
786 assert_eq!(historical_proof.digests, regular_proof.digests);
787 assert_eq!(historical_ops, regular_ops);
788 let hasher = qmdb::hasher::<Sha256>();
789 assert!(verify_proof(
790 &hasher,
791 &historical_proof,
792 start_loc,
793 &historical_ops,
794 &root_hash,
795 ));
796
797 {
799 let mut batch = db.new_batch();
800 for i in OPS..(OPS + 5) {
801 let k = Sha256::hash(&(i + 1000).to_be_bytes()); let v = make_value(i * 1000);
803 batch = batch.write(k, Some(v));
804 }
805 let merkleized = batch.merkleize(&db, None).await.unwrap();
806 db.apply_batch(merkleized).await.unwrap();
807 }
808
809 let (historical_proof2, historical_ops2) = db
811 .historical_proof(original_op_count, start_loc, max_ops)
812 .await
813 .unwrap();
814 assert_eq!(historical_proof2.leaves, original_op_count);
815 assert_eq!(historical_proof2.digests, regular_proof.digests);
816 assert_eq!(historical_ops2, regular_ops);
817 assert!(verify_proof(
818 &hasher,
819 &historical_proof2,
820 start_loc,
821 &historical_ops2,
822 &root_hash,
823 ));
824
825 db.destroy().await.unwrap();
826 }
827
/// Checks that `verify_proof` rejects tampered historical proofs and mismatched inputs.
///
/// Two single-key batches are applied; the size after the first one is used as the historical
/// size. A historical proof starting at location 1 is generated for that size, and then a series
/// of modifications (to the proof, the operations, the start location, or the root) are each
/// asserted to fail verification.
///
/// `make_value` maps an integer seed to a value of type `V`.
pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
    _context: Context,
    mut db: D,
    make_value: impl Fn(u64) -> V,
) where
    D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
    <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
{
    use crate::qmdb::verify_proof;
    use commonware_utils::NZU64;

    // Apply two batches, remembering the log size right after the first one.
    let mut historical_op_count = Location::new(0);
    for i in 0u64..2 {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = make_value(i * 1000);
        let merkleized = db
            .new_batch()
            .write(k, Some(v))
            .merkleize(&db, None)
            .await
            .unwrap();
        db.apply_batch(merkleized).await.unwrap();
        if i == 0 {
            historical_op_count = db.bounds().await.end;
        }
    }

    // The proof starts at location 1, so it covers every historical operation except location 0.
    let expected_ops_len = (*historical_op_count - 1) as usize;
    let (proof, ops) = db
        .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
        .await
        .unwrap();
    assert_eq!(proof.leaves, historical_op_count);
    assert_eq!(ops.len(), expected_ops_len);

    let hasher = qmdb::hasher::<Sha256>();

    // Case: the first proof digest is replaced.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.digests[0] = Sha256::hash(b"invalid");
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash,
        ));
    }

    // Case: an extra digest is appended to the proof.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.digests.push(Sha256::hash(b"invalid"));
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash,
        ));
    }

    // Case: the first two operations are swapped (only possible with at least two operations).
    {
        let root_hash = db.root();
        let mut tampered_ops = ops.clone();
        if tampered_ops.len() >= 2 {
            tampered_ops.swap(0, 1);
            assert!(!verify_proof(
                &hasher,
                &proof,
                Location::new(1),
                &tampered_ops,
                &root_hash,
            ));
        }
    }

    // Case: a duplicate of the first operation is appended.
    {
        let root_hash = db.root();
        let mut tampered_ops = ops.clone();
        tampered_ops.push(tampered_ops[0].clone());
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(1),
            &tampered_ops,
            &root_hash,
        ));
    }

    // Case: the claimed start location is wrong.
    {
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(2),
            &ops,
            &root_hash,
        ));
    }

    // Case: the root does not belong to this database.
    {
        let invalid_root = Sha256::hash(b"invalid");
        assert!(!verify_proof(
            &hasher,
            &proof,
            Location::new(1),
            &ops,
            &invalid_root,
        ));
    }

    // Case: the proof's leaf count is altered.
    {
        let mut tampered_proof = proof.clone();
        tampered_proof.leaves = Location::new(100);
        let root_hash = db.root();
        assert!(!verify_proof(
            &hasher,
            &tampered_proof,
            Location::new(1),
            &ops,
            &root_hash,
        ));
    }

    db.destroy().await.unwrap();
}
968
969 pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
971 _context: Context,
972 mut db: D,
973 make_value: impl Fn(u64) -> V,
974 ) where
975 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
976 <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
977 {
978 use commonware_utils::NZU64;
979
980 let initial_size = db.bounds().await.end;
986 let mut boundaries = vec![initial_size];
987 for i in 0u64..5 {
988 let k = Sha256::hash(&i.to_be_bytes());
989 let v = make_value(i * 1000);
990 let merkleized = db
991 .new_batch()
992 .write(k, Some(v))
993 .merkleize(&db, None)
994 .await
995 .unwrap();
996 db.apply_batch(merkleized).await.unwrap();
997 boundaries.push(db.bounds().await.end);
998 }
999
1000 let singleton_size = boundaries[0];
1002 let (single_proof, single_ops) = db
1003 .historical_proof(singleton_size, Location::new(0), NZU64!(1))
1004 .await
1005 .unwrap();
1006 assert_eq!(single_proof.leaves, singleton_size);
1007 assert_eq!(single_ops.len(), 1);
1008
1009 let limited_size = boundaries[2];
1013 let limited_start = boundaries[1];
1014 let expected_limited = (*limited_size - *limited_start) as usize;
1015 assert!(expected_limited > 0);
1016 let (_limited_proof, limited_ops) = db
1017 .historical_proof(limited_size, limited_start, NZU64!(20))
1018 .await
1019 .unwrap();
1020 assert_eq!(limited_ops.len(), expected_limited);
1021
1022 let min_size = boundaries[2];
1025 let max_ops = NZU64!(3);
1026 let expected_min = core::cmp::min(max_ops.get(), *min_size - 1) as usize;
1027 let (min_proof, min_ops) = db
1028 .historical_proof(min_size, Location::new(1), max_ops)
1029 .await
1030 .unwrap();
1031 assert_eq!(min_proof.leaves, min_size);
1032 assert_eq!(min_ops.len(), expected_min);
1033
1034 db.destroy().await.unwrap();
1035 }
1036
1037 pub(crate) async fn test_any_db_multiple_commits_delete_replayed<F: Family, D, V>(
1039 context: Context,
1040 mut db: D,
1041 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
1042 make_value: impl Fn(u64) -> V,
1043 ) where
1044 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
1045 V: Clone + CodecShared + Eq + std::fmt::Debug,
1046 {
1047 let mut map = HashMap::<Digest, V>::default();
1048 const ELEMENTS: u64 = 10;
1049 let metadata_value = make_value(42);
1050 let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
1051 for j in 0u64..ELEMENTS {
1052 let mut batch = db.new_batch();
1053 for i in 0u64..ELEMENTS {
1054 let k = key_at(j, i);
1055 let v = make_value(i * 1000);
1056 batch = batch.write(k, Some(v.clone()));
1057 map.insert(k, v);
1058 }
1059 let merkleized = batch
1060 .merkleize(&db, Some(metadata_value.clone()))
1061 .await
1062 .unwrap();
1063 db.apply_batch(merkleized).await.unwrap();
1064 db.commit().await.unwrap();
1065 }
1066 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
1067 let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
1068
1069 let merkleized = db
1070 .new_batch()
1071 .write(k, None)
1072 .merkleize(&db, None)
1073 .await
1074 .unwrap();
1075 db.apply_batch(merkleized).await.unwrap();
1076 db.commit().await.unwrap();
1077 assert_eq!(db.get_metadata().await.unwrap(), None);
1078 assert!(db.get(&k).await.unwrap().is_none());
1079
1080 let root = db.root();
1081 drop(db);
1082 let db = reopen_db(context.child("reopened")).await;
1083 assert_eq!(root, db.root());
1084 assert_eq!(db.get_metadata().await.unwrap(), None);
1085 assert!(db.get(&k).await.unwrap().is_none());
1086
1087 db.destroy().await.unwrap();
1088 }
1089
1090 use crate::qmdb::any::{
1091 ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
1092 unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
1093 };
1094 use commonware_macros::{test_group, test_traced};
1095 use commonware_parallel::{Sequential, Strategy};
1096 use commonware_runtime::{deterministic, Runner as _};
1097
// Concrete MMR-family database types used by the tests. All four share the same arguments:
// `mmr::Family`, the deterministic runtime `Context`, `Digest` twice, `Sha256`, the `OneCap`
// translator and the `Sequential` strategy.
// NOTE(review): the two `Digest` arguments are presumably the key and value types — confirm
// against the `Db` definitions.

/// Unordered database over the fixed variant.
type UnorderedFixed =
    UnorderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
/// Unordered database over the variable variant.
type UnorderedVariable =
    UnorderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
/// Ordered database over the fixed variant.
type OrderedFixed =
    OrderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
/// Ordered database over the variable variant.
type OrderedVariable =
    OrderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap, Sequential>;
// Partitioned database types with the const generic argument set to `1`.
// NOTE(review): the const argument is presumably the partition prefix length — confirm
// against the `partitioned::Db` definitions.

/// Partitioned unordered fixed database with const argument `1`.
type UnorderedFixedP1 = unordered::fixed::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
    Sequential,
>;
/// Partitioned unordered variable database with const argument `1`.
type UnorderedVariableP1 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
    Sequential,
>;
/// Partitioned ordered fixed database with const argument `1`.
type OrderedFixedP1 = ordered::fixed::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
    Sequential,
>;
/// Partitioned ordered variable database with const argument `1`.
type OrderedVariableP1 = ordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
    Sequential,
>;
// Partitioned database types with the const generic argument set to `2`.
// NOTE(review): the const argument is presumably the partition prefix length — confirm
// against the `partitioned::Db` definitions.

/// Partitioned unordered fixed database with const argument `2`.
type UnorderedFixedP2 = unordered::fixed::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
    Sequential,
>;
/// Partitioned unordered variable database with const argument `2`.
type UnorderedVariableP2 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
    Sequential,
>;
/// Partitioned ordered fixed database with const argument `2`.
type OrderedFixedP2 = ordered::fixed::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
    Sequential,
>;
/// Partitioned ordered variable database with const argument `2`.
type OrderedVariableP2 = ordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
    Sequential,
>;
1187
/// Database type aliases for the `mmb::Family` merkle family.
///
/// Each alias instantiates the generic `db::Db` directly, spelling out the journal type, index
/// type and update type. All of them use `Sha256`, the `OneCap` translator,
/// `BITMAP_CHUNK_BYTES` as the bitmap chunk size and the `Sequential` strategy.
mod mmb_types {
    use super::*;
    use crate::{
        index::{ordered::Index as OrderedIndex, unordered::Index as UnorderedIndex},
        journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
        merkle::{mmb, Location},
        qmdb::any::{
            operation::{update, Operation},
            value::{FixedEncoding, VariableEncoding},
        },
    };

    /// Location type for the `mmb` family, used as the index value type below.
    type MmbLocation = Location<mmb::Family>;

    /// Unordered updates with fixed value encoding, stored in a fixed journal.
    pub type MmbUnorderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, FixedEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, FixedEncoding<Digest>>,
        { crate::qmdb::any::BITMAP_CHUNK_BYTES },
        Sequential,
    >;

    /// Unordered updates with variable value encoding, stored in a variable journal.
    pub type MmbUnorderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, VariableEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, VariableEncoding<Digest>>,
        { crate::qmdb::any::BITMAP_CHUNK_BYTES },
        Sequential,
    >;

    /// Ordered updates with fixed value encoding, stored in a fixed journal.
    pub type MmbOrderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, FixedEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, FixedEncoding<Digest>>,
        { crate::qmdb::any::BITMAP_CHUNK_BYTES },
        Sequential,
    >;

    /// Ordered updates with variable value encoding, stored in a variable journal.
    pub type MmbOrderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, VariableEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, VariableEncoding<Digest>>,
        { crate::qmdb::any::BITMAP_CHUNK_BYTES },
        Sequential,
    >;
}
1259 use mmb_types::*;
1260
1261 #[inline]
1262 fn to_digest(i: u64) -> Digest {
1263 Sha256::hash(&i.to_be_bytes())
1264 }
1265
// Invokes the callback macro `$cb` once per database variant built on `mmr::Family`.
//
// Each invocation receives the caller's leading arguments followed by: a short variant
// label, the database type alias, the Merkle family type, and the name of the config
// constructor (`fixed_db_config` or `variable_db_config`).
macro_rules! with_mmr_variants {
    ($cb:ident!($($args:tt)*)) => {
        $cb!($($args)*, "uf", UnorderedFixed, mmr::Family, fixed_db_config);
        $cb!($($args)*, "uv", UnorderedVariable, mmr::Family, variable_db_config);
        $cb!($($args)*, "of", OrderedFixed, mmr::Family, fixed_db_config);
        $cb!($($args)*, "ov", OrderedVariable, mmr::Family, variable_db_config);
        $cb!($($args)*, "ufp1", UnorderedFixedP1, mmr::Family, fixed_db_config);
        $cb!($($args)*, "uvp1", UnorderedVariableP1, mmr::Family, variable_db_config);
        $cb!($($args)*, "ofp1", OrderedFixedP1, mmr::Family, fixed_db_config);
        $cb!($($args)*, "ovp1", OrderedVariableP1, mmr::Family, variable_db_config);
        $cb!($($args)*, "ufp2", UnorderedFixedP2, mmr::Family, fixed_db_config);
        $cb!($($args)*, "uvp2", UnorderedVariableP2, mmr::Family, variable_db_config);
        $cb!($($args)*, "ofp2", OrderedFixedP2, mmr::Family, fixed_db_config);
        $cb!($($args)*, "ovp2", OrderedVariableP2, mmr::Family, variable_db_config);
    };
}
1283
// Invokes the callback macro `$cb` once per database variant: first every variant built on
// `mmr::Family`, then the four aliases built on `mmb::Family`.
//
// Each invocation receives the caller's leading arguments followed by: a short variant
// label, the database type alias, the Merkle family type, and the name of the config
// constructor (`fixed_db_config` or `variable_db_config`).
macro_rules! with_all_variants {
    ($cb:ident!($($args:tt)*)) => {
        // The MMR rows are exactly the ones `with_mmr_variants!` emits, in the same order.
        with_mmr_variants!($cb!($($args)*));
        $cb!($($args)*, "uf_mmb", MmbUnorderedFixed, mmb::Family, fixed_db_config);
        $cb!($($args)*, "uv_mmb", MmbUnorderedVariable, mmb::Family, variable_db_config);
        $cb!($($args)*, "of_mmb", MmbOrderedFixed, mmb::Family, fixed_db_config);
        $cb!($($args)*, "ov_mmb", MmbOrderedVariable, mmb::Family, variable_db_config);
    };
}
1305
// Runs the async test function `$f` against database type `$db` inside a fresh deterministic
// runner, handing it a closure that can re-initialize the database with the same config.
//
// Arguments: `$sfx` test suffix, `$f` test function, `$l` variant label, `$db` database type,
// `$family` Merkle family (accepted but not used in this body), `$cfg` config constructor.
// `$f` is called as `$f(ctx, db, reopen, to_digest)`.
macro_rules! test_with_reopen {
    ($sfx:expr, $f:expr, $l:literal, $db:ty, $family:ty, $cfg:ident) => {{
        // First argument handed to the config constructor: "<label>_<suffix>".
        let p = concat!($l, "_", $sfx);
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child($l);
            let db = <$db>::init(ctx.child("storage"), $cfg::<OneCap>(p, &ctx))
                .await
                .unwrap();
            $f(
                ctx,
                db,
                // Reopen callback: initializes the database again from the given context
                // using the same `p`.
                |ctx| {
                    Box::pin(async move {
                        <$db>::init(ctx.child("storage"), $cfg::<OneCap>(p, &ctx))
                            .await
                            .unwrap()
                    })
                },
                to_digest,
            )
            .await;
        });
    }};
}
1335
// Runs the async test function `$f` against database type `$db` inside a fresh deterministic
// runner, passing `to_digest` as the value constructor.
//
// Arguments: `$sfx` test suffix, `$f` test function, `$l` variant label, `$db` database type,
// `$family` Merkle family (accepted but not used in this body), `$cfg` config constructor.
// `$f` is called as `$f(ctx, db, to_digest)`.
macro_rules! test_with_make_value {
    ($sfx:expr, $f:expr, $l:literal, $db:ty, $family:ty, $cfg:ident) => {{
        // First argument handed to the config constructor: "<label>_<suffix>".
        let p = concat!($l, "_", $sfx);
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let ctx = context.child($l);
            let db = <$db>::init(ctx.child("storage"), $cfg::<OneCap>(p, &ctx))
                .await
                .unwrap();
            $f(ctx, db, to_digest).await;
        });
    }};
}
1349
// Runs test function `$f` against every variant listed by `with_all_variants!`.
//
// `with_reopen:` dispatches through `test_with_reopen!`; `with_make_value:` dispatches
// through `test_with_make_value!`. `$sfx` is forwarded as the test suffix.
macro_rules! for_all_variants {
    ($sfx:expr, with_reopen: $f:expr) => {{
        with_all_variants!(test_with_reopen!($sfx, $f));
    }};
    ($sfx:expr, with_make_value: $f:expr) => {{
        with_all_variants!(test_with_make_value!($sfx, $f));
    }};
}
1359
// Runs test function `$f` against only the variants listed by `with_mmr_variants!`
// (the `mmb::Family` aliases are not included).
//
// `with_reopen:` dispatches through `test_with_reopen!`; `with_make_value:` dispatches
// through `test_with_make_value!`. `$sfx` is forwarded as the test suffix.
macro_rules! for_mmr_variants {
    ($sfx:expr, with_reopen: $f:expr) => {{
        with_mmr_variants!(test_with_reopen!($sfx, $f));
    }};
    ($sfx:expr, with_make_value: $f:expr) => {{
        with_mmr_variants!(test_with_make_value!($sfx, $f));
    }};
}
1370
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_log_replay` (with a reopen callback) against every MMR and MMB variant.
fn test_all_variants_log_replay() {
    for_all_variants!("lr", with_reopen: test_any_db_log_replay);
}
1376
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_build_and_authenticate` (with a reopen callback).
///
/// NOTE(review): despite the `all_variants` name this uses `for_mmr_variants!`, so only the
/// MMR-backed variants are exercised; confirm that excluding MMB is intentional.
fn test_all_variants_build_and_authenticate() {
    for_mmr_variants!("baa", with_reopen: test_any_db_build_and_authenticate);
}
1382
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_historical_proof_basic` against the MMR-backed variants only
/// (dispatched through `for_mmr_variants!`).
fn test_all_variants_historical_proof_basic() {
    for_mmr_variants!("hpb", with_make_value: test_any_db_historical_proof_basic);
}
1388
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_historical_proof_invalid` against the MMR-backed variants only
/// (dispatched through `for_mmr_variants!`).
fn test_all_variants_historical_proof_invalid() {
    for_mmr_variants!("hpi", with_make_value: test_any_db_historical_proof_invalid);
}
1394
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_historical_proof_edge_cases` against the MMR-backed variants only
/// (dispatched through `for_mmr_variants!`).
fn test_all_variants_historical_proof_edge_cases() {
    for_mmr_variants!("hpec", with_make_value: test_any_db_historical_proof_edge_cases);
}
1400
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_multiple_commits_delete_replayed` (with a reopen callback) against every
/// MMR and MMB variant.
fn test_all_variants_multiple_commits_delete_replayed() {
    for_all_variants!("mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
}
1406
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_non_empty_recovery` (with a reopen callback) against every MMR and MMB
/// variant.
fn test_all_variants_non_empty_recovery() {
    for_all_variants!("ner", with_reopen: test_any_db_non_empty_recovery);
}
1412
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_empty_recovery` (with a reopen callback) against every MMR and MMB
/// variant.
fn test_all_variants_empty_recovery() {
    for_all_variants!("er", with_reopen: test_any_db_empty_recovery);
}
1418
#[test_group("slow")]
#[test_traced("WARN")]
/// Runs `test_any_db_rewind_recovery` (with a reopen callback) against the MMR-backed variants
/// only (dispatched through `for_mmr_variants!`).
fn test_all_variants_rewind_recovery() {
    for_mmr_variants!("rr", with_reopen: test_any_db_rewind_recovery);
}
1424
1425 fn key(i: u64) -> Digest {
1426 Sha256::hash(&i.to_be_bytes())
1427 }
1428
1429 fn val(i: u64) -> Digest {
1430 Sha256::hash(&(i + 10000).to_be_bytes())
1431 }
1432
1433 async fn commit_writes(
1435 db: &mut UnorderedVariable,
1436 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1437 metadata: Option<Digest>,
1438 ) -> std::ops::Range<crate::mmr::Location> {
1439 let mut batch = db.new_batch();
1440 for (k, v) in writes {
1441 batch = batch.write(k, v);
1442 }
1443 let merkleized = batch.merkleize(&*db, metadata).await.unwrap();
1444 let range = db.apply_batch(merkleized).await.unwrap();
1445 db.commit().await.unwrap();
1446 range
1447 }
1448
1449 #[test_traced("INFO")]
1451 fn test_any_batch_empty() {
1452 let executor = deterministic::Runner::default();
1453 executor.start(|context| async move {
1454 let ctx = context.child("db");
1455 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1456 ctx.child("storage"),
1457 variable_db_config::<OneCap>("e", &ctx),
1458 )
1459 .await
1460 .unwrap();
1461
1462 let root_before = db.root();
1463 let batch = db.new_batch();
1464 let merkleized = batch.merkleize(&db, None).await.unwrap();
1465 db.apply_batch(merkleized).await.unwrap();
1466
1467 assert_ne!(db.root(), root_before);
1469
1470 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1472 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1473
1474 db.destroy().await.unwrap();
1475 });
1476 }
1477
1478 #[test_traced("INFO")]
1480 fn test_any_batch_metadata() {
1481 let executor = deterministic::Runner::default();
1482 executor.start(|context| async move {
1483 let ctx = context.child("db");
1484 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1485 ctx.child("storage"),
1486 variable_db_config::<OneCap>("m", &ctx),
1487 )
1488 .await
1489 .unwrap();
1490
1491 let metadata = val(42);
1492
1493 commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1495 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1496
1497 let batch = db.new_batch();
1499 let merkleized = batch.merkleize(&db, None).await.unwrap();
1500 db.apply_batch(merkleized).await.unwrap();
1501 assert_eq!(db.get_metadata().await.unwrap(), None);
1502
1503 db.destroy().await.unwrap();
1504 });
1505 }
1506
1507 #[test_traced("INFO")]
1510 fn test_any_batch_get_read_through() {
1511 let executor = deterministic::Runner::default();
1512 executor.start(|context| async move {
1513 let ctx = context.child("db");
1514 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1515 ctx.child("storage"),
1516 variable_db_config::<OneCap>("g", &ctx),
1517 )
1518 .await
1519 .unwrap();
1520
1521 let ka = key(0);
1523 let va = val(0);
1524 commit_writes(&mut db, [(ka, Some(va))], None).await;
1525
1526 let kb = key(1);
1527 let vb = val(1);
1528 let kc = key(2);
1529
1530 let mut batch = db.new_batch();
1531
1532 assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));
1534
1535 batch = batch.write(kb, Some(vb));
1537 assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
1538
1539 assert_eq!(batch.get(&kc, &db).await.unwrap(), None);
1541
1542 let va2 = val(100);
1544 batch = batch.write(ka, Some(va2));
1545 assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));
1546
1547 batch = batch.write(ka, None);
1549 assert_eq!(batch.get(&ka, &db).await.unwrap(), None);
1550
1551 db.destroy().await.unwrap();
1552 });
1553 }
1554
1555 #[test_traced("INFO")]
1557 fn test_any_batch_get_on_merkleized() {
1558 let executor = deterministic::Runner::default();
1559 executor.start(|context| async move {
1560 let ctx = context.child("db");
1561 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1562 ctx.child("storage"),
1563 variable_db_config::<OneCap>("mg", &ctx),
1564 )
1565 .await
1566 .unwrap();
1567
1568 let ka = key(0);
1569 let kb = key(1);
1570 let kc = key(2);
1571 let kd = key(3);
1572
1573 commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
1575
1576 let va2 = val(100);
1578 let vc = val(2);
1579 let mut batch = db.new_batch();
1580 batch = batch.write(ka, Some(va2));
1581 batch = batch.write(kb, None);
1582 batch = batch.write(kc, Some(vc));
1583 let merkleized = batch.merkleize(&db, None).await.unwrap();
1584
1585 assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1586 assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
1587 assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
1588 assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);
1589
1590 db.destroy().await.unwrap();
1591 });
1592 }
1593
1594 #[test_traced("INFO")]
1596 fn test_any_batch_stacked_get() {
1597 let executor = deterministic::Runner::default();
1598 executor.start(|context| async move {
1599 let ctx = context.child("db");
1600 let db: UnorderedVariable = UnorderedVariableDb::init(
1601 ctx.child("storage"),
1602 variable_db_config::<OneCap>("sg", &ctx),
1603 )
1604 .await
1605 .unwrap();
1606
1607 let ka = key(0);
1608 let kb = key(1);
1609
1610 let mut batch = db.new_batch();
1612 batch = batch.write(ka, Some(val(0)));
1613 let merkleized = batch.merkleize(&db, None).await.unwrap();
1614
1615 let mut child = merkleized.new_batch::<Sha256>();
1617 assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));
1618
1619 child = child.write(ka, Some(val(100)));
1621 assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));
1622
1623 child = child.write(kb, Some(val(1)));
1625 assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));
1626
1627 child = child.write(ka, None);
1629 assert_eq!(child.get(&ka, &db).await.unwrap(), None);
1630
1631 db.destroy().await.unwrap();
1632 });
1633 }
1634
1635 #[test_traced("INFO")]
1637 fn test_any_batch_stacked_delete_recreate() {
1638 let executor = deterministic::Runner::default();
1639 executor.start(|context| async move {
1640 let ctx = context.child("db");
1641 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1642 ctx.child("storage"),
1643 variable_db_config::<OneCap>("dr", &ctx),
1644 )
1645 .await
1646 .unwrap();
1647
1648 let ka = key(0);
1649
1650 commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
1652
1653 let mut parent = db.new_batch();
1655 parent = parent.write(ka, None);
1656 let parent_m = parent.merkleize(&db, None).await.unwrap();
1657 assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);
1658
1659 let mut child = parent_m.new_batch::<Sha256>();
1661 child = child.write(ka, Some(val(200)));
1662 let child_m = child.merkleize(&db, None).await.unwrap();
1663 assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));
1664
1665 db.apply_batch(child_m).await.unwrap();
1667 assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
1668
1669 db.destroy().await.unwrap();
1670 });
1671 }
1672
1673 #[test_traced("INFO")]
1676 fn test_any_batch_floor_raise() {
1677 let executor = deterministic::Runner::default();
1678 executor.start(|context| async move {
1679 let ctx = context.child("db");
1680 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1681 ctx.child("storage"),
1682 variable_db_config::<OneCap>("fr", &ctx),
1683 )
1684 .await
1685 .unwrap();
1686
1687 let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1689 commit_writes(&mut db, init, None).await;
1690
1691 let floor_before = db.inactivity_floor_loc();
1692
1693 let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1695 commit_writes(&mut db, updates, None).await;
1696
1697 assert!(db.inactivity_floor_loc() > floor_before);
1699
1700 for i in 0..30 {
1702 assert_eq!(
1703 db.get(&key(i)).await.unwrap(),
1704 Some(val(i + 500)),
1705 "updated key {i} mismatch"
1706 );
1707 }
1708 for i in 30..100 {
1709 assert_eq!(
1710 db.get(&key(i)).await.unwrap(),
1711 Some(val(i)),
1712 "untouched key {i} mismatch"
1713 );
1714 }
1715
1716 db.destroy().await.unwrap();
1717 });
1718 }
1719
1720 #[test_traced("INFO")]
1722 fn test_any_batch_apply_returns_range() {
1723 let executor = deterministic::Runner::default();
1724 executor.start(|context| async move {
1725 let ctx = context.child("db");
1726 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1727 ctx.child("storage"),
1728 variable_db_config::<OneCap>("ar", &ctx),
1729 )
1730 .await
1731 .unwrap();
1732
1733 let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1735 let range1 = commit_writes(&mut db, writes, None).await;
1736
1737 assert_eq!(range1.start, crate::mmr::Location::new(1));
1739 assert!(range1.end.saturating_sub(*range1.start) >= 6);
1741
1742 let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
1744 let range2 = commit_writes(&mut db, writes, None).await;
1745 assert_eq!(range2.start, range1.end);
1746
1747 db.destroy().await.unwrap();
1748 });
1749 }
1750
1751 #[test_traced("INFO")]
1753 fn test_any_batch_deep_chain() {
1754 let executor = deterministic::Runner::default();
1755 executor.start(|context| async move {
1756 let ctx = context.child("db");
1757 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1758 ctx.child("storage"),
1759 variable_db_config::<OneCap>("dc", &ctx),
1760 )
1761 .await
1762 .unwrap();
1763
1764 let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1766 commit_writes(&mut db, init, None).await;
1767
1768 let mut parent = db.new_batch();
1770 parent = parent.write(key(0), Some(val(100)));
1771 parent = parent.write(key(5), Some(val(5)));
1772 let parent_m = parent.merkleize(&db, None).await.unwrap();
1773
1774 let mut child = parent_m.new_batch::<Sha256>();
1776 child = child.write(key(1), Some(val(101)));
1777 child = child.write(key(6), Some(val(6)));
1778 let child_m = child.merkleize(&db, None).await.unwrap();
1779
1780 let mut grandchild = child_m.new_batch::<Sha256>();
1782 grandchild = grandchild.write(key(2), None);
1783 grandchild = grandchild.write(key(7), Some(val(7)));
1784 let grandchild_m = grandchild.merkleize(&db, None).await.unwrap();
1785
1786 assert_eq!(
1788 grandchild_m.get(&key(0), &db).await.unwrap(),
1789 Some(val(100))
1790 );
1791 assert_eq!(
1792 grandchild_m.get(&key(1), &db).await.unwrap(),
1793 Some(val(101))
1794 );
1795 assert_eq!(grandchild_m.get(&key(2), &db).await.unwrap(), None);
1796 assert_eq!(grandchild_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1797
1798 db.apply_batch(grandchild_m).await.unwrap();
1800
1801 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1802 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
1803 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1804 assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
1805 assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
1806 assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
1807 assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
1808 assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
1809
1810 db.destroy().await.unwrap();
1811 });
1812 }
1813
1814 #[test_traced("INFO")]
1816 fn test_any_batch_chain_matches_sequential() {
1817 let executor = deterministic::Runner::default();
1818 executor.start(|context| async move {
1819 let ctx = context.child("db");
1820
1821 let ctx_a = ctx.child("a");
1823 let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
1824 ctx_a.child("db"),
1825 variable_db_config::<OneCap>("cms-a", &ctx_a),
1826 )
1827 .await
1828 .unwrap();
1829
1830 let ctx_b = ctx.child("b");
1832 let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
1833 ctx_b.child("db"),
1834 variable_db_config::<OneCap>("cms-b", &ctx_b),
1835 )
1836 .await
1837 .unwrap();
1838
1839 let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1841
1842 let writes2 = vec![
1844 (key(0), Some(val(100))),
1845 (key(1), None),
1846 (key(5), Some(val(5))),
1847 ];
1848
1849 commit_writes(&mut db_a, writes1.clone(), None).await;
1851 commit_writes(&mut db_a, writes2.clone(), None).await;
1852
1853 let mut parent = db_b.new_batch();
1855 for (k, v) in &writes1 {
1856 parent = parent.write(*k, *v);
1857 }
1858 let parent_m = parent.merkleize(&db_b, None).await.unwrap();
1859
1860 let mut child = parent_m.new_batch::<Sha256>();
1861 for (k, v) in &writes2 {
1862 child = child.write(*k, *v);
1863 }
1864 let child_m = child.merkleize(&db_b, None).await.unwrap();
1865 db_b.apply_batch(child_m).await.unwrap();
1866
1867 assert_eq!(db_a.root(), db_b.root());
1869 for i in 0..6 {
1870 assert_eq!(
1871 db_a.get(&key(i)).await.unwrap(),
1872 db_b.get(&key(i)).await.unwrap(),
1873 "key {i} mismatch"
1874 );
1875 }
1876
1877 db_a.destroy().await.unwrap();
1878 db_b.destroy().await.unwrap();
1879 });
1880 }
1881
1882 #[test_traced("INFO")]
1884 fn test_any_batch_create_then_delete_same_batch() {
1885 let executor = deterministic::Runner::default();
1886 executor.start(|context| async move {
1887 let ctx = context.child("db");
1888 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1889 ctx.child("storage"),
1890 variable_db_config::<OneCap>("cd", &ctx),
1891 )
1892 .await
1893 .unwrap();
1894
1895 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1897
1898 let mut batch = db.new_batch();
1900 batch = batch.write(key(1), Some(val(1))); batch = batch.write(key(1), None); batch = batch.write(key(2), Some(val(2))); batch = batch.write(key(0), None); let merkleized = batch.merkleize(&db, None).await.unwrap();
1905 db.apply_batch(merkleized).await.unwrap();
1906
1907 assert_eq!(db.get(&key(0)).await.unwrap(), None);
1908 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1909 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1910
1911 db.destroy().await.unwrap();
1912 });
1913 }
1914
1915 #[test_traced("INFO")]
1917 fn test_any_batch_delete_all_keys() {
1918 let executor = deterministic::Runner::default();
1919 executor.start(|context| async move {
1920 let ctx = context.child("db");
1921 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1922 ctx.child("storage"),
1923 variable_db_config::<OneCap>("da", &ctx),
1924 )
1925 .await
1926 .unwrap();
1927
1928 let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1930 commit_writes(&mut db, init, None).await;
1931
1932 let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1934 commit_writes(&mut db, deletes, None).await;
1935
1936 for i in 0..5 {
1937 assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1938 }
1939
1940 commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1942 assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1943
1944 db.destroy().await.unwrap();
1945 });
1946 }
1947
1948 #[test_traced("INFO")]
1950 fn test_any_batch_parallel_forks() {
1951 let executor = deterministic::Runner::default();
1952 executor.start(|context| async move {
1953 let ctx = context.child("db");
1954 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1955 ctx.child("storage"),
1956 variable_db_config::<OneCap>("pf", &ctx),
1957 )
1958 .await
1959 .unwrap();
1960
1961 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1963 let root_before = db.root();
1964
1965 let fork_a_m = db
1967 .new_batch()
1968 .write(key(0), Some(val(100)))
1969 .write(key(1), Some(val(1)))
1970 .merkleize(&db, None)
1971 .await
1972 .unwrap();
1973
1974 let fork_b_m = db
1976 .new_batch()
1977 .write(key(0), None)
1978 .write(key(2), Some(val(2)))
1979 .merkleize(&db, None)
1980 .await
1981 .unwrap();
1982
1983 assert_ne!(fork_a_m.root(), fork_b_m.root());
1985
1986 assert_eq!(db.root(), root_before);
1988 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1989 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1990
1991 db.apply_batch(fork_a_m).await.unwrap();
1993 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1994 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1995 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1996
1997 db.destroy().await.unwrap();
1998 });
1999 }
2000
2001 #[test_traced("INFO")]
2003 fn test_any_batch_floor_raise_chained() {
2004 let executor = deterministic::Runner::default();
2005 executor.start(|context| async move {
2006 let ctx = context.child("db");
2007 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2008 ctx.child("storage"),
2009 variable_db_config::<OneCap>("frc", &ctx),
2010 )
2011 .await
2012 .unwrap();
2013
2014 let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
2016 commit_writes(&mut db, init, None).await;
2017 let floor_before = db.inactivity_floor_loc();
2018
2019 let mut parent = db.new_batch();
2021 for i in 0..20 {
2022 parent = parent.write(key(i), Some(val(i + 500)));
2023 }
2024 let parent_m = parent.merkleize(&db, None).await.unwrap();
2025
2026 let mut child = parent_m.new_batch::<Sha256>();
2028 for i in 20..30 {
2029 child = child.write(key(i), Some(val(i + 500)));
2030 }
2031 let child_m = child.merkleize(&db, None).await.unwrap();
2032 db.apply_batch(child_m).await.unwrap();
2033
2034 assert!(db.inactivity_floor_loc() > floor_before);
2036
2037 for i in 0..30 {
2039 assert_eq!(
2040 db.get(&key(i)).await.unwrap(),
2041 Some(val(i + 500)),
2042 "updated key {i} mismatch"
2043 );
2044 }
2045 for i in 30..50 {
2046 assert_eq!(
2047 db.get(&key(i)).await.unwrap(),
2048 Some(val(i)),
2049 "untouched key {i} mismatch"
2050 );
2051 }
2052
2053 db.destroy().await.unwrap();
2054 });
2055 }
2056
2057 #[test_traced("INFO")]
2059 fn test_any_batch_abandoned() {
2060 let executor = deterministic::Runner::default();
2061 executor.start(|context| async move {
2062 let ctx = context.child("db");
2063 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2064 ctx.child("storage"),
2065 variable_db_config::<OneCap>("ab", &ctx),
2066 )
2067 .await
2068 .unwrap();
2069
2070 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
2071 let root_before = db.root();
2072
2073 {
2075 let mut batch = db.new_batch();
2076 batch = batch.write(key(0), Some(val(999)));
2077 batch = batch.write(key(1), Some(val(1)));
2078 let _merkleized = batch.merkleize(&db, None).await.unwrap();
2079 }
2081
2082 assert_eq!(db.root(), root_before);
2084 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2085 assert_eq!(db.get(&key(1)).await.unwrap(), None);
2086
2087 db.destroy().await.unwrap();
2088 });
2089 }
2090
2091 #[test_traced("INFO")]
2093 fn test_any_batch_apply_requires_commit_for_recovery() {
2094 let executor = deterministic::Runner::default();
2095 executor.start(|context| async move {
2096 let partition = "apply_requires_commit";
2097 let ctx = context.child("db");
2098 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2099 ctx.child("storage"),
2100 variable_db_config::<OneCap>(partition, &ctx),
2101 )
2102 .await
2103 .unwrap();
2104
2105 let committed_root = db.root();
2106
2107 let merkleized = db
2108 .new_batch()
2109 .write(key(0), Some(val(0)))
2110 .merkleize(&db, None)
2111 .await
2112 .unwrap();
2113 db.apply_batch(merkleized).await.unwrap();
2114
2115 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2116
2117 drop(db);
2118
2119 let reopened: UnorderedVariable = UnorderedVariableDb::init(
2120 context.child("reopen"),
2121 variable_db_config::<OneCap>(partition, &context),
2122 )
2123 .await
2124 .unwrap();
2125 assert_eq!(reopened.root(), committed_root);
2126 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2127
2128 reopened.destroy().await.unwrap();
2129 });
2130 }
2131
2132 #[test_traced("INFO")]
2134 fn test_any_rewind_pruned_target_errors() {
2135 let executor = deterministic::Runner::default();
2136 executor.start(|context| async move {
2137 const KEYS: u64 = 64;
2138
2139 let ctx = context.child("db");
2140 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2141 ctx.child("storage"),
2142 variable_db_config::<OneCap>("rp", &ctx),
2143 )
2144 .await
2145 .unwrap();
2146
2147 let initial: Vec<_> = (0..KEYS).map(|i| (key(i), Some(val(i)))).collect();
2148 let first_range = commit_writes(&mut db, initial, None).await;
2149
2150 let mut round = 0u64;
2151 loop {
2152 round += 1;
2153 assert!(
2154 round <= 64,
2155 "failed to prune enough history for rewind test"
2156 );
2157
2158 let updates: Vec<_> = (0..KEYS)
2159 .map(|i| (key(i), Some(val(1000 + round * KEYS + i))))
2160 .collect();
2161 commit_writes(&mut db, updates, None).await;
2162
2163 db.prune(db.sync_boundary()).await.unwrap();
2164 let bounds = db.bounds().await;
2165 if bounds.start > first_range.start {
2166 break;
2167 }
2168 }
2169
2170 let oldest_retained = db.bounds().await.start;
2171 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
2172 assert!(
2173 matches!(
2174 boundary_err,
2175 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2176 ),
2177 "unexpected rewind error at retained boundary: {boundary_err:?}"
2178 );
2179
2180 let err = db.rewind(first_range.start).await.unwrap_err();
2181 assert!(
2182 matches!(
2183 err,
2184 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2185 ),
2186 "unexpected rewind error: {err:?}"
2187 );
2188
2189 db.destroy().await.unwrap();
2190 });
2191 }
2192
2193 #[test_traced("INFO")]
2195 fn test_any_rewind_invalid_target_errors() {
2196 let executor = deterministic::Runner::default();
2197 executor.start(|context| async move {
2198 let ctx = context.child("db");
2199 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2200 ctx.child("storage"),
2201 variable_db_config::<OneCap>("ri", &ctx),
2202 )
2203 .await
2204 .unwrap();
2205
2206 let root_before = db.root();
2207 let size_before = db.size().await;
2208 db.rewind(size_before).await.unwrap();
2209 assert_eq!(db.root(), root_before);
2210 assert_eq!(db.size().await, size_before);
2211
2212 let zero_err = db.rewind(Location::new(0)).await.unwrap_err();
2213 assert!(
2214 matches!(
2215 zero_err,
2216 crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(0))
2217 ),
2218 "unexpected rewind error: {zero_err:?}"
2219 );
2220 assert_eq!(db.root(), root_before);
2221 assert_eq!(db.size().await, size_before);
2222
2223 let too_large_target = Location::new(*size_before + 1);
2224 let too_large_err = db.rewind(too_large_target).await.unwrap_err();
2225 assert!(
2226 matches!(
2227 too_large_err,
2228 crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(size))
2229 if size == *too_large_target
2230 ),
2231 "unexpected rewind error: {too_large_err:?}"
2232 );
2233 assert_eq!(db.root(), root_before);
2234 assert_eq!(db.size().await, size_before);
2235
2236 db.destroy().await.unwrap();
2237 });
2238 }
2239
2240 #[test_traced("INFO")]
2243 fn test_any_rewind_rejects_target_with_pruned_floor() {
2244 let executor = deterministic::Runner::default();
2245 executor.start(|context| async move {
2246 const KEYS: u64 = 64;
2247
2248 let ctx = context.child("db");
2249 let mut db: UnorderedVariable =
2250 UnorderedVariableDb::init(ctx.child("storage"), variable_db_config::<OneCap>("rf", &ctx))
2251 .await
2252 .unwrap();
2253
2254 commit_writes(&mut db, (0..KEYS).map(|i| (key(i), Some(val(i)))), None).await;
2255 commit_writes(
2256 &mut db,
2257 (0..KEYS).map(|i| (key(i), Some(val(1_000 + i)))),
2258 None,
2259 )
2260 .await;
2261
2262 let rewind_target = db.size().await;
2263 let target_floor = db.inactivity_floor_loc();
2264 let prune_loc = Location::new(*target_floor + (KEYS / 2));
2265 assert!(
2266 rewind_target > *prune_loc,
2267 "test setup expected target size > prune_loc; target={rewind_target:?}, floor={target_floor:?}"
2268 );
2269
2270 let mut round = 0u64;
2271 while db.inactivity_floor_loc() < prune_loc {
2272 round += 1;
2273 assert!(
2274 round <= 8,
2275 "failed to advance inactivity floor enough for floor-pruned rewind test"
2276 );
2277 commit_writes(
2278 &mut db,
2279 (0..KEYS).map(|i| (key(i), Some(val(10_000 + round * KEYS + i)))),
2280 None,
2281 )
2282 .await;
2283 }
2284
2285 db.prune(prune_loc).await.unwrap();
2286 let bounds = db.bounds().await;
2287 assert!(
2288 bounds.start > *target_floor,
2289 "test setup expected pruned start beyond target floor; bounds={bounds:?}, target_floor={target_floor:?}"
2290 );
2291 assert!(
2292 rewind_target > bounds.start,
2293 "test setup expected target commit retained; target={rewind_target:?}, bounds={bounds:?}"
2294 );
2295
2296 let err = db.rewind(rewind_target).await.unwrap_err();
2297 assert!(
2298 matches!(
2299 err,
2300 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2301 ),
2302 "unexpected rewind error: {err:?}"
2303 );
2304
2305 db.destroy().await.unwrap();
2306 });
2307 }
2308
2309 #[test_traced("INFO")]
2315 fn test_any_prune_keeps_bitmap_aligned_with_journal() {
2316 let executor = deterministic::Runner::default();
2317 executor.start(|context| async move {
2318 const BITMAP_CHUNK_BITS: u64 =
2321 commonware_utils::bitmap::Prunable::<BITMAP_CHUNK_BYTES>::CHUNK_SIZE_BITS;
2322 const ITEMS_PER_SECTION: u64 = 2048;
2326 const { assert!(ITEMS_PER_SECTION > BITMAP_CHUNK_BITS) };
2327
2328 let ctx = context.child("db");
2329 let mut cfg = variable_db_config::<OneCap>("rg", &ctx);
2330 cfg.journal_config.items_per_section = NZU64!(ITEMS_PER_SECTION);
2331
2332 let mut db: UnorderedVariable =
2333 UnorderedVariableDb::init(ctx.child("storage"), cfg).await.unwrap();
2334
2335 commit_writes(&mut db, (0..100).map(|i| (key(i), Some(val(i)))), None).await;
2336 let rewind_target = db.size().await;
2337 assert!(
2340 *rewind_target < BITMAP_CHUNK_BITS,
2341 "rewind_target {rewind_target:?} must be < {BITMAP_CHUNK_BITS} for the bug to manifest"
2342 );
2343 let root_at_target = db.root();
2344
2345 commit_writes(
2346 &mut db,
2347 (0..700).map(|i| (key(i), Some(val(1_000 + i)))),
2348 None,
2349 )
2350 .await;
2351 commit_writes(
2352 &mut db,
2353 (0..700).map(|i| (key(i), Some(val(10_000 + i)))),
2354 None,
2355 )
2356 .await;
2357
2358 let pre_prune_size = db.size().await;
2361 assert!(pre_prune_size > rewind_target);
2362
2363 let prune_loc = Location::new(600);
2364 assert!(
2367 *prune_loc > BITMAP_CHUNK_BITS,
2368 "prune_loc {prune_loc:?} must exceed one bitmap chunk ({BITMAP_CHUNK_BITS} bits)"
2369 );
2370 assert!(
2373 *prune_loc < ITEMS_PER_SECTION,
2374 "prune_loc {prune_loc:?} must be < {ITEMS_PER_SECTION} so the journal retains section 0"
2375 );
2376 assert!(db.inactivity_floor_loc() >= prune_loc);
2377
2378 db.prune(prune_loc).await.unwrap();
2379
2380 let bounds = db.bounds().await;
2383 assert_eq!(bounds.start, Location::new(0));
2384 assert_eq!(
2385 db.bitmap.pruned_bits(),
2386 0,
2387 "bitmap pruned past journal retained start"
2388 );
2389
2390 db.rewind(rewind_target).await.unwrap();
2393 assert_eq!(db.size().await, rewind_target);
2394 assert_eq!(db.root(), root_at_target);
2395
2396 db.destroy().await.unwrap();
2397 });
2398 }
2399
2400 type MmbVariable = super::db::Db<
2407 crate::merkle::mmb::Family,
2408 Context,
2409 crate::journal::contiguous::variable::Journal<
2410 Context,
2411 super::operation::Operation<
2412 crate::merkle::mmb::Family,
2413 super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
2414 >,
2415 >,
2416 crate::index::unordered::Index<OneCap, crate::merkle::Location<crate::merkle::mmb::Family>>,
2417 Sha256,
2418 super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
2419 { crate::qmdb::any::BITMAP_CHUNK_BYTES },
2420 Sequential,
2421 >;
2422
2423 async fn open_mmb_db(context: Context, suffix: &str) -> MmbVariable {
2424 let cfg = variable_db_config::<OneCap>(suffix, &context);
2425 super::init(context, cfg).await.unwrap()
2426 }
2427
2428 async fn commit_writes_mmb(
2429 db: &mut MmbVariable,
2430 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
2431 metadata: Option<Digest>,
2432 ) {
2433 let mut batch = db.new_batch();
2434 for (k, v) in writes {
2435 batch = batch.write(k, v);
2436 }
2437 let merkleized = batch.merkleize(db, metadata).await.unwrap();
2438 db.apply_batch(merkleized).await.unwrap();
2439 db.commit().await.unwrap();
2440 }
2441
2442 #[test_traced("INFO")]
2443 fn test_mmb_batch_crud() {
2444 let executor = deterministic::Runner::default();
2445 executor.start(|context| async move {
2446 let mut db = open_mmb_db(context.child("db"), "crud").await;
2447
2448 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2450 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2451
2452 commit_writes_mmb(&mut db, [(key(0), Some(val(1)))], None).await;
2454 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(1)));
2455
2456 commit_writes_mmb(&mut db, [(key(0), None)], None).await;
2458 assert!(db.get(&key(0)).await.unwrap().is_none());
2459
2460 commit_writes_mmb(
2462 &mut db,
2463 [(key(1), Some(val(1))), (key(2), Some(val(2)))],
2464 None,
2465 )
2466 .await;
2467 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2468 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2469
2470 db.destroy().await.unwrap();
2471 });
2472 }
2473
2474 #[test_traced("INFO")]
2475 fn test_mmb_batch_empty() {
2476 let executor = deterministic::Runner::default();
2477 executor.start(|context| async move {
2478 let mut db = open_mmb_db(context.child("db"), "empty").await;
2479 let root_before = db.root();
2480
2481 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2482 db.apply_batch(merkleized).await.unwrap();
2483 assert_ne!(db.root(), root_before);
2484
2485 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2486 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2487
2488 db.destroy().await.unwrap();
2489 });
2490 }
2491
2492 #[test_traced("INFO")]
2493 fn test_mmb_batch_metadata() {
2494 let executor = deterministic::Runner::default();
2495 executor.start(|context| async move {
2496 let mut db = open_mmb_db(context.child("db"), "meta").await;
2497
2498 let metadata = val(42);
2499 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
2500 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2501
2502 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2503 db.apply_batch(merkleized).await.unwrap();
2504 assert_eq!(db.get_metadata().await.unwrap(), None);
2505
2506 db.destroy().await.unwrap();
2507 });
2508 }
2509
2510 #[test_traced("WARN")]
2511 fn test_mmb_recovery() {
2512 let executor = deterministic::Runner::default();
2513 executor.start(|context| async move {
2514 let mut db =
2515 open_mmb_db(context.child("db").with_attribute("index", 0), "recovery").await;
2516
2517 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(val(99))).await;
2518 commit_writes_mmb(&mut db, [(key(1), Some(val(1)))], None).await;
2519
2520 let root = db.root();
2521 let bounds = db.bounds().await;
2522 db.sync().await.unwrap();
2523 drop(db);
2524
2525 let db = open_mmb_db(context.child("db").with_attribute("index", 1), "recovery").await;
2527 assert_eq!(db.root(), root);
2528 assert_eq!(db.bounds().await, bounds);
2529 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2530 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2531 assert_eq!(db.get_metadata().await.unwrap(), None);
2532
2533 db.destroy().await.unwrap();
2534 });
2535 }
2536
2537 #[test_traced("INFO")]
2538 fn test_mmb_prune() {
2539 let executor = deterministic::Runner::default();
2540 executor.start(|context| async move {
2541 let mut db = open_mmb_db(context.child("db"), "prune").await;
2542
2543 for i in 0u64..20 {
2544 commit_writes_mmb(&mut db, [(key(i), Some(val(i)))], None).await;
2545 }
2546
2547 let floor = db.inactivity_floor_loc();
2548 db.prune(floor).await.unwrap();
2549
2550 for i in 0u64..20 {
2552 assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2553 }
2554
2555 db.destroy().await.unwrap();
2556 });
2557 }
2558
2559 #[test_traced("INFO")]
2561 fn test_any_batch_single_stage_pipeline() {
2562 let executor = deterministic::Runner::default();
2563 executor.start(|context| async move {
2564 let ctx = context.child("db");
2565 let mut db: UnorderedVariable = UnorderedVariableDb::init(
2566 ctx.child("storage"),
2567 variable_db_config::<OneCap>("pipe", &ctx),
2568 )
2569 .await
2570 .unwrap();
2571
2572 {
2573 let mut batch = db.new_batch();
2574 batch = batch.write(key(0), Some(val(0)));
2575 let merkleized = batch.merkleize(&db, None).await.unwrap();
2576 db.apply_batch(merkleized).await.unwrap();
2577 }
2578
2579 let (child_merkleized, commit_result) = futures::join!(
2580 async {
2581 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2582 let mut child = db.new_batch();
2583 child = child.write(key(1), Some(val(1)));
2584 child.merkleize(&db, None).await.unwrap()
2585 },
2586 db.commit(),
2587 );
2588 commit_result.unwrap();
2589
2590 db.apply_batch(child_merkleized).await.unwrap();
2591 db.commit().await.unwrap();
2592
2593 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2594 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2595
2596 db.destroy().await.unwrap();
2597 });
2598 }
2599}
2600
2601#[cfg(test)]
2602mod bitmap_tests {
2603 use crate::{
2607 merkle::Location,
2608 qmdb::any::unordered::variable::test::{create_test_config, AnyTest},
2609 };
2610 use commonware_cryptography::{Hasher, Sha256};
2611 use commonware_macros::test_traced;
2612 use commonware_runtime::{
2613 deterministic::{self, Context},
2614 Runner as _, Supervisor as _,
2615 };
2616 use commonware_utils::bitmap::Readable as _;
2617
2618 async fn open_db(context: Context) -> AnyTest {
2620 let cfg = create_test_config(0, &context);
2621 AnyTest::init(context, cfg).await.unwrap()
2622 }
2623
2624 fn bitmap_active_locs(db: &AnyTest) -> Vec<u64> {
2626 let b = &db.bitmap;
2627 (b.pruned_bits()..b.len())
2628 .filter(|loc| b.get_bit(*loc))
2629 .collect()
2630 }
2631
2632 async fn assert_oracle_round_trip(db: AnyTest, context: Context, label: &str) -> AnyTest {
2634 let pre_active = bitmap_active_locs(&db);
2635 let pre_len = db.bitmap.len();
2636 let pre_pruned = db.bitmap.pruned_bits();
2637
2638 db.commit().await.unwrap();
2639 drop(db);
2640
2641 let db = open_db(context.child("reopen").with_attribute("case", label)).await;
2642
2643 assert_eq!(
2644 db.bitmap.pruned_bits(),
2645 pre_pruned,
2646 "pruned_bits diverged on reopen",
2647 );
2648 assert_eq!(db.bitmap.len(), pre_len, "bitmap len diverged on reopen");
2649 assert_eq!(
2650 bitmap_active_locs(&db),
2651 pre_active,
2652 "active locations diverged on reopen",
2653 );
2654 db
2655 }
2656
2657 #[test_traced]
2663 fn current_commit_floor_bit_is_one_others_zero() {
2664 deterministic::Runner::default().start(|context| async move {
2665 let mut db = open_db(context.child("db")).await;
2666
2667 let mut commit_locs = Vec::new();
2669 for i in 0..3u64 {
2670 let key = Sha256::hash(&i.to_be_bytes());
2671 let batch = db
2672 .new_batch()
2673 .write(key, Some(vec![i as u8]))
2674 .merkleize(&db, None)
2675 .await
2676 .unwrap();
2677 commit_locs.push(Location::<crate::merkle::mmr::Family>::new(
2678 batch.bounds.total_size - 1,
2679 ));
2680 db.apply_batch(batch).await.unwrap();
2681 }
2682 db.commit().await.unwrap();
2683
2684 assert_eq!(commit_locs.len(), 3);
2686 assert!(*commit_locs[0] < *commit_locs[1]);
2687 assert!(*commit_locs[1] < *commit_locs[2]);
2688 assert!(*commit_locs[2] < db.bitmap.len());
2689
2690 assert!(!db.bitmap.get_bit(*commit_locs[0]));
2692 assert!(!db.bitmap.get_bit(*commit_locs[1]));
2693 assert!(db.bitmap.get_bit(*commit_locs[2]));
2695
2696 let db = assert_oracle_round_trip(db, context, "commit_floor").await;
2697 db.destroy().await.unwrap();
2698 });
2699 }
2700
2701 #[test_traced]
2712 fn rewind_restores_bitmap_to_target_commit() {
2713 deterministic::Runner::default().start(|context| async move {
2714 let mut db = open_db(context.child("db")).await;
2715 let k1 = Sha256::hash(&[1]);
2716 let k2 = Sha256::hash(&[2]);
2717
2718 let b1 = db
2720 .new_batch()
2721 .write(k1, Some(vec![10]))
2722 .merkleize(&db, None)
2723 .await
2724 .unwrap();
2725 db.apply_batch(b1).await.unwrap();
2726 db.commit().await.unwrap();
2727 let size_after_first = Location::new(*db.last_commit_loc + 1);
2728
2729 let b2 = db
2730 .new_batch()
2731 .write(k2, Some(vec![20]))
2732 .merkleize(&db, None)
2733 .await
2734 .unwrap();
2735 db.apply_batch(b2).await.unwrap();
2736
2737 assert_eq!(db.get(&k1).await.unwrap(), Some(vec![10]));
2739 assert_eq!(db.get(&k2).await.unwrap(), Some(vec![20]));
2740 assert!(*db.last_commit_loc + 1 > *size_after_first);
2741
2742 db.rewind(size_after_first).await.unwrap();
2744
2745 assert_eq!(db.get(&k1).await.unwrap(), Some(vec![10]));
2747 assert!(db.get(&k2).await.unwrap().is_none());
2748
2749 let db = assert_oracle_round_trip(db, context, "rewind").await;
2750 db.destroy().await.unwrap();
2751 });
2752 }
2753
2754 #[test_traced]
2774 fn floor_scan_falls_through_to_uncommitted_tail() {
2775 deterministic::Runner::default().start(|context| async move {
2776 let mut db = open_db(context.child("db")).await;
2777 let anchor = Sha256::hash(&[0xAA]);
2778
2779 let b = db
2781 .new_batch()
2782 .write(anchor, Some(vec![1]))
2783 .merkleize(&db, None)
2784 .await
2785 .unwrap();
2786 db.apply_batch(b).await.unwrap();
2787
2788 assert_eq!(db.get(&anchor).await.unwrap(), Some(vec![1]));
2790 let committed_bitmap_len = db.bitmap.len();
2791
2792 let parent = db
2794 .new_batch()
2795 .write(anchor, Some(vec![2]))
2796 .merkleize(&db, None)
2797 .await
2798 .unwrap();
2799 assert!(
2800 parent.bounds.total_size > committed_bitmap_len,
2801 "parent must extend past committed bitmap to exercise the tail path",
2802 );
2803
2804 let mut child_batch = parent.new_batch::<Sha256>();
2808 child_batch = child_batch.write(anchor, Some(vec![3]));
2809 for i in 0..16u64 {
2810 let k = Sha256::hash(&(1000 + i).to_be_bytes());
2811 child_batch = child_batch.write(k, Some(vec![i as u8]));
2812 }
2813 let child = child_batch.merkleize(&db, None).await.unwrap();
2814 assert!(
2815 child.bounds.total_size > committed_bitmap_len,
2816 "child must include an uncommitted tail beyond committed bitmap",
2817 );
2818 let expected_root = child.root();
2819
2820 db.apply_batch(child).await.unwrap();
2823 assert_eq!(db.root(), expected_root);
2824 assert_eq!(db.get(&anchor).await.unwrap(), Some(vec![3]));
2825
2826 let db = assert_oracle_round_trip(db, context, "tail").await;
2827 db.destroy().await.unwrap();
2828 });
2829 }
2830}