Skip to main content

ethrex_storage/
store.rs

1#[cfg(feature = "rocksdb")]
2use crate::backend::rocksdb::RocksDBBackend;
3use crate::{
4    STORE_METADATA_FILENAME, STORE_SCHEMA_VERSION,
5    api::{
6        StorageBackend, StorageReadView,
7        tables::{
8            ACCOUNT_CODE_METADATA, ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES,
9            BLOCK_ACCESS_LISTS, BLOCK_NUMBERS, BODIES, CANONICAL_BLOCK_HASHES, CHAIN_DATA,
10            EXECUTION_WITNESSES, FULLSYNC_HEADERS, HEADERS, INVALID_CHAINS, MISC_VALUES,
11            PENDING_BLOCKS, RECEIPTS_V2, SNAP_STATE, STORAGE_FLATKEYVALUE, STORAGE_TRIE_NODES,
12            TRANSACTION_LOCATIONS,
13        },
14    },
15    apply_prefix,
16    backend::in_memory::InMemoryBackend,
17    error::StoreError,
18    layering::{TrieLayerCache, TrieWrapper},
19    rlp::{BlockBodyRLP, BlockHeaderRLP, BlockRLP},
20    trie::{BackendTrieDB, BackendTrieDBLocked},
21    utils::{ChainDataIndex, SnapStateIndex},
22};
23
24use ethrex_common::{
25    Address, H256, U256,
26    types::{
27        AccountInfo, AccountState, AccountUpdate, Block, BlockBody, BlockHash, BlockHeader,
28        BlockNumber, ChainConfig, Code, CodeMetadata, ForkId, Genesis, GenesisAccount, Index,
29        Receipt, Transaction,
30        block_access_list::BlockAccessList,
31        block_execution_witness::{ExecutionWitness, RpcExecutionWitness},
32    },
33    utils::keccak,
34};
35use ethrex_crypto::{NativeCrypto, keccak::keccak_hash};
36use ethrex_rlp::{
37    decode::{RLPDecode, decode_bytes},
38    encode::RLPEncode,
39};
40use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitness};
41use ethrex_trie::{Node, NodeRLP};
42use lru::LruCache;
43use rustc_hash::FxBuildHasher;
44use serde::{Deserialize, Serialize};
45use std::{
46    collections::{BTreeMap, HashMap, hash_map::Entry},
47    fmt::Debug,
48    io::Write,
49    path::{Path, PathBuf},
50    sync::{
51        Arc, Mutex, RwLock,
52        mpsc::{SyncSender, TryRecvError, sync_channel},
53    },
54    thread::JoinHandle,
55};
56#[cfg(feature = "rocksdb")]
57use tracing::warn;
58use tracing::{debug, error, info};
59
/// Upper bound on how many execution witnesses are retained in the database.
pub const MAX_WITNESSES: u64 = 128;

// On-disk and in-memory backends use different commit thresholds because some
// tests need access to state older than 128 blocks.
// TODO: unify these
/// Commit threshold used by on-disk backends.
// `allow(unused)`: presumably only referenced from feature-gated (rocksdb) code
// paths, so it is dead when that feature is off — TODO confirm.
#[allow(unused)]
const DB_COMMIT_THRESHOLD: usize = 128;
/// Commit threshold used by the in-memory backend.
const IN_MEMORY_COMMIT_THRESHOLD: usize = 10_000;

/// Commit threshold for batch (full sync) mode. A batch layer carries roughly
/// 1024 blocks worth of trie diffs (~1 GB), so flushing happens aggressively to
/// keep memory bounded.
const BATCH_COMMIT_THRESHOLD: usize = 4;

/// Default size in bytes of the RocksDB shared block cache: 12 GiB.
///
/// Because `cache_index_and_filter_blocks` is enabled, this one cache holds data
/// blocks as well as the index and bloom-filter blocks of every open SST file.
/// Its size is therefore the effective ceiling on RocksDB's resident memory.
///
/// 12 GiB keeps the filter/index working set resident along with hot EVM state.
/// A sweep on a synced mainnet node (32 GiB cap) found:
/// - anything from 8 to 16 GiB keeps up with head-following;
/// - going larger gives no gain, since the OS page cache backstops the
///   uncompressed state CFs;
/// - about 8 GiB is the floor where the filter set starts to thrash.
///
/// The value exceeds `u32::MAX`, so this constant requires a 64-bit `usize`.
pub const DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES: usize = 12 << 30;
84
85/// Tunable configuration for [`Store::new_with_config`] and related constructors.
86///
87/// Use [`StoreConfig::default()`] for production-tuned defaults; callers that
88/// don't need to override anything should keep calling [`Store::new`] directly.
89#[derive(Debug, Clone, Copy)]
90pub struct StoreConfig {
91    /// Total size in bytes of the RocksDB shared block cache. With
92    /// `cache_index_and_filter_blocks` enabled (the ethrex default), this cache
93    /// stores data blocks AND the per-SST index + bloom-filter blocks, so it is
94    /// the effective ceiling on RocksDB's resident memory. Lowering it below
95    /// the filter + working-set size degrades block-import performance —
96    /// see the `--rocksdb.block-cache-size` CLI flag for guidance.
97    ///
98    /// Ignored when the engine type is in-memory.
99    pub rocksdb_block_cache_size: usize,
100}
101
102impl Default for StoreConfig {
103    fn default() -> Self {
104        Self {
105            rocksdb_block_cache_size: DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES,
106        }
107    }
108}
109
/// Commands delivered to the background FlatKeyValue generator over its
/// control channel.
#[derive(Debug, PartialEq)]
enum FKVGeneratorControlMessage {
    /// Stop request for the generator.
    Stop,
    /// Continue request for the generator.
    Continue,
}
116
117// 64mb
118const CODE_CACHE_MAX_SIZE: u64 = 64 * 1024 * 1024;
119
120#[derive(Debug)]
121struct CodeCache {
122    inner_cache: LruCache<H256, Code, FxBuildHasher>,
123    cache_size: u64,
124}
125
126impl Default for CodeCache {
127    fn default() -> Self {
128        Self {
129            inner_cache: LruCache::unbounded_with_hasher(FxBuildHasher),
130            cache_size: 0,
131        }
132    }
133}
134
135impl CodeCache {
136    fn get(&mut self, code_hash: &H256) -> Result<Option<Code>, StoreError> {
137        Ok(self.inner_cache.get(code_hash).cloned())
138    }
139
140    fn insert(&mut self, code: &Code) -> Result<(), StoreError> {
141        let code_size = code.size();
142        let cache_len = self.inner_cache.len() + 1;
143        self.cache_size += code_size as u64;
144        let current_size = self.cache_size;
145        debug!(
146            "[ACCOUNT CODE CACHE] cache elements (): {cache_len}, total size: {current_size} bytes"
147        );
148
149        while self.cache_size > CODE_CACHE_MAX_SIZE {
150            if let Some((_, code)) = self.inner_cache.pop_lru() {
151                self.cache_size -= code.size() as u64;
152            } else {
153                break;
154            }
155        }
156
157        self.inner_cache.get_or_insert(code.hash, || code.clone());
158        Ok(())
159    }
160}
161
162/// Main storage interface for the ethrex client.
163///
164/// The `Store` provides a high-level API for all blockchain data operations:
165/// - Block storage and retrieval
166/// - State trie management
167/// - Account and storage queries
168/// - Transaction indexing
169///
170/// # Thread Safety
171///
172/// `Store` is `Clone` and thread-safe. All clones share the same underlying
173/// database connection and caches via `Arc`.
174///
175/// # Caching
176///
177/// The store maintains several caches for performance:
178/// - **Trie Layer Cache**: Recent trie nodes for fast state access
179/// - **Code Cache**: LRU cache for contract bytecode (64MB default)
180/// - **Latest Block Cache**: Cached latest block header for RPC
181///
182/// # Example
183///
184/// ```ignore
185/// let store = Store::new("./data", EngineType::RocksDB)?;
186///
187/// // Add a block
188/// store.add_block(block).await?;
189///
190/// // Query account balance
191/// let info = store.get_account_info(block_number, address)?;
192/// let balance = info.map(|a| a.balance).unwrap_or_default();
193/// ```
194#[derive(Debug, Clone)]
195pub struct Store {
196    /// Path to the database directory.
197    db_path: PathBuf,
198    /// Storage backend (InMemory or RocksDB).
199    backend: Arc<dyn StorageBackend>,
200    /// Chain configuration (fork schedule, chain ID, etc.).
201    chain_config: ChainConfig,
202    /// Cache for trie nodes from recent blocks.
203    trie_cache: Arc<RwLock<Arc<TrieLayerCache>>>,
204    /// Channel for controlling the FlatKeyValue generator background task.
205    flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
206    /// Channel for sending trie updates (and idle pings) to the background worker.
207    trie_update_worker_tx: std::sync::mpsc::SyncSender<TrieMessage>,
208    /// Cached latest canonical block header.
209    ///
210    /// Wrapped in Arc for cheap reads with infrequent writes.
211    /// May be slightly out of date, which is acceptable for:
212    /// - Caching frequently requested headers
213    /// - RPC "latest" block queries (small delay acceptable)
214    /// - Sync operations (must be idempotent anyway)
215    latest_block_header: LatestBlockHeaderCache,
216    /// Last computed FlatKeyValue for incremental updates.
217    last_computed_flatkeyvalue: Arc<RwLock<Vec<u8>>>,
218
219    /// Cache for account bytecodes, keyed by the bytecode hash.
220    /// Note that we don't remove entries on account code changes, since
221    /// those changes already affect the code hash stored in the account, and only
222    /// may result in this cache having useless data.
223    account_code_cache: Arc<Mutex<CodeCache>>,
224
225    /// Cache for code metadata (code length), keyed by the bytecode hash.
226    /// Uses FxHashMap for efficient lookups, much smaller than code cache.
227    code_metadata_cache: Arc<Mutex<rustc_hash::FxHashMap<H256, CodeMetadata>>>,
228
229    /// Serializes concurrent `forkchoice_update` callers so that the cache
230    /// update and the DB write transaction remain mutually ordered.
231    fcu_lock: Arc<tokio::sync::Mutex<()>>,
232
233    background_threads: Arc<ThreadList>,
234}
235
/// Owns the join handles of background threads.
///
/// Dropping the list blocks until every registered thread has finished. A
/// thread that panicked is ignored rather than having its panic propagated.
#[derive(Debug, Default)]
struct ThreadList {
    list: Vec<JoinHandle<()>>,
}

impl Drop for ThreadList {
    fn drop(&mut self) {
        // Join in registration order.
        // The join result (Err if the thread panicked) is discarded on purpose.
        std::mem::take(&mut self.list)
            .into_iter()
            .for_each(|handle| {
                let _ = handle.join();
            });
    }
}
248
249/// Storage trie nodes grouped by account address hash.
250///
251/// Each entry contains the hashed account address and the trie nodes
252/// for that account's storage trie.
253pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>;
254type StorageTries = HashMap<Address, (TrieWitness, Trie)>;
255
/// Chooses the storage backend a [`Store`] is built on.
///
/// Passed to the [`Store`] constructors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EngineType {
    /// Volatile, memory-only storage; nothing is persisted. Meant for tests.
    InMemory,
    /// Persistent RocksDB-backed storage, meant for production.
    /// Only available with the `rocksdb` feature.
    #[cfg(feature = "rocksdb")]
    RocksDB,
}
267
268/// Batch of updates to apply to the store atomically.
269///
270/// Used during block execution to collect all state changes before
271/// committing them to the database in a single transaction.
272pub struct UpdateBatch {
273    /// New nodes to add to the state trie.
274    pub account_updates: Vec<TrieNode>,
275    /// Storage trie updates per account (keyed by hashed address).
276    pub storage_updates: Vec<(H256, Vec<TrieNode>)>,
277    /// Blocks to store.
278    pub blocks: Vec<Block>,
279    /// Receipts to store, grouped by block hash.
280    pub receipts: Vec<(H256, Vec<Receipt>)>,
281    /// Contract code updates (code hash -> bytecode).
282    pub code_updates: Vec<(H256, Code)>,
283    /// Whether this batch comes from full sync (batch execution mode).
284    /// When true, uses `BATCH_COMMIT_THRESHOLD` (aggressive) instead of
285    /// `DB_COMMIT_THRESHOLD` to bound memory during bulk block import.
286    pub batch_mode: bool,
287}
288
289/// Storage trie updates grouped by account address hash.
290pub type StorageUpdates = Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>;
291
292/// Collection of account state changes from block execution.
293///
294/// Contains all the data needed to update the state trie after
295/// executing a block: account updates, storage updates, and code deployments.
296pub struct AccountUpdatesList {
297    /// Root hash of the state trie after applying these updates.
298    pub state_trie_hash: H256,
299    /// State trie node updates (path -> RLP-encoded node).
300    pub state_updates: Vec<(Nibbles, Vec<u8>)>,
301    /// Storage trie updates per account.
302    pub storage_updates: StorageUpdates,
303    /// New contract bytecode deployments.
304    pub code_updates: Vec<(H256, Code)>,
305}
306
307/// Encodes a tx-location entry as the operand passed to `merge_cf`.
308///
309/// The operand uses the **same encoding as the stored value** — a
310/// `Vec<(BlockNumber, BlockHash, Index)>` with a single element. This is
311/// required for an *associative* merge operator: RocksDB folds operands
312/// together with PartialMerge (during compaction, without a base value), and
313/// the result becomes an operand for a later merge. If the operand format
314/// differed from the merge output (e.g. operand = bare tuple, output = Vec),
315/// the re-fed result would fail to decode and entries would be silently
316/// dropped. Keeping both as `Vec` makes the merge truly associative.
317pub(crate) fn encode_tx_location_operand(
318    block_number: BlockNumber,
319    block_hash: BlockHash,
320    index: Index,
321) -> Vec<u8> {
322    vec![(block_number, block_hash, index)].encode_to_vec()
323}
324
325/// Merge function for the `TRANSACTION_LOCATIONS` column family.
326///
327/// The CF is keyed by tx hash and stores `Vec<(BlockNumber, BlockHash, Index)>` —
328/// one entry per block the tx appears in (multiple only on reorgs). Both the
329/// stored value and every merge operand are encoded as this same `Vec` type.
330/// Writers call `merge_cf` with a single-element `Vec` operand per tx; RocksDB
331/// invokes this function on `get_cf` and during compaction to fold the base
332/// value plus pending operands into one `Vec`. Within the fold, a later entry
333/// replaces any earlier entry with the same `block_hash`, preserving the dedupe
334/// semantics of the previous composite-key schema.
335///
336/// Associativity: because input operands and the output are the same `Vec`
337/// format, PartialMerge (operand-only folding) produces a valid operand that
338/// can be re-merged later. This is essential for correctness — see
339/// `encode_tx_location_operand`.
340///
341/// Used by:
342/// - The RocksDB backend, via `set_merge_operator_associative`.
343/// - The InMemory backend, which dispatches by table name and applies this
344///   inline at `merge()` call time.
345///
346/// Why merge instead of read-modify-write: a per-tx `get_cf` on the write path
347/// is expensive (~5–20 ms/block on mainnet, dominated by the per-tx point
348/// lookup, not fully fixable by a bloom filter). With merge, the write path is
349/// a pure `merge_cf` append (no read), and consolidation is deferred to
350/// compaction or the next read. As a bonus the merge is atomic at the RocksDB
351/// level — no serialized-writer assumption is needed at the application layer.
352///
353/// Failure mode: any RLP decode error (base value or operand) returns `None`,
354/// which makes RocksDB treat the merge as failed and surface a corruption error
355/// on the affected key rather than silently dropping locations. We prefer
356/// failing loud — a corrupt operand signals real DB damage, and a silently
357/// half-populated `Vec` committed at the next compaction would be undetectable.
358pub fn tx_locations_merge(
359    existing: Option<&[u8]>,
360    operands: impl IntoIterator<Item = impl AsRef<[u8]>>,
361) -> Option<Vec<u8>> {
362    // Fold one RLP-encoded `Vec` chunk into `list`, deduping by block_hash
363    // (later entry wins). Returns false on decode failure so the caller can
364    // abort the whole merge.
365    fn fold_chunk(
366        list: &mut Vec<(BlockNumber, BlockHash, Index)>,
367        bytes: &[u8],
368        what: &str,
369    ) -> bool {
370        match <Vec<(BlockNumber, BlockHash, Index)>>::decode(bytes) {
371            Ok(entries) => {
372                for (bn, bh, idx) in entries {
373                    list.retain(|(_, existing_bh, _)| *existing_bh != bh);
374                    list.push((bn, bh, idx));
375                }
376                true
377            }
378            Err(e) => {
379                error!(
380                    "tx_locations_merge: failed to decode {what} ({} bytes): {e}; \
381                     aborting merge to avoid silent data loss",
382                    bytes.len()
383                );
384                false
385            }
386        }
387    }
388
389    let mut list: Vec<(BlockNumber, BlockHash, Index)> = Vec::new();
390
391    // Order matters: RocksDB delivers operands oldest-first.
392    if let Some(bytes) = existing
393        && !fold_chunk(&mut list, bytes, "existing value")
394    {
395        return None;
396    }
397    for op in operands {
398        if !fold_chunk(&mut list, op.as_ref(), "operand") {
399            return None;
400        }
401    }
402    Some(list.encode_to_vec())
403}
404
405impl Store {
406    /// Block until the trie-update background worker has drained every prior
407    /// message and is waiting for new work — i.e. Phase 2 (disk write of the
408    /// bottom-most diff layer) and Phase 3 (in-memory layer removal) for all
409    /// previously-applied updates have completed.
410    ///
411    /// Implementation: the worker channel is `sync_channel(0)`, so a send only
412    /// returns once the worker calls `recv()` on the next loop iteration.
413    /// `TrieMessage::Ping` carries no work, so the send completing is itself
414    /// the idle signal.
415    ///
416    /// Caller's responsibility: hold off other senders to `trie_update_worker_tx`
417    /// while this is in flight. Under concurrent producers the rendezvous
418    /// guarantee degrades to "the prior message has been drained", not
419    /// "persistence is idle going forward" — a racing `Update` from another
420    /// thread can be in-flight by the time this returns.
421    pub async fn wait_for_persistence_idle(&self) -> Result<(), StoreError> {
422        let tx = self.trie_update_worker_tx.clone();
423        tokio::task::spawn_blocking(move || tx.send(TrieMessage::Ping))
424            .await
425            .map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle join: {e}")))?
426            .map_err(|e| StoreError::Custom(format!("wait_for_persistence_idle send: {e}")))
427    }
428
429    /// Add a block in a single transaction.
430    /// This will store -> BlockHeader, BlockBody, BlockTransactions, BlockNumber.
431    pub async fn add_block(&self, block: Block) -> Result<(), StoreError> {
432        self.add_blocks(vec![block]).await
433    }
434
435    /// Add a batch of blocks in a single transaction.
436    /// This will store -> BlockHeader, BlockBody, BlockTransactions, BlockNumber.
437    pub async fn add_blocks(&self, blocks: Vec<Block>) -> Result<(), StoreError> {
438        let db = self.backend.clone();
439        tokio::task::spawn_blocking(move || {
440            let mut tx = db.begin_write()?;
441
442            // TODO: Same logic in apply_updates
443            for block in blocks {
444                let block_number = block.header.number;
445                let block_hash = block.hash();
446                let hash_key = block_hash.encode_to_vec();
447
448                let header_value_rlp = BlockHeaderRLP::from(block.header.clone());
449                tx.put(HEADERS, &hash_key, header_value_rlp.bytes())?;
450
451                let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec());
452                tx.put(BODIES, &hash_key, body_value.bytes())?;
453
454                tx.put(BLOCK_NUMBERS, &hash_key, &block_number.to_le_bytes())?;
455
456                for (index, transaction) in block.body.transactions.iter().enumerate() {
457                    tx.merge(
458                        TRANSACTION_LOCATIONS,
459                        transaction.hash().as_bytes(),
460                        &encode_tx_location_operand(block_number, block_hash, index as u64),
461                    )?;
462                }
463            }
464
465            tx.commit()
466        })
467        .await
468        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
469    }
470
471    /// Add block header
472    pub async fn add_block_header(
473        &self,
474        block_hash: BlockHash,
475        block_header: BlockHeader,
476    ) -> Result<(), StoreError> {
477        let hash_key = block_hash.encode_to_vec();
478        let header_value = BlockHeaderRLP::from(block_header).into_vec();
479        self.write_async(HEADERS, hash_key, header_value).await
480    }
481
482    /// Add a batch of block headers
483    pub async fn add_block_headers(
484        &self,
485        block_headers: Vec<BlockHeader>,
486    ) -> Result<(), StoreError> {
487        let mut txn = self.backend.begin_write()?;
488
489        for header in block_headers {
490            let block_hash = header.hash();
491            let block_number = header.number;
492            let hash_key = block_hash.encode_to_vec();
493            let header_value = BlockHeaderRLP::from(header).into_vec();
494
495            txn.put(HEADERS, &hash_key, &header_value)?;
496
497            let number_key = block_number.to_le_bytes().to_vec();
498            txn.put(BLOCK_NUMBERS, &hash_key, &number_key)?;
499        }
500        txn.commit()?;
501        Ok(())
502    }
503
504    /// Obtain canonical block header
505    pub fn get_block_header(
506        &self,
507        block_number: BlockNumber,
508    ) -> Result<Option<BlockHeader>, StoreError> {
509        let latest = self.latest_block_header.get();
510        if block_number == latest.number {
511            return Ok(Some((*latest).clone()));
512        }
513        self.load_block_header(block_number)
514    }
515
516    /// Add block body
517    pub async fn add_block_body(
518        &self,
519        block_hash: BlockHash,
520        block_body: BlockBody,
521    ) -> Result<(), StoreError> {
522        let hash_key = block_hash.encode_to_vec();
523        let body_value = BlockBodyRLP::from(block_body).into_vec();
524        self.write_async(BODIES, hash_key, body_value).await
525    }
526
527    /// Obtain canonical block body
528    pub async fn get_block_body(
529        &self,
530        block_number: BlockNumber,
531    ) -> Result<Option<BlockBody>, StoreError> {
532        let Some(block_hash) = self.get_canonical_block_hash_sync(block_number)? else {
533            return Ok(None);
534        };
535
536        self.get_block_body_by_hash(block_hash).await
537    }
538
539    /// Remove canonical block
540    pub async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
541        let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
542            return Ok(());
543        };
544
545        let backend = self.backend.clone();
546        tokio::task::spawn_blocking(move || {
547            let hash_key = hash.encode_to_vec();
548
549            let mut txn = backend.begin_write()?;
550            txn.delete(
551                CANONICAL_BLOCK_HASHES,
552                block_number.to_le_bytes().as_slice(),
553            )?;
554            txn.delete(BODIES, &hash_key)?;
555            txn.delete(HEADERS, &hash_key)?;
556            txn.delete(BLOCK_NUMBERS, &hash_key)?;
557            txn.commit()
558        })
559        .await
560        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
561    }
562
563    /// Obtain canonical block bodies in from..=to
564    pub async fn get_block_bodies(
565        &self,
566        from: BlockNumber,
567        to: BlockNumber,
568    ) -> Result<Vec<Option<BlockBody>>, StoreError> {
569        // TODO: Implement read bulk
570        let backend = self.backend.clone();
571        tokio::task::spawn_blocking(move || {
572            let numbers: Vec<BlockNumber> = (from..=to).collect();
573            let mut block_bodies = Vec::new();
574
575            let txn = backend.begin_read()?;
576            for number in numbers {
577                let Some(hash) = txn
578                    .get(CANONICAL_BLOCK_HASHES, number.to_le_bytes().as_slice())?
579                    .map(|bytes| H256::decode(bytes.as_slice()))
580                    .transpose()?
581                else {
582                    block_bodies.push(None);
583                    continue;
584                };
585                let hash_key = hash.encode_to_vec();
586                let block_body_opt = txn
587                    .get(BODIES, &hash_key)?
588                    .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
589                    .transpose()
590                    .map_err(StoreError::from)?;
591
592                block_bodies.push(block_body_opt);
593            }
594
595            Ok(block_bodies)
596        })
597        .await
598        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
599    }
600
601    /// Obtain block bodies from a list of hashes
602    pub async fn get_block_bodies_by_hash(
603        &self,
604        hashes: Vec<BlockHash>,
605    ) -> Result<Vec<BlockBody>, StoreError> {
606        let backend = self.backend.clone();
607        // TODO: Implement read bulk
608        tokio::task::spawn_blocking(move || {
609            let txn = backend.begin_read()?;
610            let mut block_bodies = Vec::new();
611            for hash in hashes {
612                let hash_key = hash.encode_to_vec();
613
614                let Some(block_body) = txn
615                    .get(BODIES, &hash_key)?
616                    .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
617                    .transpose()
618                    .map_err(StoreError::from)?
619                else {
620                    return Err(StoreError::Custom(format!(
621                        "Block body not found for hash: {hash}"
622                    )));
623                };
624                block_bodies.push(block_body);
625            }
626            Ok(block_bodies)
627        })
628        .await
629        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
630    }
631
632    /// Obtain any block body using the hash
633    pub async fn get_block_body_by_hash(
634        &self,
635        block_hash: BlockHash,
636    ) -> Result<Option<BlockBody>, StoreError> {
637        self.read_async(BODIES, block_hash.encode_to_vec())
638            .await?
639            .map(|bytes| BlockBodyRLP::from_bytes(bytes).to())
640            .transpose()
641            .map_err(StoreError::from)
642    }
643
644    pub fn get_block_header_by_hash(
645        &self,
646        block_hash: BlockHash,
647    ) -> Result<Option<BlockHeader>, StoreError> {
648        let latest = self.latest_block_header.get();
649        if block_hash == latest.hash() {
650            return Ok(Some((*latest).clone()));
651        }
652        self.load_block_header_by_hash(block_hash)
653    }
654
655    pub fn add_pending_block(&self, block: Block) -> Result<(), StoreError> {
656        let block_hash = block.hash();
657        let block_value = BlockRLP::from(block).into_vec();
658        self.write(PENDING_BLOCKS, block_hash.as_bytes().to_vec(), block_value)
659    }
660
661    pub async fn get_pending_block(
662        &self,
663        block_hash: BlockHash,
664    ) -> Result<Option<Block>, StoreError> {
665        self.read_async(PENDING_BLOCKS, block_hash.as_bytes().to_vec())
666            .await?
667            .map(|bytes| BlockRLP::from_bytes(bytes).to())
668            .transpose()
669            .map_err(StoreError::from)
670    }
671
672    /// Add block number for a given hash
673    pub async fn add_block_number(
674        &self,
675        block_hash: BlockHash,
676        block_number: BlockNumber,
677    ) -> Result<(), StoreError> {
678        let number_value = block_number.to_le_bytes().to_vec();
679        self.write_async(BLOCK_NUMBERS, block_hash.encode_to_vec(), number_value)
680            .await
681    }
682
683    /// Obtain block number for a given hash
684    pub async fn get_block_number(
685        &self,
686        block_hash: BlockHash,
687    ) -> Result<Option<BlockNumber>, StoreError> {
688        self.read_async(BLOCK_NUMBERS, block_hash.encode_to_vec())
689            .await?
690            .map(|bytes| -> Result<BlockNumber, StoreError> {
691                let array: [u8; 8] = bytes
692                    .try_into()
693                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
694                Ok(BlockNumber::from_le_bytes(array))
695            })
696            .transpose()
697    }
698
699    /// Store transaction location (block number and index of the transaction within the block)
700    pub async fn add_transaction_location(
701        &self,
702        transaction_hash: H256,
703        block_number: BlockNumber,
704        block_hash: BlockHash,
705        index: Index,
706    ) -> Result<(), StoreError> {
707        self.add_transaction_locations(vec![(transaction_hash, block_number, block_hash, index)])
708            .await
709    }
710
711    /// Store transaction locations in batch (one db transaction for all)
712    pub async fn add_transaction_locations(
713        &self,
714        locations: Vec<(H256, BlockNumber, BlockHash, Index)>,
715    ) -> Result<(), StoreError> {
716        let db = self.backend.clone();
717        tokio::task::spawn_blocking(move || {
718            let mut tx = db.begin_write()?;
719            for (tx_hash, block_number, block_hash, index) in locations {
720                tx.merge(
721                    TRANSACTION_LOCATIONS,
722                    tx_hash.as_bytes(),
723                    &encode_tx_location_operand(block_number, block_hash, index),
724                )?;
725            }
726            tx.commit()
727        })
728        .await
729        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
730    }
731
732    /// Obtain transaction location (block hash and index)
733    pub async fn get_transaction_location(
734        &self,
735        transaction_hash: H256,
736    ) -> Result<Option<(BlockNumber, BlockHash, Index)>, StoreError> {
737        let db = self.backend.clone();
738        tokio::task::spawn_blocking(move || {
739            let tx = db.begin_read()?;
740            let Some(bytes) = tx.get(TRANSACTION_LOCATIONS, transaction_hash.as_bytes())? else {
741                return Ok(None);
742            };
743            let locations = <Vec<(BlockNumber, BlockHash, Index)>>::decode(&bytes)?;
744
745            // In the absence of reorgs, locations has exactly one entry.
746            // If multiple, filter by the canonical chain.
747            for (block_number, block_hash, index) in locations {
748                let canonical_hash = tx
749                    .get(
750                        CANONICAL_BLOCK_HASHES,
751                        block_number.to_le_bytes().as_slice(),
752                    )?
753                    .map(|bytes| H256::decode(bytes.as_slice()))
754                    .transpose()?;
755
756                if canonical_hash == Some(block_hash) {
757                    return Ok(Some((block_number, block_hash, index)));
758                }
759            }
760
761            Ok(None)
762        })
763        .await
764        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
765    }
766
767    /// Add receipt
768    pub async fn add_receipt(
769        &self,
770        block_hash: BlockHash,
771        index: Index,
772        receipt: Receipt,
773    ) -> Result<(), StoreError> {
774        let key = receipt_key(&block_hash, index);
775        let value = receipt.encode_to_vec();
776        self.write_async(RECEIPTS_V2, key, value).await
777    }
778
779    /// Add receipts
780    pub async fn add_receipts(
781        &self,
782        block_hash: BlockHash,
783        receipts: Vec<Receipt>,
784    ) -> Result<(), StoreError> {
785        let batch_items: Vec<_> = receipts
786            .into_iter()
787            .enumerate()
788            .map(|(index, receipt)| {
789                let key = receipt_key(&block_hash, index as u64);
790                let value = receipt.encode_to_vec();
791                (key, value)
792            })
793            .collect();
794        self.write_batch_async(RECEIPTS_V2, batch_items).await
795    }
796
797    /// Obtain receipt for a canonical block represented by the block number.
798    pub async fn get_receipt(
799        &self,
800        block_number: BlockNumber,
801        index: Index,
802    ) -> Result<Option<Receipt>, StoreError> {
803        // FIXME (#4353)
804        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
805            return Ok(None);
806        };
807        self.get_receipt_by_block_hash(block_hash, index).await
808    }
809
810    /// Obtain receipt by block hash and index
811    async fn get_receipt_by_block_hash(
812        &self,
813        block_hash: BlockHash,
814        index: Index,
815    ) -> Result<Option<Receipt>, StoreError> {
816        let key = receipt_key(&block_hash, index);
817        self.read_async(RECEIPTS_V2, key)
818            .await?
819            .map(|bytes| Receipt::decode(bytes.as_slice()))
820            .transpose()
821            .map_err(StoreError::from)
822    }
823
824    /// Get account code by its hash.
825    ///
826    /// Check if the code exists in the cache (attribute `account_code_cache`), if not,
827    /// reads the database, and if it exists, decodes and returns it.
828    pub fn get_account_code(&self, code_hash: H256) -> Result<Option<Code>, StoreError> {
829        // check cache first
830        if let Some(code) = self
831            .account_code_cache
832            .lock()
833            .map_err(|_| StoreError::LockError)?
834            .get(&code_hash)?
835        {
836            return Ok(Some(code));
837        }
838
839        let Some(bytes) = self
840            .backend
841            .begin_read()?
842            .get(ACCOUNT_CODES, code_hash.as_bytes())?
843        else {
844            return Ok(None);
845        };
846        let (bytecode_slice, targets) = decode_bytes(&bytes)?;
847        let code = Code::from_parts_unchecked(
848            code_hash,
849            bytecode_slice,
850            <Vec<u32>>::decode(targets)?.into(),
851        );
852
853        // insert into cache and evict if needed
854        self.account_code_cache
855            .lock()
856            .map_err(|_| StoreError::LockError)?
857            .insert(&code)?;
858
859        Ok(Some(code))
860    }
861
862    /// Check if account code exists by its hash, without constructing the full `Code` struct.
863    /// More efficient than `get_account_code` for existence checks since it skips
864    /// RLP decoding and `Code` struct construction (no `jump_targets` deserialization).
865    /// Note: The underlying `get()` still reads the value from RocksDB (including blob files).
866    pub fn code_exists(&self, code_hash: H256) -> Result<bool, StoreError> {
867        // Check cache first
868        if self
869            .account_code_cache
870            .lock()
871            .map_err(|_| StoreError::LockError)?
872            .get(&code_hash)?
873            .is_some()
874        {
875            return Ok(true);
876        }
877        // Check DB without reading the full value
878        Ok(self
879            .backend
880            .begin_read()?
881            .get(ACCOUNT_CODES, code_hash.as_bytes())?
882            .is_some())
883    }
884
885    /// Get code metadata (length) by its hash.
886    ///
887    /// Checks cache first, falls back to database. If metadata is missing,
888    /// falls back to loading full code and extracts length (auto-migration).
889    pub fn get_code_metadata(&self, code_hash: H256) -> Result<Option<CodeMetadata>, StoreError> {
890        use ethrex_common::constants::EMPTY_KECCAK_HASH;
891
892        // Empty code special case
893        if code_hash == *EMPTY_KECCAK_HASH {
894            return Ok(Some(CodeMetadata { length: 0 }));
895        }
896
897        // Check cache first
898        if let Some(metadata) = self
899            .code_metadata_cache
900            .lock()
901            .map_err(|_| StoreError::LockError)?
902            .get(&code_hash)
903            .copied()
904        {
905            return Ok(Some(metadata));
906        }
907
908        // Try reading from metadata table
909        let metadata = if let Some(bytes) = self
910            .backend
911            .begin_read()?
912            .get(ACCOUNT_CODE_METADATA, code_hash.as_bytes())?
913        {
914            let length =
915                u64::from_be_bytes(bytes.try_into().map_err(|_| {
916                    StoreError::Custom("Invalid metadata length encoding".to_string())
917                })?);
918            CodeMetadata { length }
919        } else {
920            // Fallback: load full code and extract length (auto-migration)
921            let Some(code) = self.get_account_code(code_hash)? else {
922                return Ok(None);
923            };
924            let metadata = CodeMetadata {
925                length: code.len() as u64,
926            };
927
928            // Write metadata for future use (async, fire and forget)
929            let metadata_buf = metadata.length.to_be_bytes().to_vec();
930            let hash_key = code_hash.0.to_vec();
931            let backend = self.backend.clone();
932            tokio::task::spawn(async move {
933                if let Err(e) = async {
934                    let mut tx = backend.begin_write()?;
935                    tx.put(ACCOUNT_CODE_METADATA, &hash_key, &metadata_buf)?;
936                    tx.commit()
937                }
938                .await
939                {
940                    tracing::warn!("Failed to write code metadata during auto-migration: {}", e);
941                }
942            });
943
944            metadata
945        };
946
947        // Update cache
948        self.code_metadata_cache
949            .lock()
950            .map_err(|_| StoreError::LockError)?
951            .insert(code_hash, metadata);
952
953        Ok(Some(metadata))
954    }
955
956    /// Add account code
957    pub async fn add_account_code(&self, code: Code) -> Result<(), StoreError> {
958        let hash_key = code.hash.0.to_vec();
959        let buf = encode_code(&code);
960        let metadata_buf = (code.len() as u64).to_be_bytes();
961
962        // Write both code and metadata atomically
963        let backend = self.backend.clone();
964        tokio::task::spawn_blocking(move || {
965            let mut tx = backend.begin_write()?;
966            tx.put(ACCOUNT_CODES, &hash_key, &buf)?;
967            tx.put(ACCOUNT_CODE_METADATA, &hash_key, &metadata_buf)?;
968            tx.commit()
969        })
970        .await
971        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
972    }
973
974    /// Clears all checkpoint data created during the last snap sync
975    pub async fn clear_snap_state(&self) -> Result<(), StoreError> {
976        let db = self.backend.clone();
977        tokio::task::spawn_blocking(move || db.clear_table(SNAP_STATE))
978            .await
979            .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
980    }
981
982    pub async fn get_transaction_by_hash(
983        &self,
984        transaction_hash: H256,
985    ) -> Result<Option<Transaction>, StoreError> {
986        let (_block_number, block_hash, index) =
987            match self.get_transaction_location(transaction_hash).await? {
988                Some(location) => location,
989                None => return Ok(None),
990            };
991        self.get_transaction_by_location(block_hash, index).await
992    }
993
994    pub async fn get_transaction_by_location(
995        &self,
996        block_hash: H256,
997        index: u64,
998    ) -> Result<Option<Transaction>, StoreError> {
999        let block_body = match self.get_block_body_by_hash(block_hash).await? {
1000            Some(body) => body,
1001            None => return Ok(None),
1002        };
1003        let index: usize = index.try_into()?;
1004        Ok(block_body.transactions.get(index).cloned())
1005    }
1006
1007    pub async fn get_block_by_hash(
1008        &self,
1009        block_hash: BlockHash,
1010    ) -> Result<Option<Block>, StoreError> {
1011        let header = match self.get_block_header_by_hash(block_hash)? {
1012            Some(header) => header,
1013            None => return Ok(None),
1014        };
1015        let body = match self.get_block_body_by_hash(block_hash).await? {
1016            Some(body) => body,
1017            None => return Ok(None),
1018        };
1019        Ok(Some(Block::new(header, body)))
1020    }
1021
1022    pub async fn get_block_by_number(
1023        &self,
1024        block_number: BlockNumber,
1025    ) -> Result<Option<Block>, StoreError> {
1026        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1027            return Ok(None);
1028        };
1029        self.get_block_by_hash(block_hash).await
1030    }
1031
1032    // Get the canonical block hash for a given block number.
1033    pub async fn get_canonical_block_hash(
1034        &self,
1035        block_number: BlockNumber,
1036    ) -> Result<Option<BlockHash>, StoreError> {
1037        let last = self.latest_block_header.get();
1038        if last.number == block_number {
1039            return Ok(Some(last.hash()));
1040        }
1041        let backend = self.backend.clone();
1042        tokio::task::spawn_blocking(move || {
1043            backend
1044                .begin_read()?
1045                .get(
1046                    CANONICAL_BLOCK_HASHES,
1047                    block_number.to_le_bytes().as_slice(),
1048                )?
1049                .map(|bytes| H256::decode(bytes.as_slice()))
1050                .transpose()
1051                .map_err(StoreError::from)
1052        })
1053        .await
1054        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1055    }
1056
1057    /// Stores the chain configuration values, should only be called once after reading the genesis file
1058    /// Ignores previously stored values if present
1059    pub async fn set_chain_config(&mut self, chain_config: &ChainConfig) -> Result<(), StoreError> {
1060        self.chain_config = *chain_config;
1061        let key = chain_data_key(ChainDataIndex::ChainConfig);
1062        let value = serde_json::to_string(chain_config)
1063            .map_err(|_| StoreError::Custom("Failed to serialize chain config".to_string()))?
1064            .into_bytes();
1065        self.write_async(CHAIN_DATA, key, value).await
1066    }
1067
1068    /// Update earliest block number
1069    pub async fn update_earliest_block_number(
1070        &self,
1071        block_number: BlockNumber,
1072    ) -> Result<(), StoreError> {
1073        let key = chain_data_key(ChainDataIndex::EarliestBlockNumber);
1074        let value = block_number.to_le_bytes().to_vec();
1075        self.write_async(CHAIN_DATA, key, value).await
1076    }
1077
1078    /// Obtain earliest block number
1079    pub async fn get_earliest_block_number(&self) -> Result<BlockNumber, StoreError> {
1080        let key = chain_data_key(ChainDataIndex::EarliestBlockNumber);
1081        self.read_async(CHAIN_DATA, key)
1082            .await?
1083            .map(|bytes| -> Result<BlockNumber, StoreError> {
1084                let array: [u8; 8] = bytes
1085                    .try_into()
1086                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1087                Ok(BlockNumber::from_le_bytes(array))
1088            })
1089            .ok_or(StoreError::MissingEarliestBlockNumber)?
1090    }
1091
1092    /// Obtain finalized block number
1093    pub async fn get_finalized_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1094        let key = chain_data_key(ChainDataIndex::FinalizedBlockNumber);
1095        self.read_async(CHAIN_DATA, key)
1096            .await?
1097            .map(|bytes| -> Result<BlockNumber, StoreError> {
1098                let array: [u8; 8] = bytes
1099                    .try_into()
1100                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1101                Ok(BlockNumber::from_le_bytes(array))
1102            })
1103            .transpose()
1104    }
1105
1106    /// Obtain safe block number
1107    pub async fn get_safe_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1108        let key = chain_data_key(ChainDataIndex::SafeBlockNumber);
1109        self.read_async(CHAIN_DATA, key)
1110            .await?
1111            .map(|bytes| -> Result<BlockNumber, StoreError> {
1112                let array: [u8; 8] = bytes
1113                    .try_into()
1114                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1115                Ok(BlockNumber::from_le_bytes(array))
1116            })
1117            .transpose()
1118    }
1119
1120    /// Obtain latest block number
1121    pub async fn get_latest_block_number(&self) -> Result<BlockNumber, StoreError> {
1122        Ok(self.latest_block_header.get().number)
1123    }
1124
1125    /// Update pending block number
1126    pub async fn update_pending_block_number(
1127        &self,
1128        block_number: BlockNumber,
1129    ) -> Result<(), StoreError> {
1130        let key = chain_data_key(ChainDataIndex::PendingBlockNumber);
1131        let value = block_number.to_le_bytes().to_vec();
1132        self.write_async(CHAIN_DATA, key, value).await
1133    }
1134
1135    /// Obtain pending block number
1136    pub async fn get_pending_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
1137        let key = chain_data_key(ChainDataIndex::PendingBlockNumber);
1138        self.read_async(CHAIN_DATA, key)
1139            .await?
1140            .map(|bytes| -> Result<BlockNumber, StoreError> {
1141                let array: [u8; 8] = bytes
1142                    .try_into()
1143                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1144                Ok(BlockNumber::from_le_bytes(array))
1145            })
1146            .transpose()
1147    }
1148
    /// DB mutation step of `forkchoice_update`.
    ///
    /// Callers MUST hold `fcu_lock` (only `forkchoice_update` should invoke this).
    /// The read of `LatestBlockNumber` below happens outside the write
    /// transaction and would be a TOCTOU window without that serialization.
    ///
    /// All mutations are staged in a single write transaction and committed at the end:
    /// - `new_canonical_blocks`: `(number, hash)` pairs written to `CANONICAL_BLOCK_HASHES`.
    /// - every canonical entry in `(head_number, latest]` is deleted, where `latest` is
    ///   the previously stored latest block number (0 if none is stored).
    /// - `head_number` → `head_hash` is written as canonical and `head_number` becomes
    ///   the new `LatestBlockNumber`.
    /// - `safe` / `finalized`: the corresponding chain-data markers are only overwritten
    ///   when `Some`; `None` leaves any stored value untouched.
    ///
    /// Returns [`StoreError::Custom`] if the blocking task panics.
    async fn forkchoice_update_inner(
        &self,
        new_canonical_blocks: Vec<(BlockNumber, BlockHash)>,
        head_number: BlockNumber,
        head_hash: BlockHash,
        safe: Option<BlockNumber>,
        finalized: Option<BlockNumber>,
    ) -> Result<(), StoreError> {
        // Previously persisted head; a missing value is treated as 0.
        let latest = self.load_latest_block_number().await?.unwrap_or(0);
        let db = self.backend.clone();
        tokio::task::spawn_blocking(move || {
            let mut txn = db.begin_write()?;

            for (block_number, block_hash) in new_canonical_blocks {
                let head_key = block_number.to_le_bytes();
                let head_value = block_hash.encode_to_vec();
                txn.put(CANONICAL_BLOCK_HASHES, &head_key, &head_value)?;
            }

            // Delete canonical entries above the new head by enumerating each key.
            // `delete_range` is not safe here: keys are `u64::to_le_bytes()`, and
            // RocksDB's lexicographic comparator does not match LE numeric order
            // (e.g. block 256 = [0x00, 0x01, ..] sorts before block 11 = [0x0B, ..]),
            // so a range-delete would silently miss blocks whose LE first byte is
            // smaller than `head+1`'s first byte.
            // When `head_number >= latest` this range is empty and nothing is deleted.
            for number in (head_number + 1)..=(latest) {
                txn.delete(CANONICAL_BLOCK_HASHES, number.to_le_bytes().as_slice())?;
            }

            // Make head canonical
            let head_key = head_number.to_le_bytes();
            let head_value = head_hash.encode_to_vec();
            txn.put(CANONICAL_BLOCK_HASHES, &head_key, &head_value)?;

            // Update chain data
            let latest_key = chain_data_key(ChainDataIndex::LatestBlockNumber);
            txn.put(CHAIN_DATA, &latest_key, &head_number.to_le_bytes())?;

            if let Some(safe) = safe {
                let safe_key = chain_data_key(ChainDataIndex::SafeBlockNumber);
                txn.put(CHAIN_DATA, &safe_key, &safe.to_le_bytes())?;
            }

            if let Some(finalized) = finalized {
                let finalized_key = chain_data_key(ChainDataIndex::FinalizedBlockNumber);
                txn.put(CHAIN_DATA, &finalized_key, &finalized.to_le_bytes())?;
            }

            // Nothing above is visible until this commit succeeds.
            txn.commit()
        })
        .await
        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
    }
1207
1208    pub async fn get_receipts_for_block(
1209        &self,
1210        block_hash: &BlockHash,
1211    ) -> Result<Vec<Receipt>, StoreError> {
1212        self.get_receipts_for_block_from_index(block_hash, 0, None)
1213            .await
1214    }
1215
1216    /// Retrieves receipts for a block starting from the given index,
1217    /// optionally limited to `max_count` receipts.
1218    ///
1219    /// Uses cursor-based prefix iteration over the 32-byte block hash prefix
1220    /// for efficient batch retrieval. Used by:
1221    /// - eth/70 partial receipt requests (EIP-7975) via p2p
1222    /// - `eth_getTransactionReceipt` RPC with a count limit to avoid
1223    ///   fetching the entire block's receipts
1224    pub async fn get_receipts_for_block_from_index(
1225        &self,
1226        block_hash: &BlockHash,
1227        start_index: u64,
1228        max_count: Option<usize>,
1229    ) -> Result<Vec<Receipt>, StoreError> {
1230        let backend = self.backend.clone();
1231        let block_hash = *block_hash;
1232
1233        tokio::task::spawn_blocking(move || {
1234            let txn = backend.begin_read()?;
1235            let prefix = block_hash.as_bytes().to_vec();
1236            // Seek directly to block_hash || start_index to avoid O(start_index) scan.
1237            // Keys are big-endian u64, so lexicographic order matches numeric order.
1238            let mut seek_key = prefix.clone();
1239            seek_key.extend_from_slice(&start_index.to_be_bytes());
1240            let iter = txn.prefix_iterator(RECEIPTS_V2, &seek_key)?;
1241            let mut receipts = Vec::new();
1242            for result in iter {
1243                let (k, v) = result?;
1244                if !k.starts_with(&prefix) {
1245                    break;
1246                }
1247                if k.len() != 40 {
1248                    continue;
1249                }
1250                receipts.push(Receipt::decode(v.as_ref())?);
1251                if let Some(max) = max_count
1252                    && receipts.len() >= max
1253                {
1254                    break;
1255                }
1256            }
1257            Ok(receipts)
1258        })
1259        .await
1260        .map_err(|e| StoreError::Custom(format!("Task panicked: {e}")))?
1261    }
1262
1263    // Snap State methods
1264
1265    /// Sets the hash of the last header downloaded during a snap sync
1266    pub async fn set_header_download_checkpoint(
1267        &self,
1268        block_hash: BlockHash,
1269    ) -> Result<(), StoreError> {
1270        let key = snap_state_key(SnapStateIndex::HeaderDownloadCheckpoint);
1271        let value = block_hash.encode_to_vec();
1272        self.write_async(SNAP_STATE, key, value).await
1273    }
1274
1275    /// Gets the hash of the last header downloaded during a snap sync
1276    pub async fn get_header_download_checkpoint(&self) -> Result<Option<BlockHash>, StoreError> {
1277        let key = snap_state_key(SnapStateIndex::HeaderDownloadCheckpoint);
1278        self.backend
1279            .begin_read()?
1280            .get(SNAP_STATE, &key)?
1281            .map(|bytes| H256::decode(bytes.as_slice()))
1282            .transpose()
1283            .map_err(StoreError::from)
1284    }
1285
1286    /// The `forkchoice_update` and `new_payload` methods require the `latest_valid_hash`
1287    /// when processing an invalid payload. To provide this, we must track invalid chains.
1288    ///
1289    /// We only store the last known valid head upon encountering a bad block,
1290    /// rather than tracking every subsequent invalid block.
1291    pub async fn set_latest_valid_ancestor(
1292        &self,
1293        bad_block: BlockHash,
1294        latest_valid: BlockHash,
1295    ) -> Result<(), StoreError> {
1296        let value = latest_valid.encode_to_vec();
1297        self.write_async(INVALID_CHAINS, bad_block.as_bytes().to_vec(), value)
1298            .await
1299    }
1300
1301    /// Returns the latest valid ancestor hash for a given invalid block hash.
1302    /// Used to provide `latest_valid_hash` in the Engine API when processing invalid payloads.
1303    pub async fn get_latest_valid_ancestor(
1304        &self,
1305        block: BlockHash,
1306    ) -> Result<Option<BlockHash>, StoreError> {
1307        self.read_async(INVALID_CHAINS, block.as_bytes().to_vec())
1308            .await?
1309            .map(|bytes| H256::decode(bytes.as_slice()))
1310            .transpose()
1311            .map_err(StoreError::from)
1312    }
1313
1314    /// Obtain block number for a given hash
1315    pub fn get_block_number_sync(
1316        &self,
1317        block_hash: BlockHash,
1318    ) -> Result<Option<BlockNumber>, StoreError> {
1319        let txn = self.backend.begin_read()?;
1320        txn.get(BLOCK_NUMBERS, &block_hash.encode_to_vec())?
1321            .map(|bytes| -> Result<BlockNumber, StoreError> {
1322                let array: [u8; 8] = bytes
1323                    .try_into()
1324                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
1325                Ok(BlockNumber::from_le_bytes(array))
1326            })
1327            .transpose()
1328    }
1329
1330    /// Get the canonical block hash for a given block number.
1331    pub fn get_canonical_block_hash_sync(
1332        &self,
1333        block_number: BlockNumber,
1334    ) -> Result<Option<BlockHash>, StoreError> {
1335        let last = self.latest_block_header.get();
1336        if last.number == block_number {
1337            return Ok(Some(last.hash()));
1338        }
1339        let txn = self.backend.begin_read()?;
1340        txn.get(
1341            CANONICAL_BLOCK_HASHES,
1342            block_number.to_le_bytes().as_slice(),
1343        )?
1344        .map(|bytes| H256::decode(bytes.as_slice()))
1345        .transpose()
1346        .map_err(StoreError::from)
1347    }
1348
1349    /// CAUTION: This method writes directly to the underlying database, bypassing any caching layer.
1350    /// For updating the state after block execution, use [`Self::store_block_updates`].
1351    pub async fn write_storage_trie_nodes_batch(
1352        &self,
1353        storage_trie_nodes: StorageUpdates,
1354    ) -> Result<(), StoreError> {
1355        let mut txn = self.backend.begin_write()?;
1356        tokio::task::spawn_blocking(move || {
1357            for (address_hash, nodes) in storage_trie_nodes {
1358                for (node_path, node_data) in nodes {
1359                    let key = apply_prefix(Some(address_hash), node_path);
1360                    if node_data.is_empty() {
1361                        txn.delete(STORAGE_TRIE_NODES, key.as_ref())?;
1362                    } else {
1363                        txn.put(STORAGE_TRIE_NODES, key.as_ref(), &node_data)?;
1364                    }
1365                }
1366            }
1367            txn.commit()
1368        })
1369        .await
1370        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1371    }
1372
1373    /// CAUTION: This method writes directly to the underlying database, bypassing any caching layer.
1374    /// For updating the state after block execution, use [`Self::store_block_updates`].
1375    pub async fn write_account_code_batch(
1376        &self,
1377        account_codes: Vec<(H256, Code)>,
1378    ) -> Result<(), StoreError> {
1379        let mut code_batch_items = Vec::new();
1380        let mut metadata_batch_items = Vec::new();
1381
1382        for (code_hash, code) in account_codes {
1383            let buf = encode_code(&code);
1384            let metadata_buf = (code.len() as u64).to_be_bytes().to_vec();
1385            code_batch_items.push((code_hash.as_bytes().to_vec(), buf));
1386            metadata_batch_items.push((code_hash.as_bytes().to_vec(), metadata_buf));
1387        }
1388
1389        // Write both batches
1390        self.write_batch_async(ACCOUNT_CODES, code_batch_items)
1391            .await?;
1392        self.write_batch_async(ACCOUNT_CODE_METADATA, metadata_batch_items)
1393            .await
1394    }
1395
1396    // Helper methods for async operations with spawn_blocking
1397    // These methods ensure RocksDB I/O doesn't block the tokio runtime
1398
1399    /// Helper method for async writes
1400    /// Spawns blocking task to avoid blocking tokio runtime
1401    pub fn write(
1402        &self,
1403        table: &'static str,
1404        key: Vec<u8>,
1405        value: Vec<u8>,
1406    ) -> Result<(), StoreError> {
1407        let backend = self.backend.clone();
1408        let mut txn = backend.begin_write()?;
1409        txn.put(table, &key, &value)?;
1410        txn.commit()
1411    }
1412
1413    /// Helper method for async writes
1414    /// Spawns blocking task to avoid blocking tokio runtime
1415    async fn write_async(
1416        &self,
1417        table: &'static str,
1418        key: Vec<u8>,
1419        value: Vec<u8>,
1420    ) -> Result<(), StoreError> {
1421        let backend = self.backend.clone();
1422
1423        tokio::task::spawn_blocking(move || {
1424            let mut txn = backend.begin_write()?;
1425            txn.put(table, &key, &value)?;
1426            txn.commit()
1427        })
1428        .await
1429        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1430    }
1431
1432    /// Helper method for async reads
1433    /// Spawns blocking task to avoid blocking tokio runtime
1434    pub async fn read_async(
1435        &self,
1436        table: &'static str,
1437        key: Vec<u8>,
1438    ) -> Result<Option<Vec<u8>>, StoreError> {
1439        let backend = self.backend.clone();
1440
1441        tokio::task::spawn_blocking(move || {
1442            let txn = backend.begin_read()?;
1443            txn.get(table, &key)
1444        })
1445        .await
1446        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1447    }
1448
1449    /// Helper method for sync reads
1450    /// Spawns blocking task to avoid blocking tokio runtime
1451    pub fn read(&self, table: &'static str, key: Vec<u8>) -> Result<Option<Vec<u8>>, StoreError> {
1452        let backend = self.backend.clone();
1453        let txn = backend.begin_read()?;
1454        txn.get(table, &key)
1455    }
1456
1457    /// Helper method for batch writes
1458    /// Spawns blocking task to avoid blocking tokio runtime
1459    /// This is the most important optimization for healing performance
1460    pub async fn write_batch_async(
1461        &self,
1462        table: &'static str,
1463        batch_ops: Vec<(Vec<u8>, Vec<u8>)>,
1464    ) -> Result<(), StoreError> {
1465        let backend = self.backend.clone();
1466
1467        tokio::task::spawn_blocking(move || {
1468            let mut txn = backend.begin_write()?;
1469            txn.put_batch(table, batch_ops)?;
1470            txn.commit()
1471        })
1472        .await
1473        .map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
1474    }
1475
1476    /// Helper method for batch writes
1477    pub fn write_batch(
1478        &self,
1479        table: &'static str,
1480        batch_ops: Vec<(Vec<u8>, Vec<u8>)>,
1481    ) -> Result<(), StoreError> {
1482        let backend = self.backend.clone();
1483        let mut txn = backend.begin_write()?;
1484        txn.put_batch(table, batch_ops)?;
1485        txn.commit()
1486    }
1487
1488    pub async fn add_fullsync_batch(&self, headers: Vec<BlockHeader>) -> Result<(), StoreError> {
1489        self.write_batch_async(
1490            FULLSYNC_HEADERS,
1491            headers
1492                .into_iter()
1493                .map(|header| (header.number.to_le_bytes().to_vec(), header.encode_to_vec()))
1494                .collect(),
1495        )
1496        .await
1497    }
1498
1499    pub async fn read_fullsync_batch(
1500        &self,
1501        start: BlockNumber,
1502        limit: u64,
1503    ) -> Result<Vec<Option<BlockHeader>>, StoreError> {
1504        let mut res = vec![];
1505        let read_tx = self.backend.begin_read()?;
1506        // TODO: use read_bulk here
1507        for key in start..start + limit {
1508            let header_opt = read_tx
1509                .get(FULLSYNC_HEADERS, &key.to_le_bytes())?
1510                .map(|header| BlockHeader::decode(&header))
1511                .transpose()?;
1512            res.push(header_opt);
1513        }
1514        Ok(res)
1515    }
1516
1517    pub async fn clear_fullsync_headers(&self) -> Result<(), StoreError> {
1518        self.backend.clear_table(FULLSYNC_HEADERS)
1519    }
1520
1521    /// Delete a key from a table
1522    pub fn delete(&self, table: &'static str, key: Vec<u8>) -> Result<(), StoreError> {
1523        let mut txn = self.backend.begin_write()?;
1524        txn.delete(table, &key)?;
1525        txn.commit()
1526    }
1527
1528    pub fn store_block_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
1529        self.apply_updates(update_batch)
1530    }
1531
1532    fn apply_updates(&self, update_batch: UpdateBatch) -> Result<(), StoreError> {
1533        let db = self.backend.clone();
1534        let parent_state_root = self
1535            .get_block_header_by_hash(
1536                update_batch
1537                    .blocks
1538                    .first()
1539                    .ok_or(StoreError::UpdateBatchNoBlocks)?
1540                    .header
1541                    .parent_hash,
1542            )?
1543            .map(|header| header.state_root)
1544            .unwrap_or_default();
1545        let last_state_root = update_batch
1546            .blocks
1547            .last()
1548            .ok_or(StoreError::UpdateBatchNoBlocks)?
1549            .header
1550            .state_root;
1551        let trie_upd_worker_tx = self.trie_update_worker_tx.clone();
1552
1553        let is_batch = update_batch.batch_mode;
1554
1555        let UpdateBatch {
1556            account_updates,
1557            storage_updates,
1558            ..
1559        } = update_batch;
1560
1561        // Capacity one ensures sender just notifies and goes on
1562        let (notify_tx, notify_rx) = sync_channel(1);
1563        let wait_for_new_layer = notify_rx;
1564        let trie_update = TrieUpdate {
1565            parent_state_root,
1566            account_updates,
1567            storage_updates,
1568            result_sender: notify_tx,
1569            child_state_root: last_state_root,
1570            is_batch,
1571        };
1572        trie_upd_worker_tx
1573            .send(TrieMessage::Update(trie_update))
1574            .map_err(|e| {
1575                StoreError::Custom(format!("failed to read new trie layer notification: {e}"))
1576            })?;
1577        let mut tx = db.begin_write()?;
1578
1579        for block in update_batch.blocks {
1580            let block_number = block.header.number;
1581            let block_hash = block.hash();
1582            let hash_key = block_hash.encode_to_vec();
1583
1584            let header_value_rlp = BlockHeaderRLP::from(block.header.clone());
1585            tx.put(HEADERS, &hash_key, header_value_rlp.bytes())?;
1586
1587            let body_value = BlockBodyRLP::from_bytes(block.body.encode_to_vec());
1588            tx.put(BODIES, &hash_key, body_value.bytes())?;
1589
1590            tx.put(BLOCK_NUMBERS, &hash_key, &block_number.to_le_bytes())?;
1591
1592            for (index, transaction) in block.body.transactions.iter().enumerate() {
1593                tx.merge(
1594                    TRANSACTION_LOCATIONS,
1595                    transaction.hash().as_bytes(),
1596                    &encode_tx_location_operand(block_number, block_hash, index as u64),
1597                )?;
1598            }
1599        }
1600
1601        for (block_hash, receipts) in update_batch.receipts {
1602            for (index, receipt) in receipts.into_iter().enumerate() {
1603                let key = receipt_key(&block_hash, index as u64);
1604                let value = receipt.encode_to_vec();
1605                tx.put(RECEIPTS_V2, &key, &value)?;
1606            }
1607        }
1608
1609        for (code_hash, code) in update_batch.code_updates {
1610            let buf = encode_code(&code);
1611            let metadata_buf = (code.len() as u64).to_be_bytes();
1612            tx.put(ACCOUNT_CODES, code_hash.as_ref(), &buf)?;
1613            tx.put(ACCOUNT_CODE_METADATA, code_hash.as_ref(), &metadata_buf)?;
1614        }
1615
1616        // Wait for an updated top layer so every caller afterwards sees a consistent view.
1617        // Specifically, the next block produced MUST see this upper layer.
1618        wait_for_new_layer
1619            .recv()
1620            .map_err(|e| StoreError::Custom(format!("recv failed: {e}")))??;
1621        // After top-level is added, we can make the rest of the changes visible.
1622        tx.commit()?;
1623
1624        Ok(())
1625    }
1626
1627    /// Opens (or creates) a store at `path` with the default [`StoreConfig`].
1628    ///
1629    /// Production callers that need to override storage tunables (e.g. the RocksDB
1630    /// block cache size from a CLI option) should use [`Store::new_with_config`].
1631    pub fn new(path: impl AsRef<Path>, engine_type: EngineType) -> Result<Self, StoreError> {
1632        Self::new_with_config(path, engine_type, StoreConfig::default())
1633    }
1634
    /// Opens (or creates) a store at `path`, applying the supplied [`StoreConfig`].
    ///
    /// For every engine other than `EngineType::InMemory`, the schema version read from
    /// the store metadata is checked before the backend is opened:
    ///
    /// - no recorded version, but the directory holds recognizable legacy database files:
    ///   returns `StoreError::NotFoundDBVersion` (pre-metadata databases cannot be
    ///   migrated safely);
    /// - no recorded version and no legacy database files: the directory is treated as a
    ///   fresh datadir and the initial metadata file is written;
    /// - a version below 1, or newer than `STORE_SCHEMA_VERSION`: returns
    ///   `StoreError::MigrationFailed`;
    /// - (`rocksdb` feature only) a version older than `STORE_SCHEMA_VERSION`: the RocksDB
    ///   backend is opened, pending migrations are run, obsolete column families are
    ///   dropped, and the migrated store is returned directly;
    /// - the current version: falls through to the normal open below.
    ///
    /// `config` is only consulted by the RocksDB backend.
    pub fn new_with_config(
        path: impl AsRef<Path>,
        engine_type: EngineType,
        // `config` only feeds the RocksDB backend; without that feature it is unused.
        #[cfg_attr(not(feature = "rocksdb"), allow(unused_variables))] config: StoreConfig,
    ) -> Result<Self, StoreError> {
        let db_path = path.as_ref().to_path_buf();

        if engine_type != EngineType::InMemory {
            let version = read_store_schema_version(&db_path)?;

            match version {
                None if db_path.exists() && dir_contains_legacy_db(&db_path)? => {
                    // Pre-metadata DB — cannot migrate safely
                    return Err(StoreError::NotFoundDBVersion);
                }
                None => {
                    // No metadata and no recognizable database files. The directory
                    // may still hold unrelated files (e.g. a JWT secret placed in the
                    // datadir by tooling such as EthDocker, see issue #5680), so treat
                    // this as a fresh datadir and write the initial metadata instead
                    // of erroring out.
                    init_metadata_file(&db_path)?;
                }
                Some(v) if v < 1 => {
                    return Err(StoreError::MigrationFailed {
                        from: v,
                        to: STORE_SCHEMA_VERSION,
                        reason: format!("DB version v{v} is invalid (predates migrations)"),
                    });
                }
                Some(v) if v > STORE_SCHEMA_VERSION => {
                    return Err(StoreError::MigrationFailed {
                        from: v,
                        to: STORE_SCHEMA_VERSION,
                        reason: format!(
                            "DB version v{v} is more recent than the client expects (v{STORE_SCHEMA_VERSION}). Rolling back is not supported"
                        ),
                    });
                }
                #[cfg(feature = "rocksdb")]
                Some(v) if v < STORE_SCHEMA_VERSION => {
                    // Open backend, run migrations, then drop obsolete CFs.
                    // Cleanup must happen AFTER migrations so legacy CFs (e.g.
                    // `receipts`) are still readable during the migration.
                    let rocksdb = Arc::new(RocksDBBackend::open(
                        &path,
                        config.rocksdb_block_cache_size,
                    )?);
                    crate::migrations::run_pending_migrations(rocksdb.as_ref(), &db_path, v)?;
                    rocksdb.drop_obsolete_cfs(&path);
                    let backend: Arc<dyn crate::api::StorageBackend> = rocksdb;
                    // Early return: the store is already open, so the generic open below is skipped.
                    return Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD);
                }
                Some(_) => {
                    // version == STORE_SCHEMA_VERSION, proceed normally.
                    // Without the `rocksdb` feature this also covers v < target,
                    // but that path is unreachable since InMemory is the only
                    // engine type and the outer guard excludes it.
                }
            }
        }

        match engine_type {
            #[cfg(feature = "rocksdb")]
            EngineType::RocksDB => {
                let rocksdb = RocksDBBackend::open(&path, config.rocksdb_block_cache_size)?;
                // Obsolete column families are also dropped on a normal (non-migrating) open.
                rocksdb.drop_obsolete_cfs(&path);
                let backend: Arc<dyn StorageBackend> = Arc::new(rocksdb);
                Self::from_backend(backend, db_path, DB_COMMIT_THRESHOLD)
            }
            EngineType::InMemory => {
                let backend = Arc::new(InMemoryBackend::open()?);
                Self::from_backend(backend, db_path, IN_MEMORY_COMMIT_THRESHOLD)
            }
        }
    }
1713
    /// Builds a store on top of an already-opened `backend` and spawns its two background
    /// threads: the FlatKeyValue generator and the trie update worker.
    ///
    /// `commit_threshold` is forwarded to `TrieLayerCache::new`; the debug log below
    /// describes it as the number of in-memory diff layers. The FlatKeyValue progress
    /// marker is restored from the `last_written` entry in `MISC_VALUES`.
    ///
    /// The thread handles end up in the store's `background_threads` list. The FlatKeyValue
    /// thread returns if its control channel disconnects before generation starts. The trie
    /// update worker returns when its channel disconnects.
    fn from_backend(
        backend: Arc<dyn StorageBackend>,
        db_path: PathBuf,
        commit_threshold: usize,
    ) -> Result<Self, StoreError> {
        debug!("Initializing Store with {commit_threshold} in-memory diff-layers");
        // Zero-capacity (rendezvous) channels: each send blocks until the background
        // thread receives the message.
        let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0);
        let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0);

        // Restore FlatKeyValue progress. A missing entry starts from 64 zero bytes, and the
        // single-byte `[0xff]` marker is widened to 64 `0xff` bytes (presumably the
        // "generation finished" sentinel — confirm against `flatkeyvalue_generator`).
        let last_written = {
            let tx = backend.begin_read()?;
            let last_written = tx
                .get(MISC_VALUES, "last_written".as_bytes())?
                .unwrap_or_else(|| vec![0u8; 64]);
            if last_written == [0xff] {
                vec![0xff; 64]
            } else {
                last_written
            }
        };
        let mut background_threads = Vec::new();
        let mut store = Self {
            db_path,
            backend,
            chain_config: Default::default(),
            latest_block_header: Default::default(),
            trie_cache: Arc::new(RwLock::new(Arc::new(TrieLayerCache::new(commit_threshold)))),
            flatkeyvalue_control_tx: fkv_tx,
            trie_update_worker_tx: trie_upd_tx,
            last_computed_flatkeyvalue: Arc::new(RwLock::new(last_written)),
            account_code_cache: Arc::new(Mutex::new(CodeCache::default())),
            code_metadata_cache: Arc::new(Mutex::new(rustc_hash::FxHashMap::default())),
            fcu_lock: Arc::new(tokio::sync::Mutex::new(())),
            // Placeholder; replaced with the spawned thread handles at the end of this function.
            background_threads: Default::default(),
        };
        let backend_clone = store.backend.clone();
        let last_computed_fkv = store.last_computed_flatkeyvalue.clone();
        background_threads.push(std::thread::spawn(move || {
            let rx = fkv_rx;
            // Wait for the first Continue to start generation
            // (Stop messages received before that are ignored).
            loop {
                match rx.recv() {
                    Ok(FKVGeneratorControlMessage::Continue) => break,
                    Ok(FKVGeneratorControlMessage::Stop) => {}
                    Err(std::sync::mpsc::RecvError) => {
                        debug!("Closing FlatKeyValue generator.");
                        return;
                    }
                }
            }

            // Generation errors are logged, not propagated; the thread then exits.
            let _ = flatkeyvalue_generator(&backend_clone, &last_computed_fkv, &rx)
                .inspect_err(|err| error!("Error while generating FlatKeyValue: {err}"));
        }));
        let backend = store.backend.clone();
        let flatkeyvalue_control_tx = store.flatkeyvalue_control_tx.clone();
        let trie_cache = store.trie_cache.clone();
        /*
            When a block is executed, the write of the bottom-most diff layer to disk is done in the background through this thread.
            This is to improve block execution times, since it's not necessary when executing the next block to have this layer flushed to disk.

            This background thread receives messages through a channel to apply new trie updates and does three things:

            - First, it updates the top-most in-memory diff layer and notifies the process that sent the message (i.e. the
            block production thread) so it can continue with block execution (block execution cannot proceed without the
            diff layers updated, otherwise it would see wrong state when reading from the trie). This section is done in an RCU manner:
            a shared pointer with the trie is kept behind a lock. This thread first acquires the lock, then copies the pointer and drops the lock;
            afterwards it makes a deep copy of the trie layer and mutates it, then takes the lock again, replaces the pointer with the updated copy,
            then drops the lock again.

            - Second, it performs the logic of persisting the bottom-most diff layer to disk. This is the part of the logic that block execution does not
            need to proceed. What does need to be aware of this section is the process in charge of generating the snapshot (a.k.a. FlatKeyValue).
            Because of this, this section first sends a message to pause the FlatKeyValue generation, then persists the diff layer to disk, then notifies
            again for FlatKeyValue generation to continue.

            - Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step.
        */
        background_threads.push(std::thread::spawn(move || {
            let rx = trie_upd_rx;
            loop {
                match rx.recv() {
                    Ok(TrieMessage::Update(trie_update)) => {
                        // FIXME: what should we do on error?
                        // For now the error is only logged and the worker keeps serving updates.
                        let _ = apply_trie_updates(
                            backend.as_ref(),
                            &flatkeyvalue_control_tx,
                            &trie_cache,
                            trie_update,
                        )
                        .inspect_err(|err| error!("apply_trie_updates failed: {err}"));
                    }
                    Ok(TrieMessage::Ping) => {
                        // Rendezvous handshake only — sender just wanted to know
                        // we were idle and back at recv(). No work to do.
                    }
                    Err(err) => {
                        debug!("Trie update sender disconnected: {err}");
                        return;
                    }
                }
            }
        }));
        store.background_threads = Arc::new(ThreadList {
            list: background_threads,
        });
        Ok(store)
    }
1821
1822    /// Opens (or creates) a store at `store_path` and seeds it from the
1823    /// given genesis file, using the default [`StoreConfig`].
1824    pub async fn new_from_genesis(
1825        store_path: &Path,
1826        engine_type: EngineType,
1827        genesis_path: &str,
1828    ) -> Result<Self, StoreError> {
1829        Self::new_from_genesis_with_config(
1830            store_path,
1831            engine_type,
1832            genesis_path,
1833            StoreConfig::default(),
1834        )
1835        .await
1836    }
1837
1838    /// Opens (or creates) a store at `store_path` from genesis, applying the
1839    /// supplied [`StoreConfig`].
1840    pub async fn new_from_genesis_with_config(
1841        store_path: &Path,
1842        engine_type: EngineType,
1843        genesis_path: &str,
1844        config: StoreConfig,
1845    ) -> Result<Self, StoreError> {
1846        let file = std::fs::File::open(genesis_path)
1847            .map_err(|error| StoreError::Custom(format!("Failed to open genesis file: {error}")))?;
1848        let reader = std::io::BufReader::new(file);
1849        let genesis: Genesis = serde_json::from_reader(reader)
1850            .map_err(|e| StoreError::Custom(format!("Failed to deserialize genesis file: {e}")))?;
1851        let mut store = Self::new_with_config(store_path, engine_type, config)?;
1852        store.add_initial_state(genesis).await?;
1853        Ok(store)
1854    }
1855
1856    pub async fn get_account_info(
1857        &self,
1858        block_number: BlockNumber,
1859        address: Address,
1860    ) -> Result<Option<AccountInfo>, StoreError> {
1861        match self.get_canonical_block_hash(block_number).await? {
1862            Some(block_hash) => self.get_account_info_by_hash(block_hash, address),
1863            None => Ok(None),
1864        }
1865    }
1866
1867    pub fn get_account_info_by_hash(
1868        &self,
1869        block_hash: BlockHash,
1870        address: Address,
1871    ) -> Result<Option<AccountInfo>, StoreError> {
1872        let Some(state_trie) = self.state_trie(block_hash)? else {
1873            return Ok(None);
1874        };
1875        let hashed_address = hash_address_fixed(&address);
1876
1877        let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1878            return Ok(None);
1879        };
1880
1881        let account_state = AccountState::decode(&encoded_state)?;
1882        Ok(Some(AccountInfo {
1883            code_hash: account_state.code_hash,
1884            balance: account_state.balance,
1885            nonce: account_state.nonce,
1886        }))
1887    }
1888
1889    pub fn get_account_state_by_acc_hash(
1890        &self,
1891        block_hash: BlockHash,
1892        account_hash: H256,
1893    ) -> Result<Option<AccountState>, StoreError> {
1894        let Some(state_trie) = self.state_trie(block_hash)? else {
1895            return Ok(None);
1896        };
1897        let Some(encoded_state) = state_trie.get(account_hash.as_bytes())? else {
1898            return Ok(None);
1899        };
1900        let account_state = AccountState::decode(&encoded_state)?;
1901        Ok(Some(account_state))
1902    }
1903
1904    pub async fn get_fork_id(&self) -> Result<ForkId, StoreError> {
1905        let chain_config = self.get_chain_config();
1906        let genesis_header = self
1907            .load_block_header(0)?
1908            .ok_or(StoreError::MissingEarliestBlockNumber)?;
1909        let block_header = self.latest_block_header.get();
1910
1911        Ok(ForkId::new(
1912            chain_config,
1913            genesis_header,
1914            block_header.timestamp,
1915            block_header.number,
1916        ))
1917    }
1918
1919    pub async fn get_code_by_account_address(
1920        &self,
1921        block_number: BlockNumber,
1922        address: Address,
1923    ) -> Result<Option<Code>, StoreError> {
1924        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1925            return Ok(None);
1926        };
1927        let Some(state_trie) = self.state_trie(block_hash)? else {
1928            return Ok(None);
1929        };
1930        let hashed_address = hash_address_fixed(&address);
1931        let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1932            return Ok(None);
1933        };
1934        let account_state = AccountState::decode(&encoded_state)?;
1935        self.get_account_code(account_state.code_hash)
1936    }
1937
1938    pub async fn get_nonce_by_account_address(
1939        &self,
1940        block_number: BlockNumber,
1941        address: Address,
1942    ) -> Result<Option<u64>, StoreError> {
1943        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
1944            return Ok(None);
1945        };
1946        let Some(state_trie) = self.state_trie(block_hash)? else {
1947            return Ok(None);
1948        };
1949        let hashed_address = hash_address_fixed(&address);
1950        let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
1951            return Ok(None);
1952        };
1953        let account_state = AccountState::decode(&encoded_state)?;
1954        Ok(Some(account_state.nonce))
1955    }
1956
1957    /// Applies account updates based on the block's latest storage state
1958    /// and returns the new state root after the updates have been applied.
1959    pub fn apply_account_updates_batch(
1960        &self,
1961        block_hash: BlockHash,
1962        account_updates: &[AccountUpdate],
1963    ) -> Result<Option<AccountUpdatesList>, StoreError> {
1964        let Some(mut state_trie) = self.state_trie(block_hash)? else {
1965            return Ok(None);
1966        };
1967
1968        Ok(Some(self.apply_account_updates_from_trie_batch(
1969            &mut state_trie,
1970            account_updates,
1971        )?))
1972    }
1973
    /// Applies `account_updates`, in order, to `state_trie`. Returns the resulting state
    /// root together with the collected state-trie, storage-trie and code changes.
    ///
    /// For each update:
    /// - `removed` accounts are deleted from the state trie and nothing else is done;
    /// - otherwise the current account state is loaded (or defaulted if absent), its
    ///   storage root is reset to the empty-trie hash when `removed_storage` is set, and its
    ///   nonce/balance/code hash are replaced from `info` when present;
    /// - `added_storage` entries are written to the account's storage trie (zero values
    ///   remove the slot), and the account's storage root is set to the new trie hash.
    ///
    /// New contract code is only collected into the returned `code_updates`.
    pub fn apply_account_updates_from_trie_batch<'a>(
        &self,
        state_trie: &mut Trie,
        account_updates: impl IntoIterator<Item = &'a AccountUpdate>,
    ) -> Result<AccountUpdatesList, StoreError> {
        let mut ret_storage_updates = Vec::new();
        let mut code_updates = Vec::new();
        // Root of the state trie before any update of this batch is applied; it is passed to
        // `open_storage_trie` for every account touched below.
        let state_root = state_trie.hash_no_commit(&NativeCrypto);
        for update in account_updates {
            let hashed_address = hash_address_fixed(&update.address);
            if update.removed {
                // Remove account from trie
                state_trie.remove(hashed_address.as_bytes())?;
                continue;
            }
            // Add or update AccountState in the trie
            // Fetch current state or create a new state to be inserted
            let mut account_state = match state_trie.get(hashed_address.as_bytes())? {
                Some(encoded_state) => AccountState::decode(&encoded_state)?,
                None => AccountState::default(),
            };
            if update.removed_storage {
                account_state.storage_root = *EMPTY_TRIE_HASH;
            }
            if let Some(info) = &update.info {
                account_state.nonce = info.nonce;
                account_state.balance = info.balance;
                account_state.code_hash = info.code_hash;
                // Collect the new code for the returned `code_updates` (not written here).
                if let Some(code) = &update.code {
                    code_updates.push((info.code_hash, code.clone()));
                }
            }
            // Store the added storage in the account's storage trie and compute its new root
            if !update.added_storage.is_empty() {
                let mut storage_trie =
                    self.open_storage_trie(hashed_address, state_root, account_state.storage_root)?;
                for (storage_key, storage_value) in &update.added_storage {
                    let hashed_key = hash_key(storage_key);
                    if storage_value.is_zero() {
                        storage_trie.remove(&hashed_key)?;
                    } else {
                        storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
                    }
                }
                let (storage_hash, storage_updates) =
                    storage_trie.collect_changes_since_last_hash(&NativeCrypto);
                account_state.storage_root = storage_hash;
                ret_storage_updates.push((hashed_address, storage_updates));
            }
            state_trie.insert(
                hashed_address.as_bytes().to_vec(),
                account_state.encode_to_vec(),
            )?;
        }
        let (state_trie_hash, state_updates) =
            state_trie.collect_changes_since_last_hash(&NativeCrypto);

        Ok(AccountUpdatesList {
            state_trie_hash,
            state_updates,
            storage_updates: ret_storage_updates,
            code_updates,
        })
    }
2039
    /// Applies `account_updates` to `state_trie` the same way
    /// `apply_account_updates_from_trie_batch` does, but opens each touched storage trie
    /// through a `TrieLogger` so that the accessed nodes are recorded as a witness.
    ///
    /// Storage tries are cached in `storage_tries`, keyed by the (unhashed) account address.
    /// An existing entry is reused; a missing one is opened and inserted. The map is
    /// returned to the caller together with the resulting [`AccountUpdatesList`].
    ///
    /// NOTE(review): when an entry for an address already exists in `storage_tries`, it is
    /// reused even if this update sets `removed_storage` (which resets
    /// `account_state.storage_root` to the empty-trie hash). The new storage root is then
    /// computed from the cached trie rather than from an empty one — confirm this cannot
    /// happen in practice or is intended.
    pub fn apply_account_updates_from_trie_with_witness(
        &self,
        mut state_trie: Trie,
        account_updates: &[AccountUpdate],
        mut storage_tries: StorageTries,
    ) -> Result<(StorageTries, AccountUpdatesList), StoreError> {
        let mut ret_storage_updates = Vec::new();

        let mut code_updates = Vec::new();

        // Root of the state trie before any update is applied; passed to
        // `open_storage_trie` whenever a storage trie is opened below.
        let state_root = state_trie.hash_no_commit(&NativeCrypto);

        for update in account_updates.iter() {
            let hashed_address = hash_address(&update.address);

            if update.removed {
                // Remove account from trie
                state_trie.remove(&hashed_address)?;

                continue;
            }

            // Add or update AccountState in the trie
            // Fetch current state or create a new state to be inserted
            let mut account_state = match state_trie.get(&hashed_address)? {
                Some(encoded_state) => AccountState::decode(&encoded_state)?,
                None => AccountState::default(),
            };

            if update.removed_storage {
                account_state.storage_root = *EMPTY_TRIE_HASH;
            }

            if let Some(info) = &update.info {
                account_state.nonce = info.nonce;

                account_state.balance = info.balance;

                account_state.code_hash = info.code_hash;

                // Collect the new code for the returned `code_updates` (not written here).
                if let Some(code) = &update.code {
                    code_updates.push((info.code_hash, code.clone()));
                }
            }

            // Store the added storage in the account's storage trie and compute its new root
            if !update.added_storage.is_empty() {
                // Reuse the cached logging trie for this address, or open and cache a new one.
                let (_witness, storage_trie) = match storage_tries.entry(update.address) {
                    Entry::Occupied(value) => value.into_mut(),
                    Entry::Vacant(vacant) => {
                        let trie = self.open_storage_trie(
                            H256::from_slice(&hashed_address),
                            state_root,
                            account_state.storage_root,
                        )?;
                        vacant.insert(TrieLogger::open_trie(trie))
                    }
                };

                for (storage_key, storage_value) in &update.added_storage {
                    let hashed_key = hash_key(storage_key);

                    if storage_value.is_zero() {
                        storage_trie.remove(&hashed_key)?;
                    } else {
                        storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
                    }
                }

                let (storage_hash, storage_updates) =
                    storage_trie.collect_changes_since_last_hash(&NativeCrypto);

                account_state.storage_root = storage_hash;

                ret_storage_updates.push((H256::from_slice(&hashed_address), storage_updates));
            }

            state_trie.insert(hashed_address, account_state.encode_to_vec())?;
        }

        let (state_trie_hash, state_updates) =
            state_trie.collect_changes_since_last_hash(&NativeCrypto);

        let account_updates_list = AccountUpdatesList {
            state_trie_hash,
            state_updates,
            storage_updates: ret_storage_updates,
            code_updates,
        };

        Ok((storage_tries, account_updates_list))
    }
2135
    /// Builds the genesis state from `genesis_accounts`, writes its trie nodes to the
    /// backend, and returns the resulting state root.
    ///
    /// For each account this stores its code via `add_account_code`, builds a fresh
    /// storage trie from its non-zero storage slots, and inserts the account state
    /// (nonce, balance, storage root, code hash) into a fresh state trie. All account and
    /// storage trie nodes are then written in a single write transaction.
    pub async fn setup_genesis_state_trie(
        &self,
        genesis_accounts: BTreeMap<Address, GenesisAccount>,
    ) -> Result<H256, StoreError> {
        let mut storage_trie_nodes = vec![];
        let mut genesis_state_trie = self.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
        for (address, account) in genesis_accounts {
            let hashed_address = hash_address(&address);
            let h256_hashed_address = H256::from_slice(&hashed_address);

            // Store account code (as this won't be stored in the trie)
            let code = Code::from_bytecode(account.code, &NativeCrypto);
            let code_hash = code.hash;
            self.add_account_code(code).await?;

            // Store the account's storage in a clean storage trie and compute its root
            let mut storage_trie =
                self.open_direct_storage_trie(h256_hashed_address, *EMPTY_TRIE_HASH)?;
            for (storage_key, storage_value) in account.storage {
                // Zero-valued slots are skipped (the update path treats zero as "remove").
                if !storage_value.is_zero() {
                    let hashed_key = hash_key(&H256(storage_key.to_big_endian()));
                    storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
                }
            }

            let (storage_root, storage_nodes) =
                storage_trie.collect_changes_since_last_hash(&NativeCrypto);

            // Storage node paths are prefixed (via `apply_prefix`) with the account's hashed
            // address before being queued for `STORAGE_TRIE_NODES`.
            storage_trie_nodes.extend(
                storage_nodes
                    .into_iter()
                    .map(|(path, n)| (apply_prefix(Some(h256_hashed_address), path).into_vec(), n)),
            );

            // Add account to trie
            let account_state = AccountState {
                nonce: account.nonce,
                balance: account.balance,
                storage_root,
                code_hash,
            };
            genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?;
        }

        let (state_root, account_trie_nodes) =
            genesis_state_trie.collect_changes_since_last_hash(&NativeCrypto);
        let account_trie_nodes = account_trie_nodes
            .into_iter()
            .map(|(path, n)| (apply_prefix(None, path).into_vec(), n))
            .collect::<Vec<_>>();

        // Persist account and storage trie nodes together in one transaction.
        let mut tx = self.backend.begin_write()?;
        tx.put_batch(ACCOUNT_TRIE_NODES, account_trie_nodes)?;
        tx.put_batch(STORAGE_TRIE_NODES, storage_trie_nodes)?;
        tx.commit()?;

        Ok(state_root)
    }
2195
2196    // Key format: block_number (8 bytes, big-endian) + block_hash (32 bytes)
2197    fn make_witness_key(block_number: u64, block_hash: &BlockHash) -> Vec<u8> {
2198        let mut composite_key = Vec::with_capacity(8 + 32);
2199        composite_key.extend_from_slice(&block_number.to_be_bytes());
2200        composite_key.extend_from_slice(block_hash.as_bytes());
2201        composite_key
2202    }
2203
2204    /// Stores a pre-serialized execution witness for a block.
2205    ///
2206    /// The witness is converted to RPC format (RpcExecutionWitness) before storage
2207    /// to avoid expensive `encode_subtrie` traversal on every read. This pre-computes
2208    /// the serialization at write time instead of read time.
2209    pub fn store_witness(
2210        &self,
2211        block_hash: BlockHash,
2212        block_number: u64,
2213        witness: ExecutionWitness,
2214    ) -> Result<(), StoreError> {
2215        // Convert to RPC format once at storage time
2216        let rpc_witness = RpcExecutionWitness::try_from(witness)?;
2217        let key = Self::make_witness_key(block_number, &block_hash);
2218        let value = serde_json::to_vec(&rpc_witness)?;
2219        self.write(EXECUTION_WITNESSES, key, value)?;
2220        // Clean up old witnesses (keep only last 128)
2221        self.cleanup_old_witnesses(block_number)
2222    }
2223
    /// Prunes execution witnesses that fell out of the retention window and records the new
    /// oldest retained block number.
    ///
    /// Nothing happens while `latest_block_number <= MAX_WITNESSES`. Otherwise
    /// `threshold = latest_block_number - MAX_WITNESSES`. Witness keys under the recorded
    /// oldest witness block number (if any) with a block number not above `threshold` are
    /// deleted, and the recorded oldest number is then set to `threshold + 1`.
    ///
    /// NOTE(review): only the 8-byte prefix of the *recorded* oldest block number is
    /// scanned. If `prefix_iterator` yields only keys with exactly that prefix, then
    /// witnesses for block numbers strictly between the recorded oldest and `threshold` are
    /// never deleted. That happens, for example, when the head advanced by more than one
    /// block since the previous call, or on the first call when no oldest number was
    /// recorded yet. Confirm the backend's `prefix_iterator` semantics and whether that
    /// leak is acceptable.
    fn cleanup_old_witnesses(&self, latest_block_number: u64) -> Result<(), StoreError> {
        // Nothing to prune while the latest block is within the first `MAX_WITNESSES` blocks.
        if latest_block_number <= MAX_WITNESSES {
            return Ok(());
        }

        let threshold = latest_block_number - MAX_WITNESSES;

        if let Some(oldest_block_number) = self.get_oldest_witness_number()? {
            // Witness keys start with the big-endian block number (see `make_witness_key`).
            let prefix = oldest_block_number.to_be_bytes();
            let mut to_delete = Vec::new();

            // Collect keys in an inner scope so the read transaction is dropped before
            // the deletes below are issued.
            {
                let read_txn = self.backend.begin_read()?;
                let iter = read_txn.prefix_iterator(EXECUTION_WITNESSES, &prefix)?;

                // We may have multiple witnesses for the same block number (forks)
                for item in iter {
                    let (key, _value) = item?;
                    // Keys built by `make_witness_key` are 40 bytes; the first 8 are the block number.
                    let mut block_number_bytes = [0u8; 8];
                    block_number_bytes.copy_from_slice(&key[0..8]);
                    let block_number = u64::from_be_bytes(block_number_bytes);
                    if block_number > threshold {
                        break;
                    }
                    to_delete.push(key.to_vec());
                }
            }

            for key in to_delete {
                self.delete(EXECUTION_WITNESSES, key)?;
            }
        };

        self.update_oldest_witness_number(threshold + 1)?;

        Ok(())
    }
2262
2263    fn update_oldest_witness_number(&self, oldest_block_number: u64) -> Result<(), StoreError> {
2264        self.write(
2265            MISC_VALUES,
2266            b"oldest_witness_block_number".to_vec(),
2267            oldest_block_number.to_le_bytes().to_vec(),
2268        )?;
2269        Ok(())
2270    }
2271
2272    fn get_oldest_witness_number(&self) -> Result<Option<u64>, StoreError> {
2273        let Some(value) = self.read(MISC_VALUES, b"oldest_witness_block_number".to_vec())? else {
2274            return Ok(None);
2275        };
2276
2277        let array: [u8; 8] = value.as_slice().try_into().map_err(|_| {
2278            StoreError::Custom("Invalid oldest witness block number bytes".to_string())
2279        })?;
2280        Ok(Some(u64::from_le_bytes(array)))
2281    }
2282
2283    /// Returns the raw JSON bytes of a cached witness for a block.
2284    ///
2285    /// This is the most efficient method for the RPC handler since it avoids
2286    /// deserialization and re-serialization. The bytes can be parsed directly
2287    /// as a JSON Value for the RPC response.
2288    pub fn get_witness_json_bytes(
2289        &self,
2290        block_number: u64,
2291        block_hash: BlockHash,
2292    ) -> Result<Option<Vec<u8>>, StoreError> {
2293        let key = Self::make_witness_key(block_number, &block_hash);
2294        self.read(EXECUTION_WITNESSES, key)
2295    }
2296
2297    /// Returns the deserialized RpcExecutionWitness for a block.
2298    ///
2299    /// Prefer `get_witness_json_bytes` when you need to return the witness
2300    /// as JSON (e.g., for RPC responses) to avoid re-serialization.
2301    pub fn get_witness_by_number_and_hash(
2302        &self,
2303        block_number: u64,
2304        block_hash: BlockHash,
2305    ) -> Result<Option<RpcExecutionWitness>, StoreError> {
2306        let key = Self::make_witness_key(block_number, &block_hash);
2307        match self.read(EXECUTION_WITNESSES, key)? {
2308            Some(value) => {
2309                let witness: RpcExecutionWitness = serde_json::from_slice(&value)?;
2310                Ok(Some(witness))
2311            }
2312            None => Ok(None),
2313        }
2314    }
2315
2316    /// Stores a block access list for a given block hash.
2317    pub fn store_block_access_list(
2318        &self,
2319        block_hash: BlockHash,
2320        bal: &BlockAccessList,
2321    ) -> Result<(), StoreError> {
2322        let key = block_hash.as_bytes().to_vec();
2323        let mut value = vec![];
2324        bal.encode(&mut value);
2325        self.write(BLOCK_ACCESS_LISTS, key, value)
2326    }
2327
2328    /// Returns the block access list for a given block hash, if stored.
2329    pub fn get_block_access_list(
2330        &self,
2331        block_hash: BlockHash,
2332    ) -> Result<Option<BlockAccessList>, StoreError> {
2333        let key = block_hash.as_bytes().to_vec();
2334        match self.read(BLOCK_ACCESS_LISTS, key)? {
2335            Some(value) => {
2336                let bal = BlockAccessList::decode(&value)
2337                    .map_err(|e| StoreError::Custom(format!("Failed to decode BAL: {e}")))?;
2338                Ok(Some(bal))
2339            }
2340            None => Ok(None),
2341        }
2342    }
2343
2344    pub async fn add_initial_state(&mut self, genesis: Genesis) -> Result<(), StoreError> {
2345        self.add_initial_state_inner(genesis, false).await
2346    }
2347
2348    /// Like [`Store::add_initial_state`], but trusts a pre-existing datadir's
2349    /// state instead of validating it against the provided genesis. If a genesis
2350    /// header is already stored, it is kept as-is rather than recomputing the
2351    /// genesis state root from `genesis.alloc` and rejecting on mismatch. The
2352    /// chain config from the genesis file is still applied either way.
2353    ///
2354    /// Intended for booting a datadir produced out-of-band (e.g. by a state
2355    /// generator that writes the state trie directly and emits a genesis file
2356    /// with an empty `alloc`), where the operator vouches for the stored state
2357    /// root. Has no effect on a fresh datadir: the genesis is built normally.
2358    pub async fn add_initial_state_skip_validation(
2359        &mut self,
2360        genesis: Genesis,
2361    ) -> Result<(), StoreError> {
2362        self.add_initial_state_inner(genesis, true).await
2363    }
2364
    /// Shared implementation of [`Store::add_initial_state`] and
    /// [`Store::add_initial_state_skip_validation`].
    ///
    /// Always applies `genesis.config` as the chain config. If a latest block
    /// number is recorded, it also refreshes the cached latest block header.
    /// What happens next depends on the header already stored at the genesis
    /// block number:
    /// - stored, and `skip_genesis_validation` is set: it is kept, and the function returns;
    /// - stored, with the same hash as `genesis`: the function returns;
    /// - stored, with a different hash: returns `StoreError::IncompatibleChainConfig`;
    /// - not stored: writes the genesis header, the genesis state trie built from
    ///   `genesis.alloc`, the genesis block and the earliest block number, then
    ///   runs a forkchoice update pointing at the genesis block.
    ///
    /// # Errors
    /// Besides `IncompatibleChainConfig`, returns
    /// `StoreError::MissingLatestBlockNumber` when a latest block number is
    /// recorded but its header cannot be loaded. Storage errors are propagated.
    ///
    /// NOTE(review): on the "not stored" path, the genesis header is written
    /// before the state trie and block. If a later step fails, the next boot
    /// presumably finds a matching genesis header and returns early without the
    /// genesis state. Confirm whether that partial initialization can happen.
    async fn add_initial_state_inner(
        &mut self,
        genesis: Genesis,
        skip_genesis_validation: bool,
    ) -> Result<(), StoreError> {
        debug!("Storing initial state from genesis");

        // Obtain genesis block
        let genesis_block = genesis.get_block();
        let genesis_block_number = genesis_block.header.number;

        let genesis_hash = genesis_block.hash();

        // Header already stored at the genesis number (if any); compared below.
        let stored_genesis_header = self.load_block_header(genesis_block_number)?;

        // Always set the chain config from the genesis file. The in-memory
        // `chain_config` starts at `Default::default()` on every boot and is
        // not reloaded from the datadir, so skipping this would leave the store
        // with the wrong chainId and an empty fork schedule. Skip-validation
        // only waives the genesis state-root/header check; the `config` section
        // of the genesis file is still authoritative and must be applied.
        self.set_chain_config(&genesis.config).await?;

        // The cache can't be empty: if the DB already records a latest block,
        // load its header into the cache before anything else reads it.
        if let Some(number) = self.load_latest_block_number().await? {
            let latest_block_header = self
                .load_block_header(number)?
                .ok_or_else(|| StoreError::MissingLatestBlockNumber)?;
            self.latest_block_header.update(latest_block_header);
        }

        // Arm order matters: the skip-validation guard must be tested before the
        // hash comparison.
        match stored_genesis_header {
            Some(header) if skip_genesis_validation => {
                info!(
                    stored_genesis = %header.hash(),
                    "Skipping genesis state validation; trusting the genesis header and state already stored in the datadir"
                );
                return Ok(());
            }
            Some(header) if header.hash() == genesis_hash => {
                info!("Received genesis file matching a previously stored one, nothing to do");
                return Ok(());
            }
            Some(_) => {
                error!(
                    "The chain configuration stored in the database is incompatible with the provided configuration. If you intended to switch networks, choose another datadir or clear the database (e.g., run `ethrex removedb`) and try again."
                );
                return Err(StoreError::IncompatibleChainConfig);
            }
            None => {
                self.add_block_header(genesis_hash, genesis_block.header.clone())
                    .await?
            }
        }
        // Store genesis accounts
        // TODO: Should we use this root instead of computing it before the block hash check?
        let genesis_state_root = self.setup_genesis_state_trie(genesis.alloc).await?;
        // Only checked in debug builds.
        debug_assert_eq!(genesis_state_root, genesis_block.header.state_root);

        // Store genesis block
        info!(hash = %genesis_hash, "Storing genesis block");

        self.add_block(genesis_block).await?;
        self.update_earliest_block_number(genesis_block_number)
            .await?;
        // Make the genesis block the canonical head.
        self.forkchoice_update(vec![], genesis_block_number, genesis_hash, None, None)
            .await?;
        Ok(())
    }
2434
2435    pub async fn load_initial_state(&self) -> Result<(), StoreError> {
2436        info!("Loading initial state from DB");
2437        let Some(number) = self.load_latest_block_number().await? else {
2438            return Err(StoreError::MissingLatestBlockNumber);
2439        };
2440        let latest_block_header = self
2441            .load_block_header(number)?
2442            .ok_or_else(|| StoreError::Custom("latest block header is missing".to_string()))?;
2443        self.latest_block_header.update(latest_block_header);
2444        Ok(())
2445    }
2446
2447    pub fn get_storage_at(
2448        &self,
2449        block_number: BlockNumber,
2450        address: Address,
2451        storage_key: H256,
2452    ) -> Result<Option<U256>, StoreError> {
2453        match self.get_block_header(block_number)? {
2454            Some(header) => self.get_storage_at_root(header.state_root, address, storage_key),
2455            None => Ok(None),
2456        }
2457    }
2458
2459    pub fn get_storage_at_root(
2460        &self,
2461        state_root: H256,
2462        address: Address,
2463        storage_key: H256,
2464    ) -> Result<Option<U256>, StoreError> {
2465        let account_hash = hash_address_fixed(&address);
2466
2467        // Pre-acquire shared resources once for both trie opens
2468        let read_view = self.backend.begin_read()?;
2469        let cache = self
2470            .trie_cache
2471            .read()
2472            .map_err(|_| StoreError::LockError)?
2473            .clone();
2474        let last_written = self.last_written()?;
2475        let use_fkv = Self::flatkeyvalue_computed_with_last_written(account_hash, &last_written);
2476
2477        let storage_root = if use_fkv {
2478            // We will use FKVs, we don't need the root
2479            *EMPTY_TRIE_HASH
2480        } else {
2481            let state_trie = self.open_state_trie_shared(
2482                state_root,
2483                read_view.clone(),
2484                cache.clone(),
2485                last_written.clone(),
2486            )?;
2487            let Some(encoded_account) = state_trie.get(account_hash.as_bytes())? else {
2488                return Ok(None);
2489            };
2490            let account = AccountState::decode(&encoded_account)?;
2491            account.storage_root
2492        };
2493        let storage_trie = self.open_storage_trie_shared(
2494            account_hash,
2495            state_root,
2496            storage_root,
2497            read_view,
2498            cache,
2499            last_written,
2500        )?;
2501
2502        let hashed_key = hash_key_fixed(&storage_key);
2503        storage_trie
2504            .get(&hashed_key)?
2505            .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2506            .transpose()
2507    }
2508
2509    /// Gets storage value when the account hash and storage root are already known.
2510    ///
2511    /// This skips the state-trie account lookup and account RLP decode done by
2512    /// [`Self::get_storage_at_root`], and directly opens the account storage trie.
2513    pub fn get_storage_at_root_with_known_storage_root(
2514        &self,
2515        state_root: H256,
2516        account_hash: H256,
2517        storage_root: H256,
2518        storage_key: H256,
2519    ) -> Result<Option<U256>, StoreError> {
2520        let read_view = self.backend.begin_read()?;
2521        let cache = self
2522            .trie_cache
2523            .read()
2524            .map_err(|_| StoreError::LockError)?
2525            .clone();
2526        let last_written = self.last_written()?;
2527        // When FKV is active the real storage root is in the flatkeyvalue store,
2528        // not in the account's RLP-encoded storage_root field. Use EMPTY_TRIE_HASH
2529        // so open_storage_trie_shared falls through to the FKV path.
2530        let storage_root =
2531            if Self::flatkeyvalue_computed_with_last_written(account_hash, &last_written) {
2532                *EMPTY_TRIE_HASH
2533            } else {
2534                storage_root
2535            };
2536        let storage_trie = self.open_storage_trie_shared(
2537            account_hash,
2538            state_root,
2539            storage_root,
2540            read_view,
2541            cache,
2542            last_written,
2543        )?;
2544
2545        let hashed_key = hash_key_fixed(&storage_key);
2546        storage_trie
2547            .get(&hashed_key)?
2548            .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2549            .transpose()
2550    }
2551
2552    pub fn get_chain_config(&self) -> ChainConfig {
2553        self.chain_config
2554    }
2555
2556    pub async fn get_latest_canonical_block_hash(&self) -> Result<Option<BlockHash>, StoreError> {
2557        Ok(Some(self.latest_block_header.get().hash()))
2558    }
2559
2560    /// Updates the canonical chain.
2561    /// Inserts new canonical blocks, removes blocks beyond the new head,
2562    /// and updates the head, safe, and finalized block pointers.
2563    /// All operations are performed in a single database transaction.
2564    pub async fn forkchoice_update(
2565        &self,
2566        new_canonical_blocks: Vec<(BlockNumber, BlockHash)>,
2567        head_number: BlockNumber,
2568        head_hash: BlockHash,
2569        safe: Option<BlockNumber>,
2570        finalized: Option<BlockNumber>,
2571    ) -> Result<(), StoreError> {
2572        // Serialize concurrent forkchoice updates. Without this, two callers
2573        // could interleave their `latest_block_header` cache updates with each
2574        // other's DB writes, leaving the cache inconsistent with the DB or
2575        // letting a later caller's write reorder relative to the cache update
2576        // order (see the TOCTOU discussion around canonical/latest drift).
2577        let _guard = self.fcu_lock.lock().await;
2578
2579        // Updates first the latest_block_header to avoid nonce inconsistencies #3927.
2580        // Snapshot the previous header so we can roll the cache back if the DB
2581        // write fails — otherwise the cache would point at a block the DB does
2582        // not consider canonical.
2583        let previous_head = self.latest_block_header.get();
2584        let new_head = self
2585            .load_block_header_by_hash(head_hash)?
2586            .ok_or_else(|| StoreError::MissingLatestBlockNumber)?;
2587        self.latest_block_header.update(new_head);
2588        if let Err(err) = self
2589            .forkchoice_update_inner(
2590                new_canonical_blocks,
2591                head_number,
2592                head_hash,
2593                safe,
2594                finalized,
2595            )
2596            .await
2597        {
2598            self.latest_block_header.update((*previous_head).clone());
2599            return Err(err);
2600        }
2601
2602        Ok(())
2603    }
2604
2605    /// Obtain the storage trie for the given block
2606    pub fn state_trie(&self, block_hash: BlockHash) -> Result<Option<Trie>, StoreError> {
2607        let Some(header) = self.get_block_header_by_hash(block_hash)? else {
2608            return Ok(None);
2609        };
2610        Ok(Some(self.open_state_trie(header.state_root)?))
2611    }
2612
2613    /// Obtain the storage trie for the given account on the given block
2614    pub fn storage_trie(
2615        &self,
2616        block_hash: BlockHash,
2617        address: Address,
2618    ) -> Result<Option<Trie>, StoreError> {
2619        let Some(header) = self.get_block_header_by_hash(block_hash)? else {
2620            return Ok(None);
2621        };
2622        // Fetch Account from state_trie
2623        let Some(state_trie) = self.state_trie(block_hash)? else {
2624            return Ok(None);
2625        };
2626        let hashed_address = hash_address_fixed(&address);
2627        let Some(encoded_account) = state_trie.get(hashed_address.as_bytes())? else {
2628            return Ok(None);
2629        };
2630        let account = AccountState::decode(&encoded_account)?;
2631        // Open storage_trie
2632        let storage_root = account.storage_root;
2633        Ok(Some(self.open_storage_trie(
2634            hashed_address,
2635            header.state_root,
2636            storage_root,
2637        )?))
2638    }
2639
2640    pub async fn get_account_state(
2641        &self,
2642        block_number: BlockNumber,
2643        address: Address,
2644    ) -> Result<Option<AccountState>, StoreError> {
2645        let Some(block_hash) = self.get_canonical_block_hash(block_number).await? else {
2646            return Ok(None);
2647        };
2648        let Some(state_trie) = self.state_trie(block_hash)? else {
2649            return Ok(None);
2650        };
2651        self.get_account_state_from_trie(&state_trie, address)
2652    }
2653
2654    pub fn get_account_state_by_root(
2655        &self,
2656        state_root: H256,
2657        address: Address,
2658    ) -> Result<Option<AccountState>, StoreError> {
2659        let state_trie = self.open_state_trie(state_root)?;
2660        self.get_account_state_from_trie(&state_trie, address)
2661    }
2662
2663    pub fn get_account_state_from_trie(
2664        &self,
2665        state_trie: &Trie,
2666        address: Address,
2667    ) -> Result<Option<AccountState>, StoreError> {
2668        let hashed_address = hash_address_fixed(&address);
2669        let Some(encoded_state) = state_trie.get(hashed_address.as_bytes())? else {
2670            return Ok(None);
2671        };
2672        Ok(Some(AccountState::decode(&encoded_state)?))
2673    }
2674
2675    /// Constructs a merkle proof for the given account address against a given state.
2676    /// If storage_keys are provided, also constructs the storage proofs for those keys.
2677    ///
2678    /// Returns `None` if the state trie is missing, otherwise returns the proof.
2679    pub async fn get_account_proof(
2680        &self,
2681        state_root: H256,
2682        address: Address,
2683        storage_keys: &[H256],
2684    ) -> Result<Option<AccountProof>, StoreError> {
2685        // TODO: check state root
2686        // let Some(state_trie) = self.open_state_trie(state_trie)? else {
2687        //     return Ok(None);
2688        // };
2689        let state_trie = self.open_state_trie(state_root)?;
2690        let address_path = hash_address_fixed(&address);
2691        let proof = state_trie.get_proof(address_path.as_bytes())?;
2692        let account_opt = state_trie
2693            .get(address_path.as_bytes())?
2694            .map(|encoded_state| AccountState::decode(&encoded_state))
2695            .transpose()?;
2696
2697        let mut storage_proof = Vec::with_capacity(storage_keys.len());
2698
2699        if let Some(account) = &account_opt {
2700            let storage_trie =
2701                self.open_storage_trie(address_path, state_root, account.storage_root)?;
2702
2703            for key in storage_keys {
2704                let hashed_key = hash_key(key);
2705                let proof = storage_trie.get_proof(&hashed_key)?;
2706                let value = storage_trie
2707                    .get(&hashed_key)?
2708                    .map(|rlp| U256::decode(&rlp).map_err(StoreError::RLPDecode))
2709                    .transpose()?
2710                    .unwrap_or_default();
2711
2712                let slot_proof = StorageSlotProof {
2713                    proof,
2714                    key: *key,
2715                    value,
2716                };
2717                storage_proof.push(slot_proof);
2718            }
2719        } else {
2720            storage_proof.extend(storage_keys.iter().map(|key| StorageSlotProof {
2721                proof: Vec::new(),
2722                key: *key,
2723                value: U256::zero(),
2724            }));
2725        }
2726        let account = account_opt.unwrap_or_default();
2727        let account_proof = AccountProof {
2728            proof,
2729            account,
2730            storage_proof,
2731        };
2732        Ok(Some(account_proof))
2733    }
2734
2735    // Returns an iterator across all accounts in the state trie given by the state_root
2736    // Does not check that the state_root is valid
2737    pub fn iter_accounts_from(
2738        &self,
2739        state_root: H256,
2740        starting_address: H256,
2741    ) -> Result<impl Iterator<Item = (H256, AccountState)>, StoreError> {
2742        let mut iter = self.open_locked_state_trie(state_root)?.into_iter();
2743        iter.advance(starting_address.0.to_vec())?;
2744        Ok(iter.content().map_while(|(path, value)| {
2745            Some((H256::from_slice(&path), AccountState::decode(&value).ok()?))
2746        }))
2747    }
2748
2749    // Returns an iterator across all accounts in the state trie given by the state_root
2750    // Does not check that the state_root is valid
2751    pub fn iter_accounts(
2752        &self,
2753        state_root: H256,
2754    ) -> Result<impl Iterator<Item = (H256, AccountState)>, StoreError> {
2755        self.iter_accounts_from(state_root, H256::zero())
2756    }
2757
2758    // Returns an iterator across all accounts in the state trie given by the state_root
2759    // Does not check that the state_root is valid
2760    pub fn iter_storage_from(
2761        &self,
2762        state_root: H256,
2763        hashed_address: H256,
2764        starting_slot: H256,
2765    ) -> Result<Option<impl Iterator<Item = (H256, U256)>>, StoreError> {
2766        let state_trie = self.open_locked_state_trie(state_root)?;
2767        let Some(account_rlp) = state_trie.get(hashed_address.as_bytes())? else {
2768            return Ok(None);
2769        };
2770        let storage_root = AccountState::decode(&account_rlp)?.storage_root;
2771        let mut iter = self
2772            .open_locked_storage_trie(hashed_address, state_root, storage_root)?
2773            .into_iter();
2774        iter.advance(starting_slot.0.to_vec())?;
2775        Ok(Some(iter.content().map_while(|(path, value)| {
2776            Some((H256::from_slice(&path), U256::decode(&value).ok()?))
2777        })))
2778    }
2779
2780    // Returns an iterator across all accounts in the state trie given by the state_root
2781    // Does not check that the state_root is valid
2782    pub fn iter_storage(
2783        &self,
2784        state_root: H256,
2785        hashed_address: H256,
2786    ) -> Result<Option<impl Iterator<Item = (H256, U256)>>, StoreError> {
2787        self.iter_storage_from(state_root, hashed_address, H256::zero())
2788    }
2789
2790    pub fn get_account_range_proof(
2791        &self,
2792        state_root: H256,
2793        starting_hash: H256,
2794        last_hash: Option<H256>,
2795    ) -> Result<Vec<Vec<u8>>, StoreError> {
2796        let state_trie = self.open_state_trie(state_root)?;
2797        let mut proof = state_trie.get_proof(starting_hash.as_bytes())?;
2798        if let Some(last_hash) = last_hash {
2799            proof.extend_from_slice(&state_trie.get_proof(last_hash.as_bytes())?);
2800        }
2801        Ok(proof)
2802    }
2803
2804    pub fn get_storage_range_proof(
2805        &self,
2806        state_root: H256,
2807        hashed_address: H256,
2808        starting_hash: H256,
2809        last_hash: Option<H256>,
2810    ) -> Result<Option<Vec<Vec<u8>>>, StoreError> {
2811        let state_trie = self.open_state_trie(state_root)?;
2812        let Some(account_rlp) = state_trie.get(hashed_address.as_bytes())? else {
2813            return Ok(None);
2814        };
2815        let storage_root = AccountState::decode(&account_rlp)?.storage_root;
2816        let storage_trie = self.open_storage_trie(hashed_address, state_root, storage_root)?;
2817        let mut proof = storage_trie.get_proof(starting_hash.as_bytes())?;
2818        if let Some(last_hash) = last_hash {
2819            proof.extend_from_slice(&storage_trie.get_proof(last_hash.as_bytes())?);
2820        }
2821        Ok(Some(proof))
2822    }
2823
2824    /// Receives the root of the state trie and a list of paths where the first path will correspond to a path in the state trie
2825    /// (aka a hashed account address) and the following paths will be paths in the account's storage trie (aka hashed storage keys)
2826    /// If only one hash (account) is received, then the state trie node containing the account will be returned.
2827    /// If more than one hash is received, then the storage trie nodes where each storage key is stored will be returned
2828    /// For more information check out snap capability message [`GetTrieNodes`](https://github.com/ethereum/devp2p/blob/master/caps/snap.md#gettrienodes-0x06)
2829    /// The paths can be either full paths (hash) or partial paths (compact-encoded nibbles), if a partial path is given for the account this method will not return storage nodes for it
2830    pub fn get_trie_nodes(
2831        &self,
2832        state_root: H256,
2833        paths: Vec<Vec<u8>>,
2834        byte_limit: u64,
2835    ) -> Result<Vec<Vec<u8>>, StoreError> {
2836        let Some(account_path) = paths.first() else {
2837            return Ok(vec![]);
2838        };
2839        let state_trie = self.open_state_trie(state_root)?;
2840        // State Trie Nodes Request
2841        if paths.len() == 1 {
2842            // Fetch state trie node
2843            let node = state_trie.get_node(account_path)?;
2844            return Ok(vec![node]);
2845        }
2846        // Storage Trie Nodes Request
2847        let Some(account_state) = state_trie
2848            .get(account_path)?
2849            .map(|ref rlp| AccountState::decode(rlp))
2850            .transpose()?
2851        else {
2852            return Ok(vec![]);
2853        };
2854        // We can't access the storage trie without the account's address hash
2855        let Ok(hashed_address) = account_path.clone().try_into().map(H256) else {
2856            return Ok(vec![]);
2857        };
2858        let storage_trie =
2859            self.open_storage_trie(hashed_address, state_root, account_state.storage_root)?;
2860        // Fetch storage trie nodes
2861        let mut nodes = vec![];
2862        let mut bytes_used = 0;
2863        for path in paths.iter().skip(1) {
2864            if bytes_used >= byte_limit {
2865                break;
2866            }
2867            let node = storage_trie.get_node(path)?;
2868            bytes_used += node.len() as u64;
2869            nodes.push(node);
2870        }
2871        Ok(nodes)
2872    }
2873
2874    /// Creates a new state trie with an empty state root, for testing purposes only
2875    pub fn new_state_trie_for_test(&self) -> Result<Trie, StoreError> {
2876        self.open_state_trie(*EMPTY_TRIE_HASH)
2877    }
2878
2879    // Methods exclusive for trie management during snap-syncing
2880
2881    /// Obtain a state trie from the given state root
2882    /// Doesn't check if the state root is valid
2883    /// Used for internal store operations
2884    pub fn open_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2885        let trie_db = TrieWrapper::new(
2886            state_root,
2887            self.trie_cache
2888                .read()
2889                .map_err(|_| StoreError::LockError)?
2890                .clone(),
2891            Box::new(BackendTrieDB::new_for_accounts(
2892                self.backend.clone(),
2893                self.last_written()?,
2894            )?),
2895            None,
2896        );
2897        Ok(Trie::open(Box::new(trie_db), state_root))
2898    }
2899
2900    /// Obtain a state trie from the given state root
2901    /// Doesn't check if the state root is valid
2902    /// Used for internal store operations
2903    pub fn open_direct_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2904        Ok(Trie::open(
2905            Box::new(BackendTrieDB::new_for_accounts(
2906                self.backend.clone(),
2907                self.last_written()?,
2908            )?),
2909            state_root,
2910        ))
2911    }
2912
2913    /// Obtain a state trie locked for reads from the given state root
2914    /// Doesn't check if the state root is valid
2915    /// Used for internal store operations
2916    pub fn open_locked_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
2917        let trie_db = TrieWrapper::new(
2918            state_root,
2919            self.trie_cache
2920                .read()
2921                .map_err(|_| StoreError::LockError)?
2922                .clone(),
2923            Box::new(state_trie_locked_backend(
2924                self.backend.as_ref(),
2925                self.last_written()?,
2926            )?),
2927            None,
2928        );
2929        Ok(Trie::open(Box::new(trie_db), state_root))
2930    }
2931
2932    /// Obtain a storage trie from the given address and storage_root.
2933    /// Doesn't check if the account is stored
2934    pub fn open_storage_trie(
2935        &self,
2936        account_hash: H256,
2937        state_root: H256,
2938        storage_root: H256,
2939    ) -> Result<Trie, StoreError> {
2940        let trie_db = TrieWrapper::new(
2941            state_root,
2942            self.trie_cache
2943                .read()
2944                .map_err(|_| StoreError::LockError)?
2945                .clone(),
2946            Box::new(BackendTrieDB::new_for_storages(
2947                self.backend.clone(),
2948                self.last_written()?,
2949            )?),
2950            Some(account_hash),
2951        );
2952        Ok(Trie::open(Box::new(trie_db), storage_root))
2953    }
2954
2955    /// Open a state trie using pre-acquired shared resources.
2956    /// Avoids redundant RwLock acquisitions when multiple tries are opened
2957    /// in the same operation (e.g., state trie + storage trie in get_storage_at_root).
2958    fn open_state_trie_shared(
2959        &self,
2960        state_root: H256,
2961        read_view: Arc<dyn StorageReadView>,
2962        cache: Arc<TrieLayerCache>,
2963        last_written: Vec<u8>,
2964    ) -> Result<Trie, StoreError> {
2965        let trie_db = TrieWrapper::new(
2966            state_root,
2967            cache,
2968            Box::new(BackendTrieDB::new_for_accounts_with_view(
2969                self.backend.clone(),
2970                read_view,
2971                last_written,
2972            )?),
2973            None,
2974        );
2975        Ok(Trie::open(Box::new(trie_db), state_root))
2976    }
2977
2978    /// Open a storage trie using pre-acquired shared resources.
2979    fn open_storage_trie_shared(
2980        &self,
2981        account_hash: H256,
2982        state_root: H256,
2983        storage_root: H256,
2984        read_view: Arc<dyn StorageReadView>,
2985        cache: Arc<TrieLayerCache>,
2986        last_written: Vec<u8>,
2987    ) -> Result<Trie, StoreError> {
2988        let trie_db = TrieWrapper::new(
2989            state_root,
2990            cache,
2991            Box::new(BackendTrieDB::new_for_storages_with_view(
2992                self.backend.clone(),
2993                read_view,
2994                last_written,
2995            )?),
2996            Some(account_hash),
2997        );
2998        Ok(Trie::open(Box::new(trie_db), storage_root))
2999    }
3000
3001    /// Obtain a storage trie from the given address and storage_root.
3002    /// Doesn't check if the account is stored
3003    pub fn open_direct_storage_trie(
3004        &self,
3005        account_hash: H256,
3006        storage_root: H256,
3007    ) -> Result<Trie, StoreError> {
3008        Ok(Trie::open(
3009            Box::new(BackendTrieDB::new_for_account_storage(
3010                self.backend.clone(),
3011                account_hash,
3012                self.last_written()?,
3013            )?),
3014            storage_root,
3015        ))
3016    }
3017
    /// Obtain a read-locked storage trie from the given address and storage_root.
    /// Doesn't check if the account is stored
    ///
    /// The returned trie reads through a `TrieWrapper` that combines a snapshot of the
    /// current in-memory trie-layer cache with a `BackendTrieDBLocked` (built by
    /// `state_trie_locked_backend`), scoped to `account_hash`.
    ///
    /// # Errors
    /// Returns `StoreError::LockError` if the `trie_cache` lock is poisoned, and propagates
    /// failures from `last_written` and from opening the locked backend.
    pub fn open_locked_storage_trie(
        &self,
        account_hash: H256,
        state_root: H256,
        storage_root: H256,
    ) -> Result<Trie, StoreError> {
        // Arguments are evaluated left to right, so the layer-cache snapshot is taken before
        // the locked backend is opened. The `trie_cache` read guard is a temporary of this
        // `let` statement and stays held until the statement ends.
        // NOTE(review): `apply_trie_updates` writes a layer to disk before removing it from
        // the cache, which suggests cache-then-backend is the order that keeps every node
        // reachable; confirm before reordering these arguments.
        let trie_db = TrieWrapper::new(
            state_root,
            self.trie_cache
                .read()
                .map_err(|_| StoreError::LockError)?
                .clone(),
            Box::new(state_trie_locked_backend(
                self.backend.as_ref(),
                self.last_written()?,
            )?),
            Some(account_hash),
        );
        Ok(Trie::open(Box::new(trie_db), storage_root))
    }
3040
3041    pub fn has_state_root(&self, state_root: H256) -> Result<bool, StoreError> {
3042        // Empty state trie is always available
3043        if state_root == *EMPTY_TRIE_HASH {
3044            return Ok(true);
3045        }
3046        let trie = self.open_state_trie(state_root)?;
3047        // NOTE: here we hash the root because the trie doesn't check the state root is correct
3048        let Some(root) = trie.db().get(Nibbles::default())? else {
3049            return Ok(false);
3050        };
3051        let root_hash = ethrex_trie::Node::decode(&root)?
3052            .compute_hash(&NativeCrypto)
3053            .finalize(&NativeCrypto);
3054        Ok(state_root == root_hash)
3055    }
3056
3057    /// Takes a block hash and returns an iterator to its ancestors. Block headers are returned
3058    /// in reverse order, starting from the given block and going up to the genesis block.
3059    pub fn ancestors(&self, block_hash: BlockHash) -> AncestorIterator {
3060        AncestorIterator {
3061            store: self.clone(),
3062            next_hash: block_hash,
3063        }
3064    }
3065
3066    /// Checks if a given block belongs to the current canonical chain. Returns false if the block is not known
3067    pub fn is_canonical_sync(&self, block_hash: BlockHash) -> Result<bool, StoreError> {
3068        let Some(block_number) = self.get_block_number_sync(block_hash)? else {
3069            return Ok(false);
3070        };
3071        Ok(self
3072            .get_canonical_block_hash_sync(block_number)?
3073            .is_some_and(|h| h == block_hash))
3074    }
3075
3076    pub fn generate_flatkeyvalue(&self) -> Result<(), StoreError> {
3077        self.flatkeyvalue_control_tx
3078            .send(FKVGeneratorControlMessage::Continue)
3079            .map_err(|_| StoreError::Custom("FlatKeyValue thread disconnected.".to_string()))
3080    }
3081
3082    pub fn create_checkpoint(&self, path: impl AsRef<Path>) -> Result<(), StoreError> {
3083        self.backend.create_checkpoint(path.as_ref())?;
3084        init_metadata_file(path.as_ref())?;
3085        Ok(())
3086    }
3087
3088    pub fn get_store_directory(&self) -> Result<PathBuf, StoreError> {
3089        Ok(self.db_path.clone())
3090    }
3091
3092    /// Loads the latest block number stored in the database, bypassing the latest block number cache
3093    async fn load_latest_block_number(&self) -> Result<Option<BlockNumber>, StoreError> {
3094        let key = chain_data_key(ChainDataIndex::LatestBlockNumber);
3095        self.read_async(CHAIN_DATA, key)
3096            .await?
3097            .map(|bytes| -> Result<BlockNumber, StoreError> {
3098                let array: [u8; 8] = bytes
3099                    .try_into()
3100                    .map_err(|_| StoreError::Custom("Invalid BlockNumber bytes".to_string()))?;
3101                Ok(BlockNumber::from_le_bytes(array))
3102            })
3103            .transpose()
3104    }
3105
3106    fn load_canonical_block_hash(
3107        &self,
3108        block_number: BlockNumber,
3109    ) -> Result<Option<BlockHash>, StoreError> {
3110        let txn = self.backend.begin_read()?;
3111        txn.get(
3112            CANONICAL_BLOCK_HASHES,
3113            block_number.to_le_bytes().as_slice(),
3114        )?
3115        .map(|bytes| H256::decode(bytes.as_slice()))
3116        .transpose()
3117        .map_err(StoreError::from)
3118    }
3119
3120    fn load_block_header(
3121        &self,
3122        block_number: BlockNumber,
3123    ) -> Result<Option<BlockHeader>, StoreError> {
3124        let Some(block_hash) = self.load_canonical_block_hash(block_number)? else {
3125            return Ok(None);
3126        };
3127        self.load_block_header_by_hash(block_hash)
3128    }
3129
3130    /// Load a block header, bypassing the latest header cache
3131    fn load_block_header_by_hash(
3132        &self,
3133        block_hash: BlockHash,
3134    ) -> Result<Option<BlockHeader>, StoreError> {
3135        let txn = self.backend.begin_read()?;
3136        let hash_key = block_hash.encode_to_vec();
3137        let header_value = txn.get(HEADERS, hash_key.as_slice())?;
3138        let mut header = header_value
3139            .map(|bytes| BlockHeaderRLP::from_bytes(bytes).to())
3140            .transpose()
3141            .map_err(StoreError::from)?;
3142        header.as_mut().inspect(|h| {
3143            // Set the hash so we avoid recomputing it later
3144            let _ = h.hash.set(block_hash);
3145        });
3146        Ok(header)
3147    }
3148
3149    pub fn last_written(&self) -> Result<Vec<u8>, StoreError> {
3150        let last_computed_flatkeyvalue = self
3151            .last_computed_flatkeyvalue
3152            .read()
3153            .map_err(|_| StoreError::LockError)?;
3154        Ok(last_computed_flatkeyvalue.clone())
3155    }
3156
3157    fn flatkeyvalue_computed_with_last_written(account: H256, last_written: &[u8]) -> bool {
3158        let account_nibbles = Nibbles::from_bytes(account.as_bytes());
3159        &last_written[0..64] > account_nibbles.as_ref()
3160    }
3161}
3162
3163type TrieNodesUpdate = Vec<(Nibbles, Vec<u8>)>;
3164
3165struct TrieUpdate {
3166    result_sender: std::sync::mpsc::SyncSender<Result<(), StoreError>>,
3167    parent_state_root: H256,
3168    child_state_root: H256,
3169    account_updates: TrieNodesUpdate,
3170    storage_updates: Vec<(H256, TrieNodesUpdate)>,
3171    is_batch: bool,
3172}
3173
/// Messages handled by the trie-update background worker.
///
/// `Ping` is a no-op the worker handles between real updates. Because the
/// worker channel is `sync_channel(0)` (rendezvous), a successful `Ping` send
/// proves the worker has finished its previous iteration (Phase 1+2+3) and is
/// blocked back at `recv()` — i.e. persistence is idle. See
/// `Store::wait_for_persistence_idle`.
enum TrieMessage {
    /// A unit of trie changes; presumably handed to `apply_trie_updates` by the worker.
    Update(TrieUpdate),
    /// Idleness probe carrying no data.
    Ping,
}
3185
3186// NOTE: we don't receive `Store` here to avoid cyclic dependencies
3187// with the other end of `fkv_ctl`
3188fn apply_trie_updates(
3189    backend: &dyn StorageBackend,
3190    fkv_ctl: &SyncSender<FKVGeneratorControlMessage>,
3191    trie_cache: &Arc<RwLock<Arc<TrieLayerCache>>>,
3192    trie_update: TrieUpdate,
3193) -> Result<(), StoreError> {
3194    let TrieUpdate {
3195        result_sender,
3196        parent_state_root,
3197        child_state_root,
3198        account_updates,
3199        storage_updates,
3200        is_batch,
3201    } = trie_update;
3202
3203    // Phase 1: update the in-memory diff-layers only, then notify block production.
3204    let new_layer = storage_updates
3205        .into_iter()
3206        .flat_map(|(account_hash, nodes)| {
3207            nodes
3208                .into_iter()
3209                .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
3210        })
3211        .chain(account_updates)
3212        .collect();
3213    // Read-Copy-Update the trie cache with a new layer.
3214    let trie = trie_cache
3215        .read()
3216        .map_err(|_| StoreError::LockError)?
3217        .clone();
3218    let mut trie_mut = (*trie).clone();
3219    trie_mut.put_batch(parent_state_root, child_state_root, new_layer);
3220    let trie = Arc::new(trie_mut);
3221    *trie_cache.write().map_err(|_| StoreError::LockError)? = trie.clone();
3222    // Update finished, signal block processing.
3223    result_sender
3224        .send(Ok(()))
3225        .map_err(|_| StoreError::LockError)?;
3226
3227    // Phase 2: update disk layer.
3228    let commitable = if is_batch {
3229        trie.get_commitable_with_threshold(parent_state_root, BATCH_COMMIT_THRESHOLD)
3230    } else {
3231        trie.get_commitable(parent_state_root)
3232    };
3233    let Some(root) = commitable else {
3234        // Nothing to commit to disk, move on.
3235        return Ok(());
3236    };
3237    // Stop the flat-key-value generator thread, as the underlying trie is about to change.
3238    // Ignore the error, if the channel is closed it means there is no worker to notify.
3239    let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop);
3240
3241    // RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
3242    let mut trie_mut = (*trie).clone();
3243
3244    let last_written = backend
3245        .begin_read()?
3246        .get(MISC_VALUES, "last_written".as_bytes())?
3247        .unwrap_or_default();
3248
3249    let mut write_tx = backend.begin_write()?;
3250
3251    // Before encoding, accounts have only the account address as their path, while storage keys have
3252    // the account address (32 bytes) + storage path (up to 32 bytes).
3253
3254    // Commit removes the bottom layer and returns it, this is the mutation step.
3255    let nodes = trie_mut.commit(root).unwrap_or_default();
3256    let mut result = Ok(());
3257    for (key, value) in nodes {
3258        let is_leaf = key.len() == 65 || key.len() == 131;
3259        let is_account = key.len() <= 65;
3260
3261        if is_leaf && key > last_written {
3262            continue;
3263        }
3264        let table = if is_leaf {
3265            if is_account {
3266                &ACCOUNT_FLATKEYVALUE
3267            } else {
3268                &STORAGE_FLATKEYVALUE
3269            }
3270        } else if is_account {
3271            &ACCOUNT_TRIE_NODES
3272        } else {
3273            &STORAGE_TRIE_NODES
3274        };
3275        if value.is_empty() {
3276            result = write_tx.delete(table, &key);
3277        } else {
3278            result = write_tx.put(table, &key, &value);
3279        }
3280        if result.is_err() {
3281            break;
3282        }
3283    }
3284    if result.is_ok() {
3285        result = write_tx.commit();
3286    }
3287    // We want to send this message even if there was an error during the batch write
3288    let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue);
3289    result?;
3290    // Phase 3: update diff layers with the removal of bottom layer.
3291    *trie_cache.write().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut);
3292    Ok(())
3293}
3294
// NOTE: we don't receive `Store` here to avoid cyclic dependencies
// with the other end of `control_rx`
/// Builds the FlatKeyValue tables (`ACCOUNT_FLATKEYVALUE`, `STORAGE_FLATKEYVALUE`) by
/// walking the on-disk account trie and, for every account leaf, its storage trie.
///
/// Progress is tracked under the `"last_written"` key of `MISC_VALUES`:
/// - it holds the path of the last leaf written (an account path, or an account-prefixed
///   storage path);
/// - it holds `[0xff]` once generation has finished;
/// - an empty value means generation never started, and both FlatKeyValue tables are
///   cleared first.
///
/// Writes are committed in batches. After each commit, `last_computed_fkv` is set to the
/// last committed path, and to `[0xff; 131]` at the end.
///
/// The generator polls `control_rx` (via `fkv_check_for_stop_msg`) after every storage
/// leaf and after every account. A `Stop`, or a disconnected channel, aborts the pass
/// with `StoreError::PivotChanged`. The generator then blocks until `Continue` and
/// restarts from the last committed pivot with a fresh read view.
///
/// Returns `Ok(())` when:
/// - generation completes,
/// - generation was already complete, or
/// - the control channel closes while waiting for `Continue`.
fn flatkeyvalue_generator(
    backend: &Arc<dyn StorageBackend>,
    last_computed_fkv: &RwLock<Vec<u8>>,
    control_rx: &std::sync::mpsc::Receiver<FKVGeneratorControlMessage>,
) -> Result<(), StoreError> {
    info!("Generation of FlatKeyValue started.");
    let initial_last_written = backend
        .begin_read()?
        .get(MISC_VALUES, "last_written".as_bytes())?
        .unwrap_or_default();

    if initial_last_written.is_empty() {
        // First time generating the FKV. Remove all FKV entries just in case
        backend.clear_table(ACCOUNT_FLATKEYVALUE)?;
        backend.clear_table(STORAGE_FLATKEYVALUE)?;
    } else if initial_last_written == [0xff] {
        // FKV was already generated
        info!("FlatKeyValue already generated. Skipping.");
        return Ok(());
    }

    loop {
        // Acquire a fresh read view per iteration so updates performed while the
        // generator is paused are visible after a Continue signal.
        let read_tx = backend.begin_read()?;
        // NOTE(review): a missing root node is reported as `MissingLatestBlockNumber`.
        let root = read_tx
            .get(ACCOUNT_TRIE_NODES, &[])?
            .ok_or(StoreError::MissingLatestBlockNumber)?;
        let root: Node = ethrex_trie::Node::decode(&root)?;
        let state_root = root.compute_hash(&NativeCrypto).finalize(&NativeCrypto);

        // Resume point. The first 64 nibbles are the account part of the pivot and
        // nibbles 66..130 the storage part; either is empty when the pivot is too short.
        // NOTE(review): the two nibbles at 64..66 are presumably the leaf/prefix markers
        // inserted by `apply_prefix`; confirm.
        let last_written = read_tx
            .get(MISC_VALUES, "last_written".as_bytes())?
            .unwrap_or_default();
        let last_written_account = last_written
            .get(0..64)
            .map(|v| Nibbles::from_hex(v.to_vec()))
            .unwrap_or_default();
        let mut last_written_storage = last_written
            .get(66..130)
            .map(|v| Nibbles::from_hex(v.to_vec()))
            .unwrap_or_default();

        debug!("Starting FlatKeyValue loop pivot={last_written:?} SR={state_root:x}");

        // Number of leaves written since the last commit.
        let mut ctr = 0;
        let mut write_txn = backend.begin_write()?;
        let mut iter = Trie::open(
            Box::new(BackendTrieDB::new_for_accounts_with_view(
                backend.clone(),
                read_tx.clone(),
                last_written.clone(),
            )?),
            state_root,
        )
        .into_iter();
        if last_written_account > Nibbles::default() {
            iter.advance(last_written_account.to_bytes())?;
        }
        let res = iter.try_for_each(|(path, node)| -> Result<(), StoreError> {
            // Only leaves carry account values; internal nodes are skipped.
            let Node::Leaf(node) = node else {
                return Ok(());
            };
            let account_state = AccountState::decode(&node.value)?;
            let account_hash = H256::from_slice(&path.to_bytes());
            write_txn.put(MISC_VALUES, "last_written".as_bytes(), path.as_ref())?;
            write_txn.put(ACCOUNT_FLATKEYVALUE, path.as_ref(), &node.value)?;
            ctr += 1;
            // Commit once more than 10_000 leaves are pending, and only then publish the
            // new pivot through `last_computed_fkv`.
            if ctr > 10_000 {
                write_txn.commit()?;
                write_txn = backend.begin_write()?;
                *last_computed_fkv
                    .write()
                    .map_err(|_| StoreError::LockError)? = path.as_ref().to_vec();
                ctr = 0;
            }

            let mut iter_inner = Trie::open(
                Box::new(BackendTrieDB::new_for_account_storage_with_view(
                    backend.clone(),
                    read_tx.clone(),
                    account_hash,
                    path.as_ref().to_vec(),
                )?),
                account_state.storage_root,
            )
            .into_iter();
            // Only the first account visited resumes part-way through its storage; the
            // storage pivot is reset so later accounts start from their first slot.
            if last_written_storage > Nibbles::default() {
                iter_inner.advance(last_written_storage.to_bytes())?;
                last_written_storage = Nibbles::default();
            }
            iter_inner.try_for_each(|(path, node)| -> Result<(), StoreError> {
                let Node::Leaf(node) = node else {
                    return Ok(());
                };
                // Storage entries are keyed by the account-prefixed storage path.
                let key = apply_prefix(Some(account_hash), path);
                write_txn.put(MISC_VALUES, "last_written".as_bytes(), key.as_ref())?;
                write_txn.put(STORAGE_FLATKEYVALUE, key.as_ref(), &node.value)?;
                ctr += 1;
                if ctr > 10_000 {
                    write_txn.commit()?;
                    write_txn = backend.begin_write()?;
                    *last_computed_fkv
                        .write()
                        .map_err(|_| StoreError::LockError)? = key.into_vec();
                    ctr = 0;
                }
                fkv_check_for_stop_msg(control_rx)?;
                Ok(())
            })?;
            fkv_check_for_stop_msg(control_rx)?;
            Ok(())
        });
        match res {
            // Paused by a trie update. The pending `write_txn` is dropped without `commit`,
            // and the next loop iteration resumes from the pivot last committed to disk.
            Err(StoreError::PivotChanged) => {
                match control_rx.recv() {
                    Ok(FKVGeneratorControlMessage::Continue) => {}
                    Ok(FKVGeneratorControlMessage::Stop) => {
                        return Err(StoreError::Custom("Unexpected Stop message".to_string()));
                    }
                    // If the channel was closed, we stop generation prematurely
                    Err(std::sync::mpsc::RecvError) => {
                        info!("Store closed, stopping FlatKeyValue generation.");
                        return Ok(());
                    }
                }
            }
            Err(err) => return Err(err),
            // Whole trie walked: persist the completion marker and publish it.
            Ok(()) => {
                write_txn.put(MISC_VALUES, "last_written".as_bytes(), &[0xff])?;
                write_txn.commit()?;
                *last_computed_fkv
                    .write()
                    .map_err(|_| StoreError::LockError)? = vec![0xff; 131];
                info!("FlatKeyValue generation finished.");
                return Ok(());
            }
        };
    }
}
3437
3438fn fkv_check_for_stop_msg(
3439    control_rx: &std::sync::mpsc::Receiver<FKVGeneratorControlMessage>,
3440) -> Result<(), StoreError> {
3441    match control_rx.try_recv() {
3442        Ok(FKVGeneratorControlMessage::Stop) | Err(TryRecvError::Disconnected) => {
3443            return Err(StoreError::PivotChanged);
3444        }
3445        Ok(FKVGeneratorControlMessage::Continue) => {
3446            return Err(StoreError::Custom(
3447                "Unexpected Continue message".to_string(),
3448            ));
3449        }
3450        Err(TryRecvError::Empty) => {}
3451    }
3452    Ok(())
3453}
3454
3455fn state_trie_locked_backend(
3456    backend: &dyn StorageBackend,
3457    last_written: Vec<u8>,
3458) -> Result<BackendTrieDBLocked, StoreError> {
3459    // No address prefix for state trie
3460    BackendTrieDBLocked::new(backend, last_written)
3461}
3462
/// Proof data for one account, together with proofs for some of its storage slots.
pub struct AccountProof {
    /// Proof nodes for the account.
    pub proof: Vec<NodeRLP>,
    /// The account's state.
    pub account: AccountState,
    /// Proofs for individual storage slots of the account.
    pub storage_proof: Vec<StorageSlotProof>,
}
3468
/// Proof data for a single storage slot.
pub struct StorageSlotProof {
    /// Proof nodes for the slot.
    pub proof: Vec<NodeRLP>,
    /// Slot key the proof refers to (whether raw or hashed is not established here).
    pub key: H256,
    /// Value stored in the slot.
    pub value: U256,
}
3474
/// Iterator returned by `Store::ancestors`. It walks block headers from a starting hash
/// through successive `parent_hash` links.
pub struct AncestorIterator {
    /// Store the headers are loaded from.
    store: Store,
    /// Hash of the next header to yield.
    next_hash: BlockHash,
}
3479
3480impl Iterator for AncestorIterator {
3481    type Item = Result<(BlockHash, BlockHeader), StoreError>;
3482
3483    fn next(&mut self) -> Option<Self::Item> {
3484        let next_hash = self.next_hash;
3485        match self.store.load_block_header_by_hash(next_hash) {
3486            Ok(Some(header)) => {
3487                let ret_hash = self.next_hash;
3488                self.next_hash = header.parent_hash;
3489                Some(Ok((ret_hash, header)))
3490            }
3491            Ok(None) => None,
3492            Err(e) => Some(Err(e)),
3493        }
3494    }
3495}
3496
3497pub fn hash_address(address: &Address) -> Vec<u8> {
3498    keccak_hash(address.to_fixed_bytes()).to_vec()
3499}
3500
3501fn hash_address_fixed(address: &Address) -> H256 {
3502    keccak(address.to_fixed_bytes())
3503}
3504
3505pub fn hash_key(key: &H256) -> Vec<u8> {
3506    keccak_hash(key.to_fixed_bytes()).to_vec()
3507}
3508
3509pub fn hash_key_fixed(key: &H256) -> [u8; 32] {
3510    keccak_hash(key.to_fixed_bytes())
3511}
3512
3513fn chain_data_key(index: ChainDataIndex) -> Vec<u8> {
3514    (index as u8).encode_to_vec()
3515}
3516
3517fn snap_state_key(index: SnapStateIndex) -> Vec<u8> {
3518    (index as u8).encode_to_vec()
3519}
3520
3521/// Builds a fixed-width RECEIPTS key: block_hash (32B) || index (8B BE).
3522pub fn receipt_key(block_hash: &BlockHash, index: u64) -> Vec<u8> {
3523    let mut key = Vec::with_capacity(40);
3524    key.extend_from_slice(block_hash.as_bytes());
3525    key.extend_from_slice(&index.to_be_bytes());
3526    key
3527}
3528
3529fn encode_code(code: &Code) -> Vec<u8> {
3530    let mut buf =
3531        Vec::with_capacity(6 + code.len() + std::mem::size_of_val::<[u32]>(&code.jump_targets));
3532    code.code().encode(&mut buf);
3533    // `Arc<[u32]>` (the in-memory share) has no `RLPEncode` impl; encode through an
3534    // owned `Vec` on this cold DB-write path (code is persisted once per hash).
3535    code.jump_targets.to_vec().encode(&mut buf);
3536    buf
3537}
3538
3539#[derive(Debug, Default, Clone)]
3540struct LatestBlockHeaderCache {
3541    current: Arc<Mutex<Arc<BlockHeader>>>,
3542}
3543
3544impl LatestBlockHeaderCache {
3545    pub fn get(&self) -> Arc<BlockHeader> {
3546        self.current.lock().expect("poisoned mutex").clone()
3547    }
3548
3549    pub fn update(&self, header: BlockHeader) {
3550        let new = Arc::new(header);
3551        *self.current.lock().expect("poisoned mutex") = new;
3552    }
3553}
3554
/// Contents of the store's metadata file (`STORE_METADATA_FILENAME`), stored as JSON.
#[derive(Debug, Serialize, Deserialize)]
pub struct StoreMetadata {
    /// Schema version of the on-disk database.
    pub schema_version: u64,
}
3559
3560impl StoreMetadata {
3561    pub fn new(schema_version: u64) -> Self {
3562        Self { schema_version }
3563    }
3564}
3565
3566/// Reads the schema version from the metadata file, if it exists.
3567///
3568/// Returns `Some(version)` when metadata.json is present and valid,
3569/// or `None` when the file does not exist.
3570fn read_store_schema_version(path: &Path) -> Result<Option<u64>, StoreError> {
3571    let metadata_path = path.join(STORE_METADATA_FILENAME);
3572    if !metadata_path.exists() {
3573        return Ok(None);
3574    }
3575    if !metadata_path.is_file() {
3576        return Err(StoreError::Custom(
3577            "store schema path exists but is not a file".to_string(),
3578        ));
3579    }
3580    let file_contents = std::fs::read_to_string(metadata_path)?;
3581    let metadata: StoreMetadata = serde_json::from_str(&file_contents)?;
3582    Ok(Some(metadata.schema_version))
3583}
3584
3585fn init_metadata_file(parent_path: &Path) -> Result<(), StoreError> {
3586    std::fs::create_dir_all(parent_path)?;
3587
3588    let metadata_path = parent_path.join(STORE_METADATA_FILENAME);
3589    let metadata = StoreMetadata::new(STORE_SCHEMA_VERSION);
3590    let serialized_metadata = serde_json::to_string_pretty(&metadata)?;
3591    let mut new_file = std::fs::File::create_new(metadata_path)?;
3592    new_file.write_all(serialized_metadata.as_bytes())?;
3593    Ok(())
3594}
3595
3596/// Returns `true` if `path` contains a *legacy* database — one written before
3597/// the metadata file existed, so it has no `metadata.json` to identify it.
3598/// Detected by RocksDB's own marker files, as opposed to unrelated files that
3599/// merely share the datadir. Only meaningful once metadata has been confirmed
3600/// absent; otherwise prefer `has_valid_db`, which keys off the metadata file.
3601///
3602/// Previously the caller treated *any* non-empty directory as such a legacy
3603/// database, which made startup fail when unrelated files lived alongside the DB
3604/// — e.g. EthDocker writes the JWT secret into the datadir (issue #5680). We
3605/// instead look for RocksDB's marker files, so a datadir that only contains such
3606/// unrelated files is correctly treated as fresh.
3607fn dir_contains_legacy_db(path: &Path) -> Result<bool, StoreError> {
3608    // `CURRENT` has a fixed name and is written by every RocksDB instance, so
3609    // check for it directly instead of scanning a datadir that may hold many
3610    // unrelated files.
3611    if path.join("CURRENT").is_file() {
3612        return Ok(true);
3613    }
3614    // The manifest has a numeric suffix (`MANIFEST-<n>`), so it can only be
3615    // found by scanning. Restrict to plain files: a directory that happens to
3616    // share the name is not a database marker.
3617    for entry in std::fs::read_dir(path)? {
3618        let entry = entry?;
3619        if !entry.file_type()?.is_file() {
3620            continue;
3621        }
3622        if entry.file_name().to_string_lossy().starts_with("MANIFEST-") {
3623            return Ok(true);
3624        }
3625    }
3626    Ok(false)
3627}
3628
3629/// Checks whether a valid (or migratable) database exists at the given path
3630/// by looking for a metadata.json file with a schema version between 1 and
3631/// `STORE_SCHEMA_VERSION` (inclusive).
3632pub fn has_valid_db(path: &Path) -> bool {
3633    let metadata_path = path.join(STORE_METADATA_FILENAME);
3634    if !metadata_path.is_file() {
3635        return false;
3636    }
3637    let Ok(contents) = std::fs::read_to_string(&metadata_path) else {
3638        return false;
3639    };
3640    let Ok(metadata) = serde_json::from_str::<StoreMetadata>(&contents) else {
3641        return false;
3642    };
3643    metadata.schema_version >= 1 && metadata.schema_version <= STORE_SCHEMA_VERSION
3644}
3645
3646/// Reads the chain ID from an existing database without performing a full
3647/// store initialization. Returns `None` if the database doesn't exist or
3648/// the chain config can't be read. Always returns `None` when compiled
3649/// without the `rocksdb` feature.
3650///
3651/// Each failure mode logs a warning so callers (and operators) can diagnose
3652/// why an existing database was not usable — previously every error was
3653/// silently swallowed by `.ok()?`.
3654pub fn read_chain_id_from_db(path: &Path) -> Option<u64> {
3655    if !has_valid_db(path) {
3656        return None;
3657    }
3658    #[cfg(feature = "rocksdb")]
3659    {
3660        // The cache size is irrelevant for this one-shot chain-id read (the LRU
3661        // is sized as a ceiling, not pre-allocated), so we use the default.
3662        let backend = match RocksDBBackend::open(path, DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES) {
3663            Ok(backend) => backend,
3664            Err(e) => {
3665                warn!("Failed to open RocksDB at {path:?} to read chain ID: {e}");
3666                return None;
3667            }
3668        };
3669        let read = match backend.begin_read() {
3670            Ok(read) => read,
3671            Err(e) => {
3672                warn!("Failed to begin read transaction at {path:?}: {e}");
3673                return None;
3674            }
3675        };
3676        let key = chain_data_key(ChainDataIndex::ChainConfig);
3677        let bytes = match read.get(CHAIN_DATA, &key) {
3678            Ok(Some(bytes)) => bytes,
3679            Ok(None) => {
3680                warn!("Chain config entry not found in database at {path:?}");
3681                return None;
3682            }
3683            Err(e) => {
3684                warn!("Failed to read chain config from database at {path:?}: {e}");
3685                return None;
3686            }
3687        };
3688        // Only extract chain_id here: the stored `ChainConfig` JSON may include
3689        // fields whose serialization changed across releases (e.g. pre-v10 wrote
3690        // `terminal_total_difficulty` as a plain number, v10 expects hex string).
3691        // Deserializing the full struct would reject otherwise-migratable v9 data.
3692        #[derive(serde::Deserialize)]
3693        #[serde(rename_all = "camelCase")]
3694        struct ChainIdOnly {
3695            chain_id: u64,
3696        }
3697        match serde_json::from_slice::<ChainIdOnly>(&bytes) {
3698            Ok(partial) => Some(partial.chain_id),
3699            Err(e) => {
3700                warn!("Failed to deserialize chain ID from database at {path:?}: {e}");
3701                None
3702            }
3703        }
3704    }
3705    #[cfg(not(feature = "rocksdb"))]
3706    {
3707        let _ = path;
3708        None
3709    }
3710}
3711
// Unit tests for `tx_locations_merge`, the merge function for transaction-location values.
// Operands and stored values share one encoding (`Vec<(BlockNumber, BlockHash, Index)>`).
// That shared encoding is what lets a partial merge result be fed back in as an operand.
#[cfg(test)]
mod merge_tests {
    use super::*;

    // Small distinct hashes for test data.
    fn h256(b: u8) -> H256 {
        H256::from_low_u64_be(b as u64)
    }

    // Encodes a single merge operand.
    fn op(bn: BlockNumber, bh: H256, idx: Index) -> Vec<u8> {
        encode_tx_location_operand(bn, bh, idx)
    }

    // Decodes a merged value; panics on malformed input, which fails the test.
    fn decode(v: &[u8]) -> Vec<(BlockNumber, BlockHash, Index)> {
        <Vec<(BlockNumber, BlockHash, Index)>>::decode(v).unwrap()
    }

    #[test]
    fn single_operand_on_empty_base() {
        let out = tx_locations_merge(None, vec![op(100, h256(0x10), 0)]).unwrap();
        assert_eq!(decode(&out), vec![(100, h256(0x10), 0)]);
    }

    #[test]
    fn operand_appended_to_existing_base() {
        let base = vec![(100u64, h256(0x10), 0u64)].encode_to_vec();
        let out = tx_locations_merge(Some(&base), vec![op(101, h256(0x11), 5)]).unwrap();
        let mut got = decode(&out);
        got.sort();
        let mut want = vec![(100, h256(0x10), 0), (101, h256(0x11), 5)];
        want.sort();
        assert_eq!(got, want);
    }

    #[test]
    fn multiple_operands_combined() {
        let out = tx_locations_merge(
            None,
            vec![
                op(100, h256(0x10), 0),
                op(100, h256(0x11), 1),
                op(101, h256(0x12), 2),
            ],
        )
        .unwrap();
        assert_eq!(decode(&out).len(), 3);
    }

    #[test]
    fn same_block_hash_is_deduped() {
        // Two operands with the same block_hash: the later one replaces the earlier.
        let out =
            tx_locations_merge(None, vec![op(100, h256(0x10), 0), op(100, h256(0x10), 7)]).unwrap();
        assert_eq!(decode(&out), vec![(100, h256(0x10), 7)]);
    }

    #[test]
    fn malformed_operand_aborts_merge() {
        // Fail loud: a malformed operand must abort the merge (return None), not
        // silently drop it and commit a partial result.
        let out = tx_locations_merge(None, vec![vec![0xff, 0xff], op(100, h256(0x10), 0)]);
        assert!(out.is_none(), "merge must abort on a malformed operand");
    }

    #[test]
    fn malformed_base_value_aborts_merge() {
        let out = tx_locations_merge(Some(&[0xff, 0xff]), vec![op(100, h256(0x10), 0)]);
        assert!(out.is_none(), "merge must abort on a corrupt base value");
    }

    /// Regression for the associative-merge format bug: a PartialMerge result
    /// must be re-mergeable as an operand. RocksDB folds operands together
    /// without a base value during compaction, then feeds that result back into
    /// a later merge. If the operand format differed from the output format,
    /// the re-fed result would fail to decode and entries would be dropped
    /// (observed as 1664 silent drops during a compaction pass on mainnet).
    #[test]
    fn partial_merge_result_is_a_valid_operand() {
        // Step 1: PartialMerge — combine operands with NO base value.
        let partial =
            tx_locations_merge(None, vec![op(100, h256(0x10), 0), op(101, h256(0x11), 1)]).unwrap();

        // Step 2: the partial result is now itself an operand in a later merge,
        // on top of an existing base value. This is the path that used to drop
        // entries.
        let base = vec![(99u64, h256(0x09), 9u64)].encode_to_vec();
        let out = tx_locations_merge(Some(&base), vec![partial]).unwrap();

        let mut got = decode(&out);
        got.sort();
        let mut want = vec![
            (99, h256(0x09), 9),
            (100, h256(0x10), 0),
            (101, h256(0x11), 1),
        ];
        want.sort();
        assert_eq!(
            got, want,
            "no entries may be lost when re-merging a partial result"
        );
    }

    /// Operand and stored-value encodings must be byte-identical types, so a
    /// freshly-encoded operand round-trips through the value decoder.
    #[test]
    fn operand_encoding_matches_value_encoding() {
        let operand = op(100, h256(0x10), 3);
        // Decoding the operand as the stored Vec type must succeed.
        assert_eq!(decode(&operand), vec![(100, h256(0x10), 3)]);
    }

    /// Chained PartialMerges (operand-only folds applied repeatedly) stay valid.
    #[test]
    fn chained_partial_merges() {
        let p1 = tx_locations_merge(None, vec![op(1, h256(0x01), 0)]).unwrap();
        let p2 = tx_locations_merge(None, vec![p1, op(2, h256(0x02), 0)]).unwrap();
        let p3 = tx_locations_merge(None, vec![p2, op(3, h256(0x03), 0)]).unwrap();
        let out = tx_locations_merge(None, vec![p3]).unwrap();
        assert_eq!(decode(&out).len(), 3);
    }
}
3832
#[cfg(test)]
mod datadir_tests {
    use super::*;
    use std::fs;

    /// Creates a fresh, empty temporary directory for one test case.
    fn fresh_dir() -> tempfile::TempDir {
        tempfile::tempdir().unwrap()
    }

    #[test]
    fn empty_dir_has_no_existing_db() {
        let tmp = fresh_dir();
        let detected = dir_contains_legacy_db(tmp.path()).unwrap();
        assert!(!detected);
    }

    #[test]
    fn dir_with_only_unrelated_files_has_no_existing_db() {
        // Regression for #5680: a JWT secret (or any other unrelated file)
        // sitting in the datadir must not be mistaken for an existing database.
        let tmp = fresh_dir();
        let unrelated = [("jwt.hex", "0xdeadbeef"), ("LOG", "noise")];
        for (name, contents) in unrelated {
            fs::write(tmp.path().join(name), contents).unwrap();
        }
        let detected = dir_contains_legacy_db(tmp.path()).unwrap();
        assert!(!detected);
    }

    #[test]
    fn dir_with_rocksdb_markers_has_existing_db() {
        // Either a `CURRENT` file or a `MANIFEST-*` file, each on its own,
        // marks a real DB. Every marker is checked in its own directory.
        let markers = [("CURRENT", "MANIFEST-000001\n"), ("MANIFEST-000007", "x")];
        for (marker, contents) in markers {
            let tmp = fresh_dir();
            fs::write(tmp.path().join(marker), contents).unwrap();
            let detected = dir_contains_legacy_db(tmp.path()).unwrap();
            assert!(detected);
        }
    }

    #[test]
    fn dir_with_marker_named_subdirectories_has_no_existing_db() {
        // RocksDB only ever writes these markers as plain files, so a
        // *directory* that merely carries a marker's name must not count.
        let tmp = fresh_dir();
        for marker in ["CURRENT", "MANIFEST-000001"] {
            fs::create_dir(tmp.path().join(marker)).unwrap();
        }
        let detected = dir_contains_legacy_db(tmp.path()).unwrap();
        assert!(!detected);
    }
}