fuel_core/service/adapters/
graphql_api.rs1use super::{
2 BlockImporterAdapter,
3 BlockProducerAdapter,
4 ChainStateInfoProvider,
5 SharedMemoryPool,
6 StaticGasPrice,
7 TxStatusManagerAdapter,
8 compression_adapters::CompressionServiceAdapter,
9 import_result_provider,
10};
11use crate::{
12 database::OnChainIterableKeyValueView,
13 fuel_core_graphql_api::ports::{
14 BlockProducerPort,
15 ChainStateProvider,
16 DatabaseMessageProof,
17 GasPriceEstimate,
18 P2pPort,
19 TxPoolPort,
20 worker::{
21 self,
22 BlockAt,
23 },
24 },
25 graphql_api::ports::{
26 DatabaseDaCompressedBlocks,
27 MemoryPool,
28 TxStatusManager,
29 },
30 service::{
31 adapters::{
32 P2PAdapter,
33 TxPoolAdapter,
34 import_result_provider::ImportResultProvider,
35 },
36 vm_pool::MemoryFromPool,
37 },
38};
39use async_trait::async_trait;
40use fuel_core_compression_service::storage::CompressedBlocks;
41use fuel_core_services::stream::BoxStream;
42use fuel_core_storage::{
43 Result as StorageResult,
44 blueprint::BlueprintCodec,
45 kv_store::KeyValueInspect,
46 not_found,
47 structured_storage::TableWithBlueprint,
48};
49use fuel_core_tx_status_manager::TxStatusMessage;
50use fuel_core_txpool::TxPoolStats;
51use fuel_core_types::{
52 blockchain::header::{
53 ConsensusParametersVersion,
54 StateTransitionBytecodeVersion,
55 },
56 entities::relayer::message::MerkleProof,
57 fuel_tx::{
58 Bytes32,
59 ConsensusParameters,
60 Transaction,
61 TxId,
62 },
63 fuel_types::BlockHeight,
64 services::{
65 block_importer::SharedImportResult,
66 executor::{
67 DryRunResult,
68 StorageReadReplayEvent,
69 },
70 p2p::PeerInfo,
71 transaction_status::TransactionStatus,
72 },
73 tai64::Tai64,
74};
75use std::{
76 ops::Deref,
77 sync::Arc,
78};
79
80mod off_chain;
81mod on_chain;
82
83#[async_trait]
84impl TxStatusManager for TxStatusManagerAdapter {
85 async fn status(&self, tx_id: TxId) -> anyhow::Result<Option<TransactionStatus>> {
86 self.tx_status_manager_shared_data.get_status(tx_id).await
87 }
88
89 async fn tx_update_subscribe(
90 &self,
91 tx_id: TxId,
92 ) -> anyhow::Result<BoxStream<TxStatusMessage>> {
93 self.tx_status_manager_shared_data.subscribe(tx_id).await
94 }
95
96 fn subscribe_txs_updates(
97 &self,
98 ) -> anyhow::Result<BoxStream<anyhow::Result<(TxId, TransactionStatus)>>> {
99 self.tx_status_manager_shared_data.subscribe_all()
100 }
101}
102
103#[async_trait]
104impl TxPoolPort for TxPoolAdapter {
105 async fn transaction(&self, id: TxId) -> anyhow::Result<Option<Transaction>> {
106 Ok(self
107 .service
108 .find_one(id)
109 .await
110 .map_err(|e| anyhow::anyhow!(e))?
111 .map(|info| info.tx().clone().deref().into()))
112 }
113
114 async fn insert(&self, tx: Transaction) -> anyhow::Result<()> {
115 self.service
116 .insert(tx)
117 .await
118 .map_err(|e| anyhow::anyhow!(e))
119 }
120
121 fn latest_pool_stats(&self) -> TxPoolStats {
122 self.service.latest_stats()
123 }
124}
125
126impl DatabaseMessageProof for OnChainIterableKeyValueView {
127 fn block_history_proof(
128 &self,
129 message_block_height: &BlockHeight,
130 commit_block_height: &BlockHeight,
131 ) -> StorageResult<MerkleProof> {
132 self.block_history_proof(message_block_height, commit_block_height)
133 }
134}
135
136#[async_trait]
137impl BlockProducerPort for BlockProducerAdapter {
138 async fn dry_run_txs(
139 &self,
140 transactions: Vec<Transaction>,
141 height: Option<BlockHeight>,
142 time: Option<Tai64>,
143 utxo_validation: Option<bool>,
144 gas_price: Option<u64>,
145 record_storage_reads: bool,
146 ) -> anyhow::Result<DryRunResult> {
147 self.block_producer
148 .dry_run(
149 transactions,
150 height,
151 time,
152 utxo_validation,
153 gas_price,
154 record_storage_reads,
155 )
156 .await
157 }
158
159 async fn storage_read_replay(
160 &self,
161 height: BlockHeight,
162 ) -> anyhow::Result<Vec<StorageReadReplayEvent>> {
163 self.block_producer.storage_read_replay(height).await
164 }
165}
166
167#[async_trait::async_trait]
168impl P2pPort for P2PAdapter {
169 async fn all_peer_info(&self) -> anyhow::Result<Vec<PeerInfo>> {
170 #[cfg(feature = "p2p")]
171 {
172 use fuel_core_types::services::p2p::HeartbeatData;
173 match &self.service {
174 Some(service) => {
175 let peers = service.get_all_peers().await?;
176 Ok(peers
177 .into_iter()
178 .map(|(peer_id, peer_info)| PeerInfo {
179 id: fuel_core_types::services::p2p::PeerId::from(
180 peer_id.to_bytes(),
181 ),
182 peer_addresses: peer_info
183 .peer_addresses
184 .iter()
185 .map(|addr| addr.to_string())
186 .collect(),
187 client_version: None,
188 heartbeat_data: HeartbeatData {
189 block_height: peer_info.heartbeat_data.block_height,
190 last_heartbeat: peer_info
191 .heartbeat_data
192 .last_heartbeat_sys,
193 },
194 app_score: peer_info.score,
195 })
196 .collect())
197 }
198 _ => Ok(vec![]),
199 }
200 }
201 #[cfg(not(feature = "p2p"))]
202 {
203 Ok(vec![])
204 }
205 }
206}
207
208impl worker::TxStatusCompletion for TxStatusManagerAdapter {
209 fn send_complete(
210 &self,
211 id: Bytes32,
212 block_height: &BlockHeight,
213 status: TransactionStatus,
214 ) {
215 tracing::info!("Transaction {id} successfully included in block {block_height}");
216 self.tx_status_manager_shared_data.update_status(id, status);
217 }
218}
219
220impl GasPriceEstimate for StaticGasPrice {
221 fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
222 Some(self.gas_price)
223 }
224}
225
226impl ChainStateProvider for ChainStateInfoProvider {
227 fn current_consensus_params(&self) -> Arc<ConsensusParameters> {
228 self.shared_state.latest_consensus_parameters()
229 }
230
231 fn current_consensus_parameters_version(&self) -> ConsensusParametersVersion {
232 self.shared_state.latest_consensus_parameters_version()
233 }
234
235 fn consensus_params_at_version(
236 &self,
237 version: &ConsensusParametersVersion,
238 ) -> anyhow::Result<Arc<ConsensusParameters>> {
239 Ok(self.shared_state.get_consensus_parameters(version)?)
240 }
241
242 fn current_stf_version(&self) -> StateTransitionBytecodeVersion {
243 self.shared_state.latest_stf_version()
244 }
245}
246
247#[derive(Clone)]
248pub struct GraphQLBlockImporter {
249 block_importer_adapter: BlockImporterAdapter,
250 import_result_provider_adapter: ImportResultProvider,
251}
252
253impl GraphQLBlockImporter {
254 pub fn new(
255 block_importer_adapter: BlockImporterAdapter,
256 import_result_provider_adapter: ImportResultProvider,
257 ) -> Self {
258 Self {
259 block_importer_adapter,
260 import_result_provider_adapter,
261 }
262 }
263}
264
265impl From<BlockAt> for import_result_provider::BlockAt {
266 fn from(value: BlockAt) -> Self {
267 match value {
268 BlockAt::Genesis => Self::Genesis,
269 BlockAt::Specific(h) => Self::Specific(h),
270 }
271 }
272}
273
274impl worker::BlockImporter for GraphQLBlockImporter {
275 fn block_events(&self) -> BoxStream<SharedImportResult> {
276 self.block_importer_adapter.events_shared_result()
277 }
278
279 fn block_event_at_height(
280 &self,
281 height: BlockAt,
282 ) -> anyhow::Result<SharedImportResult> {
283 self.import_result_provider_adapter
284 .result_at_height(height.into())
285 }
286}
287
288#[async_trait::async_trait]
289impl MemoryPool for SharedMemoryPool {
290 type Memory = MemoryFromPool;
291
292 async fn get_memory(&self) -> Self::Memory {
293 self.memory_pool.take_raw().await
294 }
295}
296
297impl DatabaseDaCompressedBlocks for CompressionServiceAdapter {
298 fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult<Vec<u8>> {
299 use fuel_core_storage::codec::Encode;
300
301 let encoded_height =
302 <<CompressedBlocks as TableWithBlueprint>::Blueprint as BlueprintCodec<
303 CompressedBlocks,
304 >>::KeyCodec::encode(height);
305 let column = <CompressedBlocks as TableWithBlueprint>::column();
306 self.storage()
307 .get(&encoded_height, column)?
308 .ok_or_else(|| not_found!(CompressedBlocks))
309 .map(|block| block.to_vec())
310 }
311}