1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::asset::{Asset, AssetVault};
10use miden_protocol::vm::FutureMaybeSend;
11
12type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
13
14use miden_protocol::account::{
15 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
16};
17use miden_protocol::address::NetworkId;
18use miden_protocol::block::account_tree::AccountWitness;
19use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
20use miden_protocol::crypto::merkle::MerklePath;
21use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
22use miden_protocol::crypto::merkle::smt::SmtProof;
23use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
24use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
25use miden_protocol::utils::serde::Deserializable;
26use miden_protocol::{EMPTY_WORD, Word};
27use miden_tx::utils::serde::Serializable;
28use miden_tx::utils::sync::RwLock;
29use tonic::Status;
30use tracing::info;
31
32use super::domain::account::{
33 AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
34};
35use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
36use super::generated::rpc::account_request::AccountDetailRequest;
37use super::generated::rpc::AccountRequest;
38use super::{
39 Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
40 RpcStatusInfo,
41};
42use crate::rpc::domain::sync::ChainMmrInfo;
43use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
44use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
45use crate::rpc::domain::transaction::TransactionsInfo;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
49use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
50use crate::rpc::generated::rpc::BlockRange;
51use crate::rpc::domain::limits::RpcLimits;
52use crate::rpc::{AccountStateAt, generated as proto};
53
54mod api_client;
55mod retry;
56
57use api_client::api_client_wrapper::ApiClient;
58
59struct BlockPagination {
61 current_block_from: BlockNumber,
62 block_to: Option<BlockNumber>,
63 iterations: u32,
64}
65
66enum PaginationResult {
67 Continue,
68 Done {
69 chain_tip: BlockNumber,
70 block_num: BlockNumber,
71 },
72}
73
74impl BlockPagination {
75 const MAX_ITERATIONS: u32 = 1000;
80
81 fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
82 Self {
83 current_block_from: block_from,
84 block_to,
85 iterations: 0,
86 }
87 }
88
89 fn current_block_from(&self) -> BlockNumber {
90 self.current_block_from
91 }
92
93 fn block_to(&self) -> Option<BlockNumber> {
94 self.block_to
95 }
96
97 fn advance(
98 &mut self,
99 block_num: BlockNumber,
100 chain_tip: BlockNumber,
101 ) -> Result<PaginationResult, RpcError> {
102 if self.iterations >= Self::MAX_ITERATIONS {
103 return Err(RpcError::PaginationError(
104 "too many pagination iterations, possible infinite loop".to_owned(),
105 ));
106 }
107 self.iterations += 1;
108
109 if block_num < self.current_block_from {
110 return Err(RpcError::PaginationError(
111 "invalid pagination: block_num went backwards".to_owned(),
112 ));
113 }
114
115 let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
116
117 if block_num >= target_block {
118 return Ok(PaginationResult::Done { chain_tip, block_num });
119 }
120
121 self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
122
123 Ok(PaginationResult::Continue)
124 }
125}
126
127pub struct GrpcClient {
141 client: RwLock<Option<ApiClient>>,
143 endpoint: String,
145 timeout_ms: u64,
147 genesis_commitment: RwLock<Option<Word>>,
149 limits: RwLock<Option<RpcLimits>>,
151 max_retries: u32,
153 retry_interval_ms: u64,
155}
156
157impl GrpcClient {
158 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
161 GrpcClient {
162 client: RwLock::new(None),
163 endpoint: endpoint.to_string(),
164 timeout_ms,
165 genesis_commitment: RwLock::new(None),
166 limits: RwLock::new(None),
167 max_retries: retry::DEFAULT_MAX_RETRIES,
168 retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
169 }
170 }
171
172 #[must_use]
175 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
176 self.max_retries = max_retries;
177 self
178 }
179
180 #[must_use]
183 pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
184 self.retry_interval_ms = retry_interval_ms;
185 self
186 }
187
188 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
191 if self.client.read().is_none() {
192 self.connect().await?;
193 }
194
195 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
196 }
197
198 async fn connect(&self) -> Result<(), RpcError> {
201 let genesis_commitment = *self.genesis_commitment.read();
202 let new_client =
203 ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
204 .await?;
205 let mut client = self.client.write();
206 client.replace(new_client);
207
208 Ok(())
209 }
210
211 fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
212 let genesis_commitment = self
213 .genesis_commitment
214 .read()
215 .as_ref()
216 .map_or_else(|| "none".to_string(), Word::to_hex);
217 let context = AcceptHeaderContext {
218 client_version: env!("CARGO_PKG_VERSION").to_string(),
219 genesis_commitment,
220 };
221 RpcError::from_grpc_error_with_context(endpoint, status, context)
222 }
223
224 async fn call_with_retry<T: Send + 'static>(
235 &self,
236 endpoint: RpcEndpoint,
237 mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
238 ) -> Result<tonic::Response<T>, RpcError> {
239 let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
240
241 loop {
242 let rpc_api = self.ensure_connected().await?;
243
244 match call(rpc_api).await {
245 Ok(response) => return Ok(response),
246 Err(status) if retry_state.should_retry(&status).await => {},
247 Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
248 }
249 }
250 }
251
252 pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
257 let mut rpc_api =
258 ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
259 .await?;
260 rpc_api
261 .status(())
262 .await
263 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
264 .map(tonic::Response::into_inner)
265 .and_then(RpcStatusInfo::try_from)
266 }
267
268 pub async fn fetch_full_account_proof(
275 &self,
276 account_id: AccountId,
277 ) -> Result<(BlockNumber, AccountProof), RpcError> {
278 let has_public_state = account_id.has_public_state();
279 let account_request = {
280 AccountRequest {
281 account_id: Some(account_id.into()),
282 block_num: None,
283 details: {
284 if has_public_state {
285 Some(AccountDetailRequest {
290 code_commitment: Some(EMPTY_WORD.into()),
291 asset_vault_commitment: Some(EMPTY_WORD.into()),
292 storage_maps: vec![],
293 })
294 } else {
295 None
296 }
297 },
298 }
299 };
300 let account_response = self
301 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
302 let request = account_request.clone();
303 Box::pin(async move { rpc_api.get_account(request).await })
304 })
305 .await?
306 .into_inner();
307 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
308 "GetAccountDetails returned an account without a matching block number for the witness"
309 .to_owned(),
310 ))?;
311 let account_proof = {
312 if has_public_state {
313 let account_details = account_response
314 .details
315 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
316 .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
317 let storage_header = account_details.storage_details.header;
318 let maps_to_request = storage_header
322 .slots()
323 .filter(|header| header.slot_type().is_map())
324 .map(|map| map.name().to_string());
325 let account_request = AccountRequest {
326 account_id: Some(account_id.into()),
327 block_num: Some(block_number),
328 details: Some(AccountDetailRequest {
329 code_commitment: Some(EMPTY_WORD.into()),
330 asset_vault_commitment: Some(EMPTY_WORD.into()),
331 storage_maps: maps_to_request
332 .map(|slot_name| StorageMapDetailRequest {
333 slot_name,
334 slot_data: Some(SlotData::AllEntries(true)),
335 })
336 .collect(),
337 }),
338 };
339 let response = self
340 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
341 let request = account_request.clone();
342 Box::pin(async move { rpc_api.get_account(request).await })
343 })
344 .await?;
345 response.into_inner().try_into()
346 } else {
347 account_response.try_into()
348 }
349 };
350 Ok((block_number.block_num.into(), account_proof?))
351 }
352
353 async fn build_storage_slots(
357 &self,
358 account_id: AccountId,
359 storage_details: &AccountStorageDetails,
360 block_to: Option<BlockNumber>,
361 ) -> Result<Vec<StorageSlot>, RpcError> {
362 let mut slots = vec![];
363 let mut map_cache: Option<StorageMapInfo> = None;
366 for slot_header in storage_details.header.slots() {
367 match slot_header.slot_type() {
374 StorageSlotType::Value => {
375 slots.push(miden_protocol::account::StorageSlot::with_value(
376 slot_header.name().clone(),
377 slot_header.value(),
378 ));
379 },
380 StorageSlotType::Map => {
381 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
382 RpcError::ExpectedDataMissing(format!(
383 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
384 slot_header.name(),
385 )),
386 )?;
387
388 let storage_map = if map_details.too_many_entries {
389 let map_info = if let Some(ref info) = map_cache {
390 info
391 } else {
392 let fetched_data =
393 self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
394 map_cache.insert(fetched_data)
395 };
396 let mut sorted_updates: Vec<_> = map_info
400 .updates
401 .iter()
402 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
403 .collect();
404 sorted_updates.sort_by_key(|u| u.block_num);
405 let map_entries: Vec<_> = sorted_updates
406 .into_iter()
407 .map(|u| (u.key, u.value))
408 .collect::<BTreeMap<_, _>>()
409 .into_iter()
410 .collect();
411 StorageMap::with_entries(map_entries)
412 } else {
413 map_details.entries.clone().into_storage_map().ok_or_else(|| {
414 RpcError::ExpectedDataMissing(
415 "expected AllEntries for full account fetch, got EntriesWithProofs"
416 .into(),
417 )
418 })?
419 }
420 .map_err(|err| {
421 RpcError::InvalidResponse(format!(
422 "the rpc api returned a non-valid map entry: {err}"
423 ))
424 })?;
425
426 slots.push(miden_protocol::account::StorageSlot::with_map(
427 slot_header.name().clone(),
428 storage_map,
429 ));
430 },
431 }
432 }
433 Ok(slots)
434 }
435
436 async fn fetch_full_vault(
441 &self,
442 account_id: AccountId,
443 block_to: Option<BlockNumber>,
444 ) -> Result<Vec<Asset>, RpcError> {
445 let vault_info =
446 self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
447 let mut updates = vault_info.updates;
448 updates.sort_by_key(|u| u.block_num);
449 Ok(updates
450 .into_iter()
451 .map(|u| (u.vault_key, u.asset))
452 .collect::<BTreeMap<_, _>>()
453 .into_values()
454 .flatten()
455 .collect())
456 }
457}
458
459#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
460#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
461impl NodeRpcClient for GrpcClient {
462 fn has_genesis_commitment(&self) -> Option<Word> {
467 *self.genesis_commitment.read()
468 }
469
470 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
471 if self.genesis_commitment.read().is_some() {
473 return Ok(());
475 }
476
477 self.genesis_commitment.write().replace(commitment);
479
480 let mut client_guard = self.client.write();
483 if let Some(client) = client_guard.as_mut() {
484 client.set_genesis_commitment(commitment);
485 }
486
487 Ok(())
488 }
489
490 async fn submit_proven_transaction(
491 &self,
492 proven_transaction: ProvenTransaction,
493 transaction_inputs: TransactionInputs,
494 ) -> Result<BlockNumber, RpcError> {
495 let request = proto::transaction::ProvenTransaction {
496 transaction: proven_transaction.to_bytes(),
497 transaction_inputs: Some(transaction_inputs.to_bytes()),
498 };
499
500 let api_response = self
501 .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
502 let request = request.clone();
503 Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
504 })
505 .await?;
506
507 Ok(BlockNumber::from(api_response.into_inner().block_num))
508 }
509
510 async fn get_block_header_by_number(
511 &self,
512 block_num: Option<BlockNumber>,
513 include_mmr_proof: bool,
514 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
515 let request = proto::rpc::BlockHeaderByNumberRequest {
516 block_num: block_num.as_ref().map(BlockNumber::as_u32),
517 include_mmr_proof: Some(include_mmr_proof),
518 };
519
520 info!("Calling GetBlockHeaderByNumber: {:?}", request);
521
522 let api_response = self
523 .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
524 Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
525 })
526 .await?;
527
528 let response = api_response.into_inner();
529
530 let block_header: BlockHeader = response
531 .block_header
532 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
533 .try_into()?;
534
535 let mmr_proof = if include_mmr_proof {
536 let forest = response
537 .chain_length
538 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
539 let merkle_path: MerklePath = response
540 .mmr_path
541 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
542 .try_into()?;
543
544 Some(MmrProof::new(
545 MmrPath::new(
546 Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
547 block_header.block_num().as_usize(),
548 merkle_path,
549 ),
550 block_header.commitment(),
551 ))
552 } else {
553 None
554 };
555
556 Ok((block_header, mmr_proof))
557 }
558
559 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
560 let limits = self.get_rpc_limits().await?;
561 let mut notes = Vec::with_capacity(note_ids.len());
562 for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
563 let request = proto::note::NoteIdList {
564 ids: chunk.iter().map(|id| (*id).into()).collect(),
565 };
566
567 let api_response = self
568 .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
569 let request = request.clone();
570 Box::pin(async move { rpc_api.get_notes_by_id(request).await })
571 })
572 .await?;
573
574 let response_notes = api_response
575 .into_inner()
576 .notes
577 .into_iter()
578 .map(FetchedNote::try_from)
579 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
580
581 notes.extend(response_notes);
582 }
583 Ok(notes)
584 }
585
586 async fn sync_chain_mmr(
587 &self,
588 block_from: BlockNumber,
589 block_to: Option<BlockNumber>,
590 ) -> Result<ChainMmrInfo, RpcError> {
591 let block_range = Some(BlockRange {
592 block_from: block_from.as_u32(),
593 block_to: block_to.map(|b| b.as_u32()),
594 });
595
596 let request = proto::rpc::SyncChainMmrRequest {
597 block_range,
598 finality: proto::rpc::Finality::Committed as i32,
599 };
600
601 let response = self
602 .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
603 Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
604 })
605 .await?;
606
607 response.into_inner().try_into()
608 }
609
610 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
622 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
623 let update_summary =
624 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
625
626 if account_id.is_private() {
629 Ok(FetchedAccount::new_private(account_id, update_summary))
630 } else {
631 let details =
632 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
633 "GetAccountDetails returned a public account without details".to_owned(),
634 ))?;
635
636 let account_id = details.header.id();
637 let nonce = details.header.nonce();
638
639 let assets = if details.vault_details.too_many_assets {
642 self.fetch_full_vault(account_id, Some(block_number)).await?
643 } else {
644 details.vault_details.assets
645 };
646
647 let slots = self
650 .build_storage_slots(account_id, &details.storage_details, Some(block_number))
651 .await?;
652 let asset_vault = AssetVault::new(&assets).map_err(|err| {
653 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
654 })?;
655 let account_storage = AccountStorage::new(slots).map_err(|err| {
656 RpcError::InvalidResponse(format!(
657 "api rpc returned non-valid storage slots: {err}"
658 ))
659 })?;
660 let account =
661 Account::new(account_id, asset_vault, account_storage, details.code, nonce, None)
662 .map_err(|err| {
663 RpcError::InvalidResponse(format!(
664 "failed to instance an account from the rpc api response: {err}"
665 ))
666 })?;
667 Ok(FetchedAccount::new_public(account, update_summary))
668 }
669 }
670
671 async fn get_account_proof(
683 &self,
684 account_id: AccountId,
685 storage_requirements: AccountStorageRequirements,
686 account_state: AccountStateAt,
687 known_account_code: Option<AccountCode>,
688 known_vault_commitment: Option<Word>,
689 ) -> Result<(BlockNumber, AccountProof), RpcError> {
690 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
691 if let Some(account_code) = known_account_code {
692 known_codes_by_commitment.insert(account_code.commitment(), account_code);
693 }
694
695 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
696
697 let account_details = if account_id.has_public_state() {
700 Some(AccountDetailRequest {
701 code_commitment: Some(EMPTY_WORD.into()),
702 asset_vault_commitment: known_vault_commitment.map(Into::into),
703 storage_maps,
704 })
705 } else {
706 None
707 };
708
709 let block_num = match account_state {
710 AccountStateAt::Block(number) => Some(number.into()),
711 AccountStateAt::ChainTip => None,
712 };
713
714 let request = AccountRequest {
715 account_id: Some(account_id.into()),
716 block_num,
717 details: account_details,
718 };
719
720 let response = self
721 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
722 let request = request.clone();
723 Box::pin(async move { rpc_api.get_account(request).await })
724 })
725 .await?
726 .into_inner();
727
728 let account_witness: AccountWitness = response
729 .witness
730 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
731 .try_into()?;
732
733 let block_num: BlockNumber = response
734 .block_num
735 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
736 .block_num
737 .into();
738
739 let headers = if account_witness.id().has_public_state() {
741 let mut details = response
742 .details
743 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
744 .into_domain(&known_codes_by_commitment, &storage_requirements)?;
745
746 if details.vault_details.too_many_assets {
747 details.vault_details.assets =
748 self.fetch_full_vault(account_id, Some(block_num)).await?;
749 }
750
751 Some(details)
752 } else {
753 None
754 };
755
756 let proof = AccountProof::new(account_witness, headers)
757 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
758
759 Ok((block_num, proof))
760 }
761
762 async fn sync_notes(
765 &self,
766 block_num: BlockNumber,
767 block_to: Option<BlockNumber>,
768 note_tags: &BTreeSet<NoteTag>,
769 ) -> Result<NoteSyncInfo, RpcError> {
770 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
771
772 let block_range = Some(BlockRange {
773 block_from: block_num.as_u32(),
774 block_to: block_to.map(|b| b.as_u32()),
775 });
776
777 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
778
779 let response = self
780 .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
781 let request = request.clone();
782 Box::pin(async move { rpc_api.sync_notes(request).await })
783 })
784 .await?;
785
786 response.into_inner().try_into()
787 }
788
789 async fn sync_nullifiers(
790 &self,
791 prefixes: &[u16],
792 block_num: BlockNumber,
793 block_to: Option<BlockNumber>,
794 ) -> Result<Vec<NullifierUpdate>, RpcError> {
795 const MAX_ITERATIONS: u32 = 1000; let limits = self.get_rpc_limits().await?;
798 let mut all_nullifiers = BTreeSet::new();
799
800 'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
803 let mut current_block_from = block_num.as_u32();
804
805 for _ in 0..MAX_ITERATIONS {
806 let request = proto::rpc::SyncNullifiersRequest {
807 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
808 prefix_len: 16,
809 block_range: Some(BlockRange {
810 block_from: current_block_from,
811 block_to: block_to.map(|b| b.as_u32()),
812 }),
813 };
814
815 let response = self
816 .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
817 let request = request.clone();
818 Box::pin(async move { rpc_api.sync_nullifiers(request).await })
819 })
820 .await?;
821 let response = response.into_inner();
822
823 let batch_nullifiers = response
825 .nullifiers
826 .iter()
827 .map(TryFrom::try_from)
828 .collect::<Result<Vec<NullifierUpdate>, _>>()
829 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
830
831 all_nullifiers.extend(batch_nullifiers);
832
833 if let Some(page) = response.pagination_info {
835 if page.block_num < current_block_from {
837 return Err(RpcError::PaginationError(
838 "invalid pagination: block_num went backwards".to_string(),
839 ));
840 }
841
842 let target_block =
844 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
845
846 if page.block_num >= target_block {
847 continue 'chunk_nullifiers;
849 }
850 current_block_from = page.block_num + 1;
851 }
852 }
853 return Err(RpcError::PaginationError(
855 "too many pagination iterations, possible infinite loop".to_string(),
856 ));
857 }
858 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
859 }
860
861 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
862 let limits = self.get_rpc_limits().await?;
863 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
864 for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
865 let request = proto::rpc::NullifierList {
866 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
867 };
868
869 let response = self
870 .call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
871 let request = request.clone();
872 Box::pin(async move { rpc_api.check_nullifiers(request).await })
873 })
874 .await?;
875
876 let mut response = response.into_inner();
877 let chunk_proofs = response
878 .proofs
879 .iter_mut()
880 .map(|r| r.to_owned().try_into())
881 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
882 proofs.extend(chunk_proofs);
883 }
884 Ok(proofs)
885 }
886
887 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
888 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
889
890 let response = self
891 .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
892 Box::pin(async move { rpc_api.get_block_by_number(request).await })
893 })
894 .await?;
895
896 let response = response.into_inner();
897 let block =
898 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
899 "GetBlockByNumberResponse.block".to_string(),
900 ))?)?;
901
902 Ok(block)
903 }
904
905 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
906 let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
907
908 let response = self
909 .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
910 Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
911 })
912 .await?;
913
914 let response = response.into_inner();
915 let note_script = NoteScript::try_from(
916 response
917 .script
918 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
919 )?;
920
921 Ok(note_script)
922 }
923
924 async fn sync_storage_maps(
925 &self,
926 block_from: BlockNumber,
927 block_to: Option<BlockNumber>,
928 account_id: AccountId,
929 ) -> Result<StorageMapInfo, RpcError> {
930 let mut pagination = BlockPagination::new(block_from, block_to);
931 let mut updates = Vec::new();
932
933 let (chain_tip, block_number) = loop {
934 let request = proto::rpc::SyncAccountStorageMapsRequest {
935 block_range: Some(BlockRange {
936 block_from: pagination.current_block_from().as_u32(),
937 block_to: pagination.block_to().map(|block| block.as_u32()),
938 }),
939 account_id: Some(account_id.into()),
940 };
941 let response = self
942 .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
943 let request = request.clone();
944 Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
945 })
946 .await?;
947 let response = response.into_inner();
948 let page = response
949 .pagination_info
950 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
951 let page_block_num = BlockNumber::from(page.block_num);
952 let page_chain_tip = BlockNumber::from(page.chain_tip);
953 let batch = response
954 .updates
955 .into_iter()
956 .map(TryInto::try_into)
957 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
958 updates.extend(batch);
959
960 match pagination.advance(page_block_num, page_chain_tip)? {
961 PaginationResult::Continue => {},
962 PaginationResult::Done {
963 chain_tip: final_chain_tip,
964 block_num: final_block_num,
965 } => break (final_chain_tip, final_block_num),
966 }
967 };
968
969 Ok(StorageMapInfo { chain_tip, block_number, updates })
970 }
971
972 async fn sync_account_vault(
973 &self,
974 block_from: BlockNumber,
975 block_to: Option<BlockNumber>,
976 account_id: AccountId,
977 ) -> Result<AccountVaultInfo, RpcError> {
978 let mut pagination = BlockPagination::new(block_from, block_to);
979 let mut updates = Vec::new();
980
981 let (chain_tip, block_number) = loop {
982 let request = proto::rpc::SyncAccountVaultRequest {
983 block_range: Some(BlockRange {
984 block_from: pagination.current_block_from().as_u32(),
985 block_to: pagination.block_to().map(|block| block.as_u32()),
986 }),
987 account_id: Some(account_id.into()),
988 };
989 let response = self
990 .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
991 let request = request.clone();
992 Box::pin(async move { rpc_api.sync_account_vault(request).await })
993 })
994 .await?;
995 let response = response.into_inner();
996 let page = response
997 .pagination_info
998 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
999 let page_block_num = BlockNumber::from(page.block_num);
1000 let page_chain_tip = BlockNumber::from(page.chain_tip);
1001 let batch = response
1002 .updates
1003 .iter()
1004 .map(|u| (*u).try_into())
1005 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
1006 updates.extend(batch);
1007
1008 match pagination.advance(page_block_num, page_chain_tip)? {
1009 PaginationResult::Continue => {},
1010 PaginationResult::Done {
1011 chain_tip: final_chain_tip,
1012 block_num: final_block_num,
1013 } => break (final_chain_tip, final_block_num),
1014 }
1015 };
1016
1017 Ok(AccountVaultInfo { chain_tip, block_number, updates })
1018 }
1019
1020 async fn sync_transactions(
1021 &self,
1022 block_from: BlockNumber,
1023 block_to: Option<BlockNumber>,
1024 account_ids: Vec<AccountId>,
1025 ) -> Result<TransactionsInfo, RpcError> {
1026 let block_range = Some(BlockRange {
1027 block_from: block_from.as_u32(),
1028 block_to: block_to.map(|b| b.as_u32()),
1029 });
1030
1031 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
1032
1033 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
1034
1035 let response = self
1036 .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
1037 let request = request.clone();
1038 Box::pin(async move { rpc_api.sync_transactions(request).await })
1039 })
1040 .await?;
1041
1042 response.into_inner().try_into()
1043 }
1044
1045 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
1046 let endpoint: Endpoint =
1047 Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
1048 Ok(endpoint.to_network_id())
1049 }
1050
1051 async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
1052 if let Some(limits) = *self.limits.read() {
1054 return Ok(limits);
1055 }
1056
1057 let response = self
1059 .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
1060 Box::pin(async move { rpc_api.get_limits(()).await })
1061 })
1062 .await?;
1063 let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
1064
1065 self.limits.write().replace(limits);
1067 Ok(limits)
1068 }
1069
1070 fn has_rpc_limits(&self) -> Option<RpcLimits> {
1071 *self.limits.read()
1072 }
1073
1074 async fn set_rpc_limits(&self, limits: RpcLimits) {
1075 self.limits.write().replace(limits);
1076 }
1077
1078 async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
1079 GrpcClient::get_status_unversioned(self).await
1080 }
1081}
1082
1083impl RpcError {
1087 pub fn from_grpc_error_with_context(
1088 endpoint: RpcEndpoint,
1089 status: Status,
1090 context: AcceptHeaderContext,
1091 ) -> Self {
1092 if let Some(accept_error) =
1093 AcceptHeaderError::try_from_message_with_context(status.message(), context)
1094 {
1095 return Self::AcceptHeaderError(accept_error);
1096 }
1097
1098 let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
1100
1101 let error_kind = GrpcError::from(&status);
1102 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
1103
1104 Self::RequestError {
1105 endpoint,
1106 error_kind,
1107 endpoint_error,
1108 source: Some(source),
1109 }
1110 }
1111}
1112
1113impl From<&Status> for GrpcError {
1114 fn from(status: &Status) -> Self {
1115 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1116 }
1117}
1118
1119#[cfg(test)]
1120mod tests {
1121 use std::boxed::Box;
1122
1123 use miden_protocol::Word;
1124 use miden_protocol::block::BlockNumber;
1125
1126 use super::{BlockPagination, GrpcClient, PaginationResult};
1127 use crate::rpc::{Endpoint, NodeRpcClient, RpcError};
1128
1129 fn assert_send_sync<T: Send + Sync>() {}
1130
1131 #[test]
1132 fn is_send_sync() {
1133 assert_send_sync::<GrpcClient>();
1134 assert_send_sync::<Box<dyn NodeRpcClient>>();
1135 }
1136
1137 #[test]
1138 fn block_pagination_errors_when_block_num_goes_backwards() {
1139 let mut pagination = BlockPagination::new(10_u32.into(), None);
1140
1141 let res = pagination.advance(9_u32.into(), 20_u32.into());
1142 assert!(matches!(res, Err(RpcError::PaginationError(_))));
1143 }
1144
1145 #[test]
1146 fn block_pagination_errors_after_max_iterations() {
1147 let mut pagination = BlockPagination::new(0_u32.into(), None);
1148 let chain_tip: BlockNumber = 10_000_u32.into();
1149
1150 for _ in 0..BlockPagination::MAX_ITERATIONS {
1151 let current = pagination.current_block_from();
1152 let res = pagination
1153 .advance(current, chain_tip)
1154 .expect("expected pagination to continue within iteration limit");
1155 assert!(matches!(res, PaginationResult::Continue));
1156 }
1157
1158 let res = pagination.advance(pagination.current_block_from(), chain_tip);
1159 assert!(matches!(res, Err(RpcError::PaginationError(_))));
1160 }
1161
1162 #[test]
1163 fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
1164 let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));
1166
1167 let res = pagination
1168 .advance(30_u32.into(), 30_u32.into())
1169 .expect("expected pagination to succeed");
1170
1171 assert!(matches!(
1172 res,
1173 PaginationResult::Done {
1174 chain_tip,
1175 block_num
1176 } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
1177 ));
1178 }
1179
1180 #[test]
1181 fn block_pagination_advances_cursor_by_one() {
1182 let mut pagination = BlockPagination::new(5_u32.into(), None);
1183
1184 let res = pagination
1185 .advance(5_u32.into(), 100_u32.into())
1186 .expect("expected pagination to succeed");
1187 assert!(matches!(res, PaginationResult::Continue));
1188 assert_eq!(pagination.current_block_from().as_u32(), 6);
1189 }
1190
1191 async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
1193 let res = client.get_block_header_by_number(None, false).await;
1195 assert!(res.is_ok());
1196 }
1197
1198 #[tokio::test]
1199 async fn future_is_send() {
1200 let endpoint = &Endpoint::devnet();
1201 let client = GrpcClient::new(endpoint, 10000);
1202 let client: Box<GrpcClient> = client.into();
1203 tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
1204 }
1205
1206 #[tokio::test]
1207 async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
1208 let endpoint = &Endpoint::devnet();
1209 let client = GrpcClient::new(endpoint, 10000);
1210
1211 assert!(client.genesis_commitment.read().is_none());
1212
1213 let commitment = Word::default();
1214 client.set_genesis_commitment(commitment).await.unwrap();
1215
1216 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1217 }
1218
1219 #[tokio::test]
1220 async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
1221 use miden_protocol::Felt;
1222
1223 let endpoint = &Endpoint::devnet();
1224 let client = GrpcClient::new(endpoint, 10000);
1225
1226 let initial_commitment = Word::default();
1227 client.set_genesis_commitment(initial_commitment).await.unwrap();
1228
1229 let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
1230 client.set_genesis_commitment(new_commitment).await.unwrap();
1231
1232 assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
1233 }
1234
1235 #[tokio::test]
1236 async fn set_genesis_commitment_updates_the_client_if_already_connected() {
1237 let endpoint = &Endpoint::devnet();
1238 let client = GrpcClient::new(endpoint, 10000);
1239
1240 client.connect().await.unwrap();
1242
1243 let commitment = Word::default();
1244 client.set_genesis_commitment(commitment).await.unwrap();
1245
1246 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1247 assert!(client.client.read().as_ref().is_some());
1248 }
1249}