rust_rocksdb/write_batch.rs
1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::{ffi, AsColumnFamilyRef};
16use libc::{c_char, c_void, size_t};
17use std::slice;
18
19/// A type alias to keep compatibility. See [`WriteBatchWithTransaction`] for details
20pub type WriteBatch = WriteBatchWithTransaction<false>;
21
22/// An atomic batch of write operations.
23///
24/// [`delete_range`](#method.delete_range) is not supported in [`Transaction`].
25///
26/// Making an atomic commit of several writes:
27///
28/// ```
29/// use rust_rocksdb::{DB, Options, WriteBatchWithTransaction};
30///
31/// let tempdir = tempfile::Builder::new()
32/// .prefix("_path_for_rocksdb_storage1")
33/// .tempdir()
34/// .expect("Failed to create temporary path for the _path_for_rocksdb_storage1");
35/// let path = tempdir.path();
36/// {
37/// let db = DB::open_default(path).unwrap();
38/// let mut batch = WriteBatchWithTransaction::<false>::default();
39/// batch.put(b"my key", b"my value");
40/// batch.put(b"key2", b"value2");
41/// batch.put(b"key3", b"value3");
42///
43/// // delete_range is supported when use without transaction
44/// batch.delete_range(b"key2", b"key3");
45///
46/// db.write(batch); // Atomically commits the batch
47/// }
48/// let _ = DB::destroy(&Options::default(), path);
49/// ```
50///
51/// [`Transaction`]: crate::Transaction
52pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
53 pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
54}
55
56/// Receives the puts and deletes of a write batch.
57///
58/// The application must provide an implementation of this trait when
59/// iterating the operations within a `WriteBatch`
60pub trait WriteBatchIterator {
61 /// Called with a key and value that were `put` into the batch.
62 fn put(&mut self, key: Box<[u8]>, value: Box<[u8]>);
63 /// Called with a key that was `delete`d from the batch.
64 fn delete(&mut self, key: Box<[u8]>);
65}
66
67unsafe extern "C" fn writebatch_put_callback(
68 state: *mut c_void,
69 k: *const c_char,
70 klen: usize,
71 v: *const c_char,
72 vlen: usize,
73) {
74 // coerce the raw pointer back into a box, but "leak" it so we prevent
75 // freeing the resource before we are done with it
76 let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
77 let leaked_cb = Box::leak(boxed_cb);
78 let key = slice::from_raw_parts(k as *const u8, klen);
79 let value = slice::from_raw_parts(v as *const u8, vlen);
80 leaked_cb.put(
81 key.to_vec().into_boxed_slice(),
82 value.to_vec().into_boxed_slice(),
83 );
84}
85
86unsafe extern "C" fn writebatch_delete_callback(state: *mut c_void, k: *const c_char, klen: usize) {
87 // coerce the raw pointer back into a box, but "leak" it so we prevent
88 // freeing the resource before we are done with it
89 let boxed_cb = Box::from_raw(state as *mut &mut dyn WriteBatchIterator);
90 let leaked_cb = Box::leak(boxed_cb);
91 let key = slice::from_raw_parts(k as *const u8, klen);
92 leaked_cb.delete(key.to_vec().into_boxed_slice());
93}
94
95impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
96 /// Create a new `WriteBatch` without allocating memory.
97 pub fn new() -> Self {
98 Self {
99 inner: unsafe { ffi::rocksdb_writebatch_create() },
100 }
101 }
102
103 /// Creates `WriteBatch` with the specified `capacity` in bytes. Allocates immediately.
104 pub fn with_capacity_bytes(capacity_bytes: usize) -> Self {
105 Self {
106 // zeroes from default constructor
107 // https://github.com/facebook/rocksdb/blob/0f35db55d86ea8699ea936c9e2a4e34c82458d6b/include/rocksdb/write_batch.h#L66
108 inner: unsafe { ffi::rocksdb_writebatch_create_with_params(capacity_bytes, 0, 0, 0) },
109 }
110 }
111
112 /// Construct with a reference to a byte array serialized by [`WriteBatch`].
113 pub fn from_data(data: &[u8]) -> Self {
114 unsafe {
115 let ptr = data.as_ptr();
116 let len = data.len();
117 Self {
118 inner: ffi::rocksdb_writebatch_create_from(
119 ptr as *const libc::c_char,
120 len as size_t,
121 ),
122 }
123 }
124 }
125
126 pub fn len(&self) -> usize {
127 unsafe { ffi::rocksdb_writebatch_count(self.inner) as usize }
128 }
129
130 /// Return WriteBatch serialized size (in bytes).
131 pub fn size_in_bytes(&self) -> usize {
132 unsafe {
133 let mut batch_size: size_t = 0;
134 ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
135 batch_size
136 }
137 }
138
139 /// Return a reference to a byte array which represents a serialized version of the batch.
140 pub fn data(&self) -> &[u8] {
141 unsafe {
142 let mut batch_size: size_t = 0;
143 let batch_data = ffi::rocksdb_writebatch_data(self.inner, &mut batch_size);
144 std::slice::from_raw_parts(batch_data as _, batch_size)
145 }
146 }
147
148 pub fn is_empty(&self) -> bool {
149 self.len() == 0
150 }
151
152 /// Iterate the put and delete operations within this write batch. Note that
153 /// this does _not_ return an `Iterator` but instead will invoke the `put()`
154 /// and `delete()` member functions of the provided `WriteBatchIterator`
155 /// trait implementation.
156 pub fn iterate(&self, callbacks: &mut dyn WriteBatchIterator) {
157 let state = Box::into_raw(Box::new(callbacks));
158 unsafe {
159 ffi::rocksdb_writebatch_iterate(
160 self.inner,
161 state as *mut c_void,
162 Some(writebatch_put_callback),
163 Some(writebatch_delete_callback),
164 );
165 // we must manually set the raw box free since there is no
166 // associated "destroy" callback for this object
167 drop(Box::from_raw(state));
168 }
169 }
170
171 /// Insert a value into the database under the given key.
172 pub fn put<K, V>(&mut self, key: K, value: V)
173 where
174 K: AsRef<[u8]>,
175 V: AsRef<[u8]>,
176 {
177 let key = key.as_ref();
178 let value = value.as_ref();
179
180 unsafe {
181 ffi::rocksdb_writebatch_put(
182 self.inner,
183 key.as_ptr() as *const c_char,
184 key.len() as size_t,
185 value.as_ptr() as *const c_char,
186 value.len() as size_t,
187 );
188 }
189 }
190
191 /// Insert a value into the specific column family of the database under the given key.
192 pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
193 where
194 K: AsRef<[u8]>,
195 V: AsRef<[u8]>,
196 {
197 let key = key.as_ref();
198 let value = value.as_ref();
199
200 unsafe {
201 ffi::rocksdb_writebatch_put_cf(
202 self.inner,
203 cf.inner(),
204 key.as_ptr() as *const c_char,
205 key.len() as size_t,
206 value.as_ptr() as *const c_char,
207 value.len() as size_t,
208 );
209 }
210 }
211
212 /// Insert a value into the specific column family of the database
213 /// under the given key with timestamp.
214 pub fn put_cf_with_ts<K, V, S>(&mut self, cf: &impl AsColumnFamilyRef, key: K, ts: S, value: V)
215 where
216 K: AsRef<[u8]>,
217 V: AsRef<[u8]>,
218 S: AsRef<[u8]>,
219 {
220 let key = key.as_ref();
221 let value = value.as_ref();
222 let ts = ts.as_ref();
223 unsafe {
224 ffi::rocksdb_writebatch_put_cf_with_ts(
225 self.inner,
226 cf.inner(),
227 key.as_ptr() as *const c_char,
228 key.len() as size_t,
229 ts.as_ptr() as *const c_char,
230 ts.len() as size_t,
231 value.as_ptr() as *const c_char,
232 value.len() as size_t,
233 );
234 }
235 }
236
237 pub fn merge<K, V>(&mut self, key: K, value: V)
238 where
239 K: AsRef<[u8]>,
240 V: AsRef<[u8]>,
241 {
242 let key = key.as_ref();
243 let value = value.as_ref();
244
245 unsafe {
246 ffi::rocksdb_writebatch_merge(
247 self.inner,
248 key.as_ptr() as *const c_char,
249 key.len() as size_t,
250 value.as_ptr() as *const c_char,
251 value.len() as size_t,
252 );
253 }
254 }
255
256 pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
257 where
258 K: AsRef<[u8]>,
259 V: AsRef<[u8]>,
260 {
261 let key = key.as_ref();
262 let value = value.as_ref();
263
264 unsafe {
265 ffi::rocksdb_writebatch_merge_cf(
266 self.inner,
267 cf.inner(),
268 key.as_ptr() as *const c_char,
269 key.len() as size_t,
270 value.as_ptr() as *const c_char,
271 value.len() as size_t,
272 );
273 }
274 }
275
276 /// Removes the database entry for key. Does nothing if the key was not found.
277 pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
278 let key = key.as_ref();
279
280 unsafe {
281 ffi::rocksdb_writebatch_delete(
282 self.inner,
283 key.as_ptr() as *const c_char,
284 key.len() as size_t,
285 );
286 }
287 }
288
289 /// Removes the database entry in the specific column family for key.
290 /// Does nothing if the key was not found.
291 pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
292 let key = key.as_ref();
293
294 unsafe {
295 ffi::rocksdb_writebatch_delete_cf(
296 self.inner,
297 cf.inner(),
298 key.as_ptr() as *const c_char,
299 key.len() as size_t,
300 );
301 }
302 }
303
304 /// Removes the database entry in the specific column family with timestamp for key.
305 /// Does nothing if the key was not found.
306 pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
307 &mut self,
308 cf: &impl AsColumnFamilyRef,
309 key: K,
310 ts: S,
311 ) {
312 let key = key.as_ref();
313 let ts = ts.as_ref();
314 unsafe {
315 ffi::rocksdb_writebatch_delete_cf_with_ts(
316 self.inner,
317 cf.inner(),
318 key.as_ptr() as *const c_char,
319 key.len() as size_t,
320 ts.as_ptr() as *const c_char,
321 ts.len() as size_t,
322 );
323 }
324 }
325
326 // Append a blob of arbitrary size to the records in this batch. The blob will
327 // be stored in the transaction log but not in any other file. In particular,
328 // it will not be persisted to the SST files. When iterating over this
329 // WriteBatch, WriteBatch::Handler::LogData will be called with the contents
330 // of the blob as it is encountered. Blobs, puts, deletes, and merges will be
331 // encountered in the same order in which they were inserted. The blob will
332 // NOT consume sequence number(s) and will NOT increase the count of the batch
333 //
334 // Example application: add timestamps to the transaction log for use in
335 // replication.
336 pub fn put_log_data<V: AsRef<[u8]>>(&mut self, log_data: V) {
337 let log_data = log_data.as_ref();
338
339 unsafe {
340 ffi::rocksdb_writebatch_put_log_data(
341 self.inner,
342 log_data.as_ptr() as *const c_char,
343 log_data.len() as size_t,
344 );
345 }
346 }
347
348 /// Clear all updates buffered in this batch.
349 pub fn clear(&mut self) {
350 unsafe {
351 ffi::rocksdb_writebatch_clear(self.inner);
352 }
353 }
354}
355
356impl WriteBatchWithTransaction<false> {
357 /// Remove database entries from start key to end key.
358 ///
359 /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
360 /// including "begin_key" and excluding "end_key". It is not an error if no
361 /// keys exist in the range ["begin_key", "end_key").
362 pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
363 let (start_key, end_key) = (from.as_ref(), to.as_ref());
364
365 unsafe {
366 ffi::rocksdb_writebatch_delete_range(
367 self.inner,
368 start_key.as_ptr() as *const c_char,
369 start_key.len() as size_t,
370 end_key.as_ptr() as *const c_char,
371 end_key.len() as size_t,
372 );
373 }
374 }
375
376 /// Remove database entries in column family from start key to end key.
377 ///
378 /// Removes the database entries in the range ["begin_key", "end_key"), i.e.,
379 /// including "begin_key" and excluding "end_key". It is not an error if no
380 /// keys exist in the range ["begin_key", "end_key").
381 pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
382 let (start_key, end_key) = (from.as_ref(), to.as_ref());
383
384 unsafe {
385 ffi::rocksdb_writebatch_delete_range_cf(
386 self.inner,
387 cf.inner(),
388 start_key.as_ptr() as *const c_char,
389 start_key.len() as size_t,
390 end_key.as_ptr() as *const c_char,
391 end_key.len() as size_t,
392 );
393 }
394 }
395}
396
397impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
398 fn default() -> Self {
399 Self::new()
400 }
401}
402
403impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
404 fn drop(&mut self) {
405 unsafe {
406 ffi::rocksdb_writebatch_destroy(self.inner);
407 }
408 }
409}
410
411unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}