commonware_storage/qmdb/sync/engine.rs

//! Core sync engine components that are shared across sync clients.
use crate::{
    mmr::{Location, StandardHasher},
    qmdb::{
        self,
        sync::{
            database::Config as _,
            error::EngineError,
            requests::Requests,
            resolver::{FetchResult, Resolver},
            target::validate_update,
            Database, Error as SyncError, Journal, Target,
        },
    },
};
use commonware_codec::Encode;
use commonware_cryptography::Digest;
use commonware_macros::select;
use commonware_runtime::Metrics as _;
use commonware_utils::{channel::mpsc, NZU64};
use futures::{future::Either, StreamExt};
use std::{collections::BTreeMap, fmt::Debug, num::NonZeroU64};

/// Type alias for sync engine errors
type Error<DB, R> = qmdb::sync::Error<<R as Resolver>::Error, <DB as Database>::Digest>;

/// Whether sync should continue or complete
#[derive(Debug)]
pub(crate) enum NextStep<C, D> {
    /// Sync should continue with the updated client
    Continue(C),
    /// Sync is complete with the final database
    Complete(D),
}

/// Events that can occur during synchronization
#[derive(Debug)]
enum Event<Op, D: Digest, E> {
    /// A target update was received
    TargetUpdate(Target<D>),
    /// A batch of operations was received
    BatchReceived(IndexedFetchResult<Op, D, E>),
    /// The target update channel was closed
    UpdateChannelClosed,
}

/// Result from a fetch operation with its starting location
#[derive(Debug)]
pub(super) struct IndexedFetchResult<Op, D: Digest, E> {
    /// The location of the first operation in the batch
    pub start_loc: Location,
    /// The result of the fetch operation
    pub result: Result<FetchResult<Op, D>, E>,
}

/// Wait for the next synchronization event from either target updates or fetch results.
/// Returns `None` if the sync is stalled (there are no outstanding requests).
async fn wait_for_event<Op, D: Digest, E>(
    update_receiver: &mut Option<mpsc::Receiver<Target<D>>>,
    outstanding_requests: &mut Requests<Op, D, E>,
) -> Option<Event<Op, D, E>> {
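    // If no update channel was provided, poll a future that never resolves so
    // the `select!` below only completes when a fetch result arrives.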
    let target_update_fut = update_receiver.as_mut().map_or_else(
        || Either::Right(futures::future::pending()),
        |update_rx| Either::Left(update_rx.recv()),
    );

    select! {
        target = target_update_fut => target.map_or_else(
            || Some(Event::UpdateChannelClosed),
            |target| Some(Event::TargetUpdate(target))
        ),
        result = outstanding_requests.futures_mut().next() => {
            result.map(|fetch_result| Event::BatchReceived(fetch_result))
        },
    }
}

/// Configuration for creating a new Engine
pub struct Config<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Runtime context for creating database components
    pub context: DB::Context,
    /// Network resolver for fetching operations and proofs
    pub resolver: R,
    /// Sync target (root digest and operation bounds)
    pub target: Target<DB::Digest>,
    /// Maximum number of outstanding requests for operation batches
    pub max_outstanding_requests: usize,
    /// Maximum operations to fetch per batch
    pub fetch_batch_size: NonZeroU64,
    /// Number of operations to apply in a single batch
    pub apply_batch_size: usize,
    /// Database-specific configuration
    pub db_config: DB::Config,
    /// Channel for receiving sync target updates
    pub update_rx: Option<mpsc::Receiver<Target<DB::Digest>>>,
}

/// A shared sync engine that manages the core synchronization state and operations.
pub(crate) struct Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Tracks outstanding fetch requests and their futures
    outstanding_requests: Requests<DB::Op, DB::Digest, R::Error>,

    /// Operations that have been fetched but not yet applied to the log
    fetched_operations: BTreeMap<Location, Vec<DB::Op>>,

    /// Pinned MMR nodes extracted from proofs, used for database construction
    pinned_nodes: Option<Vec<DB::Digest>>,

    /// The current sync target (root digest and operation bounds)
    target: Target<DB::Digest>,

    /// Maximum number of parallel outstanding requests
    max_outstanding_requests: usize,

    /// Maximum operations to fetch in a single batch
    fetch_batch_size: NonZeroU64,

    /// Number of operations to apply in a single batch
    apply_batch_size: usize,

    /// Journal that operations are applied to during sync
    journal: DB::Journal,

    /// Resolver for fetching operations and proofs from the sync source
    resolver: R,

    /// Hasher used for proof verification
    hasher: StandardHasher<DB::Hasher>,

    /// Runtime context for database operations
    context: DB::Context,

    /// Configuration for building the final database
    config: DB::Config,

    /// Optional receiver for target updates during sync
    update_receiver: Option<mpsc::Receiver<Target<DB::Digest>>>,
}

#[cfg(test)]
impl<DB, R> Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    pub(crate) fn journal(&self) -> &DB::Journal {
        &self.journal
    }
}

impl<DB, R> Engine<DB, R>
where
    DB: Database,
    R: Resolver<Op = DB::Op, Digest = DB::Digest>,
    DB::Op: Encode,
{
    /// Create a new sync engine with the given configuration
    pub async fn new(config: Config<DB, R>) -> Result<Self, Error<DB, R>> {
        if config.target.range.is_empty() || !config.target.range.end.is_valid() {
            return Err(SyncError::Engine(EngineError::InvalidTarget {
                lower_bound_pos: config.target.range.start,
                upper_bound_pos: config.target.range.end,
            }));
        }

        // Create the journal using the database's factory method
        let journal = <DB::Journal as Journal>::new(
            config.context.with_label("journal"),
            config.db_config.journal_config(),
            config.target.range.clone(),
        )
        .await?;

        let mut engine = Self {
            outstanding_requests: Requests::new(),
            fetched_operations: BTreeMap::new(),
            pinned_nodes: None,
            target: config.target.clone(),
            max_outstanding_requests: config.max_outstanding_requests,
            fetch_batch_size: config.fetch_batch_size,
            apply_batch_size: config.apply_batch_size,
            journal,
            resolver: config.resolver.clone(),
            hasher: StandardHasher::<DB::Hasher>::new(),
            context: config.context,
            config: config.db_config,
            update_receiver: config.update_rx,
        };
        engine.schedule_requests().await?;
        Ok(engine)
    }

    /// Schedule new fetch requests for operations in the sync range that we haven't yet fetched.
    async fn schedule_requests(&mut self) -> Result<(), Error<DB, R>> {
        let target_size = self.target.range.end;

        // Special case: If we don't have pinned nodes, we need to extract them from a proof
        // for the lower sync bound.
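        // A proof for a batch that starts at the lower bound carries the pinned
        // nodes, so a single-operation request is sufficient.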
        if self.pinned_nodes.is_none() {
            let start_loc = self.target.range.start;
            let resolver = self.resolver.clone();
            self.outstanding_requests.add(
                start_loc,
                Box::pin(async move {
                    let result = resolver
                        .get_operations(target_size, start_loc, NZU64!(1))
                        .await;
                    IndexedFetchResult { start_loc, result }
                }),
            );
        }

        // Calculate the maximum number of requests to make
        let num_requests = self
            .max_outstanding_requests
            .saturating_sub(self.outstanding_requests.len());

        let log_size = self.journal.size().await;

        for _ in 0..num_requests {
            // Convert fetched operations to operation counts for shared gap detection
            let operation_counts: BTreeMap<Location, u64> = self
                .fetched_operations
                .iter()
                .map(|(&start_loc, operations)| (start_loc, operations.len() as u64))
                .collect();

            // Find the next gap in the sync range that needs to be fetched.
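            // For example, if the journal already holds operations [0, 10) and a
            // fetched batch covers [10, 15), the next gap to request starts at 15.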
            let Some(gap_range) = crate::qmdb::sync::gaps::find_next(
                Location::new_unchecked(log_size)..self.target.range.end,
                &operation_counts,
                self.outstanding_requests.locations(),
                self.fetch_batch_size,
            ) else {
                break; // No more gaps to fill
            };

            // Calculate batch size for this gap
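            // (`find_next` only returns non-empty ranges, so the subtraction and
            // `NonZeroU64` conversion below cannot fail.)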
            let gap_size = *gap_range.end.checked_sub(*gap_range.start).unwrap();
            let gap_size: NonZeroU64 = gap_size.try_into().unwrap();
            let batch_size = self.fetch_batch_size.min(gap_size);

            // Schedule the request
            let resolver = self.resolver.clone();
            self.outstanding_requests.add(
                gap_range.start,
                Box::pin(async move {
                    let result = resolver
                        .get_operations(target_size, gap_range.start, batch_size)
                        .await;
                    IndexedFetchResult {
                        start_loc: gap_range.start,
                        result,
                    }
                }),
            );
        }

        Ok(())
    }

    /// Clear all sync state for a target update
    pub async fn reset_for_target_update(
        mut self,
        new_target: Target<DB::Digest>,
    ) -> Result<Self, Error<DB, R>> {
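        // Resize the journal to the new target's lower bound; anything beyond it
        // must be re-fetched and re-verified against the new target root.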
        self.journal.resize(new_target.range.start).await?;

        Ok(Self {
            outstanding_requests: Requests::new(),
            fetched_operations: BTreeMap::new(),
            pinned_nodes: None,
            target: new_target,
            max_outstanding_requests: self.max_outstanding_requests,
            fetch_batch_size: self.fetch_batch_size,
            apply_batch_size: self.apply_batch_size,
            journal: self.journal,
            resolver: self.resolver,
            hasher: self.hasher,
            context: self.context,
            config: self.config,
            update_receiver: self.update_receiver,
        })
    }

    /// Store a batch of fetched operations
    pub fn store_operations(&mut self, start_loc: Location, operations: Vec<DB::Op>) {
        self.fetched_operations.insert(start_loc, operations);
    }

    /// Apply fetched operations to the journal if we have them.
    ///
    /// This method finds operations that are contiguous with the current journal tip
    /// and applies them in order. It removes stale batches and handles partial
    /// application of batches when needed.
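    ///
    /// For example, if the journal tip is at location 12 and a fetched batch
    /// covers locations [10, 20), the first two operations of the batch are
    /// skipped and operations 12..20 are appended.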
    pub async fn apply_operations(&mut self) -> Result<(), Error<DB, R>> {
        let mut next_loc = self.journal.size().await;

        // Remove any batches of operations with stale data.
        // That is, those whose last operation is before `next_loc`.
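        // Batches are validated to be non-empty in `handle_fetch_result`, so
        // `operations.len() - 1` cannot underflow.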
        self.fetched_operations.retain(|&start_loc, operations| {
            let end_loc = start_loc.checked_add(operations.len() as u64 - 1).unwrap();
            end_loc >= next_loc
        });

        loop {
            // See if we have the next operation to apply (i.e. at the journal tip).
            // Find the index of the range that contains the next location.
            let range_start_loc =
                self.fetched_operations
                    .iter()
                    .find_map(|(range_start, range_ops)| {
                        let range_end =
                            range_start.checked_add(range_ops.len() as u64 - 1).unwrap();
                        if *range_start <= next_loc && next_loc <= range_end {
                            Some(*range_start)
                        } else {
                            None
                        }
                    });

            let Some(range_start_loc) = range_start_loc else {
                // We don't have the next operation to apply (i.e. at the journal tip)
                break;
            };

            // Remove the batch of operations that contains the next operation to apply.
            let operations = self.fetched_operations.remove(&range_start_loc).unwrap();
            // Skip operations that are before the next location.
            let skip_count = (next_loc - *range_start_loc) as usize;
            let operations_count = operations.len() - skip_count;
            let remaining_operations = operations.into_iter().skip(skip_count);
            next_loc += operations_count as u64;
            self.apply_operations_batch(remaining_operations).await?;
        }

        Ok(())
    }

    /// Apply a batch of operations to the journal
    async fn apply_operations_batch<I>(&mut self, operations: I) -> Result<(), Error<DB, R>>
    where
        I: IntoIterator<Item = DB::Op>,
    {
        for op in operations {
            self.journal.append(op).await?;
            // No need to sync here -- the journal will periodically sync its storage
            // and we will also sync when we're done applying all operations.
        }
        Ok(())
    }

    /// Check if sync is complete based on the current journal size and target
    pub async fn is_complete(&self) -> Result<bool, Error<DB, R>> {
        let journal_size = self.journal.size().await;
        let target_journal_size = self.target.range.end;

        // Check if we've completed sync
        if journal_size >= target_journal_size {
            if journal_size > target_journal_size {
                // This shouldn't happen in normal operation -- indicates a bug
                return Err(SyncError::Engine(EngineError::InvalidState));
            }
            return Ok(true);
        }

        Ok(false)
    }

    /// Handle the result of a fetch operation.
    ///
    /// This method processes incoming fetch results by:
    /// 1. Removing the request from outstanding requests
    /// 2. Validating batch size
    /// 3. Verifying proofs using the configured verifier
    /// 4. Extracting pinned nodes if needed
    /// 5. Storing valid operations for later application
    fn handle_fetch_result(
        &mut self,
        fetch_result: IndexedFetchResult<DB::Op, DB::Digest, R::Error>,
    ) -> Result<(), Error<DB, R>> {
        // Mark request as complete
        self.outstanding_requests.remove(fetch_result.start_loc);

        let start_loc = fetch_result.start_loc;
        let FetchResult {
            proof,
            operations,
            success_tx,
        } = fetch_result.result.map_err(SyncError::Resolver)?;

        // Validate batch size
        let operations_len = operations.len() as u64;
        if operations_len == 0 || operations_len > self.fetch_batch_size.get() {
            // Invalid batch size -- notify resolver of failure.
            // We will request these operations again when we scan for unfetched operations.
            let _ = success_tx.send(false);
            return Ok(());
        }

        // Verify the proof
        let proof_valid = qmdb::verify_proof(
            &mut self.hasher,
            &proof,
            start_loc,
            &operations,
            &self.target.root,
        );

        // Report success or failure to the resolver
        let _ = success_tx.send(proof_valid);

        if proof_valid {
            // Extract pinned nodes if we don't have them and this is the first batch
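            // These are the MMR nodes below the lower sync bound that the final
            // database needs in order to reconstruct its MMR state.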
            if self.pinned_nodes.is_none() && start_loc == self.target.range.start {
                if let Ok(nodes) =
                    crate::qmdb::extract_pinned_nodes(&proof, start_loc, operations_len)
                {
                    self.pinned_nodes = Some(nodes);
                }
            }

            // Store operations for later application
            self.store_operations(start_loc, operations);
        }

        Ok(())
    }

    /// Execute one step of the synchronization process.
    ///
    /// This is the main coordination method that:
    /// 1. Checks if sync is complete
    /// 2. Waits for the next synchronization event
    /// 3. Handles different event types (target updates, fetch results)
    /// 4. Coordinates request scheduling and operation application
    ///
    /// Returns `NextStep::Complete(database)` when sync is finished, or
    /// `NextStep::Continue(self)` when more work remains.
    pub(crate) async fn step(mut self) -> Result<NextStep<Self, DB>, Error<DB, R>> {
        // Check if sync is complete
        if self.is_complete().await? {
            self.journal.sync().await?;

            // Build the database from the completed sync
            let database = DB::from_sync_result(
                self.context,
                self.config,
                self.journal,
                self.pinned_nodes,
                self.target.range.clone(),
                self.apply_batch_size,
            )
            .await?;

            // Verify the final root digest matches the final target
            let got_root = database.root();
            let expected_root = self.target.root;
            if got_root != expected_root {
                return Err(SyncError::Engine(EngineError::RootMismatch {
                    expected: expected_root,
                    actual: got_root,
                }));
            }

            return Ok(NextStep::Complete(database));
        }

        // Wait for the next synchronization event
        let event = wait_for_event(&mut self.update_receiver, &mut self.outstanding_requests)
            .await
            .ok_or(SyncError::Engine(EngineError::SyncStalled))?;

        match event {
            Event::TargetUpdate(new_target) => {
                // Validate and handle the target update
                validate_update(&self.target, &new_target)?;

                let mut updated_self = self.reset_for_target_update(new_target).await?;

                // Schedule new requests for the updated target
                updated_self.schedule_requests().await?;

                return Ok(NextStep::Continue(updated_self));
            }
            Event::UpdateChannelClosed => {
                self.update_receiver = None;
            }
            Event::BatchReceived(fetch_result) => {
                // Process the fetch result
                self.handle_fetch_result(fetch_result)?;

                // Request operations in the sync range
                self.schedule_requests().await?;

                // Apply operations that are now contiguous with the current journal
                self.apply_operations().await?;
            }
        }

        Ok(NextStep::Continue(self))
    }

    /// Run sync to completion, returning the final database when done.
    ///
    /// This method repeatedly calls `step()` until sync is complete. The `step()` method
    /// handles building the final database and verifying the root digest.
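    ///
    /// # Example
    ///
    /// A minimal sketch, assuming concrete `Database` and `Resolver`
    /// implementations and that `context`, `resolver`, `target`, and
    /// `db_config` are constructed elsewhere:
    ///
    /// ```ignore
    /// let engine = Engine::new(Config {
    ///     context,
    ///     resolver,
    ///     target,
    ///     max_outstanding_requests: 4,
    ///     fetch_batch_size: NZU64!(100),
    ///     apply_batch_size: 1024,
    ///     db_config,
    ///     update_rx: None,
    /// })
    /// .await?;
    /// let database = engine.sync().await?;
    /// ```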
    pub async fn sync(mut self) -> Result<DB, Error<DB, R>> {
        // Run sync loop until completion
        loop {
            match self.step().await? {
                NextStep::Continue(new_engine) => self = new_engine,
                NextStep::Complete(database) => return Ok(database),
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::mmr::Proof;
    use commonware_cryptography::sha256;
    use commonware_utils::channel::oneshot;

    #[test]
    fn test_outstanding_requests() {
        let mut requests: Requests<i32, sha256::Digest, ()> = Requests::new();
        assert_eq!(requests.len(), 0);

        // Test adding requests
        let fut = Box::pin(async {
            IndexedFetchResult {
                start_loc: Location::new_unchecked(0),
                result: Ok(FetchResult {
                    proof: Proof {
                        leaves: Location::new_unchecked(0),
                        digests: vec![],
                    },
                    operations: vec![],
                    success_tx: oneshot::channel().0,
                }),
            }
        });
        requests.add(Location::new_unchecked(10), fut);
        assert_eq!(requests.len(), 1);
        assert!(requests.locations().contains(&Location::new_unchecked(10)));

        // Test removing requests
        requests.remove(Location::new_unchecked(10));
        assert!(!requests.locations().contains(&Location::new_unchecked(10)));
    }
}