1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::asset::{Asset, AssetVault};
10use miden_protocol::vm::FutureMaybeSend;
11
12type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
13
14use miden_protocol::account::{
15 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
16};
17use miden_protocol::address::NetworkId;
18use miden_protocol::block::account_tree::AccountWitness;
19use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
20use miden_protocol::crypto::merkle::MerklePath;
21use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
22use miden_protocol::crypto::merkle::smt::SmtProof;
23use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
24use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
25use miden_protocol::utils::serde::Deserializable;
26use miden_protocol::{EMPTY_WORD, Word};
27use miden_tx::utils::serde::Serializable;
28use miden_tx::utils::sync::RwLock;
29use tonic::Status;
30use tracing::info;
31
32use super::domain::account::{
33 AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
34};
35use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
36use super::generated::rpc::account_request::AccountDetailRequest;
37use super::generated::rpc::AccountRequest;
38use super::{
39 Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
40 RpcStatusInfo,
41};
42use crate::rpc::domain::sync::ChainMmrInfo;
43use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
44use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
45use crate::rpc::domain::transaction::TransactionsInfo;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
49use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
50use crate::rpc::generated::rpc::BlockRange;
51use crate::rpc::domain::limits::RpcLimits;
52use crate::rpc::{AccountStateAt, generated as proto};
53
54mod api_client;
55mod retry;
56
57use api_client::api_client_wrapper::ApiClient;
58
59struct BlockPagination {
61 current_block_from: BlockNumber,
62 block_to: Option<BlockNumber>,
63 iterations: u32,
64}
65
66enum PaginationResult {
67 Continue,
68 Done {
69 chain_tip: BlockNumber,
70 block_num: BlockNumber,
71 },
72}
73
74impl BlockPagination {
75 const MAX_ITERATIONS: u32 = 1000;
80
81 fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
82 Self {
83 current_block_from: block_from,
84 block_to,
85 iterations: 0,
86 }
87 }
88
89 fn current_block_from(&self) -> BlockNumber {
90 self.current_block_from
91 }
92
93 fn block_to(&self) -> Option<BlockNumber> {
94 self.block_to
95 }
96
97 fn advance(
98 &mut self,
99 block_num: BlockNumber,
100 chain_tip: BlockNumber,
101 ) -> Result<PaginationResult, RpcError> {
102 if self.iterations >= Self::MAX_ITERATIONS {
103 return Err(RpcError::PaginationError(
104 "too many pagination iterations, possible infinite loop".to_owned(),
105 ));
106 }
107 self.iterations += 1;
108
109 if block_num < self.current_block_from {
110 return Err(RpcError::PaginationError(
111 "invalid pagination: block_num went backwards".to_owned(),
112 ));
113 }
114
115 let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
116
117 if block_num >= target_block {
118 return Ok(PaginationResult::Done { chain_tip, block_num });
119 }
120
121 self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
122
123 Ok(PaginationResult::Continue)
124 }
125}
126
127pub struct GrpcClient {
141 client: RwLock<Option<ApiClient>>,
143 endpoint: String,
145 timeout_ms: u64,
147 genesis_commitment: RwLock<Option<Word>>,
149 limits: RwLock<Option<RpcLimits>>,
151 max_retries: u32,
153 retry_interval_ms: u64,
155 bearer_token: Option<String>,
159}
160
161impl GrpcClient {
162 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
165 GrpcClient {
166 client: RwLock::new(None),
167 endpoint: endpoint.to_string(),
168 timeout_ms,
169 genesis_commitment: RwLock::new(None),
170 limits: RwLock::new(None),
171 max_retries: retry::DEFAULT_MAX_RETRIES,
172 retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
173 bearer_token: None,
174 }
175 }
176
177 #[must_use]
180 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
181 self.max_retries = max_retries;
182 self
183 }
184
185 #[must_use]
188 pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
189 self.retry_interval_ms = retry_interval_ms;
190 self
191 }
192
193 #[must_use]
218 pub fn with_bearer_auth(mut self, token: String) -> Self {
219 self.bearer_token = Some(token);
220 self
221 }
222
223 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
226 if self.client.read().is_none() {
227 self.connect().await?;
228 }
229
230 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
231 }
232
233 async fn connect(&self) -> Result<(), RpcError> {
236 let genesis_commitment = *self.genesis_commitment.read();
237 let new_client = ApiClient::new_client(
238 self.endpoint.clone(),
239 self.timeout_ms,
240 genesis_commitment,
241 self.bearer_token.clone(),
242 )
243 .await?;
244 let mut client = self.client.write();
245 client.replace(new_client);
246
247 Ok(())
248 }
249
250 fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
251 let genesis_commitment = self
252 .genesis_commitment
253 .read()
254 .as_ref()
255 .map_or_else(|| "none".to_string(), Word::to_hex);
256 let context = AcceptHeaderContext {
257 client_version: env!("CARGO_PKG_VERSION").to_string(),
258 genesis_commitment,
259 };
260 RpcError::from_grpc_error_with_context(endpoint, status, context)
261 }
262
263 async fn call_with_retry<T: Send + 'static>(
274 &self,
275 endpoint: RpcEndpoint,
276 mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
277 ) -> Result<tonic::Response<T>, RpcError> {
278 let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
279
280 loop {
281 let rpc_api = self.ensure_connected().await?;
282
283 match call(rpc_api).await {
284 Ok(response) => return Ok(response),
285 Err(status) if retry_state.should_retry(&status).await => {},
286 Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
287 }
288 }
289 }
290
291 pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
298 let mut rpc_api = ApiClient::new_client_without_accept_header(
299 self.endpoint.clone(),
300 self.timeout_ms,
301 self.bearer_token.clone(),
302 )
303 .await?;
304 rpc_api
305 .status(())
306 .await
307 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
308 .map(tonic::Response::into_inner)
309 .and_then(RpcStatusInfo::try_from)
310 }
311
312 pub async fn fetch_full_account_proof(
319 &self,
320 account_id: AccountId,
321 ) -> Result<(BlockNumber, AccountProof), RpcError> {
322 let has_public_state = account_id.has_public_state();
323 let account_request = {
324 AccountRequest {
325 account_id: Some(account_id.into()),
326 block_num: None,
327 details: {
328 if has_public_state {
329 Some(AccountDetailRequest {
334 code_commitment: Some(EMPTY_WORD.into()),
335 asset_vault_commitment: Some(EMPTY_WORD.into()),
336 storage_maps: vec![],
337 })
338 } else {
339 None
340 }
341 },
342 }
343 };
344 let account_response = self
345 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
346 let request = account_request.clone();
347 Box::pin(async move { rpc_api.get_account(request).await })
348 })
349 .await?
350 .into_inner();
351 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
352 "GetAccountDetails returned an account without a matching block number for the witness"
353 .to_owned(),
354 ))?;
355 let account_proof = {
356 if has_public_state {
357 let account_details = account_response
358 .details
359 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
360 .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
361 let storage_header = account_details.storage_details.header;
362 let maps_to_request = storage_header
366 .slots()
367 .filter(|header| header.slot_type().is_map())
368 .map(|map| map.name().to_string());
369 let account_request = AccountRequest {
370 account_id: Some(account_id.into()),
371 block_num: Some(block_number),
372 details: Some(AccountDetailRequest {
373 code_commitment: Some(EMPTY_WORD.into()),
374 asset_vault_commitment: Some(EMPTY_WORD.into()),
375 storage_maps: maps_to_request
376 .map(|slot_name| StorageMapDetailRequest {
377 slot_name,
378 slot_data: Some(SlotData::AllEntries(true)),
379 })
380 .collect(),
381 }),
382 };
383 let response = self
384 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
385 let request = account_request.clone();
386 Box::pin(async move { rpc_api.get_account(request).await })
387 })
388 .await?;
389 response.into_inner().try_into()
390 } else {
391 account_response.try_into()
392 }
393 };
394 Ok((block_number.block_num.into(), account_proof?))
395 }
396
397 async fn build_storage_slots(
401 &self,
402 account_id: AccountId,
403 storage_details: &AccountStorageDetails,
404 block_to: Option<BlockNumber>,
405 ) -> Result<Vec<StorageSlot>, RpcError> {
406 let mut slots = vec![];
407 let mut map_cache: Option<StorageMapInfo> = None;
410 for slot_header in storage_details.header.slots() {
411 match slot_header.slot_type() {
418 StorageSlotType::Value => {
419 slots.push(miden_protocol::account::StorageSlot::with_value(
420 slot_header.name().clone(),
421 slot_header.value(),
422 ));
423 },
424 StorageSlotType::Map => {
425 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
426 RpcError::ExpectedDataMissing(format!(
427 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
428 slot_header.name(),
429 )),
430 )?;
431
432 let storage_map = if map_details.too_many_entries {
433 let map_info = if let Some(ref info) = map_cache {
434 info
435 } else {
436 let fetched_data =
437 self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
438 map_cache.insert(fetched_data)
439 };
440 let mut sorted_updates: Vec<_> = map_info
444 .updates
445 .iter()
446 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
447 .collect();
448 sorted_updates.sort_by_key(|u| u.block_num);
449 let map_entries: Vec<_> = sorted_updates
450 .into_iter()
451 .map(|u| (u.key, u.value))
452 .collect::<BTreeMap<_, _>>()
453 .into_iter()
454 .collect();
455 StorageMap::with_entries(map_entries)
456 } else {
457 map_details.entries.clone().into_storage_map().ok_or_else(|| {
458 RpcError::ExpectedDataMissing(
459 "expected AllEntries for full account fetch, got EntriesWithProofs"
460 .into(),
461 )
462 })?
463 }
464 .map_err(|err| {
465 RpcError::InvalidResponse(format!(
466 "the rpc api returned a non-valid map entry: {err}"
467 ))
468 })?;
469
470 slots.push(miden_protocol::account::StorageSlot::with_map(
471 slot_header.name().clone(),
472 storage_map,
473 ));
474 },
475 }
476 }
477 Ok(slots)
478 }
479
480 async fn fetch_full_vault(
485 &self,
486 account_id: AccountId,
487 block_to: Option<BlockNumber>,
488 ) -> Result<Vec<Asset>, RpcError> {
489 let vault_info =
490 self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
491 let mut updates = vault_info.updates;
492 updates.sort_by_key(|u| u.block_num);
493 Ok(updates
494 .into_iter()
495 .map(|u| (u.vault_key, u.asset))
496 .collect::<BTreeMap<_, _>>()
497 .into_values()
498 .flatten()
499 .collect())
500 }
501}
502
503#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
504#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
505impl NodeRpcClient for GrpcClient {
506 fn has_genesis_commitment(&self) -> Option<Word> {
511 *self.genesis_commitment.read()
512 }
513
514 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
515 if self.genesis_commitment.read().is_some() {
517 return Ok(());
519 }
520
521 self.genesis_commitment.write().replace(commitment);
523
524 let mut client_guard = self.client.write();
527 if let Some(client) = client_guard.as_mut() {
528 client.set_genesis_commitment(commitment);
529 }
530
531 Ok(())
532 }
533
534 async fn submit_proven_transaction(
535 &self,
536 proven_transaction: ProvenTransaction,
537 transaction_inputs: TransactionInputs,
538 ) -> Result<BlockNumber, RpcError> {
539 let request = proto::transaction::ProvenTransaction {
540 transaction: proven_transaction.to_bytes(),
541 transaction_inputs: Some(transaction_inputs.to_bytes()),
542 };
543
544 let api_response = self
545 .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
546 let request = request.clone();
547 Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
548 })
549 .await?;
550
551 Ok(BlockNumber::from(api_response.into_inner().block_num))
552 }
553
554 async fn get_block_header_by_number(
555 &self,
556 block_num: Option<BlockNumber>,
557 include_mmr_proof: bool,
558 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
559 let request = proto::rpc::BlockHeaderByNumberRequest {
560 block_num: block_num.as_ref().map(BlockNumber::as_u32),
561 include_mmr_proof: Some(include_mmr_proof),
562 };
563
564 info!("Calling GetBlockHeaderByNumber: {:?}", request);
565
566 let api_response = self
567 .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
568 Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
569 })
570 .await?;
571
572 let response = api_response.into_inner();
573
574 let block_header: BlockHeader = response
575 .block_header
576 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
577 .try_into()?;
578
579 let mmr_proof = if include_mmr_proof {
580 let forest = response
581 .chain_length
582 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
583 let merkle_path: MerklePath = response
584 .mmr_path
585 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
586 .try_into()?;
587
588 Some(MmrProof::new(
589 MmrPath::new(
590 Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
591 block_header.block_num().as_usize(),
592 merkle_path,
593 ),
594 block_header.commitment(),
595 ))
596 } else {
597 None
598 };
599
600 Ok((block_header, mmr_proof))
601 }
602
603 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
604 let limits = self.get_rpc_limits().await?;
605 let mut notes = Vec::with_capacity(note_ids.len());
606 for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
607 let request = proto::note::NoteIdList {
608 ids: chunk.iter().map(|id| (*id).into()).collect(),
609 };
610
611 let api_response = self
612 .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
613 let request = request.clone();
614 Box::pin(async move { rpc_api.get_notes_by_id(request).await })
615 })
616 .await?;
617
618 let response_notes = api_response
619 .into_inner()
620 .notes
621 .into_iter()
622 .map(FetchedNote::try_from)
623 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
624
625 notes.extend(response_notes);
626 }
627 Ok(notes)
628 }
629
630 async fn sync_chain_mmr(
631 &self,
632 block_from: BlockNumber,
633 block_to: Option<BlockNumber>,
634 ) -> Result<ChainMmrInfo, RpcError> {
635 let block_range = Some(BlockRange {
636 block_from: block_from.as_u32(),
637 block_to: block_to.map(|b| b.as_u32()),
638 });
639
640 let request = proto::rpc::SyncChainMmrRequest {
641 block_range,
642 finality: proto::rpc::Finality::Committed as i32,
643 };
644
645 let response = self
646 .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
647 Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
648 })
649 .await?;
650
651 response.into_inner().try_into()
652 }
653
654 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
666 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
667 let update_summary =
668 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
669
670 if account_id.is_private() {
673 Ok(FetchedAccount::new_private(account_id, update_summary))
674 } else {
675 let details =
676 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
677 "GetAccountDetails returned a public account without details".to_owned(),
678 ))?;
679
680 let account_id = details.header.id();
681 let nonce = details.header.nonce();
682
683 let assets = if details.vault_details.too_many_assets {
686 self.fetch_full_vault(account_id, Some(block_number)).await?
687 } else {
688 details.vault_details.assets
689 };
690
691 let slots = self
694 .build_storage_slots(account_id, &details.storage_details, Some(block_number))
695 .await?;
696 let asset_vault = AssetVault::new(&assets).map_err(|err| {
697 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
698 })?;
699 let account_storage = AccountStorage::new(slots).map_err(|err| {
700 RpcError::InvalidResponse(format!(
701 "api rpc returned non-valid storage slots: {err}"
702 ))
703 })?;
704 let account =
705 Account::new(account_id, asset_vault, account_storage, details.code, nonce, None)
706 .map_err(|err| {
707 RpcError::InvalidResponse(format!(
708 "failed to instance an account from the rpc api response: {err}"
709 ))
710 })?;
711 Ok(FetchedAccount::new_public(account, update_summary))
712 }
713 }
714
715 async fn get_account_proof(
727 &self,
728 account_id: AccountId,
729 storage_requirements: AccountStorageRequirements,
730 account_state: AccountStateAt,
731 known_account_code: Option<AccountCode>,
732 known_vault_commitment: Option<Word>,
733 ) -> Result<(BlockNumber, AccountProof), RpcError> {
734 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
735 if let Some(account_code) = known_account_code {
736 known_codes_by_commitment.insert(account_code.commitment(), account_code);
737 }
738
739 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
740
741 let account_details = if account_id.has_public_state() {
744 Some(AccountDetailRequest {
745 code_commitment: Some(EMPTY_WORD.into()),
746 asset_vault_commitment: known_vault_commitment.map(Into::into),
747 storage_maps,
748 })
749 } else {
750 None
751 };
752
753 let block_num = match account_state {
754 AccountStateAt::Block(number) => Some(number.into()),
755 AccountStateAt::ChainTip => None,
756 };
757
758 let request = AccountRequest {
759 account_id: Some(account_id.into()),
760 block_num,
761 details: account_details,
762 };
763
764 let response = self
765 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
766 let request = request.clone();
767 Box::pin(async move { rpc_api.get_account(request).await })
768 })
769 .await?
770 .into_inner();
771
772 let account_witness: AccountWitness = response
773 .witness
774 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
775 .try_into()?;
776
777 let block_num: BlockNumber = response
778 .block_num
779 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
780 .block_num
781 .into();
782
783 let headers = if account_witness.id().has_public_state() {
785 let mut details = response
786 .details
787 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
788 .into_domain(&known_codes_by_commitment, &storage_requirements)?;
789
790 if details.vault_details.too_many_assets {
791 details.vault_details.assets =
792 self.fetch_full_vault(account_id, Some(block_num)).await?;
793 }
794
795 Some(details)
796 } else {
797 None
798 };
799
800 let proof = AccountProof::new(account_witness, headers)
801 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
802
803 Ok((block_num, proof))
804 }
805
806 async fn sync_notes(
809 &self,
810 block_num: BlockNumber,
811 block_to: Option<BlockNumber>,
812 note_tags: &BTreeSet<NoteTag>,
813 ) -> Result<NoteSyncInfo, RpcError> {
814 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
815
816 let block_range = Some(BlockRange {
817 block_from: block_num.as_u32(),
818 block_to: block_to.map(|b| b.as_u32()),
819 });
820
821 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
822
823 let response = self
824 .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
825 let request = request.clone();
826 Box::pin(async move { rpc_api.sync_notes(request).await })
827 })
828 .await?;
829
830 response.into_inner().try_into()
831 }
832
833 async fn sync_nullifiers(
834 &self,
835 prefixes: &[u16],
836 block_num: BlockNumber,
837 block_to: Option<BlockNumber>,
838 ) -> Result<Vec<NullifierUpdate>, RpcError> {
839 const MAX_ITERATIONS: u32 = 1000; let limits = self.get_rpc_limits().await?;
842 let mut all_nullifiers = BTreeSet::new();
843
844 'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
847 let mut current_block_from = block_num.as_u32();
848
849 for _ in 0..MAX_ITERATIONS {
850 let request = proto::rpc::SyncNullifiersRequest {
851 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
852 prefix_len: 16,
853 block_range: Some(BlockRange {
854 block_from: current_block_from,
855 block_to: block_to.map(|b| b.as_u32()),
856 }),
857 };
858
859 let response = self
860 .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
861 let request = request.clone();
862 Box::pin(async move { rpc_api.sync_nullifiers(request).await })
863 })
864 .await?;
865 let response = response.into_inner();
866
867 let batch_nullifiers = response
869 .nullifiers
870 .iter()
871 .map(TryFrom::try_from)
872 .collect::<Result<Vec<NullifierUpdate>, _>>()
873 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
874
875 all_nullifiers.extend(batch_nullifiers);
876
877 if let Some(page) = response.pagination_info {
879 if page.block_num < current_block_from {
881 return Err(RpcError::PaginationError(
882 "invalid pagination: block_num went backwards".to_string(),
883 ));
884 }
885
886 let target_block =
888 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
889
890 if page.block_num >= target_block {
891 continue 'chunk_nullifiers;
893 }
894 current_block_from = page.block_num + 1;
895 }
896 }
897 return Err(RpcError::PaginationError(
899 "too many pagination iterations, possible infinite loop".to_string(),
900 ));
901 }
902 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
903 }
904
905 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
906 let limits = self.get_rpc_limits().await?;
907 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
908 for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
909 let request = proto::rpc::NullifierList {
910 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
911 };
912
913 let response = self
914 .call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
915 let request = request.clone();
916 Box::pin(async move { rpc_api.check_nullifiers(request).await })
917 })
918 .await?;
919
920 let mut response = response.into_inner();
921 let chunk_proofs = response
922 .proofs
923 .iter_mut()
924 .map(|r| r.to_owned().try_into())
925 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
926 proofs.extend(chunk_proofs);
927 }
928 Ok(proofs)
929 }
930
931 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
932 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
933
934 let response = self
935 .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
936 Box::pin(async move { rpc_api.get_block_by_number(request).await })
937 })
938 .await?;
939
940 let response = response.into_inner();
941 let block =
942 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
943 "GetBlockByNumberResponse.block".to_string(),
944 ))?)?;
945
946 Ok(block)
947 }
948
949 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
950 let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
951
952 let response = self
953 .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
954 Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
955 })
956 .await?;
957
958 let response = response.into_inner();
959 let note_script = NoteScript::try_from(
960 response
961 .script
962 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
963 )?;
964
965 Ok(note_script)
966 }
967
968 async fn sync_storage_maps(
969 &self,
970 block_from: BlockNumber,
971 block_to: Option<BlockNumber>,
972 account_id: AccountId,
973 ) -> Result<StorageMapInfo, RpcError> {
974 let mut pagination = BlockPagination::new(block_from, block_to);
975 let mut updates = Vec::new();
976
977 let (chain_tip, block_number) = loop {
978 let request = proto::rpc::SyncAccountStorageMapsRequest {
979 block_range: Some(BlockRange {
980 block_from: pagination.current_block_from().as_u32(),
981 block_to: pagination.block_to().map(|block| block.as_u32()),
982 }),
983 account_id: Some(account_id.into()),
984 };
985 let response = self
986 .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
987 let request = request.clone();
988 Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
989 })
990 .await?;
991 let response = response.into_inner();
992 let page = response
993 .pagination_info
994 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
995 let page_block_num = BlockNumber::from(page.block_num);
996 let page_chain_tip = BlockNumber::from(page.chain_tip);
997 let batch = response
998 .updates
999 .into_iter()
1000 .map(TryInto::try_into)
1001 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
1002 updates.extend(batch);
1003
1004 match pagination.advance(page_block_num, page_chain_tip)? {
1005 PaginationResult::Continue => {},
1006 PaginationResult::Done {
1007 chain_tip: final_chain_tip,
1008 block_num: final_block_num,
1009 } => break (final_chain_tip, final_block_num),
1010 }
1011 };
1012
1013 Ok(StorageMapInfo { chain_tip, block_number, updates })
1014 }
1015
1016 async fn sync_account_vault(
1017 &self,
1018 block_from: BlockNumber,
1019 block_to: Option<BlockNumber>,
1020 account_id: AccountId,
1021 ) -> Result<AccountVaultInfo, RpcError> {
1022 let mut pagination = BlockPagination::new(block_from, block_to);
1023 let mut updates = Vec::new();
1024
1025 let (chain_tip, block_number) = loop {
1026 let request = proto::rpc::SyncAccountVaultRequest {
1027 block_range: Some(BlockRange {
1028 block_from: pagination.current_block_from().as_u32(),
1029 block_to: pagination.block_to().map(|block| block.as_u32()),
1030 }),
1031 account_id: Some(account_id.into()),
1032 };
1033 let response = self
1034 .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
1035 let request = request.clone();
1036 Box::pin(async move { rpc_api.sync_account_vault(request).await })
1037 })
1038 .await?;
1039 let response = response.into_inner();
1040 let page = response
1041 .pagination_info
1042 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
1043 let page_block_num = BlockNumber::from(page.block_num);
1044 let page_chain_tip = BlockNumber::from(page.chain_tip);
1045 let batch = response
1046 .updates
1047 .iter()
1048 .map(|u| (*u).try_into())
1049 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
1050 updates.extend(batch);
1051
1052 match pagination.advance(page_block_num, page_chain_tip)? {
1053 PaginationResult::Continue => {},
1054 PaginationResult::Done {
1055 chain_tip: final_chain_tip,
1056 block_num: final_block_num,
1057 } => break (final_chain_tip, final_block_num),
1058 }
1059 };
1060
1061 Ok(AccountVaultInfo { chain_tip, block_number, updates })
1062 }
1063
1064 async fn sync_transactions(
1065 &self,
1066 block_from: BlockNumber,
1067 block_to: Option<BlockNumber>,
1068 account_ids: Vec<AccountId>,
1069 ) -> Result<TransactionsInfo, RpcError> {
1070 let block_range = Some(BlockRange {
1071 block_from: block_from.as_u32(),
1072 block_to: block_to.map(|b| b.as_u32()),
1073 });
1074
1075 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
1076
1077 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
1078
1079 let response = self
1080 .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
1081 let request = request.clone();
1082 Box::pin(async move { rpc_api.sync_transactions(request).await })
1083 })
1084 .await?;
1085
1086 response.into_inner().try_into()
1087 }
1088
1089 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
1090 let endpoint: Endpoint =
1091 Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
1092 Ok(endpoint.to_network_id())
1093 }
1094
1095 async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
1096 if let Some(limits) = *self.limits.read() {
1098 return Ok(limits);
1099 }
1100
1101 let response = self
1103 .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
1104 Box::pin(async move { rpc_api.get_limits(()).await })
1105 })
1106 .await?;
1107 let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
1108
1109 self.limits.write().replace(limits);
1111 Ok(limits)
1112 }
1113
1114 fn has_rpc_limits(&self) -> Option<RpcLimits> {
1115 *self.limits.read()
1116 }
1117
1118 async fn set_rpc_limits(&self, limits: RpcLimits) {
1119 self.limits.write().replace(limits);
1120 }
1121
1122 async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
1123 GrpcClient::get_status_unversioned(self).await
1124 }
1125}
1126
1127impl RpcError {
1131 pub fn from_grpc_error_with_context(
1132 endpoint: RpcEndpoint,
1133 status: Status,
1134 context: AcceptHeaderContext,
1135 ) -> Self {
1136 if let Some(accept_error) =
1137 AcceptHeaderError::try_from_message_with_context(status.message(), context)
1138 {
1139 return Self::AcceptHeaderError(accept_error);
1140 }
1141
1142 let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
1144
1145 let error_kind = GrpcError::from(&status);
1146 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
1147
1148 Self::RequestError {
1149 endpoint,
1150 error_kind,
1151 endpoint_error,
1152 source: Some(source),
1153 }
1154 }
1155}
1156
1157impl From<&Status> for GrpcError {
1158 fn from(status: &Status) -> Self {
1159 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1160 }
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165 use std::boxed::Box;
1166
1167 use miden_protocol::Word;
1168 use miden_protocol::block::BlockNumber;
1169
1170 use super::{BlockPagination, GrpcClient, PaginationResult};
1171 use crate::alloc::string::ToString;
1172 use crate::rpc::{Endpoint, NodeRpcClient, RpcError};
1173
1174 fn assert_send_sync<T: Send + Sync>() {}
1175
1176 #[test]
1177 fn is_send_sync() {
1178 assert_send_sync::<GrpcClient>();
1179 assert_send_sync::<Box<dyn NodeRpcClient>>();
1180 }
1181
1182 #[test]
1183 fn block_pagination_errors_when_block_num_goes_backwards() {
1184 let mut pagination = BlockPagination::new(10_u32.into(), None);
1185
1186 let res = pagination.advance(9_u32.into(), 20_u32.into());
1187 assert!(matches!(res, Err(RpcError::PaginationError(_))));
1188 }
1189
1190 #[test]
1191 fn block_pagination_errors_after_max_iterations() {
1192 let mut pagination = BlockPagination::new(0_u32.into(), None);
1193 let chain_tip: BlockNumber = 10_000_u32.into();
1194
1195 for _ in 0..BlockPagination::MAX_ITERATIONS {
1196 let current = pagination.current_block_from();
1197 let res = pagination
1198 .advance(current, chain_tip)
1199 .expect("expected pagination to continue within iteration limit");
1200 assert!(matches!(res, PaginationResult::Continue));
1201 }
1202
1203 let res = pagination.advance(pagination.current_block_from(), chain_tip);
1204 assert!(matches!(res, Err(RpcError::PaginationError(_))));
1205 }
1206
1207 #[test]
1208 fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
1209 let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));
1211
1212 let res = pagination
1213 .advance(30_u32.into(), 30_u32.into())
1214 .expect("expected pagination to succeed");
1215
1216 assert!(matches!(
1217 res,
1218 PaginationResult::Done {
1219 chain_tip,
1220 block_num
1221 } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
1222 ));
1223 }
1224
1225 #[test]
1226 fn block_pagination_advances_cursor_by_one() {
1227 let mut pagination = BlockPagination::new(5_u32.into(), None);
1228
1229 let res = pagination
1230 .advance(5_u32.into(), 100_u32.into())
1231 .expect("expected pagination to succeed");
1232 assert!(matches!(res, PaginationResult::Continue));
1233 assert_eq!(pagination.current_block_from().as_u32(), 6);
1234 }
1235
1236 async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
1238 let res = client.get_block_header_by_number(None, false).await;
1240 assert!(res.is_ok());
1241 }
1242
1243 #[tokio::test]
1244 async fn future_is_send() {
1245 let endpoint = &Endpoint::devnet();
1246 let client = GrpcClient::new(endpoint, 10000);
1247 let client: Box<GrpcClient> = client.into();
1248 tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
1249 }
1250
1251 #[tokio::test]
1252 async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
1253 let endpoint = &Endpoint::devnet();
1254 let client = GrpcClient::new(endpoint, 10000);
1255
1256 assert!(client.genesis_commitment.read().is_none());
1257
1258 let commitment = Word::default();
1259 client.set_genesis_commitment(commitment).await.unwrap();
1260
1261 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1262 }
1263
1264 #[tokio::test]
1265 async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
1266 use miden_protocol::Felt;
1267
1268 let endpoint = &Endpoint::devnet();
1269 let client = GrpcClient::new(endpoint, 10000);
1270
1271 let initial_commitment = Word::default();
1272 client.set_genesis_commitment(initial_commitment).await.unwrap();
1273
1274 let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
1275 client.set_genesis_commitment(new_commitment).await.unwrap();
1276
1277 assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
1278 }
1279
1280 #[tokio::test]
1281 async fn set_genesis_commitment_updates_the_client_if_already_connected() {
1282 let endpoint = &Endpoint::devnet();
1283 let client = GrpcClient::new(endpoint, 10000);
1284
1285 client.connect().await.unwrap();
1287
1288 let commitment = Word::default();
1289 client.set_genesis_commitment(commitment).await.unwrap();
1290
1291 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1292 assert!(client.client.read().as_ref().is_some());
1293 }
1294
1295 #[test]
1296 fn with_bearer_auth_stores_token() {
1297 let endpoint = &Endpoint::devnet();
1298 let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("token-one".to_string());
1299
1300 assert_eq!(client.bearer_token.as_deref(), Some("token-one"));
1301 }
1302
1303 #[test]
1304 fn with_bearer_auth_overwrites_on_repeat_call() {
1305 let endpoint = &Endpoint::devnet();
1306 let client = GrpcClient::new(endpoint, 10000)
1307 .with_bearer_auth("token-one".to_string())
1308 .with_bearer_auth("token-two".to_string());
1309
1310 assert_eq!(client.bearer_token.as_deref(), Some("token-two"));
1312 }
1313
1314 #[tokio::test]
1315 async fn with_bearer_auth_surfaces_invalid_ascii_value_at_connect_time() {
1316 let endpoint = &Endpoint::devnet();
1320 let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("bad\nvalue".to_string());
1321
1322 let err = client.connect().await.expect_err("expected invalid token to fail connect");
1323 assert!(
1324 matches!(err, RpcError::ConnectionError(_)),
1325 "expected ConnectionError, got {err:?}",
1326 );
1327 }
1328
1329 #[tokio::test]
1330 async fn with_bearer_auth_is_preserved_across_set_genesis_commitment() {
1331 let endpoint = &Endpoint::devnet();
1332 let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("token".to_string());
1333 client.connect().await.unwrap();
1334
1335 client.set_genesis_commitment(Word::default()).await.unwrap();
1336
1337 assert_eq!(client.bearer_token.as_deref(), Some("token"));
1339 assert!(client.client.read().as_ref().is_some());
1340 }
1341
1342 #[tokio::test]
1354 #[ignore = "requires network access to public testnet"]
1355 async fn with_bearer_auth_does_not_break_real_rpc_against_testnet() {
1356 let endpoint = &Endpoint::testnet();
1357 let client = GrpcClient::new(endpoint, 10_000).with_bearer_auth("smoke-test".to_string());
1358
1359 let status = client
1360 .get_status_unversioned()
1361 .await
1362 .expect("testnet status with caller auth header must succeed");
1363 assert!(!status.version.is_empty(), "status must include a server version");
1364 }
1365}