Skip to main content

commonware_storage/qmdb/sync/
engine.rs

1//! Core sync engine components that are shared across sync clients.
2use crate::{
3    merkle::{hasher::Standard as StandardHasher, Family, Location},
4    qmdb::{
5        self,
6        sync::{
7            database::Config as _,
8            error::EngineError,
9            requests::{Id as RequestId, Requests},
10            resolver::{FetchResult, Resolver},
11            target::validate_update,
12            Database, DbResolver, Error as SyncError, Journal, Target,
13        },
14    },
15};
16use commonware_codec::Encode;
17use commonware_cryptography::Digest;
18use commonware_macros::select;
19use commonware_runtime::{
20    telemetry::metrics::{Gauge, GaugeExt, MetricsExt},
21    Supervisor as _,
22};
23use commonware_utils::{
24    channel::{
25        fallible::{AsyncFallibleExt, OneshotExt as _},
26        mpsc, oneshot,
27    },
28    NZU64,
29};
30use futures::{
31    future::{pending, Either},
32    StreamExt,
33};
34use mpsc::error::TryRecvError;
35use std::{
36    collections::{BTreeMap, HashMap, VecDeque},
37    fmt::Debug,
38    num::NonZeroU64,
39};
40
/// Type alias for sync engine errors.
///
/// Expands to [`qmdb::sync::Error`] parameterized by the database's merkle family, the
/// resolver's error type, and the database's digest type.
type Error<DB, R> =
    qmdb::sync::Error<<DB as Database>::Family, <R as Resolver>::Error, <DB as Database>::Digest>;
44
/// Whether sync should continue or complete.
///
/// `C` is the value carried forward when more work remains and `D` is the value
/// produced once sync has finished.
#[derive(Debug)]
pub(crate) enum NextStep<C, D> {
    /// Sync should continue with the updated client
    Continue(C),
    /// Sync is complete with the final database
    Complete(D),
}
53
/// Events that can occur during synchronization.
///
/// Produced by `wait_for_event` from the target-update channel, the finish channel,
/// and the stream of outstanding fetch requests.
#[derive(Debug)]
enum Event<F: Family, Op, D: Digest, E> {
    /// A target update was received
    TargetUpdate(Target<F, D>),
    /// A batch of operations was received
    BatchReceived(IndexedFetchResult<F, Op, D, E>),
    /// The target update channel was closed
    UpdateChannelClosed,
    /// A finish signal was received
    FinishRequested,
    /// The finish signal channel was closed
    FinishChannelClosed,
}
68
/// Progress gauges updated by the sync engine.
struct ProgressMetrics {
    /// Gauge registered as `journal_size`: current sync journal size.
    journal_size: Gauge,
    /// Gauge registered as `target_end`: exclusive end of the target range.
    target_end: Gauge,
}
74
75impl ProgressMetrics {
76    /// Register sync progress metrics on the provided context.
77    fn new(context: &impl commonware_runtime::Metrics) -> Self {
78        let journal_size = context.gauge("journal_size", "Current sync journal size");
79        let target_end = context.gauge(
80            "target_end",
81            "Exclusive target range end, equal to journal size when sync completes",
82        );
83
84        Self {
85            journal_size,
86            target_end,
87        }
88    }
89
90    /// Update progress gauges from the current engine snapshot.
91    fn record(&self, journal_size: u64, target_end: u64) {
92        let _ = self.journal_size.try_set(journal_size);
93        let _ = self.target_end.try_set(target_end);
94    }
95}
96
/// Result from a fetch operation, tagged with the ID of the request that produced it.
///
/// The ID lets the engine match a completed future to its tracking entry in
/// `Requests` (see `Engine::handle_fetch_result`).
#[derive(Debug)]
pub(super) struct IndexedFetchResult<F: Family, Op, D: Digest, E> {
    /// Unique ID assigned when the request was scheduled.
    pub id: RequestId,
    /// The result of the fetch operation.
    pub result: Result<FetchResult<F, Op, D>, E>,
}
105
106/// Wait for the next synchronization event.
107/// Returns `None` when there are no outstanding requests and no channels to wait on.
108async fn wait_for_event<F: Family, Op, D: Digest, E>(
109    update_rx: &mut Option<mpsc::Receiver<Target<F, D>>>,
110    finish_rx: &mut Option<mpsc::Receiver<()>>,
111    outstanding_requests: &mut Requests<F, Op, D, E>,
112) -> Option<Event<F, Op, D, E>> {
113    if outstanding_requests.len() == 0 && update_rx.is_none() && finish_rx.is_none() {
114        return None;
115    }
116
117    let target_update_fut = update_rx.as_mut().map_or_else(
118        || Either::Right(pending()),
119        |update_rx| Either::Left(update_rx.recv()),
120    );
121    let finish_fut = finish_rx.as_mut().map_or_else(
122        || Either::Right(pending()),
123        |finish_rx| Either::Left(finish_rx.recv()),
124    );
125    let batch_result_fut = if outstanding_requests.len() == 0 {
126        Either::Right(pending())
127    } else {
128        Either::Left(outstanding_requests.futures_mut().next())
129    };
130
131    select! {
132        finish = finish_fut => finish.map_or_else(
133            || Some(Event::FinishChannelClosed),
134            |_| Some(Event::FinishRequested)
135        ),
136        target = target_update_fut => target.map_or_else(
137            || Some(Event::UpdateChannelClosed),
138            |target| Some(Event::TargetUpdate(target))
139        ),
140        result = batch_result_fut => result.map(|fetch_result| Event::BatchReceived(fetch_result)),
141    }
142}
143
/// Configuration for creating a new Engine.
///
/// Consumed by value by [`Engine::new`].
pub struct Config<DB, R>
where
    DB: Database,
    R: DbResolver<DB>,
    DB::Op: Encode,
{
    /// Runtime context for creating database components
    pub context: DB::Context,
    /// Network resolver for fetching operations and proofs
    pub resolver: R,
    /// Sync target (root digest and operation bounds)
    pub target: Target<DB::Family, DB::Digest>,
    /// Maximum number of outstanding requests for operation batches
    pub max_outstanding_requests: usize,
    /// Maximum operations to fetch per batch
    pub fetch_batch_size: NonZeroU64,
    /// Number of operations to apply in a single batch
    pub apply_batch_size: usize,
    /// Database-specific configuration
    pub db_config: DB::Config,
    /// Channel for receiving sync target updates.
    ///
    /// When `None`, the engine never observes target updates.
    pub update_rx: Option<mpsc::Receiver<Target<DB::Family, DB::Digest>>>,
    /// Channel that requests sync completion once the current target is reached.
    ///
    /// When `None`, sync completes as soon as the target is reached.
    pub finish_rx: Option<mpsc::Receiver<()>>,
    /// Channel used to notify an observer once the current target is reached.
    /// The engine sends at most one notification for each target.
    ///
    /// When `reached_target_tx` is `Some(...)`, this receiver must be actively
    /// drained by the observer. The engine awaits send capacity on this channel before
    /// proceeding, so backpressure can pause progress at target.
    pub reached_target_tx: Option<mpsc::Sender<Target<DB::Family, DB::Digest>>>,
    /// Maximum number of previous roots to retain for verifying in-flight
    /// requests after target updates. Set to 0 to disable (all retained
    /// requests will be re-fetched).
    pub max_retained_roots: usize,
}
/// A shared sync engine that manages the core synchronization state and operations.
///
/// Built by [`Engine::new`] from a [`Config`] and driven one step at a time via
/// `step`, which consumes the engine and hands back either the engine or the
/// finished database.
pub(crate) struct Engine<DB, R>
where
    DB: Database,
    R: DbResolver<DB>,
    DB::Op: Encode,
{
    /// Tracks outstanding fetch requests and their futures
    outstanding_requests: Requests<DB::Family, DB::Op, DB::Digest, R::Error>,

    /// Operations that have been fetched but not yet applied to the log,
    /// keyed by the location of the first operation in each batch.
    ///
    /// # Invariant
    ///
    /// The vectors in the map are non-empty.
    fetched_operations: BTreeMap<Location<DB::Family>, Vec<DB::Op>>,

    /// Pinned merkle nodes extracted from proofs, used for database construction
    pinned_nodes: Option<Vec<DB::Digest>>,

    /// Historical roots from previous sync targets, keyed by tree size
    /// (target.range.end()). Each tree size maps to a unique root because
    /// the merkle tree is append-only and validate_update rejects unchanged
    /// roots. When a retained request completes, proof.leaves identifies
    /// which historical root to verify against.
    retained_roots: HashMap<Location<DB::Family>, DB::Digest>,

    /// Tree sizes of retained roots in insertion order (oldest first),
    /// used for FIFO eviction when retained_roots exceeds capacity.
    retained_roots_order: VecDeque<Location<DB::Family>>,

    /// Maximum number of historical roots to retain
    max_retained_roots: usize,

    /// The current sync target (root digest and operation bounds)
    target: Target<DB::Family, DB::Digest>,

    /// Maximum number of parallel outstanding requests
    max_outstanding_requests: usize,

    /// Maximum operations to fetch in a single batch
    fetch_batch_size: NonZeroU64,

    /// Number of operations to apply in a single batch
    apply_batch_size: usize,

    /// Journal that operations are applied to during sync
    journal: DB::Journal,

    /// Resolver for fetching operations and proofs from the sync source
    resolver: R,

    /// Hasher used for proof verification
    hasher: StandardHasher<DB::Hasher>,

    /// Runtime context for database operations
    context: DB::Context,

    /// Configuration for building the final database
    config: DB::Config,

    /// Optional receiver for target updates during sync.
    /// Set to `None` once the channel is observed closed.
    update_rx: Option<mpsc::Receiver<Target<DB::Family, DB::Digest>>>,

    /// Channel that requests sync completion once the current target is reached.
    ///
    /// When `None`, sync completes as soon as the target is reached.
    finish_rx: Option<mpsc::Receiver<()>>,

    /// Channel used to notify an observer once the current target is reached.
    /// The engine sends at most one notification for each target.
    ///
    /// When `reached_target_tx` is `Some(...)`, this receiver must be actively
    /// drained by the observer. The engine awaits send capacity on this channel before
    /// proceeding, so backpressure can pause progress at target.
    reached_target_tx: Option<mpsc::Sender<Target<DB::Family, DB::Digest>>>,

    /// Progress gauges updated after target updates and batch application.
    progress_metrics: ProgressMetrics,

    /// Whether explicit finish has been requested.
    finish_requested: bool,

    /// Tracks whether the current target has already been reported as reached.
    /// Cleared whenever a new target is installed.
    reached_current_target_reported: bool,
}
269
// Test-only accessors.
#[cfg(test)]
impl<DB, R> Engine<DB, R>
where
    DB: Database,
    R: DbResolver<DB>,
    DB::Op: Encode,
{
    /// Borrow the journal that sync applies operations to.
    pub(crate) fn journal(&self) -> &DB::Journal {
        &self.journal
    }
}
281
282impl<DB, R> Engine<DB, R>
283where
284    DB: Database,
285    R: DbResolver<DB>,
286    DB::Op: Encode,
287{
288    pub async fn new(config: Config<DB, R>) -> Result<Self, Error<DB, R>> {
289        if !config.target.range.end().is_valid() {
290            return Err(SyncError::Engine(EngineError::InvalidTarget {
291                lower_bound_pos: config.target.range.start(),
292                upper_bound_pos: config.target.range.end(),
293            }));
294        }
295
296        // Create journal and verifier using the database's factory methods
297        let journal = <DB::Journal as Journal<DB::Family>>::new(
298            config.context.child("journal"),
299            config.db_config.journal_config(),
300            config.target.range.clone(),
301        )
302        .await?;
303        let journal_size = journal.size().await;
304
305        // The sync journal is the source of truth for resume. If it already
306        // reaches the target, try to recover boundary pins from local Merkle
307        // state before asking peers for them. Partial journals resume without
308        // probing completed database state.
309        let pinned_nodes = if journal_size == *config.target.range.end() {
310            DB::local_boundary_nodes(
311                config.context.child("local_boundary"),
312                &config.db_config,
313                &config.target,
314                &journal,
315            )
316            .await?
317        } else {
318            None
319        };
320
321        let sync_context = config.context.child("sync");
322        let progress_metrics = ProgressMetrics::new(&sync_context);
323        let mut engine = Self {
324            outstanding_requests: Requests::new(),
325            fetched_operations: BTreeMap::new(),
326            pinned_nodes,
327            retained_roots: HashMap::new(),
328            retained_roots_order: VecDeque::new(),
329            max_retained_roots: config.max_retained_roots,
330            target: config.target.clone(),
331            max_outstanding_requests: config.max_outstanding_requests,
332            fetch_batch_size: config.fetch_batch_size,
333            apply_batch_size: config.apply_batch_size,
334            journal,
335            resolver: config.resolver.clone(),
336            hasher: qmdb::hasher::<DB::Hasher>(),
337            context: config.context,
338            config: config.db_config,
339            update_rx: config.update_rx,
340            finish_rx: config.finish_rx,
341            reached_target_tx: config.reached_target_tx,
342            finish_requested: false,
343            reached_current_target_reported: false,
344            progress_metrics,
345        };
346        engine.schedule_requests().await?;
347        engine.record_progress().await;
348        Ok(engine)
349    }
350
351    /// Schedule new fetch requests for operations in the sync range that we haven't yet fetched.
352    async fn schedule_requests(&mut self) -> Result<(), Error<DB, R>> {
353        let target_size = self.target.range.end();
354
355        // Schedule a pinned-nodes request at the lower sync bound if we don't
356        // have boundary state yet and one isn't already in flight.
357        if !self.has_boundary_state()
358            && !self
359                .outstanding_requests
360                .contains(&self.target.range.start())
361        {
362            let start_loc = self.target.range.start();
363            let resolver = self.resolver.clone();
364            let (cancel_tx, cancel_rx) = oneshot::channel();
365            let id = self.outstanding_requests.next_id();
366            self.outstanding_requests.insert(
367                id,
368                start_loc,
369                target_size,
370                cancel_tx,
371                Box::pin(async move {
372                    let result = resolver
373                        .get_operations(target_size, start_loc, NZU64!(1), true, cancel_rx)
374                        .await;
375                    IndexedFetchResult { id, result }
376                }),
377            );
378        }
379
380        // Calculate the maximum number of requests to make
381        let num_requests = self
382            .max_outstanding_requests
383            .saturating_sub(self.outstanding_requests.len());
384
385        let log_size = self.journal.size().await;
386
387        for _ in 0..num_requests {
388            // Convert fetched operations to operation counts for shared gap detection
389            let operation_counts: BTreeMap<Location<DB::Family>, u64> = self
390                .fetched_operations
391                .iter()
392                .map(|(&start_loc, operations)| (start_loc, operations.len() as u64))
393                .collect();
394
395            // Find the next gap in the sync range that needs to be fetched.
396            let Some(gap_range) = crate::qmdb::sync::gaps::find_next(
397                Location::new(log_size)..self.target.range.end(),
398                &operation_counts,
399                self.outstanding_requests.locations(),
400                self.fetch_batch_size,
401            ) else {
402                break; // No more gaps to fill
403            };
404
405            // Calculate batch size for this gap
406            let gap_size = *gap_range.end.checked_sub(*gap_range.start).unwrap();
407            let gap_size: NonZeroU64 = gap_size.try_into().unwrap();
408            let batch_size = self.fetch_batch_size.min(gap_size);
409
410            // Schedule the request
411            let resolver = self.resolver.clone();
412            let (cancel_tx, cancel_rx) = oneshot::channel();
413            let id = self.outstanding_requests.next_id();
414            self.outstanding_requests.insert(
415                id,
416                gap_range.start,
417                target_size,
418                cancel_tx,
419                Box::pin(async move {
420                    let result = resolver
421                        .get_operations(target_size, gap_range.start, batch_size, false, cancel_rx)
422                        .await;
423                    IndexedFetchResult { id, result }
424                }),
425            );
426        }
427
428        Ok(())
429    }
430
    /// Reset sync state for a target update.
    ///
    /// Resizes the journal to the new range start, drops outstanding requests via
    /// `remove_before(new_start + 1)` (per the note below: those at or before the new
    /// start), clears buffered operations and cached pinned nodes, and installs
    /// `new_target`. Requests that stay tracked have their proofs verified later
    /// against the saved historical root (see `retained_roots`) so the fetched
    /// operations can still be used.
    ///
    /// # Errors
    ///
    /// Propagates a failure to resize the journal.
    ///
    /// # Panics
    ///
    /// Panics if a root is already retained for the outgoing target's tree size, or
    /// if `checked_add(1)` on the new range start returns `None`.
    pub async fn reset_for_target_update(
        mut self,
        new_target: Target<DB::Family, DB::Digest>,
    ) -> Result<Self, Error<DB, R>> {
        self.journal.resize(new_target.range.start()).await?;
        // Remove requests at or before the new start. The request at start
        // must be re-issued as a pinned-nodes request with the new target size.
        self.outstanding_requests
            .remove_before(new_target.range.start().checked_add(1).unwrap());
        self.fetched_operations.clear();
        self.pinned_nodes = None;

        // Save the current root keyed by its tree size for verifying
        // retained requests that were issued against this target.
        if self.max_retained_roots > 0 {
            let old_target_size = self.target.range.end();
            assert!(
                self.retained_roots
                    .insert(old_target_size, self.target.root)
                    .is_none(),
                "duplicate retained root for tree size {old_target_size:?}"
            );
            self.retained_roots_order.push_back(old_target_size);
            // FIFO eviction: drop the oldest retained roots until back under capacity.
            while self.retained_roots.len() > self.max_retained_roots {
                if let Some(oldest) = self.retained_roots_order.pop_front() {
                    self.retained_roots.remove(&oldest);
                }
            }
        }

        self.target = new_target;
        // The new target has not been reported as reached yet.
        self.reached_current_target_reported = false;
        Ok(self)
    }
471
472    /// Drain a pending explicit-finish signal without blocking.
473    ///
474    /// If a finish signal is present, the engine transitions into "finish requested"
475    /// mode via [`Self::accept_finish`]. If the finish channel is disconnected before
476    /// a finish request is observed, this returns [`EngineError::FinishChannelClosed`].
477    fn drain_finish_requests(&mut self) -> Result<(), Error<DB, R>> {
478        let Some(finish_rx) = self.finish_rx.as_mut() else {
479            return Ok(());
480        };
481        match finish_rx.try_recv() {
482            Ok(()) => {
483                self.accept_finish();
484                Ok(())
485            }
486            Err(TryRecvError::Empty) => Ok(()),
487            Err(TryRecvError::Disconnected) => {
488                Err(SyncError::Engine(EngineError::FinishChannelClosed))
489            }
490        }
491    }
492
493    /// Mark that explicit finish has been requested and stop listening for more signals.
494    ///
495    /// This is a one-way transition for the current engine instance. Once set, the
496    /// engine may complete as soon as it is at a target (or the next time it reaches one).
497    fn accept_finish(&mut self) {
498        self.finish_requested = true;
499        self.finish_rx = None;
500    }
501
502    /// Notify an observer that the current target has been reached. The notification is sent
503    /// at most once per target, guarded by `reached_current_target_reported`.
504    ///
505    /// This send awaits backpressure. When `reached_target_tx` is `Some(...)`,
506    /// the receiver is expected to consume notifications promptly so the engine
507    /// can keep making progress. If the receiver side is closed, we drop the
508    /// sender and continue syncing without further reached-target notifications.
509    async fn report_reached_target(&mut self) {
510        if self.reached_current_target_reported {
511            return;
512        }
513        if let Some(sender) = self.reached_target_tx.as_ref() {
514            if !sender.send_lossy(self.target.clone()).await {
515                self.reached_target_tx = None;
516            }
517        }
518        self.reached_current_target_reported = true;
519    }
520
521    /// Record a progress snapshot in metrics.
522    async fn record_progress(&mut self) {
523        self.progress_metrics
524            .record(self.journal.size().await, *self.target.range.end());
525    }
526
527    /// Store a batch of fetched operations. If the input list is empty, this is a no-op.
528    pub(crate) fn store_operations(
529        &mut self,
530        start_loc: Location<DB::Family>,
531        operations: Vec<DB::Op>,
532    ) {
533        if operations.is_empty() {
534            return;
535        }
536        self.fetched_operations.insert(start_loc, operations);
537    }
538
    /// Apply fetched operations to the journal if we have them.
    ///
    /// This method finds operations that are contiguous with the current journal tip
    /// and applies them in order. It removes stale batches and handles partial
    /// application of batches when needed.
    ///
    /// # Errors
    ///
    /// Propagates any error from appending to the journal.
    ///
    /// # Panics
    ///
    /// Panics if a stored batch is empty (violating the `fetched_operations`
    /// invariant) or if computing a batch's last location fails.
    pub(crate) async fn apply_operations(&mut self) -> Result<(), Error<DB, R>> {
        // Location of the next operation the journal is missing.
        let mut next_loc = self.journal.size().await;

        // Remove any batches of operations with stale data.
        // That is, those whose last operation is before `next_loc`.
        self.fetched_operations.retain(|&start_loc, operations| {
            assert!(!operations.is_empty());
            let end_loc = start_loc.checked_add(operations.len() as u64 - 1).unwrap();
            end_loc >= next_loc
        });

        loop {
            // See if we have the next operation to apply (i.e. at the journal tip).
            // Find the index of the range that contains the next location.
            let range_start_loc =
                self.fetched_operations
                    .iter()
                    .find_map(|(range_start, range_ops)| {
                        assert!(!range_ops.is_empty());
                        let range_end =
                            range_start.checked_add(range_ops.len() as u64 - 1).unwrap();
                        if *range_start <= next_loc && next_loc <= range_end {
                            Some(*range_start)
                        } else {
                            None
                        }
                    });

            let Some(range_start_loc) = range_start_loc else {
                // We don't have the next operation to apply (i.e. at the journal tip)
                break;
            };

            // Remove the batch of operations that contains the next operation to apply.
            let operations = self.fetched_operations.remove(&range_start_loc).unwrap();
            assert!(!operations.is_empty());
            // Skip operations that are before the next location.
            let skip_count = (next_loc - *range_start_loc) as usize;
            let operations_count = operations.len() - skip_count;
            let remaining_operations = operations.into_iter().skip(skip_count);
            // Advance the local cursor by the number of operations handed to the
            // journal below, so the next iteration searches from the new tip.
            next_loc += operations_count as u64;
            self.apply_operations_batch(remaining_operations).await?;
        }

        Ok(())
    }
590
    /// Apply a batch of operations to the journal.
    ///
    /// Operations are appended one at a time, in iteration order. The first append
    /// error aborts the batch and is returned; operations appended before it stay
    /// in the journal.
    async fn apply_operations_batch<I>(&mut self, operations: I) -> Result<(), Error<DB, R>>
    where
        I: IntoIterator<Item = DB::Op>,
    {
        for op in operations {
            self.journal.append(op).await?;
            // No need to sync here -- the journal will periodically sync its storage
            // and we will also sync when we're done applying all operations.
        }
        Ok(())
    }
603
604    /// Check if sync is complete based on the current journal size and target
605    pub async fn is_at_target(&mut self) -> Result<bool, Error<DB, R>> {
606        let journal_size = self.journal.size().await;
607        let target_journal_size = self.target.range.end();
608
609        // Check if we've completed sync
610        if journal_size >= target_journal_size {
611            if journal_size > target_journal_size {
612                // This shouldn't happen in normal operation - indicates a bug
613                return Err(SyncError::Engine(EngineError::InvalidState));
614            }
615            return Ok(true);
616        }
617
618        Ok(false)
619    }
620
621    /// Returns whether this target needs pinned boundary nodes to reconstruct pruned state.
622    fn needs_pinned_boundary(&self) -> bool {
623        self.target.range.start() > Location::new(0)
624    }
625
626    /// Returns whether the current target has the boundary state needed for completion.
627    fn has_boundary_state(&self) -> bool {
628        !self.needs_pinned_boundary() || self.pinned_nodes.is_some()
629    }
630
631    /// Returns whether the journal and boundary state are both ready for completion.
632    async fn is_ready_to_complete(&mut self) -> Result<bool, Error<DB, R>> {
633        Ok(self.is_at_target().await? && self.has_boundary_state())
634    }
635
    /// Handle the result of a fetch operation.
    ///
    /// Discards results for requests no longer tracked (removed by
    /// `remove_before` during a target update). For tracked requests, the root to
    /// verify against is chosen by the tree size the request was issued for: the
    /// current target's root when that size equals the current target end, otherwise
    /// the matching historical root from `retained_roots`. If no such root is
    /// retained, the result is dropped without reporting to the resolver.
    ///
    /// Batches that are empty, larger than `fetch_batch_size`, carry a proof for a
    /// different tree size, or fail verification are reported to the resolver as
    /// failures (when a callback was supplied) and dropped; the affected range is
    /// picked up again by later scheduling.
    ///
    /// # Errors
    ///
    /// Returns `SyncError::Resolver` when the fetch itself failed.
    fn handle_fetch_result(
        &mut self,
        fetch_result: IndexedFetchResult<DB::Family, DB::Op, DB::Digest, R::Error>,
    ) -> Result<(), Error<DB, R>> {
        // Discard results for stale requests (removed by a target update).
        // Using the request ID prevents a stale future from consuming the
        // tracking entry of a fresh request at the same location.
        let Some(request) = self.outstanding_requests.remove(fetch_result.id) else {
            return Ok(());
        };

        let start_loc = request.start_loc;
        let FetchResult {
            proof,
            operations,
            pinned_nodes,
            callback,
        } = fetch_result.result.map_err(SyncError::Resolver)?;

        // Validate batch size
        let operations_len = operations.len() as u64;
        if operations_len == 0 || operations_len > self.fetch_batch_size.get() {
            // Invalid batch size - notify resolver of failure.
            // We will request these operations again when we scan for unfetched operations.
            if let Some(callback) = callback {
                callback.send_lossy(false);
            }
            return Ok(());
        }

        // The proof must be for exactly the tree size this request asked for.
        if proof.leaves != request.target_size {
            if let Some(callback) = callback {
                callback.send_lossy(false);
            }
            return Ok(());
        }

        // Look up the root to verify against using the tree size the request
        // asked for. Fresh requests match the current target; retained
        // requests match a historical root that was explicitly retained.
        let is_current_target = request.target_size == self.target.range.end();
        let target_root = if is_current_target {
            &self.target.root
        } else {
            let Some(root) = self.retained_roots.get(&request.target_size) else {
                // No historical root to verify against (evicted or
                // max_retained_roots is 0). Drop the result without
                // penalizing the resolver — the data may be valid.
                return Ok(());
            };
            root
        };

        // Pinned nodes are only extracted from proofs for the current root because
        // the database needs them for the latest tree size.
        let need_pinned = is_current_target
            && self.pinned_nodes.is_none()
            && start_loc == self.target.range.start();
        let elements = operations.iter().map(|op| op.encode()).collect::<Vec<_>>();
        let valid = if need_pinned {
            // A response without pinned nodes is verified against an empty node list.
            let nodes = pinned_nodes.as_deref().unwrap_or(&[]);
            proof.verify_proof_and_pinned_nodes(
                &self.hasher,
                &elements,
                start_loc,
                nodes,
                target_root,
            )
        } else {
            proof.verify_range_inclusion(&self.hasher, &elements, start_loc, target_root)
        };

        // Report success or failure to the resolver.
        if let Some(callback) = callback {
            callback.send_lossy(valid);
        }

        if !valid {
            if need_pinned {
                tracing::warn!("boundary proof or pinned nodes failed verification, will retry");
            }
            return Ok(());
        }

        // Cache pinned nodes only from current-root-verified proofs.
        if need_pinned {
            if let Some(nodes) = pinned_nodes {
                self.pinned_nodes = Some(nodes);
            }
        }

        // Store operations for later application.
        self.store_operations(start_loc, operations);

        Ok(())
    }
738
739    /// Handle a sync event and return the next engine state.
740    async fn handle_event(
741        mut self,
742        event: Event<DB::Family, DB::Op, DB::Digest, R::Error>,
743    ) -> Result<NextStep<Self, DB>, Error<DB, R>> {
744        match event {
745            Event::TargetUpdate(new_target) => {
746                validate_update(&self.target, &new_target)?;
747
748                let mut updated_self = self.reset_for_target_update(new_target).await?;
749                updated_self.record_progress().await;
750                updated_self.schedule_requests().await?;
751                Ok(NextStep::Continue(updated_self))
752            }
753            Event::UpdateChannelClosed => {
754                self.update_rx = None;
755                Ok(NextStep::Continue(self))
756            }
757            Event::FinishRequested => {
758                self.accept_finish();
759                Ok(NextStep::Continue(self))
760            }
761            Event::FinishChannelClosed => Err(SyncError::Engine(EngineError::FinishChannelClosed)),
762            Event::BatchReceived(fetch_result) => {
763                self.handle_fetch_result(fetch_result)?;
764                self.schedule_requests().await?;
765                self.apply_operations().await?;
766                self.record_progress().await;
767                Ok(NextStep::Continue(self))
768            }
769        }
770    }
771
772    /// Execute one step of the synchronization process.
773    ///
774    /// This is the main coordination method that:
775    /// 1. Checks if sync is complete
776    /// 2. Waits for the next synchronization event
777    /// 3. Handles different event types (target updates, fetch results)
778    /// 4. Coordinates request scheduling and operation application
779    ///
780    /// Returns `NextStep::Complete(database)` when sync is finished, or
781    /// `NextStep::Continue(self)` when more work remains.
782    pub(crate) async fn step(self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
783        Box::pin(Self::step_inner(self)).await
784    }
785
786    /// Implements one sync step behind a boxed future boundary.
787    async fn step_inner(mut self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
788        self.drain_finish_requests()?;
789
790        // Check if sync is complete
791        if self.is_ready_to_complete().await? {
792            self.report_reached_target().await;
793
794            if self.finish_rx.is_some() && !self.finish_requested {
795                let event = wait_for_event(
796                    &mut self.update_rx,
797                    &mut self.finish_rx,
798                    &mut self.outstanding_requests,
799                )
800                .await
801                .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
802                return self.handle_event(event).await;
803            }
804
805            self.journal.sync().await?;
806
807            // Build the database from the completed sync
808            let database = DB::from_sync_result(
809                self.context,
810                self.config,
811                self.journal,
812                self.pinned_nodes,
813                self.target.range.clone(),
814                self.apply_batch_size,
815            )
816            .await?;
817
818            // Verify the final root digest matches the final target
819            let got_root = database.root();
820            let expected_root = self.target.root;
821            if got_root != expected_root {
822                return Err(SyncError::Engine(EngineError::RootMismatch {
823                    expected: expected_root,
824                    actual: got_root,
825                }));
826            }
827
828            return Ok(NextStep::Complete(database));
829        }
830
831        // Wait for the next synchronization event
832        let event = wait_for_event(
833            &mut self.update_rx,
834            &mut self.finish_rx,
835            &mut self.outstanding_requests,
836        )
837        .await
838        .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
839        self.handle_event(event).await
840    }
841
842    /// Run sync to completion, returning the final database when done.
843    ///
844    /// This method repeatedly calls `step()` until sync is complete. The `step()` method
845    /// handles building the final database and verifying the root digest.
846    pub async fn sync(mut self) -> Result<DB, Error<DB, R>> {
847        // Run sync loop until completion
848        loop {
849            match self.step().await? {
850                NextStep::Continue(new_engine) => self = new_engine,
851                NextStep::Complete(database) => return Ok(database),
852            }
853        }
854    }
855}
856
857#[cfg(test)]
858mod tests {
859    use super::*;
860    use crate::{
861        merkle::mmr::{Family as MmrFamily, Proof},
862        qmdb::sync::requests::FetchFuture,
863    };
864    use commonware_cryptography::{sha256, Sha256};
865    use commonware_runtime::{deterministic, Runner as _};
866    use commonware_utils::{channel::oneshot, non_empty_range, NZU64};
867    use std::{
868        convert::Infallible,
869        sync::{
870            atomic::{AtomicUsize, Ordering},
871            Arc,
872        },
873    };
874
875    #[derive(Clone)]
876    struct TestConfig {
877        journal_size: u64,
878        boundary_probes: Arc<AtomicUsize>,
879    }
880
881    impl crate::qmdb::sync::DatabaseConfig for TestConfig {
882        type JournalConfig = u64;
883
884        fn journal_config(&self) -> Self::JournalConfig {
885            self.journal_size
886        }
887    }
888
889    struct TestJournal {
890        size: u64,
891    }
892
893    impl Journal<MmrFamily> for TestJournal {
894        type Config = u64;
895        type Context = deterministic::Context;
896        type Error = crate::journal::Error;
897        type Op = i32;
898
899        async fn new(
900            _context: Self::Context,
901            size: Self::Config,
902            _range: commonware_utils::range::NonEmptyRange<Location<MmrFamily>>,
903        ) -> Result<Self, Self::Error> {
904            Ok(Self { size })
905        }
906
907        async fn resize(&mut self, start: Location<MmrFamily>) -> Result<(), Self::Error> {
908            self.size = *start;
909            Ok(())
910        }
911
912        async fn sync(&mut self) -> Result<(), Self::Error> {
913            Ok(())
914        }
915
916        async fn size(&self) -> u64 {
917            self.size
918        }
919
920        async fn append(&mut self, _op: Self::Op) -> Result<(), Self::Error> {
921            self.size += 1;
922            Ok(())
923        }
924    }
925
926    struct TestDb;
927
928    impl Database for TestDb {
929        type Config = TestConfig;
930        type Context = deterministic::Context;
931        type Digest = sha256::Digest;
932        type Family = MmrFamily;
933        type Hasher = Sha256;
934        type Journal = TestJournal;
935        type Op = i32;
936
937        async fn from_sync_result(
938            _context: Self::Context,
939            _config: Self::Config,
940            _journal: Self::Journal,
941            _pinned_nodes: Option<Vec<Self::Digest>>,
942            _range: commonware_utils::range::NonEmptyRange<Location<Self::Family>>,
943            _apply_batch_size: usize,
944        ) -> Result<Self, qmdb::Error<Self::Family>> {
945            Ok(Self)
946        }
947
948        async fn local_boundary_nodes(
949            _context: Self::Context,
950            config: &Self::Config,
951            _target: &Target<Self::Family, Self::Digest>,
952            _journal: &Self::Journal,
953        ) -> Result<Option<Vec<Self::Digest>>, qmdb::Error<Self::Family>> {
954            config.boundary_probes.fetch_add(1, Ordering::SeqCst);
955            Ok(Some(vec![]))
956        }
957
958        fn root(&self) -> Self::Digest {
959            sha256::Digest::from([0u8; 32])
960        }
961    }
962
963    #[derive(Clone)]
964    struct TestResolver;
965
966    impl Resolver for TestResolver {
967        type Digest = sha256::Digest;
968        type Error = Infallible;
969        type Family = MmrFamily;
970        type Op = i32;
971
972        async fn get_operations(
973            &self,
974            _op_count: Location<Self::Family>,
975            _start_loc: Location<Self::Family>,
976            _max_ops: NonZeroU64,
977            _include_pinned_nodes: bool,
978            _cancel_rx: oneshot::Receiver<()>,
979        ) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
980            Ok(FetchResult::new(
981                Proof {
982                    leaves: Location::new(0),
983                    inactive_peaks: 0,
984                    digests: vec![],
985                },
986                vec![],
987                None,
988            ))
989        }
990    }
991
992    fn test_engine_config(
993        context: deterministic::Context,
994        journal_size: u64,
995        boundary_probes: Arc<AtomicUsize>,
996    ) -> Config<TestDb, TestResolver> {
997        Config {
998            context,
999            resolver: TestResolver,
1000            target: Target {
1001                root: sha256::Digest::from([1u8; 32]),
1002                range: non_empty_range!(Location::new(5), Location::new(10)),
1003            },
1004            max_outstanding_requests: 1,
1005            fetch_batch_size: NZU64!(1),
1006            apply_batch_size: 1,
1007            db_config: TestConfig {
1008                journal_size,
1009                boundary_probes,
1010            },
1011            update_rx: None,
1012            finish_rx: None,
1013            reached_target_tx: None,
1014            max_retained_roots: 0,
1015        }
1016    }
1017
1018    #[test]
1019    fn new_probes_local_boundary_when_journal_reaches_target() {
1020        deterministic::Runner::default().start(|context| async move {
1021            let boundary_probes = Arc::new(AtomicUsize::new(0));
1022            Engine::new(test_engine_config(context, 10, boundary_probes.clone()))
1023                .await
1024                .unwrap();
1025
1026            assert_eq!(boundary_probes.load(Ordering::SeqCst), 1);
1027        });
1028    }
1029
1030    #[test]
1031    fn new_skips_local_boundary_when_journal_is_partial() {
1032        deterministic::Runner::default().start(|context| async move {
1033            let boundary_probes = Arc::new(AtomicUsize::new(0));
1034            Engine::new(test_engine_config(context, 7, boundary_probes.clone()))
1035                .await
1036                .unwrap();
1037
1038            assert_eq!(boundary_probes.load(Ordering::SeqCst), 0);
1039        });
1040    }
1041
1042    /// Create a no-op fetch result future for testing request tracking.
1043    fn dummy_future(id: RequestId) -> FetchFuture<MmrFamily, i32, sha256::Digest, ()> {
1044        Box::pin(async move {
1045            IndexedFetchResult {
1046                id,
1047                result: Ok(FetchResult::new(
1048                    Proof {
1049                        leaves: Location::new(0),
1050                        inactive_peaks: 0,
1051                        digests: vec![],
1052                    },
1053                    vec![],
1054                    None,
1055                )),
1056            }
1057        })
1058    }
1059
1060    /// Helper to add a request at a given location.
1061    fn add(requests: &mut Requests<MmrFamily, i32, sha256::Digest, ()>, loc: u64) -> RequestId {
1062        let id = requests.next_id();
1063        requests.insert(
1064            id,
1065            Location::new(loc),
1066            Location::new(loc),
1067            oneshot::channel().0,
1068            dummy_future(id),
1069        );
1070        id
1071    }
1072
1073    #[test]
1074    fn test_add_and_remove() {
1075        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1076        assert_eq!(requests.len(), 0);
1077
1078        let id = add(&mut requests, 10);
1079        assert_eq!(requests.len(), 1);
1080        assert!(requests.contains(&Location::new(10)));
1081
1082        assert!(requests.remove(id).is_some());
1083        assert!(!requests.contains(&Location::new(10)));
1084        assert!(requests.remove(id).is_none());
1085    }
1086
1087    #[test]
1088    fn test_remove_before() {
1089        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1090
1091        add(&mut requests, 5);
1092        add(&mut requests, 10);
1093        add(&mut requests, 15);
1094        add(&mut requests, 20);
1095        assert_eq!(requests.len(), 4);
1096
1097        requests.remove_before(Location::new(10));
1098        assert_eq!(requests.len(), 3);
1099        assert!(!requests.contains(&Location::new(5)));
1100        assert!(requests.contains(&Location::new(10)));
1101        assert!(requests.contains(&Location::new(15)));
1102        assert!(requests.contains(&Location::new(20)));
1103    }
1104
1105    #[test]
1106    fn test_remove_before_all() {
1107        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1108
1109        add(&mut requests, 5);
1110        add(&mut requests, 10);
1111        assert_eq!(requests.len(), 2);
1112
1113        requests.remove_before(Location::new(100));
1114        assert_eq!(requests.len(), 0);
1115    }
1116
1117    #[test]
1118    fn test_remove_before_empty() {
1119        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1120        requests.remove_before(Location::new(10));
1121        assert_eq!(requests.len(), 0);
1122    }
1123
1124    #[test]
1125    fn test_remove_before_none() {
1126        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1127
1128        add(&mut requests, 10);
1129        add(&mut requests, 20);
1130        assert_eq!(requests.len(), 2);
1131
1132        requests.remove_before(Location::new(5));
1133        assert_eq!(requests.len(), 2);
1134        assert!(requests.contains(&Location::new(10)));
1135        assert!(requests.contains(&Location::new(20)));
1136    }
1137
1138    #[test]
1139    fn test_superseded_request() {
1140        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1141
1142        // Old request at location 10
1143        let old_id = add(&mut requests, 10);
1144        assert_eq!(requests.len(), 1);
1145
1146        // New request supersedes at same location
1147        let new_id = add(&mut requests, 10);
1148        assert_eq!(requests.len(), 1);
1149
1150        // Old ID is no longer tracked (superseded by insert)
1151        assert!(requests.remove(old_id).is_none());
1152
1153        // New ID is still tracked and by_location is intact
1154        assert!(requests.contains(&Location::new(10)));
1155        assert!(requests.remove(new_id).is_some());
1156        assert!(!requests.contains(&Location::new(10)));
1157    }
1158
1159    #[test]
1160    fn test_stale_id_after_remove_before() {
1161        let mut requests: Requests<MmrFamily, i32, sha256::Digest, ()> = Requests::new();
1162
1163        let old_id = add(&mut requests, 5);
1164        add(&mut requests, 15);
1165        requests.remove_before(Location::new(10));
1166
1167        // Old ID at location 5 was discarded by remove_before
1168        assert!(requests.remove(old_id).is_none());
1169
1170        // New request at the same location gets a different ID
1171        let new_id = add(&mut requests, 5);
1172        assert_ne!(old_id, new_id);
1173        assert!(requests.remove(new_id).is_some());
1174    }
1175}