fuel_core/service/adapters/
producer.rs1use crate::{
2 database::OnChainIterableKeyValueView,
3 service::{
4 adapters::{
5 BlockProducerAdapter,
6 ChainStateInfoProvider,
7 ExecutorAdapter,
8 MaybeRelayerAdapter,
9 NewTxWaiter,
10 StaticGasPrice,
11 TransactionsSource,
12 TxPoolAdapter,
13 },
14 sub_services::BlockProducerService,
15 },
16};
17use fuel_core_producer::{
18 block_producer::gas_price::{
19 ChainStateInfoProvider as ChainStateInfoProviderTrait,
20 GasPriceProvider,
21 },
22 ports::{
23 RelayerBlockInfo,
24 TxPool,
25 },
26};
27use fuel_core_storage::{
28 Result as StorageResult,
29 StorageAsRef,
30 iter::{
31 IterDirection,
32 IteratorOverTable,
33 },
34 not_found,
35 tables::{
36 ConsensusParametersVersions,
37 FuelBlocks,
38 StateTransitionBytecodeVersions,
39 Transactions,
40 },
41 transactional::Changes,
42};
43use fuel_core_types::{
44 blockchain::{
45 block::{
46 Block,
47 CompressedBlock,
48 },
49 header::{
50 ConsensusParametersVersion,
51 StateTransitionBytecodeVersion,
52 },
53 primitives::DaBlockHeight,
54 },
55 fuel_tx::{
56 ConsensusParameters,
57 Transaction,
58 },
59 fuel_types::{
60 BlockHeight,
61 Bytes32,
62 },
63 services::{
64 block_producer::Components,
65 executor::{
66 DryRunResult,
67 Result as ExecutorResult,
68 StorageReadReplayEvent,
69 UncommittedResult,
70 },
71 },
72};
73use std::{
74 borrow::Cow,
75 sync::Arc,
76};
77use tokio::time::Instant;
78
79impl BlockProducerAdapter {
80 pub fn new(block_producer: BlockProducerService) -> Self {
81 Self {
82 block_producer: Arc::new(block_producer),
83 }
84 }
85}
86
87impl TxPool for TxPoolAdapter {
88 type TxSource = TransactionsSource;
89
90 async fn get_source(
91 &self,
92 gas_price: u64,
93 _: BlockHeight,
94 ) -> anyhow::Result<Self::TxSource> {
95 Ok(TransactionsSource::new(gas_price, self.service.clone()))
96 }
97}
98
99impl fuel_core_producer::ports::BlockProducer<TransactionsSource> for ExecutorAdapter {
100 type Deadline = Instant;
101 async fn produce_without_commit(
102 &self,
103 component: Components<TransactionsSource>,
104 deadline: Instant,
105 ) -> ExecutorResult<UncommittedResult<Changes>> {
106 let new_tx_waiter = NewTxWaiter::new(self.new_txs_watcher.clone(), deadline);
107 self.executor
108 .produce_without_commit_with_source(
109 component,
110 new_tx_waiter,
111 self.preconfirmation_sender.clone(),
112 )
113 .await
114 }
115}
116
117impl fuel_core_producer::ports::BlockProducer<Vec<Transaction>> for ExecutorAdapter {
118 type Deadline = ();
119 async fn produce_without_commit(
120 &self,
121 component: Components<Vec<Transaction>>,
122 _: (),
123 ) -> ExecutorResult<UncommittedResult<Changes>> {
124 self.produce_without_commit_from_vector(component)
125 }
126}
127
128impl fuel_core_producer::ports::DryRunner for ExecutorAdapter {
129 fn dry_run(
130 &self,
131 block: Components<Vec<Transaction>>,
132 forbid_fake_coins: Option<bool>,
133 at_height: Option<BlockHeight>,
134 record_storage_read_replay: bool,
135 ) -> ExecutorResult<DryRunResult> {
136 self.executor.dry_run(
137 block,
138 forbid_fake_coins,
139 at_height,
140 record_storage_read_replay,
141 )
142 }
143}
144
145impl fuel_core_producer::ports::StorageReadReplayRecorder for ExecutorAdapter {
146 fn storage_read_replay(
147 &self,
148 block: &Block,
149 ) -> ExecutorResult<Vec<StorageReadReplayEvent>> {
150 self.executor.storage_read_replay(block)
151 }
152}
153
154#[async_trait::async_trait]
155impl fuel_core_producer::ports::Relayer for MaybeRelayerAdapter {
156 async fn wait_for_at_least_height(
157 &self,
158 height: &DaBlockHeight,
159 ) -> anyhow::Result<DaBlockHeight> {
160 #[cfg(feature = "relayer")]
161 {
162 match &self.relayer_synced {
163 Some(sync) => {
164 sync.await_at_least_synced(height).await?;
165 let highest = sync.get_finalized_da_height();
166 Ok(highest)
167 }
168 _ => Ok(*height),
169 }
170 }
171 #[cfg(not(feature = "relayer"))]
172 {
173 anyhow::ensure!(
174 **height == 0,
175 "Cannot have a da height above zero without a relayer"
176 );
177 Ok(0u64.into())
179 }
180 }
181
182 async fn get_cost_and_transactions_number_for_block(
183 &self,
184 height: &DaBlockHeight,
185 ) -> anyhow::Result<RelayerBlockInfo> {
186 #[cfg(feature = "relayer")]
187 {
188 let (gas_cost, tx_count) = self
189 .relayer_database
190 .get_events(height)?
191 .iter()
192 .fold((0u64, 0u64), |(gas_cost, tx_count), event| {
193 let gas_cost = gas_cost.saturating_add(event.cost());
194 let tx_count = match event {
195 fuel_core_types::services::relayer::Event::Message(_) => tx_count,
196 fuel_core_types::services::relayer::Event::Transaction(_) => {
197 tx_count.saturating_add(1)
198 }
199 };
200 (gas_cost, tx_count)
201 });
202 Ok(RelayerBlockInfo { gas_cost, tx_count })
203 }
204 #[cfg(not(feature = "relayer"))]
205 {
206 anyhow::ensure!(
207 **height == 0,
208 "Cannot have a da height above zero without a relayer"
209 );
210 Ok(RelayerBlockInfo {
212 gas_cost: 0,
213 tx_count: 0,
214 })
215 }
216 }
217}
218
219impl fuel_core_producer::ports::BlockProducerDatabase for OnChainIterableKeyValueView {
220 fn latest_height(&self) -> Option<BlockHeight> {
221 self.latest_height().ok()
222 }
223
224 fn get_block(&self, height: &BlockHeight) -> StorageResult<Cow<'_, CompressedBlock>> {
225 self.storage::<FuelBlocks>()
226 .get(height)?
227 .ok_or(not_found!(FuelBlocks))
228 }
229
230 fn get_full_block(&self, height: &BlockHeight) -> StorageResult<Block> {
231 let block = self.get_block(height)?;
232 let transactions = block
233 .transactions()
234 .iter()
235 .map(|id| {
236 self.storage::<Transactions>()
237 .get(id)?
238 .ok_or(not_found!(Transactions))
239 .map(|tx| tx.into_owned())
240 })
241 .collect::<Result<Vec<_>, _>>()?;
242 Ok(block.into_owned().uncompress(transactions))
243 }
244
245 fn block_header_merkle_root(&self, height: &BlockHeight) -> StorageResult<Bytes32> {
246 self.storage::<FuelBlocks>().root(height).map(Into::into)
247 }
248
249 fn latest_consensus_parameters_version(
250 &self,
251 ) -> StorageResult<ConsensusParametersVersion> {
252 let (version, _) = self
253 .iter_all::<ConsensusParametersVersions>(Some(IterDirection::Reverse))
254 .next()
255 .ok_or(not_found!(ConsensusParametersVersions))??;
256
257 Ok(version)
258 }
259
260 fn latest_state_transition_bytecode_version(
261 &self,
262 ) -> StorageResult<StateTransitionBytecodeVersion> {
263 let (version, _) = self
264 .iter_all::<StateTransitionBytecodeVersions>(Some(IterDirection::Reverse))
265 .next()
266 .ok_or(not_found!(StateTransitionBytecodeVersions))??;
267
268 Ok(version)
269 }
270}
271
272impl GasPriceProvider for StaticGasPrice {
273 fn production_gas_price(&self) -> anyhow::Result<u64> {
274 Ok(self.gas_price)
275 }
276
277 fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
278 Ok(self.gas_price)
279 }
280}
281
282impl ChainStateInfoProviderTrait for ChainStateInfoProvider {
283 fn consensus_params_at_version(
284 &self,
285 version: &ConsensusParametersVersion,
286 ) -> anyhow::Result<Arc<ConsensusParameters>> {
287 Ok(self.shared_state.get_consensus_parameters(version)?)
288 }
289}