1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10 Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{
28 AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
29};
30use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
31use super::generated::rpc::account_request::AccountDetailRequest;
32use super::generated::rpc::AccountRequest;
33use super::{
34 Endpoint, FetchedAccount, NodeRpcClient, NodeRpcClientEndpoint, NoteSyncInfo, RpcError,
35 StateSyncInfo,
36};
37use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
38use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
39use crate::rpc::domain::transaction::TransactionsInfo;
40use crate::rpc::errors::{AcceptHeaderError, GrpcError, RpcConversionError};
41use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
42use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
43use crate::rpc::generated::rpc::BlockRange;
44use crate::rpc::{AccountStateAt, NOTE_IDS_LIMIT, NULLIFIER_PREFIXES_LIMIT, generated as proto};
45use crate::transaction::ForeignAccount;
46
47mod api_client;
48use api_client::api_client_wrapper::ApiClient;
49
50pub struct GrpcClient {
64 client: RwLock<Option<ApiClient>>,
65 endpoint: String,
66 timeout_ms: u64,
67 genesis_commitment: RwLock<Option<Word>>,
68}
69
70impl GrpcClient {
71 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
74 GrpcClient {
75 client: RwLock::new(None),
76 endpoint: endpoint.to_string(),
77 timeout_ms,
78 genesis_commitment: RwLock::new(None),
79 }
80 }
81
82 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
85 if self.client.read().is_none() {
86 self.connect().await?;
87 }
88
89 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
90 }
91
92 async fn connect(&self) -> Result<(), RpcError> {
95 let genesis_commitment = *self.genesis_commitment.read();
96 let new_client =
97 ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
98 .await?;
99 let mut client = self.client.write();
100 client.replace(new_client);
101
102 Ok(())
103 }
104
105 pub async fn fetch_full_account_proof(
112 &self,
113 account_id: AccountId,
114 ) -> Result<(BlockNumber, AccountProof), RpcError> {
115 let mut rpc_api = self.ensure_connected().await?;
116 let has_public_state = account_id.has_public_state();
117 let account_request = {
118 AccountRequest {
119 account_id: Some(account_id.into()),
120 block_num: None,
121 details: {
122 if has_public_state {
123 Some(AccountDetailRequest {
128 code_commitment: Some(EMPTY_WORD.into()),
129 asset_vault_commitment: Some(EMPTY_WORD.into()),
130 storage_maps: vec![],
131 })
132 } else {
133 None
134 }
135 },
136 }
137 };
138 let account_response = rpc_api
139 .get_account(account_request)
140 .await
141 .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
142 .into_inner();
143 let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
144 "GetAccountDetails returned an account without a matching block number for the witness"
145 .to_owned(),
146 ))?;
147 let account_proof = {
148 if has_public_state {
149 let account_details = account_response
150 .details
151 .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
152 .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
153 let storage_header = account_details.storage_details.header;
154 let maps_to_request = storage_header
158 .slots()
159 .filter(|header| header.slot_type().is_map())
160 .map(|map| map.name().to_string());
161 let account_request = AccountRequest {
162 account_id: Some(account_id.into()),
163 block_num: None,
164 details: Some(AccountDetailRequest {
165 code_commitment: Some(EMPTY_WORD.into()),
166 asset_vault_commitment: Some(EMPTY_WORD.into()),
167 storage_maps: maps_to_request
168 .map(|slot_name| StorageMapDetailRequest {
169 slot_name,
170 slot_data: Some(SlotData::AllEntries(true)),
171 })
172 .collect(),
173 }),
174 };
175 match rpc_api.get_account(account_request).await {
176 Ok(account_proof) => account_proof.into_inner().try_into(),
177 Err(err) => Err(RpcError::ConnectionError(
178 format!(
179 "failed to fetch account proof for account: {account_id}, got: {err}"
180 )
181 .into(),
182 )),
183 }
184 } else {
185 account_response.try_into()
186 }
187 };
188 Ok((block_number.block_num.into(), account_proof?))
189 }
190
191 async fn build_storage_slots(
195 &self,
196 account_id: AccountId,
197 storage_details: &AccountStorageDetails,
198 ) -> Result<Vec<StorageSlot>, RpcError> {
199 let mut slots = vec![];
200 let mut map_cache: Option<StorageMapInfo> = None;
203 for slot_header in storage_details.header.slots() {
204 match slot_header.slot_type() {
211 StorageSlotType::Value => {
212 slots.push(miden_protocol::account::StorageSlot::with_value(
213 slot_header.name().clone(),
214 slot_header.value(),
215 ));
216 },
217 StorageSlotType::Map => {
218 let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
219 RpcError::ExpectedDataMissing(format!(
220 "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
221 slot_header.name(),
222 )),
223 )?;
224
225 let storage_map = if map_details.too_many_entries {
226 let map_info = if let Some(ref info) = map_cache {
227 info
228 } else {
229 let fetched_data =
230 self.sync_storage_maps(0_u32.into(), None, account_id).await?;
231 map_cache.insert(fetched_data)
232 };
233 let mut sorted_updates: Vec<_> = map_info
237 .updates
238 .iter()
239 .filter(|slot_info| slot_info.slot_name == *slot_header.name())
240 .collect();
241 sorted_updates.sort_by_key(|u| u.block_num);
242 let map_entries: Vec<_> = sorted_updates
243 .into_iter()
244 .map(|u| (u.key, u.value))
245 .collect::<BTreeMap<_, _>>()
246 .into_iter()
247 .collect();
248 StorageMap::with_entries(map_entries)
249 } else {
250 map_details.entries.clone().into_storage_map()
251 }
252 .map_err(|err| {
253 RpcError::InvalidResponse(format!(
254 "the rpc api returned a non-valid map entry: {err}"
255 ))
256 })?;
257
258 slots.push(miden_protocol::account::StorageSlot::with_map(
259 slot_header.name().clone(),
260 storage_map,
261 ));
262 },
263 }
264 }
265 Ok(slots)
266 }
267}
268
269#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
270#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
271impl NodeRpcClient for GrpcClient {
272 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
277 if self.genesis_commitment.read().is_some() {
279 return Ok(());
281 }
282
283 self.genesis_commitment.write().replace(commitment);
285
286 let mut client_guard = self.client.write();
289 if let Some(client) = client_guard.as_mut() {
290 client.set_genesis_commitment(commitment);
291 }
292
293 Ok(())
294 }
295
296 async fn submit_proven_transaction(
297 &self,
298 proven_transaction: ProvenTransaction,
299 transaction_inputs: TransactionInputs,
300 ) -> Result<BlockNumber, RpcError> {
301 let request = proto::transaction::ProvenTransaction {
302 transaction: proven_transaction.to_bytes(),
303 transaction_inputs: Some(transaction_inputs.to_bytes()),
304 };
305
306 let mut rpc_api = self.ensure_connected().await?;
307
308 let api_response = rpc_api.submit_proven_transaction(request).await.map_err(|status| {
309 RpcError::from_grpc_error(NodeRpcClientEndpoint::SubmitProvenTx, status)
310 })?;
311
312 Ok(BlockNumber::from(api_response.into_inner().block_num))
313 }
314
315 async fn get_block_header_by_number(
316 &self,
317 block_num: Option<BlockNumber>,
318 include_mmr_proof: bool,
319 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
320 let request = proto::rpc::BlockHeaderByNumberRequest {
321 block_num: block_num.as_ref().map(BlockNumber::as_u32),
322 include_mmr_proof: Some(include_mmr_proof),
323 };
324
325 info!("Calling GetBlockHeaderByNumber: {:?}", request);
326
327 let mut rpc_api = self.ensure_connected().await?;
328
329 let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
330 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockHeaderByNumber, status)
331 })?;
332
333 let response = api_response.into_inner();
334
335 let block_header: BlockHeader = response
336 .block_header
337 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
338 .try_into()?;
339
340 let mmr_proof = if include_mmr_proof {
341 let forest = response
342 .chain_length
343 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
344 let merkle_path: MerklePath = response
345 .mmr_path
346 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
347 .try_into()?;
348
349 Some(MmrProof {
350 forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
351 position: block_header.block_num().as_usize(),
352 merkle_path,
353 })
354 } else {
355 None
356 };
357
358 Ok((block_header, mmr_proof))
359 }
360
361 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
362 let mut notes = Vec::with_capacity(note_ids.len());
363 for chunk in note_ids.chunks(NOTE_IDS_LIMIT) {
364 let request = proto::note::NoteIdList {
365 ids: chunk.iter().map(|id| (*id).into()).collect(),
366 };
367
368 let mut rpc_api = self.ensure_connected().await?;
369
370 let api_response = rpc_api.get_notes_by_id(request).await.map_err(|status| {
371 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNotesById, status)
372 })?;
373
374 let response_notes = api_response
375 .into_inner()
376 .notes
377 .into_iter()
378 .map(FetchedNote::try_from)
379 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
380
381 notes.extend(response_notes);
382 }
383 Ok(notes)
384 }
385
386 async fn sync_state(
389 &self,
390 block_num: BlockNumber,
391 account_ids: &[AccountId],
392 note_tags: &BTreeSet<NoteTag>,
393 ) -> Result<StateSyncInfo, RpcError> {
394 let account_ids = account_ids.iter().map(|acc| (*acc).into()).collect();
395
396 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
397
398 let request = proto::rpc::SyncStateRequest {
399 block_num: block_num.as_u32(),
400 account_ids,
401 note_tags,
402 };
403
404 let mut rpc_api = self.ensure_connected().await?;
405
406 let response = rpc_api.sync_state(request).await.map_err(|status| {
407 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncState, status)
408 })?;
409 response.into_inner().try_into()
410 }
411
412 async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
424 let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
425 let update_summary =
426 AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
427
428 if account_id.is_private() {
431 Ok(FetchedAccount::new_private(account_id, update_summary))
432 } else {
433 let details =
437 full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
438 "GetAccountDetails returned a public account without details".to_owned(),
439 ))?;
440 let account_id = details.header.id();
441 let nonce = details.header.nonce();
442 let assets: Vec<Asset> = {
443 if details.vault_details.too_many_assets {
444 let vault_info =
445 self.sync_account_vault(BlockNumber::from(0), None, account_id).await?;
446 let mut updates = vault_info.updates;
450 updates.sort_by_key(|u| u.block_num);
451 updates
452 .into_iter()
453 .map(|u| (Word::from(u.vault_key), u.asset))
454 .collect::<BTreeMap<_, _>>()
455 .into_values()
456 .flatten()
457 .collect()
458 } else {
459 details.vault_details.assets
460 }
461 };
462
463 let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
464 let seed = None;
465 let asset_vault = AssetVault::new(&assets).map_err(|err| {
466 RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
467 })?;
468 let account_storage = AccountStorage::new(slots).map_err(|err| {
469 RpcError::InvalidResponse(format!(
470 "api rpc returned non-valid storage slots: {err}"
471 ))
472 })?;
473 let account =
474 Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
475 .map_err(|err| {
476 RpcError::InvalidResponse(format!(
477 "failed to instance an account from the rpc api response: {err}"
478 ))
479 })?;
480 Ok(FetchedAccount::new_public(account, update_summary))
481 }
482 }
483
484 async fn get_account(
496 &self,
497 foreign_account: ForeignAccount,
498 account_state: AccountStateAt,
499 known_account_code: Option<AccountCode>,
500 ) -> Result<(BlockNumber, AccountProof), RpcError> {
501 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
502 if let Some(account_code) = known_account_code {
503 known_codes_by_commitment.insert(account_code.commitment(), account_code);
504 }
505
506 let mut rpc_api = self.ensure_connected().await?;
507
508 let account_id = foreign_account.account_id();
510 let storage_requirements = foreign_account.storage_slot_requirements();
511
512 let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
513
514 let account_details = if account_id.is_public() {
517 Some(AccountDetailRequest {
518 code_commitment: Some(EMPTY_WORD.into()),
519 asset_vault_commitment: None,
522 storage_maps,
523 })
524 } else {
525 None
526 };
527
528 let block_num = match account_state {
529 AccountStateAt::Block(number) => Some(number.into()),
530 AccountStateAt::ChainTip => None,
531 };
532
533 let request = AccountRequest {
534 account_id: Some(account_id.into()),
535 block_num,
536 details: account_details,
537 };
538
539 let response = rpc_api
540 .get_account(request)
541 .await
542 .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
543 .into_inner();
544
545 let account_witness: AccountWitness = response
546 .witness
547 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
548 .try_into()?;
549
550 let headers = if account_witness.id().is_public() {
552 Some(
553 response
554 .details
555 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
556 .into_domain(&known_codes_by_commitment, &storage_requirements)?,
557 )
558 } else {
559 None
560 };
561
562 let proof = AccountProof::new(account_witness, headers)
563 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
564
565 let block_num = response
566 .block_num
567 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
568 .block_num
569 .into();
570
571 Ok((block_num, proof))
572 }
573
574 async fn sync_notes(
577 &self,
578 block_num: BlockNumber,
579 block_to: Option<BlockNumber>,
580 note_tags: &BTreeSet<NoteTag>,
581 ) -> Result<NoteSyncInfo, RpcError> {
582 let note_tags = note_tags.iter().map(|¬e_tag| note_tag.into()).collect();
583
584 let block_range = Some(BlockRange {
585 block_from: block_num.as_u32(),
586 block_to: block_to.map(|b| b.as_u32()),
587 });
588
589 let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
590
591 let mut rpc_api = self.ensure_connected().await?;
592
593 let response = rpc_api.sync_notes(request).await.map_err(|status| {
594 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNotes, status)
595 })?;
596
597 response.into_inner().try_into()
598 }
599
600 async fn sync_nullifiers(
601 &self,
602 prefixes: &[u16],
603 block_num: BlockNumber,
604 block_to: Option<BlockNumber>,
605 ) -> Result<Vec<NullifierUpdate>, RpcError> {
606 const MAX_ITERATIONS: u32 = 1000; let mut all_nullifiers = BTreeSet::new();
609
610 let mut rpc_api = self.ensure_connected().await?;
612
613 'chunk_nullifiers: for chunk in prefixes.chunks(NULLIFIER_PREFIXES_LIMIT) {
616 let mut current_block_from = block_num.as_u32();
617
618 for _ in 0..MAX_ITERATIONS {
619 let request = proto::rpc::SyncNullifiersRequest {
620 nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
621 prefix_len: 16,
622 block_range: Some(BlockRange {
623 block_from: current_block_from,
624 block_to: block_to.map(|b| b.as_u32()),
625 }),
626 };
627
628 let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
629 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNullifiers, status)
630 })?;
631 let response = response.into_inner();
632
633 let batch_nullifiers = response
635 .nullifiers
636 .iter()
637 .map(TryFrom::try_from)
638 .collect::<Result<Vec<NullifierUpdate>, _>>()
639 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
640
641 all_nullifiers.extend(batch_nullifiers);
642
643 if let Some(page) = response.pagination_info {
645 if page.block_num < current_block_from {
647 return Err(RpcError::InvalidResponse(
648 "invalid pagination: block_num went backwards".to_string(),
649 ));
650 }
651
652 let target_block =
654 block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
655
656 if page.block_num >= target_block {
657 continue 'chunk_nullifiers;
659 }
660 current_block_from = page.block_num + 1;
661 }
662 }
663 return Err(RpcError::InvalidResponse(
665 "too many pagination iterations, possible infinite loop".to_string(),
666 ));
667 }
668 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
669 }
670
671 async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
672 let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
673 for chunk in nullifiers.chunks(NULLIFIER_PREFIXES_LIMIT) {
674 let request = proto::rpc::NullifierList {
675 nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
676 };
677
678 let mut rpc_api = self.ensure_connected().await?;
679
680 let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
681 RpcError::from_grpc_error(NodeRpcClientEndpoint::CheckNullifiers, status)
682 })?;
683
684 let mut response = response.into_inner();
685 let chunk_proofs = response
686 .proofs
687 .iter_mut()
688 .map(|r| r.to_owned().try_into())
689 .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
690 proofs.extend(chunk_proofs);
691 }
692 Ok(proofs)
693 }
694
695 async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
696 let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
697
698 let mut rpc_api = self.ensure_connected().await?;
699
700 let response = rpc_api.get_block_by_number(request).await.map_err(|status| {
701 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockByNumber, status)
702 })?;
703
704 let response = response.into_inner();
705 let block =
706 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
707 "GetBlockByNumberResponse.block".to_string(),
708 ))?)?;
709
710 Ok(block)
711 }
712
713 async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
714 let request = proto::note::NoteRoot { root: Some(root.into()) };
715
716 let mut rpc_api = self.ensure_connected().await?;
717
718 let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
719 RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNoteScriptByRoot, status)
720 })?;
721
722 let response = response.into_inner();
723 let note_script = NoteScript::try_from(
724 response
725 .script
726 .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
727 )?;
728
729 Ok(note_script)
730 }
731
732 async fn sync_storage_maps(
733 &self,
734 block_from: BlockNumber,
735 block_to: Option<BlockNumber>,
736 account_id: AccountId,
737 ) -> Result<StorageMapInfo, RpcError> {
738 let mut all_updates = Vec::new();
739 let mut current_block_from = block_from.as_u32();
740 let mut target_block_reached = false;
741 let mut final_chain_tip = 0;
742 let mut final_block_num = 0;
743
744 let mut rpc_api = self.ensure_connected().await?;
745
746 while !target_block_reached {
747 let request = proto::rpc::SyncAccountStorageMapsRequest {
748 block_range: Some(BlockRange {
749 block_from: current_block_from,
750 block_to: block_to.map(|b| b.as_u32()),
751 }),
752 account_id: Some(account_id.into()),
753 };
754
755 let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
756 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncStorageMaps, status)
757 })?;
758 let response = response.into_inner();
759
760 let batch_updates = response
761 .updates
762 .into_iter()
763 .map(TryInto::try_into)
764 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
765 all_updates.extend(batch_updates);
766
767 let page = response
768 .pagination_info
769 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
770
771 if page.block_num < current_block_from {
772 return Err(RpcError::InvalidResponse(
773 "invalid pagination: block_num went backwards".to_owned(),
774 ));
775 }
776
777 final_chain_tip = page.chain_tip;
778 final_block_num = page.block_num;
779
780 let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
781
782 target_block_reached = page.block_num >= target_block;
783 current_block_from = page.block_num + 1;
784 }
785
786 Ok(StorageMapInfo {
787 chain_tip: final_chain_tip.into(),
788 block_number: final_block_num.into(),
789 updates: all_updates,
790 })
791 }
792
793 async fn sync_account_vault(
794 &self,
795 block_from: BlockNumber,
796 block_to: Option<BlockNumber>,
797 account_id: AccountId,
798 ) -> Result<AccountVaultInfo, RpcError> {
799 let mut all_updates = Vec::new();
800 let mut current_block_from = block_from.as_u32();
801 let mut target_block_reached = false;
802 let mut final_chain_tip = 0;
803 let mut final_block_num = 0;
804
805 let mut rpc_api = self.ensure_connected().await?;
806
807 while !target_block_reached {
808 let request = proto::rpc::SyncAccountVaultRequest {
809 block_range: Some(BlockRange {
810 block_from: current_block_from,
811 block_to: block_to.map(|b| b.as_u32()),
812 }),
813 account_id: Some(account_id.into()),
814 };
815
816 let response = rpc_api
817 .sync_account_vault(request)
818 .await
819 .map_err(|status| {
820 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncAccountVault, status)
821 })?
822 .into_inner();
823
824 let batch_updates = response
825 .updates
826 .iter()
827 .map(|u| (*u).try_into())
828 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
829 all_updates.extend(batch_updates);
830
831 let page = response
832 .pagination_info
833 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
834
835 if page.block_num < current_block_from {
836 return Err(RpcError::InvalidResponse(
837 "invalid pagination: block_num went backwards".to_owned(),
838 ));
839 }
840
841 final_chain_tip = page.chain_tip;
842 final_block_num = page.block_num;
843
844 let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
845
846 target_block_reached = page.block_num >= target_block;
847 current_block_from = page.block_num + 1;
848 }
849
850 Ok(AccountVaultInfo {
851 chain_tip: final_chain_tip.into(),
852 block_number: final_block_num.into(),
853 updates: all_updates,
854 })
855 }
856
857 async fn sync_transactions(
858 &self,
859 block_from: BlockNumber,
860 block_to: Option<BlockNumber>,
861 account_ids: Vec<AccountId>,
862 ) -> Result<TransactionsInfo, RpcError> {
863 let block_range = Some(BlockRange {
864 block_from: block_from.as_u32(),
865 block_to: block_to.map(|b| b.as_u32()),
866 });
867
868 let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
869
870 let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
871
872 let mut rpc_api = self.ensure_connected().await?;
873
874 let response = rpc_api.sync_transactions(request).await.map_err(|status| {
875 RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncTransactions, status)
876 })?;
877
878 response.into_inner().try_into()
879 }
880
881 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
882 let endpoint_str: &str = &self.endpoint.clone();
883 let endpoint: Endpoint =
884 Endpoint::try_from(endpoint_str).map_err(RpcError::InvalidNodeEndpoint)?;
885 Ok(endpoint.to_network_id())
886 }
887}
888
889impl RpcError {
893 pub fn from_grpc_error(endpoint: NodeRpcClientEndpoint, status: Status) -> Self {
894 if let Some(accept_error) = AcceptHeaderError::try_from_message(status.message()) {
895 return Self::AcceptHeaderError(accept_error);
896 }
897
898 let error_kind = GrpcError::from(&status);
899 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
900
901 Self::GrpcError {
902 endpoint,
903 error_kind,
904 source: Some(source),
905 }
906 }
907}
908
909impl From<&Status> for GrpcError {
910 fn from(status: &Status) -> Self {
911 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
912 }
913}
914
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;

    use super::GrpcClient;
    use crate::rpc::{Endpoint, NodeRpcClient};

    /// Timeout, in milliseconds, used by every client created in these tests.
    const TIMEOUT_MS: u64 = 10000;

    /// Creates a not-yet-connected client pointing at the devnet endpoint.
    fn devnet_client() -> GrpcClient {
        GrpcClient::new(&Endpoint::devnet(), TIMEOUT_MS)
    }

    /// Compiles only if `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    /// Awaits a trait method through a trait object, so that the returned future has to be
    /// `Send` for the `spawn` in `future_is_send` to compile.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        let res = client.get_block_header_by_number(None, false).await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn future_is_send() {
        let client: Box<GrpcClient> = Box::new(devnet_client());
        // The spawned task is not awaited here.
        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let client = devnet_client();
        assert!(client.genesis_commitment.read().is_none());

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let client = devnet_client();

        let initial_commitment = Word::default();
        client.set_genesis_commitment(initial_commitment).await.unwrap();

        // A second, different commitment must not replace the first one.
        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        client.set_genesis_commitment(new_commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let client = devnet_client();

        // Connect first so that an API client exists when the commitment is set.
        client.connect().await.unwrap();

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
        assert!(client.client.read().is_some());
    }
}