use std::u32;
use std::slice;
use std::str;
use std::fmt;
use std::mem;
use std::marker::PhantomData;
use std::ops;
use std::os::raw::{c_char, c_void};
use rocks_sys as ll;
use crate::types::SequenceNumber;
use crate::to_raw::{FromRaw, ToRaw};
/// Sentinel column family id, defined as the largest `u32` value.
///
/// NOTE(review): presumably mirrors RocksDB's "unknown column family" marker; upstream may
/// define that marker as `INT32_MAX` rather than `UINT32_MAX` — confirm against the RocksDB
/// headers before comparing ids against this constant.
pub const UNKNOWN_COLUMN_FAMILY_ID: u32 = u32::MAX;
/// Handle to a native `rocks_table_props_collection_t`.
///
/// The wrapped pointer is passed to `rocks_table_props_collection_destroy` when this value is
/// dropped (see the `Drop` impl).
pub struct TablePropertiesCollection {
    /// Raw pointer to the native collection object.
    raw: *mut ll::rocks_table_props_collection_t,
}
impl fmt::Debug for TablePropertiesCollection {
    /// Renders the collection as `TablePropertiesCollection {N items}`, where `N` is `len()`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let count = self.len();
        f.write_fmt(format_args!("TablePropertiesCollection {{{} items}}", count))
    }
}
impl FromRaw<ll::rocks_table_props_collection_t> for TablePropertiesCollection {
    /// Wraps a raw collection pointer without checking it.
    ///
    /// # Safety
    ///
    /// The pointer is stored as-is and later handed to
    /// `rocks_table_props_collection_destroy` on drop, so the caller must pass a pointer that
    /// is valid for that use.
    unsafe fn from_ll(raw: *mut ll::rocks_table_props_collection_t) -> Self {
        Self { raw }
    }
}
impl Drop for TablePropertiesCollection {
    /// Hands the wrapped pointer back to the native destructor.
    fn drop(&mut self) {
        let handle = self.raw;
        // SAFETY: assumes `handle` is the pointer this wrapper was created from and has not
        // been destroyed elsewhere — the wrapper never exposes it for release.
        unsafe { ll::rocks_table_props_collection_destroy(handle) };
    }
}
impl TablePropertiesCollection {
    /// Returns the entry count reported by `rocks_table_props_collection_size`.
    pub fn len(&self) -> usize {
        // SAFETY: assumes `self.raw` is the live handle this wrapper was built from.
        let count = unsafe { ll::rocks_table_props_collection_size(self.raw) };
        count as usize
    }

    /// Returns `true` when [`len`](Self::len) is zero.
    pub fn is_empty(&self) -> bool {
        0 == self.len()
    }

    /// Creates a native cursor over the collection and wraps it in an iterator.
    ///
    /// The iterator starts in the "at end" state when the collection is empty.
    pub fn iter(&self) -> TablePropertiesCollectionIter<'_> {
        // Same call order as before: create the cursor, then sample length and emptiness.
        let raw = unsafe { ll::rocks_table_props_collection_iter_create(self.raw) };
        let size = self.len();
        let at_end = self.is_empty();
        TablePropertiesCollectionIter {
            raw,
            size,
            at_end,
            _marker: PhantomData,
        }
    }
}
/// Iterator over the `(key, TableProperties)` entries of a `TablePropertiesCollection`.
///
/// Wraps a native `rocks_table_props_collection_iter_t` cursor, which is destroyed on drop.
#[doc(hidden)]
pub struct TablePropertiesCollectionIter<'a> {
    /// Native cursor; `next()` treats a null pointer as an exhausted iterator.
    raw: *mut ll::rocks_table_props_collection_iter_t,
    /// Element count used to answer `size_hint`; initialised from the collection's length.
    size: usize,
    /// Set once the native cursor reports that no further entry is available.
    at_end: bool,
    /// Carries the lifetime `'a` that yielded keys and properties are tagged with.
    _marker: PhantomData<&'a ()>,
}
impl<'a> Drop for TablePropertiesCollectionIter<'a> {
    /// Releases the native cursor.
    fn drop(&mut self) {
        let cursor = self.raw;
        // SAFETY: assumes the native destroy function accepts whatever
        // `rocks_table_props_collection_iter_create` returned (including null) — TODO confirm.
        unsafe { ll::rocks_table_props_collection_iter_destroy(cursor) };
    }
}
impl<'a> Iterator for TablePropertiesCollectionIter<'a> {
    type Item = (&'a str, TableProperties<'a>);

    /// Yields the entry under the native cursor, then advances the cursor.
    ///
    /// Returns `None` when the cursor is null or has already reported the end.
    fn next(&mut self) -> Option<(&'a str, TableProperties<'a>)> {
        if self.raw.is_null() || self.at_end {
            return None;
        }
        let mut key_len = 0;
        // SAFETY: `raw` is non-null and not at the end (checked above). The key pointer/length
        // pair is assumed to describe valid UTF-8 that outlives `'a` — TODO confirm against the
        // native shim.
        unsafe {
            let key_ptr = ll::rocks_table_props_collection_iter_key(self.raw, &mut key_len);
            let prop = TableProperties::from_ll(ll::rocks_table_props_collection_iter_value(self.raw));
            // A zero return from the native `next` marks the cursor as exhausted.
            self.at_end = ll::rocks_table_props_collection_iter_next(self.raw) == 0;
            // One entry has been consumed: keep `size` equal to the number still to come so
            // that `size_hint` never over-reports.
            self.size = self.size.saturating_sub(1);
            let key = str::from_utf8_unchecked(slice::from_raw_parts(key_ptr as *const u8, key_len));
            Some((key, prop))
        }
    }

    /// Reports how many entries are still to be yielded.
    ///
    /// The count starts at the collection length and drops by one per yielded entry; an
    /// exhausted (or null) cursor always reports zero.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = if self.raw.is_null() || self.at_end {
            0
        } else {
            self.size
        };
        (remaining, Some(remaining))
    }
}
/// View onto a native user-collected-properties map.
///
/// Values of this type are not constructed in this file; references to it are produced by
/// casting native pointers (see `TableProperties::user_collected_properties`), and
/// `ToRaw::raw` hands the address of `self` back to the native API.
#[repr(C)]
pub struct UserCollectedProperties {
    // Not read anywhere in this file; the address of the struct itself is what is passed to
    // the native functions.
    raw: *mut c_void,
}
impl fmt::Debug for UserCollectedProperties {
    /// Renders the map as `UserCollectedProperties {N items}`, where `N` is `len()`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let count = self.len();
        f.write_fmt(format_args!("UserCollectedProperties {{{} items}}", count))
    }
}
impl ToRaw<ll::rocks_user_collected_props_t> for UserCollectedProperties {
    /// Returns the address of `self` reinterpreted as a native properties pointer.
    fn raw(&self) -> *mut ll::rocks_user_collected_props_t {
        let this = self as *const UserCollectedProperties;
        let erased = this as *mut c_void;
        // SAFETY: pointer-to-pointer reinterpretation only; nothing is dereferenced here.
        unsafe { mem::transmute(erased) }
    }
}
impl UserCollectedProperties {
    /// Passes `key` and `value` (pointer + length each) to
    /// `rocks_user_collected_props_insert`.
    pub fn insert(&mut self, key: &str, value: &[u8]) {
        let target = self.raw();
        let (key_ptr, key_len) = (key.as_ptr(), key.len());
        let (value_ptr, value_len) = (value.as_ptr(), value.len());
        // SAFETY: both pointer/length pairs come from live Rust borrows that outlive the call.
        unsafe {
            ll::rocks_user_collected_props_insert(
                target,
                key_ptr as *const _,
                key_len,
                value_ptr as *const _,
                value_len,
            );
        }
    }

    /// Returns the entry count reported by `rocks_user_collected_props_size`.
    pub fn len(&self) -> usize {
        let count = unsafe { ll::rocks_user_collected_props_size(self.raw()) };
        count as usize
    }

    /// Returns `true` when [`len`](Self::len) is zero.
    pub fn is_empty(&self) -> bool {
        0 == self.len()
    }

    /// Creates a native cursor over the map and wraps it in an iterator.
    ///
    /// The iterator starts in the "at end" state when the map is empty.
    pub fn iter(&self) -> UserCollectedPropertiesIter<'_> {
        // Same call order as before: create the cursor, then sample length and emptiness.
        let raw = unsafe { ll::rocks_user_collected_props_iter_create(self.raw()) };
        let size = self.len();
        let at_end = self.is_empty();
        UserCollectedPropertiesIter {
            raw,
            size,
            at_end,
            _marker: PhantomData,
        }
    }
}
impl<'a> ops::Index<&'a str> for UserCollectedProperties {
    type Output = [u8];

    /// Looks up `index` through `rocks_user_collected_props_at` and returns the value bytes.
    ///
    /// # Panics
    ///
    /// Panics with `key not found "<key>"` when the native lookup returns a null pointer.
    fn index(&self, index: &'a str) -> &[u8] {
        let key_ptr = index.as_ptr() as *const c_char;
        let mut size = 0;
        // SAFETY: the key pointer/length pair comes from a live `&str`.
        let val_ptr = unsafe { ll::rocks_user_collected_props_at(self.raw(), key_ptr, index.len(), &mut size) };
        if val_ptr.is_null() {
            panic!("key not found {:?}", index);
        }
        // SAFETY: assumes a non-null result points at `size` readable bytes that live as long
        // as `self` — TODO confirm against the native shim.
        unsafe { slice::from_raw_parts(val_ptr as *const u8, size) }
    }
}
/// Iterator over the `(key, value)` entries of a `UserCollectedProperties` map.
///
/// Wraps a native `rocks_user_collected_props_iter_t` cursor, which is destroyed on drop.
pub struct UserCollectedPropertiesIter<'a> {
    /// Native cursor; `next()` treats a null pointer as an exhausted iterator.
    raw: *mut ll::rocks_user_collected_props_iter_t,
    /// Element count used to answer `size_hint`; initialised from the map's length.
    size: usize,
    /// Set once the native cursor reports that no further entry is available.
    at_end: bool,
    /// Carries the lifetime `'a` that yielded keys and values are tagged with.
    _marker: PhantomData<&'a ()>,
}
impl<'a> Drop for UserCollectedPropertiesIter<'a> {
    /// Releases the native cursor.
    fn drop(&mut self) {
        let cursor = self.raw;
        // SAFETY: assumes the native destroy function accepts whatever
        // `rocks_user_collected_props_iter_create` returned (including null) — TODO confirm.
        unsafe { ll::rocks_user_collected_props_iter_destroy(cursor) };
    }
}
impl<'a> Iterator for UserCollectedPropertiesIter<'a> {
    type Item = (&'a str, &'a [u8]);

    /// Yields the key/value pair under the native cursor, then advances the cursor.
    ///
    /// Returns `None` when the cursor is null or has already reported the end.
    fn next(&mut self) -> Option<(&'a str, &'a [u8])> {
        if self.raw.is_null() || self.at_end {
            return None;
        }
        let mut key_len = 0;
        let mut value_len = 0;
        // SAFETY: `raw` is non-null and not at the end (checked above). The returned
        // pointer/length pairs are assumed to stay valid for `'a`, and the key is assumed to be
        // valid UTF-8 — TODO confirm against the native shim.
        unsafe {
            let key_ptr = ll::rocks_user_collected_props_iter_key(self.raw, &mut key_len);
            let value_ptr = ll::rocks_user_collected_props_iter_value(self.raw, &mut value_len);
            // A zero return from the native `next` marks the cursor as exhausted.
            self.at_end = ll::rocks_user_collected_props_iter_next(self.raw) == 0;
            // One entry has been consumed: keep `size` equal to the number still to come so
            // that `size_hint` never over-reports.
            self.size = self.size.saturating_sub(1);
            let key = str::from_utf8_unchecked(slice::from_raw_parts(key_ptr as *const u8, key_len));
            let value = slice::from_raw_parts(value_ptr as *const u8, value_len);
            Some((key, value))
        }
    }

    /// Reports how many entries are still to be yielded.
    ///
    /// The count starts at the map length and drops by one per yielded entry; an exhausted
    /// (or null) cursor always reports zero.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = if self.raw.is_null() || self.at_end {
            0
        } else {
            self.size
        };
        (remaining, Some(remaining))
    }
}
/// Handle to a native `rocks_table_props_t`, tagged with a lifetime `'a`.
///
/// The wrapped pointer is passed to `rocks_table_props_destroy` when this value is dropped.
#[repr(C)]
pub struct TableProperties<'a> {
    /// Raw pointer to the native table-properties object.
    raw: *mut ll::rocks_table_props_t,
    /// Carries the lifetime `'a`; no data is stored.
    _marker: PhantomData<&'a ()>,
}
impl<'a> Drop for TableProperties<'a> {
    /// Hands the wrapped pointer back to the native destructor.
    fn drop(&mut self) {
        let handle = self.raw;
        // SAFETY: assumes `handle` is the pointer this wrapper was created from and has not
        // been destroyed elsewhere.
        unsafe { ll::rocks_table_props_destroy(handle) };
    }
}
impl<'a> FromRaw<ll::rocks_table_props_t> for TableProperties<'a> {
    /// Wraps a raw table-properties pointer without checking it.
    ///
    /// # Safety
    ///
    /// The pointer is stored as-is, used by every accessor, and handed to
    /// `rocks_table_props_destroy` on drop, so the caller must pass a pointer that is valid
    /// for those uses.
    unsafe fn from_ll(raw: *mut ll::rocks_table_props_t) -> Self {
        Self {
            raw,
            _marker: PhantomData,
        }
    }
}
impl<'a> fmt::Display for TableProperties<'a> {
    /// Writes the text that `rocks_table_props_to_string` stores into a scratch `String`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut text = String::new();
        // The native side receives the `String` as an opaque pointer and is expected to fill
        // it in before returning.
        let sink = &mut text as *mut String as *mut c_void;
        unsafe {
            ll::rocks_table_props_to_string(self.raw, sink);
        }
        f.write_str(&text)
    }
}
impl<'a> fmt::Debug for TableProperties<'a> {
    /// Renders as `TableProperties {"<display text>"}`, with the `Display` output shown in
    /// its quoted/escaped `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let rendered = self.to_string();
        f.write_fmt(format_args!("TableProperties {{{:?}}}", rendered))
    }
}
/// Read-only accessors that forward to the matching `rocks_table_props_get_*` functions.
///
/// Every accessor passes `self.raw` to the native side; string accessors rebuild a `&str`
/// from the returned pointer/length pair without UTF-8 validation.
impl<'a> TableProperties<'a> {
    /// Returns the value reported by `rocks_table_props_get_data_size`.
    pub fn data_size(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_data_size(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_index_size`.
    pub fn index_size(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_index_size(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_filter_size`.
    pub fn filter_size(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_filter_size(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_raw_key_size`.
    pub fn raw_key_size(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_raw_key_size(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_raw_value_size`.
    pub fn raw_value_size(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_raw_value_size(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_num_data_blocks`.
    pub fn num_data_blocks(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_num_data_blocks(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_num_entries`.
    pub fn num_entries(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_num_entries(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_format_version`.
    pub fn format_version(&self) -> u64 {
        unsafe { ll::rocks_table_props_get_format_version(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_fixed_key_len`.
    ///
    /// NOTE(review): presumably 0 means variable-length keys, as in RocksDB's
    /// `TableProperties::fixed_key_len` — confirm against the upstream header.
    pub fn fixed_key_len(&self) -> u64 {
        // Must query the fixed-key-length getter, not the format-version one.
        unsafe { ll::rocks_table_props_get_fixed_key_len(self.raw) }
    }

    /// Returns the value reported by `rocks_table_props_get_column_family_id`.
    pub fn column_family_id(&self) -> u32 {
        unsafe { ll::rocks_table_props_get_column_family_id(self.raw) }
    }

    /// Returns the column family name, or `None` when the native side reports length zero.
    pub fn column_family_name(&self) -> Option<&str> {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_column_family_name(self.raw, &mut len);
            if len != 0 {
                Some(str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len)))
            } else {
                None
            }
        }
    }

    /// Returns the filter policy name, or `None` when the native side reports length zero.
    pub fn filter_policy_name(&self) -> Option<&str> {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_filter_policy_name(self.raw, &mut len);
            if len != 0 {
                Some(str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len)))
            } else {
                None
            }
        }
    }

    /// Returns the comparator name (possibly empty).
    pub fn comparator_name(&self) -> &str {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_comparator_name(self.raw, &mut len);
            str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len))
        }
    }

    /// Returns the merge operator name, or `None` when the native side reports length zero.
    pub fn merge_operator_name(&self) -> Option<&str> {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_merge_operator_name(self.raw, &mut len);
            if len != 0 {
                Some(str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len)))
            } else {
                None
            }
        }
    }

    /// Returns the prefix extractor name, or `None` when the native side reports length zero.
    pub fn prefix_extractor_name(&self) -> Option<&str> {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_prefix_extractor_name(self.raw, &mut len);
            if len != 0 {
                Some(str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len)))
            } else {
                None
            }
        }
    }

    /// Returns the property collectors' names as a single string (possibly empty).
    pub fn property_collectors_names(&self) -> &str {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_property_collectors_names(self.raw, &mut len);
            str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len))
        }
    }

    /// Returns the compression name (possibly empty).
    pub fn compression_name(&self) -> &str {
        let mut len = 0;
        unsafe {
            let ptr = ll::rocks_table_props_get_compression_name(self.raw, &mut len);
            str::from_utf8_unchecked(slice::from_raw_parts(ptr as *const _, len))
        }
    }

    /// Returns a view onto the user-collected properties of this table.
    ///
    /// The native pointer is reinterpreted directly as a `&UserCollectedProperties`.
    pub fn user_collected_properties(&self) -> &UserCollectedProperties {
        unsafe {
            let raw_ptr = ll::rocks_table_props_get_user_collected_properties(self.raw);
            &*(raw_ptr as *const UserCollectedProperties)
        }
    }

    /// Returns a view onto the readable properties of this table.
    ///
    /// The native pointer is reinterpreted directly as a `&UserCollectedProperties`.
    pub fn readable_properties(&self) -> &UserCollectedProperties {
        unsafe {
            let raw_ptr = ll::rocks_table_props_get_readable_properties(self.raw);
            &*(raw_ptr as *const UserCollectedProperties)
        }
    }
}
/// Kind of entry handed to `TablePropertiesCollector::add_user_key`.
///
/// The enum is `#[repr(C)]` and its discriminants follow declaration order (0 through 4); the
/// FFI layer converts a raw C integer into one of these variants.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(C)]
pub enum EntryType {
    /// Discriminant 0.
    EntryPut,
    /// Discriminant 1.
    EntryDelete,
    /// Discriminant 2.
    EntrySingleDelete,
    /// Discriminant 3.
    EntryMerge,
    /// Discriminant 4.
    EntryOther,
}
/// Callbacks for gathering user-defined properties about a table.
///
/// Implementations are boxed and driven from native code through the `extern "C"`
/// trampolines in the `c` module.
pub trait TablePropertiesCollector {
    /// Receives one key/value entry together with its type, sequence number and a file size.
    ///
    /// NOTE(review): presumably invoked once per entry added to the table, with `file_size`
    /// being the size of the file so far — confirm against the RocksDB documentation.
    fn add_user_key(&mut self, key: &[u8], value: &[u8], type_: EntryType, seq: SequenceNumber, file_size: u64);
    /// Gives the collector a mutable property map to write its results into.
    fn finish(&mut self, properties: &mut UserCollectedProperties);
    /// Name of the collector.
    ///
    /// The FFI layer returns `name().as_ptr()` to native code as a C string without a length,
    /// which is why the default ends in `\0`; overrides need to keep a trailing NUL byte.
    fn name(&self) -> &str {
        "RustTablePropertiesCollector\0"
    }
    /// Human-readable key/value pairs. The default implementation panics via
    /// `unimplemented!()`; no trampoline in this file calls it.
    fn readable_properties(&self) -> Vec<(String, String)> {
        unimplemented!()
    }
    /// Whether the collector asks for a compaction; defaults to `false`.
    fn need_compact(&self) -> bool {
        false
    }
}
/// Information passed to `TablePropertiesCollectorFactory::new_collector`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct Context {
    /// Id of the column family the new collector is being created for, as received from the
    /// native side.
    pub column_family_id: u32,
}
/// Factory that produces a boxed `TablePropertiesCollector` for a given `Context`.
pub trait TablePropertiesCollectorFactory {
    /// Builds a new collector for the column family described by `context`.
    fn new_collector(&mut self, context: Context) -> Box<dyn TablePropertiesCollector>;
    /// Name of the factory.
    ///
    /// The FFI layer returns `name().as_ptr()` to native code as a C string without a length,
    /// which is why the default ends in `\0`; overrides need to keep a trailing NUL byte.
    fn name(&self) -> &str {
        "RustTablePropertiesCollectorFactory\0"
    }
}
#[doc(hidden)]
pub mod c {
use std::mem;
use std::os::raw::{c_char, c_int, c_uchar};
use super::*;
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_add_user_key(
c: *mut (),
key: &&[u8],
value: &&[u8],
type_: c_int,
seq: u64,
file_size: u64,
) {
assert!(!c.is_null());
let collector = c as *mut Box<dyn TablePropertiesCollector>;
(*collector).add_user_key(key, value, mem::transmute(type_), SequenceNumber(seq), file_size);
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_finish(c: *mut (), props: *mut UserCollectedProperties) {
assert!(!c.is_null());
let collector = c as *mut Box<dyn TablePropertiesCollector>;
props.as_mut().map(|p| (*collector).finish(p));
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_name(c: *mut ()) -> *const c_char {
assert!(!c.is_null());
let collector = c as *mut Box<dyn TablePropertiesCollector>;
(*collector).name().as_ptr() as *const _
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_need_compact(c: *mut ()) -> c_uchar {
assert!(!c.is_null());
let collector = c as *mut Box<dyn TablePropertiesCollector>;
(*collector).need_compact() as c_uchar
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_drop(f: *mut ()) {
assert!(!f.is_null());
let filter = f as *mut Box<dyn TablePropertiesCollector>;
Box::from_raw(filter);
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_factory_new_collector(
f: *mut (),
cf_id: u32,
) -> *mut Box<dyn TablePropertiesCollector> {
assert!(!f.is_null());
let factory = f as *mut Box<dyn TablePropertiesCollectorFactory>;
let collector = (*factory).new_collector(Context { column_family_id: cf_id });
Box::into_raw(Box::new(collector))
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_factory_name(f: *mut ()) -> *const c_char {
assert!(!f.is_null());
let factory = f as *mut Box<dyn TablePropertiesCollectorFactory>;
(*factory).name().as_ptr() as *const _
}
#[no_mangle]
pub unsafe extern "C" fn rust_table_props_collector_factory_drop(f: *mut ()) {
assert!(!f.is_null());
let filter = f as *mut Box<dyn TablePropertiesCollectorFactory>;
Box::from_raw(filter);
}
}
#[cfg(test)]
mod tests {
    use std::time;

    use super::super::rocksdb::*;
    use super::*;

    /// Collector that ignores every entry and, on `finish`, writes two fixed properties plus
    /// its `counter` rendered as decimal text.
    #[derive(Default)]
    pub struct MyTblPropsCollector {
        counter: u32,
    }

    impl TablePropertiesCollector for MyTblPropsCollector {
        fn add_user_key(
            &mut self,
            _key: &[u8],
            _value: &[u8],
            _type: EntryType,
            _seq: SequenceNumber,
            _file_size: u64,
        ) {
        }

        fn finish(&mut self, props: &mut UserCollectedProperties) {
            let counter_text = self.counter.to_string();
            props.insert("hello", b"world");
            props.insert("sample_key", b"sample_value");
            props.insert("test.counter", counter_text.as_bytes());
        }
    }

    /// Factory whose collectors carry the sub-second nanoseconds of their creation time.
    pub struct MyTblPropsCollectorFactory;

    impl TablePropertiesCollectorFactory for MyTblPropsCollectorFactory {
        fn new_collector(&mut self, _context: Context) -> Box<dyn TablePropertiesCollector> {
            let since_epoch = time::SystemTime::now()
                .duration_since(time::UNIX_EPOCH)
                .unwrap();
            Box::new(MyTblPropsCollector {
                counter: since_epoch.subsec_nanos(),
            })
        }
    }

    #[test]
    fn table_properties() {
        let tmp_dir = ::tempdir::TempDir::new_in("", "rocks").unwrap();
        let db = DB::open(
            Options::default()
                .map_db_options(|db| db.create_if_missing(true))
                .map_cf_options(|cf| {
                    cf.disable_auto_compactions(true)
                        .table_properties_collector_factory(Box::new(MyTblPropsCollectorFactory))
                }),
            &tmp_dir,
        ).unwrap();

        // One flush per iteration; the final assertions expect 100 distinct table files.
        for i in 0..100 {
            let key = format!("k{}", i);
            let unit = format!("v{}", i * i);
            let value: String = unit.repeat(i * i);
            db.single_delete(&WriteOptions::default(), b"k5").unwrap();
            db.put(&WriteOptions::default(), key.as_bytes(), value.as_bytes())
                .unwrap();
            assert!(db.flush(&FlushOptions::default().wait(true)).is_ok());
        }

        let props =
            db.get_properties_of_tables_in_range(&db.default_column_family(), &[b"k0".as_ref()..b"k9".as_ref()]);
        assert!(props.is_ok());
        let props = props.unwrap();
        assert!(props.len() > 0);

        for (file, prop) in props.iter() {
            assert!(file.ends_with(".sst"));
            assert!(prop.property_collectors_names().contains(
                "RustTablePropertiesCollectorFactory",
            ));
            let user_prop = prop.user_collected_properties();
            // Collected only to exercise a full pass over the iterator.
            let _all_entries = user_prop.iter().collect::<Vec<_>>();
            assert_eq!(&user_prop["hello"], b"world");
            for (k, v) in user_prop.iter() {
                assert!(!k.is_empty());
                assert!(!v.is_empty());
            }
        }

        let mut files: Vec<_> = props.iter().map(|(file, _)| file).collect();
        files.sort();
        files.dedup();
        assert_eq!(files.len(), 100);

        let mut counters: Vec<_> = props
            .iter()
            .map(|(_, props)| props.user_collected_properties()["test.counter"].to_vec())
            .collect();
        counters.sort();
        counters.dedup();
        assert_eq!(counters.len(), 100);
    }
}