1use alloc::collections::BTreeSet;
60use alloc::sync::Arc;
61use alloc::vec::Vec;
62use core::cmp::max;
63
64use miden_protocol::account::AccountId;
65use miden_protocol::block::BlockNumber;
66use miden_protocol::note::NoteId;
67use miden_protocol::transaction::TransactionId;
68use miden_tx::auth::TransactionAuthenticator;
69use miden_tx::utils::serde::{Deserializable, DeserializationError, Serializable};
70use tracing::{debug, info};
71
72use crate::store::{NoteFilter, TransactionFilter};
73use crate::{Client, ClientError};
74mod block_header;
75
76mod tag;
77pub use tag::{NoteTagRecord, NoteTagSource};
78
79mod state_sync;
80pub use state_sync::{NoteUpdateAction, OnNoteReceived, StateSync, StateSyncInput};
81
82mod state_sync_update;
83pub use state_sync_update::{
84 AccountUpdates,
85 PartialBlockchainUpdates,
86 PublicAccountDelta,
87 PublicAccountUpdate,
88 StateSyncUpdate,
89 TransactionUpdateTracker,
90};
91
92impl<AUTH> Client<AUTH>
94where
95 AUTH: TransactionAuthenticator + Sync + 'static,
96{
97 pub async fn get_sync_height(&self) -> Result<BlockNumber, ClientError> {
102 self.store.get_sync_height().await.map_err(Into::into)
103 }
104
105 pub async fn sync_chain(&mut self) -> Result<SyncSummary, ClientError> {
116 self.ensure_genesis_in_place().await?;
117 self.ensure_rpc_limits_in_place().await?;
118
119 let note_screener = self.note_screener();
121 let state_sync =
122 StateSync::new(self.rpc_api.clone(), Arc::new(note_screener), self.tx_discard_delta);
123 let input = self.build_sync_input().await?;
124
125 let mut partial_mmr = self.get_current_partial_mmr().await?;
126
127 let state_sync_update = state_sync.sync_state(&mut partial_mmr, input).await?;
129
130 let sync_summary: SyncSummary = (&state_sync_update).into();
131 debug!(sync_summary = ?sync_summary, "Sync summary computed");
132 info!("Applying changes to the store.");
133
134 self.store
136 .apply_state_sync(state_sync_update)
137 .await
138 .map_err(ClientError::StoreError)?;
139
140 self.cache_partial_mmr(partial_mmr).await?;
142
143 self.maybe_untrack_and_prune_irrelevant_blocks().await?;
144
145 Ok(sync_summary)
146 }
147
148 pub async fn sync_note_transport(&mut self) -> Result<Vec<NoteId>, ClientError> {
153 if !self.is_note_transport_enabled() {
154 return Ok(Vec::new());
155 }
156
157 if let Err(err) = self.flush_relay_outbox().await {
161 tracing::warn!(?err, "relay outbox flush failed during sync; entries retained");
162 }
163
164 let cursor = self.store.get_note_transport_cursor().await?;
165 let note_tags: Vec<_> = self.store.get_unique_note_tags().await?.into_iter().collect();
166 let (ids, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
167 self.store.update_note_transport_cursor(new_cursor).await?;
168 Ok(ids)
169 }
170
171 pub async fn sync_state(&mut self) -> Result<SyncSummary, ClientError> {
181 let new_private_notes = self.sync_note_transport().await?;
182 let mut summary = self.sync_chain().await?;
183 summary.new_private_notes = new_private_notes;
184 Ok(summary)
185 }
186
187 pub async fn build_sync_input(&self) -> Result<StateSyncInput, ClientError> {
192 let accounts = self
193 .store
194 .get_account_headers()
195 .await?
196 .into_iter()
197 .map(|(header, _status)| header)
198 .collect();
199
200 let note_tags = self.store.get_unique_note_tags().await?;
201
202 let input_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
203 let output_notes = self.store.get_output_notes(NoteFilter::Unspent).await?;
204
205 let uncommitted_transactions =
206 self.store.get_transactions(TransactionFilter::Uncommitted).await?;
207
208 Ok(StateSyncInput {
209 accounts,
210 note_tags,
211 input_notes,
212 output_notes,
213 uncommitted_transactions,
214 })
215 }
216
217 pub async fn apply_state_sync(&mut self, update: StateSyncUpdate) -> Result<(), ClientError> {
222 self.store.apply_state_sync(update).await?;
223
224 self.maybe_untrack_and_prune_irrelevant_blocks().await?;
225
226 Ok(())
227 }
228
229 async fn maybe_untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
232 let Some(interval) = self.irrelevant_block_prune_interval else {
233 return Ok(());
234 };
235
236 let sync_height = self.store.get_sync_height().await?;
237
238 if let Some(last_prune_height) = self.last_irrelevant_block_prune_sync_height
239 && sync_height < last_prune_height + interval
240 {
241 return Ok(());
242 }
243
244 self.untrack_and_prune_irrelevant_blocks().await?;
245 self.last_irrelevant_block_prune_sync_height = Some(sync_height);
246
247 Ok(())
248 }
249
250 async fn untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
258 let tracked_blocks = self.store.get_tracked_block_header_numbers().await?;
259 let to_untrack: Vec<usize> = if tracked_blocks.is_empty() {
260 Vec::new()
263 } else {
264 let unspent_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
266 let live_blocks: BTreeSet<usize> = unspent_notes
267 .iter()
268 .filter_map(|n| n.inclusion_proof().map(|p| p.location().block_num().as_usize()))
269 .collect();
270
271 tracked_blocks.difference(&live_blocks).copied().collect()
272 };
273
274 let mut blocks_to_untrack = Vec::new();
275 let mut nodes_to_remove = Vec::new();
276 let mut updated_partial_mmr = None;
277
278 if !to_untrack.is_empty() {
279 let mut partial_mmr = self.get_current_partial_mmr().await?;
282 for &block_pos in &to_untrack {
283 nodes_to_remove
284 .extend(partial_mmr.untrack(block_pos).into_iter().map(|(idx, _)| idx));
285 }
286
287 blocks_to_untrack = to_untrack
288 .iter()
289 .map(|&b| BlockNumber::from(u32::try_from(b).expect("block number fits in u32")))
290 .collect();
291 updated_partial_mmr = Some(partial_mmr);
292 }
293
294 self.store
297 .untrack_and_prune_irrelevant_blocks(&blocks_to_untrack, &nodes_to_remove)
298 .await?;
299
300 if let Some(partial_mmr) = updated_partial_mmr {
301 self.cache_partial_mmr(partial_mmr).await?;
302 }
303
304 Ok(())
305 }
306
307 pub async fn ensure_rpc_limits_in_place(&mut self) -> Result<(), ClientError> {
310 if self.rpc_api.has_rpc_limits().is_some() {
311 return Ok(());
312 }
313
314 let limits = self.rpc_api.get_rpc_limits().await?;
315 self.store.set_rpc_limits(limits).await?;
316 Ok(())
317 }
318}
319
320#[derive(Debug, PartialEq)]
325pub struct SyncSummary {
326 pub block_num: BlockNumber,
328 pub new_public_notes: Vec<NoteId>,
330 pub new_private_notes: Vec<NoteId>,
336 pub committed_notes: Vec<NoteId>,
338 pub consumed_notes: Vec<NoteId>,
340 pub updated_accounts: Vec<AccountId>,
342 pub locked_accounts: Vec<AccountId>,
344 pub committed_transactions: Vec<TransactionId>,
346}
347
348impl SyncSummary {
349 pub fn new(
350 block_num: BlockNumber,
351 new_public_notes: Vec<NoteId>,
352 new_private_notes: Vec<NoteId>,
353 committed_notes: Vec<NoteId>,
354 consumed_notes: Vec<NoteId>,
355 updated_accounts: Vec<AccountId>,
356 locked_accounts: Vec<AccountId>,
357 committed_transactions: Vec<TransactionId>,
358 ) -> Self {
359 Self {
360 block_num,
361 new_public_notes,
362 new_private_notes,
363 committed_notes,
364 consumed_notes,
365 updated_accounts,
366 locked_accounts,
367 committed_transactions,
368 }
369 }
370
371 pub fn new_empty(block_num: BlockNumber) -> Self {
372 Self {
373 block_num,
374 new_public_notes: vec![],
375 new_private_notes: vec![],
376 committed_notes: vec![],
377 consumed_notes: vec![],
378 updated_accounts: vec![],
379 locked_accounts: vec![],
380 committed_transactions: vec![],
381 }
382 }
383
384 pub fn is_empty(&self) -> bool {
385 self.new_public_notes.is_empty()
386 && self.new_private_notes.is_empty()
387 && self.committed_notes.is_empty()
388 && self.consumed_notes.is_empty()
389 && self.updated_accounts.is_empty()
390 && self.locked_accounts.is_empty()
391 && self.committed_transactions.is_empty()
392 }
393
394 pub fn combine_with(&mut self, mut other: Self) {
395 self.block_num = max(self.block_num, other.block_num);
396 self.new_public_notes.append(&mut other.new_public_notes);
397 self.new_private_notes.append(&mut other.new_private_notes);
398 self.committed_notes.append(&mut other.committed_notes);
399 self.consumed_notes.append(&mut other.consumed_notes);
400 self.updated_accounts.append(&mut other.updated_accounts);
401 self.locked_accounts.append(&mut other.locked_accounts);
402 self.committed_transactions.append(&mut other.committed_transactions);
403 }
404}
405
406impl Serializable for SyncSummary {
407 fn write_into<W: miden_tx::utils::serde::ByteWriter>(&self, target: &mut W) {
408 self.block_num.write_into(target);
409 self.new_public_notes.write_into(target);
410 self.new_private_notes.write_into(target);
411 self.committed_notes.write_into(target);
412 self.consumed_notes.write_into(target);
413 self.updated_accounts.write_into(target);
414 self.locked_accounts.write_into(target);
415 self.committed_transactions.write_into(target);
416 }
417}
418
419impl Deserializable for SyncSummary {
420 fn read_from<R: miden_tx::utils::serde::ByteReader>(
421 source: &mut R,
422 ) -> Result<Self, DeserializationError> {
423 let block_num = BlockNumber::read_from(source)?;
424 let new_public_notes = Vec::<NoteId>::read_from(source)?;
425 let new_private_notes = Vec::<NoteId>::read_from(source)?;
426 let committed_notes = Vec::<NoteId>::read_from(source)?;
427 let consumed_notes = Vec::<NoteId>::read_from(source)?;
428 let updated_accounts = Vec::<AccountId>::read_from(source)?;
429 let locked_accounts = Vec::<AccountId>::read_from(source)?;
430 let committed_transactions = Vec::<TransactionId>::read_from(source)?;
431
432 Ok(Self {
433 block_num,
434 new_public_notes,
435 new_private_notes,
436 committed_notes,
437 consumed_notes,
438 updated_accounts,
439 locked_accounts,
440 committed_transactions,
441 })
442 }
443}