celestia_grpc/
client.rs

1use std::fmt;
2use std::sync::Arc;
3
4use ::tendermint::chain::Id;
5use celestia_types::any::IntoProtobufAny;
6use k256::ecdsa::VerifyingKey;
7use lumina_utils::time::Interval;
8use prost::Message;
9use std::time::Duration;
10use tokio::sync::{Mutex, MutexGuard, OnceCell};
11
12use celestia_grpc_macros::grpc_method;
13use celestia_proto::celestia::blob::v1::query_client::QueryClient as BlobQueryClient;
14use celestia_proto::celestia::core::v1::gas_estimation::gas_estimator_client::GasEstimatorClient;
15use celestia_proto::celestia::core::v1::tx::tx_client::TxClient as TxStatusClient;
16use celestia_proto::cosmos::auth::v1beta1::query_client::QueryClient as AuthQueryClient;
17use celestia_proto::cosmos::bank::v1beta1::query_client::QueryClient as BankQueryClient;
18use celestia_proto::cosmos::base::node::v1beta1::service_client::ServiceClient as ConfigServiceClient;
19use celestia_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient as TendermintServiceClient;
20use celestia_proto::cosmos::staking::v1beta1::query_client::QueryClient as StakingQueryClient;
21use celestia_proto::cosmos::tx::v1beta1::service_client::ServiceClient as TxServiceClient;
22use celestia_types::blob::{BlobParams, MsgPayForBlobs, RawBlobTx, RawMsgPayForBlobs};
23use celestia_types::block::Block;
24use celestia_types::consts::appconsts;
25use celestia_types::hash::Hash;
26use celestia_types::state::auth::{Account, AuthParams, BaseAccount};
27use celestia_types::state::{
28    AbciQueryResponse, PageRequest, QueryDelegationResponse, QueryRedelegationsResponse,
29    QueryUnbondingDelegationResponse, RawTxBody, ValAddress,
30};
31use celestia_types::state::{
32    AccAddress, Address, AddressTrait, BOND_DENOM, Coin, ErrorCode, TxResponse,
33};
34use celestia_types::{AppVersion, Blob, ExtendedHeader};
35
36use crate::abci_proofs::ProofChain;
37use crate::boxed::BoxedTransport;
38use crate::builder::GrpcClientBuilder;
39use crate::grpc::{
40    AsyncGrpcCall, BroadcastMode, ConfigResponse, Context, GasEstimate, GasInfo, GetTxResponse,
41    TxPriority, TxStatus, TxStatusResponse,
42};
43use crate::signer::{BoxedDocSigner, sign_tx};
44use crate::tx::TxInfo;
45use crate::{Error, Result, TxConfig};
46
47// source https://github.com/celestiaorg/celestia-core/blob/v1.43.0-tm-v0.34.35/pkg/consts/consts.go#L19
48const BLOB_TX_TYPE_ID: &str = "BLOB";
49/// Message returned on errors related to sequence
50const SEQUENCE_ERROR_PAT: &str = "account sequence mismatch, expected ";
51
52#[derive(Debug, Clone)]
53struct ChainState {
54    app_version: AppVersion,
55    chain_id: Id,
56}
57
58#[derive(Debug)]
59pub(crate) struct AccountState {
60    base: OnceCell<Mutex<BaseAccount>>,
61    pubkey: VerifyingKey,
62    signer: BoxedDocSigner,
63}
64
65#[derive(Debug)]
66struct AccountGuard<'a> {
67    base: MutexGuard<'a, BaseAccount>,
68    pubkey: &'a VerifyingKey,
69    signer: &'a BoxedDocSigner,
70}
71
72impl AccountState {
73    pub fn new(pubkey: VerifyingKey, signer: BoxedDocSigner) -> Self {
74        Self {
75            base: OnceCell::new(),
76            pubkey,
77            signer,
78        }
79    }
80}
81
82/// A transaction that was broadcasted
83#[derive(Debug)]
84struct BroadcastedTx {
85    /// Broadcasted bytes
86    tx: Vec<u8>,
87    /// Transaction hash
88    hash: Hash,
89    /// Transaction sequence
90    sequence: u64,
91}
92
93/// gRPC client for the Celestia network
94///
95/// Under the hood, this struct wraps tonic and does type conversion
96#[derive(Clone)]
97pub struct GrpcClient {
98    inner: Arc<GrpcClientInner>,
99}
100
101struct GrpcClientInner {
102    transport: BoxedTransport,
103    account: Option<AccountState>,
104    chain_state: OnceCell<ChainState>,
105    context: Context,
106}
107
108impl GrpcClient {
109    /// Create a new client wrapping given transport
110    pub(crate) fn new(
111        transport: BoxedTransport,
112        account: Option<AccountState>,
113        context: Context,
114    ) -> Self {
115        Self {
116            inner: Arc::new(GrpcClientInner {
117                transport,
118                account,
119                chain_state: OnceCell::new(),
120                context,
121            }),
122        }
123    }
124
125    /// Create a builder for [`GrpcClient`] connected to `url`
126    pub fn builder() -> GrpcClientBuilder {
127        GrpcClientBuilder::new()
128    }
129
130    // cosmos.auth
131
132    /// Get auth params
133    #[grpc_method(AuthQueryClient::params)]
134    fn get_auth_params(&self) -> AsyncGrpcCall<AuthParams>;
135
136    /// Get account
137    #[grpc_method(AuthQueryClient::account)]
138    fn get_account(&self, account: &AccAddress) -> AsyncGrpcCall<Account>;
139
140    /// Get accounts
141    #[grpc_method(AuthQueryClient::accounts)]
142    fn get_accounts(&self) -> AsyncGrpcCall<Vec<Account>>;
143
144    // cosmos.bank
145
146    /// Retrieves the verified Celestia coin balance for the address.
147    ///
148    /// # Notes
149    ///
150    /// This returns the verified balance which is the one that was reported by
151    /// the previous network block. In other words, if you transfer some coins,
152    /// you need to wait 1 more block in order to see the new balance. If you want
153    /// something more immediate then use [`GrpcClient::get_balance`].
154    pub fn get_verified_balance(
155        &self,
156        address: &Address,
157        header: &ExtendedHeader,
158    ) -> AsyncGrpcCall<Coin> {
159        let this = self.clone();
160        let address = *address;
161        let header = header.clone();
162
163        AsyncGrpcCall::new(move |context| async move {
164            this.get_verified_balance_impl(&address, &header, &context)
165                .await
166        })
167        .context(&self.inner.context)
168    }
169
170    /// Retrieves the Celestia coin balance for the given address.
171    #[grpc_method(BankQueryClient::balance)]
172    fn get_balance(&self, address: &Address, denom: impl Into<String>) -> AsyncGrpcCall<Coin>;
173
174    /// Get balance of all coins
175    #[grpc_method(BankQueryClient::all_balances)]
176    fn get_all_balances(&self, address: &Address) -> AsyncGrpcCall<Vec<Coin>>;
177
178    /// Get balance of all spendable coins
179    #[grpc_method(BankQueryClient::spendable_balances)]
180    fn get_spendable_balances(&self, address: &Address) -> AsyncGrpcCall<Vec<Coin>>;
181
182    /// Get total supply
183    #[grpc_method(BankQueryClient::total_supply)]
184    fn get_total_supply(&self) -> AsyncGrpcCall<Vec<Coin>>;
185
186    // cosmos.base.node
187
188    /// Get node configuration
189    #[grpc_method(ConfigServiceClient::config)]
190    fn get_node_config(&self) -> AsyncGrpcCall<ConfigResponse>;
191
192    // cosmos.base.tendermint
193
194    /// Get latest block
195    #[grpc_method(TendermintServiceClient::get_latest_block)]
196    fn get_latest_block(&self) -> AsyncGrpcCall<Block>;
197
198    /// Get block by height
199    #[grpc_method(TendermintServiceClient::get_block_by_height)]
200    fn get_block_by_height(&self, height: i64) -> AsyncGrpcCall<Block>;
201
202    /// Issue a direct ABCI query to the application
203    #[grpc_method(TendermintServiceClient::abci_query)]
204    fn abci_query(
205        &self,
206        data: impl AsRef<[u8]>,
207        path: impl Into<String>,
208        height: u64,
209        prove: bool,
210    ) -> AsyncGrpcCall<AbciQueryResponse>;
211
212    // cosmos.tx
213
214    /// Broadcast prepared and serialised transaction
215    #[grpc_method(TxServiceClient::broadcast_tx)]
216    fn broadcast_tx(&self, tx_bytes: Vec<u8>, mode: BroadcastMode) -> AsyncGrpcCall<TxResponse>;
217
218    /// Get Tx
219    #[grpc_method(TxServiceClient::get_tx)]
220    fn get_tx(&self, hash: Hash) -> AsyncGrpcCall<GetTxResponse>;
221
222    /// Broadcast prepared and serialised transaction
223    #[grpc_method(TxServiceClient::simulate)]
224    fn simulate(&self, tx_bytes: Vec<u8>) -> AsyncGrpcCall<GasInfo>;
225
226    // cosmos.staking
227
228    /// Retrieves the delegation information between a delegator and a validator
229    // TODO: Expose this to JS and  UniFFI
230    #[grpc_method(StakingQueryClient::delegation)]
231    fn query_delegation(
232        &self,
233        delegator_address: &AccAddress,
234        validator_address: &ValAddress,
235    ) -> AsyncGrpcCall<QueryDelegationResponse>;
236
237    /// Retrieves the unbonding status between a delegator and a validator
238    // TODO: Expose this to JS and  UniFFI
239    #[grpc_method(StakingQueryClient::unbonding_delegation)]
240    fn query_unbonding(
241        &self,
242        delegator_address: &AccAddress,
243        validator_address: &ValAddress,
244    ) -> AsyncGrpcCall<QueryUnbondingDelegationResponse>;
245
246    /// Retrieves the status of the redelegations between a delegator and a validator
247    // TODO: Expose this to JS and  UniFFI
248    #[grpc_method(StakingQueryClient::redelegations)]
249    fn query_redelegations(
250        &self,
251        delegator_address: &AccAddress,
252        src_validator_address: &ValAddress,
253        dest_validator_address: &ValAddress,
254        pagination: Option<PageRequest>,
255    ) -> AsyncGrpcCall<QueryRedelegationsResponse>;
256
257    // celestia.blob
258
259    /// Get blob params
260    #[grpc_method(BlobQueryClient::params)]
261    fn get_blob_params(&self) -> AsyncGrpcCall<BlobParams>;
262
263    // celestia.core.tx
264
265    /// Get status of the transaction
266    #[grpc_method(TxStatusClient::tx_status)]
267    fn tx_status(&self, hash: Hash) -> AsyncGrpcCall<TxStatusResponse>;
268
269    // celestia.core.gas_estimation
270
271    /// Estimate gas price for given transaction priority based
272    /// on the gas prices of the transactions in the last five blocks.
273    ///
274    /// If no transaction is found in the last five blocks, return the network
275    /// min gas price.
276    #[grpc_method(GasEstimatorClient::estimate_gas_price)]
277    fn estimate_gas_price(&self, priority: TxPriority) -> AsyncGrpcCall<f64>;
278
279    /// Estimate gas price for transaction with given priority and estimate gas usage
280    /// for the provided serialised transaction.
281    ///
282    /// The gas price estimation is based on the gas prices of the transactions
283    /// in the last five blocks.
284    /// If no transaction is found in the last five blocks, return the network
285    /// min gas price.
286    ///
287    /// The gas used is estimated using the state machine simulation.
288    #[grpc_method(GasEstimatorClient::estimate_gas_price_and_usage)]
289    fn estimate_gas_price_and_usage(
290        &self,
291        priority: TxPriority,
292        tx_bytes: Vec<u8>,
293    ) -> AsyncGrpcCall<GasEstimate>;
294
295    /// Submit given message to celestia network.
296    ///
297    /// # Example
298    /// ```no_run
299    /// # async fn docs() {
300    /// use celestia_grpc::{GrpcClient, TxConfig};
301    /// use celestia_proto::cosmos::bank::v1beta1::MsgSend;
302    /// use celestia_types::state::{Address, Coin};
303    /// use tendermint::crypto::default::ecdsa_secp256k1::SigningKey;
304    ///
305    /// let signing_key = SigningKey::random(&mut rand::rngs::OsRng);
306    /// let address = Address::from_account_verifying_key(*signing_key.verifying_key());
307    /// let grpc_url = "public-celestia-mocha4-consensus.numia.xyz:9090";
308    ///
309    /// let tx_client = GrpcClient::builder()
310    ///     .url(grpc_url)
311    ///     .signer_keypair(signing_key)
312    ///     .build()
313    ///     .unwrap();
314    ///
315    /// let msg = MsgSend {
316    ///     from_address: address.to_string(),
317    ///     to_address: "celestia169s50psyj2f4la9a2235329xz7rk6c53zhw9mm".to_string(),
318    ///     amount: vec![Coin::utia(12345).into()],
319    /// };
320    ///
321    /// tx_client
322    ///     .submit_message(msg.clone(), TxConfig::default())
323    ///     .await
324    ///     .unwrap();
325    /// # }
326    /// ```
327    pub fn submit_message<M>(&self, message: M, cfg: TxConfig) -> AsyncGrpcCall<TxInfo>
328    where
329        M: IntoProtobufAny + Send + 'static,
330    {
331        let this = self.clone();
332
333        AsyncGrpcCall::new(move |context| async move {
334            this.submit_message_impl(message, cfg, &context).await
335        })
336        .context(&self.inner.context)
337    }
338
339    /// Submit given blobs to celestia network.
340    ///
341    /// # Example
342    /// ```no_run
343    /// # async fn docs() {
344    /// use celestia_grpc::{GrpcClient, TxConfig};
345    /// use celestia_types::state::{Address, Coin};
346    /// use celestia_types::{AppVersion, Blob};
347    /// use celestia_types::nmt::Namespace;
348    /// use tendermint::crypto::default::ecdsa_secp256k1::SigningKey;
349    ///
350    /// let signing_key = SigningKey::random(&mut rand::rngs::OsRng);
351    /// let address = Address::from_account_verifying_key(*signing_key.verifying_key());
352    /// let grpc_url = "public-celestia-mocha4-consensus.numia.xyz:9090";
353    ///
354    /// let tx_client = GrpcClient::builder()
355    ///     .url(grpc_url)
356    ///     .signer_keypair(signing_key)
357    ///     .build()
358    ///     .unwrap();
359    ///
360    /// let ns = Namespace::new_v0(b"abcd").unwrap();
361    /// let blob = Blob::new(ns, "some data".into(), None, AppVersion::V3).unwrap();
362    ///
363    /// tx_client
364    ///     .submit_blobs(&[blob], TxConfig::default())
365    ///     .await
366    ///     .unwrap();
367    /// # }
368    /// ```
369    pub fn submit_blobs(&self, blobs: &[Blob], cfg: TxConfig) -> AsyncGrpcCall<TxInfo> {
370        let this = self.clone();
371        let blobs = blobs.to_vec();
372
373        AsyncGrpcCall::new(move |context| async move {
374            this.submit_blobs_impl(&blobs, cfg, &context).await
375        })
376        .context(&self.inner.context)
377    }
378
379    /// Get client's app version
380    pub fn app_version(&self) -> AsyncGrpcCall<AppVersion> {
381        let this = self.clone();
382
383        AsyncGrpcCall::new(move |context| async move {
384            let ChainState { app_version, .. } = this.load_chain_state(&context).await?;
385            Ok(*app_version)
386        })
387        .context(&self.inner.context)
388    }
389
390    /// Get client's chain id
391    pub fn chain_id(&self) -> AsyncGrpcCall<Id> {
392        let this = self.clone();
393
394        AsyncGrpcCall::new(move |context| async move {
395            let ChainState { chain_id, .. } = this.load_chain_state(&context).await?;
396            Ok(chain_id.clone())
397        })
398        .context(&self.inner.context)
399    }
400
401    /// Get client's account public key if the signer is set
402    pub fn get_account_pubkey(&self) -> Option<VerifyingKey> {
403        self.account().map(|account| account.pubkey).ok()
404    }
405
406    /// Get client's account address if the signer is set
407    pub fn get_account_address(&self) -> Option<AccAddress> {
408        self.get_account_pubkey().map(Into::into)
409    }
410}
411
412impl GrpcClient {
413    async fn get_verified_balance_impl(
414        &self,
415        address: &Address,
416        header: &ExtendedHeader,
417        context: &Context,
418    ) -> Result<Coin> {
419        // construct the key for querying account balance from bank state
420        let mut prefixed_account_key = Vec::with_capacity(1 + 1 + appconsts::SIGNER_SIZE + 4);
421        prefixed_account_key.push(0x02); // balances prefix
422        prefixed_account_key.push(address.as_bytes().len() as u8); // address length
423        prefixed_account_key.extend_from_slice(address.as_bytes()); // address
424        prefixed_account_key.extend_from_slice(BOND_DENOM.as_bytes()); // denom
425
426        // abci queries are possible only from 2nd block, but we can rely on the node to
427        // return an error if that is the case. We only want to prevent height == 0 here,
428        // because that will make node interpret that as a network head. It would also fail,
429        // on proof verification, but it would be harder to debug as the message would just
430        // say that computed root is different than expected.
431        let height = 1.max(header.height().value().saturating_sub(1));
432
433        let response = self
434            .abci_query(&prefixed_account_key, "store/bank/key", height, true)
435            .context(context)
436            .await?;
437        if response.code != ErrorCode::Success {
438            return Err(Error::AbciQuery(response.code, response.log));
439        }
440
441        // If account doesn't exist yet, just return 0
442        if response.value.is_empty() {
443            return Ok(Coin::utia(0));
444        }
445
446        // NOTE: don't put `ProofChain` directly in the AbciQueryResponse, because
447        // it supports only small subset of proofs that are required for the balance
448        // queries
449        let proof: ProofChain = response.proof_ops.unwrap_or_default().try_into()?;
450        proof.verify_membership(
451            &header.header.app_hash,
452            [prefixed_account_key.as_slice(), b"bank"],
453            &response.value,
454        )?;
455
456        let amount = std::str::from_utf8(&response.value)
457            .map_err(|_| Error::FailedToParseResponse)?
458            .parse()
459            .map_err(|_| Error::FailedToParseResponse)?;
460
461        Ok(Coin::utia(amount))
462    }
463
464    async fn submit_message_impl<M>(
465        &self,
466        message: M,
467        cfg: TxConfig,
468        context: &Context,
469    ) -> Result<TxInfo>
470    where
471        M: IntoProtobufAny,
472    {
473        let tx_body = RawTxBody {
474            messages: vec![message.into_any()],
475            memo: cfg.memo.clone().unwrap_or_default(),
476            ..RawTxBody::default()
477        };
478
479        let tx = self
480            .sign_and_broadcast_tx(tx_body, cfg.clone(), context)
481            .await?;
482
483        self.confirm_tx(tx, cfg, context).await
484    }
485
486    async fn submit_blobs_impl(
487        &self,
488        blobs: &[Blob],
489        cfg: TxConfig,
490        context: &Context,
491    ) -> Result<TxInfo> {
492        if blobs.is_empty() {
493            return Err(Error::TxEmptyBlobList);
494        }
495        let app_version = self.app_version().await?;
496        for blob in blobs {
497            blob.validate(app_version)?;
498        }
499
500        let tx = self
501            .sign_and_broadcast_blobs(blobs.to_vec(), cfg.clone(), context)
502            .await?;
503
504        self.confirm_tx(tx, cfg, context).await
505    }
506
507    async fn load_chain_state(&self, context: &Context) -> Result<&ChainState> {
508        self.inner
509            .chain_state
510            .get_or_try_init(|| async {
511                let block = self.get_latest_block().context(context).await?;
512                let app_version = block.header.version.app;
513                let app_version = AppVersion::from_u64(app_version)
514                    .ok_or(celestia_types::Error::UnsupportedAppVersion(app_version))?;
515                let chain_id = block.header.chain_id;
516
517                Ok::<_, Error>(ChainState {
518                    app_version,
519                    chain_id,
520                })
521            })
522            .await
523    }
524
525    fn account(&self) -> Result<&AccountState> {
526        self.inner.account.as_ref().ok_or(Error::MissingSigner)
527    }
528
529    async fn lock_account(&self, context: &Context) -> Result<AccountGuard<'_>> {
530        let account = self.account()?;
531
532        let base = account
533            .base
534            .get_or_try_init(|| async {
535                let address = AccAddress::from(account.pubkey);
536                let account = self.get_account(&address).context(context).await?;
537                Ok::<_, Error>(Mutex::new(account.into()))
538            })
539            .await?
540            .lock()
541            .await;
542
543        Ok(AccountGuard {
544            base,
545            pubkey: &account.pubkey,
546            signer: &account.signer,
547        })
548    }
549
550    /// compute gas limit and gas price according to provided TxConfig for serialised
551    /// transaction, potentially calling gas estimation service
552    async fn calculate_transaction_gas_params(
553        &self,
554        tx_body: &RawTxBody,
555        cfg: &TxConfig,
556        chain_id: Id,
557        account: &mut AccountGuard<'_>,
558        context: &Context,
559    ) -> Result<(u64, f64)> {
560        if let Some(gas_limit) = cfg.gas_limit {
561            let gas_price = if let Some(gas_price) = cfg.gas_price {
562                gas_price
563            } else {
564                self.estimate_gas_price(cfg.priority)
565                    .context(context)
566                    .await?
567            };
568
569            return Ok((gas_limit, gas_price));
570        }
571
572        let tx = sign_tx(
573            tx_body.clone(),
574            chain_id.clone(),
575            &account.base,
576            account.pubkey,
577            account.signer,
578            0,
579            1,
580        )
581        .await?;
582
583        let GasEstimate { price, usage } = self
584            .estimate_gas_price_and_usage(cfg.priority, tx.encode_to_vec())
585            .context(context)
586            .await?;
587
588        Ok((usage, cfg.gas_price.unwrap_or(price)))
589    }
590
591    async fn sign_and_broadcast_blobs(
592        &self,
593        blobs: Vec<Blob>,
594        cfg: TxConfig,
595        context: &Context,
596    ) -> Result<BroadcastedTx> {
597        let ChainState { chain_id, .. } = self.load_chain_state(context).await?;
598        // lock the account; tx signing and broadcast must be atomic
599        // because node requires all transactions to be sequenced by account.sequence
600        let mut account = self.lock_account(context).await?;
601
602        let pfb = MsgPayForBlobs::new(&blobs, account.base.address)?;
603        let pfb = RawTxBody {
604            messages: vec![RawMsgPayForBlobs::from(pfb).into_any()],
605            memo: cfg.memo.clone().unwrap_or_default(),
606            ..RawTxBody::default()
607        };
608        let blobs: Vec<_> = blobs.into_iter().map(Into::into).collect();
609
610        // in case parallel submission failed because of the sequence mismatch,
611        // we update the account, resign and broadcast transaction
612        loop {
613            let res = self
614                .calculate_transaction_gas_params(
615                    &pfb,
616                    &cfg,
617                    chain_id.clone(),
618                    &mut account,
619                    context,
620                )
621                .await;
622
623            // if gas limit is not provided, we simulate the transaction
624            // which can also signal that sequence is wrong
625            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
626                account.base.sequence = new_sequence?;
627                continue;
628            }
629
630            let (gas_limit, gas_price) = res?;
631            let fee = (gas_limit as f64 * gas_price).ceil() as u64;
632
633            let tx = sign_tx(
634                pfb.clone(),
635                chain_id.clone(),
636                &account.base,
637                account.pubkey,
638                account.signer,
639                gas_limit,
640                fee,
641            )
642            .await?;
643
644            let blob_tx = RawBlobTx {
645                tx: tx.encode_to_vec(),
646                blobs: blobs.clone(),
647                type_id: BLOB_TX_TYPE_ID.to_string(),
648            }
649            .encode_to_vec();
650
651            let res = self
652                .broadcast_tx_with_account(blob_tx, &cfg, &mut account, context)
653                .await;
654
655            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
656                account.base.sequence = new_sequence?;
657                continue;
658            }
659
660            break res;
661        }
662    }
663
664    async fn sign_and_broadcast_tx(
665        &self,
666        tx: RawTxBody,
667        cfg: TxConfig,
668        context: &Context,
669    ) -> Result<BroadcastedTx> {
670        let ChainState { chain_id, .. } = self.load_chain_state(context).await?;
671
672        // lock the account; tx signing and broadcast must be atomic
673        // because node requires all transactions to be sequenced by account.sequence
674        let mut account = self.lock_account(context).await?;
675
676        // in case parallel submission failed because of the sequence mismatch,
677        // we update the account, resign and broadcast transaction
678        loop {
679            let res = self
680                .calculate_transaction_gas_params(
681                    &tx,
682                    &cfg,
683                    chain_id.clone(),
684                    &mut account,
685                    context,
686                )
687                .await;
688
689            // if gas limit is not provided, we simulate the transaction
690            // which can also signal that sequence is wrong
691            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
692                account.base.sequence = new_sequence?;
693                continue;
694            }
695
696            let (gas_limit, gas_price) = res?;
697            let fee = (gas_limit as f64 * gas_price).ceil() as u64;
698
699            let tx = sign_tx(
700                tx.clone(),
701                chain_id.clone(),
702                &account.base,
703                account.pubkey,
704                account.signer,
705                gas_limit,
706                fee,
707            )
708            .await?;
709
710            let res = self
711                .broadcast_tx_with_account(tx.encode_to_vec(), &cfg, &mut account, context)
712                .await;
713
714            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
715                account.base.sequence = new_sequence?;
716                continue;
717            }
718
719            break res;
720        }
721    }
722
723    async fn broadcast_tx_with_account(
724        &self,
725        tx: Vec<u8>,
726        cfg: &TxConfig,
727        account: &mut AccountGuard<'_>,
728        context: &Context,
729    ) -> Result<BroadcastedTx> {
730        let resp = self.broadcast_tx_with_cfg(tx.clone(), cfg, context).await?;
731
732        let sequence = account.base.sequence;
733        account.base.sequence += 1;
734
735        Ok(BroadcastedTx {
736            tx,
737            hash: resp.txhash,
738            sequence,
739        })
740    }
741
742    async fn broadcast_tx_with_cfg(
743        &self,
744        tx: Vec<u8>,
745        cfg: &TxConfig,
746        context: &Context,
747    ) -> Result<TxResponse> {
748        let resp = self
749            .broadcast_tx(tx, BroadcastMode::Sync)
750            .context(context)
751            .await?;
752
753        if resp.code != ErrorCode::Success {
754            // if transaction failed due to insufficient fee, include info
755            // whether gas price was estimated or explicitely set
756            let message = if resp.code == ErrorCode::InsufficientFee {
757                if cfg.gas_price.is_some() {
758                    format!("Gas price was set via config. {}", resp.raw_log)
759                } else {
760                    format!("Gas price was estimated. {}", resp.raw_log)
761                }
762            } else {
763                resp.raw_log
764            };
765
766            Err(Error::TxBroadcastFailed(resp.txhash, resp.code, message))
767        } else {
768            Ok(resp)
769        }
770    }
771
772    async fn confirm_tx(
773        &self,
774        tx: BroadcastedTx,
775        cfg: TxConfig,
776        context: &Context,
777    ) -> Result<TxInfo> {
778        let BroadcastedTx { tx, hash, sequence } = tx;
779        let mut interval = Interval::new(Duration::from_millis(500));
780
781        loop {
782            let tx_status = self.tx_status(hash).context(context).await?;
783            match tx_status.status {
784                TxStatus::Pending => interval.tick().await,
785                TxStatus::Committed => {
786                    if tx_status.execution_code == ErrorCode::Success {
787                        return Ok(TxInfo {
788                            hash,
789                            height: tx_status.height,
790                        });
791                    } else {
792                        return Err(Error::TxExecutionFailed(
793                            hash,
794                            tx_status.execution_code,
795                            tx_status.error,
796                        ));
797                    }
798                }
799                // If some transaction was rejected when creating a block, then it means that its
800                // sequence wasn't used. This will cause all the following transactions in the
801                // same block to also be rejected due to the sequence mismatch.
802                // To recover, we roll back the sequence to the one of rejected transaction, but
803                // only if rejection didn't happen due to the sequence mismatch. That way we should
804                // always revert the sequence to the one of the first rejected transaction.
805                TxStatus::Rejected => {
806                    if !is_wrong_sequence(tx_status.execution_code) {
807                        let mut acc = self.lock_account(context).await?;
808                        acc.base.sequence = sequence;
809                    }
810
811                    return Err(Error::TxRejected(
812                        hash,
813                        tx_status.execution_code,
814                        tx_status.error,
815                    ));
816                }
817                // Evicted transaction should be retransmitted without ever trying to re-sign it
818                // to prevent double spending, because some nodes may have it in a mempool and some not.
819                // If we would re-sign it and both old and new one are confirmed, it'd double spend.
820                // https://github.com/celestiaorg/celestia-app/pull/5783#discussion_r2360546232
821                TxStatus::Evicted => {
822                    if self
823                        .broadcast_tx_with_cfg(tx.clone(), &cfg, context)
824                        .await
825                        .is_err()
826                    {
827                        // Go version of TxClient starts a one minute timeout before it returns an eviction
828                        // error, to avoid user re-submitting the failed transaction right away.
829                        //
830                        // The reason for it is that Go client supports multi-endpoint, sending tx
831                        // to multiple consensus nodes at once, each node having its own mempool.
832                        // One of them could evict the tx from the mempool, but the tx may still be
833                        // proposed and confirmed by other nodes which didn't evict it.
834                        //
835                        // In that case, if client would return eviction error right away, the user could
836                        // see the error, eagerly resubmit the transaction, and have both old and new
837                        // one succeed, resulting in double spending.
838                        //
839                        // Since we don't support multi-endpoint in our client yet, we can just return
840                        // an error right away, as no other consensus node will try to include the transaction.
841                        return Err(Error::TxEvicted(hash));
842                    }
843                }
844                // this case should never happen for node that accepted a broadcast
845                // however we handle it the same as evicted for extra safety
846                TxStatus::Unknown => {
847                    let mut acc = self.lock_account(context).await?;
848                    acc.base.sequence = sequence;
849                    return Err(Error::TxNotFound(hash));
850                }
851            }
852        }
853    }
854}
855
856impl fmt::Debug for GrpcClient {
857    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858        f.write_str("GrpcClient { .. }")
859    }
860}
861
862fn is_wrong_sequence(code: ErrorCode) -> bool {
863    code == ErrorCode::InvalidSequence || code == ErrorCode::WrongSequence
864}
865
866/// Returns Some if error is related to the wrong sequence.
867/// Inner result is the outcome of the parsing operation.
868fn extract_sequence_on_mismatch(err: &Error) -> Option<Result<u64>> {
869    let msg = match err {
870        Error::TonicError(status) if status.message().contains(SEQUENCE_ERROR_PAT) => {
871            status.message()
872        }
873        Error::TxBroadcastFailed(_, code, message) if is_wrong_sequence(*code) => message.as_str(),
874        _ => return None,
875    };
876
877    Some(extract_sequence(msg))
878}
879
880fn extract_sequence(msg: &str) -> Result<u64> {
881    // get the `{expected_sequence}, {rest of the message}` part
882    let (_, msg_with_sequence) = msg
883        .split_once(SEQUENCE_ERROR_PAT)
884        .ok_or_else(|| Error::SequenceParsingFailed(msg.into()))?;
885
886    // drop the comma and rest of the message
887    let (sequence, _) = msg_with_sequence
888        .split_once(',')
889        .ok_or_else(|| Error::SequenceParsingFailed(msg.into()))?;
890
891    sequence
892        .parse()
893        .map_err(|_| Error::SequenceParsingFailed(msg.into()))
894}
895
896#[cfg(test)]
897mod tests {
898    use std::future::IntoFuture;
899    use std::ops::RangeInclusive;
900    use std::sync::Arc;
901
902    use celestia_proto::cosmos::bank::v1beta1::MsgSend;
903    use celestia_rpc::HeaderClient;
904    use celestia_types::nmt::Namespace;
905    use celestia_types::state::{Coin, ErrorCode};
906    use celestia_types::{AppVersion, Blob};
907    use futures::FutureExt;
908    use lumina_utils::test_utils::async_test;
909    use rand::{Rng, RngCore};
910
911    use super::GrpcClient;
912    use crate::grpc::Context;
913    use crate::test_utils::{
914        TestAccount, load_account, new_grpc_client, new_rpc_client, new_tx_client, spawn,
915    };
916    use crate::{Error, TxConfig};
917
918    #[async_test]
919    async fn extending_client_context() {
920        let client = GrpcClient::builder()
921            .url("http://foo")
922            .metadata("x-token", "secret-token")
923            .build()
924            .unwrap();
925        let call = client.app_version().block_height(1234);
926
927        assert!(call.context.metadata.contains_key("x-token"));
928        assert!(call.context.metadata.contains_key("x-cosmos-block-height"));
929    }
930
931    #[async_test]
932    async fn get_auth_params() {
933        let client = new_grpc_client();
934        let params = client.get_auth_params().await.unwrap();
935        assert!(params.max_memo_characters > 0);
936        assert!(params.tx_sig_limit > 0);
937        assert!(params.tx_size_cost_per_byte > 0);
938        assert!(params.sig_verify_cost_ed25519 > 0);
939        assert!(params.sig_verify_cost_secp256k1 > 0);
940    }
941
942    #[async_test]
943    async fn get_account() {
944        let client = new_grpc_client();
945
946        let accounts = client.get_accounts().await.unwrap();
947
948        let first_account = accounts.first().expect("account to exist");
949        let account = client.get_account(&first_account.address).await.unwrap();
950
951        assert_eq!(&account, first_account);
952    }
953
954    #[async_test]
955    async fn get_balance() {
956        let account = load_account();
957        let client = new_grpc_client();
958
959        let coin = client.get_balance(&account.address, "utia").await.unwrap();
960        assert_eq!("utia", coin.denom());
961        assert!(coin.amount() > 0);
962
963        let all_coins = client.get_all_balances(&account.address).await.unwrap();
964        assert!(!all_coins.is_empty());
965        assert!(all_coins.iter().map(|c| c.amount()).sum::<u64>() > 0);
966
967        let spendable_coins = client
968            .get_spendable_balances(&account.address)
969            .await
970            .unwrap();
971        assert!(!spendable_coins.is_empty());
972        assert!(spendable_coins.iter().map(|c| c.amount()).sum::<u64>() > 0);
973
974        let total_supply = client.get_total_supply().await.unwrap();
975        assert!(!total_supply.is_empty());
976        assert!(total_supply.iter().map(|c| c.amount()).sum::<u64>() > 0);
977    }
978
979    #[async_test]
980    async fn get_verified_balance() {
981        let client = new_grpc_client();
982        let account = load_account();
983
984        let jrpc_client = new_rpc_client().await;
985
986        let (head, expected_balance) = tokio::join!(
987            jrpc_client.header_network_head().map(Result::unwrap),
988            client
989                .get_balance(&account.address, "utia")
990                .into_future()
991                .map(Result::unwrap)
992        );
993
994        // trustless balance queries represent state at header.height - 1, so
995        // we need to wait for a new head to compare it with the expected balance
996        let head = jrpc_client
997            .header_wait_for_height(head.height().value() + 1)
998            .await
999            .unwrap();
1000
1001        let verified_balance = client
1002            .get_verified_balance(&account.address, &head)
1003            .await
1004            .unwrap();
1005
1006        assert_eq!(expected_balance, verified_balance);
1007    }
1008
1009    #[async_test]
1010    async fn get_verified_balance_not_funded_account() {
1011        let client = new_grpc_client();
1012        let account = TestAccount::random();
1013
1014        let jrpc_client = new_rpc_client().await;
1015        let head = jrpc_client.header_network_head().await.unwrap();
1016
1017        let verified_balance = client
1018            .get_verified_balance(&account.address, &head)
1019            .await
1020            .unwrap();
1021
1022        assert_eq!(Coin::utia(0), verified_balance);
1023    }
1024
1025    #[async_test]
1026    async fn get_node_config() {
1027        let client = new_grpc_client();
1028        let config = client.get_node_config().await.unwrap();
1029
1030        // we don't set any explicit value for it in the config
1031        // so it should be empty since v6
1032        assert!(config.minimum_gas_price.is_none());
1033    }
1034
1035    #[async_test]
1036    async fn get_block() {
1037        let client = new_grpc_client();
1038
1039        let latest_block = client.get_latest_block().await.unwrap();
1040        let height = latest_block.header.height.value() as i64;
1041
1042        let block = client.get_block_by_height(height).await.unwrap();
1043        assert_eq!(block.header, latest_block.header);
1044    }
1045
1046    #[async_test]
1047    async fn get_blob_params() {
1048        let client = new_grpc_client();
1049        let params = client.get_blob_params().await.unwrap();
1050        assert!(params.gas_per_blob_byte > 0);
1051        assert!(params.gov_max_square_size > 0);
1052    }
1053
1054    #[async_test]
1055    async fn query_state_at_block_height_with_metadata() {
1056        let (_lock, tx_client) = new_tx_client().await;
1057
1058        let tx = tx_client
1059            .submit_blobs(&[random_blob(10..=1000)], TxConfig::default())
1060            .await
1061            .unwrap();
1062
1063        let addr = tx_client.get_account_address().unwrap().into();
1064        let new_balance = tx_client.get_balance(&addr, "utia").await.unwrap();
1065        let old_balance = tx_client
1066            .get_balance(&addr, "utia")
1067            .block_height(tx.height.value() - 1)
1068            .await
1069            .unwrap();
1070
1071        assert!(new_balance.amount() < old_balance.amount());
1072    }
1073
1074    #[async_test]
1075    async fn submit_and_get_tx() {
1076        let (_lock, tx_client) = new_tx_client().await;
1077
1078        let tx = tx_client
1079            .submit_blobs(
1080                &[random_blob(10..=1000)],
1081                TxConfig::default().with_memo("foo"),
1082            )
1083            .await
1084            .unwrap();
1085        let tx2 = tx_client.get_tx(tx.hash).await.unwrap();
1086
1087        assert_eq!(tx.hash, tx2.tx_response.txhash);
1088        assert_eq!(tx2.tx.body.memo, "foo");
1089    }
1090
1091    #[async_test]
1092    async fn parallel_submission() {
1093        let (_lock, tx_client) = new_tx_client().await;
1094        let tx_client = Arc::new(tx_client);
1095
1096        let futs = (0..100)
1097            .map(|_| {
1098                let tx_client = tx_client.clone();
1099                spawn(async move {
1100                    let response = if rand::random() {
1101                        tx_client
1102                            .submit_blobs(&[random_blob(10..=10000)], TxConfig::default())
1103                            .await
1104                    } else {
1105                        tx_client
1106                            .submit_message(random_transfer(&tx_client), TxConfig::default())
1107                            .await
1108                    };
1109
1110                    match response {
1111                        Ok(_) => (),
1112                        // some wrong sequence errors are still expected until we implement
1113                        // multi-account submission
1114                        Err(Error::TxRejected(_, ErrorCode::WrongSequence, _)) => {}
1115                        err => panic!("{err:?}"),
1116                    }
1117                })
1118            })
1119            .collect::<Vec<_>>();
1120
1121        for fut in futs {
1122            fut.await.unwrap();
1123        }
1124    }
1125
1126    #[async_test]
1127    async fn updating_sequence_and_resigning() {
1128        let (_lock, tx_client) = new_tx_client().await;
1129
1130        // submit blob without providing gas limit, this will fail on gas estimation
1131        // with a dummy tx with invalid sequence
1132        invalidate_sequence(&tx_client).await;
1133        tx_client
1134            .submit_blobs(&[random_blob(10..=1000)], TxConfig::default())
1135            .await
1136            .unwrap();
1137
1138        // submit blob with provided gas limit, this will fail on broadcasting the tx
1139        invalidate_sequence(&tx_client).await;
1140        tx_client
1141            .submit_blobs(
1142                &[random_blob(10..=1000)],
1143                TxConfig::default().with_gas_limit(100000),
1144            )
1145            .await
1146            .unwrap();
1147
1148        // submit message without providing gas limit, this will fail on gas estimation
1149        // with a dummy tx with invalid sequence
1150        invalidate_sequence(&tx_client).await;
1151        tx_client
1152            .submit_message(random_transfer(&tx_client), TxConfig::default())
1153            .await
1154            .unwrap();
1155
1156        // submit message with provided gas limit, this will fail on broadcasting the tx
1157        invalidate_sequence(&tx_client).await;
1158        tx_client
1159            .submit_message(
1160                random_transfer(&tx_client),
1161                TxConfig::default().with_gas_limit(100000),
1162            )
1163            .await
1164            .unwrap();
1165    }
1166
1167    #[cfg(not(target_arch = "wasm32"))]
1168    #[tokio::test]
1169    async fn retransmit_evicted() {
1170        use tokio::task::JoinSet;
1171
1172        use crate::grpc::TxStatus;
1173
1174        const EVICTION_TESTING_VAL_URL: &str = "http://localhost:29090";
1175
1176        let account = load_account();
1177        let client = GrpcClient::builder()
1178            .url(EVICTION_TESTING_VAL_URL)
1179            .signer_keypair(account.signing_key)
1180            .build()
1181            .unwrap();
1182
1183        let txs = (0..10)
1184            .map(|_| {
1185                let client = client.clone();
1186                async move {
1187                    let blobs = (0..2).map(|_| random_blob(500000..=500000)).collect();
1188                    client
1189                        .sign_and_broadcast_blobs(blobs, TxConfig::default(), &Context::default())
1190                        .await
1191                        .unwrap()
1192                }
1193            })
1194            .collect::<JoinSet<_>>()
1195            .join_all()
1196            .await;
1197
1198        let successfully_retransmitted = txs
1199            .into_iter()
1200            .map(|tx| {
1201                let client = client.clone();
1202                async move {
1203                    // eviction happens if tx is in a mempool for an amount of blocks
1204                    // higher than the mempool.ttl-num-blocks, so we need to wait a bit
1205                    // to see correct status.
1206                    let mut status = TxStatus::Pending;
1207                    while status == TxStatus::Pending {
1208                        status = client.tx_status(tx.hash).await.unwrap().status;
1209                    }
1210
1211                    let was_evicted = status == TxStatus::Evicted;
1212
1213                    match client
1214                        .confirm_tx(tx, TxConfig::default(), &Context::default())
1215                        .await
1216                    {
1217                        // some of the retransmitted tx's will still fail because
1218                        // of sequence mismatch
1219                        Err(Error::TxEvicted(_)) => false,
1220                        res => {
1221                            res.unwrap();
1222                            was_evicted
1223                        }
1224                    }
1225                }
1226            })
1227            .collect::<JoinSet<_>>()
1228            .join_all()
1229            .await;
1230
1231        assert!(
1232            successfully_retransmitted
1233                .into_iter()
1234                .any(std::convert::identity)
1235        );
1236    }
1237
1238    #[async_test]
1239    async fn submit_blobs_insufficient_gas_price_and_limit() {
1240        let (_lock, tx_client) = new_tx_client().await;
1241
1242        let blobs = vec![random_blob(10..=1000)];
1243
1244        let err = tx_client
1245            .submit_blobs(&blobs, TxConfig::default().with_gas_limit(10000))
1246            .await
1247            .unwrap_err();
1248        assert!(matches!(
1249            err,
1250            Error::TxBroadcastFailed(_, ErrorCode::OutOfGas, _)
1251        ));
1252
1253        let err = tx_client
1254            .submit_blobs(&blobs, TxConfig::default().with_gas_price(0.0005))
1255            .await
1256            .unwrap_err();
1257        assert!(matches!(
1258            err,
1259            Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _)
1260        ));
1261    }
1262
1263    #[async_test]
1264    async fn submit_message() {
1265        let account = load_account();
1266        let other_account = TestAccount::random();
1267        let amount = Coin::utia(12345);
1268        let (_lock, tx_client) = new_tx_client().await;
1269
1270        let msg = MsgSend {
1271            from_address: account.address.to_string(),
1272            to_address: other_account.address.to_string(),
1273            amount: vec![amount.clone().into()],
1274        };
1275
1276        tx_client
1277            .submit_message(msg, TxConfig::default())
1278            .await
1279            .unwrap();
1280
1281        let coins = tx_client
1282            .get_all_balances(&other_account.address)
1283            .await
1284            .unwrap();
1285
1286        assert_eq!(coins.len(), 1);
1287        assert_eq!(amount, coins[0]);
1288    }
1289
1290    #[async_test]
1291    async fn submit_message_insufficient_gas_price_and_limit() {
1292        let account = load_account();
1293        let other_account = TestAccount::random();
1294        let amount = Coin::utia(12345);
1295        let (_lock, tx_client) = new_tx_client().await;
1296
1297        let msg = MsgSend {
1298            from_address: account.address.to_string(),
1299            to_address: other_account.address.to_string(),
1300            amount: vec![amount.clone().into()],
1301        };
1302
1303        let err = tx_client
1304            .submit_message(msg.clone(), TxConfig::default().with_gas_limit(10000))
1305            .await
1306            .unwrap_err();
1307        assert!(matches!(
1308            err,
1309            Error::TxBroadcastFailed(_, ErrorCode::OutOfGas, _)
1310        ));
1311
1312        let err = tx_client
1313            .submit_message(msg, TxConfig::default().with_gas_price(0.0005))
1314            .await
1315            .unwrap_err();
1316        assert!(matches!(
1317            err,
1318            Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _)
1319        ));
1320    }
1321
1322    #[async_test]
1323    async fn tx_client_is_send_and_sync() {
1324        fn is_send_and_sync<T: Send + Sync>(_: &T) {}
1325        fn is_send<T: Send>(_: &T) {}
1326
1327        let (_lock, tx_client) = new_tx_client().await;
1328        is_send_and_sync(&tx_client);
1329
1330        is_send(
1331            &tx_client
1332                .submit_blobs(&[], TxConfig::default())
1333                .into_future(),
1334        );
1335        is_send(
1336            &tx_client
1337                .submit_message(
1338                    MsgSend {
1339                        from_address: "".into(),
1340                        to_address: "".into(),
1341                        amount: vec![],
1342                    },
1343                    TxConfig::default(),
1344                )
1345                .into_future(),
1346        );
1347    }
1348
1349    fn random_blob(size: RangeInclusive<usize>) -> Blob {
1350        let rng = &mut rand::thread_rng();
1351
1352        let mut ns_bytes = vec![0u8; 10];
1353        rng.fill_bytes(&mut ns_bytes);
1354        let namespace = Namespace::new_v0(&ns_bytes).unwrap();
1355
1356        let len = rng.gen_range(size);
1357        let mut blob = vec![0; len];
1358        rng.fill_bytes(&mut blob);
1359        blob.resize(len, 1);
1360
1361        Blob::new(namespace, blob, None, AppVersion::latest()).unwrap()
1362    }
1363
1364    fn random_transfer(client: &GrpcClient) -> MsgSend {
1365        let address = client.get_account_address().unwrap();
1366        let other_account = TestAccount::random();
1367        let amount = rand::thread_rng().gen_range(10..1000);
1368
1369        MsgSend {
1370            from_address: address.to_string(),
1371            to_address: other_account.address.to_string(),
1372            amount: vec![Coin::utia(amount).into()],
1373        }
1374    }
1375
1376    async fn invalidate_sequence(client: &GrpcClient) {
1377        client
1378            .lock_account(&Context::default())
1379            .await
1380            .unwrap()
1381            .base
1382            .sequence += rand::thread_rng().gen_range(2..200);
1383    }
1384}