1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{AccountProof, AccountStorageDetails, AccountUpdateSummary};
28use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
29use super::generated::rpc::account_request::AccountDetailRequest;
30use super::generated::rpc::AccountRequest;
31use super::{
32 Endpoint, FetchedAccount, NodeRpcClient, NodeRpcClientEndpoint, NoteSyncInfo, RpcError,
33 StateSyncInfo,
34};
35use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
36use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
37use crate::rpc::domain::transaction::TransactionsInfo;
38use crate::rpc::errors::{AcceptHeaderError, GrpcError, RpcConversionError};
39use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
40use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
41use crate::rpc::generated::rpc::BlockRange;
42use crate::rpc::{AccountStateAt, NOTE_IDS_LIMIT, NULLIFIER_PREFIXES_LIMIT, generated as proto};
43use crate::transaction::ForeignAccount;
44
45mod api_client;
46use api_client::api_client_wrapper::ApiClient;
47
48pub struct GrpcClient {
62 client: RwLock<Option<ApiClient>>,
63 endpoint: String,
64 timeout_ms: u64,
65 genesis_commitment: RwLock<Option<Word>>,
66}
67
68impl GrpcClient {
69 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
72 GrpcClient {
73 client: RwLock::new(None),
74 endpoint: endpoint.to_string(),
75 timeout_ms,
76 genesis_commitment: RwLock::new(None),
77 }
78 }
79
80 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
83 if self.client.read().is_none() {
84 self.connect().await?;
85 }
86
87 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
88 }
89
90 async fn connect(&self) -> Result<(), RpcError> {
93 let genesis_commitment = *self.genesis_commitment.read();
94 let new_client =
95 ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
96 .await?;
97 let mut client = self.client.write();
98 client.replace(new_client);
99
100 Ok(())
101 }
102
103 pub async fn fetch_full_account_proof(
110 &self,
111 account_id: AccountId,
112 ) -> Result<(BlockNumber, AccountProof), RpcError> {
113 let mut rpc_api = self.ensure_connected().await?;
114 let has_public_state = account_id.has_public_state();
115 let account_request = {
116 AccountRequest {
117 account_id: Some(account_id.into()),
118 block_num: None,
119 details: {
120 if has_public_state {
121 Some(AccountDetailRequest {
126 code_commitment: Some(EMPTY_WORD.into()),
127 asset_vault_commitment: Some(EMPTY_WORD.into()),
128 storage_maps: vec![],
129 })
130 } else {
131 None
132 }
133 },
134 }
135 };
136 let account_response = rpc_api
137 .get_account(account_request)
138 .await
139 .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
140 .into_inner();
141 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
142 "GetAccountDetails returned an account without a matching block number for the witness"
143 .to_owned(),
144 ))?;
145 let account_proof = {
146 if has_public_state {
147 let account_details = account_response
148 .details
149 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
150 .into_domain(&BTreeMap::new())?;
151 let storage_header = account_details.storage_details.header;
152 let maps_to_request = storage_header
156 .slots()
157 .filter(|header| header.slot_type().is_map())
158 .map(|map| map.name().to_string());
159 let account_request = AccountRequest {
160 account_id: Some(account_id.into()),
161 block_num: None,
162 details: Some(AccountDetailRequest {
163 code_commitment: Some(EMPTY_WORD.into()),
164 asset_vault_commitment: Some(EMPTY_WORD.into()),
165 storage_maps: maps_to_request
166 .map(|slot_name| StorageMapDetailRequest {
167 slot_name,
168 slot_data: Some(SlotData::AllEntries(true)),
169 })
170 .collect(),
171 }),
172 };
173 match rpc_api.get_account(account_request).await {
174 Ok(account_proof) => account_proof.into_inner().try_into(),
175 Err(err) => Err(RpcError::ConnectionError(
176 format!(
177 "failed to fetch account proof for account: {account_id}, got: {err}"
178 )
179 .into(),
180 )),
181 }
182 } else {
183 account_response.try_into()
184 }
185 };
186 Ok((block_number.block_num.into(), account_proof?))
187 }
188
189 async fn build_storage_slots(
193 &self,
194 account_id: AccountId,
195 storage_details: &AccountStorageDetails,
196 ) -> Result<Vec<StorageSlot>, RpcError> {
197 let mut slots = vec![];
198 let mut map_cache: Option<StorageMapInfo> = None;
201 for slot_header in storage_details.header.slots() {
202 match slot_header.slot_type() {
209 StorageSlotType::Value => {
210 slots.push(miden_protocol::account::StorageSlot::with_value(
211 slot_header.name().clone(),
212 slot_header.value(),
213 ));
214 },
215 StorageSlotType::Map => {
216 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
217 RpcError::ExpectedDataMissing(format!(
218 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
219 slot_header.name(),
220 )),
221 )?;
222
223 let storage_map = if map_details.too_many_entries {
224 let map_info = if let Some(ref info) = map_cache {
225 info
226 } else {
227 let fetched_data =
228 self.sync_storage_maps(0_u32.into(), None, account_id).await?;
229 map_cache.insert(fetched_data)
230 };
231 let mut sorted_updates: Vec<_> = map_info
235 .updates
236 .iter()
237 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
238 .collect();
239 sorted_updates.sort_by_key(|u| u.block_num);
240 let map_entries: Vec<_> = sorted_updates
241 .into_iter()
242 .map(|u| (u.key, u.value))
243 .collect::<BTreeMap<_, _>>()
244 .into_iter()
245 .collect();
246 StorageMap::with_entries(map_entries)
247 } else {
248 map_details.entries.clone().into_storage_map()
249 }
250 .map_err(|err| {
251 RpcError::InvalidResponse(format!(
252 "the rpc api returned a non-valid map entry: {err}"
253 ))
254 })?;
255
256 slots.push(miden_protocol::account::StorageSlot::with_map(
257 slot_header.name().clone(),
258 storage_map,
259 ));
260 },
261 }
262 }
263 Ok(slots)
264 }
265}
266
267#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
268#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
269impl NodeRpcClient for GrpcClient {
270 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
275 if self.genesis_commitment.read().is_some() {
277 return Ok(());
279 }
280
281 self.genesis_commitment.write().replace(commitment);
283
284 let mut client_guard = self.client.write();
287 if let Some(client) = client_guard.as_mut() {
288 client.set_genesis_commitment(commitment);
289 }
290
291 Ok(())
292 }
293
294 async fn submit_proven_transaction(
295 &self,
296 proven_transaction: ProvenTransaction,
297 transaction_inputs: TransactionInputs,
298 ) -> Result<BlockNumber, RpcError> {
299 let request = proto::transaction::ProvenTransaction {
300 transaction: proven_transaction.to_bytes(),
301 transaction_inputs: Some(transaction_inputs.to_bytes()),
302 };
303
304 let mut rpc_api = self.ensure_connected().await?;
305
306 let api_response = rpc_api.submit_proven_transaction(request).await.map_err(|status| {
307 RpcError::from_grpc_error(NodeRpcClientEndpoint::SubmitProvenTx, status)
308 })?;
309
310 Ok(BlockNumber::from(api_response.into_inner().block_num))
311 }
312
313 async fn get_block_header_by_number(
314 &self,
315 block_num: Option<BlockNumber>,
316 include_mmr_proof: bool,
317 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
318 let request = proto::rpc::BlockHeaderByNumberRequest {
319 block_num: block_num.as_ref().map(BlockNumber::as_u32),
320 include_mmr_proof: Some(include_mmr_proof),
321 };
322
323 info!("Calling GetBlockHeaderByNumber: {:?}", request);
324
325 let mut rpc_api = self.ensure_connected().await?;
326
327 let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
328 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockHeaderByNumber, status)
329 })?;
330
331 let response = api_response.into_inner();
332
333 let block_header: BlockHeader = response
334 .block_header
335 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
336 .try_into()?;
337
338 let mmr_proof = if include_mmr_proof {
339 let forest = response
340 .chain_length
341 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
342 let merkle_path: MerklePath = response
343 .mmr_path
344 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
345 .try_into()?;
346
347 Some(MmrProof {
348 forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
349 position: block_header.block_num().as_usize(),
350 merkle_path,
351 })
352 } else {
353 None
354 };
355
356 Ok((block_header, mmr_proof))
357 }
358
359 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
360 let mut notes = Vec::with_capacity(note_ids.len());
361 for chunk in note_ids.chunks(NOTE_IDS_LIMIT) {
362 let request = proto::note::NoteIdList {
363 ids: chunk.iter().map(|id| (*id).into()).collect(),
364 };
365
366 let mut rpc_api = self.ensure_connected().await?;
367
368 let api_response = rpc_api.get_notes_by_id(request).await.map_err(|status| {
369 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNotesById, status)
370 })?;
371
372 let response_notes = api_response
373 .into_inner()
374 .notes
375 .into_iter()
376 .map(FetchedNote::try_from)
377 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
378
379 notes.extend(response_notes);
380 }
381 Ok(notes)
382 }
383
384 async fn sync_state(
387 &self,
388 block_num: BlockNumber,
389 account_ids: &[AccountId],
390 note_tags: &BTreeSet<NoteTag>,
391 ) -> Result<StateSyncInfo, RpcError> {
392 let account_ids = account_ids.iter().map(|acc| (*acc).into()).collect();
393
394 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
395
396 let request = proto::rpc::SyncStateRequest {
397 block_num: block_num.as_u32(),
398 account_ids,
399 note_tags,
400 };
401
402 let mut rpc_api = self.ensure_connected().await?;
403
404 let response = rpc_api.sync_state(request).await.map_err(|status| {
405 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncState, status)
406 })?;
407 response.into_inner().try_into()
408 }
409
410 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
422 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
423 let update_summary =
424 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
425
426 if account_id.is_private() {
429 Ok(FetchedAccount::new_private(account_id, update_summary))
430 } else {
431 let details =
435 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
436 "GetAccountDetails returned a public account without details".to_owned(),
437 ))?;
438 let account_id = details.header.id();
439 let nonce = details.header.nonce();
440 let assets: Vec<Asset> = {
441 if details.vault_details.too_many_assets {
442 let vault_info =
443 self.sync_account_vault(BlockNumber::from(0), None, account_id).await?;
444 let mut updates = vault_info.updates;
448 updates.sort_by_key(|u| u.block_num);
449 updates
450 .into_iter()
451 .map(|u| (Word::from(u.vault_key), u.asset))
452 .collect::<BTreeMap<_, _>>()
453 .into_values()
454 .flatten()
455 .collect()
456 } else {
457 details.vault_details.assets
458 }
459 };
460
461 let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
462 let seed = None;
463 let asset_vault = AssetVault::new(&assets).map_err(|err| {
464 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
465 })?;
466 let account_storage = AccountStorage::new(slots).map_err(|err| {
467 RpcError::InvalidResponse(format!(
468 "api rpc returned non-valid storage slots: {err}"
469 ))
470 })?;
471 let account =
472 Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
473 .map_err(|err| {
474 RpcError::InvalidResponse(format!(
475 "failed to instance an account from the rpc api response: {err}"
476 ))
477 })?;
478 Ok(FetchedAccount::new_public(account, update_summary))
479 }
480 }
481
482 async fn get_account(
494 &self,
495 foreign_account: ForeignAccount,
496 account_state: AccountStateAt,
497 known_account_code: Option<AccountCode>,
498 ) -> Result<(BlockNumber, AccountProof), RpcError> {
499 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
500 if let Some(account_code) = known_account_code {
501 known_codes_by_commitment.insert(account_code.commitment(), account_code);
502 }
503
504 let mut rpc_api = self.ensure_connected().await?;
505
506 let account_id = foreign_account.account_id();
508 let storage_requirements = foreign_account.storage_slot_requirements();
509
510 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
511
512 let account_details = if account_id.is_public() {
515 Some(AccountDetailRequest {
516 code_commitment: Some(EMPTY_WORD.into()),
517 asset_vault_commitment: None,
520 storage_maps,
521 })
522 } else {
523 None
524 };
525
526 let block_num = match account_state {
527 AccountStateAt::Block(number) => Some(number.into()),
528 AccountStateAt::ChainTip => None,
529 };
530
531 let request = AccountRequest {
532 account_id: Some(account_id.into()),
533 block_num,
534 details: account_details,
535 };
536
537 let response = rpc_api
538 .get_account(request)
539 .await
540 .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
541 .into_inner();
542
543 let account_witness: AccountWitness = response
544 .witness
545 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
546 .try_into()?;
547
548 let headers = if account_witness.id().is_public() {
550 Some(
551 response
552 .details
553 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
554 .into_domain(&known_codes_by_commitment)?,
555 )
556 } else {
557 None
558 };
559
560 let proof = AccountProof::new(account_witness, headers)
561 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
562
563 let block_num = response
564 .block_num
565 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
566 .block_num
567 .into();
568
569 Ok((block_num, proof))
570 }
571
572 async fn sync_notes(
575 &self,
576 block_num: BlockNumber,
577 block_to: Option<BlockNumber>,
578 note_tags: &BTreeSet<NoteTag>,
579 ) -> Result<NoteSyncInfo, RpcError> {
580 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
581
582 let block_range = Some(BlockRange {
583 block_from: block_num.as_u32(),
584 block_to: block_to.map(|b| b.as_u32()),
585 });
586
587 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
588
589 let mut rpc_api = self.ensure_connected().await?;
590
591 let response = rpc_api.sync_notes(request).await.map_err(|status| {
592 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNotes, status)
593 })?;
594
595 response.into_inner().try_into()
596 }
597
598 async fn sync_nullifiers(
599 &self,
600 prefixes: &[u16],
601 block_num: BlockNumber,
602 block_to: Option<BlockNumber>,
603 ) -> Result<Vec<NullifierUpdate>, RpcError> {
604 const MAX_ITERATIONS: u32 = 1000; let mut all_nullifiers = BTreeSet::new();
607
608 let mut rpc_api = self.ensure_connected().await?;
610
611 'chunk_nullifiers: for chunk in prefixes.chunks(NULLIFIER_PREFIXES_LIMIT) {
614 let mut current_block_from = block_num.as_u32();
615
616 for _ in 0..MAX_ITERATIONS {
617 let request = proto::rpc::SyncNullifiersRequest {
618 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
619 prefix_len: 16,
620 block_range: Some(BlockRange {
621 block_from: current_block_from,
622 block_to: block_to.map(|b| b.as_u32()),
623 }),
624 };
625
626 let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
627 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNullifiers, status)
628 })?;
629 let response = response.into_inner();
630
631 let batch_nullifiers = response
633 .nullifiers
634 .iter()
635 .map(TryFrom::try_from)
636 .collect::<Result<Vec<NullifierUpdate>, _>>()
637 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
638
639 all_nullifiers.extend(batch_nullifiers);
640
641 if let Some(page) = response.pagination_info {
643 if page.block_num < current_block_from {
645 return Err(RpcError::InvalidResponse(
646 "invalid pagination: block_num went backwards".to_string(),
647 ));
648 }
649
650 let target_block =
652 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
653
654 if page.block_num >= target_block {
655 continue 'chunk_nullifiers;
657 }
658 current_block_from = page.block_num + 1;
659 }
660 }
661 return Err(RpcError::InvalidResponse(
663 "too many pagination iterations, possible infinite loop".to_string(),
664 ));
665 }
666 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
667 }
668
669 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
670 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
671 for chunk in nullifiers.chunks(NULLIFIER_PREFIXES_LIMIT) {
672 let request = proto::rpc::NullifierList {
673 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
674 };
675
676 let mut rpc_api = self.ensure_connected().await?;
677
678 let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
679 RpcError::from_grpc_error(NodeRpcClientEndpoint::CheckNullifiers, status)
680 })?;
681
682 let mut response = response.into_inner();
683 let chunk_proofs = response
684 .proofs
685 .iter_mut()
686 .map(|r| r.to_owned().try_into())
687 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
688 proofs.extend(chunk_proofs);
689 }
690 Ok(proofs)
691 }
692
693 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
694 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
695
696 let mut rpc_api = self.ensure_connected().await?;
697
698 let response = rpc_api.get_block_by_number(request).await.map_err(|status| {
699 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockByNumber, status)
700 })?;
701
702 let response = response.into_inner();
703 let block =
704 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
705 "GetBlockByNumberResponse.block".to_string(),
706 ))?)?;
707
708 Ok(block)
709 }
710
711 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
712 let request = proto::note::NoteRoot { root: Some(root.into()) };
713
714 let mut rpc_api = self.ensure_connected().await?;
715
716 let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
717 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNoteScriptByRoot, status)
718 })?;
719
720 let response = response.into_inner();
721 let note_script = NoteScript::try_from(
722 response
723 .script
724 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
725 )?;
726
727 Ok(note_script)
728 }
729
730 async fn sync_storage_maps(
731 &self,
732 block_from: BlockNumber,
733 block_to: Option<BlockNumber>,
734 account_id: AccountId,
735 ) -> Result<StorageMapInfo, RpcError> {
736 let mut all_updates = Vec::new();
737 let mut current_block_from = block_from.as_u32();
738 let mut target_block_reached = false;
739 let mut final_chain_tip = 0;
740 let mut final_block_num = 0;
741
742 let mut rpc_api = self.ensure_connected().await?;
743
744 while !target_block_reached {
745 let request = proto::rpc::SyncAccountStorageMapsRequest {
746 block_range: Some(BlockRange {
747 block_from: current_block_from,
748 block_to: block_to.map(|b| b.as_u32()),
749 }),
750 account_id: Some(account_id.into()),
751 };
752
753 let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
754 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncStorageMaps, status)
755 })?;
756 let response = response.into_inner();
757
758 let batch_updates = response
759 .updates
760 .into_iter()
761 .map(TryInto::try_into)
762 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
763 all_updates.extend(batch_updates);
764
765 let page = response
766 .pagination_info
767 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
768
769 if page.block_num < current_block_from {
770 return Err(RpcError::InvalidResponse(
771 "invalid pagination: block_num went backwards".to_owned(),
772 ));
773 }
774
775 final_chain_tip = page.chain_tip;
776 final_block_num = page.block_num;
777
778 let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
779
780 target_block_reached = page.block_num >= target_block;
781 current_block_from = page.block_num + 1;
782 }
783
784 Ok(StorageMapInfo {
785 chain_tip: final_chain_tip.into(),
786 block_number: final_block_num.into(),
787 updates: all_updates,
788 })
789 }
790
791 async fn sync_account_vault(
792 &self,
793 block_from: BlockNumber,
794 block_to: Option<BlockNumber>,
795 account_id: AccountId,
796 ) -> Result<AccountVaultInfo, RpcError> {
797 let mut all_updates = Vec::new();
798 let mut current_block_from = block_from.as_u32();
799 let mut target_block_reached = false;
800 let mut final_chain_tip = 0;
801 let mut final_block_num = 0;
802
803 let mut rpc_api = self.ensure_connected().await?;
804
805 while !target_block_reached {
806 let request = proto::rpc::SyncAccountVaultRequest {
807 block_range: Some(BlockRange {
808 block_from: current_block_from,
809 block_to: block_to.map(|b| b.as_u32()),
810 }),
811 account_id: Some(account_id.into()),
812 };
813
814 let response = rpc_api
815 .sync_account_vault(request)
816 .await
817 .map_err(|status| {
818 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncAccountVault, status)
819 })?
820 .into_inner();
821
822 let batch_updates = response
823 .updates
824 .iter()
825 .map(|u| (*u).try_into())
826 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
827 all_updates.extend(batch_updates);
828
829 let page = response
830 .pagination_info
831 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
832
833 if page.block_num < current_block_from {
834 return Err(RpcError::InvalidResponse(
835 "invalid pagination: block_num went backwards".to_owned(),
836 ));
837 }
838
839 final_chain_tip = page.chain_tip;
840 final_block_num = page.block_num;
841
842 let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
843
844 target_block_reached = page.block_num >= target_block;
845 current_block_from = page.block_num + 1;
846 }
847
848 Ok(AccountVaultInfo {
849 chain_tip: final_chain_tip.into(),
850 block_number: final_block_num.into(),
851 updates: all_updates,
852 })
853 }
854
855 async fn sync_transactions(
856 &self,
857 block_from: BlockNumber,
858 block_to: Option<BlockNumber>,
859 account_ids: Vec<AccountId>,
860 ) -> Result<TransactionsInfo, RpcError> {
861 let block_range = Some(BlockRange {
862 block_from: block_from.as_u32(),
863 block_to: block_to.map(|b| b.as_u32()),
864 });
865
866 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
867
868 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
869
870 let mut rpc_api = self.ensure_connected().await?;
871
872 let response = rpc_api.sync_transactions(request).await.map_err(|status| {
873 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncTransactions, status)
874 })?;
875
876 response.into_inner().try_into()
877 }
878
879 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
880 let endpoint_str: &str = &self.endpoint.clone();
881 let endpoint: Endpoint =
882 Endpoint::try_from(endpoint_str).map_err(RpcError::InvalidNodeEndpoint)?;
883 Ok(endpoint.to_network_id())
884 }
885}
886
887impl RpcError {
891 pub fn from_grpc_error(endpoint: NodeRpcClientEndpoint, status: Status) -> Self {
892 if let Some(accept_error) = AcceptHeaderError::try_from_message(status.message()) {
893 return Self::AcceptHeaderError(accept_error);
894 }
895
896 let error_kind = GrpcError::from(&status);
897 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
898
899 Self::GrpcError {
900 endpoint,
901 error_kind,
902 source: Some(source),
903 }
904 }
905}
906
907impl From<&Status> for GrpcError {
908 fn from(status: &Status) -> Self {
909 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
910 }
911}
912
913#[cfg(test)]
914mod tests {
915 use std::boxed::Box;
916
917 use miden_protocol::Word;
918
919 use super::GrpcClient;
920 use crate::rpc::{Endpoint, NodeRpcClient};
921
922 fn assert_send_sync<T: Send + Sync>() {}
923
924 #[test]
925 fn is_send_sync() {
926 assert_send_sync::<GrpcClient>();
927 assert_send_sync::<Box<dyn NodeRpcClient>>();
928 }
929
930 async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
932 let res = client.get_block_header_by_number(None, false).await;
934 assert!(res.is_ok());
935 }
936
937 #[tokio::test]
938 async fn future_is_send() {
939 let endpoint = &Endpoint::devnet();
940 let client = GrpcClient::new(endpoint, 10000);
941 let client: Box<GrpcClient> = client.into();
942 tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
943 }
944
945 #[tokio::test]
946 async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
947 let endpoint = &Endpoint::devnet();
948 let client = GrpcClient::new(endpoint, 10000);
949
950 assert!(client.genesis_commitment.read().is_none());
951
952 let commitment = Word::default();
953 client.set_genesis_commitment(commitment).await.unwrap();
954
955 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
956 }
957
958 #[tokio::test]
959 async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
960 use miden_protocol::Felt;
961
962 let endpoint = &Endpoint::devnet();
963 let client = GrpcClient::new(endpoint, 10000);
964
965 let initial_commitment = Word::default();
966 client.set_genesis_commitment(initial_commitment).await.unwrap();
967
968 let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
969 client.set_genesis_commitment(new_commitment).await.unwrap();
970
971 assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
972 }
973
974 #[tokio::test]
975 async fn set_genesis_commitment_updates_the_client_if_already_connected() {
976 let endpoint = &Endpoint::devnet();
977 let client = GrpcClient::new(endpoint, 10000);
978
979 client.connect().await.unwrap();
981
982 let commitment = Word::default();
983 client.set_genesis_commitment(commitment).await.unwrap();
984
985 assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
986 assert!(client.client.read().as_ref().is_some());
987 }
988}