1use crate::utxo_manager::{
2 FuelTxCoin,
3 UtxoProvider,
4};
5use fuel_core_client::client::types::{
6 ResolvedOutput,
7 TransactionStatus,
8 TransactionType,
9};
10use fuel_core_types::{
11 blockchain::transaction::TransactionExt,
12 fuel_tx::{
13 Address,
14 AssetId,
15 ConsensusParameters,
16 Transaction,
17 TxId,
18 UniqueIdentifier,
19 UtxoId,
20 Witness,
21 },
22 fuel_types::ChainId,
23};
24use fuels::{
25 accounts::{
26 ViewOnlyAccount,
27 wallet::Unlocked,
28 },
29 prelude::{
30 BuildableTransaction,
31 ResourceFilter,
32 ScriptTransactionBuilder,
33 TransactionBuilder,
34 TxPolicies,
35 Wallet,
36 },
37 types::{
38 coin_type::CoinType,
39 input::Input,
40 output::Output,
41 transaction::ScriptTransaction,
42 tx_status::TxStatus,
43 },
44};
45use futures::{
46 StreamExt,
47 future::{
48 Either,
49 select,
50 },
51 pin_mut,
52};
53use std::{
54 future::Future,
55 ops::Mul,
56 time::{
57 Duration,
58 Instant,
59 },
60};
61
/// Number of extra witness bytes reserved on top of the explicitly supplied
/// witnesses when a default witness limit is derived for a transaction.
pub const SIGNATURE_MARGIN: usize = 100;
63
64#[derive(Clone, Debug)]
65pub struct SendResult<T = TxStatus> {
66 pub tx_id: TxId,
67 pub tx_status: T,
68 pub known_coins: Vec<FuelTxCoin>,
69 pub dynamic_coins: Vec<FuelTxCoin>,
70 pub preconf_rx_time: Option<Duration>,
71}
72
73#[derive(Clone)]
74pub struct BuilderData {
75 pub consensus_parameters: ConsensusParameters,
76 pub gas_price: u64,
77}
78
79impl BuilderData {
80 pub fn max_fee(&self) -> u64 {
81 let max_gas_limit = self.consensus_parameters.tx_params().max_gas_per_tx();
82 max_gas_limit
84 .mul(self.gas_price)
85 .div_ceil(self.consensus_parameters.fee_params().gas_price_factor())
86 }
87}
88
89pub trait WalletExt {
90 fn builder_data(&self) -> impl Future<Output = anyhow::Result<BuilderData>> + Send;
91
92 fn build_transfer(
93 &self,
94 asset_id: AssetId,
95 transfers: &[(Address, u64)],
96 utxo_manager: &mut dyn UtxoProvider,
97 builder_data: &BuilderData,
98 fetch_coins: bool,
99 ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
100
101 fn build_transaction(
102 &self,
103 inputs: Vec<Input>,
104 outputs: Vec<Output>,
105 witnesses: Vec<Witness>,
106 tx_policies: TxPolicies,
107 ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
108
109 fn send_transaction(
110 &self,
111 chain_id: ChainId,
112 tx: &Transaction,
113 ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
114
115 fn transfer_many(
116 &self,
117 asset_id: AssetId,
118 transfers: &[(Address, u64)],
119 utxo_manager: &mut dyn UtxoProvider,
120 builder_data: &BuilderData,
121 fetch_coins: bool,
122 chunk_size: Option<usize>,
123 ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
124
125 fn transfer_many_and_wait(
126 &self,
127 asset_id: AssetId,
128 transfers: &[(Address, u64)],
129 utxo_manager: &mut dyn UtxoProvider,
130 builder_data: &BuilderData,
131 fetch_coins: bool,
132 chunk_size: Option<usize>,
133 ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
134
135 fn await_send_result(
136 &self,
137 tx_id: &TxId,
138 tx: &Transaction,
139 ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
140}
141
142impl<S> WalletExt for Wallet<Unlocked<S>>
143where
144 S: fuels::core::traits::Signer + Clone + Send + Sync + std::fmt::Debug + 'static,
145{
146 async fn builder_data(&self) -> anyhow::Result<BuilderData> {
147 let provider = self.provider();
148 let consensus_parameters = provider.consensus_parameters().await?;
149 let gas_price = provider.estimate_gas_price(10).await?;
150
151 let builder_data = BuilderData {
152 consensus_parameters,
153 gas_price: gas_price.gas_price,
154 };
155
156 Ok(builder_data)
157 }
158
159 async fn build_transaction(
160 &self,
161 inputs: Vec<Input>,
162 outputs: Vec<Output>,
163 witnesses: Vec<Witness>,
164 mut tx_policies: TxPolicies,
165 ) -> anyhow::Result<Transaction> {
166 if tx_policies.witness_limit().is_none() {
167 let witness_size = witnesses
168 .iter()
169 .map(|w| w.as_vec().len() as u64)
170 .sum::<u64>()
171 + SIGNATURE_MARGIN as u64;
172
173 tx_policies = tx_policies.with_witness_limit(witness_size);
174 }
175
176 let mut tx_builder = ScriptTransactionBuilder::prepare_transfer(
177 inputs,
178 outputs.clone(),
179 tx_policies,
180 );
181 *tx_builder.witnesses_mut() = witnesses;
182 tx_builder = tx_builder.enable_burn(true);
183 tx_builder.add_signer(self.signer().clone())?;
184
185 let tx = tx_builder.build(self.provider()).await?;
186 Ok(tx.into())
187 }
188
189 #[tracing::instrument(skip_all)]
190 async fn send_transaction(
191 &self,
192 chain_id: ChainId,
193 tx: &Transaction,
194 ) -> anyhow::Result<SendResult> {
195 let fuel_client = self.provider().client();
196 let tx_id = tx.id(&chain_id);
197
198 let result = async move {
199 let estimate_predicates = false;
200 let include_preconfirmation = true;
201
202 let send_future = fuel_client.submit_opt(tx, Some(estimate_predicates));
203
204 let stream_future = fuel_client
205 .subscribe_transaction_status_opt(&tx_id, Some(include_preconfirmation));
206
207 pin_mut!(send_future, stream_future);
208
209 let result = select(send_future, stream_future).await;
220
221 let mut stream = match result {
222 Either::Left((send_result, stream_future)) => {
223 send_result?;
224 stream_future.await?
225 }
226 Either::Right((stream_result, send_future)) => {
227 let stream = stream_result?;
228 send_future.await?;
229 stream
230 }
231 };
232
233 let mut status;
234 let now = Instant::now();
235
236 loop {
237 status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
238 "Failed to get pre confirmation from the stream"
239 ))?;
240
241 if matches!(status, TransactionStatus::PreconfirmationSuccess { .. })
242 || matches!(status, TransactionStatus::PreconfirmationFailure { .. })
243 || matches!(status, TransactionStatus::Success { .. })
244 || matches!(status, TransactionStatus::Failure { .. })
245 {
246 break;
247 }
248
249 if let TransactionStatus::SqueezedOut { reason } = &status {
250 return Err(anyhow::anyhow!(
251 "Transaction was squeezed out: {reason:?}"
252 ));
253 }
254 }
255 let preconf_rx_time = now.elapsed();
256
257 let resolved;
258 match &status {
259 TransactionStatus::PreconfirmationSuccess {
260 resolved_outputs, ..
261 }
262 | TransactionStatus::PreconfirmationFailure {
263 resolved_outputs, ..
264 } => {
265 resolved =
266 resolved_outputs.clone().expect("Expected resolved outputs");
267 }
268 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
269 let transaction = fuel_client
270 .transaction(&tx_id)
271 .await?
272 .ok_or(anyhow::anyhow!("Transaction not found"))?;
273
274 let TransactionType::Known(executed_tx) = transaction.transaction
275 else {
276 return Err(anyhow::anyhow!("Expected known transaction type"));
277 };
278
279 let resolved_outputs = executed_tx
280 .outputs()
281 .iter()
282 .enumerate()
283 .filter_map(|(index, output)| {
284 if output.is_change()
285 || output.is_variable() && output.amount() != Some(0)
286 {
287 Some(ResolvedOutput {
288 utxo_id: UtxoId::new(tx_id, index as u16),
289 output: *output,
290 })
291 } else {
292 None
293 }
294 })
295 .collect::<Vec<_>>();
296
297 resolved = resolved_outputs;
298 }
299 _ => {
300 return Err(anyhow::anyhow!(
301 "Expected pre confirmation, but received: {status:?}"
302 ));
303 }
304 }
305
306 let mut known_coins = vec![];
307 for (i, output) in tx.outputs().iter().enumerate() {
308 let utxo_id = UtxoId::new(tx_id, i as u16);
309 if let Output::Coin {
310 amount,
311 to,
312 asset_id,
313 } = *output
314 {
315 let coin = FuelTxCoin {
316 amount,
317 asset_id,
318 utxo_id,
319 owner: to,
320 };
321
322 known_coins.push(coin);
323 }
324 }
325
326 let mut dynamic_coins = vec![];
327 for output in resolved {
328 let ResolvedOutput { utxo_id, output } = output;
329 match output {
330 Output::Change {
331 amount,
332 to,
333 asset_id,
334 } => {
335 let coin = FuelTxCoin {
336 amount,
337 asset_id,
338 utxo_id,
339 owner: to,
340 };
341
342 dynamic_coins.push(coin);
343 }
344 Output::Variable {
345 amount,
346 to,
347 asset_id,
348 } => {
349 let coin = FuelTxCoin {
350 amount,
351 asset_id,
352 utxo_id,
353 owner: to,
354 };
355
356 dynamic_coins.push(coin);
357 }
358 _ => {}
359 }
360 }
361
362 let result = SendResult {
363 tx_id,
364 tx_status: status.into(),
365 known_coins,
366 dynamic_coins,
367 preconf_rx_time: Some(preconf_rx_time),
368 };
369
370 Ok(result)
371 }
372 .await;
373
374 match result {
375 Ok(result) => Ok(result),
376 Err(err) => {
377 if err.is_duplicate() {
378 let chain_id =
379 self.provider().consensus_parameters().await?.chain_id();
380 let tx_id = tx.id(&chain_id);
381 tracing::info!(
382 "Transaction {tx_id} already exists in the chain, \
383 waiting for confirmation."
384 );
385 self.await_send_result(&tx_id, tx).await
386 } else {
387 tracing::error!(
388 "The error is not duplicate, so returning it: {err:?}",
389 );
390 Err(err)
391 }
392 }
393 }
394 }
395
396 async fn build_transfer(
397 &self,
398 asset_id: AssetId,
399 transfers: &[(Address, u64)],
400 utxo_manager: &mut dyn UtxoProvider,
401 builder_data: &BuilderData,
402 fetch_coins: bool,
403 ) -> anyhow::Result<Transaction> {
404 let max_fee = builder_data.max_fee();
406
407 let base_asset_id = *builder_data.consensus_parameters.base_asset_id();
408
409 let payer: Address = self.address();
410
411 let asset_total = transfers
412 .iter()
413 .map(|(_, amount)| u128::from(*amount))
414 .sum::<u128>();
415
416 let balance_of = utxo_manager.balance_of(payer, asset_id);
417 if fetch_coins && balance_of < asset_total {
418 let asset_coins = self
419 .provider()
420 .get_spendable_resources(ResourceFilter {
421 from: self.address(),
422 asset_id: Some(asset_id),
423 amount: asset_total,
424 excluded_utxos: vec![],
425 excluded_message_nonces: vec![],
426 })
427 .await
428 .map_err(|e| {
429 anyhow::anyhow!(
430 "Failed to get spendable resources: \
431 {e} for {asset_id:?} from {payer:?} with amount {asset_total}"
432 )
433 })?
434 .into_iter()
435 .filter_map(|coin| match coin {
436 CoinType::Coin(coin) => Some(coin.into()),
437 _ => None,
438 });
439
440 utxo_manager.load_from_coins_vec(asset_coins.collect());
441 }
442
443 let fee_coins = if asset_id != base_asset_id {
444 utxo_manager.guaranteed_extract_coins(
445 payer,
446 base_asset_id,
447 max_fee as u128,
448 )?
449 } else {
450 vec![]
451 };
452
453 let mut total = transfers
454 .iter()
455 .map(|(_, amount)| u128::from(*amount))
456 .sum::<u128>();
457
458 if base_asset_id == asset_id {
459 total += max_fee as u128;
460 }
461
462 let asset_coins =
463 utxo_manager.guaranteed_extract_coins(payer, asset_id, total)?;
464
465 let mut output_coins = vec![];
466 for (recipient, amount) in transfers {
467 let output = Output::Coin {
468 to: *recipient,
469 amount: *amount,
470 asset_id,
471 };
472 output_coins.push(output);
473 }
474
475 output_coins.push(Output::Change {
476 to: payer,
477 amount: 0,
478 asset_id: base_asset_id,
479 });
480
481 if asset_id != base_asset_id {
482 output_coins.push(Output::Change {
483 to: payer,
484 amount: 0,
485 asset_id,
486 });
487 }
488
489 let mut input_coins = asset_coins;
490 input_coins.extend(fee_coins);
491
492 let inputs = input_coins
493 .into_iter()
494 .map(|coin| Input::resource_signed(CoinType::Coin(coin.into())))
495 .collect::<Vec<_>>();
496
497 let tx = self
498 .build_transaction(
499 inputs,
500 output_coins,
501 vec![],
502 TxPolicies::default().with_max_fee(max_fee),
503 )
504 .await?;
505
506 Ok(tx)
507 }
508
509 async fn transfer_many_and_wait(
510 &self,
511 asset_id: AssetId,
512 transfers: &[(Address, u64)],
513 utxo_manager: &mut dyn UtxoProvider,
514 builder_data: &BuilderData,
515 fetch_coins: bool,
516 chunk_size: Option<usize>,
517 ) -> anyhow::Result<Vec<FuelTxCoin>> {
518 let known_coins = self
519 .transfer_many(
520 asset_id,
521 transfers,
522 utxo_manager,
523 builder_data,
524 fetch_coins,
525 chunk_size,
526 )
527 .await?;
528
529 if let Some(last_tx_id) = known_coins.last().map(|coin| coin.utxo_id.tx_id()) {
530 let tx_id = TxId::new((*last_tx_id).into());
531 self.provider()
532 .await_transaction_commit::<ScriptTransaction>(tx_id)
533 .await?;
534 }
535
536 Ok(known_coins)
537 }
538
539 async fn transfer_many(
540 &self,
541 asset_id: AssetId,
542 transfers: &[(Address, u64)],
543 utxo_manager: &mut dyn UtxoProvider,
544 builder_data: &BuilderData,
545 fetch_coins: bool,
546 chunk_size: Option<usize>,
547 ) -> anyhow::Result<Vec<FuelTxCoin>> {
548 let chain_id = builder_data.consensus_parameters.chain_id();
549 match chunk_size {
550 None => {
551 let tx = self
552 .build_transfer(
553 asset_id,
554 transfers,
555 utxo_manager,
556 builder_data,
557 fetch_coins,
558 )
559 .await?;
560 let result = self.send_transaction(chain_id, &tx).await?;
561 Ok(result.known_coins)
562 }
563 Some(chunk_size) => {
564 let mut known_coins = vec![];
565 for chunk in transfers.chunks(chunk_size) {
566 let tx = self
567 .build_transfer(
568 asset_id,
569 chunk,
570 utxo_manager,
571 builder_data,
572 fetch_coins,
573 )
574 .await?;
575 let result = self.send_transaction(chain_id, &tx).await?;
576
577 known_coins.extend(result.known_coins);
578 utxo_manager.load_from_coins_vec(result.dynamic_coins);
579 }
580
581 Ok(known_coins)
582 }
583 }
584 }
585
586 #[tracing::instrument(skip(self, tx), fields(tx_id))]
587 async fn await_send_result(
588 &self,
589 tx_id: &TxId,
590 tx: &Transaction,
591 ) -> anyhow::Result<SendResult> {
592 let fuel_client = self.provider().client();
593
594 let include_preconfirmation = true;
595 let result = fuel_client
596 .subscribe_transaction_status_opt(tx_id, Some(include_preconfirmation))
597 .await;
598 let mut stream = match result {
599 Ok(stream) => stream,
600 Err(err) => {
601 tracing::error!("Failed to subscribe to transaction status: {err:?}");
602 return Err(err.into());
603 }
604 };
605
606 let mut status;
607 let mut preconf_rx_time = None;
608 loop {
609 let now = Instant::now();
610 status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
611 "Failed to get transaction status from stream"
612 ))?;
613
614 match status {
615 TransactionStatus::PreconfirmationSuccess { .. }
616 | TransactionStatus::PreconfirmationFailure { .. } => {
617 preconf_rx_time = Some(now.elapsed());
618 break;
619 }
620 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
621 break;
622 }
623 TransactionStatus::SqueezedOut { reason } => {
624 tracing::error!(%tx_id, "Transaction was squeezed out: {reason:?}");
625 continue;
626 }
627 _ => continue,
628 }
629 }
630
631 let mut known_coins = vec![];
632 for (i, output) in tx.outputs().iter().enumerate() {
633 let utxo_id = UtxoId::new(*tx_id, i as u16);
634 if let Output::Coin {
635 amount,
636 to,
637 asset_id,
638 } = *output
639 {
640 let coin = FuelTxCoin {
641 amount,
642 asset_id,
643 utxo_id,
644 owner: to,
645 };
646
647 known_coins.push(coin);
648 }
649 }
650
651 let mut dynamic_coins = vec![];
652 match &status {
653 TransactionStatus::PreconfirmationSuccess {
654 resolved_outputs, ..
655 }
656 | TransactionStatus::PreconfirmationFailure {
657 resolved_outputs, ..
658 } => {
659 let resolved_outputs = resolved_outputs.clone().unwrap_or_default();
660
661 for output in resolved_outputs {
662 let ResolvedOutput { utxo_id, output } = output;
663 match output {
664 Output::Change {
665 amount,
666 to,
667 asset_id,
668 } => {
669 let coin = FuelTxCoin {
670 amount,
671 asset_id,
672 utxo_id,
673 owner: to,
674 };
675
676 dynamic_coins.push(coin);
677 }
678 Output::Variable {
679 amount,
680 to,
681 asset_id,
682 } => {
683 let coin = FuelTxCoin {
684 amount,
685 asset_id,
686 utxo_id,
687 owner: to,
688 };
689
690 dynamic_coins.push(coin);
691 }
692 _ => {}
693 }
694 }
695 }
696 TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
697 let tx = fuel_client
698 .transaction(tx_id)
699 .await?
700 .ok_or(anyhow::anyhow!("Transaction not found"))?;
701
702 match tx.transaction {
703 TransactionType::Known(tx) => {
704 for (index, output) in tx.outputs().iter().enumerate() {
705 let utxo_id = UtxoId::new(*tx_id, index as u16);
706
707 match *output {
708 Output::Change {
709 amount,
710 to,
711 asset_id,
712 } => {
713 let coin = FuelTxCoin {
714 amount,
715 asset_id,
716 utxo_id,
717 owner: to,
718 };
719
720 dynamic_coins.push(coin);
721 }
722 Output::Variable {
723 amount,
724 to,
725 asset_id,
726 } => {
727 let coin = FuelTxCoin {
728 amount,
729 asset_id,
730 utxo_id,
731 owner: to,
732 };
733
734 dynamic_coins.push(coin);
735 }
736 _ => {}
737 }
738 }
739 }
740 TransactionType::Unknown => {}
741 }
742 }
743 _ => {
744 return Err(anyhow::anyhow!(
745 "Expected pre confirmation, but received: {status:?}"
746 ));
747 }
748 }
749
750 let result = SendResult {
751 tx_id: *tx_id,
752 tx_status: status.into(),
753 known_coins,
754 dynamic_coins,
755 preconf_rx_time,
756 };
757
758 Ok(result)
759 }
760}
761
/// Classifies errors by inspecting their rendered text.
pub(crate) trait ClientError {
    /// Returns `true` when the rendered text of `self` contains the phrase
    /// `Transaction id already exists` (case-sensitive).
    fn is_duplicate(&self) -> bool;
}

/// Every type that can be rendered with `ToString` can be classified.
impl<T: ToString> ClientError for T {
    fn is_duplicate(&self) -> bool {
        const DUPLICATE_MARKER: &str = "Transaction id already exists";

        let rendered = self.to_string();
        rendered.contains(DUPLICATE_MARKER)
    }
}
773}