use std::marker::PhantomData;
use std::mem;
use std::slice;
use rocks_sys as ll;
use crate::env::Logger;
#[repr(C)]
pub struct MergeOperationInput<'a> {
pub key: &'a &'a [u8],
pub existing_value: Option<&'a &'a [u8]>,
operand_list: *mut (),
logger: *mut (),
_marker: PhantomData<&'a ()>,
}
impl<'a> MergeOperationInput<'a> {
pub fn key(&self) -> &[u8] {
self.key
}
pub fn existing_value(&self) -> Option<&[u8]> {
self.existing_value.map(|&val| val)
}
pub fn operands(&self) -> &[&[u8]] {
unsafe {
slice::from_raw_parts(
ll::cxx_vector_slice_nth(self.operand_list as *const _, 0) as *const _,
ll::cxx_vector_slice_size(self.operand_list as *const _),
)
}
}
pub fn logger(&self) -> &Logger {
unimplemented!()
}
}
#[repr(C)]
pub struct MergeOperationOutput<'a> {
new_value: *mut (),
existing_operand: *mut &'a [u8],
}
impl<'a> MergeOperationOutput<'a> {
pub fn assign(&mut self, new_value: &[u8]) {
unsafe {
ll::cxx_string_assign(
self.new_value as *mut _,
new_value.as_ptr() as *const _,
new_value.len(),
);
}
}
pub fn assign_existing_operand(&mut self, old_value: &[u8]) {
unsafe {
*self.existing_operand = mem::transmute(old_value);
}
}
}
pub trait MergeOperator {
fn full_merge(&self, merge_in: &MergeOperationInput, merge_out: &mut MergeOperationOutput) -> bool {
false
}
fn name(&self) -> &str {
"RustMergeOperator\0"
}
}
pub trait AssociativeMergeOperator {
fn merge(&self, key: &[u8], existing_value: Option<&[u8]>, value: &[u8], logger: &Logger) -> Option<Vec<u8>>;
fn name(&self) -> &str {
"RustAssociativeMergeOperator\0"
}
}
#[doc(hidden)]
pub mod c {
use super::*;
#[no_mangle]
pub extern "C" fn rust_merge_operator_call_full_merge_v2(
op: *mut (),
merge_in: *const MergeOperationInput,
merge_out: *mut MergeOperationOutput,
) -> i32 {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn MergeOperator>;
let m_in: &MergeOperationInput = &*(merge_in as *const MergeOperationInput);
let m_out: &mut MergeOperationOutput = &mut *(merge_out as *mut MergeOperationOutput);
let ret = (*operator).full_merge(m_in, m_out);
ret as i32
}
}
#[no_mangle]
pub extern "C" fn rust_merge_operator_drop(op: *mut ()) {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn MergeOperator>;
Box::from_raw(operator);
}
}
#[no_mangle]
pub extern "C" fn rust_associative_merge_operator_call(
op: *mut (),
key: &&[u8],
existing_value: Option<&&[u8]>,
value: &&[u8],
new_value: *mut *const u8,
new_value_len: *mut usize,
logger: &Logger,
) -> i32 {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn AssociativeMergeOperator>;
let nval = (*operator).merge(*key, existing_value.map(|&s| s), *value, logger);
if let Some(val) = nval {
*new_value_len = val.len();
*new_value = val.as_ptr();
mem::forget(val);
true as _
} else {
false as _
}
}
}
#[no_mangle]
pub extern "C" fn rust_associative_merge_operator_name(op: *mut ()) -> *const u8 {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn AssociativeMergeOperator>;
(*operator).name().as_bytes().as_ptr()
}
}
#[no_mangle]
pub extern "C" fn rust_merge_operator_name(op: *mut ()) -> *const u8 {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn MergeOperator>;
(*operator).name().as_bytes().as_ptr()
}
}
#[no_mangle]
pub extern "C" fn rust_drop_vec_u8(base: *mut u8, len: usize) {
unsafe {
Vec::from_raw_parts(base, len, len);
}
}
#[no_mangle]
pub extern "C" fn rust_associative_merge_operator_drop(op: *mut ()) {
assert!(!op.is_null());
unsafe {
let operator = op as *mut Box<dyn AssociativeMergeOperator>;
Box::from_raw(operator);
}
}
}
#[cfg(test)]
mod tests {
use super::super::rocksdb::*;
use super::*;
pub struct MyAssocMergeOp;
impl AssociativeMergeOperator for MyAssocMergeOp {
fn merge(&self, key: &[u8], existing_value: Option<&[u8]>, value: &[u8], logger: &Logger) -> Option<Vec<u8>> {
Some(b"welcome to china".to_vec())
}
}
#[test]
fn it_works() {
let op: Box<dyn AssociativeMergeOperator> = Box::new(MyAssocMergeOp);
}
#[test]
fn assoc_merge() {
use tempdir::TempDir;
let tmp_dir = TempDir::new_in(".", "rocks").unwrap();
pub struct MyAssocMergeOp;
impl AssociativeMergeOperator for MyAssocMergeOp {
fn merge(
&self,
key: &[u8],
existing_value: Option<&[u8]>,
value: &[u8],
logger: &Logger,
) -> Option<Vec<u8>> {
let mut ret: Vec<u8> = existing_value.map(|s| s.into()).unwrap_or(b"HEAD".to_vec());
ret.push(b'|');
ret.extend_from_slice(value);
Some(ret)
}
}
let db = DB::open(
Options::default()
.map_db_options(|db| db.create_if_missing(true))
.map_cf_options(|cf| cf.associative_merge_operator(Box::new(MyAssocMergeOp))),
tmp_dir,
)
.unwrap();
let ret = db.merge(&WriteOptions::default(), b"name", b"value");
let ret = db.merge(&WriteOptions::default(), b"name", b"value2");
let ret = db.merge(&WriteOptions::default(), b"name", b"value3");
let ret = db.merge(&WriteOptions::default(), b"gender", b"male");
let ret = db.merge(&WriteOptions::default(), b"name", b"value4");
let ret = db.merge(&WriteOptions::default(), b"name", b"value");
let ret = db.get(&ReadOptions::default(), b"name");
assert_eq!(
String::from_utf8_lossy(ret.unwrap().as_ref()),
"HEAD|value|value2|value3|value4|value"
);
}
#[test]
fn merge_assign_concat_operands() {
use crate::merge_operator::{MergeOperationInput, MergeOperationOutput};
use tempdir::TempDir;
let tmp_dir = TempDir::new_in(".", "rocks").unwrap();
pub struct MyMergeOp;
impl MergeOperator for MyMergeOp {
fn full_merge(&self, merge_in: &MergeOperationInput, merge_out: &mut MergeOperationOutput) -> bool {
assert_eq!(merge_in.key, b"name");
let mut ret = b"KEY:".to_vec();
ret.extend_from_slice(merge_in.key);
ret.push(b'|');
assert_eq!(merge_in.operands().len(), 3);
for op in merge_in.operands() {
ret.extend_from_slice(op);
ret.push(b'+');
}
ret.push(b'|');
merge_out.assign(&ret);
true
}
}
let db = DB::open(
Options::default()
.map_db_options(|db| db.create_if_missing(true))
.map_cf_options(|cf| cf.merge_operator(Box::new(MyMergeOp))),
tmp_dir,
)
.unwrap();
let ret = db.merge(&WriteOptions::default(), b"name", b"value");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"new");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"last");
assert!(ret.is_ok());
let ret = db.get(&ReadOptions::default(), b"name");
assert_eq!(ret.unwrap().as_ref(), b"KEY:name|value+new+last+|");
}
#[test]
fn merge_assign_existing_operand() {
use crate::merge_operator::{MergeOperationInput, MergeOperationOutput};
let tmp_dir = ::tempdir::TempDir::new_in(".", "rocks").unwrap();
pub struct MyMergeOp;
impl MergeOperator for MyMergeOp {
fn full_merge(&self, merge_in: &MergeOperationInput, merge_out: &mut MergeOperationOutput) -> bool {
assert_eq!(merge_in.key, b"name");
assert_eq!(merge_in.operands().len(), 6);
let mut set = false;
for op in merge_in.operands() {
if op.starts_with(b"I-am-the-test") {
merge_out.assign_existing_operand(op);
set = true;
break;
}
}
assert!(set);
true
}
}
let db = DB::open(
Options::default()
.map_db_options(|db| db.create_if_missing(true))
.map_cf_options(|cf| cf.merge_operator(Box::new(MyMergeOp))),
&tmp_dir,
)
.unwrap();
let ret = db.merge(&WriteOptions::default(), b"name", b"randome-key");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"asdfkjasdkf");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"sadfjalskdfjlast");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"sadfjalskdfjlast");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"I-am-the-test-233");
assert!(ret.is_ok());
let ret = db.merge(&WriteOptions::default(), b"name", b"I-am-not-the-test");
assert!(ret.is_ok());
let ret = db.get(&ReadOptions::default(), b"name");
assert_eq!(ret.unwrap().as_ref(), b"I-am-the-test-233");
}
}