1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4 client::{
5 schema::{
6 Tai64Timestamp,
7 TransactionId,
8 block::BlockByHeightArgs,
9 coins::{
10 ExcludeInput,
11 SpendQueryElementInput,
12 },
13 contract::ContractBalanceQueryArgs,
14 gas_price::EstimateGasPrice,
15 message::MessageStatusArgs,
16 relayed_tx::RelayedTransactionStatusArgs,
17 tx::{
18 DryRunArg,
19 TxWithEstimatedPredicatesArg,
20 },
21 },
22 types::{
23 RelayedTransactionStatus,
24 asset::AssetDetail,
25 gas_price::LatestGasPrice,
26 message::MessageStatus,
27 primitives::{
28 Address,
29 AssetId,
30 BlockId,
31 ContractId,
32 UtxoId,
33 },
34 upgrades::StateTransitionBytecode,
35 },
36 },
37 reqwest_ext::{
38 FuelGraphQlResponse,
39 FuelOperation,
40 ReqwestExt,
41 },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46 BASE64_STANDARD,
47 Engine as _,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52 Id,
53 MutationBuilder,
54 Operation,
55 QueryBuilder,
56};
57use fuel_core_types::{
58 blockchain::header::{
59 ConsensusParametersVersion,
60 StateTransitionBytecodeVersion,
61 },
62 fuel_asm::{
63 Instruction,
64 Word,
65 },
66 fuel_tx::{
67 BlobId,
68 Bytes32,
69 ConsensusParameters,
70 Receipt,
71 Transaction,
72 TxId,
73 },
74 fuel_types::{
75 self,
76 BlockHeight,
77 Nonce,
78 canonical::Serialize,
79 },
80 services::executor::{
81 StorageReadReplayEvent,
82 TransactionExecutionStatus,
83 },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87 Stream,
88 StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92 PageDirection,
93 PaginatedResult,
94 PaginationRequest,
95};
96use schema::{
97 Bytes,
98 ContinueTx,
99 ContinueTxArgs,
100 ConversionError,
101 HexString,
102 IdArg,
103 MemoryArgs,
104 RegisterArgs,
105 RunResult,
106 SetBreakpoint,
107 SetBreakpointArgs,
108 SetSingleStepping,
109 SetSingleSteppingArgs,
110 StartTx,
111 StartTxArgs,
112 U32,
113 U64,
114 assets::AssetInfoArg,
115 balance::BalanceArgs,
116 blob::BlobByIdArgs,
117 block::BlockByIdArgs,
118 coins::{
119 CoinByIdArgs,
120 CoinsConnectionArgs,
121 },
122 contract::{
123 ContractBalancesConnectionArgs,
124 ContractByIdArgs,
125 },
126 da_compressed::DaCompressedBlockByHeightArgs,
127 gas_price::BlockHorizonArgs,
128 storage_read_replay::{
129 StorageReadReplay,
130 StorageReadReplayArgs,
131 },
132 tx::{
133 AssembleTxArg,
134 TransactionsByOwnerConnectionArgs,
135 TxArg,
136 TxIdArgs,
137 },
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142 convert::TryInto,
143 io::{
144 self,
145 ErrorKind,
146 },
147 net,
148 str::{
149 self,
150 FromStr,
151 },
152 sync::{
153 Arc,
154 Mutex,
155 },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160 TransactionResponse,
161 TransactionStatus,
162 assemble_tx::{
163 AssembleTransactionResult,
164 RequiredBalance,
165 },
166};
167
168use self::schema::{
169 block::ProduceBlockArgs,
170 message::{
171 MessageProofArgs,
172 NonceArgs,
173 },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
/// Index of a VM register, as accepted by [`FuelClient::register`].
type RegisterId = u32;
181
/// Error type declared by this module.
///
/// The enum is `#[non_exhaustive]`, so further variants may be added later.
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
pub enum Error {
    /// Catch-all variant wrapping an [`anyhow::Error`]; `From<anyhow::Error>`
    /// is derived for it.
    #[from]
    Other(anyhow::Error),
}
192
/// Selects where the block height attached to each request comes from.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// The height lives behind a shared mutex and is raised by the client
    /// whenever a response reports a `current_fuel_block_height` that is not
    /// lower than the stored one.
    Auto {
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The height is set explicitly by the caller and is never changed by
    /// incoming responses.
    Manual {
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213 fn clone(&self) -> Self {
214 match self {
215 Self::Auto { height } => Self::Auto {
216 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219 },
220 Self::Manual { height } => Self::Manual { height: *height },
221 }
222 }
223}
224
/// Latest chain version numbers observed in response extensions.
///
/// Both fields start out as `None` and are overwritten by
/// `FuelClient::update_chain_state_info` when a response carries the
/// corresponding extension value.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Most recently reported state transition bytecode version.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Most recently reported consensus parameters version.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232 fn clone(&self) -> Self {
233 Self {
234 current_stf_version: Arc::new(Mutex::new(
235 self.current_stf_version.lock().ok().and_then(|v| *v),
236 )),
237 current_consensus_parameters_version: Arc::new(Mutex::new(
238 self.current_consensus_parameters_version
239 .lock()
240 .ok()
241 .and_then(|v| *v),
242 )),
243 }
244 }
245}
246
/// GraphQL client for a fuel-core endpoint.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// HTTP client used to post queries.
    client: reqwest::Client,
    /// Cookie jar registered as the cookie provider of `client`; its cookies
    /// are also copied onto subscription requests.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    /// Endpoint URL that queries are posted to.
    url: reqwest::Url,
    /// Policy that supplies the block height attached to each request.
    require_height: ConsistencyPolicy,
    /// Latest chain versions observed in response extensions.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258 type Err = anyhow::Error;
259
260 fn from_str(str: &str) -> Result<Self, Self::Err> {
261 let mut raw_url = str.to_string();
262 if !raw_url.starts_with("http") {
263 raw_url = format!("http://{raw_url}");
264 }
265
266 let mut url = reqwest::Url::parse(&raw_url)
267 .map_err(anyhow::Error::msg)
268 .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269 url.set_path("/v1/graphql");
270
271 #[cfg(feature = "subscriptions")]
272 {
273 let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274 let client = reqwest::Client::builder()
275 .cookie_provider(cookie.clone())
276 .build()
277 .map_err(anyhow::Error::msg)?;
278 Ok(Self {
279 client,
280 cookie,
281 url,
282 require_height: ConsistencyPolicy::Auto {
283 height: Arc::new(Mutex::new(None)),
284 },
285 chain_state_info: Default::default(),
286 })
287 }
288
289 #[cfg(not(feature = "subscriptions"))]
290 {
291 let client = reqwest::Client::new();
292 Ok(Self {
293 client,
294 url,
295 require_height: ConsistencyPolicy::Auto {
296 height: Arc::new(Mutex::new(None)),
297 },
298 chain_state_info: Default::default(),
299 })
300 }
301 }
302}
303
304impl<S> From<S> for FuelClient
305where
306 S: Into<net::SocketAddr>,
307{
308 fn from(socket: S) -> Self {
309 format!("http://{}", socket.into())
310 .as_str()
311 .parse()
312 .unwrap()
313 }
314}
315
/// Folds a list of error messages into a single [`io::Error`].
///
/// The resulting message is `Response errors` followed by each input message,
/// every one prefixed with `"; "`.
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::other(message)
}
326
327impl FuelClient {
328 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329 Self::from_str(url.as_ref())
330 }
331
332 pub fn with_required_fuel_block_height(
333 &mut self,
334 new_height: Option<BlockHeight>,
335 ) -> &mut Self {
336 match &mut self.require_height {
337 ConsistencyPolicy::Auto { height } => {
338 *height.lock().expect("Mutex poisoned") = new_height;
339 }
340 ConsistencyPolicy::Manual { height } => {
341 *height = new_height;
342 }
343 }
344 self
345 }
346
347 pub fn use_manual_consistency_policy(
348 &mut self,
349 height: Option<BlockHeight>,
350 ) -> &mut Self {
351 self.require_height = ConsistencyPolicy::Manual { height };
352 self
353 }
354
355 pub fn required_block_height(&self) -> Option<BlockHeight> {
356 match &self.require_height {
357 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358 ConsistencyPolicy::Manual { height } => *height,
359 }
360 }
361
362 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363 if let Some(current_sft_version) = response
364 .extensions
365 .as_ref()
366 .and_then(|e| e.current_stf_version)
367 && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
368 {
369 *c = Some(current_sft_version);
370 }
371
372 if let Some(current_consensus_parameters_version) = response
373 .extensions
374 .as_ref()
375 .and_then(|e| e.current_consensus_parameters_version)
376 && let Ok(mut c) = self
377 .chain_state_info
378 .current_consensus_parameters_version
379 .lock()
380 {
381 *c = Some(current_consensus_parameters_version);
382 }
383
384 let inner_required_height = match &self.require_height {
385 ConsistencyPolicy::Auto { height } => Some(height.clone()),
386 ConsistencyPolicy::Manual { .. } => None,
387 };
388
389 if let Some(inner_required_height) = inner_required_height
390 && let Some(current_fuel_block_height) = response
391 .extensions
392 .as_ref()
393 .and_then(|e| e.current_fuel_block_height)
394 {
395 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
396
397 if current_fuel_block_height >= lock.unwrap_or_default() {
398 *lock = Some(current_fuel_block_height);
399 }
400 }
401 }
402
403 pub async fn query<ResponseData, Vars>(
405 &self,
406 q: Operation<ResponseData, Vars>,
407 ) -> io::Result<ResponseData>
408 where
409 Vars: serde::Serialize,
410 ResponseData: serde::de::DeserializeOwned + 'static,
411 {
412 let required_fuel_block_height = self.required_block_height();
413 let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
414 let response = self
415 .client
416 .post(self.url.clone())
417 .run_fuel_graphql(fuel_operation)
418 .await
419 .map_err(io::Error::other)?;
420
421 self.decode_response(response)
422 }
423
424 fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
425 where
426 R: serde::de::DeserializeOwned + 'static,
427 {
428 self.update_chain_state_info(&response);
429
430 if response
431 .extensions
432 .as_ref()
433 .and_then(|e| e.fuel_block_height_precondition_failed)
434 == Some(true)
435 {
436 return Err(io::Error::other("The required block height was not met"));
437 }
438
439 let response = response.response;
440
441 match (response.data, response.errors) {
442 (Some(d), _) => Ok(d),
443 (_, Some(e)) => Err(from_strings_errors_to_std_error(
444 e.into_iter().map(|e| e.message).collect(),
445 )),
446 _ => Err(io::Error::other("Invalid response")),
447 }
448 }
449
/// Opens a server-sent-events subscription and returns a stream of decoded
/// responses.
///
/// The operation is serialized to JSON and sent as a `POST` to the
/// `/v1/graphql-sub` path of the client's URL, with the currently required
/// block height attached. This function waits for the first event before
/// returning:
///
/// * a "connected" event returns the rest of the stream;
/// * a data event returns a stream that yields that item first;
/// * an error is returned directly;
/// * an immediately finished stream is an error.
///
/// An event is dropped when its raw payload equals the previously accepted
/// one.
#[tracing::instrument(skip_all)]
#[cfg(feature = "subscriptions")]
async fn subscribe<ResponseData, Vars>(
    &self,
    q: StreamingOperation<ResponseData, Vars>,
) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
where
    Vars: serde::Serialize,
    ResponseData: serde::de::DeserializeOwned + 'static + Send,
{
    use core::ops::Deref;
    use eventsource_client as es;
    use hyper_rustls as _;
    use reqwest::cookie::CookieStore;
    // Subscriptions use their own path on the same host as regular queries.
    let mut url = self.url.clone();
    url.set_path("/v1/graphql-sub");

    let required_fuel_block_height = self.required_block_height();
    let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

    let json_query = serde_json::to_string(&fuel_operation)?;
    let mut client_builder = es::ClientBuilder::for_url(url.as_str())
        .map_err(|e| io::Error::other(format!("Failed to start client {e:?}")))?
        .body(json_query)
        .method("POST".to_string())
        .header("content-type", "application/json")
        .map_err(|e| {
            io::Error::other(format!("Failed to add header to client {e:?}"))
        })?;
    // Credentials embedded in the URL are sent as an HTTP Basic
    // `Authorization` header; this only happens when a password is present.
    if let Some(password) = url.password() {
        let username = url.username();
        let credentials = format!("{}:{}", username, password);
        let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
        client_builder = client_builder
            .header("Authorization", &authorization)
            .map_err(|e| {
                io::Error::other(format!("Failed to add header to client {e:?}"))
            })?;
    }

    // Forward cookies held in the shared jar for the query URL, since the
    // event-source client is built separately from `self.client`.
    if let Some(value) = self.cookie.deref().cookies(&self.url) {
        let value = value.to_str().map_err(|e| {
            io::Error::other(format!("Unable convert header value to string {e:?}"))
        })?;
        client_builder = client_builder
            .header(reqwest::header::COOKIE.as_str(), value)
            .map_err(|e| {
                io::Error::other(format!(
                    "Failed to add header from `reqwest` to client {e:?}"
                ))
            })?;
    }

    let client = client_builder.build_with_conn(
        hyper_rustls::HttpsConnectorBuilder::new()
            .with_webpki_roots()
            .https_or_http()
            .enable_http1()
            .build(),
    );

    // Raw payload of the most recently accepted event, used to drop
    // consecutive duplicates.
    let mut last = None;

    // Items of the intermediate stream: either the connection notification
    // or a decoded payload.
    enum Event<ResponseData> {
        Connected,
        ResponseData(ResponseData),
    }

    let mut init_stream = es::Client::stream(&client)
        // An `Eof` error marks the end of the stream instead of being
        // surfaced to the caller.
        .take_while(|result| {
            futures::future::ready(!matches!(result, Err(es::Error::Eof)))
        })
        .filter_map(move |result| {
            tracing::debug!("Got result: {result:?}");
            let r = match result {
                Ok(es::SSE::Event(es::Event { data, .. })) => {
                    match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                        &data,
                    ) {
                        Ok(resp) => {
                            match self.decode_response(resp) {
                                Ok(resp) => {
                                    // `replace` stores the new payload and
                                    // hands back the previous one for the
                                    // duplicate check.
                                    match last.replace(data) {
                                        Some(l)
                                            if l == *last.as_ref().expect(
                                                "Safe because of the replace above",
                                            ) =>
                                        {
                                            None
                                        }
                                        _ => Some(Ok(Event::ResponseData(resp))),
                                    }
                                }
                                Err(e) => Some(Err(io::Error::other(format!(
                                    "Decode error: {e:?}"
                                )))),
                            }
                        }
                        Err(e) => {
                            Some(Err(io::Error::other(format!("Json error: {e:?}"))))
                        }
                    }
                }
                Ok(es::SSE::Connected(_)) => Some(Ok(Event::Connected)),
                // Any other kind of SSE item is ignored.
                Ok(_) => None,
                Err(e) => {
                    Some(Err(io::Error::other(format!("Graphql error: {e:?}"))))
                }
            };
            futures::future::ready(r)
        });

    // Pull the first item eagerly so that an immediate failure is returned
    // from this function rather than from the stream.
    let event = init_stream.next().await;
    let stream_with_resp = init_stream.filter_map(|result| async move {
        match result {
            Ok(Event::Connected) => None,
            Ok(Event::ResponseData(resp)) => Some(Ok(resp)),
            Err(error) => Some(Err(error)),
        }
    });

    let stream = match event {
        Some(Ok(Event::Connected)) => {
            tracing::debug!("Subscription connected");
            stream_with_resp.boxed()
        }
        Some(Ok(Event::ResponseData(resp))) => {
            tracing::debug!("Subscription returned response");
            // The first item was already consumed above; put it back at the
            // front of the returned stream.
            let joined_stream = futures::stream::once(async move { Ok(resp) })
                .chain(stream_with_resp);
            joined_stream.boxed()
        }
        Some(Err(e)) => return Err(e),
        None => {
            return Err(io::Error::other("Subscription stream ended unexpectedly"));
        }
    };

    Ok(stream)
}
591
592 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
593 self.chain_state_info
594 .current_stf_version
595 .lock()
596 .ok()
597 .and_then(|value| *value)
598 }
599
600 pub fn latest_consensus_parameters_version(
601 &self,
602 ) -> Option<ConsensusParametersVersion> {
603 self.chain_state_info
604 .current_consensus_parameters_version
605 .lock()
606 .ok()
607 .and_then(|value| *value)
608 }
609
610 pub async fn health(&self) -> io::Result<bool> {
611 let query = schema::Health::build(());
612 self.query(query).await.map(|r| r.health)
613 }
614
615 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
616 let query = schema::node_info::QueryNodeInfo::build(());
617 self.query(query).await.map(|r| r.node_info.into())
618 }
619
620 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
621 let query = schema::gas_price::QueryLatestGasPrice::build(());
622 self.query(query).await.map(|r| r.latest_gas_price.into())
623 }
624
625 pub async fn estimate_gas_price(
626 &self,
627 block_horizon: u32,
628 ) -> io::Result<EstimateGasPrice> {
629 let args = BlockHorizonArgs {
630 block_horizon: Some(block_horizon.into()),
631 };
632 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
633 self.query(query).await.map(|r| r.estimate_gas_price)
634 }
635
/// Runs the peers-info query and converts every reported peer into a
/// `PeerInfo`.
#[cfg(feature = "std")]
pub async fn connected_peers_info(
    &self,
) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
    let response = self
        .query(schema::node_info::QueryPeersInfo::build(()))
        .await?;
    let peers: Vec<fuel_core_types::services::p2p::PeerInfo> = response
        .node_info
        .peers
        .into_iter()
        .map(Into::into)
        .collect();
    Ok(peers)
}
645
646 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
647 let query = schema::chain::ChainQuery::build(());
648 self.query(query).await.and_then(|r| {
649 let result = r.chain.try_into()?;
650 Ok(result)
651 })
652 }
653
654 pub async fn consensus_parameters(
655 &self,
656 version: i32,
657 ) -> io::Result<Option<ConsensusParameters>> {
658 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
659 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
660
661 let result = self
662 .query(query)
663 .await?
664 .consensus_parameters
665 .map(TryInto::try_into)
666 .transpose()?;
667
668 Ok(result)
669 }
670
671 pub async fn state_transition_byte_code_by_version(
672 &self,
673 version: i32,
674 ) -> io::Result<Option<StateTransitionBytecode>> {
675 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
676 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
677
678 let result = self
679 .query(query)
680 .await?
681 .state_transition_bytecode_by_version
682 .map(TryInto::try_into)
683 .transpose()?;
684
685 Ok(result)
686 }
687
688 pub async fn state_transition_byte_code_by_root(
689 &self,
690 root: Bytes32,
691 ) -> io::Result<Option<StateTransitionBytecode>> {
692 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
693 root: HexString(Bytes(root.to_vec())),
694 };
695 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
696
697 let result = self
698 .query(query)
699 .await?
700 .state_transition_bytecode_by_root
701 .map(TryInto::try_into)
702 .transpose()?;
703
704 Ok(result)
705 }
706
707 pub async fn dry_run(
709 &self,
710 txs: &[Transaction],
711 ) -> io::Result<Vec<TransactionExecutionStatus>> {
712 self.dry_run_opt(txs, None, None, None).await
713 }
714
715 pub async fn dry_run_opt(
717 &self,
718 txs: &[Transaction],
719 utxo_validation: Option<bool>,
721 gas_price: Option<u64>,
722 at_height: Option<BlockHeight>,
723 ) -> io::Result<Vec<TransactionExecutionStatus>> {
724 let txs = txs
725 .iter()
726 .map(|tx| HexString(Bytes(tx.to_bytes())))
727 .collect::<Vec<HexString>>();
728 let query: Operation<schema::tx::DryRun, DryRunArg> =
729 schema::tx::DryRun::build(DryRunArg {
730 txs,
731 utxo_validation,
732 gas_price: gas_price.map(|gp| gp.into()),
733 block_height: at_height.map(|bh| bh.into()),
734 });
735 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
736 tx_statuses
737 .into_iter()
738 .map(|tx_status| tx_status.try_into().map_err(Into::into))
739 .collect()
740 }
741
742 pub async fn dry_run_opt_record_storage_reads(
744 &self,
745 txs: &[Transaction],
746 utxo_validation: Option<bool>,
748 gas_price: Option<u64>,
749 at_height: Option<BlockHeight>,
750 ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
751 let txs = txs
752 .iter()
753 .map(|tx| HexString(Bytes(tx.to_bytes())))
754 .collect::<Vec<HexString>>();
755 let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
756 schema::tx::DryRunRecordStorageReads::build(DryRunArg {
757 txs,
758 utxo_validation,
759 gas_price: gas_price.map(|gp| gp.into()),
760 block_height: at_height.map(|bh| bh.into()),
761 });
762 let result = self
763 .query(query)
764 .await
765 .map(|r| r.dry_run_record_storage_reads)?;
766 let tx_statuses = result
767 .tx_statuses
768 .into_iter()
769 .map(|tx_status| tx_status.try_into().map_err(Into::into))
770 .collect::<io::Result<Vec<_>>>()?;
771 let storage_reads = result
772 .storage_reads
773 .into_iter()
774 .map(Into::into)
775 .collect::<Vec<_>>();
776 Ok((tx_statuses, storage_reads))
777 }
778
779 pub async fn storage_read_replay(
781 &self,
782 height: &BlockHeight,
783 ) -> io::Result<Vec<StorageReadReplayEvent>> {
784 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
785 StorageReadReplay::build(StorageReadReplayArgs {
786 height: (*height).into(),
787 });
788 Ok(self
789 .query(query)
790 .await
791 .map(|r| r.storage_read_replay)?
792 .into_iter()
793 .map(Into::into)
794 .collect())
795 }
796
797 #[allow(clippy::too_many_arguments)]
820 pub async fn assemble_tx(
821 &self,
822 tx: &Transaction,
823 block_horizon: u32,
824 required_balances: Vec<RequiredBalance>,
825 fee_address_index: u16,
826 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
827 estimate_predicates: bool,
828 reserve_gas: Option<u64>,
829 ) -> io::Result<AssembleTransactionResult> {
830 let tx = HexString(Bytes(tx.to_bytes()));
831 let block_horizon = block_horizon.into();
832
833 let required_balances: Vec<_> = required_balances
834 .into_iter()
835 .map(schema::tx::RequiredBalance::try_from)
836 .collect::<Result<Vec<_>, _>>()?;
837
838 let fee_address_index = fee_address_index.into();
839
840 let exclude_input = exclude.map(Into::into);
841
842 let reserve_gas = reserve_gas.map(U64::from);
843
844 let query_arg = AssembleTxArg {
845 tx,
846 block_horizon,
847 required_balances,
848 fee_address_index,
849 exclude_input,
850 estimate_predicates,
851 reserve_gas,
852 };
853
854 let query = schema::tx::AssembleTx::build(query_arg);
855 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
856 Ok(assemble_tx_result.try_into()?)
857 }
858
859 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
861 let serialized_tx = tx.to_bytes();
862 let query = schema::tx::EstimatePredicates::build(TxArg {
863 tx: HexString(Bytes(serialized_tx)),
864 });
865 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
866 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
867 *tx = tx_with_predicate;
868 Ok(())
869 }
870
871 pub async fn submit(
872 &self,
873 tx: &Transaction,
874 ) -> io::Result<types::primitives::TransactionId> {
875 self.submit_opt(tx, None).await
876 }
877
878 pub async fn submit_opt(
879 &self,
880 tx: &Transaction,
881 estimate_predicates: Option<bool>,
882 ) -> io::Result<types::primitives::TransactionId> {
883 let tx = tx.clone().to_bytes();
884 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
885 tx: HexString(Bytes(tx)),
886 estimate_predicates,
887 });
888
889 let id = self.query(query).await.map(|r| r.submit)?.id.into();
890 Ok(id)
891 }
892
/// Submits `tx` and waits for the first status of the submission, without
/// specifying `estimate_predicates`.
///
/// Shorthand for [`FuelClient::submit_and_await_commit_opt`] with `None`.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_commit(
    &self,
    tx: &Transaction,
) -> io::Result<TransactionStatus> {
    let estimate_predicates = None;
    self.submit_and_await_commit_opt(tx, estimate_predicates)
        .await
}
901
/// Submits `tx` through the submit-and-await subscription and returns the
/// first status the subscription yields.
///
/// # Errors
///
/// Fails when the subscription cannot be opened, when it ends without
/// yielding a status, or when the status cannot be converted.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_commit_opt(
    &self,
    tx: &Transaction,
    estimate_predicates: Option<bool>,
) -> io::Result<TransactionStatus> {
    use cynic::SubscriptionBuilder;
    // `to_bytes` only borrows the transaction, so it is not cloned first.
    let tx = tx.to_bytes();
    let s =
        schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
            tx: HexString(Bytes(tx)),
            estimate_predicates,
        });

    let mut stream = self.subscribe(s).await?.map(
        |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
            let status: TransactionStatus = r?.submit_and_await.try_into()?;
            Result::<_, io::Error>::Ok(status)
        },
    );

    // Only the first item is awaited; the stream is dropped afterwards.
    let status = stream.next().await.ok_or_else(|| {
        io::Error::other("Failed to get status from the submission")
    })??;

    Ok(status)
}
937
/// Submits `tx` and waits for the first status-with-transaction of the
/// submission, without specifying `estimate_predicates`.
///
/// Shorthand for [`FuelClient::submit_and_await_commit_with_tx_opt`] with
/// `None`.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_commit_with_tx(
    &self,
    tx: &Transaction,
) -> io::Result<StatusWithTransaction> {
    let estimate_predicates = None;
    self.submit_and_await_commit_with_tx_opt(tx, estimate_predicates)
        .await
}
946
/// Submits `tx` through the submit-and-await-with-transaction subscription
/// and returns the first item the subscription yields.
///
/// # Errors
///
/// Fails when the subscription cannot be opened, when it ends without
/// yielding a status, or when the status cannot be converted.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_commit_with_tx_opt(
    &self,
    tx: &Transaction,
    estimate_predicates: Option<bool>,
) -> io::Result<StatusWithTransaction> {
    use cynic::SubscriptionBuilder;
    // `to_bytes` only borrows the transaction, so it is not cloned first.
    let tx = tx.to_bytes();
    let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
        TxWithEstimatedPredicatesArg {
            tx: HexString(Bytes(tx)),
            estimate_predicates,
        },
    );

    let mut stream = self.subscribe(s).await?.map(
        |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
            let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
            Result::<_, io::Error>::Ok(status)
        },
    );

    // Only the first item is awaited; the stream is dropped afterwards.
    let status = stream.next().await.ok_or_else(|| {
        io::Error::other("Failed to get status from the submission")
    })??;

    Ok(status)
}
976
/// Submits `tx` and returns the stream of its statuses, with both optional
/// settings left unset.
///
/// Shorthand for [`FuelClient::submit_and_await_status_opt`] with `None`
/// for both options.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_status(
    &self,
    tx: &Transaction,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
    let (estimate_predicates, include_preconfirmation) = (None, None);
    self.submit_and_await_status_opt(tx, estimate_predicates, include_preconfirmation)
        .await
}
985
/// Submits `tx` through the submit-and-await-status subscription and
/// returns the stream of converted statuses.
///
/// `estimate_predicates` and `include_preconfirmation` are forwarded as
/// given. Each stream item is an error when the underlying item is an error
/// or when the status cannot be converted.
#[cfg(feature = "subscriptions")]
pub async fn submit_and_await_status_opt(
    &self,
    tx: &Transaction,
    estimate_predicates: Option<bool>,
    include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
    use cynic::SubscriptionBuilder;
    use schema::tx::SubmitAndAwaitStatusArg;
    // `to_bytes` only borrows the transaction, so it is not cloned first.
    let tx = tx.to_bytes();
    let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
        SubmitAndAwaitStatusArg {
            tx: HexString(Bytes(tx)),
            estimate_predicates,
            include_preconfirmation,
        },
    );

    let stream = self.subscribe(s).await?.map(
        |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
            let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
            Result::<_, io::Error>::Ok(status)
        },
    );

    Ok(stream)
}
1014
/// Subscribes to the storage slots of `contract_id` and returns a stream of
/// `(key, value)` pairs.
#[cfg(feature = "subscriptions")]
pub async fn contract_storage_slots(
    &self,
    contract_id: &ContractId,
) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
    use cynic::SubscriptionBuilder;
    use schema::storage::ContractStorageSlotsArgs;

    let args = ContractStorageSlotsArgs {
        contract_id: (*contract_id).into(),
    };
    let subscription = schema::storage::ContractStorageSlots::build(args);

    let slots = self.subscribe(subscription).await?.map(
        |item: io::Result<schema::storage::ContractStorageSlots>| -> io::Result<(Bytes32, Vec<u8>)> {
            let slot: (Bytes32, Vec<u8>) = item?.contract_storage_slots.into();
            Ok(slot)
        },
    );

    Ok(slots)
}
1036
/// Subscribes to the balances of `contract_id` and returns a stream of
/// `ContractBalance` items exactly as they appear in each reply.
#[cfg(feature = "subscriptions")]
pub async fn contract_storage_balances(
    &self,
    contract_id: &ContractId,
) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
{
    use cynic::SubscriptionBuilder;
    use schema::{
        contract::ContractBalance,
        storage::ContractStorageBalancesArgs,
    };

    let args = ContractStorageBalancesArgs {
        contract_id: (*contract_id).into(),
    };
    let subscription = schema::storage::ContractStorageBalances::build(args);

    let balances = self.subscribe(subscription).await?.map(
        |item: io::Result<schema::storage::ContractStorageBalances>| -> io::Result<ContractBalance> {
            let balance: ContractBalance = item?.contract_storage_balances;
            Ok(balance)
        },
    );

    Ok(balances)
}
1064
/// Subscribes to new blocks and returns a stream of `ImportResult` values.
///
/// Each reply carries a byte payload that is decoded with
/// `postcard::from_bytes`; a payload that fails to decode becomes an error
/// item.
#[cfg(feature = "subscriptions")]
pub async fn new_blocks_subscription(
    &self,
) -> io::Result<
    impl Stream<
        Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
    > + '_,
> {
    use cynic::SubscriptionBuilder;
    use fuel_core_types::services::block_importer::ImportResult;

    let subscription = schema::block::NewBlocksSubscription::build(());

    let blocks = self.subscribe(subscription).await?.map(
        |item: io::Result<schema::block::NewBlocksSubscription>| -> io::Result<ImportResult> {
            let payload = item?.new_blocks;
            let decoded: ImportResult = postcard::from_bytes(payload.0.0.as_slice())
                .map_err(|e| {
                    io::Error::other(format!(
                        "Failed to deserialize ImportResult: {e:?}"
                    ))
                })?;
            Ok(decoded)
        },
    );

    Ok(blocks)
}
1091
/// Subscribes to preconfirmations and returns a stream of converted
/// transaction statuses.
#[cfg(feature = "subscriptions")]
pub async fn preconfirmations_subscription(
    &self,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
    use cynic::SubscriptionBuilder;

    let subscription = schema::tx::PreconfirmationsSubscription::build(());

    let statuses = self.subscribe(subscription).await?.map(
        |item: io::Result<schema::tx::PreconfirmationsSubscription>| -> io::Result<TransactionStatus> {
            let status: TransactionStatus = item?.preconfirmations.try_into()?;
            Ok(status)
        },
    );

    Ok(statuses)
}
1109
1110 pub async fn contract_slots_values(
1111 &self,
1112 contract_id: &ContractId,
1113 block_height: Option<BlockHeight>,
1114 requested_storage_slots: Vec<Bytes32>,
1115 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1116 let query = schema::storage::ContractSlotValues::build(
1117 schema::storage::ContractSlotValuesArgs {
1118 contract_id: (*contract_id).into(),
1119 block_height: block_height.map(|b| (*b).into()),
1120 storage_slots: requested_storage_slots
1121 .into_iter()
1122 .map(Into::into)
1123 .collect(),
1124 },
1125 );
1126
1127 self.query(query)
1128 .await
1129 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1130 }
1131
1132 pub async fn contract_balance_values(
1133 &self,
1134 contract_id: &ContractId,
1135 block_height: Option<BlockHeight>,
1136 requested_storage_slots: Vec<AssetId>,
1137 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1138 let query = schema::storage::ContractBalanceValues::build(
1139 schema::storage::ContractBalanceValuesArgs {
1140 contract_id: (*contract_id).into(),
1141 block_height: block_height.map(|b| (*b).into()),
1142 assets: requested_storage_slots
1143 .into_iter()
1144 .map(Into::into)
1145 .collect(),
1146 },
1147 );
1148
1149 self.query(query)
1150 .await
1151 .map(|r| r.contract_balance_values.into_iter().collect())
1152 }
1153
1154 pub async fn start_session(&self) -> io::Result<String> {
1155 let query = schema::StartSession::build(());
1156
1157 self.query(query)
1158 .await
1159 .map(|r| r.start_session.into_inner())
1160 }
1161
1162 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1163 let query = schema::EndSession::build(IdArg { id: id.into() });
1164
1165 self.query(query).await.map(|r| r.end_session)
1166 }
1167
1168 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1169 let query = schema::Reset::build(IdArg { id: id.into() });
1170
1171 self.query(query).await.map(|r| r.reset)
1172 }
1173
1174 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1175 let op = serde_json::to_string(op)?;
1176 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1177
1178 self.query(query).await.map(|r| r.execute)
1179 }
1180
1181 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1182 let query = schema::Register::build(RegisterArgs {
1183 id: id.into(),
1184 register: register.into(),
1185 });
1186
1187 Ok(self.query(query).await?.register.0 as Word)
1188 }
1189
1190 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1191 let query = schema::Memory::build(MemoryArgs {
1192 id: id.into(),
1193 start: start.into(),
1194 size: size.into(),
1195 });
1196
1197 let memory = self.query(query).await?.memory;
1198
1199 Ok(serde_json::from_str(memory.as_str())?)
1200 }
1201
1202 pub async fn set_breakpoint(
1203 &self,
1204 session_id: &str,
1205 contract: fuel_types::ContractId,
1206 pc: u64,
1207 ) -> io::Result<()> {
1208 let operation = SetBreakpoint::build(SetBreakpointArgs {
1209 id: Id::new(session_id),
1210 bp: schema::Breakpoint {
1211 contract: contract.into(),
1212 pc: U64(pc),
1213 },
1214 });
1215
1216 let response = self.query(operation).await?;
1217 assert!(
1218 response.set_breakpoint,
1219 "Setting breakpoint returned invalid reply"
1220 );
1221 Ok(())
1222 }
1223
1224 pub async fn set_single_stepping(
1225 &self,
1226 session_id: &str,
1227 enable: bool,
1228 ) -> io::Result<()> {
1229 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1230 id: Id::new(session_id),
1231 enable,
1232 });
1233 self.query(operation).await?;
1234 Ok(())
1235 }
1236
1237 pub async fn start_tx(
1238 &self,
1239 session_id: &str,
1240 tx: &Transaction,
1241 ) -> io::Result<RunResult> {
1242 let operation = StartTx::build(StartTxArgs {
1243 id: Id::new(session_id),
1244 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1245 });
1246 let response = self.query(operation).await?.start_tx;
1247 Ok(response)
1248 }
1249
1250 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1251 let operation = ContinueTx::build(ContinueTxArgs {
1252 id: Id::new(session_id),
1253 });
1254 let response = self.query(operation).await?.continue_tx;
1255 Ok(response)
1256 }
1257
1258 pub async fn transaction(
1259 &self,
1260 id: &TxId,
1261 ) -> io::Result<Option<TransactionResponse>> {
1262 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1263
1264 let transaction = self.query(query).await?.transaction;
1265
1266 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1267 }
1268
1269 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1271 let query =
1272 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1273
1274 let status = self.query(query).await?.transaction.ok_or_else(|| {
1275 io::Error::new(
1276 ErrorKind::NotFound,
1277 format!("status not found for transaction {id} "),
1278 )
1279 })?;
1280
1281 let status = status
1282 .status
1283 .ok_or_else(|| {
1284 io::Error::new(
1285 ErrorKind::NotFound,
1286 format!("status not found for transaction {id}"),
1287 )
1288 })?
1289 .try_into()?;
1290 Ok(status)
1291 }
1292
/// Subscribes to status changes of the transaction with the given `id`.
///
/// Delegates to `subscribe_transaction_status_opt`, passing `None` for
/// `include_preconfirmation`.
#[tracing::instrument(skip(self), level = "debug")]
#[cfg(feature = "subscriptions")]
pub async fn subscribe_transaction_status(
    &self,
    id: &TxId,
) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
    let include_preconfirmation = None;
    self.subscribe_transaction_status_opt(id, include_preconfirmation)
        .await
}
1302
/// Subscribes to status changes of the transaction with the given `id`,
/// forwarding `include_preconfirmation` to the subscription arguments.
///
/// Each item of the returned stream is either the converted
/// [`TransactionStatus`] or the error produced while receiving or
/// converting that update.
///
/// # Errors
///
/// Fails if the subscription cannot be established.
#[cfg(feature = "subscriptions")]
pub async fn subscribe_transaction_status_opt(
    &self,
    id: &TxId,
    include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
    use cynic::SubscriptionBuilder;
    use schema::tx::StatusChangeSubscriptionArgs;

    let args = StatusChangeSubscriptionArgs {
        id: (*id).into(),
        include_preconfirmation,
    };
    let subscription = schema::tx::StatusChangeSubscription::build(args);

    tracing::debug!("subscribing");
    let updates = self.subscribe(subscription).await?;
    let statuses = updates.map(|tx| {
        tracing::debug!("received {tx:?}");
        match tx {
            Ok(update) => Ok(update.status_change.try_into()?),
            Err(err) => Err(err),
        }
    });

    Ok(statuses)
}
1329
/// Subscribes to the status of transaction `id` and returns the first
/// status for which `is_final()` is true.
///
/// Stream items that are `Err`, as well as statuses that are not final, are
/// skipped. Only one item is taken after the skipped prefix; the rest of
/// the stream is not consumed.
///
/// # Errors
///
/// Fails if the subscription cannot be established, or if no final status
/// was obtained from the stream.
#[cfg(feature = "subscriptions")]
pub async fn await_transaction_commit(
    &self,
    id: &TxId,
) -> io::Result<TransactionStatus> {
    let updates = self.subscribe_transaction_status(id).await?;

    let first_final = updates
        .skip_while(|item| {
            // Keep skipping on errors and on every status that is not final.
            let keep_skipping = match item {
                Ok(status) => !status.is_final(),
                Err(_) => true,
            };
            future::ready(keep_skipping)
        })
        .next()
        .await;

    match first_final {
        Some(Ok(status)) => Ok(status),
        status_result => Err(io::Error::other(format!(
            "Failed to get status for transaction {status_result:?}"
        ))),
    }
}
1358
1359 pub async fn transactions(
1361 &self,
1362 request: PaginationRequest<String>,
1363 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1364 let args = schema::ConnectionArgs::from(request);
1365 let query = schema::tx::TransactionsQuery::build(args);
1366 let transactions = self.query(query).await?.transactions.try_into()?;
1367 Ok(transactions)
1368 }
1369
1370 pub async fn transactions_by_owner(
1372 &self,
1373 owner: &Address,
1374 request: PaginationRequest<String>,
1375 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1376 let owner: schema::Address = (*owner).into();
1377 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1378 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1379
1380 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1381 Ok(transactions)
1382 }
1383
1384 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1385 let query =
1386 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1387
1388 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1389 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1390 })?;
1391
1392 let receipts = match tx.status {
1393 Some(status) => match status {
1394 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1395 s.receipts
1396 .into_iter()
1397 .map(TryInto::<Receipt>::try_into)
1398 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1399 )
1400 .transpose()?,
1401 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1402 s.receipts
1403 .into_iter()
1404 .map(TryInto::<Receipt>::try_into)
1405 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1406 )
1407 .transpose()?,
1408 _ => None,
1409 },
1410 _ => None,
1411 };
1412
1413 Ok(receipts)
1414 }
1415
/// Runs the `AllReceipts` query and converts every returned entry into a
/// [`Receipt`].
///
/// # Errors
///
/// Fails if the request fails or if any entry cannot be converted.
#[cfg(feature = "test-helpers")]
pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
    let query = schema::tx::AllReceipts::build(());
    let raw_receipts = self.query(query).await?.all_receipts;

    let converted = raw_receipts
        .into_iter()
        .map(TryInto::<Receipt>::try_into)
        .collect::<Result<Vec<Receipt>, ConversionError>>()?;

    Ok(converted)
}
1428
1429 pub async fn produce_blocks(
1430 &self,
1431 blocks_to_produce: u32,
1432 start_timestamp: Option<u64>,
1433 ) -> io::Result<BlockHeight> {
1434 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1435 blocks_to_produce: blocks_to_produce.into(),
1436 start_timestamp: start_timestamp
1437 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1438 });
1439
1440 let new_height = self.query(query).await?.produce_blocks;
1441
1442 Ok(new_height.into())
1443 }
1444
1445 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1446 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1447 id: Some((*id).into()),
1448 });
1449
1450 let block = self
1451 .query(query)
1452 .await?
1453 .block
1454 .map(TryInto::try_into)
1455 .transpose()?;
1456
1457 Ok(block)
1458 }
1459
1460 pub async fn block_by_height(
1461 &self,
1462 height: BlockHeight,
1463 ) -> io::Result<Option<types::Block>> {
1464 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1465 height: Some(U32(height.into())),
1466 });
1467
1468 let block = self
1469 .query(query)
1470 .await?
1471 .block
1472 .map(TryInto::try_into)
1473 .transpose()?;
1474
1475 Ok(block)
1476 }
1477
1478 pub async fn da_compressed_block(
1479 &self,
1480 height: BlockHeight,
1481 ) -> io::Result<Option<Vec<u8>>> {
1482 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1483 DaCompressedBlockByHeightArgs {
1484 height: U32(height.into()),
1485 },
1486 );
1487
1488 Ok(self
1489 .query(query)
1490 .await?
1491 .da_compressed_block
1492 .map(|b| b.bytes.into()))
1493 }
1494
1495 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1497 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1498 let blob = self.query(query).await?.blob.map(Into::into);
1499 Ok(blob)
1500 }
1501
1502 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1504 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1505 Ok(self.query(query).await?.blob.is_some())
1506 }
1507
1508 pub async fn blocks(
1510 &self,
1511 request: PaginationRequest<String>,
1512 ) -> io::Result<PaginatedResult<types::Block, String>> {
1513 let args = schema::ConnectionArgs::from(request);
1514 let query = schema::block::BlocksQuery::build(args);
1515
1516 let blocks = self.query(query).await?.blocks.try_into()?;
1517
1518 Ok(blocks)
1519 }
1520
1521 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1522 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1523 utxo_id: (*id).into(),
1524 });
1525 let coin = self.query(query).await?.coin.map(Into::into);
1526 Ok(coin)
1527 }
1528
1529 pub async fn coins(
1531 &self,
1532 owner: &Address,
1533 asset_id: Option<&AssetId>,
1534 request: PaginationRequest<String>,
1535 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1536 let owner: schema::Address = (*owner).into();
1537 let asset_id = asset_id.map(|id| (*id).into());
1538 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1539 let query = schema::coins::CoinsQuery::build(args);
1540
1541 let coins = self.query(query).await?.coins.into();
1542 Ok(coins)
1543 }
1544
1545 pub async fn coins_to_spend(
1547 &self,
1548 owner: &Address,
1549 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1550 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1552 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1553 let owner: schema::Address = (*owner).into();
1554 let spend_query: Vec<SpendQueryElementInput> = spend_query
1555 .iter()
1556 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1557 Ok(SpendQueryElementInput {
1558 asset_id: (*asset_id).into(),
1559 amount: (*amount).into(),
1560 max: (*max).map(|max| max.into()),
1561 })
1562 })
1563 .try_collect()?;
1564 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1565 let args =
1566 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1567 let query = schema::coins::CoinsToSpendQuery::build(args);
1568
1569 let coins_per_asset = self
1570 .query(query)
1571 .await?
1572 .coins_to_spend
1573 .into_iter()
1574 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1575 .collect::<Vec<_>>();
1576 Ok(coins_per_asset)
1577 }
1578
1579 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1580 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1581 id: (*id).into(),
1582 });
1583 let contract = self.query(query).await?.contract.map(Into::into);
1584 Ok(contract)
1585 }
1586
1587 pub async fn contract_balance(
1588 &self,
1589 id: &ContractId,
1590 asset: Option<&AssetId>,
1591 ) -> io::Result<u64> {
1592 let asset_id: schema::AssetId = match asset {
1593 Some(asset) => (*asset).into(),
1594 None => schema::AssetId::default(),
1595 };
1596
1597 let query =
1598 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1599 id: (*id).into(),
1600 asset: asset_id,
1601 });
1602
1603 let balance: types::ContractBalance =
1604 self.query(query).await?.contract_balance.into();
1605 Ok(balance.amount)
1606 }
1607
1608 pub async fn balance(
1609 &self,
1610 owner: &Address,
1611 asset_id: Option<&AssetId>,
1612 ) -> io::Result<u128> {
1613 let owner: schema::Address = (*owner).into();
1614 let asset_id: schema::AssetId = match asset_id {
1615 Some(asset_id) => (*asset_id).into(),
1616 None => schema::AssetId::default(),
1617 };
1618 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1619 let balance: types::Balance = self.query(query).await?.balance.into();
1620 Ok(balance.amount)
1621 }
1622
1623 pub async fn balances(
1625 &self,
1626 owner: &Address,
1627 request: PaginationRequest<String>,
1628 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1629 let owner: schema::Address = (*owner).into();
1630 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1631 let query = schema::balance::BalancesQuery::build(args);
1632
1633 let balances = self.query(query).await?.balances.into();
1634 Ok(balances)
1635 }
1636
1637 pub async fn contract_balances(
1638 &self,
1639 contract: &ContractId,
1640 request: PaginationRequest<String>,
1641 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1642 let contract_id: schema::ContractId = (*contract).into();
1643 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1644 let query = schema::contract::ContractBalancesQuery::build(args);
1645
1646 let balances = self.query(query).await?.contract_balances.into();
1647
1648 Ok(balances)
1649 }
1650
1651 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1653 let query = schema::message::MessageQuery::build(NonceArgs {
1654 nonce: (*nonce).into(),
1655 });
1656 let message = self.query(query).await?.message.map(Into::into);
1657 Ok(message)
1658 }
1659
1660 pub async fn messages(
1661 &self,
1662 owner: Option<&Address>,
1663 request: PaginationRequest<String>,
1664 ) -> io::Result<PaginatedResult<types::Message, String>> {
1665 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1666 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1667 let query = schema::message::OwnedMessageQuery::build(args);
1668
1669 let messages = self.query(query).await?.messages.into();
1670
1671 Ok(messages)
1672 }
1673
1674 pub async fn contract_info(
1675 &self,
1676 contract: &ContractId,
1677 ) -> io::Result<Option<types::Contract>> {
1678 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1679 id: (*contract).into(),
1680 });
1681 let contract_info = self.query(query).await?.contract.map(Into::into);
1682 Ok(contract_info)
1683 }
1684
1685 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1686 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1687 nonce: (*nonce).into(),
1688 });
1689 let status = self.query(query).await?.message_status.into();
1690
1691 Ok(status)
1692 }
1693
1694 pub async fn message_proof(
1696 &self,
1697 transaction_id: &TxId,
1698 nonce: &Nonce,
1699 commit_block_id: Option<&BlockId>,
1700 commit_block_height: Option<BlockHeight>,
1701 ) -> io::Result<types::MessageProof> {
1702 let transaction_id: TransactionId = (*transaction_id).into();
1703 let nonce: schema::Nonce = (*nonce).into();
1704 let commit_block_id: Option<schema::BlockId> =
1705 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1706 let commit_block_height = commit_block_height.map(Into::into);
1707 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1708 transaction_id,
1709 nonce,
1710 commit_block_id,
1711 commit_block_height,
1712 });
1713 let proof = self.query(query).await?.message_proof.try_into()?;
1714 Ok(proof)
1715 }
1716
1717 pub async fn relayed_transaction_status(
1718 &self,
1719 id: &Bytes32,
1720 ) -> io::Result<Option<RelayedTransactionStatus>> {
1721 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1722 RelayedTransactionStatusArgs {
1723 id: id.to_owned().into(),
1724 },
1725 );
1726 let status = self
1727 .query(query)
1728 .await?
1729 .relayed_transaction_status
1730 .map(|status| status.try_into())
1731 .transpose()?;
1732 Ok(status)
1733 }
1734
1735 pub async fn asset_info(
1736 &self,
1737 asset_id: &AssetId,
1738 ) -> io::Result<Option<AssetDetail>> {
1739 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1740 id: (*asset_id).into(),
1741 });
1742 let asset_info = self.query(query).await?.asset_details.map(Into::into);
1743 Ok(asset_info)
1744 }
1745}
1746
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Looks up the transaction with the given `id` and returns only the
    /// `transaction` field of the converted [`TransactionResponse`].
    ///
    /// Returns `Ok(None)` when the response contains no transaction.
    ///
    /// # Errors
    ///
    /// Fails if the request fails or if the returned transaction cannot be
    /// converted into a [`TransactionResponse`].
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let args = TxIdArgs { id: (*id).into() };
        let query = schema::tx::TransactionQuery::build(args);

        let Some(tx) = self.query(query).await?.transaction else {
            return Ok(None);
        };

        let response: TransactionResponse = tx.try_into()?;
        Ok(Some(response.transaction))
    }
}