Skip to main content

fuel_core_client/
client.rs

1use self::schema::{
2    block::ProduceBlockArgs,
3    message::{
4        MessageProofArgs,
5        NonceArgs,
6    },
7};
8#[cfg(feature = "subscriptions")]
9use crate::client::types::StatusWithTransaction;
10use crate::{
11    client::{
12        schema::{
13            Tai64Timestamp,
14            TransactionId,
15            block::BlockByHeightArgs,
16            coins::{
17                ExcludeInput,
18                SpendQueryElementInput,
19            },
20            contract::ContractBalanceQueryArgs,
21            gas_price::EstimateGasPrice,
22            message::MessageStatusArgs,
23            relayed_tx::RelayedTransactionStatusArgs,
24            tx::{
25                DryRunArg,
26                TxWithEstimatedPredicatesArg,
27            },
28        },
29        types::{
30            RelayedTransactionStatus,
31            asset::AssetDetail,
32            gas_price::LatestGasPrice,
33            message::MessageStatus,
34            primitives::{
35                Address,
36                AssetId,
37                BlockId,
38                ContractId,
39                UtxoId,
40            },
41            upgrades::StateTransitionBytecode,
42        },
43    },
44    reqwest_ext::FuelGraphQlResponse,
45    transport::FailoverTransport,
46};
47use anyhow::{
48    Context,
49    anyhow,
50};
51#[cfg(feature = "subscriptions")]
52use cynic::SubscriptionBuilder;
53use cynic::{
54    Id,
55    MutationBuilder,
56    Operation,
57    QueryBuilder,
58    QueryFragment,
59    QueryVariables,
60};
61use fuel_core_types::{
62    blockchain::header::{
63        ConsensusParametersVersion,
64        StateTransitionBytecodeVersion,
65    },
66    fuel_asm::{
67        Instruction,
68        Word,
69    },
70    fuel_tx::{
71        BlobId,
72        Bytes32,
73        ConsensusParameters,
74        Receipt,
75        Transaction,
76        TxId,
77    },
78    fuel_types::{
79        self,
80        BlockHeight,
81        Nonce,
82        canonical::Serialize,
83    },
84    services::executor::{
85        StorageReadReplayEvent,
86        TransactionExecutionStatus,
87    },
88};
89#[cfg(feature = "subscriptions")]
90use futures::{
91    Stream,
92    StreamExt,
93};
94use itertools::Itertools;
95use pagination::{
96    PageDirection,
97    PaginatedResult,
98    PaginationRequest,
99};
100use reqwest::Url;
101use schema::{
102    Bytes,
103    ContinueTx,
104    ContinueTxArgs,
105    ConversionError,
106    HexString,
107    IdArg,
108    MemoryArgs,
109    RegisterArgs,
110    RunResult,
111    SetBreakpoint,
112    SetBreakpointArgs,
113    SetSingleStepping,
114    SetSingleSteppingArgs,
115    StartTx,
116    StartTxArgs,
117    U32,
118    U64,
119    assets::AssetInfoArg,
120    balance::BalanceArgs,
121    blob::BlobByIdArgs,
122    block::BlockByIdArgs,
123    coins::{
124        CoinByIdArgs,
125        CoinsConnectionArgs,
126    },
127    contract::{
128        ContractBalancesConnectionArgs,
129        ContractByIdArgs,
130    },
131    da_compressed::DaCompressedBlockByHeightArgs,
132    gas_price::BlockHorizonArgs,
133    storage_read_replay::{
134        StorageReadReplay,
135        StorageReadReplayArgs,
136    },
137    tx::{
138        AssembleTxArg,
139        TransactionsByOwnerConnectionArgs,
140        TxArg,
141        TxIdArgs,
142    },
143};
144#[cfg(feature = "subscriptions")]
145use std::future;
146use std::{
147    convert::TryInto,
148    io::{
149        self,
150        ErrorKind,
151    },
152    net,
153    str::{
154        self,
155        FromStr,
156    },
157    sync::{
158        Arc,
159        Mutex,
160    },
161};
162use tai64::Tai64;
163use tracing as _;
164use types::{
165    TransactionResponse,
166    TransactionStatus,
167    assemble_tx::{
168        AssembleTransactionResult,
169        RequiredBalance,
170    },
171};
172
173#[cfg(feature = "subscriptions")]
174use std::pin::Pin;
175
176#[cfg(feature = "rpc")]
177mod rpc_deps {
178    pub use aws_config::{
179        BehaviorVersion,
180        default_provider::credentials::DefaultCredentialsChain,
181    };
182    pub use aws_sdk_s3::Client as AWSClient;
183    pub use flate2::read::{
184        DeflateDecoder,
185        GzDecoder,
186        ZlibDecoder,
187    };
188    pub use fuel_core_block_aggregator_api::{
189        blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf,
190        protobuf_types::{
191            Block as ProtoBlock,
192            BlockHeightRequest as ProtoBlockHeightRequest,
193            BlockRangeRequest as ProtoBlockRangeRequest,
194            BlockResponse,
195            NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest,
196            RemoteBlockResponse,
197            RemoteHttpEndpoint,
198            RemoteS3Bucket,
199            block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient,
200            block_response::Payload,
201            remote_block_response::Location,
202        },
203    };
204    pub use prost::Message;
205    pub use std::{
206        collections::HashMap,
207        io::{
208            Cursor,
209            Read,
210        },
211    };
212    pub use tokio::sync::RwLock;
213    pub use tonic::transport::Channel;
214}
215#[cfg(feature = "rpc")]
216use rpc_deps::*;
217
/// Builds the `reqwest::Client` that is used solely to download block bytes over HTTP(S).
///
/// All automatic response decompression (gzip, brotli, deflate, zstd) is switched off so that
/// `Content-Encoding` keeps describing the raw body and the manual decoding performed by
/// `FuelClient` stays consistent.
///
/// # Panics
///
/// Panics if the `reqwest` client builder fails to build.
#[cfg(feature = "rpc")]
fn remote_block_object_http_client() -> reqwest::Client {
    let builder = reqwest::Client::builder()
        .no_gzip()
        .no_brotli()
        .no_deflate()
        .no_zstd();

    builder
        .build()
        .expect("reqwest ClientBuilder for remote block fetches should succeed")
}
231
232pub mod pagination;
233pub mod schema;
234pub mod types;
235
/// Identifier of a register, represented as a `u32`.
type RegisterId = u32;
237
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the FuelClient
// anyhow::Error is wrapped inside a custom Error type,
// so that we can add specific error variants in the future.
pub enum Error {
    /// Unknown or not expected (by architecture) error.
    #[from]
    Other(anyhow::Error),
}
248
/// Consistency policy for the [`FuelClient`] to define the strategy
/// for the required height feature.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// Automatically fetch the next block height from the response and
    /// use it as an input to the next query to guarantee consistency
    /// of the results for the queries.
    Auto {
        /// The required block height for the queries.
        /// Kept behind a mutex so it can be updated through a shared reference.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The user manually sets the block height for all queries
    /// via the [`FuelClient::with_required_fuel_block_height`].
    Manual {
        /// The required block height for the queries.
        height: Option<BlockHeight>,
    },
}
267
268impl Clone for ConsistencyPolicy {
269    fn clone(&self) -> Self {
270        match self {
271            Self::Auto { height } => Self::Auto {
272                // We don't want to share the same mutex between the different
273                // instances of the `FuelClient`.
274                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
275            },
276            Self::Manual { height } => Self::Manual { height: *height },
277        }
278    }
279}
280
/// Chain-related values cached by the client between requests.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Most recent state transition bytecode version seen in a response's extensions.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Most recent consensus parameters version seen in a response's extensions.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
    /// Cached node version string (e.g. "0.47.3"). Populated on first use.
    node_version: Arc<Mutex<Option<String>>>,
}
288
289impl Clone for ChainStateInfo {
290    fn clone(&self) -> Self {
291        Self {
292            current_stf_version: Arc::new(Mutex::new(
293                self.current_stf_version.lock().ok().and_then(|v| *v),
294            )),
295            current_consensus_parameters_version: Arc::new(Mutex::new(
296                self.current_consensus_parameters_version
297                    .lock()
298                    .ok()
299                    .and_then(|v| *v),
300            )),
301            node_version: Arc::new(Mutex::new(
302                self.node_version.lock().ok().and_then(|v| v.clone()),
303            )),
304        }
305    }
306}
307
/// Client for talking to a fuel-core node over GraphQL.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// Transport through which queries and subscriptions are sent.
    transport: FailoverTransport,
    /// Policy that decides which block height is required for each query.
    require_height: ConsistencyPolicy,
    /// Chain values cached from previous responses.
    chain_state_info: ChainStateInfo,
    /// Block aggregator RPC client; `None` unless set by `new_with_rpc`.
    #[cfg(feature = "rpc")]
    rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
    /// Manager that hands out cached AWS clients.
    #[cfg(feature = "rpc")]
    aws_client: AWSClientManager,
    /// Shared HTTP client for remote block fetches (connection pool reuse).
    #[cfg(feature = "rpc")]
    http_client: reqwest::Client,
}
321
/// Cache of AWS clients shared between clones of the manager.
#[cfg(feature = "rpc")]
#[derive(Debug, Clone)]
pub struct AWSClientManager {
    /// Clients keyed by the optional URL string they were requested with.
    specific: Arc<RwLock<HashMap<Option<String>, AWSClient>>>,
}
327
#[cfg(feature = "rpc")]
impl AWSClientManager {
    /// Creates a manager with an empty client cache.
    pub fn new() -> Self {
        let specific = Arc::new(RwLock::new(HashMap::new()));
        Self { specific }
    }

    /// Returns the AWS client cached for `url`, creating and caching one on a miss.
    ///
    /// The new client is built before the write lock is taken. If another task cached a
    /// client for the same key in the meantime, that cached client is returned instead.
    pub async fn get_client(&self, url: &Option<String>) -> Result<AWSClient, io::Error> {
        let cached = self.specific.read().await.get(url).cloned();
        if let Some(existing) = cached {
            return Ok(existing);
        }

        let fresh = FuelClient::new_aws_client(url).await;
        let mut clients = self.specific.write().await;
        let client = clients.entry(url.clone()).or_insert(fresh).clone();
        Ok(client)
    }
}
349
#[cfg(feature = "rpc")]
impl Default for AWSClientManager {
    /// Equivalent to [`AWSClientManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
356
357/// Normalizes a URL string by ensuring it has an http(s) scheme and the `/v1/graphql` path.
358fn normalize_url(url_str: &str) -> anyhow::Result<Url> {
359    let mut raw_url = url_str.to_string();
360    if !raw_url.starts_with("http") {
361        raw_url = format!("http://{raw_url}");
362    }
363
364    let mut url = reqwest::Url::parse(&raw_url)
365        .map_err(anyhow::Error::msg)
366        .with_context(|| format!("Invalid fuel-core URL: {url_str}"))?;
367    url.set_path("/v1/graphql");
368
369    Ok(url)
370}
371
372impl FromStr for FuelClient {
373    type Err = anyhow::Error;
374
375    fn from_str(str: &str) -> Result<Self, Self::Err> {
376        let url = normalize_url(str)?;
377
378        Ok(Self {
379            transport: FailoverTransport::new(vec![url])?,
380            require_height: ConsistencyPolicy::Auto {
381                height: Arc::new(Mutex::new(None)),
382            },
383            chain_state_info: Default::default(),
384            #[cfg(feature = "rpc")]
385            rpc_client: None,
386            #[cfg(feature = "rpc")]
387            aws_client: AWSClientManager::new(),
388            #[cfg(feature = "rpc")]
389            http_client: remote_block_object_http_client(),
390        })
391    }
392}
393
394impl<S> From<S> for FuelClient
395where
396    S: Into<net::SocketAddr>,
397{
398    fn from(socket: S) -> Self {
399        format!("http://{}", socket.into())
400            .as_str()
401            .parse()
402            .unwrap()
403    }
404}
405
/// Returns `true` when the node version predates v0.48.0, which introduced
/// `maxStorageSlotLength` on `ScriptParameters` and the storage gas-cost fields.
///
/// Only the first two dot-separated components (major and minor) are inspected.
/// Falls back to `false` (i.e. assume new node) if either of them is missing or is not
/// an unsigned integer.
fn is_legacy_node(version: &str) -> bool {
    let mut components = version
        .splitn(3, '.')
        .map(|component| component.parse::<u64>().ok());

    match (components.next().flatten(), components.next().flatten()) {
        (Some(0), Some(minor)) => minor < 48,
        _ => false,
    }
}
421
/// Combines a list of error messages into a single `io::Error`.
///
/// The message starts with `Response errors` and every entry of `errors` is appended
/// after a `"; "` separator, in order.
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in errors {
        message.push_str("; ");
        message.push_str(&error);
    }
    io::Error::other(message)
}
432
433impl FuelClient {
434    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
435        Self::from_str(url.as_ref())
436    }
437
438    #[cfg(feature = "rpc")]
439    pub async fn new_with_rpc<G: AsRef<str>, R: AsRef<str>>(
440        graph_ql_urls: impl Iterator<Item = G>,
441        rpc_url: R,
442    ) -> anyhow::Result<Self> {
443        let urls: Vec<_> = graph_ql_urls
444            .map(|str| normalize_url(str.as_ref()))
445            .try_collect()?;
446        let mut client = Self::with_urls(&urls)?;
447        let mut raw_rpc_url = <R as AsRef<str>>::as_ref(&rpc_url).to_string();
448        if !raw_rpc_url.starts_with("http") {
449            raw_rpc_url = format!("http://{raw_rpc_url}");
450        }
451        let rpc_client = ProtoBlockAggregatorClient::connect(raw_rpc_url).await?;
452        client.rpc_client = Some(rpc_client);
453        client.aws_client = AWSClientManager::new();
454        Ok(client)
455    }
456
457    pub fn with_urls(urls: &[impl AsRef<str>]) -> anyhow::Result<Self> {
458        if urls.is_empty() {
459            return Err(anyhow!("Failed to create FuelClient. No URL is provided."));
460        }
461        let urls = urls
462            .iter()
463            .map(|url| normalize_url(url.as_ref()))
464            .collect::<Result<Vec<_>, _>>()?;
465        Ok(Self {
466            transport: FailoverTransport::new(urls)?,
467            require_height: ConsistencyPolicy::Auto {
468                height: Arc::new(Mutex::new(None)),
469            },
470            chain_state_info: Default::default(),
471            #[cfg(feature = "rpc")]
472            rpc_client: None,
473            #[cfg(feature = "rpc")]
474            aws_client: AWSClientManager::new(),
475            #[cfg(feature = "rpc")]
476            http_client: remote_block_object_http_client(),
477        })
478    }
479
480    pub fn get_default_url(&self) -> &Url {
481        self.transport.get_default_url()
482    }
483}
484
485impl FuelClient {
486    pub fn with_required_fuel_block_height(
487        &mut self,
488        new_height: Option<BlockHeight>,
489    ) -> &mut Self {
490        match &mut self.require_height {
491            ConsistencyPolicy::Auto { height } => {
492                *height.lock().expect("Mutex poisoned") = new_height;
493            }
494            ConsistencyPolicy::Manual { height } => {
495                *height = new_height;
496            }
497        }
498        self
499    }
500
501    pub fn use_manual_consistency_policy(
502        &mut self,
503        height: Option<BlockHeight>,
504    ) -> &mut Self {
505        self.require_height = ConsistencyPolicy::Manual { height };
506        self
507    }
508
509    pub fn decode_response<R, E>(
510        &self,
511        response: FuelGraphQlResponse<R, E>,
512    ) -> io::Result<R>
513    where
514        R: serde::de::DeserializeOwned + 'static,
515    {
516        if response
517            .extensions
518            .as_ref()
519            .and_then(|e| e.fuel_block_height_precondition_failed)
520            == Some(true)
521        {
522            return Err(io::Error::other("The required block height was not met"));
523        }
524
525        let response = response.response;
526
527        match (response.data, response.errors) {
528            (Some(d), _) => Ok(d),
529            (_, Some(e)) => Err(from_strings_errors_to_std_error(
530                e.into_iter().map(|e| e.message).collect(),
531            )),
532            _ => Err(io::Error::other("Invalid response")),
533        }
534    }
535
536    pub fn required_block_height(&self) -> Option<BlockHeight> {
537        match &self.require_height {
538            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
539            ConsistencyPolicy::Manual { height } => *height,
540        }
541    }
542
543    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
544        if let Some(current_sft_version) = response
545            .extensions
546            .as_ref()
547            .and_then(|e| e.current_stf_version)
548            && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
549        {
550            *c = Some(current_sft_version);
551        }
552
553        if let Some(current_consensus_parameters_version) = response
554            .extensions
555            .as_ref()
556            .and_then(|e| e.current_consensus_parameters_version)
557            && let Ok(mut c) = self
558                .chain_state_info
559                .current_consensus_parameters_version
560                .lock()
561        {
562            *c = Some(current_consensus_parameters_version);
563        }
564
565        let inner_required_height = match &self.require_height {
566            ConsistencyPolicy::Auto { height } => Some(height.clone()),
567            ConsistencyPolicy::Manual { .. } => None,
568        };
569
570        if let Some(inner_required_height) = inner_required_height
571            && let Some(current_fuel_block_height) = response
572                .extensions
573                .as_ref()
574                .and_then(|e| e.current_fuel_block_height)
575        {
576            let mut lock = inner_required_height.lock().expect("Mutex poisoned");
577
578            if current_fuel_block_height >= lock.unwrap_or_default() {
579                *lock = Some(current_fuel_block_height);
580            }
581        }
582    }
583
584    /// Fetch the connected node's version string, caching it after the first call.
585    /// Uses a minimal query fragment that is safe for all node versions.
586    async fn ensure_node_version(&self) -> io::Result<String> {
587        {
588            let lock = self
589                .chain_state_info
590                .node_version
591                .lock()
592                .map_err(|_| io::Error::other("Mutex poisoned"))?;
593            if let Some(v) = lock.as_ref() {
594                return Ok(v.clone());
595            }
596        }
597        let query = schema::node_info::QueryNodeVersionInfo::build(());
598        let version = self.query(query).await?.node_info.node_version;
599        if let Ok(mut lock) = self.chain_state_info.node_version.lock() {
600            *lock = Some(version.clone());
601        }
602        Ok(version)
603    }
604
605    /// Send the GraphQL query to the client.
606    pub async fn query<ResponseData, Vars>(
607        &self,
608        q: Operation<ResponseData, Vars>,
609    ) -> io::Result<ResponseData>
610    where
611        Vars: serde::Serialize + Clone + QueryVariables + Send + 'static,
612        ResponseData: serde::de::DeserializeOwned + QueryFragment + Send + 'static,
613    {
614        let required_fuel_block_height = self.required_block_height();
615        let response = self.transport.query(q, required_fuel_block_height).await?;
616
617        self.update_chain_state_info(&response);
618        self.decode_response(response)
619    }
620
621    #[tracing::instrument(skip_all)]
622    #[cfg(feature = "subscriptions")]
623    async fn subscribe<ResponseData, Variables>(
624        &self,
625        variables: Variables,
626    ) -> io::Result<Pin<Box<impl futures::Stream<Item = io::Result<ResponseData>> + '_>>>
627    where
628        Variables: serde::Serialize + QueryVariables + Send + Clone + 'static,
629        ResponseData: serde::de::DeserializeOwned
630            + QueryFragment
631            + SubscriptionBuilder<Variables>
632            + 'static
633            + Send,
634    {
635        let stream = self
636            .transport
637            .subscribe(variables, self.required_block_height())
638            .await?;
639
640        let client = self; // capture immutably
641        Ok(Box::pin(stream.filter_map(move |result| {
642            async move {
643                match result {
644                    Ok(resp) => {
645                        client.update_chain_state_info(&resp);
646                        Some(client.decode_response(resp))
647                    }
648                    Err(e) => Some(Err(e)), // pass through untouched
649                }
650            }
651        })))
652    }
653
654    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
655        self.chain_state_info
656            .current_stf_version
657            .lock()
658            .ok()
659            .and_then(|value| *value)
660    }
661
662    pub fn latest_consensus_parameters_version(
663        &self,
664    ) -> Option<ConsensusParametersVersion> {
665        self.chain_state_info
666            .current_consensus_parameters_version
667            .lock()
668            .ok()
669            .and_then(|value| *value)
670    }
671
672    pub async fn health(&self) -> io::Result<bool> {
673        let query = schema::Health::build(());
674        self.query(query).await.map(|r| r.health)
675    }
676
677    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
678        let query = schema::node_info::QueryNodeInfo::build(());
679        self.query(query).await.map(|r| r.node_info.into())
680    }
681
682    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
683        let query = schema::gas_price::QueryLatestGasPrice::build(());
684        self.query(query).await.map(|r| r.latest_gas_price.into())
685    }
686
687    pub async fn estimate_gas_price(
688        &self,
689        block_horizon: u32,
690    ) -> io::Result<EstimateGasPrice> {
691        let args = BlockHorizonArgs {
692            block_horizon: Some(block_horizon.into()),
693        };
694        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
695        self.query(query).await.map(|r| r.estimate_gas_price)
696    }
697
698    #[cfg(feature = "std")]
699    pub async fn connected_peers_info(
700        &self,
701    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
702        let query = schema::node_info::QueryPeersInfo::build(());
703        self.query(query)
704            .await
705            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
706    }
707
708    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
709        let node_version = self.ensure_node_version().await?;
710        if is_legacy_node(&node_version) {
711            let query = schema::chain::ChainQueryLegacy::build(());
712            self.query(query).await.and_then(|r| {
713                let result = r.chain.try_into()?;
714                Ok(result)
715            })
716        } else {
717            let query = schema::chain::ChainQuery::build(());
718            self.query(query).await.and_then(|r| {
719                let result = r.chain.try_into()?;
720                Ok(result)
721            })
722        }
723    }
724
725    pub async fn consensus_parameters(
726        &self,
727        version: i32,
728    ) -> io::Result<Option<ConsensusParameters>> {
729        let node_version = self.ensure_node_version().await?;
730        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
731
732        if is_legacy_node(&node_version) {
733            let query =
734                schema::upgrades::ConsensusParametersByVersionQueryLegacy::build(args);
735            let result = self
736                .query(query)
737                .await?
738                .consensus_parameters
739                .map(TryInto::try_into)
740                .transpose()?;
741            Ok(result)
742        } else {
743            let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
744            let result = self
745                .query(query)
746                .await?
747                .consensus_parameters
748                .map(TryInto::try_into)
749                .transpose()?;
750            Ok(result)
751        }
752    }
753
754    pub async fn state_transition_byte_code_by_version(
755        &self,
756        version: i32,
757    ) -> io::Result<Option<StateTransitionBytecode>> {
758        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
759        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
760
761        let result = self
762            .query(query)
763            .await?
764            .state_transition_bytecode_by_version
765            .map(TryInto::try_into)
766            .transpose()?;
767
768        Ok(result)
769    }
770
771    pub async fn state_transition_byte_code_by_root(
772        &self,
773        root: Bytes32,
774    ) -> io::Result<Option<StateTransitionBytecode>> {
775        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
776            root: HexString(Bytes(root.to_vec())),
777        };
778        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
779
780        let result = self
781            .query(query)
782            .await?
783            .state_transition_bytecode_by_root
784            .map(TryInto::try_into)
785            .transpose()?;
786
787        Ok(result)
788    }
789
790    /// Default dry run, matching the exact configuration as the node
791    pub async fn dry_run(
792        &self,
793        txs: &[Transaction],
794    ) -> io::Result<Vec<TransactionExecutionStatus>> {
795        self.dry_run_opt(txs, None, None, None).await
796    }
797
798    /// Dry run with options to override the node behavior
799    pub async fn dry_run_opt(
800        &self,
801        txs: &[Transaction],
802        // Disable utxo input checks (exists, unspent, and valid signature)
803        utxo_validation: Option<bool>,
804        gas_price: Option<u64>,
805        at_height: Option<BlockHeight>,
806    ) -> io::Result<Vec<TransactionExecutionStatus>> {
807        let txs = txs
808            .iter()
809            .map(|tx| HexString(Bytes(tx.to_bytes())))
810            .collect::<Vec<HexString>>();
811        let query: Operation<schema::tx::DryRun, DryRunArg> =
812            schema::tx::DryRun::build(DryRunArg {
813                txs,
814                utxo_validation,
815                gas_price: gas_price.map(|gp| gp.into()),
816                block_height: at_height.map(|bh| bh.into()),
817            });
818        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
819        tx_statuses
820            .into_iter()
821            .map(|tx_status| tx_status.try_into().map_err(Into::into))
822            .collect()
823    }
824
825    /// Like `dry_run_opt`, but also returns the storage reads
826    pub async fn dry_run_opt_record_storage_reads(
827        &self,
828        txs: &[Transaction],
829        // Disable utxo input checks (exists, unspent, and valid signature)
830        utxo_validation: Option<bool>,
831        gas_price: Option<u64>,
832        at_height: Option<BlockHeight>,
833    ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
834        let txs = txs
835            .iter()
836            .map(|tx| HexString(Bytes(tx.to_bytes())))
837            .collect::<Vec<HexString>>();
838        let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
839            schema::tx::DryRunRecordStorageReads::build(DryRunArg {
840                txs,
841                utxo_validation,
842                gas_price: gas_price.map(|gp| gp.into()),
843                block_height: at_height.map(|bh| bh.into()),
844            });
845        let result = self
846            .query(query)
847            .await
848            .map(|r| r.dry_run_record_storage_reads)?;
849        let tx_statuses = result
850            .tx_statuses
851            .into_iter()
852            .map(|tx_status| tx_status.try_into().map_err(Into::into))
853            .collect::<io::Result<Vec<_>>>()?;
854        let storage_reads = result
855            .storage_reads
856            .into_iter()
857            .map(Into::into)
858            .collect::<Vec<_>>();
859        Ok((tx_statuses, storage_reads))
860    }
861
862    /// Get storage read replay for a block
863    pub async fn storage_read_replay(
864        &self,
865        height: &BlockHeight,
866    ) -> io::Result<Vec<StorageReadReplayEvent>> {
867        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
868            StorageReadReplay::build(StorageReadReplayArgs {
869                height: (*height).into(),
870            });
871        Ok(self
872            .query(query)
873            .await
874            .map(|r| r.storage_read_replay)?
875            .into_iter()
876            .map(Into::into)
877            .collect())
878    }
879
880    /// Assembles the transaction based on the provided requirements.
881    /// The return transaction contains:
882    /// - Input coins to cover `required_balances`
883    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
884    /// - `Change` or `Destroy` outputs for all assets from the inputs
885    /// - `Variable` outputs in the case they are required during the execution
886    /// - `Contract` inputs and outputs in the case they are required during the execution
887    /// - Reserved witness slots for signed coins filled with `64` zeroes
888    /// - Set script gas limit(unless `script` is empty)
889    /// - Estimated predicates, if `estimate_predicates == true`
890    ///
891    /// Returns an error if:
892    /// - The number of required balances exceeds the maximum number of inputs allowed.
893    /// - The fee address index is out of bounds.
894    /// - The same asset has multiple change policies(either the receiver of
895    ///   the change is different, or one of the policies states about the destruction
896    ///   of the token while the other does not). The `Change` output from the transaction
897    ///   also count as a `ChangePolicy`.
898    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
899    /// - Required assets have multiple entries.
900    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
901    /// - If a constructed transaction breaks the rules defined by consensus parameters.
902    #[allow(clippy::too_many_arguments)]
903    pub async fn assemble_tx(
904        &self,
905        tx: &Transaction,
906        block_horizon: u32,
907        required_balances: Vec<RequiredBalance>,
908        fee_address_index: u16,
909        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
910        estimate_predicates: bool,
911        reserve_gas: Option<u64>,
912    ) -> io::Result<AssembleTransactionResult> {
913        let tx = HexString(Bytes(tx.to_bytes()));
914        let block_horizon = block_horizon.into();
915
916        let required_balances: Vec<_> = required_balances
917            .into_iter()
918            .map(schema::tx::RequiredBalance::try_from)
919            .collect::<Result<Vec<_>, _>>()?;
920
921        let fee_address_index = fee_address_index.into();
922
923        let exclude_input = exclude.map(Into::into);
924
925        let reserve_gas = reserve_gas.map(U64::from);
926
927        let query_arg = AssembleTxArg {
928            tx,
929            block_horizon,
930            required_balances,
931            fee_address_index,
932            exclude_input,
933            estimate_predicates,
934            reserve_gas,
935        };
936
937        let query = schema::tx::AssembleTx::build(query_arg);
938        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
939        Ok(assemble_tx_result.try_into()?)
940    }
941
942    /// Estimate predicates for the transaction
943    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
944        let serialized_tx = tx.to_bytes();
945        let query = schema::tx::EstimatePredicates::build(TxArg {
946            tx: HexString(Bytes(serialized_tx)),
947        });
948        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
949        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
950        *tx = tx_with_predicate;
951        Ok(())
952    }
953
954    pub async fn submit(
955        &self,
956        tx: &Transaction,
957    ) -> io::Result<types::primitives::TransactionId> {
958        self.submit_opt(tx, None).await
959    }
960
961    pub async fn submit_opt(
962        &self,
963        tx: &Transaction,
964        estimate_predicates: Option<bool>,
965    ) -> io::Result<types::primitives::TransactionId> {
966        let tx = tx.clone().to_bytes();
967        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
968            tx: HexString(Bytes(tx)),
969            estimate_predicates,
970        });
971
972        let id = self.query(query).await.map(|r| r.submit)?.id.into();
973        Ok(id)
974    }
975
976    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
977    #[cfg(feature = "subscriptions")]
978    pub async fn submit_and_await_commit(
979        &self,
980        tx: &Transaction,
981    ) -> io::Result<TransactionStatus> {
982        self.submit_and_await_commit_opt(tx, None).await
983    }
984
985    /// Submit the transaction and wait for it either to be included in
986    /// a block or removed from `TxPool`.
987    ///
988    /// If `estimate_predicates` is set, the predicates will be estimated before
989    /// the transaction is inserted into transaction pool.
990    ///
991    /// This will wait forever if needed, so consider wrapping this call
992    /// with a `tokio::time::timeout`.
993    #[cfg(feature = "subscriptions")]
994    pub async fn submit_and_await_commit_opt(
995        &self,
996        tx: &Transaction,
997        estimate_predicates: Option<bool>,
998    ) -> io::Result<TransactionStatus> {
999        let tx = tx.clone().to_bytes();
1000        let variables = TxWithEstimatedPredicatesArg {
1001            tx: HexString(Bytes(tx)),
1002            estimate_predicates,
1003        };
1004
1005        let mut stream = self.subscribe(variables).await?.map(
1006            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
1007                let status: TransactionStatus = r?.submit_and_await.try_into()?;
1008                Result::<_, io::Error>::Ok(status)
1009            },
1010        );
1011
1012        let status = stream.next().await.ok_or_else(|| {
1013            io::Error::other("Failed to get status from the submission")
1014        })??;
1015
1016        Ok(status)
1017    }
1018
1019    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
1020    #[cfg(feature = "subscriptions")]
1021    pub async fn submit_and_await_commit_with_tx(
1022        &self,
1023        tx: &Transaction,
1024    ) -> io::Result<StatusWithTransaction> {
1025        self.submit_and_await_commit_with_tx_opt(tx, None).await
1026    }
1027
1028    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
1029    #[cfg(feature = "subscriptions")]
1030    pub async fn submit_and_await_commit_with_tx_opt(
1031        &self,
1032        tx: &Transaction,
1033        estimate_predicates: Option<bool>,
1034    ) -> io::Result<StatusWithTransaction> {
1035        let tx = tx.clone().to_bytes();
1036        let variables = TxWithEstimatedPredicatesArg {
1037            tx: HexString(Bytes(tx)),
1038            estimate_predicates,
1039        };
1040
1041        let mut stream = self.subscribe(variables).await?.map(
1042            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
1043                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
1044                Result::<_, io::Error>::Ok(status)
1045            },
1046        );
1047
1048        let status = stream.next().await.ok_or_else(|| {
1049            io::Error::other("Failed to get status from the submission")
1050        })??;
1051
1052        Ok(status)
1053    }
1054
1055    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
1056    #[cfg(feature = "subscriptions")]
1057    pub async fn submit_and_await_status(
1058        &self,
1059        tx: &Transaction,
1060    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1061        self.submit_and_await_status_opt(tx, None, None).await
1062    }
1063
1064    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
1065    #[cfg(feature = "subscriptions")]
1066    pub async fn submit_and_await_status_opt(
1067        &self,
1068        tx: &Transaction,
1069        estimate_predicates: Option<bool>,
1070        include_preconfirmation: Option<bool>,
1071    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1072        use schema::tx::SubmitAndAwaitStatusArg;
1073        let tx = tx.clone().to_bytes();
1074        let variables = SubmitAndAwaitStatusArg {
1075            tx: HexString(Bytes(tx)),
1076            estimate_predicates,
1077            include_preconfirmation,
1078        };
1079
1080        let stream = self.subscribe(variables).await?.map(
1081            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1082                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1083                Result::<_, io::Error>::Ok(status)
1084            },
1085        );
1086
1087        Ok(stream)
1088    }
1089
1090    /// Requests all storage slots for the `contract_id`.
1091    #[cfg(feature = "subscriptions")]
1092    pub async fn contract_storage_slots(
1093        &self,
1094        contract_id: &ContractId,
1095    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1096        use schema::storage::ContractStorageSlotsArgs;
1097        let variables = ContractStorageSlotsArgs {
1098            contract_id: (*contract_id).into(),
1099        };
1100
1101        let stream = self.subscribe(variables).await?.map(
1102            |result: io::Result<schema::storage::ContractStorageSlots>| {
1103                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1104                Result::<_, io::Error>::Ok(result)
1105            },
1106        );
1107
1108        Ok(stream)
1109    }
1110
1111    /// Requests all storage balances for the `contract_id`.
1112    #[cfg(feature = "subscriptions")]
1113    pub async fn contract_storage_balances(
1114        &self,
1115        contract_id: &ContractId,
1116    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1117    {
1118        use schema::{
1119            contract::ContractBalance,
1120            storage::ContractStorageBalancesArgs,
1121        };
1122        let variables = ContractStorageBalancesArgs {
1123            contract_id: (*contract_id).into(),
1124        };
1125
1126        let stream = self.subscribe(variables).await?.map(
1127            |result: io::Result<schema::storage::ContractStorageBalances>| {
1128                let result: ContractBalance = result?.contract_storage_balances;
1129                Result::<_, io::Error>::Ok(result)
1130            },
1131        );
1132
1133        Ok(stream)
1134    }
1135
1136    /// Returns a stream of new blocks.
1137    #[cfg(feature = "subscriptions")]
1138    pub async fn new_blocks_subscription(
1139        &self,
1140    ) -> io::Result<
1141        impl Stream<
1142            Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1143        > + '_,
1144    > {
1145        let stream = self.subscribe(()).await?.map(
1146            |r: io::Result<schema::block::NewBlocksSubscription>| {
1147                let result: fuel_core_types::services::block_importer::ImportResult =
1148                    postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1149                        io::Error::other(format!(
1150                            "Failed to deserialize ImportResult: {e:?}"
1151                        ))
1152                    })?;
1153                Result::<_, io::Error>::Ok(result)
1154            },
1155        );
1156
1157        Ok(stream)
1158    }
1159
1160    /// Returns a stream of preconfirmations for all transactions.
1161    #[cfg(feature = "subscriptions")]
1162    pub async fn preconfirmations_subscription(
1163        &self,
1164    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1165        let stream = self.subscribe(()).await?.map(
1166            |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1167                let status: TransactionStatus = r?.preconfirmations.try_into()?;
1168                Result::<_, io::Error>::Ok(status)
1169            },
1170        );
1171
1172        Ok(stream)
1173    }
1174
1175    pub async fn contract_slots_values(
1176        &self,
1177        contract_id: &ContractId,
1178        block_height: Option<BlockHeight>,
1179        requested_storage_slots: Vec<Bytes32>,
1180    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1181        let query = schema::storage::ContractSlotValues::build(
1182            schema::storage::ContractSlotValuesArgs {
1183                contract_id: (*contract_id).into(),
1184                block_height: block_height.map(|b| (*b).into()),
1185                storage_slots: requested_storage_slots
1186                    .into_iter()
1187                    .map(Into::into)
1188                    .collect(),
1189            },
1190        );
1191
1192        self.query(query)
1193            .await
1194            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1195    }
1196
1197    pub async fn contract_balance_values(
1198        &self,
1199        contract_id: &ContractId,
1200        block_height: Option<BlockHeight>,
1201        requested_storage_slots: Vec<AssetId>,
1202    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1203        let query = schema::storage::ContractBalanceValues::build(
1204            schema::storage::ContractBalanceValuesArgs {
1205                contract_id: (*contract_id).into(),
1206                block_height: block_height.map(|b| (*b).into()),
1207                assets: requested_storage_slots
1208                    .into_iter()
1209                    .map(Into::into)
1210                    .collect(),
1211            },
1212        );
1213
1214        self.query(query)
1215            .await
1216            .map(|r| r.contract_balance_values.into_iter().collect())
1217    }
1218
1219    pub async fn start_session(&self) -> io::Result<String> {
1220        let query = schema::StartSession::build(());
1221
1222        self.query(query)
1223            .await
1224            .map(|r| r.start_session.into_inner())
1225    }
1226
1227    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1228        let query = schema::EndSession::build(IdArg { id: id.into() });
1229
1230        self.query(query).await.map(|r| r.end_session)
1231    }
1232
1233    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1234        let query = schema::Reset::build(IdArg { id: id.into() });
1235
1236        self.query(query).await.map(|r| r.reset)
1237    }
1238
1239    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1240        let op = serde_json::to_string(op)?;
1241        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1242
1243        self.query(query).await.map(|r| r.execute)
1244    }
1245
1246    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1247        let query = schema::Register::build(RegisterArgs {
1248            id: id.into(),
1249            register: register.into(),
1250        });
1251
1252        Ok(self.query(query).await?.register.0 as Word)
1253    }
1254
1255    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1256        let query = schema::Memory::build(MemoryArgs {
1257            id: id.into(),
1258            start: start.into(),
1259            size: size.into(),
1260        });
1261
1262        let memory = self.query(query).await?.memory;
1263
1264        Ok(serde_json::from_str(memory.as_str())?)
1265    }
1266
1267    pub async fn set_breakpoint(
1268        &self,
1269        session_id: &str,
1270        contract: fuel_types::ContractId,
1271        pc: u64,
1272    ) -> io::Result<()> {
1273        let operation = SetBreakpoint::build(SetBreakpointArgs {
1274            id: Id::new(session_id),
1275            bp: schema::Breakpoint {
1276                contract: contract.into(),
1277                pc: U64(pc),
1278            },
1279        });
1280
1281        let response = self.query(operation).await?;
1282        assert!(
1283            response.set_breakpoint,
1284            "Setting breakpoint returned invalid reply"
1285        );
1286        Ok(())
1287    }
1288
1289    pub async fn set_single_stepping(
1290        &self,
1291        session_id: &str,
1292        enable: bool,
1293    ) -> io::Result<()> {
1294        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1295            id: Id::new(session_id),
1296            enable,
1297        });
1298        self.query(operation).await?;
1299        Ok(())
1300    }
1301
1302    pub async fn start_tx(
1303        &self,
1304        session_id: &str,
1305        tx: &Transaction,
1306    ) -> io::Result<RunResult> {
1307        let operation = StartTx::build(StartTxArgs {
1308            id: Id::new(session_id),
1309            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1310        });
1311        let response = self.query(operation).await?.start_tx;
1312        Ok(response)
1313    }
1314
1315    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1316        let operation = ContinueTx::build(ContinueTxArgs {
1317            id: Id::new(session_id),
1318        });
1319        let response = self.query(operation).await?.continue_tx;
1320        Ok(response)
1321    }
1322
1323    pub async fn transaction(
1324        &self,
1325        id: &TxId,
1326    ) -> io::Result<Option<TransactionResponse>> {
1327        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1328
1329        let transaction = self.query(query).await?.transaction;
1330
1331        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1332    }
1333
1334    /// Get the status of a transaction
1335    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1336        let query =
1337            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1338
1339        let status = self.query(query).await?.transaction.ok_or_else(|| {
1340            io::Error::new(
1341                ErrorKind::NotFound,
1342                format!("status not found for transaction {id} "),
1343            )
1344        })?;
1345
1346        let status = status
1347            .status
1348            .ok_or_else(|| {
1349                io::Error::new(
1350                    ErrorKind::NotFound,
1351                    format!("status not found for transaction {id}"),
1352                )
1353            })?
1354            .try_into()?;
1355        Ok(status)
1356    }
1357
1358    #[tracing::instrument(skip(self), level = "debug")]
1359    #[cfg(feature = "subscriptions")]
1360    /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
1361    pub async fn subscribe_transaction_status(
1362        &self,
1363        id: &TxId,
1364    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1365        self.subscribe_transaction_status_opt(id, None).await
1366    }
1367
1368    #[cfg(feature = "subscriptions")]
1369    /// Subscribe to the status of a transaction
1370    pub async fn subscribe_transaction_status_opt(
1371        &self,
1372        id: &TxId,
1373        include_preconfirmation: Option<bool>,
1374    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1375        use schema::tx::{
1376            StatusChangeSubscription,
1377            StatusChangeSubscriptionArgs,
1378        };
1379        let tx_id: TransactionId = (*id).into();
1380        let variables = StatusChangeSubscriptionArgs {
1381            id: tx_id,
1382            include_preconfirmation,
1383        };
1384
1385        tracing::debug!("subscribing");
1386        let stream = self
1387            .subscribe::<StatusChangeSubscription, StatusChangeSubscriptionArgs>(
1388                variables,
1389            )
1390            .await?
1391            .map(|tx| {
1392                tracing::debug!("received {tx:?}");
1393                let tx = tx?;
1394                let status = tx.status_change.try_into()?;
1395                Ok(status)
1396            });
1397
1398        Ok(stream)
1399    }
1400
1401    #[cfg(feature = "subscriptions")]
1402    /// Awaits for the transaction to be committed into a block
1403    ///
1404    /// This will wait forever if needed, so consider wrapping this call
1405    /// with a `tokio::time::timeout`.
1406    pub async fn await_transaction_commit(
1407        &self,
1408        id: &TxId,
1409    ) -> io::Result<TransactionStatus> {
1410        // skip until we've reached a final status and then stop consuming the stream
1411        // to avoid an EOF which the eventsource client considers as an error.
1412        let status_result = self
1413            .subscribe_transaction_status(id)
1414            .await?
1415            .skip_while(|status| {
1416                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1417            })
1418            .next()
1419            .await;
1420
1421        if let Some(Ok(status)) = status_result {
1422            Ok(status)
1423        } else {
1424            Err(io::Error::other(format!(
1425                "Failed to get status for transaction {status_result:?}"
1426            )))
1427        }
1428    }
1429
1430    /// returns a paginated set of transactions sorted by block height
1431    pub async fn transactions(
1432        &self,
1433        request: PaginationRequest<String>,
1434    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1435        let args = schema::ConnectionArgs::from(request);
1436        let query = schema::tx::TransactionsQuery::build(args);
1437        let transactions = self.query(query).await?.transactions.try_into()?;
1438        Ok(transactions)
1439    }
1440
1441    /// Returns a paginated set of transactions associated with a txo owner address.
1442    pub async fn transactions_by_owner(
1443        &self,
1444        owner: &Address,
1445        request: PaginationRequest<String>,
1446    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1447        let owner: schema::Address = (*owner).into();
1448        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1449        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1450
1451        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1452        Ok(transactions)
1453    }
1454
1455    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1456        let query =
1457            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1458
1459        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1460            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1461        })?;
1462
1463        let receipts = match tx.status {
1464            Some(status) => match status {
1465                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1466                    s.receipts
1467                        .into_iter()
1468                        .map(TryInto::<Receipt>::try_into)
1469                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1470                )
1471                .transpose()?,
1472                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1473                    s.receipts
1474                        .into_iter()
1475                        .map(TryInto::<Receipt>::try_into)
1476                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1477                )
1478                .transpose()?,
1479                _ => None,
1480            },
1481            _ => None,
1482        };
1483
1484        Ok(receipts)
1485    }
1486
1487    #[cfg(feature = "test-helpers")]
1488    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1489        let query = schema::tx::AllReceipts::build(());
1490        let receipts = self.query(query).await?.all_receipts;
1491
1492        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1493            .into_iter()
1494            .map(TryInto::<Receipt>::try_into)
1495            .collect();
1496
1497        Ok(vec?)
1498    }
1499
1500    pub async fn produce_blocks(
1501        &self,
1502        blocks_to_produce: u32,
1503        start_timestamp: Option<u64>,
1504    ) -> io::Result<BlockHeight> {
1505        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1506            blocks_to_produce: blocks_to_produce.into(),
1507            start_timestamp: start_timestamp
1508                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1509        });
1510
1511        let new_height = self.query(query).await?.produce_blocks;
1512
1513        Ok(new_height.into())
1514    }
1515
1516    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1517        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1518            id: Some((*id).into()),
1519        });
1520
1521        let block = self
1522            .query(query)
1523            .await?
1524            .block
1525            .map(TryInto::try_into)
1526            .transpose()?;
1527
1528        Ok(block)
1529    }
1530
1531    pub async fn block_by_height(
1532        &self,
1533        height: BlockHeight,
1534    ) -> io::Result<Option<types::Block>> {
1535        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1536            height: Some(U32(height.into())),
1537        });
1538
1539        let block = self
1540            .query(query)
1541            .await?
1542            .block
1543            .map(TryInto::try_into)
1544            .transpose()?;
1545
1546        Ok(block)
1547    }
1548
1549    pub async fn da_compressed_block(
1550        &self,
1551        height: BlockHeight,
1552    ) -> io::Result<Option<Vec<u8>>> {
1553        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1554            DaCompressedBlockByHeightArgs {
1555                height: U32(height.into()),
1556            },
1557        );
1558
1559        Ok(self
1560            .query(query)
1561            .await?
1562            .da_compressed_block
1563            .map(|b| b.bytes.into()))
1564    }
1565
1566    /// Retrieve a blob by its ID
1567    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1568        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1569        let blob = self.query(query).await?.blob.map(Into::into);
1570        Ok(blob)
1571    }
1572
1573    /// Check whether a blob with ID exists
1574    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1575        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1576        Ok(self.query(query).await?.blob.is_some())
1577    }
1578
1579    /// Retrieve multiple blocks
1580    pub async fn blocks(
1581        &self,
1582        request: PaginationRequest<String>,
1583    ) -> io::Result<PaginatedResult<types::Block, String>> {
1584        let args = schema::ConnectionArgs::from(request);
1585        let query = schema::block::BlocksQuery::build(args);
1586
1587        let blocks = self.query(query).await?.blocks.try_into()?;
1588
1589        Ok(blocks)
1590    }
1591
1592    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1593        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1594            utxo_id: (*id).into(),
1595        });
1596        let coin = self.query(query).await?.coin.map(Into::into);
1597        Ok(coin)
1598    }
1599
1600    /// Retrieve a page of coins by their owner
1601    pub async fn coins(
1602        &self,
1603        owner: &Address,
1604        asset_id: Option<&AssetId>,
1605        request: PaginationRequest<String>,
1606    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1607        let owner: schema::Address = (*owner).into();
1608        let asset_id = asset_id.map(|id| (*id).into());
1609        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1610        let query = schema::coins::CoinsQuery::build(args);
1611
1612        let coins = self.query(query).await?.coins.into();
1613        Ok(coins)
1614    }
1615
1616    /// Retrieve coins to spend in a transaction
1617    pub async fn coins_to_spend(
1618        &self,
1619        owner: &Address,
1620        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1621        // (Utxos, Messages Nonce)
1622        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1623    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1624        let owner: schema::Address = (*owner).into();
1625        let spend_query: Vec<SpendQueryElementInput> = spend_query
1626            .iter()
1627            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1628                Ok(SpendQueryElementInput {
1629                    asset_id: (*asset_id).into(),
1630                    amount: (*amount).into(),
1631                    max: (*max).map(|max| max.into()),
1632                })
1633            })
1634            .try_collect()?;
1635        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1636        let args =
1637            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1638        let query = schema::coins::CoinsToSpendQuery::build(args);
1639
1640        let coins_per_asset = self
1641            .query(query)
1642            .await?
1643            .coins_to_spend
1644            .into_iter()
1645            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1646            .collect::<Vec<_>>();
1647        Ok(coins_per_asset)
1648    }
1649
1650    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1651        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1652            id: (*id).into(),
1653        });
1654        let contract = self.query(query).await?.contract.map(Into::into);
1655        Ok(contract)
1656    }
1657
1658    pub async fn contract_balance(
1659        &self,
1660        id: &ContractId,
1661        asset: Option<&AssetId>,
1662    ) -> io::Result<u64> {
1663        let asset_id: schema::AssetId = match asset {
1664            Some(asset) => (*asset).into(),
1665            None => schema::AssetId::default(),
1666        };
1667
1668        let query =
1669            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1670                id: (*id).into(),
1671                asset: asset_id,
1672            });
1673
1674        let balance: types::ContractBalance =
1675            self.query(query).await?.contract_balance.into();
1676        Ok(balance.amount)
1677    }
1678
1679    pub async fn balance(
1680        &self,
1681        owner: &Address,
1682        asset_id: Option<&AssetId>,
1683    ) -> io::Result<u128> {
1684        let owner: schema::Address = (*owner).into();
1685        let asset_id: schema::AssetId = match asset_id {
1686            Some(asset_id) => (*asset_id).into(),
1687            None => schema::AssetId::default(),
1688        };
1689        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1690        let balance: types::Balance = self.query(query).await?.balance.into();
1691        Ok(balance.amount)
1692    }
1693
1694    // Retrieve a page of balances by their owner
1695    pub async fn balances(
1696        &self,
1697        owner: &Address,
1698        request: PaginationRequest<String>,
1699    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1700        let owner: schema::Address = (*owner).into();
1701        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1702        let query = schema::balance::BalancesQuery::build(args);
1703
1704        let balances = self.query(query).await?.balances.into();
1705        Ok(balances)
1706    }
1707
1708    pub async fn contract_balances(
1709        &self,
1710        contract: &ContractId,
1711        request: PaginationRequest<String>,
1712    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1713        let contract_id: schema::ContractId = (*contract).into();
1714        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1715        let query = schema::contract::ContractBalancesQuery::build(args);
1716
1717        let balances = self.query(query).await?.contract_balances.into();
1718
1719        Ok(balances)
1720    }
1721
1722    // Retrieve a message by its nonce
1723    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1724        let query = schema::message::MessageQuery::build(NonceArgs {
1725            nonce: (*nonce).into(),
1726        });
1727        let message = self.query(query).await?.message.map(Into::into);
1728        Ok(message)
1729    }
1730
1731    pub async fn messages(
1732        &self,
1733        owner: Option<&Address>,
1734        request: PaginationRequest<String>,
1735    ) -> io::Result<PaginatedResult<types::Message, String>> {
1736        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1737        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1738        let query = schema::message::OwnedMessageQuery::build(args);
1739
1740        let messages = self.query(query).await?.messages.into();
1741
1742        Ok(messages)
1743    }
1744
1745    pub async fn contract_info(
1746        &self,
1747        contract: &ContractId,
1748    ) -> io::Result<Option<types::Contract>> {
1749        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1750            id: (*contract).into(),
1751        });
1752        let contract_info = self.query(query).await?.contract.map(Into::into);
1753        Ok(contract_info)
1754    }
1755
1756    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1757        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1758            nonce: (*nonce).into(),
1759        });
1760        let status = self.query(query).await?.message_status.into();
1761
1762        Ok(status)
1763    }
1764
1765    /// Request a merkle proof of an output message.
1766    pub async fn message_proof(
1767        &self,
1768        transaction_id: &TxId,
1769        nonce: &Nonce,
1770        commit_block_id: Option<&BlockId>,
1771        commit_block_height: Option<BlockHeight>,
1772    ) -> io::Result<types::MessageProof> {
1773        let transaction_id: TransactionId = (*transaction_id).into();
1774        let nonce: schema::Nonce = (*nonce).into();
1775        let commit_block_id: Option<schema::BlockId> =
1776            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1777        let commit_block_height = commit_block_height.map(Into::into);
1778        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1779            transaction_id,
1780            nonce,
1781            commit_block_id,
1782            commit_block_height,
1783        });
1784        let proof = self.query(query).await?.message_proof.try_into()?;
1785        Ok(proof)
1786    }
1787
1788    pub async fn relayed_transaction_status(
1789        &self,
1790        id: &Bytes32,
1791    ) -> io::Result<Option<RelayedTransactionStatus>> {
1792        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1793            RelayedTransactionStatusArgs {
1794                id: id.to_owned().into(),
1795            },
1796        );
1797        let status = self
1798            .query(query)
1799            .await?
1800            .relayed_transaction_status
1801            .map(|status| status.try_into())
1802            .transpose()?;
1803        Ok(status)
1804    }
1805
1806    pub async fn asset_info(
1807        &self,
1808        asset_id: &AssetId,
1809    ) -> io::Result<Option<AssetDetail>> {
1810        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1811            id: (*asset_id).into(),
1812        });
1813        let asset_info = self.query(query).await?.asset_details.map(Into::into);
1814        Ok(asset_info)
1815    }
1816}
1817
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Fetches the transaction with the given `id` and returns the `transaction`
    /// field of the converted [`TransactionResponse`].
    ///
    /// Returns `Ok(None)` when the response carries no transaction.
    ///
    /// # Errors
    /// Fails if the query fails or if the returned transaction cannot be converted
    /// into a [`TransactionResponse`].
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let args = TxIdArgs { id: (*id).into() };
        let query = schema::tx::TransactionQuery::build(args);

        let Some(tx) = self.query(query).await?.transaction else {
            return Ok(None);
        };

        // Conversion failures go through `ConversionError` first and only then into
        // `io::Error`, matching the error chain callers already observe.
        let response: TransactionResponse =
            tx.try_into().map_err(ConversionError::from)?;
        Ok(Some(response.transaction))
    }
}
1836
1837#[cfg(feature = "rpc")]
1838impl FuelClient {
1839    fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
1840        self.rpc_client
1841            .clone()
1842            .ok_or(io::Error::other("RPC client not initialized"))
1843    }
1844
1845    pub async fn get_block_range(
1846        &self,
1847        start: BlockHeight,
1848        end: BlockHeight,
1849    ) -> io::Result<
1850        impl Stream<
1851            Item = io::Result<(
1852                fuel_core_types::blockchain::block::Block,
1853                Vec<Vec<Receipt>>,
1854            )>,
1855        >,
1856    > {
1857        let request = ProtoBlockRangeRequest {
1858            start: *start,
1859            end: *end,
1860        };
1861
1862        let stream = self
1863            .rpc_client()?
1864            .get_block_range(request)
1865            .await
1866            .map_err(io::Error::other)?
1867            .into_inner()
1868            .then(|res| {
1869                let maybe_aws_client = self.aws_client.clone();
1870                let http_client = self.http_client.clone();
1871                async move {
1872                    let maybe_aws_client = maybe_aws_client.clone();
1873                    let resp =
1874                        res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1875                    Self::convert_block_response(resp, maybe_aws_client, http_client)
1876                        .await
1877                }
1878            });
1879        Ok(stream)
1880    }
1881
1882    async fn convert_block_response(
1883        resp: BlockResponse,
1884        s3_client: AWSClientManager,
1885        http_client: reqwest::Client,
1886    ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
1887        let payload = resp
1888            .payload
1889            .ok_or(io::Error::other("No RPC payload for `BlockResponse`"))?;
1890        match payload {
1891            Payload::Literal(_) => {
1892                // Should never happen, as we don't return blocks as literal payloads
1893                Err(io::Error::other("Literal payloads are not supported yet"))
1894            }
1895            Payload::Bytes(bytes) => {
1896                let proto_block =
1897                    ProtoBlock::decode(bytes.as_slice()).map_err(io::Error::other)?;
1898                fuel_block_from_protobuf(proto_block).map_err(|e| {
1899                    io::Error::other(format!(
1900                        "Failed to convert RPC block to internal block: {e:?}"
1901                    ))
1902                })
1903            }
1904            Payload::Remote(remote) => {
1905                let RemoteBlockResponse { location } = remote;
1906                match location {
1907                    Some(Location::S3(s3)) => {
1908                        let RemoteS3Bucket {
1909                            bucket,
1910                            key,
1911                            endpoint,
1912                            requester_pays,
1913                        } = s3;
1914                        let (body, content_encoding) = Self::get_block_from_s3_bucket(
1915                            s3_client,
1916                            &endpoint,
1917                            &bucket,
1918                            &key,
1919                            requester_pays,
1920                        )
1921                        .await?;
1922
1923                        let block_bytes = Self::decode_body_by_content_encoding(
1924                            body.as_ref(),
1925                            content_encoding.as_deref(),
1926                        )?;
1927                        Self::decode_remote_protobuf_block(&block_bytes)
1928                    }
1929                    Some(Location::Http(http)) => {
1930                        let (body, content_encoding) =
1931                            Self::get_block_from_http(&http_client, &http).await?;
1932                        let block_bytes = Self::decode_body_by_content_encoding(
1933                            body.as_ref(),
1934                            content_encoding.as_deref(),
1935                        )?;
1936                        Self::decode_remote_protobuf_block(&block_bytes)
1937                    }
1938                    None => {
1939                        Err(io::Error::other("Remote block response missing location"))
1940                    }
1941                }
1942            }
1943        }
1944    }
1945
1946    /// Decodes the object body using a `Content-Encoding` value from HTTP headers or S3 object
1947    /// metadata. Missing or empty means identity (raw protobuf bytes). Codings are listed in
1948    /// application order; this reverses to decode from the outermost coding inward (RFC 9110).
1949    fn decode_body_by_content_encoding(
1950        body: &[u8],
1951        content_encoding: Option<&str>,
1952    ) -> io::Result<Vec<u8>> {
1953        let Some(header) = content_encoding.map(str::trim).filter(|s| !s.is_empty())
1954        else {
1955            return Ok(body.to_vec());
1956        };
1957        let tokens: Vec<&str> = header
1958            .split(',')
1959            .map(str::trim)
1960            .filter(|t| !t.is_empty())
1961            .collect();
1962        if tokens.is_empty() {
1963            return Ok(body.to_vec());
1964        }
1965        let mut data = body.to_vec();
1966        for &token in tokens.iter().rev() {
1967            let lower = token.trim().to_ascii_lowercase();
1968            if lower.is_empty() || lower == "identity" {
1969                continue;
1970            }
1971            data = Self::decode_one_content_encoding_layer(&data, &lower)?;
1972        }
1973        Ok(data)
1974    }
1975
1976    fn decode_one_content_encoding_layer(
1977        data: &[u8],
1978        token_lower: &str,
1979    ) -> io::Result<Vec<u8>> {
1980        match token_lower {
1981            "gzip" | "x-gzip" => Self::gunzip_remote_block_bytes(data),
1982            "deflate" => Self::inflate_deflate_remote_block_bytes(data),
1983            _ => Err(io::Error::other(format!(
1984                "unsupported Content-Encoding: {token_lower}"
1985            ))),
1986        }
1987    }
1988
1989    fn gunzip_remote_block_bytes(data: &[u8]) -> io::Result<Vec<u8>> {
1990        let mut decoder = GzDecoder::new(data);
1991        let mut output = Vec::new();
1992        decoder
1993            .read_to_end(&mut output)
1994            .map_err(|e| io::Error::other(e.to_string()))?;
1995        Ok(output)
1996    }
1997
1998    /// Decodes a `Content-Encoding: deflate` body. RFC 9110 specifies zlib-wrapped data (RFC 1950)
1999    /// for this coding, but in practice most CDNs and HTTP servers (nginx, Apache, CloudFront)
2000    /// emit raw deflate (RFC 1951) instead. Mirror mainstream browser/HTTP-client behavior: try
2001    /// the spec-compliant zlib format first, then fall back to raw deflate so we interoperate
2002    /// with either variant when a CDN fronts the block object store.
2003    fn inflate_deflate_remote_block_bytes(data: &[u8]) -> io::Result<Vec<u8>> {
2004        let mut output = Vec::new();
2005        let mut zlib = ZlibDecoder::new(Cursor::new(data));
2006        if zlib.read_to_end(&mut output).is_ok() {
2007            return Ok(output);
2008        }
2009
2010        output.clear();
2011        let mut raw = DeflateDecoder::new(data);
2012        raw.read_to_end(&mut output)
2013            .map_err(|e| io::Error::other(e.to_string()))?;
2014        Ok(output)
2015    }
2016
2017    fn decode_remote_protobuf_block(
2018        block_bytes: &[u8],
2019    ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
2020        let block = ProtoBlock::decode(block_bytes)
2021            .map_err(|e| io::Error::other(format!("Failed to decode block: {e}")))?;
2022        fuel_block_from_protobuf(block).map_err(|e| {
2023            io::Error::other(format!(
2024                "Failed to convert RPC block to internal block: {e:?}"
2025            ))
2026        })
2027    }
2028
2029    async fn get_block_from_http(
2030        client: &reqwest::Client,
2031        http: &RemoteHttpEndpoint,
2032    ) -> io::Result<(prost::bytes::Bytes, Option<String>)> {
2033        let mut req = client.get(http.endpoint.clone());
2034        for (k, v) in &http.headers {
2035            req = req.header(k, v);
2036        }
2037        let res = req.send().await.map_err(|e| {
2038            io::Error::other(format!("HTTP fetch for block object failed: {e}"))
2039        })?;
2040        if !res.status().is_success() {
2041            return Err(io::Error::other(format!(
2042                "HTTP {} when fetching block object",
2043                res.status()
2044            )));
2045        }
2046        let content_encoding = res
2047            .headers()
2048            .get(reqwest::header::CONTENT_ENCODING)
2049            .and_then(|v| v.to_str().ok())
2050            .map(std::string::ToString::to_string);
2051        let bytes = res
2052            .bytes()
2053            .await
2054            .map_err(|e| io::Error::other(format!("Reading block object body: {e}")))?;
2055        Ok((bytes, content_encoding))
2056    }
2057
2058    async fn get_block_from_s3_bucket(
2059        s3_client: AWSClientManager,
2060        url: &Option<String>,
2061        bucket: &str,
2062        key: &str,
2063        requester_pays: bool,
2064    ) -> io::Result<(prost::bytes::Bytes, Option<String>)> {
2065        use aws_sdk_s3::types::RequestPayer;
2066        tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
2067        let mut req = s3_client
2068            .get_client(url)
2069            .await?
2070            .get_object()
2071            .bucket(bucket)
2072            .key(key);
2073        if requester_pays {
2074            req = req.request_payer(RequestPayer::Requester);
2075        }
2076        let obj = req.send().await.map_err(|e| {
2077            io::Error::other(format!("Failed to get object from S3: {e:?}"))
2078        })?;
2079        let content_encoding =
2080            obj.content_encoding().map(std::string::ToString::to_string);
2081        let bytes = obj
2082            .body
2083            .collect()
2084            .await
2085            .map_err(|e| {
2086                io::Error::other(format!("Failed to get object from S3: {e:?}"))
2087            })?
2088            .into_bytes();
2089        Ok((bytes, content_encoding))
2090    }
2091
2092    async fn new_aws_client(url: &Option<String>) -> AWSClient {
2093        let credentials = DefaultCredentialsChain::builder().build().await;
2094        let mut config_builder = aws_config::defaults(BehaviorVersion::latest())
2095            .credentials_provider(credentials);
2096        if let Some(url) = url {
2097            config_builder = config_builder.endpoint_url(url);
2098        }
2099        let sdk_config = config_builder.load().await;
2100        let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
2101        let config = builder.force_path_style(true).build();
2102        AWSClient::from_conf(config)
2103    }
2104
2105    /// Used to get the synced height of the block aggregator,
2106    /// as it doesn't always match the latest block height
2107    pub async fn get_aggregated_height(&self) -> io::Result<BlockHeight> {
2108        let request = ProtoBlockHeightRequest {};
2109        let height = self
2110            .rpc_client()?
2111            .get_synced_block_height(request)
2112            .await
2113            .map_err(io::Error::other)?
2114            .into_inner()
2115            .height
2116            .ok_or(io::Error::other("No height in RPC response"))?;
2117        Ok(BlockHeight::from(height))
2118    }
2119
2120    pub async fn new_block_subscription(
2121        &self,
2122    ) -> io::Result<
2123        impl Stream<
2124            Item = io::Result<(
2125                fuel_core_types::blockchain::block::Block,
2126                Vec<Vec<Receipt>>,
2127            )>,
2128        >,
2129    > {
2130        let request = ProtoNewBlockSubscriptionRequest {};
2131        let stream = self
2132            .rpc_client()?
2133            .new_block_subscription(request)
2134            .await
2135            .map_err(io::Error::other)?
2136            .into_inner()
2137            .then(|res| {
2138                let maybe_aws_client = self.aws_client.clone();
2139                let http_client = self.http_client.clone();
2140                async move {
2141                    let maybe_aws_client = maybe_aws_client.clone();
2142                    let resp =
2143                        res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
2144                    Self::convert_block_response(resp, maybe_aws_client, http_client)
2145                        .await
2146                }
2147            });
2148        Ok(stream)
2149    }
2150}
2151
#[cfg(test)]
mod tests {
    use super::*;

    /// The default URL is the first given URL with the GraphQL path appended.
    #[test]
    fn with_urls_normalizes_urls_to_graphql_endpoint() {
        // Given
        let urls = &["http://localhost:8080", "http://example.com:4000"];
        let expected = "http://localhost:8080/v1/graphql";

        // When
        let client = FuelClient::with_urls(urls).expect("should create client");
        let default_url = client.get_default_url();

        // Then
        assert_eq!(default_url.as_str(), expected);
    }

    /// A URL given without a scheme ends up with `http://` in front.
    #[test]
    fn with_urls_adds_http_scheme_if_missing() {
        // Given
        let urls = &["localhost:8080"];
        let expected = "http://localhost:8080/v1/graphql";

        // When
        let client = FuelClient::with_urls(urls).expect("should create client");
        let default_url = client.get_default_url();

        // Then
        assert_eq!(default_url.as_str(), expected);
    }

    /// A path already present on the URL is replaced by `/v1/graphql`.
    #[test]
    fn with_urls_overwrites_existing_path() {
        // Given - URLs that already have some path
        let urls = &["http://localhost:8080/some/path", "http://example.com/api"];
        let expected = "http://localhost:8080/v1/graphql";

        // When
        let client = FuelClient::with_urls(urls).expect("should create client");
        let default_url = client.get_default_url();

        // Then - path should be normalized to /v1/graphql
        assert_eq!(default_url.as_str(), expected);
    }

    /// `new` and `with_urls` agree on the default URL for the same input.
    #[test]
    fn new_and_with_urls_produce_same_url() {
        // Given
        let url = "http://localhost:8080";

        // When
        let via_new = FuelClient::new(url).expect("should create client via new");
        let via_with_urls =
            FuelClient::with_urls(&[url]).expect("should create client via with_urls");

        // Then
        let url_from_new = via_new.get_default_url();
        let url_from_with_urls = via_with_urls.get_default_url();
        assert_eq!(url_from_new.as_str(), url_from_with_urls.as_str());
    }

    /// Versions below 0.48.0 are legacy; 0.48.0 and above are not.
    #[test]
    fn is_legacy_node_version_detection() {
        // Pre-0.48.0 nodes require the legacy query path.
        for version in ["0.47.3", "0.47.0", "0.1.0", "0.0.1"] {
            assert!(is_legacy_node(version));
        }

        // 0.48.0 and above use the standard query path.
        for version in ["0.48.0", "0.49.0", "0.100.0", "1.0.0", "2.47.0"] {
            assert!(!is_legacy_node(version));
        }
    }
}