1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::asset::{Asset, AssetVault};
10use miden_protocol::vm::FutureMaybeSend;
11
12type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
13
14use miden_protocol::account::{
15 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
16};
17use miden_protocol::address::NetworkId;
18use miden_protocol::block::account_tree::AccountWitness;
19use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
20use miden_protocol::crypto::merkle::MerklePath;
21use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
22use miden_protocol::crypto::merkle::smt::SmtProof;
23use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
24use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
25use miden_protocol::utils::serde::Deserializable;
26use miden_protocol::{EMPTY_WORD, Word};
27use miden_tx::utils::serde::Serializable;
28use miden_tx::utils::sync::RwLock;
29use tonic::Status;
30use tracing::info;
31
32use super::domain::account::{
33 AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
34};
35use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
36use super::generated::rpc::account_request::AccountDetailRequest;
37use super::generated::rpc::AccountRequest;
38use super::{
39 Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
40 RpcStatusInfo,
41};
42use crate::rpc::domain::sync::ChainMmrInfo;
43use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
44use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
45use crate::rpc::domain::transaction::TransactionsInfo;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
49use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
50use crate::rpc::generated::rpc::BlockRange;
51use crate::rpc::domain::limits::RpcLimits;
52use crate::rpc::{AccountStateAt, generated as proto};
53
54mod api_client;
55mod retry;
56
57use api_client::api_client_wrapper::ApiClient;
58
/// Cursor state shared by the RPC calls in this file that fetch their results page by page
/// over a block range.
struct BlockPagination {
    /// First block of the range to request on the next page.
    current_block_from: BlockNumber,
    /// Optional upper bound of the requested range; when `None` the chain tip reported by each
    /// response is used as the bound.
    block_to: Option<BlockNumber>,
    /// Number of pages accepted so far; checked against `MAX_ITERATIONS` in `advance`.
    iterations: u32,
}
65
/// Outcome of feeding one page of a paginated response to [`BlockPagination::advance`].
enum PaginationResult {
    /// The target block has not been reached yet; another page must be requested.
    Continue,
    /// The target block was reached; carries the values reported by the last page.
    Done {
        /// Chain tip reported by the last page.
        chain_tip: BlockNumber,
        /// Last block covered by the last page.
        block_num: BlockNumber,
    },
}
73
74impl BlockPagination {
75 const MAX_ITERATIONS: u32 = 1000;
80
81 fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
82 Self {
83 current_block_from: block_from,
84 block_to,
85 iterations: 0,
86 }
87 }
88
89 fn current_block_from(&self) -> BlockNumber {
90 self.current_block_from
91 }
92
93 fn block_to(&self) -> Option<BlockNumber> {
94 self.block_to
95 }
96
97 fn advance(
98 &mut self,
99 block_num: BlockNumber,
100 chain_tip: BlockNumber,
101 ) -> Result<PaginationResult, RpcError> {
102 if self.iterations >= Self::MAX_ITERATIONS {
103 return Err(RpcError::PaginationError(
104 "too many pagination iterations, possible infinite loop".to_owned(),
105 ));
106 }
107 self.iterations += 1;
108
109 if block_num < self.current_block_from {
110 return Err(RpcError::PaginationError(
111 "invalid pagination: block_num went backwards".to_owned(),
112 ));
113 }
114
115 let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
116
117 if block_num >= target_block {
118 return Ok(PaginationResult::Done { chain_tip, block_num });
119 }
120
121 self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
122
123 Ok(PaginationResult::Continue)
124 }
125}
126
/// gRPC-backed client for the node RPC API; implements [`NodeRpcClient`].
pub struct GrpcClient {
    /// Underlying API client; `None` until `connect` has stored one.
    client: RwLock<Option<ApiClient>>,
    /// String form of the endpoint this client talks to.
    endpoint: String,
    /// Timeout, in milliseconds, handed to the API client on construction.
    timeout_ms: u64,
    /// Genesis commitment handed to the API client, if one has been set.
    genesis_commitment: RwLock<Option<Word>>,
    /// Cached RPC limits, filled by `get_rpc_limits` or `set_rpc_limits`.
    limits: RwLock<Option<RpcLimits>>,
    /// Retry count handed to `retry::RetryState` for each call.
    max_retries: u32,
    /// Retry interval, in milliseconds, handed to `retry::RetryState` for each call.
    retry_interval_ms: u64,
}
156
157impl GrpcClient {
158 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
161 GrpcClient {
162 client: RwLock::new(None),
163 endpoint: endpoint.to_string(),
164 timeout_ms,
165 genesis_commitment: RwLock::new(None),
166 limits: RwLock::new(None),
167 max_retries: retry::DEFAULT_MAX_RETRIES,
168 retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
169 }
170 }
171
172 #[must_use]
175 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
176 self.max_retries = max_retries;
177 self
178 }
179
180 #[must_use]
183 pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
184 self.retry_interval_ms = retry_interval_ms;
185 self
186 }
187
188 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
191 if self.client.read().is_none() {
192 self.connect().await?;
193 }
194
195 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
196 }
197
198 async fn connect(&self) -> Result<(), RpcError> {
201 let genesis_commitment = *self.genesis_commitment.read();
202 let new_client =
203 ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
204 .await?;
205 let mut client = self.client.write();
206 client.replace(new_client);
207
208 Ok(())
209 }
210
211 fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
212 let genesis_commitment = self
213 .genesis_commitment
214 .read()
215 .as_ref()
216 .map_or_else(|| "none".to_string(), Word::to_hex);
217 let context = AcceptHeaderContext {
218 client_version: env!("CARGO_PKG_VERSION").to_string(),
219 genesis_commitment,
220 };
221 RpcError::from_grpc_error_with_context(endpoint, status, context)
222 }
223
224 async fn call_with_retry<T: Send + 'static>(
235 &self,
236 endpoint: RpcEndpoint,
237 mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
238 ) -> Result<tonic::Response<T>, RpcError> {
239 let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
240
241 loop {
242 let rpc_api = self.ensure_connected().await?;
243
244 match call(rpc_api).await {
245 Ok(response) => return Ok(response),
246 Err(status) if retry_state.should_retry(&status).await => {},
247 Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
248 }
249 }
250 }
251
252 pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
257 let mut rpc_api =
258 ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
259 .await?;
260 rpc_api
261 .status(())
262 .await
263 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
264 .map(tonic::Response::into_inner)
265 .and_then(RpcStatusInfo::try_from)
266 }
267
268 pub async fn fetch_full_account_proof(
275 &self,
276 account_id: AccountId,
277 ) -> Result<(BlockNumber, AccountProof), RpcError> {
278 let has_public_state = account_id.has_public_state();
279 let account_request = {
280 AccountRequest {
281 account_id: Some(account_id.into()),
282 block_num: None,
283 details: {
284 if has_public_state {
285 Some(AccountDetailRequest {
290 code_commitment: Some(EMPTY_WORD.into()),
291 asset_vault_commitment: Some(EMPTY_WORD.into()),
292 storage_maps: vec![],
293 })
294 } else {
295 None
296 }
297 },
298 }
299 };
300 let account_response = self
301 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
302 let request = account_request.clone();
303 Box::pin(async move { rpc_api.get_account(request).await })
304 })
305 .await?
306 .into_inner();
307 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
308 "GetAccountDetails returned an account without a matching block number for the witness"
309 .to_owned(),
310 ))?;
311 let account_proof = {
312 if has_public_state {
313 let account_details = account_response
314 .details
315 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
316 .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
317 let storage_header = account_details.storage_details.header;
318 let maps_to_request = storage_header
322 .slots()
323 .filter(|header| header.slot_type().is_map())
324 .map(|map| map.name().to_string());
325 let account_request = AccountRequest {
326 account_id: Some(account_id.into()),
327 block_num: Some(block_number),
328 details: Some(AccountDetailRequest {
329 code_commitment: Some(EMPTY_WORD.into()),
330 asset_vault_commitment: Some(EMPTY_WORD.into()),
331 storage_maps: maps_to_request
332 .map(|slot_name| StorageMapDetailRequest {
333 slot_name,
334 slot_data: Some(SlotData::AllEntries(true)),
335 })
336 .collect(),
337 }),
338 };
339 let response = self
340 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
341 let request = account_request.clone();
342 Box::pin(async move { rpc_api.get_account(request).await })
343 })
344 .await?;
345 response.into_inner().try_into()
346 } else {
347 account_response.try_into()
348 }
349 };
350 Ok((block_number.block_num.into(), account_proof?))
351 }
352
353 async fn build_storage_slots(
357 &self,
358 account_id: AccountId,
359 storage_details: &AccountStorageDetails,
360 block_to: Option<BlockNumber>,
361 ) -> Result<Vec<StorageSlot>, RpcError> {
362 let mut slots = vec![];
363 let mut map_cache: Option<StorageMapInfo> = None;
366 for slot_header in storage_details.header.slots() {
367 match slot_header.slot_type() {
374 StorageSlotType::Value => {
375 slots.push(miden_protocol::account::StorageSlot::with_value(
376 slot_header.name().clone(),
377 slot_header.value(),
378 ));
379 },
380 StorageSlotType::Map => {
381 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
382 RpcError::ExpectedDataMissing(format!(
383 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
384 slot_header.name(),
385 )),
386 )?;
387
388 let storage_map = if map_details.too_many_entries {
389 let map_info = if let Some(ref info) = map_cache {
390 info
391 } else {
392 let fetched_data =
393 self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
394 map_cache.insert(fetched_data)
395 };
396 let mut sorted_updates: Vec<_> = map_info
400 .updates
401 .iter()
402 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
403 .collect();
404 sorted_updates.sort_by_key(|u| u.block_num);
405 let map_entries: Vec<_> = sorted_updates
406 .into_iter()
407 .map(|u| (u.key, u.value))
408 .collect::<BTreeMap<_, _>>()
409 .into_iter()
410 .collect();
411 StorageMap::with_entries(map_entries)
412 } else {
413 map_details.entries.clone().into_storage_map().ok_or_else(|| {
414 RpcError::ExpectedDataMissing(
415 "expected AllEntries for full account fetch, got EntriesWithProofs"
416 .into(),
417 )
418 })?
419 }
420 .map_err(|err| {
421 RpcError::InvalidResponse(format!(
422 "the rpc api returned a non-valid map entry: {err}"
423 ))
424 })?;
425
426 slots.push(miden_protocol::account::StorageSlot::with_map(
427 slot_header.name().clone(),
428 storage_map,
429 ));
430 },
431 }
432 }
433 Ok(slots)
434 }
435
436 async fn fetch_full_vault(
441 &self,
442 account_id: AccountId,
443 block_to: Option<BlockNumber>,
444 ) -> Result<Vec<Asset>, RpcError> {
445 let vault_info =
446 self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
447 let mut updates = vault_info.updates;
448 updates.sort_by_key(|u| u.block_num);
449 Ok(updates
450 .into_iter()
451 .map(|u| (u.vault_key, u.asset))
452 .collect::<BTreeMap<_, _>>()
453 .into_values()
454 .flatten()
455 .collect())
456 }
457}
458
459#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
460#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
461impl NodeRpcClient for GrpcClient {
462 fn has_genesis_commitment(&self) -> Option<Word> {
467 *self.genesis_commitment.read()
468 }
469
470 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
471 if self.genesis_commitment.read().is_some() {
473 return Ok(());
475 }
476
477 self.genesis_commitment.write().replace(commitment);
479
480 let mut client_guard = self.client.write();
483 if let Some(client) = client_guard.as_mut() {
484 client.set_genesis_commitment(commitment);
485 }
486
487 Ok(())
488 }
489
490 async fn submit_proven_transaction(
491 &self,
492 proven_transaction: ProvenTransaction,
493 transaction_inputs: TransactionInputs,
494 ) -> Result<BlockNumber, RpcError> {
495 let request = proto::transaction::ProvenTransaction {
496 transaction: proven_transaction.to_bytes(),
497 transaction_inputs: Some(transaction_inputs.to_bytes()),
498 };
499
500 let api_response = self
501 .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
502 let request = request.clone();
503 Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
504 })
505 .await?;
506
507 Ok(BlockNumber::from(api_response.into_inner().block_num))
508 }
509
510 async fn get_block_header_by_number(
511 &self,
512 block_num: Option<BlockNumber>,
513 include_mmr_proof: bool,
514 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
515 let request = proto::rpc::BlockHeaderByNumberRequest {
516 block_num: block_num.as_ref().map(BlockNumber::as_u32),
517 include_mmr_proof: Some(include_mmr_proof),
518 };
519
520 info!("Calling GetBlockHeaderByNumber: {:?}", request);
521
522 let api_response = self
523 .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
524 Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
525 })
526 .await?;
527
528 let response = api_response.into_inner();
529
530 let block_header: BlockHeader = response
531 .block_header
532 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
533 .try_into()?;
534
535 let mmr_proof = if include_mmr_proof {
536 let forest = response
537 .chain_length
538 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
539 let merkle_path: MerklePath = response
540 .mmr_path
541 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
542 .try_into()?;
543
544 Some(MmrProof::new(
545 MmrPath::new(
546 Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
547 block_header.block_num().as_usize(),
548 merkle_path,
549 ),
550 block_header.commitment(),
551 ))
552 } else {
553 None
554 };
555
556 Ok((block_header, mmr_proof))
557 }
558
559 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
560 let limits = self.get_rpc_limits().await?;
561 let mut notes = Vec::with_capacity(note_ids.len());
562 for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
563 let request = proto::note::NoteIdList {
564 ids: chunk.iter().map(|id| (*id).into()).collect(),
565 };
566
567 let api_response = self
568 .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
569 let request = request.clone();
570 Box::pin(async move { rpc_api.get_notes_by_id(request).await })
571 })
572 .await?;
573
574 let response_notes = api_response
575 .into_inner()
576 .notes
577 .into_iter()
578 .map(FetchedNote::try_from)
579 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
580
581 notes.extend(response_notes);
582 }
583 Ok(notes)
584 }
585
586 async fn sync_chain_mmr(
587 &self,
588 block_from: BlockNumber,
589 block_to: Option<BlockNumber>,
590 ) -> Result<ChainMmrInfo, RpcError> {
591 let block_range = Some(BlockRange {
592 block_from: block_from.as_u32(),
593 block_to: block_to.map(|b| b.as_u32()),
594 });
595
596 let request = proto::rpc::SyncChainMmrRequest {
597 block_range,
598 finality: proto::rpc::Finality::Committed as i32,
599 };
600
601 let response = self
602 .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
603 Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
604 })
605 .await?;
606
607 response.into_inner().try_into()
608 }
609
610 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
622 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
623 let update_summary =
624 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
625
626 if account_id.is_private() {
629 Ok(FetchedAccount::new_private(account_id, update_summary))
630 } else {
631 let details =
635 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
636 "GetAccountDetails returned a public account without details".to_owned(),
637 ))?;
638 let account_id = details.header.id();
639 let nonce = details.header.nonce();
640 let assets = if details.vault_details.too_many_assets {
641 self.fetch_full_vault(account_id, Some(block_number)).await?
642 } else {
643 details.vault_details.assets
644 };
645
646 let slots = self
647 .build_storage_slots(account_id, &details.storage_details, Some(block_number))
648 .await?;
649 let seed = None;
650 let asset_vault = AssetVault::new(&assets).map_err(|err| {
651 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
652 })?;
653 let account_storage = AccountStorage::new(slots).map_err(|err| {
654 RpcError::InvalidResponse(format!(
655 "api rpc returned non-valid storage slots: {err}"
656 ))
657 })?;
658 let account =
659 Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
660 .map_err(|err| {
661 RpcError::InvalidResponse(format!(
662 "failed to instance an account from the rpc api response: {err}"
663 ))
664 })?;
665 Ok(FetchedAccount::new_public(account, update_summary))
666 }
667 }
668
669 async fn get_account_proof(
681 &self,
682 account_id: AccountId,
683 storage_requirements: AccountStorageRequirements,
684 account_state: AccountStateAt,
685 known_account_code: Option<AccountCode>,
686 known_vault_commitment: Option<Word>,
687 ) -> Result<(BlockNumber, AccountProof), RpcError> {
688 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
689 if let Some(account_code) = known_account_code {
690 known_codes_by_commitment.insert(account_code.commitment(), account_code);
691 }
692
693 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
694
695 let account_details = if account_id.has_public_state() {
698 Some(AccountDetailRequest {
699 code_commitment: Some(EMPTY_WORD.into()),
700 asset_vault_commitment: known_vault_commitment.map(Into::into),
701 storage_maps,
702 })
703 } else {
704 None
705 };
706
707 let block_num = match account_state {
708 AccountStateAt::Block(number) => Some(number.into()),
709 AccountStateAt::ChainTip => None,
710 };
711
712 let request = AccountRequest {
713 account_id: Some(account_id.into()),
714 block_num,
715 details: account_details,
716 };
717
718 let response = self
719 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
720 let request = request.clone();
721 Box::pin(async move { rpc_api.get_account(request).await })
722 })
723 .await?
724 .into_inner();
725
726 let account_witness: AccountWitness = response
727 .witness
728 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
729 .try_into()?;
730
731 let block_num: BlockNumber = response
732 .block_num
733 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
734 .block_num
735 .into();
736
737 let headers = if account_witness.id().has_public_state() {
739 let mut details = response
740 .details
741 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
742 .into_domain(&known_codes_by_commitment, &storage_requirements)?;
743
744 if details.vault_details.too_many_assets {
745 details.vault_details.assets =
746 self.fetch_full_vault(account_id, Some(block_num)).await?;
747 }
748
749 Some(details)
750 } else {
751 None
752 };
753
754 let proof = AccountProof::new(account_witness, headers)
755 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
756
757 Ok((block_num, proof))
758 }
759
760 async fn sync_notes(
763 &self,
764 block_num: BlockNumber,
765 block_to: Option<BlockNumber>,
766 note_tags: &BTreeSet<NoteTag>,
767 ) -> Result<NoteSyncInfo, RpcError> {
768 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
769
770 let block_range = Some(BlockRange {
771 block_from: block_num.as_u32(),
772 block_to: block_to.map(|b| b.as_u32()),
773 });
774
775 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
776
777 let response = self
778 .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
779 let request = request.clone();
780 Box::pin(async move { rpc_api.sync_notes(request).await })
781 })
782 .await?;
783
784 response.into_inner().try_into()
785 }
786
787 async fn sync_nullifiers(
788 &self,
789 prefixes: &[u16],
790 block_num: BlockNumber,
791 block_to: Option<BlockNumber>,
792 ) -> Result<Vec<NullifierUpdate>, RpcError> {
793 const MAX_ITERATIONS: u32 = 1000; let limits = self.get_rpc_limits().await?;
796 let mut all_nullifiers = BTreeSet::new();
797
798 'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
801 let mut current_block_from = block_num.as_u32();
802
803 for _ in 0..MAX_ITERATIONS {
804 let request = proto::rpc::SyncNullifiersRequest {
805 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
806 prefix_len: 16,
807 block_range: Some(BlockRange {
808 block_from: current_block_from,
809 block_to: block_to.map(|b| b.as_u32()),
810 }),
811 };
812
813 let response = self
814 .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
815 let request = request.clone();
816 Box::pin(async move { rpc_api.sync_nullifiers(request).await })
817 })
818 .await?;
819 let response = response.into_inner();
820
821 let batch_nullifiers = response
823 .nullifiers
824 .iter()
825 .map(TryFrom::try_from)
826 .collect::<Result<Vec<NullifierUpdate>, _>>()
827 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
828
829 all_nullifiers.extend(batch_nullifiers);
830
831 if let Some(page) = response.pagination_info {
833 if page.block_num < current_block_from {
835 return Err(RpcError::PaginationError(
836 "invalid pagination: block_num went backwards".to_string(),
837 ));
838 }
839
840 let target_block =
842 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
843
844 if page.block_num >= target_block {
845 continue 'chunk_nullifiers;
847 }
848 current_block_from = page.block_num + 1;
849 }
850 }
851 return Err(RpcError::PaginationError(
853 "too many pagination iterations, possible infinite loop".to_string(),
854 ));
855 }
856 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
857 }
858
859 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
860 let limits = self.get_rpc_limits().await?;
861 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
862 for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
863 let request = proto::rpc::NullifierList {
864 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
865 };
866
867 let response = self
868 .call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
869 let request = request.clone();
870 Box::pin(async move { rpc_api.check_nullifiers(request).await })
871 })
872 .await?;
873
874 let mut response = response.into_inner();
875 let chunk_proofs = response
876 .proofs
877 .iter_mut()
878 .map(|r| r.to_owned().try_into())
879 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
880 proofs.extend(chunk_proofs);
881 }
882 Ok(proofs)
883 }
884
885 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
886 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
887
888 let response = self
889 .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
890 Box::pin(async move { rpc_api.get_block_by_number(request).await })
891 })
892 .await?;
893
894 let response = response.into_inner();
895 let block =
896 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
897 "GetBlockByNumberResponse.block".to_string(),
898 ))?)?;
899
900 Ok(block)
901 }
902
903 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
904 let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
905
906 let response = self
907 .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
908 Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
909 })
910 .await?;
911
912 let response = response.into_inner();
913 let note_script = NoteScript::try_from(
914 response
915 .script
916 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
917 )?;
918
919 Ok(note_script)
920 }
921
922 async fn sync_storage_maps(
923 &self,
924 block_from: BlockNumber,
925 block_to: Option<BlockNumber>,
926 account_id: AccountId,
927 ) -> Result<StorageMapInfo, RpcError> {
928 let mut pagination = BlockPagination::new(block_from, block_to);
929 let mut updates = Vec::new();
930
931 let (chain_tip, block_number) = loop {
932 let request = proto::rpc::SyncAccountStorageMapsRequest {
933 block_range: Some(BlockRange {
934 block_from: pagination.current_block_from().as_u32(),
935 block_to: pagination.block_to().map(|block| block.as_u32()),
936 }),
937 account_id: Some(account_id.into()),
938 };
939 let response = self
940 .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
941 let request = request.clone();
942 Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
943 })
944 .await?;
945 let response = response.into_inner();
946 let page = response
947 .pagination_info
948 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
949 let page_block_num = BlockNumber::from(page.block_num);
950 let page_chain_tip = BlockNumber::from(page.chain_tip);
951 let batch = response
952 .updates
953 .into_iter()
954 .map(TryInto::try_into)
955 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
956 updates.extend(batch);
957
958 match pagination.advance(page_block_num, page_chain_tip)? {
959 PaginationResult::Continue => {},
960 PaginationResult::Done {
961 chain_tip: final_chain_tip,
962 block_num: final_block_num,
963 } => break (final_chain_tip, final_block_num),
964 }
965 };
966
967 Ok(StorageMapInfo { chain_tip, block_number, updates })
968 }
969
970 async fn sync_account_vault(
971 &self,
972 block_from: BlockNumber,
973 block_to: Option<BlockNumber>,
974 account_id: AccountId,
975 ) -> Result<AccountVaultInfo, RpcError> {
976 let mut pagination = BlockPagination::new(block_from, block_to);
977 let mut updates = Vec::new();
978
979 let (chain_tip, block_number) = loop {
980 let request = proto::rpc::SyncAccountVaultRequest {
981 block_range: Some(BlockRange {
982 block_from: pagination.current_block_from().as_u32(),
983 block_to: pagination.block_to().map(|block| block.as_u32()),
984 }),
985 account_id: Some(account_id.into()),
986 };
987 let response = self
988 .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
989 let request = request.clone();
990 Box::pin(async move { rpc_api.sync_account_vault(request).await })
991 })
992 .await?;
993 let response = response.into_inner();
994 let page = response
995 .pagination_info
996 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
997 let page_block_num = BlockNumber::from(page.block_num);
998 let page_chain_tip = BlockNumber::from(page.chain_tip);
999 let batch = response
1000 .updates
1001 .iter()
1002 .map(|u| (*u).try_into())
1003 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
1004 updates.extend(batch);
1005
1006 match pagination.advance(page_block_num, page_chain_tip)? {
1007 PaginationResult::Continue => {},
1008 PaginationResult::Done {
1009 chain_tip: final_chain_tip,
1010 block_num: final_block_num,
1011 } => break (final_chain_tip, final_block_num),
1012 }
1013 };
1014
1015 Ok(AccountVaultInfo { chain_tip, block_number, updates })
1016 }
1017
1018 async fn sync_transactions(
1019 &self,
1020 block_from: BlockNumber,
1021 block_to: Option<BlockNumber>,
1022 account_ids: Vec<AccountId>,
1023 ) -> Result<TransactionsInfo, RpcError> {
1024 let block_range = Some(BlockRange {
1025 block_from: block_from.as_u32(),
1026 block_to: block_to.map(|b| b.as_u32()),
1027 });
1028
1029 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
1030
1031 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
1032
1033 let response = self
1034 .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
1035 let request = request.clone();
1036 Box::pin(async move { rpc_api.sync_transactions(request).await })
1037 })
1038 .await?;
1039
1040 response.into_inner().try_into()
1041 }
1042
1043 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
1044 let endpoint: Endpoint =
1045 Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
1046 Ok(endpoint.to_network_id())
1047 }
1048
1049 async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
1050 if let Some(limits) = *self.limits.read() {
1052 return Ok(limits);
1053 }
1054
1055 let response = self
1057 .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
1058 Box::pin(async move { rpc_api.get_limits(()).await })
1059 })
1060 .await?;
1061 let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
1062
1063 self.limits.write().replace(limits);
1065 Ok(limits)
1066 }
1067
1068 fn has_rpc_limits(&self) -> Option<RpcLimits> {
1069 *self.limits.read()
1070 }
1071
1072 async fn set_rpc_limits(&self, limits: RpcLimits) {
1073 self.limits.write().replace(limits);
1074 }
1075
/// Trait entry point for the unversioned status query.
///
/// Forwards to the inherent `GrpcClient::get_status_unversioned`; the call is written with
/// the explicit type path.
async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
    GrpcClient::get_status_unversioned(self).await
}
1079}
1080
1081impl RpcError {
1085 pub fn from_grpc_error_with_context(
1086 endpoint: RpcEndpoint,
1087 status: Status,
1088 context: AcceptHeaderContext,
1089 ) -> Self {
1090 if let Some(accept_error) =
1091 AcceptHeaderError::try_from_message_with_context(status.message(), context)
1092 {
1093 return Self::AcceptHeaderError(accept_error);
1094 }
1095
1096 let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
1098
1099 let error_kind = GrpcError::from(&status);
1100 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
1101
1102 Self::RequestError {
1103 endpoint,
1104 error_kind,
1105 endpoint_error,
1106 source: Some(source),
1107 }
1108 }
1109}
1110
1111impl From<&Status> for GrpcError {
1112 fn from(status: &Status) -> Self {
1113 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1114 }
1115}
1116
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;
    use miden_protocol::block::BlockNumber;

    use super::{BlockPagination, GrpcClient, PaginationResult};
    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    #[test]
    fn block_pagination_errors_when_block_num_goes_backwards() {
        let start: BlockNumber = 10_u32.into();
        let mut cursor = BlockPagination::new(start, None);

        // The page ends before the start of the requested range.
        let outcome = cursor.advance(9_u32.into(), 20_u32.into());

        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    #[test]
    fn block_pagination_errors_after_max_iterations() {
        let chain_tip: BlockNumber = 10_000_u32.into();
        let mut cursor = BlockPagination::new(0_u32.into(), None);

        // Every page up to the limit must be accepted.
        for _ in 0..BlockPagination::MAX_ITERATIONS {
            let page_end = cursor.current_block_from();
            let outcome = cursor
                .advance(page_end, chain_tip)
                .expect("expected pagination to continue within iteration limit");
            assert!(matches!(outcome, PaginationResult::Continue));
        }

        // One page past the limit must be rejected.
        let page_end = cursor.current_block_from();
        let outcome = cursor.advance(page_end, chain_tip);
        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    #[test]
    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
        // The chain tip (30) is below `block_to` (50), so the tip is the target.
        let mut cursor = BlockPagination::new(0_u32.into(), Some(50_u32.into()));

        let outcome = cursor
            .advance(30_u32.into(), 30_u32.into())
            .expect("expected pagination to succeed");

        assert!(matches!(
            outcome,
            PaginationResult::Done {
                chain_tip,
                block_num
            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
        ));
    }

    #[test]
    fn block_pagination_advances_cursor_by_one() {
        let mut cursor = BlockPagination::new(5_u32.into(), None);

        let outcome = cursor
            .advance(5_u32.into(), 100_u32.into())
            .expect("expected pagination to succeed");

        assert!(matches!(outcome, PaginationResult::Continue));
        assert_eq!(cursor.current_block_from().as_u32(), 6);
    }

    /// Awaits a trait-object call so that `future_is_send` can spawn it.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        let outcome = client.get_block_header_by_number(None, false).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn future_is_send() {
        let endpoint = &Endpoint::devnet();
        let boxed_client: Box<GrpcClient> = GrpcClient::new(endpoint, 10000).into();

        // Spawning requires the future to be `Send`; the task is not awaited.
        tokio::task::spawn(async move { dyn_trait_send_fut(boxed_client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let endpoint = &Endpoint::devnet();
        let grpc_client = GrpcClient::new(endpoint, 10000);
        assert!(grpc_client.genesis_commitment.read().is_none());

        let commitment = Word::default();
        grpc_client.set_genesis_commitment(commitment).await.unwrap();

        let stored = grpc_client.genesis_commitment.read().unwrap();
        assert_eq!(stored, commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let endpoint = &Endpoint::devnet();
        let grpc_client = GrpcClient::new(endpoint, 10000);

        let initial_commitment = Word::default();
        grpc_client.set_genesis_commitment(initial_commitment).await.unwrap();

        // A second, different commitment must not replace the first one.
        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        grpc_client.set_genesis_commitment(new_commitment).await.unwrap();

        let stored = grpc_client.genesis_commitment.read().unwrap();
        assert_eq!(stored, initial_commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let endpoint = &Endpoint::devnet();
        let grpc_client = GrpcClient::new(endpoint, 10000);

        grpc_client.connect().await.unwrap();

        let commitment = Word::default();
        grpc_client.set_genesis_commitment(commitment).await.unwrap();

        let stored = grpc_client.genesis_commitment.read().unwrap();
        assert_eq!(stored, commitment);
        assert!(grpc_client.client.read().as_ref().is_some());
    }
}