1use crate::{
2 journal::contiguous::variable,
3 mmr::Location,
4 qmdb::{
5 any::VariableValue,
6 immutable::{self, Operation},
7 sync, Error,
8 },
9 translator::Translator,
10};
11use commonware_cryptography::Hasher;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::Array;
14use std::ops::Range;
15
16impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
17where
18 E: Storage + Clock + Metrics,
19 K: Array,
20 V: VariableValue,
21 H: Hasher,
22 T: Translator,
23{
24 type Op = Operation<K, V>;
25 type Journal = variable::Journal<E, Self::Op>;
26 type Hasher = H;
27 type Config = immutable::Config<T, V::Cfg>;
28 type Digest = H::Digest;
29 type Context = E;
30
31 async fn create_journal(
32 context: Self::Context,
33 config: &Self::Config,
34 range: Range<Location>,
35 ) -> Result<Self::Journal, Error> {
36 variable::Journal::init_sync(
38 context.with_label("log"),
39 variable::Config {
40 items_per_section: config.log_items_per_section,
41 partition: config.log_partition.clone(),
42 compression: config.log_compression,
43 codec_config: config.log_codec_config.clone(),
44 buffer_pool: config.buffer_pool.clone(),
45 write_buffer: config.log_write_buffer,
46 },
47 *range.start..*range.end,
48 )
49 .await
50 }
51
52 async fn from_sync_result(
68 context: Self::Context,
69 db_config: Self::Config,
70 journal: Self::Journal,
71 pinned_nodes: Option<Vec<Self::Digest>>,
72 range: Range<Location>,
73 apply_batch_size: usize,
74 ) -> Result<Self, Error> {
75 let sync_config = Config {
76 db_config,
77 log: journal,
78 range,
79 pinned_nodes,
80 apply_batch_size,
81 };
82 Self::init_synced(context, sync_config).await
83 }
84
85 fn root(&self) -> Self::Digest {
86 self.root()
87 }
88
89 async fn resize_journal(
90 mut journal: Self::Journal,
91 context: Self::Context,
92 config: &Self::Config,
93 range: Range<Location>,
94 ) -> Result<Self::Journal, Error> {
95 let size = journal.size();
96
97 if size <= range.start {
98 journal.destroy().await?;
100 Self::create_journal(context, config, range).await
101 } else {
102 journal
104 .prune(*range.start)
105 .await
106 .map_err(crate::qmdb::Error::from)?;
107
108 let size = journal.size();
110 if size > range.end {
111 return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
112 size,
113 )));
114 }
115
116 Ok(journal)
117 }
118 }
119}
120
121pub struct Config<E, K, V, T, D, C>
123where
124 E: Storage + Metrics,
125 K: Array,
126 V: VariableValue,
127 T: Translator,
128 D: commonware_cryptography::Digest,
129{
130 pub db_config: immutable::Config<T, C>,
132
133 pub log: variable::Journal<E, Operation<K, V>>,
136
137 pub range: Range<Location>,
139
140 pub pinned_nodes: Option<Vec<D>>,
145
146 pub apply_batch_size: usize,
150}
151
152#[cfg(test)]
153mod tests {
154 use crate::{
155 mmr::Location,
156 qmdb::{
157 immutable,
158 immutable::Operation,
159 sync::{
160 self,
161 engine::{Config, NextStep},
162 Engine, Target,
163 },
164 },
165 translator::TwoCap,
166 };
167 use commonware_cryptography::{sha256, Sha256};
168 use commonware_macros::test_traced;
169 use commonware_math::algebra::Random;
170 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
171 use commonware_utils::{NZUsize, NZU64};
172 use futures::{channel::mpsc, SinkExt as _};
173 use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
174 use rstest::rstest;
175 use std::{
176 collections::HashMap,
177 num::{NonZeroU64, NonZeroUsize},
178 sync::Arc,
179 };
180
181 type ImmutableSyncTest = immutable::Immutable<
183 deterministic::Context,
184 sha256::Digest,
185 sha256::Digest,
186 Sha256,
187 crate::translator::TwoCap,
188 >;
189
190 fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
192 const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
193 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
194 const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
195
196 immutable::Config {
197 mmr_journal_partition: format!("journal_{suffix}"),
198 mmr_metadata_partition: format!("metadata_{suffix}"),
199 mmr_items_per_blob: NZU64!(11),
200 mmr_write_buffer: NZUsize!(1024),
201 log_partition: format!("log_{suffix}"),
202 log_items_per_section: ITEMS_PER_SECTION,
203 log_compression: None,
204 log_codec_config: (),
205 log_write_buffer: NZUsize!(1024),
206 translator: TwoCap,
207 thread_pool: None,
208 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
209 }
210 }
211
212 async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
214 let seed = context.next_u64();
215 let config = create_sync_config(&format!("sync_test_{seed}"));
216 ImmutableSyncTest::init(context, config).await.unwrap()
217 }
218
219 fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
222 let mut rng = StdRng::seed_from_u64(1337);
223 let mut ops = Vec::new();
224 for _i in 0..n {
225 let key = sha256::Digest::random(&mut rng);
226 let value = sha256::Digest::random(&mut rng);
227 ops.push(Operation::Set(key, value));
228 }
229 ops
230 }
231
232 async fn apply_ops(
234 db: &mut ImmutableSyncTest,
235 ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
236 ) {
237 for op in ops {
238 match op {
239 Operation::Set(key, value) => {
240 db.set(key, value).await.unwrap();
241 }
242 Operation::Commit(metadata) => {
243 db.commit(metadata).await.unwrap();
244 }
245 }
246 }
247 }
248
249 #[rstest]
250 #[case::singleton_batch_size_one(1, NZU64!(1))]
251 #[case::singleton_batch_size_gt_db_size(1, NZU64!(2))]
252 #[case::batch_size_one(1000, NZU64!(1))]
253 #[case::floor_div_db_batch_size(1000, NZU64!(3))]
254 #[case::floor_div_db_batch_size_2(1000, NZU64!(999))]
255 #[case::div_db_batch_size(1000, NZU64!(100))]
256 #[case::db_size_eq_batch_size(1000, NZU64!(1000))]
257 #[case::batch_size_gt_db_size(1000, NZU64!(1001))]
258 fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) {
259 let executor = deterministic::Runner::default();
260 executor.start(|mut context| async move {
261 let mut target_db = create_test_db(context.clone()).await;
262 let target_db_ops = create_test_ops(target_db_ops);
263 apply_ops(&mut target_db, target_db_ops.clone()).await;
264 let metadata = Some(Sha256::fill(1));
265 target_db.commit(metadata).await.unwrap();
266 let target_op_count = target_db.op_count();
267 let target_oldest_retained_loc = target_db.oldest_retained_loc();
268 let target_root = target_db.root();
269
270 let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
272 for op in &target_db_ops {
273 if let Operation::Set(key, value) = op {
274 expected_kvs.insert(*key, *value);
275 }
276 }
277
278 let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
279
280 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
281 let config = Config {
282 db_config: db_config.clone(),
283 fetch_batch_size,
284 target: Target {
285 root: target_root,
286 range: target_oldest_retained_loc..target_op_count,
287 },
288 context: context.clone(),
289 resolver: target_db.clone(),
290 apply_batch_size: 1024,
291 max_outstanding_requests: 1,
292 update_rx: None,
293 };
294 let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
295
296 assert_eq!(got_db.op_count(), target_op_count);
298 assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
299
300 assert_eq!(got_db.root(), target_root);
302
303 for (key, expected_value) in &expected_kvs {
305 let synced_value = got_db.get(key).await.unwrap();
306 assert_eq!(synced_value, Some(*expected_value));
307 }
308
309 let mut new_ops = Vec::new();
311 let mut rng = StdRng::seed_from_u64(42);
312 let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
313 for _i in 0..expected_kvs.len() {
314 let key = sha256::Digest::random(&mut rng);
315 let value = sha256::Digest::random(&mut rng);
316 new_ops.push(Operation::Set(key, value));
317 new_kvs.insert(key, value);
318 }
319
320 apply_ops(&mut got_db, new_ops.clone()).await;
322 {
323 let mut target_db = target_db.write().await;
324 apply_ops(&mut target_db, new_ops).await;
325 }
326
327 for (key, expected_value) in &new_kvs {
329 let synced_value = got_db.get(key).await.unwrap();
330 let target_value = {
331 let target_db = target_db.read().await;
332 target_db.get(key).await.unwrap()
333 };
334 assert_eq!(synced_value, Some(*expected_value));
335 assert_eq!(target_value, Some(*expected_value));
336 }
337
338 got_db.destroy().await.unwrap();
339 let target_db = Arc::try_unwrap(target_db).map_or_else(
340 |_| panic!("Failed to unwrap Arc - still has references"),
341 |rw_lock| rw_lock.into_inner(),
342 );
343 target_db.destroy().await.unwrap();
344 });
345 }
346
347 #[test_traced("WARN")]
349 fn test_sync_empty_to_nonempty() {
350 let executor = deterministic::Runner::default();
351 executor.start(|mut context| async move {
352 let mut target_db = create_test_db(context.clone()).await;
354 target_db.commit(Some(Sha256::fill(1))).await.unwrap(); let target_op_count = target_db.op_count();
357 let target_oldest_retained_loc = target_db.oldest_retained_loc();
358 let target_root = target_db.root();
359
360 let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
361 let target_db = Arc::new(RwLock::new(target_db));
362 let config = Config {
363 db_config,
364 fetch_batch_size: NZU64!(10),
365 target: Target {
366 root: target_root,
367 range: target_oldest_retained_loc..target_op_count,
368 },
369 context: context.clone(),
370 resolver: target_db.clone(),
371 apply_batch_size: 1024,
372 max_outstanding_requests: 1,
373 update_rx: None,
374 };
375 let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
376
377 assert_eq!(got_db.op_count(), target_op_count);
379 assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
380 assert_eq!(got_db.root(), target_root);
381 assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1)));
382
383 got_db.destroy().await.unwrap();
384 let target_db = Arc::try_unwrap(target_db).map_or_else(
385 |_| panic!("Failed to unwrap Arc - still has references"),
386 |rw_lock| rw_lock.into_inner(),
387 );
388 target_db.destroy().await.unwrap();
389 });
390 }
391
392 #[test_traced("WARN")]
394 fn test_sync_database_persistence() {
395 let executor = deterministic::Runner::default();
396 executor.start(|context| async move {
397 let mut target_db = create_test_db(context.clone()).await;
399 let target_ops = create_test_ops(10);
400 apply_ops(&mut target_db, target_ops.clone()).await;
401 target_db.commit(Some(Sha256::fill(0))).await.unwrap();
402
403 let target_root = target_db.root();
405 let lower_bound = target_db.oldest_retained_loc();
406 let op_count = target_db.op_count();
407
408 let db_config = create_sync_config("persistence_test");
410 let context_clone = context.clone();
411 let target_db = Arc::new(RwLock::new(target_db));
412 let config = Config {
413 db_config: db_config.clone(),
414 fetch_batch_size: NZU64!(5),
415 target: Target {
416 root: target_root,
417 range: lower_bound..op_count,
418 },
419 context,
420 resolver: target_db.clone(),
421 apply_batch_size: 1024,
422 max_outstanding_requests: 1,
423 update_rx: None,
424 };
425 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
426
427 assert_eq!(synced_db.root(), target_root);
429
430 let expected_root = synced_db.root();
432 let expected_op_count = synced_db.op_count();
433 let expected_oldest_retained_loc = synced_db.oldest_retained_loc();
434
435 synced_db.close().await.unwrap();
437 let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
438 .await
439 .unwrap();
440
441 assert_eq!(reopened_db.root(), expected_root);
443 assert_eq!(reopened_db.op_count(), expected_op_count);
444 assert_eq!(
445 reopened_db.oldest_retained_loc(),
446 expected_oldest_retained_loc
447 );
448
449 for op in &target_ops {
451 if let Operation::Set(key, value) = op {
452 let stored_value = reopened_db.get(key).await.unwrap();
453 assert_eq!(stored_value, Some(*value));
454 }
455 }
456
457 reopened_db.destroy().await.unwrap();
458 let target_db = Arc::try_unwrap(target_db).map_or_else(
459 |_| panic!("Failed to unwrap Arc - still has references"),
460 |rw_lock| rw_lock.into_inner(),
461 );
462 target_db.destroy().await.unwrap();
463 });
464 }
465
466 #[test_traced("WARN")]
468 fn test_target_update_during_sync() {
469 let executor = deterministic::Runner::default();
470 executor.start(|mut context| async move {
471 let mut target_db = create_test_db(context.clone()).await;
473 let initial_ops = create_test_ops(50);
474 apply_ops(&mut target_db, initial_ops.clone()).await;
475 target_db.commit(None).await.unwrap();
476
477 let initial_lower_bound = target_db.oldest_retained_loc();
479 let initial_upper_bound = target_db.op_count();
480 let initial_root = target_db.root();
481
482 let additional_ops = create_test_ops(25);
484 apply_ops(&mut target_db, additional_ops.clone()).await;
485 target_db.commit(None).await.unwrap();
486 let final_upper_bound = target_db.op_count();
487 let final_root = target_db.root();
488
489 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
491
492 let (mut update_sender, update_receiver) = mpsc::channel(1);
494 let client = {
495 let config = Config {
496 context: context.clone(),
497 db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
498 target: Target {
499 root: initial_root,
500 range: initial_lower_bound..initial_upper_bound,
501 },
502 resolver: target_db.clone(),
503 fetch_batch_size: NZU64!(2), max_outstanding_requests: 10,
505 apply_batch_size: 1024,
506 update_rx: Some(update_receiver),
507 };
508 let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
509 loop {
510 client = match client.step().await.unwrap() {
512 NextStep::Continue(new_client) => new_client,
513 NextStep::Complete(_) => panic!("client should not be complete"),
514 };
515 let log_size = client.journal().size();
516 if log_size > initial_lower_bound {
517 break client;
518 }
519 }
520 };
521
522 update_sender
524 .send(Target {
525 root: final_root,
526 range: initial_lower_bound..final_upper_bound,
527 })
528 .await
529 .unwrap();
530
531 let synced_db = client.sync().await.unwrap();
533
534 assert_eq!(synced_db.root(), final_root);
536
537 let target_db = Arc::try_unwrap(target_db).map_or_else(
539 |_| panic!("Failed to unwrap Arc - still has references"),
540 |rw_lock| rw_lock.into_inner(),
541 );
542 {
543 assert_eq!(synced_db.op_count(), target_db.op_count());
544 assert_eq!(
545 synced_db.oldest_retained_loc(),
546 target_db.oldest_retained_loc()
547 );
548 assert_eq!(synced_db.root(), target_db.root());
549 }
550
551 let all_ops = [initial_ops, additional_ops].concat();
553 for op in &all_ops {
554 if let Operation::Set(key, value) = op {
555 let synced_value = synced_db.get(key).await.unwrap();
556 assert_eq!(synced_value, Some(*value));
557 }
558 }
559
560 synced_db.destroy().await.unwrap();
561 target_db.destroy().await.unwrap();
562 });
563 }
564
565 #[test]
567 fn test_sync_invalid_bounds() {
568 let executor = deterministic::Runner::default();
569 executor.start(|mut context| async move {
570 let target_db = create_test_db(context.clone()).await;
571 let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
572 let config = Config {
573 db_config,
574 fetch_batch_size: NZU64!(10),
575 target: Target {
576 root: sha256::Digest::from([1u8; 32]),
577 range: Location::new_unchecked(31)..Location::new_unchecked(31),
578 },
579 context,
580 resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
581 apply_batch_size: 1024,
582 max_outstanding_requests: 1,
583 update_rx: None,
584 };
585 let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
586 match result {
587 Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
588 lower_bound_pos,
589 upper_bound_pos,
590 })) => {
591 assert_eq!(lower_bound_pos, Location::new_unchecked(31));
592 assert_eq!(upper_bound_pos, Location::new_unchecked(31));
593 }
594 _ => panic!("Expected InvalidTarget error"),
595 }
596 });
597 }
598
599 #[test]
602 fn test_sync_subset_of_target_database() {
603 let executor = deterministic::Runner::default();
604 executor.start(|mut context| async move {
605 let mut target_db = create_test_db(context.clone()).await;
606 let target_ops = create_test_ops(30);
607 apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
609 target_db.commit(None).await.unwrap();
610
611 let target_root = target_db.root();
612 let lower_bound = target_db.oldest_retained_loc();
613 let op_count = target_db.op_count();
614
615 apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
617 target_db.commit(None).await.unwrap();
618
619 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
620 let config = Config {
621 db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
622 fetch_batch_size: NZU64!(10),
623 target: Target {
624 root: target_root,
625 range: lower_bound..op_count,
626 },
627 context,
628 resolver: target_db.clone(),
629 apply_batch_size: 1024,
630 max_outstanding_requests: 1,
631 update_rx: None,
632 };
633 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
634
635 assert_eq!(synced_db.root(), target_root);
637 assert_eq!(synced_db.op_count(), op_count);
638
639 synced_db.destroy().await.unwrap();
640 let target_db =
641 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
642 let inner = target_db.into_inner();
643 inner.destroy().await.unwrap();
644 });
645 }
646
647 #[test]
650 fn test_sync_use_existing_db_partial_match() {
651 let executor = deterministic::Runner::default();
652 executor.start(|mut context| async move {
653 let original_ops = create_test_ops(50);
654
655 let mut target_db = create_test_db(context.clone()).await;
657 let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
658 let mut sync_db: ImmutableSyncTest =
659 immutable::Immutable::init(context.clone(), sync_db_config.clone())
660 .await
661 .unwrap();
662
663 apply_ops(&mut target_db, original_ops.clone()).await;
665 apply_ops(&mut sync_db, original_ops.clone()).await;
666 target_db.commit(None).await.unwrap();
667 sync_db.commit(None).await.unwrap();
668
669 sync_db.close().await.unwrap();
671
672 let last_op = create_test_ops(1);
674 apply_ops(&mut target_db, last_op.clone()).await;
675 target_db.commit(None).await.unwrap();
676 let root = target_db.root();
677 let lower_bound = target_db.oldest_retained_loc();
678 let upper_bound = target_db.op_count(); let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
682 let config = Config {
683 db_config: sync_db_config, fetch_batch_size: NZU64!(10),
685 target: Target {
686 root,
687 range: lower_bound..upper_bound,
688 },
689 context: context.clone(),
690 resolver: target_db.clone(),
691 apply_batch_size: 1024,
692 max_outstanding_requests: 1,
693 update_rx: None,
694 };
695 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
696
697 assert_eq!(sync_db.op_count(), upper_bound);
699 assert_eq!(sync_db.root(), root);
700
701 sync_db.destroy().await.unwrap();
702 let target_db =
703 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
704 let inner = target_db.into_inner();
705 inner.destroy().await.unwrap();
706 });
707 }
708
709 #[test]
711 fn test_sync_use_existing_db_exact_match() {
712 let executor = deterministic::Runner::default();
713 executor.start(|mut context| async move {
714 let target_ops = create_test_ops(40);
715
716 let mut target_db = create_test_db(context.clone()).await;
718 let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
719 let mut sync_db: ImmutableSyncTest =
720 immutable::Immutable::init(context.clone(), sync_config.clone())
721 .await
722 .unwrap();
723
724 apply_ops(&mut target_db, target_ops.clone()).await;
726 apply_ops(&mut sync_db, target_ops.clone()).await;
727 target_db.commit(None).await.unwrap();
728 sync_db.commit(None).await.unwrap();
729
730 sync_db.close().await.unwrap();
732
733 let root = target_db.root();
735 let lower_bound = target_db.oldest_retained_loc();
736 let upper_bound = target_db.op_count();
737
738 let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
740 let config = Config {
741 db_config: sync_config,
742 fetch_batch_size: NZU64!(10),
743 target: Target {
744 root,
745 range: lower_bound..upper_bound,
746 },
747 context,
748 resolver: resolver.clone(),
749 apply_batch_size: 1024,
750 max_outstanding_requests: 1,
751 update_rx: None,
752 };
753 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
754
755 assert_eq!(sync_db.op_count(), upper_bound);
756 assert_eq!(sync_db.root(), root);
757
758 sync_db.destroy().await.unwrap();
759 let target_db =
760 Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
761 let inner = target_db.into_inner();
762 inner.destroy().await.unwrap();
763 });
764 }
765
766 #[test_traced("WARN")]
768 fn test_target_update_lower_bound_decrease() {
769 let executor = deterministic::Runner::default();
770 executor.start(|mut context| async move {
771 let mut target_db = create_test_db(context.clone()).await;
773 let target_ops = create_test_ops(100);
774 apply_ops(&mut target_db, target_ops).await;
775 target_db.commit(None).await.unwrap();
776
777 target_db.prune(Location::new_unchecked(10)).await.unwrap();
778
779 let initial_lower_bound = target_db.oldest_retained_loc();
781 let initial_upper_bound = target_db.op_count();
782 let initial_root = target_db.root();
783
784 let (mut update_sender, update_receiver) = mpsc::channel(1);
786 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
787 let config = Config {
788 context: context.clone(),
789 db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
790 fetch_batch_size: NZU64!(5),
791 target: Target {
792 root: initial_root,
793 range: initial_lower_bound..initial_upper_bound,
794 },
795 resolver: target_db.clone(),
796 apply_batch_size: 1024,
797 max_outstanding_requests: 10,
798 update_rx: Some(update_receiver),
799 };
800 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
801
802 update_sender
804 .send(Target {
805 root: initial_root,
806 range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
807 })
808 .await
809 .unwrap();
810
811 let result = client.step().await;
812 assert!(matches!(
813 result,
814 Err(sync::Error::Engine(
815 sync::EngineError::SyncTargetMovedBackward { .. }
816 ))
817 ));
818
819 let target_db =
820 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
821 let inner = target_db.into_inner();
822 inner.destroy().await.unwrap();
823 });
824 }
825
826 #[test_traced("WARN")]
828 fn test_target_update_upper_bound_decrease() {
829 let executor = deterministic::Runner::default();
830 executor.start(|mut context| async move {
831 let mut target_db = create_test_db(context.clone()).await;
833 let target_ops = create_test_ops(50);
834 apply_ops(&mut target_db, target_ops).await;
835 target_db.commit(None).await.unwrap();
836
837 let initial_lower_bound = target_db.oldest_retained_loc();
839 let initial_upper_bound = target_db.op_count();
840 let initial_root = target_db.root();
841
842 let (mut update_sender, update_receiver) = mpsc::channel(1);
844 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
845 let config = Config {
846 context: context.clone(),
847 db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
848 fetch_batch_size: NZU64!(5),
849 target: Target {
850 root: initial_root,
851 range: initial_lower_bound..initial_upper_bound,
852 },
853 resolver: target_db.clone(),
854 apply_batch_size: 1024,
855 max_outstanding_requests: 10,
856 update_rx: Some(update_receiver),
857 };
858 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
859
860 update_sender
862 .send(Target {
863 root: initial_root,
864 range: initial_lower_bound..(initial_upper_bound - 1),
865 })
866 .await
867 .unwrap();
868
869 let result = client.step().await;
870 assert!(matches!(
871 result,
872 Err(sync::Error::Engine(
873 sync::EngineError::SyncTargetMovedBackward { .. }
874 ))
875 ));
876
877 let target_db =
878 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
879 let inner = target_db.into_inner();
880 inner.destroy().await.unwrap();
881 });
882 }
883
884 #[test_traced("WARN")]
886 fn test_target_update_bounds_increase() {
887 let executor = deterministic::Runner::default();
888 executor.start(|mut context| async move {
889 let mut target_db = create_test_db(context.clone()).await;
891 let target_ops = create_test_ops(100);
892 apply_ops(&mut target_db, target_ops.clone()).await;
893 target_db.commit(None).await.unwrap();
894
895 let initial_lower_bound = target_db.oldest_retained_loc();
897 let initial_upper_bound = target_db.op_count();
898 let initial_root = target_db.root();
899
900 let more_ops = create_test_ops(5);
902 apply_ops(&mut target_db, more_ops.clone()).await;
903 target_db.commit(None).await.unwrap();
904
905 target_db.prune(Location::new_unchecked(10)).await.unwrap();
906 target_db.commit(None).await.unwrap();
907
908 let final_lower_bound = target_db.oldest_retained_loc();
910 let final_upper_bound = target_db.op_count();
911 let final_root = target_db.root();
912
913 assert_ne!(final_lower_bound, initial_lower_bound);
915 assert_ne!(final_upper_bound, initial_upper_bound);
916
917 let (mut update_sender, update_receiver) = mpsc::channel(1);
919 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
920 let config = Config {
921 context: context.clone(),
922 db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
923 fetch_batch_size: NZU64!(1),
924 target: Target {
925 root: initial_root,
926 range: initial_lower_bound..initial_upper_bound,
927 },
928 resolver: target_db.clone(),
929 apply_batch_size: 1024,
930 max_outstanding_requests: 1,
931 update_rx: Some(update_receiver),
932 };
933
934 update_sender
936 .send(Target {
937 root: final_root,
938 range: final_lower_bound..final_upper_bound,
939 })
940 .await
941 .unwrap();
942
943 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
945
946 assert_eq!(synced_db.root(), final_root);
948 assert_eq!(synced_db.op_count(), final_upper_bound);
949 assert_eq!(synced_db.oldest_retained_loc(), final_lower_bound);
950
951 synced_db.destroy().await.unwrap();
952 let target_db = Arc::try_unwrap(target_db).map_or_else(
953 |_| panic!("Failed to unwrap Arc - still has references"),
954 |rw_lock| rw_lock.into_inner(),
955 );
956 target_db.destroy().await.unwrap();
957 });
958 }
959
960 #[test_traced("WARN")]
962 fn test_target_update_invalid_bounds() {
963 let executor = deterministic::Runner::default();
964 executor.start(|mut context| async move {
965 let mut target_db = create_test_db(context.clone()).await;
967 let target_ops = create_test_ops(25);
968 apply_ops(&mut target_db, target_ops).await;
969 target_db.commit(None).await.unwrap();
970
971 let initial_lower_bound = target_db.oldest_retained_loc();
973 let initial_upper_bound = target_db.op_count();
974 let initial_root = target_db.root();
975
976 let (mut update_sender, update_receiver) = mpsc::channel(1);
978 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
979 let config = Config {
980 context: context.clone(),
981 db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
982 fetch_batch_size: NZU64!(5),
983 target: Target {
984 root: initial_root,
985 range: initial_lower_bound..initial_upper_bound,
986 },
987 resolver: target_db.clone(),
988 apply_batch_size: 1024,
989 max_outstanding_requests: 10,
990 update_rx: Some(update_receiver),
991 };
992 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
993
994 update_sender
996 .send(Target {
997 root: initial_root,
998 range: initial_upper_bound..initial_lower_bound,
999 })
1000 .await
1001 .unwrap();
1002
1003 let result = client.step().await;
1004 assert!(matches!(
1005 result,
1006 Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1007 ));
1008
1009 let target_db =
1010 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1011 let inner = target_db.into_inner();
1012 inner.destroy().await.unwrap();
1013 });
1014 }
1015
1016 #[test_traced("WARN")]
1018 fn test_target_update_on_done_client() {
1019 let executor = deterministic::Runner::default();
1020 executor.start(|mut context| async move {
1021 let mut target_db = create_test_db(context.clone()).await;
1023 let target_ops = create_test_ops(10);
1024 apply_ops(&mut target_db, target_ops).await;
1025 target_db.commit(None).await.unwrap();
1026
1027 let lower_bound = target_db.oldest_retained_loc();
1029 let upper_bound = target_db.op_count();
1030 let root = target_db.root();
1031
1032 let (mut update_sender, update_receiver) = mpsc::channel(1);
1034 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1035 let config = Config {
1036 context: context.clone(),
1037 db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1038 fetch_batch_size: NZU64!(20),
1039 target: Target {
1040 root,
1041 range: lower_bound..upper_bound,
1042 },
1043 resolver: target_db.clone(),
1044 apply_batch_size: 1024,
1045 max_outstanding_requests: 10,
1046 update_rx: Some(update_receiver),
1047 };
1048
1049 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1051
1052 let _ = update_sender
1054 .send(Target {
1055 root: sha256::Digest::from([2u8; 32]),
1056 range: lower_bound + 1..upper_bound + 1,
1057 })
1058 .await;
1059
1060 assert_eq!(synced_db.root(), root);
1062 assert_eq!(synced_db.op_count(), upper_bound);
1063 assert_eq!(synced_db.oldest_retained_loc(), lower_bound);
1064
1065 synced_db.destroy().await.unwrap();
1066 Arc::try_unwrap(target_db)
1067 .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1068 .into_inner()
1069 .destroy()
1070 .await
1071 .unwrap();
1072 });
1073 }
1074}