fuel_core_client/
client.rs

1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4    client::{
5        schema::{
6            Tai64Timestamp,
7            TransactionId,
8            block::BlockByHeightArgs,
9            coins::{
10                ExcludeInput,
11                SpendQueryElementInput,
12            },
13            contract::ContractBalanceQueryArgs,
14            gas_price::EstimateGasPrice,
15            message::MessageStatusArgs,
16            relayed_tx::RelayedTransactionStatusArgs,
17            tx::{
18                DryRunArg,
19                TxWithEstimatedPredicatesArg,
20            },
21        },
22        types::{
23            RelayedTransactionStatus,
24            asset::AssetDetail,
25            gas_price::LatestGasPrice,
26            message::MessageStatus,
27            primitives::{
28                Address,
29                AssetId,
30                BlockId,
31                ContractId,
32                UtxoId,
33            },
34            upgrades::StateTransitionBytecode,
35        },
36    },
37    reqwest_ext::{
38        FuelGraphQlResponse,
39        FuelOperation,
40        ReqwestExt,
41    },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46    BASE64_STANDARD,
47    Engine as _,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52    Id,
53    MutationBuilder,
54    Operation,
55    QueryBuilder,
56};
57use fuel_core_types::{
58    blockchain::header::{
59        ConsensusParametersVersion,
60        StateTransitionBytecodeVersion,
61    },
62    fuel_asm::{
63        Instruction,
64        Word,
65    },
66    fuel_tx::{
67        BlobId,
68        Bytes32,
69        ConsensusParameters,
70        Receipt,
71        Transaction,
72        TxId,
73    },
74    fuel_types::{
75        self,
76        BlockHeight,
77        Nonce,
78        canonical::Serialize,
79    },
80    services::executor::{
81        StorageReadReplayEvent,
82        TransactionExecutionStatus,
83    },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87    Stream,
88    StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92    PageDirection,
93    PaginatedResult,
94    PaginationRequest,
95};
96use schema::{
97    Bytes,
98    ContinueTx,
99    ContinueTxArgs,
100    ConversionError,
101    HexString,
102    IdArg,
103    MemoryArgs,
104    RegisterArgs,
105    RunResult,
106    SetBreakpoint,
107    SetBreakpointArgs,
108    SetSingleStepping,
109    SetSingleSteppingArgs,
110    StartTx,
111    StartTxArgs,
112    U32,
113    U64,
114    assets::AssetInfoArg,
115    balance::BalanceArgs,
116    blob::BlobByIdArgs,
117    block::BlockByIdArgs,
118    coins::{
119        CoinByIdArgs,
120        CoinsConnectionArgs,
121    },
122    contract::{
123        ContractBalancesConnectionArgs,
124        ContractByIdArgs,
125    },
126    da_compressed::DaCompressedBlockByHeightArgs,
127    gas_price::BlockHorizonArgs,
128    storage_read_replay::{
129        StorageReadReplay,
130        StorageReadReplayArgs,
131    },
132    tx::{
133        AssembleTxArg,
134        TransactionsByOwnerConnectionArgs,
135        TxArg,
136        TxIdArgs,
137    },
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142    convert::TryInto,
143    io::{
144        self,
145        ErrorKind,
146    },
147    net,
148    str::{
149        self,
150        FromStr,
151    },
152    sync::{
153        Arc,
154        Mutex,
155    },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160    TransactionResponse,
161    TransactionStatus,
162    assemble_tx::{
163        AssembleTransactionResult,
164        RequiredBalance,
165    },
166};
167
168use self::schema::{
169    block::ProduceBlockArgs,
170    message::{
171        MessageProofArgs,
172        NonceArgs,
173    },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
// NOTE(review): presumably the numeric index of a VM register used by the
// register-related queries of this client; usages are outside this excerpt — confirm.
type RegisterId = u32;
181
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the [`FuelClient`].
///
/// The enum is `#[non_exhaustive]`, so new variants can be added without
/// breaking downstream `match` expressions.
// `anyhow::Error` is wrapped inside a custom `Error` type,
// so that we can add specific error variants in the future.
pub enum Error {
    /// Unknown or unexpected (by architecture) error.
    #[from]
    Other(anyhow::Error),
}
192
/// Consistency policy for the [`FuelClient`] to define the strategy
/// for the required height feature.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// Automatically fetch the next block height from the response and
    /// use it as an input to the next query to guarantee consistency
    /// of the results for the queries.
    Auto {
        /// The required block height for the queries.
        ///
        /// Kept behind a mutex so that it can be updated through a shared
        /// reference to the client after each response.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The user manually sets the block height for all queries
    /// via the [`FuelClient::with_required_fuel_block_height`].
    Manual {
        /// The required block height for the queries.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213    fn clone(&self) -> Self {
214        match self {
215            Self::Auto { height } => Self::Auto {
216                // We don't want to share the same mutex between the different
217                // instances of the `FuelClient`.
218                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219            },
220            Self::Manual { height } => Self::Manual { height: *height },
221        }
222    }
223}
224
/// Chain version information remembered by the client.
///
/// Both fields start out as `None` (via `Default`) and are wrapped in a mutex
/// so they can be written through a shared reference.
#[derive(Debug, Default)]
struct ChainStateInfo {
    // Last known state transition bytecode version, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    // Last known consensus parameters version, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232    fn clone(&self) -> Self {
233        Self {
234            current_stf_version: Arc::new(Mutex::new(
235                self.current_stf_version.lock().ok().and_then(|v| *v),
236            )),
237            current_consensus_parameters_version: Arc::new(Mutex::new(
238                self.current_consensus_parameters_version
239                    .lock()
240                    .ok()
241                    .and_then(|v| *v),
242            )),
243        }
244    }
245}
246
/// Client for talking to a fuel-core node over its GraphQL HTTP endpoint.
///
/// Cloning the client clones the consistency policy and chain state info by
/// value (see their `Clone` implementations), so clones do not share them.
#[derive(Debug, Clone)]
pub struct FuelClient {
    // HTTP client used to send the GraphQL requests.
    client: reqwest::Client,
    // Cookie jar registered as the cookie provider of `client`; only present
    // when the `subscriptions` feature is enabled.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    // URL of the GraphQL endpoint the requests are posted to.
    url: reqwest::Url,
    // Strategy used to pick the required block height sent with each request.
    require_height: ConsistencyPolicy,
    // Chain versions remembered from previous responses.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258    type Err = anyhow::Error;
259
260    fn from_str(str: &str) -> Result<Self, Self::Err> {
261        let mut raw_url = str.to_string();
262        if !raw_url.starts_with("http") {
263            raw_url = format!("http://{raw_url}");
264        }
265
266        let mut url = reqwest::Url::parse(&raw_url)
267            .map_err(anyhow::Error::msg)
268            .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269        url.set_path("/v1/graphql");
270
271        #[cfg(feature = "subscriptions")]
272        {
273            let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274            let client = reqwest::Client::builder()
275                .cookie_provider(cookie.clone())
276                .build()
277                .map_err(anyhow::Error::msg)?;
278            Ok(Self {
279                client,
280                cookie,
281                url,
282                require_height: ConsistencyPolicy::Auto {
283                    height: Arc::new(Mutex::new(None)),
284                },
285                chain_state_info: Default::default(),
286            })
287        }
288
289        #[cfg(not(feature = "subscriptions"))]
290        {
291            let client = reqwest::Client::new();
292            Ok(Self {
293                client,
294                url,
295                require_height: ConsistencyPolicy::Auto {
296                    height: Arc::new(Mutex::new(None)),
297                },
298                chain_state_info: Default::default(),
299            })
300        }
301    }
302}
303
304impl<S> From<S> for FuelClient
305where
306    S: Into<net::SocketAddr>,
307{
308    fn from(socket: S) -> Self {
309        format!("http://{}", socket.into())
310            .as_str()
311            .parse()
312            .unwrap()
313    }
314}
315
/// Collapses a list of error messages into a single [`io::Error`].
///
/// The resulting message starts with `Response errors` and appends every
/// entry of `errors`, each preceded by `"; "`.
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::other(message)
}
326
327impl FuelClient {
328    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329        Self::from_str(url.as_ref())
330    }
331
332    pub fn with_required_fuel_block_height(
333        &mut self,
334        new_height: Option<BlockHeight>,
335    ) -> &mut Self {
336        match &mut self.require_height {
337            ConsistencyPolicy::Auto { height } => {
338                *height.lock().expect("Mutex poisoned") = new_height;
339            }
340            ConsistencyPolicy::Manual { height } => {
341                *height = new_height;
342            }
343        }
344        self
345    }
346
347    pub fn use_manual_consistency_policy(
348        &mut self,
349        height: Option<BlockHeight>,
350    ) -> &mut Self {
351        self.require_height = ConsistencyPolicy::Manual { height };
352        self
353    }
354
355    pub fn required_block_height(&self) -> Option<BlockHeight> {
356        match &self.require_height {
357            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358            ConsistencyPolicy::Manual { height } => *height,
359        }
360    }
361
362    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363        if let Some(current_sft_version) = response
364            .extensions
365            .as_ref()
366            .and_then(|e| e.current_stf_version)
367            && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
368        {
369            *c = Some(current_sft_version);
370        }
371
372        if let Some(current_consensus_parameters_version) = response
373            .extensions
374            .as_ref()
375            .and_then(|e| e.current_consensus_parameters_version)
376            && let Ok(mut c) = self
377                .chain_state_info
378                .current_consensus_parameters_version
379                .lock()
380        {
381            *c = Some(current_consensus_parameters_version);
382        }
383
384        let inner_required_height = match &self.require_height {
385            ConsistencyPolicy::Auto { height } => Some(height.clone()),
386            ConsistencyPolicy::Manual { .. } => None,
387        };
388
389        if let Some(inner_required_height) = inner_required_height
390            && let Some(current_fuel_block_height) = response
391                .extensions
392                .as_ref()
393                .and_then(|e| e.current_fuel_block_height)
394        {
395            let mut lock = inner_required_height.lock().expect("Mutex poisoned");
396
397            if current_fuel_block_height >= lock.unwrap_or_default() {
398                *lock = Some(current_fuel_block_height);
399            }
400        }
401    }
402
403    /// Send the GraphQL query to the client.
404    pub async fn query<ResponseData, Vars>(
405        &self,
406        q: Operation<ResponseData, Vars>,
407    ) -> io::Result<ResponseData>
408    where
409        Vars: serde::Serialize,
410        ResponseData: serde::de::DeserializeOwned + 'static,
411    {
412        let required_fuel_block_height = self.required_block_height();
413        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
414        let response = self
415            .client
416            .post(self.url.clone())
417            .run_fuel_graphql(fuel_operation)
418            .await
419            .map_err(io::Error::other)?;
420
421        self.decode_response(response)
422    }
423
424    fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
425    where
426        R: serde::de::DeserializeOwned + 'static,
427    {
428        self.update_chain_state_info(&response);
429
430        if response
431            .extensions
432            .as_ref()
433            .and_then(|e| e.fuel_block_height_precondition_failed)
434            == Some(true)
435        {
436            return Err(io::Error::other("The required block height was not met"));
437        }
438
439        let response = response.response;
440
441        match (response.data, response.errors) {
442            (Some(d), _) => Ok(d),
443            (_, Some(e)) => Err(from_strings_errors_to_std_error(
444                e.into_iter().map(|e| e.message).collect(),
445            )),
446            _ => Err(io::Error::other("Invalid response")),
447        }
448    }
449
    /// Opens a server-sent-events subscription for the streaming operation `q`.
    ///
    /// The operation (together with the currently required block height) is
    /// serialized to JSON and sent as a `POST` body to the `/v1/graphql-sub`
    /// path of the client's URL. Credentials embedded in the URL and cookies
    /// from the client's cookie jar are forwarded as headers.
    ///
    /// The function waits for the first item of the event stream before
    /// returning:
    /// - a "connected" event: the remaining stream is returned;
    /// - a data event: the returned stream yields that data first, followed
    ///   by the remaining stream;
    /// - an error: the error is returned;
    /// - end of stream: an error is returned.
    ///
    /// Each data event is decoded with `decode_response`; a data event whose
    /// raw payload equals the previously accepted payload is dropped.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static + Send,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions use a dedicated path on the same host as the queries.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| io::Error::other(format!("Failed to start client {e:?}")))?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::other(format!("Failed to add header to client {e:?}"))
            })?;
        // Credentials embedded in the URL are sent as a `Basic` authorization
        // header; this only happens when a password is present.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::other(format!("Failed to add header to client {e:?}"))
                })?;
        }

        // Forward the cookies stored in the jar for the client's URL
        // (note: looked up with `self.url`, not the subscription `url`).
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::other(format!("Unable convert header value to string {e:?}"))
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::other(format!(
                        "Failed to add header from `reqwest` to client {e:?}"
                    ))
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the last successfully decoded data event; used to
        // drop consecutive duplicates.
        let mut last = None;

        // Items produced by the pre-processed event stream.
        enum Event<ResponseData> {
            Connected,
            ResponseData(ResponseData),
        }

        let mut init_stream = es::Client::stream(&client)
            // An `Eof` error ends the stream instead of being surfaced.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and
                                        // hands back the previous one for the
                                        // comparison below.
                                        match last.replace(data) {
                                            // Remove duplicates
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(Event::ResponseData(resp))),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::other(format!(
                                        "Decode error: {e:?}"
                                    )))),
                                }
                            }
                            Err(e) => {
                                Some(Err(io::Error::other(format!("Json error: {e:?}"))))
                            }
                        }
                    }
                    Ok(es::SSE::Connected(_)) => Some(Ok(Event::Connected)),
                    // Any other kind of SSE item is ignored.
                    Ok(_) => None,
                    Err(e) => {
                        Some(Err(io::Error::other(format!("Graphql error: {e:?}"))))
                    }
                };
                futures::future::ready(r)
            });

        // Wait for the first item so that connection problems are reported
        // by this function rather than by the returned stream.
        let event = init_stream.next().await;
        // The stream handed to the caller only carries data and errors.
        let stream_with_resp = init_stream.filter_map(|result| async move {
            match result {
                Ok(Event::Connected) => None,
                Ok(Event::ResponseData(resp)) => Some(Ok(resp)),
                Err(error) => Some(Err(error)),
            }
        });

        let stream = match event {
            Some(Ok(Event::Connected)) => {
                tracing::debug!("Subscription connected");
                stream_with_resp.boxed()
            }
            Some(Ok(Event::ResponseData(resp))) => {
                tracing::debug!("Subscription returned response");
                // The first item was already consumed above, so it is put
                // back in front of the remaining stream.
                let joined_stream = futures::stream::once(async move { Ok(resp) })
                    .chain(stream_with_resp);
                joined_stream.boxed()
            }
            Some(Err(e)) => return Err(e),
            None => {
                return Err(io::Error::other("Subscription stream ended unexpectedly"));
            }
        };

        Ok(stream)
    }
591
592    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
593        self.chain_state_info
594            .current_stf_version
595            .lock()
596            .ok()
597            .and_then(|value| *value)
598    }
599
600    pub fn latest_consensus_parameters_version(
601        &self,
602    ) -> Option<ConsensusParametersVersion> {
603        self.chain_state_info
604            .current_consensus_parameters_version
605            .lock()
606            .ok()
607            .and_then(|value| *value)
608    }
609
610    pub async fn health(&self) -> io::Result<bool> {
611        let query = schema::Health::build(());
612        self.query(query).await.map(|r| r.health)
613    }
614
615    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
616        let query = schema::node_info::QueryNodeInfo::build(());
617        self.query(query).await.map(|r| r.node_info.into())
618    }
619
620    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
621        let query = schema::gas_price::QueryLatestGasPrice::build(());
622        self.query(query).await.map(|r| r.latest_gas_price.into())
623    }
624
625    pub async fn estimate_gas_price(
626        &self,
627        block_horizon: u32,
628    ) -> io::Result<EstimateGasPrice> {
629        let args = BlockHorizonArgs {
630            block_horizon: Some(block_horizon.into()),
631        };
632        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
633        self.query(query).await.map(|r| r.estimate_gas_price)
634    }
635
636    #[cfg(feature = "std")]
637    pub async fn connected_peers_info(
638        &self,
639    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
640        let query = schema::node_info::QueryPeersInfo::build(());
641        self.query(query)
642            .await
643            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
644    }
645
646    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
647        let query = schema::chain::ChainQuery::build(());
648        self.query(query).await.and_then(|r| {
649            let result = r.chain.try_into()?;
650            Ok(result)
651        })
652    }
653
654    pub async fn consensus_parameters(
655        &self,
656        version: i32,
657    ) -> io::Result<Option<ConsensusParameters>> {
658        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
659        let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
660
661        let result = self
662            .query(query)
663            .await?
664            .consensus_parameters
665            .map(TryInto::try_into)
666            .transpose()?;
667
668        Ok(result)
669    }
670
671    pub async fn state_transition_byte_code_by_version(
672        &self,
673        version: i32,
674    ) -> io::Result<Option<StateTransitionBytecode>> {
675        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
676        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
677
678        let result = self
679            .query(query)
680            .await?
681            .state_transition_bytecode_by_version
682            .map(TryInto::try_into)
683            .transpose()?;
684
685        Ok(result)
686    }
687
688    pub async fn state_transition_byte_code_by_root(
689        &self,
690        root: Bytes32,
691    ) -> io::Result<Option<StateTransitionBytecode>> {
692        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
693            root: HexString(Bytes(root.to_vec())),
694        };
695        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
696
697        let result = self
698            .query(query)
699            .await?
700            .state_transition_bytecode_by_root
701            .map(TryInto::try_into)
702            .transpose()?;
703
704        Ok(result)
705    }
706
707    /// Default dry run, matching the exact configuration as the node
708    pub async fn dry_run(
709        &self,
710        txs: &[Transaction],
711    ) -> io::Result<Vec<TransactionExecutionStatus>> {
712        self.dry_run_opt(txs, None, None, None).await
713    }
714
715    /// Dry run with options to override the node behavior
716    pub async fn dry_run_opt(
717        &self,
718        txs: &[Transaction],
719        // Disable utxo input checks (exists, unspent, and valid signature)
720        utxo_validation: Option<bool>,
721        gas_price: Option<u64>,
722        at_height: Option<BlockHeight>,
723    ) -> io::Result<Vec<TransactionExecutionStatus>> {
724        let txs = txs
725            .iter()
726            .map(|tx| HexString(Bytes(tx.to_bytes())))
727            .collect::<Vec<HexString>>();
728        let query: Operation<schema::tx::DryRun, DryRunArg> =
729            schema::tx::DryRun::build(DryRunArg {
730                txs,
731                utxo_validation,
732                gas_price: gas_price.map(|gp| gp.into()),
733                block_height: at_height.map(|bh| bh.into()),
734            });
735        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
736        tx_statuses
737            .into_iter()
738            .map(|tx_status| tx_status.try_into().map_err(Into::into))
739            .collect()
740    }
741
742    /// Like `dry_run_opt`, but also returns the storage reads
743    pub async fn dry_run_opt_record_storage_reads(
744        &self,
745        txs: &[Transaction],
746        // Disable utxo input checks (exists, unspent, and valid signature)
747        utxo_validation: Option<bool>,
748        gas_price: Option<u64>,
749        at_height: Option<BlockHeight>,
750    ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
751        let txs = txs
752            .iter()
753            .map(|tx| HexString(Bytes(tx.to_bytes())))
754            .collect::<Vec<HexString>>();
755        let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
756            schema::tx::DryRunRecordStorageReads::build(DryRunArg {
757                txs,
758                utxo_validation,
759                gas_price: gas_price.map(|gp| gp.into()),
760                block_height: at_height.map(|bh| bh.into()),
761            });
762        let result = self
763            .query(query)
764            .await
765            .map(|r| r.dry_run_record_storage_reads)?;
766        let tx_statuses = result
767            .tx_statuses
768            .into_iter()
769            .map(|tx_status| tx_status.try_into().map_err(Into::into))
770            .collect::<io::Result<Vec<_>>>()?;
771        let storage_reads = result
772            .storage_reads
773            .into_iter()
774            .map(Into::into)
775            .collect::<Vec<_>>();
776        Ok((tx_statuses, storage_reads))
777    }
778
779    /// Get storage read replay for a block
780    pub async fn storage_read_replay(
781        &self,
782        height: &BlockHeight,
783    ) -> io::Result<Vec<StorageReadReplayEvent>> {
784        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
785            StorageReadReplay::build(StorageReadReplayArgs {
786                height: (*height).into(),
787            });
788        Ok(self
789            .query(query)
790            .await
791            .map(|r| r.storage_read_replay)?
792            .into_iter()
793            .map(Into::into)
794            .collect())
795    }
796
797    /// Assembles the transaction based on the provided requirements.
798    /// The return transaction contains:
799    /// - Input coins to cover `required_balances`
800    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
801    /// - `Change` or `Destroy` outputs for all assets from the inputs
802    /// - `Variable` outputs in the case they are required during the execution
803    /// - `Contract` inputs and outputs in the case they are required during the execution
804    /// - Reserved witness slots for signed coins filled with `64` zeroes
805    /// - Set script gas limit(unless `script` is empty)
806    /// - Estimated predicates, if `estimate_predicates == true`
807    ///
808    /// Returns an error if:
809    /// - The number of required balances exceeds the maximum number of inputs allowed.
810    /// - The fee address index is out of bounds.
811    /// - The same asset has multiple change policies(either the receiver of
812    ///   the change is different, or one of the policies states about the destruction
813    ///   of the token while the other does not). The `Change` output from the transaction
814    ///   also count as a `ChangePolicy`.
815    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
816    /// - Required assets have multiple entries.
817    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
818    /// - If a constructed transaction breaks the rules defined by consensus parameters.
819    #[allow(clippy::too_many_arguments)]
820    pub async fn assemble_tx(
821        &self,
822        tx: &Transaction,
823        block_horizon: u32,
824        required_balances: Vec<RequiredBalance>,
825        fee_address_index: u16,
826        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
827        estimate_predicates: bool,
828        reserve_gas: Option<u64>,
829    ) -> io::Result<AssembleTransactionResult> {
830        let tx = HexString(Bytes(tx.to_bytes()));
831        let block_horizon = block_horizon.into();
832
833        let required_balances: Vec<_> = required_balances
834            .into_iter()
835            .map(schema::tx::RequiredBalance::try_from)
836            .collect::<Result<Vec<_>, _>>()?;
837
838        let fee_address_index = fee_address_index.into();
839
840        let exclude_input = exclude.map(Into::into);
841
842        let reserve_gas = reserve_gas.map(U64::from);
843
844        let query_arg = AssembleTxArg {
845            tx,
846            block_horizon,
847            required_balances,
848            fee_address_index,
849            exclude_input,
850            estimate_predicates,
851            reserve_gas,
852        };
853
854        let query = schema::tx::AssembleTx::build(query_arg);
855        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
856        Ok(assemble_tx_result.try_into()?)
857    }
858
859    /// Estimate predicates for the transaction
860    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
861        let serialized_tx = tx.to_bytes();
862        let query = schema::tx::EstimatePredicates::build(TxArg {
863            tx: HexString(Bytes(serialized_tx)),
864        });
865        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
866        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
867        *tx = tx_with_predicate;
868        Ok(())
869    }
870
871    pub async fn submit(
872        &self,
873        tx: &Transaction,
874    ) -> io::Result<types::primitives::TransactionId> {
875        self.submit_opt(tx, None).await
876    }
877
878    pub async fn submit_opt(
879        &self,
880        tx: &Transaction,
881        estimate_predicates: Option<bool>,
882    ) -> io::Result<types::primitives::TransactionId> {
883        let tx = tx.clone().to_bytes();
884        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
885            tx: HexString(Bytes(tx)),
886            estimate_predicates,
887        });
888
889        let id = self.query(query).await.map(|r| r.submit)?.id.into();
890        Ok(id)
891    }
892
893    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
894    #[cfg(feature = "subscriptions")]
895    pub async fn submit_and_await_commit(
896        &self,
897        tx: &Transaction,
898    ) -> io::Result<TransactionStatus> {
899        self.submit_and_await_commit_opt(tx, None).await
900    }
901
902    /// Submit the transaction and wait for it either to be included in
903    /// a block or removed from `TxPool`.
904    ///
905    /// If `estimate_predicates` is set, the predicates will be estimated before
906    /// the transaction is inserted into transaction pool.
907    ///
908    /// This will wait forever if needed, so consider wrapping this call
909    /// with a `tokio::time::timeout`.
910    #[cfg(feature = "subscriptions")]
911    pub async fn submit_and_await_commit_opt(
912        &self,
913        tx: &Transaction,
914        estimate_predicates: Option<bool>,
915    ) -> io::Result<TransactionStatus> {
916        use cynic::SubscriptionBuilder;
917        let tx = tx.clone().to_bytes();
918        let s =
919            schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
920                tx: HexString(Bytes(tx)),
921                estimate_predicates,
922            });
923
924        let mut stream = self.subscribe(s).await?.map(
925            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
926                let status: TransactionStatus = r?.submit_and_await.try_into()?;
927                Result::<_, io::Error>::Ok(status)
928            },
929        );
930
931        let status = stream.next().await.ok_or_else(|| {
932            io::Error::other("Failed to get status from the submission")
933        })??;
934
935        Ok(status)
936    }
937
938    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
939    #[cfg(feature = "subscriptions")]
940    pub async fn submit_and_await_commit_with_tx(
941        &self,
942        tx: &Transaction,
943    ) -> io::Result<StatusWithTransaction> {
944        self.submit_and_await_commit_with_tx_opt(tx, None).await
945    }
946
947    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
948    #[cfg(feature = "subscriptions")]
949    pub async fn submit_and_await_commit_with_tx_opt(
950        &self,
951        tx: &Transaction,
952        estimate_predicates: Option<bool>,
953    ) -> io::Result<StatusWithTransaction> {
954        use cynic::SubscriptionBuilder;
955        let tx = tx.clone().to_bytes();
956        let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
957            TxWithEstimatedPredicatesArg {
958                tx: HexString(Bytes(tx)),
959                estimate_predicates,
960            },
961        );
962
963        let mut stream = self.subscribe(s).await?.map(
964            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
965                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
966                Result::<_, io::Error>::Ok(status)
967            },
968        );
969
970        let status = stream.next().await.ok_or_else(|| {
971            io::Error::other("Failed to get status from the submission")
972        })??;
973
974        Ok(status)
975    }
976
977    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
978    #[cfg(feature = "subscriptions")]
979    pub async fn submit_and_await_status(
980        &self,
981        tx: &Transaction,
982    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
983        self.submit_and_await_status_opt(tx, None, None).await
984    }
985
986    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
987    #[cfg(feature = "subscriptions")]
988    pub async fn submit_and_await_status_opt(
989        &self,
990        tx: &Transaction,
991        estimate_predicates: Option<bool>,
992        include_preconfirmation: Option<bool>,
993    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
994        use cynic::SubscriptionBuilder;
995        use schema::tx::SubmitAndAwaitStatusArg;
996        let tx = tx.clone().to_bytes();
997        let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
998            SubmitAndAwaitStatusArg {
999                tx: HexString(Bytes(tx)),
1000                estimate_predicates,
1001                include_preconfirmation,
1002            },
1003        );
1004
1005        let stream = self.subscribe(s).await?.map(
1006            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1007                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1008                Result::<_, io::Error>::Ok(status)
1009            },
1010        );
1011
1012        Ok(stream)
1013    }
1014
1015    /// Requests all storage slots for the `contract_id`.
1016    #[cfg(feature = "subscriptions")]
1017    pub async fn contract_storage_slots(
1018        &self,
1019        contract_id: &ContractId,
1020    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1021        use cynic::SubscriptionBuilder;
1022        use schema::storage::ContractStorageSlotsArgs;
1023        let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
1024            contract_id: (*contract_id).into(),
1025        });
1026
1027        let stream = self.subscribe(s).await?.map(
1028            |result: io::Result<schema::storage::ContractStorageSlots>| {
1029                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1030                Result::<_, io::Error>::Ok(result)
1031            },
1032        );
1033
1034        Ok(stream)
1035    }
1036
1037    /// Requests all storage balances for the `contract_id`.
1038    #[cfg(feature = "subscriptions")]
1039    pub async fn contract_storage_balances(
1040        &self,
1041        contract_id: &ContractId,
1042    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1043    {
1044        use cynic::SubscriptionBuilder;
1045        use schema::{
1046            contract::ContractBalance,
1047            storage::ContractStorageBalancesArgs,
1048        };
1049        let s = schema::storage::ContractStorageBalances::build(
1050            ContractStorageBalancesArgs {
1051                contract_id: (*contract_id).into(),
1052            },
1053        );
1054
1055        let stream = self.subscribe(s).await?.map(
1056            |result: io::Result<schema::storage::ContractStorageBalances>| {
1057                let result: ContractBalance = result?.contract_storage_balances;
1058                Result::<_, io::Error>::Ok(result)
1059            },
1060        );
1061
1062        Ok(stream)
1063    }
1064
1065    /// Returns a stream of new blocks.
1066    #[cfg(feature = "subscriptions")]
1067    pub async fn new_blocks_subscription(
1068        &self,
1069    ) -> io::Result<
1070        impl Stream<
1071            Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1072        > + '_,
1073    > {
1074        use cynic::SubscriptionBuilder;
1075        let s = schema::block::NewBlocksSubscription::build(());
1076
1077        let stream = self.subscribe(s).await?.map(
1078            |r: io::Result<schema::block::NewBlocksSubscription>| {
1079                let result: fuel_core_types::services::block_importer::ImportResult =
1080                    postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1081                        io::Error::other(format!(
1082                            "Failed to deserialize ImportResult: {e:?}"
1083                        ))
1084                    })?;
1085                Result::<_, io::Error>::Ok(result)
1086            },
1087        );
1088
1089        Ok(stream)
1090    }
1091
1092    /// Returns a stream of preconfirmations for all transactions.
1093    #[cfg(feature = "subscriptions")]
1094    pub async fn preconfirmations_subscription(
1095        &self,
1096    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1097        use cynic::SubscriptionBuilder;
1098        let s = schema::tx::PreconfirmationsSubscription::build(());
1099
1100        let stream = self.subscribe(s).await?.map(
1101            |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1102                let status: TransactionStatus = r?.preconfirmations.try_into()?;
1103                Result::<_, io::Error>::Ok(status)
1104            },
1105        );
1106
1107        Ok(stream)
1108    }
1109
1110    pub async fn contract_slots_values(
1111        &self,
1112        contract_id: &ContractId,
1113        block_height: Option<BlockHeight>,
1114        requested_storage_slots: Vec<Bytes32>,
1115    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1116        let query = schema::storage::ContractSlotValues::build(
1117            schema::storage::ContractSlotValuesArgs {
1118                contract_id: (*contract_id).into(),
1119                block_height: block_height.map(|b| (*b).into()),
1120                storage_slots: requested_storage_slots
1121                    .into_iter()
1122                    .map(Into::into)
1123                    .collect(),
1124            },
1125        );
1126
1127        self.query(query)
1128            .await
1129            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1130    }
1131
1132    pub async fn contract_balance_values(
1133        &self,
1134        contract_id: &ContractId,
1135        block_height: Option<BlockHeight>,
1136        requested_storage_slots: Vec<AssetId>,
1137    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1138        let query = schema::storage::ContractBalanceValues::build(
1139            schema::storage::ContractBalanceValuesArgs {
1140                contract_id: (*contract_id).into(),
1141                block_height: block_height.map(|b| (*b).into()),
1142                assets: requested_storage_slots
1143                    .into_iter()
1144                    .map(Into::into)
1145                    .collect(),
1146            },
1147        );
1148
1149        self.query(query)
1150            .await
1151            .map(|r| r.contract_balance_values.into_iter().collect())
1152    }
1153
1154    pub async fn start_session(&self) -> io::Result<String> {
1155        let query = schema::StartSession::build(());
1156
1157        self.query(query)
1158            .await
1159            .map(|r| r.start_session.into_inner())
1160    }
1161
1162    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1163        let query = schema::EndSession::build(IdArg { id: id.into() });
1164
1165        self.query(query).await.map(|r| r.end_session)
1166    }
1167
1168    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1169        let query = schema::Reset::build(IdArg { id: id.into() });
1170
1171        self.query(query).await.map(|r| r.reset)
1172    }
1173
1174    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1175        let op = serde_json::to_string(op)?;
1176        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1177
1178        self.query(query).await.map(|r| r.execute)
1179    }
1180
1181    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1182        let query = schema::Register::build(RegisterArgs {
1183            id: id.into(),
1184            register: register.into(),
1185        });
1186
1187        Ok(self.query(query).await?.register.0 as Word)
1188    }
1189
1190    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1191        let query = schema::Memory::build(MemoryArgs {
1192            id: id.into(),
1193            start: start.into(),
1194            size: size.into(),
1195        });
1196
1197        let memory = self.query(query).await?.memory;
1198
1199        Ok(serde_json::from_str(memory.as_str())?)
1200    }
1201
1202    pub async fn set_breakpoint(
1203        &self,
1204        session_id: &str,
1205        contract: fuel_types::ContractId,
1206        pc: u64,
1207    ) -> io::Result<()> {
1208        let operation = SetBreakpoint::build(SetBreakpointArgs {
1209            id: Id::new(session_id),
1210            bp: schema::Breakpoint {
1211                contract: contract.into(),
1212                pc: U64(pc),
1213            },
1214        });
1215
1216        let response = self.query(operation).await?;
1217        assert!(
1218            response.set_breakpoint,
1219            "Setting breakpoint returned invalid reply"
1220        );
1221        Ok(())
1222    }
1223
1224    pub async fn set_single_stepping(
1225        &self,
1226        session_id: &str,
1227        enable: bool,
1228    ) -> io::Result<()> {
1229        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1230            id: Id::new(session_id),
1231            enable,
1232        });
1233        self.query(operation).await?;
1234        Ok(())
1235    }
1236
1237    pub async fn start_tx(
1238        &self,
1239        session_id: &str,
1240        tx: &Transaction,
1241    ) -> io::Result<RunResult> {
1242        let operation = StartTx::build(StartTxArgs {
1243            id: Id::new(session_id),
1244            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1245        });
1246        let response = self.query(operation).await?.start_tx;
1247        Ok(response)
1248    }
1249
1250    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1251        let operation = ContinueTx::build(ContinueTxArgs {
1252            id: Id::new(session_id),
1253        });
1254        let response = self.query(operation).await?.continue_tx;
1255        Ok(response)
1256    }
1257
1258    pub async fn transaction(
1259        &self,
1260        id: &TxId,
1261    ) -> io::Result<Option<TransactionResponse>> {
1262        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1263
1264        let transaction = self.query(query).await?.transaction;
1265
1266        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1267    }
1268
1269    /// Get the status of a transaction
1270    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1271        let query =
1272            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1273
1274        let status = self.query(query).await?.transaction.ok_or_else(|| {
1275            io::Error::new(
1276                ErrorKind::NotFound,
1277                format!("status not found for transaction {id} "),
1278            )
1279        })?;
1280
1281        let status = status
1282            .status
1283            .ok_or_else(|| {
1284                io::Error::new(
1285                    ErrorKind::NotFound,
1286                    format!("status not found for transaction {id}"),
1287                )
1288            })?
1289            .try_into()?;
1290        Ok(status)
1291    }
1292
1293    #[tracing::instrument(skip(self), level = "debug")]
1294    #[cfg(feature = "subscriptions")]
1295    /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
1296    pub async fn subscribe_transaction_status(
1297        &self,
1298        id: &TxId,
1299    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1300        self.subscribe_transaction_status_opt(id, None).await
1301    }
1302
1303    #[cfg(feature = "subscriptions")]
1304    /// Subscribe to the status of a transaction
1305    pub async fn subscribe_transaction_status_opt(
1306        &self,
1307        id: &TxId,
1308        include_preconfirmation: Option<bool>,
1309    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1310        use cynic::SubscriptionBuilder;
1311        use schema::tx::StatusChangeSubscriptionArgs;
1312        let tx_id: TransactionId = (*id).into();
1313        let s =
1314            schema::tx::StatusChangeSubscription::build(StatusChangeSubscriptionArgs {
1315                id: tx_id,
1316                include_preconfirmation,
1317            });
1318
1319        tracing::debug!("subscribing");
1320        let stream = self.subscribe(s).await?.map(|tx| {
1321            tracing::debug!("received {tx:?}");
1322            let tx = tx?;
1323            let status = tx.status_change.try_into()?;
1324            Ok(status)
1325        });
1326
1327        Ok(stream)
1328    }
1329
1330    #[cfg(feature = "subscriptions")]
1331    /// Awaits for the transaction to be committed into a block
1332    ///
1333    /// This will wait forever if needed, so consider wrapping this call
1334    /// with a `tokio::time::timeout`.
1335    pub async fn await_transaction_commit(
1336        &self,
1337        id: &TxId,
1338    ) -> io::Result<TransactionStatus> {
1339        // skip until we've reached a final status and then stop consuming the stream
1340        // to avoid an EOF which the eventsource client considers as an error.
1341        let status_result = self
1342            .subscribe_transaction_status(id)
1343            .await?
1344            .skip_while(|status| {
1345                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1346            })
1347            .next()
1348            .await;
1349
1350        if let Some(Ok(status)) = status_result {
1351            Ok(status)
1352        } else {
1353            Err(io::Error::other(format!(
1354                "Failed to get status for transaction {status_result:?}"
1355            )))
1356        }
1357    }
1358
1359    /// returns a paginated set of transactions sorted by block height
1360    pub async fn transactions(
1361        &self,
1362        request: PaginationRequest<String>,
1363    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1364        let args = schema::ConnectionArgs::from(request);
1365        let query = schema::tx::TransactionsQuery::build(args);
1366        let transactions = self.query(query).await?.transactions.try_into()?;
1367        Ok(transactions)
1368    }
1369
1370    /// Returns a paginated set of transactions associated with a txo owner address.
1371    pub async fn transactions_by_owner(
1372        &self,
1373        owner: &Address,
1374        request: PaginationRequest<String>,
1375    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1376        let owner: schema::Address = (*owner).into();
1377        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1378        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1379
1380        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1381        Ok(transactions)
1382    }
1383
1384    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1385        let query =
1386            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1387
1388        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1389            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1390        })?;
1391
1392        let receipts = match tx.status {
1393            Some(status) => match status {
1394                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1395                    s.receipts
1396                        .into_iter()
1397                        .map(TryInto::<Receipt>::try_into)
1398                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1399                )
1400                .transpose()?,
1401                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1402                    s.receipts
1403                        .into_iter()
1404                        .map(TryInto::<Receipt>::try_into)
1405                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1406                )
1407                .transpose()?,
1408                _ => None,
1409            },
1410            _ => None,
1411        };
1412
1413        Ok(receipts)
1414    }
1415
1416    #[cfg(feature = "test-helpers")]
1417    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1418        let query = schema::tx::AllReceipts::build(());
1419        let receipts = self.query(query).await?.all_receipts;
1420
1421        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1422            .into_iter()
1423            .map(TryInto::<Receipt>::try_into)
1424            .collect();
1425
1426        Ok(vec?)
1427    }
1428
1429    pub async fn produce_blocks(
1430        &self,
1431        blocks_to_produce: u32,
1432        start_timestamp: Option<u64>,
1433    ) -> io::Result<BlockHeight> {
1434        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1435            blocks_to_produce: blocks_to_produce.into(),
1436            start_timestamp: start_timestamp
1437                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1438        });
1439
1440        let new_height = self.query(query).await?.produce_blocks;
1441
1442        Ok(new_height.into())
1443    }
1444
1445    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1446        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1447            id: Some((*id).into()),
1448        });
1449
1450        let block = self
1451            .query(query)
1452            .await?
1453            .block
1454            .map(TryInto::try_into)
1455            .transpose()?;
1456
1457        Ok(block)
1458    }
1459
1460    pub async fn block_by_height(
1461        &self,
1462        height: BlockHeight,
1463    ) -> io::Result<Option<types::Block>> {
1464        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1465            height: Some(U32(height.into())),
1466        });
1467
1468        let block = self
1469            .query(query)
1470            .await?
1471            .block
1472            .map(TryInto::try_into)
1473            .transpose()?;
1474
1475        Ok(block)
1476    }
1477
1478    pub async fn da_compressed_block(
1479        &self,
1480        height: BlockHeight,
1481    ) -> io::Result<Option<Vec<u8>>> {
1482        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1483            DaCompressedBlockByHeightArgs {
1484                height: U32(height.into()),
1485            },
1486        );
1487
1488        Ok(self
1489            .query(query)
1490            .await?
1491            .da_compressed_block
1492            .map(|b| b.bytes.into()))
1493    }
1494
1495    /// Retrieve a blob by its ID
1496    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1497        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1498        let blob = self.query(query).await?.blob.map(Into::into);
1499        Ok(blob)
1500    }
1501
1502    /// Check whether a blob with ID exists
1503    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1504        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1505        Ok(self.query(query).await?.blob.is_some())
1506    }
1507
1508    /// Retrieve multiple blocks
1509    pub async fn blocks(
1510        &self,
1511        request: PaginationRequest<String>,
1512    ) -> io::Result<PaginatedResult<types::Block, String>> {
1513        let args = schema::ConnectionArgs::from(request);
1514        let query = schema::block::BlocksQuery::build(args);
1515
1516        let blocks = self.query(query).await?.blocks.try_into()?;
1517
1518        Ok(blocks)
1519    }
1520
1521    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1522        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1523            utxo_id: (*id).into(),
1524        });
1525        let coin = self.query(query).await?.coin.map(Into::into);
1526        Ok(coin)
1527    }
1528
1529    /// Retrieve a page of coins by their owner
1530    pub async fn coins(
1531        &self,
1532        owner: &Address,
1533        asset_id: Option<&AssetId>,
1534        request: PaginationRequest<String>,
1535    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1536        let owner: schema::Address = (*owner).into();
1537        let asset_id = asset_id.map(|id| (*id).into());
1538        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1539        let query = schema::coins::CoinsQuery::build(args);
1540
1541        let coins = self.query(query).await?.coins.into();
1542        Ok(coins)
1543    }
1544
1545    /// Retrieve coins to spend in a transaction
1546    pub async fn coins_to_spend(
1547        &self,
1548        owner: &Address,
1549        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1550        // (Utxos, Messages Nonce)
1551        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1552    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1553        let owner: schema::Address = (*owner).into();
1554        let spend_query: Vec<SpendQueryElementInput> = spend_query
1555            .iter()
1556            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1557                Ok(SpendQueryElementInput {
1558                    asset_id: (*asset_id).into(),
1559                    amount: (*amount).into(),
1560                    max: (*max).map(|max| max.into()),
1561                })
1562            })
1563            .try_collect()?;
1564        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1565        let args =
1566            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1567        let query = schema::coins::CoinsToSpendQuery::build(args);
1568
1569        let coins_per_asset = self
1570            .query(query)
1571            .await?
1572            .coins_to_spend
1573            .into_iter()
1574            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1575            .collect::<Vec<_>>();
1576        Ok(coins_per_asset)
1577    }
1578
1579    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1580        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1581            id: (*id).into(),
1582        });
1583        let contract = self.query(query).await?.contract.map(Into::into);
1584        Ok(contract)
1585    }
1586
1587    pub async fn contract_balance(
1588        &self,
1589        id: &ContractId,
1590        asset: Option<&AssetId>,
1591    ) -> io::Result<u64> {
1592        let asset_id: schema::AssetId = match asset {
1593            Some(asset) => (*asset).into(),
1594            None => schema::AssetId::default(),
1595        };
1596
1597        let query =
1598            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1599                id: (*id).into(),
1600                asset: asset_id,
1601            });
1602
1603        let balance: types::ContractBalance =
1604            self.query(query).await?.contract_balance.into();
1605        Ok(balance.amount)
1606    }
1607
1608    pub async fn balance(
1609        &self,
1610        owner: &Address,
1611        asset_id: Option<&AssetId>,
1612    ) -> io::Result<u128> {
1613        let owner: schema::Address = (*owner).into();
1614        let asset_id: schema::AssetId = match asset_id {
1615            Some(asset_id) => (*asset_id).into(),
1616            None => schema::AssetId::default(),
1617        };
1618        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1619        let balance: types::Balance = self.query(query).await?.balance.into();
1620        Ok(balance.amount)
1621    }
1622
1623    // Retrieve a page of balances by their owner
1624    pub async fn balances(
1625        &self,
1626        owner: &Address,
1627        request: PaginationRequest<String>,
1628    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1629        let owner: schema::Address = (*owner).into();
1630        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1631        let query = schema::balance::BalancesQuery::build(args);
1632
1633        let balances = self.query(query).await?.balances.into();
1634        Ok(balances)
1635    }
1636
1637    pub async fn contract_balances(
1638        &self,
1639        contract: &ContractId,
1640        request: PaginationRequest<String>,
1641    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1642        let contract_id: schema::ContractId = (*contract).into();
1643        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1644        let query = schema::contract::ContractBalancesQuery::build(args);
1645
1646        let balances = self.query(query).await?.contract_balances.into();
1647
1648        Ok(balances)
1649    }
1650
1651    // Retrieve a message by its nonce
1652    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1653        let query = schema::message::MessageQuery::build(NonceArgs {
1654            nonce: (*nonce).into(),
1655        });
1656        let message = self.query(query).await?.message.map(Into::into);
1657        Ok(message)
1658    }
1659
1660    pub async fn messages(
1661        &self,
1662        owner: Option<&Address>,
1663        request: PaginationRequest<String>,
1664    ) -> io::Result<PaginatedResult<types::Message, String>> {
1665        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1666        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1667        let query = schema::message::OwnedMessageQuery::build(args);
1668
1669        let messages = self.query(query).await?.messages.into();
1670
1671        Ok(messages)
1672    }
1673
1674    pub async fn contract_info(
1675        &self,
1676        contract: &ContractId,
1677    ) -> io::Result<Option<types::Contract>> {
1678        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1679            id: (*contract).into(),
1680        });
1681        let contract_info = self.query(query).await?.contract.map(Into::into);
1682        Ok(contract_info)
1683    }
1684
1685    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1686        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1687            nonce: (*nonce).into(),
1688        });
1689        let status = self.query(query).await?.message_status.into();
1690
1691        Ok(status)
1692    }
1693
1694    /// Request a merkle proof of an output message.
1695    pub async fn message_proof(
1696        &self,
1697        transaction_id: &TxId,
1698        nonce: &Nonce,
1699        commit_block_id: Option<&BlockId>,
1700        commit_block_height: Option<BlockHeight>,
1701    ) -> io::Result<types::MessageProof> {
1702        let transaction_id: TransactionId = (*transaction_id).into();
1703        let nonce: schema::Nonce = (*nonce).into();
1704        let commit_block_id: Option<schema::BlockId> =
1705            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1706        let commit_block_height = commit_block_height.map(Into::into);
1707        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1708            transaction_id,
1709            nonce,
1710            commit_block_id,
1711            commit_block_height,
1712        });
1713        let proof = self.query(query).await?.message_proof.try_into()?;
1714        Ok(proof)
1715    }
1716
1717    pub async fn relayed_transaction_status(
1718        &self,
1719        id: &Bytes32,
1720    ) -> io::Result<Option<RelayedTransactionStatus>> {
1721        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1722            RelayedTransactionStatusArgs {
1723                id: id.to_owned().into(),
1724            },
1725        );
1726        let status = self
1727            .query(query)
1728            .await?
1729            .relayed_transaction_status
1730            .map(|status| status.try_into())
1731            .transpose()?;
1732        Ok(status)
1733    }
1734
1735    pub async fn asset_info(
1736        &self,
1737        asset_id: &AssetId,
1738    ) -> io::Result<Option<AssetDetail>> {
1739        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1740            id: (*asset_id).into(),
1741        });
1742        let asset_info = self.query(query).await?.asset_details.map(Into::into);
1743        Ok(asset_info)
1744    }
1745}
1746
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Looks up the transaction `id` and returns only its transaction payload.
    ///
    /// Returns `Ok(None)` when the response contains no transaction, and an
    /// error if the query fails or the returned transaction cannot be converted.
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });

        let Some(raw) = self.query(query).await?.transaction else {
            return Ok(None);
        };

        let response: TransactionResponse = raw.try_into()?;
        Ok(Some(response.transaction))
    }
}