1use crate::{
67 index::Factory as IndexFactory,
68 journal::{
69 authenticated::Inner,
70 contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
71 },
72 merkle::{journaled::Config as MerkleConfig, Family, Location},
73 qmdb::{
74 any::operation::{Operation, Update},
75 operation::Committable,
76 },
77 translator::Translator,
78 Context,
79};
80use commonware_codec::CodecShared;
81use commonware_cryptography::Hasher;
82use tracing::warn;
83
84pub mod batch;
85pub mod db;
86pub mod operation;
87#[cfg(any(test, feature = "test-traits"))]
88pub mod traits;
89pub mod value;
90pub use value::{FixedValue, ValueEncoding, VariableValue};
91pub mod ordered;
92pub(crate) mod sync;
93pub mod unordered;
94
/// Configuration for an `any` database, generic over the translator `T` and the journal
/// configuration type `J`.
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Merkle configuration; `init` forwards it to the authenticated log's initializer.
    pub merkle_config: MerkleConfig,

    /// Journal configuration; `init` forwards it to the authenticated log's initializer.
    pub journal_config: J,

    /// Translator handed to the index factory when `init` builds the index.
    pub translator: T,
}
107
/// [`Config`] whose journal configuration is the fixed contiguous journal's `Config`.
pub type FixedConfig<T> = Config<T, FConfig>;
110
/// [`Config`] whose journal configuration is the variable contiguous journal's `Config`,
/// parameterized by `C` (the type of that config's `codec_config` field in this file's test
/// helpers).
pub type VariableConfig<T, C> = Config<T, VConfig<C>>;
113
114pub async fn init<F, E, U, H, T, I, J, Cb>(
116 context: E,
117 cfg: Config<T, J::Config>,
118 known_inactivity_floor: Option<Location<F>>,
119 callback: Cb,
120) -> Result<db::Db<F, E, J, I, H, U>, crate::qmdb::Error<F>>
121where
122 F: Family,
123 E: Context,
124 U: Update + Send + Sync,
125 H: Hasher,
126 T: Translator,
127 I: IndexFactory<T, Value = Location<F>>,
128 J: Inner<E, Item = Operation<F, U>>,
129 Operation<F, U>: Committable + CodecShared,
130 Cb: FnMut(bool, Option<Location<F>>),
131{
132 let mut log = J::init::<F, H>(
133 context.with_label("log"),
134 cfg.merkle_config,
135 cfg.journal_config,
136 Operation::is_commit,
137 )
138 .await?;
139
140 if log.size().await == 0 {
141 warn!("Authenticated log is empty, initializing new db");
142 let commit_floor = Operation::CommitFloor(None, Location::new(0));
143 log.append(&commit_floor).await?;
144 log.sync().await?;
145 }
146
147 let index = I::new(context.with_label("index"), cfg.translator);
148 db::Db::init_from_log(index, log, known_inactivity_floor, callback).await
149}
150
151#[cfg(test)]
152pub(crate) mod test {
154 use super::*;
155 use crate::{
156 journal::contiguous::{fixed::Config as FConfig, variable::Config as VConfig},
157 qmdb::any::{FixedConfig, MerkleConfig, VariableConfig},
158 translator::OneCap,
159 };
160 use commonware_codec::{Codec, CodecShared};
161 use commonware_cryptography::{sha256::Digest, Hasher, Sha256};
162 use commonware_runtime::{
163 buffer::paged::CacheRef, deterministic::Context, BufferPooler, Metrics,
164 };
165 use commonware_utils::{NZUsize, NZU16, NZU64};
166 use core::{future::Future, pin::Pin};
167 use std::{
168 collections::HashMap,
169 num::{NonZeroU16, NonZeroUsize},
170 };
171
172 pub(crate) fn colliding_digest(prefix: u8, suffix: u64) -> Digest {
173 let mut bytes = [0u8; 32];
174 bytes[0] = prefix;
175 bytes[24..].copy_from_slice(&suffix.to_be_bytes());
176 Digest::from(bytes)
177 }
178
/// Page size argument passed to `CacheRef::from_pooler` by the test config helpers.
const PAGE_SIZE: NonZeroU16 = NZU16!(101);
/// Cache size argument passed to `CacheRef::from_pooler` by the test config helpers.
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
182
183 pub(crate) fn fixed_db_config<T: Translator + Default>(
184 suffix: &str,
185 pooler: &impl BufferPooler,
186 ) -> FixedConfig<T> {
187 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
188 FixedConfig {
189 merkle_config: MerkleConfig {
190 journal_partition: format!("journal-{suffix}"),
191 metadata_partition: format!("metadata-{suffix}"),
192 items_per_blob: NZU64!(11),
193 write_buffer: NZUsize!(1024),
194 thread_pool: None,
195 page_cache: page_cache.clone(),
196 },
197 journal_config: FConfig {
198 partition: format!("log-journal-{suffix}"),
199 items_per_blob: NZU64!(7),
200 page_cache,
201 write_buffer: NZUsize!(1024),
202 },
203 translator: T::default(),
204 }
205 }
206
207 pub(crate) fn variable_db_config<T: Translator + Default>(
208 suffix: &str,
209 pooler: &impl BufferPooler,
210 ) -> VariableConfig<T, ((), ())> {
211 let page_cache = CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE);
212 VariableConfig {
213 merkle_config: MerkleConfig {
214 journal_partition: format!("journal-{suffix}"),
215 metadata_partition: format!("metadata-{suffix}"),
216 items_per_blob: NZU64!(11),
217 write_buffer: NZUsize!(1024),
218 thread_pool: None,
219 page_cache: page_cache.clone(),
220 },
221 journal_config: VConfig {
222 partition: format!("log-journal-{suffix}"),
223 items_per_section: NZU64!(7),
224 compression: None,
225 codec_config: ((), ()),
226 page_cache,
227 write_buffer: NZUsize!(1024),
228 },
229 translator: T::default(),
230 }
231 }
232
233 use crate::{
234 index::Unordered as UnorderedIndex,
235 journal::contiguous::Mutable,
236 merkle::mmr,
237 qmdb::any::{
238 db::Db as AnyDb,
239 operation::{update::Update as UpdateTrait, Operation as AnyOperation},
240 traits::{DbAny, Provable, UnmerkleizedBatch as _},
241 },
242 };
243
/// QMDB error type specialized to the MMR family.
type Error = crate::qmdb::Error<mmr::Family>;
/// Location type of the MMR family. As an explicit item it takes precedence over the generic
/// `Location` brought into this module by the `use super::*` glob import.
type Location = mmr::Location;
246
/// Test-only abstraction over databases that can be rewound to an earlier size.
pub(crate) trait RewindableDb {
    /// Rewinds the database to `size`, resolving to an error if the rewind fails.
    fn rewind_to_size(
        &mut self,
        size: Location,
    ) -> impl Future<Output = Result<(), Error>> + Send;
}
253
/// Implements [`RewindableDb`] for MMR-family `any` databases by delegating to their `rewind`
/// method.
impl<E, U, C, I, H> RewindableDb for AnyDb<mmr::Family, E, C, I, H, U>
where
    E: crate::Context,
    U: UpdateTrait,
    C: Mutable<Item = AnyOperation<mmr::Family, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    AnyOperation<mmr::Family, U>: Codec,
{
    async fn rewind_to_size(&mut self, size: Location) -> Result<(), Error> {
        // Only failures are propagated; whatever `rewind` yields on success is discarded.
        self.rewind(size).await?;
        Ok(())
    }
}
268
269 pub(crate) async fn test_any_db_non_empty_recovery<F: Family, D, V: Clone + CodecShared>(
271 context: Context,
272 mut db: D,
273 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
274 make_value: impl Fn(u64) -> V,
275 ) where
276 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
277 {
278 const ELEMENTS: u64 = 1000;
279
280 {
282 let mut batch = db.new_batch();
283 for i in 0u64..ELEMENTS {
284 let k = Sha256::hash(&i.to_be_bytes());
285 let v = make_value(i * 1000);
286 batch = batch.write(k, Some(v));
287 }
288 let merkleized = batch.merkleize(&db, None).await.unwrap();
289 db.apply_batch(merkleized).await.unwrap();
290 }
291 db.commit().await.unwrap();
292 db.prune(db.inactivity_floor_loc().await).await.unwrap();
293 let root = db.root();
294 let op_count = db.size().await;
295 let inactivity_floor_loc = db.inactivity_floor_loc().await;
296
297 let db = reopen_db(context.with_label("reopen1")).await;
298 assert_eq!(db.size().await, op_count);
299 assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
300 assert_eq!(db.root(), root);
301
302 {
304 let mut batch = db.new_batch();
305 for i in 0u64..ELEMENTS {
306 let k = Sha256::hash(&i.to_be_bytes());
307 let v = make_value((i + 1) * 10000);
308 batch = batch.write(k, Some(v));
309 }
310 let _merkleized = batch.merkleize(&db, None).await.unwrap();
311 }
312 let db = reopen_db(context.with_label("reopen2")).await;
313 assert_eq!(db.size().await, op_count);
314 assert_eq!(db.inactivity_floor_loc().await, inactivity_floor_loc);
315 assert_eq!(db.root(), root);
316
317 {
319 let mut batch = db.new_batch();
320 for i in 0u64..ELEMENTS {
321 let k = Sha256::hash(&i.to_be_bytes());
322 let v = make_value((i + 1) * 10000);
323 batch = batch.write(k, Some(v));
324 }
325 let _merkleized = batch.merkleize(&db, None).await.unwrap();
326 }
327 let db = reopen_db(context.with_label("reopen3")).await;
328 assert_eq!(db.size().await, op_count);
329 assert_eq!(db.root(), root);
330
331 for _ in 0..3 {
333 let mut batch = db.new_batch();
334 for i in 0u64..ELEMENTS {
335 let k = Sha256::hash(&i.to_be_bytes());
336 let v = make_value((i + 1) * 10000);
337 batch = batch.write(k, Some(v));
338 }
339 let _merkleized = batch.merkleize(&db, None).await.unwrap();
340 }
341 let mut db = reopen_db(context.with_label("reopen4")).await;
342 assert_eq!(db.size().await, op_count);
343 assert_eq!(db.root(), root);
344
345 {
347 let mut batch = db.new_batch();
348 for i in 0u64..ELEMENTS {
349 let k = Sha256::hash(&i.to_be_bytes());
350 let v = make_value((i + 1) * 10000);
351 batch = batch.write(k, Some(v));
352 }
353 let merkleized = batch.merkleize(&db, None).await.unwrap();
354 db.apply_batch(merkleized).await.unwrap();
355 }
356 db.commit().await.unwrap();
357 let db = reopen_db(context.with_label("reopen5")).await;
358 assert!(db.size().await > op_count);
359 assert_ne!(db.inactivity_floor_loc().await, inactivity_floor_loc);
360 assert_ne!(db.root(), root);
361
362 db.destroy().await.unwrap();
363 }
364
365 pub(crate) async fn test_any_db_empty_recovery<F: Family, D, V: Clone + CodecShared>(
367 context: Context,
368 db: D,
369 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
370 make_value: impl Fn(u64) -> V,
371 ) where
372 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
373 {
374 let root = db.root();
375
376 let db = reopen_db(context.with_label("reopen1")).await;
377 assert_eq!(db.size().await, 1);
378 assert_eq!(db.root(), root);
379
380 {
382 let mut batch = db.new_batch();
383 for i in 0u64..1000 {
384 let k = Sha256::hash(&i.to_be_bytes());
385 let v = make_value((i + 1) * 10000);
386 batch = batch.write(k, Some(v));
387 }
388 let _merkleized = batch.merkleize(&db, None).await.unwrap();
389 }
390 let db = reopen_db(context.with_label("reopen2")).await;
391 assert_eq!(db.size().await, 1);
392 assert_eq!(db.root(), root);
393
394 {
396 let mut batch = db.new_batch();
397 for i in 0u64..1000 {
398 let k = Sha256::hash(&i.to_be_bytes());
399 let v = make_value((i + 1) * 10000);
400 batch = batch.write(k, Some(v));
401 }
402 let _merkleized = batch.merkleize(&db, None).await.unwrap();
403 }
404 drop(db);
405 let db = reopen_db(context.with_label("reopen3")).await;
406 assert_eq!(db.size().await, 1);
407 assert_eq!(db.root(), root);
408
409 for _ in 0..3 {
411 let mut batch = db.new_batch();
412 for i in 0u64..1000 {
413 let k = Sha256::hash(&i.to_be_bytes());
414 let v = make_value((i + 1) * 10000);
415 batch = batch.write(k, Some(v));
416 }
417 let _merkleized = batch.merkleize(&db, None).await.unwrap();
418 }
419 drop(db);
420 let mut db = reopen_db(context.with_label("reopen4")).await;
421 assert_eq!(db.size().await, 1);
422 assert_eq!(db.root(), root);
423
424 {
426 let mut batch = db.new_batch();
427 for i in 0u64..1000 {
428 let k = Sha256::hash(&i.to_be_bytes());
429 let v = make_value((i + 1) * 10000);
430 batch = batch.write(k, Some(v));
431 }
432 let merkleized = batch.merkleize(&db, None).await.unwrap();
433 db.apply_batch(merkleized).await.unwrap();
434 }
435 db.commit().await.unwrap();
436 drop(db);
437 let db = reopen_db(context.with_label("reopen5")).await;
438 assert!(db.size().await > 1);
439 assert_ne!(db.root(), root);
440
441 db.destroy().await.unwrap();
442 }
443
444 pub(crate) async fn test_any_db_rewind_recovery<D, V>(
446 context: Context,
447 mut db: D,
448 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
449 make_value: impl Fn(u64) -> V,
450 ) where
451 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + RewindableDb,
452 V: Clone + CodecShared + Eq + std::fmt::Debug,
453 {
454 let key0 = Sha256::hash(&0u64.to_be_bytes());
455 let key1 = Sha256::hash(&1u64.to_be_bytes());
456 let key2 = Sha256::hash(&2u64.to_be_bytes());
457 let initial_root = db.root();
458 let initial_size = db.size().await;
459 let initial_floor = db.inactivity_floor_loc().await;
460
461 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
463 let empty_range = db.apply_batch(merkleized).await.unwrap();
464 db.commit().await.unwrap();
465 assert_eq!(empty_range.start, initial_size);
466 assert_eq!(db.size().await, empty_range.end);
467 db.rewind_to_size(initial_size).await.unwrap();
468 assert_eq!(db.root(), initial_root);
469 assert_eq!(db.size().await, initial_size);
470 assert_eq!(db.inactivity_floor_loc().await, initial_floor);
471 assert_eq!(db.get_metadata().await.unwrap(), None);
472
473 let value0_a = make_value(10);
474 let value1_a = make_value(11);
475 let metadata_a = make_value(12);
476
477 let merkleized = db
478 .new_batch()
479 .write(key0, Some(value0_a.clone()))
480 .write(key1, Some(value1_a.clone()))
481 .merkleize(&db, Some(metadata_a.clone()))
482 .await
483 .unwrap();
484 let range_a = db.apply_batch(merkleized).await.unwrap();
485 db.commit().await.unwrap();
486
487 let root_a = db.root();
488 let size_a = db.size().await;
489 let floor_a = db.inactivity_floor_loc().await;
490 assert_eq!(size_a, range_a.end);
491
492 let value0_b = make_value(20);
493 let value2_b = make_value(21);
494 let metadata_b = make_value(22);
495
496 let merkleized = db
497 .new_batch()
498 .write(key0, Some(value0_b))
499 .write(key1, None)
500 .write(key2, Some(value2_b))
501 .merkleize(&db, Some(metadata_b))
502 .await
503 .unwrap();
504 let range_b = db.apply_batch(merkleized).await.unwrap();
505 db.commit().await.unwrap();
506 assert_eq!(range_b.start, size_a);
507 assert_ne!(db.root(), root_a);
508
509 let value0_c = make_value(30);
510 let value1_c = make_value(31);
511 let metadata_c = make_value(32);
512 let merkleized = db
513 .new_batch()
514 .write(key0, Some(value0_c))
515 .write(key1, Some(value1_c))
516 .write(key2, None)
517 .merkleize(&db, Some(metadata_c))
518 .await
519 .unwrap();
520 db.apply_batch(merkleized).await.unwrap();
521 db.commit().await.unwrap();
522
523 db.rewind_to_size(size_a).await.unwrap();
527 assert_eq!(db.root(), root_a);
528 assert_eq!(db.size().await, size_a);
529 assert_eq!(db.inactivity_floor_loc().await, floor_a);
530 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
531 assert_eq!(db.get(&key0).await.unwrap(), Some(value0_a));
532 assert_eq!(db.get(&key1).await.unwrap(), Some(value1_a));
533 assert_eq!(db.get(&key2).await.unwrap(), None);
534
535 db.commit().await.unwrap();
536 drop(db);
537 let mut db = reopen_db(context.with_label("reopen_after_rewind")).await;
538 assert_eq!(db.root(), root_a);
539 assert_eq!(db.size().await, size_a);
540 assert_eq!(db.inactivity_floor_loc().await, floor_a);
541 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
542 assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
543 assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
544 assert_eq!(db.get(&key2).await.unwrap(), None);
545
546 let value2_d = make_value(40);
549 let metadata_d = make_value(41);
550 let merkleized = db
551 .new_batch()
552 .write(key2, Some(value2_d.clone()))
553 .merkleize(&db, Some(metadata_d.clone()))
554 .await
555 .unwrap();
556 db.apply_batch(merkleized).await.unwrap();
557 db.commit().await.unwrap();
558 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d.clone()));
559 assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
560 assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
561 assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d.clone()));
562
563 drop(db);
564 let mut db = reopen_db(context.with_label("reopen_after_rewind_new_writes")).await;
565 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_d));
566 assert_eq!(db.get(&key0).await.unwrap(), Some(make_value(10)));
567 assert_eq!(db.get(&key1).await.unwrap(), Some(make_value(11)));
568 assert_eq!(db.get(&key2).await.unwrap(), Some(value2_d));
569
570 db.rewind_to_size(initial_size).await.unwrap();
572 assert_eq!(db.root(), initial_root);
573 assert_eq!(db.size().await, initial_size);
574 assert_eq!(db.inactivity_floor_loc().await, initial_floor);
575 assert_eq!(db.get_metadata().await.unwrap(), None);
576 assert_eq!(db.get(&key0).await.unwrap(), None);
577 assert_eq!(db.get(&key1).await.unwrap(), None);
578 assert_eq!(db.get(&key2).await.unwrap(), None);
579
580 db.commit().await.unwrap();
581 drop(db);
582 let db = reopen_db(context.with_label("reopen_initial_boundary")).await;
583 assert_eq!(db.root(), initial_root);
584 assert_eq!(db.size().await, initial_size);
585 assert_eq!(db.inactivity_floor_loc().await, initial_floor);
586 assert_eq!(db.get_metadata().await.unwrap(), None);
587 assert_eq!(db.get(&key0).await.unwrap(), None);
588 assert_eq!(db.get(&key1).await.unwrap(), None);
589 assert_eq!(db.get(&key2).await.unwrap(), None);
590
591 db.destroy().await.unwrap();
592 }
593
594 pub(crate) async fn test_any_db_build_and_authenticate<D, V>(
596 context: Context,
597 mut db: D,
598 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
599 make_value: impl Fn(u64) -> V,
600 ) where
601 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
602 V: CodecShared + Clone + Eq + std::hash::Hash + std::fmt::Debug,
603 <D as Provable<mmr::Family>>::Operation: Codec,
604 {
605 use crate::{mmr::StandardHasher, qmdb::verify_proof};
606
607 const ELEMENTS: u64 = 1000;
608
609 let mut map = HashMap::<Digest, V>::default();
610 {
611 let mut batch = db.new_batch();
612 for i in 0u64..ELEMENTS {
613 let k = Sha256::hash(&i.to_be_bytes());
614 let v = make_value(i * 1000);
615 batch = batch.write(k, Some(v.clone()));
616 map.insert(k, v);
617 }
618
619 for i in 0u64..ELEMENTS {
621 if i % 3 != 0 {
622 continue;
623 }
624 let k = Sha256::hash(&i.to_be_bytes());
625 let v = make_value((i + 1) * 10000);
626 batch = batch.write(k, Some(v.clone()));
627 map.insert(k, v);
628 }
629
630 for i in 0u64..ELEMENTS {
632 if i % 7 != 1 {
633 continue;
634 }
635 let k = Sha256::hash(&i.to_be_bytes());
636 batch = batch.write(k, None);
637 map.remove(&k);
638 }
639
640 let merkleized = batch.merkleize(&db, None).await.unwrap();
641 db.apply_batch(merkleized).await.unwrap();
642 }
643 db.sync().await.unwrap();
645 db.prune(db.inactivity_floor_loc().await).await.unwrap();
646
647 let root = db.root();
649 db.sync().await.unwrap();
650 drop(db);
651 let db = reopen_db(context.with_label("reopened")).await;
652 assert_eq!(root, db.root());
653
654 for i in 0u64..ELEMENTS {
656 let k = Sha256::hash(&i.to_be_bytes());
657 if let Some(map_value) = map.get(&k) {
658 let Some(db_value) = db.get(&k).await.unwrap() else {
659 panic!("key not found in db: {k}");
660 };
661 assert_eq!(*map_value, db_value);
662 } else {
663 assert!(db.get(&k).await.unwrap().is_none());
664 }
665 }
666
667 let hasher = StandardHasher::<Sha256>::new();
668 let bounds = db.bounds().await;
669 let inactivity_floor = db.inactivity_floor_loc().await;
670 for loc in *inactivity_floor..*bounds.end {
671 let loc = Location::new(loc);
672 let (proof, ops) = db.proof(loc, NZU64!(10)).await.unwrap();
673 assert!(verify_proof(&hasher, &proof, loc, &ops, &root));
674 }
675
676 db.destroy().await.unwrap();
677 }
678
679 pub(crate) async fn test_any_db_log_replay<
681 F: Family,
682 D,
683 V: Clone + CodecShared + PartialEq + std::fmt::Debug,
684 >(
685 context: Context,
686 mut db: D,
687 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
688 make_value: impl Fn(u64) -> V,
689 ) where
690 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
691 {
692 const UPDATES: u64 = 100;
694 let k = Sha256::hash(&UPDATES.to_be_bytes());
695 let mut last_value = None;
696 {
697 let mut batch = db.new_batch();
698 for i in 0u64..UPDATES {
699 let v = make_value(i * 1000);
700 last_value = Some(v.clone());
701 batch = batch.write(k, Some(v));
702 }
703 let merkleized = batch.merkleize(&db, None).await.unwrap();
704 db.apply_batch(merkleized).await.unwrap();
705 }
706 db.commit().await.unwrap();
707 let root = db.root();
708
709 drop(db);
711 let db = reopen_db(context.with_label("reopened")).await;
712 assert_eq!(db.root(), root);
713 assert_eq!(db.get(&k).await.unwrap(), last_value);
714
715 db.destroy().await.unwrap();
716 }
717
718 pub(crate) async fn test_any_db_historical_proof_basic<D, V: Clone + CodecShared>(
720 _context: Context,
721 mut db: D,
722 make_value: impl Fn(u64) -> V,
723 ) where
724 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
725 <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
726 {
727 use crate::{mmr::StandardHasher, qmdb::verify_proof};
728 use commonware_utils::NZU64;
729
730 const OPS: u64 = 20;
732 {
733 let mut batch = db.new_batch();
734 for i in 0u64..OPS {
735 let k = Sha256::hash(&i.to_be_bytes());
736 let v = make_value(i * 1000);
737 batch = batch.write(k, Some(v));
738 }
739 let merkleized = batch.merkleize(&db, None).await.unwrap();
740 db.apply_batch(merkleized).await.unwrap();
741 }
742 let root_hash = db.root();
743 let original_op_count = db.size().await;
744
745 let max_ops = NZU64!(10);
747 let start_loc = Location::new(5);
748 let (historical_proof, historical_ops) = db
749 .historical_proof(original_op_count, start_loc, max_ops)
750 .await
751 .unwrap();
752 let (regular_proof, regular_ops) = db.proof(start_loc, max_ops).await.unwrap();
753
754 assert_eq!(historical_proof.leaves, regular_proof.leaves);
755 assert_eq!(historical_proof.digests, regular_proof.digests);
756 assert_eq!(historical_ops, regular_ops);
757 let hasher = StandardHasher::<Sha256>::new();
758 assert!(verify_proof(
759 &hasher,
760 &historical_proof,
761 start_loc,
762 &historical_ops,
763 &root_hash
764 ));
765
766 {
768 let mut batch = db.new_batch();
769 for i in OPS..(OPS + 5) {
770 let k = Sha256::hash(&(i + 1000).to_be_bytes()); let v = make_value(i * 1000);
772 batch = batch.write(k, Some(v));
773 }
774 let merkleized = batch.merkleize(&db, None).await.unwrap();
775 db.apply_batch(merkleized).await.unwrap();
776 }
777
778 let (historical_proof2, historical_ops2) = db
780 .historical_proof(original_op_count, start_loc, max_ops)
781 .await
782 .unwrap();
783 assert_eq!(historical_proof2.leaves, original_op_count);
784 assert_eq!(historical_proof2.digests, regular_proof.digests);
785 assert_eq!(historical_ops2, regular_ops);
786 assert!(verify_proof(
787 &hasher,
788 &historical_proof2,
789 start_loc,
790 &historical_ops2,
791 &root_hash
792 ));
793
794 db.destroy().await.unwrap();
795 }
796
797 pub(crate) async fn test_any_db_historical_proof_invalid<D, V: Clone + CodecShared>(
799 _context: Context,
800 mut db: D,
801 make_value: impl Fn(u64) -> V,
802 ) where
803 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
804 <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug + Clone,
805 {
806 use crate::{mmr::StandardHasher, qmdb::verify_proof};
807 use commonware_utils::NZU64;
808
809 {
811 let mut batch = db.new_batch();
812 for i in 0u64..10 {
813 let k = Sha256::hash(&i.to_be_bytes());
814 let v = make_value(i * 1000);
815 batch = batch.write(k, Some(v));
816 }
817 let merkleized = batch.merkleize(&db, None).await.unwrap();
818 db.apply_batch(merkleized).await.unwrap();
819 }
820
821 let historical_op_count = Location::new(5);
822 let (proof, ops) = db
823 .historical_proof(historical_op_count, Location::new(1), NZU64!(10))
824 .await
825 .unwrap();
826 assert_eq!(proof.leaves, historical_op_count);
827 assert_eq!(ops.len(), 4);
828
829 let hasher = StandardHasher::<Sha256>::new();
830
831 {
833 let mut tampered_proof = proof.clone();
834 tampered_proof.digests[0] = Sha256::hash(b"invalid");
835 let root_hash = db.root();
836 assert!(!verify_proof(
837 &hasher,
838 &tampered_proof,
839 Location::new(1),
840 &ops,
841 &root_hash
842 ));
843 }
844
845 {
847 let mut tampered_proof = proof.clone();
848 tampered_proof.digests.push(Sha256::hash(b"invalid"));
849 let root_hash = db.root();
850 assert!(!verify_proof(
851 &hasher,
852 &tampered_proof,
853 Location::new(1),
854 &ops,
855 &root_hash
856 ));
857 }
858
859 {
861 let root_hash = db.root();
862 let mut tampered_ops = ops.clone();
863 if tampered_ops.len() >= 2 {
865 tampered_ops.swap(0, 1);
866 assert!(!verify_proof(
867 &hasher,
868 &proof,
869 Location::new(1),
870 &tampered_ops,
871 &root_hash
872 ));
873 }
874 }
875
876 {
878 let root_hash = db.root();
879 let mut tampered_ops = ops.clone();
880 tampered_ops.push(tampered_ops[0].clone());
881 assert!(!verify_proof(
882 &hasher,
883 &proof,
884 Location::new(1),
885 &tampered_ops,
886 &root_hash
887 ));
888 }
889
890 {
892 let root_hash = db.root();
893 assert!(!verify_proof(
894 &hasher,
895 &proof,
896 Location::new(2),
897 &ops,
898 &root_hash
899 ));
900 }
901
902 {
904 let invalid_root = Sha256::hash(b"invalid");
905 assert!(!verify_proof(
906 &hasher,
907 &proof,
908 Location::new(1),
909 &ops,
910 &invalid_root
911 ));
912 }
913
914 {
916 let mut tampered_proof = proof.clone();
917 tampered_proof.leaves = Location::new(100);
918 let root_hash = db.root();
919 assert!(!verify_proof(
920 &hasher,
921 &tampered_proof,
922 Location::new(1),
923 &ops,
924 &root_hash
925 ));
926 }
927
928 db.destroy().await.unwrap();
929 }
930
931 pub(crate) async fn test_any_db_historical_proof_edge_cases<D, V: Clone + CodecShared>(
933 _context: Context,
934 mut db: D,
935 make_value: impl Fn(u64) -> V,
936 ) where
937 D: DbAny<mmr::Family, Key = Digest, Value = V, Digest = Digest> + Provable<mmr::Family>,
938 <D as Provable<mmr::Family>>::Operation: Codec + PartialEq + std::fmt::Debug,
939 {
940 use commonware_utils::NZU64;
941
942 {
944 let mut batch = db.new_batch();
945 for i in 0u64..50 {
946 let k = Sha256::hash(&i.to_be_bytes());
947 let v = make_value(i * 1000);
948 batch = batch.write(k, Some(v));
949 }
950 let merkleized = batch.merkleize(&db, None).await.unwrap();
951 db.apply_batch(merkleized).await.unwrap();
952 }
953
954 let (single_proof, single_ops) = db
956 .historical_proof(Location::new(2), Location::new(1), NZU64!(1))
957 .await
958 .unwrap();
959 assert_eq!(single_proof.leaves, Location::new(2));
960 assert_eq!(single_ops.len(), 1);
961
962 let (_limited_proof, limited_ops) = db
964 .historical_proof(Location::new(11), Location::new(6), NZU64!(20))
965 .await
966 .unwrap();
967 assert_eq!(limited_ops.len(), 5); let (min_proof, min_ops) = db
971 .historical_proof(Location::new(4), Location::new(1), NZU64!(3))
972 .await
973 .unwrap();
974 assert_eq!(min_proof.leaves, Location::new(4));
975 assert_eq!(min_ops.len(), 3);
976
977 db.destroy().await.unwrap();
978 }
979
980 pub(crate) async fn test_any_db_multiple_commits_delete_replayed<F: Family, D, V>(
982 context: Context,
983 mut db: D,
984 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
985 make_value: impl Fn(u64) -> V,
986 ) where
987 D: DbAny<F, Key = Digest, Value = V, Digest = Digest>,
988 V: Clone + CodecShared + Eq + std::fmt::Debug,
989 {
990 let mut map = HashMap::<Digest, V>::default();
991 const ELEMENTS: u64 = 10;
992 let metadata_value = make_value(42);
993 let key_at = |j: u64, i: u64| Sha256::hash(&(j * 1000 + i).to_be_bytes());
994 for j in 0u64..ELEMENTS {
995 let mut batch = db.new_batch();
996 for i in 0u64..ELEMENTS {
997 let k = key_at(j, i);
998 let v = make_value(i * 1000);
999 batch = batch.write(k, Some(v.clone()));
1000 map.insert(k, v);
1001 }
1002 let merkleized = batch
1003 .merkleize(&db, Some(metadata_value.clone()))
1004 .await
1005 .unwrap();
1006 db.apply_batch(merkleized).await.unwrap();
1007 db.commit().await.unwrap();
1008 }
1009 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_value));
1010 let k = key_at(ELEMENTS - 1, ELEMENTS - 1);
1011
1012 let merkleized = db
1013 .new_batch()
1014 .write(k, None)
1015 .merkleize(&db, None)
1016 .await
1017 .unwrap();
1018 db.apply_batch(merkleized).await.unwrap();
1019 db.commit().await.unwrap();
1020 assert_eq!(db.get_metadata().await.unwrap(), None);
1021 assert!(db.get(&k).await.unwrap().is_none());
1022
1023 let root = db.root();
1024 drop(db);
1025 let db = reopen_db(context.with_label("reopened")).await;
1026 assert_eq!(root, db.root());
1027 assert_eq!(db.get_metadata().await.unwrap(), None);
1028 assert!(db.get(&k).await.unwrap().is_none());
1029
1030 db.destroy().await.unwrap();
1031 }
1032
1033 use crate::qmdb::any::{
1034 ordered::{fixed::Db as OrderedFixedDb, variable::Db as OrderedVariableDb},
1035 unordered::{fixed::Db as UnorderedFixedDb, variable::Db as UnorderedVariableDb},
1036 };
1037 use commonware_macros::{test_group, test_traced};
1038 use commonware_runtime::{deterministic, Runner as _};
1039
// Concrete MMR-family database types referenced by the variant macros in this module. All of
// them use `Digest` for the two type parameters that follow the context, `Sha256` as the
// hasher, and the `OneCap` translator.

/// Unordered database over a fixed-size journal.
type UnorderedFixed = UnorderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
/// Unordered database over a variable-size journal.
type UnorderedVariable =
    UnorderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
/// Ordered database over a fixed-size journal.
type OrderedFixed = OrderedFixedDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;
/// Ordered database over a variable-size journal.
type OrderedVariable = OrderedVariableDb<mmr::Family, Context, Digest, Digest, Sha256, OneCap>;

// Partitioned variants with the trailing const parameter set to 1.
// NOTE(review): the meaning of that const parameter is defined by `partitioned::Db`, which is
// not visible here — confirm before relying on it.
type UnorderedFixedP1 =
    unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
type UnorderedVariableP1 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    1,
>;
type OrderedFixedP1 =
    ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;
type OrderedVariableP1 =
    ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 1>;

// Partitioned variants with the trailing const parameter set to 2.
type UnorderedFixedP2 =
    unordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
type UnorderedVariableP2 = unordered::variable::partitioned::Db<
    mmr::Family,
    Context,
    Digest,
    Digest,
    Sha256,
    OneCap,
    2,
>;
type OrderedFixedP2 =
    ordered::fixed::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
type OrderedVariableP2 =
    ordered::variable::partitioned::Db<mmr::Family, Context, Digest, Digest, Sha256, OneCap, 2>;
1076
/// Concrete database types for the `mmb` Merkle family, spelled out directly in terms of
/// `db::Db`, a journal, an index, the hasher and an update type.
mod mmb_types {
    use super::*;
    use crate::{
        index::{ordered::Index as OrderedIndex, unordered::Index as UnorderedIndex},
        journal::contiguous::{fixed::Journal as FJournal, variable::Journal as VJournal},
        merkle::{mmb, Location},
        qmdb::any::{
            operation::{update, Operation},
            value::{FixedEncoding, VariableEncoding},
        },
    };

    /// Location type of the `mmb` family, used as the index value type below.
    type MmbLocation = Location<mmb::Family>;

    /// Unordered updates with fixed-encoded `Digest` values, over a fixed journal.
    pub type MmbUnorderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, FixedEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, FixedEncoding<Digest>>,
    >;

    /// Unordered updates with variable-encoded `Digest` values, over a variable journal.
    pub type MmbUnorderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Unordered<Digest, VariableEncoding<Digest>>>,
        >,
        UnorderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Unordered<Digest, VariableEncoding<Digest>>,
    >;

    /// Ordered updates with fixed-encoded `Digest` values, over a fixed journal.
    pub type MmbOrderedFixed = super::super::db::Db<
        mmb::Family,
        Context,
        FJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, FixedEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, FixedEncoding<Digest>>,
    >;

    /// Ordered updates with variable-encoded `Digest` values, over a variable journal.
    pub type MmbOrderedVariable = super::super::db::Db<
        mmb::Family,
        Context,
        VJournal<
            Context,
            Operation<mmb::Family, update::Ordered<Digest, VariableEncoding<Digest>>>,
        >,
        OrderedIndex<OneCap, MmbLocation>,
        Sha256,
        update::Ordered<Digest, VariableEncoding<Digest>>,
    >;
}
1140 use mmb_types::*;
1141
1142 #[inline]
1143 fn to_digest(i: u64) -> Digest {
1144 Sha256::hash(&i.to_be_bytes())
1145 }
1146
/// Invokes the callback macro `$cb` once for each MMR-family database variant.
///
/// Every invocation receives the caller's arguments followed by three extra ones: a short
/// string label for the variant, the concrete database type, and the name of the config helper
/// (`fixed_db_config` or `variable_db_config`) that matches the variant's journal.
macro_rules! with_mmr_variants {
    ($cb:ident!($($args:tt)*)) => {
        $cb!($($args)*, "uf", UnorderedFixed, fixed_db_config);
        $cb!($($args)*, "uv", UnorderedVariable, variable_db_config);
        $cb!($($args)*, "of", OrderedFixed, fixed_db_config);
        $cb!($($args)*, "ov", OrderedVariable, variable_db_config);
        $cb!($($args)*, "ufp1", UnorderedFixedP1, fixed_db_config);
        $cb!($($args)*, "uvp1", UnorderedVariableP1, variable_db_config);
        $cb!($($args)*, "ofp1", OrderedFixedP1, fixed_db_config);
        $cb!($($args)*, "ovp1", OrderedVariableP1, variable_db_config);
        $cb!($($args)*, "ufp2", UnorderedFixedP2, fixed_db_config);
        $cb!($($args)*, "uvp2", UnorderedVariableP2, variable_db_config);
        $cb!($($args)*, "ofp2", OrderedFixedP2, fixed_db_config);
        $cb!($($args)*, "ovp2", OrderedVariableP2, variable_db_config);
    };
}
1164
// Expands `$cb!` once for every database variant: first the variants enumerated by
// `with_mmr_variants!` (in that macro's order), then the four `Mmb*` variants.
//
// Each expansion receives the caller-supplied leading arguments followed by the
// variant's label literal, its database type, and the identifier of its config
// constructor. Delegating to `with_mmr_variants!` keeps a single copy of the shared
// variant list so the two macros cannot drift apart.
macro_rules! with_all_variants {
    ($cb:ident!($($args:tt)*)) => {
        with_mmr_variants!($cb!($($args)*));
        $cb!($($args)*, "uf_mmb", MmbUnorderedFixed, fixed_db_config);
        $cb!($($args)*, "uv_mmb", MmbUnorderedVariable, variable_db_config);
        $cb!($($args)*, "of_mmb", MmbOrderedFixed, fixed_db_config);
        $cb!($($args)*, "ov_mmb", MmbOrderedVariable, variable_db_config);
    };
}
1186
// Initializes a `$db_ty` on the partition `<label>_<suffix>` and awaits `$test_fn` with:
// the labeled context, the freshly opened database, a closure that re-initializes the
// database on the same partition from a supplied context, and `to_digest` as the value
// constructor. The whole run is boxed and pinned before being awaited.
macro_rules! test_with_reopen {
    ($context:expr, $suffix:expr, $test_fn:expr, $label:literal, $db_ty:ty, $make_cfg:ident) => {{
        let partition = concat!($label, "_", $suffix);
        let run = Box::pin(async {
            let labeled = $context.with_label($label);
            let opened = <$db_ty>::init(labeled.clone(), $make_cfg::<OneCap>(partition, &labeled))
                .await
                .unwrap();
            $test_fn(
                labeled,
                opened,
                |reopen_ctx| {
                    Box::pin(async move {
                        <$db_ty>::init(
                            reopen_ctx.clone(),
                            $make_cfg::<OneCap>(partition, &reopen_ctx),
                        )
                        .await
                        .unwrap()
                    })
                },
                to_digest,
            )
            .await;
        });
        run.await
    }};
}
1214
// Initializes a `$db_ty` on the partition `<label>_<suffix>` and awaits `$test_fn` with
// the labeled context, the opened database, and `to_digest` as the value constructor.
// The whole run is boxed and pinned before being awaited.
macro_rules! test_with_make_value {
    ($context:expr, $suffix:expr, $test_fn:expr, $label:literal, $db_ty:ty, $make_cfg:ident) => {{
        let partition = concat!($label, "_", $suffix);
        let run = Box::pin(async {
            let labeled = $context.with_label($label);
            let opened = <$db_ty>::init(labeled.clone(), $make_cfg::<OneCap>(partition, &labeled))
                .await
                .unwrap();
            $test_fn(labeled, opened, to_digest).await;
        });
        run.await
    }};
}
1228
// Runs `$test_fn` against every variant listed by `with_all_variants!`. The keyword
// before the function selects the harness: `with_make_value:` passes only the value
// constructor, `with_reopen:` additionally passes a reopen closure.
macro_rules! for_all_variants {
    ($context:expr, $suffix:expr, with_make_value: $test_fn:expr) => {{
        with_all_variants!(test_with_make_value!($context, $suffix, $test_fn));
    }};
    ($context:expr, $suffix:expr, with_reopen: $test_fn:expr) => {{
        with_all_variants!(test_with_reopen!($context, $suffix, $test_fn));
    }};
}
1238
// Runs `$test_fn` against only the variants listed by `with_mmr_variants!`. The keyword
// before the function selects the harness: `with_make_value:` passes only the value
// constructor, `with_reopen:` additionally passes a reopen closure.
macro_rules! for_mmr_variants {
    ($context:expr, $suffix:expr, with_make_value: $test_fn:expr) => {{
        with_mmr_variants!(test_with_make_value!($context, $suffix, $test_fn));
    }};
    ($context:expr, $suffix:expr, with_reopen: $test_fn:expr) => {{
        with_mmr_variants!(test_with_reopen!($context, $suffix, $test_fn));
    }};
}
1249
1250 #[test_group("slow")]
1251 #[test_traced("WARN")]
1252 fn test_all_variants_log_replay() {
1253 let executor = deterministic::Runner::default();
1254 executor.start(|context| async move {
1255 for_all_variants!(context, "lr", with_reopen: test_any_db_log_replay);
1256 });
1257 }
1258
1259 #[test_group("slow")]
1260 #[test_traced("WARN")]
1261 fn test_all_variants_build_and_authenticate() {
1262 let executor = deterministic::Runner::default();
1263 executor.start(|context| async move {
1264 for_mmr_variants!(context, "baa", with_reopen: test_any_db_build_and_authenticate);
1265 });
1266 }
1267
1268 #[test_group("slow")]
1269 #[test_traced("WARN")]
1270 fn test_all_variants_historical_proof_basic() {
1271 let executor = deterministic::Runner::default();
1272 executor.start(|context| async move {
1273 for_mmr_variants!(context, "hpb", with_make_value: test_any_db_historical_proof_basic);
1274 });
1275 }
1276
1277 #[test_group("slow")]
1278 #[test_traced("WARN")]
1279 fn test_all_variants_historical_proof_invalid() {
1280 let executor = deterministic::Runner::default();
1281 executor.start(|context| async move {
1282 for_mmr_variants!(context, "hpi", with_make_value: test_any_db_historical_proof_invalid);
1283 });
1284 }
1285
1286 #[test_group("slow")]
1287 #[test_traced("WARN")]
1288 fn test_all_variants_historical_proof_edge_cases() {
1289 let executor = deterministic::Runner::default();
1290 executor.start(|context| async move {
1291 for_mmr_variants!(context, "hpec", with_make_value: test_any_db_historical_proof_edge_cases);
1292 });
1293 }
1294
1295 #[test_group("slow")]
1296 #[test_traced("WARN")]
1297 fn test_all_variants_multiple_commits_delete_replayed() {
1298 let executor = deterministic::Runner::default();
1299 executor.start(|context| async move {
1300 for_all_variants!(context, "mcdr", with_reopen: test_any_db_multiple_commits_delete_replayed);
1301 });
1302 }
1303
1304 #[test_group("slow")]
1305 #[test_traced("WARN")]
1306 fn test_all_variants_non_empty_recovery() {
1307 let executor = deterministic::Runner::default();
1308 executor.start(|context| async move {
1309 for_all_variants!(context, "ner", with_reopen: test_any_db_non_empty_recovery);
1310 });
1311 }
1312
1313 #[test_group("slow")]
1314 #[test_traced("WARN")]
1315 fn test_all_variants_empty_recovery() {
1316 let executor = deterministic::Runner::default();
1317 executor.start(|context| async move {
1318 for_all_variants!(context, "er", with_reopen: test_any_db_empty_recovery);
1319 });
1320 }
1321
1322 #[test_group("slow")]
1323 #[test_traced("WARN")]
1324 fn test_all_variants_rewind_recovery() {
1325 let executor = deterministic::Runner::default();
1326 executor.start(|context| async move {
1327 for_mmr_variants!(context, "rr", with_reopen: test_any_db_rewind_recovery);
1328 });
1329 }
1330
1331 fn key(i: u64) -> Digest {
1332 Sha256::hash(&i.to_be_bytes())
1333 }
1334
1335 fn val(i: u64) -> Digest {
1336 Sha256::hash(&(i + 10000).to_be_bytes())
1337 }
1338
1339 async fn commit_writes(
1341 db: &mut UnorderedVariable,
1342 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
1343 metadata: Option<Digest>,
1344 ) -> std::ops::Range<crate::mmr::Location> {
1345 let mut batch = db.new_batch();
1346 for (k, v) in writes {
1347 batch = batch.write(k, v);
1348 }
1349 let merkleized = batch.merkleize(&*db, metadata).await.unwrap();
1350 let range = db.apply_batch(merkleized).await.unwrap();
1351 db.commit().await.unwrap();
1352 range
1353 }
1354
1355 #[test_traced("INFO")]
1357 fn test_any_batch_empty() {
1358 let executor = deterministic::Runner::default();
1359 executor.start(|context| async move {
1360 let ctx = context.with_label("db");
1361 let mut db: UnorderedVariable =
1362 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("e", &ctx))
1363 .await
1364 .unwrap();
1365
1366 let root_before = db.root();
1367 let batch = db.new_batch();
1368 let merkleized = batch.merkleize(&db, None).await.unwrap();
1369 db.apply_batch(merkleized).await.unwrap();
1370
1371 assert_ne!(db.root(), root_before);
1373
1374 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1376 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1377
1378 db.destroy().await.unwrap();
1379 });
1380 }
1381
1382 #[test_traced("INFO")]
1384 fn test_any_batch_metadata() {
1385 let executor = deterministic::Runner::default();
1386 executor.start(|context| async move {
1387 let ctx = context.with_label("db");
1388 let mut db: UnorderedVariable =
1389 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("m", &ctx))
1390 .await
1391 .unwrap();
1392
1393 let metadata = val(42);
1394
1395 commit_writes(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
1397 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
1398
1399 let batch = db.new_batch();
1401 let merkleized = batch.merkleize(&db, None).await.unwrap();
1402 db.apply_batch(merkleized).await.unwrap();
1403 assert_eq!(db.get_metadata().await.unwrap(), None);
1404
1405 db.destroy().await.unwrap();
1406 });
1407 }
1408
1409 #[test_traced("INFO")]
1412 fn test_any_batch_get_read_through() {
1413 let executor = deterministic::Runner::default();
1414 executor.start(|context| async move {
1415 let ctx = context.with_label("db");
1416 let mut db: UnorderedVariable =
1417 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("g", &ctx))
1418 .await
1419 .unwrap();
1420
1421 let ka = key(0);
1423 let va = val(0);
1424 commit_writes(&mut db, [(ka, Some(va))], None).await;
1425
1426 let kb = key(1);
1427 let vb = val(1);
1428 let kc = key(2);
1429
1430 let mut batch = db.new_batch();
1431
1432 assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va));
1434
1435 batch = batch.write(kb, Some(vb));
1437 assert_eq!(batch.get(&kb, &db).await.unwrap(), Some(vb));
1438
1439 assert_eq!(batch.get(&kc, &db).await.unwrap(), None);
1441
1442 let va2 = val(100);
1444 batch = batch.write(ka, Some(va2));
1445 assert_eq!(batch.get(&ka, &db).await.unwrap(), Some(va2));
1446
1447 batch = batch.write(ka, None);
1449 assert_eq!(batch.get(&ka, &db).await.unwrap(), None);
1450
1451 db.destroy().await.unwrap();
1452 });
1453 }
1454
1455 #[test_traced("INFO")]
1457 fn test_any_batch_get_on_merkleized() {
1458 let executor = deterministic::Runner::default();
1459 executor.start(|context| async move {
1460 let ctx = context.with_label("db");
1461 let mut db: UnorderedVariable =
1462 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("mg", &ctx))
1463 .await
1464 .unwrap();
1465
1466 let ka = key(0);
1467 let kb = key(1);
1468 let kc = key(2);
1469 let kd = key(3);
1470
1471 commit_writes(&mut db, [(ka, Some(val(0))), (kb, Some(val(1)))], None).await;
1473
1474 let va2 = val(100);
1476 let vc = val(2);
1477 let mut batch = db.new_batch();
1478 batch = batch.write(ka, Some(va2));
1479 batch = batch.write(kb, None);
1480 batch = batch.write(kc, Some(vc));
1481 let merkleized = batch.merkleize(&db, None).await.unwrap();
1482
1483 assert_eq!(merkleized.get(&ka, &db).await.unwrap(), Some(va2));
1484 assert_eq!(merkleized.get(&kb, &db).await.unwrap(), None);
1485 assert_eq!(merkleized.get(&kc, &db).await.unwrap(), Some(vc));
1486 assert_eq!(merkleized.get(&kd, &db).await.unwrap(), None);
1487
1488 db.destroy().await.unwrap();
1489 });
1490 }
1491
1492 #[test_traced("INFO")]
1494 fn test_any_batch_stacked_get() {
1495 let executor = deterministic::Runner::default();
1496 executor.start(|context| async move {
1497 let ctx = context.with_label("db");
1498 let db: UnorderedVariable =
1499 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("sg", &ctx))
1500 .await
1501 .unwrap();
1502
1503 let ka = key(0);
1504 let kb = key(1);
1505
1506 let mut batch = db.new_batch();
1508 batch = batch.write(ka, Some(val(0)));
1509 let merkleized = batch.merkleize(&db, None).await.unwrap();
1510
1511 let mut child = merkleized.new_batch::<Sha256>();
1513 assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(0)));
1514
1515 child = child.write(ka, Some(val(100)));
1517 assert_eq!(child.get(&ka, &db).await.unwrap(), Some(val(100)));
1518
1519 child = child.write(kb, Some(val(1)));
1521 assert_eq!(child.get(&kb, &db).await.unwrap(), Some(val(1)));
1522
1523 child = child.write(ka, None);
1525 assert_eq!(child.get(&ka, &db).await.unwrap(), None);
1526
1527 db.destroy().await.unwrap();
1528 });
1529 }
1530
1531 #[test_traced("INFO")]
1533 fn test_any_batch_stacked_delete_recreate() {
1534 let executor = deterministic::Runner::default();
1535 executor.start(|context| async move {
1536 let ctx = context.with_label("db");
1537 let mut db: UnorderedVariable =
1538 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dr", &ctx))
1539 .await
1540 .unwrap();
1541
1542 let ka = key(0);
1543
1544 commit_writes(&mut db, [(ka, Some(val(0)))], None).await;
1546
1547 let mut parent = db.new_batch();
1549 parent = parent.write(ka, None);
1550 let parent_m = parent.merkleize(&db, None).await.unwrap();
1551 assert_eq!(parent_m.get(&ka, &db).await.unwrap(), None);
1552
1553 let mut child = parent_m.new_batch::<Sha256>();
1555 child = child.write(ka, Some(val(200)));
1556 let child_m = child.merkleize(&db, None).await.unwrap();
1557 assert_eq!(child_m.get(&ka, &db).await.unwrap(), Some(val(200)));
1558
1559 db.apply_batch(child_m).await.unwrap();
1561 assert_eq!(db.get(&ka).await.unwrap(), Some(val(200)));
1562
1563 db.destroy().await.unwrap();
1564 });
1565 }
1566
1567 #[test_traced("INFO")]
1570 fn test_any_batch_floor_raise() {
1571 let executor = deterministic::Runner::default();
1572 executor.start(|context| async move {
1573 let ctx = context.with_label("db");
1574 let mut db: UnorderedVariable =
1575 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("fr", &ctx))
1576 .await
1577 .unwrap();
1578
1579 let init: Vec<_> = (0..100).map(|i| (key(i), Some(val(i)))).collect();
1581 commit_writes(&mut db, init, None).await;
1582
1583 let floor_before = db.inactivity_floor_loc();
1584
1585 let updates: Vec<_> = (0..30).map(|i| (key(i), Some(val(i + 500)))).collect();
1587 commit_writes(&mut db, updates, None).await;
1588
1589 assert!(db.inactivity_floor_loc() > floor_before);
1591
1592 for i in 0..30 {
1594 assert_eq!(
1595 db.get(&key(i)).await.unwrap(),
1596 Some(val(i + 500)),
1597 "updated key {i} mismatch"
1598 );
1599 }
1600 for i in 30..100 {
1601 assert_eq!(
1602 db.get(&key(i)).await.unwrap(),
1603 Some(val(i)),
1604 "untouched key {i} mismatch"
1605 );
1606 }
1607
1608 db.destroy().await.unwrap();
1609 });
1610 }
1611
1612 #[test_traced("INFO")]
1614 fn test_any_batch_apply_returns_range() {
1615 let executor = deterministic::Runner::default();
1616 executor.start(|context| async move {
1617 let ctx = context.with_label("db");
1618 let mut db: UnorderedVariable =
1619 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ar", &ctx))
1620 .await
1621 .unwrap();
1622
1623 let writes: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1625 let range1 = commit_writes(&mut db, writes, None).await;
1626
1627 assert_eq!(range1.start, crate::mmr::Location::new(1));
1629 assert!(range1.end.saturating_sub(*range1.start) >= 6);
1631
1632 let writes: Vec<_> = (5..10).map(|i| (key(i), Some(val(i)))).collect();
1634 let range2 = commit_writes(&mut db, writes, None).await;
1635 assert_eq!(range2.start, range1.end);
1636
1637 db.destroy().await.unwrap();
1638 });
1639 }
1640
1641 #[test_traced("INFO")]
1643 fn test_any_batch_deep_chain() {
1644 let executor = deterministic::Runner::default();
1645 executor.start(|context| async move {
1646 let ctx = context.with_label("db");
1647 let mut db: UnorderedVariable =
1648 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("dc", &ctx))
1649 .await
1650 .unwrap();
1651
1652 let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1654 commit_writes(&mut db, init, None).await;
1655
1656 let mut parent = db.new_batch();
1658 parent = parent.write(key(0), Some(val(100)));
1659 parent = parent.write(key(5), Some(val(5)));
1660 let parent_m = parent.merkleize(&db, None).await.unwrap();
1661
1662 let mut child = parent_m.new_batch::<Sha256>();
1664 child = child.write(key(1), Some(val(101)));
1665 child = child.write(key(6), Some(val(6)));
1666 let child_m = child.merkleize(&db, None).await.unwrap();
1667
1668 let mut grandchild = child_m.new_batch::<Sha256>();
1670 grandchild = grandchild.write(key(2), None);
1671 grandchild = grandchild.write(key(7), Some(val(7)));
1672 let grandchild_m = grandchild.merkleize(&db, None).await.unwrap();
1673
1674 assert_eq!(
1676 grandchild_m.get(&key(0), &db).await.unwrap(),
1677 Some(val(100))
1678 );
1679 assert_eq!(
1680 grandchild_m.get(&key(1), &db).await.unwrap(),
1681 Some(val(101))
1682 );
1683 assert_eq!(grandchild_m.get(&key(2), &db).await.unwrap(), None);
1684 assert_eq!(grandchild_m.get(&key(7), &db).await.unwrap(), Some(val(7)));
1685
1686 db.apply_batch(grandchild_m).await.unwrap();
1688
1689 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1690 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(101)));
1691 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1692 assert_eq!(db.get(&key(3)).await.unwrap(), Some(val(3)));
1693 assert_eq!(db.get(&key(4)).await.unwrap(), Some(val(4)));
1694 assert_eq!(db.get(&key(5)).await.unwrap(), Some(val(5)));
1695 assert_eq!(db.get(&key(6)).await.unwrap(), Some(val(6)));
1696 assert_eq!(db.get(&key(7)).await.unwrap(), Some(val(7)));
1697
1698 db.destroy().await.unwrap();
1699 });
1700 }
1701
1702 #[test_traced("INFO")]
1704 fn test_any_batch_chain_matches_sequential() {
1705 let executor = deterministic::Runner::default();
1706 executor.start(|context| async move {
1707 let ctx = context.with_label("db");
1708
1709 let ctx_a = ctx.with_label("a");
1711 let mut db_a: UnorderedVariable = UnorderedVariableDb::init(
1712 ctx_a.clone(),
1713 variable_db_config::<OneCap>("cms-a", &ctx_a),
1714 )
1715 .await
1716 .unwrap();
1717
1718 let ctx_b = ctx.with_label("b");
1720 let mut db_b: UnorderedVariable = UnorderedVariableDb::init(
1721 ctx_b.clone(),
1722 variable_db_config::<OneCap>("cms-b", &ctx_b),
1723 )
1724 .await
1725 .unwrap();
1726
1727 let writes1: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1729
1730 let writes2 = vec![
1732 (key(0), Some(val(100))),
1733 (key(1), None),
1734 (key(5), Some(val(5))),
1735 ];
1736
1737 commit_writes(&mut db_a, writes1.clone(), None).await;
1739 commit_writes(&mut db_a, writes2.clone(), None).await;
1740
1741 let mut parent = db_b.new_batch();
1743 for (k, v) in &writes1 {
1744 parent = parent.write(*k, *v);
1745 }
1746 let parent_m = parent.merkleize(&db_b, None).await.unwrap();
1747
1748 let mut child = parent_m.new_batch::<Sha256>();
1749 for (k, v) in &writes2 {
1750 child = child.write(*k, *v);
1751 }
1752 let child_m = child.merkleize(&db_b, None).await.unwrap();
1753 db_b.apply_batch(child_m).await.unwrap();
1754
1755 assert_eq!(db_a.root(), db_b.root());
1757 for i in 0..6 {
1758 assert_eq!(
1759 db_a.get(&key(i)).await.unwrap(),
1760 db_b.get(&key(i)).await.unwrap(),
1761 "key {i} mismatch"
1762 );
1763 }
1764
1765 db_a.destroy().await.unwrap();
1766 db_b.destroy().await.unwrap();
1767 });
1768 }
1769
1770 #[test_traced("INFO")]
1772 fn test_any_batch_create_then_delete_same_batch() {
1773 let executor = deterministic::Runner::default();
1774 executor.start(|context| async move {
1775 let ctx = context.with_label("db");
1776 let mut db: UnorderedVariable =
1777 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("cd", &ctx))
1778 .await
1779 .unwrap();
1780
1781 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1783
1784 let mut batch = db.new_batch();
1786 batch = batch.write(key(1), Some(val(1))); batch = batch.write(key(1), None); batch = batch.write(key(2), Some(val(2))); batch = batch.write(key(0), None); let merkleized = batch.merkleize(&db, None).await.unwrap();
1791 db.apply_batch(merkleized).await.unwrap();
1792
1793 assert_eq!(db.get(&key(0)).await.unwrap(), None);
1794 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1795 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
1796
1797 db.destroy().await.unwrap();
1798 });
1799 }
1800
1801 #[test_traced("INFO")]
1803 fn test_any_batch_delete_all_keys() {
1804 let executor = deterministic::Runner::default();
1805 executor.start(|context| async move {
1806 let ctx = context.with_label("db");
1807 let mut db: UnorderedVariable =
1808 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("da", &ctx))
1809 .await
1810 .unwrap();
1811
1812 let init: Vec<_> = (0..5).map(|i| (key(i), Some(val(i)))).collect();
1814 commit_writes(&mut db, init, None).await;
1815
1816 let deletes: Vec<_> = (0..5).map(|i| (key(i), None)).collect();
1818 commit_writes(&mut db, deletes, None).await;
1819
1820 for i in 0..5 {
1821 assert_eq!(db.get(&key(i)).await.unwrap(), None, "key {i} not deleted");
1822 }
1823
1824 commit_writes(&mut db, [(key(10), Some(val(10)))], None).await;
1826 assert_eq!(db.get(&key(10)).await.unwrap(), Some(val(10)));
1827
1828 db.destroy().await.unwrap();
1829 });
1830 }
1831
1832 #[test_traced("INFO")]
1834 fn test_any_batch_parallel_forks() {
1835 let executor = deterministic::Runner::default();
1836 executor.start(|context| async move {
1837 let ctx = context.with_label("db");
1838 let mut db: UnorderedVariable =
1839 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pf", &ctx))
1840 .await
1841 .unwrap();
1842
1843 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1845 let root_before = db.root();
1846
1847 let fork_a_m = db
1849 .new_batch()
1850 .write(key(0), Some(val(100)))
1851 .write(key(1), Some(val(1)))
1852 .merkleize(&db, None)
1853 .await
1854 .unwrap();
1855
1856 let fork_b_m = db
1858 .new_batch()
1859 .write(key(0), None)
1860 .write(key(2), Some(val(2)))
1861 .merkleize(&db, None)
1862 .await
1863 .unwrap();
1864
1865 assert_ne!(fork_a_m.root(), fork_b_m.root());
1867
1868 assert_eq!(db.root(), root_before);
1870 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1871 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1872
1873 db.apply_batch(fork_a_m).await.unwrap();
1875 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(100)));
1876 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
1877 assert_eq!(db.get(&key(2)).await.unwrap(), None);
1878
1879 db.destroy().await.unwrap();
1880 });
1881 }
1882
1883 #[test_traced("INFO")]
1885 fn test_any_batch_floor_raise_chained() {
1886 let executor = deterministic::Runner::default();
1887 executor.start(|context| async move {
1888 let ctx = context.with_label("db");
1889 let mut db: UnorderedVariable =
1890 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("frc", &ctx))
1891 .await
1892 .unwrap();
1893
1894 let init: Vec<_> = (0..50).map(|i| (key(i), Some(val(i)))).collect();
1896 commit_writes(&mut db, init, None).await;
1897 let floor_before = db.inactivity_floor_loc();
1898
1899 let mut parent = db.new_batch();
1901 for i in 0..20 {
1902 parent = parent.write(key(i), Some(val(i + 500)));
1903 }
1904 let parent_m = parent.merkleize(&db, None).await.unwrap();
1905
1906 let mut child = parent_m.new_batch::<Sha256>();
1908 for i in 20..30 {
1909 child = child.write(key(i), Some(val(i + 500)));
1910 }
1911 let child_m = child.merkleize(&db, None).await.unwrap();
1912 db.apply_batch(child_m).await.unwrap();
1913
1914 assert!(db.inactivity_floor_loc() > floor_before);
1916
1917 for i in 0..30 {
1919 assert_eq!(
1920 db.get(&key(i)).await.unwrap(),
1921 Some(val(i + 500)),
1922 "updated key {i} mismatch"
1923 );
1924 }
1925 for i in 30..50 {
1926 assert_eq!(
1927 db.get(&key(i)).await.unwrap(),
1928 Some(val(i)),
1929 "untouched key {i} mismatch"
1930 );
1931 }
1932
1933 db.destroy().await.unwrap();
1934 });
1935 }
1936
1937 #[test_traced("INFO")]
1939 fn test_any_batch_abandoned() {
1940 let executor = deterministic::Runner::default();
1941 executor.start(|context| async move {
1942 let ctx = context.with_label("db");
1943 let mut db: UnorderedVariable =
1944 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ab", &ctx))
1945 .await
1946 .unwrap();
1947
1948 commit_writes(&mut db, [(key(0), Some(val(0)))], None).await;
1949 let root_before = db.root();
1950
1951 {
1953 let mut batch = db.new_batch();
1954 batch = batch.write(key(0), Some(val(999)));
1955 batch = batch.write(key(1), Some(val(1)));
1956 let _merkleized = batch.merkleize(&db, None).await.unwrap();
1957 }
1959
1960 assert_eq!(db.root(), root_before);
1962 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1963 assert_eq!(db.get(&key(1)).await.unwrap(), None);
1964
1965 db.destroy().await.unwrap();
1966 });
1967 }
1968
1969 #[test_traced("INFO")]
1971 fn test_any_batch_apply_requires_commit_for_recovery() {
1972 let executor = deterministic::Runner::default();
1973 executor.start(|context| async move {
1974 let partition = "apply_requires_commit";
1975 let ctx = context.with_label("db");
1976 let mut db: UnorderedVariable = UnorderedVariableDb::init(
1977 ctx.clone(),
1978 variable_db_config::<OneCap>(partition, &ctx),
1979 )
1980 .await
1981 .unwrap();
1982
1983 let committed_root = db.root();
1984
1985 let merkleized = db
1986 .new_batch()
1987 .write(key(0), Some(val(0)))
1988 .merkleize(&db, None)
1989 .await
1990 .unwrap();
1991 db.apply_batch(merkleized).await.unwrap();
1992
1993 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
1994
1995 drop(db);
1996
1997 let reopened: UnorderedVariable = UnorderedVariableDb::init(
1998 context.with_label("reopen"),
1999 variable_db_config::<OneCap>(partition, &context),
2000 )
2001 .await
2002 .unwrap();
2003 assert_eq!(reopened.root(), committed_root);
2004 assert_eq!(reopened.get(&key(0)).await.unwrap(), None);
2005
2006 reopened.destroy().await.unwrap();
2007 });
2008 }
2009
2010 #[test_traced("INFO")]
2012 fn test_any_rewind_pruned_target_errors() {
2013 let executor = deterministic::Runner::default();
2014 executor.start(|context| async move {
2015 const KEYS: u64 = 64;
2016
2017 let ctx = context.with_label("db");
2018 let mut db: UnorderedVariable =
2019 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rp", &ctx))
2020 .await
2021 .unwrap();
2022
2023 let initial: Vec<_> = (0..KEYS).map(|i| (key(i), Some(val(i)))).collect();
2024 let first_range = commit_writes(&mut db, initial, None).await;
2025
2026 let mut round = 0u64;
2027 loop {
2028 round += 1;
2029 assert!(
2030 round <= 64,
2031 "failed to prune enough history for rewind test"
2032 );
2033
2034 let updates: Vec<_> = (0..KEYS)
2035 .map(|i| (key(i), Some(val(1000 + round * KEYS + i))))
2036 .collect();
2037 commit_writes(&mut db, updates, None).await;
2038
2039 db.prune(db.inactivity_floor_loc()).await.unwrap();
2040 let bounds = db.bounds().await;
2041 if bounds.start > first_range.start {
2042 break;
2043 }
2044 }
2045
2046 let oldest_retained = db.bounds().await.start;
2047 let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
2048 assert!(
2049 matches!(
2050 boundary_err,
2051 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2052 ),
2053 "unexpected rewind error at retained boundary: {boundary_err:?}"
2054 );
2055
2056 let err = db.rewind(first_range.start).await.unwrap_err();
2057 assert!(
2058 matches!(
2059 err,
2060 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2061 ),
2062 "unexpected rewind error: {err:?}"
2063 );
2064
2065 db.destroy().await.unwrap();
2066 });
2067 }
2068
2069 #[test_traced("INFO")]
2071 fn test_any_rewind_invalid_target_errors() {
2072 let executor = deterministic::Runner::default();
2073 executor.start(|context| async move {
2074 let ctx = context.with_label("db");
2075 let mut db: UnorderedVariable =
2076 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("ri", &ctx))
2077 .await
2078 .unwrap();
2079
2080 let root_before = db.root();
2081 let size_before = db.size().await;
2082 let no_op_locs = db.rewind(size_before).await.unwrap();
2083 assert!(
2084 no_op_locs.is_empty(),
2085 "expected no-op rewind to return no restored locations"
2086 );
2087 assert_eq!(db.root(), root_before);
2088 assert_eq!(db.size().await, size_before);
2089
2090 let zero_err = db.rewind(Location::new(0)).await.unwrap_err();
2091 assert!(
2092 matches!(
2093 zero_err,
2094 crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(0))
2095 ),
2096 "unexpected rewind error: {zero_err:?}"
2097 );
2098 assert_eq!(db.root(), root_before);
2099 assert_eq!(db.size().await, size_before);
2100
2101 let too_large_target = Location::new(*size_before + 1);
2102 let too_large_err = db.rewind(too_large_target).await.unwrap_err();
2103 assert!(
2104 matches!(
2105 too_large_err,
2106 crate::qmdb::Error::Journal(crate::journal::Error::InvalidRewind(size))
2107 if size == *too_large_target
2108 ),
2109 "unexpected rewind error: {too_large_err:?}"
2110 );
2111 assert_eq!(db.root(), root_before);
2112 assert_eq!(db.size().await, size_before);
2113
2114 db.destroy().await.unwrap();
2115 });
2116 }
2117
2118 #[test_traced("INFO")]
2121 fn test_any_rewind_rejects_target_with_pruned_floor() {
2122 let executor = deterministic::Runner::default();
2123 executor.start(|context| async move {
2124 const KEYS: u64 = 64;
2125
2126 let ctx = context.with_label("db");
2127 let mut db: UnorderedVariable =
2128 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("rf", &ctx))
2129 .await
2130 .unwrap();
2131
2132 commit_writes(&mut db, (0..KEYS).map(|i| (key(i), Some(val(i)))), None).await;
2133 commit_writes(
2134 &mut db,
2135 (0..KEYS).map(|i| (key(i), Some(val(1_000 + i)))),
2136 None,
2137 )
2138 .await;
2139
2140 let rewind_target = db.size().await;
2141 let target_floor = db.inactivity_floor_loc();
2142 let prune_loc = Location::new(*target_floor + (KEYS / 2));
2143 assert!(
2144 rewind_target > *prune_loc,
2145 "test setup expected target size > prune_loc; target={rewind_target:?}, floor={target_floor:?}"
2146 );
2147
2148 let mut round = 0u64;
2149 while db.inactivity_floor_loc() < prune_loc {
2150 round += 1;
2151 assert!(
2152 round <= 8,
2153 "failed to advance inactivity floor enough for floor-pruned rewind test"
2154 );
2155 commit_writes(
2156 &mut db,
2157 (0..KEYS).map(|i| (key(i), Some(val(10_000 + round * KEYS + i)))),
2158 None,
2159 )
2160 .await;
2161 }
2162
2163 db.prune(prune_loc).await.unwrap();
2164 let bounds = db.bounds().await;
2165 assert!(
2166 bounds.start > *target_floor,
2167 "test setup expected pruned start beyond target floor; bounds={bounds:?}, target_floor={target_floor:?}"
2168 );
2169 assert!(
2170 rewind_target > bounds.start,
2171 "test setup expected target commit retained; target={rewind_target:?}, bounds={bounds:?}"
2172 );
2173
2174 let err = db.rewind(rewind_target).await.unwrap_err();
2175 assert!(
2176 matches!(
2177 err,
2178 crate::qmdb::Error::Journal(crate::journal::Error::ItemPruned(_))
2179 ),
2180 "unexpected rewind error: {err:?}"
2181 );
2182
2183 db.destroy().await.unwrap();
2184 });
2185 }
2186
/// `Db` instantiated for the MMB merkle family with unordered operations whose values use
/// `VariableEncoding<Digest>`, stored in a contiguous variable journal, indexed by the
/// unordered index with the `OneCap` translator, and hashed with `Sha256`.
///
/// NOTE(review): this looks structurally identical to `MmbUnorderedVariable` from
/// `mmb_types` — confirm the aliases used there resolve to the same paths and consider
/// reusing that alias.
type MmbVariable = super::db::Db<
    crate::merkle::mmb::Family,
    Context,
    crate::journal::contiguous::variable::Journal<
        Context,
        super::operation::Operation<
            crate::merkle::mmb::Family,
            super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
        >,
    >,
    crate::index::unordered::Index<OneCap, crate::merkle::Location<crate::merkle::mmb::Family>>,
    Sha256,
    super::operation::update::Unordered<Digest, super::value::VariableEncoding<Digest>>,
>;
2207
2208 async fn open_mmb_db(context: Context, suffix: &str) -> MmbVariable {
2209 let cfg = variable_db_config::<OneCap>(suffix, &context);
2210 super::init(context, cfg, None, |_, _| {}).await.unwrap()
2211 }
2212
2213 async fn commit_writes_mmb(
2214 db: &mut MmbVariable,
2215 writes: impl IntoIterator<Item = (Digest, Option<Digest>)>,
2216 metadata: Option<Digest>,
2217 ) {
2218 let mut batch = db.new_batch();
2219 for (k, v) in writes {
2220 batch = batch.write(k, v);
2221 }
2222 let merkleized = batch.merkleize(db, metadata).await.unwrap();
2223 db.apply_batch(merkleized).await.unwrap();
2224 db.commit().await.unwrap();
2225 }
2226
2227 #[test_traced("INFO")]
2228 fn test_mmb_batch_crud() {
2229 let executor = deterministic::Runner::default();
2230 executor.start(|context| async move {
2231 let mut db = open_mmb_db(context.with_label("db"), "crud").await;
2232
2233 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2235 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2236
2237 commit_writes_mmb(&mut db, [(key(0), Some(val(1)))], None).await;
2239 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(1)));
2240
2241 commit_writes_mmb(&mut db, [(key(0), None)], None).await;
2243 assert!(db.get(&key(0)).await.unwrap().is_none());
2244
2245 commit_writes_mmb(
2247 &mut db,
2248 [(key(1), Some(val(1))), (key(2), Some(val(2)))],
2249 None,
2250 )
2251 .await;
2252 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2253 assert_eq!(db.get(&key(2)).await.unwrap(), Some(val(2)));
2254
2255 db.destroy().await.unwrap();
2256 });
2257 }
2258
2259 #[test_traced("INFO")]
2260 fn test_mmb_batch_empty() {
2261 let executor = deterministic::Runner::default();
2262 executor.start(|context| async move {
2263 let mut db = open_mmb_db(context.with_label("db"), "empty").await;
2264 let root_before = db.root();
2265
2266 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2267 db.apply_batch(merkleized).await.unwrap();
2268 assert_ne!(db.root(), root_before);
2269
2270 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], None).await;
2271 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2272
2273 db.destroy().await.unwrap();
2274 });
2275 }
2276
2277 #[test_traced("INFO")]
2278 fn test_mmb_batch_metadata() {
2279 let executor = deterministic::Runner::default();
2280 executor.start(|context| async move {
2281 let mut db = open_mmb_db(context.with_label("db"), "meta").await;
2282
2283 let metadata = val(42);
2284 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(metadata)).await;
2285 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
2286
2287 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
2288 db.apply_batch(merkleized).await.unwrap();
2289 assert_eq!(db.get_metadata().await.unwrap(), None);
2290
2291 db.destroy().await.unwrap();
2292 });
2293 }
2294
2295 #[test_traced("WARN")]
2296 fn test_mmb_recovery() {
2297 let executor = deterministic::Runner::default();
2298 executor.start(|context| async move {
2299 let mut db = open_mmb_db(context.with_label("db0"), "recovery").await;
2300
2301 commit_writes_mmb(&mut db, [(key(0), Some(val(0)))], Some(val(99))).await;
2302 commit_writes_mmb(&mut db, [(key(1), Some(val(1)))], None).await;
2303
2304 let root = db.root();
2305 let bounds = db.bounds().await;
2306 db.sync().await.unwrap();
2307 drop(db);
2308
2309 let db = open_mmb_db(context.with_label("db1"), "recovery").await;
2311 assert_eq!(db.root(), root);
2312 assert_eq!(db.bounds().await, bounds);
2313 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2314 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2315 assert_eq!(db.get_metadata().await.unwrap(), None);
2316
2317 db.destroy().await.unwrap();
2318 });
2319 }
2320
2321 #[test_traced("INFO")]
2322 fn test_mmb_prune() {
2323 let executor = deterministic::Runner::default();
2324 executor.start(|context| async move {
2325 let mut db = open_mmb_db(context.with_label("db"), "prune").await;
2326
2327 for i in 0u64..20 {
2328 commit_writes_mmb(&mut db, [(key(i), Some(val(i)))], None).await;
2329 }
2330
2331 let floor = db.inactivity_floor_loc();
2332 db.prune(floor).await.unwrap();
2333
2334 for i in 0u64..20 {
2336 assert_eq!(db.get(&key(i)).await.unwrap(), Some(val(i)));
2337 }
2338
2339 db.destroy().await.unwrap();
2340 });
2341 }
2342
2343 #[test_traced("INFO")]
2345 fn test_any_batch_single_stage_pipeline() {
2346 let executor = deterministic::Runner::default();
2347 executor.start(|context| async move {
2348 let ctx = context.with_label("db");
2349 let mut db: UnorderedVariable =
2350 UnorderedVariableDb::init(ctx.clone(), variable_db_config::<OneCap>("pipe", &ctx))
2351 .await
2352 .unwrap();
2353
2354 {
2355 let mut batch = db.new_batch();
2356 batch = batch.write(key(0), Some(val(0)));
2357 let merkleized = batch.merkleize(&db, None).await.unwrap();
2358 db.apply_batch(merkleized).await.unwrap();
2359 }
2360
2361 let (child_merkleized, commit_result) = futures::join!(
2362 async {
2363 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2364 let mut child = db.new_batch();
2365 child = child.write(key(1), Some(val(1)));
2366 child.merkleize(&db, None).await.unwrap()
2367 },
2368 db.commit(),
2369 );
2370 commit_result.unwrap();
2371
2372 db.apply_batch(child_merkleized).await.unwrap();
2373 db.commit().await.unwrap();
2374
2375 assert_eq!(db.get(&key(0)).await.unwrap(), Some(val(0)));
2376 assert_eq!(db.get(&key(1)).await.unwrap(), Some(val(1)));
2377
2378 db.destroy().await.unwrap();
2379 });
2380 }
2381}