miden_client/sync/state_sync.rs
1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use async_trait::async_trait;
7use miden_protocol::Word;
8use miden_protocol::account::{Account, AccountHeader, AccountId};
9use miden_protocol::block::{BlockHeader, BlockNumber};
10use miden_protocol::crypto::merkle::mmr::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
11use miden_protocol::note::{NoteId, NoteTag};
12use tracing::info;
13
14use super::state_sync_update::TransactionUpdateTracker;
15use super::{AccountUpdates, StateSyncUpdate};
16use crate::ClientError;
17use crate::note::NoteUpdateTracker;
18use crate::rpc::NodeRpcClient;
19use crate::rpc::domain::note::CommittedNote;
20use crate::rpc::domain::sync::StateSyncInfo;
21use crate::rpc::domain::transaction::TransactionInclusion;
22use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
23use crate::transaction::TransactionRecord;
24
25// SYNC CALLBACKS
26// ================================================================================================
27
/// The action to be taken when a note update is received as part of the sync response.
///
/// Values of this type are produced by [`OnNoteReceived::on_note_received`] and tell the sync
/// process how to handle each note inclusion reported by the node.
// NOTE(review): the lint is silenced instead of boxing the variant payloads; presumably this
// keeps the public variant types simple for implementors of the callback — confirm before boxing.
#[allow(clippy::large_enum_variant)]
pub enum NoteUpdateAction {
    /// The note commit update is relevant and the specified note should be marked as committed in
    /// the store, storing its inclusion proof.
    Commit(CommittedNote),
    /// The public note is relevant and should be inserted into the store.
    Insert(InputNoteRecord),
    /// The note update is not relevant and should be discarded.
    Discard,
}
39
/// Callback interface used by [`StateSync`] to decide what to do with every note inclusion
/// received in a sync response.
///
/// The trait is declared with `?Send`, so the futures returned by implementors are not required
/// to be `Send`.
#[async_trait(?Send)]
pub trait OnNoteReceived {
    /// Callback that gets executed when a new note is received as part of the sync response.
    ///
    /// It receives:
    ///
    /// - The committed note received from the network.
    /// - An optional note record that corresponds to the state of the note in the network (only
    ///   if the note is public).
    ///
    /// It returns a [`NoteUpdateAction`] indicating the action to be taken for the received note
    /// update: whether the note update should be committed, a new public note should be
    /// inserted, or the update should be ignored.
    ///
    /// # Errors
    ///
    /// Any error returned by the callback aborts the sync that invoked it.
    async fn on_note_received(
        &self,
        committed_note: CommittedNote,
        public_note: Option<InputNoteRecord>,
    ) -> Result<NoteUpdateAction, ClientError>;
}
58
59// STATE SYNC
60// ================================================================================================
61
/// The state sync component encompasses the client's sync logic. It is used to request updates
/// from the node and apply them to the relevant elements. The resulting updates are returned to
/// the caller and can then be applied to the store to persist the changes.
///
/// When created it receives a callback that will be executed when a new note inclusion is received
/// in the sync response.
///
/// Cloning a `StateSync` clones the `Arc` handles, so clones share the same RPC client and note
/// screener.
#[derive(Clone)]
pub struct StateSync {
    /// The RPC client used to communicate with the node.
    rpc_api: Arc<dyn NodeRpcClient>,
    /// Responsible for checking the relevance of notes and executing the
    /// [`OnNoteReceived`] callback when a new note inclusion is received.
    note_screener: Arc<dyn OnNoteReceived>,
    /// The number of blocks that are considered old enough to discard pending transactions. If
    /// `None`, there is no limit and transactions will be kept indefinitely.
    tx_graceful_blocks: Option<u32>,
}
79
80impl StateSync {
/// Creates a new instance of the state sync component.
///
/// # Arguments
///
/// * `rpc_api` - The RPC client used to communicate with the node.
/// * `note_screener` - The [`OnNoteReceived`] implementation whose callback is executed when a
///   new note inclusion is received, and which decides whether the note is relevant.
/// * `tx_graceful_blocks` - The number of blocks that are considered old enough to discard
///   pending transactions. If `None`, transactions are kept indefinitely.
pub fn new(
    rpc_api: Arc<dyn NodeRpcClient>,
    note_screener: Arc<dyn OnNoteReceived>,
    tx_graceful_blocks: Option<u32>,
) -> Self {
    Self {
        rpc_api,
        note_screener,
        tx_graceful_blocks,
    }
}
100
101 /// Syncs the state of the client with the chain tip of the node, returning the updates that
102 /// should be applied to the store.
103 ///
104 /// During the sync process, the client will go through the following steps:
105 /// 1. A request is sent to the node to get the state updates. This request includes tracked
106 /// account IDs and the tags of notes that might have changed or that might be of interest to
107 /// the client.
108 /// 2. A response is received with the current state of the network. The response includes
109 /// information about new and committed notes, updated accounts, and committed transactions.
110 /// 3. Tracked public accounts are updated and private accounts are validated against the node
111 /// state.
112 /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
113 /// during the sync processing.
114 /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
115 /// [`OnNoteReceived`] callback.
116 /// 6. Transactions are updated with their new states. Transactions might be committed or
117 /// discarded.
118 /// 7. The MMR is updated with the new peaks and authentication nodes.
119 ///
120 /// # Arguments
121 /// * `current_partial_blockchain` - The current partial view of the blockchain.
122 /// * `accounts` - All the headers of tracked accounts.
123 /// * `note_tags` - The note tags to be used in the sync state request.
124 /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
125 /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
126 pub async fn sync_state(
127 &self,
128 mut current_partial_mmr: PartialMmr,
129 accounts: Vec<AccountHeader>,
130 note_tags: BTreeSet<NoteTag>,
131 unspent_input_notes: Vec<InputNoteRecord>,
132 unspent_output_notes: Vec<OutputNoteRecord>,
133 uncommitted_transactions: Vec<TransactionRecord>,
134 ) -> Result<StateSyncUpdate, ClientError> {
135 let block_num = u32::try_from(
136 current_partial_mmr.forest().num_leaves().checked_sub(1).unwrap_or_default(),
137 )
138 .map_err(|_| ClientError::InvalidPartialMmrForest)?
139 .into();
140
141 let mut state_sync_update = StateSyncUpdate {
142 block_num,
143 note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
144 transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
145 ..Default::default()
146 };
147
148 let note_tags = Arc::new(note_tags);
149 let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
150 let mut state_sync_steps = Vec::new();
151
152 while let Some(step) = self
153 .sync_state_step(state_sync_update.block_num, &account_ids, ¬e_tags)
154 .await?
155 {
156 let sync_block_num = step.block_header.block_num();
157
158 let reached_tip = step.chain_tip == sync_block_num;
159
160 state_sync_update.block_num = sync_block_num;
161 state_sync_steps.push(step);
162
163 if reached_tip {
164 break;
165 }
166 }
167
168 // TODO: fetch_public_note_details should take an iterator or btreeset down to the RPC call
169 // (this would be a breaking change so it should be done separately)
170 let public_note_ids: Vec<NoteId> = state_sync_steps
171 .iter()
172 .flat_map(|s| s.note_inclusions.iter())
173 .filter(|n| !n.metadata().is_private())
174 .map(|n| *n.note_id())
175 .collect();
176
177 let public_note_records = self.fetch_public_note_details(&public_note_ids).await?;
178
179 // Apply local changes. These involve updating the MMR and applying state transitions
180 // to notes based on the received information.
181 info!("Applying state transitions locally.");
182
183 for sync_step in state_sync_steps {
184 let StateSyncInfo {
185 chain_tip,
186 block_header,
187 mmr_delta,
188 account_commitment_updates,
189 note_inclusions,
190 transactions,
191 } = sync_step;
192
193 self.account_state_sync(
194 &mut state_sync_update.account_updates,
195 &accounts,
196 &account_commitment_updates,
197 )
198 .await?;
199
200 self.transaction_state_sync(
201 &mut state_sync_update.transaction_updates,
202 &block_header,
203 &transactions,
204 );
205
206 let found_relevant_note = self
207 .note_state_sync(
208 &mut state_sync_update.note_updates,
209 note_inclusions,
210 &block_header,
211 &public_note_records,
212 )
213 .await?;
214
215 let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
216 &block_header,
217 found_relevant_note,
218 &mut current_partial_mmr,
219 mmr_delta,
220 )?;
221
222 let include_block = found_relevant_note || chain_tip == block_header.block_num();
223 if include_block {
224 state_sync_update.block_updates.insert(
225 block_header,
226 found_relevant_note,
227 new_mmr_peaks,
228 new_authentication_nodes,
229 );
230 } else {
231 // Even though this block header is not stored, `apply_mmr_changes` may
232 // produce authentication nodes for already-tracked leaves whose Merkle
233 // paths change as the MMR grows. These must be persisted so that the
234 // `PartialMmr` can be correctly reconstructed from the store after a
235 // client restart.
236 state_sync_update
237 .block_updates
238 .extend_authentication_nodes(new_authentication_nodes);
239 }
240 }
241 info!("Syncing nullifiers.");
242
243 self.sync_nullifiers(&mut state_sync_update, block_num).await?;
244
245 Ok(state_sync_update)
246 }
247
248 /// Executes a single sync request and returns the node response if the chain advanced.
249 ///
250 /// This method issues a `/SyncState` call starting from `current_block_num`. If the node
251 /// reports the same block number, `None` is returned, signalling that the client is already at
252 /// the requested height. Otherwise the full [`StateSyncInfo`] is returned for deferred
253 /// processing by the caller.
254 async fn sync_state_step(
255 &self,
256 current_block_num: BlockNumber,
257 account_ids: &[AccountId],
258 note_tags: &Arc<BTreeSet<NoteTag>>,
259 ) -> Result<Option<StateSyncInfo>, ClientError> {
260 info!("Performing sync state step.");
261 let response = self
262 .rpc_api
263 .sync_state(current_block_num, account_ids, note_tags.as_ref())
264 .await?;
265
266 // We don't need to continue if the chain has not advanced, there are no new changes
267 if response.block_header.block_num() == current_block_num {
268 return Ok(None);
269 }
270
271 Ok(Some(response))
272 }
273
274 // HELPERS
275 // --------------------------------------------------------------------------------------------
276
277 /// Compares the state of tracked accounts with the updates received from the node. The method
278 /// updates the `state_sync_update` field with the details of the accounts that need to be
279 /// updated.
280 ///
281 /// The account updates might include:
282 /// * Public accounts that have been updated in the node.
283 /// * Network accounts that have been updated in the node and are being tracked by the client.
284 /// * Private accounts that have been marked as mismatched because the current commitment
285 /// doesn't match the one received from the node. The client will need to handle these cases
286 /// as they could be a stale account state or a reason to lock the account.
287 async fn account_state_sync(
288 &self,
289 account_updates: &mut AccountUpdates,
290 accounts: &[AccountHeader],
291 account_commitment_updates: &[(AccountId, Word)],
292 ) -> Result<(), ClientError> {
293 let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
294 accounts.iter().partition(|account_header| !account_header.id().is_private());
295
296 let updated_public_accounts = self
297 .get_updated_public_accounts(account_commitment_updates, &public_accounts)
298 .await?;
299
300 let mismatched_private_accounts = account_commitment_updates
301 .iter()
302 .filter(|(account_id, digest)| {
303 private_accounts
304 .iter()
305 .any(|account| account.id() == *account_id && &account.commitment() != digest)
306 })
307 .copied()
308 .collect::<Vec<_>>();
309
310 account_updates
311 .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
312
313 Ok(())
314 }
315
316 /// Queries the node for the latest state of the public accounts that don't match the current
317 /// state of the client.
318 async fn get_updated_public_accounts(
319 &self,
320 account_updates: &[(AccountId, Word)],
321 current_public_accounts: &[&AccountHeader],
322 ) -> Result<Vec<Account>, ClientError> {
323 let mut mismatched_public_accounts = vec![];
324
325 for (id, commitment) in account_updates {
326 // check if this updated account state is tracked by the client
327 if let Some(account) = current_public_accounts
328 .iter()
329 .find(|acc| *id == acc.id() && *commitment != acc.commitment())
330 {
331 mismatched_public_accounts.push(*account);
332 }
333 }
334
335 self.rpc_api
336 .get_updated_public_accounts(&mismatched_public_accounts)
337 .await
338 .map_err(ClientError::RpcError)
339 }
340
341 /// Applies the changes received from the sync response to the notes and transactions tracked
342 /// by the client and updates the `note_updates` accordingly.
343 ///
344 /// This method uses the callbacks provided to the [`StateSync`] component to check if the
345 /// updates received are relevant to the client.
346 ///
347 /// The note updates might include:
348 /// * New notes that we received from the node and might be relevant to the client.
349 /// * Tracked expected notes that were committed in the block.
350 /// * Tracked notes that were being processed by a transaction that got committed.
351 /// * Tracked notes that were nullified by an external transaction.
352 ///
353 /// The `public_notes` parameter provides cached public note details for the current sync
354 /// iteration so the node is only queried once per batch.
355 async fn note_state_sync(
356 &self,
357 note_updates: &mut NoteUpdateTracker,
358 note_inclusions: Vec<CommittedNote>,
359 block_header: &BlockHeader,
360 public_notes: &BTreeMap<NoteId, InputNoteRecord>,
361 ) -> Result<bool, ClientError> {
362 // `found_relevant_note` tracks whether we want to persist the block header in the end
363 let mut found_relevant_note = false;
364
365 for committed_note in note_inclusions {
366 let public_note = (!committed_note.metadata().is_private())
367 .then(|| public_notes.get(committed_note.note_id()))
368 .flatten()
369 .cloned();
370
371 match self.note_screener.on_note_received(committed_note, public_note).await? {
372 NoteUpdateAction::Commit(committed_note) => {
373 // Only mark the downloaded block header as relevant if we are talking about
374 // an input note (output notes get marked as committed but we don't need the
375 // block for anything there)
376 found_relevant_note |= note_updates
377 .apply_committed_note_state_transitions(&committed_note, block_header)?;
378 },
379 NoteUpdateAction::Insert(public_note) => {
380 found_relevant_note = true;
381
382 note_updates.apply_new_public_note(public_note, block_header)?;
383 },
384 NoteUpdateAction::Discard => {},
385 }
386 }
387
388 Ok(found_relevant_note)
389 }
390
391 /// Queries the node for all received notes that aren't being locally tracked in the client.
392 ///
393 /// The client can receive metadata for private notes that it's not tracking. In this case,
394 /// notes are ignored for now as they become useless until details are imported.
395 async fn fetch_public_note_details(
396 &self,
397 query_notes: &[NoteId],
398 ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
399 if query_notes.is_empty() {
400 return Ok(BTreeMap::new());
401 }
402
403 info!("Getting note details for notes that are not being tracked.");
404
405 let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
406
407 Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
408 }
409
410 /// Collects the nullifier tags for the notes that were updated in the sync response and uses
411 /// the `sync_nullifiers` endpoint to check if there are new nullifiers for these
412 /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
413 ///
414 /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
415 async fn sync_nullifiers(
416 &self,
417 state_sync_update: &mut StateSyncUpdate,
418 current_block_num: BlockNumber,
419 ) -> Result<(), ClientError> {
420 // To receive information about added nullifiers, we reduce them to the higher 16 bits
421 // Note that besides filtering by nullifier prefixes, the node also filters by block number
422 // (it only returns nullifiers from current_block_num until
423 // response.block_header.block_num())
424
425 // Check for new nullifiers for input notes that were updated
426 let nullifiers_tags: Vec<u16> = state_sync_update
427 .note_updates
428 .unspent_nullifiers()
429 .map(|nullifier| nullifier.prefix())
430 .collect();
431
432 let mut new_nullifiers = self
433 .rpc_api
434 .sync_nullifiers(&nullifiers_tags, current_block_num, Some(state_sync_update.block_num))
435 .await?;
436
437 // Discard nullifiers that are newer than the current block (this might happen if the block
438 // changes between the sync_state and the check_nullifier calls)
439 new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num);
440
441 for nullifier_update in new_nullifiers {
442 state_sync_update.note_updates.apply_nullifiers_state_transitions(
443 &nullifier_update,
444 state_sync_update.transaction_updates.committed_transactions(),
445 )?;
446
447 // Process nullifiers and track the updates of local tracked transactions that were
448 // discarded because the notes that they were processing were nullified by an
449 // another transaction.
450 state_sync_update
451 .transaction_updates
452 .apply_input_note_nullified(nullifier_update.nullifier);
453 }
454
455 Ok(())
456 }
457
458 /// Applies the changes received from the sync response to the transactions tracked by the
459 /// client and updates the `transaction_updates` accordingly.
460 ///
461 /// The transaction updates might include:
462 /// * New transactions that were committed in the block.
463 /// * Transactions that were discarded because they were stale or expired.
464 fn transaction_state_sync(
465 &self,
466 transaction_updates: &mut TransactionUpdateTracker,
467 new_block_header: &BlockHeader,
468 transaction_inclusions: &[TransactionInclusion],
469 ) {
470 for transaction_inclusion in transaction_inclusions {
471 transaction_updates.apply_transaction_inclusion(
472 transaction_inclusion,
473 u64::from(new_block_header.timestamp()),
474 ); //TODO: Change timestamps from u64 to u32
475 }
476
477 transaction_updates
478 .apply_sync_height_update(new_block_header.block_num(), self.tx_graceful_blocks);
479 }
480}
481
482// HELPERS
483// ================================================================================================
484
485/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
486/// authentication nodes for leaves we track.
487fn apply_mmr_changes(
488 new_block: &BlockHeader,
489 new_block_has_relevant_notes: bool,
490 current_partial_mmr: &mut PartialMmr,
491 mmr_delta: MmrDelta,
492) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
493 // Apply the MMR delta to bring MMR to forest equal to chain tip
494 let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
495 current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
496
497 let new_peaks = current_partial_mmr.peaks();
498
499 new_authentication_nodes
500 .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
501
502 Ok((new_peaks, new_authentication_nodes))
503}