haizhi_rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use crate::{
17    checkpoint::ExportImportFilesMetaData,
18    column_family::AsColumnFamilyRef,
19    column_family::BoundColumnFamily,
20    column_family::UnboundColumnFamily,
21    db_options::OptionsMustOutliveDB,
22    ffi,
23    ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
24    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
25    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
26    IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
27    WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
28};
29
30use crate::ffi_util::CSlice;
31use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
32use std::collections::BTreeMap;
33use std::ffi::{CStr, CString};
34use std::fmt;
35use std::fs;
36use std::iter;
37use std::path::Path;
38use std::path::PathBuf;
39use std::ptr;
40use std::slice;
41use std::str;
42use std::sync::Arc;
43use std::sync::RwLock;
44use std::time::Duration;
45
46/// Marker trait to specify single or multi threaded column family alternations for
47/// [`DBWithThreadMode<T>`]
48///
49/// This arrangement makes differences in self mutability and return type in
50/// some of `DBWithThreadMode` methods.
51///
52/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
53/// also has a minimum set of not-encapsulated internal methods between
54/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
55/// called and defined externally.
56pub trait ThreadMode {
57    /// Internal implementation for storing column family handles
58    fn new_cf_map_internal(
59        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
60    ) -> Self;
61    /// Internal implementation for dropping column family handles
62    fn drop_all_cfs_internal(&mut self);
63}
64
65/// Actual marker type for the marker trait `ThreadMode`, which holds
66/// a collection of column families without synchronization primitive, providing
67/// no overhead for the single-threaded column family alternations. The other
68/// mode is [`MultiThreaded`].
69///
70/// See [`DB`] for more details, including performance implications for each mode
71pub struct SingleThreaded {
72    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
73}
74
75/// Actual marker type for the marker trait `ThreadMode`, which holds
76/// a collection of column families wrapped in a RwLock to be mutated
77/// concurrently. The other mode is [`SingleThreaded`].
78///
79/// See [`DB`] for more details, including performance implications for each mode
80pub struct MultiThreaded {
81    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
82}
83
84impl ThreadMode for SingleThreaded {
85    fn new_cf_map_internal(
86        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
87    ) -> Self {
88        Self {
89            cfs: cfs
90                .into_iter()
91                .map(|(n, c)| (n, ColumnFamily { inner: c }))
92                .collect(),
93        }
94    }
95
96    fn drop_all_cfs_internal(&mut self) {
97        // Cause all ColumnFamily objects to be Drop::drop()-ed.
98        self.cfs.clear();
99    }
100}
101
102impl ThreadMode for MultiThreaded {
103    fn new_cf_map_internal(
104        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
105    ) -> Self {
106        Self {
107            cfs: RwLock::new(
108                cfs.into_iter()
109                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
110                    .collect(),
111            ),
112        }
113    }
114
115    fn drop_all_cfs_internal(&mut self) {
116        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
117        self.cfs.write().unwrap().clear();
118    }
119}
120
121/// Get underlying `rocksdb_t`.
122pub trait DBInner {
123    fn inner(&self) -> *mut ffi::rocksdb_t;
124}
125
126/// A helper type to implement some common methods for [`DBWithThreadMode`]
127/// and [`OptimisticTransactionDB`].
128///
129/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
130pub struct DBCommon<T: ThreadMode, D: DBInner> {
131    pub(crate) inner: D,
132    cfs: T, // Column families are held differently depending on thread mode
133    path: PathBuf,
134    _outlive: Vec<OptionsMustOutliveDB>,
135}
136
137/// Minimal set of DB-related methods, intended to be generic over
138/// `DBWithThreadMode<T>`. Mainly used internally
139pub trait DBAccess {
140    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
141
142    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
143
144    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
145
146    unsafe fn create_iterator_cf(
147        &self,
148        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
149        readopts: &ReadOptions,
150    ) -> *mut ffi::rocksdb_iterator_t;
151
152    fn get_opt<K: AsRef<[u8]>>(
153        &self,
154        key: K,
155        readopts: &ReadOptions,
156    ) -> Result<Option<Vec<u8>>, Error>;
157
158    fn get_cf_opt<K: AsRef<[u8]>>(
159        &self,
160        cf: &impl AsColumnFamilyRef,
161        key: K,
162        readopts: &ReadOptions,
163    ) -> Result<Option<Vec<u8>>, Error>;
164
165    fn get_pinned_opt<K: AsRef<[u8]>>(
166        &self,
167        key: K,
168        readopts: &ReadOptions,
169    ) -> Result<Option<DBPinnableSlice>, Error>;
170
171    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
172        &self,
173        cf: &impl AsColumnFamilyRef,
174        key: K,
175        readopts: &ReadOptions,
176    ) -> Result<Option<DBPinnableSlice>, Error>;
177
178    fn multi_get_opt<K, I>(
179        &self,
180        keys: I,
181        readopts: &ReadOptions,
182    ) -> Vec<Result<Option<Vec<u8>>, Error>>
183    where
184        K: AsRef<[u8]>,
185        I: IntoIterator<Item = K>;
186
187    fn multi_get_cf_opt<'b, K, I, W>(
188        &self,
189        keys_cf: I,
190        readopts: &ReadOptions,
191    ) -> Vec<Result<Option<Vec<u8>>, Error>>
192    where
193        K: AsRef<[u8]>,
194        I: IntoIterator<Item = (&'b W, K)>,
195        W: AsColumnFamilyRef + 'b;
196}
197
198impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
199    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
200        ffi::rocksdb_create_snapshot(self.inner.inner())
201    }
202
203    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
204        ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
205    }
206
207    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
208        ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
209    }
210
211    unsafe fn create_iterator_cf(
212        &self,
213        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
214        readopts: &ReadOptions,
215    ) -> *mut ffi::rocksdb_iterator_t {
216        ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
217    }
218
219    fn get_opt<K: AsRef<[u8]>>(
220        &self,
221        key: K,
222        readopts: &ReadOptions,
223    ) -> Result<Option<Vec<u8>>, Error> {
224        self.get_opt(key, readopts)
225    }
226
227    fn get_cf_opt<K: AsRef<[u8]>>(
228        &self,
229        cf: &impl AsColumnFamilyRef,
230        key: K,
231        readopts: &ReadOptions,
232    ) -> Result<Option<Vec<u8>>, Error> {
233        self.get_cf_opt(cf, key, readopts)
234    }
235
236    fn get_pinned_opt<K: AsRef<[u8]>>(
237        &self,
238        key: K,
239        readopts: &ReadOptions,
240    ) -> Result<Option<DBPinnableSlice>, Error> {
241        self.get_pinned_opt(key, readopts)
242    }
243
244    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
245        &self,
246        cf: &impl AsColumnFamilyRef,
247        key: K,
248        readopts: &ReadOptions,
249    ) -> Result<Option<DBPinnableSlice>, Error> {
250        self.get_pinned_cf_opt(cf, key, readopts)
251    }
252
253    fn multi_get_opt<K, Iter>(
254        &self,
255        keys: Iter,
256        readopts: &ReadOptions,
257    ) -> Vec<Result<Option<Vec<u8>>, Error>>
258    where
259        K: AsRef<[u8]>,
260        Iter: IntoIterator<Item = K>,
261    {
262        self.multi_get_opt(keys, readopts)
263    }
264
265    fn multi_get_cf_opt<'b, K, Iter, W>(
266        &self,
267        keys_cf: Iter,
268        readopts: &ReadOptions,
269    ) -> Vec<Result<Option<Vec<u8>>, Error>>
270    where
271        K: AsRef<[u8]>,
272        Iter: IntoIterator<Item = (&'b W, K)>,
273        W: AsColumnFamilyRef + 'b,
274    {
275        self.multi_get_cf_opt(keys_cf, readopts)
276    }
277}
278
/// A pair of borrowed keys delimiting a key range.
pub struct Ranges<'a> {
    /// Key at which the range starts.
    start_key: &'a [u8],
    /// Key at which the range ends.
    end_key: &'a [u8],
}

impl<'a> Ranges<'a> {
    /// Builds a range from its borrowed start and end keys.
    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Self {
        Self { start_key, end_key }
    }
}
289
290pub struct DBWithThreadModeInner {
291    inner: *mut ffi::rocksdb_t,
292}
293
294impl DBInner for DBWithThreadModeInner {
295    fn inner(&self) -> *mut ffi::rocksdb_t {
296        self.inner
297    }
298}
299
300impl Drop for DBWithThreadModeInner {
301    fn drop(&mut self) {
302        unsafe {
303            ffi::rocksdb_close(self.inner);
304        }
305    }
306}
307
308/// A type alias to RocksDB database.
309///
310/// See crate level documentation for a simple usage example.
311/// See [`DBCommon`] for full list of methods.
312pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
313
314/// A type alias to DB instance type with the single-threaded column family
315/// creations/deletions
316///
317/// # Compatibility and multi-threaded mode
318///
319/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
320/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
321/// column family alternations.
322///
323/// # Limited performance implication for single-threaded mode
324///
325/// Even with [`SingleThreaded`], almost all of RocksDB operations is
326/// multi-threaded unless the underlying RocksDB instance is
327/// specifically configured otherwise. `SingleThreaded` only forces
328/// serialization of column family alternations by requiring `&mut self` of DB
329/// instance due to its wrapper implementation details.
330///
331/// # Multi-threaded mode
332///
333/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
334/// workload including multi-threaded column family alternations, costing the
335/// RwLock overhead inside `DB`.
336#[cfg(not(feature = "multi-threaded-cf"))]
337pub type DB = DBWithThreadMode<SingleThreaded>;
338
339#[cfg(feature = "multi-threaded-cf")]
340pub type DB = DBWithThreadMode<MultiThreaded>;
341
342// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
343// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
344// rocksdb internally does not rely on thread-local information for its user-exposed types.
345unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
346
347// Sync is similarly safe for many types because they do not expose interior mutability, and their
348// use within the rocksdb library is generally behind a const reference
349unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
350
/// How the database should be opened.
enum AccessType<'a> {
    /// Regular read-write open.
    ReadWrite,
    /// Read-only open; the flag is forwarded to RocksDB's read-only open call.
    ReadOnly { error_if_log_file_exist: bool },
    /// Open as a secondary instance that uses `secondary_path`.
    Secondary { secondary_path: &'a Path },
    /// Open with a time-to-live of `ttl`.
    WithTTL { ttl: Duration },
}
358
359impl<T: ThreadMode> DBWithThreadMode<T> {
360    /// Opens a database with default options.
361    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
362        let mut opts = Options::default();
363        opts.create_if_missing(true);
364        Self::open(&opts, path)
365    }
366
367    /// Opens the database with the specified options.
368    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
369        Self::open_cf(opts, path, None::<&str>)
370    }
371
372    /// Opens the database for read only with the specified options.
373    pub fn open_for_read_only<P: AsRef<Path>>(
374        opts: &Options,
375        path: P,
376        error_if_log_file_exist: bool,
377    ) -> Result<Self, Error> {
378        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
379    }
380
381    /// Opens the database as a secondary.
382    pub fn open_as_secondary<P: AsRef<Path>>(
383        opts: &Options,
384        primary_path: P,
385        secondary_path: P,
386    ) -> Result<Self, Error> {
387        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
388    }
389
390    /// Opens the database with a Time to Live compaction filter.
391    pub fn open_with_ttl<P: AsRef<Path>>(
392        opts: &Options,
393        path: P,
394        ttl: Duration,
395    ) -> Result<Self, Error> {
396        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
397    }
398
399    /// Opens the database with a Time to Live compaction filter and column family names.
400    ///
401    /// Column families opened using this function will be created with default `Options`.
402    pub fn open_cf_with_ttl<P, I, N>(
403        opts: &Options,
404        path: P,
405        cfs: I,
406        ttl: Duration,
407    ) -> Result<Self, Error>
408    where
409        P: AsRef<Path>,
410        I: IntoIterator<Item = N>,
411        N: AsRef<str>,
412    {
413        let cfs = cfs
414            .into_iter()
415            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
416
417        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
418    }
419
420    /// Opens a database with the given database with a Time to Live compaction filter and
421    /// column family descriptors.
422    pub fn open_cf_descriptors_with_ttl<P, I>(
423        opts: &Options,
424        path: P,
425        cfs: I,
426        ttl: Duration,
427    ) -> Result<Self, Error>
428    where
429        P: AsRef<Path>,
430        I: IntoIterator<Item = ColumnFamilyDescriptor>,
431    {
432        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
433    }
434
435    /// Opens a database with the given database options and column family names.
436    ///
437    /// Column families opened using this function will be created with default `Options`.
438    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
439    where
440        P: AsRef<Path>,
441        I: IntoIterator<Item = N>,
442        N: AsRef<str>,
443    {
444        let cfs = cfs
445            .into_iter()
446            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
447
448        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
449    }
450
451    /// Opens a database with the given database options and column family names.
452    ///
453    /// Column families opened using given `Options`.
454    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
455    where
456        P: AsRef<Path>,
457        I: IntoIterator<Item = (N, Options)>,
458        N: AsRef<str>,
459    {
460        let cfs = cfs
461            .into_iter()
462            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
463
464        Self::open_cf_descriptors(opts, path, cfs)
465    }
466
467    /// Opens a database for read only with the given database options and column family names.
468    pub fn open_cf_for_read_only<P, I, N>(
469        opts: &Options,
470        path: P,
471        cfs: I,
472        error_if_log_file_exist: bool,
473    ) -> Result<Self, Error>
474    where
475        P: AsRef<Path>,
476        I: IntoIterator<Item = N>,
477        N: AsRef<str>,
478    {
479        let cfs = cfs
480            .into_iter()
481            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
482
483        Self::open_cf_descriptors_internal(
484            opts,
485            path,
486            cfs,
487            &AccessType::ReadOnly {
488                error_if_log_file_exist,
489            },
490        )
491    }
492
493    /// Opens a database for read only with the given database options and column family names.
494    pub fn open_cf_with_opts_for_read_only<P, I, N>(
495        db_opts: &Options,
496        path: P,
497        cfs: I,
498        error_if_log_file_exist: bool,
499    ) -> Result<Self, Error>
500    where
501        P: AsRef<Path>,
502        I: IntoIterator<Item = (N, Options)>,
503        N: AsRef<str>,
504    {
505        let cfs = cfs
506            .into_iter()
507            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
508
509        Self::open_cf_descriptors_internal(
510            db_opts,
511            path,
512            cfs,
513            &AccessType::ReadOnly {
514                error_if_log_file_exist,
515            },
516        )
517    }
518
519    /// Opens a database for ready only with the given database options and
520    /// column family descriptors.
521    pub fn open_cf_descriptors_read_only<P, I>(
522        opts: &Options,
523        path: P,
524        cfs: I,
525        error_if_log_file_exist: bool,
526    ) -> Result<Self, Error>
527    where
528        P: AsRef<Path>,
529        I: IntoIterator<Item = ColumnFamilyDescriptor>,
530    {
531        Self::open_cf_descriptors_internal(
532            opts,
533            path,
534            cfs,
535            &AccessType::ReadOnly {
536                error_if_log_file_exist,
537            },
538        )
539    }
540
541    /// Opens the database as a secondary with the given database options and column family names.
542    pub fn open_cf_as_secondary<P, I, N>(
543        opts: &Options,
544        primary_path: P,
545        secondary_path: P,
546        cfs: I,
547    ) -> Result<Self, Error>
548    where
549        P: AsRef<Path>,
550        I: IntoIterator<Item = N>,
551        N: AsRef<str>,
552    {
553        let cfs = cfs
554            .into_iter()
555            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
556
557        Self::open_cf_descriptors_internal(
558            opts,
559            primary_path,
560            cfs,
561            &AccessType::Secondary {
562                secondary_path: secondary_path.as_ref(),
563            },
564        )
565    }
566
567    /// Opens the database as a secondary with the given database options and
568    /// column family descriptors.
569    pub fn open_cf_descriptors_as_secondary<P, I>(
570        opts: &Options,
571        path: P,
572        secondary_path: P,
573        cfs: I,
574    ) -> Result<Self, Error>
575    where
576        P: AsRef<Path>,
577        I: IntoIterator<Item = ColumnFamilyDescriptor>,
578    {
579        Self::open_cf_descriptors_internal(
580            opts,
581            path,
582            cfs,
583            &AccessType::Secondary {
584                secondary_path: secondary_path.as_ref(),
585            },
586        )
587    }
588
589    /// Opens a database with the given database options and column family descriptors.
590    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
591    where
592        P: AsRef<Path>,
593        I: IntoIterator<Item = ColumnFamilyDescriptor>,
594    {
595        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
596    }
597
598    /// Internal implementation for opening RocksDB.
599    fn open_cf_descriptors_internal<P, I>(
600        opts: &Options,
601        path: P,
602        cfs: I,
603        access_type: &AccessType,
604    ) -> Result<Self, Error>
605    where
606        P: AsRef<Path>,
607        I: IntoIterator<Item = ColumnFamilyDescriptor>,
608    {
609        let cfs: Vec<_> = cfs.into_iter().collect();
610        let outlive = iter::once(opts.outlive.clone())
611            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
612            .collect();
613
614        let cpath = to_cpath(&path)?;
615
616        if let Err(e) = fs::create_dir_all(&path) {
617            return Err(Error::new(format!(
618                "Failed to create RocksDB directory: `{e:?}`."
619            )));
620        }
621
622        let db: *mut ffi::rocksdb_t;
623        let mut cf_map = BTreeMap::new();
624
625        if cfs.is_empty() {
626            db = Self::open_raw(opts, &cpath, access_type)?;
627        } else {
628            let mut cfs_v = cfs;
629            // Always open the default column family.
630            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
631                cfs_v.push(ColumnFamilyDescriptor {
632                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
633                    options: Options::default(),
634                });
635            }
636            // We need to store our CStrings in an intermediate vector
637            // so that their pointers remain valid.
638            let c_cfs: Vec<CString> = cfs_v
639                .iter()
640                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
641                .collect();
642
643            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
644
645            // These handles will be populated by DB.
646            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
647
648            let cfopts: Vec<_> = cfs_v
649                .iter()
650                .map(|cf| cf.options.inner as *const _)
651                .collect();
652
653            db = Self::open_cf_raw(
654                opts,
655                &cpath,
656                &cfs_v,
657                &cfnames,
658                &cfopts,
659                &mut cfhandles,
660                access_type,
661            )?;
662            for handle in &cfhandles {
663                if handle.is_null() {
664                    return Err(Error::new(
665                        "Received null column family handle from DB.".to_owned(),
666                    ));
667                }
668            }
669
670            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
671                cf_map.insert(cf_desc.name.clone(), inner);
672            }
673        }
674
675        if db.is_null() {
676            return Err(Error::new("Could not initialize database.".to_owned()));
677        }
678
679        Ok(Self {
680            inner: DBWithThreadModeInner { inner: db },
681            path: path.as_ref().to_path_buf(),
682            cfs: T::new_cf_map_internal(cf_map),
683            _outlive: outlive,
684        })
685    }
686
687    fn open_raw(
688        opts: &Options,
689        cpath: &CString,
690        access_type: &AccessType,
691    ) -> Result<*mut ffi::rocksdb_t, Error> {
692        let db = unsafe {
693            match *access_type {
694                AccessType::ReadOnly {
695                    error_if_log_file_exist,
696                } => ffi_try!(ffi::rocksdb_open_for_read_only(
697                    opts.inner,
698                    cpath.as_ptr(),
699                    c_uchar::from(error_if_log_file_exist),
700                )),
701                AccessType::ReadWrite => {
702                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
703                }
704                AccessType::Secondary { secondary_path } => {
705                    ffi_try!(ffi::rocksdb_open_as_secondary(
706                        opts.inner,
707                        cpath.as_ptr(),
708                        to_cpath(secondary_path)?.as_ptr(),
709                    ))
710                }
711                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
712                    opts.inner,
713                    cpath.as_ptr(),
714                    ttl.as_secs() as c_int,
715                )),
716            }
717        };
718        Ok(db)
719    }
720
721    #[allow(clippy::pedantic)]
722    fn open_cf_raw(
723        opts: &Options,
724        cpath: &CString,
725        cfs_v: &[ColumnFamilyDescriptor],
726        cfnames: &[*const c_char],
727        cfopts: &[*const ffi::rocksdb_options_t],
728        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
729        access_type: &AccessType,
730    ) -> Result<*mut ffi::rocksdb_t, Error> {
731        let db = unsafe {
732            match *access_type {
733                AccessType::ReadOnly {
734                    error_if_log_file_exist,
735                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
736                    opts.inner,
737                    cpath.as_ptr(),
738                    cfs_v.len() as c_int,
739                    cfnames.as_ptr(),
740                    cfopts.as_ptr(),
741                    cfhandles.as_mut_ptr(),
742                    c_uchar::from(error_if_log_file_exist),
743                )),
744                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
745                    opts.inner,
746                    cpath.as_ptr(),
747                    cfs_v.len() as c_int,
748                    cfnames.as_ptr(),
749                    cfopts.as_ptr(),
750                    cfhandles.as_mut_ptr(),
751                )),
752                AccessType::Secondary { secondary_path } => {
753                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
754                        opts.inner,
755                        cpath.as_ptr(),
756                        to_cpath(secondary_path)?.as_ptr(),
757                        cfs_v.len() as c_int,
758                        cfnames.as_ptr(),
759                        cfopts.as_ptr(),
760                        cfhandles.as_mut_ptr(),
761                    ))
762                }
763                AccessType::WithTTL { ttl } => {
764                    let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
765                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
766                        opts.inner,
767                        cpath.as_ptr(),
768                        cfs_v.len() as c_int,
769                        cfnames.as_ptr(),
770                        cfopts.as_ptr(),
771                        cfhandles.as_mut_ptr(),
772                        ttls_v.as_ptr(),
773                    ))
774                }
775            }
776        };
777        Ok(db)
778    }
779
780    /// Removes the database entries in the range `["from", "to")` using given write options.
781    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
782        &self,
783        cf: &impl AsColumnFamilyRef,
784        from: K,
785        to: K,
786        writeopts: &WriteOptions,
787    ) -> Result<(), Error> {
788        let from = from.as_ref();
789        let to = to.as_ref();
790
791        unsafe {
792            ffi_try!(ffi::rocksdb_delete_range_cf(
793                self.inner.inner(),
794                writeopts.inner,
795                cf.inner(),
796                from.as_ptr() as *const c_char,
797                from.len() as size_t,
798                to.as_ptr() as *const c_char,
799                to.len() as size_t,
800            ));
801            Ok(())
802        }
803    }
804
805    /// Removes the database entries in the range `["from", "to")` using default write options.
806    pub fn delete_range_cf<K: AsRef<[u8]>>(
807        &self,
808        cf: &impl AsColumnFamilyRef,
809        from: K,
810        to: K,
811    ) -> Result<(), Error> {
812        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
813    }
814
815    pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
816        unsafe {
817            ffi_try!(ffi::rocksdb_write(
818                self.inner.inner(),
819                writeopts.inner,
820                batch.inner
821            ));
822        }
823        Ok(())
824    }
825
826    pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
827        self.write_opt(batch, &WriteOptions::default())
828    }
829
830    pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
831        let mut wo = WriteOptions::new();
832        wo.disable_wal(true);
833        self.write_opt(batch, &wo)
834    }
835}
836
837/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
838impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
839    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
840        Self {
841            inner,
842            cfs,
843            path,
844            _outlive: outlive,
845        }
846    }
847
848    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
849        let cpath = to_cpath(path)?;
850        let mut length = 0;
851
852        unsafe {
853            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
854                opts.inner,
855                cpath.as_ptr(),
856                &mut length,
857            ));
858
859            let vec = slice::from_raw_parts(ptr, length)
860                .iter()
861                .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
862                .collect();
863            ffi::rocksdb_list_column_families_destroy(ptr, length);
864            Ok(vec)
865        }
866    }
867
868    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
869        let cpath = to_cpath(path)?;
870        unsafe {
871            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
872        }
873        Ok(())
874    }
875
876    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
877        let cpath = to_cpath(path)?;
878        unsafe {
879            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
880        }
881        Ok(())
882    }
883
884    pub fn path(&self) -> &Path {
885        self.path.as_path()
886    }
887
888    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
889    /// the data to disk.
890    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
891        unsafe {
892            ffi_try!(ffi::rocksdb_flush_wal(
893                self.inner.inner(),
894                c_uchar::from(sync)
895            ));
896        }
897        Ok(())
898    }
899
900    /// Flushes database memtables to SST files on the disk.
901    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
902        unsafe {
903            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
904        }
905        Ok(())
906    }
907
908    /// Flushes database memtables to SST files on the disk using default options.
909    pub fn flush(&self) -> Result<(), Error> {
910        self.flush_opt(&FlushOptions::default())
911    }
912
913    /// Flushes database memtables to SST files on the disk for a given column family.
914    pub fn flush_cf_opt(
915        &self,
916        cf: &impl AsColumnFamilyRef,
917        flushopts: &FlushOptions,
918    ) -> Result<(), Error> {
919        unsafe {
920            ffi_try!(ffi::rocksdb_flush_cf(
921                self.inner.inner(),
922                flushopts.inner,
923                cf.inner()
924            ));
925        }
926        Ok(())
927    }
928
929    /// Flushes database memtables to SST files on the disk for a given column family using default
930    /// options.
931    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
932        self.flush_cf_opt(cf, &FlushOptions::default())
933    }
934
935    /// Return the bytes associated with a key value with read options. If you only intend to use
936    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
937    /// to avoid unnecessary memory copy.
938    pub fn get_opt<K: AsRef<[u8]>>(
939        &self,
940        key: K,
941        readopts: &ReadOptions,
942    ) -> Result<Option<Vec<u8>>, Error> {
943        self.get_pinned_opt(key, readopts)
944            .map(|x| x.map(|v| v.as_ref().to_vec()))
945    }
946
947    /// Return the bytes associated with a key value. If you only intend to use the vector returned
948    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
949    /// copy.
950    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
951        self.get_opt(key.as_ref(), &ReadOptions::default())
952    }
953
954    /// Return the bytes associated with a key value and the given column family with read options.
955    /// If you only intend to use the vector returned temporarily, consider using
956    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
957    pub fn get_cf_opt<K: AsRef<[u8]>>(
958        &self,
959        cf: &impl AsColumnFamilyRef,
960        key: K,
961        readopts: &ReadOptions,
962    ) -> Result<Option<Vec<u8>>, Error> {
963        self.get_pinned_cf_opt(cf, key, readopts)
964            .map(|x| x.map(|v| v.as_ref().to_vec()))
965    }
966
967    /// Return the bytes associated with a key value and the given column family. If you only
968    /// intend to use the vector returned temporarily, consider using
969    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
970    pub fn get_cf<K: AsRef<[u8]>>(
971        &self,
972        cf: &impl AsColumnFamilyRef,
973        key: K,
974    ) -> Result<Option<Vec<u8>>, Error> {
975        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
976    }
977
978    /// Return the value associated with a key using RocksDB's PinnableSlice
979    /// so as to avoid unnecessary memory copy.
980    pub fn get_pinned_opt<K: AsRef<[u8]>>(
981        &self,
982        key: K,
983        readopts: &ReadOptions,
984    ) -> Result<Option<DBPinnableSlice>, Error> {
985        if readopts.inner.is_null() {
986            return Err(Error::new(
987                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
988                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
989                    .to_owned(),
990            ));
991        }
992
993        let key = key.as_ref();
994        unsafe {
995            let val = ffi_try!(ffi::rocksdb_get_pinned(
996                self.inner.inner(),
997                readopts.inner,
998                key.as_ptr() as *const c_char,
999                key.len() as size_t,
1000            ));
1001            if val.is_null() {
1002                Ok(None)
1003            } else {
1004                Ok(Some(DBPinnableSlice::from_c(val)))
1005            }
1006        }
1007    }
1008
1009    /// Return the value associated with a key using RocksDB's PinnableSlice
1010    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1011    /// leverages default options.
1012    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1013        self.get_pinned_opt(key, &ReadOptions::default())
1014    }
1015
1016    /// Return the value associated with a key using RocksDB's PinnableSlice
1017    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1018    /// allows specifying ColumnFamily
1019    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1020        &self,
1021        cf: &impl AsColumnFamilyRef,
1022        key: K,
1023        readopts: &ReadOptions,
1024    ) -> Result<Option<DBPinnableSlice>, Error> {
1025        if readopts.inner.is_null() {
1026            return Err(Error::new(
1027                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1028                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1029                    .to_owned(),
1030            ));
1031        }
1032
1033        let key = key.as_ref();
1034        unsafe {
1035            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1036                self.inner.inner(),
1037                readopts.inner,
1038                cf.inner(),
1039                key.as_ptr() as *const c_char,
1040                key.len() as size_t,
1041            ));
1042            if val.is_null() {
1043                Ok(None)
1044            } else {
1045                Ok(Some(DBPinnableSlice::from_c(val)))
1046            }
1047        }
1048    }
1049
1050    /// Return the value associated with a key using RocksDB's PinnableSlice
1051    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1052    /// leverages default options.
1053    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1054        &self,
1055        cf: &impl AsColumnFamilyRef,
1056        key: K,
1057    ) -> Result<Option<DBPinnableSlice>, Error> {
1058        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1059    }
1060
1061    /// Return the values associated with the given keys.
1062    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1063    where
1064        K: AsRef<[u8]>,
1065        I: IntoIterator<Item = K>,
1066    {
1067        self.multi_get_opt(keys, &ReadOptions::default())
1068    }
1069
1070    /// Return the values associated with the given keys using read options.
1071    pub fn multi_get_opt<K, I>(
1072        &self,
1073        keys: I,
1074        readopts: &ReadOptions,
1075    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1076    where
1077        K: AsRef<[u8]>,
1078        I: IntoIterator<Item = K>,
1079    {
1080        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1081            .into_iter()
1082            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1083            .unzip();
1084        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1085
1086        let mut values = vec![ptr::null_mut(); keys.len()];
1087        let mut values_sizes = vec![0_usize; keys.len()];
1088        let mut errors = vec![ptr::null_mut(); keys.len()];
1089        unsafe {
1090            ffi::rocksdb_multi_get(
1091                self.inner.inner(),
1092                readopts.inner,
1093                ptr_keys.len(),
1094                ptr_keys.as_ptr(),
1095                keys_sizes.as_ptr(),
1096                values.as_mut_ptr(),
1097                values_sizes.as_mut_ptr(),
1098                errors.as_mut_ptr(),
1099            );
1100        }
1101
1102        convert_values(values, values_sizes, errors)
1103    }
1104
1105    /// Return the values associated with the given keys and column families.
1106    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1107        &'a self,
1108        keys: I,
1109    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1110    where
1111        K: AsRef<[u8]>,
1112        I: IntoIterator<Item = (&'b W, K)>,
1113        W: 'b + AsColumnFamilyRef,
1114    {
1115        self.multi_get_cf_opt(keys, &ReadOptions::default())
1116    }
1117
1118    /// Return the values associated with the given keys and column families using read options.
1119    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1120        &'a self,
1121        keys: I,
1122        readopts: &ReadOptions,
1123    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1124    where
1125        K: AsRef<[u8]>,
1126        I: IntoIterator<Item = (&'b W, K)>,
1127        W: 'b + AsColumnFamilyRef,
1128    {
1129        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1130            .into_iter()
1131            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1132            .unzip();
1133        let ptr_keys: Vec<_> = cfs_and_keys
1134            .iter()
1135            .map(|(_, k)| k.as_ptr() as *const c_char)
1136            .collect();
1137        let ptr_cfs: Vec<_> = cfs_and_keys
1138            .iter()
1139            .map(|(c, _)| c.inner() as *const _)
1140            .collect();
1141
1142        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1143        let mut values_sizes = vec![0_usize; ptr_keys.len()];
1144        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1145        unsafe {
1146            ffi::rocksdb_multi_get_cf(
1147                self.inner.inner(),
1148                readopts.inner,
1149                ptr_cfs.as_ptr(),
1150                ptr_keys.len(),
1151                ptr_keys.as_ptr(),
1152                keys_sizes.as_ptr(),
1153                values.as_mut_ptr(),
1154                values_sizes.as_mut_ptr(),
1155                errors.as_mut_ptr(),
1156            );
1157        }
1158
1159        convert_values(values, values_sizes, errors)
1160    }
1161
1162    /// Return the values associated with the given keys and the specified column family
1163    /// where internally the read requests are processed in batch if block-based table
1164    /// SST format is used.  It is a more optimized version of multi_get_cf.
1165    pub fn batched_multi_get_cf<K, I>(
1166        &self,
1167        cf: &impl AsColumnFamilyRef,
1168        keys: I,
1169        sorted_input: bool,
1170    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1171    where
1172        K: AsRef<[u8]>,
1173        I: IntoIterator<Item = K>,
1174    {
1175        self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1176    }
1177
1178    /// Return the values associated with the given keys and the specified column family
1179    /// where internally the read requests are processed in batch if block-based table
1180    /// SST format is used.  It is a more optimized version of multi_get_cf_opt.
1181    pub fn batched_multi_get_cf_opt<K, I>(
1182        &self,
1183        cf: &impl AsColumnFamilyRef,
1184        keys: I,
1185        sorted_input: bool,
1186        readopts: &ReadOptions,
1187    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1188    where
1189        K: AsRef<[u8]>,
1190        I: IntoIterator<Item = K>,
1191    {
1192        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1193            .into_iter()
1194            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1195            .unzip();
1196        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1197
1198        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1199        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1200
1201        unsafe {
1202            ffi::rocksdb_batched_multi_get_cf(
1203                self.inner.inner(),
1204                readopts.inner,
1205                cf.inner(),
1206                ptr_keys.len(),
1207                ptr_keys.as_ptr(),
1208                keys_sizes.as_ptr(),
1209                pinned_values.as_mut_ptr(),
1210                errors.as_mut_ptr(),
1211                sorted_input,
1212            );
1213            pinned_values
1214                .into_iter()
1215                .zip(errors.into_iter())
1216                .map(|(v, e)| {
1217                    if e.is_null() {
1218                        if v.is_null() {
1219                            Ok(None)
1220                        } else {
1221                            Ok(Some(DBPinnableSlice::from_c(v)))
1222                        }
1223                    } else {
1224                        Err(Error::new(crate::ffi_util::error_message(e)))
1225                    }
1226                })
1227                .collect()
1228        }
1229    }
1230
1231    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1232    /// `true`. This function uses default `ReadOptions`.
1233    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1234        self.key_may_exist_opt(key, &ReadOptions::default())
1235    }
1236
1237    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1238    /// `true`.
1239    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1240        let key = key.as_ref();
1241        unsafe {
1242            0 != ffi::rocksdb_key_may_exist(
1243                self.inner.inner(),
1244                readopts.inner,
1245                key.as_ptr() as *const c_char,
1246                key.len() as size_t,
1247                ptr::null_mut(), /*value*/
1248                ptr::null_mut(), /*val_len*/
1249                ptr::null(),     /*timestamp*/
1250                0,               /*timestamp_len*/
1251                ptr::null_mut(), /*value_found*/
1252            )
1253        }
1254    }
1255
1256    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1257    /// otherwise returns `true`. This function uses default `ReadOptions`.
1258    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1259        self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1260    }
1261
1262    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1263    /// otherwise returns `true`.
1264    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1265        &self,
1266        cf: &impl AsColumnFamilyRef,
1267        key: K,
1268        readopts: &ReadOptions,
1269    ) -> bool {
1270        let key = key.as_ref();
1271        0 != unsafe {
1272            ffi::rocksdb_key_may_exist_cf(
1273                self.inner.inner(),
1274                readopts.inner,
1275                cf.inner(),
1276                key.as_ptr() as *const c_char,
1277                key.len() as size_t,
1278                ptr::null_mut(), /*value*/
1279                ptr::null_mut(), /*val_len*/
1280                ptr::null(),     /*timestamp*/
1281                0,               /*timestamp_len*/
1282                ptr::null_mut(), /*value_found*/
1283            )
1284        }
1285    }
1286
1287    /// If the key definitely does not exist in the database, then this method
1288    /// returns `(false, None)`, else `(true, None)` if it may.
1289    /// If the key is found in memory, then it returns `(true, Some<CSlice>)`.
1290    ///
1291    /// This check is potentially lighter-weight than calling `get()`. One way
1292    /// to make this lighter weight is to avoid doing any IOs.
1293    pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1294        &self,
1295        cf: &impl AsColumnFamilyRef,
1296        key: K,
1297        readopts: &ReadOptions,
1298    ) -> (bool, Option<CSlice>) {
1299        let key = key.as_ref();
1300        let mut val: *mut c_char = ptr::null_mut();
1301        let mut val_len: usize = 0;
1302        let mut value_found: c_uchar = 0;
1303        let may_exists = 0
1304            != unsafe {
1305                ffi::rocksdb_key_may_exist_cf(
1306                    self.inner.inner(),
1307                    readopts.inner,
1308                    cf.inner(),
1309                    key.as_ptr() as *const c_char,
1310                    key.len() as size_t,
1311                    &mut val,         /*value*/
1312                    &mut val_len,     /*val_len*/
1313                    ptr::null(),      /*timestamp*/
1314                    0,                /*timestamp_len*/
1315                    &mut value_found, /*value_found*/
1316                )
1317            };
1318        // The value is only allocated (using malloc) and returned if it is found and
1319        // value_found isn't NULL. In that case the user is responsible for freeing it.
1320        if may_exists && value_found != 0 {
1321            (
1322                may_exists,
1323                Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1324            )
1325        } else {
1326            (may_exists, None)
1327        }
1328    }
1329
1330    fn create_inner_cf_handle(
1331        &self,
1332        name: impl CStrLike,
1333        opts: &Options,
1334    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1335        let cf_name = name.bake().map_err(|err| {
1336            Error::new(format!(
1337                "Failed to convert path to CString when creating cf: {err}"
1338            ))
1339        })?;
1340        Ok(unsafe {
1341            ffi_try!(ffi::rocksdb_create_column_family(
1342                self.inner.inner(),
1343                opts.inner,
1344                cf_name.as_ptr(),
1345            ))
1346        })
1347    }
1348
1349    pub fn iterator<'a: 'b, 'b>(
1350        &'a self,
1351        mode: IteratorMode,
1352    ) -> DBIteratorWithThreadMode<'b, Self> {
1353        let readopts = ReadOptions::default();
1354        self.iterator_opt(mode, readopts)
1355    }
1356
1357    pub fn iterator_opt<'a: 'b, 'b>(
1358        &'a self,
1359        mode: IteratorMode,
1360        readopts: ReadOptions,
1361    ) -> DBIteratorWithThreadMode<'b, Self> {
1362        DBIteratorWithThreadMode::new(self, readopts, mode)
1363    }
1364
1365    /// Opens an iterator using the provided ReadOptions.
1366    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
1367    pub fn iterator_cf_opt<'a: 'b, 'b>(
1368        &'a self,
1369        cf_handle: &impl AsColumnFamilyRef,
1370        readopts: ReadOptions,
1371        mode: IteratorMode,
1372    ) -> DBIteratorWithThreadMode<'b, Self> {
1373        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1374    }
1375
1376    /// Opens an iterator with `set_total_order_seek` enabled.
1377    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
1378    /// with a Hash-based implementation.
1379    pub fn full_iterator<'a: 'b, 'b>(
1380        &'a self,
1381        mode: IteratorMode,
1382    ) -> DBIteratorWithThreadMode<'b, Self> {
1383        let mut opts = ReadOptions::default();
1384        opts.set_total_order_seek(true);
1385        DBIteratorWithThreadMode::new(self, opts, mode)
1386    }
1387
1388    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1389        &'a self,
1390        prefix: P,
1391    ) -> DBIteratorWithThreadMode<'b, Self> {
1392        let mut opts = ReadOptions::default();
1393        opts.set_prefix_same_as_start(true);
1394        DBIteratorWithThreadMode::new(
1395            self,
1396            opts,
1397            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1398        )
1399    }
1400
1401    pub fn iterator_cf<'a: 'b, 'b>(
1402        &'a self,
1403        cf_handle: &impl AsColumnFamilyRef,
1404        mode: IteratorMode,
1405    ) -> DBIteratorWithThreadMode<'b, Self> {
1406        let opts = ReadOptions::default();
1407        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1408    }
1409
1410    pub fn full_iterator_cf<'a: 'b, 'b>(
1411        &'a self,
1412        cf_handle: &impl AsColumnFamilyRef,
1413        mode: IteratorMode,
1414    ) -> DBIteratorWithThreadMode<'b, Self> {
1415        let mut opts = ReadOptions::default();
1416        opts.set_total_order_seek(true);
1417        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1418    }
1419
1420    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1421        &'a self,
1422        cf_handle: &impl AsColumnFamilyRef,
1423        prefix: P,
1424    ) -> DBIteratorWithThreadMode<'a, Self> {
1425        let mut opts = ReadOptions::default();
1426        opts.set_prefix_same_as_start(true);
1427        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1428            self,
1429            cf_handle.inner(),
1430            opts,
1431            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1432        )
1433    }
1434
1435    /// Opens a raw iterator over the database, using the default read options
1436    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1437        let opts = ReadOptions::default();
1438        DBRawIteratorWithThreadMode::new(self, opts)
1439    }
1440
1441    /// Opens a raw iterator over the given column family, using the default read options
1442    pub fn raw_iterator_cf<'a: 'b, 'b>(
1443        &'a self,
1444        cf_handle: &impl AsColumnFamilyRef,
1445    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1446        let opts = ReadOptions::default();
1447        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1448    }
1449
1450    /// Opens a raw iterator over the database, using the given read options
1451    pub fn raw_iterator_opt<'a: 'b, 'b>(
1452        &'a self,
1453        readopts: ReadOptions,
1454    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1455        DBRawIteratorWithThreadMode::new(self, readopts)
1456    }
1457
1458    /// Opens a raw iterator over the given column family, using the given read options
1459    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1460        &'a self,
1461        cf_handle: &impl AsColumnFamilyRef,
1462        readopts: ReadOptions,
1463    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1464        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1465    }
1466
1467    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1468        SnapshotWithThreadMode::<Self>::new(self)
1469    }
1470
1471    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1472    where
1473        K: AsRef<[u8]>,
1474        V: AsRef<[u8]>,
1475    {
1476        let key = key.as_ref();
1477        let value = value.as_ref();
1478
1479        unsafe {
1480            ffi_try!(ffi::rocksdb_put(
1481                self.inner.inner(),
1482                writeopts.inner,
1483                key.as_ptr() as *const c_char,
1484                key.len() as size_t,
1485                value.as_ptr() as *const c_char,
1486                value.len() as size_t,
1487            ));
1488            Ok(())
1489        }
1490    }
1491
1492    pub fn put_cf_opt<K, V>(
1493        &self,
1494        cf: &impl AsColumnFamilyRef,
1495        key: K,
1496        value: V,
1497        writeopts: &WriteOptions,
1498    ) -> Result<(), Error>
1499    where
1500        K: AsRef<[u8]>,
1501        V: AsRef<[u8]>,
1502    {
1503        let key = key.as_ref();
1504        let value = value.as_ref();
1505
1506        unsafe {
1507            ffi_try!(ffi::rocksdb_put_cf(
1508                self.inner.inner(),
1509                writeopts.inner,
1510                cf.inner(),
1511                key.as_ptr() as *const c_char,
1512                key.len() as size_t,
1513                value.as_ptr() as *const c_char,
1514                value.len() as size_t,
1515            ));
1516            Ok(())
1517        }
1518    }
1519
1520    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1521    where
1522        K: AsRef<[u8]>,
1523        V: AsRef<[u8]>,
1524    {
1525        let key = key.as_ref();
1526        let value = value.as_ref();
1527
1528        unsafe {
1529            ffi_try!(ffi::rocksdb_merge(
1530                self.inner.inner(),
1531                writeopts.inner,
1532                key.as_ptr() as *const c_char,
1533                key.len() as size_t,
1534                value.as_ptr() as *const c_char,
1535                value.len() as size_t,
1536            ));
1537            Ok(())
1538        }
1539    }
1540
1541    pub fn merge_cf_opt<K, V>(
1542        &self,
1543        cf: &impl AsColumnFamilyRef,
1544        key: K,
1545        value: V,
1546        writeopts: &WriteOptions,
1547    ) -> Result<(), Error>
1548    where
1549        K: AsRef<[u8]>,
1550        V: AsRef<[u8]>,
1551    {
1552        let key = key.as_ref();
1553        let value = value.as_ref();
1554
1555        unsafe {
1556            ffi_try!(ffi::rocksdb_merge_cf(
1557                self.inner.inner(),
1558                writeopts.inner,
1559                cf.inner(),
1560                key.as_ptr() as *const c_char,
1561                key.len() as size_t,
1562                value.as_ptr() as *const c_char,
1563                value.len() as size_t,
1564            ));
1565            Ok(())
1566        }
1567    }
1568
1569    pub fn delete_opt<K: AsRef<[u8]>>(
1570        &self,
1571        key: K,
1572        writeopts: &WriteOptions,
1573    ) -> Result<(), Error> {
1574        let key = key.as_ref();
1575
1576        unsafe {
1577            ffi_try!(ffi::rocksdb_delete(
1578                self.inner.inner(),
1579                writeopts.inner,
1580                key.as_ptr() as *const c_char,
1581                key.len() as size_t,
1582            ));
1583            Ok(())
1584        }
1585    }
1586
1587    pub fn delete_cf_opt<K: AsRef<[u8]>>(
1588        &self,
1589        cf: &impl AsColumnFamilyRef,
1590        key: K,
1591        writeopts: &WriteOptions,
1592    ) -> Result<(), Error> {
1593        let key = key.as_ref();
1594
1595        unsafe {
1596            ffi_try!(ffi::rocksdb_delete_cf(
1597                self.inner.inner(),
1598                writeopts.inner,
1599                cf.inner(),
1600                key.as_ptr() as *const c_char,
1601                key.len() as size_t,
1602            ));
1603            Ok(())
1604        }
1605    }
1606
1607    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1608    where
1609        K: AsRef<[u8]>,
1610        V: AsRef<[u8]>,
1611    {
1612        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1613    }
1614
1615    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1616    where
1617        K: AsRef<[u8]>,
1618        V: AsRef<[u8]>,
1619    {
1620        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1621    }
1622
1623    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1624    where
1625        K: AsRef<[u8]>,
1626        V: AsRef<[u8]>,
1627    {
1628        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1629    }
1630
1631    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1632    where
1633        K: AsRef<[u8]>,
1634        V: AsRef<[u8]>,
1635    {
1636        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1637    }
1638
1639    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1640        self.delete_opt(key.as_ref(), &WriteOptions::default())
1641    }
1642
1643    pub fn delete_cf<K: AsRef<[u8]>>(
1644        &self,
1645        cf: &impl AsColumnFamilyRef,
1646        key: K,
1647    ) -> Result<(), Error> {
1648        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1649    }
1650
1651    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
1652    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1653        unsafe {
1654            let start = start.as_ref().map(AsRef::as_ref);
1655            let end = end.as_ref().map(AsRef::as_ref);
1656
1657            ffi::rocksdb_compact_range(
1658                self.inner.inner(),
1659                opt_bytes_to_ptr(start),
1660                start.map_or(0, <[u8]>::len) as size_t,
1661                opt_bytes_to_ptr(end),
1662                end.map_or(0, <[u8]>::len) as size_t,
1663            );
1664        }
1665    }
1666
1667    /// Same as `compact_range` but with custom options.
1668    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1669        &self,
1670        start: Option<S>,
1671        end: Option<E>,
1672        opts: &CompactOptions,
1673    ) {
1674        unsafe {
1675            let start = start.as_ref().map(AsRef::as_ref);
1676            let end = end.as_ref().map(AsRef::as_ref);
1677
1678            ffi::rocksdb_compact_range_opt(
1679                self.inner.inner(),
1680                opts.inner,
1681                opt_bytes_to_ptr(start),
1682                start.map_or(0, <[u8]>::len) as size_t,
1683                opt_bytes_to_ptr(end),
1684                end.map_or(0, <[u8]>::len) as size_t,
1685            );
1686        }
1687    }
1688
1689    /// Runs a manual compaction on the Range of keys given on the
1690    /// given column family. This is not likely to be needed for typical usage.
1691    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1692        &self,
1693        cf: &impl AsColumnFamilyRef,
1694        start: Option<S>,
1695        end: Option<E>,
1696    ) {
1697        unsafe {
1698            let start = start.as_ref().map(AsRef::as_ref);
1699            let end = end.as_ref().map(AsRef::as_ref);
1700
1701            ffi::rocksdb_compact_range_cf(
1702                self.inner.inner(),
1703                cf.inner(),
1704                opt_bytes_to_ptr(start),
1705                start.map_or(0, <[u8]>::len) as size_t,
1706                opt_bytes_to_ptr(end),
1707                end.map_or(0, <[u8]>::len) as size_t,
1708            );
1709        }
1710    }
1711
1712    /// Same as `compact_range_cf` but with custom options.
1713    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1714        &self,
1715        cf: &impl AsColumnFamilyRef,
1716        start: Option<S>,
1717        end: Option<E>,
1718        opts: &CompactOptions,
1719    ) {
1720        unsafe {
1721            let start = start.as_ref().map(AsRef::as_ref);
1722            let end = end.as_ref().map(AsRef::as_ref);
1723
1724            ffi::rocksdb_compact_range_cf_opt(
1725                self.inner.inner(),
1726                cf.inner(),
1727                opts.inner,
1728                opt_bytes_to_ptr(start),
1729                start.map_or(0, <[u8]>::len) as size_t,
1730                opt_bytes_to_ptr(end),
1731                end.map_or(0, <[u8]>::len) as size_t,
1732            );
1733        }
1734    }
1735
1736    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1737        let copts = convert_options(opts)?;
1738        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1739        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1740        let count = opts.len() as i32;
1741        unsafe {
1742            ffi_try!(ffi::rocksdb_set_options(
1743                self.inner.inner(),
1744                count,
1745                cnames.as_ptr(),
1746                cvalues.as_ptr(),
1747            ));
1748        }
1749        Ok(())
1750    }
1751
1752    pub fn set_options_cf(
1753        &self,
1754        cf: &impl AsColumnFamilyRef,
1755        opts: &[(&str, &str)],
1756    ) -> Result<(), Error> {
1757        let copts = convert_options(opts)?;
1758        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1759        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1760        let count = opts.len() as i32;
1761        unsafe {
1762            ffi_try!(ffi::rocksdb_set_options_cf(
1763                self.inner.inner(),
1764                cf.inner(),
1765                count,
1766                cnames.as_ptr(),
1767                cvalues.as_ptr(),
1768            ));
1769        }
1770        Ok(())
1771    }
1772
1773    /// Implementation for property_value et al methods.
1774    ///
1775    /// `name` is the name of the property.  It will be converted into a CString
1776    /// and passed to `get_property` as argument.  `get_property` reads the
1777    /// specified property and either returns NULL or a pointer to a C allocated
1778    /// string; this method takes ownership of that string and will free it at
1779    /// the end. That string is parsed using `parse` callback which produces
1780    /// the returned result.
1781    fn property_value_impl<R>(
1782        name: impl CStrLike,
1783        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1784        parse: impl FnOnce(&str) -> Result<R, Error>,
1785    ) -> Result<Option<R>, Error> {
1786        let value = match name.bake() {
1787            Ok(prop_name) => get_property(prop_name.as_ptr()),
1788            Err(e) => {
1789                return Err(Error::new(format!(
1790                    "Failed to convert property name to CString: {e}"
1791                )));
1792            }
1793        };
1794        if value.is_null() {
1795            return Ok(None);
1796        }
1797        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1798            Ok(s) => parse(s).map(|value| Some(value)),
1799            Err(e) => Err(Error::new(format!(
1800                "Failed to convert property value to string: {e}"
1801            ))),
1802        };
1803        unsafe {
1804            libc::free(value as *mut c_void);
1805        }
1806        result
1807    }
1808
1809    /// Retrieves a RocksDB property by name.
1810    ///
1811    /// Full list of properties could be find
1812    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1813    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1814        Self::property_value_impl(
1815            name,
1816            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1817            |str_value| Ok(str_value.to_owned()),
1818        )
1819    }
1820
1821    /// Retrieves a RocksDB property by name, for a specific column family.
1822    ///
1823    /// Full list of properties could be find
1824    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1825    pub fn property_value_cf(
1826        &self,
1827        cf: &impl AsColumnFamilyRef,
1828        name: impl CStrLike,
1829    ) -> Result<Option<String>, Error> {
1830        Self::property_value_impl(
1831            name,
1832            |prop_name| unsafe {
1833                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1834            },
1835            |str_value| Ok(str_value.to_owned()),
1836        )
1837    }
1838
1839    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1840        value.parse::<u64>().map_err(|err| {
1841            Error::new(format!(
1842                "Failed to convert property value {value} to int: {err}"
1843            ))
1844        })
1845    }
1846
1847    /// Retrieves a RocksDB property and casts it to an integer.
1848    ///
1849    /// Full list of properties that return int values could be find
1850    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1851    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1852        Self::property_value_impl(
1853            name,
1854            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1855            Self::parse_property_int_value,
1856        )
1857    }
1858
1859    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
1860    ///
1861    /// Full list of properties that return int values could be find
1862    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1863    pub fn property_int_value_cf(
1864        &self,
1865        cf: &impl AsColumnFamilyRef,
1866        name: impl CStrLike,
1867    ) -> Result<Option<u64>, Error> {
1868        Self::property_value_impl(
1869            name,
1870            |prop_name| unsafe {
1871                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1872            },
1873            Self::parse_property_int_value,
1874        )
1875    }
1876
1877    /// The sequence number of the most recent transaction.
1878    pub fn latest_sequence_number(&self) -> u64 {
1879        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1880    }
1881
1882    /// Iterate over batches of write operations since a given sequence.
1883    ///
1884    /// Produce an iterator that will provide the batches of write operations
1885    /// that have occurred since the given sequence (see
1886    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
1887    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
1888    /// deletes using the `WriteBatch::iterate()` function.
1889    ///
1890    /// Calling `get_updates_since()` with a sequence number that is out of
1891    /// bounds will return an error.
1892    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1893        unsafe {
1894            // rocksdb_wal_readoptions_t does not appear to have any functions
1895            // for creating and destroying it; fortunately we can pass a nullptr
1896            // here to get the default behavior
1897            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1898            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1899                self.inner.inner(),
1900                seq_number,
1901                opts
1902            ));
1903            Ok(DBWALIterator {
1904                inner: iter,
1905                start_seq_number: seq_number,
1906            })
1907        }
1908    }
1909
1910    /// Tries to catch up with the primary by reading as much as possible from the
1911    /// log files.
1912    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1913        unsafe {
1914            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1915        }
1916        Ok(())
1917    }
1918
1919    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
1920    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1921        let opts = IngestExternalFileOptions::default();
1922        self.ingest_external_file_opts(&opts, paths)
1923    }
1924
1925    /// Loads a list of external SST files created with SstFileWriter into the DB
1926    pub fn ingest_external_file_opts<P: AsRef<Path>>(
1927        &self,
1928        opts: &IngestExternalFileOptions,
1929        paths: Vec<P>,
1930    ) -> Result<(), Error> {
1931        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1932        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1933
1934        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1935    }
1936
1937    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1938    /// with default opts
1939    pub fn ingest_external_file_cf<P: AsRef<Path>>(
1940        &self,
1941        cf: &impl AsColumnFamilyRef,
1942        paths: Vec<P>,
1943    ) -> Result<(), Error> {
1944        let opts = IngestExternalFileOptions::default();
1945        self.ingest_external_file_cf_opts(cf, &opts, paths)
1946    }
1947
1948    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1949    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1950        &self,
1951        cf: &impl AsColumnFamilyRef,
1952        opts: &IngestExternalFileOptions,
1953        paths: Vec<P>,
1954    ) -> Result<(), Error> {
1955        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1956        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1957
1958        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1959    }
1960
1961    fn ingest_external_file_raw(
1962        &self,
1963        opts: &IngestExternalFileOptions,
1964        paths_v: &[CString],
1965        cpaths: &[*const c_char],
1966    ) -> Result<(), Error> {
1967        unsafe {
1968            ffi_try!(ffi::rocksdb_ingest_external_file(
1969                self.inner.inner(),
1970                cpaths.as_ptr(),
1971                paths_v.len(),
1972                opts.inner as *const _
1973            ));
1974            Ok(())
1975        }
1976    }
1977
1978    fn ingest_external_file_raw_cf(
1979        &self,
1980        cf: &impl AsColumnFamilyRef,
1981        opts: &IngestExternalFileOptions,
1982        paths_v: &[CString],
1983        cpaths: &[*const c_char],
1984    ) -> Result<(), Error> {
1985        unsafe {
1986            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
1987                self.inner.inner(),
1988                cf.inner(),
1989                cpaths.as_ptr(),
1990                paths_v.len(),
1991                opts.inner as *const _
1992            ));
1993            Ok(())
1994        }
1995    }
1996
1997    pub fn get_approximate_sizes_with_option(
1998        &self,
1999        cf: &impl AsColumnFamilyRef,
2000        ranges: &[Ranges],
2001        files_size_error_margin: f64,
2002    ) -> Result<Vec<u64>, Error> {
2003        let start_keys: Vec<*const _> = ranges
2004            .iter()
2005            .map(|x| x.start_key.as_ptr() as *const _)
2006            .collect();
2007        let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
2008        let end_keys: Vec<*const _> = ranges
2009            .iter()
2010            .map(|x| x.end_key.as_ptr() as *const _)
2011            .collect();
2012        let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
2013        let db = self.inner.inner();
2014        // let include_files = approximate_option.include_files;
2015        // let include_memtable = approximate_option.include_memtable;
2016        // let files_size_error_margin = approximate_option.files_size_error_margin;
2017        let mut sizes: Vec<u64> = vec![0; ranges.len()];
2018        let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
2019            ranges.len() as i32,
2020            start_keys.as_ptr(),
2021            start_key_lens.as_ptr(),
2022            end_keys.as_ptr(),
2023            end_key_lens.as_ptr(),
2024            sizes.as_mut_ptr(),
2025        );
2026        unsafe {
2027            ffi_try!(ffi::rocksdb_approximate_sizes_cf_with_options(
2028                db,
2029                cf.inner(),
2030                n,
2031                start_key_ptr,
2032                start_key_len_ptr,
2033                end_key_ptr,
2034                end_key_len_ptr,
2035                size_ptr,
2036                true,
2037                true,
2038                files_size_error_margin,
2039            ));
2040        }
2041        Ok(sizes)
2042    }
2043
2044    /// Returns a list of all table files with their level, start key
2045    /// and end key
2046    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
2047        unsafe {
2048            let files = ffi::rocksdb_livefiles(self.inner.inner());
2049            if files.is_null() {
2050                Err(Error::new("Could not get live files".to_owned()))
2051            } else {
2052                let n = ffi::rocksdb_livefiles_count(files);
2053
2054                let mut livefiles = Vec::with_capacity(n as usize);
2055                let mut key_size: usize = 0;
2056
2057                for i in 0..n {
2058                    let column_family_name =
2059                        from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2060                    let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2061                    let size = ffi::rocksdb_livefiles_size(files, i);
2062                    let level = ffi::rocksdb_livefiles_level(files, i);
2063
2064                    // get smallest key inside file
2065                    let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2066                    let smallest_key = raw_data(smallest_key, key_size);
2067
2068                    // get largest key inside file
2069                    let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2070                    let largest_key = raw_data(largest_key, key_size);
2071
2072                    livefiles.push(LiveFile {
2073                        column_family_name,
2074                        name,
2075                        size,
2076                        level,
2077                        start_key: smallest_key,
2078                        end_key: largest_key,
2079                        num_entries: ffi::rocksdb_livefiles_entries(files, i),
2080                        num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2081                    });
2082                }
2083
2084                // destroy livefiles metadata(s)
2085                ffi::rocksdb_livefiles_destroy(files);
2086
2087                // return
2088                Ok(livefiles)
2089            }
2090        }
2091    }
2092
2093    /// Delete sst files whose keys are entirely in the given range.
2094    ///
2095    /// Could leave some keys in the range which are in files which are not
2096    /// entirely in the range.
2097    ///
2098    /// Note: L0 files are left regardless of whether they're in the range.
2099    ///
2100    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
2101    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2102        let from = from.as_ref();
2103        let to = to.as_ref();
2104        unsafe {
2105            ffi_try!(ffi::rocksdb_delete_file_in_range(
2106                self.inner.inner(),
2107                from.as_ptr() as *const c_char,
2108                from.len() as size_t,
2109                to.as_ptr() as *const c_char,
2110                to.len() as size_t,
2111            ));
2112            Ok(())
2113        }
2114    }
2115
2116    /// Same as `delete_file_in_range` but only for specific column family
2117    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2118        &self,
2119        cf: &impl AsColumnFamilyRef,
2120        from: K,
2121        to: K,
2122    ) -> Result<(), Error> {
2123        let from = from.as_ref();
2124        let to = to.as_ref();
2125        unsafe {
2126            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2127                self.inner.inner(),
2128                cf.inner(),
2129                from.as_ptr() as *const c_char,
2130                from.len() as size_t,
2131                to.as_ptr() as *const c_char,
2132                to.len() as size_t,
2133            ));
2134            Ok(())
2135        }
2136    }
2137
2138    /// Request stopping background work, if wait is true wait until it's done.
2139    pub fn cancel_all_background_work(&self, wait: bool) {
2140        unsafe {
2141            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2142        }
2143    }
2144
2145    fn drop_column_family<C>(
2146        &self,
2147        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2148        cf: C,
2149    ) -> Result<(), Error> {
2150        unsafe {
2151            // first mark the column family as dropped
2152            ffi_try!(ffi::rocksdb_drop_column_family(
2153                self.inner.inner(),
2154                cf_inner
2155            ));
2156        }
2157        // then finally reclaim any resources (mem, files) by destroying the only single column
2158        // family handle by drop()-ing it
2159        drop(cf);
2160        Ok(())
2161    }
2162
2163    /// Create column family from importing data
2164    fn create_column_family_with_import(
2165        &self,
2166        name: &str,
2167        opts: &Options,
2168        metadata: &ExportImportFilesMetaData,
2169    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
2170        let cf_name = CString::new(name.as_bytes()).map_err(|err| {
2171            Error::new(format!(
2172                "Failed to convert path to CString when creating cf: {err}"
2173            ))
2174        })?;
2175        Ok(unsafe {
2176            ffi_try!(ffi::rocksdb_create_column_family_with_import(
2177                self.inner.inner(),
2178                opts.inner,
2179                cf_name.as_ptr(),
2180                metadata.inner,
2181            ))
2182        })
2183    }
2184}
2185
2186impl<I: DBInner> DBCommon<SingleThreaded, I> {
2187    /// Creates column family with given name and options
2188    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2189        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2190        self.cfs
2191            .cfs
2192            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2193        Ok(())
2194    }
2195
2196    /// Create column family from importing data
2197    pub fn create_cf_with_import<N: AsRef<str>>(
2198        &mut self,
2199        name: N,
2200        opts: &Options,
2201        metadata: &ExportImportFilesMetaData,
2202    ) -> Result<(), Error> {
2203        let inner = self.create_column_family_with_import(name.as_ref(), opts, metadata)?;
2204        self.cfs
2205            .cfs
2206            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2207        Ok(())
2208    }
2209    /// Drops the column family with the given name
2210    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2211        if let Some(cf) = self.cfs.cfs.remove(name) {
2212            self.drop_column_family(cf.inner, cf)
2213        } else {
2214            Err(Error::new(format!("Invalid column family: {name}")))
2215        }
2216    }
2217
2218    /// Returns the underlying column family handle
2219    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2220        self.cfs.cfs.get(name)
2221    }
2222}
2223
2224impl<I: DBInner> DBCommon<MultiThreaded, I> {
2225    /// Creates column family with given name and options
2226    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2227        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2228        self.cfs.cfs.write().unwrap().insert(
2229            name.as_ref().to_string(),
2230            Arc::new(UnboundColumnFamily { inner }),
2231        );
2232        Ok(())
2233    }
2234
2235    /// Create column family from importing data
2236    pub fn create_cf_with_import<N: AsRef<str>>(
2237        &self,
2238        name: N,
2239        opts: &Options,
2240        metadata: &ExportImportFilesMetaData,
2241    ) -> Result<(), Error> {
2242        let inner = self.create_column_family_with_import(name.as_ref(), opts, metadata)?;
2243        self.cfs.cfs.write().unwrap().insert(
2244            name.as_ref().to_string(),
2245            Arc::new(UnboundColumnFamily { inner }),
2246        );
2247        Ok(())
2248    }
2249
2250    /// Drops the column family with the given name by internally locking the inner column
2251    /// family map. This avoids needing `&mut self` reference
2252    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2253        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2254            self.drop_column_family(cf.inner, cf)
2255        } else {
2256            Err(Error::new(format!("Invalid column family: {name}")))
2257        }
2258    }
2259
2260    /// Returns the underlying column family handle
2261    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2262        self.cfs
2263            .cfs
2264            .read()
2265            .unwrap()
2266            .get(name)
2267            .cloned()
2268            .map(UnboundColumnFamily::bound_column_family)
2269    }
2270}
2271
2272impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2273    fn drop(&mut self) {
2274        self.cfs.drop_all_cfs_internal();
2275    }
2276}
2277
2278impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2279    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2280        write!(f, "RocksDB {{ path: {:?} }}", self.path())
2281    }
2282}
2283
/// Metadata describing a single SST file, as reported by `live_files`.
#[derive(Clone, Debug)]
pub struct LiveFile {
    /// Column family that owns the file.
    pub column_family_name: String,
    /// File name.
    pub name: String,
    /// File size.
    pub size: usize,
    /// Level the file currently resides at.
    pub level: i32,
    /// Smallest user-defined key stored in the file, if any.
    pub start_key: Option<Vec<u8>>,
    /// Largest user-defined key stored in the file, if any.
    pub end_key: Option<Vec<u8>>,
    /// Number of entries (alive keys) in the file.
    pub num_entries: u64,
    /// Number of deletions (tombstone keys) in the file.
    pub num_deletions: u64,
}
2304
2305fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2306    opts.iter()
2307        .map(|(name, value)| {
2308            let cname = match CString::new(name.as_bytes()) {
2309                Ok(cname) => cname,
2310                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2311            };
2312            let cvalue = match CString::new(value.as_bytes()) {
2313                Ok(cvalue) => cvalue,
2314                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2315            };
2316            Ok((cname, cvalue))
2317        })
2318        .collect()
2319}
2320
2321pub(crate) fn convert_values(
2322    values: Vec<*mut c_char>,
2323    values_sizes: Vec<usize>,
2324    errors: Vec<*mut c_char>,
2325) -> Vec<Result<Option<Vec<u8>>, Error>> {
2326    values
2327        .into_iter()
2328        .zip(values_sizes.into_iter())
2329        .zip(errors.into_iter())
2330        .map(|((v, s), e)| {
2331            if e.is_null() {
2332                let value = unsafe { crate::ffi_util::raw_data(v, s) };
2333                unsafe {
2334                    ffi::rocksdb_free(v as *mut c_void);
2335                }
2336                Ok(value)
2337            } else {
2338                Err(Error::new(crate::ffi_util::error_message(e)))
2339            }
2340        })
2341        .collect()
2342}