electrs_rocksdb/
write_batch.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{ffi, AsColumnFamilyRef};
16use libc::{c_char, c_void, size_t};
17use std::slice;
18
/// A type alias to keep compatibility. See [`WriteBatchWithTransaction`] for details.
///
/// `WriteBatch` is the `WriteBatchWithTransaction<false>` instantiation, which is
/// the one that additionally provides `delete_range` and `delete_range_cf`.
pub type WriteBatch = WriteBatchWithTransaction<false>;
21
/// An atomic batch of write operations.
///
/// [`delete_range`] is not supported in [`Transaction`]; it is only implemented
/// for `WriteBatchWithTransaction<false>`.
///
/// Making an atomic commit of several writes:
///
/// ```
/// use rocksdb::{DB, Options, WriteBatchWithTransaction};
///
/// let path = "_path_for_rocksdb_storage1";
/// {
///     let db = DB::open_default(path).unwrap();
///     let mut batch = WriteBatchWithTransaction::<false>::default();
///     batch.put(b"my key", b"my value");
///     batch.put(b"key2", b"value2");
///     batch.put(b"key3", b"value3");
///
///     // `delete_range` is available when the batch is used without a transaction
///     batch.delete_range(b"key2", b"key3");
///
///     db.write(batch); // Atomically commits the batch
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`delete_range`]: Self::delete_range
/// [`Transaction`]: crate::Transaction
pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
    // Raw handle to the underlying RocksDB write batch. It is obtained from
    // `rocksdb_writebatch_create` / `rocksdb_writebatch_create_from` and is
    // passed to `rocksdb_writebatch_destroy` when this value is dropped.
    pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
52
/// Receives the puts and deletes of a write batch.
///
/// The application must provide an implementation of this trait when
/// iterating the operations within a `WriteBatch`; the implementation is
/// passed to `WriteBatchWithTransaction::iterate`, which invokes these methods
/// instead of returning an `Iterator`.
///
/// Keys and values arrive as owned boxed slices: the iteration callbacks copy
/// the bytes out of the batch before calling into the implementation.
pub trait WriteBatchIterator {
    /// Called with a key and value that were `put` into the batch.
    fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>);
    /// Called with a key that was `delete`d from the batch.
    fn delete(&mut self, key: Box<[u8]>);
}
63
64unsafe extern "C" fn writebatch_put_callback(
65    state: *mut c_void,
66    k: *const c_char,
67    klen: usize,
68    v: *const c_char,
69    vlen: usize,
70) {
71    // coerce the raw pointer back into a box, but "leak" it so we prevent
72    // freeing the resource before we are done with it
73    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
74    let leaked_cb = Box::leak(boxed_cb);
75    let key = slice::from_raw_parts(k as *const u8, klen);
76    let value = slice::from_raw_parts(v as *const u8, vlen);
77    leaked_cb.put(
78        key.to_vec().into_boxed_slice(),
79        value.to_vec().into_boxed_slice(),
80    );
81}
82
83unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) {
84    // coerce the raw pointer back into a box, but "leak" it so we prevent
85    // freeing the resource before we are done with it
86    let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
87    let leaked_cb = Box::leak(boxed_cb);
88    let key = slice::from_raw_parts(k as *const u8, klen);
89    leaked_cb.delete(key.to_vec().into_boxed_slice());
90}
91
92impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
93    /// Construct with a reference to a byte array serialized by [`WriteBatch`].
94    pub fn from_data(data: &[u8]) -> Self {
95        unsafe {
96            let ptr = data.as_ptr();
97            let len = data.len();
98            Self {
99                inner: ffi::rocksdb_writebatch_create_from(
100                    ptr as *const libc::c_char,
101                    len as size_t,
102                ),
103            }
104        }
105    }
106
107    pub fn len(&self) -> usize {
108        unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
109    }
110
111    /// Return WriteBatch serialized size (in bytes).
112    pub fn size_in_bytes(&self) -> usize {
113        unsafe {
114            let mut batch_size: size_t = 0;
115            ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
116            batch_size
117        }
118    }
119
120    /// Return a reference to a byte array which represents a serialized version of the batch.
121    pub fn data(&self) -> &[u8] {
122        unsafe {
123            let mut batch_size: size_t = 0;
124            let batch_data = ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
125            std::slice::from_raw_parts(batch_data as _, batch_size)
126        }
127    }
128
129    pub fn is_empty(&self) -> bool {
130        self.len() == 0
131    }
132
133    /// Iterate the put and delete operations within this write batch. Note that
134    /// this does _not_ return an `Iterator` but instead will invoke the `put()`
135    /// and `delete()` member functions of the provided `WriteBatchIterator`
136    /// trait implementation.
137    pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) {
138        let state = Box::into_raw(Box::new(callbacks));
139        unsafe {
140            ffi::rocksdb_writebatch_iterate(
141                self.inner,
142                state as *mut c_void,
143                Some(writebatch_put_callback),
144                Some(writebatch_delete_callback),
145            );
146            // we must manually set the raw box free since there is no
147            // associated "destroy" callback for this object
148            drop(Box::from_raw(state));
149        }
150    }
151
152    /// Insert a value into the database under the given key.
153    pub fn put<K, V>(&mut self, key: K, value: V)
154    where
155        K: AsRef<[u8]>,
156        V: AsRef<[u8]>,
157    {
158        let key = key.as_ref();
159        let value = value.as_ref();
160
161        unsafe {
162            ffi::rocksdb_writebatch_put(
163                self.inner,
164                key.as_ptr() as *const c_char,
165                key.len() as size_t,
166                value.as_ptr() as *const c_char,
167                value.len() as size_t,
168            );
169        }
170    }
171
172    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
173    where
174        K: AsRef<[u8]>,
175        V: AsRef<[u8]>,
176    {
177        let key = key.as_ref();
178        let value = value.as_ref();
179
180        unsafe {
181            ffi::rocksdb_writebatch_put_cf(
182                self.inner,
183                cf.inner(),
184                key.as_ptr() as *const c_char,
185                key.len() as size_t,
186                value.as_ptr() as *const c_char,
187                value.len() as size_t,
188            );
189        }
190    }
191
192    pub fn merge<K, V>(&mut self, key: K, value: V)
193    where
194        K: AsRef<[u8]>,
195        V: AsRef<[u8]>,
196    {
197        let key = key.as_ref();
198        let value = value.as_ref();
199
200        unsafe {
201            ffi::rocksdb_writebatch_merge(
202                self.inner,
203                key.as_ptr() as *const c_char,
204                key.len() as size_t,
205                value.as_ptr() as *const c_char,
206                value.len() as size_t,
207            );
208        }
209    }
210
211    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
212    where
213        K: AsRef<[u8]>,
214        V: AsRef<[u8]>,
215    {
216        let key = key.as_ref();
217        let value = value.as_ref();
218
219        unsafe {
220            ffi::rocksdb_writebatch_merge_cf(
221                self.inner,
222                cf.inner(),
223                key.as_ptr() as *const c_char,
224                key.len() as size_t,
225                value.as_ptr() as *const c_char,
226                value.len() as size_t,
227            );
228        }
229    }
230
231    /// Removes the database entry for key. Does nothing if the key was not found.
232    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
233        let key = key.as_ref();
234
235        unsafe {
236            ffi::rocksdb_writebatch_delete(
237                self.inner,
238                key.as_ptr() as *const c_char,
239                key.len() as size_t,
240            );
241        }
242    }
243
244    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
245        let key = key.as_ref();
246
247        unsafe {
248            ffi::rocksdb_writebatch_delete_cf(
249                self.inner,
250                cf.inner(),
251                key.as_ptr() as *const c_char,
252                key.len() as size_t,
253            );
254        }
255    }
256
257    /// Clear all updates buffered in this batch.
258    pub fn clear(&mut self) {
259        unsafe {
260            ffi::rocksdb_writebatch_clear(self.inner);
261        }
262    }
263}
264
265impl WriteBatchWithTransaction<false> {
266    /// Remove database entries from start key to end key.
267    ///
268    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
269    /// including "begin_key" and excluding "end_key". It is not an error if no
270    /// keys exist in the range ["begin_key", "end_key").
271    pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
272        let (start_key, end_key) = (from.as_ref(), to.as_ref());
273
274        unsafe {
275            ffi::rocksdb_writebatch_delete_range(
276                self.inner,
277                start_key.as_ptr() as *const c_char,
278                start_key.len() as size_t,
279                end_key.as_ptr() as *const c_char,
280                end_key.len() as size_t,
281            );
282        }
283    }
284
285    /// Remove database entries in column family from start key to end key.
286    ///
287    /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
288    /// including "begin_key" and excluding "end_key". It is not an error if no
289    /// keys exist in the range ["begin_key", "end_key").
290    pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
291        let (start_key, end_key) = (from.as_ref(), to.as_ref());
292
293        unsafe {
294            ffi::rocksdb_writebatch_delete_range_cf(
295                self.inner,
296                cf.inner(),
297                start_key.as_ptr() as *const c_char,
298                start_key.len() as size_t,
299                end_key.as_ptr() as *const c_char,
300                end_key.len() as size_t,
301            );
302        }
303    }
304}
305
306impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
307    fn default() -> Self {
308        Self {
309            inner: unsafe { ffi::rocksdb_writebatch_create() },
310        }
311    }
312}
313
314impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
315    fn drop(&mut self) {
316        unsafe {
317            ffi::rocksdb_writebatch_destroy(self.inner);
318        }
319    }
320}
321
// The struct's only field is a raw pointer, so `Send` is not implemented
// automatically; this impl opts in by hand so a batch can be moved to another
// thread.
// NOTE(review): soundness presumably rests on the underlying RocksDB write
// batch having no thread affinity — confirm against the RocksDB documentation.
unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}