use alloc::collections::BTreeSet;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::cmp::max;
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::NoteId;
use miden_protocol::transaction::TransactionId;
use miden_tx::auth::TransactionAuthenticator;
use miden_tx::utils::serde::{Deserializable, DeserializationError, Serializable};
use tracing::{debug, info};
use crate::pswap::PswapChainObserver;
use crate::store::{NoteFilter, TransactionFilter};
use crate::{Client, ClientError};
mod block_header;
mod tag;
pub use tag::{NoteTagRecord, NoteTagSource};
mod note_observer;
pub use note_observer::NoteObserver;
mod state_sync;
pub use state_sync::{NoteUpdateAction, OnNoteReceived, StateSync, StateSyncInput};
mod state_sync_update;
pub use state_sync_update::{
AccountUpdates,
PartialBlockchainUpdates,
PublicAccountDelta,
PublicAccountUpdate,
StateSyncUpdate,
TransactionUpdateTracker,
};
impl<AUTH> Client<AUTH>
where
AUTH: TransactionAuthenticator + Sync + 'static,
{
/// Returns the sync height currently recorded in the client's store.
///
/// # Errors
///
/// Any error returned by the store is converted into a [`ClientError`].
pub async fn get_sync_height(&self) -> Result<BlockNumber, ClientError> {
    let height = self.store.get_sync_height().await?;
    Ok(height)
}
/// Runs one chain synchronization pass and persists its result.
///
/// The steps, in order:
/// 1. Ensure the genesis data and the RPC limits are in place.
/// 2. Build a [`StateSync`] from the RPC client, the note screener and the transaction discard
///    delta, and attach a [`PswapChainObserver`] backed by the store as its note observer.
/// 3. Collect the sync input from the store and run the sync against the current partial MMR.
/// 4. Derive the [`SyncSummary`] from the resulting update, run the apply hooks, and hand the
///    update to the store.
/// 5. Cache the partial MMR that was passed to the sync, then run the interval-gated
///    untrack-and-prune step.
///
/// # Errors
///
/// Returns the first error raised by any step. A store failure while applying the update is
/// reported as [`ClientError::StoreError`].
pub async fn sync_chain(&mut self) -> Result<SyncSummary, ClientError> {
    self.ensure_genesis_in_place().await?;
    self.ensure_rpc_limits_in_place().await?;

    // Assemble the sync component first, then attach the observer to it.
    let screener = self.note_screener();
    let syncer = StateSync::new(self.rpc_api.clone(), Arc::new(screener), self.tx_discard_delta);
    let pswap_observer = PswapChainObserver::new(self.store.clone());
    let syncer = syncer.with_note_observer(Arc::new(pswap_observer));

    let sync_input = self.build_sync_input().await?;
    let mut mmr = self.get_current_partial_mmr().await?;
    let update = syncer.sync_state(&mut mmr, sync_input).await?;

    // The summary is taken from a borrow because the update itself is consumed by the store.
    let summary: SyncSummary = (&update).into();
    debug!(sync_summary = ?summary, "Sync summary computed");

    syncer.run_apply_hooks(&update).await?;

    info!("Applying changes to the store.");
    if let Err(store_err) = self.store.apply_state_sync(update).await {
        return Err(ClientError::StoreError(store_err));
    }

    self.cache_partial_mmr(mmr).await?;
    self.maybe_untrack_and_prune_irrelevant_blocks().await?;

    Ok(summary)
}
/// Fetches notes through the note transport and advances the stored transport cursor.
///
/// Returns an empty list without touching the store when note transport is not enabled.
/// Before fetching, the relay outbox is flushed on a best-effort basis: a flush failure is
/// logged as a warning and does not abort the sync.
///
/// # Returns
///
/// The note IDs produced by the fetch.
///
/// # Errors
///
/// Fails if the cursor or the note tags cannot be read from the store, if the fetch itself
/// fails, or if the new cursor cannot be written back. The cursor is only updated after a
/// successful fetch.
pub async fn sync_note_transport(&mut self) -> Result<Vec<NoteId>, ClientError> {
    if !self.is_note_transport_enabled() {
        return Ok(Vec::new());
    }

    // Deliberately non-fatal: a failed flush must not prevent fetching incoming notes.
    if let Err(err) = self.flush_relay_outbox().await {
        tracing::warn!(?err, "relay outbox flush failed during sync; entries retained");
    }

    let cursor = self.store.get_note_transport_cursor().await?;
    let note_tags: Vec<_> = self.store.get_unique_note_tags().await?.into_iter().collect();

    // The tags are lent to the fetch by reference.
    let (ids, new_cursor) = self.fetch_transport_notes(cursor, &note_tags).await?;
    self.store.update_note_transport_cursor(new_cursor).await?;

    Ok(ids)
}
/// Runs a full sync: the note transport sync first, then the chain sync.
///
/// The returned summary is the one produced by the chain sync, with its `new_private_notes`
/// replaced by the note IDs returned from the note transport sync.
///
/// # Errors
///
/// If the note transport sync fails, the chain sync is not attempted. Otherwise any chain
/// sync error is returned.
pub async fn sync_state(&mut self) -> Result<SyncSummary, ClientError> {
    let private_note_ids = self.sync_note_transport().await?;

    self.sync_chain().await.map(|mut chain_summary| {
        chain_summary.new_private_notes = private_note_ids;
        chain_summary
    })
}
/// Collects from the store everything that goes into a [`StateSyncInput`].
///
/// The store is queried, in order, for: account headers (the status paired with each header is
/// dropped), unique note tags, unspent input notes, unspent output notes and uncommitted
/// transactions.
///
/// # Errors
///
/// Returns the first store error encountered.
pub async fn build_sync_input(&self) -> Result<StateSyncInput, ClientError> {
    let header_records = self.store.get_account_headers().await?;
    let tags = self.store.get_unique_note_tags().await?;
    let unspent_inputs = self.store.get_input_notes(NoteFilter::Unspent).await?;
    let unspent_outputs = self.store.get_output_notes(NoteFilter::Unspent).await?;
    let pending_txs = self.store.get_transactions(TransactionFilter::Uncommitted).await?;

    Ok(StateSyncInput {
        // Only the header of each (header, status) pair is needed.
        accounts: header_records
            .into_iter()
            .map(|(account_header, _)| account_header)
            .collect(),
        note_tags: tags,
        input_notes: unspent_inputs,
        output_notes: unspent_outputs,
        uncommitted_transactions: pending_txs,
    })
}
/// Applies a state sync update to the store, then runs the interval-gated
/// untrack-and-prune step for irrelevant blocks.
///
/// # Errors
///
/// Returns an error if the store rejects the update or if the pruning step fails. Pruning is
/// not attempted when applying the update fails.
pub async fn apply_state_sync(&mut self, update: StateSyncUpdate) -> Result<(), ClientError> {
    self.store.apply_state_sync(update).await?;
    self.maybe_untrack_and_prune_irrelevant_blocks().await
}
async fn maybe_untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
let Some(interval) = self.irrelevant_block_prune_interval else {
return Ok(());
};
let sync_height = self.store.get_sync_height().await?;
if let Some(last_prune_height) = self.last_irrelevant_block_prune_sync_height
&& sync_height < last_prune_height + interval
{
return Ok(());
}
self.untrack_and_prune_irrelevant_blocks().await?;
self.last_irrelevant_block_prune_sync_height = Some(sync_height);
Ok(())
}
/// Stops tracking blocks that no unspent input note refers to, and asks the store to prune.
///
/// A tracked block counts as still needed when it is the block of the inclusion proof of at
/// least one unspent input note. Every other tracked block is untracked in the current partial
/// MMR, and the node indices reported by each `untrack` call are collected. The store is then
/// called with the untracked block numbers and those node indices; this call is made even
/// when both lists are empty. The partial MMR is cached again only if it was modified here.
///
/// # Errors
///
/// Returns the first error from the store or from loading/caching the partial MMR.
///
/// # Panics
///
/// Panics if a tracked block position does not fit in a `u32`.
async fn untrack_and_prune_irrelevant_blocks(&mut self) -> Result<(), ClientError> {
    let tracked = self.store.get_tracked_block_header_numbers().await?;

    // Positions of tracked blocks that no unspent input note refers to.
    let stale_positions: Vec<usize> = if tracked.is_empty() {
        Vec::new()
    } else {
        let unspent = self.store.get_input_notes(NoteFilter::Unspent).await?;

        let mut referenced: BTreeSet<usize> = BTreeSet::new();
        for note in unspent.iter() {
            if let Some(proof) = note.inclusion_proof() {
                referenced.insert(proof.location().block_num().as_usize());
            }
        }

        tracked.difference(&referenced).copied().collect()
    };

    let mut blocks_to_untrack = Vec::new();
    let mut nodes_to_remove = Vec::new();
    let mut refreshed_mmr = None;

    if !stale_positions.is_empty() {
        let mut mmr = self.get_current_partial_mmr().await?;

        for position in stale_positions.iter().copied() {
            for (node_idx, _) in mmr.untrack(position) {
                nodes_to_remove.push(node_idx);
            }
        }

        for position in stale_positions.iter().copied() {
            let as_u32 = u32::try_from(position).expect("block number fits in u32");
            blocks_to_untrack.push(BlockNumber::from(as_u32));
        }

        refreshed_mmr = Some(mmr);
    }

    self.store
        .untrack_and_prune_irrelevant_blocks(&blocks_to_untrack, &nodes_to_remove)
        .await?;

    // Only re-cache when the MMR was actually loaded and changed above.
    if let Some(mmr) = refreshed_mmr {
        self.cache_partial_mmr(mmr).await?;
    }

    Ok(())
}
/// Makes sure RPC limits are available before syncing.
///
/// When the RPC client already reports limits (`has_rpc_limits()` returns `Some`), nothing is
/// done. Otherwise the limits are requested through the RPC client and written to the store.
///
/// # Errors
///
/// Returns an error if requesting the limits or storing them fails.
pub async fn ensure_rpc_limits_in_place(&mut self) -> Result<(), ClientError> {
    if self.rpc_api.has_rpc_limits().is_none() {
        let fetched_limits = self.rpc_api.get_rpc_limits().await?;
        self.store.set_rpc_limits(fetched_limits).await?;
    }

    Ok(())
}
}
/// Outcome of a sync operation, as returned by `Client::sync_chain` and `Client::sync_state`.
///
/// Apart from `block_num`, every field is a list of identifiers. `is_empty` looks at all the
/// lists and ignores `block_num`.
///
/// NOTE(review): except for `new_private_notes`, the fields are populated by a conversion from
/// `StateSyncUpdate` that is not defined in this part of the file; confirm the exact meaning of
/// each list there.
#[derive(Debug, PartialEq)]
pub struct SyncSummary {
/// Block number associated with this summary. `combine_with` keeps the larger of the two.
pub block_num: BlockNumber,
/// IDs of notes reported as new public notes.
pub new_public_notes: Vec<NoteId>,
/// IDs of notes reported as new private notes. `Client::sync_state` sets this to the IDs
/// returned by `Client::sync_note_transport`.
pub new_private_notes: Vec<NoteId>,
/// IDs of notes reported as committed.
pub committed_notes: Vec<NoteId>,
/// IDs of notes reported as consumed.
pub consumed_notes: Vec<NoteId>,
/// IDs of accounts reported as updated.
pub updated_accounts: Vec<AccountId>,
/// IDs of accounts reported as locked.
pub locked_accounts: Vec<AccountId>,
/// IDs of transactions reported as committed.
pub committed_transactions: Vec<TransactionId>,
}
impl SyncSummary {
pub fn new(
block_num: BlockNumber,
new_public_notes: Vec<NoteId>,
new_private_notes: Vec<NoteId>,
committed_notes: Vec<NoteId>,
consumed_notes: Vec<NoteId>,
updated_accounts: Vec<AccountId>,
locked_accounts: Vec<AccountId>,
committed_transactions: Vec<TransactionId>,
) -> Self {
Self {
block_num,
new_public_notes,
new_private_notes,
committed_notes,
consumed_notes,
updated_accounts,
locked_accounts,
committed_transactions,
}
}
pub fn new_empty(block_num: BlockNumber) -> Self {
Self {
block_num,
new_public_notes: vec![],
new_private_notes: vec![],
committed_notes: vec![],
consumed_notes: vec![],
updated_accounts: vec![],
locked_accounts: vec![],
committed_transactions: vec![],
}
}
pub fn is_empty(&self) -> bool {
self.new_public_notes.is_empty()
&& self.new_private_notes.is_empty()
&& self.committed_notes.is_empty()
&& self.consumed_notes.is_empty()
&& self.updated_accounts.is_empty()
&& self.locked_accounts.is_empty()
&& self.committed_transactions.is_empty()
}
pub fn combine_with(&mut self, mut other: Self) {
self.block_num = max(self.block_num, other.block_num);
self.new_public_notes.append(&mut other.new_public_notes);
self.new_private_notes.append(&mut other.new_private_notes);
self.committed_notes.append(&mut other.committed_notes);
self.consumed_notes.append(&mut other.consumed_notes);
self.updated_accounts.append(&mut other.updated_accounts);
self.locked_accounts.append(&mut other.locked_accounts);
self.committed_transactions.append(&mut other.committed_transactions);
}
}
impl Serializable for SyncSummary {
    /// Writes every field to `target` in declaration order: `block_num` first, then each list.
    ///
    /// The `Deserializable` implementation reads the fields back in this same order.
    fn write_into<W: miden_tx::utils::serde::ByteWriter>(&self, target: &mut W) {
        // Exhaustive destructuring so a newly added field cannot be silently left out.
        let Self {
            block_num,
            new_public_notes,
            new_private_notes,
            committed_notes,
            consumed_notes,
            updated_accounts,
            locked_accounts,
            committed_transactions,
        } = self;

        block_num.write_into(target);
        new_public_notes.write_into(target);
        new_private_notes.write_into(target);
        committed_notes.write_into(target);
        consumed_notes.write_into(target);
        updated_accounts.write_into(target);
        locked_accounts.write_into(target);
        committed_transactions.write_into(target);
    }
}
impl Deserializable for SyncSummary {
    /// Reads a summary from `source`, field by field, in the order used by `write_into`.
    ///
    /// # Errors
    ///
    /// Returns the [`DeserializationError`] of the first field that fails to read.
    fn read_from<R: miden_tx::utils::serde::ByteReader>(
        source: &mut R,
    ) -> Result<Self, DeserializationError> {
        // Field initializers run top to bottom, so the read order is the order written here.
        Ok(Self {
            block_num: BlockNumber::read_from(source)?,
            new_public_notes: Vec::<NoteId>::read_from(source)?,
            new_private_notes: Vec::<NoteId>::read_from(source)?,
            committed_notes: Vec::<NoteId>::read_from(source)?,
            consumed_notes: Vec::<NoteId>::read_from(source)?,
            updated_accounts: Vec::<AccountId>::read_from(source)?,
            locked_accounts: Vec::<AccountId>::read_from(source)?,
            committed_transactions: Vec::<TransactionId>::read_from(source)?,
        })
    }
}