Skip to main content

fuel_core_client/
client.rs

1use self::schema::{
2    block::ProduceBlockArgs,
3    message::{
4        MessageProofArgs,
5        NonceArgs,
6    },
7};
8#[cfg(feature = "subscriptions")]
9use crate::client::types::StatusWithTransaction;
10use crate::{
11    client::{
12        schema::{
13            Tai64Timestamp,
14            TransactionId,
15            block::BlockByHeightArgs,
16            coins::{
17                ExcludeInput,
18                SpendQueryElementInput,
19            },
20            contract::ContractBalanceQueryArgs,
21            gas_price::EstimateGasPrice,
22            message::MessageStatusArgs,
23            relayed_tx::RelayedTransactionStatusArgs,
24            tx::{
25                DryRunArg,
26                TxWithEstimatedPredicatesArg,
27            },
28        },
29        types::{
30            RelayedTransactionStatus,
31            asset::AssetDetail,
32            gas_price::LatestGasPrice,
33            message::MessageStatus,
34            primitives::{
35                Address,
36                AssetId,
37                BlockId,
38                ContractId,
39                UtxoId,
40            },
41            upgrades::StateTransitionBytecode,
42        },
43    },
44    reqwest_ext::FuelGraphQlResponse,
45    transport::FailoverTransport,
46};
47use anyhow::{
48    Context,
49    anyhow,
50};
51#[cfg(feature = "subscriptions")]
52use cynic::SubscriptionBuilder;
53use cynic::{
54    Id,
55    MutationBuilder,
56    Operation,
57    QueryBuilder,
58    QueryFragment,
59    QueryVariables,
60};
61use fuel_core_types::{
62    blockchain::header::{
63        ConsensusParametersVersion,
64        StateTransitionBytecodeVersion,
65    },
66    fuel_asm::{
67        Instruction,
68        Word,
69    },
70    fuel_tx::{
71        BlobId,
72        Bytes32,
73        ConsensusParameters,
74        Receipt,
75        Transaction,
76        TxId,
77    },
78    fuel_types::{
79        self,
80        BlockHeight,
81        Nonce,
82        canonical::Serialize,
83    },
84    services::executor::{
85        StorageReadReplayEvent,
86        TransactionExecutionStatus,
87    },
88};
89#[cfg(feature = "subscriptions")]
90use futures::{
91    Stream,
92    StreamExt,
93};
94use itertools::Itertools;
95use pagination::{
96    PageDirection,
97    PaginatedResult,
98    PaginationRequest,
99};
100use reqwest::Url;
101use schema::{
102    Bytes,
103    ContinueTx,
104    ContinueTxArgs,
105    ConversionError,
106    HexString,
107    IdArg,
108    MemoryArgs,
109    RegisterArgs,
110    RunResult,
111    SetBreakpoint,
112    SetBreakpointArgs,
113    SetSingleStepping,
114    SetSingleSteppingArgs,
115    StartTx,
116    StartTxArgs,
117    U32,
118    U64,
119    assets::AssetInfoArg,
120    balance::BalanceArgs,
121    blob::BlobByIdArgs,
122    block::BlockByIdArgs,
123    coins::{
124        CoinByIdArgs,
125        CoinsConnectionArgs,
126    },
127    contract::{
128        ContractBalancesConnectionArgs,
129        ContractByIdArgs,
130    },
131    da_compressed::DaCompressedBlockByHeightArgs,
132    gas_price::BlockHorizonArgs,
133    storage_read_replay::{
134        StorageReadReplay,
135        StorageReadReplayArgs,
136    },
137    tx::{
138        AssembleTxArg,
139        TransactionsByOwnerConnectionArgs,
140        TxArg,
141        TxIdArgs,
142    },
143};
144#[cfg(feature = "subscriptions")]
145use std::future;
146use std::{
147    convert::TryInto,
148    io::{
149        self,
150        ErrorKind,
151    },
152    net,
153    str::{
154        self,
155        FromStr,
156    },
157    sync::{
158        Arc,
159        Mutex,
160    },
161};
162use tai64::Tai64;
163use tracing as _;
164use types::{
165    TransactionResponse,
166    TransactionStatus,
167    assemble_tx::{
168        AssembleTransactionResult,
169        RequiredBalance,
170    },
171};
172
173#[cfg(feature = "subscriptions")]
174use std::pin::Pin;
175
176#[cfg(feature = "rpc")]
177mod rpc_deps {
178    pub use aws_config::{
179        BehaviorVersion,
180        default_provider::credentials::DefaultCredentialsChain,
181    };
182    pub use aws_sdk_s3::Client as AWSClient;
183    pub use flate2::read::GzDecoder;
184    pub use fuel_core_block_aggregator_api::{
185        blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf,
186        protobuf_types::{
187            Block as ProtoBlock,
188            BlockHeightRequest as ProtoBlockHeightRequest,
189            BlockRangeRequest as ProtoBlockRangeRequest,
190            BlockResponse,
191            NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest,
192            RemoteBlockResponse,
193            RemoteS3Bucket,
194            block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient,
195            block_response::Payload,
196            remote_block_response::Location,
197        },
198    };
199    pub use prost::Message;
200    pub use std::{
201        collections::HashMap,
202        io::Read,
203    };
204    pub use tokio::sync::RwLock;
205    pub use tonic::transport::Channel;
206}
207#[cfg(feature = "rpc")]
208use rpc_deps::*;
209
210pub mod pagination;
211pub mod schema;
212pub mod types;
213
214type RegisterId = u32;
215
216#[derive(Debug, derive_more::Display, derive_more::From)]
217#[non_exhaustive]
218/// Error occurring during interaction with the FuelClient
219// anyhow::Error is wrapped inside a custom Error type,
220// so that we can specific error variants in the future.
221pub enum Error {
222    /// Unknown or not expected(by architecture) error.
223    #[from]
224    Other(anyhow::Error),
225}
226
227/// Consistency policy for the [`FuelClient`] to define the strategy
228/// for the required height feature.
229#[derive(Debug)]
230pub enum ConsistencyPolicy {
231    /// Automatically fetch the next block height from the response and
232    /// use it as an input to the next query to guarantee consistency
233    /// of the results for the queries.
234    Auto {
235        /// The required block height for the queries.
236        height: Arc<Mutex<Option<BlockHeight>>>,
237    },
238    /// Use manually sets the block height for all queries
239    /// via the [`FuelClient::with_required_fuel_block_height`].
240    Manual {
241        /// The required block height for the queries.
242        height: Option<BlockHeight>,
243    },
244}
245
246impl Clone for ConsistencyPolicy {
247    fn clone(&self) -> Self {
248        match self {
249            Self::Auto { height } => Self::Auto {
250                // We don't want to share the same mutex between the different
251                // instances of the `FuelClient`.
252                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
253            },
254            Self::Manual { height } => Self::Manual { height: *height },
255        }
256    }
257}
258
259#[derive(Debug, Default)]
260struct ChainStateInfo {
261    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
262    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
263}
264
265impl Clone for ChainStateInfo {
266    fn clone(&self) -> Self {
267        Self {
268            current_stf_version: Arc::new(Mutex::new(
269                self.current_stf_version.lock().ok().and_then(|v| *v),
270            )),
271            current_consensus_parameters_version: Arc::new(Mutex::new(
272                self.current_consensus_parameters_version
273                    .lock()
274                    .ok()
275                    .and_then(|v| *v),
276            )),
277        }
278    }
279}
280
281#[derive(Debug, Clone)]
282pub struct FuelClient {
283    transport: FailoverTransport,
284    require_height: ConsistencyPolicy,
285    chain_state_info: ChainStateInfo,
286    #[cfg(feature = "rpc")]
287    rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
288    #[cfg(feature = "rpc")]
289    aws_client: AWSClientManager,
290}
291
292#[cfg(feature = "rpc")]
293#[derive(Debug, Clone)]
294pub struct AWSClientManager {
295    specific: Arc<RwLock<HashMap<Option<String>, AWSClient>>>,
296}
297
298#[cfg(feature = "rpc")]
299impl AWSClientManager {
300    pub fn new() -> Self {
301        Self {
302            specific: Arc::new(RwLock::new(HashMap::new())),
303        }
304    }
305
306    pub async fn get_client(&self, url: &Option<String>) -> Result<AWSClient, io::Error> {
307        if let Some(existing) = self.specific.read().await.get(url).cloned() {
308            return Ok(existing);
309        }
310        let client = FuelClient::new_aws_client(url).await;
311        let mut guard = self.specific.write().await;
312        let client = guard
313            .entry(url.clone())
314            .or_insert_with(|| client.clone())
315            .clone();
316        Ok(client)
317    }
318}
319
320#[cfg(feature = "rpc")]
321impl Default for AWSClientManager {
322    fn default() -> Self {
323        Self::new()
324    }
325}
326
327/// Normalizes a URL string by ensuring it has an http(s) scheme and the `/v1/graphql` path.
328fn normalize_url(url_str: &str) -> anyhow::Result<Url> {
329    let mut raw_url = url_str.to_string();
330    if !raw_url.starts_with("http") {
331        raw_url = format!("http://{raw_url}");
332    }
333
334    let mut url = reqwest::Url::parse(&raw_url)
335        .map_err(anyhow::Error::msg)
336        .with_context(|| format!("Invalid fuel-core URL: {url_str}"))?;
337    url.set_path("/v1/graphql");
338
339    Ok(url)
340}
341
342impl FromStr for FuelClient {
343    type Err = anyhow::Error;
344
345    fn from_str(str: &str) -> Result<Self, Self::Err> {
346        let url = normalize_url(str)?;
347
348        Ok(Self {
349            transport: FailoverTransport::new(vec![url])?,
350            require_height: ConsistencyPolicy::Auto {
351                height: Arc::new(Mutex::new(None)),
352            },
353            chain_state_info: Default::default(),
354            #[cfg(feature = "rpc")]
355            rpc_client: None,
356            #[cfg(feature = "rpc")]
357            aws_client: AWSClientManager::new(),
358        })
359    }
360}
361
362impl<S> From<S> for FuelClient
363where
364    S: Into<net::SocketAddr>,
365{
366    fn from(socket: S) -> Self {
367        format!("http://{}", socket.into())
368            .as_str()
369            .parse()
370            .unwrap()
371    }
372}
373
374pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
375    let e = errors
376        .into_iter()
377        .fold(String::from("Response errors"), |mut s, e| {
378            s.push_str("; ");
379            s.push_str(e.as_str());
380            s
381        });
382    io::Error::other(e)
383}
384
385impl FuelClient {
386    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
387        Self::from_str(url.as_ref())
388    }
389
390    #[cfg(feature = "rpc")]
391    pub async fn new_with_rpc<G: AsRef<str>, R: AsRef<str>>(
392        graph_ql_urls: impl Iterator<Item = G>,
393        rpc_url: R,
394    ) -> anyhow::Result<Self> {
395        let urls: Vec<_> = graph_ql_urls
396            .map(|str| normalize_url(str.as_ref()))
397            .try_collect()?;
398        let mut client = Self::with_urls(&urls)?;
399        let mut raw_rpc_url = <R as AsRef<str>>::as_ref(&rpc_url).to_string();
400        if !raw_rpc_url.starts_with("http") {
401            raw_rpc_url = format!("http://{raw_rpc_url}");
402        }
403        let rpc_client = ProtoBlockAggregatorClient::connect(raw_rpc_url).await?;
404        client.rpc_client = Some(rpc_client);
405        client.aws_client = AWSClientManager::new();
406        Ok(client)
407    }
408
409    pub fn with_urls(urls: &[impl AsRef<str>]) -> anyhow::Result<Self> {
410        if urls.is_empty() {
411            return Err(anyhow!("Failed to create FuelClient. No URL is provided."));
412        }
413        let urls = urls
414            .iter()
415            .map(|url| normalize_url(url.as_ref()))
416            .collect::<Result<Vec<_>, _>>()?;
417        Ok(Self {
418            transport: FailoverTransport::new(urls)?,
419            require_height: ConsistencyPolicy::Auto {
420                height: Arc::new(Mutex::new(None)),
421            },
422            chain_state_info: Default::default(),
423            #[cfg(feature = "rpc")]
424            rpc_client: None,
425            #[cfg(feature = "rpc")]
426            aws_client: AWSClientManager::new(),
427        })
428    }
429
430    pub fn get_default_url(&self) -> &Url {
431        self.transport.get_default_url()
432    }
433}
434
435impl FuelClient {
436    pub fn with_required_fuel_block_height(
437        &mut self,
438        new_height: Option<BlockHeight>,
439    ) -> &mut Self {
440        match &mut self.require_height {
441            ConsistencyPolicy::Auto { height } => {
442                *height.lock().expect("Mutex poisoned") = new_height;
443            }
444            ConsistencyPolicy::Manual { height } => {
445                *height = new_height;
446            }
447        }
448        self
449    }
450
451    pub fn use_manual_consistency_policy(
452        &mut self,
453        height: Option<BlockHeight>,
454    ) -> &mut Self {
455        self.require_height = ConsistencyPolicy::Manual { height };
456        self
457    }
458
459    pub fn decode_response<R, E>(
460        &self,
461        response: FuelGraphQlResponse<R, E>,
462    ) -> io::Result<R>
463    where
464        R: serde::de::DeserializeOwned + 'static,
465    {
466        if response
467            .extensions
468            .as_ref()
469            .and_then(|e| e.fuel_block_height_precondition_failed)
470            == Some(true)
471        {
472            return Err(io::Error::other("The required block height was not met"));
473        }
474
475        let response = response.response;
476
477        match (response.data, response.errors) {
478            (Some(d), _) => Ok(d),
479            (_, Some(e)) => Err(from_strings_errors_to_std_error(
480                e.into_iter().map(|e| e.message).collect(),
481            )),
482            _ => Err(io::Error::other("Invalid response")),
483        }
484    }
485
486    pub fn required_block_height(&self) -> Option<BlockHeight> {
487        match &self.require_height {
488            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
489            ConsistencyPolicy::Manual { height } => *height,
490        }
491    }
492
493    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
494        if let Some(current_sft_version) = response
495            .extensions
496            .as_ref()
497            .and_then(|e| e.current_stf_version)
498            && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
499        {
500            *c = Some(current_sft_version);
501        }
502
503        if let Some(current_consensus_parameters_version) = response
504            .extensions
505            .as_ref()
506            .and_then(|e| e.current_consensus_parameters_version)
507            && let Ok(mut c) = self
508                .chain_state_info
509                .current_consensus_parameters_version
510                .lock()
511        {
512            *c = Some(current_consensus_parameters_version);
513        }
514
515        let inner_required_height = match &self.require_height {
516            ConsistencyPolicy::Auto { height } => Some(height.clone()),
517            ConsistencyPolicy::Manual { .. } => None,
518        };
519
520        if let Some(inner_required_height) = inner_required_height
521            && let Some(current_fuel_block_height) = response
522                .extensions
523                .as_ref()
524                .and_then(|e| e.current_fuel_block_height)
525        {
526            let mut lock = inner_required_height.lock().expect("Mutex poisoned");
527
528            if current_fuel_block_height >= lock.unwrap_or_default() {
529                *lock = Some(current_fuel_block_height);
530            }
531        }
532    }
533
534    /// Send the GraphQL query to the client.
535    pub async fn query<ResponseData, Vars>(
536        &self,
537        q: Operation<ResponseData, Vars>,
538    ) -> io::Result<ResponseData>
539    where
540        Vars: serde::Serialize + Clone + QueryVariables + Send + 'static,
541        ResponseData: serde::de::DeserializeOwned + QueryFragment + Send + 'static,
542    {
543        let required_fuel_block_height = self.required_block_height();
544        let response = self.transport.query(q, required_fuel_block_height).await?;
545
546        self.update_chain_state_info(&response);
547        self.decode_response(response)
548    }
549
550    #[tracing::instrument(skip_all)]
551    #[cfg(feature = "subscriptions")]
552    async fn subscribe<ResponseData, Variables>(
553        &self,
554        variables: Variables,
555    ) -> io::Result<Pin<Box<impl futures::Stream<Item = io::Result<ResponseData>> + '_>>>
556    where
557        Variables: serde::Serialize + QueryVariables + Send + Clone + 'static,
558        ResponseData: serde::de::DeserializeOwned
559            + QueryFragment
560            + SubscriptionBuilder<Variables>
561            + 'static
562            + Send,
563    {
564        let stream = self
565            .transport
566            .subscribe(variables, self.required_block_height())
567            .await?;
568
569        let client = self; // capture immutably
570        Ok(Box::pin(stream.filter_map(move |result| {
571            async move {
572                match result {
573                    Ok(resp) => {
574                        client.update_chain_state_info(&resp);
575                        Some(client.decode_response(resp))
576                    }
577                    Err(e) => Some(Err(e)), // pass through untouched
578                }
579            }
580        })))
581    }
582
583    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
584        self.chain_state_info
585            .current_stf_version
586            .lock()
587            .ok()
588            .and_then(|value| *value)
589    }
590
591    pub fn latest_consensus_parameters_version(
592        &self,
593    ) -> Option<ConsensusParametersVersion> {
594        self.chain_state_info
595            .current_consensus_parameters_version
596            .lock()
597            .ok()
598            .and_then(|value| *value)
599    }
600
601    pub async fn health(&self) -> io::Result<bool> {
602        let query = schema::Health::build(());
603        self.query(query).await.map(|r| r.health)
604    }
605
606    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
607        let query = schema::node_info::QueryNodeInfo::build(());
608        self.query(query).await.map(|r| r.node_info.into())
609    }
610
611    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
612        let query = schema::gas_price::QueryLatestGasPrice::build(());
613        self.query(query).await.map(|r| r.latest_gas_price.into())
614    }
615
616    pub async fn estimate_gas_price(
617        &self,
618        block_horizon: u32,
619    ) -> io::Result<EstimateGasPrice> {
620        let args = BlockHorizonArgs {
621            block_horizon: Some(block_horizon.into()),
622        };
623        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
624        self.query(query).await.map(|r| r.estimate_gas_price)
625    }
626
627    #[cfg(feature = "std")]
628    pub async fn connected_peers_info(
629        &self,
630    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
631        let query = schema::node_info::QueryPeersInfo::build(());
632        self.query(query)
633            .await
634            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
635    }
636
637    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
638        let query = schema::chain::ChainQuery::build(());
639        self.query(query).await.and_then(|r| {
640            let result = r.chain.try_into()?;
641            Ok(result)
642        })
643    }
644
645    pub async fn consensus_parameters(
646        &self,
647        version: i32,
648    ) -> io::Result<Option<ConsensusParameters>> {
649        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
650        let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
651
652        let result = self
653            .query(query)
654            .await?
655            .consensus_parameters
656            .map(TryInto::try_into)
657            .transpose()?;
658
659        Ok(result)
660    }
661
662    pub async fn state_transition_byte_code_by_version(
663        &self,
664        version: i32,
665    ) -> io::Result<Option<StateTransitionBytecode>> {
666        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
667        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
668
669        let result = self
670            .query(query)
671            .await?
672            .state_transition_bytecode_by_version
673            .map(TryInto::try_into)
674            .transpose()?;
675
676        Ok(result)
677    }
678
679    pub async fn state_transition_byte_code_by_root(
680        &self,
681        root: Bytes32,
682    ) -> io::Result<Option<StateTransitionBytecode>> {
683        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
684            root: HexString(Bytes(root.to_vec())),
685        };
686        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
687
688        let result = self
689            .query(query)
690            .await?
691            .state_transition_bytecode_by_root
692            .map(TryInto::try_into)
693            .transpose()?;
694
695        Ok(result)
696    }
697
698    /// Default dry run, matching the exact configuration as the node
699    pub async fn dry_run(
700        &self,
701        txs: &[Transaction],
702    ) -> io::Result<Vec<TransactionExecutionStatus>> {
703        self.dry_run_opt(txs, None, None, None).await
704    }
705
706    /// Dry run with options to override the node behavior
707    pub async fn dry_run_opt(
708        &self,
709        txs: &[Transaction],
710        // Disable utxo input checks (exists, unspent, and valid signature)
711        utxo_validation: Option<bool>,
712        gas_price: Option<u64>,
713        at_height: Option<BlockHeight>,
714    ) -> io::Result<Vec<TransactionExecutionStatus>> {
715        let txs = txs
716            .iter()
717            .map(|tx| HexString(Bytes(tx.to_bytes())))
718            .collect::<Vec<HexString>>();
719        let query: Operation<schema::tx::DryRun, DryRunArg> =
720            schema::tx::DryRun::build(DryRunArg {
721                txs,
722                utxo_validation,
723                gas_price: gas_price.map(|gp| gp.into()),
724                block_height: at_height.map(|bh| bh.into()),
725            });
726        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
727        tx_statuses
728            .into_iter()
729            .map(|tx_status| tx_status.try_into().map_err(Into::into))
730            .collect()
731    }
732
733    /// Like `dry_run_opt`, but also returns the storage reads
734    pub async fn dry_run_opt_record_storage_reads(
735        &self,
736        txs: &[Transaction],
737        // Disable utxo input checks (exists, unspent, and valid signature)
738        utxo_validation: Option<bool>,
739        gas_price: Option<u64>,
740        at_height: Option<BlockHeight>,
741    ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
742        let txs = txs
743            .iter()
744            .map(|tx| HexString(Bytes(tx.to_bytes())))
745            .collect::<Vec<HexString>>();
746        let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
747            schema::tx::DryRunRecordStorageReads::build(DryRunArg {
748                txs,
749                utxo_validation,
750                gas_price: gas_price.map(|gp| gp.into()),
751                block_height: at_height.map(|bh| bh.into()),
752            });
753        let result = self
754            .query(query)
755            .await
756            .map(|r| r.dry_run_record_storage_reads)?;
757        let tx_statuses = result
758            .tx_statuses
759            .into_iter()
760            .map(|tx_status| tx_status.try_into().map_err(Into::into))
761            .collect::<io::Result<Vec<_>>>()?;
762        let storage_reads = result
763            .storage_reads
764            .into_iter()
765            .map(Into::into)
766            .collect::<Vec<_>>();
767        Ok((tx_statuses, storage_reads))
768    }
769
770    /// Get storage read replay for a block
771    pub async fn storage_read_replay(
772        &self,
773        height: &BlockHeight,
774    ) -> io::Result<Vec<StorageReadReplayEvent>> {
775        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
776            StorageReadReplay::build(StorageReadReplayArgs {
777                height: (*height).into(),
778            });
779        Ok(self
780            .query(query)
781            .await
782            .map(|r| r.storage_read_replay)?
783            .into_iter()
784            .map(Into::into)
785            .collect())
786    }
787
788    /// Assembles the transaction based on the provided requirements.
789    /// The return transaction contains:
790    /// - Input coins to cover `required_balances`
791    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
792    /// - `Change` or `Destroy` outputs for all assets from the inputs
793    /// - `Variable` outputs in the case they are required during the execution
794    /// - `Contract` inputs and outputs in the case they are required during the execution
795    /// - Reserved witness slots for signed coins filled with `64` zeroes
796    /// - Set script gas limit(unless `script` is empty)
797    /// - Estimated predicates, if `estimate_predicates == true`
798    ///
799    /// Returns an error if:
800    /// - The number of required balances exceeds the maximum number of inputs allowed.
801    /// - The fee address index is out of bounds.
802    /// - The same asset has multiple change policies(either the receiver of
803    ///   the change is different, or one of the policies states about the destruction
804    ///   of the token while the other does not). The `Change` output from the transaction
805    ///   also count as a `ChangePolicy`.
806    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
807    /// - Required assets have multiple entries.
808    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
809    /// - If a constructed transaction breaks the rules defined by consensus parameters.
810    #[allow(clippy::too_many_arguments)]
811    pub async fn assemble_tx(
812        &self,
813        tx: &Transaction,
814        block_horizon: u32,
815        required_balances: Vec<RequiredBalance>,
816        fee_address_index: u16,
817        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
818        estimate_predicates: bool,
819        reserve_gas: Option<u64>,
820    ) -> io::Result<AssembleTransactionResult> {
821        let tx = HexString(Bytes(tx.to_bytes()));
822        let block_horizon = block_horizon.into();
823
824        let required_balances: Vec<_> = required_balances
825            .into_iter()
826            .map(schema::tx::RequiredBalance::try_from)
827            .collect::<Result<Vec<_>, _>>()?;
828
829        let fee_address_index = fee_address_index.into();
830
831        let exclude_input = exclude.map(Into::into);
832
833        let reserve_gas = reserve_gas.map(U64::from);
834
835        let query_arg = AssembleTxArg {
836            tx,
837            block_horizon,
838            required_balances,
839            fee_address_index,
840            exclude_input,
841            estimate_predicates,
842            reserve_gas,
843        };
844
845        let query = schema::tx::AssembleTx::build(query_arg);
846        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
847        Ok(assemble_tx_result.try_into()?)
848    }
849
850    /// Estimate predicates for the transaction
851    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
852        let serialized_tx = tx.to_bytes();
853        let query = schema::tx::EstimatePredicates::build(TxArg {
854            tx: HexString(Bytes(serialized_tx)),
855        });
856        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
857        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
858        *tx = tx_with_predicate;
859        Ok(())
860    }
861
862    pub async fn submit(
863        &self,
864        tx: &Transaction,
865    ) -> io::Result<types::primitives::TransactionId> {
866        self.submit_opt(tx, None).await
867    }
868
869    pub async fn submit_opt(
870        &self,
871        tx: &Transaction,
872        estimate_predicates: Option<bool>,
873    ) -> io::Result<types::primitives::TransactionId> {
874        let tx = tx.clone().to_bytes();
875        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
876            tx: HexString(Bytes(tx)),
877            estimate_predicates,
878        });
879
880        let id = self.query(query).await.map(|r| r.submit)?.id.into();
881        Ok(id)
882    }
883
884    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
885    #[cfg(feature = "subscriptions")]
886    pub async fn submit_and_await_commit(
887        &self,
888        tx: &Transaction,
889    ) -> io::Result<TransactionStatus> {
890        self.submit_and_await_commit_opt(tx, None).await
891    }
892
893    /// Submit the transaction and wait for it either to be included in
894    /// a block or removed from `TxPool`.
895    ///
896    /// If `estimate_predicates` is set, the predicates will be estimated before
897    /// the transaction is inserted into transaction pool.
898    ///
899    /// This will wait forever if needed, so consider wrapping this call
900    /// with a `tokio::time::timeout`.
901    #[cfg(feature = "subscriptions")]
902    pub async fn submit_and_await_commit_opt(
903        &self,
904        tx: &Transaction,
905        estimate_predicates: Option<bool>,
906    ) -> io::Result<TransactionStatus> {
907        let tx = tx.clone().to_bytes();
908        let variables = TxWithEstimatedPredicatesArg {
909            tx: HexString(Bytes(tx)),
910            estimate_predicates,
911        };
912
913        let mut stream = self.subscribe(variables).await?.map(
914            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
915                let status: TransactionStatus = r?.submit_and_await.try_into()?;
916                Result::<_, io::Error>::Ok(status)
917            },
918        );
919
920        let status = stream.next().await.ok_or_else(|| {
921            io::Error::other("Failed to get status from the submission")
922        })??;
923
924        Ok(status)
925    }
926
927    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
928    #[cfg(feature = "subscriptions")]
929    pub async fn submit_and_await_commit_with_tx(
930        &self,
931        tx: &Transaction,
932    ) -> io::Result<StatusWithTransaction> {
933        self.submit_and_await_commit_with_tx_opt(tx, None).await
934    }
935
936    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
937    #[cfg(feature = "subscriptions")]
938    pub async fn submit_and_await_commit_with_tx_opt(
939        &self,
940        tx: &Transaction,
941        estimate_predicates: Option<bool>,
942    ) -> io::Result<StatusWithTransaction> {
943        let tx = tx.clone().to_bytes();
944        let variables = TxWithEstimatedPredicatesArg {
945            tx: HexString(Bytes(tx)),
946            estimate_predicates,
947        };
948
949        let mut stream = self.subscribe(variables).await?.map(
950            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
951                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
952                Result::<_, io::Error>::Ok(status)
953            },
954        );
955
956        let status = stream.next().await.ok_or_else(|| {
957            io::Error::other("Failed to get status from the submission")
958        })??;
959
960        Ok(status)
961    }
962
963    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
964    #[cfg(feature = "subscriptions")]
965    pub async fn submit_and_await_status(
966        &self,
967        tx: &Transaction,
968    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
969        self.submit_and_await_status_opt(tx, None, None).await
970    }
971
972    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
973    #[cfg(feature = "subscriptions")]
974    pub async fn submit_and_await_status_opt(
975        &self,
976        tx: &Transaction,
977        estimate_predicates: Option<bool>,
978        include_preconfirmation: Option<bool>,
979    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
980        use schema::tx::SubmitAndAwaitStatusArg;
981        let tx = tx.clone().to_bytes();
982        let variables = SubmitAndAwaitStatusArg {
983            tx: HexString(Bytes(tx)),
984            estimate_predicates,
985            include_preconfirmation,
986        };
987
988        let stream = self.subscribe(variables).await?.map(
989            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
990                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
991                Result::<_, io::Error>::Ok(status)
992            },
993        );
994
995        Ok(stream)
996    }
997
998    /// Requests all storage slots for the `contract_id`.
999    #[cfg(feature = "subscriptions")]
1000    pub async fn contract_storage_slots(
1001        &self,
1002        contract_id: &ContractId,
1003    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1004        use schema::storage::ContractStorageSlotsArgs;
1005        let variables = ContractStorageSlotsArgs {
1006            contract_id: (*contract_id).into(),
1007        };
1008
1009        let stream = self.subscribe(variables).await?.map(
1010            |result: io::Result<schema::storage::ContractStorageSlots>| {
1011                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1012                Result::<_, io::Error>::Ok(result)
1013            },
1014        );
1015
1016        Ok(stream)
1017    }
1018
1019    /// Requests all storage balances for the `contract_id`.
1020    #[cfg(feature = "subscriptions")]
1021    pub async fn contract_storage_balances(
1022        &self,
1023        contract_id: &ContractId,
1024    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1025    {
1026        use schema::{
1027            contract::ContractBalance,
1028            storage::ContractStorageBalancesArgs,
1029        };
1030        let variables = ContractStorageBalancesArgs {
1031            contract_id: (*contract_id).into(),
1032        };
1033
1034        let stream = self.subscribe(variables).await?.map(
1035            |result: io::Result<schema::storage::ContractStorageBalances>| {
1036                let result: ContractBalance = result?.contract_storage_balances;
1037                Result::<_, io::Error>::Ok(result)
1038            },
1039        );
1040
1041        Ok(stream)
1042    }
1043
1044    /// Returns a stream of new blocks.
1045    #[cfg(feature = "subscriptions")]
1046    pub async fn new_blocks_subscription(
1047        &self,
1048    ) -> io::Result<
1049        impl Stream<
1050            Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1051        > + '_,
1052    > {
1053        let stream = self.subscribe(()).await?.map(
1054            |r: io::Result<schema::block::NewBlocksSubscription>| {
1055                let result: fuel_core_types::services::block_importer::ImportResult =
1056                    postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1057                        io::Error::other(format!(
1058                            "Failed to deserialize ImportResult: {e:?}"
1059                        ))
1060                    })?;
1061                Result::<_, io::Error>::Ok(result)
1062            },
1063        );
1064
1065        Ok(stream)
1066    }
1067
1068    /// Returns a stream of preconfirmations for all transactions.
1069    #[cfg(feature = "subscriptions")]
1070    pub async fn preconfirmations_subscription(
1071        &self,
1072    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1073        let stream = self.subscribe(()).await?.map(
1074            |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1075                let status: TransactionStatus = r?.preconfirmations.try_into()?;
1076                Result::<_, io::Error>::Ok(status)
1077            },
1078        );
1079
1080        Ok(stream)
1081    }
1082
1083    pub async fn contract_slots_values(
1084        &self,
1085        contract_id: &ContractId,
1086        block_height: Option<BlockHeight>,
1087        requested_storage_slots: Vec<Bytes32>,
1088    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1089        let query = schema::storage::ContractSlotValues::build(
1090            schema::storage::ContractSlotValuesArgs {
1091                contract_id: (*contract_id).into(),
1092                block_height: block_height.map(|b| (*b).into()),
1093                storage_slots: requested_storage_slots
1094                    .into_iter()
1095                    .map(Into::into)
1096                    .collect(),
1097            },
1098        );
1099
1100        self.query(query)
1101            .await
1102            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1103    }
1104
1105    pub async fn contract_balance_values(
1106        &self,
1107        contract_id: &ContractId,
1108        block_height: Option<BlockHeight>,
1109        requested_storage_slots: Vec<AssetId>,
1110    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1111        let query = schema::storage::ContractBalanceValues::build(
1112            schema::storage::ContractBalanceValuesArgs {
1113                contract_id: (*contract_id).into(),
1114                block_height: block_height.map(|b| (*b).into()),
1115                assets: requested_storage_slots
1116                    .into_iter()
1117                    .map(Into::into)
1118                    .collect(),
1119            },
1120        );
1121
1122        self.query(query)
1123            .await
1124            .map(|r| r.contract_balance_values.into_iter().collect())
1125    }
1126
1127    pub async fn start_session(&self) -> io::Result<String> {
1128        let query = schema::StartSession::build(());
1129
1130        self.query(query)
1131            .await
1132            .map(|r| r.start_session.into_inner())
1133    }
1134
1135    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1136        let query = schema::EndSession::build(IdArg { id: id.into() });
1137
1138        self.query(query).await.map(|r| r.end_session)
1139    }
1140
1141    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1142        let query = schema::Reset::build(IdArg { id: id.into() });
1143
1144        self.query(query).await.map(|r| r.reset)
1145    }
1146
1147    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1148        let op = serde_json::to_string(op)?;
1149        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1150
1151        self.query(query).await.map(|r| r.execute)
1152    }
1153
1154    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1155        let query = schema::Register::build(RegisterArgs {
1156            id: id.into(),
1157            register: register.into(),
1158        });
1159
1160        Ok(self.query(query).await?.register.0 as Word)
1161    }
1162
1163    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1164        let query = schema::Memory::build(MemoryArgs {
1165            id: id.into(),
1166            start: start.into(),
1167            size: size.into(),
1168        });
1169
1170        let memory = self.query(query).await?.memory;
1171
1172        Ok(serde_json::from_str(memory.as_str())?)
1173    }
1174
1175    pub async fn set_breakpoint(
1176        &self,
1177        session_id: &str,
1178        contract: fuel_types::ContractId,
1179        pc: u64,
1180    ) -> io::Result<()> {
1181        let operation = SetBreakpoint::build(SetBreakpointArgs {
1182            id: Id::new(session_id),
1183            bp: schema::Breakpoint {
1184                contract: contract.into(),
1185                pc: U64(pc),
1186            },
1187        });
1188
1189        let response = self.query(operation).await?;
1190        assert!(
1191            response.set_breakpoint,
1192            "Setting breakpoint returned invalid reply"
1193        );
1194        Ok(())
1195    }
1196
1197    pub async fn set_single_stepping(
1198        &self,
1199        session_id: &str,
1200        enable: bool,
1201    ) -> io::Result<()> {
1202        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1203            id: Id::new(session_id),
1204            enable,
1205        });
1206        self.query(operation).await?;
1207        Ok(())
1208    }
1209
1210    pub async fn start_tx(
1211        &self,
1212        session_id: &str,
1213        tx: &Transaction,
1214    ) -> io::Result<RunResult> {
1215        let operation = StartTx::build(StartTxArgs {
1216            id: Id::new(session_id),
1217            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1218        });
1219        let response = self.query(operation).await?.start_tx;
1220        Ok(response)
1221    }
1222
1223    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1224        let operation = ContinueTx::build(ContinueTxArgs {
1225            id: Id::new(session_id),
1226        });
1227        let response = self.query(operation).await?.continue_tx;
1228        Ok(response)
1229    }
1230
1231    pub async fn transaction(
1232        &self,
1233        id: &TxId,
1234    ) -> io::Result<Option<TransactionResponse>> {
1235        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1236
1237        let transaction = self.query(query).await?.transaction;
1238
1239        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1240    }
1241
1242    /// Get the status of a transaction
1243    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1244        let query =
1245            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1246
1247        let status = self.query(query).await?.transaction.ok_or_else(|| {
1248            io::Error::new(
1249                ErrorKind::NotFound,
1250                format!("status not found for transaction {id} "),
1251            )
1252        })?;
1253
1254        let status = status
1255            .status
1256            .ok_or_else(|| {
1257                io::Error::new(
1258                    ErrorKind::NotFound,
1259                    format!("status not found for transaction {id}"),
1260                )
1261            })?
1262            .try_into()?;
1263        Ok(status)
1264    }
1265
1266    #[tracing::instrument(skip(self), level = "debug")]
1267    #[cfg(feature = "subscriptions")]
1268    /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
1269    pub async fn subscribe_transaction_status(
1270        &self,
1271        id: &TxId,
1272    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1273        self.subscribe_transaction_status_opt(id, None).await
1274    }
1275
1276    #[cfg(feature = "subscriptions")]
1277    /// Subscribe to the status of a transaction
1278    pub async fn subscribe_transaction_status_opt(
1279        &self,
1280        id: &TxId,
1281        include_preconfirmation: Option<bool>,
1282    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1283        use schema::tx::{
1284            StatusChangeSubscription,
1285            StatusChangeSubscriptionArgs,
1286        };
1287        let tx_id: TransactionId = (*id).into();
1288        let variables = StatusChangeSubscriptionArgs {
1289            id: tx_id,
1290            include_preconfirmation,
1291        };
1292
1293        tracing::debug!("subscribing");
1294        let stream = self
1295            .subscribe::<StatusChangeSubscription, StatusChangeSubscriptionArgs>(
1296                variables,
1297            )
1298            .await?
1299            .map(|tx| {
1300                tracing::debug!("received {tx:?}");
1301                let tx = tx?;
1302                let status = tx.status_change.try_into()?;
1303                Ok(status)
1304            });
1305
1306        Ok(stream)
1307    }
1308
1309    #[cfg(feature = "subscriptions")]
1310    /// Awaits for the transaction to be committed into a block
1311    ///
1312    /// This will wait forever if needed, so consider wrapping this call
1313    /// with a `tokio::time::timeout`.
1314    pub async fn await_transaction_commit(
1315        &self,
1316        id: &TxId,
1317    ) -> io::Result<TransactionStatus> {
1318        // skip until we've reached a final status and then stop consuming the stream
1319        // to avoid an EOF which the eventsource client considers as an error.
1320        let status_result = self
1321            .subscribe_transaction_status(id)
1322            .await?
1323            .skip_while(|status| {
1324                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1325            })
1326            .next()
1327            .await;
1328
1329        if let Some(Ok(status)) = status_result {
1330            Ok(status)
1331        } else {
1332            Err(io::Error::other(format!(
1333                "Failed to get status for transaction {status_result:?}"
1334            )))
1335        }
1336    }
1337
1338    /// returns a paginated set of transactions sorted by block height
1339    pub async fn transactions(
1340        &self,
1341        request: PaginationRequest<String>,
1342    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1343        let args = schema::ConnectionArgs::from(request);
1344        let query = schema::tx::TransactionsQuery::build(args);
1345        let transactions = self.query(query).await?.transactions.try_into()?;
1346        Ok(transactions)
1347    }
1348
1349    /// Returns a paginated set of transactions associated with a txo owner address.
1350    pub async fn transactions_by_owner(
1351        &self,
1352        owner: &Address,
1353        request: PaginationRequest<String>,
1354    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1355        let owner: schema::Address = (*owner).into();
1356        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1357        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1358
1359        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1360        Ok(transactions)
1361    }
1362
1363    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1364        let query =
1365            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1366
1367        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1368            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1369        })?;
1370
1371        let receipts = match tx.status {
1372            Some(status) => match status {
1373                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1374                    s.receipts
1375                        .into_iter()
1376                        .map(TryInto::<Receipt>::try_into)
1377                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1378                )
1379                .transpose()?,
1380                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1381                    s.receipts
1382                        .into_iter()
1383                        .map(TryInto::<Receipt>::try_into)
1384                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1385                )
1386                .transpose()?,
1387                _ => None,
1388            },
1389            _ => None,
1390        };
1391
1392        Ok(receipts)
1393    }
1394
1395    #[cfg(feature = "test-helpers")]
1396    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1397        let query = schema::tx::AllReceipts::build(());
1398        let receipts = self.query(query).await?.all_receipts;
1399
1400        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1401            .into_iter()
1402            .map(TryInto::<Receipt>::try_into)
1403            .collect();
1404
1405        Ok(vec?)
1406    }
1407
1408    pub async fn produce_blocks(
1409        &self,
1410        blocks_to_produce: u32,
1411        start_timestamp: Option<u64>,
1412    ) -> io::Result<BlockHeight> {
1413        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1414            blocks_to_produce: blocks_to_produce.into(),
1415            start_timestamp: start_timestamp
1416                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1417        });
1418
1419        let new_height = self.query(query).await?.produce_blocks;
1420
1421        Ok(new_height.into())
1422    }
1423
1424    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1425        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1426            id: Some((*id).into()),
1427        });
1428
1429        let block = self
1430            .query(query)
1431            .await?
1432            .block
1433            .map(TryInto::try_into)
1434            .transpose()?;
1435
1436        Ok(block)
1437    }
1438
1439    pub async fn block_by_height(
1440        &self,
1441        height: BlockHeight,
1442    ) -> io::Result<Option<types::Block>> {
1443        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1444            height: Some(U32(height.into())),
1445        });
1446
1447        let block = self
1448            .query(query)
1449            .await?
1450            .block
1451            .map(TryInto::try_into)
1452            .transpose()?;
1453
1454        Ok(block)
1455    }
1456
1457    pub async fn da_compressed_block(
1458        &self,
1459        height: BlockHeight,
1460    ) -> io::Result<Option<Vec<u8>>> {
1461        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1462            DaCompressedBlockByHeightArgs {
1463                height: U32(height.into()),
1464            },
1465        );
1466
1467        Ok(self
1468            .query(query)
1469            .await?
1470            .da_compressed_block
1471            .map(|b| b.bytes.into()))
1472    }
1473
1474    /// Retrieve a blob by its ID
1475    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1476        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1477        let blob = self.query(query).await?.blob.map(Into::into);
1478        Ok(blob)
1479    }
1480
1481    /// Check whether a blob with ID exists
1482    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1483        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1484        Ok(self.query(query).await?.blob.is_some())
1485    }
1486
1487    /// Retrieve multiple blocks
1488    pub async fn blocks(
1489        &self,
1490        request: PaginationRequest<String>,
1491    ) -> io::Result<PaginatedResult<types::Block, String>> {
1492        let args = schema::ConnectionArgs::from(request);
1493        let query = schema::block::BlocksQuery::build(args);
1494
1495        let blocks = self.query(query).await?.blocks.try_into()?;
1496
1497        Ok(blocks)
1498    }
1499
1500    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1501        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1502            utxo_id: (*id).into(),
1503        });
1504        let coin = self.query(query).await?.coin.map(Into::into);
1505        Ok(coin)
1506    }
1507
1508    /// Retrieve a page of coins by their owner
1509    pub async fn coins(
1510        &self,
1511        owner: &Address,
1512        asset_id: Option<&AssetId>,
1513        request: PaginationRequest<String>,
1514    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1515        let owner: schema::Address = (*owner).into();
1516        let asset_id = asset_id.map(|id| (*id).into());
1517        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1518        let query = schema::coins::CoinsQuery::build(args);
1519
1520        let coins = self.query(query).await?.coins.into();
1521        Ok(coins)
1522    }
1523
1524    /// Retrieve coins to spend in a transaction
1525    pub async fn coins_to_spend(
1526        &self,
1527        owner: &Address,
1528        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1529        // (Utxos, Messages Nonce)
1530        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1531    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1532        let owner: schema::Address = (*owner).into();
1533        let spend_query: Vec<SpendQueryElementInput> = spend_query
1534            .iter()
1535            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1536                Ok(SpendQueryElementInput {
1537                    asset_id: (*asset_id).into(),
1538                    amount: (*amount).into(),
1539                    max: (*max).map(|max| max.into()),
1540                })
1541            })
1542            .try_collect()?;
1543        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1544        let args =
1545            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1546        let query = schema::coins::CoinsToSpendQuery::build(args);
1547
1548        let coins_per_asset = self
1549            .query(query)
1550            .await?
1551            .coins_to_spend
1552            .into_iter()
1553            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1554            .collect::<Vec<_>>();
1555        Ok(coins_per_asset)
1556    }
1557
1558    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1559        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1560            id: (*id).into(),
1561        });
1562        let contract = self.query(query).await?.contract.map(Into::into);
1563        Ok(contract)
1564    }
1565
1566    pub async fn contract_balance(
1567        &self,
1568        id: &ContractId,
1569        asset: Option<&AssetId>,
1570    ) -> io::Result<u64> {
1571        let asset_id: schema::AssetId = match asset {
1572            Some(asset) => (*asset).into(),
1573            None => schema::AssetId::default(),
1574        };
1575
1576        let query =
1577            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1578                id: (*id).into(),
1579                asset: asset_id,
1580            });
1581
1582        let balance: types::ContractBalance =
1583            self.query(query).await?.contract_balance.into();
1584        Ok(balance.amount)
1585    }
1586
1587    pub async fn balance(
1588        &self,
1589        owner: &Address,
1590        asset_id: Option<&AssetId>,
1591    ) -> io::Result<u128> {
1592        let owner: schema::Address = (*owner).into();
1593        let asset_id: schema::AssetId = match asset_id {
1594            Some(asset_id) => (*asset_id).into(),
1595            None => schema::AssetId::default(),
1596        };
1597        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1598        let balance: types::Balance = self.query(query).await?.balance.into();
1599        Ok(balance.amount)
1600    }
1601
1602    // Retrieve a page of balances by their owner
1603    pub async fn balances(
1604        &self,
1605        owner: &Address,
1606        request: PaginationRequest<String>,
1607    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1608        let owner: schema::Address = (*owner).into();
1609        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1610        let query = schema::balance::BalancesQuery::build(args);
1611
1612        let balances = self.query(query).await?.balances.into();
1613        Ok(balances)
1614    }
1615
1616    pub async fn contract_balances(
1617        &self,
1618        contract: &ContractId,
1619        request: PaginationRequest<String>,
1620    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1621        let contract_id: schema::ContractId = (*contract).into();
1622        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1623        let query = schema::contract::ContractBalancesQuery::build(args);
1624
1625        let balances = self.query(query).await?.contract_balances.into();
1626
1627        Ok(balances)
1628    }
1629
1630    // Retrieve a message by its nonce
1631    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1632        let query = schema::message::MessageQuery::build(NonceArgs {
1633            nonce: (*nonce).into(),
1634        });
1635        let message = self.query(query).await?.message.map(Into::into);
1636        Ok(message)
1637    }
1638
1639    pub async fn messages(
1640        &self,
1641        owner: Option<&Address>,
1642        request: PaginationRequest<String>,
1643    ) -> io::Result<PaginatedResult<types::Message, String>> {
1644        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1645        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1646        let query = schema::message::OwnedMessageQuery::build(args);
1647
1648        let messages = self.query(query).await?.messages.into();
1649
1650        Ok(messages)
1651    }
1652
1653    pub async fn contract_info(
1654        &self,
1655        contract: &ContractId,
1656    ) -> io::Result<Option<types::Contract>> {
1657        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1658            id: (*contract).into(),
1659        });
1660        let contract_info = self.query(query).await?.contract.map(Into::into);
1661        Ok(contract_info)
1662    }
1663
1664    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1665        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1666            nonce: (*nonce).into(),
1667        });
1668        let status = self.query(query).await?.message_status.into();
1669
1670        Ok(status)
1671    }
1672
1673    /// Request a merkle proof of an output message.
1674    pub async fn message_proof(
1675        &self,
1676        transaction_id: &TxId,
1677        nonce: &Nonce,
1678        commit_block_id: Option<&BlockId>,
1679        commit_block_height: Option<BlockHeight>,
1680    ) -> io::Result<types::MessageProof> {
1681        let transaction_id: TransactionId = (*transaction_id).into();
1682        let nonce: schema::Nonce = (*nonce).into();
1683        let commit_block_id: Option<schema::BlockId> =
1684            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1685        let commit_block_height = commit_block_height.map(Into::into);
1686        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1687            transaction_id,
1688            nonce,
1689            commit_block_id,
1690            commit_block_height,
1691        });
1692        let proof = self.query(query).await?.message_proof.try_into()?;
1693        Ok(proof)
1694    }
1695
1696    pub async fn relayed_transaction_status(
1697        &self,
1698        id: &Bytes32,
1699    ) -> io::Result<Option<RelayedTransactionStatus>> {
1700        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1701            RelayedTransactionStatusArgs {
1702                id: id.to_owned().into(),
1703            },
1704        );
1705        let status = self
1706            .query(query)
1707            .await?
1708            .relayed_transaction_status
1709            .map(|status| status.try_into())
1710            .transpose()?;
1711        Ok(status)
1712    }
1713
1714    pub async fn asset_info(
1715        &self,
1716        asset_id: &AssetId,
1717    ) -> io::Result<Option<AssetDetail>> {
1718        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1719            id: (*asset_id).into(),
1720        });
1721        let asset_info = self.query(query).await?.asset_details.map(Into::into);
1722        Ok(asset_info)
1723    }
1724}
1725
1726#[cfg(any(test, feature = "test-helpers"))]
1727impl FuelClient {
1728    pub async fn transparent_transaction(
1729        &self,
1730        id: &TxId,
1731    ) -> io::Result<Option<types::TransactionType>> {
1732        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1733
1734        let transaction = self.query(query).await?.transaction;
1735
1736        Ok(transaction
1737            .map(|tx| {
1738                let response: TransactionResponse = tx.try_into()?;
1739                Ok::<_, ConversionError>(response.transaction)
1740            })
1741            .transpose()?)
1742    }
1743}
1744
1745#[cfg(feature = "rpc")]
1746impl FuelClient {
1747    fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
1748        self.rpc_client
1749            .clone()
1750            .ok_or(io::Error::other("RPC client not initialized"))
1751    }
1752
1753    pub async fn get_block_range(
1754        &self,
1755        start: BlockHeight,
1756        end: BlockHeight,
1757    ) -> io::Result<
1758        impl Stream<
1759            Item = io::Result<(
1760                fuel_core_types::blockchain::block::Block,
1761                Vec<Vec<Receipt>>,
1762            )>,
1763        >,
1764    > {
1765        let request = ProtoBlockRangeRequest {
1766            start: *start,
1767            end: *end,
1768        };
1769
1770        let stream = self
1771            .rpc_client()?
1772            .get_block_range(request)
1773            .await
1774            .map_err(io::Error::other)?
1775            .into_inner()
1776            .then(|res| {
1777                let maybe_aws_client = self.aws_client.clone();
1778                async move {
1779                    let maybe_aws_client = maybe_aws_client.clone();
1780                    let resp =
1781                        res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1782                    Self::convert_block_response(resp, maybe_aws_client).await
1783                }
1784            });
1785        Ok(stream)
1786    }
1787
1788    async fn convert_block_response(
1789        resp: BlockResponse,
1790        s3_client: AWSClientManager,
1791    ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
1792        let payload = resp
1793            .payload
1794            .ok_or(io::Error::other("No RPC payload for `BlockResponse`"))?;
1795        match payload {
1796            Payload::Literal(_) => {
1797                // Should never happen, as we don't return blocks as literal payloads
1798                Err(io::Error::other("Literal payloads are not supported yet"))
1799            }
1800            Payload::Bytes(bytes) => {
1801                let proto_block =
1802                    ProtoBlock::decode(bytes.as_slice()).map_err(io::Error::other)?;
1803                fuel_block_from_protobuf(proto_block).map_err(|e| {
1804                    io::Error::other(format!(
1805                        "Failed to convert RPC block to internal block: {e:?}"
1806                    ))
1807                })
1808            }
1809            Payload::Remote(remote) => {
1810                let RemoteBlockResponse { location } = remote;
1811                match location {
1812                    Some(Location::S3(s3)) => {
1813                        let RemoteS3Bucket {
1814                            bucket,
1815                            key,
1816                            endpoint,
1817                            requester_pays,
1818                        } = s3;
1819                        let zipped_bytes = Self::get_block_from_s3_bucket(
1820                            s3_client,
1821                            &endpoint,
1822                            &bucket,
1823                            &key,
1824                            requester_pays,
1825                        )
1826                        .await?;
1827
1828                        let block_bytes = Self::unzip_bytes(&zipped_bytes)?;
1829                        let block =
1830                            ProtoBlock::decode(block_bytes.as_slice()).map_err(|e| {
1831                                io::Error::other(format!("Failed to decode block: {e}"))
1832                            })?;
1833                        let (block, receipts) =
1834                            fuel_block_from_protobuf(block).map_err(|e| {
1835                                io::Error::other(format!(
1836                                    "Failed to convert RPC block to internal block: {e:?}"
1837                                ))
1838                            })?;
1839                        Ok((block, receipts))
1840                    }
1841                    _ => Err(io::Error::other("Remote blocks are not supported yet")),
1842                }
1843            }
1844        }
1845    }
1846    async fn get_block_from_s3_bucket(
1847        s3_client: AWSClientManager,
1848        url: &Option<String>,
1849        bucket: &str,
1850        key: &str,
1851        requester_pays: bool,
1852    ) -> io::Result<prost::bytes::Bytes> {
1853        use aws_sdk_s3::types::RequestPayer;
1854        tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
1855        let mut req = s3_client
1856            .get_client(url)
1857            .await?
1858            .get_object()
1859            .bucket(bucket)
1860            .key(key);
1861        if requester_pays {
1862            req = req.request_payer(RequestPayer::Requester);
1863        }
1864        let obj = req.send().await.map_err(|e| {
1865            io::Error::other(format!("Failed to get object from S3: {e:?}"))
1866        })?;
1867        let bytes = obj
1868            .body
1869            .collect()
1870            .await
1871            .map_err(|e| {
1872                io::Error::other(format!("Failed to get object from S3: {e:?}"))
1873            })?
1874            .into_bytes();
1875        Ok(bytes)
1876    }
1877
1878    async fn new_aws_client(url: &Option<String>) -> AWSClient {
1879        let credentials = DefaultCredentialsChain::builder().build().await;
1880        let mut config_builder = aws_config::defaults(BehaviorVersion::latest())
1881            .credentials_provider(credentials);
1882        if let Some(url) = url {
1883            config_builder = config_builder.endpoint_url(url);
1884        }
1885        let sdk_config = config_builder.load().await;
1886        let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
1887        let config = builder.force_path_style(true).build();
1888        AWSClient::from_conf(config)
1889    }
1890
1891    fn unzip_bytes(bytes: &[u8]) -> io::Result<Vec<u8>> {
1892        let mut decoder = GzDecoder::new(bytes);
1893        let mut output = Vec::new();
1894        decoder.read_to_end(&mut output).map_err(io::Error::other)?;
1895        Ok(output)
1896    }
1897
1898    /// Used to get the synced height of the block aggregator,
1899    /// as it doesn't always match the latest block height
1900    pub async fn get_aggregated_height(&self) -> io::Result<BlockHeight> {
1901        let request = ProtoBlockHeightRequest {};
1902        let height = self
1903            .rpc_client()?
1904            .get_synced_block_height(request)
1905            .await
1906            .map_err(io::Error::other)?
1907            .into_inner()
1908            .height
1909            .ok_or(io::Error::other("No height in RPC response"))?;
1910        Ok(BlockHeight::from(height))
1911    }
1912
1913    pub async fn new_block_subscription(
1914        &self,
1915    ) -> io::Result<
1916        impl Stream<
1917            Item = io::Result<(
1918                fuel_core_types::blockchain::block::Block,
1919                Vec<Vec<Receipt>>,
1920            )>,
1921        >,
1922    > {
1923        let request = ProtoNewBlockSubscriptionRequest {};
1924        let stream = self
1925            .rpc_client()?
1926            .new_block_subscription(request)
1927            .await
1928            .map_err(io::Error::other)?
1929            .into_inner()
1930            .then(|res| {
1931                let maybe_aws_client = self.aws_client.clone();
1932                async move {
1933                    let maybe_aws_client = maybe_aws_client.clone();
1934                    let resp =
1935                        res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1936                    Self::convert_block_response(resp, maybe_aws_client).await
1937                }
1938            });
1939        Ok(stream)
1940    }
1941}
1942
1943#[cfg(test)]
1944mod tests {
1945    use super::*;
1946
1947    #[test]
1948    fn with_urls_normalizes_urls_to_graphql_endpoint() {
1949        // Given
1950        let urls = &["http://localhost:8080", "http://example.com:4000"];
1951
1952        // When
1953        let client = FuelClient::with_urls(urls).expect("should create client");
1954
1955        // Then
1956        assert_eq!(
1957            client.get_default_url().as_str(),
1958            "http://localhost:8080/v1/graphql"
1959        );
1960    }
1961
1962    #[test]
1963    fn with_urls_adds_http_scheme_if_missing() {
1964        // Given
1965        let urls = &["localhost:8080"];
1966
1967        // When
1968        let client = FuelClient::with_urls(urls).expect("should create client");
1969
1970        // Then
1971        assert_eq!(
1972            client.get_default_url().as_str(),
1973            "http://localhost:8080/v1/graphql"
1974        );
1975    }
1976
1977    #[test]
1978    fn with_urls_overwrites_existing_path() {
1979        // Given - URLs that already have some path
1980        let urls = &["http://localhost:8080/some/path", "http://example.com/api"];
1981
1982        // When
1983        let client = FuelClient::with_urls(urls).expect("should create client");
1984
1985        // Then - path should be normalized to /v1/graphql
1986        assert_eq!(
1987            client.get_default_url().as_str(),
1988            "http://localhost:8080/v1/graphql"
1989        );
1990    }
1991
1992    #[test]
1993    fn new_and_with_urls_produce_same_url() {
1994        // Given
1995        let url = "http://localhost:8080";
1996
1997        // When
1998        let client_new = FuelClient::new(url).expect("should create client via new");
1999        let client_with_urls =
2000            FuelClient::with_urls(&[url]).expect("should create client via with_urls");
2001
2002        // Then
2003        assert_eq!(
2004            client_new.get_default_url().as_str(),
2005            client_with_urls.get_default_url().as_str()
2006        );
2007    }
2008}