1use alloc::collections::BTreeSet;
60use alloc::sync::Arc;
61use alloc::vec::Vec;
62use core::cmp::max;
63
64use miden_protocol::account::AccountId;
65use miden_protocol::block::BlockNumber;
66use miden_protocol::note::NoteId;
67use miden_protocol::transaction::TransactionId;
68use miden_tx::auth::TransactionAuthenticator;
69use miden_tx::utils::serde::{Deserializable, DeserializationError, Serializable};
70use tracing::{debug, info};
71
72use crate::pswap::PswapChainObserver;
73use crate::store::{NoteFilter, TransactionFilter};
74use crate::{Client, ClientError};
75mod block_header;
76
77mod tag;
78pub use tag::{NoteTagRecord, NoteTagSource};
79
80mod note_observer;
81pub use note_observer::NoteObserver;
82
83mod state_sync;
84pub use state_sync::{NoteUpdateAction, OnNoteReceived, StateSync, StateSyncInput};
85
86mod state_sync_update;
87pub use state_sync_update::{
88 AccountUpdates,
89 PartialBlockchainUpdates,
90 PublicAccountDelta,
91 PublicAccountUpdate,
92 StateSyncUpdate,
93 TransactionUpdateTracker,
94};
95
96impl<AUTH> Client<AUTH>
98where
99 AUTH: TransactionAuthenticator + Sync + 'static,
100{
101 pub async fn get_sync_height(&self) -> Result<BlockNumber, ClientError> {
106 self.store.get_sync_height().await.map_err(Into::into)
107 }
108
109 pub async fn sync_chain(&mut self) -> Result<SyncSummary, ClientError> {
120 self.ensure_genesis_in_place().await?;
121 self.ensure_rpc_limits_in_place().await?;
122
123 let note_screener = self.note_screener();
125 let state_sync =
126 StateSync::new(self.rpc_api.clone(), Arc::new(note_screener), self.tx_discard_delta)
127 .with_note_observer(Arc::new(PswapChainObserver::new(self.store.clone())));
128 let input = self.build_sync_input().await?;
129
130 let mut partial_mmr = self.get_current_partial_mmr().await?;
131
132 let state_sync_update = state_sync.sync_state(&mut partial_mmr, input).await?;
134
135 let sync_summary: SyncSummary = (&state_sync_update).into();
136 debug!(sync_summary = ?sync_summary, "Sync summary computed");
137
138 state_sync.run_apply_hooks(&state_sync_update).await?;
141
142 info!("Applying changes to the store.");
143
144 self.store
146 .apply_state_sync(state_sync_update)
147 .await
148 .map_err(ClientError::StoreError)?;
149
150 self.cache_partial_mmr(partial_mmr).await?;
152
153 self.maybe_untrack_and_prune_irrelevant_blocks().await?;
154
155 Ok(sync_summary)
156 }
157
158 pub async fn sync_note_transport(&mut self) -> Result<Vec<NoteId>, ClientError> {
163 if !self.is_note_transport_enabled() {
164 return Ok(Vec::new());
165 }
166
167 if let Err(err) = self.flush_relay_outbox().await {
171 tracing::warn!(?err, "relay outbox flush failed during sync; entries retained");
172 }
173
174 let cursor = self.store.get_note_transport_cursor().await?;
175 let note_tags: Vec<_> = self.store.get_unique_note_tags().await?.into_iter().collect();
176 let (ids, new_cursor) = self.fetch_transport_notes(cursor, ¬e_tags).await?;
177 self.store.update_note_transport_cursor(new_cursor).await?;
178 Ok(ids)
179 }
180
181 pub async fn sync_state(&mut self) -> Result<SyncSummary, ClientError> {
191 let new_private_notes = self.sync_note_transport().await?;
192 let mut summary = self.sync_chain().await?;
193 summary.new_private_notes = new_private_notes;
194 Ok(summary)
195 }
196
197 pub async fn build_sync_input(&self) -> Result<StateSyncInput, ClientError> {
202 let accounts = self
203 .store
204 .get_account_headers()
205 .await?
206 .into_iter()
207 .map(|(header, _status)| header)
208 .collect();
209
210 let note_tags = self.store.get_unique_note_tags().await?;
211
212 let input_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
213 let output_notes = self.store.get_output_notes(NoteFilter::Unspent).await?;
214
215 let uncommitted_transactions =
216 self.store.get_transactions(TransactionFilter::Uncommitted).await?;
217
218 Ok(StateSyncInput {
219 accounts,
220 note_tags,
221 input_notes,
222 output_notes,
223 uncommitted_transactions,
224 })
225 }
226
227 pub async fn apply_state_sync(&mut self, update: StateSyncUpdate) -> Result<(), ClientError> {
232 self.store.apply_state_sync(update).await?;
233
234 self.maybe_untrack_and_prune_irrelevant_blocks().await?;
235
236 Ok(())
237 }
238
239 async fn maybe_untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
242 let Some(interval) = self.irrelevant_block_prune_interval else {
243 return Ok(());
244 };
245
246 let sync_height = self.store.get_sync_height().await?;
247
248 if let Some(last_prune_height) = self.last_irrelevant_block_prune_sync_height
249 && sync_height < last_prune_height + interval
250 {
251 return Ok(());
252 }
253
254 self.untrack_and_prune_irrelevant_blocks().await?;
255 self.last_irrelevant_block_prune_sync_height = Some(sync_height);
256
257 Ok(())
258 }
259
260 async fn untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
268 let tracked_blocks = self.store.get_tracked_block_header_numbers().await?;
269 let to_untrack: Vec<usize> = if tracked_blocks.is_empty() {
270 Vec::new()
273 } else {
274 let unspent_notes = self.store.get_input_notes(NoteFilter::Unspent).await?;
276 let live_blocks: BTreeSet<usize> = unspent_notes
277 .iter()
278 .filter_map(|n| n.inclusion_proof().map(|p| p.location().block_num().as_usize()))
279 .collect();
280
281 tracked_blocks.difference(&live_blocks).copied().collect()
282 };
283
284 let mut blocks_to_untrack = Vec::new();
285 let mut nodes_to_remove = Vec::new();
286 let mut updated_partial_mmr = None;
287
288 if !to_untrack.is_empty() {
289 let mut partial_mmr = self.get_current_partial_mmr().await?;
292 for &block_pos in &to_untrack {
293 nodes_to_remove
294 .extend(partial_mmr.untrack(block_pos).into_iter().map(|(idx, _)| idx));
295 }
296
297 blocks_to_untrack = to_untrack
298 .iter()
299 .map(|&b| BlockNumber::from(u32::try_from(b).expect("block number fits in u32")))
300 .collect();
301 updated_partial_mmr = Some(partial_mmr);
302 }
303
304 self.store
307 .untrack_and_prune_irrelevant_blocks(&blocks_to_untrack, &nodes_to_remove)
308 .await?;
309
310 if let Some(partial_mmr) = updated_partial_mmr {
311 self.cache_partial_mmr(partial_mmr).await?;
312 }
313
314 Ok(())
315 }
316
317 pub async fn ensure_rpc_limits_in_place(&mut self) -> Result<(), ClientError> {
320 if self.rpc_api.has_rpc_limits().is_some() {
321 return Ok(());
322 }
323
324 let limits = self.rpc_api.get_rpc_limits().await?;
325 self.store.set_rpc_limits(limits).await?;
326 Ok(())
327 }
328}
329
330#[derive(Debug, PartialEq)]
335pub struct SyncSummary {
336 pub block_num: BlockNumber,
338 pub new_public_notes: Vec<NoteId>,
340 pub new_private_notes: Vec<NoteId>,
346 pub committed_notes: Vec<NoteId>,
348 pub consumed_notes: Vec<NoteId>,
350 pub updated_accounts: Vec<AccountId>,
352 pub locked_accounts: Vec<AccountId>,
354 pub committed_transactions: Vec<TransactionId>,
356}
357
358impl SyncSummary {
359 pub fn new(
360 block_num: BlockNumber,
361 new_public_notes: Vec<NoteId>,
362 new_private_notes: Vec<NoteId>,
363 committed_notes: Vec<NoteId>,
364 consumed_notes: Vec<NoteId>,
365 updated_accounts: Vec<AccountId>,
366 locked_accounts: Vec<AccountId>,
367 committed_transactions: Vec<TransactionId>,
368 ) -> Self {
369 Self {
370 block_num,
371 new_public_notes,
372 new_private_notes,
373 committed_notes,
374 consumed_notes,
375 updated_accounts,
376 locked_accounts,
377 committed_transactions,
378 }
379 }
380
381 pub fn new_empty(block_num: BlockNumber) -> Self {
382 Self {
383 block_num,
384 new_public_notes: vec![],
385 new_private_notes: vec![],
386 committed_notes: vec![],
387 consumed_notes: vec![],
388 updated_accounts: vec![],
389 locked_accounts: vec![],
390 committed_transactions: vec![],
391 }
392 }
393
394 pub fn is_empty(&self) -> bool {
395 self.new_public_notes.is_empty()
396 && self.new_private_notes.is_empty()
397 && self.committed_notes.is_empty()
398 && self.consumed_notes.is_empty()
399 && self.updated_accounts.is_empty()
400 && self.locked_accounts.is_empty()
401 && self.committed_transactions.is_empty()
402 }
403
404 pub fn combine_with(&mut self, mut other: Self) {
405 self.block_num = max(self.block_num, other.block_num);
406 self.new_public_notes.append(&mut other.new_public_notes);
407 self.new_private_notes.append(&mut other.new_private_notes);
408 self.committed_notes.append(&mut other.committed_notes);
409 self.consumed_notes.append(&mut other.consumed_notes);
410 self.updated_accounts.append(&mut other.updated_accounts);
411 self.locked_accounts.append(&mut other.locked_accounts);
412 self.committed_transactions.append(&mut other.committed_transactions);
413 }
414}
415
416impl Serializable for SyncSummary {
417 fn write_into<W: miden_tx::utils::serde::ByteWriter>(&self, target: &mut W) {
418 self.block_num.write_into(target);
419 self.new_public_notes.write_into(target);
420 self.new_private_notes.write_into(target);
421 self.committed_notes.write_into(target);
422 self.consumed_notes.write_into(target);
423 self.updated_accounts.write_into(target);
424 self.locked_accounts.write_into(target);
425 self.committed_transactions.write_into(target);
426 }
427}
428
429impl Deserializable for SyncSummary {
430 fn read_from<R: miden_tx::utils::serde::ByteReader>(
431 source: &mut R,
432 ) -> Result<Self, DeserializationError> {
433 let block_num = BlockNumber::read_from(source)?;
434 let new_public_notes = Vec::<NoteId>::read_from(source)?;
435 let new_private_notes = Vec::<NoteId>::read_from(source)?;
436 let committed_notes = Vec::<NoteId>::read_from(source)?;
437 let consumed_notes = Vec::<NoteId>::read_from(source)?;
438 let updated_accounts = Vec::<AccountId>::read_from(source)?;
439 let locked_accounts = Vec::<AccountId>::read_from(source)?;
440 let committed_transactions = Vec::<TransactionId>::read_from(source)?;
441
442 Ok(Self {
443 block_num,
444 new_public_notes,
445 new_private_notes,
446 committed_notes,
447 consumed_notes,
448 updated_accounts,
449 locked_accounts,
450 committed_transactions,
451 })
452 }
453}