1use self::schema::{
2 block::ProduceBlockArgs,
3 message::{
4 MessageProofArgs,
5 NonceArgs,
6 },
7};
8#[cfg(feature = "subscriptions")]
9use crate::client::types::StatusWithTransaction;
10use crate::{
11 client::{
12 schema::{
13 Tai64Timestamp,
14 TransactionId,
15 block::BlockByHeightArgs,
16 coins::{
17 ExcludeInput,
18 SpendQueryElementInput,
19 },
20 contract::ContractBalanceQueryArgs,
21 gas_price::EstimateGasPrice,
22 message::MessageStatusArgs,
23 relayed_tx::RelayedTransactionStatusArgs,
24 tx::{
25 DryRunArg,
26 TxWithEstimatedPredicatesArg,
27 },
28 },
29 types::{
30 RelayedTransactionStatus,
31 asset::AssetDetail,
32 gas_price::LatestGasPrice,
33 message::MessageStatus,
34 primitives::{
35 Address,
36 AssetId,
37 BlockId,
38 ContractId,
39 UtxoId,
40 },
41 upgrades::StateTransitionBytecode,
42 },
43 },
44 reqwest_ext::FuelGraphQlResponse,
45 transport::FailoverTransport,
46};
47use anyhow::{
48 Context,
49 anyhow,
50};
51#[cfg(feature = "subscriptions")]
52use cynic::SubscriptionBuilder;
53use cynic::{
54 Id,
55 MutationBuilder,
56 Operation,
57 QueryBuilder,
58 QueryFragment,
59 QueryVariables,
60};
61use fuel_core_types::{
62 blockchain::header::{
63 ConsensusParametersVersion,
64 StateTransitionBytecodeVersion,
65 },
66 fuel_asm::{
67 Instruction,
68 Word,
69 },
70 fuel_tx::{
71 BlobId,
72 Bytes32,
73 ConsensusParameters,
74 Receipt,
75 Transaction,
76 TxId,
77 },
78 fuel_types::{
79 self,
80 BlockHeight,
81 Nonce,
82 canonical::Serialize,
83 },
84 services::executor::{
85 StorageReadReplayEvent,
86 TransactionExecutionStatus,
87 },
88};
89#[cfg(feature = "subscriptions")]
90use futures::{
91 Stream,
92 StreamExt,
93};
94use itertools::Itertools;
95use pagination::{
96 PageDirection,
97 PaginatedResult,
98 PaginationRequest,
99};
100use reqwest::Url;
101use schema::{
102 Bytes,
103 ContinueTx,
104 ContinueTxArgs,
105 ConversionError,
106 HexString,
107 IdArg,
108 MemoryArgs,
109 RegisterArgs,
110 RunResult,
111 SetBreakpoint,
112 SetBreakpointArgs,
113 SetSingleStepping,
114 SetSingleSteppingArgs,
115 StartTx,
116 StartTxArgs,
117 U32,
118 U64,
119 assets::AssetInfoArg,
120 balance::BalanceArgs,
121 blob::BlobByIdArgs,
122 block::BlockByIdArgs,
123 coins::{
124 CoinByIdArgs,
125 CoinsConnectionArgs,
126 },
127 contract::{
128 ContractBalancesConnectionArgs,
129 ContractByIdArgs,
130 },
131 da_compressed::DaCompressedBlockByHeightArgs,
132 gas_price::BlockHorizonArgs,
133 storage_read_replay::{
134 StorageReadReplay,
135 StorageReadReplayArgs,
136 },
137 tx::{
138 AssembleTxArg,
139 TransactionsByOwnerConnectionArgs,
140 TxArg,
141 TxIdArgs,
142 },
143};
144#[cfg(feature = "subscriptions")]
145use std::future;
146use std::{
147 convert::TryInto,
148 io::{
149 self,
150 ErrorKind,
151 },
152 net,
153 str::{
154 self,
155 FromStr,
156 },
157 sync::{
158 Arc,
159 Mutex,
160 },
161};
162use tai64::Tai64;
163use tracing as _;
164use types::{
165 TransactionResponse,
166 TransactionStatus,
167 assemble_tx::{
168 AssembleTransactionResult,
169 RequiredBalance,
170 },
171};
172
173#[cfg(feature = "subscriptions")]
174use std::pin::Pin;
175
176#[cfg(feature = "rpc")]
177mod rpc_deps {
178 pub use aws_config::{
179 BehaviorVersion,
180 default_provider::credentials::DefaultCredentialsChain,
181 };
182 pub use aws_sdk_s3::Client as AWSClient;
183 pub use flate2::read::GzDecoder;
184 pub use fuel_core_block_aggregator_api::{
185 blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf,
186 protobuf_types::{
187 Block as ProtoBlock,
188 BlockHeightRequest as ProtoBlockHeightRequest,
189 BlockRangeRequest as ProtoBlockRangeRequest,
190 BlockResponse,
191 NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest,
192 RemoteBlockResponse,
193 RemoteS3Bucket,
194 block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient,
195 block_response::Payload,
196 remote_block_response::Location,
197 },
198 };
199 pub use prost::Message;
200 pub use std::{
201 collections::HashMap,
202 io::Read,
203 };
204 pub use tokio::sync::RwLock;
205 pub use tonic::transport::Channel;
206}
207#[cfg(feature = "rpc")]
208use rpc_deps::*;
209
210pub mod pagination;
211pub mod schema;
212pub mod types;
213
214type RegisterId = u32;
215
216#[derive(Debug, derive_more::Display, derive_more::From)]
217#[non_exhaustive]
218pub enum Error {
222 #[from]
224 Other(anyhow::Error),
225}
226
227#[derive(Debug)]
230pub enum ConsistencyPolicy {
231 Auto {
235 height: Arc<Mutex<Option<BlockHeight>>>,
237 },
238 Manual {
241 height: Option<BlockHeight>,
243 },
244}
245
246impl Clone for ConsistencyPolicy {
247 fn clone(&self) -> Self {
248 match self {
249 Self::Auto { height } => Self::Auto {
250 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
253 },
254 Self::Manual { height } => Self::Manual { height: *height },
255 }
256 }
257}
258
259#[derive(Debug, Default)]
260struct ChainStateInfo {
261 current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
262 current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
263}
264
265impl Clone for ChainStateInfo {
266 fn clone(&self) -> Self {
267 Self {
268 current_stf_version: Arc::new(Mutex::new(
269 self.current_stf_version.lock().ok().and_then(|v| *v),
270 )),
271 current_consensus_parameters_version: Arc::new(Mutex::new(
272 self.current_consensus_parameters_version
273 .lock()
274 .ok()
275 .and_then(|v| *v),
276 )),
277 }
278 }
279}
280
281#[derive(Debug, Clone)]
282pub struct FuelClient {
283 transport: FailoverTransport,
284 require_height: ConsistencyPolicy,
285 chain_state_info: ChainStateInfo,
286 #[cfg(feature = "rpc")]
287 rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
288 #[cfg(feature = "rpc")]
289 aws_client: AWSClientManager,
290}
291
292#[cfg(feature = "rpc")]
293#[derive(Debug, Clone)]
294pub struct AWSClientManager {
295 specific: Arc<RwLock<HashMap<Option<String>, AWSClient>>>,
296}
297
298#[cfg(feature = "rpc")]
299impl AWSClientManager {
300 pub fn new() -> Self {
301 Self {
302 specific: Arc::new(RwLock::new(HashMap::new())),
303 }
304 }
305
306 pub async fn get_client(&self, url: &Option<String>) -> Result<AWSClient, io::Error> {
307 if let Some(existing) = self.specific.read().await.get(url).cloned() {
308 return Ok(existing);
309 }
310 let client = FuelClient::new_aws_client(url).await;
311 let mut guard = self.specific.write().await;
312 let client = guard
313 .entry(url.clone())
314 .or_insert_with(|| client.clone())
315 .clone();
316 Ok(client)
317 }
318}
319
320#[cfg(feature = "rpc")]
321impl Default for AWSClientManager {
322 fn default() -> Self {
323 Self::new()
324 }
325}
326
327fn normalize_url(url_str: &str) -> anyhow::Result<Url> {
329 let mut raw_url = url_str.to_string();
330 if !raw_url.starts_with("http") {
331 raw_url = format!("http://{raw_url}");
332 }
333
334 let mut url = reqwest::Url::parse(&raw_url)
335 .map_err(anyhow::Error::msg)
336 .with_context(|| format!("Invalid fuel-core URL: {url_str}"))?;
337 url.set_path("/v1/graphql");
338
339 Ok(url)
340}
341
342impl FromStr for FuelClient {
343 type Err = anyhow::Error;
344
345 fn from_str(str: &str) -> Result<Self, Self::Err> {
346 let url = normalize_url(str)?;
347
348 Ok(Self {
349 transport: FailoverTransport::new(vec![url])?,
350 require_height: ConsistencyPolicy::Auto {
351 height: Arc::new(Mutex::new(None)),
352 },
353 chain_state_info: Default::default(),
354 #[cfg(feature = "rpc")]
355 rpc_client: None,
356 #[cfg(feature = "rpc")]
357 aws_client: AWSClientManager::new(),
358 })
359 }
360}
361
362impl<S> From<S> for FuelClient
363where
364 S: Into<net::SocketAddr>,
365{
366 fn from(socket: S) -> Self {
367 format!("http://{}", socket.into())
368 .as_str()
369 .parse()
370 .unwrap()
371 }
372}
373
374pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
375 let e = errors
376 .into_iter()
377 .fold(String::from("Response errors"), |mut s, e| {
378 s.push_str("; ");
379 s.push_str(e.as_str());
380 s
381 });
382 io::Error::other(e)
383}
384
385impl FuelClient {
386 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
387 Self::from_str(url.as_ref())
388 }
389
390 #[cfg(feature = "rpc")]
391 pub async fn new_with_rpc<G: AsRef<str>, R: AsRef<str>>(
392 graph_ql_urls: impl Iterator<Item = G>,
393 rpc_url: R,
394 ) -> anyhow::Result<Self> {
395 let urls: Vec<_> = graph_ql_urls
396 .map(|str| normalize_url(str.as_ref()))
397 .try_collect()?;
398 let mut client = Self::with_urls(&urls)?;
399 let mut raw_rpc_url = <R as AsRef<str>>::as_ref(&rpc_url).to_string();
400 if !raw_rpc_url.starts_with("http") {
401 raw_rpc_url = format!("http://{raw_rpc_url}");
402 }
403 let rpc_client = ProtoBlockAggregatorClient::connect(raw_rpc_url).await?;
404 client.rpc_client = Some(rpc_client);
405 client.aws_client = AWSClientManager::new();
406 Ok(client)
407 }
408
409 pub fn with_urls(urls: &[impl AsRef<str>]) -> anyhow::Result<Self> {
410 if urls.is_empty() {
411 return Err(anyhow!("Failed to create FuelClient. No URL is provided."));
412 }
413 let urls = urls
414 .iter()
415 .map(|url| normalize_url(url.as_ref()))
416 .collect::<Result<Vec<_>, _>>()?;
417 Ok(Self {
418 transport: FailoverTransport::new(urls)?,
419 require_height: ConsistencyPolicy::Auto {
420 height: Arc::new(Mutex::new(None)),
421 },
422 chain_state_info: Default::default(),
423 #[cfg(feature = "rpc")]
424 rpc_client: None,
425 #[cfg(feature = "rpc")]
426 aws_client: AWSClientManager::new(),
427 })
428 }
429
430 pub fn get_default_url(&self) -> &Url {
431 self.transport.get_default_url()
432 }
433}
434
435impl FuelClient {
436 pub fn with_required_fuel_block_height(
437 &mut self,
438 new_height: Option<BlockHeight>,
439 ) -> &mut Self {
440 match &mut self.require_height {
441 ConsistencyPolicy::Auto { height } => {
442 *height.lock().expect("Mutex poisoned") = new_height;
443 }
444 ConsistencyPolicy::Manual { height } => {
445 *height = new_height;
446 }
447 }
448 self
449 }
450
451 pub fn use_manual_consistency_policy(
452 &mut self,
453 height: Option<BlockHeight>,
454 ) -> &mut Self {
455 self.require_height = ConsistencyPolicy::Manual { height };
456 self
457 }
458
459 pub fn decode_response<R, E>(
460 &self,
461 response: FuelGraphQlResponse<R, E>,
462 ) -> io::Result<R>
463 where
464 R: serde::de::DeserializeOwned + 'static,
465 {
466 if response
467 .extensions
468 .as_ref()
469 .and_then(|e| e.fuel_block_height_precondition_failed)
470 == Some(true)
471 {
472 return Err(io::Error::other("The required block height was not met"));
473 }
474
475 let response = response.response;
476
477 match (response.data, response.errors) {
478 (Some(d), _) => Ok(d),
479 (_, Some(e)) => Err(from_strings_errors_to_std_error(
480 e.into_iter().map(|e| e.message).collect(),
481 )),
482 _ => Err(io::Error::other("Invalid response")),
483 }
484 }
485
486 pub fn required_block_height(&self) -> Option<BlockHeight> {
487 match &self.require_height {
488 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
489 ConsistencyPolicy::Manual { height } => *height,
490 }
491 }
492
493 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
494 if let Some(current_sft_version) = response
495 .extensions
496 .as_ref()
497 .and_then(|e| e.current_stf_version)
498 && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
499 {
500 *c = Some(current_sft_version);
501 }
502
503 if let Some(current_consensus_parameters_version) = response
504 .extensions
505 .as_ref()
506 .and_then(|e| e.current_consensus_parameters_version)
507 && let Ok(mut c) = self
508 .chain_state_info
509 .current_consensus_parameters_version
510 .lock()
511 {
512 *c = Some(current_consensus_parameters_version);
513 }
514
515 let inner_required_height = match &self.require_height {
516 ConsistencyPolicy::Auto { height } => Some(height.clone()),
517 ConsistencyPolicy::Manual { .. } => None,
518 };
519
520 if let Some(inner_required_height) = inner_required_height
521 && let Some(current_fuel_block_height) = response
522 .extensions
523 .as_ref()
524 .and_then(|e| e.current_fuel_block_height)
525 {
526 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
527
528 if current_fuel_block_height >= lock.unwrap_or_default() {
529 *lock = Some(current_fuel_block_height);
530 }
531 }
532 }
533
534 pub async fn query<ResponseData, Vars>(
536 &self,
537 q: Operation<ResponseData, Vars>,
538 ) -> io::Result<ResponseData>
539 where
540 Vars: serde::Serialize + Clone + QueryVariables + Send + 'static,
541 ResponseData: serde::de::DeserializeOwned + QueryFragment + Send + 'static,
542 {
543 let required_fuel_block_height = self.required_block_height();
544 let response = self.transport.query(q, required_fuel_block_height).await?;
545
546 self.update_chain_state_info(&response);
547 self.decode_response(response)
548 }
549
550 #[tracing::instrument(skip_all)]
551 #[cfg(feature = "subscriptions")]
552 async fn subscribe<ResponseData, Variables>(
553 &self,
554 variables: Variables,
555 ) -> io::Result<Pin<Box<impl futures::Stream<Item = io::Result<ResponseData>> + '_>>>
556 where
557 Variables: serde::Serialize + QueryVariables + Send + Clone + 'static,
558 ResponseData: serde::de::DeserializeOwned
559 + QueryFragment
560 + SubscriptionBuilder<Variables>
561 + 'static
562 + Send,
563 {
564 let stream = self
565 .transport
566 .subscribe(variables, self.required_block_height())
567 .await?;
568
569 let client = self; Ok(Box::pin(stream.filter_map(move |result| {
571 async move {
572 match result {
573 Ok(resp) => {
574 client.update_chain_state_info(&resp);
575 Some(client.decode_response(resp))
576 }
577 Err(e) => Some(Err(e)), }
579 }
580 })))
581 }
582
583 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
584 self.chain_state_info
585 .current_stf_version
586 .lock()
587 .ok()
588 .and_then(|value| *value)
589 }
590
591 pub fn latest_consensus_parameters_version(
592 &self,
593 ) -> Option<ConsensusParametersVersion> {
594 self.chain_state_info
595 .current_consensus_parameters_version
596 .lock()
597 .ok()
598 .and_then(|value| *value)
599 }
600
601 pub async fn health(&self) -> io::Result<bool> {
602 let query = schema::Health::build(());
603 self.query(query).await.map(|r| r.health)
604 }
605
606 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
607 let query = schema::node_info::QueryNodeInfo::build(());
608 self.query(query).await.map(|r| r.node_info.into())
609 }
610
611 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
612 let query = schema::gas_price::QueryLatestGasPrice::build(());
613 self.query(query).await.map(|r| r.latest_gas_price.into())
614 }
615
616 pub async fn estimate_gas_price(
617 &self,
618 block_horizon: u32,
619 ) -> io::Result<EstimateGasPrice> {
620 let args = BlockHorizonArgs {
621 block_horizon: Some(block_horizon.into()),
622 };
623 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
624 self.query(query).await.map(|r| r.estimate_gas_price)
625 }
626
627 #[cfg(feature = "std")]
628 pub async fn connected_peers_info(
629 &self,
630 ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
631 let query = schema::node_info::QueryPeersInfo::build(());
632 self.query(query)
633 .await
634 .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
635 }
636
637 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
638 let query = schema::chain::ChainQuery::build(());
639 self.query(query).await.and_then(|r| {
640 let result = r.chain.try_into()?;
641 Ok(result)
642 })
643 }
644
645 pub async fn consensus_parameters(
646 &self,
647 version: i32,
648 ) -> io::Result<Option<ConsensusParameters>> {
649 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
650 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
651
652 let result = self
653 .query(query)
654 .await?
655 .consensus_parameters
656 .map(TryInto::try_into)
657 .transpose()?;
658
659 Ok(result)
660 }
661
662 pub async fn state_transition_byte_code_by_version(
663 &self,
664 version: i32,
665 ) -> io::Result<Option<StateTransitionBytecode>> {
666 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
667 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
668
669 let result = self
670 .query(query)
671 .await?
672 .state_transition_bytecode_by_version
673 .map(TryInto::try_into)
674 .transpose()?;
675
676 Ok(result)
677 }
678
679 pub async fn state_transition_byte_code_by_root(
680 &self,
681 root: Bytes32,
682 ) -> io::Result<Option<StateTransitionBytecode>> {
683 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
684 root: HexString(Bytes(root.to_vec())),
685 };
686 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
687
688 let result = self
689 .query(query)
690 .await?
691 .state_transition_bytecode_by_root
692 .map(TryInto::try_into)
693 .transpose()?;
694
695 Ok(result)
696 }
697
698 pub async fn dry_run(
700 &self,
701 txs: &[Transaction],
702 ) -> io::Result<Vec<TransactionExecutionStatus>> {
703 self.dry_run_opt(txs, None, None, None).await
704 }
705
706 pub async fn dry_run_opt(
708 &self,
709 txs: &[Transaction],
710 utxo_validation: Option<bool>,
712 gas_price: Option<u64>,
713 at_height: Option<BlockHeight>,
714 ) -> io::Result<Vec<TransactionExecutionStatus>> {
715 let txs = txs
716 .iter()
717 .map(|tx| HexString(Bytes(tx.to_bytes())))
718 .collect::<Vec<HexString>>();
719 let query: Operation<schema::tx::DryRun, DryRunArg> =
720 schema::tx::DryRun::build(DryRunArg {
721 txs,
722 utxo_validation,
723 gas_price: gas_price.map(|gp| gp.into()),
724 block_height: at_height.map(|bh| bh.into()),
725 });
726 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
727 tx_statuses
728 .into_iter()
729 .map(|tx_status| tx_status.try_into().map_err(Into::into))
730 .collect()
731 }
732
733 pub async fn dry_run_opt_record_storage_reads(
735 &self,
736 txs: &[Transaction],
737 utxo_validation: Option<bool>,
739 gas_price: Option<u64>,
740 at_height: Option<BlockHeight>,
741 ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
742 let txs = txs
743 .iter()
744 .map(|tx| HexString(Bytes(tx.to_bytes())))
745 .collect::<Vec<HexString>>();
746 let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
747 schema::tx::DryRunRecordStorageReads::build(DryRunArg {
748 txs,
749 utxo_validation,
750 gas_price: gas_price.map(|gp| gp.into()),
751 block_height: at_height.map(|bh| bh.into()),
752 });
753 let result = self
754 .query(query)
755 .await
756 .map(|r| r.dry_run_record_storage_reads)?;
757 let tx_statuses = result
758 .tx_statuses
759 .into_iter()
760 .map(|tx_status| tx_status.try_into().map_err(Into::into))
761 .collect::<io::Result<Vec<_>>>()?;
762 let storage_reads = result
763 .storage_reads
764 .into_iter()
765 .map(Into::into)
766 .collect::<Vec<_>>();
767 Ok((tx_statuses, storage_reads))
768 }
769
770 pub async fn storage_read_replay(
772 &self,
773 height: &BlockHeight,
774 ) -> io::Result<Vec<StorageReadReplayEvent>> {
775 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
776 StorageReadReplay::build(StorageReadReplayArgs {
777 height: (*height).into(),
778 });
779 Ok(self
780 .query(query)
781 .await
782 .map(|r| r.storage_read_replay)?
783 .into_iter()
784 .map(Into::into)
785 .collect())
786 }
787
788 #[allow(clippy::too_many_arguments)]
811 pub async fn assemble_tx(
812 &self,
813 tx: &Transaction,
814 block_horizon: u32,
815 required_balances: Vec<RequiredBalance>,
816 fee_address_index: u16,
817 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
818 estimate_predicates: bool,
819 reserve_gas: Option<u64>,
820 ) -> io::Result<AssembleTransactionResult> {
821 let tx = HexString(Bytes(tx.to_bytes()));
822 let block_horizon = block_horizon.into();
823
824 let required_balances: Vec<_> = required_balances
825 .into_iter()
826 .map(schema::tx::RequiredBalance::try_from)
827 .collect::<Result<Vec<_>, _>>()?;
828
829 let fee_address_index = fee_address_index.into();
830
831 let exclude_input = exclude.map(Into::into);
832
833 let reserve_gas = reserve_gas.map(U64::from);
834
835 let query_arg = AssembleTxArg {
836 tx,
837 block_horizon,
838 required_balances,
839 fee_address_index,
840 exclude_input,
841 estimate_predicates,
842 reserve_gas,
843 };
844
845 let query = schema::tx::AssembleTx::build(query_arg);
846 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
847 Ok(assemble_tx_result.try_into()?)
848 }
849
850 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
852 let serialized_tx = tx.to_bytes();
853 let query = schema::tx::EstimatePredicates::build(TxArg {
854 tx: HexString(Bytes(serialized_tx)),
855 });
856 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
857 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
858 *tx = tx_with_predicate;
859 Ok(())
860 }
861
862 pub async fn submit(
863 &self,
864 tx: &Transaction,
865 ) -> io::Result<types::primitives::TransactionId> {
866 self.submit_opt(tx, None).await
867 }
868
869 pub async fn submit_opt(
870 &self,
871 tx: &Transaction,
872 estimate_predicates: Option<bool>,
873 ) -> io::Result<types::primitives::TransactionId> {
874 let tx = tx.clone().to_bytes();
875 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
876 tx: HexString(Bytes(tx)),
877 estimate_predicates,
878 });
879
880 let id = self.query(query).await.map(|r| r.submit)?.id.into();
881 Ok(id)
882 }
883
884 #[cfg(feature = "subscriptions")]
886 pub async fn submit_and_await_commit(
887 &self,
888 tx: &Transaction,
889 ) -> io::Result<TransactionStatus> {
890 self.submit_and_await_commit_opt(tx, None).await
891 }
892
893 #[cfg(feature = "subscriptions")]
902 pub async fn submit_and_await_commit_opt(
903 &self,
904 tx: &Transaction,
905 estimate_predicates: Option<bool>,
906 ) -> io::Result<TransactionStatus> {
907 let tx = tx.clone().to_bytes();
908 let variables = TxWithEstimatedPredicatesArg {
909 tx: HexString(Bytes(tx)),
910 estimate_predicates,
911 };
912
913 let mut stream = self.subscribe(variables).await?.map(
914 |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
915 let status: TransactionStatus = r?.submit_and_await.try_into()?;
916 Result::<_, io::Error>::Ok(status)
917 },
918 );
919
920 let status = stream.next().await.ok_or_else(|| {
921 io::Error::other("Failed to get status from the submission")
922 })??;
923
924 Ok(status)
925 }
926
927 #[cfg(feature = "subscriptions")]
929 pub async fn submit_and_await_commit_with_tx(
930 &self,
931 tx: &Transaction,
932 ) -> io::Result<StatusWithTransaction> {
933 self.submit_and_await_commit_with_tx_opt(tx, None).await
934 }
935
936 #[cfg(feature = "subscriptions")]
938 pub async fn submit_and_await_commit_with_tx_opt(
939 &self,
940 tx: &Transaction,
941 estimate_predicates: Option<bool>,
942 ) -> io::Result<StatusWithTransaction> {
943 let tx = tx.clone().to_bytes();
944 let variables = TxWithEstimatedPredicatesArg {
945 tx: HexString(Bytes(tx)),
946 estimate_predicates,
947 };
948
949 let mut stream = self.subscribe(variables).await?.map(
950 |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
951 let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
952 Result::<_, io::Error>::Ok(status)
953 },
954 );
955
956 let status = stream.next().await.ok_or_else(|| {
957 io::Error::other("Failed to get status from the submission")
958 })??;
959
960 Ok(status)
961 }
962
963 #[cfg(feature = "subscriptions")]
965 pub async fn submit_and_await_status(
966 &self,
967 tx: &Transaction,
968 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
969 self.submit_and_await_status_opt(tx, None, None).await
970 }
971
972 #[cfg(feature = "subscriptions")]
974 pub async fn submit_and_await_status_opt(
975 &self,
976 tx: &Transaction,
977 estimate_predicates: Option<bool>,
978 include_preconfirmation: Option<bool>,
979 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
980 use schema::tx::SubmitAndAwaitStatusArg;
981 let tx = tx.clone().to_bytes();
982 let variables = SubmitAndAwaitStatusArg {
983 tx: HexString(Bytes(tx)),
984 estimate_predicates,
985 include_preconfirmation,
986 };
987
988 let stream = self.subscribe(variables).await?.map(
989 |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
990 let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
991 Result::<_, io::Error>::Ok(status)
992 },
993 );
994
995 Ok(stream)
996 }
997
998 #[cfg(feature = "subscriptions")]
1000 pub async fn contract_storage_slots(
1001 &self,
1002 contract_id: &ContractId,
1003 ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1004 use schema::storage::ContractStorageSlotsArgs;
1005 let variables = ContractStorageSlotsArgs {
1006 contract_id: (*contract_id).into(),
1007 };
1008
1009 let stream = self.subscribe(variables).await?.map(
1010 |result: io::Result<schema::storage::ContractStorageSlots>| {
1011 let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1012 Result::<_, io::Error>::Ok(result)
1013 },
1014 );
1015
1016 Ok(stream)
1017 }
1018
1019 #[cfg(feature = "subscriptions")]
1021 pub async fn contract_storage_balances(
1022 &self,
1023 contract_id: &ContractId,
1024 ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1025 {
1026 use schema::{
1027 contract::ContractBalance,
1028 storage::ContractStorageBalancesArgs,
1029 };
1030 let variables = ContractStorageBalancesArgs {
1031 contract_id: (*contract_id).into(),
1032 };
1033
1034 let stream = self.subscribe(variables).await?.map(
1035 |result: io::Result<schema::storage::ContractStorageBalances>| {
1036 let result: ContractBalance = result?.contract_storage_balances;
1037 Result::<_, io::Error>::Ok(result)
1038 },
1039 );
1040
1041 Ok(stream)
1042 }
1043
1044 #[cfg(feature = "subscriptions")]
1046 pub async fn new_blocks_subscription(
1047 &self,
1048 ) -> io::Result<
1049 impl Stream<
1050 Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1051 > + '_,
1052 > {
1053 let stream = self.subscribe(()).await?.map(
1054 |r: io::Result<schema::block::NewBlocksSubscription>| {
1055 let result: fuel_core_types::services::block_importer::ImportResult =
1056 postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1057 io::Error::other(format!(
1058 "Failed to deserialize ImportResult: {e:?}"
1059 ))
1060 })?;
1061 Result::<_, io::Error>::Ok(result)
1062 },
1063 );
1064
1065 Ok(stream)
1066 }
1067
1068 #[cfg(feature = "subscriptions")]
1070 pub async fn preconfirmations_subscription(
1071 &self,
1072 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1073 let stream = self.subscribe(()).await?.map(
1074 |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1075 let status: TransactionStatus = r?.preconfirmations.try_into()?;
1076 Result::<_, io::Error>::Ok(status)
1077 },
1078 );
1079
1080 Ok(stream)
1081 }
1082
1083 pub async fn contract_slots_values(
1084 &self,
1085 contract_id: &ContractId,
1086 block_height: Option<BlockHeight>,
1087 requested_storage_slots: Vec<Bytes32>,
1088 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1089 let query = schema::storage::ContractSlotValues::build(
1090 schema::storage::ContractSlotValuesArgs {
1091 contract_id: (*contract_id).into(),
1092 block_height: block_height.map(|b| (*b).into()),
1093 storage_slots: requested_storage_slots
1094 .into_iter()
1095 .map(Into::into)
1096 .collect(),
1097 },
1098 );
1099
1100 self.query(query)
1101 .await
1102 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1103 }
1104
1105 pub async fn contract_balance_values(
1106 &self,
1107 contract_id: &ContractId,
1108 block_height: Option<BlockHeight>,
1109 requested_storage_slots: Vec<AssetId>,
1110 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1111 let query = schema::storage::ContractBalanceValues::build(
1112 schema::storage::ContractBalanceValuesArgs {
1113 contract_id: (*contract_id).into(),
1114 block_height: block_height.map(|b| (*b).into()),
1115 assets: requested_storage_slots
1116 .into_iter()
1117 .map(Into::into)
1118 .collect(),
1119 },
1120 );
1121
1122 self.query(query)
1123 .await
1124 .map(|r| r.contract_balance_values.into_iter().collect())
1125 }
1126
1127 pub async fn start_session(&self) -> io::Result<String> {
1128 let query = schema::StartSession::build(());
1129
1130 self.query(query)
1131 .await
1132 .map(|r| r.start_session.into_inner())
1133 }
1134
1135 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1136 let query = schema::EndSession::build(IdArg { id: id.into() });
1137
1138 self.query(query).await.map(|r| r.end_session)
1139 }
1140
1141 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1142 let query = schema::Reset::build(IdArg { id: id.into() });
1143
1144 self.query(query).await.map(|r| r.reset)
1145 }
1146
1147 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1148 let op = serde_json::to_string(op)?;
1149 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1150
1151 self.query(query).await.map(|r| r.execute)
1152 }
1153
1154 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1155 let query = schema::Register::build(RegisterArgs {
1156 id: id.into(),
1157 register: register.into(),
1158 });
1159
1160 Ok(self.query(query).await?.register.0 as Word)
1161 }
1162
1163 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1164 let query = schema::Memory::build(MemoryArgs {
1165 id: id.into(),
1166 start: start.into(),
1167 size: size.into(),
1168 });
1169
1170 let memory = self.query(query).await?.memory;
1171
1172 Ok(serde_json::from_str(memory.as_str())?)
1173 }
1174
1175 pub async fn set_breakpoint(
1176 &self,
1177 session_id: &str,
1178 contract: fuel_types::ContractId,
1179 pc: u64,
1180 ) -> io::Result<()> {
1181 let operation = SetBreakpoint::build(SetBreakpointArgs {
1182 id: Id::new(session_id),
1183 bp: schema::Breakpoint {
1184 contract: contract.into(),
1185 pc: U64(pc),
1186 },
1187 });
1188
1189 let response = self.query(operation).await?;
1190 assert!(
1191 response.set_breakpoint,
1192 "Setting breakpoint returned invalid reply"
1193 );
1194 Ok(())
1195 }
1196
1197 pub async fn set_single_stepping(
1198 &self,
1199 session_id: &str,
1200 enable: bool,
1201 ) -> io::Result<()> {
1202 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1203 id: Id::new(session_id),
1204 enable,
1205 });
1206 self.query(operation).await?;
1207 Ok(())
1208 }
1209
1210 pub async fn start_tx(
1211 &self,
1212 session_id: &str,
1213 tx: &Transaction,
1214 ) -> io::Result<RunResult> {
1215 let operation = StartTx::build(StartTxArgs {
1216 id: Id::new(session_id),
1217 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1218 });
1219 let response = self.query(operation).await?.start_tx;
1220 Ok(response)
1221 }
1222
1223 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1224 let operation = ContinueTx::build(ContinueTxArgs {
1225 id: Id::new(session_id),
1226 });
1227 let response = self.query(operation).await?.continue_tx;
1228 Ok(response)
1229 }
1230
1231 pub async fn transaction(
1232 &self,
1233 id: &TxId,
1234 ) -> io::Result<Option<TransactionResponse>> {
1235 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1236
1237 let transaction = self.query(query).await?.transaction;
1238
1239 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1240 }
1241
1242 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1244 let query =
1245 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1246
1247 let status = self.query(query).await?.transaction.ok_or_else(|| {
1248 io::Error::new(
1249 ErrorKind::NotFound,
1250 format!("status not found for transaction {id} "),
1251 )
1252 })?;
1253
1254 let status = status
1255 .status
1256 .ok_or_else(|| {
1257 io::Error::new(
1258 ErrorKind::NotFound,
1259 format!("status not found for transaction {id}"),
1260 )
1261 })?
1262 .try_into()?;
1263 Ok(status)
1264 }
1265
1266 #[tracing::instrument(skip(self), level = "debug")]
1267 #[cfg(feature = "subscriptions")]
1268 pub async fn subscribe_transaction_status(
1270 &self,
1271 id: &TxId,
1272 ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1273 self.subscribe_transaction_status_opt(id, None).await
1274 }
1275
1276 #[cfg(feature = "subscriptions")]
1277 pub async fn subscribe_transaction_status_opt(
1279 &self,
1280 id: &TxId,
1281 include_preconfirmation: Option<bool>,
1282 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1283 use schema::tx::{
1284 StatusChangeSubscription,
1285 StatusChangeSubscriptionArgs,
1286 };
1287 let tx_id: TransactionId = (*id).into();
1288 let variables = StatusChangeSubscriptionArgs {
1289 id: tx_id,
1290 include_preconfirmation,
1291 };
1292
1293 tracing::debug!("subscribing");
1294 let stream = self
1295 .subscribe::<StatusChangeSubscription, StatusChangeSubscriptionArgs>(
1296 variables,
1297 )
1298 .await?
1299 .map(|tx| {
1300 tracing::debug!("received {tx:?}");
1301 let tx = tx?;
1302 let status = tx.status_change.try_into()?;
1303 Ok(status)
1304 });
1305
1306 Ok(stream)
1307 }
1308
1309 #[cfg(feature = "subscriptions")]
1310 pub async fn await_transaction_commit(
1315 &self,
1316 id: &TxId,
1317 ) -> io::Result<TransactionStatus> {
1318 let status_result = self
1321 .subscribe_transaction_status(id)
1322 .await?
1323 .skip_while(|status| {
1324 future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1325 })
1326 .next()
1327 .await;
1328
1329 if let Some(Ok(status)) = status_result {
1330 Ok(status)
1331 } else {
1332 Err(io::Error::other(format!(
1333 "Failed to get status for transaction {status_result:?}"
1334 )))
1335 }
1336 }
1337
1338 pub async fn transactions(
1340 &self,
1341 request: PaginationRequest<String>,
1342 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1343 let args = schema::ConnectionArgs::from(request);
1344 let query = schema::tx::TransactionsQuery::build(args);
1345 let transactions = self.query(query).await?.transactions.try_into()?;
1346 Ok(transactions)
1347 }
1348
1349 pub async fn transactions_by_owner(
1351 &self,
1352 owner: &Address,
1353 request: PaginationRequest<String>,
1354 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1355 let owner: schema::Address = (*owner).into();
1356 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1357 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1358
1359 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1360 Ok(transactions)
1361 }
1362
1363 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1364 let query =
1365 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1366
1367 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1368 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1369 })?;
1370
1371 let receipts = match tx.status {
1372 Some(status) => match status {
1373 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1374 s.receipts
1375 .into_iter()
1376 .map(TryInto::<Receipt>::try_into)
1377 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1378 )
1379 .transpose()?,
1380 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1381 s.receipts
1382 .into_iter()
1383 .map(TryInto::<Receipt>::try_into)
1384 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1385 )
1386 .transpose()?,
1387 _ => None,
1388 },
1389 _ => None,
1390 };
1391
1392 Ok(receipts)
1393 }
1394
1395 #[cfg(feature = "test-helpers")]
1396 pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1397 let query = schema::tx::AllReceipts::build(());
1398 let receipts = self.query(query).await?.all_receipts;
1399
1400 let vec: Result<Vec<Receipt>, ConversionError> = receipts
1401 .into_iter()
1402 .map(TryInto::<Receipt>::try_into)
1403 .collect();
1404
1405 Ok(vec?)
1406 }
1407
1408 pub async fn produce_blocks(
1409 &self,
1410 blocks_to_produce: u32,
1411 start_timestamp: Option<u64>,
1412 ) -> io::Result<BlockHeight> {
1413 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1414 blocks_to_produce: blocks_to_produce.into(),
1415 start_timestamp: start_timestamp
1416 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1417 });
1418
1419 let new_height = self.query(query).await?.produce_blocks;
1420
1421 Ok(new_height.into())
1422 }
1423
1424 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1425 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1426 id: Some((*id).into()),
1427 });
1428
1429 let block = self
1430 .query(query)
1431 .await?
1432 .block
1433 .map(TryInto::try_into)
1434 .transpose()?;
1435
1436 Ok(block)
1437 }
1438
1439 pub async fn block_by_height(
1440 &self,
1441 height: BlockHeight,
1442 ) -> io::Result<Option<types::Block>> {
1443 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1444 height: Some(U32(height.into())),
1445 });
1446
1447 let block = self
1448 .query(query)
1449 .await?
1450 .block
1451 .map(TryInto::try_into)
1452 .transpose()?;
1453
1454 Ok(block)
1455 }
1456
1457 pub async fn da_compressed_block(
1458 &self,
1459 height: BlockHeight,
1460 ) -> io::Result<Option<Vec<u8>>> {
1461 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1462 DaCompressedBlockByHeightArgs {
1463 height: U32(height.into()),
1464 },
1465 );
1466
1467 Ok(self
1468 .query(query)
1469 .await?
1470 .da_compressed_block
1471 .map(|b| b.bytes.into()))
1472 }
1473
1474 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1476 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1477 let blob = self.query(query).await?.blob.map(Into::into);
1478 Ok(blob)
1479 }
1480
1481 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1483 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1484 Ok(self.query(query).await?.blob.is_some())
1485 }
1486
1487 pub async fn blocks(
1489 &self,
1490 request: PaginationRequest<String>,
1491 ) -> io::Result<PaginatedResult<types::Block, String>> {
1492 let args = schema::ConnectionArgs::from(request);
1493 let query = schema::block::BlocksQuery::build(args);
1494
1495 let blocks = self.query(query).await?.blocks.try_into()?;
1496
1497 Ok(blocks)
1498 }
1499
1500 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1501 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1502 utxo_id: (*id).into(),
1503 });
1504 let coin = self.query(query).await?.coin.map(Into::into);
1505 Ok(coin)
1506 }
1507
1508 pub async fn coins(
1510 &self,
1511 owner: &Address,
1512 asset_id: Option<&AssetId>,
1513 request: PaginationRequest<String>,
1514 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1515 let owner: schema::Address = (*owner).into();
1516 let asset_id = asset_id.map(|id| (*id).into());
1517 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1518 let query = schema::coins::CoinsQuery::build(args);
1519
1520 let coins = self.query(query).await?.coins.into();
1521 Ok(coins)
1522 }
1523
1524 pub async fn coins_to_spend(
1526 &self,
1527 owner: &Address,
1528 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1529 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1531 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1532 let owner: schema::Address = (*owner).into();
1533 let spend_query: Vec<SpendQueryElementInput> = spend_query
1534 .iter()
1535 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1536 Ok(SpendQueryElementInput {
1537 asset_id: (*asset_id).into(),
1538 amount: (*amount).into(),
1539 max: (*max).map(|max| max.into()),
1540 })
1541 })
1542 .try_collect()?;
1543 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1544 let args =
1545 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1546 let query = schema::coins::CoinsToSpendQuery::build(args);
1547
1548 let coins_per_asset = self
1549 .query(query)
1550 .await?
1551 .coins_to_spend
1552 .into_iter()
1553 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1554 .collect::<Vec<_>>();
1555 Ok(coins_per_asset)
1556 }
1557
1558 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1559 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1560 id: (*id).into(),
1561 });
1562 let contract = self.query(query).await?.contract.map(Into::into);
1563 Ok(contract)
1564 }
1565
1566 pub async fn contract_balance(
1567 &self,
1568 id: &ContractId,
1569 asset: Option<&AssetId>,
1570 ) -> io::Result<u64> {
1571 let asset_id: schema::AssetId = match asset {
1572 Some(asset) => (*asset).into(),
1573 None => schema::AssetId::default(),
1574 };
1575
1576 let query =
1577 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1578 id: (*id).into(),
1579 asset: asset_id,
1580 });
1581
1582 let balance: types::ContractBalance =
1583 self.query(query).await?.contract_balance.into();
1584 Ok(balance.amount)
1585 }
1586
1587 pub async fn balance(
1588 &self,
1589 owner: &Address,
1590 asset_id: Option<&AssetId>,
1591 ) -> io::Result<u128> {
1592 let owner: schema::Address = (*owner).into();
1593 let asset_id: schema::AssetId = match asset_id {
1594 Some(asset_id) => (*asset_id).into(),
1595 None => schema::AssetId::default(),
1596 };
1597 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1598 let balance: types::Balance = self.query(query).await?.balance.into();
1599 Ok(balance.amount)
1600 }
1601
1602 pub async fn balances(
1604 &self,
1605 owner: &Address,
1606 request: PaginationRequest<String>,
1607 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1608 let owner: schema::Address = (*owner).into();
1609 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1610 let query = schema::balance::BalancesQuery::build(args);
1611
1612 let balances = self.query(query).await?.balances.into();
1613 Ok(balances)
1614 }
1615
1616 pub async fn contract_balances(
1617 &self,
1618 contract: &ContractId,
1619 request: PaginationRequest<String>,
1620 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1621 let contract_id: schema::ContractId = (*contract).into();
1622 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1623 let query = schema::contract::ContractBalancesQuery::build(args);
1624
1625 let balances = self.query(query).await?.contract_balances.into();
1626
1627 Ok(balances)
1628 }
1629
1630 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1632 let query = schema::message::MessageQuery::build(NonceArgs {
1633 nonce: (*nonce).into(),
1634 });
1635 let message = self.query(query).await?.message.map(Into::into);
1636 Ok(message)
1637 }
1638
1639 pub async fn messages(
1640 &self,
1641 owner: Option<&Address>,
1642 request: PaginationRequest<String>,
1643 ) -> io::Result<PaginatedResult<types::Message, String>> {
1644 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1645 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1646 let query = schema::message::OwnedMessageQuery::build(args);
1647
1648 let messages = self.query(query).await?.messages.into();
1649
1650 Ok(messages)
1651 }
1652
1653 pub async fn contract_info(
1654 &self,
1655 contract: &ContractId,
1656 ) -> io::Result<Option<types::Contract>> {
1657 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1658 id: (*contract).into(),
1659 });
1660 let contract_info = self.query(query).await?.contract.map(Into::into);
1661 Ok(contract_info)
1662 }
1663
1664 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1665 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1666 nonce: (*nonce).into(),
1667 });
1668 let status = self.query(query).await?.message_status.into();
1669
1670 Ok(status)
1671 }
1672
1673 pub async fn message_proof(
1675 &self,
1676 transaction_id: &TxId,
1677 nonce: &Nonce,
1678 commit_block_id: Option<&BlockId>,
1679 commit_block_height: Option<BlockHeight>,
1680 ) -> io::Result<types::MessageProof> {
1681 let transaction_id: TransactionId = (*transaction_id).into();
1682 let nonce: schema::Nonce = (*nonce).into();
1683 let commit_block_id: Option<schema::BlockId> =
1684 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1685 let commit_block_height = commit_block_height.map(Into::into);
1686 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1687 transaction_id,
1688 nonce,
1689 commit_block_id,
1690 commit_block_height,
1691 });
1692 let proof = self.query(query).await?.message_proof.try_into()?;
1693 Ok(proof)
1694 }
1695
1696 pub async fn relayed_transaction_status(
1697 &self,
1698 id: &Bytes32,
1699 ) -> io::Result<Option<RelayedTransactionStatus>> {
1700 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1701 RelayedTransactionStatusArgs {
1702 id: id.to_owned().into(),
1703 },
1704 );
1705 let status = self
1706 .query(query)
1707 .await?
1708 .relayed_transaction_status
1709 .map(|status| status.try_into())
1710 .transpose()?;
1711 Ok(status)
1712 }
1713
1714 pub async fn asset_info(
1715 &self,
1716 asset_id: &AssetId,
1717 ) -> io::Result<Option<AssetDetail>> {
1718 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1719 id: (*asset_id).into(),
1720 });
1721 let asset_info = self.query(query).await?.asset_details.map(Into::into);
1722 Ok(asset_info)
1723 }
1724}
1725
1726#[cfg(any(test, feature = "test-helpers"))]
1727impl FuelClient {
1728 pub async fn transparent_transaction(
1729 &self,
1730 id: &TxId,
1731 ) -> io::Result<Option<types::TransactionType>> {
1732 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1733
1734 let transaction = self.query(query).await?.transaction;
1735
1736 Ok(transaction
1737 .map(|tx| {
1738 let response: TransactionResponse = tx.try_into()?;
1739 Ok::<_, ConversionError>(response.transaction)
1740 })
1741 .transpose()?)
1742 }
1743}
1744
1745#[cfg(feature = "rpc")]
1746impl FuelClient {
1747 fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
1748 self.rpc_client
1749 .clone()
1750 .ok_or(io::Error::other("RPC client not initialized"))
1751 }
1752
1753 pub async fn get_block_range(
1754 &self,
1755 start: BlockHeight,
1756 end: BlockHeight,
1757 ) -> io::Result<
1758 impl Stream<
1759 Item = io::Result<(
1760 fuel_core_types::blockchain::block::Block,
1761 Vec<Vec<Receipt>>,
1762 )>,
1763 >,
1764 > {
1765 let request = ProtoBlockRangeRequest {
1766 start: *start,
1767 end: *end,
1768 };
1769
1770 let stream = self
1771 .rpc_client()?
1772 .get_block_range(request)
1773 .await
1774 .map_err(io::Error::other)?
1775 .into_inner()
1776 .then(|res| {
1777 let maybe_aws_client = self.aws_client.clone();
1778 async move {
1779 let maybe_aws_client = maybe_aws_client.clone();
1780 let resp =
1781 res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1782 Self::convert_block_response(resp, maybe_aws_client).await
1783 }
1784 });
1785 Ok(stream)
1786 }
1787
1788 async fn convert_block_response(
1789 resp: BlockResponse,
1790 s3_client: AWSClientManager,
1791 ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
1792 let payload = resp
1793 .payload
1794 .ok_or(io::Error::other("No RPC payload for `BlockResponse`"))?;
1795 match payload {
1796 Payload::Literal(_) => {
1797 Err(io::Error::other("Literal payloads are not supported yet"))
1799 }
1800 Payload::Bytes(bytes) => {
1801 let proto_block =
1802 ProtoBlock::decode(bytes.as_slice()).map_err(io::Error::other)?;
1803 fuel_block_from_protobuf(proto_block).map_err(|e| {
1804 io::Error::other(format!(
1805 "Failed to convert RPC block to internal block: {e:?}"
1806 ))
1807 })
1808 }
1809 Payload::Remote(remote) => {
1810 let RemoteBlockResponse { location } = remote;
1811 match location {
1812 Some(Location::S3(s3)) => {
1813 let RemoteS3Bucket {
1814 bucket,
1815 key,
1816 endpoint,
1817 requester_pays,
1818 } = s3;
1819 let zipped_bytes = Self::get_block_from_s3_bucket(
1820 s3_client,
1821 &endpoint,
1822 &bucket,
1823 &key,
1824 requester_pays,
1825 )
1826 .await?;
1827
1828 let block_bytes = Self::unzip_bytes(&zipped_bytes)?;
1829 let block =
1830 ProtoBlock::decode(block_bytes.as_slice()).map_err(|e| {
1831 io::Error::other(format!("Failed to decode block: {e}"))
1832 })?;
1833 let (block, receipts) =
1834 fuel_block_from_protobuf(block).map_err(|e| {
1835 io::Error::other(format!(
1836 "Failed to convert RPC block to internal block: {e:?}"
1837 ))
1838 })?;
1839 Ok((block, receipts))
1840 }
1841 _ => Err(io::Error::other("Remote blocks are not supported yet")),
1842 }
1843 }
1844 }
1845 }
1846 async fn get_block_from_s3_bucket(
1847 s3_client: AWSClientManager,
1848 url: &Option<String>,
1849 bucket: &str,
1850 key: &str,
1851 requester_pays: bool,
1852 ) -> io::Result<prost::bytes::Bytes> {
1853 use aws_sdk_s3::types::RequestPayer;
1854 tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
1855 let mut req = s3_client
1856 .get_client(url)
1857 .await?
1858 .get_object()
1859 .bucket(bucket)
1860 .key(key);
1861 if requester_pays {
1862 req = req.request_payer(RequestPayer::Requester);
1863 }
1864 let obj = req.send().await.map_err(|e| {
1865 io::Error::other(format!("Failed to get object from S3: {e:?}"))
1866 })?;
1867 let bytes = obj
1868 .body
1869 .collect()
1870 .await
1871 .map_err(|e| {
1872 io::Error::other(format!("Failed to get object from S3: {e:?}"))
1873 })?
1874 .into_bytes();
1875 Ok(bytes)
1876 }
1877
1878 async fn new_aws_client(url: &Option<String>) -> AWSClient {
1879 let credentials = DefaultCredentialsChain::builder().build().await;
1880 let mut config_builder = aws_config::defaults(BehaviorVersion::latest())
1881 .credentials_provider(credentials);
1882 if let Some(url) = url {
1883 config_builder = config_builder.endpoint_url(url);
1884 }
1885 let sdk_config = config_builder.load().await;
1886 let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
1887 let config = builder.force_path_style(true).build();
1888 AWSClient::from_conf(config)
1889 }
1890
1891 fn unzip_bytes(bytes: &[u8]) -> io::Result<Vec<u8>> {
1892 let mut decoder = GzDecoder::new(bytes);
1893 let mut output = Vec::new();
1894 decoder.read_to_end(&mut output).map_err(io::Error::other)?;
1895 Ok(output)
1896 }
1897
1898 pub async fn get_aggregated_height(&self) -> io::Result<BlockHeight> {
1901 let request = ProtoBlockHeightRequest {};
1902 let height = self
1903 .rpc_client()?
1904 .get_synced_block_height(request)
1905 .await
1906 .map_err(io::Error::other)?
1907 .into_inner()
1908 .height
1909 .ok_or(io::Error::other("No height in RPC response"))?;
1910 Ok(BlockHeight::from(height))
1911 }
1912
1913 pub async fn new_block_subscription(
1914 &self,
1915 ) -> io::Result<
1916 impl Stream<
1917 Item = io::Result<(
1918 fuel_core_types::blockchain::block::Block,
1919 Vec<Vec<Receipt>>,
1920 )>,
1921 >,
1922 > {
1923 let request = ProtoNewBlockSubscriptionRequest {};
1924 let stream = self
1925 .rpc_client()?
1926 .new_block_subscription(request)
1927 .await
1928 .map_err(io::Error::other)?
1929 .into_inner()
1930 .then(|res| {
1931 let maybe_aws_client = self.aws_client.clone();
1932 async move {
1933 let maybe_aws_client = maybe_aws_client.clone();
1934 let resp =
1935 res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1936 Self::convert_block_response(resp, maybe_aws_client).await
1937 }
1938 });
1939 Ok(stream)
1940 }
1941}
1942
1943#[cfg(test)]
1944mod tests {
1945 use super::*;
1946
1947 #[test]
1948 fn with_urls_normalizes_urls_to_graphql_endpoint() {
1949 let urls = &["http://localhost:8080", "http://example.com:4000"];
1951
1952 let client = FuelClient::with_urls(urls).expect("should create client");
1954
1955 assert_eq!(
1957 client.get_default_url().as_str(),
1958 "http://localhost:8080/v1/graphql"
1959 );
1960 }
1961
1962 #[test]
1963 fn with_urls_adds_http_scheme_if_missing() {
1964 let urls = &["localhost:8080"];
1966
1967 let client = FuelClient::with_urls(urls).expect("should create client");
1969
1970 assert_eq!(
1972 client.get_default_url().as_str(),
1973 "http://localhost:8080/v1/graphql"
1974 );
1975 }
1976
1977 #[test]
1978 fn with_urls_overwrites_existing_path() {
1979 let urls = &["http://localhost:8080/some/path", "http://example.com/api"];
1981
1982 let client = FuelClient::with_urls(urls).expect("should create client");
1984
1985 assert_eq!(
1987 client.get_default_url().as_str(),
1988 "http://localhost:8080/v1/graphql"
1989 );
1990 }
1991
1992 #[test]
1993 fn new_and_with_urls_produce_same_url() {
1994 let url = "http://localhost:8080";
1996
1997 let client_new = FuelClient::new(url).expect("should create client via new");
1999 let client_with_urls =
2000 FuelClient::with_urls(&[url]).expect("should create client via with_urls");
2001
2002 assert_eq!(
2004 client_new.get_default_url().as_str(),
2005 client_with_urls.get_default_url().as_str()
2006 );
2007 }
2008}