Skip to main content

ethrex_storage/
layering.rs

1use ethrex_common::H256;
2use fastbloom::AtomicBloomFilter;
3use rayon::prelude::*;
4use rustc_hash::{FxBuildHasher, FxHashMap};
5use std::{fmt, sync::Arc};
6
7use ethrex_trie::{Nibbles, TrieDB, TrieError};
8
/// Lower bound on the number of keys the global bloom filter is sized for;
/// `TrieLayerCache::create_filter` never sizes a filter below this.
const BLOOM_SIZE: usize = 1_000_000;
/// Target false-positive probability handed to `AtomicBloomFilter::with_false_pos`.
const FALSE_POSITIVE_RATE: f64 = 2e-2;
11
12#[derive(Debug, Clone)]
13struct TrieLayer {
14    nodes: FxHashMap<Vec<u8>, Vec<u8>>,
15    parent: H256,
16    id: usize,
17}
18
19/// In-memory cache of trie diff-layers, one per block (or per batch of blocks in full sync).
20///
21/// Layers form a singly-linked chain from newest to oldest via the `parent` field:
22///
23/// ```text
24/// newest_root -> parent_1 -> parent_2 -> ... -> oldest_root -> (on-disk state)
25/// ```
26///
27/// Each layer stores the trie node diffs produced by one block (regular sync) or one batch
28/// of ~1024 blocks (full sync). When the chain reaches `commit_threshold` layers,
29/// [`get_commitable`](Self::get_commitable) identifies the layer to flush, and
30/// [`commit`](Self::commit) removes it (plus all ancestors) and returns the merged key-values
31/// for writing to RocksDB.
32///
33/// Two commit thresholds are used in practice:
34/// - **128** — regular block-by-block execution (one layer ≈ one block's trie diff).
35/// - **4** — full sync / batch mode (one layer ≈ 1024 blocks ≈ 1 GB), configured via
36///   `BATCH_COMMIT_THRESHOLD` in `store.rs`.
37///
38/// A global bloom filter is maintained across all layers to short-circuit lookups for keys
39/// that don't exist in any layer, avoiding a full layer-chain walk.
40#[derive(Clone)]
41pub struct TrieLayerCache {
42    /// Monotonically increasing ID for layers, starting at 1.
43    /// TODO: this implementation panics on overflow
44    last_id: usize,
45    /// Number of layers after which we should commit to the database.
46    commit_threshold: usize,
47    layers: FxHashMap<H256, Arc<TrieLayer>>,
48    /// Global bloom filter that tracks all keys across all layers.
49    ///
50    /// Used to avoid looking up all layers when the given path doesn't exist in any
51    /// layer, thus going directly to the database.
52    bloom: AtomicBloomFilter<FxBuildHasher>,
53}
54
55impl fmt::Debug for TrieLayerCache {
56    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57        f.debug_struct("TrieLayerCache")
58            .field("last_id", &self.last_id)
59            .field("commit_threshold", &self.commit_threshold)
60            .field("layers", &self.layers)
61            .field("bloom", &"AtomicBloomFilter")
62            .finish()
63    }
64}
65
66impl Default for TrieLayerCache {
67    fn default() -> Self {
68        Self {
69            bloom: Self::create_filter(BLOOM_SIZE),
70            last_id: 0,
71            layers: Default::default(),
72            // TODO (issue #6345): this is coupled with DB_COMMIT_THRESHOLD in store.rs — unify them.
73            commit_threshold: 128,
74        }
75    }
76}
77
78impl TrieLayerCache {
79    /// Creates a new cache with the given commit threshold.
80    ///
81    /// The threshold controls how many layers accumulate before a disk flush is triggered.
82    pub fn new(commit_threshold: usize) -> Self {
83        Self {
84            bloom: Self::create_filter(BLOOM_SIZE),
85            last_id: 0,
86            layers: Default::default(),
87            commit_threshold,
88        }
89    }
90
91    fn create_filter(expected_items: usize) -> AtomicBloomFilter<FxBuildHasher> {
92        AtomicBloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
93            .hasher(FxBuildHasher)
94            .expected_items(expected_items.max(BLOOM_SIZE))
95    }
96
97    /// Looks up a trie node `key` starting from the layer identified by `state_root`,
98    /// walking the parent chain toward older layers.
99    ///
100    /// Returns `Some(value)` from the first (newest) layer that contains the key, or `None`
101    /// if no layer has it. A bloom filter is checked first to skip the walk entirely when the
102    /// key is guaranteed absent from all layers (callers then fall through to the on-disk trie).
103    pub fn get(&self, state_root: H256, key: &[u8]) -> Option<Vec<u8>> {
104        // Fast check to know if any layer may contain the given key.
105        // We can only be certain it doesn't exist, but if it returns true it may or may not exist (false positive).
106        if !self.bloom.contains(key) {
107            // TrieWrapper goes to db when returning None.
108            return None;
109        }
110
111        let mut current_state_root = state_root;
112
113        while let Some(layer) = self.layers.get(&current_state_root) {
114            if let Some(value) = layer.nodes.get(key) {
115                return Some(value.clone());
116            }
117            current_state_root = layer.parent;
118            if current_state_root == state_root {
119                // TODO: check if this is possible in practice
120                // This can't happen in L1, due to system contracts irreversibly modifying state
121                // at each block.
122                // On L2, if no transactions are included in a block, the state root remains the same,
123                // but we handle that case in put_batch. It may happen, however, if someone modifies
124                // state with a privileged tx and later reverts it (since it doesn't update nonce).
125                panic!("State cycle found");
126            }
127        }
128        None
129    }
130
131    /// Returns the state root from which to start a disk commit, using the cache's
132    /// default `commit_threshold`.
133    ///
134    /// Used during regular block-by-block execution (threshold = 128).
135    /// See [`get_commitable_with_threshold`](Self::get_commitable_with_threshold) for details.
136    // TODO: use finalized hash to know when to commit
137    pub fn get_commitable(&self, state_root: H256) -> Option<H256> {
138        self.get_commitable_with_threshold(state_root, self.commit_threshold)
139    }
140
141    /// Walks the layer chain starting from `state_root` toward older ancestors, counting
142    /// layers. When the count reaches `threshold`, returns the state root of that ancestor layer.
143    ///
144    /// Returns `None` if the chain has fewer than `threshold` layers (nothing to commit yet).
145    ///
146    /// This function is used to determine when to trigger a disk commit. We consider a layer "committable"
147    /// when it has at least `threshold` newer layers on top of it, ensuring that we only commit sufficiently
148    /// old layers and keep recent ones in memory for fast access.
149    ///
150    /// Having a threshold allows both customizing the commit frequency (e.g. full sync vs regular block execution)
151    /// and avoiding edge cases where there could, theoretically, be a cycle in the layer change.
152    pub(crate) fn get_commitable_with_threshold(
153        &self,
154        mut state_root: H256,
155        threshold: usize,
156    ) -> Option<H256> {
157        let mut counter = 0;
158        while let Some(layer) = self.layers.get(&state_root) {
159            counter += 1;
160            if counter >= threshold {
161                return Some(state_root);
162            }
163            state_root = layer.parent;
164        }
165        None
166    }
167
168    /// Inserts a new diff-layer into the cache, keyed by `state_root` and pointing to `parent`.
169    ///
170    /// In regular sync each call adds one block's trie diffs. In full sync (batch mode), each
171    /// call adds diffs for an entire batch of ~1024 blocks.
172    ///
173    /// No-ops if `parent == state_root` (empty block with no state change), or if `state_root`
174    /// is already present (duplicate insertion guard).
175    pub fn put_batch(
176        &mut self,
177        parent: H256,
178        state_root: H256,
179        key_values: Vec<(Nibbles, Vec<u8>)>,
180    ) {
181        if parent == state_root && key_values.is_empty() {
182            return;
183        } else if parent == state_root {
184            // L1 always changes the state root (system contracts run even on empty blocks), so
185            // this should not happen there. L2 can legitimately keep the same root on empty blocks
186            // because it has no system contract calls.
187            tracing::trace!("parent == state_root but key_values not empty");
188            return;
189        }
190        if self.layers.contains_key(&state_root) {
191            tracing::warn!("tried to insert a state_root that's already inserted");
192            return;
193        }
194
195        // Add keys to the global bloom filter
196        for (p, _) in &key_values {
197            self.bloom.insert(p.as_ref());
198        }
199
200        let nodes: FxHashMap<Vec<u8>, Vec<u8>> = key_values
201            .into_iter()
202            .map(|(path, value)| (path.into_vec(), value))
203            .collect();
204
205        self.last_id += 1;
206        let entry = TrieLayer {
207            nodes,
208            parent,
209            id: self.last_id,
210        };
211        self.layers.insert(state_root, Arc::new(entry));
212    }
213
214    /// Rebuilds the global bloom filter from scratch using all keys across all remaining layers.
215    ///
216    /// Called after [`commit`](Self::commit) removes layers, since the old filter may contain
217    /// keys from the removed layers (producing unnecessary false positives).
218    pub fn rebuild_bloom(&mut self) {
219        // Pre-compute total keys for optimal filter sizing
220        let total_keys: usize = self.layers.values().map(|layer| layer.nodes.len()).sum();
221
222        let filter = Self::create_filter(total_keys.max(BLOOM_SIZE));
223
224        // Parallel insertion - AtomicBloomFilter allows concurrent insert via &self
225        self.layers.par_iter().for_each(|(_, layer)| {
226            for path in layer.nodes.keys() {
227                filter.insert(path);
228            }
229        });
230
231        self.bloom = filter;
232    }
233
234    /// Removes the layer at `state_root` and all its ancestors from the cache, returning
235    /// their merged trie node diffs in oldest-first order (suitable for sequential disk write).
236    ///
237    /// `state_root` must be a key in `self.layers` (as returned by
238    /// [`get_commitable`](Self::get_commitable) /
239    /// [`get_commitable_with_threshold`](Self::get_commitable_with_threshold)).
240    /// If it isn't, the walk exits immediately and returns `None`.
241    ///
242    /// After removal, any orphaned layers (older than the committed ones) are pruned, and
243    /// the bloom filter is rebuilt to remove stale entries.
244    pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
245        let mut layers_to_commit = vec![];
246        let mut current_state_root = state_root;
247        while let Some(layer) = self.layers.remove(&current_state_root) {
248            let layer = Arc::unwrap_or_clone(layer);
249            current_state_root = layer.parent;
250            layers_to_commit.push(layer);
251        }
252        let top_layer_id = layers_to_commit.first()?.id;
253        // older layers are useless
254        self.layers.retain(|_, item| item.id > top_layer_id);
255        self.rebuild_bloom(); // layers removed, rebuild global bloom filter.
256        let nodes_to_commit = layers_to_commit
257            .into_iter()
258            .rev()
259            .flat_map(|layer| layer.nodes)
260            .collect();
261        Some(nodes_to_commit)
262    }
263}
264
265/// [`TrieDB`] adapter that checks in-memory diff-layers ([`TrieLayerCache`]) first,
266/// falling back to the on-disk trie only for keys not found in any layer.
267///
268/// Used by the EVM during block execution: reads see the latest uncommitted state without
269/// waiting for a disk flush.
270pub struct TrieWrapper {
271    pub state_root: H256,
272    pub inner: Arc<TrieLayerCache>,
273    pub db: Box<dyn TrieDB>,
274    /// Pre-computed prefix nibbles for storage tries.
275    /// For state tries this is None; for storage tries this is
276    /// `Nibbles::from_bytes(address.as_bytes()).append_new(17)`.
277    prefix_nibbles: Option<Nibbles>,
278}
279
280impl TrieWrapper {
281    pub fn new(
282        state_root: H256,
283        inner: Arc<TrieLayerCache>,
284        db: Box<dyn TrieDB>,
285        prefix: Option<H256>,
286    ) -> Self {
287        let prefix_nibbles = prefix.map(|p| Nibbles::from_bytes(p.as_bytes()).append_new(17));
288        Self {
289            state_root,
290            inner,
291            db,
292            prefix_nibbles,
293        }
294    }
295}
296
297/// Prepends an account address prefix (with an invalid nibble `17` as separator) to a
298/// trie path, distinguishing storage trie entries from state trie entries in the flat
299/// key-value namespace. Returns the path unchanged if `prefix` is `None` (state trie).
300pub fn apply_prefix(prefix: Option<H256>, path: Nibbles) -> Nibbles {
301    match prefix {
302        Some(prefix) => Nibbles::from_bytes(prefix.as_bytes())
303            .append_new(17)
304            .concat(&path),
305        None => path,
306    }
307}
308
309impl TrieDB for TrieWrapper {
310    fn flatkeyvalue_computed(&self, key: Nibbles) -> bool {
311        // NOTE: we apply the prefix here, since the underlying TrieDB should
312        // always be for the state trie.
313        let key = match &self.prefix_nibbles {
314            Some(prefix) => prefix.concat(&key),
315            None => key,
316        };
317        self.db.flatkeyvalue_computed(key)
318    }
319
320    fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
321        let key = match &self.prefix_nibbles {
322            Some(prefix) => prefix.concat(&key),
323            None => key,
324        };
325        if let Some(value) = self.inner.get(self.state_root, key.as_ref()) {
326            return Ok(Some(value));
327        }
328        self.db.get(key)
329    }
330
331    fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
332        // TODO: Get rid of this.
333        unimplemented!("This function should not be called");
334    }
335}