fuel_core_producer/
block_producer.rs

1use crate::{
2    Config,
3    block_producer::gas_price::{
4        ChainStateInfoProvider,
5        GasPriceProvider as GasPriceProviderConstraint,
6    },
7    ports::{
8        self,
9        BlockProducerDatabase,
10        RelayerBlockInfo,
11    },
12};
13use anyhow::{
14    Context,
15    anyhow,
16};
17use fuel_core_storage::transactional::{
18    AtomicView,
19    Changes,
20    HistoricalView,
21};
22use fuel_core_types::{
23    blockchain::{
24        block::Block,
25        header::{
26            ApplicationHeader,
27            ConsensusHeader,
28            PartialBlockHeader,
29        },
30        primitives::DaBlockHeight,
31    },
32    fuel_tx::{
33        Transaction,
34        field::{
35            InputContract,
36            MintGasPrice,
37        },
38    },
39    fuel_types::{
40        BlockHeight,
41        Bytes32,
42    },
43    services::{
44        block_producer::Components,
45        executor::{
46            DryRunResult,
47            StorageReadReplayEvent,
48            UncommittedResult,
49        },
50    },
51    tai64::Tai64,
52};
53use std::{
54    future::Future,
55    sync::Arc,
56};
57use tokio::sync::Mutex;
58use tracing::debug;
59
60#[cfg(test)]
61mod tests;
62
63pub mod gas_price;
64
65#[derive(Debug, derive_more::Display)]
66pub enum Error {
67    #[display(fmt = "Genesis block is absent")]
68    NoGenesisBlock,
69    #[display(
70        fmt = "The block height {height} should be higher than the previous block height {previous_block}"
71    )]
72    BlockHeightShouldBeHigherThanPrevious {
73        height: BlockHeight,
74        previous_block: BlockHeight,
75    },
76    #[display(fmt = "Previous block height {_0} doesn't exist")]
77    MissingBlock(BlockHeight),
78    #[display(
79        fmt = "Best finalized da_height {best} is behind previous block da_height {previous_block}"
80    )]
81    InvalidDaFinalizationState {
82        best: DaBlockHeight,
83        previous_block: DaBlockHeight,
84    },
85}
86
87impl From<Error> for anyhow::Error {
88    fn from(error: Error) -> Self {
89        anyhow::Error::msg(error)
90    }
91}
92
93pub struct Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
94{
95    pub config: Config,
96    pub view_provider: ViewProvider,
97    pub txpool: TxPool,
98    pub executor: Arc<Executor>,
99    pub relayer: Box<dyn ports::Relayer>,
100    // use a tokio lock since we want callers to yield until the previous block
101    // execution has completed (which may take a while).
102    pub lock: Mutex<()>,
103    pub gas_price_provider: GasPriceProvider,
104    pub chain_state_info_provider: ChainStateProvider,
105}
106
107impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
108    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
109where
110    ViewProvider: AtomicView + 'static,
111    ViewProvider::LatestView: BlockProducerDatabase,
112    ChainStateProvider: ChainStateInfoProvider,
113{
114    pub async fn produce_and_execute_predefined<D>(
115        &self,
116        predefined_block: &Block,
117        deadline: D,
118    ) -> anyhow::Result<UncommittedResult<Changes>>
119    where
120        Executor: ports::BlockProducer<Vec<Transaction>, Deadline = D> + 'static,
121    {
122        let _production_guard = self.lock.try_lock().map_err(|_| {
123            anyhow!("Failed to acquire the production lock, block production is already in progress")
124        })?;
125
126        let mut transactions_source = predefined_block.transactions().to_vec();
127
128        let height = predefined_block.header().consensus().height;
129
130        let block_time = predefined_block.header().consensus().time;
131
132        let da_height = predefined_block.header().da_height();
133
134        let view = self.view_provider.latest_view()?;
135
136        let header_to_produce =
137            self.new_header_with_da_height(height, block_time, da_height, &view)?;
138
139        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
140
141        if header_to_produce.height() <= &latest_height {
142            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
143                height,
144                previous_block: latest_height,
145            }
146            .into())
147        }
148
149        let maybe_mint_tx = transactions_source.pop();
150        let mint_tx =
151            maybe_mint_tx
152                .and_then(|tx| tx.as_mint().cloned())
153                .ok_or(anyhow!(
154                    "The last transaction in the block should be a mint transaction"
155                ))?;
156
157        let gas_price = *mint_tx.gas_price();
158        let coinbase_recipient = mint_tx.input_contract().contract_id;
159
160        let component = Components {
161            header_to_produce,
162            transactions_source,
163            coinbase_recipient,
164            gas_price,
165        };
166
167        let result = self
168            .executor
169            .produce_without_commit(component, deadline)
170            .await
171            .map_err(Into::<anyhow::Error>::into)
172            .with_context(|| {
173                format!("Failed to produce block {height:?} due to execution failure")
174            })?;
175
176        debug!("Produced block with result: {:?}", result.result());
177        Ok(result)
178    }
179}
180impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
181    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
182where
183    ViewProvider: AtomicView + 'static,
184    ViewProvider::LatestView: BlockProducerDatabase,
185    GasPriceProvider: GasPriceProviderConstraint,
186    ChainStateProvider: ChainStateInfoProvider,
187{
188    /// Produces and execute block for the specified height.
189    async fn produce_and_execute<TxSource, F, Deadline>(
190        &self,
191        height: BlockHeight,
192        block_time: Tai64,
193        tx_source: impl FnOnce(u64, BlockHeight) -> F,
194        deadline: Deadline,
195    ) -> anyhow::Result<UncommittedResult<Changes>>
196    where
197        Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
198        F: Future<Output = anyhow::Result<TxSource>>,
199    {
200        //  - get previous block info (hash, root, etc)
201        //  - select best da_height from relayer
202        //  - get available txs from txpool
203        //  - select best txs based on factors like:
204        //      1. fees
205        //      2. parallel throughput
206        //  - Execute block with production mode to correctly malleate txs outputs and block headers
207
208        // prevent simultaneous block production calls
209        let _production_guard = self.lock.try_lock().map_err(|_| {
210            anyhow!("Failed to acquire the production lock, block production is already in progress")
211        })?;
212
213        let gas_price = self.production_gas_price().await?;
214
215        let source = tx_source(gas_price, height).await?;
216
217        let view = self.view_provider.latest_view()?;
218
219        let header = self
220            .new_header_with_new_da_height(height, block_time, &view)
221            .await?;
222
223        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
224
225        if header.height() <= &latest_height {
226            return Err(Error::BlockHeightShouldBeHigherThanPrevious {
227                height,
228                previous_block: latest_height,
229            }
230            .into())
231        }
232
233        let component = Components {
234            header_to_produce: header,
235            transactions_source: source,
236            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
237            gas_price,
238        };
239
240        // Store the context string in case we error.
241        let context_string =
242            format!("Failed to produce block {height:?} due to execution failure");
243        let result = self
244            .executor
245            .produce_without_commit(component, deadline)
246            .await
247            .map_err(Into::<anyhow::Error>::into)
248            .context(context_string)?;
249
250        debug!("Produced block with result: {:?}", result.result());
251        Ok(result)
252    }
253
254    async fn production_gas_price(&self) -> anyhow::Result<u64> {
255        self.gas_price_provider
256            .production_gas_price()
257            .map_err(|e| anyhow!("No gas price found: {e:?}"))
258    }
259
260    async fn dry_run_gas_price(&self) -> anyhow::Result<u64> {
261        self.gas_price_provider
262            .dry_run_gas_price()
263            .map_err(|e| anyhow!("No gas price found: {e:?}"))
264    }
265}
266
267impl<
268    ViewProvider,
269    TxPool,
270    Executor,
271    TxSource,
272    GasPriceProvider,
273    ChainStateProvider,
274    Deadline,
275> Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
276where
277    ViewProvider: AtomicView + 'static,
278    ViewProvider::LatestView: BlockProducerDatabase,
279    TxPool: ports::TxPool<TxSource = TxSource> + 'static,
280    Executor: ports::BlockProducer<TxSource, Deadline = Deadline> + 'static,
281    GasPriceProvider: GasPriceProviderConstraint,
282    ChainStateProvider: ChainStateInfoProvider,
283{
284    /// Produces and execute block for the specified height with transactions from the `TxPool`.
285    pub async fn produce_and_execute_block_txpool(
286        &self,
287        height: BlockHeight,
288        block_time: Tai64,
289        deadline: Deadline,
290    ) -> anyhow::Result<UncommittedResult<Changes>> {
291        self.produce_and_execute::<TxSource, _, Deadline>(
292            height,
293            block_time,
294            |gas_price, height| self.txpool.get_source(gas_price, height),
295            deadline,
296        )
297        .await
298    }
299}
300
301impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
302    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
303where
304    ViewProvider: AtomicView + 'static,
305    ViewProvider::LatestView: BlockProducerDatabase,
306    Executor: ports::BlockProducer<Vec<Transaction>, Deadline = ()> + 'static,
307    GasPriceProvider: GasPriceProviderConstraint,
308    ChainStateProvider: ChainStateInfoProvider,
309{
310    /// Produces and execute block for the specified height with `transactions`.
311    pub async fn produce_and_execute_block_transactions(
312        &self,
313        height: BlockHeight,
314        block_time: Tai64,
315        transactions: Vec<Transaction>,
316    ) -> anyhow::Result<UncommittedResult<Changes>> {
317        self.produce_and_execute(
318            height,
319            block_time,
320            |_, _| async { Ok(transactions) },
321            (),
322        )
323        .await
324    }
325}
326
327impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
328    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
329where
330    ViewProvider: AtomicView + 'static,
331    ViewProvider::LatestView: BlockProducerDatabase,
332    Executor: ports::DryRunner + 'static,
333    GasPriceProvider: GasPriceProviderConstraint,
334    ChainStateProvider: ChainStateInfoProvider,
335{
336    /// Simulates multiple transactions without altering any state. Does not acquire the production lock.
337    /// since it is basically a "read only" operation and shouldn't get in the way of normal
338    /// production.
339    pub async fn dry_run(
340        &self,
341        transactions: Vec<Transaction>,
342        height: Option<BlockHeight>,
343        time: Option<Tai64>,
344        utxo_validation: Option<bool>,
345        gas_price: Option<u64>,
346        record_storage_reads: bool,
347    ) -> anyhow::Result<DryRunResult> {
348        let view = self.view_provider.latest_view()?;
349        let latest_height = view.latest_height().unwrap_or_default();
350
351        let simulated_height = height.unwrap_or_else(|| {
352            latest_height
353                .succ()
354                .expect("It is impossible to overflow the current block height")
355        });
356
357        let simulated_time = time.unwrap_or_else(|| {
358            view.get_block(&latest_height)
359                .map(|block| block.header().time())
360                .unwrap_or(Tai64::UNIX_EPOCH)
361        });
362
363        let header = self.new_header(simulated_height, simulated_time, &view)?;
364
365        let gas_price = if let Some(inner) = gas_price {
366            inner
367        } else {
368            self.dry_run_gas_price().await?
369        };
370
371        // The dry run execution should use the state of the blockchain based on the
372        // last available block, not on the upcoming one. It means that we need to
373        // use the same configuration as the last block -> the same DA height.
374        // It is deterministic from the result perspective, plus it is more performant
375        // because we don't need to wait for the relayer to sync.
376        let component = Components {
377            header_to_produce: header,
378            transactions_source: transactions.clone(),
379            coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(),
380            gas_price,
381        };
382
383        let executor = self.executor.clone();
384
385        // use the blocking threadpool for dry_run to avoid clogging up the main async runtime
386        let result = tokio_rayon::spawn_fifo(move || {
387            executor.dry_run(component, utxo_validation, height, record_storage_reads)
388        })
389        .await?;
390
391        if result.transactions.iter().any(|(transaction, tx_status)| {
392            transaction.is_script() && tx_status.result.receipts().is_empty()
393        }) {
394            Err(anyhow!("Expected at least one set of receipts"))
395        } else {
396            Ok(result)
397        }
398    }
399}
400
401impl<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
402    Producer<ViewProvider, TxPool, Executor, GasPriceProvider, ChainStateProvider>
403where
404    ViewProvider: HistoricalView + 'static,
405    ViewProvider::LatestView: BlockProducerDatabase,
406    Executor: ports::StorageReadReplayRecorder + 'static,
407    GasPriceProvider: GasPriceProviderConstraint,
408    ChainStateProvider: ChainStateInfoProvider,
409{
410    /// Re-executes an old block, getting the storage read events.
411    pub async fn storage_read_replay(
412        &self,
413        height: BlockHeight,
414    ) -> anyhow::Result<Vec<StorageReadReplayEvent>> {
415        let view = self.view_provider.latest_view()?;
416
417        let executor = self.executor.clone();
418
419        // use the blocking threadpool to avoid clogging up the main async runtime
420        tokio_rayon::spawn_fifo(move || {
421            let block = view.get_full_block(&height)?;
422            Ok(executor.storage_read_replay(&block)?)
423        })
424        .await
425    }
426}
427
/// Error message produced by `select_new_da_height` when the relayer is ahead
/// of the previous block's DA height but no newer DA height fits within the
/// gas and transaction limits.
pub const NO_NEW_DA_HEIGHT_FOUND: &str = "No new da_height found";
429
430impl<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
431    Producer<ViewProvider, TxPool, Executor, GP, ChainStateProvider>
432where
433    ViewProvider: AtomicView + 'static,
434    ViewProvider::LatestView: BlockProducerDatabase,
435    ChainStateProvider: ChainStateInfoProvider,
436{
437    /// Create the header for a new block at the provided height
438    async fn new_header_with_new_da_height(
439        &self,
440        height: BlockHeight,
441        block_time: Tai64,
442        view: &ViewProvider::LatestView,
443    ) -> anyhow::Result<PartialBlockHeader> {
444        let mut block_header = self.new_header(height, block_time, view)?;
445        let previous_da_height = block_header.da_height;
446        let gas_limit = self
447            .chain_state_info_provider
448            .consensus_params_at_version(&block_header.consensus_parameters_version)?
449            .block_gas_limit();
450        // We have a hard limit of u16::MAX transactions per block, including the final mint transactions.
451        // Therefore we choose the `new_da_height` to never include more than u16::MAX - 1 transactions in a block.
452        let new_da_height = self
453            .select_new_da_height(gas_limit, previous_da_height, u16::MAX - 1)
454            .await?;
455
456        block_header.application.da_height = new_da_height;
457
458        Ok(block_header)
459    }
460    /// Create the header for a new block at the provided height
461    fn new_header_with_da_height(
462        &self,
463        height: BlockHeight,
464        block_time: Tai64,
465        da_height: DaBlockHeight,
466        view: &ViewProvider::LatestView,
467    ) -> anyhow::Result<PartialBlockHeader> {
468        let mut block_header = self.new_header(height, block_time, view)?;
469        block_header.application.da_height = da_height;
470        Ok(block_header)
471    }
472
473    async fn select_new_da_height(
474        &self,
475        gas_limit: u64,
476        previous_da_height: DaBlockHeight,
477        transactions_limit: u16,
478    ) -> anyhow::Result<DaBlockHeight> {
479        let mut new_best = previous_da_height;
480        let mut total_cost: u64 = 0;
481        let transactions_limit: u64 = transactions_limit as u64;
482        let mut total_transactions: u64 = 0;
483        let highest = self
484            .relayer
485            .wait_for_at_least_height(&previous_da_height)
486            .await?;
487        if highest < previous_da_height {
488            return Err(Error::InvalidDaFinalizationState {
489                best: highest,
490                previous_block: previous_da_height,
491            }
492            .into());
493        }
494
495        if highest == previous_da_height {
496            return Ok(highest);
497        }
498
499        let next_da_height = previous_da_height.saturating_add(1);
500        for height in next_da_height..=highest.0 {
501            let RelayerBlockInfo { gas_cost, tx_count } = self
502                .relayer
503                .get_cost_and_transactions_number_for_block(&DaBlockHeight(height))
504                .await?;
505            total_cost = total_cost.saturating_add(gas_cost);
506            total_transactions = total_transactions.saturating_add(tx_count);
507            if total_cost > gas_limit || total_transactions > transactions_limit {
508                break;
509            }
510
511            new_best = DaBlockHeight(height);
512        }
513
514        if new_best == previous_da_height {
515            Err(anyhow!(NO_NEW_DA_HEIGHT_FOUND))
516        } else {
517            Ok(new_best)
518        }
519    }
520
521    fn new_header(
522        &self,
523        height: BlockHeight,
524        block_time: Tai64,
525        view: &ViewProvider::LatestView,
526    ) -> anyhow::Result<PartialBlockHeader> {
527        let previous_block_info = self.previous_block_info(height, view)?;
528        let consensus_parameters_version = view.latest_consensus_parameters_version()?;
529        let state_transition_bytecode_version =
530            view.latest_state_transition_bytecode_version()?;
531
532        Ok(PartialBlockHeader {
533            application: ApplicationHeader {
534                da_height: previous_block_info.da_height,
535                consensus_parameters_version,
536                state_transition_bytecode_version,
537                generated: Default::default(),
538            },
539            consensus: ConsensusHeader {
540                prev_root: previous_block_info.prev_root,
541                height,
542                time: block_time,
543                generated: Default::default(),
544            },
545        })
546    }
547
548    fn previous_block_info(
549        &self,
550        height: BlockHeight,
551        view: &ViewProvider::LatestView,
552    ) -> anyhow::Result<PreviousBlockInfo> {
553        let latest_height = view.latest_height().ok_or(Error::NoGenesisBlock)?;
554
555        // get info from previous block height
556        let prev_height =
557            height
558                .pred()
559                .ok_or(Error::BlockHeightShouldBeHigherThanPrevious {
560                    height: 0u32.into(),
561                    previous_block: latest_height,
562                })?;
563        let previous_block = view.get_block(&prev_height)?;
564        let prev_root = view.block_header_merkle_root(&prev_height)?;
565
566        Ok(PreviousBlockInfo {
567            prev_root,
568            da_height: previous_block.header().da_height(),
569        })
570    }
571}
572
573struct PreviousBlockInfo {
574    prev_root: Bytes32,
575    da_height: DaBlockHeight,
576}