1use crate::utxo_manager::{
2 FuelTxCoin,
3 UtxoProvider,
4};
5use fuel_core_client::client::types::{
6 ResolvedOutput,
7 TransactionStatus,
8 TransactionType,
9};
10use fuel_core_types::{
11 blockchain::transaction::TransactionExt,
12 fuel_tx::{
13 Address,
14 AssetId,
15 ConsensusParameters,
16 Transaction,
17 TxId,
18 UniqueIdentifier,
19 UtxoId,
20 Witness,
21 },
22 fuel_types::ChainId,
23};
24use fuels::{
25 accounts::{
26 ViewOnlyAccount,
27 wallet::Unlocked,
28 },
29 prelude::{
30 BuildableTransaction,
31 ResourceFilter,
32 ScriptTransactionBuilder,
33 TransactionBuilder,
34 TxPolicies,
35 Wallet,
36 },
37 types::{
38 coin_type::CoinType,
39 input::Input,
40 output::Output,
41 transaction::ScriptTransaction,
42 tx_status::TxStatus,
43 },
44};
45use futures::StreamExt;
46use std::{
47 future::Future,
48 ops::Mul,
49 time::{
50 Duration,
51 Instant,
52 },
53};
54
/// Extra bytes added on top of the combined size of the supplied witnesses
/// when `build_transaction` derives a default witness limit.
// NOTE(review): presumably headroom for the signature witness that is added
// when the transaction is signed — confirm the value against the signer.
pub const SIGNATURE_MARGIN: usize = 100;
56
/// Outcome of submitting (or re-subscribing to) a transaction and waiting for
/// a status update.
///
/// `T` is the status representation and defaults to [`TxStatus`].
#[derive(Clone, Debug)]
pub struct SendResult<T = TxStatus> {
    /// Identifier of the transaction this result belongs to.
    pub tx_id: TxId,
    /// The status that ended the wait.
    pub tx_status: T,
    /// Coins built from the `Output::Coin` outputs of the transaction, i.e.
    /// outputs whose owner, asset and amount are fixed by the transaction.
    pub known_coins: Vec<FuelTxCoin>,
    /// Coins built from the `Change` and `Variable` outputs reported back for
    /// the transaction.
    pub dynamic_coins: Vec<FuelTxCoin>,
    /// Elapsed time measured while waiting for the status, when one was
    /// recorded.
    pub preconf_rx_time: Option<Duration>,
}
65
/// Chain data needed to build transactions, gathered once and reused.
#[derive(Clone)]
pub struct BuilderData {
    /// Consensus parameters of the chain (gas limits, fee parameters, base
    /// asset id, chain id).
    pub consensus_parameters: ConsensusParameters,
    /// Gas price used to compute the maximum fee.
    pub gas_price: u64,
}
71
72impl BuilderData {
73 pub fn max_fee(&self) -> u64 {
74 let max_gas_limit = self.consensus_parameters.tx_params().max_gas_per_tx();
75 max_gas_limit
77 .mul(self.gas_price)
78 .div_ceil(self.consensus_parameters.fee_params().gas_price_factor())
79 }
80}
81
/// Helpers for building, sending and tracking transfer transactions on top of
/// a wallet.
pub trait WalletExt {
    /// Gathers the chain data ([`BuilderData`]) needed to build transactions.
    fn builder_data(&self) -> impl Future<Output = anyhow::Result<BuilderData>> + Send;

    /// Builds a transaction that sends `asset_id` to every `(recipient,
    /// amount)` pair in `transfers`, taking input coins from `utxo_manager`.
    ///
    /// When `fetch_coins` is `true`, the implementation may first load
    /// additional coins into `utxo_manager` if its balance is too low.
    fn build_transfer(
        &self,
        asset_id: AssetId,
        transfers: &[(Address, u64)],
        utxo_manager: &mut dyn UtxoProvider,
        builder_data: &BuilderData,
        fetch_coins: bool,
    ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;

    /// Builds a transaction from explicit inputs, outputs, witnesses and
    /// policies.
    fn build_transaction(
        &self,
        inputs: Vec<Input>,
        outputs: Vec<Output>,
        witnesses: Vec<Witness>,
        tx_policies: TxPolicies,
    ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;

    /// Submits `tx` and waits for a status update, returning the coins the
    /// transaction created. `chain_id` is used to compute the transaction id.
    fn send_transaction(
        &self,
        chain_id: ChainId,
        tx: &Transaction,
    ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;

    /// Builds and sends the transfers, optionally split into transactions of
    /// at most `chunk_size` transfers each, and returns the coins created for
    /// the recipients.
    fn transfer_many(
        &self,
        asset_id: AssetId,
        transfers: &[(Address, u64)],
        utxo_manager: &mut dyn UtxoProvider,
        builder_data: &BuilderData,
        fetch_coins: bool,
        chunk_size: Option<usize>,
    ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;

    /// Same as [`WalletExt::transfer_many`], but additionally waits for a
    /// transaction commit before returning.
    fn transfer_many_and_wait(
        &self,
        asset_id: AssetId,
        transfers: &[(Address, u64)],
        utxo_manager: &mut dyn UtxoProvider,
        builder_data: &BuilderData,
        fetch_coins: bool,
        chunk_size: Option<usize>,
    ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;

    /// Waits for a status update of the already submitted transaction `tx_id`
    /// and returns the coins created by `tx`.
    fn await_send_result(
        &self,
        tx_id: &TxId,
        tx: &Transaction,
    ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
}
134
135impl<S> WalletExt for Wallet<Unlocked<S>>
136where
137 S: fuels::core::traits::Signer + Clone + Send + Sync + std::fmt::Debug + 'static,
138{
139 async fn builder_data(&self) -> anyhow::Result<BuilderData> {
140 let provider = self.provider();
141 let consensus_parameters = provider.consensus_parameters().await?;
142 let gas_price = provider.estimate_gas_price(10).await?;
143
144 let builder_data = BuilderData {
145 consensus_parameters,
146 gas_price: gas_price.gas_price,
147 };
148
149 Ok(builder_data)
150 }
151
152 async fn build_transaction(
153 &self,
154 inputs: Vec<Input>,
155 outputs: Vec<Output>,
156 witnesses: Vec<Witness>,
157 mut tx_policies: TxPolicies,
158 ) -> anyhow::Result<Transaction> {
159 if tx_policies.witness_limit().is_none() {
160 let witness_size = witnesses
161 .iter()
162 .map(|w| w.as_vec().len() as u64)
163 .sum::<u64>()
164 + SIGNATURE_MARGIN as u64;
165
166 tx_policies = tx_policies.with_witness_limit(witness_size);
167 }
168
169 let mut tx_builder = ScriptTransactionBuilder::prepare_transfer(
170 inputs,
171 outputs.clone(),
172 tx_policies,
173 );
174 *tx_builder.witnesses_mut() = witnesses;
175 tx_builder = tx_builder.enable_burn(true);
176 tx_builder.add_signer(self.signer().clone())?;
177
178 let tx = tx_builder.build(self.provider()).await?;
179 Ok(tx.into())
180 }
181
182 #[tracing::instrument(skip_all)]
183 async fn send_transaction(
184 &self,
185 chain_id: ChainId,
186 tx: &Transaction,
187 ) -> anyhow::Result<SendResult> {
188 let fuel_client = self.provider().client();
189 let tx_id = tx.id(&chain_id);
190
191 let result = async move {
192 let estimate_predicates = false;
193 let include_preconfirmation = true;
194 let mut stream = fuel_client
195 .submit_and_await_status_opt(
196 tx,
197 Some(estimate_predicates),
198 Some(include_preconfirmation),
199 )
200 .await?;
201
202 let mut status;
203 let now = Instant::now();
204
205 loop {
206 status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
207 "Failed to get pre confirmation from the stream"
208 ))?;
209
210 if matches!(status, TransactionStatus::PreconfirmationSuccess { .. })
211 || matches!(status, TransactionStatus::PreconfirmationFailure { .. })
212 || matches!(status, TransactionStatus::Success { .. })
213 || matches!(status, TransactionStatus::Failure { .. })
214 {
215 break;
216 }
217
218 if let TransactionStatus::SqueezedOut { reason } = &status {
219 return Err(anyhow::anyhow!(
220 "Transaction was squeezed out: {reason:?}"
221 ));
222 }
223 }
224 let preconf_rx_time = now.elapsed();
225
226 let resolved;
227 match &status {
228 TransactionStatus::PreconfirmationSuccess {
229 resolved_outputs, ..
230 }
231 | TransactionStatus::PreconfirmationFailure {
232 resolved_outputs, ..
233 } => {
234 resolved =
235 resolved_outputs.clone().expect("Expected resolved outputs");
236 }
237 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
238 let transaction = fuel_client
239 .transaction(&tx_id)
240 .await?
241 .ok_or(anyhow::anyhow!("Transaction not found"))?;
242
243 let TransactionType::Known(executed_tx) = transaction.transaction
244 else {
245 return Err(anyhow::anyhow!("Expected known transaction type"));
246 };
247
248 let resolved_outputs = executed_tx
249 .outputs()
250 .iter()
251 .enumerate()
252 .filter_map(|(index, output)| {
253 if output.is_change()
254 || output.is_variable() && output.amount() != Some(0)
255 {
256 Some(ResolvedOutput {
257 utxo_id: UtxoId::new(tx_id, index as u16),
258 output: *output,
259 })
260 } else {
261 None
262 }
263 })
264 .collect::<Vec<_>>();
265
266 resolved = resolved_outputs;
267 }
268 _ => {
269 return Err(anyhow::anyhow!(
270 "Expected pre confirmation, but received: {status:?}"
271 ));
272 }
273 }
274
275 let mut known_coins = vec![];
276 for (i, output) in tx.outputs().iter().enumerate() {
277 let utxo_id = UtxoId::new(tx_id, i as u16);
278 if let Output::Coin {
279 amount,
280 to,
281 asset_id,
282 } = *output
283 {
284 let coin = FuelTxCoin {
285 amount,
286 asset_id,
287 utxo_id,
288 owner: to,
289 };
290
291 known_coins.push(coin);
292 }
293 }
294
295 let mut dynamic_coins = vec![];
296 for output in resolved {
297 let ResolvedOutput { utxo_id, output } = output;
298 match output {
299 Output::Change {
300 amount,
301 to,
302 asset_id,
303 } => {
304 let coin = FuelTxCoin {
305 amount,
306 asset_id,
307 utxo_id,
308 owner: to,
309 };
310
311 dynamic_coins.push(coin);
312 }
313 Output::Variable {
314 amount,
315 to,
316 asset_id,
317 } => {
318 let coin = FuelTxCoin {
319 amount,
320 asset_id,
321 utxo_id,
322 owner: to,
323 };
324
325 dynamic_coins.push(coin);
326 }
327 _ => {}
328 }
329 }
330
331 let result = SendResult {
332 tx_id,
333 tx_status: status.into(),
334 known_coins,
335 dynamic_coins,
336 preconf_rx_time: Some(preconf_rx_time),
337 };
338
339 Ok(result)
340 }
341 .await;
342
343 match result {
344 Ok(result) => Ok(result),
345 Err(err) => {
346 if err.is_duplicate() {
347 tracing::info!(
348 "Transaction {tx_id} already exists in the chain, \
349 waiting for confirmation."
350 );
351 self.await_send_result(&tx_id, tx).await
352 } else {
353 tracing::error!(
354 "The error is not duplicate, so returning it: {err:?}",
355 );
356 Err(err)
357 }
358 }
359 }
360 }
361
362 async fn build_transfer(
363 &self,
364 asset_id: AssetId,
365 transfers: &[(Address, u64)],
366 utxo_manager: &mut dyn UtxoProvider,
367 builder_data: &BuilderData,
368 fetch_coins: bool,
369 ) -> anyhow::Result<Transaction> {
370 let max_fee = builder_data.max_fee();
372
373 let base_asset_id = *builder_data.consensus_parameters.base_asset_id();
374
375 let payer: Address = self.address();
376
377 let asset_total = transfers
378 .iter()
379 .map(|(_, amount)| u128::from(*amount))
380 .sum::<u128>();
381
382 let balance_of = utxo_manager.balance_of(payer, asset_id);
383 if fetch_coins && balance_of < asset_total {
384 let asset_coins = self
385 .provider()
386 .get_spendable_resources(ResourceFilter {
387 from: self.address(),
388 asset_id: Some(asset_id),
389 amount: asset_total,
390 excluded_utxos: vec![],
391 excluded_message_nonces: vec![],
392 })
393 .await
394 .map_err(|e| {
395 anyhow::anyhow!(
396 "Failed to get spendable resources: \
397 {e} for {asset_id:?} from {payer:?} with amount {asset_total}"
398 )
399 })?
400 .into_iter()
401 .filter_map(|coin| match coin {
402 CoinType::Coin(coin) => Some(coin.into()),
403 _ => None,
404 });
405
406 utxo_manager.load_from_coins_vec(asset_coins.collect());
407 }
408
409 let fee_coins = if asset_id != base_asset_id {
410 utxo_manager.guaranteed_extract_coins(
411 payer,
412 base_asset_id,
413 max_fee as u128,
414 )?
415 } else {
416 vec![]
417 };
418
419 let mut total = transfers
420 .iter()
421 .map(|(_, amount)| u128::from(*amount))
422 .sum::<u128>();
423
424 if base_asset_id == asset_id {
425 total += max_fee as u128;
426 }
427
428 let asset_coins =
429 utxo_manager.guaranteed_extract_coins(payer, asset_id, total)?;
430
431 let mut output_coins = vec![];
432 for (recipient, amount) in transfers {
433 let output = Output::Coin {
434 to: *recipient,
435 amount: *amount,
436 asset_id,
437 };
438 output_coins.push(output);
439 }
440
441 output_coins.push(Output::Change {
442 to: payer,
443 amount: 0,
444 asset_id: base_asset_id,
445 });
446
447 if asset_id != base_asset_id {
448 output_coins.push(Output::Change {
449 to: payer,
450 amount: 0,
451 asset_id,
452 });
453 }
454
455 let mut input_coins = asset_coins;
456 input_coins.extend(fee_coins);
457
458 let inputs = input_coins
459 .into_iter()
460 .map(|coin| Input::resource_signed(CoinType::Coin(coin.into())))
461 .collect::<Vec<_>>();
462
463 let tx = self
464 .build_transaction(
465 inputs,
466 output_coins,
467 vec![],
468 TxPolicies::default().with_max_fee(max_fee),
469 )
470 .await?;
471
472 Ok(tx)
473 }
474
475 async fn transfer_many_and_wait(
476 &self,
477 asset_id: AssetId,
478 transfers: &[(Address, u64)],
479 utxo_manager: &mut dyn UtxoProvider,
480 builder_data: &BuilderData,
481 fetch_coins: bool,
482 chunk_size: Option<usize>,
483 ) -> anyhow::Result<Vec<FuelTxCoin>> {
484 let known_coins = self
485 .transfer_many(
486 asset_id,
487 transfers,
488 utxo_manager,
489 builder_data,
490 fetch_coins,
491 chunk_size,
492 )
493 .await?;
494
495 if let Some(last_tx_id) = known_coins.last().map(|coin| coin.utxo_id.tx_id()) {
496 let tx_id = TxId::new((*last_tx_id).into());
497 self.provider()
498 .await_transaction_commit::<ScriptTransaction>(tx_id)
499 .await?;
500 }
501
502 Ok(known_coins)
503 }
504
505 async fn transfer_many(
506 &self,
507 asset_id: AssetId,
508 transfers: &[(Address, u64)],
509 utxo_manager: &mut dyn UtxoProvider,
510 builder_data: &BuilderData,
511 fetch_coins: bool,
512 chunk_size: Option<usize>,
513 ) -> anyhow::Result<Vec<FuelTxCoin>> {
514 let chain_id = builder_data.consensus_parameters.chain_id();
515 match chunk_size {
516 None => {
517 let tx = self
518 .build_transfer(
519 asset_id,
520 transfers,
521 utxo_manager,
522 builder_data,
523 fetch_coins,
524 )
525 .await?;
526 let result = self.send_transaction(chain_id, &tx).await?;
527 Ok(result.known_coins)
528 }
529 Some(chunk_size) => {
530 let mut known_coins = vec![];
531 for chunk in transfers.chunks(chunk_size) {
532 let tx = self
533 .build_transfer(
534 asset_id,
535 chunk,
536 utxo_manager,
537 builder_data,
538 fetch_coins,
539 )
540 .await?;
541 let result = self.send_transaction(chain_id, &tx).await?;
542
543 known_coins.extend(result.known_coins);
544 utxo_manager.load_from_coins_vec(result.dynamic_coins);
545 }
546
547 Ok(known_coins)
548 }
549 }
550 }
551
552 #[tracing::instrument(skip(self, tx), fields(tx_id))]
553 async fn await_send_result(
554 &self,
555 tx_id: &TxId,
556 tx: &Transaction,
557 ) -> anyhow::Result<SendResult> {
558 let fuel_client = self.provider().client();
559
560 let include_preconfirmation = true;
561 let result = fuel_client
562 .subscribe_transaction_status_opt(tx_id, Some(include_preconfirmation))
563 .await;
564 let mut stream = match result {
565 Ok(stream) => stream,
566 Err(err) => {
567 tracing::error!("Failed to subscribe to transaction status: {err:?}");
568 return Err(err.into());
569 }
570 };
571
572 let mut status;
573 let mut preconf_rx_time = None;
574 loop {
575 let now = Instant::now();
576 status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
577 "Failed to get transaction status from stream"
578 ))?;
579
580 match status {
581 TransactionStatus::PreconfirmationSuccess { .. }
582 | TransactionStatus::PreconfirmationFailure { .. } => {
583 preconf_rx_time = Some(now.elapsed());
584 break;
585 }
586 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
587 break;
588 }
589 TransactionStatus::SqueezedOut { reason } => {
590 tracing::error!(%tx_id, "Transaction was squeezed out: {reason:?}");
591 continue;
592 }
593 _ => continue,
594 }
595 }
596
597 let mut known_coins = vec![];
598 for (i, output) in tx.outputs().iter().enumerate() {
599 let utxo_id = UtxoId::new(*tx_id, i as u16);
600 if let Output::Coin {
601 amount,
602 to,
603 asset_id,
604 } = *output
605 {
606 let coin = FuelTxCoin {
607 amount,
608 asset_id,
609 utxo_id,
610 owner: to,
611 };
612
613 known_coins.push(coin);
614 }
615 }
616
617 let mut dynamic_coins = vec![];
618 match &status {
619 TransactionStatus::PreconfirmationSuccess {
620 resolved_outputs, ..
621 }
622 | TransactionStatus::PreconfirmationFailure {
623 resolved_outputs, ..
624 } => {
625 let resolved_outputs = resolved_outputs.clone().unwrap_or_default();
626
627 for output in resolved_outputs {
628 let ResolvedOutput { utxo_id, output } = output;
629 match output {
630 Output::Change {
631 amount,
632 to,
633 asset_id,
634 } => {
635 let coin = FuelTxCoin {
636 amount,
637 asset_id,
638 utxo_id,
639 owner: to,
640 };
641
642 dynamic_coins.push(coin);
643 }
644 Output::Variable {
645 amount,
646 to,
647 asset_id,
648 } => {
649 let coin = FuelTxCoin {
650 amount,
651 asset_id,
652 utxo_id,
653 owner: to,
654 };
655
656 dynamic_coins.push(coin);
657 }
658 _ => {}
659 }
660 }
661 }
662 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
663 let tx = fuel_client
664 .transaction(tx_id)
665 .await?
666 .ok_or(anyhow::anyhow!("Transaction not found"))?;
667
668 match tx.transaction {
669 TransactionType::Known(tx) => {
670 for (index, output) in tx.outputs().iter().enumerate() {
671 let utxo_id = UtxoId::new(*tx_id, index as u16);
672
673 match *output {
674 Output::Change {
675 amount,
676 to,
677 asset_id,
678 } => {
679 let coin = FuelTxCoin {
680 amount,
681 asset_id,
682 utxo_id,
683 owner: to,
684 };
685
686 dynamic_coins.push(coin);
687 }
688 Output::Variable {
689 amount,
690 to,
691 asset_id,
692 } => {
693 let coin = FuelTxCoin {
694 amount,
695 asset_id,
696 utxo_id,
697 owner: to,
698 };
699
700 dynamic_coins.push(coin);
701 }
702 _ => {}
703 }
704 }
705 }
706 TransactionType::Unknown => {}
707 }
708 }
709 _ => {
710 return Err(anyhow::anyhow!(
711 "Expected pre confirmation, but received: {status:?}"
712 ));
713 }
714 }
715
716 let result = SendResult {
717 tx_id: *tx_id,
718 tx_status: status.into(),
719 known_coins,
720 dynamic_coins,
721 preconf_rx_time,
722 };
723
724 Ok(result)
725 }
726}
727
/// Classification of errors by their rendered message.
pub(crate) trait ClientError {
    /// Returns `true` if the error says the transaction id already exists.
    fn is_duplicate(&self) -> bool;
}

/// Any value that can be rendered as a string can be classified; the check
/// is a case-sensitive substring match on the rendered text.
impl<T> ClientError for T
where
    T: ToString,
{
    fn is_duplicate(&self) -> bool {
        let rendered = self.to_string();
        rendered.contains("Transaction id already exists")
    }
}
740
/// Message fragments that mark an error as caused by an unusable input coin.
const COIN_INVALID_PATTERNS: &[&str] = &[
    "was already spent",
    "does not exist",
    "does not match the values from database",
    "Coin owner is different from expected input",
    "Coin output does not match expected input",
    "asset_id does not match expected inputs",
    "is blacklisted",
    "Expected coin but output is contract",
];

/// Returns `true` if `error` contains any of the [`COIN_INVALID_PATTERNS`]
/// fragments (case-sensitive substring match).
pub fn is_coin_invalid_error(error: &str) -> bool {
    for fragment in COIN_INVALID_PATTERNS {
        if error.contains(fragment) {
            return true;
        }
    }
    false
}
768
#[cfg(test)]
mod coin_error_tests {
    use super::*;

    /// Every message describing an unusable input coin must be detected.
    #[test]
    fn detects_coin_invalid_errors() {
        let coin_errors: &[&str] = &[
            "The UTXO input 0xabcd was already spent",
            "UTXO (id: 0xabcd) does not exist",
            "Input coin does not match the values from database",
            "Input output mismatch. Coin owner is different from expected input",
            "Input output mismatch. Coin output does not match expected input",
            "Input output mismatch. Coin output asset_id does not match expected inputs",
            "The UTXO `0xabcd` is blacklisted",
            "Input output mismatch. Expected coin but output is contract",
        ];

        for &msg in coin_errors {
            assert!(is_coin_invalid_error(msg), "Should detect: {msg}");
        }
    }

    /// Errors unrelated to input coins must not be detected.
    #[test]
    fn does_not_flag_non_coin_errors() {
        let other_errors: &[&str] = &[
            "Transaction was squeezed out",
            "Pool limit is hit, try to increase gas_price",
            "The provided max fee can't cover the transaction cost",
            "Transaction id already exists",
            "Transaction chain dependency is already too big",
            "Too much transactions are in queue",
        ];

        for &msg in other_errors {
            assert!(!is_coin_invalid_error(msg), "Should NOT detect: {msg}");
        }
    }
}