1use alloc::{collections::BTreeMap, vec::Vec};
58use core::cmp::max;
59
60use miden_objects::{
61 Digest,
62 account::{Account, AccountHeader, AccountId},
63 block::{BlockHeader, BlockNumber},
64 note::{NoteId, NoteInclusionProof, NoteTag, Nullifier},
65 transaction::TransactionId,
66};
67use miden_tx::utils::{Deserializable, DeserializationError, Serializable};
68use tracing::info;
69
70use crate::{
71 Client, ClientError,
72 note::NoteUpdates,
73 rpc::domain::{
74 note::CommittedNote, nullifier::NullifierUpdate, transaction::TransactionUpdate,
75 },
76 store::{AccountUpdates, InputNoteRecord, NoteFilter, OutputNoteRecord, TransactionFilter},
77 transaction::{TransactionRecord, TransactionStatus, TransactionUpdates},
78};
79
80mod block_header;
81pub(crate) use block_header::MAX_BLOCK_NUMBER_DELTA;
82use block_header::apply_mmr_changes;
83
84mod tag;
85pub use tag::{NoteTagRecord, NoteTagSource};
86
87pub const TX_GRACEFUL_BLOCKS: u32 = 20;
89mod state_sync_update;
90pub use state_sync_update::StateSyncUpdate;
91
92#[derive(Debug, PartialEq)]
94pub struct SyncSummary {
95 pub block_num: BlockNumber,
97 pub committed_notes: Vec<NoteId>,
99 pub consumed_notes: Vec<NoteId>,
101 pub updated_accounts: Vec<AccountId>,
103 pub locked_accounts: Vec<AccountId>,
105 pub committed_transactions: Vec<TransactionId>,
107}
108
109impl SyncSummary {
110 pub fn new(
111 block_num: BlockNumber,
112 committed_notes: Vec<NoteId>,
113 consumed_notes: Vec<NoteId>,
114 updated_accounts: Vec<AccountId>,
115 locked_accounts: Vec<AccountId>,
116 committed_transactions: Vec<TransactionId>,
117 ) -> Self {
118 Self {
119 block_num,
120 committed_notes,
121 consumed_notes,
122 updated_accounts,
123 locked_accounts,
124 committed_transactions,
125 }
126 }
127
128 pub fn new_empty(block_num: BlockNumber) -> Self {
129 Self {
130 block_num,
131 committed_notes: vec![],
132 consumed_notes: vec![],
133 updated_accounts: vec![],
134 locked_accounts: vec![],
135 committed_transactions: vec![],
136 }
137 }
138
139 pub fn is_empty(&self) -> bool {
140 self.committed_notes.is_empty()
141 && self.consumed_notes.is_empty()
142 && self.updated_accounts.is_empty()
143 && self.locked_accounts.is_empty()
144 && self.committed_transactions.is_empty()
145 }
146
147 pub fn combine_with(&mut self, mut other: Self) {
148 self.block_num = max(self.block_num, other.block_num);
149 self.committed_notes.append(&mut other.committed_notes);
150 self.consumed_notes.append(&mut other.consumed_notes);
151 self.updated_accounts.append(&mut other.updated_accounts);
152 self.locked_accounts.append(&mut other.locked_accounts);
153 self.committed_transactions.append(&mut other.committed_transactions);
154 }
155}
156
157impl Serializable for SyncSummary {
158 fn write_into<W: miden_tx::utils::ByteWriter>(&self, target: &mut W) {
159 self.block_num.write_into(target);
160 self.committed_notes.write_into(target);
161 self.consumed_notes.write_into(target);
162 self.updated_accounts.write_into(target);
163 self.locked_accounts.write_into(target);
164 self.committed_transactions.write_into(target);
165 }
166}
167
168impl Deserializable for SyncSummary {
169 fn read_from<R: miden_tx::utils::ByteReader>(
170 source: &mut R,
171 ) -> Result<Self, DeserializationError> {
172 let block_num = BlockNumber::read_from(source)?;
173 let committed_notes = Vec::<NoteId>::read_from(source)?;
174 let consumed_notes = Vec::<NoteId>::read_from(source)?;
175 let updated_accounts = Vec::<AccountId>::read_from(source)?;
176 let locked_accounts = Vec::<AccountId>::read_from(source)?;
177 let committed_transactions = Vec::<TransactionId>::read_from(source)?;
178
179 Ok(Self {
180 block_num,
181 committed_notes,
182 consumed_notes,
183 updated_accounts,
184 locked_accounts,
185 committed_transactions,
186 })
187 }
188}
189
190enum SyncStatus {
191 SyncedToLastBlock(SyncSummary),
192 SyncedToBlock(SyncSummary),
193}
194
195impl SyncStatus {
196 pub fn into_sync_summary(self) -> SyncSummary {
197 match self {
198 SyncStatus::SyncedToBlock(summary) | SyncStatus::SyncedToLastBlock(summary) => summary,
199 }
200 }
201}
202
203impl Client {
208 pub async fn get_sync_height(&self) -> Result<BlockNumber, ClientError> {
213 self.store.get_sync_height().await.map_err(Into::into)
214 }
215
216 pub async fn sync_state(&mut self) -> Result<SyncSummary, ClientError> {
236 let starting_block_num = self.get_sync_height().await?;
237
238 _ = self.ensure_genesis_in_place().await?;
239 let mut total_sync_summary = SyncSummary::new_empty(0.into());
240 loop {
241 let response = self.sync_state_once().await?;
242 let is_last_block = matches!(response, SyncStatus::SyncedToLastBlock(_));
243 total_sync_summary.combine_with(response.into_sync_summary());
244
245 if is_last_block {
246 break;
247 }
248 }
249 self.update_mmr_data().await?;
250 total_sync_summary.combine_with(self.sync_nullifiers(starting_block_num).await?);
252
253 Ok(total_sync_summary)
254 }
255
256 async fn sync_state_once(&mut self) -> Result<SyncStatus, ClientError> {
257 let current_block_num = self.store.get_sync_height().await?;
258
259 let accounts: Vec<AccountHeader> = self
260 .store
261 .get_account_headers()
262 .await?
263 .into_iter()
264 .map(|(acc_header, _)| acc_header)
265 .collect();
266
267 let note_tags: Vec<NoteTag> =
268 self.store.get_unique_note_tags().await?.into_iter().collect();
269
270 let account_ids: Vec<AccountId> = accounts.iter().map(AccountHeader::id).collect();
272 let response = self.rpc_api.sync_state(current_block_num, &account_ids, ¬e_tags).await?;
273
274 if response.block_header.block_num() == current_block_num {
276 return Ok(SyncStatus::SyncedToLastBlock(SyncSummary::new_empty(current_block_num)));
277 }
278
279 let (note_updates, tags_to_remove) = self
280 .committed_note_updates(response.note_inclusions, &response.block_header)
281 .await?;
282
283 let incoming_block_has_relevant_notes = self.check_block_relevance(¬e_updates).await?;
284
285 let transactions_to_commit = self.get_transactions_to_commit(response.transactions).await?;
286
287 let (public_accounts, private_accounts): (Vec<_>, Vec<_>) =
288 accounts.into_iter().partition(|account_header| account_header.id().is_public());
289
290 let updated_public_accounts = self
291 .get_updated_public_accounts(&response.account_commitment_updates, &public_accounts)
292 .await?;
293
294 let mismatched_private_accounts = self
295 .validate_local_account_commitments(
296 &response.account_commitment_updates,
297 &private_accounts,
298 )
299 .await?;
300
301 let (new_peaks, new_authentication_nodes) = {
303 let current_partial_mmr = self.build_current_partial_mmr(false).await?;
304
305 let (current_block, has_relevant_notes) = self
306 .store
307 .get_block_header_by_num(current_block_num)
308 .await?
309 .expect("Current block should be in the store");
310
311 apply_mmr_changes(
312 current_partial_mmr,
313 response.mmr_delta,
314 ¤t_block,
315 has_relevant_notes,
316 )?
317 };
318
319 let sync_summary = SyncSummary::new(
321 response.block_header.block_num(),
322 note_updates.committed_note_ids().into_iter().collect(),
323 note_updates.consumed_note_ids().into_iter().collect(),
324 updated_public_accounts.iter().map(Account::id).collect(),
325 mismatched_private_accounts.iter().map(|(acc_id, _)| *acc_id).collect(),
326 transactions_to_commit.iter().map(|tx| tx.transaction_id).collect(),
327 );
328 let response_block_num = response.block_header.block_num();
329
330 let transactions_to_discard = vec![];
331
332 let graceful_block_num =
334 response_block_num.checked_sub(TX_GRACEFUL_BLOCKS).unwrap_or_default();
335 let mut stale_transactions: Vec<TransactionRecord> = self
337 .store
338 .get_transactions(TransactionFilter::ExpiredBefore(graceful_block_num))
339 .await?;
340
341 stale_transactions.retain(|tx| {
342 !transactions_to_commit
343 .iter()
344 .map(|tx| tx.transaction_id)
345 .collect::<Vec<_>>()
346 .contains(&tx.id)
347 && !transactions_to_discard.contains(&tx.id)
348 });
349
350 let state_sync_update = StateSyncUpdate {
351 block_header: response.block_header,
352 block_has_relevant_notes: incoming_block_has_relevant_notes,
353 new_mmr_peaks: new_peaks,
354 new_authentication_nodes,
355 note_updates,
356 transaction_updates: TransactionUpdates::new(
357 transactions_to_commit,
358 transactions_to_discard,
359 stale_transactions,
360 ),
361 account_updates: AccountUpdates::new(
362 updated_public_accounts,
363 mismatched_private_accounts,
364 ),
365 tags_to_remove,
366 };
367
368 self.store
370 .apply_state_sync(state_sync_update)
371 .await
372 .map_err(ClientError::StoreError)?;
373
374 if response.chain_tip == response_block_num {
375 Ok(SyncStatus::SyncedToLastBlock(sync_summary))
376 } else {
377 Ok(SyncStatus::SyncedToBlock(sync_summary))
378 }
379 }
380
381 async fn sync_nullifiers(
385 &mut self,
386 starting_block_num: BlockNumber,
387 ) -> Result<SyncSummary, ClientError> {
388 let nullifiers_tags: Vec<u16> = self
393 .store
394 .get_unspent_input_note_nullifiers()
395 .await?
396 .iter()
397 .map(Nullifier::prefix)
398 .collect();
399
400 let mut nullifiers = self
401 .rpc_api
402 .check_nullifiers_by_prefix(&nullifiers_tags, starting_block_num)
403 .await?;
404
405 let current_block_num = self.get_sync_height().await?;
408 nullifiers.retain(|update| update.block_num <= current_block_num.as_u32());
409
410 let committed_transactions = self
412 .store
413 .get_transactions(TransactionFilter::All)
414 .await?
415 .into_iter()
416 .filter_map(|tx| {
417 if let TransactionStatus::Committed(block_num) = tx.transaction_status {
418 Some(TransactionUpdate {
419 transaction_id: tx.id,
420 account_id: tx.account_id,
421 block_num: block_num.as_u32(),
422 })
423 } else {
424 None
425 }
426 })
427 .collect::<Vec<_>>();
428
429 let (consumed_note_updates, transactions_to_discard) =
430 self.consumed_note_updates(&nullifiers, &committed_transactions).await?;
431
432 let sync_summary = SyncSummary::new(
434 0.into(),
435 consumed_note_updates.committed_note_ids().into_iter().collect(),
436 consumed_note_updates.consumed_note_ids().into_iter().collect(),
437 vec![],
438 vec![],
439 committed_transactions.iter().map(|tx| tx.transaction_id).collect(),
440 );
441
442 self.store
444 .apply_nullifiers(consumed_note_updates, transactions_to_discard)
445 .await
446 .map_err(ClientError::StoreError)?;
447
448 Ok(sync_summary)
449 }
450
451 async fn committed_note_updates(
454 &mut self,
455 committed_notes: Vec<CommittedNote>,
456 block_header: &BlockHeader,
457 ) -> Result<(NoteUpdates, Vec<NoteTagRecord>), ClientError> {
458 let relevant_note_filter =
462 NoteFilter::List(committed_notes.iter().map(CommittedNote::note_id).copied().collect());
463
464 let mut committed_input_notes: BTreeMap<NoteId, InputNoteRecord> = self
465 .store
466 .get_input_notes(relevant_note_filter.clone())
467 .await?
468 .into_iter()
469 .map(|n| (n.id(), n))
470 .collect();
471
472 let mut committed_output_notes: BTreeMap<NoteId, OutputNoteRecord> = self
473 .store
474 .get_output_notes(relevant_note_filter)
475 .await?
476 .into_iter()
477 .map(|n| (n.id(), n))
478 .collect();
479
480 let mut new_public_notes = vec![];
481 let mut committed_tracked_input_notes = vec![];
482 let mut committed_tracked_output_notes = vec![];
483 let mut removed_tags = vec![];
484
485 for committed_note in committed_notes {
486 let inclusion_proof = NoteInclusionProof::new(
487 block_header.block_num(),
488 committed_note.note_index(),
489 committed_note.merkle_path().clone(),
490 )?;
491
492 if let Some(mut note_record) = committed_input_notes.remove(committed_note.note_id()) {
493 let inclusion_proof_received = note_record
496 .inclusion_proof_received(inclusion_proof.clone(), committed_note.metadata())?;
497 let block_header_received = note_record.block_header_received(block_header)?;
498
499 removed_tags.push((¬e_record).try_into()?);
500
501 if inclusion_proof_received || block_header_received {
502 committed_tracked_input_notes.push(note_record);
503 }
504 }
505
506 if let Some(mut note_record) = committed_output_notes.remove(committed_note.note_id()) {
507 if note_record.inclusion_proof_received(inclusion_proof.clone())? {
510 committed_tracked_output_notes.push(note_record);
511 }
512 }
513
514 if !committed_input_notes.contains_key(committed_note.note_id())
515 && !committed_output_notes.contains_key(committed_note.note_id())
516 {
517 new_public_notes.push(*committed_note.note_id());
519 }
520 }
521
522 let new_public_notes =
524 self.fetch_public_note_details(&new_public_notes, block_header).await?;
525
526 Ok((
527 NoteUpdates::new(
528 [new_public_notes, committed_tracked_input_notes].concat(),
529 committed_tracked_output_notes,
530 ),
531 removed_tags,
532 ))
533 }
534
535 async fn consumed_note_updates(
538 &mut self,
539 nullifiers: &[NullifierUpdate],
540 committed_transactions: &[TransactionUpdate],
541 ) -> Result<(NoteUpdates, Vec<TransactionId>), ClientError> {
542 let nullifier_filter = NoteFilter::Nullifiers(
543 nullifiers.iter().map(|nullifier_update| nullifier_update.nullifier).collect(),
544 );
545
546 let mut consumed_input_notes: BTreeMap<Nullifier, InputNoteRecord> = self
547 .store
548 .get_input_notes(nullifier_filter.clone())
549 .await?
550 .into_iter()
551 .map(|n| (n.nullifier(), n))
552 .collect();
553
554 let mut consumed_output_notes: BTreeMap<Nullifier, OutputNoteRecord> = self
555 .store
556 .get_output_notes(nullifier_filter)
557 .await?
558 .into_iter()
559 .map(|n| {
560 (
561 n.nullifier()
562 .expect("Output notes returned by this query should have nullifiers"),
563 n,
564 )
565 })
566 .collect();
567
568 let mut consumed_tracked_input_notes = vec![];
569 let mut consumed_tracked_output_notes = vec![];
570
571 for transaction_update in committed_transactions {
572 let transaction_nullifiers: Vec<Nullifier> = consumed_input_notes
573 .iter()
574 .filter_map(|(nullifier, note_record)| {
575 if note_record.is_processing()
576 && note_record.consumer_transaction_id()
577 == Some(&transaction_update.transaction_id)
578 {
579 Some(nullifier)
580 } else {
581 None
582 }
583 })
584 .copied()
585 .collect();
586
587 for nullifier in transaction_nullifiers {
588 if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) {
589 if input_note_record.transaction_committed(
590 transaction_update.transaction_id,
591 transaction_update.block_num,
592 )? {
593 consumed_tracked_input_notes.push(input_note_record);
594 }
595 }
596 }
597 }
598
599 let mut discarded_transactions = vec![];
601 for nullifier_update in nullifiers {
602 let nullifier = nullifier_update.nullifier;
603 let block_num = nullifier_update.block_num;
604
605 if let Some(mut input_note_record) = consumed_input_notes.remove(&nullifier) {
606 if input_note_record.is_processing() {
607 discarded_transactions.push(
608 *input_note_record
609 .consumer_transaction_id()
610 .expect("Processing note should have consumer transaction id"),
611 );
612 }
613
614 if input_note_record.consumed_externally(nullifier, block_num)? {
615 consumed_tracked_input_notes.push(input_note_record);
616 }
617 }
618
619 if let Some(mut output_note_record) = consumed_output_notes.remove(&nullifier) {
620 if output_note_record.nullifier_received(nullifier, block_num)? {
621 consumed_tracked_output_notes.push(output_note_record);
622 }
623 }
624 }
625
626 Ok((
627 NoteUpdates::new(consumed_tracked_input_notes, consumed_tracked_output_notes),
628 discarded_transactions,
629 ))
630 }
631
632 async fn fetch_public_note_details(
637 &mut self,
638 query_notes: &[NoteId],
639 block_header: &BlockHeader,
640 ) -> Result<Vec<InputNoteRecord>, ClientError> {
641 if query_notes.is_empty() {
642 return Ok(vec![]);
643 }
644 info!("Getting note details for notes that are not being tracked.");
645
646 let mut return_notes = self
647 .rpc_api
648 .get_public_note_records(query_notes, self.store.get_current_timestamp())
649 .await?;
650
651 for note in &mut return_notes {
652 note.block_header_received(block_header)?;
653 }
654
655 Ok(return_notes)
656 }
657
658 async fn get_transactions_to_commit(
661 &self,
662 mut transactions: Vec<TransactionUpdate>,
663 ) -> Result<Vec<TransactionUpdate>, ClientError> {
664 let uncommitted_transaction_ids = self
666 .store
667 .get_transactions(TransactionFilter::Uncomitted)
668 .await?
669 .into_iter()
670 .map(|tx| tx.id)
671 .collect::<Vec<_>>();
672
673 transactions.retain(|transaction_update| {
674 uncommitted_transaction_ids.contains(&transaction_update.transaction_id)
675 });
676
677 Ok(transactions)
678 }
679
680 async fn get_updated_public_accounts(
681 &mut self,
682 account_updates: &[(AccountId, Digest)],
683 current_public_accounts: &[AccountHeader],
684 ) -> Result<Vec<Account>, ClientError> {
685 let mut mismatched_public_accounts = vec![];
686
687 for (id, commitment) in account_updates {
688 if let Some(account) = current_public_accounts
690 .iter()
691 .find(|acc| *id == acc.id() && *commitment != acc.commitment())
692 {
693 mismatched_public_accounts.push(account);
694 }
695 }
696
697 self.rpc_api
698 .get_updated_public_accounts(&mismatched_public_accounts)
699 .await
700 .map_err(ClientError::RpcError)
701 }
702
703 async fn validate_local_account_commitments(
713 &mut self,
714 account_updates: &[(AccountId, Digest)],
715 current_private_accounts: &[AccountHeader],
716 ) -> Result<Vec<(AccountId, Digest)>, ClientError> {
717 let mut mismatched_accounts = vec![];
718
719 for (remote_account_id, remote_account_commitment) in account_updates {
720 let mismatched_account = current_private_accounts.iter().find(|acc| {
722 *remote_account_id == acc.id() && *remote_account_commitment != acc.commitment()
723 });
724
725 if mismatched_account.is_some() {
728 let account_by_commitment =
729 self.store.get_account_header_by_commitment(*remote_account_commitment).await?;
730
731 if account_by_commitment.is_none() {
732 mismatched_accounts.push((*remote_account_id, *remote_account_commitment));
733 }
734 }
735 }
736 Ok(mismatched_accounts)
737 }
738}