miden_client/sync/
state_sync.rs

1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use miden_objects::Word;
7use miden_objects::account::{Account, AccountHeader, AccountId};
8use miden_objects::block::{BlockHeader, BlockNumber};
9use miden_objects::crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
10use miden_objects::note::{NoteId, NoteTag};
11use miden_objects::transaction::PartialBlockchain;
12use tonic::async_trait;
13use tracing::info;
14
15use super::state_sync_update::TransactionUpdateTracker;
16use super::{AccountUpdates, BlockUpdates, StateSyncUpdate};
17use crate::ClientError;
18use crate::note::NoteUpdateTracker;
19use crate::rpc::NodeRpcClient;
20use crate::rpc::domain::note::CommittedNote;
21use crate::rpc::domain::transaction::TransactionInclusion;
22use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
23use crate::transaction::TransactionRecord;
24
25// SYNC CALLBACKS
26// ================================================================================================
27
28/// The action to be taken when a note update is received as part of the sync response.
29#[allow(clippy::large_enum_variant)]
30pub enum NoteUpdateAction {
31    /// The note commit update is relevant and the specified note should be marked as committed in
32    /// the store, storing its inclusion proof.
33    Commit(CommittedNote),
34    /// The public note is relevant and should be inserted into the store.
35    Insert(InputNoteRecord),
36    /// The note update is not relevant and should be discarded.
37    Discard,
38}
39
40#[async_trait(?Send)]
41pub trait OnNoteReceived {
42    /// Callback that gets executed when a new note is received as part of the sync response.
43    ///
44    /// It receives:
45    ///
46    /// - The committed note received from the network.
47    /// - An optional note record that corresponds to the state of the note in the network (only if
48    ///   the note is public).
49    ///
50    /// It returns an enum indicating the action to be taken for the received note update. Whether
51    /// the note updated should be committed, new public note inserted, or ignored.
52    async fn on_note_received(
53        &self,
54        committed_note: CommittedNote,
55        public_note: Option<InputNoteRecord>,
56    ) -> Result<NoteUpdateAction, ClientError>;
57}
58
59// STATE SYNC
60// ================================================================================================
61
62/// The state sync components encompasses the client's sync logic. It is then used to request
63/// updates from the node and apply them to the relevant elements. The updates are then returned and
64/// can be applied to the store to persist the changes.
65///
66/// When created it receives a callback that will be executed when a new note inclusion is received
67/// in the sync response.
68pub struct StateSync {
69    /// The RPC client used to communicate with the node.
70    rpc_api: Arc<dyn NodeRpcClient + Send>,
71    /// Responsible for checking the relevance of notes and executing the
72    /// [`OnNoteReceived`] callback when a new note inclusion is received.
73    note_screener: Arc<dyn OnNoteReceived>,
74    /// The number of blocks that are considered old enough to discard pending transactions. If
75    /// `None`, there is no limit and transactions will be kept indefinitely.
76    tx_graceful_blocks: Option<u32>,
77}
78
79impl StateSync {
80    /// Creates a new instance of the state sync component.
81    ///
82    /// # Arguments
83    ///
84    /// * `rpc_api` - The RPC client used to communicate with the node.
85    /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
86    /// * `tx_graceful_blocks` - The number of blocks that are considered old enough to discard.
87    /// * `note_screener` - The note screener used to check the relevance of notes.
88    pub fn new(
89        rpc_api: Arc<dyn NodeRpcClient + Send>,
90        note_screener: Arc<dyn OnNoteReceived>,
91        tx_graceful_blocks: Option<u32>,
92    ) -> Self {
93        Self {
94            rpc_api,
95            note_screener,
96            tx_graceful_blocks,
97        }
98    }
99
100    /// Syncs the state of the client with the chain tip of the node, returning the updates that
101    /// should be applied to the store.
102    ///
103    /// During the sync process, the client will go through the following steps:
104    /// 1. A request is sent to the node to get the state updates. This request includes tracked
105    ///    account IDs and the tags of notes that might have changed or that might be of interest to
106    ///    the client.
107    /// 2. A response is received with the current state of the network. The response includes
108    ///    information about new and committed notes, updated accounts, and committed transactions.
109    /// 3. Tracked public accounts are updated and private accounts are validated against the node
110    ///    state.
111    /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
112    ///    during the sync processing.
113    /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
114    ///    [`OnNoteReceived`] callback.
115    /// 6. Transactions are updated with their new states. Transactions might be committed or
116    ///    discarded.
117    /// 7. The MMR is updated with the new peaks and authentication nodes.
118    ///
119    /// # Arguments
120    /// * `current_partial_blockchain` - The current partial view of the blockchain.
121    /// * `accounts` - All the headers of tracked accounts.
122    /// * `note_tags` - The note tags to be used in the sync state request.
123    /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
124    /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
125    pub async fn sync_state(
126        self,
127        current_partial_blockchain: PartialBlockchain,
128        accounts: Vec<AccountHeader>,
129        note_tags: BTreeSet<NoteTag>,
130        unspent_input_notes: Vec<InputNoteRecord>,
131        unspent_output_notes: Vec<OutputNoteRecord>,
132        uncommitted_transactions: Vec<TransactionRecord>,
133    ) -> Result<StateSyncUpdate, ClientError> {
134        let block_num =
135            current_partial_blockchain.chain_length().checked_sub(1).unwrap_or_default();
136
137        let mut state_sync_update = StateSyncUpdate {
138            block_num,
139            note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
140            transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
141            ..Default::default()
142        };
143
144        let mut partial_mmr = current_partial_blockchain.mmr().clone();
145        let note_tags = Arc::new(note_tags);
146        loop {
147            if !self
148                .sync_state_step(
149                    &mut state_sync_update,
150                    &mut partial_mmr,
151                    &accounts,
152                    note_tags.clone(),
153                )
154                .await?
155            {
156                break;
157            }
158        }
159
160        self.sync_nullifiers(&mut state_sync_update, block_num).await?;
161
162        Ok(state_sync_update)
163    }
164
165    /// Executes a single step of the state sync process, returning `true` if the client should
166    /// continue syncing and `false` if the client has reached the chain tip.
167    ///
168    /// A step in this context means a single request to the node to get the next relevant block and
169    /// the changes that happened in it. This block may not be the last one in the chain and
170    /// the client may need to call this method multiple times until it reaches the chain tip.
171    ///
172    /// The `sync_state_update` field of the struct will be updated with the new changes from this
173    /// step.
174    ///
175    /// This function returns whether the state sync process must continue, depending on whether
176    /// the chain tip was reached already.
177    async fn sync_state_step(
178        &self,
179        state_sync_update: &mut StateSyncUpdate,
180        current_partial_mmr: &mut PartialMmr,
181        accounts: &[AccountHeader],
182        note_tags: Arc<BTreeSet<NoteTag>>,
183    ) -> Result<bool, ClientError> {
184        let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
185
186        let response = self
187            .rpc_api
188            .sync_state(state_sync_update.block_num, &account_ids, note_tags.as_ref())
189            .await?;
190
191        // We don't need to continue if the chain has not advanced, there are no new changes
192        if response.block_header.block_num() == state_sync_update.block_num {
193            return Ok(false);
194        }
195
196        let new_block_num = response.block_header.block_num();
197        state_sync_update.block_num = new_block_num;
198
199        self.account_state_sync(
200            &mut state_sync_update.account_updates,
201            accounts,
202            &response.account_commitment_updates,
203        )
204        .await?;
205
206        self.transaction_state_sync(
207            &mut state_sync_update.transaction_updates,
208            &response.block_header,
209            &response.transactions,
210        );
211
212        let found_relevant_note = self
213            .note_state_sync(
214                &mut state_sync_update.note_updates,
215                response.note_inclusions,
216                &response.block_header,
217            )
218            .await?;
219
220        let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
221            &response.block_header,
222            found_relevant_note,
223            current_partial_mmr,
224            response.mmr_delta,
225        )?;
226
227        let mut new_blocks = vec![];
228        if found_relevant_note || response.chain_tip == new_block_num {
229            // Only track relevant blocks or the chain tip
230            new_blocks.push((response.block_header, found_relevant_note, new_mmr_peaks));
231        }
232
233        state_sync_update
234            .block_updates
235            .extend(BlockUpdates::new(new_blocks, new_authentication_nodes));
236
237        if response.chain_tip == new_block_num {
238            Ok(false)
239        } else {
240            Ok(true)
241        }
242    }
243
244    // HELPERS
245    // --------------------------------------------------------------------------------------------
246
247    /// Compares the state of tracked accounts with the updates received from the node. The method
248    /// updates the `state_sync_update` field with the details of the accounts that need to be
249    /// updated.
250    ///
251    /// The account updates might include:
252    /// * Public accounts that have been updated in the node.
253    /// * Network accounts that have been updated in the node and are being tracked by the client.
254    /// * Private accounts that have been marked as mismatched because the current commitment
255    ///   doesn't match the one received from the node. The client will need to handle these cases
256    ///   as they could be a stale account state or a reason to lock the account.
257    async fn account_state_sync(
258        &self,
259        account_updates: &mut AccountUpdates,
260        accounts: &[AccountHeader],
261        account_commitment_updates: &[(AccountId, Word)],
262    ) -> Result<(), ClientError> {
263        let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
264            accounts.iter().partition(|account_header| !account_header.id().is_private());
265
266        let updated_public_accounts = self
267            .get_updated_public_accounts(account_commitment_updates, &public_accounts)
268            .await?;
269
270        let mismatched_private_accounts = account_commitment_updates
271            .iter()
272            .filter(|(account_id, digest)| {
273                private_accounts
274                    .iter()
275                    .any(|account| account.id() == *account_id && &account.commitment() != digest)
276            })
277            .copied()
278            .collect::<Vec<_>>();
279
280        account_updates
281            .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
282
283        Ok(())
284    }
285
286    /// Queries the node for the latest state of the public accounts that don't match the current
287    /// state of the client.
288    async fn get_updated_public_accounts(
289        &self,
290        account_updates: &[(AccountId, Word)],
291        current_public_accounts: &[&AccountHeader],
292    ) -> Result<Vec<Account>, ClientError> {
293        let mut mismatched_public_accounts = vec![];
294
295        for (id, commitment) in account_updates {
296            // check if this updated account state is tracked by the client
297            if let Some(account) = current_public_accounts
298                .iter()
299                .find(|acc| *id == acc.id() && *commitment != acc.commitment())
300            {
301                mismatched_public_accounts.push(*account);
302            }
303        }
304
305        self.rpc_api
306            .get_updated_public_accounts(&mismatched_public_accounts)
307            .await
308            .map_err(ClientError::RpcError)
309    }
310
311    /// Applies the changes received from the sync response to the notes and transactions tracked
312    /// by the client and updates the `note_updates` accordingly.
313    ///
314    /// This method uses the callbacks provided to the [`StateSync`] component to check if the
315    /// updates received are relevant to the client.
316    ///
317    /// The note updates might include:
318    /// * New notes that we received from the node and might be relevant to the client.
319    /// * Tracked expected notes that were committed in the block.
320    /// * Tracked notes that were being processed by a transaction that got committed.
321    /// * Tracked notes that were nullified by an external transaction.
322    async fn note_state_sync(
323        &self,
324        note_updates: &mut NoteUpdateTracker,
325        note_inclusions: Vec<CommittedNote>,
326        block_header: &BlockHeader,
327    ) -> Result<bool, ClientError> {
328        let public_note_ids: Vec<NoteId> = note_inclusions
329            .iter()
330            .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id()))
331            .collect();
332
333        let mut found_relevant_note = false;
334
335        // Process note inclusions
336        let new_public_notes = self.fetch_public_note_details(&public_note_ids).await?;
337        for committed_note in note_inclusions {
338            let public_note = new_public_notes.get(committed_note.note_id()).cloned();
339
340            match self.note_screener.on_note_received(committed_note, public_note).await? {
341                NoteUpdateAction::Commit(committed_note) => {
342                    found_relevant_note = true;
343
344                    note_updates
345                        .apply_committed_note_state_transitions(&committed_note, block_header)?;
346                },
347                NoteUpdateAction::Insert(public_note) => {
348                    found_relevant_note = true;
349
350                    note_updates.apply_new_public_note(public_note, block_header)?;
351                },
352                NoteUpdateAction::Discard => {},
353            }
354        }
355
356        Ok(found_relevant_note)
357    }
358
359    /// Queries the node for all received notes that aren't being locally tracked in the client.
360    ///
361    /// The client can receive metadata for private notes that it's not tracking. In this case,
362    /// notes are ignored for now as they become useless until details are imported.
363    async fn fetch_public_note_details(
364        &self,
365        query_notes: &[NoteId],
366    ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
367        if query_notes.is_empty() {
368            return Ok(BTreeMap::new());
369        }
370        info!("Getting note details for notes that are not being tracked.");
371
372        let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
373
374        Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
375    }
376
377    /// Collects the nullifier tags for the notes that were updated in the sync response and uses
378    /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these
379    /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
380    ///
381    /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
382    async fn sync_nullifiers(
383        &self,
384        state_sync_update: &mut StateSyncUpdate,
385        current_block_num: BlockNumber,
386    ) -> Result<(), ClientError> {
387        // To receive information about added nullifiers, we reduce them to the higher 16 bits
388        // Note that besides filtering by nullifier prefixes, the node also filters by block number
389        // (it only returns nullifiers from current_block_num until
390        // response.block_header.block_num())
391
392        // Check for new nullifiers for input notes that were updated
393        let nullifiers_tags: Vec<u16> = state_sync_update
394            .note_updates
395            .unspent_nullifiers()
396            .map(|nullifier| nullifier.prefix())
397            .collect();
398
399        let mut new_nullifiers = self
400            .rpc_api
401            .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
402            .await?;
403
404        // Discard nullifiers that are newer than the current block (this might happen if the block
405        // changes between the sync_state and the check_nullifier calls)
406        new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num.as_u32());
407
408        for nullifier_update in new_nullifiers {
409            state_sync_update.note_updates.apply_nullifiers_state_transitions(
410                &nullifier_update,
411                state_sync_update.transaction_updates.committed_transactions(),
412            )?;
413
414            // Process nullifiers and track the updates of local tracked transactions that were
415            // discarded because the notes that they were processing were nullified by an
416            // another transaction.
417            state_sync_update
418                .transaction_updates
419                .apply_input_note_nullified(nullifier_update.nullifier);
420        }
421
422        Ok(())
423    }
424
425    /// Applies the changes received from the sync response to the transactions tracked by the
426    /// client and updates the `transaction_updates` accordingly.
427    ///
428    /// The transaction updates might include:
429    /// * New transactions that were committed in the block.
430    /// * Transactions that were discarded because they were stale or expired.
431    fn transaction_state_sync(
432        &self,
433        transaction_updates: &mut TransactionUpdateTracker,
434        new_block_header: &BlockHeader,
435        transaction_inclusions: &[TransactionInclusion],
436    ) {
437        for transaction_inclusion in transaction_inclusions {
438            transaction_updates.apply_transaction_inclusion(
439                transaction_inclusion,
440                u64::from(new_block_header.timestamp()),
441            ); //TODO: Change timestamps from u64 to u32
442        }
443
444        transaction_updates
445            .apply_sync_height_update(new_block_header.block_num(), self.tx_graceful_blocks);
446    }
447}
448
449// HELPERS
450// ================================================================================================
451
452/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
453/// authentication nodes for leaves we track.
454fn apply_mmr_changes(
455    new_block: &BlockHeader,
456    new_block_has_relevant_notes: bool,
457    current_partial_mmr: &mut PartialMmr,
458    mmr_delta: MmrDelta,
459) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
460    // Apply the MMR delta to bring MMR to forest equal to chain tip
461    let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
462        current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
463
464    let new_peaks = current_partial_mmr.peaks();
465
466    new_authentication_nodes
467        .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
468
469    Ok((new_peaks, new_authentication_nodes))
470}