fuel_core/service/adapters/
graphql_api.rs

1use super::{
2    BlockImporterAdapter,
3    BlockProducerAdapter,
4    ChainStateInfoProvider,
5    SharedMemoryPool,
6    StaticGasPrice,
7    TxStatusManagerAdapter,
8    compression_adapters::CompressionServiceAdapter,
9    import_result_provider,
10};
11use crate::{
12    database::OnChainIterableKeyValueView,
13    fuel_core_graphql_api::ports::{
14        BlockProducerPort,
15        ChainStateProvider,
16        DatabaseMessageProof,
17        GasPriceEstimate,
18        P2pPort,
19        TxPoolPort,
20        worker::{
21            self,
22            BlockAt,
23        },
24    },
25    graphql_api::ports::{
26        DatabaseDaCompressedBlocks,
27        MemoryPool,
28        TxStatusManager,
29    },
30    service::{
31        adapters::{
32            P2PAdapter,
33            TxPoolAdapter,
34            import_result_provider::ImportResultProvider,
35        },
36        vm_pool::MemoryFromPool,
37    },
38};
39use async_trait::async_trait;
40use fuel_core_compression_service::storage::CompressedBlocks;
41use fuel_core_services::stream::BoxStream;
42use fuel_core_storage::{
43    Result as StorageResult,
44    blueprint::BlueprintCodec,
45    kv_store::KeyValueInspect,
46    not_found,
47    structured_storage::TableWithBlueprint,
48};
49use fuel_core_tx_status_manager::TxStatusMessage;
50use fuel_core_txpool::TxPoolStats;
51use fuel_core_types::{
52    blockchain::header::{
53        ConsensusParametersVersion,
54        StateTransitionBytecodeVersion,
55    },
56    entities::relayer::message::MerkleProof,
57    fuel_tx::{
58        Bytes32,
59        ConsensusParameters,
60        Transaction,
61        TxId,
62    },
63    fuel_types::BlockHeight,
64    services::{
65        block_importer::SharedImportResult,
66        executor::{
67            DryRunResult,
68            StorageReadReplayEvent,
69        },
70        p2p::PeerInfo,
71        transaction_status::TransactionStatus,
72    },
73    tai64::Tai64,
74};
75use std::{
76    ops::Deref,
77    sync::Arc,
78};
79
80mod off_chain;
81mod on_chain;
82
83#[async_trait]
84impl TxStatusManager for TxStatusManagerAdapter {
85    async fn status(&self, tx_id: TxId) -> anyhow::Result<Option<TransactionStatus>> {
86        self.tx_status_manager_shared_data.get_status(tx_id).await
87    }
88
89    async fn tx_update_subscribe(
90        &self,
91        tx_id: TxId,
92    ) -> anyhow::Result<BoxStream<TxStatusMessage>> {
93        self.tx_status_manager_shared_data.subscribe(tx_id).await
94    }
95
96    fn subscribe_txs_updates(
97        &self,
98    ) -> anyhow::Result<BoxStream<anyhow::Result<(TxId, TransactionStatus)>>> {
99        self.tx_status_manager_shared_data.subscribe_all()
100    }
101}
102
103#[async_trait]
104impl TxPoolPort for TxPoolAdapter {
105    async fn transaction(&self, id: TxId) -> anyhow::Result<Option<Transaction>> {
106        Ok(self
107            .service
108            .find_one(id)
109            .await
110            .map_err(|e| anyhow::anyhow!(e))?
111            .map(|info| info.tx().clone().deref().into()))
112    }
113
114    async fn insert(&self, tx: Transaction) -> anyhow::Result<()> {
115        self.service
116            .insert(tx)
117            .await
118            .map_err(|e| anyhow::anyhow!(e))
119    }
120
121    fn latest_pool_stats(&self) -> TxPoolStats {
122        self.service.latest_stats()
123    }
124}
125
126impl DatabaseMessageProof for OnChainIterableKeyValueView {
127    fn block_history_proof(
128        &self,
129        message_block_height: &BlockHeight,
130        commit_block_height: &BlockHeight,
131    ) -> StorageResult<MerkleProof> {
132        self.block_history_proof(message_block_height, commit_block_height)
133    }
134}
135
136#[async_trait]
137impl BlockProducerPort for BlockProducerAdapter {
138    async fn dry_run_txs(
139        &self,
140        transactions: Vec<Transaction>,
141        height: Option<BlockHeight>,
142        time: Option<Tai64>,
143        utxo_validation: Option<bool>,
144        gas_price: Option<u64>,
145        record_storage_reads: bool,
146    ) -> anyhow::Result<DryRunResult> {
147        self.block_producer
148            .dry_run(
149                transactions,
150                height,
151                time,
152                utxo_validation,
153                gas_price,
154                record_storage_reads,
155            )
156            .await
157    }
158
159    async fn storage_read_replay(
160        &self,
161        height: BlockHeight,
162    ) -> anyhow::Result<Vec<StorageReadReplayEvent>> {
163        self.block_producer.storage_read_replay(height).await
164    }
165}
166
167#[async_trait::async_trait]
168impl P2pPort for P2PAdapter {
169    async fn all_peer_info(&self) -> anyhow::Result<Vec<PeerInfo>> {
170        #[cfg(feature = "p2p")]
171        {
172            use fuel_core_types::services::p2p::HeartbeatData;
173            match &self.service {
174                Some(service) => {
175                    let peers = service.get_all_peers().await?;
176                    Ok(peers
177                        .into_iter()
178                        .map(|(peer_id, peer_info)| PeerInfo {
179                            id: fuel_core_types::services::p2p::PeerId::from(
180                                peer_id.to_bytes(),
181                            ),
182                            peer_addresses: peer_info
183                                .peer_addresses
184                                .iter()
185                                .map(|addr| addr.to_string())
186                                .collect(),
187                            client_version: None,
188                            heartbeat_data: HeartbeatData {
189                                block_height: peer_info.heartbeat_data.block_height,
190                                last_heartbeat: peer_info
191                                    .heartbeat_data
192                                    .last_heartbeat_sys,
193                            },
194                            app_score: peer_info.score,
195                        })
196                        .collect())
197                }
198                _ => Ok(vec![]),
199            }
200        }
201        #[cfg(not(feature = "p2p"))]
202        {
203            Ok(vec![])
204        }
205    }
206}
207
208impl worker::TxStatusCompletion for TxStatusManagerAdapter {
209    fn send_complete(
210        &self,
211        id: Bytes32,
212        block_height: &BlockHeight,
213        status: TransactionStatus,
214    ) {
215        tracing::info!("Transaction {id} successfully included in block {block_height}");
216        self.tx_status_manager_shared_data.update_status(id, status);
217    }
218}
219
220impl GasPriceEstimate for StaticGasPrice {
221    fn worst_case_gas_price(&self, _height: BlockHeight) -> Option<u64> {
222        Some(self.gas_price)
223    }
224}
225
226impl ChainStateProvider for ChainStateInfoProvider {
227    fn current_consensus_params(&self) -> Arc<ConsensusParameters> {
228        self.shared_state.latest_consensus_parameters()
229    }
230
231    fn current_consensus_parameters_version(&self) -> ConsensusParametersVersion {
232        self.shared_state.latest_consensus_parameters_version()
233    }
234
235    fn consensus_params_at_version(
236        &self,
237        version: &ConsensusParametersVersion,
238    ) -> anyhow::Result<Arc<ConsensusParameters>> {
239        Ok(self.shared_state.get_consensus_parameters(version)?)
240    }
241
242    fn current_stf_version(&self) -> StateTransitionBytecodeVersion {
243        self.shared_state.latest_stf_version()
244    }
245}
246
247#[derive(Clone)]
248pub struct GraphQLBlockImporter {
249    block_importer_adapter: BlockImporterAdapter,
250    import_result_provider_adapter: ImportResultProvider,
251}
252
253impl GraphQLBlockImporter {
254    pub fn new(
255        block_importer_adapter: BlockImporterAdapter,
256        import_result_provider_adapter: ImportResultProvider,
257    ) -> Self {
258        Self {
259            block_importer_adapter,
260            import_result_provider_adapter,
261        }
262    }
263}
264
265impl From<BlockAt> for import_result_provider::BlockAt {
266    fn from(value: BlockAt) -> Self {
267        match value {
268            BlockAt::Genesis => Self::Genesis,
269            BlockAt::Specific(h) => Self::Specific(h),
270        }
271    }
272}
273
274impl worker::BlockImporter for GraphQLBlockImporter {
275    fn block_events(&self) -> BoxStream<SharedImportResult> {
276        self.block_importer_adapter.events_shared_result()
277    }
278
279    fn block_event_at_height(
280        &self,
281        height: BlockAt,
282    ) -> anyhow::Result<SharedImportResult> {
283        self.import_result_provider_adapter
284            .result_at_height(height.into())
285    }
286}
287
288#[async_trait::async_trait]
289impl MemoryPool for SharedMemoryPool {
290    type Memory = MemoryFromPool;
291
292    async fn get_memory(&self) -> Self::Memory {
293        self.memory_pool.take_raw().await
294    }
295}
296
297impl DatabaseDaCompressedBlocks for CompressionServiceAdapter {
298    fn da_compressed_block(&self, height: &BlockHeight) -> StorageResult<Vec<u8>> {
299        use fuel_core_storage::codec::Encode;
300
301        let encoded_height =
302            <<CompressedBlocks as TableWithBlueprint>::Blueprint as BlueprintCodec<
303                CompressedBlocks,
304            >>::KeyCodec::encode(height);
305        let column = <CompressedBlocks as TableWithBlueprint>::column();
306        self.storage()
307            .get(&encoded_height, column)?
308            .ok_or_else(|| not_found!(CompressedBlocks))
309            .map(|block| block.to_vec())
310    }
311}