use crate::api::tables::{
ACCOUNT_CODES, ACCOUNT_FLATKEYVALUE, ACCOUNT_TRIE_NODES, BLOCK_NUMBERS, BODIES,
CANONICAL_BLOCK_HASHES, FULLSYNC_HEADERS, HEADERS, RECEIPTS_V2, STORAGE_FLATKEYVALUE,
STORAGE_TRIE_NODES, TRANSACTION_LOCATIONS,
};
use crate::api::{
PrefixResult, StorageBackend, StorageLockedView, StorageReadView, StorageWriteBatch,
tables::TABLES,
};
use crate::error::StoreError;
use rocksdb::DBWithThreadMode;
use rocksdb::checkpoint::Checkpoint;
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, MergeOperands, MultiThreaded, Options,
SnapshotWithThreadMode, WriteBatch,
};
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use tracing::{info, warn};
use crate::store::tx_locations_merge;
/// Associative merge callback registered on the `TRANSACTION_LOCATIONS` column family.
///
/// RocksDB passes the key being merged, the currently stored value (if any) and the queued
/// operands. The key is not needed, so the value and operands are forwarded unchanged to
/// `tx_locations_merge`.
fn tx_locations_merge_op(
    _merged_key: &[u8],
    stored_value: Option<&[u8]>,
    pending_operands: &MergeOperands,
) -> Option<Vec<u8>> {
    tx_locations_merge(stored_value, pending_operands)
}
/// RocksDB-backed implementation of [`StorageBackend`].
///
/// The database handle is reference-counted: every read, write and locked view handed out by
/// this backend holds its own clone of `db`.
#[derive(Debug)]
pub struct RocksDBBackend {
    // Shared multi-threaded RocksDB handle (see `Drop for RocksDBBackend`, which only cancels
    // background work when this is the last clone).
    db: Arc<DBWithThreadMode<MultiThreaded>>,
}
impl RocksDBBackend {
    /// Column families whose SST files are written with LZ4 compression; every other column
    /// family is stored uncompressed.
    const COMPRESSIBLE_TABLES: &[&str] = &[
        BLOCK_NUMBERS,
        HEADERS,
        BODIES,
        RECEIPTS_V2,
        TRANSACTION_LOCATIONS,
        FULLSYNC_HEADERS,
    ];

    /// Opens the RocksDB database at `path`, creating it and any missing column families.
    ///
    /// Every column family already present on disk is opened together with every table in
    /// `TABLES`, so no on-disk column family is left out (RocksDB requires all existing column
    /// families to be listed when opening). If the on-disk list cannot be read (for example
    /// because the database does not exist yet) only `default` is assumed to exist.
    /// All column families share a single LRU block cache of `block_cache_size` bytes.
    ///
    /// # Errors
    /// Returns [`StoreError::Custom`] when RocksDB fails to open the database.
    pub fn open(path: impl AsRef<Path>, block_cache_size: usize) -> Result<Self, StoreError> {
        let path = path.as_ref();
        let opts = Self::db_options();

        let mut all_cfs_to_open: HashSet<String> =
            DBWithThreadMode::<MultiThreaded>::list_cf(&opts, path)
                .unwrap_or_else(|_| vec!["default".to_string()])
                .into_iter()
                .collect();
        all_cfs_to_open.extend(TABLES.iter().map(|table| table.to_string()));

        let block_cache = Cache::new_lru_cache(block_cache_size);
        let cf_descriptors: Vec<ColumnFamilyDescriptor> = all_cfs_to_open
            .iter()
            .map(|cf_name| {
                ColumnFamilyDescriptor::new(cf_name, Self::cf_options(cf_name, &block_cache))
            })
            .collect();

        let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
            &opts,
            path,
            cf_descriptors,
        )
        .map_err(|e| StoreError::Custom(format!("Failed to open RocksDB with all CFs: {}", e)))?;
        Ok(Self { db: Arc::new(db) })
    }

    /// Database-wide options: file handles, background jobs, WAL and write-path tuning.
    ///
    /// NOTE(review): `open` hands every column family, including `default`, its own descriptor
    /// built by `cf_options`. The column-family-level settings below (L0 triggers, write-buffer
    /// and level sizing, dynamic level bytes, compression) therefore presumably never reach any
    /// column family; only the DB-wide ones take effect. Confirm against rust-rocksdb's
    /// `open_cf_descriptors` before tuning these values.
    fn db_options() -> Options {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);
        opts.set_max_open_files(-1);
        opts.set_max_file_opening_threads(16);
        opts.set_max_background_jobs(8);

        opts.set_level_zero_file_num_compaction_trigger(2);
        opts.set_level_zero_slowdown_writes_trigger(10);
        opts.set_level_zero_stop_writes_trigger(16);
        opts.set_target_file_size_base(512 * 1024 * 1024);
        opts.set_max_bytes_for_level_base(2 * 1024 * 1024 * 1024);
        opts.set_max_bytes_for_level_multiplier(10.0);
        opts.set_level_compaction_dynamic_level_bytes(true);

        opts.set_db_write_buffer_size(1024 * 1024 * 1024);
        opts.set_write_buffer_size(128 * 1024 * 1024);
        opts.set_max_write_buffer_number(4);
        opts.set_min_write_buffer_number_to_merge(2);

        opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::PointInTime);
        opts.set_max_total_wal_size(2 * 1024 * 1024 * 1024);
        opts.set_wal_bytes_per_sync(32 * 1024 * 1024);
        opts.set_bytes_per_sync(32 * 1024 * 1024);
        opts.set_use_fsync(false);

        opts.set_enable_pipelined_write(true);
        opts.set_allow_concurrent_memtable_write(true);
        opts.set_enable_write_thread_adaptive_yield(true);
        opts.set_compaction_readahead_size(4 * 1024 * 1024);
        opts.set_advise_random_on_open(false);
        opts.set_compression_type(rocksdb::DBCompressionType::None);
        opts
    }

    /// Per-column-family options for `cf_name`: compression, memtable sizing, SST target size
    /// and block-table layout. Column families not listed explicitly get the generic settings
    /// of the fallback arm.
    fn cf_options(cf_name: &str, block_cache: &Cache) -> Options {
        let mut cf_opts = Options::default();
        cf_opts.set_level_zero_file_num_compaction_trigger(4);
        cf_opts.set_level_zero_slowdown_writes_trigger(20);
        cf_opts.set_level_zero_stop_writes_trigger(36);

        let compression = if Self::COMPRESSIBLE_TABLES.contains(&cf_name) {
            rocksdb::DBCompressionType::Lz4
        } else {
            rocksdb::DBCompressionType::None
        };
        cf_opts.set_compression_type(compression);

        let block_opts = match cf_name {
            HEADERS | BODIES => {
                cf_opts.set_write_buffer_size(128 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(4);
                cf_opts.set_target_file_size_base(256 * 1024 * 1024);
                Self::block_table_options(block_cache, 32 * 1024, None)
            }
            CANONICAL_BLOCK_HASHES | BLOCK_NUMBERS => {
                cf_opts.set_write_buffer_size(64 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(3);
                cf_opts.set_target_file_size_base(128 * 1024 * 1024);
                Self::block_table_options(block_cache, 16 * 1024, Some(10.0))
            }
            TRANSACTION_LOCATIONS => {
                cf_opts.set_write_buffer_size(64 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(3);
                cf_opts.set_target_file_size_base(128 * 1024 * 1024);
                // Only this column family supports `merge`; see `RocksDBWriteTx::merge`.
                cf_opts.set_merge_operator_associative(
                    "tx_locations_merge",
                    tx_locations_merge_op,
                );
                Self::block_table_options(block_cache, 16 * 1024, None)
            }
            // Trie nodes and flat key/value state share identical write-heavy tuning.
            ACCOUNT_TRIE_NODES | STORAGE_TRIE_NODES | ACCOUNT_FLATKEYVALUE
            | STORAGE_FLATKEYVALUE => {
                cf_opts.set_write_buffer_size(512 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(6);
                cf_opts.set_min_write_buffer_number_to_merge(2);
                cf_opts.set_target_file_size_base(256 * 1024 * 1024);
                // NOTE(review): no prefix extractor is configured anywhere in `open`, so this
                // memtable prefix bloom presumably has no effect — confirm.
                cf_opts.set_memtable_prefix_bloom_ratio(0.2);
                Self::block_table_options(block_cache, 16 * 1024, Some(10.0))
            }
            ACCOUNT_CODES => {
                cf_opts.set_write_buffer_size(128 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(3);
                cf_opts.set_target_file_size_base(256 * 1024 * 1024);
                // Values of at least 32 bytes are stored in LZ4-compressed blob files.
                cf_opts.set_enable_blob_files(true);
                cf_opts.set_min_blob_size(32);
                cf_opts.set_blob_compression_type(rocksdb::DBCompressionType::Lz4);
                Self::block_table_options(block_cache, 32 * 1024, None)
            }
            RECEIPTS_V2 => {
                cf_opts.set_write_buffer_size(128 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(3);
                cf_opts.set_target_file_size_base(256 * 1024 * 1024);
                Self::block_table_options(block_cache, 32 * 1024, None)
            }
            _ => {
                cf_opts.set_write_buffer_size(64 * 1024 * 1024);
                cf_opts.set_max_write_buffer_number(3);
                cf_opts.set_target_file_size_base(128 * 1024 * 1024);
                Self::block_table_options(block_cache, 16 * 1024, None)
            }
        };
        cf_opts.set_block_based_table_factory(&block_opts);
        cf_opts
    }

    /// Block-based table options backed by the shared `block_cache`.
    ///
    /// Index and filter blocks are kept in the block cache (pinned for L0 files). When
    /// `bloom_bits_per_key` is `Some`, a full (non block-based) bloom filter is added.
    fn block_table_options(
        block_cache: &Cache,
        block_size: usize,
        bloom_bits_per_key: Option<f64>,
    ) -> BlockBasedOptions {
        let mut block_opts = BlockBasedOptions::default();
        block_opts.set_block_size(block_size);
        if let Some(bits) = bloom_bits_per_key {
            block_opts.set_bloom_filter(bits, false);
        }
        block_opts.set_block_cache(block_cache);
        block_opts.set_cache_index_and_filter_blocks(true);
        block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
        block_opts
    }

    /// Best-effort removal of column families that exist on disk at `path` but are no longer
    /// listed in `TABLES` (the `default` column family is always kept).
    ///
    /// Failures are logged with `warn!` and do not abort the remaining drops; successes are
    /// logged with `info!`.
    pub fn drop_obsolete_cfs(&self, path: impl AsRef<Path>) {
        let on_disk = DBWithThreadMode::<MultiThreaded>::list_cf(&Options::default(), path.as_ref())
            .unwrap_or_default();
        let obsolete = on_disk
            .iter()
            .filter(|name| name.as_str() != "default" && !TABLES.contains(&name.as_str()));
        for cf_name in obsolete {
            match self.db.drop_cf(cf_name) {
                Ok(_) => info!("Dropped obsolete column family '{}'", cf_name),
                Err(e) => warn!("Failed to drop obsolete column family '{}': {}", cf_name, e),
            }
        }
    }
}
impl Drop for RocksDBBackend {
fn drop(&mut self) {
if let Some(db) = Arc::get_mut(&mut self.db) {
db.cancel_all_background_work(true);
}
}
}
impl StorageBackend for RocksDBBackend {
    /// Removes every key from `table` by collecting deletes into one write batch.
    ///
    /// # Errors
    /// Fails when the column family is missing, when iterating it fails, or when the batch
    /// cannot be written. An iteration error aborts before anything is written, so the call
    /// never reports success while leaving the table half-cleared.
    fn clear_table(&self, table: &'static str) -> Result<(), StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;
        // NOTE(review): every key of the table is buffered in a single in-memory batch; this
        // may be expensive for very large tables.
        let mut batch = WriteBatch::default();
        for entry in self.db.iterator_cf(&cf, rocksdb::IteratorMode::Start) {
            let (key, _) = entry.map_err(|e| {
                StoreError::Custom(format!("Failed to iterate {table} while clearing: {e}"))
            })?;
            batch.delete_cf(&cf, key);
        }
        self.db
            .write(batch)
            .map_err(|e| StoreError::Custom(format!("RocksDB batch write error: {}", e)))
    }

    /// Returns a read view that queries the live database (reads are not pinned to a
    /// snapshot).
    fn begin_read(&self) -> Result<Arc<dyn StorageReadView>, StoreError> {
        Ok(Arc::new(RocksDBReadTx {
            db: self.db.clone(),
        }))
    }

    /// Returns a write transaction that buffers operations in a `WriteBatch` until `commit`.
    fn begin_write(&self) -> Result<Box<dyn StorageWriteBatch + 'static>, StoreError> {
        let batch = WriteBatch::default();
        Ok(Box::new(RocksDBWriteTx {
            db: self.db.clone(),
            batch,
        }))
    }

    /// Returns a read view of `table_name` pinned to a RocksDB snapshot.
    ///
    /// The snapshot and column-family handle must be `'static` to live inside the returned
    /// trait object, so a clone of the database `Arc` is moved to the heap and leaked here.
    /// `Drop for RocksDBLocked` reclaims it.
    ///
    /// # Errors
    /// Returns an error if `table_name` has no column family; the leaked clone is reclaimed
    /// on that path so a bad table name cannot keep the database alive forever.
    fn begin_locked(
        &self,
        table_name: &'static str,
    ) -> Result<Box<dyn StorageLockedView>, StoreError> {
        let db: &'static Arc<DBWithThreadMode<MultiThreaded>> =
            Box::leak(Box::new(self.db.clone()));
        let Some(cf) = db.cf_handle(table_name) else {
            // SAFETY: `db` was produced by `Box::leak` just above, the lookup failed so nothing
            // borrows from it, and it is not used again after this point.
            unsafe {
                drop(Box::from_raw(
                    db as *const Arc<DBWithThreadMode<MultiThreaded>>
                        as *mut Arc<DBWithThreadMode<MultiThreaded>>,
                ));
            }
            return Err(StoreError::Custom(format!("Table {} not found", table_name)));
        };
        let lock = db.snapshot();
        Ok(Box::new(RocksDBLocked {
            db,
            lock: std::mem::ManuallyDrop::new(lock),
            cf: std::mem::ManuallyDrop::new(cf),
        }))
    }

    /// Writes a RocksDB checkpoint of the database into `path`.
    fn create_checkpoint(&self, path: &Path) -> Result<(), StoreError> {
        let checkpoint = Checkpoint::new(&self.db)
            .map_err(|e| StoreError::Custom(format!("Failed to create checkpoint: {e}")))?;
        checkpoint.create_checkpoint(path).map_err(|e| {
            StoreError::Custom(format!(
                "Failed to create RocksDB checkpoint at {path:?}: {e}"
            ))
        })?;
        Ok(())
    }
}

/// Read view over the shared database; every call reads the live database directly rather
/// than a snapshot.
pub struct RocksDBReadTx {
    db: Arc<DBWithThreadMode<MultiThreaded>>,
}

impl StorageReadView for RocksDBReadTx {
    /// Point lookup of `key` in `table`; `Ok(None)` when the key is absent.
    fn get(&self, table: &'static str, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {} not found", table)))?;
        self.db
            .get_cf(&cf, key)
            .map_err(|e| StoreError::Custom(format!("Failed to get from {}: {}", table, e)))
    }

    /// Iterates `table` forward starting at `prefix`, yielding `(key, value)` pairs.
    ///
    /// NOTE(review): this delegates to `prefix_iterator_cf`, which relies on RocksDB's
    /// `prefix_same_as_start`. `RocksDBBackend::open` configures no prefix extractor for any
    /// column family, so that bound is presumably not enforced and the iterator may continue
    /// past keys starting with `prefix`. Confirm callers stop at the first non-matching key.
    fn prefix_iterator(
        &self,
        table: &'static str,
        prefix: &[u8],
    ) -> Result<Box<dyn Iterator<Item = PrefixResult> + '_>, StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {} not found", table)))?;
        let iter = self.db.prefix_iterator_cf(&cf, prefix).map(|result| {
            result.map_err(|e| StoreError::Custom(format!("Failed to iterate: {e}")))
        });
        Ok(Box::new(iter))
    }
}

/// Write transaction: operations are buffered in `batch` and reach the database only when
/// `commit` writes the batch.
pub struct RocksDBWriteTx {
    db: Arc<DBWithThreadMode<MultiThreaded>>,
    batch: WriteBatch,
}

impl StorageWriteBatch for RocksDBWriteTx {
    /// Buffers a put of `key` -> `value` into `table`.
    fn put(&mut self, table: &'static str, key: &[u8], value: &[u8]) -> Result<(), StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {table:?} not found")))?;
        self.batch.put_cf(&cf, key, value);
        Ok(())
    }

    /// Buffers a put for every `(key, value)` pair of `batch` into `table`.
    fn put_batch(
        &mut self,
        table: &'static str,
        batch: Vec<(Vec<u8>, Vec<u8>)>,
    ) -> Result<(), StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {table:?} not found")))?;
        for (key, value) in batch {
            self.batch.put_cf(&cf, key, value);
        }
        Ok(())
    }

    /// Buffers a delete of `key` from `table`.
    fn delete(&mut self, table: &'static str, key: &[u8]) -> Result<(), StoreError> {
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {} not found", table)))?;
        self.batch.delete_cf(&cf, key);
        Ok(())
    }

    /// Buffers a merge operand for `key`.
    ///
    /// # Errors
    /// Only `TRANSACTION_LOCATIONS` has a merge operator registered (see
    /// `RocksDBBackend::open`); any other table is rejected.
    fn merge(&mut self, table: &'static str, key: &[u8], operand: &[u8]) -> Result<(), StoreError> {
        if table != TRANSACTION_LOCATIONS {
            return Err(StoreError::Custom(format!(
                "merge not supported for table {table} (no merge operator registered)"
            )));
        }
        let cf = self
            .db
            .cf_handle(table)
            .ok_or_else(|| StoreError::Custom(format!("Table {} not found", table)))?;
        self.batch.merge_cf(&cf, key, operand);
        Ok(())
    }

    /// Writes the buffered batch to the database, leaving an empty batch behind so the
    /// transaction can keep buffering.
    fn commit(&mut self) -> Result<(), StoreError> {
        let batch = std::mem::take(&mut self.batch);
        self.db
            .write(batch)
            .map_err(|e| StoreError::Custom(format!("Failed to commit batch: {}", e)))
    }
}

/// Snapshot-pinned read view of a single column family, created by
/// `RocksDBBackend::begin_locked`.
///
/// `lock` and `cf` borrow from the database behind `db` with a `'static` lifetime. `db`
/// points to a heap-allocated `Arc` clone leaked by `begin_locked`, which `Drop` reclaims.
pub struct RocksDBLocked {
    db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
    // `ManuallyDrop` so `Drop` can release these *before* the leaked `Arc` is freed. Automatic
    // field drops run after `Drop::drop`; if that `Arc` was the last reference the database
    // would already be closed when the snapshot is released.
    lock: std::mem::ManuallyDrop<SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>>,
    cf: std::mem::ManuallyDrop<Arc<rocksdb::BoundColumnFamily<'static>>>,
}

impl StorageLockedView for RocksDBLocked {
    /// Reads `key` from the locked column family as of the snapshot.
    fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError> {
        self.lock
            .get_cf(&*self.cf, key)
            .map_err(|e| StoreError::Custom(format!("Failed to get:{e:?}")))
    }
}

impl Drop for RocksDBLocked {
    fn drop(&mut self) {
        // SAFETY: `lock` and `cf` are dropped exactly once, here, and never accessed again
        // (`ManuallyDrop` suppresses their automatic drop glue). They borrow from the database
        // kept alive by `*self.db`, so they must be released before that `Arc` is freed.
        unsafe {
            std::mem::ManuallyDrop::drop(&mut self.lock);
            std::mem::ManuallyDrop::drop(&mut self.cf);
        }
        // SAFETY: `self.db` was produced by `Box::leak` in `begin_locked`, this is the only
        // place it is reclaimed, and nothing borrowing from it is still alive.
        unsafe {
            drop(Box::from_raw(
                self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
                    as *mut Arc<DBWithThreadMode<MultiThreaded>>,
            ));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::encode_tx_location_operand;
    use ethrex_common::H256;
    use ethrex_common::types::{BlockHash, BlockNumber, Index};
    use ethrex_rlp::decode::RLPDecode;

    type Location = (BlockNumber, BlockHash, Index);

    /// Opens a fresh backend in a temporary directory. The `TempDir` is returned as well so
    /// the caller keeps the directory alive while the database is open.
    fn open_temp_backend() -> (tempfile::TempDir, RocksDBBackend) {
        let dir = tempfile::tempdir().unwrap();
        let backend = RocksDBBackend::open(
            dir.path(),
            crate::store::DEFAULT_ROCKSDB_BLOCK_CACHE_SIZE_BYTES,
        )
        .unwrap();
        (dir, backend)
    }

    /// Commits a single merge operand for `key` in its own batch, then flushes the
    /// `TRANSACTION_LOCATIONS` column family.
    fn merge_and_flush(backend: &RocksDBBackend, key: &[u8], operand: &[u8]) {
        let mut tx = backend.begin_write().unwrap();
        tx.merge(TRANSACTION_LOCATIONS, key, operand).unwrap();
        tx.commit().unwrap();
        let cf = backend.db.cf_handle(TRANSACTION_LOCATIONS).unwrap();
        backend.db.flush_cf(&cf).unwrap();
    }

    /// Compacts the whole `TRANSACTION_LOCATIONS` key range.
    fn compact_tx_locations(backend: &RocksDBBackend) {
        let cf = backend.db.cf_handle(TRANSACTION_LOCATIONS).unwrap();
        backend
            .db
            .compact_range_cf(&cf, None::<&[u8]>, None::<&[u8]>);
    }

    /// Reads and RLP-decodes the location list stored under `key`, if any.
    fn read_locations(backend: &RocksDBBackend, key: &[u8]) -> Option<Vec<Location>> {
        let reader = backend.begin_read().unwrap();
        reader
            .get(TRANSACTION_LOCATIONS, key)
            .unwrap()
            .map(|raw| <Vec<Location>>::decode(&raw).unwrap())
    }

    #[test]
    fn merge_operator_survives_flush_and_compaction() {
        let (_dir, backend) = open_temp_backend();
        let tx_hash = H256::from_low_u64_be(0xabcd);
        let mut want: Vec<Location> = (0..6u64)
            .map(|i| (100 + i, H256::from_low_u64_be(0x10 + i), i))
            .collect();

        for &(block_number, block_hash, index) in &want {
            merge_and_flush(
                &backend,
                tx_hash.as_bytes(),
                &encode_tx_location_operand(block_number, block_hash, index),
            );
        }
        compact_tx_locations(&backend);

        let mut got = read_locations(&backend, tx_hash.as_bytes())
            .expect("key must exist after merge + compaction");
        got.sort();
        want.sort();
        assert_eq!(got, want, "no entries may be dropped through compaction");
    }

    #[test]
    fn merge_operator_dedupes_across_compaction() {
        let (_dir, backend) = open_temp_backend();
        let tx_hash = H256::from_low_u64_be(0x1234);
        let block_hash = H256::from_low_u64_be(0xaa);

        for index in [3u64, 7u64] {
            merge_and_flush(
                &backend,
                tx_hash.as_bytes(),
                &encode_tx_location_operand(200, block_hash, index),
            );
        }
        compact_tx_locations(&backend);

        let got = read_locations(&backend, tx_hash.as_bytes()).unwrap();
        assert_eq!(
            got,
            vec![(200, block_hash, 7)],
            "later write for same block_hash wins"
        );
    }
}