Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10    Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{
28    AccountProof, AccountStorageDetails, AccountStorageRequirements, AccountUpdateSummary,
29};
30use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
31use super::generated::rpc::account_request::AccountDetailRequest;
32use super::generated::rpc::AccountRequest;
33use super::{
34    Endpoint, FetchedAccount, NodeRpcClient, NodeRpcClientEndpoint, NoteSyncInfo, RpcError,
35    StateSyncInfo,
36};
37use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
38use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
39use crate::rpc::domain::transaction::TransactionsInfo;
40use crate::rpc::errors::{AcceptHeaderError, GrpcError, RpcConversionError};
41use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
42use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
43use crate::rpc::generated::rpc::BlockRange;
44use crate::rpc::{AccountStateAt, NOTE_IDS_LIMIT, NULLIFIER_PREFIXES_LIMIT, generated as proto};
45use crate::transaction::ForeignAccount;
46
47mod api_client;
48use api_client::api_client_wrapper::ApiClient;
49
50// GRPC CLIENT
51// ================================================================================================
52
53/// Client for the Node RPC API using gRPC.
54///
55/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
56/// communicate with the node. In this case the connection will be established lazily when the
57/// first request is made.
58/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
59/// to communicate with the node.
60///
61/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
62/// are generated by the build script and also depend on the target architecture.
63pub struct GrpcClient {
64    client: RwLock<Option<ApiClient>>,
65    endpoint: String,
66    timeout_ms: u64,
67    genesis_commitment: RwLock<Option<Word>>,
68}
69
70impl GrpcClient {
71    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
72    /// with the given timeout in milliseconds.
73    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
74        GrpcClient {
75            client: RwLock::new(None),
76            endpoint: endpoint.to_string(),
77            timeout_ms,
78            genesis_commitment: RwLock::new(None),
79        }
80    }
81
82    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
83    /// `rpc_api` field is initialized and returns a write guard to it.
84    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
85        if self.client.read().is_none() {
86            self.connect().await?;
87        }
88
89        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
90    }
91
92    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
93    /// genesis commitment.
94    async fn connect(&self) -> Result<(), RpcError> {
95        let genesis_commitment = *self.genesis_commitment.read();
96        let new_client =
97            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
98                .await?;
99        let mut client = self.client.write();
100        client.replace(new_client);
101
102        Ok(())
103    }
104
105    // GET ACCOUNT HELPERS
106    // ============================================================================================
107
108    /// Given an [`AccountId`], return the proof for the account.
109    ///
110    /// If the account also has public state, its details will also be retrieved
111    pub async fn fetch_full_account_proof(
112        &self,
113        account_id: AccountId,
114    ) -> Result<(BlockNumber, AccountProof), RpcError> {
115        let mut rpc_api = self.ensure_connected().await?;
116        let has_public_state = account_id.has_public_state();
117        let account_request = {
118            AccountRequest {
119                account_id: Some(account_id.into()),
120                block_num: None,
121                details: {
122                    if has_public_state {
123                        // Since we have to request the storage maps for an account
124                        // we *dont know* anything about, we'll have to do first this
125                        // request, which will tell us about the account's storage slots,
126                        // and then, request the slots in another request.
127                        Some(AccountDetailRequest {
128                            code_commitment: Some(EMPTY_WORD.into()),
129                            asset_vault_commitment: Some(EMPTY_WORD.into()),
130                            storage_maps: vec![],
131                        })
132                    } else {
133                        None
134                    }
135                },
136            }
137        };
138        let account_response = rpc_api
139            .get_account(account_request)
140            .await
141            .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
142            .into_inner();
143        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
144            "GetAccountDetails returned an account without a matching block number for the witness"
145                .to_owned(),
146        ))?;
147        let account_proof = {
148            if has_public_state {
149                let account_details = account_response
150                    .details
151                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
152                    .into_domain(&BTreeMap::new(), &AccountStorageRequirements::default())?;
153                let storage_header = account_details.storage_details.header;
154                // This variable will hold the storage slots that are maps, below we will use it to
155                // actually fetch the storage maps details, since we now know the names of each
156                // storage slot.
157                let maps_to_request = storage_header
158                    .slots()
159                    .filter(|header| header.slot_type().is_map())
160                    .map(|map| map.name().to_string());
161                let account_request = AccountRequest {
162                    account_id: Some(account_id.into()),
163                    block_num: None,
164                    details: Some(AccountDetailRequest {
165                        code_commitment: Some(EMPTY_WORD.into()),
166                        asset_vault_commitment: Some(EMPTY_WORD.into()),
167                        storage_maps: maps_to_request
168                            .map(|slot_name| StorageMapDetailRequest {
169                                slot_name,
170                                slot_data: Some(SlotData::AllEntries(true)),
171                            })
172                            .collect(),
173                    }),
174                };
175                match rpc_api.get_account(account_request).await {
176                    Ok(account_proof) => account_proof.into_inner().try_into(),
177                    Err(err) => Err(RpcError::ConnectionError(
178                        format!(
179                            "failed to fetch account proof for account: {account_id}, got: {err}"
180                        )
181                        .into(),
182                    )),
183                }
184            } else {
185                account_response.try_into()
186            }
187        };
188        Ok((block_number.block_num.into(), account_proof?))
189    }
190
191    /// Given the storage details for an account and its id, returns a vector with all of its
192    /// storage slots. Keep in mind that if an account triggers the `too_many_entries` flag, there
193    /// will potentially be multiple requests.
194    async fn build_storage_slots(
195        &self,
196        account_id: AccountId,
197        storage_details: &AccountStorageDetails,
198    ) -> Result<Vec<StorageSlot>, RpcError> {
199        let mut slots = vec![];
200        // `SyncStorageMaps` will return information for *every* map for a given account, so this
201        // map_cache value should be fetched only once, hence the None placeholder
202        let mut map_cache: Option<StorageMapInfo> = None;
203        for slot_header in storage_details.header.slots() {
204            // We have two cases for each slot:
205            // - Slot is a value => We simply instance a StorageSlot
206            // - Slot is a map => If the map is 'small', we can simply
207            // build the map from the given entries. Otherwise we will have to
208            // call the SyncStorageMaps RPC method to obtain the data for the map.
209            // With the current setup, one RPC call should be enough.
210            match slot_header.slot_type() {
211                StorageSlotType::Value => {
212                    slots.push(miden_protocol::account::StorageSlot::with_value(
213                        slot_header.name().clone(),
214                        slot_header.value(),
215                    ));
216                },
217                StorageSlotType::Map => {
218                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
219                        RpcError::ExpectedDataMissing(format!(
220                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
221                            slot_header.name(),
222                        )),
223                    )?;
224
225                    let storage_map = if map_details.too_many_entries {
226                        let map_info = if let Some(ref info) = map_cache {
227                            info
228                        } else {
229                            let fetched_data =
230                                self.sync_storage_maps(0_u32.into(), None, account_id).await?;
231                            map_cache.insert(fetched_data)
232                        };
233                        // The sync endpoint may return multiple updates for the same key
234                        // across different blocks. We sort by block number so that
235                        // inserting into the map keeps only the latest value per key.
236                        let mut sorted_updates: Vec<_> = map_info
237                            .updates
238                            .iter()
239                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
240                            .collect();
241                        sorted_updates.sort_by_key(|u| u.block_num);
242                        let map_entries: Vec<_> = sorted_updates
243                            .into_iter()
244                            .map(|u| (u.key, u.value))
245                            .collect::<BTreeMap<_, _>>()
246                            .into_iter()
247                            .collect();
248                        StorageMap::with_entries(map_entries)
249                    } else {
250                        map_details.entries.clone().into_storage_map()
251                    }
252                    .map_err(|err| {
253                        RpcError::InvalidResponse(format!(
254                            "the rpc api returned a non-valid map entry: {err}"
255                        ))
256                    })?;
257
258                    slots.push(miden_protocol::account::StorageSlot::with_map(
259                        slot_header.name().clone(),
260                        storage_map,
261                    ));
262                },
263            }
264        }
265        Ok(slots)
266    }
267}
268
269#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
270#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
271impl NodeRpcClient for GrpcClient {
272    /// Sets the genesis commitment for the client. If the client is already connected, it will be
273    /// updated to use the new commitment on subsequent requests. If the client is not connected,
274    /// the commitment will be stored and used when the client connects. If the genesis commitment
275    /// is already set, this method does nothing.
276    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
277        // Check if already set before doing anything else
278        if self.genesis_commitment.read().is_some() {
279            // Genesis commitment is already set, ignoring the new value.
280            return Ok(());
281        }
282
283        // Store the commitment for future connections
284        self.genesis_commitment.write().replace(commitment);
285
286        // If a client is already connected, update it to use the new genesis commitment.
287        // If not connected, the commitment will be used when connect() is called.
288        let mut client_guard = self.client.write();
289        if let Some(client) = client_guard.as_mut() {
290            client.set_genesis_commitment(commitment);
291        }
292
293        Ok(())
294    }
295
296    async fn submit_proven_transaction(
297        &self,
298        proven_transaction: ProvenTransaction,
299        transaction_inputs: TransactionInputs,
300    ) -> Result<BlockNumber, RpcError> {
301        let request = proto::transaction::ProvenTransaction {
302            transaction: proven_transaction.to_bytes(),
303            transaction_inputs: Some(transaction_inputs.to_bytes()),
304        };
305
306        let mut rpc_api = self.ensure_connected().await?;
307
308        let api_response = rpc_api.submit_proven_transaction(request).await.map_err(|status| {
309            RpcError::from_grpc_error(NodeRpcClientEndpoint::SubmitProvenTx, status)
310        })?;
311
312        Ok(BlockNumber::from(api_response.into_inner().block_num))
313    }
314
315    async fn get_block_header_by_number(
316        &self,
317        block_num: Option<BlockNumber>,
318        include_mmr_proof: bool,
319    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
320        let request = proto::rpc::BlockHeaderByNumberRequest {
321            block_num: block_num.as_ref().map(BlockNumber::as_u32),
322            include_mmr_proof: Some(include_mmr_proof),
323        };
324
325        info!("Calling GetBlockHeaderByNumber: {:?}", request);
326
327        let mut rpc_api = self.ensure_connected().await?;
328
329        let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
330            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockHeaderByNumber, status)
331        })?;
332
333        let response = api_response.into_inner();
334
335        let block_header: BlockHeader = response
336            .block_header
337            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
338            .try_into()?;
339
340        let mmr_proof = if include_mmr_proof {
341            let forest = response
342                .chain_length
343                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
344            let merkle_path: MerklePath = response
345                .mmr_path
346                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
347                .try_into()?;
348
349            Some(MmrProof {
350                forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
351                position: block_header.block_num().as_usize(),
352                merkle_path,
353            })
354        } else {
355            None
356        };
357
358        Ok((block_header, mmr_proof))
359    }
360
361    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
362        let mut notes = Vec::with_capacity(note_ids.len());
363        for chunk in note_ids.chunks(NOTE_IDS_LIMIT) {
364            let request = proto::note::NoteIdList {
365                ids: chunk.iter().map(|id| (*id).into()).collect(),
366            };
367
368            let mut rpc_api = self.ensure_connected().await?;
369
370            let api_response = rpc_api.get_notes_by_id(request).await.map_err(|status| {
371                RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNotesById, status)
372            })?;
373
374            let response_notes = api_response
375                .into_inner()
376                .notes
377                .into_iter()
378                .map(FetchedNote::try_from)
379                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
380
381            notes.extend(response_notes);
382        }
383        Ok(notes)
384    }
385
386    /// Sends a sync state request to the Miden node, validates and converts the response
387    /// into a [`StateSyncInfo`] struct.
388    async fn sync_state(
389        &self,
390        block_num: BlockNumber,
391        account_ids: &[AccountId],
392        note_tags: &BTreeSet<NoteTag>,
393    ) -> Result<StateSyncInfo, RpcError> {
394        let account_ids = account_ids.iter().map(|acc| (*acc).into()).collect();
395
396        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
397
398        let request = proto::rpc::SyncStateRequest {
399            block_num: block_num.as_u32(),
400            account_ids,
401            note_tags,
402        };
403
404        let mut rpc_api = self.ensure_connected().await?;
405
406        let response = rpc_api.sync_state(request).await.map_err(|status| {
407            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncState, status)
408        })?;
409        response.into_inner().try_into()
410    }
411
412    /// Sends a `GetAccountDetailsRequest` to the Miden node, and extracts an [`FetchedAccount`]
413    /// from the `GetAccountDetailsResponse` response.
414    ///
415    /// # Errors
416    ///
417    /// This function will return an error if:
418    ///
419    /// - There was an error sending the request to the node.
420    /// - The answer had a `None` for one of the expected fields (`account`, `summary`,
421    ///   `account_commitment`, `details`).
422    /// - There is an error during [Account] deserialization.
423    async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
424        let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
425        let update_summary =
426            AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
427
428        // The case for a private account is simple,
429        // we simple use the commitment and its id.
430        if account_id.is_private() {
431            Ok(FetchedAccount::new_private(account_id, update_summary))
432        } else {
433            // An account with public state has to fetch all of its state.
434            // Even more so, an account with a large state will have to do
435            // a couple of extra requests to fetch all of its data.
436            let details =
437                full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
438                    "GetAccountDetails returned a public account without details".to_owned(),
439                ))?;
440            let account_id = details.header.id();
441            let nonce = details.header.nonce();
442            let assets: Vec<Asset> = {
443                if details.vault_details.too_many_assets {
444                    let vault_info =
445                        self.sync_account_vault(BlockNumber::from(0), None, account_id).await?;
446                    // The sync endpoint may return multiple updates for the same vault key
447                    // across different blocks. We sort by block number so that
448                    // inserting into the map keeps only the latest value per key.
449                    let mut updates = vault_info.updates;
450                    updates.sort_by_key(|u| u.block_num);
451                    updates
452                        .into_iter()
453                        .map(|u| (Word::from(u.vault_key), u.asset))
454                        .collect::<BTreeMap<_, _>>()
455                        .into_values()
456                        .flatten()
457                        .collect()
458                } else {
459                    details.vault_details.assets
460                }
461            };
462
463            let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
464            let seed = None;
465            let asset_vault = AssetVault::new(&assets).map_err(|err| {
466                RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
467            })?;
468            let account_storage = AccountStorage::new(slots).map_err(|err| {
469                RpcError::InvalidResponse(format!(
470                    "api rpc returned non-valid storage slots: {err}"
471                ))
472            })?;
473            let account =
474                Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
475                    .map_err(|err| {
476                    RpcError::InvalidResponse(format!(
477                        "failed to instance an account from the rpc api response: {err}"
478                    ))
479                })?;
480            Ok(FetchedAccount::new_public(account, update_summary))
481        }
482    }
483
484    /// Sends a `GetAccountProof` request to the Miden node, and extracts the [AccountProof]
485    /// from the response, as well as the block number that it was retrieved for.
486    ///
487    /// # Errors
488    ///
489    /// This function will return an error if:
490    ///
491    /// - The requested Account isn't returned by the node.
492    /// - There was an error sending the request to the node.
493    /// - The answer had a `None` for one of the expected fields.
494    /// - There is an error during storage deserialization.
495    async fn get_account(
496        &self,
497        foreign_account: ForeignAccount,
498        account_state: AccountStateAt,
499        known_account_code: Option<AccountCode>,
500    ) -> Result<(BlockNumber, AccountProof), RpcError> {
501        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
502        if let Some(account_code) = known_account_code {
503            known_codes_by_commitment.insert(account_code.commitment(), account_code);
504        }
505
506        let mut rpc_api = self.ensure_connected().await?;
507
508        // Request proofs one-by-one using the singular API
509        let account_id = foreign_account.account_id();
510        let storage_requirements = foreign_account.storage_slot_requirements();
511
512        let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
513
514        // Only request details for public accounts; include known code commitment for this
515        // account when available
516        let account_details = if account_id.is_public() {
517            Some(AccountDetailRequest {
518                code_commitment: Some(EMPTY_WORD.into()),
519                // TODO: implement a way to request asset vaults
520                // https://github.com/0xMiden/miden-client/issues/1412
521                asset_vault_commitment: None,
522                storage_maps,
523            })
524        } else {
525            None
526        };
527
528        let block_num = match account_state {
529            AccountStateAt::Block(number) => Some(number.into()),
530            AccountStateAt::ChainTip => None,
531        };
532
533        let request = AccountRequest {
534            account_id: Some(account_id.into()),
535            block_num,
536            details: account_details,
537        };
538
539        let response = rpc_api
540            .get_account(request)
541            .await
542            .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
543            .into_inner();
544
545        let account_witness: AccountWitness = response
546            .witness
547            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
548            .try_into()?;
549
550        // For public accounts, details should be present when requested
551        let headers = if account_witness.id().is_public() {
552            Some(
553                response
554                    .details
555                    .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
556                    .into_domain(&known_codes_by_commitment, &storage_requirements)?,
557            )
558        } else {
559            None
560        };
561
562        let proof = AccountProof::new(account_witness, headers)
563            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
564
565        let block_num = response
566            .block_num
567            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
568            .block_num
569            .into();
570
571        Ok((block_num, proof))
572    }
573
574    /// Sends a `SyncNoteRequest` to the Miden node, and extracts a [`NoteSyncInfo`] from the
575    /// response.
576    async fn sync_notes(
577        &self,
578        block_num: BlockNumber,
579        block_to: Option<BlockNumber>,
580        note_tags: &BTreeSet<NoteTag>,
581    ) -> Result<NoteSyncInfo, RpcError> {
582        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
583
584        let block_range = Some(BlockRange {
585            block_from: block_num.as_u32(),
586            block_to: block_to.map(|b| b.as_u32()),
587        });
588
589        let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
590
591        let mut rpc_api = self.ensure_connected().await?;
592
593        let response = rpc_api.sync_notes(request).await.map_err(|status| {
594            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNotes, status)
595        })?;
596
597        response.into_inner().try_into()
598    }
599
600    async fn sync_nullifiers(
601        &self,
602        prefixes: &[u16],
603        block_num: BlockNumber,
604        block_to: Option<BlockNumber>,
605    ) -> Result<Vec<NullifierUpdate>, RpcError> {
606        const MAX_ITERATIONS: u32 = 1000; // Safety limit to prevent infinite loops
607
608        let mut all_nullifiers = BTreeSet::new();
609
610        // Establish RPC connection once before the loop
611        let mut rpc_api = self.ensure_connected().await?;
612
613        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
614        // violating the RPC limit.
615        'chunk_nullifiers: for chunk in prefixes.chunks(NULLIFIER_PREFIXES_LIMIT) {
616            let mut current_block_from = block_num.as_u32();
617
618            for _ in 0..MAX_ITERATIONS {
619                let request = proto::rpc::SyncNullifiersRequest {
620                    nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
621                    prefix_len: 16,
622                    block_range: Some(BlockRange {
623                        block_from: current_block_from,
624                        block_to: block_to.map(|b| b.as_u32()),
625                    }),
626                };
627
628                let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
629                    RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNullifiers, status)
630                })?;
631                let response = response.into_inner();
632
633                // Convert nullifiers for this batch
634                let batch_nullifiers = response
635                    .nullifiers
636                    .iter()
637                    .map(TryFrom::try_from)
638                    .collect::<Result<Vec<NullifierUpdate>, _>>()
639                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
640
641                all_nullifiers.extend(batch_nullifiers);
642
643                // Check if we need to fetch more pages
644                if let Some(page) = response.pagination_info {
645                    // Ensure we're making progress to avoid infinite loops
646                    if page.block_num < current_block_from {
647                        return Err(RpcError::InvalidResponse(
648                            "invalid pagination: block_num went backwards".to_string(),
649                        ));
650                    }
651
652                    // Calculate target block as minimum between block_to and chain_tip
653                    let target_block =
654                        block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
655
656                    if page.block_num >= target_block {
657                        // No pagination info or we've reached/passed the target so we're done
658                        continue 'chunk_nullifiers;
659                    }
660                    current_block_from = page.block_num + 1;
661                }
662            }
663            // If we exit the loop, we've hit the iteration limit
664            return Err(RpcError::InvalidResponse(
665                "too many pagination iterations, possible infinite loop".to_string(),
666            ));
667        }
668        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
669    }
670
671    async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
672        let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
673        for chunk in nullifiers.chunks(NULLIFIER_PREFIXES_LIMIT) {
674            let request = proto::rpc::NullifierList {
675                nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
676            };
677
678            let mut rpc_api = self.ensure_connected().await?;
679
680            let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
681                RpcError::from_grpc_error(NodeRpcClientEndpoint::CheckNullifiers, status)
682            })?;
683
684            let mut response = response.into_inner();
685            let chunk_proofs = response
686                .proofs
687                .iter_mut()
688                .map(|r| r.to_owned().try_into())
689                .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
690            proofs.extend(chunk_proofs);
691        }
692        Ok(proofs)
693    }
694
695    async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
696        let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
697
698        let mut rpc_api = self.ensure_connected().await?;
699
700        let response = rpc_api.get_block_by_number(request).await.map_err(|status| {
701            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockByNumber, status)
702        })?;
703
704        let response = response.into_inner();
705        let block =
706            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
707                "GetBlockByNumberResponse.block".to_string(),
708            ))?)?;
709
710        Ok(block)
711    }
712
713    async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
714        let request = proto::note::NoteRoot { root: Some(root.into()) };
715
716        let mut rpc_api = self.ensure_connected().await?;
717
718        let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
719            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNoteScriptByRoot, status)
720        })?;
721
722        let response = response.into_inner();
723        let note_script = NoteScript::try_from(
724            response
725                .script
726                .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
727        )?;
728
729        Ok(note_script)
730    }
731
732    async fn sync_storage_maps(
733        &self,
734        block_from: BlockNumber,
735        block_to: Option<BlockNumber>,
736        account_id: AccountId,
737    ) -> Result<StorageMapInfo, RpcError> {
738        let mut all_updates = Vec::new();
739        let mut current_block_from = block_from.as_u32();
740        let mut target_block_reached = false;
741        let mut final_chain_tip = 0;
742        let mut final_block_num = 0;
743
744        let mut rpc_api = self.ensure_connected().await?;
745
746        while !target_block_reached {
747            let request = proto::rpc::SyncAccountStorageMapsRequest {
748                block_range: Some(BlockRange {
749                    block_from: current_block_from,
750                    block_to: block_to.map(|b| b.as_u32()),
751                }),
752                account_id: Some(account_id.into()),
753            };
754
755            let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
756                RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncStorageMaps, status)
757            })?;
758            let response = response.into_inner();
759
760            let batch_updates = response
761                .updates
762                .into_iter()
763                .map(TryInto::try_into)
764                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
765            all_updates.extend(batch_updates);
766
767            let page = response
768                .pagination_info
769                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
770
771            if page.block_num < current_block_from {
772                return Err(RpcError::InvalidResponse(
773                    "invalid pagination: block_num went backwards".to_owned(),
774                ));
775            }
776
777            final_chain_tip = page.chain_tip;
778            final_block_num = page.block_num;
779
780            let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
781
782            target_block_reached = page.block_num >= target_block;
783            current_block_from = page.block_num + 1;
784        }
785
786        Ok(StorageMapInfo {
787            chain_tip: final_chain_tip.into(),
788            block_number: final_block_num.into(),
789            updates: all_updates,
790        })
791    }
792
793    async fn sync_account_vault(
794        &self,
795        block_from: BlockNumber,
796        block_to: Option<BlockNumber>,
797        account_id: AccountId,
798    ) -> Result<AccountVaultInfo, RpcError> {
799        let mut all_updates = Vec::new();
800        let mut current_block_from = block_from.as_u32();
801        let mut target_block_reached = false;
802        let mut final_chain_tip = 0;
803        let mut final_block_num = 0;
804
805        let mut rpc_api = self.ensure_connected().await?;
806
807        while !target_block_reached {
808            let request = proto::rpc::SyncAccountVaultRequest {
809                block_range: Some(BlockRange {
810                    block_from: current_block_from,
811                    block_to: block_to.map(|b| b.as_u32()),
812                }),
813                account_id: Some(account_id.into()),
814            };
815
816            let response = rpc_api
817                .sync_account_vault(request)
818                .await
819                .map_err(|status| {
820                    RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncAccountVault, status)
821                })?
822                .into_inner();
823
824            let batch_updates = response
825                .updates
826                .iter()
827                .map(|u| (*u).try_into())
828                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
829            all_updates.extend(batch_updates);
830
831            let page = response
832                .pagination_info
833                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
834
835            if page.block_num < current_block_from {
836                return Err(RpcError::InvalidResponse(
837                    "invalid pagination: block_num went backwards".to_owned(),
838                ));
839            }
840
841            final_chain_tip = page.chain_tip;
842            final_block_num = page.block_num;
843
844            let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
845
846            target_block_reached = page.block_num >= target_block;
847            current_block_from = page.block_num + 1;
848        }
849
850        Ok(AccountVaultInfo {
851            chain_tip: final_chain_tip.into(),
852            block_number: final_block_num.into(),
853            updates: all_updates,
854        })
855    }
856
857    async fn sync_transactions(
858        &self,
859        block_from: BlockNumber,
860        block_to: Option<BlockNumber>,
861        account_ids: Vec<AccountId>,
862    ) -> Result<TransactionsInfo, RpcError> {
863        let block_range = Some(BlockRange {
864            block_from: block_from.as_u32(),
865            block_to: block_to.map(|b| b.as_u32()),
866        });
867
868        let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
869
870        let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
871
872        let mut rpc_api = self.ensure_connected().await?;
873
874        let response = rpc_api.sync_transactions(request).await.map_err(|status| {
875            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncTransactions, status)
876        })?;
877
878        response.into_inner().try_into()
879    }
880
881    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
882        let endpoint_str: &str = &self.endpoint.clone();
883        let endpoint: Endpoint =
884            Endpoint::try_from(endpoint_str).map_err(RpcError::InvalidNodeEndpoint)?;
885        Ok(endpoint.to_network_id())
886    }
887}
888
889// ERRORS
890// ================================================================================================
891
892impl RpcError {
893    pub fn from_grpc_error(endpoint: NodeRpcClientEndpoint, status: Status) -> Self {
894        if let Some(accept_error) = AcceptHeaderError::try_from_message(status.message()) {
895            return Self::AcceptHeaderError(accept_error);
896        }
897
898        let error_kind = GrpcError::from(&status);
899        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
900
901        Self::GrpcError {
902            endpoint,
903            error_kind,
904            source: Some(source),
905        }
906    }
907}
908
909impl From<&Status> for GrpcError {
910    fn from(status: &Status) -> Self {
911        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
912    }
913}
914
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;

    use super::GrpcClient;
    use crate::rpc::{Endpoint, NodeRpcClient};

    /// Builds the devnet client used by every test in this module.
    fn devnet_client() -> GrpcClient {
        GrpcClient::new(&Endpoint::devnet(), 10000)
    }

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    // Awaits a future obtained through the dynamic trait object, so that spawning this function
    // requires that future to be `Send`.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
        let header_result = client.get_block_header_by_number(None, false).await;
        assert!(header_result.is_ok());
    }

    #[tokio::test]
    async fn future_is_send() {
        let boxed_client: Box<GrpcClient> = Box::new(devnet_client());
        // NOTE(review): the join handle is dropped without being awaited, so the assertion inside
        // the task is not guaranteed to run; presumably this test is only a compile-time `Send`
        // check. Confirm.
        tokio::task::spawn(async move { dyn_trait_send_fut(boxed_client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let client = devnet_client();

        // A freshly created client has no genesis commitment.
        assert!(client.genesis_commitment.read().is_none());

        let expected = Word::default();
        client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), expected);
    }

    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let client = devnet_client();

        let first_commitment = Word::default();
        client.set_genesis_commitment(first_commitment).await.unwrap();

        // A second, different commitment must not replace the first one.
        let second_commitment =
            Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        client.set_genesis_commitment(second_commitment).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), first_commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let client = devnet_client();

        // "Connect" the client
        client.connect().await.unwrap();

        let expected = Word::default();
        client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(client.genesis_commitment.read().unwrap(), expected);
        assert!(client.client.read().as_ref().is_some());
    }
}