fuel_core_client/
client.rs

1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4    client::{
5        schema::{
6            Tai64Timestamp,
7            TransactionId,
8            block::BlockByHeightArgs,
9            coins::{
10                ExcludeInput,
11                SpendQueryElementInput,
12            },
13            contract::ContractBalanceQueryArgs,
14            gas_price::EstimateGasPrice,
15            message::MessageStatusArgs,
16            relayed_tx::RelayedTransactionStatusArgs,
17            tx::{
18                DryRunArg,
19                TxWithEstimatedPredicatesArg,
20            },
21        },
22        types::{
23            RelayedTransactionStatus,
24            asset::AssetDetail,
25            gas_price::LatestGasPrice,
26            message::MessageStatus,
27            primitives::{
28                Address,
29                AssetId,
30                BlockId,
31                ContractId,
32                UtxoId,
33            },
34            upgrades::StateTransitionBytecode,
35        },
36    },
37    reqwest_ext::{
38        FuelGraphQlResponse,
39        FuelOperation,
40        ReqwestExt,
41    },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46    BASE64_STANDARD,
47    Engine as _,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52    Id,
53    MutationBuilder,
54    Operation,
55    QueryBuilder,
56};
57use fuel_core_types::{
58    blockchain::header::{
59        ConsensusParametersVersion,
60        StateTransitionBytecodeVersion,
61    },
62    fuel_asm::{
63        Instruction,
64        Word,
65    },
66    fuel_tx::{
67        BlobId,
68        Bytes32,
69        ConsensusParameters,
70        Receipt,
71        Transaction,
72        TxId,
73    },
74    fuel_types::{
75        self,
76        BlockHeight,
77        Nonce,
78        canonical::Serialize,
79    },
80    services::executor::{
81        StorageReadReplayEvent,
82        TransactionExecutionStatus,
83    },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87    Stream,
88    StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92    PageDirection,
93    PaginatedResult,
94    PaginationRequest,
95};
96use schema::{
97    Bytes,
98    ContinueTx,
99    ContinueTxArgs,
100    ConversionError,
101    HexString,
102    IdArg,
103    MemoryArgs,
104    RegisterArgs,
105    RunResult,
106    SetBreakpoint,
107    SetBreakpointArgs,
108    SetSingleStepping,
109    SetSingleSteppingArgs,
110    StartTx,
111    StartTxArgs,
112    U32,
113    U64,
114    assets::AssetInfoArg,
115    balance::BalanceArgs,
116    blob::BlobByIdArgs,
117    block::BlockByIdArgs,
118    coins::{
119        CoinByIdArgs,
120        CoinsConnectionArgs,
121    },
122    contract::{
123        ContractBalancesConnectionArgs,
124        ContractByIdArgs,
125    },
126    da_compressed::DaCompressedBlockByHeightArgs,
127    gas_price::BlockHorizonArgs,
128    storage_read_replay::{
129        StorageReadReplay,
130        StorageReadReplayArgs,
131    },
132    tx::{
133        AssembleTxArg,
134        TransactionsByOwnerConnectionArgs,
135        TxArg,
136        TxIdArgs,
137    },
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142    convert::TryInto,
143    io::{
144        self,
145        ErrorKind,
146    },
147    net,
148    str::{
149        self,
150        FromStr,
151    },
152    sync::{
153        Arc,
154        Mutex,
155    },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160    TransactionResponse,
161    TransactionStatus,
162    assemble_tx::{
163        AssembleTransactionResult,
164        RequiredBalance,
165    },
166};
167
168use self::schema::{
169    block::ProduceBlockArgs,
170    message::{
171        MessageProofArgs,
172        NonceArgs,
173    },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
/// Numeric identifier of a register.
// NOTE(review): presumably the VM register index used by the debugger-style
// queries (`RegisterArgs` is imported above) — confirm against the call sites.
type RegisterId = u32;
181
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
/// Error occurring during interaction with the FuelClient
// anyhow::Error is wrapped inside a custom Error type,
// so that we can add specific error variants in the future
// (the enum is `#[non_exhaustive]` for the same reason).
pub enum Error {
    /// Unknown or not expected(by architecture) error.
    #[from]
    Other(anyhow::Error),
}
192
/// Consistency policy for the [`FuelClient`] to define the strategy
/// for the required height feature.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// Automatically fetch the next block height from the response and
    /// use it as an input to the next query to guarantee consistency
    /// of the results for the queries.
    Auto {
        /// The required block height for the queries.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// Use the block height that was set manually for all queries
    /// via the [`FuelClient::with_required_fuel_block_height`].
    Manual {
        /// The required block height for the queries.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213    fn clone(&self) -> Self {
214        match self {
215            Self::Auto { height } => Self::Auto {
216                // We don't want to share the same mutex between the different
217                // instances of the `FuelClient`.
218                height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219            },
220            Self::Manual { height } => Self::Manual { height: *height },
221        }
222    }
223}
224
/// Chain-level versions most recently stored by the client.
///
/// Both fields start out as `None` (via `Default`) and sit behind their own
/// mutex so they can be updated through a shared `&self`.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Last state transition bytecode version that was recorded, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Last consensus parameters version that was recorded, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232    fn clone(&self) -> Self {
233        Self {
234            current_stf_version: Arc::new(Mutex::new(
235                self.current_stf_version.lock().ok().and_then(|v| *v),
236            )),
237            current_consensus_parameters_version: Arc::new(Mutex::new(
238                self.current_consensus_parameters_version
239                    .lock()
240                    .ok()
241                    .and_then(|v| *v),
242            )),
243        }
244    }
245}
246
/// Client for the GraphQL API of a fuel-core node.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// HTTP client used to send the GraphQL requests.
    client: reqwest::Client,
    /// Cookie jar; only present when the `subscriptions` feature is enabled.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    /// Endpoint URL the requests are posted to.
    url: reqwest::Url,
    /// Policy that determines the required block height attached to requests.
    require_height: ConsistencyPolicy,
    /// Latest chain versions recorded by the client.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258    type Err = anyhow::Error;
259
260    fn from_str(str: &str) -> Result<Self, Self::Err> {
261        let mut raw_url = str.to_string();
262        if !raw_url.starts_with("http") {
263            raw_url = format!("http://{raw_url}");
264        }
265
266        let mut url = reqwest::Url::parse(&raw_url)
267            .map_err(anyhow::Error::msg)
268            .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269        url.set_path("/v1/graphql");
270
271        #[cfg(feature = "subscriptions")]
272        {
273            let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274            let client = reqwest::Client::builder()
275                .cookie_provider(cookie.clone())
276                .build()
277                .map_err(anyhow::Error::msg)?;
278            Ok(Self {
279                client,
280                cookie,
281                url,
282                require_height: ConsistencyPolicy::Auto {
283                    height: Arc::new(Mutex::new(None)),
284                },
285                chain_state_info: Default::default(),
286            })
287        }
288
289        #[cfg(not(feature = "subscriptions"))]
290        {
291            let client = reqwest::Client::new();
292            Ok(Self {
293                client,
294                url,
295                require_height: ConsistencyPolicy::Auto {
296                    height: Arc::new(Mutex::new(None)),
297                },
298                chain_state_info: Default::default(),
299            })
300        }
301    }
302}
303
304impl<S> From<S> for FuelClient
305where
306    S: Into<net::SocketAddr>,
307{
308    fn from(socket: S) -> Self {
309        format!("http://{}", socket.into())
310            .as_str()
311            .parse()
312            .unwrap()
313    }
314}
315
/// Collapses a list of error messages into a single [`io::Error`].
///
/// The resulting message starts with `Response errors` and every entry of
/// `errors` is appended after a `"; "` separator, in order. The error kind is
/// always [`io::ErrorKind::Other`].
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::new(io::ErrorKind::Other, message)
}
326
327impl FuelClient {
328    pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329        Self::from_str(url.as_ref())
330    }
331
332    pub fn with_required_fuel_block_height(
333        &mut self,
334        new_height: Option<BlockHeight>,
335    ) -> &mut Self {
336        match &mut self.require_height {
337            ConsistencyPolicy::Auto { height } => {
338                *height.lock().expect("Mutex poisoned") = new_height;
339            }
340            ConsistencyPolicy::Manual { height } => {
341                *height = new_height;
342            }
343        }
344        self
345    }
346
347    pub fn use_manual_consistency_policy(
348        &mut self,
349        height: Option<BlockHeight>,
350    ) -> &mut Self {
351        self.require_height = ConsistencyPolicy::Manual { height };
352        self
353    }
354
355    pub fn required_block_height(&self) -> Option<BlockHeight> {
356        match &self.require_height {
357            ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358            ConsistencyPolicy::Manual { height } => *height,
359        }
360    }
361
362    fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363        if let Some(current_sft_version) = response
364            .extensions
365            .as_ref()
366            .and_then(|e| e.current_stf_version)
367        {
368            if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369                *c = Some(current_sft_version);
370            }
371        }
372
373        if let Some(current_consensus_parameters_version) = response
374            .extensions
375            .as_ref()
376            .and_then(|e| e.current_consensus_parameters_version)
377        {
378            if let Ok(mut c) = self
379                .chain_state_info
380                .current_consensus_parameters_version
381                .lock()
382            {
383                *c = Some(current_consensus_parameters_version);
384            }
385        }
386
387        let inner_required_height = match &self.require_height {
388            ConsistencyPolicy::Auto { height } => Some(height.clone()),
389            ConsistencyPolicy::Manual { .. } => None,
390        };
391
392        if let Some(inner_required_height) = inner_required_height {
393            if let Some(current_fuel_block_height) = response
394                .extensions
395                .as_ref()
396                .and_then(|e| e.current_fuel_block_height)
397            {
398                let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400                if current_fuel_block_height >= lock.unwrap_or_default() {
401                    *lock = Some(current_fuel_block_height);
402                }
403            }
404        }
405    }
406
407    /// Send the GraphQL query to the client.
408    pub async fn query<ResponseData, Vars>(
409        &self,
410        q: Operation<ResponseData, Vars>,
411    ) -> io::Result<ResponseData>
412    where
413        Vars: serde::Serialize,
414        ResponseData: serde::de::DeserializeOwned + 'static,
415    {
416        let required_fuel_block_height = self.required_block_height();
417        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418        let response = self
419            .client
420            .post(self.url.clone())
421            .run_fuel_graphql(fuel_operation)
422            .await
423            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425        self.decode_response(response)
426    }
427
428    fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429    where
430        R: serde::de::DeserializeOwned + 'static,
431    {
432        self.update_chain_state_info(&response);
433
434        if let Some(failed) = response
435            .extensions
436            .as_ref()
437            .and_then(|e| e.fuel_block_height_precondition_failed)
438        {
439            if failed {
440                return Err(io::Error::new(
441                    io::ErrorKind::Other,
442                    "The required block height was not met",
443                ));
444            }
445        }
446
447        let response = response.response;
448
449        match (response.data, response.errors) {
450            (Some(d), _) => Ok(d),
451            (_, Some(e)) => Err(from_strings_errors_to_std_error(
452                e.into_iter().map(|e| e.message).collect(),
453            )),
454            _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455        }
456    }
457
    /// Opens a server-sent-events subscription for the streaming operation `q`.
    ///
    /// The request is a `POST` of the JSON-encoded operation (including the
    /// currently required block height) to the `/v1/graphql-sub` path of the
    /// client's URL. The first event of the stream is awaited before this
    /// function returns.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation cannot be serialized, the event
    /// source client cannot be configured, the first event is an error, or
    /// the stream ends before producing any event. Errors after the first
    /// event are delivered as items of the returned stream.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static + Send,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions use a dedicated path on the same host.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to start client {e:?}"),
                )
            })?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to add header to client {e:?}"),
                )
            })?;
        // Credentials embedded in the URL are sent as a Basic `Authorization`
        // header; this only happens when the URL carries a password.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header to client {e:?}"),
                    )
                })?;
        }

        // Forward the cookies stored in the jar for the client's URL.
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Unable convert header value to string {e:?}"),
                )
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header from `reqwest` to client {e:?}"),
                    )
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the previously accepted event, used for de-duplication.
        let mut last = None;

        // Internal item type: either the connection notification or a decoded
        // payload.
        enum Event<ResponseData> {
            Connected,
            ResponseData(ResponseData),
        }

        let mut init_stream = es::Client::stream(&client)
            // `Eof` terminates the stream instead of being surfaced as an error.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and
                                        // hands back the previous one, which is
                                        // then compared with the stored value.
                                        match last.replace(data) {
                                            // Remove duplicates
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(Event::ResponseData(resp))),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::new(
                                        io::ErrorKind::Other,
                                        format!("Decode error: {e:?}"),
                                    ))),
                                }
                            }
                            Err(e) => Some(Err(io::Error::new(
                                io::ErrorKind::Other,
                                format!("Json error: {e:?}"),
                            ))),
                        }
                    }
                    Ok(es::SSE::Connected(_)) => Some(Ok(Event::Connected)),
                    // Any other kind of SSE item is ignored.
                    Ok(_) => None,
                    Err(e) => Some(Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Graphql error: {e:?}"),
                    ))),
                };
                futures::future::ready(r)
            });

        // Wait for the first event so that an immediate failure is reported by
        // this function rather than by the returned stream.
        let event = init_stream.next().await;
        // The rest of the stream: connection notifications are dropped,
        // payloads and errors are passed through.
        let stream_with_resp = init_stream.filter_map(|result| async move {
            match result {
                Ok(Event::Connected) => None,
                Ok(Event::ResponseData(resp)) => Some(Ok(resp)),
                Err(error) => Some(Err(error)),
            }
        });

        let stream = match event {
            Some(Ok(Event::Connected)) => {
                tracing::debug!("Subscription connected");
                stream_with_resp.boxed()
            }
            Some(Ok(Event::ResponseData(resp))) => {
                tracing::debug!("Subscription returned response");
                // The first event already carried data: put it back in front
                // of the remaining stream so the caller does not lose it.
                let joined_stream = futures::stream::once(async move { Ok(resp) })
                    .chain(stream_with_resp);
                joined_stream.boxed()
            }
            Some(Err(e)) => return Err(e),
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Subscription stream ended unexpectedly",
                ));
            }
        };

        Ok(stream)
    }
620
621    pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
622        self.chain_state_info
623            .current_stf_version
624            .lock()
625            .ok()
626            .and_then(|value| *value)
627    }
628
629    pub fn latest_consensus_parameters_version(
630        &self,
631    ) -> Option<ConsensusParametersVersion> {
632        self.chain_state_info
633            .current_consensus_parameters_version
634            .lock()
635            .ok()
636            .and_then(|value| *value)
637    }
638
639    pub async fn health(&self) -> io::Result<bool> {
640        let query = schema::Health::build(());
641        self.query(query).await.map(|r| r.health)
642    }
643
644    pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
645        let query = schema::node_info::QueryNodeInfo::build(());
646        self.query(query).await.map(|r| r.node_info.into())
647    }
648
649    pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
650        let query = schema::gas_price::QueryLatestGasPrice::build(());
651        self.query(query).await.map(|r| r.latest_gas_price.into())
652    }
653
654    pub async fn estimate_gas_price(
655        &self,
656        block_horizon: u32,
657    ) -> io::Result<EstimateGasPrice> {
658        let args = BlockHorizonArgs {
659            block_horizon: Some(block_horizon.into()),
660        };
661        let query = schema::gas_price::QueryEstimateGasPrice::build(args);
662        self.query(query).await.map(|r| r.estimate_gas_price)
663    }
664
665    #[cfg(feature = "std")]
666    pub async fn connected_peers_info(
667        &self,
668    ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
669        let query = schema::node_info::QueryPeersInfo::build(());
670        self.query(query)
671            .await
672            .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
673    }
674
675    pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
676        let query = schema::chain::ChainQuery::build(());
677        self.query(query).await.and_then(|r| {
678            let result = r.chain.try_into()?;
679            Ok(result)
680        })
681    }
682
683    pub async fn consensus_parameters(
684        &self,
685        version: i32,
686    ) -> io::Result<Option<ConsensusParameters>> {
687        let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
688        let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
689
690        let result = self
691            .query(query)
692            .await?
693            .consensus_parameters
694            .map(TryInto::try_into)
695            .transpose()?;
696
697        Ok(result)
698    }
699
700    pub async fn state_transition_byte_code_by_version(
701        &self,
702        version: i32,
703    ) -> io::Result<Option<StateTransitionBytecode>> {
704        let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
705        let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
706
707        let result = self
708            .query(query)
709            .await?
710            .state_transition_bytecode_by_version
711            .map(TryInto::try_into)
712            .transpose()?;
713
714        Ok(result)
715    }
716
717    pub async fn state_transition_byte_code_by_root(
718        &self,
719        root: Bytes32,
720    ) -> io::Result<Option<StateTransitionBytecode>> {
721        let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
722            root: HexString(Bytes(root.to_vec())),
723        };
724        let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
725
726        let result = self
727            .query(query)
728            .await?
729            .state_transition_bytecode_by_root
730            .map(TryInto::try_into)
731            .transpose()?;
732
733        Ok(result)
734    }
735
736    /// Default dry run, matching the exact configuration as the node
737    pub async fn dry_run(
738        &self,
739        txs: &[Transaction],
740    ) -> io::Result<Vec<TransactionExecutionStatus>> {
741        self.dry_run_opt(txs, None, None, None).await
742    }
743
744    /// Dry run with options to override the node behavior
745    pub async fn dry_run_opt(
746        &self,
747        txs: &[Transaction],
748        // Disable utxo input checks (exists, unspent, and valid signature)
749        utxo_validation: Option<bool>,
750        gas_price: Option<u64>,
751        at_height: Option<BlockHeight>,
752    ) -> io::Result<Vec<TransactionExecutionStatus>> {
753        let txs = txs
754            .iter()
755            .map(|tx| HexString(Bytes(tx.to_bytes())))
756            .collect::<Vec<HexString>>();
757        let query: Operation<schema::tx::DryRun, DryRunArg> =
758            schema::tx::DryRun::build(DryRunArg {
759                txs,
760                utxo_validation,
761                gas_price: gas_price.map(|gp| gp.into()),
762                block_height: at_height.map(|bh| bh.into()),
763            });
764        let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
765        tx_statuses
766            .into_iter()
767            .map(|tx_status| tx_status.try_into().map_err(Into::into))
768            .collect()
769    }
770
771    /// Like `dry_run_opt`, but also returns the storage reads
772    pub async fn dry_run_opt_record_storage_reads(
773        &self,
774        txs: &[Transaction],
775        // Disable utxo input checks (exists, unspent, and valid signature)
776        utxo_validation: Option<bool>,
777        gas_price: Option<u64>,
778        at_height: Option<BlockHeight>,
779    ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
780        let txs = txs
781            .iter()
782            .map(|tx| HexString(Bytes(tx.to_bytes())))
783            .collect::<Vec<HexString>>();
784        let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
785            schema::tx::DryRunRecordStorageReads::build(DryRunArg {
786                txs,
787                utxo_validation,
788                gas_price: gas_price.map(|gp| gp.into()),
789                block_height: at_height.map(|bh| bh.into()),
790            });
791        let result = self
792            .query(query)
793            .await
794            .map(|r| r.dry_run_record_storage_reads)?;
795        let tx_statuses = result
796            .tx_statuses
797            .into_iter()
798            .map(|tx_status| tx_status.try_into().map_err(Into::into))
799            .collect::<io::Result<Vec<_>>>()?;
800        let storage_reads = result
801            .storage_reads
802            .into_iter()
803            .map(Into::into)
804            .collect::<Vec<_>>();
805        Ok((tx_statuses, storage_reads))
806    }
807
808    /// Get storage read replay for a block
809    pub async fn storage_read_replay(
810        &self,
811        height: &BlockHeight,
812    ) -> io::Result<Vec<StorageReadReplayEvent>> {
813        let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
814            StorageReadReplay::build(StorageReadReplayArgs {
815                height: (*height).into(),
816            });
817        Ok(self
818            .query(query)
819            .await
820            .map(|r| r.storage_read_replay)?
821            .into_iter()
822            .map(Into::into)
823            .collect())
824    }
825
826    /// Assembles the transaction based on the provided requirements.
827    /// The return transaction contains:
828    /// - Input coins to cover `required_balances`
829    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
830    /// - `Change` or `Destroy` outputs for all assets from the inputs
831    /// - `Variable` outputs in the case they are required during the execution
832    /// - `Contract` inputs and outputs in the case they are required during the execution
833    /// - Reserved witness slots for signed coins filled with `64` zeroes
834    /// - Set script gas limit(unless `script` is empty)
835    /// - Estimated predicates, if `estimate_predicates == true`
836    ///
837    /// Returns an error if:
838    /// - The number of required balances exceeds the maximum number of inputs allowed.
839    /// - The fee address index is out of bounds.
840    /// - The same asset has multiple change policies(either the receiver of
841    ///   the change is different, or one of the policies states about the destruction
842    ///   of the token while the other does not). The `Change` output from the transaction
843    ///   also count as a `ChangePolicy`.
844    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
845    /// - Required assets have multiple entries.
846    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
847    /// - If a constructed transaction breaks the rules defined by consensus parameters.
848    #[allow(clippy::too_many_arguments)]
849    pub async fn assemble_tx(
850        &self,
851        tx: &Transaction,
852        block_horizon: u32,
853        required_balances: Vec<RequiredBalance>,
854        fee_address_index: u16,
855        exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
856        estimate_predicates: bool,
857        reserve_gas: Option<u64>,
858    ) -> io::Result<AssembleTransactionResult> {
859        let tx = HexString(Bytes(tx.to_bytes()));
860        let block_horizon = block_horizon.into();
861
862        let required_balances: Vec<_> = required_balances
863            .into_iter()
864            .map(schema::tx::RequiredBalance::try_from)
865            .collect::<Result<Vec<_>, _>>()?;
866
867        let fee_address_index = fee_address_index.into();
868
869        let exclude_input = exclude.map(Into::into);
870
871        let reserve_gas = reserve_gas.map(U64::from);
872
873        let query_arg = AssembleTxArg {
874            tx,
875            block_horizon,
876            required_balances,
877            fee_address_index,
878            exclude_input,
879            estimate_predicates,
880            reserve_gas,
881        };
882
883        let query = schema::tx::AssembleTx::build(query_arg);
884        let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
885        Ok(assemble_tx_result.try_into()?)
886    }
887
888    /// Estimate predicates for the transaction
889    pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
890        let serialized_tx = tx.to_bytes();
891        let query = schema::tx::EstimatePredicates::build(TxArg {
892            tx: HexString(Bytes(serialized_tx)),
893        });
894        let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
895        let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
896        *tx = tx_with_predicate;
897        Ok(())
898    }
899
900    pub async fn submit(
901        &self,
902        tx: &Transaction,
903    ) -> io::Result<types::primitives::TransactionId> {
904        self.submit_opt(tx, None).await
905    }
906
907    pub async fn submit_opt(
908        &self,
909        tx: &Transaction,
910        estimate_predicates: Option<bool>,
911    ) -> io::Result<types::primitives::TransactionId> {
912        let tx = tx.clone().to_bytes();
913        let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
914            tx: HexString(Bytes(tx)),
915            estimate_predicates,
916        });
917
918        let id = self.query(query).await.map(|r| r.submit)?.id.into();
919        Ok(id)
920    }
921
922    /// Similar to [`Self::submit_and_await_commit_opt`], but with default options.
923    #[cfg(feature = "subscriptions")]
924    pub async fn submit_and_await_commit(
925        &self,
926        tx: &Transaction,
927    ) -> io::Result<TransactionStatus> {
928        self.submit_and_await_commit_opt(tx, None).await
929    }
930
931    /// Submit the transaction and wait for it either to be included in
932    /// a block or removed from `TxPool`.
933    ///
934    /// If `estimate_predicates` is set, the predicates will be estimated before
935    /// the transaction is inserted into transaction pool.
936    ///
937    /// This will wait forever if needed, so consider wrapping this call
938    /// with a `tokio::time::timeout`.
939    #[cfg(feature = "subscriptions")]
940    pub async fn submit_and_await_commit_opt(
941        &self,
942        tx: &Transaction,
943        estimate_predicates: Option<bool>,
944    ) -> io::Result<TransactionStatus> {
945        use cynic::SubscriptionBuilder;
946        let tx = tx.clone().to_bytes();
947        let s =
948            schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
949                tx: HexString(Bytes(tx)),
950                estimate_predicates,
951            });
952
953        let mut stream = self.subscribe(s).await?.map(
954            |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
955                let status: TransactionStatus = r?.submit_and_await.try_into()?;
956                Result::<_, io::Error>::Ok(status)
957            },
958        );
959
960        let status = stream.next().await.ok_or(io::Error::new(
961            io::ErrorKind::Other,
962            "Failed to get status from the submission",
963        ))??;
964
965        Ok(status)
966    }
967
968    /// Similar to [`Self::submit_and_await_commit`], but the status also contains transaction.
969    #[cfg(feature = "subscriptions")]
970    pub async fn submit_and_await_commit_with_tx(
971        &self,
972        tx: &Transaction,
973    ) -> io::Result<StatusWithTransaction> {
974        self.submit_and_await_commit_with_tx_opt(tx, None).await
975    }
976
977    /// Similar to [`Self::submit_and_await_commit_opt`], but the status also contains transaction.
978    #[cfg(feature = "subscriptions")]
979    pub async fn submit_and_await_commit_with_tx_opt(
980        &self,
981        tx: &Transaction,
982        estimate_predicates: Option<bool>,
983    ) -> io::Result<StatusWithTransaction> {
984        use cynic::SubscriptionBuilder;
985        let tx = tx.clone().to_bytes();
986        let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
987            TxWithEstimatedPredicatesArg {
988                tx: HexString(Bytes(tx)),
989                estimate_predicates,
990            },
991        );
992
993        let mut stream = self.subscribe(s).await?.map(
994            |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
995                let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
996                Result::<_, io::Error>::Ok(status)
997            },
998        );
999
1000        let status = stream.next().await.ok_or(io::Error::new(
1001            io::ErrorKind::Other,
1002            "Failed to get status from the submission",
1003        ))??;
1004
1005        Ok(status)
1006    }
1007
1008    /// Similar to [`Self::submit_and_await_commit`], but includes all intermediate states.
1009    #[cfg(feature = "subscriptions")]
1010    pub async fn submit_and_await_status(
1011        &self,
1012        tx: &Transaction,
1013    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1014        self.submit_and_await_status_opt(tx, None, None).await
1015    }
1016
1017    /// Similar to [`Self::submit_and_await_commit_opt`], but includes all intermediate states.
1018    #[cfg(feature = "subscriptions")]
1019    pub async fn submit_and_await_status_opt(
1020        &self,
1021        tx: &Transaction,
1022        estimate_predicates: Option<bool>,
1023        include_preconfirmation: Option<bool>,
1024    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1025        use cynic::SubscriptionBuilder;
1026        use schema::tx::SubmitAndAwaitStatusArg;
1027        let tx = tx.clone().to_bytes();
1028        let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
1029            SubmitAndAwaitStatusArg {
1030                tx: HexString(Bytes(tx)),
1031                estimate_predicates,
1032                include_preconfirmation,
1033            },
1034        );
1035
1036        let stream = self.subscribe(s).await?.map(
1037            |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1038                let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1039                Result::<_, io::Error>::Ok(status)
1040            },
1041        );
1042
1043        Ok(stream)
1044    }
1045
1046    /// Requests all storage slots for the `contract_id`.
1047    #[cfg(feature = "subscriptions")]
1048    pub async fn contract_storage_slots(
1049        &self,
1050        contract_id: &ContractId,
1051    ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1052        use cynic::SubscriptionBuilder;
1053        use schema::storage::ContractStorageSlotsArgs;
1054        let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
1055            contract_id: (*contract_id).into(),
1056        });
1057
1058        let stream = self.subscribe(s).await?.map(
1059            |result: io::Result<schema::storage::ContractStorageSlots>| {
1060                let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1061                Result::<_, io::Error>::Ok(result)
1062            },
1063        );
1064
1065        Ok(stream)
1066    }
1067
1068    /// Requests all storage balances for the `contract_id`.
1069    #[cfg(feature = "subscriptions")]
1070    pub async fn contract_storage_balances(
1071        &self,
1072        contract_id: &ContractId,
1073    ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1074    {
1075        use cynic::SubscriptionBuilder;
1076        use schema::{
1077            contract::ContractBalance,
1078            storage::ContractStorageBalancesArgs,
1079        };
1080        let s = schema::storage::ContractStorageBalances::build(
1081            ContractStorageBalancesArgs {
1082                contract_id: (*contract_id).into(),
1083            },
1084        );
1085
1086        let stream = self.subscribe(s).await?.map(
1087            |result: io::Result<schema::storage::ContractStorageBalances>| {
1088                let result: ContractBalance = result?.contract_storage_balances;
1089                Result::<_, io::Error>::Ok(result)
1090            },
1091        );
1092
1093        Ok(stream)
1094    }
1095
1096    /// Returns a stream of new blocks.
1097    #[cfg(feature = "subscriptions")]
1098    pub async fn new_blocks_subscription(
1099        &self,
1100    ) -> io::Result<
1101        impl Stream<
1102            Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1103        > + '_,
1104    > {
1105        use cynic::SubscriptionBuilder;
1106        let s = schema::block::NewBlocksSubscription::build(());
1107
1108        let stream = self.subscribe(s).await?.map(
1109            |r: io::Result<schema::block::NewBlocksSubscription>| {
1110                let result: fuel_core_types::services::block_importer::ImportResult =
1111                    postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1112                        io::Error::new(
1113                            io::ErrorKind::Other,
1114                            format!("Failed to deserialize ImportResult: {e:?}"),
1115                        )
1116                    })?;
1117                Result::<_, io::Error>::Ok(result)
1118            },
1119        );
1120
1121        Ok(stream)
1122    }
1123
1124    /// Returns a stream of preconfirmations for all transactions.
1125    #[cfg(feature = "subscriptions")]
1126    pub async fn preconfirmations_subscription(
1127        &self,
1128    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1129        use cynic::SubscriptionBuilder;
1130        let s = schema::tx::PreconfirmationsSubscription::build(());
1131
1132        let stream = self.subscribe(s).await?.map(
1133            |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1134                let status: TransactionStatus = r?.preconfirmations.try_into()?;
1135                Result::<_, io::Error>::Ok(status)
1136            },
1137        );
1138
1139        Ok(stream)
1140    }
1141
1142    pub async fn contract_slots_values(
1143        &self,
1144        contract_id: &ContractId,
1145        block_height: Option<BlockHeight>,
1146        requested_storage_slots: Vec<Bytes32>,
1147    ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1148        let query = schema::storage::ContractSlotValues::build(
1149            schema::storage::ContractSlotValuesArgs {
1150                contract_id: (*contract_id).into(),
1151                block_height: block_height.map(|b| (*b).into()),
1152                storage_slots: requested_storage_slots
1153                    .into_iter()
1154                    .map(Into::into)
1155                    .collect(),
1156            },
1157        );
1158
1159        self.query(query)
1160            .await
1161            .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1162    }
1163
1164    pub async fn contract_balance_values(
1165        &self,
1166        contract_id: &ContractId,
1167        block_height: Option<BlockHeight>,
1168        requested_storage_slots: Vec<AssetId>,
1169    ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1170        let query = schema::storage::ContractBalanceValues::build(
1171            schema::storage::ContractBalanceValuesArgs {
1172                contract_id: (*contract_id).into(),
1173                block_height: block_height.map(|b| (*b).into()),
1174                assets: requested_storage_slots
1175                    .into_iter()
1176                    .map(Into::into)
1177                    .collect(),
1178            },
1179        );
1180
1181        self.query(query)
1182            .await
1183            .map(|r| r.contract_balance_values.into_iter().collect())
1184    }
1185
1186    pub async fn start_session(&self) -> io::Result<String> {
1187        let query = schema::StartSession::build(());
1188
1189        self.query(query)
1190            .await
1191            .map(|r| r.start_session.into_inner())
1192    }
1193
1194    pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1195        let query = schema::EndSession::build(IdArg { id: id.into() });
1196
1197        self.query(query).await.map(|r| r.end_session)
1198    }
1199
1200    pub async fn reset(&self, id: &str) -> io::Result<bool> {
1201        let query = schema::Reset::build(IdArg { id: id.into() });
1202
1203        self.query(query).await.map(|r| r.reset)
1204    }
1205
1206    pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1207        let op = serde_json::to_string(op)?;
1208        let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1209
1210        self.query(query).await.map(|r| r.execute)
1211    }
1212
1213    pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1214        let query = schema::Register::build(RegisterArgs {
1215            id: id.into(),
1216            register: register.into(),
1217        });
1218
1219        Ok(self.query(query).await?.register.0 as Word)
1220    }
1221
1222    pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1223        let query = schema::Memory::build(MemoryArgs {
1224            id: id.into(),
1225            start: start.into(),
1226            size: size.into(),
1227        });
1228
1229        let memory = self.query(query).await?.memory;
1230
1231        Ok(serde_json::from_str(memory.as_str())?)
1232    }
1233
1234    pub async fn set_breakpoint(
1235        &self,
1236        session_id: &str,
1237        contract: fuel_types::ContractId,
1238        pc: u64,
1239    ) -> io::Result<()> {
1240        let operation = SetBreakpoint::build(SetBreakpointArgs {
1241            id: Id::new(session_id),
1242            bp: schema::Breakpoint {
1243                contract: contract.into(),
1244                pc: U64(pc),
1245            },
1246        });
1247
1248        let response = self.query(operation).await?;
1249        assert!(
1250            response.set_breakpoint,
1251            "Setting breakpoint returned invalid reply"
1252        );
1253        Ok(())
1254    }
1255
1256    pub async fn set_single_stepping(
1257        &self,
1258        session_id: &str,
1259        enable: bool,
1260    ) -> io::Result<()> {
1261        let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1262            id: Id::new(session_id),
1263            enable,
1264        });
1265        self.query(operation).await?;
1266        Ok(())
1267    }
1268
1269    pub async fn start_tx(
1270        &self,
1271        session_id: &str,
1272        tx: &Transaction,
1273    ) -> io::Result<RunResult> {
1274        let operation = StartTx::build(StartTxArgs {
1275            id: Id::new(session_id),
1276            tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1277        });
1278        let response = self.query(operation).await?.start_tx;
1279        Ok(response)
1280    }
1281
1282    pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1283        let operation = ContinueTx::build(ContinueTxArgs {
1284            id: Id::new(session_id),
1285        });
1286        let response = self.query(operation).await?.continue_tx;
1287        Ok(response)
1288    }
1289
1290    pub async fn transaction(
1291        &self,
1292        id: &TxId,
1293    ) -> io::Result<Option<TransactionResponse>> {
1294        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1295
1296        let transaction = self.query(query).await?.transaction;
1297
1298        Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1299    }
1300
1301    /// Get the status of a transaction
1302    pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1303        let query =
1304            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1305
1306        let status = self.query(query).await?.transaction.ok_or_else(|| {
1307            io::Error::new(
1308                ErrorKind::NotFound,
1309                format!("status not found for transaction {id} "),
1310            )
1311        })?;
1312
1313        let status = status
1314            .status
1315            .ok_or_else(|| {
1316                io::Error::new(
1317                    ErrorKind::NotFound,
1318                    format!("status not found for transaction {id}"),
1319                )
1320            })?
1321            .try_into()?;
1322        Ok(status)
1323    }
1324
1325    #[tracing::instrument(skip(self), level = "debug")]
1326    #[cfg(feature = "subscriptions")]
1327    /// Similar to [`Self::subscribe_transaction_status_opt`], but with default options.
1328    pub async fn subscribe_transaction_status(
1329        &self,
1330        id: &TxId,
1331    ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1332        self.subscribe_transaction_status_opt(id, None).await
1333    }
1334
1335    #[cfg(feature = "subscriptions")]
1336    /// Subscribe to the status of a transaction
1337    pub async fn subscribe_transaction_status_opt(
1338        &self,
1339        id: &TxId,
1340        include_preconfirmation: Option<bool>,
1341    ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1342        use cynic::SubscriptionBuilder;
1343        use schema::tx::StatusChangeSubscriptionArgs;
1344        let tx_id: TransactionId = (*id).into();
1345        let s =
1346            schema::tx::StatusChangeSubscription::build(StatusChangeSubscriptionArgs {
1347                id: tx_id,
1348                include_preconfirmation,
1349            });
1350
1351        tracing::debug!("subscribing");
1352        let stream = self.subscribe(s).await?.map(|tx| {
1353            tracing::debug!("received {tx:?}");
1354            let tx = tx?;
1355            let status = tx.status_change.try_into()?;
1356            Ok(status)
1357        });
1358
1359        Ok(stream)
1360    }
1361
1362    #[cfg(feature = "subscriptions")]
1363    /// Awaits for the transaction to be committed into a block
1364    ///
1365    /// This will wait forever if needed, so consider wrapping this call
1366    /// with a `tokio::time::timeout`.
1367    pub async fn await_transaction_commit(
1368        &self,
1369        id: &TxId,
1370    ) -> io::Result<TransactionStatus> {
1371        // skip until we've reached a final status and then stop consuming the stream
1372        // to avoid an EOF which the eventsource client considers as an error.
1373        let status_result = self
1374            .subscribe_transaction_status(id)
1375            .await?
1376            .skip_while(|status| {
1377                future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1378            })
1379            .next()
1380            .await;
1381
1382        if let Some(Ok(status)) = status_result {
1383            Ok(status)
1384        } else {
1385            Err(io::Error::new(
1386                io::ErrorKind::Other,
1387                format!("Failed to get status for transaction {status_result:?}"),
1388            ))
1389        }
1390    }
1391
1392    /// returns a paginated set of transactions sorted by block height
1393    pub async fn transactions(
1394        &self,
1395        request: PaginationRequest<String>,
1396    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1397        let args = schema::ConnectionArgs::from(request);
1398        let query = schema::tx::TransactionsQuery::build(args);
1399        let transactions = self.query(query).await?.transactions.try_into()?;
1400        Ok(transactions)
1401    }
1402
1403    /// Returns a paginated set of transactions associated with a txo owner address.
1404    pub async fn transactions_by_owner(
1405        &self,
1406        owner: &Address,
1407        request: PaginationRequest<String>,
1408    ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1409        let owner: schema::Address = (*owner).into();
1410        let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1411        let query = schema::tx::TransactionsByOwnerQuery::build(args);
1412
1413        let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1414        Ok(transactions)
1415    }
1416
1417    pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1418        let query =
1419            schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1420
1421        let tx = self.query(query).await?.transaction.ok_or_else(|| {
1422            io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1423        })?;
1424
1425        let receipts = match tx.status {
1426            Some(status) => match status {
1427                schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1428                    s.receipts
1429                        .into_iter()
1430                        .map(TryInto::<Receipt>::try_into)
1431                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1432                )
1433                .transpose()?,
1434                schema::tx::TransactionStatus::FailureStatus(s) => Some(
1435                    s.receipts
1436                        .into_iter()
1437                        .map(TryInto::<Receipt>::try_into)
1438                        .collect::<Result<Vec<Receipt>, ConversionError>>(),
1439                )
1440                .transpose()?,
1441                _ => None,
1442            },
1443            _ => None,
1444        };
1445
1446        Ok(receipts)
1447    }
1448
1449    #[cfg(feature = "test-helpers")]
1450    pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1451        let query = schema::tx::AllReceipts::build(());
1452        let receipts = self.query(query).await?.all_receipts;
1453
1454        let vec: Result<Vec<Receipt>, ConversionError> = receipts
1455            .into_iter()
1456            .map(TryInto::<Receipt>::try_into)
1457            .collect();
1458
1459        Ok(vec?)
1460    }
1461
1462    pub async fn produce_blocks(
1463        &self,
1464        blocks_to_produce: u32,
1465        start_timestamp: Option<u64>,
1466    ) -> io::Result<BlockHeight> {
1467        let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1468            blocks_to_produce: blocks_to_produce.into(),
1469            start_timestamp: start_timestamp
1470                .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1471        });
1472
1473        let new_height = self.query(query).await?.produce_blocks;
1474
1475        Ok(new_height.into())
1476    }
1477
1478    pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1479        let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1480            id: Some((*id).into()),
1481        });
1482
1483        let block = self
1484            .query(query)
1485            .await?
1486            .block
1487            .map(TryInto::try_into)
1488            .transpose()?;
1489
1490        Ok(block)
1491    }
1492
1493    pub async fn block_by_height(
1494        &self,
1495        height: BlockHeight,
1496    ) -> io::Result<Option<types::Block>> {
1497        let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1498            height: Some(U32(height.into())),
1499        });
1500
1501        let block = self
1502            .query(query)
1503            .await?
1504            .block
1505            .map(TryInto::try_into)
1506            .transpose()?;
1507
1508        Ok(block)
1509    }
1510
1511    pub async fn da_compressed_block(
1512        &self,
1513        height: BlockHeight,
1514    ) -> io::Result<Option<Vec<u8>>> {
1515        let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1516            DaCompressedBlockByHeightArgs {
1517                height: U32(height.into()),
1518            },
1519        );
1520
1521        Ok(self
1522            .query(query)
1523            .await?
1524            .da_compressed_block
1525            .map(|b| b.bytes.into()))
1526    }
1527
1528    /// Retrieve a blob by its ID
1529    pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1530        let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1531        let blob = self.query(query).await?.blob.map(Into::into);
1532        Ok(blob)
1533    }
1534
1535    /// Check whether a blob with ID exists
1536    pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1537        let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1538        Ok(self.query(query).await?.blob.is_some())
1539    }
1540
1541    /// Retrieve multiple blocks
1542    pub async fn blocks(
1543        &self,
1544        request: PaginationRequest<String>,
1545    ) -> io::Result<PaginatedResult<types::Block, String>> {
1546        let args = schema::ConnectionArgs::from(request);
1547        let query = schema::block::BlocksQuery::build(args);
1548
1549        let blocks = self.query(query).await?.blocks.try_into()?;
1550
1551        Ok(blocks)
1552    }
1553
1554    pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1555        let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1556            utxo_id: (*id).into(),
1557        });
1558        let coin = self.query(query).await?.coin.map(Into::into);
1559        Ok(coin)
1560    }
1561
1562    /// Retrieve a page of coins by their owner
1563    pub async fn coins(
1564        &self,
1565        owner: &Address,
1566        asset_id: Option<&AssetId>,
1567        request: PaginationRequest<String>,
1568    ) -> io::Result<PaginatedResult<types::Coin, String>> {
1569        let owner: schema::Address = (*owner).into();
1570        let asset_id = asset_id.map(|id| (*id).into());
1571        let args = CoinsConnectionArgs::from((owner, asset_id, request));
1572        let query = schema::coins::CoinsQuery::build(args);
1573
1574        let coins = self.query(query).await?.coins.into();
1575        Ok(coins)
1576    }
1577
1578    /// Retrieve coins to spend in a transaction
1579    pub async fn coins_to_spend(
1580        &self,
1581        owner: &Address,
1582        spend_query: Vec<(AssetId, u128, Option<u16>)>,
1583        // (Utxos, Messages Nonce)
1584        excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1585    ) -> io::Result<Vec<Vec<types::CoinType>>> {
1586        let owner: schema::Address = (*owner).into();
1587        let spend_query: Vec<SpendQueryElementInput> = spend_query
1588            .iter()
1589            .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1590                Ok(SpendQueryElementInput {
1591                    asset_id: (*asset_id).into(),
1592                    amount: (*amount).into(),
1593                    max: (*max).map(|max| max.into()),
1594                })
1595            })
1596            .try_collect()?;
1597        let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1598        let args =
1599            schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1600        let query = schema::coins::CoinsToSpendQuery::build(args);
1601
1602        let coins_per_asset = self
1603            .query(query)
1604            .await?
1605            .coins_to_spend
1606            .into_iter()
1607            .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1608            .collect::<Vec<_>>();
1609        Ok(coins_per_asset)
1610    }
1611
1612    pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1613        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1614            id: (*id).into(),
1615        });
1616        let contract = self.query(query).await?.contract.map(Into::into);
1617        Ok(contract)
1618    }
1619
1620    pub async fn contract_balance(
1621        &self,
1622        id: &ContractId,
1623        asset: Option<&AssetId>,
1624    ) -> io::Result<u64> {
1625        let asset_id: schema::AssetId = match asset {
1626            Some(asset) => (*asset).into(),
1627            None => schema::AssetId::default(),
1628        };
1629
1630        let query =
1631            schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1632                id: (*id).into(),
1633                asset: asset_id,
1634            });
1635
1636        let balance: types::ContractBalance =
1637            self.query(query).await?.contract_balance.into();
1638        Ok(balance.amount)
1639    }
1640
1641    pub async fn balance(
1642        &self,
1643        owner: &Address,
1644        asset_id: Option<&AssetId>,
1645    ) -> io::Result<u128> {
1646        let owner: schema::Address = (*owner).into();
1647        let asset_id: schema::AssetId = match asset_id {
1648            Some(asset_id) => (*asset_id).into(),
1649            None => schema::AssetId::default(),
1650        };
1651        let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1652        let balance: types::Balance = self.query(query).await?.balance.into();
1653        Ok(balance.amount)
1654    }
1655
1656    // Retrieve a page of balances by their owner
1657    pub async fn balances(
1658        &self,
1659        owner: &Address,
1660        request: PaginationRequest<String>,
1661    ) -> io::Result<PaginatedResult<types::Balance, String>> {
1662        let owner: schema::Address = (*owner).into();
1663        let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1664        let query = schema::balance::BalancesQuery::build(args);
1665
1666        let balances = self.query(query).await?.balances.into();
1667        Ok(balances)
1668    }
1669
1670    pub async fn contract_balances(
1671        &self,
1672        contract: &ContractId,
1673        request: PaginationRequest<String>,
1674    ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1675        let contract_id: schema::ContractId = (*contract).into();
1676        let args = ContractBalancesConnectionArgs::from((contract_id, request));
1677        let query = schema::contract::ContractBalancesQuery::build(args);
1678
1679        let balances = self.query(query).await?.contract_balances.into();
1680
1681        Ok(balances)
1682    }
1683
1684    // Retrieve a message by its nonce
1685    pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1686        let query = schema::message::MessageQuery::build(NonceArgs {
1687            nonce: (*nonce).into(),
1688        });
1689        let message = self.query(query).await?.message.map(Into::into);
1690        Ok(message)
1691    }
1692
1693    pub async fn messages(
1694        &self,
1695        owner: Option<&Address>,
1696        request: PaginationRequest<String>,
1697    ) -> io::Result<PaginatedResult<types::Message, String>> {
1698        let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1699        let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1700        let query = schema::message::OwnedMessageQuery::build(args);
1701
1702        let messages = self.query(query).await?.messages.into();
1703
1704        Ok(messages)
1705    }
1706
1707    pub async fn contract_info(
1708        &self,
1709        contract: &ContractId,
1710    ) -> io::Result<Option<types::Contract>> {
1711        let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1712            id: (*contract).into(),
1713        });
1714        let contract_info = self.query(query).await?.contract.map(Into::into);
1715        Ok(contract_info)
1716    }
1717
1718    pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1719        let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1720            nonce: (*nonce).into(),
1721        });
1722        let status = self.query(query).await?.message_status.into();
1723
1724        Ok(status)
1725    }
1726
1727    /// Request a merkle proof of an output message.
1728    pub async fn message_proof(
1729        &self,
1730        transaction_id: &TxId,
1731        nonce: &Nonce,
1732        commit_block_id: Option<&BlockId>,
1733        commit_block_height: Option<BlockHeight>,
1734    ) -> io::Result<types::MessageProof> {
1735        let transaction_id: TransactionId = (*transaction_id).into();
1736        let nonce: schema::Nonce = (*nonce).into();
1737        let commit_block_id: Option<schema::BlockId> =
1738            commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1739        let commit_block_height = commit_block_height.map(Into::into);
1740        let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1741            transaction_id,
1742            nonce,
1743            commit_block_id,
1744            commit_block_height,
1745        });
1746        let proof = self.query(query).await?.message_proof.try_into()?;
1747        Ok(proof)
1748    }
1749
1750    pub async fn relayed_transaction_status(
1751        &self,
1752        id: &Bytes32,
1753    ) -> io::Result<Option<RelayedTransactionStatus>> {
1754        let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1755            RelayedTransactionStatusArgs {
1756                id: id.to_owned().into(),
1757            },
1758        );
1759        let status = self
1760            .query(query)
1761            .await?
1762            .relayed_transaction_status
1763            .map(|status| status.try_into())
1764            .transpose()?;
1765        Ok(status)
1766    }
1767
1768    pub async fn asset_info(
1769        &self,
1770        asset_id: &AssetId,
1771    ) -> io::Result<Option<AssetDetail>> {
1772        let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1773            id: (*asset_id).into(),
1774        });
1775        let asset_info = self.query(query).await?.asset_details.map(Into::into);
1776        Ok(asset_info)
1777    }
1778}
1779
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Fetches the transaction identified by `id` via `TransactionQuery` and
    /// returns only its `transaction` payload.
    ///
    /// Returns `Ok(None)` when the response carries no transaction.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying query call, as well
    /// as a [`ConversionError`] raised while turning the response into a
    /// `TransactionResponse`.
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let args = TxIdArgs { id: (*id).into() };
        let query = schema::tx::TransactionQuery::build(args);

        let response = self.query(query).await?;

        // Conversion failures are first unified as `ConversionError`, then
        // lifted into the method's `io::Error` by the trailing `?`.
        let transaction = response
            .transaction
            .map(|tx| -> Result<_, ConversionError> {
                let converted: TransactionResponse = tx.try_into()?;
                Ok(converted.transaction)
            })
            .transpose()?;

        Ok(transaction)
    }
}