1use crate::{
9 index::ordered::Index,
10 journal::contiguous::variable::Journal,
11 merkle::{self, Location},
12 qmdb::{
13 any::{ordered, value::VariableEncoding, VariableConfig, VariableValue},
14 operation::Key,
15 Error,
16 },
17 translator::Translator,
18 Context,
19};
20use commonware_codec::{Codec, Read};
21use commonware_cryptography::Hasher;
22
23pub type Update<K, V> = ordered::Update<K, VariableEncoding<V>>;
24pub type Operation<F, K, V> = ordered::Operation<F, K, VariableEncoding<V>>;
25
26pub type Db<F, E, K, V, H, T> =
29 super::Db<F, E, Journal<E, Operation<F, K, V>>, Index<T, Location<F>>, H, Update<K, V>>;
30
31impl<F: merkle::Family, E: Context, K: Key, V: VariableValue, H: Hasher, T: Translator>
32 Db<F, E, K, V, H, T>
33where
34 Operation<F, K, V>: Codec,
35{
36 pub async fn init(
39 context: E,
40 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
41 ) -> Result<Self, Error<F>> {
42 Self::init_with_callback(context, cfg, None, |_, _| {}).await
43 }
44
45 pub(crate) async fn init_with_callback(
52 context: E,
53 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
54 known_inactivity_floor: Option<Location<F>>,
55 callback: impl FnMut(bool, Option<Location<F>>),
56 ) -> Result<Self, Error<F>> {
57 crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
58 }
59}
60
61pub mod partitioned {
67 pub use super::{Operation, Update};
68 use crate::{
69 index::partitioned::ordered::Index,
70 journal::contiguous::variable::Journal,
71 merkle::{self, Location},
72 qmdb::{
73 any::{VariableConfig, VariableValue},
74 operation::Key,
75 Error,
76 },
77 translator::Translator,
78 Context,
79 };
80 use commonware_codec::{Codec, Read};
81 use commonware_cryptography::Hasher;
82
83 pub type Db<F, E, K, V, H, T, const P: usize> = crate::qmdb::any::ordered::Db<
93 F,
94 E,
95 Journal<E, Operation<F, K, V>>,
96 Index<T, Location<F>, P>,
97 H,
98 Update<K, V>,
99 >;
100
101 impl<
102 F: merkle::Family,
103 E: Context,
104 K: Key,
105 V: VariableValue,
106 H: Hasher,
107 T: Translator,
108 const P: usize,
109 > Db<F, E, K, V, H, T, P>
110 where
111 Operation<F, K, V>: Codec,
112 {
113 pub async fn init(
116 context: E,
117 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
118 ) -> Result<Self, Error<F>> {
119 Self::init_with_callback(context, cfg, None, |_, _| {}).await
120 }
121
122 pub(crate) async fn init_with_callback(
129 context: E,
130 cfg: VariableConfig<T, <Operation<F, K, V> as Read>::Cfg>,
131 known_inactivity_floor: Option<Location<F>>,
132 callback: impl FnMut(bool, Option<Location<F>>),
133 ) -> Result<Self, Error<F>> {
134 crate::qmdb::any::init(context, cfg, known_inactivity_floor, callback).await
135 }
136 }
137
138 pub mod p256 {
140 pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 1>;
142 }
143
144 pub mod p64k {
146 pub type Db<F, E, K, V, H, T> = super::Db<F, E, K, V, H, T, 2>;
148 }
149}
150
151#[cfg(test)]
152pub(crate) mod test {
153 use super::*;
154 use crate::{
155 mmr,
156 qmdb::any::{
157 ordered::test::{
158 test_ordered_any_db_basic, test_ordered_any_db_empty,
159 test_ordered_any_update_collision_edge_case,
160 },
161 test::variable_db_config,
162 },
163 translator::TwoCap,
164 };
165 use commonware_cryptography::{sha256::Digest, Sha256};
166 use commonware_macros::test_traced;
167 use commonware_math::algebra::Random;
168 use commonware_runtime::{
169 buffer::paged::CacheRef,
170 deterministic::{self, Context},
171 BufferPooler, Metrics, Runner as _,
172 };
173 use commonware_utils::{sequence::FixedBytes, test_rng_seeded, NZUsize, NZU16, NZU64};
174 use rand::RngCore;
175 const PAGE_SIZE: u16 = 103;
177 const PAGE_CACHE_SIZE: usize = 13;
178
179 pub(crate) type VarConfig =
180 VariableConfig<TwoCap, ((), (commonware_codec::RangeCfg<usize>, ()))>;
181
182 pub(crate) type AnyTest =
184 Db<mmr::Family, deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap>;
185
186 pub(crate) fn create_test_config(seed: u64, pooler: &impl BufferPooler) -> VarConfig {
187 let page_cache =
188 CacheRef::from_pooler(pooler, NZU16!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE));
189 VariableConfig {
190 merkle_config: crate::mmr::journaled::Config {
191 journal_partition: format!("mmr-journal-{seed}"),
192 metadata_partition: format!("mmr-metadata-{seed}"),
193 items_per_blob: NZU64!(12), write_buffer: NZUsize!(64),
195 thread_pool: None,
196 page_cache: page_cache.clone(),
197 },
198 journal_config: crate::journal::contiguous::variable::Config {
199 partition: format!("log-journal-{seed}"),
200 items_per_section: NZU64!(14), write_buffer: NZUsize!(64),
202 compression: None,
203 codec_config: ((), ((0..=10000).into(), ())),
204 page_cache,
205 },
206 translator: TwoCap,
207 }
208 }
209
210 pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
212 let seed = context.next_u64();
213 let config = create_test_config(seed, &context);
214 AnyTest::init(context, config).await.unwrap()
215 }
216
/// Deterministically expands `i` into a byte vector for use as a variable-length value.
///
/// The result has `i % 13 + 7` bytes (between 7 and 19), each equal to `i % 255`.
fn to_bytes(i: u64) -> Vec<u8> {
    // Both casts are lossless: the fill is below 255 and the count is below 20.
    let fill = (i % 255) as u8;
    let count = ((i % 13) + 7) as usize;
    std::iter::repeat(fill).take(count).collect()
}
222
223 pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
227 create_test_ops_seeded(n, 0)
228 }
229
230 pub(crate) fn create_test_ops_seeded(
233 n: usize,
234 seed: u64,
235 ) -> Vec<Operation<mmr::Family, Digest, Vec<u8>>> {
236 let mut rng = test_rng_seeded(seed);
237 let mut prev_key = Digest::random(&mut rng);
238 let mut ops = Vec::new();
239 for i in 0..n {
240 if i % 10 == 0 && i > 0 {
241 ops.push(Operation::Delete(prev_key));
242 } else {
243 let key = Digest::random(&mut rng);
244 let next_key = Digest::random(&mut rng);
245 let value = to_bytes(rng.next_u64());
246 ops.push(Operation::Update(ordered::Update {
247 key,
248 value,
249 next_key,
250 }));
251 prev_key = key;
252 }
253 }
254 ops
255 }
256
257 pub(crate) async fn apply_ops(
259 db: &mut AnyTest,
260 ops: Vec<Operation<mmr::Family, Digest, Vec<u8>>>,
261 ) {
262 let mut batch = db.new_batch();
263 for op in ops {
264 match op {
265 Operation::Update(data) => {
266 batch = batch.write(data.key, Some(data.value));
267 }
268 Operation::Delete(key) => {
269 batch = batch.write(key, None);
270 }
271 Operation::CommitFloor(_, _) => {
272 panic!("CommitFloor not supported in apply_ops");
275 }
276 }
277 }
278 let merkleized = batch.merkleize(db, None).await.unwrap();
279 db.apply_batch(merkleized).await.unwrap();
280 }
281
282 type VariableDb = Db<mmr::Family, Context, FixedBytes<4>, Digest, Sha256, TwoCap>;
286
287 async fn open_variable_db(context: Context) -> VariableDb {
289 let cfg = variable_db_config("fixed-bytes-var-partition", &context);
290 VariableDb::init(context, cfg).await.unwrap()
291 }
292
293 #[test_traced("WARN")]
294 fn test_ordered_any_variable_db_empty() {
295 let executor = deterministic::Runner::default();
296 executor.start(|context| async move {
297 let db = open_variable_db(context.with_label("initial")).await;
298 test_ordered_any_db_empty(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
299 });
300 }
301
302 #[test_traced("WARN")]
303 fn test_ordered_any_variable_db_basic() {
304 let executor = deterministic::Runner::default();
305 executor.start(|context| async move {
306 let db = open_variable_db(context.with_label("initial")).await;
307 test_ordered_any_db_basic(context, db, |ctx| Box::pin(open_variable_db(ctx))).await;
308 });
309 }
310
311 #[test_traced("WARN")]
312 fn test_ordered_any_update_collision_edge_case_variable() {
313 let executor = deterministic::Runner::default();
314 executor.start(|context| async move {
315 let db = open_variable_db(context.clone()).await;
316 test_ordered_any_update_collision_edge_case(db).await;
317 });
318 }
319
320 #[test_traced("WARN")]
323 fn test_ordered_any_update_batch_create_between_collisions() {
324 let executor = deterministic::Runner::default();
325 executor.start(|context| async move {
326 let mut db = open_variable_db(context.clone()).await;
327
328 let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
331 let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
332 let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 7u8, 0u8]);
333 let val = Sha256::fill(1u8);
334
335 let merkleized = db
336 .new_batch()
337 .write(key1.clone(), Some(val))
338 .write(key3.clone(), Some(val))
339 .merkleize(&db, None)
340 .await
341 .unwrap();
342 db.apply_batch(merkleized).await.unwrap();
343
344 assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
345 assert!(db.get(&key2).await.unwrap().is_none());
346 assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);
347
348 let merkleized = db
350 .new_batch()
351 .write(key2.clone(), Some(val))
352 .merkleize(&db, None)
353 .await
354 .unwrap();
355 db.apply_batch(merkleized).await.unwrap();
356
357 assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
358 assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
359 assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);
360
361 let span1 = db.get_span(&key1).await.unwrap().unwrap();
362 assert_eq!(span1.1.next_key, key2);
363 let span2 = db.get_span(&key2).await.unwrap().unwrap();
364 assert_eq!(span2.1.next_key, key3);
365 let span3 = db.get_span(&key3).await.unwrap().unwrap();
366 assert_eq!(span3.1.next_key, key1);
367
368 db.destroy().await.unwrap();
369 });
370 }
371
372 #[test_traced("WARN")]
375 fn test_ordered_any_batch_create_delete_prev_links() {
376 let executor = deterministic::Runner::default();
377 executor.start(|context| async move {
378 let key1 = FixedBytes::from([0x10u8, 0x00, 0x00, 0x00]);
379 let key2 = FixedBytes::from([0x20u8, 0x00, 0x00, 0x00]);
380 let key3 = FixedBytes::from([0x30u8, 0x00, 0x00, 0x00]);
381 let val1 = Sha256::fill(1u8);
382 let val2 = Sha256::fill(2u8);
383 let val3 = Sha256::fill(3u8);
384
385 let mut db = open_variable_db(context.with_label("first")).await;
387 let merkleized = db
388 .new_batch()
389 .write(key1.clone(), Some(val1))
390 .write(key3.clone(), Some(val3))
391 .merkleize(&db, None)
392 .await
393 .unwrap();
394 db.apply_batch(merkleized).await.unwrap();
395
396 let merkleized = db
397 .new_batch()
398 .write(key1.clone(), None)
399 .write(key2.clone(), Some(val2))
400 .merkleize(&db, None)
401 .await
402 .unwrap();
403 db.apply_batch(merkleized).await.unwrap();
404
405 assert!(db.get(&key1).await.unwrap().is_none());
406 assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
407 assert_eq!(db.get(&key3).await.unwrap(), Some(val3));
408 let span2 = db.get_span(&key2).await.unwrap().unwrap();
409 assert_eq!(span2.1.next_key, key3);
410 let span3 = db.get_span(&key3).await.unwrap().unwrap();
411 assert_eq!(span3.1.next_key, key2);
412 db.destroy().await.unwrap();
413
414 let mut db = open_variable_db(context.with_label("second")).await;
416 let merkleized = db
417 .new_batch()
418 .write(key1.clone(), Some(val1))
419 .write(key3.clone(), Some(val3))
420 .merkleize(&db, None)
421 .await
422 .unwrap();
423 db.apply_batch(merkleized).await.unwrap();
424
425 let merkleized = db
426 .new_batch()
427 .write(key2.clone(), Some(val2))
428 .write(key3.clone(), None)
429 .merkleize(&db, None)
430 .await
431 .unwrap();
432 db.apply_batch(merkleized).await.unwrap();
433
434 assert_eq!(db.get(&key1).await.unwrap(), Some(val1));
435 assert_eq!(db.get(&key2).await.unwrap(), Some(val2));
436 assert!(db.get(&key3).await.unwrap().is_none());
437 let span1 = db.get_span(&key1).await.unwrap().unwrap();
438 assert_eq!(span1.1.next_key, key2);
439 let span2 = db.get_span(&key2).await.unwrap().unwrap();
440 assert_eq!(span2.1.next_key, key1);
441 db.destroy().await.unwrap();
442 });
443 }
444
445 fn is_send<T: Send>(_: T) {}
446
447 #[allow(dead_code)]
448 fn assert_non_trait_futures_are_send(db: &mut AnyTest, key: Digest) {
449 is_send(db.get_all(&key));
450 is_send(db.get_with_loc(&key));
451 is_send(db.get_span(&key));
452 }
453
454 #[test_traced("WARN")]
458 fn test_ordered_sequential_commit_basic() {
459 let executor = deterministic::Runner::default();
460 executor.start(|context| async move {
461 let mut db = create_test_db(context).await;
462
463 apply_ops(&mut db, create_test_ops(10)).await;
465 db.commit().await.unwrap();
466
467 let base = db.to_batch();
468
469 let key_a = Digest::random(&mut test_rng_seeded(800));
471 let val_a = vec![1u8; 10];
472 let parent_batch = base
473 .new_batch::<Sha256>()
474 .write(key_a, Some(val_a.clone()))
475 .merkleize(&db, None)
476 .await
477 .unwrap();
478
479 let key_b = Digest::random(&mut test_rng_seeded(801));
481 let val_b = vec![2u8; 10];
482 let child_batch = parent_batch
483 .new_batch::<Sha256>()
484 .write(key_b, Some(val_b.clone()))
485 .merkleize(&db, None)
486 .await
487 .unwrap();
488
489 db.apply_batch(parent_batch).await.unwrap();
490 db.commit().await.unwrap();
491
492 db.apply_batch(child_batch).await.unwrap();
494 db.commit().await.unwrap();
495
496 assert_eq!(db.get(&key_a).await.unwrap().unwrap(), val_a);
498 assert_eq!(db.get(&key_b).await.unwrap().unwrap(), val_b);
499
500 db.destroy().await.unwrap();
501 });
502 }
503
504 #[test_traced("WARN")]
508 fn test_ordered_sequential_commit_delete_after_insert() {
509 let executor = deterministic::Runner::default();
510 executor.start(|context| async move {
511 let mut db = create_test_db(context).await;
512
513 apply_ops(&mut db, create_test_ops(5)).await;
514 db.commit().await.unwrap();
515
516 let base = db.to_batch();
517
518 let key_x = Digest::random(&mut test_rng_seeded(810));
519 let val_x = vec![10u8; 8];
520 let parent_batch = base
521 .new_batch::<Sha256>()
522 .write(key_x, Some(val_x.clone()))
523 .merkleize(&db, None)
524 .await
525 .unwrap();
526
527 let child_batch = parent_batch
528 .new_batch::<Sha256>()
529 .write(key_x, None)
530 .merkleize(&db, None)
531 .await
532 .unwrap();
533
534 db.apply_batch(parent_batch).await.unwrap();
535 db.commit().await.unwrap();
536 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_x);
537
538 db.apply_batch(child_batch).await.unwrap();
540 db.commit().await.unwrap();
541
542 assert!(db.get(&key_x).await.unwrap().is_none());
544
545 db.destroy().await.unwrap();
546 });
547 }
548
549 #[test_traced("WARN")]
552 fn test_ordered_sequential_commit_overlapping_keys() {
553 let executor = deterministic::Runner::default();
554 executor.start(|context| async move {
555 let mut db = create_test_db(context).await;
556
557 apply_ops(&mut db, create_test_ops(5)).await;
558 db.commit().await.unwrap();
559
560 let base = db.to_batch();
561
562 let key_x = Digest::random(&mut test_rng_seeded(820));
563 let val_a = vec![10u8; 8];
564 let parent_batch = base
565 .new_batch::<Sha256>()
566 .write(key_x, Some(val_a.clone()))
567 .merkleize(&db, None)
568 .await
569 .unwrap();
570
571 let val_b = vec![20u8; 8];
572 let child_batch = parent_batch
573 .new_batch::<Sha256>()
574 .write(key_x, Some(val_b.clone()))
575 .merkleize(&db, None)
576 .await
577 .unwrap();
578
579 db.apply_batch(parent_batch).await.unwrap();
580 db.commit().await.unwrap();
581 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_a);
582
583 db.apply_batch(child_batch).await.unwrap();
585 db.commit().await.unwrap();
586
587 assert_eq!(db.get(&key_x).await.unwrap().unwrap(), val_b);
588
589 db.destroy().await.unwrap();
590 });
591 }
592
593 mod from_sync_testable {
595 use super::*;
596 use crate::{
597 merkle::{
598 mmr::{self, journaled::Mmr},
599 Family as _,
600 },
601 qmdb::any::sync::tests::FromSyncTestable,
602 };
603 use futures::future::join_all;
604
605 type TestMmr = Mmr<deterministic::Context, Digest>;
606
607 impl FromSyncTestable for AnyTest {
608 type Mmr = TestMmr;
609
610 fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
611 (self.log.merkle, self.log.journal)
612 }
613
614 async fn pinned_nodes_at(&self, loc: mmr::Location) -> Vec<Digest> {
615 join_all(mmr::Family::nodes_to_pin(loc).map(|p| self.log.merkle.get_node(p)))
616 .await
617 .into_iter()
618 .map(|n| n.unwrap().unwrap())
619 .collect()
620 }
621 }
622 }
623}