rocksdb/
merge_operator.rs

1// Copyright 2014 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! rustic merge operator
17//!
18//! ```
19//! use rocksdb::{Options, DB, MergeOperands};
20//!
21//! fn concat_merge(new_key: &[u8],
22//!                 existing_val: Option<&[u8]>,
23//!                 operands: &mut MergeOperands)
24//!                 -> Vec<u8> {
25//!
26//!    let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().0);
27//!    existing_val.map(|v| {
28//!        for e in v {
29//!            result.push(*e)
30//!        }
31//!    });
32//!    for op in operands {
33//!        for e in op {
34//!            result.push(*e)
35//!        }
36//!    }
37//!    result
38//! }
39//!
40//! fn main() {
41//!    let path = "path/to/rocksdb";
42//!    let mut opts = Options::default();
43//!    opts.create_if_missing(true);
44//!    opts.add_merge_operator("test operator", concat_merge);
45//!    let db = DB::open(&opts, path).unwrap();
46//!    let p = db.put(b"k1", b"a");
47//!    db.merge(b"k1", b"b");
48//!    db.merge(b"k1", b"c");
49//!    db.merge(b"k1", b"d");
50//!    db.merge(b"k1", b"efg");
51//!    let r = db.get(b"k1");
52//!    assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg");
53//! }
54//! ```
55
56
57use libc::{self, c_char, c_int, c_void, size_t};
58use std::ffi::CString;
59use std::mem;
60use std::ptr;
61use std::slice;
62
63pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>;
64
65pub struct MergeOperatorCallback {
66    pub name: CString,
67    pub merge_fn: MergeFn,
68}
69
70pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) {
71    let _: Box<MergeOperatorCallback> = mem::transmute(raw_cb);
72}
73
74pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
75    let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
76    cb.name.as_ptr()
77}
78
79pub unsafe extern "C" fn full_merge_callback(raw_cb: *mut c_void,
80                                             raw_key: *const c_char,
81                                             key_len: size_t,
82                                             existing_value: *const c_char,
83                                             existing_value_len: size_t,
84                                             operands_list: *const *const c_char,
85                                             operands_list_len: *const size_t,
86                                             num_operands: c_int,
87                                             success: *mut u8,
88                                             new_value_length: *mut size_t)
89                                             -> *mut c_char {
90    let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
91    let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
92    let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
93    let oldval = slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize);
94    let mut result = (cb.merge_fn)(key, Some(oldval), operands);
95    result.shrink_to_fit();
96    // TODO(tan) investigate zero-copy techniques to improve performance
97    let buf = libc::malloc(result.len() as size_t);
98    assert!(!buf.is_null());
99    *new_value_length = result.len() as size_t;
100    *success = 1 as u8;
101    ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
102    buf as *mut c_char
103}
104
105pub unsafe extern "C" fn partial_merge_callback(raw_cb: *mut c_void,
106                                                raw_key: *const c_char,
107                                                key_len: size_t,
108                                                operands_list: *const *const c_char,
109                                                operands_list_len: *const size_t,
110                                                num_operands: c_int,
111                                                success: *mut u8,
112                                                new_value_length: *mut size_t)
113                                                -> *mut c_char {
114    let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
115    let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
116    let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
117    let mut result = (cb.merge_fn)(key, None, operands);
118    result.shrink_to_fit();
119    // TODO(tan) investigate zero-copy techniques to improve performance
120    let buf = libc::malloc(result.len() as size_t);
121    assert!(!buf.is_null());
122    *new_value_length = result.len() as size_t;
123    *success = 1 as u8;
124    ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
125    buf as *mut c_char
126}
127
128
129pub struct MergeOperands {
130    operands_list: *const *const c_char,
131    operands_list_len: *const size_t,
132    num_operands: usize,
133    cursor: usize,
134}
135
136impl MergeOperands {
137    fn new(operands_list: *const *const c_char,
138           operands_list_len: *const size_t,
139           num_operands: c_int)
140           -> MergeOperands {
141        assert!(num_operands >= 0);
142        MergeOperands {
143            operands_list: operands_list,
144            operands_list_len: operands_list_len,
145            num_operands: num_operands as usize,
146            cursor: 0,
147        }
148    }
149}
150
151impl<'a> Iterator for &'a mut MergeOperands {
152    type Item = &'a [u8];
153
154    fn next(&mut self) -> Option<&'a [u8]> {
155        if self.cursor == self.num_operands {
156            None
157        } else {
158            unsafe {
159                let base = self.operands_list as usize;
160                let base_len = self.operands_list_len as usize;
161                let spacing = mem::size_of::<*const *const u8>();
162                let spacing_len = mem::size_of::<*const size_t>();
163                let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t;
164                let len = *len_ptr as usize;
165                let ptr = base + (spacing * self.cursor);
166                self.cursor += 1;
167                Some(mem::transmute(slice::from_raw_parts(*(ptr as *const *const u8) as *const u8,
168                                                          len)))
169            }
170        }
171    }
172
173    fn size_hint(&self) -> (usize, Option<usize>) {
174        let remaining = self.num_operands - self.cursor;
175        (remaining, Some(remaining))
176    }
177}
178
179#[cfg(test)]
180#[allow(unused_variables)]
181fn test_provided_merge(new_key: &[u8],
182                       existing_val: Option<&[u8]>,
183                       operands: &mut MergeOperands)
184                       -> Vec<u8> {
185    let nops = operands.size_hint().0;
186    let mut result: Vec<u8> = Vec::with_capacity(nops);
187    if let Some(v) = existing_val {
188        for e in v {
189            result.push(*e);
190        }
191    }
192    for op in operands {
193        for e in op {
194            result.push(*e);
195        }
196    }
197    result
198}
199
200#[test]
201fn mergetest() {
202    use {DB, Options};
203
204    let path = "_rust_rocksdb_mergetest";
205    let mut opts = Options::default();
206    opts.create_if_missing(true);
207    opts.set_merge_operator("test operator", test_provided_merge);
208    {
209        let db = DB::open(&opts, path).unwrap();
210        let p = db.put(b"k1", b"a");
211        assert!(p.is_ok());
212        let _ = db.merge(b"k1", b"b");
213        let _ = db.merge(b"k1", b"c");
214        let _ = db.merge(b"k1", b"d");
215        let _ = db.merge(b"k1", b"efg");
216        let m = db.merge(b"k1", b"h");
217        assert!(m.is_ok());
218        match db.get(b"k1") {
219            Ok(Some(value)) => {
220                match value.to_utf8() {
221                    Some(v) => println!("retrieved utf8 value: {}", v),
222                    None => println!("did not read valid utf-8 out of the db"),
223                }
224            }
225            Err(_) => println!("error reading value"),
226            _ => panic!("value not present"),
227        }
228
229        assert!(m.is_ok());
230        let r = db.get(b"k1");
231        assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh");
232        assert!(db.delete(b"k1").is_ok());
233        assert!(db.get(b"k1").unwrap().is_none());
234    }
235    assert!(DB::destroy(&opts, path).is_ok());
236}