use std::fmt;
use std::future::Future;
use std::ops::Deref;
use std::sync::RwLock;
use std::time::Duration;
use bytes::Bytes;
use celestia_proto::cosmos::crypto::secp256k1;
pub use celestia_proto::cosmos::tx::v1beta1::SignDoc;
use celestia_types::blob::{Blob, MsgPayForBlobs, RawBlobTx, RawMsgPayForBlobs};
use celestia_types::consts::appconsts;
use celestia_types::hash::Hash;
use celestia_types::state::auth::BaseAccount;
use celestia_types::state::{
Address, AuthInfo, ErrorCode, Fee, ModeInfo, RawTx, RawTxBody, SignerInfo, Sum,
};
use celestia_types::{AppVersion, Height};
use http_body::Body;
use k256::ecdsa::signature::{Error as SignatureError, Signer};
use k256::ecdsa::{Signature, VerifyingKey};
use lumina_utils::time::Interval;
use prost::{Message, Name};
use tendermint::chain::Id;
use tendermint::PublicKey;
use tendermint_proto::google::protobuf::Any;
use tendermint_proto::Protobuf;
use tokio::sync::{Mutex, MutexGuard};
use tonic::body::BoxBody;
use tonic::client::GrpcService;
use crate::grpc::{Account, BroadcastMode, GrpcClient, StdError, TxStatus};
use crate::{Error, Result};
#[cfg(feature = "uniffi")]
uniffi::use_remote_type!(celestia_types::Hash);
const PFB_GAS_FIXED_COST: u64 = 75000;
const BYTES_PER_BLOB_INFO: u64 = 70;
const DEFAULT_GAS_MULTIPLIER: f64 = 1.1;
const BLOB_TX_TYPE_ID: &str = "BLOB";
pub struct TxClient<T, S> {
client: GrpcClient<T>,
account: Mutex<Account>,
pubkey: VerifyingKey,
signer: S,
app_version: AppVersion,
chain_id: Id,
gas_price: RwLock<f64>,
}
impl<T, S> TxClient<T, S>
where
T: GrpcService<BoxBody> + Clone,
T::Error: Into<StdError>,
T::ResponseBody: Body<Data = Bytes> + Send + 'static,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
S: DocSigner,
{
pub async fn new(
transport: T,
account_address: &Address,
account_pubkey: VerifyingKey,
signer: S,
) -> Result<Self> {
let client = GrpcClient::new(transport);
let account = client.get_account(account_address).await?;
if let Some(pubkey) = account.pub_key {
if pubkey != PublicKey::Secp256k1(account_pubkey) {
return Err(Error::PublicKeyMismatch);
}
};
let account = Mutex::new(account);
let gas_price = client.get_min_gas_price().await?;
let block = client.get_latest_block().await?;
let app_version = block.header.version.app;
let app_version = AppVersion::from_u64(app_version)
.ok_or(celestia_types::Error::UnsupportedAppVersion(app_version))?;
let chain_id = block.header.chain_id;
Ok(Self {
client,
signer,
account,
pubkey: account_pubkey,
app_version,
chain_id,
gas_price: RwLock::new(gas_price),
})
}
pub async fn submit_message<M>(&self, message: M, cfg: TxConfig) -> Result<TxInfo>
where
M: IntoAny,
{
let tx_body = RawTxBody {
messages: vec![message.into_any()],
memo: cfg.memo.clone().unwrap_or_default(),
..RawTxBody::default()
};
let mut retries = 0;
let (tx_hash, sequence) = loop {
match self
.sign_and_broadcast_tx(tx_body.clone(), cfg.clone())
.await
{
Ok(resp) => break resp,
Err(Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _))
if retries < 3 && cfg.gas_price.is_none() =>
{
retries += 1;
continue;
}
Err(e) => return Err(e),
}
};
self.confirm_tx(tx_hash, sequence).await
}
pub async fn submit_blobs(&self, blobs: &[Blob], cfg: TxConfig) -> Result<TxInfo> {
if blobs.is_empty() {
return Err(Error::TxEmptyBlobList);
}
for blob in blobs {
blob.validate(self.app_version)?;
}
let mut retries = 0;
let (tx_hash, sequence) = loop {
match self
.sign_and_broadcast_blobs(blobs.to_vec(), cfg.clone())
.await
{
Ok(resp) => break resp,
Err(Error::TxBroadcastFailed(_, ErrorCode::InsufficientFee, _))
if retries < 3 && cfg.gas_price.is_none() =>
{
retries += 1;
continue;
}
Err(e) => return Err(e),
}
};
self.confirm_tx(tx_hash, sequence).await
}
pub fn last_seen_gas_price(&self) -> f64 {
*self.gas_price.read().expect("lock poisoned")
}
async fn update_gas_price(&self) -> Result<f64> {
let gas_price = self.client.get_min_gas_price().await?;
*self.gas_price.write().expect("lock poisoned") = gas_price;
Ok(gas_price)
}
pub fn chain_id(&self) -> &Id {
&self.chain_id
}
pub fn app_version(&self) -> AppVersion {
self.app_version
}
async fn sign_and_broadcast_tx(&self, tx: RawTxBody, cfg: TxConfig) -> Result<(Hash, u64)> {
let account = self.account.lock().await;
let sign_tx = |tx, gas, fee| {
sign_tx(
tx,
self.chain_id.clone(),
&account,
&self.pubkey,
&self.signer,
gas,
fee,
)
};
let gas_limit = if let Some(gas_limit) = cfg.gas_limit {
gas_limit
} else {
let tx = sign_tx(tx.clone(), 0, 1).await?;
let gas_info = self.client.simulate(tx.encode_to_vec()).await?;
(gas_info.gas_used as f64 * DEFAULT_GAS_MULTIPLIER) as u64
};
let gas_price = if let Some(gas_price) = cfg.gas_price {
gas_price
} else {
self.update_gas_price().await?
};
let fee = (gas_limit as f64 * gas_price).ceil();
let tx = sign_tx(tx, gas_limit, fee as u64).await?;
self.broadcast_tx_with_account(tx.encode_to_vec(), account)
.await
}
async fn sign_and_broadcast_blobs(
&self,
blobs: Vec<Blob>,
cfg: TxConfig,
) -> Result<(Hash, u64)> {
let account = self.account.lock().await;
let pfb = MsgPayForBlobs::new(&blobs, account.address.clone())?;
let pfb = RawTxBody {
messages: vec![RawMsgPayForBlobs::from(pfb).into_any()],
memo: cfg.memo.unwrap_or_default(),
..RawTxBody::default()
};
let gas_limit = cfg
.gas_limit
.unwrap_or_else(|| estimate_gas(&blobs, self.app_version, DEFAULT_GAS_MULTIPLIER));
let gas_price = if let Some(gas_price) = cfg.gas_price {
gas_price
} else {
self.update_gas_price().await?
};
let fee = (gas_limit as f64 * gas_price).ceil() as u64;
let tx = sign_tx(
pfb,
self.chain_id.clone(),
&account,
&self.pubkey,
&self.signer,
gas_limit,
fee,
)
.await?;
let blobs = blobs.into_iter().map(Into::into).collect();
let blob_tx = RawBlobTx {
tx: tx.encode_to_vec(),
blobs,
type_id: BLOB_TX_TYPE_ID.to_string(),
};
self.broadcast_tx_with_account(blob_tx.encode_to_vec(), account)
.await
}
async fn broadcast_tx_with_account(
&self,
tx: Vec<u8>,
mut account: MutexGuard<'_, Account>,
) -> Result<(Hash, u64)> {
let resp = self.client.broadcast_tx(tx, BroadcastMode::Sync).await?;
if resp.code != ErrorCode::Success {
return Err(Error::TxBroadcastFailed(
resp.txhash,
resp.code,
resp.raw_log,
));
}
let tx_sequence = account.sequence;
account.sequence += 1;
Ok((resp.txhash, tx_sequence))
}
async fn confirm_tx(&self, hash: Hash, sequence: u64) -> Result<TxInfo> {
let mut interval = Interval::new(Duration::from_millis(500)).await;
loop {
let tx_status = self.client.tx_status(hash).await?;
match tx_status.status {
TxStatus::Pending => interval.tick().await,
TxStatus::Committed => {
if tx_status.execution_code == ErrorCode::Success {
return Ok(TxInfo {
hash,
height: tx_status.height,
});
} else {
return Err(Error::TxExecutionFailed(
hash,
tx_status.execution_code,
tx_status.error,
));
}
}
TxStatus::Evicted => {
let mut acc = self.account.lock().await;
acc.sequence = sequence;
return Err(Error::TxEvicted(hash));
}
TxStatus::Unknown => {
let mut acc = self.account.lock().await;
acc.sequence = sequence;
return Err(Error::TxNotFound(hash));
}
}
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<S> TxClient<tonic::transport::Channel, S>
where
S: DocSigner,
{
pub async fn with_url(
url: impl Into<String>,
account_address: &Address,
account_pubkey: VerifyingKey,
signer: S,
) -> Result<Self> {
let transport = tonic::transport::Endpoint::from_shared(url.into())?.connect_lazy();
Self::new(transport, account_address, account_pubkey, signer).await
}
}
#[cfg(target_arch = "wasm32")]
impl<S> TxClient<tonic_web_wasm_client::Client, S>
where
S: DocSigner,
{
pub async fn with_grpcweb_url(
url: impl Into<String>,
account_address: &Address,
account_pubkey: VerifyingKey,
signer: S,
) -> Result<Self> {
let transport = tonic_web_wasm_client::Client::new(url.into());
Self::new(transport, account_address, account_pubkey, signer).await
}
}
impl<T, S> Deref for TxClient<T, S> {
type Target = GrpcClient<T>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl<T, S> fmt::Debug for TxClient<T, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("TxClient { .. }")
}
}
pub trait DocSigner {
fn try_sign(&self, doc: SignDoc) -> impl Future<Output = Result<Signature, SignatureError>>;
}
impl<T> DocSigner for T
where
T: Signer<Signature>,
{
async fn try_sign(&self, doc: SignDoc) -> Result<Signature, SignatureError> {
let bytes = doc.encode_to_vec();
self.try_sign(&bytes)
}
}
pub trait IntoAny {
fn into_any(self) -> Any;
}
impl<T> IntoAny for T
where
T: Name,
{
fn into_any(self) -> Any {
Any {
type_url: T::type_url(),
value: self.encode_to_vec(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct TxInfo {
pub hash: Hash,
pub height: Height,
}
#[derive(Debug, Default, Clone, PartialEq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct TxConfig {
pub gas_limit: Option<u64>,
pub gas_price: Option<f64>,
pub memo: Option<String>,
}
impl TxConfig {
pub fn with_gas_limit(mut self, gas_limit: u64) -> Self {
self.gas_limit = Some(gas_limit);
self
}
pub fn with_gas_price(mut self, gas_price: f64) -> Self {
self.gas_price = Some(gas_price);
self
}
pub fn with_memo(mut self, memo: impl Into<String>) -> Self {
self.memo = Some(memo.into());
self
}
}
#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
pub use wbg::*;
#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
mod wbg {
use wasm_bindgen::{prelude::*, JsCast};
use super::{TxConfig, TxInfo};
use crate::utils::make_object;
#[wasm_bindgen(typescript_custom_section)]
const _: &str = "
/**
* Transaction info
*/
export interface TxInfo {
hash: string;
height: bigint;
}
/**
* Transaction config.
*/
export interface TxConfig {
gasLimit?: bigint; // utia
gasPrice?: number;
memo?: string;
}
";
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(typescript_type = "TxInfo")]
pub type JsTxInfo;
#[wasm_bindgen(typescript_type = "TxConfig")]
pub type JsTxConfig;
#[wasm_bindgen(method, getter, js_name = gasLimit)]
pub fn gas_limit(this: &JsTxConfig) -> Option<u64>;
#[wasm_bindgen(method, getter, js_name = gasPrice)]
pub fn gas_price(this: &JsTxConfig) -> Option<f64>;
#[wasm_bindgen(method, getter, js_name = memo)]
pub fn memo(this: &JsTxConfig) -> Option<String>;
}
impl From<TxInfo> for JsTxInfo {
fn from(value: TxInfo) -> JsTxInfo {
let obj = make_object!(
"hash" => value.hash.to_string().into(),
"height" => js_sys::BigInt::from(value.height.value())
);
obj.unchecked_into()
}
}
impl From<JsTxConfig> for TxConfig {
fn from(value: JsTxConfig) -> TxConfig {
TxConfig {
gas_limit: value.gas_limit(),
gas_price: value.gas_price(),
memo: value.memo(),
}
}
}
}
pub async fn sign_tx(
tx_body: RawTxBody,
chain_id: Id,
base_account: &BaseAccount,
verifying_key: &VerifyingKey,
signer: &impl DocSigner,
gas_limit: u64,
fee: u64,
) -> Result<RawTx> {
const SIGNING_MODE_INFO: ModeInfo = ModeInfo {
sum: Sum::Single { mode: 1 },
};
let public_key = secp256k1::PubKey {
key: verifying_key.to_encoded_point(true).as_bytes().to_vec(),
};
let public_key_as_any = Any {
type_url: secp256k1::PubKey::type_url(),
value: public_key.encode_to_vec(),
};
let mut fee = Fee::new(fee, gas_limit);
fee.payer = Some(base_account.address.clone());
let auth_info = AuthInfo {
signer_infos: vec![SignerInfo {
public_key: Some(public_key_as_any),
mode_info: SIGNING_MODE_INFO,
sequence: base_account.sequence,
}],
fee,
};
let doc = SignDoc {
body_bytes: tx_body.encode_to_vec(),
auth_info_bytes: auth_info.clone().encode_vec(),
chain_id: chain_id.into(),
account_number: base_account.account_number,
};
let signature = signer.try_sign(doc).await?;
Ok(RawTx {
auth_info: Some(auth_info.into()),
body: Some(tx_body),
signatures: vec![signature.to_bytes().to_vec()],
})
}
fn estimate_gas(blobs: &[Blob], app_version: AppVersion, gas_multiplier: f64) -> u64 {
let gas_per_blob_byte = appconsts::gas_per_blob_byte(app_version);
let tx_size_cost_per_byte = appconsts::tx_size_cost_per_byte(app_version);
let blobs_bytes =
blobs.iter().map(Blob::shares_len).sum::<usize>() as u64 * appconsts::SHARE_SIZE as u64;
let gas = blobs_bytes * gas_per_blob_byte
+ (tx_size_cost_per_byte * BYTES_PER_BLOB_INFO * blobs.len() as u64)
+ PFB_GAS_FIXED_COST;
(gas as f64 * gas_multiplier) as u64
}