miden_client/sync/state_sync.rs
1use alloc::boxed::Box;
2use alloc::collections::{BTreeMap, BTreeSet};
3use alloc::sync::Arc;
4use alloc::vec::Vec;
5
6use miden_objects::Word;
7use miden_objects::account::{Account, AccountHeader, AccountId};
8use miden_objects::block::{BlockHeader, BlockNumber};
9use miden_objects::crypto::merkle::{InOrderIndex, MmrDelta, MmrPeaks, PartialMmr};
10use miden_objects::note::{NoteId, NoteTag};
11use tonic::async_trait;
12use tracing::info;
13
14use super::state_sync_update::TransactionUpdateTracker;
15use super::{AccountUpdates, BlockUpdates, StateSyncUpdate};
16use crate::ClientError;
17use crate::note::NoteUpdateTracker;
18use crate::rpc::NodeRpcClient;
19use crate::rpc::domain::note::CommittedNote;
20use crate::rpc::domain::transaction::TransactionInclusion;
21use crate::store::{InputNoteRecord, OutputNoteRecord, StoreError};
22use crate::transaction::TransactionRecord;
23
24// SYNC CALLBACKS
25// ================================================================================================
26
27/// The action to be taken when a note update is received as part of the sync response.
28#[allow(clippy::large_enum_variant)]
29pub enum NoteUpdateAction {
30 /// The note commit update is relevant and the specified note should be marked as committed in
31 /// the store, storing its inclusion proof.
32 Commit(CommittedNote),
33 /// The public note is relevant and should be inserted into the store.
34 Insert(InputNoteRecord),
35 /// The note update is not relevant and should be discarded.
36 Discard,
37}
38
39#[async_trait(?Send)]
40pub trait OnNoteReceived {
41 /// Callback that gets executed when a new note is received as part of the sync response.
42 ///
43 /// It receives:
44 ///
45 /// - The committed note received from the network.
46 /// - An optional note record that corresponds to the state of the note in the network (only if
47 /// the note is public).
48 ///
49 /// It returns an enum indicating the action to be taken for the received note update. Whether
50 /// the note updated should be committed, new public note inserted, or ignored.
51 async fn on_note_received(
52 &self,
53 committed_note: CommittedNote,
54 public_note: Option<InputNoteRecord>,
55 ) -> Result<NoteUpdateAction, ClientError>;
56}
57
58// STATE SYNC
59// ================================================================================================
60
61/// The state sync components encompasses the client's sync logic. It is then used to request
62/// updates from the node and apply them to the relevant elements. The updates are then returned and
63/// can be applied to the store to persist the changes.
64///
65/// When created it receives a callback that will be executed when a new note inclusion is received
66/// in the sync response.
67pub struct StateSync {
68 /// The RPC client used to communicate with the node.
69 rpc_api: Arc<dyn NodeRpcClient + Send>,
70 /// Responsible for checking the relevance of notes and executing the
71 /// [`OnNoteReceived`] callback when a new note inclusion is received.
72 note_screener: Arc<dyn OnNoteReceived>,
73 /// The number of blocks that are considered old enough to discard pending transactions. If
74 /// `None`, there is no limit and transactions will be kept indefinitely.
75 tx_graceful_blocks: Option<u32>,
76}
77
78impl StateSync {
79 /// Creates a new instance of the state sync component.
80 ///
81 /// # Arguments
82 ///
83 /// * `rpc_api` - The RPC client used to communicate with the node.
84 /// * `on_note_received` - A callback to be executed when a new note inclusion is received.
85 /// * `tx_graceful_blocks` - The number of blocks that are considered old enough to discard.
86 /// * `note_screener` - The note screener used to check the relevance of notes.
87 pub fn new(
88 rpc_api: Arc<dyn NodeRpcClient + Send>,
89 note_screener: Arc<dyn OnNoteReceived>,
90 tx_graceful_blocks: Option<u32>,
91 ) -> Self {
92 Self {
93 rpc_api,
94 note_screener,
95 tx_graceful_blocks,
96 }
97 }
98
99 /// Syncs the state of the client with the chain tip of the node, returning the updates that
100 /// should be applied to the store.
101 ///
102 /// During the sync process, the client will go through the following steps:
103 /// 1. A request is sent to the node to get the state updates. This request includes tracked
104 /// account IDs and the tags of notes that might have changed or that might be of interest to
105 /// the client.
106 /// 2. A response is received with the current state of the network. The response includes
107 /// information about new and committed notes, updated accounts, and committed transactions.
108 /// 3. Tracked public accounts are updated and private accounts are validated against the node
109 /// state.
110 /// 4. Tracked notes are updated with their new states. Notes might be committed or nullified
111 /// during the sync processing.
112 /// 5. New notes are checked, and only relevant ones are stored. Relevance is determined by the
113 /// [`OnNoteReceived`] callback.
114 /// 6. Transactions are updated with their new states. Transactions might be committed or
115 /// discarded.
116 /// 7. The MMR is updated with the new peaks and authentication nodes.
117 ///
118 /// # Arguments
119 /// * `current_partial_blockchain` - The current partial view of the blockchain.
120 /// * `accounts` - All the headers of tracked accounts.
121 /// * `note_tags` - The note tags to be used in the sync state request.
122 /// * `unspent_input_notes` - The current state of unspent input notes tracked by the client.
123 /// * `unspent_output_notes` - The current state of unspent output notes tracked by the client.
124 pub async fn sync_state(
125 self,
126 mut current_partial_mmr: PartialMmr,
127 accounts: Vec<AccountHeader>,
128 note_tags: BTreeSet<NoteTag>,
129 unspent_input_notes: Vec<InputNoteRecord>,
130 unspent_output_notes: Vec<OutputNoteRecord>,
131 uncommitted_transactions: Vec<TransactionRecord>,
132 ) -> Result<StateSyncUpdate, ClientError> {
133 let block_num = u32::try_from(
134 current_partial_mmr.forest().num_leaves().checked_sub(1).unwrap_or_default(),
135 )
136 .map_err(|_| ClientError::InvalidPartialMmrForest)?
137 .into();
138
139 let mut state_sync_update = StateSyncUpdate {
140 block_num,
141 note_updates: NoteUpdateTracker::new(unspent_input_notes, unspent_output_notes),
142 transaction_updates: TransactionUpdateTracker::new(uncommitted_transactions),
143 ..Default::default()
144 };
145
146 let note_tags = Arc::new(note_tags);
147 loop {
148 if !self
149 .sync_state_step(
150 &mut state_sync_update,
151 &mut current_partial_mmr,
152 &accounts,
153 note_tags.clone(),
154 )
155 .await?
156 {
157 break;
158 }
159 }
160
161 self.sync_nullifiers(&mut state_sync_update, block_num).await?;
162
163 Ok(state_sync_update)
164 }
165
166 /// Executes a single step of the state sync process, returning `true` if the client should
167 /// continue syncing and `false` if the client has reached the chain tip.
168 ///
169 /// A step in this context means a single request to the node to get the next relevant block and
170 /// the changes that happened in it. This block may not be the last one in the chain and
171 /// the client may need to call this method multiple times until it reaches the chain tip.
172 ///
173 /// The `sync_state_update` field of the struct will be updated with the new changes from this
174 /// step.
175 ///
176 /// This function returns whether the state sync process must continue, depending on whether
177 /// the chain tip was reached already.
178 async fn sync_state_step(
179 &self,
180 state_sync_update: &mut StateSyncUpdate,
181 current_partial_mmr: &mut PartialMmr,
182 accounts: &[AccountHeader],
183 note_tags: Arc<BTreeSet<NoteTag>>,
184 ) -> Result<bool, ClientError> {
185 let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
186
187 let response = self
188 .rpc_api
189 .sync_state(state_sync_update.block_num, &account_ids, note_tags.as_ref())
190 .await?;
191
192 // We don't need to continue if the chain has not advanced, there are no new changes
193 if response.block_header.block_num() == state_sync_update.block_num {
194 return Ok(false);
195 }
196
197 let new_block_num = response.block_header.block_num();
198 state_sync_update.block_num = new_block_num;
199
200 self.account_state_sync(
201 &mut state_sync_update.account_updates,
202 accounts,
203 &response.account_commitment_updates,
204 )
205 .await?;
206
207 self.transaction_state_sync(
208 &mut state_sync_update.transaction_updates,
209 &response.block_header,
210 &response.transactions,
211 );
212
213 let found_relevant_note = self
214 .note_state_sync(
215 &mut state_sync_update.note_updates,
216 response.note_inclusions,
217 &response.block_header,
218 )
219 .await?;
220
221 let (new_mmr_peaks, new_authentication_nodes) = apply_mmr_changes(
222 &response.block_header,
223 found_relevant_note,
224 current_partial_mmr,
225 response.mmr_delta,
226 )?;
227
228 let mut new_blocks = vec![];
229 if found_relevant_note || response.chain_tip == new_block_num {
230 // Only track relevant blocks or the chain tip
231 new_blocks.push((response.block_header, found_relevant_note, new_mmr_peaks));
232 }
233
234 state_sync_update
235 .block_updates
236 .extend(BlockUpdates::new(new_blocks, new_authentication_nodes));
237
238 if response.chain_tip == new_block_num {
239 Ok(false)
240 } else {
241 Ok(true)
242 }
243 }
244
245 // HELPERS
246 // --------------------------------------------------------------------------------------------
247
248 /// Compares the state of tracked accounts with the updates received from the node. The method
249 /// updates the `state_sync_update` field with the details of the accounts that need to be
250 /// updated.
251 ///
252 /// The account updates might include:
253 /// * Public accounts that have been updated in the node.
254 /// * Network accounts that have been updated in the node and are being tracked by the client.
255 /// * Private accounts that have been marked as mismatched because the current commitment
256 /// doesn't match the one received from the node. The client will need to handle these cases
257 /// as they could be a stale account state or a reason to lock the account.
258 async fn account_state_sync(
259 &self,
260 account_updates: &mut AccountUpdates,
261 accounts: &[AccountHeader],
262 account_commitment_updates: &[(AccountId, Word)],
263 ) -> Result<(), ClientError> {
264 let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
265 accounts.iter().partition(|account_header| !account_header.id().is_private());
266
267 let updated_public_accounts = self
268 .get_updated_public_accounts(account_commitment_updates, &public_accounts)
269 .await?;
270
271 let mismatched_private_accounts = account_commitment_updates
272 .iter()
273 .filter(|(account_id, digest)| {
274 private_accounts
275 .iter()
276 .any(|account| account.id() == *account_id && &account.commitment() != digest)
277 })
278 .copied()
279 .collect::<Vec<_>>();
280
281 account_updates
282 .extend(AccountUpdates::new(updated_public_accounts, mismatched_private_accounts));
283
284 Ok(())
285 }
286
287 /// Queries the node for the latest state of the public accounts that don't match the current
288 /// state of the client.
289 async fn get_updated_public_accounts(
290 &self,
291 account_updates: &[(AccountId, Word)],
292 current_public_accounts: &[&AccountHeader],
293 ) -> Result<Vec<Account>, ClientError> {
294 let mut mismatched_public_accounts = vec![];
295
296 for (id, commitment) in account_updates {
297 // check if this updated account state is tracked by the client
298 if let Some(account) = current_public_accounts
299 .iter()
300 .find(|acc| *id == acc.id() && *commitment != acc.commitment())
301 {
302 mismatched_public_accounts.push(*account);
303 }
304 }
305
306 self.rpc_api
307 .get_updated_public_accounts(&mismatched_public_accounts)
308 .await
309 .map_err(ClientError::RpcError)
310 }
311
312 /// Applies the changes received from the sync response to the notes and transactions tracked
313 /// by the client and updates the `note_updates` accordingly.
314 ///
315 /// This method uses the callbacks provided to the [`StateSync`] component to check if the
316 /// updates received are relevant to the client.
317 ///
318 /// The note updates might include:
319 /// * New notes that we received from the node and might be relevant to the client.
320 /// * Tracked expected notes that were committed in the block.
321 /// * Tracked notes that were being processed by a transaction that got committed.
322 /// * Tracked notes that were nullified by an external transaction.
323 async fn note_state_sync(
324 &self,
325 note_updates: &mut NoteUpdateTracker,
326 note_inclusions: Vec<CommittedNote>,
327 block_header: &BlockHeader,
328 ) -> Result<bool, ClientError> {
329 let public_note_ids: Vec<NoteId> = note_inclusions
330 .iter()
331 .filter_map(|note| (!note.metadata().is_private()).then_some(*note.note_id()))
332 .collect();
333
334 let mut found_relevant_note = false;
335
336 // Process note inclusions
337 let new_public_notes = self.fetch_public_note_details(&public_note_ids).await?;
338 for committed_note in note_inclusions {
339 let public_note = new_public_notes.get(committed_note.note_id()).cloned();
340
341 match self.note_screener.on_note_received(committed_note, public_note).await? {
342 NoteUpdateAction::Commit(committed_note) => {
343 found_relevant_note = true;
344
345 note_updates
346 .apply_committed_note_state_transitions(&committed_note, block_header)?;
347 },
348 NoteUpdateAction::Insert(public_note) => {
349 found_relevant_note = true;
350
351 note_updates.apply_new_public_note(public_note, block_header)?;
352 },
353 NoteUpdateAction::Discard => {},
354 }
355 }
356
357 Ok(found_relevant_note)
358 }
359
360 /// Queries the node for all received notes that aren't being locally tracked in the client.
361 ///
362 /// The client can receive metadata for private notes that it's not tracking. In this case,
363 /// notes are ignored for now as they become useless until details are imported.
364 async fn fetch_public_note_details(
365 &self,
366 query_notes: &[NoteId],
367 ) -> Result<BTreeMap<NoteId, InputNoteRecord>, ClientError> {
368 if query_notes.is_empty() {
369 return Ok(BTreeMap::new());
370 }
371 info!("Getting note details for notes that are not being tracked.");
372
373 let return_notes = self.rpc_api.get_public_note_records(query_notes, None).await?;
374
375 Ok(return_notes.into_iter().map(|note| (note.id(), note)).collect())
376 }
377
378 /// Collects the nullifier tags for the notes that were updated in the sync response and uses
379 /// the `check_nullifiers_by_prefix` endpoint to check if there are new nullifiers for these
380 /// notes. It then processes the nullifiers to apply the state transitions on the note updates.
381 ///
382 /// The `state_sync_update` parameter will be updated to track the new discarded transactions.
383 async fn sync_nullifiers(
384 &self,
385 state_sync_update: &mut StateSyncUpdate,
386 current_block_num: BlockNumber,
387 ) -> Result<(), ClientError> {
388 // To receive information about added nullifiers, we reduce them to the higher 16 bits
389 // Note that besides filtering by nullifier prefixes, the node also filters by block number
390 // (it only returns nullifiers from current_block_num until
391 // response.block_header.block_num())
392
393 // Check for new nullifiers for input notes that were updated
394 let nullifiers_tags: Vec<u16> = state_sync_update
395 .note_updates
396 .unspent_nullifiers()
397 .map(|nullifier| nullifier.prefix())
398 .collect();
399
400 let mut new_nullifiers = self
401 .rpc_api
402 .check_nullifiers_by_prefix(&nullifiers_tags, current_block_num)
403 .await?;
404
405 // Discard nullifiers that are newer than the current block (this might happen if the block
406 // changes between the sync_state and the check_nullifier calls)
407 new_nullifiers.retain(|update| update.block_num <= state_sync_update.block_num.as_u32());
408
409 for nullifier_update in new_nullifiers {
410 state_sync_update.note_updates.apply_nullifiers_state_transitions(
411 &nullifier_update,
412 state_sync_update.transaction_updates.committed_transactions(),
413 )?;
414
415 // Process nullifiers and track the updates of local tracked transactions that were
416 // discarded because the notes that they were processing were nullified by an
417 // another transaction.
418 state_sync_update
419 .transaction_updates
420 .apply_input_note_nullified(nullifier_update.nullifier);
421 }
422
423 Ok(())
424 }
425
426 /// Applies the changes received from the sync response to the transactions tracked by the
427 /// client and updates the `transaction_updates` accordingly.
428 ///
429 /// The transaction updates might include:
430 /// * New transactions that were committed in the block.
431 /// * Transactions that were discarded because they were stale or expired.
432 fn transaction_state_sync(
433 &self,
434 transaction_updates: &mut TransactionUpdateTracker,
435 new_block_header: &BlockHeader,
436 transaction_inclusions: &[TransactionInclusion],
437 ) {
438 for transaction_inclusion in transaction_inclusions {
439 transaction_updates.apply_transaction_inclusion(
440 transaction_inclusion,
441 u64::from(new_block_header.timestamp()),
442 ); //TODO: Change timestamps from u64 to u32
443 }
444
445 transaction_updates
446 .apply_sync_height_update(new_block_header.block_num(), self.tx_graceful_blocks);
447 }
448}
449
450// HELPERS
451// ================================================================================================
452
453/// Applies changes to the current MMR structure, returns the updated [`MmrPeaks`] and the
454/// authentication nodes for leaves we track.
455fn apply_mmr_changes(
456 new_block: &BlockHeader,
457 new_block_has_relevant_notes: bool,
458 current_partial_mmr: &mut PartialMmr,
459 mmr_delta: MmrDelta,
460) -> Result<(MmrPeaks, Vec<(InOrderIndex, Word)>), ClientError> {
461 // Apply the MMR delta to bring MMR to forest equal to chain tip
462 let mut new_authentication_nodes: Vec<(InOrderIndex, Word)> =
463 current_partial_mmr.apply(mmr_delta).map_err(StoreError::MmrError)?;
464
465 let new_peaks = current_partial_mmr.peaks();
466
467 new_authentication_nodes
468 .append(&mut current_partial_mmr.add(new_block.commitment(), new_block_has_relevant_notes));
469
470 Ok((new_peaks, new_authentication_nodes))
471}