Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::asset::{Asset, AssetVault};
10use miden_protocol::vm::FutureMaybeSend;
11
12type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
13
14use miden_protocol::account::{
15    Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
16};
17use miden_protocol::address::NetworkId;
18use miden_protocol::block::account_tree::AccountWitness;
19use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
20use miden_protocol::crypto::merkle::MerklePath;
21use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
22use miden_protocol::crypto::merkle::smt::SmtProof;
23use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
24use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
25use miden_protocol::utils::serde::Deserializable;
26use miden_protocol::{EMPTY_WORD, Word};
27use miden_tx::utils::serde::Serializable;
28use miden_tx::utils::sync::RwLock;
29use tonic::Status;
30use tracing::info;
31
32use super::domain::account::{
33    AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
34};
35use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
36use super::generated::rpc::account_request::AccountDetailRequest;
37use super::generated::rpc::AccountRequest;
38use super::{
39    Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
40    RpcStatusInfo,
41};
42use crate::rpc::domain::sync::ChainMmrInfo;
43use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
44use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
45use crate::rpc::domain::transaction::TransactionsInfo;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
49use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
50use crate::rpc::generated::rpc::BlockRange;
51use crate::rpc::domain::limits::RpcLimits;
52use crate::rpc::{AccountStateAt, generated as proto};
53
54mod api_client;
55mod retry;
56
57use api_client::api_client_wrapper::ApiClient;
58
59/// Tracks the pagination state for block-driven endpoints.
60struct BlockPagination {
61    current_block_from: BlockNumber,
62    block_to: Option<BlockNumber>,
63    iterations: u32,
64}
65
66enum PaginationResult {
67    Continue,
68    Done {
69        chain_tip: BlockNumber,
70        block_num: BlockNumber,
71    },
72}
73
74impl BlockPagination {
75    /// Maximum number of pagination iterations for a single request.
76    ///
77    /// Protects against nodes returning inconsistent pagination data that could otherwise
78    /// trigger an infinite loop.
79    const MAX_ITERATIONS: u32 = 1000;
80
81    fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
82        Self {
83            current_block_from: block_from,
84            block_to,
85            iterations: 0,
86        }
87    }
88
89    fn current_block_from(&self) -> BlockNumber {
90        self.current_block_from
91    }
92
93    fn block_to(&self) -> Option<BlockNumber> {
94        self.block_to
95    }
96
97    fn advance(
98        &mut self,
99        block_num: BlockNumber,
100        chain_tip: BlockNumber,
101    ) -> Result<PaginationResult, RpcError> {
102        if self.iterations >= Self::MAX_ITERATIONS {
103            return Err(RpcError::PaginationError(
104                "too many pagination iterations, possible infinite loop".to_owned(),
105            ));
106        }
107        self.iterations += 1;
108
109        if block_num < self.current_block_from {
110            return Err(RpcError::PaginationError(
111                "invalid pagination: block_num went backwards".to_owned(),
112            ));
113        }
114
115        let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
116
117        if block_num >= target_block {
118            return Ok(PaginationResult::Done { chain_tip, block_num });
119        }
120
121        self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
122
123        Ok(PaginationResult::Continue)
124    }
125}
126
127// GRPC CLIENT
128// ================================================================================================
129
130/// Client for the Node RPC API using gRPC.
131///
132/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
133/// communicate with the node. In this case the connection will be established lazily when the
134/// first request is made.
135/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
136/// to communicate with the node.
137///
138/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
139/// are generated by the build script and also depend on the target architecture.
140pub struct GrpcClient {
141    /// The underlying gRPC client, lazily initialized on first request.
142    client: RwLock<Option<ApiClient>>,
143    /// The node endpoint URL to connect to.
144    endpoint: String,
145    /// Request timeout in milliseconds.
146    timeout_ms: u64,
147    /// The genesis block commitment, used for request validation by the node.
148    genesis_commitment: RwLock<Option<Word>>,
149    /// Cached RPC limits fetched from the node.
150    limits: RwLock<Option<RpcLimits>>,
151    /// Maximum number of retry attempts for rate-limited or transiently unavailable requests.
152    max_retries: u32,
153    /// Fallback retry interval in milliseconds when no `retry-after` header is present.
154    retry_interval_ms: u64,
155}
156
157impl GrpcClient {
158    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
159    /// with the given timeout in milliseconds.
160    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
161        GrpcClient {
162            client: RwLock::new(None),
163            endpoint: endpoint.to_string(),
164            timeout_ms,
165            genesis_commitment: RwLock::new(None),
166            limits: RwLock::new(None),
167            max_retries: retry::DEFAULT_MAX_RETRIES,
168            retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
169        }
170    }
171
172    /// Sets the maximum number of retry attempts for rate-limited or transiently unavailable
173    /// requests. Defaults to `4`.
174    #[must_use]
175    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
176        self.max_retries = max_retries;
177        self
178    }
179
180    /// Sets the fallback retry interval in milliseconds, used when the server does not provide
181    /// a `retry-after` header. Defaults to `100` ms.
182    #[must_use]
183    pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
184        self.retry_interval_ms = retry_interval_ms;
185        self
186    }
187
188    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
189    /// `rpc_api` field is initialized and returns a write guard to it.
190    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
191        if self.client.read().is_none() {
192            self.connect().await?;
193        }
194
195        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
196    }
197
198    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
199    /// genesis commitment.
200    async fn connect(&self) -> Result<(), RpcError> {
201        let genesis_commitment = *self.genesis_commitment.read();
202        let new_client =
203            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
204                .await?;
205        let mut client = self.client.write();
206        client.replace(new_client);
207
208        Ok(())
209    }
210
211    fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
212        let genesis_commitment = self
213            .genesis_commitment
214            .read()
215            .as_ref()
216            .map_or_else(|| "none".to_string(), Word::to_hex);
217        let context = AcceptHeaderContext {
218            client_version: env!("CARGO_PKG_VERSION").to_string(),
219            genesis_commitment,
220        };
221        RpcError::from_grpc_error_with_context(endpoint, status, context)
222    }
223
224    /// Executes an RPC call and automatically retries transient failures.
225    ///
226    /// The provided closure is invoked with a freshly connected [`ApiClient`] on each attempt.
227    /// Retries are delegated to [`retry::RetryState`], which currently handles gRPC
228    /// [`tonic::Code::ResourceExhausted`] and [`tonic::Code::Unavailable`] responses, including
229    /// honoring cooldown delays when the node provides them.
230    ///
231    /// Returns the first successful gRPC response. If the call keeps failing after retries are
232    /// exhausted, or if the error is not retryable, this returns the corresponding [`RpcError`]
233    /// for the provided [`RpcEndpoint`].
234    async fn call_with_retry<T: Send + 'static>(
235        &self,
236        endpoint: RpcEndpoint,
237        mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
238    ) -> Result<tonic::Response<T>, RpcError> {
239        let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
240
241        loop {
242            let rpc_api = self.ensure_connected().await?;
243
244            match call(rpc_api).await {
245                Ok(response) => return Ok(response),
246                Err(status) if retry_state.should_retry(&status).await => {},
247                Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
248            }
249        }
250    }
251
252    /// Fetches RPC status without injecting an Accept header.
253    ///
254    /// This instantiates a separate API client without the Accept interceptor, so it does not
255    /// reuse the primary gRPC client.
256    pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
257        let mut rpc_api =
258            ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
259                .await?;
260        rpc_api
261            .status(())
262            .await
263            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
264            .map(tonic::Response::into_inner)
265            .and_then(RpcStatusInfo::try_from)
266    }
267
268    // GET ACCOUNT HELPERS
269    // ============================================================================================
270
271    /// Given an [`AccountId`], return the proof for the account.
272    ///
273    /// If the account also has public state, its details will also be retrieved
274    pub async fn fetch_full_account_proof(
275        &self,
276        account_id: AccountId,
277    ) -> Result<(BlockNumber, AccountProof), RpcError> {
278        let has_public_state = account_id.has_public_state();
279        let account_request = {
280            AccountRequest {
281                account_id: Some(account_id.into()),
282                block_num: None,
283                details: {
284                    if has_public_state {
285                        // Since we have to request the storage maps for an account
286                        // we *dont know* anything about, we'll have to do first this
287                        // request, which will tell us about the account's storage slots,
288                        // and then, request the slots in another request.
289                        Some(AccountDetailRequest {
290                            code_commitment: Some(EMPTY_WORD.into()),
291                            asset_vault_commitment: Some(EMPTY_WORD.into()),
292                            storage_maps: vec![],
293                        })
294                    } else {
295                        None
296                    }
297                },
298            }
299        };
300        let account_response = self
301            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
302                let request = account_request.clone();
303                Box::pin(async move { rpc_api.get_account(request).await })
304            })
305            .await?
306            .into_inner();
307        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
308            "GetAccountDetails returned an account without a matching block number for the witness"
309                .to_owned(),
310        ))?;
311        let account_proof = {
312            if has_public_state {
313                let account_details = account_response
314                    .details
315                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
316                    .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
317                let storage_header = account_details.storage_details.header;
318                // This variable will hold the storage slots that are maps, below we will use it to
319                // actually fetch the storage maps details, since we now know the names of each
320                // storage slot.
321                let maps_to_request = storage_header
322                    .slots()
323                    .filter(|header| header.slot_type().is_map())
324                    .map(|map| map.name().to_string());
325                let account_request = AccountRequest {
326                    account_id: Some(account_id.into()),
327                    block_num: Some(block_number),
328                    details: Some(AccountDetailRequest {
329                        code_commitment: Some(EMPTY_WORD.into()),
330                        asset_vault_commitment: Some(EMPTY_WORD.into()),
331                        storage_maps: maps_to_request
332                            .map(|slot_name| StorageMapDetailRequest {
333                                slot_name,
334                                slot_data: Some(SlotData::AllEntries(true)),
335                            })
336                            .collect(),
337                    }),
338                };
339                let response = self
340                    .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
341                        let request = account_request.clone();
342                        Box::pin(async move { rpc_api.get_account(request).await })
343                    })
344                    .await?;
345                response.into_inner().try_into()
346            } else {
347                account_response.try_into()
348            }
349        };
350        Ok((block_number.block_num.into(), account_proof?))
351    }
352
353    /// Given the storage details for an account and its id, returns a vector with all of its
354    /// storage slots. Keep in mind that if an account triggers the `too_many_entries` flag, there
355    /// will potentially be multiple requests.
356    async fn build_storage_slots(
357        &self,
358        account_id: AccountId,
359        storage_details: &AccountStorageDetails,
360        block_to: Option<BlockNumber>,
361    ) -> Result<Vec<StorageSlot>, RpcError> {
362        let mut slots = vec![];
363        // `SyncStorageMaps` will return information for *every* map for a given account, so this
364        // map_cache value should be fetched only once, hence the None placeholder
365        let mut map_cache: Option<StorageMapInfo> = None;
366        for slot_header in storage_details.header.slots() {
367            // We have two cases for each slot:
368            // - Slot is a value => We simply instance a StorageSlot
369            // - Slot is a map => If the map is 'small', we can simply
370            // build the map from the given entries. Otherwise we will have to
371            // call the SyncStorageMaps RPC method to obtain the data for the map.
372            // With the current setup, one RPC call should be enough.
373            match slot_header.slot_type() {
374                StorageSlotType::Value => {
375                    slots.push(miden_protocol::account::StorageSlot::with_value(
376                        slot_header.name().clone(),
377                        slot_header.value(),
378                    ));
379                },
380                StorageSlotType::Map => {
381                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
382                        RpcError::ExpectedDataMissing(format!(
383                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
384                            slot_header.name(),
385                        )),
386                    )?;
387
388                    let storage_map = if map_details.too_many_entries {
389                        let map_info = if let Some(ref info) = map_cache {
390                            info
391                        } else {
392                            let fetched_data =
393                                self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
394                            map_cache.insert(fetched_data)
395                        };
396                        // The sync endpoint may return multiple updates for the same key
397                        // across different blocks. We sort by block number so that
398                        // inserting into the map keeps only the latest value per key.
399                        let mut sorted_updates: Vec<_> = map_info
400                            .updates
401                            .iter()
402                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
403                            .collect();
404                        sorted_updates.sort_by_key(|u| u.block_num);
405                        let map_entries: Vec<_> = sorted_updates
406                            .into_iter()
407                            .map(|u| (u.key, u.value))
408                            .collect::<BTreeMap<_, _>>()
409                            .into_iter()
410                            .collect();
411                        StorageMap::with_entries(map_entries)
412                    } else {
413                        map_details.entries.clone().into_storage_map().ok_or_else(|| {
414                            RpcError::ExpectedDataMissing(
415                                "expected AllEntries for full account fetch, got EntriesWithProofs"
416                                    .into(),
417                            )
418                        })?
419                    }
420                    .map_err(|err| {
421                        RpcError::InvalidResponse(format!(
422                            "the rpc api returned a non-valid map entry: {err}"
423                        ))
424                    })?;
425
426                    slots.push(miden_protocol::account::StorageSlot::with_map(
427                        slot_header.name().clone(),
428                        storage_map,
429                    ));
430                },
431            }
432        }
433        Ok(slots)
434    }
435
436    /// Fetches the full vault assets via `SyncAccountVault`.
437    ///
438    /// Used when `too_many_assets` is set in the response. Deduplicates updates by
439    /// vault key, keeping only the latest value per key.
440    async fn fetch_full_vault(
441        &self,
442        account_id: AccountId,
443        block_to: Option<BlockNumber>,
444    ) -> Result<Vec<Asset>, RpcError> {
445        let vault_info =
446            self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
447        let mut updates = vault_info.updates;
448        updates.sort_by_key(|u| u.block_num);
449        Ok(updates
450            .into_iter()
451            .map(|u| (u.vault_key, u.asset))
452            .collect::<BTreeMap<_, _>>()
453            .into_values()
454            .flatten()
455            .collect())
456    }
457}
458
459#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
460#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
461impl NodeRpcClient for GrpcClient {
462    /// Sets the genesis commitment for the client. If the client is already connected, it will be
463    /// updated to use the new commitment on subsequent requests. If the client is not connected,
464    /// the commitment will be stored and used when the client connects. If the genesis commitment
465    /// is already set, this method does nothing.
466    fn has_genesis_commitment(&self) -> Option<Word> {
467        *self.genesis_commitment.read()
468    }
469
470    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
471        // Check if already set before doing anything else
472        if self.genesis_commitment.read().is_some() {
473            // Genesis commitment is already set, ignoring the new value.
474            return Ok(());
475        }
476
477        // Store the commitment for future connections
478        self.genesis_commitment.write().replace(commitment);
479
480        // If a client is already connected, update it to use the new genesis commitment.
481        // If not connected, the commitment will be used when connect() is called.
482        let mut client_guard = self.client.write();
483        if let Some(client) = client_guard.as_mut() {
484            client.set_genesis_commitment(commitment);
485        }
486
487        Ok(())
488    }
489
490    async fn submit_proven_transaction(
491        &self,
492        proven_transaction: ProvenTransaction,
493        transaction_inputs: TransactionInputs,
494    ) -> Result<BlockNumber, RpcError> {
495        let request = proto::transaction::ProvenTransaction {
496            transaction: proven_transaction.to_bytes(),
497            transaction_inputs: Some(transaction_inputs.to_bytes()),
498        };
499
500        let api_response = self
501            .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
502                let request = request.clone();
503                Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
504            })
505            .await?;
506
507        Ok(BlockNumber::from(api_response.into_inner().block_num))
508    }
509
510    async fn get_block_header_by_number(
511        &self,
512        block_num: Option<BlockNumber>,
513        include_mmr_proof: bool,
514    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
515        let request = proto::rpc::BlockHeaderByNumberRequest {
516            block_num: block_num.as_ref().map(BlockNumber::as_u32),
517            include_mmr_proof: Some(include_mmr_proof),
518        };
519
520        info!("Calling GetBlockHeaderByNumber: {:?}", request);
521
522        let api_response = self
523            .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
524                Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
525            })
526            .await?;
527
528        let response = api_response.into_inner();
529
530        let block_header: BlockHeader = response
531            .block_header
532            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
533            .try_into()?;
534
535        let mmr_proof = if include_mmr_proof {
536            let forest = response
537                .chain_length
538                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
539            let merkle_path: MerklePath = response
540                .mmr_path
541                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
542                .try_into()?;
543
544            Some(MmrProof::new(
545                MmrPath::new(
546                    Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
547                    block_header.block_num().as_usize(),
548                    merkle_path,
549                ),
550                block_header.commitment(),
551            ))
552        } else {
553            None
554        };
555
556        Ok((block_header, mmr_proof))
557    }
558
559    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
560        let limits = self.get_rpc_limits().await?;
561        let mut notes = Vec::with_capacity(note_ids.len());
562        for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
563            let request = proto::note::NoteIdList {
564                ids: chunk.iter().map(|id| (*id).into()).collect(),
565            };
566
567            let api_response = self
568                .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
569                    let request = request.clone();
570                    Box::pin(async move { rpc_api.get_notes_by_id(request).await })
571                })
572                .await?;
573
574            let response_notes = api_response
575                .into_inner()
576                .notes
577                .into_iter()
578                .map(FetchedNote::try_from)
579                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
580
581            notes.extend(response_notes);
582        }
583        Ok(notes)
584    }
585
586    async fn sync_chain_mmr(
587        &self,
588        block_from: BlockNumber,
589        block_to: Option<BlockNumber>,
590    ) -> Result<ChainMmrInfo, RpcError> {
591        let block_range = Some(BlockRange {
592            block_from: block_from.as_u32(),
593            block_to: block_to.map(|b| b.as_u32()),
594        });
595
596        let request = proto::rpc::SyncChainMmrRequest {
597            block_range,
598            finality: proto::rpc::Finality::Committed as i32,
599        };
600
601        let response = self
602            .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
603                Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
604            })
605            .await?;
606
607        response.into_inner().try_into()
608    }
609
610    /// Sends a `GetAccountDetailsRequest` to the Miden node, and extracts an [`FetchedAccount`]
611    /// from the `GetAccountDetailsResponse` response.
612    ///
613    /// # Errors
614    ///
615    /// This function will return an error if:
616    ///
617    /// - There was an error sending the request to the node.
618    /// - The answer had a `None` for one of the expected fields (`account`, `summary`,
619    ///   `account_commitment`, `details`).
620    /// - There is an error during [Account] deserialization.
621    async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
622        let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
623        let update_summary =
624            AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
625
626        // The case for a private account is simple,
627        // we simple use the commitment and its id.
628        if account_id.is_private() {
629            Ok(FetchedAccount::new_private(account_id, update_summary))
630        } else {
631            let details =
632                full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
633                    "GetAccountDetails returned a public account without details".to_owned(),
634                ))?;
635
636            let account_id = details.header.id();
637            let nonce = details.header.nonce();
638
639            // If the vault exceeds the node's size threshold, download the full vault
640            // via the sync endpoint; otherwise use the assets from the response directly.
641            let assets = if details.vault_details.too_many_assets {
642                self.fetch_full_vault(account_id, Some(block_number)).await?
643            } else {
644                details.vault_details.assets
645            };
646
647            // build_storage_slots handles too_many_entries maps internally via
648            // sync_storage_maps.
649            let slots = self
650                .build_storage_slots(account_id, &details.storage_details, Some(block_number))
651                .await?;
652            let asset_vault = AssetVault::new(&assets).map_err(|err| {
653                RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
654            })?;
655            let account_storage = AccountStorage::new(slots).map_err(|err| {
656                RpcError::InvalidResponse(format!(
657                    "api rpc returned non-valid storage slots: {err}"
658                ))
659            })?;
660            let account =
661                Account::new(account_id, asset_vault, account_storage, details.code, nonce, None)
662                    .map_err(|err| {
663                    RpcError::InvalidResponse(format!(
664                        "failed to instance an account from the rpc api response: {err}"
665                    ))
666                })?;
667            Ok(FetchedAccount::new_public(account, update_summary))
668        }
669    }
670
671    /// Sends a `GetAccountProof` request to the Miden node, and extracts the [AccountProof]
672    /// from the response, as well as the block number that it was retrieved for.
673    ///
674    /// # Errors
675    ///
676    /// This function will return an error if:
677    ///
678    /// - The requested Account isn't returned by the node.
679    /// - There was an error sending the request to the node.
680    /// - The answer had a `None` for one of the expected fields.
681    /// - There is an error during storage deserialization.
682    async fn get_account_proof(
683        &self,
684        account_id: AccountId,
685        storage_requirements: AccountStorageRequirements,
686        account_state: AccountStateAt,
687        known_account_code: Option<AccountCode>,
688        known_vault_commitment: Option<Word>,
689    ) -> Result<(BlockNumber, AccountProof), RpcError> {
690        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
691        if let Some(account_code) = known_account_code {
692            known_codes_by_commitment.insert(account_code.commitment(), account_code);
693        }
694
695        let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
696
697        // Only request details for accounts with public state (Public or Network);
698        // include known code commitment for this account when available
699        let account_details = if account_id.has_public_state() {
700            Some(AccountDetailRequest {
701                code_commitment: Some(EMPTY_WORD.into()),
702                asset_vault_commitment: known_vault_commitment.map(Into::into),
703                storage_maps,
704            })
705        } else {
706            None
707        };
708
709        let block_num = match account_state {
710            AccountStateAt::Block(number) => Some(number.into()),
711            AccountStateAt::ChainTip => None,
712        };
713
714        let request = AccountRequest {
715            account_id: Some(account_id.into()),
716            block_num,
717            details: account_details,
718        };
719
720        let response = self
721            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
722                let request = request.clone();
723                Box::pin(async move { rpc_api.get_account(request).await })
724            })
725            .await?
726            .into_inner();
727
728        let account_witness: AccountWitness = response
729            .witness
730            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
731            .try_into()?;
732
733        let block_num: BlockNumber = response
734            .block_num
735            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
736            .block_num
737            .into();
738
739        // For accounts with public state, details should be present when requested
740        let headers = if account_witness.id().has_public_state() {
741            let mut details = response
742                .details
743                .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
744                .into_domain(&known_codes_by_commitment, &storage_requirements)?;
745
746            if details.vault_details.too_many_assets {
747                details.vault_details.assets =
748                    self.fetch_full_vault(account_id, Some(block_num)).await?;
749            }
750
751            Some(details)
752        } else {
753            None
754        };
755
756        let proof = AccountProof::new(account_witness, headers)
757            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
758
759        Ok((block_num, proof))
760    }
761
762    /// Sends a `SyncNoteRequest` to the Miden node, and extracts a [`NoteSyncInfo`] from the
763    /// response.
764    async fn sync_notes(
765        &self,
766        block_num: BlockNumber,
767        block_to: Option<BlockNumber>,
768        note_tags: &BTreeSet<NoteTag>,
769    ) -> Result<NoteSyncInfo, RpcError> {
770        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
771
772        let block_range = Some(BlockRange {
773            block_from: block_num.as_u32(),
774            block_to: block_to.map(|b| b.as_u32()),
775        });
776
777        let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
778
779        let response = self
780            .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
781                let request = request.clone();
782                Box::pin(async move { rpc_api.sync_notes(request).await })
783            })
784            .await?;
785
786        response.into_inner().try_into()
787    }
788
789    async fn sync_nullifiers(
790        &self,
791        prefixes: &[u16],
792        block_num: BlockNumber,
793        block_to: Option<BlockNumber>,
794    ) -> Result<Vec<NullifierUpdate>, RpcError> {
795        const MAX_ITERATIONS: u32 = 1000; // Safety limit to prevent infinite loops
796
797        let limits = self.get_rpc_limits().await?;
798        let mut all_nullifiers = BTreeSet::new();
799
800        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
801        // violating the RPC limit.
802        'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
803            let mut current_block_from = block_num.as_u32();
804
805            for _ in 0..MAX_ITERATIONS {
806                let request = proto::rpc::SyncNullifiersRequest {
807                    nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
808                    prefix_len: 16,
809                    block_range: Some(BlockRange {
810                        block_from: current_block_from,
811                        block_to: block_to.map(|b| b.as_u32()),
812                    }),
813                };
814
815                let response = self
816                    .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
817                        let request = request.clone();
818                        Box::pin(async move { rpc_api.sync_nullifiers(request).await })
819                    })
820                    .await?;
821                let response = response.into_inner();
822
823                // Convert nullifiers for this batch
824                let batch_nullifiers = response
825                    .nullifiers
826                    .iter()
827                    .map(TryFrom::try_from)
828                    .collect::<Result<Vec<NullifierUpdate>, _>>()
829                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
830
831                all_nullifiers.extend(batch_nullifiers);
832
833                // Check if we need to fetch more pages
834                if let Some(page) = response.pagination_info {
835                    // Ensure we're making progress to avoid infinite loops
836                    if page.block_num < current_block_from {
837                        return Err(RpcError::PaginationError(
838                            "invalid pagination: block_num went backwards".to_string(),
839                        ));
840                    }
841
842                    // Calculate target block as minimum between block_to and chain_tip
843                    let target_block =
844                        block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
845
846                    if page.block_num >= target_block {
847                        // No pagination info or we've reached/passed the target so we're done
848                        continue 'chunk_nullifiers;
849                    }
850                    current_block_from = page.block_num + 1;
851                }
852            }
853            // If we exit the loop, we've hit the iteration limit
854            return Err(RpcError::PaginationError(
855                "too many pagination iterations, possible infinite loop".to_string(),
856            ));
857        }
858        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
859    }
860
861    async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
862        let limits = self.get_rpc_limits().await?;
863        let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
864        for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
865            let request = proto::rpc::NullifierList {
866                nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
867            };
868
869            let response = self
870                .call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
871                    let request = request.clone();
872                    Box::pin(async move { rpc_api.check_nullifiers(request).await })
873                })
874                .await?;
875
876            let mut response = response.into_inner();
877            let chunk_proofs = response
878                .proofs
879                .iter_mut()
880                .map(|r| r.to_owned().try_into())
881                .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
882            proofs.extend(chunk_proofs);
883        }
884        Ok(proofs)
885    }
886
887    async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
888        let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
889
890        let response = self
891            .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
892                Box::pin(async move { rpc_api.get_block_by_number(request).await })
893            })
894            .await?;
895
896        let response = response.into_inner();
897        let block =
898            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
899                "GetBlockByNumberResponse.block".to_string(),
900            ))?)?;
901
902        Ok(block)
903    }
904
905    async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
906        let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
907
908        let response = self
909            .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
910                Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
911            })
912            .await?;
913
914        let response = response.into_inner();
915        let note_script = NoteScript::try_from(
916            response
917                .script
918                .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
919        )?;
920
921        Ok(note_script)
922    }
923
924    async fn sync_storage_maps(
925        &self,
926        block_from: BlockNumber,
927        block_to: Option<BlockNumber>,
928        account_id: AccountId,
929    ) -> Result<StorageMapInfo, RpcError> {
930        let mut pagination = BlockPagination::new(block_from, block_to);
931        let mut updates = Vec::new();
932
933        let (chain_tip, block_number) = loop {
934            let request = proto::rpc::SyncAccountStorageMapsRequest {
935                block_range: Some(BlockRange {
936                    block_from: pagination.current_block_from().as_u32(),
937                    block_to: pagination.block_to().map(|block| block.as_u32()),
938                }),
939                account_id: Some(account_id.into()),
940            };
941            let response = self
942                .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
943                    let request = request.clone();
944                    Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
945                })
946                .await?;
947            let response = response.into_inner();
948            let page = response
949                .pagination_info
950                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
951            let page_block_num = BlockNumber::from(page.block_num);
952            let page_chain_tip = BlockNumber::from(page.chain_tip);
953            let batch = response
954                .updates
955                .into_iter()
956                .map(TryInto::try_into)
957                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
958            updates.extend(batch);
959
960            match pagination.advance(page_block_num, page_chain_tip)? {
961                PaginationResult::Continue => {},
962                PaginationResult::Done {
963                    chain_tip: final_chain_tip,
964                    block_num: final_block_num,
965                } => break (final_chain_tip, final_block_num),
966            }
967        };
968
969        Ok(StorageMapInfo { chain_tip, block_number, updates })
970    }
971
972    async fn sync_account_vault(
973        &self,
974        block_from: BlockNumber,
975        block_to: Option<BlockNumber>,
976        account_id: AccountId,
977    ) -> Result<AccountVaultInfo, RpcError> {
978        let mut pagination = BlockPagination::new(block_from, block_to);
979        let mut updates = Vec::new();
980
981        let (chain_tip, block_number) = loop {
982            let request = proto::rpc::SyncAccountVaultRequest {
983                block_range: Some(BlockRange {
984                    block_from: pagination.current_block_from().as_u32(),
985                    block_to: pagination.block_to().map(|block| block.as_u32()),
986                }),
987                account_id: Some(account_id.into()),
988            };
989            let response = self
990                .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
991                    let request = request.clone();
992                    Box::pin(async move { rpc_api.sync_account_vault(request).await })
993                })
994                .await?;
995            let response = response.into_inner();
996            let page = response
997                .pagination_info
998                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
999            let page_block_num = BlockNumber::from(page.block_num);
1000            let page_chain_tip = BlockNumber::from(page.chain_tip);
1001            let batch = response
1002                .updates
1003                .iter()
1004                .map(|u| (*u).try_into())
1005                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
1006            updates.extend(batch);
1007
1008            match pagination.advance(page_block_num, page_chain_tip)? {
1009                PaginationResult::Continue => {},
1010                PaginationResult::Done {
1011                    chain_tip: final_chain_tip,
1012                    block_num: final_block_num,
1013                } => break (final_chain_tip, final_block_num),
1014            }
1015        };
1016
1017        Ok(AccountVaultInfo { chain_tip, block_number, updates })
1018    }
1019
1020    async fn sync_transactions(
1021        &self,
1022        block_from: BlockNumber,
1023        block_to: Option<BlockNumber>,
1024        account_ids: Vec<AccountId>,
1025    ) -> Result<TransactionsInfo, RpcError> {
1026        let block_range = Some(BlockRange {
1027            block_from: block_from.as_u32(),
1028            block_to: block_to.map(|b| b.as_u32()),
1029        });
1030
1031        let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
1032
1033        let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
1034
1035        let response = self
1036            .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
1037                let request = request.clone();
1038                Box::pin(async move { rpc_api.sync_transactions(request).await })
1039            })
1040            .await?;
1041
1042        response.into_inner().try_into()
1043    }
1044
1045    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
1046        let endpoint: Endpoint =
1047            Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
1048        Ok(endpoint.to_network_id())
1049    }
1050
1051    async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
1052        // Return cached limits if available
1053        if let Some(limits) = *self.limits.read() {
1054            return Ok(limits);
1055        }
1056
1057        // Fetch limits from the node
1058        let response = self
1059            .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
1060                Box::pin(async move { rpc_api.get_limits(()).await })
1061            })
1062            .await?;
1063        let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
1064
1065        // Cache fetched values
1066        self.limits.write().replace(limits);
1067        Ok(limits)
1068    }
1069
1070    fn has_rpc_limits(&self) -> Option<RpcLimits> {
1071        *self.limits.read()
1072    }
1073
1074    async fn set_rpc_limits(&self, limits: RpcLimits) {
1075        self.limits.write().replace(limits);
1076    }
1077
1078    async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
1079        GrpcClient::get_status_unversioned(self).await
1080    }
1081}
1082
1083// ERRORS
1084// ================================================================================================
1085
1086impl RpcError {
1087    pub fn from_grpc_error_with_context(
1088        endpoint: RpcEndpoint,
1089        status: Status,
1090        context: AcceptHeaderContext,
1091    ) -> Self {
1092        if let Some(accept_error) =
1093            AcceptHeaderError::try_from_message_with_context(status.message(), context)
1094        {
1095            return Self::AcceptHeaderError(accept_error);
1096        }
1097
1098        // Parse application-level error from status details
1099        let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
1100
1101        let error_kind = GrpcError::from(&status);
1102        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
1103
1104        Self::RequestError {
1105            endpoint,
1106            error_kind,
1107            endpoint_error,
1108            source: Some(source),
1109        }
1110    }
1111}
1112
1113impl From<&Status> for GrpcError {
1114    fn from(status: &Status) -> Self {
1115        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1116    }
1117}
1118
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;
    use miden_protocol::block::BlockNumber;

    use super::{BlockPagination, GrpcClient, PaginationResult};
    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};

    /// Compile-time helper: can only be instantiated with `Send + Sync` types.
    fn assert_send_sync<T: Send + Sync>() {}

    /// The client and the boxed trait object must both be `Send + Sync`.
    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    /// A page whose block number is lower than the current cursor is rejected.
    #[test]
    fn block_pagination_errors_when_block_num_goes_backwards() {
        let mut pagination = BlockPagination::new(BlockNumber::from(10_u32), None);

        let outcome = pagination.advance(BlockNumber::from(9_u32), BlockNumber::from(20_u32));
        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    /// Advancing keeps succeeding for `MAX_ITERATIONS` pages and fails on the next one.
    #[test]
    fn block_pagination_errors_after_max_iterations() {
        let tip = BlockNumber::from(10_000_u32);
        let mut pagination = BlockPagination::new(BlockNumber::from(0_u32), None);

        for _ in 0..BlockPagination::MAX_ITERATIONS {
            let cursor = pagination.current_block_from();
            let step = pagination
                .advance(cursor, tip)
                .expect("expected pagination to continue within iteration limit");
            assert!(matches!(step, PaginationResult::Continue));
        }

        let cursor = pagination.current_block_from();
        let outcome = pagination.advance(cursor, tip);
        assert!(matches!(outcome, Err(RpcError::PaginationError(_))));
    }

    /// With `block_to` beyond the chain tip, pagination finishes once the chain tip is reached.
    #[test]
    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
        let requested_end = Some(BlockNumber::from(50_u32));
        let mut pagination = BlockPagination::new(BlockNumber::from(0_u32), requested_end);

        let outcome = pagination
            .advance(BlockNumber::from(30_u32), BlockNumber::from(30_u32))
            .expect("expected pagination to succeed");

        assert!(matches!(
            outcome,
            PaginationResult::Done {
                chain_tip: tip,
                block_num: reached
            } if tip.as_u32() == 30 && reached.as_u32() == 30
        ));
    }

    /// After a page ending at block 5 (chain tip 100), the next request starts at block 6.
    #[test]
    fn block_pagination_advances_cursor_by_one() {
        let mut pagination = BlockPagination::new(BlockNumber::from(5_u32), None);

        let outcome = pagination
            .advance(BlockNumber::from(5_u32), BlockNumber::from(100_u32))
            .expect("expected pagination to succeed");

        assert!(matches!(outcome, PaginationResult::Continue));
        assert_eq!(pagination.current_block_from().as_u32(), 6);
    }

    // Awaits a trait method on a boxed `dyn NodeRpcClient`; spawning this future (see
    // `future_is_send`) only compiles if the future returned by the trait method is `Send`.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
        let header = client.get_block_header_by_number(None, false).await;
        assert!(header.is_ok());
    }

    /// Spawns the helper above on the runtime. The task is not awaited, so this is a
    /// compile-time check of the `Send` bound.
    #[tokio::test]
    async fn future_is_send() {
        let boxed: Box<GrpcClient> = Box::new(GrpcClient::new(&Endpoint::devnet(), 10000));
        tokio::task::spawn(async move { dyn_trait_send_fut(boxed).await });
    }

    /// Setting the genesis commitment on a fresh client stores it.
    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let client = GrpcClient::new(&Endpoint::devnet(), 10000);
        assert!(client.genesis_commitment.read().is_none());

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(*client.genesis_commitment.read(), Some(commitment));
    }

    /// A second call with a different commitment leaves the first one in place.
    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let client = GrpcClient::new(&Endpoint::devnet(), 10000);

        let first = Word::default();
        client.set_genesis_commitment(first).await.unwrap();

        let second = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        client.set_genesis_commitment(second).await.unwrap();

        assert_eq!(*client.genesis_commitment.read(), Some(first));
    }

    /// Setting the commitment after `connect` stores it and keeps an inner client present.
    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let client = GrpcClient::new(&Endpoint::devnet(), 10000);

        // "Connect" the client
        client.connect().await.unwrap();

        let commitment = Word::default();
        client.set_genesis_commitment(commitment).await.unwrap();

        assert_eq!(*client.genesis_commitment.read(), Some(commitment));
        assert!(client.client.read().is_some());
    }
}