use crate::error::{Error, Result};
use crate::tx::errors::decode_tx_error;
use crate::tx::{broadcast, BroadcastMode};
use serde_json::Value;
use std::future::Future;
use std::time::Duration;
pub const DEFAULT_WAIT_TIMEOUT: Duration = Duration::from_secs(60);
pub const DEFAULT_WAIT_POLL: Duration = Duration::from_secs(2);
#[derive(Debug, Clone, Copy, Default)]
pub struct WaitOptions {
pub timeout: Option<Duration>,
pub poll: Option<Duration>,
}
#[derive(Debug, Clone)]
pub struct TxResult {
pub tx_hash: String,
pub height: i64,
pub code: u32,
pub codespace: String,
pub gas_wanted: i64,
pub gas_used: i64,
pub raw_log: String,
pub raw: Value,
}
pub async fn wait_for_tx(rest_url: &str, hash: &str, opts: WaitOptions) -> Result<TxResult> {
let timeout = opts.timeout.unwrap_or(DEFAULT_WAIT_TIMEOUT);
let poll = opts.poll.unwrap_or(DEFAULT_WAIT_POLL);
let url = format!(
"{}/cosmos/tx/v1beta1/txs/{hash}",
rest_url.trim_end_matches('/')
);
let deadline = std::time::Instant::now() + timeout;
let http = reqwest::Client::new();
loop {
if let Some(result) = fetch_tx_result(&http, &url).await? {
if result.code != 0 {
let mut err = decode_tx_error(result.code, &result.codespace, &result.raw_log)
.expect("non-zero code yields an error");
err.tx_hash = result.tx_hash.clone();
return Err(Error::Tx(err));
}
return Ok(result);
}
if std::time::Instant::now() >= deadline {
return Err(Error::Transport(format!(
"timed out after {timeout:?} waiting for tx {hash}"
)));
}
tokio::time::sleep(poll).await;
}
}
pub async fn broadcast_and_wait(
rest_url: &str,
tx_bytes: &[u8],
opts: WaitOptions,
) -> Result<TxResult> {
let resp = broadcast(rest_url, tx_bytes, BroadcastMode::Sync).await?;
let hash = resp["tx_response"]["txhash"]
.as_str()
.ok_or_else(|| Error::InvalidResponse("broadcast response missing txhash".into()))?;
if let Some(code) = resp["tx_response"]["code"].as_u64() {
if code != 0 {
let codespace = resp["tx_response"]["codespace"].as_str().unwrap_or("");
let raw_log = resp["tx_response"]["raw_log"].as_str().unwrap_or("");
let mut err = decode_tx_error(code as u32, codespace, raw_log)
.expect("non-zero code yields an error");
err.tx_hash = hash.to_string();
return Err(Error::Tx(err));
}
}
wait_for_tx(rest_url, hash, opts).await
}
pub async fn with_retry<T, F, Fut>(attempts: u32, delay: Duration, mut f: F) -> Result<T>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T>>,
{
let attempts = attempts.max(1);
let mut last_err: Option<Error> = None;
for i in 0..attempts {
match f().await {
Ok(v) => return Ok(v),
Err(e) => {
last_err = Some(e);
if i + 1 < attempts {
tokio::time::sleep(delay).await;
}
}
}
}
Err(last_err.expect("at least one attempt ran"))
}
pub(crate) async fn fetch_tx_result(http: &reqwest::Client, url: &str) -> Result<Option<TxResult>> {
let resp = http
.get(url)
.header("Accept", "application/json")
.send()
.await?;
let status = resp.status();
let body = resp.text().await?;
if status.as_u16() == 404 {
return Ok(None);
}
if !status.is_success() {
if body.to_lowercase().contains("not found") {
return Ok(None);
}
return Err(Error::Http {
status: status.as_u16(),
url: url.to_string(),
body,
});
}
let v: Value =
serde_json::from_str(&body).map_err(|e| Error::InvalidResponse(e.to_string()))?;
let result = parse_tx_response(&v);
if result.tx_hash.is_empty() {
return Ok(None);
}
Ok(Some(result))
}
pub(crate) fn parse_tx_response(v: &Value) -> TxResult {
let tr = &v["tx_response"];
TxResult {
tx_hash: tr["txhash"].as_str().unwrap_or("").to_string(),
height: str_to_i64(&tr["height"]),
code: tr["code"].as_u64().unwrap_or(0) as u32,
codespace: tr["codespace"].as_str().unwrap_or("").to_string(),
gas_wanted: str_to_i64(&tr["gas_wanted"]),
gas_used: str_to_i64(&tr["gas_used"]),
raw_log: tr["raw_log"].as_str().unwrap_or("").to_string(),
raw: v.clone(),
}
}
fn str_to_i64(v: &Value) -> i64 {
match v {
Value::String(s) => s.parse().unwrap_or(0),
Value::Number(n) => n.as_i64().unwrap_or(0),
_ => 0,
}
}