Skip to main content

miden_client/sync/
state_sync.rs

1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use async_trait::async_trait;
7use miden_protocol::Word;
8use miden_protocol::account::{Account, AccountHeader, AccountId};
9use miden_protocol::block::{BlockHeader, BlockNumber};
10use miden_protocol::crypto::merkle::mmr::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
11use miden_protocol::note::{NoteId, NoteTag};
12use tracing::info;
13
14use super::state_sync_update::TransactionUpdateTracker;
15use super::{AccountUpdates, StateSyncUpdate};
16use crate::ClientError;
17use crate::note::NoteUpdateTracker;
18use crate::rpc::NodeRpcClient;
19use crate::rpc::domain::note::CommittedNote;
20use crate::rpc::domain::sync::StateSyncInfo;
21use crate::rpc::domain::transaction::TransactionInclusion;
22use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
23use crate::transaction::TransactionRecord;
24
25// SYNC CALLBACKS
26// ================================================================================================
27
28/// The action to be taken when a note update is received as part of the sync response.
29#[allow(clippy::large_enum_variant)]
30pub enum NoteUpdateAction {
31    /// The note commit update is relevant and the specified note should be marked as committed in
32    /// the store, storing its inclusion proof.
33    Commit(CommittedNote),
34    /// The public note is relevant and should be inserted into the store.
35    Insert(InputNoteRecord),
36    /// The note update is not relevant and should be discarded.
37    Discard,
38}
39
40#[async_trait(?Send)]
41pub trait OnNoteReceived {
42    /// Callback that gets executed when a new note is received as part of the sync response.
43    ///
44    /// It receives:
45    ///
46    /// - The committed note received from the network.
47    /// - An optional note record that corresponds to the state of the note in the network (only if
48    ///   the note is public).
49    ///
50    /// It returns an enum indicating the action to be taken for the received note update. Whether
51    /// the note updated should be committed, new public note inserted, or ignored.
52    async fn on_note_received(
53        &self,
54        committed_note: CommittedNote,
55        public_note: Option<InputNoteRecord>,
56    ) -> Result<NoteUpdateAction, ClientError>;
57}
58
59// STATE SYNC
60// ================================================================================================
61
62/// The state sync components encompasses the client's sync logic. It is then used to request
63/// updates from the node and apply them to the relevant elements. The updates are then returned and
64/// can be applied to the store to persist the changes.
65///
66/// When created it receives a callback that will be executed when a new note inclusion is received
67/// in the sync response.
68#[derive(Clone)]
69pub struct StateSync {
70    /// The RPC client used to communicate with the node.
71    rpc_api: Arc<dyn NodeRpcClient>,
72    /// Responsible for checking the relevance of notes and executing the
73    /// [`OnNoteReceived`] callback when a new note inclusion is received.
74    note_screener: Arc<dyn OnNoteReceived>,
75    /// The number of blocks that are considered old enough to discard pending transactions. If
76    /// `None`, there is no limit and transactions will be kept indefinitely.
77    tx_graceful_blocks: Option<u32>,
78}
79
80impl StateSync {
81    /// Creates a new instance of the state sync component.
82    ///
83    /// # Arguments
84    ///
85    /// * `rpc_api` - The RPC client used to communicate with the node.
86    /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
87    /// * `tx_graceful_blocks` - The number of blocks that are considered old enough to discard.
88    /// * `note_screener` - The note screener used to check the relevance of notes.
89    pub fn new(
90        rpc_api: Arc<dyn NodeRpcClient>,
91        note_screener: Arc<dyn OnNoteReceived>,
92        tx_graceful_blocks: Option<u32>,
93    ) -> Self {
94        Self {
95            rpc_api,
96            note_screener,
97            tx_graceful_blocks,
98        }
99    }
100
101    /// Syncs the state of the client with the chain tip of the node, returning the updates that
102    /// should be applied to the store.
103    ///
104    /// During the sync process, the client will go through the following steps:
105    /// 1. A request is sent to the node to get the state updates. This request includes tracked
106    ///    account IDs and the tags of notes that might have changed or that might be of interest to
107    ///    the client.
108    /// 2. A response is received with the current state of the network. The response includes
109    ///    information about new and committed notes, updated accounts, and committed transactions.
110    /// 3. Tracked public accounts are updated and private accounts are validated against the node
111    ///    state.
112    /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
113    ///    during the sync processing.
114    /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
115    ///    [`OnNoteReceived`] callback.
116    /// 6. Transactions are updated with their new states. Transactions might be committed or
117    ///    discarded.
118    /// 7. The MMR is updated with the new peaks and authentication nodes.
119    ///
120    /// # Arguments
121    /// * `current_partial_blockchain` - The current partial view of the blockchain.
122    /// * `accounts` - All the headers of tracked accounts.
123    /// * `note_tags` - The note tags to be used in the sync state request.
124    /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
125    /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
126    pub async fn sync_state(
127        &self,
128        mut current_partial_mmr: PartialMmr,
129        accounts: Vec<AccountHeader>,
130        note_tags: BTreeSet<NoteTag>,
131        unspent_input_notes: Vec<InputNoteRecord>,
132        unspent_output_notes: Vec<OutputNoteRecord>,
133        uncommitted_transactions: Vec<TransactionRecord>,
134    ) -> Result<StateSyncUpdate, ClientError> {
135        let block_num = u32::try_from(
136            current_partial_mmr.forest().num_leaves().checked_sub(1).unwrap_or_default(),
137        )
138        .map_err(|_| ClientError::InvalidPartialMmrForest)?
139        .into();
140
141        let mut state_sync_update = StateSyncUpdate {
142            block_num,
143            note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
144            transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
145            ..Default::default()
146        };
147
148        let note_tags = Arc::new(note_tags);
149        let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
150        let mut state_sync_steps = Vec::new();
151
152        while let Some(step) = self
153            .sync_state_step(state_sync_update.block_num, &account_ids, &note_tags)
154            .await?
155        {
156            let sync_block_num = step.block_header.block_num();
157
158            let reached_tip = step.chain_tip == sync_block_num;
159
160            state_sync_update.block_num = sync_block_num;
161            state_sync_steps.push(step);
162
163            if reached_tip {
164                break;
165            }
166        }
167
168        // TODO: fetch_public_note_details should take an iterator or btreeset down to the RPC call
169        // (this would be a breaking change so it should be done separately)
170        let public_note_ids: Vec<NoteId> = state_sync_steps
171            .iter()
172            .flat_map(|s| s.note_inclusions.iter())
173            .filter(|n| !n.metadata().is_private())
174            .map(|n| *n.note_id())
175            .collect();
176
177        let public_note_records = self.fetch_public_note_details(&public_note_ids).await?;
178
179        // Collect account commitment updates across all sync steps. Each account only needs
180        // to be checked once since GetAccount always returns the latest state.
181        let merged_commitment_updates: Vec<(AccountId, Word)> = state_sync_steps
182            .iter()
183            .flat_map(|s| s.account_commitment_updates.iter())
184            .map(|(id, w)| (*id, *w))
185            .collect::<BTreeMap<_, _>>()
186            .into_iter()
187            .collect();
188
189        self.account_state_sync(
190            &mut state_sync_update.account_updates,
191            &accounts,
192            &merged_commitment_updates,
193        )
194        .await?;
195
196        // Apply local changes. These involve updating the MMR and applying state transitions
197        // to notes based on the received information.
198        info!("Applying state transitions locally.");
199
200        for sync_step in state_sync_steps {
201            let StateSyncInfo {
202                chain_tip,
203                block_header,
204                mmr_delta,
205                note_inclusions,
206                transactions,
207                ..
208            } = sync_step;
209
210            self.transaction_state_sync(
211                &mut state_sync_update.transaction_updates,
212                &block_header,
213                &transactions,
214            );
215
216            let found_relevant_note = self
217                .note_state_sync(
218                    &mut state_sync_update.note_updates,
219                    note_inclusions,
220                    &block_header,
221                    &public_note_records,
222                )
223                .await?;
224
225            let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
226                &block_header,
227                found_relevant_note,
228                &mut current_partial_mmr,
229                mmr_delta,
230            )?;
231
232            let include_block = found_relevant_note || chain_tip == block_header.block_num();
233            if include_block {
234                state_sync_update.block_updates.insert(
235                    block_header,
236                    found_relevant_note,
237                    new_mmr_peaks,
238                    new_authentication_nodes,
239                );
240            } else {
241                // Even though this block header is not stored, `apply_mmr_changes` may
242                // produce authentication nodes for already-tracked leaves whose Merkle
243                // paths change as the MMR grows. These must be persisted so that the
244                // `PartialMmr` can be correctly reconstructed from the store after a
245                // client restart.
246                state_sync_update
247                    .block_updates
248                    .extend_authentication_nodes(new_authentication_nodes);
249            }
250        }
251        info!("Syncing nullifiers.");
252
253        self.sync_nullifiers(&mut state_sync_update, block_num).await?;
254
255        Ok(state_sync_update)
256    }
257
258    /// Executes a single sync request and returns the node response if the chain advanced.
259    ///
260    /// This method issues a `/SyncState` call starting from `current_block_num`. If the node
261    /// reports the same block number, `None` is returned, signalling that the client is already at
262    /// the requested height. Otherwise the full [`StateSyncInfo`] is returned for deferred
263    /// processing by the caller.
264    async fn sync_state_step(
265        &self,
266        current_block_num: BlockNumber,
267        account_ids: &[AccountId],
268        note_tags: &Arc<BTreeSet<NoteTag>>,
269    ) -> Result<Option<StateSyncInfo>, ClientError> {
270        info!("Performing sync state step.");
271        let response = self
272            .rpc_api
273            .sync_state(current_block_num, account_ids, note_tags.as_ref())
274            .await?;
275
276        // We don't need to continue if the chain has not advanced, there are no new changes
277        if response.block_header.block_num() == current_block_num {
278            return Ok(None);
279        }
280
281        Ok(Some(response))
282    }
283
284    // HELPERS
285    // --------------------------------------------------------------------------------------------
286
287    /// Compares the state of tracked accounts with the updates received from the node. The method
288    /// updates the `state_sync_update` field with the details of the accounts that need to be
289    /// updated.
290    ///
291    /// The account updates might include:
292    /// * Public accounts that have been updated in the node.
293    /// * Network accounts that have been updated in the node and are being tracked by the client.
294    /// * Private accounts that have been marked as mismatched because the current commitment
295    ///   doesn't match the one received from the node. The client will need to handle these cases
296    ///   as they could be a stale account state or a reason to lock the account.
297    async fn account_state_sync(
298        &self,
299        account_updates: &mut AccountUpdates,
300        accounts: &[AccountHeader],
301        account_commitment_updates: &[(AccountId, Word)],
302    ) -> Result<(), ClientError> {
303        let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
304            accounts.iter().partition(|account_header| !account_header.id().is_private());
305
306        let updated_public_accounts = self
307            .get_updated_public_accounts(account_commitment_updates, &public_accounts)
308            .await?;
309
310        let mismatched_private_accounts = account_commitment_updates
311            .iter()
312            .filter(|(account_id, digest)| {
313                private_accounts
314                    .iter()
315                    .any(|account| account.id() == *account_id && &account.commitment() != digest)
316            })
317            .copied()
318            .collect::<Vec<_>>();
319
320        account_updates
321            .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
322
323        Ok(())
324    }
325
326    /// Queries the node for the latest state of the public accounts that don't match the current
327    /// state of the client.
328    async fn get_updated_public_accounts(
329        &self,
330        account_updates: &[(AccountId, Word)],
331        current_public_accounts: &[&AccountHeader],
332    ) -> Result<Vec<Account>, ClientError> {
333        let mut mismatched_public_accounts = vec![];
334
335        for (id, commitment) in account_updates {
336            // check if this updated account state is tracked by the client
337            if let Some(account) = current_public_accounts
338                .iter()
339                .find(|acc| *id == acc.id() && *commitment != acc.commitment())
340            {
341                mismatched_public_accounts.push(*account);
342            }
343        }
344
345        self.rpc_api
346            .get_updated_public_accounts(&mismatched_public_accounts)
347            .await
348            .map_err(ClientError::RpcError)
349    }
350
351    /// Applies the changes received from the sync response to the notes and transactions tracked
352    /// by the client and updates the `note_updates` accordingly.
353    ///
354    /// This method uses the callbacks provided to the [`StateSync`] component to check if the
355    /// updates received are relevant to the client.
356    ///
357    /// The note updates might include:
358    /// * New notes that we received from the node and might be relevant to the client.
359    /// * Tracked expected notes that were committed in the block.
360    /// * Tracked notes that were being processed by a transaction that got committed.
361    /// * Tracked notes that were nullified by an external transaction.
362    ///
363    /// The `public_notes` parameter provides cached public note details for the current sync
364    /// iteration so the node is only queried once per batch.
365    async fn note_state_sync(
366        &self,
367        note_updates: &mut NoteUpdateTracker,
368        note_inclusions: Vec<CommittedNote>,
369        block_header: &BlockHeader,
370        public_notes: &BTreeMap<NoteId, InputNoteRecord>,
371    ) -> Result<bool, ClientError> {
372        // `found_relevant_note` tracks whether we want to persist the block header in the end
373        let mut found_relevant_note = false;
374
375        for committed_note in note_inclusions {
376            let public_note = (!committed_note.metadata().is_private())
377                .then(|| public_notes.get(committed_note.note_id()))
378                .flatten()
379                .cloned();
380
381            match self.note_screener.on_note_received(committed_note, public_note).await? {
382                NoteUpdateAction::Commit(committed_note) => {
383                    // Only mark the downloaded block header as relevant if we are talking about
384                    // an input note (output notes get marked as committed but we don't need the
385                    // block for anything there)
386                    found_relevant_note |= note_updates
387                        .apply_committed_note_state_transitions(&committed_note, block_header)?;
388                },
389                NoteUpdateAction::Insert(public_note) => {
390                    found_relevant_note = true;
391
392                    note_updates.apply_new_public_note(public_note, block_header)?;
393                },
394                NoteUpdateAction::Discard => {},
395            }
396        }
397
398        Ok(found_relevant_note)
399    }
400
401    /// Queries the node for all received notes that aren't being locally tracked in the client.
402    ///
403    /// The client can receive metadata for private notes that it's not tracking. In this case,
404    /// notes are ignored for now as they become useless until details are imported.
405    async fn fetch_public_note_details(
406        &self,
407        query_notes: &[NoteId],
408    ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
409        if query_notes.is_empty() {
410            return Ok(BTreeMap::new());
411        }
412
413        info!("Getting note details for notes that are not being tracked.");
414
415        let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
416
417        Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
418    }
419
420    /// Collects the nullifier tags for the notes that were updated in the sync response and uses
421    /// the `sync_nullifiers` endpoint to check if there are new nullifiers for these
422    /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
423    ///
424    /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
425    async fn sync_nullifiers(
426        &self,
427        state_sync_update: &mut StateSyncUpdate,
428        current_block_num: BlockNumber,
429    ) -> Result<(), ClientError> {
430        // To receive information about added nullifiers, we reduce them to the higher 16 bits
431        // Note that besides filtering by nullifier prefixes, the node also filters by block number
432        // (it only returns nullifiers from current_block_num until
433        // response.block_header.block_num())
434
435        // Check for new nullifiers for input notes that were updated
436        let nullifiers_tags: Vec<u16> = state_sync_update
437            .note_updates
438            .unspent_nullifiers()
439            .map(|nullifier| nullifier.prefix())
440            .collect();
441
442        let mut new_nullifiers = self
443            .rpc_api
444            .sync_nullifiers(&nullifiers_tags, current_block_num, Some(state_sync_update.block_num))
445            .await?;
446
447        // Discard nullifiers that are newer than the current block (this might happen if the block
448        // changes between the sync_state and the check_nullifier calls)
449        new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num);
450
451        for nullifier_update in new_nullifiers {
452            state_sync_update.note_updates.apply_nullifiers_state_transitions(
453                &nullifier_update,
454                state_sync_update.transaction_updates.committed_transactions(),
455            )?;
456
457            // Process nullifiers and track the updates of local tracked transactions that were
458            // discarded because the notes that they were processing were nullified by an
459            // another transaction.
460            state_sync_update
461                .transaction_updates
462                .apply_input_note_nullified(nullifier_update.nullifier);
463        }
464
465        Ok(())
466    }
467
468    /// Applies the changes received from the sync response to the transactions tracked by the
469    /// client and updates the `transaction_updates` accordingly.
470    ///
471    /// The transaction updates might include:
472    /// * New transactions that were committed in the block.
473    /// * Transactions that were discarded because they were stale or expired.
474    fn transaction_state_sync(
475        &self,
476        transaction_updates: &mut TransactionUpdateTracker,
477        new_block_header: &BlockHeader,
478        transaction_inclusions: &[TransactionInclusion],
479    ) {
480        for transaction_inclusion in transaction_inclusions {
481            transaction_updates.apply_transaction_inclusion(
482                transaction_inclusion,
483                u64::from(new_block_header.timestamp()),
484            ); //TODO: Change timestamps from u64 to u32
485        }
486
487        transaction_updates
488            .apply_sync_height_update(new_block_header.block_num(), self.tx_graceful_blocks);
489    }
490}
491
492// HELPERS
493// ================================================================================================
494
495/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
496/// authentication nodes for leaves we track.
497fn apply_mmr_changes(
498    new_block: &BlockHeader,
499    new_block_has_relevant_notes: bool,
500    current_partial_mmr: &mut PartialMmr,
501    mmr_delta: MmrDelta,
502) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
503    // Apply the MMR delta to bring MMR to forest equal to chain tip
504    let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
505        current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
506
507    let new_peaks = current_partial_mmr.peaks();
508
509    new_authentication_nodes
510        .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
511
512    Ok((new_peaks, new_authentication_nodes))
513}