rust_rocksdb/
write_batch.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{ffi, AsColumnFamilyRef};
16use libc::{c_char, c_void, size_t};
17use std::slice;
18
/// A type alias kept for backwards compatibility: a write batch whose
/// `TRANSACTION` parameter is `false`. See [`WriteBatchWithTransaction`] for details.
pub type WriteBatch = WriteBatchWithTransaction<false>;
21
/// An atomic batch of write operations.
///
/// The `TRANSACTION` const parameter distinguishes the two flavours of batch:
/// the range-deletion methods are only implemented for
/// `WriteBatchWithTransaction<false>`, because
/// [`delete_range`](#method.delete_range) is not supported in [`Transaction`].
///
/// Making an atomic commit of several writes:
///
/// ```
/// use rust_rocksdb::{DB, Options, WriteBatchWithTransaction};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage1")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage1");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut batch = WriteBatchWithTransaction::<false>::default();
///     batch.put(b"my key", b"my value");
///     batch.put(b"key2", b"value2");
///     batch.put(b"key3", b"value3");
///
///     // delete_range is supported when the batch is used without a transaction
///     batch.delete_range(b"key2", b"key3");
///
///     db.write(batch); // Atomically commits the batch
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`Transaction`]: crate::Transaction
pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
    /// Raw handle to the underlying RocksDB write batch. It is obtained from
    /// one of the `rocksdb_writebatch_create*` functions by the constructors
    /// and released with `rocksdb_writebatch_destroy` when the value is dropped.
    pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
55
/// Receives the puts and deletes of a write batch.
///
/// The application must provide an implementation of this trait when
/// iterating the operations within a `WriteBatch`. Each callback receives
/// owned copies of the key (and value), so an implementation may keep them
/// for as long as it likes.
pub trait WriteBatchIterator {
    /// Called with a key and value that were `put` into the batch.
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>);
    /// Called with a key that was `delete`d from the batch.
    fn delete(&mut self, key: Box<[u8]>);
}
66
67unsafe extern "C" fn writebatch_put_callback(
68    state: *mut c_void,
69    k: *const c_char,
70    klen: usize,
71    v: *const c_char,
72    vlen: usize,
73) {
74    // coerce the raw pointer back into a box, but "leak" it so we prevent
75    // freeing the resource before we are done with it
76    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
77    let leaked_cb = Box::leak(boxed_cb);
78    let key = slice::from_raw_parts(k as *const u8, klen);
79    let value = slice::from_raw_parts(v as *const u8, vlen);
80    leaked_cb.put(
81        key.to_vec().into_boxed_slice(),
82        value.to_vec().into_boxed_slice(),
83    );
84}
85
86unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) {
87    // coerce the raw pointer back into a box, but "leak" it so we prevent
88    // freeing the resource before we are done with it
89    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
90    let leaked_cb = Box::leak(boxed_cb);
91    let key = slice::from_raw_parts(k as *const u8, klen);
92    leaked_cb.delete(key.to_vec().into_boxed_slice());
93}
94
95impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
96    /// Create a new `WriteBatch` without allocating memory.
97    pub fn new() -> Self {
98        Self {
99            inner: unsafe { ffi::rocksdb_writebatch_create() },
100        }
101    }
102
103    /// Creates `WriteBatch` with the specified `capacity` in bytes. Allocates immediately.
104    pub fn with_capacity_bytes(capacity_bytes: usize) -> Self {
105        Self {
106            // zeroes from default constructor
107            // https://github.com/facebook/rocksdb/blob/0f35db55d86ea8699ea936c9e2a4e34c82458d6b/include/rocksdb/write_batch.h#L66
108            inner: unsafe { ffi::rocksdb_writebatch_create_with_params(capacity_bytes, 0, 0, 0) },
109        }
110    }
111
112    /// Construct with a reference to a byte array serialized by [`WriteBatch`].
113    pub fn from_data(data: &[u8]) -> Self {
114        unsafe {
115            let ptr = data.as_ptr();
116            let len = data.len();
117            Self {
118                inner: ffi::rocksdb_writebatch_create_from(
119                    ptr as *const libc::c_char,
120                    len as size_t,
121                ),
122            }
123        }
124    }
125
126    pub fn len(&self) -> usize {
127        unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
128    }
129
130    /// Return WriteBatch serialized size (in bytes).
131    pub fn size_in_bytes(&self) -> usize {
132        unsafe {
133            let mut batch_size: size_t = 0;
134            ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
135            batch_size
136        }
137    }
138
139    /// Return a reference to a byte array which represents a serialized version of the batch.
140    pub fn data(&self) -> &[u8] {
141        unsafe {
142            let mut batch_size: size_t = 0;
143            let batch_data = ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
144            std::slice::from_raw_parts(batch_data as _, batch_size)
145        }
146    }
147
148    pub fn is_empty(&self) -> bool {
149        self.len() == 0
150    }
151
152    /// Iterate the put and delete operations within this write batch. Note that
153    /// this does _not_ return an `Iterator` but instead will invoke the `put()`
154    /// and `delete()` member functions of the provided `WriteBatchIterator`
155    /// trait implementation.
156    pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) {
157        let state = Box::into_raw(Box::new(callbacks));
158        unsafe {
159            ffi::rocksdb_writebatch_iterate(
160                self.inner,
161                state as *mut c_void,
162                Some(writebatch_put_callback),
163                Some(writebatch_delete_callback),
164            );
165            // we must manually set the raw box free since there is no
166            // associated "destroy" callback for this object
167            drop(Box::from_raw(state));
168        }
169    }
170
171    /// Insert a value into the database under the given key.
172    pub fn put<K, V>(&mut self, key: K, value: V)
173    where
174        K: AsRef<[u8]>,
175        V: AsRef<[u8]>,
176    {
177        let key = key.as_ref();
178        let value = value.as_ref();
179
180        unsafe {
181            ffi::rocksdb_writebatch_put(
182                self.inner,
183                key.as_ptr() as *const c_char,
184                key.len() as size_t,
185                value.as_ptr() as *const c_char,
186                value.len() as size_t,
187            );
188        }
189    }
190
191    /// Insert a value into the specific column family of the database under the given key.
192    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
193    where
194        K: AsRef<[u8]>,
195        V: AsRef<[u8]>,
196    {
197        let key = key.as_ref();
198        let value = value.as_ref();
199
200        unsafe {
201            ffi::rocksdb_writebatch_put_cf(
202                self.inner,
203                cf.inner(),
204                key.as_ptr() as *const c_char,
205                key.len() as size_t,
206                value.as_ptr() as *const c_char,
207                value.len() as size_t,
208            );
209        }
210    }
211
212    /// Insert a value into the specific column family of the database
213    /// under the given key with timestamp.
214    pub fn put_cf_with_ts<K, V, S>(&mut self, cf: &impl AsColumnFamilyRef, key: K, ts: S, value: V)
215    where
216        K: AsRef<[u8]>,
217        V: AsRef<[u8]>,
218        S: AsRef<[u8]>,
219    {
220        let key = key.as_ref();
221        let value = value.as_ref();
222        let ts = ts.as_ref();
223        unsafe {
224            ffi::rocksdb_writebatch_put_cf_with_ts(
225                self.inner,
226                cf.inner(),
227                key.as_ptr() as *const c_char,
228                key.len() as size_t,
229                ts.as_ptr() as *const c_char,
230                ts.len() as size_t,
231                value.as_ptr() as *const c_char,
232                value.len() as size_t,
233            );
234        }
235    }
236
237    pub fn merge<K, V>(&mut self, key: K, value: V)
238    where
239        K: AsRef<[u8]>,
240        V: AsRef<[u8]>,
241    {
242        let key = key.as_ref();
243        let value = value.as_ref();
244
245        unsafe {
246            ffi::rocksdb_writebatch_merge(
247                self.inner,
248                key.as_ptr() as *const c_char,
249                key.len() as size_t,
250                value.as_ptr() as *const c_char,
251                value.len() as size_t,
252            );
253        }
254    }
255
256    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
257    where
258        K: AsRef<[u8]>,
259        V: AsRef<[u8]>,
260    {
261        let key = key.as_ref();
262        let value = value.as_ref();
263
264        unsafe {
265            ffi::rocksdb_writebatch_merge_cf(
266                self.inner,
267                cf.inner(),
268                key.as_ptr() as *const c_char,
269                key.len() as size_t,
270                value.as_ptr() as *const c_char,
271                value.len() as size_t,
272            );
273        }
274    }
275
276    /// Removes the database entry for key. Does nothing if the key was not found.
277    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
278        let key = key.as_ref();
279
280        unsafe {
281            ffi::rocksdb_writebatch_delete(
282                self.inner,
283                key.as_ptr() as *const c_char,
284                key.len() as size_t,
285            );
286        }
287    }
288
289    /// Removes the database entry in the specific column family for key.
290    /// Does nothing if the key was not found.
291    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
292        let key = key.as_ref();
293
294        unsafe {
295            ffi::rocksdb_writebatch_delete_cf(
296                self.inner,
297                cf.inner(),
298                key.as_ptr() as *const c_char,
299                key.len() as size_t,
300            );
301        }
302    }
303
304    /// Removes the database entry in the specific column family with timestamp for key.
305    /// Does nothing if the key was not found.
306    pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
307        &mut self,
308        cf: &impl AsColumnFamilyRef,
309        key: K,
310        ts: S,
311    ) {
312        let key = key.as_ref();
313        let ts = ts.as_ref();
314        unsafe {
315            ffi::rocksdb_writebatch_delete_cf_with_ts(
316                self.inner,
317                cf.inner(),
318                key.as_ptr() as *const c_char,
319                key.len() as size_t,
320                ts.as_ptr() as *const c_char,
321                ts.len() as size_t,
322            );
323        }
324    }
325
326    // Append a blob of arbitrary size to the records in this batch. The blob will
327    // be stored in the transaction log but not in any other file. In particular,
328    // it will not be persisted to the SST files. When iterating over this
329    // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
330    // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
331    // encountered in the same order in which they were inserted. The blob will
332    // NOT consume sequence number(s) and will NOT increase the count of the batch
333    //
334    // Example application: add timestamps to the transaction log for use in
335    // replication.
336    pub fn put_log_data<V: AsRef<[u8]>>(&mut self, log_data: V) {
337        let log_data = log_data.as_ref();
338
339        unsafe {
340            ffi::rocksdb_writebatch_put_log_data(
341                self.inner,
342                log_data.as_ptr() as *const c_char,
343                log_data.len() as size_t,
344            );
345        }
346    }
347
348    /// Clear all updates buffered in this batch.
349    pub fn clear(&mut self) {
350        unsafe {
351            ffi::rocksdb_writebatch_clear(self.inner);
352        }
353    }
354}
355
356impl WriteBatchWithTransaction<false> {
357    /// Remove database entries from start key to end key.
358    ///
359    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
360    /// including "begin_key" and excluding "end_key". It is not an error if no
361    /// keys exist in the range ["begin_key", "end_key").
362    pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
363        let (start_key, end_key) = (from.as_ref(), to.as_ref());
364
365        unsafe {
366            ffi::rocksdb_writebatch_delete_range(
367                self.inner,
368                start_key.as_ptr() as *const c_char,
369                start_key.len() as size_t,
370                end_key.as_ptr() as *const c_char,
371                end_key.len() as size_t,
372            );
373        }
374    }
375
376    /// Remove database entries in column family from start key to end key.
377    ///
378    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
379    /// including "begin_key" and excluding "end_key". It is not an error if no
380    /// keys exist in the range ["begin_key", "end_key").
381    pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
382        let (start_key, end_key) = (from.as_ref(), to.as_ref());
383
384        unsafe {
385            ffi::rocksdb_writebatch_delete_range_cf(
386                self.inner,
387                cf.inner(),
388                start_key.as_ptr() as *const c_char,
389                start_key.len() as size_t,
390                end_key.as_ptr() as *const c_char,
391                end_key.len() as size_t,
392            );
393        }
394    }
395}
396
/// The default batch is an empty one, identical to what `new()` returns.
impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
    /// Delegates to `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
402
403impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
404    fn drop(&mut self) {
405        unsafe {
406            ffi::rocksdb_writebatch_destroy(self.inner);
407        }
408    }
409}
410
// The struct holds a raw pointer, so it is not `Send` automatically; this
// impl opts in by hand.
// NOTE(review): soundness presumably relies on the batch exclusively owning
// its `rocksdb_writebatch_t` handle and on RocksDB not tying that handle to
// the creating thread — confirm against the RocksDB C API.
unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}