rust_rocksdb/
write_batch.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{ffi, AsColumnFamilyRef};
16use libc::{c_char, c_void, size_t};
17use std::slice;
18
/// A type alias kept for backwards compatibility: a write batch with the
/// `TRANSACTION` flag set to `false`. See [`WriteBatchWithTransaction`] for details.
pub type WriteBatch = WriteBatchWithTransaction<false>;
21
/// An atomic batch of write operations.
///
/// [`delete_range`](#method.delete_range) is not supported in [`Transaction`];
/// it is only implemented for `WriteBatchWithTransaction<false>`.
///
/// Making an atomic commit of several writes:
///
/// ```
/// use rust_rocksdb::{DB, Options, WriteBatchWithTransaction};
///
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_rocksdb_storage1")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_rocksdb_storage1");
/// let path = tempdir.path();
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut batch = WriteBatchWithTransaction::<false>::default();
///     batch.put(b"my key", b"my value");
///     batch.put(b"key2", b"value2");
///     batch.put(b"key3", b"value3");
///
///     // delete_range is supported when used without a transaction
///     batch.delete_range(b"key2", b"key3");
///
///     db.write(&batch); // Atomically commits the batch
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`Transaction`]: crate::Transaction
pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
    /// Raw handle to the underlying RocksDB write batch. It is passed to every
    /// `ffi::rocksdb_writebatch_*` call and destroyed when the wrapper is dropped.
    pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
55
/// Receives the puts and deletes of a write batch.
///
/// The application must provide an implementation of this trait when
/// iterating the operations within a `WriteBatch`; the implementation is
/// handed to `WriteBatchWithTransaction::iterate`, which invokes these
/// methods instead of returning an `Iterator`.
pub trait WriteBatchIterator {
    /// Called with a key and value that were `put` into the batch.
    fn put(&mut self, key: &[u8], value: &[u8]);
    /// Called with a key that was `delete`d from the batch.
    fn delete(&mut self, key: &[u8]);
}
66
/// Receives the puts, deletes, and merges of a write batch with column family
/// information.
///
/// This trait extends write batch iteration to support column family-specific
/// operations. The application must implement this trait when iterating
/// operations within a `WriteBatch` that contains column family-aware writes;
/// the implementation is handed to `WriteBatchWithTransaction::iterate_cf`.
///
/// Note that for the default column family ("default"), the column family ID is 0.
pub trait WriteBatchIteratorCf {
    /// Called with a column family ID, key, and value that were `put` into
    /// the specific column family of the batch.
    fn put_cf(&mut self, cf_id: u32, key: &[u8], value: &[u8]);
    /// Called with a column family ID and key that were `delete`d from the
    /// specific column family of the batch.
    fn delete_cf(&mut self, cf_id: u32, key: &[u8]);
    /// Called with a column family ID, key, and value that were `merge`d into
    /// the specific column family of the batch.
    ///
    /// Merge operations combine the provided value with the existing value at
    /// the key using a database-defined merge operator.
    fn merge_cf(&mut self, cf_id: u32, key: &[u8], value: &[u8]);
}
88
89unsafe extern "C" fn writebatch_put_callback<T: WriteBatchIterator>(
90    state: *mut c_void,
91    k: *const c_char,
92    klen: usize,
93    v: *const c_char,
94    vlen: usize,
95) {
96    unsafe {
97        let callbacks = &mut *(state as *mut T);
98        let key = slice::from_raw_parts(k as *const u8, klen);
99        let value = slice::from_raw_parts(v as *const u8, vlen);
100        callbacks.put(key, value);
101    }
102}
103
104unsafe extern "C" fn writebatch_delete_callback<T: WriteBatchIterator>(
105    state: *mut c_void,
106    k: *const c_char,
107    klen: usize,
108) {
109    unsafe {
110        let callbacks = &mut *(state as *mut T);
111        let key = slice::from_raw_parts(k as *const u8, klen);
112        callbacks.delete(key);
113    }
114}
115
116unsafe extern "C" fn writebatch_put_cf_callback<T: WriteBatchIteratorCf>(
117    state: *mut c_void,
118    cfid: u32,
119    k: *const c_char,
120    klen: usize,
121    v: *const c_char,
122    vlen: usize,
123) {
124    unsafe {
125        let callbacks = &mut *(state as *mut T);
126        let key = slice::from_raw_parts(k as *const u8, klen);
127        let value = slice::from_raw_parts(v as *const u8, vlen);
128        callbacks.put_cf(cfid, key, value);
129    }
130}
131
132unsafe extern "C" fn writebatch_delete_cf_callback<T: WriteBatchIteratorCf>(
133    state: *mut c_void,
134    cfid: u32,
135    k: *const c_char,
136    klen: usize,
137) {
138    unsafe {
139        let callbacks = &mut *(state as *mut T);
140        let key = slice::from_raw_parts(k as *const u8, klen);
141        callbacks.delete_cf(cfid, key);
142    }
143}
144
145unsafe extern "C" fn writebatch_merge_cf_callback<T: WriteBatchIteratorCf>(
146    state: *mut c_void,
147    cfid: u32,
148    k: *const c_char,
149    klen: usize,
150    v: *const c_char,
151    vlen: usize,
152) {
153    unsafe {
154        let callbacks = &mut *(state as *mut T);
155        let key = slice::from_raw_parts(k as *const u8, klen);
156        let value = slice::from_raw_parts(v as *const u8, vlen);
157        callbacks.merge_cf(cfid, key, value);
158    }
159}
160
161impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
162    /// Create a new `WriteBatch` without allocating memory.
163    pub fn new() -> Self {
164        Self {
165            inner: unsafe { ffi::rocksdb_writebatch_create() },
166        }
167    }
168
169    /// Creates `WriteBatch` with the specified `capacity` in bytes. Allocates immediately.
170    pub fn with_capacity_bytes(capacity_bytes: usize) -> Self {
171        Self {
172            // zeroes from default constructor
173            // https://github.com/facebook/rocksdb/blob/0f35db55d86ea8699ea936c9e2a4e34c82458d6b/include/rocksdb/write_batch.h#L66
174            inner: unsafe { ffi::rocksdb_writebatch_create_with_params(capacity_bytes, 0, 0, 0) },
175        }
176    }
177
178    /// Construct with a reference to a byte array serialized by [`WriteBatch`].
179    pub fn from_data(data: &[u8]) -> Self {
180        unsafe {
181            let ptr = data.as_ptr();
182            let len = data.len();
183            Self {
184                inner: ffi::rocksdb_writebatch_create_from(
185                    ptr as *const libc::c_char,
186                    len as size_t,
187                ),
188            }
189        }
190    }
191
192    pub fn len(&self) -> usize {
193        unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
194    }
195
196    /// Return WriteBatch serialized size (in bytes).
197    pub fn size_in_bytes(&self) -> usize {
198        unsafe {
199            let mut batch_size: size_t = 0;
200            ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
201            batch_size
202        }
203    }
204
205    /// Return a reference to a byte array which represents a serialized version of the batch.
206    pub fn data(&self) -> &[u8] {
207        unsafe {
208            let mut batch_size: size_t = 0;
209            let batch_data = ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
210            std::slice::from_raw_parts(batch_data as _, batch_size)
211        }
212    }
213
214    pub fn is_empty(&self) -> bool {
215        self.len() == 0
216    }
217
218    /// Iterate the put and delete operations within this write batch. Note that
219    /// this does _not_ return an `Iterator` but instead will invoke the `put()`
220    /// and `delete()` member functions of the provided `WriteBatchIterator`
221    /// trait implementation.
222    pub fn iterate<T: WriteBatchIterator>(&self, callbacks: &mut T) {
223        let state = std::ptr::from_mut::<T>(callbacks) as *mut c_void;
224        unsafe {
225            ffi::rocksdb_writebatch_iterate(
226                self.inner,
227                state,
228                Some(writebatch_put_callback::<T>),
229                Some(writebatch_delete_callback::<T>),
230            );
231        }
232    }
233
234    /// Iterate the put, delete, and merge operations within this write batch with column family
235    /// information. Note that this does _not_ return an `Iterator` but instead will invoke the
236    /// `put_cf()`, `delete_cf()`, and `merge_cf()` member functions of the provided
237    /// `WriteBatchIteratorCf` trait implementation.
238    ///
239    /// # Notes
240    /// - For operations on the default column family ("default"), the `cf_id` parameter passed to
241    ///   the callbacks will be 0
242    pub fn iterate_cf<T: WriteBatchIteratorCf>(&self, callbacks: &mut T) {
243        let state = std::ptr::from_mut::<T>(callbacks) as *mut c_void;
244        unsafe {
245            ffi::rocksdb_writebatch_iterate_cf(
246                self.inner,
247                state,
248                Some(writebatch_put_cf_callback::<T>),
249                Some(writebatch_delete_cf_callback::<T>),
250                Some(writebatch_merge_cf_callback::<T>),
251            );
252        }
253    }
254
255    /// Insert a value into the database under the given key.
256    pub fn put<K, V>(&mut self, key: K, value: V)
257    where
258        K: AsRef<[u8]>,
259        V: AsRef<[u8]>,
260    {
261        let key = key.as_ref();
262        let value = value.as_ref();
263
264        unsafe {
265            ffi::rocksdb_writebatch_put(
266                self.inner,
267                key.as_ptr() as *const c_char,
268                key.len() as size_t,
269                value.as_ptr() as *const c_char,
270                value.len() as size_t,
271            );
272        }
273    }
274
275    /// Insert a value into the specific column family of the database under the given key.
276    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
277    where
278        K: AsRef<[u8]>,
279        V: AsRef<[u8]>,
280    {
281        let key = key.as_ref();
282        let value = value.as_ref();
283
284        unsafe {
285            ffi::rocksdb_writebatch_put_cf(
286                self.inner,
287                cf.inner(),
288                key.as_ptr() as *const c_char,
289                key.len() as size_t,
290                value.as_ptr() as *const c_char,
291                value.len() as size_t,
292            );
293        }
294    }
295
296    /// Insert a value into the specific column family of the database
297    /// under the given key with timestamp.
298    pub fn put_cf_with_ts<K, V, S>(&mut self, cf: &impl AsColumnFamilyRef, key: K, ts: S, value: V)
299    where
300        K: AsRef<[u8]>,
301        V: AsRef<[u8]>,
302        S: AsRef<[u8]>,
303    {
304        let key = key.as_ref();
305        let value = value.as_ref();
306        let ts = ts.as_ref();
307        unsafe {
308            ffi::rocksdb_writebatch_put_cf_with_ts(
309                self.inner,
310                cf.inner(),
311                key.as_ptr() as *const c_char,
312                key.len() as size_t,
313                ts.as_ptr() as *const c_char,
314                ts.len() as size_t,
315                value.as_ptr() as *const c_char,
316                value.len() as size_t,
317            );
318        }
319    }
320
321    pub fn merge<K, V>(&mut self, key: K, value: V)
322    where
323        K: AsRef<[u8]>,
324        V: AsRef<[u8]>,
325    {
326        let key = key.as_ref();
327        let value = value.as_ref();
328
329        unsafe {
330            ffi::rocksdb_writebatch_merge(
331                self.inner,
332                key.as_ptr() as *const c_char,
333                key.len() as size_t,
334                value.as_ptr() as *const c_char,
335                value.len() as size_t,
336            );
337        }
338    }
339
340    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
341    where
342        K: AsRef<[u8]>,
343        V: AsRef<[u8]>,
344    {
345        let key = key.as_ref();
346        let value = value.as_ref();
347
348        unsafe {
349            ffi::rocksdb_writebatch_merge_cf(
350                self.inner,
351                cf.inner(),
352                key.as_ptr() as *const c_char,
353                key.len() as size_t,
354                value.as_ptr() as *const c_char,
355                value.len() as size_t,
356            );
357        }
358    }
359
360    /// Removes the database entry for key. Does nothing if the key was not found.
361    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
362        let key = key.as_ref();
363
364        unsafe {
365            ffi::rocksdb_writebatch_delete(
366                self.inner,
367                key.as_ptr() as *const c_char,
368                key.len() as size_t,
369            );
370        }
371    }
372
373    /// Removes the database entry in the specific column family for key.
374    /// Does nothing if the key was not found.
375    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
376        let key = key.as_ref();
377
378        unsafe {
379            ffi::rocksdb_writebatch_delete_cf(
380                self.inner,
381                cf.inner(),
382                key.as_ptr() as *const c_char,
383                key.len() as size_t,
384            );
385        }
386    }
387
388    /// Removes the database entry in the specific column family with timestamp for key.
389    /// Does nothing if the key was not found.
390    pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
391        &mut self,
392        cf: &impl AsColumnFamilyRef,
393        key: K,
394        ts: S,
395    ) {
396        let key = key.as_ref();
397        let ts = ts.as_ref();
398        unsafe {
399            ffi::rocksdb_writebatch_delete_cf_with_ts(
400                self.inner,
401                cf.inner(),
402                key.as_ptr() as *const c_char,
403                key.len() as size_t,
404                ts.as_ptr() as *const c_char,
405                ts.len() as size_t,
406            );
407        }
408    }
409
410    // Append a blob of arbitrary size to the records in this batch. The blob will
411    // be stored in the transaction log but not in any other file. In particular,
412    // it will not be persisted to the SST files. When iterating over this
413    // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
414    // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
415    // encountered in the same order in which they were inserted. The blob will
416    // NOT consume sequence number(s) and will NOT increase the count of the batch
417    //
418    // Example application: add timestamps to the transaction log for use in
419    // replication.
420    pub fn put_log_data<V: AsRef<[u8]>>(&mut self, log_data: V) {
421        let log_data = log_data.as_ref();
422
423        unsafe {
424            ffi::rocksdb_writebatch_put_log_data(
425                self.inner,
426                log_data.as_ptr() as *const c_char,
427                log_data.len() as size_t,
428            );
429        }
430    }
431
432    /// Clear all updates buffered in this batch.
433    pub fn clear(&mut self) {
434        unsafe {
435            ffi::rocksdb_writebatch_clear(self.inner);
436        }
437    }
438}
439
440impl WriteBatchWithTransaction<false> {
441    /// Remove database entries from start key to end key.
442    ///
443    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
444    /// including "begin_key" and excluding "end_key". It is not an error if no
445    /// keys exist in the range ["begin_key", "end_key").
446    pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
447        let (start_key, end_key) = (from.as_ref(), to.as_ref());
448
449        unsafe {
450            ffi::rocksdb_writebatch_delete_range(
451                self.inner,
452                start_key.as_ptr() as *const c_char,
453                start_key.len() as size_t,
454                end_key.as_ptr() as *const c_char,
455                end_key.len() as size_t,
456            );
457        }
458    }
459
460    /// Remove database entries in column family from start key to end key.
461    ///
462    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
463    /// including "begin_key" and excluding "end_key". It is not an error if no
464    /// keys exist in the range ["begin_key", "end_key").
465    pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
466        let (start_key, end_key) = (from.as_ref(), to.as_ref());
467
468        unsafe {
469            ffi::rocksdb_writebatch_delete_range_cf(
470                self.inner,
471                cf.inner(),
472                start_key.as_ptr() as *const c_char,
473                start_key.len() as size_t,
474                end_key.as_ptr() as *const c_char,
475                end_key.len() as size_t,
476            );
477        }
478    }
479}
480
/// The default batch is whatever [`WriteBatchWithTransaction::new`] creates.
impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
    /// Delegates to [`WriteBatchWithTransaction::new`].
    fn default() -> Self {
        Self::new()
    }
}
486
/// Hands the raw batch handle back to RocksDB when the wrapper goes away.
impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
    fn drop(&mut self) {
        // `inner` is not used again after this call because the wrapper
        // itself is being dropped.
        unsafe {
            ffi::rocksdb_writebatch_destroy(self.inner);
        }
    }
}
494
// The raw `inner` pointer makes the type `!Send` by default, so `Send` is
// opted into manually here.
// NOTE(review): this assumes the RocksDB batch handle may be moved to another
// thread as long as it is only used from one thread at a time; confirm against
// RocksDB's threading guarantees.
unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}