fuel_core/service/adapters/
producer.rs

1use crate::{
2    database::OnChainIterableKeyValueView,
3    service::{
4        adapters::{
5            BlockProducerAdapter,
6            ChainStateInfoProvider,
7            ExecutorAdapter,
8            MaybeRelayerAdapter,
9            NewTxWaiter,
10            StaticGasPrice,
11            TransactionsSource,
12            TxPoolAdapter,
13        },
14        sub_services::BlockProducerService,
15    },
16};
17use fuel_core_producer::{
18    block_producer::gas_price::{
19        ChainStateInfoProvider as ChainStateInfoProviderTrait,
20        GasPriceProvider,
21    },
22    ports::{
23        RelayerBlockInfo,
24        TxPool,
25    },
26};
27use fuel_core_storage::{
28    Result as StorageResult,
29    StorageAsRef,
30    iter::{
31        IterDirection,
32        IteratorOverTable,
33    },
34    not_found,
35    tables::{
36        ConsensusParametersVersions,
37        FuelBlocks,
38        StateTransitionBytecodeVersions,
39        Transactions,
40    },
41    transactional::Changes,
42};
43use fuel_core_types::{
44    blockchain::{
45        block::{
46            Block,
47            CompressedBlock,
48        },
49        header::{
50            ConsensusParametersVersion,
51            StateTransitionBytecodeVersion,
52        },
53        primitives::DaBlockHeight,
54    },
55    fuel_tx::{
56        ConsensusParameters,
57        Transaction,
58    },
59    fuel_types::{
60        BlockHeight,
61        Bytes32,
62    },
63    services::{
64        block_producer::Components,
65        executor::{
66            DryRunResult,
67            Result as ExecutorResult,
68            StorageReadReplayEvent,
69            UncommittedResult,
70        },
71    },
72};
73use std::{
74    borrow::Cow,
75    sync::Arc,
76};
77use tokio::time::Instant;
78
79impl BlockProducerAdapter {
80    pub fn new(block_producer: BlockProducerService) -> Self {
81        Self {
82            block_producer: Arc::new(block_producer),
83        }
84    }
85}
86
87impl TxPool for TxPoolAdapter {
88    type TxSource = TransactionsSource;
89
90    async fn get_source(
91        &self,
92        gas_price: u64,
93        _: BlockHeight,
94    ) -> anyhow::Result<Self::TxSource> {
95        Ok(TransactionsSource::new(gas_price, self.service.clone()))
96    }
97}
98
99impl fuel_core_producer::ports::BlockProducer<TransactionsSource> for ExecutorAdapter {
100    type Deadline = Instant;
101    async fn produce_without_commit(
102        &self,
103        component: Components<TransactionsSource>,
104        deadline: Instant,
105    ) -> ExecutorResult<UncommittedResult<Changes>> {
106        let new_tx_waiter = NewTxWaiter::new(self.new_txs_watcher.clone(), deadline);
107        self.executor
108            .produce_without_commit_with_source(
109                component,
110                new_tx_waiter,
111                self.preconfirmation_sender.clone(),
112            )
113            .await
114    }
115}
116
117impl fuel_core_producer::ports::BlockProducer<Vec<Transaction>> for ExecutorAdapter {
118    type Deadline = ();
119    async fn produce_without_commit(
120        &self,
121        component: Components<Vec<Transaction>>,
122        _: (),
123    ) -> ExecutorResult<UncommittedResult<Changes>> {
124        self.produce_without_commit_from_vector(component)
125    }
126}
127
128impl fuel_core_producer::ports::DryRunner for ExecutorAdapter {
129    fn dry_run(
130        &self,
131        block: Components<Vec<Transaction>>,
132        forbid_fake_coins: Option<bool>,
133        at_height: Option<BlockHeight>,
134        record_storage_read_replay: bool,
135    ) -> ExecutorResult<DryRunResult> {
136        self.executor.dry_run(
137            block,
138            forbid_fake_coins,
139            at_height,
140            record_storage_read_replay,
141        )
142    }
143}
144
145impl fuel_core_producer::ports::StorageReadReplayRecorder for ExecutorAdapter {
146    fn storage_read_replay(
147        &self,
148        block: &Block,
149    ) -> ExecutorResult<Vec<StorageReadReplayEvent>> {
150        self.executor.storage_read_replay(block)
151    }
152}
153
154#[async_trait::async_trait]
155impl fuel_core_producer::ports::Relayer for MaybeRelayerAdapter {
156    async fn wait_for_at_least_height(
157        &self,
158        height: &DaBlockHeight,
159    ) -> anyhow::Result<DaBlockHeight> {
160        #[cfg(feature = "relayer")]
161        {
162            match &self.relayer_synced {
163                Some(sync) => {
164                    sync.await_at_least_synced(height).await?;
165                    let highest = sync.get_finalized_da_height();
166                    Ok(highest)
167                }
168                _ => Ok(*height),
169            }
170        }
171        #[cfg(not(feature = "relayer"))]
172        {
173            anyhow::ensure!(
174                **height == 0,
175                "Cannot have a da height above zero without a relayer"
176            );
177            // If the relayer is not enabled, then all blocks are zero.
178            Ok(0u64.into())
179        }
180    }
181
182    async fn get_cost_and_transactions_number_for_block(
183        &self,
184        height: &DaBlockHeight,
185    ) -> anyhow::Result<RelayerBlockInfo> {
186        #[cfg(feature = "relayer")]
187        {
188            let (gas_cost, tx_count) = self
189                .relayer_database
190                .get_events(height)?
191                .iter()
192                .fold((0u64, 0u64), |(gas_cost, tx_count), event| {
193                    let gas_cost = gas_cost.saturating_add(event.cost());
194                    let tx_count = match event {
195                        fuel_core_types::services::relayer::Event::Message(_) => tx_count,
196                        fuel_core_types::services::relayer::Event::Transaction(_) => {
197                            tx_count.saturating_add(1)
198                        }
199                    };
200                    (gas_cost, tx_count)
201                });
202            Ok(RelayerBlockInfo { gas_cost, tx_count })
203        }
204        #[cfg(not(feature = "relayer"))]
205        {
206            anyhow::ensure!(
207                **height == 0,
208                "Cannot have a da height above zero without a relayer"
209            );
210            // If the relayer is not enabled, then all blocks are zero.
211            Ok(RelayerBlockInfo {
212                gas_cost: 0,
213                tx_count: 0,
214            })
215        }
216    }
217}
218
219impl fuel_core_producer::ports::BlockProducerDatabase for OnChainIterableKeyValueView {
220    fn latest_height(&self) -> Option<BlockHeight> {
221        self.latest_height().ok()
222    }
223
224    fn get_block(&self, height: &BlockHeight) -> StorageResult<Cow<'_, CompressedBlock>> {
225        self.storage::<FuelBlocks>()
226            .get(height)?
227            .ok_or(not_found!(FuelBlocks))
228    }
229
230    fn get_full_block(&self, height: &BlockHeight) -> StorageResult<Block> {
231        let block = self.get_block(height)?;
232        let transactions = block
233            .transactions()
234            .iter()
235            .map(|id| {
236                self.storage::<Transactions>()
237                    .get(id)?
238                    .ok_or(not_found!(Transactions))
239                    .map(|tx| tx.into_owned())
240            })
241            .collect::<Result<Vec<_>, _>>()?;
242        Ok(block.into_owned().uncompress(transactions))
243    }
244
245    fn block_header_merkle_root(&self, height: &BlockHeight) -> StorageResult<Bytes32> {
246        self.storage::<FuelBlocks>().root(height).map(Into::into)
247    }
248
249    fn latest_consensus_parameters_version(
250        &self,
251    ) -> StorageResult<ConsensusParametersVersion> {
252        let (version, _) = self
253            .iter_all::<ConsensusParametersVersions>(Some(IterDirection::Reverse))
254            .next()
255            .ok_or(not_found!(ConsensusParametersVersions))??;
256
257        Ok(version)
258    }
259
260    fn latest_state_transition_bytecode_version(
261        &self,
262    ) -> StorageResult<StateTransitionBytecodeVersion> {
263        let (version, _) = self
264            .iter_all::<StateTransitionBytecodeVersions>(Some(IterDirection::Reverse))
265            .next()
266            .ok_or(not_found!(StateTransitionBytecodeVersions))??;
267
268        Ok(version)
269    }
270}
271
272impl GasPriceProvider for StaticGasPrice {
273    fn production_gas_price(&self) -> anyhow::Result<u64> {
274        Ok(self.gas_price)
275    }
276
277    fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
278        Ok(self.gas_price)
279    }
280}
281
282impl ChainStateInfoProviderTrait for ChainStateInfoProvider {
283    fn consensus_params_at_version(
284        &self,
285        version: &ConsensusParametersVersion,
286    ) -> anyhow::Result<Arc<ConsensusParameters>> {
287        Ok(self.shared_state.get_consensus_parameters(version)?)
288    }
289}