1#[cfg(feature = "subscriptions")]
2use crate::client::types::StatusWithTransaction;
3use crate::{
4 client::{
5 schema::{
6 Tai64Timestamp,
7 TransactionId,
8 block::BlockByHeightArgs,
9 coins::{
10 ExcludeInput,
11 SpendQueryElementInput,
12 },
13 contract::ContractBalanceQueryArgs,
14 gas_price::EstimateGasPrice,
15 message::MessageStatusArgs,
16 relayed_tx::RelayedTransactionStatusArgs,
17 tx::{
18 DryRunArg,
19 TxWithEstimatedPredicatesArg,
20 },
21 },
22 types::{
23 RelayedTransactionStatus,
24 asset::AssetDetail,
25 gas_price::LatestGasPrice,
26 message::MessageStatus,
27 primitives::{
28 Address,
29 AssetId,
30 BlockId,
31 ContractId,
32 UtxoId,
33 },
34 upgrades::StateTransitionBytecode,
35 },
36 },
37 reqwest_ext::{
38 FuelGraphQlResponse,
39 FuelOperation,
40 ReqwestExt,
41 },
42};
43use anyhow::Context;
44#[cfg(feature = "subscriptions")]
45use base64::prelude::{
46 BASE64_STANDARD,
47 Engine as _,
48};
49#[cfg(feature = "subscriptions")]
50use cynic::StreamingOperation;
51use cynic::{
52 Id,
53 MutationBuilder,
54 Operation,
55 QueryBuilder,
56};
57use fuel_core_types::{
58 blockchain::header::{
59 ConsensusParametersVersion,
60 StateTransitionBytecodeVersion,
61 },
62 fuel_asm::{
63 Instruction,
64 Word,
65 },
66 fuel_tx::{
67 BlobId,
68 Bytes32,
69 ConsensusParameters,
70 Receipt,
71 Transaction,
72 TxId,
73 },
74 fuel_types::{
75 self,
76 BlockHeight,
77 Nonce,
78 canonical::Serialize,
79 },
80 services::executor::{
81 StorageReadReplayEvent,
82 TransactionExecutionStatus,
83 },
84};
85#[cfg(feature = "subscriptions")]
86use futures::{
87 Stream,
88 StreamExt,
89};
90use itertools::Itertools;
91use pagination::{
92 PageDirection,
93 PaginatedResult,
94 PaginationRequest,
95};
96use schema::{
97 Bytes,
98 ContinueTx,
99 ContinueTxArgs,
100 ConversionError,
101 HexString,
102 IdArg,
103 MemoryArgs,
104 RegisterArgs,
105 RunResult,
106 SetBreakpoint,
107 SetBreakpointArgs,
108 SetSingleStepping,
109 SetSingleSteppingArgs,
110 StartTx,
111 StartTxArgs,
112 U32,
113 U64,
114 assets::AssetInfoArg,
115 balance::BalanceArgs,
116 blob::BlobByIdArgs,
117 block::BlockByIdArgs,
118 coins::{
119 CoinByIdArgs,
120 CoinsConnectionArgs,
121 },
122 contract::{
123 ContractBalancesConnectionArgs,
124 ContractByIdArgs,
125 },
126 da_compressed::DaCompressedBlockByHeightArgs,
127 gas_price::BlockHorizonArgs,
128 storage_read_replay::{
129 StorageReadReplay,
130 StorageReadReplayArgs,
131 },
132 tx::{
133 AssembleTxArg,
134 TransactionsByOwnerConnectionArgs,
135 TxArg,
136 TxIdArgs,
137 },
138};
139#[cfg(feature = "subscriptions")]
140use std::future;
141use std::{
142 convert::TryInto,
143 io::{
144 self,
145 ErrorKind,
146 },
147 net,
148 str::{
149 self,
150 FromStr,
151 },
152 sync::{
153 Arc,
154 Mutex,
155 },
156};
157use tai64::Tai64;
158use tracing as _;
159use types::{
160 TransactionResponse,
161 TransactionStatus,
162 assemble_tx::{
163 AssembleTransactionResult,
164 RequiredBalance,
165 },
166};
167
168use self::schema::{
169 block::ProduceBlockArgs,
170 message::{
171 MessageProofArgs,
172 NonceArgs,
173 },
174};
175
176pub mod pagination;
177pub mod schema;
178pub mod types;
179
/// Numeric identifier of a register, as accepted by [`FuelClient::register`].
type RegisterId = u32;
181
/// Error type declared by this module.
///
/// The enum is `#[non_exhaustive]`, so code matching on it from another crate
/// needs a wildcard arm.
#[derive(Debug, derive_more::Display, derive_more::From)]
#[non_exhaustive]
pub enum Error {
    /// Catch-all variant wrapping an arbitrary [`anyhow::Error`]. The `#[from]`
    /// attribute derives the `From<anyhow::Error>` conversion for it.
    #[from]
    Other(anyhow::Error),
}
192
/// Strategy for choosing the block height that [`FuelClient`] attaches to each
/// operation it sends.
#[derive(Debug)]
pub enum ConsistencyPolicy {
    /// The height lives behind a mutex and is advanced by the client itself:
    /// after each decoded response, a reported `current_fuel_block_height` that
    /// is not lower than the stored value replaces it.
    Auto {
        /// Currently required height, if any.
        height: Arc<Mutex<Option<BlockHeight>>>,
    },
    /// The height is a plain value; the client never updates it from responses,
    /// it only changes when the caller sets it.
    Manual {
        /// Currently required height, if any.
        height: Option<BlockHeight>,
    },
}
211
212impl Clone for ConsistencyPolicy {
213 fn clone(&self) -> Self {
214 match self {
215 Self::Auto { height } => Self::Auto {
216 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
219 },
220 Self::Manual { height } => Self::Manual { height: *height },
221 }
222 }
223}
224
/// Most recent chain versions observed in response extensions.
///
/// Both fields start as `None` (via `Default`) and are overwritten by
/// `FuelClient::update_chain_state_info` whenever a response carries the
/// corresponding extension value.
#[derive(Debug, Default)]
struct ChainStateInfo {
    /// Last reported state transition bytecode version, if any.
    current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
    /// Last reported consensus parameters version, if any.
    current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
}
230
231impl Clone for ChainStateInfo {
232 fn clone(&self) -> Self {
233 Self {
234 current_stf_version: Arc::new(Mutex::new(
235 self.current_stf_version.lock().ok().and_then(|v| *v),
236 )),
237 current_consensus_parameters_version: Arc::new(Mutex::new(
238 self.current_consensus_parameters_version
239 .lock()
240 .ok()
241 .and_then(|v| *v),
242 )),
243 }
244 }
245}
246
/// GraphQL client for a fuel-core node.
///
/// The derived `Clone` clones every field. Because the `Clone` impls of
/// [`ConsistencyPolicy`] and `ChainStateInfo` copy their values into new
/// mutexes, a cloned client does not share tracked heights or versions with
/// the client it was cloned from.
#[derive(Debug, Clone)]
pub struct FuelClient {
    /// HTTP client used to send queries.
    client: reqwest::Client,
    /// Cookie jar registered as the cookie provider of `client`; `subscribe`
    /// copies its cookies onto subscription requests.
    #[cfg(feature = "subscriptions")]
    cookie: std::sync::Arc<reqwest::cookie::Jar>,
    /// Endpoint URL; `from_str` sets its path to `/v1/graphql`.
    url: reqwest::Url,
    /// How the required block height sent with each operation is chosen.
    require_height: ConsistencyPolicy,
    /// Latest chain versions reported in response extensions.
    chain_state_info: ChainStateInfo,
}
256
257impl FromStr for FuelClient {
258 type Err = anyhow::Error;
259
260 fn from_str(str: &str) -> Result<Self, Self::Err> {
261 let mut raw_url = str.to_string();
262 if !raw_url.starts_with("http") {
263 raw_url = format!("http://{raw_url}");
264 }
265
266 let mut url = reqwest::Url::parse(&raw_url)
267 .map_err(anyhow::Error::msg)
268 .with_context(|| format!("Invalid fuel-core URL: {str}"))?;
269 url.set_path("/v1/graphql");
270
271 #[cfg(feature = "subscriptions")]
272 {
273 let cookie = std::sync::Arc::new(reqwest::cookie::Jar::default());
274 let client = reqwest::Client::builder()
275 .cookie_provider(cookie.clone())
276 .build()
277 .map_err(anyhow::Error::msg)?;
278 Ok(Self {
279 client,
280 cookie,
281 url,
282 require_height: ConsistencyPolicy::Auto {
283 height: Arc::new(Mutex::new(None)),
284 },
285 chain_state_info: Default::default(),
286 })
287 }
288
289 #[cfg(not(feature = "subscriptions"))]
290 {
291 let client = reqwest::Client::new();
292 Ok(Self {
293 client,
294 url,
295 require_height: ConsistencyPolicy::Auto {
296 height: Arc::new(Mutex::new(None)),
297 },
298 chain_state_info: Default::default(),
299 })
300 }
301 }
302}
303
304impl<S> From<S> for FuelClient
305where
306 S: Into<net::SocketAddr>,
307{
308 fn from(socket: S) -> Self {
309 format!("http://{}", socket.into())
310 .as_str()
311 .parse()
312 .unwrap()
313 }
314}
315
/// Collapses a list of error messages into a single [`io::Error`].
///
/// The resulting message is `Response errors` followed by `; <message>` for
/// every entry, in order. The error kind is always [`io::ErrorKind::Other`].
pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
    let mut message = String::from("Response errors");
    for error in &errors {
        message.push_str("; ");
        message.push_str(error);
    }
    io::Error::new(io::ErrorKind::Other, message)
}
326
327impl FuelClient {
328 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
329 Self::from_str(url.as_ref())
330 }
331
332 pub fn with_required_fuel_block_height(
333 &mut self,
334 new_height: Option<BlockHeight>,
335 ) -> &mut Self {
336 match &mut self.require_height {
337 ConsistencyPolicy::Auto { height } => {
338 *height.lock().expect("Mutex poisoned") = new_height;
339 }
340 ConsistencyPolicy::Manual { height } => {
341 *height = new_height;
342 }
343 }
344 self
345 }
346
347 pub fn use_manual_consistency_policy(
348 &mut self,
349 height: Option<BlockHeight>,
350 ) -> &mut Self {
351 self.require_height = ConsistencyPolicy::Manual { height };
352 self
353 }
354
355 pub fn required_block_height(&self) -> Option<BlockHeight> {
356 match &self.require_height {
357 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
358 ConsistencyPolicy::Manual { height } => *height,
359 }
360 }
361
362 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
363 if let Some(current_sft_version) = response
364 .extensions
365 .as_ref()
366 .and_then(|e| e.current_stf_version)
367 {
368 if let Ok(mut c) = self.chain_state_info.current_stf_version.lock() {
369 *c = Some(current_sft_version);
370 }
371 }
372
373 if let Some(current_consensus_parameters_version) = response
374 .extensions
375 .as_ref()
376 .and_then(|e| e.current_consensus_parameters_version)
377 {
378 if let Ok(mut c) = self
379 .chain_state_info
380 .current_consensus_parameters_version
381 .lock()
382 {
383 *c = Some(current_consensus_parameters_version);
384 }
385 }
386
387 let inner_required_height = match &self.require_height {
388 ConsistencyPolicy::Auto { height } => Some(height.clone()),
389 ConsistencyPolicy::Manual { .. } => None,
390 };
391
392 if let Some(inner_required_height) = inner_required_height {
393 if let Some(current_fuel_block_height) = response
394 .extensions
395 .as_ref()
396 .and_then(|e| e.current_fuel_block_height)
397 {
398 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
399
400 if current_fuel_block_height >= lock.unwrap_or_default() {
401 *lock = Some(current_fuel_block_height);
402 }
403 }
404 }
405 }
406
407 pub async fn query<ResponseData, Vars>(
409 &self,
410 q: Operation<ResponseData, Vars>,
411 ) -> io::Result<ResponseData>
412 where
413 Vars: serde::Serialize,
414 ResponseData: serde::de::DeserializeOwned + 'static,
415 {
416 let required_fuel_block_height = self.required_block_height();
417 let fuel_operation = FuelOperation::new(q, required_fuel_block_height);
418 let response = self
419 .client
420 .post(self.url.clone())
421 .run_fuel_graphql(fuel_operation)
422 .await
423 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
424
425 self.decode_response(response)
426 }
427
428 fn decode_response<R, E>(&self, response: FuelGraphQlResponse<R, E>) -> io::Result<R>
429 where
430 R: serde::de::DeserializeOwned + 'static,
431 {
432 self.update_chain_state_info(&response);
433
434 if let Some(failed) = response
435 .extensions
436 .as_ref()
437 .and_then(|e| e.fuel_block_height_precondition_failed)
438 {
439 if failed {
440 return Err(io::Error::new(
441 io::ErrorKind::Other,
442 "The required block height was not met",
443 ));
444 }
445 }
446
447 let response = response.response;
448
449 match (response.data, response.errors) {
450 (Some(d), _) => Ok(d),
451 (_, Some(e)) => Err(from_strings_errors_to_std_error(
452 e.into_iter().map(|e| e.message).collect(),
453 )),
454 _ => Err(io::Error::new(io::ErrorKind::Other, "Invalid response")),
455 }
456 }
457
    /// Opens a server-sent-events subscription for the streaming operation `q`
    /// and returns a stream of decoded responses.
    ///
    /// The request is POSTed as JSON to the `/v1/graphql-sub` path of the
    /// client URL. The function waits for the first stream item before
    /// returning, so a connection-time failure surfaces as an `Err` here
    /// rather than as the first stream element.
    ///
    /// # Errors
    ///
    /// Fails if the operation cannot be serialized, the event-source client or
    /// one of its headers cannot be set up, the first stream item is an error,
    /// or the stream ends before yielding anything.
    #[tracing::instrument(skip_all)]
    #[cfg(feature = "subscriptions")]
    async fn subscribe<ResponseData, Vars>(
        &self,
        q: StreamingOperation<ResponseData, Vars>,
    ) -> io::Result<impl futures::Stream<Item = io::Result<ResponseData>> + '_>
    where
        Vars: serde::Serialize,
        ResponseData: serde::de::DeserializeOwned + 'static + Send,
    {
        use core::ops::Deref;
        use eventsource_client as es;
        use hyper_rustls as _;
        use reqwest::cookie::CookieStore;
        // Subscriptions use a dedicated path on the same host as queries.
        let mut url = self.url.clone();
        url.set_path("/v1/graphql-sub");

        let required_fuel_block_height = self.required_block_height();
        let fuel_operation = FuelOperation::new(q, required_fuel_block_height);

        let json_query = serde_json::to_string(&fuel_operation)?;
        let mut client_builder = es::ClientBuilder::for_url(url.as_str())
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to start client {e:?}"),
                )
            })?
            .body(json_query)
            .method("POST".to_string())
            .header("content-type", "application/json")
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Failed to add header to client {e:?}"),
                )
            })?;
        // Credentials embedded in the URL are turned into an explicit HTTP
        // Basic `Authorization` header; this only happens when a password is
        // present.
        if let Some(password) = url.password() {
            let username = url.username();
            let credentials = format!("{}:{}", username, password);
            let authorization = format!("Basic {}", BASE64_STANDARD.encode(credentials));
            client_builder = client_builder
                .header("Authorization", &authorization)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header to client {e:?}"),
                    )
                })?;
        }

        // Forward the cookies stored in the jar shared with the `reqwest`
        // client. They are looked up for the query URL (`self.url`), not the
        // subscription URL.
        if let Some(value) = self.cookie.deref().cookies(&self.url) {
            let value = value.to_str().map_err(|e| {
                io::Error::new(
                    io::ErrorKind::Other,
                    format!("Unable convert header value to string {e:?}"),
                )
            })?;
            client_builder = client_builder
                .header(reqwest::header::COOKIE.as_str(), value)
                .map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed to add header from `reqwest` to client {e:?}"),
                    )
                })?;
        }

        let client = client_builder.build_with_conn(
            hyper_rustls::HttpsConnectorBuilder::new()
                .with_webpki_roots()
                .https_or_http()
                .enable_http1()
                .build(),
        );

        // Raw payload of the most recent successfully decoded event; used
        // below to drop an event whose payload equals the previous one.
        let mut last = None;

        // Internal stream item: either the connection notification or a
        // decoded response.
        enum Event<ResponseData> {
            Connected,
            ResponseData(ResponseData),
        }

        let mut init_stream = es::Client::stream(&client)
            // An `Eof` error terminates the stream instead of being reported.
            .take_while(|result| {
                futures::future::ready(!matches!(result, Err(es::Error::Eof)))
            })
            .filter_map(move |result| {
                tracing::debug!("Got result: {result:?}");
                let r = match result {
                    Ok(es::SSE::Event(es::Event { data, .. })) => {
                        match serde_json::from_str::<FuelGraphQlResponse<ResponseData>>(
                            &data,
                        ) {
                            Ok(resp) => {
                                match self.decode_response(resp) {
                                    Ok(resp) => {
                                        // `replace` stores the new payload and
                                        // hands back the previous one, so the
                                        // guard compares previous vs. current.
                                        match last.replace(data) {
                                            Some(l)
                                                if l == *last.as_ref().expect(
                                                    "Safe because of the replace above",
                                                ) =>
                                            {
                                                None
                                            }
                                            _ => Some(Ok(Event::ResponseData(resp))),
                                        }
                                    }
                                    Err(e) => Some(Err(io::Error::new(
                                        io::ErrorKind::Other,
                                        format!("Decode error: {e:?}"),
                                    ))),
                                }
                            }
                            Err(e) => Some(Err(io::Error::new(
                                io::ErrorKind::Other,
                                format!("Json error: {e:?}"),
                            ))),
                        }
                    }
                    Ok(es::SSE::Connected(_)) => Some(Ok(Event::Connected)),
                    // Any other SSE item is ignored.
                    Ok(_) => None,
                    Err(e) => Some(Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!("Graphql error: {e:?}"),
                    ))),
                };
                futures::future::ready(r)
            });

        // Pull the first item eagerly so that early failures become the
        // function's own error.
        let event = init_stream.next().await;
        // Remainder of the stream with connection notifications removed.
        let stream_with_resp = init_stream.filter_map(|result| async move {
            match result {
                Ok(Event::Connected) => None,
                Ok(Event::ResponseData(resp)) => Some(Ok(resp)),
                Err(error) => Some(Err(error)),
            }
        });

        let stream = match event {
            Some(Ok(Event::Connected)) => {
                tracing::debug!("Subscription connected");
                stream_with_resp.boxed()
            }
            Some(Ok(Event::ResponseData(resp))) => {
                tracing::debug!("Subscription returned response");
                // The first item was already consumed; put it back in front
                // of the remaining stream.
                let joined_stream = futures::stream::once(async move { Ok(resp) })
                    .chain(stream_with_resp);
                joined_stream.boxed()
            }
            Some(Err(e)) => return Err(e),
            None => {
                return Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Subscription stream ended unexpectedly",
                ));
            }
        };

        Ok(stream)
    }
620
621 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
622 self.chain_state_info
623 .current_stf_version
624 .lock()
625 .ok()
626 .and_then(|value| *value)
627 }
628
629 pub fn latest_consensus_parameters_version(
630 &self,
631 ) -> Option<ConsensusParametersVersion> {
632 self.chain_state_info
633 .current_consensus_parameters_version
634 .lock()
635 .ok()
636 .and_then(|value| *value)
637 }
638
639 pub async fn health(&self) -> io::Result<bool> {
640 let query = schema::Health::build(());
641 self.query(query).await.map(|r| r.health)
642 }
643
644 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
645 let query = schema::node_info::QueryNodeInfo::build(());
646 self.query(query).await.map(|r| r.node_info.into())
647 }
648
649 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
650 let query = schema::gas_price::QueryLatestGasPrice::build(());
651 self.query(query).await.map(|r| r.latest_gas_price.into())
652 }
653
654 pub async fn estimate_gas_price(
655 &self,
656 block_horizon: u32,
657 ) -> io::Result<EstimateGasPrice> {
658 let args = BlockHorizonArgs {
659 block_horizon: Some(block_horizon.into()),
660 };
661 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
662 self.query(query).await.map(|r| r.estimate_gas_price)
663 }
664
665 #[cfg(feature = "std")]
666 pub async fn connected_peers_info(
667 &self,
668 ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
669 let query = schema::node_info::QueryPeersInfo::build(());
670 self.query(query)
671 .await
672 .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
673 }
674
675 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
676 let query = schema::chain::ChainQuery::build(());
677 self.query(query).await.and_then(|r| {
678 let result = r.chain.try_into()?;
679 Ok(result)
680 })
681 }
682
683 pub async fn consensus_parameters(
684 &self,
685 version: i32,
686 ) -> io::Result<Option<ConsensusParameters>> {
687 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
688 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
689
690 let result = self
691 .query(query)
692 .await?
693 .consensus_parameters
694 .map(TryInto::try_into)
695 .transpose()?;
696
697 Ok(result)
698 }
699
700 pub async fn state_transition_byte_code_by_version(
701 &self,
702 version: i32,
703 ) -> io::Result<Option<StateTransitionBytecode>> {
704 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
705 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
706
707 let result = self
708 .query(query)
709 .await?
710 .state_transition_bytecode_by_version
711 .map(TryInto::try_into)
712 .transpose()?;
713
714 Ok(result)
715 }
716
717 pub async fn state_transition_byte_code_by_root(
718 &self,
719 root: Bytes32,
720 ) -> io::Result<Option<StateTransitionBytecode>> {
721 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
722 root: HexString(Bytes(root.to_vec())),
723 };
724 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
725
726 let result = self
727 .query(query)
728 .await?
729 .state_transition_bytecode_by_root
730 .map(TryInto::try_into)
731 .transpose()?;
732
733 Ok(result)
734 }
735
736 pub async fn dry_run(
738 &self,
739 txs: &[Transaction],
740 ) -> io::Result<Vec<TransactionExecutionStatus>> {
741 self.dry_run_opt(txs, None, None, None).await
742 }
743
744 pub async fn dry_run_opt(
746 &self,
747 txs: &[Transaction],
748 utxo_validation: Option<bool>,
750 gas_price: Option<u64>,
751 at_height: Option<BlockHeight>,
752 ) -> io::Result<Vec<TransactionExecutionStatus>> {
753 let txs = txs
754 .iter()
755 .map(|tx| HexString(Bytes(tx.to_bytes())))
756 .collect::<Vec<HexString>>();
757 let query: Operation<schema::tx::DryRun, DryRunArg> =
758 schema::tx::DryRun::build(DryRunArg {
759 txs,
760 utxo_validation,
761 gas_price: gas_price.map(|gp| gp.into()),
762 block_height: at_height.map(|bh| bh.into()),
763 });
764 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
765 tx_statuses
766 .into_iter()
767 .map(|tx_status| tx_status.try_into().map_err(Into::into))
768 .collect()
769 }
770
771 pub async fn dry_run_opt_record_storage_reads(
773 &self,
774 txs: &[Transaction],
775 utxo_validation: Option<bool>,
777 gas_price: Option<u64>,
778 at_height: Option<BlockHeight>,
779 ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
780 let txs = txs
781 .iter()
782 .map(|tx| HexString(Bytes(tx.to_bytes())))
783 .collect::<Vec<HexString>>();
784 let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
785 schema::tx::DryRunRecordStorageReads::build(DryRunArg {
786 txs,
787 utxo_validation,
788 gas_price: gas_price.map(|gp| gp.into()),
789 block_height: at_height.map(|bh| bh.into()),
790 });
791 let result = self
792 .query(query)
793 .await
794 .map(|r| r.dry_run_record_storage_reads)?;
795 let tx_statuses = result
796 .tx_statuses
797 .into_iter()
798 .map(|tx_status| tx_status.try_into().map_err(Into::into))
799 .collect::<io::Result<Vec<_>>>()?;
800 let storage_reads = result
801 .storage_reads
802 .into_iter()
803 .map(Into::into)
804 .collect::<Vec<_>>();
805 Ok((tx_statuses, storage_reads))
806 }
807
808 pub async fn storage_read_replay(
810 &self,
811 height: &BlockHeight,
812 ) -> io::Result<Vec<StorageReadReplayEvent>> {
813 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
814 StorageReadReplay::build(StorageReadReplayArgs {
815 height: (*height).into(),
816 });
817 Ok(self
818 .query(query)
819 .await
820 .map(|r| r.storage_read_replay)?
821 .into_iter()
822 .map(Into::into)
823 .collect())
824 }
825
826 #[allow(clippy::too_many_arguments)]
849 pub async fn assemble_tx(
850 &self,
851 tx: &Transaction,
852 block_horizon: u32,
853 required_balances: Vec<RequiredBalance>,
854 fee_address_index: u16,
855 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
856 estimate_predicates: bool,
857 reserve_gas: Option<u64>,
858 ) -> io::Result<AssembleTransactionResult> {
859 let tx = HexString(Bytes(tx.to_bytes()));
860 let block_horizon = block_horizon.into();
861
862 let required_balances: Vec<_> = required_balances
863 .into_iter()
864 .map(schema::tx::RequiredBalance::try_from)
865 .collect::<Result<Vec<_>, _>>()?;
866
867 let fee_address_index = fee_address_index.into();
868
869 let exclude_input = exclude.map(Into::into);
870
871 let reserve_gas = reserve_gas.map(U64::from);
872
873 let query_arg = AssembleTxArg {
874 tx,
875 block_horizon,
876 required_balances,
877 fee_address_index,
878 exclude_input,
879 estimate_predicates,
880 reserve_gas,
881 };
882
883 let query = schema::tx::AssembleTx::build(query_arg);
884 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
885 Ok(assemble_tx_result.try_into()?)
886 }
887
888 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
890 let serialized_tx = tx.to_bytes();
891 let query = schema::tx::EstimatePredicates::build(TxArg {
892 tx: HexString(Bytes(serialized_tx)),
893 });
894 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
895 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
896 *tx = tx_with_predicate;
897 Ok(())
898 }
899
900 pub async fn submit(
901 &self,
902 tx: &Transaction,
903 ) -> io::Result<types::primitives::TransactionId> {
904 self.submit_opt(tx, None).await
905 }
906
907 pub async fn submit_opt(
908 &self,
909 tx: &Transaction,
910 estimate_predicates: Option<bool>,
911 ) -> io::Result<types::primitives::TransactionId> {
912 let tx = tx.clone().to_bytes();
913 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
914 tx: HexString(Bytes(tx)),
915 estimate_predicates,
916 });
917
918 let id = self.query(query).await.map(|r| r.submit)?.id.into();
919 Ok(id)
920 }
921
922 #[cfg(feature = "subscriptions")]
924 pub async fn submit_and_await_commit(
925 &self,
926 tx: &Transaction,
927 ) -> io::Result<TransactionStatus> {
928 self.submit_and_await_commit_opt(tx, None).await
929 }
930
931 #[cfg(feature = "subscriptions")]
940 pub async fn submit_and_await_commit_opt(
941 &self,
942 tx: &Transaction,
943 estimate_predicates: Option<bool>,
944 ) -> io::Result<TransactionStatus> {
945 use cynic::SubscriptionBuilder;
946 let tx = tx.clone().to_bytes();
947 let s =
948 schema::tx::SubmitAndAwaitSubscription::build(TxWithEstimatedPredicatesArg {
949 tx: HexString(Bytes(tx)),
950 estimate_predicates,
951 });
952
953 let mut stream = self.subscribe(s).await?.map(
954 |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
955 let status: TransactionStatus = r?.submit_and_await.try_into()?;
956 Result::<_, io::Error>::Ok(status)
957 },
958 );
959
960 let status = stream.next().await.ok_or(io::Error::new(
961 io::ErrorKind::Other,
962 "Failed to get status from the submission",
963 ))??;
964
965 Ok(status)
966 }
967
968 #[cfg(feature = "subscriptions")]
970 pub async fn submit_and_await_commit_with_tx(
971 &self,
972 tx: &Transaction,
973 ) -> io::Result<StatusWithTransaction> {
974 self.submit_and_await_commit_with_tx_opt(tx, None).await
975 }
976
977 #[cfg(feature = "subscriptions")]
979 pub async fn submit_and_await_commit_with_tx_opt(
980 &self,
981 tx: &Transaction,
982 estimate_predicates: Option<bool>,
983 ) -> io::Result<StatusWithTransaction> {
984 use cynic::SubscriptionBuilder;
985 let tx = tx.clone().to_bytes();
986 let s = schema::tx::SubmitAndAwaitSubscriptionWithTransaction::build(
987 TxWithEstimatedPredicatesArg {
988 tx: HexString(Bytes(tx)),
989 estimate_predicates,
990 },
991 );
992
993 let mut stream = self.subscribe(s).await?.map(
994 |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
995 let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
996 Result::<_, io::Error>::Ok(status)
997 },
998 );
999
1000 let status = stream.next().await.ok_or(io::Error::new(
1001 io::ErrorKind::Other,
1002 "Failed to get status from the submission",
1003 ))??;
1004
1005 Ok(status)
1006 }
1007
1008 #[cfg(feature = "subscriptions")]
1010 pub async fn submit_and_await_status(
1011 &self,
1012 tx: &Transaction,
1013 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1014 self.submit_and_await_status_opt(tx, None, None).await
1015 }
1016
1017 #[cfg(feature = "subscriptions")]
1019 pub async fn submit_and_await_status_opt(
1020 &self,
1021 tx: &Transaction,
1022 estimate_predicates: Option<bool>,
1023 include_preconfirmation: Option<bool>,
1024 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1025 use cynic::SubscriptionBuilder;
1026 use schema::tx::SubmitAndAwaitStatusArg;
1027 let tx = tx.clone().to_bytes();
1028 let s = schema::tx::SubmitAndAwaitStatusSubscription::build(
1029 SubmitAndAwaitStatusArg {
1030 tx: HexString(Bytes(tx)),
1031 estimate_predicates,
1032 include_preconfirmation,
1033 },
1034 );
1035
1036 let stream = self.subscribe(s).await?.map(
1037 |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1038 let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1039 Result::<_, io::Error>::Ok(status)
1040 },
1041 );
1042
1043 Ok(stream)
1044 }
1045
1046 #[cfg(feature = "subscriptions")]
1048 pub async fn contract_storage_slots(
1049 &self,
1050 contract_id: &ContractId,
1051 ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1052 use cynic::SubscriptionBuilder;
1053 use schema::storage::ContractStorageSlotsArgs;
1054 let s = schema::storage::ContractStorageSlots::build(ContractStorageSlotsArgs {
1055 contract_id: (*contract_id).into(),
1056 });
1057
1058 let stream = self.subscribe(s).await?.map(
1059 |result: io::Result<schema::storage::ContractStorageSlots>| {
1060 let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1061 Result::<_, io::Error>::Ok(result)
1062 },
1063 );
1064
1065 Ok(stream)
1066 }
1067
1068 #[cfg(feature = "subscriptions")]
1070 pub async fn contract_storage_balances(
1071 &self,
1072 contract_id: &ContractId,
1073 ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1074 {
1075 use cynic::SubscriptionBuilder;
1076 use schema::{
1077 contract::ContractBalance,
1078 storage::ContractStorageBalancesArgs,
1079 };
1080 let s = schema::storage::ContractStorageBalances::build(
1081 ContractStorageBalancesArgs {
1082 contract_id: (*contract_id).into(),
1083 },
1084 );
1085
1086 let stream = self.subscribe(s).await?.map(
1087 |result: io::Result<schema::storage::ContractStorageBalances>| {
1088 let result: ContractBalance = result?.contract_storage_balances;
1089 Result::<_, io::Error>::Ok(result)
1090 },
1091 );
1092
1093 Ok(stream)
1094 }
1095
1096 #[cfg(feature = "subscriptions")]
1098 pub async fn new_blocks_subscription(
1099 &self,
1100 ) -> io::Result<
1101 impl Stream<
1102 Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1103 > + '_,
1104 > {
1105 use cynic::SubscriptionBuilder;
1106 let s = schema::block::NewBlocksSubscription::build(());
1107
1108 let stream = self.subscribe(s).await?.map(
1109 |r: io::Result<schema::block::NewBlocksSubscription>| {
1110 let result: fuel_core_types::services::block_importer::ImportResult =
1111 postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1112 io::Error::new(
1113 io::ErrorKind::Other,
1114 format!("Failed to deserialize ImportResult: {e:?}"),
1115 )
1116 })?;
1117 Result::<_, io::Error>::Ok(result)
1118 },
1119 );
1120
1121 Ok(stream)
1122 }
1123
1124 #[cfg(feature = "subscriptions")]
1126 pub async fn preconfirmations_subscription(
1127 &self,
1128 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1129 use cynic::SubscriptionBuilder;
1130 let s = schema::tx::PreconfirmationsSubscription::build(());
1131
1132 let stream = self.subscribe(s).await?.map(
1133 |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1134 let status: TransactionStatus = r?.preconfirmations.try_into()?;
1135 Result::<_, io::Error>::Ok(status)
1136 },
1137 );
1138
1139 Ok(stream)
1140 }
1141
1142 pub async fn contract_slots_values(
1143 &self,
1144 contract_id: &ContractId,
1145 block_height: Option<BlockHeight>,
1146 requested_storage_slots: Vec<Bytes32>,
1147 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1148 let query = schema::storage::ContractSlotValues::build(
1149 schema::storage::ContractSlotValuesArgs {
1150 contract_id: (*contract_id).into(),
1151 block_height: block_height.map(|b| (*b).into()),
1152 storage_slots: requested_storage_slots
1153 .into_iter()
1154 .map(Into::into)
1155 .collect(),
1156 },
1157 );
1158
1159 self.query(query)
1160 .await
1161 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1162 }
1163
1164 pub async fn contract_balance_values(
1165 &self,
1166 contract_id: &ContractId,
1167 block_height: Option<BlockHeight>,
1168 requested_storage_slots: Vec<AssetId>,
1169 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1170 let query = schema::storage::ContractBalanceValues::build(
1171 schema::storage::ContractBalanceValuesArgs {
1172 contract_id: (*contract_id).into(),
1173 block_height: block_height.map(|b| (*b).into()),
1174 assets: requested_storage_slots
1175 .into_iter()
1176 .map(Into::into)
1177 .collect(),
1178 },
1179 );
1180
1181 self.query(query)
1182 .await
1183 .map(|r| r.contract_balance_values.into_iter().collect())
1184 }
1185
1186 pub async fn start_session(&self) -> io::Result<String> {
1187 let query = schema::StartSession::build(());
1188
1189 self.query(query)
1190 .await
1191 .map(|r| r.start_session.into_inner())
1192 }
1193
1194 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1195 let query = schema::EndSession::build(IdArg { id: id.into() });
1196
1197 self.query(query).await.map(|r| r.end_session)
1198 }
1199
1200 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1201 let query = schema::Reset::build(IdArg { id: id.into() });
1202
1203 self.query(query).await.map(|r| r.reset)
1204 }
1205
1206 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1207 let op = serde_json::to_string(op)?;
1208 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1209
1210 self.query(query).await.map(|r| r.execute)
1211 }
1212
1213 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1214 let query = schema::Register::build(RegisterArgs {
1215 id: id.into(),
1216 register: register.into(),
1217 });
1218
1219 Ok(self.query(query).await?.register.0 as Word)
1220 }
1221
1222 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1223 let query = schema::Memory::build(MemoryArgs {
1224 id: id.into(),
1225 start: start.into(),
1226 size: size.into(),
1227 });
1228
1229 let memory = self.query(query).await?.memory;
1230
1231 Ok(serde_json::from_str(memory.as_str())?)
1232 }
1233
1234 pub async fn set_breakpoint(
1235 &self,
1236 session_id: &str,
1237 contract: fuel_types::ContractId,
1238 pc: u64,
1239 ) -> io::Result<()> {
1240 let operation = SetBreakpoint::build(SetBreakpointArgs {
1241 id: Id::new(session_id),
1242 bp: schema::Breakpoint {
1243 contract: contract.into(),
1244 pc: U64(pc),
1245 },
1246 });
1247
1248 let response = self.query(operation).await?;
1249 assert!(
1250 response.set_breakpoint,
1251 "Setting breakpoint returned invalid reply"
1252 );
1253 Ok(())
1254 }
1255
1256 pub async fn set_single_stepping(
1257 &self,
1258 session_id: &str,
1259 enable: bool,
1260 ) -> io::Result<()> {
1261 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1262 id: Id::new(session_id),
1263 enable,
1264 });
1265 self.query(operation).await?;
1266 Ok(())
1267 }
1268
1269 pub async fn start_tx(
1270 &self,
1271 session_id: &str,
1272 tx: &Transaction,
1273 ) -> io::Result<RunResult> {
1274 let operation = StartTx::build(StartTxArgs {
1275 id: Id::new(session_id),
1276 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1277 });
1278 let response = self.query(operation).await?.start_tx;
1279 Ok(response)
1280 }
1281
1282 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1283 let operation = ContinueTx::build(ContinueTxArgs {
1284 id: Id::new(session_id),
1285 });
1286 let response = self.query(operation).await?.continue_tx;
1287 Ok(response)
1288 }
1289
1290 pub async fn transaction(
1291 &self,
1292 id: &TxId,
1293 ) -> io::Result<Option<TransactionResponse>> {
1294 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1295
1296 let transaction = self.query(query).await?.transaction;
1297
1298 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1299 }
1300
1301 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1303 let query =
1304 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1305
1306 let status = self.query(query).await?.transaction.ok_or_else(|| {
1307 io::Error::new(
1308 ErrorKind::NotFound,
1309 format!("status not found for transaction {id} "),
1310 )
1311 })?;
1312
1313 let status = status
1314 .status
1315 .ok_or_else(|| {
1316 io::Error::new(
1317 ErrorKind::NotFound,
1318 format!("status not found for transaction {id}"),
1319 )
1320 })?
1321 .try_into()?;
1322 Ok(status)
1323 }
1324
/// Subscribes to status changes of transaction `id`.
///
/// Equivalent to calling `subscribe_transaction_status_opt` with
/// `include_preconfirmation` set to `None`.
///
/// # Errors
/// Propagates any error returned while establishing the subscription.
#[tracing::instrument(skip(self), level = "debug")]
#[cfg(feature = "subscriptions")]
pub async fn subscribe_transaction_status(
    &self,
    id: &TxId,
) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
    let stream = self.subscribe_transaction_status_opt(id, None).await?;

    Ok(stream)
}
1334
/// Subscribes to status changes of transaction `id` through the
/// `StatusChangeSubscription`, forwarding `include_preconfirmation`
/// unchanged as a subscription argument.
///
/// Each item of the returned stream is the `status_change` of one update
/// converted into a [`TransactionStatus`], or the error produced while
/// receiving or converting that update.
///
/// # Errors
/// Propagates any error returned while establishing the subscription.
#[cfg(feature = "subscriptions")]
pub async fn subscribe_transaction_status_opt(
    &self,
    id: &TxId,
    include_preconfirmation: Option<bool>,
) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
    use cynic::SubscriptionBuilder;
    use schema::tx::StatusChangeSubscriptionArgs;

    let args = StatusChangeSubscriptionArgs {
        id: TransactionId::from(*id),
        include_preconfirmation,
    };
    let subscription = schema::tx::StatusChangeSubscription::build(args);

    tracing::debug!("subscribing");
    let updates = self.subscribe(subscription).await?;

    Ok(updates.map(|tx| {
        tracing::debug!("received {tx:?}");
        let change = tx?.status_change;
        Ok(change.try_into()?)
    }))
}
1361
/// Waits until the status subscription for `id` yields a status for which
/// `is_final()` is true and returns that status.
///
/// Every stream item before it is skipped, including items that are
/// errors. Nothing in this method bounds the wait.
///
/// # Errors
/// Fails if the subscription cannot be established, or with an
/// [`io::ErrorKind::Other`] error when the stream ends before a final
/// status is seen.
#[cfg(feature = "subscriptions")]
pub async fn await_transaction_commit(
    &self,
    id: &TxId,
) -> io::Result<TransactionStatus> {
    let updates = self.subscribe_transaction_status(id).await?;

    // Keep skipping until the first `Ok` item whose status is final.
    let status_result = updates
        .skip_while(|update| {
            future::ready(!matches!(update, Ok(status) if status.is_final()))
        })
        .next()
        .await;

    match status_result {
        Some(Ok(status)) => Ok(status),
        status_result => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("Failed to get status for transaction {status_result:?}"),
        )),
    }
}
1391
1392 pub async fn transactions(
1394 &self,
1395 request: PaginationRequest<String>,
1396 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1397 let args = schema::ConnectionArgs::from(request);
1398 let query = schema::tx::TransactionsQuery::build(args);
1399 let transactions = self.query(query).await?.transactions.try_into()?;
1400 Ok(transactions)
1401 }
1402
1403 pub async fn transactions_by_owner(
1405 &self,
1406 owner: &Address,
1407 request: PaginationRequest<String>,
1408 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1409 let owner: schema::Address = (*owner).into();
1410 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1411 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1412
1413 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1414 Ok(transactions)
1415 }
1416
1417 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1418 let query =
1419 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1420
1421 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1422 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1423 })?;
1424
1425 let receipts = match tx.status {
1426 Some(status) => match status {
1427 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1428 s.receipts
1429 .into_iter()
1430 .map(TryInto::<Receipt>::try_into)
1431 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1432 )
1433 .transpose()?,
1434 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1435 s.receipts
1436 .into_iter()
1437 .map(TryInto::<Receipt>::try_into)
1438 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1439 )
1440 .transpose()?,
1441 _ => None,
1442 },
1443 _ => None,
1444 };
1445
1446 Ok(receipts)
1447 }
1448
/// Runs the `AllReceipts` query and converts every entry of the
/// `all_receipts` field into a [`Receipt`].
///
/// # Errors
/// Fails if the underlying query fails; the first receipt that cannot be
/// converted aborts the whole call.
#[cfg(feature = "test-helpers")]
pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
    let query = schema::tx::AllReceipts::build(());
    let raw_receipts = self.query(query).await?.all_receipts;

    let mut receipts = Vec::new();
    for raw in raw_receipts {
        let receipt: Receipt = raw.try_into()?;
        receipts.push(receipt);
    }

    Ok(receipts)
}
1461
1462 pub async fn produce_blocks(
1463 &self,
1464 blocks_to_produce: u32,
1465 start_timestamp: Option<u64>,
1466 ) -> io::Result<BlockHeight> {
1467 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1468 blocks_to_produce: blocks_to_produce.into(),
1469 start_timestamp: start_timestamp
1470 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1471 });
1472
1473 let new_height = self.query(query).await?.produce_blocks;
1474
1475 Ok(new_height.into())
1476 }
1477
1478 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1479 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1480 id: Some((*id).into()),
1481 });
1482
1483 let block = self
1484 .query(query)
1485 .await?
1486 .block
1487 .map(TryInto::try_into)
1488 .transpose()?;
1489
1490 Ok(block)
1491 }
1492
1493 pub async fn block_by_height(
1494 &self,
1495 height: BlockHeight,
1496 ) -> io::Result<Option<types::Block>> {
1497 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1498 height: Some(U32(height.into())),
1499 });
1500
1501 let block = self
1502 .query(query)
1503 .await?
1504 .block
1505 .map(TryInto::try_into)
1506 .transpose()?;
1507
1508 Ok(block)
1509 }
1510
1511 pub async fn da_compressed_block(
1512 &self,
1513 height: BlockHeight,
1514 ) -> io::Result<Option<Vec<u8>>> {
1515 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1516 DaCompressedBlockByHeightArgs {
1517 height: U32(height.into()),
1518 },
1519 );
1520
1521 Ok(self
1522 .query(query)
1523 .await?
1524 .da_compressed_block
1525 .map(|b| b.bytes.into()))
1526 }
1527
1528 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1530 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1531 let blob = self.query(query).await?.blob.map(Into::into);
1532 Ok(blob)
1533 }
1534
1535 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1537 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1538 Ok(self.query(query).await?.blob.is_some())
1539 }
1540
1541 pub async fn blocks(
1543 &self,
1544 request: PaginationRequest<String>,
1545 ) -> io::Result<PaginatedResult<types::Block, String>> {
1546 let args = schema::ConnectionArgs::from(request);
1547 let query = schema::block::BlocksQuery::build(args);
1548
1549 let blocks = self.query(query).await?.blocks.try_into()?;
1550
1551 Ok(blocks)
1552 }
1553
1554 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1555 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1556 utxo_id: (*id).into(),
1557 });
1558 let coin = self.query(query).await?.coin.map(Into::into);
1559 Ok(coin)
1560 }
1561
1562 pub async fn coins(
1564 &self,
1565 owner: &Address,
1566 asset_id: Option<&AssetId>,
1567 request: PaginationRequest<String>,
1568 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1569 let owner: schema::Address = (*owner).into();
1570 let asset_id = asset_id.map(|id| (*id).into());
1571 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1572 let query = schema::coins::CoinsQuery::build(args);
1573
1574 let coins = self.query(query).await?.coins.into();
1575 Ok(coins)
1576 }
1577
1578 pub async fn coins_to_spend(
1580 &self,
1581 owner: &Address,
1582 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1583 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1585 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1586 let owner: schema::Address = (*owner).into();
1587 let spend_query: Vec<SpendQueryElementInput> = spend_query
1588 .iter()
1589 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1590 Ok(SpendQueryElementInput {
1591 asset_id: (*asset_id).into(),
1592 amount: (*amount).into(),
1593 max: (*max).map(|max| max.into()),
1594 })
1595 })
1596 .try_collect()?;
1597 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1598 let args =
1599 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1600 let query = schema::coins::CoinsToSpendQuery::build(args);
1601
1602 let coins_per_asset = self
1603 .query(query)
1604 .await?
1605 .coins_to_spend
1606 .into_iter()
1607 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1608 .collect::<Vec<_>>();
1609 Ok(coins_per_asset)
1610 }
1611
1612 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1613 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1614 id: (*id).into(),
1615 });
1616 let contract = self.query(query).await?.contract.map(Into::into);
1617 Ok(contract)
1618 }
1619
1620 pub async fn contract_balance(
1621 &self,
1622 id: &ContractId,
1623 asset: Option<&AssetId>,
1624 ) -> io::Result<u64> {
1625 let asset_id: schema::AssetId = match asset {
1626 Some(asset) => (*asset).into(),
1627 None => schema::AssetId::default(),
1628 };
1629
1630 let query =
1631 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1632 id: (*id).into(),
1633 asset: asset_id,
1634 });
1635
1636 let balance: types::ContractBalance =
1637 self.query(query).await?.contract_balance.into();
1638 Ok(balance.amount)
1639 }
1640
1641 pub async fn balance(
1642 &self,
1643 owner: &Address,
1644 asset_id: Option<&AssetId>,
1645 ) -> io::Result<u128> {
1646 let owner: schema::Address = (*owner).into();
1647 let asset_id: schema::AssetId = match asset_id {
1648 Some(asset_id) => (*asset_id).into(),
1649 None => schema::AssetId::default(),
1650 };
1651 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1652 let balance: types::Balance = self.query(query).await?.balance.into();
1653 Ok(balance.amount)
1654 }
1655
1656 pub async fn balances(
1658 &self,
1659 owner: &Address,
1660 request: PaginationRequest<String>,
1661 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1662 let owner: schema::Address = (*owner).into();
1663 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1664 let query = schema::balance::BalancesQuery::build(args);
1665
1666 let balances = self.query(query).await?.balances.into();
1667 Ok(balances)
1668 }
1669
1670 pub async fn contract_balances(
1671 &self,
1672 contract: &ContractId,
1673 request: PaginationRequest<String>,
1674 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1675 let contract_id: schema::ContractId = (*contract).into();
1676 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1677 let query = schema::contract::ContractBalancesQuery::build(args);
1678
1679 let balances = self.query(query).await?.contract_balances.into();
1680
1681 Ok(balances)
1682 }
1683
1684 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1686 let query = schema::message::MessageQuery::build(NonceArgs {
1687 nonce: (*nonce).into(),
1688 });
1689 let message = self.query(query).await?.message.map(Into::into);
1690 Ok(message)
1691 }
1692
1693 pub async fn messages(
1694 &self,
1695 owner: Option<&Address>,
1696 request: PaginationRequest<String>,
1697 ) -> io::Result<PaginatedResult<types::Message, String>> {
1698 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1699 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1700 let query = schema::message::OwnedMessageQuery::build(args);
1701
1702 let messages = self.query(query).await?.messages.into();
1703
1704 Ok(messages)
1705 }
1706
1707 pub async fn contract_info(
1708 &self,
1709 contract: &ContractId,
1710 ) -> io::Result<Option<types::Contract>> {
1711 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1712 id: (*contract).into(),
1713 });
1714 let contract_info = self.query(query).await?.contract.map(Into::into);
1715 Ok(contract_info)
1716 }
1717
1718 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1719 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1720 nonce: (*nonce).into(),
1721 });
1722 let status = self.query(query).await?.message_status.into();
1723
1724 Ok(status)
1725 }
1726
1727 pub async fn message_proof(
1729 &self,
1730 transaction_id: &TxId,
1731 nonce: &Nonce,
1732 commit_block_id: Option<&BlockId>,
1733 commit_block_height: Option<BlockHeight>,
1734 ) -> io::Result<types::MessageProof> {
1735 let transaction_id: TransactionId = (*transaction_id).into();
1736 let nonce: schema::Nonce = (*nonce).into();
1737 let commit_block_id: Option<schema::BlockId> =
1738 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1739 let commit_block_height = commit_block_height.map(Into::into);
1740 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1741 transaction_id,
1742 nonce,
1743 commit_block_id,
1744 commit_block_height,
1745 });
1746 let proof = self.query(query).await?.message_proof.try_into()?;
1747 Ok(proof)
1748 }
1749
1750 pub async fn relayed_transaction_status(
1751 &self,
1752 id: &Bytes32,
1753 ) -> io::Result<Option<RelayedTransactionStatus>> {
1754 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1755 RelayedTransactionStatusArgs {
1756 id: id.to_owned().into(),
1757 },
1758 );
1759 let status = self
1760 .query(query)
1761 .await?
1762 .relayed_transaction_status
1763 .map(|status| status.try_into())
1764 .transpose()?;
1765 Ok(status)
1766 }
1767
1768 pub async fn asset_info(
1769 &self,
1770 asset_id: &AssetId,
1771 ) -> io::Result<Option<AssetDetail>> {
1772 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1773 id: (*asset_id).into(),
1774 });
1775 let asset_info = self.query(query).await?.asset_details.map(Into::into);
1776 Ok(asset_info)
1777 }
1778}
1779
#[cfg(any(test, feature = "test-helpers"))]
impl FuelClient {
    /// Runs the `TransactionQuery` for `id` and returns only the
    /// `transaction` field of the resulting [`TransactionResponse`].
    ///
    /// Returns `Ok(None)` when the response carries no transaction.
    ///
    /// # Errors
    /// Fails if the underlying query fails or if converting the returned
    /// transaction fails.
    pub async fn transparent_transaction(
        &self,
        id: &TxId,
    ) -> io::Result<Option<types::TransactionType>> {
        let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });

        let Some(raw) = self.query(query).await?.transaction else {
            return Ok(None);
        };

        let response: TransactionResponse = raw.try_into()?;
        Ok(Some(response.transaction))
    }
}