Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::asset::{Asset, AssetVault};
10use miden_protocol::vm::FutureMaybeSend;
11
12type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
13
14use miden_protocol::account::{
15    Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
16};
17use miden_protocol::address::NetworkId;
18use miden_protocol::block::account_tree::AccountWitness;
19use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
20use miden_protocol::crypto::merkle::MerklePath;
21use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
22use miden_protocol::crypto::merkle::smt::SmtProof;
23use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
24use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
25use miden_protocol::utils::serde::Deserializable;
26use miden_protocol::{EMPTY_WORD, Word};
27use miden_tx::utils::serde::Serializable;
28use miden_tx::utils::sync::RwLock;
29use tonic::Status;
30use tracing::info;
31
32use super::domain::account::{
33    AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
34};
35use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
36use super::generated::rpc::account_request::AccountDetailRequest;
37use super::generated::rpc::AccountRequest;
38use super::{
39    Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
40    RpcStatusInfo,
41};
42use crate::rpc::domain::sync::ChainMmrInfo;
43use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
44use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
45use crate::rpc::domain::transaction::TransactionsInfo;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
49use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
50use crate::rpc::generated::rpc::BlockRange;
51use crate::rpc::domain::limits::RpcLimits;
52use crate::rpc::{AccountStateAt, generated as proto};
53
54mod api_client;
55mod retry;
56
57use api_client::api_client_wrapper::ApiClient;
58
59/// Tracks the pagination state for block-driven endpoints.
60struct BlockPagination {
61    current_block_from: BlockNumber,
62    block_to: Option<BlockNumber>,
63    iterations: u32,
64}
65
66enum PaginationResult {
67    Continue,
68    Done {
69        chain_tip: BlockNumber,
70        block_num: BlockNumber,
71    },
72}
73
74impl BlockPagination {
75    /// Maximum number of pagination iterations for a single request.
76    ///
77    /// Protects against nodes returning inconsistent pagination data that could otherwise
78    /// trigger an infinite loop.
79    const MAX_ITERATIONS: u32 = 1000;
80
81    fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
82        Self {
83            current_block_from: block_from,
84            block_to,
85            iterations: 0,
86        }
87    }
88
89    fn current_block_from(&self) -> BlockNumber {
90        self.current_block_from
91    }
92
93    fn block_to(&self) -> Option<BlockNumber> {
94        self.block_to
95    }
96
97    fn advance(
98        &mut self,
99        block_num: BlockNumber,
100        chain_tip: BlockNumber,
101    ) -> Result<PaginationResult, RpcError> {
102        if self.iterations >= Self::MAX_ITERATIONS {
103            return Err(RpcError::PaginationError(
104                "too many pagination iterations, possible infinite loop".to_owned(),
105            ));
106        }
107        self.iterations += 1;
108
109        if block_num < self.current_block_from {
110            return Err(RpcError::PaginationError(
111                "invalid pagination: block_num went backwards".to_owned(),
112            ));
113        }
114
115        let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
116
117        if block_num >= target_block {
118            return Ok(PaginationResult::Done { chain_tip, block_num });
119        }
120
121        self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
122
123        Ok(PaginationResult::Continue)
124    }
125}
126
127// GRPC CLIENT
128// ================================================================================================
129
130/// Client for the Node RPC API using gRPC.
131///
132/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
133/// communicate with the node. In this case the connection will be established lazily when the
134/// first request is made.
135/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
136/// to communicate with the node.
137///
138/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
139/// are generated by the build script and also depend on the target architecture.
140pub struct GrpcClient {
141    /// The underlying gRPC client, lazily initialized on first request.
142    client: RwLock<Option<ApiClient>>,
143    /// The node endpoint URL to connect to.
144    endpoint: String,
145    /// Request timeout in milliseconds.
146    timeout_ms: u64,
147    /// The genesis block commitment, used for request validation by the node.
148    genesis_commitment: RwLock<Option<Word>>,
149    /// Cached RPC limits fetched from the node.
150    limits: RwLock<Option<RpcLimits>>,
151    /// Maximum number of retry attempts for rate-limited or transiently unavailable requests.
152    max_retries: u32,
153    /// Fallback retry interval in milliseconds when no `retry-after` header is present.
154    retry_interval_ms: u64,
155}
156
157impl GrpcClient {
158    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
159    /// with the given timeout in milliseconds.
160    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
161        GrpcClient {
162            client: RwLock::new(None),
163            endpoint: endpoint.to_string(),
164            timeout_ms,
165            genesis_commitment: RwLock::new(None),
166            limits: RwLock::new(None),
167            max_retries: retry::DEFAULT_MAX_RETRIES,
168            retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
169        }
170    }
171
172    /// Sets the maximum number of retry attempts for rate-limited or transiently unavailable
173    /// requests. Defaults to `4`.
174    #[must_use]
175    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
176        self.max_retries = max_retries;
177        self
178    }
179
180    /// Sets the fallback retry interval in milliseconds, used when the server does not provide
181    /// a `retry-after` header. Defaults to `100` ms.
182    #[must_use]
183    pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
184        self.retry_interval_ms = retry_interval_ms;
185        self
186    }
187
188    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
189    /// `rpc_api` field is initialized and returns a write guard to it.
190    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
191        if self.client.read().is_none() {
192            self.connect().await?;
193        }
194
195        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
196    }
197
198    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
199    /// genesis commitment.
200    async fn connect(&self) -> Result<(), RpcError> {
201        let genesis_commitment = *self.genesis_commitment.read();
202        let new_client =
203            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
204                .await?;
205        let mut client = self.client.write();
206        client.replace(new_client);
207
208        Ok(())
209    }
210
211    fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
212        let genesis_commitment = self
213            .genesis_commitment
214            .read()
215            .as_ref()
216            .map_or_else(|| "none".to_string(), Word::to_hex);
217        let context = AcceptHeaderContext {
218            client_version: env!("CARGO_PKG_VERSION").to_string(),
219            genesis_commitment,
220        };
221        RpcError::from_grpc_error_with_context(endpoint, status, context)
222    }
223
224    /// Executes an RPC call and automatically retries transient failures.
225    ///
226    /// The provided closure is invoked with a freshly connected [`ApiClient`] on each attempt.
227    /// Retries are delegated to [`retry::RetryState`], which currently handles gRPC
228    /// [`tonic::Code::ResourceExhausted`] and [`tonic::Code::Unavailable`] responses, including
229    /// honoring cooldown delays when the node provides them.
230    ///
231    /// Returns the first successful gRPC response. If the call keeps failing after retries are
232    /// exhausted, or if the error is not retryable, this returns the corresponding [`RpcError`]
233    /// for the provided [`RpcEndpoint`].
234    async fn call_with_retry<T: Send + 'static>(
235        &self,
236        endpoint: RpcEndpoint,
237        mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
238    ) -> Result<tonic::Response<T>, RpcError> {
239        let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
240
241        loop {
242            let rpc_api = self.ensure_connected().await?;
243
244            match call(rpc_api).await {
245                Ok(response) => return Ok(response),
246                Err(status) if retry_state.should_retry(&status).await => {},
247                Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
248            }
249        }
250    }
251
252    /// Fetches RPC status without injecting an Accept header.
253    ///
254    /// This instantiates a separate API client without the Accept interceptor, so it does not
255    /// reuse the primary gRPC client.
256    pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
257        let mut rpc_api =
258            ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
259                .await?;
260        rpc_api
261            .status(())
262            .await
263            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
264            .map(tonic::Response::into_inner)
265            .and_then(RpcStatusInfo::try_from)
266    }
267
268    // GET ACCOUNT HELPERS
269    // ============================================================================================
270
271    /// Given an [`AccountId`], return the proof for the account.
272    ///
273    /// If the account also has public state, its details will also be retrieved
274    pub async fn fetch_full_account_proof(
275        &self,
276        account_id: AccountId,
277    ) -> Result<(BlockNumber, AccountProof), RpcError> {
278        let has_public_state = account_id.has_public_state();
279        let account_request = {
280            AccountRequest {
281                account_id: Some(account_id.into()),
282                block_num: None,
283                details: {
284                    if has_public_state {
285                        // Since we have to request the storage maps for an account
286                        // we *dont know* anything about, we'll have to do first this
287                        // request, which will tell us about the account's storage slots,
288                        // and then, request the slots in another request.
289                        Some(AccountDetailRequest {
290                            code_commitment: Some(EMPTY_WORD.into()),
291                            asset_vault_commitment: Some(EMPTY_WORD.into()),
292                            storage_maps: vec![],
293                        })
294                    } else {
295                        None
296                    }
297                },
298            }
299        };
300        let account_response = self
301            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
302                let request = account_request.clone();
303                Box::pin(async move { rpc_api.get_account(request).await })
304            })
305            .await?
306            .into_inner();
307        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
308            "GetAccountDetails returned an account without a matching block number for the witness"
309                .to_owned(),
310        ))?;
311        let account_proof = {
312            if has_public_state {
313                let account_details = account_response
314                    .details
315                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
316                    .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
317                let storage_header = account_details.storage_details.header;
318                // This variable will hold the storage slots that are maps, below we will use it to
319                // actually fetch the storage maps details, since we now know the names of each
320                // storage slot.
321                let maps_to_request = storage_header
322                    .slots()
323                    .filter(|header| header.slot_type().is_map())
324                    .map(|map| map.name().to_string());
325                let account_request = AccountRequest {
326                    account_id: Some(account_id.into()),
327                    block_num: Some(block_number),
328                    details: Some(AccountDetailRequest {
329                        code_commitment: Some(EMPTY_WORD.into()),
330                        asset_vault_commitment: Some(EMPTY_WORD.into()),
331                        storage_maps: maps_to_request
332                            .map(|slot_name| StorageMapDetailRequest {
333                                slot_name,
334                                slot_data: Some(SlotData::AllEntries(true)),
335                            })
336                            .collect(),
337                    }),
338                };
339                let response = self
340                    .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
341                        let request = account_request.clone();
342                        Box::pin(async move { rpc_api.get_account(request).await })
343                    })
344                    .await?;
345                response.into_inner().try_into()
346            } else {
347                account_response.try_into()
348            }
349        };
350        Ok((block_number.block_num.into(), account_proof?))
351    }
352
353    /// Given the storage details for an account and its id, returns a vector with all of its
354    /// storage slots. Keep in mind that if an account triggers the `too_many_entries` flag, there
355    /// will potentially be multiple requests.
356    async fn build_storage_slots(
357        &self,
358        account_id: AccountId,
359        storage_details: &AccountStorageDetails,
360        block_to: Option<BlockNumber>,
361    ) -> Result<Vec<StorageSlot>, RpcError> {
362        let mut slots = vec![];
363        // `SyncStorageMaps` will return information for *every* map for a given account, so this
364        // map_cache value should be fetched only once, hence the None placeholder
365        let mut map_cache: Option<StorageMapInfo> = None;
366        for slot_header in storage_details.header.slots() {
367            // We have two cases for each slot:
368            // - Slot is a value => We simply instance a StorageSlot
369            // - Slot is a map => If the map is 'small', we can simply
370            // build the map from the given entries. Otherwise we will have to
371            // call the SyncStorageMaps RPC method to obtain the data for the map.
372            // With the current setup, one RPC call should be enough.
373            match slot_header.slot_type() {
374                StorageSlotType::Value => {
375                    slots.push(miden_protocol::account::StorageSlot::with_value(
376                        slot_header.name().clone(),
377                        slot_header.value(),
378                    ));
379                },
380                StorageSlotType::Map => {
381                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
382                        RpcError::ExpectedDataMissing(format!(
383                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
384                            slot_header.name(),
385                        )),
386                    )?;
387
388                    let storage_map = if map_details.too_many_entries {
389                        let map_info = if let Some(ref info) = map_cache {
390                            info
391                        } else {
392                            let fetched_data =
393                                self.sync_storage_maps(0_u32.into(), block_to, account_id).await?;
394                            map_cache.insert(fetched_data)
395                        };
396                        // The sync endpoint may return multiple updates for the same key
397                        // across different blocks. We sort by block number so that
398                        // inserting into the map keeps only the latest value per key.
399                        let mut sorted_updates: Vec<_> = map_info
400                            .updates
401                            .iter()
402                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
403                            .collect();
404                        sorted_updates.sort_by_key(|u| u.block_num);
405                        let map_entries: Vec<_> = sorted_updates
406                            .into_iter()
407                            .map(|u| (u.key, u.value))
408                            .collect::<BTreeMap<_, _>>()
409                            .into_iter()
410                            .collect();
411                        StorageMap::with_entries(map_entries)
412                    } else {
413                        map_details.entries.clone().into_storage_map().ok_or_else(|| {
414                            RpcError::ExpectedDataMissing(
415                                "expected AllEntries for full account fetch, got EntriesWithProofs"
416                                    .into(),
417                            )
418                        })?
419                    }
420                    .map_err(|err| {
421                        RpcError::InvalidResponse(format!(
422                            "the rpc api returned a non-valid map entry: {err}"
423                        ))
424                    })?;
425
426                    slots.push(miden_protocol::account::StorageSlot::with_map(
427                        slot_header.name().clone(),
428                        storage_map,
429                    ));
430                },
431            }
432        }
433        Ok(slots)
434    }
435
436    /// Fetches the full vault assets via `SyncAccountVault`.
437    ///
438    /// Used when `too_many_assets` is set in the response. Deduplicates updates by
439    /// vault key, keeping only the latest value per key.
440    async fn fetch_full_vault(
441        &self,
442        account_id: AccountId,
443        block_to: Option<BlockNumber>,
444    ) -> Result<Vec<Asset>, RpcError> {
445        let vault_info =
446            self.sync_account_vault(BlockNumber::from(0), block_to, account_id).await?;
447        let mut updates = vault_info.updates;
448        updates.sort_by_key(|u| u.block_num);
449        Ok(updates
450            .into_iter()
451            .map(|u| (u.vault_key, u.asset))
452            .collect::<BTreeMap<_, _>>()
453            .into_values()
454            .flatten()
455            .collect())
456    }
457}
458
459#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
460#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
461impl NodeRpcClient for GrpcClient {
462    /// Sets the genesis commitment for the client. If the client is already connected, it will be
463    /// updated to use the new commitment on subsequent requests. If the client is not connected,
464    /// the commitment will be stored and used when the client connects. If the genesis commitment
465    /// is already set, this method does nothing.
466    fn has_genesis_commitment(&self) -> Option<Word> {
467        *self.genesis_commitment.read()
468    }
469
470    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
471        // Check if already set before doing anything else
472        if self.genesis_commitment.read().is_some() {
473            // Genesis commitment is already set, ignoring the new value.
474            return Ok(());
475        }
476
477        // Store the commitment for future connections
478        self.genesis_commitment.write().replace(commitment);
479
480        // If a client is already connected, update it to use the new genesis commitment.
481        // If not connected, the commitment will be used when connect() is called.
482        let mut client_guard = self.client.write();
483        if let Some(client) = client_guard.as_mut() {
484            client.set_genesis_commitment(commitment);
485        }
486
487        Ok(())
488    }
489
490    async fn submit_proven_transaction(
491        &self,
492        proven_transaction: ProvenTransaction,
493        transaction_inputs: TransactionInputs,
494    ) -> Result<BlockNumber, RpcError> {
495        let request = proto::transaction::ProvenTransaction {
496            transaction: proven_transaction.to_bytes(),
497            transaction_inputs: Some(transaction_inputs.to_bytes()),
498        };
499
500        let api_response = self
501            .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
502                let request = request.clone();
503                Box::pin(async move { rpc_api.submit_proven_transaction(request).await })
504            })
505            .await?;
506
507        Ok(BlockNumber::from(api_response.into_inner().block_num))
508    }
509
510    async fn get_block_header_by_number(
511        &self,
512        block_num: Option<BlockNumber>,
513        include_mmr_proof: bool,
514    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
515        let request = proto::rpc::BlockHeaderByNumberRequest {
516            block_num: block_num.as_ref().map(BlockNumber::as_u32),
517            include_mmr_proof: Some(include_mmr_proof),
518        };
519
520        info!("Calling GetBlockHeaderByNumber: {:?}", request);
521
522        let api_response = self
523            .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
524                Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
525            })
526            .await?;
527
528        let response = api_response.into_inner();
529
530        let block_header: BlockHeader = response
531            .block_header
532            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
533            .try_into()?;
534
535        let mmr_proof = if include_mmr_proof {
536            let forest = response
537                .chain_length
538                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
539            let merkle_path: MerklePath = response
540                .mmr_path
541                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
542                .try_into()?;
543
544            Some(MmrProof::new(
545                MmrPath::new(
546                    Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
547                    block_header.block_num().as_usize(),
548                    merkle_path,
549                ),
550                block_header.commitment(),
551            ))
552        } else {
553            None
554        };
555
556        Ok((block_header, mmr_proof))
557    }
558
559    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
560        let limits = self.get_rpc_limits().await?;
561        let mut notes = Vec::with_capacity(note_ids.len());
562        for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
563            let request = proto::note::NoteIdList {
564                ids: chunk.iter().map(|id| (*id).into()).collect(),
565            };
566
567            let api_response = self
568                .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
569                    let request = request.clone();
570                    Box::pin(async move { rpc_api.get_notes_by_id(request).await })
571                })
572                .await?;
573
574            let response_notes = api_response
575                .into_inner()
576                .notes
577                .into_iter()
578                .map(FetchedNote::try_from)
579                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
580
581            notes.extend(response_notes);
582        }
583        Ok(notes)
584    }
585
586    async fn sync_chain_mmr(
587        &self,
588        block_from: BlockNumber,
589        block_to: Option<BlockNumber>,
590    ) -> Result<ChainMmrInfo, RpcError> {
591        let block_range = Some(BlockRange {
592            block_from: block_from.as_u32(),
593            block_to: block_to.map(|b| b.as_u32()),
594        });
595
596        let request = proto::rpc::SyncChainMmrRequest {
597            block_range,
598            finality: proto::rpc::Finality::Committed as i32,
599        };
600
601        let response = self
602            .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
603                Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
604            })
605            .await?;
606
607        response.into_inner().try_into()
608    }
609
610    /// Sends a `GetAccountDetailsRequest` to the Miden node, and extracts an [`FetchedAccount`]
611    /// from the `GetAccountDetailsResponse` response.
612    ///
613    /// # Errors
614    ///
615    /// This function will return an error if:
616    ///
617    /// - There was an error sending the request to the node.
618    /// - The answer had a `None` for one of the expected fields (`account`, `summary`,
619    ///   `account_commitment`, `details`).
620    /// - There is an error during [Account] deserialization.
621    async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
622        let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
623        let update_summary =
624            AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
625
626        // The case for a private account is simple,
627        // we simple use the commitment and its id.
628        if account_id.is_private() {
629            Ok(FetchedAccount::new_private(account_id, update_summary))
630        } else {
631            // An account with public state has to fetch all of its state.
632            // Even more so, an account with a large state will have to do
633            // a couple of extra requests to fetch all of its data.
634            let details =
635                full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
636                    "GetAccountDetails returned a public account without details".to_owned(),
637                ))?;
638            let account_id = details.header.id();
639            let nonce = details.header.nonce();
640            let assets = if details.vault_details.too_many_assets {
641                self.fetch_full_vault(account_id, Some(block_number)).await?
642            } else {
643                details.vault_details.assets
644            };
645
646            let slots = self
647                .build_storage_slots(account_id, &details.storage_details, Some(block_number))
648                .await?;
649            let seed = None;
650            let asset_vault = AssetVault::new(&assets).map_err(|err| {
651                RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
652            })?;
653            let account_storage = AccountStorage::new(slots).map_err(|err| {
654                RpcError::InvalidResponse(format!(
655                    "api rpc returned non-valid storage slots: {err}"
656                ))
657            })?;
658            let account =
659                Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
660                    .map_err(|err| {
661                    RpcError::InvalidResponse(format!(
662                        "failed to instance an account from the rpc api response: {err}"
663                    ))
664                })?;
665            Ok(FetchedAccount::new_public(account, update_summary))
666        }
667    }
668
669    /// Sends a `GetAccountProof` request to the Miden node, and extracts the [AccountProof]
670    /// from the response, as well as the block number that it was retrieved for.
671    ///
672    /// # Errors
673    ///
674    /// This function will return an error if:
675    ///
676    /// - The requested Account isn't returned by the node.
677    /// - There was an error sending the request to the node.
678    /// - The answer had a `None` for one of the expected fields.
679    /// - There is an error during storage deserialization.
680    async fn get_account_proof(
681        &self,
682        account_id: AccountId,
683        storage_requirements: AccountStorageRequirements,
684        account_state: AccountStateAt,
685        known_account_code: Option<AccountCode>,
686        known_vault_commitment: Option<Word>,
687    ) -> Result<(BlockNumber, AccountProof), RpcError> {
688        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
689        if let Some(account_code) = known_account_code {
690            known_codes_by_commitment.insert(account_code.commitment(), account_code);
691        }
692
693        let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
694
695        // Only request details for accounts with public state (Public or Network);
696        // include known code commitment for this account when available
697        let account_details = if account_id.has_public_state() {
698            Some(AccountDetailRequest {
699                code_commitment: Some(EMPTY_WORD.into()),
700                asset_vault_commitment: known_vault_commitment.map(Into::into),
701                storage_maps,
702            })
703        } else {
704            None
705        };
706
707        let block_num = match account_state {
708            AccountStateAt::Block(number) => Some(number.into()),
709            AccountStateAt::ChainTip => None,
710        };
711
712        let request = AccountRequest {
713            account_id: Some(account_id.into()),
714            block_num,
715            details: account_details,
716        };
717
718        let response = self
719            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
720                let request = request.clone();
721                Box::pin(async move { rpc_api.get_account(request).await })
722            })
723            .await?
724            .into_inner();
725
726        let account_witness: AccountWitness = response
727            .witness
728            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
729            .try_into()?;
730
731        let block_num: BlockNumber = response
732            .block_num
733            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
734            .block_num
735            .into();
736
737        // For accounts with public state, details should be present when requested
738        let headers = if account_witness.id().has_public_state() {
739            let mut details = response
740                .details
741                .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
742                .into_domain(&known_codes_by_commitment, &storage_requirements)?;
743
744            if details.vault_details.too_many_assets {
745                details.vault_details.assets =
746                    self.fetch_full_vault(account_id, Some(block_num)).await?;
747            }
748
749            Some(details)
750        } else {
751            None
752        };
753
754        let proof = AccountProof::new(account_witness, headers)
755            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
756
757        Ok((block_num, proof))
758    }
759
760    /// Sends a `SyncNoteRequest` to the Miden node, and extracts a [`NoteSyncInfo`] from the
761    /// response.
762    async fn sync_notes(
763        &self,
764        block_num: BlockNumber,
765        block_to: Option<BlockNumber>,
766        note_tags: &BTreeSet<NoteTag>,
767    ) -> Result<NoteSyncInfo, RpcError> {
768        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
769
770        let block_range = Some(BlockRange {
771            block_from: block_num.as_u32(),
772            block_to: block_to.map(|b| b.as_u32()),
773        });
774
775        let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
776
777        let response = self
778            .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
779                let request = request.clone();
780                Box::pin(async move { rpc_api.sync_notes(request).await })
781            })
782            .await?;
783
784        response.into_inner().try_into()
785    }
786
787    async fn sync_nullifiers(
788        &self,
789        prefixes: &[u16],
790        block_num: BlockNumber,
791        block_to: Option<BlockNumber>,
792    ) -> Result<Vec<NullifierUpdate>, RpcError> {
793        const MAX_ITERATIONS: u32 = 1000; // Safety limit to prevent infinite loops
794
795        let limits = self.get_rpc_limits().await?;
796        let mut all_nullifiers = BTreeSet::new();
797
798        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
799        // violating the RPC limit.
800        'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
801            let mut current_block_from = block_num.as_u32();
802
803            for _ in 0..MAX_ITERATIONS {
804                let request = proto::rpc::SyncNullifiersRequest {
805                    nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
806                    prefix_len: 16,
807                    block_range: Some(BlockRange {
808                        block_from: current_block_from,
809                        block_to: block_to.map(|b| b.as_u32()),
810                    }),
811                };
812
813                let response = self
814                    .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
815                        let request = request.clone();
816                        Box::pin(async move { rpc_api.sync_nullifiers(request).await })
817                    })
818                    .await?;
819                let response = response.into_inner();
820
821                // Convert nullifiers for this batch
822                let batch_nullifiers = response
823                    .nullifiers
824                    .iter()
825                    .map(TryFrom::try_from)
826                    .collect::<Result<Vec<NullifierUpdate>, _>>()
827                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
828
829                all_nullifiers.extend(batch_nullifiers);
830
831                // Check if we need to fetch more pages
832                if let Some(page) = response.pagination_info {
833                    // Ensure we're making progress to avoid infinite loops
834                    if page.block_num < current_block_from {
835                        return Err(RpcError::PaginationError(
836                            "invalid pagination: block_num went backwards".to_string(),
837                        ));
838                    }
839
840                    // Calculate target block as minimum between block_to and chain_tip
841                    let target_block =
842                        block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
843
844                    if page.block_num >= target_block {
845                        // No pagination info or we've reached/passed the target so we're done
846                        continue 'chunk_nullifiers;
847                    }
848                    current_block_from = page.block_num + 1;
849                }
850            }
851            // If we exit the loop, we've hit the iteration limit
852            return Err(RpcError::PaginationError(
853                "too many pagination iterations, possible infinite loop".to_string(),
854            ));
855        }
856        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
857    }
858
859    async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
860        let limits = self.get_rpc_limits().await?;
861        let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
862        for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
863            let request = proto::rpc::NullifierList {
864                nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
865            };
866
867            let response = self
868                .call_with_retry(RpcEndpoint::CheckNullifiers, |mut rpc_api| {
869                    let request = request.clone();
870                    Box::pin(async move { rpc_api.check_nullifiers(request).await })
871                })
872                .await?;
873
874            let mut response = response.into_inner();
875            let chunk_proofs = response
876                .proofs
877                .iter_mut()
878                .map(|r| r.to_owned().try_into())
879                .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
880            proofs.extend(chunk_proofs);
881        }
882        Ok(proofs)
883    }
884
885    async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
886        let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
887
888        let response = self
889            .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
890                Box::pin(async move { rpc_api.get_block_by_number(request).await })
891            })
892            .await?;
893
894        let response = response.into_inner();
895        let block =
896            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
897                "GetBlockByNumberResponse.block".to_string(),
898            ))?)?;
899
900        Ok(block)
901    }
902
903    async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
904        let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
905
906        let response = self
907            .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
908                Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
909            })
910            .await?;
911
912        let response = response.into_inner();
913        let note_script = NoteScript::try_from(
914            response
915                .script
916                .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
917        )?;
918
919        Ok(note_script)
920    }
921
922    async fn sync_storage_maps(
923        &self,
924        block_from: BlockNumber,
925        block_to: Option<BlockNumber>,
926        account_id: AccountId,
927    ) -> Result<StorageMapInfo, RpcError> {
928        let mut pagination = BlockPagination::new(block_from, block_to);
929        let mut updates = Vec::new();
930
931        let (chain_tip, block_number) = loop {
932            let request = proto::rpc::SyncAccountStorageMapsRequest {
933                block_range: Some(BlockRange {
934                    block_from: pagination.current_block_from().as_u32(),
935                    block_to: pagination.block_to().map(|block| block.as_u32()),
936                }),
937                account_id: Some(account_id.into()),
938            };
939            let response = self
940                .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
941                    let request = request.clone();
942                    Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
943                })
944                .await?;
945            let response = response.into_inner();
946            let page = response
947                .pagination_info
948                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
949            let page_block_num = BlockNumber::from(page.block_num);
950            let page_chain_tip = BlockNumber::from(page.chain_tip);
951            let batch = response
952                .updates
953                .into_iter()
954                .map(TryInto::try_into)
955                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
956            updates.extend(batch);
957
958            match pagination.advance(page_block_num, page_chain_tip)? {
959                PaginationResult::Continue => {},
960                PaginationResult::Done {
961                    chain_tip: final_chain_tip,
962                    block_num: final_block_num,
963                } => break (final_chain_tip, final_block_num),
964            }
965        };
966
967        Ok(StorageMapInfo { chain_tip, block_number, updates })
968    }
969
970    async fn sync_account_vault(
971        &self,
972        block_from: BlockNumber,
973        block_to: Option<BlockNumber>,
974        account_id: AccountId,
975    ) -> Result<AccountVaultInfo, RpcError> {
976        let mut pagination = BlockPagination::new(block_from, block_to);
977        let mut updates = Vec::new();
978
979        let (chain_tip, block_number) = loop {
980            let request = proto::rpc::SyncAccountVaultRequest {
981                block_range: Some(BlockRange {
982                    block_from: pagination.current_block_from().as_u32(),
983                    block_to: pagination.block_to().map(|block| block.as_u32()),
984                }),
985                account_id: Some(account_id.into()),
986            };
987            let response = self
988                .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
989                    let request = request.clone();
990                    Box::pin(async move { rpc_api.sync_account_vault(request).await })
991                })
992                .await?;
993            let response = response.into_inner();
994            let page = response
995                .pagination_info
996                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
997            let page_block_num = BlockNumber::from(page.block_num);
998            let page_chain_tip = BlockNumber::from(page.chain_tip);
999            let batch = response
1000                .updates
1001                .iter()
1002                .map(|u| (*u).try_into())
1003                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
1004            updates.extend(batch);
1005
1006            match pagination.advance(page_block_num, page_chain_tip)? {
1007                PaginationResult::Continue => {},
1008                PaginationResult::Done {
1009                    chain_tip: final_chain_tip,
1010                    block_num: final_block_num,
1011                } => break (final_chain_tip, final_block_num),
1012            }
1013        };
1014
1015        Ok(AccountVaultInfo { chain_tip, block_number, updates })
1016    }
1017
1018    async fn sync_transactions(
1019        &self,
1020        block_from: BlockNumber,
1021        block_to: Option<BlockNumber>,
1022        account_ids: Vec<AccountId>,
1023    ) -> Result<TransactionsInfo, RpcError> {
1024        let block_range = Some(BlockRange {
1025            block_from: block_from.as_u32(),
1026            block_to: block_to.map(|b| b.as_u32()),
1027        });
1028
1029        let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
1030
1031        let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
1032
1033        let response = self
1034            .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
1035                let request = request.clone();
1036                Box::pin(async move { rpc_api.sync_transactions(request).await })
1037            })
1038            .await?;
1039
1040        response.into_inner().try_into()
1041    }
1042
1043    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
1044        let endpoint: Endpoint =
1045            Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
1046        Ok(endpoint.to_network_id())
1047    }
1048
1049    async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
1050        // Return cached limits if available
1051        if let Some(limits) = *self.limits.read() {
1052            return Ok(limits);
1053        }
1054
1055        // Fetch limits from the node
1056        let response = self
1057            .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
1058                Box::pin(async move { rpc_api.get_limits(()).await })
1059            })
1060            .await?;
1061        let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
1062
1063        // Cache fetched values
1064        self.limits.write().replace(limits);
1065        Ok(limits)
1066    }
1067
1068    fn has_rpc_limits(&self) -> Option<RpcLimits> {
1069        *self.limits.read()
1070    }
1071
1072    async fn set_rpc_limits(&self, limits: RpcLimits) {
1073        self.limits.write().replace(limits);
1074    }
1075
1076    async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
1077        GrpcClient::get_status_unversioned(self).await
1078    }
1079}
1080
1081// ERRORS
1082// ================================================================================================
1083
1084impl RpcError {
1085    pub fn from_grpc_error_with_context(
1086        endpoint: RpcEndpoint,
1087        status: Status,
1088        context: AcceptHeaderContext,
1089    ) -> Self {
1090        if let Some(accept_error) =
1091            AcceptHeaderError::try_from_message_with_context(status.message(), context)
1092        {
1093            return Self::AcceptHeaderError(accept_error);
1094        }
1095
1096        // Parse application-level error from status details
1097        let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
1098
1099        let error_kind = GrpcError::from(&status);
1100        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
1101
1102        Self::RequestError {
1103            endpoint,
1104            error_kind,
1105            endpoint_error,
1106            source: Some(source),
1107        }
1108    }
1109}
1110
1111impl From<&Status> for GrpcError {
1112    fn from(status: &Status) -> Self {
1113        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1114    }
1115}
1116
1117#[cfg(test)]
1118mod tests {
1119    use std::boxed::Box;
1120
1121    use miden_protocol::Word;
1122    use miden_protocol::block::BlockNumber;
1123
1124    use super::{BlockPagination, GrpcClient, PaginationResult};
1125    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};
1126
1127    fn assert_send_sync<T: Send + Sync>() {}
1128
1129    #[test]
1130    fn is_send_sync() {
1131        assert_send_sync::<GrpcClient>();
1132        assert_send_sync::<Box<dyn NodeRpcClient>>();
1133    }
1134
1135    #[test]
1136    fn block_pagination_errors_when_block_num_goes_backwards() {
1137        let mut pagination = BlockPagination::new(10_u32.into(), None);
1138
1139        let res = pagination.advance(9_u32.into(), 20_u32.into());
1140        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1141    }
1142
1143    #[test]
1144    fn block_pagination_errors_after_max_iterations() {
1145        let mut pagination = BlockPagination::new(0_u32.into(), None);
1146        let chain_tip: BlockNumber = 10_000_u32.into();
1147
1148        for _ in 0..BlockPagination::MAX_ITERATIONS {
1149            let current = pagination.current_block_from();
1150            let res = pagination
1151                .advance(current, chain_tip)
1152                .expect("expected pagination to continue within iteration limit");
1153            assert!(matches!(res, PaginationResult::Continue));
1154        }
1155
1156        let res = pagination.advance(pagination.current_block_from(), chain_tip);
1157        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1158    }
1159
1160    #[test]
1161    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
1162        // block_to is beyond chain tip, so target should be chain_tip.
1163        let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));
1164
1165        let res = pagination
1166            .advance(30_u32.into(), 30_u32.into())
1167            .expect("expected pagination to succeed");
1168
1169        assert!(matches!(
1170            res,
1171            PaginationResult::Done {
1172                chain_tip,
1173                block_num
1174            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
1175        ));
1176    }
1177
1178    #[test]
1179    fn block_pagination_advances_cursor_by_one() {
1180        let mut pagination = BlockPagination::new(5_u32.into(), None);
1181
1182        let res = pagination
1183            .advance(5_u32.into(), 100_u32.into())
1184            .expect("expected pagination to succeed");
1185        assert!(matches!(res, PaginationResult::Continue));
1186        assert_eq!(pagination.current_block_from().as_u32(), 6);
1187    }
1188
1189    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
1190    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
1191        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
1192        let res = client.get_block_header_by_number(None, false).await;
1193        assert!(res.is_ok());
1194    }
1195
1196    #[tokio::test]
1197    async fn future_is_send() {
1198        let endpoint = &Endpoint::devnet();
1199        let client = GrpcClient::new(endpoint, 10000);
1200        let client: Box<GrpcClient> = client.into();
1201        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
1202    }
1203
1204    #[tokio::test]
1205    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
1206        let endpoint = &Endpoint::devnet();
1207        let client = GrpcClient::new(endpoint, 10000);
1208
1209        assert!(client.genesis_commitment.read().is_none());
1210
1211        let commitment = Word::default();
1212        client.set_genesis_commitment(commitment).await.unwrap();
1213
1214        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1215    }
1216
1217    #[tokio::test]
1218    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
1219        use miden_protocol::Felt;
1220
1221        let endpoint = &Endpoint::devnet();
1222        let client = GrpcClient::new(endpoint, 10000);
1223
1224        let initial_commitment = Word::default();
1225        client.set_genesis_commitment(initial_commitment).await.unwrap();
1226
1227        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
1228        client.set_genesis_commitment(new_commitment).await.unwrap();
1229
1230        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
1231    }
1232
1233    #[tokio::test]
1234    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
1235        let endpoint = &Endpoint::devnet();
1236        let client = GrpcClient::new(endpoint, 10000);
1237
1238        // "Connect" the client
1239        client.connect().await.unwrap();
1240
1241        let commitment = Word::default();
1242        client.set_genesis_commitment(commitment).await.unwrap();
1243
1244        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1245        assert!(client.client.read().as_ref().is_some());
1246    }
1247}