rust_rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31    column_family::UnboundColumnFamily,
32    db::{convert_values, DBAccess},
33    db_options::OptionsMustOutliveDB,
34    ffi,
35    ffi_util::to_cpath,
36    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38    IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39    ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40    WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
// Thread mode used as the default type parameter of `TransactionDB`.
// Exactly one of the two aliases below is compiled, selected by the
// `multi-threaded-cf` cargo feature.
#[cfg(not(feature = "multi-threaded-cf"))]
type DefaultThreadMode = crate::SingleThreaded;
#[cfg(feature = "multi-threaded-cf")]
type DefaultThreadMode = crate::MultiThreaded;
49
/// RocksDB TransactionDB.
///
/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
/// to learn more about RocksDB TransactionDB.
///
/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
/// if feature `multi-threaded-cf` is not enabled.
///
/// ```
/// use rust_rocksdb::{DB, Options, TransactionDB, SingleThreaded};
/// let tempdir = tempfile::Builder::new()
///     .prefix("_path_for_transaction_db")
///     .tempdir()
///     .expect("Failed to create temporary path for the _path_for_transaction_db");
/// let path = tempdir.path();
/// {
///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
///     db.put(b"my key", b"my value").unwrap();
///
///     // create transaction
///     let txn = db.transaction();
///     txn.put(b"key2", b"value2").unwrap();
///     txn.put(b"key3", b"value3").unwrap();
///     txn.commit().unwrap();
/// }
/// let _ = DB::destroy(&Options::default(), path);
/// ```
///
/// [`SingleThreaded`]: crate::SingleThreaded
pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
    // Raw handle to the underlying RocksDB transaction database.
    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
    // Column family bookkeeping; its concrete shape is chosen by the thread mode `T`.
    cfs: T,
    // Filesystem path the database was opened with (returned by `path()`).
    path: PathBuf,
    // prepared 2pc transactions.
    // Filled once while opening the database and drained by `prepared_transactions()`.
    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
    // Clones of the `outlive` markers of the DB options and of every column family
    // options passed at open time; stored only so they live as long as this struct.
    _outlive: Vec<OptionsMustOutliveDB>,
}
87
// `TransactionDB` stores raw pointers (`inner` and the entries of `prepared`), so it
// is not automatically `Send`/`Sync`; both are asserted by hand here.
// NOTE(review): this presumably relies on the RocksDB handle being safe to share
// across threads and on `prepared` being guarded by a `Mutex` — TODO confirm.
unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
90
91impl<T: ThreadMode> DBAccess for TransactionDB<T> {
92    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
93        unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
94    }
95
96    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
97        unsafe {
98            ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
99        }
100    }
101
102    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
103        unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
104    }
105
106    unsafe fn create_iterator_cf(
107        &self,
108        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
109        readopts: &ReadOptions,
110    ) -> *mut ffi::rocksdb_iterator_t {
111        unsafe {
112            ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
113        }
114    }
115
116    fn get_opt<K: AsRef<[u8]>>(
117        &self,
118        key: K,
119        readopts: &ReadOptions,
120    ) -> Result<Option<Vec<u8>>, Error> {
121        self.get_opt(key, readopts)
122    }
123
124    fn get_cf_opt<K: AsRef<[u8]>>(
125        &self,
126        cf: &impl AsColumnFamilyRef,
127        key: K,
128        readopts: &ReadOptions,
129    ) -> Result<Option<Vec<u8>>, Error> {
130        self.get_cf_opt(cf, key, readopts)
131    }
132
133    fn get_pinned_opt<K: AsRef<[u8]>>(
134        &'_ self,
135        key: K,
136        readopts: &ReadOptions,
137    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
138        self.get_pinned_opt(key, readopts)
139    }
140
141    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
142        &'_ self,
143        cf: &impl AsColumnFamilyRef,
144        key: K,
145        readopts: &ReadOptions,
146    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
147        self.get_pinned_cf_opt(cf, key, readopts)
148    }
149
150    fn multi_get_opt<K, I>(
151        &self,
152        keys: I,
153        readopts: &ReadOptions,
154    ) -> Vec<Result<Option<Vec<u8>>, Error>>
155    where
156        K: AsRef<[u8]>,
157        I: IntoIterator<Item = K>,
158    {
159        self.multi_get_opt(keys, readopts)
160    }
161
162    fn multi_get_cf_opt<'b, K, I, W>(
163        &self,
164        keys_cf: I,
165        readopts: &ReadOptions,
166    ) -> Vec<Result<Option<Vec<u8>>, Error>>
167    where
168        K: AsRef<[u8]>,
169        I: IntoIterator<Item = (&'b W, K)>,
170        W: AsColumnFamilyRef + 'b,
171    {
172        self.multi_get_cf_opt(keys_cf, readopts)
173    }
174}
175
176impl<T: ThreadMode> TransactionDB<T> {
177    /// Opens a database with default options.
178    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
179        let mut opts = Options::default();
180        opts.create_if_missing(true);
181        let txn_db_opts = TransactionDBOptions::default();
182        Self::open(&opts, &txn_db_opts, path)
183    }
184
185    /// Opens the database with the specified options.
186    pub fn open<P: AsRef<Path>>(
187        opts: &Options,
188        txn_db_opts: &TransactionDBOptions,
189        path: P,
190    ) -> Result<Self, Error> {
191        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
192    }
193
194    /// Opens a database with the given database options and column family names.
195    ///
196    /// Column families opened using this function will be created with default `Options`.
197    pub fn open_cf<P, I, N>(
198        opts: &Options,
199        txn_db_opts: &TransactionDBOptions,
200        path: P,
201        cfs: I,
202    ) -> Result<Self, Error>
203    where
204        P: AsRef<Path>,
205        I: IntoIterator<Item = N>,
206        N: AsRef<str>,
207    {
208        let cfs = cfs
209            .into_iter()
210            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
211
212        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
213    }
214
215    /// Opens a database with the given database options and column family descriptors.
216    pub fn open_cf_descriptors<P, I>(
217        opts: &Options,
218        txn_db_opts: &TransactionDBOptions,
219        path: P,
220        cfs: I,
221    ) -> Result<Self, Error>
222    where
223        P: AsRef<Path>,
224        I: IntoIterator<Item = ColumnFamilyDescriptor>,
225    {
226        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
227    }
228
229    /// Internal implementation for opening RocksDB.
230    fn open_cf_descriptors_internal<P, I>(
231        opts: &Options,
232        txn_db_opts: &TransactionDBOptions,
233        path: P,
234        cfs: I,
235    ) -> Result<Self, Error>
236    where
237        P: AsRef<Path>,
238        I: IntoIterator<Item = ColumnFamilyDescriptor>,
239    {
240        let cfs: Vec<_> = cfs.into_iter().collect();
241        let outlive = iter::once(opts.outlive.clone())
242            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
243            .collect();
244
245        let cpath = to_cpath(&path)?;
246
247        if let Err(e) = fs::create_dir_all(&path) {
248            return Err(Error::new(format!(
249                "Failed to create RocksDB directory: `{e:?}`."
250            )));
251        }
252
253        let db: *mut ffi::rocksdb_transactiondb_t;
254        let mut cf_map = BTreeMap::new();
255
256        if cfs.is_empty() {
257            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
258        } else {
259            let mut cfs_v = cfs;
260            // Always open the default column family.
261            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
262                cfs_v.push(ColumnFamilyDescriptor {
263                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
264                    options: Options::default(),
265                    ttl: ColumnFamilyTtl::SameAsDb, // it will have ttl specified in `DBWithThreadMode::open_with_ttl`
266                });
267            }
268            // We need to store our CStrings in an intermediate vector
269            // so that their pointers remain valid.
270            let c_cfs: Vec<CString> = cfs_v
271                .iter()
272                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
273                .collect();
274
275            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
276
277            // These handles will be populated by DB.
278            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
279
280            let cfopts: Vec<_> = cfs_v
281                .iter()
282                .map(|cf| cf.options.inner.cast_const())
283                .collect();
284
285            db = Self::open_cf_raw(
286                opts,
287                txn_db_opts,
288                &cpath,
289                &cfs_v,
290                &cfnames,
291                &cfopts,
292                &mut cfhandles,
293            )?;
294
295            for handle in &cfhandles {
296                if handle.is_null() {
297                    return Err(Error::new(
298                        "Received null column family handle from DB.".to_owned(),
299                    ));
300                }
301            }
302
303            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
304                cf_map.insert(cf_desc.name.clone(), inner);
305            }
306        }
307
308        if db.is_null() {
309            return Err(Error::new("Could not initialize database.".to_owned()));
310        }
311
312        let prepared = unsafe {
313            let mut cnt = 0;
314            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
315            let mut vec = vec![std::ptr::null_mut(); cnt];
316            if !ptr.is_null() {
317                std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
318                ffi::rocksdb_free(ptr as *mut c_void);
319            }
320            vec
321        };
322
323        Ok(TransactionDB {
324            inner: db,
325            cfs: T::new_cf_map_internal(cf_map),
326            path: path.as_ref().to_path_buf(),
327            prepared: Mutex::new(prepared),
328            _outlive: outlive,
329        })
330    }
331
332    fn open_raw(
333        opts: &Options,
334        txn_db_opts: &TransactionDBOptions,
335        cpath: &CString,
336    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
337        unsafe {
338            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
339                opts.inner,
340                txn_db_opts.inner,
341                cpath.as_ptr()
342            ));
343            Ok(db)
344        }
345    }
346
347    fn open_cf_raw(
348        opts: &Options,
349        txn_db_opts: &TransactionDBOptions,
350        cpath: &CString,
351        cfs_v: &[ColumnFamilyDescriptor],
352        cfnames: &[*const c_char],
353        cfopts: &[*const ffi::rocksdb_options_t],
354        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
355    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
356        unsafe {
357            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
358                opts.inner,
359                txn_db_opts.inner,
360                cpath.as_ptr(),
361                cfs_v.len() as c_int,
362                cfnames.as_ptr(),
363                cfopts.as_ptr(),
364                cfhandles.as_mut_ptr(),
365            ));
366            Ok(db)
367        }
368    }
369
370    fn create_inner_cf_handle(
371        &self,
372        name: &str,
373        opts: &Options,
374    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
375        let cf_name = CString::new(name.as_bytes()).map_err(|_| {
376            Error::new("Failed to convert path to CString when creating cf".to_owned())
377        })?;
378
379        Ok(unsafe {
380            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
381                self.inner,
382                opts.inner,
383                cf_name.as_ptr(),
384            ))
385        })
386    }
387
388    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
389        DB::list_cf(opts, path)
390    }
391
392    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
393        DB::destroy(opts, path)
394    }
395
396    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
397        DB::repair(opts, path)
398    }
399
400    pub fn path(&self) -> &Path {
401        self.path.as_path()
402    }
403
404    /// Creates a transaction with default options.
405    pub fn transaction(&'_ self) -> Transaction<'_, Self> {
406        self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
407    }
408
409    /// Creates a transaction with options.
410    pub fn transaction_opt<'a>(
411        &'a self,
412        write_opts: &WriteOptions,
413        txn_opts: &TransactionOptions,
414    ) -> Transaction<'a, Self> {
415        Transaction {
416            inner: unsafe {
417                ffi::rocksdb_transaction_begin(
418                    self.inner,
419                    write_opts.inner,
420                    txn_opts.inner,
421                    std::ptr::null_mut(),
422                )
423            },
424            _marker: PhantomData,
425        }
426    }
427
428    /// Get all prepared transactions for recovery.
429    ///
430    /// This function is expected to call once after open database.
431    /// User should commit or rollback all transactions before start other transactions.
432    pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
433        self.prepared
434            .lock()
435            .unwrap()
436            .drain(0..)
437            .map(|inner| Transaction {
438                inner,
439                _marker: PhantomData,
440            })
441            .collect()
442    }
443
444    /// Returns the bytes associated with a key value.
445    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
446        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
447    }
448
449    /// Returns the bytes associated with a key value and the given column family.
450    pub fn get_cf<K: AsRef<[u8]>>(
451        &self,
452        cf: &impl AsColumnFamilyRef,
453        key: K,
454    ) -> Result<Option<Vec<u8>>, Error> {
455        self.get_pinned_cf(cf, key)
456            .map(|x| x.map(|v| v.as_ref().to_vec()))
457    }
458
459    /// Returns the bytes associated with a key value with read options.
460    pub fn get_opt<K: AsRef<[u8]>>(
461        &self,
462        key: K,
463        readopts: &ReadOptions,
464    ) -> Result<Option<Vec<u8>>, Error> {
465        self.get_pinned_opt(key, readopts)
466            .map(|x| x.map(|v| v.as_ref().to_vec()))
467    }
468
469    /// Returns the bytes associated with a key value and the given column family with read options.
470    pub fn get_cf_opt<K: AsRef<[u8]>>(
471        &self,
472        cf: &impl AsColumnFamilyRef,
473        key: K,
474        readopts: &ReadOptions,
475    ) -> Result<Option<Vec<u8>>, Error> {
476        self.get_pinned_cf_opt(cf, key, readopts)
477            .map(|x| x.map(|v| v.as_ref().to_vec()))
478    }
479
480    pub fn get_pinned<K: AsRef<[u8]>>(
481        &'_ self,
482        key: K,
483    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
484        self.get_pinned_opt(key, &ReadOptions::default())
485    }
486
487    /// Returns the bytes associated with a key value and the given column family.
488    pub fn get_pinned_cf<K: AsRef<[u8]>>(
489        &'_ self,
490        cf: &impl AsColumnFamilyRef,
491        key: K,
492    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
493        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
494    }
495
496    /// Returns the bytes associated with a key value with read options.
497    pub fn get_pinned_opt<K: AsRef<[u8]>>(
498        &'_ self,
499        key: K,
500        readopts: &ReadOptions,
501    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
502        let key = key.as_ref();
503        unsafe {
504            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
505                self.inner,
506                readopts.inner,
507                key.as_ptr() as *const c_char,
508                key.len() as size_t,
509            ));
510            if val.is_null() {
511                Ok(None)
512            } else {
513                Ok(Some(DBPinnableSlice::from_c(val)))
514            }
515        }
516    }
517
518    /// Returns the bytes associated with a key value and the given column family with read options.
519    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
520        &'_ self,
521        cf: &impl AsColumnFamilyRef,
522        key: K,
523        readopts: &ReadOptions,
524    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
525        let key = key.as_ref();
526        unsafe {
527            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
528                self.inner,
529                readopts.inner,
530                cf.inner(),
531                key.as_ptr() as *const c_char,
532                key.len() as size_t,
533            ));
534            if val.is_null() {
535                Ok(None)
536            } else {
537                Ok(Some(DBPinnableSlice::from_c(val)))
538            }
539        }
540    }
541
542    /// Return the values associated with the given keys.
543    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
544    where
545        K: AsRef<[u8]>,
546        I: IntoIterator<Item = K>,
547    {
548        self.multi_get_opt(keys, &ReadOptions::default())
549    }
550
551    /// Return the values associated with the given keys using read options.
552    pub fn multi_get_opt<K, I>(
553        &self,
554        keys: I,
555        readopts: &ReadOptions,
556    ) -> Vec<Result<Option<Vec<u8>>, Error>>
557    where
558        K: AsRef<[u8]>,
559        I: IntoIterator<Item = K>,
560    {
561        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
562            .into_iter()
563            .map(|key| {
564                let key = key.as_ref();
565                (Box::from(key), key.len())
566            })
567            .unzip();
568        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
569
570        let mut values = vec![ptr::null_mut(); keys.len()];
571        let mut values_sizes = vec![0_usize; keys.len()];
572        let mut errors = vec![ptr::null_mut(); keys.len()];
573        unsafe {
574            ffi::rocksdb_transactiondb_multi_get(
575                self.inner,
576                readopts.inner,
577                ptr_keys.len(),
578                ptr_keys.as_ptr(),
579                keys_sizes.as_ptr(),
580                values.as_mut_ptr(),
581                values_sizes.as_mut_ptr(),
582                errors.as_mut_ptr(),
583            );
584        }
585
586        convert_values(values, values_sizes, errors)
587    }
588
589    /// Return the values associated with the given keys and column families.
590    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
591        &'a self,
592        keys: I,
593    ) -> Vec<Result<Option<Vec<u8>>, Error>>
594    where
595        K: AsRef<[u8]>,
596        I: IntoIterator<Item = (&'b W, K)>,
597        W: 'b + AsColumnFamilyRef,
598    {
599        self.multi_get_cf_opt(keys, &ReadOptions::default())
600    }
601
602    /// Return the values associated with the given keys and column families using read options.
603    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
604        &'a self,
605        keys: I,
606        readopts: &ReadOptions,
607    ) -> Vec<Result<Option<Vec<u8>>, Error>>
608    where
609        K: AsRef<[u8]>,
610        I: IntoIterator<Item = (&'b W, K)>,
611        W: 'b + AsColumnFamilyRef,
612    {
613        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
614            .into_iter()
615            .map(|(cf, key)| {
616                let key = key.as_ref();
617                ((cf, Box::from(key)), key.len())
618            })
619            .unzip();
620        let ptr_keys: Vec<_> = cfs_and_keys
621            .iter()
622            .map(|(_, k)| k.as_ptr() as *const c_char)
623            .collect();
624        let ptr_cfs: Vec<_> = cfs_and_keys
625            .iter()
626            .map(|(c, _)| c.inner().cast_const())
627            .collect();
628
629        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
630        let mut values_sizes = vec![0_usize; ptr_keys.len()];
631        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
632        unsafe {
633            ffi::rocksdb_transactiondb_multi_get_cf(
634                self.inner,
635                readopts.inner,
636                ptr_cfs.as_ptr(),
637                ptr_keys.len(),
638                ptr_keys.as_ptr(),
639                keys_sizes.as_ptr(),
640                values.as_mut_ptr(),
641                values_sizes.as_mut_ptr(),
642                errors.as_mut_ptr(),
643            );
644        }
645
646        convert_values(values, values_sizes, errors)
647    }
648
649    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
650    where
651        K: AsRef<[u8]>,
652        V: AsRef<[u8]>,
653    {
654        self.put_opt(key, value, &WriteOptions::default())
655    }
656
657    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
658    where
659        K: AsRef<[u8]>,
660        V: AsRef<[u8]>,
661    {
662        self.put_cf_opt(cf, key, value, &WriteOptions::default())
663    }
664
665    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
666    where
667        K: AsRef<[u8]>,
668        V: AsRef<[u8]>,
669    {
670        let key = key.as_ref();
671        let value = value.as_ref();
672        unsafe {
673            ffi_try!(ffi::rocksdb_transactiondb_put(
674                self.inner,
675                writeopts.inner,
676                key.as_ptr() as *const c_char,
677                key.len() as size_t,
678                value.as_ptr() as *const c_char,
679                value.len() as size_t
680            ));
681        }
682        Ok(())
683    }
684
685    pub fn put_cf_opt<K, V>(
686        &self,
687        cf: &impl AsColumnFamilyRef,
688        key: K,
689        value: V,
690        writeopts: &WriteOptions,
691    ) -> Result<(), Error>
692    where
693        K: AsRef<[u8]>,
694        V: AsRef<[u8]>,
695    {
696        let key = key.as_ref();
697        let value = value.as_ref();
698        unsafe {
699            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
700                self.inner,
701                writeopts.inner,
702                cf.inner(),
703                key.as_ptr() as *const c_char,
704                key.len() as size_t,
705                value.as_ptr() as *const c_char,
706                value.len() as size_t
707            ));
708        }
709        Ok(())
710    }
711
712    pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
713        self.write_opt(batch, &WriteOptions::default())
714    }
715
716    pub fn write_opt(
717        &self,
718        batch: &WriteBatchWithTransaction<true>,
719        writeopts: &WriteOptions,
720    ) -> Result<(), Error> {
721        unsafe {
722            ffi_try!(ffi::rocksdb_transactiondb_write(
723                self.inner,
724                writeopts.inner,
725                batch.inner
726            ));
727        }
728        Ok(())
729    }
730
731    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
732    where
733        K: AsRef<[u8]>,
734        V: AsRef<[u8]>,
735    {
736        self.merge_opt(key, value, &WriteOptions::default())
737    }
738
739    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
740    where
741        K: AsRef<[u8]>,
742        V: AsRef<[u8]>,
743    {
744        self.merge_cf_opt(cf, key, value, &WriteOptions::default())
745    }
746
747    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
748    where
749        K: AsRef<[u8]>,
750        V: AsRef<[u8]>,
751    {
752        let key = key.as_ref();
753        let value = value.as_ref();
754        unsafe {
755            ffi_try!(ffi::rocksdb_transactiondb_merge(
756                self.inner,
757                writeopts.inner,
758                key.as_ptr() as *const c_char,
759                key.len() as size_t,
760                value.as_ptr() as *const c_char,
761                value.len() as size_t,
762            ));
763            Ok(())
764        }
765    }
766
767    pub fn merge_cf_opt<K, V>(
768        &self,
769        cf: &impl AsColumnFamilyRef,
770        key: K,
771        value: V,
772        writeopts: &WriteOptions,
773    ) -> Result<(), Error>
774    where
775        K: AsRef<[u8]>,
776        V: AsRef<[u8]>,
777    {
778        let key = key.as_ref();
779        let value = value.as_ref();
780        unsafe {
781            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
782                self.inner,
783                writeopts.inner,
784                cf.inner(),
785                key.as_ptr() as *const c_char,
786                key.len() as size_t,
787                value.as_ptr() as *const c_char,
788                value.len() as size_t,
789            ));
790            Ok(())
791        }
792    }
793
794    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
795        self.delete_opt(key, &WriteOptions::default())
796    }
797
798    pub fn delete_cf<K: AsRef<[u8]>>(
799        &self,
800        cf: &impl AsColumnFamilyRef,
801        key: K,
802    ) -> Result<(), Error> {
803        self.delete_cf_opt(cf, key, &WriteOptions::default())
804    }
805
806    pub fn delete_opt<K: AsRef<[u8]>>(
807        &self,
808        key: K,
809        writeopts: &WriteOptions,
810    ) -> Result<(), Error> {
811        let key = key.as_ref();
812        unsafe {
813            ffi_try!(ffi::rocksdb_transactiondb_delete(
814                self.inner,
815                writeopts.inner,
816                key.as_ptr() as *const c_char,
817                key.len() as size_t,
818            ));
819        }
820        Ok(())
821    }
822
823    pub fn delete_cf_opt<K: AsRef<[u8]>>(
824        &self,
825        cf: &impl AsColumnFamilyRef,
826        key: K,
827        writeopts: &WriteOptions,
828    ) -> Result<(), Error> {
829        let key = key.as_ref();
830        unsafe {
831            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
832                self.inner,
833                writeopts.inner,
834                cf.inner(),
835                key.as_ptr() as *const c_char,
836                key.len() as size_t,
837            ));
838        }
839        Ok(())
840    }
841
842    pub fn iterator<'a: 'b, 'b>(
843        &'a self,
844        mode: IteratorMode,
845    ) -> DBIteratorWithThreadMode<'b, Self> {
846        let readopts = ReadOptions::default();
847        self.iterator_opt(mode, readopts)
848    }
849
850    pub fn iterator_opt<'a: 'b, 'b>(
851        &'a self,
852        mode: IteratorMode,
853        readopts: ReadOptions,
854    ) -> DBIteratorWithThreadMode<'b, Self> {
855        DBIteratorWithThreadMode::new(self, readopts, mode)
856    }
857
858    /// Opens an iterator using the provided ReadOptions.
859    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
860    pub fn iterator_cf_opt<'a: 'b, 'b>(
861        &'a self,
862        cf_handle: &impl AsColumnFamilyRef,
863        readopts: ReadOptions,
864        mode: IteratorMode,
865    ) -> DBIteratorWithThreadMode<'b, Self> {
866        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
867    }
868
869    /// Opens an iterator with `set_total_order_seek` enabled.
870    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
871    /// with a Hash-based implementation.
872    pub fn full_iterator<'a: 'b, 'b>(
873        &'a self,
874        mode: IteratorMode,
875    ) -> DBIteratorWithThreadMode<'b, Self> {
876        let mut opts = ReadOptions::default();
877        opts.set_total_order_seek(true);
878        DBIteratorWithThreadMode::new(self, opts, mode)
879    }
880
881    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
882        &'a self,
883        prefix: P,
884    ) -> DBIteratorWithThreadMode<'b, Self> {
885        let mut opts = ReadOptions::default();
886        opts.set_prefix_same_as_start(true);
887        DBIteratorWithThreadMode::new(
888            self,
889            opts,
890            IteratorMode::From(prefix.as_ref(), Direction::Forward),
891        )
892    }
893
894    pub fn iterator_cf<'a: 'b, 'b>(
895        &'a self,
896        cf_handle: &impl AsColumnFamilyRef,
897        mode: IteratorMode,
898    ) -> DBIteratorWithThreadMode<'b, Self> {
899        let opts = ReadOptions::default();
900        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
901    }
902
903    pub fn full_iterator_cf<'a: 'b, 'b>(
904        &'a self,
905        cf_handle: &impl AsColumnFamilyRef,
906        mode: IteratorMode,
907    ) -> DBIteratorWithThreadMode<'b, Self> {
908        let mut opts = ReadOptions::default();
909        opts.set_total_order_seek(true);
910        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
911    }
912
913    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
914        &'a self,
915        cf_handle: &impl AsColumnFamilyRef,
916        prefix: P,
917    ) -> DBIteratorWithThreadMode<'a, Self> {
918        let mut opts = ReadOptions::default();
919        opts.set_prefix_same_as_start(true);
920        DBIteratorWithThreadMode::<'a, Self>::new_cf(
921            self,
922            cf_handle.inner(),
923            opts,
924            IteratorMode::From(prefix.as_ref(), Direction::Forward),
925        )
926    }
927
928    /// Opens a raw iterator over the database, using the default read options
929    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
930        let opts = ReadOptions::default();
931        DBRawIteratorWithThreadMode::new(self, opts)
932    }
933
934    /// Opens a raw iterator over the given column family, using the default read options
935    pub fn raw_iterator_cf<'a: 'b, 'b>(
936        &'a self,
937        cf_handle: &impl AsColumnFamilyRef,
938    ) -> DBRawIteratorWithThreadMode<'b, Self> {
939        let opts = ReadOptions::default();
940        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
941    }
942
943    /// Opens a raw iterator over the database, using the given read options
944    pub fn raw_iterator_opt<'a: 'b, 'b>(
945        &'a self,
946        readopts: ReadOptions,
947    ) -> DBRawIteratorWithThreadMode<'b, Self> {
948        DBRawIteratorWithThreadMode::new(self, readopts)
949    }
950
951    /// Opens a raw iterator over the given column family, using the given read options
952    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
953        &'a self,
954        cf_handle: &impl AsColumnFamilyRef,
955        readopts: ReadOptions,
956    ) -> DBRawIteratorWithThreadMode<'b, Self> {
957        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
958    }
959
960    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
961        SnapshotWithThreadMode::<Self>::new(self)
962    }
963
964    fn drop_column_family<C>(
965        &self,
966        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
967        _cf: C,
968    ) -> Result<(), Error> {
969        unsafe {
970            // first mark the column family as dropped
971            ffi_try!(ffi::rocksdb_drop_column_family(
972                self.inner as *mut ffi::rocksdb_t,
973                cf_inner
974            ));
975        }
976        // Since `_cf` is dropped here, the column family handle is destroyed
977        // and any resources (mem, files) are reclaimed.
978        Ok(())
979    }
980}
981
982impl TransactionDB<SingleThreaded> {
983    /// Creates column family with given name and options.
984    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
985        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
986        self.cfs
987            .cfs
988            .insert(name.as_ref().to_string(), ColumnFamily { inner });
989        Ok(())
990    }
991
992    /// Returns the underlying column family handle.
993    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
994        self.cfs.cfs.get(name)
995    }
996
997    /// Drops the column family with the given name
998    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
999        match self.cfs.cfs.remove(name) {
1000            Some(cf) => self.drop_column_family(cf.inner, cf),
1001            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1002        }
1003    }
1004}
1005
1006impl TransactionDB<MultiThreaded> {
1007    /// Creates column family with given name and options.
1008    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1009        // Note that we acquire the cfs lock before inserting: otherwise we might race
1010        // another caller who observed the handle as missing.
1011        let mut cfs = self.cfs.cfs.write();
1012        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1013        cfs.insert(
1014            name.as_ref().to_string(),
1015            Arc::new(UnboundColumnFamily { inner }),
1016        );
1017        Ok(())
1018    }
1019
1020    /// Returns the underlying column family handle.
1021    pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1022        self.cfs
1023            .cfs
1024            .read()
1025            .get(name)
1026            .cloned()
1027            .map(UnboundColumnFamily::bound_column_family)
1028    }
1029
1030    /// Drops the column family with the given name by internally locking the inner column
1031    /// family map. This avoids needing `&mut self` reference
1032    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1033        match self.cfs.cfs.write().remove(name) {
1034            Some(cf) => self.drop_column_family(cf.inner, cf),
1035            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1036        }
1037    }
1038
1039    /// Implementation for property_value et al methods.
1040    ///
1041    /// `name` is the name of the property.  It will be converted into a CString
1042    /// and passed to `get_property` as argument.  `get_property` reads the
1043    /// specified property and either returns NULL or a pointer to a C allocated
1044    /// string; this method takes ownership of that string and will free it at
1045    /// the end. That string is parsed using `parse` callback which produces
1046    /// the returned result.
1047    fn property_value_impl<R>(
1048        name: impl CStrLike,
1049        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1050        parse: impl FnOnce(&str) -> Result<R, Error>,
1051    ) -> Result<Option<R>, Error> {
1052        let value = match name.bake() {
1053            Ok(prop_name) => get_property(prop_name.as_ptr()),
1054            Err(e) => {
1055                return Err(Error::new(format!(
1056                    "Failed to convert property name to CString: {e}"
1057                )));
1058            }
1059        };
1060        if value.is_null() {
1061            return Ok(None);
1062        }
1063        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1064            Ok(s) => parse(s).map(|value| Some(value)),
1065            Err(e) => Err(Error::new(format!(
1066                "Failed to convert property value to string: {e}"
1067            ))),
1068        };
1069        unsafe {
1070            ffi::rocksdb_free(value as *mut c_void);
1071        }
1072        result
1073    }
1074
1075    /// Retrieves a RocksDB property by name.
1076    ///
1077    /// Full list of properties could be find
1078    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1079    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1080        Self::property_value_impl(
1081            name,
1082            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1083            |str_value| Ok(str_value.to_owned()),
1084        )
1085    }
1086
1087    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1088        value.parse::<u64>().map_err(|err| {
1089            Error::new(format!(
1090                "Failed to convert property value {value} to int: {err}"
1091            ))
1092        })
1093    }
1094
1095    /// Retrieves a RocksDB property and casts it to an integer.
1096    ///
1097    /// Full list of properties that return int values could be find
1098    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1099    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1100        Self::property_value_impl(
1101            name,
1102            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1103            Self::parse_property_int_value,
1104        )
1105    }
1106}
1107
1108impl<T: ThreadMode> Drop for TransactionDB<T> {
1109    fn drop(&mut self) {
1110        unsafe {
1111            self.prepared_transactions().clear();
1112            self.cfs.drop_all_cfs_internal();
1113            ffi::rocksdb_transactiondb_close(self.inner);
1114        }
1115    }
1116}