use alloc::borrow::ToOwned;
use alloc::boxed::Box;
use alloc::collections::{BTreeMap, BTreeSet};
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::error::Error;
use core::pin::Pin;
use miden_protocol::asset::{Asset, AssetVault};
use miden_protocol::vm::FutureMaybeSend;
/// Boxed future returned by the retryable RPC closures; `FutureMaybeSend` abstracts over
/// whether the future must be `Send` (the trait impl below is `?Send` on wasm32).
type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
use miden_protocol::account::{
Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
};
use miden_protocol::address::NetworkId;
use miden_protocol::block::account_tree::AccountWitness;
use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
use miden_protocol::crypto::merkle::MerklePath;
use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
use miden_protocol::crypto::merkle::smt::SmtProof;
use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
use miden_protocol::utils::serde::Deserializable;
use miden_protocol::{EMPTY_WORD, Word};
use miden_tx::utils::serde::Serializable;
use miden_tx::utils::sync::RwLock;
use tonic::Status;
use tracing::info;
use super::domain::account::{
AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
};
use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
use super::generated::rpc::account_request::AccountDetailRequest;
use super::generated::rpc::AccountRequest;
use super::{
Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
RpcStatusInfo,
};
use crate::rpc::domain::sync::ChainMmrInfo;
use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
use crate::rpc::domain::transaction::TransactionsInfo;
use crate::rpc::errors::node::parse_node_error;
use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
use crate::rpc::generated::rpc::BlockRange;
use crate::rpc::domain::limits::RpcLimits;
use crate::rpc::{AccountStateAt, generated as proto};
mod api_client;
mod retry;
use api_client::api_client_wrapper::ApiClient;
/// Cursor state for paginated block-range RPCs, with a hard cap on iterations to
/// guard against a misbehaving node causing an infinite loop.
struct BlockPagination {
    // Inclusive lower bound of the next request's block range; advanced after each page.
    current_block_from: BlockNumber,
    // Optional caller-requested upper bound; `None` means "up to the chain tip".
    block_to: Option<BlockNumber>,
    // Number of pages consumed so far, checked against `MAX_ITERATIONS`.
    iterations: u32,
}
/// Outcome of a single [`BlockPagination::advance`] step.
enum PaginationResult {
    /// More pages remain; issue another request starting at the updated cursor.
    Continue,
    /// Pagination finished; carries the chain tip and last block number reported by the node.
    Done {
        chain_tip: BlockNumber,
        block_num: BlockNumber,
    },
}
impl BlockPagination {
    /// Upper bound on pagination rounds before the cursor reports an error.
    const MAX_ITERATIONS: u32 = 1000;

    /// Creates a cursor for the range starting at `block_from`, optionally capped at `block_to`.
    fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
        Self { current_block_from: block_from, block_to, iterations: 0 }
    }

    /// The inclusive lower bound to use for the next request.
    fn current_block_from(&self) -> BlockNumber {
        self.current_block_from
    }

    /// The caller-requested upper bound, if any.
    fn block_to(&self) -> Option<BlockNumber> {
        self.block_to
    }

    /// Consumes one pagination step given the node-reported `block_num` and `chain_tip`.
    ///
    /// # Errors
    /// Returns [`RpcError::PaginationError`] when the iteration cap is exceeded or the node
    /// reports a block number behind the current cursor (both indicate a broken response stream).
    fn advance(
        &mut self,
        block_num: BlockNumber,
        chain_tip: BlockNumber,
    ) -> Result<PaginationResult, RpcError> {
        // Guard the cap before counting this step.
        if self.iterations >= Self::MAX_ITERATIONS {
            return Err(RpcError::PaginationError(
                "too many pagination iterations, possible infinite loop".to_owned(),
            ));
        }
        self.iterations += 1;

        if block_num < self.current_block_from {
            return Err(RpcError::PaginationError(
                "invalid pagination: block_num went backwards".to_owned(),
            ));
        }

        // Stop once the reported block reaches the effective target: the caller's bound
        // clamped to the chain tip, or the chain tip itself when no bound was given.
        let target_block = match self.block_to {
            Some(to) => to.min(chain_tip),
            None => chain_tip,
        };
        if block_num >= target_block {
            Ok(PaginationResult::Done { chain_tip, block_num })
        } else {
            // Resume just past the last block covered by this page.
            self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
            Ok(PaginationResult::Continue)
        }
    }
}
/// gRPC-backed [`NodeRpcClient`] with lazy connection setup, per-call retries, and cached
/// node limits.
pub struct GrpcClient {
    // Lazily established API client; `None` until the first call triggers `connect`.
    client: RwLock<Option<ApiClient>>,
    // Node endpoint rendered as a string (parsed back into an `Endpoint` in `get_network_id`).
    endpoint: String,
    // Request timeout handed to the underlying transport, in milliseconds.
    timeout_ms: u64,
    // Genesis commitment used when building the client; set at most once (see
    // `set_genesis_commitment`).
    genesis_commitment: RwLock<Option<Word>>,
    // Cached node-reported request limits, fetched on first use by `get_rpc_limits`.
    limits: RwLock<Option<RpcLimits>>,
    // Retry policy applied by `call_with_retry`.
    max_retries: u32,
    retry_interval_ms: u64,
}
impl GrpcClient {
    /// Creates a client for `endpoint` with default retry settings; the connection itself is
    /// established lazily on the first RPC call.
    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
        GrpcClient {
            client: RwLock::new(None),
            endpoint: endpoint.to_string(),
            timeout_ms,
            genesis_commitment: RwLock::new(None),
            limits: RwLock::new(None),
            max_retries: retry::DEFAULT_MAX_RETRIES,
            retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
        }
    }

    /// Builder-style override for the maximum number of retries per RPC call.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Builder-style override for the interval between retries, in milliseconds.
    #[must_use]
    pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
        self.retry_interval_ms = retry_interval_ms;
        self
    }

    /// Returns a clone of the API client, connecting first if no connection exists yet.
    ///
    /// NOTE(review): there is a window between the `is_none()` check and `connect()` where two
    /// tasks may both connect; the later `replace` simply wins, which looks benign — confirm.
    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
        if self.client.read().is_none() {
            self.connect().await?;
        }
        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
    }

    /// Builds a fresh API client for the configured endpoint and stores it, replacing any
    /// previously cached client.
    async fn connect(&self) -> Result<(), RpcError> {
        let genesis_commitment = *self.genesis_commitment.read();
        let new_client =
            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
                .await?;
        let mut client = self.client.write();
        client.replace(new_client);
        Ok(())
    }

    /// Converts a gRPC [`Status`] into an [`RpcError`], attaching the client version and the
    /// current genesis commitment (or `"none"`) as accept-header diagnostic context.
    fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
        let genesis_commitment = self
            .genesis_commitment
            .read()
            .as_ref()
            .map_or_else(|| "none".to_string(), Word::to_hex);
        let context = AcceptHeaderContext {
            client_version: env!("CARGO_PKG_VERSION").to_string(),
            genesis_commitment,
        };
        RpcError::from_grpc_error_with_context(endpoint, status, context)
    }

    /// Runs `call` against a connected client, looping while the retry policy accepts the
    /// failure status; any other failure is mapped through `rpc_error_from_status`.
    async fn call_with_retry<T: Send + 'static>(
        &self,
        endpoint: RpcEndpoint,
        mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
    ) -> Result<tonic::Response<T>, RpcError> {
        let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
        loop {
            // Reconnect on every attempt in case the cached client went stale.
            let rpc_api = self.ensure_connected().await?;
            match call(rpc_api).await {
                Ok(response) => return Ok(response),
                // NOTE(review): `should_retry` is async — presumably it performs the retry
                // backoff before returning; confirm in the `retry` module.
                Err(status) if retry_state.should_retry(&status).await => {},
                Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
            }
        }
    }

    /// Queries node status using a client without the versioned accept header, so it succeeds
    /// even against a node with a mismatched genesis commitment. Bypasses the cached
    /// connection and the retry loop.
    pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
        let mut rpc_api =
            ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
                .await?;
        rpc_api
            .status(())
            .await
            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
            .map(tonic::Response::into_inner)
            .and_then(RpcStatusInfo::try_from)
    }

    /// Fetches an account witness plus (for public accounts) full account details.
    ///
    /// A first `GetAccount` request at the chain tip determines the block number and storage
    /// header; for public accounts a second request pinned to that same block then asks for
    /// all entries of every map slot, so proof and details refer to one consistent block.
    pub async fn fetch_full_account_proof(
        &self,
        account_id: AccountId,
    ) -> Result<(BlockNumber, AccountProof), RpcError> {
        let has_public_state = account_id.has_public_state();
        let account_request = {
            AccountRequest {
                account_id: Some(account_id.into()),
                block_num: None,
                details: {
                    if has_public_state {
                        Some(AccountDetailRequest {
                            // NOTE(review): EMPTY_WORD as the "known" commitment appears to
                            // force the node to return full data rather than an "unchanged"
                            // marker — confirm against the node API.
                            code_commitment: Some(EMPTY_WORD.into()),
                            asset_vault_commitment: Some(EMPTY_WORD.into()),
                            storage_maps: vec![],
                        })
                    } else {
                        None
                    }
                },
            }
        };
        let account_response = self
            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
                let request = account_request.clone();
                Box::pin(async move { rpc_api.get_account(request).await })
            })
            .await?
            .into_inner();
        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
            "GetAccountDetails returned an account without a matching block number for the witness"
                .to_owned(),
        ))?;
        let account_proof = {
            if has_public_state {
                // Parse the first response only to discover which slots are maps.
                let account_details = account_response
                    .details
                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
                    .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
                let storage_header = account_details.storage_details.header;
                let maps_to_request = storage_header
                    .slots()
                    .filter(|header| header.slot_type().is_map())
                    .map(|map| map.name().to_string());
                // Second request: same block, this time with AllEntries for every map slot.
                let account_request = AccountRequest {
                    account_id: Some(account_id.into()),
                    block_num: Some(block_number),
                    details: Some(AccountDetailRequest {
                        code_commitment: Some(EMPTY_WORD.into()),
                        asset_vault_commitment: Some(EMPTY_WORD.into()),
                        storage_maps: maps_to_request
                            .map(|slot_name| StorageMapDetailRequest {
                                slot_name,
                                slot_data: Some(SlotData::AllEntries(true)),
                            })
                            .collect(),
                    }),
                };
                let response = self
                    .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
                        let request = account_request.clone();
                        Box::pin(async move { rpc_api.get_account(request).await })
                    })
                    .await?;
                response.into_inner().try_into()
            } else {
                account_response.try_into()
            }
        };
        Ok((block_number.block_num.into(), account_proof?))
    }

    /// Rebuilds every storage slot of an account from its storage details.
    ///
    /// Value slots are taken directly from the header. Map slots whose entries were too large
    /// to inline in the response (`too_many_entries`) are reconstructed by replaying
    /// `sync_storage_maps` from genesis up to `block_to`; the sync result is fetched once and
    /// cached across slots.
    async fn build_storage_slots(
        &self,
        account_id: AccountId,
        storage_details: &AccountStorageDetails,
        block_to: Option<BlockNumber>,
    ) -> Result<Vec<StorageSlot>, RpcError> {
        let mut slots = vec![];
        // Lazily-fetched sync data shared by all oversized map slots.
        let mut map_cache: Option<StorageMapInfo> = None;
        for slot_header in storage_details.header.slots() {
            match slot_header.slot_type() {
                StorageSlotType::Value => {
                    slots.push(miden_protocol::account::StorageSlot::with_value(
                        slot_header.name().clone(),
                        slot_header.value(),
                    ));
                },
                StorageSlotType::Map => {
                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
                        RpcError::ExpectedDataMissing(format!(
                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
                            slot_header.name(),
                        )),
                    )?;
                    let storage_map = if map_details.too_many_entries {
                        // Fetch the full update history once; reuse it for later map slots.
                        let map_info = if let Some(ref info) = map_cache {
                            info
                        } else {
                            let fetched_data =
                                self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
                            map_cache.insert(fetched_data)
                        };
                        let mut sorted_updates: Vec<_> = map_info
                            .updates
                            .iter()
                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
                            .collect();
                        // Sort by block so later updates win; collecting into a BTreeMap then
                        // keeps only the last value written per key.
                        sorted_updates.sort_by_key(|u| u.block_num);
                        let map_entries: Vec<_> = sorted_updates
                            .into_iter()
                            .map(|u| (u.key, u.value))
                            .collect::<BTreeMap<_, _>>()
                            .into_iter()
                            .collect();
                        StorageMap::with_entries(map_entries)
                    } else {
                        map_details.entries.clone().into_storage_map().ok_or_else(|| {
                            RpcError::ExpectedDataMissing(
                                "expected AllEntries for full account fetch, got EntriesWithProofs"
                                    .into(),
                            )
                        })?
                    }
                    // Both branches yield a fallible map construction; surface its error here.
                    .map_err(|err| {
                        RpcError::InvalidResponse(format!(
                            "the rpc api returned a non-valid map entry: {err}"
                        ))
                    })?;
                    slots.push(miden_protocol::account::StorageSlot::with_map(
                        slot_header.name().clone(),
                        storage_map,
                    ));
                },
            }
        }
        Ok(slots)
    }

    /// Rebuilds an account's full asset list by replaying vault updates from genesis up to
    /// `block_to`, keeping the latest entry per vault key (last write wins, removals flattened
    /// away).
    async fn fetch_full_vault(
        &self,
        account_id: AccountId,
        block_to: Option<BlockNumber>,
    ) -> Result<Vec<Asset>, RpcError> {
        let vault_info =
            self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
        let mut updates = vault_info.updates;
        updates.sort_by_key(|u| u.block_num);
        Ok(updates
            .into_iter()
            .map(|u| (u.vault_key, u.asset))
            .collect::<BTreeMap<_, _>>()
            .into_values()
            // NOTE(review): `flatten` here suggests `asset` is an `Option`, dropping entries
            // whose final state is a removal — confirm against `AccountVaultUpdate`.
            .flatten()
            .collect())
    }
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl NodeRpcClient for GrpcClient {
fn has_genesis_commitment(&self) -> Option<Word> {
*self.genesis_commitment.read()
}
async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
if self.genesis_commitment.read().is_some() {
return Ok(());
}
self.genesis_commitment.write().replace(commitment);
let mut client_guard = self.client.write();
if let Some(client) = client_guard.as_mut() {
client.set_genesis_commitment(commitment);
}
Ok(())
}
async fn submit_proven_transaction(
&self,
proven_transaction: ProvenTransaction,
transaction_inputs: TransactionInputs,
) -> Result<BlockNumber, RpcError> {
let request = proto::transaction::ProvenTransaction {
transaction: proven_transaction.to_bytes(),
transaction_inputs: Some(transaction_inputs.to_bytes()),
};
let api_response = self
.call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
})
.await?;
Ok(BlockNumber::from(api_response.into_inner().block_num))
}
async fn get_block_header_by_number(
&self,
block_num: Option<BlockNumber>,
include_mmr_proof: bool,
) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
let request = proto::rpc::BlockHeaderByNumberRequest {
block_num: block_num.as_ref().map(BlockNumber::as_u32),
include_mmr_proof: Some(include_mmr_proof),
};
info!("Calling GetBlockHeaderByNumber: {:?}", request);
let api_response = self
.call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
})
.await?;
let response = api_response.into_inner();
let block_header: BlockHeader = response
.block_header
.ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
.try_into()?;
let mmr_proof = if include_mmr_proof {
let forest = response
.chain_length
.ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
let merkle_path: MerklePath = response
.mmr_path
.ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
.try_into()?;
Some(MmrProof::new(
MmrPath::new(
Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
block_header.block_num().as_usize(),
merkle_path,
),
block_header.commitment(),
))
} else {
None
};
Ok((block_header, mmr_proof))
}
async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
let limits = self.get_rpc_limits().await?;
let mut notes = Vec::with_capacity(note_ids.len());
for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
let request = proto::note::NoteIdList {
ids: chunk.iter().map(|id| (*id).into()).collect(),
};
let api_response = self
.call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.get_notes_by_id(request).await })
})
.await?;
let response_notes = api_response
.into_inner()
.notes
.into_iter()
.map(FetchedNote::try_from)
.collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
notes.extend(response_notes);
}
Ok(notes)
}
async fn sync_chain_mmr(
&self,
block_from: BlockNumber,
block_to: Option<BlockNumber>,
) -> Result<ChainMmrInfo, RpcError> {
let block_range = Some(BlockRange {
block_from: block_from.as_u32(),
block_to: block_to.map(|b| b.as_u32()),
});
let request = proto::rpc::SyncChainMmrRequest {
block_range,
finality: proto::rpc::Finality::Committed as i32,
};
let response = self
.call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
})
.await?;
response.into_inner().try_into()
}
async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
let update_summary =
AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
if account_id.is_private() {
Ok(FetchedAccount::new_private(account_id, update_summary))
} else {
let details =
full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
"GetAccountDetails returned a public account without details".to_owned(),
))?;
let account_id = details.header.id();
let nonce = details.header.nonce();
let assets = if details.vault_details.too_many_assets {
self.fetch_full_vault(account_id, Some(block_number)).await?
} else {
details.vault_details.assets
};
let slots = self
.build_storage_slots(account_id, &details.storage_details, Some(block_number))
.await?;
let asset_vault = AssetVault::new(&assets).map_err(|err| {
RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
})?;
let account_storage = AccountStorage::new(slots).map_err(|err| {
RpcError::InvalidResponse(format!(
"api rpc returned non-valid storage slots: {err}"
))
})?;
let account =
Account::new(account_id, asset_vault, account_storage, details.code, nonce, None)
.map_err(|err| {
RpcError::InvalidResponse(format!(
"failed to instance an account from the rpc api response: {err}"
))
})?;
Ok(FetchedAccount::new_public(account, update_summary))
}
}
async fn get_account_proof(
&self,
account_id: AccountId,
storage_requirements: AccountStorageRequirements,
account_state: AccountStateAt,
known_account_code: Option<AccountCode>,
known_vault_commitment: Option<Word>,
) -> Result<(BlockNumber, AccountProof), RpcError> {
let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
if let Some(account_code) = known_account_code {
known_codes_by_commitment.insert(account_code.commitment(), account_code);
}
let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
let account_details = if account_id.has_public_state() {
Some(AccountDetailRequest {
code_commitment: Some(EMPTY_WORD.into()),
asset_vault_commitment: known_vault_commitment.map(Into::into),
storage_maps,
})
} else {
None
};
let block_num = match account_state {
AccountStateAt::Block(number) => Some(number.into()),
AccountStateAt::ChainTip => None,
};
let request = AccountRequest {
account_id: Some(account_id.into()),
block_num,
details: account_details,
};
let response = self
.call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.get_account(request).await })
})
.await?
.into_inner();
let account_witness: AccountWitness = response
.witness
.ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
.try_into()?;
let block_num: BlockNumber = response
.block_num
.ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
.block_num
.into();
let headers = if account_witness.id().has_public_state() {
let mut details = response
.details
.ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
.into_domain(&known_codes_by_commitment, &storage_requirements)?;
if details.vault_details.too_many_assets {
details.vault_details.assets =
self.fetch_full_vault(account_id, Some(block_num)).await?;
}
Some(details)
} else {
None
};
let proof = AccountProof::new(account_witness, headers)
.map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
Ok((block_num, proof))
}
async fn sync_notes(
&self,
block_num: BlockNumber,
block_to: Option<BlockNumber>,
note_tags: &BTreeSet<NoteTag>,
) -> Result<NoteSyncInfo, RpcError> {
let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
let block_range = Some(BlockRange {
block_from: block_num.as_u32(),
block_to: block_to.map(|b| b.as_u32()),
});
let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
let response = self
.call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.sync_notes(request).await })
})
.await?;
response.into_inner().try_into()
}
async fn sync_nullifiers(
&self,
prefixes: &[u16],
block_num: BlockNumber,
block_to: Option<BlockNumber>,
) -> Result<Vec<NullifierUpdate>, RpcError> {
const MAX_ITERATIONS: u32 = 1000;
let limits = self.get_rpc_limits().await?;
let mut all_nullifiers = BTreeSet::new();
'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
let mut current_block_from = block_num.as_u32();
for _ in 0..MAX_ITERATIONS {
let request = proto::rpc::SyncNullifiersRequest {
nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
prefix_len: 16,
block_range: Some(BlockRange {
block_from: current_block_from,
block_to: block_to.map(|b| b.as_u32()),
}),
};
let response = self
.call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.sync_nullifiers(request).await })
})
.await?;
let response = response.into_inner();
let batch_nullifiers = response
.nullifiers
.iter()
.map(TryFrom::try_from)
.collect::<Result<Vec<NullifierUpdate>, _>>()
.map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
all_nullifiers.extend(batch_nullifiers);
if let Some(page) = response.pagination_info {
if page.block_num < current_block_from {
return Err(RpcError::PaginationError(
"invalid pagination: block_num went backwards".to_string(),
));
}
let target_block =
block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
if page.block_num >= target_block {
continue 'chunk_nullifiers;
}
current_block_from = page.block_num + 1;
}
}
return Err(RpcError::PaginationError(
"too many pagination iterations, possible infinite loop".to_string(),
));
}
Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
}
async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
let limits = self.get_rpc_limits().await?;
let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
let request = proto::rpc::NullifierList {
nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
};
let response = self
.call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.check_nullifiers(request).await })
})
.await?;
let mut response = response.into_inner();
let chunk_proofs = response
.proofs
.iter_mut()
.map(|r| r.to_owned().try_into())
.collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
proofs.extend(chunk_proofs);
}
Ok(proofs)
}
async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
let response = self
.call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
Box::pin(async move { rpc_api.get_block_by_number(request).await })
})
.await?;
let response = response.into_inner();
let block =
ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
"GetBlockByNumberResponse.block".to_string(),
))?)?;
Ok(block)
}
async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
let response = self
.call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
})
.await?;
let response = response.into_inner();
let note_script = NoteScript::try_from(
response
.script
.ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
)?;
Ok(note_script)
}
async fn sync_storage_maps(
&self,
block_from: BlockNumber,
block_to: Option<BlockNumber>,
account_id: AccountId,
) -> Result<StorageMapInfo, RpcError> {
let mut pagination = BlockPagination::new(block_from, block_to);
let mut updates = Vec::new();
let (chain_tip, block_number) = loop {
let request = proto::rpc::SyncAccountStorageMapsRequest {
block_range: Some(BlockRange {
block_from: pagination.current_block_from().as_u32(),
block_to: pagination.block_to().map(|block| block.as_u32()),
}),
account_id: Some(account_id.into()),
};
let response = self
.call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
})
.await?;
let response = response.into_inner();
let page = response
.pagination_info
.ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
let page_block_num = BlockNumber::from(page.block_num);
let page_chain_tip = BlockNumber::from(page.chain_tip);
let batch = response
.updates
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<StorageMapUpdate>, _>>()?;
updates.extend(batch);
match pagination.advance(page_block_num, page_chain_tip)? {
PaginationResult::Continue => {},
PaginationResult::Done {
chain_tip: final_chain_tip,
block_num: final_block_num,
} => break (final_chain_tip, final_block_num),
}
};
Ok(StorageMapInfo { chain_tip, block_number, updates })
}
async fn sync_account_vault(
&self,
block_from: BlockNumber,
block_to: Option<BlockNumber>,
account_id: AccountId,
) -> Result<AccountVaultInfo, RpcError> {
let mut pagination = BlockPagination::new(block_from, block_to);
let mut updates = Vec::new();
let (chain_tip, block_number) = loop {
let request = proto::rpc::SyncAccountVaultRequest {
block_range: Some(BlockRange {
block_from: pagination.current_block_from().as_u32(),
block_to: pagination.block_to().map(|block| block.as_u32()),
}),
account_id: Some(account_id.into()),
};
let response = self
.call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.sync_account_vault(request).await })
})
.await?;
let response = response.into_inner();
let page = response
.pagination_info
.ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
let page_block_num = BlockNumber::from(page.block_num);
let page_chain_tip = BlockNumber::from(page.chain_tip);
let batch = response
.updates
.iter()
.map(|u| (*u).try_into())
.collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
updates.extend(batch);
match pagination.advance(page_block_num, page_chain_tip)? {
PaginationResult::Continue => {},
PaginationResult::Done {
chain_tip: final_chain_tip,
block_num: final_block_num,
} => break (final_chain_tip, final_block_num),
}
};
Ok(AccountVaultInfo { chain_tip, block_number, updates })
}
async fn sync_transactions(
&self,
block_from: BlockNumber,
block_to: Option<BlockNumber>,
account_ids: Vec<AccountId>,
) -> Result<TransactionsInfo, RpcError> {
let block_range = Some(BlockRange {
block_from: block_from.as_u32(),
block_to: block_to.map(|b| b.as_u32()),
});
let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
let response = self
.call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
let request = request.clone();
Box::pin(async move { rpc_api.sync_transactions(request).await })
})
.await?;
response.into_inner().try_into()
}
async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
let endpoint: Endpoint =
Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
Ok(endpoint.to_network_id())
}
async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
if let Some(limits) = *self.limits.read() {
return Ok(limits);
}
let response = self
.call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
Box::pin(async move { rpc_api.get_limits(()).await })
})
.await?;
let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
self.limits.write().replace(limits);
Ok(limits)
}
fn has_rpc_limits(&self) -> Option<RpcLimits> {
*self.limits.read()
}
async fn set_rpc_limits(&self, limits: RpcLimits) {
self.limits.write().replace(limits);
}
async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
GrpcClient::get_status_unversioned(self).await
}
}
impl RpcError {
pub fn from_grpc_error_with_context(
endpoint: RpcEndpoint,
status: Status,
context: AcceptHeaderContext,
) -> Self {
if let Some(accept_error) =
AcceptHeaderError::try_from_message_with_context(status.message(), context)
{
return Self::AcceptHeaderError(accept_error);
}
let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
let error_kind = GrpcError::from(&status);
let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
Self::RequestError {
endpoint,
error_kind,
endpoint_error,
source: Some(source),
}
}
}
impl From<&Status> for GrpcError {
    /// Maps a tonic [`Status`] onto the client's [`GrpcError`] taxonomy, preserving the
    /// status message.
    fn from(status: &Status) -> Self {
        let code = status.code() as i32;
        let message = status.message().to_string();
        GrpcError::from_code(code, Some(message))
    }
}
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;
    use miden_protocol::block::BlockNumber;

    use super::{BlockPagination, GrpcClient, PaginationResult};
    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};

    // Compile-time helper: only builds if `T` is Send + Sync.
    fn assert_send_sync<T: Send + Sync>() {}

    /// The client (and its trait object) must be shareable across threads.
    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    /// A page reporting a block before the cursor is rejected as a pagination error.
    #[test]
    fn block_pagination_errors_when_block_num_goes_backwards() {
        let mut pagination = BlockPagination::new(10_u32.into(), None);
        let res = pagination.advance(9_u32.into(), 20_u32.into());
        assert!(matches!(res, Err(RpcError::PaginationError(_))));
    }

    /// After MAX_ITERATIONS successful steps, the next advance fails rather than looping
    /// forever.
    #[test]
    fn block_pagination_errors_after_max_iterations() {
        let mut pagination = BlockPagination::new(0_u32.into(), None);
        let chain_tip: BlockNumber = 10_000_u32.into();
        for _ in 0..BlockPagination::MAX_ITERATIONS {
            let current = pagination.current_block_from();
            let res = pagination
                .advance(current, chain_tip)
                .expect("expected pagination to continue within iteration limit");
            assert!(matches!(res, PaginationResult::Continue));
        }
        let res = pagination.advance(pagination.current_block_from(), chain_tip);
        assert!(matches!(res, Err(RpcError::PaginationError(_))));
    }

    /// With block_to=50 but chain tip 30, pagination finishes at 30 (the minimum).
    #[test]
    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
        let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));
        let res = pagination
            .advance(30_u32.into(), 30_u32.into())
            .expect("expected pagination to succeed");
        assert!(matches!(
            res,
            PaginationResult::Done {
                chain_tip,
                block_num
            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
        ));
    }

    /// A continuing page moves the cursor to the block after the one just covered.
    #[test]
    fn block_pagination_advances_cursor_by_one() {
        let mut pagination = BlockPagination::new(5_u32.into(), None);
        let res = pagination
            .advance(5_u32.into(), 100_u32.into())
            .expect("expected pagination to succeed");
        assert!(matches!(res, PaginationResult::Continue));
        assert_eq!(pagination.current_block_from().as_u32(), 6);
    }

    // Helper used only to prove the trait-object future is Send; never awaited directly.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        let res = client.get_block_header_by_number(None, false).await;
        assert!(res.is_ok());
    }

    /// Spawning the dyn-trait future on tokio requires it to be Send; this test exists for
    /// the compile-time check (the spawned task is detached, not awaited).
    #[tokio::test]
    async fn future_is_send() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);
        let client: Box<GrpcClient> = client.into();
        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);
        assert!(client.genesis_commitment.read().is_none());
        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();
        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
    }

    /// The commitment is write-once: a second set call leaves the first value in place.
    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);
        let initial_commitment = Word::default();
        client.set_genesis_commitment(initial_commitment).await.unwrap();
        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        client.set_genesis_commitment(new_commitment).await.unwrap();
        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
    }

    /// When a client connection already exists, setting the commitment updates it in place.
    // NOTE(review): `connect` presumably dials lazily (no live devnet needed) — confirm if
    // this test flakes without network access.
    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);
        client.connect().await.unwrap();
        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();
        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
        assert!(client.client.read().as_ref().is_some());
    }
}