rocksdb/
merge_operator.rs1use libc::{self, c_char, c_int, c_void, size_t};
58use std::ffi::CString;
59use std::mem;
60use std::ptr;
61use std::slice;
62
/// Signature of a user-supplied merge function.
///
/// The callbacks in this file invoke it with, in order: the key bytes, the
/// existing value if one is supplied (`partial_merge_callback` always passes
/// `None`), and the merge operands to apply. The returned bytes are copied
/// into the buffer that is handed back through the C callback.
pub type MergeFn = fn(&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>;
64
/// State shared by the `extern "C"` merge-operator callbacks.
///
/// Each callback in this file receives a `*mut c_void` and casts it back to
/// a pointer to this struct.
pub struct MergeOperatorCallback {
    /// Operator name; `name_callback` returns a pointer into this string.
    pub name: CString,
    /// Merge function invoked by `full_merge_callback` and
    /// `partial_merge_callback`.
    pub merge_fn: MergeFn,
}
69
70pub unsafe extern "C" fn destructor_callback(raw_cb: *mut c_void) {
71 let _: Box<MergeOperatorCallback> = mem::transmute(raw_cb);
72}
73
74pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
75 let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
76 cb.name.as_ptr()
77}
78
79pub unsafe extern "C" fn full_merge_callback(raw_cb: *mut c_void,
80 raw_key: *const c_char,
81 key_len: size_t,
82 existing_value: *const c_char,
83 existing_value_len: size_t,
84 operands_list: *const *const c_char,
85 operands_list_len: *const size_t,
86 num_operands: c_int,
87 success: *mut u8,
88 new_value_length: *mut size_t)
89 -> *mut c_char {
90 let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
91 let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
92 let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
93 let oldval = slice::from_raw_parts(existing_value as *const u8, existing_value_len as usize);
94 let mut result = (cb.merge_fn)(key, Some(oldval), operands);
95 result.shrink_to_fit();
96 let buf = libc::malloc(result.len() as size_t);
98 assert!(!buf.is_null());
99 *new_value_length = result.len() as size_t;
100 *success = 1 as u8;
101 ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
102 buf as *mut c_char
103}
104
105pub unsafe extern "C" fn partial_merge_callback(raw_cb: *mut c_void,
106 raw_key: *const c_char,
107 key_len: size_t,
108 operands_list: *const *const c_char,
109 operands_list_len: *const size_t,
110 num_operands: c_int,
111 success: *mut u8,
112 new_value_length: *mut size_t)
113 -> *mut c_char {
114 let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
115 let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
116 let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
117 let mut result = (cb.merge_fn)(key, None, operands);
118 result.shrink_to_fit();
119 let buf = libc::malloc(result.len() as size_t);
121 assert!(!buf.is_null());
122 *new_value_length = result.len() as size_t;
123 *success = 1 as u8;
124 ptr::copy(result.as_ptr() as *mut c_void, &mut *buf, result.len());
125 buf as *mut c_char
126}
127
128
/// Cursor over the merge operands handed to a merge callback.
///
/// Wraps two parallel C arrays -- operand data pointers and operand lengths
/// -- and yields each operand as a byte slice through the `Iterator`
/// implementation on `&mut MergeOperands`.
pub struct MergeOperands {
    /// Array of `num_operands` pointers to operand bytes.
    operands_list: *const *const c_char,
    /// Array of `num_operands` operand lengths, parallel to `operands_list`.
    operands_list_len: *const size_t,
    /// Total number of operands in both arrays.
    num_operands: usize,
    /// Index of the next operand to yield.
    cursor: usize,
}

impl MergeOperands {
    /// Builds a cursor positioned at the first operand.
    ///
    /// The arrays are not read here; they must stay valid for as long as the
    /// cursor is iterated.
    ///
    /// # Panics
    ///
    /// Panics if `num_operands` is negative.
    fn new(
        operands_list: *const *const c_char,
        operands_list_len: *const size_t,
        num_operands: c_int,
    ) -> MergeOperands {
        assert!(num_operands >= 0);
        MergeOperands {
            operands_list: operands_list,
            operands_list_len: operands_list_len,
            num_operands: num_operands as usize,
            cursor: 0,
        }
    }
}

impl<'a> Iterator for &'a mut MergeOperands {
    type Item = &'a [u8];

    /// Yields the next operand, or `None` once all operands were returned.
    fn next(&mut self) -> Option<&'a [u8]> {
        if self.cursor >= self.num_operands {
            return None;
        }
        // `cursor` is below `num_operands`, which came from a non-negative
        // `c_int`, so it always fits in `isize`.
        let index = self.cursor as isize;
        self.cursor += 1;

        unsafe {
            // Typed pointer arithmetic steps by the real element size of
            // each array (`size_t` for lengths, a pointer for data) instead
            // of assuming the two sizes happen to match.
            let len = *self.operands_list_len.offset(index) as usize;
            if len == 0 {
                // Never build a slice from the data pointer of an empty
                // operand: it may be null, which `from_raw_parts` forbids.
                let empty: &'a [u8] = &[];
                return Some(empty);
            }
            let data = *self.operands_list.offset(index) as *const u8;
            Some(slice::from_raw_parts(data, len))
        }
    }

    /// Reports the exact number of operands not yet yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.num_operands - self.cursor;
        (remaining, Some(remaining))
    }
}
178
/// Merge function used by the test below: concatenates the existing value
/// (if any) with every operand, in order. The key is ignored.
#[cfg(test)]
#[allow(unused_variables)]
fn test_provided_merge(
    new_key: &[u8],
    existing_val: Option<&[u8]>,
    operands: &mut MergeOperands,
) -> Vec<u8> {
    // The operand count is only a starting capacity; the vector grows as
    // bytes are appended.
    let capacity_hint = operands.size_hint().0;
    let mut merged: Vec<u8> = Vec::with_capacity(capacity_hint);
    if let Some(current) = existing_val {
        merged.extend_from_slice(current);
    }
    for operand in operands {
        merged.extend_from_slice(operand);
    }
    merged
}
199
/// End-to-end check of the merge operator: seeds a key, applies several
/// merges, and expects the concatenated value to be read back.
#[test]
fn mergetest() {
    use {DB, Options};

    let db_path = "_rust_rocksdb_mergetest";
    let mut options = Options::default();
    options.create_if_missing(true);
    options.set_merge_operator("test operator", test_provided_merge);
    {
        let db = DB::open(&options, db_path).unwrap();

        // Seed the key, then stack merge operands on top of it.
        let put_result = db.put(b"k1", b"a");
        assert!(put_result.is_ok());
        let _ = db.merge(b"k1", b"b");
        let _ = db.merge(b"k1", b"c");
        let _ = db.merge(b"k1", b"d");
        let _ = db.merge(b"k1", b"efg");
        let last_merge = db.merge(b"k1", b"h");
        assert!(last_merge.is_ok());

        // Print what comes back; only a missing value aborts at this point.
        match db.get(b"k1") {
            Err(_) => println!("error reading value"),
            Ok(None) => panic!("value not present"),
            Ok(Some(value)) => {
                if let Some(text) = value.to_utf8() {
                    println!("retrieved utf8 value: {}", text);
                } else {
                    println!("did not read valid utf-8 out of the db");
                }
            }
        }

        assert!(last_merge.is_ok());
        let stored = db.get(b"k1");
        assert!(stored.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh");
        assert!(db.delete(b"k1").is_ok());
        assert!(db.get(b"k1").unwrap().is_none());
    }
    // The database handle is dropped above, so the files can be removed.
    assert!(DB::destroy(&options, db_path).is_ok());
}
236}