ethrex_storage/layering.rs
1use ethrex_common::H256;
2use fastbloom::AtomicBloomFilter;
3use rayon::prelude::*;
4use rustc_hash::{FxBuildHasher, FxHashMap};
5use std::{fmt, sync::Arc};
6
7use ethrex_trie::{Nibbles, TrieDB, TrieError};
8
/// Baseline number of expected items used to size the layer-cache bloom filter.
///
/// The filter is never sized for fewer items than this, even when the cache holds fewer keys.
const BLOOM_SIZE: usize = 1_000_000;
/// False-positive probability requested when constructing the layer-cache bloom filter.
const FALSE_POSITIVE_RATE: f64 = 0.02;
11
12#[derive(Debug, Clone)]
13struct TrieLayer {
14 nodes: FxHashMap<Vec<u8>, Vec<u8>>,
15 parent: H256,
16 id: usize,
17}
18
19/// In-memory cache of trie diff-layers, one per block (or per batch of blocks in full sync).
20///
21/// Layers form a singly-linked chain from newest to oldest via the `parent` field:
22///
23/// ```text
24/// newest_root -> parent_1 -> parent_2 -> ... -> oldest_root -> (on-disk state)
25/// ```
26///
27/// Each layer stores the trie node diffs produced by one block (regular sync) or one batch
28/// of ~1024 blocks (full sync). When the chain reaches `commit_threshold` layers,
29/// [`get_commitable`](Self::get_commitable) identifies the layer to flush, and
30/// [`commit`](Self::commit) removes it (plus all ancestors) and returns the merged key-values
31/// for writing to RocksDB.
32///
33/// Two commit thresholds are used in practice:
34/// - **128** — regular block-by-block execution (one layer ≈ one block's trie diff).
35/// - **4** — full sync / batch mode (one layer ≈ 1024 blocks ≈ 1 GB), configured via
36/// `BATCH_COMMIT_THRESHOLD` in `store.rs`.
37///
38/// A global bloom filter is maintained across all layers to short-circuit lookups for keys
39/// that don't exist in any layer, avoiding a full layer-chain walk.
40#[derive(Clone)]
41pub struct TrieLayerCache {
42 /// Monotonically increasing ID for layers, starting at 1.
43 /// TODO: this implementation panics on overflow
44 last_id: usize,
45 /// Number of layers after which we should commit to the database.
46 commit_threshold: usize,
47 layers: FxHashMap<H256, Arc<TrieLayer>>,
48 /// Global bloom filter that tracks all keys across all layers.
49 ///
50 /// Used to avoid looking up all layers when the given path doesn't exist in any
51 /// layer, thus going directly to the database.
52 bloom: AtomicBloomFilter<FxBuildHasher>,
53}
54
55impl fmt::Debug for TrieLayerCache {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 f.debug_struct("TrieLayerCache")
58 .field("last_id", &self.last_id)
59 .field("commit_threshold", &self.commit_threshold)
60 .field("layers", &self.layers)
61 .field("bloom", &"AtomicBloomFilter")
62 .finish()
63 }
64}
65
66impl Default for TrieLayerCache {
67 fn default() -> Self {
68 Self {
69 bloom: Self::create_filter(BLOOM_SIZE),
70 last_id: 0,
71 layers: Default::default(),
72 // TODO (issue #6345): this is coupled with DB_COMMIT_THRESHOLD in store.rs — unify them.
73 commit_threshold: 128,
74 }
75 }
76}
77
78impl TrieLayerCache {
79 /// Creates a new cache with the given commit threshold.
80 ///
81 /// The threshold controls how many layers accumulate before a disk flush is triggered.
82 pub fn new(commit_threshold: usize) -> Self {
83 Self {
84 bloom: Self::create_filter(BLOOM_SIZE),
85 last_id: 0,
86 layers: Default::default(),
87 commit_threshold,
88 }
89 }
90
91 fn create_filter(expected_items: usize) -> AtomicBloomFilter<FxBuildHasher> {
92 AtomicBloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
93 .hasher(FxBuildHasher)
94 .expected_items(expected_items.max(BLOOM_SIZE))
95 }
96
97 /// Looks up a trie node `key` starting from the layer identified by `state_root`,
98 /// walking the parent chain toward older layers.
99 ///
100 /// Returns `Some(value)` from the first (newest) layer that contains the key, or `None`
101 /// if no layer has it. A bloom filter is checked first to skip the walk entirely when the
102 /// key is guaranteed absent from all layers (callers then fall through to the on-disk trie).
103 pub fn get(&self, state_root: H256, key: &[u8]) -> Option<Vec<u8>> {
104 // Fast check to know if any layer may contain the given key.
105 // We can only be certain it doesn't exist, but if it returns true it may or may not exist (false positive).
106 if !self.bloom.contains(key) {
107 // TrieWrapper goes to db when returning None.
108 return None;
109 }
110
111 let mut current_state_root = state_root;
112
113 while let Some(layer) = self.layers.get(¤t_state_root) {
114 if let Some(value) = layer.nodes.get(key) {
115 return Some(value.clone());
116 }
117 current_state_root = layer.parent;
118 if current_state_root == state_root {
119 // TODO: check if this is possible in practice
120 // This can't happen in L1, due to system contracts irreversibly modifying state
121 // at each block.
122 // On L2, if no transactions are included in a block, the state root remains the same,
123 // but we handle that case in put_batch. It may happen, however, if someone modifies
124 // state with a privileged tx and later reverts it (since it doesn't update nonce).
125 panic!("State cycle found");
126 }
127 }
128 None
129 }
130
131 /// Returns the state root from which to start a disk commit, using the cache's
132 /// default `commit_threshold`.
133 ///
134 /// Used during regular block-by-block execution (threshold = 128).
135 /// See [`get_commitable_with_threshold`](Self::get_commitable_with_threshold) for details.
136 // TODO: use finalized hash to know when to commit
137 pub fn get_commitable(&self, state_root: H256) -> Option<H256> {
138 self.get_commitable_with_threshold(state_root, self.commit_threshold)
139 }
140
141 /// Walks the layer chain starting from `state_root` toward older ancestors, counting
142 /// layers. When the count reaches `threshold`, returns the state root of that ancestor layer.
143 ///
144 /// Returns `None` if the chain has fewer than `threshold` layers (nothing to commit yet).
145 ///
146 /// This function is used to determine when to trigger a disk commit. We consider a layer "committable"
147 /// when it has at least `threshold` newer layers on top of it, ensuring that we only commit sufficiently
148 /// old layers and keep recent ones in memory for fast access.
149 ///
150 /// Having a threshold allows both customizing the commit frequency (e.g. full sync vs regular block execution)
151 /// and avoiding edge cases where there could, theoretically, be a cycle in the layer change.
152 pub(crate) fn get_commitable_with_threshold(
153 &self,
154 mut state_root: H256,
155 threshold: usize,
156 ) -> Option<H256> {
157 let mut counter = 0;
158 while let Some(layer) = self.layers.get(&state_root) {
159 counter += 1;
160 if counter >= threshold {
161 return Some(state_root);
162 }
163 state_root = layer.parent;
164 }
165 None
166 }
167
168 /// Inserts a new diff-layer into the cache, keyed by `state_root` and pointing to `parent`.
169 ///
170 /// In regular sync each call adds one block's trie diffs. In full sync (batch mode), each
171 /// call adds diffs for an entire batch of ~1024 blocks.
172 ///
173 /// No-ops if `parent == state_root` (empty block with no state change), or if `state_root`
174 /// is already present (duplicate insertion guard).
175 pub fn put_batch(
176 &mut self,
177 parent: H256,
178 state_root: H256,
179 key_values: Vec<(Nibbles, Vec<u8>)>,
180 ) {
181 if parent == state_root && key_values.is_empty() {
182 return;
183 } else if parent == state_root {
184 // L1 always changes the state root (system contracts run even on empty blocks), so
185 // this should not happen there. L2 can legitimately keep the same root on empty blocks
186 // because it has no system contract calls.
187 tracing::trace!("parent == state_root but key_values not empty");
188 return;
189 }
190 if self.layers.contains_key(&state_root) {
191 tracing::warn!("tried to insert a state_root that's already inserted");
192 return;
193 }
194
195 // Add keys to the global bloom filter
196 for (p, _) in &key_values {
197 self.bloom.insert(p.as_ref());
198 }
199
200 let nodes: FxHashMap<Vec<u8>, Vec<u8>> = key_values
201 .into_iter()
202 .map(|(path, value)| (path.into_vec(), value))
203 .collect();
204
205 self.last_id += 1;
206 let entry = TrieLayer {
207 nodes,
208 parent,
209 id: self.last_id,
210 };
211 self.layers.insert(state_root, Arc::new(entry));
212 }
213
214 /// Rebuilds the global bloom filter from scratch using all keys across all remaining layers.
215 ///
216 /// Called after [`commit`](Self::commit) removes layers, since the old filter may contain
217 /// keys from the removed layers (producing unnecessary false positives).
218 pub fn rebuild_bloom(&mut self) {
219 // Pre-compute total keys for optimal filter sizing
220 let total_keys: usize = self.layers.values().map(|layer| layer.nodes.len()).sum();
221
222 let filter = Self::create_filter(total_keys.max(BLOOM_SIZE));
223
224 // Parallel insertion - AtomicBloomFilter allows concurrent insert via &self
225 self.layers.par_iter().for_each(|(_, layer)| {
226 for path in layer.nodes.keys() {
227 filter.insert(path);
228 }
229 });
230
231 self.bloom = filter;
232 }
233
234 /// Removes the layer at `state_root` and all its ancestors from the cache, returning
235 /// their merged trie node diffs in oldest-first order (suitable for sequential disk write).
236 ///
237 /// `state_root` must be a key in `self.layers` (as returned by
238 /// [`get_commitable`](Self::get_commitable) /
239 /// [`get_commitable_with_threshold`](Self::get_commitable_with_threshold)).
240 /// If it isn't, the walk exits immediately and returns `None`.
241 ///
242 /// After removal, any orphaned layers (older than the committed ones) are pruned, and
243 /// the bloom filter is rebuilt to remove stale entries.
244 pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
245 let mut layers_to_commit = vec![];
246 let mut current_state_root = state_root;
247 while let Some(layer) = self.layers.remove(¤t_state_root) {
248 let layer = Arc::unwrap_or_clone(layer);
249 current_state_root = layer.parent;
250 layers_to_commit.push(layer);
251 }
252 let top_layer_id = layers_to_commit.first()?.id;
253 // older layers are useless
254 self.layers.retain(|_, item| item.id > top_layer_id);
255 self.rebuild_bloom(); // layers removed, rebuild global bloom filter.
256 let nodes_to_commit = layers_to_commit
257 .into_iter()
258 .rev()
259 .flat_map(|layer| layer.nodes)
260 .collect();
261 Some(nodes_to_commit)
262 }
263}
264
265/// [`TrieDB`] adapter that checks in-memory diff-layers ([`TrieLayerCache`]) first,
266/// falling back to the on-disk trie only for keys not found in any layer.
267///
268/// Used by the EVM during block execution: reads see the latest uncommitted state without
269/// waiting for a disk flush.
270pub struct TrieWrapper {
271 pub state_root: H256,
272 pub inner: Arc<TrieLayerCache>,
273 pub db: Box<dyn TrieDB>,
274 /// Pre-computed prefix nibbles for storage tries.
275 /// For state tries this is None; for storage tries this is
276 /// `Nibbles::from_bytes(address.as_bytes()).append_new(17)`.
277 prefix_nibbles: Option<Nibbles>,
278}
279
280impl TrieWrapper {
281 pub fn new(
282 state_root: H256,
283 inner: Arc<TrieLayerCache>,
284 db: Box<dyn TrieDB>,
285 prefix: Option<H256>,
286 ) -> Self {
287 let prefix_nibbles = prefix.map(|p| Nibbles::from_bytes(p.as_bytes()).append_new(17));
288 Self {
289 state_root,
290 inner,
291 db,
292 prefix_nibbles,
293 }
294 }
295}
296
297/// Prepends an account address prefix (with an invalid nibble `17` as separator) to a
298/// trie path, distinguishing storage trie entries from state trie entries in the flat
299/// key-value namespace. Returns the path unchanged if `prefix` is `None` (state trie).
300pub fn apply_prefix(prefix: Option<H256>, path: Nibbles) -> Nibbles {
301 match prefix {
302 Some(prefix) => Nibbles::from_bytes(prefix.as_bytes())
303 .append_new(17)
304 .concat(&path),
305 None => path,
306 }
307}
308
309impl TrieDB for TrieWrapper {
310 fn flatkeyvalue_computed(&self, key: Nibbles) -> bool {
311 // NOTE: we apply the prefix here, since the underlying TrieDB should
312 // always be for the state trie.
313 let key = match &self.prefix_nibbles {
314 Some(prefix) => prefix.concat(&key),
315 None => key,
316 };
317 self.db.flatkeyvalue_computed(key)
318 }
319
320 fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
321 let key = match &self.prefix_nibbles {
322 Some(prefix) => prefix.concat(&key),
323 None => key,
324 };
325 if let Some(value) = self.inner.get(self.state_root, key.as_ref()) {
326 return Ok(Some(value));
327 }
328 self.db.get(key)
329 }
330
331 fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
332 // TODO: Get rid of this.
333 unimplemented!("This function should not be called");
334 }
335}