rust_rocksdb/transactions/
transaction_db.rs

1// Copyright 2021 Yiyuan Liu
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::{
17    collections::BTreeMap,
18    ffi::CString,
19    fs, iter,
20    marker::PhantomData,
21    path::{Path, PathBuf},
22    ptr,
23    sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31    column_family::UnboundColumnFamily,
32    db::{convert_values, DBAccess},
33    db_options::OptionsMustOutliveDB,
34    ffi,
35    ffi_util::to_cpath,
36    AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37    DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38    IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39    ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40    WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
45// Default options are kept per-thread to avoid re-allocating on every call while
46// also preventing cross-thread sharing. Some RocksDB option wrappers hold
47// pointers into internal buffers and are not safe to share across threads.
48// Using thread_local allows cheap reuse in the common "default options" path
49// without synchronization overhead. Callers who need non-defaults must pass
50// explicit options.
51thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
52thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
53
54#[cfg(not(feature = "multi-threaded-cf"))]
55type DefaultThreadMode = crate::SingleThreaded;
56#[cfg(feature = "multi-threaded-cf")]
57type DefaultThreadMode = crate::MultiThreaded;
58
59/// RocksDB TransactionDB.
60///
61/// Please read the official [guide](https://github.com/facebook/rocksdb/wiki/Transactions)
62/// to learn more about RocksDB TransactionDB.
63///
64/// The default thread mode for [`TransactionDB`] is [`SingleThreaded`]
65/// if feature `multi-threaded-cf` is not enabled.
66///
67/// ```
68/// use rust_rocksdb::{DB, Options, TransactionDB, SingleThreaded};
69/// let tempdir = tempfile::Builder::new()
70///     .prefix("_path_for_transaction_db")
71///     .tempdir()
72///     .expect("Failed to create temporary path for the _path_for_transaction_db");
73/// let path = tempdir.path();
74/// {
75///     let db: TransactionDB = TransactionDB::open_default(path).unwrap();
76///     db.put(b"my key", b"my value").unwrap();
77///
78///     // create transaction
79///     let txn = db.transaction();
80///     txn.put(b"key2", b"value2");
81///     txn.put(b"key3", b"value3");
82///     txn.commit().unwrap();
83/// }
84/// let _ = DB::destroy(&Options::default(), path);
85/// ```
86///
87/// [`SingleThreaded`]: crate::SingleThreaded
88pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
89    pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
90    cfs: T,
91    path: PathBuf,
92    // prepared 2pc transactions.
93    prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
94    _outlive: Vec<OptionsMustOutliveDB>,
95}
96
97unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
98unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
99
100impl<T: ThreadMode> DBAccess for TransactionDB<T> {
101    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
102        unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
103    }
104
105    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
106        unsafe {
107            ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
108        }
109    }
110
111    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
112        unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
113    }
114
115    unsafe fn create_iterator_cf(
116        &self,
117        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
118        readopts: &ReadOptions,
119    ) -> *mut ffi::rocksdb_iterator_t {
120        unsafe {
121            ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
122        }
123    }
124
125    fn get_opt<K: AsRef<[u8]>>(
126        &self,
127        key: K,
128        readopts: &ReadOptions,
129    ) -> Result<Option<Vec<u8>>, Error> {
130        self.get_opt(key, readopts)
131    }
132
133    fn get_cf_opt<K: AsRef<[u8]>>(
134        &self,
135        cf: &impl AsColumnFamilyRef,
136        key: K,
137        readopts: &ReadOptions,
138    ) -> Result<Option<Vec<u8>>, Error> {
139        self.get_cf_opt(cf, key, readopts)
140    }
141
142    fn get_pinned_opt<K: AsRef<[u8]>>(
143        &'_ self,
144        key: K,
145        readopts: &ReadOptions,
146    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
147        self.get_pinned_opt(key, readopts)
148    }
149
150    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
151        &'_ self,
152        cf: &impl AsColumnFamilyRef,
153        key: K,
154        readopts: &ReadOptions,
155    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
156        self.get_pinned_cf_opt(cf, key, readopts)
157    }
158
159    fn multi_get_opt<K, I>(
160        &self,
161        keys: I,
162        readopts: &ReadOptions,
163    ) -> Vec<Result<Option<Vec<u8>>, Error>>
164    where
165        K: AsRef<[u8]>,
166        I: IntoIterator<Item = K>,
167    {
168        self.multi_get_opt(keys, readopts)
169    }
170
171    fn multi_get_cf_opt<'b, K, I, W>(
172        &self,
173        keys_cf: I,
174        readopts: &ReadOptions,
175    ) -> Vec<Result<Option<Vec<u8>>, Error>>
176    where
177        K: AsRef<[u8]>,
178        I: IntoIterator<Item = (&'b W, K)>,
179        W: AsColumnFamilyRef + 'b,
180    {
181        self.multi_get_cf_opt(keys_cf, readopts)
182    }
183}
184
185impl<T: ThreadMode> TransactionDB<T> {
186    /// Opens a database with default options.
187    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
188        let mut opts = Options::default();
189        opts.create_if_missing(true);
190        let txn_db_opts = TransactionDBOptions::default();
191        Self::open(&opts, &txn_db_opts, path)
192    }
193
194    /// Opens the database with the specified options.
195    pub fn open<P: AsRef<Path>>(
196        opts: &Options,
197        txn_db_opts: &TransactionDBOptions,
198        path: P,
199    ) -> Result<Self, Error> {
200        Self::open_cf(opts, txn_db_opts, path, None::<&str>)
201    }
202
203    /// Opens a database with the given database options and column family names.
204    ///
205    /// Column families opened using this function will be created with default `Options`.
206    pub fn open_cf<P, I, N>(
207        opts: &Options,
208        txn_db_opts: &TransactionDBOptions,
209        path: P,
210        cfs: I,
211    ) -> Result<Self, Error>
212    where
213        P: AsRef<Path>,
214        I: IntoIterator<Item = N>,
215        N: AsRef<str>,
216    {
217        let cfs = cfs
218            .into_iter()
219            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
220
221        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
222    }
223
224    /// Opens a database with the given database options and column family descriptors.
225    pub fn open_cf_descriptors<P, I>(
226        opts: &Options,
227        txn_db_opts: &TransactionDBOptions,
228        path: P,
229        cfs: I,
230    ) -> Result<Self, Error>
231    where
232        P: AsRef<Path>,
233        I: IntoIterator<Item = ColumnFamilyDescriptor>,
234    {
235        Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
236    }
237
238    /// Internal implementation for opening RocksDB.
239    fn open_cf_descriptors_internal<P, I>(
240        opts: &Options,
241        txn_db_opts: &TransactionDBOptions,
242        path: P,
243        cfs: I,
244    ) -> Result<Self, Error>
245    where
246        P: AsRef<Path>,
247        I: IntoIterator<Item = ColumnFamilyDescriptor>,
248    {
249        let cfs: Vec<_> = cfs.into_iter().collect();
250        let outlive = iter::once(opts.outlive.clone())
251            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
252            .collect();
253
254        let cpath = to_cpath(&path)?;
255
256        if let Err(e) = fs::create_dir_all(&path) {
257            return Err(Error::new(format!(
258                "Failed to create RocksDB directory: `{e:?}`."
259            )));
260        }
261
262        let db: *mut ffi::rocksdb_transactiondb_t;
263        let mut cf_map = BTreeMap::new();
264
265        if cfs.is_empty() {
266            db = Self::open_raw(opts, txn_db_opts, &cpath)?;
267        } else {
268            let mut cfs_v = cfs;
269            // Always open the default column family.
270            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
271                cfs_v.push(ColumnFamilyDescriptor {
272                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
273                    options: Options::default(),
274                    ttl: ColumnFamilyTtl::SameAsDb, // it will have ttl specified in `DBWithThreadMode::open_with_ttl`
275                });
276            }
277            // We need to store our CStrings in an intermediate vector
278            // so that their pointers remain valid.
279            let c_cfs: Vec<CString> = cfs_v
280                .iter()
281                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
282                .collect();
283
284            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
285
286            // These handles will be populated by DB.
287            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
288
289            let cfopts: Vec<_> = cfs_v
290                .iter()
291                .map(|cf| cf.options.inner.cast_const())
292                .collect();
293
294            db = Self::open_cf_raw(
295                opts,
296                txn_db_opts,
297                &cpath,
298                &cfs_v,
299                &cfnames,
300                &cfopts,
301                &mut cfhandles,
302            )?;
303
304            for handle in &cfhandles {
305                if handle.is_null() {
306                    return Err(Error::new(
307                        "Received null column family handle from DB.".to_owned(),
308                    ));
309                }
310            }
311
312            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
313                cf_map.insert(cf_desc.name.clone(), inner);
314            }
315        }
316
317        if db.is_null() {
318            return Err(Error::new("Could not initialize database.".to_owned()));
319        }
320
321        let prepared = unsafe {
322            let mut cnt = 0;
323            let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
324            let mut vec = vec![std::ptr::null_mut(); cnt];
325            if !ptr.is_null() {
326                std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
327                ffi::rocksdb_free(ptr as *mut c_void);
328            }
329            vec
330        };
331
332        Ok(TransactionDB {
333            inner: db,
334            cfs: T::new_cf_map_internal(cf_map),
335            path: path.as_ref().to_path_buf(),
336            prepared: Mutex::new(prepared),
337            _outlive: outlive,
338        })
339    }
340
341    fn open_raw(
342        opts: &Options,
343        txn_db_opts: &TransactionDBOptions,
344        cpath: &CString,
345    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
346        unsafe {
347            let db = ffi_try!(ffi::rocksdb_transactiondb_open(
348                opts.inner,
349                txn_db_opts.inner,
350                cpath.as_ptr()
351            ));
352            Ok(db)
353        }
354    }
355
356    fn open_cf_raw(
357        opts: &Options,
358        txn_db_opts: &TransactionDBOptions,
359        cpath: &CString,
360        cfs_v: &[ColumnFamilyDescriptor],
361        cfnames: &[*const c_char],
362        cfopts: &[*const ffi::rocksdb_options_t],
363        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
364    ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
365        unsafe {
366            let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
367                opts.inner,
368                txn_db_opts.inner,
369                cpath.as_ptr(),
370                cfs_v.len() as c_int,
371                cfnames.as_ptr(),
372                cfopts.as_ptr(),
373                cfhandles.as_mut_ptr(),
374            ));
375            Ok(db)
376        }
377    }
378
379    fn create_inner_cf_handle(
380        &self,
381        name: &str,
382        opts: &Options,
383    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
384        let cf_name = CString::new(name.as_bytes()).map_err(|_| {
385            Error::new("Failed to convert path to CString when creating cf".to_owned())
386        })?;
387
388        Ok(unsafe {
389            ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
390                self.inner,
391                opts.inner,
392                cf_name.as_ptr(),
393            ))
394        })
395    }
396
397    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
398        DB::list_cf(opts, path)
399    }
400
401    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
402        DB::destroy(opts, path)
403    }
404
405    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
406        DB::repair(opts, path)
407    }
408
409    pub fn path(&self) -> &Path {
410        self.path.as_path()
411    }
412
413    /// Creates a transaction with default options.
414    pub fn transaction(&'_ self) -> Transaction<'_, Self> {
415        DEFAULT_WRITE_OPTS.with(|opts| self.transaction_opt(opts, &TransactionOptions::default()))
416    }
417
418    /// Creates a transaction with options.
419    pub fn transaction_opt<'a>(
420        &'a self,
421        write_opts: &WriteOptions,
422        txn_opts: &TransactionOptions,
423    ) -> Transaction<'a, Self> {
424        Transaction {
425            inner: unsafe {
426                ffi::rocksdb_transaction_begin(
427                    self.inner,
428                    write_opts.inner,
429                    txn_opts.inner,
430                    std::ptr::null_mut(),
431                )
432            },
433            _marker: PhantomData,
434        }
435    }
436
437    /// Get all prepared transactions for recovery.
438    ///
439    /// This function is expected to call once after open database.
440    /// User should commit or rollback all transactions before start other transactions.
441    pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
442        self.prepared
443            .lock()
444            .unwrap()
445            .drain(0..)
446            .map(|inner| Transaction {
447                inner,
448                _marker: PhantomData,
449            })
450            .collect()
451    }
452
453    /// Returns the bytes associated with a key value.
454    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
455        self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
456    }
457
458    /// Returns the bytes associated with a key value and the given column family.
459    pub fn get_cf<K: AsRef<[u8]>>(
460        &self,
461        cf: &impl AsColumnFamilyRef,
462        key: K,
463    ) -> Result<Option<Vec<u8>>, Error> {
464        self.get_pinned_cf(cf, key)
465            .map(|x| x.map(|v| v.as_ref().to_vec()))
466    }
467
468    /// Returns the bytes associated with a key value with read options.
469    pub fn get_opt<K: AsRef<[u8]>>(
470        &self,
471        key: K,
472        readopts: &ReadOptions,
473    ) -> Result<Option<Vec<u8>>, Error> {
474        self.get_pinned_opt(key, readopts)
475            .map(|x| x.map(|v| v.as_ref().to_vec()))
476    }
477
478    /// Returns the bytes associated with a key value and the given column family with read options.
479    pub fn get_cf_opt<K: AsRef<[u8]>>(
480        &self,
481        cf: &impl AsColumnFamilyRef,
482        key: K,
483        readopts: &ReadOptions,
484    ) -> Result<Option<Vec<u8>>, Error> {
485        self.get_pinned_cf_opt(cf, key, readopts)
486            .map(|x| x.map(|v| v.as_ref().to_vec()))
487    }
488
489    pub fn get_pinned<K: AsRef<[u8]>>(
490        &'_ self,
491        key: K,
492    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
493        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
494    }
495
496    /// Returns the bytes associated with a key value and the given column family.
497    pub fn get_pinned_cf<K: AsRef<[u8]>>(
498        &'_ self,
499        cf: &impl AsColumnFamilyRef,
500        key: K,
501    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
502        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
503    }
504
505    /// Returns the bytes associated with a key value with read options.
506    pub fn get_pinned_opt<K: AsRef<[u8]>>(
507        &'_ self,
508        key: K,
509        readopts: &ReadOptions,
510    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
511        let key = key.as_ref();
512        unsafe {
513            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
514                self.inner,
515                readopts.inner,
516                key.as_ptr() as *const c_char,
517                key.len() as size_t,
518            ));
519            if val.is_null() {
520                Ok(None)
521            } else {
522                Ok(Some(DBPinnableSlice::from_c(val)))
523            }
524        }
525    }
526
527    /// Returns the bytes associated with a key value and the given column family with read options.
528    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
529        &'_ self,
530        cf: &impl AsColumnFamilyRef,
531        key: K,
532        readopts: &ReadOptions,
533    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
534        let key = key.as_ref();
535        unsafe {
536            let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
537                self.inner,
538                readopts.inner,
539                cf.inner(),
540                key.as_ptr() as *const c_char,
541                key.len() as size_t,
542            ));
543            if val.is_null() {
544                Ok(None)
545            } else {
546                Ok(Some(DBPinnableSlice::from_c(val)))
547            }
548        }
549    }
550
551    /// Return the values associated with the given keys.
552    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
553    where
554        K: AsRef<[u8]>,
555        I: IntoIterator<Item = K>,
556    {
557        DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
558    }
559
560    /// Return the values associated with the given keys using read options.
561    pub fn multi_get_opt<K, I>(
562        &self,
563        keys: I,
564        readopts: &ReadOptions,
565    ) -> Vec<Result<Option<Vec<u8>>, Error>>
566    where
567        K: AsRef<[u8]>,
568        I: IntoIterator<Item = K>,
569    {
570        let owned_keys: Vec<K> = keys.into_iter().collect();
571        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
572        let ptr_keys: Vec<*const c_char> = owned_keys
573            .iter()
574            .map(|k| k.as_ref().as_ptr() as *const c_char)
575            .collect();
576
577        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
578        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
579        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
580        unsafe {
581            ffi::rocksdb_transactiondb_multi_get(
582                self.inner,
583                readopts.inner,
584                ptr_keys.len(),
585                ptr_keys.as_ptr(),
586                keys_sizes.as_ptr(),
587                values.as_mut_ptr(),
588                values_sizes.as_mut_ptr(),
589                errors.as_mut_ptr(),
590            );
591        }
592
593        unsafe {
594            values.set_len(ptr_keys.len());
595            values_sizes.set_len(ptr_keys.len());
596            errors.set_len(ptr_keys.len());
597        }
598
599        convert_values(values, values_sizes, errors)
600    }
601
602    /// Return the values associated with the given keys and column families.
603    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
604        &'a self,
605        keys: I,
606    ) -> Vec<Result<Option<Vec<u8>>, Error>>
607    where
608        K: AsRef<[u8]>,
609        I: IntoIterator<Item = (&'b W, K)>,
610        W: 'b + AsColumnFamilyRef,
611    {
612        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
613    }
614
615    /// Return the values associated with the given keys and column families using read options.
616    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
617        &'a self,
618        keys: I,
619        readopts: &ReadOptions,
620    ) -> Vec<Result<Option<Vec<u8>>, Error>>
621    where
622        K: AsRef<[u8]>,
623        I: IntoIterator<Item = (&'b W, K)>,
624        W: 'b + AsColumnFamilyRef,
625    {
626        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
627        let keys_sizes: Vec<usize> = cfs_and_owned_keys
628            .iter()
629            .map(|(_, k)| k.as_ref().len())
630            .collect();
631        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
632            .iter()
633            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
634            .collect();
635        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
636            .iter()
637            .map(|(c, _)| c.inner().cast_const())
638            .collect();
639        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
640        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
641        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
642        unsafe {
643            ffi::rocksdb_transactiondb_multi_get_cf(
644                self.inner,
645                readopts.inner,
646                ptr_cfs.as_ptr(),
647                ptr_keys.len(),
648                ptr_keys.as_ptr(),
649                keys_sizes.as_ptr(),
650                values.as_mut_ptr(),
651                values_sizes.as_mut_ptr(),
652                errors.as_mut_ptr(),
653            );
654        }
655
656        unsafe {
657            values.set_len(ptr_keys.len());
658            values_sizes.set_len(ptr_keys.len());
659            errors.set_len(ptr_keys.len());
660        }
661
662        convert_values(values, values_sizes, errors)
663    }
664
665    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
666    where
667        K: AsRef<[u8]>,
668        V: AsRef<[u8]>,
669    {
670        DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
671    }
672
673    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
674    where
675        K: AsRef<[u8]>,
676        V: AsRef<[u8]>,
677    {
678        DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
679    }
680
681    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
682    where
683        K: AsRef<[u8]>,
684        V: AsRef<[u8]>,
685    {
686        let key = key.as_ref();
687        let value = value.as_ref();
688        unsafe {
689            ffi_try!(ffi::rocksdb_transactiondb_put(
690                self.inner,
691                writeopts.inner,
692                key.as_ptr() as *const c_char,
693                key.len() as size_t,
694                value.as_ptr() as *const c_char,
695                value.len() as size_t
696            ));
697        }
698        Ok(())
699    }
700
701    pub fn put_cf_opt<K, V>(
702        &self,
703        cf: &impl AsColumnFamilyRef,
704        key: K,
705        value: V,
706        writeopts: &WriteOptions,
707    ) -> Result<(), Error>
708    where
709        K: AsRef<[u8]>,
710        V: AsRef<[u8]>,
711    {
712        let key = key.as_ref();
713        let value = value.as_ref();
714        unsafe {
715            ffi_try!(ffi::rocksdb_transactiondb_put_cf(
716                self.inner,
717                writeopts.inner,
718                cf.inner(),
719                key.as_ptr() as *const c_char,
720                key.len() as size_t,
721                value.as_ptr() as *const c_char,
722                value.len() as size_t
723            ));
724        }
725        Ok(())
726    }
727
728    pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
729        DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
730    }
731
732    pub fn write_opt(
733        &self,
734        batch: &WriteBatchWithTransaction<true>,
735        writeopts: &WriteOptions,
736    ) -> Result<(), Error> {
737        unsafe {
738            ffi_try!(ffi::rocksdb_transactiondb_write(
739                self.inner,
740                writeopts.inner,
741                batch.inner
742            ));
743        }
744        Ok(())
745    }
746
747    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
748    where
749        K: AsRef<[u8]>,
750        V: AsRef<[u8]>,
751    {
752        DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
753    }
754
755    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
756    where
757        K: AsRef<[u8]>,
758        V: AsRef<[u8]>,
759    {
760        DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
761    }
762
763    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
764    where
765        K: AsRef<[u8]>,
766        V: AsRef<[u8]>,
767    {
768        let key = key.as_ref();
769        let value = value.as_ref();
770        unsafe {
771            ffi_try!(ffi::rocksdb_transactiondb_merge(
772                self.inner,
773                writeopts.inner,
774                key.as_ptr() as *const c_char,
775                key.len() as size_t,
776                value.as_ptr() as *const c_char,
777                value.len() as size_t,
778            ));
779            Ok(())
780        }
781    }
782
783    pub fn merge_cf_opt<K, V>(
784        &self,
785        cf: &impl AsColumnFamilyRef,
786        key: K,
787        value: V,
788        writeopts: &WriteOptions,
789    ) -> Result<(), Error>
790    where
791        K: AsRef<[u8]>,
792        V: AsRef<[u8]>,
793    {
794        let key = key.as_ref();
795        let value = value.as_ref();
796        unsafe {
797            ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
798                self.inner,
799                writeopts.inner,
800                cf.inner(),
801                key.as_ptr() as *const c_char,
802                key.len() as size_t,
803                value.as_ptr() as *const c_char,
804                value.len() as size_t,
805            ));
806            Ok(())
807        }
808    }
809
810    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
811        DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
812    }
813
814    pub fn delete_cf<K: AsRef<[u8]>>(
815        &self,
816        cf: &impl AsColumnFamilyRef,
817        key: K,
818    ) -> Result<(), Error> {
819        DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
820    }
821
822    pub fn delete_opt<K: AsRef<[u8]>>(
823        &self,
824        key: K,
825        writeopts: &WriteOptions,
826    ) -> Result<(), Error> {
827        let key = key.as_ref();
828        unsafe {
829            ffi_try!(ffi::rocksdb_transactiondb_delete(
830                self.inner,
831                writeopts.inner,
832                key.as_ptr() as *const c_char,
833                key.len() as size_t,
834            ));
835        }
836        Ok(())
837    }
838
839    pub fn delete_cf_opt<K: AsRef<[u8]>>(
840        &self,
841        cf: &impl AsColumnFamilyRef,
842        key: K,
843        writeopts: &WriteOptions,
844    ) -> Result<(), Error> {
845        let key = key.as_ref();
846        unsafe {
847            ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
848                self.inner,
849                writeopts.inner,
850                cf.inner(),
851                key.as_ptr() as *const c_char,
852                key.len() as size_t,
853            ));
854        }
855        Ok(())
856    }
857
858    pub fn iterator<'a: 'b, 'b>(
859        &'a self,
860        mode: IteratorMode,
861    ) -> DBIteratorWithThreadMode<'b, Self> {
862        let readopts = ReadOptions::default();
863        self.iterator_opt(mode, readopts)
864    }
865
866    pub fn iterator_opt<'a: 'b, 'b>(
867        &'a self,
868        mode: IteratorMode,
869        readopts: ReadOptions,
870    ) -> DBIteratorWithThreadMode<'b, Self> {
871        DBIteratorWithThreadMode::new(self, readopts, mode)
872    }
873
874    /// Opens an iterator using the provided ReadOptions.
875    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
876    pub fn iterator_cf_opt<'a: 'b, 'b>(
877        &'a self,
878        cf_handle: &impl AsColumnFamilyRef,
879        readopts: ReadOptions,
880        mode: IteratorMode,
881    ) -> DBIteratorWithThreadMode<'b, Self> {
882        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
883    }
884
885    /// Opens an iterator with `set_total_order_seek` enabled.
886    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
887    /// with a Hash-based implementation.
888    pub fn full_iterator<'a: 'b, 'b>(
889        &'a self,
890        mode: IteratorMode,
891    ) -> DBIteratorWithThreadMode<'b, Self> {
892        let mut opts = ReadOptions::default();
893        opts.set_total_order_seek(true);
894        DBIteratorWithThreadMode::new(self, opts, mode)
895    }
896
897    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
898        &'a self,
899        prefix: P,
900    ) -> DBIteratorWithThreadMode<'b, Self> {
901        let mut opts = ReadOptions::default();
902        opts.set_prefix_same_as_start(true);
903        DBIteratorWithThreadMode::new(
904            self,
905            opts,
906            IteratorMode::From(prefix.as_ref(), Direction::Forward),
907        )
908    }
909
910    pub fn iterator_cf<'a: 'b, 'b>(
911        &'a self,
912        cf_handle: &impl AsColumnFamilyRef,
913        mode: IteratorMode,
914    ) -> DBIteratorWithThreadMode<'b, Self> {
915        let opts = ReadOptions::default();
916        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
917    }
918
919    pub fn full_iterator_cf<'a: 'b, 'b>(
920        &'a self,
921        cf_handle: &impl AsColumnFamilyRef,
922        mode: IteratorMode,
923    ) -> DBIteratorWithThreadMode<'b, Self> {
924        let mut opts = ReadOptions::default();
925        opts.set_total_order_seek(true);
926        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
927    }
928
929    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
930        &'a self,
931        cf_handle: &impl AsColumnFamilyRef,
932        prefix: P,
933    ) -> DBIteratorWithThreadMode<'a, Self> {
934        let mut opts = ReadOptions::default();
935        opts.set_prefix_same_as_start(true);
936        DBIteratorWithThreadMode::<'a, Self>::new_cf(
937            self,
938            cf_handle.inner(),
939            opts,
940            IteratorMode::From(prefix.as_ref(), Direction::Forward),
941        )
942    }
943
944    /// Opens a raw iterator over the database, using the default read options
945    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
946        let opts = ReadOptions::default();
947        DBRawIteratorWithThreadMode::new(self, opts)
948    }
949
950    /// Opens a raw iterator over the given column family, using the default read options
951    pub fn raw_iterator_cf<'a: 'b, 'b>(
952        &'a self,
953        cf_handle: &impl AsColumnFamilyRef,
954    ) -> DBRawIteratorWithThreadMode<'b, Self> {
955        let opts = ReadOptions::default();
956        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
957    }
958
959    /// Opens a raw iterator over the database, using the given read options
960    pub fn raw_iterator_opt<'a: 'b, 'b>(
961        &'a self,
962        readopts: ReadOptions,
963    ) -> DBRawIteratorWithThreadMode<'b, Self> {
964        DBRawIteratorWithThreadMode::new(self, readopts)
965    }
966
967    /// Opens a raw iterator over the given column family, using the given read options
968    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
969        &'a self,
970        cf_handle: &impl AsColumnFamilyRef,
971        readopts: ReadOptions,
972    ) -> DBRawIteratorWithThreadMode<'b, Self> {
973        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
974    }
975
976    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
977        SnapshotWithThreadMode::<Self>::new(self)
978    }
979
980    fn drop_column_family<C>(
981        &self,
982        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
983        _cf: C,
984    ) -> Result<(), Error> {
985        unsafe {
986            // first mark the column family as dropped
987            ffi_try!(ffi::rocksdb_drop_column_family(
988                self.inner as *mut ffi::rocksdb_t,
989                cf_inner
990            ));
991        }
992        // Since `_cf` is dropped here, the column family handle is destroyed
993        // and any resources (mem, files) are reclaimed.
994        Ok(())
995    }
996}
997
998impl TransactionDB<SingleThreaded> {
999    /// Creates column family with given name and options.
1000    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
1001        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1002        self.cfs
1003            .cfs
1004            .insert(name.as_ref().to_string(), ColumnFamily { inner });
1005        Ok(())
1006    }
1007
1008    /// Returns the underlying column family handle.
1009    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
1010        self.cfs.cfs.get(name)
1011    }
1012
1013    /// Drops the column family with the given name
1014    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
1015        match self.cfs.cfs.remove(name) {
1016            Some(cf) => self.drop_column_family(cf.inner, cf),
1017            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1018        }
1019    }
1020}
1021
1022impl TransactionDB<MultiThreaded> {
1023    /// Creates column family with given name and options.
1024    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1025        // Note that we acquire the cfs lock before inserting: otherwise we might race
1026        // another caller who observed the handle as missing.
1027        let mut cfs = self.cfs.cfs.write();
1028        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1029        cfs.insert(
1030            name.as_ref().to_string(),
1031            Arc::new(UnboundColumnFamily { inner }),
1032        );
1033        Ok(())
1034    }
1035
1036    /// Returns the underlying column family handle.
1037    pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1038        self.cfs
1039            .cfs
1040            .read()
1041            .get(name)
1042            .cloned()
1043            .map(UnboundColumnFamily::bound_column_family)
1044    }
1045
1046    /// Drops the column family with the given name by internally locking the inner column
1047    /// family map. This avoids needing `&mut self` reference
1048    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1049        match self.cfs.cfs.write().remove(name) {
1050            Some(cf) => self.drop_column_family(cf.inner, cf),
1051            _ => Err(Error::new(format!("Invalid column family: {name}"))),
1052        }
1053    }
1054
1055    /// Implementation for property_value et al methods.
1056    ///
1057    /// `name` is the name of the property.  It will be converted into a CString
1058    /// and passed to `get_property` as argument.  `get_property` reads the
1059    /// specified property and either returns NULL or a pointer to a C allocated
1060    /// string; this method takes ownership of that string and will free it at
1061    /// the end. That string is parsed using `parse` callback which produces
1062    /// the returned result.
1063    fn property_value_impl<R>(
1064        name: impl CStrLike,
1065        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1066        parse: impl FnOnce(&str) -> Result<R, Error>,
1067    ) -> Result<Option<R>, Error> {
1068        let value = match name.bake() {
1069            Ok(prop_name) => get_property(prop_name.as_ptr()),
1070            Err(e) => {
1071                return Err(Error::new(format!(
1072                    "Failed to convert property name to CString: {e}"
1073                )));
1074            }
1075        };
1076        if value.is_null() {
1077            return Ok(None);
1078        }
1079        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1080            Ok(s) => parse(s).map(|value| Some(value)),
1081            Err(e) => Err(Error::new(format!(
1082                "Failed to convert property value to string: {e}"
1083            ))),
1084        };
1085        unsafe {
1086            ffi::rocksdb_free(value as *mut c_void);
1087        }
1088        result
1089    }
1090
1091    /// Retrieves a RocksDB property by name.
1092    ///
1093    /// Full list of properties could be find
1094    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1095    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1096        Self::property_value_impl(
1097            name,
1098            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1099            |str_value| Ok(str_value.to_owned()),
1100        )
1101    }
1102
1103    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1104        value.parse::<u64>().map_err(|err| {
1105            Error::new(format!(
1106                "Failed to convert property value {value} to int: {err}"
1107            ))
1108        })
1109    }
1110
1111    /// Retrieves a RocksDB property and casts it to an integer.
1112    ///
1113    /// Full list of properties that return int values could be find
1114    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1115    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1116        Self::property_value_impl(
1117            name,
1118            |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1119            Self::parse_property_int_value,
1120        )
1121    }
1122}
1123
1124impl<T: ThreadMode> Drop for TransactionDB<T> {
1125    fn drop(&mut self) {
1126        unsafe {
1127            self.prepared_transactions().clear();
1128            self.cfs.drop_all_cfs_internal();
1129            ffi::rocksdb_transactiondb_close(self.inner);
1130        }
1131    }
1132}