use std::collections::BTreeSet;
use std::time::Duration;
use backon::ExponentialBuilder;
use futures::Stream;
use futures::stream::TryStreamExt;
use miden_node_proto::clients::{Builder, RpcClient as InnerRpcClient};
use miden_node_proto::domain::account::{
AccountDetails, AccountResponse, AccountVaultDetails, StorageMapEntries
};
use miden_node_proto::errors::ConversionError;
use miden_node_proto::generated::rpc::account_request::account_detail_request::{StorageMapDetailRequest, StorageMapDetailRequests, StorageRequest, storage_map_detail_request};
use miden_node_proto::generated::rpc::account_request::account_detail_request::storage_map_detail_request::MapKeys;
use miden_node_proto::generated::rpc::{BlockSubscriptionRequest, BlockSubscriptionResponse};
use miden_node_proto::generated::{self as proto};
use miden_node_utils::ErrorReport;
use miden_node_utils::retry::{self, Retryable};
use miden_protocol::Word;
use miden_protocol::account::{
AccountCode,
AccountId,
PartialAccount,
PartialStorage,
StorageMapKey,
StorageMapWitness,
StorageSlotName,
};
use miden_protocol::asset::{Asset, AssetVault, AssetVaultKey, AssetWitness, PartialVault};
use miden_protocol::block::{BlockNumber, SignedBlock};
use miden_protocol::note::NoteScript;
use miden_protocol::transaction::{AccountInputs, ProvenTransaction, TransactionInputs};
use miden_protocol::utils::serde::{Deserializable, Serializable};
use thiserror::Error;
use tonic::Status;
use tonic::metadata::AsciiMetadataValue;
use tracing::{info, instrument};
use url::Url;
use crate::COMPONENT;
#[derive(Clone, Debug)]
pub struct RpcClient {
inner: InnerRpcClient,
backoff: ExponentialBuilder,
}
impl RpcClient {
pub fn new(
rpc_url: Url,
genesis_commitment: Word,
backoff_initial: Duration,
backoff_max: Duration,
) -> anyhow::Result<Self> {
Self::new_with_auth(rpc_url, None, genesis_commitment, backoff_initial, backoff_max)
}
pub fn new_with_auth(
rpc_url: Url,
rpc_auth_header_value: Option<AsciiMetadataValue>,
genesis_commitment: Word,
backoff_initial: Duration,
backoff_max: Duration,
) -> anyhow::Result<Self> {
info!(target: COMPONENT, rpc_endpoint = %rpc_url, "Initializing RPC client");
let builder = Builder::new(rpc_url)
.with_tls()?
.without_timeout()
.without_metadata_version()
.with_metadata_genesis(genesis_commitment.to_hex());
let builder = match rpc_auth_header_value {
Some(value) => builder.with_auth_header_value(value),
None => builder.without_auth_header(),
};
let rpc = builder.with_otel_context_injection().connect_lazy::<InnerRpcClient>();
let backoff = retry::exponential(backoff_initial, backoff_max);
Ok(Self { inner: rpc, backoff })
}
#[instrument(
target = COMPONENT,
name = "rpc.client.block_subscription_with_retry",
skip_all,
fields(%block_from),
err,
)]
pub async fn block_subscription_with_retry(
&self,
block_from: BlockNumber,
) -> Result<
impl Stream<Item = Result<(SignedBlock, BlockNumber), RpcError>> + Send + 'static,
RpcError,
> {
(|| async move {
let request =
tonic::Request::new(BlockSubscriptionRequest { block_from: block_from.as_u32() });
let stream = self
.inner
.clone()
.block_subscription(request)
.await
.map_err(RpcError::GrpcClientError)?
.into_inner();
Ok(stream
.map_err(RpcError::GrpcClientError)
.and_then(|response| async move { decode_block_subscription_response(&response) }))
})
.retry(self.backoff)
.notify(|err: &RpcError, dur| {
tracing::warn!(
target: COMPONENT,
sleep_ms = dur.as_millis() as u64,
err = %err.as_report(),
"RPC connection failed while opening block subscription, retrying",
);
})
.await
}
#[instrument(target = COMPONENT, name = "ntx.rpc.client.submit_proven_tx", skip_all, err)]
pub async fn submit_proven_tx(
&self,
proven_tx: &ProvenTransaction,
tx_inputs: &TransactionInputs,
) -> Result<(), Status> {
let request = proto::transaction::ProvenTransaction {
transaction: proven_tx.to_bytes(),
transaction_inputs: Some(tx_inputs.to_bytes()),
};
self.inner.clone().submit_proven_tx(request).await?;
Ok(())
}
}
fn decode_block_subscription_response(
response: &BlockSubscriptionResponse,
) -> Result<(SignedBlock, BlockNumber), RpcError> {
let block = SignedBlock::read_from_bytes(&response.block).map_err(RpcError::Deserialize)?;
let committed_tip = BlockNumber::from(response.committed_chain_tip);
Ok((block, committed_tip))
}
impl RpcClient {
pub async fn get_account_inputs(
&self,
account_id: AccountId,
block_num: BlockNumber,
) -> Result<AccountInputs, RpcError> {
let request = proto::rpc::AccountRequest {
account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }),
block_num: Some(block_num.into()),
details: Some(proto::rpc::account_request::AccountDetailRequest {
code_commitment: Some(Word::default().into()),
asset_vault_commitment: None, storage_request: None,
}),
};
let response = self.get_account(request).await?;
let details = response.details.as_ref().ok_or_else(|| {
RpcError::InvalidResponse("response did not include account details".into())
})?;
let partial_account = build_minimal_partial_account(details)?;
Ok(AccountInputs::new(partial_account, response.witness))
}
pub async fn get_vault_asset_witnesses(
&self,
account_id: AccountId,
vault_keys: BTreeSet<AssetVaultKey>,
block_num: Option<BlockNumber>,
) -> Result<Vec<AssetWitness>, RpcError> {
if vault_keys.is_empty() {
return Ok(Vec::new());
}
let request = proto::rpc::AccountRequest {
account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }),
block_num: block_num.map(Into::into),
details: Some(proto::rpc::account_request::AccountDetailRequest {
code_commitment: None,
asset_vault_commitment: Some(Word::default().into()),
storage_request: None,
}),
};
let response = self.get_account(request).await?;
let assets: Vec<Asset> = match response.details.map(|details| details.vault_details) {
Some(AccountVaultDetails::Assets(assets)) => assets,
Some(AccountVaultDetails::LimitExceeded) => {
panic!("should never exceed maximum number of requested keys")
},
None => Vec::new(),
};
let vault =
AssetVault::new(&assets).map_err(|err| RpcError::InvalidResponse(err.as_report()))?;
Ok(vault_keys.into_iter().map(|key| vault.open(key)).collect())
}
pub async fn get_storage_map_witness(
&self,
account_id: AccountId,
slot_name: StorageSlotName,
map_key: StorageMapKey,
block_num: Option<BlockNumber>,
) -> Result<StorageMapWitness, RpcError> {
let request = proto::rpc::AccountRequest {
account_id: Some(proto::account::AccountId { id: account_id.to_bytes() }),
block_num: block_num.map(Into::into),
details: Some(proto::rpc::account_request::AccountDetailRequest {
code_commitment: None,
asset_vault_commitment: None,
storage_request: Some(StorageRequest::StorageMaps(StorageMapDetailRequests {
storage_maps: vec![StorageMapDetailRequest {
slot_name: slot_name.to_string(),
slot_data: Some(storage_map_detail_request::SlotData::MapKeys(MapKeys {
map_keys: vec![map_key.into()],
})),
}],
})),
}),
};
let response = self.get_account(request).await?;
let details = response.details.as_ref().ok_or_else(|| {
RpcError::InvalidResponse("response did not include account details".into())
})?;
let map_details = details
.storage_details
.map_details
.iter()
.find(|detail| detail.slot_name == slot_name)
.ok_or_else(|| {
RpcError::InvalidResponse(format!(
"response is missing storage map details for slot {slot_name}"
))
})?;
let StorageMapEntries::EntriesWithProofs(proofs) = &map_details.entries else {
return Err(RpcError::InvalidResponse(
"response did not include storage map entry proofs".into(),
));
};
let proof = proofs.first().cloned().ok_or_else(|| {
RpcError::InvalidResponse(
"response did not include a proof for the requested key".into(),
)
})?;
StorageMapWitness::new(proof, [map_key])
.map_err(|err| RpcError::InvalidResponse(err.as_report()))
}
#[instrument(target = COMPONENT, name = "ntx.rpc.client.get_note_script_by_root", skip_all, err)]
pub async fn get_note_script_by_root(
&self,
script_root: Word,
) -> Result<Option<NoteScript>, RpcError> {
let request = proto::note::NoteScriptRoot { root: Some(script_root.into()) };
let script = self
.inner
.clone()
.get_note_script_by_root(request)
.await
.map_err(RpcError::GrpcClientError)?
.into_inner()
.script;
script.map(NoteScript::try_from).transpose().map_err(RpcError::Conversion)
}
async fn get_account(
&self,
request: proto::rpc::AccountRequest,
) -> Result<AccountResponse, RpcError> {
let response = self
.inner
.clone()
.get_account(request)
.await
.map_err(RpcError::GrpcClientError)?
.into_inner();
AccountResponse::try_from(response).map_err(RpcError::Conversion)
}
}
fn build_minimal_partial_account(details: &AccountDetails) -> Result<PartialAccount, RpcError> {
let code_bytes = details
.account_code
.as_ref()
.ok_or_else(|| RpcError::InvalidResponse("response did not include account code".into()))?;
let account_code = AccountCode::read_from_bytes(code_bytes).map_err(RpcError::Deserialize)?;
let partial_storage = PartialStorage::new(details.storage_details.header.clone(), [])
.map_err(|err| RpcError::InvalidResponse(err.as_report()))?;
let partial_vault = PartialVault::new(details.account_header.vault_root());
PartialAccount::new(
details.account_header.id(),
details.account_header.nonce(),
account_code,
partial_storage,
partial_vault,
None,
)
.map_err(|err| RpcError::InvalidResponse(err.as_report()))
}
#[derive(Debug, Error)]
pub enum RpcError {
#[error("RPC gRPC call failed")]
GrpcClientError(#[source] tonic::Status),
#[error("failed to deserialize RPC payload")]
Deserialize(#[source] miden_protocol::utils::serde::DeserializationError),
#[error("failed to convert RPC response")]
Conversion(#[source] ConversionError),
#[error("invalid RPC response: {0}")]
InvalidResponse(String),
}