1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4 client::{
5 schema::{
6 block::BlockByHeightArgs,
7 coins::{
8 ExcludeInput,
9 SpendQueryElementInput,
10 },
11 contract::ContractBalanceQueryArgs,
12 gas_price::EstimateGasPrice,
13 message::MessageStatusArgs,
14 relayed_tx::RelayedTransactionStatusArgs,
15 tx::{
16 DryRunArg,
17 TxWithEstimatedPredicatesArg,
18 },
19 Tai64Timestamp,
20 TransactionId,
21 },
22 types::{
23 asset::AssetDetail,
24 gas_price::LatestGasPrice,
25 message::MessageStatus,
26 primitives::{
27 Address,
28 AssetId,
29 BlockId,
30 ContractId,
31 UtxoId,
32 },
33 upgrades::StateTransitionBytecode,
34 RelayedTransactionStatus,
35 },
36 },
37 reqwest_ext::{
38 FuelGraphQlResponse,
39 FuelOperation,
40 ReqwestExt,
41 },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46 Engine as _,
47 BASE64_STANDARD,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52 Id,
53 MutationBuilder,
54 Operation,
55 QueryBuilder,
56};
57use fuel_core_types::{
58 blockchain::header::{
59 ConsensusParametersVersion,
60 StateTransitionBytecodeVersion,
61 },
62 fuel_asm::{
63 Instruction,
64 Word,
65 },
66 fuel_tx::{
67 BlobId,
68 Bytes32,
69 ConsensusParameters,
70 Receipt,
71 Transaction,
72 TxId,
73 },
74 fuel_types::{
75 self,
76 canonical::Serialize,
77 BlockHeight,
78 Nonce,
79 },
80 services::executor::{
81 StorageReadReplayEvent,
82 TransactionExecutionStatus,
83 },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87 Stream,
88 StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92 PageDirection,
93 PaginatedResult,
94 PaginationRequest,
95};
96use schema::{
97 assets::AssetInfoArg,
98 balance::BalanceArgs,
99 blob::BlobByIdArgs,
100 block::BlockByIdArgs,
101 coins::{
102 CoinByIdArgs,
103 CoinsConnectionArgs,
104 },
105 contract::{
106 ContractBalancesConnectionArgs,
107 ContractByIdArgs,
108 },
109 da_compressed::DaCompressedBlockByHeightArgs,
110 gas_price::BlockHorizonArgs,
111 storage_read_replay::{
112 StorageReadReplay,
113 StorageReadReplayArgs,
114 },
115 tx::{
116 AssembleTxArg,
117 TransactionsByOwnerConnectionArgs,
118 TxArg,
119 TxIdArgs,
120 },
121 Bytes,
122 ContinueTx,
123 ContinueTxArgs,
124 ConversionError,
125 HexString,
126 IdArg,
127 MemoryArgs,
128 RegisterArgs,
129 RunResult,
130 SetBreakpoint,
131 SetBreakpointArgs,
132 SetSingleStepping,
133 SetSingleSteppingArgs,
134 StartTx,
135 StartTxArgs,
136 U32,
137 U64,
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142 convert::TryInto,
143 io::{
144 self,
145 ErrorKind,
146 },
147 net,
148 str::{
149 self,
150 FromStr,
151 },
152 sync::{
153 Arc,
154 Mutex,
155 },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160 assemble_tx::{
161 AssembleTransactionResult,
162 RequiredBalance,
163 },
164 TransactionResponse,
165 TransactionStatus,
166};
167
168use self::schema::{
169 block::ProduceBlockArgs,
170 message::{
171 MessageProofArgs,
172 NonceArgs,
173 },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
/// Index of a VM register, as accepted by `FuelClient::register`.
type RegisterId = u32;
181
/// Error type declared by this module.
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
pub enum Error {
    /// Catch-all variant wrapping an arbitrary [`anyhow::Error`]; the `#[from]`
    /// attribute derives the `From<anyhow::Error>` conversion.
    #[from]
    Other(anyhow::Error),
}
192
/// Decides where the "required fuel block height" attached to each request comes from.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// The height is tracked by the client: it is raised whenever a response
    /// reports a `current_fuel_block_height` that is not lower than the stored one.
    Auto {
        /// Lock-protected required height; `None` while no height is known or set.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The height only changes when the caller sets it explicitly.
    Manual {
        /// Caller-provided required height; `None` when unset.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213 fn clone(&self) -> Self {
214 match self {
215 Self::Auto { height } => Self::Auto {
216 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219 },
220 Self::Manual { height } => Self::Manual { height: *height },
221 }
222 }
223}
224
/// Most recent chain version information seen in response extensions.
///
/// Both fields start as `None` (via `Default`) and are overwritten whenever a
/// response carries the corresponding extension value.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Last reported state transition bytecode version, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Last reported consensus parameters version, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232 fn clone(&self) -> Self {
233 Self {
234 current_stf_version: Arc::new(Mutex::new(
235 self.current_stf_version.lock().ok().and_then(|v| *v),
236 )),
237 current_consensus_parameters_version: Arc::new(Mutex::new(
238 self.current_consensus_parameters_version
239 .lock()
240 .ok()
241 .and_then(|v| *v),
242 )),
243 }
244 }
245}
246
/// GraphQL client for a fuel-core node.
///
/// Cloning the client also clones `require_height` and `chain_state_info`, whose
/// `Clone` impls copy the current values into new mutexes instead of sharing them.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// HTTP client used to POST GraphQL operations.
    client: reqwest::Client,
    /// Cookie jar installed as the cookie provider of `client`; its cookies are
    /// also forwarded when opening subscriptions.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    /// GraphQL endpoint URL (the path is set to `/v1/graphql` on construction).
    url: reqwest::Url,
    /// Source of the required block height sent with each operation.
    require_height: ConsistencyPolicy,
    /// Latest chain versions observed in response extensions.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258 type Err = anyhow::Error;
259
260 fn from_str(str: &str) -> Result<Self, Self::Err> {
261 let mut raw_url = str.to_string();
262 if !raw_url.starts_with("http") {
263 raw_url = format!("http://{raw_url}");
264 }
265
266 let mut url = reqwest::Url::parse(&raw_url)
267 .map_err(anyhow::Error::msg)
268 .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269 url.set_path("/v1/graphql");
270
271 #[cfg(feature = "subscriptions")]
272 {
273 let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274 let client = reqwest::Client::builder()
275 .cookie_provider(cookie.clone())
276 .build()
277 .map_err(anyhow::Error::msg)?;
278 Ok(Self {
279 client,
280 cookie,
281 url,
282 require_height: ConsistencyPolicy::Auto {
283 height: Arc::new(Mutex::new(None)),
284 },
285 chain_state_info: Default::default(),
286 })
287 }
288
289 #[cfg(not(feature = "subscriptions"))]
290 {
291 let client = reqwest::Client::new();
292 Ok(Self {
293 client,
294 url,
295 require_height: ConsistencyPolicy::Auto {
296 height: Arc::new(Mutex::new(None)),
297 },
298 chain_state_info: Default::default(),
299 })
300 }
301 }
302}
303
304impl<S> From<S> for FuelClient
305where
306 S: Into<net::SocketAddr>,
307{
308 fn from(socket: S) -> Self {
309 format!("http://{}", socket.into())
310 .as_str()
311 .parse()
312 .unwrap()
313 }
314}
315
/// Collapses a list of error messages into a single [`io::Error`].
///
/// The resulting message is `"Response errors"` followed by `"; <message>"` for
/// every entry, and the error kind is [`io::ErrorKind::Other`].
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::new(io::ErrorKind::Other, message)
}
326
327impl FuelClient {
328 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329 Self::from_str(url.as_ref())
330 }
331
332 pub fn with_required_fuel_block_height(
333 &mut self,
334 new_height: Option<BlockHeight>,
335 ) -> &mut Self {
336 match &mut self.require_height {
337 ConsistencyPolicy::Auto { height } => {
338 *height.lock().expect("Mutex poisoned") = new_height;
339 }
340 ConsistencyPolicy::Manual { height } => {
341 *height = new_height;
342 }
343 }
344 self
345 }
346
347 pub fn use_manual_consistency_policy(
348 &mut self,
349 height: Option<BlockHeight>,
350 ) -> &mut Self {
351 self.require_height = ConsistencyPolicy::Manual { height };
352 self
353 }
354
355 pub fn required_block_height(&self) -> Option<BlockHeight> {
356 match &self.require_height {
357 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358 ConsistencyPolicy::Manual { height } => *height,
359 }
360 }
361
362 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363 if let Some(current_sft_version) = response
364 .extensions
365 .as_ref()
366 .and_then(|e| e.current_stf_version)
367 {
368 if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369 *c = Some(current_sft_version);
370 }
371 }
372
373 if let Some(current_consensus_parameters_version) = response
374 .extensions
375 .as_ref()
376 .and_then(|e| e.current_consensus_parameters_version)
377 {
378 if let Ok(mut c) = self
379 .chain_state_info
380 .current_consensus_parameters_version
381 .lock()
382 {
383 *c = Some(current_consensus_parameters_version);
384 }
385 }
386
387 let inner_required_height = match &self.require_height {
388 ConsistencyPolicy::Auto { height } => Some(height.clone()),
389 ConsistencyPolicy::Manual { .. } => None,
390 };
391
392 if let Some(inner_required_height) = inner_required_height {
393 if let Some(current_fuel_block_height) = response
394 .extensions
395 .as_ref()
396 .and_then(|e| e.current_fuel_block_height)
397 {
398 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400 if current_fuel_block_height >= lock.unwrap_or_default() {
401 *lock = Some(current_fuel_block_height);
402 }
403 }
404 }
405 }
406
407 pub async fn query<ResponseData, Vars>(
409 &self,
410 q: Operation<ResponseData, Vars>,
411 ) -> io::Result<ResponseData>
412 where
413 Vars: serde::Serialize,
414 ResponseData: serde::de::DeserializeOwned + 'static,
415 {
416 let required_fuel_block_height = self.required_block_height();
417 let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418 let response = self
419 .client
420 .post(self.url.clone())
421 .run_fuel_graphql(fuel_operation)
422 .await
423 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425 self.decode_response(response)
426 }
427
428 fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429 where
430 R: serde::de::DeserializeOwned + 'static,
431 {
432 self.update_chain_state_info(&response);
433
434 if let Some(failed) = response
435 .extensions
436 .as_ref()
437 .and_then(|e| e.fuel_block_height_precondition_failed)
438 {
439 if failed {
440 return Err(io::Error::new(
441 io::ErrorKind::Other,
442 "The required block height was not met",
443 ));
444 }
445 }
446
447 let response = response.response;
448
449 match (response.data, response.errors) {
450 (Some(d), _) => Ok(d),
451 (_, Some(e)) => Err(from_strings_errors_to_std_error(
452 e.into_iter().map(|e| e.message).collect(),
453 )),
454 _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455 }
456 }
457
    /// Opens a streaming GraphQL operation against `/v1/graphql-sub` and returns
    /// the decoded events as a stream.
    ///
    /// The request is a POST with a JSON body built from `q` and the currently
    /// required block height. Basic-auth credentials embedded in the URL and any
    /// cookies stored for the client URL are forwarded as headers.
    ///
    /// The returned stream ends when the event source reports `Eof`. Each event
    /// payload is parsed and passed through `decode_response`; a payload whose
    /// raw text equals the previously accepted one is dropped.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation cannot be serialized or the event-source
    /// client cannot be configured. Per-event failures are yielded as `Err` items.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions are served from a different path than regular queries.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to start client {e:?}"),
                )
            })?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to add header to client {e:?}"),
                )
            })?;
        // Credentials embedded in the URL are sent as a Basic `Authorization` header.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header to client {e:?}"),
                    )
                })?;
        }

        // Forward cookies stored in the jar for the client URL.
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Unable convert header value to string {e:?}"),
                )
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header from `reqwest` to client {e:?}"),
                    )
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the last successfully decoded event, used to drop
        // consecutive duplicates.
        let mut last = None;

        let stream = es::Client::stream(&client)
            // `Eof` marks the end of the stream rather than a failure.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and hands
                                        // back the previous one for comparison.
                                        match last.replace(data) {
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(resp)),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::new(
                                        io::ErrorKind::Other,
                                        format!("Decode error: {e:?}"),
                                    ))),
                                }
                            }
                            Err(e) => Some(Err(io::Error::new(
                                io::ErrorKind::Other,
                                format!("Json error: {e:?}"),
                            ))),
                        }
                    }
                    // Non-event SSE items produce no output.
                    Ok(_) => None,
                    Err(e) => Some(Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Graphql error: {e:?}"),
                    ))),
                };
                futures::future::ready(r)
            });

        Ok(stream)
    }
585
586 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
587 self.chain_state_info
588 .current_stf_version
589 .lock()
590 .ok()
591 .and_then(|value| *value)
592 }
593
594 pub fn latest_consensus_parameters_version(
595 &self,
596 ) -> Option<ConsensusParametersVersion> {
597 self.chain_state_info
598 .current_consensus_parameters_version
599 .lock()
600 .ok()
601 .and_then(|value| *value)
602 }
603
604 pub async fn health(&self) -> io::Result<bool> {
605 let query = schema::Health::build(());
606 self.query(query).await.map(|r| r.health)
607 }
608
609 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
610 let query = schema::node_info::QueryNodeInfo::build(());
611 self.query(query).await.map(|r| r.node_info.into())
612 }
613
614 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
615 let query = schema::gas_price::QueryLatestGasPrice::build(());
616 self.query(query).await.map(|r| r.latest_gas_price.into())
617 }
618
619 pub async fn estimate_gas_price(
620 &self,
621 block_horizon: u32,
622 ) -> io::Result<EstimateGasPrice> {
623 let args = BlockHorizonArgs {
624 block_horizon: Some(block_horizon.into()),
625 };
626 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
627 self.query(query).await.map(|r| r.estimate_gas_price)
628 }
629
630 #[cfg(feature = "std")]
631 pub async fn connected_peers_info(
632 &self,
633 ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
634 let query = schema::node_info::QueryPeersInfo::build(());
635 self.query(query)
636 .await
637 .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
638 }
639
640 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
641 let query = schema::chain::ChainQuery::build(());
642 self.query(query).await.and_then(|r| {
643 let result = r.chain.try_into()?;
644 Ok(result)
645 })
646 }
647
648 pub async fn consensus_parameters(
649 &self,
650 version: i32,
651 ) -> io::Result<Option<ConsensusParameters>> {
652 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
653 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
654
655 let result = self
656 .query(query)
657 .await?
658 .consensus_parameters
659 .map(TryInto::try_into)
660 .transpose()?;
661
662 Ok(result)
663 }
664
665 pub async fn state_transition_byte_code_by_version(
666 &self,
667 version: i32,
668 ) -> io::Result<Option<StateTransitionBytecode>> {
669 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
670 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
671
672 let result = self
673 .query(query)
674 .await?
675 .state_transition_bytecode_by_version
676 .map(TryInto::try_into)
677 .transpose()?;
678
679 Ok(result)
680 }
681
682 pub async fn state_transition_byte_code_by_root(
683 &self,
684 root: Bytes32,
685 ) -> io::Result<Option<StateTransitionBytecode>> {
686 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
687 root: HexString(Bytes(root.to_vec())),
688 };
689 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
690
691 let result = self
692 .query(query)
693 .await?
694 .state_transition_bytecode_by_root
695 .map(TryInto::try_into)
696 .transpose()?;
697
698 Ok(result)
699 }
700
701 pub async fn dry_run(
703 &self,
704 txs: &[Transaction],
705 ) -> io::Result<Vec<TransactionExecutionStatus>> {
706 self.dry_run_opt(txs, None, None, None).await
707 }
708
709 pub async fn dry_run_opt(
711 &self,
712 txs: &[Transaction],
713 utxo_validation: Option<bool>,
715 gas_price: Option<u64>,
716 at_height: Option<BlockHeight>,
717 ) -> io::Result<Vec<TransactionExecutionStatus>> {
718 let txs = txs
719 .iter()
720 .map(|tx| HexString(Bytes(tx.to_bytes())))
721 .collect::<Vec<HexString>>();
722 let query: Operation<schema::tx::DryRun, DryRunArg> =
723 schema::tx::DryRun::build(DryRunArg {
724 txs,
725 utxo_validation,
726 gas_price: gas_price.map(|gp| gp.into()),
727 block_height: at_height.map(|bh| bh.into()),
728 });
729 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
730 tx_statuses
731 .into_iter()
732 .map(|tx_status| tx_status.try_into().map_err(Into::into))
733 .collect()
734 }
735
736 pub async fn storage_read_replay(
738 &self,
739 height: &BlockHeight,
740 ) -> io::Result<Vec<StorageReadReplayEvent>> {
741 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
742 StorageReadReplay::build(StorageReadReplayArgs {
743 height: (*height).into(),
744 });
745 Ok(self
746 .query(query)
747 .await
748 .map(|r| r.storage_read_replay)?
749 .into_iter()
750 .map(Into::into)
751 .collect())
752 }
753
754 #[allow(clippy::too_many_arguments)]
777 pub async fn assemble_tx(
778 &self,
779 tx: &Transaction,
780 block_horizon: u32,
781 required_balances: Vec<RequiredBalance>,
782 fee_address_index: u16,
783 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
784 estimate_predicates: bool,
785 reserve_gas: Option<u64>,
786 ) -> io::Result<AssembleTransactionResult> {
787 let tx = HexString(Bytes(tx.to_bytes()));
788 let block_horizon = block_horizon.into();
789
790 let required_balances: Vec<_> = required_balances
791 .into_iter()
792 .map(schema::tx::RequiredBalance::try_from)
793 .collect::<Result<Vec<_>, _>>()?;
794
795 let fee_address_index = fee_address_index.into();
796
797 let exclude_input = exclude.map(Into::into);
798
799 let reserve_gas = reserve_gas.map(U64::from);
800
801 let query_arg = AssembleTxArg {
802 tx,
803 block_horizon,
804 required_balances,
805 fee_address_index,
806 exclude_input,
807 estimate_predicates,
808 reserve_gas,
809 };
810
811 let query = schema::tx::AssembleTx::build(query_arg);
812 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
813 Ok(assemble_tx_result.try_into()?)
814 }
815
816 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
818 let serialized_tx = tx.to_bytes();
819 let query = schema::tx::EstimatePredicates::build(TxArg {
820 tx: HexString(Bytes(serialized_tx)),
821 });
822 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
823 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
824 *tx = tx_with_predicate;
825 Ok(())
826 }
827
828 pub async fn submit(
829 &self,
830 tx: &Transaction,
831 ) -> io::Result<types::primitives::TransactionId> {
832 self.submit_opt(tx, None).await
833 }
834
835 pub async fn submit_opt(
836 &self,
837 tx: &Transaction,
838 estimate_predicates: Option<bool>,
839 ) -> io::Result<types::primitives::TransactionId> {
840 let tx = tx.clone().to_bytes();
841 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
842 tx: HexString(Bytes(tx)),
843 estimate_predicates,
844 });
845
846 let id = self.query(query).await.map(|r| r.submit)?.id.into();
847 Ok(id)
848 }
849
850 #[cfg(feature = "subscriptions")]
852 pub async fn submit_and_await_commit(
853 &self,
854 tx: &Transaction,
855 ) -> io::Result<TransactionStatus> {
856 self.submit_and_await_commit_opt(tx, None).await
857 }
858
859 #[cfg(feature = "subscriptions")]
868 pub async fn submit_and_await_commit_opt(
869 &self,
870 tx: &Transaction,
871 estimate_predicates: Option<bool>,
872 ) -> io::Result<TransactionStatus> {
873 use cynic::SubscriptionBuilder;
874 let tx = tx.clone().to_bytes();
875 let s =
876 schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
877 tx: HexString(Bytes(tx)),
878 estimate_predicates,
879 });
880
881 let mut stream = self.subscribe(s).await?.map(
882 |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
883 let status: TransactionStatus = r?.submit_and_await.try_into()?;
884 Result::<_, io::Error>::Ok(status)
885 },
886 );
887
888 let status = stream.next().await.ok_or(io::Error::new(
889 io::ErrorKind::Other,
890 "Failed to get status from the submission",
891 ))??;
892
893 Ok(status)
894 }
895
896 #[cfg(feature = "subscriptions")]
898 pub async fn submit_and_await_commit_with_tx(
899 &self,
900 tx: &Transaction,
901 ) -> io::Result<StatusWithTransaction> {
902 self.submit_and_await_commit_with_tx_opt(tx, None).await
903 }
904
905 #[cfg(feature = "subscriptions")]
907 pub async fn submit_and_await_commit_with_tx_opt(
908 &self,
909 tx: &Transaction,
910 estimate_predicates: Option<bool>,
911 ) -> io::Result<StatusWithTransaction> {
912 use cynic::SubscriptionBuilder;
913 let tx = tx.clone().to_bytes();
914 let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
915 TxWithEstimatedPredicatesArg {
916 tx: HexString(Bytes(tx)),
917 estimate_predicates,
918 },
919 );
920
921 let mut stream = self.subscribe(s).await?.map(
922 |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
923 let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
924 Result::<_, io::Error>::Ok(status)
925 },
926 );
927
928 let status = stream.next().await.ok_or(io::Error::new(
929 io::ErrorKind::Other,
930 "Failed to get status from the submission",
931 ))??;
932
933 Ok(status)
934 }
935
936 #[cfg(feature = "subscriptions")]
938 pub async fn submit_and_await_status<'a>(
939 &'a self,
940 tx: &'a Transaction,
941 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
942 self.submit_and_await_status_opt(tx, None).await
943 }
944
945 #[cfg(feature = "subscriptions")]
947 pub async fn submit_and_await_status_opt<'a>(
948 &'a self,
949 tx: &'a Transaction,
950 estimate_predicates: Option<bool>,
951 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + 'a> {
952 use cynic::SubscriptionBuilder;
953 let tx = tx.clone().to_bytes();
954 let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
955 TxWithEstimatedPredicatesArg {
956 tx: HexString(Bytes(tx)),
957 estimate_predicates,
958 },
959 );
960
961 let stream = self.subscribe(s).await?.map(
962 |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
963 let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
964 Result::<_, io::Error>::Ok(status)
965 },
966 );
967
968 Ok(stream)
969 }
970
971 #[cfg(feature = "subscriptions")]
973 pub async fn contract_storage_slots<'a>(
974 &'a self,
975 contract_id: &'a ContractId,
976 ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + 'a> {
977 use cynic::SubscriptionBuilder;
978 use schema::storage::ContractStorageSlotsArgs;
979 let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
980 contract_id: (*contract_id).into(),
981 });
982
983 let stream = self.subscribe(s).await?.map(
984 |result: io::Result<schema::storage::ContractStorageSlots>| {
985 let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
986 Result::<_, io::Error>::Ok(result)
987 },
988 );
989
990 Ok(stream)
991 }
992
993 #[cfg(feature = "subscriptions")]
995 pub async fn contract_storage_balances<'a>(
996 &'a self,
997 contract_id: &'a ContractId,
998 ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + 'a>
999 {
1000 use cynic::SubscriptionBuilder;
1001 use schema::{
1002 contract::ContractBalance,
1003 storage::ContractStorageBalancesArgs,
1004 };
1005 let s = schema::storage::ContractStorageBalances::build(
1006 ContractStorageBalancesArgs {
1007 contract_id: (*contract_id).into(),
1008 },
1009 );
1010
1011 let stream = self.subscribe(s).await?.map(
1012 |result: io::Result<schema::storage::ContractStorageBalances>| {
1013 let result: ContractBalance = result?.contract_storage_balances;
1014 Result::<_, io::Error>::Ok(result)
1015 },
1016 );
1017
1018 Ok(stream)
1019 }
1020
1021 pub async fn contract_slots_values(
1022 &self,
1023 contract_id: &ContractId,
1024 block_height: Option<BlockHeight>,
1025 requested_storage_slots: Vec<Bytes32>,
1026 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1027 let query = schema::storage::ContractSlotValues::build(
1028 schema::storage::ContractSlotValuesArgs {
1029 contract_id: (*contract_id).into(),
1030 block_height: block_height.map(|b| (*b).into()),
1031 storage_slots: requested_storage_slots
1032 .into_iter()
1033 .map(Into::into)
1034 .collect(),
1035 },
1036 );
1037
1038 self.query(query)
1039 .await
1040 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1041 }
1042
1043 pub async fn contract_balance_values(
1044 &self,
1045 contract_id: &ContractId,
1046 block_height: Option<BlockHeight>,
1047 requested_storage_slots: Vec<AssetId>,
1048 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1049 let query = schema::storage::ContractBalanceValues::build(
1050 schema::storage::ContractBalanceValuesArgs {
1051 contract_id: (*contract_id).into(),
1052 block_height: block_height.map(|b| (*b).into()),
1053 assets: requested_storage_slots
1054 .into_iter()
1055 .map(Into::into)
1056 .collect(),
1057 },
1058 );
1059
1060 self.query(query).await.map(|r| {
1061 r.contract_balance_values
1062 .into_iter()
1063 .map(Into::into)
1064 .collect()
1065 })
1066 }
1067
1068 pub async fn start_session(&self) -> io::Result<String> {
1069 let query = schema::StartSession::build(());
1070
1071 self.query(query)
1072 .await
1073 .map(|r| r.start_session.into_inner())
1074 }
1075
1076 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1077 let query = schema::EndSession::build(IdArg { id: id.into() });
1078
1079 self.query(query).await.map(|r| r.end_session)
1080 }
1081
1082 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1083 let query = schema::Reset::build(IdArg { id: id.into() });
1084
1085 self.query(query).await.map(|r| r.reset)
1086 }
1087
1088 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1089 let op = serde_json::to_string(op)?;
1090 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1091
1092 self.query(query).await.map(|r| r.execute)
1093 }
1094
1095 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1096 let query = schema::Register::build(RegisterArgs {
1097 id: id.into(),
1098 register: register.into(),
1099 });
1100
1101 Ok(self.query(query).await?.register.0 as Word)
1102 }
1103
1104 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1105 let query = schema::Memory::build(MemoryArgs {
1106 id: id.into(),
1107 start: start.into(),
1108 size: size.into(),
1109 });
1110
1111 let memory = self.query(query).await?.memory;
1112
1113 Ok(serde_json::from_str(memory.as_str())?)
1114 }
1115
1116 pub async fn set_breakpoint(
1117 &self,
1118 session_id: &str,
1119 contract: fuel_types::ContractId,
1120 pc: u64,
1121 ) -> io::Result<()> {
1122 let operation = SetBreakpoint::build(SetBreakpointArgs {
1123 id: Id::new(session_id),
1124 bp: schema::Breakpoint {
1125 contract: contract.into(),
1126 pc: U64(pc),
1127 },
1128 });
1129
1130 let response = self.query(operation).await?;
1131 assert!(
1132 response.set_breakpoint,
1133 "Setting breakpoint returned invalid reply"
1134 );
1135 Ok(())
1136 }
1137
1138 pub async fn set_single_stepping(
1139 &self,
1140 session_id: &str,
1141 enable: bool,
1142 ) -> io::Result<()> {
1143 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1144 id: Id::new(session_id),
1145 enable,
1146 });
1147 self.query(operation).await?;
1148 Ok(())
1149 }
1150
1151 pub async fn start_tx(
1152 &self,
1153 session_id: &str,
1154 tx: &Transaction,
1155 ) -> io::Result<RunResult> {
1156 let operation = StartTx::build(StartTxArgs {
1157 id: Id::new(session_id),
1158 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1159 });
1160 let response = self.query(operation).await?.start_tx;
1161 Ok(response)
1162 }
1163
1164 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1165 let operation = ContinueTx::build(ContinueTxArgs {
1166 id: Id::new(session_id),
1167 });
1168 let response = self.query(operation).await?.continue_tx;
1169 Ok(response)
1170 }
1171
1172 pub async fn transaction(
1173 &self,
1174 id: &TxId,
1175 ) -> io::Result<Option<TransactionResponse>> {
1176 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1177
1178 let transaction = self.query(query).await?.transaction;
1179
1180 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1181 }
1182
1183 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1185 let query =
1186 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1187
1188 let status = self.query(query).await?.transaction.ok_or_else(|| {
1189 io::Error::new(
1190 ErrorKind::NotFound,
1191 format!("status not found for transaction {id} "),
1192 )
1193 })?;
1194
1195 let status = status
1196 .status
1197 .ok_or_else(|| {
1198 io::Error::new(
1199 ErrorKind::NotFound,
1200 format!("status not found for transaction {id}"),
1201 )
1202 })?
1203 .try_into()?;
1204 Ok(status)
1205 }
1206
1207 #[tracing::instrument(skip(self), level = "debug")]
1208 #[cfg(feature = "subscriptions")]
1209 pub async fn subscribe_transaction_status<'a>(
1211 &'a self,
1212 id: &'a TxId,
1213 ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + 'a> {
1214 use cynic::SubscriptionBuilder;
1215 let tx_id: TransactionId = (*id).into();
1216 let s = schema::tx::StatusChangeSubscription::build(TxIdArgs { id: tx_id });
1217
1218 tracing::debug!("subscribing");
1219 let stream = self.subscribe(s).await?.map(|tx| {
1220 tracing::debug!("received {tx:?}");
1221 let tx = tx?;
1222 let status = tx.status_change.try_into()?;
1223 Ok(status)
1224 });
1225
1226 Ok(stream)
1227 }
1228
1229 #[cfg(feature = "subscriptions")]
1230 pub async fn await_transaction_commit(
1235 &self,
1236 id: &TxId,
1237 ) -> io::Result<TransactionStatus> {
1238 let status_result = self
1241 .subscribe_transaction_status(id)
1242 .await?
1243 .skip_while(|status| {
1244 future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1245 })
1246 .next()
1247 .await;
1248
1249 if let Some(Ok(status)) = status_result {
1250 Ok(status)
1251 } else {
1252 Err(io::Error::new(
1253 io::ErrorKind::Other,
1254 format!("Failed to get status for transaction {status_result:?}"),
1255 ))
1256 }
1257 }
1258
1259 pub async fn transactions(
1261 &self,
1262 request: PaginationRequest<String>,
1263 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1264 let args = schema::ConnectionArgs::from(request);
1265 let query = schema::tx::TransactionsQuery::build(args);
1266 let transactions = self.query(query).await?.transactions.try_into()?;
1267 Ok(transactions)
1268 }
1269
1270 pub async fn transactions_by_owner(
1272 &self,
1273 owner: &Address,
1274 request: PaginationRequest<String>,
1275 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1276 let owner: schema::Address = (*owner).into();
1277 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1278 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1279
1280 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1281 Ok(transactions)
1282 }
1283
1284 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1285 let query =
1286 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1287
1288 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1289 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1290 })?;
1291
1292 let receipts = match tx.status {
1293 Some(status) => match status {
1294 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1295 s.receipts
1296 .into_iter()
1297 .map(TryInto::<Receipt>::try_into)
1298 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1299 )
1300 .transpose()?,
1301 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1302 s.receipts
1303 .into_iter()
1304 .map(TryInto::<Receipt>::try_into)
1305 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1306 )
1307 .transpose()?,
1308 _ => None,
1309 },
1310 _ => None,
1311 };
1312
1313 Ok(receipts)
1314 }
1315
1316 #[cfg(feature = "test-helpers")]
1317 pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1318 let query = schema::tx::AllReceipts::build(());
1319 let receipts = self.query(query).await?.all_receipts;
1320
1321 let vec: Result<Vec<Receipt>, ConversionError> = receipts
1322 .into_iter()
1323 .map(TryInto::<Receipt>::try_into)
1324 .collect();
1325
1326 Ok(vec?)
1327 }
1328
1329 pub async fn produce_blocks(
1330 &self,
1331 blocks_to_produce: u32,
1332 start_timestamp: Option<u64>,
1333 ) -> io::Result<BlockHeight> {
1334 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1335 blocks_to_produce: blocks_to_produce.into(),
1336 start_timestamp: start_timestamp
1337 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1338 });
1339
1340 let new_height = self.query(query).await?.produce_blocks;
1341
1342 Ok(new_height.into())
1343 }
1344
1345 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1346 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1347 id: Some((*id).into()),
1348 });
1349
1350 let block = self
1351 .query(query)
1352 .await?
1353 .block
1354 .map(TryInto::try_into)
1355 .transpose()?;
1356
1357 Ok(block)
1358 }
1359
1360 pub async fn block_by_height(
1361 &self,
1362 height: BlockHeight,
1363 ) -> io::Result<Option<types::Block>> {
1364 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1365 height: Some(U32(height.into())),
1366 });
1367
1368 let block = self
1369 .query(query)
1370 .await?
1371 .block
1372 .map(TryInto::try_into)
1373 .transpose()?;
1374
1375 Ok(block)
1376 }
1377
1378 pub async fn da_compressed_block(
1379 &self,
1380 height: BlockHeight,
1381 ) -> io::Result<Option<Vec<u8>>> {
1382 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1383 DaCompressedBlockByHeightArgs {
1384 height: U32(height.into()),
1385 },
1386 );
1387
1388 Ok(self
1389 .query(query)
1390 .await?
1391 .da_compressed_block
1392 .map(|b| b.bytes.into()))
1393 }
1394
1395 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1397 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1398 let blob = self.query(query).await?.blob.map(Into::into);
1399 Ok(blob)
1400 }
1401
1402 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1404 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1405 Ok(self.query(query).await?.blob.is_some())
1406 }
1407
1408 pub async fn blocks(
1410 &self,
1411 request: PaginationRequest<String>,
1412 ) -> io::Result<PaginatedResult<types::Block, String>> {
1413 let args = schema::ConnectionArgs::from(request);
1414 let query = schema::block::BlocksQuery::build(args);
1415
1416 let blocks = self.query(query).await?.blocks.try_into()?;
1417
1418 Ok(blocks)
1419 }
1420
1421 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1422 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1423 utxo_id: (*id).into(),
1424 });
1425 let coin = self.query(query).await?.coin.map(Into::into);
1426 Ok(coin)
1427 }
1428
1429 pub async fn coins(
1431 &self,
1432 owner: &Address,
1433 asset_id: Option<&AssetId>,
1434 request: PaginationRequest<String>,
1435 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1436 let owner: schema::Address = (*owner).into();
1437 let asset_id: schema::AssetId = match asset_id {
1438 Some(asset_id) => (*asset_id).into(),
1439 None => schema::AssetId::default(),
1440 };
1441 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1442 let query = schema::coins::CoinsQuery::build(args);
1443
1444 let coins = self.query(query).await?.coins.into();
1445 Ok(coins)
1446 }
1447
1448 pub async fn coins_to_spend(
1450 &self,
1451 owner: &Address,
1452 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1453 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1455 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1456 let owner: schema::Address = (*owner).into();
1457 let spend_query: Vec<SpendQueryElementInput> = spend_query
1458 .iter()
1459 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1460 Ok(SpendQueryElementInput {
1461 asset_id: (*asset_id).into(),
1462 amount: (*amount).into(),
1463 max: (*max).map(|max| max.into()),
1464 })
1465 })
1466 .try_collect()?;
1467 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1468 let args =
1469 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1470 let query = schema::coins::CoinsToSpendQuery::build(args);
1471
1472 let coins_per_asset = self
1473 .query(query)
1474 .await?
1475 .coins_to_spend
1476 .into_iter()
1477 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1478 .collect::<Vec<_>>();
1479 Ok(coins_per_asset)
1480 }
1481
1482 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1483 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1484 id: (*id).into(),
1485 });
1486 let contract = self.query(query).await?.contract.map(Into::into);
1487 Ok(contract)
1488 }
1489
1490 pub async fn contract_balance(
1491 &self,
1492 id: &ContractId,
1493 asset: Option<&AssetId>,
1494 ) -> io::Result<u64> {
1495 let asset_id: schema::AssetId = match asset {
1496 Some(asset) => (*asset).into(),
1497 None => schema::AssetId::default(),
1498 };
1499
1500 let query =
1501 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1502 id: (*id).into(),
1503 asset: asset_id,
1504 });
1505
1506 let balance: types::ContractBalance =
1507 self.query(query).await?.contract_balance.into();
1508 Ok(balance.amount)
1509 }
1510
1511 pub async fn balance(
1512 &self,
1513 owner: &Address,
1514 asset_id: Option<&AssetId>,
1515 ) -> io::Result<u64> {
1516 let owner: schema::Address = (*owner).into();
1517 let asset_id: schema::AssetId = match asset_id {
1518 Some(asset_id) => (*asset_id).into(),
1519 None => schema::AssetId::default(),
1520 };
1521 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1522 let balance: types::Balance = self.query(query).await?.balance.into();
1523 Ok(balance.amount.try_into().unwrap_or(u64::MAX))
1524 }
1525
1526 pub async fn balances(
1528 &self,
1529 owner: &Address,
1530 request: PaginationRequest<String>,
1531 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1532 let owner: schema::Address = (*owner).into();
1533 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1534 let query = schema::balance::BalancesQuery::build(args);
1535
1536 let balances = self.query(query).await?.balances.into();
1537 Ok(balances)
1538 }
1539
1540 pub async fn contract_balances(
1541 &self,
1542 contract: &ContractId,
1543 request: PaginationRequest<String>,
1544 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1545 let contract_id: schema::ContractId = (*contract).into();
1546 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1547 let query = schema::contract::ContractBalancesQuery::build(args);
1548
1549 let balances = self.query(query).await?.contract_balances.into();
1550
1551 Ok(balances)
1552 }
1553
1554 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1556 let query = schema::message::MessageQuery::build(NonceArgs {
1557 nonce: (*nonce).into(),
1558 });
1559 let message = self.query(query).await?.message.map(Into::into);
1560 Ok(message)
1561 }
1562
1563 pub async fn messages(
1564 &self,
1565 owner: Option<&Address>,
1566 request: PaginationRequest<String>,
1567 ) -> io::Result<PaginatedResult<types::Message, String>> {
1568 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1569 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1570 let query = schema::message::OwnedMessageQuery::build(args);
1571
1572 let messages = self.query(query).await?.messages.into();
1573
1574 Ok(messages)
1575 }
1576
1577 pub async fn contract_info(
1578 &self,
1579 contract: &ContractId,
1580 ) -> io::Result<Option<types::Contract>> {
1581 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1582 id: (*contract).into(),
1583 });
1584 let contract_info = self.query(query).await?.contract.map(Into::into);
1585 Ok(contract_info)
1586 }
1587
1588 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1589 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1590 nonce: (*nonce).into(),
1591 });
1592 let status = self.query(query).await?.message_status.into();
1593
1594 Ok(status)
1595 }
1596
1597 pub async fn message_proof(
1599 &self,
1600 transaction_id: &TxId,
1601 nonce: &Nonce,
1602 commit_block_id: Option<&BlockId>,
1603 commit_block_height: Option<BlockHeight>,
1604 ) -> io::Result<types::MessageProof> {
1605 let transaction_id: TransactionId = (*transaction_id).into();
1606 let nonce: schema::Nonce = (*nonce).into();
1607 let commit_block_id: Option<schema::BlockId> =
1608 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1609 let commit_block_height = commit_block_height.map(Into::into);
1610 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1611 transaction_id,
1612 nonce,
1613 commit_block_id,
1614 commit_block_height,
1615 });
1616 let proof = self.query(query).await?.message_proof.try_into()?;
1617 Ok(proof)
1618 }
1619
1620 pub async fn relayed_transaction_status(
1621 &self,
1622 id: &Bytes32,
1623 ) -> io::Result<Option<RelayedTransactionStatus>> {
1624 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1625 RelayedTransactionStatusArgs {
1626 id: id.to_owned().into(),
1627 },
1628 );
1629 let status = self
1630 .query(query)
1631 .await?
1632 .relayed_transaction_status
1633 .map(|status| status.try_into())
1634 .transpose()?;
1635 Ok(status)
1636 }
1637
1638 pub async fn asset_info(&self, asset_id: &AssetId) -> io::Result<AssetDetail> {
1639 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1640 id: (*asset_id).into(),
1641 });
1642 let asset_info = self.query(query).await?.asset_details.into();
1643 Ok(asset_info)
1644 }
1645}
1646
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Fetches the transaction `id` and returns the `transaction` field of
    /// the resulting [`TransactionResponse`].
    ///
    /// Returns `Ok(None)` when the query yields no transaction.
    ///
    /// # Errors
    ///
    /// Fails if the query fails or the returned transaction cannot be
    /// converted into a [`TransactionResponse`].
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let args = TxIdArgs { id: (*id).into() };
        let query = schema::tx::TransactionQuery::build(args);

        match self.query(query).await?.transaction {
            Some(raw) => {
                // Route the failure through `ConversionError` first, then let
                // `?` turn it into the `io::Error` of the return type.
                let converted: Result<TransactionResponse, _> = raw.try_into();
                let response = converted.map_err(ConversionError::from)?;
                Ok(Some(response.transaction))
            }
            None => Ok(None),
        }
    }
}