rust_rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use crate::{
17    column_family::AsColumnFamilyRef,
18    column_family::BoundColumnFamily,
19    column_family::UnboundColumnFamily,
20    db_options::OptionsMustOutliveDB,
21    ffi,
22    ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25    IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26    WaitForCompactOptions, WriteBatch, WriteBatchWithIndex, WriteOptions,
27    DEFAULT_COLUMN_FAMILY_NAME,
28};
29
30use crate::column_family::ColumnFamilyTtl;
31use crate::ffi_util::CSlice;
32use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
33use parking_lot::RwLock;
34use std::collections::BTreeMap;
35use std::ffi::{CStr, CString};
36use std::fmt;
37use std::fs;
38use std::iter;
39use std::path::Path;
40use std::path::PathBuf;
41use std::ptr;
42use std::slice;
43use std::str;
44use std::sync::Arc;
45use std::time::Duration;
46
47/// A range of keys, `start_key` is included, but not `end_key`.
48///
49/// You should make sure `end_key` is not less than `start_key`.
50pub struct Range<'a> {
51    start_key: &'a [u8],
52    end_key: &'a [u8],
53}
54
55impl<'a> Range<'a> {
56    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
57        Range { start_key, end_key }
58    }
59}
60
61/// Marker trait to specify single or multi threaded column family alternations for
62/// [`DBWithThreadMode<T>`]
63///
64/// This arrangement makes differences in self mutability and return type in
65/// some of `DBWithThreadMode` methods.
66///
67/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
68/// also has a minimum set of not-encapsulated internal methods between
69/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
70/// called and defined externally.
71pub trait ThreadMode {
72    /// Internal implementation for storing column family handles
73    fn new_cf_map_internal(
74        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
75    ) -> Self;
76    /// Internal implementation for dropping column family handles
77    fn drop_all_cfs_internal(&mut self);
78}
79
80/// Actual marker type for the marker trait `ThreadMode`, which holds
81/// a collection of column families without synchronization primitive, providing
82/// no overhead for the single-threaded column family alternations. The other
83/// mode is [`MultiThreaded`].
84///
85/// See [`DB`] for more details, including performance implications for each mode
86pub struct SingleThreaded {
87    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
88}
89
90/// Actual marker type for the marker trait `ThreadMode`, which holds
91/// a collection of column families wrapped in a RwLock to be mutated
92/// concurrently. The other mode is [`SingleThreaded`].
93///
94/// See [`DB`] for more details, including performance implications for each mode
95pub struct MultiThreaded {
96    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
97}
98
99impl ThreadMode for SingleThreaded {
100    fn new_cf_map_internal(
101        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
102    ) -> Self {
103        Self {
104            cfs: cfs
105                .into_iter()
106                .map(|(n, c)| (n, ColumnFamily { inner: c }))
107                .collect(),
108        }
109    }
110
111    fn drop_all_cfs_internal(&mut self) {
112        // Cause all ColumnFamily objects to be Drop::drop()-ed.
113        self.cfs.clear();
114    }
115}
116
117impl ThreadMode for MultiThreaded {
118    fn new_cf_map_internal(
119        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
120    ) -> Self {
121        Self {
122            cfs: RwLock::new(
123                cfs.into_iter()
124                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
125                    .collect(),
126            ),
127        }
128    }
129
130    fn drop_all_cfs_internal(&mut self) {
131        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
132        self.cfs.write().clear();
133    }
134}
135
136/// Get underlying `rocksdb_t`.
137pub trait DBInner {
138    fn inner(&self) -> *mut ffi::rocksdb_t;
139}
140
141/// A helper type to implement some common methods for [`DBWithThreadMode`]
142/// and [`OptimisticTransactionDB`].
143///
144/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
145pub struct DBCommon<T: ThreadMode, D: DBInner> {
146    pub(crate) inner: D,
147    cfs: T, // Column families are held differently depending on thread mode
148    path: PathBuf,
149    _outlive: Vec<OptionsMustOutliveDB>,
150}
151
152/// Minimal set of DB-related methods, intended to be generic over
153/// `DBWithThreadMode<T>`. Mainly used internally
154pub trait DBAccess {
155    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
156
157    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
158
159    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
160
161    unsafe fn create_iterator_cf(
162        &self,
163        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
164        readopts: &ReadOptions,
165    ) -> *mut ffi::rocksdb_iterator_t;
166
167    fn get_opt<K: AsRef<[u8]>>(
168        &self,
169        key: K,
170        readopts: &ReadOptions,
171    ) -> Result<Option<Vec<u8>>, Error>;
172
173    fn get_cf_opt<K: AsRef<[u8]>>(
174        &self,
175        cf: &impl AsColumnFamilyRef,
176        key: K,
177        readopts: &ReadOptions,
178    ) -> Result<Option<Vec<u8>>, Error>;
179
180    fn get_pinned_opt<K: AsRef<[u8]>>(
181        &'_ self,
182        key: K,
183        readopts: &ReadOptions,
184    ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
185
186    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
187        &'_ self,
188        cf: &impl AsColumnFamilyRef,
189        key: K,
190        readopts: &ReadOptions,
191    ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
192
193    fn multi_get_opt<K, I>(
194        &self,
195        keys: I,
196        readopts: &ReadOptions,
197    ) -> Vec<Result<Option<Vec<u8>>, Error>>
198    where
199        K: AsRef<[u8]>,
200        I: IntoIterator<Item = K>;
201
202    fn multi_get_cf_opt<'b, K, I, W>(
203        &self,
204        keys_cf: I,
205        readopts: &ReadOptions,
206    ) -> Vec<Result<Option<Vec<u8>>, Error>>
207    where
208        K: AsRef<[u8]>,
209        I: IntoIterator<Item = (&'b W, K)>,
210        W: AsColumnFamilyRef + 'b;
211}
212
213impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
214    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
215        unsafe { ffi::rocksdb_create_snapshot(self.inner.inner()) }
216    }
217
218    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
219        unsafe {
220            ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
221        }
222    }
223
224    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
225        unsafe { ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner) }
226    }
227
228    unsafe fn create_iterator_cf(
229        &self,
230        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
231        readopts: &ReadOptions,
232    ) -> *mut ffi::rocksdb_iterator_t {
233        unsafe { ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle) }
234    }
235
236    fn get_opt<K: AsRef<[u8]>>(
237        &self,
238        key: K,
239        readopts: &ReadOptions,
240    ) -> Result<Option<Vec<u8>>, Error> {
241        self.get_opt(key, readopts)
242    }
243
244    fn get_cf_opt<K: AsRef<[u8]>>(
245        &self,
246        cf: &impl AsColumnFamilyRef,
247        key: K,
248        readopts: &ReadOptions,
249    ) -> Result<Option<Vec<u8>>, Error> {
250        self.get_cf_opt(cf, key, readopts)
251    }
252
253    fn get_pinned_opt<K: AsRef<[u8]>>(
254        &'_ self,
255        key: K,
256        readopts: &ReadOptions,
257    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
258        self.get_pinned_opt(key, readopts)
259    }
260
261    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
262        &'_ self,
263        cf: &impl AsColumnFamilyRef,
264        key: K,
265        readopts: &ReadOptions,
266    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
267        self.get_pinned_cf_opt(cf, key, readopts)
268    }
269
270    fn multi_get_opt<K, Iter>(
271        &self,
272        keys: Iter,
273        readopts: &ReadOptions,
274    ) -> Vec<Result<Option<Vec<u8>>, Error>>
275    where
276        K: AsRef<[u8]>,
277        Iter: IntoIterator<Item = K>,
278    {
279        self.multi_get_opt(keys, readopts)
280    }
281
282    fn multi_get_cf_opt<'b, K, Iter, W>(
283        &self,
284        keys_cf: Iter,
285        readopts: &ReadOptions,
286    ) -> Vec<Result<Option<Vec<u8>>, Error>>
287    where
288        K: AsRef<[u8]>,
289        Iter: IntoIterator<Item = (&'b W, K)>,
290        W: AsColumnFamilyRef + 'b,
291    {
292        self.multi_get_cf_opt(keys_cf, readopts)
293    }
294}
295
296pub struct DBWithThreadModeInner {
297    inner: *mut ffi::rocksdb_t,
298}
299
300impl DBInner for DBWithThreadModeInner {
301    fn inner(&self) -> *mut ffi::rocksdb_t {
302        self.inner
303    }
304}
305
306impl Drop for DBWithThreadModeInner {
307    fn drop(&mut self) {
308        unsafe {
309            ffi::rocksdb_close(self.inner);
310        }
311    }
312}
313
314/// A type alias to RocksDB database.
315///
316/// See crate level documentation for a simple usage example.
317/// See [`DBCommon`] for full list of methods.
318pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
319
320/// A type alias to DB instance type with the single-threaded column family
321/// creations/deletions
322///
323/// # Compatibility and multi-threaded mode
324///
325/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
326/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
327/// column family alternations.
328///
329/// # Limited performance implication for single-threaded mode
330///
331/// Even with [`SingleThreaded`], almost all of RocksDB operations is
332/// multi-threaded unless the underlying RocksDB instance is
333/// specifically configured otherwise. `SingleThreaded` only forces
334/// serialization of column family alternations by requiring `&mut self` of DB
335/// instance due to its wrapper implementation details.
336///
337/// # Multi-threaded mode
338///
339/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
340/// workload including multi-threaded column family alternations, costing the
341/// RwLock overhead inside `DB`.
342#[cfg(not(feature = "multi-threaded-cf"))]
343pub type DB = DBWithThreadMode<SingleThreaded>;
344
345#[cfg(feature = "multi-threaded-cf")]
346pub type DB = DBWithThreadMode<MultiThreaded>;
347
348// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
349// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
350// rocksdb internally does not rely on thread-local information for its user-exposed types.
351unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
352
353// Sync is similarly safe for many types because they do not expose interior mutability, and their
354// use within the rocksdb library is generally behind a const reference
355unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
356
357// Specifies whether open DB for read only.
358enum AccessType<'a> {
359    ReadWrite,
360    ReadOnly { error_if_log_file_exist: bool },
361    Secondary { secondary_path: &'a Path },
362    WithTTL { ttl: Duration },
363}
364
365/// Methods of `DBWithThreadMode`.
366impl<T: ThreadMode> DBWithThreadMode<T> {
367    /// Opens a database with default options.
368    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
369        let mut opts = Options::default();
370        opts.create_if_missing(true);
371        Self::open(&opts, path)
372    }
373
374    /// Opens the database with the specified options.
375    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
376        Self::open_cf(opts, path, None::<&str>)
377    }
378
379    /// Opens the database for read only with the specified options.
380    pub fn open_for_read_only<P: AsRef<Path>>(
381        opts: &Options,
382        path: P,
383        error_if_log_file_exist: bool,
384    ) -> Result<Self, Error> {
385        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
386    }
387
388    /// Opens the database as a secondary.
389    pub fn open_as_secondary<P: AsRef<Path>>(
390        opts: &Options,
391        primary_path: P,
392        secondary_path: P,
393    ) -> Result<Self, Error> {
394        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
395    }
396
397    /// Opens the database with a Time to Live compaction filter.
398    ///
399    /// This applies the given `ttl` to all column families created without an explicit TTL.
400    /// See [`DB::open_cf_descriptors_with_ttl`] for more control over individual column family TTLs.
401    pub fn open_with_ttl<P: AsRef<Path>>(
402        opts: &Options,
403        path: P,
404        ttl: Duration,
405    ) -> Result<Self, Error> {
406        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
407    }
408
409    /// Opens the database with a Time to Live compaction filter and column family names.
410    ///
411    /// Column families opened using this function will be created with default `Options`.
412    pub fn open_cf_with_ttl<P, I, N>(
413        opts: &Options,
414        path: P,
415        cfs: I,
416        ttl: Duration,
417    ) -> Result<Self, Error>
418    where
419        P: AsRef<Path>,
420        I: IntoIterator<Item = N>,
421        N: AsRef<str>,
422    {
423        let cfs = cfs
424            .into_iter()
425            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
426
427        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
428    }
429
430    /// Opens a database with the given database with a Time to Live compaction filter and
431    /// column family descriptors.
432    ///
433    /// Applies the provided `ttl` as the default TTL for all column families.
434    /// Column families will inherit this TTL by default, unless their descriptor explicitly
435    /// sets a different TTL using [`ColumnFamilyTtl::Duration`] or opts out using [`ColumnFamilyTtl::Disabled`].
436    ///
437    /// *NOTE*: The `default` column family is opened with `Options::default()` unless
438    /// explicitly configured within the `cfs` iterator.
439    /// To customize the `default` column family's options, include a `ColumnFamilyDescriptor`
440    /// with the name "default" in the `cfs` iterator.
441    ///
442    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
443    pub fn open_cf_descriptors_with_ttl<P, I>(
444        opts: &Options,
445        path: P,
446        cfs: I,
447        ttl: Duration,
448    ) -> Result<Self, Error>
449    where
450        P: AsRef<Path>,
451        I: IntoIterator<Item = ColumnFamilyDescriptor>,
452    {
453        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
454    }
455
456    /// Opens a database with the given database options and column family names.
457    ///
458    /// Column families opened using this function will be created with default `Options`.
459    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
460    where
461        P: AsRef<Path>,
462        I: IntoIterator<Item = N>,
463        N: AsRef<str>,
464    {
465        let cfs = cfs
466            .into_iter()
467            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
468
469        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
470    }
471
472    /// Opens a database with the given database options and column family names.
473    ///
474    /// Column families opened using given `Options`.
475    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
476    where
477        P: AsRef<Path>,
478        I: IntoIterator<Item = (N, Options)>,
479        N: AsRef<str>,
480    {
481        let cfs = cfs
482            .into_iter()
483            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
484
485        Self::open_cf_descriptors(opts, path, cfs)
486    }
487
488    /// Opens a database for read only with the given database options and column family names.
489    /// *NOTE*: `default` column family is opened with `Options::default()`.
490    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
491    pub fn open_cf_for_read_only<P, I, N>(
492        opts: &Options,
493        path: P,
494        cfs: I,
495        error_if_log_file_exist: bool,
496    ) -> Result<Self, Error>
497    where
498        P: AsRef<Path>,
499        I: IntoIterator<Item = N>,
500        N: AsRef<str>,
501    {
502        let cfs = cfs
503            .into_iter()
504            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
505
506        Self::open_cf_descriptors_internal(
507            opts,
508            path,
509            cfs,
510            &AccessType::ReadOnly {
511                error_if_log_file_exist,
512            },
513        )
514    }
515
516    /// Opens a database for read only with the given database options and column family names.
517    /// *NOTE*: `default` column family is opened with `Options::default()`.
518    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
519    pub fn open_cf_with_opts_for_read_only<P, I, N>(
520        db_opts: &Options,
521        path: P,
522        cfs: I,
523        error_if_log_file_exist: bool,
524    ) -> Result<Self, Error>
525    where
526        P: AsRef<Path>,
527        I: IntoIterator<Item = (N, Options)>,
528        N: AsRef<str>,
529    {
530        let cfs = cfs
531            .into_iter()
532            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
533
534        Self::open_cf_descriptors_internal(
535            db_opts,
536            path,
537            cfs,
538            &AccessType::ReadOnly {
539                error_if_log_file_exist,
540            },
541        )
542    }
543
544    /// Opens a database for ready only with the given database options and
545    /// column family descriptors.
546    /// *NOTE*: `default` column family is opened with `Options::default()`.
547    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
548    pub fn open_cf_descriptors_read_only<P, I>(
549        opts: &Options,
550        path: P,
551        cfs: I,
552        error_if_log_file_exist: bool,
553    ) -> Result<Self, Error>
554    where
555        P: AsRef<Path>,
556        I: IntoIterator<Item = ColumnFamilyDescriptor>,
557    {
558        Self::open_cf_descriptors_internal(
559            opts,
560            path,
561            cfs,
562            &AccessType::ReadOnly {
563                error_if_log_file_exist,
564            },
565        )
566    }
567
568    /// Opens the database as a secondary with the given database options and column family names.
569    /// *NOTE*: `default` column family is opened with `Options::default()`.
570    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
571    pub fn open_cf_as_secondary<P, I, N>(
572        opts: &Options,
573        primary_path: P,
574        secondary_path: P,
575        cfs: I,
576    ) -> Result<Self, Error>
577    where
578        P: AsRef<Path>,
579        I: IntoIterator<Item = N>,
580        N: AsRef<str>,
581    {
582        let cfs = cfs
583            .into_iter()
584            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
585
586        Self::open_cf_descriptors_internal(
587            opts,
588            primary_path,
589            cfs,
590            &AccessType::Secondary {
591                secondary_path: secondary_path.as_ref(),
592            },
593        )
594    }
595
596    /// Opens the database as a secondary with the given database options and
597    /// column family descriptors.
598    /// *NOTE*: `default` column family is opened with `Options::default()`.
599    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
600    pub fn open_cf_descriptors_as_secondary<P, I>(
601        opts: &Options,
602        path: P,
603        secondary_path: P,
604        cfs: I,
605    ) -> Result<Self, Error>
606    where
607        P: AsRef<Path>,
608        I: IntoIterator<Item = ColumnFamilyDescriptor>,
609    {
610        Self::open_cf_descriptors_internal(
611            opts,
612            path,
613            cfs,
614            &AccessType::Secondary {
615                secondary_path: secondary_path.as_ref(),
616            },
617        )
618    }
619
620    /// Opens a database with the given database options and column family descriptors.
621    /// *NOTE*: `default` column family is opened with `Options::default()`.
622    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
623    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
624    where
625        P: AsRef<Path>,
626        I: IntoIterator<Item = ColumnFamilyDescriptor>,
627    {
628        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
629    }
630
631    /// Internal implementation for opening RocksDB.
632    fn open_cf_descriptors_internal<P, I>(
633        opts: &Options,
634        path: P,
635        cfs: I,
636        access_type: &AccessType,
637    ) -> Result<Self, Error>
638    where
639        P: AsRef<Path>,
640        I: IntoIterator<Item = ColumnFamilyDescriptor>,
641    {
642        let cfs: Vec<_> = cfs.into_iter().collect();
643        let outlive = iter::once(opts.outlive.clone())
644            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
645            .collect();
646
647        let cpath = to_cpath(&path)?;
648
649        if let Err(e) = fs::create_dir_all(&path) {
650            return Err(Error::new(format!(
651                "Failed to create RocksDB directory: `{e:?}`."
652            )));
653        }
654
655        let db: *mut ffi::rocksdb_t;
656        let mut cf_map = BTreeMap::new();
657
658        if cfs.is_empty() {
659            db = Self::open_raw(opts, &cpath, access_type)?;
660        } else {
661            let mut cfs_v = cfs;
662            // Always open the default column family.
663            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
664                cfs_v.push(ColumnFamilyDescriptor {
665                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
666                    options: Options::default(),
667                    ttl: ColumnFamilyTtl::SameAsDb,
668                });
669            }
670            // We need to store our CStrings in an intermediate vector
671            // so that their pointers remain valid.
672            let c_cfs: Vec<CString> = cfs_v
673                .iter()
674                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
675                .collect();
676
677            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
678
679            // These handles will be populated by DB.
680            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
681
682            let cfopts: Vec<_> = cfs_v
683                .iter()
684                .map(|cf| cf.options.inner.cast_const())
685                .collect();
686
687            db = Self::open_cf_raw(
688                opts,
689                &cpath,
690                &cfs_v,
691                &cfnames,
692                &cfopts,
693                &mut cfhandles,
694                access_type,
695            )?;
696            for handle in &cfhandles {
697                if handle.is_null() {
698                    return Err(Error::new(
699                        "Received null column family handle from DB.".to_owned(),
700                    ));
701                }
702            }
703
704            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
705                cf_map.insert(cf_desc.name.clone(), inner);
706            }
707        }
708
709        if db.is_null() {
710            return Err(Error::new("Could not initialize database.".to_owned()));
711        }
712
713        Ok(Self {
714            inner: DBWithThreadModeInner { inner: db },
715            path: path.as_ref().to_path_buf(),
716            cfs: T::new_cf_map_internal(cf_map),
717            _outlive: outlive,
718        })
719    }
720
721    fn open_raw(
722        opts: &Options,
723        cpath: &CString,
724        access_type: &AccessType,
725    ) -> Result<*mut ffi::rocksdb_t, Error> {
726        let db = unsafe {
727            match *access_type {
728                AccessType::ReadOnly {
729                    error_if_log_file_exist,
730                } => ffi_try!(ffi::rocksdb_open_for_read_only(
731                    opts.inner,
732                    cpath.as_ptr(),
733                    c_uchar::from(error_if_log_file_exist),
734                )),
735                AccessType::ReadWrite => {
736                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
737                }
738                AccessType::Secondary { secondary_path } => {
739                    ffi_try!(ffi::rocksdb_open_as_secondary(
740                        opts.inner,
741                        cpath.as_ptr(),
742                        to_cpath(secondary_path)?.as_ptr(),
743                    ))
744                }
745                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
746                    opts.inner,
747                    cpath.as_ptr(),
748                    ttl.as_secs() as c_int,
749                )),
750            }
751        };
752        Ok(db)
753    }
754
755    #[allow(clippy::pedantic)]
756    fn open_cf_raw(
757        opts: &Options,
758        cpath: &CString,
759        cfs_v: &[ColumnFamilyDescriptor],
760        cfnames: &[*const c_char],
761        cfopts: &[*const ffi::rocksdb_options_t],
762        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
763        access_type: &AccessType,
764    ) -> Result<*mut ffi::rocksdb_t, Error> {
765        let db = unsafe {
766            match *access_type {
767                AccessType::ReadOnly {
768                    error_if_log_file_exist,
769                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
770                    opts.inner,
771                    cpath.as_ptr(),
772                    cfs_v.len() as c_int,
773                    cfnames.as_ptr(),
774                    cfopts.as_ptr(),
775                    cfhandles.as_mut_ptr(),
776                    c_uchar::from(error_if_log_file_exist),
777                )),
778                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
779                    opts.inner,
780                    cpath.as_ptr(),
781                    cfs_v.len() as c_int,
782                    cfnames.as_ptr(),
783                    cfopts.as_ptr(),
784                    cfhandles.as_mut_ptr(),
785                )),
786                AccessType::Secondary { secondary_path } => {
787                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
788                        opts.inner,
789                        cpath.as_ptr(),
790                        to_cpath(secondary_path)?.as_ptr(),
791                        cfs_v.len() as c_int,
792                        cfnames.as_ptr(),
793                        cfopts.as_ptr(),
794                        cfhandles.as_mut_ptr(),
795                    ))
796                }
797                AccessType::WithTTL { ttl } => {
798                    let ttls: Vec<_> = cfs_v
799                        .iter()
800                        .map(|cf| match cf.ttl {
801                            ColumnFamilyTtl::Disabled => i32::MAX,
802                            ColumnFamilyTtl::Duration(duration) => duration.as_secs() as i32,
803                            ColumnFamilyTtl::SameAsDb => ttl.as_secs() as i32,
804                        })
805                        .collect();
806
807                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
808                        opts.inner,
809                        cpath.as_ptr(),
810                        cfs_v.len() as c_int,
811                        cfnames.as_ptr(),
812                        cfopts.as_ptr(),
813                        cfhandles.as_mut_ptr(),
814                        ttls.as_ptr(),
815                    ))
816                }
817            }
818        };
819        Ok(db)
820    }
821
822    /// Removes the database entries in the range `["from", "to")` using given write options.
823    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
824        &self,
825        cf: &impl AsColumnFamilyRef,
826        from: K,
827        to: K,
828        writeopts: &WriteOptions,
829    ) -> Result<(), Error> {
830        let from = from.as_ref();
831        let to = to.as_ref();
832
833        unsafe {
834            ffi_try!(ffi::rocksdb_delete_range_cf(
835                self.inner.inner(),
836                writeopts.inner,
837                cf.inner(),
838                from.as_ptr() as *const c_char,
839                from.len() as size_t,
840                to.as_ptr() as *const c_char,
841                to.len() as size_t,
842            ));
843            Ok(())
844        }
845    }
846
847    /// Removes the database entries in the range `["from", "to")` using default write options.
848    pub fn delete_range_cf<K: AsRef<[u8]>>(
849        &self,
850        cf: &impl AsColumnFamilyRef,
851        from: K,
852        to: K,
853    ) -> Result<(), Error> {
854        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
855    }
856
857    pub fn write_opt(&self, batch: &WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
858        unsafe {
859            ffi_try!(ffi::rocksdb_write(
860                self.inner.inner(),
861                writeopts.inner,
862                batch.inner
863            ));
864        }
865        Ok(())
866    }
867
868    pub fn write(&self, batch: &WriteBatch) -> Result<(), Error> {
869        self.write_opt(batch, &WriteOptions::default())
870    }
871
872    pub fn write_without_wal(&self, batch: &WriteBatch) -> Result<(), Error> {
873        let mut wo = WriteOptions::new();
874        wo.disable_wal(true);
875        self.write_opt(batch, &wo)
876    }
877
878    pub fn write_wbwi(&self, wbwi: &WriteBatchWithIndex) -> Result<(), Error> {
879        self.write_wbwi_opt(wbwi, &WriteOptions::default())
880    }
881
882    pub fn write_wbwi_opt(
883        &self,
884        wbwi: &WriteBatchWithIndex,
885        writeopts: &WriteOptions,
886    ) -> Result<(), Error> {
887        unsafe {
888            ffi_try!(ffi::rocksdb_write_writebatch_wi(
889                self.inner.inner(),
890                writeopts.inner,
891                wbwi.inner
892            ));
893
894            Ok(())
895        }
896    }
897}
898
899/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
900impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
901    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
902        Self {
903            inner,
904            cfs,
905            path,
906            _outlive: outlive,
907        }
908    }
909
910    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
911        let cpath = to_cpath(path)?;
912        let mut length = 0;
913
914        unsafe {
915            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
916                opts.inner,
917                cpath.as_ptr(),
918                &mut length,
919            ));
920
921            let vec = slice::from_raw_parts(ptr, length)
922                .iter()
923                .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
924                .collect();
925            ffi::rocksdb_list_column_families_destroy(ptr, length);
926            Ok(vec)
927        }
928    }
929
930    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
931        let cpath = to_cpath(path)?;
932        unsafe {
933            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
934        }
935        Ok(())
936    }
937
938    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
939        let cpath = to_cpath(path)?;
940        unsafe {
941            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
942        }
943        Ok(())
944    }
945
946    pub fn path(&self) -> &Path {
947        self.path.as_path()
948    }
949
950    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
951    /// the data to disk.
952    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
953        unsafe {
954            ffi_try!(ffi::rocksdb_flush_wal(
955                self.inner.inner(),
956                c_uchar::from(sync)
957            ));
958        }
959        Ok(())
960    }
961
962    /// Flushes database memtables to SST files on the disk.
963    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
964        unsafe {
965            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
966        }
967        Ok(())
968    }
969
970    /// Flushes database memtables to SST files on the disk using default options.
971    pub fn flush(&self) -> Result<(), Error> {
972        self.flush_opt(&FlushOptions::default())
973    }
974
975    /// Flushes database memtables to SST files on the disk for a given column family.
976    pub fn flush_cf_opt(
977        &self,
978        cf: &impl AsColumnFamilyRef,
979        flushopts: &FlushOptions,
980    ) -> Result<(), Error> {
981        unsafe {
982            ffi_try!(ffi::rocksdb_flush_cf(
983                self.inner.inner(),
984                flushopts.inner,
985                cf.inner()
986            ));
987        }
988        Ok(())
989    }
990
991    /// Flushes multiple column families.
992    ///
993    /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
994    /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
995    /// number at the time when flush is requested.
996    pub fn flush_cfs_opt(
997        &self,
998        cfs: &[&impl AsColumnFamilyRef],
999        opts: &FlushOptions,
1000    ) -> Result<(), Error> {
1001        let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
1002        unsafe {
1003            ffi_try!(ffi::rocksdb_flush_cfs(
1004                self.inner.inner(),
1005                opts.inner,
1006                cfs.as_mut_ptr(),
1007                cfs.len() as libc::c_int,
1008            ));
1009        }
1010        Ok(())
1011    }
1012
1013    /// Flushes database memtables to SST files on the disk for a given column family using default
1014    /// options.
1015    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
1016        self.flush_cf_opt(cf, &FlushOptions::default())
1017    }
1018
1019    /// Return the bytes associated with a key value with read options. If you only intend to use
1020    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
1021    /// to avoid unnecessary memory copy.
1022    pub fn get_opt<K: AsRef<[u8]>>(
1023        &self,
1024        key: K,
1025        readopts: &ReadOptions,
1026    ) -> Result<Option<Vec<u8>>, Error> {
1027        self.get_pinned_opt(key, readopts)
1028            .map(|x| x.map(|v| v.as_ref().to_vec()))
1029    }
1030
1031    /// Return the bytes associated with a key value. If you only intend to use the vector returned
1032    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
1033    /// copy.
1034    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
1035        self.get_opt(key.as_ref(), &ReadOptions::default())
1036    }
1037
1038    /// Return the bytes associated with a key value and the given column family with read options.
1039    /// If you only intend to use the vector returned temporarily, consider using
1040    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
1041    pub fn get_cf_opt<K: AsRef<[u8]>>(
1042        &self,
1043        cf: &impl AsColumnFamilyRef,
1044        key: K,
1045        readopts: &ReadOptions,
1046    ) -> Result<Option<Vec<u8>>, Error> {
1047        self.get_pinned_cf_opt(cf, key, readopts)
1048            .map(|x| x.map(|v| v.as_ref().to_vec()))
1049    }
1050
1051    /// Return the bytes associated with a key value and the given column family. If you only
1052    /// intend to use the vector returned temporarily, consider using
1053    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
1054    pub fn get_cf<K: AsRef<[u8]>>(
1055        &self,
1056        cf: &impl AsColumnFamilyRef,
1057        key: K,
1058    ) -> Result<Option<Vec<u8>>, Error> {
1059        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
1060    }
1061
1062    /// Return the value associated with a key using RocksDB's PinnableSlice
1063    /// so as to avoid unnecessary memory copy.
1064    pub fn get_pinned_opt<K: AsRef<[u8]>>(
1065        &'_ self,
1066        key: K,
1067        readopts: &ReadOptions,
1068    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1069        if readopts.inner.is_null() {
1070            return Err(Error::new(
1071                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1072                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1073                    .to_owned(),
1074            ));
1075        }
1076
1077        let key = key.as_ref();
1078        unsafe {
1079            let val = ffi_try!(ffi::rocksdb_get_pinned(
1080                self.inner.inner(),
1081                readopts.inner,
1082                key.as_ptr() as *const c_char,
1083                key.len() as size_t,
1084            ));
1085            if val.is_null() {
1086                Ok(None)
1087            } else {
1088                Ok(Some(DBPinnableSlice::from_c(val)))
1089            }
1090        }
1091    }
1092
1093    /// Return the value associated with a key using RocksDB's PinnableSlice
1094    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1095    /// leverages default options.
1096    pub fn get_pinned<K: AsRef<[u8]>>(
1097        &'_ self,
1098        key: K,
1099    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1100        self.get_pinned_opt(key, &ReadOptions::default())
1101    }
1102
1103    /// Return the value associated with a key using RocksDB's PinnableSlice
1104    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1105    /// allows specifying ColumnFamily
1106    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1107        &'_ self,
1108        cf: &impl AsColumnFamilyRef,
1109        key: K,
1110        readopts: &ReadOptions,
1111    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1112        if readopts.inner.is_null() {
1113            return Err(Error::new(
1114                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1115                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1116                    .to_owned(),
1117            ));
1118        }
1119
1120        let key = key.as_ref();
1121        unsafe {
1122            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1123                self.inner.inner(),
1124                readopts.inner,
1125                cf.inner(),
1126                key.as_ptr() as *const c_char,
1127                key.len() as size_t,
1128            ));
1129            if val.is_null() {
1130                Ok(None)
1131            } else {
1132                Ok(Some(DBPinnableSlice::from_c(val)))
1133            }
1134        }
1135    }
1136
1137    /// Return the value associated with a key using RocksDB's PinnableSlice
1138    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1139    /// leverages default options.
1140    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1141        &'_ self,
1142        cf: &impl AsColumnFamilyRef,
1143        key: K,
1144    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1145        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1146    }
1147
1148    /// Return the values associated with the given keys.
1149    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1150    where
1151        K: AsRef<[u8]>,
1152        I: IntoIterator<Item = K>,
1153    {
1154        self.multi_get_opt(keys, &ReadOptions::default())
1155    }
1156
1157    /// Return the values associated with the given keys using read options.
1158    pub fn multi_get_opt<K, I>(
1159        &self,
1160        keys: I,
1161        readopts: &ReadOptions,
1162    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1163    where
1164        K: AsRef<[u8]>,
1165        I: IntoIterator<Item = K>,
1166    {
1167        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1168            .into_iter()
1169            .map(|k| {
1170                let k = k.as_ref();
1171                (Box::from(k), k.len())
1172            })
1173            .unzip();
1174        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1175
1176        let mut values = vec![ptr::null_mut(); keys.len()];
1177        let mut values_sizes = vec![0_usize; keys.len()];
1178        let mut errors = vec![ptr::null_mut(); keys.len()];
1179        unsafe {
1180            ffi::rocksdb_multi_get(
1181                self.inner.inner(),
1182                readopts.inner,
1183                ptr_keys.len(),
1184                ptr_keys.as_ptr(),
1185                keys_sizes.as_ptr(),
1186                values.as_mut_ptr(),
1187                values_sizes.as_mut_ptr(),
1188                errors.as_mut_ptr(),
1189            );
1190        }
1191
1192        convert_values(values, values_sizes, errors)
1193    }
1194
1195    /// Return the values associated with the given keys and column families.
1196    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1197        &'a self,
1198        keys: I,
1199    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1200    where
1201        K: AsRef<[u8]>,
1202        I: IntoIterator<Item = (&'b W, K)>,
1203        W: 'b + AsColumnFamilyRef,
1204    {
1205        self.multi_get_cf_opt(keys, &ReadOptions::default())
1206    }
1207
1208    /// Return the values associated with the given keys and column families using read options.
1209    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1210        &'a self,
1211        keys: I,
1212        readopts: &ReadOptions,
1213    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1214    where
1215        K: AsRef<[u8]>,
1216        I: IntoIterator<Item = (&'b W, K)>,
1217        W: 'b + AsColumnFamilyRef,
1218    {
1219        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1220            .into_iter()
1221            .map(|(cf, key)| {
1222                let key = key.as_ref();
1223                ((cf, Box::from(key)), key.len())
1224            })
1225            .unzip();
1226        let ptr_keys: Vec<_> = cfs_and_keys
1227            .iter()
1228            .map(|(_, k)| k.as_ptr() as *const c_char)
1229            .collect();
1230        let ptr_cfs: Vec<_> = cfs_and_keys
1231            .iter()
1232            .map(|(c, _)| c.inner().cast_const())
1233            .collect();
1234
1235        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1236        let mut values_sizes = vec![0_usize; ptr_keys.len()];
1237        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1238        unsafe {
1239            ffi::rocksdb_multi_get_cf(
1240                self.inner.inner(),
1241                readopts.inner,
1242                ptr_cfs.as_ptr(),
1243                ptr_keys.len(),
1244                ptr_keys.as_ptr(),
1245                keys_sizes.as_ptr(),
1246                values.as_mut_ptr(),
1247                values_sizes.as_mut_ptr(),
1248                errors.as_mut_ptr(),
1249            );
1250        }
1251
1252        convert_values(values, values_sizes, errors)
1253    }
1254
1255    /// Return the values associated with the given keys and the specified column family
1256    /// where internally the read requests are processed in batch if block-based table
1257    /// SST format is used.  It is a more optimized version of multi_get_cf.
1258    pub fn batched_multi_get_cf<'a, K, I>(
1259        &'_ self,
1260        cf: &impl AsColumnFamilyRef,
1261        keys: I,
1262        sorted_input: bool,
1263    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1264    where
1265        K: AsRef<[u8]> + 'a + ?Sized,
1266        I: IntoIterator<Item = &'a K>,
1267    {
1268        self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1269    }
1270
1271    /// Return the values associated with the given keys and the specified column family
1272    /// where internally the read requests are processed in batch if block-based table
1273    /// SST format is used. It is a more optimized version of multi_get_cf_opt.
1274    pub fn batched_multi_get_cf_opt<'a, K, I>(
1275        &'_ self,
1276        cf: &impl AsColumnFamilyRef,
1277        keys: I,
1278        sorted_input: bool,
1279        readopts: &ReadOptions,
1280    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1281    where
1282        K: AsRef<[u8]> + 'a + ?Sized,
1283        I: IntoIterator<Item = &'a K>,
1284    {
1285        let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1286            .into_iter()
1287            .map(|k| {
1288                let k = k.as_ref();
1289                (k.as_ptr() as *const c_char, k.len())
1290            })
1291            .unzip();
1292
1293        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1294        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1295
1296        unsafe {
1297            ffi::rocksdb_batched_multi_get_cf(
1298                self.inner.inner(),
1299                readopts.inner,
1300                cf.inner(),
1301                ptr_keys.len(),
1302                ptr_keys.as_ptr(),
1303                keys_sizes.as_ptr(),
1304                pinned_values.as_mut_ptr(),
1305                errors.as_mut_ptr(),
1306                sorted_input,
1307            );
1308            pinned_values
1309                .into_iter()
1310                .zip(errors)
1311                .map(|(v, e)| {
1312                    if e.is_null() {
1313                        if v.is_null() {
1314                            Ok(None)
1315                        } else {
1316                            Ok(Some(DBPinnableSlice::from_c(v)))
1317                        }
1318                    } else {
1319                        Err(Error::new(crate::ffi_util::error_message(e)))
1320                    }
1321                })
1322                .collect()
1323        }
1324    }
1325
1326    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1327    /// `true`. This function uses default `ReadOptions`.
1328    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1329        self.key_may_exist_opt(key, &ReadOptions::default())
1330    }
1331
1332    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1333    /// `true`.
1334    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1335        let key = key.as_ref();
1336        unsafe {
1337            0 != ffi::rocksdb_key_may_exist(
1338                self.inner.inner(),
1339                readopts.inner,
1340                key.as_ptr() as *const c_char,
1341                key.len() as size_t,
1342                ptr::null_mut(), /*value*/
1343                ptr::null_mut(), /*val_len*/
1344                ptr::null(),     /*timestamp*/
1345                0,               /*timestamp_len*/
1346                ptr::null_mut(), /*value_found*/
1347            )
1348        }
1349    }
1350
1351    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1352    /// otherwise returns `true`. This function uses default `ReadOptions`.
1353    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1354        self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1355    }
1356
1357    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1358    /// otherwise returns `true`.
1359    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1360        &self,
1361        cf: &impl AsColumnFamilyRef,
1362        key: K,
1363        readopts: &ReadOptions,
1364    ) -> bool {
1365        let key = key.as_ref();
1366        0 != unsafe {
1367            ffi::rocksdb_key_may_exist_cf(
1368                self.inner.inner(),
1369                readopts.inner,
1370                cf.inner(),
1371                key.as_ptr() as *const c_char,
1372                key.len() as size_t,
1373                ptr::null_mut(), /*value*/
1374                ptr::null_mut(), /*val_len*/
1375                ptr::null(),     /*timestamp*/
1376                0,               /*timestamp_len*/
1377                ptr::null_mut(), /*value_found*/
1378            )
1379        }
1380    }
1381
1382    /// If the key definitely does not exist in the database, then this method
1383    /// returns `(false, None)`, else `(true, None)` if it may.
1384    /// If the key is found in memory, then it returns `(true, Some<CSlice>)`.
1385    ///
1386    /// This check is potentially lighter-weight than calling `get()`. One way
1387    /// to make this lighter weight is to avoid doing any IOs.
1388    pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1389        &self,
1390        cf: &impl AsColumnFamilyRef,
1391        key: K,
1392        readopts: &ReadOptions,
1393    ) -> (bool, Option<CSlice>) {
1394        let key = key.as_ref();
1395        let mut val: *mut c_char = ptr::null_mut();
1396        let mut val_len: usize = 0;
1397        let mut value_found: c_uchar = 0;
1398        let may_exists = 0
1399            != unsafe {
1400                ffi::rocksdb_key_may_exist_cf(
1401                    self.inner.inner(),
1402                    readopts.inner,
1403                    cf.inner(),
1404                    key.as_ptr() as *const c_char,
1405                    key.len() as size_t,
1406                    &mut val,         /*value*/
1407                    &mut val_len,     /*val_len*/
1408                    ptr::null(),      /*timestamp*/
1409                    0,                /*timestamp_len*/
1410                    &mut value_found, /*value_found*/
1411                )
1412            };
1413        // The value is only allocated (using malloc) and returned if it is found and
1414        // value_found isn't NULL. In that case the user is responsible for freeing it.
1415        if may_exists && value_found != 0 {
1416            (
1417                may_exists,
1418                Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1419            )
1420        } else {
1421            (may_exists, None)
1422        }
1423    }
1424
1425    fn create_inner_cf_handle(
1426        &self,
1427        name: impl CStrLike,
1428        opts: &Options,
1429    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1430        let cf_name = name.bake().map_err(|err| {
1431            Error::new(format!(
1432                "Failed to convert path to CString when creating cf: {err}"
1433            ))
1434        })?;
1435        Ok(unsafe {
1436            ffi_try!(ffi::rocksdb_create_column_family(
1437                self.inner.inner(),
1438                opts.inner,
1439                cf_name.as_ptr(),
1440            ))
1441        })
1442    }
1443
1444    pub fn iterator<'a: 'b, 'b>(
1445        &'a self,
1446        mode: IteratorMode,
1447    ) -> DBIteratorWithThreadMode<'b, Self> {
1448        let readopts = ReadOptions::default();
1449        self.iterator_opt(mode, readopts)
1450    }
1451
1452    pub fn iterator_opt<'a: 'b, 'b>(
1453        &'a self,
1454        mode: IteratorMode,
1455        readopts: ReadOptions,
1456    ) -> DBIteratorWithThreadMode<'b, Self> {
1457        DBIteratorWithThreadMode::new(self, readopts, mode)
1458    }
1459
1460    /// Opens an iterator using the provided ReadOptions.
1461    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
1462    pub fn iterator_cf_opt<'a: 'b, 'b>(
1463        &'a self,
1464        cf_handle: &impl AsColumnFamilyRef,
1465        readopts: ReadOptions,
1466        mode: IteratorMode,
1467    ) -> DBIteratorWithThreadMode<'b, Self> {
1468        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1469    }
1470
1471    /// Opens an iterator with `set_total_order_seek` enabled.
1472    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
1473    /// with a Hash-based implementation.
1474    pub fn full_iterator<'a: 'b, 'b>(
1475        &'a self,
1476        mode: IteratorMode,
1477    ) -> DBIteratorWithThreadMode<'b, Self> {
1478        let mut opts = ReadOptions::default();
1479        opts.set_total_order_seek(true);
1480        DBIteratorWithThreadMode::new(self, opts, mode)
1481    }
1482
1483    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1484        &'a self,
1485        prefix: P,
1486    ) -> DBIteratorWithThreadMode<'b, Self> {
1487        let mut opts = ReadOptions::default();
1488        opts.set_prefix_same_as_start(true);
1489        DBIteratorWithThreadMode::new(
1490            self,
1491            opts,
1492            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1493        )
1494    }
1495
1496    pub fn iterator_cf<'a: 'b, 'b>(
1497        &'a self,
1498        cf_handle: &impl AsColumnFamilyRef,
1499        mode: IteratorMode,
1500    ) -> DBIteratorWithThreadMode<'b, Self> {
1501        let opts = ReadOptions::default();
1502        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1503    }
1504
1505    pub fn full_iterator_cf<'a: 'b, 'b>(
1506        &'a self,
1507        cf_handle: &impl AsColumnFamilyRef,
1508        mode: IteratorMode,
1509    ) -> DBIteratorWithThreadMode<'b, Self> {
1510        let mut opts = ReadOptions::default();
1511        opts.set_total_order_seek(true);
1512        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1513    }
1514
1515    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1516        &'a self,
1517        cf_handle: &impl AsColumnFamilyRef,
1518        prefix: P,
1519    ) -> DBIteratorWithThreadMode<'a, Self> {
1520        let mut opts = ReadOptions::default();
1521        opts.set_prefix_same_as_start(true);
1522        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1523            self,
1524            cf_handle.inner(),
1525            opts,
1526            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1527        )
1528    }
1529
1530    /// Opens a raw iterator over the database, using the default read options
1531    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1532        let opts = ReadOptions::default();
1533        DBRawIteratorWithThreadMode::new(self, opts)
1534    }
1535
1536    /// Opens a raw iterator over the given column family, using the default read options
1537    pub fn raw_iterator_cf<'a: 'b, 'b>(
1538        &'a self,
1539        cf_handle: &impl AsColumnFamilyRef,
1540    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1541        let opts = ReadOptions::default();
1542        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1543    }
1544
1545    /// Opens a raw iterator over the database, using the given read options
1546    pub fn raw_iterator_opt<'a: 'b, 'b>(
1547        &'a self,
1548        readopts: ReadOptions,
1549    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1550        DBRawIteratorWithThreadMode::new(self, readopts)
1551    }
1552
1553    /// Opens a raw iterator over the given column family, using the given read options
1554    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1555        &'a self,
1556        cf_handle: &impl AsColumnFamilyRef,
1557        readopts: ReadOptions,
1558    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1559        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1560    }
1561
1562    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
1563        SnapshotWithThreadMode::<Self>::new(self)
1564    }
1565
1566    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1567    where
1568        K: AsRef<[u8]>,
1569        V: AsRef<[u8]>,
1570    {
1571        let key = key.as_ref();
1572        let value = value.as_ref();
1573
1574        unsafe {
1575            ffi_try!(ffi::rocksdb_put(
1576                self.inner.inner(),
1577                writeopts.inner,
1578                key.as_ptr() as *const c_char,
1579                key.len() as size_t,
1580                value.as_ptr() as *const c_char,
1581                value.len() as size_t,
1582            ));
1583            Ok(())
1584        }
1585    }
1586
1587    pub fn put_cf_opt<K, V>(
1588        &self,
1589        cf: &impl AsColumnFamilyRef,
1590        key: K,
1591        value: V,
1592        writeopts: &WriteOptions,
1593    ) -> Result<(), Error>
1594    where
1595        K: AsRef<[u8]>,
1596        V: AsRef<[u8]>,
1597    {
1598        let key = key.as_ref();
1599        let value = value.as_ref();
1600
1601        unsafe {
1602            ffi_try!(ffi::rocksdb_put_cf(
1603                self.inner.inner(),
1604                writeopts.inner,
1605                cf.inner(),
1606                key.as_ptr() as *const c_char,
1607                key.len() as size_t,
1608                value.as_ptr() as *const c_char,
1609                value.len() as size_t,
1610            ));
1611            Ok(())
1612        }
1613    }
1614
1615    /// Set the database entry for "key" to "value" with WriteOptions.
1616    /// If "key" already exists, it will coexist with previous entry.
1617    /// `Get` with a timestamp ts specified in ReadOptions will return
1618    /// the most recent key/value whose timestamp is smaller than or equal to ts.
1619    /// Takes an additional argument `ts` as the timestamp.
1620    /// Note: the DB must be opened with user defined timestamp enabled.
1621    pub fn put_with_ts_opt<K, V, S>(
1622        &self,
1623        key: K,
1624        ts: S,
1625        value: V,
1626        writeopts: &WriteOptions,
1627    ) -> Result<(), Error>
1628    where
1629        K: AsRef<[u8]>,
1630        V: AsRef<[u8]>,
1631        S: AsRef<[u8]>,
1632    {
1633        let key = key.as_ref();
1634        let value = value.as_ref();
1635        let ts = ts.as_ref();
1636        unsafe {
1637            ffi_try!(ffi::rocksdb_put_with_ts(
1638                self.inner.inner(),
1639                writeopts.inner,
1640                key.as_ptr() as *const c_char,
1641                key.len() as size_t,
1642                ts.as_ptr() as *const c_char,
1643                ts.len() as size_t,
1644                value.as_ptr() as *const c_char,
1645                value.len() as size_t,
1646            ));
1647            Ok(())
1648        }
1649    }
1650
1651    /// Put with timestamp in a specific column family with WriteOptions.
1652    /// If "key" already exists, it will coexist with previous entry.
1653    /// `Get` with a timestamp ts specified in ReadOptions will return
1654    /// the most recent key/value whose timestamp is smaller than or equal to ts.
1655    /// Takes an additional argument `ts` as the timestamp.
1656    /// Note: the DB must be opened with user defined timestamp enabled.
1657    pub fn put_cf_with_ts_opt<K, V, S>(
1658        &self,
1659        cf: &impl AsColumnFamilyRef,
1660        key: K,
1661        ts: S,
1662        value: V,
1663        writeopts: &WriteOptions,
1664    ) -> Result<(), Error>
1665    where
1666        K: AsRef<[u8]>,
1667        V: AsRef<[u8]>,
1668        S: AsRef<[u8]>,
1669    {
1670        let key = key.as_ref();
1671        let value = value.as_ref();
1672        let ts = ts.as_ref();
1673        unsafe {
1674            ffi_try!(ffi::rocksdb_put_cf_with_ts(
1675                self.inner.inner(),
1676                writeopts.inner,
1677                cf.inner(),
1678                key.as_ptr() as *const c_char,
1679                key.len() as size_t,
1680                ts.as_ptr() as *const c_char,
1681                ts.len() as size_t,
1682                value.as_ptr() as *const c_char,
1683                value.len() as size_t,
1684            ));
1685            Ok(())
1686        }
1687    }
1688
1689    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1690    where
1691        K: AsRef<[u8]>,
1692        V: AsRef<[u8]>,
1693    {
1694        let key = key.as_ref();
1695        let value = value.as_ref();
1696
1697        unsafe {
1698            ffi_try!(ffi::rocksdb_merge(
1699                self.inner.inner(),
1700                writeopts.inner,
1701                key.as_ptr() as *const c_char,
1702                key.len() as size_t,
1703                value.as_ptr() as *const c_char,
1704                value.len() as size_t,
1705            ));
1706            Ok(())
1707        }
1708    }
1709
1710    pub fn merge_cf_opt<K, V>(
1711        &self,
1712        cf: &impl AsColumnFamilyRef,
1713        key: K,
1714        value: V,
1715        writeopts: &WriteOptions,
1716    ) -> Result<(), Error>
1717    where
1718        K: AsRef<[u8]>,
1719        V: AsRef<[u8]>,
1720    {
1721        let key = key.as_ref();
1722        let value = value.as_ref();
1723
1724        unsafe {
1725            ffi_try!(ffi::rocksdb_merge_cf(
1726                self.inner.inner(),
1727                writeopts.inner,
1728                cf.inner(),
1729                key.as_ptr() as *const c_char,
1730                key.len() as size_t,
1731                value.as_ptr() as *const c_char,
1732                value.len() as size_t,
1733            ));
1734            Ok(())
1735        }
1736    }
1737
1738    pub fn delete_opt<K: AsRef<[u8]>>(
1739        &self,
1740        key: K,
1741        writeopts: &WriteOptions,
1742    ) -> Result<(), Error> {
1743        let key = key.as_ref();
1744
1745        unsafe {
1746            ffi_try!(ffi::rocksdb_delete(
1747                self.inner.inner(),
1748                writeopts.inner,
1749                key.as_ptr() as *const c_char,
1750                key.len() as size_t,
1751            ));
1752            Ok(())
1753        }
1754    }
1755
1756    pub fn delete_cf_opt<K: AsRef<[u8]>>(
1757        &self,
1758        cf: &impl AsColumnFamilyRef,
1759        key: K,
1760        writeopts: &WriteOptions,
1761    ) -> Result<(), Error> {
1762        let key = key.as_ref();
1763
1764        unsafe {
1765            ffi_try!(ffi::rocksdb_delete_cf(
1766                self.inner.inner(),
1767                writeopts.inner,
1768                cf.inner(),
1769                key.as_ptr() as *const c_char,
1770                key.len() as size_t,
1771            ));
1772            Ok(())
1773        }
1774    }
1775
1776    /// Remove the database entry (if any) for "key" with WriteOptions.
1777    /// Takes an additional argument `ts` as the timestamp.
1778    /// Note: the DB must be opened with user defined timestamp enabled.
1779    pub fn delete_with_ts_opt<K, S>(
1780        &self,
1781        key: K,
1782        ts: S,
1783        writeopts: &WriteOptions,
1784    ) -> Result<(), Error>
1785    where
1786        K: AsRef<[u8]>,
1787        S: AsRef<[u8]>,
1788    {
1789        let key = key.as_ref();
1790        let ts = ts.as_ref();
1791        unsafe {
1792            ffi_try!(ffi::rocksdb_delete_with_ts(
1793                self.inner.inner(),
1794                writeopts.inner,
1795                key.as_ptr() as *const c_char,
1796                key.len() as size_t,
1797                ts.as_ptr() as *const c_char,
1798                ts.len() as size_t,
1799            ));
1800            Ok(())
1801        }
1802    }
1803
1804    /// Delete with timestamp in a specific column family with WriteOptions.
1805    /// Takes an additional argument `ts` as the timestamp.
1806    /// Note: the DB must be opened with user defined timestamp enabled.
1807    pub fn delete_cf_with_ts_opt<K, S>(
1808        &self,
1809        cf: &impl AsColumnFamilyRef,
1810        key: K,
1811        ts: S,
1812        writeopts: &WriteOptions,
1813    ) -> Result<(), Error>
1814    where
1815        K: AsRef<[u8]>,
1816        S: AsRef<[u8]>,
1817    {
1818        let key = key.as_ref();
1819        let ts = ts.as_ref();
1820        unsafe {
1821            ffi_try!(ffi::rocksdb_delete_cf_with_ts(
1822                self.inner.inner(),
1823                writeopts.inner,
1824                cf.inner(),
1825                key.as_ptr() as *const c_char,
1826                key.len() as size_t,
1827                ts.as_ptr() as *const c_char,
1828                ts.len() as size_t,
1829            ));
1830            Ok(())
1831        }
1832    }
1833
1834    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1835    where
1836        K: AsRef<[u8]>,
1837        V: AsRef<[u8]>,
1838    {
1839        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1840    }
1841
1842    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1843    where
1844        K: AsRef<[u8]>,
1845        V: AsRef<[u8]>,
1846    {
1847        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1848    }
1849
1850    /// Set the database entry for "key" to "value".
1851    /// If "key" already exists, it will coexist with previous entry.
1852    /// `Get` with a timestamp ts specified in ReadOptions will return
1853    /// the most recent key/value whose timestamp is smaller than or equal to ts.
1854    /// Takes an additional argument `ts` as the timestamp.
1855    /// Note: the DB must be opened with user defined timestamp enabled.
1856    pub fn put_with_ts<K, V, S>(&self, key: K, ts: S, value: V) -> Result<(), Error>
1857    where
1858        K: AsRef<[u8]>,
1859        V: AsRef<[u8]>,
1860        S: AsRef<[u8]>,
1861    {
1862        self.put_with_ts_opt(
1863            key.as_ref(),
1864            ts.as_ref(),
1865            value.as_ref(),
1866            &WriteOptions::default(),
1867        )
1868    }
1869
1870    /// Put with timestamp in a specific column family.
1871    /// If "key" already exists, it will coexist with previous entry.
1872    /// `Get` with a timestamp ts specified in ReadOptions will return
1873    /// the most recent key/value whose timestamp is smaller than or equal to ts.
1874    /// Takes an additional argument `ts` as the timestamp.
1875    /// Note: the DB must be opened with user defined timestamp enabled.
1876    pub fn put_cf_with_ts<K, V, S>(
1877        &self,
1878        cf: &impl AsColumnFamilyRef,
1879        key: K,
1880        ts: S,
1881        value: V,
1882    ) -> Result<(), Error>
1883    where
1884        K: AsRef<[u8]>,
1885        V: AsRef<[u8]>,
1886        S: AsRef<[u8]>,
1887    {
1888        self.put_cf_with_ts_opt(
1889            cf,
1890            key.as_ref(),
1891            ts.as_ref(),
1892            value.as_ref(),
1893            &WriteOptions::default(),
1894        )
1895    }
1896
1897    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1898    where
1899        K: AsRef<[u8]>,
1900        V: AsRef<[u8]>,
1901    {
1902        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1903    }
1904
1905    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1906    where
1907        K: AsRef<[u8]>,
1908        V: AsRef<[u8]>,
1909    {
1910        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1911    }
1912
1913    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1914        self.delete_opt(key.as_ref(), &WriteOptions::default())
1915    }
1916
1917    pub fn delete_cf<K: AsRef<[u8]>>(
1918        &self,
1919        cf: &impl AsColumnFamilyRef,
1920        key: K,
1921    ) -> Result<(), Error> {
1922        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1923    }
1924
1925    /// Remove the database entry (if any) for "key".
1926    /// Takes an additional argument `ts` as the timestamp.
1927    /// Note: the DB must be opened with user defined timestamp enabled.
1928    pub fn delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
1929        &self,
1930        key: K,
1931        ts: S,
1932    ) -> Result<(), Error> {
1933        self.delete_with_ts_opt(key.as_ref(), ts.as_ref(), &WriteOptions::default())
1934    }
1935
1936    /// Delete with timestamp in a specific column family.
1937    /// Takes an additional argument `ts` as the timestamp.
1938    /// Note: the DB must be opened with user defined timestamp enabled.
1939    pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
1940        &self,
1941        cf: &impl AsColumnFamilyRef,
1942        key: K,
1943        ts: S,
1944    ) -> Result<(), Error> {
1945        self.delete_cf_with_ts_opt(cf, key.as_ref(), ts.as_ref(), &WriteOptions::default())
1946    }
1947
1948    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
1949    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1950        unsafe {
1951            let start = start.as_ref().map(AsRef::as_ref);
1952            let end = end.as_ref().map(AsRef::as_ref);
1953
1954            ffi::rocksdb_compact_range(
1955                self.inner.inner(),
1956                opt_bytes_to_ptr(start),
1957                start.map_or(0, <[u8]>::len) as size_t,
1958                opt_bytes_to_ptr(end),
1959                end.map_or(0, <[u8]>::len) as size_t,
1960            );
1961        }
1962    }
1963
1964    /// Same as `compact_range` but with custom options.
1965    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1966        &self,
1967        start: Option<S>,
1968        end: Option<E>,
1969        opts: &CompactOptions,
1970    ) {
1971        unsafe {
1972            let start = start.as_ref().map(AsRef::as_ref);
1973            let end = end.as_ref().map(AsRef::as_ref);
1974
1975            ffi::rocksdb_compact_range_opt(
1976                self.inner.inner(),
1977                opts.inner,
1978                opt_bytes_to_ptr(start),
1979                start.map_or(0, <[u8]>::len) as size_t,
1980                opt_bytes_to_ptr(end),
1981                end.map_or(0, <[u8]>::len) as size_t,
1982            );
1983        }
1984    }
1985
1986    /// Runs a manual compaction on the Range of keys given on the
1987    /// given column family. This is not likely to be needed for typical usage.
1988    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1989        &self,
1990        cf: &impl AsColumnFamilyRef,
1991        start: Option<S>,
1992        end: Option<E>,
1993    ) {
1994        unsafe {
1995            let start = start.as_ref().map(AsRef::as_ref);
1996            let end = end.as_ref().map(AsRef::as_ref);
1997
1998            ffi::rocksdb_compact_range_cf(
1999                self.inner.inner(),
2000                cf.inner(),
2001                opt_bytes_to_ptr(start),
2002                start.map_or(0, <[u8]>::len) as size_t,
2003                opt_bytes_to_ptr(end),
2004                end.map_or(0, <[u8]>::len) as size_t,
2005            );
2006        }
2007    }
2008
2009    /// Same as `compact_range_cf` but with custom options.
2010    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2011        &self,
2012        cf: &impl AsColumnFamilyRef,
2013        start: Option<S>,
2014        end: Option<E>,
2015        opts: &CompactOptions,
2016    ) {
2017        unsafe {
2018            let start = start.as_ref().map(AsRef::as_ref);
2019            let end = end.as_ref().map(AsRef::as_ref);
2020
2021            ffi::rocksdb_compact_range_cf_opt(
2022                self.inner.inner(),
2023                cf.inner(),
2024                opts.inner,
2025                opt_bytes_to_ptr(start),
2026                start.map_or(0, <[u8]>::len) as size_t,
2027                opt_bytes_to_ptr(end),
2028                end.map_or(0, <[u8]>::len) as size_t,
2029            );
2030        }
2031    }
2032
2033    /// Wait for all flush and compactions jobs to finish. Jobs to wait include the
2034    /// unscheduled (queued, but not scheduled yet).
2035    ///
2036    /// NOTE: This may also never return if there's sufficient ongoing writes that
2037    /// keeps flush and compaction going without stopping. The user would have to
2038    /// cease all the writes to DB to make this eventually return in a stable
2039    /// state. The user may also use timeout option in WaitForCompactOptions to
2040    /// make this stop waiting and return when timeout expires.
2041    pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
2042        unsafe {
2043            ffi_try!(ffi::rocksdb_wait_for_compact(
2044                self.inner.inner(),
2045                opts.inner
2046            ));
2047        }
2048        Ok(())
2049    }
2050
2051    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
2052        let copts = convert_options(opts)?;
2053        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2054        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2055        let count = opts.len() as i32;
2056        unsafe {
2057            ffi_try!(ffi::rocksdb_set_options(
2058                self.inner.inner(),
2059                count,
2060                cnames.as_ptr(),
2061                cvalues.as_ptr(),
2062            ));
2063        }
2064        Ok(())
2065    }
2066
2067    pub fn set_options_cf(
2068        &self,
2069        cf: &impl AsColumnFamilyRef,
2070        opts: &[(&str, &str)],
2071    ) -> Result<(), Error> {
2072        let copts = convert_options(opts)?;
2073        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2074        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2075        let count = opts.len() as i32;
2076        unsafe {
2077            ffi_try!(ffi::rocksdb_set_options_cf(
2078                self.inner.inner(),
2079                cf.inner(),
2080                count,
2081                cnames.as_ptr(),
2082                cvalues.as_ptr(),
2083            ));
2084        }
2085        Ok(())
2086    }
2087
2088    /// Implementation for property_value et al methods.
2089    ///
2090    /// `name` is the name of the property.  It will be converted into a CString
2091    /// and passed to `get_property` as argument.  `get_property` reads the
2092    /// specified property and either returns NULL or a pointer to a C allocated
2093    /// string; this method takes ownership of that string and will free it at
2094    /// the end. That string is parsed using `parse` callback which produces
2095    /// the returned result.
2096    fn property_value_impl<R>(
2097        name: impl CStrLike,
2098        get_property: impl FnOnce(*const c_char) -> *mut c_char,
2099        parse: impl FnOnce(&str) -> Result<R, Error>,
2100    ) -> Result<Option<R>, Error> {
2101        let value = match name.bake() {
2102            Ok(prop_name) => get_property(prop_name.as_ptr()),
2103            Err(e) => {
2104                return Err(Error::new(format!(
2105                    "Failed to convert property name to CString: {e}"
2106                )));
2107            }
2108        };
2109        if value.is_null() {
2110            return Ok(None);
2111        }
2112        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
2113            Ok(s) => parse(s).map(|value| Some(value)),
2114            Err(e) => Err(Error::new(format!(
2115                "Failed to convert property value to string: {e}"
2116            ))),
2117        };
2118        unsafe {
2119            ffi::rocksdb_free(value as *mut c_void);
2120        }
2121        result
2122    }
2123
2124    /// Retrieves a RocksDB property by name.
2125    ///
2126    /// Full list of properties could be find
2127    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
2128    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
2129        Self::property_value_impl(
2130            name,
2131            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
2132            |str_value| Ok(str_value.to_owned()),
2133        )
2134    }
2135
2136    /// Retrieves a RocksDB property by name, for a specific column family.
2137    ///
2138    /// Full list of properties could be find
2139    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
2140    pub fn property_value_cf(
2141        &self,
2142        cf: &impl AsColumnFamilyRef,
2143        name: impl CStrLike,
2144    ) -> Result<Option<String>, Error> {
2145        Self::property_value_impl(
2146            name,
2147            |prop_name| unsafe {
2148                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
2149            },
2150            |str_value| Ok(str_value.to_owned()),
2151        )
2152    }
2153
2154    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
2155        value.parse::<u64>().map_err(|err| {
2156            Error::new(format!(
2157                "Failed to convert property value {value} to int: {err}"
2158            ))
2159        })
2160    }
2161
2162    /// Retrieves a RocksDB property and casts it to an integer.
2163    ///
2164    /// Full list of properties that return int values could be find
2165    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
2166    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
2167        Self::property_value_impl(
2168            name,
2169            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
2170            Self::parse_property_int_value,
2171        )
2172    }
2173
2174    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
2175    ///
2176    /// Full list of properties that return int values could be find
2177    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
2178    pub fn property_int_value_cf(
2179        &self,
2180        cf: &impl AsColumnFamilyRef,
2181        name: impl CStrLike,
2182    ) -> Result<Option<u64>, Error> {
2183        Self::property_value_impl(
2184            name,
2185            |prop_name| unsafe {
2186                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
2187            },
2188            Self::parse_property_int_value,
2189        )
2190    }
2191
2192    /// The sequence number of the most recent transaction.
2193    pub fn latest_sequence_number(&self) -> u64 {
2194        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
2195    }
2196
2197    /// Return the approximate file system space used by keys in each ranges.
2198    ///
2199    /// Note that the returned sizes measure file system space usage, so
2200    /// if the user data compresses by a factor of ten, the returned
2201    /// sizes will be one-tenth the size of the corresponding user data size.
2202    ///
2203    /// Due to lack of abi, only data flushed to disk is taken into account.
2204    pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
2205        self.get_approximate_sizes_cfopt(None::<&ColumnFamily>, ranges)
2206    }
2207
2208    pub fn get_approximate_sizes_cf(
2209        &self,
2210        cf: &impl AsColumnFamilyRef,
2211        ranges: &[Range],
2212    ) -> Vec<u64> {
2213        self.get_approximate_sizes_cfopt(Some(cf), ranges)
2214    }
2215
2216    fn get_approximate_sizes_cfopt(
2217        &self,
2218        cf: Option<&impl AsColumnFamilyRef>,
2219        ranges: &[Range],
2220    ) -> Vec<u64> {
2221        let start_keys: Vec<*const c_char> = ranges
2222            .iter()
2223            .map(|x| x.start_key.as_ptr() as *const c_char)
2224            .collect();
2225        let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
2226        let end_keys: Vec<*const c_char> = ranges
2227            .iter()
2228            .map(|x| x.end_key.as_ptr() as *const c_char)
2229            .collect();
2230        let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
2231        let mut sizes: Vec<u64> = vec![0; ranges.len()];
2232        let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
2233            ranges.len() as i32,
2234            start_keys.as_ptr(),
2235            start_key_lens.as_ptr(),
2236            end_keys.as_ptr(),
2237            end_key_lens.as_ptr(),
2238            sizes.as_mut_ptr(),
2239        );
2240        let mut err: *mut c_char = ptr::null_mut();
2241        match cf {
2242            None => unsafe {
2243                ffi::rocksdb_approximate_sizes(
2244                    self.inner.inner(),
2245                    n,
2246                    start_key_ptr,
2247                    start_key_len_ptr,
2248                    end_key_ptr,
2249                    end_key_len_ptr,
2250                    size_ptr,
2251                    &mut err,
2252                );
2253            },
2254            Some(cf) => unsafe {
2255                ffi::rocksdb_approximate_sizes_cf(
2256                    self.inner.inner(),
2257                    cf.inner(),
2258                    n,
2259                    start_key_ptr,
2260                    start_key_len_ptr,
2261                    end_key_ptr,
2262                    end_key_len_ptr,
2263                    size_ptr,
2264                    &mut err,
2265                );
2266            },
2267        }
2268        sizes
2269    }
2270
2271    /// Iterate over batches of write operations since a given sequence.
2272    ///
2273    /// Produce an iterator that will provide the batches of write operations
2274    /// that have occurred since the given sequence (see
2275    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
2276    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
2277    /// deletes using the `WriteBatch::iterate()` function.
2278    ///
2279    /// Calling `get_updates_since()` with a sequence number that is out of
2280    /// bounds will return an error.
2281    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
2282        unsafe {
2283            // rocksdb_wal_readoptions_t does not appear to have any functions
2284            // for creating and destroying it; fortunately we can pass a nullptr
2285            // here to get the default behavior
2286            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
2287            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
2288                self.inner.inner(),
2289                seq_number,
2290                opts
2291            ));
2292            Ok(DBWALIterator {
2293                inner: iter,
2294                start_seq_number: seq_number,
2295            })
2296        }
2297    }
2298
2299    /// Tries to catch up with the primary by reading as much as possible from the
2300    /// log files.
2301    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
2302        unsafe {
2303            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
2304        }
2305        Ok(())
2306    }
2307
2308    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
2309    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
2310        let opts = IngestExternalFileOptions::default();
2311        self.ingest_external_file_opts(&opts, paths)
2312    }
2313
2314    /// Loads a list of external SST files created with SstFileWriter into the DB
2315    pub fn ingest_external_file_opts<P: AsRef<Path>>(
2316        &self,
2317        opts: &IngestExternalFileOptions,
2318        paths: Vec<P>,
2319    ) -> Result<(), Error> {
2320        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
2321        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
2322
2323        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
2324    }
2325
2326    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
2327    /// with default opts
2328    pub fn ingest_external_file_cf<P: AsRef<Path>>(
2329        &self,
2330        cf: &impl AsColumnFamilyRef,
2331        paths: Vec<P>,
2332    ) -> Result<(), Error> {
2333        let opts = IngestExternalFileOptions::default();
2334        self.ingest_external_file_cf_opts(cf, &opts, paths)
2335    }
2336
2337    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
2338    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
2339        &self,
2340        cf: &impl AsColumnFamilyRef,
2341        opts: &IngestExternalFileOptions,
2342        paths: Vec<P>,
2343    ) -> Result<(), Error> {
2344        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
2345        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
2346
2347        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
2348    }
2349
2350    fn ingest_external_file_raw(
2351        &self,
2352        opts: &IngestExternalFileOptions,
2353        paths_v: &[CString],
2354        cpaths: &[*const c_char],
2355    ) -> Result<(), Error> {
2356        unsafe {
2357            ffi_try!(ffi::rocksdb_ingest_external_file(
2358                self.inner.inner(),
2359                cpaths.as_ptr(),
2360                paths_v.len(),
2361                opts.inner.cast_const()
2362            ));
2363            Ok(())
2364        }
2365    }
2366
2367    fn ingest_external_file_raw_cf(
2368        &self,
2369        cf: &impl AsColumnFamilyRef,
2370        opts: &IngestExternalFileOptions,
2371        paths_v: &[CString],
2372        cpaths: &[*const c_char],
2373    ) -> Result<(), Error> {
2374        unsafe {
2375            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
2376                self.inner.inner(),
2377                cf.inner(),
2378                cpaths.as_ptr(),
2379                paths_v.len(),
2380                opts.inner.cast_const()
2381            ));
2382            Ok(())
2383        }
2384    }
2385
2386    /// Obtains the LSM-tree meta data of the default column family of the DB
2387    pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
2388        unsafe {
2389            let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
2390
2391            let metadata = ColumnFamilyMetaData {
2392                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2393                name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2394                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2395            };
2396
2397            // destroy
2398            ffi::rocksdb_column_family_metadata_destroy(ptr);
2399
2400            // return
2401            metadata
2402        }
2403    }
2404
2405    /// Obtains the LSM-tree meta data of the specified column family of the DB
2406    pub fn get_column_family_metadata_cf(
2407        &self,
2408        cf: &impl AsColumnFamilyRef,
2409    ) -> ColumnFamilyMetaData {
2410        unsafe {
2411            let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
2412
2413            let metadata = ColumnFamilyMetaData {
2414                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2415                name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2416                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2417            };
2418
2419            // destroy
2420            ffi::rocksdb_column_family_metadata_destroy(ptr);
2421
2422            // return
2423            metadata
2424        }
2425    }
2426
2427    /// Returns a list of all table files with their level, start key
2428    /// and end key
2429    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
2430        unsafe {
2431            let files = ffi::rocksdb_livefiles(self.inner.inner());
2432            if files.is_null() {
2433                Err(Error::new("Could not get live files".to_owned()))
2434            } else {
2435                let n = ffi::rocksdb_livefiles_count(files);
2436
2437                let mut livefiles = Vec::with_capacity(n as usize);
2438                let mut key_size: usize = 0;
2439
2440                for i in 0..n {
2441                    let column_family_name =
2442                        from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2443                    let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2444                    let size = ffi::rocksdb_livefiles_size(files, i);
2445                    let level = ffi::rocksdb_livefiles_level(files, i);
2446
2447                    // get smallest key inside file
2448                    let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2449                    let smallest_key = raw_data(smallest_key, key_size);
2450
2451                    // get largest key inside file
2452                    let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2453                    let largest_key = raw_data(largest_key, key_size);
2454
2455                    livefiles.push(LiveFile {
2456                        column_family_name,
2457                        name,
2458                        size,
2459                        level,
2460                        start_key: smallest_key,
2461                        end_key: largest_key,
2462                        num_entries: ffi::rocksdb_livefiles_entries(files, i),
2463                        num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2464                    });
2465                }
2466
2467                // destroy livefiles metadata(s)
2468                ffi::rocksdb_livefiles_destroy(files);
2469
2470                // return
2471                Ok(livefiles)
2472            }
2473        }
2474    }
2475
2476    /// Delete sst files whose keys are entirely in the given range.
2477    ///
2478    /// Could leave some keys in the range which are in files which are not
2479    /// entirely in the range.
2480    ///
2481    /// Note: L0 files are left regardless of whether they're in the range.
2482    ///
2483    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
2484    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2485        let from = from.as_ref();
2486        let to = to.as_ref();
2487        unsafe {
2488            ffi_try!(ffi::rocksdb_delete_file_in_range(
2489                self.inner.inner(),
2490                from.as_ptr() as *const c_char,
2491                from.len() as size_t,
2492                to.as_ptr() as *const c_char,
2493                to.len() as size_t,
2494            ));
2495            Ok(())
2496        }
2497    }
2498
2499    /// Same as `delete_file_in_range` but only for specific column family
2500    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2501        &self,
2502        cf: &impl AsColumnFamilyRef,
2503        from: K,
2504        to: K,
2505    ) -> Result<(), Error> {
2506        let from = from.as_ref();
2507        let to = to.as_ref();
2508        unsafe {
2509            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2510                self.inner.inner(),
2511                cf.inner(),
2512                from.as_ptr() as *const c_char,
2513                from.len() as size_t,
2514                to.as_ptr() as *const c_char,
2515                to.len() as size_t,
2516            ));
2517            Ok(())
2518        }
2519    }
2520
2521    /// Request stopping background work, if wait is true wait until it's done.
2522    pub fn cancel_all_background_work(&self, wait: bool) {
2523        unsafe {
2524            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2525        }
2526    }
2527
2528    fn drop_column_family<C>(
2529        &self,
2530        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2531        cf: C,
2532    ) -> Result<(), Error> {
2533        unsafe {
2534            // first mark the column family as dropped
2535            ffi_try!(ffi::rocksdb_drop_column_family(
2536                self.inner.inner(),
2537                cf_inner
2538            ));
2539        }
2540        // then finally reclaim any resources (mem, files) by destroying the only single column
2541        // family handle by drop()-ing it
2542        drop(cf);
2543        Ok(())
2544    }
2545
2546    /// Increase the full_history_ts of column family. The new ts_low value should
2547    /// be newer than current full_history_ts value.
2548    /// If another thread updates full_history_ts_low concurrently to a higher
2549    /// timestamp than the requested ts_low, a try again error will be returned.
2550    pub fn increase_full_history_ts_low<S: AsRef<[u8]>>(
2551        &self,
2552        cf: &impl AsColumnFamilyRef,
2553        ts: S,
2554    ) -> Result<(), Error> {
2555        let ts = ts.as_ref();
2556        unsafe {
2557            ffi_try!(ffi::rocksdb_increase_full_history_ts_low(
2558                self.inner.inner(),
2559                cf.inner(),
2560                ts.as_ptr() as *const c_char,
2561                ts.len() as size_t,
2562            ));
2563            Ok(())
2564        }
2565    }
2566
2567    /// Get current full_history_ts value.
2568    pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result<Vec<u8>, Error> {
2569        unsafe {
2570            let mut ts_lowlen = 0;
2571            let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low(
2572                self.inner.inner(),
2573                cf.inner(),
2574                &mut ts_lowlen,
2575            ));
2576
2577            if ts.is_null() {
2578                Err(Error::new("Could not get full_history_ts_low".to_owned()))
2579            } else {
2580                let mut vec = vec![0; ts_lowlen];
2581                ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen);
2582                ffi::rocksdb_free(ts as *mut c_void);
2583                Ok(vec)
2584            }
2585        }
2586    }
2587
2588    /// Returns the DB identity. This is typically ASCII bytes, but that is not guaranteed.
2589    pub fn get_db_identity(&self) -> Result<Vec<u8>, Error> {
2590        unsafe {
2591            let mut length: usize = 0;
2592            let identity_ptr = ffi::rocksdb_get_db_identity(self.inner.inner(), &mut length);
2593            let identity_vec = raw_data(identity_ptr, length);
2594            ffi::rocksdb_free(identity_ptr as *mut c_void);
2595            // In RocksDB: get_db_identity copies a std::string so it should not fail, but
2596            // the API allows it to be overridden, so it might
2597            identity_vec.ok_or_else(|| Error::new("get_db_identity returned NULL".to_string()))
2598        }
2599    }
2600}
2601
2602impl<I: DBInner> DBCommon<SingleThreaded, I> {
2603    /// Creates column family with given name and options
2604    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2605        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2606        self.cfs
2607            .cfs
2608            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2609        Ok(())
2610    }
2611
2612    /// Drops the column family with the given name
2613    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2614        match self.cfs.cfs.remove(name) {
2615            Some(cf) => self.drop_column_family(cf.inner, cf),
2616            _ => Err(Error::new(format!("Invalid column family: {name}"))),
2617        }
2618    }
2619
2620    /// Returns the underlying column family handle
2621    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2622        self.cfs.cfs.get(name)
2623    }
2624
2625    /// Returns the list of column families currently open
2626    pub fn cf_names(&self) -> Vec<String> {
2627        self.cfs.cfs.keys().cloned().collect()
2628    }
2629}
2630
2631impl<I: DBInner> DBCommon<MultiThreaded, I> {
2632    /// Creates column family with given name and options
2633    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2634        // Note that we acquire the cfs lock before inserting: otherwise we might race
2635        // another caller who observed the handle as missing.
2636        let mut cfs = self.cfs.cfs.write();
2637        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2638        cfs.insert(
2639            name.as_ref().to_string(),
2640            Arc::new(UnboundColumnFamily { inner }),
2641        );
2642        Ok(())
2643    }
2644
2645    /// Drops the column family with the given name by internally locking the inner column
2646    /// family map. This avoids needing `&mut self` reference
2647    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2648        match self.cfs.cfs.write().remove(name) {
2649            Some(cf) => self.drop_column_family(cf.inner, cf),
2650            _ => Err(Error::new(format!("Invalid column family: {name}"))),
2651        }
2652    }
2653
2654    /// Returns the underlying column family handle
2655    pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
2656        self.cfs
2657            .cfs
2658            .read()
2659            .get(name)
2660            .cloned()
2661            .map(UnboundColumnFamily::bound_column_family)
2662    }
2663
2664    /// Returns the list of column families currently open
2665    pub fn cf_names(&self) -> Vec<String> {
2666        self.cfs.cfs.read().keys().cloned().collect()
2667    }
2668}
2669
2670impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2671    fn drop(&mut self) {
2672        self.cfs.drop_all_cfs_internal();
2673    }
2674}
2675
2676impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2677    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2678        write!(f, "RocksDB {{ path: {} }}", self.path().display())
2679    }
2680}
2681
2682/// The metadata that describes a column family.
2683#[derive(Debug, Clone)]
2684pub struct ColumnFamilyMetaData {
2685    // The size of this column family in bytes, which is equal to the sum of
2686    // the file size of its "levels".
2687    pub size: u64,
2688    // The name of the column family.
2689    pub name: String,
2690    // The number of files in this column family.
2691    pub file_count: usize,
2692}
2693
2694/// The metadata that describes a SST file
2695#[derive(Debug, Clone)]
2696pub struct LiveFile {
2697    /// Name of the column family the file belongs to
2698    pub column_family_name: String,
2699    /// Name of the file
2700    pub name: String,
2701    /// Size of the file
2702    pub size: usize,
2703    /// Level at which this file resides
2704    pub level: i32,
2705    /// Smallest user defined key in the file
2706    pub start_key: Option<Vec<u8>>,
2707    /// Largest user defined key in the file
2708    pub end_key: Option<Vec<u8>>,
2709    /// Number of entries/alive keys in the file
2710    pub num_entries: u64,
2711    /// Number of deletions/tomb key(s) in the file
2712    pub num_deletions: u64,
2713}
2714
2715fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2716    opts.iter()
2717        .map(|(name, value)| {
2718            let cname = match CString::new(name.as_bytes()) {
2719                Ok(cname) => cname,
2720                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2721            };
2722            let cvalue = match CString::new(value.as_bytes()) {
2723                Ok(cvalue) => cvalue,
2724                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2725            };
2726            Ok((cname, cvalue))
2727        })
2728        .collect()
2729}
2730
2731pub(crate) fn convert_values(
2732    values: Vec<*mut c_char>,
2733    values_sizes: Vec<usize>,
2734    errors: Vec<*mut c_char>,
2735) -> Vec<Result<Option<Vec<u8>>, Error>> {
2736    values
2737        .into_iter()
2738        .zip(values_sizes)
2739        .zip(errors)
2740        .map(|((v, s), e)| {
2741            if e.is_null() {
2742                let value = unsafe { crate::ffi_util::raw_data(v, s) };
2743                unsafe {
2744                    ffi::rocksdb_free(v as *mut c_void);
2745                }
2746                Ok(value)
2747            } else {
2748                Err(Error::new(crate::ffi_util::error_message(e)))
2749            }
2750        })
2751        .collect()
2752}