use crocksdb_ffi::{self, DBBackupEngine, DBCFHandle, DBCompressionType, DBEnv, DBInstance,
DBPinnableSlice, DBSequentialFile, DBStatisticsHistogramType,
DBStatisticsTickerType, DBWriteBatch};
use libc::{self, c_int, c_void, iovec, size_t};
use rocksdb_options::{ColumnFamilyDescriptor, ColumnFamilyOptions, CompactOptions, DBOptions,
EnvOptions, FlushOptions, HistogramData, IngestExternalFileOptions,
ReadOptions, RestoreOptions, UnsafeSnap, WriteOptions};
use std::{fs, ptr, slice};
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::ffi::{CStr, CString};
use std::fmt::{self, Debug, Formatter};
use std::io;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::str::from_utf8;
use table_properties::TablePropertiesCollection;
/// Owned handle to a RocksDB column family.
pub struct CFHandle {
    // Raw FFI pointer; destroyed exactly once in Drop.
    inner: *mut DBCFHandle,
}
impl CFHandle {
    /// Returns the numeric ID of this column family.
    pub fn id(&self) -> u32 {
        unsafe { crocksdb_ffi::crocksdb_column_family_handle_id(self.inner) }
    }
}
impl Drop for CFHandle {
    fn drop(&mut self) {
        // SAFETY: `inner` is owned by this struct and freed only here.
        unsafe {
            crocksdb_ffi::crocksdb_column_family_handle_destroy(self.inner);
        }
    }
}
fn ensure_default_cf_exists<'a>(list: &mut Vec<ColumnFamilyDescriptor<'a>>) {
let contains = list.iter().any(|ref cf| cf.is_default());
if !contains {
list.push(ColumnFamilyDescriptor::default());
}
}
/// Splits column family descriptors into parallel vectors of CF names
/// and CF options, preserving order.
fn split_descriptors<'a>(
    list: Vec<ColumnFamilyDescriptor<'a>>,
) -> (Vec<&'a str>, Vec<ColumnFamilyOptions>) {
    // Unzip the (name, options) pairs into two vectors in one pass.
    list.into_iter().map(|desc| (desc.name, desc.options)).unzip()
}
/// Converts a slice of `&str` into owned, NUL-terminated `CString`s
/// suitable for passing to the C layer.
///
/// Panics if any input string contains an interior NUL byte.
fn build_cstring_list(str_list: &[&str]) -> Vec<CString> {
    // `.iter()` instead of `.into_iter()` on a slice reference — the
    // latter iterated by reference anyway and only obscured intent.
    str_list
        .iter()
        .map(|s| CString::new(s.as_bytes()).unwrap())
        .collect()
}
/// Main handle to an open RocksDB database instance.
pub struct DB {
    inner: *mut DBInstance,
    // Column family handles keyed by CF name; cleared before the DB
    // itself is closed (see `Drop for DB`).
    cfs: BTreeMap<String, CFHandle>,
    path: String,
    opts: DBOptions,
    // Kept alive so the option pointers handed to the C layer at open
    // time remain valid for the life of the DB.
    _cf_opts: Vec<ColumnFamilyOptions>,
}
impl Debug for DB {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        write!(f, "Db [path={}]", self.path)
    }
}
// SAFETY: assumes the underlying RocksDB instance is internally
// synchronized — TODO(review): confirm against upstream guarantees.
unsafe impl Send for DB {}
unsafe impl Sync for DB {}
/// A batch of write operations applied atomically via `DB::write`.
pub struct WriteBatch {
    // Raw FFI pointer; freed in Drop.
    inner: *mut DBWriteBatch,
}
// SAFETY: the raw batch pointer is exclusively owned by this struct.
unsafe impl Send for WriteBatch {}
/// A read-only view of the database at a point in time.
pub struct Snapshot<D: Deref<Target = DB>> {
    // Keeps the DB alive at least as long as the snapshot.
    db: D,
    snap: UnsafeSnap,
}
/// Iterator over DB entries; `_db` keeps the DB alive while iterating.
pub struct DBIterator<D: Deref<Target = DB>> {
    inner: UnsafeIter,
    _db: D,
}
/// Target position for `DBIterator::seek` / `seek_for_prev`.
pub enum SeekKey<'a> {
    Start,
    End,
    Key(&'a [u8]),
}
impl<'a> From<&'a [u8]> for SeekKey<'a> {
    fn from(bs: &'a [u8]) -> SeekKey {
        SeekKey::Key(bs)
    }
}
impl<D: Deref<Target = DB>> DBIterator<D> {
    /// Creates an iterator over the default column family.
    pub fn new(db: D, readopts: ReadOptions) -> DBIterator<D> {
        let inner = UnsafeIter::new(&db, readopts);
        DBIterator {
            _db: db,
            inner,
        }
    }
    /// Positions the iterator; returns whether it lands on a valid entry.
    pub fn seek(&mut self, key: SeekKey) -> bool {
        match key {
            SeekKey::Start => self.inner.seek_to_first(),
            SeekKey::End => self.inner.seek_to_last(),
            SeekKey::Key(key) => self.inner.seek(iovec {
                iov_base: key.as_ptr() as *mut c_void,
                iov_len: key.len() as size_t,
            }),
        }
    }
    /// Seek variant used for backward positioning on `Key`.
    /// NOTE(review): `Start`/`End` behave exactly as in `seek`.
    pub fn seek_for_prev(&mut self, key: SeekKey) -> bool {
        match key {
            SeekKey::Start => self.inner.seek_to_first(),
            SeekKey::End => self.inner.seek_to_last(),
            SeekKey::Key(key) => self.inner.seek_for_prev(iovec {
                iov_base: key.as_ptr() as *mut c_void,
                iov_len: key.len() as size_t,
            }),
        }
    }
    /// Steps backward; returns validity after the move.
    pub fn prev(&mut self) -> bool {
        unsafe {
            crocksdb_ffi::crocksdb_iter_prev(self.inner.as_mut_ptr());
        }
        self.valid()
    }
    /// Steps forward; returns validity after the move.
    pub fn next(&mut self) -> bool {
        unsafe {
            crocksdb_ffi::crocksdb_iter_next(self.inner.as_mut_ptr());
        }
        self.valid()
    }
    /// Current key as a borrowed slice. Panics if the iterator is invalid.
    pub fn key(&self) -> &[u8] {
        assert!(self.valid());
        unsafe {
            let iov = self.inner.iter_key();
            slice::from_raw_parts(iov.iov_base as *const u8, iov.iov_len as usize)
        }
    }
    /// Current value as a borrowed slice. Panics if the iterator is invalid.
    pub fn value(&self) -> &[u8] {
        assert!(self.valid());
        unsafe {
            let iov = self.inner.iter_value();
            slice::from_raw_parts(iov.iov_base as *const u8, iov.iov_len as usize)
        }
    }
    /// Owned copy of the current (key, value), or `None` when invalid.
    pub fn kv(&self) -> Option<(Vec<u8>, Vec<u8>)> {
        if self.valid() {
            Some((self.key().to_vec(), self.value().to_vec()))
        } else {
            None
        }
    }
    /// Whether the iterator currently points at an entry.
    pub fn valid(&self) -> bool {
        unsafe { self.inner.valid() }
    }
    /// Creates an iterator over a specific column family.
    pub fn new_cf(db: D, cf_handle: &CFHandle, readopts: ReadOptions) -> DBIterator<D> {
        let inner = UnsafeIter::new_cf(&db, cf_handle, readopts);
        DBIterator {
            _db: db,
            inner,
        }
    }
}
/// Owned key/value pair yielded by the `Iterator` adapter below.
pub type Kv = (Vec<u8>, Vec<u8>);
impl<'b, D: Deref<Target = DB>> Iterator for &'b mut DBIterator<D> {
    type Item = Kv;
    fn next(&mut self) -> Option<Kv> {
        // Yield the current entry, then advance the cursor so the next
        // call sees the following entry.
        let kv = self.kv();
        if kv.is_some() {
            DBIterator::next(self);
        }
        kv
    }
}
// SAFETY: soundness assumes the wrapped DB handle is itself safe to
// send/share — TODO(review): confirm against the C layer's guarantees.
unsafe impl<D: Deref<Target = DB> + Send> Send for DBIterator<D> {}
unsafe impl<D: Deref<Target = DB> + Send + Sync> Send for Snapshot<D> {}
unsafe impl<D: Deref<Target = DB> + Send + Sync> Sync for Snapshot<D> {}
impl<D: Deref<Target = DB> + Clone> Snapshot<D> {
    /// Iterator pinned to this snapshot that owns its own clone of the
    /// DB handle, so it can outlive a borrow of `self`.
    pub fn iter_opt_clone(&self, mut opt: ReadOptions) -> DBIterator<D> {
        unsafe {
            opt.set_snapshot(&self.snap);
        }
        DBIterator::new(self.db.clone(), opt)
    }
}
impl<D: Deref<Target = DB>> Snapshot<D> {
    /// Captures a new snapshot of `db`.
    pub fn new(db: D) -> Snapshot<D> {
        unsafe {
            Snapshot {
                snap: db.unsafe_snap(),
                db: db,
            }
        }
    }
    /// Iterator over the snapshot with default read options.
    pub fn iter(&self) -> DBIterator<&DB> {
        let readopts = ReadOptions::new();
        self.iter_opt(readopts)
    }
    /// Iterator over the snapshot with caller-supplied read options.
    pub fn iter_opt(&self, mut opt: ReadOptions) -> DBIterator<&DB> {
        unsafe {
            opt.set_snapshot(&self.snap);
        }
        DBIterator::new(&self.db, opt)
    }
    /// Point lookup in the default CF as of this snapshot.
    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
        let mut readopts = ReadOptions::new();
        unsafe {
            readopts.set_snapshot(&self.snap);
        }
        self.db.get_opt(key, &readopts)
    }
    /// Point lookup in `cf` as of this snapshot.
    pub fn get_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<Option<DBVector>, String> {
        let mut readopts = ReadOptions::new();
        unsafe {
            readopts.set_snapshot(&self.snap);
        }
        self.db.get_cf_opt(cf, key, &readopts)
    }
}
impl<D: Deref<Target = DB>> Drop for Snapshot<D> {
    fn drop(&mut self) {
        // Release the snapshot so RocksDB can reclaim old versions.
        unsafe { self.db.release_snap(&self.snap) }
    }
}
/// Write operations shared by `DB` (immediate) and `WriteBatch` (queued).
pub trait Writable {
    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
    fn put_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String>;
    fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String>;
    fn merge_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String>;
    fn delete(&self, key: &[u8]) -> Result<(), String>;
    fn delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String>;
    fn single_delete(&self, key: &[u8]) -> Result<(), String>;
    fn single_delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String>;
    fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<(), String>;
    fn delete_range_cf(
        &self,
        cf: &CFHandle,
        begin_key: &[u8],
        end_key: &[u8],
    ) -> Result<(), String>;
}
/// A key range borrowed from the caller; `new` enforces start <= end.
pub struct Range<'a> {
    start_key: &'a [u8],
    end_key: &'a [u8],
}
impl<'a> Range<'a> {
    /// Builds a range; panics if `start_key` sorts after `end_key`.
    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
        assert!(start_key <= end_key);
        // Field-init shorthand replaces the repetitive `field: field` form.
        Range { start_key, end_key }
    }
}
/// One stored version of a key, returned by `DB::get_all_key_versions`.
pub struct KeyVersion {
    pub key: String,
    pub value: String,
    // Sequence number of this version.
    pub seq: u64,
    // Raw RocksDB value-type tag as reported by the C API.
    pub value_type: c_int,
}
impl DB {
    /// Opens (creating if missing) a DB at `path` with default options.
    pub fn open_default(path: &str) -> Result<DB, String> {
        let mut opts = DBOptions::new();
        opts.create_if_missing(true);
        DB::open(opts, path)
    }
    /// Opens a DB with only the default column family.
    pub fn open(opts: DBOptions, path: &str) -> Result<DB, String> {
        let cfds: Vec<&str> = vec![];
        DB::open_cf(opts, path, cfds)
    }
    /// Opens a DB with the given column family descriptors.
    pub fn open_cf<'a, T>(opts: DBOptions, path: &str, cfds: Vec<T>) -> Result<DB, String>
    where
        T: Into<ColumnFamilyDescriptor<'a>>,
    {
        DB::open_cf_internal(opts, path, cfds, None)
    }
    /// Opens a DB read-only with only the default column family.
    pub fn open_for_read_only(
        opts: DBOptions,
        path: &str,
        error_if_log_file_exist: bool,
    ) -> Result<DB, String> {
        let cfds: Vec<&str> = vec![];
        DB::open_cf_for_read_only(opts, path, cfds, error_if_log_file_exist)
    }
    /// Opens a DB read-only with the given column families.
    pub fn open_cf_for_read_only<'a, T>(
        opts: DBOptions,
        path: &str,
        cfds: Vec<T>,
        error_if_log_file_exist: bool,
    ) -> Result<DB, String>
    where
        T: Into<ColumnFamilyDescriptor<'a>>,
    {
        DB::open_cf_internal(opts, path, cfds, Some(error_if_log_file_exist))
    }
    // Shared open path: `error_if_log_file_exist` is None for read-write
    // opens and Some(flag) for read-only opens.
    fn open_cf_internal<'a, T>(
        opts: DBOptions,
        path: &str,
        cfds: Vec<T>,
        error_if_log_file_exist: Option<bool>,
    ) -> Result<DB, String>
    where
        T: Into<ColumnFamilyDescriptor<'a>>,
    {
        const ERR_CONVERT_PATH: &str = "Failed to convert path to CString when opening rocksdb";
        const ERR_NULL_DB_ONINIT: &str = "Could not initialize database";
        const ERR_NULL_CF_HANDLE: &str = "Received null column family handle from DB";
        let cpath = CString::new(path.as_bytes())
            .map_err(|_| ERR_CONVERT_PATH.to_owned())?;
        fs::create_dir_all(&Path::new(path)).map_err(|e| {
            format!(
                "Failed to create rocksdb directory: \
                 src/rocksdb.rs: \
                 {:?}",
                e
            )
        })?;
        let mut descs = cfds.into_iter().map(|t| t.into()).collect();
        // The default CF must always be part of the open request.
        ensure_default_cf_exists(&mut descs);
        let (names, options) = split_descriptors(descs);
        let cstrings = build_cstring_list(&names);
        let cf_names: Vec<*const _> = cstrings.iter().map(|cs| cs.as_ptr()).collect();
        // Out-parameter array the C API fills with one handle per CF.
        let cf_handles: Vec<_> = vec![ptr::null_mut(); cf_names.len()];
        let cf_options: Vec<_> = options
            .iter()
            .map(|x| x.inner as *const crocksdb_ffi::Options)
            .collect();
        let db = {
            let db_options = opts.inner;
            let db_path = cpath.as_ptr();
            let db_cfs_count = cf_names.len() as c_int;
            let db_cf_ptrs = cf_names.as_ptr();
            let db_cf_opts = cf_options.as_ptr();
            let db_cf_handles = cf_handles.as_ptr();
            if let Some(flag) = error_if_log_file_exist {
                unsafe {
                    ffi_try!(crocksdb_open_for_read_only_column_families(
                        db_options,
                        db_path,
                        db_cfs_count,
                        db_cf_ptrs,
                        db_cf_opts,
                        db_cf_handles,
                        flag
                    ))
                }
            } else {
                unsafe {
                    ffi_try!(crocksdb_open_column_families(
                        db_options,
                        db_path,
                        db_cfs_count,
                        db_cf_ptrs,
                        db_cf_opts,
                        db_cf_handles
                    ))
                }
            }
        };
        // Defensive null checks on top of ffi_try!'s error propagation.
        if cf_handles.iter().any(|h| h.is_null()) {
            return Err(ERR_NULL_CF_HANDLE.to_owned());
        }
        if db.is_null() {
            return Err(ERR_NULL_DB_ONINIT.to_owned());
        }
        let cfs = names
            .into_iter()
            .zip(cf_handles)
            .map(|(s, h)| (s.to_owned(), CFHandle { inner: h }))
            .collect();
        Ok(DB {
            inner: db,
            cfs: cfs,
            path: path.to_owned(),
            opts: opts,
            _cf_opts: options,
        })
    }
    /// Destroys the database files at `path`. The DB must not be open.
    pub fn destroy(opts: &DBOptions, path: &str) -> Result<(), String> {
        let cpath = CString::new(path.as_bytes()).unwrap();
        unsafe {
            ffi_try!(crocksdb_destroy_db(opts.inner, cpath.as_ptr()));
        }
        Ok(())
    }
    /// Attempts to repair a database at `path`.
    pub fn repair(opts: DBOptions, path: &str) -> Result<(), String> {
        let cpath = CString::new(path.as_bytes()).unwrap();
        unsafe {
            ffi_try!(crocksdb_repair_db(opts.inner, cpath.as_ptr()));
        }
        Ok(())
    }
    /// Lists the names of all column families stored at `path`.
    pub fn list_column_families(opts: &DBOptions, path: &str) -> Result<Vec<String>, String> {
        let cpath = match CString::new(path.as_bytes()) {
            Ok(c) => c,
            Err(_) => {
                return Err(
                    "Failed to convert path to CString when list \
                     column families"
                        .to_owned(),
                )
            }
        };
        let mut cfs: Vec<String> = vec![];
        unsafe {
            let mut lencf: size_t = 0;
            // The C API returns an array of C strings plus its length.
            let list = ffi_try!(crocksdb_list_column_families(
                opts.inner,
                cpath.as_ptr(),
                &mut lencf
            ));
            let list_cfs = slice::from_raw_parts(list, lencf);
            for &cf_name in list_cfs {
                let cf = match CStr::from_ptr(cf_name).to_owned().into_string() {
                    Ok(s) => s,
                    Err(e) => return Err(format!("invalid utf8 bytes: {:?}", e)),
                };
                cfs.push(cf);
            }
            // Free the C-allocated list after copying the names out.
            crocksdb_ffi::crocksdb_list_column_families_destroy(list, lencf);
        }
        Ok(cfs)
    }
    /// Pauses RocksDB background work.
    pub fn pause_bg_work(&self) {
        unsafe {
            crocksdb_ffi::crocksdb_pause_bg_work(self.inner);
        }
    }
    /// Resumes background work paused by `pause_bg_work`.
    pub fn continue_bg_work(&self) {
        unsafe {
            crocksdb_ffi::crocksdb_continue_bg_work(self.inner);
        }
    }
    /// Filesystem path this DB was opened at.
    pub fn path(&self) -> &str {
        &self.path
    }
    /// Applies `batch` with the given write options.
    pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_write(self.inner, writeopts.inner, batch.inner));
        }
        Ok(())
    }
    /// Applies `batch` with default write options.
    pub fn write(&self, batch: WriteBatch) -> Result<(), String> {
        self.write_opt(batch, &WriteOptions::new())
    }
    /// Applies `batch` with the write-ahead log disabled.
    pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), String> {
        let mut wo = WriteOptions::new();
        wo.disable_wal(true);
        self.write_opt(batch, &wo)
    }
    /// Point lookup in the default CF with explicit read options.
    pub fn get_opt(&self, key: &[u8], readopts: &ReadOptions) -> Result<Option<DBVector>, String> {
        unsafe {
            // Pinned slice avoids copying the value out of RocksDB memory.
            let val = ffi_try!(crocksdb_get_pinned(
                self.inner,
                readopts.get_inner(),
                key.as_ptr(),
                key.len() as size_t
            ));
            if val.is_null() {
                Ok(None)
            } else {
                Ok(Some(DBVector::from_pinned_slice(val)))
            }
        }
    }
    /// Point lookup in the default CF with default read options.
    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, String> {
        self.get_opt(key, &ReadOptions::new())
    }
    /// Point lookup in `cf` with explicit read options.
    pub fn get_cf_opt(
        &self,
        cf: &CFHandle,
        key: &[u8],
        readopts: &ReadOptions,
    ) -> Result<Option<DBVector>, String> {
        unsafe {
            let val = ffi_try!(crocksdb_get_pinned_cf(
                self.inner,
                readopts.get_inner(),
                cf.inner,
                key.as_ptr(),
                key.len() as size_t
            ));
            if val.is_null() {
                Ok(None)
            } else {
                Ok(Some(DBVector::from_pinned_slice(val)))
            }
        }
    }
    /// Point lookup in `cf` with default read options.
    pub fn get_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<Option<DBVector>, String> {
        self.get_cf_opt(cf, key, &ReadOptions::new())
    }
    /// Creates a new column family and returns a handle to it.
    pub fn create_cf<'a, T>(&mut self, cfd: T) -> Result<&CFHandle, String>
    where
        T: Into<ColumnFamilyDescriptor<'a>>,
    {
        let cfd = cfd.into();
        let cname = match CString::new(cfd.name.as_bytes()) {
            Ok(c) => c,
            Err(_) => {
                return Err(
                    "Failed to convert path to CString when opening rocksdb".to_owned(),
                )
            }
        };
        let cname_ptr = cname.as_ptr();
        unsafe {
            let cf_handler = ffi_try!(crocksdb_create_column_family(
                self.inner,
                cfd.options.inner,
                cname_ptr
            ));
            let handle = CFHandle { inner: cf_handler };
            // Keep the options alive for the lifetime of the DB.
            self._cf_opts.push(cfd.options);
            // Insert the handle, replacing any existing one of the same name.
            Ok(match self.cfs.entry(cfd.name.to_owned()) {
                Entry::Occupied(mut e) => {
                    e.insert(handle);
                    e.into_mut()
                }
                Entry::Vacant(e) => e.insert(handle),
            })
        }
    }
pub fn drop_cf(&mut self, name: &str) -> Result<(), String> {
let cf = self.cfs.remove(name);
if cf.is_none() {
return Err(format!("Invalid column family: {}", name).clone());
}
unsafe {
ffi_try!(crocksdb_drop_column_family(self.inner, cf.unwrap().inner));
}
Ok(())
}
    /// Looks up a column family handle by name.
    pub fn cf_handle(&self, name: &str) -> Option<&CFHandle> {
        self.cfs.get(name)
    }
    /// Names of all column families currently open on this DB.
    pub fn cf_names(&self) -> Vec<&str> {
        self.cfs.iter().map(|(k, _)| k.as_str()).collect()
    }
    /// Iterator over the default CF with default read options.
    pub fn iter(&self) -> DBIterator<&DB> {
        let opts = ReadOptions::new();
        self.iter_opt(opts)
    }
    /// Iterator over the default CF with explicit read options.
    pub fn iter_opt(&self, opt: ReadOptions) -> DBIterator<&DB> {
        DBIterator::new(&self, opt)
    }
    /// Iterator over `cf_handle` with default read options.
    pub fn iter_cf(&self, cf_handle: &CFHandle) -> DBIterator<&DB> {
        let opts = ReadOptions::new();
        DBIterator::new_cf(self, cf_handle, opts)
    }
    /// Iterator over `cf_handle` with explicit read options.
    pub fn iter_cf_opt(&self, cf_handle: &CFHandle, opts: ReadOptions) -> DBIterator<&DB> {
        DBIterator::new_cf(self, cf_handle, opts)
    }
    /// Captures a point-in-time snapshot of the DB.
    pub fn snapshot(&self) -> Snapshot<&DB> {
        Snapshot::new(self)
    }
    /// Raw snapshot; the caller must pair it with `release_snap`.
    pub unsafe fn unsafe_snap(&self) -> UnsafeSnap {
        UnsafeSnap::new(self.inner)
    }
    /// Releases a snapshot obtained from `unsafe_snap`.
    pub unsafe fn release_snap(&self, snap: &UnsafeSnap) {
        crocksdb_ffi::crocksdb_release_snapshot(self.inner, snap.get_inner())
    }
    /// Writes `key` -> `value` to the default CF with explicit options.
    pub fn put_opt(
        &self,
        key: &[u8],
        value: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_put(
                self.inner,
                writeopts.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t
            ));
            Ok(())
        }
    }
    /// Writes `key` -> `value` to `cf` with explicit options.
    pub fn put_cf_opt(
        &self,
        cf: &CFHandle,
        key: &[u8],
        value: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_put_cf(
                self.inner,
                writeopts.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t
            ));
            Ok(())
        }
    }
    /// Applies a merge operand to `key` in the default CF.
    pub fn merge_opt(
        &self,
        key: &[u8],
        value: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_merge(
                self.inner,
                writeopts.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t
            ));
            Ok(())
        }
    }
    // Applies a merge operand to `key` in `cf`; private helper backing
    // the `Writable` impl.
    fn merge_cf_opt(
        &self,
        cf: &CFHandle,
        key: &[u8],
        value: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_merge_cf(
                self.inner,
                writeopts.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t
            ));
            Ok(())
        }
    }
    // Deletes `key` from the default CF; private helper for `Writable`.
    fn delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_delete(
                self.inner,
                writeopts.inner,
                key.as_ptr(),
                key.len() as size_t
            ));
            Ok(())
        }
    }
    // Deletes `key` from `cf`; private helper for `Writable`.
    fn delete_cf_opt(
        &self,
        cf: &CFHandle,
        key: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_delete_cf(
                self.inner,
                writeopts.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t
            ));
            Ok(())
        }
    }
    // Single-delete of `key` in the default CF.
    fn single_delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_single_delete(
                self.inner,
                writeopts.inner,
                key.as_ptr(),
                key.len() as size_t
            ));
            Ok(())
        }
    }
    // Single-delete of `key` in `cf`.
    fn single_delete_cf_opt(
        &self,
        cf: &CFHandle,
        key: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_single_delete_cf(
                self.inner,
                writeopts.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t
            ));
            Ok(())
        }
    }
    // Deletes all keys in [begin_key, end_key) from `cf`.
    fn delete_range_cf_opt(
        &self,
        cf: &CFHandle,
        begin_key: &[u8],
        end_key: &[u8],
        writeopts: &WriteOptions,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_delete_range_cf(
                self.inner,
                writeopts.inner,
                cf.inner,
                begin_key.as_ptr(),
                begin_key.len() as size_t,
                end_key.as_ptr(),
                end_key.len() as size_t
            ));
            Ok(())
        }
    }
    /// Flushes memtables of the default CF; `sync` makes the call wait.
    pub fn flush(&self, sync: bool) -> Result<(), String> {
        unsafe {
            let mut opts = FlushOptions::new();
            opts.set_wait(sync);
            ffi_try!(crocksdb_flush(self.inner, opts.inner));
            Ok(())
        }
    }
    /// Flushes memtables of `cf`; `sync` makes the call wait.
    pub fn flush_cf(&self, cf: &CFHandle, sync: bool) -> Result<(), String> {
        unsafe {
            let mut opts = FlushOptions::new();
            opts.set_wait(sync);
            ffi_try!(crocksdb_flush_cf(self.inner, cf.inner, opts.inner));
            Ok(())
        }
    }
    /// Flushes the write-ahead log; `sync` is forwarded to the C API.
    pub fn flush_wal(&self, sync: bool) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_flush_wal(self.inner, sync));
            Ok(())
        }
    }
    /// Syncs the write-ahead log.
    pub fn sync_wal(&self) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_sync_wal(self.inner));
            Ok(())
        }
    }
    /// Approximate sizes for `ranges` in the default CF.
    pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
        self.get_approximate_sizes_cfopt(None, ranges)
    }
    /// Approximate sizes for `ranges` in `cf`.
    pub fn get_approximate_sizes_cf(&self, cf: &CFHandle, ranges: &[Range]) -> Vec<u64> {
        self.get_approximate_sizes_cfopt(Some(cf), ranges)
    }
    // Shared implementation: marshals the ranges into the parallel
    // pointer/length arrays expected by the C API.
    fn get_approximate_sizes_cfopt(&self, cf: Option<&CFHandle>, ranges: &[Range]) -> Vec<u64> {
        let start_keys: Vec<*const u8> = ranges.iter().map(|x| x.start_key.as_ptr()).collect();
        let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
        let end_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
        let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
        // One output slot per range, filled in-place by the C call.
        let mut sizes: Vec<u64> = vec![0; ranges.len()];
        let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
            ranges.len() as i32,
            start_keys.as_ptr(),
            start_key_lens.as_ptr(),
            end_keys.as_ptr(),
            end_key_lens.as_ptr(),
            sizes.as_mut_ptr(),
        );
        match cf {
            None => unsafe {
                crocksdb_ffi::crocksdb_approximate_sizes(
                    self.inner,
                    n,
                    start_key_ptr,
                    start_key_len_ptr,
                    end_key_ptr,
                    end_key_len_ptr,
                    size_ptr,
                )
            },
            Some(cf) => unsafe {
                crocksdb_ffi::crocksdb_approximate_sizes_cf(
                    self.inner,
                    cf.inner,
                    n,
                    start_key_ptr,
                    start_key_len_ptr,
                    end_key_ptr,
                    end_key_len_ptr,
                    size_ptr,
                )
            },
        }
        sizes
    }
    /// Approximate memtable (count, size) for `range` in the default CF.
    pub fn get_approximate_memtable_stats(&self, range: &Range) -> (u64, u64) {
        let (mut count, mut size) = (0, 0);
        unsafe {
            crocksdb_ffi::crocksdb_approximate_memtable_stats(
                self.inner,
                range.start_key.as_ptr(),
                range.start_key.len(),
                range.end_key.as_ptr(),
                range.end_key.len(),
                &mut count,
                &mut size,
            );
        }
        (count, size)
    }
    /// Approximate memtable (count, size) for `range` in `cf`.
    pub fn get_approximate_memtable_stats_cf(&self, cf: &CFHandle, range: &Range) -> (u64, u64) {
        let (mut count, mut size) = (0, 0);
        unsafe {
            crocksdb_ffi::crocksdb_approximate_memtable_stats_cf(
                self.inner,
                cf.inner,
                range.start_key.as_ptr(),
                range.start_key.len(),
                range.end_key.as_ptr(),
                range.end_key.len(),
                &mut count,
                &mut size,
            );
        }
        (count, size)
    }
    /// Compacts the default CF; `None` bounds are open-ended.
    pub fn compact_range(&self, start_key: Option<&[u8]>, end_key: Option<&[u8]>) {
        unsafe {
            // A null pointer with zero length encodes an open bound.
            let (start, s_len) = start_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            let (end, e_len) = end_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            crocksdb_ffi::crocksdb_compact_range(self.inner, start, s_len, end, e_len);
        }
    }
    /// Compacts `cf`; `None` bounds are open-ended.
    pub fn compact_range_cf(
        &self,
        cf: &CFHandle,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
    ) {
        unsafe {
            let (start, s_len) = start_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            let (end, e_len) = end_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            crocksdb_ffi::crocksdb_compact_range_cf(self.inner, cf.inner, start, s_len, end, e_len);
        }
    }
    /// Compacts `cf` with explicit compaction options.
    pub fn compact_range_cf_opt(
        &self,
        cf: &CFHandle,
        compact_options: &CompactOptions,
        start_key: Option<&[u8]>,
        end_key: Option<&[u8]>,
    ) {
        unsafe {
            let (start, s_len) = start_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            let (end, e_len) = end_key.map_or((ptr::null(), 0), |k| (k.as_ptr(), k.len()));
            crocksdb_ffi::crocksdb_compact_range_cf_opt(
                self.inner,
                cf.inner,
                compact_options.inner,
                start,
                s_len,
                end,
                e_len,
            );
        }
    }
    /// Asks RocksDB to delete files in the given key range of the default
    /// CF; `include_end` controls whether the end bound is inclusive.
    pub fn delete_files_in_range(
        &self,
        start_key: &[u8],
        end_key: &[u8],
        include_end: bool,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_delete_files_in_range(
                self.inner,
                start_key.as_ptr(),
                start_key.len() as size_t,
                end_key.as_ptr(),
                end_key.len() as size_t,
                include_end
            ));
            Ok(())
        }
    }
    /// Same as `delete_files_in_range`, scoped to `cf`.
    pub fn delete_files_in_range_cf(
        &self,
        cf: &CFHandle,
        start_key: &[u8],
        end_key: &[u8],
        include_end: bool,
    ) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_delete_files_in_range_cf(
                self.inner,
                cf.inner,
                start_key.as_ptr(),
                start_key.len() as size_t,
                end_key.as_ptr(),
                end_key.len() as size_t,
                include_end
            ));
            Ok(())
        }
    }
    /// Batch form of `delete_files_in_range_cf` over multiple ranges.
    pub fn delete_files_in_ranges_cf(
        &self,
        cf: &CFHandle,
        ranges: &[Range],
        include_end: bool,
    ) -> Result<(), String> {
        // Marshal ranges into the parallel arrays the C API expects.
        let start_keys: Vec<*const u8> = ranges.iter().map(|x| x.start_key.as_ptr()).collect();
        let start_keys_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
        let limit_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
        let limit_keys_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
        unsafe {
            ffi_try!(crocksdb_delete_files_in_ranges_cf(
                self.inner,
                cf.inner,
                start_keys.as_ptr(),
                start_keys_lens.as_ptr(),
                limit_keys.as_ptr(),
                limit_keys_lens.as_ptr(),
                ranges.len(),
                include_end
            ));
        }
        Ok(())
    }
    /// Reads a string-valued DB property by name.
    pub fn get_property_value(&self, name: &str) -> Option<String> {
        self.get_property_value_cf_opt(None, name)
    }
    /// Reads a string-valued property scoped to `cf`.
    pub fn get_property_value_cf(&self, cf: &CFHandle, name: &str) -> Option<String> {
        self.get_property_value_cf_opt(Some(cf), name)
    }
    /// Reads an integer-valued DB property by name.
    pub fn get_property_int(&self, name: &str) -> Option<u64> {
        self.get_property_int_cf_opt(None, name)
    }
    /// Reads an integer-valued property scoped to `cf`.
    pub fn get_property_int_cf(&self, cf: &CFHandle, name: &str) -> Option<u64> {
        self.get_property_int_cf_opt(Some(cf), name)
    }
    // Shared lookup; returns None when the C API returns null.
    fn get_property_value_cf_opt(&self, cf: Option<&CFHandle>, name: &str) -> Option<String> {
        unsafe {
            let prop_name = CString::new(name).unwrap();
            let value = match cf {
                None => crocksdb_ffi::crocksdb_property_value(self.inner, prop_name.as_ptr()),
                Some(cf) => crocksdb_ffi::crocksdb_property_value_cf(
                    self.inner,
                    cf.inner,
                    prop_name.as_ptr(),
                ),
            };
            if value.is_null() {
                return None;
            }
            // Copy out of the C buffer, then free the C allocation.
            let s = CStr::from_ptr(value).to_str().unwrap().to_owned();
            libc::free(value as *mut c_void);
            Some(s)
        }
    }
    // Parses the string property as u64; None if missing or unparsable.
    fn get_property_int_cf_opt(&self, cf: Option<&CFHandle>, name: &str) -> Option<u64> {
        if let Some(value) = self.get_property_value_cf_opt(cf, name) {
            if let Ok(num) = value.as_str().parse::<u64>() {
                return Some(num);
            }
        }
        None
    }
    /// Statistics dump from the DB options, if available.
    pub fn get_statistics(&self) -> Option<String> {
        self.opts.get_statistics()
    }
    /// Resets collected statistics.
    pub fn reset_statistics(&self) {
        self.opts.reset_statistics();
    }
    /// Current value of a ticker counter.
    pub fn get_statistics_ticker_count(&self, ticker_type: DBStatisticsTickerType) -> u64 {
        self.opts.get_statistics_ticker_count(ticker_type)
    }
    /// Reads a ticker counter and resets it.
    pub fn get_and_reset_statistics_ticker_count(
        &self,
        ticker_type: DBStatisticsTickerType,
    ) -> u64 {
        self.opts.get_and_reset_statistics_ticker_count(ticker_type)
    }
    /// Textual rendering of a histogram, if available.
    pub fn get_statistics_histogram_string(
        &self,
        hist_type: DBStatisticsHistogramType,
    ) -> Option<String> {
        self.opts.get_statistics_histogram_string(hist_type)
    }
    /// Structured histogram data, if available.
    pub fn get_statistics_histogram(
        &self,
        hist_type: DBStatisticsHistogramType,
    ) -> Option<HistogramData> {
        self.opts.get_statistics_histogram(hist_type)
    }
    /// Options currently in effect for the default CF.
    /// Panics if the default CF handle is missing (it is always opened).
    pub fn get_options(&self) -> ColumnFamilyOptions {
        let cf = self.cf_handle("default").unwrap();
        unsafe {
            let inner = crocksdb_ffi::crocksdb_get_options_cf(self.inner, cf.inner);
            ColumnFamilyOptions::from_raw(inner)
        }
    }
    /// Options currently in effect for `cf`.
    pub fn get_options_cf(&self, cf: &CFHandle) -> ColumnFamilyOptions {
        unsafe {
            let inner = crocksdb_ffi::crocksdb_get_options_cf(self.inner, cf.inner);
            ColumnFamilyOptions::from_raw(inner)
        }
    }
    /// Ingests external SST `files` into the default CF.
    pub fn ingest_external_file(
        &self,
        opt: &IngestExternalFileOptions,
        files: &[&str],
    ) -> Result<(), String> {
        let c_files = build_cstring_list(files);
        let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
        unsafe {
            ffi_try!(crocksdb_ingest_external_file(
                self.inner,
                c_files_ptrs.as_ptr(),
                c_files.len(),
                opt.inner
            ));
        }
        Ok(())
    }
    /// Ingests external SST `files` into `cf`.
    pub fn ingest_external_file_cf(
        &self,
        cf: &CFHandle,
        opt: &IngestExternalFileOptions,
        files: &[&str],
    ) -> Result<(), String> {
        let c_files = build_cstring_list(files);
        let c_files_ptrs: Vec<*const _> = c_files.iter().map(|s| s.as_ptr()).collect();
        unsafe {
            ffi_try!(crocksdb_ingest_external_file_cf(
                self.inner,
                cf.inner,
                c_files_ptrs.as_ptr(),
                c_files_ptrs.len(),
                opt.inner
            ));
        }
        Ok(())
    }
pub fn backup_at(&self, path: &str) -> Result<BackupEngine, String> {
let backup_engine = BackupEngine::open(DBOptions::new(), path).unwrap();
unsafe {
ffi_try!(crocksdb_backup_engine_create_new_backup(
backup_engine.inner,
self.inner
))
}
Ok(backup_engine)
}
    /// Restores a DB from the latest backup in `backup_engine`, then
    /// opens the restored DB with `db_opts` and `cfds`.
    pub fn restore_from<'a, T>(
        backup_engine: &BackupEngine,
        restore_db_path: &str,
        restore_wal_path: &str,
        ropts: &RestoreOptions,
        db_opts: DBOptions,
        cfds: Vec<T>,
    ) -> Result<DB, String>
    where
        T: Into<ColumnFamilyDescriptor<'a>>,
    {
        let c_db_path = match CString::new(restore_db_path.as_bytes()) {
            Ok(c) => c,
            Err(_) => {
                return Err(
                    "Failed to convert restore_db_path to CString when restoring rocksdb"
                        .to_owned(),
                )
            }
        };
        let c_wal_path = match CString::new(restore_wal_path.as_bytes()) {
            Ok(c) => c,
            Err(_) => {
                return Err(
                    "Failed to convert restore_wal_path to CString when restoring rocksdb"
                        .to_owned(),
                )
            }
        };
        unsafe {
            ffi_try!(crocksdb_backup_engine_restore_db_from_latest_backup(
                backup_engine.inner,
                c_db_path.as_ptr(),
                c_wal_path.as_ptr(),
                ropts.inner
            ))
        };
        DB::open_cf(db_opts, restore_db_path, cfds)
    }
    /// Block cache usage reported via the default CF's options.
    pub fn get_block_cache_usage(&self) -> u64 {
        self.get_options().get_block_cache_usage()
    }
    /// Block cache usage reported via `cf`'s options.
    pub fn get_block_cache_usage_cf(&self, cf: &CFHandle) -> u64 {
        self.get_options_cf(cf).get_block_cache_usage()
    }
    /// Table properties of all SST files in the default CF.
    pub fn get_properties_of_all_tables(&self) -> Result<TablePropertiesCollection, String> {
        unsafe {
            let props = ffi_try!(crocksdb_get_properties_of_all_tables(self.inner));
            Ok(TablePropertiesCollection::from_raw(props))
        }
    }
    /// Table properties of all SST files in `cf`.
    pub fn get_properties_of_all_tables_cf(
        &self,
        cf: &CFHandle,
    ) -> Result<TablePropertiesCollection, String> {
        unsafe {
            let props = ffi_try!(crocksdb_get_properties_of_all_tables_cf(
                self.inner,
                cf.inner
            ));
            Ok(TablePropertiesCollection::from_raw(props))
        }
    }
    /// Table properties of SST files in `cf` for the given ranges.
    pub fn get_properties_of_tables_in_range(
        &self,
        cf: &CFHandle,
        ranges: &[Range],
    ) -> Result<TablePropertiesCollection, String> {
        // Marshal ranges into parallel pointer/length arrays for the C API.
        let start_keys: Vec<*const u8> = ranges.iter().map(|x| x.start_key.as_ptr()).collect();
        let start_keys_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
        let limit_keys: Vec<*const u8> = ranges.iter().map(|x| x.end_key.as_ptr()).collect();
        let limit_keys_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
        unsafe {
            let props = ffi_try!(crocksdb_get_properties_of_tables_in_range(
                self.inner,
                cf.inner,
                ranges.len() as i32,
                start_keys.as_ptr(),
                start_keys_lens.as_ptr(),
                limit_keys.as_ptr(),
                limit_keys_lens.as_ptr()
            ));
            Ok(TablePropertiesCollection::from_raw(props))
        }
    }
    /// All stored versions of keys in the given range, with their
    /// sequence numbers and value-type tags.
    pub fn get_all_key_versions(
        &self,
        start_key: &[u8],
        end_key: &[u8],
    ) -> Result<Vec<KeyVersion>, String> {
        unsafe {
            let kvs = ffi_try!(crocksdb_get_all_key_versions(
                self.inner,
                start_key.as_ptr(),
                start_key.len() as size_t,
                end_key.as_ptr(),
                end_key.len() as size_t
            ));
            let size = crocksdb_ffi::crocksdb_keyversions_count(kvs) as usize;
            let mut key_versions = Vec::with_capacity(size);
            for i in 0..size {
                key_versions.push(KeyVersion {
                    // NOTE(review): CStr-based copies assume keys/values
                    // have no interior NUL bytes — confirm upstream.
                    key: CStr::from_ptr(crocksdb_ffi::crocksdb_keyversions_key(kvs, i))
                        .to_string_lossy()
                        .into_owned(),
                    value: CStr::from_ptr(crocksdb_ffi::crocksdb_keyversions_value(kvs, i))
                        .to_string_lossy()
                        .into_owned(),
                    seq: crocksdb_ffi::crocksdb_keyversions_seq(kvs, i),
                    value_type: crocksdb_ffi::crocksdb_keyversions_type(kvs, i),
                })
            }
            // Free the C-side collection after copying everything out.
            crocksdb_ffi::crocksdb_keyversions_destroy(kvs);
            Ok(key_versions)
        }
    }
}
impl Writable for DB {
    // Every method delegates to the corresponding *_opt variant with
    // freshly constructed default write options.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.put_opt(key, value, &WriteOptions::new())
    }
    fn put_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.put_cf_opt(cf, key, value, &WriteOptions::new())
    }
    fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.merge_opt(key, value, &WriteOptions::new())
    }
    fn merge_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
        self.merge_cf_opt(cf, key, value, &WriteOptions::new())
    }
    fn delete(&self, key: &[u8]) -> Result<(), String> {
        self.delete_opt(key, &WriteOptions::new())
    }
    fn delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String> {
        self.delete_cf_opt(cf, key, &WriteOptions::new())
    }
    fn single_delete(&self, key: &[u8]) -> Result<(), String> {
        self.single_delete_opt(key, &WriteOptions::new())
    }
    fn single_delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String> {
        self.single_delete_cf_opt(cf, key, &WriteOptions::new())
    }
    fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<(), String> {
        // Routes through the default CF handle; panics if it is missing
        // (the open path always creates it).
        let handle = self.cf_handle("default").unwrap();
        self.delete_range_cf(handle, begin_key, end_key)
    }
    fn delete_range_cf(
        &self,
        cf: &CFHandle,
        begin_key: &[u8],
        end_key: &[u8],
    ) -> Result<(), String> {
        self.delete_range_cf_opt(cf, begin_key, end_key, &WriteOptions::new())
    }
}
impl Default for WriteBatch {
    fn default() -> WriteBatch {
        // Allocates an empty batch on the C side; freed in Drop.
        WriteBatch {
            inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create() },
        }
    }
}
impl WriteBatch {
    /// Creates an empty write batch.
    pub fn new() -> WriteBatch {
        WriteBatch::default()
    }
    /// Creates an empty write batch pre-sized to `cap` on the C side.
    pub fn with_capacity(cap: usize) -> WriteBatch {
        WriteBatch {
            inner: unsafe { crocksdb_ffi::crocksdb_writebatch_create_with_capacity(cap) },
        }
    }
    /// Number of updates queued in the batch.
    pub fn count(&self) -> usize {
        unsafe { crocksdb_ffi::crocksdb_writebatch_count(self.inner) as usize }
    }
    /// Whether the batch contains no updates.
    pub fn is_empty(&self) -> bool {
        self.count() == 0
    }
    /// Size in bytes of the batch's serialized representation.
    pub fn data_size(&self) -> usize {
        let mut data_size: usize = 0;
        unsafe {
            // Only the length out-parameter is needed; the returned data
            // pointer is intentionally discarded.
            let _ = crocksdb_ffi::crocksdb_writebatch_data(self.inner, &mut data_size);
        }
        // Tail expression instead of an explicit `return` statement.
        data_size
    }
    /// Removes all queued updates.
    pub fn clear(&self) {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_clear(self.inner);
        }
    }
    /// Records a save point for a later `rollback_to_save_point`.
    pub fn set_save_point(&mut self) {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_set_save_point(self.inner);
        }
    }
    /// Discards updates appended since the most recent save point;
    /// errors are propagated from the C layer via `ffi_try!`.
    pub fn rollback_to_save_point(&mut self) -> Result<(), String> {
        unsafe {
            ffi_try!(crocksdb_writebatch_rollback_to_save_point(self.inner));
        }
        Ok(())
    }
}
impl Drop for WriteBatch {
    fn drop(&mut self) {
        unsafe { crocksdb_ffi::crocksdb_writebatch_destroy(self.inner) }
    }
}
impl Drop for DB {
    fn drop(&mut self) {
        unsafe {
            // Destroy all CF handles before closing the DB they belong to.
            self.cfs.clear();
            crocksdb_ffi::crocksdb_close(self.inner);
        }
    }
}
impl Writable for WriteBatch {
    // Unlike `DB`'s Writable impl, these only queue operations; nothing
    // is applied until the batch is passed to `DB::write*`.
    fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_put(
                self.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t,
            );
            Ok(())
        }
    }
    fn put_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_put_cf(
                self.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t,
            );
            Ok(())
        }
    }
    fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_merge(
                self.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t,
            );
            Ok(())
        }
    }
    fn merge_cf(&self, cf: &CFHandle, key: &[u8], value: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_merge_cf(
                self.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
                value.as_ptr(),
                value.len() as size_t,
            );
            Ok(())
        }
    }
    fn delete(&self, key: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_delete(self.inner, key.as_ptr(), key.len() as size_t);
            Ok(())
        }
    }
    fn delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_delete_cf(
                self.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
            );
            Ok(())
        }
    }
    fn single_delete(&self, key: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_single_delete(
                self.inner,
                key.as_ptr(),
                key.len() as size_t,
            );
            Ok(())
        }
    }
    fn single_delete_cf(&self, cf: &CFHandle, key: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_single_delete_cf(
                self.inner,
                cf.inner,
                key.as_ptr(),
                key.len() as size_t,
            );
            Ok(())
        }
    }
    fn delete_range(&self, begin_key: &[u8], end_key: &[u8]) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_delete_range(
                self.inner,
                begin_key.as_ptr(),
                begin_key.len(),
                end_key.as_ptr(),
                end_key.len(),
            );
            Ok(())
        }
    }
    fn delete_range_cf(
        &self,
        cf: &CFHandle,
        begin_key: &[u8],
        end_key: &[u8],
    ) -> Result<(), String> {
        unsafe {
            crocksdb_ffi::crocksdb_writebatch_delete_range_cf(
                self.inner,
                cf.inner,
                begin_key.as_ptr(),
                begin_key.len(),
                end_key.as_ptr(),
                end_key.len(),
            );
            Ok(())
        }
    }
}
/// Owned value returned from point lookups; dereferences to `&[u8]`.
/// Wraps a RocksDB pinnable slice so the bytes are not copied.
pub struct DBVector {
pinned_slice: *mut DBPinnableSlice,
}
// NOTE(review): Send/Sync assume the pinned slice is immutable after creation
// and exclusively owned by this wrapper — confirm against the crocksdb API.
unsafe impl Send for DBVector {}
unsafe impl Sync for DBVector {}
impl Debug for DBVector {
    /// Formats the underlying bytes with the slice `Debug` impl.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        let bytes: &[u8] = self;
        write!(formatter, "{:?}", bytes)
    }
}
impl<'a> PartialEq<&'a [u8]> for DBVector {
    /// Byte-wise comparison against a borrowed slice.
    fn eq(&self, rhs: &&[u8]) -> bool {
        let lhs: &[u8] = self;
        lhs == *rhs
    }
}
impl Deref for DBVector {
type Target = [u8];
/// Borrows the pinned bytes. Lifetime elision ties the returned slice to
/// `&self`, so it cannot outlive this `DBVector`.
fn deref(&self) -> &[u8] {
let mut val_len: size_t = 0;
let val_len_ptr = &mut val_len as *mut size_t;
unsafe {
// SAFETY: the pinnable slice is owned by `self` and stays valid until
// drop; the FFI call writes the value length into `val_len`.
let val = crocksdb_ffi::crocksdb_pinnableslice_value(self.pinned_slice, val_len_ptr);
slice::from_raw_parts(val, val_len)
}
}
}
impl Drop for DBVector {
fn drop(&mut self) {
// Releases the pinned slice back to RocksDB.
unsafe {
crocksdb_ffi::crocksdb_pinnableslice_destroy(self.pinned_slice);
}
}
}
impl DBVector {
    /// Wraps a raw pinnable-slice handle; the new `DBVector` takes ownership
    /// and frees it on drop.
    pub fn from_pinned_slice(s: *mut DBPinnableSlice) -> DBVector {
        DBVector { pinned_slice: s }
    }

    /// Returns the contents as `&str` when they are valid UTF-8, else `None`.
    pub fn to_utf8(&self) -> Option<&str> {
        let bytes: &[u8] = self;
        from_utf8(bytes).ok()
    }
}
/// Owning handle to a RocksDB backup engine; closed on drop.
pub struct BackupEngine {
inner: *mut DBBackupEngine,
}
impl BackupEngine {
    /// Opens a backup engine rooted at `path`, creating the directory first
    /// (via `create_dir_all`) so a missing path is not an error.
    pub fn open(opts: DBOptions, path: &str) -> Result<BackupEngine, String> {
        let cpath = CString::new(path.as_bytes()).map_err(|_| {
            "Failed to convert path to CString when opening rocksdb backup engine".to_owned()
        })?;
        fs::create_dir_all(path)
            .map_err(|e| format!("Failed to create rocksdb backup directory: {:?}", e))?;
        let inner = unsafe { ffi_try!(crocksdb_backup_engine_open(opts.inner, cpath.as_ptr())) };
        Ok(BackupEngine { inner })
    }
}
impl Drop for BackupEngine {
fn drop(&mut self) {
// Closes the backup engine and releases its resources.
unsafe {
crocksdb_ffi::crocksdb_backup_engine_close(self.inner);
}
}
}
/// Writer that builds an external SST file for later ingestion.
pub struct SstFileWriter {
inner: *mut crocksdb_ffi::SstFileWriter,
// NOTE(review): held only to keep the option objects alive for the lifetime
// of the C writer — presumably borrowed by it; confirm against crocksdb.
_env_opt: EnvOptions,
_opt: ColumnFamilyOptions,
}
// NOTE(review): Send assumes the raw writer handle may be moved across
// threads, matching the other FFI handle types in this file.
unsafe impl Send for SstFileWriter {}
impl SstFileWriter {
/// Creates a writer for the default column family's options.
pub fn new(env_opt: EnvOptions, opt: ColumnFamilyOptions) -> SstFileWriter {
unsafe {
SstFileWriter {
inner: crocksdb_ffi::crocksdb_sstfilewriter_create(env_opt.inner, opt.inner),
_env_opt: env_opt,
_opt: opt,
}
}
}
/// Creates a writer bound to a specific column family handle.
pub fn new_cf(env_opt: EnvOptions, opt: ColumnFamilyOptions, cf: &CFHandle) -> SstFileWriter {
unsafe {
SstFileWriter {
inner: crocksdb_ffi::crocksdb_sstfilewriter_create_cf(
env_opt.inner,
opt.inner,
cf.inner,
),
_env_opt: env_opt,
_opt: opt,
}
}
}
/// Opens `name` as the output file. Errors if the path contains an
/// interior NUL byte or the underlying open fails.
pub fn open(&mut self, name: &str) -> Result<(), String> {
let path = match CString::new(name.to_owned()) {
Err(e) => return Err(format!("invalid path {}: {:?}", name, e)),
Ok(p) => p,
};
unsafe {
Ok(ffi_try!(
crocksdb_sstfilewriter_open(self.inner, path.as_ptr())
))
}
}
/// Appends a put entry. NOTE(review): SST files presumably require keys in
/// ascending order — out-of-order keys surface as an FFI error.
pub fn put(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_put(
self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()
));
Ok(())
}
}
/// Appends a merge entry.
pub fn merge(&mut self, key: &[u8], val: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_merge(
self.inner,
key.as_ptr(),
key.len(),
val.as_ptr(),
val.len()
));
Ok(())
}
}
/// Appends a delete entry.
pub fn delete(&mut self, key: &[u8]) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sstfilewriter_delete(
self.inner,
key.as_ptr(),
key.len()
));
Ok(())
}
}
/// Finalizes the file and returns metadata about what was written.
pub fn finish(&mut self) -> Result<ExternalSstFileInfo, String> {
let info = ExternalSstFileInfo::new();
unsafe {
ffi_try!(crocksdb_sstfilewriter_finish(self.inner, info.inner));
}
Ok(info)
}
/// Current size in bytes of the file being written.
pub fn file_size(&mut self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_sstfilewriter_file_size(self.inner) as u64 }
}
}
impl Drop for SstFileWriter {
fn drop(&mut self) {
// Destroys the C writer object (does not finish/flush the file).
unsafe { crocksdb_ffi::crocksdb_sstfilewriter_destroy(self.inner) }
}
}
/// Metadata about an SST file produced by `SstFileWriter::finish`.
pub struct ExternalSstFileInfo {
inner: *mut crocksdb_ffi::ExternalSstFileInfo,
}
impl ExternalSstFileInfo {
/// Allocates an empty info object, to be filled in by `finish`.
pub fn new() -> ExternalSstFileInfo {
unsafe {
ExternalSstFileInfo {
inner: crocksdb_ffi::crocksdb_externalsstfileinfo_create(),
}
}
}
/// Path of the written SST file.
/// NOTE(review): panics (`unwrap`) if the path bytes are not valid UTF-8.
pub fn file_path(&self) -> PathBuf {
let mut len: size_t = 0;
unsafe {
let ptr = crocksdb_ffi::crocksdb_externalsstfileinfo_file_path(self.inner, &mut len);
let bytes = slice::from_raw_parts(ptr, len as usize);
PathBuf::from(String::from_utf8(bytes.to_owned()).unwrap())
}
}
/// Smallest key in the file; borrowed from the info object.
pub fn smallest_key(&self) -> &[u8] {
let mut len: size_t = 0;
unsafe {
let ptr = crocksdb_ffi::crocksdb_externalsstfileinfo_smallest_key(self.inner, &mut len);
slice::from_raw_parts(ptr, len as usize)
}
}
/// Largest key in the file; borrowed from the info object.
pub fn largest_key(&self) -> &[u8] {
let mut len: size_t = 0;
unsafe {
let ptr = crocksdb_ffi::crocksdb_externalsstfileinfo_largest_key(self.inner, &mut len);
slice::from_raw_parts(ptr, len as usize)
}
}
/// Sequence number recorded for the file.
pub fn sequence_number(&self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_externalsstfileinfo_sequence_number(self.inner) as u64 }
}
/// Total size of the file in bytes.
pub fn file_size(&self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_externalsstfileinfo_file_size(self.inner) as u64 }
}
/// Number of entries written to the file.
pub fn num_entries(&self) -> u64 {
unsafe { crocksdb_ffi::crocksdb_externalsstfileinfo_num_entries(self.inner) as u64 }
}
}
impl Drop for ExternalSstFileInfo {
fn drop(&mut self) {
// Frees the C info object; invalidates any slices borrowed from it.
unsafe {
crocksdb_ffi::crocksdb_externalsstfileinfo_destroy(self.inner);
}
}
}
/// Returns the compression types the linked RocksDB build supports.
pub fn supported_compression() -> Vec<DBCompressionType> {
unsafe {
let size = crocksdb_ffi::crocksdb_get_supported_compression_number() as usize;
let mut v: Vec<DBCompressionType> = Vec::with_capacity(size);
let pv = v.as_mut_ptr();
// SAFETY: `v` was allocated with capacity `size`, and the FFI call is
// expected to fill exactly `size` entries before the length is set.
crocksdb_ffi::crocksdb_get_supported_compression(pv, size as size_t);
v.set_len(size);
v
}
}
/// Owning wrapper around a RocksDB `Env`; destroyed on drop.
pub struct Env {
pub inner: *mut DBEnv,
}
// NOTE(review): Send/Sync assume the underlying DBEnv is safe for shared,
// cross-thread use — confirm against the crocksdb API.
unsafe impl Send for Env {}
unsafe impl Sync for Env {}
impl Default for Env {
/// Creates the default (OS-backed) environment.
fn default() -> Env {
unsafe {
Env {
inner: crocksdb_ffi::crocksdb_create_default_env(),
}
}
}
}
impl Env {
    /// Creates a memory-backed environment (`crocksdb_create_mem_env`).
    pub fn new_mem() -> Env {
        unsafe {
            Env {
                inner: crocksdb_ffi::crocksdb_create_mem_env(),
            }
        }
    }

    /// Opens `path` for sequential reads through this environment.
    ///
    /// Returns `Err` (instead of panicking, as the previous implementation's
    /// `unwrap` did) when `path` contains an interior NUL byte, or when the
    /// underlying open fails.
    pub fn new_sequential_file(
        &self,
        path: &str,
        opts: EnvOptions,
    ) -> Result<SequentialFile, String> {
        // CString::new fails on interior NUL bytes; report instead of panic.
        let file_path = match CString::new(path) {
            Ok(p) => p,
            Err(e) => return Err(format!("invalid path {}: {:?}", path, e)),
        };
        unsafe {
            let file = ffi_try!(crocksdb_sequential_file_create(
                self.inner,
                file_path.as_ptr(),
                opts.inner
            ));
            Ok(SequentialFile::new(file))
        }
    }

    /// Checks whether `path` exists; `Ok(())` means it does, any failure
    /// (including an invalid path) is returned as `Err`.
    pub fn file_exists(&self, path: &str) -> Result<(), String> {
        let file_path = match CString::new(path) {
            Ok(p) => p,
            Err(e) => return Err(format!("invalid path {}: {:?}", path, e)),
        };
        unsafe {
            ffi_try!(crocksdb_env_file_exists(self.inner, file_path.as_ptr()));
        }
        Ok(())
    }

    /// Deletes the file at `path` through this environment.
    pub fn delete_file(&self, path: &str) -> Result<(), String> {
        let file_path = match CString::new(path) {
            Ok(p) => p,
            Err(e) => return Err(format!("invalid path {}: {:?}", path, e)),
        };
        unsafe {
            ffi_try!(crocksdb_env_delete_file(self.inner, file_path.as_ptr()));
        }
        Ok(())
    }
}
impl Drop for Env {
fn drop(&mut self) {
// Destroys the C env object.
unsafe {
crocksdb_ffi::crocksdb_env_destroy(self.inner);
}
}
}
/// Owning handle to a RocksDB sequential file; implements `io::Read`.
pub struct SequentialFile {
inner: *mut DBSequentialFile,
}
impl SequentialFile {
/// Wraps a raw handle produced by `Env::new_sequential_file`.
fn new(inner: *mut DBSequentialFile) -> SequentialFile {
SequentialFile { inner: inner }
}
/// Skips `n` bytes forward in the file.
pub fn skip(&mut self, n: usize) -> Result<(), String> {
unsafe {
ffi_try!(crocksdb_sequential_file_skip(self.inner, n as size_t));
Ok(())
}
}
}
impl io::Read for SequentialFile {
/// Reads up to `buf.len()` bytes; returns the number actually read.
/// FFI errors are surfaced as `io::ErrorKind::Other`.
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
unsafe {
let mut err = ptr::null_mut();
let size = crocksdb_ffi::crocksdb_sequential_file_read(
self.inner,
buf.len() as size_t,
buf.as_mut_ptr(),
&mut err,
);
// A non-null `err` signals failure; translate to io::Error.
if !err.is_null() {
return Err(io::Error::new(
io::ErrorKind::Other,
crocksdb_ffi::error_message(err),
));
}
Ok(size as usize)
}
}
}
impl Drop for SequentialFile {
fn drop(&mut self) {
// Closes and frees the underlying file handle.
unsafe {
crocksdb_ffi::crocksdb_sequential_file_destroy(self.inner);
}
}
}
/// Overwrites the global sequence number of the external SST file `file`
/// in column family `cf`, returning the previous sequence number.
///
/// Returns `Err` (instead of panicking, as the previous implementation's
/// `unwrap` did) when `file` contains an interior NUL byte, or when the
/// underlying FFI call fails.
pub fn set_external_sst_file_global_seq_no(
    db: &DB,
    cf: &CFHandle,
    file: &str,
    seq_no: u64,
) -> Result<u64, String> {
    // CString::new fails on interior NUL bytes; report instead of panic.
    let cfile = match CString::new(file) {
        Ok(p) => p,
        Err(e) => return Err(format!("invalid path {}: {:?}", file, e)),
    };
    unsafe {
        let pre_seq_no = ffi_try!(crocksdb_set_external_sst_file_global_seq_no(
            db.inner,
            cf.inner,
            cfile.as_ptr(),
            seq_no
        ));
        Ok(pre_seq_no)
    }
}
/// Raw iterator over a DB that yields `iovec` views of key/value bytes.
/// NOTE(review): the yielded pointers presumably stay valid only until the
/// iterator is advanced or dropped — confirm against RocksDB slice semantics.
pub struct UnsafeIter {
inner: *mut crocksdb_ffi::DBIterator,
// True when the iterator is positioned on an entry that has not yet been
// yielded by `next()`; set by the seek methods via `check_item_ready`.
item_ready: bool,
// Kept alive so the C iterator's read options outlive the iterator itself.
_readopts: ReadOptions,
}
impl UnsafeIter {
/// Shared helper: calls an FFI getter that returns a pointer and writes a
/// length, and packages the pair as an `iovec`.
unsafe fn iter_get<F>(&self, mut f: F) -> iovec
where
F: FnMut(*const crocksdb_ffi::DBIterator, *mut size_t) -> *mut u8,
{
let mut iov_len: size_t = 0;
let iov_base = f(self.inner, &mut iov_len as *mut size_t) as *mut c_void;
iovec { iov_base, iov_len }
}
/// Current entry's key bytes. Caller must ensure the iterator is valid.
pub unsafe fn iter_key(&self) -> iovec {
self.iter_get(|iter, len| crocksdb_ffi::crocksdb_iter_key(iter, len))
}
/// Current entry's value bytes. Caller must ensure the iterator is valid.
pub unsafe fn iter_value(&self) -> iovec {
self.iter_get(|iter, len| crocksdb_ffi::crocksdb_iter_value(iter, len))
}
/// Records whether the just-positioned entry is pending for `next()`.
unsafe fn check_item_ready(&mut self) -> bool {
self.item_ready = self.valid();
self.item_ready
}
/// Positions at the first entry; returns whether one exists.
pub fn seek_to_first(&mut self) -> bool {
unsafe {
crocksdb_ffi::crocksdb_iter_seek_to_first(self.inner);
self.check_item_ready()
}
}
/// Positions at the last entry; returns whether one exists.
pub fn seek_to_last(&mut self) -> bool {
unsafe {
crocksdb_ffi::crocksdb_iter_seek_to_last(self.inner);
self.check_item_ready()
}
}
/// Seeks to the first entry at or after `key`; returns whether one exists.
pub fn seek(&mut self, key: iovec) -> bool {
unsafe {
crocksdb_ffi::crocksdb_iter_seek(self.inner, key.iov_base as *const u8, key.iov_len);
self.check_item_ready()
}
}
/// Seeks to the last entry at or before `key`; returns whether one exists.
pub fn seek_for_prev(&mut self, key: iovec) -> bool {
unsafe {
crocksdb_ffi::crocksdb_iter_seek_for_prev(
self.inner,
key.iov_base as *const u8,
key.iov_len,
);
self.check_item_ready()
}
}
/// Creates an iterator over the default column family.
pub fn new(db: &DB, readopts: ReadOptions) -> Self {
let inner = unsafe {
let opts_inner = readopts.get_inner();
crocksdb_ffi::crocksdb_create_iterator(db.inner, opts_inner)
};
Self {
inner,
item_ready: false,
_readopts: readopts,
}
}
/// Creates an iterator over a specific column family.
pub fn new_cf(db: &DB, cf_handle: &CFHandle, readopts: ReadOptions) -> Self {
let inner = unsafe {
let opts_inner = readopts.get_inner();
crocksdb_ffi::crocksdb_create_iterator_cf(db.inner, opts_inner, cf_handle.inner)
};
Self {
inner,
item_ready: false,
_readopts: readopts,
}
}
/// Whether the iterator is positioned on an entry.
pub unsafe fn valid(&self) -> bool {
crocksdb_ffi::crocksdb_iter_valid(self.inner)
}
/// Raw const pointer to the underlying C iterator.
pub fn as_ptr(&self) -> *const crocksdb_ffi::DBIterator {
self.inner as *const crocksdb_ffi::DBIterator
}
/// Raw mutable pointer to the underlying C iterator (crate-internal).
pub(crate) fn as_mut_ptr(&self) -> *mut crocksdb_ffi::DBIterator {
self.inner
}
}
impl Iterator for UnsafeIter {
type Item = (iovec, iovec);
fn next(&mut self) -> Option<<Self as Iterator>::Item> {
unsafe {
// If the current entry was already yielded (item_ready == false),
// advance first. A fresh seek leaves item_ready == true so the
// seeked-to entry is returned before any advancing happens.
if !self.item_ready && self.valid() {
crocksdb_ffi::crocksdb_iter_next(self.inner);
}
if self.valid() {
self.item_ready = false;
Some((self.iter_key(), self.iter_value()))
} else {
None
}
}
}
}
impl Drop for UnsafeIter {
fn drop(&mut self) {
// Destroys the C iterator; any outstanding iovec pointers become invalid.
unsafe {
crocksdb_ffi::crocksdb_iter_destroy(self.inner);
}
}
}
// NOTE(review): Send assumes the raw DBIterator may be moved to another
// thread when not accessed concurrently, matching the other FFI handle
// types in this file — confirm against the crocksdb API.
unsafe impl Send for UnsafeIter {}
#[cfg(test)]
mod test {
use super::*;
use std::fs;
use std::path::Path;
use std::str;
use std::string::String;
use std::sync::*;
use std::thread;
use tempdir::TempDir;
// Basic put/get/delete round-trip on a fresh DB.
#[test]
fn external() {
let path = TempDir::new("_rust_rocksdb_externaltest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let r: Result<Option<DBVector>, String> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
// Destroying a DB that is still open and holding its lock must fail.
#[allow(unused_variables)]
#[test]
fn errors_do_stuff() {
let path = TempDir::new("_rust_rocksdb_error").expect("");
let path_str = path.path().to_str().unwrap();
let db = DB::open_default(path_str).unwrap();
let opts = DBOptions::new();
match DB::destroy(&opts, path_str) {
Err(ref s) => {
assert!(
s.contains("IO error: ") && s.contains("lock"),
"expect lock fail, but got {}",
s
);
}
Ok(_) => panic!("should fail"),
}
}
// Exercises WriteBatch: count/is_empty, clear, save points, with_capacity.
#[test]
fn writebatch_works() {
let path = TempDir::new("_rust_rocksdb_writebacktest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
let batch = WriteBatch::new();
assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.count(), 0);
assert!(batch.is_empty());
let _ = batch.put(b"k1", b"v1111");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch);
assert!(p.is_ok());
let r = db.get(b"k1");
assert_eq!(r.unwrap().unwrap(), b"v1111");
let batch = WriteBatch::new();
let _ = batch.delete(b"k1");
assert_eq!(batch.count(), 1);
assert!(!batch.is_empty());
let p = db.write(batch);
assert!(p.is_ok());
assert!(db.get(b"k1").unwrap().is_none());
let batch = WriteBatch::new();
let prev_size = batch.data_size();
let _ = batch.delete(b"k1");
assert!(batch.data_size() > prev_size);
batch.clear();
assert_eq!(batch.data_size(), prev_size);
// Two rollbacks pop two save points, discarding k12 and k13 but not k11.
let mut batch = WriteBatch::new();
batch.put(b"k10", b"v10").unwrap();
batch.set_save_point();
batch.put(b"k11", b"v11").unwrap();
batch.set_save_point();
batch.put(b"k12", b"v12").unwrap();
batch.set_save_point();
batch.put(b"k13", b"v13").unwrap();
batch.rollback_to_save_point().unwrap();
batch.rollback_to_save_point().unwrap();
let p = db.write(batch);
assert!(p.is_ok());
let r = db.get(b"k10");
assert_eq!(r.unwrap().unwrap(), b"v10");
let r = db.get(b"k11");
assert_eq!(r.unwrap().unwrap(), b"v11");
let r = db.get(b"k12");
assert!(r.unwrap().is_none());
let r = db.get(b"k13");
assert!(r.unwrap().is_none());
let batch = WriteBatch::with_capacity(1024);
batch.put(b"kc1", b"v1").unwrap();
batch.put(b"kc2", b"v2").unwrap();
let p = db.write(batch);
assert!(p.is_ok());
let r = db.get(b"kc1");
assert!(r.unwrap().is_some());
let r = db.get(b"kc2");
assert!(r.unwrap().is_some());
}
#[test]
fn iterator_test() {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"k1", b"v1111").expect("");
db.put(b"k2", b"v2222").expect("");
db.put(b"k3", b"v3333").expect("");
let mut iter = db.iter();
iter.seek(SeekKey::Start);
for (k, v) in &mut iter {
println!(
"Hello {}: {}",
str::from_utf8(&*k).unwrap(),
str::from_utf8(&*v).unwrap()
);
}
}
// Ranges covering flushed data report non-zero sizes; an empty range reports 0.
#[test]
fn approximate_size_test() {
let path = TempDir::new("_rust_rocksdb_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
for i in 1..8000 {
db.put(
format!("{:04}", i).as_bytes(),
format!("{:04}", i).as_bytes(),
).expect("");
}
db.flush(true).expect("");
assert!(db.get(b"0001").expect("").is_some());
db.flush(true).expect("");
let sizes = db.get_approximate_sizes(&[
Range::new(b"0000", b"2000"),
Range::new(b"2000", b"4000"),
Range::new(b"4000", b"6000"),
Range::new(b"6000", b"8000"),
Range::new(b"8000", b"9999"),
]);
assert_eq!(sizes.len(), 5);
for s in &sizes[0..4] {
assert!(*s > 0);
}
assert_eq!(sizes[4], 0);
}
#[test]
fn property_test() {
let path = TempDir::new("_rust_rocksdb_propertytest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"a1", b"v1").unwrap();
db.flush(true).unwrap();
let prop_name = "rocksdb.total-sst-files-size";
let st1 = db.get_property_int(prop_name).unwrap();
assert!(st1 > 0);
db.put(b"a2", b"v2").unwrap();
db.flush(true).unwrap();
let st2 = db.get_property_int(prop_name).unwrap();
assert!(st2 > st1);
}
#[test]
fn list_column_families_test() {
let path = TempDir::new("_rust_rocksdb_list_column_families_test").expect("");
let mut cfs = ["default", "cf1", "cf2", "cf3"];
{
let mut cfs_opts = vec![];
for _ in 0..cfs.len() {
cfs_opts.push(ColumnFamilyOptions::new());
}
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
for (cf, cf_opts) in cfs.iter().zip(cfs_opts) {
if *cf == "default" {
continue;
}
db.create_cf((*cf, cf_opts)).unwrap();
}
}
let opts_list_cfs = DBOptions::new();
let mut cfs_vec =
DB::list_column_families(&opts_list_cfs, path.path().to_str().unwrap()).unwrap();
cfs_vec.sort();
cfs.sort();
assert_eq!(cfs_vec, cfs);
}
// Backs up a DB and restores it twice (with and without keeping log files).
#[test]
fn backup_db_test() {
let key = b"foo";
let value = b"bar";
let db_dir = TempDir::new("_rust_rocksdb_backuptest").unwrap();
let db = DB::open_default(db_dir.path().to_str().unwrap()).unwrap();
let p = db.put(key, value);
assert!(p.is_ok());
let backup_dir = TempDir::new("_rust_rocksdb_backuptest_backup").unwrap();
let backup_path = backup_dir.path().to_str().unwrap();
assert!(db.backup_at(backup_path).is_ok());
let backup_engine = BackupEngine::open(DBOptions::default(), backup_path).unwrap();
let ropt1 = RestoreOptions::new();
let mut ropt2 = RestoreOptions::new();
ropt2.set_keep_log_files(true);
let ropts = [ropt1, ropt2];
for ropt in &ropts {
let restore_dir = TempDir::new("_rust_rocksdb_backuptest_restore").unwrap();
let cfds: Vec<&str> = vec![];
let restored_db = DB::restore_from(
&backup_engine,
restore_dir.path().to_str().unwrap(),
restore_dir.path().to_str().unwrap(),
&ropt,
DBOptions::default(),
cfds,
).unwrap();
let r = restored_db.get(key);
assert!(r.unwrap().unwrap().to_utf8().unwrap() == str::from_utf8(value).unwrap());
}
}
// With set_db_log_dir, LOG files go to the log dir, not the DB dir.
#[test]
fn log_dir_test() {
let db_dir = TempDir::new("_rust_rocksdb_logdirtest").unwrap();
let db_path = db_dir.path().to_str().unwrap();
let log_path = format!("{}", Path::new(&db_path).join("log_path").display());
fs::create_dir_all(&log_path).unwrap();
let mut opts = DBOptions::new();
opts.create_if_missing(true);
opts.set_db_log_dir(&log_path);
DB::open(opts, db_path).unwrap();
let mut read_dir = fs::read_dir(&log_path).unwrap();
let entry = read_dir.next().unwrap().unwrap();
let name = entry.file_name();
name.to_str().unwrap().find("LOG").unwrap();
for entry in fs::read_dir(&db_path).unwrap() {
let entry = entry.unwrap();
let name = entry.file_name();
assert!(name.to_str().unwrap().find("LOG").is_none());
}
}
#[test]
fn single_delete_test() {
let path = TempDir::new("_rust_rocksdb_singledeletetest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"a", b"v1").unwrap();
let a = db.get(b"a");
assert_eq!(a.unwrap().unwrap().to_utf8().unwrap(), "v1");
db.single_delete(b"a").unwrap();
let a = db.get(b"a");
assert!(a.unwrap().is_none());
db.put(b"a", b"v2").unwrap();
let a = db.get(b"a");
assert_eq!(a.unwrap().unwrap().to_utf8().unwrap(), "v2");
db.single_delete(b"a").unwrap();
let a = db.get(b"a");
assert!(a.unwrap().is_none());
let cf_handle = db.cf_handle("default").unwrap();
db.put_cf(cf_handle, b"a", b"v3").unwrap();
let a = db.get_cf(cf_handle, b"a");
assert_eq!(a.unwrap().unwrap().to_utf8().unwrap(), "v3");
db.single_delete_cf(cf_handle, b"a").unwrap();
let a = db.get_cf(cf_handle, b"a");
assert!(a.unwrap().is_none());
db.put_cf(cf_handle, b"a", b"v4").unwrap();
let a = db.get_cf(cf_handle, b"a");
assert_eq!(a.unwrap().unwrap().to_utf8().unwrap(), "v4");
db.single_delete_cf(cf_handle, b"a").unwrap();
let a = db.get_cf(cf_handle, b"a");
assert!(a.unwrap().is_none());
}
// While background work is paused, no compactions or flushes may be running.
#[test]
fn test_pause_bg_work() {
let path = TempDir::new("_rust_rocksdb_pause_bg_work").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
let db = Arc::new(db);
let db1 = db.clone();
let builder = thread::Builder::new().name(String::from("put-thread"));
let h = builder
.spawn(move || {
db1.put(b"k1", b"v1").unwrap();
db1.put(b"k2", b"v2").unwrap();
db1.flush(true).unwrap();
db1.compact_range(None, None);
})
.unwrap();
db.pause_bg_work();
assert_eq!(
db.get_property_int("rocksdb.num-running-compactions")
.unwrap(),
0
);
assert_eq!(
db.get_property_int("rocksdb.num-running-flushes").unwrap(),
0
);
db.continue_bg_work();
h.join().unwrap();
}
// A snapshot must not observe writes made after it was taken.
#[test]
fn snapshot_test() {
let path = "_rust_rocksdb_snapshottest";
{
let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111");
assert!(p.is_ok());
let snap = db.snapshot();
let mut r: Result<Option<DBVector>, String> = snap.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
r = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
let p = db.put(b"k2", b"v2222");
assert!(p.is_ok());
assert!(db.get(b"k2").unwrap().is_some());
assert!(snap.get(b"k2").unwrap().is_none());
}
let opts = DBOptions::new();
assert!(DB::destroy(&opts, path).is_ok());
}
#[test]
fn block_cache_usage() {
let path = TempDir::new("_rust_rocksdb_block_cache_usage").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
for i in 0..200 {
db.put(format!("k_{}", i).as_bytes(), b"v").unwrap();
}
db.flush(true).unwrap();
for i in 0..200 {
db.get(format!("k_{}", i).as_bytes()).unwrap();
}
assert!(db.get_block_cache_usage() > 0);
let cf_handle = db.cf_handle("default").unwrap();
assert!(db.get_block_cache_usage_cf(cf_handle) > 0);
}
#[test]
fn flush_cf() {
let path = TempDir::new("_rust_rocksdb_flush_cf").expect("");
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let mut db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
db.create_cf("cf").unwrap();
let cf_handle = db.cf_handle("cf").unwrap();
for i in 0..200 {
db.put_cf(cf_handle, format!("k_{}", i).as_bytes(), b"v")
.unwrap();
}
db.flush_cf(cf_handle, true).unwrap();
let total_sst_files_size =
db.get_property_int_cf(cf_handle, "rocksdb.total-sst-files-size")
.unwrap();
assert!(total_sst_files_size > 0);
}
// Every reported compression type must be distinct and a known enum value.
#[test]
fn test_supported_compression() {
let mut com = supported_compression();
let len_before = com.len();
assert!(com.len() != 0);
com.dedup();
assert_eq!(len_before, com.len());
for c in com {
println!("{:?}", c);
println!("{}", c as u32);
match c as u32 {
0...5 | 7 | 0x40 => assert!(true),
_ => assert!(false),
}
}
}
#[test]
fn test_get_all_key_versions() {
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let path = TempDir::new("_rust_rocksdb_get_all_key_version_test").expect("");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = vec![
(b"key1".to_vec(), b"value1".to_vec()),
(b"key2".to_vec(), b"value2".to_vec()),
(b"key3".to_vec(), b"value3".to_vec()),
(b"key4".to_vec(), b"value4".to_vec()),
];
for &(ref k, ref v) in &samples {
db.put(k, v).unwrap();
assert_eq!(v.as_slice(), &*db.get(k).unwrap().unwrap());
}
db.flush(true).unwrap();
let key_versions = db.get_all_key_versions(b"key2", b"key4").unwrap();
assert_eq!(key_versions[1].key, "key3");
assert_eq!(key_versions[1].value, "value3");
assert_eq!(key_versions[1].seq, 3);
}
#[test]
fn test_get_approximate_memtable_stats() {
let mut opts = DBOptions::new();
opts.create_if_missing(true);
let path = TempDir::new("_rust_rocksdb_get_approximate_memtable_stats").expect("");
let db = DB::open(opts, path.path().to_str().unwrap()).unwrap();
let samples = [
(b"key1", b"value1"),
(b"key2", b"value2"),
(b"key3", b"value3"),
(b"key4", b"value4"),
];
for &(k, v) in &samples {
db.put(k, v).unwrap();
}
let range = Range::new(b"a", b"z");
let (count, size) = db.get_approximate_memtable_stats(&range);
assert!(count > 0);
assert!(size > 0);
let cf = db.cf_handle("default").unwrap();
let (count, size) = db.get_approximate_memtable_stats_cf(cf, &range);
assert!(count > 0);
assert!(size > 0);
}
// Helper: drains an UnsafeIter, copying each iovec into owned byte vectors.
fn unsafe_iter_to_vec(iter: &mut UnsafeIter) -> Vec<(Vec<u8>, Vec<u8>)> {
iter
.into_iter()
.map(|(k, v)| {
let key =
unsafe { slice::from_raw_parts(k.iov_base as *const u8, k.iov_len as usize) };
let val =
unsafe { slice::from_raw_parts(v.iov_base as *const u8, v.iov_len as usize) };
(key.to_vec(), val.to_vec())
})
.collect::<Vec<_>>()
}
// Seek to "k21" with an upper bound of "k3": only the k2x entries come back.
#[test]
fn unsafe_iterator_test() {
let path = TempDir::new("_rust_rocksdb_unsafe_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"k22", b"v20002").expect("");
db.put(b"k21", b"v20001").expect("");
db.put(b"k23", b"v20003").expect("");
db.put(b"k31", b"v30001").expect("");
db.put(b"k11", b"v1111").expect("");
db.put(b"k12", b"v1112").expect("");
let mut read_opt = ReadOptions::default();
read_opt.set_iterate_upper_bound(b"k3");
let mut iter = UnsafeIter::new(&db, read_opt);
let mut pref = b"k21".to_vec();
iter.seek(iovec {
iov_base: pref.as_mut_ptr() as *mut c_void,
iov_len: pref.len() as size_t,
});
let expected = vec![
(b"k21".to_vec(), b"v20001".to_vec()),
(b"k22".to_vec(), b"v20002".to_vec()),
(b"k23".to_vec(), b"v20003".to_vec()),
];
assert_eq!(unsafe_iter_to_vec(&mut iter), expected);
}
// Iterating without an initial seek must yield nothing.
#[test]
fn unsafe_iterator_unitialized_test() {
let path = TempDir::new("_rust_rocksdb_unsafe_noinit_iteratortest").expect("");
let db = DB::open_default(path.path().to_str().unwrap()).unwrap();
db.put(b"k22", b"v20002").expect("");
let mut iter = UnsafeIter::new(&db, ReadOptions::default());
assert_eq!(unsafe_iter_to_vec(&mut iter), vec![]);
}
}