commonware_storage/adb/immutable/sync/mod.rs

use crate::{
    adb::{
        any::variable::sync::init_journal,
        immutable,
        sync::{self, Journal as _},
    },
    journal::variable,
    mmr::hasher::Standard,
    store::operation::Variable,
    translator::Translator,
};
use commonware_codec::Codec;
use commonware_cryptography::Hasher;
use commonware_runtime::{Clock, Metrics, Storage};
use commonware_utils::{Array, NZUsize};
use futures::{pin_mut, StreamExt};
use std::num::NonZeroU64;

mod journal;

// Compute the next append location (size) by scanning the variable journal and
// counting only items whose logical location is within [lower_bound, upper_bound].
async fn compute_size<E, K, V>(
    journal: &variable::Journal<E, Variable<K, V>>,
    items_per_section: NonZeroU64,
    lower_bound: u64,
    upper_bound: u64,
) -> Result<u64, crate::journal::Error>
where
    E: Storage + Metrics,
    K: Array,
    V: Codec,
{
    let items_per_section = items_per_section.get();
    let mut size = lower_bound;
    let mut current_section: Option<u64> = None;
    let mut index_in_section: u64 = 0;
    let stream = journal.replay(0, 0, NZUsize!(1024)).await?;
    pin_mut!(stream);
    while let Some(item) = stream.next().await {
        match item {
            Ok((section, _offset, _size, _op)) => {
                if current_section != Some(section) {
                    current_section = Some(section);
                    index_in_section = 0;
                }
                let loc = section
                    .saturating_mul(items_per_section)
                    .saturating_add(index_in_section);
                if loc < lower_bound {
                    index_in_section = index_in_section.saturating_add(1);
                    continue;
                }
                if loc > upper_bound {
                    return Ok(size);
                }
                size = loc.saturating_add(1);
                index_in_section = index_in_section.saturating_add(1);
            }
            Err(e) => return Err(e),
        }
    }
    Ok(size)
}
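// A worked example of the location arithmetic above (illustrative numbers, not
// tied to any particular configuration): with `items_per_section = 5`, an item
// replayed from section 2 at in-section index 3 has logical location
// 2 * 5 + 3 = 13. Given `lower_bound = 10` and `upper_bound = 14`, that item is
// in range, so `size` advances to 13 + 1 = 14. An item whose location exceeds
// the upper bound ends the scan early, and the previously computed `size` is
// returned as the next append location.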

impl<E, K, V, H, T> sync::Database for immutable::Immutable<E, K, V, H, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: Codec,
    H: Hasher,
    T: Translator,
{
    type Op = Variable<K, V>;
    type Journal = journal::Journal<E, K, V>;
    type Hasher = H;
    type Error = crate::adb::Error;
    type Config = immutable::Config<T, V::Cfg>;
    type Digest = H::Digest;
    type Context = E;

    async fn create_journal(
        context: Self::Context,
        config: &Self::Config,
        lower_bound_loc: u64,
        upper_bound_loc: u64,
    ) -> Result<Self::Journal, <Self::Journal as sync::Journal>::Error> {
        // Open the journal and discard operations outside the sync range.
        let journal = init_journal(
            context.with_label("log"),
            variable::Config {
                partition: config.log_journal_partition.clone(),
                compression: config.log_compression,
                codec_config: config.log_codec_config.clone(),
                write_buffer: config.log_write_buffer,
                buffer_pool: config.buffer_pool.clone(),
            },
            lower_bound_loc,
            upper_bound_loc,
            config.log_items_per_section,
        )
        .await?;

        // Compute next append location based on logical locations within [lower_bound, upper_bound]
        let size = compute_size(
            &journal,
            config.log_items_per_section,
            lower_bound_loc,
            upper_bound_loc,
        )
        .await?;

        Ok(journal::Journal::new(
            journal,
            config.log_items_per_section,
            size,
        ))
    }

    /// Returns a [super::Immutable] initialized with the data collected in the sync process.
    ///
    /// # Behavior
    ///
    /// This method handles different initialization scenarios based on existing data:
    /// - If the MMR journal is empty or the last item is before `lower_bound`, it creates a
    ///   fresh MMR from the provided `pinned_nodes`
    /// - If the MMR journal has data but is incomplete (< `upper_bound`), missing operations
    ///   from the log are applied to bring it up to the target state
    /// - If the MMR journal has data beyond the `upper_bound`, it is rewound to match the sync target
    ///
    /// # Returns
    ///
    /// A [super::Immutable] db populated with the state from `lower_bound` to `upper_bound`, inclusive.
    /// The pruning boundary is set to `lower_bound`.
    async fn from_sync_result(
        context: Self::Context,
        db_config: Self::Config,
        journal: Self::Journal,
        pinned_nodes: Option<Vec<Self::Digest>>,
        lower_bound: u64,
        upper_bound: u64,
        apply_batch_size: usize,
    ) -> Result<Self, Self::Error> {
        let journal = journal.into_inner();
        let sync_config = Config {
            db_config,
            log: journal,
            lower_bound,
            upper_bound,
            pinned_nodes,
            apply_batch_size,
        };
        Self::init_synced(context, sync_config).await
    }

    fn root(&self) -> Self::Digest {
        let mut hasher = Standard::<H>::new();
        self.root(&mut hasher)
    }

    async fn resize_journal(
        journal: Self::Journal,
        context: Self::Context,
        config: &Self::Config,
        lower_bound: u64,
        upper_bound: u64,
    ) -> Result<Self::Journal, Self::Error> {
        let size = journal.size().await.map_err(crate::adb::Error::from)?;

        if size <= lower_bound {
            // Close existing journal and create new one
            journal.close().await.map_err(crate::adb::Error::from)?;
            Self::create_journal(context, config, lower_bound, upper_bound)
                .await
                .map_err(crate::adb::Error::from)
        } else {
            // Extract the Variable journal to perform section-based pruning
            let mut variable_journal = journal.into_inner();

            // Use Variable journal's section-based pruning
            let items_per_section = config.log_items_per_section.get();
            let lower_section = lower_bound / items_per_section;
            variable_journal
                .prune(lower_section)
                .await
                .map_err(crate::adb::Error::from)?;
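            // For example (illustrative numbers): with `log_items_per_section = 5`
            // and `lower_bound = 12`, `lower_section = 12 / 5 = 2`, so pruning to
            // section 2 is intended to drop earlier sections while retaining
            // section 2, which may still hold in-range items (locations 10..=14).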

            // Compute next append location based on logical locations within [lower_bound, upper_bound]
            let size = compute_size(
                &variable_journal,
                config.log_items_per_section,
                lower_bound,
                upper_bound,
            )
            .await
            .map_err(crate::adb::Error::from)?;

            Ok(journal::Journal::new(
                variable_journal,
                config.log_items_per_section,
                size,
            ))
        }
    }
}

/// Configuration for syncing an [immutable::Immutable] to a target state.
pub struct Config<E, K, V, T, D, C>
where
    E: Storage + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
    D: commonware_cryptography::Digest,
{
    /// Database configuration.
    pub db_config: immutable::Config<T, C>,

    /// The [immutable::Immutable]'s log of operations. It has elements from `lower_bound` to
    /// `upper_bound`, inclusive. Reports `lower_bound` as its pruning boundary (oldest retained
    /// operation index).
    pub log: variable::Journal<E, Variable<K, V>>,

    /// Sync lower boundary (inclusive) - operations below this index are pruned.
    pub lower_bound: u64,

    /// Sync upper boundary (inclusive) - operations above this index are not synced.
    pub upper_bound: u64,

    /// The pinned nodes the MMR needs at the pruning boundary given by
    /// `lower_bound`, in the order specified by `Proof::nodes_to_pin`.
    /// If `None`, the pinned nodes will be computed from the MMR's journal and metadata,
    /// which are expected to have the necessary pinned nodes.
    pub pinned_nodes: Option<Vec<D>>,

    /// The maximum number of operations to keep in memory
    /// before committing the database while applying operations.
    /// A higher value increases memory usage during sync.
    pub apply_batch_size: usize,
}
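// Illustrative example of how these bounds relate (assumed numbers, not taken
// from any particular deployment): for a target whose `op_count()` is 100 and
// whose oldest retained operation is at location 40, a sync would use
// `lower_bound = 40` and `upper_bound = 99`; the resulting database then
// reports 40 as its pruning boundary, and `pinned_nodes` (if provided) are the
// digests the MMR needs at that boundary.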

#[cfg(test)]
mod tests {
    use crate::{
        adb::{
            immutable,
            sync::{
                self,
                engine::{Config, NextStep},
                Engine, Journal, Target,
            },
        },
        mmr::hasher::Standard,
        store::operation::Variable,
        translator::TwoCap,
    };
    use commonware_cryptography::{sha256, Digest, Sha256};
    use commonware_macros::test_traced;
    use commonware_runtime::{buffer::PoolRef, deterministic, Runner as _, RwLock};
    use commonware_utils::{NZUsize, NZU64};
    use futures::{channel::mpsc, SinkExt as _};
    use rand::{rngs::StdRng, RngCore as _, SeedableRng as _};
    use std::{
        collections::HashMap,
        num::{NonZeroU64, NonZeroUsize},
        sync::Arc,
    };
    use test_case::test_case;

    /// Type alias for sync tests with simple codec config
    type ImmutableSyncTest = immutable::Immutable<
        deterministic::Context,
        sha256::Digest,
        sha256::Digest,
        Sha256,
        crate::translator::TwoCap,
    >;

    fn test_hasher() -> Standard<Sha256> {
        Standard::<Sha256>::new()
    }

    /// Create a simple config for sync tests
    fn create_sync_config(suffix: &str) -> immutable::Config<crate::translator::TwoCap, ()> {
        const PAGE_SIZE: NonZeroUsize = NZUsize!(77);
        const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
        const ITEMS_PER_SECTION: NonZeroU64 = NZU64!(5);

        immutable::Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("log_journal_{suffix}"),
            log_items_per_section: ITEMS_PER_SECTION,
            log_compression: None,
            log_codec_config: (),
            log_write_buffer: NZUsize!(1024),
            locations_journal_partition: format!("locations_journal_{suffix}"),
            locations_items_per_blob: NZU64!(7),
            translator: TwoCap,
            thread_pool: None,
            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }

    /// Create a test database with unique partition names
    async fn create_test_db(mut context: deterministic::Context) -> ImmutableSyncTest {
        let seed = context.next_u64();
        let config = create_sync_config(&format!("sync_test_{seed}"));
        ImmutableSyncTest::init(context, config).await.unwrap()
    }

    /// Create n random Set operations.
    /// create_test_ops(n) is a prefix of create_test_ops(n') for n < n'.
    fn create_test_ops(n: usize) -> Vec<Variable<sha256::Digest, sha256::Digest>> {
        let mut rng = StdRng::seed_from_u64(1337);
        let mut ops = Vec::new();
        for _i in 0..n {
            let key = sha256::Digest::random(&mut rng);
            let value = sha256::Digest::random(&mut rng);
            ops.push(Variable::Set(key, value));
        }
        ops
    }

    /// Applies the given operations to the database.
    async fn apply_ops(
        db: &mut ImmutableSyncTest,
        ops: Vec<Variable<sha256::Digest, sha256::Digest>>,
    ) {
        for op in ops {
            match op {
                Variable::Set(key, value) => {
                    db.set(key, value).await.unwrap();
                }
                Variable::Commit(metadata) => {
                    db.commit(metadata).await.unwrap();
                }
                _ => {}
            }
        }
    }

    #[test_case(1, NZU64!(1); "singleton db with batch size == 1")]
    #[test_case(1, NZU64!(2); "singleton db with batch size > db size")]
    #[test_case(100, NZU64!(1); "db with batch size 1")]
    #[test_case(100, NZU64!(3); "db size not evenly divided by batch size")]
    #[test_case(100, NZU64!(99); "db size not evenly divided by batch size; different batch size")]
    #[test_case(100, NZU64!(50); "db size divided by batch size")]
    #[test_case(100, NZU64!(100); "db size == batch size")]
    #[test_case(100, NZU64!(101); "batch size > db size")]
    fn test_sync(target_db_ops: usize, fetch_batch_size: NonZeroU64) {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut target_db = create_test_db(context.clone()).await;
            let target_db_ops = create_test_ops(target_db_ops);
            apply_ops(&mut target_db, target_db_ops.clone()).await;
            let metadata = Some(Sha256::fill(1));
            target_db.commit(metadata).await.unwrap();
            let target_op_count = target_db.op_count();
            let target_oldest_retained_loc = target_db.oldest_retained_loc;
            let mut hasher = test_hasher();
            let target_root = target_db.root(&mut hasher);

            // Capture target database state before moving into config
            let mut expected_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
            for op in &target_db_ops {
                if let Variable::Set(key, value) = op {
                    expected_kvs.insert(*key, *value);
                }
            }

            let db_config = create_sync_config(&format!("sync_client_{}", context.next_u64()));

            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                db_config: db_config.clone(),
                fetch_batch_size,
                target: Target {
                    root: target_root,
                    lower_bound_ops: target_oldest_retained_loc,
                    upper_bound_ops: target_op_count - 1, // target_op_count is the count, operations are 0-indexed
                },
                context: context.clone(),
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let mut got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify database state
            let mut hasher = test_hasher();
            assert_eq!(got_db.op_count(), target_op_count);
            assert_eq!(got_db.oldest_retained_loc, target_oldest_retained_loc);

            // Verify the root digest matches the target
            assert_eq!(got_db.root(&mut hasher), target_root);

            // Verify that the synced database matches the target state
            for (key, expected_value) in &expected_kvs {
                let synced_value = got_db.get(key).await.unwrap();
                assert_eq!(synced_value, Some(*expected_value));
            }

            // Put more key-value pairs into both databases
            let mut new_ops = Vec::new();
            let mut rng = StdRng::seed_from_u64(42);
            let mut new_kvs: HashMap<sha256::Digest, sha256::Digest> = HashMap::new();
            for _i in 0..expected_kvs.len() {
                let key = sha256::Digest::random(&mut rng);
                let value = sha256::Digest::random(&mut rng);
                new_ops.push(Variable::Set(key, value));
                new_kvs.insert(key, value);
            }

            // Apply new operations to both databases
            apply_ops(&mut got_db, new_ops.clone()).await;
            {
                let mut target_db = target_db.write().await;
                apply_ops(&mut target_db, new_ops).await;
            }

            // Verify both databases have the same state after additional operations
            for (key, expected_value) in &new_kvs {
                let synced_value = got_db.get(key).await.unwrap();
                let target_value = {
                    let target_db = target_db.read().await;
                    target_db.get(key).await.unwrap()
                };
                assert_eq!(synced_value, Some(*expected_value));
                assert_eq!(target_value, Some(*expected_value));
            }

            got_db.destroy().await.unwrap();
            let target_db = match Arc::try_unwrap(target_db) {
                Ok(rw_lock) => rw_lock.into_inner(),
                Err(_) => panic!("Failed to unwrap Arc - still has references"),
            };
            target_db.destroy().await.unwrap();
        });
    }

    /// Test that sync works when the target database is initially empty
    #[test_traced("WARN")]
    fn test_sync_empty_to_nonempty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create an empty target database
            let mut target_db = create_test_db(context.clone()).await;
            target_db.commit(Some(Sha256::fill(1))).await.unwrap(); // Commit to establish a valid root

            let target_op_count = target_db.op_count();
            let target_oldest_retained_loc = target_db.oldest_retained_loc;
            let mut hasher = test_hasher();
            let target_root = target_db.root(&mut hasher);

            let db_config = create_sync_config(&format!("empty_sync_{}", context.next_u64()));
            let target_db = Arc::new(RwLock::new(target_db));
            let config = Config {
                db_config,
                fetch_batch_size: NZU64!(10),
                target: Target {
                    root: target_root,
                    lower_bound_ops: target_oldest_retained_loc,
                    upper_bound_ops: target_op_count - 1,
                },
                context: context.clone(),
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify database state
            let mut hasher = test_hasher();
            assert_eq!(got_db.op_count(), target_op_count);
            assert_eq!(got_db.oldest_retained_loc, target_oldest_retained_loc);
            assert_eq!(got_db.root(&mut hasher), target_root);
            assert_eq!(
                got_db.get_metadata().await.unwrap(),
                Some((0, Some(Sha256::fill(1))))
            );

            got_db.destroy().await.unwrap();
            let target_db = match Arc::try_unwrap(target_db) {
                Ok(rw_lock) => rw_lock.into_inner(),
                Err(_) => panic!("Failed to unwrap Arc - still has references"),
            };
            target_db.destroy().await.unwrap();
        });
    }

    /// Test demonstrating that a synced database can be reopened and retain its state.
    #[test_traced("WARN")]
    fn test_sync_database_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create and populate a simple target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(10);
            apply_ops(&mut target_db, target_ops.clone()).await;
            target_db.commit(Some(Sha256::fill(0))).await.unwrap();

            // Capture target state
            let mut hasher = test_hasher();
            let target_root = target_db.root(&mut hasher);
            let lower_bound = target_db.oldest_retained_loc;
            let upper_bound = target_db.op_count() - 1;

            // Perform sync
            let db_config = create_sync_config("persistence_test");
            let context_clone = context.clone();
            let target_db = Arc::new(RwLock::new(target_db));
            let config = Config {
                db_config: db_config.clone(),
                fetch_batch_size: NZU64!(5),
                target: Target {
                    root: target_root,
                    lower_bound_ops: lower_bound,
                    upper_bound_ops: upper_bound,
                },
                context,
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify initial sync worked
            let mut hasher = test_hasher();
            assert_eq!(synced_db.root(&mut hasher), target_root);

            // Save state before closing
            let expected_root = synced_db.root(&mut hasher);
            let expected_op_count = synced_db.op_count();
            let expected_oldest_retained_loc = synced_db.oldest_retained_loc;

            // Close and reopen the database to test persistence
            synced_db.close().await.unwrap();
            let reopened_db = ImmutableSyncTest::init(context_clone, db_config)
                .await
                .unwrap();

            // Verify state is preserved
            let mut hasher = test_hasher();
            assert_eq!(reopened_db.root(&mut hasher), expected_root);
            assert_eq!(reopened_db.op_count(), expected_op_count);
            assert_eq!(
                reopened_db.oldest_retained_loc,
                expected_oldest_retained_loc
            );

            // Verify data integrity
            for op in &target_ops {
                if let Variable::Set(key, value) = op {
                    let stored_value = reopened_db.get(key).await.unwrap();
                    assert_eq!(stored_value, Some(*value));
                }
            }

            reopened_db.destroy().await.unwrap();
            let target_db = match Arc::try_unwrap(target_db) {
                Ok(rw_lock) => rw_lock.into_inner(),
                Err(_) => panic!("Failed to unwrap Arc - still has references"),
            };
            target_db.destroy().await.unwrap();
        });
    }

    /// Test that target updates work correctly during sync
    #[test_traced("WARN")]
    fn test_target_update_during_sync() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate initial target database
            let mut target_db = create_test_db(context.clone()).await;
            let initial_ops = create_test_ops(50);
            apply_ops(&mut target_db, initial_ops.clone()).await;
            target_db.commit(None).await.unwrap();

            // Capture the state after first commit
            let mut hasher = test_hasher();
            let initial_lower_bound = target_db.oldest_retained_loc;
            let initial_upper_bound = target_db.op_count() - 1;
            let initial_root = target_db.root(&mut hasher);

            // Add more operations to create the extended target
            let additional_ops = create_test_ops(25);
            apply_ops(&mut target_db, additional_ops.clone()).await;
            target_db.commit(None).await.unwrap();
            let final_upper_bound = target_db.op_count() - 1;
            let final_root = target_db.root(&mut hasher);

            // Wrap target database for shared mutable access
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));

            // Create client with initial smaller target and very small batch size
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let client = {
                let config = Config {
                    context: context.clone(),
                    db_config: create_sync_config(&format!("update_test_{}", context.next_u64())),
                    target: Target {
                        root: initial_root,
                        lower_bound_ops: initial_lower_bound,
                        upper_bound_ops: initial_upper_bound,
                    },
                    resolver: target_db.clone(),
                    fetch_batch_size: NZU64!(2), // Very small batch size to ensure multiple batches needed
                    max_outstanding_requests: 10,
                    apply_batch_size: 1024,
                    update_rx: Some(update_receiver),
                };
                let mut client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();
                loop {
                    // Step the client until we have processed a batch of operations
                    client = match client.step().await.unwrap() {
                        NextStep::Continue(new_client) => new_client,
                        NextStep::Complete(_) => panic!("client should not be complete"),
                    };
                    let log_size = client.journal().size().await.unwrap();
                    if log_size > initial_lower_bound {
                        break client;
                    }
                }
            };

            // Send target update with SAME lower bound but higher upper bound
            update_sender
                .send(Target {
                    root: final_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: final_upper_bound,
                })
                .await
                .unwrap();

            // Complete the sync
            let synced_db = client.sync().await.unwrap();

            // Verify the synced database has the expected final state
            let mut hasher = test_hasher();
            assert_eq!(synced_db.root(&mut hasher), final_root);

            // Verify the target database matches the synced database
            let target_db = match Arc::try_unwrap(target_db) {
                Ok(rw_lock) => rw_lock.into_inner(),
                Err(_) => panic!("Failed to unwrap Arc - still has references"),
            };
            {
                assert_eq!(synced_db.op_count(), target_db.op_count());
                assert_eq!(synced_db.oldest_retained_loc, target_db.oldest_retained_loc);
                assert_eq!(synced_db.root(&mut hasher), target_db.root(&mut hasher));
            }

            // Verify all expected operations are present in the synced database
            let all_ops = [initial_ops, additional_ops].concat();
            for op in &all_ops {
                if let Variable::Set(key, value) = op {
                    let synced_value = synced_db.get(key).await.unwrap();
                    assert_eq!(synced_value, Some(*value));
                }
            }

            synced_db.destroy().await.unwrap();
            target_db.destroy().await.unwrap();
        });
    }

    /// Test that invalid bounds are rejected
    #[test]
    fn test_sync_invalid_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let target_db = create_test_db(context.clone()).await;
            let db_config = create_sync_config(&format!("invalid_bounds_{}", context.next_u64()));
            let config = Config {
                db_config,
                fetch_batch_size: NZU64!(10),
                target: Target {
                    root: sha256::Digest::from([1u8; 32]),
                    lower_bound_ops: 31,
                    upper_bound_ops: 30,
                },
                context,
                resolver: Arc::new(commonware_runtime::RwLock::new(target_db)),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let result: Result<ImmutableSyncTest, _> = sync::sync(config).await;
            assert!(matches!(
                result,
                Err(sync::Error::InvalidTarget {
                    lower_bound_pos: 31,
                    upper_bound_pos: 30,
                })
            ));
        });
    }

    /// Test that sync works when target database has operations beyond the requested range
    /// of operations to sync.
    #[test]
    fn test_sync_subset_of_target_database() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(30);
            // Apply all but the last operation
            apply_ops(&mut target_db, target_ops[..29].to_vec()).await;
            target_db.commit(None).await.unwrap();

            let mut hasher = test_hasher();
            let target_root = target_db.root(&mut hasher);
            let lower_bound_ops = target_db.oldest_retained_loc;
            let upper_bound_ops = target_db.op_count() - 1; // exclude final op

            // Add final op after capturing the range
            apply_ops(&mut target_db, target_ops[29..].to_vec()).await;
            target_db.commit(None).await.unwrap();

            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                db_config: create_sync_config(&format!("subset_{}", context.next_u64())),
                fetch_batch_size: NZU64!(10),
                target: Target {
                    root: target_root,
                    lower_bound_ops,
                    upper_bound_ops,
                },
                context,
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify state matches the specified range
            let mut hasher = test_hasher();
            assert_eq!(synced_db.root(&mut hasher), target_root);
            assert_eq!(synced_db.op_count(), upper_bound_ops + 1);

            synced_db.destroy().await.unwrap();
            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    // Test syncing where the sync client has some but not all of the operations in the target
    // database.
    #[test]
    fn test_sync_use_existing_db_partial_match() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let original_ops = create_test_ops(50);

            // Create two databases
            let mut target_db = create_test_db(context.clone()).await;
            let sync_db_config = create_sync_config(&format!("partial_{}", context.next_u64()));
            let mut sync_db: ImmutableSyncTest =
                immutable::Immutable::init(context.clone(), sync_db_config.clone())
                    .await
                    .unwrap();

            // Apply the same operations to both databases
            apply_ops(&mut target_db, original_ops.clone()).await;
            apply_ops(&mut sync_db, original_ops.clone()).await;
            target_db.commit(None).await.unwrap();
            sync_db.commit(None).await.unwrap();

            // Close sync_db
            sync_db.close().await.unwrap();

            // Add one more operation and commit the target database
            let last_op = create_test_ops(1);
            apply_ops(&mut target_db, last_op.clone()).await;
            target_db.commit(None).await.unwrap();
            let mut hasher = test_hasher();
            let root = target_db.root(&mut hasher);
            let lower_bound_ops = target_db.oldest_retained_loc;
            let upper_bound_ops = target_db.op_count() - 1; // Up to the last operation

            // Reopen the sync database and sync it to the target database
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                db_config: sync_db_config, // Use same config as before
                fetch_batch_size: NZU64!(10),
                target: Target {
                    root,
                    lower_bound_ops,
                    upper_bound_ops,
                },
                context: context.clone(),
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify database state
            let mut hasher = test_hasher();
            assert_eq!(sync_db.op_count(), upper_bound_ops + 1);
            assert_eq!(sync_db.root(&mut hasher), root);

            sync_db.destroy().await.unwrap();
            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test case where existing database on disk exactly matches the sync target
    #[test]
    fn test_sync_use_existing_db_exact_match() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let target_ops = create_test_ops(40);

            // Create two databases
            let mut target_db = create_test_db(context.clone()).await;
            let sync_config = create_sync_config(&format!("exact_{}", context.next_u64()));
            let mut sync_db: ImmutableSyncTest =
                immutable::Immutable::init(context.clone(), sync_config.clone())
                    .await
                    .unwrap();

            // Apply the same operations to both databases
            apply_ops(&mut target_db, target_ops.clone()).await;
            apply_ops(&mut sync_db, target_ops.clone()).await;
            target_db.commit(None).await.unwrap();
            sync_db.commit(None).await.unwrap();

            // Close sync_db
            sync_db.close().await.unwrap();

            // Prepare target
            let mut hasher = test_hasher();
            let root = target_db.root(&mut hasher);
            let lower_bound_ops = target_db.oldest_retained_loc;
            let upper_bound_ops = target_db.op_count() - 1;

            // Sync should complete immediately without fetching
            let resolver = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                db_config: sync_config,
                fetch_batch_size: NZU64!(10),
                target: Target {
                    root,
                    lower_bound_ops,
                    upper_bound_ops,
                },
                context,
                resolver: resolver.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: None,
            };
            let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            assert_eq!(sync_db.op_count(), upper_bound_ops + 1);
            let mut hasher = test_hasher();
            assert_eq!(sync_db.root(&mut hasher), root);

            sync_db.destroy().await.unwrap();
            let target_db =
                Arc::try_unwrap(resolver).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test that the client fails to sync if the lower bound is decreased
    #[test_traced("WARN")]
    fn test_target_update_lower_bound_decrease() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(100);
            apply_ops(&mut target_db, target_ops).await;
            target_db.commit(None).await.unwrap();

            target_db.prune(10).await.unwrap();

            // Capture initial target state
            let mut hasher = test_hasher();
            let initial_lower_bound = target_db.oldest_retained_loc;
            let initial_upper_bound = target_db.op_count() - 1;
            let initial_root = target_db.root(&mut hasher);

            // Create client with initial target
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("lb_dec_{}", context.next_u64())),
                fetch_batch_size: NZU64!(5),
                target: Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: initial_upper_bound,
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 10,
                update_rx: Some(update_receiver),
            };
            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();

            // Send target update with decreased lower bound
            update_sender
                .send(Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound.saturating_sub(1),
                    upper_bound_ops: initial_upper_bound,
                })
                .await
                .unwrap();

            let result = client.step().await;
            assert!(matches!(
                result,
                Err(sync::Error::SyncTargetMovedBackward { .. })
            ));

            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test that the client fails to sync if the upper bound is decreased
    #[test_traced("WARN")]
    fn test_target_update_upper_bound_decrease() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(50);
            apply_ops(&mut target_db, target_ops).await;
            target_db.commit(None).await.unwrap();

            // Capture initial target state
            let mut hasher = test_hasher();
            let initial_lower_bound = target_db.oldest_retained_loc;
            let initial_upper_bound = target_db.op_count() - 1;
            let initial_root = target_db.root(&mut hasher);

            // Create client with initial target
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("ub_dec_{}", context.next_u64())),
                fetch_batch_size: NZU64!(5),
                target: Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: initial_upper_bound,
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 10,
                update_rx: Some(update_receiver),
            };
            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();

            // Send target update with decreased upper bound
            update_sender
                .send(Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: initial_upper_bound.saturating_sub(1),
                })
                .await
                .unwrap();

            let result = client.step().await;
            assert!(matches!(
                result,
                Err(sync::Error::SyncTargetMovedBackward { .. })
            ));

            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test that the client succeeds when bounds are updated
    #[test_traced("WARN")]
    fn test_target_update_bounds_increase() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(100);
            apply_ops(&mut target_db, target_ops.clone()).await;
            target_db.commit(None).await.unwrap();

            // Capture initial target state
            let mut hasher = test_hasher();
            let initial_lower_bound = target_db.oldest_retained_loc;
            let initial_upper_bound = target_db.op_count() - 1;
            let initial_root = target_db.root(&mut hasher);

            // Apply more operations to the target database
            let more_ops = create_test_ops(5);
            apply_ops(&mut target_db, more_ops.clone()).await;
            target_db.commit(None).await.unwrap();

            target_db.prune(10).await.unwrap();
            target_db.commit(None).await.unwrap();

            // Capture final target state
            let mut hasher = test_hasher();
            let final_lower_bound = target_db.oldest_retained_loc;
            let final_upper_bound = target_db.op_count() - 1;
            let final_root = target_db.root(&mut hasher);

            // Assert we're actually updating the bounds
            assert_ne!(final_lower_bound, initial_lower_bound);
            assert_ne!(final_upper_bound, initial_upper_bound);

            // Create client with initial target
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("bounds_inc_{}", context.next_u64())),
                fetch_batch_size: NZU64!(1),
                target: Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: initial_upper_bound,
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 1,
                update_rx: Some(update_receiver),
            };

            // Send target update with increased bounds
            update_sender
                .send(Target {
                    root: final_root,
                    lower_bound_ops: final_lower_bound,
                    upper_bound_ops: final_upper_bound,
                })
                .await
                .unwrap();

            // Complete the sync
            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Verify the synced database has the expected state
            let mut hasher = test_hasher();
            assert_eq!(synced_db.root(&mut hasher), final_root);
            assert_eq!(synced_db.op_count(), final_upper_bound + 1);
            assert_eq!(synced_db.oldest_retained_loc, final_lower_bound);

            synced_db.destroy().await.unwrap();
            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test that the client fails to sync with invalid bounds (lower > upper)
    #[test_traced("WARN")]
    fn test_target_update_invalid_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(25);
            apply_ops(&mut target_db, target_ops).await;
            target_db.commit(None).await.unwrap();

            // Capture initial target state
            let mut hasher = test_hasher();
            let initial_lower_bound = target_db.oldest_retained_loc;
            let initial_upper_bound = target_db.op_count() - 1;
            let initial_root = target_db.root(&mut hasher);

            // Create client with initial target
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("invalid_update_{}", context.next_u64())),
                fetch_batch_size: NZU64!(5),
                target: Target {
                    root: initial_root,
                    lower_bound_ops: initial_lower_bound,
                    upper_bound_ops: initial_upper_bound,
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 10,
                update_rx: Some(update_receiver),
            };
            let client: Engine<ImmutableSyncTest, _> = Engine::new(config).await.unwrap();

            // Send target update with invalid bounds (lower > upper)
            update_sender
                .send(Target {
                    root: initial_root,
                    lower_bound_ops: initial_upper_bound,
                    upper_bound_ops: initial_lower_bound,
                })
                .await
                .unwrap();

            let result = client.step().await;
            assert!(matches!(result, Err(sync::Error::InvalidTarget { .. })));

            let target_db =
                Arc::try_unwrap(target_db).unwrap_or_else(|_| panic!("failed to unwrap Arc"));
            let inner = target_db.into_inner();
            inner.destroy().await.unwrap();
        });
    }

    /// Test that target updates can be sent even after the client is done
    #[test_traced("WARN")]
    fn test_target_update_on_done_client() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // Create and populate target database
            let mut target_db = create_test_db(context.clone()).await;
            let target_ops = create_test_ops(10);
            apply_ops(&mut target_db, target_ops).await;
            target_db.commit(None).await.unwrap();

            // Capture target state
            let mut hasher = test_hasher();
            let lower_bound = target_db.oldest_retained_loc;
            let upper_bound = target_db.op_count() - 1;
            let root = target_db.root(&mut hasher);

            // Create client with target that will complete immediately
            let (mut update_sender, update_receiver) = mpsc::channel(1);
            let target_db = Arc::new(commonware_runtime::RwLock::new(target_db));
            let config = Config {
                context: context.clone(),
                db_config: create_sync_config(&format!("done_{}", context.next_u64())),
                fetch_batch_size: NZU64!(20),
                target: Target {
                    root,
                    lower_bound_ops: lower_bound,
                    upper_bound_ops: upper_bound,
                },
                resolver: target_db.clone(),
                apply_batch_size: 1024,
                max_outstanding_requests: 10,
                update_rx: Some(update_receiver),
            };

            // Complete the sync
            let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap();

            // Attempt to apply a target update after sync is complete to verify we don't panic
            let _ = update_sender
                .send(Target {
                    root: sha256::Digest::from([2u8; 32]),
                    lower_bound_ops: lower_bound + 1,
                    upper_bound_ops: upper_bound + 1,
                })
                .await;

            // Verify the synced database has the expected state
            let mut hasher = test_hasher();
            assert_eq!(synced_db.root(&mut hasher), root);
            assert_eq!(synced_db.op_count(), upper_bound + 1);
            assert_eq!(synced_db.oldest_retained_loc, lower_bound);

            synced_db.destroy().await.unwrap();
            Arc::try_unwrap(target_db)
                .unwrap_or_else(|_| panic!("failed to unwrap Arc"))
                .into_inner()
                .destroy()
                .await
                .unwrap();
        });
    }
}