1use crate::{
2 adb::{
3 any::variable::sync::init_journal,
4 immutable,
5 sync::{self, Journal as _},
6 },
7 journal::variable,
8 mmr::hasher::Standard,
9 store::operation::Variable,
10 translator::Translator,
11};
12use commonware_codec::Codec;
13use commonware_cryptography::Hasher;
14use commonware_runtime::{Clock, Metrics, Storage};
15use commonware_utils::{Array, NZUsize};
16use futures::{pin_mut, StreamExt};
17use std::num::NonZeroU64;
18
19mod journal;
20
21async fn compute_size<E, K, V>(
24 journal: &variable::Journal<E, Variable<K, V>>,
25 items_per_section: NonZeroU64,
26 lower_bound: u64,
27 upper_bound: u64,
28) -> Result<u64, crate::journal::Error>
29where
30 E: Storage + Metrics,
31 K: Array,
32 V: Codec,
33{
34 let items_per_section = items_per_section.get();
35 let mut size = lower_bound;
36 let mut current_section: Option<u64> = None;
37 let mut index_in_section: u64 = 0;
38 let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
39 pin_mut!(stream);
40 while let Some(item) = stream.next().await {
41 match item {
42 Ok((section, _offset, _size, _op)) => {
43 if current_section != Some(section) {
44 current_section = Some(section);
45 index_in_section = 0;
46 }
47 let loc = section
48 .saturating_mul(items_per_section)
49 .saturating_add(index_in_section);
50 if loc < lower_bound {
51 index_in_section = index_in_section.saturating_add(1);
52 continue;
53 }
54 if loc > upper_bound {
55 return Ok(size);
56 }
57 size = loc.saturating_add(1);
58 index_in_section = index_in_section.saturating_add(1);
59 }
60 Err(e) => return Err(e),
61 }
62 }
63 Ok(size)
64}
65
66impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
67where
68 E: Storage + Clock + Metrics,
69 K: Array,
70 V: Codec,
71 H: Hasher,
72 T: Translator,
73{
74 type Op = Variable<K, V>;
75 type Journal = journal::Journal<E, K, V>;
76 type Hasher = H;
77 type Error = crate::adb::Error;
78 type Config = immutable::Config<T, V::Cfg>;
79 type Digest = H::Digest;
80 type Context = E;
81
82 async fn create_journal(
83 context: Self::Context,
84 config: &Self::Config,
85 lower_bound_loc: u64,
86 upper_bound_loc: u64,
87 ) -> Result<Self::Journal, <Self::Journal as sync::Journal>::Error> {
88 let journal = init_journal(
90 context.with_label("log"),
91 variable::Config {
92 partition: config.log_journal_partition.clone(),
93 compression: config.log_compression,
94 codec_config: config.log_codec_config.clone(),
95 write_buffer: config.log_write_buffer,
96 buffer_pool: config.buffer_pool.clone(),
97 },
98 lower_bound_loc,
99 upper_bound_loc,
100 config.log_items_per_section,
101 )
102 .await?;
103
104 let size = compute_size(
106 &journal,
107 config.log_items_per_section,
108 lower_bound_loc,
109 upper_bound_loc,
110 )
111 .await?;
112
113 Ok(journal::Journal::new(
114 journal,
115 config.log_items_per_section,
116 size,
117 ))
118 }
119
120 async fn from_sync_result(
136 context: Self::Context,
137 db_config: Self::Config,
138 journal: Self::Journal,
139 pinned_nodes: Option<Vec<Self::Digest>>,
140 lower_bound: u64,
141 upper_bound: u64,
142 apply_batch_size: usize,
143 ) -> Result<Self, Self::Error> {
144 let journal = journal.into_inner();
145 let sync_config = Config {
146 db_config,
147 log: journal,
148 lower_bound,
149 upper_bound,
150 pinned_nodes,
151 apply_batch_size,
152 };
153 Self::init_synced(context, sync_config).await
154 }
155
156 fn root(&self) -> Self::Digest {
157 let mut hasher = Standard::<H>::new();
158 self.root(&mut hasher)
159 }
160
161 async fn resize_journal(
162 journal: Self::Journal,
163 context: Self::Context,
164 config: &Self::Config,
165 lower_bound: u64,
166 upper_bound: u64,
167 ) -> Result<Self::Journal, Self::Error> {
168 let size = journal.size().await.map_err(crate::adb::Error::from)?;
169
170 if size <= lower_bound {
171 journal.close().await.map_err(crate::adb::Error::from)?;
173 Self::create_journal(context, config, lower_bound, upper_bound)
174 .await
175 .map_err(crate::adb::Error::from)
176 } else {
177 let mut variable_journal = journal.into_inner();
179
180 let items_per_section = config.log_items_per_section.get();
182 let lower_section = lower_bound / items_per_section;
183 variable_journal
184 .prune(lower_section)
185 .await
186 .map_err(crate::adb::Error::from)?;
187
188 let size = compute_size(
190 &variable_journal,
191 config.log_items_per_section,
192 lower_bound,
193 upper_bound,
194 )
195 .await
196 .map_err(crate::adb::Error::from)?;
197
198 Ok(journal::Journal::new(
199 variable_journal,
200 config.log_items_per_section,
201 size,
202 ))
203 }
204 }
205}
206
207pub struct Config<E, K, V, T, D, C>
209where
210 E: Storage + Metrics,
211 K: Array,
212 V: Codec,
213 T: Translator,
214 D: commonware_cryptography::Digest,
215{
216 pub db_config: immutable::Config<T, C>,
218
219 pub log: variable::Journal<E, Variable<K, V>>,
223
224 pub lower_bound: u64,
226
227 pub upper_bound: u64,
229
230 pub pinned_nodes: Option<Vec<D>>,
235
236 pub apply_batch_size: usize,
240}
241
242#[cfg(test)]
243mod tests {
244 use crate::{
245 adb::{
246 immutable,
247 sync::{
248 self,
249 engine::{Config, NextStep},
250 Engine, Journal, Target,
251 },
252 },
253 mmr::hasher::Standard,
254 store::operation::Variable,
255 translator::TwoCap,
256 };
257 use commonware_cryptography::{sha256, Digest, Sha256};
258 use commonware_macros::test_traced;
259 use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
260 use commonware_utils::{NZUsize, NZU64};
261 use futures::{channel::mpsc, SinkExt as _};
262 use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
263 use std::{
264 collections::HashMap,
265 num::{NonZeroU64, NonZeroUsize},
266 sync::Arc,
267 };
268 use test_case::test_case;
269
270 type ImmutableSyncTest = immutable::Immutable<
272 deterministic::Context,
273 sha256::Digest,
274 sha256::Digest,
275 Sha256,
276 crate::translator::TwoCap,
277 >;
278
279 fn test_hasher() -> Standard<Sha256> {
280 Standard::<Sha256>::new()
281 }
282
283 fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
285 const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
286 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
287 const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
288
289 immutable::Config {
290 mmr_journal_partition: format!("journal_{suffix}"),
291 mmr_metadata_partition: format!("metadata_{suffix}"),
292 mmr_items_per_blob: NZU64!(11),
293 mmr_write_buffer: NZUsize!(1024),
294 log_journal_partition: format!("log_journal_{suffix}"),
295 log_items_per_section: ITEMS_PER_SECTION,
296 log_compression: None,
297 log_codec_config: (),
298 log_write_buffer: NZUsize!(1024),
299 locations_journal_partition: format!("locations_journal_{suffix}"),
300 locations_items_per_blob: NZU64!(7),
301 translator: TwoCap,
302 thread_pool: None,
303 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
304 }
305 }
306
307 async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
309 let seed = context.next_u64();
310 let config = create_sync_config(&format!("sync_test_{seed}"));
311 ImmutableSyncTest::init(context, config).await.unwrap()
312 }
313
314 fn create_test_ops(n: usize) -> Vec<Variable<sha256::Digest, sha256::Digest>> {
317 let mut rng = StdRng::seed_from_u64(1337);
318 let mut ops = Vec::new();
319 for _i in 0..n {
320 let key = sha256::Digest::random(&mut rng);
321 let value = sha256::Digest::random(&mut rng);
322 ops.push(Variable::Set(key, value));
323 }
324 ops
325 }
326
327 async fn apply_ops(
329 db: &mut ImmutableSyncTest,
330 ops: Vec<Variable<sha256::Digest, sha256::Digest>>,
331 ) {
332 for op in ops {
333 match op {
334 Variable::Set(key, value) => {
335 db.set(key, value).await.unwrap();
336 }
337 Variable::Commit(metadata) => {
338 db.commit(metadata).await.unwrap();
339 }
340 _ => {}
341 }
342 }
343 }
344
345 #[test_case(1, NZU64!(1); "singleton db with batch size == 1")]
346 #[test_case(1, NZU64!(2); "singleton db with batch size > db size")]
347 #[test_case(100, NZU64!(1); "db with batch size 1")]
348 #[test_case(100, NZU64!(3); "db size not evenly divided by batch size")]
349 #[test_case(100, NZU64!(99); "db size not evenly divided by batch size; different batch size")]
350 #[test_case(100, NZU64!(50); "db size divided by batch size")]
351 #[test_case(100, NZU64!(100); "db size == batch size")]
352 #[test_case(100, NZU64!(101); "batch size > db size")]
353 fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64) {
354 let executor = deterministic::Runner::default();
355 executor.start(|mut context| async move {
356 let mut target_db = create_test_db(context.clone()).await;
357 let target_db_ops = create_test_ops(target_db_ops);
358 apply_ops(&mut target_db, target_db_ops.clone()).await;
359 let metadata = Some(Sha256::fill(1));
360 target_db.commit(metadata).await.unwrap();
361 let target_op_count = target_db.op_count();
362 let target_oldest_retained_loc = target_db.oldest_retained_loc;
363 let mut hasher = test_hasher();
364 let target_root = target_db.root(&mut hasher);
365
366 let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
368 for op in &target_db_ops {
369 if let Variable::Set(key, value) = op {
370 expected_kvs.insert(*key, *value);
371 }
372 }
373
374 let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
375
376 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
377 let config = Config {
378 db_config: db_config.clone(),
379 fetch_batch_size,
380 target: Target {
381 root: target_root,
382 lower_bound_ops: target_oldest_retained_loc,
383 upper_bound_ops: target_op_count - 1, },
385 context: context.clone(),
386 resolver: target_db.clone(),
387 apply_batch_size: 1024,
388 max_outstanding_requests: 1,
389 update_rx: None,
390 };
391 let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
392
393 let mut hasher = test_hasher();
395 assert_eq!(got_db.op_count(), target_op_count);
396 assert_eq!(got_db.oldest_retained_loc, target_oldest_retained_loc);
397
398 assert_eq!(got_db.root(&mut hasher), target_root);
400
401 for (key, expected_value) in &expected_kvs {
403 let synced_value = got_db.get(key).await.unwrap();
404 assert_eq!(synced_value, Some(*expected_value));
405 }
406
407 let mut new_ops = Vec::new();
409 let mut rng = StdRng::seed_from_u64(42);
410 let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
411 for _i in 0..expected_kvs.len() {
412 let key = sha256::Digest::random(&mut rng);
413 let value = sha256::Digest::random(&mut rng);
414 new_ops.push(Variable::Set(key, value));
415 new_kvs.insert(key, value);
416 }
417
418 apply_ops(&mut got_db, new_ops.clone()).await;
420 {
421 let mut target_db = target_db.write().await;
422 apply_ops(&mut target_db, new_ops).await;
423 }
424
425 for (key, expected_value) in &new_kvs {
427 let synced_value = got_db.get(key).await.unwrap();
428 let target_value = {
429 let target_db = target_db.read().await;
430 target_db.get(key).await.unwrap()
431 };
432 assert_eq!(synced_value, Some(*expected_value));
433 assert_eq!(target_value, Some(*expected_value));
434 }
435
436 got_db.destroy().await.unwrap();
437 let target_db = match Arc::try_unwrap(target_db) {
438 Ok(rw_lock) => rw_lock.into_inner(),
439 Err(_) => panic!("Failed to unwrap Arc - still has references"),
440 };
441 target_db.destroy().await.unwrap();
442 });
443 }
444
445 #[test_traced("WARN")]
447 fn test_sync_empty_to_nonempty() {
448 let executor = deterministic::Runner::default();
449 executor.start(|mut context| async move {
450 let mut target_db = create_test_db(context.clone()).await;
452 target_db.commit(Some(Sha256::fill(1))).await.unwrap(); let target_op_count = target_db.op_count();
455 let target_oldest_retained_loc = target_db.oldest_retained_loc;
456 let mut hasher = test_hasher();
457 let target_root = target_db.root(&mut hasher);
458
459 let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
460 let target_db = Arc::new(RwLock::new(target_db));
461 let config = Config {
462 db_config,
463 fetch_batch_size: NZU64!(10),
464 target: Target {
465 root: target_root,
466 lower_bound_ops: target_oldest_retained_loc,
467 upper_bound_ops: target_op_count - 1,
468 },
469 context: context.clone(),
470 resolver: target_db.clone(),
471 apply_batch_size: 1024,
472 max_outstanding_requests: 1,
473 update_rx: None,
474 };
475 let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
476
477 let mut hasher = test_hasher();
479 assert_eq!(got_db.op_count(), target_op_count);
480 assert_eq!(got_db.oldest_retained_loc, target_oldest_retained_loc);
481 assert_eq!(got_db.root(&mut hasher), target_root);
482 assert_eq!(
483 got_db.get_metadata().await.unwrap(),
484 Some((0, Some(Sha256::fill(1))))
485 );
486
487 got_db.destroy().await.unwrap();
488 let target_db = match Arc::try_unwrap(target_db) {
489 Ok(rw_lock) => rw_lock.into_inner(),
490 Err(_) => panic!("Failed to unwrap Arc - still has references"),
491 };
492 target_db.destroy().await.unwrap();
493 });
494 }
495
496 #[test_traced("WARN")]
498 fn test_sync_database_persistence() {
499 let executor = deterministic::Runner::default();
500 executor.start(|context| async move {
501 let mut target_db = create_test_db(context.clone()).await;
503 let target_ops = create_test_ops(10);
504 apply_ops(&mut target_db, target_ops.clone()).await;
505 target_db.commit(Some(Sha256::fill(0))).await.unwrap();
506
507 let mut hasher = test_hasher();
509 let target_root = target_db.root(&mut hasher);
510 let lower_bound = target_db.oldest_retained_loc;
511 let upper_bound = target_db.op_count() - 1;
512
513 let db_config = create_sync_config("persistence_test");
515 let context_clone = context.clone();
516 let target_db = Arc::new(RwLock::new(target_db));
517 let config = Config {
518 db_config: db_config.clone(),
519 fetch_batch_size: NZU64!(5),
520 target: Target {
521 root: target_root,
522 lower_bound_ops: lower_bound,
523 upper_bound_ops: upper_bound,
524 },
525 context,
526 resolver: target_db.clone(),
527 apply_batch_size: 1024,
528 max_outstanding_requests: 1,
529 update_rx: None,
530 };
531 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
532
533 let mut hasher = test_hasher();
535 assert_eq!(synced_db.root(&mut hasher), target_root);
536
537 let expected_root = synced_db.root(&mut hasher);
539 let expected_op_count = synced_db.op_count();
540 let expected_oldest_retained_loc = synced_db.oldest_retained_loc;
541
542 synced_db.close().await.unwrap();
544 let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
545 .await
546 .unwrap();
547
548 let mut hasher = test_hasher();
550 assert_eq!(reopened_db.root(&mut hasher), expected_root);
551 assert_eq!(reopened_db.op_count(), expected_op_count);
552 assert_eq!(
553 reopened_db.oldest_retained_loc,
554 expected_oldest_retained_loc
555 );
556
557 for op in &target_ops {
559 if let Variable::Set(key, value) = op {
560 let stored_value = reopened_db.get(key).await.unwrap();
561 assert_eq!(stored_value, Some(*value));
562 }
563 }
564
565 reopened_db.destroy().await.unwrap();
566 let target_db = match Arc::try_unwrap(target_db) {
567 Ok(rw_lock) => rw_lock.into_inner(),
568 Err(_) => panic!("Failed to unwrap Arc - still has references"),
569 };
570 target_db.destroy().await.unwrap();
571 });
572 }
573
574 #[test_traced("WARN")]
576 fn test_target_update_during_sync() {
577 let executor = deterministic::Runner::default();
578 executor.start(|mut context| async move {
579 let mut target_db = create_test_db(context.clone()).await;
581 let initial_ops = create_test_ops(50);
582 apply_ops(&mut target_db, initial_ops.clone()).await;
583 target_db.commit(None).await.unwrap();
584
585 let mut hasher = test_hasher();
587 let initial_lower_bound = target_db.oldest_retained_loc;
588 let initial_upper_bound = target_db.op_count() - 1;
589 let initial_root = target_db.root(&mut hasher);
590
591 let additional_ops = create_test_ops(25);
593 apply_ops(&mut target_db, additional_ops.clone()).await;
594 target_db.commit(None).await.unwrap();
595 let final_upper_bound = target_db.op_count() - 1;
596 let final_root = target_db.root(&mut hasher);
597
598 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
600
601 let (mut update_sender, update_receiver) = mpsc::channel(1);
603 let client = {
604 let config = Config {
605 context: context.clone(),
606 db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
607 target: Target {
608 root: initial_root,
609 lower_bound_ops: initial_lower_bound,
610 upper_bound_ops: initial_upper_bound,
611 },
612 resolver: target_db.clone(),
613 fetch_batch_size: NZU64!(2), max_outstanding_requests: 10,
615 apply_batch_size: 1024,
616 update_rx: Some(update_receiver),
617 };
618 let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
619 loop {
620 client = match client.step().await.unwrap() {
622 NextStep::Continue(new_client) => new_client,
623 NextStep::Complete(_) => panic!("client should not be complete"),
624 };
625 let log_size = client.journal().size().await.unwrap();
626 if log_size > initial_lower_bound {
627 break client;
628 }
629 }
630 };
631
632 update_sender
634 .send(Target {
635 root: final_root,
636 lower_bound_ops: initial_lower_bound,
637 upper_bound_ops: final_upper_bound,
638 })
639 .await
640 .unwrap();
641
642 let synced_db = client.sync().await.unwrap();
644
645 let mut hasher = test_hasher();
647 assert_eq!(synced_db.root(&mut hasher), final_root);
648
649 let target_db = match Arc::try_unwrap(target_db) {
651 Ok(rw_lock) => rw_lock.into_inner(),
652 Err(_) => panic!("Failed to unwrap Arc - still has references"),
653 };
654 {
655 assert_eq!(synced_db.op_count(), target_db.op_count());
656 assert_eq!(synced_db.oldest_retained_loc, target_db.oldest_retained_loc);
657 assert_eq!(synced_db.root(&mut hasher), target_db.root(&mut hasher));
658 }
659
660 let all_ops = [initial_ops, additional_ops].concat();
662 for op in &all_ops {
663 if let Variable::Set(key, value) = op {
664 let synced_value = synced_db.get(key).await.unwrap();
665 assert_eq!(synced_value, Some(*value));
666 }
667 }
668
669 synced_db.destroy().await.unwrap();
670 target_db.destroy().await.unwrap();
671 });
672 }
673
674 #[test]
676 fn test_sync_invalid_bounds() {
677 let executor = deterministic::Runner::default();
678 executor.start(|mut context| async move {
679 let target_db = create_test_db(context.clone()).await;
680 let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
681 let config = Config {
682 db_config,
683 fetch_batch_size: NZU64!(10),
684 target: Target {
685 root: sha256::Digest::from([1u8; 32]),
686 lower_bound_ops: 31,
687 upper_bound_ops: 30,
688 },
689 context,
690 resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
691 apply_batch_size: 1024,
692 max_outstanding_requests: 1,
693 update_rx: None,
694 };
695 let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
696 assert!(matches!(
697 result,
698 Err(sync::Error::InvalidTarget {
699 lower_bound_pos: 31,
700 upper_bound_pos: 30,
701 })
702 ));
703 });
704 }
705
706 #[test]
709 fn test_sync_subset_of_target_database() {
710 let executor = deterministic::Runner::default();
711 executor.start(|mut context| async move {
712 let mut target_db = create_test_db(context.clone()).await;
713 let target_ops = create_test_ops(30);
714 apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
716 target_db.commit(None).await.unwrap();
717
718 let mut hasher = test_hasher();
719 let target_root = target_db.root(&mut hasher);
720 let lower_bound_ops = target_db.oldest_retained_loc;
721 let upper_bound_ops = target_db.op_count() - 1; apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
725 target_db.commit(None).await.unwrap();
726
727 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
728 let config = Config {
729 db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
730 fetch_batch_size: NZU64!(10),
731 target: Target {
732 root: target_root,
733 lower_bound_ops,
734 upper_bound_ops,
735 },
736 context,
737 resolver: target_db.clone(),
738 apply_batch_size: 1024,
739 max_outstanding_requests: 1,
740 update_rx: None,
741 };
742 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
743
744 let mut hasher = test_hasher();
746 assert_eq!(synced_db.root(&mut hasher), target_root);
747 assert_eq!(synced_db.op_count(), upper_bound_ops + 1);
748
749 synced_db.destroy().await.unwrap();
750 let target_db =
751 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
752 let inner = target_db.into_inner();
753 inner.destroy().await.unwrap();
754 });
755 }
756
757 #[test]
760 fn test_sync_use_existing_db_partial_match() {
761 let executor = deterministic::Runner::default();
762 executor.start(|mut context| async move {
763 let original_ops = create_test_ops(50);
764
765 let mut target_db = create_test_db(context.clone()).await;
767 let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
768 let mut sync_db: ImmutableSyncTest =
769 immutable::Immutable::init(context.clone(), sync_db_config.clone())
770 .await
771 .unwrap();
772
773 apply_ops(&mut target_db, original_ops.clone()).await;
775 apply_ops(&mut sync_db, original_ops.clone()).await;
776 target_db.commit(None).await.unwrap();
777 sync_db.commit(None).await.unwrap();
778
779 sync_db.close().await.unwrap();
781
782 let last_op = create_test_ops(1);
784 apply_ops(&mut target_db, last_op.clone()).await;
785 target_db.commit(None).await.unwrap();
786 let mut hasher = test_hasher();
787 let root = target_db.root(&mut hasher);
788 let lower_bound_ops = target_db.oldest_retained_loc;
789 let upper_bound_ops = target_db.op_count() - 1; let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
793 let config = Config {
794 db_config: sync_db_config, fetch_batch_size: NZU64!(10),
796 target: Target {
797 root,
798 lower_bound_ops,
799 upper_bound_ops,
800 },
801 context: context.clone(),
802 resolver: target_db.clone(),
803 apply_batch_size: 1024,
804 max_outstanding_requests: 1,
805 update_rx: None,
806 };
807 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
808
809 let mut hasher = test_hasher();
811 assert_eq!(sync_db.op_count(), upper_bound_ops + 1);
812 assert_eq!(sync_db.root(&mut hasher), root);
813
814 sync_db.destroy().await.unwrap();
815 let target_db =
816 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
817 let inner = target_db.into_inner();
818 inner.destroy().await.unwrap();
819 });
820 }
821
822 #[test]
824 fn test_sync_use_existing_db_exact_match() {
825 let executor = deterministic::Runner::default();
826 executor.start(|mut context| async move {
827 let target_ops = create_test_ops(40);
828
829 let mut target_db = create_test_db(context.clone()).await;
831 let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
832 let mut sync_db: ImmutableSyncTest =
833 immutable::Immutable::init(context.clone(), sync_config.clone())
834 .await
835 .unwrap();
836
837 apply_ops(&mut target_db, target_ops.clone()).await;
839 apply_ops(&mut sync_db, target_ops.clone()).await;
840 target_db.commit(None).await.unwrap();
841 sync_db.commit(None).await.unwrap();
842
843 sync_db.close().await.unwrap();
845
846 let mut hasher = test_hasher();
848 let root = target_db.root(&mut hasher);
849 let lower_bound_ops = target_db.oldest_retained_loc;
850 let upper_bound_ops = target_db.op_count() - 1;
851
852 let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
854 let config = Config {
855 db_config: sync_config,
856 fetch_batch_size: NZU64!(10),
857 target: Target {
858 root,
859 lower_bound_ops,
860 upper_bound_ops,
861 },
862 context,
863 resolver: resolver.clone(),
864 apply_batch_size: 1024,
865 max_outstanding_requests: 1,
866 update_rx: None,
867 };
868 let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
869
870 assert_eq!(sync_db.op_count(), upper_bound_ops + 1);
871 let mut hasher = test_hasher();
872 assert_eq!(sync_db.root(&mut hasher), root);
873
874 sync_db.destroy().await.unwrap();
875 let target_db =
876 Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
877 let inner = target_db.into_inner();
878 inner.destroy().await.unwrap();
879 });
880 }
881
882 #[test_traced("WARN")]
884 fn test_target_update_lower_bound_decrease() {
885 let executor = deterministic::Runner::default();
886 executor.start(|mut context| async move {
887 let mut target_db = create_test_db(context.clone()).await;
889 let target_ops = create_test_ops(100);
890 apply_ops(&mut target_db, target_ops).await;
891 target_db.commit(None).await.unwrap();
892
893 target_db.prune(10).await.unwrap();
894
895 let mut hasher = test_hasher();
897 let initial_lower_bound = target_db.oldest_retained_loc;
898 let initial_upper_bound = target_db.op_count() - 1;
899 let initial_root = target_db.root(&mut hasher);
900
901 let (mut update_sender, update_receiver) = mpsc::channel(1);
903 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
904 let config = Config {
905 context: context.clone(),
906 db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
907 fetch_batch_size: NZU64!(5),
908 target: Target {
909 root: initial_root,
910 lower_bound_ops: initial_lower_bound,
911 upper_bound_ops: initial_upper_bound,
912 },
913 resolver: target_db.clone(),
914 apply_batch_size: 1024,
915 max_outstanding_requests: 10,
916 update_rx: Some(update_receiver),
917 };
918 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
919
920 update_sender
922 .send(Target {
923 root: initial_root,
924 lower_bound_ops: initial_lower_bound.saturating_sub(1),
925 upper_bound_ops: initial_upper_bound,
926 })
927 .await
928 .unwrap();
929
930 let result = client.step().await;
931 assert!(matches!(
932 result,
933 Err(sync::Error::SyncTargetMovedBackward { .. })
934 ));
935
936 let target_db =
937 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
938 let inner = target_db.into_inner();
939 inner.destroy().await.unwrap();
940 });
941 }
942
943 #[test_traced("WARN")]
945 fn test_target_update_upper_bound_decrease() {
946 let executor = deterministic::Runner::default();
947 executor.start(|mut context| async move {
948 let mut target_db = create_test_db(context.clone()).await;
950 let target_ops = create_test_ops(50);
951 apply_ops(&mut target_db, target_ops).await;
952 target_db.commit(None).await.unwrap();
953
954 let mut hasher = test_hasher();
956 let initial_lower_bound = target_db.oldest_retained_loc;
957 let initial_upper_bound = target_db.op_count() - 1;
958 let initial_root = target_db.root(&mut hasher);
959
960 let (mut update_sender, update_receiver) = mpsc::channel(1);
962 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
963 let config = Config {
964 context: context.clone(),
965 db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
966 fetch_batch_size: NZU64!(5),
967 target: Target {
968 root: initial_root,
969 lower_bound_ops: initial_lower_bound,
970 upper_bound_ops: initial_upper_bound,
971 },
972 resolver: target_db.clone(),
973 apply_batch_size: 1024,
974 max_outstanding_requests: 10,
975 update_rx: Some(update_receiver),
976 };
977 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
978
979 update_sender
981 .send(Target {
982 root: initial_root,
983 lower_bound_ops: initial_lower_bound,
984 upper_bound_ops: initial_upper_bound.saturating_sub(1),
985 })
986 .await
987 .unwrap();
988
989 let result = client.step().await;
990 assert!(matches!(
991 result,
992 Err(sync::Error::SyncTargetMovedBackward { .. })
993 ));
994
995 let target_db =
996 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
997 let inner = target_db.into_inner();
998 inner.destroy().await.unwrap();
999 });
1000 }
1001
1002 #[test_traced("WARN")]
1004 fn test_target_update_bounds_increase() {
1005 let executor = deterministic::Runner::default();
1006 executor.start(|mut context| async move {
1007 let mut target_db = create_test_db(context.clone()).await;
1009 let target_ops = create_test_ops(100);
1010 apply_ops(&mut target_db, target_ops.clone()).await;
1011 target_db.commit(None).await.unwrap();
1012
1013 let mut hasher = test_hasher();
1015 let initial_lower_bound = target_db.oldest_retained_loc;
1016 let initial_upper_bound = target_db.op_count() - 1;
1017 let initial_root = target_db.root(&mut hasher);
1018
1019 let more_ops = create_test_ops(5);
1021 apply_ops(&mut target_db, more_ops.clone()).await;
1022 target_db.commit(None).await.unwrap();
1023
1024 target_db.prune(10).await.unwrap();
1025 target_db.commit(None).await.unwrap();
1026
1027 let mut hasher = test_hasher();
1029 let final_lower_bound = target_db.oldest_retained_loc;
1030 let final_upper_bound = target_db.op_count() - 1;
1031 let final_root = target_db.root(&mut hasher);
1032
1033 assert_ne!(final_lower_bound, initial_lower_bound);
1035 assert_ne!(final_upper_bound, initial_upper_bound);
1036
1037 let (mut update_sender, update_receiver) = mpsc::channel(1);
1039 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1040 let config = Config {
1041 context: context.clone(),
1042 db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
1043 fetch_batch_size: NZU64!(1),
1044 target: Target {
1045 root: initial_root,
1046 lower_bound_ops: initial_lower_bound,
1047 upper_bound_ops: initial_upper_bound,
1048 },
1049 resolver: target_db.clone(),
1050 apply_batch_size: 1024,
1051 max_outstanding_requests: 1,
1052 update_rx: Some(update_receiver),
1053 };
1054
1055 update_sender
1057 .send(Target {
1058 root: final_root,
1059 lower_bound_ops: final_lower_bound,
1060 upper_bound_ops: final_upper_bound,
1061 })
1062 .await
1063 .unwrap();
1064
1065 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1067
1068 let mut hasher = test_hasher();
1070 assert_eq!(synced_db.root(&mut hasher), final_root);
1071 assert_eq!(synced_db.op_count(), final_upper_bound + 1);
1072 assert_eq!(synced_db.oldest_retained_loc, final_lower_bound);
1073
1074 synced_db.destroy().await.unwrap();
1075 let target_db =
1076 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1077 let inner = target_db.into_inner();
1078 inner.destroy().await.unwrap();
1079 });
1080 }
1081
1082 #[test_traced("WARN")]
1084 fn test_target_update_invalid_bounds() {
1085 let executor = deterministic::Runner::default();
1086 executor.start(|mut context| async move {
1087 let mut target_db = create_test_db(context.clone()).await;
1089 let target_ops = create_test_ops(25);
1090 apply_ops(&mut target_db, target_ops).await;
1091 target_db.commit(None).await.unwrap();
1092
1093 let mut hasher = test_hasher();
1095 let initial_lower_bound = target_db.oldest_retained_loc;
1096 let initial_upper_bound = target_db.op_count() - 1;
1097 let initial_root = target_db.root(&mut hasher);
1098
1099 let (mut update_sender, update_receiver) = mpsc::channel(1);
1101 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1102 let config = Config {
1103 context: context.clone(),
1104 db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
1105 fetch_batch_size: NZU64!(5),
1106 target: Target {
1107 root: initial_root,
1108 lower_bound_ops: initial_lower_bound,
1109 upper_bound_ops: initial_upper_bound,
1110 },
1111 resolver: target_db.clone(),
1112 apply_batch_size: 1024,
1113 max_outstanding_requests: 10,
1114 update_rx: Some(update_receiver),
1115 };
1116 let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
1117
1118 update_sender
1120 .send(Target {
1121 root: initial_root,
1122 lower_bound_ops: initial_upper_bound,
1123 upper_bound_ops: initial_lower_bound,
1124 })
1125 .await
1126 .unwrap();
1127
1128 let result = client.step().await;
1129 assert!(matches!(result, Err(sync::Error::InvalidTarget { .. })));
1130
1131 let target_db =
1132 Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1133 let inner = target_db.into_inner();
1134 inner.destroy().await.unwrap();
1135 });
1136 }
1137
1138 #[test_traced("WARN")]
1140 fn test_target_update_on_done_client() {
1141 let executor = deterministic::Runner::default();
1142 executor.start(|mut context| async move {
1143 let mut target_db = create_test_db(context.clone()).await;
1145 let target_ops = create_test_ops(10);
1146 apply_ops(&mut target_db, target_ops).await;
1147 target_db.commit(None).await.unwrap();
1148
1149 let mut hasher = test_hasher();
1151 let lower_bound = target_db.oldest_retained_loc;
1152 let upper_bound = target_db.op_count() - 1;
1153 let root = target_db.root(&mut hasher);
1154
1155 let (mut update_sender, update_receiver) = mpsc::channel(1);
1157 let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1158 let config = Config {
1159 context: context.clone(),
1160 db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1161 fetch_batch_size: NZU64!(20),
1162 target: Target {
1163 root,
1164 lower_bound_ops: lower_bound,
1165 upper_bound_ops: upper_bound,
1166 },
1167 resolver: target_db.clone(),
1168 apply_batch_size: 1024,
1169 max_outstanding_requests: 10,
1170 update_rx: Some(update_receiver),
1171 };
1172
1173 let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1175
1176 let _ = update_sender
1178 .send(Target {
1179 root: sha256::Digest::from([2u8; 32]),
1180 lower_bound_ops: lower_bound + 1,
1181 upper_bound_ops: upper_bound + 1,
1182 })
1183 .await;
1184
1185 let mut hasher = test_hasher();
1187 assert_eq!(synced_db.root(&mut hasher), root);
1188 assert_eq!(synced_db.op_count(), upper_bound + 1);
1189 assert_eq!(synced_db.oldest_retained_loc, lower_bound);
1190
1191 synced_db.destroy().await.unwrap();
1192 Arc::try_unwrap(target_db)
1193 .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1194 .into_inner()
1195 .destroy()
1196 .await
1197 .unwrap();
1198 });
1199 }
1200}