use core::time::Duration;
use std::thread;
use tracing::{debug, error, info, instrument, warn};
use ibc_proto::google::protobuf::Any;
use tendermint::abci::Code;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tendermint_rpc::HttpClient;
use crate::chain::cosmos::query::account::refresh_account;
use crate::chain::cosmos::tx::estimate_fee_and_send_tx;
use crate::chain::cosmos::types::account::Account;
use crate::chain::cosmos::types::config::TxConfig;
use crate::config::types::Memo;
use crate::error::Error;
use crate::keyring::{Secp256k1KeyPair, SigningKeyPair};
use crate::sdk_error::sdk_error_from_tx_sync_error_code;
use crate::{telemetry, time};
// Delay, in milliseconds, observed between refreshing the account and
// re-submitting the transaction (fed to `Duration::from_millis` below).
const ACCOUNT_SEQUENCE_RETRY_DELAY: u64 = 300;
// Response code that `broadcast_tx_sync` reports when the transaction was signed
// with a stale account sequence number; a response carrying it triggers one retry.
// NOTE(review): presumably the Cosmos SDK "incorrect account sequence" code —
// confirm it cannot collide with error codes of other modules.
const INCORRECT_ACCOUNT_SEQUENCE_ERR: u32 = 32;
/// Submits `messages` as a single transaction, retrying once if the cached
/// account sequence number turns out to be stale.
///
/// The actual work is delegated to `do_send_tx_with_account_sequence_retry`;
/// this wrapper adds the tracing span, the `time!` instrumentation and the
/// `messages_submitted` telemetry counter.
///
/// # Errors
///
/// Returns whatever error the delegated send produced. Note that an `Ok`
/// response does not by itself mean the chain accepted the transaction:
/// callers must still inspect the response code.
#[instrument(
    name = "send_tx_with_account_sequence_retry",
    level = "error",
    skip_all,
    fields(
        chain = %config.chain_id,
        account.sequence = %account.sequence,
    ),
)]
pub async fn send_tx_with_account_sequence_retry(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: &[Any],
) -> Result<Response, Error> {
    time!(
        "send_tx_with_account_sequence_retry",
        {
            "src_chain": config.chain_id,
        }
    );

    // Underscore-prefixed on purpose: the only reader is the `telemetry!`
    // invocation below, which may expand to nothing.
    let _submitted_count = messages.len() as u64;

    let outcome = do_send_tx_with_account_sequence_retry(
        rpc_client, config, key_pair, account, tx_memo, messages,
    )
    .await;

    // Nothing to count when the send failed outright.
    if outcome.is_err() {
        return outcome;
    }

    telemetry!(messages_submitted, &config.chain_id, _submitted_count);
    outcome
}
/// Makes one estimate-and-broadcast attempt and, depending on the outcome,
/// retries once, reports the response, or propagates the error.
///
/// Outcomes, in the order they are checked:
///
/// 1. The attempt failed with an error classified as an account sequence
///    mismatch that requires a refresh: refresh the account and retry once.
/// 2. The attempt failed with any other error: log it and propagate it.
/// 3. The broadcast response carries `INCORRECT_ACCOUNT_SEQUENCE_ERR`: record
///    a broadcast error in telemetry, refresh the account and retry once.
/// 4. The response code is `Code::Ok`: increment the cached account sequence
///    number and return the response.
/// 5. The response code is an error: log it (as `info` when the log says the
///    packet messages are redundant, as `error` plus telemetry otherwise) and
///    still return the response as `Ok`, leaving the sequence number untouched.
async fn do_send_tx_with_account_sequence_retry(
    rpc_client: &HttpClient,
    config: &TxConfig,
    key_pair: &Secp256k1KeyPair,
    account: &mut Account,
    tx_memo: &Memo,
    messages: &[Any],
) -> Result<Response, Error> {
    let first_attempt =
        estimate_fee_and_send_tx(rpc_client, config, key_pair, account, tx_memo, messages).await;

    let (response, estimated_gas) = match first_attempt {
        Ok(sent) => sent,
        Err(e) => {
            // Anything other than a refreshable sequence mismatch is final.
            if !mismatch_account_sequence_number_error_requires_refresh(&e) {
                error!(error = %e, "gas estimation failed or encountered another unrecoverable error");
                return Err(e);
            }

            warn!(
                error = %e,
                "failed to estimate gas because of a mismatched account sequence number, \
                refreshing account sequence number and retrying once",
            );

            return refresh_account_and_retry_send_tx_with_account_sequence(
                rpc_client, config, key_pair, account, tx_memo, messages,
            )
            .await;
        }
    };

    // The attempt went through, but the broadcast was rejected because of a
    // stale sequence number: that is worth exactly one retry.
    if response.code == Code::from(INCORRECT_ACCOUNT_SEQUENCE_ERR) {
        warn!(
            ?response,
            "failed to broadcast tx because of a mismatched account sequence number, \
            refreshing account sequence number and retrying once"
        );

        telemetry!(
            broadcast_errors,
            &account.address.to_string(),
            response.code.into(),
            &response.log,
        );

        return refresh_account_and_retry_send_tx_with_account_sequence(
            rpc_client, config, key_pair, account, tx_memo, messages,
        )
        .await;
    }

    debug!("gas estimation succeeded");

    match response.code {
        Code::Ok => {
            let old_account_sequence = account.sequence;
            account.sequence.increment_mut();

            debug!(
                ?response,
                account.sequence.old = %old_account_sequence,
                account.sequence.new = %account.sequence,
                "tx was successfully broadcasted, \
                increasing account sequence number"
            );
        }
        Code::Err(code) => {
            // The sequence number is left as is on every error code.
            if response.log.contains("packet messages are redundant") {
                info!(
                    ?response,
                    diagnostic = response.log,
                    ?code,
                    "broadcast tx was not completed, all packets in tx have been relayed already, no fees were consumed"
                );
            } else {
                error!(
                    ?response,
                    diagnostic = ?sdk_error_from_tx_sync_error_code(code.into(), estimated_gas),
                    "failed to broadcast tx with unrecoverable error"
                );

                telemetry!(
                    broadcast_errors,
                    &account.address.to_string(),
                    code.into(),
                    &response.log
                );
            }
        }
    }

    // A non-OK response is still handed back as `Ok`; callers inspect the code.
    Ok(response)
}
async fn refresh_account_and_retry_send_tx_with_account_sequence(
rpc_client: &HttpClient,
config: &TxConfig,
key_pair: &Secp256k1KeyPair,
account: &mut Account,
tx_memo: &Memo,
messages: &[Any],
) -> Result<Response, Error> {
let key_account = key_pair.account();
refresh_account(&config.grpc_address, &key_account, account).await?;
thread::sleep(Duration::from_millis(ACCOUNT_SEQUENCE_RETRY_DELAY));
let (estimate_result, _) =
estimate_fee_and_send_tx(rpc_client, config, key_pair, account, tx_memo, messages).await?;
Ok(estimate_result)
}
fn mismatch_account_sequence_number_error_requires_refresh(e: &Error) -> bool {
use crate::error::ErrorDetail::*;
match e.detail() {
GrpcStatus(detail) => detail.is_account_sequence_mismatch_that_requires_refresh(),
_ => false,
}
}