fuel_core_client/
client.rs

1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4    client::{
5        schema::{
6            block::BlockByHeightArgs,
7            coins::{
8                ExcludeInput,
9                SpendQueryElementInput,
10            },
11            contract::ContractBalanceQueryArgs,
12            gas_price::EstimateGasPrice,
13            message::MessageStatusArgs,
14            relayed_tx::RelayedTransactionStatusArgs,
15            tx::{
16                DryRunArg,
17                TxWithEstimatedPredicatesArg,
18            },
19            Tai64Timestamp,
20            TransactionId,
21        },
22        types::{
23            asset::AssetDetail,
24            gas_price::LatestGasPrice,
25            message::MessageStatus,
26            primitives::{
27                Address,
28                AssetId,
29                BlockId,
30                ContractId,
31                UtxoId,
32            },
33            upgrades::StateTransitionBytecode,
34            RelayedTransactionStatus,
35        },
36    },
37    reqwest_ext::{
38        FuelGraphQlResponse,
39        FuelOperation,
40        ReqwestExt,
41    },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46    Engine as _,
47    BASE64_STANDARD,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52    Id,
53    MutationBuilder,
54    Operation,
55    QueryBuilder,
56};
57use fuel_core_types::{
58    blockchain::header::{
59        ConsensusParametersVersion,
60        StateTransitionBytecodeVersion,
61    },
62    fuel_asm::{
63        Instruction,
64        Word,
65    },
66    fuel_tx::{
67        BlobId,
68        Bytes32,
69        ConsensusParameters,
70        Receipt,
71        Transaction,
72        TxId,
73    },
74    fuel_types::{
75        self,
76        canonical::Serialize,
77        BlockHeight,
78        Nonce,
79    },
80    services::executor::{
81        StorageReadReplayEvent,
82        TransactionExecutionStatus,
83    },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87    Stream,
88    StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92    PageDirection,
93    PaginatedResult,
94    PaginationRequest,
95};
96use schema::{
97    assets::AssetInfoArg,
98    balance::BalanceArgs,
99    blob::BlobByIdArgs,
100    block::BlockByIdArgs,
101    coins::{
102        CoinByIdArgs,
103        CoinsConnectionArgs,
104    },
105    contract::{
106        ContractBalancesConnectionArgs,
107        ContractByIdArgs,
108    },
109    da_compressed::DaCompressedBlockByHeightArgs,
110    gas_price::BlockHorizonArgs,
111    storage_read_replay::{
112        StorageReadReplay,
113        StorageReadReplayArgs,
114    },
115    tx::{
116        AssembleTxArg,
117        TransactionsByOwnerConnectionArgs,
118        TxArg,
119        TxIdArgs,
120    },
121    Bytes,
122    ContinueTx,
123    ContinueTxArgs,
124    ConversionError,
125    HexString,
126    IdArg,
127    MemoryArgs,
128    RegisterArgs,
129    RunResult,
130    SetBreakpoint,
131    SetBreakpointArgs,
132    SetSingleStepping,
133    SetSingleSteppingArgs,
134    StartTx,
135    StartTxArgs,
136    U32,
137    U64,
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142    convert::TryInto,
143    io::{
144        self,
145        ErrorKind,
146    },
147    net,
148    str::{
149        self,
150        FromStr,
151    },
152    sync::{
153        Arc,
154        Mutex,
155    },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160    assemble_tx::{
161        AssembleTransactionResult,
162        RequiredBalance,
163    },
164    TransactionResponse,
165    TransactionStatus,
166};
167
168use self::schema::{
169    block::ProduceBlockArgs,
170    message::{
171        MessageProofArgs,
172        NonceArgs,
173    },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
/// Plain `u32` alias used for register identifiers.
// NOTE(review): presumably consumed by the register-related debugger queries
// (see the imported `RegisterArgs`); its use sites are outside this view — confirm.
type RegisterId = u32;
181
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the [`FuelClient`].
// `anyhow::Error` is wrapped inside a custom `Error` type
// so that we can add specific error variants in the future.
pub enum Error {
    /// Unknown or unexpected (by architecture) error.
    #[from]
    Other(anyhow::Error),
}
192
/// Consistency policy for the [`FuelClient`] that defines the strategy
/// for the required block height feature.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// Automatically fetch the current block height from each response and
    /// use it as an input to the next query to guarantee consistency
    /// of the results for the queries.
    Auto {
        /// The required block height for the queries.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The user manually sets the block height for all queries
    /// via [`FuelClient::with_required_fuel_block_height`].
    Manual {
        /// The required block height for the queries.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213    fn clone(&self) -> Self {
214        match self {
215            Self::Auto { height } => Self::Auto {
216                // We don't want to share the same mutex between the different
217                // instances of the `FuelClient`.
218                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219            },
220            Self::Manual { height } => Self::Manual { height: *height },
221        }
222    }
223}
224
/// Chain-state versions most recently reported in GraphQL response extensions.
///
/// Both slots start as `None` and are overwritten by
/// `FuelClient::update_chain_state_info` whenever a response carries the
/// corresponding extension value.
#[derive(Debug, Default)]
struct ChainStateInfo {
    // Last seen state transition function (bytecode) version, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    // Last seen consensus parameters version, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232    fn clone(&self) -> Self {
233        Self {
234            current_stf_version: Arc::new(Mutex::new(
235                self.current_stf_version.lock().ok().and_then(|v| *v),
236            )),
237            current_consensus_parameters_version: Arc::new(Mutex::new(
238                self.current_consensus_parameters_version
239                    .lock()
240                    .ok()
241                    .and_then(|v| *v),
242            )),
243        }
244    }
245}
246
/// GraphQL client for a fuel-core node.
///
/// Cloning goes through the manual `Clone` impls of [`ConsistencyPolicy`] and
/// `ChainStateInfo`, so a clone tracks its required height and chain-state
/// versions independently of the original.
#[derive(Debug, Clone)]
pub struct FuelClient {
    // HTTP client used to POST GraphQL operations.
    client: reqwest::Client,
    // Cookie jar installed as the cookie provider of `client`; its cookies are
    // also forwarded on subscription requests.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    // Endpoint URL; the `FromStr` impl sets its path to `/v1/graphql`.
    url: reqwest::Url,
    // Strategy used to pick the required block height sent with each operation.
    require_height: ConsistencyPolicy,
    // Latest versions reported by the node in response extensions.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258    type Err = anyhow::Error;
259
260    fn from_str(str: &str) -> Result<Self, Self::Err> {
261        let mut raw_url = str.to_string();
262        if !raw_url.starts_with("http") {
263            raw_url = format!("http://{raw_url}");
264        }
265
266        let mut url = reqwest::Url::parse(&raw_url)
267            .map_err(anyhow::Error::msg)
268            .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269        url.set_path("/v1/graphql");
270
271        #[cfg(feature = "subscriptions")]
272        {
273            let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274            let client = reqwest::Client::builder()
275                .cookie_provider(cookie.clone())
276                .build()
277                .map_err(anyhow::Error::msg)?;
278            Ok(Self {
279                client,
280                cookie,
281                url,
282                require_height: ConsistencyPolicy::Auto {
283                    height: Arc::new(Mutex::new(None)),
284                },
285                chain_state_info: Default::default(),
286            })
287        }
288
289        #[cfg(not(feature = "subscriptions"))]
290        {
291            let client = reqwest::Client::new();
292            Ok(Self {
293                client,
294                url,
295                require_height: ConsistencyPolicy::Auto {
296                    height: Arc::new(Mutex::new(None)),
297                },
298                chain_state_info: Default::default(),
299            })
300        }
301    }
302}
303
304impl<S> From<S> for FuelClient
305where
306    S: Into<net::SocketAddr>,
307{
308    fn from(socket: S) -> Self {
309        format!("http://{}", socket.into())
310            .as_str()
311            .parse()
312            .unwrap()
313    }
314}
315
/// Collapses a list of error messages into a single [`io::Error`].
///
/// The resulting message is `"Response errors"` followed by each input
/// message, every one prefixed with `"; "`. The error kind is always
/// [`io::ErrorKind::Other`].
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::new(io::ErrorKind::Other, message)
}
326
327impl FuelClient {
328    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329        Self::from_str(url.as_ref())
330    }
331
332    pub fn with_required_fuel_block_height(
333        &mut self,
334        new_height: Option<BlockHeight>,
335    ) -> &mut Self {
336        match &mut self.require_height {
337            ConsistencyPolicy::Auto { height } => {
338                *height.lock().expect("Mutex poisoned") = new_height;
339            }
340            ConsistencyPolicy::Manual { height } => {
341                *height = new_height;
342            }
343        }
344        self
345    }
346
347    pub fn use_manual_consistency_policy(
348        &mut self,
349        height: Option<BlockHeight>,
350    ) -> &mut Self {
351        self.require_height = ConsistencyPolicy::Manual { height };
352        self
353    }
354
355    pub fn required_block_height(&self) -> Option<BlockHeight> {
356        match &self.require_height {
357            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358            ConsistencyPolicy::Manual { height } => *height,
359        }
360    }
361
362    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363        if let Some(current_sft_version) = response
364            .extensions
365            .as_ref()
366            .and_then(|e| e.current_stf_version)
367        {
368            if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369                *c = Some(current_sft_version);
370            }
371        }
372
373        if let Some(current_consensus_parameters_version) = response
374            .extensions
375            .as_ref()
376            .and_then(|e| e.current_consensus_parameters_version)
377        {
378            if let Ok(mut c) = self
379                .chain_state_info
380                .current_consensus_parameters_version
381                .lock()
382            {
383                *c = Some(current_consensus_parameters_version);
384            }
385        }
386
387        let inner_required_height = match &self.require_height {
388            ConsistencyPolicy::Auto { height } => Some(height.clone()),
389            ConsistencyPolicy::Manual { .. } => None,
390        };
391
392        if let Some(inner_required_height) = inner_required_height {
393            if let Some(current_fuel_block_height) = response
394                .extensions
395                .as_ref()
396                .and_then(|e| e.current_fuel_block_height)
397            {
398                let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400                if current_fuel_block_height >= lock.unwrap_or_default() {
401                    *lock = Some(current_fuel_block_height);
402                }
403            }
404        }
405    }
406
407    /// Send the GraphQL query to the client.
408    pub async fn query<ResponseData, Vars>(
409        &self,
410        q: Operation<ResponseData, Vars>,
411    ) -> io::Result<ResponseData>
412    where
413        Vars: serde::Serialize,
414        ResponseData: serde::de::DeserializeOwned + 'static,
415    {
416        let required_fuel_block_height = self.required_block_height();
417        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418        let response = self
419            .client
420            .post(self.url.clone())
421            .run_fuel_graphql(fuel_operation)
422            .await
423            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425        self.decode_response(response)
426    }
427
428    fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429    where
430        R: serde::de::DeserializeOwned + 'static,
431    {
432        self.update_chain_state_info(&response);
433
434        if let Some(failed) = response
435            .extensions
436            .as_ref()
437            .and_then(|e| e.fuel_block_height_precondition_failed)
438        {
439            if failed {
440                return Err(io::Error::new(
441                    io::ErrorKind::Other,
442                    "The required block height was not met",
443                ));
444            }
445        }
446
447        let response = response.response;
448
449        match (response.data, response.errors) {
450            (Some(d), _) => Ok(d),
451            (_, Some(e)) => Err(from_strings_errors_to_std_error(
452                e.into_iter().map(|e| e.message).collect(),
453            )),
454            _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455        }
456    }
457
    /// Opens a server-sent-events subscription for the given streaming operation.
    ///
    /// The operation (together with the currently required block height) is
    /// serialized to JSON and POSTed to the `/v1/graphql-sub` path of the
    /// client URL. Each received event is parsed as a GraphQL response and
    /// passed through [`Self::decode_response`].
    ///
    /// The returned stream:
    /// - ends when the event source reports `Eof`;
    /// - skips SSE items that are not events;
    /// - drops an event whose raw payload equals the previously accepted one;
    /// - yields an `io::Error` item for transport, JSON and decode failures.
    ///
    /// # Errors
    ///
    /// Fails up front if the operation cannot be serialized, or if the
    /// event-source client cannot be created or configured with its headers.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions use a dedicated path on the same host as regular queries.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to start client {e:?}"),
                )
            })?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to add header to client {e:?}"),
                )
            })?;
        // Credentials embedded in the URL are sent as HTTP Basic auth, but only
        // when a password is present.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header to client {e:?}"),
                    )
                })?;
        }

        // Forward cookies held by the `reqwest` jar. Note that they are looked
        // up for `self.url`, not for the subscription `url` built above.
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Unable convert header value to string {e:?}"),
                )
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header from `reqwest` to client {e:?}"),
                    )
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the most recently accepted event, used for de-duplication.
        let mut last = None;

        let stream = es::Client::stream(&client)
            // `Eof` marks the end of the stream rather than an error to report.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and
                                        // returns the previous one for comparison.
                                        match last.replace(data) {
                                            // Remove duplicates
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(resp)),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::new(
                                        io::ErrorKind::Other,
                                        format!("Decode error: {e:?}"),
                                    ))),
                                }
                            }
                            Err(e) => Some(Err(io::Error::new(
                                io::ErrorKind::Other,
                                format!("Json error: {e:?}"),
                            ))),
                        }
                    }
                    // SSE items other than events carry no response; skip them.
                    Ok(_) => None,
                    Err(e) => Some(Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Graphql error: {e:?}"),
                    ))),
                };
                futures::future::ready(r)
            });

        Ok(stream)
    }
585
586    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
587        self.chain_state_info
588            .current_stf_version
589            .lock()
590            .ok()
591            .and_then(|value| *value)
592    }
593
594    pub fn latest_consensus_parameters_version(
595        &self,
596    ) -> Option<ConsensusParametersVersion> {
597        self.chain_state_info
598            .current_consensus_parameters_version
599            .lock()
600            .ok()
601            .and_then(|value| *value)
602    }
603
604    pub async fn health(&self) -> io::Result<bool> {
605        let query = schema::Health::build(());
606        self.query(query).await.map(|r| r.health)
607    }
608
609    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
610        let query = schema::node_info::QueryNodeInfo::build(());
611        self.query(query).await.map(|r| r.node_info.into())
612    }
613
614    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
615        let query = schema::gas_price::QueryLatestGasPrice::build(());
616        self.query(query).await.map(|r| r.latest_gas_price.into())
617    }
618
619    pub async fn estimate_gas_price(
620        &self,
621        block_horizon: u32,
622    ) -> io::Result<EstimateGasPrice> {
623        let args = BlockHorizonArgs {
624            block_horizon: Some(block_horizon.into()),
625        };
626        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
627        self.query(query).await.map(|r| r.estimate_gas_price)
628    }
629
630    #[cfg(feature = "std")]
631    pub async fn connected_peers_info(
632        &self,
633    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
634        let query = schema::node_info::QueryPeersInfo::build(());
635        self.query(query)
636            .await
637            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
638    }
639
640    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
641        let query = schema::chain::ChainQuery::build(());
642        self.query(query).await.and_then(|r| {
643            let result = r.chain.try_into()?;
644            Ok(result)
645        })
646    }
647
648    pub async fn consensus_parameters(
649        &self,
650        version: i32,
651    ) -> io::Result<Option<ConsensusParameters>> {
652        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
653        let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
654
655        let result = self
656            .query(query)
657            .await?
658            .consensus_parameters
659            .map(TryInto::try_into)
660            .transpose()?;
661
662        Ok(result)
663    }
664
665    pub async fn state_transition_byte_code_by_version(
666        &self,
667        version: i32,
668    ) -> io::Result<Option<StateTransitionBytecode>> {
669        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
670        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
671
672        let result = self
673            .query(query)
674            .await?
675            .state_transition_bytecode_by_version
676            .map(TryInto::try_into)
677            .transpose()?;
678
679        Ok(result)
680    }
681
682    pub async fn state_transition_byte_code_by_root(
683        &self,
684        root: Bytes32,
685    ) -> io::Result<Option<StateTransitionBytecode>> {
686        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
687            root: HexString(Bytes(root.to_vec())),
688        };
689        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
690
691        let result = self
692            .query(query)
693            .await?
694            .state_transition_bytecode_by_root
695            .map(TryInto::try_into)
696            .transpose()?;
697
698        Ok(result)
699    }
700
701    /// Default dry run, matching the exact configuration as the node
702    pub async fn dry_run(
703        &self,
704        txs: &[Transaction],
705    ) -> io::Result<Vec<TransactionExecutionStatus>> {
706        self.dry_run_opt(txs, None, None, None).await
707    }
708
709    /// Dry run with options to override the node behavior
710    pub async fn dry_run_opt(
711        &self,
712        txs: &[Transaction],
713        // Disable utxo input checks (exists, unspent, and valid signature)
714        utxo_validation: Option<bool>,
715        gas_price: Option<u64>,
716        at_height: Option<BlockHeight>,
717    ) -> io::Result<Vec<TransactionExecutionStatus>> {
718        let txs = txs
719            .iter()
720            .map(|tx| HexString(Bytes(tx.to_bytes())))
721            .collect::<Vec<HexString>>();
722        let query: Operation<schema::tx::DryRun, DryRunArg> =
723            schema::tx::DryRun::build(DryRunArg {
724                txs,
725                utxo_validation,
726                gas_price: gas_price.map(|gp| gp.into()),
727                block_height: at_height.map(|bh| bh.into()),
728            });
729        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
730        tx_statuses
731            .into_iter()
732            .map(|tx_status| tx_status.try_into().map_err(Into::into))
733            .collect()
734    }
735
736    /// Get storage read replay for a block
737    pub async fn storage_read_replay(
738        &self,
739        height: &BlockHeight,
740    ) -> io::Result<Vec<StorageReadReplayEvent>> {
741        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
742            StorageReadReplay::build(StorageReadReplayArgs {
743                height: (*height).into(),
744            });
745        Ok(self
746            .query(query)
747            .await
748            .map(|r| r.storage_read_replay)?
749            .into_iter()
750            .map(Into::into)
751            .collect())
752    }
753
754    /// Assembles the transaction based on the provided requirements.
755    /// The return transaction contains:
756    /// - Input coins to cover `required_balances`
757    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
758    /// - `Change` or `Destroy` outputs for all assets from the inputs
759    /// - `Variable` outputs in the case they are required during the execution
760    /// - `Contract` inputs and outputs in the case they are required during the execution
761    /// - Reserved witness slots for signed coins filled with `64` zeroes
762    /// - Set script gas limit(unless `script` is empty)
763    /// - Estimated predicates, if `estimate_predicates == true`
764    ///
765    /// Returns an error if:
766    /// - The number of required balances exceeds the maximum number of inputs allowed.
767    /// - The fee address index is out of bounds.
768    /// - The same asset has multiple change policies(either the receiver of
769    ///     the change is different, or one of the policies states about the destruction
770    ///     of the token while the other does not). The `Change` output from the transaction
771    ///     also count as a `ChangePolicy`.
772    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
773    /// - Required assets have multiple entries.
774    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
775    /// - If a constructed transaction breaks the rules defined by consensus parameters.
776    #[allow(clippy::too_many_arguments)]
777    pub async fn assemble_tx(
778        &self,
779        tx: &Transaction,
780        block_horizon: u32,
781        required_balances: Vec<RequiredBalance>,
782        fee_address_index: u16,
783        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
784        estimate_predicates: bool,
785        reserve_gas: Option<u64>,
786    ) -> io::Result<AssembleTransactionResult> {
787        let tx = HexString(Bytes(tx.to_bytes()));
788        let block_horizon = block_horizon.into();
789
790        let required_balances: Vec<_> = required_balances
791            .into_iter()
792            .map(schema::tx::RequiredBalance::try_from)
793            .collect::<Result<Vec<_>, _>>()?;
794
795        let fee_address_index = fee_address_index.into();
796
797        let exclude_input = exclude.map(Into::into);
798
799        let reserve_gas = reserve_gas.map(U64::from);
800
801        let query_arg = AssembleTxArg {
802            tx,
803            block_horizon,
804            required_balances,
805            fee_address_index,
806            exclude_input,
807            estimate_predicates,
808            reserve_gas,
809        };
810
811        let query = schema::tx::AssembleTx::build(query_arg);
812        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
813        Ok(assemble_tx_result.try_into()?)
814    }
815
816    /// Estimate predicates for the transaction
817    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
818        let serialized_tx = tx.to_bytes();
819        let query = schema::tx::EstimatePredicates::build(TxArg {
820            tx: HexString(Bytes(serialized_tx)),
821        });
822        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
823        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
824        *tx = tx_with_predicate;
825        Ok(())
826    }
827
828    pub async fn submit(
829        &self,
830        tx: &Transaction,
831    ) -> io::Result<types::primitives::TransactionId> {
832        self.submit_opt(tx, None).await
833    }
834
835    pub async fn submit_opt(
836        &self,
837        tx: &Transaction,
838        estimate_predicates: Option<bool>,
839    ) -> io::Result<types::primitives::TransactionId> {
840        let tx = tx.clone().to_bytes();
841        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
842            tx: HexString(Bytes(tx)),
843            estimate_predicates,
844        });
845
846        let id = self.query(query).await.map(|r| r.submit)?.id.into();
847        Ok(id)
848    }
849
850    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
851    #[cfg(feature = "subscriptions")]
852    pub async fn submit_and_await_commit(
853        &self,
854        tx: &Transaction,
855    ) -> io::Result<TransactionStatus> {
856        self.submit_and_await_commit_opt(tx, None).await
857    }
858
859    /// Submit the transaction and wait for it either to be included in
860    /// a block or removed from `TxPool`.
861    ///
862    /// If `estimate_predicates` is set, the predicates will be estimated before
863    /// the transaction is inserted into transaction pool.
864    ///
865    /// This will wait forever if needed, so consider wrapping this call
866    /// with a `tokio::time::timeout`.
867    #[cfg(feature = "subscriptions")]
868    pub async fn submit_and_await_commit_opt(
869        &self,
870        tx: &Transaction,
871        estimate_predicates: Option<bool>,
872    ) -> io::Result<TransactionStatus> {
873        use cynic::SubscriptionBuilder;
874        let tx = tx.clone().to_bytes();
875        let s =
876            schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
877                tx: HexString(Bytes(tx)),
878                estimate_predicates,
879            });
880
881        let mut stream = self.subscribe(s).await?.map(
882            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
883                let status: TransactionStatus = r?.submit_and_await.try_into()?;
884                Result::<_, io::Error>::Ok(status)
885            },
886        );
887
888        let status = stream.next().await.ok_or(io::Error::new(
889            io::ErrorKind::Other,
890            "Failed to get status from the submission",
891        ))??;
892
893        Ok(status)
894    }
895
896    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
897    #[cfg(feature = "subscriptions")]
898    pub async fn submit_and_await_commit_with_tx(
899        &self,
900        tx: &Transaction,
901    ) -> io::Result<StatusWithTransaction> {
902        self.submit_and_await_commit_with_tx_opt(tx, None).await
903    }
904
905    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
906    #[cfg(feature = "subscriptions")]
907    pub async fn submit_and_await_commit_with_tx_opt(
908        &self,
909        tx: &Transaction,
910        estimate_predicates: Option<bool>,
911    ) -> io::Result<StatusWithTransaction> {
912        use cynic::SubscriptionBuilder;
913        let tx = tx.clone().to_bytes();
914        let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
915            TxWithEstimatedPredicatesArg {
916                tx: HexString(Bytes(tx)),
917                estimate_predicates,
918            },
919        );
920
921        let mut stream = self.subscribe(s).await?.map(
922            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
923                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
924                Result::<_, io::Error>::Ok(status)
925            },
926        );
927
928        let status = stream.next().await.ok_or(io::Error::new(
929            io::ErrorKind::Other,
930            "Failed to get status from the submission",
931        ))??;
932
933        Ok(status)
934    }
935
936    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
937    #[cfg(feature = "subscriptions")]
938    pub async fn submit_and_await_status<'a>(
939        &'a self,
940        tx: &'a Transaction,
941    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
942        self.submit_and_await_status_opt(tx, None).await
943    }
944
945    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
946    #[cfg(feature = "subscriptions")]
947    pub async fn submit_and_await_status_opt<'a>(
948        &'a self,
949        tx: &'a Transaction,
950        estimate_predicates: Option<bool>,
951    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
952        use cynic::SubscriptionBuilder;
953        let tx = tx.clone().to_bytes();
954        let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
955            TxWithEstimatedPredicatesArg {
956                tx: HexString(Bytes(tx)),
957                estimate_predicates,
958            },
959        );
960
961        let stream = self.subscribe(s).await?.map(
962            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
963                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
964                Result::<_, io::Error>::Ok(status)
965            },
966        );
967
968        Ok(stream)
969    }
970
971    /// Requests all storage slots for the `contract_id`.
972    #[cfg(feature = "subscriptions")]
973    pub async fn contract_storage_slots<'a>(
974        &'a self,
975        contract_id: &'a ContractId,
976    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + 'a> {
977        use cynic::SubscriptionBuilder;
978        use schema::storage::ContractStorageSlotsArgs;
979        let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
980            contract_id: (*contract_id).into(),
981        });
982
983        let stream = self.subscribe(s).await?.map(
984            |result: io::Result<schema::storage::ContractStorageSlots>| {
985                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
986                Result::<_, io::Error>::Ok(result)
987            },
988        );
989
990        Ok(stream)
991    }
992
993    /// Requests all storage balances for the `contract_id`.
994    #[cfg(feature = "subscriptions")]
995    pub async fn contract_storage_balances<'a>(
996        &'a self,
997        contract_id: &'a ContractId,
998    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + 'a>
999    {
1000        use cynic::SubscriptionBuilder;
1001        use schema::{
1002            contract::ContractBalance,
1003            storage::ContractStorageBalancesArgs,
1004        };
1005        let s = schema::storage::ContractStorageBalances::build(
1006            ContractStorageBalancesArgs {
1007                contract_id: (*contract_id).into(),
1008            },
1009        );
1010
1011        let stream = self.subscribe(s).await?.map(
1012            |result: io::Result<schema::storage::ContractStorageBalances>| {
1013                let result: ContractBalance = result?.contract_storage_balances;
1014                Result::<_, io::Error>::Ok(result)
1015            },
1016        );
1017
1018        Ok(stream)
1019    }
1020
1021    pub async fn contract_slots_values(
1022        &self,
1023        contract_id: &ContractId,
1024        block_height: Option<BlockHeight>,
1025        requested_storage_slots: Vec<Bytes32>,
1026    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1027        let query = schema::storage::ContractSlotValues::build(
1028            schema::storage::ContractSlotValuesArgs {
1029                contract_id: (*contract_id).into(),
1030                block_height: block_height.map(|b| (*b).into()),
1031                storage_slots: requested_storage_slots
1032                    .into_iter()
1033                    .map(Into::into)
1034                    .collect(),
1035            },
1036        );
1037
1038        self.query(query)
1039            .await
1040            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1041    }
1042
1043    pub async fn contract_balance_values(
1044        &self,
1045        contract_id: &ContractId,
1046        block_height: Option<BlockHeight>,
1047        requested_storage_slots: Vec<AssetId>,
1048    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1049        let query = schema::storage::ContractBalanceValues::build(
1050            schema::storage::ContractBalanceValuesArgs {
1051                contract_id: (*contract_id).into(),
1052                block_height: block_height.map(|b| (*b).into()),
1053                assets: requested_storage_slots
1054                    .into_iter()
1055                    .map(Into::into)
1056                    .collect(),
1057            },
1058        );
1059
1060        self.query(query).await.map(|r| {
1061            r.contract_balance_values
1062                .into_iter()
1063                .map(Into::into)
1064                .collect()
1065        })
1066    }
1067
1068    pub async fn start_session(&self) -> io::Result<String> {
1069        let query = schema::StartSession::build(());
1070
1071        self.query(query)
1072            .await
1073            .map(|r| r.start_session.into_inner())
1074    }
1075
1076    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1077        let query = schema::EndSession::build(IdArg { id: id.into() });
1078
1079        self.query(query).await.map(|r| r.end_session)
1080    }
1081
1082    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1083        let query = schema::Reset::build(IdArg { id: id.into() });
1084
1085        self.query(query).await.map(|r| r.reset)
1086    }
1087
1088    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1089        let op = serde_json::to_string(op)?;
1090        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1091
1092        self.query(query).await.map(|r| r.execute)
1093    }
1094
1095    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1096        let query = schema::Register::build(RegisterArgs {
1097            id: id.into(),
1098            register: register.into(),
1099        });
1100
1101        Ok(self.query(query).await?.register.0 as Word)
1102    }
1103
1104    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1105        let query = schema::Memory::build(MemoryArgs {
1106            id: id.into(),
1107            start: start.into(),
1108            size: size.into(),
1109        });
1110
1111        let memory = self.query(query).await?.memory;
1112
1113        Ok(serde_json::from_str(memory.as_str())?)
1114    }
1115
1116    pub async fn set_breakpoint(
1117        &self,
1118        session_id: &str,
1119        contract: fuel_types::ContractId,
1120        pc: u64,
1121    ) -> io::Result<()> {
1122        let operation = SetBreakpoint::build(SetBreakpointArgs {
1123            id: Id::new(session_id),
1124            bp: schema::Breakpoint {
1125                contract: contract.into(),
1126                pc: U64(pc),
1127            },
1128        });
1129
1130        let response = self.query(operation).await?;
1131        assert!(
1132            response.set_breakpoint,
1133            "Setting breakpoint returned invalid reply"
1134        );
1135        Ok(())
1136    }
1137
1138    pub async fn set_single_stepping(
1139        &self,
1140        session_id: &str,
1141        enable: bool,
1142    ) -> io::Result<()> {
1143        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1144            id: Id::new(session_id),
1145            enable,
1146        });
1147        self.query(operation).await?;
1148        Ok(())
1149    }
1150
1151    pub async fn start_tx(
1152        &self,
1153        session_id: &str,
1154        tx: &Transaction,
1155    ) -> io::Result<RunResult> {
1156        let operation = StartTx::build(StartTxArgs {
1157            id: Id::new(session_id),
1158            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1159        });
1160        let response = self.query(operation).await?.start_tx;
1161        Ok(response)
1162    }
1163
1164    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1165        let operation = ContinueTx::build(ContinueTxArgs {
1166            id: Id::new(session_id),
1167        });
1168        let response = self.query(operation).await?.continue_tx;
1169        Ok(response)
1170    }
1171
1172    pub async fn transaction(
1173        &self,
1174        id: &TxId,
1175    ) -> io::Result<Option<TransactionResponse>> {
1176        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1177
1178        let transaction = self.query(query).await?.transaction;
1179
1180        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1181    }
1182
1183    /// Get the status of a transaction
1184    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1185        let query =
1186            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1187
1188        let status = self.query(query).await?.transaction.ok_or_else(|| {
1189            io::Error::new(
1190                ErrorKind::NotFound,
1191                format!("status not found for transaction {id} "),
1192            )
1193        })?;
1194
1195        let status = status
1196            .status
1197            .ok_or_else(|| {
1198                io::Error::new(
1199                    ErrorKind::NotFound,
1200                    format!("status not found for transaction {id}"),
1201                )
1202            })?
1203            .try_into()?;
1204        Ok(status)
1205    }
1206
1207    #[tracing::instrument(skip(self), level = "debug")]
1208    #[cfg(feature = "subscriptions")]
1209    /// Subscribe to the status of a transaction
1210    pub async fn subscribe_transaction_status<'a>(
1211        &'a self,
1212        id: &'a TxId,
1213    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
1214        use cynic::SubscriptionBuilder;
1215        let tx_id: TransactionId = (*id).into();
1216        let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: tx_id });
1217
1218        tracing::debug!("subscribing");
1219        let stream = self.subscribe(s).await?.map(|tx| {
1220            tracing::debug!("received {tx:?}");
1221            let tx = tx?;
1222            let status = tx.status_change.try_into()?;
1223            Ok(status)
1224        });
1225
1226        Ok(stream)
1227    }
1228
1229    #[cfg(feature = "subscriptions")]
1230    /// Awaits for the transaction to be committed into a block
1231    ///
1232    /// This will wait forever if needed, so consider wrapping this call
1233    /// with a `tokio::time::timeout`.
1234    pub async fn await_transaction_commit(
1235        &self,
1236        id: &TxId,
1237    ) -> io::Result<TransactionStatus> {
1238        // skip until we've reached a final status and then stop consuming the stream
1239        // to avoid an EOF which the eventsource client considers as an error.
1240        let status_result = self
1241            .subscribe_transaction_status(id)
1242            .await?
1243            .skip_while(|status| {
1244                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1245            })
1246            .next()
1247            .await;
1248
1249        if let Some(Ok(status)) = status_result {
1250            Ok(status)
1251        } else {
1252            Err(io::Error::new(
1253                io::ErrorKind::Other,
1254                format!("Failed to get status for transaction {status_result:?}"),
1255            ))
1256        }
1257    }
1258
1259    /// returns a paginated set of transactions sorted by block height
1260    pub async fn transactions(
1261        &self,
1262        request: PaginationRequest<String>,
1263    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1264        let args = schema::ConnectionArgs::from(request);
1265        let query = schema::tx::TransactionsQuery::build(args);
1266        let transactions = self.query(query).await?.transactions.try_into()?;
1267        Ok(transactions)
1268    }
1269
1270    /// Returns a paginated set of transactions associated with a txo owner address.
1271    pub async fn transactions_by_owner(
1272        &self,
1273        owner: &Address,
1274        request: PaginationRequest<String>,
1275    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1276        let owner: schema::Address = (*owner).into();
1277        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1278        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1279
1280        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1281        Ok(transactions)
1282    }
1283
1284    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1285        let query =
1286            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1287
1288        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1289            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1290        })?;
1291
1292        let receipts = match tx.status {
1293            Some(status) => match status {
1294                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1295                    s.receipts
1296                        .into_iter()
1297                        .map(TryInto::<Receipt>::try_into)
1298                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1299                )
1300                .transpose()?,
1301                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1302                    s.receipts
1303                        .into_iter()
1304                        .map(TryInto::<Receipt>::try_into)
1305                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1306                )
1307                .transpose()?,
1308                _ => None,
1309            },
1310            _ => None,
1311        };
1312
1313        Ok(receipts)
1314    }
1315
1316    #[cfg(feature = "test-helpers")]
1317    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1318        let query = schema::tx::AllReceipts::build(());
1319        let receipts = self.query(query).await?.all_receipts;
1320
1321        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1322            .into_iter()
1323            .map(TryInto::<Receipt>::try_into)
1324            .collect();
1325
1326        Ok(vec?)
1327    }
1328
1329    pub async fn produce_blocks(
1330        &self,
1331        blocks_to_produce: u32,
1332        start_timestamp: Option<u64>,
1333    ) -> io::Result<BlockHeight> {
1334        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1335            blocks_to_produce: blocks_to_produce.into(),
1336            start_timestamp: start_timestamp
1337                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1338        });
1339
1340        let new_height = self.query(query).await?.produce_blocks;
1341
1342        Ok(new_height.into())
1343    }
1344
1345    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1346        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1347            id: Some((*id).into()),
1348        });
1349
1350        let block = self
1351            .query(query)
1352            .await?
1353            .block
1354            .map(TryInto::try_into)
1355            .transpose()?;
1356
1357        Ok(block)
1358    }
1359
1360    pub async fn block_by_height(
1361        &self,
1362        height: BlockHeight,
1363    ) -> io::Result<Option<types::Block>> {
1364        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1365            height: Some(U32(height.into())),
1366        });
1367
1368        let block = self
1369            .query(query)
1370            .await?
1371            .block
1372            .map(TryInto::try_into)
1373            .transpose()?;
1374
1375        Ok(block)
1376    }
1377
1378    pub async fn da_compressed_block(
1379        &self,
1380        height: BlockHeight,
1381    ) -> io::Result<Option<Vec<u8>>> {
1382        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1383            DaCompressedBlockByHeightArgs {
1384                height: U32(height.into()),
1385            },
1386        );
1387
1388        Ok(self
1389            .query(query)
1390            .await?
1391            .da_compressed_block
1392            .map(|b| b.bytes.into()))
1393    }
1394
1395    /// Retrieve a blob by its ID
1396    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1397        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1398        let blob = self.query(query).await?.blob.map(Into::into);
1399        Ok(blob)
1400    }
1401
1402    /// Check whether a blob with ID exists
1403    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1404        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1405        Ok(self.query(query).await?.blob.is_some())
1406    }
1407
1408    /// Retrieve multiple blocks
1409    pub async fn blocks(
1410        &self,
1411        request: PaginationRequest<String>,
1412    ) -> io::Result<PaginatedResult<types::Block, String>> {
1413        let args = schema::ConnectionArgs::from(request);
1414        let query = schema::block::BlocksQuery::build(args);
1415
1416        let blocks = self.query(query).await?.blocks.try_into()?;
1417
1418        Ok(blocks)
1419    }
1420
1421    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1422        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1423            utxo_id: (*id).into(),
1424        });
1425        let coin = self.query(query).await?.coin.map(Into::into);
1426        Ok(coin)
1427    }
1428
1429    /// Retrieve a page of coins by their owner
1430    pub async fn coins(
1431        &self,
1432        owner: &Address,
1433        asset_id: Option<&AssetId>,
1434        request: PaginationRequest<String>,
1435    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1436        let owner: schema::Address = (*owner).into();
1437        let asset_id: schema::AssetId = match asset_id {
1438            Some(asset_id) => (*asset_id).into(),
1439            None => schema::AssetId::default(),
1440        };
1441        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1442        let query = schema::coins::CoinsQuery::build(args);
1443
1444        let coins = self.query(query).await?.coins.into();
1445        Ok(coins)
1446    }
1447
1448    /// Retrieve coins to spend in a transaction
1449    pub async fn coins_to_spend(
1450        &self,
1451        owner: &Address,
1452        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1453        // (Utxos, Messages Nonce)
1454        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1455    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1456        let owner: schema::Address = (*owner).into();
1457        let spend_query: Vec<SpendQueryElementInput> = spend_query
1458            .iter()
1459            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1460                Ok(SpendQueryElementInput {
1461                    asset_id: (*asset_id).into(),
1462                    amount: (*amount).into(),
1463                    max: (*max).map(|max| max.into()),
1464                })
1465            })
1466            .try_collect()?;
1467        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1468        let args =
1469            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1470        let query = schema::coins::CoinsToSpendQuery::build(args);
1471
1472        let coins_per_asset = self
1473            .query(query)
1474            .await?
1475            .coins_to_spend
1476            .into_iter()
1477            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1478            .collect::<Vec<_>>();
1479        Ok(coins_per_asset)
1480    }
1481
1482    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1483        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1484            id: (*id).into(),
1485        });
1486        let contract = self.query(query).await?.contract.map(Into::into);
1487        Ok(contract)
1488    }
1489
1490    pub async fn contract_balance(
1491        &self,
1492        id: &ContractId,
1493        asset: Option<&AssetId>,
1494    ) -> io::Result<u64> {
1495        let asset_id: schema::AssetId = match asset {
1496            Some(asset) => (*asset).into(),
1497            None => schema::AssetId::default(),
1498        };
1499
1500        let query =
1501            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1502                id: (*id).into(),
1503                asset: asset_id,
1504            });
1505
1506        let balance: types::ContractBalance =
1507            self.query(query).await?.contract_balance.into();
1508        Ok(balance.amount)
1509    }
1510
1511    pub async fn balance(
1512        &self,
1513        owner: &Address,
1514        asset_id: Option<&AssetId>,
1515    ) -> io::Result<u64> {
1516        let owner: schema::Address = (*owner).into();
1517        let asset_id: schema::AssetId = match asset_id {
1518            Some(asset_id) => (*asset_id).into(),
1519            None => schema::AssetId::default(),
1520        };
1521        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1522        let balance: types::Balance = self.query(query).await?.balance.into();
1523        Ok(balance.amount.try_into().unwrap_or(u64::MAX))
1524    }
1525
1526    // Retrieve a page of balances by their owner
1527    pub async fn balances(
1528        &self,
1529        owner: &Address,
1530        request: PaginationRequest<String>,
1531    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1532        let owner: schema::Address = (*owner).into();
1533        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1534        let query = schema::balance::BalancesQuery::build(args);
1535
1536        let balances = self.query(query).await?.balances.into();
1537        Ok(balances)
1538    }
1539
1540    pub async fn contract_balances(
1541        &self,
1542        contract: &ContractId,
1543        request: PaginationRequest<String>,
1544    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1545        let contract_id: schema::ContractId = (*contract).into();
1546        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1547        let query = schema::contract::ContractBalancesQuery::build(args);
1548
1549        let balances = self.query(query).await?.contract_balances.into();
1550
1551        Ok(balances)
1552    }
1553
1554    // Retrieve a message by its nonce
1555    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1556        let query = schema::message::MessageQuery::build(NonceArgs {
1557            nonce: (*nonce).into(),
1558        });
1559        let message = self.query(query).await?.message.map(Into::into);
1560        Ok(message)
1561    }
1562
1563    pub async fn messages(
1564        &self,
1565        owner: Option<&Address>,
1566        request: PaginationRequest<String>,
1567    ) -> io::Result<PaginatedResult<types::Message, String>> {
1568        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1569        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1570        let query = schema::message::OwnedMessageQuery::build(args);
1571
1572        let messages = self.query(query).await?.messages.into();
1573
1574        Ok(messages)
1575    }
1576
1577    pub async fn contract_info(
1578        &self,
1579        contract: &ContractId,
1580    ) -> io::Result<Option<types::Contract>> {
1581        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1582            id: (*contract).into(),
1583        });
1584        let contract_info = self.query(query).await?.contract.map(Into::into);
1585        Ok(contract_info)
1586    }
1587
1588    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1589        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1590            nonce: (*nonce).into(),
1591        });
1592        let status = self.query(query).await?.message_status.into();
1593
1594        Ok(status)
1595    }
1596
1597    /// Request a merkle proof of an output message.
1598    pub async fn message_proof(
1599        &self,
1600        transaction_id: &TxId,
1601        nonce: &Nonce,
1602        commit_block_id: Option<&BlockId>,
1603        commit_block_height: Option<BlockHeight>,
1604    ) -> io::Result<types::MessageProof> {
1605        let transaction_id: TransactionId = (*transaction_id).into();
1606        let nonce: schema::Nonce = (*nonce).into();
1607        let commit_block_id: Option<schema::BlockId> =
1608            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1609        let commit_block_height = commit_block_height.map(Into::into);
1610        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1611            transaction_id,
1612            nonce,
1613            commit_block_id,
1614            commit_block_height,
1615        });
1616        let proof = self.query(query).await?.message_proof.try_into()?;
1617        Ok(proof)
1618    }
1619
1620    pub async fn relayed_transaction_status(
1621        &self,
1622        id: &Bytes32,
1623    ) -> io::Result<Option<RelayedTransactionStatus>> {
1624        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1625            RelayedTransactionStatusArgs {
1626                id: id.to_owned().into(),
1627            },
1628        );
1629        let status = self
1630            .query(query)
1631            .await?
1632            .relayed_transaction_status
1633            .map(|status| status.try_into())
1634            .transpose()?;
1635        Ok(status)
1636    }
1637
1638    pub async fn asset_info(&self, asset_id: &AssetId) -> io::Result<AssetDetail> {
1639        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1640            id: (*asset_id).into(),
1641        });
1642        let asset_info = self.query(query).await?.asset_details.into();
1643        Ok(asset_info)
1644    }
1645}
1646
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Looks up a transaction by id and returns only the `transaction` part
    /// of the converted response.
    ///
    /// Returns `Ok(None)` when the node reports no transaction for `id`, and
    /// an error if the request or the response conversion fails.
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });

        match self.query(query).await?.transaction {
            None => Ok(None),
            Some(tx) => {
                let response: TransactionResponse = tx.try_into()?;
                Ok(Some(response.transaction))
            }
        }
    }
}