1use crate::{
2 adb::{immutable, operation::variable::Operation, sync, Error},
3 journal::contiguous::variable,
4 mmr::{Location, StandardHasher as Standard},
5 translator::Translator,
6};
7use commonware_codec::Codec;
8use commonware_cryptography::Hasher;
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::Array;
11use std::ops::Range;
12
13impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
14where
15 E: Storage + Clock + Metrics,
16 K: Array,
17 V: Codec + Send,
18 H: Hasher,
19 T: Translator,
20{
21 type Op = Operation<K, V>;
22 type Journal = variable::Journal<E, Self::Op>;
23 type Hasher = H;
24 type Config = immutable::Config<T, V::Cfg>;
25 type Digest = H::Digest;
26 type Context = E;
27
28 async fn create_journal(
29 context: Self::Context,
30 config: &Self::Config,
31 range: Range<Location>,
32 ) -> Result<Self::Journal, Error> {
33 variable::Journal::init_sync(
35 context.with_label("log"),
36 variable::Config {
37 items_per_section: config.log_items_per_section,
38 partition: config.log_partition.clone(),
39 compression: config.log_compression,
40 codec_config: config.log_codec_config.clone(),
41 buffer_pool: config.buffer_pool.clone(),
42 write_buffer: config.log_write_buffer,
43 },
44 *range.start..*range.end,
45 )
46 .await
47 }
48
49 async fn from_sync_result(
65 context: Self::Context,
66 db_config: Self::Config,
67 journal: Self::Journal,
68 pinned_nodes: Option<Vec<Self::Digest>>,
69 range: Range<Location>,
70 apply_batch_size: usize,
71 ) -> Result<Self, Error> {
72 let sync_config = Config {
73 db_config,
74 log: journal,
75 range,
76 pinned_nodes,
77 apply_batch_size,
78 };
79 Self::init_synced(context, sync_config).await
80 }
81
82 fn root(&self) -> Self::Digest {
83 let mut hasher = Standard::<H>::new();
84 self.root(&mut hasher)
85 }
86
87 async fn resize_journal(
88 mut journal: Self::Journal,
89 context: Self::Context,
90 config: &Self::Config,
91 range: Range<Location>,
92 ) -> Result<Self::Journal, Error> {
93 let size = journal.size();
94
95 if size <= range.start {
96 journal.destroy().await?;
98 Self::create_journal(context, config, range).await
99 } else {
100 journal
102 .prune(*range.start)
103 .await
104 .map_err(crate::adb::Error::from)?;
105
106 let size = journal.size();
108 if size > range.end {
109 return Err(crate::adb::Error::UnexpectedData(Location::new_unchecked(
110 size,
111 )));
112 }
113
114 Ok(journal)
115 }
116 }
117}
118
/// Inputs used to construct a synced [`immutable::Immutable`].
///
/// Within this file it is assembled by `from_sync_result` and passed to `init_synced`.
pub struct Config<E, K, V, T, D, C>
where
    E: Storage + Metrics,
    K: Array,
    V: Codec + Send,
    T: Translator,
    D: commonware_cryptography::Digest,
{
    /// Configuration of the database being constructed.
    pub db_config: immutable::Config<T, C>,

    /// Journal of operations backing the database (the `journal` argument of
    /// `from_sync_result`).
    /// NOTE(review): presumably already populated with the operations in `range` — confirm
    /// against `init_synced`.
    pub log: variable::Journal<E, Operation<K, V>>,

    /// Range of operation locations covered by the sync.
    pub range: Range<Location>,

    /// Optional digests of pinned nodes.
    /// NOTE(review): presumably the MMR nodes needed at the start of `range`; how `None` is
    /// handled is decided by `init_synced` — confirm there.
    pub pinned_nodes: Option<Vec<D>>,

    /// Batch size forwarded from the sync caller.
    /// NOTE(review): presumably bounds how many operations are applied at a time while
    /// building the database — confirm against `init_synced`.
    pub apply_batch_size: usize,
}
149
150#[cfg(test)]
151mod tests {
152 use crate::{
153 adb::{
154 immutable,
155 operation::variable::Operation,
156 sync::{
157 self,
158 engine::{Config, NextStep},
159 Engine, Target,
160 },
161 },
162 mmr::{Location, StandardHasher as Standard},
163 translator::TwoCap,
164 };
165 use commonware_cryptography::{sha256, Digest, Sha256};
166 use commonware_macros::test_traced;
167 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
168 use commonware_utils::{NZUsize, NZU64};
169 use futures::{channel::mpsc, SinkExt as _};
170 use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
171 use std::{
172 collections::HashMap,
173 num::{NonZeroU64, NonZeroUsize},
174 sync::Arc,
175 };
176 use test_case::test_case;
177
178 type ImmutableSyncTest = immutable::Immutable<
180 deterministic::Context,
181 sha256::Digest,
182 sha256::Digest,
183 Sha256,
184 crate::translator::TwoCap,
185 >;
186
187 fn test_hasher() -> Standard<Sha256> {
188 Standard::<Sha256>::new()
189 }
190
191 fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
193 const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
194 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
195 const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
196
197 immutable::Config {
198 mmr_journal_partition: format!("journal_{suffix}"),
199 mmr_metadata_partition: format!("metadata_{suffix}"),
200 mmr_items_per_blob: NZU64!(11),
201 mmr_write_buffer: NZUsize!(1024),
202 log_partition: format!("log_{suffix}"),
203 log_items_per_section: ITEMS_PER_SECTION,
204 log_compression: None,
205 log_codec_config: (),
206 log_write_buffer: NZUsize!(1024),
207 translator: TwoCap,
208 thread_pool: None,
209 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
210 }
211 }
212
213 async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
215 let seed = context.next_u64();
216 let config = create_sync_config(&format!("sync_test_{seed}"));
217 ImmutableSyncTest::init(context, config).await.unwrap()
218 }
219
220 fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
223 let mut rng = StdRng::seed_from_u64(1337);
224 let mut ops = Vec::new();
225 for _i in 0..n {
226 let key = sha256::Digest::random(&mut rng);
227 let value = sha256::Digest::random(&mut rng);
228 ops.push(Operation::Set(key, value));
229 }
230 ops
231 }
232
233 async fn apply_ops(
235 db: &mut ImmutableSyncTest,
236 ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
237 ) {
238 for op in ops {
239 match op {
240 Operation::Set(key, value) => {
241 db.set(key, value).await.unwrap();
242 }
243 Operation::Commit(metadata) => {
244 db.commit(metadata).await.unwrap();
245 }
246 _ => {}
247 }
248 }
249 }
250
/// Syncs a fresh database against a populated target and checks that the two agree, for
/// several combinations of target size (`target_db_ops`) and `fetch_batch_size`.
#[test_case(1, NZU64!(1); "singleton db with batch size == 1")]
#[test_case(1, NZU64!(2); "singleton db with batch size > db size")]
#[test_case(100, NZU64!(1); "db with batch size 1")]
#[test_case(100, NZU64!(3); "db size not evenly divided by batch size")]
#[test_case(100, NZU64!(99); "db size not evenly divided by batch size; different batch size")]
#[test_case(100, NZU64!(50); "db size divided by batch size")]
#[test_case(100, NZU64!(100); "db size == batch size")]
#[test_case(100, NZU64!(101); "batch size > db size")]
fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64) {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Populate the target database and commit with metadata.
        let mut target_db = create_test_db(context.clone()).await;
        let target_db_ops = create_test_ops(target_db_ops);
        apply_ops(&mut target_db, target_db_ops.clone()).await;
        let metadata = Some(Sha256::fill(1));
        target_db.commit(metadata).await.unwrap();

        // Capture the target state the client will sync to.
        let target_op_count = target_db.op_count();
        let target_oldest_retained_loc = target_db.oldest_retained_loc().unwrap();
        let mut hasher = test_hasher();
        let target_root = target_db.root(&mut hasher);

        // Record the key/value pairs the synced database is expected to serve.
        let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
        for op in &target_db_ops {
            if let Operation::Set(key, value) = op {
                expected_kvs.insert(*key, *value);
            }
        }

        let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));

        // The target database itself acts as the resolver for the sync engine.
        let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
        let config = Config {
            db_config: db_config.clone(),
            fetch_batch_size,
            target: Target {
                root: target_root,
                range: target_oldest_retained_loc..target_op_count,
            },
            context: context.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

        // The synced database must match the target's bounds.
        let mut hasher = test_hasher();
        assert_eq!(got_db.op_count(), target_op_count);
        assert_eq!(
            got_db.oldest_retained_loc().unwrap(),
            target_oldest_retained_loc
        );

        // The synced database must match the target's root.
        assert_eq!(got_db.root(&mut hasher), target_root);

        // Every key set on the target must be readable from the synced database.
        for (key, expected_value) in &expected_kvs {
            let synced_value = got_db.get(key).await.unwrap();
            assert_eq!(synced_value, Some(*expected_value));
        }

        // Generate a second set of operations from a different seed.
        let mut new_ops = Vec::new();
        let mut rng = StdRng::seed_from_u64(42);
        let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
        for _i in 0..expected_kvs.len() {
            let key = sha256::Digest::random(&mut rng);
            let value = sha256::Digest::random(&mut rng);
            new_ops.push(Operation::Set(key, value));
            new_kvs.insert(key, value);
        }

        // Apply the new operations to both databases. `new_ops` holds only `Set`
        // operations, so `apply_ops` issues no commit here.
        apply_ops(&mut got_db, new_ops.clone()).await;
        {
            // Scope the write guard so it is released before the reads below.
            let mut target_db = target_db.write().await;
            apply_ops(&mut target_db, new_ops).await;
        }

        // Both databases must serve the newly written values.
        for (key, expected_value) in &new_kvs {
            let synced_value = got_db.get(key).await.unwrap();
            let target_value = {
                let target_db = target_db.read().await;
                target_db.get(key).await.unwrap()
            };
            assert_eq!(synced_value, Some(*expected_value));
            assert_eq!(target_value, Some(*expected_value));
        }

        // Clean up both databases; the Arc must be uniquely owned by now.
        got_db.destroy().await.unwrap();
        let target_db = match Arc::try_unwrap(target_db) {
            Ok(rw_lock) => rw_lock.into_inner(),
            Err(_) => panic!("Failed to unwrap Arc - still has references"),
        };
        target_db.destroy().await.unwrap();
    });
}
352
/// Syncs against a target that contains no `Set` operations, only a single commit carrying
/// metadata, and checks that bounds, root and metadata are reproduced.
#[test_traced("WARN")]
fn test_sync_empty_to_nonempty() {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // The target's only operation is a commit with metadata.
        let mut target_db = create_test_db(context.clone()).await;
        target_db.commit(Some(Sha256::fill(1))).await.unwrap();

        // Capture the target state to sync to.
        let target_op_count = target_db.op_count();
        let target_oldest_retained_loc = target_db.oldest_retained_loc().unwrap();
        let mut hasher = test_hasher();
        let target_root = target_db.root(&mut hasher);

        let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
        let target_db = Arc::new(RwLock::new(target_db));
        let config = Config {
            db_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: target_root,
                range: target_oldest_retained_loc..target_op_count,
            },
            context: context.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

        // The synced database must match the target's bounds and root.
        let mut hasher = test_hasher();
        assert_eq!(got_db.op_count(), target_op_count);
        assert_eq!(
            got_db.oldest_retained_loc().unwrap(),
            target_oldest_retained_loc
        );
        assert_eq!(got_db.root(&mut hasher), target_root);
        // The metadata is reported at location 0 with the committed value.
        assert_eq!(
            got_db.get_metadata().await.unwrap(),
            Some((Location::new_unchecked(0), Some(Sha256::fill(1))))
        );

        // Clean up both databases; the Arc must be uniquely owned by now.
        got_db.destroy().await.unwrap();
        let target_db = match Arc::try_unwrap(target_db) {
            Ok(rw_lock) => rw_lock.into_inner(),
            Err(_) => panic!("Failed to unwrap Arc - still has references"),
        };
        target_db.destroy().await.unwrap();
    });
}
405
/// Checks that a synced database keeps its state across `close` and a subsequent `init`
/// with the same configuration.
#[test_traced("WARN")]
fn test_sync_database_persistence() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Populate and commit the target database.
        let mut target_db = create_test_db(context.clone()).await;
        let target_ops = create_test_ops(10);
        apply_ops(&mut target_db, target_ops.clone()).await;
        target_db.commit(Some(Sha256::fill(0))).await.unwrap();

        // Capture the target state to sync to.
        let mut hasher = test_hasher();
        let target_root = target_db.root(&mut hasher);
        let lower_bound = target_db.oldest_retained_loc().unwrap();
        let op_count = target_db.op_count();

        // Keep a copy of the context and config: both are needed again to reopen the
        // database after the sync consumes the originals.
        let db_config = create_sync_config("persistence_test");
        let context_clone = context.clone();
        let target_db = Arc::new(RwLock::new(target_db));
        let config = Config {
            db_config: db_config.clone(),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: target_root,
                range: lower_bound..op_count,
            },
            context,
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

        // The sync must have reached the target root.
        let mut hasher = test_hasher();
        assert_eq!(synced_db.root(&mut hasher), target_root);

        // Record the state before closing.
        let expected_root = synced_db.root(&mut hasher);
        let expected_op_count = synced_db.op_count();
        let expected_oldest_retained_loc = synced_db.oldest_retained_loc().unwrap();

        // Close the database and reopen it from the same configuration.
        synced_db.close().await.unwrap();
        let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
            .await
            .unwrap();

        // The reopened database must report the same state as before closing.
        let mut hasher = test_hasher();
        assert_eq!(reopened_db.root(&mut hasher), expected_root);
        assert_eq!(reopened_db.op_count(), expected_op_count);
        assert_eq!(
            reopened_db.oldest_retained_loc().unwrap(),
            expected_oldest_retained_loc
        );

        // Every value set on the target must still be readable.
        for op in &target_ops {
            if let Operation::Set(key, value) = op {
                let stored_value = reopened_db.get(key).await.unwrap();
                assert_eq!(stored_value, Some(*value));
            }
        }

        // Clean up both databases; the Arc must be uniquely owned by now.
        reopened_db.destroy().await.unwrap();
        let target_db = match Arc::try_unwrap(target_db) {
            Ok(rw_lock) => rw_lock.into_inner(),
            Err(_) => panic!("Failed to unwrap Arc - still has references"),
        };
        target_db.destroy().await.unwrap();
    });
}
482
/// Sends a target update while a sync is in progress and checks that the client ends up at
/// the updated target.
#[test_traced("WARN")]
fn test_target_update_during_sync() {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Populate the target and commit; this is the initial sync target.
        let mut target_db = create_test_db(context.clone()).await;
        let initial_ops = create_test_ops(50);
        apply_ops(&mut target_db, initial_ops.clone()).await;
        target_db.commit(None).await.unwrap();

        // Capture the initial target state.
        let mut hasher = test_hasher();
        let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
        let initial_upper_bound = target_db.op_count();
        let initial_root = target_db.root(&mut hasher);

        // Extend the target so there is a newer state to move to. Note that
        // `create_test_ops` always uses the same seed, so these 25 operations repeat the
        // first 25 of `initial_ops`.
        let additional_ops = create_test_ops(25);
        apply_ops(&mut target_db, additional_ops.clone()).await;
        target_db.commit(None).await.unwrap();
        let final_upper_bound = target_db.op_count();
        let final_root = target_db.root(&mut hasher);

        let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));

        // Channel used to deliver the target update to the engine.
        let (mut update_sender, update_receiver) = mpsc::channel(1);
        let client = {
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
                target: Target {
                    root: initial_root,
                    range: initial_lower_bound..initial_upper_bound,
                },
                resolver: target_db.clone(),
                fetch_batch_size: NZU64!(2),
                max_outstanding_requests: 10,
                apply_batch_size: 1024,
                update_rx: Some(update_receiver),
            };
            let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
            // Step the engine until its journal size exceeds the initial lower bound.
            // The engine must not finish before that point.
            loop {
                client = match client.step().await.unwrap() {
                    NextStep::Continue(new_client) => new_client,
                    NextStep::Complete(_) => panic!("client should not be complete"),
                };
                let log_size = client.journal().size();
                if log_size > initial_lower_bound {
                    break client;
                }
            }
        };

        // Move the target to the extended state (same lower bound, larger upper bound).
        update_sender
            .send(Target {
                root: final_root,
                range: initial_lower_bound..final_upper_bound,
            })
            .await
            .unwrap();

        // Run the sync to completion.
        let synced_db = client.sync().await.unwrap();

        // The client must have reached the updated root.
        let mut hasher = test_hasher();
        assert_eq!(synced_db.root(&mut hasher), final_root);

        // Compare the synced database against the target directly.
        let target_db = match Arc::try_unwrap(target_db) {
            Ok(rw_lock) => rw_lock.into_inner(),
            Err(_) => panic!("Failed to unwrap Arc - still has references"),
        };
        {
            assert_eq!(synced_db.op_count(), target_db.op_count());
            assert_eq!(
                synced_db.oldest_retained_loc(),
                target_db.oldest_retained_loc()
            );
            assert_eq!(synced_db.root(&mut hasher), target_db.root(&mut hasher));
        }

        // Every value set on the target must be readable from the synced database.
        let all_ops = [initial_ops, additional_ops].concat();
        for op in &all_ops {
            if let Operation::Set(key, value) = op {
                let synced_value = synced_db.get(key).await.unwrap();
                assert_eq!(synced_value, Some(*value));
            }
        }

        synced_db.destroy().await.unwrap();
        target_db.destroy().await.unwrap();
    });
}
583
/// A sync target whose range has identical start and end must be rejected with
/// `EngineError::InvalidTarget` reporting both bounds.
#[test]
fn test_sync_invalid_bounds() {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let target_db = create_test_db(context.clone()).await;
        let client_config =
            create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));

        // Both ends of the requested range sit at the same location.
        let bound = Location::new_unchecked(31);
        let engine_config = Config {
            db_config: client_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: sha256::Digest::from([1u8; 32]),
                range: bound..bound,
            },
            context,
            resolver: Arc::new(RwLock::new(target_db)),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };

        let outcome: Result<ImmutableSyncTest, _> = sync::sync(engine_config).await;
        let Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
            lower_bound_pos,
            upper_bound_pos,
        })) = outcome
        else {
            panic!("Expected InvalidTarget error");
        };

        // The error must echo the rejected bounds.
        assert_eq!(lower_bound_pos, bound);
        assert_eq!(upper_bound_pos, bound);
    });
}
617
/// Syncs to an earlier state of the target: the target receives one more operation after
/// the sync target (root and range) was captured, and the client must still end up at the
/// captured state.
#[test]
fn test_sync_subset_of_target_database() {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let mut target_db = create_test_db(context.clone()).await;
        let all_ops = create_test_ops(30);
        let (synced_ops, later_ops) = all_ops.split_at(29);

        // Apply all but the last operation, then capture the state to sync to.
        apply_ops(&mut target_db, synced_ops.to_vec()).await;
        target_db.commit(None).await.unwrap();

        let expected_root = target_db.root(&mut test_hasher());
        let range_start = target_db.oldest_retained_loc().unwrap();
        let range_end = target_db.op_count();

        // Advance the target past the captured state.
        apply_ops(&mut target_db, later_ops.to_vec()).await;
        target_db.commit(None).await.unwrap();

        let resolver = Arc::new(RwLock::new(target_db));
        let engine_config = Config {
            db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: expected_root,
                range: range_start..range_end,
            },
            context,
            resolver: resolver.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let synced_db: ImmutableSyncTest = sync::sync(engine_config).await.unwrap();

        // The client must sit at the captured state, not the target's latest state.
        assert_eq!(synced_db.root(&mut test_hasher()), expected_root);
        assert_eq!(synced_db.op_count(), range_end);

        // Clean up both databases.
        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(resolver)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .into_inner()
            .destroy()
            .await
            .unwrap();
    });
}
667
/// Syncs into a configuration whose storage already holds a closed database containing part
/// of the target's history, and checks the result matches the target.
#[test]
fn test_sync_use_existing_db_partial_match() {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        let original_ops = create_test_ops(50);

        // Create the target database and a second, local database with its own config.
        let mut target_db = create_test_db(context.clone()).await;
        let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
        let mut sync_db: ImmutableSyncTest =
            immutable::Immutable::init(context.clone(), sync_db_config.clone())
                .await
                .unwrap();

        // Give both databases the same operations and commit them.
        apply_ops(&mut target_db, original_ops.clone()).await;
        apply_ops(&mut sync_db, original_ops.clone()).await;
        target_db.commit(None).await.unwrap();
        sync_db.commit(None).await.unwrap();

        // Close the local database so the sync can reuse its config.
        sync_db.close().await.unwrap();

        // Advance only the target. `create_test_ops` always uses the same seed, so this
        // operation repeats the first entry of `original_ops`.
        let last_op = create_test_ops(1);
        apply_ops(&mut target_db, last_op.clone()).await;
        target_db.commit(None).await.unwrap();
        let mut hasher = test_hasher();
        let root = target_db.root(&mut hasher);
        let lower_bound = target_db.oldest_retained_loc().unwrap();
        let upper_bound = target_db.op_count();

        // Sync using the configuration of the previously closed local database.
        let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
        let config = Config {
            db_config: sync_db_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root,
                range: lower_bound..upper_bound,
            },
            context: context.clone(),
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

        // The synced database must match the target's latest state.
        let mut hasher = test_hasher();
        assert_eq!(sync_db.op_count(), upper_bound);
        assert_eq!(sync_db.root(&mut hasher), root);

        // Clean up both databases.
        sync_db.destroy().await.unwrap();
        let target_db =
            Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
        let inner = target_db.into_inner();
        inner.destroy().await.unwrap();
    });
}
731
/// Syncs into a configuration whose storage already holds a closed database with exactly
/// the same committed operations as the target, and checks the result matches the target.
#[test]
fn test_sync_use_existing_db_exact_match() {
    let runner = deterministic::Runner::default();
    runner.start(|mut context| async move {
        let shared_ops = create_test_ops(40);

        // Create the target database and a second, local database with its own config.
        let mut target_db = create_test_db(context.clone()).await;
        let local_config = create_sync_config(&format!("exact_{}", context.next_u64()));
        let mut local_db = ImmutableSyncTest::init(context.clone(), local_config.clone())
            .await
            .unwrap();

        // Give both databases identical contents and commit them.
        apply_ops(&mut target_db, shared_ops.clone()).await;
        apply_ops(&mut local_db, shared_ops).await;
        target_db.commit(None).await.unwrap();
        local_db.commit(None).await.unwrap();

        // Close the local database so the sync can reuse its config.
        local_db.close().await.unwrap();

        // Capture the target state to sync to.
        let expected_root = target_db.root(&mut test_hasher());
        let range_start = target_db.oldest_retained_loc().unwrap();
        let range_end = target_db.op_count();

        // Sync using the configuration of the previously closed local database.
        let resolver = Arc::new(RwLock::new(target_db));
        let engine_config = Config {
            db_config: local_config,
            fetch_batch_size: NZU64!(10),
            target: Target {
                root: expected_root,
                range: range_start..range_end,
            },
            context,
            resolver: resolver.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: None,
        };
        let synced_db: ImmutableSyncTest = sync::sync(engine_config).await.unwrap();

        // The synced database must match the target.
        assert_eq!(synced_db.op_count(), range_end);
        assert_eq!(synced_db.root(&mut test_hasher()), expected_root);

        // Clean up both databases.
        synced_db.destroy().await.unwrap();
        Arc::try_unwrap(resolver)
            .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
            .into_inner()
            .destroy()
            .await
            .unwrap();
    });
}
790
/// A target update that lowers the lower bound must make the engine fail with
/// `SyncTargetMovedBackward`.
#[test_traced("WARN")]
fn test_target_update_lower_bound_decrease() {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Populate and commit the target database.
        let mut target_db = create_test_db(context.clone()).await;
        let target_ops = create_test_ops(100);
        apply_ops(&mut target_db, target_ops).await;
        target_db.commit(None).await.unwrap();

        // Prune the target; the update below subtracts one from the resulting lower
        // bound (the `checked_sub(1).unwrap()` would panic if it were zero).
        target_db.prune(Location::new_unchecked(10)).await.unwrap();

        // Capture the initial target state.
        let mut hasher = test_hasher();
        let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
        let initial_upper_bound = target_db.op_count();
        let initial_root = target_db.root(&mut hasher);

        // Create the engine with a channel for target updates.
        let (mut update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
        let config = Config {
            context: context.clone(),
            db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
            fetch_batch_size: NZU64!(5),
            target: Target {
                root: initial_root,
                range: initial_lower_bound..initial_upper_bound,
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 10,
            update_rx: Some(update_receiver),
        };
        let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();

        // Send an update whose lower bound is one less than the initial one.
        update_sender
            .send(Target {
                root: initial_root,
                range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
            })
            .await
            .unwrap();

        // The next engine step must reject the update.
        let result = client.step().await;
        assert!(matches!(
            result,
            Err(sync::Error::Engine(
                sync::EngineError::SyncTargetMovedBackward { .. }
            ))
        ));

        // Clean up the target database.
        let target_db =
            Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
        let inner = target_db.into_inner();
        inner.destroy().await.unwrap();
    });
}
851
852 #[test_traced("WARN")]
854 fn test_target_update_upper_bound_decrease() {
855 let executor = deterministic::Runner::default();
856 executor.start(|mut context| async move {
857 let mut target_db = create_test_db(context.clone()).await;
859 let target_ops = create_test_ops(50);
860 apply_ops(&mut target_db, target_ops).await;
861 target_db.commit(None).await.unwrap();
862
863 let mut hasher = test_hasher();
865 let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
866 let initial_upper_bound = target_db.op_count();
867 let initial_root = target_db.root(&mut hasher);
868
869 let (mut update_sender, update_receiver) = mpsc::channel(1);
871 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
872 let config = Config {
873 context: context.clone(),
874 db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
875 fetch_batch_size: NZU64!(5),
876 target: Target {
877 root: initial_root,
878 range: initial_lower_bound..initial_upper_bound,
879 },
880 resolver: target_db.clone(),
881 apply_batch_size: 1024,
882 max_outstanding_requests: 10,
883 update_rx: Some(update_receiver),
884 };
885 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
886
887 update_sender
889 .send(Target {
890 root: initial_root,
891 range: initial_lower_bound..(initial_upper_bound - 1),
892 })
893 .await
894 .unwrap();
895
896 let result = client.step().await;
897 assert!(matches!(
898 result,
899 Err(sync::Error::Engine(
900 sync::EngineError::SyncTargetMovedBackward { .. }
901 ))
902 ));
903
904 let target_db =
905 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
906 let inner = target_db.into_inner();
907 inner.destroy().await.unwrap();
908 });
909 }
910
/// A target update that raises both bounds is accepted: the client ends up at the updated
/// root, upper bound and lower bound.
#[test_traced("WARN")]
fn test_target_update_bounds_increase() {
    let executor = deterministic::Runner::default();
    executor.start(|mut context| async move {
        // Populate and commit the target database.
        let mut target_db = create_test_db(context.clone()).await;
        let target_ops = create_test_ops(100);
        apply_ops(&mut target_db, target_ops.clone()).await;
        target_db.commit(None).await.unwrap();

        // Capture the initial target state.
        let mut hasher = test_hasher();
        let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
        let initial_upper_bound = target_db.op_count();
        let initial_root = target_db.root(&mut hasher);

        // Grow the target with more operations...
        let more_ops = create_test_ops(5);
        apply_ops(&mut target_db, more_ops.clone()).await;
        target_db.commit(None).await.unwrap();

        // ...and prune it, then commit again.
        target_db.prune(Location::new_unchecked(10)).await.unwrap();
        target_db.commit(None).await.unwrap();

        // Capture the final target state.
        let mut hasher = test_hasher();
        let final_lower_bound = target_db.oldest_retained_loc().unwrap();
        let final_upper_bound = target_db.op_count();
        let final_root = target_db.root(&mut hasher);

        // Sanity check: both bounds actually changed.
        assert_ne!(final_lower_bound, initial_lower_bound);
        assert_ne!(final_upper_bound, initial_upper_bound);

        // Configure the sync with the initial target and a channel for updates.
        let (mut update_sender, update_receiver) = mpsc::channel(1);
        let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
        let config = Config {
            context: context.clone(),
            db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
            fetch_batch_size: NZU64!(1),
            target: Target {
                root: initial_root,
                range: initial_lower_bound..initial_upper_bound,
            },
            resolver: target_db.clone(),
            apply_batch_size: 1024,
            max_outstanding_requests: 1,
            update_rx: Some(update_receiver),
        };

        // Queue the update before the sync starts, so it is already waiting in the channel.
        update_sender
            .send(Target {
                root: final_root,
                range: final_lower_bound..final_upper_bound,
            })
            .await
            .unwrap();

        // Run the sync to completion.
        let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

        // The client must have reached the updated target.
        let mut hasher = test_hasher();
        assert_eq!(synced_db.root(&mut hasher), final_root);
        assert_eq!(synced_db.op_count(), final_upper_bound);
        assert_eq!(synced_db.oldest_retained_loc().unwrap(), final_lower_bound);

        // Clean up both databases.
        synced_db.destroy().await.unwrap();
        let target_db =
            Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
        let inner = target_db.into_inner();
        inner.destroy().await.unwrap();
    });
}
988
989 #[test_traced("WARN")]
991 fn test_target_update_invalid_bounds() {
992 let executor = deterministic::Runner::default();
993 executor.start(|mut context| async move {
994 let mut target_db = create_test_db(context.clone()).await;
996 let target_ops = create_test_ops(25);
997 apply_ops(&mut target_db, target_ops).await;
998 target_db.commit(None).await.unwrap();
999
1000 let mut hasher = test_hasher();
1002 let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
1003 let initial_upper_bound = target_db.op_count();
1004 let initial_root = target_db.root(&mut hasher);
1005
1006 let (mut update_sender, update_receiver) = mpsc::channel(1);
1008 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1009 let config = Config {
1010 context: context.clone(),
1011 db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
1012 fetch_batch_size: NZU64!(5),
1013 target: Target {
1014 root: initial_root,
1015 range: initial_lower_bound..initial_upper_bound,
1016 },
1017 resolver: target_db.clone(),
1018 apply_batch_size: 1024,
1019 max_outstanding_requests: 10,
1020 update_rx: Some(update_receiver),
1021 };
1022 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
1023
1024 update_sender
1026 .send(Target {
1027 root: initial_root,
1028 range: initial_upper_bound..initial_lower_bound,
1029 })
1030 .await
1031 .unwrap();
1032
1033 let result = client.step().await;
1034 assert!(matches!(
1035 result,
1036 Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1037 ));
1038
1039 let target_db =
1040 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1041 let inner = target_db.into_inner();
1042 inner.destroy().await.unwrap();
1043 });
1044 }
1045
1046 #[test_traced("WARN")]
1048 fn test_target_update_on_done_client() {
1049 let executor = deterministic::Runner::default();
1050 executor.start(|mut context| async move {
1051 let mut target_db = create_test_db(context.clone()).await;
1053 let target_ops = create_test_ops(10);
1054 apply_ops(&mut target_db, target_ops).await;
1055 target_db.commit(None).await.unwrap();
1056
1057 let mut hasher = test_hasher();
1059 let lower_bound = target_db.oldest_retained_loc().unwrap();
1060 let upper_bound = target_db.op_count();
1061 let root = target_db.root(&mut hasher);
1062
1063 let (mut update_sender, update_receiver) = mpsc::channel(1);
1065 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1066 let config = Config {
1067 context: context.clone(),
1068 db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1069 fetch_batch_size: NZU64!(20),
1070 target: Target {
1071 root,
1072 range: lower_bound..upper_bound,
1073 },
1074 resolver: target_db.clone(),
1075 apply_batch_size: 1024,
1076 max_outstanding_requests: 10,
1077 update_rx: Some(update_receiver),
1078 };
1079
1080 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1082
1083 let _ = update_sender
1085 .send(Target {
1086 root: sha256::Digest::from([2u8; 32]),
1087 range: lower_bound + 1..upper_bound + 1,
1088 })
1089 .await;
1090
1091 let mut hasher = test_hasher();
1093 assert_eq!(synced_db.root(&mut hasher), root);
1094 assert_eq!(synced_db.op_count(), upper_bound);
1095 assert_eq!(synced_db.oldest_retained_loc().unwrap(), lower_bound);
1096
1097 synced_db.destroy().await.unwrap();
1098 Arc::try_unwrap(target_db)
1099 .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1100 .into_inner()
1101 .destroy()
1102 .await
1103 .unwrap();
1104 });
1105 }
1106}