Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10    Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{AccountProof, AccountStorageDetails, AccountUpdateSummary};
28use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
29use super::generated::rpc::account_request::AccountDetailRequest;
30use super::generated::rpc::AccountRequest;
31use super::{
32    Endpoint, FetchedAccount, NodeRpcClient, RpcEndpoint, NoteSyncInfo, RpcError,
33    RpcStatusInfo,
34};
35use crate::rpc::domain::sync::ChainMmrInfo;
36use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
37use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
38use crate::rpc::domain::transaction::TransactionsInfo;
39use crate::rpc::errors::node::parse_node_error;
40use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
41use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
42use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
43use crate::rpc::generated::rpc::BlockRange;
44use crate::rpc::domain::limits::RpcLimits;
45use crate::rpc::{AccountStateAt, generated as proto};
46use crate::rpc::domain::account::AccountStorageRequirements;
47
48mod api_client;
49use api_client::api_client_wrapper::ApiClient;
50
51/// Tracks the pagination state for block-driven endpoints.
52struct BlockPagination {
53    current_block_from: BlockNumber,
54    block_to: Option<BlockNumber>,
55    iterations: u32,
56}
57
58enum PaginationResult {
59    Continue,
60    Done {
61        chain_tip: BlockNumber,
62        block_num: BlockNumber,
63    },
64}
65
66impl BlockPagination {
67    /// Maximum number of pagination iterations for a single request.
68    ///
69    /// Protects against nodes returning inconsistent pagination data that could otherwise
70    /// trigger an infinite loop.
71    const MAX_ITERATIONS: u32 = 1000;
72
73    fn new(block_from: BlockNumber, block_to: Option<BlockNumber>) -> Self {
74        Self {
75            current_block_from: block_from,
76            block_to,
77            iterations: 0,
78        }
79    }
80
81    fn current_block_from(&self) -> BlockNumber {
82        self.current_block_from
83    }
84
85    fn block_to(&self) -> Option<BlockNumber> {
86        self.block_to
87    }
88
89    fn advance(
90        &mut self,
91        block_num: BlockNumber,
92        chain_tip: BlockNumber,
93    ) -> Result<PaginationResult, RpcError> {
94        if self.iterations >= Self::MAX_ITERATIONS {
95            return Err(RpcError::PaginationError(
96                "too many pagination iterations, possible infinite loop".to_owned(),
97            ));
98        }
99        self.iterations += 1;
100
101        if block_num < self.current_block_from {
102            return Err(RpcError::PaginationError(
103                "invalid pagination: block_num went backwards".to_owned(),
104            ));
105        }
106
107        let target_block = self.block_to.map_or(chain_tip, |to| to.min(chain_tip));
108
109        if block_num >= target_block {
110            return Ok(PaginationResult::Done { chain_tip, block_num });
111        }
112
113        self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
114
115        Ok(PaginationResult::Continue)
116    }
117}
118
119// GRPC CLIENT
120// ================================================================================================
121
122/// Client for the Node RPC API using gRPC.
123///
124/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
125/// communicate with the node. In this case the connection will be established lazily when the
126/// first request is made.
127/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
128/// to communicate with the node.
129///
130/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
131/// are generated by the build script and also depend on the target architecture.
132pub struct GrpcClient {
133    /// The underlying gRPC client, lazily initialized on first request.
134    client: RwLock<Option<ApiClient>>,
135    /// The node endpoint URL to connect to.
136    endpoint: String,
137    /// Request timeout in milliseconds.
138    timeout_ms: u64,
139    /// The genesis block commitment, used for request validation by the node.
140    genesis_commitment: RwLock<Option<Word>>,
141    /// Cached RPC limits fetched from the node.
142    limits: RwLock<Option<RpcLimits>>,
143}
144
145impl GrpcClient {
146    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
147    /// with the given timeout in milliseconds.
148    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
149        GrpcClient {
150            client: RwLock::new(None),
151            endpoint: endpoint.to_string(),
152            timeout_ms,
153            genesis_commitment: RwLock::new(None),
154            limits: RwLock::new(None),
155        }
156    }
157
158    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
159    /// `rpc_api` field is initialized and returns a write guard to it.
160    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
161        if self.client.read().is_none() {
162            self.connect().await?;
163        }
164
165        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
166    }
167
168    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
169    /// genesis commitment.
170    async fn connect(&self) -> Result<(), RpcError> {
171        let genesis_commitment = *self.genesis_commitment.read();
172        let new_client =
173            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
174                .await?;
175        let mut client = self.client.write();
176        client.replace(new_client);
177
178        Ok(())
179    }
180
181    fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
182        let genesis_commitment = self
183            .genesis_commitment
184            .read()
185            .as_ref()
186            .map_or_else(|| "none".to_string(), Word::to_hex);
187        let context = AcceptHeaderContext {
188            client_version: env!("CARGO_PKG_VERSION").to_string(),
189            genesis_commitment,
190        };
191        RpcError::from_grpc_error_with_context(endpoint, status, context)
192    }
193
194    /// Fetches RPC status without injecting an Accept header.
195    ///
196    /// This instantiates a separate API client without the Accept interceptor, so it does not
197    /// reuse the primary gRPC client.
198    pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
199        let mut rpc_api =
200            ApiClient::new_client_without_accept_header(self.endpoint.clone(), self.timeout_ms)
201                .await?;
202        rpc_api
203            .status(())
204            .await
205            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
206            .map(tonic::Response::into_inner)
207            .and_then(RpcStatusInfo::try_from)
208    }
209
210    // GET ACCOUNT HELPERS
211    // ============================================================================================
212
213    /// Given an [`AccountId`], return the proof for the account.
214    ///
215    /// If the account also has public state, its details will also be retrieved
216    pub async fn fetch_full_account_proof(
217        &self,
218        account_id: AccountId,
219    ) -> Result<(BlockNumber, AccountProof), RpcError> {
220        let mut rpc_api = self.ensure_connected().await?;
221        let has_public_state = account_id.has_public_state();
222        let account_request = {
223            AccountRequest {
224                account_id: Some(account_id.into()),
225                block_num: None,
226                details: {
227                    if has_public_state {
228                        // Since we have to request the storage maps for an account
229                        // we *dont know* anything about, we'll have to do first this
230                        // request, which will tell us about the account's storage slots,
231                        // and then, request the slots in another request.
232                        Some(AccountDetailRequest {
233                            code_commitment: Some(EMPTY_WORD.into()),
234                            asset_vault_commitment: Some(EMPTY_WORD.into()),
235                            storage_maps: vec![],
236                        })
237                    } else {
238                        None
239                    }
240                },
241            }
242        };
243        let account_response = rpc_api
244            .get_account(account_request)
245            .await
246            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetAccount, status))?
247            .into_inner();
248        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
249            "GetAccountDetails returned an account without a matching block number for the witness"
250                .to_owned(),
251        ))?;
252        let account_proof = {
253            if has_public_state {
254                let account_details = account_response
255                    .details
256                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
257                    .into_domain(&BTreeMap::new())?;
258                let storage_header = account_details.storage_details.header;
259                // This variable will hold the storage slots that are maps, below we will use it to
260                // actually fetch the storage maps details, since we now know the names of each
261                // storage slot.
262                let maps_to_request = storage_header
263                    .slots()
264                    .filter(|header| header.slot_type().is_map())
265                    .map(|map| map.name().to_string());
266                let account_request = AccountRequest {
267                    account_id: Some(account_id.into()),
268                    block_num: None,
269                    details: Some(AccountDetailRequest {
270                        code_commitment: Some(EMPTY_WORD.into()),
271                        asset_vault_commitment: Some(EMPTY_WORD.into()),
272                        storage_maps: maps_to_request
273                            .map(|slot_name| StorageMapDetailRequest {
274                                slot_name,
275                                slot_data: Some(SlotData::AllEntries(true)),
276                            })
277                            .collect(),
278                    }),
279                };
280                match rpc_api.get_account(account_request).await {
281                    Ok(account_proof) => account_proof.into_inner().try_into(),
282                    Err(err) => Err(RpcError::ConnectionError(
283                        format!(
284                            "failed to fetch account proof for account: {account_id}, got: {err}"
285                        )
286                        .into(),
287                    )),
288                }
289            } else {
290                account_response.try_into()
291            }
292        };
293        Ok((block_number.block_num.into(), account_proof?))
294    }
295
296    /// Given the storage details for an account and its id, returns a vector with all of its
297    /// storage slots. Keep in mind that if an account triggers the `too_many_entries` flag, there
298    /// will potentially be multiple requests.
299    async fn build_storage_slots(
300        &self,
301        account_id: AccountId,
302        storage_details: &AccountStorageDetails,
303    ) -> Result<Vec<StorageSlot>, RpcError> {
304        let mut slots = vec![];
305        // `SyncStorageMaps` will return information for *every* map for a given account, so this
306        // map_cache value should be fetched only once, hence the None placeholder
307        let mut map_cache: Option<StorageMapInfo> = None;
308        for slot_header in storage_details.header.slots() {
309            // We have two cases for each slot:
310            // - Slot is a value => We simply instance a StorageSlot
311            // - Slot is a map => If the map is 'small', we can simply
312            // build the map from the given entries. Otherwise we will have to
313            // call the SyncStorageMaps RPC method to obtain the data for the map.
314            // With the current setup, one RPC call should be enough.
315            match slot_header.slot_type() {
316                StorageSlotType::Value => {
317                    slots.push(miden_protocol::account::StorageSlot::with_value(
318                        slot_header.name().clone(),
319                        slot_header.value(),
320                    ));
321                },
322                StorageSlotType::Map => {
323                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
324                        RpcError::ExpectedDataMissing(format!(
325                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
326                            slot_header.name(),
327                        )),
328                    )?;
329
330                    let storage_map = if map_details.too_many_entries {
331                        let map_info = if let Some(ref info) = map_cache {
332                            info
333                        } else {
334                            let fetched_data =
335                                self.sync_storage_maps(0_u32.into(), None, account_id).await?;
336                            map_cache.insert(fetched_data)
337                        };
338                        let map_entries: Vec<_> = map_info
339                            .updates
340                            .iter()
341                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
342                            .map(|slot_info| (slot_info.key, slot_info.value))
343                            .collect();
344                        StorageMap::with_entries(map_entries)
345                    } else {
346                        map_details.entries.clone().into_storage_map()
347                    }
348                    .map_err(|err| {
349                        RpcError::InvalidResponse(format!(
350                            "the rpc api returned a non-valid map entry: {err}"
351                        ))
352                    })?;
353
354                    slots.push(miden_protocol::account::StorageSlot::with_map(
355                        slot_header.name().clone(),
356                        storage_map,
357                    ));
358                },
359            }
360        }
361        Ok(slots)
362    }
363}
364
365#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
366#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
367impl NodeRpcClient for GrpcClient {
368    /// Sets the genesis commitment for the client. If the client is already connected, it will be
369    /// updated to use the new commitment on subsequent requests. If the client is not connected,
370    /// the commitment will be stored and used when the client connects. If the genesis commitment
371    /// is already set, this method does nothing.
372    fn has_genesis_commitment(&self) -> Option<Word> {
373        *self.genesis_commitment.read()
374    }
375
376    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
377        // Check if already set before doing anything else
378        if self.genesis_commitment.read().is_some() {
379            // Genesis commitment is already set, ignoring the new value.
380            return Ok(());
381        }
382
383        // Store the commitment for future connections
384        self.genesis_commitment.write().replace(commitment);
385
386        // If a client is already connected, update it to use the new genesis commitment.
387        // If not connected, the commitment will be used when connect() is called.
388        let mut client_guard = self.client.write();
389        if let Some(client) = client_guard.as_mut() {
390            client.set_genesis_commitment(commitment);
391        }
392
393        Ok(())
394    }
395
396    async fn submit_proven_transaction(
397        &self,
398        proven_transaction: ProvenTransaction,
399        transaction_inputs: TransactionInputs,
400    ) -> Result<BlockNumber, RpcError> {
401        let request = proto::transaction::ProvenTransaction {
402            transaction: proven_transaction.to_bytes(),
403            transaction_inputs: Some(transaction_inputs.to_bytes()),
404        };
405
406        let mut rpc_api = self.ensure_connected().await?;
407
408        let api_response = rpc_api
409            .submit_proven_transaction(request)
410            .await
411            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SubmitProvenTx, status))?;
412
413        Ok(BlockNumber::from(api_response.into_inner().block_num))
414    }
415
416    async fn get_block_header_by_number(
417        &self,
418        block_num: Option<BlockNumber>,
419        include_mmr_proof: bool,
420    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
421        let request = proto::rpc::BlockHeaderByNumberRequest {
422            block_num: block_num.as_ref().map(BlockNumber::as_u32),
423            include_mmr_proof: Some(include_mmr_proof),
424        };
425
426        info!("Calling GetBlockHeaderByNumber: {:?}", request);
427
428        let mut rpc_api = self.ensure_connected().await?;
429
430        let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
431            self.rpc_error_from_status(RpcEndpoint::GetBlockHeaderByNumber, status)
432        })?;
433
434        let response = api_response.into_inner();
435
436        let block_header: BlockHeader = response
437            .block_header
438            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
439            .try_into()?;
440
441        let mmr_proof = if include_mmr_proof {
442            let forest = response
443                .chain_length
444                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
445            let merkle_path: MerklePath = response
446                .mmr_path
447                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
448                .try_into()?;
449
450            Some(MmrProof {
451                forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
452                position: block_header.block_num().as_usize(),
453                merkle_path,
454            })
455        } else {
456            None
457        };
458
459        Ok((block_header, mmr_proof))
460    }
461
462    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
463        let limits = self.get_rpc_limits().await?;
464        let mut notes = Vec::with_capacity(note_ids.len());
465        for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
466            let request = proto::note::NoteIdList {
467                ids: chunk.iter().map(|id| (*id).into()).collect(),
468            };
469
470            let mut rpc_api = self.ensure_connected().await?;
471
472            let api_response = rpc_api
473                .get_notes_by_id(request)
474                .await
475                .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetNotesById, status))?;
476
477            let response_notes = api_response
478                .into_inner()
479                .notes
480                .into_iter()
481                .map(FetchedNote::try_from)
482                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
483
484            notes.extend(response_notes);
485        }
486        Ok(notes)
487    }
488
489    async fn sync_chain_mmr(
490        &self,
491        block_from: BlockNumber,
492        block_to: Option<BlockNumber>,
493    ) -> Result<ChainMmrInfo, RpcError> {
494        let block_range = Some(BlockRange {
495            block_from: block_from.as_u32(),
496            block_to: block_to.map(|b| b.as_u32()),
497        });
498
499        let request = proto::rpc::SyncChainMmrRequest { block_range };
500
501        let mut rpc_api = self.ensure_connected().await?;
502
503        let response = rpc_api
504            .sync_chain_mmr(request)
505            .await
506            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncChainMmr, status))?;
507
508        response.into_inner().try_into()
509    }
510
511    /// Sends a `GetAccountDetailsRequest` to the Miden node, and extracts an [`FetchedAccount`]
512    /// from the `GetAccountDetailsResponse` response.
513    ///
514    /// # Errors
515    ///
516    /// This function will return an error if:
517    ///
518    /// - There was an error sending the request to the node.
519    /// - The answer had a `None` for one of the expected fields (`account`, `summary`,
520    ///   `account_commitment`, `details`).
521    /// - There is an error during [Account] deserialization.
522    async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
523        let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
524        let update_summary =
525            AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
526
527        // The case for a private account is simple,
528        // we simple use the commitment and its id.
529        if account_id.is_private() {
530            Ok(FetchedAccount::new_private(account_id, update_summary))
531        } else {
532            // An account with public state has to fetch all of its state.
533            // Even more so, an account with a large state will have to do
534            // a couple of extra requests to fetch all of its data.
535            let details =
536                full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
537                    "GetAccountDetails returned a public account without details".to_owned(),
538                ))?;
539            let account_id = details.header.id();
540            let nonce = details.header.nonce();
541            let assets: Vec<Asset> = {
542                if details.vault_details.too_many_assets {
543                    self.sync_account_vault(BlockNumber::from(0), None, account_id)
544                        .await?
545                        .updates
546                        .into_iter()
547                        .filter_map(|update| update.asset)
548                        .collect()
549                } else {
550                    details.vault_details.assets
551                }
552            };
553
554            let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
555            let seed = None;
556            let asset_vault = AssetVault::new(&assets).map_err(|err| {
557                RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
558            })?;
559            let account_storage = AccountStorage::new(slots).map_err(|err| {
560                RpcError::InvalidResponse(format!(
561                    "api rpc returned non-valid storage slots: {err}"
562                ))
563            })?;
564            let account =
565                Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
566                    .map_err(|err| {
567                    RpcError::InvalidResponse(format!(
568                        "failed to instance an account from the rpc api response: {err}"
569                    ))
570                })?;
571            Ok(FetchedAccount::new_public(account, update_summary))
572        }
573    }
574
575    /// Sends a `GetAccountProof` request to the Miden node, and extracts the [AccountProof]
576    /// from the response, as well as the block number that it was retrieved for.
577    ///
578    /// # Errors
579    ///
580    /// This function will return an error if:
581    ///
582    /// - The requested Account isn't returned by the node.
583    /// - There was an error sending the request to the node.
584    /// - The answer had a `None` for one of the expected fields.
585    /// - There is an error during storage deserialization.
586    async fn get_account_proof(
587        &self,
588        account_id: AccountId,
589        storage_requirements: AccountStorageRequirements,
590        account_state: AccountStateAt,
591        known_account_code: Option<AccountCode>,
592    ) -> Result<(BlockNumber, AccountProof), RpcError> {
593        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
594        if let Some(account_code) = known_account_code {
595            known_codes_by_commitment.insert(account_code.commitment(), account_code);
596        }
597
598        let mut rpc_api = self.ensure_connected().await?;
599
600        let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
601
602        // Only request details for public accounts; include known code commitment for this
603        // account when available
604        let account_details = if account_id.is_public() {
605            Some(AccountDetailRequest {
606                code_commitment: Some(EMPTY_WORD.into()),
607                // TODO: implement a way to request asset vaults
608                // https://github.com/0xMiden/miden-client/issues/1412
609                asset_vault_commitment: None,
610                storage_maps,
611            })
612        } else {
613            None
614        };
615
616        let block_num = match account_state {
617            AccountStateAt::Block(number) => Some(number.into()),
618            AccountStateAt::ChainTip => None,
619        };
620
621        let request = AccountRequest {
622            account_id: Some(account_id.into()),
623            block_num,
624            details: account_details,
625        };
626
627        let response = rpc_api
628            .get_account(request)
629            .await
630            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetAccount, status))?
631            .into_inner();
632
633        let account_witness: AccountWitness = response
634            .witness
635            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
636            .try_into()?;
637
638        // For public accounts, details should be present when requested
639        let headers = if account_witness.id().is_public() {
640            Some(
641                response
642                    .details
643                    .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
644                    .into_domain(&known_codes_by_commitment)?,
645            )
646        } else {
647            None
648        };
649
650        let proof = AccountProof::new(account_witness, headers)
651            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
652
653        let block_num = response
654            .block_num
655            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
656            .block_num
657            .into();
658
659        Ok((block_num, proof))
660    }
661
662    /// Sends a `SyncNoteRequest` to the Miden node, and extracts a [`NoteSyncInfo`] from the
663    /// response.
664    async fn sync_notes(
665        &self,
666        block_num: BlockNumber,
667        block_to: Option<BlockNumber>,
668        note_tags: &BTreeSet<NoteTag>,
669    ) -> Result<NoteSyncInfo, RpcError> {
670        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
671
672        let block_range = Some(BlockRange {
673            block_from: block_num.as_u32(),
674            block_to: block_to.map(|b| b.as_u32()),
675        });
676
677        let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
678
679        let mut rpc_api = self.ensure_connected().await?;
680
681        let response = rpc_api
682            .sync_notes(request)
683            .await
684            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncNotes, status))?;
685
686        response.into_inner().try_into()
687    }
688
689    async fn sync_nullifiers(
690        &self,
691        prefixes: &[u16],
692        block_num: BlockNumber,
693        block_to: Option<BlockNumber>,
694    ) -> Result<Vec<NullifierUpdate>, RpcError> {
695        const MAX_ITERATIONS: u32 = 1000; // Safety limit to prevent infinite loops
696
697        let limits = self.get_rpc_limits().await?;
698        let mut all_nullifiers = BTreeSet::new();
699
700        // Establish RPC connection once before the loop
701        let mut rpc_api = self.ensure_connected().await?;
702
703        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
704        // violating the RPC limit.
705        'chunk_nullifiers: for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
706            let mut current_block_from = block_num.as_u32();
707
708            for _ in 0..MAX_ITERATIONS {
709                let request = proto::rpc::SyncNullifiersRequest {
710                    nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
711                    prefix_len: 16,
712                    block_range: Some(BlockRange {
713                        block_from: current_block_from,
714                        block_to: block_to.map(|b| b.as_u32()),
715                    }),
716                };
717
718                let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
719                    self.rpc_error_from_status(RpcEndpoint::SyncNullifiers, status)
720                })?;
721                let response = response.into_inner();
722
723                // Convert nullifiers for this batch
724                let batch_nullifiers = response
725                    .nullifiers
726                    .iter()
727                    .map(TryFrom::try_from)
728                    .collect::<Result<Vec<NullifierUpdate>, _>>()
729                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
730
731                all_nullifiers.extend(batch_nullifiers);
732
733                // Check if we need to fetch more pages
734                if let Some(page) = response.pagination_info {
735                    // Ensure we're making progress to avoid infinite loops
736                    if page.block_num < current_block_from {
737                        return Err(RpcError::PaginationError(
738                            "invalid pagination: block_num went backwards".to_string(),
739                        ));
740                    }
741
742                    // Calculate target block as minimum between block_to and chain_tip
743                    let target_block =
744                        block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
745
746                    if page.block_num >= target_block {
747                        // No pagination info or we've reached/passed the target so we're done
748                        continue 'chunk_nullifiers;
749                    }
750                    current_block_from = page.block_num + 1;
751                }
752            }
753            // If we exit the loop, we've hit the iteration limit
754            return Err(RpcError::PaginationError(
755                "too many pagination iterations, possible infinite loop".to_string(),
756            ));
757        }
758        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
759    }
760
761    async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
762        let limits = self.get_rpc_limits().await?;
763        let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
764        for chunk in nullifiers.chunks(limits.nullifiers_limit as usize) {
765            let request = proto::rpc::NullifierList {
766                nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
767            };
768
769            let mut rpc_api = self.ensure_connected().await?;
770
771            let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
772                self.rpc_error_from_status(RpcEndpoint::CheckNullifiers, status)
773            })?;
774
775            let mut response = response.into_inner();
776            let chunk_proofs = response
777                .proofs
778                .iter_mut()
779                .map(|r| r.to_owned().try_into())
780                .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
781            proofs.extend(chunk_proofs);
782        }
783        Ok(proofs)
784    }
785
786    async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
787        let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
788
789        let mut rpc_api = self.ensure_connected().await?;
790
791        let response = rpc_api
792            .get_block_by_number(request)
793            .await
794            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetBlockByNumber, status))?;
795
796        let response = response.into_inner();
797        let block =
798            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
799                "GetBlockByNumberResponse.block".to_string(),
800            ))?)?;
801
802        Ok(block)
803    }
804
805    async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
806        let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
807
808        let mut rpc_api = self.ensure_connected().await?;
809
810        let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
811            self.rpc_error_from_status(RpcEndpoint::GetNoteScriptByRoot, status)
812        })?;
813
814        let response = response.into_inner();
815        let note_script = NoteScript::try_from(
816            response
817                .script
818                .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
819        )?;
820
821        Ok(note_script)
822    }
823
824    async fn sync_storage_maps(
825        &self,
826        block_from: BlockNumber,
827        block_to: Option<BlockNumber>,
828        account_id: AccountId,
829    ) -> Result<StorageMapInfo, RpcError> {
830        let mut rpc_api = self.ensure_connected().await?;
831        let mut pagination = BlockPagination::new(block_from, block_to);
832        let mut updates = Vec::new();
833
834        let (chain_tip, block_number) = loop {
835            let request = proto::rpc::SyncAccountStorageMapsRequest {
836                block_range: Some(BlockRange {
837                    block_from: pagination.current_block_from().as_u32(),
838                    block_to: pagination.block_to().map(|block| block.as_u32()),
839                }),
840                account_id: Some(account_id.into()),
841            };
842            let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
843                self.rpc_error_from_status(RpcEndpoint::SyncStorageMaps, status)
844            })?;
845            let response = response.into_inner();
846            let page = response
847                .pagination_info
848                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
849            let page_block_num = BlockNumber::from(page.block_num);
850            let page_chain_tip = BlockNumber::from(page.chain_tip);
851            let batch = response
852                .updates
853                .into_iter()
854                .map(TryInto::try_into)
855                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
856            updates.extend(batch);
857
858            match pagination.advance(page_block_num, page_chain_tip)? {
859                PaginationResult::Continue => {},
860                PaginationResult::Done {
861                    chain_tip: final_chain_tip,
862                    block_num: final_block_num,
863                } => break (final_chain_tip, final_block_num),
864            }
865        };
866
867        Ok(StorageMapInfo { chain_tip, block_number, updates })
868    }
869
870    async fn sync_account_vault(
871        &self,
872        block_from: BlockNumber,
873        block_to: Option<BlockNumber>,
874        account_id: AccountId,
875    ) -> Result<AccountVaultInfo, RpcError> {
876        let mut rpc_api = self.ensure_connected().await?;
877        let mut pagination = BlockPagination::new(block_from, block_to);
878        let mut updates = Vec::new();
879
880        let (chain_tip, block_number) = loop {
881            let request = proto::rpc::SyncAccountVaultRequest {
882                block_range: Some(BlockRange {
883                    block_from: pagination.current_block_from().as_u32(),
884                    block_to: pagination.block_to().map(|block| block.as_u32()),
885                }),
886                account_id: Some(account_id.into()),
887            };
888            let response = rpc_api.sync_account_vault(request).await.map_err(|status| {
889                self.rpc_error_from_status(RpcEndpoint::SyncAccountVault, status)
890            })?;
891            let response = response.into_inner();
892            let page = response
893                .pagination_info
894                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
895            let page_block_num = BlockNumber::from(page.block_num);
896            let page_chain_tip = BlockNumber::from(page.chain_tip);
897            let batch = response
898                .updates
899                .iter()
900                .map(|u| (*u).try_into())
901                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
902            updates.extend(batch);
903
904            match pagination.advance(page_block_num, page_chain_tip)? {
905                PaginationResult::Continue => {},
906                PaginationResult::Done {
907                    chain_tip: final_chain_tip,
908                    block_num: final_block_num,
909                } => break (final_chain_tip, final_block_num),
910            }
911        };
912
913        Ok(AccountVaultInfo { chain_tip, block_number, updates })
914    }
915
916    async fn sync_transactions(
917        &self,
918        block_from: BlockNumber,
919        block_to: Option<BlockNumber>,
920        account_ids: Vec<AccountId>,
921    ) -> Result<TransactionsInfo, RpcError> {
922        let block_range = Some(BlockRange {
923            block_from: block_from.as_u32(),
924            block_to: block_to.map(|b| b.as_u32()),
925        });
926
927        let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
928
929        let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
930
931        let mut rpc_api = self.ensure_connected().await?;
932
933        let response = rpc_api
934            .sync_transactions(request)
935            .await
936            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::SyncTransactions, status))?;
937
938        response.into_inner().try_into()
939    }
940
941    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
942        let endpoint: Endpoint =
943            Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
944        Ok(endpoint.to_network_id())
945    }
946
947    async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
948        // Return cached limits if available
949        if let Some(limits) = *self.limits.read() {
950            return Ok(limits);
951        }
952
953        // Fetch limits from the node
954        let mut rpc_api = self.ensure_connected().await?;
955        let response = rpc_api
956            .get_limits(())
957            .await
958            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::GetLimits, status))?;
959        let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
960
961        // Cache fetched values
962        self.limits.write().replace(limits);
963        Ok(limits)
964    }
965
966    fn has_rpc_limits(&self) -> Option<RpcLimits> {
967        *self.limits.read()
968    }
969
970    async fn set_rpc_limits(&self, limits: RpcLimits) {
971        self.limits.write().replace(limits);
972    }
973
974    async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
975        GrpcClient::get_status_unversioned(self).await
976    }
977}
978
979// ERRORS
980// ================================================================================================
981
982impl RpcError {
983    pub fn from_grpc_error_with_context(
984        endpoint: RpcEndpoint,
985        status: Status,
986        context: AcceptHeaderContext,
987    ) -> Self {
988        if let Some(accept_error) =
989            AcceptHeaderError::try_from_message_with_context(status.message(), context)
990        {
991            return Self::AcceptHeaderError(accept_error);
992        }
993
994        // Parse application-level error from status details
995        let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
996
997        let error_kind = GrpcError::from(&status);
998        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
999
1000        Self::RequestError {
1001            endpoint,
1002            error_kind,
1003            endpoint_error,
1004            source: Some(source),
1005        }
1006    }
1007}
1008
1009impl From<&Status> for GrpcError {
1010    fn from(status: &Status) -> Self {
1011        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
1012    }
1013}
1014
1015#[cfg(test)]
1016mod tests {
1017    use std::boxed::Box;
1018
1019    use miden_protocol::Word;
1020    use miden_protocol::block::BlockNumber;
1021
1022    use super::{BlockPagination, GrpcClient, PaginationResult};
1023    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};
1024
1025    fn assert_send_sync<T: Send + Sync>() {}
1026
1027    #[test]
1028    fn is_send_sync() {
1029        assert_send_sync::<GrpcClient>();
1030        assert_send_sync::<Box<dyn NodeRpcClient>>();
1031    }
1032
1033    #[test]
1034    fn block_pagination_errors_when_block_num_goes_backwards() {
1035        let mut pagination = BlockPagination::new(10_u32.into(), None);
1036
1037        let res = pagination.advance(9_u32.into(), 20_u32.into());
1038        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1039    }
1040
1041    #[test]
1042    fn block_pagination_errors_after_max_iterations() {
1043        let mut pagination = BlockPagination::new(0_u32.into(), None);
1044        let chain_tip: BlockNumber = 10_000_u32.into();
1045
1046        for _ in 0..BlockPagination::MAX_ITERATIONS {
1047            let current = pagination.current_block_from();
1048            let res = pagination
1049                .advance(current, chain_tip)
1050                .expect("expected pagination to continue within iteration limit");
1051            assert!(matches!(res, PaginationResult::Continue));
1052        }
1053
1054        let res = pagination.advance(pagination.current_block_from(), chain_tip);
1055        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1056    }
1057
1058    #[test]
1059    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
1060        // block_to is beyond chain tip, so target should be chain_tip.
1061        let mut pagination = BlockPagination::new(0_u32.into(), Some(50_u32.into()));
1062
1063        let res = pagination
1064            .advance(30_u32.into(), 30_u32.into())
1065            .expect("expected pagination to succeed");
1066
1067        assert!(matches!(
1068            res,
1069            PaginationResult::Done {
1070                chain_tip,
1071                block_num
1072            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
1073        ));
1074    }
1075
1076    #[test]
1077    fn block_pagination_advances_cursor_by_one() {
1078        let mut pagination = BlockPagination::new(5_u32.into(), None);
1079
1080        let res = pagination
1081            .advance(5_u32.into(), 100_u32.into())
1082            .expect("expected pagination to succeed");
1083        assert!(matches!(res, PaginationResult::Continue));
1084        assert_eq!(pagination.current_block_from().as_u32(), 6);
1085    }
1086
1087    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
1088    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
1089        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
1090        let res = client.get_block_header_by_number(None, false).await;
1091        assert!(res.is_ok());
1092    }
1093
1094    #[tokio::test]
1095    async fn future_is_send() {
1096        let endpoint = &Endpoint::devnet();
1097        let client = GrpcClient::new(endpoint, 10000);
1098        let client: Box<GrpcClient> = client.into();
1099        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
1100    }
1101
1102    #[tokio::test]
1103    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
1104        let endpoint = &Endpoint::devnet();
1105        let client = GrpcClient::new(endpoint, 10000);
1106
1107        assert!(client.genesis_commitment.read().is_none());
1108
1109        let commitment = Word::default();
1110        client.set_genesis_commitment(commitment).await.unwrap();
1111
1112        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1113    }
1114
1115    #[tokio::test]
1116    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
1117        use miden_protocol::Felt;
1118
1119        let endpoint = &Endpoint::devnet();
1120        let client = GrpcClient::new(endpoint, 10000);
1121
1122        let initial_commitment = Word::default();
1123        client.set_genesis_commitment(initial_commitment).await.unwrap();
1124
1125        let new_commitment = Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
1126        client.set_genesis_commitment(new_commitment).await.unwrap();
1127
1128        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
1129    }
1130
1131    #[tokio::test]
1132    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
1133        let endpoint = &Endpoint::devnet();
1134        let client = GrpcClient::new(endpoint, 10000);
1135
1136        // "Connect" the client
1137        client.connect().await.unwrap();
1138
1139        let commitment = Word::default();
1140        client.set_genesis_commitment(commitment).await.unwrap();
1141
1142        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1143        assert!(client.client.read().as_ref().is_some());
1144    }
1145}