1use crate::{
8 qmdb::any::{FixedConfig as AnyFixedConfig, VariableConfig as AnyVariableConfig},
9 translator::Translator,
10};
11use commonware_parallel::ThreadPool;
12use commonware_runtime::buffer::paged::CacheRef;
13use std::num::{NonZeroU64, NonZeroUsize};
14
15pub mod db;
16pub mod ordered;
17pub mod proof;
18pub mod unordered;
19
/// Configuration for a fixed-size-value store built on the `any` QMDB.
///
/// NOTE(review): field docs reconstructed from field names and the
/// corresponding `AnyFixedConfig` conversion — confirm against the
/// authoritative config documentation.
#[derive(Clone)]
pub struct FixedConfig<T: Translator> {
    /// Partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// Maximum number of items stored per blob in the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// Size of the write buffer used for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// Partition backing the operation log's journal.
    pub log_journal_partition: String,

    /// Maximum number of items stored per blob in the log journal.
    pub log_items_per_blob: NonZeroU64,

    /// Size of the write buffer used for the log journal.
    pub log_write_buffer: NonZeroUsize,

    /// Partition backing the activity bitmap's metadata.
    /// (Not forwarded to `AnyFixedConfig`; consumed by this layer.)
    pub bitmap_metadata_partition: String,

    /// Translator used to map keys to their internal representation.
    pub translator: T,

    /// Optional thread pool for parallelizing hashing/merkleization work.
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache for buffered blob reads.
    pub page_cache: CacheRef,
}
56
57impl<T: Translator> From<FixedConfig<T>> for AnyFixedConfig<T> {
58 fn from(cfg: FixedConfig<T>) -> Self {
59 Self {
60 mmr_journal_partition: cfg.mmr_journal_partition,
61 mmr_metadata_partition: cfg.mmr_metadata_partition,
62 mmr_items_per_blob: cfg.mmr_items_per_blob,
63 mmr_write_buffer: cfg.mmr_write_buffer,
64 log_journal_partition: cfg.log_journal_partition,
65 log_items_per_blob: cfg.log_items_per_blob,
66 log_write_buffer: cfg.log_write_buffer,
67 translator: cfg.translator,
68 thread_pool: cfg.thread_pool,
69 page_cache: cfg.page_cache,
70 }
71 }
72}
73
/// Configuration for a variable-size-value store built on the `any` QMDB.
///
/// NOTE(review): field docs reconstructed from field names and the
/// corresponding `AnyVariableConfig` conversion — confirm against the
/// authoritative config documentation.
#[derive(Clone)]
pub struct VariableConfig<T: Translator, C> {
    /// Partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// Maximum number of items stored per blob in the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// Size of the write buffer used for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// Partition backing the operation log.
    pub log_partition: String,

    /// Size of the write buffer used for the log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level applied to log entries.
    pub log_compression: Option<u8>,

    /// Codec configuration used to encode/decode log values.
    pub log_codec_config: C,

    /// Maximum number of items stored per blob in the log.
    pub log_items_per_blob: NonZeroU64,

    /// Partition backing the activity bitmap's metadata.
    /// (Not forwarded to `AnyVariableConfig`; consumed by this layer.)
    pub bitmap_metadata_partition: String,

    /// Translator used to map keys to their internal representation.
    pub translator: T,

    /// Optional thread pool for parallelizing hashing/merkleization work.
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache for buffered blob reads.
    pub page_cache: CacheRef,
}
115
116impl<T: Translator, C> From<VariableConfig<T, C>> for AnyVariableConfig<T, C> {
117 fn from(cfg: VariableConfig<T, C>) -> Self {
118 Self {
119 mmr_journal_partition: cfg.mmr_journal_partition,
120 mmr_metadata_partition: cfg.mmr_metadata_partition,
121 mmr_items_per_blob: cfg.mmr_items_per_blob,
122 mmr_write_buffer: cfg.mmr_write_buffer,
123 log_items_per_blob: cfg.log_items_per_blob,
124 log_partition: cfg.log_partition,
125 log_write_buffer: cfg.log_write_buffer,
126 log_compression: cfg.log_compression,
127 log_codec_config: cfg.log_codec_config,
128 translator: cfg.translator,
129 thread_pool: cfg.thread_pool,
130 page_cache: cfg.page_cache,
131 }
132 }
133}
134
/// Test-only access to an implementation's activity-bitmap pruning state.
///
/// Used by the shared tests below to verify that pruning boundaries survive
/// `sync()` and a close/reopen cycle.
#[cfg(any(test, feature = "test-traits"))]
pub trait BitmapPrunedBits {
    /// Returns the number of bits that have been pruned from the bitmap.
    fn pruned_bits(&self) -> u64;

    /// Returns the value of the bit at `index`.
    fn get_bit(&self, index: u64) -> bool;

    /// Returns the index of the oldest bit still retained (i.e. the pruning
    /// boundary).
    fn oldest_retained(&self) -> u64;
}
147
148#[cfg(test)]
149pub mod tests {
150 pub use super::BitmapPrunedBits;
153 use crate::{
154 kv::{Deletable as _, Updatable as _},
155 qmdb::{
156 any::states::{CleanAny, MutableAny as _, UnmerkleizedDurableAny as _},
157 store::{
158 batch_tests::{TestKey, TestValue},
159 LogStore,
160 },
161 Error,
162 },
163 };
164 use commonware_runtime::{
165 deterministic::{self, Context},
166 Metrics as _, Runner as _,
167 };
168 use core::future::Future;
169 use rand::{rngs::StdRng, RngCore, SeedableRng};
170 use tracing::warn;
171
172 pub async fn apply_random_ops<C>(
175 num_elements: u64,
176 commit_changes: bool,
177 rng_seed: u64,
178 mut db: C::Mutable,
179 ) -> Result<C::Mutable, Error>
180 where
181 C: CleanAny,
182 C::Key: TestKey,
183 <C as LogStore>::Value: TestValue,
184 {
185 warn!("rng_seed={}", rng_seed);
187 let mut rng = StdRng::seed_from_u64(rng_seed);
188
189 for i in 0u64..num_elements {
190 let k = TestKey::from_seed(i);
191 let v = TestValue::from_seed(rng.next_u64());
192 db.update(k, v).await.unwrap();
193 }
194
195 for _ in 0u64..num_elements * 10 {
198 let rand_key = TestKey::from_seed(rng.next_u64() % num_elements);
199 if rng.next_u32() % 7 == 0 {
200 db.delete(rand_key).await.unwrap();
201 continue;
202 }
203 let v = TestValue::from_seed(rng.next_u64());
204 db.update(rand_key, v).await.unwrap();
205 if commit_changes && rng.next_u32() % 20 == 0 {
206 let (durable_db, _) = db.commit(None).await?;
208 let clean_db: C = durable_db.into_merkleized().await?;
209 db = clean_db.into_mutable();
210 }
211 }
212 if commit_changes {
213 let (durable_db, _) = db.commit(None).await?;
214 let clean_db: C = durable_db.into_merkleized().await?;
215 db = clean_db.into_mutable();
216 }
217 Ok(db)
218 }
219
    /// Builds a database from a random workload, commits and syncs it, then
    /// closes (drops) and reopens it, asserting the reopened database carries
    /// the same root. The whole scenario is executed twice under the
    /// deterministic runtime and the two auditor states are compared, so the
    /// entire operation trace must replay identically.
    pub fn test_build_random_close_reopen<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        let state1 = executor.start(|mut context| async move {
            let partition = "build_random".to_string();
            // The seed is drawn from the deterministic context, so both runs
            // observe the same value.
            let rng_seed = context.next_u64();
            let db: C = open_db_clone(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();

            // Capture the root, close by dropping, then reopen the partition.
            let root = db.root();
            drop(db);
            let db: C = open_db_clone(context.with_label("second"), partition).await;

            // The reopened database must report the same root.
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        // Second, identical run: the deterministic executor must reproduce the
        // exact same auditor state as the first.
        let executor = deterministic::Runner::default();
        let state2 = executor.start(|mut context| async move {
            let partition = "build_random".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();

            let root = db.root();
            drop(db);
            let db: C = open_db(context.with_label("second"), partition).await;
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
            context.auditor().state()
        });

        assert_eq!(state1, state2);
    }
283
    /// Exercises recovery behavior when work is lost around commit boundaries.
    ///
    /// NOTE(review): scenario intent reconstructed from the code — confirm
    /// against the original commentary. Scenario 1 drops the db with
    /// uncommitted operations and expects reopen to recover the last committed
    /// state. Scenario 2 commits but drops the db before merkleizing/syncing,
    /// then expects the reopened db to match a fresh db that replayed the same
    /// history.
    pub fn test_simulate_write_failures<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let partition = "build_random_fail_commit".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db(context.with_label("first"), partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            // Snapshot the committed state so recovery can be verified.
            let committed_root = db.root();
            let committed_op_count = db.bounds().end;
            let committed_inactivity_floor = db.inactivity_floor_loc();
            db.prune(committed_inactivity_floor).await.unwrap();

            // Apply further operations WITHOUT committing them.
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();

            // Scenario 1: drop with uncommitted operations — the reopened db
            // must be back at the last committed state.
            drop(db);
            let db: C = open_db(context.with_label("scenario1"), partition.clone()).await;
            assert_eq!(db.root(), committed_root);
            assert_eq!(db.bounds().end, committed_op_count);

            // Re-apply the same uncommitted operations (same seed).
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();

            // Scenario 2: commit, then drop before merkleizing — presumably
            // simulating a failure between commit and merkleization.
            let (durable_db, _) = db.commit(None).await.unwrap();
            let committed_op_count = durable_db.bounds().end;
            drop(durable_db);

            // The reopened db must still reflect the committed operations.
            let db: C = open_db(context.with_label("scenario2"), partition.clone()).await;
            let scenario_2_root = db.root();

            // Rebuild the same history in a fresh partition; it must converge
            // to the same op count and root as the recovered database.
            let fresh_partition = "build_random_fail_commit_fresh".to_string();
            let db: C = open_db(context.with_label("fresh"), fresh_partition.clone()).await;
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let db = apply_random_ops::<C>(ELEMENTS, false, rng_seed + 1, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.bounds().end, committed_op_count);
            assert_eq!(db.root(), scenario_2_root);

            db.destroy().await.unwrap();
        });
    }
364
365 pub fn test_different_pruning_delays_same_root<C, F, Fut>(mut open_db: F)
370 where
371 C: CleanAny,
372 C::Key: TestKey,
373 <C as LogStore>::Value: TestValue,
374 F: FnMut(Context, String) -> Fut + Clone,
375 Fut: Future<Output = C>,
376 {
377 const NUM_OPERATIONS: u64 = 1000;
378
379 let executor = deterministic::Runner::default();
380 let mut open_db_clone = open_db.clone();
381 executor.start(|context| async move {
382 let mut db_no_pruning: C = open_db_clone(
384 context.with_label("no_pruning"),
385 "no_pruning_test".to_string(),
386 )
387 .await;
388 let mut db_pruning: C =
389 open_db(context.with_label("pruning"), "pruning_test".to_string()).await;
390
391 let mut db_no_pruning_mut = db_no_pruning.into_mutable();
392 let mut db_pruning_mut = db_pruning.into_mutable();
393
394 for i in 0..NUM_OPERATIONS {
396 let key: C::Key = TestKey::from_seed(i);
397 let value: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
398
399 db_no_pruning_mut.update(key, value.clone()).await.unwrap();
400 db_pruning_mut.update(key, value).await.unwrap();
401
402 if i % 50 == 49 {
404 let (db_1, _) = db_no_pruning_mut.commit(None).await.unwrap();
405 let clean_no_pruning: C = db_1.into_merkleized().await.unwrap();
406 let (db_2, _) = db_pruning_mut.commit(None).await.unwrap();
407 let mut clean_pruning: C = db_2.into_merkleized().await.unwrap();
408 clean_pruning
409 .prune(clean_no_pruning.inactivity_floor_loc())
410 .await
411 .unwrap();
412 db_no_pruning_mut = clean_no_pruning.into_mutable();
413 db_pruning_mut = clean_pruning.into_mutable();
414 }
415 }
416
417 let (db_1, _) = db_no_pruning_mut.commit(None).await.unwrap();
419 db_no_pruning = db_1.into_merkleized().await.unwrap();
420 let (db_2, _) = db_pruning_mut.commit(None).await.unwrap();
421 db_pruning = db_2.into_merkleized().await.unwrap();
422
423 let root_no_pruning = db_no_pruning.root();
425 let root_pruning = db_pruning.root();
426 assert_eq!(root_no_pruning, root_pruning);
427
428 assert_eq!(
430 db_no_pruning.inactivity_floor_loc(),
431 db_pruning.inactivity_floor_loc()
432 );
433
434 db_no_pruning.destroy().await.unwrap();
435 db_pruning.destroy().await.unwrap();
436 });
437 }
438
    /// Verifies that `sync()` persists the bitmap's pruning boundary: after
    /// merkleization has pruned bitmap bits, closing and reopening the
    /// database must report the same pruned-bit count and root.
    pub fn test_sync_persists_bitmap_pruning_boundary<C, F, Fut>(mut open_db: F)
    where
        C: CleanAny + BitmapPrunedBits,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        const ELEMENTS: u64 = 500;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|mut context| async move {
            let partition = "sync_bitmap_pruning".to_string();
            let rng_seed = context.next_u64();
            let db: C = open_db_clone(context.with_label("first"), partition.clone()).await;

            // Build up committed state; merkleization advances the inactivity
            // floor and prunes bitmap bits below it.
            let db = apply_random_ops::<C>(ELEMENTS, true, rng_seed, db.into_mutable())
                .await
                .unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();

            // Record the pruning state we expect to survive the reopen.
            let pruned_bits_before = db.pruned_bits();
            warn!(
                "pruned_bits_before={}, inactivity_floor={}, op_count={}",
                pruned_bits_before,
                *db.inactivity_floor_loc(),
                *db.bounds().end
            );

            // Sanity check: the test is vacuous if nothing was pruned.
            assert!(
                pruned_bits_before > 0,
                "Expected bitmap to have pruned bits after merkleization"
            );

            // Persist state (this is the call under test).
            db.sync().await.unwrap();

            let root_before = db.root();
            drop(db);

            let db: C = open_db(context.with_label("second"), partition).await;

            let pruned_bits_after = db.pruned_bits();
            warn!("pruned_bits_after={}", pruned_bits_after);

            // The pruning boundary must have been made durable by sync().
            assert_eq!(
                pruned_bits_after, pruned_bits_before,
                "Bitmap pruned bits mismatch after reopen - sync() may not have called write_pruned()"
            );

            assert_eq!(db.root(), root_before);

            db.destroy().await.unwrap();
        });
    }
510
    /// Builds a larger database (initial inserts, re-updates of every 3rd
    /// key, deletes of keys where `i % 7 == 1`), commits, prunes, and checks
    /// the resulting op count, inactivity floor, and root — both before and
    /// after a close/reopen. Every key's value is also verified against a
    /// shadow `HashMap`.
    ///
    /// `expected_op_count` / `expected_inactivity_floor` are the caller's
    /// known-good values for this fixed workload.
    pub fn test_current_db_build_big<C, F, Fut>(
        mut open_db: F,
        expected_op_count: u64,
        expected_inactivity_floor: u64,
    ) where
        C: CleanAny,
        C::Key: TestKey,
        <C as LogStore>::Value: TestValue,
        F: FnMut(Context, String) -> Fut + Clone,
        Fut: Future<Output = C>,
    {
        use crate::mmr::Location;

        const ELEMENTS: u64 = 1000;

        let executor = deterministic::Runner::default();
        let mut open_db_clone = open_db.clone();
        executor.start(|context| async move {
            let mut db = open_db_clone(context.with_label("first"), "build_big".to_string())
                .await
                .into_mutable();

            // Shadow map mirroring the expected final key/value state.
            let mut map = std::collections::HashMap::<C::Key, <C as LogStore>::Value>::default();
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as LogStore>::Value = TestValue::from_seed(i * 1000);
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Overwrite every 3rd key with a new value.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                let v: <C as LogStore>::Value = TestValue::from_seed((i + 1) * 10000);
                db.update(k, v.clone()).await.unwrap();
                map.insert(k, v);
            }

            // Delete keys where i % 7 == 1.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k: C::Key = TestKey::from_seed(i);
                db.delete(k).await.unwrap();
                map.remove(&k);
            }

            // Commit, merkleize, sync, and prune to the inactivity floor.
            let (db, _) = db.commit(None).await.unwrap();
            let mut db: C = db.into_merkleized().await.unwrap();
            db.sync().await.unwrap();
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            // The workload is fixed, so these counts are exact.
            assert_eq!(db.bounds().end, Location::new_unchecked(expected_op_count));
            assert_eq!(
                db.inactivity_floor_loc(),
                Location::new_unchecked(expected_inactivity_floor)
            );

            let root = db.root();
            db.sync().await.unwrap();
            drop(db);

            // Reopen and confirm the state survived the close.
            let db: C = open_db(context.with_label("second"), "build_big".to_string()).await;
            assert_eq!(root, db.root());
            assert_eq!(db.bounds().end, Location::new_unchecked(expected_op_count));
            assert_eq!(
                db.inactivity_floor_loc(),
                Location::new_unchecked(expected_inactivity_floor)
            );

            // Every key must agree with the shadow map (present or absent).
            for i in 0u64..ELEMENTS {
                let k: C::Key = TestKey::from_seed(i);
                if let Some(map_value) = map.get(&k) {
                    let Some(db_value) = db.get(&k).await.unwrap() else {
                        panic!("key not found in db: {k}");
                    };
                    assert_eq!(*map_value, db_value);
                } else {
                    assert!(db.get(&k).await.unwrap().is_none());
                }
            }
        });
    }
611}