commonware_storage/qmdb/immutable/sync/
mod.rs

1use crate::{
2    journal::contiguous::variable,
3    mmr::Location,
4    qmdb::{
5        any::VariableValue,
6        immutable::{self, Operation},
7        sync, Error,
8    },
9    translator::Translator,
10};
11use commonware_cryptography::Hasher;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::Array;
14use std::ops::Range;
15
16impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
17where
18    E: Storage + Clock + Metrics,
19    K: Array,
20    V: VariableValue,
21    H: Hasher,
22    T: Translator,
23{
24    type Op = Operation<K, V>;
25    type Journal = variable::Journal<E, Self::Op>;
26    type Hasher = H;
27    type Config = immutable::Config<T, V::Cfg>;
28    type Digest = H::Digest;
29    type Context = E;
30
31    async fn create_journal(
32        context: Self::Context,
33        config: &Self::Config,
34        range: Range<Location>,
35    ) -> Result<Self::Journal, Error> {
36        // Initialize contiguous journal for the sync range
37        variable::Journal::init_sync(
38            context.with_label("log"),
39            variable::Config {
40                items_per_section: config.log_items_per_section,
41                partition: config.log_partition.clone(),
42                compression: config.log_compression,
43                codec_config: config.log_codec_config.clone(),
44                buffer_pool: config.buffer_pool.clone(),
45                write_buffer: config.log_write_buffer,
46            },
47            *range.start..*range.end,
48        )
49        .await
50    }
51
52    /// Returns a [super::Immutable] initialized data collected in the sync process.
53    ///
54    /// # Behavior
55    ///
56    /// This method handles different initialization scenarios based on existing data:
57    /// - If the MMR journal is empty or the last item is before the range start, it creates a
58    ///   fresh MMR from the provided `pinned_nodes`
59    /// - If the MMR journal has data but is incomplete (has length < range end), missing operations
60    ///   from the log are applied to bring it up to the target state
61    /// - If the MMR journal has data beyond the range end, it is rewound to match the sync target
62    ///
63    /// # Returns
64    ///
65    /// A [super::Immutable] db populated with the state from the given range.
66    /// The pruning boundary is set to the range start.
67    async fn from_sync_result(
68        context: Self::Context,
69        db_config: Self::Config,
70        journal: Self::Journal,
71        pinned_nodes: Option<Vec<Self::Digest>>,
72        range: Range<Location>,
73        apply_batch_size: usize,
74    ) -> Result<Self, Error> {
75        let sync_config = Config {
76            db_config,
77            log: journal,
78            range,
79            pinned_nodes,
80            apply_batch_size,
81        };
82        Self::init_synced(context, sync_config).await
83    }
84
85    fn root(&self) -> Self::Digest {
86        self.root()
87    }
88
89    async fn resize_journal(
90        mut journal: Self::Journal,
91        context: Self::Context,
92        config: &Self::Config,
93        range: Range<Location>,
94    ) -> Result<Self::Journal, Error> {
95        let size = journal.size();
96
97        if size <= range.start {
98            // Destroy and recreate
99            journal.destroy().await?;
100            Self::create_journal(context, config, range).await
101        } else {
102            // Prune to range start (position-based, not section-based)
103            journal
104                .prune(*range.start)
105                .await
106                .map_err(crate::qmdb::Error::from)?;
107
108            // Verify size is within range
109            let size = journal.size();
110            if size > range.end {
111                return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
112                    size,
113                )));
114            }
115
116            Ok(journal)
117        }
118    }
119}
120
121/// Configuration for syncing an [immutable::Immutable] to a target state.
122pub struct Config<E, K, V, T, D, C>
123where
124    E: Storage + Metrics,
125    K: Array,
126    V: VariableValue,
127    T: Translator,
128    D: commonware_cryptography::Digest,
129{
130    /// Database configuration.
131    pub db_config: immutable::Config<T, C>,
132
133    /// The [immutable::Immutable]'s log of operations. It has elements within the range.
134    /// Reports the range start as its pruning boundary (oldest retained operation index).
135    pub log: variable::Journal<E, Operation<K, V>>,
136
137    /// Sync range - operations outside this range are pruned or not synced.
138    pub range: Range<Location>,
139
140    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
141    /// specified by `Proof::nodes_to_pin`.
142    /// If `None`, the pinned nodes will be computed from the MMR's journal and metadata,
143    /// which are expected to have the necessary pinned nodes.
144    pub pinned_nodes: Option<Vec<D>>,
145
146    /// The maximum number of operations to keep in memory
147    /// before committing the database while applying operations.
148    /// Higher value will cause more memory usage during sync.
149    pub apply_batch_size: usize,
150}
151
152#[cfg(test)]
153mod tests {
154    use crate::{
155        mmr::Location,
156        qmdb::{
157            immutable,
158            immutable::Operation,
159            sync::{
160                self,
161                engine::{Config, NextStep},
162                Engine, Target,
163            },
164        },
165        translator::TwoCap,
166    };
167    use commonware_cryptography::{sha256, Sha256};
168    use commonware_macros::test_traced;
169    use commonware_math::algebra::Random;
170    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
171    use commonware_utils::{NZUsize, NZU64};
172    use futures::{channel::mpsc, SinkExt as _};
173    use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
174    use rstest::rstest;
175    use std::{
176        collections::HashMap,
177        num::{NonZeroU64, NonZeroUsize},
178        sync::Arc,
179    };
180
181    /// Type alias for sync tests with simple codec config
182    type ImmutableSyncTest = immutable::Immutable<
183        deterministic::Context,
184        sha256::Digest,
185        sha256::Digest,
186        Sha256,
187        crate::translator::TwoCap,
188    >;
189
190    /// Create a simple config for sync tests
191    fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
192        const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
193        const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
194        const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
195
196        immutable::Config {
197            mmr_journal_partition: format!("journal_{suffix}"),
198            mmr_metadata_partition: format!("metadata_{suffix}"),
199            mmr_items_per_blob: NZU64!(11),
200            mmr_write_buffer: NZUsize!(1024),
201            log_partition: format!("log_{suffix}"),
202            log_items_per_section: ITEMS_PER_SECTION,
203            log_compression: None,
204            log_codec_config: (),
205            log_write_buffer: NZUsize!(1024),
206            translator: TwoCap,
207            thread_pool: None,
208            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
209        }
210    }
211
212    /// Create a test database with unique partition names
213    async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
214        let seed = context.next_u64();
215        let config = create_sync_config(&format!("sync_test_{seed}"));
216        ImmutableSyncTest::init(context, config).await.unwrap()
217    }
218
219    /// Create n random Set operations.
220    /// create_test_ops(n') is a suffix of create_test_ops(n) for n' > n.
221    fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
222        let mut rng = StdRng::seed_from_u64(1337);
223        let mut ops = Vec::new();
224        for _i in 0..n {
225            let key = sha256::Digest::random(&mut rng);
226            let value = sha256::Digest::random(&mut rng);
227            ops.push(Operation::Set(key, value));
228        }
229        ops
230    }
231
232    /// Applies the given operations to the database.
233    async fn apply_ops(
234        db: &mut ImmutableSyncTest,
235        ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
236    ) {
237        for op in ops {
238            match op {
239                Operation::Set(key, value) => {
240                    db.set(key, value).await.unwrap();
241                }
242                Operation::Commit(metadata) => {
243                    db.commit(metadata).await.unwrap();
244                }
245            }
246        }
247    }
248
249    #[rstest]
250    #[case::singleton_batch_size_one(1, NZU64!(1))]
251    #[case::singleton_batch_size_gt_db_size(1, NZU64!(2))]
252    #[case::batch_size_one(1000, NZU64!(1))]
253    #[case::floor_div_db_batch_size(1000, NZU64!(3))]
254    #[case::floor_div_db_batch_size_2(1000, NZU64!(999))]
255    #[case::div_db_batch_size(1000, NZU64!(100))]
256    #[case::db_size_eq_batch_size(1000, NZU64!(1000))]
257    #[case::batch_size_gt_db_size(1000, NZU64!(1001))]
258    fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) {
259        let executor = deterministic::Runner::default();
260        executor.start(|mut context| async move {
261            let mut target_db = create_test_db(context.clone()).await;
262            let target_db_ops = create_test_ops(target_db_ops);
263            apply_ops(&mut target_db, target_db_ops.clone()).await;
264            let metadata = Some(Sha256::fill(1));
265            target_db.commit(metadata).await.unwrap();
266            let target_op_count = target_db.op_count();
267            let target_oldest_retained_loc = target_db.oldest_retained_loc();
268            let target_root = target_db.root();
269
270            // Capture target database state before moving into config
271            let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
272            for op in &target_db_ops {
273                if let Operation::Set(key, value) = op {
274                    expected_kvs.insert(*key, *value);
275                }
276            }
277
278            let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
279
280            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
281            let config = Config {
282                db_config: db_config.clone(),
283                fetch_batch_size,
284                target: Target {
285                    root: target_root,
286                    range: target_oldest_retained_loc..target_op_count,
287                },
288                context: context.clone(),
289                resolver: target_db.clone(),
290                apply_batch_size: 1024,
291                max_outstanding_requests: 1,
292                update_rx: None,
293            };
294            let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
295
296            // Verify database state
297            assert_eq!(got_db.op_count(), target_op_count);
298            assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
299
300            // Verify the root digest matches the target
301            assert_eq!(got_db.root(), target_root);
302
303            // Verify that the synced database matches the target state
304            for (key, expected_value) in &expected_kvs {
305                let synced_value = got_db.get(key).await.unwrap();
306                assert_eq!(synced_value, Some(*expected_value));
307            }
308
309            // Put more key-value pairs into both databases
310            let mut new_ops = Vec::new();
311            let mut rng = StdRng::seed_from_u64(42);
312            let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
313            for _i in 0..expected_kvs.len() {
314                let key = sha256::Digest::random(&mut rng);
315                let value = sha256::Digest::random(&mut rng);
316                new_ops.push(Operation::Set(key, value));
317                new_kvs.insert(key, value);
318            }
319
320            // Apply new operations to both databases
321            apply_ops(&mut got_db, new_ops.clone()).await;
322            {
323                let mut target_db = target_db.write().await;
324                apply_ops(&mut target_db, new_ops).await;
325            }
326
327            // Verify both databases have the same state after additional operations
328            for (key, expected_value) in &new_kvs {
329                let synced_value = got_db.get(key).await.unwrap();
330                let target_value = {
331                    let target_db = target_db.read().await;
332                    target_db.get(key).await.unwrap()
333                };
334                assert_eq!(synced_value, Some(*expected_value));
335                assert_eq!(target_value, Some(*expected_value));
336            }
337
338            got_db.destroy().await.unwrap();
339            let target_db = Arc::try_unwrap(target_db).map_or_else(
340                |_| panic!("Failed to unwrap Arc - still has references"),
341                |rw_lock| rw_lock.into_inner(),
342            );
343            target_db.destroy().await.unwrap();
344        });
345    }
346
347    /// Test that sync works when the target database is initially empty
348    #[test_traced("WARN")]
349    fn test_sync_empty_to_nonempty() {
350        let executor = deterministic::Runner::default();
351        executor.start(|mut context| async move {
352            // Create an empty target database
353            let mut target_db = create_test_db(context.clone()).await;
354            target_db.commit(Some(Sha256::fill(1))).await.unwrap(); // Commit to establish a valid root
355
356            let target_op_count = target_db.op_count();
357            let target_oldest_retained_loc = target_db.oldest_retained_loc();
358            let target_root = target_db.root();
359
360            let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
361            let target_db = Arc::new(RwLock::new(target_db));
362            let config = Config {
363                db_config,
364                fetch_batch_size: NZU64!(10),
365                target: Target {
366                    root: target_root,
367                    range: target_oldest_retained_loc..target_op_count,
368                },
369                context: context.clone(),
370                resolver: target_db.clone(),
371                apply_batch_size: 1024,
372                max_outstanding_requests: 1,
373                update_rx: None,
374            };
375            let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
376
377            // Verify database state
378            assert_eq!(got_db.op_count(), target_op_count);
379            assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
380            assert_eq!(got_db.root(), target_root);
381            assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1)));
382
383            got_db.destroy().await.unwrap();
384            let target_db = Arc::try_unwrap(target_db).map_or_else(
385                |_| panic!("Failed to unwrap Arc - still has references"),
386                |rw_lock| rw_lock.into_inner(),
387            );
388            target_db.destroy().await.unwrap();
389        });
390    }
391
392    /// Test demonstrating that a synced database can be reopened and retain its state.
393    #[test_traced("WARN")]
394    fn test_sync_database_persistence() {
395        let executor = deterministic::Runner::default();
396        executor.start(|context| async move {
397            // Create and populate a simple target database
398            let mut target_db = create_test_db(context.clone()).await;
399            let target_ops = create_test_ops(10);
400            apply_ops(&mut target_db, target_ops.clone()).await;
401            target_db.commit(Some(Sha256::fill(0))).await.unwrap();
402
403            // Capture target state
404            let target_root = target_db.root();
405            let lower_bound = target_db.oldest_retained_loc();
406            let op_count = target_db.op_count();
407
408            // Perform sync
409            let db_config = create_sync_config("persistence_test");
410            let context_clone = context.clone();
411            let target_db = Arc::new(RwLock::new(target_db));
412            let config = Config {
413                db_config: db_config.clone(),
414                fetch_batch_size: NZU64!(5),
415                target: Target {
416                    root: target_root,
417                    range: lower_bound..op_count,
418                },
419                context,
420                resolver: target_db.clone(),
421                apply_batch_size: 1024,
422                max_outstanding_requests: 1,
423                update_rx: None,
424            };
425            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
426
427            // Verify initial sync worked
428            assert_eq!(synced_db.root(), target_root);
429
430            // Save state before closing
431            let expected_root = synced_db.root();
432            let expected_op_count = synced_db.op_count();
433            let expected_oldest_retained_loc = synced_db.oldest_retained_loc();
434
435            // Close and reopen the database to test persistence
436            synced_db.close().await.unwrap();
437            let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
438                .await
439                .unwrap();
440
441            // Verify state is preserved
442            assert_eq!(reopened_db.root(), expected_root);
443            assert_eq!(reopened_db.op_count(), expected_op_count);
444            assert_eq!(
445                reopened_db.oldest_retained_loc(),
446                expected_oldest_retained_loc
447            );
448
449            // Verify data integrity
450            for op in &target_ops {
451                if let Operation::Set(key, value) = op {
452                    let stored_value = reopened_db.get(key).await.unwrap();
453                    assert_eq!(stored_value, Some(*value));
454                }
455            }
456
457            reopened_db.destroy().await.unwrap();
458            let target_db = Arc::try_unwrap(target_db).map_or_else(
459                |_| panic!("Failed to unwrap Arc - still has references"),
460                |rw_lock| rw_lock.into_inner(),
461            );
462            target_db.destroy().await.unwrap();
463        });
464    }
465
466    /// Test that target updates work correctly during sync
467    #[test_traced("WARN")]
468    fn test_target_update_during_sync() {
469        let executor = deterministic::Runner::default();
470        executor.start(|mut context| async move {
471            // Create and populate initial target database
472            let mut target_db = create_test_db(context.clone()).await;
473            let initial_ops = create_test_ops(50);
474            apply_ops(&mut target_db, initial_ops.clone()).await;
475            target_db.commit(None).await.unwrap();
476
477            // Capture the state after first commit
478            let initial_lower_bound = target_db.oldest_retained_loc();
479            let initial_upper_bound = target_db.op_count();
480            let initial_root = target_db.root();
481
482            // Add more operations to create the extended target
483            let additional_ops = create_test_ops(25);
484            apply_ops(&mut target_db, additional_ops.clone()).await;
485            target_db.commit(None).await.unwrap();
486            let final_upper_bound = target_db.op_count();
487            let final_root = target_db.root();
488
489            // Wrap target database for shared mutable access
490            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
491
492            // Create client with initial smaller target and very small batch size
493            let (mut update_sender, update_receiver) = mpsc::channel(1);
494            let client = {
495                let config = Config {
496                    context: context.clone(),
497                    db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
498                    target: Target {
499                        root: initial_root,
500                        range: initial_lower_bound..initial_upper_bound,
501                    },
502                    resolver: target_db.clone(),
503                    fetch_batch_size: NZU64!(2), // Very small batch size to ensure multiple batches needed
504                    max_outstanding_requests: 10,
505                    apply_batch_size: 1024,
506                    update_rx: Some(update_receiver),
507                };
508                let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
509                loop {
510                    // Step the client until we have processed a batch of operations
511                    client = match client.step().await.unwrap() {
512                        NextStep::Continue(new_client) => new_client,
513                        NextStep::Complete(_) => panic!("client should not be complete"),
514                    };
515                    let log_size = client.journal().size();
516                    if log_size > initial_lower_bound {
517                        break client;
518                    }
519                }
520            };
521
522            // Send target update with SAME lower bound but higher upper bound
523            update_sender
524                .send(Target {
525                    root: final_root,
526                    range: initial_lower_bound..final_upper_bound,
527                })
528                .await
529                .unwrap();
530
531            // Complete the sync
532            let synced_db = client.sync().await.unwrap();
533
534            // Verify the synced database has the expected final state
535            assert_eq!(synced_db.root(), final_root);
536
537            // Verify the target database matches the synced database
538            let target_db = Arc::try_unwrap(target_db).map_or_else(
539                |_| panic!("Failed to unwrap Arc - still has references"),
540                |rw_lock| rw_lock.into_inner(),
541            );
542            {
543                assert_eq!(synced_db.op_count(), target_db.op_count());
544                assert_eq!(
545                    synced_db.oldest_retained_loc(),
546                    target_db.oldest_retained_loc()
547                );
548                assert_eq!(synced_db.root(), target_db.root());
549            }
550
551            // Verify all expected operations are present in the synced database
552            let all_ops = [initial_ops, additional_ops].concat();
553            for op in &all_ops {
554                if let Operation::Set(key, value) = op {
555                    let synced_value = synced_db.get(key).await.unwrap();
556                    assert_eq!(synced_value, Some(*value));
557                }
558            }
559
560            synced_db.destroy().await.unwrap();
561            target_db.destroy().await.unwrap();
562        });
563    }
564
565    /// Test that invalid bounds are rejected
566    #[test]
567    fn test_sync_invalid_bounds() {
568        let executor = deterministic::Runner::default();
569        executor.start(|mut context| async move {
570            let target_db = create_test_db(context.clone()).await;
571            let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
572            let config = Config {
573                db_config,
574                fetch_batch_size: NZU64!(10),
575                target: Target {
576                    root: sha256::Digest::from([1u8; 32]),
577                    range: Location::new_unchecked(31)..Location::new_unchecked(31),
578                },
579                context,
580                resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
581                apply_batch_size: 1024,
582                max_outstanding_requests: 1,
583                update_rx: None,
584            };
585            let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
586            match result {
587                Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
588                    lower_bound_pos,
589                    upper_bound_pos,
590                })) => {
591                    assert_eq!(lower_bound_pos, Location::new_unchecked(31));
592                    assert_eq!(upper_bound_pos, Location::new_unchecked(31));
593                }
594                _ => panic!("Expected InvalidTarget error"),
595            }
596        });
597    }
598
599    /// Test that sync works when target database has operations beyond the requested range
600    /// of operations to sync.
601    #[test]
602    fn test_sync_subset_of_target_database() {
603        let executor = deterministic::Runner::default();
604        executor.start(|mut context| async move {
605            let mut target_db = create_test_db(context.clone()).await;
606            let target_ops = create_test_ops(30);
607            // Apply all but the last operation
608            apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
609            target_db.commit(None).await.unwrap();
610
611            let target_root = target_db.root();
612            let lower_bound = target_db.oldest_retained_loc();
613            let op_count = target_db.op_count();
614
615            // Add final op after capturing the range
616            apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
617            target_db.commit(None).await.unwrap();
618
619            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
620            let config = Config {
621                db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
622                fetch_batch_size: NZU64!(10),
623                target: Target {
624                    root: target_root,
625                    range: lower_bound..op_count,
626                },
627                context,
628                resolver: target_db.clone(),
629                apply_batch_size: 1024,
630                max_outstanding_requests: 1,
631                update_rx: None,
632            };
633            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
634
635            // Verify state matches the specified range
636            assert_eq!(synced_db.root(), target_root);
637            assert_eq!(synced_db.op_count(), op_count);
638
639            synced_db.destroy().await.unwrap();
640            let target_db =
641                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
642            let inner = target_db.into_inner();
643            inner.destroy().await.unwrap();
644        });
645    }
646
647    // Test syncing where the sync client has some but not all of the operations in the target
648    // database.
649    #[test]
650    fn test_sync_use_existing_db_partial_match() {
651        let executor = deterministic::Runner::default();
652        executor.start(|mut context| async move {
653            let original_ops = create_test_ops(50);
654
655            // Create two databases
656            let mut target_db = create_test_db(context.clone()).await;
657            let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
658            let mut sync_db: ImmutableSyncTest =
659                immutable::Immutable::init(context.clone(), sync_db_config.clone())
660                    .await
661                    .unwrap();
662
663            // Apply the same operations to both databases
664            apply_ops(&mut target_db, original_ops.clone()).await;
665            apply_ops(&mut sync_db, original_ops.clone()).await;
666            target_db.commit(None).await.unwrap();
667            sync_db.commit(None).await.unwrap();
668
669            // Close sync_db
670            sync_db.close().await.unwrap();
671
672            // Add one more operation and commit the target database
673            let last_op = create_test_ops(1);
674            apply_ops(&mut target_db, last_op.clone()).await;
675            target_db.commit(None).await.unwrap();
676            let root = target_db.root();
677            let lower_bound = target_db.oldest_retained_loc();
678            let upper_bound = target_db.op_count(); // Up to the last operation
679
680            // Reopen the sync database and sync it to the target database
681            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
682            let config = Config {
683                db_config: sync_db_config, // Use same config as before
684                fetch_batch_size: NZU64!(10),
685                target: Target {
686                    root,
687                    range: lower_bound..upper_bound,
688                },
689                context: context.clone(),
690                resolver: target_db.clone(),
691                apply_batch_size: 1024,
692                max_outstanding_requests: 1,
693                update_rx: None,
694            };
695            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
696
697            // Verify database state
698            assert_eq!(sync_db.op_count(), upper_bound);
699            assert_eq!(sync_db.root(), root);
700
701            sync_db.destroy().await.unwrap();
702            let target_db =
703                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
704            let inner = target_db.into_inner();
705            inner.destroy().await.unwrap();
706        });
707    }
708
709    /// Test case where existing database on disk exactly matches the sync target
710    #[test]
711    fn test_sync_use_existing_db_exact_match() {
712        let executor = deterministic::Runner::default();
713        executor.start(|mut context| async move {
714            let target_ops = create_test_ops(40);
715
716            // Create two databases
717            let mut target_db = create_test_db(context.clone()).await;
718            let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
719            let mut sync_db: ImmutableSyncTest =
720                immutable::Immutable::init(context.clone(), sync_config.clone())
721                    .await
722                    .unwrap();
723
724            // Apply the same operations to both databases
725            apply_ops(&mut target_db, target_ops.clone()).await;
726            apply_ops(&mut sync_db, target_ops.clone()).await;
727            target_db.commit(None).await.unwrap();
728            sync_db.commit(None).await.unwrap();
729
730            // Close sync_db
731            sync_db.close().await.unwrap();
732
733            // Prepare target
734            let root = target_db.root();
735            let lower_bound = target_db.oldest_retained_loc();
736            let upper_bound = target_db.op_count();
737
738            // Sync should complete immediately without fetching
739            let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
740            let config = Config {
741                db_config: sync_config,
742                fetch_batch_size: NZU64!(10),
743                target: Target {
744                    root,
745                    range: lower_bound..upper_bound,
746                },
747                context,
748                resolver: resolver.clone(),
749                apply_batch_size: 1024,
750                max_outstanding_requests: 1,
751                update_rx: None,
752            };
753            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
754
755            assert_eq!(sync_db.op_count(), upper_bound);
756            assert_eq!(sync_db.root(), root);
757
758            sync_db.destroy().await.unwrap();
759            let target_db =
760                Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
761            let inner = target_db.into_inner();
762            inner.destroy().await.unwrap();
763        });
764    }
765
766    /// Test that the client fails to sync if the lower bound is decreased
767    #[test_traced("WARN")]
768    fn test_target_update_lower_bound_decrease() {
769        let executor = deterministic::Runner::default();
770        executor.start(|mut context| async move {
771            // Create and populate target database
772            let mut target_db = create_test_db(context.clone()).await;
773            let target_ops = create_test_ops(100);
774            apply_ops(&mut target_db, target_ops).await;
775            target_db.commit(None).await.unwrap();
776
777            target_db.prune(Location::new_unchecked(10)).await.unwrap();
778
779            // Capture initial target state
780            let initial_lower_bound = target_db.oldest_retained_loc();
781            let initial_upper_bound = target_db.op_count();
782            let initial_root = target_db.root();
783
784            // Create client with initial target
785            let (mut update_sender, update_receiver) = mpsc::channel(1);
786            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
787            let config = Config {
788                context: context.clone(),
789                db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
790                fetch_batch_size: NZU64!(5),
791                target: Target {
792                    root: initial_root,
793                    range: initial_lower_bound..initial_upper_bound,
794                },
795                resolver: target_db.clone(),
796                apply_batch_size: 1024,
797                max_outstanding_requests: 10,
798                update_rx: Some(update_receiver),
799            };
800            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
801
802            // Send target update with decreased lower bound
803            update_sender
804                .send(Target {
805                    root: initial_root,
806                    range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
807                })
808                .await
809                .unwrap();
810
811            let result = client.step().await;
812            assert!(matches!(
813                result,
814                Err(sync::Error::Engine(
815                    sync::EngineError::SyncTargetMovedBackward { .. }
816                ))
817            ));
818
819            let target_db =
820                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
821            let inner = target_db.into_inner();
822            inner.destroy().await.unwrap();
823        });
824    }
825
826    /// Test that the client fails to sync if the upper bound is decreased
827    #[test_traced("WARN")]
828    fn test_target_update_upper_bound_decrease() {
829        let executor = deterministic::Runner::default();
830        executor.start(|mut context| async move {
831            // Create and populate target database
832            let mut target_db = create_test_db(context.clone()).await;
833            let target_ops = create_test_ops(50);
834            apply_ops(&mut target_db, target_ops).await;
835            target_db.commit(None).await.unwrap();
836
837            // Capture initial target state
838            let initial_lower_bound = target_db.oldest_retained_loc();
839            let initial_upper_bound = target_db.op_count();
840            let initial_root = target_db.root();
841
842            // Create client with initial target
843            let (mut update_sender, update_receiver) = mpsc::channel(1);
844            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
845            let config = Config {
846                context: context.clone(),
847                db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
848                fetch_batch_size: NZU64!(5),
849                target: Target {
850                    root: initial_root,
851                    range: initial_lower_bound..initial_upper_bound,
852                },
853                resolver: target_db.clone(),
854                apply_batch_size: 1024,
855                max_outstanding_requests: 10,
856                update_rx: Some(update_receiver),
857            };
858            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
859
860            // Send target update with decreased upper bound
861            update_sender
862                .send(Target {
863                    root: initial_root,
864                    range: initial_lower_bound..(initial_upper_bound - 1),
865                })
866                .await
867                .unwrap();
868
869            let result = client.step().await;
870            assert!(matches!(
871                result,
872                Err(sync::Error::Engine(
873                    sync::EngineError::SyncTargetMovedBackward { .. }
874                ))
875            ));
876
877            let target_db =
878                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
879            let inner = target_db.into_inner();
880            inner.destroy().await.unwrap();
881        });
882    }
883
884    /// Test that the client succeeds when bounds are updated
885    #[test_traced("WARN")]
886    fn test_target_update_bounds_increase() {
887        let executor = deterministic::Runner::default();
888        executor.start(|mut context| async move {
889            // Create and populate target database
890            let mut target_db = create_test_db(context.clone()).await;
891            let target_ops = create_test_ops(100);
892            apply_ops(&mut target_db, target_ops.clone()).await;
893            target_db.commit(None).await.unwrap();
894
895            // Capture initial target state
896            let initial_lower_bound = target_db.oldest_retained_loc();
897            let initial_upper_bound = target_db.op_count();
898            let initial_root = target_db.root();
899
900            // Apply more operations to the target database
901            let more_ops = create_test_ops(5);
902            apply_ops(&mut target_db, more_ops.clone()).await;
903            target_db.commit(None).await.unwrap();
904
905            target_db.prune(Location::new_unchecked(10)).await.unwrap();
906            target_db.commit(None).await.unwrap();
907
908            // Capture final target state
909            let final_lower_bound = target_db.oldest_retained_loc();
910            let final_upper_bound = target_db.op_count();
911            let final_root = target_db.root();
912
913            // Assert we're actually updating the bounds
914            assert_ne!(final_lower_bound, initial_lower_bound);
915            assert_ne!(final_upper_bound, initial_upper_bound);
916
917            // Create client with initial target
918            let (mut update_sender, update_receiver) = mpsc::channel(1);
919            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
920            let config = Config {
921                context: context.clone(),
922                db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
923                fetch_batch_size: NZU64!(1),
924                target: Target {
925                    root: initial_root,
926                    range: initial_lower_bound..initial_upper_bound,
927                },
928                resolver: target_db.clone(),
929                apply_batch_size: 1024,
930                max_outstanding_requests: 1,
931                update_rx: Some(update_receiver),
932            };
933
934            // Send target update with increased upper bound
935            update_sender
936                .send(Target {
937                    root: final_root,
938                    range: final_lower_bound..final_upper_bound,
939                })
940                .await
941                .unwrap();
942
943            // Complete the sync
944            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
945
946            // Verify the synced database has the expected state
947            assert_eq!(synced_db.root(), final_root);
948            assert_eq!(synced_db.op_count(), final_upper_bound);
949            assert_eq!(synced_db.oldest_retained_loc(), final_lower_bound);
950
951            synced_db.destroy().await.unwrap();
952            let target_db = Arc::try_unwrap(target_db).map_or_else(
953                |_| panic!("Failed to unwrap Arc - still has references"),
954                |rw_lock| rw_lock.into_inner(),
955            );
956            target_db.destroy().await.unwrap();
957        });
958    }
959
960    /// Test that the client fails to sync with invalid bounds (lower > upper)
961    #[test_traced("WARN")]
962    fn test_target_update_invalid_bounds() {
963        let executor = deterministic::Runner::default();
964        executor.start(|mut context| async move {
965            // Create and populate target database
966            let mut target_db = create_test_db(context.clone()).await;
967            let target_ops = create_test_ops(25);
968            apply_ops(&mut target_db, target_ops).await;
969            target_db.commit(None).await.unwrap();
970
971            // Capture initial target state
972            let initial_lower_bound = target_db.oldest_retained_loc();
973            let initial_upper_bound = target_db.op_count();
974            let initial_root = target_db.root();
975
976            // Create client with initial target
977            let (mut update_sender, update_receiver) = mpsc::channel(1);
978            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
979            let config = Config {
980                context: context.clone(),
981                db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
982                fetch_batch_size: NZU64!(5),
983                target: Target {
984                    root: initial_root,
985                    range: initial_lower_bound..initial_upper_bound,
986                },
987                resolver: target_db.clone(),
988                apply_batch_size: 1024,
989                max_outstanding_requests: 10,
990                update_rx: Some(update_receiver),
991            };
992            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
993
994            // Send target update with invalid bounds (lower > upper)
995            update_sender
996                .send(Target {
997                    root: initial_root,
998                    range: initial_upper_bound..initial_lower_bound,
999                })
1000                .await
1001                .unwrap();
1002
1003            let result = client.step().await;
1004            assert!(matches!(
1005                result,
1006                Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1007            ));
1008
1009            let target_db =
1010                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1011            let inner = target_db.into_inner();
1012            inner.destroy().await.unwrap();
1013        });
1014    }
1015
1016    /// Test that target updates can be sent even after the client is done
1017    #[test_traced("WARN")]
1018    fn test_target_update_on_done_client() {
1019        let executor = deterministic::Runner::default();
1020        executor.start(|mut context| async move {
1021            // Create and populate target database
1022            let mut target_db = create_test_db(context.clone()).await;
1023            let target_ops = create_test_ops(10);
1024            apply_ops(&mut target_db, target_ops).await;
1025            target_db.commit(None).await.unwrap();
1026
1027            // Capture target state
1028            let lower_bound = target_db.oldest_retained_loc();
1029            let upper_bound = target_db.op_count();
1030            let root = target_db.root();
1031
1032            // Create client with target that will complete immediately
1033            let (mut update_sender, update_receiver) = mpsc::channel(1);
1034            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1035            let config = Config {
1036                context: context.clone(),
1037                db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1038                fetch_batch_size: NZU64!(20),
1039                target: Target {
1040                    root,
1041                    range: lower_bound..upper_bound,
1042                },
1043                resolver: target_db.clone(),
1044                apply_batch_size: 1024,
1045                max_outstanding_requests: 10,
1046                update_rx: Some(update_receiver),
1047            };
1048
1049            // Complete the sync
1050            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1051
1052            // Attempt to apply a target update after sync is complete to verify we don't panic
1053            let _ = update_sender
1054                .send(Target {
1055                    root: sha256::Digest::from([2u8; 32]),
1056                    range: lower_bound + 1..upper_bound + 1,
1057                })
1058                .await;
1059
1060            // Verify the synced database has the expected state
1061            assert_eq!(synced_db.root(), root);
1062            assert_eq!(synced_db.op_count(), upper_bound);
1063            assert_eq!(synced_db.oldest_retained_loc(), lower_bound);
1064
1065            synced_db.destroy().await.unwrap();
1066            Arc::try_unwrap(target_db)
1067                .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1068                .into_inner()
1069                .destroy()
1070                .await
1071                .unwrap();
1072        });
1073    }
1074}