use crate::utxo_manager::{
FuelTxCoin,
UtxoProvider,
};
use fuel_core_client::client::types::{
ResolvedOutput,
TransactionStatus,
TransactionType,
};
use fuel_core_types::{
blockchain::transaction::TransactionExt,
fuel_tx::{
Address,
AssetId,
ConsensusParameters,
Transaction,
TxId,
UniqueIdentifier,
UtxoId,
Witness,
},
fuel_types::ChainId,
};
use fuels::{
accounts::{
ViewOnlyAccount,
wallet::Unlocked,
},
prelude::{
BuildableTransaction,
ResourceFilter,
ScriptTransactionBuilder,
TransactionBuilder,
TxPolicies,
Wallet,
},
types::{
coin_type::CoinType,
input::Input,
output::Output,
transaction::ScriptTransaction,
tx_status::TxStatus,
},
};
use futures::{
StreamExt,
future::{
Either,
select,
},
pin_mut,
};
use std::{
future::Future,
ops::Mul,
time::{
Duration,
Instant,
},
};
/// Extra bytes added on top of the summed witness sizes when
/// `build_transaction` derives a default witness limit.
// NOTE(review): presumably headroom for the signature witness appended while
// the transaction is built and signed — confirm the 100-byte figure.
pub const SIGNATURE_MARGIN: usize = 100;
/// Outcome of submitting (or re-attaching to) a transaction and observing a
/// status for it.
///
/// `T` is the status representation and defaults to [`TxStatus`].
#[derive(Clone, Debug)]
pub struct SendResult<T = TxStatus> {
/// Id of the transaction this result describes.
pub tx_id: TxId,
/// Status that ended the wait on the status stream.
pub tx_status: T,
/// Coins built from the `Output::Coin` outputs of the submitted transaction.
pub known_coins: Vec<FuelTxCoin>,
/// Coins built from `Change` and `Variable` outputs as reported by the node.
pub dynamic_coins: Vec<FuelTxCoin>,
/// How long the status stream was awaited before the reported status
/// arrived; `None` when it was not measured.
pub preconf_rx_time: Option<Duration>,
}
/// Chain data needed to build transfers without querying the node each time.
#[derive(Clone)]
pub struct BuilderData {
/// Consensus parameters; the code in this file reads the gas limit, fee
/// parameters, base asset id and chain id from them.
pub consensus_parameters: ConsensusParameters,
/// Gas price used by `max_fee` to derive the fee ceiling.
pub gas_price: u64,
}
impl BuilderData {
    /// Returns the largest fee a single transaction can incur at the stored
    /// gas price: `ceil(max_gas_per_tx * gas_price / gas_price_factor)`.
    ///
    /// The product is computed in `u128` so that a large gas limit combined
    /// with a large gas price cannot overflow `u64`; a result that does not
    /// fit in `u64` saturates to `u64::MAX`.
    ///
    /// # Panics
    ///
    /// Panics if the consensus parameters report a `gas_price_factor` of zero.
    pub fn max_fee(&self) -> u64 {
        let max_gas_limit = self.consensus_parameters.tx_params().max_gas_per_tx();
        let gas_price_factor = self.consensus_parameters.fee_params().gas_price_factor();
        let fee = u128::from(max_gas_limit)
            .mul(u128::from(self.gas_price))
            .div_ceil(u128::from(gas_price_factor));
        u64::try_from(fee).unwrap_or(u64::MAX)
    }
}
/// Helpers for building, sending and tracking transfer transactions from a
/// wallet. The notes below describe the implementation provided in this file.
pub trait WalletExt {
/// Fetches the consensus parameters and a gas price estimate from the
/// provider and bundles them into a [`BuilderData`].
fn builder_data(&self) -> impl Future<Output = anyhow::Result<BuilderData>> + Send;
/// Builds a transaction that pays every `(recipient, amount)` pair in
/// `transfers` using `asset_id`.
///
/// Input coins are extracted from `utxo_manager`. When `fetch_coins` is set,
/// spendable coins may first be loaded from the node into `utxo_manager`.
/// The fee ceiling comes from `builder_data`.
fn build_transfer(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
/// Builds and signs a script transaction from explicit inputs, outputs and
/// witnesses. When `tx_policies` carries no witness limit, one is derived
/// from the witness sizes plus [`SIGNATURE_MARGIN`].
fn build_transaction(
&self,
inputs: Vec<Input>,
outputs: Vec<Output>,
witnesses: Vec<Witness>,
tx_policies: TxPolicies,
) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
/// Submits `tx` and waits on its status stream until a preconfirmation or
/// final execution status arrives. `chain_id` is used to compute the
/// transaction id.
fn send_transaction(
&self,
chain_id: ChainId,
tx: &Transaction,
) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
/// Builds and sends transfers for all of `transfers`, optionally split into
/// transactions of at most `chunk_size` recipients each.
///
/// Returns the coins created by the `Output::Coin` outputs of the sent
/// transactions.
fn transfer_many(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
/// Same as `transfer_many`, then waits for the transaction that produced the
/// last returned coin to be committed.
fn transfer_many_and_wait(
&self,
asset_id: AssetId,
transfers: &[(Address, u64)],
utxo_manager: &mut dyn UtxoProvider,
builder_data: &BuilderData,
fetch_coins: bool,
chunk_size: Option<usize>,
) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
/// Subscribes to the status of an already submitted transaction `tx_id` and
/// assembles a [`SendResult`] for it; `tx` supplies the statically known
/// outputs.
fn await_send_result(
&self,
tx_id: &TxId,
tx: &Transaction,
) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
}
impl<S> WalletExt for Wallet<Unlocked<S>>
where
S: fuels::core::traits::Signer + Clone + Send + Sync + std::fmt::Debug + 'static,
{
/// Queries the provider for the consensus parameters and a gas price
/// estimate and packs both into a [`BuilderData`].
async fn builder_data(&self) -> anyhow::Result<BuilderData> {
    let provider = self.provider();
    let consensus_parameters = provider.consensus_parameters().await?;
    // NOTE(review): `10` is forwarded as the estimator's argument; presumably
    // the block horizon of the estimate — confirm against the provider API.
    let gas_price = provider.estimate_gas_price(10).await?.gas_price;
    Ok(BuilderData {
        consensus_parameters,
        gas_price,
    })
}
/// Builds and signs a script transaction from the given inputs, outputs and
/// witnesses.
///
/// When `tx_policies` has no witness limit, one is set to the total size of
/// `witnesses` plus [`SIGNATURE_MARGIN`]. Burning is enabled on the builder
/// and the wallet's signer is attached before the transaction is built
/// against the wallet's provider.
///
/// # Errors
///
/// Returns an error if the signer cannot be added or the builder fails.
async fn build_transaction(
    &self,
    inputs: Vec<Input>,
    outputs: Vec<Output>,
    witnesses: Vec<Witness>,
    mut tx_policies: TxPolicies,
) -> anyhow::Result<Transaction> {
    if tx_policies.witness_limit().is_none() {
        let witness_size = witnesses
            .iter()
            .map(|w| w.as_vec().len() as u64)
            .sum::<u64>()
            + SIGNATURE_MARGIN as u64;
        tx_policies = tx_policies.with_witness_limit(witness_size);
    }
    // `outputs` is owned and not used again, so it is moved instead of cloned.
    let mut tx_builder =
        ScriptTransactionBuilder::prepare_transfer(inputs, outputs, tx_policies);
    *tx_builder.witnesses_mut() = witnesses;
    tx_builder = tx_builder.enable_burn(true);
    tx_builder.add_signer(self.signer().clone())?;
    let tx = tx_builder.build(self.provider()).await?;
    Ok(tx.into())
}
/// Submits `tx` and waits on its status stream until a preconfirmation or a
/// final execution status arrives.
///
/// Submission and status subscription are started concurrently; whichever
/// finishes first is checked, then the other is awaited.
///
/// The returned [`SendResult`] contains the coins from the transaction's
/// `Output::Coin` outputs (`known_coins`) and the coins from its resolved
/// `Change` / `Variable` outputs (`dynamic_coins`). For preconfirmations the
/// resolved outputs come from the status itself; for `Success` / `Failure`
/// the executed transaction is fetched and its outputs are inspected.
///
/// If the node rejects the submission because the transaction id already
/// exists, the call falls back to `await_send_result` for the same id.
///
/// # Errors
///
/// Returns an error when submission or subscription fails, the stream ends
/// early, the transaction is squeezed out, a preconfirmation carries no
/// resolved outputs, or the executed transaction cannot be retrieved.
#[tracing::instrument(skip_all)]
async fn send_transaction(
    &self,
    chain_id: ChainId,
    tx: &Transaction,
) -> anyhow::Result<SendResult> {
    let fuel_client = self.provider().client();
    let tx_id = tx.id(&chain_id);
    let result = async move {
        let estimate_predicates = false;
        let include_preconfirmation = true;
        let send_future = fuel_client.submit_opt(tx, Some(estimate_predicates));
        let stream_future = fuel_client
            .subscribe_transaction_status_opt(&tx_id, Some(include_preconfirmation));
        pin_mut!(send_future, stream_future);
        let mut stream = match select(send_future, stream_future).await {
            Either::Left((send_result, stream_future)) => {
                send_result?;
                stream_future.await?
            }
            Either::Right((stream_result, send_future)) => {
                let stream = stream_result?;
                send_future.await?;
                stream
            }
        };

        let now = Instant::now();
        let status = loop {
            let status = stream.next().await.transpose()?.ok_or_else(|| {
                anyhow::anyhow!("Failed to get pre confirmation from the stream")
            })?;
            if matches!(
                status,
                TransactionStatus::PreconfirmationSuccess { .. }
                    | TransactionStatus::PreconfirmationFailure { .. }
                    | TransactionStatus::Success { .. }
                    | TransactionStatus::Failure { .. }
            ) {
                break status;
            }
            if let TransactionStatus::SqueezedOut { reason } = &status {
                return Err(anyhow::anyhow!(
                    "Transaction was squeezed out: {reason:?}"
                ));
            }
        };
        let preconf_rx_time = now.elapsed();

        let resolved = match &status {
            TransactionStatus::PreconfirmationSuccess {
                resolved_outputs, ..
            }
            | TransactionStatus::PreconfirmationFailure {
                resolved_outputs, ..
            } => {
                // The node decides whether resolved outputs are included, so
                // their absence is reported as an error rather than a panic.
                resolved_outputs.clone().ok_or_else(|| {
                    anyhow::anyhow!(
                        "Expected resolved outputs in the preconfirmation of {tx_id}"
                    )
                })?
            }
            TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
                let transaction = fuel_client
                    .transaction(&tx_id)
                    .await?
                    .ok_or_else(|| anyhow::anyhow!("Transaction not found"))?;
                let TransactionType::Known(executed_tx) = transaction.transaction
                else {
                    return Err(anyhow::anyhow!("Expected known transaction type"));
                };
                // Keep change outputs and variable outputs that actually
                // carry an amount.
                executed_tx
                    .outputs()
                    .iter()
                    .enumerate()
                    .filter_map(|(index, output)| {
                        if output.is_change()
                            || output.is_variable() && output.amount() != Some(0)
                        {
                            Some(ResolvedOutput {
                                utxo_id: UtxoId::new(tx_id, index as u16),
                                output: *output,
                            })
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>()
            }
            _ => {
                return Err(anyhow::anyhow!(
                    "Expected pre confirmation, but received: {status:?}"
                ));
            }
        };

        let mut known_coins = vec![];
        for (i, output) in tx.outputs().iter().enumerate() {
            if let Output::Coin {
                amount,
                to,
                asset_id,
            } = *output
            {
                known_coins.push(FuelTxCoin {
                    amount,
                    asset_id,
                    utxo_id: UtxoId::new(tx_id, i as u16),
                    owner: to,
                });
            }
        }

        let mut dynamic_coins = vec![];
        for ResolvedOutput { utxo_id, output } in resolved {
            match output {
                Output::Change {
                    amount,
                    to,
                    asset_id,
                }
                | Output::Variable {
                    amount,
                    to,
                    asset_id,
                } => {
                    dynamic_coins.push(FuelTxCoin {
                        amount,
                        asset_id,
                        utxo_id,
                        owner: to,
                    });
                }
                _ => {}
            }
        }

        let result = SendResult {
            tx_id,
            tx_status: status.into(),
            known_coins,
            dynamic_coins,
            preconf_rx_time: Some(preconf_rx_time),
        };
        Ok(result)
    }
    .await;
    match result {
        Ok(result) => Ok(result),
        Err(err) => {
            if err.is_duplicate() {
                // `tx_id` was already derived from the caller's `chain_id`,
                // so no extra consensus-parameter round trip is needed here.
                tracing::info!(
                    "Transaction {tx_id} already exists in the chain, \
                    waiting for confirmation."
                );
                self.await_send_result(&tx_id, tx).await
            } else {
                tracing::error!(
                    "The error is not duplicate, so returning it: {err:?}",
                );
                Err(err)
            }
        }
    }
}
/// Builds a transaction that pays every `(recipient, amount)` pair in
/// `transfers` using `asset_id`, funded from `utxo_manager`.
///
/// The amount required from `asset_id` is the sum of all transfers, plus
/// `builder_data.max_fee()` when `asset_id` is the base asset. When
/// `asset_id` is not the base asset, separate base-asset coins covering the
/// maximum fee are extracted as well.
///
/// When `fetch_coins` is set and the manager's balance for `asset_id` is
/// below the required amount (fee included), spendable coins for that amount
/// are requested from the node and loaded into `utxo_manager` first.
///
/// The transaction gets one `Coin` output per transfer, a base-asset change
/// output, and an additional change output for `asset_id` when it differs
/// from the base asset.
///
/// # Errors
///
/// Returns an error if spendable resources cannot be fetched, the manager
/// cannot supply the required coins, or the transaction cannot be built.
async fn build_transfer(
    &self,
    asset_id: AssetId,
    transfers: &[(Address, u64)],
    utxo_manager: &mut dyn UtxoProvider,
    builder_data: &BuilderData,
    fetch_coins: bool,
) -> anyhow::Result<Transaction> {
    let max_fee = builder_data.max_fee();
    let base_asset_id = *builder_data.consensus_parameters.base_asset_id();
    let payer: Address = self.address();

    let asset_total = transfers
        .iter()
        .map(|(_, amount)| u128::from(*amount))
        .sum::<u128>();
    // When paying in the base asset, the same coins must also cover the fee.
    let total = if asset_id == base_asset_id {
        asset_total + u128::from(max_fee)
    } else {
        asset_total
    };

    let balance_of = utxo_manager.balance_of(payer, asset_id);
    if fetch_coins && balance_of < total {
        let asset_coins = self
            .provider()
            .get_spendable_resources(ResourceFilter {
                from: payer,
                asset_id: Some(asset_id),
                amount: total,
                excluded_utxos: vec![],
                excluded_message_nonces: vec![],
            })
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "Failed to get spendable resources: \
                    {e} for {asset_id:?} from {payer:?} with amount {total}"
                )
            })?
            .into_iter()
            .filter_map(|coin| match coin {
                CoinType::Coin(coin) => Some(coin.into()),
                _ => None,
            });
        utxo_manager.load_from_coins_vec(asset_coins.collect());
    }

    let fee_coins = if asset_id != base_asset_id {
        utxo_manager.guaranteed_extract_coins(
            payer,
            base_asset_id,
            u128::from(max_fee),
        )?
    } else {
        vec![]
    };
    let asset_coins = utxo_manager.guaranteed_extract_coins(payer, asset_id, total)?;

    let mut output_coins = transfers
        .iter()
        .map(|(recipient, amount)| Output::Coin {
            to: *recipient,
            amount: *amount,
            asset_id,
        })
        .collect::<Vec<_>>();
    output_coins.push(Output::Change {
        to: payer,
        amount: 0,
        asset_id: base_asset_id,
    });
    if asset_id != base_asset_id {
        output_coins.push(Output::Change {
            to: payer,
            amount: 0,
            asset_id,
        });
    }

    let mut input_coins = asset_coins;
    input_coins.extend(fee_coins);
    let inputs = input_coins
        .into_iter()
        .map(|coin| Input::resource_signed(CoinType::Coin(coin.into())))
        .collect::<Vec<_>>();

    let tx = self
        .build_transaction(
            inputs,
            output_coins,
            vec![],
            TxPolicies::default().with_max_fee(max_fee),
        )
        .await?;
    Ok(tx)
}
/// Runs `transfer_many` with the same arguments and then waits for the
/// transaction that produced the last returned coin to be committed.
///
/// Nothing is awaited when no coins were produced. Only the last coin's
/// transaction is waited on.
async fn transfer_many_and_wait(
    &self,
    asset_id: AssetId,
    transfers: &[(Address, u64)],
    utxo_manager: &mut dyn UtxoProvider,
    builder_data: &BuilderData,
    fetch_coins: bool,
    chunk_size: Option<usize>,
) -> anyhow::Result<Vec<FuelTxCoin>> {
    let created = self
        .transfer_many(
            asset_id,
            transfers,
            utxo_manager,
            builder_data,
            fetch_coins,
            chunk_size,
        )
        .await?;

    let Some(last_coin) = created.last() else {
        return Ok(created);
    };
    // Round-trip through raw bytes to obtain the id in the form the provider
    // call expects.
    let tx_id = TxId::new((*last_coin.utxo_id.tx_id()).into());
    self.provider()
        .await_transaction_commit::<ScriptTransaction>(tx_id)
        .await?;
    Ok(created)
}
/// Builds and sends transfers for all of `transfers`.
///
/// With `chunk_size == None` a single transaction carrying every transfer is
/// sent. With `Some(n)` the transfers are split into groups of at most `n`
/// recipients and one transaction is sent per group, in order.
///
/// After each sent transaction its resolved change / variable coins are
/// loaded back into `utxo_manager`, so they can fund later transactions.
///
/// Returns the coins created by the `Output::Coin` outputs of every sent
/// transaction.
///
/// # Errors
///
/// Returns an error if `chunk_size` is `Some(0)`, or if building or sending
/// any transaction fails. Transactions sent before the failure are not
/// rolled back.
async fn transfer_many(
    &self,
    asset_id: AssetId,
    transfers: &[(Address, u64)],
    utxo_manager: &mut dyn UtxoProvider,
    builder_data: &BuilderData,
    fetch_coins: bool,
    chunk_size: Option<usize>,
) -> anyhow::Result<Vec<FuelTxCoin>> {
    let chain_id = builder_data.consensus_parameters.chain_id();

    let Some(chunk_size) = chunk_size else {
        let tx = self
            .build_transfer(
                asset_id,
                transfers,
                utxo_manager,
                builder_data,
                fetch_coins,
            )
            .await?;
        let result = self.send_transaction(chain_id, &tx).await?;
        // Return the change to the manager, as the chunked path does;
        // otherwise those coins are lost to subsequent transfers.
        utxo_manager.load_from_coins_vec(result.dynamic_coins);
        return Ok(result.known_coins);
    };

    // `slice::chunks` panics on a zero chunk size; report it as an error.
    if chunk_size == 0 {
        return Err(anyhow::anyhow!("chunk_size must be greater than zero"));
    }

    let mut known_coins = vec![];
    for chunk in transfers.chunks(chunk_size) {
        let tx = self
            .build_transfer(
                asset_id,
                chunk,
                utxo_manager,
                builder_data,
                fetch_coins,
            )
            .await?;
        let result = self.send_transaction(chain_id, &tx).await?;
        known_coins.extend(result.known_coins);
        utxo_manager.load_from_coins_vec(result.dynamic_coins);
    }
    Ok(known_coins)
}
/// Subscribes to the status of an already submitted transaction and builds
/// a [`SendResult`] once a preconfirmation or a final execution status
/// arrives.
///
/// `known_coins` are derived from the `Output::Coin` outputs of `tx`.
/// `dynamic_coins` come from the resolved outputs of a preconfirmation
/// (empty when the node supplied none) or, for `Success` / `Failure`, from
/// the change outputs and non-zero variable outputs of the executed
/// transaction fetched from the node.
///
/// `preconf_rx_time` is set only when the wait ended on a preconfirmation
/// and measures the time from the start of the wait on the stream.
///
/// # Errors
///
/// Returns an error when the subscription fails, the stream ends before a
/// relevant status arrives, the transaction is squeezed out, or the executed
/// transaction cannot be found.
#[tracing::instrument(skip(self, tx), fields(tx_id))]
async fn await_send_result(
    &self,
    tx_id: &TxId,
    tx: &Transaction,
) -> anyhow::Result<SendResult> {
    let fuel_client = self.provider().client();
    let include_preconfirmation = true;
    let result = fuel_client
        .subscribe_transaction_status_opt(tx_id, Some(include_preconfirmation))
        .await;
    let mut stream = match result {
        Ok(stream) => stream,
        Err(err) => {
            tracing::error!("Failed to subscribe to transaction status: {err:?}");
            return Err(err.into());
        }
    };

    // Started once, so the measurement covers the whole wait rather than
    // only the final poll of the stream.
    let wait_started = Instant::now();
    let status = loop {
        let status = stream.next().await.transpose()?.ok_or_else(|| {
            anyhow::anyhow!("Failed to get transaction status from stream")
        })?;
        if let TransactionStatus::SqueezedOut { reason } = &status {
            // Report the squeeze-out itself instead of polling again and
            // failing with a generic end-of-stream error.
            tracing::error!(%tx_id, "Transaction was squeezed out: {reason:?}");
            return Err(anyhow::anyhow!(
                "Transaction was squeezed out: {reason:?}"
            ));
        }
        if matches!(
            status,
            TransactionStatus::PreconfirmationSuccess { .. }
                | TransactionStatus::PreconfirmationFailure { .. }
                | TransactionStatus::Success { .. }
                | TransactionStatus::Failure { .. }
        ) {
            break status;
        }
    };
    let preconf_rx_time = matches!(
        status,
        TransactionStatus::PreconfirmationSuccess { .. }
            | TransactionStatus::PreconfirmationFailure { .. }
    )
    .then(|| wait_started.elapsed());

    let mut known_coins = vec![];
    for (i, output) in tx.outputs().iter().enumerate() {
        if let Output::Coin {
            amount,
            to,
            asset_id,
        } = *output
        {
            known_coins.push(FuelTxCoin {
                amount,
                asset_id,
                utxo_id: UtxoId::new(*tx_id, i as u16),
                owner: to,
            });
        }
    }

    let mut dynamic_coins = vec![];
    match &status {
        TransactionStatus::PreconfirmationSuccess {
            resolved_outputs, ..
        }
        | TransactionStatus::PreconfirmationFailure {
            resolved_outputs, ..
        } => {
            let resolved_outputs = resolved_outputs.clone().unwrap_or_default();
            for ResolvedOutput { utxo_id, output } in resolved_outputs {
                match output {
                    Output::Change {
                        amount,
                        to,
                        asset_id,
                    }
                    | Output::Variable {
                        amount,
                        to,
                        asset_id,
                    } => {
                        dynamic_coins.push(FuelTxCoin {
                            amount,
                            asset_id,
                            utxo_id,
                            owner: to,
                        });
                    }
                    _ => {}
                }
            }
        }
        TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
            let executed = fuel_client
                .transaction(tx_id)
                .await?
                .ok_or_else(|| anyhow::anyhow!("Transaction not found"))?;
            match executed.transaction {
                TransactionType::Known(executed_tx) => {
                    for (index, output) in executed_tx.outputs().iter().enumerate() {
                        let utxo_id = UtxoId::new(*tx_id, index as u16);
                        match *output {
                            Output::Change {
                                amount,
                                to,
                                asset_id,
                            } => {
                                dynamic_coins.push(FuelTxCoin {
                                    amount,
                                    asset_id,
                                    utxo_id,
                                    owner: to,
                                });
                            }
                            // Zero-amount variable outputs are skipped, as
                            // in `send_transaction`.
                            Output::Variable {
                                amount,
                                to,
                                asset_id,
                            } if amount != 0 => {
                                dynamic_coins.push(FuelTxCoin {
                                    amount,
                                    asset_id,
                                    utxo_id,
                                    owner: to,
                                });
                            }
                            _ => {}
                        }
                    }
                }
                // Best effort: an unknown transaction type yields no
                // dynamic coins.
                TransactionType::Unknown => {}
            }
        }
        _ => {
            return Err(anyhow::anyhow!(
                "Expected pre confirmation, but received: {status:?}"
            ));
        }
    }

    let result = SendResult {
        tx_id: *tx_id,
        tx_status: status.into(),
        known_coins,
        dynamic_coins,
        preconf_rx_time,
    };
    Ok(result)
}
}
/// Classifies errors by inspecting their rendered message.
pub(crate) trait ClientError {
    /// Returns `true` when the rendered value contains the
    /// "Transaction id already exists" message.
    fn is_duplicate(&self) -> bool;
}

/// Blanket implementation for anything that can be rendered with
/// `ToString`.
impl<T: ToString> ClientError for T {
    fn is_duplicate(&self) -> bool {
        const DUPLICATE_MARKER: &str = "Transaction id already exists";
        let rendered = self.to_string();
        rendered.find(DUPLICATE_MARKER).is_some()
    }
}