use miden_node_proto::convert;
use miden_node_proto::decode::{
convert_digests_to_words,
read_account_id,
read_account_ids,
read_block_range,
read_root,
};
use miden_node_proto::domain::block::InvalidBlockRange;
use miden_node_proto::errors::ConversionError;
use miden_node_proto::generated::store::rpc_server;
use miden_node_proto::generated::{self as proto};
use miden_node_utils::limiter::{
QueryParamAccountIdLimit,
QueryParamLimiter,
QueryParamNoteIdLimit,
QueryParamNoteTagLimit,
QueryParamNullifierLimit,
};
use miden_protocol::Word;
use miden_protocol::account::AccountId;
use miden_protocol::block::BlockNumber;
use miden_protocol::note::NoteId;
use tonic::{Request, Response, Status};
use tracing::{debug, info};
use crate::COMPONENT;
use crate::errors::{
CheckNullifiersError,
GetAccountError,
GetBlockByNumberError,
GetNoteScriptByRootError,
GetNotesByIdError,
NoteSyncError,
SyncAccountStorageMapsError,
SyncAccountVaultError,
SyncChainMmrError,
SyncNullifiersError,
SyncTransactionsError,
};
use crate::server::api::{StoreApi, internal_error, validate_nullifiers};
#[tonic::async_trait]
impl rpc_server::Rpc for StoreApi {
async fn get_block_header_by_number(
&self,
request: Request<proto::rpc::BlockHeaderByNumberRequest>,
) -> Result<Response<proto::rpc::BlockHeaderByNumberResponse>, Status> {
self.get_block_header_by_number_inner(request).await
}
async fn check_nullifiers(
&self,
request: Request<proto::rpc::NullifierList>,
) -> Result<Response<proto::rpc::CheckNullifiersResponse>, Status> {
let request = request.into_inner();
check::<QueryParamNullifierLimit>(request.nullifiers.len())?;
let nullifiers = validate_nullifiers::<CheckNullifiersError>(&request.nullifiers)?;
let proofs = self.state.check_nullifiers(&nullifiers).await;
Ok(Response::new(proto::rpc::CheckNullifiersResponse {
proofs: convert(proofs).collect(),
}))
}
async fn sync_nullifiers(
&self,
request: Request<proto::rpc::SyncNullifiersRequest>,
) -> Result<Response<proto::rpc::SyncNullifiersResponse>, Status> {
let request = request.into_inner();
if request.prefix_len != 16 {
return Err(SyncNullifiersError::InvalidPrefixLength(request.prefix_len).into());
}
let chain_tip = self.state.latest_block_num().await;
let block_range =
read_block_range::<SyncNullifiersError>(request.block_range, "SyncNullifiersRequest")?
.into_inclusive_range::<SyncNullifiersError>(&chain_tip)?;
let (nullifiers, block_num) = self
.state
.sync_nullifiers(request.prefix_len, request.nullifiers, block_range)
.await
.map_err(SyncNullifiersError::from)?;
let nullifiers = nullifiers
.into_iter()
.map(|nullifier_info| proto::rpc::sync_nullifiers_response::NullifierUpdate {
nullifier: Some(nullifier_info.nullifier.into()),
block_num: nullifier_info.block_num.as_u32(),
})
.collect();
Ok(Response::new(proto::rpc::SyncNullifiersResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: block_num.as_u32(),
}),
nullifiers,
}))
}
async fn sync_notes(
&self,
request: Request<proto::rpc::SyncNotesRequest>,
) -> Result<Response<proto::rpc::SyncNotesResponse>, Status> {
let request = request.into_inner();
let chain_tip = self.state.latest_block_num().await;
let block_range =
read_block_range::<NoteSyncError>(request.block_range, "SyncNotesRequest")?
.into_inclusive_range::<NoteSyncError>(&chain_tip)?;
if *block_range.end() > chain_tip {
Err(NoteSyncError::FutureBlock { chain_tip, block_to: *block_range.end() })?;
}
check::<QueryParamNoteTagLimit>(request.note_tags.len())?;
let (results, last_block_checked) =
self.state.sync_notes(request.note_tags, block_range).await?;
let blocks = results
.into_iter()
.map(|(state, mmr_proof)| proto::rpc::sync_notes_response::NoteSyncBlock {
block_header: Some(state.block_header.into()),
mmr_path: Some(mmr_proof.merkle_path().clone().into()),
notes: state.notes.into_iter().map(Into::into).collect(),
})
.collect();
Ok(Response::new(proto::rpc::SyncNotesResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: last_block_checked.as_u32(),
}),
blocks,
}))
}
async fn sync_chain_mmr(
&self,
request: Request<proto::rpc::SyncChainMmrRequest>,
) -> Result<Response<proto::rpc::SyncChainMmrResponse>, Status> {
let request = request.into_inner();
let chain_tip = self.state.latest_block_num().await;
let block_range = request
.block_range
.ok_or_else(|| {
ConversionError::missing_field::<proto::rpc::SyncChainMmrRequest>("block_range")
})
.map_err(SyncChainMmrError::DeserializationFailed)?;
let effective_tip = match request.finality() {
proto::rpc::Finality::Unspecified | proto::rpc::Finality::Committed => chain_tip,
proto::rpc::Finality::Proven => self
.state
.db()
.select_latest_proven_in_sequence_block_num()
.await
.map_err(SyncChainMmrError::DatabaseError)?,
};
let block_from = BlockNumber::from(block_range.block_from);
if block_from > effective_tip {
Err(SyncChainMmrError::FutureBlock { chain_tip: effective_tip, block_from })?;
}
let block_to =
block_range.block_to.map_or(effective_tip, BlockNumber::from).min(effective_tip);
if block_from > block_to {
Err(SyncChainMmrError::InvalidBlockRange(InvalidBlockRange::StartGreaterThanEnd {
start: block_from,
end: block_to,
}))?;
}
let block_range = block_from..=block_to;
let (mmr_delta, block_header) =
self.state.sync_chain_mmr(block_range.clone()).await.map_err(internal_error)?;
Ok(Response::new(proto::rpc::SyncChainMmrResponse {
block_range: Some(proto::rpc::BlockRange {
block_from: block_range.start().as_u32(),
block_to: Some(block_range.end().as_u32()),
}),
mmr_delta: Some(mmr_delta.into()),
block_header: Some(block_header.into()),
}))
}
async fn get_notes_by_id(
&self,
request: Request<proto::note::NoteIdList>,
) -> Result<Response<proto::note::CommittedNoteList>, Status> {
info!(target: COMPONENT, ?request);
let note_ids = request.into_inner().ids;
check::<QueryParamNoteIdLimit>(note_ids.len())?;
let note_ids: Vec<Word> = convert_digests_to_words::<GetNotesByIdError, _>(note_ids)?;
let note_ids: Vec<NoteId> = note_ids.into_iter().map(NoteId::from_raw).collect();
let notes = self
.state
.get_notes_by_id(note_ids)
.await
.map_err(GetNotesByIdError::from)?
.into_iter()
.map(Into::into)
.collect();
Ok(Response::new(proto::note::CommittedNoteList { notes }))
}
async fn get_block_by_number(
&self,
request: Request<proto::blockchain::BlockNumber>,
) -> Result<Response<proto::blockchain::MaybeBlock>, Status> {
let request = request.into_inner();
debug!(target: COMPONENT, ?request);
let block = self
.state
.load_block(request.block_num.into())
.await
.map_err(GetBlockByNumberError::from)?;
Ok(Response::new(proto::blockchain::MaybeBlock { block }))
}
async fn get_account(
&self,
request: Request<proto::rpc::AccountRequest>,
) -> Result<Response<proto::rpc::AccountResponse>, Status> {
debug!(target: COMPONENT, ?request);
let request = request.into_inner();
let account_request = request.try_into().map_err(GetAccountError::DeserializationFailed)?;
let account_data = self.state.get_account(account_request).await?;
Ok(Response::new(account_data.into()))
}
async fn sync_account_vault(
&self,
request: Request<proto::rpc::SyncAccountVaultRequest>,
) -> Result<Response<proto::rpc::SyncAccountVaultResponse>, Status> {
let request = request.into_inner();
let chain_tip = self.state.latest_block_num().await;
let account_id: AccountId = read_account_id::<
proto::rpc::SyncAccountVaultRequest,
SyncAccountVaultError,
>(request.account_id)?;
if !account_id.has_public_state() {
return Err(SyncAccountVaultError::AccountNotPublic(account_id).into());
}
let block_range = read_block_range::<SyncAccountVaultError>(
request.block_range,
"SyncAccountVaultRequest",
)?
.into_inclusive_range::<SyncAccountVaultError>(&chain_tip)?;
let (last_included_block, updates) = self
.state
.sync_account_vault(account_id, block_range)
.await
.map_err(SyncAccountVaultError::from)?;
let updates = updates
.into_iter()
.map(|update| {
let vault_key: Word = update.vault_key.into();
proto::rpc::AccountVaultUpdate {
vault_key: Some(vault_key.into()),
asset: update.asset.map(Into::into),
block_num: update.block_num.as_u32(),
}
})
.collect();
Ok(Response::new(proto::rpc::SyncAccountVaultResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: last_included_block.as_u32(),
}),
updates,
}))
}
async fn sync_account_storage_maps(
&self,
request: Request<proto::rpc::SyncAccountStorageMapsRequest>,
) -> Result<Response<proto::rpc::SyncAccountStorageMapsResponse>, Status> {
let request = request.into_inner();
let account_id = read_account_id::<
proto::rpc::SyncAccountStorageMapsRequest,
SyncAccountStorageMapsError,
>(request.account_id)?;
if !account_id.has_public_state() {
Err(SyncAccountStorageMapsError::AccountNotPublic(account_id))?;
}
let chain_tip = self.state.latest_block_num().await;
let block_range = read_block_range::<SyncAccountStorageMapsError>(
request.block_range,
"SyncAccountStorageMapsRequest",
)?
.into_inclusive_range::<SyncAccountStorageMapsError>(&chain_tip)?;
let storage_maps_page = self
.state
.sync_account_storage_maps(account_id, block_range)
.await
.map_err(SyncAccountStorageMapsError::from)?;
let updates = storage_maps_page
.values
.into_iter()
.map(|map_value| proto::rpc::StorageMapUpdate {
slot_name: map_value.slot_name.to_string(),
key: Some(map_value.key.into()),
value: Some(map_value.value.into()),
block_num: map_value.block_num.as_u32(),
})
.collect();
Ok(Response::new(proto::rpc::SyncAccountStorageMapsResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: storage_maps_page.last_block_included.as_u32(),
}),
updates,
}))
}
async fn status(
&self,
_request: Request<()>,
) -> Result<Response<proto::rpc::StoreStatus>, Status> {
Ok(Response::new(proto::rpc::StoreStatus {
version: env!("CARGO_PKG_VERSION").to_string(),
status: "connected".to_string(),
chain_tip: self.state.latest_block_num().await.as_u32(),
}))
}
async fn get_note_script_by_root(
&self,
request: Request<proto::note::NoteScriptRoot>,
) -> Result<Response<proto::rpc::MaybeNoteScript>, Status> {
debug!(target: COMPONENT, request = ?request);
let root =
read_root::<GetNoteScriptByRootError>(request.into_inner().root, "NoteScriptRoot")?;
let note_script = self
.state
.get_note_script_by_root(root)
.await
.map_err(GetNoteScriptByRootError::from)?;
Ok(Response::new(proto::rpc::MaybeNoteScript {
script: note_script.map(Into::into),
}))
}
async fn sync_transactions(
&self,
request: Request<proto::rpc::SyncTransactionsRequest>,
) -> Result<Response<proto::rpc::SyncTransactionsResponse>, Status> {
debug!(target: COMPONENT, request = ?request);
let request = request.into_inner();
let chain_tip = self.state.latest_block_num().await;
let block_range = read_block_range::<SyncTransactionsError>(
request.block_range,
"SyncTransactionsRequest",
)?
.into_inclusive_range::<SyncTransactionsError>(&chain_tip)?;
let account_ids: Vec<AccountId> =
read_account_ids::<SyncTransactionsError, _>(request.account_ids)?;
check::<QueryParamAccountIdLimit>(account_ids.len())?;
let (last_block_included, transaction_records_db) = self
.state
.sync_transactions(account_ids, block_range.clone())
.await
.map_err(SyncTransactionsError::from)?;
let transactions: Vec<_> = transaction_records_db
.into_iter()
.map(crate::db::TransactionRecord::into_proto)
.collect();
Ok(Response::new(proto::rpc::SyncTransactionsResponse {
pagination_info: Some(proto::rpc::PaginationInfo {
chain_tip: chain_tip.as_u32(),
block_num: last_block_included.as_u32(),
}),
transactions,
}))
}
}
fn out_of_range_error<E: core::fmt::Display>(err: E) -> Status {
Status::out_of_range(err.to_string())
}
fn check<Q: QueryParamLimiter>(n: usize) -> Result<(), Status> {
<Q as QueryParamLimiter>::check(n).map_err(out_of_range_error)
}