Skip to main content

rust_rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, DB,
32    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
33    DEFAULT_COLUMN_FAMILY_NAME, Direction, Error, FlushOptions, IteratorMode, MultiThreaded,
34    Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode, ThreadMode, Transaction,
35    TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction, WriteOptions,
36    column_family::UnboundColumnFamily,
37    db::{DBAccess, convert_values},
38    db_options::OptionsMustOutliveDB,
39    ffi,
40    ffi_util::to_cpath,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_uchar, c_void, size_t};
44
45// Default options are kept per-thread to avoid re-allocating on every call while
46// also preventing cross-thread sharing. Some RocksDB option wrappers hold
47// pointers into internal buffers and are not safe to share across threads.
48// Using thread_local allows cheap reuse in the common "default options" path
49// without synchronization overhead. Callers who need non-defaults must pass
50// explicit options.
51thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
52thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
53thread_local! { static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default(); }
54
55#[cfg(not(feature = "multi-threaded-cf"))]
56type DefaultThreadMode = crate::SingleThreaded;
57#[cfg(feature = "multi-threaded-cf")]
58type DefaultThreadMode = crate::MultiThreaded;
59
/// RocksDB TransactionDB.
///
/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
/// to learn more about RocksDB TransactionDB.
///
/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
/// if feature `multi-threaded-cf` is not enabled, and [`MultiThreaded`]
/// if it is.
///
/// ```
/// use rust_rocksdb::{DB, Options, TransactionDB, SingleThreaded};
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_transaction_db")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_transaction_db");
/// let path = tempdir.path();
/// {
///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
///     db.put(b"my key", b"my value").unwrap();
///
///     // create transaction
///     let txn = db.transaction();
///     txn.put(b"key2", b"value2");
///     txn.put(b"key3", b"value3");
///     txn.commit().unwrap();
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`SingleThreaded`]: crate::SingleThreaded
/// [`MultiThreaded`]: crate::MultiThreaded
///
/// A `Snapshot` must not outlive the `TransactionDB` it was created from:
///
/// ```compile_fail,E0597
/// use rust_rocksdb::{SingleThreaded, TransactionDB};
///
/// let _snapshot = {
///     let db = TransactionDB::<SingleThreaded>::open_default("foo").unwrap();
///     db.snapshot()
/// };
/// ```
pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
    /// Raw handle to the underlying `rocksdb_transactiondb_t`, as returned by
    /// the `rocksdb_transactiondb_open*` FFI calls.
    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
    /// Column family bookkeeping, built by `T::new_cf_map_internal` from the
    /// name -> handle map collected when the database was opened.
    cfs: T,
    /// Filesystem path the database was opened at; exposed through `path()`.
    path: PathBuf,
    /// Prepared two-phase-commit (2pc) transactions reported by RocksDB when
    /// the database was opened. Drained by `prepared_transactions()`.
    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
    /// Clones of the `outlive` values of the database options and of every
    /// column family's options, held for as long as this database exists.
    _outlive: Vec<OptionsMustOutliveDB>,
}
108
// `TransactionDB` stores raw pointers (`inner` and the handles inside
// `prepared`), so the compiler does not derive `Send`/`Sync` for it; these
// impls opt in manually.
// NOTE(review): soundness relies on the underlying RocksDB TransactionDB
// handle being usable from several threads at once — confirm against the
// RocksDB documentation.
unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
111
112impl<T: ThreadMode> DBAccess for TransactionDB<T> {
113    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
114        unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
115    }
116
117    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
118        unsafe {
119            ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
120        }
121    }
122
123    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
124        unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
125    }
126
127    unsafe fn create_iterator_cf(
128        &self,
129        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
130        readopts: &ReadOptions,
131    ) -> *mut ffi::rocksdb_iterator_t {
132        unsafe {
133            ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
134        }
135    }
136
137    fn get_opt<K: AsRef<[u8]>>(
138        &self,
139        key: K,
140        readopts: &ReadOptions,
141    ) -> Result<Option<Vec<u8>>, Error> {
142        self.get_opt(key, readopts)
143    }
144
145    fn get_cf_opt<K: AsRef<[u8]>>(
146        &self,
147        cf: &impl AsColumnFamilyRef,
148        key: K,
149        readopts: &ReadOptions,
150    ) -> Result<Option<Vec<u8>>, Error> {
151        self.get_cf_opt(cf, key, readopts)
152    }
153
154    fn get_pinned_opt<K: AsRef<[u8]>>(
155        &'_ self,
156        key: K,
157        readopts: &ReadOptions,
158    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
159        self.get_pinned_opt(key, readopts)
160    }
161
162    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
163        &'_ self,
164        cf: &impl AsColumnFamilyRef,
165        key: K,
166        readopts: &ReadOptions,
167    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
168        self.get_pinned_cf_opt(cf, key, readopts)
169    }
170
171    fn multi_get_opt<K, I>(
172        &self,
173        keys: I,
174        readopts: &ReadOptions,
175    ) -> Vec<Result<Option<Vec<u8>>, Error>>
176    where
177        K: AsRef<[u8]>,
178        I: IntoIterator<Item = K>,
179    {
180        self.multi_get_opt(keys, readopts)
181    }
182
183    fn multi_get_cf_opt<'b, K, I, W>(
184        &self,
185        keys_cf: I,
186        readopts: &ReadOptions,
187    ) -> Vec<Result<Option<Vec<u8>>, Error>>
188    where
189        K: AsRef<[u8]>,
190        I: IntoIterator<Item = (&'b W, K)>,
191        W: AsColumnFamilyRef + 'b,
192    {
193        self.multi_get_cf_opt(keys_cf, readopts)
194    }
195}
196
197impl<T: ThreadMode> TransactionDB<T> {
198    /// Opens a database with default options.
199    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
200        let mut opts = Options::default();
201        opts.create_if_missing(true);
202        let txn_db_opts = TransactionDBOptions::default();
203        Self::open(&opts, &txn_db_opts, path)
204    }
205
206    /// Opens the database with the specified options.
207    pub fn open<P: AsRef<Path>>(
208        opts: &Options,
209        txn_db_opts: &TransactionDBOptions,
210        path: P,
211    ) -> Result<Self, Error> {
212        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
213    }
214
215    /// Opens a database with the given database options and column family names.
216    ///
217    /// Column families opened using this function will be created with default `Options`.
218    pub fn open_cf<P, I, N>(
219        opts: &Options,
220        txn_db_opts: &TransactionDBOptions,
221        path: P,
222        cfs: I,
223    ) -> Result<Self, Error>
224    where
225        P: AsRef<Path>,
226        I: IntoIterator<Item = N>,
227        N: AsRef<str>,
228    {
229        let cfs = cfs
230            .into_iter()
231            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
232
233        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
234    }
235
236    /// Opens a database with the given database options and column family descriptors.
237    pub fn open_cf_descriptors<P, I>(
238        opts: &Options,
239        txn_db_opts: &TransactionDBOptions,
240        path: P,
241        cfs: I,
242    ) -> Result<Self, Error>
243    where
244        P: AsRef<Path>,
245        I: IntoIterator<Item = ColumnFamilyDescriptor>,
246    {
247        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
248    }
249
250    /// Internal implementation for opening RocksDB.
251    fn open_cf_descriptors_internal<P, I>(
252        opts: &Options,
253        txn_db_opts: &TransactionDBOptions,
254        path: P,
255        cfs: I,
256    ) -> Result<Self, Error>
257    where
258        P: AsRef<Path>,
259        I: IntoIterator<Item = ColumnFamilyDescriptor>,
260    {
261        let cfs: Vec<_> = cfs.into_iter().collect();
262        let outlive = iter::once(opts.outlive.clone())
263            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
264            .collect();
265
266        let cpath = to_cpath(&path)?;
267
268        if let Err(e) = fs::create_dir_all(&path) {
269            return Err(Error::new(format!(
270                "Failed to create RocksDB directory: `{e:?}`."
271            )));
272        }
273
274        let db: *mut ffi::rocksdb_transactiondb_t;
275        let mut cf_map = BTreeMap::new();
276
277        if cfs.is_empty() {
278            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
279        } else {
280            let mut cfs_v = cfs;
281            // Always open the default column family.
282            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
283                cfs_v.push(ColumnFamilyDescriptor {
284                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
285                    options: Options::default(),
286                    ttl: ColumnFamilyTtl::SameAsDb, // it will have ttl specified in `DBWithThreadMode::open_with_ttl`
287                });
288            }
289            // We need to store our CStrings in an intermediate vector
290            // so that their pointers remain valid.
291            let c_cfs: Vec<CString> = cfs_v
292                .iter()
293                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
294                .collect();
295
296            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
297
298            // These handles will be populated by DB.
299            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
300
301            let cfopts: Vec<_> = cfs_v
302                .iter()
303                .map(|cf| cf.options.inner.cast_const())
304                .collect();
305
306            db = Self::open_cf_raw(
307                opts,
308                txn_db_opts,
309                &cpath,
310                &cfs_v,
311                &cfnames,
312                &cfopts,
313                &mut cfhandles,
314            )?;
315
316            for handle in &cfhandles {
317                if handle.is_null() {
318                    return Err(Error::new(
319                        "Received null column family handle from DB.".to_owned(),
320                    ));
321                }
322            }
323
324            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
325                cf_map.insert(cf_desc.name.clone(), inner);
326            }
327        }
328
329        if db.is_null() {
330            return Err(Error::new("Could not initialize database.".to_owned()));
331        }
332
333        let prepared = unsafe {
334            let mut cnt = 0;
335            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &raw mut cnt);
336            let mut vec = vec![std::ptr::null_mut(); cnt];
337            if !ptr.is_null() {
338                std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
339                ffi::rocksdb_free(ptr as *mut c_void);
340            }
341            vec
342        };
343
344        Ok(TransactionDB {
345            inner: db,
346            cfs: T::new_cf_map_internal(cf_map),
347            path: path.as_ref().to_path_buf(),
348            prepared: Mutex::new(prepared),
349            _outlive: outlive,
350        })
351    }
352
353    fn open_raw(
354        opts: &Options,
355        txn_db_opts: &TransactionDBOptions,
356        cpath: &CString,
357    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
358        unsafe {
359            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
360                opts.inner,
361                txn_db_opts.inner,
362                cpath.as_ptr()
363            ));
364            Ok(db)
365        }
366    }
367
368    fn open_cf_raw(
369        opts: &Options,
370        txn_db_opts: &TransactionDBOptions,
371        cpath: &CString,
372        cfs_v: &[ColumnFamilyDescriptor],
373        cfnames: &[*const c_char],
374        cfopts: &[*const ffi::rocksdb_options_t],
375        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
376    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
377        unsafe {
378            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
379                opts.inner,
380                txn_db_opts.inner,
381                cpath.as_ptr(),
382                cfs_v.len() as c_int,
383                cfnames.as_ptr(),
384                cfopts.as_ptr(),
385                cfhandles.as_mut_ptr(),
386            ));
387            Ok(db)
388        }
389    }
390
391    fn create_inner_cf_handle(
392        &self,
393        name: &str,
394        opts: &Options,
395    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
396        let cf_name = CString::new(name.as_bytes()).map_err(|_| {
397            Error::new("Failed to convert path to CString when creating cf".to_owned())
398        })?;
399
400        Ok(unsafe {
401            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
402                self.inner,
403                opts.inner,
404                cf_name.as_ptr(),
405            ))
406        })
407    }
408
409    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
410        DB::list_cf(opts, path)
411    }
412
413    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
414        DB::destroy(opts, path)
415    }
416
417    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
418        DB::repair(opts, path)
419    }
420
421    pub fn path(&self) -> &Path {
422        self.path.as_path()
423    }
424
425    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
426    /// the data to disk.
427    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
428        unsafe {
429            ffi_try!(ffi::rocksdb_transactiondb_flush_wal(
430                self.inner,
431                c_uchar::from(sync)
432            ));
433        }
434        Ok(())
435    }
436
437    /// Flushes database memtables to SST files on the disk.
438    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
439        unsafe {
440            ffi_try!(ffi::rocksdb_transactiondb_flush(
441                self.inner,
442                flushopts.inner
443            ));
444        }
445        Ok(())
446    }
447
448    /// Flushes database memtables to SST files on the disk using default options.
449    pub fn flush(&self) -> Result<(), Error> {
450        DEFAULT_FLUSH_OPTS.with(|opts| self.flush_opt(opts))
451    }
452
453    /// Flushes database memtables to SST files on the disk for a given column family.
454    pub fn flush_cf_opt(
455        &self,
456        cf: &impl AsColumnFamilyRef,
457        flushopts: &FlushOptions,
458    ) -> Result<(), Error> {
459        unsafe {
460            ffi_try!(ffi::rocksdb_transactiondb_flush_cf(
461                self.inner,
462                flushopts.inner,
463                cf.inner()
464            ));
465        }
466        Ok(())
467    }
468
469    /// Flushes multiple column families.
470    ///
471    /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
472    /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
473    /// number at the time when flush is requested.
474    pub fn flush_cfs_opt(
475        &self,
476        cfs: &[&impl AsColumnFamilyRef],
477        opts: &FlushOptions,
478    ) -> Result<(), Error> {
479        let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
480        unsafe {
481            ffi_try!(ffi::rocksdb_transactiondb_flush_cfs(
482                self.inner,
483                opts.inner,
484                cfs.as_mut_ptr(),
485                cfs.len() as c_int,
486            ));
487        }
488        Ok(())
489    }
490
491    /// Flushes database memtables to SST files on the disk for a given column family using default
492    /// options.
493    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
494        DEFAULT_FLUSH_OPTS.with(|opts| self.flush_cf_opt(cf, opts))
495    }
496
497    /// Creates a transaction with default options.
498    pub fn transaction(&'_ self) -> Transaction<'_, Self> {
499        DEFAULT_WRITE_OPTS.with(|opts| self.transaction_opt(opts, &TransactionOptions::default()))
500    }
501
502    /// Creates a transaction with options.
503    pub fn transaction_opt<'a>(
504        &'a self,
505        write_opts: &WriteOptions,
506        txn_opts: &TransactionOptions,
507    ) -> Transaction<'a, Self> {
508        Transaction {
509            inner: unsafe {
510                ffi::rocksdb_transaction_begin(
511                    self.inner,
512                    write_opts.inner,
513                    txn_opts.inner,
514                    std::ptr::null_mut(),
515                )
516            },
517            _marker: PhantomData,
518        }
519    }
520
521    /// Get all prepared transactions for recovery.
522    ///
523    /// This function is expected to call once after open database.
524    /// User should commit or rollback all transactions before start other transactions.
525    pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
526        self.prepared
527            .lock()
528            .unwrap()
529            .drain(0..)
530            .map(|inner| Transaction {
531                inner,
532                _marker: PhantomData,
533            })
534            .collect()
535    }
536
537    /// Returns the bytes associated with a key value.
538    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
539        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
540    }
541
542    /// Returns the bytes associated with a key value and the given column family.
543    pub fn get_cf<K: AsRef<[u8]>>(
544        &self,
545        cf: &impl AsColumnFamilyRef,
546        key: K,
547    ) -> Result<Option<Vec<u8>>, Error> {
548        self.get_pinned_cf(cf, key)
549            .map(|x| x.map(|v| v.as_ref().to_vec()))
550    }
551
552    /// Returns the bytes associated with a key value with read options.
553    pub fn get_opt<K: AsRef<[u8]>>(
554        &self,
555        key: K,
556        readopts: &ReadOptions,
557    ) -> Result<Option<Vec<u8>>, Error> {
558        self.get_pinned_opt(key, readopts)
559            .map(|x| x.map(|v| v.as_ref().to_vec()))
560    }
561
562    /// Returns the bytes associated with a key value and the given column family with read options.
563    pub fn get_cf_opt<K: AsRef<[u8]>>(
564        &self,
565        cf: &impl AsColumnFamilyRef,
566        key: K,
567        readopts: &ReadOptions,
568    ) -> Result<Option<Vec<u8>>, Error> {
569        self.get_pinned_cf_opt(cf, key, readopts)
570            .map(|x| x.map(|v| v.as_ref().to_vec()))
571    }
572
573    pub fn get_pinned<K: AsRef<[u8]>>(
574        &'_ self,
575        key: K,
576    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
577        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
578    }
579
580    /// Returns the bytes associated with a key value and the given column family.
581    pub fn get_pinned_cf<K: AsRef<[u8]>>(
582        &'_ self,
583        cf: &impl AsColumnFamilyRef,
584        key: K,
585    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
586        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
587    }
588
589    /// Returns the bytes associated with a key value with read options.
590    pub fn get_pinned_opt<K: AsRef<[u8]>>(
591        &'_ self,
592        key: K,
593        readopts: &ReadOptions,
594    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
595        let key = key.as_ref();
596        unsafe {
597            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
598                self.inner,
599                readopts.inner,
600                key.as_ptr() as *const c_char,
601                key.len() as size_t,
602            ));
603            if val.is_null() {
604                Ok(None)
605            } else {
606                Ok(Some(DBPinnableSlice::from_c(val)))
607            }
608        }
609    }
610
611    /// Returns the bytes associated with a key value and the given column family with read options.
612    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
613        &'_ self,
614        cf: &impl AsColumnFamilyRef,
615        key: K,
616        readopts: &ReadOptions,
617    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
618        let key = key.as_ref();
619        unsafe {
620            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
621                self.inner,
622                readopts.inner,
623                cf.inner(),
624                key.as_ptr() as *const c_char,
625                key.len() as size_t,
626            ));
627            if val.is_null() {
628                Ok(None)
629            } else {
630                Ok(Some(DBPinnableSlice::from_c(val)))
631            }
632        }
633    }
634
635    /// Return the values associated with the given keys.
636    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
637    where
638        K: AsRef<[u8]>,
639        I: IntoIterator<Item = K>,
640    {
641        DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
642    }
643
644    /// Return the values associated with the given keys using read options.
645    pub fn multi_get_opt<K, I>(
646        &self,
647        keys: I,
648        readopts: &ReadOptions,
649    ) -> Vec<Result<Option<Vec<u8>>, Error>>
650    where
651        K: AsRef<[u8]>,
652        I: IntoIterator<Item = K>,
653    {
654        let owned_keys: Vec<K> = keys.into_iter().collect();
655        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
656        let ptr_keys: Vec<*const c_char> = owned_keys
657            .iter()
658            .map(|k| k.as_ref().as_ptr() as *const c_char)
659            .collect();
660
661        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
662        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
663        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
664        unsafe {
665            ffi::rocksdb_transactiondb_multi_get(
666                self.inner,
667                readopts.inner,
668                ptr_keys.len(),
669                ptr_keys.as_ptr(),
670                keys_sizes.as_ptr(),
671                values.as_mut_ptr(),
672                values_sizes.as_mut_ptr(),
673                errors.as_mut_ptr(),
674            );
675        }
676
677        unsafe {
678            values.set_len(ptr_keys.len());
679            values_sizes.set_len(ptr_keys.len());
680            errors.set_len(ptr_keys.len());
681        }
682
683        convert_values(values, values_sizes, errors)
684    }
685
686    /// Return the values associated with the given keys and column families.
687    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
688        &'a self,
689        keys: I,
690    ) -> Vec<Result<Option<Vec<u8>>, Error>>
691    where
692        K: AsRef<[u8]>,
693        I: IntoIterator<Item = (&'b W, K)>,
694        W: 'b + AsColumnFamilyRef,
695    {
696        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
697    }
698
699    /// Return the values associated with the given keys and column families using read options.
700    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
701        &'a self,
702        keys: I,
703        readopts: &ReadOptions,
704    ) -> Vec<Result<Option<Vec<u8>>, Error>>
705    where
706        K: AsRef<[u8]>,
707        I: IntoIterator<Item = (&'b W, K)>,
708        W: 'b + AsColumnFamilyRef,
709    {
710        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
711        let keys_sizes: Vec<usize> = cfs_and_owned_keys
712            .iter()
713            .map(|(_, k)| k.as_ref().len())
714            .collect();
715        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
716            .iter()
717            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
718            .collect();
719        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
720            .iter()
721            .map(|(c, _)| c.inner().cast_const())
722            .collect();
723        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
724        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
725        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
726        unsafe {
727            ffi::rocksdb_transactiondb_multi_get_cf(
728                self.inner,
729                readopts.inner,
730                ptr_cfs.as_ptr(),
731                ptr_keys.len(),
732                ptr_keys.as_ptr(),
733                keys_sizes.as_ptr(),
734                values.as_mut_ptr(),
735                values_sizes.as_mut_ptr(),
736                errors.as_mut_ptr(),
737            );
738        }
739
740        unsafe {
741            values.set_len(ptr_keys.len());
742            values_sizes.set_len(ptr_keys.len());
743            errors.set_len(ptr_keys.len());
744        }
745
746        convert_values(values, values_sizes, errors)
747    }
748
749    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
750    where
751        K: AsRef<[u8]>,
752        V: AsRef<[u8]>,
753    {
754        DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
755    }
756
757    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
758    where
759        K: AsRef<[u8]>,
760        V: AsRef<[u8]>,
761    {
762        DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
763    }
764
765    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
766    where
767        K: AsRef<[u8]>,
768        V: AsRef<[u8]>,
769    {
770        let key = key.as_ref();
771        let value = value.as_ref();
772        unsafe {
773            ffi_try!(ffi::rocksdb_transactiondb_put(
774                self.inner,
775                writeopts.inner,
776                key.as_ptr() as *const c_char,
777                key.len() as size_t,
778                value.as_ptr() as *const c_char,
779                value.len() as size_t
780            ));
781        }
782        Ok(())
783    }
784
785    pub fn put_cf_opt<K, V>(
786        &self,
787        cf: &impl AsColumnFamilyRef,
788        key: K,
789        value: V,
790        writeopts: &WriteOptions,
791    ) -> Result<(), Error>
792    where
793        K: AsRef<[u8]>,
794        V: AsRef<[u8]>,
795    {
796        let key = key.as_ref();
797        let value = value.as_ref();
798        unsafe {
799            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
800                self.inner,
801                writeopts.inner,
802                cf.inner(),
803                key.as_ptr() as *const c_char,
804                key.len() as size_t,
805                value.as_ptr() as *const c_char,
806                value.len() as size_t
807            ));
808        }
809        Ok(())
810    }
811
812    pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
813        DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
814    }
815
816    pub fn write_opt(
817        &self,
818        batch: &WriteBatchWithTransaction<true>,
819        writeopts: &WriteOptions,
820    ) -> Result<(), Error> {
821        unsafe {
822            ffi_try!(ffi::rocksdb_transactiondb_write(
823                self.inner,
824                writeopts.inner,
825                batch.inner
826            ));
827        }
828        Ok(())
829    }
830
831    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
832    where
833        K: AsRef<[u8]>,
834        V: AsRef<[u8]>,
835    {
836        DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
837    }
838
839    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
840    where
841        K: AsRef<[u8]>,
842        V: AsRef<[u8]>,
843    {
844        DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
845    }
846
847    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
848    where
849        K: AsRef<[u8]>,
850        V: AsRef<[u8]>,
851    {
852        let key = key.as_ref();
853        let value = value.as_ref();
854        unsafe {
855            ffi_try!(ffi::rocksdb_transactiondb_merge(
856                self.inner,
857                writeopts.inner,
858                key.as_ptr() as *const c_char,
859                key.len() as size_t,
860                value.as_ptr() as *const c_char,
861                value.len() as size_t,
862            ));
863            Ok(())
864        }
865    }
866
867    pub fn merge_cf_opt<K, V>(
868        &self,
869        cf: &impl AsColumnFamilyRef,
870        key: K,
871        value: V,
872        writeopts: &WriteOptions,
873    ) -> Result<(), Error>
874    where
875        K: AsRef<[u8]>,
876        V: AsRef<[u8]>,
877    {
878        let key = key.as_ref();
879        let value = value.as_ref();
880        unsafe {
881            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
882                self.inner,
883                writeopts.inner,
884                cf.inner(),
885                key.as_ptr() as *const c_char,
886                key.len() as size_t,
887                value.as_ptr() as *const c_char,
888                value.len() as size_t,
889            ));
890            Ok(())
891        }
892    }
893
894    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
895        DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
896    }
897
898    pub fn delete_cf<K: AsRef<[u8]>>(
899        &self,
900        cf: &impl AsColumnFamilyRef,
901        key: K,
902    ) -> Result<(), Error> {
903        DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
904    }
905
906    pub fn delete_opt<K: AsRef<[u8]>>(
907        &self,
908        key: K,
909        writeopts: &WriteOptions,
910    ) -> Result<(), Error> {
911        let key = key.as_ref();
912        unsafe {
913            ffi_try!(ffi::rocksdb_transactiondb_delete(
914                self.inner,
915                writeopts.inner,
916                key.as_ptr() as *const c_char,
917                key.len() as size_t,
918            ));
919        }
920        Ok(())
921    }
922
923    pub fn delete_cf_opt<K: AsRef<[u8]>>(
924        &self,
925        cf: &impl AsColumnFamilyRef,
926        key: K,
927        writeopts: &WriteOptions,
928    ) -> Result<(), Error> {
929        let key = key.as_ref();
930        unsafe {
931            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
932                self.inner,
933                writeopts.inner,
934                cf.inner(),
935                key.as_ptr() as *const c_char,
936                key.len() as size_t,
937            ));
938        }
939        Ok(())
940    }
941
942    pub fn iterator<'a: 'b, 'b>(
943        &'a self,
944        mode: IteratorMode,
945    ) -> DBIteratorWithThreadMode<'b, Self> {
946        let readopts = ReadOptions::default();
947        self.iterator_opt(mode, readopts)
948    }
949
950    pub fn iterator_opt<'a: 'b, 'b>(
951        &'a self,
952        mode: IteratorMode,
953        readopts: ReadOptions,
954    ) -> DBIteratorWithThreadMode<'b, Self> {
955        DBIteratorWithThreadMode::new(self, readopts, mode)
956    }
957
958    /// Opens an iterator using the provided ReadOptions.
959    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
960    pub fn iterator_cf_opt<'a: 'b, 'b>(
961        &'a self,
962        cf_handle: &impl AsColumnFamilyRef,
963        readopts: ReadOptions,
964        mode: IteratorMode,
965    ) -> DBIteratorWithThreadMode<'b, Self> {
966        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
967    }
968
969    /// Opens an iterator with `set_total_order_seek` enabled.
970    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
971    /// with a Hash-based implementation.
972    pub fn full_iterator<'a: 'b, 'b>(
973        &'a self,
974        mode: IteratorMode,
975    ) -> DBIteratorWithThreadMode<'b, Self> {
976        let mut opts = ReadOptions::default();
977        opts.set_total_order_seek(true);
978        DBIteratorWithThreadMode::new(self, opts, mode)
979    }
980
981    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
982        &'a self,
983        prefix: P,
984    ) -> DBIteratorWithThreadMode<'b, Self> {
985        let mut opts = ReadOptions::default();
986        opts.set_prefix_same_as_start(true);
987        DBIteratorWithThreadMode::new(
988            self,
989            opts,
990            IteratorMode::From(prefix.as_ref(), Direction::Forward),
991        )
992    }
993
994    pub fn iterator_cf<'a: 'b, 'b>(
995        &'a self,
996        cf_handle: &impl AsColumnFamilyRef,
997        mode: IteratorMode,
998    ) -> DBIteratorWithThreadMode<'b, Self> {
999        let opts = ReadOptions::default();
1000        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1001    }
1002
1003    pub fn full_iterator_cf<'a: 'b, 'b>(
1004        &'a self,
1005        cf_handle: &impl AsColumnFamilyRef,
1006        mode: IteratorMode,
1007    ) -> DBIteratorWithThreadMode<'b, Self> {
1008        let mut opts = ReadOptions::default();
1009        opts.set_total_order_seek(true);
1010        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1011    }
1012
1013    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1014        &'a self,
1015        cf_handle: &impl AsColumnFamilyRef,
1016        prefix: P,
1017    ) -> DBIteratorWithThreadMode<'a, Self> {
1018        let mut opts = ReadOptions::default();
1019        opts.set_prefix_same_as_start(true);
1020        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1021            self,
1022            cf_handle.inner(),
1023            opts,
1024            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1025        )
1026    }
1027
1028    /// Opens a raw iterator over the database, using the default read options
1029    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1030        let opts = ReadOptions::default();
1031        DBRawIteratorWithThreadMode::new(self, opts)
1032    }
1033
1034    /// Opens a raw iterator over the given column family, using the default read options
1035    pub fn raw_iterator_cf<'a: 'b, 'b>(
1036        &'a self,
1037        cf_handle: &impl AsColumnFamilyRef,
1038    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1039        let opts = ReadOptions::default();
1040        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1041    }
1042
1043    /// Opens a raw iterator over the database, using the given read options
1044    pub fn raw_iterator_opt<'a: 'b, 'b>(
1045        &'a self,
1046        readopts: ReadOptions,
1047    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1048        DBRawIteratorWithThreadMode::new(self, readopts)
1049    }
1050
1051    /// Opens a raw iterator over the given column family, using the given read options
1052    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1053        &'a self,
1054        cf_handle: &impl AsColumnFamilyRef,
1055        readopts: ReadOptions,
1056    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1057        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1058    }
1059
1060    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
1061        SnapshotWithThreadMode::<Self>::new(self)
1062    }
1063
1064    fn drop_column_family<C>(
1065        &self,
1066        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
1067        _cf: C,
1068    ) -> Result<(), Error> {
1069        unsafe {
1070            // first mark the column family as dropped
1071            ffi_try!(ffi::rocksdb_drop_column_family(
1072                self.inner as *mut ffi::rocksdb_t,
1073                cf_inner
1074            ));
1075        }
1076        // Since `_cf` is dropped here, the column family handle is destroyed
1077        // and any resources (mem, files) are reclaimed.
1078        Ok(())
1079    }
1080}
1081
1082impl TransactionDB<SingleThreaded> {
1083    /// Creates column family with given name and options.
1084    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
1085        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1086        self.cfs
1087            .cfs
1088            .insert(name.as_ref().to_string(), ColumnFamily { inner });
1089        Ok(())
1090    }
1091
1092    /// Returns the underlying column family handle.
1093    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
1094        self.cfs.cfs.get(name)
1095    }
1096
1097    /// Drops the column family with the given name
1098    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
1099        match self.cfs.cfs.remove(name) {
1100            Some(cf) => self.drop_column_family(cf.inner, cf),
1101            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1102        }
1103    }
1104}
1105
1106impl TransactionDB<MultiThreaded> {
1107    /// Creates column family with given name and options.
1108    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1109        // Note that we acquire the cfs lock before inserting: otherwise we might race
1110        // another caller who observed the handle as missing.
1111        let mut cfs = self.cfs.cfs.write();
1112        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1113        cfs.insert(
1114            name.as_ref().to_string(),
1115            Arc::new(UnboundColumnFamily { inner }),
1116        );
1117        Ok(())
1118    }
1119
1120    /// Returns the underlying column family handle.
1121    pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1122        self.cfs
1123            .cfs
1124            .read()
1125            .get(name)
1126            .cloned()
1127            .map(UnboundColumnFamily::bound_column_family)
1128    }
1129
1130    /// Drops the column family with the given name by internally locking the inner column
1131    /// family map. This avoids needing `&mut self` reference
1132    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1133        match self.cfs.cfs.write().remove(name) {
1134            Some(cf) => self.drop_column_family(cf.inner, cf),
1135            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1136        }
1137    }
1138
1139    /// Implementation for property_value et al methods.
1140    ///
1141    /// `name` is the name of the property.  It will be converted into a CString
1142    /// and passed to `get_property` as argument.  `get_property` reads the
1143    /// specified property and either returns NULL or a pointer to a C allocated
1144    /// string; this method takes ownership of that string and will free it at
1145    /// the end. That string is parsed using `parse` callback which produces
1146    /// the returned result.
1147    fn property_value_impl<R>(
1148        name: impl CStrLike,
1149        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1150        parse: impl FnOnce(&str) -> Result<R, Error>,
1151    ) -> Result<Option<R>, Error> {
1152        let value = match name.bake() {
1153            Ok(prop_name) => get_property(prop_name.as_ptr()),
1154            Err(e) => {
1155                return Err(Error::new(format!(
1156                    "Failed to convert property name to CString: {e}"
1157                )));
1158            }
1159        };
1160        if value.is_null() {
1161            return Ok(None);
1162        }
1163        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1164            Ok(s) => parse(s).map(|value| Some(value)),
1165            Err(e) => Err(Error::new(format!(
1166                "Failed to convert property value to string: {e}"
1167            ))),
1168        };
1169        unsafe {
1170            ffi::rocksdb_free(value as *mut c_void);
1171        }
1172        result
1173    }
1174
1175    /// Retrieves a RocksDB property by name.
1176    ///
1177    /// Full list of properties could be find
1178    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1179    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1180        Self::property_value_impl(
1181            name,
1182            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1183            |str_value| Ok(str_value.to_owned()),
1184        )
1185    }
1186
1187    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1188        value.parse::<u64>().map_err(|err| {
1189            Error::new(format!(
1190                "Failed to convert property value {value} to int: {err}"
1191            ))
1192        })
1193    }
1194
1195    /// Retrieves a RocksDB property and casts it to an integer.
1196    ///
1197    /// Full list of properties that return int values could be find
1198    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1199    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1200        Self::property_value_impl(
1201            name,
1202            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1203            Self::parse_property_int_value,
1204        )
1205    }
1206}
1207
1208impl<T: ThreadMode> Drop for TransactionDB<T> {
1209    fn drop(&mut self) {
1210        unsafe {
1211            self.prepared_transactions().clear();
1212            self.cfs.drop_all_cfs_internal();
1213            ffi::rocksdb_transactiondb_close(self.inner);
1214        }
1215    }
1216}