Skip to main content

fuel_core/service/adapters/
block_importer.rs

1use crate::{
2    database::{
3        Database,
4        commit_changes_with_height_update,
5    },
6    service::adapters::{
7        BlockImporterAdapter,
8        ExecutorAdapter,
9        VerifierAdapter,
10        consensus_module::poa::RedisLeaderLeaseAdapter,
11    },
12};
13use fuel_core_importer::{
14    Config,
15    Importer,
16    ports::{
17        BlockVerifier,
18        ImporterDatabase,
19        Validator,
20    },
21};
22use fuel_core_storage::{
23    MerkleRoot,
24    Result as StorageResult,
25    StorageAsRef,
26    iter::{
27        IterDirection,
28        IteratorOverTable,
29    },
30    tables::{
31        FuelBlocks,
32        merkle::{
33            DenseMetadataKey,
34            FuelBlockMerkleMetadata,
35        },
36    },
37    transactional::{
38        Changes,
39        StorageChanges,
40    },
41};
42use fuel_core_txpool::ports::{
43    WasmChecker,
44    WasmValidityError,
45};
46use fuel_core_types::{
47    blockchain::{
48        SealedBlock,
49        block::Block,
50        consensus::Consensus,
51    },
52    fuel_tx::Bytes32,
53    fuel_types::{
54        BlockHeight,
55        ChainId,
56    },
57    services::executor::{
58        Result as ExecutorResult,
59        UncommittedValidationResult,
60    },
61};
62use itertools::Itertools;
63use std::sync::Arc;
64
65#[allow(clippy::large_enum_variant)]
66pub enum BlockReconciliationWriteAdapter {
67    Redis(RedisLeaderLeaseAdapter),
68    Noop(NoopBlockReconciliationWriteAdapter),
69}
70
71impl BlockImporterAdapter {
72    pub fn new(
73        chain_id: ChainId,
74        config: Config,
75        database: Database,
76        executor: ExecutorAdapter,
77        verifier: VerifierAdapter,
78        block_reconciliation_write_adapter: BlockReconciliationWriteAdapter,
79    ) -> Self {
80        let database_for_height = database.clone();
81        let importer = Importer::new(
82            chain_id,
83            config,
84            database,
85            executor,
86            verifier,
87            block_reconciliation_write_adapter,
88        );
89        Self {
90            block_importer: Arc::new(importer),
91            database: database_for_height,
92        }
93    }
94
95    pub async fn execute_and_commit(
96        &self,
97        sealed_block: SealedBlock,
98    ) -> anyhow::Result<()> {
99        self.block_importer.execute_and_commit(sealed_block).await?;
100        Ok(())
101    }
102}
103
104impl BlockVerifier for VerifierAdapter {
105    fn verify_block_fields(
106        &self,
107        consensus: &Consensus,
108        block: &Block,
109    ) -> anyhow::Result<()> {
110        self.block_verifier.verify_block_fields(consensus, block)
111    }
112}
113
114#[derive(Default)]
115pub struct NoopBlockReconciliationWriteAdapter;
116
117impl fuel_core_importer::ports::BlockReconciliationWritePort
118    for BlockReconciliationWriteAdapter
119{
120    fn publish_produced_block(&self, block: &SealedBlock) -> anyhow::Result<()> {
121        match self {
122            Self::Redis(adapter) => {
123                fuel_core_importer::ports::BlockReconciliationWritePort::publish_produced_block(adapter, block)
124            }
125            Self::Noop(adapter) => {
126                fuel_core_importer::ports::BlockReconciliationWritePort::publish_produced_block(adapter, block)
127            }
128        }
129    }
130}
131
132impl fuel_core_importer::ports::BlockReconciliationWritePort
133    for NoopBlockReconciliationWriteAdapter
134{
135    fn publish_produced_block(&self, _block: &SealedBlock) -> anyhow::Result<()> {
136        Ok(())
137    }
138}
139
140impl ImporterDatabase for Database {
141    fn latest_block_height(&self) -> StorageResult<Option<BlockHeight>> {
142        self.iter_all_keys::<FuelBlocks>(Some(IterDirection::Reverse))
143            .next()
144            .transpose()
145    }
146
147    fn latest_block_root(&self) -> StorageResult<Option<MerkleRoot>> {
148        Ok(self
149            .storage_as_ref::<FuelBlockMerkleMetadata>()
150            .get(&DenseMetadataKey::Latest)?
151            .map(|cow| *cow.root()))
152    }
153
154    fn commit_changes(&mut self, changes: StorageChanges) -> StorageResult<()> {
155        commit_changes_with_height_update(self, changes, |iter| {
156            iter.iter_all_keys::<FuelBlocks>(Some(IterDirection::Reverse))
157                .try_collect()
158        })
159    }
160}
161
162impl Validator for ExecutorAdapter {
163    fn validate(
164        &self,
165        block: &Block,
166    ) -> ExecutorResult<UncommittedValidationResult<Changes>> {
167        self.executor.validate(block)
168    }
169}
170
171#[cfg(feature = "wasm-executor")]
172impl WasmChecker for ExecutorAdapter {
173    fn validate_uploaded_wasm(
174        &self,
175        wasm_root: &Bytes32,
176    ) -> Result<(), WasmValidityError> {
177        self.executor
178            .validate_uploaded_wasm(wasm_root)
179            .map_err(|err| match err {
180                fuel_core_upgradable_executor::error::UpgradableError::InvalidWasm(_) => {
181                    WasmValidityError::NotValid
182                }
183                _ => WasmValidityError::NotFound,
184            })
185    }
186}
187
188#[cfg(not(feature = "wasm-executor"))]
189impl WasmChecker for ExecutorAdapter {
190    fn validate_uploaded_wasm(
191        &self,
192        _wasm_root: &Bytes32,
193    ) -> Result<(), WasmValidityError> {
194        Err(WasmValidityError::NotEnabled)
195    }
196}