electrs_rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::{
27    column_family::UnboundColumnFamily,
28    db::{convert_values, DBAccess},
29    db_options::OptionsMustOutliveDB,
30    ffi,
31    ffi_util::to_cpath,
32    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
33    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
34    IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
35    ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
36    WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
37};
38use ffi::rocksdb_transaction_t;
39use libc::{c_char, c_int, c_void, size_t};
40
41#[cfg(not(feature = "multi-threaded-cf"))]
42type DefaultThreadMode = crate::SingleThreaded;
43#[cfg(feature = "multi-threaded-cf")]
44type DefaultThreadMode = crate::MultiThreaded;
45
46/// RocksDB TransactionDB.
47///
48/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
49/// to learn more about RocksDB TransactionDB.
50///
51/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
52/// if feature `multi-threaded-cf` is not enabled.
53///
54/// ```
55/// use rocksdb::{DB, Options, TransactionDB, SingleThreaded};
56/// let path = "_path_for_transaction_db";
57/// {
58///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
59///     db.put(b"my key", b"my value").unwrap();
60///     
61///     // create transaction
62///     let txn = db.transaction();
63///     txn.put(b"key2", b"value2");
64///     txn.put(b"key3", b"value3");
65///     txn.commit().unwrap();
66/// }
67/// let _ = DB::destroy(&Options::default(), path);
68/// ```
69///
70/// [`SingleThreaded`]: crate::SingleThreaded
71pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
72    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
73    cfs: T,
74    path: PathBuf,
75    // prepared 2pc transactions.
76    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
77    _outlive: Vec<OptionsMustOutliveDB>,
78}
79
80unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
81unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
82
83impl<T: ThreadMode> DBAccess for TransactionDB<T> {
84    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
85        ffi::rocksdb_transactiondb_create_snapshot(self.inner)
86    }
87
88    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
89        ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
90    }
91
92    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
93        ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
94    }
95
96    unsafe fn create_iterator_cf(
97        &self,
98        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
99        readopts: &ReadOptions,
100    ) -> *mut ffi::rocksdb_iterator_t {
101        ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
102    }
103
104    fn get_opt<K: AsRef<[u8]>>(
105        &self,
106        key: K,
107        readopts: &ReadOptions,
108    ) -> Result<Option<Vec<u8>>, Error> {
109        self.get_opt(key, readopts)
110    }
111
112    fn get_cf_opt<K: AsRef<[u8]>>(
113        &self,
114        cf: &impl AsColumnFamilyRef,
115        key: K,
116        readopts: &ReadOptions,
117    ) -> Result<Option<Vec<u8>>, Error> {
118        self.get_cf_opt(cf, key, readopts)
119    }
120
121    fn get_pinned_opt<K: AsRef<[u8]>>(
122        &self,
123        key: K,
124        readopts: &ReadOptions,
125    ) -> Result<Option<DBPinnableSlice>, Error> {
126        self.get_pinned_opt(key, readopts)
127    }
128
129    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
130        &self,
131        cf: &impl AsColumnFamilyRef,
132        key: K,
133        readopts: &ReadOptions,
134    ) -> Result<Option<DBPinnableSlice>, Error> {
135        self.get_pinned_cf_opt(cf, key, readopts)
136    }
137
138    fn multi_get_opt<K, I>(
139        &self,
140        keys: I,
141        readopts: &ReadOptions,
142    ) -> Vec<Result<Option<Vec<u8>>, Error>>
143    where
144        K: AsRef<[u8]>,
145        I: IntoIterator<Item = K>,
146    {
147        self.multi_get_opt(keys, readopts)
148    }
149
150    fn multi_get_cf_opt<'b, K, I, W>(
151        &self,
152        keys_cf: I,
153        readopts: &ReadOptions,
154    ) -> Vec<Result<Option<Vec<u8>>, Error>>
155    where
156        K: AsRef<[u8]>,
157        I: IntoIterator<Item = (&'b W, K)>,
158        W: AsColumnFamilyRef + 'b,
159    {
160        self.multi_get_cf_opt(keys_cf, readopts)
161    }
162}
163
164impl<T: ThreadMode> TransactionDB<T> {
165    /// Opens a database with default options.
166    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
167        let mut opts = Options::default();
168        opts.create_if_missing(true);
169        let txn_db_opts = TransactionDBOptions::default();
170        Self::open(&opts, &txn_db_opts, path)
171    }
172
173    /// Opens the database with the specified options.
174    pub fn open<P: AsRef<Path>>(
175        opts: &Options,
176        txn_db_opts: &TransactionDBOptions,
177        path: P,
178    ) -> Result<Self, Error> {
179        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
180    }
181
182    /// Opens a database with the given database options and column family names.
183    ///
184    /// Column families opened using this function will be created with default `Options`.
185    pub fn open_cf<P, I, N>(
186        opts: &Options,
187        txn_db_opts: &TransactionDBOptions,
188        path: P,
189        cfs: I,
190    ) -> Result<Self, Error>
191    where
192        P: AsRef<Path>,
193        I: IntoIterator<Item = N>,
194        N: AsRef<str>,
195    {
196        let cfs = cfs
197            .into_iter()
198            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
199
200        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
201    }
202
203    /// Opens a database with the given database options and column family descriptors.
204    pub fn open_cf_descriptors<P, I>(
205        opts: &Options,
206        txn_db_opts: &TransactionDBOptions,
207        path: P,
208        cfs: I,
209    ) -> Result<Self, Error>
210    where
211        P: AsRef<Path>,
212        I: IntoIterator<Item = ColumnFamilyDescriptor>,
213    {
214        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
215    }
216
217    /// Internal implementation for opening RocksDB.
218    fn open_cf_descriptors_internal<P, I>(
219        opts: &Options,
220        txn_db_opts: &TransactionDBOptions,
221        path: P,
222        cfs: I,
223    ) -> Result<Self, Error>
224    where
225        P: AsRef<Path>,
226        I: IntoIterator<Item = ColumnFamilyDescriptor>,
227    {
228        let cfs: Vec<_> = cfs.into_iter().collect();
229        let outlive = iter::once(opts.outlive.clone())
230            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
231            .collect();
232
233        let cpath = to_cpath(&path)?;
234
235        if let Err(e) = fs::create_dir_all(&path) {
236            return Err(Error::new(format!(
237                "Failed to create RocksDB directory: `{:?}`.",
238                e
239            )));
240        }
241
242        let db: *mut ffi::rocksdb_transactiondb_t;
243        let mut cf_map = BTreeMap::new();
244
245        if cfs.is_empty() {
246            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
247        } else {
248            let mut cfs_v = cfs;
249            // Always open the default column family.
250            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
251                cfs_v.push(ColumnFamilyDescriptor {
252                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
253                    options: Options::default(),
254                });
255            }
256            // We need to store our CStrings in an intermediate vector
257            // so that their pointers remain valid.
258            let c_cfs: Vec<CString> = cfs_v
259                .iter()
260                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
261                .collect();
262
263            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
264
265            // These handles will be populated by DB.
266            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
267
268            let cfopts: Vec<_> = cfs_v
269                .iter()
270                .map(|cf| cf.options.inner as *const _)
271                .collect();
272
273            db = Self::open_cf_raw(
274                opts,
275                txn_db_opts,
276                &cpath,
277                &cfs_v,
278                &cfnames,
279                &cfopts,
280                &mut cfhandles,
281            )?;
282
283            for handle in &cfhandles {
284                if handle.is_null() {
285                    return Err(Error::new(
286                        "Received null column family handle from DB.".to_owned(),
287                    ));
288                }
289            }
290
291            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
292                cf_map.insert(cf_desc.name.clone(), inner);
293            }
294        }
295
296        if db.is_null() {
297            return Err(Error::new("Could not initialize database.".to_owned()));
298        }
299
300        let prepared = unsafe {
301            let mut cnt = 0;
302            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
303            let mut vec = vec![std::ptr::null_mut(); cnt];
304            std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
305            if !ptr.is_null() {
306                ffi::rocksdb_free(ptr as *mut c_void);
307            }
308            vec
309        };
310
311        Ok(TransactionDB {
312            inner: db,
313            cfs: T::new_cf_map_internal(cf_map),
314            path: path.as_ref().to_path_buf(),
315            prepared: Mutex::new(prepared),
316            _outlive: outlive,
317        })
318    }
319
320    fn open_raw(
321        opts: &Options,
322        txn_db_opts: &TransactionDBOptions,
323        cpath: &CString,
324    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
325        unsafe {
326            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
327                opts.inner,
328                txn_db_opts.inner,
329                cpath.as_ptr()
330            ));
331            Ok(db)
332        }
333    }
334
335    fn open_cf_raw(
336        opts: &Options,
337        txn_db_opts: &TransactionDBOptions,
338        cpath: &CString,
339        cfs_v: &[ColumnFamilyDescriptor],
340        cfnames: &[*const c_char],
341        cfopts: &[*const ffi::rocksdb_options_t],
342        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
343    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
344        unsafe {
345            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
346                opts.inner,
347                txn_db_opts.inner,
348                cpath.as_ptr(),
349                cfs_v.len() as c_int,
350                cfnames.as_ptr(),
351                cfopts.as_ptr(),
352                cfhandles.as_mut_ptr(),
353            ));
354            Ok(db)
355        }
356    }
357
358    fn create_inner_cf_handle(
359        &self,
360        name: &str,
361        opts: &Options,
362    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
363        let cf_name = if let Ok(c) = CString::new(name.as_bytes()) {
364            c
365        } else {
366            return Err(Error::new(
367                "Failed to convert path to CString when creating cf".to_owned(),
368            ));
369        };
370        Ok(unsafe {
371            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
372                self.inner,
373                opts.inner,
374                cf_name.as_ptr(),
375            ))
376        })
377    }
378
379    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
380        DB::list_cf(opts, path)
381    }
382
383    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
384        DB::destroy(opts, path)
385    }
386
387    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
388        DB::repair(opts, path)
389    }
390
391    pub fn path(&self) -> &Path {
392        self.path.as_path()
393    }
394
395    /// Creates a transaction with default options.
396    pub fn transaction(&self) -> Transaction<Self> {
397        self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
398    }
399
400    /// Creates a transaction with options.
401    pub fn transaction_opt<'a>(
402        &'a self,
403        write_opts: &WriteOptions,
404        txn_opts: &TransactionOptions,
405    ) -> Transaction<'a, Self> {
406        Transaction {
407            inner: unsafe {
408                ffi::rocksdb_transaction_begin(
409                    self.inner,
410                    write_opts.inner,
411                    txn_opts.inner,
412                    std::ptr::null_mut(),
413                )
414            },
415            _marker: PhantomData::default(),
416        }
417    }
418
419    /// Get all prepared transactions for recovery.
420    ///
421    /// This function is expected to call once after open database.
422    /// User should commit or rollback all transactions before start other transactions.
423    pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
424        self.prepared
425            .lock()
426            .unwrap()
427            .drain(0..)
428            .map(|inner| Transaction {
429                inner,
430                _marker: PhantomData::default(),
431            })
432            .collect()
433    }
434
435    /// Returns the bytes associated with a key value.
436    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
437        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
438    }
439
440    /// Returns the bytes associated with a key value and the given column family.
441    pub fn get_cf<K: AsRef<[u8]>>(
442        &self,
443        cf: &impl AsColumnFamilyRef,
444        key: K,
445    ) -> Result<Option<Vec<u8>>, Error> {
446        self.get_pinned_cf(cf, key)
447            .map(|x| x.map(|v| v.as_ref().to_vec()))
448    }
449
450    /// Returns the bytes associated with a key value with read options.
451    pub fn get_opt<K: AsRef<[u8]>>(
452        &self,
453        key: K,
454        readopts: &ReadOptions,
455    ) -> Result<Option<Vec<u8>>, Error> {
456        self.get_pinned_opt(key, readopts)
457            .map(|x| x.map(|v| v.as_ref().to_vec()))
458    }
459
460    /// Returns the bytes associated with a key value and the given column family with read options.
461    pub fn get_cf_opt<K: AsRef<[u8]>>(
462        &self,
463        cf: &impl AsColumnFamilyRef,
464        key: K,
465        readopts: &ReadOptions,
466    ) -> Result<Option<Vec<u8>>, Error> {
467        self.get_pinned_cf_opt(cf, key, readopts)
468            .map(|x| x.map(|v| v.as_ref().to_vec()))
469    }
470
471    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
472        self.get_pinned_opt(key, &ReadOptions::default())
473    }
474
475    /// Returns the bytes associated with a key value and the given column family.
476    pub fn get_pinned_cf<K: AsRef<[u8]>>(
477        &self,
478        cf: &impl AsColumnFamilyRef,
479        key: K,
480    ) -> Result<Option<DBPinnableSlice>, Error> {
481        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
482    }
483
484    /// Returns the bytes associated with a key value with read options.
485    pub fn get_pinned_opt<K: AsRef<[u8]>>(
486        &self,
487        key: K,
488        readopts: &ReadOptions,
489    ) -> Result<Option<DBPinnableSlice>, Error> {
490        let key = key.as_ref();
491        unsafe {
492            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
493                self.inner,
494                readopts.inner,
495                key.as_ptr() as *const c_char,
496                key.len() as size_t,
497            ));
498            if val.is_null() {
499                Ok(None)
500            } else {
501                Ok(Some(DBPinnableSlice::from_c(val)))
502            }
503        }
504    }
505
506    /// Returns the bytes associated with a key value and the given column family with read options.
507    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
508        &self,
509        cf: &impl AsColumnFamilyRef,
510        key: K,
511        readopts: &ReadOptions,
512    ) -> Result<Option<DBPinnableSlice>, Error> {
513        unsafe {
514            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
515                self.inner,
516                readopts.inner,
517                cf.inner(),
518                key.as_ref().as_ptr() as *const c_char,
519                key.as_ref().len() as size_t,
520            ));
521            if val.is_null() {
522                Ok(None)
523            } else {
524                Ok(Some(DBPinnableSlice::from_c(val)))
525            }
526        }
527    }
528
529    /// Return the values associated with the given keys.
530    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
531    where
532        K: AsRef<[u8]>,
533        I: IntoIterator<Item = K>,
534    {
535        self.multi_get_opt(keys, &ReadOptions::default())
536    }
537
538    /// Return the values associated with the given keys using read options.
539    pub fn multi_get_opt<K, I>(
540        &self,
541        keys: I,
542        readopts: &ReadOptions,
543    ) -> Vec<Result<Option<Vec<u8>>, Error>>
544    where
545        K: AsRef<[u8]>,
546        I: IntoIterator<Item = K>,
547    {
548        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
549            .into_iter()
550            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
551            .unzip();
552        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
553
554        let mut values = vec![ptr::null_mut(); keys.len()];
555        let mut values_sizes = vec![0_usize; keys.len()];
556        let mut errors = vec![ptr::null_mut(); keys.len()];
557        unsafe {
558            ffi::rocksdb_transactiondb_multi_get(
559                self.inner,
560                readopts.inner,
561                ptr_keys.len(),
562                ptr_keys.as_ptr(),
563                keys_sizes.as_ptr(),
564                values.as_mut_ptr(),
565                values_sizes.as_mut_ptr(),
566                errors.as_mut_ptr(),
567            );
568        }
569
570        convert_values(values, values_sizes, errors)
571    }
572
573    /// Return the values associated with the given keys and column families.
574    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
575        &'a self,
576        keys: I,
577    ) -> Vec<Result<Option<Vec<u8>>, Error>>
578    where
579        K: AsRef<[u8]>,
580        I: IntoIterator<Item = (&'b W, K)>,
581        W: 'b + AsColumnFamilyRef,
582    {
583        self.multi_get_cf_opt(keys, &ReadOptions::default())
584    }
585
586    /// Return the values associated with the given keys and column families using read options.
587    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
588        &'a self,
589        keys: I,
590        readopts: &ReadOptions,
591    ) -> Vec<Result<Option<Vec<u8>>, Error>>
592    where
593        K: AsRef<[u8]>,
594        I: IntoIterator<Item = (&'b W, K)>,
595        W: 'b + AsColumnFamilyRef,
596    {
597        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
598            .into_iter()
599            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
600            .unzip();
601        let ptr_keys: Vec<_> = cfs_and_keys
602            .iter()
603            .map(|(_, k)| k.as_ptr() as *const c_char)
604            .collect();
605        let ptr_cfs: Vec<_> = cfs_and_keys
606            .iter()
607            .map(|(c, _)| c.inner() as *const _)
608            .collect();
609
610        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
611        let mut values_sizes = vec![0_usize; ptr_keys.len()];
612        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
613        unsafe {
614            ffi::rocksdb_transactiondb_multi_get_cf(
615                self.inner,
616                readopts.inner,
617                ptr_cfs.as_ptr(),
618                ptr_keys.len(),
619                ptr_keys.as_ptr(),
620                keys_sizes.as_ptr(),
621                values.as_mut_ptr(),
622                values_sizes.as_mut_ptr(),
623                errors.as_mut_ptr(),
624            );
625        }
626
627        convert_values(values, values_sizes, errors)
628    }
629
630    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
631    where
632        K: AsRef<[u8]>,
633        V: AsRef<[u8]>,
634    {
635        self.put_opt(key, value, &WriteOptions::default())
636    }
637
638    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
639    where
640        K: AsRef<[u8]>,
641        V: AsRef<[u8]>,
642    {
643        self.put_cf_opt(cf, key, value, &WriteOptions::default())
644    }
645
646    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
647    where
648        K: AsRef<[u8]>,
649        V: AsRef<[u8]>,
650    {
651        unsafe {
652            ffi_try!(ffi::rocksdb_transactiondb_put(
653                self.inner,
654                writeopts.inner,
655                key.as_ref().as_ptr() as *const c_char,
656                key.as_ref().len() as size_t,
657                value.as_ref().as_ptr() as *const c_char,
658                value.as_ref().len() as size_t
659            ));
660        }
661        Ok(())
662    }
663
664    pub fn put_cf_opt<K, V>(
665        &self,
666        cf: &impl AsColumnFamilyRef,
667        key: K,
668        value: V,
669        writeopts: &WriteOptions,
670    ) -> Result<(), Error>
671    where
672        K: AsRef<[u8]>,
673        V: AsRef<[u8]>,
674    {
675        unsafe {
676            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
677                self.inner,
678                writeopts.inner,
679                cf.inner(),
680                key.as_ref().as_ptr() as *const c_char,
681                key.as_ref().len() as size_t,
682                value.as_ref().as_ptr() as *const c_char,
683                value.as_ref().len() as size_t
684            ));
685        }
686        Ok(())
687    }
688
689    pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
690        self.write_opt(batch, &WriteOptions::default())
691    }
692
693    pub fn write_opt(
694        &self,
695        batch: WriteBatchWithTransaction<true>,
696        writeopts: &WriteOptions,
697    ) -> Result<(), Error> {
698        unsafe {
699            ffi_try!(ffi::rocksdb_transactiondb_write(
700                self.inner,
701                writeopts.inner,
702                batch.inner
703            ));
704        }
705        Ok(())
706    }
707
708    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
709    where
710        K: AsRef<[u8]>,
711        V: AsRef<[u8]>,
712    {
713        self.merge_opt(key, value, &WriteOptions::default())
714    }
715
716    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
717    where
718        K: AsRef<[u8]>,
719        V: AsRef<[u8]>,
720    {
721        self.merge_cf_opt(cf, key, value, &WriteOptions::default())
722    }
723
724    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
725    where
726        K: AsRef<[u8]>,
727        V: AsRef<[u8]>,
728    {
729        unsafe {
730            ffi_try!(ffi::rocksdb_transactiondb_merge(
731                self.inner,
732                writeopts.inner,
733                key.as_ref().as_ptr() as *const c_char,
734                key.as_ref().len() as size_t,
735                value.as_ref().as_ptr() as *const c_char,
736                value.as_ref().len() as size_t,
737            ));
738            Ok(())
739        }
740    }
741
742    pub fn merge_cf_opt<K, V>(
743        &self,
744        cf: &impl AsColumnFamilyRef,
745        key: K,
746        value: V,
747        writeopts: &WriteOptions,
748    ) -> Result<(), Error>
749    where
750        K: AsRef<[u8]>,
751        V: AsRef<[u8]>,
752    {
753        unsafe {
754            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
755                self.inner,
756                writeopts.inner,
757                cf.inner(),
758                key.as_ref().as_ptr() as *const c_char,
759                key.as_ref().len() as size_t,
760                value.as_ref().as_ptr() as *const c_char,
761                value.as_ref().len() as size_t,
762            ));
763            Ok(())
764        }
765    }
766
767    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
768        self.delete_opt(key, &WriteOptions::default())
769    }
770
771    pub fn delete_cf<K: AsRef<[u8]>>(
772        &self,
773        cf: &impl AsColumnFamilyRef,
774        key: K,
775    ) -> Result<(), Error> {
776        self.delete_cf_opt(cf, key, &WriteOptions::default())
777    }
778
779    pub fn delete_opt<K: AsRef<[u8]>>(
780        &self,
781        key: K,
782        writeopts: &WriteOptions,
783    ) -> Result<(), Error> {
784        unsafe {
785            ffi_try!(ffi::rocksdb_transactiondb_delete(
786                self.inner,
787                writeopts.inner,
788                key.as_ref().as_ptr() as *const c_char,
789                key.as_ref().len() as size_t,
790            ));
791        }
792        Ok(())
793    }
794
795    pub fn delete_cf_opt<K: AsRef<[u8]>>(
796        &self,
797        cf: &impl AsColumnFamilyRef,
798        key: K,
799        writeopts: &WriteOptions,
800    ) -> Result<(), Error> {
801        unsafe {
802            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
803                self.inner,
804                writeopts.inner,
805                cf.inner(),
806                key.as_ref().as_ptr() as *const c_char,
807                key.as_ref().len() as size_t,
808            ));
809        }
810        Ok(())
811    }
812
813    pub fn iterator<'a: 'b, 'b>(
814        &'a self,
815        mode: IteratorMode,
816    ) -> DBIteratorWithThreadMode<'b, Self> {
817        let readopts = ReadOptions::default();
818        self.iterator_opt(mode, readopts)
819    }
820
821    pub fn iterator_opt<'a: 'b, 'b>(
822        &'a self,
823        mode: IteratorMode,
824        readopts: ReadOptions,
825    ) -> DBIteratorWithThreadMode<'b, Self> {
826        DBIteratorWithThreadMode::new(self, readopts, mode)
827    }
828
829    /// Opens an iterator using the provided ReadOptions.
830    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
831    pub fn iterator_cf_opt<'a: 'b, 'b>(
832        &'a self,
833        cf_handle: &impl AsColumnFamilyRef,
834        readopts: ReadOptions,
835        mode: IteratorMode,
836    ) -> DBIteratorWithThreadMode<'b, Self> {
837        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
838    }
839
840    /// Opens an iterator with `set_total_order_seek` enabled.
841    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
842    /// with a Hash-based implementation.
843    pub fn full_iterator<'a: 'b, 'b>(
844        &'a self,
845        mode: IteratorMode,
846    ) -> DBIteratorWithThreadMode<'b, Self> {
847        let mut opts = ReadOptions::default();
848        opts.set_total_order_seek(true);
849        DBIteratorWithThreadMode::new(self, opts, mode)
850    }
851
852    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
853        &'a self,
854        prefix: P,
855    ) -> DBIteratorWithThreadMode<'b, Self> {
856        let mut opts = ReadOptions::default();
857        opts.set_prefix_same_as_start(true);
858        DBIteratorWithThreadMode::new(
859            self,
860            opts,
861            IteratorMode::From(prefix.as_ref(), Direction::Forward),
862        )
863    }
864
865    pub fn iterator_cf<'a: 'b, 'b>(
866        &'a self,
867        cf_handle: &impl AsColumnFamilyRef,
868        mode: IteratorMode,
869    ) -> DBIteratorWithThreadMode<'b, Self> {
870        let opts = ReadOptions::default();
871        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
872    }
873
874    pub fn full_iterator_cf<'a: 'b, 'b>(
875        &'a self,
876        cf_handle: &impl AsColumnFamilyRef,
877        mode: IteratorMode,
878    ) -> DBIteratorWithThreadMode<'b, Self> {
879        let mut opts = ReadOptions::default();
880        opts.set_total_order_seek(true);
881        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
882    }
883
884    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
885        &'a self,
886        cf_handle: &impl AsColumnFamilyRef,
887        prefix: P,
888    ) -> DBIteratorWithThreadMode<'a, Self> {
889        let mut opts = ReadOptions::default();
890        opts.set_prefix_same_as_start(true);
891        DBIteratorWithThreadMode::<'a, Self>::new_cf(
892            self,
893            cf_handle.inner(),
894            opts,
895            IteratorMode::From(prefix.as_ref(), Direction::Forward),
896        )
897    }
898
899    /// Opens a raw iterator over the database, using the default read options
900    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
901        let opts = ReadOptions::default();
902        DBRawIteratorWithThreadMode::new(self, opts)
903    }
904
905    /// Opens a raw iterator over the given column family, using the default read options
906    pub fn raw_iterator_cf<'a: 'b, 'b>(
907        &'a self,
908        cf_handle: &impl AsColumnFamilyRef,
909    ) -> DBRawIteratorWithThreadMode<'b, Self> {
910        let opts = ReadOptions::default();
911        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
912    }
913
914    /// Opens a raw iterator over the database, using the given read options
915    pub fn raw_iterator_opt<'a: 'b, 'b>(
916        &'a self,
917        readopts: ReadOptions,
918    ) -> DBRawIteratorWithThreadMode<'b, Self> {
919        DBRawIteratorWithThreadMode::new(self, readopts)
920    }
921
922    /// Opens a raw iterator over the given column family, using the given read options
923    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
924        &'a self,
925        cf_handle: &impl AsColumnFamilyRef,
926        readopts: ReadOptions,
927    ) -> DBRawIteratorWithThreadMode<'b, Self> {
928        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
929    }
930
931    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
932        SnapshotWithThreadMode::<Self>::new(self)
933    }
934}
935
936impl TransactionDB<SingleThreaded> {
937    /// Creates column family with given name and options.
938    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
939        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
940        self.cfs
941            .cfs
942            .insert(name.as_ref().to_string(), ColumnFamily { inner });
943        Ok(())
944    }
945
946    /// Returns the underlying column family handle.
947    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
948        self.cfs.cfs.get(name)
949    }
950}
951
952impl TransactionDB<MultiThreaded> {
953    /// Creates column family with given name and options.
954    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
955        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
956        self.cfs.cfs.write().unwrap().insert(
957            name.as_ref().to_string(),
958            Arc::new(UnboundColumnFamily { inner }),
959        );
960        Ok(())
961    }
962
963    /// Returns the underlying column family handle.
964    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
965        self.cfs
966            .cfs
967            .read()
968            .unwrap()
969            .get(name)
970            .cloned()
971            .map(UnboundColumnFamily::bound_column_family)
972    }
973}
974
975impl<T: ThreadMode> Drop for TransactionDB<T> {
976    fn drop(&mut self) {
977        unsafe {
978            self.prepared_transactions().clear();
979            self.cfs.drop_all_cfs_internal();
980            ffi::rocksdb_transactiondb_close(self.inner);
981        }
982    }
983}