Skip to main content

miden_client/rpc/tonic_client/
mod.rs

1use alloc::borrow::ToOwned;
2use alloc::boxed::Box;
3use alloc::collections::{BTreeMap, BTreeSet};
4use alloc::string::{String, ToString};
5use alloc::vec::Vec;
6use core::error::Error;
7use miden_protocol::asset::{Asset, AssetVault};
8
9use miden_protocol::account::{
10    Account, AccountCode, AccountId, AccountStorage, StorageMap, StorageSlot, StorageSlotType,
11};
12use miden_protocol::address::NetworkId;
13use miden_protocol::block::account_tree::AccountWitness;
14use miden_protocol::block::{BlockHeader, BlockNumber, ProvenBlock};
15use miden_protocol::crypto::merkle::MerklePath;
16use miden_protocol::crypto::merkle::mmr::{Forest, MmrProof};
17use miden_protocol::crypto::merkle::smt::SmtProof;
18use miden_protocol::note::{NoteId, NoteScript, NoteTag, Nullifier};
19use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
20use miden_protocol::utils::Deserializable;
21use miden_protocol::{EMPTY_WORD, Word};
22use miden_tx::utils::Serializable;
23use miden_tx::utils::sync::RwLock;
24use tonic::Status;
25use tracing::info;
26
27use super::domain::account::{AccountProof, AccountStorageDetails, AccountUpdateSummary};
28use super::domain::{note::FetchedNote, nullifier::NullifierUpdate};
29use super::generated::rpc::account_request::AccountDetailRequest;
30use super::generated::rpc::AccountRequest;
31use super::{
32    Endpoint, FetchedAccount, NodeRpcClient, NodeRpcClientEndpoint, NoteSyncInfo, RpcError,
33    StateSyncInfo,
34};
35use crate::rpc::domain::account_vault::{AccountVaultInfo, AccountVaultUpdate};
36use crate::rpc::domain::storage_map::{StorageMapInfo, StorageMapUpdate};
37use crate::rpc::domain::transaction::TransactionsInfo;
38use crate::rpc::errors::{AcceptHeaderError, GrpcError, RpcConversionError};
39use crate::rpc::generated::rpc::account_request::account_detail_request::storage_map_detail_request::SlotData;
40use crate::rpc::generated::rpc::account_request::account_detail_request::StorageMapDetailRequest;
41use crate::rpc::generated::rpc::BlockRange;
42use crate::rpc::{AccountStateAt, NOTE_IDS_LIMIT, NULLIFIER_PREFIXES_LIMIT, generated as proto};
43use crate::transaction::ForeignAccount;
44
45mod api_client;
46use api_client::api_client_wrapper::ApiClient;
47
48// GRPC CLIENT
49// ================================================================================================
50
51/// Client for the Node RPC API using gRPC.
52///
53/// If the `tonic` feature is enabled, this client will use a `tonic::transport::Channel` to
54/// communicate with the node. In this case the connection will be established lazily when the
55/// first request is made.
56/// If the `web-tonic` feature is enabled, this client will use a `tonic_web_wasm_client::Client`
57/// to communicate with the node.
58///
59/// In both cases, the [`GrpcClient`] depends on the types inside the `generated` module, which
60/// are generated by the build script and also depend on the target architecture.
61pub struct GrpcClient {
62    client: RwLock<Option<ApiClient>>,
63    endpoint: String,
64    timeout_ms: u64,
65    genesis_commitment: RwLock<Option<Word>>,
66}
67
68impl GrpcClient {
69    /// Returns a new instance of [`GrpcClient`] that'll do calls to the provided [`Endpoint`]
70    /// with the given timeout in milliseconds.
71    pub fn new(endpoint: &Endpoint, timeout_ms: u64) -> GrpcClient {
72        GrpcClient {
73            client: RwLock::new(None),
74            endpoint: endpoint.to_string(),
75            timeout_ms,
76            genesis_commitment: RwLock::new(None),
77        }
78    }
79
80    /// Takes care of establishing the RPC connection if not connected yet. It ensures that the
81    /// `rpc_api` field is initialized and returns a write guard to it.
82    async fn ensure_connected(&self) -> Result<ApiClient, RpcError> {
83        if self.client.read().is_none() {
84            self.connect().await?;
85        }
86
87        Ok(self.client.read().as_ref().expect("rpc_api should be initialized").clone())
88    }
89
90    /// Connects to the Miden node, setting the client API with the provided URL, timeout and
91    /// genesis commitment.
92    async fn connect(&self) -> Result<(), RpcError> {
93        let genesis_commitment = *self.genesis_commitment.read();
94        let new_client =
95            ApiClient::new_client(self.endpoint.clone(), self.timeout_ms, genesis_commitment)
96                .await?;
97        let mut client = self.client.write();
98        client.replace(new_client);
99
100        Ok(())
101    }
102
103    // GET ACCOUNT HELPERS
104    // ============================================================================================
105
106    /// Given an [`AccountId`], return the proof for the account.
107    ///
108    /// If the account also has public state, its details will also be retrieved
109    pub async fn fetch_full_account_proof(
110        &self,
111        account_id: AccountId,
112    ) -> Result<(BlockNumber, AccountProof), RpcError> {
113        let mut rpc_api = self.ensure_connected().await?;
114        let has_public_state = account_id.has_public_state();
115        let account_request = {
116            AccountRequest {
117                account_id: Some(account_id.into()),
118                block_num: None,
119                details: {
120                    if has_public_state {
121                        // Since we have to request the storage maps for an account
122                        // we *dont know* anything about, we'll have to do first this
123                        // request, which will tell us about the account's storage slots,
124                        // and then, request the slots in another request.
125                        Some(AccountDetailRequest {
126                            code_commitment: Some(EMPTY_WORD.into()),
127                            asset_vault_commitment: Some(EMPTY_WORD.into()),
128                            storage_maps: vec![],
129                        })
130                    } else {
131                        None
132                    }
133                },
134            }
135        };
136        let account_response = rpc_api
137            .get_account(account_request)
138            .await
139            .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
140            .into_inner();
141        let block_number = account_response.block_num.ok_or(RpcError::ExpectedDataMissing(
142            "GetAccountDetails returned an account without a matching block number for the witness"
143                .to_owned(),
144        ))?;
145        let account_proof = {
146            if has_public_state {
147                let account_details = account_response
148                    .details
149                    .ok_or(RpcError::ExpectedDataMissing("details in public account".to_owned()))?
150                    .into_domain(&BTreeMap::new())?;
151                let storage_header = account_details.storage_details.header;
152                // This variable will hold the storage slots that are maps, below we will use it to
153                // actually fetch the storage maps details, since we now know the names of each
154                // storage slot.
155                let maps_to_request = storage_header
156                    .slots()
157                    .filter(|header| header.slot_type().is_map())
158                    .map(|map| map.name().to_string());
159                let account_request = AccountRequest {
160                    account_id: Some(account_id.into()),
161                    block_num: None,
162                    details: Some(AccountDetailRequest {
163                        code_commitment: Some(EMPTY_WORD.into()),
164                        asset_vault_commitment: Some(EMPTY_WORD.into()),
165                        storage_maps: maps_to_request
166                            .map(|slot_name| StorageMapDetailRequest {
167                                slot_name,
168                                slot_data: Some(SlotData::AllEntries(true)),
169                            })
170                            .collect(),
171                    }),
172                };
173                match rpc_api.get_account(account_request).await {
174                    Ok(account_proof) => account_proof.into_inner().try_into(),
175                    Err(err) => Err(RpcError::ConnectionError(
176                        format!(
177                            "failed to fetch account proof for account: {account_id}, got: {err}"
178                        )
179                        .into(),
180                    )),
181                }
182            } else {
183                account_response.try_into()
184            }
185        };
186        Ok((block_number.block_num.into(), account_proof?))
187    }
188
189    /// Given the storage details for an account and its id, returns a vector with all of its
190    /// storage slots. Keep in mind that if an account triggers the `too_many_entries` flag, there
191    /// will potentially be multiple requests.
192    async fn build_storage_slots(
193        &self,
194        account_id: AccountId,
195        storage_details: &AccountStorageDetails,
196    ) -> Result<Vec<StorageSlot>, RpcError> {
197        let mut slots = vec![];
198        // `SyncStorageMaps` will return information for *every* map for a given account, so this
199        // map_cache value should be fetched only once, hence the None placeholder
200        let mut map_cache: Option<StorageMapInfo> = None;
201        for slot_header in storage_details.header.slots() {
202            // We have two cases for each slot:
203            // - Slot is a value => We simply instance a StorageSlot
204            // - Slot is a map => If the map is 'small', we can simply
205            // build the map from the given entries. Otherwise we will have to
206            // call the SyncStorageMaps RPC method to obtain the data for the map.
207            // With the current setup, one RPC call should be enough.
208            match slot_header.slot_type() {
209                StorageSlotType::Value => {
210                    slots.push(miden_protocol::account::StorageSlot::with_value(
211                        slot_header.name().clone(),
212                        slot_header.value(),
213                    ));
214                },
215                StorageSlotType::Map => {
216                    let map_details = storage_details.find_map_details(slot_header.name()).ok_or(
217                        RpcError::ExpectedDataMissing(format!(
218                            "slot named '{}' was reported as a map, but it does not have a matching map_detail entry",
219                            slot_header.name(),
220                        )),
221                    )?;
222
223                    let storage_map = if map_details.too_many_entries {
224                        let map_info = if let Some(ref info) = map_cache {
225                            info
226                        } else {
227                            let fetched_data =
228                                self.sync_storage_maps(0_u32.into(), None, account_id).await?;
229                            map_cache.insert(fetched_data)
230                        };
231                        // The sync endpoint may return multiple updates for the same key
232                        // across different blocks. We sort by block number so that
233                        // inserting into the map keeps only the latest value per key.
234                        let mut sorted_updates: Vec<_> = map_info
235                            .updates
236                            .iter()
237                            .filter(|slot_info| slot_info.slot_name == *slot_header.name())
238                            .collect();
239                        sorted_updates.sort_by_key(|u| u.block_num);
240                        let map_entries: Vec<_> = sorted_updates
241                            .into_iter()
242                            .map(|u| (u.key, u.value))
243                            .collect::<BTreeMap<_, _>>()
244                            .into_iter()
245                            .collect();
246                        StorageMap::with_entries(map_entries)
247                    } else {
248                        map_details.entries.clone().into_storage_map()
249                    }
250                    .map_err(|err| {
251                        RpcError::InvalidResponse(format!(
252                            "the rpc api returned a non-valid map entry: {err}"
253                        ))
254                    })?;
255
256                    slots.push(miden_protocol::account::StorageSlot::with_map(
257                        slot_header.name().clone(),
258                        storage_map,
259                    ));
260                },
261            }
262        }
263        Ok(slots)
264    }
265}
266
267#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
268#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
269impl NodeRpcClient for GrpcClient {
270    /// Sets the genesis commitment for the client. If the client is already connected, it will be
271    /// updated to use the new commitment on subsequent requests. If the client is not connected,
272    /// the commitment will be stored and used when the client connects. If the genesis commitment
273    /// is already set, this method does nothing.
274    async fn set_genesis_commitment(&self, commitment: Word) -> Result<(), RpcError> {
275        // Check if already set before doing anything else
276        if self.genesis_commitment.read().is_some() {
277            // Genesis commitment is already set, ignoring the new value.
278            return Ok(());
279        }
280
281        // Store the commitment for future connections
282        self.genesis_commitment.write().replace(commitment);
283
284        // If a client is already connected, update it to use the new genesis commitment.
285        // If not connected, the commitment will be used when connect() is called.
286        let mut client_guard = self.client.write();
287        if let Some(client) = client_guard.as_mut() {
288            client.set_genesis_commitment(commitment);
289        }
290
291        Ok(())
292    }
293
294    async fn submit_proven_transaction(
295        &self,
296        proven_transaction: ProvenTransaction,
297        transaction_inputs: TransactionInputs,
298    ) -> Result<BlockNumber, RpcError> {
299        let request = proto::transaction::ProvenTransaction {
300            transaction: proven_transaction.to_bytes(),
301            transaction_inputs: Some(transaction_inputs.to_bytes()),
302        };
303
304        let mut rpc_api = self.ensure_connected().await?;
305
306        let api_response = rpc_api.submit_proven_transaction(request).await.map_err(|status| {
307            RpcError::from_grpc_error(NodeRpcClientEndpoint::SubmitProvenTx, status)
308        })?;
309
310        Ok(BlockNumber::from(api_response.into_inner().block_num))
311    }
312
313    async fn get_block_header_by_number(
314        &self,
315        block_num: Option<BlockNumber>,
316        include_mmr_proof: bool,
317    ) -> Result<(BlockHeader, Option<MmrProof>), RpcError> {
318        let request = proto::rpc::BlockHeaderByNumberRequest {
319            block_num: block_num.as_ref().map(BlockNumber::as_u32),
320            include_mmr_proof: Some(include_mmr_proof),
321        };
322
323        info!("Calling GetBlockHeaderByNumber: {:?}", request);
324
325        let mut rpc_api = self.ensure_connected().await?;
326
327        let api_response = rpc_api.get_block_header_by_number(request).await.map_err(|status| {
328            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockHeaderByNumber, status)
329        })?;
330
331        let response = api_response.into_inner();
332
333        let block_header: BlockHeader = response
334            .block_header
335            .ok_or(RpcError::ExpectedDataMissing("BlockHeader".into()))?
336            .try_into()?;
337
338        let mmr_proof = if include_mmr_proof {
339            let forest = response
340                .chain_length
341                .ok_or(RpcError::ExpectedDataMissing("ChainLength".into()))?;
342            let merkle_path: MerklePath = response
343                .mmr_path
344                .ok_or(RpcError::ExpectedDataMissing("MmrPath".into()))?
345                .try_into()?;
346
347            Some(MmrProof {
348                forest: Forest::new(usize::try_from(forest).expect("u64 should fit in usize")),
349                position: block_header.block_num().as_usize(),
350                merkle_path,
351            })
352        } else {
353            None
354        };
355
356        Ok((block_header, mmr_proof))
357    }
358
359    async fn get_notes_by_id(&self, note_ids: &[NoteId]) -> Result<Vec<FetchedNote>, RpcError> {
360        let mut notes = Vec::with_capacity(note_ids.len());
361        for chunk in note_ids.chunks(NOTE_IDS_LIMIT) {
362            let request = proto::note::NoteIdList {
363                ids: chunk.iter().map(|id| (*id).into()).collect(),
364            };
365
366            let mut rpc_api = self.ensure_connected().await?;
367
368            let api_response = rpc_api.get_notes_by_id(request).await.map_err(|status| {
369                RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNotesById, status)
370            })?;
371
372            let response_notes = api_response
373                .into_inner()
374                .notes
375                .into_iter()
376                .map(FetchedNote::try_from)
377                .collect::<Result<Vec<FetchedNote>, RpcConversionError>>()?;
378
379            notes.extend(response_notes);
380        }
381        Ok(notes)
382    }
383
384    /// Sends a sync state request to the Miden node, validates and converts the response
385    /// into a [`StateSyncInfo`] struct.
386    async fn sync_state(
387        &self,
388        block_num: BlockNumber,
389        account_ids: &[AccountId],
390        note_tags: &BTreeSet<NoteTag>,
391    ) -> Result<StateSyncInfo, RpcError> {
392        let account_ids = account_ids.iter().map(|acc| (*acc).into()).collect();
393
394        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
395
396        let request = proto::rpc::SyncStateRequest {
397            block_num: block_num.as_u32(),
398            account_ids,
399            note_tags,
400        };
401
402        let mut rpc_api = self.ensure_connected().await?;
403
404        let response = rpc_api.sync_state(request).await.map_err(|status| {
405            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncState, status)
406        })?;
407        response.into_inner().try_into()
408    }
409
410    /// Sends a `GetAccountDetailsRequest` to the Miden node, and extracts an [`FetchedAccount`]
411    /// from the `GetAccountDetailsResponse` response.
412    ///
413    /// # Errors
414    ///
415    /// This function will return an error if:
416    ///
417    /// - There was an error sending the request to the node.
418    /// - The answer had a `None` for one of the expected fields (`account`, `summary`,
419    ///   `account_commitment`, `details`).
420    /// - There is an error during [Account] deserialization.
421    async fn get_account_details(&self, account_id: AccountId) -> Result<FetchedAccount, RpcError> {
422        let (block_number, full_account_proof) = self.fetch_full_account_proof(account_id).await?;
423        let update_summary =
424            AccountUpdateSummary::new(full_account_proof.account_commitment(), block_number);
425
426        // The case for a private account is simple,
427        // we simple use the commitment and its id.
428        if account_id.is_private() {
429            Ok(FetchedAccount::new_private(account_id, update_summary))
430        } else {
431            // An account with public state has to fetch all of its state.
432            // Even more so, an account with a large state will have to do
433            // a couple of extra requests to fetch all of its data.
434            let details =
435                full_account_proof.into_parts().1.ok_or(RpcError::ExpectedDataMissing(
436                    "GetAccountDetails returned a public account without details".to_owned(),
437                ))?;
438            let account_id = details.header.id();
439            let nonce = details.header.nonce();
440            let assets: Vec<Asset> = {
441                if details.vault_details.too_many_assets {
442                    let vault_info =
443                        self.sync_account_vault(BlockNumber::from(0), None, account_id).await?;
444                    // The sync endpoint may return multiple updates for the same vault key
445                    // across different blocks. We sort by block number so that
446                    // inserting into the map keeps only the latest value per key.
447                    let mut updates = vault_info.updates;
448                    updates.sort_by_key(|u| u.block_num);
449                    updates
450                        .into_iter()
451                        .map(|u| (Word::from(u.vault_key), u.asset))
452                        .collect::<BTreeMap<_, _>>()
453                        .into_values()
454                        .flatten()
455                        .collect()
456                } else {
457                    details.vault_details.assets
458                }
459            };
460
461            let slots = self.build_storage_slots(account_id, &details.storage_details).await?;
462            let seed = None;
463            let asset_vault = AssetVault::new(&assets).map_err(|err| {
464                RpcError::InvalidResponse(format!("api rpc returned non-valid assets: {err}"))
465            })?;
466            let account_storage = AccountStorage::new(slots).map_err(|err| {
467                RpcError::InvalidResponse(format!(
468                    "api rpc returned non-valid storage slots: {err}"
469                ))
470            })?;
471            let account =
472                Account::new(account_id, asset_vault, account_storage, details.code, nonce, seed)
473                    .map_err(|err| {
474                    RpcError::InvalidResponse(format!(
475                        "failed to instance an account from the rpc api response: {err}"
476                    ))
477                })?;
478            Ok(FetchedAccount::new_public(account, update_summary))
479        }
480    }
481
482    /// Sends a `GetAccountProof` request to the Miden node, and extracts the [AccountProof]
483    /// from the response, as well as the block number that it was retrieved for.
484    ///
485    /// # Errors
486    ///
487    /// This function will return an error if:
488    ///
489    /// - The requested Account isn't returned by the node.
490    /// - There was an error sending the request to the node.
491    /// - The answer had a `None` for one of the expected fields.
492    /// - There is an error during storage deserialization.
493    async fn get_account(
494        &self,
495        foreign_account: ForeignAccount,
496        account_state: AccountStateAt,
497        known_account_code: Option<AccountCode>,
498    ) -> Result<(BlockNumber, AccountProof), RpcError> {
499        let mut known_codes_by_commitment: BTreeMap<Word, AccountCode> = BTreeMap::new();
500        if let Some(account_code) = known_account_code {
501            known_codes_by_commitment.insert(account_code.commitment(), account_code);
502        }
503
504        let mut rpc_api = self.ensure_connected().await?;
505
506        // Request proofs one-by-one using the singular API
507        let account_id = foreign_account.account_id();
508        let storage_requirements = foreign_account.storage_slot_requirements();
509
510        let storage_maps: Vec<StorageMapDetailRequest> = storage_requirements.clone().into();
511
512        // Only request details for public accounts; include known code commitment for this
513        // account when available
514        let account_details = if account_id.is_public() {
515            Some(AccountDetailRequest {
516                code_commitment: Some(EMPTY_WORD.into()),
517                // TODO: implement a way to request asset vaults
518                // https://github.com/0xMiden/miden-client/issues/1412
519                asset_vault_commitment: None,
520                storage_maps,
521            })
522        } else {
523            None
524        };
525
526        let block_num = match account_state {
527            AccountStateAt::Block(number) => Some(number.into()),
528            AccountStateAt::ChainTip => None,
529        };
530
531        let request = AccountRequest {
532            account_id: Some(account_id.into()),
533            block_num,
534            details: account_details,
535        };
536
537        let response = rpc_api
538            .get_account(request)
539            .await
540            .map_err(|status| RpcError::from_grpc_error(NodeRpcClientEndpoint::GetAccount, status))?
541            .into_inner();
542
543        let account_witness: AccountWitness = response
544            .witness
545            .ok_or(RpcError::ExpectedDataMissing("AccountWitness".to_string()))?
546            .try_into()?;
547
548        // For public accounts, details should be present when requested
549        let headers = if account_witness.id().is_public() {
550            Some(
551                response
552                    .details
553                    .ok_or(RpcError::ExpectedDataMissing("Account.Details".to_string()))?
554                    .into_domain(&known_codes_by_commitment)?,
555            )
556        } else {
557            None
558        };
559
560        let proof = AccountProof::new(account_witness, headers)
561            .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
562
563        let block_num = response
564            .block_num
565            .ok_or(RpcError::ExpectedDataMissing("response block num".to_string()))?
566            .block_num
567            .into();
568
569        Ok((block_num, proof))
570    }
571
572    /// Sends a `SyncNoteRequest` to the Miden node, and extracts a [`NoteSyncInfo`] from the
573    /// response.
574    async fn sync_notes(
575        &self,
576        block_num: BlockNumber,
577        block_to: Option<BlockNumber>,
578        note_tags: &BTreeSet<NoteTag>,
579    ) -> Result<NoteSyncInfo, RpcError> {
580        let note_tags = note_tags.iter().map(|&note_tag| note_tag.into()).collect();
581
582        let block_range = Some(BlockRange {
583            block_from: block_num.as_u32(),
584            block_to: block_to.map(|b| b.as_u32()),
585        });
586
587        let request = proto::rpc::SyncNotesRequest { block_range, note_tags };
588
589        let mut rpc_api = self.ensure_connected().await?;
590
591        let response = rpc_api.sync_notes(request).await.map_err(|status| {
592            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNotes, status)
593        })?;
594
595        response.into_inner().try_into()
596    }
597
598    async fn sync_nullifiers(
599        &self,
600        prefixes: &[u16],
601        block_num: BlockNumber,
602        block_to: Option<BlockNumber>,
603    ) -> Result<Vec<NullifierUpdate>, RpcError> {
604        const MAX_ITERATIONS: u32 = 1000; // Safety limit to prevent infinite loops
605
606        let mut all_nullifiers = BTreeSet::new();
607
608        // Establish RPC connection once before the loop
609        let mut rpc_api = self.ensure_connected().await?;
610
611        // If the prefixes are too many, we need to chunk them into smaller groups to avoid
612        // violating the RPC limit.
613        'chunk_nullifiers: for chunk in prefixes.chunks(NULLIFIER_PREFIXES_LIMIT) {
614            let mut current_block_from = block_num.as_u32();
615
616            for _ in 0..MAX_ITERATIONS {
617                let request = proto::rpc::SyncNullifiersRequest {
618                    nullifiers: chunk.iter().map(|&x| u32::from(x)).collect(),
619                    prefix_len: 16,
620                    block_range: Some(BlockRange {
621                        block_from: current_block_from,
622                        block_to: block_to.map(|b| b.as_u32()),
623                    }),
624                };
625
626                let response = rpc_api.sync_nullifiers(request).await.map_err(|status| {
627                    RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncNullifiers, status)
628                })?;
629                let response = response.into_inner();
630
631                // Convert nullifiers for this batch
632                let batch_nullifiers = response
633                    .nullifiers
634                    .iter()
635                    .map(TryFrom::try_from)
636                    .collect::<Result<Vec<NullifierUpdate>, _>>()
637                    .map_err(|err| RpcError::InvalidResponse(err.to_string()))?;
638
639                all_nullifiers.extend(batch_nullifiers);
640
641                // Check if we need to fetch more pages
642                if let Some(page) = response.pagination_info {
643                    // Ensure we're making progress to avoid infinite loops
644                    if page.block_num < current_block_from {
645                        return Err(RpcError::InvalidResponse(
646                            "invalid pagination: block_num went backwards".to_string(),
647                        ));
648                    }
649
650                    // Calculate target block as minimum between block_to and chain_tip
651                    let target_block =
652                        block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
653
654                    if page.block_num >= target_block {
655                        // No pagination info or we've reached/passed the target so we're done
656                        continue 'chunk_nullifiers;
657                    }
658                    current_block_from = page.block_num + 1;
659                }
660            }
661            // If we exit the loop, we've hit the iteration limit
662            return Err(RpcError::InvalidResponse(
663                "too many pagination iterations, possible infinite loop".to_string(),
664            ));
665        }
666        Ok(all_nullifiers.into_iter().collect::<Vec<_>>())
667    }
668
669    async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Result<Vec<SmtProof>, RpcError> {
670        let mut proofs: Vec<SmtProof> = Vec::with_capacity(nullifiers.len());
671        for chunk in nullifiers.chunks(NULLIFIER_PREFIXES_LIMIT) {
672            let request = proto::rpc::NullifierList {
673                nullifiers: chunk.iter().map(|nul| nul.as_word().into()).collect(),
674            };
675
676            let mut rpc_api = self.ensure_connected().await?;
677
678            let response = rpc_api.check_nullifiers(request).await.map_err(|status| {
679                RpcError::from_grpc_error(NodeRpcClientEndpoint::CheckNullifiers, status)
680            })?;
681
682            let mut response = response.into_inner();
683            let chunk_proofs = response
684                .proofs
685                .iter_mut()
686                .map(|r| r.to_owned().try_into())
687                .collect::<Result<Vec<SmtProof>, RpcConversionError>>()?;
688            proofs.extend(chunk_proofs);
689        }
690        Ok(proofs)
691    }
692
693    async fn get_block_by_number(&self, block_num: BlockNumber) -> Result<ProvenBlock, RpcError> {
694        let request = proto::blockchain::BlockNumber { block_num: block_num.as_u32() };
695
696        let mut rpc_api = self.ensure_connected().await?;
697
698        let response = rpc_api.get_block_by_number(request).await.map_err(|status| {
699            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetBlockByNumber, status)
700        })?;
701
702        let response = response.into_inner();
703        let block =
704            ProvenBlock::read_from_bytes(&response.block.ok_or(RpcError::ExpectedDataMissing(
705                "GetBlockByNumberResponse.block".to_string(),
706            ))?)?;
707
708        Ok(block)
709    }
710
711    async fn get_note_script_by_root(&self, root: Word) -> Result<NoteScript, RpcError> {
712        let request = proto::note::NoteRoot { root: Some(root.into()) };
713
714        let mut rpc_api = self.ensure_connected().await?;
715
716        let response = rpc_api.get_note_script_by_root(request).await.map_err(|status| {
717            RpcError::from_grpc_error(NodeRpcClientEndpoint::GetNoteScriptByRoot, status)
718        })?;
719
720        let response = response.into_inner();
721        let note_script = NoteScript::try_from(
722            response
723                .script
724                .ok_or(RpcError::ExpectedDataMissing("GetNoteScriptByRoot.script".to_string()))?,
725        )?;
726
727        Ok(note_script)
728    }
729
730    async fn sync_storage_maps(
731        &self,
732        block_from: BlockNumber,
733        block_to: Option<BlockNumber>,
734        account_id: AccountId,
735    ) -> Result<StorageMapInfo, RpcError> {
736        let mut all_updates = Vec::new();
737        let mut current_block_from = block_from.as_u32();
738        let mut target_block_reached = false;
739        let mut final_chain_tip = 0;
740        let mut final_block_num = 0;
741
742        let mut rpc_api = self.ensure_connected().await?;
743
744        while !target_block_reached {
745            let request = proto::rpc::SyncAccountStorageMapsRequest {
746                block_range: Some(BlockRange {
747                    block_from: current_block_from,
748                    block_to: block_to.map(|b| b.as_u32()),
749                }),
750                account_id: Some(account_id.into()),
751            };
752
753            let response = rpc_api.sync_account_storage_maps(request).await.map_err(|status| {
754                RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncStorageMaps, status)
755            })?;
756            let response = response.into_inner();
757
758            let batch_updates = response
759                .updates
760                .into_iter()
761                .map(TryInto::try_into)
762                .collect::<Result<Vec<StorageMapUpdate>, _>>()?;
763            all_updates.extend(batch_updates);
764
765            let page = response
766                .pagination_info
767                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
768
769            if page.block_num < current_block_from {
770                return Err(RpcError::InvalidResponse(
771                    "invalid pagination: block_num went backwards".to_owned(),
772                ));
773            }
774
775            final_chain_tip = page.chain_tip;
776            final_block_num = page.block_num;
777
778            let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
779
780            target_block_reached = page.block_num >= target_block;
781            current_block_from = page.block_num + 1;
782        }
783
784        Ok(StorageMapInfo {
785            chain_tip: final_chain_tip.into(),
786            block_number: final_block_num.into(),
787            updates: all_updates,
788        })
789    }
790
791    async fn sync_account_vault(
792        &self,
793        block_from: BlockNumber,
794        block_to: Option<BlockNumber>,
795        account_id: AccountId,
796    ) -> Result<AccountVaultInfo, RpcError> {
797        let mut all_updates = Vec::new();
798        let mut current_block_from = block_from.as_u32();
799        let mut target_block_reached = false;
800        let mut final_chain_tip = 0;
801        let mut final_block_num = 0;
802
803        let mut rpc_api = self.ensure_connected().await?;
804
805        while !target_block_reached {
806            let request = proto::rpc::SyncAccountVaultRequest {
807                block_range: Some(BlockRange {
808                    block_from: current_block_from,
809                    block_to: block_to.map(|b| b.as_u32()),
810                }),
811                account_id: Some(account_id.into()),
812            };
813
814            let response = rpc_api
815                .sync_account_vault(request)
816                .await
817                .map_err(|status| {
818                    RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncAccountVault, status)
819                })?
820                .into_inner();
821
822            let batch_updates = response
823                .updates
824                .iter()
825                .map(|u| (*u).try_into())
826                .collect::<Result<Vec<AccountVaultUpdate>, _>>()?;
827            all_updates.extend(batch_updates);
828
829            let page = response
830                .pagination_info
831                .ok_or(RpcError::ExpectedDataMissing("pagination_info".to_owned()))?;
832
833            if page.block_num < current_block_from {
834                return Err(RpcError::InvalidResponse(
835                    "invalid pagination: block_num went backwards".to_owned(),
836                ));
837            }
838
839            final_chain_tip = page.chain_tip;
840            final_block_num = page.block_num;
841
842            let target_block = block_to.map_or(page.chain_tip, |b| b.as_u32().min(page.chain_tip));
843
844            target_block_reached = page.block_num >= target_block;
845            current_block_from = page.block_num + 1;
846        }
847
848        Ok(AccountVaultInfo {
849            chain_tip: final_chain_tip.into(),
850            block_number: final_block_num.into(),
851            updates: all_updates,
852        })
853    }
854
855    async fn sync_transactions(
856        &self,
857        block_from: BlockNumber,
858        block_to: Option<BlockNumber>,
859        account_ids: Vec<AccountId>,
860    ) -> Result<TransactionsInfo, RpcError> {
861        let block_range = Some(BlockRange {
862            block_from: block_from.as_u32(),
863            block_to: block_to.map(|b| b.as_u32()),
864        });
865
866        let account_ids = account_ids.iter().map(|acc_id| (*acc_id).into()).collect();
867
868        let request = proto::rpc::SyncTransactionsRequest { block_range, account_ids };
869
870        let mut rpc_api = self.ensure_connected().await?;
871
872        let response = rpc_api.sync_transactions(request).await.map_err(|status| {
873            RpcError::from_grpc_error(NodeRpcClientEndpoint::SyncTransactions, status)
874        })?;
875
876        response.into_inner().try_into()
877    }
878
879    async fn get_network_id(&self) -> Result<NetworkId, RpcError> {
880        let endpoint_str: &str = &self.endpoint.clone();
881        let endpoint: Endpoint =
882            Endpoint::try_from(endpoint_str).map_err(RpcError::InvalidNodeEndpoint)?;
883        Ok(endpoint.to_network_id())
884    }
885}
886
887// ERRORS
888// ================================================================================================
889
890impl RpcError {
891    pub fn from_grpc_error(endpoint: NodeRpcClientEndpoint, status: Status) -> Self {
892        if let Some(accept_error) = AcceptHeaderError::try_from_message(status.message()) {
893            return Self::AcceptHeaderError(accept_error);
894        }
895
896        let error_kind = GrpcError::from(&status);
897        let source = Box::new(status) as Box<dyn Error + Send + Sync + 'static>;
898
899        Self::GrpcError {
900            endpoint,
901            error_kind,
902            source: Some(source),
903        }
904    }
905}
906
907impl From<&Status> for GrpcError {
908    fn from(status: &Status) -> Self {
909        GrpcError::from_code(status.code() as i32, Some(status.message().to_string()))
910    }
911}
912
#[cfg(test)]
mod tests {
    use std::boxed::Box;

    use miden_protocol::Word;

    use super::GrpcClient;
    use crate::rpc::{Endpoint, NodeRpcClient};

    /// Compiles only when `T` is both `Send` and `Sync`.
    fn assert_send_sync<T: Send + Sync>() {}

    /// Builds the client used by every test: devnet endpoint, `10000` as second argument.
    fn devnet_client() -> GrpcClient {
        GrpcClient::new(&Endpoint::devnet(), 10000)
    }

    #[test]
    fn is_send_sync() {
        assert_send_sync::<GrpcClient>();
        assert_send_sync::<Box<dyn NodeRpcClient>>();
    }

    // Function that returns a `Send` future from a dynamic trait that must be `Sync`.
    async fn dyn_trait_send_fut(client: Box<dyn NodeRpcClient>) {
        // This won't compile if `get_block_header_by_number` doesn't return a `Send+Sync` future.
        let header_result = client.get_block_header_by_number(None, false).await;
        assert!(header_result.is_ok());
    }

    #[tokio::test]
    async fn future_is_send() {
        let boxed_client: Box<GrpcClient> = devnet_client().into();
        // The task is spawned but its handle is not awaited; spawning alone requires the future
        // to be `Send`.
        tokio::task::spawn(async move { dyn_trait_send_fut(boxed_client).await });
    }

    #[tokio::test]
    async fn set_genesis_commitment_sets_the_commitment_when_its_not_already_set() {
        let grpc_client = devnet_client();

        assert!(grpc_client.genesis_commitment.read().is_none());

        let expected = Word::default();
        grpc_client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(grpc_client.genesis_commitment.read().unwrap(), expected);
    }

    #[tokio::test]
    async fn set_genesis_commitment_does_nothing_if_the_commitment_is_already_set() {
        use miden_protocol::Felt;

        let grpc_client = devnet_client();

        let first_commitment = Word::default();
        grpc_client.set_genesis_commitment(first_commitment).await.unwrap();

        // A second, different commitment must not replace the first one.
        let second_commitment =
            Word::from([Felt::new(1), Felt::new(2), Felt::new(3), Felt::new(4)]);
        grpc_client.set_genesis_commitment(second_commitment).await.unwrap();

        assert_eq!(grpc_client.genesis_commitment.read().unwrap(), first_commitment);
    }

    #[tokio::test]
    async fn set_genesis_commitment_updates_the_client_if_already_connected() {
        let grpc_client = devnet_client();

        // "Connect" the client
        grpc_client.connect().await.unwrap();

        let expected = Word::default();
        grpc_client.set_genesis_commitment(expected).await.unwrap();

        assert_eq!(grpc_client.genesis_commitment.read().unwrap(), expected);
        assert!(grpc_client.client.read().as_ref().is_some());
    }
}