miden_client/sync/state_sync.rs
1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use miden_objects::Word;
7use miden_objects::account::{Account, AccountHeader, AccountId};
8use miden_objects::block::{BlockHeader, BlockNumber};
9use miden_objects::crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
10use miden_objects::note::{NoteId, NoteTag};
11use miden_objects::transaction::PartialBlockchain;
12use tonic::async_trait;
13use tracing::info;
14
15use super::state_sync_update::TransactionUpdateTracker;
16use super::{AccountUpdates, BlockUpdates, StateSyncUpdate};
17use crate::ClientError;
18use crate::note::NoteUpdateTracker;
19use crate::rpc::NodeRpcClient;
20use crate::rpc::domain::note::CommittedNote;
21use crate::rpc::domain::transaction::TransactionInclusion;
22use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
23use crate::transaction::TransactionRecord;
24
25// SYNC CALLBACKS
26// ================================================================================================
27
/// The action to be taken when a note update is received as part of the sync response.
///
/// This is the value returned by [`OnNoteReceived::on_note_received`]; the sync logic applies the
/// chosen action to the note updates it is accumulating.
// The lint is silenced rather than boxing the larger payload.
// NOTE(review): presumably this keeps the variants directly constructible by callback
// implementors — confirm before boxing either payload.
#[allow(clippy::large_enum_variant)]
pub enum NoteUpdateAction {
    /// The note commit update is relevant and the specified note should be marked as committed in
    /// the store, storing its inclusion proof.
    Commit(CommittedNote),
    /// The public note is relevant and should be inserted into the store.
    Insert(InputNoteRecord),
    /// The note update is not relevant and should be discarded.
    Discard,
}
39
// `?Send` tells `async_trait` not to require the returned future to be `Send`.
#[async_trait(?Send)]
pub trait OnNoteReceived {
    /// Callback that gets executed when a new note is received as part of the sync response.
    ///
    /// It receives:
    ///
    /// - The committed note received from the network.
    /// - An optional note record that corresponds to the state of the note in the network (only if
    ///   the note is public).
    ///
    /// It returns a [`NoteUpdateAction`] indicating what to do with the received note update:
    /// commit the tracked note, insert the new public note, or discard the update.
    ///
    /// # Errors
    ///
    /// Returns a [`ClientError`] if the implementation fails to decide on an action.
    async fn on_note_received(
        &self,
        committed_note: CommittedNote,
        public_note: Option<InputNoteRecord>,
    ) -> Result<NoteUpdateAction, ClientError>;
}
58
59// STATE SYNC
60// ================================================================================================
61
/// The state sync component encompasses the client's sync logic. It is used to request updates
/// from the node and apply them to the relevant elements. The resulting updates are returned and
/// can then be applied to the store to persist the changes.
///
/// When created it receives a callback that will be executed when a new note inclusion is received
/// in the sync response.
pub struct StateSync {
    /// The RPC client used to communicate with the node.
    rpc_api: Arc<dyn NodeRpcClient + Send>,
    /// Responsible for checking the relevance of notes and executing the
    /// [`OnNoteReceived`] callback when a new note inclusion is received.
    note_screener: Arc<dyn OnNoteReceived>,
    /// The number of blocks that are considered old enough to discard pending transactions. If
    /// `None`, there is no limit and transactions will be kept indefinitely.
    tx_graceful_blocks: Option<u32>,
}
78
79impl StateSync {
80 /// Creates a new instance of the state sync component.
81 ///
82 /// # Arguments
83 ///
84 /// * `rpc_api` - The RPC client used to communicate with the node.
85 /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
86 /// * `tx_graceful_blocks` - The number of blocks that are considered old enough to discard.
87 /// * `note_screener` - The note screener used to check the relevance of notes.
88 pub fn new(
89 rpc_api: Arc<dyn NodeRpcClient + Send>,
90 note_screener: Arc<dyn OnNoteReceived>,
91 tx_graceful_blocks: Option<u32>,
92 ) -> Self {
93 Self {
94 rpc_api,
95 note_screener,
96 tx_graceful_blocks,
97 }
98 }
99
100 /// Syncs the state of the client with the chain tip of the node, returning the updates that
101 /// should be applied to the store.
102 ///
103 /// During the sync process, the client will go through the following steps:
104 /// 1. A request is sent to the node to get the state updates. This request includes tracked
105 /// account IDs and the tags of notes that might have changed or that might be of interest to
106 /// the client.
107 /// 2. A response is received with the current state of the network. The response includes
108 /// information about new and committed notes, updated accounts, and committed transactions.
109 /// 3. Tracked public accounts are updated and private accounts are validated against the node
110 /// state.
111 /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
112 /// during the sync processing.
113 /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
114 /// [`OnNoteReceived`] callback.
115 /// 6. Transactions are updated with their new states. Transactions might be committed or
116 /// discarded.
117 /// 7. The MMR is updated with the new peaks and authentication nodes.
118 ///
119 /// # Arguments
120 /// * `current_partial_blockchain` - The current partial view of the blockchain.
121 /// * `accounts` - All the headers of tracked accounts.
122 /// * `note_tags` - The note tags to be used in the sync state request.
123 /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
124 /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
125 pub async fn sync_state(
126 self,
127 current_partial_blockchain: PartialBlockchain,
128 accounts: Vec<AccountHeader>,
129 note_tags: BTreeSet<NoteTag>,
130 unspent_input_notes: Vec<InputNoteRecord>,
131 unspent_output_notes: Vec<OutputNoteRecord>,
132 uncommitted_transactions: Vec<TransactionRecord>,
133 ) -> Result<StateSyncUpdate, ClientError> {
134 let block_num =
135 current_partial_blockchain.chain_length().checked_sub(1).unwrap_or_default();
136
137 let mut state_sync_update = StateSyncUpdate {
138 block_num,
139 note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
140 transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
141 ..Default::default()
142 };
143
144 let mut partial_mmr = current_partial_blockchain.mmr().clone();
145 let note_tags = Arc::new(note_tags);
146 loop {
147 if !self
148 .sync_state_step(
149 &mut state_sync_update,
150 &mut partial_mmr,
151 &accounts,
152 note_tags.clone(),
153 )
154 .await?
155 {
156 break;
157 }
158 }
159
160 self.sync_nullifiers(&mut state_sync_update, block_num).await?;
161
162 Ok(state_sync_update)
163 }
164
165 /// Executes a single step of the state sync process, returning `true` if the client should
166 /// continue syncing and `false` if the client has reached the chain tip.
167 ///
168 /// A step in this context means a single request to the node to get the next relevant block and
169 /// the changes that happened in it. This block may not be the last one in the chain and
170 /// the client may need to call this method multiple times until it reaches the chain tip.
171 ///
172 /// The `sync_state_update` field of the struct will be updated with the new changes from this
173 /// step.
174 ///
175 /// This function returns whether the state sync process must continue, depending on whether
176 /// the chain tip was reached already.
177 async fn sync_state_step(
178 &self,
179 state_sync_update: &mut StateSyncUpdate,
180 current_partial_mmr: &mut PartialMmr,
181 accounts: &[AccountHeader],
182 note_tags: Arc<BTreeSet<NoteTag>>,
183 ) -> Result<bool, ClientError> {
184 let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
185
186 let response = self
187 .rpc_api
188 .sync_state(state_sync_update.block_num, &account_ids, note_tags.as_ref())
189 .await?;
190
191 // We don't need to continue if the chain has not advanced, there are no new changes
192 if response.block_header.block_num() == state_sync_update.block_num {
193 return Ok(false);
194 }
195
196 let new_block_num = response.block_header.block_num();
197 state_sync_update.block_num = new_block_num;
198
199 self.account_state_sync(
200 &mut state_sync_update.account_updates,
201 accounts,
202 &response.account_commitment_updates,
203 )
204 .await?;
205
206 self.transaction_state_sync(
207 &mut state_sync_update.transaction_updates,
208 &response.block_header,
209 &response.transactions,
210 );
211
212 let found_relevant_note = self
213 .note_state_sync(
214 &mut state_sync_update.note_updates,
215 response.note_inclusions,
216 &response.block_header,
217 )
218 .await?;
219
220 let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
221 &response.block_header,
222 found_relevant_note,
223 current_partial_mmr,
224 response.mmr_delta,
225 )?;
226
227 let mut new_blocks = vec![];
228 if found_relevant_note || response.chain_tip == new_block_num {
229 // Only track relevant blocks or the chain tip
230 new_blocks.push((response.block_header, found_relevant_note, new_mmr_peaks));
231 }
232
233 state_sync_update
234 .block_updates
235 .extend(BlockUpdates::new(new_blocks, new_authentication_nodes));
236
237 if response.chain_tip == new_block_num {
238 Ok(false)
239 } else {
240 Ok(true)
241 }
242 }
243
244 // HELPERS
245 // --------------------------------------------------------------------------------------------
246
247 /// Compares the state of tracked accounts with the updates received from the node. The method
248 /// updates the `state_sync_update` field with the details of the accounts that need to be
249 /// updated.
250 ///
251 /// The account updates might include:
252 /// * Public accounts that have been updated in the node.
253 /// * Network accounts that have been updated in the node and are being tracked by the client.
254 /// * Private accounts that have been marked as mismatched because the current commitment
255 /// doesn't match the one received from the node. The client will need to handle these cases
256 /// as they could be a stale account state or a reason to lock the account.
257 async fn account_state_sync(
258 &self,
259 account_updates: &mut AccountUpdates,
260 accounts: &[AccountHeader],
261 account_commitment_updates: &[(AccountId, Word)],
262 ) -> Result<(), ClientError> {
263 let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
264 accounts.iter().partition(|account_header| !account_header.id().is_private());
265
266 let updated_public_accounts = self
267 .get_updated_public_accounts(account_commitment_updates, &public_accounts)
268 .await?;
269
270 let mismatched_private_accounts = account_commitment_updates
271 .iter()
272 .filter(|(account_id, digest)| {
273 private_accounts
274 .iter()
275 .any(|account| account.id() == *account_id && &account.commitment() != digest)
276 })
277 .copied()
278 .collect::<Vec<_>>();
279
280 account_updates
281 .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
282
283 Ok(())
284 }
285
286 /// Queries the node for the latest state of the public accounts that don't match the current
287 /// state of the client.
288 async fn get_updated_public_accounts(
289 &self,
290 account_updates: &[(AccountId, Word)],
291 current_public_accounts: &[&AccountHeader],
292 ) -> Result<Vec<Account>, ClientError> {
293 let mut mismatched_public_accounts = vec![];
294
295 for (id, commitment) in account_updates {
296 // check if this updated account state is tracked by the client
297 if let Some(account) = current_public_accounts
298 .iter()
299 .find(|acc| *id == acc.id() && *commitment != acc.commitment())
300 {
301 mismatched_public_accounts.push(*account);
302 }
303 }
304
305 self.rpc_api
306 .get_updated_public_accounts(&mismatched_public_accounts)
307 .await
308 .map_err(ClientError::RpcError)
309 }
310
311 /// Applies the changes received from the sync response to the notes and transactions tracked
312 /// by the client and updates the `note_updates` accordingly.
313 ///
314 /// This method uses the callbacks provided to the [`StateSync`] component to check if the
315 /// updates received are relevant to the client.
316 ///
317 /// The note updates might include:
318 /// * New notes that we received from the node and might be relevant to the client.
319 /// * Tracked expected notes that were committed in the block.
320 /// * Tracked notes that were being processed by a transaction that got committed.
321 /// * Tracked notes that were nullified by an external transaction.
322 async fn note_state_sync(
323 &self,
324 note_updates: &mut NoteUpdateTracker,
325 note_inclusions: Vec<CommittedNote>,
326 block_header: &BlockHeader,
327 ) -> Result<bool, ClientError> {
328 let public_note_ids: Vec<NoteId> = note_inclusions
329 .iter()
330 .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id()))
331 .collect();
332
333 let mut found_relevant_note = false;
334
335 // Process note inclusions
336 let new_public_notes = self.fetch_public_note_details(&public_note_ids).await?;
337 for committed_note in note_inclusions {
338 let public_note = new_public_notes.get(committed_note.note_id()).cloned();
339
340 match self.note_screener.on_note_received(committed_note, public_note).await? {
341 NoteUpdateAction::Commit(committed_note) => {
342 found_relevant_note = true;
343
344 note_updates
345 .apply_committed_note_state_transitions(&committed_note, block_header)?;
346 },
347 NoteUpdateAction::Insert(public_note) => {
348 found_relevant_note = true;
349
350 note_updates.apply_new_public_note(public_note, block_header)?;
351 },
352 NoteUpdateAction::Discard => {},
353 }
354 }
355
356 Ok(found_relevant_note)
357 }
358
359 /// Queries the node for all received notes that aren't being locally tracked in the client.
360 ///
361 /// The client can receive metadata for private notes that it's not tracking. In this case,
362 /// notes are ignored for now as they become useless until details are imported.
363 async fn fetch_public_note_details(
364 &self,
365 query_notes: &[NoteId],
366 ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
367 if query_notes.is_empty() {
368 return Ok(BTreeMap::new());
369 }
370 info!("Getting note details for notes that are not being tracked.");
371
372 let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
373
374 Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
375 }
376
377 /// Collects the nullifier tags for the notes that were updated in the sync response and uses
378 /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these
379 /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
380 ///
381 /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
382 async fn sync_nullifiers(
383 &self,
384 state_sync_update: &mut StateSyncUpdate,
385 current_block_num: BlockNumber,
386 ) -> Result<(), ClientError> {
387 // To receive information about added nullifiers, we reduce them to the higher 16 bits
388 // Note that besides filtering by nullifier prefixes, the node also filters by block number
389 // (it only returns nullifiers from current_block_num until
390 // response.block_header.block_num())
391
392 // Check for new nullifiers for input notes that were updated
393 let nullifiers_tags: Vec<u16> = state_sync_update
394 .note_updates
395 .unspent_nullifiers()
396 .map(|nullifier| nullifier.prefix())
397 .collect();
398
399 let mut new_nullifiers = self
400 .rpc_api
401 .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
402 .await?;
403
404 // Discard nullifiers that are newer than the current block (this might happen if the block
405 // changes between the sync_state and the check_nullifier calls)
406 new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num.as_u32());
407
408 for nullifier_update in new_nullifiers {
409 state_sync_update.note_updates.apply_nullifiers_state_transitions(
410 &nullifier_update,
411 state_sync_update.transaction_updates.committed_transactions(),
412 )?;
413
414 // Process nullifiers and track the updates of local tracked transactions that were
415 // discarded because the notes that they were processing were nullified by an
416 // another transaction.
417 state_sync_update
418 .transaction_updates
419 .apply_input_note_nullified(nullifier_update.nullifier);
420 }
421
422 Ok(())
423 }
424
425 /// Applies the changes received from the sync response to the transactions tracked by the
426 /// client and updates the `transaction_updates` accordingly.
427 ///
428 /// The transaction updates might include:
429 /// * New transactions that were committed in the block.
430 /// * Transactions that were discarded because they were stale or expired.
431 fn transaction_state_sync(
432 &self,
433 transaction_updates: &mut TransactionUpdateTracker,
434 new_block_header: &BlockHeader,
435 transaction_inclusions: &[TransactionInclusion],
436 ) {
437 for transaction_inclusion in transaction_inclusions {
438 transaction_updates.apply_transaction_inclusion(
439 transaction_inclusion,
440 u64::from(new_block_header.timestamp()),
441 ); //TODO: Change timestamps from u64 to u32
442 }
443
444 transaction_updates
445 .apply_sync_height_update(new_block_header.block_num(), self.tx_graceful_blocks);
446 }
447}
448
449// HELPERS
450// ================================================================================================
451
452/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
453/// authentication nodes for leaves we track.
454fn apply_mmr_changes(
455 new_block: &BlockHeader,
456 new_block_has_relevant_notes: bool,
457 current_partial_mmr: &mut PartialMmr,
458 mmr_delta: MmrDelta,
459) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
460 // Apply the MMR delta to bring MMR to forest equal to chain tip
461 let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
462 current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
463
464 let new_peaks = current_partial_mmr.peaks();
465
466 new_authentication_nodes
467 .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
468
469 Ok((new_peaks, new_authentication_nodes))
470}