commonware_storage/qmdb/immutable/sync/
mod.rs

1use crate::{
2    journal::contiguous::variable,
3    mmr::Location,
4    qmdb::{
5        any::VariableValue,
6        immutable::{self, Operation},
7        sync, Error,
8    },
9    translator::Translator,
10};
11use commonware_cryptography::Hasher;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::Array;
14use std::ops::Range;
15
16impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
17where
18    E: Storage + Clock + Metrics,
19    K: Array,
20    V: VariableValue,
21    H: Hasher,
22    T: Translator,
23{
24    type Op = Operation<K, V>;
25    type Journal = variable::Journal<E, Self::Op>;
26    type Hasher = H;
27    type Config = immutable::Config<T, V::Cfg>;
28    type Digest = H::Digest;
29    type Context = E;
30
31    async fn create_journal(
32        context: Self::Context,
33        config: &Self::Config,
34        range: Range<Location>,
35    ) -> Result<Self::Journal, Error> {
36        // Initialize contiguous journal for the sync range
37        variable::Journal::init_sync(
38            context.with_label("log"),
39            variable::Config {
40                items_per_section: config.log_items_per_section,
41                partition: config.log_partition.clone(),
42                compression: config.log_compression,
43                codec_config: config.log_codec_config.clone(),
44                buffer_pool: config.buffer_pool.clone(),
45                write_buffer: config.log_write_buffer,
46            },
47            *range.start..*range.end,
48        )
49        .await
50    }
51
52    /// Returns a [super::Immutable] initialized data collected in the sync process.
53    ///
54    /// # Behavior
55    ///
56    /// This method handles different initialization scenarios based on existing data:
57    /// - If the MMR journal is empty or the last item is before the range start, it creates a
58    ///   fresh MMR from the provided `pinned_nodes`
59    /// - If the MMR journal has data but is incomplete (has length < range end), missing operations
60    ///   from the log are applied to bring it up to the target state
61    /// - If the MMR journal has data beyond the range end, it is rewound to match the sync target
62    ///
63    /// # Returns
64    ///
65    /// A [super::Immutable] db populated with the state from the given range.
66    /// The pruning boundary is set to the range start.
67    async fn from_sync_result(
68        context: Self::Context,
69        db_config: Self::Config,
70        journal: Self::Journal,
71        pinned_nodes: Option<Vec<Self::Digest>>,
72        range: Range<Location>,
73        apply_batch_size: usize,
74    ) -> Result<Self, Error> {
75        let sync_config = Config {
76            db_config,
77            log: journal,
78            range,
79            pinned_nodes,
80            apply_batch_size,
81        };
82        Self::init_synced(context, sync_config).await
83    }
84
85    fn root(&self) -> Self::Digest {
86        self.root()
87    }
88
89    async fn resize_journal(
90        mut journal: Self::Journal,
91        context: Self::Context,
92        config: &Self::Config,
93        range: Range<Location>,
94    ) -> Result<Self::Journal, Error> {
95        let size = journal.size();
96
97        if size <= range.start {
98            // Destroy and recreate
99            journal.destroy().await?;
100            Self::create_journal(context, config, range).await
101        } else {
102            // Prune to range start (position-based, not section-based)
103            journal
104                .prune(*range.start)
105                .await
106                .map_err(crate::qmdb::Error::from)?;
107
108            // Verify size is within range
109            let size = journal.size();
110            if size > range.end {
111                return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
112                    size,
113                )));
114            }
115
116            Ok(journal)
117        }
118    }
119}
120
121/// Configuration for syncing an [immutable::Immutable] to a target state.
122pub struct Config<E, K, V, T, D, C>
123where
124    E: Storage + Metrics,
125    K: Array,
126    V: VariableValue,
127    T: Translator,
128    D: commonware_cryptography::Digest,
129{
130    /// Database configuration.
131    pub db_config: immutable::Config<T, C>,
132
133    /// The [immutable::Immutable]'s log of operations. It has elements within the range.
134    /// Reports the range start as its pruning boundary (oldest retained operation index).
135    pub log: variable::Journal<E, Operation<K, V>>,
136
137    /// Sync range - operations outside this range are pruned or not synced.
138    pub range: Range<Location>,
139
140    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
141    /// specified by `Proof::nodes_to_pin`.
142    /// If `None`, the pinned nodes will be computed from the MMR's journal and metadata,
143    /// which are expected to have the necessary pinned nodes.
144    pub pinned_nodes: Option<Vec<D>>,
145
146    /// The maximum number of operations to keep in memory
147    /// before committing the database while applying operations.
148    /// Higher value will cause more memory usage during sync.
149    pub apply_batch_size: usize,
150}
151
152#[cfg(test)]
153mod tests {
154    use crate::{
155        mmr::Location,
156        qmdb::{
157            immutable,
158            immutable::Operation,
159            sync::{
160                self,
161                engine::{Config, NextStep},
162                Engine, Target,
163            },
164        },
165        translator::TwoCap,
166    };
167    use commonware_cryptography::{sha256, Sha256};
168    use commonware_macros::test_traced;
169    use commonware_math::algebra::Random;
170    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
171    use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
172    use futures::{channel::mpsc, SinkExt as _};
173    use rand::RngCore as _;
174    use rstest::rstest;
175    use std::{
176        collections::HashMap,
177        num::{NonZeroU16, NonZeroU64, NonZeroUsize},
178        sync::Arc,
179    };
180
181    /// Type alias for sync tests with simple codec config (Merkleized, Durable)
182    type ImmutableSyncTest = immutable::Immutable<
183        deterministic::Context,
184        sha256::Digest,
185        sha256::Digest,
186        Sha256,
187        crate::translator::TwoCap,
188    >;
189
190    /// Type alias for mutable state (Unmerkleized, NonDurable)
191    type ImmutableSyncTestMutable = immutable::Immutable<
192        deterministic::Context,
193        sha256::Digest,
194        sha256::Digest,
195        Sha256,
196        crate::translator::TwoCap,
197        immutable::Unmerkleized,
198        immutable::NonDurable,
199    >;
200
201    /// Create a simple config for sync tests
202    fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
203        const PAGE_SIZE: NonZeroU16 = NZU16!(77);
204        const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
205        const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
206
207        immutable::Config {
208            mmr_journal_partition: format!("journal_{suffix}"),
209            mmr_metadata_partition: format!("metadata_{suffix}"),
210            mmr_items_per_blob: NZU64!(11),
211            mmr_write_buffer: NZUsize!(1024),
212            log_partition: format!("log_{suffix}"),
213            log_items_per_section: ITEMS_PER_SECTION,
214            log_compression: None,
215            log_codec_config: (),
216            log_write_buffer: NZUsize!(1024),
217            translator: TwoCap,
218            thread_pool: None,
219            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
220        }
221    }
222
223    /// Create a test database with unique partition names
224    async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
225        let seed = context.next_u64();
226        let config = create_sync_config(&format!("sync_test_{seed}"));
227        ImmutableSyncTest::init(context, config).await.unwrap()
228    }
229
230    /// Create n random Set operations using the default seed (0).
231    /// create_test_ops(n) is a prefix of create_test_ops(n') for n < n'.
232    fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
233        create_test_ops_seeded(n, 0)
234    }
235
236    /// Create n random Set operations using a specific seed.
237    /// Use different seeds when you need non-overlapping keys in the same test.
238    fn create_test_ops_seeded(
239        n: usize,
240        seed: u64,
241    ) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
242        let mut rng = test_rng_seeded(seed);
243        let mut ops = Vec::new();
244        for _i in 0..n {
245            let key = sha256::Digest::random(&mut rng);
246            let value = sha256::Digest::random(&mut rng);
247            ops.push(Operation::Set(key, value));
248        }
249        ops
250    }
251
252    /// Applies the given operations to the database.
253    async fn apply_ops(
254        db: &mut ImmutableSyncTestMutable,
255        ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
256    ) {
257        for op in ops {
258            match op {
259                Operation::Set(key, value) => {
260                    db.set(key, value).await.unwrap();
261                }
262                Operation::Commit(_metadata) => {
263                    // Commit causes a state change, so it is not supported here.
264                    panic!("Commit operation not supported in apply_ops");
265                }
266            }
267        }
268    }
269
270    #[rstest]
271    #[case::singleton_batch_size_one(1, NZU64!(1))]
272    #[case::singleton_batch_size_gt_db_size(1, NZU64!(2))]
273    #[case::batch_size_one(1000, NZU64!(1))]
274    #[case::floor_div_db_batch_size(1000, NZU64!(3))]
275    #[case::floor_div_db_batch_size_2(1000, NZU64!(999))]
276    #[case::div_db_batch_size(1000, NZU64!(100))]
277    #[case::db_size_eq_batch_size(1000, NZU64!(1000))]
278    #[case::batch_size_gt_db_size(1000, NZU64!(1001))]
279    fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: NonZeroU64) {
280        let executor = deterministic::Runner::default();
281        executor.start(|mut context| async move {
282            let mut target_db = create_test_db(context.clone()).await.into_mutable();
283            let target_db_ops = create_test_ops(target_db_ops);
284            apply_ops(&mut target_db, target_db_ops.clone()).await;
285            let metadata = Some(Sha256::fill(1));
286            let (durable_db, _) = target_db.commit(metadata).await.unwrap();
287            let target_db = durable_db.into_merkleized();
288            let target_op_count = target_db.op_count();
289            let target_oldest_retained_loc = target_db.oldest_retained_loc();
290            let target_root = target_db.root();
291
292            // Capture target database state before moving into config
293            let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
294            for op in &target_db_ops {
295                if let Operation::Set(key, value) = op {
296                    expected_kvs.insert(*key, *value);
297                }
298            }
299
300            let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
301
302            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
303            let config = Config {
304                db_config: db_config.clone(),
305                fetch_batch_size,
306                target: Target {
307                    root: target_root,
308                    range: target_oldest_retained_loc..target_op_count,
309                },
310                context: context.clone(),
311                resolver: target_db.clone(),
312                apply_batch_size: 1024,
313                max_outstanding_requests: 1,
314                update_rx: None,
315            };
316            let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
317
318            // Verify database state
319            assert_eq!(got_db.op_count(), target_op_count);
320            assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
321
322            // Verify the root digest matches the target
323            assert_eq!(got_db.root(), target_root);
324
325            // Verify that the synced database matches the target state
326            for (key, expected_value) in &expected_kvs {
327                let synced_value = got_db.get(key).await.unwrap();
328                assert_eq!(synced_value, Some(*expected_value));
329            }
330
331            // Put more key-value pairs into both databases
332            let mut new_ops = Vec::new();
333            let mut rng = test_rng_seeded(1);
334            let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
335            for _i in 0..expected_kvs.len() {
336                let key = sha256::Digest::random(&mut rng);
337                let value = sha256::Digest::random(&mut rng);
338                new_ops.push(Operation::Set(key, value));
339                new_kvs.insert(key, value);
340            }
341
342            // Apply new operations to both databases.
343            let mut got_db = got_db.into_mutable();
344            apply_ops(&mut got_db, new_ops.clone()).await;
345            let mut target_db = Arc::try_unwrap(target_db).map_or_else(
346                |_| panic!("target_db should have no other references"),
347                |rw_lock| rw_lock.into_inner().into_mutable(),
348            );
349            apply_ops(&mut target_db, new_ops.clone()).await;
350
351            // Verify both databases have the new values
352            for (key, expected_value) in &new_kvs {
353                let synced_value = got_db.get(key).await.unwrap();
354                assert_eq!(synced_value, Some(*expected_value));
355                let target_value = target_db.get(key).await.unwrap();
356                assert_eq!(target_value, Some(*expected_value));
357            }
358
359            let (got_durable, _) = got_db.commit(None).await.unwrap();
360            got_durable.into_merkleized().destroy().await.unwrap();
361            let (target_durable, _) = target_db.commit(None).await.unwrap();
362            target_durable.into_merkleized().destroy().await.unwrap();
363        });
364    }
365
366    /// Test that sync works when the target database is initially empty
367    #[test_traced("WARN")]
368    fn test_sync_empty_to_nonempty() {
369        let executor = deterministic::Runner::default();
370        executor.start(|mut context| async move {
371            // Create an empty target database
372            let target_db = create_test_db(context.clone()).await;
373            let target_db = target_db.into_mutable();
374            let (durable_db, _) = target_db.commit(Some(Sha256::fill(1))).await.unwrap(); // Commit to establish a valid root
375            let target_db = durable_db.into_merkleized();
376
377            let target_op_count = target_db.op_count();
378            let target_oldest_retained_loc = target_db.oldest_retained_loc();
379            let target_root = target_db.root();
380
381            let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
382            let target_db = Arc::new(RwLock::new(target_db));
383            let config = Config {
384                db_config,
385                fetch_batch_size: NZU64!(10),
386                target: Target {
387                    root: target_root,
388                    range: target_oldest_retained_loc..target_op_count,
389                },
390                context: context.clone(),
391                resolver: target_db.clone(),
392                apply_batch_size: 1024,
393                max_outstanding_requests: 1,
394                update_rx: None,
395            };
396            let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
397
398            // Verify database state
399            assert_eq!(got_db.op_count(), target_op_count);
400            assert_eq!(got_db.oldest_retained_loc(), target_oldest_retained_loc);
401            assert_eq!(got_db.root(), target_root);
402            assert_eq!(got_db.get_metadata().await.unwrap(), Some(Sha256::fill(1)));
403
404            got_db.destroy().await.unwrap();
405            let target_db = Arc::try_unwrap(target_db).map_or_else(
406                |_| panic!("Failed to unwrap Arc - still has references"),
407                |rw_lock| rw_lock.into_inner(),
408            );
409            target_db.destroy().await.unwrap();
410        });
411    }
412
413    /// Test demonstrating that a synced database can be reopened and retain its state.
414    #[test_traced("WARN")]
415    fn test_sync_database_persistence() {
416        let executor = deterministic::Runner::default();
417        executor.start(|context| async move {
418            // Create and populate a simple target database
419            let target_db = create_test_db(context.clone()).await;
420            let mut target_db = target_db.into_mutable();
421            let target_ops = create_test_ops(10);
422            apply_ops(&mut target_db, target_ops.clone()).await;
423            let (durable_db, _) = target_db.commit(Some(Sha256::fill(0))).await.unwrap();
424            let target_db = durable_db.into_merkleized();
425
426            // Capture target state
427            let target_root = target_db.root();
428            let lower_bound = target_db.oldest_retained_loc();
429            let op_count = target_db.op_count();
430
431            // Perform sync
432            let db_config = create_sync_config("persistence_test");
433            let context_clone = context.clone();
434            let target_db = Arc::new(RwLock::new(target_db));
435            let config = Config {
436                db_config: db_config.clone(),
437                fetch_batch_size: NZU64!(5),
438                target: Target {
439                    root: target_root,
440                    range: lower_bound..op_count,
441                },
442                context,
443                resolver: target_db.clone(),
444                apply_batch_size: 1024,
445                max_outstanding_requests: 1,
446                update_rx: None,
447            };
448            let mut synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
449
450            // Verify initial sync worked
451            assert_eq!(synced_db.root(), target_root);
452
453            // Save state before closing
454            let expected_root = synced_db.root();
455            let expected_op_count = synced_db.op_count();
456            let expected_oldest_retained_loc = synced_db.oldest_retained_loc();
457
458            // Drop & reopen the database to test persistence
459            synced_db.sync().await.unwrap();
460            drop(synced_db);
461            let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
462                .await
463                .unwrap();
464
465            // Verify state is preserved
466            assert_eq!(reopened_db.root(), expected_root);
467            assert_eq!(reopened_db.op_count(), expected_op_count);
468            assert_eq!(
469                reopened_db.oldest_retained_loc(),
470                expected_oldest_retained_loc
471            );
472
473            // Verify data integrity
474            for op in &target_ops {
475                if let Operation::Set(key, value) = op {
476                    let stored_value = reopened_db.get(key).await.unwrap();
477                    assert_eq!(stored_value, Some(*value));
478                }
479            }
480
481            reopened_db.destroy().await.unwrap();
482            let target_db = Arc::try_unwrap(target_db).map_or_else(
483                |_| panic!("Failed to unwrap Arc - still has references"),
484                |rw_lock| rw_lock.into_inner(),
485            );
486            target_db.destroy().await.unwrap();
487        });
488    }
489
490    /// Test that target updates work correctly during sync
491    #[test_traced("WARN")]
492    fn test_target_update_during_sync() {
493        let executor = deterministic::Runner::default();
494        executor.start(|mut context| async move {
495            // Create and populate initial target database
496            let target_db = create_test_db(context.clone()).await;
497            let mut target_db = target_db.into_mutable();
498            let initial_ops = create_test_ops(50);
499            apply_ops(&mut target_db, initial_ops.clone()).await;
500            let (durable_db, _) = target_db.commit(None).await.unwrap();
501            let target_db = durable_db.into_merkleized();
502
503            // Capture the state after first commit
504            let initial_lower_bound = target_db.oldest_retained_loc();
505            let initial_upper_bound = target_db.op_count();
506            let initial_root = target_db.root();
507
508            // Add more operations to create the extended target
509            // (use different seed to avoid key collisions)
510            let mut target_db = target_db.into_mutable();
511            let additional_ops = create_test_ops_seeded(25, 1);
512            apply_ops(&mut target_db, additional_ops.clone()).await;
513            let (durable_db, _) = target_db.commit(None).await.unwrap();
514            let target_db = durable_db.into_merkleized();
515            let final_upper_bound = target_db.op_count();
516            let final_root = target_db.root();
517
518            // Wrap target database for shared mutable access
519            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
520
521            // Create client with initial smaller target and very small batch size
522            let (mut update_sender, update_receiver) = mpsc::channel(1);
523            let client = {
524                let config = Config {
525                    context: context.clone(),
526                    db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
527                    target: Target {
528                        root: initial_root,
529                        range: initial_lower_bound..initial_upper_bound,
530                    },
531                    resolver: target_db.clone(),
532                    fetch_batch_size: NZU64!(2), // Very small batch size to ensure multiple batches needed
533                    max_outstanding_requests: 10,
534                    apply_batch_size: 1024,
535                    update_rx: Some(update_receiver),
536                };
537                let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
538                loop {
539                    // Step the client until we have processed a batch of operations
540                    client = match client.step().await.unwrap() {
541                        NextStep::Continue(new_client) => new_client,
542                        NextStep::Complete(_) => panic!("client should not be complete"),
543                    };
544                    let log_size = client.journal().size();
545                    if log_size > initial_lower_bound {
546                        break client;
547                    }
548                }
549            };
550
551            // Send target update with SAME lower bound but higher upper bound
552            update_sender
553                .send(Target {
554                    root: final_root,
555                    range: initial_lower_bound..final_upper_bound,
556                })
557                .await
558                .unwrap();
559
560            // Complete the sync
561            let synced_db = client.sync().await.unwrap();
562
563            // Verify the synced database has the expected final state
564            assert_eq!(synced_db.root(), final_root);
565
566            // Verify the target database matches the synced database
567            let target_db = Arc::try_unwrap(target_db).map_or_else(
568                |_| panic!("Failed to unwrap Arc - still has references"),
569                |rw_lock| rw_lock.into_inner(),
570            );
571            {
572                assert_eq!(synced_db.op_count(), target_db.op_count());
573                assert_eq!(
574                    synced_db.oldest_retained_loc(),
575                    target_db.oldest_retained_loc()
576                );
577                assert_eq!(synced_db.root(), target_db.root());
578            }
579
580            // Verify all expected operations are present in the synced database
581            let all_ops = [initial_ops, additional_ops].concat();
582            for op in &all_ops {
583                if let Operation::Set(key, value) = op {
584                    let synced_value = synced_db.get(key).await.unwrap();
585                    assert_eq!(synced_value, Some(*value));
586                }
587            }
588
589            synced_db.destroy().await.unwrap();
590            target_db.destroy().await.unwrap();
591        });
592    }
593
594    /// Test that invalid bounds are rejected
595    #[test]
596    fn test_sync_invalid_bounds() {
597        let executor = deterministic::Runner::default();
598        executor.start(|mut context| async move {
599            let target_db = create_test_db(context.clone()).await;
600            let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
601            let config = Config {
602                db_config,
603                fetch_batch_size: NZU64!(10),
604                target: Target {
605                    root: sha256::Digest::from([1u8; 32]),
606                    range: Location::new_unchecked(31)..Location::new_unchecked(31),
607                },
608                context,
609                resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
610                apply_batch_size: 1024,
611                max_outstanding_requests: 1,
612                update_rx: None,
613            };
614            let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
615            match result {
616                Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
617                    lower_bound_pos,
618                    upper_bound_pos,
619                })) => {
620                    assert_eq!(lower_bound_pos, Location::new_unchecked(31));
621                    assert_eq!(upper_bound_pos, Location::new_unchecked(31));
622                }
623                _ => panic!("Expected InvalidTarget error"),
624            }
625        });
626    }
627
628    /// Test that sync works when target database has operations beyond the requested range
629    /// of operations to sync.
630    #[test]
631    fn test_sync_subset_of_target_database() {
632        let executor = deterministic::Runner::default();
633        executor.start(|mut context| async move {
634            let target_db = create_test_db(context.clone()).await;
635            let mut target_db = target_db.into_mutable();
636            let target_ops = create_test_ops(30);
637            // Apply all but the last operation
638            apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
639            let (durable_db, _) = target_db.commit(None).await.unwrap();
640            let target_db = durable_db.into_merkleized();
641
642            let target_root = target_db.root();
643            let lower_bound = target_db.oldest_retained_loc();
644            let op_count = target_db.op_count();
645
646            // Add final op after capturing the range
647            let mut target_db = target_db.into_mutable();
648            apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
649            let (durable_db, _) = target_db.commit(None).await.unwrap();
650            let target_db = durable_db.into_merkleized();
651
652            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
653            let config = Config {
654                db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
655                fetch_batch_size: NZU64!(10),
656                target: Target {
657                    root: target_root,
658                    range: lower_bound..op_count,
659                },
660                context,
661                resolver: target_db.clone(),
662                apply_batch_size: 1024,
663                max_outstanding_requests: 1,
664                update_rx: None,
665            };
666            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
667
668            // Verify state matches the specified range
669            assert_eq!(synced_db.root(), target_root);
670            assert_eq!(synced_db.op_count(), op_count);
671
672            synced_db.destroy().await.unwrap();
673            let target_db =
674                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
675            let inner = target_db.into_inner();
676            inner.destroy().await.unwrap();
677        });
678    }
679
680    // Test syncing where the sync client has some but not all of the operations in the target
681    // database.
682    #[test]
683    fn test_sync_use_existing_db_partial_match() {
684        let executor = deterministic::Runner::default();
685        executor.start(|mut context| async move {
686            let original_ops = create_test_ops(50);
687
688            // Create two databases
689            let target_db = create_test_db(context.clone()).await;
690            let mut target_db = target_db.into_mutable();
691            let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
692            let sync_db: ImmutableSyncTest =
693                immutable::Immutable::init(context.clone(), sync_db_config.clone())
694                    .await
695                    .unwrap();
696            let mut sync_db = sync_db.into_mutable();
697
698            // Apply the same operations to both databases
699            apply_ops(&mut target_db, original_ops.clone()).await;
700            apply_ops(&mut sync_db, original_ops.clone()).await;
701            let (durable_db, _) = target_db.commit(None).await.unwrap();
702            let target_db = durable_db.into_merkleized();
703            let (durable_db, _) = sync_db.commit(None).await.unwrap();
704            let sync_db = durable_db.into_merkleized();
705
706            drop(sync_db);
707
708            // Add one more operation and commit the target database
709            // (use different seed to avoid key collisions)
710            let mut target_db = target_db.into_mutable();
711            let last_op = create_test_ops_seeded(1, 1);
712            apply_ops(&mut target_db, last_op.clone()).await;
713            let (durable_db, _) = target_db.commit(None).await.unwrap();
714            let target_db = durable_db.into_merkleized();
715            let root = target_db.root();
716            let lower_bound = target_db.oldest_retained_loc();
717            let upper_bound = target_db.op_count(); // Up to the last operation
718
719            // Reopen the sync database and sync it to the target database
720            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
721            let config = Config {
722                db_config: sync_db_config, // Use same config as before
723                fetch_batch_size: NZU64!(10),
724                target: Target {
725                    root,
726                    range: lower_bound..upper_bound,
727                },
728                context: context.clone(),
729                resolver: target_db.clone(),
730                apply_batch_size: 1024,
731                max_outstanding_requests: 1,
732                update_rx: None,
733            };
734            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
735
736            // Verify database state
737            assert_eq!(sync_db.op_count(), upper_bound);
738            assert_eq!(sync_db.root(), root);
739
740            sync_db.destroy().await.unwrap();
741            let target_db =
742                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
743            let inner = target_db.into_inner();
744            inner.destroy().await.unwrap();
745        });
746    }
747
748    /// Test case where existing database on disk exactly matches the sync target
749    #[test]
750    fn test_sync_use_existing_db_exact_match() {
751        let executor = deterministic::Runner::default();
752        executor.start(|mut context| async move {
753            let target_ops = create_test_ops(40);
754
755            // Create two databases
756            let target_db = create_test_db(context.clone()).await;
757            let mut target_db = target_db.into_mutable();
758            let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
759            let sync_db: ImmutableSyncTest =
760                immutable::Immutable::init(context.clone(), sync_config.clone())
761                    .await
762                    .unwrap();
763            let mut sync_db = sync_db.into_mutable();
764
765            // Apply the same operations to both databases
766            apply_ops(&mut target_db, target_ops.clone()).await;
767            apply_ops(&mut sync_db, target_ops.clone()).await;
768            let (durable_db, _) = target_db.commit(None).await.unwrap();
769            let target_db = durable_db.into_merkleized();
770            let (durable_db, _) = sync_db.commit(None).await.unwrap();
771            let sync_db = durable_db.into_merkleized();
772
773            drop(sync_db);
774
775            // Prepare target
776            let root = target_db.root();
777            let lower_bound = target_db.oldest_retained_loc();
778            let upper_bound = target_db.op_count();
779
780            // Sync should complete immediately without fetching
781            let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
782            let config = Config {
783                db_config: sync_config,
784                fetch_batch_size: NZU64!(10),
785                target: Target {
786                    root,
787                    range: lower_bound..upper_bound,
788                },
789                context,
790                resolver: resolver.clone(),
791                apply_batch_size: 1024,
792                max_outstanding_requests: 1,
793                update_rx: None,
794            };
795            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
796
797            assert_eq!(sync_db.op_count(), upper_bound);
798            assert_eq!(sync_db.root(), root);
799
800            sync_db.destroy().await.unwrap();
801            let target_db =
802                Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
803            let inner = target_db.into_inner();
804            inner.destroy().await.unwrap();
805        });
806    }
807
808    /// Test that the client fails to sync if the lower bound is decreased
809    #[test_traced("WARN")]
810    fn test_target_update_lower_bound_decrease() {
811        let executor = deterministic::Runner::default();
812        executor.start(|mut context| async move {
813            // Create and populate target database
814            let target_db = create_test_db(context.clone()).await;
815            let mut target_db = target_db.into_mutable();
816            let target_ops = create_test_ops(100);
817            apply_ops(&mut target_db, target_ops).await;
818            let (durable_db, _) = target_db.commit(None).await.unwrap();
819            let mut target_db = durable_db.into_merkleized();
820
821            target_db.prune(Location::new_unchecked(10)).await.unwrap();
822
823            // Capture initial target state
824            let initial_lower_bound = target_db.oldest_retained_loc();
825            let initial_upper_bound = target_db.op_count();
826            let initial_root = target_db.root();
827
828            // Create client with initial target
829            let (mut update_sender, update_receiver) = mpsc::channel(1);
830            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
831            let config = Config {
832                context: context.clone(),
833                db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
834                fetch_batch_size: NZU64!(5),
835                target: Target {
836                    root: initial_root,
837                    range: initial_lower_bound..initial_upper_bound,
838                },
839                resolver: target_db.clone(),
840                apply_batch_size: 1024,
841                max_outstanding_requests: 10,
842                update_rx: Some(update_receiver),
843            };
844            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
845
846            // Send target update with decreased lower bound
847            update_sender
848                .send(Target {
849                    root: initial_root,
850                    range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
851                })
852                .await
853                .unwrap();
854
855            let result = client.step().await;
856            assert!(matches!(
857                result,
858                Err(sync::Error::Engine(
859                    sync::EngineError::SyncTargetMovedBackward { .. }
860                ))
861            ));
862
863            let target_db =
864                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
865            let inner = target_db.into_inner();
866            inner.destroy().await.unwrap();
867        });
868    }
869
870    /// Test that the client fails to sync if the upper bound is decreased
871    #[test_traced("WARN")]
872    fn test_target_update_upper_bound_decrease() {
873        let executor = deterministic::Runner::default();
874        executor.start(|mut context| async move {
875            // Create and populate target database
876            let target_db = create_test_db(context.clone()).await;
877            let mut target_db = target_db.into_mutable();
878            let target_ops = create_test_ops(50);
879            apply_ops(&mut target_db, target_ops).await;
880            let (durable_db, _) = target_db.commit(None).await.unwrap();
881            let target_db = durable_db.into_merkleized();
882
883            // Capture initial target state
884            let initial_lower_bound = target_db.oldest_retained_loc();
885            let initial_upper_bound = target_db.op_count();
886            let initial_root = target_db.root();
887
888            // Create client with initial target
889            let (mut update_sender, update_receiver) = mpsc::channel(1);
890            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
891            let config = Config {
892                context: context.clone(),
893                db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
894                fetch_batch_size: NZU64!(5),
895                target: Target {
896                    root: initial_root,
897                    range: initial_lower_bound..initial_upper_bound,
898                },
899                resolver: target_db.clone(),
900                apply_batch_size: 1024,
901                max_outstanding_requests: 10,
902                update_rx: Some(update_receiver),
903            };
904            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
905
906            // Send target update with decreased upper bound
907            update_sender
908                .send(Target {
909                    root: initial_root,
910                    range: initial_lower_bound..(initial_upper_bound - 1),
911                })
912                .await
913                .unwrap();
914
915            let result = client.step().await;
916            assert!(matches!(
917                result,
918                Err(sync::Error::Engine(
919                    sync::EngineError::SyncTargetMovedBackward { .. }
920                ))
921            ));
922
923            let target_db =
924                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
925            let inner = target_db.into_inner();
926            inner.destroy().await.unwrap();
927        });
928    }
929
930    /// Test that the client succeeds when bounds are updated
931    #[test_traced("WARN")]
932    fn test_target_update_bounds_increase() {
933        let executor = deterministic::Runner::default();
934        executor.start(|mut context| async move {
935            // Create and populate target database
936            let target_db = create_test_db(context.clone()).await;
937            let mut target_db = target_db.into_mutable();
938            let target_ops = create_test_ops(100);
939            apply_ops(&mut target_db, target_ops.clone()).await;
940            let (durable_db, _) = target_db.commit(None).await.unwrap();
941            let target_db = durable_db.into_merkleized();
942
943            // Capture initial target state
944            let initial_lower_bound = target_db.oldest_retained_loc();
945            let initial_upper_bound = target_db.op_count();
946            let initial_root = target_db.root();
947
948            // Apply more operations to the target database
949            // (use different seed to avoid key collisions)
950            let mut target_db = target_db.into_mutable();
951            let more_ops = create_test_ops_seeded(5, 1);
952            apply_ops(&mut target_db, more_ops.clone()).await;
953            let (durable_db, _) = target_db.commit(None).await.unwrap();
954            let mut target_db = durable_db.into_merkleized();
955
956            target_db.prune(Location::new_unchecked(10)).await.unwrap();
957            let target_db = target_db.into_mutable();
958            let (durable_db, _) = target_db.commit(None).await.unwrap();
959            let target_db = durable_db.into_merkleized();
960
961            // Capture final target state
962            let final_lower_bound = target_db.oldest_retained_loc();
963            let final_upper_bound = target_db.op_count();
964            let final_root = target_db.root();
965
966            // Assert we're actually updating the bounds
967            assert_ne!(final_lower_bound, initial_lower_bound);
968            assert_ne!(final_upper_bound, initial_upper_bound);
969
970            // Create client with initial target
971            let (mut update_sender, update_receiver) = mpsc::channel(1);
972            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
973            let config = Config {
974                context: context.clone(),
975                db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
976                fetch_batch_size: NZU64!(1),
977                target: Target {
978                    root: initial_root,
979                    range: initial_lower_bound..initial_upper_bound,
980                },
981                resolver: target_db.clone(),
982                apply_batch_size: 1024,
983                max_outstanding_requests: 1,
984                update_rx: Some(update_receiver),
985            };
986
987            // Send target update with increased upper bound
988            update_sender
989                .send(Target {
990                    root: final_root,
991                    range: final_lower_bound..final_upper_bound,
992                })
993                .await
994                .unwrap();
995
996            // Complete the sync
997            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
998
999            // Verify the synced database has the expected state
1000            assert_eq!(synced_db.root(), final_root);
1001            assert_eq!(synced_db.op_count(), final_upper_bound);
1002            assert_eq!(synced_db.oldest_retained_loc(), final_lower_bound);
1003
1004            synced_db.destroy().await.unwrap();
1005            let target_db = Arc::try_unwrap(target_db).map_or_else(
1006                |_| panic!("Failed to unwrap Arc - still has references"),
1007                |rw_lock| rw_lock.into_inner(),
1008            );
1009            target_db.destroy().await.unwrap();
1010        });
1011    }
1012
1013    /// Test that the client fails to sync with invalid bounds (lower > upper)
1014    #[test_traced("WARN")]
1015    fn test_target_update_invalid_bounds() {
1016        let executor = deterministic::Runner::default();
1017        executor.start(|mut context| async move {
1018            // Create and populate target database
1019            let target_db = create_test_db(context.clone()).await;
1020            let mut target_db = target_db.into_mutable();
1021            let target_ops = create_test_ops(25);
1022            apply_ops(&mut target_db, target_ops).await;
1023            let (durable_db, _) = target_db.commit(None).await.unwrap();
1024            let target_db = durable_db.into_merkleized();
1025
1026            // Capture initial target state
1027            let initial_lower_bound = target_db.oldest_retained_loc();
1028            let initial_upper_bound = target_db.op_count();
1029            let initial_root = target_db.root();
1030
1031            // Create client with initial target
1032            let (mut update_sender, update_receiver) = mpsc::channel(1);
1033            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1034            let config = Config {
1035                context: context.clone(),
1036                db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
1037                fetch_batch_size: NZU64!(5),
1038                target: Target {
1039                    root: initial_root,
1040                    range: initial_lower_bound..initial_upper_bound,
1041                },
1042                resolver: target_db.clone(),
1043                apply_batch_size: 1024,
1044                max_outstanding_requests: 10,
1045                update_rx: Some(update_receiver),
1046            };
1047            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
1048
1049            // Send target update with invalid bounds (lower > upper)
1050            update_sender
1051                .send(Target {
1052                    root: initial_root,
1053                    range: initial_upper_bound..initial_lower_bound,
1054                })
1055                .await
1056                .unwrap();
1057
1058            let result = client.step().await;
1059            assert!(matches!(
1060                result,
1061                Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1062            ));
1063
1064            let target_db =
1065                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1066            let inner = target_db.into_inner();
1067            inner.destroy().await.unwrap();
1068        });
1069    }
1070
1071    /// Test that target updates can be sent even after the client is done
1072    #[test_traced("WARN")]
1073    fn test_target_update_on_done_client() {
1074        let executor = deterministic::Runner::default();
1075        executor.start(|mut context| async move {
1076            // Create and populate target database
1077            let target_db = create_test_db(context.clone()).await;
1078            let mut target_db = target_db.into_mutable();
1079            let target_ops = create_test_ops(10);
1080            apply_ops(&mut target_db, target_ops).await;
1081            let (durable_db, _) = target_db.commit(None).await.unwrap();
1082            let target_db = durable_db.into_merkleized();
1083
1084            // Capture target state
1085            let lower_bound = target_db.oldest_retained_loc();
1086            let upper_bound = target_db.op_count();
1087            let root = target_db.root();
1088
1089            // Create client with target that will complete immediately
1090            let (mut update_sender, update_receiver) = mpsc::channel(1);
1091            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1092            let config = Config {
1093                context: context.clone(),
1094                db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1095                fetch_batch_size: NZU64!(20),
1096                target: Target {
1097                    root,
1098                    range: lower_bound..upper_bound,
1099                },
1100                resolver: target_db.clone(),
1101                apply_batch_size: 1024,
1102                max_outstanding_requests: 10,
1103                update_rx: Some(update_receiver),
1104            };
1105
1106            // Complete the sync
1107            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1108
1109            // Attempt to apply a target update after sync is complete to verify we don't panic
1110            let _ = update_sender
1111                .send(Target {
1112                    root: sha256::Digest::from([2u8; 32]),
1113                    range: lower_bound + 1..upper_bound + 1,
1114                })
1115                .await;
1116
1117            // Verify the synced database has the expected state
1118            assert_eq!(synced_db.root(), root);
1119            assert_eq!(synced_db.op_count(), upper_bound);
1120            assert_eq!(synced_db.oldest_retained_loc(), lower_bound);
1121
1122            synced_db.destroy().await.unwrap();
1123            Arc::try_unwrap(target_db)
1124                .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1125                .into_inner()
1126                .destroy()
1127                .await
1128                .unwrap();
1129        });
1130    }
1131}