use crate::storage_engine::{DatabaseObject, Operations, StorageEngine};
use dragonfly_client_core::{
error::{ErrorType, OrErr},
Error, Result,
};
use rocksdb::WriteOptions;
use std::{
ops::Deref,
path::{Path, PathBuf},
};
use tracing::{info, warn};
/// RocksDB-backed storage engine for metadata; all operations delegate to the
/// wrapped database handle (exposed via `Deref`).
pub struct RocksdbStorageEngine {
    // Owned RocksDB database instance; closed when this struct is dropped.
    inner: rocksdb::DB,
}
/// Dereference to the inner `rocksdb::DB` so callers can use the raw RocksDB
/// API (`get_cf`, `iterator_cf`, `write_opt`, ...) directly on the engine.
impl Deref for RocksdbStorageEngine {
    type Target = rocksdb::DB;

    /// Returns a shared reference to the underlying database handle.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
impl RocksdbStorageEngine {
    /// Subdirectory (under the `dir` passed to `open`) holding the database files.
    const DEFAULT_DIR_NAME: &'static str = "metadata";

    /// Memory budget handed to `optimize_level_style_compaction` (512 MiB).
    const DEFAULT_MEMTABLE_MEMORY_BUDGET: usize = 512 * 1024 * 1024;

    /// Lower bound on the number of background flush/compaction jobs.
    const DEFAULT_MAX_BACKGROUND_JOBS: i32 = 2;

    /// Block size for the block-based table factory (64 KiB).
    const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;

    /// Capacity of the shared LRU block cache (1 GiB).
    const DEFAULT_CACHE_SIZE: usize = 1024 * 1024 * 1024;

    /// Maximum size of a single RocksDB info log file (64 MiB).
    const DEFAULT_LOG_MAX_SIZE: usize = 64 * 1024 * 1024;

    /// Number of rotated info log files to keep.
    const DEFAULT_LOG_MAX_FILES: usize = 10;

    /// Bytes written between incremental OS syncs of SST files (2 MiB).
    const DEFAULT_BYTES_PER_SYNC: u64 = 2 * 1024 * 1024;

    /// Opens (creating if necessary) the database at `dir/metadata` with the
    /// given column families.
    ///
    /// # Arguments
    /// * `dir` - parent directory; the database lives in `dir/metadata`.
    /// * `log_dir` - directory where RocksDB writes its info logs. Takes
    ///   `&Path` (was `&PathBuf`); existing callers passing `&PathBuf` keep
    ///   working via deref coercion.
    /// * `cf_names` - column families to create/open.
    /// * `keep` - when false, any existing database at the path is destroyed
    ///   first so the engine starts empty.
    ///
    /// # Errors
    /// Returns a `StorageError` when the database cannot be opened.
    pub fn open(dir: &Path, log_dir: &Path, cf_names: &[&str], keep: bool) -> Result<Self> {
        info!("initializing metadata directory: {:?} {:?}", dir, cf_names);

        // Database-wide options.
        let mut options = rocksdb::Options::default();
        options.create_if_missing(true);
        options.create_missing_column_families(true);
        options.set_compression_type(rocksdb::DBCompressionType::Lz4);
        options.set_bottommost_compression_type(rocksdb::DBCompressionType::Lz4);

        // Scale background work with the available cores, but never below the
        // configured minimum.
        options.increase_parallelism(num_cpus::get() as i32);
        options.set_max_background_jobs(std::cmp::max(
            num_cpus::get() as i32,
            Self::DEFAULT_MAX_BACKGROUND_JOBS,
        ));

        // Trade strict durability for throughput: no fsync per write, rely on
        // periodic `bytes_per_sync` syncing instead.
        options.set_use_fsync(false);
        options.set_bytes_per_sync(Self::DEFAULT_BYTES_PER_SYNC);

        // Info-log location and rotation policy.
        options.set_db_log_dir(log_dir);
        options.set_log_level(rocksdb::LogLevel::Info);
        options.set_max_log_file_size(Self::DEFAULT_LOG_MAX_SIZE);
        options.set_keep_log_file_num(Self::DEFAULT_LOG_MAX_FILES);

        // Block-based table factory with a shared LRU cache; index and filter
        // blocks are cached too, and L0's are pinned, to cut read amplification.
        let mut block_options = rocksdb::BlockBasedOptions::default();
        block_options.set_block_cache(&rocksdb::Cache::new_lru_cache(Self::DEFAULT_CACHE_SIZE));
        block_options.set_block_size(Self::DEFAULT_BLOCK_SIZE);
        block_options.set_cache_index_and_filter_blocks(true);
        block_options.set_pin_l0_filter_and_index_blocks_in_cache(true);
        options.set_block_based_table_factory(&block_options);

        // Per-column-family options: fixed 64-byte prefix extractor (so
        // `prefix_iterator_cf` works for 64-byte prefixes) plus a memtable
        // prefix bloom filter and level-style compaction tuning.
        let mut cf_options = rocksdb::Options::default();
        cf_options.set_prefix_extractor(rocksdb::SliceTransform::create_fixed_prefix(64));
        cf_options.set_memtable_prefix_bloom_ratio(0.25);
        cf_options.optimize_level_style_compaction(Self::DEFAULT_MEMTABLE_MEMORY_BUDGET);
        let cfs = cf_names
            .iter()
            .map(|name| (name.to_string(), cf_options.clone()))
            .collect::<Vec<_>>();

        let dir = dir.join(Self::DEFAULT_DIR_NAME);

        // Destroying is best-effort cleanup (e.g. the path may not exist yet);
        // log the failure and continue rather than aborting startup.
        if !keep {
            rocksdb::DB::destroy(&options, &dir).unwrap_or_else(|err| {
                warn!("destroy {:?} failed: {}", dir, err);
            });
        }

        let db =
            rocksdb::DB::open_cf_with_opts(&options, &dir, cfs).or_err(ErrorType::StorageError)?;
        Ok(Self { inner: db })
    }
}
impl Operations for RocksdbStorageEngine {
    /// Fetches the value stored under `key` in `O`'s column family and
    /// deserializes it, returning `None` when the key is absent.
    fn get<O: DatabaseObject>(&self, key: &[u8]) -> Result<Option<O>> {
        let cf = cf_handle::<O>(self)?;
        let value = self.get_cf(cf, key).or_err(ErrorType::StorageError)?;
        match value {
            Some(value) => Ok(Some(O::deserialize_from(&value)?)),
            None => Ok(None),
        }
    }

    /// Returns whether `key` exists in `O`'s column family.
    fn exists<O: DatabaseObject>(&self, key: &[u8]) -> Result<bool> {
        let cf = cf_handle::<O>(self)?;
        Ok(self
            .get_cf(cf, key)
            .or_err(ErrorType::StorageError)?
            .is_some())
    }

    /// Serializes `value` and writes it under `key` in `O`'s column family.
    ///
    /// Writes are not synced (`set_sync(false)`): durability relies on the
    /// engine-wide `bytes_per_sync` setting configured in `open`.
    fn put<O: DatabaseObject>(&self, key: &[u8], value: &O) -> Result<()> {
        let cf = cf_handle::<O>(self)?;
        // Use the imported `WriteOptions` for consistency with `delete` and
        // `batch_delete` (was the fully-qualified `rocksdb::WriteOptions`).
        let mut options = WriteOptions::default();
        options.set_sync(false);
        self.put_cf_opt(cf, key, value.serialized()?, &options)
            .or_err(ErrorType::StorageError)?;
        Ok(())
    }

    /// Deletes `key` from `O`'s column family.
    ///
    /// Unlike `put`, deletes are synced (`set_sync(true)`) — presumably so a
    /// crash cannot resurrect deleted metadata; confirm before relaxing.
    fn delete<O: DatabaseObject>(&self, key: &[u8]) -> Result<()> {
        let cf = cf_handle::<O>(self)?;
        let mut options = WriteOptions::default();
        options.set_sync(true);
        self.delete_cf_opt(cf, key, &options)
            .or_err(ErrorType::StorageError)?;
        Ok(())
    }

    /// Iterates over all `(key, value)` pairs in `O`'s column family,
    /// deserializing each value into an `O`.
    fn iter<O: DatabaseObject>(&self) -> Result<impl Iterator<Item = Result<(Box<[u8]>, O)>>> {
        let cf = cf_handle::<O>(self)?;
        let iter = self.iterator_cf(cf, rocksdb::IteratorMode::Start);
        Ok(iter.map(|ele| {
            let (key, value) = ele.or_err(ErrorType::StorageError)?;
            Ok((key, O::deserialize_from(&value)?))
        }))
    }

    /// Iterates over all `(key, value)` pairs in `O`'s column family without
    /// deserializing the values.
    fn iter_raw<O: DatabaseObject>(
        &self,
    ) -> Result<impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>>> {
        let cf = cf_handle::<O>(self)?;
        Ok(self
            .iterator_cf(cf, rocksdb::IteratorMode::Start)
            .map(|ele| {
                let (key, value) = ele.or_err(ErrorType::StorageError)?;
                Ok((key, value))
            }))
    }

    /// Iterates over `(key, value)` pairs whose key starts with `prefix`,
    /// deserializing each value. Relies on the fixed 64-byte prefix extractor
    /// configured in `open`, so callers are expected to use 64-byte prefixes.
    fn prefix_iter<O: DatabaseObject>(
        &self,
        prefix: &[u8],
    ) -> Result<impl Iterator<Item = Result<(Box<[u8]>, O)>>> {
        let cf = cf_handle::<O>(self)?;
        let iter = self.prefix_iterator_cf(cf, prefix);
        Ok(iter.map(|ele| {
            let (key, value) = ele.or_err(ErrorType::StorageError)?;
            Ok((key, O::deserialize_from(&value)?))
        }))
    }

    /// Same as `prefix_iter`, but yields the raw serialized values.
    fn prefix_iter_raw<O: DatabaseObject>(
        &self,
        prefix: &[u8],
    ) -> Result<impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>>> {
        let cf = cf_handle::<O>(self)?;
        Ok(self.prefix_iterator_cf(cf, prefix).map(|ele| {
            let (key, value) = ele.or_err(ErrorType::StorageError)?;
            Ok((key, value))
        }))
    }

    /// Deletes all `keys` in a single atomic, synced write batch.
    fn batch_delete<O: DatabaseObject>(&self, keys: Vec<&[u8]>) -> Result<()> {
        let cf = cf_handle::<O>(self)?;
        let mut batch = rocksdb::WriteBatch::default();
        for key in keys {
            batch.delete_cf(cf, key);
        }

        let mut options = WriteOptions::default();
        options.set_sync(true);
        // `…?; Ok(())` instead of the previous redundant `Ok(…?)`, matching
        // the error-propagation style of `put` and `delete`.
        self.write_opt(batch, &options)
            .or_err(ErrorType::StorageError)?;
        Ok(())
    }
}
// Marker impl: the `StorageEngine` trait adds no required items here, so the
// full engine contract is satisfied by the `Operations` impl above.
impl StorageEngine<'_> for RocksdbStorageEngine {}
fn cf_handle<T>(db: &rocksdb::DB) -> Result<&rocksdb::ColumnFamily>
where
T: DatabaseObject,
{
let cf_name = T::NAMESPACE;
db.cf_handle(cf_name)
.ok_or_else(|| Error::ColumnFamilyNotFound(cf_name.to_string()))
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use tempfile::{tempdir, TempDir};

    /// Minimal serializable record stored under the "object" column family.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
    struct Object {
        id: String,
        value: i32,
    }

    impl DatabaseObject for Object {
        const NAMESPACE: &'static str = "object";
    }

    /// Opens a fresh engine rooted in a new temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the engine because dropping
    /// the guard deletes the directory on disk. The previous version dropped
    /// the guard inside this function, deleting the database directory while
    /// RocksDB still had it open (tests only passed because the already-open
    /// file descriptors remained usable).
    fn create_test_engine() -> (RocksdbStorageEngine, TempDir) {
        let temp_dir = tempdir().unwrap();
        let log_dir = temp_dir.path().to_path_buf();
        let engine =
            RocksdbStorageEngine::open(temp_dir.path(), &log_dir, &[Object::NAMESPACE], false)
                .unwrap();
        (engine, temp_dir)
    }

    #[test]
    fn test_put_and_get() {
        let (engine, _temp_dir) = create_test_engine();
        let object = Object {
            id: "1".to_string(),
            value: 42,
        };

        engine.put::<Object>(object.id.as_bytes(), &object).unwrap();
        let retrieved_object = engine.get::<Object>(object.id.as_bytes()).unwrap().unwrap();
        assert_eq!(object, retrieved_object);
    }

    #[test]
    fn test_exists() {
        let (engine, _temp_dir) = create_test_engine();
        let object = Object {
            id: "2".to_string(),
            value: 100,
        };

        // Absent before the put, present after.
        assert!(!engine.exists::<Object>(object.id.as_bytes()).unwrap());
        engine.put::<Object>(object.id.as_bytes(), &object).unwrap();
        assert!(engine.exists::<Object>(object.id.as_bytes()).unwrap());
    }

    #[test]
    fn test_delete() {
        let (engine, _temp_dir) = create_test_engine();
        let object = Object {
            id: "3".to_string(),
            value: 200,
        };

        engine.put::<Object>(object.id.as_bytes(), &object).unwrap();
        assert!(engine.exists::<Object>(object.id.as_bytes()).unwrap());
        engine.delete::<Object>(object.id.as_bytes()).unwrap();
        assert!(!engine.exists::<Object>(object.id.as_bytes()).unwrap());
    }

    #[test]
    fn test_batch_delete() {
        let (engine, _temp_dir) = create_test_engine();
        let objects = vec![
            Object {
                id: "1".to_string(),
                value: 1,
            },
            Object {
                id: "2".to_string(),
                value: 2,
            },
            Object {
                id: "3".to_string(),
                value: 3,
            },
        ];

        for object in &objects {
            engine.put::<Object>(object.id.as_bytes(), object).unwrap();
            assert!(engine.exists::<Object>(object.id.as_bytes()).unwrap());
        }

        let ids: Vec<&[u8]> = objects.iter().map(|object| object.id.as_bytes()).collect();
        engine.batch_delete::<Object>(ids).unwrap();

        // All keys must be gone after the batch delete.
        for object in &objects {
            assert!(!engine.exists::<Object>(object.id.as_bytes()).unwrap());
        }
    }

    #[test]
    fn test_iter() {
        let (engine, _temp_dir) = create_test_engine();
        let objects = vec![
            Object {
                id: "1".to_string(),
                value: 10,
            },
            Object {
                id: "2".to_string(),
                value: 20,
            },
            Object {
                id: "3".to_string(),
                value: 30,
            },
        ];

        for object in &objects {
            engine.put::<Object>(object.id.as_bytes(), object).unwrap();
        }

        let retrieved_objects = engine
            .iter::<Object>()
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(retrieved_objects.len(), objects.len());

        // Iteration order is not asserted; only that every object is present.
        for object in &objects {
            let found = retrieved_objects
                .iter()
                .any(|(_, v)| v.id == object.id && v.value == object.value);
            assert!(found, "could not find object with id {:?}", object.id);
        }
    }

    #[test]
    fn test_prefix_iter() {
        let (engine, _temp_dir) = create_test_engine();

        // Keys share a 64-byte prefix to match the fixed-prefix extractor
        // configured in `RocksdbStorageEngine::open`.
        let prefix_a = [b'a'; 64];
        let prefix_b = [b'b'; 64];
        let key_a1 = [&prefix_a[..], b"_suffix1"].concat();
        let key_a2 = [&prefix_a[..], b"_suffix2"].concat();
        let key_b1 = [&prefix_b[..], b"_suffix1"].concat();
        let key_b2 = [&prefix_b[..], b"_suffix2"].concat();

        let objects_with_prefix_a = vec![
            (
                key_a1.clone(),
                Object {
                    id: "prefix_id_a1".to_string(),
                    value: 100,
                },
            ),
            (
                key_a2.clone(),
                Object {
                    id: "prefix_id_a2".to_string(),
                    value: 200,
                },
            ),
        ];
        let objects_with_prefix_b = vec![
            (
                key_b1.clone(),
                Object {
                    id: "prefix_id_b1".to_string(),
                    value: 300,
                },
            ),
            (
                key_b2.clone(),
                Object {
                    id: "prefix_id_b2".to_string(),
                    value: 400,
                },
            ),
        ];

        for (key, obj) in &objects_with_prefix_a {
            engine.put::<Object>(key, obj).unwrap();
        }
        for (key, obj) in &objects_with_prefix_b {
            engine.put::<Object>(key, obj).unwrap();
        }

        let retrieved_objects = engine
            .prefix_iter::<Object>(&prefix_a)
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(
            retrieved_objects.len(),
            objects_with_prefix_a.len(),
            "expected {} objects with prefix 'a', but got {}",
            objects_with_prefix_a.len(),
            retrieved_objects.len()
        );

        // Everything with prefix 'a' is returned; nothing with prefix 'b' is.
        for (key, object) in &objects_with_prefix_a {
            let found = retrieved_objects
                .iter()
                .any(|(_, v)| v.id == object.id && v.value == object.value);
            assert!(found, "could not find object with key {:?}", key);
        }
        for (key, object) in &objects_with_prefix_b {
            let found = retrieved_objects
                .iter()
                .any(|(_, v)| v.id == object.id && v.value == object.value);
            assert!(!found, "found object with different prefix: {:?}", key);
        }
    }

    #[test]
    fn test_iter_raw() {
        let (engine, _temp_dir) = create_test_engine();
        let objects = vec![
            Object {
                id: "1".to_string(),
                value: 10,
            },
            Object {
                id: "2".to_string(),
                value: 20,
            },
            Object {
                id: "3".to_string(),
                value: 30,
            },
        ];

        for object in &objects {
            engine.put::<Object>(object.id.as_bytes(), object).unwrap();
        }

        let retrieved_objects = engine
            .iter_raw::<Object>()
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(retrieved_objects.len(), objects.len());

        // Raw values must deserialize back to the stored objects.
        for object in &objects {
            let found = retrieved_objects
                .iter()
                .any(|(_, v)| match Object::deserialize_from(v) {
                    Ok(deserialized) => {
                        deserialized.id == object.id && deserialized.value == object.value
                    }
                    Err(_) => false,
                });
            assert!(
                found,
                "could not find or deserialize object with key {:?}",
                object.id
            );
        }
    }

    #[test]
    fn test_prefix_iter_raw() {
        let (engine, _temp_dir) = create_test_engine();

        // 64-byte prefixes again, to match the fixed-prefix extractor.
        let prefix_a = [b'a'; 64];
        let prefix_b = [b'b'; 64];
        let key_a1 = [&prefix_a[..], b"_raw_suffix1"].concat();
        let key_a2 = [&prefix_a[..], b"_raw_suffix2"].concat();
        let key_b1 = [&prefix_b[..], b"_raw_suffix1"].concat();
        let key_b2 = [&prefix_b[..], b"_raw_suffix2"].concat();

        let objects_with_prefix_a = vec![
            (
                key_a1.clone(),
                Object {
                    id: "raw_prefix_id_a1".to_string(),
                    value: 100,
                },
            ),
            (
                key_a2.clone(),
                Object {
                    id: "raw_prefix_id_a2".to_string(),
                    value: 200,
                },
            ),
        ];
        let objects_with_prefix_b = vec![
            (
                key_b1.clone(),
                Object {
                    id: "raw_prefix_id_b1".to_string(),
                    value: 300,
                },
            ),
            (
                key_b2.clone(),
                Object {
                    id: "raw_prefix_id_b2".to_string(),
                    value: 400,
                },
            ),
        ];

        for (key, obj) in &objects_with_prefix_a {
            engine.put::<Object>(key, obj).unwrap();
        }
        for (key, obj) in &objects_with_prefix_b {
            engine.put::<Object>(key, obj).unwrap();
        }

        let retrieved_objects = engine
            .prefix_iter_raw::<Object>(&prefix_a)
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .unwrap();
        assert_eq!(
            retrieved_objects.len(),
            objects_with_prefix_a.len(),
            "expected {} raw objects with prefix 'a', but got {}",
            objects_with_prefix_a.len(),
            retrieved_objects.len()
        );

        for (_, object) in &objects_with_prefix_a {
            let found = retrieved_objects
                .iter()
                .any(|(_, v)| match Object::deserialize_from(v) {
                    Ok(deserialized) => {
                        deserialized.id == object.id && deserialized.value == object.value
                    }
                    Err(_) => false,
                });
            assert!(
                found,
                "could not find or deserialize object with key {:?}",
                object.id
            );
        }
        for (key, _) in &objects_with_prefix_b {
            let found = retrieved_objects
                .iter()
                .any(|(k, _)| k.as_ref() == key.as_slice());
            assert!(!found, "found object with different prefix: {:?}", key);
        }
    }

    #[test]
    fn test_column_family_not_found() {
        let (engine, _temp_dir) = create_test_engine();

        // A type whose namespace was never passed to `open`, so its column
        // family does not exist in the database.
        #[derive(Debug, Serialize, Deserialize, PartialEq)]
        struct UnregisteredObject {
            data: String,
        }

        impl DatabaseObject for UnregisteredObject {
            const NAMESPACE: &'static str = "unregistered";
        }

        let key = b"unregistered";
        let result = engine.get::<UnregisteredObject>(key);
        assert!(result.is_err());
        if let Err(err) = result {
            assert!(format!("{:?}", err).contains("ColumnFamilyNotFound"));
        }
    }
}