1use crate::{
2 journal::contiguous::variable,
3 mmr::Location,
4 qmdb::{
5 any::VariableValue,
6 immutable::{self, Operation},
7 sync, Error,
8 },
9 translator::Translator,
10};
11use commonware_cryptography::Hasher;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::Array;
14use std::ops::Range;
15
16impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
17where
18 E: Storage + Clock + Metrics,
19 K: Array,
20 V: VariableValue,
21 H: Hasher,
22 T: Translator,
23{
24 type Op = Operation<K, V>;
25 type Journal = variable::Journal<E, Self::Op>;
26 type Hasher = H;
27 type Config = immutable::Config<T, V::Cfg>;
28 type Digest = H::Digest;
29 type Context = E;
30
31 async fn create_journal(
32 context: Self::Context,
33 config: &Self::Config,
34 range: Range<Location>,
35 ) -> Result<Self::Journal, Error> {
36 variable::Journal::init_sync(
38 context.with_label("log"),
39 variable::Config {
40 items_per_section: config.log_items_per_section,
41 partition: config.log_partition.clone(),
42 compression: config.log_compression,
43 codec_config: config.log_codec_config.clone(),
44 buffer_pool: config.buffer_pool.clone(),
45 write_buffer: config.log_write_buffer,
46 },
47 *range.start..*range.end,
48 )
49 .await
50 }
51
52 async fn from_sync_result(
68 context: Self::Context,
69 db_config: Self::Config,
70 journal: Self::Journal,
71 pinned_nodes: Option<Vec<Self::Digest>>,
72 range: Range<Location>,
73 apply_batch_size: usize,
74 ) -> Result<Self, Error> {
75 let sync_config = Config {
76 db_config,
77 log: journal,
78 range,
79 pinned_nodes,
80 apply_batch_size,
81 };
82 Self::init_synced(context, sync_config).await
83 }
84
85 fn root(&self) -> Self::Digest {
86 self.root()
87 }
88
89 async fn resize_journal(
90 mut journal: Self::Journal,
91 context: Self::Context,
92 config: &Self::Config,
93 range: Range<Location>,
94 ) -> Result<Self::Journal, Error> {
95 let size = journal.size();
96
97 if size <= range.start {
98 journal.destroy().await?;
100 Self::create_journal(context, config, range).await
101 } else {
102 journal
104 .prune(*range.start)
105 .await
106 .map_err(crate::qmdb::Error::from)?;
107
108 let size = journal.size();
110 if size > range.end {
111 return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
112 size,
113 )));
114 }
115
116 Ok(journal)
117 }
118 }
119}
120
121pub struct Config<E, K, V, T, D, C>
123where
124 E: Storage + Metrics,
125 K: Array,
126 V: VariableValue,
127 T: Translator,
128 D: commonware_cryptography::Digest,
129{
130 pub db_config: immutable::Config<T, C>,
132
133 pub log: variable::Journal<E, Operation<K, V>>,
136
137 pub range: Range<Location>,
139
140 pub pinned_nodes: Option<Vec<D>>,
145
146 pub apply_batch_size: usize,
150}
151
152#[cfg(test)]
153mod tests {
154 use crate::{
155 mmr::Location,
156 qmdb::{
157 immutable,
158 immutable::Operation,
159 sync::{
160 self,
161 engine::{Config, NextStep},
162 Engine, Target,
163 },
164 },
165 translator::TwoCap,
166 };
167 use commonware_cryptography::{sha256, Sha256};
168 use commonware_macros::test_traced;
169 use commonware_math::algebra::Random;
170 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
171 use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
172 use futures::{channel::mpsc, SinkExt as _};
173 use rand::RngCore as _;
174 use rstest::rstest;
175 use std::{
176 collections::HashMap,
177 num::{NonZeroU16, NonZeroU64, NonZeroUsize},
178 sync::Arc,
179 };
180
181 type ImmutableSyncTest = immutable::Immutable<
183 deterministic::Context,
184 sha256::Digest,
185 sha256::Digest,
186 Sha256,
187 crate::translator::TwoCap,
188 >;
189
190 type ImmutableSyncTestMutable = immutable::Immutable<
192 deterministic::Context,
193 sha256::Digest,
194 sha256::Digest,
195 Sha256,
196 crate::translator::TwoCap,
197 immutable::Unmerkleized,
198 immutable::NonDurable,
199 >;
200
201 fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
203 const PAGE_SIZE: NonZeroU16 = NZU16!(77);
204 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
205 const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
206
207 immutable::Config {
208 mmr_journal_partition: format!("journal_{suffix}"),
209 mmr_metadata_partition: format!("metadata_{suffix}"),
210 mmr_items_per_blob: NZU64!(11),
211 mmr_write_buffer: NZUsize!(1024),
212 log_partition: format!("log_{suffix}"),
213 log_items_per_section: ITEMS_PER_SECTION,
214 log_compression: None,
215 log_codec_config: (),
216 log_write_buffer: NZUsize!(1024),
217 translator: TwoCap,
218 thread_pool: None,
219 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
220 }
221 }
222
223 async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
225 let seed = context.next_u64();
226 let config = create_sync_config(&format!("sync_test_{seed}"));
227 ImmutableSyncTest::init(context, config).await.unwrap()
228 }
229
230 fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
233 create_test_ops_seeded(n, 0)
234 }
235
236 fn create_test_ops_seeded(
239 n: usize,
240 seed: u64,
241 ) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
242 let mut rng = test_rng_seeded(seed);
243 let mut ops = Vec::new();
244 for _i in 0..n {
245 let key = sha256::Digest::random(&mut rng);
246 let value = sha256::Digest::random(&mut rng);
247 ops.push(Operation::Set(key, value));
248 }
249 ops
250 }
251
252 async fn apply_ops(
254 db: &mut ImmutableSyncTestMutable,
255 ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
256 ) {
257 for op in ops {
258 match op {
259 Operation::Set(key, value) => {
260 db.set(key, value).await.unwrap();
261 }
262 Operation::Commit(_metadata) => {
263 panic!("Commit operation not supported in apply_ops");
265 }
266 }
267 }
268 }
269
270 #[rstest]
271 #[case::singleton_batch_size_one(1, NZU64!(1))]
272 #[case::singleton_batch_size_gt_db_size(1, NZU64!(2))]
273 #[case::batch_size_one(1000, NZU64!(1))]
274 #[case::floor_div_db_batch_size(1000, NZU64!(3))]
275 #[case::floor_div_db_batch_size_2(1000, NZU64!(999))]
276 #[case::div_db_batch_size(1000, NZU64!(100))]
277 #[case::db_size_eq_batch_size(1000, NZU64!(1000))]
278 #[case::batch_size_gt_db_size(1000, NZU64!(1001))]
279 fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) {
280 let executor = deterministic::Runner::default();
281 executor.start(|mut context| async move {
282 let mut target_db = create_test_db(context.clone()).await.into_mutable();
283 let target_db_ops = create_test_ops(target_db_ops);
284 apply_ops(&mut target_db, target_db_ops.clone()).await;
285 let metadata = Some(Sha256::fill(1));
286 let (durable_db, _) = target_db.commit(metadata).await.unwrap();
287 let target_db = durable_db.into_merkleized();
288 let target_op_count = target_db.op_count();
289 let target_oldest_retained_loc = target_db.oldest_retained_loc();
290 let target_root = target_db.root();
291
292 let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
294 for op in &target_db_ops {
295 if let Operation::Set(key, value) = op {
296 expected_kvs.insert(*key, *value);
297 }
298 }
299
300 let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
301
302 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
303 let config = Config {
304 db_config: db_config.clone(),
305 fetch_batch_size,
306 target: Target {
307 root: target_root,
308 range: target_oldest_retained_loc..target_op_count,
309 },
310 context: context.clone(),
311 resolver: target_db.clone(),
312 apply_batch_size: 1024,
313 max_outstanding_requests: 1,
314 update_rx: None,
315 };
316 let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
317
318 assert_eq!(got_db.op_count(), target_op_count);
320 assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
321
322 assert_eq!(got_db.root(), target_root);
324
325 for (key, expected_value) in &expected_kvs {
327 let synced_value = got_db.get(key).await.unwrap();
328 assert_eq!(synced_value, Some(*expected_value));
329 }
330
331 let mut new_ops = Vec::new();
333 let mut rng = test_rng_seeded(1);
334 let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
335 for _i in 0..expected_kvs.len() {
336 let key = sha256::Digest::random(&mut rng);
337 let value = sha256::Digest::random(&mut rng);
338 new_ops.push(Operation::Set(key, value));
339 new_kvs.insert(key, value);
340 }
341
342 let mut got_db = got_db.into_mutable();
344 apply_ops(&mut got_db, new_ops.clone()).await;
345 let mut target_db = Arc::try_unwrap(target_db).map_or_else(
346 |_| panic!("target_db should have no other references"),
347 |rw_lock| rw_lock.into_inner().into_mutable(),
348 );
349 apply_ops(&mut target_db, new_ops.clone()).await;
350
351 for (key, expected_value) in &new_kvs {
353 let synced_value = got_db.get(key).await.unwrap();
354 assert_eq!(synced_value, Some(*expected_value));
355 let target_value = target_db.get(key).await.unwrap();
356 assert_eq!(target_value, Some(*expected_value));
357 }
358
359 let (got_durable, _) = got_db.commit(None).await.unwrap();
360 got_durable.into_merkleized().destroy().await.unwrap();
361 let (target_durable, _) = target_db.commit(None).await.unwrap();
362 target_durable.into_merkleized().destroy().await.unwrap();
363 });
364 }
365
366 #[test_traced("WARN")]
368 fn test_sync_empty_to_nonempty() {
369 let executor = deterministic::Runner::default();
370 executor.start(|mut context| async move {
371 let target_db = create_test_db(context.clone()).await;
373 let target_db = target_db.into_mutable();
374 let (durable_db, _) = target_db.commit(Some(Sha256::fill(1))).await.unwrap(); let target_db = durable_db.into_merkleized();
376
377 let target_op_count = target_db.op_count();
378 let target_oldest_retained_loc = target_db.oldest_retained_loc();
379 let target_root = target_db.root();
380
381 let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
382 let target_db = Arc::new(RwLock::new(target_db));
383 let config = Config {
384 db_config,
385 fetch_batch_size: NZU64!(10),
386 target: Target {
387 root: target_root,
388 range: target_oldest_retained_loc..target_op_count,
389 },
390 context: context.clone(),
391 resolver: target_db.clone(),
392 apply_batch_size: 1024,
393 max_outstanding_requests: 1,
394 update_rx: None,
395 };
396 let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
397
398 assert_eq!(got_db.op_count(), target_op_count);
400 assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
401 assert_eq!(got_db.root(), target_root);
402 assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1)));
403
404 got_db.destroy().await.unwrap();
405 let target_db = Arc::try_unwrap(target_db).map_or_else(
406 |_| panic!("Failed to unwrap Arc - still has references"),
407 |rw_lock| rw_lock.into_inner(),
408 );
409 target_db.destroy().await.unwrap();
410 });
411 }
412
413 #[test_traced("WARN")]
415 fn test_sync_database_persistence() {
416 let executor = deterministic::Runner::default();
417 executor.start(|context| async move {
418 let target_db = create_test_db(context.clone()).await;
420 let mut target_db = target_db.into_mutable();
421 let target_ops = create_test_ops(10);
422 apply_ops(&mut target_db, target_ops.clone()).await;
423 let (durable_db, _) = target_db.commit(Some(Sha256::fill(0))).await.unwrap();
424 let target_db = durable_db.into_merkleized();
425
426 let target_root = target_db.root();
428 let lower_bound = target_db.oldest_retained_loc();
429 let op_count = target_db.op_count();
430
431 let db_config = create_sync_config("persistence_test");
433 let context_clone = context.clone();
434 let target_db = Arc::new(RwLock::new(target_db));
435 let config = Config {
436 db_config: db_config.clone(),
437 fetch_batch_size: NZU64!(5),
438 target: Target {
439 root: target_root,
440 range: lower_bound..op_count,
441 },
442 context,
443 resolver: target_db.clone(),
444 apply_batch_size: 1024,
445 max_outstanding_requests: 1,
446 update_rx: None,
447 };
448 let mut synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
449
450 assert_eq!(synced_db.root(), target_root);
452
453 let expected_root = synced_db.root();
455 let expected_op_count = synced_db.op_count();
456 let expected_oldest_retained_loc = synced_db.oldest_retained_loc();
457
458 synced_db.sync().await.unwrap();
460 drop(synced_db);
461 let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
462 .await
463 .unwrap();
464
465 assert_eq!(reopened_db.root(), expected_root);
467 assert_eq!(reopened_db.op_count(), expected_op_count);
468 assert_eq!(
469 reopened_db.oldest_retained_loc(),
470 expected_oldest_retained_loc
471 );
472
473 for op in &target_ops {
475 if let Operation::Set(key, value) = op {
476 let stored_value = reopened_db.get(key).await.unwrap();
477 assert_eq!(stored_value, Some(*value));
478 }
479 }
480
481 reopened_db.destroy().await.unwrap();
482 let target_db = Arc::try_unwrap(target_db).map_or_else(
483 |_| panic!("Failed to unwrap Arc - still has references"),
484 |rw_lock| rw_lock.into_inner(),
485 );
486 target_db.destroy().await.unwrap();
487 });
488 }
489
490 #[test_traced("WARN")]
492 fn test_target_update_during_sync() {
493 let executor = deterministic::Runner::default();
494 executor.start(|mut context| async move {
495 let target_db = create_test_db(context.clone()).await;
497 let mut target_db = target_db.into_mutable();
498 let initial_ops = create_test_ops(50);
499 apply_ops(&mut target_db, initial_ops.clone()).await;
500 let (durable_db, _) = target_db.commit(None).await.unwrap();
501 let target_db = durable_db.into_merkleized();
502
503 let initial_lower_bound = target_db.oldest_retained_loc();
505 let initial_upper_bound = target_db.op_count();
506 let initial_root = target_db.root();
507
508 let mut target_db = target_db.into_mutable();
511 let additional_ops = create_test_ops_seeded(25, 1);
512 apply_ops(&mut target_db, additional_ops.clone()).await;
513 let (durable_db, _) = target_db.commit(None).await.unwrap();
514 let target_db = durable_db.into_merkleized();
515 let final_upper_bound = target_db.op_count();
516 let final_root = target_db.root();
517
518 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
520
521 let (mut update_sender, update_receiver) = mpsc::channel(1);
523 let client = {
524 let config = Config {
525 context: context.clone(),
526 db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
527 target: Target {
528 root: initial_root,
529 range: initial_lower_bound..initial_upper_bound,
530 },
531 resolver: target_db.clone(),
532 fetch_batch_size: NZU64!(2), max_outstanding_requests: 10,
534 apply_batch_size: 1024,
535 update_rx: Some(update_receiver),
536 };
537 let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
538 loop {
539 client = match client.step().await.unwrap() {
541 NextStep::Continue(new_client) => new_client,
542 NextStep::Complete(_) => panic!("client should not be complete"),
543 };
544 let log_size = client.journal().size();
545 if log_size > initial_lower_bound {
546 break client;
547 }
548 }
549 };
550
551 update_sender
553 .send(Target {
554 root: final_root,
555 range: initial_lower_bound..final_upper_bound,
556 })
557 .await
558 .unwrap();
559
560 let synced_db = client.sync().await.unwrap();
562
563 assert_eq!(synced_db.root(), final_root);
565
566 let target_db = Arc::try_unwrap(target_db).map_or_else(
568 |_| panic!("Failed to unwrap Arc - still has references"),
569 |rw_lock| rw_lock.into_inner(),
570 );
571 {
572 assert_eq!(synced_db.op_count(), target_db.op_count());
573 assert_eq!(
574 synced_db.oldest_retained_loc(),
575 target_db.oldest_retained_loc()
576 );
577 assert_eq!(synced_db.root(), target_db.root());
578 }
579
580 let all_ops = [initial_ops, additional_ops].concat();
582 for op in &all_ops {
583 if let Operation::Set(key, value) = op {
584 let synced_value = synced_db.get(key).await.unwrap();
585 assert_eq!(synced_value, Some(*value));
586 }
587 }
588
589 synced_db.destroy().await.unwrap();
590 target_db.destroy().await.unwrap();
591 });
592 }
593
594 #[test]
596 fn test_sync_invalid_bounds() {
597 let executor = deterministic::Runner::default();
598 executor.start(|mut context| async move {
599 let target_db = create_test_db(context.clone()).await;
600 let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
601 let config = Config {
602 db_config,
603 fetch_batch_size: NZU64!(10),
604 target: Target {
605 root: sha256::Digest::from([1u8; 32]),
606 range: Location::new_unchecked(31)..Location::new_unchecked(31),
607 },
608 context,
609 resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
610 apply_batch_size: 1024,
611 max_outstanding_requests: 1,
612 update_rx: None,
613 };
614 let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
615 match result {
616 Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
617 lower_bound_pos,
618 upper_bound_pos,
619 })) => {
620 assert_eq!(lower_bound_pos, Location::new_unchecked(31));
621 assert_eq!(upper_bound_pos, Location::new_unchecked(31));
622 }
623 _ => panic!("Expected InvalidTarget error"),
624 }
625 });
626 }
627
628 #[test]
631 fn test_sync_subset_of_target_database() {
632 let executor = deterministic::Runner::default();
633 executor.start(|mut context| async move {
634 let target_db = create_test_db(context.clone()).await;
635 let mut target_db = target_db.into_mutable();
636 let target_ops = create_test_ops(30);
637 apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
639 let (durable_db, _) = target_db.commit(None).await.unwrap();
640 let target_db = durable_db.into_merkleized();
641
642 let target_root = target_db.root();
643 let lower_bound = target_db.oldest_retained_loc();
644 let op_count = target_db.op_count();
645
646 let mut target_db = target_db.into_mutable();
648 apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
649 let (durable_db, _) = target_db.commit(None).await.unwrap();
650 let target_db = durable_db.into_merkleized();
651
652 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
653 let config = Config {
654 db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
655 fetch_batch_size: NZU64!(10),
656 target: Target {
657 root: target_root,
658 range: lower_bound..op_count,
659 },
660 context,
661 resolver: target_db.clone(),
662 apply_batch_size: 1024,
663 max_outstanding_requests: 1,
664 update_rx: None,
665 };
666 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
667
668 assert_eq!(synced_db.root(), target_root);
670 assert_eq!(synced_db.op_count(), op_count);
671
672 synced_db.destroy().await.unwrap();
673 let target_db =
674 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
675 let inner = target_db.into_inner();
676 inner.destroy().await.unwrap();
677 });
678 }
679
680 #[test]
683 fn test_sync_use_existing_db_partial_match() {
684 let executor = deterministic::Runner::default();
685 executor.start(|mut context| async move {
686 let original_ops = create_test_ops(50);
687
688 let target_db = create_test_db(context.clone()).await;
690 let mut target_db = target_db.into_mutable();
691 let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
692 let sync_db: ImmutableSyncTest =
693 immutable::Immutable::init(context.clone(), sync_db_config.clone())
694 .await
695 .unwrap();
696 let mut sync_db = sync_db.into_mutable();
697
698 apply_ops(&mut target_db, original_ops.clone()).await;
700 apply_ops(&mut sync_db, original_ops.clone()).await;
701 let (durable_db, _) = target_db.commit(None).await.unwrap();
702 let target_db = durable_db.into_merkleized();
703 let (durable_db, _) = sync_db.commit(None).await.unwrap();
704 let sync_db = durable_db.into_merkleized();
705
706 drop(sync_db);
707
708 let mut target_db = target_db.into_mutable();
711 let last_op = create_test_ops_seeded(1, 1);
712 apply_ops(&mut target_db, last_op.clone()).await;
713 let (durable_db, _) = target_db.commit(None).await.unwrap();
714 let target_db = durable_db.into_merkleized();
715 let root = target_db.root();
716 let lower_bound = target_db.oldest_retained_loc();
717 let upper_bound = target_db.op_count(); let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
721 let config = Config {
722 db_config: sync_db_config, fetch_batch_size: NZU64!(10),
724 target: Target {
725 root,
726 range: lower_bound..upper_bound,
727 },
728 context: context.clone(),
729 resolver: target_db.clone(),
730 apply_batch_size: 1024,
731 max_outstanding_requests: 1,
732 update_rx: None,
733 };
734 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
735
736 assert_eq!(sync_db.op_count(), upper_bound);
738 assert_eq!(sync_db.root(), root);
739
740 sync_db.destroy().await.unwrap();
741 let target_db =
742 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
743 let inner = target_db.into_inner();
744 inner.destroy().await.unwrap();
745 });
746 }
747
748 #[test]
750 fn test_sync_use_existing_db_exact_match() {
751 let executor = deterministic::Runner::default();
752 executor.start(|mut context| async move {
753 let target_ops = create_test_ops(40);
754
755 let target_db = create_test_db(context.clone()).await;
757 let mut target_db = target_db.into_mutable();
758 let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
759 let sync_db: ImmutableSyncTest =
760 immutable::Immutable::init(context.clone(), sync_config.clone())
761 .await
762 .unwrap();
763 let mut sync_db = sync_db.into_mutable();
764
765 apply_ops(&mut target_db, target_ops.clone()).await;
767 apply_ops(&mut sync_db, target_ops.clone()).await;
768 let (durable_db, _) = target_db.commit(None).await.unwrap();
769 let target_db = durable_db.into_merkleized();
770 let (durable_db, _) = sync_db.commit(None).await.unwrap();
771 let sync_db = durable_db.into_merkleized();
772
773 drop(sync_db);
774
775 let root = target_db.root();
777 let lower_bound = target_db.oldest_retained_loc();
778 let upper_bound = target_db.op_count();
779
780 let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
782 let config = Config {
783 db_config: sync_config,
784 fetch_batch_size: NZU64!(10),
785 target: Target {
786 root,
787 range: lower_bound..upper_bound,
788 },
789 context,
790 resolver: resolver.clone(),
791 apply_batch_size: 1024,
792 max_outstanding_requests: 1,
793 update_rx: None,
794 };
795 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
796
797 assert_eq!(sync_db.op_count(), upper_bound);
798 assert_eq!(sync_db.root(), root);
799
800 sync_db.destroy().await.unwrap();
801 let target_db =
802 Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
803 let inner = target_db.into_inner();
804 inner.destroy().await.unwrap();
805 });
806 }
807
808 #[test_traced("WARN")]
810 fn test_target_update_lower_bound_decrease() {
811 let executor = deterministic::Runner::default();
812 executor.start(|mut context| async move {
813 let target_db = create_test_db(context.clone()).await;
815 let mut target_db = target_db.into_mutable();
816 let target_ops = create_test_ops(100);
817 apply_ops(&mut target_db, target_ops).await;
818 let (durable_db, _) = target_db.commit(None).await.unwrap();
819 let mut target_db = durable_db.into_merkleized();
820
821 target_db.prune(Location::new_unchecked(10)).await.unwrap();
822
823 let initial_lower_bound = target_db.oldest_retained_loc();
825 let initial_upper_bound = target_db.op_count();
826 let initial_root = target_db.root();
827
828 let (mut update_sender, update_receiver) = mpsc::channel(1);
830 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
831 let config = Config {
832 context: context.clone(),
833 db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
834 fetch_batch_size: NZU64!(5),
835 target: Target {
836 root: initial_root,
837 range: initial_lower_bound..initial_upper_bound,
838 },
839 resolver: target_db.clone(),
840 apply_batch_size: 1024,
841 max_outstanding_requests: 10,
842 update_rx: Some(update_receiver),
843 };
844 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
845
846 update_sender
848 .send(Target {
849 root: initial_root,
850 range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
851 })
852 .await
853 .unwrap();
854
855 let result = client.step().await;
856 assert!(matches!(
857 result,
858 Err(sync::Error::Engine(
859 sync::EngineError::SyncTargetMovedBackward { .. }
860 ))
861 ));
862
863 let target_db =
864 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
865 let inner = target_db.into_inner();
866 inner.destroy().await.unwrap();
867 });
868 }
869
870 #[test_traced("WARN")]
872 fn test_target_update_upper_bound_decrease() {
873 let executor = deterministic::Runner::default();
874 executor.start(|mut context| async move {
875 let target_db = create_test_db(context.clone()).await;
877 let mut target_db = target_db.into_mutable();
878 let target_ops = create_test_ops(50);
879 apply_ops(&mut target_db, target_ops).await;
880 let (durable_db, _) = target_db.commit(None).await.unwrap();
881 let target_db = durable_db.into_merkleized();
882
883 let initial_lower_bound = target_db.oldest_retained_loc();
885 let initial_upper_bound = target_db.op_count();
886 let initial_root = target_db.root();
887
888 let (mut update_sender, update_receiver) = mpsc::channel(1);
890 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
891 let config = Config {
892 context: context.clone(),
893 db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
894 fetch_batch_size: NZU64!(5),
895 target: Target {
896 root: initial_root,
897 range: initial_lower_bound..initial_upper_bound,
898 },
899 resolver: target_db.clone(),
900 apply_batch_size: 1024,
901 max_outstanding_requests: 10,
902 update_rx: Some(update_receiver),
903 };
904 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
905
906 update_sender
908 .send(Target {
909 root: initial_root,
910 range: initial_lower_bound..(initial_upper_bound - 1),
911 })
912 .await
913 .unwrap();
914
915 let result = client.step().await;
916 assert!(matches!(
917 result,
918 Err(sync::Error::Engine(
919 sync::EngineError::SyncTargetMovedBackward { .. }
920 ))
921 ));
922
923 let target_db =
924 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
925 let inner = target_db.into_inner();
926 inner.destroy().await.unwrap();
927 });
928 }
929
930 #[test_traced("WARN")]
932 fn test_target_update_bounds_increase() {
933 let executor = deterministic::Runner::default();
934 executor.start(|mut context| async move {
935 let target_db = create_test_db(context.clone()).await;
937 let mut target_db = target_db.into_mutable();
938 let target_ops = create_test_ops(100);
939 apply_ops(&mut target_db, target_ops.clone()).await;
940 let (durable_db, _) = target_db.commit(None).await.unwrap();
941 let target_db = durable_db.into_merkleized();
942
943 let initial_lower_bound = target_db.oldest_retained_loc();
945 let initial_upper_bound = target_db.op_count();
946 let initial_root = target_db.root();
947
948 let mut target_db = target_db.into_mutable();
951 let more_ops = create_test_ops_seeded(5, 1);
952 apply_ops(&mut target_db, more_ops.clone()).await;
953 let (durable_db, _) = target_db.commit(None).await.unwrap();
954 let mut target_db = durable_db.into_merkleized();
955
956 target_db.prune(Location::new_unchecked(10)).await.unwrap();
957 let target_db = target_db.into_mutable();
958 let (durable_db, _) = target_db.commit(None).await.unwrap();
959 let target_db = durable_db.into_merkleized();
960
961 let final_lower_bound = target_db.oldest_retained_loc();
963 let final_upper_bound = target_db.op_count();
964 let final_root = target_db.root();
965
966 assert_ne!(final_lower_bound, initial_lower_bound);
968 assert_ne!(final_upper_bound, initial_upper_bound);
969
970 let (mut update_sender, update_receiver) = mpsc::channel(1);
972 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
973 let config = Config {
974 context: context.clone(),
975 db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
976 fetch_batch_size: NZU64!(1),
977 target: Target {
978 root: initial_root,
979 range: initial_lower_bound..initial_upper_bound,
980 },
981 resolver: target_db.clone(),
982 apply_batch_size: 1024,
983 max_outstanding_requests: 1,
984 update_rx: Some(update_receiver),
985 };
986
987 update_sender
989 .send(Target {
990 root: final_root,
991 range: final_lower_bound..final_upper_bound,
992 })
993 .await
994 .unwrap();
995
996 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
998
999 assert_eq!(synced_db.root(), final_root);
1001 assert_eq!(synced_db.op_count(), final_upper_bound);
1002 assert_eq!(synced_db.oldest_retained_loc(), final_lower_bound);
1003
1004 synced_db.destroy().await.unwrap();
1005 let target_db = Arc::try_unwrap(target_db).map_or_else(
1006 |_| panic!("Failed to unwrap Arc - still has references"),
1007 |rw_lock| rw_lock.into_inner(),
1008 );
1009 target_db.destroy().await.unwrap();
1010 });
1011 }
1012
1013 #[test_traced("WARN")]
1015 fn test_target_update_invalid_bounds() {
1016 let executor = deterministic::Runner::default();
1017 executor.start(|mut context| async move {
1018 let target_db = create_test_db(context.clone()).await;
1020 let mut target_db = target_db.into_mutable();
1021 let target_ops = create_test_ops(25);
1022 apply_ops(&mut target_db, target_ops).await;
1023 let (durable_db, _) = target_db.commit(None).await.unwrap();
1024 let target_db = durable_db.into_merkleized();
1025
1026 let initial_lower_bound = target_db.oldest_retained_loc();
1028 let initial_upper_bound = target_db.op_count();
1029 let initial_root = target_db.root();
1030
1031 let (mut update_sender, update_receiver) = mpsc::channel(1);
1033 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1034 let config = Config {
1035 context: context.clone(),
1036 db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
1037 fetch_batch_size: NZU64!(5),
1038 target: Target {
1039 root: initial_root,
1040 range: initial_lower_bound..initial_upper_bound,
1041 },
1042 resolver: target_db.clone(),
1043 apply_batch_size: 1024,
1044 max_outstanding_requests: 10,
1045 update_rx: Some(update_receiver),
1046 };
1047 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
1048
1049 update_sender
1051 .send(Target {
1052 root: initial_root,
1053 range: initial_upper_bound..initial_lower_bound,
1054 })
1055 .await
1056 .unwrap();
1057
1058 let result = client.step().await;
1059 assert!(matches!(
1060 result,
1061 Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1062 ));
1063
1064 let target_db =
1065 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1066 let inner = target_db.into_inner();
1067 inner.destroy().await.unwrap();
1068 });
1069 }
1070
1071 #[test_traced("WARN")]
1073 fn test_target_update_on_done_client() {
1074 let executor = deterministic::Runner::default();
1075 executor.start(|mut context| async move {
1076 let target_db = create_test_db(context.clone()).await;
1078 let mut target_db = target_db.into_mutable();
1079 let target_ops = create_test_ops(10);
1080 apply_ops(&mut target_db, target_ops).await;
1081 let (durable_db, _) = target_db.commit(None).await.unwrap();
1082 let target_db = durable_db.into_merkleized();
1083
1084 let lower_bound = target_db.oldest_retained_loc();
1086 let upper_bound = target_db.op_count();
1087 let root = target_db.root();
1088
1089 let (mut update_sender, update_receiver) = mpsc::channel(1);
1091 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1092 let config = Config {
1093 context: context.clone(),
1094 db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1095 fetch_batch_size: NZU64!(20),
1096 target: Target {
1097 root,
1098 range: lower_bound..upper_bound,
1099 },
1100 resolver: target_db.clone(),
1101 apply_batch_size: 1024,
1102 max_outstanding_requests: 10,
1103 update_rx: Some(update_receiver),
1104 };
1105
1106 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1108
1109 let _ = update_sender
1111 .send(Target {
1112 root: sha256::Digest::from([2u8; 32]),
1113 range: lower_bound + 1..upper_bound + 1,
1114 })
1115 .await;
1116
1117 assert_eq!(synced_db.root(), root);
1119 assert_eq!(synced_db.op_count(), upper_bound);
1120 assert_eq!(synced_db.oldest_retained_loc(), lower_bound);
1121
1122 synced_db.destroy().await.unwrap();
1123 Arc::try_unwrap(target_db)
1124 .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1125 .into_inner()
1126 .destroy()
1127 .await
1128 .unwrap();
1129 });
1130 }
1131}