Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use core::pin::Pin;
8
9use miden_protocol::vm::FutureMaybeSend;
10
11type RpcFuture<T> = Pin<Box<dyn FutureMaybeSend<T>>>;
12
13use miden_protocol::account::{AccountCode, AccountId};
14use miden_protocol::address::NetworkId;
15use miden_protocol::batch::{ProposedBatch, ProvenBatch};
16use miden_protocol::block::account_tree::AccountWitness;
17use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
18use miden_protocol::crypto::merkle::MerklePath;
19use miden_protocol::crypto::merkle::mmr::{Forest, MmrPath, MmrProof};
20use miden_protocol::note::{NoteId, NoteScript, NoteTag};
21use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
22use miden_protocol::utils::serde::Deserializable;
23use miden_protocol::{EMPTY_WORD, Word};
24use miden_tx::utils::serde::Serializable;
25use miden_tx::utils::sync::RwLock;
26use tonic::Status;
27use tracing::info;
28
29use super::domain::account::{
30    AccountProof,
31    AccountStorageRequirements,
32    GetAccountRequest,
33    StorageMapFetch,
34};
35use super::domain::note::{FetchedNote, NoteSyncBlock};
36use super::domain::nullifier::NullifierUpdate;
37use super::generated::rpc::AccountRequest;
38use super::generated::rpc::account_request::AccountDetailRequest;
39use super::{Endpoint, NodeRpcClient, RpcEndpoint, RpcError, RpcStatusInfo};
40use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
41use crate::rpc::domain::limits::RpcLimits;
42use crate::rpc::domain::status::NetworkNoteStatusInfo;
43use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
44use crate::rpc::domain::sync::{ChainMmrInfo, SyncTarget};
45use crate::rpc::domain::transaction::TransactionRecord;
46use crate::rpc::errors::node::parse_node_error;
47use crate::rpc::errors::{AcceptHeaderContext, AcceptHeaderError, GrpcError, RpcConversionError};
48use crate::rpc::generated::rpc::BlockRange;
49use crate::rpc::{AccountStateAt, generated as proto};
50
51mod api_client;
52mod retry;
53
54use api_client::api_client_wrapper::ApiClient;
55
56/// Tracks the pagination state for block-driven endpoints.
57struct BlockPagination {
58    current_block_from: BlockNumber,
59    block_to: BlockNumber,
60    iterations: u32,
61}
62
63enum PaginationResult {
64    Continue,
65    Done {
66        chain_tip: BlockNumber,
67        block_num: BlockNumber,
68    },
69}
70
71impl BlockPagination {
72    /// Maximum number of pagination iterations for a single request.
73    ///
74    /// Protects against nodes returning inconsistent pagination data that could otherwise
75    /// trigger an infinite loop.
76    const MAX_ITERATIONS: u32 = 1000;
77
78    fn new(block_from: BlockNumber, block_to: BlockNumber) -> Self {
79        Self {
80            current_block_from: block_from,
81            block_to,
82            iterations: 0,
83        }
84    }
85
86    fn current_block_from(&self) -> BlockNumber {
87        self.current_block_from
88    }
89
90    fn block_to(&self) -> BlockNumber {
91        self.block_to
92    }
93
94    fn advance(
95        &mut self,
96        block_num: BlockNumber,
97        chain_tip: BlockNumber,
98    ) -> Result<PaginationResult, RpcError> {
99        if self.iterations >= Self::MAX_ITERATIONS {
100            return Err(RpcError::PaginationError(
101                "too many pagination iterations, possible infinite loop".to_owned(),
102            ));
103        }
104        self.iterations += 1;
105
106        if block_num < self.current_block_from {
107            return Err(RpcError::PaginationError(
108                "invalid pagination: block_num went backwards".to_owned(),
109            ));
110        }
111
112        let target_block = self.block_to.min(chain_tip);
113
114        if block_num >= target_block {
115            return Ok(PaginationResult::Done { chain_tip, block_num });
116        }
117
118        self.current_block_from = BlockNumber::from(block_num.as_u32().saturating_add(1));
119
120        Ok(PaginationResult::Continue)
121    }
122}
123
124// GRPC CLIENT
125// ================================================================================================
126
127/// Client for the Node RPC API using gRPC.
128///
129/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
130/// communicate with the node. In this case the connection will be established lazily when the
131/// first request is made.
132/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
133/// to communicate with the node.
134///
135/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
136/// are generated by the build script and also depend on the target architecture.
137pub struct GrpcClient {
138    /// The underlying gRPC client, lazily initialized on first request.
139    client: RwLock<Option<ApiClient>>,
140    /// The node endpoint URL to connect to.
141    endpoint: String,
142    /// Request timeout in milliseconds.
143    timeout_ms: u64,
144    /// The genesis block commitment, used for request validation by the node.
145    genesis_commitment: RwLock<Option<Word>>,
146    /// Cached RPC limits fetched from the node.
147    limits: RwLock<Option<RpcLimits>>,
148    /// Maximum number of retry attempts for rate-limited or transiently unavailable requests.
149    max_retries: u32,
150    /// Fallback retry interval in milliseconds when no `retry-after` header is present.
151    retry_interval_ms: u64,
152    /// Optional bearer token injected as `authorization: Bearer <token>` on every outbound
153    /// gRPC call, alongside the standard `accept` header. Used when talking to an
154    /// authenticating gateway in front of the node.
155    bearer_token: Option<String>,
156}
157
158impl GrpcClient {
159    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
160    /// with the given timeout in milliseconds.
161    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
162        GrpcClient {
163            client: RwLock::new(None),
164            endpoint: endpoint.to_string(),
165            timeout_ms,
166            genesis_commitment: RwLock::new(None),
167            limits: RwLock::new(None),
168            max_retries: retry::DEFAULT_MAX_RETRIES,
169            retry_interval_ms: retry::DEFAULT_RETRY_INTERVAL_MS,
170            bearer_token: None,
171        }
172    }
173
174    /// Sets the maximum number of retry attempts for rate-limited or transiently unavailable
175    /// requests. Defaults to `4`.
176    #[must_use]
177    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
178        self.max_retries = max_retries;
179        self
180    }
181
182    /// Sets the fallback retry interval in milliseconds, used when the server does not provide
183    /// a `retry-after` header. Defaults to `100` ms.
184    #[must_use]
185    pub fn with_retry_interval_ms(mut self, retry_interval_ms: u64) -> Self {
186        self.retry_interval_ms = retry_interval_ms;
187        self
188    }
189
190    /// Attaches a `authorization: Bearer <token>` header to every outbound gRPC call made
191    /// by this client, alongside the standard `accept` header.
192    ///
193    /// Intended for connecting to a Miden node through an authenticating gateway
194    /// (e.g. `miden-testnet.eu-central-8.gateway.fm`) that rate-limits unauthenticated
195    /// traffic. Without an auth mechanism on the client side, callers would have no way
196    /// to supply the token the gateway requires.
197    ///
198    /// Calling this method twice overwrites the earlier token.
199    ///
200    /// Validation of the token against
201    /// [`AsciiMetadataValue`](tonic::metadata::AsciiMetadataValue) is deferred to
202    /// connection time (printable ASCII plus tab only — `HeaderValue::from_str` semantics):
203    /// invalid tokens surface as
204    /// [`RpcError::ConnectionError`](crate::rpc::RpcError::ConnectionError) on the first
205    /// request, so CR/LF header-injection attempts are rejected.
206    ///
207    /// # Example
208    ///
209    /// ```no_run
210    /// # use miden_client::rpc::{Endpoint, GrpcClient};
211    /// let endpoint = Endpoint::new("https".into(), "node.example".into(), Some(443));
212    /// let client = GrpcClient::new(&endpoint, 10_000).with_bearer_auth("<api-key>".into());
213    /// ```
214    #[must_use]
215    pub fn with_bearer_auth(mut self, token: String) -> Self {
216        self.bearer_token = Some(token);
217        self
218    }
219
220    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
221    /// `rpc_api` field is initialized and returns a write guard to it.
222    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
223        if self.client.read().is_none() {
224            self.connect().await?;
225        }
226
227        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
228    }
229
230    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
231    /// genesis commitment.
232    async fn connect(&self) -> Result<(), RpcError> {
233        let genesis_commitment = *self.genesis_commitment.read();
234        let new_client = ApiClient::new_client(
235            self.endpoint.clone(),
236            self.timeout_ms,
237            genesis_commitment,
238            self.bearer_token.clone(),
239        )
240        .await?;
241        let mut client = self.client.write();
242        client.replace(new_client);
243
244        Ok(())
245    }
246
247    fn rpc_error_from_status(&self, endpoint: RpcEndpoint, status: Status) -> RpcError {
248        let genesis_commitment = self
249            .genesis_commitment
250            .read()
251            .as_ref()
252            .map_or_else(|| "none".to_string(), Word::to_hex);
253        let context = AcceptHeaderContext {
254            client_version: env!("CARGO_PKG_VERSION").to_string(),
255            genesis_commitment,
256        };
257        RpcError::from_grpc_error_with_context(endpoint, status, context)
258    }
259
260    /// Executes an RPC call and automatically retries transient failures.
261    ///
262    /// The provided closure is invoked with a freshly connected [`ApiClient`] on each attempt.
263    /// Retries are delegated to [`retry::RetryState`], which currently handles gRPC
264    /// [`tonic::Code::ResourceExhausted`] and [`tonic::Code::Unavailable`] responses, including
265    /// honoring cooldown delays when the node provides them.
266    ///
267    /// Returns the first successful gRPC response. If the call keeps failing after retries are
268    /// exhausted, or if the error is not retryable, this returns the corresponding [`RpcError`]
269    /// for the provided [`RpcEndpoint`].
270    async fn call_with_retry<T: Send + 'static>(
271        &self,
272        endpoint: RpcEndpoint,
273        mut call: impl FnMut(ApiClient) -> RpcFuture<Result<tonic::Response<T>, Status>>,
274    ) -> Result<tonic::Response<T>, RpcError> {
275        let mut retry_state = retry::RetryState::new(self.max_retries, self.retry_interval_ms);
276
277        loop {
278            let rpc_api = self.ensure_connected().await?;
279
280            match call(rpc_api).await {
281                Ok(response) => return Ok(response),
282                Err(status) if retry_state.should_retry(&status).await => {},
283                Err(status) => return Err(self.rpc_error_from_status(endpoint, status)),
284            }
285        }
286    }
287
288    /// Fetches RPC status without injecting an Accept header.
289    ///
290    /// This instantiates a separate API client without the Accept interceptor, so it does not
291    /// reuse the primary gRPC client. Any caller-supplied
292    /// [`with_bearer_auth`](Self::with_bearer_auth) token is still forwarded so gateway
293    /// authentication keeps working.
294    pub async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
295        let mut rpc_api = ApiClient::new_client_without_accept_header(
296            self.endpoint.clone(),
297            self.timeout_ms,
298            self.bearer_token.clone(),
299        )
300        .await?;
301        rpc_api
302            .status(())
303            .await
304            .map_err(|status| self.rpc_error_from_status(RpcEndpoint::Status, status))
305            .map(tonic::Response::into_inner)
306            .and_then(RpcStatusInfo::try_from)
307    }
308}
309
310#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
311#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
312impl NodeRpcClient for GrpcClient {
313    /// Sets the genesis commitment for the client. If the client is already connected, it will be
314    /// updated to use the new commitment on subsequent requests. If the client is not connected,
315    /// the commitment will be stored and used when the client connects. If the genesis commitment
316    /// is already set, this method does nothing.
317    fn has_genesis_commitment(&self) -> Option<Word> {
318        *self.genesis_commitment.read()
319    }
320
321    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
322        // Check if already set before doing anything else
323        if self.genesis_commitment.read().is_some() {
324            // Genesis commitment is already set, ignoring the new value.
325            return Ok(());
326        }
327
328        // Store the commitment for future connections
329        self.genesis_commitment.write().replace(commitment);
330
331        // If a client is already connected, update it to use the new genesis commitment.
332        // If not connected, the commitment will be used when connect() is called.
333        let mut client_guard = self.client.write();
334        if let Some(client) = client_guard.as_mut() {
335            client.set_genesis_commitment(commitment);
336        }
337
338        Ok(())
339    }
340
341    async fn submit_proven_transaction(
342        &self,
343        proven_transaction: ProvenTransaction,
344        transaction_inputs: TransactionInputs,
345    ) -> Result<BlockNumber, RpcError> {
346        let request = proto::transaction::ProvenTransaction {
347            transaction: proven_transaction.to_bytes(),
348            transaction_inputs: Some(transaction_inputs.to_bytes()),
349        };
350
351        let api_response = self
352            .call_with_retry(RpcEndpoint::SubmitProvenTx, |mut rpc_api| {
353                let request = request.clone();
354                Box::pin(async move { rpc_api.submit_proven_tx(request).await })
355            })
356            .await?;
357
358        Ok(BlockNumber::from(api_response.into_inner().block_num))
359    }
360
361    async fn submit_proven_batch(
362        &self,
363        proven_batch: ProvenBatch,
364        proposed_batch: ProposedBatch,
365        transaction_inputs: Vec<TransactionInputs>,
366    ) -> Result<BlockNumber, RpcError> {
367        let request = proto::transaction::TransactionBatch {
368            batch_proof: proven_batch.to_bytes(),
369            proposed_batch: Some(proposed_batch.to_bytes()),
370            transaction_inputs: transaction_inputs.iter().map(Serializable::to_bytes).collect(),
371        };
372
373        let api_response = self
374            .call_with_retry(RpcEndpoint::SubmitProvenBatch, |mut rpc_api| {
375                let request = request.clone();
376                Box::pin(async move { rpc_api.submit_proven_tx_batch(request).await })
377            })
378            .await?;
379
380        Ok(BlockNumber::from(api_response.into_inner().block_num))
381    }
382
383    async fn get_block_header_by_number(
384        &self,
385        block_num: Option<BlockNumber>,
386        include_mmr_proof: bool,
387    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
388        let request = proto::rpc::BlockHeaderByNumberRequest {
389            block_num: block_num.as_ref().map(BlockNumber::as_u32),
390            include_mmr_proof: Some(include_mmr_proof),
391        };
392
393        info!("Calling GetBlockHeaderByNumber: {:?}", request);
394
395        let api_response = self
396            .call_with_retry(RpcEndpoint::GetBlockHeaderByNumber, |mut rpc_api| {
397                Box::pin(async move { rpc_api.get_block_header_by_number(request).await })
398            })
399            .await?;
400
401        let response = api_response.into_inner();
402
403        let block_header: BlockHeader = response
404            .block_header
405            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
406            .try_into()?;
407
408        let mmr_proof = if include_mmr_proof {
409            let forest = response
410                .chain_length
411                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
412            let merkle_path: MerklePath = response
413                .mmr_path
414                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
415                .try_into()?;
416
417            let forest_size = usize::try_from(forest).expect("u64 should fit in usize");
418            let forest = Forest::new(forest_size).map_err(|_| {
419                RpcError::InvalidResponse(format!("invalid forest size: {forest_size}"))
420            })?;
421            Some(MmrProof::new(
422                MmrPath::new(forest, block_header.block_num().as_usize(), merkle_path),
423                block_header.commitment(),
424            ))
425        } else {
426            None
427        };
428
429        Ok((block_header, mmr_proof))
430    }
431
432    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
433        let limits = self.get_rpc_limits().await?;
434        let mut notes = Vec::with_capacity(note_ids.len());
435        for chunk in note_ids.chunks(limits.note_ids_limit as usize) {
436            let request = proto::note::NoteIdList {
437                ids: chunk.iter().map(|id| (*id).into()).collect(),
438            };
439
440            let api_response = self
441                .call_with_retry(RpcEndpoint::GetNotesById, |mut rpc_api| {
442                    let request = request.clone();
443                    Box::pin(async move { rpc_api.get_notes_by_id(request).await })
444                })
445                .await?;
446
447            let response_notes = api_response
448                .into_inner()
449                .notes
450                .into_iter()
451                .map(FetchedNote::try_from)
452                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
453
454            notes.extend(response_notes);
455        }
456        Ok(notes)
457    }
458
459    async fn sync_chain_mmr(
460        &self,
461        current_block_height: BlockNumber,
462        upper_bound: SyncTarget,
463    ) -> Result<ChainMmrInfo, RpcError> {
464        let finality_level: proto::rpc::FinalityLevel = upper_bound.into();
465
466        let request = proto::rpc::SyncChainMmrRequest {
467            current_client_block_height: current_block_height.as_u32(),
468            finality_level: finality_level.into(),
469        };
470
471        let response = self
472            .call_with_retry(RpcEndpoint::SyncChainMmr, |mut rpc_api| {
473                Box::pin(async move { rpc_api.sync_chain_mmr(request).await })
474            })
475            .await?;
476
477        response.into_inner().try_into()
478    }
479
480    /// Sends a `GetAccount` request to the Miden node, and extracts the [`AccountProof`]
481    /// from the response, as well as the block number that it was retrieved for.
482    ///
483    /// # Errors
484    ///
485    /// This function will return an error if:
486    ///
487    /// - The requested Account isn't returned by the node.
488    /// - There was an error sending the request to the node.
489    /// - The answer had a `None` for one of the expected fields.
490    /// - There is an error during storage deserialization.
491    async fn get_account(
492        &self,
493        account_id: AccountId,
494        request: GetAccountRequest,
495    ) -> Result<(BlockNumber, AccountProof), RpcError> {
496        let GetAccountRequest { storage, at, known_code, vault } = request;
497
498        let known_code_commitment = known_code.as_ref().map_or(EMPTY_WORD, AccountCode::commitment);
499        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
500        if let Some(account_code) = known_code {
501            known_codes_by_commitment.insert(account_code.commitment(), account_code);
502        }
503
504        // We need the requested slots to interpret the node response.
505        let requirements = match storage.clone() {
506            StorageMapFetch::Slots(reqs) => reqs,
507            StorageMapFetch::Skip | StorageMapFetch::All => AccountStorageRequirements::default(),
508        };
509
510        // Only request details for accounts with public state (Public or Network), passing the
511        // known code commitment so the node can skip re-sending code we already hold.
512        let account_details = if account_id.is_public() {
513            Some(AccountDetailRequest {
514                code_commitment: Some(known_code_commitment.into()),
515                asset_vault_commitment: vault.into(),
516                storage_request: storage.into(),
517            })
518        } else {
519            None
520        };
521
522        let block_num = match at {
523            AccountStateAt::Block(number) => Some(number.into()),
524            AccountStateAt::ChainTip => None,
525        };
526
527        let proto_request = AccountRequest {
528            account_id: Some(account_id.into()),
529            block_num,
530            details: account_details,
531        };
532
533        let response = self
534            .call_with_retry(RpcEndpoint::GetAccount, |mut rpc_api| {
535                let request = proto_request.clone();
536                Box::pin(async move { rpc_api.get_account(request).await })
537            })
538            .await?
539            .into_inner();
540
541        let account_witness: AccountWitness = response
542            .witness
543            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
544            .try_into()?;
545
546        let block_num: BlockNumber = response
547            .block_num
548            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
549            .block_num
550            .into();
551
552        // For accounts with public state, details should be present when requested
553        let headers = if account_witness.id().is_public() {
554            let details = response
555                .details
556                .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
557                .into_domain(&known_codes_by_commitment, &requirements)?;
558
559            Some(details)
560        } else {
561            None
562        };
563
564        let proof = AccountProof::new(account_witness, headers)
565            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
566
567        Ok((block_num, proof))
568    }
569
570    /// Sends one or more `SyncNoteRequest`s to the node and merges the responses into a list of
571    /// [`NoteSyncBlock`]s.
572    ///
573    /// Chunks `note_tags` by [`RpcLimits::note_tags_limit`] and paginates each chunk across the
574    /// requested block range.
575    async fn sync_notes(
576        &self,
577        block_from: BlockNumber,
578        block_to: BlockNumber,
579        note_tags: &BTreeSet<NoteTag>,
580    ) -> Result<Vec<NoteSyncBlock>, RpcError> {
581        if note_tags.is_empty() {
582            return Ok(Vec::new());
583        }
584
585        let limits = self.get_rpc_limits().await?;
586        let tags: Vec<NoteTag> = note_tags.iter().copied().collect();
587
588        // Merge blocks across tag-chunks: a single block can hold notes whose tags fall into
589        // different chunks, so the same block can appear in multiple chunks' responses.
590        let mut merged_blocks: BTreeMap<BlockNumber, NoteSyncBlock> = BTreeMap::new();
591
592        for chunk in tags.chunks(limits.note_tags_limit as usize) {
593            let proto_tags: Vec<u32> = chunk.iter().map(|&t| t.into()).collect();
594            let mut pagination = BlockPagination::new(block_from, block_to);
595
596            loop {
597                let request = proto::rpc::SyncNotesRequest {
598                    block_range: Some(BlockRange {
599                        block_from: pagination.current_block_from().as_u32(),
600                        block_to: block_to.as_u32(),
601                    }),
602                    note_tags: proto_tags.clone(),
603                };
604
605                let response = self
606                    .call_with_retry(RpcEndpoint::SyncNotes, |mut rpc_api| {
607                        let request = request.clone();
608                        Box::pin(async move { rpc_api.sync_notes(request).await })
609                    })
610                    .await?
611                    .into_inner();
612
613                let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
614                    "SyncNotesResponse.pagination_info".to_owned(),
615                ))?;
616                let page_chain_tip = BlockNumber::from(page.chain_tip);
617                let page_block_to = BlockNumber::from(page.block_num);
618
619                for proto_block in response.blocks {
620                    let block: NoteSyncBlock = proto_block.try_into()?;
621                    let bn = block.block_header.block_num();
622                    if let Some(existing) = merged_blocks.get_mut(&bn) {
623                        for (id, note) in block.notes {
624                            existing.notes.entry(id).or_insert(note);
625                        }
626                    } else {
627                        merged_blocks.insert(bn, block);
628                    }
629                }
630
631                match pagination.advance(page_block_to, page_chain_tip)? {
632                    PaginationResult::Continue => {},
633                    PaginationResult::Done { .. } => break,
634                }
635            }
636        }
637
638        Ok(merged_blocks.into_values().collect())
639    }
640
641    async fn sync_nullifiers(
642        &self,
643        prefixes: &[u16],
644        block_from: BlockNumber,
645        block_to: BlockNumber,
646    ) -> Result<Vec<NullifierUpdate>, RpcError> {
647        let limits = self.get_rpc_limits().await?;
648        let mut all_nullifiers = BTreeSet::new();
649
650        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
651        // violating the RPC limit.
652        for chunk in prefixes.chunks(limits.nullifiers_limit as usize) {
653            let proto_prefixes: Vec<u32> = chunk.iter().map(|&x| u32::from(x)).collect();
654            let mut pagination = BlockPagination::new(block_from, block_to);
655
656            loop {
657                let request = proto::rpc::SyncNullifiersRequest {
658                    nullifiers: proto_prefixes.clone(),
659                    prefix_len: 16,
660                    block_range: Some(BlockRange {
661                        block_from: pagination.current_block_from().as_u32(),
662                        block_to: pagination.block_to().as_u32(),
663                    }),
664                };
665
666                let response = self
667                    .call_with_retry(RpcEndpoint::SyncNullifiers, |mut rpc_api| {
668                        let request = request.clone();
669                        Box::pin(async move { rpc_api.sync_nullifiers(request).await })
670                    })
671                    .await?
672                    .into_inner();
673
674                let batch_nullifiers = response
675                    .nullifiers
676                    .iter()
677                    .map(TryFrom::try_from)
678                    .collect::<Result<Vec<NullifierUpdate>, _>>()
679                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
680
681                all_nullifiers.extend(batch_nullifiers);
682
683                let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
684                    "SyncNullifiersResponse.pagination_info".to_owned(),
685                ))?;
686
687                match pagination.advance(page.block_num.into(), page.chain_tip.into())? {
688                    PaginationResult::Continue => {},
689                    PaginationResult::Done { .. } => break,
690                }
691            }
692        }
693        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
694    }
695
696    async fn get_block_by_number(
697        &self,
698        block_num: BlockNumber,
699        include_proof: bool,
700    ) -> Result<ProvenBlock, RpcError> {
701        let request = proto::blockchain::BlockRequest {
702            block_num: block_num.as_u32(),
703            include_proof: Some(include_proof),
704        };
705
706        let response = self
707            .call_with_retry(RpcEndpoint::GetBlockByNumber, |mut rpc_api| {
708                Box::pin(async move { rpc_api.get_block_by_number(request).await })
709            })
710            .await?;
711
712        let response = response.into_inner();
713        let block =
714            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
715                "GetBlockByNumberResponse.block".to_string(),
716            ))?)?;
717
718        Ok(block)
719    }
720
721    async fn get_note_script_by_root(&self, root: Word) -> Result<Option<NoteScript>, RpcError> {
722        let request = proto::note::NoteScriptRoot { root: Some(root.into()) };
723
724        let response = self
725            .call_with_retry(RpcEndpoint::GetNoteScriptByRoot, |mut rpc_api| {
726                Box::pin(async move { rpc_api.get_note_script_by_root(request).await })
727            })
728            .await?;
729
730        // The node returns an empty payload when it has no script registered for the root.
731        let Some(script) = response.into_inner().script else {
732            return Ok(None);
733        };
734        let note_script = NoteScript::try_from(script)?;
735
736        let fetched_root = note_script.root();
737        if Word::from(fetched_root) != root {
738            return Err(RpcError::InvalidResponse(format!(
739                "node returned note script with root {fetched_root} for requested root {root}",
740            )));
741        }
742
743        Ok(Some(note_script))
744    }
745
746    async fn sync_storage_maps(
747        &self,
748        block_from: BlockNumber,
749        block_to: BlockNumber,
750        account_id: AccountId,
751    ) -> Result<StorageMapInfo, RpcError> {
752        let mut pagination = BlockPagination::new(block_from, block_to);
753        let mut updates = Vec::new();
754
755        let (chain_tip, block_number) = loop {
756            let request = proto::rpc::SyncAccountStorageMapsRequest {
757                block_range: Some(BlockRange {
758                    block_from: pagination.current_block_from().as_u32(),
759                    block_to: block_to.as_u32(),
760                }),
761                account_id: Some(account_id.into()),
762            };
763            let response = self
764                .call_with_retry(RpcEndpoint::SyncStorageMaps, |mut rpc_api| {
765                    let request = request.clone();
766                    Box::pin(async move { rpc_api.sync_account_storage_maps(request).await })
767                })
768                .await?;
769            let response = response.into_inner();
770            let page = response
771                .pagination_info
772                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
773            let page_block_num = BlockNumber::from(page.block_num);
774            let page_chain_tip = BlockNumber::from(page.chain_tip);
775            let batch = response
776                .updates
777                .into_iter()
778                .map(TryInto::try_into)
779                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
780            updates.extend(batch);
781
782            match pagination.advance(page_block_num, page_chain_tip)? {
783                PaginationResult::Continue => {},
784                PaginationResult::Done {
785                    chain_tip: final_chain_tip,
786                    block_num: final_block_num,
787                } => break (final_chain_tip, final_block_num),
788            }
789        };
790
791        Ok(StorageMapInfo { chain_tip, block_number, updates })
792    }
793
794    async fn sync_account_vault(
795        &self,
796        block_from: BlockNumber,
797        block_to: BlockNumber,
798        account_id: AccountId,
799    ) -> Result<AccountVaultInfo, RpcError> {
800        let mut pagination = BlockPagination::new(block_from, block_to);
801        let mut updates = Vec::new();
802
803        let (chain_tip, block_number) = loop {
804            let request = proto::rpc::SyncAccountVaultRequest {
805                block_range: Some(BlockRange {
806                    block_from: pagination.current_block_from().as_u32(),
807                    block_to: block_to.as_u32(),
808                }),
809                account_id: Some(account_id.into()),
810            };
811            let response = self
812                .call_with_retry(RpcEndpoint::SyncAccountVault, |mut rpc_api| {
813                    let request = request.clone();
814                    Box::pin(async move { rpc_api.sync_account_vault(request).await })
815                })
816                .await?;
817            let response = response.into_inner();
818            let page = response
819                .pagination_info
820                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
821            let page_block_num = BlockNumber::from(page.block_num);
822            let page_chain_tip = BlockNumber::from(page.chain_tip);
823            let batch = response
824                .updates
825                .iter()
826                .map(|u| (*u).try_into())
827                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
828            updates.extend(batch);
829
830            match pagination.advance(page_block_num, page_chain_tip)? {
831                PaginationResult::Continue => {},
832                PaginationResult::Done {
833                    chain_tip: final_chain_tip,
834                    block_num: final_block_num,
835                } => break (final_chain_tip, final_block_num),
836            }
837        };
838
839        Ok(AccountVaultInfo { chain_tip, block_number, updates })
840    }
841
842    /// Sends one or more `SyncTransactions` requests to the node and concatenates the responses
843    /// into a flat list of [`TransactionRecord`]s.
844    ///
845    /// Chunks `account_ids` by [`RpcLimits::account_ids_limit`] and paginates each chunk across the
846    /// requested block range.
847    async fn sync_transactions(
848        &self,
849        block_from: BlockNumber,
850        block_to: BlockNumber,
851        account_ids: Vec<AccountId>,
852    ) -> Result<Vec<TransactionRecord>, RpcError> {
853        if account_ids.is_empty() {
854            return Ok(Vec::new());
855        }
856
857        let limits = self.get_rpc_limits().await?;
858        let mut transactions: Vec<TransactionRecord> = Vec::new();
859
860        for chunk in account_ids.chunks(limits.account_ids_limit as usize) {
861            let proto_account_ids: Vec<_> = chunk.iter().map(|acc_id| (*acc_id).into()).collect();
862            let mut pagination = BlockPagination::new(block_from, block_to);
863
864            loop {
865                let request = proto::rpc::SyncTransactionsRequest {
866                    block_range: Some(BlockRange {
867                        block_from: pagination.current_block_from().as_u32(),
868                        block_to: block_to.as_u32(),
869                    }),
870                    account_ids: proto_account_ids.clone(),
871                };
872
873                let response = self
874                    .call_with_retry(RpcEndpoint::SyncTransactions, |mut rpc_api| {
875                        let request = request.clone();
876                        Box::pin(async move { rpc_api.sync_transactions(request).await })
877                    })
878                    .await?
879                    .into_inner();
880
881                let page = response.pagination_info.ok_or(RpcError::ExpectedDataMissing(
882                    "SyncTransactionsResponse.pagination_info".to_owned(),
883                ))?;
884                let page_chain_tip = BlockNumber::from(page.chain_tip);
885                let page_block_to = BlockNumber::from(page.block_num);
886
887                for proto_tx in response.transactions {
888                    transactions.push(TransactionRecord::try_from(proto_tx)?);
889                }
890
891                match pagination.advance(page_block_to, page_chain_tip)? {
892                    PaginationResult::Continue => {},
893                    PaginationResult::Done { .. } => break,
894                }
895            }
896        }
897
898        Ok(transactions)
899    }
900
901    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
902        let endpoint: Endpoint =
903            Endpoint::try_from(self.endpoint.as_str()).map_err(RpcError::InvalidNodeEndpoint)?;
904        Ok(endpoint.to_network_id())
905    }
906
907    async fn get_rpc_limits(&self) -> Result<RpcLimits, RpcError> {
908        // Return cached limits if available
909        if let Some(limits) = *self.limits.read() {
910            return Ok(limits);
911        }
912
913        // Fetch limits from the node
914        let response = self
915            .call_with_retry(RpcEndpoint::GetLimits, |mut rpc_api| {
916                Box::pin(async move { rpc_api.get_limits(()).await })
917            })
918            .await?;
919        let limits = RpcLimits::try_from(response.into_inner()).map_err(RpcError::from)?;
920
921        // Cache fetched values
922        self.limits.write().replace(limits);
923        Ok(limits)
924    }
925
926    fn has_rpc_limits(&self) -> Option<RpcLimits> {
927        *self.limits.read()
928    }
929
930    async fn set_rpc_limits(&self, limits: RpcLimits) {
931        self.limits.write().replace(limits);
932    }
933
934    async fn get_status_unversioned(&self) -> Result<RpcStatusInfo, RpcError> {
935        GrpcClient::get_status_unversioned(self).await
936    }
937
938    async fn get_network_note_status(
939        &self,
940        note_id: NoteId,
941    ) -> Result<NetworkNoteStatusInfo, RpcError> {
942        let request = proto::note::NoteId { id: Some(note_id.into()) };
943
944        let response = self
945            .call_with_retry(RpcEndpoint::GetNetworkNoteStatus, |mut rpc_api| {
946                Box::pin(async move { rpc_api.get_network_note_status(request).await })
947            })
948            .await?;
949
950        response.into_inner().try_into()
951    }
952}
953
954// ERRORS
955// ================================================================================================
956
957impl RpcError {
958    pub fn from_grpc_error_with_context(
959        endpoint: RpcEndpoint,
960        status: Status,
961        context: AcceptHeaderContext,
962    ) -> Self {
963        if let Some(accept_error) =
964            AcceptHeaderError::try_from_message_with_context(status.message(), context)
965        {
966            return Self::AcceptHeaderError(accept_error);
967        }
968
969        // Parse application-level error from status details
970        let endpoint_error = parse_node_error(&endpoint, status.details(), status.message());
971
972        let error_kind = GrpcError::from(&status);
973        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
974
975        Self::RequestError {
976            endpoint,
977            error_kind,
978            endpoint_error,
979            source: Some(source),
980        }
981    }
982}
983
984impl From<&Status> for GrpcError {
985    fn from(status: &Status) -> Self {
986        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
987    }
988}
989
990#[cfg(test)]
991mod tests {
992    use std::boxed::Box;
993
994    use miden_protocol::Word;
995    use miden_protocol::block::BlockNumber;
996
997    use super::{BlockPagination, GrpcClient, PaginationResult};
998    use crate::alloc::string::ToString;
999    use crate::rpc::{Endpoint, NodeRpcClient, RpcError};
1000
1001    fn assert_send_sync<T: Send + Sync>() {}
1002
1003    #[test]
1004    fn is_send_sync() {
1005        assert_send_sync::<GrpcClient>();
1006        assert_send_sync::<Box<dyn NodeRpcClient>>();
1007    }
1008
1009    #[test]
1010    fn block_pagination_errors_when_block_num_goes_backwards() {
1011        let mut pagination = BlockPagination::new(10_u32.into(), 20_u32.into());
1012
1013        let res = pagination.advance(9_u32.into(), 20_u32.into());
1014        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1015    }
1016
1017    #[test]
1018    fn block_pagination_errors_after_max_iterations() {
1019        let mut pagination = BlockPagination::new(0_u32.into(), 10_000_u32.into());
1020        let chain_tip: BlockNumber = 10_000_u32.into();
1021
1022        for _ in 0..BlockPagination::MAX_ITERATIONS {
1023            let current = pagination.current_block_from();
1024            let res = pagination
1025                .advance(current, chain_tip)
1026                .expect("expected pagination to continue within iteration limit");
1027            assert!(matches!(res, PaginationResult::Continue));
1028        }
1029
1030        let res = pagination.advance(pagination.current_block_from(), chain_tip);
1031        assert!(matches!(res, Err(RpcError::PaginationError(_))));
1032    }
1033
1034    #[test]
1035    fn block_pagination_stops_at_min_of_block_to_and_chain_tip() {
1036        // block_to is beyond chain tip, so target should be chain_tip.
1037        let mut pagination = BlockPagination::new(0_u32.into(), 50_u32.into());
1038
1039        let res = pagination
1040            .advance(30_u32.into(), 30_u32.into())
1041            .expect("expected pagination to succeed");
1042
1043        assert!(matches!(
1044            res,
1045            PaginationResult::Done {
1046                chain_tip,
1047                block_num
1048            } if chain_tip.as_u32() == 30 && block_num.as_u32() == 30
1049        ));
1050    }
1051
1052    #[test]
1053    fn block_pagination_advances_cursor_by_one() {
1054        let mut pagination = BlockPagination::new(5_u32.into(), 100_u32.into());
1055
1056        let res = pagination
1057            .advance(5_u32.into(), 100_u32.into())
1058            .expect("expected pagination to succeed");
1059        assert!(matches!(res, PaginationResult::Continue));
1060        assert_eq!(pagination.current_block_from().as_u32(), 6);
1061    }
1062
1063    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
1064    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
1065        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
1066        let res = client.get_block_header_by_number(None, false).await;
1067        assert!(res.is_ok());
1068    }
1069
1070    #[tokio::test]
1071    async fn future_is_send() {
1072        let endpoint = &Endpoint::devnet();
1073        let client = GrpcClient::new(endpoint, 10000);
1074        let client: Box<GrpcClient> = client.into();
1075        tokio::task::spawn(async move { dyn_trait_send_fut(client).await });
1076    }
1077
1078    #[tokio::test]
1079    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
1080        let endpoint = &Endpoint::devnet();
1081        let client = GrpcClient::new(endpoint, 10000);
1082
1083        assert!(client.genesis_commitment.read().is_none());
1084
1085        let commitment = Word::default();
1086        client.set_genesis_commitment(commitment).await.unwrap();
1087
1088        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1089    }
1090
1091    #[tokio::test]
1092    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
1093        let endpoint = &Endpoint::devnet();
1094        let client = GrpcClient::new(endpoint, 10000);
1095
1096        let initial_commitment = Word::default();
1097        client.set_genesis_commitment(initial_commitment).await.unwrap();
1098
1099        let new_commitment = Word::from([1u32, 2, 3, 4]);
1100        client.set_genesis_commitment(new_commitment).await.unwrap();
1101
1102        assert_eq!(client.genesis_commitment.read().unwrap(), initial_commitment);
1103    }
1104
1105    #[tokio::test]
1106    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
1107        let endpoint = &Endpoint::devnet();
1108        let client = GrpcClient::new(endpoint, 10000);
1109
1110        // "Connect" the client
1111        client.connect().await.unwrap();
1112
1113        let commitment = Word::default();
1114        client.set_genesis_commitment(commitment).await.unwrap();
1115
1116        assert_eq!(client.genesis_commitment.read().unwrap(), commitment);
1117        assert!(client.client.read().as_ref().is_some());
1118    }
1119
1120    #[test]
1121    fn with_bearer_auth_stores_token() {
1122        let endpoint = &Endpoint::devnet();
1123        let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("token-one".to_string());
1124
1125        assert_eq!(client.bearer_token.as_deref(), Some("token-one"));
1126    }
1127
1128    #[test]
1129    fn with_bearer_auth_overwrites_on_repeat_call() {
1130        let endpoint = &Endpoint::devnet();
1131        let client = GrpcClient::new(endpoint, 10000)
1132            .with_bearer_auth("token-one".to_string())
1133            .with_bearer_auth("token-two".to_string());
1134
1135        // Second call replaces the first.
1136        assert_eq!(client.bearer_token.as_deref(), Some("token-two"));
1137    }
1138
1139    #[tokio::test]
1140    async fn with_bearer_auth_surfaces_invalid_ascii_value_at_connect_time() {
1141        // Tokens containing control characters are rejected by `AsciiMetadataValue`. The
1142        // fluent builder defers the check to connection time, so the error must surface as
1143        // a `ConnectionError` on the first request — preventing CR/LF header-injection.
1144        let endpoint = &Endpoint::devnet();
1145        let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("bad\nvalue".to_string());
1146
1147        let err = client.connect().await.expect_err("expected invalid token to fail connect");
1148        assert!(
1149            matches!(err, RpcError::ConnectionError(_)),
1150            "expected ConnectionError, got {err:?}",
1151        );
1152    }
1153
1154    #[tokio::test]
1155    async fn with_bearer_auth_is_preserved_across_set_genesis_commitment() {
1156        let endpoint = &Endpoint::devnet();
1157        let client = GrpcClient::new(endpoint, 10000).with_bearer_auth("token".to_string());
1158        client.connect().await.unwrap();
1159
1160        client.set_genesis_commitment(Word::default()).await.unwrap();
1161
1162        // Rebuilding the interceptor after a genesis update must not drop the caller token.
1163        assert_eq!(client.bearer_token.as_deref(), Some("token"));
1164        assert!(client.client.read().as_ref().is_some());
1165    }
1166
1167    /// Real-network smoke test: hitting the public testnet with a caller-supplied bearer
1168    /// token must return a real [`RpcStatusInfo`], proving the header is a valid
1169    /// [`AsciiMetadataValue`](tonic::metadata::AsciiMetadataValue) on the wire and that an
1170    /// unauthenticated node ignores it cleanly.
1171    ///
1172    /// `#[ignore]`d by default so offline CI doesn't fail; run with
1173    /// `cargo test -- --ignored with_bearer_auth_does_not_break_real_rpc_against_testnet`
1174    /// when validating against the real network. The interceptor-level test
1175    /// (`api_client::tests::interceptor_injects_bearer_token_onto_request`)
1176    /// already proves the header reaches outbound request metadata without needing the
1177    /// network.
1178    #[tokio::test]
1179    #[ignore = "requires network access to public testnet"]
1180    async fn with_bearer_auth_does_not_break_real_rpc_against_testnet() {
1181        let endpoint = &Endpoint::testnet();
1182        let client = GrpcClient::new(endpoint, 10_000).with_bearer_auth("smoke-test".to_string());
1183
1184        let status = client
1185            .get_status_unversioned()
1186            .await
1187            .expect("testnet status with caller auth header must succeed");
1188        assert!(!status.version.is_empty(), "status must include a server version");
1189    }
1190}