// commonware_storage/qmdb/sync/engine.rs

//! Core sync engine components that are shared across sync clients.
use crate::{
    merkle::mmr::{Location, StandardHasher},
    qmdb::{
        self,
        sync::{
            database::Config as _,
            error::EngineError,
            requests::{Id as RequestId, Requests},
            resolver::{FetchResult, Resolver},
            target::validate_update,
            Database, Error as SyncError, Journal, Target,
        },
    },
};
use commonware_codec::Encode;
use commonware_cryptography::Digest;
use commonware_macros::select;
use commonware_runtime::Metrics as _;
use commonware_utils::{
    channel::{
        fallible::{AsyncFallibleExt, OneshotExt as _},
        mpsc, oneshot,
    },
    NZU64,
};
use futures::{
    future::{pending, Either},
    StreamExt,
};
use mpsc::error::TryRecvError;
use std::{
    collections::{BTreeMap, HashMap, VecDeque},
    fmt::Debug,
    num::NonZeroU64,
};

/// Type alias for sync engine errors
type Error<DB, R> = qmdb::sync::Error<<R as Resolver>::Error, <DB as Database>::Digest>;

/// Whether sync should continue or complete
#[derive(Debug)]
pub(crate) enum NextStep<C, D> {
    /// Sync should continue with the updated client
    Continue(C),
    /// Sync is complete with the final database
    Complete(D),
}

/// Events that can occur during synchronization
#[derive(Debug)]
enum Event<Op, D: Digest, E> {
    /// A target update was received
    TargetUpdate(Target<D>),
    /// A batch of operations was received
    BatchReceived(IndexedFetchResult<Op, D, E>),
    /// The target update channel was closed
    UpdateChannelClosed,
    /// A finish signal was received
    FinishRequested,
    /// The finish signal channel was closed
    FinishChannelClosed,
}

/// Result from a fetch operation with its request ID and starting location.
#[derive(Debug)]
pub(super) struct IndexedFetchResult<Op, D: Digest, E> {
    /// Unique ID assigned when the request was scheduled.
    pub id: RequestId,
    /// The location of the first operation in the batch.
    pub start_loc: Location,
    /// The result of the fetch operation.
    pub result: Result<FetchResult<Op, D>, E>,
}

/// Wait for the next synchronization event.
/// Returns `None` when there are no outstanding requests and no channels to wait on.
async fn wait_for_event<Op, D: Digest, E>(
    update_rx: &mut Option<mpsc::Receiver<Target<D>>>,
    finish_rx: &mut Option<mpsc::Receiver<()>>,
    outstanding_requests: &mut Requests<Op, D, E>,
) -> Option<Event<Op, D, E>> {
    if outstanding_requests.len() == 0 && update_rx.is_none() && finish_rx.is_none() {
        return None;
    }

    let target_update_fut = update_rx.as_mut().map_or_else(
        || Either::Right(pending()),
        |update_rx| Either::Left(update_rx.recv()),
    );
    let finish_fut = finish_rx.as_mut().map_or_else(
        || Either::Right(pending()),
        |finish_rx| Either::Left(finish_rx.recv()),
    );
    let batch_result_fut = if outstanding_requests.len() == 0 {
        Either::Right(pending())
    } else {
        Either::Left(outstanding_requests.futures_mut().next())
    };

    select! {
        finish = finish_fut => finish.map_or_else(
            || Some(Event::FinishChannelClosed),
            |_| Some(Event::FinishRequested)
        ),
        target = target_update_fut => target.map_or_else(
            || Some(Event::UpdateChannelClosed),
            |target| Some(Event::TargetUpdate(target))
        ),
        result = batch_result_fut => result.map(Event::BatchReceived),
    }
}

/// Configuration for creating a new Engine
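///
/// # Example
///
/// A minimal wiring sketch (illustrative, not from this crate): `MyDb` and
/// `MyResolver` are hypothetical [`Database`] and [`Resolver`] implementations,
/// and the channel constructors are assumed to follow the usual `mpsc` shape.
///
/// ```rust,ignore
/// let (update_tx, update_rx) = mpsc::channel(1);
/// let (finish_tx, finish_rx) = mpsc::channel(1);
/// let (reached_tx, mut reached_rx) = mpsc::channel(1);
/// let config = Config::<MyDb, MyResolver> {
///     context,
///     resolver,
///     target: initial_target,
///     max_outstanding_requests: 8,
///     fetch_batch_size: NZU64!(256),
///     apply_batch_size: 1024,
///     db_config,
///     update_rx: Some(update_rx),
///     finish_rx: Some(finish_rx),
///     reached_target_tx: Some(reached_tx),
///     max_retained_roots: 4,
/// };
/// // The observer must drain `reached_rx`, and eventually send on
/// // `finish_tx` once it decides the final target has been reached.
/// ```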
pub struct Config<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Runtime context for creating database components
    pub context: DB::Context,
    /// Network resolver for fetching operations and proofs
    pub resolver: R,
    /// Sync target (root digest and operation bounds)
    pub target: Target<DB::Digest>,
    /// Maximum number of outstanding requests for operation batches
    pub max_outstanding_requests: usize,
    /// Maximum operations to fetch per batch
    pub fetch_batch_size: NonZeroU64,
    /// Number of operations to apply in a single batch
    pub apply_batch_size: usize,
    /// Database-specific configuration
    pub db_config: DB::Config,
    /// Channel for receiving sync target updates
    pub update_rx: Option<mpsc::Receiver<Target<DB::Digest>>>,
    /// Channel that requests sync completion once the current target is reached.
    ///
    /// When `None`, sync completes as soon as the target is reached.
    pub finish_rx: Option<mpsc::Receiver<()>>,
    /// Channel used to notify an observer once the current target is reached.
    /// The engine sends at most one notification for each target.
    ///
    /// When `reached_target_tx` is `Some(...)`, the receiving end of this channel
    /// must be actively drained by the observer. The engine awaits send capacity on
    /// this channel before proceeding, so backpressure can pause progress at the target.
    pub reached_target_tx: Option<mpsc::Sender<Target<DB::Digest>>>,
    /// Maximum number of previous roots to retain for verifying in-flight
    /// requests after target updates. Set to 0 to disable (all retained
    /// requests will be re-fetched).
    pub max_retained_roots: usize,
}

/// A shared sync engine that manages the core synchronization state and operations.
pub(crate) struct Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Tracks outstanding fetch requests and their futures
    outstanding_requests: Requests<DB::Op, DB::Digest, R::Error>,

    /// Operations that have been fetched but not yet applied to the log.
    ///
    /// # Invariant
    ///
    /// The vectors in the map are non-empty.
    fetched_operations: BTreeMap<Location, Vec<DB::Op>>,

    /// Pinned MMR nodes extracted from proofs, used for database construction
    pinned_nodes: Option<Vec<DB::Digest>>,

    /// Historical roots from previous sync targets, keyed by tree size
    /// (target.range.end()). Each tree size maps to a unique root because
    /// the MMR is append-only and validate_update rejects unchanged roots.
    /// When a retained request completes, proof.leaves identifies which
    /// historical root to verify against.
    retained_roots: HashMap<Location, DB::Digest>,

    /// Tree sizes of retained roots in insertion order (oldest first),
    /// used for FIFO eviction when retained_roots exceeds capacity.
    retained_roots_order: VecDeque<Location>,

    /// Maximum number of historical roots to retain
    max_retained_roots: usize,

    /// The current sync target (root digest and operation bounds)
    target: Target<DB::Digest>,

    /// Maximum number of parallel outstanding requests
    max_outstanding_requests: usize,

    /// Maximum operations to fetch in a single batch
    fetch_batch_size: NonZeroU64,

    /// Number of operations to apply in a single batch
    apply_batch_size: usize,

    /// Journal that operations are applied to during sync
    journal: DB::Journal,

    /// Resolver for fetching operations and proofs from the sync source
    resolver: R,

    /// Hasher used for proof verification
    hasher: StandardHasher<DB::Hasher>,

    /// Runtime context for database operations
    context: DB::Context,

    /// Configuration for building the final database
    config: DB::Config,

    /// Optional receiver for target updates during sync
    update_rx: Option<mpsc::Receiver<Target<DB::Digest>>>,

    /// Channel that requests sync completion once the current target is reached.
    ///
    /// When `None`, sync completes as soon as the target is reached.
    finish_rx: Option<mpsc::Receiver<()>>,

    /// Channel used to notify an observer once the current target is reached.
    /// The engine sends at most one notification for each target.
    ///
    /// When `reached_target_tx` is `Some(...)`, the receiving end of this channel
    /// must be actively drained by the observer. The engine awaits send capacity on
    /// this channel before proceeding, so backpressure can pause progress at the target.
    reached_target_tx: Option<mpsc::Sender<Target<DB::Digest>>>,

    /// Whether explicit finish has been requested.
    finish_requested: bool,

    /// Tracks whether the current target has already been reported as reached.
    reached_current_target_reported: bool,
}

#[cfg(test)]
impl<DB, R> Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    pub(crate) fn journal(&self) -> &DB::Journal {
        &self.journal
    }
}

impl<DB, R> Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Create a new sync engine with the given configuration
    pub async fn new(config: Config<DB, R>) -> Result<Self, Error<DB, R>> {
        if !config.target.range.end().is_valid() {
            return Err(SyncError::Engine(EngineError::InvalidTarget {
                lower_bound_pos: config.target.range.start(),
                upper_bound_pos: config.target.range.end(),
            }));
        }

        // Create journal and verifier using the database's factory methods
        let journal = <DB::Journal as Journal>::new(
            config.context.with_label("journal"),
            config.db_config.journal_config(),
            config.target.range.clone().into(),
        )
        .await?;

        let mut engine = Self {
            outstanding_requests: Requests::new(),
            fetched_operations: BTreeMap::new(),
            pinned_nodes: None,
            retained_roots: HashMap::new(),
            retained_roots_order: VecDeque::new(),
            max_retained_roots: config.max_retained_roots,
            target: config.target.clone(),
            max_outstanding_requests: config.max_outstanding_requests,
            fetch_batch_size: config.fetch_batch_size,
            apply_batch_size: config.apply_batch_size,
            journal,
            resolver: config.resolver.clone(),
            hasher: StandardHasher::<DB::Hasher>::new(),
            context: config.context,
            config: config.db_config,
            update_rx: config.update_rx,
            finish_rx: config.finish_rx,
            reached_target_tx: config.reached_target_tx,
            finish_requested: false,
            reached_current_target_reported: false,
        };
        engine.schedule_requests().await?;
        Ok(engine)
    }

    /// Schedule new fetch requests for operations in the sync range that we haven't yet fetched.
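    ///
    /// For example (illustrative numbers, assuming `gaps::find_next` returns the
    /// first unfetched, unrequested span): with a target range of `0..100`, a
    /// journal tip at 20, a fetched batch covering `30..40`, and an outstanding
    /// request at 50, the next request is scheduled for the gap beginning at 20,
    /// with its batch size capped at `fetch_batch_size`.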
    async fn schedule_requests(&mut self) -> Result<(), Error<DB, R>> {
        let target_size = self.target.range.end();

        // Schedule a pinned-nodes request at the lower sync bound if we don't
        // have pinned nodes yet and one isn't already in flight.
        if self.pinned_nodes.is_none()
            && !self
                .outstanding_requests
                .contains(&self.target.range.start())
        {
            let start_loc = self.target.range.start();
            let resolver = self.resolver.clone();
            let (cancel_tx, cancel_rx) = oneshot::channel();
            let id = self.outstanding_requests.next_id();
            self.outstanding_requests.insert(
                id,
                start_loc,
                cancel_tx,
                Box::pin(async move {
                    let result = resolver
                        .get_operations(target_size, start_loc, NZU64!(1), true, cancel_rx)
                        .await;
                    IndexedFetchResult {
                        id,
                        start_loc,
                        result,
                    }
                }),
            );
        }

        // Calculate the maximum number of requests to make
        let num_requests = self
            .max_outstanding_requests
            .saturating_sub(self.outstanding_requests.len());

        let log_size = self.journal.size().await;

        for _ in 0..num_requests {
            // Convert fetched operations to operation counts for shared gap detection
            let operation_counts: BTreeMap<Location, u64> = self
                .fetched_operations
                .iter()
                .map(|(&start_loc, operations)| (start_loc, operations.len() as u64))
                .collect();

            // Find the next gap in the sync range that needs to be fetched.
            let Some(gap_range) = crate::qmdb::sync::gaps::find_next(
                Location::new(log_size)..self.target.range.end(),
                &operation_counts,
                self.outstanding_requests.locations(),
                self.fetch_batch_size,
            ) else {
                break; // No more gaps to fill
            };

            // Calculate batch size for this gap
            let gap_size = *gap_range.end.checked_sub(*gap_range.start).unwrap();
            let gap_size: NonZeroU64 = gap_size.try_into().unwrap();
            let batch_size = self.fetch_batch_size.min(gap_size);

            // Schedule the request
            let resolver = self.resolver.clone();
            let (cancel_tx, cancel_rx) = oneshot::channel();
            let id = self.outstanding_requests.next_id();
            self.outstanding_requests.insert(
                id,
                gap_range.start,
                cancel_tx,
                Box::pin(async move {
                    let result = resolver
                        .get_operations(target_size, gap_range.start, batch_size, false, cancel_rx)
                        .await;
                    IndexedFetchResult {
                        id,
                        start_loc: gap_range.start,
                        result,
                    }
                }),
            );
        }

        Ok(())
    }

    /// Reset sync state for a target update.
    ///
    /// Cancels requests that start at or before the new target range start
    /// (the request at the start is re-issued as a pinned-nodes request).
    /// Requests after the new start are retained; their proofs will be
    /// verified against the saved historical root (see `retained_roots`)
    /// so the fetched operations can still be used.
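    ///
    /// Retained-root bookkeeping, with illustrative numbers: with
    /// `max_retained_roots = 2` and successive target sizes 100, 120, 140, and 160,
    /// the updates to 120 and 140 save the roots for sizes 100 and 120; the update
    /// to 160 then saves the root for size 140 and evicts the root for size 100 (FIFO).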
    pub async fn reset_for_target_update(
        mut self,
        new_target: Target<DB::Digest>,
    ) -> Result<Self, Error<DB, R>> {
        self.journal.resize(new_target.range.start()).await?;
        // Remove requests at or before the new start. The request at start
        // must be re-issued as a pinned-nodes request with the new target size.
        self.outstanding_requests
            .remove_before(new_target.range.start().checked_add(1).unwrap());
        self.fetched_operations.clear();
        self.pinned_nodes = None;

        // Save the current root keyed by its tree size for verifying
        // retained requests that were issued against this target.
        if self.max_retained_roots > 0 {
            let old_target_size = self.target.range.end();
            assert!(
                self.retained_roots
                    .insert(old_target_size, self.target.root)
                    .is_none(),
                "duplicate retained root for tree size {old_target_size:?}"
            );
            self.retained_roots_order.push_back(old_target_size);
            while self.retained_roots.len() > self.max_retained_roots {
                if let Some(oldest) = self.retained_roots_order.pop_front() {
                    self.retained_roots.remove(&oldest);
                }
            }
        }

        self.target = new_target;
        self.reached_current_target_reported = false;
        Ok(self)
    }

    /// Drain a pending explicit-finish signal without blocking.
    ///
    /// If a finish signal is present, the engine transitions into "finish requested"
    /// mode via [`Self::accept_finish`]. If the finish channel is disconnected before
    /// a finish request is observed, this returns [`EngineError::FinishChannelClosed`].
    fn drain_finish_requests(&mut self) -> Result<(), Error<DB, R>> {
        let Some(finish_rx) = self.finish_rx.as_mut() else {
            return Ok(());
        };
        match finish_rx.try_recv() {
            Ok(()) => {
                self.accept_finish();
                Ok(())
            }
            Err(TryRecvError::Empty) => Ok(()),
            Err(TryRecvError::Disconnected) => {
                Err(SyncError::Engine(EngineError::FinishChannelClosed))
            }
        }
    }

    /// Mark that explicit finish has been requested and stop listening for more signals.
    ///
    /// This is a one-way transition for the current engine instance. Once set, the
    /// engine may complete as soon as it is at a target (or the next time it reaches one).
    fn accept_finish(&mut self) {
        self.finish_requested = true;
        self.finish_rx = None;
    }

    /// Notify an observer that the current target has been reached. The notification is sent
    /// at most once per target, guarded by `reached_current_target_reported`.
    ///
    /// This send awaits channel capacity. When `reached_target_tx` is `Some(...)`,
    /// the receiver is expected to consume notifications promptly so the engine
    /// can keep making progress. If the receiver side is closed, we drop the
    /// sender and continue syncing without further reached-target notifications.
    async fn report_reached_target(&mut self) {
        if self.reached_current_target_reported {
            return;
        }
        if let Some(sender) = self.reached_target_tx.as_ref() {
            if !sender.send_lossy(self.target.clone()).await {
                self.reached_target_tx = None;
            }
        }
        self.reached_current_target_reported = true;
    }

    /// Store a batch of fetched operations. If the input list is empty, this is a no-op.
    pub(crate) fn store_operations(&mut self, start_loc: Location, operations: Vec<DB::Op>) {
        if operations.is_empty() {
            return;
        }
        self.fetched_operations.insert(start_loc, operations);
    }

    /// Apply fetched operations to the journal if we have them.
    ///
    /// This method finds operations that are contiguous with the current journal tip
    /// and applies them in order. It removes stale batches and handles partial
    /// application of batches when needed.
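    ///
    /// For example (illustrative numbers): with the journal tip at location 10 and
    /// fetched batches starting at 8 (5 operations) and 13 (3 operations), the batch
    /// at 8 is partially applied (locations 10..=12), then the batch at 13 is applied
    /// in full, advancing the tip to 16.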
    pub(crate) async fn apply_operations(&mut self) -> Result<(), Error<DB, R>> {
        let mut next_loc = self.journal.size().await;

        // Remove any batches of operations with stale data.
        // That is, those whose last operation is before `next_loc`.
        self.fetched_operations.retain(|&start_loc, operations| {
            assert!(!operations.is_empty());
            let end_loc = start_loc.checked_add(operations.len() as u64 - 1).unwrap();
            end_loc >= next_loc
        });

        loop {
            // See if we have the next operation to apply (i.e. at the journal tip).
            // Find the index of the range that contains the next location.
            let range_start_loc =
                self.fetched_operations
                    .iter()
                    .find_map(|(range_start, range_ops)| {
                        assert!(!range_ops.is_empty());
                        let range_end =
                            range_start.checked_add(range_ops.len() as u64 - 1).unwrap();
                        if *range_start <= next_loc && next_loc <= range_end {
                            Some(*range_start)
                        } else {
                            None
                        }
                    });

            let Some(range_start_loc) = range_start_loc else {
                // We don't have the next operation to apply (i.e. at the journal tip)
                break;
            };

            // Remove the batch of operations that contains the next operation to apply.
            let operations = self.fetched_operations.remove(&range_start_loc).unwrap();
            assert!(!operations.is_empty());
            // Skip operations that are before the next location.
            let skip_count = (next_loc - *range_start_loc) as usize;
            let operations_count = operations.len() - skip_count;
            let remaining_operations = operations.into_iter().skip(skip_count);
            next_loc += operations_count as u64;
            self.apply_operations_batch(remaining_operations).await?;
        }

        Ok(())
    }

    /// Apply a batch of operations to the journal
    async fn apply_operations_batch<I>(&mut self, operations: I) -> Result<(), Error<DB, R>>
    where
        I: IntoIterator<Item = DB::Op>,
    {
        for op in operations {
            self.journal.append(op).await?;
            // No need to sync here -- the journal will periodically sync its storage
            // and we will also sync when we're done applying all operations.
        }
        Ok(())
    }

    /// Check if sync is complete based on the current journal size and target
    pub async fn is_at_target(&self) -> Result<bool, Error<DB, R>> {
        let journal_size = self.journal.size().await;
        let target_journal_size = self.target.range.end();

        // Check if we've completed sync
        if journal_size >= target_journal_size {
            if journal_size > target_journal_size {
                // This shouldn't happen in normal operation; it indicates a bug.
                return Err(SyncError::Engine(EngineError::InvalidState));
            }
            return Ok(true);
        }

        Ok(false)
    }

    /// Handle the result of a fetch operation.
    ///
    /// Discards results for requests no longer tracked (removed by
    /// `remove_before` during a target update). For tracked requests, the
    /// verification root is selected by the proof's tree size: proofs issued
    /// for the current target verify against the current root, while proofs
    /// issued for an earlier target verify against the matching historical
    /// root from `retained_roots`, if one is still available.
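    ///
    /// For example (illustrative): a request issued while the target tree size was
    /// 100 may complete after the target has advanced to 120. Its proof carries
    /// `leaves == 100`, so it is verified against `retained_roots[100]` rather than
    /// the current root; if that root was already evicted, the result is silently dropped.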
    fn handle_fetch_result(
        &mut self,
        fetch_result: IndexedFetchResult<DB::Op, DB::Digest, R::Error>,
    ) -> Result<(), Error<DB, R>> {
        // Discard results for stale requests (removed by a target update).
        // Using the request ID prevents a stale future from consuming the
        // tracking entry of a fresh request at the same location.
        if !self.outstanding_requests.remove(fetch_result.id) {
            return Ok(());
        }

        let start_loc = fetch_result.start_loc;
        let FetchResult {
            proof,
            operations,
            success_tx,
            pinned_nodes,
        } = fetch_result.result.map_err(SyncError::Resolver)?;

        // Validate batch size
        let operations_len = operations.len() as u64;
        if operations_len == 0 || operations_len > self.fetch_batch_size.get() {
            // Invalid batch size - notify resolver of failure.
            // We will request these operations again when we scan for unfetched operations.
            success_tx.send_lossy(false);
            return Ok(());
        }

        // Look up the root to verify against using proof.leaves (the tree
        // size the request was issued for). Fresh requests match the current
        // target; retained requests match a historical root.
        let is_current = proof.leaves == self.target.range.end();
        let target_root = if is_current {
            &self.target.root
        } else {
            let Some(root) = self.retained_roots.get(&proof.leaves) else {
                // No historical root to verify against (evicted or
                // max_retained_roots is 0). Drop the result without
                // penalizing the resolver; the data may be valid.
                return Ok(());
            };
            root
        };

        // Verify the proof. Pinned nodes are only extracted from proofs
        // for the current root because the database needs them for the
        // latest tree size.
        let need_pinned =
            is_current && self.pinned_nodes.is_none() && start_loc == self.target.range.start();
        let valid = if need_pinned {
            let nodes = pinned_nodes.as_deref().unwrap_or(&[]);
            qmdb::verify_proof_and_pinned_nodes(
                &self.hasher,
                &proof,
                start_loc,
                &operations,
                nodes,
                target_root,
            )
        } else {
            qmdb::verify_proof(&self.hasher, &proof, start_loc, &operations, target_root)
        };

        // Report success or failure to the resolver.
        success_tx.send_lossy(valid);

        if !valid {
            if need_pinned {
                tracing::warn!("boundary proof or pinned nodes failed verification, will retry");
            }
            return Ok(());
        }

        // Cache pinned nodes only from current-root-verified proofs.
        if need_pinned {
            if let Some(nodes) = pinned_nodes {
                self.pinned_nodes = Some(nodes);
            }
        }

        // Store operations for later application.
        self.store_operations(start_loc, operations);

        Ok(())
    }

    /// Handle a sync event and return the next engine state.
    async fn handle_event(
        mut self,
        event: Event<DB::Op, DB::Digest, R::Error>,
    ) -> Result<NextStep<Self, DB>, Error<DB, R>> {
        match event {
            Event::TargetUpdate(new_target) => {
                validate_update(&self.target, &new_target)?;

                let mut updated_self = self.reset_for_target_update(new_target).await?;
                updated_self.schedule_requests().await?;
                Ok(NextStep::Continue(updated_self))
            }
            Event::UpdateChannelClosed => {
                self.update_rx = None;
                Ok(NextStep::Continue(self))
            }
            Event::FinishRequested => {
                self.accept_finish();
                Ok(NextStep::Continue(self))
            }
            Event::FinishChannelClosed => Err(SyncError::Engine(EngineError::FinishChannelClosed)),
            Event::BatchReceived(fetch_result) => {
                self.handle_fetch_result(fetch_result)?;
                self.schedule_requests().await?;
                self.apply_operations().await?;
                Ok(NextStep::Continue(self))
            }
        }
    }

    /// Execute one step of the synchronization process.
    ///
    /// This is the main coordination method that:
    /// 1. Checks if sync is complete
    /// 2. Waits for the next synchronization event
    /// 3. Handles different event types (target updates, fetch results)
    /// 4. Coordinates request scheduling and operation application
    ///
    /// Returns `NextStep::Complete(database)` when sync is finished, or
    /// `NextStep::Continue(self)` when more work remains.
    pub(crate) async fn step(mut self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
        self.drain_finish_requests()?;

        // Check if sync is complete
        if self.is_at_target().await? {
            self.report_reached_target().await;

            if self.finish_rx.is_some() && !self.finish_requested {
                let event = wait_for_event(
                    &mut self.update_rx,
                    &mut self.finish_rx,
                    &mut self.outstanding_requests,
                )
                .await
                .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
                return self.handle_event(event).await;
            }

            self.journal.sync().await?;

            // Build the database from the completed sync
            let database = DB::from_sync_result(
                self.context,
                self.config,
                self.journal,
                self.pinned_nodes,
                self.target.range.clone().into(),
                self.apply_batch_size,
            )
            .await?;

            // Verify the final root digest matches the final target
            let got_root = database.root();
            let expected_root = self.target.root;
            if got_root != expected_root {
                return Err(SyncError::Engine(EngineError::RootMismatch {
                    expected: expected_root,
                    actual: got_root,
                }));
            }

            return Ok(NextStep::Complete(database));
        }

        // Wait for the next synchronization event
        let event = wait_for_event(
            &mut self.update_rx,
            &mut self.finish_rx,
            &mut self.outstanding_requests,
        )
        .await
        .ok_or(SyncError::Engine(EngineError::SyncStalled))?;
        self.handle_event(event).await
    }

    /// Run sync to completion, returning the final database when done.
    ///
    /// This method repeatedly calls `step()` until sync is complete. The `step()` method
    /// handles building the final database and verifying the root digest.
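    ///
    /// ```rust,ignore
    /// // Illustrative driver, assuming a `Config` named `config` has been
    /// // built (see the example on [`Config`]):
    /// let engine = Engine::new(config).await?;
    /// let db = engine.sync().await?;
    /// // `db.root()` now matches the final target root.
    /// ```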
    pub async fn sync(mut self) -> Result<DB, Error<DB, R>> {
        // Run sync loop until completion
        loop {
            match self.step().await? {
                NextStep::Continue(new_engine) => self = new_engine,
                NextStep::Complete(database) => return Ok(database),
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::merkle::mmr::Proof;
    use commonware_cryptography::sha256;
    use commonware_utils::channel::oneshot;
    use std::{future::Future, pin::Pin};

    /// Create a no-op fetch result future for testing request tracking.
    fn dummy_future(
        id: RequestId,
        loc: u64,
    ) -> Pin<Box<dyn Future<Output = IndexedFetchResult<i32, sha256::Digest, ()>> + Send>> {
        Box::pin(async move {
            IndexedFetchResult {
                id,
                start_loc: Location::new(loc),
                result: Ok(FetchResult {
                    proof: Proof {
                        leaves: Location::new(0),
                        digests: vec![],
                    },
                    operations: vec![],
                    success_tx: oneshot::channel().0,
                    pinned_nodes: None,
                }),
            }
        })
    }

    /// Helper to add a request at a given location.
    fn add(requests: &mut Requests<i32, sha256::Digest, ()>, loc: u64) -> RequestId {
        let id = requests.next_id();
        requests.insert(
            id,
            Location::new(loc),
            oneshot::channel().0,
            dummy_future(id, loc),
        );
        id
    }

    #[test]
    fn test_add_and_remove() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();
        assert_eq!(requests.len(), 0);

        let id = add(&mut requests, 10);
        assert_eq!(requests.len(), 1);
        assert!(requests.contains(&Location::new(10)));

        assert!(requests.remove(id));
        assert!(!requests.contains(&Location::new(10)));
        assert!(!requests.remove(id));
    }

    #[test]
    fn test_remove_before() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();

        add(&mut requests, 5);
        add(&mut requests, 10);
        add(&mut requests, 15);
        add(&mut requests, 20);
        assert_eq!(requests.len(), 4);

        requests.remove_before(Location::new(10));
        assert_eq!(requests.len(), 3);
        assert!(!requests.contains(&Location::new(5)));
        assert!(requests.contains(&Location::new(10)));
        assert!(requests.contains(&Location::new(15)));
        assert!(requests.contains(&Location::new(20)));
    }

    #[test]
    fn test_remove_before_all() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();

        add(&mut requests, 5);
        add(&mut requests, 10);
        assert_eq!(requests.len(), 2);

        requests.remove_before(Location::new(100));
        assert_eq!(requests.len(), 0);
    }

    #[test]
    fn test_remove_before_empty() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();
        requests.remove_before(Location::new(10));
        assert_eq!(requests.len(), 0);
    }

    #[test]
    fn test_remove_before_none() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();

        add(&mut requests, 10);
        add(&mut requests, 20);
        assert_eq!(requests.len(), 2);

        requests.remove_before(Location::new(5));
        assert_eq!(requests.len(), 2);
        assert!(requests.contains(&Location::new(10)));
        assert!(requests.contains(&Location::new(20)));
    }

    #[test]
    fn test_superseded_request() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();

        // Old request at location 10
        let old_id = add(&mut requests, 10);
        assert_eq!(requests.len(), 1);

        // New request supersedes at same location
        let new_id = add(&mut requests, 10);
        assert_eq!(requests.len(), 1);

        // Old ID is no longer tracked (superseded by insert)
        assert!(!requests.remove(old_id));

        // New ID is still tracked and by_location is intact
        assert!(requests.contains(&Location::new(10)));
        assert!(requests.remove(new_id));
        assert!(!requests.contains(&Location::new(10)));
    }

    #[test]
    fn test_stale_id_after_remove_before() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();

        let old_id = add(&mut requests, 5);
        add(&mut requests, 15);
        requests.remove_before(Location::new(10));

        // Old ID at location 5 was discarded by remove_before
        assert!(!requests.remove(old_id));

        // New request at the same location gets a different ID
        let new_id = add(&mut requests, 5);
        assert_ne!(old_id, new_id);
        assert!(requests.remove(new_id));
    }
}
911}