use alloc::{boxed::Box, vec::Vec};
use core::fmt;
use std::{mem::ManuallyDrop, path::PathBuf, sync::Arc};
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, DB, DBCompactionStyle, DBCompressionType,
DBIteratorWithThreadMode, FlushOptions, IteratorMode, Options, ReadOptions, WriteBatch,
};
use super::{
SmtStorage, SmtStorageReader, StorageError, StorageResult, StorageUpdateParts, StorageUpdates,
SubtreeUpdate,
};
use crate::{
EMPTY_WORD, Word,
merkle::{
NodeIndex,
smt::{
InnerNode, Map, SmtLeaf,
large::{IN_MEMORY_DEPTH, LargeSmt, subtree::Subtree},
},
},
utils::{Deserializable, Serializable},
};
// Column family storing serialized SMT leaves, keyed by the big-endian leaf index.
const LEAVES_CF: &str = "leaves";
// Column families storing serialized subtrees, one per supported subtree root depth
// (see `cf_for_depth` for the depth -> column family mapping).
const SUBTREE_16_CF: &str = "st16";
const SUBTREE_24_CF: &str = "st24";
const SUBTREE_32_CF: &str = "st32";
const SUBTREE_40_CF: &str = "st40";
const SUBTREE_48_CF: &str = "st48";
const SUBTREE_56_CF: &str = "st56";
// Column family storing the persisted leaf and entry counters.
const METADATA_CF: &str = "metadata";
// Column family storing the root hash of every subtree rooted at `IN_MEMORY_DEPTH`,
// keyed by the big-endian position of the subtree root.
const IN_MEM_DEPTH_CF: &str = "in_mem_depth";
// Keys (inside `METADATA_CF`) of the 8-byte big-endian leaf and entry counters.
const LEAF_COUNT_KEY: &[u8] = b"leaf_count";
const ENTRY_COUNT_KEY: &[u8] = b"entry_count";
/// SMT storage backend persisted in a RocksDB database.
///
/// The database handle is reference-counted, so cloning this value shares the same
/// underlying database. Note that the `Drop` impl flushes the database for every
/// dropped clone.
#[derive(Debug, Clone)]
pub struct RocksDbStorage {
// Shared handle to the open RocksDB instance.
db: Arc<DB>,
}
impl RocksDbStorage {
/// Opens the RocksDB database at `config.path`, creating the database and any missing
/// column families if necessary.
///
/// All column families share a single LRU block cache of `config.cache_size` bytes.
/// Leaves and subtrees use level compaction with LZ4 compression; the metadata column
/// family is stored uncompressed.
///
/// # Errors
/// Returns a backend error if RocksDB fails to open the database.
pub fn open(config: RocksDbConfig) -> StorageResult<Self> {
    // RocksDB takes thread counts as `i32`; saturate rather than wrap on (theoretical)
    // overflow of the `usize` reported by rayon.
    let num_threads = i32::try_from(rayon::current_num_threads()).unwrap_or(i32::MAX);

    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);
    db_opts.increase_parallelism(num_threads);
    db_opts.set_max_open_files(config.max_open_files);
    db_opts.set_max_background_jobs(num_threads);
    db_opts.set_max_total_wal_size(512 * 1024 * 1024);

    // Block cache shared by every column family that uses a block-based table.
    let cache = Cache::new_lru_cache(config.cache_size);
    let mut table_opts = BlockBasedOptions::default();
    table_opts.set_block_cache(&cache);
    table_opts.set_bloom_filter(10.0, false);
    table_opts.set_whole_key_filtering(true);
    table_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);

    let mut leaves_opts = Options::default();
    leaves_opts.set_block_based_table_factory(&table_opts);
    leaves_opts.set_write_buffer_size(128 << 20);
    leaves_opts.set_max_write_buffer_number(3);
    leaves_opts.set_min_write_buffer_number_to_merge(1);
    leaves_opts.set_max_write_buffer_size_to_maintain(0);
    leaves_opts.set_compaction_style(DBCompactionStyle::Level);
    leaves_opts.set_target_file_size_base(512 << 20);
    leaves_opts.set_target_file_size_multiplier(2);
    leaves_opts.set_compression_type(DBCompressionType::Lz4);
    leaves_opts.set_level_zero_file_num_compaction_trigger(8);

    /// Builds the options for one subtree column family; only the bloom filter
    /// bits-per-key differ between depths.
    fn subtree_cf(cache: &Cache, bloom_filter_bits: f64) -> Options {
        let mut tbl = BlockBasedOptions::default();
        tbl.set_block_cache(cache);
        tbl.set_bloom_filter(bloom_filter_bits, false);
        tbl.set_whole_key_filtering(true);
        tbl.set_pin_l0_filter_and_index_blocks_in_cache(true);

        let mut opts = Options::default();
        opts.set_block_based_table_factory(&tbl);
        opts.set_write_buffer_size(128 << 20);
        opts.set_max_write_buffer_number(3);
        opts.set_min_write_buffer_number_to_merge(1);
        opts.set_max_write_buffer_size_to_maintain(0);
        opts.set_compaction_style(DBCompactionStyle::Level);
        opts.set_target_file_size_base(512 << 20);
        opts.set_target_file_size_multiplier(2);
        opts.set_compression_type(DBCompressionType::Lz4);
        // The trigger used to be set twice (4, then 8); only the last call took effect,
        // so the effective value of 8 is kept and the dead call is gone.
        opts.set_level_zero_file_num_compaction_trigger(8);
        opts
    }

    let mut in_mem_depth_opts = Options::default();
    in_mem_depth_opts.set_compression_type(DBCompressionType::Lz4);
    in_mem_depth_opts.set_block_based_table_factory(&table_opts);

    let mut metadata_opts = Options::default();
    metadata_opts.set_compression_type(DBCompressionType::None);

    let cfs = vec![
        ColumnFamilyDescriptor::new(LEAVES_CF, leaves_opts),
        ColumnFamilyDescriptor::new(SUBTREE_16_CF, subtree_cf(&cache, 8.0)),
        ColumnFamilyDescriptor::new(SUBTREE_24_CF, subtree_cf(&cache, 8.0)),
        ColumnFamilyDescriptor::new(SUBTREE_32_CF, subtree_cf(&cache, 10.0)),
        ColumnFamilyDescriptor::new(SUBTREE_40_CF, subtree_cf(&cache, 10.0)),
        ColumnFamilyDescriptor::new(SUBTREE_48_CF, subtree_cf(&cache, 12.0)),
        ColumnFamilyDescriptor::new(SUBTREE_56_CF, subtree_cf(&cache, 12.0)),
        ColumnFamilyDescriptor::new(METADATA_CF, metadata_opts),
        ColumnFamilyDescriptor::new(IN_MEM_DEPTH_CF, in_mem_depth_opts),
    ];

    let db = DB::open_cf_descriptors(&db_opts, config.path, cfs)?;
    Ok(Self { db: Arc::new(db) })
}
fn sync(&self) -> StorageResult<()> {
let mut fopts = FlushOptions::default();
fopts.set_wait(true);
for name in [
LEAVES_CF,
SUBTREE_16_CF,
SUBTREE_24_CF,
SUBTREE_32_CF,
SUBTREE_40_CF,
SUBTREE_48_CF,
SUBTREE_56_CF,
METADATA_CF,
IN_MEM_DEPTH_CF,
] {
let cf = self.cf_handle(name)?;
self.db.flush_cf_opt(cf, &fopts)?;
}
self.db.flush_wal(true)?;
Ok(())
}
/// Encodes a leaf index as its 8-byte big-endian database key.
#[inline(always)]
fn index_db_key(index: u64) -> [u8; 8] {
    u64::to_be_bytes(index)
}
/// Builds the database key of the subtree rooted at `index`: the low `depth / 8`
/// big-endian bytes of the root position.
///
/// # Panics
/// Panics if the depth of `index` is not one of 16, 24, 32, 40, 48 or 56.
#[inline(always)]
fn subtree_db_key(index: NodeIndex) -> KeyBytes {
    let significant_bytes = match index.depth() {
        depth @ (16 | 24 | 32 | 40 | 48 | 56) => usize::from(depth / 8),
        d => panic!("unsupported depth {d}"),
    };
    KeyBytes::new(index.position(), significant_bytes)
}
/// Looks up the handle of the column family called `name`.
///
/// # Errors
/// Returns `StorageError::Unsupported` if the database has no such column family.
fn cf_handle(&self, name: &str) -> StorageResult<&rocksdb::ColumnFamily> {
    match self.db.cf_handle(name) {
        Some(handle) => Ok(handle),
        None => Err(StorageError::Unsupported(format!("unknown column family `{name}`"))),
    }
}
/// Returns the column family that stores subtrees rooted at the depth of `index`.
///
/// # Panics
/// Panics if the depth is unsupported or the column family handle is missing.
#[inline(always)]
fn subtree_cf(&self, index: NodeIndex) -> &rocksdb::ColumnFamily {
    self.cf_handle(cf_for_depth(index.depth())).expect("CF handle missing")
}
}
impl SmtStorageReader for RocksDbStorage {
fn leaf_count(&self) -> StorageResult<usize> {
let cf = self.cf_handle(METADATA_CF)?;
self.db.get_cf(cf, LEAF_COUNT_KEY)?.map_or(Ok(0), |bytes| {
let arr: [u8; 8] =
bytes.as_slice().try_into().map_err(|_| StorageError::BadValueLen {
what: "leaf count",
expected: 8,
found: bytes.len(),
})?;
Ok(usize::from_be_bytes(arr))
})
}
fn entry_count(&self) -> StorageResult<usize> {
let cf = self.cf_handle(METADATA_CF)?;
self.db.get_cf(cf, ENTRY_COUNT_KEY)?.map_or(Ok(0), |bytes| {
let arr: [u8; 8] =
bytes.as_slice().try_into().map_err(|_| StorageError::BadValueLen {
what: "entry count",
expected: 8,
found: bytes.len(),
})?;
Ok(usize::from_be_bytes(arr))
})
}
/// Reads and deserializes the leaf stored at `index`, if any.
///
/// # Errors
/// Returns an error if the database read or the leaf deserialization fails.
fn get_leaf(&self, index: u64) -> StorageResult<Option<SmtLeaf>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let db_key = Self::index_db_key(index);
    let Some(raw) = self.db.get_cf(leaves, db_key)? else {
        return Ok(None);
    };
    let leaf = SmtLeaf::read_from_bytes_with_budget(&raw, raw.len())?;
    Ok(Some(leaf))
}
/// Reads the leaves at `indices` with a single multi-get.
///
/// The output has one slot per requested index, in the same order; absent leaves are
/// `None`.
///
/// # Errors
/// Returns the first database or deserialization error encountered.
fn get_leaves(&self, indices: &[u64]) -> StorageResult<Vec<Option<SmtLeaf>>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let keys: Vec<[u8; 8]> = indices.iter().copied().map(Self::index_db_key).collect();
    let lookups = self.db.multi_get_cf(keys.iter().map(|key| (leaves, key.as_ref())));

    let mut decoded = Vec::with_capacity(keys.len());
    for lookup in lookups {
        let leaf = match lookup? {
            Some(raw) => Some(SmtLeaf::read_from_bytes_with_budget(&raw, raw.len())?),
            None => None,
        };
        decoded.push(leaf);
    }
    Ok(decoded)
}
/// Returns `true` if the persisted leaf count is non-zero.
fn has_leaves(&self) -> StorageResult<bool> {
    let count = self.leaf_count()?;
    Ok(count != 0)
}
/// Reads and deserializes the subtree rooted at `index`, if it is stored.
///
/// # Errors
/// Returns an error if the database read or the subtree deserialization fails.
///
/// # Panics
/// Panics if the depth of `index` is not a supported subtree depth.
fn get_subtree(&self, index: NodeIndex) -> StorageResult<Option<Subtree>> {
    let handle = self.subtree_cf(index);
    let db_key = Self::subtree_db_key(index);
    let Some(raw) = self.db.get_cf(handle, db_key)? else {
        return Ok(None);
    };
    Ok(Some(Subtree::from_vec(index, &raw)?))
}
/// Reads the subtrees rooted at `indices`, issuing one multi-get per subtree depth.
///
/// The output has one slot per requested index, in the same order; subtrees that are
/// not stored are `None`.
///
/// # Errors
/// Returns `StorageError::Unsupported` if any index has an unsupported depth, or the
/// first database / deserialization error encountered.
fn get_subtrees(&self, indices: &[NodeIndex]) -> StorageResult<Vec<Option<Subtree>>> {
use p3_maybe_rayon::prelude::*;
// Group the requests by depth, remembering each request's position in `indices` so
// the results can be written back in the caller's order.
let mut depth_buckets: [Vec<(usize, NodeIndex)>; 6] = Default::default();
for (original_index, &node_index) in indices.iter().enumerate() {
let depth = node_index.depth();
let bucket_index = match depth {
56 => 0,
48 => 1,
40 => 2,
32 => 3,
24 => 4,
16 => 5,
_ => {
return Err(StorageError::Unsupported(format!(
"unsupported subtree depth {depth}"
)));
},
};
depth_buckets[bucket_index].push((original_index, node_index));
}
let mut results = vec![None; indices.len()];
// Each non-empty bucket is resolved against its own column family.
let bucket_results: StorageResult<Vec<_>> = depth_buckets
.into_par_iter()
.enumerate()
.filter(|(_, bucket)| !bucket.is_empty())
.map(|(bucket_index, bucket)| -> StorageResult<Vec<(usize, Option<Subtree>)>> {
// NOTE(review): assumes `SUBTREE_DEPTHS` is ordered [56, 48, 40, 32, 24, 16] so
// that it agrees with the bucket mapping above -- confirm.
let depth = LargeSmt::<RocksDbStorage>::SUBTREE_DEPTHS[bucket_index];
let cf = self.cf_handle(cf_for_depth(depth))?;
let keys: Vec<_> =
bucket.iter().map(|(_, idx)| Self::subtree_db_key(*idx)).collect();
let db_results = self.db.multi_get_cf(keys.iter().map(|k| (cf, k.as_ref())));
// Pair every request with its lookup result and deserialize the hits.
bucket
.into_iter()
.zip(db_results)
.map(|((original_index, node_index), db_result)| {
let subtree = match db_result {
Ok(Some(bytes)) => Some(Subtree::from_vec(node_index, &bytes)?),
Ok(None) => None,
Err(e) => return Err(e.into()),
};
Ok((original_index, subtree))
})
.collect()
})
.collect();
// Scatter the per-bucket results back into request order.
for bucket_result in bucket_results? {
for (original_index, subtree) in bucket_result {
results[original_index] = subtree;
}
}
Ok(results)
}
/// Returns the inner node at `index` by loading the subtree that contains it.
///
/// # Errors
/// Returns `StorageError::Unsupported` if `index` lies above `IN_MEMORY_DEPTH`, or any
/// error produced while loading the containing subtree.
fn get_inner_node(&self, index: NodeIndex) -> StorageResult<Option<InnerNode>> {
    if index.depth() < IN_MEMORY_DEPTH {
        return Err(StorageError::Unsupported(
            "Cannot get inner node from upper part of the tree".into(),
        ));
    }
    let root = Subtree::find_subtree_root(index);
    let node = match self.get_subtree(root)? {
        Some(subtree) => subtree.get_inner_node(index),
        None => None,
    };
    Ok(node)
}
/// Returns an iterator over all stored leaves as `(leaf index, leaf)` pairs, starting
/// from the first key of the leaves column family.
///
/// # Errors
/// Returns an error if the leaves column family is missing.
fn iter_leaves(
    &self,
) -> StorageResult<Box<dyn Iterator<Item = StorageResult<(u64, SmtLeaf)>> + '_>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let opts = {
        let mut opts = ReadOptions::default();
        opts.set_total_order_seek(true);
        opts
    };
    let iter = self.db.iterator_cf_opt(leaves, opts, IteratorMode::Start);
    Ok(Box::new(RocksDbDirectLeafIterator { iter }))
}
/// Returns an iterator over every stored subtree, visiting the subtree column families
/// from depth 16 up to depth 56.
///
/// # Errors
/// Returns an error if any subtree column family is missing.
fn iter_subtrees(
    &self,
) -> StorageResult<Box<dyn Iterator<Item = StorageResult<Subtree>> + '_>> {
    let cf_handles = [
        SUBTREE_16_CF,
        SUBTREE_24_CF,
        SUBTREE_32_CF,
        SUBTREE_40_CF,
        SUBTREE_48_CF,
        SUBTREE_56_CF,
    ]
    .into_iter()
    .map(|cf_name| self.cf_handle(cf_name))
    .collect::<StorageResult<Vec<_>>>()?;

    Ok(Box::new(RocksDbSubtreeIterator::new(&self.db, cf_handles)))
}
fn get_top_subtree_roots(&self) -> StorageResult<Vec<(u64, Word)>> {
let cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
let iter = self.db.iterator_cf(cf, IteratorMode::Start);
let mut hashes = Vec::new();
for item in iter {
let (key_bytes, value_bytes) = item?;
let index = index_from_key_bytes(&key_bytes)?;
let hash = Word::read_from_bytes_with_budget(&value_bytes, value_bytes.len())?;
hashes.push((index, hash));
}
Ok(hashes)
}
}
impl SmtStorage for RocksDbStorage {
/// Read-only view type handed out by [`Self::reader`].
type Reader = RocksDbSnapshotStorage;
/// Creates a reader backed by a new snapshot of the shared database.
fn reader(&self) -> StorageResult<Self::Reader> {
    let db = Arc::clone(&self.db);
    Ok(RocksDbSnapshotStorage::new(db))
}
/// Inserts `(key, value)` into the leaf at `index` and updates the persisted counters,
/// all within a single write batch.
///
/// Returns the value previously stored under `key`, if any.
///
/// # Errors
/// Returns an error if any database read/write or leaf deserialization fails.
///
/// # Panics
/// Panics if inserting into an existing leaf fails. In debug builds, also panics if
/// `value` is `EMPTY_WORD`.
fn insert_value(&mut self, index: u64, key: Word, value: Word) -> StorageResult<Option<Word>> {
    debug_assert_ne!(value, EMPTY_WORD);

    let mut batch = WriteBatch::default();
    let leaf_count = self.leaf_count()?;
    let entry_count = self.entry_count()?;
    let leaves_cf = self.cf_handle(LEAVES_CF)?;
    let db_key = Self::index_db_key(index);

    let (leaf, previous, new_leaf_count, new_entry_count) =
        if let Some(mut leaf) = self.get_leaf(index)? {
            let previous = leaf.insert(key, value).expect("Failed to insert value");
            // An entry is new when the key was absent or mapped to the empty word.
            let new_entry_count = match &previous {
                Some(old) if *old != EMPTY_WORD => entry_count,
                _ => entry_count + 1,
            };
            (leaf, previous, leaf_count, new_entry_count)
        } else {
            (SmtLeaf::Single((key, value)), None, leaf_count + 1, entry_count + 1)
        };
    batch.put_cf(leaves_cf, db_key, leaf.to_bytes());

    let metadata_cf = self.cf_handle(METADATA_CF)?;
    batch.put_cf(metadata_cf, LEAF_COUNT_KEY, new_leaf_count.to_be_bytes());
    batch.put_cf(metadata_cf, ENTRY_COUNT_KEY, new_entry_count.to_be_bytes());
    self.db.write(batch)?;
    Ok(previous)
}
/// Removes `key` from the leaf at `index` and updates the persisted counters, all
/// within a single write batch.
///
/// Returns the removed value, or `None` if there is no leaf at `index` or the leaf did
/// not contain `key`. A leaf that becomes empty is deleted from storage.
///
/// The counters are decremented with saturating arithmetic (as in `apply`). If the
/// persisted counts are already out of sync with the stored leaves, they bottom out at
/// zero instead of panicking (debug) or wrapping to `usize::MAX` (release).
///
/// # Errors
/// Returns an error if any database read/write or leaf deserialization fails.
fn remove_value(&mut self, index: u64, key: Word) -> StorageResult<Option<Word>> {
    let Some(mut leaf) = self.get_leaf(index)? else {
        return Ok(None);
    };

    let mut batch = WriteBatch::default();
    let cf = self.cf_handle(LEAVES_CF)?;
    let metadata_cf = self.cf_handle(METADATA_CF)?;
    let db_key = Self::index_db_key(index);
    let mut entry_count = self.entry_count()?;
    let mut leaf_count = self.leaf_count()?;

    let (current_value, is_empty) = leaf.remove(key);
    // Only a non-empty stored value counts as an entry.
    if current_value.is_some_and(|removed| removed != EMPTY_WORD) {
        entry_count = entry_count.saturating_sub(1);
    }
    if is_empty {
        leaf_count = leaf_count.saturating_sub(1);
        batch.delete_cf(cf, db_key);
    } else {
        batch.put_cf(cf, db_key, leaf.to_bytes());
    }

    batch.put_cf(metadata_cf, LEAF_COUNT_KEY, leaf_count.to_be_bytes());
    batch.put_cf(metadata_cf, ENTRY_COUNT_KEY, entry_count.to_be_bytes());
    self.db.write(batch)?;
    Ok(current_value)
}
fn set_leaves(&mut self, leaves: Map<u64, SmtLeaf>) -> StorageResult<()> {
let cf = self.cf_handle(LEAVES_CF)?;
let leaf_count: usize = leaves.len();
let entry_count: usize = leaves.values().map(|leaf| leaf.entries().len()).sum();
let mut batch = WriteBatch::default();
for (idx, leaf) in leaves {
let key = Self::index_db_key(idx);
let value = leaf.to_bytes();
batch.put_cf(cf, key, &value);
}
let metadata_cf = self.cf_handle(METADATA_CF)?;
batch.put_cf(metadata_cf, LEAF_COUNT_KEY, leaf_count.to_be_bytes());
batch.put_cf(metadata_cf, ENTRY_COUNT_KEY, entry_count.to_be_bytes());
self.db.write(batch)?;
Ok(())
}
/// Deletes the leaf stored at `index` and returns it, if it existed.
///
/// This method does not touch the persisted leaf/entry counters; callers that need
/// consistent counts must update them separately.
///
/// # Errors
/// Returns an error if the database read or delete fails, or if the stored bytes
/// cannot be deserialized into a leaf. Corrupt data is reported as an error, like on
/// every other read path, instead of panicking. The delete has already been issued by
/// the time a deserialization error is reported.
fn remove_leaf(&mut self, index: u64) -> StorageResult<Option<SmtLeaf>> {
    let key = Self::index_db_key(index);
    let cf = self.cf_handle(LEAVES_CF)?;
    let old_bytes = self.db.get_cf(cf, key)?;
    self.db.delete_cf(cf, key)?;
    let old_leaf = old_bytes
        .map(|bytes| SmtLeaf::read_from_bytes_with_budget(&bytes, bytes.len()))
        .transpose()?;
    Ok(old_leaf)
}
/// Stores `subtree` in the column family for its root depth. For a subtree rooted at
/// `IN_MEMORY_DEPTH`, the cached root hash is updated in the same write batch.
///
/// # Errors
/// Returns `StorageError::Unsupported` if a subtree rooted at `IN_MEMORY_DEPTH` has no
/// root node, or an error if the batch write fails.
///
/// # Panics
/// Panics if the subtree's root depth is not a supported subtree depth.
fn set_subtree(&mut self, subtree: &Subtree) -> StorageResult<()> {
    let root = subtree.root_index();
    let subtrees_cf = self.subtree_cf(root);

    let mut batch = WriteBatch::default();
    batch.put_cf(subtrees_cf, Self::subtree_db_key(root), subtree.to_vec());

    if root.depth() == IN_MEMORY_DEPTH {
        let Some(root_node) = subtree.get_inner_node(root) else {
            return Err(StorageError::Unsupported("Subtree root node not found".into()));
        };
        let root_hash = root_node.hash();
        let in_mem_depth_cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
        batch.put_cf(in_mem_depth_cf, Self::index_db_key(root.position()), root_hash.to_bytes());
    }

    self.db.write(batch).map_err(StorageError::from)
}
/// Stores all `subtrees` in a single write batch, updating the cached root hash for
/// every subtree rooted at `IN_MEMORY_DEPTH` that has a root node.
///
/// Unlike `set_subtree`, a missing root node at `IN_MEMORY_DEPTH` is skipped silently
/// rather than reported as an error.
///
/// # Errors
/// Returns an error if the root-hash column family is missing or the write fails.
///
/// # Panics
/// Panics if any subtree's root depth is not a supported subtree depth.
fn set_subtrees(&mut self, subtrees: Vec<Subtree>) -> StorageResult<()> {
    let in_mem_depth_cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
    let mut batch = WriteBatch::default();

    for subtree in subtrees {
        let root = subtree.root_index();
        let subtrees_cf = self.subtree_cf(root);
        batch.put_cf(subtrees_cf, Self::subtree_db_key(root), subtree.to_vec());

        if root.depth() != IN_MEMORY_DEPTH {
            continue;
        }
        if let Some(root_node) = subtree.get_inner_node(root) {
            let hash_key = Self::index_db_key(root.position());
            batch.put_cf(in_mem_depth_cf, hash_key, root_node.hash().to_bytes());
        }
    }

    self.db.write(batch)?;
    Ok(())
}
fn remove_subtree(&mut self, index: NodeIndex) -> StorageResult<()> {
let subtrees_cf = self.subtree_cf(index);
let mut batch = WriteBatch::default();
let key = Self::subtree_db_key(index);
batch.delete_cf(subtrees_cf, key);
if index.depth() == IN_MEMORY_DEPTH {
let in_mem_depth_cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
let hash_key = Self::index_db_key(index.position());
batch.delete_cf(in_mem_depth_cf, hash_key);
}
self.db.write(batch)?;
Ok(())
}
/// Inserts `node` at `index` into its containing subtree (creating the subtree if it
/// is not stored yet) and writes the subtree back.
///
/// Returns the node previously stored at `index`, if any.
///
/// # Errors
/// Returns `StorageError::Unsupported` if `index` lies above `IN_MEMORY_DEPTH`, or any
/// error produced while loading or storing the subtree.
fn set_inner_node(
    &mut self,
    index: NodeIndex,
    node: InnerNode,
) -> StorageResult<Option<InnerNode>> {
    if index.depth() < IN_MEMORY_DEPTH {
        return Err(StorageError::Unsupported(
            "Cannot set inner node in upper part of the tree".into(),
        ));
    }

    let root = Subtree::find_subtree_root(index);
    let mut subtree = match self.get_subtree(root)? {
        Some(existing) => existing,
        None => Subtree::new(root),
    };
    let previous = subtree.insert_inner_node(index, node);
    self.set_subtree(&subtree)?;
    Ok(previous)
}
/// Removes the inner node at `index` from its containing subtree.
///
/// If the subtree becomes empty it is deleted from storage; otherwise the updated
/// subtree is written back. Returns the removed node, or `None` if the subtree is not
/// stored or did not contain the node.
///
/// # Errors
/// Returns `StorageError::Unsupported` if `index` lies above `IN_MEMORY_DEPTH`, or any
/// error produced while loading, storing or deleting the subtree.
fn remove_inner_node(&mut self, index: NodeIndex) -> StorageResult<Option<InnerNode>> {
    if index.depth() < IN_MEMORY_DEPTH {
        return Err(StorageError::Unsupported(
            "Cannot remove inner node from upper part of the tree".into(),
        ));
    }

    let root = Subtree::find_subtree_root(index);
    let Some(mut subtree) = self.get_subtree(root)? else {
        return Ok(None);
    };

    let removed = subtree.remove_inner_node(index);
    if subtree.is_empty() {
        self.remove_subtree(root)?;
    } else {
        self.set_subtree(&subtree)?;
    }
    Ok(removed)
}
/// Applies a set of leaf, subtree and counter updates with a single batched write.
///
/// Leaf updates are puts (`Some`) or deletes (`None`). Subtree updates are stores or
/// deletes; for subtrees rooted at `IN_MEMORY_DEPTH` the cached root hash is updated or
/// removed in the same batch.
///
/// # Errors
/// Returns an error if a column family is missing, reading the current counters fails,
/// or the batch write fails.
///
/// # Panics
/// Panics if a subtree update targets an unsupported subtree depth.
fn apply(&mut self, updates: StorageUpdates) -> StorageResult<()> {
use p3_maybe_rayon::prelude::*;
let mut batch = WriteBatch::default();
let leaves_cf = self.cf_handle(LEAVES_CF)?;
let metadata_cf = self.cf_handle(METADATA_CF)?;
let in_mem_depth_cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
let StorageUpdateParts {
leaf_updates,
subtree_updates,
leaf_count_delta,
entry_count_delta,
} = updates.into_parts();
// Stage leaf puts and deletes.
for (index, maybe_leaf) in leaf_updates {
let key = Self::index_db_key(index);
match maybe_leaf {
Some(leaf) => batch.put_cf(leaves_cf, key, leaf.to_bytes()),
None => batch.delete_cf(leaves_cf, key),
}
}
let is_in_mem_depth = |index: NodeIndex| index.depth() == IN_MEMORY_DEPTH;
// Serialize subtrees (and derive the matching root-hash cache operation) ahead of
// staging; the batch itself is only mutated in the sequential loop below.
let subtree_ops: StorageResult<Vec<_>> = subtree_updates
.into_par_iter()
.map(|update| -> StorageResult<_> {
let (index, maybe_bytes, in_mem_depth_op) = match update {
SubtreeUpdate::Store { index, subtree } => {
let bytes = subtree.to_vec();
// Only a subtree rooted at `IN_MEMORY_DEPTH` that has a root node yields a
// root-hash put; otherwise no cache operation is produced.
let in_mem_depth_op = is_in_mem_depth(index)
.then(|| subtree.get_inner_node(index))
.flatten()
.map(|root_node| {
let hash_key = Self::index_db_key(index.position());
(hash_key, Some(root_node.hash().to_bytes()))
});
(index, Some(bytes), in_mem_depth_op)
},
SubtreeUpdate::Delete { index } => {
// Deleting a subtree rooted at `IN_MEMORY_DEPTH` also deletes its cached hash.
let in_mem_depth_op = is_in_mem_depth(index).then(|| {
let hash_key = Self::index_db_key(index.position());
(hash_key, None)
});
(index, None, in_mem_depth_op)
},
};
let key = Self::subtree_db_key(index);
let subtrees_cf = self.subtree_cf(index);
Ok((subtrees_cf, key, maybe_bytes, in_mem_depth_op))
})
.collect();
// Stage the subtree operations and their root-hash cache counterparts.
for (subtrees_cf, key, maybe_bytes, in_mem_depth_op) in subtree_ops? {
match maybe_bytes {
Some(bytes) => batch.put_cf(subtrees_cf, key, bytes),
None => batch.delete_cf(subtrees_cf, key),
}
if let Some((hash_key, maybe_hash_bytes)) = in_mem_depth_op {
match maybe_hash_bytes {
Some(hash_bytes) => batch.put_cf(in_mem_depth_cf, hash_key, hash_bytes),
None => batch.delete_cf(in_mem_depth_cf, hash_key),
}
}
}
// The counters are rewritten only when at least one delta is non-zero; the signed
// deltas are applied with saturating arithmetic.
if leaf_count_delta != 0 || entry_count_delta != 0 {
let current_leaf_count = self.leaf_count()?;
let current_entry_count = self.entry_count()?;
let new_leaf_count = current_leaf_count.saturating_add_signed(leaf_count_delta);
let new_entry_count = current_entry_count.saturating_add_signed(entry_count_delta);
batch.put_cf(metadata_cf, LEAF_COUNT_KEY, new_leaf_count.to_be_bytes());
batch.put_cf(metadata_cf, ENTRY_COUNT_KEY, new_entry_count.to_be_bytes());
}
// The batch is written with the `sync` write option explicitly disabled.
let mut write_opts = rocksdb::WriteOptions::default();
write_opts.set_sync(false);
self.db.write_opt(batch, &write_opts)?;
Ok(())
}
}
impl Drop for RocksDbStorage {
    /// Flushes all column families and the write-ahead log when the storage is dropped.
    ///
    /// # Panics
    /// Panics if the flush fails, unless the current thread is already unwinding from
    /// another panic. Panicking inside `drop` during unwinding would abort the whole
    /// process and hide the original panic, so in that case the flush error is dropped.
    fn drop(&mut self) {
        match self.sync() {
            Err(e) if !std::thread::panicking() => {
                panic!("failed to flush RocksDB on drop: {e}");
            },
            _ => {},
        }
    }
}
/// Iterator adapter that decodes raw key/value pairs from the leaves column family
/// into `(leaf index, leaf)` pairs.
struct RocksDbDirectLeafIterator<'a> {
// Underlying raw RocksDB iterator.
iter: DBIteratorWithThreadMode<'a, DB>,
}
impl Iterator for RocksDbDirectLeafIterator<'_> {
type Item = StorageResult<(u64, SmtLeaf)>;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|result| {
result.map_err(StorageError::from).and_then(|(key_bytes, value_bytes)| {
let leaf_idx = index_from_key_bytes(&key_bytes)?;
let leaf = SmtLeaf::read_from_bytes_with_budget(&value_bytes, value_bytes.len())?;
Ok((leaf_idx, leaf))
})
})
}
}
/// Iterator over all stored subtrees that walks the subtree column families one after
/// another.
struct RocksDbSubtreeIterator<'a> {
// Database the per-column-family iterators are created from.
db: &'a DB,
// Column families to visit, in visiting order.
cf_handles: Vec<&'a rocksdb::ColumnFamily>,
// Position in `cf_handles` of the column family currently being iterated.
current_cf_index: usize,
// Iterator over the current column family; `None` once every family is exhausted.
current_iter: Option<DBIteratorWithThreadMode<'a, DB>>,
}
impl<'a> RocksDbSubtreeIterator<'a> {
    /// Creates an iterator positioned at the start of the first column family.
    fn new(db: &'a DB, cf_handles: Vec<&'a rocksdb::ColumnFamily>) -> Self {
        let mut this = Self {
            db,
            cf_handles,
            current_cf_index: 0,
            current_iter: None,
        };
        this.advance_to_next_cf();
        this
    }

    /// Opens an iterator over the column family at `current_cf_index`, or clears the
    /// current iterator when every column family has been visited.
    fn advance_to_next_cf(&mut self) {
        let Some(&cf) = self.cf_handles.get(self.current_cf_index) else {
            self.current_iter = None;
            return;
        };
        let mut read_opts = ReadOptions::default();
        read_opts.set_total_order_seek(true);
        self.current_iter = Some(self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start));
    }

    /// Pulls the next raw entry from `iter` and decodes it into a subtree.
    ///
    /// The subtree depth is derived from `cf_index` as `IN_MEMORY_DEPTH + 8 * cf_index`.
    // NOTE(review): this assumes the column family at index 0 holds subtrees rooted at
    // `IN_MEMORY_DEPTH` and that depths grow by 8 per index -- confirm against the
    // order in which callers build `cf_handles`.
    fn next_from_iter(
        iter: &mut DBIteratorWithThreadMode<DB>,
        cf_index: usize,
    ) -> Option<StorageResult<Subtree>> {
        let (key_bytes, value_bytes) = match iter.next()? {
            Ok(entry) => entry,
            Err(err) => return Some(Err(StorageError::from(err))),
        };
        let depth = IN_MEMORY_DEPTH + (cf_index * 8) as u8;
        let decoded = subtree_root_from_key_bytes(&key_bytes, depth).and_then(|node_idx| {
            let raw = value_bytes.into_vec();
            Ok(Subtree::from_vec(node_idx, &raw)?)
        });
        Some(decoded)
    }
}
impl Iterator for RocksDbSubtreeIterator<'_> {
    type Item = StorageResult<Subtree>;

    /// Yields the next subtree, moving on to the next column family whenever the
    /// current one is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(iter) = self.current_iter.as_mut() {
            if let Some(item) = Self::next_from_iter(iter, self.current_cf_index) {
                return Some(item);
            }
            // Current column family is exhausted; try the next one.
            self.current_cf_index += 1;
            self.advance_to_next_cf();
        }
        None
    }
}
/// Configuration used to open a [`RocksDbStorage`].
#[derive(Debug, Clone)]
pub struct RocksDbConfig {
    /// Filesystem path of the database directory.
    pub(crate) path: PathBuf,
    /// Capacity, in bytes, of the shared LRU block cache.
    pub(crate) cache_size: usize,
    /// Value passed to RocksDB's `max_open_files` option.
    pub(crate) max_open_files: i32,
}

impl RocksDbConfig {
    /// Default block cache capacity (1 GiB).
    const DEFAULT_CACHE_SIZE: usize = 1 << 30;
    /// Default `max_open_files` setting.
    const DEFAULT_MAX_OPEN_FILES: i32 = 512;

    /// Creates a configuration for the database at `path` with default settings.
    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
        let path = path.into();
        Self {
            path,
            cache_size: Self::DEFAULT_CACHE_SIZE,
            max_open_files: Self::DEFAULT_MAX_OPEN_FILES,
        }
    }

    /// Returns the configuration with the block cache capacity set to `size` bytes.
    pub fn with_cache_size(self, size: usize) -> Self {
        Self { cache_size: size, ..self }
    }

    /// Returns the configuration with `max_open_files` set to `count`.
    pub fn with_max_open_files(self, count: i32) -> Self {
        Self { max_open_files: count, ..self }
    }
}
/// Fixed-capacity database key holding the low `len` big-endian bytes of a `u64`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub(crate) struct KeyBytes {
    /// Full 8-byte big-endian encoding of the value.
    bytes: [u8; 8],
    /// Number of trailing bytes of `bytes` that make up the key.
    len: u8,
}

impl KeyBytes {
    /// Creates a key from the low `keep` big-endian bytes of `value`.
    ///
    /// In debug builds, asserts that `keep` is within `2..=7` and that the discarded
    /// high-order bytes of `value` are all zero.
    #[inline(always)]
    pub fn new(value: u64, keep: usize) -> Self {
        debug_assert!((2..=7).contains(&keep));
        let encoded = value.to_be_bytes();
        debug_assert!(encoded.iter().take(8 - keep).all(|byte| *byte == 0));
        Self { bytes: encoded, len: keep as u8 }
    }

    /// Returns the significant trailing bytes of the key.
    #[inline(always)]
    pub fn as_slice(&self) -> &[u8] {
        let start = 8 - usize::from(self.len);
        &self.bytes[start..]
    }
}

impl AsRef<[u8]> for KeyBytes {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        self.as_slice()
    }
}
/// Decodes an 8-byte big-endian database key into an index.
///
/// # Errors
/// Returns `StorageError::BadKeyLen` if `key_bytes` is not exactly 8 bytes long.
fn index_from_key_bytes(key_bytes: &[u8]) -> StorageResult<u64> {
    match <[u8; 8]>::try_from(key_bytes) {
        Ok(raw) => Ok(u64::from_be_bytes(raw)),
        Err(_) => Err(StorageError::BadKeyLen { expected: 8, found: key_bytes.len() }),
    }
}
/// Decodes a persisted 8-byte big-endian counter.
///
/// `what` names the counter in the error reported for a malformed value.
///
/// # Errors
/// Returns `StorageError::BadValueLen` if `bytes` is not exactly 8 bytes long.
fn read_count(what: &'static str, bytes: &[u8]) -> StorageResult<usize> {
    match <[u8; 8]>::try_from(bytes) {
        Ok(raw) => Ok(usize::from_be_bytes(raw)),
        Err(_) => Err(StorageError::BadValueLen { what, expected: 8, found: bytes.len() }),
    }
}
/// Drains `iter`, decoding every entry into an `(index, hash)` pair.
///
/// # Errors
/// Returns the first iteration error, malformed key, or hash deserialization error.
fn collect_to_subtree_roots(
    iter: DBIteratorWithThreadMode<'_, DB>,
) -> StorageResult<Vec<(u64, Word)>> {
    iter.map(|entry| -> StorageResult<(u64, Word)> {
        let (key_bytes, value_bytes) = entry?;
        let position = index_from_key_bytes(&key_bytes)?;
        let root = Word::read_from_bytes_with_budget(&value_bytes, value_bytes.len())?;
        Ok((position, root))
    })
    .collect()
}
/// Reconstructs the root index of a subtree at `depth` from its database key.
///
/// The key must hold exactly `depth / 8` big-endian bytes of the root position.
///
/// # Errors
/// Returns `StorageError::Unsupported` for an unsupported depth and
/// `StorageError::BadSubtreeKeyLen` if the key length does not match the depth.
#[inline(always)]
fn subtree_root_from_key_bytes(key_bytes: &[u8], depth: u8) -> StorageResult<NodeIndex> {
    let expected = match depth {
        16 | 24 | 32 | 40 | 48 | 56 => usize::from(depth / 8),
        d => return Err(StorageError::Unsupported(format!("unsupported subtree depth {d}"))),
    };
    let found = key_bytes.len();
    if found != expected {
        return Err(StorageError::BadSubtreeKeyLen { depth, expected, found });
    }
    // At most 7 bytes are folded in, so the shifts cannot overflow the `u64`.
    let value = key_bytes.iter().fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte));
    Ok(NodeIndex::new_unchecked(depth, value))
}
/// Maps a subtree root depth to the name of the column family storing such subtrees.
///
/// # Panics
/// Panics if `depth` is not one of 16, 24, 32, 40, 48 or 56.
#[inline(always)]
fn cf_for_depth(depth: u8) -> &'static str {
    const CF_BY_DEPTH: [(u8, &str); 6] = [
        (16, SUBTREE_16_CF),
        (24, SUBTREE_24_CF),
        (32, SUBTREE_32_CF),
        (40, SUBTREE_40_CF),
        (48, SUBTREE_48_CF),
        (56, SUBTREE_56_CF),
    ];
    match CF_BY_DEPTH.iter().find(|(candidate, _)| *candidate == depth) {
        Some(&(_, cf_name)) => cf_name,
        None => panic!("unsupported subtree depth: {depth}"),
    }
}
impl From<rocksdb::Error> for StorageError {
fn from(e: rocksdb::Error) -> Self {
StorageError::Backend(Box::new(e))
}
}
/// Read-only SMT storage view backed by a RocksDB snapshot.
///
/// Clones share the same snapshot through the inner `Arc`.
#[derive(Clone)]
pub struct RocksDbSnapshotStorage {
// Shared snapshot together with the database handle it was taken from.
inner: Arc<RocksDbSnapshotInner>,
}
impl fmt::Debug for RocksDbSnapshotStorage {
    /// Formats the type name only; the snapshot internals are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("RocksDbSnapshotStorage");
        builder.finish_non_exhaustive()
    }
}
/// Owns a RocksDB snapshot together with the database handle it borrows from.
///
/// The snapshot's lifetime is erased to `'static` so it can be stored next to the
/// `Arc<DB>` that keeps the database alive. See the SAFETY notes below.
struct RocksDbSnapshotInner {
// Snapshot with an erased lifetime. Wrapped in `ManuallyDrop` so the `Drop` impl
// controls exactly when it is released.
snapshot: ManuallyDrop<rocksdb::Snapshot<'static>>,
// Keeps the database alive for as long as the snapshot exists.
db: Arc<DB>,
}
impl RocksDbSnapshotInner {
/// Takes a snapshot of `db` and stores it alongside the owning handle.
fn new(db: Arc<DB>) -> Self {
let snapshot = db.snapshot();
// SAFETY: the snapshot borrows the `DB` inside `db`. That `Arc<DB>` is moved into
// the returned struct, so the database stays alive (at a stable heap address) for
// the whole lifetime of the struct, and the `Drop` impl below releases the snapshot
// before the `db` field is dropped. The `'static` lifetime is therefore never
// observed beyond the life of the database.
// NOTE(review): this relies on the snapshot never being moved out of this struct --
// confirm that no accessor hands out an owned `Snapshot<'static>`.
let snapshot = unsafe {
core::mem::transmute::<rocksdb::Snapshot<'_>, rocksdb::Snapshot<'static>>(snapshot)
};
Self {
snapshot: ManuallyDrop::new(snapshot),
db,
}
}
}
impl Drop for RocksDbSnapshotInner {
fn drop(&mut self) {
// SAFETY: `snapshot` is dropped exactly once, here, and is not accessed afterwards.
// This runs before the struct's fields (including `db`) are dropped, so the
// database is still alive when the snapshot is released.
unsafe {
ManuallyDrop::drop(&mut self.snapshot);
}
}
}
impl RocksDbSnapshotStorage {
    /// Creates a reader over a fresh snapshot of `db`.
    pub fn new(db: Arc<DB>) -> Self {
        let inner = Arc::new(RocksDbSnapshotInner::new(db));
        Self { inner }
    }

    /// Looks up the handle of the column family called `name`.
    ///
    /// # Errors
    /// Returns `StorageError::Unsupported` if the database has no such column family.
    fn cf_handle(&self, name: &str) -> StorageResult<&rocksdb::ColumnFamily> {
        match self.inner.db.cf_handle(name) {
            Some(handle) => Ok(handle),
            None => Err(StorageError::Unsupported(format!("unknown column family `{name}`"))),
        }
    }

    /// Returns the column family that stores subtrees rooted at the depth of `index`.
    ///
    /// # Panics
    /// Panics if the depth is unsupported or the column family handle is missing.
    #[inline(always)]
    fn subtree_cf(&self, index: NodeIndex) -> &rocksdb::ColumnFamily {
        self.cf_handle(cf_for_depth(index.depth())).expect("CF handle missing")
    }
}
impl SmtStorageReader for RocksDbSnapshotStorage {
/// Returns the leaf count as of the snapshot, or 0 if none was stored.
///
/// # Errors
/// Returns an error if the metadata column family is missing, the read fails, or the
/// stored value is not exactly 8 bytes long.
fn leaf_count(&self) -> StorageResult<usize> {
    let metadata = self.cf_handle(METADATA_CF)?;
    match self.inner.snapshot.get_cf(metadata, LEAF_COUNT_KEY)? {
        Some(raw) => read_count("leaf count", &raw),
        None => Ok(0),
    }
}
/// Returns the entry count as of the snapshot, or 0 if none was stored.
///
/// # Errors
/// Returns an error if the metadata column family is missing, the read fails, or the
/// stored value is not exactly 8 bytes long.
fn entry_count(&self) -> StorageResult<usize> {
    let metadata = self.cf_handle(METADATA_CF)?;
    match self.inner.snapshot.get_cf(metadata, ENTRY_COUNT_KEY)? {
        Some(raw) => read_count("entry count", &raw),
        None => Ok(0),
    }
}
/// Reads and deserializes the leaf stored at `index` as of the snapshot, if any.
///
/// # Errors
/// Returns an error if the read or the leaf deserialization fails.
fn get_leaf(&self, index: u64) -> StorageResult<Option<SmtLeaf>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let db_key = RocksDbStorage::index_db_key(index);
    let Some(raw) = self.inner.snapshot.get_cf(leaves, db_key)? else {
        return Ok(None);
    };
    let leaf = SmtLeaf::read_from_bytes_with_budget(&raw, raw.len())?;
    Ok(Some(leaf))
}
/// Reads the leaves at `indices` from the snapshot with a single multi-get.
///
/// The output has one slot per requested index, in the same order; absent leaves are
/// `None`.
///
/// # Errors
/// Returns the first read or deserialization error encountered.
fn get_leaves(&self, indices: &[u64]) -> StorageResult<Vec<Option<SmtLeaf>>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let keys: Vec<[u8; 8]> =
        indices.iter().copied().map(RocksDbStorage::index_db_key).collect();
    let lookups =
        self.inner.snapshot.multi_get_cf(keys.iter().map(|key| (leaves, key.as_ref())));

    let mut decoded = Vec::with_capacity(keys.len());
    for lookup in lookups {
        let leaf = match lookup? {
            Some(raw) => Some(SmtLeaf::read_from_bytes_with_budget(&raw, raw.len())?),
            None => None,
        };
        decoded.push(leaf);
    }
    Ok(decoded)
}
/// Returns `true` if the leaf count as of the snapshot is non-zero.
fn has_leaves(&self) -> StorageResult<bool> {
    let count = self.leaf_count()?;
    Ok(count != 0)
}
/// Reads and deserializes the subtree rooted at `index` as of the snapshot, if stored.
///
/// # Errors
/// Returns an error if the read or the subtree deserialization fails.
///
/// # Panics
/// Panics if the depth of `index` is not a supported subtree depth.
fn get_subtree(&self, index: NodeIndex) -> StorageResult<Option<Subtree>> {
    let handle = self.subtree_cf(index);
    let db_key = RocksDbStorage::subtree_db_key(index);
    let Some(raw) = self.inner.snapshot.get_cf(handle, db_key)? else {
        return Ok(None);
    };
    Ok(Some(Subtree::from_vec(index, &raw)?))
}
/// Reads the subtrees rooted at `indices` from the snapshot, issuing one multi-get per
/// subtree depth.
///
/// The output has one slot per requested index, in the same order; subtrees that are
/// not stored are `None`.
///
/// # Errors
/// Returns `StorageError::Unsupported` if any index has an unsupported depth, or the
/// first read / deserialization error encountered.
fn get_subtrees(&self, indices: &[NodeIndex]) -> StorageResult<Vec<Option<Subtree>>> {
use p3_maybe_rayon::prelude::*;
// Group the requests by depth, remembering each request's position in `indices` so
// the results can be written back in the caller's order.
let mut depth_buckets: [Vec<(usize, NodeIndex)>; 6] = Default::default();
for (original_index, &node_index) in indices.iter().enumerate() {
let depth = node_index.depth();
let bucket_index = match depth {
56 => 0,
48 => 1,
40 => 2,
32 => 3,
24 => 4,
16 => 5,
_ => {
return Err(StorageError::Unsupported(format!(
"unsupported subtree depth {depth}"
)));
},
};
depth_buckets[bucket_index].push((original_index, node_index));
}
let mut results = vec![None; indices.len()];
// Each non-empty bucket is resolved against its own column family.
let bucket_results: StorageResult<Vec<_>> = depth_buckets
.into_par_iter()
.enumerate()
.filter(|(_, bucket)| !bucket.is_empty())
.map(|(bucket_index, bucket)| -> StorageResult<Vec<(usize, Option<Subtree>)>> {
// NOTE(review): assumes `SUBTREE_DEPTHS` is ordered [56, 48, 40, 32, 24, 16] so
// that it agrees with the bucket mapping above -- confirm.
let depth = LargeSmt::<RocksDbStorage>::SUBTREE_DEPTHS[bucket_index];
let cf = self.cf_handle(cf_for_depth(depth))?;
let keys: Vec<_> =
bucket.iter().map(|(_, idx)| RocksDbStorage::subtree_db_key(*idx)).collect();
let db_results =
self.inner.snapshot.multi_get_cf(keys.iter().map(|k| (cf, k.as_ref())));
// Pair every request with its lookup result and deserialize the hits.
bucket
.into_iter()
.zip(db_results)
.map(|((original_index, node_index), db_result)| {
let subtree = match db_result {
Ok(Some(bytes)) => Some(Subtree::from_vec(node_index, &bytes)?),
Ok(None) => None,
Err(e) => return Err(e.into()),
};
Ok((original_index, subtree))
})
.collect()
})
.collect();
// Scatter the per-bucket results back into request order.
for bucket_result in bucket_results? {
for (original_index, subtree) in bucket_result {
results[original_index] = subtree;
}
}
Ok(results)
}
/// Returns the inner node at `index` by loading its containing subtree from the
/// snapshot.
///
/// # Errors
/// Returns `StorageError::Unsupported` if `index` lies above `IN_MEMORY_DEPTH`, or any
/// error produced while loading the containing subtree.
fn get_inner_node(&self, index: NodeIndex) -> StorageResult<Option<InnerNode>> {
    if index.depth() < IN_MEMORY_DEPTH {
        return Err(StorageError::Unsupported(
            "Cannot get inner node from upper part of the tree".into(),
        ));
    }
    let root = Subtree::find_subtree_root(index);
    let node = match self.get_subtree(root)? {
        Some(subtree) => subtree.get_inner_node(index),
        None => None,
    };
    Ok(node)
}
/// Returns an iterator over all leaves visible in the snapshot as `(leaf index, leaf)`
/// pairs, starting from the first key of the leaves column family.
///
/// # Errors
/// Returns an error if the leaves column family is missing.
fn iter_leaves(
    &self,
) -> StorageResult<Box<dyn Iterator<Item = StorageResult<(u64, SmtLeaf)>> + '_>> {
    let leaves = self.cf_handle(LEAVES_CF)?;
    let opts = {
        let mut opts = ReadOptions::default();
        opts.set_total_order_seek(true);
        opts
    };
    let iter = self.inner.snapshot.iterator_cf_opt(leaves, opts, IteratorMode::Start);
    Ok(Box::new(RocksDbDirectLeafIterator { iter }))
}
/// Returns an iterator over every subtree visible in the snapshot, visiting the
/// subtree column families from depth 16 up to depth 56.
///
/// # Errors
/// Returns an error if any subtree column family is missing.
fn iter_subtrees(
    &self,
) -> StorageResult<Box<dyn Iterator<Item = StorageResult<Subtree>> + '_>> {
    let cf_handles = [
        SUBTREE_16_CF,
        SUBTREE_24_CF,
        SUBTREE_32_CF,
        SUBTREE_40_CF,
        SUBTREE_48_CF,
        SUBTREE_56_CF,
    ]
    .into_iter()
    .map(|cf_name| self.cf_handle(cf_name))
    .collect::<StorageResult<Vec<_>>>()?;

    Ok(Box::new(RocksDbSnapshotSubtreeIterator::new(&self.inner.snapshot, cf_handles)))
}
/// Returns the cached `(position, root hash)` pair of every subtree rooted at
/// `IN_MEMORY_DEPTH`, as of the snapshot and in key order.
///
/// # Errors
/// Returns an error if the column family is missing, iteration fails, a key is not
/// 8 bytes long, or a hash cannot be deserialized.
fn get_top_subtree_roots(&self) -> StorageResult<Vec<(u64, Word)>> {
    let roots_cf = self.cf_handle(IN_MEM_DEPTH_CF)?;
    collect_to_subtree_roots(self.inner.snapshot.iterator_cf(roots_cf, IteratorMode::Start))
}
}
/// Iterator over all subtrees visible in a snapshot that walks the subtree column
/// families one after another.
struct RocksDbSnapshotSubtreeIterator<'a> {
// Snapshot the per-column-family iterators are created from.
snapshot: &'a rocksdb::Snapshot<'static>,
// Column families to visit, in visiting order.
cf_handles: Vec<&'a rocksdb::ColumnFamily>,
// Position in `cf_handles` of the column family currently being iterated.
current_cf_index: usize,
// Iterator over the current column family; `None` once every family is exhausted.
current_iter: Option<DBIteratorWithThreadMode<'a, DB>>,
}
impl<'a> RocksDbSnapshotSubtreeIterator<'a> {
    /// Creates an iterator positioned at the start of the first column family.
    fn new(
        snapshot: &'a rocksdb::Snapshot<'static>,
        cf_handles: Vec<&'a rocksdb::ColumnFamily>,
    ) -> Self {
        let mut this = Self {
            snapshot,
            cf_handles,
            current_cf_index: 0,
            current_iter: None,
        };
        this.advance_to_next_cf();
        this
    }

    /// Opens an iterator over the column family at `current_cf_index`, or clears the
    /// current iterator when every column family has been visited.
    fn advance_to_next_cf(&mut self) {
        self.current_iter = match self.cf_handles.get(self.current_cf_index).copied() {
            Some(cf) => {
                let mut read_opts = ReadOptions::default();
                read_opts.set_total_order_seek(true);
                Some(self.snapshot.iterator_cf_opt(cf, read_opts, IteratorMode::Start))
            },
            None => None,
        };
    }
}
impl Iterator for RocksDbSnapshotSubtreeIterator<'_> {
    type Item = StorageResult<Subtree>;

    /// Yields the next subtree, moving on to the next column family whenever the
    /// current one is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(iter) = self.current_iter.as_mut() {
            let decoded = RocksDbSubtreeIterator::next_from_iter(iter, self.current_cf_index);
            if let Some(item) = decoded {
                return Some(item);
            }
            // Current column family is exhausted; try the next one.
            self.current_cf_index += 1;
            self.advance_to_next_cf();
        }
        None
    }
}