Skip to main content

miden_client/sync/
state_sync.rs

1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use async_trait::async_trait;
7use miden_protocol::Word;
8use miden_protocol::account::{Account, AccountHeader, AccountId};
9use miden_protocol::block::{BlockHeader, BlockNumber};
10use miden_protocol::crypto::merkle::mmr::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
11use miden_protocol::note::{NoteId, NoteTag};
12use tracing::info;
13
14use super::state_sync_update::TransactionUpdateTracker;
15use super::{AccountUpdates, StateSyncUpdate};
16use crate::ClientError;
17use crate::note::NoteUpdateTracker;
18use crate::rpc::NodeRpcClient;
19use crate::rpc::domain::note::CommittedNote;
20use crate::rpc::domain::sync::StateSyncInfo;
21use crate::rpc::domain::transaction::{self as rpc_tx, TransactionInclusion};
22use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
23use crate::transaction::TransactionRecord;
24
25// SYNC REQUEST
26// ================================================================================================
27
/// Bundles the client state needed to perform a sync operation.
///
/// The sync process uses these inputs to:
/// - Request account commitment updates from the node for the provided accounts.
/// - Filter which note inclusions the node returns based on the provided note tags.
/// - Follow the lifecycle of every tracked note (input and output), transitioning them from pending
///   to committed to consumed as the network state advances.
/// - Track uncommitted transactions so they can be marked as committed when the node confirms them,
///   or discarded when they become stale.
///
/// The struct is consumed by [`StateSync::sync_state`], which takes ownership of every field.
///
/// Use [`Client::build_sync_input()`](`crate::Client::build_sync_input()`) to build a default input
/// from the client state, or construct this struct manually for custom sync scenarios.
pub struct StateSyncInput {
    /// Account headers to request commitment updates for.
    ///
    /// The account IDs are sent to the node, and the headers' commitments are compared against
    /// the commitments derived from the node's response to detect accounts that changed.
    pub accounts: Vec<AccountHeader>,
    /// Note tags that the node uses to filter which note inclusions to return.
    pub note_tags: BTreeSet<NoteTag>,
    /// Input notes whose lifecycle should be followed during sync.
    ///
    /// Together with `output_notes`, these seed the note update tracker of the resulting
    /// [`StateSyncUpdate`].
    pub input_notes: Vec<InputNoteRecord>,
    /// Output notes whose lifecycle should be followed during sync.
    pub output_notes: Vec<OutputNoteRecord>,
    /// Transactions to track for commitment or discard during sync.
    ///
    /// These seed the transaction update tracker of the resulting [`StateSyncUpdate`].
    pub uncommitted_transactions: Vec<TransactionRecord>,
}
52
53// SYNC CALLBACKS
54// ================================================================================================
55
/// The action to be taken when a note update is received as part of the sync response.
///
/// Values of this type are returned by [`OnNoteReceived::on_note_received`] and interpreted by
/// the state sync component for every note inclusion it processes.
// NOTE(review): the lint below is silenced without an explanation in the original code.
// Presumably the payloads are kept unboxed on purpose (boxing would change the public variant
// types) — confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum NoteUpdateAction {
    /// The note commit update is relevant and the specified note should be marked as committed in
    /// the store, storing its inclusion proof.
    Commit(CommittedNote),
    /// The public note is relevant and should be inserted into the store.
    Insert(InputNoteRecord),
    /// The note update is not relevant and should be discarded.
    Discard,
}
67
/// Callback interface used by [`StateSync`] to decide what to do with each note inclusion
/// returned by the node.
///
/// The trait is declared with `?Send`, so the futures returned by implementations are not
/// required to be `Send`.
#[async_trait(?Send)]
pub trait OnNoteReceived {
    /// Callback that gets executed when a new note is received as part of the sync response.
    ///
    /// It receives:
    ///
    /// - The committed note received from the network.
    /// - An optional note record that corresponds to the state of the note in the network (only if
    ///   the note is public).
    ///
    /// It returns a [`NoteUpdateAction`] indicating what to do with the received note update:
    /// commit a tracked note, insert a new public note, or discard the update.
    ///
    /// # Errors
    ///
    /// Implementations may return a [`ClientError`]; the state sync component propagates it and
    /// aborts the sync.
    async fn on_note_received(
        &self,
        committed_note: CommittedNote,
        public_note: Option<InputNoteRecord>,
    ) -> Result<NoteUpdateAction, ClientError>;
}
86// STATE SYNC
87// ================================================================================================
88
/// The state sync component encompasses the client's sync logic. It is used to request updates
/// from the node and apply them to the relevant elements. The updates are then returned and can
/// be applied to the store to persist the changes.
///
/// The component itself does not write to the store: [`StateSync::sync_state`] only returns a
/// [`StateSyncUpdate`] (and mutates the partial MMR passed in by the caller).
#[derive(Clone)]
pub struct StateSync {
    /// The RPC client used to communicate with the node.
    rpc_api: Arc<dyn NodeRpcClient>,
    /// Responsible for checking the relevance of notes and executing the
    /// [`OnNoteReceived`] callback when a new note inclusion is received.
    note_screener: Arc<dyn OnNoteReceived>,
    /// Number of blocks after which pending transactions are considered stale and discarded.
    /// If `None`, there is no limit and transactions will be kept indefinitely.
    tx_discard_delta: Option<u32>,
    /// Whether to check for nullifiers during state sync. When enabled, the component will query
    /// the nullifiers for unspent notes at each sync step. This allows to detect when tracked
    /// notes have been consumed externally and discard local transactions that depend on them.
    ///
    /// Set to `true` by [`StateSync::new`].
    sync_nullifiers: bool,
}
107
108impl StateSync {
109    /// Creates a new instance of the state sync component.
110    ///
111    /// The nullifiers sync is enabled by default. To disable it, see
112    /// [`Self::disable_nullifier_sync`].
113    ///
114    /// # Arguments
115    ///
116    /// * `rpc_api` - The RPC client used to communicate with the node.
117    /// * `note_screener` - The note screener used to check the relevance of notes.
118    /// * `tx_discard_delta` - Number of blocks after which pending transactions are discarded.
119    pub fn new(
120        rpc_api: Arc<dyn NodeRpcClient>,
121        note_screener: Arc<dyn OnNoteReceived>,
122        tx_discard_delta: Option<u32>,
123    ) -> Self {
124        Self {
125            rpc_api,
126            note_screener,
127            tx_discard_delta,
128            sync_nullifiers: true,
129        }
130    }
131
    /// Disables the nullifier sync.
    ///
    /// When disabled, the component will not query the node for new nullifiers after each sync
    /// step. This is useful for clients that don't need to track note consumption, such as
    /// faucets.
    ///
    /// The setting stays in effect for subsequent syncs until [`Self::enable_nullifier_sync`] is
    /// called.
    pub fn disable_nullifier_sync(&mut self) {
        self.sync_nullifiers = false;
    }
140
    /// Enables the nullifier sync.
    ///
    /// When enabled, [`Self::sync_state`] queries the node for nullifiers of the unspent tracked
    /// notes once the sync steps have been applied. This is the state a component created with
    /// [`Self::new`] starts in, so calling this is only needed after
    /// [`Self::disable_nullifier_sync`].
    pub fn enable_nullifier_sync(&mut self) {
        self.sync_nullifiers = true;
    }
145
146    /// Syncs the state of the client with the chain tip of the node, returning the updates that
147    /// should be applied to the store.
148    ///
149    /// Use [`Client::build_sync_input()`](`crate::Client::build_sync_input()`) to build the default
150    /// input, or assemble it manually for custom sync. The `current_partial_mmr` is taken by
151    /// mutable reference so callers can keep it in memory across syncs.
152    ///
153    /// During the sync process, the following steps are performed:
154    /// 1. A request is sent to the node to get the state updates. This request includes tracked
155    ///    account IDs and the tags of notes that might have changed or that might be of interest to
156    ///    the client.
157    /// 2. A response is received with the current state of the network. The response includes
158    ///    information about new and committed notes, updated accounts, and committed transactions.
159    /// 3. Tracked public accounts are updated and private accounts are validated against the node
160    ///    state.
161    /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
162    ///    during the sync processing.
163    /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
164    ///    [`OnNoteReceived`] callback.
165    /// 6. Transactions are updated with their new states. Transactions might be committed or
166    ///    discarded.
167    /// 7. The MMR is updated with the new peaks and authentication nodes.
168    pub async fn sync_state(
169        &self,
170        current_partial_mmr: &mut PartialMmr,
171        input: StateSyncInput,
172    ) -> Result<StateSyncUpdate, ClientError> {
173        let StateSyncInput {
174            accounts,
175            note_tags,
176            input_notes,
177            output_notes,
178            uncommitted_transactions,
179        } = input;
180        let block_num = u32::try_from(
181            current_partial_mmr.forest().num_leaves().checked_sub(1).unwrap_or_default(),
182        )
183        .map_err(|_| ClientError::InvalidPartialMmrForest)?
184        .into();
185
186        let mut state_sync_update = StateSyncUpdate {
187            block_num,
188            note_updates: NoteUpdateTracker::new(input_notes, output_notes),
189            transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
190            ..Default::default()
191        };
192
193        let note_tags = Arc::new(note_tags);
194        let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
195        let mut state_sync_steps = Vec::new();
196
197        while let Some(step) = self
198            .sync_state_step(state_sync_update.block_num, &account_ids, &note_tags)
199            .await?
200        {
201            let sync_block_num = step.block_header.block_num();
202
203            let reached_tip = step.chain_tip == sync_block_num;
204
205            state_sync_update.block_num = sync_block_num;
206            state_sync_steps.push(step);
207
208            if reached_tip {
209                break;
210            }
211        }
212
213        // TODO: fetch_public_note_details should take an iterator or btreeset down to the RPC call
214        // (this would be a breaking change so it should be done separately)
215        let public_note_ids: Vec<NoteId> = state_sync_steps
216            .iter()
217            .flat_map(|s| s.note_inclusions.iter())
218            .filter(|n| !n.metadata().is_private())
219            .map(|n| *n.note_id())
220            .collect();
221
222        let public_note_records = self.fetch_public_note_details(&public_note_ids).await?;
223
224        // Apply local changes. These involve updating the MMR and applying state transitions
225        // to notes based on the received information.
226        info!("Applying state transitions locally.");
227
228        for sync_step in state_sync_steps {
229            let StateSyncInfo {
230                chain_tip,
231                block_header,
232                mmr_delta,
233                account_commitment_updates,
234                note_inclusions,
235                transactions,
236            } = sync_step;
237
238            self.account_state_sync(
239                &mut state_sync_update.account_updates,
240                &accounts,
241                &account_commitment_updates,
242            )
243            .await?;
244
245            self.transaction_state_sync(
246                &mut state_sync_update.transaction_updates,
247                &block_header,
248                &transactions,
249            );
250
251            let found_relevant_note = self
252                .note_state_sync(
253                    &mut state_sync_update.note_updates,
254                    note_inclusions,
255                    &block_header,
256                    &public_note_records,
257                )
258                .await?;
259
260            let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
261                &block_header,
262                found_relevant_note,
263                current_partial_mmr,
264                mmr_delta,
265            )?;
266
267            let include_block = found_relevant_note || chain_tip == block_header.block_num();
268            if include_block {
269                state_sync_update.block_updates.insert(
270                    block_header,
271                    found_relevant_note,
272                    new_mmr_peaks,
273                    new_authentication_nodes,
274                );
275            } else {
276                // Even though this block header is not stored, `apply_mmr_changes` may
277                // produce authentication nodes for already-tracked leaves whose Merkle
278                // paths change as the MMR grows. These must be persisted so that the
279                // `PartialMmr` can be correctly reconstructed from the store after a
280                // client restart.
281                state_sync_update
282                    .block_updates
283                    .extend_authentication_nodes(new_authentication_nodes);
284            }
285        }
286        if self.sync_nullifiers {
287            info!("Syncing nullifiers.");
288            self.nullifiers_state_sync(&mut state_sync_update, block_num).await?;
289        }
290
291        Ok(state_sync_update)
292    }
293
294    /// Executes a single sync step by composing calls to `sync_notes`, `sync_chain_mmr`, and
295    /// `sync_transactions`.
296    ///
297    /// `sync_notes` drives the loop: it determines the target block (the first block containing
298    /// a matching note, or the chain tip). If the target block equals `current_block_num`, `None`
299    /// is returned, signalling that the client is already at the requested height.
300    ///
301    /// The other two calls use the same target block to ensure a consistent range.
302    async fn sync_state_step(
303        &self,
304        current_block_num: BlockNumber,
305        account_ids: &[AccountId],
306        note_tags: &Arc<BTreeSet<NoteTag>>,
307    ) -> Result<Option<StateSyncInfo>, ClientError> {
308        info!("Performing sync state step.");
309
310        // Retrieve sync_notes
311        let note_sync =
312            self.rpc_api.sync_notes(current_block_num, None, note_tags.as_ref()).await?;
313
314        let target_block = note_sync.block_header.block_num();
315
316        // We don't need to continue if the chain has not advanced
317        if target_block == current_block_num {
318            return Ok(None);
319        }
320
321        // Get MMR delta for the same range
322        let mmr_delta = self
323            .rpc_api
324            .sync_chain_mmr(current_block_num, Some(target_block))
325            .await?
326            .mmr_delta;
327
328        // Gather transactions for tracked accounts (skip if none)
329        let (account_commitment_updates, transactions) = if account_ids.is_empty() {
330            (vec![], vec![])
331        } else {
332            let tx_info = self
333                .rpc_api
334                .sync_transactions(current_block_num, Some(target_block), account_ids.to_vec())
335                .await?;
336
337            let account_updates = derive_account_commitment_updates(&tx_info.transaction_records);
338
339            let tx_inclusions = tx_info
340                .transaction_records
341                .iter()
342                .map(|r| TransactionInclusion {
343                    transaction_id: r.transaction_header.id(),
344                    block_num: r.block_num,
345                    account_id: r.transaction_header.account_id(),
346                    initial_state_commitment: r.transaction_header.initial_state_commitment(),
347                })
348                .collect();
349
350            (account_updates, tx_inclusions)
351        };
352
353        // Compose StateSyncInfo with sync results
354        Ok(Some(StateSyncInfo {
355            chain_tip: note_sync.chain_tip,
356            block_header: note_sync.block_header,
357            mmr_delta,
358            account_commitment_updates,
359            note_inclusions: note_sync.notes,
360            transactions,
361        }))
362    }
363
364    // HELPERS
365    // --------------------------------------------------------------------------------------------
366
367    /// Compares the state of tracked accounts with the updates received from the node. The method
368    /// updates the `state_sync_update` field with the details of the accounts that need to be
369    /// updated.
370    ///
371    /// The account updates might include:
372    /// * Public accounts that have been updated in the node.
373    /// * Network accounts that have been updated in the node and are being tracked by the client.
374    /// * Private accounts that have been marked as mismatched because the current commitment
375    ///   doesn't match the one received from the node. The client will need to handle these cases
376    ///   as they could be a stale account state or a reason to lock the account.
377    async fn account_state_sync(
378        &self,
379        account_updates: &mut AccountUpdates,
380        accounts: &[AccountHeader],
381        account_commitment_updates: &[(AccountId, Word)],
382    ) -> Result<(), ClientError> {
383        let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
384            accounts.iter().partition(|account_header| !account_header.id().is_private());
385
386        let updated_public_accounts = self
387            .get_updated_public_accounts(account_commitment_updates, &public_accounts)
388            .await?;
389
390        let mismatched_private_accounts = account_commitment_updates
391            .iter()
392            .filter(|(account_id, digest)| {
393                private_accounts.iter().any(|account| {
394                    account.id() == *account_id && &account.to_commitment() != digest
395                })
396            })
397            .copied()
398            .collect::<Vec<_>>();
399
400        account_updates
401            .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
402
403        Ok(())
404    }
405
406    /// Queries the node for the latest state of the public accounts that don't match the current
407    /// state of the client.
408    async fn get_updated_public_accounts(
409        &self,
410        account_updates: &[(AccountId, Word)],
411        current_public_accounts: &[&AccountHeader],
412    ) -> Result<Vec<Account>, ClientError> {
413        let mut mismatched_public_accounts = vec![];
414
415        for (id, commitment) in account_updates {
416            // check if this updated account state is tracked by the client
417            if let Some(account) = current_public_accounts
418                .iter()
419                .find(|acc| *id == acc.id() && *commitment != acc.to_commitment())
420            {
421                mismatched_public_accounts.push(*account);
422            }
423        }
424
425        self.rpc_api
426            .get_updated_public_accounts(&mismatched_public_accounts)
427            .await
428            .map_err(ClientError::RpcError)
429    }
430
431    /// Applies the changes received from the sync response to the notes and transactions tracked
432    /// by the client and updates the `note_updates` accordingly.
433    ///
434    /// This method uses the callbacks provided to the [`StateSync`] component to check if the
435    /// updates received are relevant to the client.
436    ///
437    /// The note updates might include:
438    /// * New notes that we received from the node and might be relevant to the client.
439    /// * Tracked expected notes that were committed in the block.
440    /// * Tracked notes that were being processed by a transaction that got committed.
441    /// * Tracked notes that were nullified by an external transaction.
442    ///
443    /// The `public_notes` parameter provides cached public note details for the current sync
444    /// iteration so the node is only queried once per batch.
445    async fn note_state_sync(
446        &self,
447        note_updates: &mut NoteUpdateTracker,
448        note_inclusions: Vec<CommittedNote>,
449        block_header: &BlockHeader,
450        public_notes: &BTreeMap<NoteId, InputNoteRecord>,
451    ) -> Result<bool, ClientError> {
452        // `found_relevant_note` tracks whether we want to persist the block header in the end
453        let mut found_relevant_note = false;
454
455        for committed_note in note_inclusions {
456            let public_note = (!committed_note.metadata().is_private())
457                .then(|| public_notes.get(committed_note.note_id()))
458                .flatten()
459                .cloned();
460
461            match self.note_screener.on_note_received(committed_note, public_note).await? {
462                NoteUpdateAction::Commit(committed_note) => {
463                    // Only mark the downloaded block header as relevant if we are talking about
464                    // an input note (output notes get marked as committed but we don't need the
465                    // block for anything there)
466                    found_relevant_note |= note_updates
467                        .apply_committed_note_state_transitions(&committed_note, block_header)?;
468                },
469                NoteUpdateAction::Insert(public_note) => {
470                    found_relevant_note = true;
471
472                    note_updates.apply_new_public_note(public_note, block_header)?;
473                },
474                NoteUpdateAction::Discard => {},
475            }
476        }
477
478        Ok(found_relevant_note)
479    }
480
481    /// Queries the node for all received notes that aren't being locally tracked in the client.
482    ///
483    /// The client can receive metadata for private notes that it's not tracking. In this case,
484    /// notes are ignored for now as they become useless until details are imported.
485    async fn fetch_public_note_details(
486        &self,
487        query_notes: &[NoteId],
488    ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
489        if query_notes.is_empty() {
490            return Ok(BTreeMap::new());
491        }
492
493        info!("Getting note details for notes that are not being tracked.");
494
495        let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
496
497        Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
498    }
499
500    /// Collects the nullifier tags for the notes that were updated in the sync response and uses
501    /// the `sync_nullifiers` endpoint to check if there are new nullifiers for these
502    /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
503    ///
504    /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
505    async fn nullifiers_state_sync(
506        &self,
507        state_sync_update: &mut StateSyncUpdate,
508        current_block_num: BlockNumber,
509    ) -> Result<(), ClientError> {
510        // To receive information about added nullifiers, we reduce them to the higher 16 bits
511        // Note that besides filtering by nullifier prefixes, the node also filters by block number
512        // (it only returns nullifiers from current_block_num until
513        // response.block_header.block_num())
514
515        // Check for new nullifiers for input notes that were updated
516        let nullifiers_tags: Vec<u16> = state_sync_update
517            .note_updates
518            .unspent_nullifiers()
519            .map(|nullifier| nullifier.prefix())
520            .collect();
521
522        let mut new_nullifiers = self
523            .rpc_api
524            .sync_nullifiers(&nullifiers_tags, current_block_num, Some(state_sync_update.block_num))
525            .await?;
526
527        // Discard nullifiers that are newer than the current block (this might happen if the block
528        // changes between the sync_state and the check_nullifier calls)
529        new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num);
530
531        for nullifier_update in new_nullifiers {
532            state_sync_update.note_updates.apply_nullifiers_state_transitions(
533                &nullifier_update,
534                state_sync_update.transaction_updates.committed_transactions(),
535            )?;
536
537            // Process nullifiers and track the updates of local tracked transactions that were
538            // discarded because the notes that they were processing were nullified by an
539            // another transaction.
540            state_sync_update
541                .transaction_updates
542                .apply_input_note_nullified(nullifier_update.nullifier);
543        }
544
545        Ok(())
546    }
547
548    /// Applies the changes received from the sync response to the transactions tracked by the
549    /// client and updates the `transaction_updates` accordingly.
550    ///
551    /// The transaction updates might include:
552    /// * New transactions that were committed in the block.
553    /// * Transactions that were discarded because they were stale or expired.
554    fn transaction_state_sync(
555        &self,
556        transaction_updates: &mut TransactionUpdateTracker,
557        new_block_header: &BlockHeader,
558        transaction_inclusions: &[TransactionInclusion],
559    ) {
560        for transaction_inclusion in transaction_inclusions {
561            transaction_updates.apply_transaction_inclusion(
562                transaction_inclusion,
563                u64::from(new_block_header.timestamp()),
564            ); //TODO: Change timestamps from u64 to u32
565        }
566
567        transaction_updates
568            .apply_sync_height_update(new_block_header.block_num(), self.tx_discard_delta);
569    }
570}
571
572// HELPERS
573// ================================================================================================
574
575/// Derives account commitment updates from transaction records.
576///
577/// For each unique account, takes the `final_state_commitment` from the transaction with the
578/// highest `block_num`. This replicates the old `SyncState` behavior where the node returned
579/// the latest account commitment per account in the synced range.
580fn derive_account_commitment_updates(
581    transaction_records: &[rpc_tx::TransactionRecord],
582) -> Vec<(AccountId, Word)> {
583    let mut latest_by_account: BTreeMap<AccountId, &rpc_tx::TransactionRecord> = BTreeMap::new();
584
585    for record in transaction_records {
586        let account_id = record.transaction_header.account_id();
587        latest_by_account
588            .entry(account_id)
589            .and_modify(|existing| {
590                if record.block_num > existing.block_num {
591                    *existing = record;
592                }
593            })
594            .or_insert(record);
595    }
596
597    latest_by_account
598        .into_iter()
599        .map(|(account_id, record)| {
600            (account_id, record.transaction_header.final_state_commitment())
601        })
602        .collect()
603}
604
605/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
606/// authentication nodes for leaves we track.
607fn apply_mmr_changes(
608    new_block: &BlockHeader,
609    new_block_has_relevant_notes: bool,
610    current_partial_mmr: &mut PartialMmr,
611    mmr_delta: MmrDelta,
612) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
613    // Apply the MMR delta to bring MMR to forest equal to chain tip
614    let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
615        current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
616
617    let new_peaks = current_partial_mmr.peaks();
618
619    new_authentication_nodes
620        .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
621
622    Ok((new_peaks, new_authentication_nodes))
623}
624
#[cfg(test)]
mod tests {
    use alloc::collections::BTreeSet;
    use alloc::sync::Arc;

    use async_trait::async_trait;
    use miden_protocol::crypto::merkle::mmr::{Forest, PartialMmr};

    use super::*;
    use crate::testing::mock::MockRpcApi;

    /// Note screener stub that treats every received note as irrelevant, which keeps the test
    /// setup minimal.
    struct MockScreener;

    #[async_trait(?Send)]
    impl OnNoteReceived for MockScreener {
        async fn on_note_received(
            &self,
            _committed_note: CommittedNote,
            _public_note: Option<InputNoteRecord>,
        ) -> Result<NoteUpdateAction, ClientError> {
            Ok(NoteUpdateAction::Discard)
        }
    }

    /// Builds a sync input that tracks no accounts, tags, notes, or transactions.
    fn empty_sync_input() -> StateSyncInput {
        StateSyncInput {
            accounts: Vec::new(),
            note_tags: BTreeSet::new(),
            input_notes: Vec::new(),
            output_notes: Vec::new(),
            uncommitted_transactions: Vec::new(),
        }
    }

    /// Syncs three times against the same in-memory `PartialMmr` and checks that it follows the
    /// chain tip each time, including a final sync where the chain has not advanced.
    #[tokio::test]
    async fn sync_state_across_multiple_iterations_with_same_mmr() {
        // The MMR is expected to hold one leaf per block (genesis + the synced blocks).
        let expected_leaves = |tip: BlockNumber| tip.as_u32() as usize + 1;

        // Setup: a mock chain that already has a few blocks to sync.
        let rpc = MockRpcApi::default();
        rpc.advance_blocks(3);
        let first_tip = rpc.get_chain_tip_block_num();

        let state_sync = StateSync::new(Arc::new(rpc.clone()), Arc::new(MockScreener), None);

        // The partial MMR starts out knowing only the genesis block (a single leaf).
        let genesis_peaks = rpc.get_mmr().peaks_at(Forest::new(1)).unwrap();
        let mut partial_mmr = PartialMmr::from_peaks(genesis_peaks);
        assert_eq!(partial_mmr.forest().num_leaves(), 1);

        // First sync: catches up with the initial blocks.
        let first_update =
            state_sync.sync_state(&mut partial_mmr, empty_sync_input()).await.unwrap();
        let first_forest = partial_mmr.forest();

        assert_eq!(first_update.block_num, first_tip);
        assert_eq!(first_forest.num_leaves(), expected_leaves(first_tip));

        // Second sync: the chain advanced, and the same MMR instance is reused.
        rpc.advance_blocks(2);
        let second_tip = rpc.get_chain_tip_block_num();

        let second_update =
            state_sync.sync_state(&mut partial_mmr, empty_sync_input()).await.unwrap();
        let second_forest = partial_mmr.forest();

        assert_eq!(second_update.block_num, second_tip);
        assert!(second_forest > first_forest);
        assert_eq!(second_forest.num_leaves(), expected_leaves(second_tip));

        // Third sync: no new blocks, so nothing should change.
        let third_update =
            state_sync.sync_state(&mut partial_mmr, empty_sync_input()).await.unwrap();

        assert_eq!(third_update.block_num, second_tip);
        assert_eq!(partial_mmr.forest(), second_forest);
    }
}