use crate::client::types::BlockParams;
use crate::client::types::*;
use crate::coin::Fee;
use crate::{address::Address, private_key::MessageArgs};
use crate::{client::Contact, error::CosmosGrpcError};
use cosmos_sdk_proto::cosmos::base::tendermint::v1beta1::service_client::ServiceClient as TendermintServiceClient;
use cosmos_sdk_proto::cosmos::base::tendermint::v1beta1::GetBlockByHeightRequest;
use cosmos_sdk_proto::cosmos::base::tendermint::v1beta1::GetLatestBlockRequest;
use cosmos_sdk_proto::cosmos::base::tendermint::v1beta1::GetSyncingRequest;
use cosmos_sdk_proto::cosmos::consensus::v1::query_client::QueryClient as ConsensusQueryClient;
use cosmos_sdk_proto::cosmos::consensus::v1::QueryParamsRequest;
use cosmos_sdk_proto::cosmos::params::v1beta1::query_client::QueryClient as ParamsQueryClient;
use cosmos_sdk_proto::cosmos::params::v1beta1::{
QueryParamsRequest as LegacyQueryParamsRequest,
QueryParamsResponse as LegacyQueryParamsResponse,
};
use cosmos_sdk_proto::cosmos::tx::v1beta1::service_client::ServiceClient as TxServiceClient;
use cosmos_sdk_proto::cosmos::tx::v1beta1::GetTxRequest;
use cosmos_sdk_proto::cosmos::tx::v1beta1::GetTxResponse;
use cosmos_sdk_proto::tendermint::types::Block;
use std::collections::HashSet;
use std::time::Duration;
use std::time::Instant;
use tokio::time::{sleep, timeout};
/// Number of blocks added to the latest block height to form a transaction's
/// `timeout_height` when `get_message_args` is called with `timeout_block == None`.
pub const DEFAULT_TRANSACTION_TIMEOUT_BLOCKS: u64 = 100;
impl Contact {
/// Reports the state of the chain as seen by the connected node.
///
/// Returns `ChainStatus::Syncing` when the node says it is syncing,
/// `ChainStatus::Moving` with the height recorded in the latest block's
/// `last_commit` when a block is available, and `ChainStatus::WaitingToStart`
/// when the node has no block yet (either an empty response or a gRPC error
/// whose message contains `"nil Block"`).
///
/// # Errors
/// Fails on connection/request timeouts, on any other gRPC error, when the
/// latest block carries no `last_commit`, or when the commit height does not
/// fit in a `u64`.
pub async fn get_chain_status(&self) -> Result<ChainStatus, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        TendermintServiceClient::connect(self.url.clone()),
    )
    .await??;

    let sync_state = timeout(
        self.get_timeout(),
        client.get_syncing(GetSyncingRequest {}),
    )
    .await??
    .into_inner();
    if sync_state.syncing {
        return Ok(ChainStatus::Syncing);
    }

    // Only the timeout is unwrapped here; the gRPC status is inspected below
    // because one specific error means "chain has not started yet".
    let latest = timeout(
        self.get_timeout(),
        client.get_latest_block(GetLatestBlockRequest {}),
    )
    .await?;

    let response = match latest {
        Ok(response) => response.into_inner(),
        Err(status) if status.message().contains("nil Block") => {
            return Ok(ChainStatus::WaitingToStart)
        }
        Err(status) => return Err(status.into()),
    };

    match response.block {
        None => Ok(ChainStatus::WaitingToStart),
        Some(block) => {
            let commit = block.last_commit.ok_or_else(|| {
                CosmosGrpcError::BadResponse("No commit in block?".to_string())
            })?;
            Ok(ChainStatus::Moving {
                block_height: u64::try_from(commit.height)?,
            })
        }
    }
}
/// Fetches the most recent block known to the connected node.
///
/// The result is `LatestBlock::Syncing { block }` when the node reports it is
/// still syncing, `LatestBlock::Latest { block }` when it is not, and
/// `LatestBlock::WaitingToStart` when the response carries no block.
///
/// # Errors
/// Fails on connection/request timeouts and on any gRPC error returned by the
/// syncing or latest-block queries. A failing latest-block query is returned
/// to the caller as an error rather than causing a panic.
pub async fn get_latest_block(&self) -> Result<LatestBlock, CosmosGrpcError> {
    let mut grpc = timeout(
        self.get_timeout(),
        TendermintServiceClient::connect(self.url.clone()),
    )
    .await??;

    let syncing = timeout(self.get_timeout(), grpc.get_syncing(GetSyncingRequest {}))
        .await??
        .into_inner()
        .syncing;

    // First `?` handles the timeout, second `?` handles the gRPC status; a
    // remote failure must never abort the process.
    let response = timeout(
        self.get_timeout(),
        grpc.get_latest_block(GetLatestBlockRequest {}),
    )
    .await??;

    Ok(match (response.into_inner().block, syncing) {
        (None, _) => LatestBlock::WaitingToStart,
        (Some(block), true) => LatestBlock::Syncing { block },
        (Some(block), false) => LatestBlock::Latest { block },
    })
}
/// Returns the height found in the header of the latest block, whether or not
/// the node is still syncing.
///
/// # Errors
/// `CosmosGrpcError::ChainNotRunning` when there is no block yet,
/// `CosmosGrpcError::BadResponse` when the block has no header, a conversion
/// error when the height does not fit in a `u64`, plus anything
/// `get_latest_block` returns.
pub async fn get_latest_block_height(&self) -> Result<u64, CosmosGrpcError> {
    let block = match self.get_latest_block().await? {
        LatestBlock::WaitingToStart => return Err(CosmosGrpcError::ChainNotRunning),
        LatestBlock::Latest { block } | LatestBlock::Syncing { block } => block,
    };

    match block.header {
        Some(header) => Ok(u64::try_from(header.height)?),
        None => Err(CosmosGrpcError::BadResponse(
            "Null block header?".to_string(),
        )),
    }
}
/// Requests the block at height `block` from the connected node.
///
/// Returns whatever `block` field the response carries, which may be `None`.
///
/// # Errors
/// Fails on connection/request timeouts, on gRPC errors, or when `block`
/// does not fit in the `i64` height used by the request.
pub async fn get_block(&self, block: u64) -> Result<Option<Block>, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        TendermintServiceClient::connect(self.url.clone()),
    )
    .await??;

    let request = GetBlockByHeightRequest {
        height: i64::try_from(block)?,
    };
    let response = timeout(self.get_timeout(), client.get_block_by_height(request)).await??;

    Ok(response.into_inner().block)
}
/// Fetches every block with height in the half-open range `start..end`
/// (`end` itself is not requested), reusing one gRPC connection.
///
/// The returned vector holds one entry per requested height, in ascending
/// height order; an entry is `None` when the response carried no block.
/// An empty range yields an empty vector.
///
/// # Errors
/// Stops at the first failing request: timeouts, gRPC errors, or a height
/// that does not fit in an `i64`.
pub async fn get_block_range(
    &self,
    start: u64,
    end: u64,
) -> Result<Vec<Option<Block>>, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        TendermintServiceClient::connect(self.url.clone()),
    )
    .await??;

    let mut fetched = Vec::new();
    for height in start..end {
        let request = GetBlockByHeightRequest {
            height: i64::try_from(height)?,
        };
        let response =
            timeout(self.get_timeout(), client.get_block_by_height(request)).await??;
        fetched.push(response.into_inner().block);
    }
    Ok(fetched)
}
/// Fetches the block for every height in `blocks`, reusing one gRPC connection.
///
/// Heights are requested in the set's iteration order, which `HashSet` does
/// not specify, so the position of an entry in the returned vector says
/// nothing about which height it belongs to. An entry is `None` when the
/// response carried no block.
///
/// # Errors
/// Stops at the first failing request: timeouts, gRPC errors, or a height
/// that does not fit in an `i64`.
pub async fn get_block_set(
    &self,
    blocks: HashSet<u64>,
) -> Result<Vec<Option<Block>>, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        TendermintServiceClient::connect(self.url.clone()),
    )
    .await??;

    let mut fetched = Vec::new();
    for height in blocks {
        let request = GetBlockByHeightRequest {
            height: i64::try_from(height)?,
        };
        let response =
            timeout(self.get_timeout(), client.get_block_by_height(request)).await??;
        fetched.push(response.into_inner().block);
    }
    Ok(fetched)
}
/// Queries the chain's block parameters (maximum block size and gas).
///
/// The consensus-module params endpoint is tried first. If the node answers
/// with gRPC `Unimplemented`, the legacy params-module lookup is used
/// instead.
///
/// `max_gas` is `None` when the reported value cannot be represented as a
/// `u64` (i.e. it is negative), matching how the legacy path treats a value
/// it cannot parse.
///
/// # Errors
/// Fails on connection/request timeouts, on gRPC errors other than
/// `Unimplemented`, when the response lacks block parameters, or when
/// `max_bytes` does not fit in a `u64`.
pub async fn get_block_params(&self) -> Result<BlockParams, CosmosGrpcError> {
    let mut grpc = timeout(
        self.get_timeout(),
        ConsensusQueryClient::connect(self.url.clone()),
    )
    .await??;

    let response = match timeout(self.get_timeout(), grpc.params(QueryParamsRequest {})).await? {
        Ok(response) => response.into_inner(),
        Err(e) if e.code() == tonic::Code::Unimplemented => {
            debug!("Chain does not support new params query endpoint, falling back to legacy method");
            return self.get_block_params_fallback().await;
        }
        Err(e) => return Err(e.into()),
    };

    // A missing `params` and a missing `params.block` are reported identically.
    match response.params.and_then(|params| params.block) {
        Some(block) => Ok(BlockParams {
            max_bytes: u64::try_from(block.max_bytes)?,
            // A negative gas limit is not an error; report it as "no limit value".
            max_gas: u64::try_from(block.max_gas).ok(),
        }),
        None => Err(CosmosGrpcError::BadResponse(
            "No BlockParams? Deep Space/protos probably need an update".to_string(),
        )),
    }
}
async fn get_block_params_fallback(&self) -> Result<BlockParams, CosmosGrpcError> {
#[allow(deprecated)]
let res = self.get_param("baseapp", "BlockParams").await?;
if let Some(v) = res.param {
match serde_json::from_str(&v.value) {
Ok(v) => {
let v: BlockParamsJson = v;
Ok(v.into())
}
Err(e) => Err(CosmosGrpcError::BadResponse(format!(
"Failed to parse BlockParams: {}",
e
))),
}
} else {
Err(CosmosGrpcError::BadResponse(
"No BlockParams? Deep Space probably needs to be upgraded".to_string(),
))
}
}
/// Looks up a single parameter by `subspace` and `key` through the legacy
/// params-module query service and returns the raw response.
///
/// # Errors
/// Fails on connection/request timeouts and on gRPC errors.
#[deprecated(
    note = "Modules manage their own parameters now, use the module's grpc client to get parameters"
)]
pub async fn get_param(
    &self,
    subspace: impl ToString,
    key: impl ToString,
) -> Result<LegacyQueryParamsResponse, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        ParamsQueryClient::connect(self.url.clone()),
    )
    .await??;

    let request = LegacyQueryParamsRequest {
        subspace: subspace.to_string(),
        key: key.to_string(),
    };
    let response = timeout(self.get_timeout(), client.params(request)).await??;

    Ok(response.into_inner())
}
/// Looks up a transaction by its hash through the tx service and returns the
/// full `GetTxResponse`.
///
/// # Errors
/// Fails on connection/request timeouts and on gRPC errors.
pub async fn get_tx_by_hash(&self, txhash: String) -> Result<GetTxResponse, CosmosGrpcError> {
    let mut client = timeout(
        self.get_timeout(),
        TxServiceClient::connect(self.url.clone()),
    )
    .await??;

    let request = GetTxRequest { hash: txhash };
    let response = timeout(self.get_timeout(), client.get_tx(request)).await??;

    Ok(response.into_inner())
}
/// Collects the values needed to build a transaction for `our_address`:
/// the account's sequence and account number, the chain id from the latest
/// block header, the supplied `fee`, and a timeout height.
///
/// The timeout height is the latest block height plus `timeout_block`, or
/// plus `DEFAULT_TRANSACTION_TIMEOUT_BLOCKS` when `timeout_block` is `None`.
/// `tip` is always `None`.
///
/// # Errors
/// `CosmosGrpcError::NodeNotSynced` when the node is syncing,
/// `CosmosGrpcError::ChainNotRunning` when there is no block yet,
/// `CosmosGrpcError::BadResponse` when the block has no header, a conversion
/// error when the height does not fit in a `u64`, plus anything the account
/// or latest-block queries return.
pub async fn get_message_args(
    &self,
    our_address: Address,
    fee: Fee,
    timeout_block: Option<u64>,
) -> Result<MessageArgs, CosmosGrpcError> {
    let account_info = self.get_account_info(our_address).await?;
    debug!("Account info: {:?}", account_info);

    let latest_block = self.get_latest_block().await?;
    debug!("Latest block: {:?}", latest_block);

    // Only a fully synced node gives a trustworthy height for the timeout.
    let block = match latest_block {
        LatestBlock::Syncing { .. } => return Err(CosmosGrpcError::NodeNotSynced),
        LatestBlock::WaitingToStart => return Err(CosmosGrpcError::ChainNotRunning),
        LatestBlock::Latest { block } => block,
    };

    let header = block
        .header
        .ok_or_else(|| CosmosGrpcError::BadResponse("Null block header?".to_string()))?;
    let current_height = u64::try_from(header.height)?;
    let timeout_window = timeout_block.unwrap_or(DEFAULT_TRANSACTION_TIMEOUT_BLOCKS);

    Ok(MessageArgs {
        sequence: account_info.sequence,
        account_number: account_info.account_number,
        chain_id: header.chain_id,
        fee,
        tip: None,
        timeout_height: current_height + timeout_window,
    })
}
/// Polls the chain status once per second until a block height greater than
/// the first observed height is seen, or until `timeout` has elapsed.
///
/// Errors from individual status queries are ignored so that one failed
/// poll does not end the wait.
///
/// # Errors
/// `CosmosGrpcError::NodeNotSynced` as soon as the node reports syncing,
/// `CosmosGrpcError::ChainNotRunning` as soon as it reports no block yet, and
/// `CosmosGrpcError::NoBlockProduced` when `timeout` passes without a height
/// increase.
pub async fn wait_for_next_block(&self, timeout: Duration) -> Result<(), CosmosGrpcError> {
    let started = Instant::now();
    // Height seen on the first successful poll; later polls compare against it.
    let mut baseline = None;

    while started.elapsed() < timeout {
        let status = self.get_chain_status().await;
        debug!("Got chain status: {:?}", status);

        match status {
            Ok(ChainStatus::Moving { block_height }) => match baseline {
                Some(first_seen) if block_height > first_seen => return Ok(()),
                Some(_) => {}
                None => baseline = Some(block_height),
            },
            Ok(ChainStatus::Syncing) => return Err(CosmosGrpcError::NodeNotSynced),
            Ok(ChainStatus::WaitingToStart) => return Err(CosmosGrpcError::ChainNotRunning),
            // A transient query failure should not abort the wait.
            Err(_) => {}
        }

        sleep(Duration::from_secs(1)).await;
    }

    Err(CosmosGrpcError::NoBlockProduced { time: timeout })
}
}
/// JSON form of the block parameters as decoded by `get_block_params_fallback`
/// from the legacy `baseapp`/`BlockParams` parameter value. Both limits are
/// carried as strings and turned into numbers by the `From` conversion into
/// `BlockParams`.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct BlockParamsJson {
// Block size limit as a string; becomes 0 if it does not parse as `u64`.
max_bytes: String,
// Block gas limit as a string; becomes `None` if it does not parse as `u64`.
max_gas: String,
}
impl From<BlockParamsJson> for BlockParams {
fn from(input: BlockParamsJson) -> Self {
let max_gas = input.max_gas.parse().ok();
let max_bytes = input.max_bytes.parse().unwrap_or(0u64);
BlockParams { max_bytes, max_gas }
}
}