use ave_actors_store::{
Error, StoreOperation,
config::{MachineSpec, resolve_spec},
database::{Collection, DbManager, State},
};
use rocksdb::{
BlockBasedOptions, Cache, ColumnFamilyDescriptor, DB, DBCompactionStyle,
DBCompressionType, DBIteratorWithThreadMode, Direction, IteratorMode,
LogLevel, Options, WriteOptions,
};
use tracing::{debug, error, info, warn};
use std::{
fs,
path::{Path, PathBuf},
sync::Arc,
};
/// Database manager backed by a single RocksDB instance.
///
/// Cloning the manager is cheap with respect to the database itself: the
/// `DB` handle lives behind an `Arc` and is shared by every clone and by
/// every store created from the manager.
#[derive(Clone)]
pub struct RocksDbManager {
/// Options the database was opened with; also used when creating new
/// column families and when listing column families on `stop`.
opts: Options,
/// Filesystem location of the database.
path: PathBuf,
/// Shared handle to the open database.
db: Arc<DB>,
/// Handed to every created store, which uses it as the `sync` flag of its
/// write options.
strong_durability: bool,
}
impl RocksDbManager {
pub fn new(
path: &PathBuf,
durability: bool,
spec: Option<MachineSpec>,
) -> Result<Self, Error> {
info!("Creating RocksDB database manager");
if !Path::new(&path).exists() {
debug!("Path does not exist, creating it");
fs::create_dir_all(path).map_err(|e| {
error!(path = %path.display(), error = %e, "Failed to create RocksDB directory");
Error::CreateStore {
reason: format!(
"fail RockDB create directory: {}",
e
),
}
})?;
}
let spec = resolve_spec(spec);
let (ram_mb, cores) = (spec.ram_mb, spec.cpu_cores);
info!("RocksDB tuning: ram_mb={}, cpu_cores={}", ram_mb, cores);
let strong_durability = durability;
let mut options = Options::default();
apply_common_tuning(&mut options);
apply_tuning(&mut options, ram_mb, cores);
let cfs = match DB::list_cf(&options, path) {
Ok(cf_names) => {
debug!(
count = cf_names.len(),
"Found existing column families"
);
cf_names
}
Err(_) => {
debug!("No existing column families, using default");
vec!["default".to_string()]
}
};
let cf_opts = options.clone();
let cf_descriptors: Vec<_> = cfs
.iter()
.map(|cf| ColumnFamilyDescriptor::new(cf, cf_opts.clone()))
.collect();
debug!(path = %path.display(), "Opening RocksDB database");
let db = DB::open_cf_descriptors(&options, path, cf_descriptors)
.map_err(|e| {
error!(path = %path.display(), error = %e, "Failed to open RocksDB");
Error::CreateStore { reason: format!("Can not open RockDB: {}", e) }
})?;
debug!("RocksDB database manager created successfully");
Ok(Self {
opts: options,
path: path.clone(),
db: Arc::new(db),
strong_durability,
})
}
}
/// Applies the settings that do not depend on the machine size.
fn apply_common_tuning(options: &mut Options) {
    options.create_if_missing(true);

    // Compaction: level style with dynamic level sizing and the level-0
    // file-count thresholds for compaction / write slowdown / write stop.
    options.set_compaction_style(DBCompactionStyle::Level);
    options.set_level_compaction_dynamic_level_bytes(true);
    options.set_level_zero_file_num_compaction_trigger(8);
    options.set_level_zero_slowdown_writes_trigger(20);
    options.set_level_zero_stop_writes_trigger(36);

    // Compression: LZ4 in general, Zstd for the bottommost level.
    options.set_compression_type(DBCompressionType::Lz4);
    options.set_bottommost_compression_type(DBCompressionType::Zstd);

    // Write path.
    options.set_enable_pipelined_write(true);
    options.set_bytes_per_sync(2 << 20); // 2 MiB
    options.set_wal_bytes_per_sync(512 << 10); // 512 KiB

    // Log files: level, size cap, retention and rolling.
    options.set_log_level(LogLevel::Warn);
    options.set_max_log_file_size(10 << 20); // 10 MiB
    options.set_keep_log_file_num(5);
    options.set_recycle_log_file_num(2);
    options.set_log_file_time_to_roll(3600); // 60 * 60
}
/// Applies the settings that scale with the machine (`ram_mb`, `cores`).
///
/// Parallelism is half the cores, kept within `1..=4`. The memory budget is
/// 5% of the RAM and is split 40% block cache, 40% write buffers and 20% WAL
/// size cap, each share being clamped to fixed lower and upper limits.
fn apply_tuning(options: &mut Options, ram_mb: u64, cores: usize) {
    const MIB: u64 = 1024 * 1024;

    let jobs = ((cores / 2) as i32).clamp(1, 4);
    options.increase_parallelism(jobs);
    options.set_max_background_jobs(jobs);

    let budget = ram_mb * 1024 * 1024 * 5 / 100;
    // Portion of the budget, expressed as a percentage.
    let share = |percent: u64| budget * percent / 100;

    let cache_bytes = share(40).clamp(4 * MIB, 512 * MIB);

    // More RAM allows more write buffers.
    let wb_count: u64 = if ram_mb <= 1024 {
        2
    } else if ram_mb <= 4096 {
        3
    } else if ram_mb <= 16384 {
        4
    } else {
        6
    };
    let wb_size = (share(40) / wb_count).clamp(4 * MIB, 256 * MIB);
    let wal_bytes = share(20).clamp(8 * MIB, 512 * MIB);
    let merge: i32 = if wb_count > 2 { 2 } else { 1 };

    options.set_write_buffer_size(wb_size as usize);
    options.set_max_write_buffer_number(wb_count as i32);
    options.set_min_write_buffer_number_to_merge(merge);
    options.set_target_file_size_base(wb_size);
    options.set_max_total_wal_size(wal_bytes);

    let mut table = BlockBasedOptions::default();
    table.set_bloom_filter(10.0, false);
    table.set_cache_index_and_filter_blocks(true);
    let block_cache = Cache::new_lru_cache(cache_bytes as usize);
    table.set_block_cache(&block_cache);
    options.set_block_based_table_factory(&table);
}
/// Builds the write options used for every mutation, with the `sync` flag
/// set as requested.
fn write_options(sync: bool) -> WriteOptions {
    let mut wopts = WriteOptions::default();
    wopts.set_sync(sync);
    wopts
}
impl RocksDbManager {
fn ensure_cf(&self, name: &str) -> Result<(), Error> {
if self.db.cf_handle(name).is_none() {
debug!(cf = name, "Creating column family");
self.db.create_cf(name, &self.opts).map_err(|e| {
error!(cf = name, error = %e, "Failed to create column family");
Error::CreateStore {
reason: format!("{:?}", e),
}
})?;
}
Ok(())
}
}
impl DbManager<RocksDbStore, RocksDbStore> for RocksDbManager {
fn create_collection(
&self,
name: &str,
prefix: &str,
) -> Result<RocksDbStore, Error> {
self.ensure_cf(name)?;
debug!(cf = name, prefix = prefix, "Collection created");
Ok(RocksDbStore {
name: name.to_owned(),
prefix: prefix.to_owned(),
store: self.db.clone(),
strong_durability: self.strong_durability,
})
}
fn create_state(
&self,
name: &str,
prefix: &str,
) -> Result<RocksDbStore, Error> {
self.ensure_cf(name)?;
debug!(cf = name, prefix = prefix, "State created");
Ok(RocksDbStore {
name: name.to_owned(),
prefix: prefix.to_owned(),
store: self.db.clone(),
strong_durability: self.strong_durability,
})
}
fn stop(&mut self) -> Result<(), Error> {
debug!("Stopping RocksDB manager, flushing memtables and WAL");
self.db.flush_wal(true).map_err(|e| {
error!(error = %e, "Failed to flush WAL on stop");
Error::Store {
operation: StoreOperation::FlushWal,
reason: format!("{:?}", e),
}
})?;
let cf_names =
DB::list_cf(&self.opts, &self.path).map_err(|e| Error::Store {
operation: StoreOperation::ListCf,
reason: format!("{:?}", e),
})?;
for name in &cf_names {
if let Some(handle) = self.db.cf_handle(name) {
if let Err(e) = self.db.flush_cf(&handle) {
warn!(cf = name, error = %e, "Failed to flush column family on stop");
}
}
}
debug!("RocksDB stop complete");
Ok(())
}
}
/// Handle onto one column family of the shared database, usable both as a
/// single-value `State` and as a keyed `Collection`.
pub struct RocksDbStore {
/// Name of the column family every operation is routed to.
name: String,
/// As a `State`, the key the value is stored under; as a `Collection`, the
/// namespace prepended to every key as `"{prefix}.{key}"`.
prefix: String,
/// Shared handle to the open database.
store: Arc<DB>,
/// Passed as the `sync` flag of the write options on every write.
strong_durability: bool,
}
impl State for RocksDbStore {
fn name(&self) -> &str {
&self.name
}
fn get(&self) -> Result<Vec<u8>, Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let result = self
.store
.get_cf(&handle, self.prefix.clone())
.map_err(|e| {
error!(cf = %self.name, error = %e, "Failed to get state");
Error::Get {
key: self.prefix.clone(),
reason: format!("{:?}", e),
}
})?;
match result {
Some(value) => Ok(value),
_ => Err(Error::EntryNotFound {
key: self.prefix.clone(),
}),
}
} else {
error!(cf = %self.name, "Column family not found for state get");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn put(&mut self, data: &[u8]) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let wopts = write_options(self.strong_durability);
Ok(self
.store
.put_cf_opt(&handle, self.prefix.clone(), data, &wopts)
.map_err(|e| {
error!(cf = %self.name, error = %e, "Failed to put state");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})?)
} else {
error!(cf = %self.name, "Column family not found for state put");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn del(&mut self) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let key = self.prefix.clone();
let exists = self
.store
.get_cf(&handle, &key)
.map_err(|e| {
error!(cf = %self.name, key = %key, error = %e, "Failed to check state before delete");
Error::Get {
key: key.clone(),
reason: format!("{:?}", e),
}
})?
.is_some();
if !exists {
return Err(Error::EntryNotFound { key });
}
let wopts = write_options(self.strong_durability);
Ok(self
.store
.delete_cf_opt(&handle, self.prefix.clone(), &wopts)
.map_err(|e| {
warn!(cf = %self.name, error = %e, "Failed to delete state");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})?)
} else {
error!(cf = %self.name, "Column family not found for state delete");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn purge(&mut self) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let wopts = write_options(self.strong_durability);
self.store
.delete_cf_opt(&handle, self.prefix.clone(), &wopts)
.map_err(|e| {
error!(cf = %self.name, error = %e, "Failed to purge state");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})
} else {
error!(cf = %self.name, "Column family not found for state purge");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
}
impl Collection for RocksDbStore {
fn last(&self) -> Result<Option<(String, Vec<u8>)>, Error> {
let mut iter = self.iter(true)?;
let value = iter.next().transpose()?;
debug!("Last value: {:?}", value);
Ok(value)
}
fn name(&self) -> &str {
&self.name
}
fn get(&self, key: &str) -> Result<Vec<u8>, Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let full_key = format!("{}.{}", self.prefix, key);
let result = self
.store
.get_cf(&handle, &full_key)
.map_err(|e| {
error!(cf = %self.name, key = %full_key, error = %e, "Failed to get collection entry");
Error::Get { key: full_key.clone(), reason: format!("{:?}", e) }
})?;
match result {
Some(value) => Ok(value),
_ => Err(Error::EntryNotFound { key: full_key }),
}
} else {
error!(cf = %self.name, "Column family not found for collection get");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn put(&mut self, key: &str, data: &[u8]) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let key = format!("{}.{}", self.prefix, key);
let wopts = write_options(self.strong_durability);
Ok(self
.store
.put_cf_opt(&handle, key, data, &wopts)
.map_err(|e| {
error!(cf = %self.name, error = %e, "Failed to put collection entry");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})?)
} else {
error!(cf = %self.name, "Column family not found for collection put");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn del(&mut self, key: &str) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let key = format!("{}.{}", self.prefix, key);
let exists = self
.store
.get_cf(&handle, &key)
.map_err(|e| {
error!(cf = %self.name, key = %key, error = %e, "Failed to check collection entry before delete");
Error::Get {
key: key.clone(),
reason: format!("{:?}", e),
}
})?
.is_some();
if !exists {
return Err(Error::EntryNotFound { key });
}
let wopts = write_options(self.strong_durability);
Ok(self
.store
.delete_cf_opt(&handle, key, &wopts)
.map_err(|e| {
warn!(cf = %self.name, error = %e, "Failed to delete collection entry");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})?)
} else {
error!(cf = %self.name, "Column family not found for collection delete");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn purge(&mut self) -> Result<(), Error> {
if let Some(handle) = self.store.cf_handle(&self.name) {
let wopts = write_options(self.strong_durability);
let start = format!("{}.", self.prefix).into_bytes();
let mut end = start.clone();
end.push(0xFF);
debug!(cf = %self.name, "Purging collection with range delete");
self.store
.delete_range_cf_opt(&handle, start, end, &wopts)
.map_err(|e| {
error!(cf = %self.name, error = %e, "Failed to purge collection");
Error::Store {
operation: StoreOperation::RocksdbOperation,
reason: format!("{:?}", e),
}
})
} else {
error!(cf = %self.name, "Column family not found for collection purge");
Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
})
}
}
fn iter<'a>(
&'a self,
reverse: bool,
) -> Result<
Box<dyn Iterator<Item = Result<(String, Vec<u8>), Error>> + 'a>,
Error,
> {
let Some(_handle) = self.store.cf_handle(&self.name) else {
error!(cf = %self.name, "Column family not found for collection iter");
return Err(Error::Store {
operation: StoreOperation::ColumnAccess,
reason: "RocksDB column for the store does not exist."
.to_owned(),
});
};
Ok(Box::new(RocksDbIterator::new(
&self.store,
self.name.clone(),
self.prefix.clone(),
reverse,
)))
}
}
/// Iterator over the entries of one collection, i.e. the keys of a column
/// family that start with `"{prefix}."`.
pub struct RocksDbIterator<'a> {
/// Bytes of `"{prefix}."`; used to recognise the collection's keys and
/// stripped from every key that is yielded.
prefix_dot: Vec<u8>,
/// Underlying RocksDB iterator, already positioned at the start (or, when
/// iterating in reverse, the end) of the prefix range.
iter: DBIteratorWithThreadMode<'a, DB>,
}
impl<'a> RocksDbIterator<'a> {
    /// Creates an iterator over the keys of column family `name` that start
    /// with `"{prefix}."`.
    ///
    /// Forward iteration starts at `"{prefix}."`; reverse iteration starts at
    /// `"{prefix}."` followed by the byte `0xFF` and walks backwards.
    ///
    /// # Panics
    ///
    /// Panics when the column family `name` does not exist.
    pub fn new(
        store: &'a Arc<DB>,
        name: String,
        prefix: String,
        reverse: bool,
    ) -> Self {
        let handle = store
            .cf_handle(&name)
            .expect("RocksDB column for the store does not exist.");

        let mut prefix_dot = prefix.into_bytes();
        prefix_dot.push(b'.');

        let iter = if reverse {
            let mut seek_key = prefix_dot.clone();
            seek_key.push(0xFF);
            store.iterator_cf(
                &handle,
                IteratorMode::From(&seek_key, Direction::Reverse),
            )
        } else {
            store.iterator_cf(
                &handle,
                IteratorMode::From(&prefix_dot, Direction::Forward),
            )
        };

        Self { prefix_dot, iter }
    }
}
impl Iterator for RocksDbIterator<'_> {
type Item = Result<(String, Vec<u8>), Error>;
fn next(&mut self) -> Option<Self::Item> {
for item in self.iter.by_ref() {
match item {
Ok((key, value)) => {
if !key.starts_with(&self.prefix_dot) {
return None;
}
let suffix = &key[self.prefix_dot.len()..];
let key_str = match String::from_utf8(suffix.to_vec()) {
Ok(key_str) => key_str,
Err(error) => {
return Some(Err(Error::Get {
key: String::from_utf8_lossy(&key).into_owned(),
reason: format!("{}", error),
}));
}
};
return Some(Ok((key_str, value.to_vec())));
}
Err(e) => {
error!(error = %e, "RocksDB iteration error");
return Some(Err(Error::Get {
key: String::from_utf8_lossy(&self.prefix_dot)
.into_owned(),
reason: format!("{}", e),
}));
}
}
}
None
}
}
#[cfg(test)]
mod tests {
// Test-only `Default`: builds a manager over a freshly created temporary
// directory so the test suite below can construct one without arguments.
impl Default for RocksDbManager {
fn default() -> Self {
let dir = tempfile::tempdir()
.expect("Can not create temporal directory.");
// NOTE(review): `keep()` presumably detaches the directory from the guard
// so it is not removed when `dir` goes out of scope — confirm against the
// tempfile documentation.
let path = dir.keep();
// `durability = false` and no explicit machine spec.
RocksDbManager::new(&path, false, None)
.expect("Can not create the database.")
}
}
use super::*;
use ave_actors_store::test_store_trait;
// NOTE(review): presumably expands to the shared store test suite run
// against `RocksDbManager` / `RocksDbStore` — confirm against the macro
// definition in `ave_actors_store`.
test_store_trait! {
unit_test_rocksdb_manager:crate::db::RocksDbManager:RocksDbStore
}
}