1use std::fmt;
2use std::sync::Arc;
3
4use ::tendermint::chain::Id;
5use celestia_types::any::IntoProtobufAny;
6use k256::ecdsa::VerifyingKey;
7use lumina_utils::time::Interval;
8use prost::Message;
9use std::time::Duration;
10use tokio::sync::{Mutex, MutexGuard, OnceCell};
11
12use celestia_grpc_macros::grpc_method;
13use celestia_proto::celestia::blob::v1::query_client::QueryClient as BlobQueryClient;
14use celestia_proto::celestia::core::v1::gas_estimation::gas_estimator_client::GasEstimatorClient;
15use celestia_proto::celestia::core::v1::tx::tx_client::TxClient as TxStatusClient;
16use celestia_proto::cosmos::auth::v1beta1::query_client::QueryClient as AuthQueryClient;
17use celestia_proto::cosmos::bank::v1beta1::query_client::QueryClient as BankQueryClient;
18use celestia_proto::cosmos::base::node::v1beta1::service_client::ServiceClient as ConfigServiceClient;
19use celestia_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient as TendermintServiceClient;
20use celestia_proto::cosmos::staking::v1beta1::query_client::QueryClient as StakingQueryClient;
21use celestia_proto::cosmos::tx::v1beta1::service_client::ServiceClient as TxServiceClient;
22use celestia_types::blob::{BlobParams, MsgPayForBlobs, RawBlobTx, RawMsgPayForBlobs};
23use celestia_types::block::Block;
24use celestia_types::consts::appconsts;
25use celestia_types::hash::Hash;
26use celestia_types::state::auth::{Account, AuthParams, BaseAccount};
27use celestia_types::state::{
28 AbciQueryResponse, PageRequest, QueryDelegationResponse, QueryRedelegationsResponse,
29 QueryUnbondingDelegationResponse, RawTxBody, ValAddress,
30};
31use celestia_types::state::{
32 AccAddress, Address, AddressTrait, BOND_DENOM, Coin, ErrorCode, TxResponse,
33};
34use celestia_types::{AppVersion, Blob, ExtendedHeader};
35
36use crate::abci_proofs::ProofChain;
37use crate::boxed::BoxedTransport;
38use crate::builder::GrpcClientBuilder;
39use crate::grpc::{
40 AsyncGrpcCall, BroadcastMode, ConfigResponse, Context, GasEstimate, GasInfo, GetTxResponse,
41 TxPriority, TxStatus, TxStatusResponse,
42};
43use crate::signer::{BoxedDocSigner, sign_tx};
44use crate::tx::TxInfo;
45use crate::{Error, Result, TxConfig};
46
/// Type identifier written into the `type_id` field of every `RawBlobTx` encoded by this client.
const BLOB_TX_TYPE_ID: &str = "BLOB";
/// Text that directly precedes the expected sequence number in a sequence-mismatch error
/// message. It is used both to recognise such errors and to locate the number to parse.
const SEQUENCE_ERROR_PAT: &str = "account sequence mismatch, expected ";
51
/// Chain-wide values that the client reads from the latest block header and then caches.
#[derive(Debug, Clone)]
struct ChainState {
    /// App version taken from the `version.app` field of the block header.
    app_version: AppVersion,
    /// Chain id taken from the block header.
    chain_id: Id,
}
57
/// Signing account attached to a client.
#[derive(Debug)]
pub(crate) struct AccountState {
    /// Account data fetched from the node on first use; the mutex serialises
    /// every read and update of the cached sequence number.
    base: OnceCell<Mutex<BaseAccount>>,
    /// Public key of the account; the account address is derived from it.
    pubkey: VerifyingKey,
    /// Signer handed to `sign_tx` when transactions are signed.
    signer: BoxedDocSigner,
}
64
/// Exclusive view of an [`AccountState`]: the locked base account together with
/// borrowed references to the public key and the signer.
#[derive(Debug)]
struct AccountGuard<'a> {
    /// Held lock on the cached base account; dropping the guard releases it.
    base: MutexGuard<'a, BaseAccount>,
    /// Public key borrowed from the owning [`AccountState`].
    pubkey: &'a VerifyingKey,
    /// Signer borrowed from the owning [`AccountState`].
    signer: &'a BoxedDocSigner,
}
71
72impl AccountState {
73 pub fn new(pubkey: VerifyingKey, signer: BoxedDocSigner) -> Self {
74 Self {
75 base: OnceCell::new(),
76 pubkey,
77 signer,
78 }
79 }
80}
81
/// A transaction that was accepted by the node on broadcast and still awaits confirmation.
#[derive(Debug)]
struct BroadcastedTx {
    /// Encoded transaction bytes exactly as they were broadcast; kept for re-submission.
    tx: Vec<u8>,
    /// Hash reported by the node in the broadcast response.
    hash: Hash,
    /// Account sequence the transaction was signed with.
    sequence: u64,
}
92
/// gRPC client for querying a node and submitting transactions to it.
///
/// Cloning is cheap: all clones share the same inner state through an [`Arc`].
#[derive(Clone)]
pub struct GrpcClient {
    /// Shared state: transport, optional signing account, cached chain state and context.
    inner: Arc<GrpcClientInner>,
}
100
/// State shared between all clones of a [`GrpcClient`].
struct GrpcClientInner {
    /// Transport used to perform the gRPC calls.
    transport: BoxedTransport,
    /// Signing account; `None` means signing operations fail with `Error::MissingSigner`.
    account: Option<AccountState>,
    /// App version and chain id, loaded from the latest block on first use.
    chain_state: OnceCell<ChainState>,
    /// Context attached to every call created by this client.
    context: Context,
}
107
108impl GrpcClient {
109 pub(crate) fn new(
111 transport: BoxedTransport,
112 account: Option<AccountState>,
113 context: Context,
114 ) -> Self {
115 Self {
116 inner: Arc::new(GrpcClientInner {
117 transport,
118 account,
119 chain_state: OnceCell::new(),
120 context,
121 }),
122 }
123 }
124
    /// Returns a fresh [`GrpcClientBuilder`] for configuring and building a client.
    pub fn builder() -> GrpcClientBuilder {
        GrpcClientBuilder::new()
    }
129
    /// Fetches the auth module parameters (`AuthQueryClient::params`).
    #[grpc_method(AuthQueryClient::params)]
    fn get_auth_params(&self) -> AsyncGrpcCall<AuthParams>;
135
    /// Fetches the account stored under the given address (`AuthQueryClient::account`).
    #[grpc_method(AuthQueryClient::account)]
    fn get_account(&self, account: &AccAddress) -> AsyncGrpcCall<Account>;
139
    /// Fetches the accounts returned by the auth module (`AuthQueryClient::accounts`).
    #[grpc_method(AuthQueryClient::accounts)]
    fn get_accounts(&self) -> AsyncGrpcCall<Vec<Account>>;
143
144 pub fn get_verified_balance(
155 &self,
156 address: &Address,
157 header: &ExtendedHeader,
158 ) -> AsyncGrpcCall<Coin> {
159 let this = self.clone();
160 let address = *address;
161 let header = header.clone();
162
163 AsyncGrpcCall::new(move |context| async move {
164 this.get_verified_balance_impl(&address, &header, &context)
165 .await
166 })
167 .context(&self.inner.context)
168 }
169
    /// Fetches the balance of `denom` held by `address` (`BankQueryClient::balance`).
    ///
    /// Unlike [`GrpcClient::get_verified_balance`], no proof is checked here.
    #[grpc_method(BankQueryClient::balance)]
    fn get_balance(&self, address: &Address, denom: impl Into<String>) -> AsyncGrpcCall<Coin>;
173
    /// Fetches every coin balance held by `address` (`BankQueryClient::all_balances`).
    #[grpc_method(BankQueryClient::all_balances)]
    fn get_all_balances(&self, address: &Address) -> AsyncGrpcCall<Vec<Coin>>;
177
    /// Fetches the spendable coin balances of `address` (`BankQueryClient::spendable_balances`).
    #[grpc_method(BankQueryClient::spendable_balances)]
    fn get_spendable_balances(&self, address: &Address) -> AsyncGrpcCall<Vec<Coin>>;
181
    /// Fetches the total supply of coins (`BankQueryClient::total_supply`).
    #[grpc_method(BankQueryClient::total_supply)]
    fn get_total_supply(&self) -> AsyncGrpcCall<Vec<Coin>>;
185
    /// Fetches the configuration reported by the node (`ConfigServiceClient::config`).
    #[grpc_method(ConfigServiceClient::config)]
    fn get_node_config(&self) -> AsyncGrpcCall<ConfigResponse>;
191
    /// Fetches the latest block (`TendermintServiceClient::get_latest_block`).
    #[grpc_method(TendermintServiceClient::get_latest_block)]
    fn get_latest_block(&self) -> AsyncGrpcCall<Block>;
197
    /// Fetches the block at `height` (`TendermintServiceClient::get_block_by_height`).
    #[grpc_method(TendermintServiceClient::get_block_by_height)]
    fn get_block_by_height(&self, height: i64) -> AsyncGrpcCall<Block>;
201
    /// Issues an ABCI query (`TendermintServiceClient::abci_query`).
    ///
    /// `data` is the query payload, `path` selects what is queried, `height` is the
    /// height to query at and `prove` requests a proof alongside the value.
    // NOTE(review): parameter meanings are inferred from the in-file caller
    // (`get_verified_balance_impl`); confirm against the service definition.
    #[grpc_method(TendermintServiceClient::abci_query)]
    fn abci_query(
        &self,
        data: impl AsRef<[u8]>,
        path: impl Into<String>,
        height: u64,
        prove: bool,
    ) -> AsyncGrpcCall<AbciQueryResponse>;
211
    /// Broadcasts already-encoded transaction bytes using the given broadcast mode
    /// (`TxServiceClient::broadcast_tx`).
    #[grpc_method(TxServiceClient::broadcast_tx)]
    fn broadcast_tx(&self, tx_bytes: Vec<u8>, mode: BroadcastMode) -> AsyncGrpcCall<TxResponse>;
217
    /// Fetches a transaction by its hash (`TxServiceClient::get_tx`).
    #[grpc_method(TxServiceClient::get_tx)]
    fn get_tx(&self, hash: Hash) -> AsyncGrpcCall<GetTxResponse>;
221
    /// Simulates the encoded transaction and returns the resulting gas information
    /// (`TxServiceClient::simulate`).
    #[grpc_method(TxServiceClient::simulate)]
    fn simulate(&self, tx_bytes: Vec<u8>) -> AsyncGrpcCall<GasInfo>;
225
    /// Queries the delegation between the given delegator and validator
    /// (`StakingQueryClient::delegation`).
    #[grpc_method(StakingQueryClient::delegation)]
    fn query_delegation(
        &self,
        delegator_address: &AccAddress,
        validator_address: &ValAddress,
    ) -> AsyncGrpcCall<QueryDelegationResponse>;
236
    /// Queries the unbonding delegation between the given delegator and validator
    /// (`StakingQueryClient::unbonding_delegation`).
    #[grpc_method(StakingQueryClient::unbonding_delegation)]
    fn query_unbonding(
        &self,
        delegator_address: &AccAddress,
        validator_address: &ValAddress,
    ) -> AsyncGrpcCall<QueryUnbondingDelegationResponse>;
245
    /// Queries redelegations of the delegator from the source validator to the
    /// destination validator (`StakingQueryClient::redelegations`).
    ///
    /// `pagination` is forwarded with the request when provided.
    #[grpc_method(StakingQueryClient::redelegations)]
    fn query_redelegations(
        &self,
        delegator_address: &AccAddress,
        src_validator_address: &ValAddress,
        dest_validator_address: &ValAddress,
        pagination: Option<PageRequest>,
    ) -> AsyncGrpcCall<QueryRedelegationsResponse>;
256
    /// Fetches the blob module parameters (`BlobQueryClient::params`).
    #[grpc_method(BlobQueryClient::params)]
    fn get_blob_params(&self) -> AsyncGrpcCall<BlobParams>;
262
    /// Fetches the status of the transaction with the given hash (`TxStatusClient::tx_status`).
    #[grpc_method(TxStatusClient::tx_status)]
    fn tx_status(&self, hash: Hash) -> AsyncGrpcCall<TxStatusResponse>;
268
    /// Asks the node for a gas price estimate for the given priority
    /// (`GasEstimatorClient::estimate_gas_price`).
    #[grpc_method(GasEstimatorClient::estimate_gas_price)]
    fn estimate_gas_price(&self, priority: TxPriority) -> AsyncGrpcCall<f64>;
278
    /// Asks the node for both a gas price estimate for the given priority and the gas
    /// usage of the encoded transaction
    /// (`GasEstimatorClient::estimate_gas_price_and_usage`).
    #[grpc_method(GasEstimatorClient::estimate_gas_price_and_usage)]
    fn estimate_gas_price_and_usage(
        &self,
        priority: TxPriority,
        tx_bytes: Vec<u8>,
    ) -> AsyncGrpcCall<GasEstimate>;
294
295 pub fn submit_message<M>(&self, message: M, cfg: TxConfig) -> AsyncGrpcCall<TxInfo>
328 where
329 M: IntoProtobufAny + Send + 'static,
330 {
331 let this = self.clone();
332
333 AsyncGrpcCall::new(move |context| async move {
334 this.submit_message_impl(message, cfg, &context).await
335 })
336 .context(&self.inner.context)
337 }
338
339 pub fn submit_blobs(&self, blobs: &[Blob], cfg: TxConfig) -> AsyncGrpcCall<TxInfo> {
370 let this = self.clone();
371 let blobs = blobs.to_vec();
372
373 AsyncGrpcCall::new(move |context| async move {
374 this.submit_blobs_impl(&blobs, cfg, &context).await
375 })
376 .context(&self.inner.context)
377 }
378
379 pub fn app_version(&self) -> AsyncGrpcCall<AppVersion> {
381 let this = self.clone();
382
383 AsyncGrpcCall::new(move |context| async move {
384 let ChainState { app_version, .. } = this.load_chain_state(&context).await?;
385 Ok(*app_version)
386 })
387 .context(&self.inner.context)
388 }
389
390 pub fn chain_id(&self) -> AsyncGrpcCall<Id> {
392 let this = self.clone();
393
394 AsyncGrpcCall::new(move |context| async move {
395 let ChainState { chain_id, .. } = this.load_chain_state(&context).await?;
396 Ok(chain_id.clone())
397 })
398 .context(&self.inner.context)
399 }
400
401 pub fn get_account_pubkey(&self) -> Option<VerifyingKey> {
403 self.account().map(|account| account.pubkey).ok()
404 }
405
406 pub fn get_account_address(&self) -> Option<AccAddress> {
408 self.get_account_pubkey().map(Into::into)
409 }
410}
411
412impl GrpcClient {
413 async fn get_verified_balance_impl(
414 &self,
415 address: &Address,
416 header: &ExtendedHeader,
417 context: &Context,
418 ) -> Result<Coin> {
419 let mut prefixed_account_key = Vec::with_capacity(1 + 1 + appconsts::SIGNER_SIZE + 4);
421 prefixed_account_key.push(0x02); prefixed_account_key.push(address.as_bytes().len() as u8); prefixed_account_key.extend_from_slice(address.as_bytes()); prefixed_account_key.extend_from_slice(BOND_DENOM.as_bytes()); let height = 1.max(header.height().value().saturating_sub(1));
432
433 let response = self
434 .abci_query(&prefixed_account_key, "store/bank/key", height, true)
435 .context(context)
436 .await?;
437 if response.code != ErrorCode::Success {
438 return Err(Error::AbciQuery(response.code, response.log));
439 }
440
441 if response.value.is_empty() {
443 return Ok(Coin::utia(0));
444 }
445
446 let proof: ProofChain = response.proof_ops.unwrap_or_default().try_into()?;
450 proof.verify_membership(
451 &header.header.app_hash,
452 [prefixed_account_key.as_slice(), b"bank"],
453 &response.value,
454 )?;
455
456 let amount = std::str::from_utf8(&response.value)
457 .map_err(|_| Error::FailedToParseResponse)?
458 .parse()
459 .map_err(|_| Error::FailedToParseResponse)?;
460
461 Ok(Coin::utia(amount))
462 }
463
464 async fn submit_message_impl<M>(
465 &self,
466 message: M,
467 cfg: TxConfig,
468 context: &Context,
469 ) -> Result<TxInfo>
470 where
471 M: IntoProtobufAny,
472 {
473 let tx_body = RawTxBody {
474 messages: vec![message.into_any()],
475 memo: cfg.memo.clone().unwrap_or_default(),
476 ..RawTxBody::default()
477 };
478
479 let tx = self
480 .sign_and_broadcast_tx(tx_body, cfg.clone(), context)
481 .await?;
482
483 self.confirm_tx(tx, cfg, context).await
484 }
485
486 async fn submit_blobs_impl(
487 &self,
488 blobs: &[Blob],
489 cfg: TxConfig,
490 context: &Context,
491 ) -> Result<TxInfo> {
492 if blobs.is_empty() {
493 return Err(Error::TxEmptyBlobList);
494 }
495 let app_version = self.app_version().await?;
496 for blob in blobs {
497 blob.validate(app_version)?;
498 }
499
500 let tx = self
501 .sign_and_broadcast_blobs(blobs.to_vec(), cfg.clone(), context)
502 .await?;
503
504 self.confirm_tx(tx, cfg, context).await
505 }
506
507 async fn load_chain_state(&self, context: &Context) -> Result<&ChainState> {
508 self.inner
509 .chain_state
510 .get_or_try_init(|| async {
511 let block = self.get_latest_block().context(context).await?;
512 let app_version = block.header.version.app;
513 let app_version = AppVersion::from_u64(app_version)
514 .ok_or(celestia_types::Error::UnsupportedAppVersion(app_version))?;
515 let chain_id = block.header.chain_id;
516
517 Ok::<_, Error>(ChainState {
518 app_version,
519 chain_id,
520 })
521 })
522 .await
523 }
524
525 fn account(&self) -> Result<&AccountState> {
526 self.inner.account.as_ref().ok_or(Error::MissingSigner)
527 }
528
529 async fn lock_account(&self, context: &Context) -> Result<AccountGuard<'_>> {
530 let account = self.account()?;
531
532 let base = account
533 .base
534 .get_or_try_init(|| async {
535 let address = AccAddress::from(account.pubkey);
536 let account = self.get_account(&address).context(context).await?;
537 Ok::<_, Error>(Mutex::new(account.into()))
538 })
539 .await?
540 .lock()
541 .await;
542
543 Ok(AccountGuard {
544 base,
545 pubkey: &account.pubkey,
546 signer: &account.signer,
547 })
548 }
549
550 async fn calculate_transaction_gas_params(
553 &self,
554 tx_body: &RawTxBody,
555 cfg: &TxConfig,
556 chain_id: Id,
557 account: &mut AccountGuard<'_>,
558 context: &Context,
559 ) -> Result<(u64, f64)> {
560 if let Some(gas_limit) = cfg.gas_limit {
561 let gas_price = if let Some(gas_price) = cfg.gas_price {
562 gas_price
563 } else {
564 self.estimate_gas_price(cfg.priority)
565 .context(context)
566 .await?
567 };
568
569 return Ok((gas_limit, gas_price));
570 }
571
572 let tx = sign_tx(
573 tx_body.clone(),
574 chain_id.clone(),
575 &account.base,
576 account.pubkey,
577 account.signer,
578 0,
579 1,
580 )
581 .await?;
582
583 let GasEstimate { price, usage } = self
584 .estimate_gas_price_and_usage(cfg.priority, tx.encode_to_vec())
585 .context(context)
586 .await?;
587
588 Ok((usage, cfg.gas_price.unwrap_or(price)))
589 }
590
    /// Signs a pay-for-blobs transaction for `blobs` and broadcasts it together
    /// with the blobs as a `RawBlobTx`.
    ///
    /// The account stays locked for the whole operation. Whenever gas estimation or
    /// the broadcast fails with a sequence mismatch, the cached sequence is replaced
    /// with the one named in the error and the transaction is re-signed and retried.
    // NOTE(review): the retry loop has no upper bound; a node that keeps answering
    // with sequence mismatches would keep this loop running.
    async fn sign_and_broadcast_blobs(
        &self,
        blobs: Vec<Blob>,
        cfg: TxConfig,
        context: &Context,
    ) -> Result<BroadcastedTx> {
        let ChainState { chain_id, .. } = self.load_chain_state(context).await?;
        let mut account = self.lock_account(context).await?;

        // The pay-for-blobs message is built for the locked account's address.
        let pfb = MsgPayForBlobs::new(&blobs, account.base.address)?;
        let pfb = RawTxBody {
            messages: vec![RawMsgPayForBlobs::from(pfb).into_any()],
            memo: cfg.memo.clone().unwrap_or_default(),
            ..RawTxBody::default()
        };
        // Convert the blobs once, outside the loop; each attempt clones the result.
        let blobs: Vec<_> = blobs.into_iter().map(Into::into).collect();

        loop {
            let res = self
                .calculate_transaction_gas_params(
                    &pfb,
                    &cfg,
                    chain_id.clone(),
                    &mut account,
                    context,
                )
                .await;

            // Gas estimation failed because of a stale sequence: adopt the expected
            // one and start over. A message that cannot be parsed aborts with an error.
            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
                account.base.sequence = new_sequence?;
                continue;
            }

            let (gas_limit, gas_price) = res?;
            // Round the fee up to a whole number of units.
            let fee = (gas_limit as f64 * gas_price).ceil() as u64;

            let tx = sign_tx(
                pfb.clone(),
                chain_id.clone(),
                &account.base,
                account.pubkey,
                account.signer,
                gas_limit,
                fee,
            )
            .await?;

            let blob_tx = RawBlobTx {
                tx: tx.encode_to_vec(),
                blobs: blobs.clone(),
                type_id: BLOB_TX_TYPE_ID.to_string(),
            }
            .encode_to_vec();

            let res = self
                .broadcast_tx_with_account(blob_tx, &cfg, &mut account, context)
                .await;

            // Same recovery as above, this time for a mismatch reported on broadcast.
            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
                account.base.sequence = new_sequence?;
                continue;
            }

            break res;
        }
    }
663
    /// Signs the transaction body `tx` with the client's account and broadcasts it.
    ///
    /// The account stays locked for the whole operation. Whenever gas estimation or
    /// the broadcast fails with a sequence mismatch, the cached sequence is replaced
    /// with the one named in the error and the transaction is re-signed and retried.
    // NOTE(review): the retry loop has no upper bound; a node that keeps answering
    // with sequence mismatches would keep this loop running.
    async fn sign_and_broadcast_tx(
        &self,
        tx: RawTxBody,
        cfg: TxConfig,
        context: &Context,
    ) -> Result<BroadcastedTx> {
        let ChainState { chain_id, .. } = self.load_chain_state(context).await?;

        let mut account = self.lock_account(context).await?;

        loop {
            let res = self
                .calculate_transaction_gas_params(
                    &tx,
                    &cfg,
                    chain_id.clone(),
                    &mut account,
                    context,
                )
                .await;

            // Gas estimation failed because of a stale sequence: adopt the expected
            // one and start over. A message that cannot be parsed aborts with an error.
            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
                account.base.sequence = new_sequence?;
                continue;
            }

            let (gas_limit, gas_price) = res?;
            // Round the fee up to a whole number of units.
            let fee = (gas_limit as f64 * gas_price).ceil() as u64;

            // Shadows the body with the signed transaction for this attempt only;
            // the next iteration sees the original body again.
            let tx = sign_tx(
                tx.clone(),
                chain_id.clone(),
                &account.base,
                account.pubkey,
                account.signer,
                gas_limit,
                fee,
            )
            .await?;

            let res = self
                .broadcast_tx_with_account(tx.encode_to_vec(), &cfg, &mut account, context)
                .await;

            // Same recovery as above, this time for a mismatch reported on broadcast.
            if let Some(new_sequence) = res.as_ref().err().and_then(extract_sequence_on_mismatch) {
                account.base.sequence = new_sequence?;
                continue;
            }

            break res;
        }
    }
722
723 async fn broadcast_tx_with_account(
724 &self,
725 tx: Vec<u8>,
726 cfg: &TxConfig,
727 account: &mut AccountGuard<'_>,
728 context: &Context,
729 ) -> Result<BroadcastedTx> {
730 let resp = self.broadcast_tx_with_cfg(tx.clone(), cfg, context).await?;
731
732 let sequence = account.base.sequence;
733 account.base.sequence += 1;
734
735 Ok(BroadcastedTx {
736 tx,
737 hash: resp.txhash,
738 sequence,
739 })
740 }
741
742 async fn broadcast_tx_with_cfg(
743 &self,
744 tx: Vec<u8>,
745 cfg: &TxConfig,
746 context: &Context,
747 ) -> Result<TxResponse> {
748 let resp = self
749 .broadcast_tx(tx, BroadcastMode::Sync)
750 .context(context)
751 .await?;
752
753 if resp.code != ErrorCode::Success {
754 let message = if resp.code == ErrorCode::InsufficientFee {
757 if cfg.gas_price.is_some() {
758 format!("Gas price was set via config. {}", resp.raw_log)
759 } else {
760 format!("Gas price was estimated. {}", resp.raw_log)
761 }
762 } else {
763 resp.raw_log
764 };
765
766 Err(Error::TxBroadcastFailed(resp.txhash, resp.code, message))
767 } else {
768 Ok(resp)
769 }
770 }
771
    /// Polls the status of a broadcast transaction until it reaches a final state.
    ///
    /// Returns the hash and height once the transaction is committed successfully.
    ///
    /// # Errors
    ///
    /// - [`Error::TxExecutionFailed`] when it was committed with a non-success code.
    /// - [`Error::TxRejected`] when the node rejected it.
    /// - [`Error::TxEvicted`] when it was evicted and could not be re-broadcast.
    /// - [`Error::TxNotFound`] when the node does not know the transaction.
    /// - Any error raised by the status request or by locking the account.
    async fn confirm_tx(
        &self,
        tx: BroadcastedTx,
        cfg: TxConfig,
        context: &Context,
    ) -> Result<TxInfo> {
        let BroadcastedTx { tx, hash, sequence } = tx;
        let mut interval = Interval::new(Duration::from_millis(500));

        loop {
            let tx_status = self.tx_status(hash).context(context).await?;
            match tx_status.status {
                // Not final yet: wait for the next tick, then poll again.
                TxStatus::Pending => interval.tick().await,
                TxStatus::Committed => {
                    if tx_status.execution_code == ErrorCode::Success {
                        return Ok(TxInfo {
                            hash,
                            height: tx_status.height,
                        });
                    } else {
                        return Err(Error::TxExecutionFailed(
                            hash,
                            tx_status.execution_code,
                            tx_status.error,
                        ));
                    }
                }
                TxStatus::Rejected => {
                    // Roll the cached sequence back to the one this transaction used,
                    // except when the rejection itself was caused by a wrong sequence.
                    if !is_wrong_sequence(tx_status.execution_code) {
                        let mut acc = self.lock_account(context).await?;
                        acc.base.sequence = sequence;
                    }

                    return Err(Error::TxRejected(
                        hash,
                        tx_status.execution_code,
                        tx_status.error,
                    ));
                }
                TxStatus::Evicted => {
                    // Re-submit the very same bytes; give up only when that fails,
                    // otherwise keep polling.
                    if self
                        .broadcast_tx_with_cfg(tx.clone(), &cfg, context)
                        .await
                        .is_err()
                    {
                        return Err(Error::TxEvicted(hash));
                    }
                }
                TxStatus::Unknown => {
                    // The node does not know the transaction: roll the cached
                    // sequence back to the one this transaction used.
                    let mut acc = self.lock_account(context).await?;
                    acc.base.sequence = sequence;
                    return Err(Error::TxNotFound(hash));
                }
            }
        }
    }
854}
855
856impl fmt::Debug for GrpcClient {
857 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
858 f.write_str("GrpcClient { .. }")
859 }
860}
861
862fn is_wrong_sequence(code: ErrorCode) -> bool {
863 code == ErrorCode::InvalidSequence || code == ErrorCode::WrongSequence
864}
865
866fn extract_sequence_on_mismatch(err: &Error) -> Option<Result<u64>> {
869 let msg = match err {
870 Error::TonicError(status) if status.message().contains(SEQUENCE_ERROR_PAT) => {
871 status.message()
872 }
873 Error::TxBroadcastFailed(_, code, message) if is_wrong_sequence(*code) => message.as_str(),
874 _ => return None,
875 };
876
877 Some(extract_sequence(msg))
878}
879
880fn extract_sequence(msg: &str) -> Result<u64> {
881 let (_, msg_with_sequence) = msg
883 .split_once(SEQUENCE_ERROR_PAT)
884 .ok_or_else(|| Error::SequenceParsingFailed(msg.into()))?;
885
886 let (sequence, _) = msg_with_sequence
888 .split_once(',')
889 .ok_or_else(|| Error::SequenceParsingFailed(msg.into()))?;
890
891 sequence
892 .parse()
893 .map_err(|_| Error::SequenceParsingFailed(msg.into()))
894}
895
896#[cfg(test)]
897mod tests {
898 use std::future::IntoFuture;
899 use std::ops::RangeInclusive;
900 use std::sync::Arc;
901
902 use celestia_proto::cosmos::bank::v1beta1::MsgSend;
903 use celestia_rpc::HeaderClient;
904 use celestia_types::nmt::Namespace;
905 use celestia_types::state::{Coin, ErrorCode};
906 use celestia_types::{AppVersion, Blob};
907 use futures::FutureExt;
908 use lumina_utils::test_utils::async_test;
909 use rand::{Rng, RngCore};
910
911 use super::GrpcClient;
912 use crate::grpc::Context;
913 use crate::test_utils::{
914 TestAccount, load_account, new_grpc_client, new_rpc_client, new_tx_client, spawn,
915 };
916 use crate::{Error, TxConfig};
917
    // Per-call additions (here: the block height) must extend the metadata set on
    // the builder rather than replace it. The call is only built, never awaited.
    #[async_test]
    async fn extending_client_context() {
        let client = GrpcClient::builder()
            .url("http://foo")
            .metadata("x-token", "secret-token")
            .build()
            .unwrap();
        let call = client.app_version().block_height(1234);

        assert!(call.context.metadata.contains_key("x-token"));
        assert!(call.context.metadata.contains_key("x-cosmos-block-height"));
    }
930
    // The node must report non-zero values for every auth parameter checked below.
    #[async_test]
    async fn get_auth_params() {
        let client = new_grpc_client();
        let params = client.get_auth_params().await.unwrap();
        assert!(params.max_memo_characters > 0);
        assert!(params.tx_sig_limit > 0);
        assert!(params.tx_size_cost_per_byte > 0);
        assert!(params.sig_verify_cost_ed25519 > 0);
        assert!(params.sig_verify_cost_secp256k1 > 0);
    }
941
    // Looking up the first listed account by its address must return that same account.
    #[async_test]
    async fn get_account() {
        let client = new_grpc_client();

        let accounts = client.get_accounts().await.unwrap();

        let first_account = accounts.first().expect("account to exist");
        let account = client.get_account(&first_account.address).await.unwrap();

        assert_eq!(&account, first_account);
    }
953
    // Exercises all bank queries for the funded test account; each of them must
    // report a non-empty, positive result.
    #[async_test]
    async fn get_balance() {
        let account = load_account();
        let client = new_grpc_client();

        let coin = client.get_balance(&account.address, "utia").await.unwrap();
        assert_eq!("utia", coin.denom());
        assert!(coin.amount() > 0);

        let all_coins = client.get_all_balances(&account.address).await.unwrap();
        assert!(!all_coins.is_empty());
        assert!(all_coins.iter().map(|c| c.amount()).sum::<u64>() > 0);

        let spendable_coins = client
            .get_spendable_balances(&account.address)
            .await
            .unwrap();
        assert!(!spendable_coins.is_empty());
        assert!(spendable_coins.iter().map(|c| c.amount()).sum::<u64>() > 0);

        let total_supply = client.get_total_supply().await.unwrap();
        assert!(!total_supply.is_empty());
        assert!(total_supply.iter().map(|c| c.amount()).sum::<u64>() > 0);
    }
978
    // The verified balance must match the unverified one for the funded test account.
    #[async_test]
    async fn get_verified_balance() {
        let client = new_grpc_client();
        let account = load_account();

        let jrpc_client = new_rpc_client().await;

        // Fetch the current head and the expected balance concurrently.
        let (head, expected_balance) = tokio::join!(
            jrpc_client.header_network_head().map(Result::unwrap),
            client
                .get_balance(&account.address, "utia")
                .into_future()
                .map(Result::unwrap)
        );

        // The verified query reads state at the height below the given header, so
        // wait for the header that follows the head observed above.
        let head = jrpc_client
            .header_wait_for_height(head.height().value() + 1)
            .await
            .unwrap();

        let verified_balance = client
            .get_verified_balance(&account.address, &head)
            .await
            .unwrap();

        assert_eq!(expected_balance, verified_balance);
    }
1008
    // A freshly generated account that never received funds must report a zero balance.
    #[async_test]
    async fn get_verified_balance_not_funded_account() {
        let client = new_grpc_client();
        let account = TestAccount::random();

        let jrpc_client = new_rpc_client().await;
        let head = jrpc_client.header_network_head().await.unwrap();

        let verified_balance = client
            .get_verified_balance(&account.address, &head)
            .await
            .unwrap();

        assert_eq!(Coin::utia(0), verified_balance);
    }
1024
    // The node used for testing is expected to report no minimum gas price.
    #[async_test]
    async fn get_node_config() {
        let client = new_grpc_client();
        let config = client.get_node_config().await.unwrap();

        assert!(config.minimum_gas_price.is_none());
    }
1034
    // Fetching a block by the height of the latest block must yield the same header.
    #[async_test]
    async fn get_block() {
        let client = new_grpc_client();

        let latest_block = client.get_latest_block().await.unwrap();
        let height = latest_block.header.height.value() as i64;

        let block = client.get_block_by_height(height).await.unwrap();
        assert_eq!(block.header, latest_block.header);
    }
1045
    // The node must report non-zero blob parameters.
    #[async_test]
    async fn get_blob_params() {
        let client = new_grpc_client();
        let params = client.get_blob_params().await.unwrap();
        assert!(params.gas_per_blob_byte > 0);
        assert!(params.gov_max_square_size > 0);
    }
1053
    // Querying at an explicit block height must return historical state: the balance
    // one block before the blob submission must be higher than the current one.
    #[async_test]
    async fn query_state_at_block_height_with_metadata() {
        let (_lock, tx_client) = new_tx_client().await;

        let tx = tx_client
            .submit_blobs(&[random_blob(10..=1000)], TxConfig::default())
            .await
            .unwrap();

        let addr = tx_client.get_account_address().unwrap().into();
        let new_balance = tx_client.get_balance(&addr, "utia").await.unwrap();
        let old_balance = tx_client
            .get_balance(&addr, "utia")
            .block_height(tx.height.value() - 1)
            .await
            .unwrap();

        assert!(new_balance.amount() < old_balance.amount());
    }
1073
    // A submitted transaction must be retrievable by its hash and carry the memo
    // that was set in the config.
    #[async_test]
    async fn submit_and_get_tx() {
        let (_lock, tx_client) = new_tx_client().await;

        let tx = tx_client
            .submit_blobs(
                &[random_blob(10..=1000)],
                TxConfig::default().with_memo("foo"),
            )
            .await
            .unwrap();
        let tx2 = tx_client.get_tx(tx.hash).await.unwrap();

        assert_eq!(tx.hash, tx2.tx_response.txhash);
        assert_eq!(tx2.tx.body.memo, "foo");
    }
1090
    // Submits 100 transactions concurrently from a single account, randomly mixing
    // blob submissions and transfers.
    #[async_test]
    async fn parallel_submission() {
        let (_lock, tx_client) = new_tx_client().await;
        let tx_client = Arc::new(tx_client);

        let futs = (0..100)
            .map(|_| {
                let tx_client = tx_client.clone();
                spawn(async move {
                    let response = if rand::random() {
                        tx_client
                            .submit_blobs(&[random_blob(10..=10000)], TxConfig::default())
                            .await
                    } else {
                        tx_client
                            .submit_message(random_transfer(&tx_client), TxConfig::default())
                            .await
                    };

                    // Success and a wrong-sequence rejection are the only tolerated
                    // outcomes; anything else fails the test.
                    match response {
                        Ok(_) => (),
                        Err(Error::TxRejected(_, ErrorCode::WrongSequence, _)) => {}
                        err => panic!("{err:?}"),
                    }
                })
            })
            .collect::<Vec<_>>();

        for fut in futs {
            fut.await.unwrap();
        }
    }
1125
    // After the cached sequence is corrupted, every submission path must recover:
    // blobs and messages, each with an estimated and with an explicit gas limit.
    #[async_test]
    async fn updating_sequence_and_resigning() {
        let (_lock, tx_client) = new_tx_client().await;

        // Blobs, gas limit estimated.
        invalidate_sequence(&tx_client).await;
        tx_client
            .submit_blobs(&[random_blob(10..=1000)], TxConfig::default())
            .await
            .unwrap();

        // Blobs, gas limit set explicitly.
        invalidate_sequence(&tx_client).await;
        tx_client
            .submit_blobs(
                &[random_blob(10..=1000)],
                TxConfig::default().with_gas_limit(100000),
            )
            .await
            .unwrap();

        // Message, gas limit estimated.
        invalidate_sequence(&tx_client).await;
        tx_client
            .submit_message(random_transfer(&tx_client), TxConfig::default())
            .await
            .unwrap();

        // Message, gas limit set explicitly.
        invalidate_sequence(&tx_client).await;
        tx_client
            .submit_message(
                random_transfer(&tx_client),
                TxConfig::default().with_gas_limit(100000),
            )
            .await
            .unwrap();
    }
1166
    // Broadcasts several large blob transactions against a dedicated validator and
    // checks that at least one transaction that got evicted is confirmed anyway.
    // NOTE(review): presumably the validator at this URL is configured so that
    // evictions happen — confirm against the test environment setup.
    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn retransmit_evicted() {
        use tokio::task::JoinSet;

        use crate::grpc::TxStatus;

        const EVICTION_TESTING_VAL_URL: &str = "http://localhost:29090";

        let account = load_account();
        let client = GrpcClient::builder()
            .url(EVICTION_TESTING_VAL_URL)
            .signer_keypair(account.signing_key)
            .build()
            .unwrap();

        // Broadcast 10 transactions of two 500000-byte blobs each, without
        // waiting for their confirmation.
        let txs = (0..10)
            .map(|_| {
                let client = client.clone();
                async move {
                    let blobs = (0..2).map(|_| random_blob(500000..=500000)).collect();
                    client
                        .sign_and_broadcast_blobs(blobs, TxConfig::default(), &Context::default())
                        .await
                        .unwrap()
                }
            })
            .collect::<JoinSet<_>>()
            .join_all()
            .await;

        let successfully_retransmitted = txs
            .into_iter()
            .map(|tx| {
                let client = client.clone();
                async move {
                    // Poll until the transaction leaves the pending state.
                    // NOTE(review): this polls back-to-back with no delay between requests.
                    let mut status = TxStatus::Pending;
                    while status == TxStatus::Pending {
                        status = client.tx_status(tx.hash).await.unwrap().status;
                    }

                    let was_evicted = status == TxStatus::Evicted;

                    match client
                        .confirm_tx(tx, TxConfig::default(), &Context::default())
                        .await
                    {
                        // Re-submission failed; this one does not count as retransmitted.
                        Err(Error::TxEvicted(_)) => false,
                        // Any other error fails the test; on success, report whether
                        // the transaction had been evicted before it was confirmed.
                        res => {
                            res.unwrap();
                            was_evicted
                        }
                    }
                }
            })
            .collect::<JoinSet<_>>()
            .join_all()
            .await;

        assert!(
            successfully_retransmitted
                .into_iter()
                .any(std::convert::identity)
        );
    }
1237
    // Blob submission must fail on broadcast with the matching error code when the
    // configured gas limit or gas price is too low.
    #[async_test]
    async fn submit_blobs_insufficient_gas_price_and_limit() {
        let (_lock, tx_client) = new_tx_client().await;

        let blobs = vec![random_blob(10..=1000)];

        // Gas limit too low.
        let err = tx_client
            .submit_blobs(&blobs, TxConfig::default().with_gas_limit(10000))
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            Error::TxBroadcastFailed(_, ErrorCode::OutOfGas, _)
        ));

        // Gas price too low.
        let err = tx_client
            .submit_blobs(&blobs, TxConfig::default().with_gas_price(0.0005))
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _)
        ));
    }
1262
    // Sends coins to a fresh account and checks that exactly that amount arrived.
    #[async_test]
    async fn submit_message() {
        let account = load_account();
        let other_account = TestAccount::random();
        let amount = Coin::utia(12345);
        let (_lock, tx_client) = new_tx_client().await;

        let msg = MsgSend {
            from_address: account.address.to_string(),
            to_address: other_account.address.to_string(),
            amount: vec![amount.clone().into()],
        };

        tx_client
            .submit_message(msg, TxConfig::default())
            .await
            .unwrap();

        let coins = tx_client
            .get_all_balances(&other_account.address)
            .await
            .unwrap();

        // The recipient is new, so the transferred coin is its only balance.
        assert_eq!(coins.len(), 1);
        assert_eq!(amount, coins[0]);
    }
1289
    // Message submission must fail on broadcast with the matching error code when
    // the configured gas limit or gas price is too low.
    #[async_test]
    async fn submit_message_insufficient_gas_price_and_limit() {
        let account = load_account();
        let other_account = TestAccount::random();
        let amount = Coin::utia(12345);
        let (_lock, tx_client) = new_tx_client().await;

        let msg = MsgSend {
            from_address: account.address.to_string(),
            to_address: other_account.address.to_string(),
            amount: vec![amount.clone().into()],
        };

        // Gas limit too low.
        let err = tx_client
            .submit_message(msg.clone(), TxConfig::default().with_gas_limit(10000))
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            Error::TxBroadcastFailed(_, ErrorCode::OutOfGas, _)
        ));

        // Gas price too low.
        let err = tx_client
            .submit_message(msg, TxConfig::default().with_gas_price(0.0005))
            .await
            .unwrap_err();
        assert!(matches!(
            err,
            Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _)
        ));
    }
1321
    // Compile-time check: the client is `Send + Sync` and the futures of both
    // submission methods are `Send`. The futures are only built, never awaited.
    #[async_test]
    async fn tx_client_is_send_and_sync() {
        fn is_send_and_sync<T: Send + Sync>(_: &T) {}
        fn is_send<T: Send>(_: &T) {}

        let (_lock, tx_client) = new_tx_client().await;
        is_send_and_sync(&tx_client);

        is_send(
            &tx_client
                .submit_blobs(&[], TxConfig::default())
                .into_future(),
        );
        is_send(
            &tx_client
                .submit_message(
                    MsgSend {
                        from_address: "".into(),
                        to_address: "".into(),
                        amount: vec![],
                    },
                    TxConfig::default(),
                )
                .into_future(),
        );
    }
1348
1349 fn random_blob(size: RangeInclusive<usize>) -> Blob {
1350 let rng = &mut rand::thread_rng();
1351
1352 let mut ns_bytes = vec![0u8; 10];
1353 rng.fill_bytes(&mut ns_bytes);
1354 let namespace = Namespace::new_v0(&ns_bytes).unwrap();
1355
1356 let len = rng.gen_range(size);
1357 let mut blob = vec![0; len];
1358 rng.fill_bytes(&mut blob);
1359 blob.resize(len, 1);
1360
1361 Blob::new(namespace, blob, None, AppVersion::latest()).unwrap()
1362 }
1363
1364 fn random_transfer(client: &GrpcClient) -> MsgSend {
1365 let address = client.get_account_address().unwrap();
1366 let other_account = TestAccount::random();
1367 let amount = rand::thread_rng().gen_range(10..1000);
1368
1369 MsgSend {
1370 from_address: address.to_string(),
1371 to_address: other_account.address.to_string(),
1372 amount: vec![Coin::utia(amount).into()],
1373 }
1374 }
1375
1376 async fn invalidate_sequence(client: &GrpcClient) {
1377 client
1378 .lock_account(&Context::default())
1379 .await
1380 .unwrap()
1381 .base
1382 .sequence += rand::thread_rng().gen_range(2..200);
1383 }
1384}