1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{AccountProof, AccountStorageDetails, AccountUpdateSummary};
28use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
29use super::generated::rpc::account_request::AccountDetailRequest;
30use super::generated::rpc::AccountRequest;
31use super::{
32 Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
33 RpcStatusInfo,
34};
35use crate::rpc::domain::sync::ChainMmrInfo;
36use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
37use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
38use crate::rpc::domain::transaction::TransactionsInfo;
39use crate::rpc::errors::node::parse_node_error;
40use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
41use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
42use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
43use crate::rpc::generated::rpc::BlockRange;
44use crate::rpc::domain::limits::RpcLimits;
45use crate::rpc::{AccountStateAt, generated as proto};
46use crate::rpc::domain::account::AccountStorageRequirements;
47
48mod api_client;
49use api_client::api_client_wrapper::ApiClient;
50
/// Cursor state used to walk a block range across several paginated RPC responses.
struct BlockPagination {
    /// First block to request on the next page; moved forward by `advance`.
    current_block_from: BlockNumber,
    /// Optional upper bound of the range, as supplied by the caller of `new`.
    block_to: Option<BlockNumber>,
    /// Number of `advance` calls accepted so far; compared against `MAX_ITERATIONS`.
    iterations: u32,
}
57
/// Outcome of a single `BlockPagination::advance` step.
enum PaginationResult {
    /// The target block has not been reached yet; another page must be requested.
    Continue,
    /// The target block has been reached; carries the values reported by the final page.
    Done {
        chain_tip: BlockNumber,
        block_num: BlockNumber,
    },
}
65
66impl BlockPagination {
67 const MAX_ITERATIONS: u32 = 1000;
72
73 fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
74 Self {
75 current_block_from: block_from,
76 block_to,
77 iterations: 0,
78 }
79 }
80
81 fn current_block_from(&self) -> BlockNumber {
82 self.current_block_from
83 }
84
85 fn block_to(&self) -> Option<BlockNumber> {
86 self.block_to
87 }
88
89 fn advance(
90 &mut self,
91 block_num: BlockNumber,
92 chain_tip: BlockNumber,
93 ) -> Result<PaginationResult, RpcError> {
94 if self.iterations >= Self::MAX_ITERATIONS {
95 return Err(RpcError::PaginationError(
96 "too many pagination iterations, possible infinite loop".to_owned(),
97 ));
98 }
99 self.iterations += 1;
100
101 if block_num < self.current_block_from {
102 return Err(RpcError::PaginationError(
103 "invalid pagination: block_num went backwards".to_owned(),
104 ));
105 }
106
107 let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
108
109 if block_num >= target_block {
110 return Ok(PaginationResult::Done { chain_tip, block_num });
111 }
112
113 self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
114
115 Ok(PaginationResult::Continue)
116 }
117}
118
/// Client for the node RPC API that talks gRPC and implements [`NodeRpcClient`].
///
/// The underlying connection is created on first use (see `ensure_connected`).
pub struct GrpcClient {
    /// The API client; `None` until `connect` has succeeded once.
    client: RwLock<Option<ApiClient>>,
    /// String form of the endpoint passed to `new`.
    endpoint: String,
    /// Timeout in milliseconds handed to the API client when connecting.
    timeout_ms: u64,
    /// Genesis commitment forwarded to the API client, if one has been set.
    genesis_commitment: RwLock<Option<Word>>,
    /// Cached RPC limits, filled by `get_rpc_limits` or `set_rpc_limits`.
    limits: RwLock<Option<RpcLimits>>,
}
144
145impl GrpcClient {
146 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
149 GrpcClient {
150 client: RwLock::new(None),
151 endpoint: endpoint.to_string(),
152 timeout_ms,
153 genesis_commitment: RwLock::new(None),
154 limits: RwLock::new(None),
155 }
156 }
157
158 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
161 if self.client.read().is_none() {
162 self.connect().await?;
163 }
164
165 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
166 }
167
168 async fn connect(&self) -> Result<(), RpcError> {
171 let genesis_commitment = *self.genesis_commitment.read();
172 let new_client =
173 ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
174 .await?;
175 let mut client = self.client.write();
176 client.replace(new_client);
177
178 Ok(())
179 }
180
181 fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
182 let genesis_commitment = self
183 .genesis_commitment
184 .read()
185 .as_ref()
186 .map_or_else(|| "none".to_string(), Word::to_hex);
187 let context = AcceptHeaderContext {
188 client_version: env!("CARGO_PKG_VERSION").to_string(),
189 genesis_commitment,
190 };
191 RpcError::from_grpc_error_with_context(endpoint, status, context)
192 }
193
194 pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
199 let mut rpc_api =
200 ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
201 .await?;
202 rpc_api
203 .status(())
204 .await
205 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
206 .map(tonic::Response::into_inner)
207 .and_then(RpcStatusInfo::try_from)
208 }
209
210 pub async fn fetch_full_account_proof(
217 &self,
218 account_id: AccountId,
219 ) -> Result<(BlockNumber, AccountProof), RpcError> {
220 let mut rpc_api = self.ensure_connected().await?;
221 let has_public_state = account_id.has_public_state();
222 let account_request = {
223 AccountRequest {
224 account_id: Some(account_id.into()),
225 block_num: None,
226 details: {
227 if has_public_state {
228 Some(AccountDetailRequest {
233 code_commitment: Some(EMPTY_WORD.into()),
234 asset_vault_commitment: Some(EMPTY_WORD.into()),
235 storage_maps: vec![],
236 })
237 } else {
238 None
239 }
240 },
241 }
242 };
243 let account_response = rpc_api
244 .get_account(account_request)
245 .await
246 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetAccount, status))?
247 .into_inner();
248 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
249 "GetAccountDetails returned an account without a matching block number for the witness"
250 .to_owned(),
251 ))?;
252 let account_proof = {
253 if has_public_state {
254 let account_details = account_response
255 .details
256 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
257 .into_domain(&BTreeMap::new())?;
258 let storage_header = account_details.storage_details.header;
259 let maps_to_request = storage_header
263 .slots()
264 .filter(|header| header.slot_type().is_map())
265 .map(|map| map.name().to_string());
266 let account_request = AccountRequest {
267 account_id: Some(account_id.into()),
268 block_num: None,
269 details: Some(AccountDetailRequest {
270 code_commitment: Some(EMPTY_WORD.into()),
271 asset_vault_commitment: Some(EMPTY_WORD.into()),
272 storage_maps: maps_to_request
273 .map(|slot_name| StorageMapDetailRequest {
274 slot_name,
275 slot_data: Some(SlotData::AllEntries(true)),
276 })
277 .collect(),
278 }),
279 };
280 match rpc_api.get_account(account_request).await {
281 Ok(account_proof) => account_proof.into_inner().try_into(),
282 Err(err) => Err(RpcError::ConnectionError(
283 format!(
284 "failed to fetch account proof for account: {account_id}, got: {err}"
285 )
286 .into(),
287 )),
288 }
289 } else {
290 account_response.try_into()
291 }
292 };
293 Ok((block_number.block_num.into(), account_proof?))
294 }
295
296 async fn build_storage_slots(
300 &self,
301 account_id: AccountId,
302 storage_details: &AccountStorageDetails,
303 ) -> Result<Vec<StorageSlot>, RpcError> {
304 let mut slots = vec![];
305 let mut map_cache: Option<StorageMapInfo> = None;
308 for slot_header in storage_details.header.slots() {
309 match slot_header.slot_type() {
316 StorageSlotType::Value => {
317 slots.push(miden_protocol::account::StorageSlot::with_value(
318 slot_header.name().clone(),
319 slot_header.value(),
320 ));
321 },
322 StorageSlotType::Map => {
323 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
324 RpcError::ExpectedDataMissing(format!(
325 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
326 slot_header.name(),
327 )),
328 )?;
329
330 let storage_map = if map_details.too_many_entries {
331 let map_info = if let Some(ref info) = map_cache {
332 info
333 } else {
334 let fetched_data =
335 self.sync_storage_maps(0_u32.into(), None, account_id).await?;
336 map_cache.insert(fetched_data)
337 };
338 let map_entries: Vec<_> = map_info
339 .updates
340 .iter()
341 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
342 .map(|slot_info| (slot_info.key, slot_info.value))
343 .collect();
344 StorageMap::with_entries(map_entries)
345 } else {
346 map_details.entries.clone().into_storage_map()
347 }
348 .map_err(|err| {
349 RpcError::InvalidResponse(format!(
350 "the rpc api returned a non-valid map entry: {err}"
351 ))
352 })?;
353
354 slots.push(miden_protocol::account::StorageSlot::with_map(
355 slot_header.name().clone(),
356 storage_map,
357 ));
358 },
359 }
360 }
361 Ok(slots)
362 }
363}
364
365#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
366#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
367impl NodeRpcClient for GrpcClient {
368 fn has_genesis_commitment(&self) -> Option<Word> {
373 *self.genesis_commitment.read()
374 }
375
376 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
377 if self.genesis_commitment.read().is_some() {
379 return Ok(());
381 }
382
383 self.genesis_commitment.write().replace(commitment);
385
386 let mut client_guard = self.client.write();
389 if let Some(client) = client_guard.as_mut() {
390 client.set_genesis_commitment(commitment);
391 }
392
393 Ok(())
394 }
395
396 async fn submit_proven_transaction(
397 &self,
398 proven_transaction: ProvenTransaction,
399 transaction_inputs: TransactionInputs,
400 ) -> Result<BlockNumber, RpcError> {
401 let request = proto::transaction::ProvenTransaction {
402 transaction: proven_transaction.to_bytes(),
403 transaction_inputs: Some(transaction_inputs.to_bytes()),
404 };
405
406 let mut rpc_api = self.ensure_connected().await?;
407
408 let api_response = rpc_api
409 .submit_proven_transaction(request)
410 .await
411 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SubmitProvenTx, status))?;
412
413 Ok(BlockNumber::from(api_response.into_inner().block_num))
414 }
415
416 async fn get_block_header_by_number(
417 &self,
418 block_num: Option<BlockNumber>,
419 include_mmr_proof: bool,
420 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
421 let request = proto::rpc::BlockHeaderByNumberRequest {
422 block_num: block_num.as_ref().map(BlockNumber::as_u32),
423 include_mmr_proof: Some(include_mmr_proof),
424 };
425
426 info!("Calling GetBlockHeaderByNumber: {:?}", request);
427
428 let mut rpc_api = self.ensure_connected().await?;
429
430 let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
431 self.rpc_error_from_status(RpcEndpoint::GetBlockHeaderByNumber, status)
432 })?;
433
434 let response = api_response.into_inner();
435
436 let block_header: BlockHeader = response
437 .block_header
438 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
439 .try_into()?;
440
441 let mmr_proof = if include_mmr_proof {
442 let forest = response
443 .chain_length
444 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
445 let merkle_path: MerklePath = response
446 .mmr_path
447 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
448 .try_into()?;
449
450 Some(MmrProof {
451 forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
452 position: block_header.block_num().as_usize(),
453 merkle_path,
454 })
455 } else {
456 None
457 };
458
459 Ok((block_header, mmr_proof))
460 }
461
462 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
463 let limits = self.get_rpc_limits().await?;
464 let mut notes = Vec::with_capacity(note_ids.len());
465 for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
466 let request = proto::note::NoteIdList {
467 ids: chunk.iter().map(|id| (*id).into()).collect(),
468 };
469
470 let mut rpc_api = self.ensure_connected().await?;
471
472 let api_response = rpc_api
473 .get_notes_by_id(request)
474 .await
475 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetNotesById, status))?;
476
477 let response_notes = api_response
478 .into_inner()
479 .notes
480 .into_iter()
481 .map(FetchedNote::try_from)
482 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
483
484 notes.extend(response_notes);
485 }
486 Ok(notes)
487 }
488
489 async fn sync_chain_mmr(
490 &self,
491 block_from: BlockNumber,
492 block_to: Option<BlockNumber>,
493 ) -> Result<ChainMmrInfo, RpcError> {
494 let block_range = Some(BlockRange {
495 block_from: block_from.as_u32(),
496 block_to: block_to.map(|b| b.as_u32()),
497 });
498
499 let request = proto::rpc::SyncChainMmrRequest { block_range };
500
501 let mut rpc_api = self.ensure_connected().await?;
502
503 let response = rpc_api
504 .sync_chain_mmr(request)
505 .await
506 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncChainMmr, status))?;
507
508 response.into_inner().try_into()
509 }
510
511 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
523 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
524 let update_summary =
525 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
526
527 if account_id.is_private() {
530 Ok(FetchedAccount::new_private(account_id, update_summary))
531 } else {
532 let details =
536 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
537 "GetAccountDetails returned a public account without details".to_owned(),
538 ))?;
539 let account_id = details.header.id();
540 let nonce = details.header.nonce();
541 let assets: Vec<Asset> = {
542 if details.vault_details.too_many_assets {
543 self.sync_account_vault(BlockNumber::from(0), None, account_id)
544 .await?
545 .updates
546 .into_iter()
547 .filter_map(|update| update.asset)
548 .collect()
549 } else {
550 details.vault_details.assets
551 }
552 };
553
554 let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
555 let seed = None;
556 let asset_vault = AssetVault::new(&assets).map_err(|err| {
557 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
558 })?;
559 let account_storage = AccountStorage::new(slots).map_err(|err| {
560 RpcError::InvalidResponse(format!(
561 "api rpc returned non-valid storage slots: {err}"
562 ))
563 })?;
564 let account =
565 Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
566 .map_err(|err| {
567 RpcError::InvalidResponse(format!(
568 "failed to instance an account from the rpc api response: {err}"
569 ))
570 })?;
571 Ok(FetchedAccount::new_public(account, update_summary))
572 }
573 }
574
575 async fn get_account_proof(
587 &self,
588 account_id: AccountId,
589 storage_requirements: AccountStorageRequirements,
590 account_state: AccountStateAt,
591 known_account_code: Option<AccountCode>,
592 ) -> Result<(BlockNumber, AccountProof), RpcError> {
593 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
594 if let Some(account_code) = known_account_code {
595 known_codes_by_commitment.insert(account_code.commitment(), account_code);
596 }
597
598 let mut rpc_api = self.ensure_connected().await?;
599
600 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
601
602 let account_details = if account_id.is_public() {
605 Some(AccountDetailRequest {
606 code_commitment: Some(EMPTY_WORD.into()),
607 asset_vault_commitment: None,
610 storage_maps,
611 })
612 } else {
613 None
614 };
615
616 let block_num = match account_state {
617 AccountStateAt::Block(number) => Some(number.into()),
618 AccountStateAt::ChainTip => None,
619 };
620
621 let request = AccountRequest {
622 account_id: Some(account_id.into()),
623 block_num,
624 details: account_details,
625 };
626
627 let response = rpc_api
628 .get_account(request)
629 .await
630 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetAccount, status))?
631 .into_inner();
632
633 let account_witness: AccountWitness = response
634 .witness
635 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
636 .try_into()?;
637
638 let headers = if account_witness.id().is_public() {
640 Some(
641 response
642 .details
643 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
644 .into_domain(&known_codes_by_commitment)?,
645 )
646 } else {
647 None
648 };
649
650 let proof = AccountProof::new(account_witness, headers)
651 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
652
653 let block_num = response
654 .block_num
655 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
656 .block_num
657 .into();
658
659 Ok((block_num, proof))
660 }
661
662 async fn sync_notes(
665 &self,
666 block_num: BlockNumber,
667 block_to: Option<BlockNumber>,
668 note_tags: &BTreeSet<NoteTag>,
669 ) -> Result<NoteSyncInfo, RpcError> {
670 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
671
672 let block_range = Some(BlockRange {
673 block_from: block_num.as_u32(),
674 block_to: block_to.map(|b| b.as_u32()),
675 });
676
677 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
678
679 let mut rpc_api = self.ensure_connected().await?;
680
681 let response = rpc_api
682 .sync_notes(request)
683 .await
684 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncNotes, status))?;
685
686 response.into_inner().try_into()
687 }
688
689 async fn sync_nullifiers(
690 &self,
691 prefixes: &[u16],
692 block_num: BlockNumber,
693 block_to: Option<BlockNumber>,
694 ) -> Result<Vec<NullifierUpdate>, RpcError> {
695 const MAX_ITERATIONS: u32 = 1000; let limits = self.get_rpc_limits().await?;
698 let mut all_nullifiers = BTreeSet::new();
699
700 let mut rpc_api = self.ensure_connected().await?;
702
703 'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
706 let mut current_block_from = block_num.as_u32();
707
708 for _ in 0..MAX_ITERATIONS {
709 let request = proto::rpc::SyncNullifiersRequest {
710 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
711 prefix_len: 16,
712 block_range: Some(BlockRange {
713 block_from: current_block_from,
714 block_to: block_to.map(|b| b.as_u32()),
715 }),
716 };
717
718 let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
719 self.rpc_error_from_status(RpcEndpoint::SyncNullifiers, status)
720 })?;
721 let response = response.into_inner();
722
723 let batch_nullifiers = response
725 .nullifiers
726 .iter()
727 .map(TryFrom::try_from)
728 .collect::<Result<Vec<NullifierUpdate>, _>>()
729 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
730
731 all_nullifiers.extend(batch_nullifiers);
732
733 if let Some(page) = response.pagination_info {
735 if page.block_num < current_block_from {
737 return Err(RpcError::PaginationError(
738 "invalid pagination: block_num went backwards".to_string(),
739 ));
740 }
741
742 let target_block =
744 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
745
746 if page.block_num >= target_block {
747 continue 'chunk_nullifiers;
749 }
750 current_block_from = page.block_num + 1;
751 }
752 }
753 return Err(RpcError::PaginationError(
755 "too many pagination iterations, possible infinite loop".to_string(),
756 ));
757 }
758 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
759 }
760
761 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
762 let limits = self.get_rpc_limits().await?;
763 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
764 for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
765 let request = proto::rpc::NullifierList {
766 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
767 };
768
769 let mut rpc_api = self.ensure_connected().await?;
770
771 let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
772 self.rpc_error_from_status(RpcEndpoint::CheckNullifiers, status)
773 })?;
774
775 let mut response = response.into_inner();
776 let chunk_proofs = response
777 .proofs
778 .iter_mut()
779 .map(|r| r.to_owned().try_into())
780 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
781 proofs.extend(chunk_proofs);
782 }
783 Ok(proofs)
784 }
785
786 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
787 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
788
789 let mut rpc_api = self.ensure_connected().await?;
790
791 let response = rpc_api
792 .get_block_by_number(request)
793 .await
794 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetBlockByNumber, status))?;
795
796 let response = response.into_inner();
797 let block =
798 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
799 "GetBlockByNumberResponse.block".to_string(),
800 ))?)?;
801
802 Ok(block)
803 }
804
805 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
806 let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
807
808 let mut rpc_api = self.ensure_connected().await?;
809
810 let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
811 self.rpc_error_from_status(RpcEndpoint::GetNoteScriptByRoot, status)
812 })?;
813
814 let response = response.into_inner();
815 let note_script = NoteScript::try_from(
816 response
817 .script
818 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
819 )?;
820
821 Ok(note_script)
822 }
823
824 async fn sync_storage_maps(
825 &self,
826 block_from: BlockNumber,
827 block_to: Option<BlockNumber>,
828 account_id: AccountId,
829 ) -> Result<StorageMapInfo, RpcError> {
830 let mut rpc_api = self.ensure_connected().await?;
831 let mut pagination = BlockPagination::new(block_from, block_to);
832 let mut updates = Vec::new();
833
834 let (chain_tip, block_number) = loop {
835 let request = proto::rpc::SyncAccountStorageMapsRequest {
836 block_range: Some(BlockRange {
837 block_from: pagination.current_block_from().as_u32(),
838 block_to: pagination.block_to().map(|block| block.as_u32()),
839 }),
840 account_id: Some(account_id.into()),
841 };
842 let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
843 self.rpc_error_from_status(RpcEndpoint::SyncStorageMaps, status)
844 })?;
845 let response = response.into_inner();
846 let page = response
847 .pagination_info
848 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
849 let page_block_num = BlockNumber::from(page.block_num);
850 let page_chain_tip = BlockNumber::from(page.chain_tip);
851 let batch = response
852 .updates
853 .into_iter()
854 .map(TryInto::try_into)
855 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
856 updates.extend(batch);
857
858 match pagination.advance(page_block_num, page_chain_tip)? {
859 PaginationResult::Continue => {},
860 PaginationResult::Done {
861 chain_tip: final_chain_tip,
862 block_num: final_block_num,
863 } => break (final_chain_tip, final_block_num),
864 }
865 };
866
867 Ok(StorageMapInfo { chain_tip, block_number, updates })
868 }
869
870 async fn sync_account_vault(
871 &self,
872 block_from: BlockNumber,
873 block_to: Option<BlockNumber>,
874 account_id: AccountId,
875 ) -> Result<AccountVaultInfo, RpcError> {
876 let mut rpc_api = self.ensure_connected().await?;
877 let mut pagination = BlockPagination::new(block_from, block_to);
878 let mut updates = Vec::new();
879
880 let (chain_tip, block_number) = loop {
881 let request = proto::rpc::SyncAccountVaultRequest {
882 block_range: Some(BlockRange {
883 block_from: pagination.current_block_from().as_u32(),
884 block_to: pagination.block_to().map(|block| block.as_u32()),
885 }),
886 account_id: Some(account_id.into()),
887 };
888 let response = rpc_api.sync_account_vault(request).await.map_err(|status| {
889 self.rpc_error_from_status(RpcEndpoint::SyncAccountVault, status)
890 })?;
891 let response = response.into_inner();
892 let page = response
893 .pagination_info
894 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
895 let page_block_num = BlockNumber::from(page.block_num);
896 let page_chain_tip = BlockNumber::from(page.chain_tip);
897 let batch = response
898 .updates
899 .iter()
900 .map(|u| (*u).try_into())
901 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
902 updates.extend(batch);
903
904 match pagination.advance(page_block_num, page_chain_tip)? {
905 PaginationResult::Continue => {},
906 PaginationResult::Done {
907 chain_tip: final_chain_tip,
908 block_num: final_block_num,
909 } => break (final_chain_tip, final_block_num),
910 }
911 };
912
913 Ok(AccountVaultInfo { chain_tip, block_number, updates })
914 }
915
916 async fn sync_transactions(
917 &self,
918 block_from: BlockNumber,
919 block_to: Option<BlockNumber>,
920 account_ids: Vec<AccountId>,
921 ) -> Result<TransactionsInfo, RpcError> {
922 let block_range = Some(BlockRange {
923 block_from: block_from.as_u32(),
924 block_to: block_to.map(|b| b.as_u32()),
925 });
926
927 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
928
929 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
930
931 let mut rpc_api = self.ensure_connected().await?;
932
933 let response = rpc_api
934 .sync_transactions(request)
935 .await
936 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncTransactions, status))?;
937
938 response.into_inner().try_into()
939 }
940
941 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
942 let endpoint: Endpoint =
943 Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
944 Ok(endpoint.to_network_id())
945 }
946
947 async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
948 if let Some(limits) = *self.limits.read() {
950 return Ok(limits);
951 }
952
953 let mut rpc_api = self.ensure_connected().await?;
955 let response = rpc_api
956 .get_limits(())
957 .await
958 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetLimits, status))?;
959 let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
960
961 self.limits.write().replace(limits);
963 Ok(limits)
964 }
965
966 fn has_rpc_limits(&self) -> Option<RpcLimits> {
967 *self.limits.read()
968 }
969
970 async fn set_rpc_limits(&self, limits: RpcLimits) {
971 self.limits.write().replace(limits);
972 }
973
974 async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
975 GrpcClient::get_status_unversioned(self).await
976 }
977}
978
979impl RpcError {
983 pub fn from_grpc_error_with_context(
984 endpoint: RpcEndpoint,
985 status: Status,
986 context: AcceptHeaderContext,
987 ) -> Self {
988 if let Some(accept_error) =
989 AcceptHeaderError::try_from_message_with_context(status.message(), context)
990 {
991 return Self::AcceptHeaderError(accept_error);
992 }
993
994 let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
996
997 let error_kind = GrpcError::from(&status);
998 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
999
1000 Self::RequestError {
1001 endpoint,
1002 error_kind,
1003 endpoint_error,
1004 source: Some(source),
1005 }
1006 }
1007}
1008
1009impl From<&Status> for GrpcError {
1010 fn from(status: &Status) -> Self {
1011 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1012 }
1013}
1014
// Unit tests for `BlockPagination` and for the genesis-commitment handling of `GrpcClient`.
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;
    use miden_protocol::block::BlockNumber;

    use super::{BlockPagination, GrpcClient, PaginationResult};
    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};

    // Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    // Compile-time check that the client and its boxed trait object are `Send + Sync`.
    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    // A page ending before the current cursor must be rejected.
    #[test]
    fn block_pagination_errors_when_block_num_goes_backwards() {
        let mut pagination = BlockPagination::new(10_u32.into(), None);

        let res = pagination.advance(9_u32.into(), 20_u32.into());
        assert!(matches!(res, Err(RpcError::PaginationError(_))));
    }

    // `MAX_ITERATIONS` pages are accepted; the next call fails.
    #[test]
    fn block_pagination_errors_after_max_iterations() {
        let mut pagination = BlockPagination::new(0_u32.into(), None);
        // Far enough ahead that advancing one block per page never reaches it.
        let chain_tip: BlockNumber = 10_000_u32.into();

        for _ in 0..BlockPagination::MAX_ITERATIONS {
            let current = pagination.current_block_from();
            let res = pagination
                .advance(current, chain_tip)
                .expect("expected pagination to continue within iteration limit");
            assert!(matches!(res, PaginationResult::Continue));
        }

        let res = pagination.advance(pagination.current_block_from(), chain_tip);
        assert!(matches!(res, Err(RpcError::PaginationError(_))));
    }

    // With `block_to` (50) beyond the chain tip (30), pagination finishes at the chain tip.
    #[test]
    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
        let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));

        let res = pagination
            .advance(30_u32.into(), 30_u32.into())
            .expect("expected pagination to succeed");

        assert!(matches!(
            res,
            PaginationResult::Done {
                chain_tip,
                block_num
            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
        ));
    }

    // After a page ending at block 5, the next request starts at block 6.
    #[test]
    fn block_pagination_advances_cursor_by_one() {
        let mut pagination = BlockPagination::new(5_u32.into(), None);

        let res = pagination
            .advance(5_u32.into(), 100_u32.into())
            .expect("expected pagination to succeed");
        assert!(matches!(res, PaginationResult::Continue));
        assert_eq!(pagination.current_block_from().as_u32(), 6);
    }

    // Helper whose future is spawned in `future_is_send`; it calls the client through the
    // trait object.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        let res = client.get_block_header_by_number(None, false).await;
        assert!(res.is_ok());
    }

    // Spawning requires the future to be `Send`. The join handle is dropped without being
    // awaited, so this test does not wait for the request to complete.
    #[tokio::test]
    async fn future_is_send() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);
        let client: Box<GrpcClient> = client.into();
        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);

        assert!(client.genesis_commitment.read().is_none());

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
    }

    // A second call with a different commitment must leave the first one in place.
    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);

        let initial_commitment = Word::default();
        client.set_genesis_commitment(initial_commitment).await.unwrap();

        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        client.set_genesis_commitment(new_commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
    }

    // Calls `connect` against the devnet endpoint before setting the commitment.
    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let endpoint = &Endpoint::devnet();
        let client = GrpcClient::new(endpoint, 10000);

        client.connect().await.unwrap();

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
        assert!(client.client.read().as_ref().is_some());
    }
}