use std::collections::hash_map::HashMap;
use std::ffi::{CStr, CString};
use std::fmt;
use std::iter::IntoIterator;
use std::mem;
use std::ops;
use std::os::raw::{c_char, c_int, c_void};
use std::path::Path;
use std::ptr;
use std::slice;
use std::str;
use std::sync::Arc;
use rocks_sys as ll;
use crate::debug::KeyVersionVec;
use crate::iterator::Iterator;
use crate::metadata::{ColumnFamilyMetaData, LevelMetaData, LiveFileMetaData, SstFileMetaData};
use crate::options::{
ColumnFamilyOptions, CompactRangeOptions, CompactionOptions, DBOptions, FlushOptions, IngestExternalFileOptions,
Options, ReadOptions, WriteOptions,
};
use crate::slice::PinnableSlice;
use crate::snapshot::Snapshot;
use crate::table_properties::TablePropertiesCollection;
use crate::to_raw::{FromRaw, ToRaw};
use crate::transaction_log::{LogFile, TransactionLogIterator};
use crate::types::SequenceNumber;
use crate::utilities::path_to_bytes;
use crate::write_batch::WriteBatch;
use crate::{Error, Result};
/// Name of the column family that RocksDB creates implicitly in every database.
// `'static` is implied on `const` string references (clippy: redundant_static_lifetimes).
pub const DEFAULT_COLUMN_FAMILY_NAME: &str = "default";
/// Name plus options describing a column family to open or create.
#[derive(Debug)]
pub struct ColumnFamilyDescriptor {
    // Stored NUL-terminated so it can be handed straight to the C API.
    name: CString,
    options: ColumnFamilyOptions,
}
impl ColumnFamilyDescriptor {
    /// Builds a descriptor with the given name and default options.
    ///
    /// Panics if `name` contains an interior NUL byte.
    fn with_name<T: AsRef<str>>(name: T) -> ColumnFamilyDescriptor {
        Self::new(name, ColumnFamilyOptions::default())
    }

    /// Raw pointer to the NUL-terminated name, for FFI calls.
    fn name_as_ptr(&self) -> *const c_char {
        self.name.as_ptr()
    }

    /// Builds a descriptor from a name and explicit options.
    ///
    /// Panics if `name` contains an interior NUL byte.
    pub fn new<T: AsRef<str>>(name: T, options: ColumnFamilyOptions) -> ColumnFamilyDescriptor {
        let name = CString::new(name.as_ref()).expect("need a valid column family name");
        ColumnFamilyDescriptor { name, options }
    }

    /// The column family name as UTF-8; panics if the stored name is not valid UTF-8.
    pub fn name(&self) -> &str {
        self.name.to_str().expect("non utf8 cf name")
    }

    /// Borrows the column family options.
    pub fn options(&self) -> &ColumnFamilyOptions {
        &self.options
    }

    /// Transforms the options while keeping the name unchanged.
    pub fn map_cf_options<F: FnOnce(ColumnFamilyOptions) -> ColumnFamilyOptions>(self, f: F) -> Self {
        ColumnFamilyDescriptor {
            name: self.name,
            options: f(self.options),
        }
    }
}
impl Default for ColumnFamilyDescriptor {
    /// Descriptor for the `"default"` column family with default options.
    fn default() -> Self {
        Self::new(DEFAULT_COLUMN_FAMILY_NAME, ColumnFamilyOptions::default())
    }
}
impl<T: AsRef<str>> From<T> for ColumnFamilyDescriptor {
fn from(name: T) -> Self {
ColumnFamilyDescriptor::with_name(name)
}
}
/// Owned wrapper around a raw RocksDB column family handle pointer.
pub struct ColumnFamilyHandle {
    raw: *mut ll::rocks_column_family_handle_t,
}
impl Drop for ColumnFamilyHandle {
    /// Releases the C-side handle wrapper.
    ///
    /// NOTE(review): `ColumnFamily::drop` nulls out `raw` before this runs,
    /// so this destroy is presumably safe to call with a null pointer —
    /// confirm against the C shim.
    fn drop(&mut self) {
        unsafe {
            ll::rocks_column_family_handle_destroy(self.raw);
        }
    }
}
// Identity `AsRef`, so APIs taking `AsRef<ColumnFamilyHandle>` accept a handle directly.
impl AsRef<ColumnFamilyHandle> for ColumnFamilyHandle {
    fn as_ref(&self) -> &ColumnFamilyHandle {
        self
    }
}
impl fmt::Debug for ColumnFamilyHandle {
    /// Formats as `CFHandle(id=…, name=…)` using the FFI accessors.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "CFHandle(id={}, name={:?})", self.id(), self.name())
    }
}
impl ToRaw<ll::rocks_column_family_handle_t> for ColumnFamilyHandle {
    /// Exposes the raw pointer for FFI calls; ownership is not transferred.
    fn raw(&self) -> *mut ll::rocks_column_family_handle_t {
        self.raw
    }
}
impl FromRaw<ll::rocks_column_family_handle_t> for ColumnFamilyHandle {
    /// Wraps a raw handle pointer; the returned value assumes ownership and
    /// destroys the handle on drop.
    unsafe fn from_ll(raw: *mut ll::rocks_column_family_handle_t) -> ColumnFamilyHandle {
        // Field-init shorthand (clippy: redundant_field_names).
        ColumnFamilyHandle { raw }
    }
}
impl ColumnFamilyHandle {
    /// The column family's name, borrowed from the C side.
    ///
    /// Panics if the name is not valid UTF-8.
    pub fn name(&self) -> &str {
        unsafe {
            let ptr = ll::rocks_column_family_handle_get_name(self.raw);
            CStr::from_ptr(ptr).to_str().unwrap()
        }
    }

    /// The column family's numeric ID.
    pub fn id(&self) -> u32 {
        unsafe { ll::rocks_column_family_handle_get_id(self.raw) }
    }
}
/// A column family bound to its owning database.
///
/// Keeps an `Arc<DBRef>` so the database outlives the handle.
pub struct ColumnFamily {
    handle: ColumnFamilyHandle,
    db: Arc<DBRef>,
    // `true` for handles obtained via open/create; `false` for the
    // DB-managed default column family handle.
    owned: bool,
}
// SAFETY: assumes the underlying RocksDB column family handle is safe to
// share/move across threads (RocksDB's own thread-safety guarantees) —
// NOTE(review): not provable from this file; confirm against the C shim.
unsafe impl Sync for ColumnFamily {}
unsafe impl Send for ColumnFamily {}
impl Drop for ColumnFamily {
    fn drop(&mut self) {
        // Only owned handles are destroyed through the DB; the default-CF
        // handle (`owned == false`) is managed by the database itself.
        if self.owned {
            let mut status = ptr::null_mut::<ll::rocks_status_t>();
            unsafe {
                ll::rocks_db_destroy_column_family_handle(self.db.raw, self.raw(), &mut status);
                // Destroying a CF handle is not expected to fail; treat an
                // error status as a bug.
                assert!(Error::from_ll(status).is_ok());
                // Null out so the inner `ColumnFamilyHandle::drop` does not
                // destroy the same handle a second time.
                self.handle.raw = ptr::null_mut();
            }
        }
    }
}
// Lets a `ColumnFamily` be passed wherever a `&ColumnFamilyHandle` is expected.
impl AsRef<ColumnFamilyHandle> for ColumnFamily {
    fn as_ref(&self) -> &ColumnFamilyHandle {
        &self.handle
    }
}
// Smart-pointer-style deref: `ColumnFamily` forwards `name()`/`id()` etc.
// to its inner handle.
impl ops::Deref for ColumnFamily {
    type Target = ColumnFamilyHandle;
    fn deref(&self) -> &ColumnFamilyHandle {
        &self.handle
    }
}
impl fmt::Debug for ColumnFamily {
    /// Shows the owning database name plus this CF's name and ID.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("ColumnFamily")
            .field("db", &self.db.name())
            .field("name", &self.name())
            .field("id", &self.id())
            .finish()
    }
}
impl ColumnFamily {
    /// Writes `value` under `key` into this column family.
    pub fn put(&self, options: &WriteOptions, key: &[u8], value: &[u8]) -> Result<()> {
        // `status` is an out-parameter: remains null on success.
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_put_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                value.as_ptr() as *const _,
                value.len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Removes `key` from this column family (success even if absent).
    pub fn delete(&self, options: &WriteOptions, key: &[u8]) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_delete_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Removes a key that the caller guarantees was written at most once
    /// and never overwritten or merged.
    pub fn single_delete(&self, options: &WriteOptions, key: &[u8]) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_single_delete_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Removes all keys in the range `[begin_key, end_key)`.
    pub fn delete_range(&self, options: &WriteOptions, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_delete_range_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                begin_key.as_ptr() as *const _,
                begin_key.len(),
                end_key.as_ptr() as *const _,
                end_key.len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Applies the database's merge operator to `key` with operand `val`.
    pub fn merge(&self, options: &WriteOptions, key: &[u8], val: &[u8]) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_merge_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                val.as_ptr() as *const _,
                val.len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Reads the value stored under `key` into a pinnable slice
    /// (avoids a copy when the value can be pinned).
    pub fn get(&self, options: &ReadOptions, key: &[u8]) -> Result<PinnableSlice> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        let pinnable_val = PinnableSlice::new();
        unsafe {
            ll::rocks_db_get_cf_pinnable(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                pinnable_val.raw(),
                &mut status,
            );
            Error::from_ll(status).map(|_| pinnable_val)
        }
    }

    /// Reads several keys at once; returns one `Result` per key, in order.
    pub fn multi_get(&self, options: &ReadOptions, keys: &[&[u8]]) -> Vec<Result<PinnableSlice>> {
        let num_keys = keys.len();
        // Per-key out status; each slot is filled by the C side.
        let mut statuses: Vec<*mut ll::rocks_status_t> = vec![ptr::null_mut(); num_keys];
        let mut c_values = Vec::with_capacity(num_keys);
        // `values` owns the PinnableSlices; `c_values` mirrors their raw pointers.
        let values = (0..num_keys)
            .map(|_| {
                let ret = PinnableSlice::new();
                c_values.push(ret.raw());
                ret
            })
            .collect::<Vec<_>>();
        unsafe {
            ll::rocks_db_multi_get_cf_coerce(
                self.db.raw,
                options.raw(),
                num_keys,
                self.raw(),
                keys.as_ptr() as _,
                c_values.as_mut_ptr(),
                statuses.as_mut_ptr(),
            );
        }
        statuses
            .into_iter()
            .zip(values.into_iter())
            .map(|(st, val)| Error::from_ll(st).map(|_| val))
            .collect()
    }

    /// Cheap membership test; `false` means the key is definitely absent,
    /// `true` means it may be present.
    pub fn key_may_exist(&self, options: &ReadOptions, key: &[u8]) -> bool {
        unsafe {
            ll::rocks_db_key_may_exist_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                ptr::null_mut(),
                ptr::null_mut(),
            ) != 0
        }
    }

    /// Like [`key_may_exist`], but also returns the value when the lookup
    /// can produce it cheaply. Returns `(may_exist, maybe_value)`.
    pub fn key_may_get(&self, options: &ReadOptions, key: &[u8]) -> (bool, Option<Vec<u8>>) {
        let mut found = 0;
        let mut value: Vec<u8> = vec![];
        unsafe {
            // The C shim fills `value` through the opaque pointer when the
            // value is immediately available (`found` set non-zero).
            let ret = ll::rocks_db_key_may_exist_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                key.as_ptr() as *const _,
                key.len(),
                &mut value as *mut Vec<u8> as *mut c_void,
                &mut found,
            );
            if ret == 0 {
                (false, None)
            } else if found == 0 {
                (true, None)
            } else {
                (true, Some(value))
            }
        }
    }

    /// Creates an iterator over this column family.
    pub fn new_iterator(&self, options: &ReadOptions) -> Iterator {
        unsafe {
            let ptr = ll::rocks_db_create_iterator_cf(self.db.raw, options.raw(), self.raw());
            Iterator::from_ll(ptr)
        }
    }

    /// Queries a string-valued DB property for this column family
    /// (e.g. `"rocksdb.stats"`); `None` if the property is unknown.
    pub fn get_property(&self, property: &str) -> Option<String> {
        let mut ret = String::new();
        let ok = unsafe {
            ll::rocks_db_get_property_cf(
                self.db.raw,
                self.raw(),
                property.as_bytes().as_ptr() as *const _,
                property.len(),
                &mut ret as *mut String as *mut c_void,
            ) != 0
        };
        if ok {
            Some(ret)
        } else {
            None
        }
    }

    /// Queries an integer-valued DB property for this column family.
    pub fn get_int_property(&self, property: &str) -> Option<u64> {
        let mut val = 0;
        let ok = unsafe {
            ll::rocks_db_get_int_property_cf(
                self.db.raw,
                self.raw(),
                property.as_bytes().as_ptr() as *const _,
                property.len(),
                &mut val,
            ) != 0
        };
        if ok {
            Some(val)
        } else {
            None
        }
    }

    /// Runs a manual compaction over the given key range.
    pub fn compact_range<R: AsCompactRange>(&self, options: &CompactRangeOptions, range: R) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_compact_range_opt_cf(
                self.db.raw,
                options.raw(),
                self.raw(),
                range.start_key() as *const _,
                range.start_key_len(),
                range.end_key() as *const _,
                range.end_key_len(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Dynamically changes mutable options of this column family from
    /// `(name, value)` string pairs.
    pub fn set_options<T, H>(&self, new_options: H) -> Result<()>
    where
        T: AsRef<str>,
        H: IntoIterator<Item = (T, T)>,
    {
        // BUGFIX: the pairs must be kept alive until after the FFI call.
        // The previous implementation pushed pointers into `key`/`val`
        // temporaries that were dropped at the end of each loop iteration,
        // leaving dangling pointers whenever `T` was an owned type (String).
        let pairs: Vec<(T, T)> = new_options.into_iter().collect();
        let num_options = pairs.len();
        let mut key_ptrs = Vec::with_capacity(num_options);
        let mut key_lens = Vec::with_capacity(num_options);
        let mut val_ptrs = Vec::with_capacity(num_options);
        let mut val_lens = Vec::with_capacity(num_options);
        for (key, val) in &pairs {
            let k = key.as_ref();
            let v = val.as_ref();
            key_ptrs.push(k.as_ptr() as *const c_char);
            key_lens.push(k.len());
            val_ptrs.push(v.as_ptr() as *const c_char);
            val_lens.push(v.len());
        }
        let mut status = ptr::null_mut();
        unsafe {
            ll::rocks_db_set_options_cf(
                self.db.raw,
                self.raw(),
                num_options,
                key_ptrs.as_ptr(),
                key_lens.as_ptr(),
                val_ptrs.as_ptr(),
                val_lens.as_ptr(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Approximate on-disk sizes of the given key ranges, one entry per range.
    pub fn get_approximate_sizes(&self, ranges: &[ops::Range<&[u8]>]) -> Vec<u64> {
        let num_ranges = ranges.len();
        let mut range_start_ptrs = Vec::with_capacity(num_ranges);
        let mut range_start_lens = Vec::with_capacity(num_ranges);
        let mut range_end_ptrs = Vec::with_capacity(num_ranges);
        let mut range_end_lens = Vec::with_capacity(num_ranges);
        let mut sizes = vec![0_u64; num_ranges];
        // `ranges` borrows its byte slices, so these pointers stay valid
        // across the FFI call.
        for r in ranges {
            range_start_ptrs.push(r.start.as_ptr() as *const c_char);
            range_start_lens.push(r.start.len());
            range_end_ptrs.push(r.end.as_ptr() as *const c_char);
            range_end_lens.push(r.end.len());
        }
        unsafe {
            ll::rocks_db_get_approximate_sizes_cf(
                self.db.raw,
                self.raw(),
                num_ranges,
                range_start_ptrs.as_ptr(),
                range_start_lens.as_ptr(),
                range_end_ptrs.as_ptr(),
                range_end_lens.as_ptr(),
                sizes.as_mut_ptr(),
            );
        }
        sizes
    }

    /// Approximate number of entries and total size in the memtables for
    /// the given key range. Returns `(count, size)`.
    pub fn get_approximate_memtable_stats(&self, range: ops::Range<&[u8]>) -> (u64, u64) {
        let mut count = 0;
        let mut size = 0;
        unsafe {
            ll::rocks_db_get_approximate_memtable_stats_cf(
                self.db.raw,
                self.raw(),
                range.start.as_ptr() as *const c_char,
                range.start.len(),
                range.end.as_ptr() as *const c_char,
                range.end.len(),
                &mut count,
                &mut size,
            );
        }
        (count, size)
    }

    /// Ingests externally created SST files into this column family.
    ///
    /// Panics if a path is not valid UTF-8.
    pub fn ingest_external_file<P: AsRef<Path>, T: IntoIterator<Item = P>>(
        &self,
        external_files: T,
        options: &IngestExternalFileOptions,
    ) -> Result<()> {
        // BUGFIX: copy the paths into owned buffers that outlive the FFI
        // call. The previous implementation stored pointers borrowed from
        // the per-iteration item `f`, which dangled when the iterator
        // yielded owned paths (e.g. `PathBuf`).
        let paths: Vec<String> = external_files
            .into_iter()
            .map(|f| f.as_ref().to_str().expect("valid utf8 path").to_string())
            .collect();
        let num_files = paths.len();
        let c_files: Vec<*const c_char> = paths.iter().map(|p| p.as_ptr() as *const c_char).collect();
        let c_files_lens: Vec<usize> = paths.iter().map(|p| p.len()).collect();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_ingest_external_file_cf(
                self.db.raw,
                self.raw(),
                c_files.as_ptr() as *const _,
                c_files_lens.as_ptr(),
                num_files as _,
                options.raw(),
                &mut status,
            );
            Error::from_ll(status)
        }
    }

    /// Collects size/level/file metadata for this column family.
    ///
    /// All C-side data is copied into owned Rust values and the native
    /// metadata object is destroyed before returning.
    pub fn metadata(&self) -> ColumnFamilyMetaData {
        unsafe {
            let cfmeta = ll::rocks_db_get_column_family_metadata(self.db.raw, self.raw());
            let total_size = ll::rocks_column_family_metadata_size(cfmeta);
            let file_count = ll::rocks_column_family_metadata_file_count(cfmeta);
            let name = CStr::from_ptr(ll::rocks_column_family_metadata_name(cfmeta))
                .to_string_lossy()
                .into_owned();
            let num_levels = ll::rocks_column_family_metadata_levels_count(cfmeta);
            let mut meta = ColumnFamilyMetaData {
                size: total_size,
                file_count,
                name,
                levels: Vec::with_capacity(num_levels as usize),
            };
            for lv in 0..num_levels {
                let level = ll::rocks_column_family_metadata_levels_level(cfmeta, lv);
                let lv_size = ll::rocks_column_family_metadata_levels_size(cfmeta, lv);
                let num_sstfiles = ll::rocks_column_family_metadata_levels_files_count(cfmeta, lv);
                let mut current_level = LevelMetaData {
                    level: level as u32,
                    size: lv_size,
                    files: Vec::with_capacity(num_sstfiles as usize),
                };
                for i in 0..num_sstfiles {
                    let name = CStr::from_ptr(ll::rocks_column_family_metadata_levels_files_name(cfmeta, lv, i))
                        .to_string_lossy()
                        .into_owned();
                    let db_path: String =
                        CStr::from_ptr(ll::rocks_column_family_metadata_levels_files_db_path(cfmeta, lv, i))
                            .to_string_lossy()
                            .into_owned();
                    let size = ll::rocks_column_family_metadata_levels_files_size(cfmeta, lv, i);
                    let small_seqno = ll::rocks_column_family_metadata_levels_files_smallest_seqno(cfmeta, lv, i);
                    let large_seqno = ll::rocks_column_family_metadata_levels_files_largest_seqno(cfmeta, lv, i);
                    // `key_len` is an out-parameter reused by both key accessors.
                    let mut key_len = 0;
                    let small_key_ptr =
                        ll::rocks_column_family_metadata_levels_files_smallestkey(cfmeta, lv, i, &mut key_len);
                    let small_key = slice::from_raw_parts(small_key_ptr as *const u8, key_len).to_vec();
                    let large_key_ptr =
                        ll::rocks_column_family_metadata_levels_files_largestkey(cfmeta, lv, i, &mut key_len);
                    let large_key = slice::from_raw_parts(large_key_ptr as *const u8, key_len).to_vec();
                    let being_compacted =
                        ll::rocks_column_family_metadata_levels_files_being_compacted(cfmeta, lv, i) != 0;
                    current_level.files.push(SstFileMetaData {
                        size: size as u64,
                        name,
                        db_path,
                        smallest_seqno: small_seqno.into(),
                        largest_seqno: large_seqno.into(),
                        smallestkey: small_key,
                        largestkey: large_key,
                        being_compacted,
                    });
                }
                meta.levels.push(current_level);
            }
            ll::rocks_column_family_metadata_destroy(cfmeta);
            meta
        }
    }
}
/// Shared, refcounted owner of the raw database pointer.
///
/// `DB` and every `ColumnFamily` hold an `Arc<DBRef>`, so the native DB is
/// destroyed only after the last user drops.
pub struct DBRef {
    raw: *mut ll::rocks_db_t,
}
impl Drop for DBRef {
    /// Destroys the native database handle once the last reference is gone.
    #[inline]
    fn drop(&mut self) {
        unsafe {
            ll::rocks_db_destroy(self.raw);
        }
    }
}
impl ToRaw<ll::rocks_db_t> for DBRef {
    /// Exposes the raw DB pointer for FFI calls; ownership is not transferred.
    fn raw(&self) -> *mut ll::rocks_db_t {
        self.raw
    }
}
// SAFETY: assumes the native RocksDB handle is internally synchronized and
// safe to use from multiple threads — NOTE(review): not provable from this
// file; confirm against the C shim.
unsafe impl Sync for DBRef {}
unsafe impl Send for DBRef {}
/// Handle to an open RocksDB database.
pub struct DB {
    // Shared with every ColumnFamily created from this DB.
    context: Arc<DBRef>,
}
// Smart-pointer-style deref: all `DBRef` methods are callable on `DB`.
impl ops::Deref for DB {
    type Target = DBRef;
    fn deref(&self) -> &DBRef {
        &self.context
    }
}
impl fmt::Debug for DB {
    /// Shows only the database name.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("DB").field("name", &self.name()).finish()
    }
}
// SAFETY: delegates to the same assumption as DBRef's Sync/Send — the
// native handle is presumed thread-safe; confirm against the C shim.
unsafe impl Sync for DB {}
unsafe impl Send for DB {}
impl ToRaw<ll::rocks_db_t> for DB {
    /// Exposes the raw DB pointer for FFI calls; ownership is not transferred.
    fn raw(&self) -> *mut ll::rocks_db_t {
        self.context.raw
    }
}
impl FromRaw<ll::rocks_db_t> for DB {
    /// Wraps a raw DB pointer; the returned `DB` assumes ownership and the
    /// native handle is destroyed when the last `Arc<DBRef>` drops.
    unsafe fn from_ll(raw: *mut ll::rocks_db_t) -> DB {
        // Field-init shorthand (clippy: redundant_field_names).
        DB {
            context: Arc::new(DBRef { raw }),
        }
    }
}
impl DB {
    /// Opens (creating if configured) the database at `name`.
    pub fn open<T: AsRef<Options>, P: AsRef<Path>>(options: T, name: P) -> Result<DB> {
        let opt = options.as_ref().raw();
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr = ll::rocks_db_open(opt, dbname.as_ptr(), &mut status);
            Error::from_ll(status).map(|_| DB::from_ll(db_ptr))
        }
    }

    /// Opens the database together with the given column families.
    ///
    /// All column families that exist in the database must be listed.
    /// Returns the DB plus one owned `ColumnFamily` per descriptor, in order.
    pub fn open_with_column_families<CF: Into<ColumnFamilyDescriptor>, P: AsRef<Path>, I: IntoIterator<Item = CF>>(
        options: &DBOptions,
        name: P,
        column_families: I,
    ) -> Result<(DB, Vec<ColumnFamily>)> {
        // (removed an unused `let opt = options.raw();` binding)
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        // `cfs` must stay alive across the FFI call: `cfnames` points into it.
        let cfs = column_families
            .into_iter()
            .map(|desc| desc.into())
            .collect::<Vec<ColumnFamilyDescriptor>>();
        let num_column_families = cfs.len();
        let mut cfnames: Vec<*const c_char> = Vec::with_capacity(num_column_families);
        let mut cfopts: Vec<*const ll::rocks_cfoptions_t> = Vec::with_capacity(num_column_families);
        // Out-array: filled with one handle per column family on success.
        let mut cfhandles = vec![ptr::null_mut(); num_column_families];
        for cf in &cfs {
            cfnames.push(cf.name_as_ptr());
            cfopts.push(cf.options.raw());
        }
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr = ll::rocks_db_open_column_families(
                options.raw(),
                dbname.as_ptr(),
                num_column_families as c_int,
                cfnames.as_ptr(),
                cfopts.as_ptr(),
                cfhandles.as_mut_ptr(),
                &mut status,
            );
            Error::from_ll(status).map(|_| {
                let db = DB::from_ll(db_ptr);
                let db_ref = db.context.clone();
                (
                    db,
                    cfhandles
                        .into_iter()
                        .map(|p| ColumnFamily {
                            handle: ColumnFamilyHandle { raw: p },
                            db: db_ref.clone(),
                            owned: true,
                        })
                        .collect(),
                )
            })
        }
    }

    /// Opens the database read-only; writes are rejected.
    pub fn open_for_readonly<P: AsRef<Path>>(options: &Options, name: P, error_if_log_file_exist: bool) -> Result<DB> {
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr = ll::rocks_db_open_for_read_only(
                options.raw(),
                dbname.as_ptr(),
                error_if_log_file_exist as u8,
                &mut status,
            );
            Error::from_ll(status).map(|_| DB::from_ll(db_ptr))
        }
    }

    /// Read-only variant of [`open_with_column_families`].
    pub fn open_for_readonly_with_column_families<
        CF: Into<ColumnFamilyDescriptor>,
        P: AsRef<Path>,
        I: IntoIterator<Item = CF>,
    >(
        options: &DBOptions,
        name: P,
        column_families: I,
        error_if_log_file_exist: bool,
    ) -> Result<(DB, Vec<ColumnFamily>)> {
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        // Keep descriptors alive: `cfnames` points into them.
        let cf_descs = column_families
            .into_iter()
            .map(|desc| desc.into())
            .collect::<Vec<ColumnFamilyDescriptor>>();
        let num_column_families = cf_descs.len();
        let mut cfnames: Vec<*const c_char> = Vec::with_capacity(num_column_families);
        let mut cfopts: Vec<*const ll::rocks_cfoptions_t> = Vec::with_capacity(num_column_families);
        let mut cfhandles = vec![ptr::null_mut(); num_column_families];
        for cf_desc in &cf_descs {
            cfnames.push(cf_desc.name_as_ptr());
            cfopts.push(cf_desc.options.raw());
        }
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr = ll::rocks_db_open_for_read_only_column_families(
                options.raw(),
                dbname.as_ptr(),
                num_column_families as c_int,
                cfnames.as_ptr(),
                cfopts.as_ptr(),
                cfhandles.as_mut_ptr(),
                error_if_log_file_exist as _,
                &mut status,
            );
            Error::from_ll(status).map(|_| {
                let db = DB::from_ll(db_ptr);
                let db_ref = db.context.clone();
                (
                    db,
                    cfhandles
                        .into_iter()
                        .map(|p| ColumnFamily {
                            handle: ColumnFamilyHandle { raw: p },
                            db: db_ref.clone(),
                            owned: true,
                        })
                        .collect(),
                )
            })
        }
    }

    /// Opens the database as a secondary instance that tails the primary
    /// at `name`, keeping its own state under `secondary_path`.
    pub fn open_as_secondary<P1: AsRef<Path>, P2: AsRef<Path>>(
        options: &Options,
        name: P1,
        secondary_path: P2,
    ) -> Result<DB> {
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        let secondary_path = CString::new(path_to_bytes(secondary_path)).unwrap();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr =
                ll::rocks_db_open_as_secondary(options.raw(), dbname.as_ptr(), secondary_path.as_ptr(), &mut status);
            Error::from_ll(status).map(|_| DB::from_ll(db_ptr))
        }
    }

    /// Secondary-instance variant of [`open_with_column_families`].
    pub fn open_as_secondary_with_column_families<
        P1: AsRef<Path>,
        P2: AsRef<Path>,
        CF: Into<ColumnFamilyDescriptor>,
        I: IntoIterator<Item = CF>,
    >(
        options: &Options,
        name: P1,
        secondary_path: P2,
        column_families: I,
    ) -> Result<(DB, Vec<ColumnFamily>)> {
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        let secondary_path = CString::new(path_to_bytes(secondary_path)).unwrap();
        // Keep descriptors alive: `cfnames` points into them.
        let cf_descs = column_families
            .into_iter()
            .map(|desc| desc.into())
            .collect::<Vec<ColumnFamilyDescriptor>>();
        let num_column_families = cf_descs.len();
        let mut cfnames: Vec<*const c_char> = Vec::with_capacity(num_column_families);
        let mut cfopts: Vec<*const ll::rocks_cfoptions_t> = Vec::with_capacity(num_column_families);
        let mut cfhandles = vec![ptr::null_mut(); num_column_families];
        for cf_desc in &cf_descs {
            cfnames.push(cf_desc.name_as_ptr());
            cfopts.push(cf_desc.options.raw());
        }
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let db_ptr = ll::rocks_db_open_as_secondary_column_families(
                options.raw(),
                dbname.as_ptr(),
                secondary_path.as_ptr(),
                num_column_families as c_int,
                cfnames.as_ptr(),
                cfopts.as_ptr(),
                cfhandles.as_mut_ptr(),
                &mut status,
            );
            Error::from_ll(status).map(|_| {
                let db = DB::from_ll(db_ptr);
                let db_ref = db.context.clone();
                (
                    db,
                    cfhandles
                        .into_iter()
                        .map(|p| ColumnFamily {
                            handle: ColumnFamilyHandle { raw: p },
                            db: db_ref.clone(),
                            owned: true,
                        })
                        .collect(),
                )
            })
        }
    }

    /// Lists the names of all column families in the database at `name`
    /// without opening it.
    pub fn list_column_families<P: AsRef<Path>>(options: &Options, name: P) -> Result<Vec<String>> {
        let dbname = CString::new(path_to_bytes(name)).unwrap();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        let mut lencfs = 0;
        unsafe {
            let cfs = ll::rocks_db_list_column_families(options.raw(), dbname.as_ptr(), &mut lencfs, &mut status);
            Error::from_ll(status).map(|_| {
                if lencfs == 0 {
                    // NOTE(review): `cfs` is not destroyed in this branch;
                    // presumably the shim returns null for an empty list —
                    // confirm, otherwise this leaks.
                    vec![]
                } else {
                    let mut ret = Vec::with_capacity(lencfs);
                    for i in 0..lencfs {
                        ret.push(CStr::from_ptr(*cfs.offset(i as isize)).to_str().unwrap().to_string());
                    }
                    ll::rocks_db_list_column_families_destroy(cfs, lencfs);
                    ret
                }
            })
        }
    }

    /// Creates a new column family and returns an owned handle to it.
    pub fn create_column_family(&self, cfopts: &ColumnFamilyOptions, column_family_name: &str) -> Result<ColumnFamily> {
        // Renamed from `dbname`: this is the column family name, not the DB name.
        let cfname = CString::new(column_family_name).unwrap();
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            let handle = ll::rocks_db_create_column_family(self.raw(), cfopts.raw(), cfname.as_ptr(), &mut status);
            Error::from_ll(status).map(|_| ColumnFamily {
                handle: ColumnFamilyHandle { raw: handle },
                db: self.context.clone(),
                owned: true,
            })
        }
    }

    /// Marks a column family dropped; its data is removed by compaction.
    pub fn drop_column_family(&self, column_family: &ColumnFamilyHandle) -> Result<()> {
        let mut status = ptr::null_mut::<ll::rocks_status_t>();
        unsafe {
            ll::rocks_db_drop_column_family(self.raw(), column_family.raw(), &mut status);
            Error::from_ll(status)
        }
    }

    /// Handle to the implicit `"default"` column family.
    ///
    /// `owned == false`: the handle is managed by the DB and is not
    /// destroyed through `rocks_db_destroy_column_family_handle` on drop.
    pub fn default_column_family(&self) -> ColumnFamily {
        ColumnFamily {
            handle: ColumnFamilyHandle {
                raw: unsafe { ll::rocks_db_default_column_family(self.raw()) },
            },
            db: self.context.clone(),
            owned: false,
        }
    }
}
impl DBRef {
/// Raw handle of the implicit default column family (managed by the DB;
/// not owned by the caller).
fn raw_default_column_family(&self) -> *mut ll::rocks_column_family_handle_t {
    unsafe { ll::rocks_db_default_column_family(self.raw()) }
}
/// Closes the database explicitly, before drop.
///
/// # Safety
/// After a successful close no other handle derived from this DB
/// (column families, iterators, snapshots) may be used.
pub unsafe fn close(&self) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    ll::rocks_db_close(self.raw(), &mut status);
    Error::from_ll(status)
}
/// Attempts to resume writes after the DB entered a background-error state.
pub fn resume(&self) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_resume(self.raw(), &mut status);
    }
    Error::from_ll(status)
}
/// Writes `value` under `key` in the default column family.
pub fn put(&self, options: &WriteOptions, key: &[u8], value: &[u8]) -> Result<()> {
    // `status` is an out-parameter: remains null on success.
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_put(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            value.as_ptr() as *const _,
            value.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Writes `value` under `key` in the given column family.
pub fn put_cf(
    &self,
    options: &WriteOptions,
    column_family: &ColumnFamilyHandle,
    key: &[u8],
    value: &[u8],
) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_put_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            value.as_ptr() as *const _,
            value.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Removes `key` from the default column family (success even if absent).
pub fn delete(&self, options: &WriteOptions, key: &[u8]) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_delete(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Removes `key` from the given column family.
pub fn delete_cf(&self, options: &WriteOptions, column_family: &ColumnFamilyHandle, key: &[u8]) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_delete_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Removes a key the caller guarantees was written at most once
/// (default column family).
pub fn single_delete(&self, options: &WriteOptions, key: &[u8]) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_single_delete(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Column-family variant of [`single_delete`].
pub fn single_delete_cf(
    &self,
    options: &WriteOptions,
    column_family: &ColumnFamilyHandle,
    key: &[u8],
) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_single_delete_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Removes all keys in `[begin_key, end_key)` from the given column family.
pub fn delete_range_cf(
    &self,
    options: &WriteOptions,
    column_family: &ColumnFamilyHandle,
    begin_key: &[u8],
    end_key: &[u8],
) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_delete_range_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            begin_key.as_ptr() as *const _,
            begin_key.len(),
            end_key.as_ptr() as *const _,
            end_key.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Applies the database's merge operator to `key` with operand `val`
/// (default column family).
pub fn merge(&self, options: &WriteOptions, key: &[u8], val: &[u8]) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_merge(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            val.as_ptr() as *const _,
            val.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Column-family variant of [`merge`].
pub fn merge_cf(
    &self,
    options: &WriteOptions,
    column_family: &ColumnFamilyHandle,
    key: &[u8],
    val: &[u8],
) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_merge_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            val.as_ptr() as *const _,
            val.len(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Applies a batch of updates atomically.
pub fn write(&self, options: &WriteOptions, updates: &WriteBatch) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_write(self.raw(), options.raw(), updates.raw(), &mut status);
        Error::from_ll(status)
    }
}
/// Reads the value stored under `key` (default column family) into a
/// pinnable slice, avoiding a copy when possible.
pub fn get(&self, options: &ReadOptions, key: &[u8]) -> Result<PinnableSlice> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    let pinnable_val = PinnableSlice::new();
    unsafe {
        ll::rocks_db_get_pinnable(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            pinnable_val.raw(),
            &mut status,
        );
        Error::from_ll(status).map(|_| pinnable_val)
    }
}
/// Column-family variant of [`get`].
pub fn get_cf(
    &self,
    options: &ReadOptions,
    column_family: &ColumnFamilyHandle,
    key: &[u8],
) -> Result<PinnableSlice> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    let pinnable_val = PinnableSlice::new();
    unsafe {
        ll::rocks_db_get_cf_pinnable(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as _,
            key.len(),
            pinnable_val.raw(),
            &mut status,
        );
        Error::from_ll(status).map(|_| pinnable_val)
    }
}
/// Reads several keys from the default column family at once; one
/// `Result` per key, in order.
pub fn multi_get(&self, options: &ReadOptions, keys: &[&[u8]]) -> Vec<Result<PinnableSlice>> {
    let num_keys = keys.len();
    // Per-key out status; each slot is filled by the C side.
    let mut statuses: Vec<*mut ll::rocks_status_t> = vec![ptr::null_mut(); num_keys];
    let mut c_values = Vec::with_capacity(num_keys);
    // `values` owns the PinnableSlices; `c_values` mirrors their raw pointers.
    let values = (0..num_keys)
        .map(|_| {
            let ret = PinnableSlice::new();
            c_values.push(ret.raw());
            ret
        })
        .collect::<Vec<_>>();
    unsafe {
        ll::rocks_db_multi_get_cf_coerce(
            self.raw(),
            options.raw(),
            num_keys,
            self.raw_default_column_family(),
            keys.as_ptr() as _,
            c_values.as_mut_ptr(),
            statuses.as_mut_ptr(),
        );
    }
    statuses
        .into_iter()
        .zip(values.into_iter())
        .map(|(st, val)| Error::from_ll(st).map(|_| val))
        .collect()
}
/// Reads several keys, each from its own column family.
///
/// `column_families` and `keys` are paired positionally; the i-th key is
/// looked up in the i-th column family.
pub fn multi_get_cf(
    &self,
    options: &ReadOptions,
    column_families: &[&ColumnFamilyHandle],
    keys: &[&[u8]],
) -> Vec<Result<PinnableSlice>> {
    let num_keys = keys.len();
    let c_cfs: Vec<_> = column_families.iter().map(|cf| cf.raw() as *const _).collect();
    let mut statuses: Vec<*mut ll::rocks_status_t> = vec![ptr::null_mut(); num_keys];
    let mut c_values = Vec::with_capacity(num_keys);
    // `values` owns the PinnableSlices; `c_values` mirrors their raw pointers.
    let values = (0..num_keys)
        .map(|_| {
            let ret = PinnableSlice::new();
            c_values.push(ret.raw());
            ret
        })
        .collect::<Vec<_>>();
    unsafe {
        ll::rocks_db_multi_get_cfs_coerce(
            self.raw(),
            options.raw(),
            num_keys,
            c_cfs.as_ptr(),
            keys.as_ptr() as _,
            c_values.as_mut_ptr(),
            statuses.as_mut_ptr(),
        );
    }
    statuses
        .into_iter()
        .zip(values.into_iter())
        .map(|(st, val)| Error::from_ll(st).map(|_| val))
        .collect()
}
/// Cheap membership test on the default column family; `false` means
/// definitely absent, `true` means possibly present.
pub fn key_may_exist(&self, options: &ReadOptions, key: &[u8]) -> bool {
    unsafe {
        ll::rocks_db_key_may_exist(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            ptr::null_mut(),
            ptr::null_mut(),
        ) != 0
    }
}
/// Like [`key_may_exist`], but also returns the value when the lookup can
/// produce it cheaply. Returns `(may_exist, maybe_value)`.
pub fn key_may_get(&self, options: &ReadOptions, key: &[u8]) -> (bool, Option<Vec<u8>>) {
    let mut found = 0;
    let mut value: Vec<u8> = vec![];
    unsafe {
        // The C shim fills `value` through the opaque pointer when the
        // value is immediately available (`found` set non-zero).
        let ret = ll::rocks_db_key_may_exist(
            self.raw(),
            options.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut value as *mut Vec<u8> as *mut c_void,
            &mut found,
        );
        if ret == 0 {
            (false, None)
        } else if found == 0 {
            (true, None)
        } else {
            (true, Some(value))
        }
    }
}
/// Column-family variant of [`key_may_exist`].
pub fn key_may_exist_cf(&self, options: &ReadOptions, column_family: &ColumnFamilyHandle, key: &[u8]) -> bool {
    unsafe {
        ll::rocks_db_key_may_exist_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            ptr::null_mut(),
            ptr::null_mut(),
        ) != 0
    }
}
/// Column-family variant of [`key_may_get`].
pub fn key_may_get_cf(
    &self,
    options: &ReadOptions,
    column_family: &ColumnFamilyHandle,
    key: &[u8],
) -> (bool, Option<Vec<u8>>) {
    let mut found = 0;
    let mut value: Vec<u8> = vec![];
    unsafe {
        let ret = ll::rocks_db_key_may_exist_cf(
            self.raw(),
            options.raw(),
            column_family.raw(),
            key.as_ptr() as *const _,
            key.len(),
            &mut value as *mut Vec<u8> as *mut c_void,
            &mut found,
        );
        if ret == 0 {
            (false, None)
        } else if found == 0 {
            (true, None)
        } else {
            (true, Some(value))
        }
    }
}
/// Creates an iterator over the default column family; the iterator's
/// lifetime is tied to the database.
pub fn new_iterator<'c, 'd: 'c>(&'d self, options: &ReadOptions) -> Iterator<'c> {
    unsafe {
        let ptr = ll::rocks_db_create_iterator(self.raw(), options.raw());
        Iterator::from_ll(ptr)
    }
}
/// Creates an iterator over the given column family; the iterator's
/// lifetime is tied to the column family handle.
pub fn new_iterator_cf<'c, 'd: 'c>(&self, options: &ReadOptions, cf: &'d ColumnFamilyHandle) -> Iterator<'c> {
    unsafe {
        let ptr = ll::rocks_db_create_iterator_cf(self.raw(), options.raw(), cf.raw());
        Iterator::from_ll(ptr)
    }
}
/// Creates one iterator per column family, over a consistent view of
/// the database.
pub fn new_iterators<'c, 'b: 'c, T: AsRef<ColumnFamilyHandle>>(
    &'b self,
    options: &ReadOptions,
    cfs: &[T],
) -> Result<Vec<Iterator<'c>>> {
    let c_cfs = cfs.iter().map(|cf| cf.as_ref().raw()).collect::<Vec<_>>();
    let cfs_len = cfs.len();
    // Out-array: filled with one raw iterator per column family.
    let mut c_iters = vec![ptr::null_mut(); cfs_len];
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_create_iterators(
            self.raw(),
            options.raw(),
            c_cfs.as_ptr() as _,
            c_iters.as_mut_ptr(),
            cfs_len,
            &mut status,
        );
        Error::from_ll(status).map(|_| c_iters.into_iter().map(|ptr| Iterator::from_ll(ptr)).collect())
    }
}
/// Takes a consistent read snapshot; `None` if the C side returns null.
///
/// Release it with [`release_snapshot`] when done.
pub fn get_snapshot(&self) -> Option<Snapshot> {
    unsafe {
        let ptr = ll::rocks_db_get_snapshot(self.raw());
        if ptr.is_null() {
            None
        } else {
            Some(Snapshot::from_ll(ptr))
        }
    }
}
/// Releases a snapshot obtained from [`get_snapshot`]; consumes it so it
/// cannot be used afterwards.
pub fn release_snapshot(&self, snapshot: Snapshot) {
    unsafe {
        ll::rocks_db_release_snapshot(self.raw(), snapshot.raw());
    }
}
/// Queries a string-valued DB property (e.g. `"rocksdb.stats"`);
/// `None` if the property is unknown.
pub fn get_property(&self, property: &str) -> Option<String> {
    let mut ret = String::new();
    // The C shim writes the result through the opaque `*mut String`.
    let ok = unsafe {
        ll::rocks_db_get_property(
            self.raw(),
            property.as_bytes().as_ptr() as *const _,
            property.len(),
            &mut ret as *mut String as *mut c_void,
        ) != 0
    };
    if ok {
        Some(ret)
    } else {
        None
    }
}
/// Column-family variant of [`get_property`].
pub fn get_property_cf(&self, column_family: &ColumnFamilyHandle, property: &str) -> Option<String> {
    let mut ret = String::new();
    let ok = unsafe {
        ll::rocks_db_get_property_cf(
            self.raw(),
            column_family.raw(),
            property.as_bytes().as_ptr() as *const _,
            property.len(),
            &mut ret as *mut String as *mut c_void,
        ) != 0
    };
    if ok {
        Some(ret)
    } else {
        None
    }
}
/// Map-valued DB property lookup — not yet implemented; always panics.
// NOTE(review): `property` is currently unused (unused-variable warning).
pub fn get_map_property(&self, property: &str) -> Option<()> {
    unimplemented!()
}
/// Queries an integer-valued DB property; `None` if unknown.
pub fn get_int_property(&self, property: &str) -> Option<u64> {
    let mut val = 0;
    let ok = unsafe {
        ll::rocks_db_get_int_property(
            self.raw(),
            property.as_bytes().as_ptr() as *const _,
            property.len(),
            &mut val,
        ) != 0
    };
    if ok {
        Some(val)
    } else {
        None
    }
}
/// Column-family variant of [`get_int_property`].
pub fn get_int_property_cf(&self, column_family: &ColumnFamilyHandle, property: &str) -> Option<u64> {
    let mut val = 0;
    let ok = unsafe {
        ll::rocks_db_get_int_property_cf(
            self.raw(),
            column_family.raw(),
            property.as_bytes().as_ptr() as *const _,
            property.len(),
            &mut val,
        ) != 0
    };
    if ok {
        Some(val)
    } else {
        None
    }
}
/// Integer property aggregated over all column families; `None` if unknown.
pub fn get_aggregated_int_property(&self, property: &str) -> Option<u64> {
    let mut val = 0;
    let ok = unsafe {
        ll::rocks_db_get_aggregated_int_property(
            self.raw(),
            property.as_bytes().as_ptr() as *const _,
            property.len(),
            &mut val,
        ) != 0
    };
    if ok {
        Some(val)
    } else {
        None
    }
}
/// Approximate on-disk sizes of the given key ranges in one column
/// family; one entry per range.
pub fn get_approximate_sizes(&self, column_family: &ColumnFamilyHandle, ranges: &[ops::Range<&[u8]>]) -> Vec<u64> {
    let num_ranges = ranges.len();
    let mut range_start_ptrs = Vec::with_capacity(num_ranges);
    let mut range_start_lens = Vec::with_capacity(num_ranges);
    let mut range_end_ptrs = Vec::with_capacity(num_ranges);
    let mut range_end_lens = Vec::with_capacity(num_ranges);
    let mut sizes = vec![0_u64; num_ranges];
    // `ranges` borrows its byte slices, so the pointers stay valid across
    // the FFI call.
    for r in ranges {
        range_start_ptrs.push(r.start.as_ptr() as *const c_char);
        range_start_lens.push(r.start.len());
        range_end_ptrs.push(r.end.as_ptr() as *const c_char);
        range_end_lens.push(r.end.len());
    }
    unsafe {
        ll::rocks_db_get_approximate_sizes_cf(
            self.raw(),
            column_family.raw(),
            num_ranges,
            range_start_ptrs.as_ptr(),
            range_start_lens.as_ptr(),
            range_end_ptrs.as_ptr(),
            range_end_lens.as_ptr(),
            sizes.as_mut_ptr(),
        );
    }
    sizes
}
/// Returns approximate `(count, size)` statistics for the memtable
/// entries falling inside `range` for the given column family.
pub fn get_approximate_memtable_stats(
&self,
column_family: &ColumnFamilyHandle,
range: ops::Range<&[u8]>,
) -> (u64, u64) {
let mut count = 0;
let mut size = 0;
unsafe {
ll::rocks_db_get_approximate_memtable_stats_cf(
self.raw(),
column_family.raw(),
range.start.as_ptr() as *const c_char,
range.start.len(),
range.end.as_ptr() as *const c_char,
range.end.len(),
&mut count,
&mut size,
);
}
(count, size)
}
/// Manually compacts the key range described by `range`.
///
/// `range` is any type implementing [`AsCompactRange`] (`..`, `a..=b`,
/// `a..`, `..=b`, …); a null key pointer with length 0 means that side
/// of the range is unbounded.
pub fn compact_range<R: AsCompactRange>(&self, options: &CompactRangeOptions, range: R) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_compact_range_opt(
self.raw(),
options.raw(),
range.start_key() as *const _,
range.start_key_len(),
range.end_key() as *const _,
range.end_key_len(),
&mut status,
);
Error::from_ll(status)
}
}
/// Dynamically changes options for one column family
/// (e.g. `("max_write_buffer_number", "4")`).
///
/// Fix: the previous implementation pushed raw pointers from inside a
/// side-effecting `map` closure that took each `(key, val)` pair by
/// value, so when `T` was an owned type (e.g. `String`) the pairs were
/// dropped at the end of each closure call and the pointers handed to
/// the FFI call dangled. The pairs are now collected into a `Vec` that
/// stays alive across the call.
pub fn set_options<T, H>(&self, column_family: &ColumnFamilyHandle, new_options: H) -> Result<()>
where
    T: AsRef<str>,
    H: IntoIterator<Item = (T, T)>,
{
    // Own the items for the full duration of the FFI call below.
    let options: Vec<(T, T)> = new_options.into_iter().collect();
    let num_options = options.len();
    let mut key_ptrs = Vec::with_capacity(num_options);
    let mut key_lens = Vec::with_capacity(num_options);
    let mut val_ptrs = Vec::with_capacity(num_options);
    let mut val_lens = Vec::with_capacity(num_options);
    for (key, val) in &options {
        key_ptrs.push(key.as_ref().as_ptr() as *const c_char);
        key_lens.push(key.as_ref().len());
        val_ptrs.push(val.as_ref().as_ptr() as *const c_char);
        val_lens.push(val.as_ref().len());
    }
    let mut status = ptr::null_mut();
    unsafe {
        ll::rocks_db_set_options_cf(
            self.raw(),
            column_family.raw,
            num_options,
            key_ptrs.as_ptr(),
            key_lens.as_ptr(),
            val_ptrs.as_ptr(),
            val_lens.as_ptr(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Dynamically changes DB-wide options (e.g. `"max_background_compactions"`).
///
/// Fix: replaced the side-effecting `.map(...).last()` iterator chain
/// (a clippy-flagged misuse of `map` for side effects) with a plain
/// `for` loop. The keys and values borrow from the `HashMap`, which
/// outlives the FFI call, so the raw pointers remain valid.
pub fn set_db_options(&self, new_options: &HashMap<&str, &str>) -> Result<()> {
    let num_options = new_options.len();
    let mut key_ptrs = Vec::with_capacity(num_options);
    let mut key_lens = Vec::with_capacity(num_options);
    let mut val_ptrs = Vec::with_capacity(num_options);
    let mut val_lens = Vec::with_capacity(num_options);
    for (key, val) in new_options.iter() {
        key_ptrs.push(key.as_ptr() as *const c_char);
        key_lens.push(key.len());
        val_ptrs.push(val.as_ptr() as *const c_char);
        val_lens.push(val.len());
    }
    let mut status = ptr::null_mut();
    unsafe {
        ll::rocks_db_set_db_options(
            self.raw(),
            num_options,
            key_ptrs.as_ptr(),
            key_lens.as_ptr(),
            val_ptrs.as_ptr(),
            val_lens.as_ptr(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Compacts the given input files into `output_level`, letting RocksDB
/// choose the output path (delegates to `compact_files_to` with
/// `output_path_id = -1`).
pub fn compact_files<P: AsRef<Path>, I: IntoIterator<Item = P>>(
&self,
compact_options: &CompactionOptions,
input_file_names: I,
output_level: i32,
) -> Result<()> {
self.compact_files_to(compact_options, input_file_names, output_level, -1)
}
/// Compacts the given input files into `output_level`, placing the
/// result in `output_path_id` (`-1` lets RocksDB choose).
///
/// Fix: the previous implementation stored pointers into `file_path`,
/// which borrowed the owned loop variable `file_name`; that variable is
/// dropped at the end of each iteration, so for owned inputs (e.g.
/// `PathBuf`) the pointers dangled by the time the FFI call ran. The
/// path strings are now collected into an owned `Vec<String>` that
/// outlives the call.
pub fn compact_files_to<P: AsRef<Path>, I: IntoIterator<Item = P>>(
    &self,
    compact_options: &CompactionOptions,
    input_file_names: I,
    output_level: i32,
    output_path_id: i32,
) -> Result<()> {
    // Owned copies keep the byte buffers alive across the FFI call.
    let files: Vec<String> = input_file_names
        .into_iter()
        .map(|p| p.as_ref().to_str().expect("valid utf8 path").to_owned())
        .collect();
    let mut c_file_names = Vec::with_capacity(files.len());
    let mut c_file_name_sizes = Vec::with_capacity(files.len());
    for path in &files {
        c_file_names.push(path.as_bytes().as_ptr() as *const _);
        c_file_name_sizes.push(path.len());
    }
    let mut status = ptr::null_mut();
    unsafe {
        ll::rocks_db_compact_files(
            self.raw(),
            compact_options.raw(),
            c_file_names.len(),
            c_file_names.as_ptr(),
            c_file_name_sizes.as_ptr(),
            output_level as c_int,
            output_path_id as c_int,
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Pauses background work (flushes/compactions). Pair with
/// `continue_background_work` to resume.
pub fn pause_background_work(&self) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_pause_background_work(self.raw(), &mut status);
Error::from_ll(status)
}
}
/// Resumes background work previously paused by `pause_background_work`.
pub fn continue_background_work(&self) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_continue_background_work(self.raw(), &mut status);
Error::from_ll(status)
}
}
/// Requests cancellation of all background work for this DB.
///
/// When `wait` is true the call blocks until in-flight background jobs
/// have finished (passed through to the C API as a 0/1 flag).
pub fn cancel_background_work(&self, wait: bool) {
unsafe {
ll::rocks_cancel_all_background_work(self.raw(), wait as u8);
}
}
/// Re-enables automatic compactions for the given column families,
/// e.g. after they were opened with auto-compaction disabled.
pub fn enable_auto_compaction(&self, column_family_handles: &[&ColumnFamilyHandle]) -> Result<()> {
// Collect raw handle pointers; the borrowed handles outlive the call,
// so the pointer array remains valid for its duration.
let c_cfs = column_family_handles
.iter()
.map(|cf| cf.as_ref().raw() as *const _)
.collect::<Vec<*const _>>();
let cfs_len = column_family_handles.len();
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_enable_auto_compaction(self.raw(), c_cfs.as_ptr(), cfs_len, &mut status);
Error::from_ll(status)
}
}
/// Returns the number of LSM levels configured for this DB.
pub fn number_levels(&self) -> u32 {
unsafe { ll::rocks_db_number_levels(self.raw()) as u32 }
}
/// Returns the maximum level to which a new flushed memtable can be
/// pushed, as reported by the underlying C API.
pub fn max_mem_compaction_level(&self) -> u32 {
unsafe { ll::rocks_db_max_mem_compaction_level(self.raw()) as u32 }
}
/// Returns the number of level-0 files that triggers a write stop.
pub fn level0_stop_write_trigger(&self) -> u32 {
unsafe { ll::rocks_db_level0_stop_write_trigger(self.raw()) as u32 }
}
/// Returns the DB name — the `name` that was supplied when the DB was
/// opened. The C shim fills the `String` through the opaque pointer.
pub fn name(&self) -> String {
let mut name = String::new();
unsafe {
ll::rocks_db_get_name(self.raw(), &mut name as *mut String as *mut c_void);
}
name
}
/// Flushes all memtable data to SST files, honoring `options`
/// (e.g. whether to wait for the flush to finish).
pub fn flush(&self, options: &FlushOptions) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_flush(self.raw(), options.raw(), &mut status);
Error::from_ll(status)
}
}
/// Syncs the write-ahead log to durable storage.
pub fn sync_wal(&self) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_sync_wal(self.raw(), &mut status);
Error::from_ll(status)
}
}
/// Returns the sequence number of the most recent transaction.
pub fn get_latest_sequence_number(&self) -> SequenceNumber {
unsafe { ll::rocks_db_get_latest_sequence_number(self.raw()).into() }
}
/// Prevents file deletions (useful while taking a backup). Pair with
/// `enable_file_deletions` to restore normal behavior.
pub fn disable_file_deletions(&self) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_disable_file_deletions(self.raw(), &mut status);
Error::from_ll(status)
}
}
/// Re-allows file deletions after `disable_file_deletions`.
///
/// `force` is forwarded to the C API as a 0/1 flag; per RocksDB
/// convention it bypasses the disable/enable nesting count — confirm
/// against the linked RocksDB version.
pub fn enable_file_deletions(&self, force: bool) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_enable_file_deletions(self.raw(), force as u8, &mut status);
Error::from_ll(status)
}
}
/// Returns `(manifest_file_size, live_file_names)` for this DB.
///
/// When `flush_memtable` is true the memtable is flushed first so the
/// returned file set is complete.
///
/// Fix: `String::from_utf8_lossy(f).to_owned().to_string()` allocated
/// an extra `String`; `Cow::into_owned` performs the conversion in one
/// step with identical results.
pub fn get_live_files(&self, flush_memtable: bool) -> Result<(u64, Vec<String>)> {
    let mut manifest_file_size = 0;
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        let files =
            ll::rocks_db_get_live_files(self.raw(), flush_memtable as u8, &mut manifest_file_size, &mut status);
        // NOTE(review): on error the vector returned by the shim is not
        // destroyed here — presumably the shim returns null on failure;
        // confirm against rocks-sys.
        Error::from_ll(status).map(|_| {
            let n = ll::cxx_string_vector_size(files) as usize;
            let mut ret = Vec::with_capacity(n);
            for i in 0..n {
                let f = slice::from_raw_parts(
                    ll::cxx_string_vector_nth(files, i) as *const u8,
                    ll::cxx_string_vector_nth_size(files, i),
                );
                ret.push(String::from_utf8_lossy(f).into_owned());
            }
            ll::cxx_string_vector_destory(files);
            (manifest_file_size, ret)
        })
    }
}
/// Returns the WAL files sorted by log number, with metadata for each.
pub fn get_sorted_wal_files(&self) -> Result<Vec<LogFile>> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
let cfiles = ll::rocks_db_get_sorted_wal_files(self.raw(), &mut status);
Error::from_ll(status).map(|()| {
let num_files = ll::rocks_logfiles_size(cfiles);
let mut files = Vec::with_capacity(num_files);
for i in 0..num_files {
// The shim copies the nth path into `path_name` through the
// opaque `*mut c_void`.
let mut path_name = String::new();
ll::rocks_logfiles_nth_path_name(cfiles, i, &mut path_name as *mut String as *mut c_void);
let log_num = ll::rocks_logfiles_nth_log_number(cfiles, i);
// NOTE(review): transmutes the C enum value into the Rust file
// type — assumes every value the shim returns is a valid
// discriminant; confirm against rocks-sys.
let file_type = mem::transmute(ll::rocks_logfiles_nth_type(cfiles, i));
let start_seq = ll::rocks_logfiles_nth_start_sequence(cfiles, i);
let file_size = ll::rocks_logfiles_nth_file_size(cfiles, i);
files.push(LogFile {
path_name: path_name,
log_number: log_num,
file_type: file_type,
start_sequence: start_seq.into(),
size_in_bytes: file_size,
})
}
// Free the C-side list only after everything has been copied out.
ll::rocks_logfiles_destroy(cfiles);
files
})
}
}
/// Returns an iterator over write batches starting from `seq_number`,
/// reading from the transaction log (WAL).
pub fn get_updates_since(&self, seq_number: SequenceNumber) -> Result<TransactionLogIterator> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
let iter_raw_ptr = ll::rocks_db_get_update_since(self.raw(), seq_number.0, &mut status);
Error::from_ll(status).map(|_| TransactionLogIterator::from_ll(iter_raw_ptr))
}
}
/// Deletes the named DB file (as listed by `get_live_files`).
pub fn delete_file(&self, name: &str) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_delete_file(
self.raw(),
name.as_bytes().as_ptr() as *const _,
name.len(),
&mut status,
);
Error::from_ll(status)
}
}
/// Deletes SST files whose key range falls entirely within
/// `[begin, end)` in the given column family.
pub fn delete_files_in_range(&self, column_family: &ColumnFamilyHandle, begin: &[u8], end: &[u8]) -> Result<()> {
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_delete_files_in_range(
self.raw(),
column_family.raw(),
begin.as_ptr() as *const _,
begin.len(),
end.as_ptr() as *const _,
end.len(),
&mut status,
);
Error::from_ll(status)
}
}
/// Returns metadata for every live SST file across all column families.
///
/// Fix: three occurrences of `.to_string_lossy().to_owned().to_string()`
/// each allocated a redundant extra `String`; `Cow::into_owned` performs
/// the same conversion in a single step. Also uses field-init shorthand.
/// The FFI call sequence is unchanged.
pub fn get_live_files_metadata(&self) -> Vec<LiveFileMetaData> {
    unsafe {
        let livefiles = ll::rocks_db_get_livefiles_metadata(self.raw());
        let cnt = ll::rocks_livefiles_count(livefiles);
        let mut ret = Vec::with_capacity(cnt as usize);
        for i in 0..cnt {
            let name = CStr::from_ptr(ll::rocks_livefiles_name(livefiles, i))
                .to_string_lossy()
                .into_owned();
            let db_path = CStr::from_ptr(ll::rocks_livefiles_db_path(livefiles, i))
                .to_string_lossy()
                .into_owned();
            let size = ll::rocks_livefiles_size(livefiles, i);
            let smallest_seqno = ll::rocks_livefiles_smallest_seqno(livefiles, i);
            let largest_seqno = ll::rocks_livefiles_largest_seqno(livefiles, i);
            // `key_len` is an out-parameter reused for both key reads.
            let mut key_len = 0;
            let small_key_ptr = ll::rocks_livefiles_smallestkey(livefiles, i, &mut key_len);
            let smallestkey = slice::from_raw_parts(small_key_ptr as *const u8, key_len).to_vec();
            let large_key_ptr = ll::rocks_livefiles_largestkey(livefiles, i, &mut key_len);
            let largestkey = slice::from_raw_parts(large_key_ptr as *const u8, key_len).to_vec();
            let being_compacted = ll::rocks_livefiles_being_compacted(livefiles, i) != 0;
            let column_family_name = CStr::from_ptr(ll::rocks_livefiles_column_family_name(livefiles, i))
                .to_string_lossy()
                .into_owned();
            let level = ll::rocks_livefiles_level(livefiles, i);
            ret.push(LiveFileMetaData {
                sst_file: SstFileMetaData {
                    size: size as u64,
                    name,
                    db_path,
                    smallest_seqno: smallest_seqno.into(),
                    largest_seqno: largest_seqno.into(),
                    smallestkey,
                    largestkey,
                    being_compacted,
                },
                column_family_name,
                level: level as u32,
            });
        }
        // Free the C-side list only after all fields have been copied out.
        ll::rocks_livefiles_destroy(livefiles);
        ret
    }
}
/// Returns the full metadata tree (levels and their SST files) for one
/// column family. All strings and keys are copied out of the C-side
/// structure before it is destroyed at the end.
pub fn get_column_family_metadata(&self, column_family: &ColumnFamilyHandle) -> ColumnFamilyMetaData {
unsafe {
let cfmeta = ll::rocks_db_get_column_family_metadata(self.raw(), column_family.raw());
let total_size = ll::rocks_column_family_metadata_size(cfmeta);
let file_count = ll::rocks_column_family_metadata_file_count(cfmeta);
let name = CStr::from_ptr(ll::rocks_column_family_metadata_name(cfmeta))
.to_string_lossy()
.to_owned()
.to_string();
let num_levels = ll::rocks_column_family_metadata_levels_count(cfmeta);
let mut meta = ColumnFamilyMetaData {
size: total_size,
file_count: file_count,
name: name,
levels: Vec::with_capacity(num_levels as usize),
};
// One LevelMetaData per level, each holding its files' metadata.
for lv in 0..num_levels {
let level = ll::rocks_column_family_metadata_levels_level(cfmeta, lv);
let lv_size = ll::rocks_column_family_metadata_levels_size(cfmeta, lv);
let num_sstfiles = ll::rocks_column_family_metadata_levels_files_count(cfmeta, lv);
let mut current_level = LevelMetaData {
level: level as u32,
size: lv_size,
files: Vec::with_capacity(num_sstfiles as usize),
};
for i in 0..num_sstfiles {
let name = CStr::from_ptr(ll::rocks_column_family_metadata_levels_files_name(cfmeta, lv, i))
.to_string_lossy()
.to_owned()
.to_string();
let db_path: String =
CStr::from_ptr(ll::rocks_column_family_metadata_levels_files_db_path(cfmeta, lv, i))
.to_string_lossy()
.to_owned()
.to_string();
let size = ll::rocks_column_family_metadata_levels_files_size(cfmeta, lv, i);
let small_seqno = ll::rocks_column_family_metadata_levels_files_smallest_seqno(cfmeta, lv, i);
let large_seqno = ll::rocks_column_family_metadata_levels_files_largest_seqno(cfmeta, lv, i);
// `key_len` is an out-parameter reused for both key reads.
let mut key_len = 0;
let small_key_ptr =
ll::rocks_column_family_metadata_levels_files_smallestkey(cfmeta, lv, i, &mut key_len);
let small_key = slice::from_raw_parts(small_key_ptr as *const u8, key_len).to_vec();
let large_key_ptr =
ll::rocks_column_family_metadata_levels_files_largestkey(cfmeta, lv, i, &mut key_len);
let large_key = slice::from_raw_parts(large_key_ptr as *const u8, key_len).to_vec();
let being_compacted =
ll::rocks_column_family_metadata_levels_files_being_compacted(cfmeta, lv, i) != 0;
let sst_file = SstFileMetaData {
size: size as u64,
name: name,
db_path: db_path,
smallest_seqno: small_seqno.into(),
largest_seqno: large_seqno.into(),
smallestkey: small_key,
largestkey: large_key,
being_compacted: being_compacted,
};
current_level.files.push(sst_file);
}
meta.levels.push(current_level);
}
// Free the C-side structure only after everything has been copied out.
ll::rocks_column_family_metadata_destroy(cfmeta);
meta
}
}
/// Ingests externally-created SST files into the default column family.
///
/// Fix: the previous implementation pushed pointers borrowed from the
/// owned loop variable `f`, which is dropped at the end of each
/// iteration — for owned inputs (e.g. `PathBuf`) the pointers dangled
/// by the time the FFI call ran. The paths are now collected into an
/// owned `Vec<String>` that outlives the call.
pub fn ingest_external_file<P: AsRef<Path>, T: IntoIterator<Item = P>>(
    &self,
    external_files: T,
    options: &IngestExternalFileOptions,
) -> Result<()> {
    // Owned copies keep the byte buffers alive across the FFI call.
    let files: Vec<String> = external_files
        .into_iter()
        .map(|f| f.as_ref().to_str().expect("valid utf8 path").to_owned())
        .collect();
    // `num_files` counted separately so its integer type is still
    // inferred from the FFI signature, as in the original.
    let mut num_files = 0;
    let mut c_files = Vec::with_capacity(files.len());
    let mut c_files_lens = Vec::with_capacity(files.len());
    for f in &files {
        c_files.push(f.as_ptr() as *const _);
        c_files_lens.push(f.len());
        num_files += 1;
    }
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_ingest_external_file(
            self.raw(),
            c_files.as_ptr() as *const _,
            c_files_lens.as_ptr(),
            num_files,
            options.raw(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Ingests externally-created SST files into the given column family.
///
/// Fix: same use-after-free as `ingest_external_file` — pointers were
/// borrowed from the owned loop variable, dropped each iteration; the
/// paths are now owned for the duration of the FFI call.
pub fn ingest_external_file_cf<P: AsRef<Path>, T: IntoIterator<Item = P>>(
    &self,
    column_family: &ColumnFamilyHandle,
    external_files: T,
    options: &IngestExternalFileOptions,
) -> Result<()> {
    // Owned copies keep the byte buffers alive across the FFI call.
    let files: Vec<String> = external_files
        .into_iter()
        .map(|f| f.as_ref().to_str().expect("valid utf8 path").to_owned())
        .collect();
    // Counted separately so the integer type is inferred from the FFI
    // signature, as in the original.
    let mut num_files = 0;
    let mut c_files = Vec::with_capacity(files.len());
    let mut c_files_lens = Vec::with_capacity(files.len());
    for f in &files {
        c_files.push(f.as_ptr() as *const _);
        c_files_lens.push(f.len());
        num_files += 1;
    }
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_ingest_external_file_cf(
            self.raw(),
            column_family.raw,
            c_files.as_ptr() as *const _,
            c_files_lens.as_ptr(),
            num_files,
            options.raw(),
            &mut status,
        );
        Error::from_ll(status)
    }
}
/// Returns the DB's globally unique identity string.
pub fn get_db_identity(&self) -> Result<String> {
let mut identity = String::new();
let mut status = ptr::null_mut::<ll::rocks_status_t>();
unsafe {
ll::rocks_db_get_db_identity(self.raw(), &mut identity as *mut String as *mut _, &mut status);
Error::from_ll(status).map(|_| identity)
}
}
/// Collects table properties for every SST file in the given column
/// family.
pub fn get_properties_of_all_tables_cf(
&self,
column_family: &ColumnFamilyHandle,
) -> Result<TablePropertiesCollection> {
let mut status = ptr::null_mut();
unsafe {
let props_ptr = ll::rocks_db_get_properties_of_all_tables(self.raw(), column_family.raw, &mut status);
Error::from_ll(status).map(|()| TablePropertiesCollection::from_ll(props_ptr))
}
}
/// Collects table properties for the SST files overlapping any of the
/// given key ranges in the column family.
pub fn get_properties_of_tables_in_range(
    &self,
    column_family: &ColumnFamilyHandle,
    ranges: &[ops::Range<&[u8]>],
) -> Result<TablePropertiesCollection> {
    let num_ranges = ranges.len();
    // Split the ranges into parallel pointer/length arrays for the C
    // shim; the key slices borrow from the caller and outlive the call.
    let start_keys: Vec<*const c_char> = ranges.iter().map(|r| r.start.as_ptr() as *const c_char).collect();
    let start_key_lens: Vec<usize> = ranges.iter().map(|r| r.start.len()).collect();
    let limit_keys: Vec<*const c_char> = ranges.iter().map(|r| r.end.as_ptr() as *const c_char).collect();
    let limit_key_lens: Vec<usize> = ranges.iter().map(|r| r.end.len()).collect();
    let mut status = ptr::null_mut();
    unsafe {
        let props_ptr = ll::rocks_db_get_properties_of_tables_in_range(
            self.raw(),
            column_family.raw,
            num_ranges,
            start_keys.as_ptr(),
            start_key_lens.as_ptr(),
            limit_keys.as_ptr(),
            limit_key_lens.as_ptr(),
            &mut status,
        );
        Error::from_ll(status).map(|()| TablePropertiesCollection::from_ll(props_ptr))
    }
}
/// Debug helper: returns every version (all sequence numbers) of every
/// key in `[begin_key, end_key)`.
pub fn get_all_key_versions(&self, begin_key: &[u8], end_key: &[u8]) -> Result<KeyVersionVec> {
let mut status = ptr::null_mut();
unsafe {
let coll_ptr = ll::rocks_db_get_all_key_versions(
self.raw(),
begin_key.as_ptr() as *const _,
begin_key.len(),
end_key.as_ptr() as *const _,
end_key.len(),
&mut status,
);
Error::from_ll(status).map(|()| KeyVersionVec::from_ll(coll_ptr))
}
}
/// For a secondary DB instance: tries to catch up with the primary by
/// tailing its MANIFEST/WAL.
///
/// Fix: consistency with every sibling method — the status pointer is
/// typed explicitly and `Error::from_ll(status)` is evaluated inside
/// the same `unsafe` block as the FFI call, matching the established
/// pattern in this impl.
pub fn try_catch_up_with_primary(&self) -> Result<()> {
    let mut status = ptr::null_mut::<ll::rocks_status_t>();
    unsafe {
        ll::rocks_db_try_catch_up_with_primary(self.raw(), &mut status);
        Error::from_ll(status)
    }
}
}
/// Destroys the contents of the database at `name`.
///
/// Be very careful using this method: it deletes the DB's files.
pub fn destroy_db<P: AsRef<Path>>(options: &Options, name: P) -> Result<()> {
    // `path` borrows from `name`, which lives for the whole call.
    let path = name.as_ref().to_str().expect("valid utf8");
    let mut status = ptr::null_mut();
    unsafe {
        ll::rocks_destroy_db(options.raw(), path.as_ptr() as *const _, path.len(), &mut status);
        Error::from_ll(status)
    }
}
/// Repairs a DB with explicitly listed column families.
/// Not yet wired to the C shim — calling this panics.
pub fn repair_db_with_cf<P: AsRef<Path>>(
db_options: &DBOptions,
dbname: P,
column_families: &[&ColumnFamilyDescriptor],
) -> Result<()> {
unimplemented!()
}
/// Like `repair_db_with_cf`, but with fallback options for column
/// families found in the DB that are not in `column_families`.
/// Not yet wired to the C shim — calling this panics.
pub fn repair_db_with_unknown_cf_opts<P: AsRef<Path>>(
db_options: &DBOptions,
dbname: P,
column_families: &[&ColumnFamilyDescriptor],
unknown_cf_opts: &ColumnFamilyOptions,
) -> Result<()> {
unimplemented!()
}
/// Attempts to repair a possibly-corrupted DB at `name`, recovering as
/// much data as possible.
pub fn repair_db<P: AsRef<Path>>(options: &Options, name: P) -> Result<()> {
    // `path` borrows from `name`, which lives for the whole call.
    let path = name.as_ref().to_str().expect("valid utf8");
    let mut status = ptr::null_mut();
    unsafe {
        ll::rocks_repair_db(options.raw(), path.as_ptr() as *const _, path.len(), &mut status);
        Error::from_ll(status)
    }
}
/// Converts Rust range types into the `(begin, end)` key pair that
/// `compact_range` passes to the C API.
///
/// The defaults — a null pointer with length 0 — mean "unbounded" on
/// that side of the range; implementors override only the sides they
/// actually bound.
pub trait AsCompactRange {
// Pointer to the first byte of the start key, or null for unbounded.
fn start_key(&self) -> *const u8 {
ptr::null()
}
// Length in bytes of the start key (0 when unbounded).
fn start_key_len(&self) -> usize {
0
}
// Pointer to the first byte of the end key, or null for unbounded.
fn end_key(&self) -> *const u8 {
ptr::null()
}
// Length in bytes of the end key (0 when unbounded).
fn end_key_len(&self) -> usize {
0
}
}
// Deprecated in favor of `start..=end`: RocksDB's CompactRange treats
// both bounds as part of the compacted range, so an inclusive range
// expresses the semantics more faithfully.
// NOTE(review): `#[deprecated]` on a trait impl does not produce a
// deprecation warning at use sites in current Rust — kept for intent.
#[deprecated(since = "0.1.7", note = "Please use RangeInclusive instead: `start..=end`")]
impl AsCompactRange for ops::Range<&'_ [u8]> {
fn start_key(&self) -> *const u8 {
self.start.as_ptr()
}
fn start_key_len(&self) -> usize {
self.start.len()
}
fn end_key(&self) -> *const u8 {
self.end.as_ptr()
}
fn end_key_len(&self) -> usize {
self.end.len()
}
}
// `start..=end`: both bounds supplied to the C API.
impl<'a> AsCompactRange for ops::RangeInclusive<&'a [u8]> {
fn start_key(&self) -> *const u8 {
self.start().as_ptr()
}
fn start_key_len(&self) -> usize {
self.start().len()
}
fn end_key(&self) -> *const u8 {
self.end().as_ptr()
}
fn end_key_len(&self) -> usize {
self.end().len()
}
}
// `..end`: start unbounded (trait defaults), end supplied. Deprecated
// in favor of `..=end` for the same inclusivity reason as `Range`.
// NOTE(review): `#[deprecated]` on a trait impl does not warn at use
// sites in current Rust — kept for intent.
#[deprecated(since = "0.1.7", note = "Please use RangeToInclusive instead: `..=end`")]
impl<'a> AsCompactRange for ops::RangeTo<&'a [u8]> {
fn end_key(&self) -> *const u8 {
self.end.as_ptr()
}
fn end_key_len(&self) -> usize {
self.end.len()
}
}
// `..=end`: start unbounded (trait defaults), end supplied.
impl<'a> AsCompactRange for ops::RangeToInclusive<&'a [u8]> {
fn end_key(&self) -> *const u8 {
self.end.as_ptr()
}
fn end_key_len(&self) -> usize {
self.end.len()
}
}
// `start..`: start supplied, end unbounded (trait defaults).
impl<'a> AsCompactRange for ops::RangeFrom<&'a [u8]> {
fn start_key(&self) -> *const u8 {
self.start.as_ptr()
}
fn start_key_len(&self) -> usize {
self.start.len()
}
}
// `..`: both sides unbounded (all trait defaults) — compacts everything.
impl AsCompactRange for ops::RangeFull {}