use crate::utxo_manager::{
FuelTxCoin,
UtxoProvider,
};
use fuel_core_client::client::types::{
ResolvedOutput,
TransactionStatus,
TransactionType,
};
use fuel_core_types::{
blockchain::transaction::TransactionExt,
fuel_tx::{
Address,
AssetId,
ConsensusParameters,
Transaction,
TxId,
UniqueIdentifier,
UtxoId,
Witness,
},
fuel_types::ChainId,
};
use fuels::{
accounts::{
ViewOnlyAccount,
wallet::Unlocked,
},
prelude::{
BuildableTransaction,
ResourceFilter,
ScriptTransactionBuilder,
TransactionBuilder,
TxPolicies,
Wallet,
},
types::{
coin_type::CoinType,
input::Input,
output::Output,
transaction::ScriptTransaction,
tx_status::TxStatus,
},
};
use futures::StreamExt;
use std::{
future::Future,
ops::Mul,
time::{
Duration,
Instant,
},
};
pub const SIGNATURE_MARGIN: usize = 100;
#[derive(Clone, Debug)]
pub struct SendResult<T = TxStatus> {
pub tx_id: TxId,
pub tx_status: T,
pub known_coins: Vec<FuelTxCoin>,
pub dynamic_coins: Vec<FuelTxCoin>,
pub preconf_rx_time: Option<Duration>,
}
#[derive(Clone)]
pub struct BuilderData {
pub consensus_parameters: ConsensusParameters,
pub gas_price: u64,
}
impl BuilderData {
pub fn max_fee(&self) -> u64 {
let max_gas_limit = self.consensus_parameters.tx_params().max_gas_per_tx();
max_gas_limit
.mul(self.gas_price)
.div_ceil(self.consensus_parameters.fee_params().gas_price_factor())
}
}
pub trait WalletExt {
fn builder_data(&self) -> impl Future<Output = anyhow::Result<BuilderData>> + Send;
fn build_transfer(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
fn build_transaction(
&self,
inputs: Vec<Input>,
outputs: Vec<Output>,
witnesses: Vec<Witness>,
tx_policies: TxPolicies,
) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
fn send_transaction(
&self,
chain_id: ChainId,
tx: &Transaction,
) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
fn transfer_many(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
fn transfer_many_and_wait(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
fn await_send_result(
&self,
tx_id: &TxId,
tx: &Transaction,
) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
}
impl<S> WalletExt for Wallet<Unlocked<S>>
where
S: fuels::core::traits::Signer + Clone + Send + Sync + std::fmt::Debug + 'static,
{
async fn builder_data(&self) -> anyhow::Result<BuilderData> {
let provider = self.provider();
let consensus_parameters = provider.consensus_parameters().await?;
let gas_price = provider.estimate_gas_price(10).await?;
let builder_data = BuilderData {
consensus_parameters,
gas_price: gas_price.gas_price,
};
Ok(builder_data)
}
async fn build_transaction(
&self,
inputs: Vec<Input>,
outputs: Vec<Output>,
witnesses: Vec<Witness>,
mut tx_policies: TxPolicies,
) -> anyhow::Result<Transaction> {
if tx_policies.witness_limit().is_none() {
let witness_size = witnesses
.iter()
.map(|w| w.as_vec().len() as u64)
.sum::<u64>()
+ SIGNATURE_MARGIN as u64;
tx_policies = tx_policies.with_witness_limit(witness_size);
}
let mut tx_builder = ScriptTransactionBuilder::prepare_transfer(
inputs,
outputs.clone(),
tx_policies,
);
*tx_builder.witnesses_mut() = witnesses;
tx_builder = tx_builder.enable_burn(true);
tx_builder.add_signer(self.signer().clone())?;
let tx = tx_builder.build(self.provider()).await?;
Ok(tx.into())
}
#[tracing::instrument(skip_all)]
async fn send_transaction(
&self,
chain_id: ChainId,
tx: &Transaction,
) -> anyhow::Result<SendResult> {
let fuel_client = self.provider().client();
let tx_id = tx.id(&chain_id);
let result = async move {
let estimate_predicates = false;
let include_preconfirmation = true;
let mut stream = fuel_client
.submit_and_await_status_opt(
tx,
Some(estimate_predicates),
Some(include_preconfirmation),
)
.await?;
let mut status;
let now = Instant::now();
loop {
status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
"Failed to get pre confirmation from the stream"
))?;
if matches!(status, TransactionStatus::PreconfirmationSuccess { .. })
|| matches!(status, TransactionStatus::PreconfirmationFailure { .. })
|| matches!(status, TransactionStatus::Success { .. })
|| matches!(status, TransactionStatus::Failure { .. })
{
break;
}
if let TransactionStatus::SqueezedOut { reason } = &status {
return Err(anyhow::anyhow!(
"Transaction was squeezed out: {reason:?}"
));
}
}
let preconf_rx_time = now.elapsed();
let resolved;
match &status {
TransactionStatus::PreconfirmationSuccess {
resolved_outputs, ..
}
| TransactionStatus::PreconfirmationFailure {
resolved_outputs, ..
} => {
resolved =
resolved_outputs.clone().expect("Expected resolved outputs");
}
TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
let transaction = fuel_client
.transaction(&tx_id)
.await?
.ok_or(anyhow::anyhow!("Transaction not found"))?;
let TransactionType::Known(executed_tx) = transaction.transaction
else {
return Err(anyhow::anyhow!("Expected known transaction type"));
};
let resolved_outputs = executed_tx
.outputs()
.iter()
.enumerate()
.filter_map(|(index, output)| {
if output.is_change()
|| output.is_variable() && output.amount() != Some(0)
{
Some(ResolvedOutput {
utxo_id: UtxoId::new(tx_id, index as u16),
output: *output,
})
} else {
None
}
})
.collect::<Vec<_>>();
resolved = resolved_outputs;
}
_ => {
return Err(anyhow::anyhow!(
"Expected pre confirmation, but received: {status:?}"
));
}
}
let mut known_coins = vec![];
for (i, output) in tx.outputs().iter().enumerate() {
let utxo_id = UtxoId::new(tx_id, i as u16);
if let Output::Coin {
amount,
to,
asset_id,
} = *output
{
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
known_coins.push(coin);
}
}
let mut dynamic_coins = vec![];
for output in resolved {
let ResolvedOutput { utxo_id, output } = output;
match output {
Output::Change {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
Output::Variable {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
_ => {}
}
}
let result = SendResult {
tx_id,
tx_status: status.into(),
known_coins,
dynamic_coins,
preconf_rx_time: Some(preconf_rx_time),
};
Ok(result)
}
.await;
match result {
Ok(result) => Ok(result),
Err(err) => {
if err.is_duplicate() {
tracing::info!(
"Transaction {tx_id} already exists in the chain, \
waiting for confirmation."
);
self.await_send_result(&tx_id, tx).await
} else {
tracing::error!(
"The error is not duplicate, so returning it: {err:?}",
);
Err(err)
}
}
}
}
async fn build_transfer(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
) -> anyhow::Result<Transaction> {
let max_fee = builder_data.max_fee();
let base_asset_id = *builder_data.consensus_parameters.base_asset_id();
let payer: Address = self.address();
let asset_total = transfers
.iter()
.map(|(_, amount)| u128::from(*amount))
.sum::<u128>();
let balance_of = utxo_manager.balance_of(payer, asset_id);
if fetch_coins && balance_of < asset_total {
let asset_coins = self
.provider()
.get_spendable_resources(ResourceFilter {
from: self.address(),
asset_id: Some(asset_id),
amount: asset_total,
excluded_utxos: vec![],
excluded_message_nonces: vec![],
})
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to get spendable resources: \
{e} for {asset_id:?} from {payer:?} with amount {asset_total}"
)
})?
.into_iter()
.filter_map(|coin| match coin {
CoinType::Coin(coin) => Some(coin.into()),
_ => None,
});
utxo_manager.load_from_coins_vec(asset_coins.collect());
}
let fee_coins = if asset_id != base_asset_id {
utxo_manager.guaranteed_extract_coins(
payer,
base_asset_id,
max_fee as u128,
)?
} else {
vec![]
};
let mut total = transfers
.iter()
.map(|(_, amount)| u128::from(*amount))
.sum::<u128>();
if base_asset_id == asset_id {
total += max_fee as u128;
}
let asset_coins =
utxo_manager.guaranteed_extract_coins(payer, asset_id, total)?;
let mut output_coins = vec![];
for (recipient, amount) in transfers {
let output = Output::Coin {
to: *recipient,
amount: *amount,
asset_id,
};
output_coins.push(output);
}
output_coins.push(Output::Change {
to: payer,
amount: 0,
asset_id: base_asset_id,
});
if asset_id != base_asset_id {
output_coins.push(Output::Change {
to: payer,
amount: 0,
asset_id,
});
}
let mut input_coins = asset_coins;
input_coins.extend(fee_coins);
let inputs = input_coins
.into_iter()
.map(|coin| Input::resource_signed(CoinType::Coin(coin.into())))
.collect::<Vec<_>>();
let tx = self
.build_transaction(
inputs,
output_coins,
vec![],
TxPolicies::default().with_max_fee(max_fee),
)
.await?;
Ok(tx)
}
async fn transfer_many_and_wait(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> anyhow::Result<Vec<FuelTxCoin>> {
let known_coins = self
.transfer_many(
asset_id,
transfers,
utxo_manager,
builder_data,
fetch_coins,
chunk_size,
)
.await?;
if let Some(last_tx_id) = known_coins.last().map(|coin| coin.utxo_id.tx_id()) {
let tx_id = TxId::new((*last_tx_id).into());
self.provider()
.await_transaction_commit::<ScriptTransaction>(tx_id)
.await?;
}
Ok(known_coins)
}
async fn transfer_many(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> anyhow::Result<Vec<FuelTxCoin>> {
let chain_id = builder_data.consensus_parameters.chain_id();
match chunk_size {
None => {
let tx = self
.build_transfer(
asset_id,
transfers,
utxo_manager,
builder_data,
fetch_coins,
)
.await?;
let result = self.send_transaction(chain_id, &tx).await?;
Ok(result.known_coins)
}
Some(chunk_size) => {
let mut known_coins = vec![];
for chunk in transfers.chunks(chunk_size) {
let tx = self
.build_transfer(
asset_id,
chunk,
utxo_manager,
builder_data,
fetch_coins,
)
.await?;
let result = self.send_transaction(chain_id, &tx).await?;
known_coins.extend(result.known_coins);
utxo_manager.load_from_coins_vec(result.dynamic_coins);
}
Ok(known_coins)
}
}
}
#[tracing::instrument(skip(self, tx), fields(tx_id))]
async fn await_send_result(
&self,
tx_id: &TxId,
tx: &Transaction,
) -> anyhow::Result<SendResult> {
let fuel_client = self.provider().client();
let include_preconfirmation = true;
let result = fuel_client
.subscribe_transaction_status_opt(tx_id, Some(include_preconfirmation))
.await;
let mut stream = match result {
Ok(stream) => stream,
Err(err) => {
tracing::error!("Failed to subscribe to transaction status: {err:?}");
return Err(err.into());
}
};
let mut status;
let mut preconf_rx_time = None;
loop {
let now = Instant::now();
status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
"Failed to get transaction status from stream"
))?;
match status {
TransactionStatus::PreconfirmationSuccess { .. }
| TransactionStatus::PreconfirmationFailure { .. } => {
preconf_rx_time = Some(now.elapsed());
break;
}
TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
break;
}
TransactionStatus::SqueezedOut { reason } => {
tracing::error!(%tx_id, "Transaction was squeezed out: {reason:?}");
continue;
}
_ => continue,
}
}
let mut known_coins = vec![];
for (i, output) in tx.outputs().iter().enumerate() {
let utxo_id = UtxoId::new(*tx_id, i as u16);
if let Output::Coin {
amount,
to,
asset_id,
} = *output
{
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
known_coins.push(coin);
}
}
let mut dynamic_coins = vec![];
match &status {
TransactionStatus::PreconfirmationSuccess {
resolved_outputs, ..
}
| TransactionStatus::PreconfirmationFailure {
resolved_outputs, ..
} => {
let resolved_outputs = resolved_outputs.clone().unwrap_or_default();
for output in resolved_outputs {
let ResolvedOutput { utxo_id, output } = output;
match output {
Output::Change {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
Output::Variable {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
_ => {}
}
}
}
TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
let tx = fuel_client
.transaction(tx_id)
.await?
.ok_or(anyhow::anyhow!("Transaction not found"))?;
match tx.transaction {
TransactionType::Known(tx) => {
for (index, output) in tx.outputs().iter().enumerate() {
let utxo_id = UtxoId::new(*tx_id, index as u16);
match *output {
Output::Change {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
Output::Variable {
amount,
to,
asset_id,
} => {
let coin = FuelTxCoin {
amount,
asset_id,
utxo_id,
owner: to,
};
dynamic_coins.push(coin);
}
_ => {}
}
}
}
TransactionType::Unknown => {}
}
}
_ => {
return Err(anyhow::anyhow!(
"Expected pre confirmation, but received: {status:?}"
));
}
}
let result = SendResult {
tx_id: *tx_id,
tx_status: status.into(),
known_coins,
dynamic_coins,
preconf_rx_time,
};
Ok(result)
}
}
pub(crate) trait ClientError {
fn is_duplicate(&self) -> bool;
}
impl<T> ClientError for T
where
T: ToString,
{
fn is_duplicate(&self) -> bool {
self.to_string().contains("Transaction id already exists")
}
}
const COIN_INVALID_PATTERNS: &[&str] = &[
"was already spent",
"does not exist",
"does not match the values from database",
"Coin owner is different from expected input",
"Coin output does not match expected input",
"asset_id does not match expected inputs",
"is blacklisted",
"Expected coin but output is contract",
];
pub fn is_coin_invalid_error(error: &str) -> bool {
COIN_INVALID_PATTERNS
.iter()
.any(|pattern| error.contains(pattern))
}
#[cfg(test)]
mod coin_error_tests {
use super::*;
#[test]
fn detects_coin_invalid_errors() {
let cases = [
"The UTXO input 0xabcd was already spent",
"UTXO (id: 0xabcd) does not exist",
"Input coin does not match the values from database",
"Input output mismatch. Coin owner is different from expected input",
"Input output mismatch. Coin output does not match expected input",
"Input output mismatch. Coin output asset_id does not match expected inputs",
"The UTXO `0xabcd` is blacklisted",
"Input output mismatch. Expected coin but output is contract",
];
for msg in cases {
assert!(is_coin_invalid_error(msg), "Should detect: {msg}");
}
}
#[test]
fn does_not_flag_non_coin_errors() {
let cases = [
"Transaction was squeezed out",
"Pool limit is hit, try to increase gas_price",
"The provided max fee can't cover the transaction cost",
"Transaction id already exists",
"Transaction chain dependency is already too big",
"Too much transactions are in queue",
];
for msg in cases {
assert!(!is_coin_invalid_error(msg), "Should NOT detect: {msg}");
}
}
}