use std::panic::{AssertUnwindSafe, catch_unwind};
use std::path::Path;
use std::ptr::{NonNull, null_mut};
use std::slice;
use std::sync::Arc;
use libc::{self, c_char, c_double, c_int, c_uchar, c_uint, c_void, size_t};
use crate::cache::Cache;
use crate::column_family::ColumnFamilyTtl;
use crate::event_listener::{EventListener, new_event_listener};
use crate::ffi_util::from_cstr_and_free;
use crate::sst_file_manager::SstFileManager;
use crate::statistics::{Histogram, HistogramData, StatsLevel};
use crate::write_buffer_manager::WriteBufferManager;
use crate::{
ColumnFamilyDescriptor, Error, SnapshotWithThreadMode,
compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn},
compaction_filter_factory::{self, CompactionFilterFactory},
comparator::{
ComparatorCallback, ComparatorWithTsCallback, CompareFn, CompareTsFn, CompareWithoutTsFn,
},
db::DBAccess,
env::Env,
ffi,
ffi_util::{CStrLike, to_cpath},
merge_operator::{
self, MergeFn, MergeOperatorCallback, full_merge_callback, partial_merge_callback,
},
slice_transform::SliceTransform,
statistics::Ticker,
};
/// Signature of a user-supplied info-log callback: receives the severity and
/// the formatted log line.
type LogCallbackFn = dyn Fn(LogLevel, &str) + 'static + Send + Sync;
/// Boxed logger callback stored behind an `Arc` in `OptionsMustOutliveDB`.
// NOTE(review): same trait object as `LogCallbackFn` modulo bound order;
// presumably kept separate for historical/API reasons — consider unifying.
type LoggerCallback = Box<dyn Fn(LogLevel, &str) + Sync + Send>;
/// Owner of a log callback handed to RocksDB; kept alive (via
/// `OptionsMustOutliveDB::log_callback`) for as long as the options — and any
/// DB created from them — may invoke it.
struct LogCallback {
    // Heap-allocated callback target; invoked from C-side logging (the
    // registration site is not visible in this chunk).
    callback: Box<LogCallbackFn>,
}
/// Rust-side resources that the C `rocksdb_options_t` (and any DB opened with
/// it) holds raw pointers into. An `Options` carries one of these alongside
/// its raw pointer so the referenced objects cannot be dropped while the C
/// side may still dereference them.
#[derive(Default)]
pub(crate) struct OptionsMustOutliveDB {
    env: Option<Env>,
    row_cache: Option<Cache>,
    blob_cache: Option<Cache>,
    // Nested: block-based table options have their own outlive set
    // (currently just the block cache).
    block_based: Option<BlockBasedOptionsMustOutliveDB>,
    write_buffer_manager: Option<WriteBufferManager>,
    sst_file_manager: Option<SstFileManager>,
    log_callback: Option<Arc<LogCallback>>,
    // `Arc`-wrapped so cloned `Options` share ownership of the C callback
    // objects, which are destroyed exactly once (see the `Owned*` Drop impls).
    comparator: Option<Arc<OwnedComparator>>,
    compaction_filter: Option<Arc<OwnedCompactionFilter>>,
    logger_callback: Option<Arc<LoggerCallback>>,
}
impl OptionsMustOutliveDB {
pub(crate) fn clone(&self) -> Self {
Self {
env: self.env.clone(),
row_cache: self.row_cache.clone(),
blob_cache: self.blob_cache.clone(),
block_based: self
.block_based
.as_ref()
.map(BlockBasedOptionsMustOutliveDB::clone),
write_buffer_manager: self.write_buffer_manager.clone(),
sst_file_manager: self.sst_file_manager.clone(),
log_callback: self.log_callback.clone(),
comparator: self.comparator.clone(),
compaction_filter: self.compaction_filter.clone(),
logger_callback: self.logger_callback.clone(),
}
}
}
/// Owns a C `rocksdb_comparator_t` handle and destroys it on drop.
///
/// Stored behind an `Arc` in `OptionsMustOutliveDB` so the comparator object
/// outlives every `Options` (and DB) that references it.
struct OwnedComparator {
    // Non-null by construction; see `new`.
    inner: NonNull<ffi::rocksdb_comparator_t>,
}
impl OwnedComparator {
    /// Takes ownership of a non-null comparator handle; the handle is freed
    /// exactly once when this owner is dropped.
    fn new(inner: NonNull<ffi::rocksdb_comparator_t>) -> Self {
        Self { inner }
    }
}
impl Drop for OwnedComparator {
    fn drop(&mut self) {
        // SAFETY: `inner` is a valid handle exclusively owned by `self`
        // (created via `ffi::rocksdb_comparator_create` at the setter sites)
        // and is destroyed exactly once here.
        unsafe {
            ffi::rocksdb_comparator_destroy(self.inner.as_ptr());
        }
    }
}
/// Owns a C `rocksdb_compactionfilter_t` handle and destroys it on drop.
///
/// Stored behind an `Arc` in `OptionsMustOutliveDB` so the filter object
/// outlives every `Options` (and DB) that references it.
struct OwnedCompactionFilter {
    // Non-null by construction; see `new`.
    inner: NonNull<ffi::rocksdb_compactionfilter_t>,
}
impl OwnedCompactionFilter {
    /// Takes ownership of a non-null compaction-filter handle; freed exactly
    /// once when this owner is dropped.
    fn new(inner: NonNull<ffi::rocksdb_compactionfilter_t>) -> Self {
        Self { inner }
    }
}
impl Drop for OwnedCompactionFilter {
    fn drop(&mut self) {
        // SAFETY: `inner` is a valid, exclusively-owned handle created via
        // `ffi::rocksdb_compactionfilter_create`; destroyed exactly once here.
        unsafe {
            ffi::rocksdb_compactionfilter_destroy(self.inner.as_ptr());
        }
    }
}
/// Rust-side resources referenced by a C `rocksdb_block_based_table_options_t`
/// that must stay alive as long as the table options (and any DB using them).
#[derive(Default)]
struct BlockBasedOptionsMustOutliveDB {
    block_cache: Option<Cache>,
}
impl BlockBasedOptionsMustOutliveDB {
    /// Inherent clone (the type deliberately does not implement `Clone`);
    /// the shared `Cache` is refcounted, so this is cheap.
    fn clone(&self) -> Self {
        Self {
            block_cache: self.block_cache.clone(),
        }
    }
}
/// Database-wide options controlling behavior and performance.
///
/// Owns a raw `rocksdb_options_t`; `outlive` pins every Rust-side resource
/// (env, caches, callbacks, …) the C object may point into.
pub struct Options {
    pub(crate) inner: *mut ffi::rocksdb_options_t,
    pub(crate) outlive: OptionsMustOutliveDB,
}
/// Options that apply to a single write batch/operation.
pub struct WriteOptions {
    pub(crate) inner: *mut ffi::rocksdb_writeoptions_t,
}
/// Configuration for constructing an LRU block cache.
pub struct LruCacheOptions {
    pub(crate) inner: *mut ffi::rocksdb_lru_cache_options_t,
}
/// Options that apply to a manual memtable flush.
pub struct FlushOptions {
    pub(crate) inner: *mut ffi::rocksdb_flushoptions_t,
}
/// Options for the block-based table (SST) format.
pub struct BlockBasedOptions {
    pub(crate) inner: *mut ffi::rocksdb_block_based_table_options_t,
    // Keeps the block cache alive while the C object references it.
    outlive: BlockBasedOptionsMustOutliveDB,
}
/// Options that apply to a single read operation / iterator.
pub struct ReadOptions {
    pub(crate) inner: *mut ffi::rocksdb_readoptions_t,
    // Owned byte buffers for timestamp/bound settings.
    // NOTE(review): presumably the C object borrows these buffers, which is
    // why they are stored here — confirm at the setter sites (not visible in
    // this chunk).
    timestamp: Option<Vec<u8>>,
    iter_start_ts: Option<Vec<u8>>,
    iterate_upper_bound: Option<Vec<u8>>,
    iterate_lower_bound: Option<Vec<u8>>,
}
/// Options for the cuckoo-hash table (SST) format.
pub struct CuckooTableOptions {
    pub(crate) inner: *mut ffi::rocksdb_cuckoo_table_options_t,
}
/// Options controlling external SST-file ingestion.
pub struct IngestExternalFileOptions {
    pub(crate) inner: *mut ffi::rocksdb_ingestexternalfileoptions_t,
}
// SAFETY: each of these types is a thin owner of a heap-allocated RocksDB C
// object reached only through its raw pointer; moving the owner between
// threads moves nothing but the pointer, and the RocksDB C API does not pin
// option objects to the creating thread, so `Send` is sound.
unsafe impl Send for Options {}
unsafe impl Send for WriteOptions {}
unsafe impl Send for LruCacheOptions {}
unsafe impl Send for FlushOptions {}
unsafe impl Send for BlockBasedOptions {}
unsafe impl Send for CuckooTableOptions {}
unsafe impl Send for ReadOptions {}
unsafe impl Send for IngestExternalFileOptions {}
unsafe impl Send for CompactOptions {}
unsafe impl Send for ImportColumnFamilyOptions {}
unsafe impl Send for OwnedComparator {}
unsafe impl Send for OwnedCompactionFilter {}
// SAFETY: all mutation of the underlying C objects goes through `&mut self`
// in the Rust wrappers, so shared `&self` access performs no concurrent
// mutation; `Sync` is therefore sound for these owners.
// NOTE(review): this relies on no `&self` method mutating the C object —
// keep that invariant when adding methods.
unsafe impl Sync for Options {}
unsafe impl Sync for WriteOptions {}
unsafe impl Sync for LruCacheOptions {}
unsafe impl Sync for FlushOptions {}
unsafe impl Sync for BlockBasedOptions {}
unsafe impl Sync for CuckooTableOptions {}
unsafe impl Sync for ReadOptions {}
unsafe impl Sync for IngestExternalFileOptions {}
unsafe impl Sync for CompactOptions {}
unsafe impl Sync for ImportColumnFamilyOptions {}
unsafe impl Sync for OwnedComparator {}
unsafe impl Sync for OwnedCompactionFilter {}
impl Drop for Options {
    fn drop(&mut self) {
        // SAFETY: `inner` is owned by this struct and destroyed exactly once.
        unsafe {
            ffi::rocksdb_options_destroy(self.inner);
        }
    }
}
impl Clone for Options {
    /// Deep-copies the C options object and shallow-clones the Rust-side
    /// outlive set (shared resources are refcounted).
    ///
    /// # Panics
    /// Panics if the C-side copy allocation returns null.
    fn clone(&self) -> Self {
        let inner = unsafe { ffi::rocksdb_options_create_copy(self.inner) };
        assert!(!inner.is_null(), "Could not copy RocksDB options");
        Self {
            inner,
            outlive: self.outlive.clone(),
        }
    }
}
impl Drop for BlockBasedOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_block_based_options_destroy(self.inner);
        }
    }
}
impl Drop for CuckooTableOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_cuckoo_options_destroy(self.inner);
        }
    }
}
impl Drop for FlushOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_flushoptions_destroy(self.inner);
        }
    }
}
impl Drop for WriteOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_writeoptions_destroy(self.inner);
        }
    }
}
impl Drop for LruCacheOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_lru_cache_options_destroy(self.inner);
        }
    }
}
impl Drop for ReadOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once. The
        // owned `Vec<u8>` bound/timestamp buffers are freed after this by
        // the normal field drops.
        unsafe {
            ffi::rocksdb_readoptions_destroy(self.inner);
        }
    }
}
impl Drop for IngestExternalFileOptions {
    fn drop(&mut self) {
        // SAFETY: exclusively-owned handle, destroyed exactly once.
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_destroy(self.inner);
        }
    }
}
impl BlockBasedOptions {
    /// Approximate size of user data packed per uncompressed block.
    pub fn set_block_size(&mut self, size: usize) {
        unsafe {
            ffi::rocksdb_block_based_options_set_block_size(self.inner, size);
        }
    }
    /// Block size for partitioned metadata (index/filter) blocks.
    pub fn set_metadata_block_size(&mut self, size: usize) {
        unsafe {
            ffi::rocksdb_block_based_options_set_metadata_block_size(self.inner, size as u64);
        }
    }
    /// Enables two-level partitioned filters.
    // NOTE(review): the parameter is a flag, not a size — the name `size`
    // looks like a copy/paste slip, but renaming is cosmetic and left as-is.
    pub fn set_partition_filters(&mut self, size: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_partition_filters(self.inner, c_uchar::from(size));
        }
    }
    /// Installs a shared block cache; the `Cache` is cloned into `outlive`
    /// so it cannot be dropped while the C object references it.
    pub fn set_block_cache(&mut self, cache: &Cache) {
        unsafe {
            ffi::rocksdb_block_based_options_set_block_cache(self.inner, cache.0.inner.as_ptr());
        }
        self.outlive.block_cache = Some(cache.clone());
    }
    /// Disables block caching entirely.
    pub fn disable_cache(&mut self) {
        unsafe {
            ffi::rocksdb_block_based_options_set_no_block_cache(self.inner, c_uchar::from(true));
        }
    }
    /// Installs a bloom filter policy; `block_based` selects the legacy
    /// per-block bloom (`create_bloom`) over the full-filter variant
    /// (`create_bloom_full`).
    pub fn set_bloom_filter(&mut self, bits_per_key: c_double, block_based: bool) {
        unsafe {
            let bloom = if block_based {
                ffi::rocksdb_filterpolicy_create_bloom(bits_per_key as _)
            } else {
                ffi::rocksdb_filterpolicy_create_bloom_full(bits_per_key as _)
            };
            // Ownership of the policy is transferred to the options object.
            ffi::rocksdb_block_based_options_set_filter_policy(self.inner, bloom);
        }
    }
    /// Installs a ribbon filter policy with the given bloom-equivalent
    /// bits-per-key budget.
    pub fn set_ribbon_filter(&mut self, bloom_equivalent_bits_per_key: c_double) {
        unsafe {
            let ribbon = ffi::rocksdb_filterpolicy_create_ribbon(bloom_equivalent_bits_per_key);
            ffi::rocksdb_block_based_options_set_filter_policy(self.inner, ribbon);
        }
    }
    /// Installs a hybrid filter policy: bloom for levels below
    /// `bloom_before_level`, ribbon above.
    pub fn set_hybrid_ribbon_filter(
        &mut self,
        bloom_equivalent_bits_per_key: c_double,
        bloom_before_level: c_int,
    ) {
        unsafe {
            let ribbon = ffi::rocksdb_filterpolicy_create_ribbon_hybrid(
                bloom_equivalent_bits_per_key,
                bloom_before_level,
            );
            ffi::rocksdb_block_based_options_set_filter_policy(self.inner, ribbon);
        }
    }
    /// Caches index and filter blocks in the block cache.
    pub fn set_cache_index_and_filter_blocks(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_cache_index_and_filter_blocks(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Gives cached index/filter blocks high priority in the block cache.
    pub fn set_cache_index_and_filter_blocks_with_high_priority(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_cache_index_and_filter_blocks_with_high_priority(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Selects the index representation (binary search, hash, etc.).
    pub fn set_index_type(&mut self, index_type: BlockBasedIndexType) {
        let index = index_type as i32;
        unsafe {
            ffi::rocksdb_block_based_options_set_index_type(self.inner, index);
        }
    }
    /// Pins L0 filter and index blocks in the cache.
    pub fn set_pin_l0_filter_and_index_blocks_in_cache(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_pin_l0_filter_and_index_blocks_in_cache(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Pins top-level index and filter blocks in the cache.
    pub fn set_pin_top_level_index_and_filter(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_pin_top_level_index_and_filter(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the SST format version (controls on-disk compatibility).
    pub fn set_format_version(&mut self, version: i32) {
        unsafe {
            ffi::rocksdb_block_based_options_set_format_version(self.inner, version);
        }
    }
    /// Enables delta encoding of keys within a block.
    pub fn set_use_delta_encoding(&mut self, enable: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_use_delta_encoding(
                self.inner,
                c_uchar::from(enable),
            );
        }
    }
    /// Number of keys between restart points in data blocks.
    pub fn set_block_restart_interval(&mut self, interval: i32) {
        unsafe {
            ffi::rocksdb_block_based_options_set_block_restart_interval(self.inner, interval);
        }
    }
    /// Number of keys between restart points in index blocks.
    pub fn set_index_block_restart_interval(&mut self, interval: i32) {
        unsafe {
            ffi::rocksdb_block_based_options_set_index_block_restart_interval(self.inner, interval);
        }
    }
    /// Selects the in-block (data block) index representation.
    pub fn set_data_block_index_type(&mut self, index_type: DataBlockIndexType) {
        let index_t = index_type as i32;
        unsafe {
            ffi::rocksdb_block_based_options_set_data_block_index_type(self.inner, index_t);
        }
    }
    /// Hash-table utilization ratio for the data-block hash index.
    pub fn set_data_block_hash_ratio(&mut self, ratio: f64) {
        unsafe {
            ffi::rocksdb_block_based_options_set_data_block_hash_ratio(self.inner, ratio);
        }
    }
    /// Whether filters index whole keys (vs. prefixes only).
    pub fn set_whole_key_filtering(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_whole_key_filtering(self.inner, c_uchar::from(v));
        }
    }
    /// Selects the per-block checksum algorithm.
    pub fn set_checksum_type(&mut self, checksum_type: ChecksumType) {
        unsafe {
            ffi::rocksdb_block_based_options_set_checksum(self.inner, checksum_type as c_char);
        }
    }
    /// Trades a little CPU for lower filter memory usage.
    pub fn set_optimize_filters_for_memory(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_block_based_options_set_optimize_filters_for_memory(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Cache-pinning tier for top-level index blocks.
    pub fn set_top_level_index_pinning_tier(&mut self, tier: BlockBasedPinningTier) {
        unsafe {
            ffi::rocksdb_block_based_options_set_top_level_index_pinning_tier(
                self.inner,
                tier as c_int,
            );
        }
    }
    /// Cache-pinning tier for metadata partitions.
    pub fn set_partition_pinning_tier(&mut self, tier: BlockBasedPinningTier) {
        unsafe {
            ffi::rocksdb_block_based_options_set_partition_pinning_tier(self.inner, tier as c_int);
        }
    }
    /// Cache-pinning tier for unpartitioned metadata blocks.
    pub fn set_unpartitioned_pinning_tier(&mut self, tier: BlockBasedPinningTier) {
        unsafe {
            ffi::rocksdb_block_based_options_set_unpartitioned_pinning_tier(
                self.inner,
                tier as c_int,
            );
        }
    }
}
impl Default for BlockBasedOptions {
    /// Allocates a fresh block-based table options object carrying
    /// RocksDB's built-in defaults.
    ///
    /// # Panics
    /// Panics if the C-side allocation returns null.
    fn default() -> Self {
        let inner = unsafe { ffi::rocksdb_block_based_options_create() };
        assert!(
            !inner.is_null(),
            "Could not create RocksDB block based options"
        );
        Self {
            inner,
            outlive: BlockBasedOptionsMustOutliveDB::default(),
        }
    }
}
impl CuckooTableOptions {
    /// Target utilization of the cuckoo hash table (lower = faster, bigger).
    pub fn set_hash_ratio(&mut self, ratio: f64) {
        unsafe {
            ffi::rocksdb_cuckoo_options_set_hash_ratio(self.inner, ratio);
        }
    }
    /// Maximum displacement depth before falling back during insertion.
    pub fn set_max_search_depth(&mut self, depth: u32) {
        unsafe {
            ffi::rocksdb_cuckoo_options_set_max_search_depth(self.inner, depth);
        }
    }
    /// Number of adjacent slots probed as one bucket.
    pub fn set_cuckoo_block_size(&mut self, size: u32) {
        unsafe {
            ffi::rocksdb_cuckoo_options_set_cuckoo_block_size(self.inner, size);
        }
    }
    /// Uses the key itself as the first hash function (for dense integer keys).
    pub fn set_identity_as_first_hash(&mut self, flag: bool) {
        unsafe {
            ffi::rocksdb_cuckoo_options_set_identity_as_first_hash(self.inner, c_uchar::from(flag));
        }
    }
    /// Selects modulo (vs. bit-mask) table sizing.
    pub fn set_use_module_hash(&mut self, flag: bool) {
        unsafe {
            ffi::rocksdb_cuckoo_options_set_use_module_hash(self.inner, c_uchar::from(flag));
        }
    }
}
impl Default for CuckooTableOptions {
    /// Allocates fresh cuckoo table options with RocksDB's defaults.
    ///
    /// # Panics
    /// Panics if the C-side allocation returns null.
    fn default() -> Self {
        let inner = unsafe { ffi::rocksdb_cuckoo_options_create() };
        assert!(!inner.is_null(), "Could not create RocksDB cuckoo options");
        Self { inner }
    }
}
/// Severity of an info-log message, mirroring RocksDB's `InfoLogLevel`.
/// Discriminants match the C enum values (Debug = 0 … Header = 5).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum LogLevel {
    Debug = 0,
    Info,
    Warn,
    Error,
    Fatal,
    Header,
}
impl LogLevel {
    /// Maps a raw C-side level back to the enum; `None` for out-of-range
    /// values.
    pub(crate) fn try_from_raw(raw: i32) -> Option<Self> {
        const ALL: [LogLevel; 6] = [
            LogLevel::Debug,
            LogLevel::Info,
            LogLevel::Warn,
            LogLevel::Error,
            LogLevel::Fatal,
            LogLevel::Header,
        ];
        ALL.into_iter().find(|&level| level as i32 == raw)
    }
}
impl Options {
/// Loads the most recently persisted OPTIONS file from `path` and returns
/// the reconstructed DB options plus one descriptor per column family.
///
/// # Errors
/// Returns an error if `path` cannot be converted to a C string or the
/// underlying `rocksdb_load_latest_options` call fails.
pub fn load_latest<P: AsRef<Path>>(
    path: P,
    env: Env,
    ignore_unknown_options: bool,
    cache: Cache,
) -> Result<(Options, Vec<ColumnFamilyDescriptor>), Error> {
    let path = to_cpath(path)?;
    // Out-parameters filled in by the C call below.
    let mut db_options: *mut ffi::rocksdb_options_t = null_mut();
    let mut num_column_families: usize = 0;
    let mut column_family_names: *mut *mut c_char = null_mut();
    let mut column_family_options: *mut *mut ffi::rocksdb_options_t = null_mut();
    unsafe {
        ffi_try!(ffi::rocksdb_load_latest_options(
            path.as_ptr(),
            env.0.inner,
            ignore_unknown_options,
            cache.0.inner.as_ptr(),
            &raw mut db_options,
            &raw mut num_column_families,
            &raw mut column_family_names,
            &raw mut column_family_options,
        ));
    }
    // Take ownership of the returned DB options. The outlive set is empty:
    // nothing Rust-side was installed into this C object.
    let options = Options {
        inner: db_options,
        outlive: OptionsMustOutliveDB::default(),
    };
    // SAFETY: on success the C call populated `num_column_families` entries
    // in both arrays; `read_column_descriptors` consumes and frees them.
    let column_families = unsafe {
        Options::read_column_descriptors(
            num_column_families,
            column_family_names,
            column_family_options,
        )
    };
    Ok((options, column_families))
}
/// Parses a RocksDB options string (e.g. `"max_open_files=100;..."`),
/// using `self` as the base configuration, and returns a new `Options`
/// with the parsed values applied.
///
/// # Errors
/// Returns an error if `opts_str` contains interior NUL bytes or the
/// options string fails to parse.
///
/// # Panics
/// Panics if the C-side allocation of the result object returns null,
/// consistent with `Clone for Options` and the `Default` impls.
pub fn get_options_from_string<S: AsRef<str>>(
    &mut self,
    opts_str: S,
) -> Result<Options, Error> {
    let inner = unsafe { ffi::rocksdb_options_create() };
    // Fail fast on a null allocation instead of handing a null pointer to
    // `rocksdb_get_options_from_string` below (the original omitted this
    // check, unlike every other construction site in this file).
    assert!(!inner.is_null(), "Could not create RocksDB options");
    let options = Options {
        inner,
        outlive: OptionsMustOutliveDB::default(),
    };
    let opts_cstr = opts_str.as_ref().into_c_string().map_err(|e| {
        Error::new(format!(
            "options string must not contain NUL (0x00) bytes: {e}"
        ))
    })?;
    unsafe {
        ffi_try!(ffi::rocksdb_get_options_from_string(
            self.inner.cast_const(),
            opts_cstr.as_ptr(),
            options.inner,
        ));
    }
    Ok(options)
}
#[inline]
/// Converts the name/options arrays returned by `rocksdb_load_latest_options`
/// into owned `ColumnFamilyDescriptor`s, freeing every C allocation.
///
/// # Safety
/// Both pointers must be valid arrays of exactly `num_column_families`
/// elements allocated by RocksDB; each name string and options object is
/// consumed (ownership is transferred to the returned descriptors / freed).
unsafe fn read_column_descriptors(
    num_column_families: usize,
    column_family_names: *mut *mut c_char,
    column_family_options: *mut *mut ffi::rocksdb_options_t,
) -> Vec<ColumnFamilyDescriptor> {
    // Lazy iterator: each name is copied into a Rust String and its C
    // buffer freed when the zip below is collected.
    let column_family_names_iter = unsafe {
        slice::from_raw_parts(column_family_names, num_column_families)
            .iter()
            .map(|ptr| from_cstr_and_free(*ptr))
    };
    // Each per-CF options pointer is adopted by an `Options` (freed by its
    // Drop impl later).
    let column_family_options_iter = unsafe {
        slice::from_raw_parts(column_family_options, num_column_families)
            .iter()
            .map(|ptr| Options {
                inner: *ptr,
                outlive: OptionsMustOutliveDB::default(),
            })
    };
    // `collect` runs BOTH lazy iterators to completion here — this must
    // happen before the outer arrays are freed below.
    let column_descriptors = column_family_names_iter
        .zip(column_family_options_iter)
        .map(|(name, options)| ColumnFamilyDescriptor {
            name,
            options,
            ttl: ColumnFamilyTtl::Disabled,
        })
        .collect::<Vec<_>>();
    // Free only the outer arrays; their elements were consumed above.
    unsafe {
        ffi::rocksdb_free(column_family_names as *mut c_void);
        ffi::rocksdb_free(column_family_options as *mut c_void);
        column_descriptors
    }
}
/// Tunes RocksDB to use up to `parallelism` background threads.
pub fn increase_parallelism(&mut self, parallelism: i32) {
    unsafe {
        ffi::rocksdb_options_increase_parallelism(self.inner, parallelism);
    }
}
/// Preconfigures the options for level-style compaction within the given
/// memtable memory budget (bytes).
pub fn optimize_level_style_compaction(&mut self, memtable_memory_budget: usize) {
    unsafe {
        ffi::rocksdb_options_optimize_level_style_compaction(
            self.inner,
            memtable_memory_budget as u64,
        );
    }
}
/// Preconfigures the options for universal-style compaction within the
/// given memtable memory budget (bytes).
pub fn optimize_universal_style_compaction(&mut self, memtable_memory_budget: usize) {
    unsafe {
        ffi::rocksdb_options_optimize_universal_style_compaction(
            self.inner,
            memtable_memory_budget as u64,
        );
    }
}
/// Creates the database if it does not already exist.
pub fn create_if_missing(&mut self, create_if_missing: bool) {
    unsafe {
        ffi::rocksdb_options_set_create_if_missing(
            self.inner,
            c_uchar::from(create_if_missing),
        );
    }
}
/// Creates requested column families that do not already exist.
pub fn create_missing_column_families(&mut self, create_missing_cfs: bool) {
    unsafe {
        ffi::rocksdb_options_set_create_missing_column_families(
            self.inner,
            c_uchar::from(create_missing_cfs),
        );
    }
}
/// Errors on open if the database already exists.
pub fn set_error_if_exists(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_error_if_exists(self.inner, c_uchar::from(enabled));
    }
}
/// Enables aggressive internal consistency checking.
pub fn set_paranoid_checks(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_paranoid_checks(self.inner, c_uchar::from(enabled));
    }
}
/// Configures multiple storage paths (`db_paths`) for SST placement.
pub fn set_db_paths(&mut self, paths: &[DBPath]) {
    // The C API wants a mutable array of const pointers; build it locally.
    let mut paths: Vec<_> = paths.iter().map(|path| path.inner.cast_const()).collect();
    let num_paths = paths.len();
    unsafe {
        ffi::rocksdb_options_set_db_paths(self.inner, paths.as_mut_ptr(), num_paths);
    }
}
/// Installs a custom `Env`; cloned into `outlive` so it cannot be dropped
/// while the C object references it.
pub fn set_env(&mut self, env: &Env) {
    unsafe {
        ffi::rocksdb_options_set_env(self.inner, env.0.inner);
    }
    self.outlive.env = Some(env.clone());
}
/// Sets the default compression algorithm for SST blocks.
pub fn set_compression_type(&mut self, t: DBCompressionType) {
    unsafe {
        ffi::rocksdb_options_set_compression(self.inner, t as c_int);
    }
}
/// Number of threads used for parallel (de)compression.
pub fn set_compression_options_parallel_threads(&mut self, num: i32) {
    unsafe {
        ffi::rocksdb_options_set_compression_options_parallel_threads(self.inner, num);
    }
}
/// Sets WAL compression.
///
/// # Panics
/// Panics via `unimplemented!` for any type other than `None` or `Zstd`,
/// the only values RocksDB accepts for WAL compression.
pub fn set_wal_compression_type(&mut self, t: DBCompressionType) {
    match t {
        DBCompressionType::None | DBCompressionType::Zstd => unsafe {
            ffi::rocksdb_options_set_wal_compression(self.inner, t as c_int);
        },
        other => unimplemented!("{:?} is not supported for WAL compression", other),
    }
}
/// Compression algorithm for the bottommost level.
pub fn set_bottommost_compression_type(&mut self, t: DBCompressionType) {
    unsafe {
        ffi::rocksdb_options_set_bottommost_compression(self.inner, t as c_int);
    }
}
/// Per-level compression algorithms (index = level).
pub fn set_compression_per_level(&mut self, level_types: &[DBCompressionType]) {
    unsafe {
        let mut level_types: Vec<_> = level_types.iter().map(|&t| t as c_int).collect();
        ffi::rocksdb_options_set_compression_per_level(
            self.inner,
            level_types.as_mut_ptr(),
            level_types.len() as size_t,
        );
    }
}
/// Low-level compression knobs (window bits, level, strategy, dict size).
pub fn set_compression_options(
    &mut self,
    w_bits: c_int,
    level: c_int,
    strategy: c_int,
    max_dict_bytes: c_int,
) {
    unsafe {
        ffi::rocksdb_options_set_compression_options(
            self.inner,
            w_bits,
            level,
            strategy,
            max_dict_bytes,
        );
    }
}
/// Same knobs as `set_compression_options`, applied to the bottommost
/// level; `enabled` toggles use of these overrides.
pub fn set_bottommost_compression_options(
    &mut self,
    w_bits: c_int,
    level: c_int,
    strategy: c_int,
    max_dict_bytes: c_int,
    enabled: bool,
) {
    unsafe {
        ffi::rocksdb_options_set_bottommost_compression_options(
            self.inner,
            w_bits,
            level,
            strategy,
            max_dict_bytes,
            c_uchar::from(enabled),
        );
    }
}
/// Bytes of training data for zstd dictionary compression.
pub fn set_zstd_max_train_bytes(&mut self, value: c_int) {
    unsafe {
        ffi::rocksdb_options_set_compression_options_zstd_max_train_bytes(self.inner, value);
    }
}
/// Bottommost-level variant of `set_zstd_max_train_bytes`.
pub fn set_bottommost_zstd_max_train_bytes(&mut self, value: c_int, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
            self.inner,
            value,
            c_uchar::from(enabled),
        );
    }
}
/// Readahead size (bytes) used when compaction reads input files.
pub fn set_compaction_readahead_size(&mut self, compaction_readahead_size: usize) {
    unsafe {
        ffi::rocksdb_options_compaction_readahead_size(self.inner, compaction_readahead_size);
    }
}
/// Lets RocksDB pick per-level target sizes dynamically.
pub fn set_level_compaction_dynamic_level_bytes(&mut self, v: bool) {
    unsafe {
        ffi::rocksdb_options_set_level_compaction_dynamic_level_bytes(
            self.inner,
            c_uchar::from(v),
        );
    }
}
/// Files older than this many seconds become compaction candidates.
pub fn set_periodic_compaction_seconds(&mut self, secs: u64) {
    unsafe {
        ffi::rocksdb_options_set_periodic_compaction_seconds(self.inner, secs);
    }
}
/// Memtable flush trigger based on per-operation scan count.
pub fn set_memtable_op_scan_flush_trigger(&mut self, num: u32) {
    unsafe {
        ffi::rocksdb_options_set_memtable_op_scan_flush_trigger(self.inner, num);
    }
}
/// Memtable flush trigger based on average per-operation scan count.
pub fn set_memtable_avg_op_scan_flush_trigger(&mut self, num: u32) {
    unsafe {
        ffi::rocksdb_options_set_memtable_avg_op_scan_flush_trigger(self.inner, num);
    }
}
/// Time-to-live (seconds) after which entries may be dropped in compaction.
pub fn set_ttl(&mut self, secs: u64) {
    unsafe {
        ffi::rocksdb_options_set_ttl(self.inner, secs);
    }
}
/// Installs an associative merge operator: the same function serves as
/// both full- and partial-merge.
///
/// # Panics
/// Panics if `name` contains interior NUL bytes (`into_c_string().unwrap()`).
pub fn set_merge_operator_associative<F: MergeFn + Clone>(
    &mut self,
    name: impl CStrLike,
    full_merge_fn: F,
) {
    let cb = Box::new(MergeOperatorCallback {
        name: name.into_c_string().unwrap(),
        full_merge_fn: full_merge_fn.clone(),
        partial_merge_fn: full_merge_fn,
    });
    unsafe {
        // Ownership of `cb` transfers to the C object; it is reclaimed and
        // dropped by `destructor_callback` when RocksDB destroys the
        // merge operator.
        let mo = ffi::rocksdb_mergeoperator_create(
            Box::into_raw(cb).cast::<c_void>(),
            Some(merge_operator::destructor_callback::<F, F>),
            Some(full_merge_callback::<F, F>),
            Some(partial_merge_callback::<F, F>),
            Some(merge_operator::delete_callback),
            Some(merge_operator::name_callback::<F, F>),
        );
        ffi::rocksdb_options_set_merge_operator(self.inner, mo);
    }
}
/// Installs a merge operator with distinct full- and partial-merge
/// functions.
///
/// # Panics
/// Panics if `name` contains interior NUL bytes.
pub fn set_merge_operator<F: MergeFn, PF: MergeFn>(
    &mut self,
    name: impl CStrLike,
    full_merge_fn: F,
    partial_merge_fn: PF,
) {
    let cb = Box::new(MergeOperatorCallback {
        name: name.into_c_string().unwrap(),
        full_merge_fn,
        partial_merge_fn,
    });
    unsafe {
        // Same ownership transfer as above.
        let mo = ffi::rocksdb_mergeoperator_create(
            Box::into_raw(cb).cast::<c_void>(),
            Some(merge_operator::destructor_callback::<F, PF>),
            Some(full_merge_callback::<F, PF>),
            Some(partial_merge_callback::<F, PF>),
            Some(merge_operator::delete_callback),
            Some(merge_operator::name_callback::<F, PF>),
        );
        ffi::rocksdb_options_set_merge_operator(self.inner, mo);
    }
}
#[deprecated(
    since = "0.5.0",
    note = "add_merge_operator has been renamed to set_merge_operator"
)]
/// Deprecated alias for `set_merge_operator_associative`.
pub fn add_merge_operator<F: MergeFn + Clone>(&mut self, name: &str, merge_fn: F) {
    self.set_merge_operator_associative(name, merge_fn);
}
/// Installs a compaction filter callback; the C filter object is kept
/// alive via `outlive.compaction_filter`.
///
/// # Panics
/// Panics if `name` contains interior NUL bytes, or if
/// `rocksdb_compactionfilter_create` returns null (`NonNull::new(..).unwrap()`).
pub fn set_compaction_filter<F>(&mut self, name: impl CStrLike, filter_fn: F)
where
    F: CompactionFilterFn + Send + 'static,
{
    let cb = Box::new(CompactionFilterCallback {
        name: name.into_c_string().unwrap(),
        filter_fn,
    });
    let filter = unsafe {
        // `cb` ownership transfers to the C object; reclaimed by the
        // destructor callback.
        let cf = ffi::rocksdb_compactionfilter_create(
            Box::into_raw(cb).cast::<c_void>(),
            Some(compaction_filter::destructor_callback::<CompactionFilterCallback<F>>),
            Some(compaction_filter::filter_callback::<CompactionFilterCallback<F>>),
            Some(compaction_filter::name_callback::<CompactionFilterCallback<F>>),
        );
        ffi::rocksdb_options_set_compaction_filter(self.inner, cf);
        OwnedCompactionFilter::new(NonNull::new(cf).unwrap())
    };
    self.outlive.compaction_filter = Some(Arc::new(filter));
}
/// Registers an event listener with the options.
// NOTE(review): only `handle.inner` is passed to C; whether `handle` itself
// must outlive the options depends on `new_event_listener` (not visible
// here) — confirm it transfers ownership to the C side.
pub fn add_event_listener<L: EventListener>(&mut self, l: L) {
    let handle = new_event_listener(l);
    unsafe { ffi::rocksdb_options_add_eventlistener(self.inner, handle.inner) }
}
/// Installs a compaction-filter factory; ownership of `factory` transfers
/// to the C object (reclaimed by the destructor callback).
pub fn set_compaction_filter_factory<F>(&mut self, factory: F)
where
    F: CompactionFilterFactory + 'static,
{
    let factory = Box::new(factory);
    unsafe {
        let cff = ffi::rocksdb_compactionfilterfactory_create(
            Box::into_raw(factory).cast::<c_void>(),
            Some(compaction_filter_factory::destructor_callback::<F>),
            Some(compaction_filter_factory::create_compaction_filter_callback::<F>),
            Some(compaction_filter_factory::name_callback::<F>),
        );
        ffi::rocksdb_options_set_compaction_filter_factory(self.inner, cff);
    }
}
/// Installs a custom key comparator; the C comparator object is kept
/// alive via `outlive.comparator`.
///
/// # Panics
/// Panics if `name` contains interior NUL bytes, or if
/// `rocksdb_comparator_create` returns null.
pub fn set_comparator(&mut self, name: impl CStrLike, compare_fn: Box<CompareFn>) {
    let cb = Box::new(ComparatorCallback {
        name: name.into_c_string().unwrap(),
        compare_fn,
    });
    let cmp = unsafe {
        let cmp = ffi::rocksdb_comparator_create(
            Box::into_raw(cb).cast::<c_void>(),
            Some(ComparatorCallback::destructor_callback),
            Some(ComparatorCallback::compare_callback),
            Some(ComparatorCallback::name_callback),
        );
        ffi::rocksdb_options_set_comparator(self.inner, cmp);
        OwnedComparator::new(NonNull::new(cmp).unwrap())
    };
    self.outlive.comparator = Some(Arc::new(cmp));
}
/// Installs a user-defined-timestamp-aware comparator (three comparison
/// functions plus the fixed timestamp width in bytes).
///
/// # Panics
/// Panics if `name` contains interior NUL bytes, or if
/// `rocksdb_comparator_with_ts_create` returns null.
pub fn set_comparator_with_ts(
    &mut self,
    name: impl CStrLike,
    timestamp_size: usize,
    compare_fn: Box<CompareFn>,
    compare_ts_fn: Box<CompareTsFn>,
    compare_without_ts_fn: Box<CompareWithoutTsFn>,
) {
    let cb = Box::new(ComparatorWithTsCallback {
        name: name.into_c_string().unwrap(),
        compare_fn,
        compare_ts_fn,
        compare_without_ts_fn,
    });
    let cmp = unsafe {
        let cmp = ffi::rocksdb_comparator_with_ts_create(
            Box::into_raw(cb).cast::<c_void>(),
            Some(ComparatorWithTsCallback::destructor_callback),
            Some(ComparatorWithTsCallback::compare_callback),
            Some(ComparatorWithTsCallback::compare_ts_callback),
            Some(ComparatorWithTsCallback::compare_without_ts_callback),
            Some(ComparatorWithTsCallback::name_callback),
            timestamp_size,
        );
        ffi::rocksdb_options_set_comparator(self.inner, cmp);
        OwnedComparator::new(NonNull::new(cmp).unwrap())
    };
    self.outlive.comparator = Some(Arc::new(cmp));
}
/// Installs a prefix extractor (slice transform) for prefix seeks/filters.
pub fn set_prefix_extractor(&mut self, prefix_extractor: SliceTransform) {
    unsafe {
        ffi::rocksdb_options_set_prefix_extractor(self.inner, prefix_extractor.inner);
    }
}
/// Preconfigures the options for point-lookup workloads with a block
/// cache of the given size (MB).
pub fn optimize_for_point_lookup(&mut self, block_cache_size_mb: u64) {
    unsafe {
        ffi::rocksdb_options_optimize_for_point_lookup(self.inner, block_cache_size_mb);
    }
}
/// Skips building bloom filters on the last level (saves memory when most
/// keys are found).
pub fn set_optimize_filters_for_hits(&mut self, optimize_for_hits: bool) {
    unsafe {
        ffi::rocksdb_options_set_optimize_filters_for_hits(
            self.inner,
            c_int::from(optimize_for_hits),
        );
    }
}
/// Period (microseconds) between obsolete-file deletion passes.
pub fn set_delete_obsolete_files_period_micros(&mut self, micros: u64) {
    unsafe {
        ffi::rocksdb_options_set_delete_obsolete_files_period_micros(self.inner, micros);
    }
}
/// Preconfigures the options for bulk loading (compaction disabled etc.).
pub fn prepare_for_bulk_load(&mut self) {
    unsafe {
        ffi::rocksdb_options_prepare_for_bulk_load(self.inner);
    }
}
/// Maximum number of files kept open (-1 for unlimited).
pub fn set_max_open_files(&mut self, nfiles: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_open_files(self.inner, nfiles);
    }
}
/// Threads used to open files during DB open.
pub fn set_max_file_opening_threads(&mut self, nthreads: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_file_opening_threads(self.inner, nthreads);
    }
}
/// Uses `fsync` instead of `fdatasync` when syncing files.
pub fn set_use_fsync(&mut self, useit: bool) {
    unsafe {
        ffi::rocksdb_options_set_use_fsync(self.inner, c_int::from(useit));
    }
}
/// Reads back the `use_fsync` flag from the C options object.
pub fn get_use_fsync(&self) -> bool {
    let val = unsafe { ffi::rocksdb_options_get_use_fsync(self.inner) };
    val != 0
}
/// Directory for info log files.
// NOTE(review): `to_cpath(..).unwrap()` panics on paths that cannot be
// converted (e.g. interior NUL) — unlike `load_latest`, which propagates
// the error.
pub fn set_db_log_dir<P: AsRef<Path>>(&mut self, path: P) {
    let p = to_cpath(path).unwrap();
    unsafe {
        ffi::rocksdb_options_set_db_log_dir(self.inner, p.as_ptr());
    }
}
/// Minimum severity written to the info log.
pub fn set_log_level(&mut self, level: LogLevel) {
    unsafe {
        ffi::rocksdb_options_set_info_log_level(self.inner, level as c_int);
    }
}
/// Incrementally syncs SST files every `nbytes` written (0 disables).
pub fn set_bytes_per_sync(&mut self, nbytes: u64) {
    unsafe {
        ffi::rocksdb_options_set_bytes_per_sync(self.inner, nbytes);
    }
}
/// Incrementally syncs WAL files every `nbytes` written (0 disables).
pub fn set_wal_bytes_per_sync(&mut self, nbytes: u64) {
    unsafe {
        ffi::rocksdb_options_set_wal_bytes_per_sync(self.inner, nbytes);
    }
}
/// Caps the write buffer used by `WritableFile`s.
pub fn set_writable_file_max_buffer_size(&mut self, nbytes: u64) {
    unsafe {
        ffi::rocksdb_options_set_writable_file_max_buffer_size(self.inner, nbytes);
    }
}
/// Allows multiple writers to insert into the memtable concurrently.
pub fn set_allow_concurrent_memtable_write(&mut self, allow: bool) {
    unsafe {
        ffi::rocksdb_options_set_allow_concurrent_memtable_write(
            self.inner,
            c_uchar::from(allow),
        );
    }
}
/// Lets write threads spin briefly before blocking.
pub fn set_enable_write_thread_adaptive_yield(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_enable_write_thread_adaptive_yield(
            self.inner,
            c_uchar::from(enabled),
        );
    }
}
/// Iterator reseek threshold after this many sequential skips.
pub fn set_max_sequential_skip_in_iterations(&mut self, num: u64) {
    unsafe {
        ffi::rocksdb_options_set_max_sequential_skip_in_iterations(self.inner, num);
    }
}
/// Uses O_DIRECT for reads.
pub fn set_use_direct_reads(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_use_direct_reads(self.inner, c_uchar::from(enabled));
    }
}
/// Uses O_DIRECT for flush and compaction I/O.
pub fn set_use_direct_io_for_flush_and_compaction(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_use_direct_io_for_flush_and_compaction(
            self.inner,
            c_uchar::from(enabled),
        );
    }
}
/// Sets close-on-exec on opened file descriptors.
pub fn set_is_fd_close_on_exec(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_is_fd_close_on_exec(self.inner, c_uchar::from(enabled));
    }
}
#[deprecated(
    since = "0.7.0",
    note = "replaced with set_use_direct_reads/set_use_direct_io_for_flush_and_compaction methods"
)]
/// Deprecated: `allow_os_buffer = false` now means direct I/O everywhere.
pub fn set_allow_os_buffer(&mut self, is_allow: bool) {
    self.set_use_direct_reads(!is_allow);
    self.set_use_direct_io_for_flush_and_compaction(!is_allow);
}
/// Shard count (log2) for the table cache.
pub fn set_table_cache_num_shard_bits(&mut self, nbits: c_int) {
    unsafe {
        ffi::rocksdb_options_set_table_cache_numshardbits(self.inner, nbits);
    }
}
/// Per-level multiplier applied to the target file size.
pub fn set_target_file_size_multiplier(&mut self, multiplier: i32) {
    unsafe {
        ffi::rocksdb_options_set_target_file_size_multiplier(self.inner, multiplier as c_int);
    }
}
/// Minimum number of write buffers merged before flushing.
// NOTE(review): despite the name, this forwards to the same underlying
// knob as `set_min_write_buffer_number_to_merge` below.
pub fn set_min_write_buffer_number(&mut self, nbuf: c_int) {
    unsafe {
        ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, nbuf);
    }
}
/// Maximum number of memtables held in memory (active + immutable).
pub fn set_max_write_buffer_number(&mut self, nbuf: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_write_buffer_number(self.inner, nbuf);
    }
}
/// Per-memtable size (bytes) before it becomes immutable.
pub fn set_write_buffer_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_write_buffer_size(self.inner, size);
    }
}
/// Total memtable budget across all column families (bytes).
pub fn set_db_write_buffer_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_db_write_buffer_size(self.inner, size);
    }
}
/// Target total size of level 1 (bytes).
pub fn set_max_bytes_for_level_base(&mut self, size: u64) {
    unsafe {
        ffi::rocksdb_options_set_max_bytes_for_level_base(self.inner, size);
    }
}
/// Growth factor between consecutive level targets.
pub fn set_max_bytes_for_level_multiplier(&mut self, mul: f64) {
    unsafe {
        ffi::rocksdb_options_set_max_bytes_for_level_multiplier(self.inner, mul);
    }
}
/// Maximum MANIFEST file size before rollover (bytes).
pub fn set_max_manifest_file_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_max_manifest_file_size(self.inner, size);
    }
}
/// Target SST file size for level-1 compaction output (bytes).
pub fn set_target_file_size_base(&mut self, size: u64) {
    unsafe {
        ffi::rocksdb_options_set_target_file_size_base(self.inner, size);
    }
}
/// Minimum number of write buffers merged before flushing.
pub fn set_min_write_buffer_number_to_merge(&mut self, to_merge: c_int) {
    unsafe {
        ffi::rocksdb_options_set_min_write_buffer_number_to_merge(self.inner, to_merge);
    }
}
/// Number of L0 files that triggers compaction.
pub fn set_level_zero_file_num_compaction_trigger(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_level0_file_num_compaction_trigger(self.inner, n);
    }
}
/// Number of L0 files at which writes are slowed.
pub fn set_level_zero_slowdown_writes_trigger(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_level0_slowdown_writes_trigger(self.inner, n);
    }
}
/// Number of L0 files at which writes are stopped.
pub fn set_level_zero_stop_writes_trigger(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_level0_stop_writes_trigger(self.inner, n);
    }
}
/// Selects level, universal, or FIFO compaction.
pub fn set_compaction_style(&mut self, style: DBCompactionStyle) {
    unsafe {
        ffi::rocksdb_options_set_compaction_style(self.inner, style as c_int);
    }
}
/// Applies universal-compaction-specific options.
pub fn set_universal_compaction_options(&mut self, uco: &UniversalCompactOptions) {
    unsafe {
        ffi::rocksdb_options_set_universal_compaction_options(self.inner, uco.inner);
    }
}
/// Applies FIFO-compaction-specific options.
pub fn set_fifo_compaction_options(&mut self, fco: &FifoCompactOptions) {
    unsafe {
        ffi::rocksdb_options_set_fifo_compaction_options(self.inner, fco.inner);
    }
}
/// Allows writes to be reordered for throughput (relaxes ordering
/// guarantees).
pub fn set_unordered_write(&mut self, unordered: bool) {
    unsafe {
        ffi::rocksdb_options_set_unordered_write(self.inner, c_uchar::from(unordered));
    }
}
/// Maximum number of threads a single compaction job may fan out to.
pub fn set_max_subcompactions(&mut self, num: u32) {
    unsafe {
        ffi::rocksdb_options_set_max_subcompactions(self.inner, num);
    }
}
/// Total background jobs (flushes + compactions).
pub fn set_max_background_jobs(&mut self, jobs: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_background_jobs(self.inner, jobs);
    }
}
#[deprecated(
    since = "0.15.0",
    note = "RocksDB automatically decides this based on the value of max_background_jobs"
)]
/// Deprecated: superseded by `set_max_background_jobs`.
pub fn set_max_background_compactions(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_background_compactions(self.inner, n);
    }
}
#[deprecated(
    since = "0.15.0",
    note = "RocksDB automatically decides this based on the value of max_background_jobs"
)]
/// Deprecated: superseded by `set_max_background_jobs`.
pub fn set_max_background_flushes(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_max_background_flushes(self.inner, n);
    }
}
/// Disables automatic background compactions.
pub fn set_disable_auto_compactions(&mut self, disable: bool) {
    unsafe {
        ffi::rocksdb_options_set_disable_auto_compactions(self.inner, c_int::from(disable));
    }
}
/// Huge-page size hint for memtable allocation (bytes).
pub fn set_memtable_huge_page_size(&mut self, size: size_t) {
    unsafe {
        ffi::rocksdb_options_set_memtable_huge_page_size(self.inner, size);
    }
}
/// Maximum merge-operand chain resolved at write time.
pub fn set_max_successive_merges(&mut self, num: usize) {
    unsafe {
        ffi::rocksdb_options_set_max_successive_merges(self.inner, num);
    }
}
/// Cache-line locality of bloom probes (0 = default bloom).
pub fn set_bloom_locality(&mut self, v: u32) {
    unsafe {
        ffi::rocksdb_options_set_bloom_locality(self.inner, v);
    }
}
/// Enables in-place memtable updates for same-size values.
pub fn set_inplace_update_support(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_inplace_update_support(self.inner, c_uchar::from(enabled));
    }
}
/// Number of locks used for in-place memtable updates.
pub fn set_inplace_update_locks(&mut self, num: usize) {
    unsafe {
        ffi::rocksdb_options_set_inplace_update_num_locks(self.inner, num);
    }
}
/// Per-level additive factors for level target sizes.
// NOTE(review): `cast_mut` hands the C API a mutable pointer into the
// caller's slice; this relies on the C side not writing through it.
pub fn set_max_bytes_for_level_multiplier_additional(&mut self, level_values: &[i32]) {
    let count = level_values.len();
    unsafe {
        ffi::rocksdb_options_set_max_bytes_for_level_multiplier_additional(
            self.inner,
            level_values.as_ptr().cast_mut(),
            count,
        );
    }
}
/// Total write-buffer history retained for conflict checking (bytes).
pub fn set_max_write_buffer_size_to_maintain(&mut self, size: i64) {
    unsafe {
        ffi::rocksdb_options_set_max_write_buffer_size_to_maintain(self.inner, size);
    }
}
/// Enables the two-stage (WAL then memtable) pipelined write path.
pub fn set_enable_pipelined_write(&mut self, value: bool) {
    unsafe {
        ffi::rocksdb_options_set_enable_pipelined_write(self.inner, c_uchar::from(value));
    }
}
/// Selects the memtable representation used for new memtables by
/// dispatching to the matching FFI setter for the chosen variant.
pub fn set_memtable_factory(&mut self, factory: MemtableFactory) {
    // SAFETY: `self.inner` is a valid options handle owned by `self`.
    unsafe {
        match factory {
            MemtableFactory::Vector => {
                ffi::rocksdb_options_set_memtable_vector_rep(self.inner);
            }
            MemtableFactory::HashSkipList {
                bucket_count,
                height,
                branching_factor,
            } => {
                ffi::rocksdb_options_set_hash_skip_list_rep(
                    self.inner,
                    bucket_count,
                    height,
                    branching_factor,
                );
            }
            MemtableFactory::HashLinkList { bucket_count } => {
                ffi::rocksdb_options_set_hash_link_list_rep(self.inner, bucket_count);
            }
        }
    }
}
/// Uses a block-based table factory with the given options; the options are
/// retained in `outlive` so the backing caches outlive the DB.
pub fn set_block_based_table_factory(&mut self, factory: &BlockBasedOptions) {
    unsafe {
        ffi::rocksdb_options_set_block_based_table_factory(self.inner, factory.inner);
    }
    self.outlive.block_based = Some(factory.outlive.clone());
}
/// Uses a cuckoo table factory with the given options.
pub fn set_cuckoo_table_factory(&mut self, factory: &CuckooTableOptions) {
    unsafe {
        ffi::rocksdb_options_set_cuckoo_table_factory(self.inner, factory.inner);
    }
}
/// Uses a plain table factory configured from `options`.
pub fn set_plain_table_factory(&mut self, options: &PlainTableFactoryOptions) {
    unsafe {
        ffi::rocksdb_options_set_plain_table_factory(
            self.inner,
            options.user_key_length,
            options.bloom_bits_per_key,
            options.hash_table_ratio,
            options.index_sparseness,
            options.huge_page_tlb_size,
            options.encoding_type as c_char,
            c_uchar::from(options.full_scan_mode),
            c_uchar::from(options.store_index_in_file),
        );
    }
}
/// Sets the first level at which data is compressed.
pub fn set_min_level_to_compress(&mut self, lvl: c_int) {
    unsafe {
        ffi::rocksdb_options_set_min_level_to_compress(self.inner, lvl);
    }
}
/// Enables or disables reporting of background I/O statistics.
pub fn set_report_bg_io_stats(&mut self, enable: bool) {
    unsafe {
        ffi::rocksdb_options_set_report_bg_io_stats(self.inner, c_int::from(enable));
    }
}
/// Sets the maximum total size of WAL files before older column families are flushed.
pub fn set_max_total_wal_size(&mut self, size: u64) {
    unsafe {
        ffi::rocksdb_options_set_max_total_wal_size(self.inner, size);
    }
}
/// Sets the WAL recovery mode used when opening the DB.
pub fn set_wal_recovery_mode(&mut self, mode: DBRecoveryMode) {
    unsafe {
        ffi::rocksdb_options_set_wal_recovery_mode(self.inner, mode as c_int);
    }
}
/// Turns on statistics collection for this options object.
pub fn enable_statistics(&mut self) {
    unsafe {
        ffi::rocksdb_options_enable_statistics(self.inner);
    }
}
/// Returns the statistics dump as a string, or `None` when statistics
/// collection is not enabled (the C API then returns a null pointer).
pub fn get_statistics(&self) -> Option<String> {
    let raw = unsafe { ffi::rocksdb_options_statistics_get_string(self.inner) };
    if raw.is_null() {
        None
    } else {
        // `from_cstr_and_free` copies the C string and releases the
        // allocation handed to us by RocksDB.
        Some(unsafe { from_cstr_and_free(raw) })
    }
}
/// Sets the granularity of collected statistics.
// NOTE(review): takes `&self` unlike the other setters — presumably because
// the underlying statistics object is internally synchronized; confirm.
pub fn set_statistics_level(&self, level: StatsLevel) {
    unsafe { ffi::rocksdb_options_set_statistics_level(self.inner, level as c_int) }
}
/// Reads the current value of a statistics ticker.
pub fn get_ticker_count(&self, ticker: Ticker) -> u64 {
    unsafe { ffi::rocksdb_options_statistics_get_ticker_count(self.inner, ticker as u32) }
}
/// Reads the data of a statistics histogram into a fresh `HistogramData`.
pub fn get_histogram_data(&self, histogram: Histogram) -> HistogramData {
    unsafe {
        let data = HistogramData::default();
        // The FFI call fills the C struct behind `data.inner` in place.
        ffi::rocksdb_options_statistics_get_histogram_data(
            self.inner,
            histogram as u32,
            data.inner,
        );
        data
    }
}
/// Sets how often (in seconds) statistics are dumped to the info log.
pub fn set_stats_dump_period_sec(&mut self, period: c_uint) {
    unsafe {
        ffi::rocksdb_options_set_stats_dump_period_sec(self.inner, period);
    }
}
/// Sets how often (in seconds) statistics are persisted.
pub fn set_stats_persist_period_sec(&mut self, period: c_uint) {
    unsafe {
        ffi::rocksdb_options_set_stats_persist_period_sec(self.inner, period);
    }
}
/// Sets the `advise_random_on_open` option.
pub fn set_advise_random_on_open(&mut self, advise: bool) {
    unsafe {
        ffi::rocksdb_options_set_advise_random_on_open(self.inner, c_uchar::from(advise));
    }
}
/// Enables or disables adaptive mutexes.
pub fn set_use_adaptive_mutex(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_use_adaptive_mutex(self.inner, c_uchar::from(enabled));
    }
}
/// Sets the number of LSM-tree levels.
pub fn set_num_levels(&mut self, n: c_int) {
    unsafe {
        ffi::rocksdb_options_set_num_levels(self.inner, n);
    }
}
/// Sets the memtable prefix bloom size ratio.
pub fn set_memtable_prefix_bloom_ratio(&mut self, ratio: f64) {
    unsafe {
        ffi::rocksdb_options_set_memtable_prefix_bloom_size_ratio(self.inner, ratio);
    }
}
/// Sets the maximum number of bytes in a single compaction.
pub fn set_max_compaction_bytes(&mut self, nbytes: u64) {
    unsafe {
        ffi::rocksdb_options_set_max_compaction_bytes(self.inner, nbytes);
    }
}
/// Sets the directory where WAL files are stored.
///
/// # Panics
/// Panics if the path cannot be converted to a C string (e.g. contains an
/// interior NUL byte).
pub fn set_wal_dir<P: AsRef<Path>>(&mut self, path: P) {
    let p = to_cpath(path).unwrap();
    unsafe {
        ffi::rocksdb_options_set_wal_dir(self.inner, p.as_ptr());
    }
}
/// Sets the WAL time-to-live in seconds.
pub fn set_wal_ttl_seconds(&mut self, secs: u64) {
    unsafe {
        ffi::rocksdb_options_set_WAL_ttl_seconds(self.inner, secs);
    }
}
/// Sets the WAL size limit in megabytes.
pub fn set_wal_size_limit_mb(&mut self, size: u64) {
    unsafe {
        ffi::rocksdb_options_set_WAL_size_limit_MB(self.inner, size);
    }
}
/// Sets the number of bytes preallocated for the MANIFEST file.
pub fn set_manifest_preallocation_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_manifest_preallocation_size(self.inner, size);
    }
}
/// Sets the `skip_stats_update_on_db_open` option.
pub fn set_skip_stats_update_on_db_open(&mut self, skip: bool) {
    unsafe {
        ffi::rocksdb_options_set_skip_stats_update_on_db_open(self.inner, c_uchar::from(skip));
    }
}
/// Sets how many info log files to keep.
pub fn set_keep_log_file_num(&mut self, nfiles: usize) {
    unsafe {
        ffi::rocksdb_options_set_keep_log_file_num(self.inner, nfiles);
    }
}
/// Enables or disables mmap-based writes.
pub fn set_allow_mmap_writes(&mut self, is_enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_allow_mmap_writes(self.inner, c_uchar::from(is_enabled));
    }
}
/// Enables or disables mmap-based reads.
pub fn set_allow_mmap_reads(&mut self, is_enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_allow_mmap_reads(self.inner, c_uchar::from(is_enabled));
    }
}
/// Enables or disables manual WAL flushing.
pub fn set_manual_wal_flush(&mut self, is_enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_manual_wal_flush(self.inner, c_uchar::from(is_enabled));
    }
}
/// Enables or disables atomic flush across column families.
pub fn set_atomic_flush(&mut self, atomic_flush: bool) {
    unsafe {
        ffi::rocksdb_options_set_atomic_flush(self.inner, c_uchar::from(atomic_flush));
    }
}
/// Sets the row cache; the cache handle is retained in `outlive` so it
/// outlives the DB.
pub fn set_row_cache(&mut self, cache: &Cache) {
    unsafe {
        ffi::rocksdb_options_set_row_cache(self.inner, cache.0.inner.as_ptr());
    }
    self.outlive.row_cache = Some(cache.clone());
}
/// Installs a generic rate limiter with the given parameters.
pub fn set_ratelimiter(
    &mut self,
    rate_bytes_per_sec: i64,
    refill_period_us: i64,
    fairness: i32,
) {
    unsafe {
        let ratelimiter =
            ffi::rocksdb_ratelimiter_create(rate_bytes_per_sec, refill_period_us, fairness);
        ffi::rocksdb_options_set_ratelimiter(self.inner, ratelimiter);
        // The options hold their own reference; release ours.
        ffi::rocksdb_ratelimiter_destroy(ratelimiter);
    }
}
/// Installs an auto-tuned rate limiter with the given parameters.
pub fn set_auto_tuned_ratelimiter(
    &mut self,
    rate_bytes_per_sec: i64,
    refill_period_us: i64,
    fairness: i32,
) {
    unsafe {
        let ratelimiter = ffi::rocksdb_ratelimiter_create_auto_tuned(
            rate_bytes_per_sec,
            refill_period_us,
            fairness,
        );
        ffi::rocksdb_options_set_ratelimiter(self.inner, ratelimiter);
        // The options hold their own reference; release ours.
        ffi::rocksdb_ratelimiter_destroy(ratelimiter);
    }
}
/// Installs a rate limiter with an explicit I/O mode and auto-tune flag.
pub fn set_ratelimiter_with_mode(
    &mut self,
    rate_bytes_per_sec: i64,
    refill_period_us: i64,
    fairness: i32,
    mode: RateLimiterMode,
    auto_tuned: bool,
) {
    unsafe {
        let ratelimiter = ffi::rocksdb_ratelimiter_create_with_mode(
            rate_bytes_per_sec,
            refill_period_us,
            fairness,
            mode as c_int,
            auto_tuned,
        );
        ffi::rocksdb_options_set_ratelimiter(self.inner, ratelimiter);
        // The options hold their own reference; release ours.
        ffi::rocksdb_ratelimiter_destroy(ratelimiter);
    }
}
/// Sets the maximum size of a single info log file.
pub fn set_max_log_file_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_max_log_file_size(self.inner, size);
    }
}
/// Sets the time (in seconds) after which the info log file is rolled.
pub fn set_log_file_time_to_roll(&mut self, secs: usize) {
    unsafe {
        ffi::rocksdb_options_set_log_file_time_to_roll(self.inner, secs);
    }
}
/// Sets how many info log files to recycle.
pub fn set_recycle_log_file_num(&mut self, num: usize) {
    unsafe {
        ffi::rocksdb_options_set_recycle_log_file_num(self.inner, num);
    }
}
/// Routes the info log to stderr with the given level and line prefix.
///
/// # Panics
/// Panics if `prefix` contains an interior NUL byte.
pub fn set_stderr_logger(&mut self, log_level: LogLevel, prefix: impl CStrLike) {
    let p = prefix.into_c_string().unwrap();
    unsafe {
        let logger = ffi::rocksdb_logger_create_stderr_logger(log_level as c_int, p.as_ptr());
        ffi::rocksdb_options_set_info_log(self.inner, logger);
        // The options hold their own reference; release ours.
        ffi::rocksdb_logger_destroy(logger);
    }
}
/// Routes the info log to a Rust callback.
///
/// The callback holder is stored in `outlive.log_callback` so the raw
/// pointer handed to C stays valid for as long as RocksDB may call it.
pub fn set_callback_logger(
    &mut self,
    log_level: LogLevel,
    callback: impl Fn(LogLevel, &str) + 'static + Send + Sync,
) {
    let holder = Arc::new(LogCallback {
        callback: Box::new(callback),
    });
    // Raw pointer to the Arc's payload; kept alive via `outlive` below.
    let holder_ptr = holder.as_ref() as *const LogCallback;
    let holder_cvoid = holder_ptr.cast::<c_void>().cast_mut();
    unsafe {
        let logger = ffi::rocksdb_logger_create_callback_logger(
            log_level as c_int,
            Some(Self::logger_callback),
            holder_cvoid,
        );
        ffi::rocksdb_options_set_info_log(self.inner, logger);
        // The options hold their own reference; release ours.
        ffi::rocksdb_logger_destroy(logger);
    }
    self.outlive.log_callback = Some(holder);
}
extern "C" fn logger_callback(func: *mut c_void, level: u32, msg: *mut c_char, len: usize) {
use std::{mem, process, str};
let level = unsafe { mem::transmute::<u32, LogLevel>(level) };
let slice = unsafe { slice::from_raw_parts_mut(msg.cast::<u8>(), len) };
let msg = unsafe { str::from_utf8_unchecked(slice) };
let holder = unsafe { &mut *func.cast::<LogCallback>() };
let mut callback_in_catch_unwind = AssertUnwindSafe(&mut holder.callback);
if catch_unwind(move || callback_in_catch_unwind(level, msg)).is_err() {
process::abort();
}
}
/// Sets the soft limit on pending compaction bytes (write slowdown trigger).
pub fn set_soft_pending_compaction_bytes_limit(&mut self, limit: usize) {
    unsafe {
        ffi::rocksdb_options_set_soft_pending_compaction_bytes_limit(self.inner, limit);
    }
}
/// Sets the hard limit on pending compaction bytes (write stop trigger).
pub fn set_hard_pending_compaction_bytes_limit(&mut self, limit: usize) {
    unsafe {
        ffi::rocksdb_options_set_hard_pending_compaction_bytes_limit(self.inner, limit);
    }
}
/// Sets the arena block size used by memtables.
pub fn set_arena_block_size(&mut self, size: usize) {
    unsafe {
        ffi::rocksdb_options_set_arena_block_size(self.inner, size);
    }
}
/// Enables or disables dumping of malloc stats into the info log.
pub fn set_dump_malloc_stats(&mut self, enabled: bool) {
    unsafe {
        ffi::rocksdb_options_set_dump_malloc_stats(self.inner, c_uchar::from(enabled));
    }
}
/// Enables or disables whole-key filtering in the memtable bloom filter.
pub fn set_memtable_whole_key_filtering(&mut self, whole_key_filter: bool) {
    unsafe {
        ffi::rocksdb_options_set_memtable_whole_key_filtering(
            self.inner,
            c_uchar::from(whole_key_filter),
        );
    }
}
/// Enables or disables BlobDB-style separate blob files.
pub fn set_enable_blob_files(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_enable_blob_files(self.inner, u8::from(val));
    }
}
/// Sets the minimum value size to be stored in a blob file.
pub fn set_min_blob_size(&mut self, val: u64) {
    unsafe {
        ffi::rocksdb_options_set_min_blob_size(self.inner, val);
    }
}
/// Sets the target blob file size.
pub fn set_blob_file_size(&mut self, val: u64) {
    unsafe {
        ffi::rocksdb_options_set_blob_file_size(self.inner, val);
    }
}
/// Sets the compression type used for blob files.
pub fn set_blob_compression_type(&mut self, val: DBCompressionType) {
    unsafe {
        ffi::rocksdb_options_set_blob_compression_type(self.inner, val as _);
    }
}
/// Enables or disables blob garbage collection.
pub fn set_enable_blob_gc(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_enable_blob_gc(self.inner, u8::from(val));
    }
}
/// Sets the blob GC age cutoff.
pub fn set_blob_gc_age_cutoff(&mut self, val: c_double) {
    unsafe {
        ffi::rocksdb_options_set_blob_gc_age_cutoff(self.inner, val);
    }
}
/// Sets the blob GC force threshold.
pub fn set_blob_gc_force_threshold(&mut self, val: c_double) {
    unsafe {
        ffi::rocksdb_options_set_blob_gc_force_threshold(self.inner, val);
    }
}
/// Sets the readahead size used when compacting blob files.
pub fn set_blob_compaction_readahead_size(&mut self, val: u64) {
    unsafe {
        ffi::rocksdb_options_set_blob_compaction_readahead_size(self.inner, val);
    }
}
/// Sets the blob cache; the cache handle is retained in `outlive` so it
/// outlives the DB.
pub fn set_blob_cache(&mut self, cache: &Cache) {
    unsafe {
        ffi::rocksdb_options_set_blob_cache(self.inner, cache.0.inner.as_ptr());
    }
    self.outlive.blob_cache = Some(cache.clone());
}
/// Sets the `allow_ingest_behind` option.
pub fn set_allow_ingest_behind(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_allow_ingest_behind(self.inner, c_uchar::from(val));
    }
}
/// Adds a compact-on-deletion collector factory with a deletion-ratio trigger.
pub fn add_compact_on_deletion_collector_factory(
    &mut self,
    window_size: size_t,
    num_dels_trigger: size_t,
    deletion_ratio: f64,
) {
    unsafe {
        ffi::rocksdb_options_add_compact_on_deletion_collector_factory_del_ratio(
            self.inner,
            window_size,
            num_dels_trigger,
            deletion_ratio,
        );
    }
}
/// Adds a compact-on-deletion collector factory with an additional minimum
/// file size constraint.
pub fn add_compact_on_deletion_collector_factory_min_file_size(
    &mut self,
    window_size: size_t,
    num_dels_trigger: size_t,
    deletion_ratio: f64,
    min_file_size: u64,
) {
    unsafe {
        ffi::rocksdb_options_add_compact_on_deletion_collector_factory_min_file_size(
            self.inner,
            window_size,
            num_dels_trigger,
            deletion_ratio,
            min_file_size,
        );
    }
}
/// Shares a write buffer manager with these options; the manager handle is
/// retained in `outlive` so it outlives the DB.
pub fn set_write_buffer_manager(&mut self, write_buffer_manager: &WriteBufferManager) {
    unsafe {
        ffi::rocksdb_options_set_write_buffer_manager(
            self.inner,
            write_buffer_manager.0.inner.as_ptr(),
        );
    }
    self.outlive.write_buffer_manager = Some(write_buffer_manager.clone());
}
/// Shares an SST file manager with these options; the manager handle is
/// retained in `outlive` so it outlives the DB.
pub fn set_sst_file_manager(&mut self, sst_file_manager: &SstFileManager) {
    unsafe {
        ffi::rocksdb_options_set_sst_file_manager(
            self.inner,
            sst_file_manager.0.inner.as_ptr(),
        );
    }
    self.outlive.sst_file_manager = Some(sst_file_manager.clone());
}
/// Sets the `avoid_unnecessary_blocking_io` option.
pub fn set_avoid_unnecessary_blocking_io(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_avoid_unnecessary_blocking_io(self.inner, u8::from(val));
    }
}
/// Sets the compaction priority policy.
pub fn set_compaction_pri(&mut self, pri: DBCompactionPri) {
    unsafe {
        ffi::rocksdb_options_set_compaction_pri(self.inner, pri as c_int);
    }
}
/// Sets the `track_and_verify_wals_in_manifest` option.
pub fn set_track_and_verify_wals_in_manifest(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_track_and_verify_wals_in_manifest(self.inner, u8::from(val));
    }
}
/// Reads back the `track_and_verify_wals_in_manifest` option.
pub fn get_track_and_verify_wals_in_manifest(&self) -> bool {
    let val_u8 =
        unsafe { ffi::rocksdb_options_get_track_and_verify_wals_in_manifest(self.inner) };
    val_u8 != 0
}
/// Sets the `write_dbid_to_manifest` option.
pub fn set_write_dbid_to_manifest(&mut self, val: bool) {
    unsafe {
        ffi::rocksdb_options_set_write_dbid_to_manifest(self.inner, u8::from(val));
    }
}
/// Reads back the `write_dbid_to_manifest` option.
pub fn get_write_dbid_to_manifest(&self) -> bool {
    let val_u8 = unsafe { ffi::rocksdb_options_get_write_dbid_to_manifest(self.inner) };
    val_u8 != 0
}
/// Installs a pre-built info logger, taking over its Rust callback holder
/// (if any) so the callback outlives the DB.
pub fn set_info_logger(&mut self, mut logger: InfoLogger) {
    self.outlive.logger_callback = logger.callback.take();
    unsafe {
        ffi::rocksdb_options_set_info_log(self.inner, logger.inner);
    }
}
/// Returns the currently configured info logger.
// NOTE(review): the returned `InfoLogger` destroys its raw handle on drop —
// presumably `rocksdb_options_get_info_log` returns an owned reference;
// verify against the C API to rule out a double free.
pub fn get_info_logger(&self) -> InfoLogger {
    let raw = unsafe { ffi::rocksdb_options_get_info_log(self.inner) };
    InfoLogger {
        inner: raw,
        callback: self.outlive.logger_callback.clone(),
    }
}
}
impl Default for Options {
    /// Creates a fresh options handle with RocksDB's built-in defaults.
    fn default() -> Self {
        let inner = unsafe { ffi::rocksdb_options_create() };
        assert!(!inner.is_null(), "Could not create RocksDB options");
        Self {
            inner,
            outlive: OptionsMustOutliveDB::default(),
        }
    }
}
impl FlushOptions {
    /// Creates flush options with default values.
    pub fn new() -> FlushOptions {
        FlushOptions::default()
    }
    /// Sets whether the flush call blocks until the flush completes.
    pub fn set_wait(&mut self, wait: bool) {
        unsafe {
            ffi::rocksdb_flushoptions_set_wait(self.inner, c_uchar::from(wait));
        }
    }
}
impl Default for FlushOptions {
fn default() -> Self {
let flush_opts = unsafe { ffi::rocksdb_flushoptions_create() };
assert!(
!flush_opts.is_null(),
"Could not create RocksDB flush options"
);
Self { inner: flush_opts }
}
}
impl WriteOptions {
    /// Creates write options with default values.
    pub fn new() -> WriteOptions {
        WriteOptions::default()
    }
    /// Sets whether the write is synced to storage before returning.
    pub fn set_sync(&mut self, sync: bool) {
        unsafe {
            ffi::rocksdb_writeoptions_set_sync(self.inner, c_uchar::from(sync));
        }
    }
    /// Disables (or re-enables) the write-ahead log for these writes.
    pub fn disable_wal(&mut self, disable: bool) {
        unsafe {
            // This particular C setter takes an int rather than an unsigned char.
            ffi::rocksdb_writeoptions_disable_WAL(self.inner, c_int::from(disable));
        }
    }
    /// Sets whether writes to missing column families are ignored.
    pub fn set_ignore_missing_column_families(&mut self, ignore: bool) {
        unsafe {
            ffi::rocksdb_writeoptions_set_ignore_missing_column_families(
                self.inner,
                c_uchar::from(ignore),
            );
        }
    }
    /// Sets the `no_slowdown` option.
    pub fn set_no_slowdown(&mut self, no_slowdown: bool) {
        unsafe {
            ffi::rocksdb_writeoptions_set_no_slowdown(self.inner, c_uchar::from(no_slowdown));
        }
    }
    /// Sets the `low_pri` option.
    pub fn set_low_pri(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_writeoptions_set_low_pri(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the `memtable_insert_hint_per_batch` option.
    pub fn set_memtable_insert_hint_per_batch(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_writeoptions_set_memtable_insert_hint_per_batch(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
}
impl Default for WriteOptions {
fn default() -> Self {
let write_opts = unsafe { ffi::rocksdb_writeoptions_create() };
assert!(
!write_opts.is_null(),
"Could not create RocksDB write options"
);
Self { inner: write_opts }
}
}
impl LruCacheOptions {
    /// Sets the total cache capacity in bytes.
    pub fn set_capacity(&mut self, cap: usize) {
        unsafe {
            ffi::rocksdb_lru_cache_options_set_capacity(self.inner, cap);
        }
    }
    /// Sets the number of shard bits (cache is split into 2^bits shards).
    pub fn set_num_shard_bits(&mut self, val: c_int) {
        unsafe {
            ffi::rocksdb_lru_cache_options_set_num_shard_bits(self.inner, val);
        }
    }
}
impl Default for LruCacheOptions {
    /// Creates an LRU cache options handle with RocksDB's built-in defaults.
    fn default() -> Self {
        let handle = unsafe { ffi::rocksdb_lru_cache_options_create() };
        assert!(
            !handle.is_null(),
            "Could not create RocksDB LRU cache options"
        );
        Self { inner: handle }
    }
}
/// Which data tiers a read is allowed to touch.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[repr(i32)]
pub enum ReadTier {
    /// Read from all tiers.
    All = 0,
    /// Read only from the block cache.
    BlockCache,
    /// Read only persisted data.
    Persisted,
    /// Read only from the memtable.
    Memtable,
}
impl ReadOptions {
    /// Sets whether blocks read by iterators are inserted into the block cache.
    pub fn fill_cache(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_fill_cache(self.inner, c_uchar::from(v));
        }
    }
    /// Reads as of the given snapshot instead of the latest state.
    pub fn set_snapshot<D: DBAccess>(&mut self, snapshot: &SnapshotWithThreadMode<D>) {
        unsafe {
            ffi::rocksdb_readoptions_set_snapshot(self.inner, snapshot.inner);
        }
    }
    /// Sets the inclusive lower bound for iteration.
    pub fn set_iterate_lower_bound<K: Into<Vec<u8>>>(&mut self, key: K) {
        self.set_lower_bound_impl(Some(key.into()));
    }
    /// Sets the exclusive upper bound for iteration.
    pub fn set_iterate_upper_bound<K: Into<Vec<u8>>>(&mut self, key: K) {
        self.set_upper_bound_impl(Some(key.into()));
    }
    /// Sets both iteration bounds from a range-like value.
    pub fn set_iterate_range(&mut self, range: impl crate::IterateBounds) {
        let (lower, upper) = range.into_bounds();
        self.set_lower_bound_impl(lower);
        self.set_upper_bound_impl(upper);
    }
    /// Stores the lower bound and forwards it to the C API. Passing `None`
    /// clears a previously set bound; if none was ever set, the FFI call is
    /// skipped entirely.
    fn set_lower_bound_impl(&mut self, bound: Option<Vec<u8>>) {
        let (ptr, len) = if let Some(ref bound) = bound {
            // The Vec is kept alive in `self.iterate_lower_bound`, so the
            // pointer handed to C stays valid.
            (bound.as_ptr() as *const c_char, bound.len())
        } else if self.iterate_lower_bound.is_some() {
            (std::ptr::null(), 0)
        } else {
            return;
        };
        self.iterate_lower_bound = bound;
        unsafe {
            ffi::rocksdb_readoptions_set_iterate_lower_bound(self.inner, ptr, len);
        }
    }
    /// Stores the upper bound and forwards it to the C API; same clearing
    /// semantics as `set_lower_bound_impl`.
    fn set_upper_bound_impl(&mut self, bound: Option<Vec<u8>>) {
        let (ptr, len) = if let Some(ref bound) = bound {
            (bound.as_ptr() as *const c_char, bound.len())
        } else if self.iterate_upper_bound.is_some() {
            (std::ptr::null(), 0)
        } else {
            return;
        };
        self.iterate_upper_bound = bound;
        unsafe {
            ffi::rocksdb_readoptions_set_iterate_upper_bound(self.inner, ptr, len);
        }
    }
}
    /// Restricts which data tiers the read may touch.
    pub fn set_read_tier(&mut self, tier: ReadTier) {
        unsafe {
            ffi::rocksdb_readoptions_set_read_tier(self.inner, tier as c_int);
        }
    }
    /// Sets the `prefix_same_as_start` option.
    pub fn set_prefix_same_as_start(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_prefix_same_as_start(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the `total_order_seek` option.
    pub fn set_total_order_seek(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_total_order_seek(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the maximum number of internal keys an iterator may skip.
    pub fn set_max_skippable_internal_keys(&mut self, num: u64) {
        unsafe {
            ffi::rocksdb_readoptions_set_max_skippable_internal_keys(self.inner, num);
        }
    }
    /// Sets the `background_purge_on_iterator_cleanup` option.
    pub fn set_background_purge_on_iterator_cleanup(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_background_purge_on_iterator_cleanup(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the `ignore_range_deletions` option.
    #[deprecated(
        note = "deprecated in RocksDB 10.2.1: no performance impact if DeleteRange is not used"
    )]
    pub fn set_ignore_range_deletions(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_ignore_range_deletions(self.inner, c_uchar::from(v));
        }
    }
    /// Sets whether checksums are verified on read.
    pub fn set_verify_checksums(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_verify_checksums(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the readahead size for iteration.
    pub fn set_readahead_size(&mut self, v: usize) {
        unsafe {
            ffi::rocksdb_readoptions_set_readahead_size(self.inner, v as size_t);
        }
    }
    /// Sets the `auto_readahead_size` option.
    pub fn set_auto_readahead_size(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_auto_readahead_size(self.inner, c_uchar::from(v));
        }
    }
    /// Sets whether the iterator tails the live data.
    pub fn set_tailing(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_tailing(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the `pin_data` option.
    pub fn set_pin_data(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_pin_data(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the `async_io` option.
    pub fn set_async_io(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_readoptions_set_async_io(self.inner, c_uchar::from(v));
        }
    }
    /// Sets a deadline (in microseconds) for the read.
    pub fn set_deadline(&mut self, microseconds: u64) {
        unsafe {
            ffi::rocksdb_readoptions_set_deadline(self.inner, microseconds);
        }
    }
    /// Sets an I/O timeout (in microseconds) for the read.
    pub fn set_io_timeout(&mut self, microseconds: u64) {
        unsafe {
            ffi::rocksdb_readoptions_set_io_timeout(self.inner, microseconds);
        }
    }
    /// Sets the timestamp to read as of.
    pub fn set_timestamp<S: Into<Vec<u8>>>(&mut self, ts: S) {
        self.set_timestamp_impl(Some(ts.into()));
    }
    /// Stores the timestamp and forwards it to the C API. Passing `None`
    /// clears a previously set timestamp; if none was ever set, the FFI call
    /// is skipped entirely.
    fn set_timestamp_impl(&mut self, ts: Option<Vec<u8>>) {
        let (ptr, len) = if let Some(ref ts) = ts {
            // The Vec is kept alive in `self.timestamp`, so the pointer
            // handed to C stays valid.
            (ts.as_ptr() as *const c_char, ts.len())
        } else if self.timestamp.is_some() {
            (std::ptr::null(), 0)
        } else {
            return;
        };
        self.timestamp = ts;
        unsafe {
            ffi::rocksdb_readoptions_set_timestamp(self.inner, ptr, len);
        }
    }
pub fn set_iter_start_ts<S: Into<Vec<u8>>>(&mut self, ts: S) {
self.set_iter_start_ts_impl(Some(ts.into()));
}
fn set_iter_start_ts_impl(&mut self, ts: Option<Vec<u8>>) {
let (ptr, len) = if let Some(ref ts) = ts {
(ts.as_ptr() as *const c_char, ts.len())
} else if self.timestamp.is_some() {
(std::ptr::null(), 0)
} else {
return;
};
self.iter_start_ts = ts;
unsafe {
ffi::rocksdb_readoptions_set_iter_start_ts(self.inner, ptr, len);
}
}
}
impl Default for ReadOptions {
    /// Creates a read-options handle with RocksDB's built-in defaults and no
    /// bounds or timestamps set.
    fn default() -> Self {
        let inner = unsafe { ffi::rocksdb_readoptions_create() };
        // Consistency with the other `Default` impls in this file: fail fast
        // instead of handing out a null handle that would crash on first use.
        assert!(!inner.is_null(), "Could not create RocksDB read options");
        Self {
            inner,
            timestamp: None,
            iter_start_ts: None,
            iterate_upper_bound: None,
            iterate_lower_bound: None,
        }
    }
}
impl IngestExternalFileOptions {
    /// Sets whether ingested files are moved instead of copied.
    pub fn set_move_files(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_set_move_files(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the `snapshot_consistency` option.
    pub fn set_snapshot_consistency(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_set_snapshot_consistency(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the `allow_global_seqno` option.
    pub fn set_allow_global_seqno(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_set_allow_global_seqno(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the `allow_blocking_flush` option.
    pub fn set_allow_blocking_flush(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_set_allow_blocking_flush(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the `ingest_behind` option.
    pub fn set_ingest_behind(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_ingestexternalfileoptions_set_ingest_behind(self.inner, c_uchar::from(v));
        }
    }
}
impl Default for IngestExternalFileOptions {
    /// Creates an ingest-options handle with RocksDB's built-in defaults.
    fn default() -> Self {
        let inner = unsafe { ffi::rocksdb_ingestexternalfileoptions_create() };
        // Consistency with the other `Default` impls in this file: fail fast
        // instead of handing out a null handle that would crash on first use.
        assert!(
            !inner.is_null(),
            "Could not create RocksDB ingest external file options"
        );
        Self { inner }
    }
}
/// Index type used inside a block-based table.
pub enum BlockBasedIndexType {
    /// Binary search over restart points.
    BinarySearch,
    /// Hash index on top of the binary-search index.
    HashSearch,
    /// Two-level partitioned index.
    TwoLevelIndexSearch,
}
/// Index type used inside a data block.
#[repr(C)]
pub enum DataBlockIndexType {
    BinarySearch = 0,
    BinaryAndHash = 1,
}
/// Memtable representation selected by `Options::set_memtable_factory`.
pub enum MemtableFactory {
    Vector,
    HashSkipList {
        bucket_count: usize,
        height: i32,
        branching_factor: i32,
    },
    HashLinkList {
        bucket_count: usize,
    },
}
/// Checksum algorithm used for table blocks.
pub enum ChecksumType {
    NoChecksum = 0,
    CRC32c = 1,
    XXHash = 2,
    XXHash64 = 3,
    XXH3 = 4,
}
/// Key encoding used by the plain table format.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum KeyEncodingType {
    /// Keys stored verbatim.
    #[default]
    Plain = 0,
    /// Keys stored with shared prefixes elided.
    Prefix = 1,
}
/// Parameters passed to `Options::set_plain_table_factory`.
pub struct PlainTableFactoryOptions {
    pub user_key_length: u32,
    pub bloom_bits_per_key: i32,
    pub hash_table_ratio: f64,
    pub index_sparseness: usize,
    pub huge_page_tlb_size: usize,
    pub encoding_type: KeyEncodingType,
    pub full_scan_mode: bool,
    pub store_index_in_file: bool,
}
/// Compression algorithms understood by RocksDB.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum DBCompressionType {
    None = ffi::rocksdb_no_compression as isize,
    Snappy = ffi::rocksdb_snappy_compression as isize,
    Zlib = ffi::rocksdb_zlib_compression as isize,
    Bz2 = ffi::rocksdb_bz2_compression as isize,
    Lz4 = ffi::rocksdb_lz4_compression as isize,
    Lz4hc = ffi::rocksdb_lz4hc_compression as isize,
    Zstd = ffi::rocksdb_zstd_compression as isize,
}
/// Compaction style for the LSM tree.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum DBCompactionStyle {
    Level = ffi::rocksdb_level_compaction as isize,
    Universal = ffi::rocksdb_universal_compaction as isize,
    Fifo = ffi::rocksdb_fifo_compaction as isize,
}
/// WAL recovery behavior on DB open.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum DBRecoveryMode {
    TolerateCorruptedTailRecords = ffi::rocksdb_tolerate_corrupted_tail_records_recovery as isize,
    AbsoluteConsistency = ffi::rocksdb_absolute_consistency_recovery as isize,
    PointInTime = ffi::rocksdb_point_in_time_recovery as isize,
    SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize,
}
/// Which I/O operations a rate limiter throttles.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum RateLimiterMode {
    KReadsOnly = 0,
    KWritesOnly = 1,
    KAllIo = 2,
}
/// Priority policy for choosing files to compact.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum DBCompactionPri {
    ByCompensatedSize = ffi::rocksdb_k_by_compensated_size_compaction_pri as isize,
    OldestLargestSeqFirst = ffi::rocksdb_k_oldest_largest_seq_first_compaction_pri as isize,
    OldestSmallestSeqFirst = ffi::rocksdb_k_oldest_smallest_seq_first_compaction_pri as isize,
    MinOverlappingRatio = ffi::rocksdb_k_min_overlapping_ratio_compaction_pri as isize,
    RoundRobin = ffi::rocksdb_k_round_robin_compaction_pri as isize,
}
/// Pinning tier for block-based table blocks.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum BlockBasedPinningTier {
    Fallback = ffi::rocksdb_block_based_k_fallback_pinning_tier as isize,
    None = ffi::rocksdb_block_based_k_none_pinning_tier as isize,
    FlushAndSimilar = ffi::rocksdb_block_based_k_flush_and_similar_pinning_tier as isize,
    All = ffi::rocksdb_block_based_k_all_pinning_tier as isize,
}
/// Owned handle to RocksDB FIFO-compaction options.
pub struct FifoCompactOptions {
    pub(crate) inner: *mut ffi::rocksdb_fifo_compaction_options_t,
}
impl Default for FifoCompactOptions {
    /// Creates a FIFO-compaction options handle with built-in defaults.
    fn default() -> Self {
        let opts = unsafe { ffi::rocksdb_fifo_compaction_options_create() };
        assert!(
            !opts.is_null(),
            "Could not create RocksDB Fifo Compaction Options"
        );
        Self { inner: opts }
    }
}
impl Drop for FifoCompactOptions {
    /// Releases the underlying C handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_fifo_compaction_options_destroy(self.inner);
        }
    }
}
impl FifoCompactOptions {
    /// Sets the maximum total size of table files before the oldest are dropped.
    pub fn set_max_table_files_size(&mut self, nbytes: u64) {
        unsafe {
            ffi::rocksdb_fifo_compaction_options_set_max_table_files_size(self.inner, nbytes);
        }
    }
}
/// Stop criterion for universal compaction candidate selection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
pub enum UniversalCompactionStopStyle {
    Similar = ffi::rocksdb_similar_size_compaction_stop_style as isize,
    Total = ffi::rocksdb_total_size_compaction_stop_style as isize,
}
/// Owned handle to RocksDB universal-compaction options.
pub struct UniversalCompactOptions {
    pub(crate) inner: *mut ffi::rocksdb_universal_compaction_options_t,
}
impl Default for UniversalCompactOptions {
    /// Creates a universal-compaction options handle with built-in defaults.
    fn default() -> Self {
        let opts = unsafe { ffi::rocksdb_universal_compaction_options_create() };
        assert!(
            !opts.is_null(),
            "Could not create RocksDB Universal Compaction Options"
        );
        Self { inner: opts }
    }
}
impl Drop for UniversalCompactOptions {
    /// Releases the underlying C handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_destroy(self.inner);
        }
    }
}
impl UniversalCompactOptions {
    /// Sets the `size_ratio` parameter.
    pub fn set_size_ratio(&mut self, ratio: c_int) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_size_ratio(self.inner, ratio);
        }
    }
    /// Sets the minimum number of files in a single compaction run.
    pub fn set_min_merge_width(&mut self, num: c_int) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_min_merge_width(self.inner, num);
        }
    }
    /// Sets the maximum number of files in a single compaction run.
    pub fn set_max_merge_width(&mut self, num: c_int) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_max_merge_width(self.inner, num);
        }
    }
    /// Sets the maximum size amplification percentage.
    pub fn set_max_size_amplification_percent(&mut self, v: c_int) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_max_size_amplification_percent(
                self.inner, v,
            );
        }
    }
    /// Sets the `compression_size_percent` parameter.
    pub fn set_compression_size_percent(&mut self, v: c_int) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_compression_size_percent(self.inner, v);
        }
    }
    /// Sets the stop style used when picking compaction candidates.
    pub fn set_stop_style(&mut self, style: UniversalCompactionStopStyle) {
        unsafe {
            ffi::rocksdb_universal_compaction_options_set_stop_style(self.inner, style as c_int);
        }
    }
}
/// How the bottommost level is handled during a manual compaction.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
#[repr(u8)]
pub enum BottommostLevelCompaction {
    Skip = 0,
    IfHaveCompactionFilter,
    Force,
    ForceOptimized,
}
/// Owned handle to RocksDB manual-compaction options; the optional timestamp
/// backing store keeps the pointer handed to C alive.
pub struct CompactOptions {
    pub(crate) inner: *mut ffi::rocksdb_compactoptions_t,
    full_history_ts_low: Option<Vec<u8>>,
}
impl Default for CompactOptions {
    /// Creates a compact-options handle with built-in defaults.
    fn default() -> Self {
        let opts = unsafe { ffi::rocksdb_compactoptions_create() };
        assert!(!opts.is_null(), "Could not create RocksDB Compact Options");
        Self {
            inner: opts,
            full_history_ts_low: None,
        }
    }
}
impl Drop for CompactOptions {
    /// Releases the underlying C handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_compactoptions_destroy(self.inner);
        }
    }
}
impl CompactOptions {
    /// Sets whether the manual compaction is exclusive.
    pub fn set_exclusive_manual_compaction(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_compactoptions_set_exclusive_manual_compaction(
                self.inner,
                c_uchar::from(v),
            );
        }
    }
    /// Sets the bottommost-level compaction policy.
    pub fn set_bottommost_level_compaction(&mut self, lvl: BottommostLevelCompaction) {
        unsafe {
            ffi::rocksdb_compactoptions_set_bottommost_level_compaction(self.inner, lvl as c_uchar);
        }
    }
    /// Sets whether compacted files may be moved to a different level.
    pub fn set_change_level(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_compactoptions_set_change_level(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the target level for compacted files.
    pub fn set_target_level(&mut self, lvl: c_int) {
        unsafe {
            ffi::rocksdb_compactoptions_set_target_level(self.inner, lvl);
        }
    }
    /// Sets the `full_history_ts_low` timestamp.
    pub fn set_full_history_ts_low<S: Into<Vec<u8>>>(&mut self, ts: S) {
        self.set_full_history_ts_low_impl(Some(ts.into()));
    }
fn set_full_history_ts_low_impl(&mut self, ts: Option<Vec<u8>>) {
let (ptr, len) = if let Some(ref ts) = ts {
(ts.as_ptr() as *mut c_char, ts.len())
} else if self.full_history_ts_low.is_some() {
(std::ptr::null::<Vec<u8>>() as *mut c_char, 0)
} else {
return;
};
self.full_history_ts_low = ts;
unsafe {
ffi::rocksdb_compactoptions_set_full_history_ts_low(self.inner, ptr, len);
}
}
}
/// Owned handle to RocksDB wait-for-compact options.
pub struct WaitForCompactOptions {
    pub(crate) inner: *mut ffi::rocksdb_wait_for_compact_options_t,
}
impl Default for WaitForCompactOptions {
    /// Creates a wait-for-compact options handle with built-in defaults.
    fn default() -> Self {
        let opts = unsafe { ffi::rocksdb_wait_for_compact_options_create() };
        assert!(
            !opts.is_null(),
            "Could not create RocksDB Wait For Compact Options"
        );
        Self { inner: opts }
    }
}
impl Drop for WaitForCompactOptions {
    /// Releases the underlying C handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_wait_for_compact_options_destroy(self.inner);
        }
    }
}
impl WaitForCompactOptions {
    /// Sets whether the wait aborts if background work is paused.
    pub fn set_abort_on_pause(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_wait_for_compact_options_set_abort_on_pause(self.inner, c_uchar::from(v));
        }
    }
    /// Sets whether a flush is performed before waiting.
    pub fn set_flush(&mut self, v: bool) {
        unsafe {
            ffi::rocksdb_wait_for_compact_options_set_flush(self.inner, c_uchar::from(v));
        }
    }
    /// Sets the wait timeout in microseconds.
    pub fn set_timeout(&mut self, microseconds: u64) {
        unsafe {
            ffi::rocksdb_wait_for_compact_options_set_timeout(self.inner, microseconds);
        }
    }
}
/// Owned handle to a RocksDB `DbPath` (an extra storage path with a size target).
pub struct DBPath {
    pub(crate) inner: *mut ffi::rocksdb_dbpath_t,
}
impl DBPath {
    /// Creates a new path descriptor for storing SST files at `path` with
    /// the given target size.
    ///
    /// # Errors
    /// Returns an error if the path cannot be converted to a C string (for
    /// example, it contains an interior NUL byte) or if RocksDB rejects it.
    pub fn new<P: AsRef<Path>>(path: P, target_size: u64) -> Result<Self, Error> {
        // Propagate conversion failures instead of panicking: this
        // constructor already returns `Result`, so an invalid path should
        // surface as an `Err` rather than an `unwrap` panic.
        let p = to_cpath(path.as_ref())?;
        let dbpath = unsafe { ffi::rocksdb_dbpath_create(p.as_ptr(), target_size) };
        if dbpath.is_null() {
            Err(Error::new(format!(
                "Could not create path for storing sst files at location: {}",
                path.as_ref().display()
            )))
        } else {
            Ok(DBPath { inner: dbpath })
        }
    }
}
impl Drop for DBPath {
    /// Releases the underlying C handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_dbpath_destroy(self.inner);
        }
    }
}
/// Owned handle to a RocksDB logger, optionally backed by a Rust callback
/// whose `Arc` keeps the callback alive while the logger exists.
pub struct InfoLogger {
    pub(crate) inner: *mut ffi::rocksdb_logger_t,
    callback: Option<Arc<LoggerCallback>>,
}
impl InfoLogger {
    /// Creates a logger that writes to stderr with an optional line prefix.
    ///
    /// # Panics
    /// Panics if `prefix` contains an interior NUL byte.
    pub fn new_stderr_logger<S: AsRef<str>>(log_level: LogLevel, prefix: Option<S>) -> Self {
        let prefix = prefix.map(|s| {
            s.as_ref()
                .into_c_string()
                .expect("cannot have NULL in prefix")
        });
        // Null prefix pointer means "no prefix" on the C side.
        let prefix_ptr = match prefix.as_ref() {
            Some(s) => s.as_ptr(),
            None => std::ptr::null(),
        };
        let inner =
            unsafe { ffi::rocksdb_logger_create_stderr_logger(log_level as i32, prefix_ptr) };
        Self {
            inner,
            callback: None,
        }
    }
    /// Creates a logger that forwards every message to a Rust callback.
    pub fn new_callback_logger<F: Fn(LogLevel, &str) + Sync + Send + 'static>(
        level: LogLevel,
        cb: F,
    ) -> Self {
        let arc_cb: Arc<LoggerCallback> = Arc::new(Box::new(cb));
        // Raw pointer into the Arc payload; kept alive by the `callback`
        // field below for the lifetime of the logger.
        let raw_cb: LoggerCallbackPtr = Arc::as_ptr(&arc_cb);
        let inner = unsafe {
            ffi::rocksdb_logger_create_callback_logger(
                level as i32,
                Some(logger_callback),
                raw_cb as *mut c_void,
            )
        };
        Self {
            inner,
            callback: Some(arc_cb),
        }
    }
}
impl Drop for InfoLogger {
    /// Releases the underlying C logger handle.
    fn drop(&mut self) {
        unsafe {
            ffi::rocksdb_logger_destroy(self.inner);
        }
    }
}
pub struct ImportColumnFamilyOptions {
pub(crate) inner: *mut ffi::rocksdb_import_column_family_options_t,
}
impl ImportColumnFamilyOptions {
pub fn new() -> Self {
let inner = unsafe { ffi::rocksdb_import_column_family_options_create() };
ImportColumnFamilyOptions { inner }
}
pub fn set_move_files(&mut self, move_files: bool) {
unsafe {
ffi::rocksdb_import_column_family_options_set_move_files(
self.inner,
c_uchar::from(move_files),
);
}
}
}
impl Default for ImportColumnFamilyOptions {
fn default() -> Self {
Self::new()
}
}
impl Drop for ImportColumnFamilyOptions {
    /// Frees the underlying C options object.
    fn drop(&mut self) {
        // SAFETY: `self.inner` was allocated by
        // `rocksdb_import_column_family_options_create` and is freed once.
        unsafe {
            ffi::rocksdb_import_column_family_options_destroy(self.inner);
        }
    }
}
// Raw-pointer form of the boxed logger callback handed across the C boundary.
type LoggerCallbackPtr = *const LoggerCallback;
/// C-to-Rust trampoline invoked by RocksDB's callback logger for each log line.
///
/// # Safety
///
/// `raw_cb` must be the pointer registered in `new_callback_logger` (pointing
/// at an `Arc<LoggerCallback>` allocation that is still alive), and `msg`/`len`
/// must describe a valid, readable byte buffer.
unsafe extern "C" fn logger_callback(
    raw_cb: *mut c_void,
    level: c_uint,
    msg: *mut c_char,
    len: size_t,
) {
    // SAFETY: per the contract above, `raw_cb` points to a live LoggerCallback.
    let rust_callback: &LoggerCallback = unsafe { &*(raw_cb as LoggerCallbackPtr) };
    // SAFETY: the C side supplies `len` readable bytes at `msg`.
    let raw_msg = unsafe { std::slice::from_raw_parts(msg as *const u8, len) };
    // Log lines are not guaranteed to be UTF-8; replace invalid sequences.
    let msg = String::from_utf8_lossy(raw_msg);
    let level =
        LogLevel::try_from_raw(level as i32).expect("rocksdb generated an invalid log level");
    // NOTE(review): a panic here (unknown level, or a panicking user callback)
    // unwinds out of an `extern "C"` fn, which aborts the process — confirm
    // that is the intended failure mode.
    (rust_callback)(level, &msg);
}
#[cfg(test)]
mod tests {
    use crate::cache::Cache;
    use crate::db_options::{DBCompactionPri, InfoLogger, WriteBufferManager};
    use crate::{MemtableFactory, Options};
    // Statistics are absent by default; enabling them on one Options instance
    // must not leak into a freshly created instance.
    #[test]
    fn test_enable_statistics() {
        let mut opts = Options::default();
        assert_eq!(None, opts.get_statistics());
        opts.enable_statistics();
        opts.set_stats_dump_period_sec(60);
        assert!(opts.get_statistics().is_some());
        let opts = Options::default();
        assert!(opts.get_statistics().is_none());
    }
    // Smoke test: every memtable-factory variant is accepted without panicking.
    #[test]
    fn test_set_memtable_factory() {
        let mut opts = Options::default();
        opts.set_memtable_factory(MemtableFactory::Vector);
        opts.set_memtable_factory(MemtableFactory::HashLinkList { bucket_count: 100 });
        opts.set_memtable_factory(MemtableFactory::HashSkipList {
            bucket_count: 100,
            height: 4,
            branching_factor: 4,
        });
    }
    // The fsync flag round-trips through its setter/getter pair.
    #[test]
    fn test_use_fsync() {
        let mut opts = Options::default();
        assert!(!opts.get_use_fsync());
        opts.set_use_fsync(true);
        assert!(opts.get_use_fsync());
    }
    // Same isolation property as test_enable_statistics, exercised via the
    // stats-persist-period setter.
    #[test]
    fn test_set_stats_persist_period_sec() {
        let mut opts = Options::default();
        opts.enable_statistics();
        opts.set_stats_persist_period_sec(5);
        assert!(opts.get_statistics().is_some());
        let opts = Options::default();
        assert!(opts.get_statistics().is_none());
    }
    // The write buffer manager must remain usable (still enabled) after the
    // Options that referenced it is dropped, i.e. Options holds a shared
    // handle rather than owning the manager.
    #[test]
    fn test_set_write_buffer_manager() {
        let mut opts = Options::default();
        let lrucache = Cache::new_lru_cache(100);
        let write_buffer_manager =
            WriteBufferManager::new_write_buffer_manager_with_cache(100, false, lrucache);
        assert_eq!(write_buffer_manager.get_buffer_size(), 100);
        assert_eq!(write_buffer_manager.get_usage(), 0);
        assert!(write_buffer_manager.enabled());
        opts.set_write_buffer_manager(&write_buffer_manager);
        drop(opts);
        assert!(write_buffer_manager.enabled());
    }
    // Verifies the compaction priority actually reaches RocksDB by inspecting
    // the OPTIONS file the database persists on open.
    #[test]
    fn compaction_pri() {
        let mut opts = Options::default();
        opts.set_compaction_pri(DBCompactionPri::RoundRobin);
        opts.create_if_missing(true);
        let tmp = tempfile::tempdir().unwrap();
        let _db = crate::DB::open(&opts, tmp.path()).unwrap();
        // Locate and read the OPTIONS-* file in the db directory.
        let options = std::fs::read_dir(tmp.path())
            .unwrap()
            .find_map(|x| {
                let x = x.ok()?;
                x.file_name()
                    .into_string()
                    .unwrap()
                    .contains("OPTIONS")
                    .then_some(x.path())
            })
            .map(std::fs::read_to_string)
            .unwrap()
            .unwrap();
        assert!(options.contains("compaction_pri=kRoundRobin"));
    }
    // Exercises the callback logger across several lifetimes: attached to the
    // original Options, after that Options is dropped (while a DB still holds
    // it), and re-attached to a brand-new Options.
    #[test]
    fn test_callback_logger() {
        let (log_snd, log_rcv) = std::sync::mpsc::channel();
        let callback = move |level, msg: &str| {
            log_snd.send((level, msg.to_string())).ok();
        };
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.set_info_logger(InfoLogger::new_callback_logger(
            super::LogLevel::Debug,
            callback,
        ));
        let tmp = tempfile::tempdir().unwrap();
        let db = crate::DB::open(&opts, tmp.path()).unwrap();
        db.put(b"testkey", b"testvalue").unwrap();
        db.flush().unwrap();
        db.delete(b"testkey").unwrap();
        db.flush().unwrap();
        db.compact_range(Some(b"a"), Some(b"z"));
        // Database activity at Debug level must have produced log output.
        assert!(log_rcv.try_recv().is_ok());
        drop(db);
        // Open a second database with the same Options, then drop the Options:
        // logging must keep working because db2 holds its own reference.
        let tmp2 = tempfile::tempdir().unwrap();
        let db2 = crate::DB::open(&opts, tmp2.path()).unwrap();
        let logger = opts.get_info_logger();
        drop(opts);
        while log_rcv.try_recv().is_ok() {}
        // NOTE(review): db2 is live here, so its background threads could log
        // between the drain above and this assert — potentially flaky; confirm.
        assert!(log_rcv.try_recv().is_err());
        db2.put(b"testkey2", b"testvalue2").unwrap();
        db2.flush().unwrap();
        db2.delete(b"testkey2").unwrap();
        db2.flush().unwrap();
        db2.compact_range(Some(b"a"), Some(b"z"));
        drop(db2);
        assert!(log_rcv.try_recv().is_ok());
        while log_rcv.try_recv().is_ok() {}
        assert!(log_rcv.try_recv().is_err());
        // The logger handle fetched from the (now dropped) Options can be
        // installed on a brand-new Options and keeps delivering messages.
        let tmp3 = tempfile::tempdir().unwrap();
        let mut opts2 = Options::default();
        opts2.create_if_missing(true);
        opts2.set_info_logger(logger);
        let db3 = crate::DB::open(&opts2, tmp3.path()).unwrap();
        drop(opts2);
        db3.put(b"testkey3", b"testvalue3").unwrap();
        db3.flush().unwrap();
        db3.delete(b"testkey3").unwrap();
        db3.flush().unwrap();
        db3.compact_range(Some(b"a"), Some(b"z"));
        assert!(log_rcv.try_recv().is_ok());
        drop(db3);
    }
}