commonware_storage/adb/immutable/sync/
mod.rs

1use crate::{
2    adb::{immutable, operation::variable::Operation, sync, Error},
3    journal::contiguous::variable,
4    mmr::{Location, StandardHasher as Standard},
5    translator::Translator,
6};
7use commonware_codec::Codec;
8use commonware_cryptography::Hasher;
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::Array;
11use std::ops::Range;
12
13impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
14where
15    E: Storage + Clock + Metrics,
16    K: Array,
17    V: Codec + Send,
18    H: Hasher,
19    T: Translator,
20{
21    type Op = Operation<K, V>;
22    type Journal = variable::Journal<E, Self::Op>;
23    type Hasher = H;
24    type Config = immutable::Config<T, V::Cfg>;
25    type Digest = H::Digest;
26    type Context = E;
27
28    async fn create_journal(
29        context: Self::Context,
30        config: &Self::Config,
31        range: Range<Location>,
32    ) -> Result<Self::Journal, Error> {
33        // Initialize contiguous journal for the sync range
34        variable::Journal::init_sync(
35            context.with_label("log"),
36            variable::Config {
37                items_per_section: config.log_items_per_section,
38                partition: config.log_partition.clone(),
39                compression: config.log_compression,
40                codec_config: config.log_codec_config.clone(),
41                buffer_pool: config.buffer_pool.clone(),
42                write_buffer: config.log_write_buffer,
43            },
44            *range.start..*range.end,
45        )
46        .await
47    }
48
49    /// Returns a [super::Immutable] initialized data collected in the sync process.
50    ///
51    /// # Behavior
52    ///
53    /// This method handles different initialization scenarios based on existing data:
54    /// - If the MMR journal is empty or the last item is before the range start, it creates a
55    ///   fresh MMR from the provided `pinned_nodes`
56    /// - If the MMR journal has data but is incomplete (has length < range end), missing operations
57    ///   from the log are applied to bring it up to the target state
58    /// - If the MMR journal has data beyond the range end, it is rewound to match the sync target
59    ///
60    /// # Returns
61    ///
62    /// A [super::Immutable] db populated with the state from the given range.
63    /// The pruning boundary is set to the range start.
64    async fn from_sync_result(
65        context: Self::Context,
66        db_config: Self::Config,
67        journal: Self::Journal,
68        pinned_nodes: Option<Vec<Self::Digest>>,
69        range: Range<Location>,
70        apply_batch_size: usize,
71    ) -> Result<Self, Error> {
72        let sync_config = Config {
73            db_config,
74            log: journal,
75            range,
76            pinned_nodes,
77            apply_batch_size,
78        };
79        Self::init_synced(context, sync_config).await
80    }
81
82    fn root(&self) -> Self::Digest {
83        let mut hasher = Standard::<H>::new();
84        self.root(&mut hasher)
85    }
86
87    async fn resize_journal(
88        mut journal: Self::Journal,
89        context: Self::Context,
90        config: &Self::Config,
91        range: Range<Location>,
92    ) -> Result<Self::Journal, Error> {
93        let size = journal.size();
94
95        if size <= range.start {
96            // Destroy and recreate
97            journal.destroy().await?;
98            Self::create_journal(context, config, range).await
99        } else {
100            // Prune to range start (position-based, not section-based)
101            journal
102                .prune(*range.start)
103                .await
104                .map_err(crate::adb::Error::from)?;
105
106            // Verify size is within range
107            let size = journal.size();
108            if size > range.end {
109                return Err(crate::adb::Error::UnexpectedData(Location::new_unchecked(
110                    size,
111                )));
112            }
113
114            Ok(journal)
115        }
116    }
117}
118
119/// Configuration for syncing an [immutable::Immutable] to a target state.
120pub struct Config<E, K, V, T, D, C>
121where
122    E: Storage + Metrics,
123    K: Array,
124    V: Codec + Send,
125    T: Translator,
126    D: commonware_cryptography::Digest,
127{
128    /// Database configuration.
129    pub db_config: immutable::Config<T, C>,
130
131    /// The [immutable::Immutable]'s log of operations. It has elements within the range.
132    /// Reports the range start as its pruning boundary (oldest retained operation index).
133    pub log: variable::Journal<E, Operation<K, V>>,
134
135    /// Sync range - operations outside this range are pruned or not synced.
136    pub range: Range<Location>,
137
138    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
139    /// specified by `Proof::nodes_to_pin`.
140    /// If `None`, the pinned nodes will be computed from the MMR's journal and metadata,
141    /// which are expected to have the necessary pinned nodes.
142    pub pinned_nodes: Option<Vec<D>>,
143
144    /// The maximum number of operations to keep in memory
145    /// before committing the database while applying operations.
146    /// Higher value will cause more memory usage during sync.
147    pub apply_batch_size: usize,
148}
149
150#[cfg(test)]
151mod tests {
152    use crate::{
153        adb::{
154            immutable,
155            operation::variable::Operation,
156            sync::{
157                self,
158                engine::{Config, NextStep},
159                Engine, Target,
160            },
161        },
162        mmr::{Location, StandardHasher as Standard},
163        translator::TwoCap,
164    };
165    use commonware_cryptography::{sha256, Digest, Sha256};
166    use commonware_macros::test_traced;
167    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
168    use commonware_utils::{NZUsize, NZU64};
169    use futures::{channel::mpsc, SinkExt as _};
170    use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
171    use std::{
172        collections::HashMap,
173        num::{NonZeroU64, NonZeroUsize},
174        sync::Arc,
175    };
176    use test_case::test_case;
177
178    /// Type alias for sync tests with simple codec config
179    type ImmutableSyncTest = immutable::Immutable<
180        deterministic::Context,
181        sha256::Digest,
182        sha256::Digest,
183        Sha256,
184        crate::translator::TwoCap,
185    >;
186
187    fn test_hasher() -> Standard<Sha256> {
188        Standard::<Sha256>::new()
189    }
190
191    /// Create a simple config for sync tests
192    fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
193        const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
194        const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
195        const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);
196
197        immutable::Config {
198            mmr_journal_partition: format!("journal_{suffix}"),
199            mmr_metadata_partition: format!("metadata_{suffix}"),
200            mmr_items_per_blob: NZU64!(11),
201            mmr_write_buffer: NZUsize!(1024),
202            log_partition: format!("log_{suffix}"),
203            log_items_per_section: ITEMS_PER_SECTION,
204            log_compression: None,
205            log_codec_config: (),
206            log_write_buffer: NZUsize!(1024),
207            translator: TwoCap,
208            thread_pool: None,
209            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
210        }
211    }
212
213    /// Create a test database with unique partition names
214    async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
215        let seed = context.next_u64();
216        let config = create_sync_config(&format!("sync_test_{seed}"));
217        ImmutableSyncTest::init(context, config).await.unwrap()
218    }
219
220    /// Create n random Set operations.
221    /// create_test_ops(n') is a suffix of create_test_ops(n) for n' > n.
222    fn create_test_ops(n: usize) -> Vec<Operation<sha256::Digest, sha256::Digest>> {
223        let mut rng = StdRng::seed_from_u64(1337);
224        let mut ops = Vec::new();
225        for _i in 0..n {
226            let key = sha256::Digest::random(&mut rng);
227            let value = sha256::Digest::random(&mut rng);
228            ops.push(Operation::Set(key, value));
229        }
230        ops
231    }
232
233    /// Applies the given operations to the database.
234    async fn apply_ops(
235        db: &mut ImmutableSyncTest,
236        ops: Vec<Operation<sha256::Digest, sha256::Digest>>,
237    ) {
238        for op in ops {
239            match op {
240                Operation::Set(key, value) => {
241                    db.set(key, value).await.unwrap();
242                }
243                Operation::Commit(metadata) => {
244                    db.commit(metadata).await.unwrap();
245                }
246                _ => {}
247            }
248        }
249    }
250
251    #[test_case(1, NZU64!(1); "singleton db with batch size == 1")]
252    #[test_case(1, NZU64!(2); "singleton db with batch size > db size")]
253    #[test_case(100, NZU64!(1); "db with batch size 1")]
254    #[test_case(100, NZU64!(3); "db size not evenly divided by batch size")]
255    #[test_case(100, NZU64!(99); "db size not evenly divided by batch size; different batch size")]
256    #[test_case(100, NZU64!(50); "db size divided by batch size")]
257    #[test_case(100, NZU64!(100); "db size == batch size")]
258    #[test_case(100, NZU64!(101); "batch size > db size")]
259    fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64) {
260        let executor = deterministic::Runner::default();
261        executor.start(|mut context| async move {
262            let mut target_db = create_test_db(context.clone()).await;
263            let target_db_ops = create_test_ops(target_db_ops);
264            apply_ops(&mut target_db, target_db_ops.clone()).await;
265            let metadata = Some(Sha256::fill(1));
266            target_db.commit(metadata).await.unwrap();
267            let target_op_count = target_db.op_count();
268            let target_oldest_retained_loc = target_db.oldest_retained_loc().unwrap();
269            let mut hasher = test_hasher();
270            let target_root = target_db.root(&mut hasher);
271
272            // Capture target database state before moving into config
273            let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
274            for op in &target_db_ops {
275                if let Operation::Set(key, value) = op {
276                    expected_kvs.insert(*key, *value);
277                }
278            }
279
280            let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));
281
282            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
283            let config = Config {
284                db_config: db_config.clone(),
285                fetch_batch_size,
286                target: Target {
287                    root: target_root,
288                    range: target_oldest_retained_loc..target_op_count,
289                },
290                context: context.clone(),
291                resolver: target_db.clone(),
292                apply_batch_size: 1024,
293                max_outstanding_requests: 1,
294                update_rx: None,
295            };
296            let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
297
298            // Verify database state
299            let mut hasher = test_hasher();
300            assert_eq!(got_db.op_count(), target_op_count);
301            assert_eq!(
302                got_db.oldest_retained_loc().unwrap(),
303                target_oldest_retained_loc
304            );
305
306            // Verify the root digest matches the target
307            assert_eq!(got_db.root(&mut hasher), target_root);
308
309            // Verify that the synced database matches the target state
310            for (key, expected_value) in &expected_kvs {
311                let synced_value = got_db.get(key).await.unwrap();
312                assert_eq!(synced_value, Some(*expected_value));
313            }
314
315            // Put more key-value pairs into both databases
316            let mut new_ops = Vec::new();
317            let mut rng = StdRng::seed_from_u64(42);
318            let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
319            for _i in 0..expected_kvs.len() {
320                let key = sha256::Digest::random(&mut rng);
321                let value = sha256::Digest::random(&mut rng);
322                new_ops.push(Operation::Set(key, value));
323                new_kvs.insert(key, value);
324            }
325
326            // Apply new operations to both databases
327            apply_ops(&mut got_db, new_ops.clone()).await;
328            {
329                let mut target_db = target_db.write().await;
330                apply_ops(&mut target_db, new_ops).await;
331            }
332
333            // Verify both databases have the same state after additional operations
334            for (key, expected_value) in &new_kvs {
335                let synced_value = got_db.get(key).await.unwrap();
336                let target_value = {
337                    let target_db = target_db.read().await;
338                    target_db.get(key).await.unwrap()
339                };
340                assert_eq!(synced_value, Some(*expected_value));
341                assert_eq!(target_value, Some(*expected_value));
342            }
343
344            got_db.destroy().await.unwrap();
345            let target_db = match Arc::try_unwrap(target_db) {
346                Ok(rw_lock) => rw_lock.into_inner(),
347                Err(_) => panic!("Failed to unwrap Arc - still has references"),
348            };
349            target_db.destroy().await.unwrap();
350        });
351    }
352
353    /// Test that sync works when the target database is initially empty
354    #[test_traced("WARN")]
355    fn test_sync_empty_to_nonempty() {
356        let executor = deterministic::Runner::default();
357        executor.start(|mut context| async move {
358            // Create an empty target database
359            let mut target_db = create_test_db(context.clone()).await;
360            target_db.commit(Some(Sha256::fill(1))).await.unwrap(); // Commit to establish a valid root
361
362            let target_op_count = target_db.op_count();
363            let target_oldest_retained_loc = target_db.oldest_retained_loc().unwrap();
364            let mut hasher = test_hasher();
365            let target_root = target_db.root(&mut hasher);
366
367            let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
368            let target_db = Arc::new(RwLock::new(target_db));
369            let config = Config {
370                db_config,
371                fetch_batch_size: NZU64!(10),
372                target: Target {
373                    root: target_root,
374                    range: target_oldest_retained_loc..target_op_count,
375                },
376                context: context.clone(),
377                resolver: target_db.clone(),
378                apply_batch_size: 1024,
379                max_outstanding_requests: 1,
380                update_rx: None,
381            };
382            let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
383
384            // Verify database state
385            let mut hasher = test_hasher();
386            assert_eq!(got_db.op_count(), target_op_count);
387            assert_eq!(
388                got_db.oldest_retained_loc().unwrap(),
389                target_oldest_retained_loc
390            );
391            assert_eq!(got_db.root(&mut hasher), target_root);
392            assert_eq!(
393                got_db.get_metadata().await.unwrap(),
394                Some((Location::new_unchecked(0), Some(Sha256::fill(1))))
395            );
396
397            got_db.destroy().await.unwrap();
398            let target_db = match Arc::try_unwrap(target_db) {
399                Ok(rw_lock) => rw_lock.into_inner(),
400                Err(_) => panic!("Failed to unwrap Arc - still has references"),
401            };
402            target_db.destroy().await.unwrap();
403        });
404    }
405
406    /// Test demonstrating that a synced database can be reopened and retain its state.
407    #[test_traced("WARN")]
408    fn test_sync_database_persistence() {
409        let executor = deterministic::Runner::default();
410        executor.start(|context| async move {
411            // Create and populate a simple target database
412            let mut target_db = create_test_db(context.clone()).await;
413            let target_ops = create_test_ops(10);
414            apply_ops(&mut target_db, target_ops.clone()).await;
415            target_db.commit(Some(Sha256::fill(0))).await.unwrap();
416
417            // Capture target state
418            let mut hasher = test_hasher();
419            let target_root = target_db.root(&mut hasher);
420            let lower_bound = target_db.oldest_retained_loc().unwrap();
421            let op_count = target_db.op_count();
422
423            // Perform sync
424            let db_config = create_sync_config("persistence_test");
425            let context_clone = context.clone();
426            let target_db = Arc::new(RwLock::new(target_db));
427            let config = Config {
428                db_config: db_config.clone(),
429                fetch_batch_size: NZU64!(5),
430                target: Target {
431                    root: target_root,
432                    range: lower_bound..op_count,
433                },
434                context,
435                resolver: target_db.clone(),
436                apply_batch_size: 1024,
437                max_outstanding_requests: 1,
438                update_rx: None,
439            };
440            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
441
442            // Verify initial sync worked
443            let mut hasher = test_hasher();
444            assert_eq!(synced_db.root(&mut hasher), target_root);
445
446            // Save state before closing
447            let expected_root = synced_db.root(&mut hasher);
448            let expected_op_count = synced_db.op_count();
449            let expected_oldest_retained_loc = synced_db.oldest_retained_loc().unwrap();
450
451            // Close and reopen the database to test persistence
452            synced_db.close().await.unwrap();
453            let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
454                .await
455                .unwrap();
456
457            // Verify state is preserved
458            let mut hasher = test_hasher();
459            assert_eq!(reopened_db.root(&mut hasher), expected_root);
460            assert_eq!(reopened_db.op_count(), expected_op_count);
461            assert_eq!(
462                reopened_db.oldest_retained_loc().unwrap(),
463                expected_oldest_retained_loc
464            );
465
466            // Verify data integrity
467            for op in &target_ops {
468                if let Operation::Set(key, value) = op {
469                    let stored_value = reopened_db.get(key).await.unwrap();
470                    assert_eq!(stored_value, Some(*value));
471                }
472            }
473
474            reopened_db.destroy().await.unwrap();
475            let target_db = match Arc::try_unwrap(target_db) {
476                Ok(rw_lock) => rw_lock.into_inner(),
477                Err(_) => panic!("Failed to unwrap Arc - still has references"),
478            };
479            target_db.destroy().await.unwrap();
480        });
481    }
482
483    /// Test that target updates work correctly during sync
484    #[test_traced("WARN")]
485    fn test_target_update_during_sync() {
486        let executor = deterministic::Runner::default();
487        executor.start(|mut context| async move {
488            // Create and populate initial target database
489            let mut target_db = create_test_db(context.clone()).await;
490            let initial_ops = create_test_ops(50);
491            apply_ops(&mut target_db, initial_ops.clone()).await;
492            target_db.commit(None).await.unwrap();
493
494            // Capture the state after first commit
495            let mut hasher = test_hasher();
496            let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
497            let initial_upper_bound = target_db.op_count();
498            let initial_root = target_db.root(&mut hasher);
499
500            // Add more operations to create the extended target
501            let additional_ops = create_test_ops(25);
502            apply_ops(&mut target_db, additional_ops.clone()).await;
503            target_db.commit(None).await.unwrap();
504            let final_upper_bound = target_db.op_count();
505            let final_root = target_db.root(&mut hasher);
506
507            // Wrap target database for shared mutable access
508            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
509
510            // Create client with initial smaller target and very small batch size
511            let (mut update_sender, update_receiver) = mpsc::channel(1);
512            let client = {
513                let config = Config {
514                    context: context.clone(),
515                    db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
516                    target: Target {
517                        root: initial_root,
518                        range: initial_lower_bound..initial_upper_bound,
519                    },
520                    resolver: target_db.clone(),
521                    fetch_batch_size: NZU64!(2), // Very small batch size to ensure multiple batches needed
522                    max_outstanding_requests: 10,
523                    apply_batch_size: 1024,
524                    update_rx: Some(update_receiver),
525                };
526                let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
527                loop {
528                    // Step the client until we have processed a batch of operations
529                    client = match client.step().await.unwrap() {
530                        NextStep::Continue(new_client) => new_client,
531                        NextStep::Complete(_) => panic!("client should not be complete"),
532                    };
533                    let log_size = client.journal().size();
534                    if log_size > initial_lower_bound {
535                        break client;
536                    }
537                }
538            };
539
540            // Send target update with SAME lower bound but higher upper bound
541            update_sender
542                .send(Target {
543                    root: final_root,
544                    range: initial_lower_bound..final_upper_bound,
545                })
546                .await
547                .unwrap();
548
549            // Complete the sync
550            let synced_db = client.sync().await.unwrap();
551
552            // Verify the synced database has the expected final state
553            let mut hasher = test_hasher();
554            assert_eq!(synced_db.root(&mut hasher), final_root);
555
556            // Verify the target database matches the synced database
557            let target_db = match Arc::try_unwrap(target_db) {
558                Ok(rw_lock) => rw_lock.into_inner(),
559                Err(_) => panic!("Failed to unwrap Arc - still has references"),
560            };
561            {
562                assert_eq!(synced_db.op_count(), target_db.op_count());
563                assert_eq!(
564                    synced_db.oldest_retained_loc(),
565                    target_db.oldest_retained_loc()
566                );
567                assert_eq!(synced_db.root(&mut hasher), target_db.root(&mut hasher));
568            }
569
570            // Verify all expected operations are present in the synced database
571            let all_ops = [initial_ops, additional_ops].concat();
572            for op in &all_ops {
573                if let Operation::Set(key, value) = op {
574                    let synced_value = synced_db.get(key).await.unwrap();
575                    assert_eq!(synced_value, Some(*value));
576                }
577            }
578
579            synced_db.destroy().await.unwrap();
580            target_db.destroy().await.unwrap();
581        });
582    }
583
584    /// Test that invalid bounds are rejected
585    #[test]
586    fn test_sync_invalid_bounds() {
587        let executor = deterministic::Runner::default();
588        executor.start(|mut context| async move {
589            let target_db = create_test_db(context.clone()).await;
590            let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
591            let config = Config {
592                db_config,
593                fetch_batch_size: NZU64!(10),
594                target: Target {
595                    root: sha256::Digest::from([1u8; 32]),
596                    range: Location::new_unchecked(31)..Location::new_unchecked(31),
597                },
598                context,
599                resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
600                apply_batch_size: 1024,
601                max_outstanding_requests: 1,
602                update_rx: None,
603            };
604            let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
605            match result {
606                Err(sync::Error::Engine(sync::EngineError::InvalidTarget {
607                    lower_bound_pos,
608                    upper_bound_pos,
609                })) => {
610                    assert_eq!(lower_bound_pos, Location::new_unchecked(31));
611                    assert_eq!(upper_bound_pos, Location::new_unchecked(31));
612                }
613                _ => panic!("Expected InvalidTarget error"),
614            }
615        });
616    }
617
618    /// Test that sync works when target database has operations beyond the requested range
619    /// of operations to sync.
620    #[test]
621    fn test_sync_subset_of_target_database() {
622        let executor = deterministic::Runner::default();
623        executor.start(|mut context| async move {
624            let mut target_db = create_test_db(context.clone()).await;
625            let target_ops = create_test_ops(30);
626            // Apply all but the last operation
627            apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
628            target_db.commit(None).await.unwrap();
629
630            let mut hasher = test_hasher();
631            let target_root = target_db.root(&mut hasher);
632            let lower_bound = target_db.oldest_retained_loc().unwrap();
633            let op_count = target_db.op_count();
634
635            // Add final op after capturing the range
636            apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
637            target_db.commit(None).await.unwrap();
638
639            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
640            let config = Config {
641                db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
642                fetch_batch_size: NZU64!(10),
643                target: Target {
644                    root: target_root,
645                    range: lower_bound..op_count,
646                },
647                context,
648                resolver: target_db.clone(),
649                apply_batch_size: 1024,
650                max_outstanding_requests: 1,
651                update_rx: None,
652            };
653            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
654
655            // Verify state matches the specified range
656            let mut hasher = test_hasher();
657            assert_eq!(synced_db.root(&mut hasher), target_root);
658            assert_eq!(synced_db.op_count(), op_count);
659
660            synced_db.destroy().await.unwrap();
661            let target_db =
662                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
663            let inner = target_db.into_inner();
664            inner.destroy().await.unwrap();
665        });
666    }
667
668    // Test syncing where the sync client has some but not all of the operations in the target
669    // database.
670    #[test]
671    fn test_sync_use_existing_db_partial_match() {
672        let executor = deterministic::Runner::default();
673        executor.start(|mut context| async move {
674            let original_ops = create_test_ops(50);
675
676            // Create two databases
677            let mut target_db = create_test_db(context.clone()).await;
678            let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
679            let mut sync_db: ImmutableSyncTest =
680                immutable::Immutable::init(context.clone(), sync_db_config.clone())
681                    .await
682                    .unwrap();
683
684            // Apply the same operations to both databases
685            apply_ops(&mut target_db, original_ops.clone()).await;
686            apply_ops(&mut sync_db, original_ops.clone()).await;
687            target_db.commit(None).await.unwrap();
688            sync_db.commit(None).await.unwrap();
689
690            // Close sync_db
691            sync_db.close().await.unwrap();
692
693            // Add one more operation and commit the target database
694            let last_op = create_test_ops(1);
695            apply_ops(&mut target_db, last_op.clone()).await;
696            target_db.commit(None).await.unwrap();
697            let mut hasher = test_hasher();
698            let root = target_db.root(&mut hasher);
699            let lower_bound = target_db.oldest_retained_loc().unwrap();
700            let upper_bound = target_db.op_count(); // Up to the last operation
701
702            // Reopen the sync database and sync it to the target database
703            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
704            let config = Config {
705                db_config: sync_db_config, // Use same config as before
706                fetch_batch_size: NZU64!(10),
707                target: Target {
708                    root,
709                    range: lower_bound..upper_bound,
710                },
711                context: context.clone(),
712                resolver: target_db.clone(),
713                apply_batch_size: 1024,
714                max_outstanding_requests: 1,
715                update_rx: None,
716            };
717            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
718
719            // Verify database state
720            let mut hasher = test_hasher();
721            assert_eq!(sync_db.op_count(), upper_bound);
722            assert_eq!(sync_db.root(&mut hasher), root);
723
724            sync_db.destroy().await.unwrap();
725            let target_db =
726                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
727            let inner = target_db.into_inner();
728            inner.destroy().await.unwrap();
729        });
730    }
731
732    /// Test case where existing database on disk exactly matches the sync target
733    #[test]
734    fn test_sync_use_existing_db_exact_match() {
735        let executor = deterministic::Runner::default();
736        executor.start(|mut context| async move {
737            let target_ops = create_test_ops(40);
738
739            // Create two databases
740            let mut target_db = create_test_db(context.clone()).await;
741            let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
742            let mut sync_db: ImmutableSyncTest =
743                immutable::Immutable::init(context.clone(), sync_config.clone())
744                    .await
745                    .unwrap();
746
747            // Apply the same operations to both databases
748            apply_ops(&mut target_db, target_ops.clone()).await;
749            apply_ops(&mut sync_db, target_ops.clone()).await;
750            target_db.commit(None).await.unwrap();
751            sync_db.commit(None).await.unwrap();
752
753            // Close sync_db
754            sync_db.close().await.unwrap();
755
756            // Prepare target
757            let mut hasher = test_hasher();
758            let root = target_db.root(&mut hasher);
759            let lower_bound = target_db.oldest_retained_loc().unwrap();
760            let upper_bound = target_db.op_count();
761
762            // Sync should complete immediately without fetching
763            let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
764            let config = Config {
765                db_config: sync_config,
766                fetch_batch_size: NZU64!(10),
767                target: Target {
768                    root,
769                    range: lower_bound..upper_bound,
770                },
771                context,
772                resolver: resolver.clone(),
773                apply_batch_size: 1024,
774                max_outstanding_requests: 1,
775                update_rx: None,
776            };
777            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
778
779            assert_eq!(sync_db.op_count(), upper_bound);
780            let mut hasher = test_hasher();
781            assert_eq!(sync_db.root(&mut hasher), root);
782
783            sync_db.destroy().await.unwrap();
784            let target_db =
785                Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
786            let inner = target_db.into_inner();
787            inner.destroy().await.unwrap();
788        });
789    }
790
791    /// Test that the client fails to sync if the lower bound is decreased
792    #[test_traced("WARN")]
793    fn test_target_update_lower_bound_decrease() {
794        let executor = deterministic::Runner::default();
795        executor.start(|mut context| async move {
796            // Create and populate target database
797            let mut target_db = create_test_db(context.clone()).await;
798            let target_ops = create_test_ops(100);
799            apply_ops(&mut target_db, target_ops).await;
800            target_db.commit(None).await.unwrap();
801
802            target_db.prune(Location::new_unchecked(10)).await.unwrap();
803
804            // Capture initial target state
805            let mut hasher = test_hasher();
806            let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
807            let initial_upper_bound = target_db.op_count();
808            let initial_root = target_db.root(&mut hasher);
809
810            // Create client with initial target
811            let (mut update_sender, update_receiver) = mpsc::channel(1);
812            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
813            let config = Config {
814                context: context.clone(),
815                db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
816                fetch_batch_size: NZU64!(5),
817                target: Target {
818                    root: initial_root,
819                    range: initial_lower_bound..initial_upper_bound,
820                },
821                resolver: target_db.clone(),
822                apply_batch_size: 1024,
823                max_outstanding_requests: 10,
824                update_rx: Some(update_receiver),
825            };
826            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
827
828            // Send target update with decreased lower bound
829            update_sender
830                .send(Target {
831                    root: initial_root,
832                    range: initial_lower_bound.checked_sub(1).unwrap()..initial_upper_bound,
833                })
834                .await
835                .unwrap();
836
837            let result = client.step().await;
838            assert!(matches!(
839                result,
840                Err(sync::Error::Engine(
841                    sync::EngineError::SyncTargetMovedBackward { .. }
842                ))
843            ));
844
845            let target_db =
846                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
847            let inner = target_db.into_inner();
848            inner.destroy().await.unwrap();
849        });
850    }
851
852    /// Test that the client fails to sync if the upper bound is decreased
853    #[test_traced("WARN")]
854    fn test_target_update_upper_bound_decrease() {
855        let executor = deterministic::Runner::default();
856        executor.start(|mut context| async move {
857            // Create and populate target database
858            let mut target_db = create_test_db(context.clone()).await;
859            let target_ops = create_test_ops(50);
860            apply_ops(&mut target_db, target_ops).await;
861            target_db.commit(None).await.unwrap();
862
863            // Capture initial target state
864            let mut hasher = test_hasher();
865            let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
866            let initial_upper_bound = target_db.op_count();
867            let initial_root = target_db.root(&mut hasher);
868
869            // Create client with initial target
870            let (mut update_sender, update_receiver) = mpsc::channel(1);
871            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
872            let config = Config {
873                context: context.clone(),
874                db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
875                fetch_batch_size: NZU64!(5),
876                target: Target {
877                    root: initial_root,
878                    range: initial_lower_bound..initial_upper_bound,
879                },
880                resolver: target_db.clone(),
881                apply_batch_size: 1024,
882                max_outstanding_requests: 10,
883                update_rx: Some(update_receiver),
884            };
885            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
886
887            // Send target update with decreased upper bound
888            update_sender
889                .send(Target {
890                    root: initial_root,
891                    range: initial_lower_bound..(initial_upper_bound - 1),
892                })
893                .await
894                .unwrap();
895
896            let result = client.step().await;
897            assert!(matches!(
898                result,
899                Err(sync::Error::Engine(
900                    sync::EngineError::SyncTargetMovedBackward { .. }
901                ))
902            ));
903
904            let target_db =
905                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
906            let inner = target_db.into_inner();
907            inner.destroy().await.unwrap();
908        });
909    }
910
911    /// Test that the client succeeds when bounds are updated
912    #[test_traced("WARN")]
913    fn test_target_update_bounds_increase() {
914        let executor = deterministic::Runner::default();
915        executor.start(|mut context| async move {
916            // Create and populate target database
917            let mut target_db = create_test_db(context.clone()).await;
918            let target_ops = create_test_ops(100);
919            apply_ops(&mut target_db, target_ops.clone()).await;
920            target_db.commit(None).await.unwrap();
921
922            // Capture initial target state
923            let mut hasher = test_hasher();
924            let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
925            let initial_upper_bound = target_db.op_count();
926            let initial_root = target_db.root(&mut hasher);
927
928            // Apply more operations to the target database
929            let more_ops = create_test_ops(5);
930            apply_ops(&mut target_db, more_ops.clone()).await;
931            target_db.commit(None).await.unwrap();
932
933            target_db.prune(Location::new_unchecked(10)).await.unwrap();
934            target_db.commit(None).await.unwrap();
935
936            // Capture final target state
937            let mut hasher = test_hasher();
938            let final_lower_bound = target_db.oldest_retained_loc().unwrap();
939            let final_upper_bound = target_db.op_count();
940            let final_root = target_db.root(&mut hasher);
941
942            // Assert we're actually updating the bounds
943            assert_ne!(final_lower_bound, initial_lower_bound);
944            assert_ne!(final_upper_bound, initial_upper_bound);
945
946            // Create client with initial target
947            let (mut update_sender, update_receiver) = mpsc::channel(1);
948            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
949            let config = Config {
950                context: context.clone(),
951                db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
952                fetch_batch_size: NZU64!(1),
953                target: Target {
954                    root: initial_root,
955                    range: initial_lower_bound..initial_upper_bound,
956                },
957                resolver: target_db.clone(),
958                apply_batch_size: 1024,
959                max_outstanding_requests: 1,
960                update_rx: Some(update_receiver),
961            };
962
963            // Send target update with increased upper bound
964            update_sender
965                .send(Target {
966                    root: final_root,
967                    range: final_lower_bound..final_upper_bound,
968                })
969                .await
970                .unwrap();
971
972            // Complete the sync
973            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
974
975            // Verify the synced database has the expected state
976            let mut hasher = test_hasher();
977            assert_eq!(synced_db.root(&mut hasher), final_root);
978            assert_eq!(synced_db.op_count(), final_upper_bound);
979            assert_eq!(synced_db.oldest_retained_loc().unwrap(), final_lower_bound);
980
981            synced_db.destroy().await.unwrap();
982            let target_db =
983                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
984            let inner = target_db.into_inner();
985            inner.destroy().await.unwrap();
986        });
987    }
988
989    /// Test that the client fails to sync with invalid bounds (lower > upper)
990    #[test_traced("WARN")]
991    fn test_target_update_invalid_bounds() {
992        let executor = deterministic::Runner::default();
993        executor.start(|mut context| async move {
994            // Create and populate target database
995            let mut target_db = create_test_db(context.clone()).await;
996            let target_ops = create_test_ops(25);
997            apply_ops(&mut target_db, target_ops).await;
998            target_db.commit(None).await.unwrap();
999
1000            // Capture initial target state
1001            let mut hasher = test_hasher();
1002            let initial_lower_bound = target_db.oldest_retained_loc().unwrap();
1003            let initial_upper_bound = target_db.op_count();
1004            let initial_root = target_db.root(&mut hasher);
1005
1006            // Create client with initial target
1007            let (mut update_sender, update_receiver) = mpsc::channel(1);
1008            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1009            let config = Config {
1010                context: context.clone(),
1011                db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
1012                fetch_batch_size: NZU64!(5),
1013                target: Target {
1014                    root: initial_root,
1015                    range: initial_lower_bound..initial_upper_bound,
1016                },
1017                resolver: target_db.clone(),
1018                apply_batch_size: 1024,
1019                max_outstanding_requests: 10,
1020                update_rx: Some(update_receiver),
1021            };
1022            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
1023
1024            // Send target update with invalid bounds (lower > upper)
1025            update_sender
1026                .send(Target {
1027                    root: initial_root,
1028                    range: initial_upper_bound..initial_lower_bound,
1029                })
1030                .await
1031                .unwrap();
1032
1033            let result = client.step().await;
1034            assert!(matches!(
1035                result,
1036                Err(sync::Error::Engine(sync::EngineError::InvalidTarget { .. }))
1037            ));
1038
1039            let target_db =
1040                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
1041            let inner = target_db.into_inner();
1042            inner.destroy().await.unwrap();
1043        });
1044    }
1045
1046    /// Test that target updates can be sent even after the client is done
1047    #[test_traced("WARN")]
1048    fn test_target_update_on_done_client() {
1049        let executor = deterministic::Runner::default();
1050        executor.start(|mut context| async move {
1051            // Create and populate target database
1052            let mut target_db = create_test_db(context.clone()).await;
1053            let target_ops = create_test_ops(10);
1054            apply_ops(&mut target_db, target_ops).await;
1055            target_db.commit(None).await.unwrap();
1056
1057            // Capture target state
1058            let mut hasher = test_hasher();
1059            let lower_bound = target_db.oldest_retained_loc().unwrap();
1060            let upper_bound = target_db.op_count();
1061            let root = target_db.root(&mut hasher);
1062
1063            // Create client with target that will complete immediately
1064            let (mut update_sender, update_receiver) = mpsc::channel(1);
1065            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
1066            let config = Config {
1067                context: context.clone(),
1068                db_config: create_sync_config(&format!("done_{}", context.next_u64())),
1069                fetch_batch_size: NZU64!(20),
1070                target: Target {
1071                    root,
1072                    range: lower_bound..upper_bound,
1073                },
1074                resolver: target_db.clone(),
1075                apply_batch_size: 1024,
1076                max_outstanding_requests: 10,
1077                update_rx: Some(update_receiver),
1078            };
1079
1080            // Complete the sync
1081            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();
1082
1083            // Attempt to apply a target update after sync is complete to verify we don't panic
1084            let _ = update_sender
1085                .send(Target {
1086                    root: sha256::Digest::from([2u8; 32]),
1087                    range: lower_bound + 1..upper_bound + 1,
1088                })
1089                .await;
1090
1091            // Verify the synced database has the expected state
1092            let mut hasher = test_hasher();
1093            assert_eq!(synced_db.root(&mut hasher), root);
1094            assert_eq!(synced_db.op_count(), upper_bound);
1095            assert_eq!(synced_db.oldest_retained_loc().unwrap(), lower_bound);
1096
1097            synced_db.destroy().await.unwrap();
1098            Arc::try_unwrap(target_db)
1099                .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
1100                .into_inner()
1101                .destroy()
1102                .await
1103                .unwrap();
1104        });
1105    }
1106}