miden_client/sync/state_sync.rs
1use alloc::{boxed::Box, collections::BTreeMap, sync::Arc, vec::Vec};
2use core::{future::Future, pin::Pin};
3
4use miden_objects::{
5 Digest,
6 account::{Account, AccountHeader, AccountId},
7 block::{BlockHeader, BlockNumber},
8 crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr},
9 note::{NoteId, NoteTag},
10 transaction::PartialBlockchain,
11};
12use tracing::info;
13
14use super::{
15 AccountUpdates, BlockUpdates, StateSyncUpdate, state_sync_update::TransactionUpdateTracker,
16};
17use crate::{
18 ClientError,
19 note::{NoteScreener, NoteUpdateTracker},
20 rpc::{
21 NodeRpcClient,
22 domain::{note::CommittedNote, transaction::TransactionInclusion},
23 },
24 store::{InputNoteRecord, NoteFilter, OutputNoteRecord, Store, StoreError},
25 transaction::TransactionRecord,
26};
27
28// SYNC CALLBACKS
29// ================================================================================================
30
31/// Callback that gets executed when a new note is received as part of the sync response.
32///
33/// It receives the committed note received from the network and an optional note record that
34/// corresponds to the state of the note in the network (only if the note is public).
35///
36/// It returns a boolean indicating if the received note update is relevant. If the return value
37/// is `false`, it gets discarded. If it is `true`, the update gets committed to the client's store.
38pub type OnNoteReceived<'a> = Box<
39 dyn Fn(
40 CommittedNote,
41 Option<InputNoteRecord>,
42 Arc<NoteScreener<'a>>,
43 ) -> Pin<Box<(dyn Future<Output = Result<bool, ClientError>> + 'a)>>,
44>;
45
46// STATE SYNC
47// ================================================================================================
48
49/// The state sync components encompasses the client's sync logic. It is then used to requset
50/// updates from the node and apply them to the relevant elements. The updates are then returned and
51/// can be applied to the store to persist the changes.
52///
53/// When created it receives a callback that will be executed when a new note inclusion is received
54/// in the sync response.
55pub struct StateSync<'a> {
56 /// The RPC client used to communicate with the node.
57 rpc_api: Arc<dyn NodeRpcClient + Send>,
58 /// Callback to be executed when a new note inclusion is received.
59 on_note_received: OnNoteReceived<'a>,
60 tx_graceful_blocks: Option<u32>,
61 note_screener: Arc<NoteScreener<'a>>,
62}
63
64impl<'a> StateSync<'a> {
65 /// Creates a new instance of the state sync component.
66 ///
67 /// # Arguments
68 ///
69 /// * `rpc_api` - The RPC client used to communicate with the node.
70 /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
71 pub fn new(
72 rpc_api: Arc<dyn NodeRpcClient + Send>,
73 on_note_received: OnNoteReceived<'a>,
74 tx_graceful_blocks: Option<u32>,
75 note_screener: NoteScreener<'a>,
76 ) -> Self {
77 Self {
78 rpc_api,
79 on_note_received,
80 tx_graceful_blocks,
81 #[allow(clippy::arc_with_non_send_sync)]
82 note_screener: Arc::new(note_screener),
83 }
84 }
85
86 /// Syncs the state of the client with the chain tip of the node, returning the updates that
87 /// should be applied to the store.
88 ///
89 /// During the sync process, the client will go through the following steps:
90 /// 1. A request is sent to the node to get the state updates. This request includes tracked
91 /// account IDs and the tags of notes that might have changed or that might be of interest to
92 /// the client.
93 /// 2. A response is received with the current state of the network. The response includes
94 /// information about new and committed notes, updated accounts, and committed transactions.
95 /// 3. Tracked public accounts are updated and private accounts are validated against the node
96 /// state.
97 /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
98 /// during the sync processing.
99 /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
100 /// [`OnNoteReceived`] callback.
101 /// 6. Transactions are updated with their new states. Transactions might be committed or
102 /// discarded.
103 /// 7. The MMR is updated with the new peaks and authentication nodes.
104 ///
105 /// # Arguments
106 /// * `current_partial_blockchain` - The current partial view of the blockchain.
107 /// * `accounts` - All the headers of tracked accounts.
108 /// * `note_tags` - The note tags to be used in the sync state request.
109 /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
110 /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
111 pub async fn sync_state(
112 self,
113 current_partial_blockchain: PartialBlockchain,
114 accounts: Vec<AccountHeader>,
115 note_tags: Vec<NoteTag>,
116 unspent_input_notes: Vec<InputNoteRecord>,
117 unspent_output_notes: Vec<OutputNoteRecord>,
118 uncommitted_transactions: Vec<TransactionRecord>,
119 ) -> Result<StateSyncUpdate, ClientError> {
120 let block_num =
121 current_partial_blockchain.chain_length().checked_sub(1).unwrap_or_default();
122
123 let mut state_sync_update = StateSyncUpdate {
124 block_num,
125 note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
126 transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
127 ..Default::default()
128 };
129
130 let mut partial_mmr = current_partial_blockchain.mmr().clone();
131
132 loop {
133 if !self
134 .sync_state_step(&mut state_sync_update, &mut partial_mmr, &accounts, ¬e_tags)
135 .await?
136 {
137 break;
138 }
139 }
140
141 self.sync_nullifiers(&mut state_sync_update, block_num).await?;
142
143 Ok(state_sync_update)
144 }
145
146 /// Executes a single step of the state sync process, returning `true` if the client should
147 /// continue syncing and `false` if the client has reached the chain tip.
148 ///
149 /// A step in this context means a single request to the node to get the next relevant block and
150 /// the changes that happened in it. This block may not be the last one in the chain and
151 /// the client may need to call this method multiple times until it reaches the chain tip.
152 ///
153 /// The `sync_state_update` field of the struct will be updated with the new changes from this
154 /// step.
155 async fn sync_state_step(
156 &self,
157 state_sync_update: &mut StateSyncUpdate,
158 current_partial_mmr: &mut PartialMmr,
159 accounts: &[AccountHeader],
160 note_tags: &[NoteTag],
161 ) -> Result<bool, ClientError> {
162 let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
163
164 let response = self
165 .rpc_api
166 .sync_state(state_sync_update.block_num, &account_ids, note_tags)
167 .await?;
168
169 // We don't need to continue if the chain has not advanced, there are no new changes
170 if response.block_header.block_num() == state_sync_update.block_num {
171 return Ok(false);
172 }
173
174 let new_block_num = response.block_header.block_num();
175 state_sync_update.block_num = new_block_num;
176
177 self.account_state_sync(
178 &mut state_sync_update.account_updates,
179 accounts,
180 &response.account_commitment_updates,
181 )
182 .await?;
183
184 self.transaction_state_sync(
185 &mut state_sync_update.transaction_updates,
186 new_block_num,
187 &response.transactions,
188 );
189
190 let found_relevant_note = self
191 .note_state_sync(
192 &mut state_sync_update.note_updates,
193 response.note_inclusions,
194 &response.block_header,
195 )
196 .await?;
197
198 let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
199 &response.block_header,
200 found_relevant_note,
201 current_partial_mmr,
202 response.mmr_delta,
203 )?;
204
205 let mut new_blocks = vec![];
206 if found_relevant_note || response.chain_tip == new_block_num {
207 // Only track relevant blocks or the chain tip
208 new_blocks.push((response.block_header, found_relevant_note, new_mmr_peaks));
209 }
210
211 state_sync_update
212 .block_updates
213 .extend(BlockUpdates::new(new_blocks, new_authentication_nodes));
214
215 if response.chain_tip == new_block_num {
216 Ok(false)
217 } else {
218 Ok(true)
219 }
220 }
221
222 // HELPERS
223 // --------------------------------------------------------------------------------------------
224
225 /// Compares the state of tracked accounts with the updates received from the node. The method
226 /// updates the `state_sync_update` field with the details of the accounts that need to be
227 /// updated.
228 ///
229 /// The account updates might include:
230 /// * Public accounts that have been updated in the node.
231 /// * Private accounts that have been marked as mismatched because the current commitment
232 /// doesn't match the one received from the node. The client will need to handle these cases
233 /// as they could be a stale account state or a reason to lock the account.
234 async fn account_state_sync(
235 &self,
236 account_updates: &mut AccountUpdates,
237 accounts: &[AccountHeader],
238 account_commitment_updates: &[(AccountId, Digest)],
239 ) -> Result<(), ClientError> {
240 let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
241 accounts.iter().partition(|account_header| account_header.id().is_public());
242
243 let updated_public_accounts = self
244 .get_updated_public_accounts(account_commitment_updates, &public_accounts)
245 .await?;
246
247 let mismatched_private_accounts = account_commitment_updates
248 .iter()
249 .filter(|(account_id, digest)| {
250 private_accounts
251 .iter()
252 .any(|account| account.id() == *account_id && &account.commitment() != digest)
253 })
254 .copied()
255 .collect::<Vec<_>>();
256
257 account_updates
258 .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
259
260 Ok(())
261 }
262
263 /// Queries the node for the latest state of the public accounts that don't match the current
264 /// state of the client.
265 async fn get_updated_public_accounts(
266 &self,
267 account_updates: &[(AccountId, Digest)],
268 current_public_accounts: &[&AccountHeader],
269 ) -> Result<Vec<Account>, ClientError> {
270 let mut mismatched_public_accounts = vec![];
271
272 for (id, commitment) in account_updates {
273 // check if this updated account state is tracked by the client
274 if let Some(account) = current_public_accounts
275 .iter()
276 .find(|acc| *id == acc.id() && *commitment != acc.commitment())
277 {
278 mismatched_public_accounts.push(*account);
279 }
280 }
281
282 self.rpc_api
283 .get_updated_public_accounts(&mismatched_public_accounts)
284 .await
285 .map_err(ClientError::RpcError)
286 }
287
288 /// Applies the changes received from the sync response to the notes and transactions tracked
289 /// by the client and updates the `note_updates` accordingly.
290 ///
291 /// This method uses the callbacks provided to the [`StateSync`] component to check if the
292 /// updates received are relevant to the client.
293 ///
294 /// The note updates might include:
295 /// * New notes that we received from the node and might be relevant to the client.
296 /// * Tracked expected notes that were committed in the block.
297 /// * Tracked notes that were being processed by a transaction that got committed.
298 /// * Tracked notes that were nullified by an external transaction.
299 async fn note_state_sync(
300 &self,
301 note_updates: &mut NoteUpdateTracker,
302 note_inclusions: Vec<CommittedNote>,
303 block_header: &BlockHeader,
304 ) -> Result<bool, ClientError> {
305 let public_note_ids: Vec<NoteId> = note_inclusions
306 .iter()
307 .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id()))
308 .collect();
309
310 let mut found_relevant_note = false;
311
312 // Process note inclusions
313 let new_public_notes = self.fetch_public_note_details(&public_note_ids).await?;
314 for committed_note in note_inclusions {
315 let public_note = new_public_notes.get(committed_note.note_id()).cloned();
316
317 if (self.on_note_received)(
318 committed_note.clone(),
319 public_note.clone(),
320 self.note_screener.clone(),
321 )
322 .await?
323 {
324 found_relevant_note = true;
325
326 note_updates.apply_committed_note_state_transitions(
327 &committed_note,
328 public_note,
329 block_header,
330 )?;
331 }
332 }
333
334 Ok(found_relevant_note)
335 }
336
337 /// Queries the node for all received notes that aren't being locally tracked in the client.
338 ///
339 /// The client can receive metadata for private notes that it's not tracking. In this case,
340 /// notes are ignored for now as they become useless until details are imported.
341 async fn fetch_public_note_details(
342 &self,
343 query_notes: &[NoteId],
344 ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
345 if query_notes.is_empty() {
346 return Ok(BTreeMap::new());
347 }
348 info!("Getting note details for notes that are not being tracked.");
349
350 let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
351
352 Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
353 }
354
355 /// Collects the nullifier tags for the notes that were updated in the sync response and uses
356 /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these
357 /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
358 ///
359 /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
360 async fn sync_nullifiers(
361 &self,
362 state_sync_update: &mut StateSyncUpdate,
363 current_block_num: BlockNumber,
364 ) -> Result<(), ClientError> {
365 // To receive information about added nullifiers, we reduce them to the higher 16 bits
366 // Note that besides filtering by nullifier prefixes, the node also filters by block number
367 // (it only returns nullifiers from current_block_num until
368 // response.block_header.block_num())
369
370 // Check for new nullifiers for input notes that were updated
371 let nullifiers_tags: Vec<u16> = state_sync_update
372 .note_updates
373 .unspent_nullifiers()
374 .map(|nullifier| nullifier.prefix())
375 .collect();
376
377 let mut new_nullifiers = self
378 .rpc_api
379 .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
380 .await?;
381
382 // Discard nullifiers that are newer than the current block (this might happen if the block
383 // changes between the sync_state and the check_nullifier calls)
384 new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num.as_u32());
385
386 for nullifier_update in new_nullifiers {
387 state_sync_update.note_updates.apply_nullifiers_state_transitions(
388 &nullifier_update,
389 state_sync_update.transaction_updates.committed_transactions(),
390 )?;
391
392 // Process nullifiers and track the updates of local tracked transactions that were
393 // discarded because the notes that they were processing were nullified by an
394 // another transaction.
395 state_sync_update
396 .transaction_updates
397 .apply_input_note_nullified(nullifier_update.nullifier);
398 }
399
400 Ok(())
401 }
402
403 /// Applies the changes received from the sync response to the transactions tracked by the
404 /// client and updates the `transaction_updates` accordingly.
405 ///
406 /// The transaction updates might include:
407 /// * New transactions that were committed in the block.
408 /// * Transactions that were discarded because they were stale or expired.
409 fn transaction_state_sync(
410 &self,
411 transaction_updates: &mut TransactionUpdateTracker,
412 new_sync_height: BlockNumber,
413 transaction_inclusions: &[TransactionInclusion],
414 ) {
415 for transaction_inclusion in transaction_inclusions {
416 transaction_updates.apply_transaction_inclusion(transaction_inclusion);
417 }
418
419 transaction_updates.apply_sync_height_update(new_sync_height, self.tx_graceful_blocks);
420 }
421}
422
423// HELPERS
424// ================================================================================================
425
426/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
427/// authentication nodes for leaves we track.
428fn apply_mmr_changes(
429 new_block: &BlockHeader,
430 new_block_has_relevant_notes: bool,
431 current_partial_mmr: &mut PartialMmr,
432 mmr_delta: MmrDelta,
433) -> Result<(MmrPeaks, Vec<(InOrderIndex, Digest)>), ClientError> {
434 // Apply the MMR delta to bring MMR to forest equal to chain tip
435 let mut new_authentication_nodes: Vec<(InOrderIndex, Digest)> =
436 current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
437
438 let new_peaks = current_partial_mmr.peaks();
439
440 new_authentication_nodes
441 .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
442
443 Ok((new_peaks, new_authentication_nodes))
444}
445
446// DEFAULT CALLBACK IMPLEMENTATIONS
447// ================================================================================================
448
449/// Default implementation of the [`OnNoteReceived`] callback. It queries the store for the
450/// committed note to check if it's relevant. If the note wasn't being tracked but it came in the
451/// sync response it may be a new public note, in that case we use the [`NoteScreener`] to check its
452/// relevance.
453pub async fn on_note_received(
454 store: Arc<dyn Store>,
455 committed_note: CommittedNote,
456 public_note: Option<InputNoteRecord>,
457 note_screener: Arc<NoteScreener<'_>>,
458) -> Result<bool, ClientError> {
459 let note_id = *committed_note.note_id();
460
461 if !store.get_input_notes(NoteFilter::Unique(note_id)).await?.is_empty()
462 || !store.get_output_notes(NoteFilter::Unique(note_id)).await?.is_empty()
463 {
464 // The note is being tracked by the client so it is relevant
465 Ok(true)
466 } else if let Some(public_note) = public_note {
467 // The note is not being tracked by the client and is public so we can screen it
468 let new_note_relevance = note_screener
469 .check_relevance(
470 &public_note.try_into().map_err(ClientError::NoteRecordConversionError)?,
471 )
472 .await?;
473
474 Ok(!new_note_relevance.is_empty())
475 } else {
476 // The note is not being tracked by the client and is private so we can't determine if it
477 // is relevant
478 Ok(false)
479 }
480}