use crate::digest::ValueDigest;
use crate::node::ProllyNode;
use crate::storage::{NodeStorage, StorageError};
use lru::LruCache;
use rocksdb::{
BlockBasedOptions, Cache, DBCompressionType, Options, SliceTransform, WriteBatch, DB,
};
use std::num::NonZeroUsize;
const DEFAULT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap();
use parking_lot::Mutex;
use std::path::PathBuf;
use std::sync::Arc;
const CONFIG_PREFIX: &[u8] = b"config:";
const NODE_PREFIX: &[u8] = b"node:";
/// Persistent node storage backed by RocksDB with an in-memory LRU read cache.
///
/// Nodes are keyed under `NODE_PREFIX` + the node's content hash; tree
/// configuration blobs live under `CONFIG_PREFIX`. Both the DB handle and the
/// cache are behind `Arc` so handles can be shared across threads; the cache
/// is wrapped in a `parking_lot::Mutex` because `LruCache::get` mutates
/// recency state and therefore needs exclusive access even for reads.
#[derive(Debug)]
pub struct RocksDBNodeStorage<const N: usize> {
    // Shared RocksDB handle; `DB` is internally thread-safe.
    db: Arc<DB>,
    // LRU cache of deserialized nodes, keyed by content hash.
    cache: Arc<Mutex<LruCache<ValueDigest<N>, Arc<ProllyNode<N>>>>>,
}
impl<const N: usize> Clone for RocksDBNodeStorage<N> {
    /// Produces a cheap handle clone that shares both the database and the
    /// LRU cache.
    ///
    /// Sharing the cache (rather than allocating a fresh one) keeps clones
    /// consistent with the original handle: warm entries stay available, and
    /// a custom capacity chosen via `with_cache_size` is not silently reset
    /// to `DEFAULT_CACHE_SIZE`.
    fn clone(&self) -> Self {
        RocksDBNodeStorage {
            db: Arc::clone(&self.db),
            cache: Arc::clone(&self.cache),
        }
    }
}
impl<const N: usize> RocksDBNodeStorage<N> {
pub fn new(path: PathBuf) -> Result<Self, rocksdb::Error> {
let opts = Self::default_options();
let db = DB::open(&opts, path)?;
Ok(RocksDBNodeStorage {
db: Arc::new(db),
cache: Arc::new(Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE))),
})
}
pub fn with_options(path: PathBuf, opts: Options) -> Result<Self, rocksdb::Error> {
let db = DB::open(&opts, path)?;
Ok(RocksDBNodeStorage {
db: Arc::new(db),
cache: Arc::new(Mutex::new(LruCache::new(DEFAULT_CACHE_SIZE))),
})
}
pub fn with_cache_size(path: PathBuf, cache_size: usize) -> Result<Self, rocksdb::Error> {
let opts = Self::default_options();
let db = DB::open(&opts, path)?;
Ok(RocksDBNodeStorage {
db: Arc::new(db),
cache: Arc::new(Mutex::new(LruCache::new(
NonZeroUsize::new(cache_size).unwrap_or(DEFAULT_CACHE_SIZE),
))),
})
}
pub fn default_options() -> Options {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_write_buffer_size(128 * 1024 * 1024); opts.set_max_write_buffer_number(4);
opts.set_min_write_buffer_number_to_merge(2);
opts.set_compression_type(DBCompressionType::Lz4);
opts.set_bottommost_compression_type(DBCompressionType::Zstd);
let mut block_opts = BlockBasedOptions::default();
block_opts.set_bloom_filter(10.0, false);
let cache = Cache::new_lru_cache(512 * 1024 * 1024); block_opts.set_block_cache(&cache);
opts.set_block_based_table_factory(&block_opts);
let prefix_len = NODE_PREFIX.len() + N;
opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(prefix_len));
opts
}
fn node_key(hash: &ValueDigest<N>) -> Vec<u8> {
let mut key = Vec::with_capacity(NODE_PREFIX.len() + N);
key.extend_from_slice(NODE_PREFIX);
key.extend_from_slice(&hash.0);
key
}
fn config_key(key: &str) -> Vec<u8> {
let mut result = Vec::with_capacity(CONFIG_PREFIX.len() + key.len());
result.extend_from_slice(CONFIG_PREFIX);
result.extend_from_slice(key.as_bytes());
result
}
}
impl<const N: usize> NodeStorage<N> for RocksDBNodeStorage<N> {
    /// Fetches a node by hash, checking the LRU cache before hitting RocksDB.
    ///
    /// Returns `None` on a cache+DB miss, and also on DB read errors or
    /// deserialization failures (errors are deliberately swallowed to match
    /// the trait's `Option` contract).
    fn get_node_by_hash(&self, hash: &ValueDigest<N>) -> Option<Arc<ProllyNode<N>>> {
        if let Some(node) = self.cache.lock().get(hash) {
            return Some(Arc::clone(node));
        }
        let key = Self::node_key(hash);
        match self.db.get(&key) {
            Ok(Some(data)) => match bincode::deserialize::<ProllyNode<N>>(&data) {
                Ok(node) => {
                    let node = Arc::new(node);
                    // Warm the cache so subsequent reads skip the DB.
                    self.cache.lock().put(hash.clone(), Arc::clone(&node));
                    Some(node)
                }
                Err(_) => None,
            },
            _ => None,
        }
    }

    /// Serializes and persists `node`, then caches it.
    ///
    /// The node is cached only AFTER the DB write succeeds, so the cache can
    /// never serve a node that is not actually in the database. Taking both
    /// arguments by value also avoids the clones the cache-first order
    /// required.
    fn insert_node(
        &mut self,
        hash: ValueDigest<N>,
        node: ProllyNode<N>,
    ) -> Result<(), StorageError> {
        let data = bincode::serialize(&node)?;
        let key = Self::node_key(&hash);
        self.db
            .put(&key, data)
            .map_err(|e| StorageError::Other(e.to_string()))?;
        self.cache.lock().put(hash, Arc::new(node));
        Ok(())
    }

    /// Removes a node from the cache and the database.
    ///
    /// The cache entry is dropped even if the DB delete fails; that is safe —
    /// the worst case is a re-read from the DB.
    fn delete_node(&mut self, hash: &ValueDigest<N>) -> Result<(), StorageError> {
        self.cache.lock().pop(hash);
        let key = Self::node_key(hash);
        self.db
            .delete(&key)
            .map_err(|e| StorageError::Other(e.to_string()))
    }

    /// Best-effort persistence of a config blob; the trait signature returns
    /// `()`, so a DB write error is intentionally discarded.
    fn save_config(&self, key: &str, config: &[u8]) {
        let db_key = Self::config_key(key);
        let _ = self.db.put(&db_key, config);
    }

    /// Reads a config blob; `None` covers both "missing" and read errors.
    fn get_config(&self, key: &str) -> Option<Vec<u8>> {
        let db_key = Self::config_key(key);
        self.db.get(&db_key).ok().flatten()
    }
}
impl<const N: usize> RocksDBNodeStorage<N> {
    /// Atomically writes many nodes in a single RocksDB `WriteBatch`.
    ///
    /// Nodes that fail to serialize are skipped (best-effort, as before), but
    /// now they are neither written NOR cached — previously a node that
    /// failed serialization was still inserted into the cache, so reads could
    /// return data that was never persisted. The cache is populated only
    /// after the batch write succeeds, for the same reason.
    pub fn batch_insert_nodes(
        &mut self,
        nodes: Vec<(ValueDigest<N>, ProllyNode<N>)>,
    ) -> Result<(), rocksdb::Error> {
        let mut batch = WriteBatch::default();
        let mut to_cache = Vec::with_capacity(nodes.len());
        for (hash, node) in nodes {
            match bincode::serialize(&node) {
                Ok(data) => {
                    batch.put(&Self::node_key(&hash), data);
                    to_cache.push((hash, Arc::new(node)));
                }
                // Unserializable node: drop it entirely (not written, not cached).
                Err(_) => continue,
            }
        }
        self.db.write(batch)?;
        let mut cache = self.cache.lock();
        for (hash, node) in to_cache {
            cache.put(hash, node);
        }
        Ok(())
    }

    /// Atomically deletes many nodes in a single `WriteBatch`.
    ///
    /// Cache entries are evicted up front; even if the batch write fails,
    /// a cold cache entry only costs a DB re-read.
    pub fn batch_delete_nodes(&mut self, hashes: &[ValueDigest<N>]) -> Result<(), rocksdb::Error> {
        let mut batch = WriteBatch::default();
        let mut cache = self.cache.lock();
        for hash in hashes {
            cache.pop(hash);
            batch.delete(&Self::node_key(hash));
        }
        // Release the cache lock before the (potentially slow) DB write.
        drop(cache);
        self.db.write(batch)
    }

    /// Forces RocksDB to flush memtables to SST files on disk.
    pub fn flush(&self) -> Result<(), rocksdb::Error> {
        self.db.flush()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::TreeConfig;
    use tempfile::TempDir;

    /// Builds a minimal two-entry leaf node from the default tree config.
    fn create_test_node<const N: usize>() -> ProllyNode<N> {
        let config: TreeConfig<N> = TreeConfig::default();
        ProllyNode {
            keys: vec![b"key1".to_vec(), b"key2".to_vec()],
            key_schema: config.key_schema.clone(),
            values: vec![b"value1".to_vec(), b"value2".to_vec()],
            value_schema: config.value_schema.clone(),
            is_leaf: true,
            level: 0,
            base: config.base,
            modulus: config.modulus,
            min_chunk_size: config.min_chunk_size,
            max_chunk_size: config.max_chunk_size,
            pattern: config.pattern,
            split: false,
            merged: false,
            encode_types: Vec::new(),
            encode_values: Vec::new(),
        }
    }

    /// Round-trips a single node: insert, read back, delete, confirm gone.
    #[test]
    fn test_rocksdb_basic_operations() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = RocksDBNodeStorage::<32>::new(temp_dir.path().to_path_buf()).unwrap();
        let node = create_test_node();
        let hash = node.get_hash();
        storage.insert_node(hash.clone(), node.clone()).unwrap();
        let retrieved = storage.get_node_by_hash(&hash);
        assert!(retrieved.is_some());
        let retrieved_node = retrieved.unwrap();
        assert_eq!(retrieved_node.keys, node.keys);
        assert_eq!(retrieved_node.values, node.values);
        assert_eq!(retrieved_node.is_leaf, node.is_leaf);
        storage.delete_node(&hash).unwrap();
        assert!(storage.get_node_by_hash(&hash).is_none());
    }

    /// Saves and reads a config blob; missing keys return `None`.
    #[test]
    fn test_config_operations() {
        let temp_dir = TempDir::new().unwrap();
        let storage = RocksDBNodeStorage::<32>::new(temp_dir.path().to_path_buf()).unwrap();
        let config_data = b"test config data";
        storage.save_config("test_key", config_data);
        let retrieved = storage.get_config("test_key");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap(), config_data);
        assert!(storage.get_config("non_existent").is_none());
    }

    /// Batch-inserts ten distinct nodes, verifies all readable, then
    /// batch-deletes and verifies all gone.
    #[test]
    fn test_batch_operations() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage = RocksDBNodeStorage::<32>::new(temp_dir.path().to_path_buf()).unwrap();
        let mut nodes = Vec::new();
        for i in 0..10 {
            let mut node = create_test_node();
            node.keys[0] = format!("key{}", i).into_bytes();
            let hash = node.get_hash();
            nodes.push((hash, node));
        }
        let hashes: Vec<_> = nodes.iter().map(|(h, _)| h.clone()).collect();
        assert!(storage.batch_insert_nodes(nodes.clone()).is_ok());
        for (hash, _) in &nodes {
            assert!(storage.get_node_by_hash(hash).is_some());
        }
        assert!(storage.batch_delete_nodes(&hashes).is_ok());
        for hash in &hashes {
            assert!(storage.get_node_by_hash(hash).is_none());
        }
    }

    /// Smoke test for the cache path with a tiny (capacity 2) cache.
    #[test]
    fn test_cache_functionality() {
        let temp_dir = TempDir::new().unwrap();
        let mut storage =
            RocksDBNodeStorage::<32>::with_cache_size(temp_dir.path().to_path_buf(), 2).unwrap();
        let node1 = create_test_node();
        let hash1 = node1.get_hash();
        // Previously the `Result` was silently dropped; a failed insert would
        // have made the assertion below meaningless.
        storage.insert_node(hash1.clone(), node1.clone()).unwrap();
        assert!(storage.get_node_by_hash(&hash1).is_some());
    }
}