electrs_rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use crate::{
17    column_family::AsColumnFamilyRef,
18    column_family::BoundColumnFamily,
19    column_family::UnboundColumnFamily,
20    db_options::OptionsMustOutliveDB,
21    ffi,
22    ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25    IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26    WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
30use std::collections::BTreeMap;
31use std::ffi::{CStr, CString};
32use std::fmt;
33use std::fs;
34use std::iter;
35use std::path::Path;
36use std::path::PathBuf;
37use std::ptr;
38use std::slice;
39use std::str;
40use std::sync::Arc;
41use std::sync::RwLock;
42use std::time::Duration;
43
44/// Marker trait to specify single or multi threaded column family alternations for
45/// [`DBWithThreadMode<T>`]
46///
47/// This arrangement makes differences in self mutability and return type in
48/// some of `DBWithThreadMode` methods.
49///
50/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
51/// also has a minimum set of not-encapsulated internal methods between
52/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
53/// called and defined externally.
54pub trait ThreadMode {
55    /// Internal implementation for storing column family handles
56    fn new_cf_map_internal(
57        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
58    ) -> Self;
59    /// Internal implementation for dropping column family handles
60    fn drop_all_cfs_internal(&mut self);
61}
62
63/// Actual marker type for the marker trait `ThreadMode`, which holds
64/// a collection of column families without synchronization primitive, providing
65/// no overhead for the single-threaded column family alternations. The other
66/// mode is [`MultiThreaded`].
67///
68/// See [`DB`] for more details, including performance implications for each mode
69pub struct SingleThreaded {
70    pub(crate) cfs: BTreeMap<String, ColumnFamily>,
71}
72
73/// Actual marker type for the marker trait `ThreadMode`, which holds
74/// a collection of column families wrapped in a RwLock to be mutated
75/// concurrently. The other mode is [`SingleThreaded`].
76///
77/// See [`DB`] for more details, including performance implications for each mode
78pub struct MultiThreaded {
79    pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
80}
81
82impl ThreadMode for SingleThreaded {
83    fn new_cf_map_internal(
84        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
85    ) -> Self {
86        Self {
87            cfs: cfs
88                .into_iter()
89                .map(|(n, c)| (n, ColumnFamily { inner: c }))
90                .collect(),
91        }
92    }
93
94    fn drop_all_cfs_internal(&mut self) {
95        // Cause all ColumnFamily objects to be Drop::drop()-ed.
96        self.cfs.clear();
97    }
98}
99
100impl ThreadMode for MultiThreaded {
101    fn new_cf_map_internal(
102        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
103    ) -> Self {
104        Self {
105            cfs: RwLock::new(
106                cfs.into_iter()
107                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
108                    .collect(),
109            ),
110        }
111    }
112
113    fn drop_all_cfs_internal(&mut self) {
114        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
115        self.cfs.write().unwrap().clear();
116    }
117}
118
119/// Get underlying `rocksdb_t`.
120pub trait DBInner {
121    fn inner(&self) -> *mut ffi::rocksdb_t;
122}
123
124/// A helper type to implement some common methods for [`DBWithThreadMode`]
125/// and [`OptimisticTransactionDB`].
126///
127/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
128pub struct DBCommon<T: ThreadMode, D: DBInner> {
129    pub(crate) inner: D,
130    cfs: T, // Column families are held differently depending on thread mode
131    path: PathBuf,
132    _outlive: Vec<OptionsMustOutliveDB>,
133}
134
135/// Minimal set of DB-related methods, intended to be generic over
136/// `DBWithThreadMode<T>`. Mainly used internally
137pub trait DBAccess {
138    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
139
140    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
141
142    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
143
144    unsafe fn create_iterator_cf(
145        &self,
146        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
147        readopts: &ReadOptions,
148    ) -> *mut ffi::rocksdb_iterator_t;
149
150    fn get_opt<K: AsRef<[u8]>>(
151        &self,
152        key: K,
153        readopts: &ReadOptions,
154    ) -> Result<Option<Vec<u8>>, Error>;
155
156    fn get_cf_opt<K: AsRef<[u8]>>(
157        &self,
158        cf: &impl AsColumnFamilyRef,
159        key: K,
160        readopts: &ReadOptions,
161    ) -> Result<Option<Vec<u8>>, Error>;
162
163    fn get_pinned_opt<K: AsRef<[u8]>>(
164        &self,
165        key: K,
166        readopts: &ReadOptions,
167    ) -> Result<Option<DBPinnableSlice>, Error>;
168
169    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
170        &self,
171        cf: &impl AsColumnFamilyRef,
172        key: K,
173        readopts: &ReadOptions,
174    ) -> Result<Option<DBPinnableSlice>, Error>;
175
176    fn multi_get_opt<K, I>(
177        &self,
178        keys: I,
179        readopts: &ReadOptions,
180    ) -> Vec<Result<Option<Vec<u8>>, Error>>
181    where
182        K: AsRef<[u8]>,
183        I: IntoIterator<Item = K>;
184
185    fn multi_get_cf_opt<'b, K, I, W>(
186        &self,
187        keys_cf: I,
188        readopts: &ReadOptions,
189    ) -> Vec<Result<Option<Vec<u8>>, Error>>
190    where
191        K: AsRef<[u8]>,
192        I: IntoIterator<Item = (&'b W, K)>,
193        W: AsColumnFamilyRef + 'b;
194}
195
196impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
197    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
198        ffi::rocksdb_create_snapshot(self.inner.inner())
199    }
200
201    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
202        ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
203    }
204
205    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
206        ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
207    }
208
209    unsafe fn create_iterator_cf(
210        &self,
211        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
212        readopts: &ReadOptions,
213    ) -> *mut ffi::rocksdb_iterator_t {
214        ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
215    }
216
217    fn get_opt<K: AsRef<[u8]>>(
218        &self,
219        key: K,
220        readopts: &ReadOptions,
221    ) -> Result<Option<Vec<u8>>, Error> {
222        self.get_opt(key, readopts)
223    }
224
225    fn get_cf_opt<K: AsRef<[u8]>>(
226        &self,
227        cf: &impl AsColumnFamilyRef,
228        key: K,
229        readopts: &ReadOptions,
230    ) -> Result<Option<Vec<u8>>, Error> {
231        self.get_cf_opt(cf, key, readopts)
232    }
233
234    fn get_pinned_opt<K: AsRef<[u8]>>(
235        &self,
236        key: K,
237        readopts: &ReadOptions,
238    ) -> Result<Option<DBPinnableSlice>, Error> {
239        self.get_pinned_opt(key, readopts)
240    }
241
242    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
243        &self,
244        cf: &impl AsColumnFamilyRef,
245        key: K,
246        readopts: &ReadOptions,
247    ) -> Result<Option<DBPinnableSlice>, Error> {
248        self.get_pinned_cf_opt(cf, key, readopts)
249    }
250
251    fn multi_get_opt<K, Iter>(
252        &self,
253        keys: Iter,
254        readopts: &ReadOptions,
255    ) -> Vec<Result<Option<Vec<u8>>, Error>>
256    where
257        K: AsRef<[u8]>,
258        Iter: IntoIterator<Item = K>,
259    {
260        self.multi_get_opt(keys, readopts)
261    }
262
263    fn multi_get_cf_opt<'b, K, Iter, W>(
264        &self,
265        keys_cf: Iter,
266        readopts: &ReadOptions,
267    ) -> Vec<Result<Option<Vec<u8>>, Error>>
268    where
269        K: AsRef<[u8]>,
270        Iter: IntoIterator<Item = (&'b W, K)>,
271        W: AsColumnFamilyRef + 'b,
272    {
273        self.multi_get_cf_opt(keys_cf, readopts)
274    }
275}
276
277pub struct DBWithThreadModeInner {
278    inner: *mut ffi::rocksdb_t,
279}
280
281impl DBInner for DBWithThreadModeInner {
282    fn inner(&self) -> *mut ffi::rocksdb_t {
283        self.inner
284    }
285}
286
287impl Drop for DBWithThreadModeInner {
288    fn drop(&mut self) {
289        unsafe {
290            ffi::rocksdb_close(self.inner);
291        }
292    }
293}
294
295/// A type alias to RocksDB database.
296///
297/// See crate level documentation for a simple usage example.
298/// See [`DBCommon`] for full list of methods.
299pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
300
301/// A type alias to DB instance type with the single-threaded column family
302/// creations/deletions
303///
304/// # Compatibility and multi-threaded mode
305///
306/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
307/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
308/// column family alternations.
309///
310/// # Limited performance implication for single-threaded mode
311///
312/// Even with [`SingleThreaded`], almost all of RocksDB operations is
313/// multi-threaded unless the underlying RocksDB instance is
314/// specifically configured otherwise. `SingleThreaded` only forces
315/// serialization of column family alternations by requiring `&mut self` of DB
316/// instance due to its wrapper implementation details.
317///
318/// # Multi-threaded mode
319///
320/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
321/// workload including multi-threaded column family alternations, costing the
322/// RwLock overhead inside `DB`.
323#[cfg(not(feature = "multi-threaded-cf"))]
324pub type DB = DBWithThreadMode<SingleThreaded>;
325
326#[cfg(feature = "multi-threaded-cf")]
327pub type DB = DBWithThreadMode<MultiThreaded>;
328
329// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
330// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
331// rocksdb internally does not rely on thread-local information for its user-exposed types.
332unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
333
334// Sync is similarly safe for many types because they do not expose interior mutability, and their
335// use within the rocksdb library is generally behind a const reference
336unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
337
338// Specifies whether open DB for read only.
339enum AccessType<'a> {
340    ReadWrite,
341    ReadOnly { error_if_log_file_exist: bool },
342    Secondary { secondary_path: &'a Path },
343    WithTTL { ttl: Duration },
344}
345
346/// Methods of `DBWithThreadMode`.
347impl<T: ThreadMode> DBWithThreadMode<T> {
348    /// Opens a database with default options.
349    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
350        let mut opts = Options::default();
351        opts.create_if_missing(true);
352        Self::open(&opts, path)
353    }
354
355    /// Opens the database with the specified options.
356    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
357        Self::open_cf(opts, path, None::<&str>)
358    }
359
360    /// Opens the database for read only with the specified options.
361    pub fn open_for_read_only<P: AsRef<Path>>(
362        opts: &Options,
363        path: P,
364        error_if_log_file_exist: bool,
365    ) -> Result<Self, Error> {
366        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
367    }
368
369    /// Opens the database as a secondary.
370    pub fn open_as_secondary<P: AsRef<Path>>(
371        opts: &Options,
372        primary_path: P,
373        secondary_path: P,
374    ) -> Result<Self, Error> {
375        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
376    }
377
378    /// Opens the database with a Time to Live compaction filter.
379    pub fn open_with_ttl<P: AsRef<Path>>(
380        opts: &Options,
381        path: P,
382        ttl: Duration,
383    ) -> Result<Self, Error> {
384        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
385    }
386
387    /// Opens the database with a Time to Live compaction filter and column family names.
388    ///
389    /// Column families opened using this function will be created with default `Options`.
390    pub fn open_cf_with_ttl<P, I, N>(
391        opts: &Options,
392        path: P,
393        cfs: I,
394        ttl: Duration,
395    ) -> Result<Self, Error>
396    where
397        P: AsRef<Path>,
398        I: IntoIterator<Item = N>,
399        N: AsRef<str>,
400    {
401        let cfs = cfs
402            .into_iter()
403            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
404
405        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
406    }
407
408    /// Opens a database with the given database with a Time to Live compaction filter and
409    /// column family descriptors.
410    pub fn open_cf_descriptors_with_ttl<P, I>(
411        opts: &Options,
412        path: P,
413        cfs: I,
414        ttl: Duration,
415    ) -> Result<Self, Error>
416    where
417        P: AsRef<Path>,
418        I: IntoIterator<Item = ColumnFamilyDescriptor>,
419    {
420        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
421    }
422
423    /// Opens a database with the given database options and column family names.
424    ///
425    /// Column families opened using this function will be created with default `Options`.
426    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
427    where
428        P: AsRef<Path>,
429        I: IntoIterator<Item = N>,
430        N: AsRef<str>,
431    {
432        let cfs = cfs
433            .into_iter()
434            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
435
436        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
437    }
438
439    /// Opens a database with the given database options and column family names.
440    ///
441    /// Column families opened using given `Options`.
442    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
443    where
444        P: AsRef<Path>,
445        I: IntoIterator<Item = (N, Options)>,
446        N: AsRef<str>,
447    {
448        let cfs = cfs
449            .into_iter()
450            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
451
452        Self::open_cf_descriptors(opts, path, cfs)
453    }
454
455    /// Opens a database for read only with the given database options and column family names.
456    pub fn open_cf_for_read_only<P, I, N>(
457        opts: &Options,
458        path: P,
459        cfs: I,
460        error_if_log_file_exist: bool,
461    ) -> Result<Self, Error>
462    where
463        P: AsRef<Path>,
464        I: IntoIterator<Item = N>,
465        N: AsRef<str>,
466    {
467        let cfs = cfs
468            .into_iter()
469            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
470
471        Self::open_cf_descriptors_internal(
472            opts,
473            path,
474            cfs,
475            &AccessType::ReadOnly {
476                error_if_log_file_exist,
477            },
478        )
479    }
480
481    /// Opens a database for read only with the given database options and column family names.
482    pub fn open_cf_with_opts_for_read_only<P, I, N>(
483        db_opts: &Options,
484        path: P,
485        cfs: I,
486        error_if_log_file_exist: bool,
487    ) -> Result<Self, Error>
488    where
489        P: AsRef<Path>,
490        I: IntoIterator<Item = (N, Options)>,
491        N: AsRef<str>,
492    {
493        let cfs = cfs
494            .into_iter()
495            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
496
497        Self::open_cf_descriptors_internal(
498            db_opts,
499            path,
500            cfs,
501            &AccessType::ReadOnly {
502                error_if_log_file_exist,
503            },
504        )
505    }
506
507    /// Opens a database for ready only with the given database options and
508    /// column family descriptors.
509    pub fn open_cf_descriptors_read_only<P, I>(
510        opts: &Options,
511        path: P,
512        cfs: I,
513        error_if_log_file_exist: bool,
514    ) -> Result<Self, Error>
515    where
516        P: AsRef<Path>,
517        I: IntoIterator<Item = ColumnFamilyDescriptor>,
518    {
519        Self::open_cf_descriptors_internal(
520            opts,
521            path,
522            cfs,
523            &AccessType::ReadOnly {
524                error_if_log_file_exist,
525            },
526        )
527    }
528
529    /// Opens the database as a secondary with the given database options and column family names.
530    pub fn open_cf_as_secondary<P, I, N>(
531        opts: &Options,
532        primary_path: P,
533        secondary_path: P,
534        cfs: I,
535    ) -> Result<Self, Error>
536    where
537        P: AsRef<Path>,
538        I: IntoIterator<Item = N>,
539        N: AsRef<str>,
540    {
541        let cfs = cfs
542            .into_iter()
543            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
544
545        Self::open_cf_descriptors_internal(
546            opts,
547            primary_path,
548            cfs,
549            &AccessType::Secondary {
550                secondary_path: secondary_path.as_ref(),
551            },
552        )
553    }
554
555    /// Opens the database as a secondary with the given database options and
556    /// column family descriptors.
557    pub fn open_cf_descriptors_as_secondary<P, I>(
558        opts: &Options,
559        path: P,
560        secondary_path: P,
561        cfs: I,
562    ) -> Result<Self, Error>
563    where
564        P: AsRef<Path>,
565        I: IntoIterator<Item = ColumnFamilyDescriptor>,
566    {
567        Self::open_cf_descriptors_internal(
568            opts,
569            path,
570            cfs,
571            &AccessType::Secondary {
572                secondary_path: secondary_path.as_ref(),
573            },
574        )
575    }
576
577    /// Opens a database with the given database options and column family descriptors.
578    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
579    where
580        P: AsRef<Path>,
581        I: IntoIterator<Item = ColumnFamilyDescriptor>,
582    {
583        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
584    }
585
586    /// Internal implementation for opening RocksDB.
587    fn open_cf_descriptors_internal<P, I>(
588        opts: &Options,
589        path: P,
590        cfs: I,
591        access_type: &AccessType,
592    ) -> Result<Self, Error>
593    where
594        P: AsRef<Path>,
595        I: IntoIterator<Item = ColumnFamilyDescriptor>,
596    {
597        let cfs: Vec<_> = cfs.into_iter().collect();
598        let outlive = iter::once(opts.outlive.clone())
599            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
600            .collect();
601
602        let cpath = to_cpath(&path)?;
603
604        if let Err(e) = fs::create_dir_all(&path) {
605            return Err(Error::new(format!(
606                "Failed to create RocksDB directory: `{:?}`.",
607                e
608            )));
609        }
610
611        let db: *mut ffi::rocksdb_t;
612        let mut cf_map = BTreeMap::new();
613
614        if cfs.is_empty() {
615            db = Self::open_raw(opts, &cpath, access_type)?;
616        } else {
617            let mut cfs_v = cfs;
618            // Always open the default column family.
619            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620                cfs_v.push(ColumnFamilyDescriptor {
621                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622                    options: Options::default(),
623                });
624            }
625            // We need to store our CStrings in an intermediate vector
626            // so that their pointers remain valid.
627            let c_cfs: Vec<CString> = cfs_v
628                .iter()
629                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630                .collect();
631
632            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634            // These handles will be populated by DB.
635            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637            let cfopts: Vec<_> = cfs_v
638                .iter()
639                .map(|cf| cf.options.inner as *const _)
640                .collect();
641
642            db = Self::open_cf_raw(
643                opts,
644                &cpath,
645                &cfs_v,
646                &cfnames,
647                &cfopts,
648                &mut cfhandles,
649                access_type,
650            )?;
651            for handle in &cfhandles {
652                if handle.is_null() {
653                    return Err(Error::new(
654                        "Received null column family handle from DB.".to_owned(),
655                    ));
656                }
657            }
658
659            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660                cf_map.insert(cf_desc.name.clone(), inner);
661            }
662        }
663
664        if db.is_null() {
665            return Err(Error::new("Could not initialize database.".to_owned()));
666        }
667
668        Ok(Self {
669            inner: DBWithThreadModeInner { inner: db },
670            path: path.as_ref().to_path_buf(),
671            cfs: T::new_cf_map_internal(cf_map),
672            _outlive: outlive,
673        })
674    }
675
676    fn open_raw(
677        opts: &Options,
678        cpath: &CString,
679        access_type: &AccessType,
680    ) -> Result<*mut ffi::rocksdb_t, Error> {
681        let db = unsafe {
682            match *access_type {
683                AccessType::ReadOnly {
684                    error_if_log_file_exist,
685                } => ffi_try!(ffi::rocksdb_open_for_read_only(
686                    opts.inner,
687                    cpath.as_ptr(),
688                    c_uchar::from(error_if_log_file_exist),
689                )),
690                AccessType::ReadWrite => {
691                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692                }
693                AccessType::Secondary { secondary_path } => {
694                    ffi_try!(ffi::rocksdb_open_as_secondary(
695                        opts.inner,
696                        cpath.as_ptr(),
697                        to_cpath(secondary_path)?.as_ptr(),
698                    ))
699                }
700                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701                    opts.inner,
702                    cpath.as_ptr(),
703                    ttl.as_secs() as c_int,
704                )),
705            }
706        };
707        Ok(db)
708    }
709
710    #[allow(clippy::pedantic)]
711    fn open_cf_raw(
712        opts: &Options,
713        cpath: &CString,
714        cfs_v: &[ColumnFamilyDescriptor],
715        cfnames: &[*const c_char],
716        cfopts: &[*const ffi::rocksdb_options_t],
717        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718        access_type: &AccessType,
719    ) -> Result<*mut ffi::rocksdb_t, Error> {
720        let db = unsafe {
721            match *access_type {
722                AccessType::ReadOnly {
723                    error_if_log_file_exist,
724                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725                    opts.inner,
726                    cpath.as_ptr(),
727                    cfs_v.len() as c_int,
728                    cfnames.as_ptr(),
729                    cfopts.as_ptr(),
730                    cfhandles.as_mut_ptr(),
731                    c_uchar::from(error_if_log_file_exist),
732                )),
733                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734                    opts.inner,
735                    cpath.as_ptr(),
736                    cfs_v.len() as c_int,
737                    cfnames.as_ptr(),
738                    cfopts.as_ptr(),
739                    cfhandles.as_mut_ptr(),
740                )),
741                AccessType::Secondary { secondary_path } => {
742                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743                        opts.inner,
744                        cpath.as_ptr(),
745                        to_cpath(secondary_path)?.as_ptr(),
746                        cfs_v.len() as c_int,
747                        cfnames.as_ptr(),
748                        cfopts.as_ptr(),
749                        cfhandles.as_mut_ptr(),
750                    ))
751                }
752                AccessType::WithTTL { ttl } => {
753                    let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755                        opts.inner,
756                        cpath.as_ptr(),
757                        cfs_v.len() as c_int,
758                        cfnames.as_ptr(),
759                        cfopts.as_ptr(),
760                        cfhandles.as_mut_ptr(),
761                        ttls_v.as_ptr(),
762                    ))
763                }
764            }
765        };
766        Ok(db)
767    }
768
769    /// Removes the database entries in the range `["from", "to")` using given write options.
770    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771        &self,
772        cf: &impl AsColumnFamilyRef,
773        from: K,
774        to: K,
775        writeopts: &WriteOptions,
776    ) -> Result<(), Error> {
777        let from = from.as_ref();
778        let to = to.as_ref();
779
780        unsafe {
781            ffi_try!(ffi::rocksdb_delete_range_cf(
782                self.inner.inner(),
783                writeopts.inner,
784                cf.inner(),
785                from.as_ptr() as *const c_char,
786                from.len() as size_t,
787                to.as_ptr() as *const c_char,
788                to.len() as size_t,
789            ));
790            Ok(())
791        }
792    }
793
794    /// Removes the database entries in the range `["from", "to")` using default write options.
795    pub fn delete_range_cf<K: AsRef<[u8]>>(
796        &self,
797        cf: &impl AsColumnFamilyRef,
798        from: K,
799        to: K,
800    ) -> Result<(), Error> {
801        self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802    }
803
804    pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805        unsafe {
806            ffi_try!(ffi::rocksdb_write(
807                self.inner.inner(),
808                writeopts.inner,
809                batch.inner
810            ));
811        }
812        Ok(())
813    }
814
815    pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816        self.write_opt(batch, &WriteOptions::default())
817    }
818
819    pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820        let mut wo = WriteOptions::new();
821        wo.disable_wal(true);
822        self.write_opt(batch, &wo)
823    }
824}
825
826/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
827impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829        Self {
830            inner,
831            cfs,
832            path,
833            _outlive: outlive,
834        }
835    }
836
837    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838        let cpath = to_cpath(path)?;
839        let mut length = 0;
840
841        unsafe {
842            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843                opts.inner,
844                cpath.as_ptr(),
845                &mut length,
846            ));
847
848            let vec = slice::from_raw_parts(ptr, length)
849                .iter()
850                .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851                .collect();
852            ffi::rocksdb_list_column_families_destroy(ptr, length);
853            Ok(vec)
854        }
855    }
856
857    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858        let cpath = to_cpath(path)?;
859        unsafe {
860            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861        }
862        Ok(())
863    }
864
865    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866        let cpath = to_cpath(path)?;
867        unsafe {
868            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869        }
870        Ok(())
871    }
872
873    pub fn path(&self) -> &Path {
874        self.path.as_path()
875    }
876
877    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
878    /// the data to disk.
879    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880        unsafe {
881            ffi_try!(ffi::rocksdb_flush_wal(
882                self.inner.inner(),
883                c_uchar::from(sync)
884            ));
885        }
886        Ok(())
887    }
888
889    /// Flushes database memtables to SST files on the disk.
890    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891        unsafe {
892            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893        }
894        Ok(())
895    }
896
897    /// Flushes database memtables to SST files on the disk using default options.
898    pub fn flush(&self) -> Result<(), Error> {
899        self.flush_opt(&FlushOptions::default())
900    }
901
902    /// Flushes database memtables to SST files on the disk for a given column family.
903    pub fn flush_cf_opt(
904        &self,
905        cf: &impl AsColumnFamilyRef,
906        flushopts: &FlushOptions,
907    ) -> Result<(), Error> {
908        unsafe {
909            ffi_try!(ffi::rocksdb_flush_cf(
910                self.inner.inner(),
911                flushopts.inner,
912                cf.inner()
913            ));
914        }
915        Ok(())
916    }
917
918    /// Flushes database memtables to SST files on the disk for a given column family using default
919    /// options.
920    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
921        self.flush_cf_opt(cf, &FlushOptions::default())
922    }
923
924    /// Return the bytes associated with a key value with read options. If you only intend to use
925    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
926    /// to avoid unnecessary memory copy.
927    pub fn get_opt<K: AsRef<[u8]>>(
928        &self,
929        key: K,
930        readopts: &ReadOptions,
931    ) -> Result<Option<Vec<u8>>, Error> {
932        self.get_pinned_opt(key, readopts)
933            .map(|x| x.map(|v| v.as_ref().to_vec()))
934    }
935
936    /// Return the bytes associated with a key value. If you only intend to use the vector returned
937    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
938    /// copy.
939    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
940        self.get_opt(key.as_ref(), &ReadOptions::default())
941    }
942
943    /// Return the bytes associated with a key value and the given column family with read options.
944    /// If you only intend to use the vector returned temporarily, consider using
945    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
946    pub fn get_cf_opt<K: AsRef<[u8]>>(
947        &self,
948        cf: &impl AsColumnFamilyRef,
949        key: K,
950        readopts: &ReadOptions,
951    ) -> Result<Option<Vec<u8>>, Error> {
952        self.get_pinned_cf_opt(cf, key, readopts)
953            .map(|x| x.map(|v| v.as_ref().to_vec()))
954    }
955
956    /// Return the bytes associated with a key value and the given column family. If you only
957    /// intend to use the vector returned temporarily, consider using
958    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
959    pub fn get_cf<K: AsRef<[u8]>>(
960        &self,
961        cf: &impl AsColumnFamilyRef,
962        key: K,
963    ) -> Result<Option<Vec<u8>>, Error> {
964        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
965    }
966
967    /// Return the value associated with a key using RocksDB's PinnableSlice
968    /// so as to avoid unnecessary memory copy.
969    pub fn get_pinned_opt<K: AsRef<[u8]>>(
970        &self,
971        key: K,
972        readopts: &ReadOptions,
973    ) -> Result<Option<DBPinnableSlice>, Error> {
974        if readopts.inner.is_null() {
975            return Err(Error::new(
976                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
977                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
978                    .to_owned(),
979            ));
980        }
981
982        let key = key.as_ref();
983        unsafe {
984            let val = ffi_try!(ffi::rocksdb_get_pinned(
985                self.inner.inner(),
986                readopts.inner,
987                key.as_ptr() as *const c_char,
988                key.len() as size_t,
989            ));
990            if val.is_null() {
991                Ok(None)
992            } else {
993                Ok(Some(DBPinnableSlice::from_c(val)))
994            }
995        }
996    }
997
998    /// Return the value associated with a key using RocksDB's PinnableSlice
999    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1000    /// leverages default options.
1001    pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1002        self.get_pinned_opt(key, &ReadOptions::default())
1003    }
1004
1005    /// Return the value associated with a key using RocksDB's PinnableSlice
1006    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1007    /// allows specifying ColumnFamily
1008    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1009        &self,
1010        cf: &impl AsColumnFamilyRef,
1011        key: K,
1012        readopts: &ReadOptions,
1013    ) -> Result<Option<DBPinnableSlice>, Error> {
1014        if readopts.inner.is_null() {
1015            return Err(Error::new(
1016                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1017                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1018                    .to_owned(),
1019            ));
1020        }
1021
1022        let key = key.as_ref();
1023        unsafe {
1024            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1025                self.inner.inner(),
1026                readopts.inner,
1027                cf.inner(),
1028                key.as_ptr() as *const c_char,
1029                key.len() as size_t,
1030            ));
1031            if val.is_null() {
1032                Ok(None)
1033            } else {
1034                Ok(Some(DBPinnableSlice::from_c(val)))
1035            }
1036        }
1037    }
1038
1039    /// Return the value associated with a key using RocksDB's PinnableSlice
1040    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1041    /// leverages default options.
1042    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1043        &self,
1044        cf: &impl AsColumnFamilyRef,
1045        key: K,
1046    ) -> Result<Option<DBPinnableSlice>, Error> {
1047        self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1048    }
1049
1050    /// Return the values associated with the given keys.
1051    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1052    where
1053        K: AsRef<[u8]>,
1054        I: IntoIterator<Item = K>,
1055    {
1056        self.multi_get_opt(keys, &ReadOptions::default())
1057    }
1058
1059    /// Return the values associated with the given keys using read options.
1060    pub fn multi_get_opt<K, I>(
1061        &self,
1062        keys: I,
1063        readopts: &ReadOptions,
1064    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1065    where
1066        K: AsRef<[u8]>,
1067        I: IntoIterator<Item = K>,
1068    {
1069        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1070            .into_iter()
1071            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1072            .unzip();
1073        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1074
1075        let mut values = vec![ptr::null_mut(); keys.len()];
1076        let mut values_sizes = vec![0_usize; keys.len()];
1077        let mut errors = vec![ptr::null_mut(); keys.len()];
1078        unsafe {
1079            ffi::rocksdb_multi_get(
1080                self.inner.inner(),
1081                readopts.inner,
1082                ptr_keys.len(),
1083                ptr_keys.as_ptr(),
1084                keys_sizes.as_ptr(),
1085                values.as_mut_ptr(),
1086                values_sizes.as_mut_ptr(),
1087                errors.as_mut_ptr(),
1088            );
1089        }
1090
1091        convert_values(values, values_sizes, errors)
1092    }
1093
1094    /// Return the values associated with the given keys and column families.
1095    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1096        &'a self,
1097        keys: I,
1098    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1099    where
1100        K: AsRef<[u8]>,
1101        I: IntoIterator<Item = (&'b W, K)>,
1102        W: 'b + AsColumnFamilyRef,
1103    {
1104        self.multi_get_cf_opt(keys, &ReadOptions::default())
1105    }
1106
1107    /// Return the values associated with the given keys and column families using read options.
1108    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1109        &'a self,
1110        keys: I,
1111        readopts: &ReadOptions,
1112    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1113    where
1114        K: AsRef<[u8]>,
1115        I: IntoIterator<Item = (&'b W, K)>,
1116        W: 'b + AsColumnFamilyRef,
1117    {
1118        let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1119            .into_iter()
1120            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1121            .unzip();
1122        let ptr_keys: Vec<_> = cfs_and_keys
1123            .iter()
1124            .map(|(_, k)| k.as_ptr() as *const c_char)
1125            .collect();
1126        let ptr_cfs: Vec<_> = cfs_and_keys
1127            .iter()
1128            .map(|(c, _)| c.inner() as *const _)
1129            .collect();
1130
1131        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1132        let mut values_sizes = vec![0_usize; ptr_keys.len()];
1133        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1134        unsafe {
1135            ffi::rocksdb_multi_get_cf(
1136                self.inner.inner(),
1137                readopts.inner,
1138                ptr_cfs.as_ptr(),
1139                ptr_keys.len(),
1140                ptr_keys.as_ptr(),
1141                keys_sizes.as_ptr(),
1142                values.as_mut_ptr(),
1143                values_sizes.as_mut_ptr(),
1144                errors.as_mut_ptr(),
1145            );
1146        }
1147
1148        convert_values(values, values_sizes, errors)
1149    }
1150
1151    /// Return the values associated with the given keys and the specified column family
1152    /// where internally the read requests are processed in batch if block-based table
1153    /// SST format is used.  It is a more optimized version of multi_get_cf.
1154    pub fn batched_multi_get_cf<K, I>(
1155        &self,
1156        cf: &impl AsColumnFamilyRef,
1157        keys: I,
1158        sorted_input: bool,
1159    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1160    where
1161        K: AsRef<[u8]>,
1162        I: IntoIterator<Item = K>,
1163    {
1164        self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1165    }
1166
1167    /// Return the values associated with the given keys and the specified column family
1168    /// where internally the read requests are processed in batch if block-based table
1169    /// SST format is used.  It is a more optimized version of multi_get_cf_opt.
1170    pub fn batched_multi_get_cf_opt<K, I>(
1171        &self,
1172        cf: &impl AsColumnFamilyRef,
1173        keys: I,
1174        sorted_input: bool,
1175        readopts: &ReadOptions,
1176    ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1177    where
1178        K: AsRef<[u8]>,
1179        I: IntoIterator<Item = K>,
1180    {
1181        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1182            .into_iter()
1183            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1184            .unzip();
1185        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1186
1187        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1188        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1189
1190        unsafe {
1191            ffi::rocksdb_batched_multi_get_cf(
1192                self.inner.inner(),
1193                readopts.inner,
1194                cf.inner(),
1195                ptr_keys.len(),
1196                ptr_keys.as_ptr(),
1197                keys_sizes.as_ptr(),
1198                pinned_values.as_mut_ptr(),
1199                errors.as_mut_ptr(),
1200                sorted_input,
1201            );
1202            pinned_values
1203                .into_iter()
1204                .zip(errors.into_iter())
1205                .map(|(v, e)| {
1206                    if e.is_null() {
1207                        if v.is_null() {
1208                            Ok(None)
1209                        } else {
1210                            Ok(Some(DBPinnableSlice::from_c(v)))
1211                        }
1212                    } else {
1213                        Err(Error::new(crate::ffi_util::error_message(e)))
1214                    }
1215                })
1216                .collect()
1217        }
1218    }
1219
1220    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1221    /// `true`. This function uses default `ReadOptions`.
1222    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1223        self.key_may_exist_opt(key, &ReadOptions::default())
1224    }
1225
1226    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1227    /// `true`.
1228    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1229        let key = key.as_ref();
1230        unsafe {
1231            0 != ffi::rocksdb_key_may_exist(
1232                self.inner.inner(),
1233                readopts.inner,
1234                key.as_ptr() as *const c_char,
1235                key.len() as size_t,
1236                ptr::null_mut(), /*value*/
1237                ptr::null_mut(), /*val_len*/
1238                ptr::null(),     /*timestamp*/
1239                0,               /*timestamp_len*/
1240                ptr::null_mut(), /*value_found*/
1241            )
1242        }
1243    }
1244
1245    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1246    /// otherwise returns `true`. This function uses default `ReadOptions`.
1247    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1248        self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1249    }
1250
1251    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1252    /// otherwise returns `true`.
1253    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1254        &self,
1255        cf: &impl AsColumnFamilyRef,
1256        key: K,
1257        readopts: &ReadOptions,
1258    ) -> bool {
1259        let key = key.as_ref();
1260        0 != unsafe {
1261            ffi::rocksdb_key_may_exist_cf(
1262                self.inner.inner(),
1263                readopts.inner,
1264                cf.inner(),
1265                key.as_ptr() as *const c_char,
1266                key.len() as size_t,
1267                ptr::null_mut(), /*value*/
1268                ptr::null_mut(), /*val_len*/
1269                ptr::null(),     /*timestamp*/
1270                0,               /*timestamp_len*/
1271                ptr::null_mut(), /*value_found*/
1272            )
1273        }
1274    }
1275
1276    fn create_inner_cf_handle(
1277        &self,
1278        name: impl CStrLike,
1279        opts: &Options,
1280    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1281        let cf_name = name.bake().map_err(|err| {
1282            Error::new(format!(
1283                "Failed to convert path to CString when creating cf: {err}"
1284            ))
1285        })?;
1286        Ok(unsafe {
1287            ffi_try!(ffi::rocksdb_create_column_family(
1288                self.inner.inner(),
1289                opts.inner,
1290                cf_name.as_ptr(),
1291            ))
1292        })
1293    }
1294
1295    pub fn iterator<'a: 'b, 'b>(
1296        &'a self,
1297        mode: IteratorMode,
1298    ) -> DBIteratorWithThreadMode<'b, Self> {
1299        let readopts = ReadOptions::default();
1300        self.iterator_opt(mode, readopts)
1301    }
1302
1303    pub fn iterator_opt<'a: 'b, 'b>(
1304        &'a self,
1305        mode: IteratorMode,
1306        readopts: ReadOptions,
1307    ) -> DBIteratorWithThreadMode<'b, Self> {
1308        DBIteratorWithThreadMode::new(self, readopts, mode)
1309    }
1310
1311    /// Opens an iterator using the provided ReadOptions.
1312    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
1313    pub fn iterator_cf_opt<'a: 'b, 'b>(
1314        &'a self,
1315        cf_handle: &impl AsColumnFamilyRef,
1316        readopts: ReadOptions,
1317        mode: IteratorMode,
1318    ) -> DBIteratorWithThreadMode<'b, Self> {
1319        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1320    }
1321
1322    /// Opens an iterator with `set_total_order_seek` enabled.
1323    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
1324    /// with a Hash-based implementation.
1325    pub fn full_iterator<'a: 'b, 'b>(
1326        &'a self,
1327        mode: IteratorMode,
1328    ) -> DBIteratorWithThreadMode<'b, Self> {
1329        let mut opts = ReadOptions::default();
1330        opts.set_total_order_seek(true);
1331        DBIteratorWithThreadMode::new(self, opts, mode)
1332    }
1333
1334    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1335        &'a self,
1336        prefix: P,
1337    ) -> DBIteratorWithThreadMode<'b, Self> {
1338        let mut opts = ReadOptions::default();
1339        opts.set_prefix_same_as_start(true);
1340        DBIteratorWithThreadMode::new(
1341            self,
1342            opts,
1343            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1344        )
1345    }
1346
1347    pub fn iterator_cf<'a: 'b, 'b>(
1348        &'a self,
1349        cf_handle: &impl AsColumnFamilyRef,
1350        mode: IteratorMode,
1351    ) -> DBIteratorWithThreadMode<'b, Self> {
1352        let opts = ReadOptions::default();
1353        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1354    }
1355
1356    pub fn full_iterator_cf<'a: 'b, 'b>(
1357        &'a self,
1358        cf_handle: &impl AsColumnFamilyRef,
1359        mode: IteratorMode,
1360    ) -> DBIteratorWithThreadMode<'b, Self> {
1361        let mut opts = ReadOptions::default();
1362        opts.set_total_order_seek(true);
1363        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1364    }
1365
1366    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1367        &'a self,
1368        cf_handle: &impl AsColumnFamilyRef,
1369        prefix: P,
1370    ) -> DBIteratorWithThreadMode<'a, Self> {
1371        let mut opts = ReadOptions::default();
1372        opts.set_prefix_same_as_start(true);
1373        DBIteratorWithThreadMode::<'a, Self>::new_cf(
1374            self,
1375            cf_handle.inner(),
1376            opts,
1377            IteratorMode::From(prefix.as_ref(), Direction::Forward),
1378        )
1379    }
1380
1381    /// Opens a raw iterator over the database, using the default read options
1382    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1383        let opts = ReadOptions::default();
1384        DBRawIteratorWithThreadMode::new(self, opts)
1385    }
1386
1387    /// Opens a raw iterator over the given column family, using the default read options
1388    pub fn raw_iterator_cf<'a: 'b, 'b>(
1389        &'a self,
1390        cf_handle: &impl AsColumnFamilyRef,
1391    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1392        let opts = ReadOptions::default();
1393        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1394    }
1395
1396    /// Opens a raw iterator over the database, using the given read options
1397    pub fn raw_iterator_opt<'a: 'b, 'b>(
1398        &'a self,
1399        readopts: ReadOptions,
1400    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1401        DBRawIteratorWithThreadMode::new(self, readopts)
1402    }
1403
1404    /// Opens a raw iterator over the given column family, using the given read options
1405    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1406        &'a self,
1407        cf_handle: &impl AsColumnFamilyRef,
1408        readopts: ReadOptions,
1409    ) -> DBRawIteratorWithThreadMode<'b, Self> {
1410        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1411    }
1412
1413    pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1414        SnapshotWithThreadMode::<Self>::new(self)
1415    }
1416
1417    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1418    where
1419        K: AsRef<[u8]>,
1420        V: AsRef<[u8]>,
1421    {
1422        let key = key.as_ref();
1423        let value = value.as_ref();
1424
1425        unsafe {
1426            ffi_try!(ffi::rocksdb_put(
1427                self.inner.inner(),
1428                writeopts.inner,
1429                key.as_ptr() as *const c_char,
1430                key.len() as size_t,
1431                value.as_ptr() as *const c_char,
1432                value.len() as size_t,
1433            ));
1434            Ok(())
1435        }
1436    }
1437
1438    pub fn put_cf_opt<K, V>(
1439        &self,
1440        cf: &impl AsColumnFamilyRef,
1441        key: K,
1442        value: V,
1443        writeopts: &WriteOptions,
1444    ) -> Result<(), Error>
1445    where
1446        K: AsRef<[u8]>,
1447        V: AsRef<[u8]>,
1448    {
1449        let key = key.as_ref();
1450        let value = value.as_ref();
1451
1452        unsafe {
1453            ffi_try!(ffi::rocksdb_put_cf(
1454                self.inner.inner(),
1455                writeopts.inner,
1456                cf.inner(),
1457                key.as_ptr() as *const c_char,
1458                key.len() as size_t,
1459                value.as_ptr() as *const c_char,
1460                value.len() as size_t,
1461            ));
1462            Ok(())
1463        }
1464    }
1465
1466    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1467    where
1468        K: AsRef<[u8]>,
1469        V: AsRef<[u8]>,
1470    {
1471        let key = key.as_ref();
1472        let value = value.as_ref();
1473
1474        unsafe {
1475            ffi_try!(ffi::rocksdb_merge(
1476                self.inner.inner(),
1477                writeopts.inner,
1478                key.as_ptr() as *const c_char,
1479                key.len() as size_t,
1480                value.as_ptr() as *const c_char,
1481                value.len() as size_t,
1482            ));
1483            Ok(())
1484        }
1485    }
1486
1487    pub fn merge_cf_opt<K, V>(
1488        &self,
1489        cf: &impl AsColumnFamilyRef,
1490        key: K,
1491        value: V,
1492        writeopts: &WriteOptions,
1493    ) -> Result<(), Error>
1494    where
1495        K: AsRef<[u8]>,
1496        V: AsRef<[u8]>,
1497    {
1498        let key = key.as_ref();
1499        let value = value.as_ref();
1500
1501        unsafe {
1502            ffi_try!(ffi::rocksdb_merge_cf(
1503                self.inner.inner(),
1504                writeopts.inner,
1505                cf.inner(),
1506                key.as_ptr() as *const c_char,
1507                key.len() as size_t,
1508                value.as_ptr() as *const c_char,
1509                value.len() as size_t,
1510            ));
1511            Ok(())
1512        }
1513    }
1514
1515    pub fn delete_opt<K: AsRef<[u8]>>(
1516        &self,
1517        key: K,
1518        writeopts: &WriteOptions,
1519    ) -> Result<(), Error> {
1520        let key = key.as_ref();
1521
1522        unsafe {
1523            ffi_try!(ffi::rocksdb_delete(
1524                self.inner.inner(),
1525                writeopts.inner,
1526                key.as_ptr() as *const c_char,
1527                key.len() as size_t,
1528            ));
1529            Ok(())
1530        }
1531    }
1532
1533    pub fn delete_cf_opt<K: AsRef<[u8]>>(
1534        &self,
1535        cf: &impl AsColumnFamilyRef,
1536        key: K,
1537        writeopts: &WriteOptions,
1538    ) -> Result<(), Error> {
1539        let key = key.as_ref();
1540
1541        unsafe {
1542            ffi_try!(ffi::rocksdb_delete_cf(
1543                self.inner.inner(),
1544                writeopts.inner,
1545                cf.inner(),
1546                key.as_ptr() as *const c_char,
1547                key.len() as size_t,
1548            ));
1549            Ok(())
1550        }
1551    }
1552
1553    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1554    where
1555        K: AsRef<[u8]>,
1556        V: AsRef<[u8]>,
1557    {
1558        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1559    }
1560
1561    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1562    where
1563        K: AsRef<[u8]>,
1564        V: AsRef<[u8]>,
1565    {
1566        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1567    }
1568
1569    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1570    where
1571        K: AsRef<[u8]>,
1572        V: AsRef<[u8]>,
1573    {
1574        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1575    }
1576
1577    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1578    where
1579        K: AsRef<[u8]>,
1580        V: AsRef<[u8]>,
1581    {
1582        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1583    }
1584
1585    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1586        self.delete_opt(key.as_ref(), &WriteOptions::default())
1587    }
1588
1589    pub fn delete_cf<K: AsRef<[u8]>>(
1590        &self,
1591        cf: &impl AsColumnFamilyRef,
1592        key: K,
1593    ) -> Result<(), Error> {
1594        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1595    }
1596
1597    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
1598    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1599        unsafe {
1600            let start = start.as_ref().map(AsRef::as_ref);
1601            let end = end.as_ref().map(AsRef::as_ref);
1602
1603            ffi::rocksdb_compact_range(
1604                self.inner.inner(),
1605                opt_bytes_to_ptr(start),
1606                start.map_or(0, <[u8]>::len) as size_t,
1607                opt_bytes_to_ptr(end),
1608                end.map_or(0, <[u8]>::len) as size_t,
1609            );
1610        }
1611    }
1612
1613    /// Same as `compact_range` but with custom options.
1614    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1615        &self,
1616        start: Option<S>,
1617        end: Option<E>,
1618        opts: &CompactOptions,
1619    ) {
1620        unsafe {
1621            let start = start.as_ref().map(AsRef::as_ref);
1622            let end = end.as_ref().map(AsRef::as_ref);
1623
1624            ffi::rocksdb_compact_range_opt(
1625                self.inner.inner(),
1626                opts.inner,
1627                opt_bytes_to_ptr(start),
1628                start.map_or(0, <[u8]>::len) as size_t,
1629                opt_bytes_to_ptr(end),
1630                end.map_or(0, <[u8]>::len) as size_t,
1631            );
1632        }
1633    }
1634
1635    /// Runs a manual compaction on the Range of keys given on the
1636    /// given column family. This is not likely to be needed for typical usage.
1637    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1638        &self,
1639        cf: &impl AsColumnFamilyRef,
1640        start: Option<S>,
1641        end: Option<E>,
1642    ) {
1643        unsafe {
1644            let start = start.as_ref().map(AsRef::as_ref);
1645            let end = end.as_ref().map(AsRef::as_ref);
1646
1647            ffi::rocksdb_compact_range_cf(
1648                self.inner.inner(),
1649                cf.inner(),
1650                opt_bytes_to_ptr(start),
1651                start.map_or(0, <[u8]>::len) as size_t,
1652                opt_bytes_to_ptr(end),
1653                end.map_or(0, <[u8]>::len) as size_t,
1654            );
1655        }
1656    }
1657
1658    /// Same as `compact_range_cf` but with custom options.
1659    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1660        &self,
1661        cf: &impl AsColumnFamilyRef,
1662        start: Option<S>,
1663        end: Option<E>,
1664        opts: &CompactOptions,
1665    ) {
1666        unsafe {
1667            let start = start.as_ref().map(AsRef::as_ref);
1668            let end = end.as_ref().map(AsRef::as_ref);
1669
1670            ffi::rocksdb_compact_range_cf_opt(
1671                self.inner.inner(),
1672                cf.inner(),
1673                opts.inner,
1674                opt_bytes_to_ptr(start),
1675                start.map_or(0, <[u8]>::len) as size_t,
1676                opt_bytes_to_ptr(end),
1677                end.map_or(0, <[u8]>::len) as size_t,
1678            );
1679        }
1680    }
1681
1682    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1683        let copts = convert_options(opts)?;
1684        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1685        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1686        let count = opts.len() as i32;
1687        unsafe {
1688            ffi_try!(ffi::rocksdb_set_options(
1689                self.inner.inner(),
1690                count,
1691                cnames.as_ptr(),
1692                cvalues.as_ptr(),
1693            ));
1694        }
1695        Ok(())
1696    }
1697
1698    pub fn set_options_cf(
1699        &self,
1700        cf: &impl AsColumnFamilyRef,
1701        opts: &[(&str, &str)],
1702    ) -> Result<(), Error> {
1703        let copts = convert_options(opts)?;
1704        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1705        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1706        let count = opts.len() as i32;
1707        unsafe {
1708            ffi_try!(ffi::rocksdb_set_options_cf(
1709                self.inner.inner(),
1710                cf.inner(),
1711                count,
1712                cnames.as_ptr(),
1713                cvalues.as_ptr(),
1714            ));
1715        }
1716        Ok(())
1717    }
1718
1719    /// Implementation for property_value et al methods.
1720    ///
1721    /// `name` is the name of the property.  It will be converted into a CString
1722    /// and passed to `get_property` as argument.  `get_property` reads the
1723    /// specified property and either returns NULL or a pointer to a C allocated
1724    /// string; this method takes ownership of that string and will free it at
1725    /// the end. That string is parsed using `parse` callback which produces
1726    /// the returned result.
1727    fn property_value_impl<R>(
1728        name: impl CStrLike,
1729        get_property: impl FnOnce(*const c_char) -> *mut c_char,
1730        parse: impl FnOnce(&str) -> Result<R, Error>,
1731    ) -> Result<Option<R>, Error> {
1732        let value = match name.bake() {
1733            Ok(prop_name) => get_property(prop_name.as_ptr()),
1734            Err(e) => {
1735                return Err(Error::new(format!(
1736                    "Failed to convert property name to CString: {}",
1737                    e
1738                )));
1739            }
1740        };
1741        if value.is_null() {
1742            return Ok(None);
1743        }
1744        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1745            Ok(s) => parse(s).map(|value| Some(value)),
1746            Err(e) => Err(Error::new(format!(
1747                "Failed to convert property value to string: {}",
1748                e
1749            ))),
1750        };
1751        unsafe {
1752            libc::free(value as *mut c_void);
1753        }
1754        result
1755    }
1756
1757    /// Retrieves a RocksDB property by name.
1758    ///
1759    /// Full list of properties could be find
1760    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1761    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1762        Self::property_value_impl(
1763            name,
1764            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1765            |str_value| Ok(str_value.to_owned()),
1766        )
1767    }
1768
1769    /// Retrieves a RocksDB property by name, for a specific column family.
1770    ///
1771    /// Full list of properties could be find
1772    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
1773    pub fn property_value_cf(
1774        &self,
1775        cf: &impl AsColumnFamilyRef,
1776        name: impl CStrLike,
1777    ) -> Result<Option<String>, Error> {
1778        Self::property_value_impl(
1779            name,
1780            |prop_name| unsafe {
1781                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1782            },
1783            |str_value| Ok(str_value.to_owned()),
1784        )
1785    }
1786
1787    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1788        value.parse::<u64>().map_err(|err| {
1789            Error::new(format!(
1790                "Failed to convert property value {} to int: {}",
1791                value, err
1792            ))
1793        })
1794    }
1795
1796    /// Retrieves a RocksDB property and casts it to an integer.
1797    ///
1798    /// Full list of properties that return int values could be find
1799    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1800    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1801        Self::property_value_impl(
1802            name,
1803            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1804            Self::parse_property_int_value,
1805        )
1806    }
1807
1808    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
1809    ///
1810    /// Full list of properties that return int values could be find
1811    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
1812    pub fn property_int_value_cf(
1813        &self,
1814        cf: &impl AsColumnFamilyRef,
1815        name: impl CStrLike,
1816    ) -> Result<Option<u64>, Error> {
1817        Self::property_value_impl(
1818            name,
1819            |prop_name| unsafe {
1820                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1821            },
1822            Self::parse_property_int_value,
1823        )
1824    }
1825
1826    /// The sequence number of the most recent transaction.
1827    pub fn latest_sequence_number(&self) -> u64 {
1828        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1829    }
1830
1831    /// Iterate over batches of write operations since a given sequence.
1832    ///
1833    /// Produce an iterator that will provide the batches of write operations
1834    /// that have occurred since the given sequence (see
1835    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
1836    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
1837    /// deletes using the `WriteBatch::iterate()` function.
1838    ///
1839    /// Calling `get_updates_since()` with a sequence number that is out of
1840    /// bounds will return an error.
1841    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1842        unsafe {
1843            // rocksdb_wal_readoptions_t does not appear to have any functions
1844            // for creating and destroying it; fortunately we can pass a nullptr
1845            // here to get the default behavior
1846            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1847            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1848                self.inner.inner(),
1849                seq_number,
1850                opts
1851            ));
1852            Ok(DBWALIterator { inner: iter })
1853        }
1854    }
1855
1856    /// Tries to catch up with the primary by reading as much as possible from the
1857    /// log files.
1858    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1859        unsafe {
1860            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1861        }
1862        Ok(())
1863    }
1864
1865    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
1866    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1867        let opts = IngestExternalFileOptions::default();
1868        self.ingest_external_file_opts(&opts, paths)
1869    }
1870
1871    /// Loads a list of external SST files created with SstFileWriter into the DB
1872    pub fn ingest_external_file_opts<P: AsRef<Path>>(
1873        &self,
1874        opts: &IngestExternalFileOptions,
1875        paths: Vec<P>,
1876    ) -> Result<(), Error> {
1877        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1878        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1879
1880        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1881    }
1882
1883    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1884    /// with default opts
1885    pub fn ingest_external_file_cf<P: AsRef<Path>>(
1886        &self,
1887        cf: &impl AsColumnFamilyRef,
1888        paths: Vec<P>,
1889    ) -> Result<(), Error> {
1890        let opts = IngestExternalFileOptions::default();
1891        self.ingest_external_file_cf_opts(cf, &opts, paths)
1892    }
1893
1894    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
1895    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1896        &self,
1897        cf: &impl AsColumnFamilyRef,
1898        opts: &IngestExternalFileOptions,
1899        paths: Vec<P>,
1900    ) -> Result<(), Error> {
1901        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1902        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1903
1904        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1905    }
1906
1907    fn ingest_external_file_raw(
1908        &self,
1909        opts: &IngestExternalFileOptions,
1910        paths_v: &[CString],
1911        cpaths: &[*const c_char],
1912    ) -> Result<(), Error> {
1913        unsafe {
1914            ffi_try!(ffi::rocksdb_ingest_external_file(
1915                self.inner.inner(),
1916                cpaths.as_ptr(),
1917                paths_v.len(),
1918                opts.inner as *const _
1919            ));
1920            Ok(())
1921        }
1922    }
1923
1924    fn ingest_external_file_raw_cf(
1925        &self,
1926        cf: &impl AsColumnFamilyRef,
1927        opts: &IngestExternalFileOptions,
1928        paths_v: &[CString],
1929        cpaths: &[*const c_char],
1930    ) -> Result<(), Error> {
1931        unsafe {
1932            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
1933                self.inner.inner(),
1934                cf.inner(),
1935                cpaths.as_ptr(),
1936                paths_v.len(),
1937                opts.inner as *const _
1938            ));
1939            Ok(())
1940        }
1941    }
1942
1943    /// Returns a list of all table files with their level, start key
1944    /// and end key
1945    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
1946        unsafe {
1947            let files = ffi::rocksdb_livefiles(self.inner.inner());
1948            if files.is_null() {
1949                Err(Error::new("Could not get live files".to_owned()))
1950            } else {
1951                let n = ffi::rocksdb_livefiles_count(files);
1952
1953                let mut livefiles = Vec::with_capacity(n as usize);
1954                let mut key_size: usize = 0;
1955
1956                for i in 0..n {
1957                    let column_family_name =
1958                        from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
1959                    let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
1960                    let size = ffi::rocksdb_livefiles_size(files, i);
1961                    let level = ffi::rocksdb_livefiles_level(files, i);
1962
1963                    // get smallest key inside file
1964                    let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
1965                    let smallest_key = raw_data(smallest_key, key_size);
1966
1967                    // get largest key inside file
1968                    let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
1969                    let largest_key = raw_data(largest_key, key_size);
1970
1971                    livefiles.push(LiveFile {
1972                        column_family_name,
1973                        name,
1974                        size,
1975                        level,
1976                        start_key: smallest_key,
1977                        end_key: largest_key,
1978                        num_entries: ffi::rocksdb_livefiles_entries(files, i),
1979                        num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
1980                    });
1981                }
1982
1983                // destroy livefiles metadata(s)
1984                ffi::rocksdb_livefiles_destroy(files);
1985
1986                // return
1987                Ok(livefiles)
1988            }
1989        }
1990    }
1991
1992    /// Delete sst files whose keys are entirely in the given range.
1993    ///
1994    /// Could leave some keys in the range which are in files which are not
1995    /// entirely in the range.
1996    ///
1997    /// Note: L0 files are left regardless of whether they're in the range.
1998    ///
1999    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
2000    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2001        let from = from.as_ref();
2002        let to = to.as_ref();
2003        unsafe {
2004            ffi_try!(ffi::rocksdb_delete_file_in_range(
2005                self.inner.inner(),
2006                from.as_ptr() as *const c_char,
2007                from.len() as size_t,
2008                to.as_ptr() as *const c_char,
2009                to.len() as size_t,
2010            ));
2011            Ok(())
2012        }
2013    }
2014
2015    /// Same as `delete_file_in_range` but only for specific column family
2016    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2017        &self,
2018        cf: &impl AsColumnFamilyRef,
2019        from: K,
2020        to: K,
2021    ) -> Result<(), Error> {
2022        let from = from.as_ref();
2023        let to = to.as_ref();
2024        unsafe {
2025            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2026                self.inner.inner(),
2027                cf.inner(),
2028                from.as_ptr() as *const c_char,
2029                from.len() as size_t,
2030                to.as_ptr() as *const c_char,
2031                to.len() as size_t,
2032            ));
2033            Ok(())
2034        }
2035    }
2036
2037    /// Request stopping background work, if wait is true wait until it's done.
2038    pub fn cancel_all_background_work(&self, wait: bool) {
2039        unsafe {
2040            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2041        }
2042    }
2043
2044    fn drop_column_family<C>(
2045        &self,
2046        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2047        cf: C,
2048    ) -> Result<(), Error> {
2049        unsafe {
2050            // first mark the column family as dropped
2051            ffi_try!(ffi::rocksdb_drop_column_family(
2052                self.inner.inner(),
2053                cf_inner
2054            ));
2055        }
2056        // then finally reclaim any resources (mem, files) by destroying the only single column
2057        // family handle by drop()-ing it
2058        drop(cf);
2059        Ok(())
2060    }
2061}
2062
2063impl<I: DBInner> DBCommon<SingleThreaded, I> {
2064    /// Creates column family with given name and options
2065    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2066        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2067        self.cfs
2068            .cfs
2069            .insert(name.as_ref().to_string(), ColumnFamily { inner });
2070        Ok(())
2071    }
2072
2073    /// Drops the column family with the given name
2074    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2075        if let Some(cf) = self.cfs.cfs.remove(name) {
2076            self.drop_column_family(cf.inner, cf)
2077        } else {
2078            Err(Error::new(format!("Invalid column family: {name}")))
2079        }
2080    }
2081
2082    /// Returns the underlying column family handle
2083    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2084        self.cfs.cfs.get(name)
2085    }
2086}
2087
2088impl<I: DBInner> DBCommon<MultiThreaded, I> {
2089    /// Creates column family with given name and options
2090    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2091        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2092        self.cfs.cfs.write().unwrap().insert(
2093            name.as_ref().to_string(),
2094            Arc::new(UnboundColumnFamily { inner }),
2095        );
2096        Ok(())
2097    }
2098
2099    /// Drops the column family with the given name by internally locking the inner column
2100    /// family map. This avoids needing `&mut self` reference
2101    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2102        if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2103            self.drop_column_family(cf.inner, cf)
2104        } else {
2105            Err(Error::new(format!("Invalid column family: {name}")))
2106        }
2107    }
2108
2109    /// Returns the underlying column family handle
2110    pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2111        self.cfs
2112            .cfs
2113            .read()
2114            .unwrap()
2115            .get(name)
2116            .cloned()
2117            .map(UnboundColumnFamily::bound_column_family)
2118    }
2119}
2120
2121impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2122    fn drop(&mut self) {
2123        self.cfs.drop_all_cfs_internal();
2124    }
2125}
2126
2127impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2128    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2129        write!(f, "RocksDB {{ path: {:?} }}", self.path())
2130    }
2131}
2132
2133/// The metadata that describes a SST file
2134#[derive(Debug, Clone)]
2135pub struct LiveFile {
2136    /// Name of the column family the file belongs to
2137    pub column_family_name: String,
2138    /// Name of the file
2139    pub name: String,
2140    /// Size of the file
2141    pub size: usize,
2142    /// Level at which this file resides
2143    pub level: i32,
2144    /// Smallest user defined key in the file
2145    pub start_key: Option<Vec<u8>>,
2146    /// Largest user defined key in the file
2147    pub end_key: Option<Vec<u8>>,
2148    /// Number of entries/alive keys in the file
2149    pub num_entries: u64,
2150    /// Number of deletions/tomb key(s) in the file
2151    pub num_deletions: u64,
2152}
2153
2154fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2155    opts.iter()
2156        .map(|(name, value)| {
2157            let cname = match CString::new(name.as_bytes()) {
2158                Ok(cname) => cname,
2159                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2160            };
2161            let cvalue = match CString::new(value.as_bytes()) {
2162                Ok(cvalue) => cvalue,
2163                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2164            };
2165            Ok((cname, cvalue))
2166        })
2167        .collect()
2168}
2169
2170pub(crate) fn convert_values(
2171    values: Vec<*mut c_char>,
2172    values_sizes: Vec<usize>,
2173    errors: Vec<*mut c_char>,
2174) -> Vec<Result<Option<Vec<u8>>, Error>> {
2175    values
2176        .into_iter()
2177        .zip(values_sizes.into_iter())
2178        .zip(errors.into_iter())
2179        .map(|((v, s), e)| {
2180            if e.is_null() {
2181                let value = unsafe { crate::ffi_util::raw_data(v, s) };
2182                unsafe {
2183                    ffi::rocksdb_free(v as *mut c_void);
2184                }
2185                Ok(value)
2186            } else {
2187                Err(Error::new(crate::ffi_util::error_message(e)))
2188            }
2189        })
2190        .collect()
2191}