Skip to main content

rust_rocksdb/
merge_operator.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16//! rustic merge operator
17//!
18//! ```
19//! use rust_rocksdb::{Options, DB, MergeOperands};
20//!
21//! fn concat_merge(new_key: &[u8],
22//!                 existing_val: Option<&[u8]>,
23//!                 operands: &MergeOperands)
24//!                 -> Option<Vec<u8>> {
25//!
26//!    let mut result: Vec<u8> = Vec::with_capacity(operands.len());
27//!    existing_val.map(|v| {
28//!        for e in v {
29//!            result.push(*e)
30//!        }
31//!    });
32//!    for op in operands {
33//!        for e in op {
34//!            result.push(*e)
35//!        }
36//!    }
37//!    Some(result)
38//! }
39//!
40//!let tempdir = tempfile::Builder::new()
41//!    .prefix("_rust_path_to_rocksdb")
42//!    .tempdir()
43//!    .expect("Failed to create temporary path for the _rust_path_to_rocksdb");
44//!let path = tempdir.path();
45//!let mut opts = Options::default();
46//!
47//!opts.create_if_missing(true);
48//!opts.set_merge_operator_associative("test operator", concat_merge);
49//!{
50//!    let db = DB::open(&opts, path).unwrap();
51//!    let p = db.put(b"k1", b"a");
52//!    db.merge(b"k1", b"b");
53//!    db.merge(b"k1", b"c");
54//!    db.merge(b"k1", b"d");
55//!    db.merge(b"k1", b"efg");
56//!    let r = db.get(b"k1");
57//!    assert_eq!(r.unwrap().unwrap(), b"abcdefg");
58//!}
59//!let _ = DB::destroy(&opts, path);
60//! ```
61
62use libc::{self, c_char, c_int, c_void, size_t};
63use std::ffi::CString;
64use std::ptr;
65use std::slice;
66
67pub trait MergeFn:
68    Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
69{
70}
71impl<F> MergeFn for F where
72    F: Fn(&[u8], Option<&[u8]>, &MergeOperands) -> Option<Vec<u8>> + Send + Sync + 'static
73{
74}
75
76pub struct MergeOperatorCallback<F: MergeFn, PF: MergeFn> {
77    pub name: CString,
78    pub full_merge_fn: F,
79    pub partial_merge_fn: PF,
80}
81
82pub unsafe extern "C" fn destructor_callback<F: MergeFn, PF: MergeFn>(raw_cb: *mut c_void) {
83    unsafe {
84        drop(Box::from_raw(raw_cb as *mut MergeOperatorCallback<F, PF>));
85    }
86}
87
88pub unsafe extern "C" fn delete_callback(
89    _raw_cb: *mut c_void,
90    value: *const c_char,
91    value_length: size_t,
92) {
93    unsafe {
94        if !value.is_null() {
95            // Use pointer form to avoid implicit cast from slice reference to raw slice pointer
96            drop(Box::from_raw(ptr::slice_from_raw_parts_mut(
97                value as *mut u8,
98                value_length,
99            )));
100        }
101    }
102}
103
104pub unsafe extern "C" fn name_callback<F: MergeFn, PF: MergeFn>(
105    raw_cb: *mut c_void,
106) -> *const c_char {
107    unsafe {
108        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
109        cb.name.as_ptr()
110    }
111}
112
113pub unsafe extern "C" fn full_merge_callback<F: MergeFn, PF: MergeFn>(
114    raw_cb: *mut c_void,
115    raw_key: *const c_char,
116    key_len: size_t,
117    existing_value: *const c_char,
118    existing_value_len: size_t,
119    operands_list: *const *const c_char,
120    operands_list_len: *const size_t,
121    num_operands: c_int,
122    success: *mut u8,
123    new_value_length: *mut size_t,
124) -> *mut c_char {
125    unsafe {
126        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
127        let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
128        let key = slice::from_raw_parts(raw_key as *const u8, key_len);
129        let oldval = if existing_value.is_null() {
130            None
131        } else {
132            Some(slice::from_raw_parts(
133                existing_value as *const u8,
134                existing_value_len,
135            ))
136        };
137        (cb.full_merge_fn)(key, oldval, operands).map_or_else(
138            || {
139                *new_value_length = 0;
140                *success = 0_u8;
141                ptr::null_mut() as *mut c_char
142            },
143            |result| {
144                *new_value_length = result.len() as size_t;
145                *success = 1_u8;
146                Box::into_raw(result.into_boxed_slice()) as *mut c_char
147            },
148        )
149    }
150}
151
152pub unsafe extern "C" fn partial_merge_callback<F: MergeFn, PF: MergeFn>(
153    raw_cb: *mut c_void,
154    raw_key: *const c_char,
155    key_len: size_t,
156    operands_list: *const *const c_char,
157    operands_list_len: *const size_t,
158    num_operands: c_int,
159    success: *mut u8,
160    new_value_length: *mut size_t,
161) -> *mut c_char {
162    unsafe {
163        let cb = &mut *(raw_cb as *mut MergeOperatorCallback<F, PF>);
164        let operands = &MergeOperands::new(operands_list, operands_list_len, num_operands);
165        let key = slice::from_raw_parts(raw_key as *const u8, key_len);
166        (cb.partial_merge_fn)(key, None, operands).map_or_else(
167            || {
168                *new_value_length = 0;
169                *success = 0_u8;
170                ptr::null_mut::<c_char>()
171            },
172            |result| {
173                *new_value_length = result.len() as size_t;
174                *success = 1_u8;
175                Box::into_raw(result.into_boxed_slice()) as *mut c_char
176            },
177        )
178    }
179}
180
181pub struct MergeOperands {
182    operands_list: *const *const c_char,
183    operands_list_len: *const size_t,
184    num_operands: usize,
185}
186
187impl MergeOperands {
188    fn new(
189        operands_list: *const *const c_char,
190        operands_list_len: *const size_t,
191        num_operands: c_int,
192    ) -> MergeOperands {
193        assert!(num_operands >= 0);
194        MergeOperands {
195            operands_list,
196            operands_list_len,
197            num_operands: num_operands as usize,
198        }
199    }
200
201    pub fn len(&self) -> usize {
202        self.num_operands
203    }
204
205    pub fn is_empty(&self) -> bool {
206        self.num_operands == 0
207    }
208
209    pub fn iter(&'_ self) -> MergeOperandsIter<'_> {
210        MergeOperandsIter {
211            operands: self,
212            cursor: 0,
213        }
214    }
215
216    fn get_operand(&self, index: usize) -> Option<&[u8]> {
217        if index >= self.num_operands {
218            None
219        } else {
220            unsafe {
221                let ptr = *self.operands_list.add(index);
222                let len = *self.operands_list_len.add(index);
223                Some(slice::from_raw_parts(ptr as *const u8, len))
224            }
225        }
226    }
227}
228
229pub struct MergeOperandsIter<'a> {
230    operands: &'a MergeOperands,
231    cursor: usize,
232}
233
234impl<'a> Iterator for MergeOperandsIter<'a> {
235    type Item = &'a [u8];
236
237    fn next(&mut self) -> Option<Self::Item> {
238        let operand = self.operands.get_operand(self.cursor)?;
239        self.cursor += 1;
240        Some(operand)
241    }
242
243    fn size_hint(&self) -> (usize, Option<usize>) {
244        let remaining = self.operands.num_operands - self.cursor;
245        (remaining, Some(remaining))
246    }
247}
248
249impl<'a> IntoIterator for &'a MergeOperands {
250    type Item = &'a [u8];
251    type IntoIter = MergeOperandsIter<'a>;
252
253    fn into_iter(self) -> Self::IntoIter {
254        Self::IntoIter {
255            operands: self,
256            cursor: 0,
257        }
258    }
259}