1use self::schema::{
2 block::ProduceBlockArgs,
3 message::{
4 MessageProofArgs,
5 NonceArgs,
6 },
7};
8#[cfg(feature = "subscriptions")]
9use crate::client::types::StatusWithTransaction;
10use crate::{
11 client::{
12 schema::{
13 Tai64Timestamp,
14 TransactionId,
15 block::BlockByHeightArgs,
16 coins::{
17 ExcludeInput,
18 SpendQueryElementInput,
19 },
20 contract::ContractBalanceQueryArgs,
21 gas_price::EstimateGasPrice,
22 message::MessageStatusArgs,
23 relayed_tx::RelayedTransactionStatusArgs,
24 tx::{
25 DryRunArg,
26 TxWithEstimatedPredicatesArg,
27 },
28 },
29 types::{
30 RelayedTransactionStatus,
31 asset::AssetDetail,
32 gas_price::LatestGasPrice,
33 message::MessageStatus,
34 primitives::{
35 Address,
36 AssetId,
37 BlockId,
38 ContractId,
39 UtxoId,
40 },
41 upgrades::StateTransitionBytecode,
42 },
43 },
44 reqwest_ext::FuelGraphQlResponse,
45 transport::FailoverTransport,
46};
47use anyhow::{
48 Context,
49 anyhow,
50};
51#[cfg(feature = "subscriptions")]
52use cynic::SubscriptionBuilder;
53use cynic::{
54 Id,
55 MutationBuilder,
56 Operation,
57 QueryBuilder,
58 QueryFragment,
59 QueryVariables,
60};
61use fuel_core_types::{
62 blockchain::header::{
63 ConsensusParametersVersion,
64 StateTransitionBytecodeVersion,
65 },
66 fuel_asm::{
67 Instruction,
68 Word,
69 },
70 fuel_tx::{
71 BlobId,
72 Bytes32,
73 ConsensusParameters,
74 Receipt,
75 Transaction,
76 TxId,
77 },
78 fuel_types::{
79 self,
80 BlockHeight,
81 Nonce,
82 canonical::Serialize,
83 },
84 services::executor::{
85 StorageReadReplayEvent,
86 TransactionExecutionStatus,
87 },
88};
89#[cfg(feature = "subscriptions")]
90use futures::{
91 Stream,
92 StreamExt,
93};
94use itertools::Itertools;
95use pagination::{
96 PageDirection,
97 PaginatedResult,
98 PaginationRequest,
99};
100use reqwest::Url;
101use schema::{
102 Bytes,
103 ContinueTx,
104 ContinueTxArgs,
105 ConversionError,
106 HexString,
107 IdArg,
108 MemoryArgs,
109 RegisterArgs,
110 RunResult,
111 SetBreakpoint,
112 SetBreakpointArgs,
113 SetSingleStepping,
114 SetSingleSteppingArgs,
115 StartTx,
116 StartTxArgs,
117 U32,
118 U64,
119 assets::AssetInfoArg,
120 balance::BalanceArgs,
121 blob::BlobByIdArgs,
122 block::BlockByIdArgs,
123 coins::{
124 CoinByIdArgs,
125 CoinsConnectionArgs,
126 },
127 contract::{
128 ContractBalancesConnectionArgs,
129 ContractByIdArgs,
130 },
131 da_compressed::DaCompressedBlockByHeightArgs,
132 gas_price::BlockHorizonArgs,
133 storage_read_replay::{
134 StorageReadReplay,
135 StorageReadReplayArgs,
136 },
137 tx::{
138 AssembleTxArg,
139 TransactionsByOwnerConnectionArgs,
140 TxArg,
141 TxIdArgs,
142 },
143};
144#[cfg(feature = "subscriptions")]
145use std::future;
146use std::{
147 convert::TryInto,
148 io::{
149 self,
150 ErrorKind,
151 },
152 net,
153 str::{
154 self,
155 FromStr,
156 },
157 sync::{
158 Arc,
159 Mutex,
160 },
161};
162use tai64::Tai64;
163use tracing as _;
164use types::{
165 TransactionResponse,
166 TransactionStatus,
167 assemble_tx::{
168 AssembleTransactionResult,
169 RequiredBalance,
170 },
171};
172
173#[cfg(feature = "subscriptions")]
174use std::pin::Pin;
175
176#[cfg(feature = "rpc")]
177mod rpc_deps {
178 pub use aws_config::{
179 BehaviorVersion,
180 default_provider::credentials::DefaultCredentialsChain,
181 };
182 pub use aws_sdk_s3::Client as AWSClient;
183 pub use flate2::read::{
184 DeflateDecoder,
185 GzDecoder,
186 ZlibDecoder,
187 };
188 pub use fuel_core_block_aggregator_api::{
189 blocks::old_block_source::convertor_adapter::proto_to_fuel_conversions::fuel_block_from_protobuf,
190 protobuf_types::{
191 Block as ProtoBlock,
192 BlockHeightRequest as ProtoBlockHeightRequest,
193 BlockRangeRequest as ProtoBlockRangeRequest,
194 BlockResponse,
195 NewBlockSubscriptionRequest as ProtoNewBlockSubscriptionRequest,
196 RemoteBlockResponse,
197 RemoteHttpEndpoint,
198 RemoteS3Bucket,
199 block_aggregator_client::BlockAggregatorClient as ProtoBlockAggregatorClient,
200 block_response::Payload,
201 remote_block_response::Location,
202 },
203 };
204 pub use prost::Message;
205 pub use std::{
206 collections::HashMap,
207 io::{
208 Cursor,
209 Read,
210 },
211 };
212 pub use tokio::sync::RwLock;
213 pub use tonic::transport::Channel;
214}
215#[cfg(feature = "rpc")]
216use rpc_deps::*;
217
218#[cfg(feature = "rpc")]
222fn remote_block_object_http_client() -> reqwest::Client {
223 reqwest::Client::builder()
224 .no_gzip()
225 .no_brotli()
226 .no_deflate()
227 .no_zstd()
228 .build()
229 .expect("reqwest ClientBuilder for remote block fetches should succeed")
230}
231
232pub mod pagination;
233pub mod schema;
234pub mod types;
235
236type RegisterId = u32;
237
238#[derive(Debug, derive_more::Display, derive_more::From)]
239#[non_exhaustive]
240pub enum Error {
244 #[from]
246 Other(anyhow::Error),
247}
248
249#[derive(Debug)]
252pub enum ConsistencyPolicy {
253 Auto {
257 height: Arc<Mutex<Option<BlockHeight>>>,
259 },
260 Manual {
263 height: Option<BlockHeight>,
265 },
266}
267
268impl Clone for ConsistencyPolicy {
269 fn clone(&self) -> Self {
270 match self {
271 Self::Auto { height } => Self::Auto {
272 height: Arc::new(Mutex::new(height.lock().ok().and_then(|h| *h))),
275 },
276 Self::Manual { height } => Self::Manual { height: *height },
277 }
278 }
279}
280
281#[derive(Debug, Default)]
282struct ChainStateInfo {
283 current_stf_version: Arc<Mutex<Option<StateTransitionBytecodeVersion>>>,
284 current_consensus_parameters_version: Arc<Mutex<Option<ConsensusParametersVersion>>>,
285 node_version: Arc<Mutex<Option<String>>>,
287}
288
289impl Clone for ChainStateInfo {
290 fn clone(&self) -> Self {
291 Self {
292 current_stf_version: Arc::new(Mutex::new(
293 self.current_stf_version.lock().ok().and_then(|v| *v),
294 )),
295 current_consensus_parameters_version: Arc::new(Mutex::new(
296 self.current_consensus_parameters_version
297 .lock()
298 .ok()
299 .and_then(|v| *v),
300 )),
301 node_version: Arc::new(Mutex::new(
302 self.node_version.lock().ok().and_then(|v| v.clone()),
303 )),
304 }
305 }
306}
307
308#[derive(Debug, Clone)]
309pub struct FuelClient {
310 transport: FailoverTransport,
311 require_height: ConsistencyPolicy,
312 chain_state_info: ChainStateInfo,
313 #[cfg(feature = "rpc")]
314 rpc_client: Option<ProtoBlockAggregatorClient<Channel>>,
315 #[cfg(feature = "rpc")]
316 aws_client: AWSClientManager,
317 #[cfg(feature = "rpc")]
319 http_client: reqwest::Client,
320}
321
322#[cfg(feature = "rpc")]
323#[derive(Debug, Clone)]
324pub struct AWSClientManager {
325 specific: Arc<RwLock<HashMap<Option<String>, AWSClient>>>,
326}
327
328#[cfg(feature = "rpc")]
329impl AWSClientManager {
330 pub fn new() -> Self {
331 Self {
332 specific: Arc::new(RwLock::new(HashMap::new())),
333 }
334 }
335
336 pub async fn get_client(&self, url: &Option<String>) -> Result<AWSClient, io::Error> {
337 if let Some(existing) = self.specific.read().await.get(url).cloned() {
338 return Ok(existing);
339 }
340 let client = FuelClient::new_aws_client(url).await;
341 let mut guard = self.specific.write().await;
342 let client = guard
343 .entry(url.clone())
344 .or_insert_with(|| client.clone())
345 .clone();
346 Ok(client)
347 }
348}
349
350#[cfg(feature = "rpc")]
351impl Default for AWSClientManager {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357fn normalize_url(url_str: &str) -> anyhow::Result<Url> {
359 let mut raw_url = url_str.to_string();
360 if !raw_url.starts_with("http") {
361 raw_url = format!("http://{raw_url}");
362 }
363
364 let mut url = reqwest::Url::parse(&raw_url)
365 .map_err(anyhow::Error::msg)
366 .with_context(|| format!("Invalid fuel-core URL: {url_str}"))?;
367 url.set_path("/v1/graphql");
368
369 Ok(url)
370}
371
372impl FromStr for FuelClient {
373 type Err = anyhow::Error;
374
375 fn from_str(str: &str) -> Result<Self, Self::Err> {
376 let url = normalize_url(str)?;
377
378 Ok(Self {
379 transport: FailoverTransport::new(vec![url])?,
380 require_height: ConsistencyPolicy::Auto {
381 height: Arc::new(Mutex::new(None)),
382 },
383 chain_state_info: Default::default(),
384 #[cfg(feature = "rpc")]
385 rpc_client: None,
386 #[cfg(feature = "rpc")]
387 aws_client: AWSClientManager::new(),
388 #[cfg(feature = "rpc")]
389 http_client: remote_block_object_http_client(),
390 })
391 }
392}
393
394impl<S> From<S> for FuelClient
395where
396 S: Into<net::SocketAddr>,
397{
398 fn from(socket: S) -> Self {
399 format!("http://{}", socket.into())
400 .as_str()
401 .parse()
402 .unwrap()
403 }
404}
405
406fn is_legacy_node(version: &str) -> bool {
410 let mut parts = version.splitn(3, '.');
411 let major: u64 = match parts.next().and_then(|s| s.parse().ok()) {
412 Some(v) => v,
413 None => return false,
414 };
415 let minor: u64 = match parts.next().and_then(|s| s.parse().ok()) {
416 Some(v) => v,
417 None => return false,
418 };
419 major == 0 && minor < 48
420}
421
422pub fn from_strings_errors_to_std_error(errors: Vec<String>) -> io::Error {
423 let e = errors
424 .into_iter()
425 .fold(String::from("Response errors"), |mut s, e| {
426 s.push_str("; ");
427 s.push_str(e.as_str());
428 s
429 });
430 io::Error::other(e)
431}
432
433impl FuelClient {
434 pub fn new(url: impl AsRef<str>) -> anyhow::Result<Self> {
435 Self::from_str(url.as_ref())
436 }
437
438 #[cfg(feature = "rpc")]
439 pub async fn new_with_rpc<G: AsRef<str>, R: AsRef<str>>(
440 graph_ql_urls: impl Iterator<Item = G>,
441 rpc_url: R,
442 ) -> anyhow::Result<Self> {
443 let urls: Vec<_> = graph_ql_urls
444 .map(|str| normalize_url(str.as_ref()))
445 .try_collect()?;
446 let mut client = Self::with_urls(&urls)?;
447 let mut raw_rpc_url = <R as AsRef<str>>::as_ref(&rpc_url).to_string();
448 if !raw_rpc_url.starts_with("http") {
449 raw_rpc_url = format!("http://{raw_rpc_url}");
450 }
451 let rpc_client = ProtoBlockAggregatorClient::connect(raw_rpc_url).await?;
452 client.rpc_client = Some(rpc_client);
453 client.aws_client = AWSClientManager::new();
454 Ok(client)
455 }
456
457 pub fn with_urls(urls: &[impl AsRef<str>]) -> anyhow::Result<Self> {
458 if urls.is_empty() {
459 return Err(anyhow!("Failed to create FuelClient. No URL is provided."));
460 }
461 let urls = urls
462 .iter()
463 .map(|url| normalize_url(url.as_ref()))
464 .collect::<Result<Vec<_>, _>>()?;
465 Ok(Self {
466 transport: FailoverTransport::new(urls)?,
467 require_height: ConsistencyPolicy::Auto {
468 height: Arc::new(Mutex::new(None)),
469 },
470 chain_state_info: Default::default(),
471 #[cfg(feature = "rpc")]
472 rpc_client: None,
473 #[cfg(feature = "rpc")]
474 aws_client: AWSClientManager::new(),
475 #[cfg(feature = "rpc")]
476 http_client: remote_block_object_http_client(),
477 })
478 }
479
480 pub fn get_default_url(&self) -> &Url {
481 self.transport.get_default_url()
482 }
483}
484
485impl FuelClient {
486 pub fn with_required_fuel_block_height(
487 &mut self,
488 new_height: Option<BlockHeight>,
489 ) -> &mut Self {
490 match &mut self.require_height {
491 ConsistencyPolicy::Auto { height } => {
492 *height.lock().expect("Mutex poisoned") = new_height;
493 }
494 ConsistencyPolicy::Manual { height } => {
495 *height = new_height;
496 }
497 }
498 self
499 }
500
501 pub fn use_manual_consistency_policy(
502 &mut self,
503 height: Option<BlockHeight>,
504 ) -> &mut Self {
505 self.require_height = ConsistencyPolicy::Manual { height };
506 self
507 }
508
509 pub fn decode_response<R, E>(
510 &self,
511 response: FuelGraphQlResponse<R, E>,
512 ) -> io::Result<R>
513 where
514 R: serde::de::DeserializeOwned + 'static,
515 {
516 if response
517 .extensions
518 .as_ref()
519 .and_then(|e| e.fuel_block_height_precondition_failed)
520 == Some(true)
521 {
522 return Err(io::Error::other("The required block height was not met"));
523 }
524
525 let response = response.response;
526
527 match (response.data, response.errors) {
528 (Some(d), _) => Ok(d),
529 (_, Some(e)) => Err(from_strings_errors_to_std_error(
530 e.into_iter().map(|e| e.message).collect(),
531 )),
532 _ => Err(io::Error::other("Invalid response")),
533 }
534 }
535
536 pub fn required_block_height(&self) -> Option<BlockHeight> {
537 match &self.require_height {
538 ConsistencyPolicy::Auto { height } => height.lock().ok().and_then(|h| *h),
539 ConsistencyPolicy::Manual { height } => *height,
540 }
541 }
542
543 fn update_chain_state_info<R, E>(&self, response: &FuelGraphQlResponse<R, E>) {
544 if let Some(current_sft_version) = response
545 .extensions
546 .as_ref()
547 .and_then(|e| e.current_stf_version)
548 && let Ok(mut c) = self.chain_state_info.current_stf_version.lock()
549 {
550 *c = Some(current_sft_version);
551 }
552
553 if let Some(current_consensus_parameters_version) = response
554 .extensions
555 .as_ref()
556 .and_then(|e| e.current_consensus_parameters_version)
557 && let Ok(mut c) = self
558 .chain_state_info
559 .current_consensus_parameters_version
560 .lock()
561 {
562 *c = Some(current_consensus_parameters_version);
563 }
564
565 let inner_required_height = match &self.require_height {
566 ConsistencyPolicy::Auto { height } => Some(height.clone()),
567 ConsistencyPolicy::Manual { .. } => None,
568 };
569
570 if let Some(inner_required_height) = inner_required_height
571 && let Some(current_fuel_block_height) = response
572 .extensions
573 .as_ref()
574 .and_then(|e| e.current_fuel_block_height)
575 {
576 let mut lock = inner_required_height.lock().expect("Mutex poisoned");
577
578 if current_fuel_block_height >= lock.unwrap_or_default() {
579 *lock = Some(current_fuel_block_height);
580 }
581 }
582 }
583
584 async fn ensure_node_version(&self) -> io::Result<String> {
587 {
588 let lock = self
589 .chain_state_info
590 .node_version
591 .lock()
592 .map_err(|_| io::Error::other("Mutex poisoned"))?;
593 if let Some(v) = lock.as_ref() {
594 return Ok(v.clone());
595 }
596 }
597 let query = schema::node_info::QueryNodeVersionInfo::build(());
598 let version = self.query(query).await?.node_info.node_version;
599 if let Ok(mut lock) = self.chain_state_info.node_version.lock() {
600 *lock = Some(version.clone());
601 }
602 Ok(version)
603 }
604
605 pub async fn query<ResponseData, Vars>(
607 &self,
608 q: Operation<ResponseData, Vars>,
609 ) -> io::Result<ResponseData>
610 where
611 Vars: serde::Serialize + Clone + QueryVariables + Send + 'static,
612 ResponseData: serde::de::DeserializeOwned + QueryFragment + Send + 'static,
613 {
614 let required_fuel_block_height = self.required_block_height();
615 let response = self.transport.query(q, required_fuel_block_height).await?;
616
617 self.update_chain_state_info(&response);
618 self.decode_response(response)
619 }
620
621 #[tracing::instrument(skip_all)]
622 #[cfg(feature = "subscriptions")]
623 async fn subscribe<ResponseData, Variables>(
624 &self,
625 variables: Variables,
626 ) -> io::Result<Pin<Box<impl futures::Stream<Item = io::Result<ResponseData>> + '_>>>
627 where
628 Variables: serde::Serialize + QueryVariables + Send + Clone + 'static,
629 ResponseData: serde::de::DeserializeOwned
630 + QueryFragment
631 + SubscriptionBuilder<Variables>
632 + 'static
633 + Send,
634 {
635 let stream = self
636 .transport
637 .subscribe(variables, self.required_block_height())
638 .await?;
639
640 let client = self; Ok(Box::pin(stream.filter_map(move |result| {
642 async move {
643 match result {
644 Ok(resp) => {
645 client.update_chain_state_info(&resp);
646 Some(client.decode_response(resp))
647 }
648 Err(e) => Some(Err(e)), }
650 }
651 })))
652 }
653
654 pub fn latest_stf_version(&self) -> Option<StateTransitionBytecodeVersion> {
655 self.chain_state_info
656 .current_stf_version
657 .lock()
658 .ok()
659 .and_then(|value| *value)
660 }
661
662 pub fn latest_consensus_parameters_version(
663 &self,
664 ) -> Option<ConsensusParametersVersion> {
665 self.chain_state_info
666 .current_consensus_parameters_version
667 .lock()
668 .ok()
669 .and_then(|value| *value)
670 }
671
672 pub async fn health(&self) -> io::Result<bool> {
673 let query = schema::Health::build(());
674 self.query(query).await.map(|r| r.health)
675 }
676
677 pub async fn node_info(&self) -> io::Result<types::NodeInfo> {
678 let query = schema::node_info::QueryNodeInfo::build(());
679 self.query(query).await.map(|r| r.node_info.into())
680 }
681
682 pub async fn latest_gas_price(&self) -> io::Result<LatestGasPrice> {
683 let query = schema::gas_price::QueryLatestGasPrice::build(());
684 self.query(query).await.map(|r| r.latest_gas_price.into())
685 }
686
687 pub async fn estimate_gas_price(
688 &self,
689 block_horizon: u32,
690 ) -> io::Result<EstimateGasPrice> {
691 let args = BlockHorizonArgs {
692 block_horizon: Some(block_horizon.into()),
693 };
694 let query = schema::gas_price::QueryEstimateGasPrice::build(args);
695 self.query(query).await.map(|r| r.estimate_gas_price)
696 }
697
698 #[cfg(feature = "std")]
699 pub async fn connected_peers_info(
700 &self,
701 ) -> io::Result<Vec<fuel_core_types::services::p2p::PeerInfo>> {
702 let query = schema::node_info::QueryPeersInfo::build(());
703 self.query(query)
704 .await
705 .map(|r| r.node_info.peers.into_iter().map(Into::into).collect())
706 }
707
708 pub async fn chain_info(&self) -> io::Result<types::ChainInfo> {
709 let node_version = self.ensure_node_version().await?;
710 if is_legacy_node(&node_version) {
711 let query = schema::chain::ChainQueryLegacy::build(());
712 self.query(query).await.and_then(|r| {
713 let result = r.chain.try_into()?;
714 Ok(result)
715 })
716 } else {
717 let query = schema::chain::ChainQuery::build(());
718 self.query(query).await.and_then(|r| {
719 let result = r.chain.try_into()?;
720 Ok(result)
721 })
722 }
723 }
724
725 pub async fn consensus_parameters(
726 &self,
727 version: i32,
728 ) -> io::Result<Option<ConsensusParameters>> {
729 let node_version = self.ensure_node_version().await?;
730 let args = schema::upgrades::ConsensusParametersByVersionArgs { version };
731
732 if is_legacy_node(&node_version) {
733 let query =
734 schema::upgrades::ConsensusParametersByVersionQueryLegacy::build(args);
735 let result = self
736 .query(query)
737 .await?
738 .consensus_parameters
739 .map(TryInto::try_into)
740 .transpose()?;
741 Ok(result)
742 } else {
743 let query = schema::upgrades::ConsensusParametersByVersionQuery::build(args);
744 let result = self
745 .query(query)
746 .await?
747 .consensus_parameters
748 .map(TryInto::try_into)
749 .transpose()?;
750 Ok(result)
751 }
752 }
753
754 pub async fn state_transition_byte_code_by_version(
755 &self,
756 version: i32,
757 ) -> io::Result<Option<StateTransitionBytecode>> {
758 let args = schema::upgrades::StateTransitionBytecodeByVersionArgs { version };
759 let query = schema::upgrades::StateTransitionBytecodeByVersionQuery::build(args);
760
761 let result = self
762 .query(query)
763 .await?
764 .state_transition_bytecode_by_version
765 .map(TryInto::try_into)
766 .transpose()?;
767
768 Ok(result)
769 }
770
771 pub async fn state_transition_byte_code_by_root(
772 &self,
773 root: Bytes32,
774 ) -> io::Result<Option<StateTransitionBytecode>> {
775 let args = schema::upgrades::StateTransitionBytecodeByRootArgs {
776 root: HexString(Bytes(root.to_vec())),
777 };
778 let query = schema::upgrades::StateTransitionBytecodeByRootQuery::build(args);
779
780 let result = self
781 .query(query)
782 .await?
783 .state_transition_bytecode_by_root
784 .map(TryInto::try_into)
785 .transpose()?;
786
787 Ok(result)
788 }
789
790 pub async fn dry_run(
792 &self,
793 txs: &[Transaction],
794 ) -> io::Result<Vec<TransactionExecutionStatus>> {
795 self.dry_run_opt(txs, None, None, None).await
796 }
797
798 pub async fn dry_run_opt(
800 &self,
801 txs: &[Transaction],
802 utxo_validation: Option<bool>,
804 gas_price: Option<u64>,
805 at_height: Option<BlockHeight>,
806 ) -> io::Result<Vec<TransactionExecutionStatus>> {
807 let txs = txs
808 .iter()
809 .map(|tx| HexString(Bytes(tx.to_bytes())))
810 .collect::<Vec<HexString>>();
811 let query: Operation<schema::tx::DryRun, DryRunArg> =
812 schema::tx::DryRun::build(DryRunArg {
813 txs,
814 utxo_validation,
815 gas_price: gas_price.map(|gp| gp.into()),
816 block_height: at_height.map(|bh| bh.into()),
817 });
818 let tx_statuses = self.query(query).await.map(|r| r.dry_run)?;
819 tx_statuses
820 .into_iter()
821 .map(|tx_status| tx_status.try_into().map_err(Into::into))
822 .collect()
823 }
824
825 pub async fn dry_run_opt_record_storage_reads(
827 &self,
828 txs: &[Transaction],
829 utxo_validation: Option<bool>,
831 gas_price: Option<u64>,
832 at_height: Option<BlockHeight>,
833 ) -> io::Result<(Vec<TransactionExecutionStatus>, Vec<StorageReadReplayEvent>)> {
834 let txs = txs
835 .iter()
836 .map(|tx| HexString(Bytes(tx.to_bytes())))
837 .collect::<Vec<HexString>>();
838 let query: Operation<schema::tx::DryRunRecordStorageReads, DryRunArg> =
839 schema::tx::DryRunRecordStorageReads::build(DryRunArg {
840 txs,
841 utxo_validation,
842 gas_price: gas_price.map(|gp| gp.into()),
843 block_height: at_height.map(|bh| bh.into()),
844 });
845 let result = self
846 .query(query)
847 .await
848 .map(|r| r.dry_run_record_storage_reads)?;
849 let tx_statuses = result
850 .tx_statuses
851 .into_iter()
852 .map(|tx_status| tx_status.try_into().map_err(Into::into))
853 .collect::<io::Result<Vec<_>>>()?;
854 let storage_reads = result
855 .storage_reads
856 .into_iter()
857 .map(Into::into)
858 .collect::<Vec<_>>();
859 Ok((tx_statuses, storage_reads))
860 }
861
862 pub async fn storage_read_replay(
864 &self,
865 height: &BlockHeight,
866 ) -> io::Result<Vec<StorageReadReplayEvent>> {
867 let query: Operation<StorageReadReplay, StorageReadReplayArgs> =
868 StorageReadReplay::build(StorageReadReplayArgs {
869 height: (*height).into(),
870 });
871 Ok(self
872 .query(query)
873 .await
874 .map(|r| r.storage_read_replay)?
875 .into_iter()
876 .map(Into::into)
877 .collect())
878 }
879
880 #[allow(clippy::too_many_arguments)]
903 pub async fn assemble_tx(
904 &self,
905 tx: &Transaction,
906 block_horizon: u32,
907 required_balances: Vec<RequiredBalance>,
908 fee_address_index: u16,
909 exclude: Option<(Vec<UtxoId>, Vec<Nonce>)>,
910 estimate_predicates: bool,
911 reserve_gas: Option<u64>,
912 ) -> io::Result<AssembleTransactionResult> {
913 let tx = HexString(Bytes(tx.to_bytes()));
914 let block_horizon = block_horizon.into();
915
916 let required_balances: Vec<_> = required_balances
917 .into_iter()
918 .map(schema::tx::RequiredBalance::try_from)
919 .collect::<Result<Vec<_>, _>>()?;
920
921 let fee_address_index = fee_address_index.into();
922
923 let exclude_input = exclude.map(Into::into);
924
925 let reserve_gas = reserve_gas.map(U64::from);
926
927 let query_arg = AssembleTxArg {
928 tx,
929 block_horizon,
930 required_balances,
931 fee_address_index,
932 exclude_input,
933 estimate_predicates,
934 reserve_gas,
935 };
936
937 let query = schema::tx::AssembleTx::build(query_arg);
938 let assemble_tx_result = self.query(query).await.map(|r| r.assemble_tx)?;
939 Ok(assemble_tx_result.try_into()?)
940 }
941
942 pub async fn estimate_predicates(&self, tx: &mut Transaction) -> io::Result<()> {
944 let serialized_tx = tx.to_bytes();
945 let query = schema::tx::EstimatePredicates::build(TxArg {
946 tx: HexString(Bytes(serialized_tx)),
947 });
948 let tx_with_predicate = self.query(query).await.map(|r| r.estimate_predicates)?;
949 let tx_with_predicate: Transaction = tx_with_predicate.try_into()?;
950 *tx = tx_with_predicate;
951 Ok(())
952 }
953
954 pub async fn submit(
955 &self,
956 tx: &Transaction,
957 ) -> io::Result<types::primitives::TransactionId> {
958 self.submit_opt(tx, None).await
959 }
960
961 pub async fn submit_opt(
962 &self,
963 tx: &Transaction,
964 estimate_predicates: Option<bool>,
965 ) -> io::Result<types::primitives::TransactionId> {
966 let tx = tx.clone().to_bytes();
967 let query = schema::tx::Submit::build(TxWithEstimatedPredicatesArg {
968 tx: HexString(Bytes(tx)),
969 estimate_predicates,
970 });
971
972 let id = self.query(query).await.map(|r| r.submit)?.id.into();
973 Ok(id)
974 }
975
976 #[cfg(feature = "subscriptions")]
978 pub async fn submit_and_await_commit(
979 &self,
980 tx: &Transaction,
981 ) -> io::Result<TransactionStatus> {
982 self.submit_and_await_commit_opt(tx, None).await
983 }
984
985 #[cfg(feature = "subscriptions")]
994 pub async fn submit_and_await_commit_opt(
995 &self,
996 tx: &Transaction,
997 estimate_predicates: Option<bool>,
998 ) -> io::Result<TransactionStatus> {
999 let tx = tx.clone().to_bytes();
1000 let variables = TxWithEstimatedPredicatesArg {
1001 tx: HexString(Bytes(tx)),
1002 estimate_predicates,
1003 };
1004
1005 let mut stream = self.subscribe(variables).await?.map(
1006 |r: io::Result<schema::tx::SubmitAndAwaitSubscription>| {
1007 let status: TransactionStatus = r?.submit_and_await.try_into()?;
1008 Result::<_, io::Error>::Ok(status)
1009 },
1010 );
1011
1012 let status = stream.next().await.ok_or_else(|| {
1013 io::Error::other("Failed to get status from the submission")
1014 })??;
1015
1016 Ok(status)
1017 }
1018
1019 #[cfg(feature = "subscriptions")]
1021 pub async fn submit_and_await_commit_with_tx(
1022 &self,
1023 tx: &Transaction,
1024 ) -> io::Result<StatusWithTransaction> {
1025 self.submit_and_await_commit_with_tx_opt(tx, None).await
1026 }
1027
1028 #[cfg(feature = "subscriptions")]
1030 pub async fn submit_and_await_commit_with_tx_opt(
1031 &self,
1032 tx: &Transaction,
1033 estimate_predicates: Option<bool>,
1034 ) -> io::Result<StatusWithTransaction> {
1035 let tx = tx.clone().to_bytes();
1036 let variables = TxWithEstimatedPredicatesArg {
1037 tx: HexString(Bytes(tx)),
1038 estimate_predicates,
1039 };
1040
1041 let mut stream = self.subscribe(variables).await?.map(
1042 |r: io::Result<schema::tx::SubmitAndAwaitSubscriptionWithTransaction>| {
1043 let status: StatusWithTransaction = r?.submit_and_await.try_into()?;
1044 Result::<_, io::Error>::Ok(status)
1045 },
1046 );
1047
1048 let status = stream.next().await.ok_or_else(|| {
1049 io::Error::other("Failed to get status from the submission")
1050 })??;
1051
1052 Ok(status)
1053 }
1054
1055 #[cfg(feature = "subscriptions")]
1057 pub async fn submit_and_await_status(
1058 &self,
1059 tx: &Transaction,
1060 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1061 self.submit_and_await_status_opt(tx, None, None).await
1062 }
1063
1064 #[cfg(feature = "subscriptions")]
1066 pub async fn submit_and_await_status_opt(
1067 &self,
1068 tx: &Transaction,
1069 estimate_predicates: Option<bool>,
1070 include_preconfirmation: Option<bool>,
1071 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1072 use schema::tx::SubmitAndAwaitStatusArg;
1073 let tx = tx.clone().to_bytes();
1074 let variables = SubmitAndAwaitStatusArg {
1075 tx: HexString(Bytes(tx)),
1076 estimate_predicates,
1077 include_preconfirmation,
1078 };
1079
1080 let stream = self.subscribe(variables).await?.map(
1081 |r: io::Result<schema::tx::SubmitAndAwaitStatusSubscription>| {
1082 let status: TransactionStatus = r?.submit_and_await_status.try_into()?;
1083 Result::<_, io::Error>::Ok(status)
1084 },
1085 );
1086
1087 Ok(stream)
1088 }
1089
1090 #[cfg(feature = "subscriptions")]
1092 pub async fn contract_storage_slots(
1093 &self,
1094 contract_id: &ContractId,
1095 ) -> io::Result<impl Stream<Item = io::Result<(Bytes32, Vec<u8>)>> + '_> {
1096 use schema::storage::ContractStorageSlotsArgs;
1097 let variables = ContractStorageSlotsArgs {
1098 contract_id: (*contract_id).into(),
1099 };
1100
1101 let stream = self.subscribe(variables).await?.map(
1102 |result: io::Result<schema::storage::ContractStorageSlots>| {
1103 let result: (Bytes32, Vec<u8>) = result?.contract_storage_slots.into();
1104 Result::<_, io::Error>::Ok(result)
1105 },
1106 );
1107
1108 Ok(stream)
1109 }
1110
1111 #[cfg(feature = "subscriptions")]
1113 pub async fn contract_storage_balances(
1114 &self,
1115 contract_id: &ContractId,
1116 ) -> io::Result<impl Stream<Item = io::Result<schema::contract::ContractBalance>> + '_>
1117 {
1118 use schema::{
1119 contract::ContractBalance,
1120 storage::ContractStorageBalancesArgs,
1121 };
1122 let variables = ContractStorageBalancesArgs {
1123 contract_id: (*contract_id).into(),
1124 };
1125
1126 let stream = self.subscribe(variables).await?.map(
1127 |result: io::Result<schema::storage::ContractStorageBalances>| {
1128 let result: ContractBalance = result?.contract_storage_balances;
1129 Result::<_, io::Error>::Ok(result)
1130 },
1131 );
1132
1133 Ok(stream)
1134 }
1135
1136 #[cfg(feature = "subscriptions")]
1138 pub async fn new_blocks_subscription(
1139 &self,
1140 ) -> io::Result<
1141 impl Stream<
1142 Item = io::Result<fuel_core_types::services::block_importer::ImportResult>,
1143 > + '_,
1144 > {
1145 let stream = self.subscribe(()).await?.map(
1146 |r: io::Result<schema::block::NewBlocksSubscription>| {
1147 let result: fuel_core_types::services::block_importer::ImportResult =
1148 postcard::from_bytes(r?.new_blocks.0.0.as_slice()).map_err(|e| {
1149 io::Error::other(format!(
1150 "Failed to deserialize ImportResult: {e:?}"
1151 ))
1152 })?;
1153 Result::<_, io::Error>::Ok(result)
1154 },
1155 );
1156
1157 Ok(stream)
1158 }
1159
1160 #[cfg(feature = "subscriptions")]
1162 pub async fn preconfirmations_subscription(
1163 &self,
1164 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1165 let stream = self.subscribe(()).await?.map(
1166 |r: io::Result<schema::tx::PreconfirmationsSubscription>| {
1167 let status: TransactionStatus = r?.preconfirmations.try_into()?;
1168 Result::<_, io::Error>::Ok(status)
1169 },
1170 );
1171
1172 Ok(stream)
1173 }
1174
1175 pub async fn contract_slots_values(
1176 &self,
1177 contract_id: &ContractId,
1178 block_height: Option<BlockHeight>,
1179 requested_storage_slots: Vec<Bytes32>,
1180 ) -> io::Result<Vec<(Bytes32, Vec<u8>)>> {
1181 let query = schema::storage::ContractSlotValues::build(
1182 schema::storage::ContractSlotValuesArgs {
1183 contract_id: (*contract_id).into(),
1184 block_height: block_height.map(|b| (*b).into()),
1185 storage_slots: requested_storage_slots
1186 .into_iter()
1187 .map(Into::into)
1188 .collect(),
1189 },
1190 );
1191
1192 self.query(query)
1193 .await
1194 .map(|r| r.contract_slot_values.into_iter().map(Into::into).collect())
1195 }
1196
1197 pub async fn contract_balance_values(
1198 &self,
1199 contract_id: &ContractId,
1200 block_height: Option<BlockHeight>,
1201 requested_storage_slots: Vec<AssetId>,
1202 ) -> io::Result<Vec<schema::contract::ContractBalance>> {
1203 let query = schema::storage::ContractBalanceValues::build(
1204 schema::storage::ContractBalanceValuesArgs {
1205 contract_id: (*contract_id).into(),
1206 block_height: block_height.map(|b| (*b).into()),
1207 assets: requested_storage_slots
1208 .into_iter()
1209 .map(Into::into)
1210 .collect(),
1211 },
1212 );
1213
1214 self.query(query)
1215 .await
1216 .map(|r| r.contract_balance_values.into_iter().collect())
1217 }
1218
1219 pub async fn start_session(&self) -> io::Result<String> {
1220 let query = schema::StartSession::build(());
1221
1222 self.query(query)
1223 .await
1224 .map(|r| r.start_session.into_inner())
1225 }
1226
1227 pub async fn end_session(&self, id: &str) -> io::Result<bool> {
1228 let query = schema::EndSession::build(IdArg { id: id.into() });
1229
1230 self.query(query).await.map(|r| r.end_session)
1231 }
1232
1233 pub async fn reset(&self, id: &str) -> io::Result<bool> {
1234 let query = schema::Reset::build(IdArg { id: id.into() });
1235
1236 self.query(query).await.map(|r| r.reset)
1237 }
1238
1239 pub async fn execute(&self, id: &str, op: &Instruction) -> io::Result<bool> {
1240 let op = serde_json::to_string(op)?;
1241 let query = schema::Execute::build(schema::ExecuteArgs { id: id.into(), op });
1242
1243 self.query(query).await.map(|r| r.execute)
1244 }
1245
1246 pub async fn register(&self, id: &str, register: RegisterId) -> io::Result<Word> {
1247 let query = schema::Register::build(RegisterArgs {
1248 id: id.into(),
1249 register: register.into(),
1250 });
1251
1252 Ok(self.query(query).await?.register.0 as Word)
1253 }
1254
1255 pub async fn memory(&self, id: &str, start: u32, size: u32) -> io::Result<Vec<u8>> {
1256 let query = schema::Memory::build(MemoryArgs {
1257 id: id.into(),
1258 start: start.into(),
1259 size: size.into(),
1260 });
1261
1262 let memory = self.query(query).await?.memory;
1263
1264 Ok(serde_json::from_str(memory.as_str())?)
1265 }
1266
1267 pub async fn set_breakpoint(
1268 &self,
1269 session_id: &str,
1270 contract: fuel_types::ContractId,
1271 pc: u64,
1272 ) -> io::Result<()> {
1273 let operation = SetBreakpoint::build(SetBreakpointArgs {
1274 id: Id::new(session_id),
1275 bp: schema::Breakpoint {
1276 contract: contract.into(),
1277 pc: U64(pc),
1278 },
1279 });
1280
1281 let response = self.query(operation).await?;
1282 assert!(
1283 response.set_breakpoint,
1284 "Setting breakpoint returned invalid reply"
1285 );
1286 Ok(())
1287 }
1288
1289 pub async fn set_single_stepping(
1290 &self,
1291 session_id: &str,
1292 enable: bool,
1293 ) -> io::Result<()> {
1294 let operation = SetSingleStepping::build(SetSingleSteppingArgs {
1295 id: Id::new(session_id),
1296 enable,
1297 });
1298 self.query(operation).await?;
1299 Ok(())
1300 }
1301
1302 pub async fn start_tx(
1303 &self,
1304 session_id: &str,
1305 tx: &Transaction,
1306 ) -> io::Result<RunResult> {
1307 let operation = StartTx::build(StartTxArgs {
1308 id: Id::new(session_id),
1309 tx: serde_json::to_string(tx).expect("Couldn't serialize tx to json"),
1310 });
1311 let response = self.query(operation).await?.start_tx;
1312 Ok(response)
1313 }
1314
1315 pub async fn continue_tx(&self, session_id: &str) -> io::Result<RunResult> {
1316 let operation = ContinueTx::build(ContinueTxArgs {
1317 id: Id::new(session_id),
1318 });
1319 let response = self.query(operation).await?.continue_tx;
1320 Ok(response)
1321 }
1322
1323 pub async fn transaction(
1324 &self,
1325 id: &TxId,
1326 ) -> io::Result<Option<TransactionResponse>> {
1327 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1328
1329 let transaction = self.query(query).await?.transaction;
1330
1331 Ok(transaction.map(|tx| tx.try_into()).transpose()?)
1332 }
1333
1334 pub async fn transaction_status(&self, id: &TxId) -> io::Result<TransactionStatus> {
1336 let query =
1337 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1338
1339 let status = self.query(query).await?.transaction.ok_or_else(|| {
1340 io::Error::new(
1341 ErrorKind::NotFound,
1342 format!("status not found for transaction {id} "),
1343 )
1344 })?;
1345
1346 let status = status
1347 .status
1348 .ok_or_else(|| {
1349 io::Error::new(
1350 ErrorKind::NotFound,
1351 format!("status not found for transaction {id}"),
1352 )
1353 })?
1354 .try_into()?;
1355 Ok(status)
1356 }
1357
1358 #[tracing::instrument(skip(self), level = "debug")]
1359 #[cfg(feature = "subscriptions")]
1360 pub async fn subscribe_transaction_status(
1362 &self,
1363 id: &TxId,
1364 ) -> io::Result<impl futures::Stream<Item = io::Result<TransactionStatus>> + '_> {
1365 self.subscribe_transaction_status_opt(id, None).await
1366 }
1367
1368 #[cfg(feature = "subscriptions")]
1369 pub async fn subscribe_transaction_status_opt(
1371 &self,
1372 id: &TxId,
1373 include_preconfirmation: Option<bool>,
1374 ) -> io::Result<impl Stream<Item = io::Result<TransactionStatus>> + '_> {
1375 use schema::tx::{
1376 StatusChangeSubscription,
1377 StatusChangeSubscriptionArgs,
1378 };
1379 let tx_id: TransactionId = (*id).into();
1380 let variables = StatusChangeSubscriptionArgs {
1381 id: tx_id,
1382 include_preconfirmation,
1383 };
1384
1385 tracing::debug!("subscribing");
1386 let stream = self
1387 .subscribe::<StatusChangeSubscription, StatusChangeSubscriptionArgs>(
1388 variables,
1389 )
1390 .await?
1391 .map(|tx| {
1392 tracing::debug!("received {tx:?}");
1393 let tx = tx?;
1394 let status = tx.status_change.try_into()?;
1395 Ok(status)
1396 });
1397
1398 Ok(stream)
1399 }
1400
1401 #[cfg(feature = "subscriptions")]
1402 pub async fn await_transaction_commit(
1407 &self,
1408 id: &TxId,
1409 ) -> io::Result<TransactionStatus> {
1410 let status_result = self
1413 .subscribe_transaction_status(id)
1414 .await?
1415 .skip_while(|status| {
1416 future::ready(status.as_ref().map_or(true, |status| !status.is_final()))
1417 })
1418 .next()
1419 .await;
1420
1421 if let Some(Ok(status)) = status_result {
1422 Ok(status)
1423 } else {
1424 Err(io::Error::other(format!(
1425 "Failed to get status for transaction {status_result:?}"
1426 )))
1427 }
1428 }
1429
1430 pub async fn transactions(
1432 &self,
1433 request: PaginationRequest<String>,
1434 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1435 let args = schema::ConnectionArgs::from(request);
1436 let query = schema::tx::TransactionsQuery::build(args);
1437 let transactions = self.query(query).await?.transactions.try_into()?;
1438 Ok(transactions)
1439 }
1440
1441 pub async fn transactions_by_owner(
1443 &self,
1444 owner: &Address,
1445 request: PaginationRequest<String>,
1446 ) -> io::Result<PaginatedResult<TransactionResponse, String>> {
1447 let owner: schema::Address = (*owner).into();
1448 let args = TransactionsByOwnerConnectionArgs::from((owner, request));
1449 let query = schema::tx::TransactionsByOwnerQuery::build(args);
1450
1451 let transactions = self.query(query).await?.transactions_by_owner.try_into()?;
1452 Ok(transactions)
1453 }
1454
1455 pub async fn receipts(&self, id: &TxId) -> io::Result<Option<Vec<Receipt>>> {
1456 let query =
1457 schema::tx::TransactionStatusQuery::build(TxIdArgs { id: (*id).into() });
1458
1459 let tx = self.query(query).await?.transaction.ok_or_else(|| {
1460 io::Error::new(ErrorKind::NotFound, format!("transaction {id} not found"))
1461 })?;
1462
1463 let receipts = match tx.status {
1464 Some(status) => match status {
1465 schema::tx::TransactionStatus::SuccessStatus(s) => Some(
1466 s.receipts
1467 .into_iter()
1468 .map(TryInto::<Receipt>::try_into)
1469 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1470 )
1471 .transpose()?,
1472 schema::tx::TransactionStatus::FailureStatus(s) => Some(
1473 s.receipts
1474 .into_iter()
1475 .map(TryInto::<Receipt>::try_into)
1476 .collect::<Result<Vec<Receipt>, ConversionError>>(),
1477 )
1478 .transpose()?,
1479 _ => None,
1480 },
1481 _ => None,
1482 };
1483
1484 Ok(receipts)
1485 }
1486
1487 #[cfg(feature = "test-helpers")]
1488 pub async fn all_receipts(&self) -> io::Result<Vec<Receipt>> {
1489 let query = schema::tx::AllReceipts::build(());
1490 let receipts = self.query(query).await?.all_receipts;
1491
1492 let vec: Result<Vec<Receipt>, ConversionError> = receipts
1493 .into_iter()
1494 .map(TryInto::<Receipt>::try_into)
1495 .collect();
1496
1497 Ok(vec?)
1498 }
1499
1500 pub async fn produce_blocks(
1501 &self,
1502 blocks_to_produce: u32,
1503 start_timestamp: Option<u64>,
1504 ) -> io::Result<BlockHeight> {
1505 let query = schema::block::BlockMutation::build(ProduceBlockArgs {
1506 blocks_to_produce: blocks_to_produce.into(),
1507 start_timestamp: start_timestamp
1508 .map(|timestamp| Tai64Timestamp::from(Tai64(timestamp))),
1509 });
1510
1511 let new_height = self.query(query).await?.produce_blocks;
1512
1513 Ok(new_height.into())
1514 }
1515
1516 pub async fn block(&self, id: &BlockId) -> io::Result<Option<types::Block>> {
1517 let query = schema::block::BlockByIdQuery::build(BlockByIdArgs {
1518 id: Some((*id).into()),
1519 });
1520
1521 let block = self
1522 .query(query)
1523 .await?
1524 .block
1525 .map(TryInto::try_into)
1526 .transpose()?;
1527
1528 Ok(block)
1529 }
1530
1531 pub async fn block_by_height(
1532 &self,
1533 height: BlockHeight,
1534 ) -> io::Result<Option<types::Block>> {
1535 let query = schema::block::BlockByHeightQuery::build(BlockByHeightArgs {
1536 height: Some(U32(height.into())),
1537 });
1538
1539 let block = self
1540 .query(query)
1541 .await?
1542 .block
1543 .map(TryInto::try_into)
1544 .transpose()?;
1545
1546 Ok(block)
1547 }
1548
1549 pub async fn da_compressed_block(
1550 &self,
1551 height: BlockHeight,
1552 ) -> io::Result<Option<Vec<u8>>> {
1553 let query = schema::da_compressed::DaCompressedBlockByHeightQuery::build(
1554 DaCompressedBlockByHeightArgs {
1555 height: U32(height.into()),
1556 },
1557 );
1558
1559 Ok(self
1560 .query(query)
1561 .await?
1562 .da_compressed_block
1563 .map(|b| b.bytes.into()))
1564 }
1565
1566 pub async fn blob(&self, id: BlobId) -> io::Result<Option<types::Blob>> {
1568 let query = schema::blob::BlobByIdQuery::build(BlobByIdArgs { id: id.into() });
1569 let blob = self.query(query).await?.blob.map(Into::into);
1570 Ok(blob)
1571 }
1572
1573 pub async fn blob_exists(&self, id: BlobId) -> io::Result<bool> {
1575 let query = schema::blob::BlobExistsQuery::build(BlobByIdArgs { id: id.into() });
1576 Ok(self.query(query).await?.blob.is_some())
1577 }
1578
1579 pub async fn blocks(
1581 &self,
1582 request: PaginationRequest<String>,
1583 ) -> io::Result<PaginatedResult<types::Block, String>> {
1584 let args = schema::ConnectionArgs::from(request);
1585 let query = schema::block::BlocksQuery::build(args);
1586
1587 let blocks = self.query(query).await?.blocks.try_into()?;
1588
1589 Ok(blocks)
1590 }
1591
1592 pub async fn coin(&self, id: &UtxoId) -> io::Result<Option<types::Coin>> {
1593 let query = schema::coins::CoinByIdQuery::build(CoinByIdArgs {
1594 utxo_id: (*id).into(),
1595 });
1596 let coin = self.query(query).await?.coin.map(Into::into);
1597 Ok(coin)
1598 }
1599
1600 pub async fn coins(
1602 &self,
1603 owner: &Address,
1604 asset_id: Option<&AssetId>,
1605 request: PaginationRequest<String>,
1606 ) -> io::Result<PaginatedResult<types::Coin, String>> {
1607 let owner: schema::Address = (*owner).into();
1608 let asset_id = asset_id.map(|id| (*id).into());
1609 let args = CoinsConnectionArgs::from((owner, asset_id, request));
1610 let query = schema::coins::CoinsQuery::build(args);
1611
1612 let coins = self.query(query).await?.coins.into();
1613 Ok(coins)
1614 }
1615
1616 pub async fn coins_to_spend(
1618 &self,
1619 owner: &Address,
1620 spend_query: Vec<(AssetId, u128, Option<u16>)>,
1621 excluded_ids: Option<(Vec<UtxoId>, Vec<Nonce>)>,
1623 ) -> io::Result<Vec<Vec<types::CoinType>>> {
1624 let owner: schema::Address = (*owner).into();
1625 let spend_query: Vec<SpendQueryElementInput> = spend_query
1626 .iter()
1627 .map(|(asset_id, amount, max)| -> Result<_, ConversionError> {
1628 Ok(SpendQueryElementInput {
1629 asset_id: (*asset_id).into(),
1630 amount: (*amount).into(),
1631 max: (*max).map(|max| max.into()),
1632 })
1633 })
1634 .try_collect()?;
1635 let excluded_ids: Option<ExcludeInput> = excluded_ids.map(Into::into);
1636 let args =
1637 schema::coins::CoinsToSpendArgs::from((owner, spend_query, excluded_ids));
1638 let query = schema::coins::CoinsToSpendQuery::build(args);
1639
1640 let coins_per_asset = self
1641 .query(query)
1642 .await?
1643 .coins_to_spend
1644 .into_iter()
1645 .map(|v| v.into_iter().map(Into::into).collect::<Vec<_>>())
1646 .collect::<Vec<_>>();
1647 Ok(coins_per_asset)
1648 }
1649
1650 pub async fn contract(&self, id: &ContractId) -> io::Result<Option<types::Contract>> {
1651 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1652 id: (*id).into(),
1653 });
1654 let contract = self.query(query).await?.contract.map(Into::into);
1655 Ok(contract)
1656 }
1657
1658 pub async fn contract_balance(
1659 &self,
1660 id: &ContractId,
1661 asset: Option<&AssetId>,
1662 ) -> io::Result<u64> {
1663 let asset_id: schema::AssetId = match asset {
1664 Some(asset) => (*asset).into(),
1665 None => schema::AssetId::default(),
1666 };
1667
1668 let query =
1669 schema::contract::ContractBalanceQuery::build(ContractBalanceQueryArgs {
1670 id: (*id).into(),
1671 asset: asset_id,
1672 });
1673
1674 let balance: types::ContractBalance =
1675 self.query(query).await?.contract_balance.into();
1676 Ok(balance.amount)
1677 }
1678
1679 pub async fn balance(
1680 &self,
1681 owner: &Address,
1682 asset_id: Option<&AssetId>,
1683 ) -> io::Result<u128> {
1684 let owner: schema::Address = (*owner).into();
1685 let asset_id: schema::AssetId = match asset_id {
1686 Some(asset_id) => (*asset_id).into(),
1687 None => schema::AssetId::default(),
1688 };
1689 let query = schema::balance::BalanceQuery::build(BalanceArgs { owner, asset_id });
1690 let balance: types::Balance = self.query(query).await?.balance.into();
1691 Ok(balance.amount)
1692 }
1693
1694 pub async fn balances(
1696 &self,
1697 owner: &Address,
1698 request: PaginationRequest<String>,
1699 ) -> io::Result<PaginatedResult<types::Balance, String>> {
1700 let owner: schema::Address = (*owner).into();
1701 let args = schema::balance::BalancesConnectionArgs::from((owner, request));
1702 let query = schema::balance::BalancesQuery::build(args);
1703
1704 let balances = self.query(query).await?.balances.into();
1705 Ok(balances)
1706 }
1707
1708 pub async fn contract_balances(
1709 &self,
1710 contract: &ContractId,
1711 request: PaginationRequest<String>,
1712 ) -> io::Result<PaginatedResult<types::ContractBalance, String>> {
1713 let contract_id: schema::ContractId = (*contract).into();
1714 let args = ContractBalancesConnectionArgs::from((contract_id, request));
1715 let query = schema::contract::ContractBalancesQuery::build(args);
1716
1717 let balances = self.query(query).await?.contract_balances.into();
1718
1719 Ok(balances)
1720 }
1721
1722 pub async fn message(&self, nonce: &Nonce) -> io::Result<Option<types::Message>> {
1724 let query = schema::message::MessageQuery::build(NonceArgs {
1725 nonce: (*nonce).into(),
1726 });
1727 let message = self.query(query).await?.message.map(Into::into);
1728 Ok(message)
1729 }
1730
1731 pub async fn messages(
1732 &self,
1733 owner: Option<&Address>,
1734 request: PaginationRequest<String>,
1735 ) -> io::Result<PaginatedResult<types::Message, String>> {
1736 let owner: Option<schema::Address> = owner.map(|owner| (*owner).into());
1737 let args = schema::message::OwnedMessagesConnectionArgs::from((owner, request));
1738 let query = schema::message::OwnedMessageQuery::build(args);
1739
1740 let messages = self.query(query).await?.messages.into();
1741
1742 Ok(messages)
1743 }
1744
1745 pub async fn contract_info(
1746 &self,
1747 contract: &ContractId,
1748 ) -> io::Result<Option<types::Contract>> {
1749 let query = schema::contract::ContractByIdQuery::build(ContractByIdArgs {
1750 id: (*contract).into(),
1751 });
1752 let contract_info = self.query(query).await?.contract.map(Into::into);
1753 Ok(contract_info)
1754 }
1755
1756 pub async fn message_status(&self, nonce: &Nonce) -> io::Result<MessageStatus> {
1757 let query = schema::message::MessageStatusQuery::build(MessageStatusArgs {
1758 nonce: (*nonce).into(),
1759 });
1760 let status = self.query(query).await?.message_status.into();
1761
1762 Ok(status)
1763 }
1764
1765 pub async fn message_proof(
1767 &self,
1768 transaction_id: &TxId,
1769 nonce: &Nonce,
1770 commit_block_id: Option<&BlockId>,
1771 commit_block_height: Option<BlockHeight>,
1772 ) -> io::Result<types::MessageProof> {
1773 let transaction_id: TransactionId = (*transaction_id).into();
1774 let nonce: schema::Nonce = (*nonce).into();
1775 let commit_block_id: Option<schema::BlockId> =
1776 commit_block_id.map(|commit_block_id| (*commit_block_id).into());
1777 let commit_block_height = commit_block_height.map(Into::into);
1778 let query = schema::message::MessageProofQuery::build(MessageProofArgs {
1779 transaction_id,
1780 nonce,
1781 commit_block_id,
1782 commit_block_height,
1783 });
1784 let proof = self.query(query).await?.message_proof.try_into()?;
1785 Ok(proof)
1786 }
1787
1788 pub async fn relayed_transaction_status(
1789 &self,
1790 id: &Bytes32,
1791 ) -> io::Result<Option<RelayedTransactionStatus>> {
1792 let query = schema::relayed_tx::RelayedTransactionStatusQuery::build(
1793 RelayedTransactionStatusArgs {
1794 id: id.to_owned().into(),
1795 },
1796 );
1797 let status = self
1798 .query(query)
1799 .await?
1800 .relayed_transaction_status
1801 .map(|status| status.try_into())
1802 .transpose()?;
1803 Ok(status)
1804 }
1805
1806 pub async fn asset_info(
1807 &self,
1808 asset_id: &AssetId,
1809 ) -> io::Result<Option<AssetDetail>> {
1810 let query = schema::assets::AssetInfoQuery::build(AssetInfoArg {
1811 id: (*asset_id).into(),
1812 });
1813 let asset_info = self.query(query).await?.asset_details.map(Into::into);
1814 Ok(asset_info)
1815 }
1816}
1817
1818#[cfg(any(test, feature = "test-helpers"))]
1819impl FuelClient {
1820 pub async fn transparent_transaction(
1821 &self,
1822 id: &TxId,
1823 ) -> io::Result<Option<types::TransactionType>> {
1824 let query = schema::tx::TransactionQuery::build(TxIdArgs { id: (*id).into() });
1825
1826 let transaction = self.query(query).await?.transaction;
1827
1828 Ok(transaction
1829 .map(|tx| {
1830 let response: TransactionResponse = tx.try_into()?;
1831 Ok::<_, ConversionError>(response.transaction)
1832 })
1833 .transpose()?)
1834 }
1835}
1836
1837#[cfg(feature = "rpc")]
1838impl FuelClient {
1839 fn rpc_client(&self) -> io::Result<ProtoBlockAggregatorClient<Channel>> {
1840 self.rpc_client
1841 .clone()
1842 .ok_or(io::Error::other("RPC client not initialized"))
1843 }
1844
1845 pub async fn get_block_range(
1846 &self,
1847 start: BlockHeight,
1848 end: BlockHeight,
1849 ) -> io::Result<
1850 impl Stream<
1851 Item = io::Result<(
1852 fuel_core_types::blockchain::block::Block,
1853 Vec<Vec<Receipt>>,
1854 )>,
1855 >,
1856 > {
1857 let request = ProtoBlockRangeRequest {
1858 start: *start,
1859 end: *end,
1860 };
1861
1862 let stream = self
1863 .rpc_client()?
1864 .get_block_range(request)
1865 .await
1866 .map_err(io::Error::other)?
1867 .into_inner()
1868 .then(|res| {
1869 let maybe_aws_client = self.aws_client.clone();
1870 let http_client = self.http_client.clone();
1871 async move {
1872 let maybe_aws_client = maybe_aws_client.clone();
1873 let resp =
1874 res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
1875 Self::convert_block_response(resp, maybe_aws_client, http_client)
1876 .await
1877 }
1878 });
1879 Ok(stream)
1880 }
1881
1882 async fn convert_block_response(
1883 resp: BlockResponse,
1884 s3_client: AWSClientManager,
1885 http_client: reqwest::Client,
1886 ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
1887 let payload = resp
1888 .payload
1889 .ok_or(io::Error::other("No RPC payload for `BlockResponse`"))?;
1890 match payload {
1891 Payload::Literal(_) => {
1892 Err(io::Error::other("Literal payloads are not supported yet"))
1894 }
1895 Payload::Bytes(bytes) => {
1896 let proto_block =
1897 ProtoBlock::decode(bytes.as_slice()).map_err(io::Error::other)?;
1898 fuel_block_from_protobuf(proto_block).map_err(|e| {
1899 io::Error::other(format!(
1900 "Failed to convert RPC block to internal block: {e:?}"
1901 ))
1902 })
1903 }
1904 Payload::Remote(remote) => {
1905 let RemoteBlockResponse { location } = remote;
1906 match location {
1907 Some(Location::S3(s3)) => {
1908 let RemoteS3Bucket {
1909 bucket,
1910 key,
1911 endpoint,
1912 requester_pays,
1913 } = s3;
1914 let (body, content_encoding) = Self::get_block_from_s3_bucket(
1915 s3_client,
1916 &endpoint,
1917 &bucket,
1918 &key,
1919 requester_pays,
1920 )
1921 .await?;
1922
1923 let block_bytes = Self::decode_body_by_content_encoding(
1924 body.as_ref(),
1925 content_encoding.as_deref(),
1926 )?;
1927 Self::decode_remote_protobuf_block(&block_bytes)
1928 }
1929 Some(Location::Http(http)) => {
1930 let (body, content_encoding) =
1931 Self::get_block_from_http(&http_client, &http).await?;
1932 let block_bytes = Self::decode_body_by_content_encoding(
1933 body.as_ref(),
1934 content_encoding.as_deref(),
1935 )?;
1936 Self::decode_remote_protobuf_block(&block_bytes)
1937 }
1938 None => {
1939 Err(io::Error::other("Remote block response missing location"))
1940 }
1941 }
1942 }
1943 }
1944 }
1945
1946 fn decode_body_by_content_encoding(
1950 body: &[u8],
1951 content_encoding: Option<&str>,
1952 ) -> io::Result<Vec<u8>> {
1953 let Some(header) = content_encoding.map(str::trim).filter(|s| !s.is_empty())
1954 else {
1955 return Ok(body.to_vec());
1956 };
1957 let tokens: Vec<&str> = header
1958 .split(',')
1959 .map(str::trim)
1960 .filter(|t| !t.is_empty())
1961 .collect();
1962 if tokens.is_empty() {
1963 return Ok(body.to_vec());
1964 }
1965 let mut data = body.to_vec();
1966 for &token in tokens.iter().rev() {
1967 let lower = token.trim().to_ascii_lowercase();
1968 if lower.is_empty() || lower == "identity" {
1969 continue;
1970 }
1971 data = Self::decode_one_content_encoding_layer(&data, &lower)?;
1972 }
1973 Ok(data)
1974 }
1975
1976 fn decode_one_content_encoding_layer(
1977 data: &[u8],
1978 token_lower: &str,
1979 ) -> io::Result<Vec<u8>> {
1980 match token_lower {
1981 "gzip" | "x-gzip" => Self::gunzip_remote_block_bytes(data),
1982 "deflate" => Self::inflate_deflate_remote_block_bytes(data),
1983 _ => Err(io::Error::other(format!(
1984 "unsupported Content-Encoding: {token_lower}"
1985 ))),
1986 }
1987 }
1988
1989 fn gunzip_remote_block_bytes(data: &[u8]) -> io::Result<Vec<u8>> {
1990 let mut decoder = GzDecoder::new(data);
1991 let mut output = Vec::new();
1992 decoder
1993 .read_to_end(&mut output)
1994 .map_err(|e| io::Error::other(e.to_string()))?;
1995 Ok(output)
1996 }
1997
1998 fn inflate_deflate_remote_block_bytes(data: &[u8]) -> io::Result<Vec<u8>> {
2004 let mut output = Vec::new();
2005 let mut zlib = ZlibDecoder::new(Cursor::new(data));
2006 if zlib.read_to_end(&mut output).is_ok() {
2007 return Ok(output);
2008 }
2009
2010 output.clear();
2011 let mut raw = DeflateDecoder::new(data);
2012 raw.read_to_end(&mut output)
2013 .map_err(|e| io::Error::other(e.to_string()))?;
2014 Ok(output)
2015 }
2016
2017 fn decode_remote_protobuf_block(
2018 block_bytes: &[u8],
2019 ) -> io::Result<(fuel_core_types::blockchain::block::Block, Vec<Vec<Receipt>>)> {
2020 let block = ProtoBlock::decode(block_bytes)
2021 .map_err(|e| io::Error::other(format!("Failed to decode block: {e}")))?;
2022 fuel_block_from_protobuf(block).map_err(|e| {
2023 io::Error::other(format!(
2024 "Failed to convert RPC block to internal block: {e:?}"
2025 ))
2026 })
2027 }
2028
2029 async fn get_block_from_http(
2030 client: &reqwest::Client,
2031 http: &RemoteHttpEndpoint,
2032 ) -> io::Result<(prost::bytes::Bytes, Option<String>)> {
2033 let mut req = client.get(http.endpoint.clone());
2034 for (k, v) in &http.headers {
2035 req = req.header(k, v);
2036 }
2037 let res = req.send().await.map_err(|e| {
2038 io::Error::other(format!("HTTP fetch for block object failed: {e}"))
2039 })?;
2040 if !res.status().is_success() {
2041 return Err(io::Error::other(format!(
2042 "HTTP {} when fetching block object",
2043 res.status()
2044 )));
2045 }
2046 let content_encoding = res
2047 .headers()
2048 .get(reqwest::header::CONTENT_ENCODING)
2049 .and_then(|v| v.to_str().ok())
2050 .map(std::string::ToString::to_string);
2051 let bytes = res
2052 .bytes()
2053 .await
2054 .map_err(|e| io::Error::other(format!("Reading block object body: {e}")))?;
2055 Ok((bytes, content_encoding))
2056 }
2057
2058 async fn get_block_from_s3_bucket(
2059 s3_client: AWSClientManager,
2060 url: &Option<String>,
2061 bucket: &str,
2062 key: &str,
2063 requester_pays: bool,
2064 ) -> io::Result<(prost::bytes::Bytes, Option<String>)> {
2065 use aws_sdk_s3::types::RequestPayer;
2066 tracing::debug!("getting block from bucket: {} with key {}", bucket, key);
2067 let mut req = s3_client
2068 .get_client(url)
2069 .await?
2070 .get_object()
2071 .bucket(bucket)
2072 .key(key);
2073 if requester_pays {
2074 req = req.request_payer(RequestPayer::Requester);
2075 }
2076 let obj = req.send().await.map_err(|e| {
2077 io::Error::other(format!("Failed to get object from S3: {e:?}"))
2078 })?;
2079 let content_encoding =
2080 obj.content_encoding().map(std::string::ToString::to_string);
2081 let bytes = obj
2082 .body
2083 .collect()
2084 .await
2085 .map_err(|e| {
2086 io::Error::other(format!("Failed to get object from S3: {e:?}"))
2087 })?
2088 .into_bytes();
2089 Ok((bytes, content_encoding))
2090 }
2091
2092 async fn new_aws_client(url: &Option<String>) -> AWSClient {
2093 let credentials = DefaultCredentialsChain::builder().build().await;
2094 let mut config_builder = aws_config::defaults(BehaviorVersion::latest())
2095 .credentials_provider(credentials);
2096 if let Some(url) = url {
2097 config_builder = config_builder.endpoint_url(url);
2098 }
2099 let sdk_config = config_builder.load().await;
2100 let builder = aws_sdk_s3::config::Builder::from(&sdk_config);
2101 let config = builder.force_path_style(true).build();
2102 AWSClient::from_conf(config)
2103 }
2104
2105 pub async fn get_aggregated_height(&self) -> io::Result<BlockHeight> {
2108 let request = ProtoBlockHeightRequest {};
2109 let height = self
2110 .rpc_client()?
2111 .get_synced_block_height(request)
2112 .await
2113 .map_err(io::Error::other)?
2114 .into_inner()
2115 .height
2116 .ok_or(io::Error::other("No height in RPC response"))?;
2117 Ok(BlockHeight::from(height))
2118 }
2119
2120 pub async fn new_block_subscription(
2121 &self,
2122 ) -> io::Result<
2123 impl Stream<
2124 Item = io::Result<(
2125 fuel_core_types::blockchain::block::Block,
2126 Vec<Vec<Receipt>>,
2127 )>,
2128 >,
2129 > {
2130 let request = ProtoNewBlockSubscriptionRequest {};
2131 let stream = self
2132 .rpc_client()?
2133 .new_block_subscription(request)
2134 .await
2135 .map_err(io::Error::other)?
2136 .into_inner()
2137 .then(|res| {
2138 let maybe_aws_client = self.aws_client.clone();
2139 let http_client = self.http_client.clone();
2140 async move {
2141 let maybe_aws_client = maybe_aws_client.clone();
2142 let resp =
2143 res.map_err(|e| io::Error::other(format!("RPC error: {:?}", e)))?;
2144 Self::convert_block_response(resp, maybe_aws_client, http_client)
2145 .await
2146 }
2147 });
2148 Ok(stream)
2149 }
2150}
2151
2152#[cfg(test)]
2153mod tests {
2154 use super::*;
2155
2156 #[test]
2157 fn with_urls_normalizes_urls_to_graphql_endpoint() {
2158 let urls = &["http://localhost:8080", "http://example.com:4000"];
2160
2161 let client = FuelClient::with_urls(urls).expect("should create client");
2163
2164 assert_eq!(
2166 client.get_default_url().as_str(),
2167 "http://localhost:8080/v1/graphql"
2168 );
2169 }
2170
2171 #[test]
2172 fn with_urls_adds_http_scheme_if_missing() {
2173 let urls = &["localhost:8080"];
2175
2176 let client = FuelClient::with_urls(urls).expect("should create client");
2178
2179 assert_eq!(
2181 client.get_default_url().as_str(),
2182 "http://localhost:8080/v1/graphql"
2183 );
2184 }
2185
2186 #[test]
2187 fn with_urls_overwrites_existing_path() {
2188 let urls = &["http://localhost:8080/some/path", "http://example.com/api"];
2190
2191 let client = FuelClient::with_urls(urls).expect("should create client");
2193
2194 assert_eq!(
2196 client.get_default_url().as_str(),
2197 "http://localhost:8080/v1/graphql"
2198 );
2199 }
2200
2201 #[test]
2202 fn new_and_with_urls_produce_same_url() {
2203 let url = "http://localhost:8080";
2205
2206 let client_new = FuelClient::new(url).expect("should create client via new");
2208 let client_with_urls =
2209 FuelClient::with_urls(&[url]).expect("should create client via with_urls");
2210
2211 assert_eq!(
2213 client_new.get_default_url().as_str(),
2214 client_with_urls.get_default_url().as_str()
2215 );
2216 }
2217
2218 #[test]
2219 fn is_legacy_node_version_detection() {
2220 assert!(is_legacy_node("0.47.3"));
2222 assert!(is_legacy_node("0.47.0"));
2223 assert!(is_legacy_node("0.1.0"));
2224 assert!(is_legacy_node("0.0.1"));
2225
2226 assert!(!is_legacy_node("0.48.0"));
2228 assert!(!is_legacy_node("0.49.0"));
2229 assert!(!is_legacy_node("0.100.0"));
2230 assert!(!is_legacy_node("1.0.0"));
2231 assert!(!is_legacy_node("2.47.0"));
2232 }
2233}