Skip to main content

o2_tools/
wallet_ext.rs

1use crate::utxo_manager::{
2    FuelTxCoin,
3    UtxoProvider,
4};
5use fuel_core_client::client::types::{
6    ResolvedOutput,
7    TransactionStatus,
8    TransactionType,
9};
10use fuel_core_types::{
11    blockchain::transaction::TransactionExt,
12    fuel_tx::{
13        Address,
14        AssetId,
15        ConsensusParameters,
16        Transaction,
17        TxId,
18        UniqueIdentifier,
19        UtxoId,
20        Witness,
21    },
22    fuel_types::ChainId,
23};
24use fuels::{
25    accounts::{
26        ViewOnlyAccount,
27        wallet::Unlocked,
28    },
29    prelude::{
30        BuildableTransaction,
31        ResourceFilter,
32        ScriptTransactionBuilder,
33        TransactionBuilder,
34        TxPolicies,
35        Wallet,
36    },
37    types::{
38        coin_type::CoinType,
39        input::Input,
40        output::Output,
41        transaction::ScriptTransaction,
42        tx_status::TxStatus,
43    },
44};
45use futures::{
46    StreamExt,
47    future::{
48        Either,
49        select,
50    },
51    pin_mut,
52};
53use std::{
54    future::Future,
55    ops::Mul,
56    time::{
57        Duration,
58        Instant,
59    },
60};
61
62pub const SIGNATURE_MARGIN: usize = 100;
63
64#[derive(Clone, Debug)]
65pub struct SendResult<T = TxStatus> {
66    pub tx_id: TxId,
67    pub tx_status: T,
68    pub known_coins: Vec<FuelTxCoin>,
69    pub dynamic_coins: Vec<FuelTxCoin>,
70    pub preconf_rx_time: Option<Duration>,
71}
72
73#[derive(Clone)]
74pub struct BuilderData {
75    pub consensus_parameters: ConsensusParameters,
76    pub gas_price: u64,
77}
78
79impl BuilderData {
80    pub fn max_fee(&self) -> u64 {
81        let max_gas_limit = self.consensus_parameters.tx_params().max_gas_per_tx();
82        // Get the max fee based on the current info for the chain
83        max_gas_limit
84            .mul(self.gas_price)
85            .div_ceil(self.consensus_parameters.fee_params().gas_price_factor())
86    }
87}
88
89pub trait WalletExt {
90    fn builder_data(&self) -> impl Future<Output = anyhow::Result<BuilderData>> + Send;
91
92    fn build_transfer(
93        &self,
94        asset_id: AssetId,
95        transfers: &[(Address, u64)],
96        utxo_manager: &mut dyn UtxoProvider,
97        builder_data: &BuilderData,
98        fetch_coins: bool,
99    ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
100
101    fn build_transaction(
102        &self,
103        inputs: Vec<Input>,
104        outputs: Vec<Output>,
105        witnesses: Vec<Witness>,
106        tx_policies: TxPolicies,
107    ) -> impl Future<Output = anyhow::Result<Transaction>> + Send;
108
109    fn send_transaction(
110        &self,
111        chain_id: ChainId,
112        tx: &Transaction,
113    ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
114
115    fn transfer_many(
116        &self,
117        asset_id: AssetId,
118        transfers: &[(Address, u64)],
119        utxo_manager: &mut dyn UtxoProvider,
120        builder_data: &BuilderData,
121        fetch_coins: bool,
122        chunk_size: Option<usize>,
123    ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
124
125    fn transfer_many_and_wait(
126        &self,
127        asset_id: AssetId,
128        transfers: &[(Address, u64)],
129        utxo_manager: &mut dyn UtxoProvider,
130        builder_data: &BuilderData,
131        fetch_coins: bool,
132        chunk_size: Option<usize>,
133    ) -> impl Future<Output = anyhow::Result<Vec<FuelTxCoin>>> + Send;
134
135    fn await_send_result(
136        &self,
137        tx_id: &TxId,
138        tx: &Transaction,
139    ) -> impl Future<Output = anyhow::Result<SendResult>> + Send;
140}
141
142impl<S> WalletExt for Wallet<Unlocked<S>>
143where
144    S: fuels::core::traits::Signer + Clone + Send + Sync + std::fmt::Debug + 'static,
145{
146    async fn builder_data(&self) -> anyhow::Result<BuilderData> {
147        let provider = self.provider();
148        let consensus_parameters = provider.consensus_parameters().await?;
149        let gas_price = provider.estimate_gas_price(10).await?;
150
151        let builder_data = BuilderData {
152            consensus_parameters,
153            gas_price: gas_price.gas_price,
154        };
155
156        Ok(builder_data)
157    }
158
159    async fn build_transaction(
160        &self,
161        inputs: Vec<Input>,
162        outputs: Vec<Output>,
163        witnesses: Vec<Witness>,
164        mut tx_policies: TxPolicies,
165    ) -> anyhow::Result<Transaction> {
166        if tx_policies.witness_limit().is_none() {
167            let witness_size = witnesses
168                .iter()
169                .map(|w| w.as_vec().len() as u64)
170                .sum::<u64>()
171                + SIGNATURE_MARGIN as u64;
172
173            tx_policies = tx_policies.with_witness_limit(witness_size);
174        }
175
176        let mut tx_builder = ScriptTransactionBuilder::prepare_transfer(
177            inputs,
178            outputs.clone(),
179            tx_policies,
180        );
181        *tx_builder.witnesses_mut() = witnesses;
182        tx_builder = tx_builder.enable_burn(true);
183        tx_builder.add_signer(self.signer().clone())?;
184
185        let tx = tx_builder.build(self.provider()).await?;
186        Ok(tx.into())
187    }
188
189    #[tracing::instrument(skip_all)]
190    async fn send_transaction(
191        &self,
192        chain_id: ChainId,
193        tx: &Transaction,
194    ) -> anyhow::Result<SendResult> {
195        let fuel_client = self.provider().client();
196        let tx_id = tx.id(&chain_id);
197
198        let result = async move {
199            let estimate_predicates = false;
200            let include_preconfirmation = true;
201
202            let send_future = fuel_client.submit_opt(tx, Some(estimate_predicates));
203
204            let stream_future = fuel_client
205                .subscribe_transaction_status_opt(&tx_id, Some(include_preconfirmation));
206
207            pin_mut!(send_future, stream_future);
208
209            // Just submitting transactions is faster than submit and subscribe.
210            // Subscription requires 3 round trips to the server under the hood
211            // to establish connection.
212            // So we subscribe separately to statuses, because we want network see transaction
213            // as soon as possible.
214            // Plus, if the caller doesn't wait for send result, they can get notification for
215            // events from API faster.
216            // TODO: Use indexer to subscribe for transaction status to reduce load on the node.
217            //  and remove additional latency here.
218            //  https://github.com/FuelLabs/fuel-o2/issues/1358
219            let result = select(send_future, stream_future).await;
220
221            let mut stream = match result {
222                Either::Left((send_result, stream_future)) => {
223                    send_result?;
224                    stream_future.await?
225                }
226                Either::Right((stream_result, send_future)) => {
227                    let stream = stream_result?;
228                    send_future.await?;
229                    stream
230                }
231            };
232
233            let mut status;
234            let now = Instant::now();
235
236            loop {
237                status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
238                    "Failed to get pre confirmation from the stream"
239                ))?;
240
241                if matches!(status, TransactionStatus::PreconfirmationSuccess { .. })
242                    || matches!(status, TransactionStatus::PreconfirmationFailure { .. })
243                    || matches!(status, TransactionStatus::Success { .. })
244                    || matches!(status, TransactionStatus::Failure { .. })
245                {
246                    break;
247                }
248
249                if let TransactionStatus::SqueezedOut { reason } = &status {
250                    return Err(anyhow::anyhow!(
251                        "Transaction was squeezed out: {reason:?}"
252                    ));
253                }
254            }
255            let preconf_rx_time = now.elapsed();
256
257            let resolved;
258            match &status {
259                TransactionStatus::PreconfirmationSuccess {
260                    resolved_outputs, ..
261                }
262                | TransactionStatus::PreconfirmationFailure {
263                    resolved_outputs, ..
264                } => {
265                    resolved =
266                        resolved_outputs.clone().expect("Expected resolved outputs");
267                }
268                TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
269                    let transaction = fuel_client
270                        .transaction(&tx_id)
271                        .await?
272                        .ok_or(anyhow::anyhow!("Transaction not found"))?;
273
274                    let TransactionType::Known(executed_tx) = transaction.transaction
275                    else {
276                        return Err(anyhow::anyhow!("Expected known transaction type"));
277                    };
278
279                    let resolved_outputs = executed_tx
280                        .outputs()
281                        .iter()
282                        .enumerate()
283                        .filter_map(|(index, output)| {
284                            if output.is_change()
285                                || output.is_variable() && output.amount() != Some(0)
286                            {
287                                Some(ResolvedOutput {
288                                    utxo_id: UtxoId::new(tx_id, index as u16),
289                                    output: *output,
290                                })
291                            } else {
292                                None
293                            }
294                        })
295                        .collect::<Vec<_>>();
296
297                    resolved = resolved_outputs;
298                }
299                _ => {
300                    return Err(anyhow::anyhow!(
301                        "Expected pre confirmation, but received: {status:?}"
302                    ));
303                }
304            }
305
306            let mut known_coins = vec![];
307            for (i, output) in tx.outputs().iter().enumerate() {
308                let utxo_id = UtxoId::new(tx_id, i as u16);
309                if let Output::Coin {
310                    amount,
311                    to,
312                    asset_id,
313                } = *output
314                {
315                    let coin = FuelTxCoin {
316                        amount,
317                        asset_id,
318                        utxo_id,
319                        owner: to,
320                    };
321
322                    known_coins.push(coin);
323                }
324            }
325
326            let mut dynamic_coins = vec![];
327            for output in resolved {
328                let ResolvedOutput { utxo_id, output } = output;
329                match output {
330                    Output::Change {
331                        amount,
332                        to,
333                        asset_id,
334                    } => {
335                        let coin = FuelTxCoin {
336                            amount,
337                            asset_id,
338                            utxo_id,
339                            owner: to,
340                        };
341
342                        dynamic_coins.push(coin);
343                    }
344                    Output::Variable {
345                        amount,
346                        to,
347                        asset_id,
348                    } => {
349                        let coin = FuelTxCoin {
350                            amount,
351                            asset_id,
352                            utxo_id,
353                            owner: to,
354                        };
355
356                        dynamic_coins.push(coin);
357                    }
358                    _ => {}
359                }
360            }
361
362            let result = SendResult {
363                tx_id,
364                tx_status: status.into(),
365                known_coins,
366                dynamic_coins,
367                preconf_rx_time: Some(preconf_rx_time),
368            };
369
370            Ok(result)
371        }
372        .await;
373
374        match result {
375            Ok(result) => Ok(result),
376            Err(err) => {
377                if err.is_duplicate() {
378                    let chain_id =
379                        self.provider().consensus_parameters().await?.chain_id();
380                    let tx_id = tx.id(&chain_id);
381                    tracing::info!(
382                        "Transaction {tx_id} already exists in the chain, \
383                        waiting for confirmation."
384                    );
385                    self.await_send_result(&tx_id, tx).await
386                } else {
387                    tracing::error!(
388                        "The error is not duplicate, so returning it: {err:?}",
389                    );
390                    Err(err)
391                }
392            }
393        }
394    }
395
396    async fn build_transfer(
397        &self,
398        asset_id: AssetId,
399        transfers: &[(Address, u64)],
400        utxo_manager: &mut dyn UtxoProvider,
401        builder_data: &BuilderData,
402        fetch_coins: bool,
403    ) -> anyhow::Result<Transaction> {
404        // Get the max fee based on the current info for the chain
405        let max_fee = builder_data.max_fee();
406
407        let base_asset_id = *builder_data.consensus_parameters.base_asset_id();
408
409        let payer: Address = self.address();
410
411        let asset_total = transfers
412            .iter()
413            .map(|(_, amount)| u128::from(*amount))
414            .sum::<u128>();
415
416        let balance_of = utxo_manager.balance_of(payer, asset_id);
417        if fetch_coins && balance_of < asset_total {
418            let asset_coins = self
419                .provider()
420                .get_spendable_resources(ResourceFilter {
421                    from: self.address(),
422                    asset_id: Some(asset_id),
423                    amount: asset_total,
424                    excluded_utxos: vec![],
425                    excluded_message_nonces: vec![],
426                })
427                .await
428                .map_err(|e| {
429                    anyhow::anyhow!(
430                        "Failed to get spendable resources: \
431                        {e} for {asset_id:?} from {payer:?} with amount {asset_total}"
432                    )
433                })?
434                .into_iter()
435                .filter_map(|coin| match coin {
436                    CoinType::Coin(coin) => Some(coin.into()),
437                    _ => None,
438                });
439
440            utxo_manager.load_from_coins_vec(asset_coins.collect());
441        }
442
443        let fee_coins = if asset_id != base_asset_id {
444            utxo_manager.guaranteed_extract_coins(
445                payer,
446                base_asset_id,
447                max_fee as u128,
448            )?
449        } else {
450            vec![]
451        };
452
453        let mut total = transfers
454            .iter()
455            .map(|(_, amount)| u128::from(*amount))
456            .sum::<u128>();
457
458        if base_asset_id == asset_id {
459            total += max_fee as u128;
460        }
461
462        let asset_coins =
463            utxo_manager.guaranteed_extract_coins(payer, asset_id, total)?;
464
465        let mut output_coins = vec![];
466        for (recipient, amount) in transfers {
467            let output = Output::Coin {
468                to: *recipient,
469                amount: *amount,
470                asset_id,
471            };
472            output_coins.push(output);
473        }
474
475        output_coins.push(Output::Change {
476            to: payer,
477            amount: 0,
478            asset_id: base_asset_id,
479        });
480
481        if asset_id != base_asset_id {
482            output_coins.push(Output::Change {
483                to: payer,
484                amount: 0,
485                asset_id,
486            });
487        }
488
489        let mut input_coins = asset_coins;
490        input_coins.extend(fee_coins);
491
492        let inputs = input_coins
493            .into_iter()
494            .map(|coin| Input::resource_signed(CoinType::Coin(coin.into())))
495            .collect::<Vec<_>>();
496
497        let tx = self
498            .build_transaction(
499                inputs,
500                output_coins,
501                vec![],
502                TxPolicies::default().with_max_fee(max_fee),
503            )
504            .await?;
505
506        Ok(tx)
507    }
508
509    async fn transfer_many_and_wait(
510        &self,
511        asset_id: AssetId,
512        transfers: &[(Address, u64)],
513        utxo_manager: &mut dyn UtxoProvider,
514        builder_data: &BuilderData,
515        fetch_coins: bool,
516        chunk_size: Option<usize>,
517    ) -> anyhow::Result<Vec<FuelTxCoin>> {
518        let known_coins = self
519            .transfer_many(
520                asset_id,
521                transfers,
522                utxo_manager,
523                builder_data,
524                fetch_coins,
525                chunk_size,
526            )
527            .await?;
528
529        if let Some(last_tx_id) = known_coins.last().map(|coin| coin.utxo_id.tx_id()) {
530            let tx_id = TxId::new((*last_tx_id).into());
531            self.provider()
532                .await_transaction_commit::<ScriptTransaction>(tx_id)
533                .await?;
534        }
535
536        Ok(known_coins)
537    }
538
539    async fn transfer_many(
540        &self,
541        asset_id: AssetId,
542        transfers: &[(Address, u64)],
543        utxo_manager: &mut dyn UtxoProvider,
544        builder_data: &BuilderData,
545        fetch_coins: bool,
546        chunk_size: Option<usize>,
547    ) -> anyhow::Result<Vec<FuelTxCoin>> {
548        let chain_id = builder_data.consensus_parameters.chain_id();
549        match chunk_size {
550            None => {
551                let tx = self
552                    .build_transfer(
553                        asset_id,
554                        transfers,
555                        utxo_manager,
556                        builder_data,
557                        fetch_coins,
558                    )
559                    .await?;
560                let result = self.send_transaction(chain_id, &tx).await?;
561                Ok(result.known_coins)
562            }
563            Some(chunk_size) => {
564                let mut known_coins = vec![];
565                for chunk in transfers.chunks(chunk_size) {
566                    let tx = self
567                        .build_transfer(
568                            asset_id,
569                            chunk,
570                            utxo_manager,
571                            builder_data,
572                            fetch_coins,
573                        )
574                        .await?;
575                    let result = self.send_transaction(chain_id, &tx).await?;
576
577                    known_coins.extend(result.known_coins);
578                    utxo_manager.load_from_coins_vec(result.dynamic_coins);
579                }
580
581                Ok(known_coins)
582            }
583        }
584    }
585
586    #[tracing::instrument(skip(self, tx), fields(tx_id))]
587    async fn await_send_result(
588        &self,
589        tx_id: &TxId,
590        tx: &Transaction,
591    ) -> anyhow::Result<SendResult> {
592        let fuel_client = self.provider().client();
593
594        let include_preconfirmation = true;
595        let result = fuel_client
596            .subscribe_transaction_status_opt(tx_id, Some(include_preconfirmation))
597            .await;
598        let mut stream = match result {
599            Ok(stream) => stream,
600            Err(err) => {
601                tracing::error!("Failed to subscribe to transaction status: {err:?}");
602                return Err(err.into());
603            }
604        };
605
606        let mut status;
607        let mut preconf_rx_time = None;
608        loop {
609            let now = Instant::now();
610            status = stream.next().await.transpose()?.ok_or(anyhow::anyhow!(
611                "Failed to get transaction status from stream"
612            ))?;
613
614            match status {
615                TransactionStatus::PreconfirmationSuccess { .. }
616                | TransactionStatus::PreconfirmationFailure { .. } => {
617                    preconf_rx_time = Some(now.elapsed());
618                    break;
619                }
620                TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
621                    break;
622                }
623                TransactionStatus::SqueezedOut { reason } => {
624                    tracing::error!(%tx_id, "Transaction was squeezed out: {reason:?}");
625                    continue;
626                }
627                _ => continue,
628            }
629        }
630
631        let mut known_coins = vec![];
632        for (i, output) in tx.outputs().iter().enumerate() {
633            let utxo_id = UtxoId::new(*tx_id, i as u16);
634            if let Output::Coin {
635                amount,
636                to,
637                asset_id,
638            } = *output
639            {
640                let coin = FuelTxCoin {
641                    amount,
642                    asset_id,
643                    utxo_id,
644                    owner: to,
645                };
646
647                known_coins.push(coin);
648            }
649        }
650
651        let mut dynamic_coins = vec![];
652        match &status {
653            TransactionStatus::PreconfirmationSuccess {
654                resolved_outputs, ..
655            }
656            | TransactionStatus::PreconfirmationFailure {
657                resolved_outputs, ..
658            } => {
659                let resolved_outputs = resolved_outputs.clone().unwrap_or_default();
660
661                for output in resolved_outputs {
662                    let ResolvedOutput { utxo_id, output } = output;
663                    match output {
664                        Output::Change {
665                            amount,
666                            to,
667                            asset_id,
668                        } => {
669                            let coin = FuelTxCoin {
670                                amount,
671                                asset_id,
672                                utxo_id,
673                                owner: to,
674                            };
675
676                            dynamic_coins.push(coin);
677                        }
678                        Output::Variable {
679                            amount,
680                            to,
681                            asset_id,
682                        } => {
683                            let coin = FuelTxCoin {
684                                amount,
685                                asset_id,
686                                utxo_id,
687                                owner: to,
688                            };
689
690                            dynamic_coins.push(coin);
691                        }
692                        _ => {}
693                    }
694                }
695            }
696            TransactionStatus::Success { .. } | TransactionStatus::Failure { .. } => {
697                let tx = fuel_client
698                    .transaction(tx_id)
699                    .await?
700                    .ok_or(anyhow::anyhow!("Transaction not found"))?;
701
702                match tx.transaction {
703                    TransactionType::Known(tx) => {
704                        for (index, output) in tx.outputs().iter().enumerate() {
705                            let utxo_id = UtxoId::new(*tx_id, index as u16);
706
707                            match *output {
708                                Output::Change {
709                                    amount,
710                                    to,
711                                    asset_id,
712                                } => {
713                                    let coin = FuelTxCoin {
714                                        amount,
715                                        asset_id,
716                                        utxo_id,
717                                        owner: to,
718                                    };
719
720                                    dynamic_coins.push(coin);
721                                }
722                                Output::Variable {
723                                    amount,
724                                    to,
725                                    asset_id,
726                                } => {
727                                    let coin = FuelTxCoin {
728                                        amount,
729                                        asset_id,
730                                        utxo_id,
731                                        owner: to,
732                                    };
733
734                                    dynamic_coins.push(coin);
735                                }
736                                _ => {}
737                            }
738                        }
739                    }
740                    TransactionType::Unknown => {}
741                }
742            }
743            _ => {
744                return Err(anyhow::anyhow!(
745                    "Expected pre confirmation, but received: {status:?}"
746                ));
747            }
748        }
749
750        let result = SendResult {
751            tx_id: *tx_id,
752            tx_status: status.into(),
753            known_coins,
754            dynamic_coins,
755            preconf_rx_time,
756        };
757
758        Ok(result)
759    }
760}
761
762pub(crate) trait ClientError {
763    fn is_duplicate(&self) -> bool;
764}
765
766impl<T> ClientError for T
767where
768    T: ToString,
769{
770    fn is_duplicate(&self) -> bool {
771        self.to_string().contains("Transaction id already exists")
772    }
773}