Skip to main content

miden_client/sync/
mod.rs

//! Provides the client APIs for synchronizing the client's local state with the Miden
//! network. It ensures that the client maintains a valid, up-to-date view of the chain.
//!
//! ## Overview
//!
//! This module handles the synchronization process between the local client and the Miden network.
//! The sync operation involves:
//!
//! - Querying the Miden node for state updates using tracked account IDs, note tags, and nullifier
//!   prefixes.
//! - Processing the received data to update note inclusion proofs, reconcile note state (new,
//!   committed, or consumed), and update account states.
//! - Incorporating new block headers and updating the local Merkle Mountain Range (MMR) with new
//!   peaks and authentication nodes.
//! - Aggregating transaction updates to determine which transactions have been committed or
//!   discarded.
//!
//! The result of the synchronization process is captured in a [`SyncSummary`], which provides
//! the new block number along with lists of new public and private note IDs, committed and
//! consumed note IDs, updated and locked account IDs, and committed transaction IDs.
//!
//! Once the data is requested and retrieved, updates are persisted in the client's store.
//!
//! ## Examples
//!
//! The following example shows how to initiate a state sync and handle the resulting summary:
//!
//! ```rust
//! # use miden_client::auth::TransactionAuthenticator;
//! # use miden_client::sync::SyncSummary;
//! # use miden_client::{Client, ClientError};
//! # use miden_protocol::{block::BlockHeader, Felt, Word};
//! # use miden_protocol::crypto::rand::FeltRng;
//! # async fn run_sync<AUTH: TransactionAuthenticator + Sync + 'static>(client: &mut Client<AUTH>) -> Result<(), ClientError> {
//! // Attempt to synchronize the client's state with the Miden network.
//! // The requested data is based on the client's state: it gets updates for accounts, relevant
//! // notes, etc. For more information on the data that gets requested, see the doc comments for
//! // `sync_state()`.
//! let sync_summary: SyncSummary = client.sync_state().await?;
//!
//! println!("Synced up to block number: {}", sync_summary.block_num);
//! println!("New private notes: {}", sync_summary.new_private_notes.len());
//! println!("Committed notes: {}", sync_summary.committed_notes.len());
//! println!("Consumed notes: {}", sync_summary.consumed_notes.len());
//! println!("Updated accounts: {}", sync_summary.updated_accounts.len());
//! println!("Locked accounts: {}", sync_summary.locked_accounts.len());
//! println!("Committed transactions: {}", sync_summary.committed_transactions.len());
//!
//! Ok(())
//! # }
//! ```
//!
//! The `sync_state` method loops internally until the client is fully synced to the network tip.
//!
//! For more advanced usage, refer to the individual functions (such as
//! `committed_note_updates` and `consumed_note_updates`) to understand how the sync data is
//! processed and applied to the local store.
58
59use alloc::collections::BTreeSet;
60use alloc::sync::Arc;
61use alloc::vec::Vec;
62use core::cmp::max;
63
64use miden_protocol::account::AccountId;
65use miden_protocol::block::BlockNumber;
66use miden_protocol::note::NoteId;
67use miden_protocol::transaction::TransactionId;
68use miden_tx::auth::TransactionAuthenticator;
69use miden_tx::utils::serde::{Deserializable, DeserializationError, Serializable};
70use tracing::{debug, info};
71
72use crate::pswap::PswapChainObserver;
73use crate::store::{NoteFilter, TransactionFilter};
74use crate::{Client, ClientError};
75mod block_header;
76
77mod tag;
78pub use tag::{NoteTagRecord, NoteTagSource};
79
80mod note_observer;
81pub use note_observer::NoteObserver;
82
83mod state_sync;
84pub use state_sync::{NoteUpdateAction, OnNoteReceived, StateSync, StateSyncInput};
85
86mod state_sync_update;
87pub use state_sync_update::{
88    AccountUpdates,
89    PartialBlockchainUpdates,
90    PublicAccountDelta,
91    PublicAccountUpdate,
92    StateSyncUpdate,
93    TransactionUpdateTracker,
94};
95
96/// Client synchronization methods.
97impl<AUTH> Client<AUTH>
98where
99    AUTH: TransactionAuthenticator + Sync + 'static,
100{
101    // SYNC STATE
102    // --------------------------------------------------------------------------------------------
103
104    /// Returns the block number of the last state sync block.
105    pub async fn get_sync_height(&self) -> Result<BlockNumber, ClientError> {
106        self.store.get_sync_height().await.map_err(Into::into)
107    }
108
109    /// Syncs the client's on-chain state with the current state of the Miden network and returns
110    /// a [`SyncSummary`] corresponding to the local state update.
111    ///
112    /// Does **not** fetch private notes from the Note Transport Layer. Use
113    /// [`Client::sync_state`] for the combined sync, or call [`Client::sync_note_transport`]
114    /// separately.
115    ///
116    /// Builds the default sync input, runs [`StateSync::sync_state`] (see that method for the
117    /// detailed pipeline), applies the resulting update to the store, caches the partial MMR, and
118    /// prunes irrelevant blocks according to the configured cadence.
119    pub async fn sync_chain(&mut self) -> Result<SyncSummary, ClientError> {
120        self.ensure_genesis_in_place().await?;
121        self.ensure_rpc_limits_in_place().await?;
122
123        // Each `NoteObserver` owns its own per-sync state; `with_note_observer` just attaches.
124        let note_screener = self.note_screener();
125        let state_sync =
126            StateSync::new(self.rpc_api.clone(), Arc::new(note_screener), self.tx_discard_delta)
127                .with_note_observer(Arc::new(PswapChainObserver::new(self.store.clone())));
128        let input = self.build_sync_input().await?;
129
130        let mut partial_mmr = self.get_current_partial_mmr().await?;
131
132        // Get the sync update from the network
133        let state_sync_update = state_sync.sync_state(&mut partial_mmr, input).await?;
134
135        let sync_summary: SyncSummary = (&state_sync_update).into();
136        debug!(sync_summary = ?sync_summary, "Sync summary computed");
137
138        // Post-sync observer hooks; run before persisting. Per-observer errors are logged, not
139        // propagated.
140        state_sync.run_apply_hooks(&state_sync_update).await?;
141
142        info!("Applying changes to the store.");
143
144        // Apply received and computed updates to the store
145        self.store
146            .apply_state_sync(state_sync_update)
147            .await
148            .map_err(ClientError::StoreError)?;
149
150        // Cache MMR so pruning can reuse in-memory MMR.
151        self.cache_partial_mmr(partial_mmr).await?;
152
153        self.maybe_untrack_and_prune_irrelevant_blocks().await?;
154
155        Ok(sync_summary)
156    }
157
158    /// Fetches private notes from the Note Transport Layer for the tracked note tags.
159    ///
160    /// Returns the IDs of notes imported in this call. No-op (returns an empty vec) if note
161    /// transport is disabled.
162    pub async fn sync_note_transport(&mut self) -> Result<Vec<NoteId>, ClientError> {
163        if !self.is_note_transport_enabled() {
164            return Ok(Vec::new());
165        }
166
167        // Drain any private notes whose previous relay attempt failed. A flush
168        // error is logged, not propagated: a failing relay must not block the
169        // sync, and the entries stay durable for the next attempt.
170        if let Err(err) = self.flush_relay_outbox().await {
171            tracing::warn!(?err, "relay outbox flush failed during sync; entries retained");
172        }
173
174        let cursor = self.store.get_note_transport_cursor().await?;
175        let note_tags: Vec<_> = self.store.get_unique_note_tags().await?.into_iter().collect();
176        let (ids, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
177        self.store.update_note_transport_cursor(new_cursor).await?;
178        Ok(ids)
179    }
180
181    /// Runs the full client sync.
182    ///
183    /// First fetches private notes from the Note Transport Layer (see
184    /// [`Client::sync_note_transport`]), then syncs the client's on-chain state with the Miden
185    /// node (see [`Client::sync_chain`]). If note transport is disabled, this is equivalent to
186    /// [`Client::sync_chain`].
187    ///
188    /// Fails fast on the first error. Private notes delivered via NTL are imported before the
189    /// chain sync reads its input set, so their nullifiers are checked in the same call.
190    pub async fn sync_state(&mut self) -> Result<SyncSummary, ClientError> {
191        let new_private_notes = self.sync_note_transport().await?;
192        let mut summary = self.sync_chain().await?;
193        summary.new_private_notes = new_private_notes;
194        Ok(summary)
195    }
196
197    /// Builds a default [`StateSyncInput`] from the current client state.
198    ///
199    /// This includes all tracked account headers, all unique note tags, all unspent input and
200    /// output notes, and all uncommitted transactions.
201    pub async fn build_sync_input(&self) -> Result<StateSyncInput, ClientError> {
202        let accounts = self
203            .store
204            .get_account_headers()
205            .await?
206            .into_iter()
207            .map(|(header, _status)| header)
208            .collect();
209
210        let note_tags = self.store.get_unique_note_tags().await?;
211
212        let input_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
213        let output_notes = self.store.get_output_notes(NoteFilter::Unspent).await?;
214
215        let uncommitted_transactions =
216            self.store.get_transactions(TransactionFilter::Uncommitted).await?;
217
218        Ok(StateSyncInput {
219            accounts,
220            note_tags,
221            input_notes,
222            output_notes,
223            uncommitted_transactions,
224        })
225    }
226
227    /// Applies the state sync update to the store and prunes irrelevant blocks according to the
228    /// configured cadence.
229    ///
230    /// See [`crate::Store::apply_state_sync()`] for what the update implies.
231    pub async fn apply_state_sync(&mut self, update: StateSyncUpdate) -> Result<(), ClientError> {
232        self.store.apply_state_sync(update).await?;
233
234        self.maybe_untrack_and_prune_irrelevant_blocks().await?;
235
236        Ok(())
237    }
238
239    /// Prunes irrelevant blocks and their MMR authentication nodes according to the configured
240    /// cadence.
241    async fn maybe_untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
242        let Some(interval) = self.irrelevant_block_prune_interval else {
243            return Ok(());
244        };
245
246        let sync_height = self.store.get_sync_height().await?;
247
248        if let Some(last_prune_height) = self.last_irrelevant_block_prune_sync_height
249            && sync_height < last_prune_height + interval
250        {
251            return Ok(());
252        }
253
254        self.untrack_and_prune_irrelevant_blocks().await?;
255        self.last_irrelevant_block_prune_sync_height = Some(sync_height);
256
257        Ok(())
258    }
259
260    /// Prunes irrelevant block data from the store.
261    ///
262    /// Identifies tracked blocks whose input notes have all been consumed, untracks them from the
263    /// `PartialMmr` to determine which authentication nodes are no longer needed, then delegates
264    /// to [`Store::untrack_and_prune_irrelevant_blocks`] to atomically remove the stale nodes,
265    /// mark the blocks as irrelevant, and delete irrelevant block headers.
266    /// Any caller of this function should've cached the `PartialMmr` beforehand.
267    async fn untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
268        let tracked_blocks = self.store.get_tracked_block_header_numbers().await?;
269        let to_untrack: Vec<usize> = if tracked_blocks.is_empty() {
270            // Do not early-return: even without blocks to untrack, old irrelevant tip headers may
271            // need pruning.
272            Vec::new()
273        } else {
274            // Blocks that still have at least one unspent note need to stay tracked.
275            let unspent_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
276            let live_blocks: BTreeSet<usize> = unspent_notes
277                .iter()
278                .filter_map(|n| n.inclusion_proof().map(|p| p.location().block_num().as_usize()))
279                .collect();
280
281            tracked_blocks.difference(&live_blocks).copied().collect()
282        };
283
284        let mut blocks_to_untrack = Vec::new();
285        let mut nodes_to_remove = Vec::new();
286        let mut updated_partial_mmr = None;
287
288        if !to_untrack.is_empty() {
289            // Rebuild the PartialMmr and untrack each block to collect the authentication node
290            // indices that are no longer needed by any remaining tracked leaf.
291            let mut partial_mmr = self.get_current_partial_mmr().await?;
292            for &block_pos in &to_untrack {
293                nodes_to_remove
294                    .extend(partial_mmr.untrack(block_pos).into_iter().map(|(idx, _)| idx));
295            }
296
297            blocks_to_untrack = to_untrack
298                .iter()
299                .map(|&b| BlockNumber::from(u32::try_from(b).expect("block number fits in u32")))
300                .collect();
301            updated_partial_mmr = Some(partial_mmr);
302        }
303
304        // Store deletes stale auth nodes, marks blocks as irrelevant, and removes irrelevant
305        // block headers. Old irrelevant tip headers may still need pruning.
306        self.store
307            .untrack_and_prune_irrelevant_blocks(&blocks_to_untrack, &nodes_to_remove)
308            .await?;
309
310        if let Some(partial_mmr) = updated_partial_mmr {
311            self.cache_partial_mmr(partial_mmr).await?;
312        }
313
314        Ok(())
315    }
316
317    /// Ensures that the RPC limits are set in the RPC client. If not already cached,
318    /// fetches them from the node and persists them in the store.
319    pub async fn ensure_rpc_limits_in_place(&mut self) -> Result<(), ClientError> {
320        if self.rpc_api.has_rpc_limits().is_some() {
321            return Ok(());
322        }
323
324        let limits = self.rpc_api.get_rpc_limits().await?;
325        self.store.set_rpc_limits(limits).await?;
326        Ok(())
327    }
328}
329
330// SYNC SUMMARY
331// ================================================================================================
332
333/// Contains stats about the sync operation.
334#[derive(Debug, PartialEq)]
335pub struct SyncSummary {
336    /// Block number up to which the client has been synced.
337    pub block_num: BlockNumber,
338    /// IDs of new public notes that the client has received.
339    pub new_public_notes: Vec<NoteId>,
340    /// IDs of private notes imported from the Note Transport Layer in this sync. They are still
341    /// `Expected` until observed on-chain.
342    ///
343    /// Only populated by [`Client::sync_state`]; [`Client::sync_chain`] always leaves this empty
344    /// because it does not touch the Note Transport Layer.
345    pub new_private_notes: Vec<NoteId>,
346    /// IDs of tracked notes that have been committed.
347    pub committed_notes: Vec<NoteId>,
348    /// IDs of notes that have been consumed.
349    pub consumed_notes: Vec<NoteId>,
350    /// IDs of on-chain accounts that have been updated.
351    pub updated_accounts: Vec<AccountId>,
352    /// IDs of private accounts that have been locked.
353    pub locked_accounts: Vec<AccountId>,
354    /// IDs of committed transactions.
355    pub committed_transactions: Vec<TransactionId>,
356}
357
358impl SyncSummary {
359    pub fn new(
360        block_num: BlockNumber,
361        new_public_notes: Vec<NoteId>,
362        new_private_notes: Vec<NoteId>,
363        committed_notes: Vec<NoteId>,
364        consumed_notes: Vec<NoteId>,
365        updated_accounts: Vec<AccountId>,
366        locked_accounts: Vec<AccountId>,
367        committed_transactions: Vec<TransactionId>,
368    ) -> Self {
369        Self {
370            block_num,
371            new_public_notes,
372            new_private_notes,
373            committed_notes,
374            consumed_notes,
375            updated_accounts,
376            locked_accounts,
377            committed_transactions,
378        }
379    }
380
381    pub fn new_empty(block_num: BlockNumber) -> Self {
382        Self {
383            block_num,
384            new_public_notes: vec![],
385            new_private_notes: vec![],
386            committed_notes: vec![],
387            consumed_notes: vec![],
388            updated_accounts: vec![],
389            locked_accounts: vec![],
390            committed_transactions: vec![],
391        }
392    }
393
394    pub fn is_empty(&self) -> bool {
395        self.new_public_notes.is_empty()
396            && self.new_private_notes.is_empty()
397            && self.committed_notes.is_empty()
398            && self.consumed_notes.is_empty()
399            && self.updated_accounts.is_empty()
400            && self.locked_accounts.is_empty()
401            && self.committed_transactions.is_empty()
402    }
403
404    pub fn combine_with(&mut self, mut other: Self) {
405        self.block_num = max(self.block_num, other.block_num);
406        self.new_public_notes.append(&mut other.new_public_notes);
407        self.new_private_notes.append(&mut other.new_private_notes);
408        self.committed_notes.append(&mut other.committed_notes);
409        self.consumed_notes.append(&mut other.consumed_notes);
410        self.updated_accounts.append(&mut other.updated_accounts);
411        self.locked_accounts.append(&mut other.locked_accounts);
412        self.committed_transactions.append(&mut other.committed_transactions);
413    }
414}
415
416impl Serializable for SyncSummary {
417    fn write_into<W: miden_tx::utils::serde::ByteWriter>(&self, target: &mut W) {
418        self.block_num.write_into(target);
419        self.new_public_notes.write_into(target);
420        self.new_private_notes.write_into(target);
421        self.committed_notes.write_into(target);
422        self.consumed_notes.write_into(target);
423        self.updated_accounts.write_into(target);
424        self.locked_accounts.write_into(target);
425        self.committed_transactions.write_into(target);
426    }
427}
428
429impl Deserializable for SyncSummary {
430    fn read_from<R: miden_tx::utils::serde::ByteReader>(
431        source: &mut R,
432    ) -> Result<Self, DeserializationError> {
433        let block_num = BlockNumber::read_from(source)?;
434        let new_public_notes = Vec::<NoteId>::read_from(source)?;
435        let new_private_notes = Vec::<NoteId>::read_from(source)?;
436        let committed_notes = Vec::<NoteId>::read_from(source)?;
437        let consumed_notes = Vec::<NoteId>::read_from(source)?;
438        let updated_accounts = Vec::<AccountId>::read_from(source)?;
439        let locked_accounts = Vec::<AccountId>::read_from(source)?;
440        let committed_transactions = Vec::<TransactionId>::read_from(source)?;
441
442        Ok(Self {
443            block_num,
444            new_public_notes,
445            new_private_notes,
446            committed_notes,
447            consumed_notes,
448            updated_accounts,
449            locked_accounts,
450            committed_transactions,
451        })
452    }
453}