use ethrex_common::H256;
use fastbloom::AtomicBloomFilter;
use rayon::prelude::*;
use rustc_hash::{FxBuildHasher, FxHashMap};
use std::{fmt, sync::Arc};
use ethrex_trie::{Nibbles, TrieDB, TrieError};
/// Item count the bloom filter is sized for by default; `create_filter`
/// never sizes a filter for fewer expected items than this.
const BLOOM_SIZE: usize = 1_000_000;
/// False-positive rate passed to the bloom filter builder.
const FALSE_POSITIVE_RATE: f64 = 0.02;
/// One in-memory layer of trie data, linked to the layer it was built on
/// through `parent`.
#[derive(Debug, Clone)]
struct TrieLayer {
/// Entries stored in this layer, keyed by the path bytes produced by
/// `Nibbles::into_vec` in `TrieLayerCache::put_batch`.
nodes: FxHashMap<Vec<u8>, Vec<u8>>,
/// State root of the layer this one sits on top of.
parent: H256,
/// Insertion-order identifier copied from `TrieLayerCache::last_id`;
/// larger means inserted later.
id: usize,
}
/// In-memory cache of trie layers indexed by state root, plus a bloom filter
/// over the stored keys that lets `get` return early for keys the filter
/// does not contain.
#[derive(Clone)]
pub struct TrieLayerCache {
/// Identifier assigned to the most recently inserted layer; incremented
/// before each insertion.
last_id: usize,
/// Threshold used by `get_commitable` when walking a layer chain.
commit_threshold: usize,
/// Cached layers, keyed by the state root each layer was inserted under.
layers: FxHashMap<H256, Arc<TrieLayer>>,
/// Bloom filter fed with the keys of inserted layers; checked first by `get`.
bloom: AtomicBloomFilter<FxBuildHasher>,
}
/// Hand-written `Debug`: the bloom filter is rendered as a fixed placeholder
/// string rather than its contents.
impl fmt::Debug for TrieLayerCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Destructure so that adding a field to the struct forces this impl
        // to be revisited.
        let Self {
            last_id,
            commit_threshold,
            layers,
            bloom: _,
        } = self;

        let mut out = f.debug_struct("TrieLayerCache");
        out.field("last_id", last_id);
        out.field("commit_threshold", commit_threshold);
        out.field("layers", layers);
        out.field("bloom", &"AtomicBloomFilter");
        out.finish()
    }
}
impl Default for TrieLayerCache {
fn default() -> Self {
Self {
bloom: Self::create_filter(BLOOM_SIZE),
last_id: 0,
layers: Default::default(),
commit_threshold: 128,
}
}
}
impl TrieLayerCache {
    /// Creates an empty cache that uses `commit_threshold` as the threshold
    /// for [`Self::get_commitable`].
    pub fn new(commit_threshold: usize) -> Self {
        Self {
            bloom: Self::create_filter(BLOOM_SIZE),
            last_id: 0,
            layers: Default::default(),
            commit_threshold,
        }
    }

    /// Builds an empty bloom filter sized for `expected_items`, but never for
    /// fewer than `BLOOM_SIZE` items.
    fn create_filter(expected_items: usize) -> AtomicBloomFilter<FxBuildHasher> {
        AtomicBloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
            .hasher(FxBuildHasher)
            .expected_items(expected_items.max(BLOOM_SIZE))
    }

    /// Looks up `key` starting at the layer for `state_root` and following
    /// `parent` links until a layer contains it or the chain leaves the cache.
    ///
    /// Returns a copy of the stored value, or `None` when the bloom filter
    /// does not contain `key` or no layer on the chain holds it.
    ///
    /// # Panics
    ///
    /// Panics with "State cycle found" if the parent links form a cycle,
    /// whether it leads back to `state_root` or loops further down the chain.
    pub fn get(&self, state_root: H256, key: &[u8]) -> Option<Vec<u8>> {
        // Cheap pre-check: a bloom miss is treated as "no layer has this key".
        if !self.bloom.contains(key) {
            return None;
        }

        let mut current_state_root = state_root;
        // A cycle-free chain can visit each stored layer at most once, so
        // visiting more layers than exist means a layer was repeated.
        let mut visited = 0usize;
        while let Some(layer) = self.layers.get(&current_state_root) {
            visited += 1;
            if visited > self.layers.len() {
                panic!("State cycle found");
            }
            if let Some(value) = layer.nodes.get(key) {
                return Some(value.clone());
            }
            current_state_root = layer.parent;
            if current_state_root == state_root {
                panic!("State cycle found");
            }
        }
        None
    }

    /// Same as [`Self::get_commitable_with_threshold`] using the threshold
    /// this cache was configured with.
    pub fn get_commitable(&self, state_root: H256) -> Option<H256> {
        self.get_commitable_with_threshold(state_root, self.commit_threshold)
    }

    /// Walks the chain from `state_root` towards its ancestors, counting
    /// cached layers (the starting layer counts as 1).
    ///
    /// Returns the state root of the layer at which the count first reaches
    /// `threshold`, or `None` if the chain leaves the cache before that.
    pub(crate) fn get_commitable_with_threshold(
        &self,
        mut state_root: H256,
        threshold: usize,
    ) -> Option<H256> {
        let mut counter = 0;
        while let Some(layer) = self.layers.get(&state_root) {
            counter += 1;
            if counter >= threshold {
                return Some(state_root);
            }
            state_root = layer.parent;
        }
        None
    }

    /// Inserts a new layer for `state_root` on top of `parent`, holding
    /// `key_values`, and records every key in the bloom filter.
    ///
    /// The call is a no-op when `parent == state_root` (a trace message is
    /// logged if entries were supplied) or when a layer for `state_root` is
    /// already cached (a warning is logged).
    pub fn put_batch(
        &mut self,
        parent: H256,
        state_root: H256,
        key_values: Vec<(Nibbles, Vec<u8>)>,
    ) {
        if parent == state_root {
            // A layer that is its own parent would make every walk cyclic.
            if !key_values.is_empty() {
                tracing::trace!("parent == state_root but key_values not empty");
            }
            return;
        }
        if self.layers.contains_key(&state_root) {
            tracing::warn!("tried to insert a state_root that's already inserted");
            return;
        }

        for (p, _) in &key_values {
            self.bloom.insert(p.as_ref());
        }
        let nodes: FxHashMap<Vec<u8>, Vec<u8>> = key_values
            .into_iter()
            .map(|(path, value)| (path.into_vec(), value))
            .collect();

        self.last_id += 1;
        let entry = TrieLayer {
            nodes,
            parent,
            id: self.last_id,
        };
        self.layers.insert(state_root, Arc::new(entry));
    }

    /// Replaces the bloom filter with a fresh one containing the keys of every
    /// layer currently cached.
    pub fn rebuild_bloom(&mut self) {
        let total_keys: usize = self.layers.values().map(|layer| layer.nodes.len()).sum();
        // `create_filter` already enforces the `BLOOM_SIZE` minimum.
        let filter = Self::create_filter(total_keys);
        // Inserting only needs a shared reference, so layers are fed in parallel.
        self.layers.par_iter().for_each(|(_, layer)| {
            for path in layer.nodes.keys() {
                filter.insert(path);
            }
        });
        self.bloom = filter;
    }

    /// Removes the layer for `state_root` and all of its cached ancestors and
    /// returns their entries, oldest ancestor first.
    ///
    /// Every remaining layer whose id is not greater than the id of the
    /// `state_root` layer is dropped as well, and the bloom filter is rebuilt
    /// from what is left. Returns `None`, leaving the cache untouched, when no
    /// layer is cached for `state_root`.
    pub fn commit(&mut self, state_root: H256) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
        let mut layers_to_commit = vec![];
        let mut current_state_root = state_root;
        while let Some(layer) = self.layers.remove(&current_state_root) {
            let layer = Arc::unwrap_or_clone(layer);
            current_state_root = layer.parent;
            layers_to_commit.push(layer);
        }

        // The first collected layer is the one for `state_root` itself.
        let top_layer_id = layers_to_commit.first()?.id;
        // Keep only layers inserted after the committed one.
        self.layers.retain(|_, item| item.id > top_layer_id);
        // Layers were removed, so the filter must be rebuilt from the survivors.
        self.rebuild_bloom();

        // Collected newest-first; reverse so ancestors are emitted first.
        let nodes_to_commit = layers_to_commit
            .into_iter()
            .rev()
            .flat_map(|layer| layer.nodes)
            .collect();
        Some(nodes_to_commit)
    }
}
/// `TrieDB` implementation that answers reads from a `TrieLayerCache` at a
/// fixed state root and falls back to an underlying database on a miss.
pub struct TrieWrapper {
/// State root whose layer chain is searched by `get`.
pub state_root: H256,
/// Shared layer cache consulted before `db`.
pub inner: Arc<TrieLayerCache>,
/// Backing database used when the cache has no value for a key.
pub db: Box<dyn TrieDB>,
/// Optional nibble prefix prepended to every key before it is looked up.
prefix_nibbles: Option<Nibbles>,
}
impl TrieWrapper {
    /// Wraps `db` so that reads first consult `inner` at `state_root`.
    ///
    /// When `prefix` is given, its bytes are converted to nibbles, the value
    /// 17 is appended, and the result is stored to be prepended to every key.
    // NOTE(review): 17 is outside the 0-15 nibble range and presumably acts as
    // a separator between prefix and path - confirm against `Nibbles`.
    pub fn new(
        state_root: H256,
        inner: Arc<TrieLayerCache>,
        db: Box<dyn TrieDB>,
        prefix: Option<H256>,
    ) -> Self {
        Self {
            state_root,
            inner,
            db,
            prefix_nibbles: prefix
                .map(|hash| Nibbles::from_bytes(hash.as_bytes()).append_new(17)),
        }
    }
}
/// Returns `path` prefixed with the nibbles of `prefix` followed by the value
/// 17, or `path` unchanged when there is no prefix.
pub fn apply_prefix(prefix: Option<H256>, path: Nibbles) -> Nibbles {
    let Some(prefix) = prefix else {
        return path;
    };
    let prefix_nibbles = Nibbles::from_bytes(prefix.as_bytes()).append_new(17);
    prefix_nibbles.concat(&path)
}
impl TrieDB for TrieWrapper {
    /// Forwards to the backing database after applying the configured prefix.
    fn flatkeyvalue_computed(&self, key: Nibbles) -> bool {
        let full_key = if let Some(prefix) = &self.prefix_nibbles {
            prefix.concat(&key)
        } else {
            key
        };
        self.db.flatkeyvalue_computed(full_key)
    }

    /// Reads `key` (with the configured prefix applied) from the layer cache
    /// at `self.state_root`, falling back to the backing database on a miss.
    fn get(&self, key: Nibbles) -> Result<Option<Vec<u8>>, TrieError> {
        let full_key = if let Some(prefix) = &self.prefix_nibbles {
            prefix.concat(&key)
        } else {
            key
        };
        let cached = self.inner.get(self.state_root, full_key.as_ref());
        match cached {
            Some(value) => Ok(Some(value)),
            None => self.db.get(full_key),
        }
    }

    /// Not supported on this wrapper; calling it panics.
    fn put_batch(&self, _key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
        unimplemented!("This function should not be called");
    }
}