Skip to main content

rust_rocksdb/transactions/
transaction.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::marker::PhantomData;
17
18use crate::{
19    AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
20    Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
21    db::{DBAccess, convert_values},
22    ffi,
23};
24use libc::{c_char, c_void, size_t};
25
26// Default options are kept per-thread to avoid re-allocating on every call while
27// also preventing cross-thread sharing. Some RocksDB option wrappers hold
28// pointers into internal buffers and are not safe to share across threads.
29// Using thread_local allows cheap reuse in the common "default options" path
30// without synchronization overhead. Callers who need non-defaults must pass
31// explicit options.
32thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
33
34/// RocksDB Transaction.
35///
36/// To use transactions, you must first create a [`TransactionDB`] or [`OptimisticTransactionDB`].
37///
38/// [`TransactionDB`]: crate::TransactionDB
39/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
40///
41/// A `Transaction` must not outlive the `TransactionDB` it was created from:
42///
43/// ```compile_fail,E0597
44/// use rust_rocksdb::{SingleThreaded, TransactionDB};
45///
46/// let _txn = {
47///     let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
48///     db.transaction()
49/// };
50/// ```
51///
52/// A `Snapshot` taken from a `Transaction` must not outlive the `Transaction`:
53///
54/// ```compile_fail,E0597
55/// use rust_rocksdb::{SingleThreaded, TransactionDB};
56///
57/// let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
58/// let _snapshot = {
59///     let txn = db.transaction();
60///     txn.snapshot()
61/// };
62/// ```
63pub struct Transaction<'db, DB> {
64    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
65    pub(crate) _marker: PhantomData<&'db DB>,
66}
67
68unsafe impl<DB> Send for Transaction<'_, DB> {}
69
70impl<DB> DBAccess for Transaction<'_, DB> {
71    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
72        unsafe { ffi::rocksdb_transaction_get_snapshot(self.inner) }
73    }
74
75    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
76        unsafe {
77            ffi::rocksdb_free(snapshot as *mut c_void);
78        }
79    }
80
81    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
82        unsafe { ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner) }
83    }
84
85    unsafe fn create_iterator_cf(
86        &self,
87        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
88        readopts: &ReadOptions,
89    ) -> *mut ffi::rocksdb_iterator_t {
90        unsafe {
91            ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
92        }
93    }
94
95    fn get_opt<K: AsRef<[u8]>>(
96        &self,
97        key: K,
98        readopts: &ReadOptions,
99    ) -> Result<Option<Vec<u8>>, Error> {
100        self.get_opt(key, readopts)
101    }
102
103    fn get_cf_opt<K: AsRef<[u8]>>(
104        &self,
105        cf: &impl AsColumnFamilyRef,
106        key: K,
107        readopts: &ReadOptions,
108    ) -> Result<Option<Vec<u8>>, Error> {
109        self.get_cf_opt(cf, key, readopts)
110    }
111
112    fn get_pinned_opt<K: AsRef<[u8]>>(
113        &'_ self,
114        key: K,
115        readopts: &ReadOptions,
116    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
117        self.get_pinned_opt(key, readopts)
118    }
119
120    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
121        &'_ self,
122        cf: &impl AsColumnFamilyRef,
123        key: K,
124        readopts: &ReadOptions,
125    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
126        self.get_pinned_cf_opt(cf, key, readopts)
127    }
128
129    fn multi_get_opt<K, I>(
130        &self,
131        keys: I,
132        readopts: &ReadOptions,
133    ) -> Vec<Result<Option<Vec<u8>>, Error>>
134    where
135        K: AsRef<[u8]>,
136        I: IntoIterator<Item = K>,
137    {
138        self.multi_get_opt(keys, readopts)
139    }
140
141    fn multi_get_cf_opt<'b, K, I, W>(
142        &self,
143        keys_cf: I,
144        readopts: &ReadOptions,
145    ) -> Vec<Result<Option<Vec<u8>>, Error>>
146    where
147        K: AsRef<[u8]>,
148        I: IntoIterator<Item = (&'b W, K)>,
149        W: AsColumnFamilyRef + 'b,
150    {
151        self.multi_get_cf_opt(keys_cf, readopts)
152    }
153}
154
155impl<DB> Transaction<'_, DB> {
156    /// Write all batched keys to the DB atomically.
157    ///
158    /// May return any error that could be returned by `DB::write`.
159    ///
160    /// If this transaction was created by a [`TransactionDB`], an error of
161    /// the [`Expired`] kind may be returned if this transaction has
162    /// lived longer than expiration time in [`TransactionOptions`].
163    ///
164    /// If this transaction was created by an [`OptimisticTransactionDB`], an error of
165    /// the [`Busy`] kind may be returned if the transaction
166    /// could not guarantee that there are no write conflicts.
167    /// An error of the [`TryAgain`] kind may be returned if the memtable
168    /// history size is not large enough (see [`Options::set_max_write_buffer_size_to_maintain`]).
169    ///
170    /// [`Expired`]: crate::ErrorKind::Expired
171    /// [`TransactionOptions`]: crate::TransactionOptions
172    /// [`TransactionDB`]: crate::TransactionDB
173    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
174    /// [`Busy`]: crate::ErrorKind::Busy
175    /// [`TryAgain`]: crate::ErrorKind::TryAgain
176    /// [`Options::set_max_write_buffer_size_to_maintain`]: crate::Options::set_max_write_buffer_size_to_maintain
177    pub fn commit(self) -> Result<(), Error> {
178        unsafe {
179            ffi_try!(ffi::rocksdb_transaction_commit(self.inner));
180        }
181        Ok(())
182    }
183
184    pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
185        let ptr = name.as_ptr();
186        let len = name.len();
187        unsafe {
188            ffi_try!(ffi::rocksdb_transaction_set_name(
189                self.inner, ptr as _, len as _
190            ));
191        }
192
193        Ok(())
194    }
195
196    pub fn get_name(&self) -> Option<Vec<u8>> {
197        unsafe {
198            let mut name_len = 0;
199            let name = ffi::rocksdb_transaction_get_name(self.inner, &raw mut name_len);
200            if name.is_null() {
201                None
202            } else {
203                let mut vec = vec![0; name_len];
204                std::ptr::copy_nonoverlapping(name as *mut u8, vec.as_mut_ptr(), name_len);
205                ffi::rocksdb_free(name as *mut c_void);
206                Some(vec)
207            }
208        }
209    }
210
211    pub fn prepare(&self) -> Result<(), Error> {
212        unsafe {
213            ffi_try!(ffi::rocksdb_transaction_prepare(self.inner));
214        }
215        Ok(())
216    }
217
218    /// Returns snapshot associated with transaction if snapshot was enabled in [`TransactionOptions`].
219    /// Otherwise, returns a snapshot with `nullptr` inside which doesn't affect read operations.
220    ///
221    /// [`TransactionOptions`]: crate::TransactionOptions
222    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
223        SnapshotWithThreadMode::new(self)
224    }
225
226    /// Discard all batched writes in this transaction.
227    pub fn rollback(&self) -> Result<(), Error> {
228        unsafe {
229            ffi_try!(ffi::rocksdb_transaction_rollback(self.inner));
230            Ok(())
231        }
232    }
233
234    /// Record the state of the transaction for future calls to [`rollback_to_savepoint`].
235    /// May be called multiple times to set multiple save points.
236    ///
237    /// [`rollback_to_savepoint`]: Self::rollback_to_savepoint
238    pub fn set_savepoint(&self) {
239        unsafe {
240            ffi::rocksdb_transaction_set_savepoint(self.inner);
241        }
242    }
243
244    /// Undo all operations in this transaction since the most recent call to [`set_savepoint`]
245    /// and removes the most recent [`set_savepoint`].
246    ///
247    /// Returns error if there is no previous call to [`set_savepoint`].
248    ///
249    /// [`set_savepoint`]: Self::set_savepoint
250    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
251        unsafe {
252            ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner));
253            Ok(())
254        }
255    }
256
257    /// Get the bytes associated with a key value.
258    ///
259    /// See [`get_cf_opt`] for details.
260    ///
261    /// [`get_cf_opt`]: Self::get_cf_opt
262    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
263        DEFAULT_READ_OPTS.with(|opts| self.get_opt(key, opts))
264    }
265
266    pub fn get_pinned<K: AsRef<[u8]>>(
267        &'_ self,
268        key: K,
269    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
270        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
271    }
272
273    /// Get the bytes associated with a key value and the given column family.
274    ///
275    /// See [`get_cf_opt`] for details.
276    ///
277    /// [`get_cf_opt`]: Self::get_cf_opt
278    pub fn get_cf<K: AsRef<[u8]>>(
279        &self,
280        cf: &impl AsColumnFamilyRef,
281        key: K,
282    ) -> Result<Option<Vec<u8>>, Error> {
283        DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key, opts))
284    }
285
286    pub fn get_pinned_cf<K: AsRef<[u8]>>(
287        &'_ self,
288        cf: &impl AsColumnFamilyRef,
289        key: K,
290    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
291        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
292    }
293
294    /// Get the key and ensure that this transaction will only
295    /// be able to be committed if this key is not written outside this
296    /// transaction after it has first been read (or after the snapshot if a
297    /// snapshot is set in this transaction).
298    ///
299    /// See [`get_for_update_cf_opt`] for details.
300    ///
301    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
302    pub fn get_for_update<K: AsRef<[u8]>>(
303        &self,
304        key: K,
305        exclusive: bool,
306    ) -> Result<Option<Vec<u8>>, Error> {
307        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_opt(key, exclusive, opts))
308    }
309
310    pub fn get_pinned_for_update<K: AsRef<[u8]>>(
311        &'_ self,
312        key: K,
313        exclusive: bool,
314    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
315        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_opt(key, exclusive, opts))
316    }
317
318    /// Get the key in the given column family and ensure that this transaction will only
319    /// be able to be committed if this key is not written outside this
320    /// transaction after it has first been read (or after the snapshot if a
321    /// snapshot is set in this transaction).
322    ///
323    /// See [`get_for_update_cf_opt`] for details.
324    ///
325    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
326    pub fn get_for_update_cf<K: AsRef<[u8]>>(
327        &self,
328        cf: &impl AsColumnFamilyRef,
329        key: K,
330        exclusive: bool,
331    ) -> Result<Option<Vec<u8>>, Error> {
332        DEFAULT_READ_OPTS.with(|opts| self.get_for_update_cf_opt(cf, key, exclusive, opts))
333    }
334
335    pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
336        &'_ self,
337        cf: &impl AsColumnFamilyRef,
338        key: K,
339        exclusive: bool,
340    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
341        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts))
342    }
343
344    /// Returns the bytes associated with a key value with read options.
345    ///
346    /// See [`get_cf_opt`] for details.
347    ///
348    /// [`get_cf_opt`]: Self::get_cf_opt
349    pub fn get_opt<K: AsRef<[u8]>>(
350        &self,
351        key: K,
352        readopts: &ReadOptions,
353    ) -> Result<Option<Vec<u8>>, Error> {
354        self.get_pinned_opt(key, readopts)
355            .map(|x| x.map(|v| v.as_ref().to_vec()))
356    }
357
358    pub fn get_pinned_opt<K: AsRef<[u8]>>(
359        &'_ self,
360        key: K,
361        readopts: &ReadOptions,
362    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
363        let key = key.as_ref();
364        unsafe {
365            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
366                self.inner,
367                readopts.inner,
368                key.as_ptr() as *const c_char,
369                key.len(),
370            ));
371            if val.is_null() {
372                Ok(None)
373            } else {
374                Ok(Some(DBPinnableSlice::from_c(val)))
375            }
376        }
377    }
378
379    /// Get the bytes associated with a key value and the given column family with read options.
380    ///
381    /// This function will also read pending changes in this transaction.
382    /// Currently, this function will return an error of the [`MergeInProgress`] kind
383    /// if the most recent write to the queried key in this batch is a Merge.
384    ///
385    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
386    pub fn get_cf_opt<K: AsRef<[u8]>>(
387        &self,
388        cf: &impl AsColumnFamilyRef,
389        key: K,
390        readopts: &ReadOptions,
391    ) -> Result<Option<Vec<u8>>, Error> {
392        self.get_pinned_cf_opt(cf, key, readopts)
393            .map(|x| x.map(|v| v.as_ref().to_vec()))
394    }
395
396    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
397        &'_ self,
398        cf: &impl AsColumnFamilyRef,
399        key: K,
400        readopts: &ReadOptions,
401    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
402        let key = key.as_ref();
403        unsafe {
404            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
405                self.inner,
406                readopts.inner,
407                cf.inner(),
408                key.as_ptr() as *const c_char,
409                key.len(),
410            ));
411            if val.is_null() {
412                Ok(None)
413            } else {
414                Ok(Some(DBPinnableSlice::from_c(val)))
415            }
416        }
417    }
418
419    /// Get the key with read options and ensure that this transaction will only
420    /// be able to be committed if this key is not written outside this
421    /// transaction after it has first been read (or after the snapshot if a
422    /// snapshot is set in this transaction).
423    ///
424    /// See [`get_for_update_cf_opt`] for details.
425    ///
426    /// [`get_for_update_cf_opt`]: Self::get_for_update_cf_opt
427    pub fn get_for_update_opt<K: AsRef<[u8]>>(
428        &self,
429        key: K,
430        exclusive: bool,
431        opts: &ReadOptions,
432    ) -> Result<Option<Vec<u8>>, Error> {
433        self.get_pinned_for_update_opt(key, exclusive, opts)
434            .map(|x| x.map(|v| v.as_ref().to_vec()))
435    }
436
437    pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
438        &'_ self,
439        key: K,
440        exclusive: bool,
441        opts: &ReadOptions,
442    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
443        let key = key.as_ref();
444        unsafe {
445            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
446                self.inner,
447                opts.inner,
448                key.as_ptr() as *const c_char,
449                key.len() as size_t,
450                u8::from(exclusive),
451            ));
452            if val.is_null() {
453                Ok(None)
454            } else {
455                Ok(Some(DBPinnableSlice::from_c(val)))
456            }
457        }
458    }
459
460    /// Get the key in the given column family with read options
461    /// and ensure that this transaction will only
462    /// be able to be committed if this key is not written outside this
463    /// transaction after it has first been read (or after the snapshot if a
464    /// snapshot is set in this transaction).
465    ///
466    /// Currently, this function will return an error of the [`MergeInProgress`]
467    /// if the most recent write to the queried key in this batch is a Merge.
468    ///
469    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
470    /// * [`Busy`] if there is a write conflict.
471    /// * [`TimedOut`] if a lock could not be acquired.
472    /// * [`TryAgain`] if the memtable history size is not large enough.
473    /// * [`MergeInProgress`] if merge operations cannot be resolved.
474    /// * or other errors if this key could not be read.
475    ///
476    /// If this transaction was created by an `[OptimisticTransactionDB]`, `get_for_update_opt`
477    /// can cause [`commit`] to fail. Otherwise, it could return any error that could
478    /// be returned by `[DB::get]`.
479    ///
480    /// [`Busy`]: crate::ErrorKind::Busy
481    /// [`TimedOut`]: crate::ErrorKind::TimedOut
482    /// [`TryAgain`]: crate::ErrorKind::TryAgain
483    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
484    /// [`TransactionDB`]: crate::TransactionDB
485    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
486    /// [`commit`]: Self::commit
487    /// [`DB::get`]: crate::DB::get
488    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
489        &self,
490        cf: &impl AsColumnFamilyRef,
491        key: K,
492        exclusive: bool,
493        opts: &ReadOptions,
494    ) -> Result<Option<Vec<u8>>, Error> {
495        self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)
496            .map(|x| x.map(|v| v.as_ref().to_vec()))
497    }
498
499    pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
500        &'_ self,
501        cf: &impl AsColumnFamilyRef,
502        key: K,
503        exclusive: bool,
504        opts: &ReadOptions,
505    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
506        let key = key.as_ref();
507        unsafe {
508            let val = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
509                self.inner,
510                opts.inner,
511                cf.inner(),
512                key.as_ptr() as *const c_char,
513                key.len() as size_t,
514                u8::from(exclusive),
515            ));
516            if val.is_null() {
517                Ok(None)
518            } else {
519                Ok(Some(DBPinnableSlice::from_c(val)))
520            }
521        }
522    }
523
524    /// Return the values associated with the given keys.
525    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
526    where
527        K: AsRef<[u8]>,
528        I: IntoIterator<Item = K>,
529    {
530        DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
531    }
532
533    /// Return the values associated with the given keys using read options.
534    pub fn multi_get_opt<K, I>(
535        &self,
536        keys: I,
537        readopts: &ReadOptions,
538    ) -> Vec<Result<Option<Vec<u8>>, Error>>
539    where
540        K: AsRef<[u8]>,
541        I: IntoIterator<Item = K>,
542    {
543        let owned_keys: Vec<K> = keys.into_iter().collect();
544        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
545        let ptr_keys: Vec<*const c_char> = owned_keys
546            .iter()
547            .map(|k| k.as_ref().as_ptr() as *const c_char)
548            .collect();
549
550        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
551        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
552        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
553        unsafe {
554            ffi::rocksdb_transaction_multi_get(
555                self.inner,
556                readopts.inner,
557                ptr_keys.len(),
558                ptr_keys.as_ptr(),
559                keys_sizes.as_ptr(),
560                values.as_mut_ptr(),
561                values_sizes.as_mut_ptr(),
562                errors.as_mut_ptr(),
563            );
564        }
565
566        unsafe {
567            values.set_len(ptr_keys.len());
568            values_sizes.set_len(ptr_keys.len());
569            errors.set_len(ptr_keys.len());
570        }
571
572        convert_values(values, values_sizes, errors)
573    }
574
575    /// Return the values associated with the given keys and column families.
576    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
577        &'a self,
578        keys: I,
579    ) -> Vec<Result<Option<Vec<u8>>, Error>>
580    where
581        K: AsRef<[u8]>,
582        I: IntoIterator<Item = (&'b W, K)>,
583        W: 'b + AsColumnFamilyRef,
584    {
585        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
586    }
587
588    /// Return the values associated with the given keys and column families using read options.
589    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
590        &'a self,
591        keys: I,
592        readopts: &ReadOptions,
593    ) -> Vec<Result<Option<Vec<u8>>, Error>>
594    where
595        K: AsRef<[u8]>,
596        I: IntoIterator<Item = (&'b W, K)>,
597        W: 'b + AsColumnFamilyRef,
598    {
599        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
600        let keys_sizes: Vec<usize> = cfs_and_owned_keys
601            .iter()
602            .map(|(_, k)| k.as_ref().len())
603            .collect();
604        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
605            .iter()
606            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
607            .collect();
608        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
609            .iter()
610            .map(|(c, _)| c.inner().cast_const())
611            .collect();
612        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
613        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
614        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
615        unsafe {
616            ffi::rocksdb_transaction_multi_get_cf(
617                self.inner,
618                readopts.inner,
619                ptr_cfs.as_ptr(),
620                ptr_keys.len(),
621                ptr_keys.as_ptr(),
622                keys_sizes.as_ptr(),
623                values.as_mut_ptr(),
624                values_sizes.as_mut_ptr(),
625                errors.as_mut_ptr(),
626            );
627        }
628
629        unsafe {
630            values.set_len(ptr_keys.len());
631            values_sizes.set_len(ptr_keys.len());
632            errors.set_len(ptr_keys.len());
633        }
634
635        convert_values(values, values_sizes, errors)
636    }
637
638    /// Put the key value in default column family and do conflict checking on the key.
639    ///
640    /// See [`put_cf`] for details.
641    ///
642    /// [`put_cf`]: Self::put_cf
643    pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
644        let key = key.as_ref();
645        let value = value.as_ref();
646        unsafe {
647            ffi_try!(ffi::rocksdb_transaction_put(
648                self.inner,
649                key.as_ptr() as *const c_char,
650                key.len() as size_t,
651                value.as_ptr() as *const c_char,
652                value.len() as size_t,
653            ));
654            Ok(())
655        }
656    }
657
658    /// Put the key value in the given column family and do conflict checking on the key.
659    ///
660    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
661    /// * [`Busy`] if there is a write conflict.
662    /// * [`TimedOut`] if a lock could not be acquired.
663    /// * [`TryAgain`] if the memtable history size is not large enough.
664    /// * [`MergeInProgress`] if merge operations cannot be resolved.
665    /// * or other errors on unexpected failures.
666    ///
667    /// [`Busy`]: crate::ErrorKind::Busy
668    /// [`TimedOut`]: crate::ErrorKind::TimedOut
669    /// [`TryAgain`]: crate::ErrorKind::TryAgain
670    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
671    /// [`TransactionDB`]: crate::TransactionDB
672    /// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
673    pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
674        &self,
675        cf: &impl AsColumnFamilyRef,
676        key: K,
677        value: V,
678    ) -> Result<(), Error> {
679        let key = key.as_ref();
680        let value = value.as_ref();
681        unsafe {
682            ffi_try!(ffi::rocksdb_transaction_put_cf(
683                self.inner,
684                cf.inner(),
685                key.as_ptr() as *const c_char,
686                key.len() as size_t,
687                value.as_ptr() as *const c_char,
688                value.len() as size_t,
689            ));
690            Ok(())
691        }
692    }
693
694    /// Merge value with existing value of key, and also do conflict checking on the key.
695    ///
696    /// See [`merge_cf`] for details.
697    ///
698    /// [`merge_cf`]: Self::merge_cf
699    pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
700        let key = key.as_ref();
701        let value = value.as_ref();
702        unsafe {
703            ffi_try!(ffi::rocksdb_transaction_merge(
704                self.inner,
705                key.as_ptr() as *const c_char,
706                key.len() as size_t,
707                value.as_ptr() as *const c_char,
708                value.len() as size_t
709            ));
710            Ok(())
711        }
712    }
713
714    /// Merge `value` with existing value of `key` in the given column family,
715    /// and also do conflict checking on the key.
716    ///
717    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
718    /// * [`Busy`] if there is a write conflict.
719    /// * [`TimedOut`] if a lock could not be acquired.
720    /// * [`TryAgain`] if the memtable history size is not large enough.
721    /// * [`MergeInProgress`] if merge operations cannot be resolved.
722    /// * or other errors on unexpected failures.
723    ///
724    /// [`Busy`]: crate::ErrorKind::Busy
725    /// [`TimedOut`]: crate::ErrorKind::TimedOut
726    /// [`TryAgain`]: crate::ErrorKind::TryAgain
727    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
728    /// [`TransactionDB`]: crate::TransactionDB
729    pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
730        &self,
731        cf: &impl AsColumnFamilyRef,
732        key: K,
733        value: V,
734    ) -> Result<(), Error> {
735        let key = key.as_ref();
736        let value = value.as_ref();
737        unsafe {
738            ffi_try!(ffi::rocksdb_transaction_merge_cf(
739                self.inner,
740                cf.inner(),
741                key.as_ptr() as *const c_char,
742                key.len() as size_t,
743                value.as_ptr() as *const c_char,
744                value.len() as size_t
745            ));
746            Ok(())
747        }
748    }
749
750    /// Delete the key value if it exists and do conflict checking on the key.
751    ///
752    /// See [`delete_cf`] for details.
753    ///
754    /// [`delete_cf`]: Self::delete_cf
755    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
756        let key = key.as_ref();
757        unsafe {
758            ffi_try!(ffi::rocksdb_transaction_delete(
759                self.inner,
760                key.as_ptr() as *const c_char,
761                key.len() as size_t
762            ));
763        }
764        Ok(())
765    }
766
767    /// Delete the key value in the given column family and do conflict checking.
768    ///
769    /// If this transaction was created by a [`TransactionDB`], it can return error of kind:
770    /// * [`Busy`] if there is a write conflict.
771    /// * [`TimedOut`] if a lock could not be acquired.
772    /// * [`TryAgain`] if the memtable history size is not large enough.
773    /// * [`MergeInProgress`] if merge operations cannot be resolved.
774    /// * or other errors on unexpected failures.
775    ///
776    /// [`Busy`]: crate::ErrorKind::Busy
777    /// [`TimedOut`]: crate::ErrorKind::TimedOut
778    /// [`TryAgain`]: crate::ErrorKind::TryAgain
779    /// [`MergeInProgress`]: crate::ErrorKind::MergeInProgress
780    /// [`TransactionDB`]: crate::TransactionDB
781    pub fn delete_cf<K: AsRef<[u8]>>(
782        &self,
783        cf: &impl AsColumnFamilyRef,
784        key: K,
785    ) -> Result<(), Error> {
786        let key = key.as_ref();
787        unsafe {
788            ffi_try!(ffi::rocksdb_transaction_delete_cf(
789                self.inner,
790                cf.inner(),
791                key.as_ptr() as *const c_char,
792                key.len() as size_t
793            ));
794        }
795        Ok(())
796    }
797
798    pub fn iterator<'a: 'b, 'b>(
799        &'a self,
800        mode: IteratorMode,
801    ) -> DBIteratorWithThreadMode<'b, Self> {
802        let readopts = ReadOptions::default();
803        self.iterator_opt(mode, readopts)
804    }
805
806    pub fn iterator_opt<'a: 'b, 'b>(
807        &'a self,
808        mode: IteratorMode,
809        readopts: ReadOptions,
810    ) -> DBIteratorWithThreadMode<'b, Self> {
811        DBIteratorWithThreadMode::new(self, readopts, mode)
812    }
813
814    /// Opens an iterator using the provided ReadOptions.
815    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions.
816    pub fn iterator_cf_opt<'a: 'b, 'b>(
817        &'a self,
818        cf_handle: &impl AsColumnFamilyRef,
819        readopts: ReadOptions,
820        mode: IteratorMode,
821    ) -> DBIteratorWithThreadMode<'b, Self> {
822        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
823    }
824
825    /// Opens an iterator with `set_total_order_seek` enabled.
826    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
827    /// with a Hash-based implementation.
828    pub fn full_iterator<'a: 'b, 'b>(
829        &'a self,
830        mode: IteratorMode,
831    ) -> DBIteratorWithThreadMode<'b, Self> {
832        let mut opts = ReadOptions::default();
833        opts.set_total_order_seek(true);
834        DBIteratorWithThreadMode::new(self, opts, mode)
835    }
836
837    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
838        &'a self,
839        prefix: P,
840    ) -> DBIteratorWithThreadMode<'b, Self> {
841        let mut opts = ReadOptions::default();
842        opts.set_prefix_same_as_start(true);
843        DBIteratorWithThreadMode::new(
844            self,
845            opts,
846            IteratorMode::From(prefix.as_ref(), Direction::Forward),
847        )
848    }
849
850    pub fn iterator_cf<'a: 'b, 'b>(
851        &'a self,
852        cf_handle: &impl AsColumnFamilyRef,
853        mode: IteratorMode,
854    ) -> DBIteratorWithThreadMode<'b, Self> {
855        let opts = ReadOptions::default();
856        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
857    }
858
859    pub fn full_iterator_cf<'a: 'b, 'b>(
860        &'a self,
861        cf_handle: &impl AsColumnFamilyRef,
862        mode: IteratorMode,
863    ) -> DBIteratorWithThreadMode<'b, Self> {
864        let mut opts = ReadOptions::default();
865        opts.set_total_order_seek(true);
866        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
867    }
868
869    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
870        &'a self,
871        cf_handle: &impl AsColumnFamilyRef,
872        prefix: P,
873    ) -> DBIteratorWithThreadMode<'a, Self> {
874        let mut opts = ReadOptions::default();
875        opts.set_prefix_same_as_start(true);
876        DBIteratorWithThreadMode::<'a, Self>::new_cf(
877            self,
878            cf_handle.inner(),
879            opts,
880            IteratorMode::From(prefix.as_ref(), Direction::Forward),
881        )
882    }
883
884    /// Opens a raw iterator over the database, using the default read options
885    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
886        let opts = ReadOptions::default();
887        DBRawIteratorWithThreadMode::new(self, opts)
888    }
889
890    /// Opens a raw iterator over the given column family, using the default read options
891    pub fn raw_iterator_cf<'a: 'b, 'b>(
892        &'a self,
893        cf_handle: &impl AsColumnFamilyRef,
894    ) -> DBRawIteratorWithThreadMode<'b, Self> {
895        let opts = ReadOptions::default();
896        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
897    }
898
899    /// Opens a raw iterator over the database, using the given read options
900    pub fn raw_iterator_opt<'a: 'b, 'b>(
901        &'a self,
902        readopts: ReadOptions,
903    ) -> DBRawIteratorWithThreadMode<'b, Self> {
904        DBRawIteratorWithThreadMode::new(self, readopts)
905    }
906
907    /// Opens a raw iterator over the given column family, using the given read options
908    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
909        &'a self,
910        cf_handle: &impl AsColumnFamilyRef,
911        readopts: ReadOptions,
912    ) -> DBRawIteratorWithThreadMode<'b, Self> {
913        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
914    }
915
916    pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
917        unsafe {
918            let wi = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
919            let mut len: usize = 0;
920            let ptr = ffi::rocksdb_writebatch_wi_data(wi, std::ptr::from_mut(&mut len));
921            let writebatch = ffi::rocksdb_writebatch_create_from(ptr, len);
922            ffi::rocksdb_free(wi as *mut c_void);
923            WriteBatchWithTransaction { inner: writebatch }
924        }
925    }
926
927    pub fn rebuild_from_writebatch(
928        &self,
929        writebatch: &WriteBatchWithTransaction<true>,
930    ) -> Result<(), Error> {
931        unsafe {
932            ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(
933                self.inner,
934                writebatch.inner
935            ));
936        }
937        Ok(())
938    }
939}
940
941impl<DB> Drop for Transaction<'_, DB> {
942    fn drop(&mut self) {
943        unsafe {
944            ffi::rocksdb_transaction_destroy(self.inner);
945        }
946    }
947}