rust_rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{marker::PhantomData, ptr};
17
18use crate::{
19    db::{convert_values, DBAccess},
20    ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
21    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
22};
23use libc::{c_char, c_void, size_t};
24
25/// RocksDB Transaction.
26///
27/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
28///
29/// [`TransactionDB`]: crate::TransactionDB
30/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
31pub struct Transaction<'db, DB> {
32    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
33    pub(crate) _marker: PhantomData<&'db DB>,
34}
35
36unsafe impl<DB> Send for Transaction<'_, DB> {}
37
38impl<DB> DBAccess for Transaction<'_, DB> {
39    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
40        unsafe { ffi::rocksdb_transaction_get_snapshot(self.inner) }
41    }
42
43    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
44        unsafe {
45            ffi::rocksdb_free(snapshot as *mut c_void);
46        }
47    }
48
49    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
50        unsafe { ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner) }
51    }
52
53    unsafe fn create_iterator_cf(
54        &self,
55        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
56        readopts: &ReadOptions,
57    ) -> *mut ffi::rocksdb_iterator_t {
58        unsafe {
59            ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
60        }
61    }
62
63    fn get_opt<K: AsRef<[u8]>>(
64        &self,
65        key: K,
66        readopts: &ReadOptions,
67    ) -> Result<Option<Vec<u8>>, Error> {
68        self.get_opt(key, readopts)
69    }
70
71    fn get_cf_opt<K: AsRef<[u8]>>(
72        &self,
73        cf: &impl AsColumnFamilyRef,
74        key: K,
75        readopts: &ReadOptions,
76    ) -> Result<Option<Vec<u8>>, Error> {
77        self.get_cf_opt(cf, key, readopts)
78    }
79
80    fn get_pinned_opt<K: AsRef<[u8]>>(
81        &'_ self,
82        key: K,
83        readopts: &ReadOptions,
84    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
85        self.get_pinned_opt(key, readopts)
86    }
87
88    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
89        &'_ self,
90        cf: &impl AsColumnFamilyRef,
91        key: K,
92        readopts: &ReadOptions,
93    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
94        self.get_pinned_cf_opt(cf, key, readopts)
95    }
96
97    fn multi_get_opt<K, I>(
98        &self,
99        keys: I,
100        readopts: &ReadOptions,
101    ) -> Vec<Result<Option<Vec<u8>>, Error>>
102    where
103        K: AsRef<[u8]>,
104        I: IntoIterator<Item = K>,
105    {
106        self.multi_get_opt(keys, readopts)
107    }
108
109    fn multi_get_cf_opt<'b, K, I, W>(
110        &self,
111        keys_cf: I,
112        readopts: &ReadOptions,
113    ) -> Vec<Result<Option<Vec<u8>>, Error>>
114    where
115        K: AsRef<[u8]>,
116        I: IntoIterator<Item = (&'b W, K)>,
117        W: AsColumnFamilyRef + 'b,
118    {
119        self.multi_get_cf_opt(keys_cf, readopts)
120    }
121}
122
123impl<DB> Transaction<'_, DB> {
124    /// Write all batched keys to the DB atomically.
125    ///
126    /// May return any error that could be returned by `DB::write`.
127    ///
128    /// If this transaction was created by a [`TransactionDB`], an error of
129    /// the [`Expired`] kind may be returned if this transaction has
130    /// lived longer than expiration time in [`TransactionOptions`].
131    ///
132    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
133    /// the [`Busy`] kind may be returned if the transaction
134    /// could not guarantee that there are no write conflicts.
135    /// An error of the [`TryAgain`] kind may be returned if the memtable
136    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
137    ///
138    /// [`Expired`]: crate::ErrorKind::Expired
139    /// [`TransactionOptions`]: crate::TransactionOptions
140    /// [`TransactionDB`]: crate::TransactionDB
141    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
142    /// [`Busy`]: crate::ErrorKind::Busy
143    /// [`TryAgain`]: crate::ErrorKind::TryAgain
144    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
145    pub fn commit(self) -> Result<(), Error> {
146        unsafe {
147            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
148        }
149        Ok(())
150    }
151
152    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
153        let ptr = name.as_ptr();
154        let len = name.len();
155        unsafe {
156            ffi_try!(ffi::rocksdb_transaction_set_name(
157                self.inner, ptr as _, len as _
158            ));
159        }
160
161        Ok(())
162    }
163
164    pub fn get_name(&self) -> Option<Vec<u8>> {
165        unsafe {
166            let mut name_len = 0;
167            let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
168            if name.is_null() {
169                None
170            } else {
171                let mut vec = vec![0; name_len];
172                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
173                ffi::rocksdb_free(name as *mut c_void);
174                Some(vec)
175            }
176        }
177    }
178
179    pub fn prepare(&self) -> Result<(), Error> {
180        unsafe {
181            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
182        }
183        Ok(())
184    }
185
186    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
187    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
188    ///
189    /// [`TransactionOptions`]: crate::TransactionOptions
190    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
191        SnapshotWithThreadMode::new(self)
192    }
193
194    /// Discard all batched writes in this transaction.
195    pub fn rollback(&self) -> Result<(), Error> {
196        unsafe {
197            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
198            Ok(())
199        }
200    }
201
202    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
203    /// May be called multiple times to set multiple save points.
204    ///
205    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
206    pub fn set_savepoint(&self) {
207        unsafe {
208            ffi::rocksdb_transaction_set_savepoint(self.inner);
209        }
210    }
211
212    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
213    /// and removes the most recent [`set_savepoint`].
214    ///
215    /// Returns error if there is no previous call to [`set_savepoint`].
216    ///
217    /// [`set_savepoint`]: Self::set_savepoint
218    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
219        unsafe {
220            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
221            Ok(())
222        }
223    }
224
225    /// Get the bytes associated with a key value.
226    ///
227    /// See [`get_cf_opt`] for details.
228    ///
229    /// [`get_cf_opt`]: Self::get_cf_opt
230    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
231        self.get_opt(key, &ReadOptions::default())
232    }
233
234    pub fn get_pinned<K: AsRef<[u8]>>(
235        &'_ self,
236        key: K,
237    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
238        self.get_pinned_opt(key, &ReadOptions::default())
239    }
240
241    /// Get the bytes associated with a key value and the given column family.
242    ///
243    /// See [`get_cf_opt`] for details.
244    ///
245    /// [`get_cf_opt`]: Self::get_cf_opt
246    pub fn get_cf<K: AsRef<[u8]>>(
247        &self,
248        cf: &impl AsColumnFamilyRef,
249        key: K,
250    ) -> Result<Option<Vec<u8>>, Error> {
251        self.get_cf_opt(cf, key, &ReadOptions::default())
252    }
253
254    pub fn get_pinned_cf<K: AsRef<[u8]>>(
255        &'_ self,
256        cf: &impl AsColumnFamilyRef,
257        key: K,
258    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
259        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
260    }
261
262    /// Get the key and ensure that this transaction will only
263    /// be able to be committed if this key is not written outside this
264    /// transaction after it has first been read (or after the snapshot if a
265    /// snapshot is set in this transaction).
266    ///
267    /// See [`get_for_update_cf_opt`] for details.
268    ///
269    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
270    pub fn get_for_update<K: AsRef<[u8]>>(
271        &self,
272        key: K,
273        exclusive: bool,
274    ) -> Result<Option<Vec<u8>>, Error> {
275        self.get_for_update_opt(key, exclusive, &ReadOptions::default())
276    }
277
278    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
279        &'_ self,
280        key: K,
281        exclusive: bool,
282    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
283        self.get_pinned_for_update_opt(key, exclusive, &ReadOptions::default())
284    }
285
286    /// Get the key in the given column family and ensure that this transaction will only
287    /// be able to be committed if this key is not written outside this
288    /// transaction after it has first been read (or after the snapshot if a
289    /// snapshot is set in this transaction).
290    ///
291    /// See [`get_for_update_cf_opt`] for details.
292    ///
293    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
294    pub fn get_for_update_cf<K: AsRef<[u8]>>(
295        &self,
296        cf: &impl AsColumnFamilyRef,
297        key: K,
298        exclusive: bool,
299    ) -> Result<Option<Vec<u8>>, Error> {
300        self.get_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
301    }
302
303    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
304        &'_ self,
305        cf: &impl AsColumnFamilyRef,
306        key: K,
307        exclusive: bool,
308    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
309        self.get_pinned_for_update_cf_opt(cf, key, exclusive, &ReadOptions::default())
310    }
311
312    /// Returns the bytes associated with a key value with read options.
313    ///
314    /// See [`get_cf_opt`] for details.
315    ///
316    /// [`get_cf_opt`]: Self::get_cf_opt
317    pub fn get_opt<K: AsRef<[u8]>>(
318        &self,
319        key: K,
320        readopts: &ReadOptions,
321    ) -> Result<Option<Vec<u8>>, Error> {
322        self.get_pinned_opt(key, readopts)
323            .map(|x| x.map(|v| v.as_ref().to_vec()))
324    }
325
326    pub fn get_pinned_opt<K: AsRef<[u8]>>(
327        &'_ self,
328        key: K,
329        readopts: &ReadOptions,
330    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
331        let key = key.as_ref();
332        unsafe {
333            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
334                self.inner,
335                readopts.inner,
336                key.as_ptr() as *const c_char,
337                key.len(),
338            ));
339            if val.is_null() {
340                Ok(None)
341            } else {
342                Ok(Some(DBPinnableSlice::from_c(val)))
343            }
344        }
345    }
346
347    /// Get the bytes associated with a key value and the given column family with read options.
348    ///
349    /// This function will also read pending changes in this transaction.
350    /// Currently, this function will return an error of the [`MergeInProgress`] kind
351    /// if the most recent write to the queried key in this batch is a Merge.
352    ///
353    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
354    pub fn get_cf_opt<K: AsRef<[u8]>>(
355        &self,
356        cf: &impl AsColumnFamilyRef,
357        key: K,
358        readopts: &ReadOptions,
359    ) -> Result<Option<Vec<u8>>, Error> {
360        self.get_pinned_cf_opt(cf, key, readopts)
361            .map(|x| x.map(|v| v.as_ref().to_vec()))
362    }
363
364    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
365        &'_ self,
366        cf: &impl AsColumnFamilyRef,
367        key: K,
368        readopts: &ReadOptions,
369    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
370        let key = key.as_ref();
371        unsafe {
372            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
373                self.inner,
374                readopts.inner,
375                cf.inner(),
376                key.as_ptr() as *const c_char,
377                key.len(),
378            ));
379            if val.is_null() {
380                Ok(None)
381            } else {
382                Ok(Some(DBPinnableSlice::from_c(val)))
383            }
384        }
385    }
386
387    /// Get the key with read options and ensure that this transaction will only
388    /// be able to be committed if this key is not written outside this
389    /// transaction after it has first been read (or after the snapshot if a
390    /// snapshot is set in this transaction).
391    ///
392    /// See [`get_for_update_cf_opt`] for details.
393    ///
394    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
395    pub fn get_for_update_opt<K: AsRef<[u8]>>(
396        &self,
397        key: K,
398        exclusive: bool,
399        opts: &ReadOptions,
400    ) -> Result<Option<Vec<u8>>, Error> {
401        self.get_pinned_for_update_opt(key, exclusive, opts)
402            .map(|x| x.map(|v| v.as_ref().to_vec()))
403    }
404
405    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
406        &'_ self,
407        key: K,
408        exclusive: bool,
409        opts: &ReadOptions,
410    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
411        let key = key.as_ref();
412        unsafe {
413            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
414                self.inner,
415                opts.inner,
416                key.as_ptr() as *const c_char,
417                key.len() as size_t,
418                u8::from(exclusive),
419            ));
420            if val.is_null() {
421                Ok(None)
422            } else {
423                Ok(Some(DBPinnableSlice::from_c(val)))
424            }
425        }
426    }
427
428    /// Get the key in the given column family with read options
429    /// and ensure that this transaction will only
430    /// be able to be committed if this key is not written outside this
431    /// transaction after it has first been read (or after the snapshot if a
432    /// snapshot is set in this transaction).
433    ///
434    /// Currently, this function will return an error of the [`MergeInProgress`]
435    /// if the most recent write to the queried key in this batch is a Merge.
436    ///
437    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
438    /// * [`Busy`] if there is a write conflict.
439    /// * [`TimedOut`] if a lock could not be acquired.
440    /// * [`TryAgain`] if the memtable history size is not large enough.
441    /// * [`MergeInProgress`] if merge operations cannot be resolved.
442    /// * or other errors if this key could not be read.
443    ///
444    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
445    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
446    /// be returned by `[DB::get]`.
447    ///
448    /// [`Busy`]: crate::ErrorKind::Busy
449    /// [`TimedOut`]: crate::ErrorKind::TimedOut
450    /// [`TryAgain`]: crate::ErrorKind::TryAgain
451    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
452    /// [`TransactionDB`]: crate::TransactionDB
453    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
454    /// [`commit`]: Self::commit
455    /// [`DB::get`]: crate::DB::get
456    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
457        &self,
458        cf: &impl AsColumnFamilyRef,
459        key: K,
460        exclusive: bool,
461        opts: &ReadOptions,
462    ) -> Result<Option<Vec<u8>>, Error> {
463        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
464            .map(|x| x.map(|v| v.as_ref().to_vec()))
465    }
466
467    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
468        &'_ self,
469        cf: &impl AsColumnFamilyRef,
470        key: K,
471        exclusive: bool,
472        opts: &ReadOptions,
473    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
474        let key = key.as_ref();
475        unsafe {
476            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
477                self.inner,
478                opts.inner,
479                cf.inner(),
480                key.as_ptr() as *const c_char,
481                key.len() as size_t,
482                u8::from(exclusive),
483            ));
484            if val.is_null() {
485                Ok(None)
486            } else {
487                Ok(Some(DBPinnableSlice::from_c(val)))
488            }
489        }
490    }
491
492    /// Return the values associated with the given keys.
493    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
494    where
495        K: AsRef<[u8]>,
496        I: IntoIterator<Item = K>,
497    {
498        self.multi_get_opt(keys, &ReadOptions::default())
499    }
500
501    /// Return the values associated with the given keys using read options.
502    pub fn multi_get_opt<K, I>(
503        &self,
504        keys: I,
505        readopts: &ReadOptions,
506    ) -> Vec<Result<Option<Vec<u8>>, Error>>
507    where
508        K: AsRef<[u8]>,
509        I: IntoIterator<Item = K>,
510    {
511        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
512            .into_iter()
513            .map(|key| {
514                let key = key.as_ref();
515                (Box::from(key), key.len())
516            })
517            .unzip();
518        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
519
520        let mut values = vec![ptr::null_mut(); keys.len()];
521        let mut values_sizes = vec![0_usize; keys.len()];
522        let mut errors = vec![ptr::null_mut(); keys.len()];
523        unsafe {
524            ffi::rocksdb_transaction_multi_get(
525                self.inner,
526                readopts.inner,
527                ptr_keys.len(),
528                ptr_keys.as_ptr(),
529                keys_sizes.as_ptr(),
530                values.as_mut_ptr(),
531                values_sizes.as_mut_ptr(),
532                errors.as_mut_ptr(),
533            );
534        }
535
536        convert_values(values, values_sizes, errors)
537    }
538
539    /// Return the values associated with the given keys and column families.
540    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
541        &'a self,
542        keys: I,
543    ) -> Vec<Result<Option<Vec<u8>>, Error>>
544    where
545        K: AsRef<[u8]>,
546        I: IntoIterator<Item = (&'b W, K)>,
547        W: 'b + AsColumnFamilyRef,
548    {
549        self.multi_get_cf_opt(keys, &ReadOptions::default())
550    }
551
552    /// Return the values associated with the given keys and column families using read options.
553    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
554        &'a self,
555        keys: I,
556        readopts: &ReadOptions,
557    ) -> Vec<Result<Option<Vec<u8>>, Error>>
558    where
559        K: AsRef<[u8]>,
560        I: IntoIterator<Item = (&'b W, K)>,
561        W: 'b + AsColumnFamilyRef,
562    {
563        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
564            .into_iter()
565            .map(|(cf, key)| {
566                let key = key.as_ref();
567                ((cf, Box::from(key)), key.len())
568            })
569            .unzip();
570        let ptr_keys: Vec<_> = cfs_and_keys
571            .iter()
572            .map(|(_, k)| k.as_ptr() as *const c_char)
573            .collect();
574        let ptr_cfs: Vec<_> = cfs_and_keys
575            .iter()
576            .map(|(c, _)| c.inner().cast_const())
577            .collect();
578
579        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
580        let mut values_sizes = vec![0_usize; ptr_keys.len()];
581        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
582        unsafe {
583            ffi::rocksdb_transaction_multi_get_cf(
584                self.inner,
585                readopts.inner,
586                ptr_cfs.as_ptr(),
587                ptr_keys.len(),
588                ptr_keys.as_ptr(),
589                keys_sizes.as_ptr(),
590                values.as_mut_ptr(),
591                values_sizes.as_mut_ptr(),
592                errors.as_mut_ptr(),
593            );
594        }
595
596        convert_values(values, values_sizes, errors)
597    }
598
599    /// Put the key value in default column family and do conflict checking on the key.
600    ///
601    /// See [`put_cf`] for details.
602    ///
603    /// [`put_cf`]: Self::put_cf
604    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
605        let key = key.as_ref();
606        let value = value.as_ref();
607        unsafe {
608            ffi_try!(ffi::rocksdb_transaction_put(
609                self.inner,
610                key.as_ptr() as *const c_char,
611                key.len() as size_t,
612                value.as_ptr() as *const c_char,
613                value.len() as size_t,
614            ));
615            Ok(())
616        }
617    }
618
619    /// Put the key value in the given column family and do conflict checking on the key.
620    ///
621    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
622    /// * [`Busy`] if there is a write conflict.
623    /// * [`TimedOut`] if a lock could not be acquired.
624    /// * [`TryAgain`] if the memtable history size is not large enough.
625    /// * [`MergeInProgress`] if merge operations cannot be resolved.
626    /// * or other errors on unexpected failures.
627    ///
628    /// [`Busy`]: crate::ErrorKind::Busy
629    /// [`TimedOut`]: crate::ErrorKind::TimedOut
630    /// [`TryAgain`]: crate::ErrorKind::TryAgain
631    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
632    /// [`TransactionDB`]: crate::TransactionDB
633    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
634    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
635        &self,
636        cf: &impl AsColumnFamilyRef,
637        key: K,
638        value: V,
639    ) -> Result<(), Error> {
640        let key = key.as_ref();
641        let value = value.as_ref();
642        unsafe {
643            ffi_try!(ffi::rocksdb_transaction_put_cf(
644                self.inner,
645                cf.inner(),
646                key.as_ptr() as *const c_char,
647                key.len() as size_t,
648                value.as_ptr() as *const c_char,
649                value.len() as size_t,
650            ));
651            Ok(())
652        }
653    }
654
655    /// Merge value with existing value of key, and also do conflict checking on the key.
656    ///
657    /// See [`merge_cf`] for details.
658    ///
659    /// [`merge_cf`]: Self::merge_cf
660    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
661        let key = key.as_ref();
662        let value = value.as_ref();
663        unsafe {
664            ffi_try!(ffi::rocksdb_transaction_merge(
665                self.inner,
666                key.as_ptr() as *const c_char,
667                key.len() as size_t,
668                value.as_ptr() as *const c_char,
669                value.len() as size_t
670            ));
671            Ok(())
672        }
673    }
674
675    /// Merge `value` with existing value of `key` in the given column family,
676    /// and also do conflict checking on the key.
677    ///
678    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
679    /// * [`Busy`] if there is a write conflict.
680    /// * [`TimedOut`] if a lock could not be acquired.
681    /// * [`TryAgain`] if the memtable history size is not large enough.
682    /// * [`MergeInProgress`] if merge operations cannot be resolved.
683    /// * or other errors on unexpected failures.
684    ///
685    /// [`Busy`]: crate::ErrorKind::Busy
686    /// [`TimedOut`]: crate::ErrorKind::TimedOut
687    /// [`TryAgain`]: crate::ErrorKind::TryAgain
688    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
689    /// [`TransactionDB`]: crate::TransactionDB
690    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
691        &self,
692        cf: &impl AsColumnFamilyRef,
693        key: K,
694        value: V,
695    ) -> Result<(), Error> {
696        let key = key.as_ref();
697        let value = value.as_ref();
698        unsafe {
699            ffi_try!(ffi::rocksdb_transaction_merge_cf(
700                self.inner,
701                cf.inner(),
702                key.as_ptr() as *const c_char,
703                key.len() as size_t,
704                value.as_ptr() as *const c_char,
705                value.len() as size_t
706            ));
707            Ok(())
708        }
709    }
710
711    /// Delete the key value if it exists and do conflict checking on the key.
712    ///
713    /// See [`delete_cf`] for details.
714    ///
715    /// [`delete_cf`]: Self::delete_cf
716    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
717        let key = key.as_ref();
718        unsafe {
719            ffi_try!(ffi::rocksdb_transaction_delete(
720                self.inner,
721                key.as_ptr() as *const c_char,
722                key.len() as size_t
723            ));
724        }
725        Ok(())
726    }
727
728    /// Delete the key value in the given column family and do conflict checking.
729    ///
730    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
731    /// * [`Busy`] if there is a write conflict.
732    /// * [`TimedOut`] if a lock could not be acquired.
733    /// * [`TryAgain`] if the memtable history size is not large enough.
734    /// * [`MergeInProgress`] if merge operations cannot be resolved.
735    /// * or other errors on unexpected failures.
736    ///
737    /// [`Busy`]: crate::ErrorKind::Busy
738    /// [`TimedOut`]: crate::ErrorKind::TimedOut
739    /// [`TryAgain`]: crate::ErrorKind::TryAgain
740    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
741    /// [`TransactionDB`]: crate::TransactionDB
742    pub fn delete_cf<K: AsRef<[u8]>>(
743        &self,
744        cf: &impl AsColumnFamilyRef,
745        key: K,
746    ) -> Result<(), Error> {
747        let key = key.as_ref();
748        unsafe {
749            ffi_try!(ffi::rocksdb_transaction_delete_cf(
750                self.inner,
751                cf.inner(),
752                key.as_ptr() as *const c_char,
753                key.len() as size_t
754            ));
755        }
756        Ok(())
757    }
758
759    pub fn iterator<'a: 'b, 'b>(
760        &'a self,
761        mode: IteratorMode,
762    ) -> DBIteratorWithThreadMode<'b, Self> {
763        let readopts = ReadOptions::default();
764        self.iterator_opt(mode, readopts)
765    }
766
767    pub fn iterator_opt<'a: 'b, 'b>(
768        &'a self,
769        mode: IteratorMode,
770        readopts: ReadOptions,
771    ) -> DBIteratorWithThreadMode<'b, Self> {
772        DBIteratorWithThreadMode::new(self, readopts, mode)
773    }
774
775    /// Opens an iterator using the provided ReadOptions.
776    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
777    pub fn iterator_cf_opt<'a: 'b, 'b>(
778        &'a self,
779        cf_handle: &impl AsColumnFamilyRef,
780        readopts: ReadOptions,
781        mode: IteratorMode,
782    ) -> DBIteratorWithThreadMode<'b, Self> {
783        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
784    }
785
786    /// Opens an iterator with `set_total_order_seek` enabled.
787    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
788    /// with a Hash-based implementation.
789    pub fn full_iterator<'a: 'b, 'b>(
790        &'a self,
791        mode: IteratorMode,
792    ) -> DBIteratorWithThreadMode<'b, Self> {
793        let mut opts = ReadOptions::default();
794        opts.set_total_order_seek(true);
795        DBIteratorWithThreadMode::new(self, opts, mode)
796    }
797
798    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
799        &'a self,
800        prefix: P,
801    ) -> DBIteratorWithThreadMode<'b, Self> {
802        let mut opts = ReadOptions::default();
803        opts.set_prefix_same_as_start(true);
804        DBIteratorWithThreadMode::new(
805            self,
806            opts,
807            IteratorMode::From(prefix.as_ref(), Direction::Forward),
808        )
809    }
810
811    pub fn iterator_cf<'a: 'b, 'b>(
812        &'a self,
813        cf_handle: &impl AsColumnFamilyRef,
814        mode: IteratorMode,
815    ) -> DBIteratorWithThreadMode<'b, Self> {
816        let opts = ReadOptions::default();
817        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
818    }
819
820    pub fn full_iterator_cf<'a: 'b, 'b>(
821        &'a self,
822        cf_handle: &impl AsColumnFamilyRef,
823        mode: IteratorMode,
824    ) -> DBIteratorWithThreadMode<'b, Self> {
825        let mut opts = ReadOptions::default();
826        opts.set_total_order_seek(true);
827        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
828    }
829
830    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
831        &'a self,
832        cf_handle: &impl AsColumnFamilyRef,
833        prefix: P,
834    ) -> DBIteratorWithThreadMode<'a, Self> {
835        let mut opts = ReadOptions::default();
836        opts.set_prefix_same_as_start(true);
837        DBIteratorWithThreadMode::<'a, Self>::new_cf(
838            self,
839            cf_handle.inner(),
840            opts,
841            IteratorMode::From(prefix.as_ref(), Direction::Forward),
842        )
843    }
844
845    /// Opens a raw iterator over the database, using the default read options
846    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
847        let opts = ReadOptions::default();
848        DBRawIteratorWithThreadMode::new(self, opts)
849    }
850
851    /// Opens a raw iterator over the given column family, using the default read options
852    pub fn raw_iterator_cf<'a: 'b, 'b>(
853        &'a self,
854        cf_handle: &impl AsColumnFamilyRef,
855    ) -> DBRawIteratorWithThreadMode<'b, Self> {
856        let opts = ReadOptions::default();
857        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
858    }
859
860    /// Opens a raw iterator over the database, using the given read options
861    pub fn raw_iterator_opt<'a: 'b, 'b>(
862        &'a self,
863        readopts: ReadOptions,
864    ) -> DBRawIteratorWithThreadMode<'b, Self> {
865        DBRawIteratorWithThreadMode::new(self, readopts)
866    }
867
868    /// Opens a raw iterator over the given column family, using the given read options
869    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
870        &'a self,
871        cf_handle: &impl AsColumnFamilyRef,
872        readopts: ReadOptions,
873    ) -> DBRawIteratorWithThreadMode<'b, Self> {
874        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
875    }
876
877    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
878        unsafe {
879            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
880            let mut len: usize = 0;
881            let ptr = ffi::rocksdb_writebatch_wi_data(wi, std::ptr::from_mut(&mut len));
882            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
883            ffi::rocksdb_free(wi as *mut c_void);
884            WriteBatchWithTransaction { inner: writebatch }
885        }
886    }
887
888    pub fn rebuild_from_writebatch(
889        &self,
890        writebatch: &WriteBatchWithTransaction<true>,
891    ) -> Result<(), Error> {
892        unsafe {
893            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
894                self.inner,
895                writebatch.inner
896            ));
897        }
898        Ok(())
899    }
900}
901
902impl<DB> Drop for Transaction<'_, DB> {
903    fn drop(&mut self) {
904        unsafe {
905            ffi::rocksdb_transaction_destroy(self.inner);
906        }
907    }
908}