rust_rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::marker::PhantomData;
17
18use crate::{
19    db::{convert_values, DBAccess},
20    ffi, AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
21    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
22};
23use libc::{c_char, c_void, size_t};
24
25// Default options are kept per-thread to avoid re-allocating on every call while
26// also preventing cross-thread sharing. Some RocksDB option wrappers hold
27// pointers into internal buffers and are not safe to share across threads.
28// Using thread_local allows cheap reuse in the common "default options" path
29// without synchronization overhead. Callers who need non-defaults must pass
30// explicit options.
31thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
32
33/// RocksDB Transaction.
34///
35/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
36///
37/// [`TransactionDB`]: crate::TransactionDB
38/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
39pub struct Transaction<'db, DB> {
40    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
41    pub(crate) _marker: PhantomData<&'db DB>,
42}
43
44unsafe impl<DB> Send for Transaction<'_, DB> {}
45
46impl<DB> DBAccess for Transaction<'_, DB> {
47    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
48        unsafe { ffi::rocksdb_transaction_get_snapshot(self.inner) }
49    }
50
51    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
52        unsafe {
53            ffi::rocksdb_free(snapshot as *mut c_void);
54        }
55    }
56
57    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
58        unsafe { ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner) }
59    }
60
61    unsafe fn create_iterator_cf(
62        &self,
63        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
64        readopts: &ReadOptions,
65    ) -> *mut ffi::rocksdb_iterator_t {
66        unsafe {
67            ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
68        }
69    }
70
71    fn get_opt<K: AsRef<[u8]>>(
72        &self,
73        key: K,
74        readopts: &ReadOptions,
75    ) -> Result<Option<Vec<u8>>, Error> {
76        self.get_opt(key, readopts)
77    }
78
79    fn get_cf_opt<K: AsRef<[u8]>>(
80        &self,
81        cf: &impl AsColumnFamilyRef,
82        key: K,
83        readopts: &ReadOptions,
84    ) -> Result<Option<Vec<u8>>, Error> {
85        self.get_cf_opt(cf, key, readopts)
86    }
87
88    fn get_pinned_opt<K: AsRef<[u8]>>(
89        &'_ self,
90        key: K,
91        readopts: &ReadOptions,
92    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
93        self.get_pinned_opt(key, readopts)
94    }
95
96    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
97        &'_ self,
98        cf: &impl AsColumnFamilyRef,
99        key: K,
100        readopts: &ReadOptions,
101    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
102        self.get_pinned_cf_opt(cf, key, readopts)
103    }
104
105    fn multi_get_opt<K, I>(
106        &self,
107        keys: I,
108        readopts: &ReadOptions,
109    ) -> Vec<Result<Option<Vec<u8>>, Error>>
110    where
111        K: AsRef<[u8]>,
112        I: IntoIterator<Item = K>,
113    {
114        self.multi_get_opt(keys, readopts)
115    }
116
117    fn multi_get_cf_opt<'b, K, I, W>(
118        &self,
119        keys_cf: I,
120        readopts: &ReadOptions,
121    ) -> Vec<Result<Option<Vec<u8>>, Error>>
122    where
123        K: AsRef<[u8]>,
124        I: IntoIterator<Item = (&'b W, K)>,
125        W: AsColumnFamilyRef + 'b,
126    {
127        self.multi_get_cf_opt(keys_cf, readopts)
128    }
129}
130
131impl<DB> Transaction<'_, DB> {
132    /// Write all batched keys to the DB atomically.
133    ///
134    /// May return any error that could be returned by `DB::write`.
135    ///
136    /// If this transaction was created by a [`TransactionDB`], an error of
137    /// the [`Expired`] kind may be returned if this transaction has
138    /// lived longer than expiration time in [`TransactionOptions`].
139    ///
140    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
141    /// the [`Busy`] kind may be returned if the transaction
142    /// could not guarantee that there are no write conflicts.
143    /// An error of the [`TryAgain`] kind may be returned if the memtable
144    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
145    ///
146    /// [`Expired`]: crate::ErrorKind::Expired
147    /// [`TransactionOptions`]: crate::TransactionOptions
148    /// [`TransactionDB`]: crate::TransactionDB
149    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
150    /// [`Busy`]: crate::ErrorKind::Busy
151    /// [`TryAgain`]: crate::ErrorKind::TryAgain
152    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
153    pub fn commit(self) -> Result<(), Error> {
154        unsafe {
155            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
156        }
157        Ok(())
158    }
159
160    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
161        let ptr = name.as_ptr();
162        let len = name.len();
163        unsafe {
164            ffi_try!(ffi::rocksdb_transaction_set_name(
165                self.inner, ptr as _, len as _
166            ));
167        }
168
169        Ok(())
170    }
171
172    pub fn get_name(&self) -> Option<Vec<u8>> {
173        unsafe {
174            let mut name_len = 0;
175            let name = ffi::rocksdb_transaction_get_name(self.inner, &mut name_len);
176            if name.is_null() {
177                None
178            } else {
179                let mut vec = vec![0; name_len];
180                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
181                ffi::rocksdb_free(name as *mut c_void);
182                Some(vec)
183            }
184        }
185    }
186
187    pub fn prepare(&self) -> Result<(), Error> {
188        unsafe {
189            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
190        }
191        Ok(())
192    }
193
194    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
195    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
196    ///
197    /// [`TransactionOptions`]: crate::TransactionOptions
198    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
199        SnapshotWithThreadMode::new(self)
200    }
201
202    /// Discard all batched writes in this transaction.
203    pub fn rollback(&self) -> Result<(), Error> {
204        unsafe {
205            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
206            Ok(())
207        }
208    }
209
210    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
211    /// May be called multiple times to set multiple save points.
212    ///
213    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
214    pub fn set_savepoint(&self) {
215        unsafe {
216            ffi::rocksdb_transaction_set_savepoint(self.inner);
217        }
218    }
219
220    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
221    /// and removes the most recent [`set_savepoint`].
222    ///
223    /// Returns error if there is no previous call to [`set_savepoint`].
224    ///
225    /// [`set_savepoint`]: Self::set_savepoint
226    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
227        unsafe {
228            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
229            Ok(())
230        }
231    }
232
233    /// Get the bytes associated with a key value.
234    ///
235    /// See [`get_cf_opt`] for details.
236    ///
237    /// [`get_cf_opt`]: Self::get_cf_opt
238    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
239        DEFAULT_READ_OPTS.with(|opts| self.get_opt(key, opts))
240    }
241
242    pub fn get_pinned<K: AsRef<[u8]>>(
243        &'_ self,
244        key: K,
245    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
246        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
247    }
248
249    /// Get the bytes associated with a key value and the given column family.
250    ///
251    /// See [`get_cf_opt`] for details.
252    ///
253    /// [`get_cf_opt`]: Self::get_cf_opt
254    pub fn get_cf<K: AsRef<[u8]>>(
255        &self,
256        cf: &impl AsColumnFamilyRef,
257        key: K,
258    ) -> Result<Option<Vec<u8>>, Error> {
259        DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key, opts))
260    }
261
262    pub fn get_pinned_cf<K: AsRef<[u8]>>(
263        &'_ self,
264        cf: &impl AsColumnFamilyRef,
265        key: K,
266    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
267        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
268    }
269
270    /// Get the key and ensure that this transaction will only
271    /// be able to be committed if this key is not written outside this
272    /// transaction after it has first been read (or after the snapshot if a
273    /// snapshot is set in this transaction).
274    ///
275    /// See [`get_for_update_cf_opt`] for details.
276    ///
277    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
278    pub fn get_for_update<K: AsRef<[u8]>>(
279        &self,
280        key: K,
281        exclusive: bool,
282    ) -> Result<Option<Vec<u8>>, Error> {
283        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_opt(key, exclusive, opts))
284    }
285
286    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
287        &'_ self,
288        key: K,
289        exclusive: bool,
290    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
291        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_opt(key, exclusive, opts))
292    }
293
294    /// Get the key in the given column family and ensure that this transaction will only
295    /// be able to be committed if this key is not written outside this
296    /// transaction after it has first been read (or after the snapshot if a
297    /// snapshot is set in this transaction).
298    ///
299    /// See [`get_for_update_cf_opt`] for details.
300    ///
301    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
302    pub fn get_for_update_cf<K: AsRef<[u8]>>(
303        &self,
304        cf: &impl AsColumnFamilyRef,
305        key: K,
306        exclusive: bool,
307    ) -> Result<Option<Vec<u8>>, Error> {
308        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_cf_opt(cf, key, exclusive, opts))
309    }
310
311    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
312        &'_ self,
313        cf: &impl AsColumnFamilyRef,
314        key: K,
315        exclusive: bool,
316    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
317        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts))
318    }
319
320    /// Returns the bytes associated with a key value with read options.
321    ///
322    /// See [`get_cf_opt`] for details.
323    ///
324    /// [`get_cf_opt`]: Self::get_cf_opt
325    pub fn get_opt<K: AsRef<[u8]>>(
326        &self,
327        key: K,
328        readopts: &ReadOptions,
329    ) -> Result<Option<Vec<u8>>, Error> {
330        self.get_pinned_opt(key, readopts)
331            .map(|x| x.map(|v| v.as_ref().to_vec()))
332    }
333
334    pub fn get_pinned_opt<K: AsRef<[u8]>>(
335        &'_ self,
336        key: K,
337        readopts: &ReadOptions,
338    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
339        let key = key.as_ref();
340        unsafe {
341            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
342                self.inner,
343                readopts.inner,
344                key.as_ptr() as *const c_char,
345                key.len(),
346            ));
347            if val.is_null() {
348                Ok(None)
349            } else {
350                Ok(Some(DBPinnableSlice::from_c(val)))
351            }
352        }
353    }
354
355    /// Get the bytes associated with a key value and the given column family with read options.
356    ///
357    /// This function will also read pending changes in this transaction.
358    /// Currently, this function will return an error of the [`MergeInProgress`] kind
359    /// if the most recent write to the queried key in this batch is a Merge.
360    ///
361    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
362    pub fn get_cf_opt<K: AsRef<[u8]>>(
363        &self,
364        cf: &impl AsColumnFamilyRef,
365        key: K,
366        readopts: &ReadOptions,
367    ) -> Result<Option<Vec<u8>>, Error> {
368        self.get_pinned_cf_opt(cf, key, readopts)
369            .map(|x| x.map(|v| v.as_ref().to_vec()))
370    }
371
372    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
373        &'_ self,
374        cf: &impl AsColumnFamilyRef,
375        key: K,
376        readopts: &ReadOptions,
377    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
378        let key = key.as_ref();
379        unsafe {
380            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
381                self.inner,
382                readopts.inner,
383                cf.inner(),
384                key.as_ptr() as *const c_char,
385                key.len(),
386            ));
387            if val.is_null() {
388                Ok(None)
389            } else {
390                Ok(Some(DBPinnableSlice::from_c(val)))
391            }
392        }
393    }
394
395    /// Get the key with read options and ensure that this transaction will only
396    /// be able to be committed if this key is not written outside this
397    /// transaction after it has first been read (or after the snapshot if a
398    /// snapshot is set in this transaction).
399    ///
400    /// See [`get_for_update_cf_opt`] for details.
401    ///
402    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
403    pub fn get_for_update_opt<K: AsRef<[u8]>>(
404        &self,
405        key: K,
406        exclusive: bool,
407        opts: &ReadOptions,
408    ) -> Result<Option<Vec<u8>>, Error> {
409        self.get_pinned_for_update_opt(key, exclusive, opts)
410            .map(|x| x.map(|v| v.as_ref().to_vec()))
411    }
412
413    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
414        &'_ self,
415        key: K,
416        exclusive: bool,
417        opts: &ReadOptions,
418    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
419        let key = key.as_ref();
420        unsafe {
421            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
422                self.inner,
423                opts.inner,
424                key.as_ptr() as *const c_char,
425                key.len() as size_t,
426                u8::from(exclusive),
427            ));
428            if val.is_null() {
429                Ok(None)
430            } else {
431                Ok(Some(DBPinnableSlice::from_c(val)))
432            }
433        }
434    }
435
436    /// Get the key in the given column family with read options
437    /// and ensure that this transaction will only
438    /// be able to be committed if this key is not written outside this
439    /// transaction after it has first been read (or after the snapshot if a
440    /// snapshot is set in this transaction).
441    ///
442    /// Currently, this function will return an error of the [`MergeInProgress`]
443    /// if the most recent write to the queried key in this batch is a Merge.
444    ///
445    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
446    /// * [`Busy`] if there is a write conflict.
447    /// * [`TimedOut`] if a lock could not be acquired.
448    /// * [`TryAgain`] if the memtable history size is not large enough.
449    /// * [`MergeInProgress`] if merge operations cannot be resolved.
450    /// * or other errors if this key could not be read.
451    ///
452    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
453    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
454    /// be returned by `[DB::get]`.
455    ///
456    /// [`Busy`]: crate::ErrorKind::Busy
457    /// [`TimedOut`]: crate::ErrorKind::TimedOut
458    /// [`TryAgain`]: crate::ErrorKind::TryAgain
459    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
460    /// [`TransactionDB`]: crate::TransactionDB
461    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
462    /// [`commit`]: Self::commit
463    /// [`DB::get`]: crate::DB::get
464    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
465        &self,
466        cf: &impl AsColumnFamilyRef,
467        key: K,
468        exclusive: bool,
469        opts: &ReadOptions,
470    ) -> Result<Option<Vec<u8>>, Error> {
471        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
472            .map(|x| x.map(|v| v.as_ref().to_vec()))
473    }
474
475    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
476        &'_ self,
477        cf: &impl AsColumnFamilyRef,
478        key: K,
479        exclusive: bool,
480        opts: &ReadOptions,
481    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
482        let key = key.as_ref();
483        unsafe {
484            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
485                self.inner,
486                opts.inner,
487                cf.inner(),
488                key.as_ptr() as *const c_char,
489                key.len() as size_t,
490                u8::from(exclusive),
491            ));
492            if val.is_null() {
493                Ok(None)
494            } else {
495                Ok(Some(DBPinnableSlice::from_c(val)))
496            }
497        }
498    }
499
500    /// Return the values associated with the given keys.
501    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
502    where
503        K: AsRef<[u8]>,
504        I: IntoIterator<Item = K>,
505    {
506        self.multi_get_opt(keys, &ReadOptions::default())
507    }
508
509    /// Return the values associated with the given keys using read options.
510    pub fn multi_get_opt<K, I>(
511        &self,
512        keys: I,
513        readopts: &ReadOptions,
514    ) -> Vec<Result<Option<Vec<u8>>, Error>>
515    where
516        K: AsRef<[u8]>,
517        I: IntoIterator<Item = K>,
518    {
519        let owned_keys: Vec<K> = keys.into_iter().collect();
520        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
521        let ptr_keys: Vec<*const c_char> = owned_keys
522            .iter()
523            .map(|k| k.as_ref().as_ptr() as *const c_char)
524            .collect();
525
526        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
527        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
528        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
529        unsafe {
530            ffi::rocksdb_transaction_multi_get(
531                self.inner,
532                readopts.inner,
533                ptr_keys.len(),
534                ptr_keys.as_ptr(),
535                keys_sizes.as_ptr(),
536                values.as_mut_ptr(),
537                values_sizes.as_mut_ptr(),
538                errors.as_mut_ptr(),
539            );
540        }
541
542        unsafe {
543            values.set_len(ptr_keys.len());
544            values_sizes.set_len(ptr_keys.len());
545            errors.set_len(ptr_keys.len());
546        }
547
548        convert_values(values, values_sizes, errors)
549    }
550
551    /// Return the values associated with the given keys and column families.
552    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
553        &'a self,
554        keys: I,
555    ) -> Vec<Result<Option<Vec<u8>>, Error>>
556    where
557        K: AsRef<[u8]>,
558        I: IntoIterator<Item = (&'b W, K)>,
559        W: 'b + AsColumnFamilyRef,
560    {
561        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
562    }
563
564    /// Return the values associated with the given keys and column families using read options.
565    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
566        &'a self,
567        keys: I,
568        readopts: &ReadOptions,
569    ) -> Vec<Result<Option<Vec<u8>>, Error>>
570    where
571        K: AsRef<[u8]>,
572        I: IntoIterator<Item = (&'b W, K)>,
573        W: 'b + AsColumnFamilyRef,
574    {
575        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
576        let keys_sizes: Vec<usize> = cfs_and_owned_keys
577            .iter()
578            .map(|(_, k)| k.as_ref().len())
579            .collect();
580        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
581            .iter()
582            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
583            .collect();
584        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
585            .iter()
586            .map(|(c, _)| c.inner().cast_const())
587            .collect();
588        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
589        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
590        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
591        unsafe {
592            ffi::rocksdb_transaction_multi_get_cf(
593                self.inner,
594                readopts.inner,
595                ptr_cfs.as_ptr(),
596                ptr_keys.len(),
597                ptr_keys.as_ptr(),
598                keys_sizes.as_ptr(),
599                values.as_mut_ptr(),
600                values_sizes.as_mut_ptr(),
601                errors.as_mut_ptr(),
602            );
603        }
604
605        unsafe {
606            values.set_len(ptr_keys.len());
607            values_sizes.set_len(ptr_keys.len());
608            errors.set_len(ptr_keys.len());
609        }
610
611        convert_values(values, values_sizes, errors)
612    }
613
614    /// Put the key value in default column family and do conflict checking on the key.
615    ///
616    /// See [`put_cf`] for details.
617    ///
618    /// [`put_cf`]: Self::put_cf
619    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
620        let key = key.as_ref();
621        let value = value.as_ref();
622        unsafe {
623            ffi_try!(ffi::rocksdb_transaction_put(
624                self.inner,
625                key.as_ptr() as *const c_char,
626                key.len() as size_t,
627                value.as_ptr() as *const c_char,
628                value.len() as size_t,
629            ));
630            Ok(())
631        }
632    }
633
634    /// Put the key value in the given column family and do conflict checking on the key.
635    ///
636    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
637    /// * [`Busy`] if there is a write conflict.
638    /// * [`TimedOut`] if a lock could not be acquired.
639    /// * [`TryAgain`] if the memtable history size is not large enough.
640    /// * [`MergeInProgress`] if merge operations cannot be resolved.
641    /// * or other errors on unexpected failures.
642    ///
643    /// [`Busy`]: crate::ErrorKind::Busy
644    /// [`TimedOut`]: crate::ErrorKind::TimedOut
645    /// [`TryAgain`]: crate::ErrorKind::TryAgain
646    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
647    /// [`TransactionDB`]: crate::TransactionDB
648    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
649    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
650        &self,
651        cf: &impl AsColumnFamilyRef,
652        key: K,
653        value: V,
654    ) -> Result<(), Error> {
655        let key = key.as_ref();
656        let value = value.as_ref();
657        unsafe {
658            ffi_try!(ffi::rocksdb_transaction_put_cf(
659                self.inner,
660                cf.inner(),
661                key.as_ptr() as *const c_char,
662                key.len() as size_t,
663                value.as_ptr() as *const c_char,
664                value.len() as size_t,
665            ));
666            Ok(())
667        }
668    }
669
670    /// Merge value with existing value of key, and also do conflict checking on the key.
671    ///
672    /// See [`merge_cf`] for details.
673    ///
674    /// [`merge_cf`]: Self::merge_cf
675    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
676        let key = key.as_ref();
677        let value = value.as_ref();
678        unsafe {
679            ffi_try!(ffi::rocksdb_transaction_merge(
680                self.inner,
681                key.as_ptr() as *const c_char,
682                key.len() as size_t,
683                value.as_ptr() as *const c_char,
684                value.len() as size_t
685            ));
686            Ok(())
687        }
688    }
689
690    /// Merge `value` with existing value of `key` in the given column family,
691    /// and also do conflict checking on the key.
692    ///
693    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
694    /// * [`Busy`] if there is a write conflict.
695    /// * [`TimedOut`] if a lock could not be acquired.
696    /// * [`TryAgain`] if the memtable history size is not large enough.
697    /// * [`MergeInProgress`] if merge operations cannot be resolved.
698    /// * or other errors on unexpected failures.
699    ///
700    /// [`Busy`]: crate::ErrorKind::Busy
701    /// [`TimedOut`]: crate::ErrorKind::TimedOut
702    /// [`TryAgain`]: crate::ErrorKind::TryAgain
703    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
704    /// [`TransactionDB`]: crate::TransactionDB
705    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
706        &self,
707        cf: &impl AsColumnFamilyRef,
708        key: K,
709        value: V,
710    ) -> Result<(), Error> {
711        let key = key.as_ref();
712        let value = value.as_ref();
713        unsafe {
714            ffi_try!(ffi::rocksdb_transaction_merge_cf(
715                self.inner,
716                cf.inner(),
717                key.as_ptr() as *const c_char,
718                key.len() as size_t,
719                value.as_ptr() as *const c_char,
720                value.len() as size_t
721            ));
722            Ok(())
723        }
724    }
725
726    /// Delete the key value if it exists and do conflict checking on the key.
727    ///
728    /// See [`delete_cf`] for details.
729    ///
730    /// [`delete_cf`]: Self::delete_cf
731    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
732        let key = key.as_ref();
733        unsafe {
734            ffi_try!(ffi::rocksdb_transaction_delete(
735                self.inner,
736                key.as_ptr() as *const c_char,
737                key.len() as size_t
738            ));
739        }
740        Ok(())
741    }
742
743    /// Delete the key value in the given column family and do conflict checking.
744    ///
745    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
746    /// * [`Busy`] if there is a write conflict.
747    /// * [`TimedOut`] if a lock could not be acquired.
748    /// * [`TryAgain`] if the memtable history size is not large enough.
749    /// * [`MergeInProgress`] if merge operations cannot be resolved.
750    /// * or other errors on unexpected failures.
751    ///
752    /// [`Busy`]: crate::ErrorKind::Busy
753    /// [`TimedOut`]: crate::ErrorKind::TimedOut
754    /// [`TryAgain`]: crate::ErrorKind::TryAgain
755    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
756    /// [`TransactionDB`]: crate::TransactionDB
757    pub fn delete_cf<K: AsRef<[u8]>>(
758        &self,
759        cf: &impl AsColumnFamilyRef,
760        key: K,
761    ) -> Result<(), Error> {
762        let key = key.as_ref();
763        unsafe {
764            ffi_try!(ffi::rocksdb_transaction_delete_cf(
765                self.inner,
766                cf.inner(),
767                key.as_ptr() as *const c_char,
768                key.len() as size_t
769            ));
770        }
771        Ok(())
772    }
773
774    pub fn iterator<'a: 'b, 'b>(
775        &'a self,
776        mode: IteratorMode,
777    ) -> DBIteratorWithThreadMode<'b, Self> {
778        let readopts = ReadOptions::default();
779        self.iterator_opt(mode, readopts)
780    }
781
782    pub fn iterator_opt<'a: 'b, 'b>(
783        &'a self,
784        mode: IteratorMode,
785        readopts: ReadOptions,
786    ) -> DBIteratorWithThreadMode<'b, Self> {
787        DBIteratorWithThreadMode::new(self, readopts, mode)
788    }
789
790    /// Opens an iterator using the provided ReadOptions.
791    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
792    pub fn iterator_cf_opt<'a: 'b, 'b>(
793        &'a self,
794        cf_handle: &impl AsColumnFamilyRef,
795        readopts: ReadOptions,
796        mode: IteratorMode,
797    ) -> DBIteratorWithThreadMode<'b, Self> {
798        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
799    }
800
801    /// Opens an iterator with `set_total_order_seek` enabled.
802    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
803    /// with a Hash-based implementation.
804    pub fn full_iterator<'a: 'b, 'b>(
805        &'a self,
806        mode: IteratorMode,
807    ) -> DBIteratorWithThreadMode<'b, Self> {
808        let mut opts = ReadOptions::default();
809        opts.set_total_order_seek(true);
810        DBIteratorWithThreadMode::new(self, opts, mode)
811    }
812
813    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
814        &'a self,
815        prefix: P,
816    ) -> DBIteratorWithThreadMode<'b, Self> {
817        let mut opts = ReadOptions::default();
818        opts.set_prefix_same_as_start(true);
819        DBIteratorWithThreadMode::new(
820            self,
821            opts,
822            IteratorMode::From(prefix.as_ref(), Direction::Forward),
823        )
824    }
825
826    pub fn iterator_cf<'a: 'b, 'b>(
827        &'a self,
828        cf_handle: &impl AsColumnFamilyRef,
829        mode: IteratorMode,
830    ) -> DBIteratorWithThreadMode<'b, Self> {
831        let opts = ReadOptions::default();
832        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
833    }
834
835    pub fn full_iterator_cf<'a: 'b, 'b>(
836        &'a self,
837        cf_handle: &impl AsColumnFamilyRef,
838        mode: IteratorMode,
839    ) -> DBIteratorWithThreadMode<'b, Self> {
840        let mut opts = ReadOptions::default();
841        opts.set_total_order_seek(true);
842        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
843    }
844
845    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
846        &'a self,
847        cf_handle: &impl AsColumnFamilyRef,
848        prefix: P,
849    ) -> DBIteratorWithThreadMode<'a, Self> {
850        let mut opts = ReadOptions::default();
851        opts.set_prefix_same_as_start(true);
852        DBIteratorWithThreadMode::<'a, Self>::new_cf(
853            self,
854            cf_handle.inner(),
855            opts,
856            IteratorMode::From(prefix.as_ref(), Direction::Forward),
857        )
858    }
859
860    /// Opens a raw iterator over the database, using the default read options
861    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
862        let opts = ReadOptions::default();
863        DBRawIteratorWithThreadMode::new(self, opts)
864    }
865
866    /// Opens a raw iterator over the given column family, using the default read options
867    pub fn raw_iterator_cf<'a: 'b, 'b>(
868        &'a self,
869        cf_handle: &impl AsColumnFamilyRef,
870    ) -> DBRawIteratorWithThreadMode<'b, Self> {
871        let opts = ReadOptions::default();
872        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
873    }
874
875    /// Opens a raw iterator over the database, using the given read options
876    pub fn raw_iterator_opt<'a: 'b, 'b>(
877        &'a self,
878        readopts: ReadOptions,
879    ) -> DBRawIteratorWithThreadMode<'b, Self> {
880        DBRawIteratorWithThreadMode::new(self, readopts)
881    }
882
883    /// Opens a raw iterator over the given column family, using the given read options
884    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
885        &'a self,
886        cf_handle: &impl AsColumnFamilyRef,
887        readopts: ReadOptions,
888    ) -> DBRawIteratorWithThreadMode<'b, Self> {
889        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
890    }
891
892    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
893        unsafe {
894            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
895            let mut len: usize = 0;
896            let ptr = ffi::rocksdb_writebatch_wi_data(wi, std::ptr::from_mut(&mut len));
897            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
898            ffi::rocksdb_free(wi as *mut c_void);
899            WriteBatchWithTransaction { inner: writebatch }
900        }
901    }
902
903    pub fn rebuild_from_writebatch(
904        &self,
905        writebatch: &WriteBatchWithTransaction<true>,
906    ) -> Result<(), Error> {
907        unsafe {
908            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
909                self.inner,
910                writebatch.inner
911            ));
912        }
913        Ok(())
914    }
915}
916
917impl<DB> Drop for Transaction<'_, DB> {
918    fn drop(&mut self) {
919        unsafe {
920            ffi::rocksdb_transaction_destroy(self.inner);
921        }
922    }
923}