ckb_rocksdb/
merge_operator.rs

1// Copyright 2014 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! rustic merge operator
17//!
18//! ```
19//! use ckb_rocksdb::{prelude::*, MergeOperands};
20//! # use ckb_rocksdb::TemporaryDBPath;
21//!
22//! fn concat_merge(new_key: &[u8],
23//!                 existing_val: Option<&[u8]>,
24//!                 operands: &mut MergeOperands)
25//!                 -> Option<Vec<u8>> {
26//!
27//!    let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
28//!    existing_val.map(|v| {
29//!        for e in v {
30//!            result.push(*e)
31//!        }
32//!    });
33//!    for op in operands {
34//!        for e in op {
35//!            result.push(*e)
36//!        }
37//!    }
38//!    Some(result)
39//! }
40//!
41//! fn main() {
42//!   let path = "_rust_path_to_rocksdb";
43//! # let path = TemporaryDBPath::new();
44//!   let mut opts = Options::default();
45//!   opts.create_if_missing(true);
46//!   opts.set_merge_operator_associative("test operator", concat_merge);
47//! # {
48
49//!   let db = DB::open(&opts, &path).unwrap();
50//!   let p = db.put(b"k1", b"a");
51//!   db.merge(b"k1", b"b");
52//!   db.merge(b"k1", b"c");
53//!   db.merge(b"k1", b"d");
54//!   db.merge(b"k1", b"efg");
55//!   let r = db.get(b"k1");
56//!   assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg");
57
58//! # }
59//! }
60//! ```
61
62use libc::{self, c_char, c_int, c_void, size_t};
63use std::ffi::CString;
64use std::mem;
65use std::ptr;
66use std::slice;
67
68pub trait MergeFn:
69    Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
70{
71}
72impl<F> MergeFn for F where
73    F: Fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
74{
75}
76
77pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
78    pub name: CString,
79    pub full_merge_fn: F,
80    pub partial_merge_fn: PF,
81}
82
83pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
84    unsafe {
85        let _: Box<MergeOperatorCallback<F, PF>> =
86            Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>);
87    }
88}
89
90pub unsafe extern "C" fn delete_callback(
91    _raw_cb: *mut c_void,
92    value: *const c_char,
93    value_length: size_t,
94) {
95    unsafe {
96        if !value.is_null() {
97            let _ = Box::from_raw(slice::from_raw_parts_mut(value as *mut u8, value_length));
98        }
99    }
100}
101
102pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
103    raw_cb: *mut c_void,
104) -> *const c_char {
105    unsafe {
106        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
107        cb.name.as_ptr()
108    }
109}
110
111pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
112    raw_cb: *mut c_void,
113    raw_key: *const c_char,
114    key_len: size_t,
115    existing_value: *const c_char,
116    existing_value_len: size_t,
117    operands_list: *const *const c_char,
118    operands_list_len: *const size_t,
119    num_operands: c_int,
120    success: *mut u8,
121    new_value_length: *mut size_t,
122) -> *mut c_char {
123    unsafe {
124        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
125        let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
126        let key = slice::from_raw_parts(raw_key as *const u8, key_len);
127        let oldval = if existing_value.is_null() {
128            None
129        } else {
130            Some(slice::from_raw_parts(
131                existing_value as *const u8,
132                existing_value_len,
133            ))
134        };
135        (cb.full_merge_fn)(key, oldval, operands).map_or_else(
136            || {
137                *new_value_length = 0;
138                *success = 0_u8;
139                ptr::null_mut() as *mut c_char
140            },
141            |result| {
142                *new_value_length = result.len() as size_t;
143                *success = 1_u8;
144                Box::into_raw(result.into_boxed_slice()) as *mut c_char
145            },
146        )
147    }
148}
149
150pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
151    raw_cb: *mut c_void,
152    raw_key: *const c_char,
153    key_len: size_t,
154    operands_list: *const *const c_char,
155    operands_list_len: *const size_t,
156    num_operands: c_int,
157    success: *mut u8,
158    new_value_length: *mut size_t,
159) -> *mut c_char {
160    unsafe {
161        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
162        let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
163        let key = slice::from_raw_parts(raw_key as *const u8, key_len);
164        (cb.partial_merge_fn)(key, None, operands).map_or_else(
165            || {
166                *new_value_length = 0;
167                *success = 0_u8;
168                ptr::null_mut::<c_char>()
169            },
170            |result| {
171                *new_value_length = result.len() as size_t;
172                *success = 1_u8;
173                Box::into_raw(result.into_boxed_slice()) as *mut c_char
174            },
175        )
176    }
177}
178
179pub struct MergeOperands {
180    operands_list: *const *const c_char,
181    operands_list_len: *const size_t,
182    num_operands: usize,
183    cursor: usize,
184}
185
186impl MergeOperands {
187    fn new(
188        operands_list: *const *const c_char,
189        operands_list_len: *const size_t,
190        num_operands: c_int,
191    ) -> MergeOperands {
192        assert!(num_operands >= 0);
193        MergeOperands {
194            operands_list,
195            operands_list_len,
196            num_operands: num_operands as usize,
197            cursor: 0,
198        }
199    }
200}
201
202impl<'a> Iterator for &'a mut MergeOperands {
203    type Item = &'a [u8];
204
205    fn next(&mut self) -> Option<&'a [u8]> {
206        if self.cursor == self.num_operands {
207            None
208        } else {
209            unsafe {
210                let base = self.operands_list as usize;
211                let base_len = self.operands_list_len as usize;
212                let spacing = mem::size_of::<*const *const u8>();
213                let spacing_len = mem::size_of::<*const size_t>();
214                let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t;
215                let len = *len_ptr;
216                let ptr = base + (spacing * self.cursor);
217                self.cursor += 1;
218                Some(slice::from_raw_parts(*(ptr as *const *const u8), len))
219            }
220        }
221    }
222
223    fn size_hint(&self) -> (usize, Option<usize>) {
224        let remaining = self.num_operands - self.cursor;
225        (remaining, Some(remaining))
226    }
227}