miden_client/sync/
state_sync.rs

1use alloc::{boxed::Box, collections::BTreeMap, sync::Arc, vec::Vec};
2use core::{future::Future, pin::Pin};
3
4use miden_objects::{
5    Digest,
6    account::{Account, AccountHeader, AccountId},
7    block::{BlockHeader, BlockNumber},
8    crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr},
9    note::{NoteId, NoteTag},
10    transaction::PartialBlockchain,
11};
12use tracing::info;
13
14use super::{
15    AccountUpdates, BlockUpdates, StateSyncUpdate, state_sync_update::TransactionUpdateTracker,
16};
17use crate::{
18    ClientError,
19    note::{NoteScreener, NoteUpdateTracker},
20    rpc::{
21        NodeRpcClient,
22        domain::{note::CommittedNote, transaction::TransactionInclusion},
23    },
24    store::{InputNoteRecord, NoteFilter, OutputNoteRecord, Store, StoreError},
25    transaction::TransactionRecord,
26};
27
28// SYNC CALLBACKS
29// ================================================================================================
30
31/// Callback that gets executed when a new note is received as part of the sync response.
32///
33/// It receives the committed note received from the network and an optional note record that
34/// corresponds to the state of the note in the network (only if the note is public).
35///
36/// It returns a boolean indicating if the received note update is relevant. If the return value
37/// is `false`, it gets discarded. If it is `true`, the update gets committed to the client's store.
38pub type OnNoteReceived<'a> = Box<
39    dyn Fn(
40        CommittedNote,
41        Option<InputNoteRecord>,
42        Arc<NoteScreener<'a>>,
43    ) -> Pin<Box<(dyn Future<Output = Result<bool, ClientError>> + 'a)>>,
44>;
45
46// STATE SYNC
47// ================================================================================================
48
/// The state sync component encompasses the client's sync logic. It is used to request updates
/// from the node and apply them to the relevant elements. The updates are then returned and can
/// be applied to the store to persist the changes.
///
/// When created it receives a callback that will be executed when a new note inclusion is received
/// in the sync response.
pub struct StateSync<'a> {
    /// The RPC client used to communicate with the node.
    rpc_api: Arc<dyn NodeRpcClient + Send>,
    /// Callback to be executed when a new note inclusion is received.
    on_note_received: OnNoteReceived<'a>,
    /// Optional block count handed to `TransactionUpdateTracker::apply_sync_height_update` on
    /// every sync step.
    ///
    /// NOTE(review): presumably the number of blocks a pending transaction is tolerated before
    /// being discarded — confirm against `TransactionUpdateTracker`.
    tx_graceful_blocks: Option<u32>,
    /// Shared note screener that is passed to `on_note_received` on each invocation.
    note_screener: Arc<NoteScreener<'a>>,
}
63
64impl<'a> StateSync<'a> {
65    /// Creates a new instance of the state sync component.
66    ///
67    /// # Arguments
68    ///
69    /// * `rpc_api` - The RPC client used to communicate with the node.
70    /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
71    pub fn new(
72        rpc_api: Arc<dyn NodeRpcClient + Send>,
73        on_note_received: OnNoteReceived<'a>,
74        tx_graceful_blocks: Option<u32>,
75        note_screener: NoteScreener<'a>,
76    ) -> Self {
77        Self {
78            rpc_api,
79            on_note_received,
80            tx_graceful_blocks,
81            #[allow(clippy::arc_with_non_send_sync)]
82            note_screener: Arc::new(note_screener),
83        }
84    }
85
86    /// Syncs the state of the client with the chain tip of the node, returning the updates that
87    /// should be applied to the store.
88    ///
89    /// During the sync process, the client will go through the following steps:
90    /// 1. A request is sent to the node to get the state updates. This request includes tracked
91    ///    account IDs and the tags of notes that might have changed or that might be of interest to
92    ///    the client.
93    /// 2. A response is received with the current state of the network. The response includes
94    ///    information about new and committed notes, updated accounts, and committed transactions.
95    /// 3. Tracked public accounts are updated and private accounts are validated against the node
96    ///    state.
97    /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
98    ///    during the sync processing.
99    /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
100    ///    [`OnNoteReceived`] callback.
101    /// 6. Transactions are updated with their new states. Transactions might be committed or
102    ///    discarded.
103    /// 7. The MMR is updated with the new peaks and authentication nodes.
104    ///
105    /// # Arguments
106    /// * `current_partial_blockchain` - The current partial view of the blockchain.
107    /// * `accounts` - All the headers of tracked accounts.
108    /// * `note_tags` - The note tags to be used in the sync state request.
109    /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
110    /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
111    pub async fn sync_state(
112        self,
113        current_partial_blockchain: PartialBlockchain,
114        accounts: Vec<AccountHeader>,
115        note_tags: Vec<NoteTag>,
116        unspent_input_notes: Vec<InputNoteRecord>,
117        unspent_output_notes: Vec<OutputNoteRecord>,
118        uncommitted_transactions: Vec<TransactionRecord>,
119    ) -> Result<StateSyncUpdate, ClientError> {
120        let block_num =
121            current_partial_blockchain.chain_length().checked_sub(1).unwrap_or_default();
122
123        let mut state_sync_update = StateSyncUpdate {
124            block_num,
125            note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
126            transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
127            ..Default::default()
128        };
129
130        let mut partial_mmr = current_partial_blockchain.mmr().clone();
131
132        loop {
133            if !self
134                .sync_state_step(&mut state_sync_update, &mut partial_mmr, &accounts, &note_tags)
135                .await?
136            {
137                break;
138            }
139        }
140
141        self.sync_nullifiers(&mut state_sync_update, block_num).await?;
142
143        Ok(state_sync_update)
144    }
145
146    /// Executes a single step of the state sync process, returning `true` if the client should
147    /// continue syncing and `false` if the client has reached the chain tip.
148    ///
149    /// A step in this context means a single request to the node to get the next relevant block and
150    /// the changes that happened in it. This block may not be the last one in the chain and
151    /// the client may need to call this method multiple times until it reaches the chain tip.
152    ///
153    /// The `sync_state_update` field of the struct will be updated with the new changes from this
154    /// step.
155    async fn sync_state_step(
156        &self,
157        state_sync_update: &mut StateSyncUpdate,
158        current_partial_mmr: &mut PartialMmr,
159        accounts: &[AccountHeader],
160        note_tags: &[NoteTag],
161    ) -> Result<bool, ClientError> {
162        let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
163
164        let response = self
165            .rpc_api
166            .sync_state(state_sync_update.block_num, &account_ids, note_tags)
167            .await?;
168
169        // We don't need to continue if the chain has not advanced, there are no new changes
170        if response.block_header.block_num() == state_sync_update.block_num {
171            return Ok(false);
172        }
173
174        let new_block_num = response.block_header.block_num();
175        state_sync_update.block_num = new_block_num;
176
177        self.account_state_sync(
178            &mut state_sync_update.account_updates,
179            accounts,
180            &response.account_commitment_updates,
181        )
182        .await?;
183
184        self.transaction_state_sync(
185            &mut state_sync_update.transaction_updates,
186            new_block_num,
187            &response.transactions,
188        );
189
190        let found_relevant_note = self
191            .note_state_sync(
192                &mut state_sync_update.note_updates,
193                response.note_inclusions,
194                &response.block_header,
195            )
196            .await?;
197
198        let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
199            &response.block_header,
200            found_relevant_note,
201            current_partial_mmr,
202            response.mmr_delta,
203        )?;
204
205        let mut new_blocks = vec![];
206        if found_relevant_note || response.chain_tip == new_block_num {
207            // Only track relevant blocks or the chain tip
208            new_blocks.push((response.block_header, found_relevant_note, new_mmr_peaks));
209        }
210
211        state_sync_update
212            .block_updates
213            .extend(BlockUpdates::new(new_blocks, new_authentication_nodes));
214
215        if response.chain_tip == new_block_num {
216            Ok(false)
217        } else {
218            Ok(true)
219        }
220    }
221
222    // HELPERS
223    // --------------------------------------------------------------------------------------------
224
225    /// Compares the state of tracked accounts with the updates received from the node. The method
226    /// updates the `state_sync_update` field with the details of the accounts that need to be
227    /// updated.
228    ///
229    /// The account updates might include:
230    /// * Public accounts that have been updated in the node.
231    /// * Private accounts that have been marked as mismatched because the current commitment
232    ///   doesn't match the one received from the node. The client will need to handle these cases
233    ///   as they could be a stale account state or a reason to lock the account.
234    async fn account_state_sync(
235        &self,
236        account_updates: &mut AccountUpdates,
237        accounts: &[AccountHeader],
238        account_commitment_updates: &[(AccountId, Digest)],
239    ) -> Result<(), ClientError> {
240        let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
241            accounts.iter().partition(|account_header| account_header.id().is_public());
242
243        let updated_public_accounts = self
244            .get_updated_public_accounts(account_commitment_updates, &public_accounts)
245            .await?;
246
247        let mismatched_private_accounts = account_commitment_updates
248            .iter()
249            .filter(|(account_id, digest)| {
250                private_accounts
251                    .iter()
252                    .any(|account| account.id() == *account_id && &account.commitment() != digest)
253            })
254            .copied()
255            .collect::<Vec<_>>();
256
257        account_updates
258            .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
259
260        Ok(())
261    }
262
263    /// Queries the node for the latest state of the public accounts that don't match the current
264    /// state of the client.
265    async fn get_updated_public_accounts(
266        &self,
267        account_updates: &[(AccountId, Digest)],
268        current_public_accounts: &[&AccountHeader],
269    ) -> Result<Vec<Account>, ClientError> {
270        let mut mismatched_public_accounts = vec![];
271
272        for (id, commitment) in account_updates {
273            // check if this updated account state is tracked by the client
274            if let Some(account) = current_public_accounts
275                .iter()
276                .find(|acc| *id == acc.id() && *commitment != acc.commitment())
277            {
278                mismatched_public_accounts.push(*account);
279            }
280        }
281
282        self.rpc_api
283            .get_updated_public_accounts(&mismatched_public_accounts)
284            .await
285            .map_err(ClientError::RpcError)
286    }
287
288    /// Applies the changes received from the sync response to the notes and transactions tracked
289    /// by the client and updates the `note_updates` accordingly.
290    ///
291    /// This method uses the callbacks provided to the [`StateSync`] component to check if the
292    /// updates received are relevant to the client.
293    ///
294    /// The note updates might include:
295    /// * New notes that we received from the node and might be relevant to the client.
296    /// * Tracked expected notes that were committed in the block.
297    /// * Tracked notes that were being processed by a transaction that got committed.
298    /// * Tracked notes that were nullified by an external transaction.
299    async fn note_state_sync(
300        &self,
301        note_updates: &mut NoteUpdateTracker,
302        note_inclusions: Vec<CommittedNote>,
303        block_header: &BlockHeader,
304    ) -> Result<bool, ClientError> {
305        let public_note_ids: Vec<NoteId> = note_inclusions
306            .iter()
307            .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id()))
308            .collect();
309
310        let mut found_relevant_note = false;
311
312        // Process note inclusions
313        let new_public_notes = self.fetch_public_note_details(&public_note_ids).await?;
314        for committed_note in note_inclusions {
315            let public_note = new_public_notes.get(committed_note.note_id()).cloned();
316
317            if (self.on_note_received)(
318                committed_note.clone(),
319                public_note.clone(),
320                self.note_screener.clone(),
321            )
322            .await?
323            {
324                found_relevant_note = true;
325
326                note_updates.apply_committed_note_state_transitions(
327                    &committed_note,
328                    public_note,
329                    block_header,
330                )?;
331            }
332        }
333
334        Ok(found_relevant_note)
335    }
336
337    /// Queries the node for all received notes that aren't being locally tracked in the client.
338    ///
339    /// The client can receive metadata for private notes that it's not tracking. In this case,
340    /// notes are ignored for now as they become useless until details are imported.
341    async fn fetch_public_note_details(
342        &self,
343        query_notes: &[NoteId],
344    ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
345        if query_notes.is_empty() {
346            return Ok(BTreeMap::new());
347        }
348        info!("Getting note details for notes that are not being tracked.");
349
350        let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
351
352        Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
353    }
354
355    /// Collects the nullifier tags for the notes that were updated in the sync response and uses
356    /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these
357    /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
358    ///
359    /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
360    async fn sync_nullifiers(
361        &self,
362        state_sync_update: &mut StateSyncUpdate,
363        current_block_num: BlockNumber,
364    ) -> Result<(), ClientError> {
365        // To receive information about added nullifiers, we reduce them to the higher 16 bits
366        // Note that besides filtering by nullifier prefixes, the node also filters by block number
367        // (it only returns nullifiers from current_block_num until
368        // response.block_header.block_num())
369
370        // Check for new nullifiers for input notes that were updated
371        let nullifiers_tags: Vec<u16> = state_sync_update
372            .note_updates
373            .unspent_nullifiers()
374            .map(|nullifier| nullifier.prefix())
375            .collect();
376
377        let mut new_nullifiers = self
378            .rpc_api
379            .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
380            .await?;
381
382        // Discard nullifiers that are newer than the current block (this might happen if the block
383        // changes between the sync_state and the check_nullifier calls)
384        new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num.as_u32());
385
386        for nullifier_update in new_nullifiers {
387            state_sync_update.note_updates.apply_nullifiers_state_transitions(
388                &nullifier_update,
389                state_sync_update.transaction_updates.committed_transactions(),
390            )?;
391
392            // Process nullifiers and track the updates of local tracked transactions that were
393            // discarded because the notes that they were processing were nullified by an
394            // another transaction.
395            state_sync_update
396                .transaction_updates
397                .apply_input_note_nullified(nullifier_update.nullifier);
398        }
399
400        Ok(())
401    }
402
403    /// Applies the changes received from the sync response to the transactions tracked by the
404    /// client and updates the `transaction_updates` accordingly.
405    ///
406    /// The transaction updates might include:
407    /// * New transactions that were committed in the block.
408    /// * Transactions that were discarded because they were stale or expired.
409    fn transaction_state_sync(
410        &self,
411        transaction_updates: &mut TransactionUpdateTracker,
412        new_sync_height: BlockNumber,
413        transaction_inclusions: &[TransactionInclusion],
414    ) {
415        for transaction_inclusion in transaction_inclusions {
416            transaction_updates.apply_transaction_inclusion(transaction_inclusion);
417        }
418
419        transaction_updates.apply_sync_height_update(new_sync_height, self.tx_graceful_blocks);
420    }
421}
422
423// HELPERS
424// ================================================================================================
425
426/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
427/// authentication nodes for leaves we track.
428fn apply_mmr_changes(
429    new_block: &BlockHeader,
430    new_block_has_relevant_notes: bool,
431    current_partial_mmr: &mut PartialMmr,
432    mmr_delta: MmrDelta,
433) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), ClientError> {
434    // Apply the MMR delta to bring MMR to forest equal to chain tip
435    let mut new_authentication_nodes: Vec<(InOrderIndex, Digest)> =
436        current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
437
438    let new_peaks = current_partial_mmr.peaks();
439
440    new_authentication_nodes
441        .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
442
443    Ok((new_peaks, new_authentication_nodes))
444}
445
446// DEFAULT CALLBACK IMPLEMENTATIONS
447// ================================================================================================
448
449/// Default implementation of the [`OnNoteReceived`] callback. It queries the store for the
450/// committed note to check if it's relevant. If the note wasn't being tracked but it came in the
451/// sync response it may be a new public note, in that case we use the [`NoteScreener`] to check its
452/// relevance.
453pub async fn on_note_received(
454    store: Arc<dyn Store>,
455    committed_note: CommittedNote,
456    public_note: Option<InputNoteRecord>,
457    note_screener: Arc<NoteScreener<'_>>,
458) -> Result<bool, ClientError> {
459    let note_id = *committed_note.note_id();
460
461    if !store.get_input_notes(NoteFilter::Unique(note_id)).await?.is_empty()
462        || !store.get_output_notes(NoteFilter::Unique(note_id)).await?.is_empty()
463    {
464        // The note is being tracked by the client so it is relevant
465        Ok(true)
466    } else if let Some(public_note) = public_note {
467        // The note is not being tracked by the client and is public so we can screen it
468        let new_note_relevance = note_screener
469            .check_relevance(
470                &public_note.try_into().map_err(ClientError::NoteRecordConversionError)?,
471            )
472            .await?;
473
474        Ok(!new_note_relevance.is_empty())
475    } else {
476        // The note is not being tracked by the client and is private so we can't determine if it
477        // is relevant
478        Ok(false)
479    }
480}