1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::vm::FutureMaybeSend;
10
11type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
12
13use miden_protocol::account::{AccountCode, AccountId};
14use miden_protocol::address::NetworkId;
15use miden_protocol::batch::{ProposedBatch, ProvenBatch};
16use miden_protocol::block::account_tree::AccountWitness;
17use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
18use miden_protocol::crypto::merkle::MerklePath;
19use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
20use miden_protocol::note::{NoteId, NoteScript, NoteTag};
21use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
22use miden_protocol::utils::serde::Deserializable;
23use miden_protocol::{EMPTY_WORD, Word};
24use miden_tx::utils::serde::Serializable;
25use miden_tx::utils::sync::RwLock;
26use tonic::Status;
27use tracing::info;
28
29use super::domain::account::{
30 AccountProof,
31 AccountStorageRequirements,
32 GetAccountRequest,
33 StorageMapFetch,
34};
35use super::domain::note::{FetchedNote, NoteSyncBlock};
36use super::domain::nullifier::NullifierUpdate;
37use super::generated::rpc::AccountRequest;
38use super::generated::rpc::account_request::AccountDetailRequest;
39use super::{Endpoint, NodeRpcClient, RpcEndpoint, RpcError, RpcStatusInfo};
40use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
41use crate::rpc::domain::limits::RpcLimits;
42use crate::rpc::domain::status::NetworkNoteStatusInfo;
43use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
44use crate::rpc::domain::sync::{ChainMmrInfo, SyncTarget};
45use crate::rpc::domain::transaction::TransactionRecord;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::BlockRange;
49use crate::rpc::{AccountStateAt, generated as proto};
50
51mod api_client;
52mod retry;
53
54use api_client::api_client_wrapper::ApiClient;
55
/// Cursor state used while walking a block range over several paginated RPC responses.
struct BlockPagination {
    /// First block to request on the next page; moved forward by `advance`.
    current_block_from: BlockNumber,
    /// Upper end of the range that was requested when the cursor was created.
    block_to: BlockNumber,
    /// Number of `advance` calls that have passed the iteration-limit check so far.
    iterations: u32,
}
62
/// Outcome of feeding one page's pagination info into `BlockPagination::advance`.
enum PaginationResult {
    /// The target block has not been reached yet; another page has to be requested.
    Continue,
    /// The reported block reached the target; carries the values reported by the last page.
    Done {
        /// Chain tip passed to the final `advance` call.
        chain_tip: BlockNumber,
        /// Block number passed to the final `advance` call.
        block_num: BlockNumber,
    },
}
70
71impl BlockPagination {
72 const MAX_ITERATIONS: u32 = 1000;
77
78 fn new(block_from: BlockNumber, block_to: BlockNumber) -> Self {
79 Self {
80 current_block_from: block_from,
81 block_to,
82 iterations: 0,
83 }
84 }
85
86 fn current_block_from(&self) -> BlockNumber {
87 self.current_block_from
88 }
89
90 fn block_to(&self) -> BlockNumber {
91 self.block_to
92 }
93
94 fn advance(
95 &mut self,
96 block_num: BlockNumber,
97 chain_tip: BlockNumber,
98 ) -> Result<PaginationResult, RpcError> {
99 if self.iterations >= Self::MAX_ITERATIONS {
100 return Err(RpcError::PaginationError(
101 "too many pagination iterations, possible infinite loop".to_owned(),
102 ));
103 }
104 self.iterations += 1;
105
106 if block_num < self.current_block_from {
107 return Err(RpcError::PaginationError(
108 "invalid pagination: block_num went backwards".to_owned(),
109 ));
110 }
111
112 let target_block = self.block_to.min(chain_tip);
113
114 if block_num >= target_block {
115 return Ok(PaginationResult::Done { chain_tip, block_num });
116 }
117
118 self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
119
120 Ok(PaginationResult::Continue)
121 }
122}
123
/// gRPC-backed RPC client that connects on first use and caches per-node state.
pub struct GrpcClient {
    /// Underlying API client; `None` until `connect` has succeeded.
    client: RwLock<Option<ApiClient>>,
    /// String form of the endpoint this client was created for.
    endpoint: String,
    /// Timeout value handed to the API client constructors.
    timeout_ms: u64,
    /// Genesis commitment, if one has been set; passed to newly created API clients and
    /// included in the context of accept-header errors.
    genesis_commitment: RwLock<Option<Word>>,
    /// RPC limits cached by `get_rpc_limits` / `set_rpc_limits`.
    limits: RwLock<Option<RpcLimits>>,
    /// Retry budget handed to the retry state of every call.
    max_retries: u32,
    /// Retry interval handed to the retry state of every call.
    retry_interval_ms: u64,
    /// Optional bearer token handed to the API client constructors.
    bearer_token: Option<String>,
}
157
158impl GrpcClient {
159 pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
162 GrpcClient {
163 client: RwLock::new(None),
164 endpoint: endpoint.to_string(),
165 timeout_ms,
166 genesis_commitment: RwLock::new(None),
167 limits: RwLock::new(None),
168 max_retries: retry::DEFAULT_MAX_RETRIES,
169 retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
170 bearer_token: None,
171 }
172 }
173
174 #[must_use]
177 pub fn with_max_retries(mut self, max_retries: u32) -> Self {
178 self.max_retries = max_retries;
179 self
180 }
181
182 #[must_use]
185 pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
186 self.retry_interval_ms = retry_interval_ms;
187 self
188 }
189
190 #[must_use]
215 pub fn with_bearer_auth(mut self, token: String) -> Self {
216 self.bearer_token = Some(token);
217 self
218 }
219
220 async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
223 if self.client.read().is_none() {
224 self.connect().await?;
225 }
226
227 Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
228 }
229
230 async fn connect(&self) -> Result<(), RpcError> {
233 let genesis_commitment = *self.genesis_commitment.read();
234 let new_client = ApiClient::new_client(
235 self.endpoint.clone(),
236 self.timeout_ms,
237 genesis_commitment,
238 self.bearer_token.clone(),
239 )
240 .await?;
241 let mut client = self.client.write();
242 client.replace(new_client);
243
244 Ok(())
245 }
246
247 fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
248 let genesis_commitment = self
249 .genesis_commitment
250 .read()
251 .as_ref()
252 .map_or_else(|| "none".to_string(), Word::to_hex);
253 let context = AcceptHeaderContext {
254 client_version: env!("CARGO_PKG_VERSION").to_string(),
255 genesis_commitment,
256 };
257 RpcError::from_grpc_error_with_context(endpoint, status, context)
258 }
259
260 async fn call_with_retry<T: Send + 'static>(
271 &self,
272 endpoint: RpcEndpoint,
273 mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
274 ) -> Result<tonic::Response<T>, RpcError> {
275 let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
276
277 loop {
278 let rpc_api = self.ensure_connected().await?;
279
280 match call(rpc_api).await {
281 Ok(response) => return Ok(response),
282 Err(status) if retry_state.should_retry(&status).await => {},
283 Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
284 }
285 }
286 }
287
288 pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
295 let mut rpc_api = ApiClient::new_client_without_accept_header(
296 self.endpoint.clone(),
297 self.timeout_ms,
298 self.bearer_token.clone(),
299 )
300 .await?;
301 rpc_api
302 .status(())
303 .await
304 .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
305 .map(tonic::Response::into_inner)
306 .and_then(RpcStatusInfo::try_from)
307 }
308}
309
310#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
311#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
312impl NodeRpcClient for GrpcClient {
313 fn has_genesis_commitment(&self) -> Option<Word> {
318 *self.genesis_commitment.read()
319 }
320
321 async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
322 if self.genesis_commitment.read().is_some() {
324 return Ok(());
326 }
327
328 self.genesis_commitment.write().replace(commitment);
330
331 let mut client_guard = self.client.write();
334 if let Some(client) = client_guard.as_mut() {
335 client.set_genesis_commitment(commitment);
336 }
337
338 Ok(())
339 }
340
341 async fn submit_proven_transaction(
342 &self,
343 proven_transaction: ProvenTransaction,
344 transaction_inputs: TransactionInputs,
345 ) -> Result<BlockNumber, RpcError> {
346 let request = proto::transaction::ProvenTransaction {
347 transaction: proven_transaction.to_bytes(),
348 transaction_inputs: Some(transaction_inputs.to_bytes()),
349 };
350
351 let api_response = self
352 .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
353 let request = request.clone();
354 Box::pin(async move { rpc_api.submit_proven_tx(request).await })
355 })
356 .await?;
357
358 Ok(BlockNumber::from(api_response.into_inner().block_num))
359 }
360
361 async fn submit_proven_batch(
362 &self,
363 proven_batch: ProvenBatch,
364 proposed_batch: ProposedBatch,
365 transaction_inputs: Vec<TransactionInputs>,
366 ) -> Result<BlockNumber, RpcError> {
367 let request = proto::transaction::TransactionBatch {
368 batch_proof: proven_batch.to_bytes(),
369 proposed_batch: Some(proposed_batch.to_bytes()),
370 transaction_inputs: transaction_inputs.iter().map(Serializable::to_bytes).collect(),
371 };
372
373 let api_response = self
374 .call_with_retry(RpcEndpoint::SubmitProvenBatch, |mut rpc_api| {
375 let request = request.clone();
376 Box::pin(async move { rpc_api.submit_proven_tx_batch(request).await })
377 })
378 .await?;
379
380 Ok(BlockNumber::from(api_response.into_inner().block_num))
381 }
382
383 async fn get_block_header_by_number(
384 &self,
385 block_num: Option<BlockNumber>,
386 include_mmr_proof: bool,
387 ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
388 let request = proto::rpc::BlockHeaderByNumberRequest {
389 block_num: block_num.as_ref().map(BlockNumber::as_u32),
390 include_mmr_proof: Some(include_mmr_proof),
391 };
392
393 info!("Calling GetBlockHeaderByNumber: {:?}", request);
394
395 let api_response = self
396 .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
397 Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
398 })
399 .await?;
400
401 let response = api_response.into_inner();
402
403 let block_header: BlockHeader = response
404 .block_header
405 .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
406 .try_into()?;
407
408 let mmr_proof = if include_mmr_proof {
409 let forest = response
410 .chain_length
411 .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
412 let merkle_path: MerklePath = response
413 .mmr_path
414 .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
415 .try_into()?;
416
417 let forest_size = usize::try_from(forest).expect("u64 should fit in usize");
418 let forest = Forest::new(forest_size).map_err(|_| {
419 RpcError::InvalidResponse(format!("invalid forest size: {forest_size}"))
420 })?;
421 Some(MmrProof::new(
422 MmrPath::new(forest, block_header.block_num().as_usize(), merkle_path),
423 block_header.commitment(),
424 ))
425 } else {
426 None
427 };
428
429 Ok((block_header, mmr_proof))
430 }
431
432 async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
433 let limits = self.get_rpc_limits().await?;
434 let mut notes = Vec::with_capacity(note_ids.len());
435 for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
436 let request = proto::note::NoteIdList {
437 ids: chunk.iter().map(|id| (*id).into()).collect(),
438 };
439
440 let api_response = self
441 .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
442 let request = request.clone();
443 Box::pin(async move { rpc_api.get_notes_by_id(request).await })
444 })
445 .await?;
446
447 let response_notes = api_response
448 .into_inner()
449 .notes
450 .into_iter()
451 .map(FetchedNote::try_from)
452 .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
453
454 notes.extend(response_notes);
455 }
456 Ok(notes)
457 }
458
459 async fn sync_chain_mmr(
460 &self,
461 current_block_height: BlockNumber,
462 upper_bound: SyncTarget,
463 ) -> Result<ChainMmrInfo, RpcError> {
464 let finality_level: proto::rpc::FinalityLevel = upper_bound.into();
465
466 let request = proto::rpc::SyncChainMmrRequest {
467 current_client_block_height: current_block_height.as_u32(),
468 finality_level: finality_level.into(),
469 };
470
471 let response = self
472 .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
473 Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
474 })
475 .await?;
476
477 response.into_inner().try_into()
478 }
479
480 async fn get_account(
492 &self,
493 account_id: AccountId,
494 request: GetAccountRequest,
495 ) -> Result<(BlockNumber, AccountProof), RpcError> {
496 let GetAccountRequest { storage, at, known_code, vault } = request;
497
498 let known_code_commitment = known_code.as_ref().map_or(EMPTY_WORD, AccountCode::commitment);
499 let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
500 if let Some(account_code) = known_code {
501 known_codes_by_commitment.insert(account_code.commitment(), account_code);
502 }
503
504 let requirements = match storage.clone() {
506 StorageMapFetch::Slots(reqs) => reqs,
507 StorageMapFetch::Skip | StorageMapFetch::All => AccountStorageRequirements::default(),
508 };
509
510 let account_details = if account_id.is_public() {
513 Some(AccountDetailRequest {
514 code_commitment: Some(known_code_commitment.into()),
515 asset_vault_commitment: vault.into(),
516 storage_request: storage.into(),
517 })
518 } else {
519 None
520 };
521
522 let block_num = match at {
523 AccountStateAt::Block(number) => Some(number.into()),
524 AccountStateAt::ChainTip => None,
525 };
526
527 let proto_request = AccountRequest {
528 account_id: Some(account_id.into()),
529 block_num,
530 details: account_details,
531 };
532
533 let response = self
534 .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
535 let request = proto_request.clone();
536 Box::pin(async move { rpc_api.get_account(request).await })
537 })
538 .await?
539 .into_inner();
540
541 let account_witness: AccountWitness = response
542 .witness
543 .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
544 .try_into()?;
545
546 let block_num: BlockNumber = response
547 .block_num
548 .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
549 .block_num
550 .into();
551
552 let headers = if account_witness.id().is_public() {
554 let details = response
555 .details
556 .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
557 .into_domain(&known_codes_by_commitment, &requirements)?;
558
559 Some(details)
560 } else {
561 None
562 };
563
564 let proof = AccountProof::new(account_witness, headers)
565 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
566
567 Ok((block_num, proof))
568 }
569
570 async fn sync_notes(
576 &self,
577 block_from: BlockNumber,
578 block_to: BlockNumber,
579 note_tags: &BTreeSet<NoteTag>,
580 ) -> Result<Vec<NoteSyncBlock>, RpcError> {
581 if note_tags.is_empty() {
582 return Ok(Vec::new());
583 }
584
585 let limits = self.get_rpc_limits().await?;
586 let tags: Vec<NoteTag> = note_tags.iter().copied().collect();
587
588 let mut merged_blocks: BTreeMap<BlockNumber, NoteSyncBlock> = BTreeMap::new();
591
592 for chunk in tags.chunks(limits.note_tags_limit as usize) {
593 let proto_tags: Vec<u32> = chunk.iter().map(|&t| t.into()).collect();
594 let mut pagination = BlockPagination::new(block_from, block_to);
595
596 loop {
597 let request = proto::rpc::SyncNotesRequest {
598 block_range: Some(BlockRange {
599 block_from: pagination.current_block_from().as_u32(),
600 block_to: block_to.as_u32(),
601 }),
602 note_tags: proto_tags.clone(),
603 };
604
605 let response = self
606 .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
607 let request = request.clone();
608 Box::pin(async move { rpc_api.sync_notes(request).await })
609 })
610 .await?
611 .into_inner();
612
613 let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
614 "SyncNotesResponse.pagination_info".to_owned(),
615 ))?;
616 let page_chain_tip = BlockNumber::from(page.chain_tip);
617 let page_block_to = BlockNumber::from(page.block_num);
618
619 for proto_block in response.blocks {
620 let block: NoteSyncBlock = proto_block.try_into()?;
621 let bn = block.block_header.block_num();
622 if let Some(existing) = merged_blocks.get_mut(&bn) {
623 for (id, note) in block.notes {
624 existing.notes.entry(id).or_insert(note);
625 }
626 } else {
627 merged_blocks.insert(bn, block);
628 }
629 }
630
631 match pagination.advance(page_block_to, page_chain_tip)? {
632 PaginationResult::Continue => {},
633 PaginationResult::Done { .. } => break,
634 }
635 }
636 }
637
638 Ok(merged_blocks.into_values().collect())
639 }
640
641 async fn sync_nullifiers(
642 &self,
643 prefixes: &[u16],
644 block_from: BlockNumber,
645 block_to: BlockNumber,
646 ) -> Result<Vec<NullifierUpdate>, RpcError> {
647 let limits = self.get_rpc_limits().await?;
648 let mut all_nullifiers = BTreeSet::new();
649
650 for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
653 let proto_prefixes: Vec<u32> = chunk.iter().map(|&x| u32::from(x)).collect();
654 let mut pagination = BlockPagination::new(block_from, block_to);
655
656 loop {
657 let request = proto::rpc::SyncNullifiersRequest {
658 nullifiers: proto_prefixes.clone(),
659 prefix_len: 16,
660 block_range: Some(BlockRange {
661 block_from: pagination.current_block_from().as_u32(),
662 block_to: pagination.block_to().as_u32(),
663 }),
664 };
665
666 let response = self
667 .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
668 let request = request.clone();
669 Box::pin(async move { rpc_api.sync_nullifiers(request).await })
670 })
671 .await?
672 .into_inner();
673
674 let batch_nullifiers = response
675 .nullifiers
676 .iter()
677 .map(TryFrom::try_from)
678 .collect::<Result<Vec<NullifierUpdate>, _>>()
679 .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
680
681 all_nullifiers.extend(batch_nullifiers);
682
683 let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
684 "SyncNullifiersResponse.pagination_info".to_owned(),
685 ))?;
686
687 match pagination.advance(page.block_num.into(), page.chain_tip.into())? {
688 PaginationResult::Continue => {},
689 PaginationResult::Done { .. } => break,
690 }
691 }
692 }
693 Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
694 }
695
696 async fn get_block_by_number(
697 &self,
698 block_num: BlockNumber,
699 include_proof: bool,
700 ) -> Result<ProvenBlock, RpcError> {
701 let request = proto::blockchain::BlockRequest {
702 block_num: block_num.as_u32(),
703 include_proof: Some(include_proof),
704 };
705
706 let response = self
707 .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
708 Box::pin(async move { rpc_api.get_block_by_number(request).await })
709 })
710 .await?;
711
712 let response = response.into_inner();
713 let block =
714 ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
715 "GetBlockByNumberResponse.block".to_string(),
716 ))?)?;
717
718 Ok(block)
719 }
720
721 async fn get_note_script_by_root(&self, root: Word) -> Result<Option<NoteScript>, RpcError> {
722 let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
723
724 let response = self
725 .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
726 Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
727 })
728 .await?;
729
730 let Some(script) = response.into_inner().script else {
732 return Ok(None);
733 };
734 let note_script = NoteScript::try_from(script)?;
735
736 let fetched_root = note_script.root();
737 if Word::from(fetched_root) != root {
738 return Err(RpcError::InvalidResponse(format!(
739 "node returned note script with root {fetched_root} for requested root {root}",
740 )));
741 }
742
743 Ok(Some(note_script))
744 }
745
746 async fn sync_storage_maps(
747 &self,
748 block_from: BlockNumber,
749 block_to: BlockNumber,
750 account_id: AccountId,
751 ) -> Result<StorageMapInfo, RpcError> {
752 let mut pagination = BlockPagination::new(block_from, block_to);
753 let mut updates = Vec::new();
754
755 let (chain_tip, block_number) = loop {
756 let request = proto::rpc::SyncAccountStorageMapsRequest {
757 block_range: Some(BlockRange {
758 block_from: pagination.current_block_from().as_u32(),
759 block_to: block_to.as_u32(),
760 }),
761 account_id: Some(account_id.into()),
762 };
763 let response = self
764 .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
765 let request = request.clone();
766 Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
767 })
768 .await?;
769 let response = response.into_inner();
770 let page = response
771 .pagination_info
772 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
773 let page_block_num = BlockNumber::from(page.block_num);
774 let page_chain_tip = BlockNumber::from(page.chain_tip);
775 let batch = response
776 .updates
777 .into_iter()
778 .map(TryInto::try_into)
779 .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
780 updates.extend(batch);
781
782 match pagination.advance(page_block_num, page_chain_tip)? {
783 PaginationResult::Continue => {},
784 PaginationResult::Done {
785 chain_tip: final_chain_tip,
786 block_num: final_block_num,
787 } => break (final_chain_tip, final_block_num),
788 }
789 };
790
791 Ok(StorageMapInfo { chain_tip, block_number, updates })
792 }
793
794 async fn sync_account_vault(
795 &self,
796 block_from: BlockNumber,
797 block_to: BlockNumber,
798 account_id: AccountId,
799 ) -> Result<AccountVaultInfo, RpcError> {
800 let mut pagination = BlockPagination::new(block_from, block_to);
801 let mut updates = Vec::new();
802
803 let (chain_tip, block_number) = loop {
804 let request = proto::rpc::SyncAccountVaultRequest {
805 block_range: Some(BlockRange {
806 block_from: pagination.current_block_from().as_u32(),
807 block_to: block_to.as_u32(),
808 }),
809 account_id: Some(account_id.into()),
810 };
811 let response = self
812 .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
813 let request = request.clone();
814 Box::pin(async move { rpc_api.sync_account_vault(request).await })
815 })
816 .await?;
817 let response = response.into_inner();
818 let page = response
819 .pagination_info
820 .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
821 let page_block_num = BlockNumber::from(page.block_num);
822 let page_chain_tip = BlockNumber::from(page.chain_tip);
823 let batch = response
824 .updates
825 .iter()
826 .map(|u| (*u).try_into())
827 .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
828 updates.extend(batch);
829
830 match pagination.advance(page_block_num, page_chain_tip)? {
831 PaginationResult::Continue => {},
832 PaginationResult::Done {
833 chain_tip: final_chain_tip,
834 block_num: final_block_num,
835 } => break (final_chain_tip, final_block_num),
836 }
837 };
838
839 Ok(AccountVaultInfo { chain_tip, block_number, updates })
840 }
841
842 async fn sync_transactions(
848 &self,
849 block_from: BlockNumber,
850 block_to: BlockNumber,
851 account_ids: Vec<AccountId>,
852 ) -> Result<Vec<TransactionRecord>, RpcError> {
853 if account_ids.is_empty() {
854 return Ok(Vec::new());
855 }
856
857 let limits = self.get_rpc_limits().await?;
858 let mut transactions: Vec<TransactionRecord> = Vec::new();
859
860 for chunk in account_ids.chunks(limits.account_ids_limit as usize) {
861 let proto_account_ids: Vec<_> = chunk.iter().map(|acc_id| (*acc_id).into()).collect();
862 let mut pagination = BlockPagination::new(block_from, block_to);
863
864 loop {
865 let request = proto::rpc::SyncTransactionsRequest {
866 block_range: Some(BlockRange {
867 block_from: pagination.current_block_from().as_u32(),
868 block_to: block_to.as_u32(),
869 }),
870 account_ids: proto_account_ids.clone(),
871 };
872
873 let response = self
874 .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
875 let request = request.clone();
876 Box::pin(async move { rpc_api.sync_transactions(request).await })
877 })
878 .await?
879 .into_inner();
880
881 let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
882 "SyncTransactionsResponse.pagination_info".to_owned(),
883 ))?;
884 let page_chain_tip = BlockNumber::from(page.chain_tip);
885 let page_block_to = BlockNumber::from(page.block_num);
886
887 for proto_tx in response.transactions {
888 transactions.push(TransactionRecord::try_from(proto_tx)?);
889 }
890
891 match pagination.advance(page_block_to, page_chain_tip)? {
892 PaginationResult::Continue => {},
893 PaginationResult::Done { .. } => break,
894 }
895 }
896 }
897
898 Ok(transactions)
899 }
900
901 async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
902 let endpoint: Endpoint =
903 Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
904 Ok(endpoint.to_network_id())
905 }
906
907 async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
908 if let Some(limits) = *self.limits.read() {
910 return Ok(limits);
911 }
912
913 let response = self
915 .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
916 Box::pin(async move { rpc_api.get_limits(()).await })
917 })
918 .await?;
919 let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
920
921 self.limits.write().replace(limits);
923 Ok(limits)
924 }
925
926 fn has_rpc_limits(&self) -> Option<RpcLimits> {
927 *self.limits.read()
928 }
929
930 async fn set_rpc_limits(&self, limits: RpcLimits) {
931 self.limits.write().replace(limits);
932 }
933
    /// Trait entry point for the unversioned status query.
    ///
    /// Delegates to the method of the same name defined in `GrpcClient`'s own `impl` block.
    async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
        // The explicit `GrpcClient::` path is deliberate: this trait method shares its name
        // with the inherent method it forwards to.
        GrpcClient::get_status_unversioned(self).await
    }
937
938 async fn get_network_note_status(
939 &self,
940 note_id: NoteId,
941 ) -> Result<NetworkNoteStatusInfo, RpcError> {
942 let request = proto::note::NoteId { id: Some(note_id.into()) };
943
944 let response = self
945 .call_with_retry(RpcEndpoint::GetNetworkNoteStatus, |mut rpc_api| {
946 Box::pin(async move { rpc_api.get_network_note_status(request).await })
947 })
948 .await?;
949
950 response.into_inner().try_into()
951 }
952}
953
954impl RpcError {
958 pub fn from_grpc_error_with_context(
959 endpoint: RpcEndpoint,
960 status: Status,
961 context: AcceptHeaderContext,
962 ) -> Self {
963 if let Some(accept_error) =
964 AcceptHeaderError::try_from_message_with_context(status.message(), context)
965 {
966 return Self::AcceptHeaderError(accept_error);
967 }
968
969 let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
971
972 let error_kind = GrpcError::from(&status);
973 let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
974
975 Self::RequestError {
976 endpoint,
977 error_kind,
978 endpoint_error,
979 source: Some(source),
980 }
981 }
982}
983
984impl From<&Status> for GrpcError {
985 fn from(status: &Status) -> Self {
986 GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
987 }
988}
989
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;
    use miden_protocol::block::BlockNumber;

    use super::{BlockPagination, GrpcClient, PaginationResult};
    use crate::alloc::string::ToString;
    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};

    /// Compile-time probe: only type-checks when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    /// The client and its boxed trait object must be shareable across threads.
    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    /// A page that ends before the requested start block is rejected.
    #[test]
    fn block_pagination_errors_when_block_num_goes_backwards() {
        let mut cursor = BlockPagination::new(BlockNumber::from(10_u32), BlockNumber::from(20_u32));

        let outcome = cursor.advance(BlockNumber::from(9_u32), BlockNumber::from(20_u32));
        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    /// Exactly `MAX_ITERATIONS` advances succeed; the next one fails.
    #[test]
    fn block_pagination_errors_after_max_iterations() {
        let chain_tip: BlockNumber = 10_000_u32.into();
        let mut cursor = BlockPagination::new(BlockNumber::from(0_u32), chain_tip);

        for _ in 0..BlockPagination::MAX_ITERATIONS {
            let from = cursor.current_block_from();
            let outcome = cursor
                .advance(from, chain_tip)
                .expect("expected pagination to continue within iteration limit");
            assert!(matches!(outcome, PaginationResult::Continue));
        }

        let from = cursor.current_block_from();
        let outcome = cursor.advance(from, chain_tip);
        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    /// The walk ends at the chain tip when the tip lies below the requested upper bound.
    #[test]
    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
        let mut cursor = BlockPagination::new(BlockNumber::from(0_u32), BlockNumber::from(50_u32));

        let outcome = cursor
            .advance(BlockNumber::from(30_u32), BlockNumber::from(30_u32))
            .expect("expected pagination to succeed");

        assert!(matches!(
            outcome,
            PaginationResult::Done {
                chain_tip,
                block_num
            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
        ));
    }

    /// An unfinished walk resumes one block after the last reported block.
    #[test]
    fn block_pagination_advances_cursor_by_one() {
        let mut cursor = BlockPagination::new(BlockNumber::from(5_u32), BlockNumber::from(100_u32));

        let outcome = cursor
            .advance(BlockNumber::from(5_u32), BlockNumber::from(100_u32))
            .expect("expected pagination to succeed");
        assert!(matches!(outcome, PaginationResult::Continue));
        assert_eq!(cursor.current_block_from().as_u32(), 6);
    }

    /// Helper whose future must be `Send` for `future_is_send` to compile.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        let header_result = client.get_block_header_by_number(None, false).await;
        assert!(header_result.is_ok());
    }

    /// Spawning the helper requires its future to be `Send`; the task is not awaited.
    #[tokio::test]
    async fn future_is_send() {
        let devnet = Endpoint::devnet();
        let boxed_client: Box<GrpcClient> = GrpcClient::new(&devnet, 10000).into();
        tokio::task::spawn(async move { dyn_trait_send_fut(boxed_client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000);

        assert!(client.genesis_commitment.read().is_none());

        let expected = Word::default();
        client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), expected);
    }

    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000);

        let first = Word::default();
        client.set_genesis_commitment(first).await.unwrap();

        // A second, different commitment must not replace the first one.
        let second = Word::from([1u32, 2, 3, 4]);
        client.set_genesis_commitment(second).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), first);
    }

    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000);

        client.connect().await.unwrap();

        let expected = Word::default();
        client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), expected);
        assert!(client.client.read().as_ref().is_some());
    }

    #[test]
    fn with_bearer_auth_stores_token() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000).with_bearer_auth("token-one".to_string());

        assert_eq!(client.bearer_token.as_deref(), Some("token-one"));
    }

    #[test]
    fn with_bearer_auth_overwrites_on_repeat_call() {
        let devnet = Endpoint::devnet();
        let first_token = "token-one".to_string();
        let second_token = "token-two".to_string();
        let client = GrpcClient::new(&devnet, 10000)
            .with_bearer_auth(first_token)
            .with_bearer_auth(second_token);

        assert_eq!(client.bearer_token.as_deref(), Some("token-two"));
    }

    #[tokio::test]
    async fn with_bearer_auth_surfaces_invalid_ascii_value_at_connect_time() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000).with_bearer_auth("bad\nvalue".to_string());

        let err = client.connect().await.expect_err("expected invalid token to fail connect");
        assert!(
            matches!(err, RpcError::ConnectionError(_)),
            "expected ConnectionError, got {err:?}",
        );
    }

    #[tokio::test]
    async fn with_bearer_auth_is_preserved_across_set_genesis_commitment() {
        let devnet = Endpoint::devnet();
        let client = GrpcClient::new(&devnet, 10000).with_bearer_auth("token".to_string());
        client.connect().await.unwrap();

        client.set_genesis_commitment(Word::default()).await.unwrap();

        assert_eq!(client.bearer_token.as_deref(), Some("token"));
        assert!(client.client.read().as_ref().is_some());
    }

    #[tokio::test]
    #[ignore = "requires network access to public testnet"]
    async fn with_bearer_auth_does_not_break_real_rpc_against_testnet() {
        let testnet = Endpoint::testnet();
        let client = GrpcClient::new(&testnet, 10_000).with_bearer_auth("smoke-test".to_string());

        let status = client
            .get_status_unversioned()
            .await
            .expect("testnet status with caller auth header must succeed");
        assert!(!status.version.is_empty(), "status must include a server version");
    }
}