rust_rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::marker::PhantomData;
17
18use crate::{
19    AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
20    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
21    db::{DBAccess, convert_values},
22    ffi,
23};
24use libc::{c_char, c_void, size_t};
25
26// Default options are kept per-thread to avoid re-allocating on every call while
27// also preventing cross-thread sharing. Some RocksDB option wrappers hold
28// pointers into internal buffers and are not safe to share across threads.
29// Using thread_local allows cheap reuse in the common "default options" path
30// without synchronization overhead. Callers who need non-defaults must pass
31// explicit options.
32thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
33
34/// RocksDB Transaction.
35///
36/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
37///
38/// [`TransactionDB`]: crate::TransactionDB
39/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
40pub struct Transaction<'db, DB> {
41    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
42    pub(crate) _marker: PhantomData<&'db DB>,
43}
44
45unsafe impl<DB> Send for Transaction<'_, DB> {}
46
47impl<DB> DBAccess for Transaction<'_, DB> {
48    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
49        unsafe { ffi::rocksdb_transaction_get_snapshot(self.inner) }
50    }
51
52    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
53        unsafe {
54            ffi::rocksdb_free(snapshot as *mut c_void);
55        }
56    }
57
58    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
59        unsafe { ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner) }
60    }
61
62    unsafe fn create_iterator_cf(
63        &self,
64        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
65        readopts: &ReadOptions,
66    ) -> *mut ffi::rocksdb_iterator_t {
67        unsafe {
68            ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
69        }
70    }
71
72    fn get_opt<K: AsRef<[u8]>>(
73        &self,
74        key: K,
75        readopts: &ReadOptions,
76    ) -> Result<Option<Vec<u8>>, Error> {
77        self.get_opt(key, readopts)
78    }
79
80    fn get_cf_opt<K: AsRef<[u8]>>(
81        &self,
82        cf: &impl AsColumnFamilyRef,
83        key: K,
84        readopts: &ReadOptions,
85    ) -> Result<Option<Vec<u8>>, Error> {
86        self.get_cf_opt(cf, key, readopts)
87    }
88
89    fn get_pinned_opt<K: AsRef<[u8]>>(
90        &'_ self,
91        key: K,
92        readopts: &ReadOptions,
93    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
94        self.get_pinned_opt(key, readopts)
95    }
96
97    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
98        &'_ self,
99        cf: &impl AsColumnFamilyRef,
100        key: K,
101        readopts: &ReadOptions,
102    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
103        self.get_pinned_cf_opt(cf, key, readopts)
104    }
105
106    fn multi_get_opt<K, I>(
107        &self,
108        keys: I,
109        readopts: &ReadOptions,
110    ) -> Vec<Result<Option<Vec<u8>>, Error>>
111    where
112        K: AsRef<[u8]>,
113        I: IntoIterator<Item = K>,
114    {
115        self.multi_get_opt(keys, readopts)
116    }
117
118    fn multi_get_cf_opt<'b, K, I, W>(
119        &self,
120        keys_cf: I,
121        readopts: &ReadOptions,
122    ) -> Vec<Result<Option<Vec<u8>>, Error>>
123    where
124        K: AsRef<[u8]>,
125        I: IntoIterator<Item = (&'b W, K)>,
126        W: AsColumnFamilyRef + 'b,
127    {
128        self.multi_get_cf_opt(keys_cf, readopts)
129    }
130}
131
132impl<DB> Transaction<'_, DB> {
133    /// Write all batched keys to the DB atomically.
134    ///
135    /// May return any error that could be returned by `DB::write`.
136    ///
137    /// If this transaction was created by a [`TransactionDB`], an error of
138    /// the [`Expired`] kind may be returned if this transaction has
139    /// lived longer than expiration time in [`TransactionOptions`].
140    ///
141    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
142    /// the [`Busy`] kind may be returned if the transaction
143    /// could not guarantee that there are no write conflicts.
144    /// An error of the [`TryAgain`] kind may be returned if the memtable
145    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
146    ///
147    /// [`Expired`]: crate::ErrorKind::Expired
148    /// [`TransactionOptions`]: crate::TransactionOptions
149    /// [`TransactionDB`]: crate::TransactionDB
150    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
151    /// [`Busy`]: crate::ErrorKind::Busy
152    /// [`TryAgain`]: crate::ErrorKind::TryAgain
153    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
154    pub fn commit(self) -> Result<(), Error> {
155        unsafe {
156            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
157        }
158        Ok(())
159    }
160
161    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
162        let ptr = name.as_ptr();
163        let len = name.len();
164        unsafe {
165            ffi_try!(ffi::rocksdb_transaction_set_name(
166                self.inner, ptr as _, len as _
167            ));
168        }
169
170        Ok(())
171    }
172
173    pub fn get_name(&self) -> Option<Vec<u8>> {
174        unsafe {
175            let mut name_len = 0;
176            let name = ffi::rocksdb_transaction_get_name(self.inner, &raw mut name_len);
177            if name.is_null() {
178                None
179            } else {
180                let mut vec = vec![0; name_len];
181                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
182                ffi::rocksdb_free(name as *mut c_void);
183                Some(vec)
184            }
185        }
186    }
187
188    pub fn prepare(&self) -> Result<(), Error> {
189        unsafe {
190            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
191        }
192        Ok(())
193    }
194
195    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
196    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
197    ///
198    /// [`TransactionOptions`]: crate::TransactionOptions
199    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
200        SnapshotWithThreadMode::new(self)
201    }
202
203    /// Discard all batched writes in this transaction.
204    pub fn rollback(&self) -> Result<(), Error> {
205        unsafe {
206            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
207            Ok(())
208        }
209    }
210
211    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
212    /// May be called multiple times to set multiple save points.
213    ///
214    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
215    pub fn set_savepoint(&self) {
216        unsafe {
217            ffi::rocksdb_transaction_set_savepoint(self.inner);
218        }
219    }
220
221    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
222    /// and removes the most recent [`set_savepoint`].
223    ///
224    /// Returns error if there is no previous call to [`set_savepoint`].
225    ///
226    /// [`set_savepoint`]: Self::set_savepoint
227    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
228        unsafe {
229            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
230            Ok(())
231        }
232    }
233
234    /// Get the bytes associated with a key value.
235    ///
236    /// See [`get_cf_opt`] for details.
237    ///
238    /// [`get_cf_opt`]: Self::get_cf_opt
239    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
240        DEFAULT_READ_OPTS.with(|opts| self.get_opt(key, opts))
241    }
242
243    pub fn get_pinned<K: AsRef<[u8]>>(
244        &'_ self,
245        key: K,
246    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
247        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
248    }
249
250    /// Get the bytes associated with a key value and the given column family.
251    ///
252    /// See [`get_cf_opt`] for details.
253    ///
254    /// [`get_cf_opt`]: Self::get_cf_opt
255    pub fn get_cf<K: AsRef<[u8]>>(
256        &self,
257        cf: &impl AsColumnFamilyRef,
258        key: K,
259    ) -> Result<Option<Vec<u8>>, Error> {
260        DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key, opts))
261    }
262
263    pub fn get_pinned_cf<K: AsRef<[u8]>>(
264        &'_ self,
265        cf: &impl AsColumnFamilyRef,
266        key: K,
267    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
268        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
269    }
270
271    /// Get the key and ensure that this transaction will only
272    /// be able to be committed if this key is not written outside this
273    /// transaction after it has first been read (or after the snapshot if a
274    /// snapshot is set in this transaction).
275    ///
276    /// See [`get_for_update_cf_opt`] for details.
277    ///
278    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
279    pub fn get_for_update<K: AsRef<[u8]>>(
280        &self,
281        key: K,
282        exclusive: bool,
283    ) -> Result<Option<Vec<u8>>, Error> {
284        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_opt(key, exclusive, opts))
285    }
286
287    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
288        &'_ self,
289        key: K,
290        exclusive: bool,
291    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
292        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_opt(key, exclusive, opts))
293    }
294
295    /// Get the key in the given column family and ensure that this transaction will only
296    /// be able to be committed if this key is not written outside this
297    /// transaction after it has first been read (or after the snapshot if a
298    /// snapshot is set in this transaction).
299    ///
300    /// See [`get_for_update_cf_opt`] for details.
301    ///
302    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
303    pub fn get_for_update_cf<K: AsRef<[u8]>>(
304        &self,
305        cf: &impl AsColumnFamilyRef,
306        key: K,
307        exclusive: bool,
308    ) -> Result<Option<Vec<u8>>, Error> {
309        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_cf_opt(cf, key, exclusive, opts))
310    }
311
312    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
313        &'_ self,
314        cf: &impl AsColumnFamilyRef,
315        key: K,
316        exclusive: bool,
317    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
318        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts))
319    }
320
321    /// Returns the bytes associated with a key value with read options.
322    ///
323    /// See [`get_cf_opt`] for details.
324    ///
325    /// [`get_cf_opt`]: Self::get_cf_opt
326    pub fn get_opt<K: AsRef<[u8]>>(
327        &self,
328        key: K,
329        readopts: &ReadOptions,
330    ) -> Result<Option<Vec<u8>>, Error> {
331        self.get_pinned_opt(key, readopts)
332            .map(|x| x.map(|v| v.as_ref().to_vec()))
333    }
334
335    pub fn get_pinned_opt<K: AsRef<[u8]>>(
336        &'_ self,
337        key: K,
338        readopts: &ReadOptions,
339    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
340        let key = key.as_ref();
341        unsafe {
342            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
343                self.inner,
344                readopts.inner,
345                key.as_ptr() as *const c_char,
346                key.len(),
347            ));
348            if val.is_null() {
349                Ok(None)
350            } else {
351                Ok(Some(DBPinnableSlice::from_c(val)))
352            }
353        }
354    }
355
356    /// Get the bytes associated with a key value and the given column family with read options.
357    ///
358    /// This function will also read pending changes in this transaction.
359    /// Currently, this function will return an error of the [`MergeInProgress`] kind
360    /// if the most recent write to the queried key in this batch is a Merge.
361    ///
362    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
363    pub fn get_cf_opt<K: AsRef<[u8]>>(
364        &self,
365        cf: &impl AsColumnFamilyRef,
366        key: K,
367        readopts: &ReadOptions,
368    ) -> Result<Option<Vec<u8>>, Error> {
369        self.get_pinned_cf_opt(cf, key, readopts)
370            .map(|x| x.map(|v| v.as_ref().to_vec()))
371    }
372
373    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
374        &'_ self,
375        cf: &impl AsColumnFamilyRef,
376        key: K,
377        readopts: &ReadOptions,
378    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
379        let key = key.as_ref();
380        unsafe {
381            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
382                self.inner,
383                readopts.inner,
384                cf.inner(),
385                key.as_ptr() as *const c_char,
386                key.len(),
387            ));
388            if val.is_null() {
389                Ok(None)
390            } else {
391                Ok(Some(DBPinnableSlice::from_c(val)))
392            }
393        }
394    }
395
396    /// Get the key with read options and ensure that this transaction will only
397    /// be able to be committed if this key is not written outside this
398    /// transaction after it has first been read (or after the snapshot if a
399    /// snapshot is set in this transaction).
400    ///
401    /// See [`get_for_update_cf_opt`] for details.
402    ///
403    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
404    pub fn get_for_update_opt<K: AsRef<[u8]>>(
405        &self,
406        key: K,
407        exclusive: bool,
408        opts: &ReadOptions,
409    ) -> Result<Option<Vec<u8>>, Error> {
410        self.get_pinned_for_update_opt(key, exclusive, opts)
411            .map(|x| x.map(|v| v.as_ref().to_vec()))
412    }
413
414    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
415        &'_ self,
416        key: K,
417        exclusive: bool,
418        opts: &ReadOptions,
419    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
420        let key = key.as_ref();
421        unsafe {
422            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
423                self.inner,
424                opts.inner,
425                key.as_ptr() as *const c_char,
426                key.len() as size_t,
427                u8::from(exclusive),
428            ));
429            if val.is_null() {
430                Ok(None)
431            } else {
432                Ok(Some(DBPinnableSlice::from_c(val)))
433            }
434        }
435    }
436
437    /// Get the key in the given column family with read options
438    /// and ensure that this transaction will only
439    /// be able to be committed if this key is not written outside this
440    /// transaction after it has first been read (or after the snapshot if a
441    /// snapshot is set in this transaction).
442    ///
443    /// Currently, this function will return an error of the [`MergeInProgress`]
444    /// if the most recent write to the queried key in this batch is a Merge.
445    ///
446    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
447    /// * [`Busy`] if there is a write conflict.
448    /// * [`TimedOut`] if a lock could not be acquired.
449    /// * [`TryAgain`] if the memtable history size is not large enough.
450    /// * [`MergeInProgress`] if merge operations cannot be resolved.
451    /// * or other errors if this key could not be read.
452    ///
453    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
454    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
455    /// be returned by `[DB::get]`.
456    ///
457    /// [`Busy`]: crate::ErrorKind::Busy
458    /// [`TimedOut`]: crate::ErrorKind::TimedOut
459    /// [`TryAgain`]: crate::ErrorKind::TryAgain
460    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
461    /// [`TransactionDB`]: crate::TransactionDB
462    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
463    /// [`commit`]: Self::commit
464    /// [`DB::get`]: crate::DB::get
465    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
466        &self,
467        cf: &impl AsColumnFamilyRef,
468        key: K,
469        exclusive: bool,
470        opts: &ReadOptions,
471    ) -> Result<Option<Vec<u8>>, Error> {
472        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
473            .map(|x| x.map(|v| v.as_ref().to_vec()))
474    }
475
476    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
477        &'_ self,
478        cf: &impl AsColumnFamilyRef,
479        key: K,
480        exclusive: bool,
481        opts: &ReadOptions,
482    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
483        let key = key.as_ref();
484        unsafe {
485            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
486                self.inner,
487                opts.inner,
488                cf.inner(),
489                key.as_ptr() as *const c_char,
490                key.len() as size_t,
491                u8::from(exclusive),
492            ));
493            if val.is_null() {
494                Ok(None)
495            } else {
496                Ok(Some(DBPinnableSlice::from_c(val)))
497            }
498        }
499    }
500
501    /// Return the values associated with the given keys.
502    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
503    where
504        K: AsRef<[u8]>,
505        I: IntoIterator<Item = K>,
506    {
507        self.multi_get_opt(keys, &ReadOptions::default())
508    }
509
510    /// Return the values associated with the given keys using read options.
511    pub fn multi_get_opt<K, I>(
512        &self,
513        keys: I,
514        readopts: &ReadOptions,
515    ) -> Vec<Result<Option<Vec<u8>>, Error>>
516    where
517        K: AsRef<[u8]>,
518        I: IntoIterator<Item = K>,
519    {
520        let owned_keys: Vec<K> = keys.into_iter().collect();
521        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
522        let ptr_keys: Vec<*const c_char> = owned_keys
523            .iter()
524            .map(|k| k.as_ref().as_ptr() as *const c_char)
525            .collect();
526
527        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
528        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
529        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
530        unsafe {
531            ffi::rocksdb_transaction_multi_get(
532                self.inner,
533                readopts.inner,
534                ptr_keys.len(),
535                ptr_keys.as_ptr(),
536                keys_sizes.as_ptr(),
537                values.as_mut_ptr(),
538                values_sizes.as_mut_ptr(),
539                errors.as_mut_ptr(),
540            );
541        }
542
543        unsafe {
544            values.set_len(ptr_keys.len());
545            values_sizes.set_len(ptr_keys.len());
546            errors.set_len(ptr_keys.len());
547        }
548
549        convert_values(values, values_sizes, errors)
550    }
551
552    /// Return the values associated with the given keys and column families.
553    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
554        &'a self,
555        keys: I,
556    ) -> Vec<Result<Option<Vec<u8>>, Error>>
557    where
558        K: AsRef<[u8]>,
559        I: IntoIterator<Item = (&'b W, K)>,
560        W: 'b + AsColumnFamilyRef,
561    {
562        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
563    }
564
565    /// Return the values associated with the given keys and column families using read options.
566    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
567        &'a self,
568        keys: I,
569        readopts: &ReadOptions,
570    ) -> Vec<Result<Option<Vec<u8>>, Error>>
571    where
572        K: AsRef<[u8]>,
573        I: IntoIterator<Item = (&'b W, K)>,
574        W: 'b + AsColumnFamilyRef,
575    {
576        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
577        let keys_sizes: Vec<usize> = cfs_and_owned_keys
578            .iter()
579            .map(|(_, k)| k.as_ref().len())
580            .collect();
581        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
582            .iter()
583            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
584            .collect();
585        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
586            .iter()
587            .map(|(c, _)| c.inner().cast_const())
588            .collect();
589        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
590        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
591        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
592        unsafe {
593            ffi::rocksdb_transaction_multi_get_cf(
594                self.inner,
595                readopts.inner,
596                ptr_cfs.as_ptr(),
597                ptr_keys.len(),
598                ptr_keys.as_ptr(),
599                keys_sizes.as_ptr(),
600                values.as_mut_ptr(),
601                values_sizes.as_mut_ptr(),
602                errors.as_mut_ptr(),
603            );
604        }
605
606        unsafe {
607            values.set_len(ptr_keys.len());
608            values_sizes.set_len(ptr_keys.len());
609            errors.set_len(ptr_keys.len());
610        }
611
612        convert_values(values, values_sizes, errors)
613    }
614
615    /// Put the key value in default column family and do conflict checking on the key.
616    ///
617    /// See [`put_cf`] for details.
618    ///
619    /// [`put_cf`]: Self::put_cf
620    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
621        let key = key.as_ref();
622        let value = value.as_ref();
623        unsafe {
624            ffi_try!(ffi::rocksdb_transaction_put(
625                self.inner,
626                key.as_ptr() as *const c_char,
627                key.len() as size_t,
628                value.as_ptr() as *const c_char,
629                value.len() as size_t,
630            ));
631            Ok(())
632        }
633    }
634
635    /// Put the key value in the given column family and do conflict checking on the key.
636    ///
637    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
638    /// * [`Busy`] if there is a write conflict.
639    /// * [`TimedOut`] if a lock could not be acquired.
640    /// * [`TryAgain`] if the memtable history size is not large enough.
641    /// * [`MergeInProgress`] if merge operations cannot be resolved.
642    /// * or other errors on unexpected failures.
643    ///
644    /// [`Busy`]: crate::ErrorKind::Busy
645    /// [`TimedOut`]: crate::ErrorKind::TimedOut
646    /// [`TryAgain`]: crate::ErrorKind::TryAgain
647    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
648    /// [`TransactionDB`]: crate::TransactionDB
649    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
650    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
651        &self,
652        cf: &impl AsColumnFamilyRef,
653        key: K,
654        value: V,
655    ) -> Result<(), Error> {
656        let key = key.as_ref();
657        let value = value.as_ref();
658        unsafe {
659            ffi_try!(ffi::rocksdb_transaction_put_cf(
660                self.inner,
661                cf.inner(),
662                key.as_ptr() as *const c_char,
663                key.len() as size_t,
664                value.as_ptr() as *const c_char,
665                value.len() as size_t,
666            ));
667            Ok(())
668        }
669    }
670
671    /// Merge value with existing value of key, and also do conflict checking on the key.
672    ///
673    /// See [`merge_cf`] for details.
674    ///
675    /// [`merge_cf`]: Self::merge_cf
676    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
677        let key = key.as_ref();
678        let value = value.as_ref();
679        unsafe {
680            ffi_try!(ffi::rocksdb_transaction_merge(
681                self.inner,
682                key.as_ptr() as *const c_char,
683                key.len() as size_t,
684                value.as_ptr() as *const c_char,
685                value.len() as size_t
686            ));
687            Ok(())
688        }
689    }
690
691    /// Merge `value` with existing value of `key` in the given column family,
692    /// and also do conflict checking on the key.
693    ///
694    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
695    /// * [`Busy`] if there is a write conflict.
696    /// * [`TimedOut`] if a lock could not be acquired.
697    /// * [`TryAgain`] if the memtable history size is not large enough.
698    /// * [`MergeInProgress`] if merge operations cannot be resolved.
699    /// * or other errors on unexpected failures.
700    ///
701    /// [`Busy`]: crate::ErrorKind::Busy
702    /// [`TimedOut`]: crate::ErrorKind::TimedOut
703    /// [`TryAgain`]: crate::ErrorKind::TryAgain
704    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
705    /// [`TransactionDB`]: crate::TransactionDB
706    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
707        &self,
708        cf: &impl AsColumnFamilyRef,
709        key: K,
710        value: V,
711    ) -> Result<(), Error> {
712        let key = key.as_ref();
713        let value = value.as_ref();
714        unsafe {
715            ffi_try!(ffi::rocksdb_transaction_merge_cf(
716                self.inner,
717                cf.inner(),
718                key.as_ptr() as *const c_char,
719                key.len() as size_t,
720                value.as_ptr() as *const c_char,
721                value.len() as size_t
722            ));
723            Ok(())
724        }
725    }
726
727    /// Delete the key value if it exists and do conflict checking on the key.
728    ///
729    /// See [`delete_cf`] for details.
730    ///
731    /// [`delete_cf`]: Self::delete_cf
732    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
733        let key = key.as_ref();
734        unsafe {
735            ffi_try!(ffi::rocksdb_transaction_delete(
736                self.inner,
737                key.as_ptr() as *const c_char,
738                key.len() as size_t
739            ));
740        }
741        Ok(())
742    }
743
744    /// Delete the key value in the given column family and do conflict checking.
745    ///
746    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
747    /// * [`Busy`] if there is a write conflict.
748    /// * [`TimedOut`] if a lock could not be acquired.
749    /// * [`TryAgain`] if the memtable history size is not large enough.
750    /// * [`MergeInProgress`] if merge operations cannot be resolved.
751    /// * or other errors on unexpected failures.
752    ///
753    /// [`Busy`]: crate::ErrorKind::Busy
754    /// [`TimedOut`]: crate::ErrorKind::TimedOut
755    /// [`TryAgain`]: crate::ErrorKind::TryAgain
756    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
757    /// [`TransactionDB`]: crate::TransactionDB
758    pub fn delete_cf<K: AsRef<[u8]>>(
759        &self,
760        cf: &impl AsColumnFamilyRef,
761        key: K,
762    ) -> Result<(), Error> {
763        let key = key.as_ref();
764        unsafe {
765            ffi_try!(ffi::rocksdb_transaction_delete_cf(
766                self.inner,
767                cf.inner(),
768                key.as_ptr() as *const c_char,
769                key.len() as size_t
770            ));
771        }
772        Ok(())
773    }
774
775    pub fn iterator<'a: 'b, 'b>(
776        &'a self,
777        mode: IteratorMode,
778    ) -> DBIteratorWithThreadMode<'b, Self> {
779        let readopts = ReadOptions::default();
780        self.iterator_opt(mode, readopts)
781    }
782
783    pub fn iterator_opt<'a: 'b, 'b>(
784        &'a self,
785        mode: IteratorMode,
786        readopts: ReadOptions,
787    ) -> DBIteratorWithThreadMode<'b, Self> {
788        DBIteratorWithThreadMode::new(self, readopts, mode)
789    }
790
791    /// Opens an iterator using the provided ReadOptions.
792    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
793    pub fn iterator_cf_opt<'a: 'b, 'b>(
794        &'a self,
795        cf_handle: &impl AsColumnFamilyRef,
796        readopts: ReadOptions,
797        mode: IteratorMode,
798    ) -> DBIteratorWithThreadMode<'b, Self> {
799        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
800    }
801
802    /// Opens an iterator with `set_total_order_seek` enabled.
803    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
804    /// with a Hash-based implementation.
805    pub fn full_iterator<'a: 'b, 'b>(
806        &'a self,
807        mode: IteratorMode,
808    ) -> DBIteratorWithThreadMode<'b, Self> {
809        let mut opts = ReadOptions::default();
810        opts.set_total_order_seek(true);
811        DBIteratorWithThreadMode::new(self, opts, mode)
812    }
813
814    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
815        &'a self,
816        prefix: P,
817    ) -> DBIteratorWithThreadMode<'b, Self> {
818        let mut opts = ReadOptions::default();
819        opts.set_prefix_same_as_start(true);
820        DBIteratorWithThreadMode::new(
821            self,
822            opts,
823            IteratorMode::From(prefix.as_ref(), Direction::Forward),
824        )
825    }
826
827    pub fn iterator_cf<'a: 'b, 'b>(
828        &'a self,
829        cf_handle: &impl AsColumnFamilyRef,
830        mode: IteratorMode,
831    ) -> DBIteratorWithThreadMode<'b, Self> {
832        let opts = ReadOptions::default();
833        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
834    }
835
836    pub fn full_iterator_cf<'a: 'b, 'b>(
837        &'a self,
838        cf_handle: &impl AsColumnFamilyRef,
839        mode: IteratorMode,
840    ) -> DBIteratorWithThreadMode<'b, Self> {
841        let mut opts = ReadOptions::default();
842        opts.set_total_order_seek(true);
843        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
844    }
845
846    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
847        &'a self,
848        cf_handle: &impl AsColumnFamilyRef,
849        prefix: P,
850    ) -> DBIteratorWithThreadMode<'a, Self> {
851        let mut opts = ReadOptions::default();
852        opts.set_prefix_same_as_start(true);
853        DBIteratorWithThreadMode::<'a, Self>::new_cf(
854            self,
855            cf_handle.inner(),
856            opts,
857            IteratorMode::From(prefix.as_ref(), Direction::Forward),
858        )
859    }
860
861    /// Opens a raw iterator over the database, using the default read options
862    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
863        let opts = ReadOptions::default();
864        DBRawIteratorWithThreadMode::new(self, opts)
865    }
866
867    /// Opens a raw iterator over the given column family, using the default read options
868    pub fn raw_iterator_cf<'a: 'b, 'b>(
869        &'a self,
870        cf_handle: &impl AsColumnFamilyRef,
871    ) -> DBRawIteratorWithThreadMode<'b, Self> {
872        let opts = ReadOptions::default();
873        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
874    }
875
876    /// Opens a raw iterator over the database, using the given read options
877    pub fn raw_iterator_opt<'a: 'b, 'b>(
878        &'a self,
879        readopts: ReadOptions,
880    ) -> DBRawIteratorWithThreadMode<'b, Self> {
881        DBRawIteratorWithThreadMode::new(self, readopts)
882    }
883
884    /// Opens a raw iterator over the given column family, using the given read options
885    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
886        &'a self,
887        cf_handle: &impl AsColumnFamilyRef,
888        readopts: ReadOptions,
889    ) -> DBRawIteratorWithThreadMode<'b, Self> {
890        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
891    }
892
893    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
894        unsafe {
895            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
896            let mut len: usize = 0;
897            let ptr = ffi::rocksdb_writebatch_wi_data(wi, std::ptr::from_mut(&mut len));
898            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
899            ffi::rocksdb_free(wi as *mut c_void);
900            WriteBatchWithTransaction { inner: writebatch }
901        }
902    }
903
904    pub fn rebuild_from_writebatch(
905        &self,
906        writebatch: &WriteBatchWithTransaction<true>,
907    ) -> Result<(), Error> {
908        unsafe {
909            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
910                self.inner,
911                writebatch.inner
912            ));
913        }
914        Ok(())
915    }
916}
917
918impl<DB> Drop for Transaction<'_, DB> {
919    fn drop(&mut self) {
920        unsafe {
921            ffi::rocksdb_transaction_destroy(self.inner);
922        }
923    }
924}