Skip to main content

rust_rocksdb/
db.rs

1// Copyright 2020 Tyler Neely
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use std::cell::RefCell;
17use std::collections::{BTreeMap, HashMap};
18use std::ffi::{CStr, CString};
19use std::fmt;
20use std::fs;
21use std::iter;
22use std::path::Path;
23use std::path::PathBuf;
24use std::ptr;
25use std::slice;
26use std::str;
27use std::sync::Arc;
28use std::time::Duration;
29
30use crate::column_family::ColumnFamilyTtl;
31use crate::ffi_util::CSlice;
32use crate::{
33    ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
34    DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, DEFAULT_COLUMN_FAMILY_NAME,
35    Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, Options, ReadOptions,
36    SnapshotWithThreadMode, WaitForCompactOptions, WriteBatch, WriteBatchWithIndex, WriteOptions,
37    column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily},
38    db_options::{ImportColumnFamilyOptions, OptionsMustOutliveDB},
39    ffi,
40    ffi_util::{
41        CStrLike, convert_rocksdb_error, from_cstr_and_free, from_cstr_without_free,
42        opt_bytes_to_ptr, raw_data, to_cpath,
43    },
44};
45use rust_librocksdb_sys::{
46    rocksdb_livefile_destroy, rocksdb_livefile_t, rocksdb_livefiles_destroy, rocksdb_livefiles_t,
47};
48
49use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
50use parking_lot::RwLock;
51
52// Default options are kept per-thread to avoid re-allocating on every call while
53// also preventing cross-thread sharing. Some RocksDB option wrappers hold
54// pointers into internal buffers and are not safe to share across threads.
55// Using thread_local allows cheap reuse in the common "default options" path
56// without synchronization overhead. Callers who need non-defaults must pass
57// explicit options.
58thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
59thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
60thread_local! { static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default(); }
61// Thread-local ReadOptions for hot prefix probes; preconfigured for prefix scans.
62thread_local! { static PREFIX_READ_OPTS: RefCell<ReadOptions> = RefCell::new({ let mut o = ReadOptions::default(); o.set_prefix_same_as_start(true); o }); }
63
64/// A range of keys, `start_key` is included, but not `end_key`.
65///
66/// You should make sure `end_key` is not less than `start_key`.
67pub struct Range<'a> {
68    start_key: &'a [u8],
69    end_key: &'a [u8],
70}
71
72impl<'a> Range<'a> {
73    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
74        Range { start_key, end_key }
75    }
76}
77
78/// Result of a [`get_into_buffer`](DBCommon::get_into_buffer) operation.
79///
80/// This enum represents the outcome of attempting to read a value directly
81/// into a caller-provided buffer, avoiding memory allocation. This is the most
82/// efficient way to read values when you have a pre-allocated buffer available.
83///
84/// # Performance
85///
86/// Using `get_into_buffer` with a reusable buffer can significantly reduce
87/// allocation overhead in hot paths compared to [`get`](DBCommon::get) or even
88/// [`get_pinned`](DBCommon::get_pinned):
89///
90/// - [`get`](DBCommon::get): Allocates a new `Vec<u8>` for each call
91/// - [`get_pinned`](DBCommon::get_pinned): Pins memory in RocksDB's block cache
92/// - `get_into_buffer`: Zero allocation when buffer is large enough
93///
94/// # Example
95///
96/// ```
97/// use rust_rocksdb::{DB, GetIntoBufferResult};
98///
99/// # let tempdir = tempfile::Builder::new().prefix("ex").tempdir().unwrap();
100/// let db = DB::open_default(tempdir.path()).unwrap();
101/// db.put(b"key", b"value").unwrap();
102///
103/// let mut buffer = [0u8; 1024];
104/// match db.get_into_buffer(b"key", &mut buffer).unwrap() {
105///     GetIntoBufferResult::Found(len) => {
106///         println!("Value: {:?}", &buffer[..len]);
107///     }
108///     GetIntoBufferResult::NotFound => {
109///         println!("Key not found");
110///     }
111///     GetIntoBufferResult::BufferTooSmall(needed) => {
112///         println!("Need a buffer of at least {} bytes", needed);
113///     }
114/// }
115/// ```
116#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
117pub enum GetIntoBufferResult {
118    /// The key was not found in the database.
119    NotFound,
120    /// The value was found and successfully copied into the buffer.
121    /// The `usize` contains the actual size of the value (number of bytes written).
122    Found(usize),
123    /// The value was found but the provided buffer was too small to hold it.
124    /// The `usize` contains the actual size of the value, allowing the caller
125    /// to allocate a larger buffer and retry.
126    ///
127    /// Note: When this variant is returned, no data is written to the buffer.
128    BufferTooSmall(usize),
129}
130
131impl GetIntoBufferResult {
132    /// Returns `true` if the key was found (regardless of buffer size).
133    #[inline]
134    pub fn is_found(&self) -> bool {
135        matches!(self, Self::Found(_) | Self::BufferTooSmall(_))
136    }
137
138    /// Returns `true` if the key was not found.
139    #[inline]
140    pub fn is_not_found(&self) -> bool {
141        matches!(self, Self::NotFound)
142    }
143
144    /// Returns the value size if the key was found, `None` otherwise.
145    #[inline]
146    pub fn value_size(&self) -> Option<usize> {
147        match self {
148            Self::Found(size) | Self::BufferTooSmall(size) => Some(*size),
149            Self::NotFound => None,
150        }
151    }
152}
153
154/// A reusable prefix probe that avoids per-call iterator creation/destruction.
155///
156/// Use this when performing many prefix existence checks in a tight loop.
157pub struct PrefixProber<'a, D: DBAccess> {
158    raw: DBRawIteratorWithThreadMode<'a, D>,
159}
160
161impl<D: DBAccess> PrefixProber<'_, D> {
162    /// Returns true if any key exists with the given prefix.
163    /// This performs a seek to the prefix and checks the current key.
164    pub fn exists(&mut self, prefix: &[u8]) -> Result<bool, Error> {
165        self.raw.seek(prefix);
166        if self.raw.valid()
167            && let Some(k) = self.raw.key()
168        {
169            return Ok(k.starts_with(prefix));
170        }
171        self.raw.status()?;
172        Ok(false)
173    }
174}
175
176/// Marker trait to specify single or multi threaded column family alternations for
177/// [`DBWithThreadMode<T>`]
178///
179/// This arrangement makes differences in self mutability and return type in
180/// some of `DBWithThreadMode` methods.
181///
182/// While being a marker trait to be generic over `DBWithThreadMode`, this trait
183/// also has a minimum set of not-encapsulated internal methods between
184/// [`SingleThreaded`] and [`MultiThreaded`].  These methods aren't expected to be
185/// called and defined externally.
186pub trait ThreadMode {
187    /// Internal implementation for storing column family handles
188    fn new_cf_map_internal(
189        cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
190    ) -> Self;
191    /// Internal implementation for dropping column family handles
192    fn drop_all_cfs_internal(&mut self);
193}
194
195/// Actual marker type for the marker trait `ThreadMode`, which holds
196/// a collection of column families without synchronization primitive, providing
197/// no overhead for the single-threaded column family alternations. The other
198/// mode is [`MultiThreaded`].
199///
200/// See [`DB`] for more details, including performance implications for each mode
201pub struct SingleThreaded {
202    pub(crate) cfs: HashMap<String, ColumnFamily>,
203}
204
205/// Actual marker type for the marker trait `ThreadMode`, which holds
206/// a collection of column families wrapped in a RwLock to be mutated
207/// concurrently. The other mode is [`SingleThreaded`].
208///
209/// See [`DB`] for more details, including performance implications for each mode
210pub struct MultiThreaded {
211    pub(crate) cfs: RwLock<HashMap<String, Arc<UnboundColumnFamily>>>,
212}
213
214impl ThreadMode for SingleThreaded {
215    fn new_cf_map_internal(
216        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
217    ) -> Self {
218        Self {
219            cfs: cfs
220                .into_iter()
221                .map(|(n, c)| (n, ColumnFamily { inner: c }))
222                .collect(),
223        }
224    }
225
226    fn drop_all_cfs_internal(&mut self) {
227        // Cause all ColumnFamily objects to be Drop::drop()-ed.
228        self.cfs.clear();
229    }
230}
231
232impl ThreadMode for MultiThreaded {
233    fn new_cf_map_internal(
234        cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
235    ) -> Self {
236        Self {
237            cfs: RwLock::new(
238                cfs.into_iter()
239                    .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
240                    .collect(),
241            ),
242        }
243    }
244
245    fn drop_all_cfs_internal(&mut self) {
246        // Cause all UnboundColumnFamily objects to be Drop::drop()-ed.
247        self.cfs.write().clear();
248    }
249}
250
251/// Get underlying `rocksdb_t`.
252pub trait DBInner {
253    fn inner(&self) -> *mut ffi::rocksdb_t;
254}
255
256/// A helper type to implement some common methods for [`DBWithThreadMode`]
257/// and [`OptimisticTransactionDB`].
258///
259/// [`OptimisticTransactionDB`]: crate::OptimisticTransactionDB
260///
261/// When using [`SingleThreaded`] mode, `create_cf` requires `&mut self`,
262/// preventing multiple immutable references from calling it concurrently:
263///
264/// ```compile_fail,E0596
265/// use rust_rocksdb::{DBWithThreadMode, Options, SingleThreaded};
266///
267/// let db = DBWithThreadMode::<SingleThreaded>::open_default("/path/to/dummy").unwrap();
268/// let db_ref1 = &db;
269/// let db_ref2 = &db;
270/// let opts = Options::default();
271/// db_ref1.create_cf("cf1", &opts).unwrap();
272/// db_ref2.create_cf("cf2", &opts).unwrap();
273/// ```
274///
275/// [`SingleThreaded`]: crate::SingleThreaded
276pub struct DBCommon<T: ThreadMode, D: DBInner> {
277    pub(crate) inner: D,
278    cfs: T, // Column families are held differently depending on thread mode
279    path: PathBuf,
280    _outlive: Vec<OptionsMustOutliveDB>,
281}
282
283/// Minimal set of DB-related methods, intended to be generic over
284/// `DBWithThreadMode<T>`. Mainly used internally
285pub trait DBAccess {
286    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
287
288    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
289
290    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
291
292    unsafe fn create_iterator_cf(
293        &self,
294        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
295        readopts: &ReadOptions,
296    ) -> *mut ffi::rocksdb_iterator_t;
297
298    fn get_opt<K: AsRef<[u8]>>(
299        &self,
300        key: K,
301        readopts: &ReadOptions,
302    ) -> Result<Option<Vec<u8>>, Error>;
303
304    fn get_cf_opt<K: AsRef<[u8]>>(
305        &self,
306        cf: &impl AsColumnFamilyRef,
307        key: K,
308        readopts: &ReadOptions,
309    ) -> Result<Option<Vec<u8>>, Error>;
310
311    fn get_pinned_opt<K: AsRef<[u8]>>(
312        &'_ self,
313        key: K,
314        readopts: &ReadOptions,
315    ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
316
317    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
318        &'_ self,
319        cf: &impl AsColumnFamilyRef,
320        key: K,
321        readopts: &ReadOptions,
322    ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
323
324    fn multi_get_opt<K, I>(
325        &self,
326        keys: I,
327        readopts: &ReadOptions,
328    ) -> Vec<Result<Option<Vec<u8>>, Error>>
329    where
330        K: AsRef<[u8]>,
331        I: IntoIterator<Item = K>;
332
333    fn multi_get_cf_opt<'b, K, I, W>(
334        &self,
335        keys_cf: I,
336        readopts: &ReadOptions,
337    ) -> Vec<Result<Option<Vec<u8>>, Error>>
338    where
339        K: AsRef<[u8]>,
340        I: IntoIterator<Item = (&'b W, K)>,
341        W: AsColumnFamilyRef + 'b;
342}
343
344impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
345    unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
346        unsafe { ffi::rocksdb_create_snapshot(self.inner.inner()) }
347    }
348
349    unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
350        unsafe {
351            ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
352        }
353    }
354
355    unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
356        unsafe { ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner) }
357    }
358
359    unsafe fn create_iterator_cf(
360        &self,
361        cf_handle: *mut ffi::rocksdb_column_family_handle_t,
362        readopts: &ReadOptions,
363    ) -> *mut ffi::rocksdb_iterator_t {
364        unsafe { ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle) }
365    }
366
367    fn get_opt<K: AsRef<[u8]>>(
368        &self,
369        key: K,
370        readopts: &ReadOptions,
371    ) -> Result<Option<Vec<u8>>, Error> {
372        self.get_opt(key, readopts)
373    }
374
375    fn get_cf_opt<K: AsRef<[u8]>>(
376        &self,
377        cf: &impl AsColumnFamilyRef,
378        key: K,
379        readopts: &ReadOptions,
380    ) -> Result<Option<Vec<u8>>, Error> {
381        self.get_cf_opt(cf, key, readopts)
382    }
383
384    fn get_pinned_opt<K: AsRef<[u8]>>(
385        &'_ self,
386        key: K,
387        readopts: &ReadOptions,
388    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
389        self.get_pinned_opt(key, readopts)
390    }
391
392    fn get_pinned_cf_opt<K: AsRef<[u8]>>(
393        &'_ self,
394        cf: &impl AsColumnFamilyRef,
395        key: K,
396        readopts: &ReadOptions,
397    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
398        self.get_pinned_cf_opt(cf, key, readopts)
399    }
400
401    fn multi_get_opt<K, Iter>(
402        &self,
403        keys: Iter,
404        readopts: &ReadOptions,
405    ) -> Vec<Result<Option<Vec<u8>>, Error>>
406    where
407        K: AsRef<[u8]>,
408        Iter: IntoIterator<Item = K>,
409    {
410        self.multi_get_opt(keys, readopts)
411    }
412
413    fn multi_get_cf_opt<'b, K, Iter, W>(
414        &self,
415        keys_cf: Iter,
416        readopts: &ReadOptions,
417    ) -> Vec<Result<Option<Vec<u8>>, Error>>
418    where
419        K: AsRef<[u8]>,
420        Iter: IntoIterator<Item = (&'b W, K)>,
421        W: AsColumnFamilyRef + 'b,
422    {
423        self.multi_get_cf_opt(keys_cf, readopts)
424    }
425}
426
427pub struct DBWithThreadModeInner {
428    inner: *mut ffi::rocksdb_t,
429}
430
431impl DBInner for DBWithThreadModeInner {
432    fn inner(&self) -> *mut ffi::rocksdb_t {
433        self.inner
434    }
435}
436
437impl Drop for DBWithThreadModeInner {
438    fn drop(&mut self) {
439        unsafe {
440            ffi::rocksdb_close(self.inner);
441        }
442    }
443}
444
445/// A type alias to RocksDB database.
446///
447/// See crate level documentation for a simple usage example.
448/// See [`DBCommon`] for full list of methods.
449pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
450
451/// A type alias to DB instance type with the single-threaded column family
452/// creations/deletions
453///
454/// # Compatibility and multi-threaded mode
455///
456/// Previously, [`DB`] was defined as a direct `struct`. Now, it's type-aliased for
457/// compatibility. Use `DBCommon<MultiThreaded>` for multi-threaded
458/// column family alternations.
459///
460/// # Limited performance implication for single-threaded mode
461///
462/// Even with [`SingleThreaded`], almost all of RocksDB operations is
463/// multi-threaded unless the underlying RocksDB instance is
464/// specifically configured otherwise. `SingleThreaded` only forces
465/// serialization of column family alternations by requiring `&mut self` of DB
466/// instance due to its wrapper implementation details.
467///
468/// # Multi-threaded mode
469///
470/// [`MultiThreaded`] can be appropriate for the situation of multi-threaded
471/// workload including multi-threaded column family alternations, costing the
472/// RwLock overhead inside `DB`.
473#[cfg(not(feature = "multi-threaded-cf"))]
474pub type DB = DBWithThreadMode<SingleThreaded>;
475
476#[cfg(feature = "multi-threaded-cf")]
477pub type DB = DBWithThreadMode<MultiThreaded>;
478
479// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
480// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
481// rocksdb internally does not rely on thread-local information for its user-exposed types.
482unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
483
484// Sync is similarly safe for many types because they do not expose interior mutability, and their
485// use within the rocksdb library is generally behind a const reference
486unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
487
488// Specifies whether open DB for read only.
489enum AccessType<'a> {
490    ReadWrite,
491    ReadOnly { error_if_log_file_exist: bool },
492    Secondary { secondary_path: &'a Path },
493    WithTTL { ttl: Duration },
494}
495
496/// Methods of `DBWithThreadMode`.
497impl<T: ThreadMode> DBWithThreadMode<T> {
498    /// Opens a database with default options.
499    pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
500        let mut opts = Options::default();
501        opts.create_if_missing(true);
502        Self::open(&opts, path)
503    }
504
505    /// Opens the database with the specified options.
506    pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
507        Self::open_cf(opts, path, None::<&str>)
508    }
509
510    /// Opens the database for read only with the specified options.
511    pub fn open_for_read_only<P: AsRef<Path>>(
512        opts: &Options,
513        path: P,
514        error_if_log_file_exist: bool,
515    ) -> Result<Self, Error> {
516        Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
517    }
518
519    /// Opens the database as a secondary.
520    pub fn open_as_secondary<P: AsRef<Path>>(
521        opts: &Options,
522        primary_path: P,
523        secondary_path: P,
524    ) -> Result<Self, Error> {
525        Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
526    }
527
528    /// Opens the database with a Time to Live compaction filter.
529    ///
530    /// This applies the given `ttl` to all column families created without an explicit TTL.
531    /// See [`DB::open_cf_descriptors_with_ttl`] for more control over individual column family TTLs.
532    pub fn open_with_ttl<P: AsRef<Path>>(
533        opts: &Options,
534        path: P,
535        ttl: Duration,
536    ) -> Result<Self, Error> {
537        Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
538    }
539
540    /// Opens the database with a Time to Live compaction filter and column family names.
541    ///
542    /// Column families opened using this function will be created with default `Options`.
543    pub fn open_cf_with_ttl<P, I, N>(
544        opts: &Options,
545        path: P,
546        cfs: I,
547        ttl: Duration,
548    ) -> Result<Self, Error>
549    where
550        P: AsRef<Path>,
551        I: IntoIterator<Item = N>,
552        N: AsRef<str>,
553    {
554        let cfs = cfs
555            .into_iter()
556            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
557
558        Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
559    }
560
561    /// Opens a database with the given database with a Time to Live compaction filter and
562    /// column family descriptors.
563    ///
564    /// Applies the provided `ttl` as the default TTL for all column families.
565    /// Column families will inherit this TTL by default, unless their descriptor explicitly
566    /// sets a different TTL using [`ColumnFamilyTtl::Duration`] or opts out using [`ColumnFamilyTtl::Disabled`].
567    ///
568    /// *NOTE*: The `default` column family is opened with `Options::default()` unless
569    /// explicitly configured within the `cfs` iterator.
570    /// To customize the `default` column family's options, include a `ColumnFamilyDescriptor`
571    /// with the name "default" in the `cfs` iterator.
572    ///
573    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
574    pub fn open_cf_descriptors_with_ttl<P, I>(
575        opts: &Options,
576        path: P,
577        cfs: I,
578        ttl: Duration,
579    ) -> Result<Self, Error>
580    where
581        P: AsRef<Path>,
582        I: IntoIterator<Item = ColumnFamilyDescriptor>,
583    {
584        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
585    }
586
587    /// Opens a database with the given database options and column family names.
588    ///
589    /// Column families opened using this function will be created with default `Options`.
590    pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
591    where
592        P: AsRef<Path>,
593        I: IntoIterator<Item = N>,
594        N: AsRef<str>,
595    {
596        let cfs = cfs
597            .into_iter()
598            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
599
600        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
601    }
602
603    /// Opens a database with the given database options and column family names.
604    ///
605    /// Column families opened using given `Options`.
606    pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
607    where
608        P: AsRef<Path>,
609        I: IntoIterator<Item = (N, Options)>,
610        N: AsRef<str>,
611    {
612        let cfs = cfs
613            .into_iter()
614            .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
615
616        Self::open_cf_descriptors(opts, path, cfs)
617    }
618
619    /// Opens a database for read only with the given database options and column family names.
620    /// *NOTE*: `default` column family is opened with `Options::default()`.
621    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
622    pub fn open_cf_for_read_only<P, I, N>(
623        opts: &Options,
624        path: P,
625        cfs: I,
626        error_if_log_file_exist: bool,
627    ) -> Result<Self, Error>
628    where
629        P: AsRef<Path>,
630        I: IntoIterator<Item = N>,
631        N: AsRef<str>,
632    {
633        let cfs = cfs
634            .into_iter()
635            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
636
637        Self::open_cf_descriptors_internal(
638            opts,
639            path,
640            cfs,
641            &AccessType::ReadOnly {
642                error_if_log_file_exist,
643            },
644        )
645    }
646
647    /// Opens a database for read only with the given database options and column family names.
648    /// *NOTE*: `default` column family is opened with `Options::default()`.
649    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
650    pub fn open_cf_with_opts_for_read_only<P, I, N>(
651        db_opts: &Options,
652        path: P,
653        cfs: I,
654        error_if_log_file_exist: bool,
655    ) -> Result<Self, Error>
656    where
657        P: AsRef<Path>,
658        I: IntoIterator<Item = (N, Options)>,
659        N: AsRef<str>,
660    {
661        let cfs = cfs
662            .into_iter()
663            .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
664
665        Self::open_cf_descriptors_internal(
666            db_opts,
667            path,
668            cfs,
669            &AccessType::ReadOnly {
670                error_if_log_file_exist,
671            },
672        )
673    }
674
675    /// Opens a database for ready only with the given database options and
676    /// column family descriptors.
677    /// *NOTE*: `default` column family is opened with `Options::default()`.
678    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
679    pub fn open_cf_descriptors_read_only<P, I>(
680        opts: &Options,
681        path: P,
682        cfs: I,
683        error_if_log_file_exist: bool,
684    ) -> Result<Self, Error>
685    where
686        P: AsRef<Path>,
687        I: IntoIterator<Item = ColumnFamilyDescriptor>,
688    {
689        Self::open_cf_descriptors_internal(
690            opts,
691            path,
692            cfs,
693            &AccessType::ReadOnly {
694                error_if_log_file_exist,
695            },
696        )
697    }
698
699    /// Opens the database as a secondary with the given database options and column family names.
700    /// *NOTE*: `default` column family is opened with `Options::default()`.
701    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
702    pub fn open_cf_as_secondary<P, I, N>(
703        opts: &Options,
704        primary_path: P,
705        secondary_path: P,
706        cfs: I,
707    ) -> Result<Self, Error>
708    where
709        P: AsRef<Path>,
710        I: IntoIterator<Item = N>,
711        N: AsRef<str>,
712    {
713        let cfs = cfs
714            .into_iter()
715            .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
716
717        Self::open_cf_descriptors_internal(
718            opts,
719            primary_path,
720            cfs,
721            &AccessType::Secondary {
722                secondary_path: secondary_path.as_ref(),
723            },
724        )
725    }
726
727    /// Opens the database as a secondary with the given database options and
728    /// column family descriptors.
729    /// *NOTE*: `default` column family is opened with `Options::default()`.
730    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
731    pub fn open_cf_descriptors_as_secondary<P, I>(
732        opts: &Options,
733        path: P,
734        secondary_path: P,
735        cfs: I,
736    ) -> Result<Self, Error>
737    where
738        P: AsRef<Path>,
739        I: IntoIterator<Item = ColumnFamilyDescriptor>,
740    {
741        Self::open_cf_descriptors_internal(
742            opts,
743            path,
744            cfs,
745            &AccessType::Secondary {
746                secondary_path: secondary_path.as_ref(),
747            },
748        )
749    }
750
751    /// Opens a database with the given database options and column family descriptors.
752    /// *NOTE*: `default` column family is opened with `Options::default()`.
753    /// If you want to open `default` cf with different options, set them explicitly in `cfs`.
754    pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
755    where
756        P: AsRef<Path>,
757        I: IntoIterator<Item = ColumnFamilyDescriptor>,
758    {
759        Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
760    }
761
762    /// Internal implementation for opening RocksDB.
763    fn open_cf_descriptors_internal<P, I>(
764        opts: &Options,
765        path: P,
766        cfs: I,
767        access_type: &AccessType,
768    ) -> Result<Self, Error>
769    where
770        P: AsRef<Path>,
771        I: IntoIterator<Item = ColumnFamilyDescriptor>,
772    {
773        let cfs: Vec<_> = cfs.into_iter().collect();
774        let outlive = iter::once(opts.outlive.clone())
775            .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
776            .collect();
777
778        let cpath = to_cpath(&path)?;
779
780        if let Err(e) = fs::create_dir_all(&path) {
781            return Err(Error::new(format!(
782                "Failed to create RocksDB directory: `{e:?}`."
783            )));
784        }
785
786        let db: *mut ffi::rocksdb_t;
787        let mut cf_map = BTreeMap::new();
788
789        if cfs.is_empty() {
790            db = Self::open_raw(opts, &cpath, access_type)?;
791        } else {
792            let mut cfs_v = cfs;
793            // Always open the default column family.
794            if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
795                cfs_v.push(ColumnFamilyDescriptor {
796                    name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
797                    options: Options::default(),
798                    ttl: ColumnFamilyTtl::SameAsDb,
799                });
800            }
801            // We need to store our CStrings in an intermediate vector
802            // so that their pointers remain valid.
803            let c_cfs: Vec<CString> = cfs_v
804                .iter()
805                .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
806                .collect();
807
808            let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
809
810            // These handles will be populated by DB.
811            let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
812
813            let cfopts: Vec<_> = cfs_v
814                .iter()
815                .map(|cf| cf.options.inner.cast_const())
816                .collect();
817
818            db = Self::open_cf_raw(
819                opts,
820                &cpath,
821                &cfs_v,
822                &cfnames,
823                &cfopts,
824                &mut cfhandles,
825                access_type,
826            )?;
827            for handle in &cfhandles {
828                if handle.is_null() {
829                    return Err(Error::new(
830                        "Received null column family handle from DB.".to_owned(),
831                    ));
832                }
833            }
834
835            for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
836                cf_map.insert(cf_desc.name.clone(), inner);
837            }
838        }
839
840        if db.is_null() {
841            return Err(Error::new("Could not initialize database.".to_owned()));
842        }
843
844        Ok(Self {
845            inner: DBWithThreadModeInner { inner: db },
846            path: path.as_ref().to_path_buf(),
847            cfs: T::new_cf_map_internal(cf_map),
848            _outlive: outlive,
849        })
850    }
851
852    fn open_raw(
853        opts: &Options,
854        cpath: &CString,
855        access_type: &AccessType,
856    ) -> Result<*mut ffi::rocksdb_t, Error> {
857        let db = unsafe {
858            match *access_type {
859                AccessType::ReadOnly {
860                    error_if_log_file_exist,
861                } => ffi_try!(ffi::rocksdb_open_for_read_only(
862                    opts.inner,
863                    cpath.as_ptr(),
864                    c_uchar::from(error_if_log_file_exist),
865                )),
866                AccessType::ReadWrite => {
867                    ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
868                }
869                AccessType::Secondary { secondary_path } => {
870                    ffi_try!(ffi::rocksdb_open_as_secondary(
871                        opts.inner,
872                        cpath.as_ptr(),
873                        to_cpath(secondary_path)?.as_ptr(),
874                    ))
875                }
876                AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
877                    opts.inner,
878                    cpath.as_ptr(),
879                    ttl.as_secs() as c_int,
880                )),
881            }
882        };
883        Ok(db)
884    }
885
886    #[allow(clippy::pedantic)]
887    fn open_cf_raw(
888        opts: &Options,
889        cpath: &CString,
890        cfs_v: &[ColumnFamilyDescriptor],
891        cfnames: &[*const c_char],
892        cfopts: &[*const ffi::rocksdb_options_t],
893        cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
894        access_type: &AccessType,
895    ) -> Result<*mut ffi::rocksdb_t, Error> {
896        let db = unsafe {
897            match *access_type {
898                AccessType::ReadOnly {
899                    error_if_log_file_exist,
900                } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
901                    opts.inner,
902                    cpath.as_ptr(),
903                    cfs_v.len() as c_int,
904                    cfnames.as_ptr(),
905                    cfopts.as_ptr(),
906                    cfhandles.as_mut_ptr(),
907                    c_uchar::from(error_if_log_file_exist),
908                )),
909                AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
910                    opts.inner,
911                    cpath.as_ptr(),
912                    cfs_v.len() as c_int,
913                    cfnames.as_ptr(),
914                    cfopts.as_ptr(),
915                    cfhandles.as_mut_ptr(),
916                )),
917                AccessType::Secondary { secondary_path } => {
918                    ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
919                        opts.inner,
920                        cpath.as_ptr(),
921                        to_cpath(secondary_path)?.as_ptr(),
922                        cfs_v.len() as c_int,
923                        cfnames.as_ptr(),
924                        cfopts.as_ptr(),
925                        cfhandles.as_mut_ptr(),
926                    ))
927                }
928                AccessType::WithTTL { ttl } => {
929                    let ttls: Vec<_> = cfs_v
930                        .iter()
931                        .map(|cf| match cf.ttl {
932                            ColumnFamilyTtl::Disabled => i32::MAX,
933                            ColumnFamilyTtl::Duration(duration) => duration.as_secs() as i32,
934                            ColumnFamilyTtl::SameAsDb => ttl.as_secs() as i32,
935                        })
936                        .collect();
937
938                    ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
939                        opts.inner,
940                        cpath.as_ptr(),
941                        cfs_v.len() as c_int,
942                        cfnames.as_ptr(),
943                        cfopts.as_ptr(),
944                        cfhandles.as_mut_ptr(),
945                        ttls.as_ptr(),
946                    ))
947                }
948            }
949        };
950        Ok(db)
951    }
952
953    /// Removes the database entries in the range `["from", "to")` using given write options.
954    pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
955        &self,
956        cf: &impl AsColumnFamilyRef,
957        from: K,
958        to: K,
959        writeopts: &WriteOptions,
960    ) -> Result<(), Error> {
961        let from = from.as_ref();
962        let to = to.as_ref();
963
964        unsafe {
965            ffi_try!(ffi::rocksdb_delete_range_cf(
966                self.inner.inner(),
967                writeopts.inner,
968                cf.inner(),
969                from.as_ptr() as *const c_char,
970                from.len() as size_t,
971                to.as_ptr() as *const c_char,
972                to.len() as size_t,
973            ));
974            Ok(())
975        }
976    }
977
978    /// Removes the database entries in the range `["from", "to")` using default write options.
979    pub fn delete_range_cf<K: AsRef<[u8]>>(
980        &self,
981        cf: &impl AsColumnFamilyRef,
982        from: K,
983        to: K,
984    ) -> Result<(), Error> {
985        DEFAULT_WRITE_OPTS.with(|opts| self.delete_range_cf_opt(cf, from, to, opts))
986    }
987
988    pub fn write_opt(&self, batch: &WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
989        unsafe {
990            ffi_try!(ffi::rocksdb_write(
991                self.inner.inner(),
992                writeopts.inner,
993                batch.inner
994            ));
995        }
996        Ok(())
997    }
998
999    pub fn write(&self, batch: &WriteBatch) -> Result<(), Error> {
1000        DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
1001    }
1002
1003    pub fn write_without_wal(&self, batch: &WriteBatch) -> Result<(), Error> {
1004        let mut wo = WriteOptions::new();
1005        wo.disable_wal(true);
1006        self.write_opt(batch, &wo)
1007    }
1008
1009    pub fn write_wbwi(&self, wbwi: &WriteBatchWithIndex) -> Result<(), Error> {
1010        DEFAULT_WRITE_OPTS.with(|opts| self.write_wbwi_opt(wbwi, opts))
1011    }
1012
1013    pub fn write_wbwi_opt(
1014        &self,
1015        wbwi: &WriteBatchWithIndex,
1016        writeopts: &WriteOptions,
1017    ) -> Result<(), Error> {
1018        unsafe {
1019            ffi_try!(ffi::rocksdb_write_writebatch_wi(
1020                self.inner.inner(),
1021                writeopts.inner,
1022                wbwi.inner
1023            ));
1024
1025            Ok(())
1026        }
1027    }
1028
1029    /// Suspend deleting obsolete files. Compactions will continue to occur,
1030    /// but no obsolete files will be deleted. To resume file deletions, each
1031    /// call to disable_file_deletions() must be matched by a subsequent call to
1032    /// enable_file_deletions(). For more details, see enable_file_deletions().
1033    pub fn disable_file_deletions(&self) -> Result<(), Error> {
1034        unsafe {
1035            ffi_try!(ffi::rocksdb_disable_file_deletions(self.inner.inner()));
1036        }
1037        Ok(())
1038    }
1039
1040    /// Resume deleting obsolete files, following up on `disable_file_deletions()`.
1041    ///
1042    /// File deletions disabling and enabling is not controlled by a binary flag,
1043    /// instead it's represented as a counter to allow different callers to
1044    /// independently disable file deletion. Disabling file deletion can be
1045    /// critical for operations like making a backup. So the counter implementation
1046    /// makes the file deletion disabled as long as there is one caller requesting
1047    /// so, and only when every caller agrees to re-enable file deletion, it will
1048    /// be enabled. Two threads can call this method concurrently without
1049    /// synchronization -- i.e., file deletions will be enabled only after both
1050    /// threads call enable_file_deletions()
1051    pub fn enable_file_deletions(&self) -> Result<(), Error> {
1052        unsafe {
1053            ffi_try!(ffi::rocksdb_enable_file_deletions(self.inner.inner()));
1054        }
1055        Ok(())
1056    }
1057}
1058
1059/// Common methods of `DBWithThreadMode` and `OptimisticTransactionDB`.
1060impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
1061    pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
1062        Self {
1063            inner,
1064            cfs,
1065            path,
1066            _outlive: outlive,
1067        }
1068    }
1069
1070    pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
1071        let cpath = to_cpath(path)?;
1072        let mut length = 0;
1073
1074        unsafe {
1075            let ptr = ffi_try!(ffi::rocksdb_list_column_families(
1076                opts.inner,
1077                cpath.as_ptr(),
1078                &raw mut length,
1079            ));
1080
1081            let vec = slice::from_raw_parts(ptr, length)
1082                .iter()
1083                .map(|ptr| from_cstr_without_free(*ptr))
1084                .collect();
1085            ffi::rocksdb_list_column_families_destroy(ptr, length);
1086            Ok(vec)
1087        }
1088    }
1089
1090    pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1091        let cpath = to_cpath(path)?;
1092        unsafe {
1093            ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
1094        }
1095        Ok(())
1096    }
1097
1098    pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1099        let cpath = to_cpath(path)?;
1100        unsafe {
1101            ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
1102        }
1103        Ok(())
1104    }
1105
1106    pub fn path(&self) -> &Path {
1107        self.path.as_path()
1108    }
1109
1110    /// Flushes the WAL buffer. If `sync` is set to `true`, also syncs
1111    /// the data to disk.
1112    pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
1113        unsafe {
1114            ffi_try!(ffi::rocksdb_flush_wal(
1115                self.inner.inner(),
1116                c_uchar::from(sync)
1117            ));
1118        }
1119        Ok(())
1120    }
1121
1122    /// Flushes database memtables to SST files on the disk.
1123    pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
1124        unsafe {
1125            ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
1126        }
1127        Ok(())
1128    }
1129
1130    /// Flushes database memtables to SST files on the disk using default options.
1131    pub fn flush(&self) -> Result<(), Error> {
1132        self.flush_opt(&FlushOptions::default())
1133    }
1134
1135    /// Flushes database memtables to SST files on the disk for a given column family.
1136    pub fn flush_cf_opt(
1137        &self,
1138        cf: &impl AsColumnFamilyRef,
1139        flushopts: &FlushOptions,
1140    ) -> Result<(), Error> {
1141        unsafe {
1142            ffi_try!(ffi::rocksdb_flush_cf(
1143                self.inner.inner(),
1144                flushopts.inner,
1145                cf.inner()
1146            ));
1147        }
1148        Ok(())
1149    }
1150
1151    /// Flushes multiple column families.
1152    ///
1153    /// If atomic flush is not enabled, it is equivalent to calling flush_cf multiple times.
1154    /// If atomic flush is enabled, it will flush all column families specified in `cfs` up to the latest sequence
1155    /// number at the time when flush is requested.
1156    pub fn flush_cfs_opt(
1157        &self,
1158        cfs: &[&impl AsColumnFamilyRef],
1159        opts: &FlushOptions,
1160    ) -> Result<(), Error> {
1161        let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
1162        unsafe {
1163            ffi_try!(ffi::rocksdb_flush_cfs(
1164                self.inner.inner(),
1165                opts.inner,
1166                cfs.as_mut_ptr(),
1167                cfs.len() as libc::c_int,
1168            ));
1169        }
1170        Ok(())
1171    }
1172
1173    /// Flushes database memtables to SST files on the disk for a given column family using default
1174    /// options.
1175    pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
1176        DEFAULT_FLUSH_OPTS.with(|opts| self.flush_cf_opt(cf, opts))
1177    }
1178
1179    /// Return the bytes associated with a key value with read options. If you only intend to use
1180    /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
1181    /// to avoid unnecessary memory copy.
1182    pub fn get_opt<K: AsRef<[u8]>>(
1183        &self,
1184        key: K,
1185        readopts: &ReadOptions,
1186    ) -> Result<Option<Vec<u8>>, Error> {
1187        self.get_pinned_opt(key, readopts)
1188            .map(|x| x.map(|v| v.as_ref().to_vec()))
1189    }
1190
1191    /// Return the bytes associated with a key value. If you only intend to use the vector returned
1192    /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
1193    /// copy.
1194    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
1195        DEFAULT_READ_OPTS.with(|opts| self.get_opt(key.as_ref(), opts))
1196    }
1197
1198    /// Return the bytes associated with a key value and the given column family with read options.
1199    /// If you only intend to use the vector returned temporarily, consider using
1200    /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory.
1201    pub fn get_cf_opt<K: AsRef<[u8]>>(
1202        &self,
1203        cf: &impl AsColumnFamilyRef,
1204        key: K,
1205        readopts: &ReadOptions,
1206    ) -> Result<Option<Vec<u8>>, Error> {
1207        self.get_pinned_cf_opt(cf, key, readopts)
1208            .map(|x| x.map(|v| v.as_ref().to_vec()))
1209    }
1210
1211    /// Return the bytes associated with a key value and the given column family. If you only
1212    /// intend to use the vector returned temporarily, consider using
1213    /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory.
1214    pub fn get_cf<K: AsRef<[u8]>>(
1215        &self,
1216        cf: &impl AsColumnFamilyRef,
1217        key: K,
1218    ) -> Result<Option<Vec<u8>>, Error> {
1219        DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key.as_ref(), opts))
1220    }
1221
1222    /// Return the value associated with a key using RocksDB's PinnableSlice
1223    /// so as to avoid unnecessary memory copy.
1224    pub fn get_pinned_opt<K: AsRef<[u8]>>(
1225        &'_ self,
1226        key: K,
1227        readopts: &ReadOptions,
1228    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1229        if readopts.inner.is_null() {
1230            return Err(Error::new(
1231                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1232                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1233                    .to_owned(),
1234            ));
1235        }
1236
1237        let key = key.as_ref();
1238        unsafe {
1239            let val = ffi_try!(ffi::rocksdb_get_pinned(
1240                self.inner.inner(),
1241                readopts.inner,
1242                key.as_ptr() as *const c_char,
1243                key.len() as size_t,
1244            ));
1245            if val.is_null() {
1246                Ok(None)
1247            } else {
1248                Ok(Some(DBPinnableSlice::from_c(val)))
1249            }
1250        }
1251    }
1252
1253    /// Return the value associated with a key using RocksDB's PinnableSlice
1254    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1255    /// leverages default options.
1256    pub fn get_pinned<K: AsRef<[u8]>>(
1257        &'_ self,
1258        key: K,
1259    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1260        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
1261    }
1262
1263    /// Return the value associated with a key using RocksDB's PinnableSlice
1264    /// so as to avoid unnecessary memory copy. Similar to get_pinned_opt but
1265    /// allows specifying ColumnFamily
1266    pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1267        &'_ self,
1268        cf: &impl AsColumnFamilyRef,
1269        key: K,
1270        readopts: &ReadOptions,
1271    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1272        if readopts.inner.is_null() {
1273            return Err(Error::new(
1274                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1275                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1276                    .to_owned(),
1277            ));
1278        }
1279
1280        let key = key.as_ref();
1281        unsafe {
1282            let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1283                self.inner.inner(),
1284                readopts.inner,
1285                cf.inner(),
1286                key.as_ptr() as *const c_char,
1287                key.len() as size_t,
1288            ));
1289            if val.is_null() {
1290                Ok(None)
1291            } else {
1292                Ok(Some(DBPinnableSlice::from_c(val)))
1293            }
1294        }
1295    }
1296
1297    /// Return the value associated with a key using RocksDB's PinnableSlice
1298    /// so as to avoid unnecessary memory copy. Similar to get_pinned_cf_opt but
1299    /// leverages default options.
1300    pub fn get_pinned_cf<K: AsRef<[u8]>>(
1301        &'_ self,
1302        cf: &impl AsColumnFamilyRef,
1303        key: K,
1304    ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1305        DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
1306    }
1307
1308    /// Read a value directly into a caller-provided buffer, avoiding memory allocation.
1309    ///
1310    /// This is the most efficient way to read values when you have a pre-allocated
1311    /// buffer. It completely avoids the allocation overhead of [`get`](#method.get)
1312    /// and even the pinning overhead of [`get_pinned`](#method.get_pinned).
1313    ///
1314    /// # Arguments
1315    ///
1316    /// * `key` - The key to look up
1317    /// * `buffer` - A mutable byte slice to write the value into. Can be empty if you
1318    ///   only want to check if a key exists and get its value size.
1319    ///
1320    /// # Returns
1321    ///
1322    /// * `Ok(GetIntoBufferResult::NotFound)` - The key doesn't exist
1323    /// * `Ok(GetIntoBufferResult::Found(size))` - Value was copied into the buffer.
1324    ///   `size` is the number of bytes written.
1325    /// * `Ok(GetIntoBufferResult::BufferTooSmall(size))` - The value exists but the buffer
1326    ///   is too small. `size` is the actual value size needed. No data is written.
1327    /// * `Err(...)` - Database error occurred
1328    ///
1329    /// # Performance
1330    ///
1331    /// This method is ideal for high-throughput scenarios where you can reuse a buffer:
1332    ///
1333    /// ```ignore
1334    /// use rust_rocksdb::{DB, GetIntoBufferResult};
1335    ///
1336    /// let db: DB = /* open database */;
1337    /// let keys_to_lookup: Vec<&[u8]> = /* keys to look up */;
1338    /// let mut buffer = vec![0u8; 4096]; // Reusable buffer
1339    ///
1340    /// for key in keys_to_lookup {
1341    ///     match db.get_into_buffer(key, &mut buffer).unwrap() {
1342    ///         GetIntoBufferResult::Found(len) => {
1343    ///             process_value(&buffer[..len]);
1344    ///         }
1345    ///         GetIntoBufferResult::BufferTooSmall(needed) => {
1346    ///             buffer.resize(needed, 0);
1347    ///             // Retry with larger buffer...
1348    ///         }
1349    ///         GetIntoBufferResult::NotFound => {}
1350    ///     }
1351    /// }
1352    /// ```
1353    ///
1354    /// # Example
1355    ///
1356    /// ```
1357    /// use rust_rocksdb::{DB, GetIntoBufferResult};
1358    ///
1359    /// let tempdir = tempfile::Builder::new()
1360    ///     .prefix("rocksdb_get_into_buffer")
1361    ///     .tempdir()
1362    ///     .unwrap();
1363    /// let db = DB::open_default(tempdir.path()).unwrap();
1364    /// db.put(b"key", b"value").unwrap();
1365    ///
1366    /// let mut buffer = [0u8; 100];
1367    /// match db.get_into_buffer(b"key", &mut buffer).unwrap() {
1368    ///     GetIntoBufferResult::Found(size) => {
1369    ///         assert_eq!(&buffer[..size], b"value");
1370    ///     }
1371    ///     GetIntoBufferResult::NotFound => panic!("expected value"),
1372    ///     GetIntoBufferResult::BufferTooSmall(needed) => {
1373    ///         panic!("buffer too small, need {} bytes", needed)
1374    ///     }
1375    /// }
1376    /// ```
1377    pub fn get_into_buffer<K: AsRef<[u8]>>(
1378        &self,
1379        key: K,
1380        buffer: &mut [u8],
1381    ) -> Result<GetIntoBufferResult, Error> {
1382        DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_opt(key, buffer, opts))
1383    }
1384
1385    /// Read a value directly into a caller-provided buffer with custom read options.
1386    ///
1387    /// This is the same as [`get_into_buffer`](#method.get_into_buffer) but allows
1388    /// specifying custom [`ReadOptions`], such as setting a snapshot or fill cache behavior.
1389    ///
1390    /// See [`get_into_buffer`](#method.get_into_buffer) for full documentation.
1391    pub fn get_into_buffer_opt<K: AsRef<[u8]>>(
1392        &self,
1393        key: K,
1394        buffer: &mut [u8],
1395        readopts: &ReadOptions,
1396    ) -> Result<GetIntoBufferResult, Error> {
1397        if readopts.inner.is_null() {
1398            return Err(Error::new(
1399                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1400                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1401                    .to_owned(),
1402            ));
1403        }
1404
1405        let key = key.as_ref();
1406        let mut val_len: size_t = 0;
1407        let mut found: c_uchar = 0;
1408
1409        unsafe {
1410            let success = ffi_try!(ffi::rocksdb_get_into_buffer(
1411                self.inner.inner(),
1412                readopts.inner,
1413                key.as_ptr() as *const c_char,
1414                key.len() as size_t,
1415                buffer.as_mut_ptr() as *mut c_char,
1416                buffer.len() as size_t,
1417                &raw mut val_len,
1418                &raw mut found,
1419            ));
1420
1421            if found == 0 {
1422                Ok(GetIntoBufferResult::NotFound)
1423            } else if success != 0 {
1424                Ok(GetIntoBufferResult::Found(val_len))
1425            } else {
1426                Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1427            }
1428        }
1429    }
1430
1431    /// Read a value from a column family directly into a caller-provided buffer.
1432    ///
1433    /// This is the column family variant of [`get_into_buffer`](#method.get_into_buffer).
1434    /// See that method for full documentation on the zero-allocation buffer API.
1435    ///
1436    /// # Arguments
1437    ///
1438    /// * `cf` - The column family to read from
1439    /// * `key` - The key to look up
1440    /// * `buffer` - A mutable byte slice to write the value into
1441    pub fn get_into_buffer_cf<K: AsRef<[u8]>>(
1442        &self,
1443        cf: &impl AsColumnFamilyRef,
1444        key: K,
1445        buffer: &mut [u8],
1446    ) -> Result<GetIntoBufferResult, Error> {
1447        DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_cf_opt(cf, key, buffer, opts))
1448    }
1449
1450    /// Read a value from a column family directly into a caller-provided buffer
1451    /// with custom read options.
1452    ///
1453    /// This is the column family variant of [`get_into_buffer_opt`](#method.get_into_buffer_opt).
1454    /// See [`get_into_buffer`](#method.get_into_buffer) for full documentation.
1455    pub fn get_into_buffer_cf_opt<K: AsRef<[u8]>>(
1456        &self,
1457        cf: &impl AsColumnFamilyRef,
1458        key: K,
1459        buffer: &mut [u8],
1460        readopts: &ReadOptions,
1461    ) -> Result<GetIntoBufferResult, Error> {
1462        if readopts.inner.is_null() {
1463            return Err(Error::new(
1464                "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1465                 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1466                    .to_owned(),
1467            ));
1468        }
1469
1470        let key = key.as_ref();
1471        let mut val_len: size_t = 0;
1472        let mut found: c_uchar = 0;
1473
1474        unsafe {
1475            let success = ffi_try!(ffi::rocksdb_get_into_buffer_cf(
1476                self.inner.inner(),
1477                readopts.inner,
1478                cf.inner(),
1479                key.as_ptr() as *const c_char,
1480                key.len() as size_t,
1481                buffer.as_mut_ptr() as *mut c_char,
1482                buffer.len() as size_t,
1483                &raw mut val_len,
1484                &raw mut found,
1485            ));
1486
1487            if found == 0 {
1488                Ok(GetIntoBufferResult::NotFound)
1489            } else if success != 0 {
1490                Ok(GetIntoBufferResult::Found(val_len))
1491            } else {
1492                Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1493            }
1494        }
1495    }
1496
1497    /// Return the values associated with the given keys.
1498    pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1499    where
1500        K: AsRef<[u8]>,
1501        I: IntoIterator<Item = K>,
1502    {
1503        DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
1504    }
1505
1506    /// Return the values associated with the given keys using read options.
1507    pub fn multi_get_opt<K, I>(
1508        &self,
1509        keys: I,
1510        readopts: &ReadOptions,
1511    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1512    where
1513        K: AsRef<[u8]>,
1514        I: IntoIterator<Item = K>,
1515    {
1516        let owned_keys: Vec<K> = keys.into_iter().collect();
1517        let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
1518        let ptr_keys: Vec<*const c_char> = owned_keys
1519            .iter()
1520            .map(|k| k.as_ref().as_ptr() as *const c_char)
1521            .collect();
1522
1523        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1524        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1525        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1526        unsafe {
1527            ffi::rocksdb_multi_get(
1528                self.inner.inner(),
1529                readopts.inner,
1530                ptr_keys.len(),
1531                ptr_keys.as_ptr(),
1532                keys_sizes.as_ptr(),
1533                values.as_mut_ptr(),
1534                values_sizes.as_mut_ptr(),
1535                errors.as_mut_ptr(),
1536            );
1537        }
1538
1539        unsafe {
1540            values.set_len(ptr_keys.len());
1541            values_sizes.set_len(ptr_keys.len());
1542            errors.set_len(ptr_keys.len());
1543        }
1544
1545        convert_values(values, values_sizes, errors)
1546    }
1547
1548    /// Returns pinned values associated with the given keys using default read options.
1549    ///
1550    /// This iterates `get_pinned_opt` for each key and returns zero-copy
1551    /// `DBPinnableSlice` values when present, avoiding value copies.
1552    pub fn multi_get_pinned<K, I>(
1553        &'_ self,
1554        keys: I,
1555    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1556    where
1557        K: AsRef<[u8]>,
1558        I: IntoIterator<Item = K>,
1559    {
1560        DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_opt(keys, opts))
1561    }
1562
1563    /// Returns pinned values associated with the given keys using the provided read options.
1564    ///
1565    /// This iterates `get_pinned_opt` for each key and returns zero-copy
1566    /// `DBPinnableSlice` values when present, avoiding value copies.
1567    pub fn multi_get_pinned_opt<K, I>(
1568        &'_ self,
1569        keys: I,
1570        readopts: &ReadOptions,
1571    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1572    where
1573        K: AsRef<[u8]>,
1574        I: IntoIterator<Item = K>,
1575    {
1576        keys.into_iter()
1577            .map(|k| self.get_pinned_opt(k, readopts))
1578            .collect()
1579    }
1580
1581    /// Returns pinned values associated with the given keys and column families
1582    /// using default read options.
1583    pub fn multi_get_pinned_cf<'a, 'b: 'a, K, I, W>(
1584        &'a self,
1585        keys: I,
1586    ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1587    where
1588        K: AsRef<[u8]>,
1589        I: IntoIterator<Item = (&'b W, K)>,
1590        W: 'b + AsColumnFamilyRef,
1591    {
1592        DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_cf_opt(keys, opts))
1593    }
1594
1595    /// Returns pinned values associated with the given keys and column families
1596    /// using the provided read options.
1597    pub fn multi_get_pinned_cf_opt<'a, 'b: 'a, K, I, W>(
1598        &'a self,
1599        keys: I,
1600        readopts: &ReadOptions,
1601    ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1602    where
1603        K: AsRef<[u8]>,
1604        I: IntoIterator<Item = (&'b W, K)>,
1605        W: 'b + AsColumnFamilyRef,
1606    {
1607        keys.into_iter()
1608            .map(|(cf, k)| self.get_pinned_cf_opt(cf, k, readopts))
1609            .collect()
1610    }
1611
1612    /// Return the values associated with the given keys and column families.
1613    pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1614        &'a self,
1615        keys: I,
1616    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1617    where
1618        K: AsRef<[u8]>,
1619        I: IntoIterator<Item = (&'b W, K)>,
1620        W: 'b + AsColumnFamilyRef,
1621    {
1622        DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
1623    }
1624
1625    /// Return the values associated with the given keys and column families using read options.
1626    pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1627        &'a self,
1628        keys: I,
1629        readopts: &ReadOptions,
1630    ) -> Vec<Result<Option<Vec<u8>>, Error>>
1631    where
1632        K: AsRef<[u8]>,
1633        I: IntoIterator<Item = (&'b W, K)>,
1634        W: 'b + AsColumnFamilyRef,
1635    {
1636        let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
1637        let keys_sizes: Vec<usize> = cfs_and_owned_keys
1638            .iter()
1639            .map(|(_, k)| k.as_ref().len())
1640            .collect();
1641        let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
1642            .iter()
1643            .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
1644            .collect();
1645        let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
1646            .iter()
1647            .map(|(c, _)| c.inner().cast_const())
1648            .collect();
1649        let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1650        let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1651        let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1652        unsafe {
1653            ffi::rocksdb_multi_get_cf(
1654                self.inner.inner(),
1655                readopts.inner,
1656                ptr_cfs.as_ptr(),
1657                ptr_keys.len(),
1658                ptr_keys.as_ptr(),
1659                keys_sizes.as_ptr(),
1660                values.as_mut_ptr(),
1661                values_sizes.as_mut_ptr(),
1662                errors.as_mut_ptr(),
1663            );
1664        }
1665
1666        unsafe {
1667            values.set_len(ptr_keys.len());
1668            values_sizes.set_len(ptr_keys.len());
1669            errors.set_len(ptr_keys.len());
1670        }
1671
1672        convert_values(values, values_sizes, errors)
1673    }
1674
1675    /// Return the values associated with the given keys and the specified column family
1676    /// where internally the read requests are processed in batch if block-based table
1677    /// SST format is used.  It is a more optimized version of multi_get_cf.
1678    pub fn batched_multi_get_cf<'a, K, I>(
1679        &'_ self,
1680        cf: &impl AsColumnFamilyRef,
1681        keys: I,
1682        sorted_input: bool,
1683    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1684    where
1685        K: AsRef<[u8]> + 'a + ?Sized,
1686        I: IntoIterator<Item = &'a K>,
1687    {
1688        DEFAULT_READ_OPTS.with(|opts| self.batched_multi_get_cf_opt(cf, keys, sorted_input, opts))
1689    }
1690
1691    /// Return the values associated with the given keys and the specified column family
1692    /// where internally the read requests are processed in batch if block-based table
1693    /// SST format is used. It is a more optimized version of multi_get_cf_opt.
1694    pub fn batched_multi_get_cf_opt<'a, K, I>(
1695        &'_ self,
1696        cf: &impl AsColumnFamilyRef,
1697        keys: I,
1698        sorted_input: bool,
1699        readopts: &ReadOptions,
1700    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1701    where
1702        K: AsRef<[u8]> + 'a + ?Sized,
1703        I: IntoIterator<Item = &'a K>,
1704    {
1705        let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1706            .into_iter()
1707            .map(|k| {
1708                let k = k.as_ref();
1709                (k.as_ptr() as *const c_char, k.len())
1710            })
1711            .unzip();
1712
1713        let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1714        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1715
1716        unsafe {
1717            ffi::rocksdb_batched_multi_get_cf(
1718                self.inner.inner(),
1719                readopts.inner,
1720                cf.inner(),
1721                ptr_keys.len(),
1722                ptr_keys.as_ptr(),
1723                keys_sizes.as_ptr(),
1724                pinned_values.as_mut_ptr(),
1725                errors.as_mut_ptr(),
1726                sorted_input,
1727            );
1728            pinned_values
1729                .into_iter()
1730                .zip(errors)
1731                .map(|(v, e)| {
1732                    if e.is_null() {
1733                        if v.is_null() {
1734                            Ok(None)
1735                        } else {
1736                            Ok(Some(DBPinnableSlice::from_c(v)))
1737                        }
1738                    } else {
1739                        Err(convert_rocksdb_error(e))
1740                    }
1741                })
1742                .collect()
1743        }
1744    }
1745
1746    /// Return the values associated with the given keys and the specified column family
1747    /// using an optimized slice-based API.
1748    ///
1749    /// This method uses RocksDB's optimized `rocksdb_batched_multi_get_cf_slice` C API,
1750    /// which takes a `rocksdb_slice_t` array directly. This eliminates the internal
1751    /// overhead of converting keys from separate pointer+size arrays to Slice objects.
1752    ///
1753    /// # Arguments
1754    ///
1755    /// * `cf` - The column family to read from
1756    /// * `keys` - An iterator of keys to look up
1757    /// * `sorted_input` - If `true`, indicates the keys are already sorted in ascending
1758    ///   order, which allows RocksDB to skip internal sorting and improve performance.
1759    ///   **Important**: If you pass `true` but keys are not sorted, results may be incorrect.
1760    ///
1761    /// # Returns
1762    ///
1763    /// A vector of results in the same order as the input keys. Each element is:
1764    /// - `Ok(Some(DBPinnableSlice))` if the key was found
1765    /// - `Ok(None)` if the key was not found
1766    /// - `Err(...)` if an error occurred for that key
1767    ///
1768    /// # Performance
1769    ///
1770    /// This is the fastest batch lookup method when:
1771    /// - You're looking up many keys (10+) from the same column family
1772    /// - You can pre-sort your keys (set `sorted_input = true`)
1773    /// - Block-based table format is used (default)
1774    ///
1775    /// For small numbers of keys, the overhead of batching may not be worth it.
1776    /// Consider using [`get_pinned_cf`](#method.get_pinned_cf) for single key lookups.
1777    ///
1778    /// # Example
1779    ///
1780    /// ```
1781    /// use rust_rocksdb::{DB, Options, ColumnFamilyDescriptor};
1782    ///
1783    /// let tempdir = tempfile::Builder::new().prefix("batch_slice").tempdir().unwrap();
1784    /// let mut opts = Options::default();
1785    /// opts.create_if_missing(true);
1786    /// opts.create_missing_column_families(true);
1787    /// let db = DB::open_cf_descriptors(&opts, tempdir.path(),
1788    ///     vec![ColumnFamilyDescriptor::new("cf", Options::default())]).unwrap();
1789    ///
1790    /// let cf = db.cf_handle("cf").unwrap();
1791    /// db.put_cf(&cf, b"k1", b"v1").unwrap();
1792    /// db.put_cf(&cf, b"k2", b"v2").unwrap();
1793    ///
1794    /// // Keys are sorted, so we can set sorted_input = true
1795    /// let keys: Vec<&[u8]> = vec![b"k1", b"k2", b"k3"];
1796    /// let results = db.batched_multi_get_cf_slice(&cf, keys, true);
1797    ///
1798    /// assert!(results[0].as_ref().unwrap().is_some()); // k1 found
1799    /// assert!(results[1].as_ref().unwrap().is_some()); // k2 found
1800    /// assert!(results[2].as_ref().unwrap().is_none()); // k3 not found
1801    /// ```
1802    pub fn batched_multi_get_cf_slice<'a, K, I>(
1803        &'_ self,
1804        cf: &impl AsColumnFamilyRef,
1805        keys: I,
1806        sorted_input: bool,
1807    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1808    where
1809        K: AsRef<[u8]> + 'a + ?Sized,
1810        I: IntoIterator<Item = &'a K>,
1811    {
1812        DEFAULT_READ_OPTS
1813            .with(|opts| self.batched_multi_get_cf_slice_opt(cf, keys, sorted_input, opts))
1814    }
1815
1816    /// Return the values associated with the given keys and the specified column family
1817    /// using an optimized slice-based API with custom read options.
1818    ///
1819    /// This is the same as [`batched_multi_get_cf_slice`](#method.batched_multi_get_cf_slice)
1820    /// but allows specifying custom [`ReadOptions`].
1821    ///
1822    /// See [`batched_multi_get_cf_slice`](#method.batched_multi_get_cf_slice) for full documentation.
1823    pub fn batched_multi_get_cf_slice_opt<'a, K, I>(
1824        &'_ self,
1825        cf: &impl AsColumnFamilyRef,
1826        keys: I,
1827        sorted_input: bool,
1828        readopts: &ReadOptions,
1829    ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1830    where
1831        K: AsRef<[u8]> + 'a + ?Sized,
1832        I: IntoIterator<Item = &'a K>,
1833    {
1834        // Convert keys to rocksdb_slice_t array
1835        let slices: Vec<ffi::rocksdb_slice_t> = keys
1836            .into_iter()
1837            .map(|k| {
1838                let k = k.as_ref();
1839                ffi::rocksdb_slice_t {
1840                    data: k.as_ptr() as *const c_char,
1841                    size: k.len(),
1842                }
1843            })
1844            .collect();
1845
1846        if slices.is_empty() {
1847            return Vec::new();
1848        }
1849
1850        let mut pinned_values = vec![ptr::null_mut(); slices.len()];
1851        let mut errors = vec![ptr::null_mut(); slices.len()];
1852
1853        unsafe {
1854            ffi::rocksdb_batched_multi_get_cf_slice(
1855                self.inner.inner(),
1856                readopts.inner,
1857                cf.inner(),
1858                slices.len(),
1859                slices.as_ptr(),
1860                pinned_values.as_mut_ptr(),
1861                errors.as_mut_ptr(),
1862                sorted_input,
1863            );
1864            pinned_values
1865                .into_iter()
1866                .zip(errors)
1867                .map(|(v, e)| {
1868                    if e.is_null() {
1869                        if v.is_null() {
1870                            Ok(None)
1871                        } else {
1872                            Ok(Some(DBPinnableSlice::from_c(v)))
1873                        }
1874                    } else {
1875                        Err(convert_rocksdb_error(e))
1876                    }
1877                })
1878                .collect()
1879        }
1880    }
1881
1882    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1883    /// `true`. This function uses default `ReadOptions`.
1884    pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1885        DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_opt(key, opts))
1886    }
1887
1888    /// Returns `false` if the given key definitely doesn't exist in the database, otherwise returns
1889    /// `true`.
1890    pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1891        let key = key.as_ref();
1892        unsafe {
1893            0 != ffi::rocksdb_key_may_exist(
1894                self.inner.inner(),
1895                readopts.inner,
1896                key.as_ptr() as *const c_char,
1897                key.len() as size_t,
1898                ptr::null_mut(), /*value*/
1899                ptr::null_mut(), /*val_len*/
1900                ptr::null(),     /*timestamp*/
1901                0,               /*timestamp_len*/
1902                ptr::null_mut(), /*value_found*/
1903            )
1904        }
1905    }
1906
1907    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1908    /// otherwise returns `true`. This function uses default `ReadOptions`.
1909    pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1910        DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_cf_opt(cf, key, opts))
1911    }
1912
1913    /// Returns `false` if the given key definitely doesn't exist in the specified column family,
1914    /// otherwise returns `true`.
1915    pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1916        &self,
1917        cf: &impl AsColumnFamilyRef,
1918        key: K,
1919        readopts: &ReadOptions,
1920    ) -> bool {
1921        let key = key.as_ref();
1922        0 != unsafe {
1923            ffi::rocksdb_key_may_exist_cf(
1924                self.inner.inner(),
1925                readopts.inner,
1926                cf.inner(),
1927                key.as_ptr() as *const c_char,
1928                key.len() as size_t,
1929                ptr::null_mut(), /*value*/
1930                ptr::null_mut(), /*val_len*/
1931                ptr::null(),     /*timestamp*/
1932                0,               /*timestamp_len*/
1933                ptr::null_mut(), /*value_found*/
1934            )
1935        }
1936    }
1937
1938    /// If the key definitely does not exist in the database, then this method
1939    /// returns `(false, None)`, else `(true, None)` if it may.
1940    /// If the key is found in memory, then it returns `(true, Some<CSlice>)`.
1941    ///
1942    /// This check is potentially lighter-weight than calling `get()`. One way
1943    /// to make this lighter weight is to avoid doing any IOs.
1944    pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1945        &self,
1946        cf: &impl AsColumnFamilyRef,
1947        key: K,
1948        readopts: &ReadOptions,
1949    ) -> (bool, Option<CSlice>) {
1950        let key = key.as_ref();
1951        let mut val: *mut c_char = ptr::null_mut();
1952        let mut val_len: usize = 0;
1953        let mut value_found: c_uchar = 0;
1954        let may_exists = 0
1955            != unsafe {
1956                ffi::rocksdb_key_may_exist_cf(
1957                    self.inner.inner(),
1958                    readopts.inner,
1959                    cf.inner(),
1960                    key.as_ptr() as *const c_char,
1961                    key.len() as size_t,
1962                    &raw mut val,         /*value*/
1963                    &raw mut val_len,     /*val_len*/
1964                    ptr::null(),          /*timestamp*/
1965                    0,                    /*timestamp_len*/
1966                    &raw mut value_found, /*value_found*/
1967                )
1968            };
1969        // The value is only allocated (using malloc) and returned if it is found and
1970        // value_found isn't NULL. In that case the user is responsible for freeing it.
1971        if may_exists && value_found != 0 {
1972            (
1973                may_exists,
1974                Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1975            )
1976        } else {
1977            (may_exists, None)
1978        }
1979    }
1980
1981    fn create_inner_cf_handle(
1982        &self,
1983        name: impl CStrLike,
1984        opts: &Options,
1985    ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1986        let cf_name = name.bake().map_err(|err| {
1987            Error::new(format!(
1988                "Failed to convert path to CString when creating cf: {err}"
1989            ))
1990        })?;
1991
1992        // Can't use ffi_try: rocksdb_create_column_family has a bug where it allocates a
1993        // result that needs to be freed on error
1994        let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
1995        let cf_handle = unsafe {
1996            ffi::rocksdb_create_column_family(
1997                self.inner.inner(),
1998                opts.inner,
1999                cf_name.as_ptr(),
2000                &raw mut err,
2001            )
2002        };
2003        if !err.is_null() {
2004            if !cf_handle.is_null() {
2005                unsafe { ffi::rocksdb_column_family_handle_destroy(cf_handle) };
2006            }
2007            return Err(convert_rocksdb_error(err));
2008        }
2009        Ok(cf_handle)
2010    }
2011
2012    pub fn iterator<'a: 'b, 'b>(
2013        &'a self,
2014        mode: IteratorMode,
2015    ) -> DBIteratorWithThreadMode<'b, Self> {
2016        let readopts = ReadOptions::default();
2017        self.iterator_opt(mode, readopts)
2018    }
2019
2020    pub fn iterator_opt<'a: 'b, 'b>(
2021        &'a self,
2022        mode: IteratorMode,
2023        readopts: ReadOptions,
2024    ) -> DBIteratorWithThreadMode<'b, Self> {
2025        DBIteratorWithThreadMode::new(self, readopts, mode)
2026    }
2027
2028    /// Opens an iterator using the provided ReadOptions.
2029    /// This is used when you want to iterate over a specific ColumnFamily with a modified ReadOptions
2030    pub fn iterator_cf_opt<'a: 'b, 'b>(
2031        &'a self,
2032        cf_handle: &impl AsColumnFamilyRef,
2033        readopts: ReadOptions,
2034        mode: IteratorMode,
2035    ) -> DBIteratorWithThreadMode<'b, Self> {
2036        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
2037    }
2038
2039    /// Opens an iterator with `set_total_order_seek` enabled.
2040    /// This must be used to iterate across prefixes when `set_memtable_factory` has been called
2041    /// with a Hash-based implementation.
2042    pub fn full_iterator<'a: 'b, 'b>(
2043        &'a self,
2044        mode: IteratorMode,
2045    ) -> DBIteratorWithThreadMode<'b, Self> {
2046        let mut opts = ReadOptions::default();
2047        opts.set_total_order_seek(true);
2048        DBIteratorWithThreadMode::new(self, opts, mode)
2049    }
2050
2051    pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
2052        &'a self,
2053        prefix: P,
2054    ) -> DBIteratorWithThreadMode<'b, Self> {
2055        let mut opts = ReadOptions::default();
2056        opts.set_prefix_same_as_start(true);
2057        DBIteratorWithThreadMode::new(
2058            self,
2059            opts,
2060            IteratorMode::From(prefix.as_ref(), Direction::Forward),
2061        )
2062    }
2063
2064    pub fn iterator_cf<'a: 'b, 'b>(
2065        &'a self,
2066        cf_handle: &impl AsColumnFamilyRef,
2067        mode: IteratorMode,
2068    ) -> DBIteratorWithThreadMode<'b, Self> {
2069        let opts = ReadOptions::default();
2070        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2071    }
2072
2073    pub fn full_iterator_cf<'a: 'b, 'b>(
2074        &'a self,
2075        cf_handle: &impl AsColumnFamilyRef,
2076        mode: IteratorMode,
2077    ) -> DBIteratorWithThreadMode<'b, Self> {
2078        let mut opts = ReadOptions::default();
2079        opts.set_total_order_seek(true);
2080        DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2081    }
2082
2083    pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
2084        &'a self,
2085        cf_handle: &impl AsColumnFamilyRef,
2086        prefix: P,
2087    ) -> DBIteratorWithThreadMode<'a, Self> {
2088        let mut opts = ReadOptions::default();
2089        opts.set_prefix_same_as_start(true);
2090        DBIteratorWithThreadMode::<'a, Self>::new_cf(
2091            self,
2092            cf_handle.inner(),
2093            opts,
2094            IteratorMode::From(prefix.as_ref(), Direction::Forward),
2095        )
2096    }
2097
2098    /// Returns `true` if there exists at least one key with the given prefix
2099    /// in the default column family using default read options.
2100    ///
2101    /// When to use: prefer this for one-shot checks. It enables
2102    /// `prefix_same_as_start(true)` and bounds the iterator to the
2103    /// prefix via `PrefixRange`, minimizing stray IO per call.
2104    pub fn prefix_exists<P: AsRef<[u8]>>(&self, prefix: P) -> Result<bool, Error> {
2105        let p = prefix.as_ref();
2106        PREFIX_READ_OPTS.with(|rc| {
2107            let mut opts = rc.borrow_mut();
2108            opts.set_iterate_range(crate::PrefixRange(p));
2109            self.prefix_exists_opt(p, &opts)
2110        })
2111    }
2112
2113    /// Returns `true` if there exists at least one key with the given prefix
2114    /// in the default column family using the provided read options.
2115    pub fn prefix_exists_opt<P: AsRef<[u8]>>(
2116        &self,
2117        prefix: P,
2118        readopts: &ReadOptions,
2119    ) -> Result<bool, Error> {
2120        let prefix = prefix.as_ref();
2121        let iter = unsafe { self.create_iterator(readopts) };
2122        let res = unsafe {
2123            ffi::rocksdb_iter_seek(
2124                iter,
2125                prefix.as_ptr() as *const c_char,
2126                prefix.len() as size_t,
2127            );
2128            if ffi::rocksdb_iter_valid(iter) != 0 {
2129                let mut key_len: size_t = 0;
2130                let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2131                let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2132                Ok(key.starts_with(prefix))
2133            } else if let Err(e) = (|| {
2134                // Check status to differentiate end-of-range vs error
2135                ffi_try!(ffi::rocksdb_iter_get_error(iter));
2136                Ok::<(), Error>(())
2137            })() {
2138                Err(e)
2139            } else {
2140                Ok(false)
2141            }
2142        };
2143        unsafe { ffi::rocksdb_iter_destroy(iter) };
2144        res
2145    }
2146
2147    /// Creates a reusable prefix prober over the default column family using
2148    /// read options optimized for prefix probes.
2149    ///
2150    /// When to use: prefer this in hot loops with many checks per second. It
2151    /// reuses a raw iterator to avoid per-call allocation/FFI overhead. If you
2152    /// need custom tuning (e.g. async IO, readahead, cache-only), use
2153    /// `prefix_prober_with_opts`.
2154    pub fn prefix_prober(&self) -> PrefixProber<'_, Self> {
2155        let mut opts = ReadOptions::default();
2156        opts.set_prefix_same_as_start(true);
2157        PrefixProber {
2158            raw: DBRawIteratorWithThreadMode::new(self, opts),
2159        }
2160    }
2161
2162    /// Creates a reusable prefix prober over the default column family using
2163    /// the provided read options (owned).
2164    ///
2165    /// When to use: advanced tuning for heavy workloads. Callers can set
2166    /// `set_async_io(true)`, `set_readahead_size`, `set_read_tier`, etc. Note:
2167    /// the prober owns `ReadOptions` to keep internal buffers alive.
2168    pub fn prefix_prober_with_opts(&self, readopts: ReadOptions) -> PrefixProber<'_, Self> {
2169        PrefixProber {
2170            raw: DBRawIteratorWithThreadMode::new(self, readopts),
2171        }
2172    }
2173
2174    /// Creates a reusable prefix prober over the specified column family using
2175    /// read options optimized for prefix probes.
2176    pub fn prefix_prober_cf(&self, cf_handle: &impl AsColumnFamilyRef) -> PrefixProber<'_, Self> {
2177        let mut opts = ReadOptions::default();
2178        opts.set_prefix_same_as_start(true);
2179        PrefixProber {
2180            raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts),
2181        }
2182    }
2183
2184    /// Creates a reusable prefix prober over the specified column family using
2185    /// the provided read options (owned).
2186    ///
2187    /// When to use: advanced tuning for heavy workloads on a specific CF.
2188    pub fn prefix_prober_cf_with_opts(
2189        &self,
2190        cf_handle: &impl AsColumnFamilyRef,
2191        readopts: ReadOptions,
2192    ) -> PrefixProber<'_, Self> {
2193        PrefixProber {
2194            raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts),
2195        }
2196    }
2197
2198    /// Returns `true` if there exists at least one key with the given prefix
2199    /// in the specified column family using default read options.
2200    ///
2201    /// When to use: one-shot checks on a CF. Enables
2202    /// `prefix_same_as_start(true)` and bounds the iterator via `PrefixRange`.
2203    pub fn prefix_exists_cf<P: AsRef<[u8]>>(
2204        &self,
2205        cf_handle: &impl AsColumnFamilyRef,
2206        prefix: P,
2207    ) -> Result<bool, Error> {
2208        let p = prefix.as_ref();
2209        PREFIX_READ_OPTS.with(|rc| {
2210            let mut opts = rc.borrow_mut();
2211            opts.set_iterate_range(crate::PrefixRange(p));
2212            self.prefix_exists_cf_opt(cf_handle, p, &opts)
2213        })
2214    }
2215
2216    /// Returns `true` if there exists at least one key with the given prefix
2217    /// in the specified column family using the provided read options.
2218    pub fn prefix_exists_cf_opt<P: AsRef<[u8]>>(
2219        &self,
2220        cf_handle: &impl AsColumnFamilyRef,
2221        prefix: P,
2222        readopts: &ReadOptions,
2223    ) -> Result<bool, Error> {
2224        let prefix = prefix.as_ref();
2225        let iter = unsafe { self.create_iterator_cf(cf_handle.inner(), readopts) };
2226        let res = unsafe {
2227            ffi::rocksdb_iter_seek(
2228                iter,
2229                prefix.as_ptr() as *const c_char,
2230                prefix.len() as size_t,
2231            );
2232            if ffi::rocksdb_iter_valid(iter) != 0 {
2233                let mut key_len: size_t = 0;
2234                let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2235                let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2236                Ok(key.starts_with(prefix))
2237            } else if let Err(e) = (|| {
2238                ffi_try!(ffi::rocksdb_iter_get_error(iter));
2239                Ok::<(), Error>(())
2240            })() {
2241                Err(e)
2242            } else {
2243                Ok(false)
2244            }
2245        };
2246        unsafe { ffi::rocksdb_iter_destroy(iter) };
2247        res
2248    }
2249
2250    /// Opens a raw iterator over the database, using the default read options
2251    pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
2252        let opts = ReadOptions::default();
2253        DBRawIteratorWithThreadMode::new(self, opts)
2254    }
2255
2256    /// Opens a raw iterator over the given column family, using the default read options
2257    pub fn raw_iterator_cf<'a: 'b, 'b>(
2258        &'a self,
2259        cf_handle: &impl AsColumnFamilyRef,
2260    ) -> DBRawIteratorWithThreadMode<'b, Self> {
2261        let opts = ReadOptions::default();
2262        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
2263    }
2264
2265    /// Opens a raw iterator over the database, using the given read options
2266    pub fn raw_iterator_opt<'a: 'b, 'b>(
2267        &'a self,
2268        readopts: ReadOptions,
2269    ) -> DBRawIteratorWithThreadMode<'b, Self> {
2270        DBRawIteratorWithThreadMode::new(self, readopts)
2271    }
2272
2273    /// Opens a raw iterator over the given column family, using the given read options
2274    pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
2275        &'a self,
2276        cf_handle: &impl AsColumnFamilyRef,
2277        readopts: ReadOptions,
2278    ) -> DBRawIteratorWithThreadMode<'b, Self> {
2279        DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
2280    }
2281
2282    pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
2283        SnapshotWithThreadMode::<Self>::new(self)
2284    }
2285
2286    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2287    where
2288        K: AsRef<[u8]>,
2289        V: AsRef<[u8]>,
2290    {
2291        let key = key.as_ref();
2292        let value = value.as_ref();
2293
2294        unsafe {
2295            ffi_try!(ffi::rocksdb_put(
2296                self.inner.inner(),
2297                writeopts.inner,
2298                key.as_ptr() as *const c_char,
2299                key.len() as size_t,
2300                value.as_ptr() as *const c_char,
2301                value.len() as size_t,
2302            ));
2303            Ok(())
2304        }
2305    }
2306
2307    pub fn put_cf_opt<K, V>(
2308        &self,
2309        cf: &impl AsColumnFamilyRef,
2310        key: K,
2311        value: V,
2312        writeopts: &WriteOptions,
2313    ) -> Result<(), Error>
2314    where
2315        K: AsRef<[u8]>,
2316        V: AsRef<[u8]>,
2317    {
2318        let key = key.as_ref();
2319        let value = value.as_ref();
2320
2321        unsafe {
2322            ffi_try!(ffi::rocksdb_put_cf(
2323                self.inner.inner(),
2324                writeopts.inner,
2325                cf.inner(),
2326                key.as_ptr() as *const c_char,
2327                key.len() as size_t,
2328                value.as_ptr() as *const c_char,
2329                value.len() as size_t,
2330            ));
2331            Ok(())
2332        }
2333    }
2334
2335    /// Set the database entry for "key" to "value" with WriteOptions.
2336    /// If "key" already exists, it will coexist with previous entry.
2337    /// `Get` with a timestamp ts specified in ReadOptions will return
2338    /// the most recent key/value whose timestamp is smaller than or equal to ts.
2339    /// Takes an additional argument `ts` as the timestamp.
2340    /// Note: the DB must be opened with user defined timestamp enabled.
2341    pub fn put_with_ts_opt<K, V, S>(
2342        &self,
2343        key: K,
2344        ts: S,
2345        value: V,
2346        writeopts: &WriteOptions,
2347    ) -> Result<(), Error>
2348    where
2349        K: AsRef<[u8]>,
2350        V: AsRef<[u8]>,
2351        S: AsRef<[u8]>,
2352    {
2353        let key = key.as_ref();
2354        let value = value.as_ref();
2355        let ts = ts.as_ref();
2356        unsafe {
2357            ffi_try!(ffi::rocksdb_put_with_ts(
2358                self.inner.inner(),
2359                writeopts.inner,
2360                key.as_ptr() as *const c_char,
2361                key.len() as size_t,
2362                ts.as_ptr() as *const c_char,
2363                ts.len() as size_t,
2364                value.as_ptr() as *const c_char,
2365                value.len() as size_t,
2366            ));
2367            Ok(())
2368        }
2369    }
2370
2371    /// Put with timestamp in a specific column family with WriteOptions.
2372    /// If "key" already exists, it will coexist with previous entry.
2373    /// `Get` with a timestamp ts specified in ReadOptions will return
2374    /// the most recent key/value whose timestamp is smaller than or equal to ts.
2375    /// Takes an additional argument `ts` as the timestamp.
2376    /// Note: the DB must be opened with user defined timestamp enabled.
2377    pub fn put_cf_with_ts_opt<K, V, S>(
2378        &self,
2379        cf: &impl AsColumnFamilyRef,
2380        key: K,
2381        ts: S,
2382        value: V,
2383        writeopts: &WriteOptions,
2384    ) -> Result<(), Error>
2385    where
2386        K: AsRef<[u8]>,
2387        V: AsRef<[u8]>,
2388        S: AsRef<[u8]>,
2389    {
2390        let key = key.as_ref();
2391        let value = value.as_ref();
2392        let ts = ts.as_ref();
2393        unsafe {
2394            ffi_try!(ffi::rocksdb_put_cf_with_ts(
2395                self.inner.inner(),
2396                writeopts.inner,
2397                cf.inner(),
2398                key.as_ptr() as *const c_char,
2399                key.len() as size_t,
2400                ts.as_ptr() as *const c_char,
2401                ts.len() as size_t,
2402                value.as_ptr() as *const c_char,
2403                value.len() as size_t,
2404            ));
2405            Ok(())
2406        }
2407    }
2408
2409    pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2410    where
2411        K: AsRef<[u8]>,
2412        V: AsRef<[u8]>,
2413    {
2414        let key = key.as_ref();
2415        let value = value.as_ref();
2416
2417        unsafe {
2418            ffi_try!(ffi::rocksdb_merge(
2419                self.inner.inner(),
2420                writeopts.inner,
2421                key.as_ptr() as *const c_char,
2422                key.len() as size_t,
2423                value.as_ptr() as *const c_char,
2424                value.len() as size_t,
2425            ));
2426            Ok(())
2427        }
2428    }
2429
2430    pub fn merge_cf_opt<K, V>(
2431        &self,
2432        cf: &impl AsColumnFamilyRef,
2433        key: K,
2434        value: V,
2435        writeopts: &WriteOptions,
2436    ) -> Result<(), Error>
2437    where
2438        K: AsRef<[u8]>,
2439        V: AsRef<[u8]>,
2440    {
2441        let key = key.as_ref();
2442        let value = value.as_ref();
2443
2444        unsafe {
2445            ffi_try!(ffi::rocksdb_merge_cf(
2446                self.inner.inner(),
2447                writeopts.inner,
2448                cf.inner(),
2449                key.as_ptr() as *const c_char,
2450                key.len() as size_t,
2451                value.as_ptr() as *const c_char,
2452                value.len() as size_t,
2453            ));
2454            Ok(())
2455        }
2456    }
2457
2458    pub fn delete_opt<K: AsRef<[u8]>>(
2459        &self,
2460        key: K,
2461        writeopts: &WriteOptions,
2462    ) -> Result<(), Error> {
2463        let key = key.as_ref();
2464
2465        unsafe {
2466            ffi_try!(ffi::rocksdb_delete(
2467                self.inner.inner(),
2468                writeopts.inner,
2469                key.as_ptr() as *const c_char,
2470                key.len() as size_t,
2471            ));
2472            Ok(())
2473        }
2474    }
2475
2476    pub fn delete_cf_opt<K: AsRef<[u8]>>(
2477        &self,
2478        cf: &impl AsColumnFamilyRef,
2479        key: K,
2480        writeopts: &WriteOptions,
2481    ) -> Result<(), Error> {
2482        let key = key.as_ref();
2483
2484        unsafe {
2485            ffi_try!(ffi::rocksdb_delete_cf(
2486                self.inner.inner(),
2487                writeopts.inner,
2488                cf.inner(),
2489                key.as_ptr() as *const c_char,
2490                key.len() as size_t,
2491            ));
2492            Ok(())
2493        }
2494    }
2495
2496    /// Remove the database entry (if any) for "key" with WriteOptions.
2497    /// Takes an additional argument `ts` as the timestamp.
2498    /// Note: the DB must be opened with user defined timestamp enabled.
2499    pub fn delete_with_ts_opt<K, S>(
2500        &self,
2501        key: K,
2502        ts: S,
2503        writeopts: &WriteOptions,
2504    ) -> Result<(), Error>
2505    where
2506        K: AsRef<[u8]>,
2507        S: AsRef<[u8]>,
2508    {
2509        let key = key.as_ref();
2510        let ts = ts.as_ref();
2511        unsafe {
2512            ffi_try!(ffi::rocksdb_delete_with_ts(
2513                self.inner.inner(),
2514                writeopts.inner,
2515                key.as_ptr() as *const c_char,
2516                key.len() as size_t,
2517                ts.as_ptr() as *const c_char,
2518                ts.len() as size_t,
2519            ));
2520            Ok(())
2521        }
2522    }
2523
2524    /// Delete with timestamp in a specific column family with WriteOptions.
2525    /// Takes an additional argument `ts` as the timestamp.
2526    /// Note: the DB must be opened with user defined timestamp enabled.
2527    pub fn delete_cf_with_ts_opt<K, S>(
2528        &self,
2529        cf: &impl AsColumnFamilyRef,
2530        key: K,
2531        ts: S,
2532        writeopts: &WriteOptions,
2533    ) -> Result<(), Error>
2534    where
2535        K: AsRef<[u8]>,
2536        S: AsRef<[u8]>,
2537    {
2538        let key = key.as_ref();
2539        let ts = ts.as_ref();
2540        unsafe {
2541            ffi_try!(ffi::rocksdb_delete_cf_with_ts(
2542                self.inner.inner(),
2543                writeopts.inner,
2544                cf.inner(),
2545                key.as_ptr() as *const c_char,
2546                key.len() as size_t,
2547                ts.as_ptr() as *const c_char,
2548                ts.len() as size_t,
2549            ));
2550            Ok(())
2551        }
2552    }
2553
2554    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
2555    where
2556        K: AsRef<[u8]>,
2557        V: AsRef<[u8]>,
2558    {
2559        DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
2560    }
2561
2562    pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2563    where
2564        K: AsRef<[u8]>,
2565        V: AsRef<[u8]>,
2566    {
2567        DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
2568    }
2569
2570    /// Set the database entry for "key" to "value".
2571    /// If "key" already exists, it will coexist with previous entry.
2572    /// `Get` with a timestamp ts specified in ReadOptions will return
2573    /// the most recent key/value whose timestamp is smaller than or equal to ts.
2574    /// Takes an additional argument `ts` as the timestamp.
2575    /// Note: the DB must be opened with user defined timestamp enabled.
2576    pub fn put_with_ts<K, V, S>(&self, key: K, ts: S, value: V) -> Result<(), Error>
2577    where
2578        K: AsRef<[u8]>,
2579        V: AsRef<[u8]>,
2580        S: AsRef<[u8]>,
2581    {
2582        DEFAULT_WRITE_OPTS
2583            .with(|opts| self.put_with_ts_opt(key.as_ref(), ts.as_ref(), value.as_ref(), opts))
2584    }
2585
2586    /// Put with timestamp in a specific column family.
2587    /// If "key" already exists, it will coexist with previous entry.
2588    /// `Get` with a timestamp ts specified in ReadOptions will return
2589    /// the most recent key/value whose timestamp is smaller than or equal to ts.
2590    /// Takes an additional argument `ts` as the timestamp.
2591    /// Note: the DB must be opened with user defined timestamp enabled.
2592    pub fn put_cf_with_ts<K, V, S>(
2593        &self,
2594        cf: &impl AsColumnFamilyRef,
2595        key: K,
2596        ts: S,
2597        value: V,
2598    ) -> Result<(), Error>
2599    where
2600        K: AsRef<[u8]>,
2601        V: AsRef<[u8]>,
2602        S: AsRef<[u8]>,
2603    {
2604        DEFAULT_WRITE_OPTS.with(|opts| {
2605            self.put_cf_with_ts_opt(cf, key.as_ref(), ts.as_ref(), value.as_ref(), opts)
2606        })
2607    }
2608
2609    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
2610    where
2611        K: AsRef<[u8]>,
2612        V: AsRef<[u8]>,
2613    {
2614        DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
2615    }
2616
2617    pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2618    where
2619        K: AsRef<[u8]>,
2620        V: AsRef<[u8]>,
2621    {
2622        DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
2623    }
2624
2625    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2626        DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
2627    }
2628
2629    pub fn delete_cf<K: AsRef<[u8]>>(
2630        &self,
2631        cf: &impl AsColumnFamilyRef,
2632        key: K,
2633    ) -> Result<(), Error> {
2634        DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
2635    }
2636
2637    /// Remove the database entry (if any) for "key".
2638    /// Takes an additional argument `ts` as the timestamp.
2639    /// Note: the DB must be opened with user defined timestamp enabled.
2640    pub fn delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2641        &self,
2642        key: K,
2643        ts: S,
2644    ) -> Result<(), Error> {
2645        DEFAULT_WRITE_OPTS.with(|opts| self.delete_with_ts_opt(key, ts, opts))
2646    }
2647
2648    /// Delete with timestamp in a specific column family.
2649    /// Takes an additional argument `ts` as the timestamp.
2650    /// Note: the DB must be opened with user defined timestamp enabled.
2651    pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2652        &self,
2653        cf: &impl AsColumnFamilyRef,
2654        key: K,
2655        ts: S,
2656    ) -> Result<(), Error> {
2657        DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_with_ts_opt(cf, key, ts, opts))
2658    }
2659
2660    /// Remove the database entry for "key" with WriteOptions.
2661    ///
2662    /// Requires that the key exists and was not overwritten. Returns OK on success,
2663    /// and a non-OK status on error. It is not an error if "key" did not exist in the database.
2664    ///
2665    /// If a key is overwritten (by calling Put() multiple times), then the result
2666    /// of calling SingleDelete() on this key is undefined. SingleDelete() only
2667    /// behaves correctly if there has been only one Put() for this key since the
2668    /// previous call to SingleDelete() for this key.
2669    ///
2670    /// This feature is currently an experimental performance optimization
2671    /// for a very specific workload. It is up to the caller to ensure that
2672    /// SingleDelete is only used for a key that is not deleted using Delete() or
2673    /// written using Merge(). Mixing SingleDelete operations with Deletes and
2674    /// Merges can result in undefined behavior.
2675    ///
2676    /// Note: consider setting options.sync = true.
2677    ///
2678    /// For more information, see <https://github.com/facebook/rocksdb/wiki/Single-Delete>
2679    pub fn single_delete_opt<K: AsRef<[u8]>>(
2680        &self,
2681        key: K,
2682        writeopts: &WriteOptions,
2683    ) -> Result<(), Error> {
2684        let key = key.as_ref();
2685
2686        unsafe {
2687            ffi_try!(ffi::rocksdb_singledelete(
2688                self.inner.inner(),
2689                writeopts.inner,
2690                key.as_ptr() as *const c_char,
2691                key.len() as size_t,
2692            ));
2693            Ok(())
2694        }
2695    }
2696
2697    /// Remove the database entry for "key" from a specific column family with WriteOptions.
2698    ///
2699    /// See single_delete_opt() for detailed behavior and restrictions.
2700    pub fn single_delete_cf_opt<K: AsRef<[u8]>>(
2701        &self,
2702        cf: &impl AsColumnFamilyRef,
2703        key: K,
2704        writeopts: &WriteOptions,
2705    ) -> Result<(), Error> {
2706        let key = key.as_ref();
2707
2708        unsafe {
2709            ffi_try!(ffi::rocksdb_singledelete_cf(
2710                self.inner.inner(),
2711                writeopts.inner,
2712                cf.inner(),
2713                key.as_ptr() as *const c_char,
2714                key.len() as size_t,
2715            ));
2716            Ok(())
2717        }
2718    }
2719
2720    /// Remove the database entry for "key" with WriteOptions.
2721    ///
2722    /// Takes an additional argument `ts` as the timestamp.
2723    /// Note: the DB must be opened with user defined timestamp enabled.
2724    ///
2725    /// See single_delete_opt() for detailed behavior and restrictions.
2726    pub fn single_delete_with_ts_opt<K, S>(
2727        &self,
2728        key: K,
2729        ts: S,
2730        writeopts: &WriteOptions,
2731    ) -> Result<(), Error>
2732    where
2733        K: AsRef<[u8]>,
2734        S: AsRef<[u8]>,
2735    {
2736        let key = key.as_ref();
2737        let ts = ts.as_ref();
2738        unsafe {
2739            ffi_try!(ffi::rocksdb_singledelete_with_ts(
2740                self.inner.inner(),
2741                writeopts.inner,
2742                key.as_ptr() as *const c_char,
2743                key.len() as size_t,
2744                ts.as_ptr() as *const c_char,
2745                ts.len() as size_t,
2746            ));
2747            Ok(())
2748        }
2749    }
2750
2751    /// Remove the database entry for "key" from a specific column family with WriteOptions.
2752    ///
2753    /// Takes an additional argument `ts` as the timestamp.
2754    /// Note: the DB must be opened with user defined timestamp enabled.
2755    ///
2756    /// See single_delete_opt() for detailed behavior and restrictions.
2757    pub fn single_delete_cf_with_ts_opt<K, S>(
2758        &self,
2759        cf: &impl AsColumnFamilyRef,
2760        key: K,
2761        ts: S,
2762        writeopts: &WriteOptions,
2763    ) -> Result<(), Error>
2764    where
2765        K: AsRef<[u8]>,
2766        S: AsRef<[u8]>,
2767    {
2768        let key = key.as_ref();
2769        let ts = ts.as_ref();
2770        unsafe {
2771            ffi_try!(ffi::rocksdb_singledelete_cf_with_ts(
2772                self.inner.inner(),
2773                writeopts.inner,
2774                cf.inner(),
2775                key.as_ptr() as *const c_char,
2776                key.len() as size_t,
2777                ts.as_ptr() as *const c_char,
2778                ts.len() as size_t,
2779            ));
2780            Ok(())
2781        }
2782    }
2783
2784    /// Remove the database entry for "key".
2785    ///
2786    /// See single_delete_opt() for detailed behavior and restrictions.
2787    pub fn single_delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2788        DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_opt(key, opts))
2789    }
2790
2791    /// Remove the database entry for "key" from a specific column family.
2792    ///
2793    /// See single_delete_opt() for detailed behavior and restrictions.
2794    pub fn single_delete_cf<K: AsRef<[u8]>>(
2795        &self,
2796        cf: &impl AsColumnFamilyRef,
2797        key: K,
2798    ) -> Result<(), Error> {
2799        DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_opt(cf, key, opts))
2800    }
2801
2802    /// Remove the database entry for "key".
2803    ///
2804    /// Takes an additional argument `ts` as the timestamp.
2805    /// Note: the DB must be opened with user defined timestamp enabled.
2806    ///
2807    /// See single_delete_opt() for detailed behavior and restrictions.
2808    pub fn single_delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2809        &self,
2810        key: K,
2811        ts: S,
2812    ) -> Result<(), Error> {
2813        DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_with_ts_opt(key, ts, opts))
2814    }
2815
2816    /// Remove the database entry for "key" from a specific column family.
2817    ///
2818    /// Takes an additional argument `ts` as the timestamp.
2819    /// Note: the DB must be opened with user defined timestamp enabled.
2820    ///
2821    /// See single_delete_opt() for detailed behavior and restrictions.
2822    pub fn single_delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2823        &self,
2824        cf: &impl AsColumnFamilyRef,
2825        key: K,
2826        ts: S,
2827    ) -> Result<(), Error> {
2828        DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_with_ts_opt(cf, key, ts, opts))
2829    }
2830
2831    /// Runs a manual compaction on the Range of keys given. This is not likely to be needed for typical usage.
2832    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
2833        unsafe {
2834            let start = start.as_ref().map(AsRef::as_ref);
2835            let end = end.as_ref().map(AsRef::as_ref);
2836
2837            ffi::rocksdb_compact_range(
2838                self.inner.inner(),
2839                opt_bytes_to_ptr(start),
2840                start.map_or(0, <[u8]>::len) as size_t,
2841                opt_bytes_to_ptr(end),
2842                end.map_or(0, <[u8]>::len) as size_t,
2843            );
2844        }
2845    }
2846
2847    /// Same as `compact_range` but with custom options.
2848    pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2849        &self,
2850        start: Option<S>,
2851        end: Option<E>,
2852        opts: &CompactOptions,
2853    ) {
2854        unsafe {
2855            let start = start.as_ref().map(AsRef::as_ref);
2856            let end = end.as_ref().map(AsRef::as_ref);
2857
2858            ffi::rocksdb_compact_range_opt(
2859                self.inner.inner(),
2860                opts.inner,
2861                opt_bytes_to_ptr(start),
2862                start.map_or(0, <[u8]>::len) as size_t,
2863                opt_bytes_to_ptr(end),
2864                end.map_or(0, <[u8]>::len) as size_t,
2865            );
2866        }
2867    }
2868
2869    /// Runs a manual compaction on the Range of keys given on the
2870    /// given column family. This is not likely to be needed for typical usage.
2871    pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2872        &self,
2873        cf: &impl AsColumnFamilyRef,
2874        start: Option<S>,
2875        end: Option<E>,
2876    ) {
2877        unsafe {
2878            let start = start.as_ref().map(AsRef::as_ref);
2879            let end = end.as_ref().map(AsRef::as_ref);
2880
2881            ffi::rocksdb_compact_range_cf(
2882                self.inner.inner(),
2883                cf.inner(),
2884                opt_bytes_to_ptr(start),
2885                start.map_or(0, <[u8]>::len) as size_t,
2886                opt_bytes_to_ptr(end),
2887                end.map_or(0, <[u8]>::len) as size_t,
2888            );
2889        }
2890    }
2891
2892    /// Same as `compact_range_cf` but with custom options.
2893    pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2894        &self,
2895        cf: &impl AsColumnFamilyRef,
2896        start: Option<S>,
2897        end: Option<E>,
2898        opts: &CompactOptions,
2899    ) {
2900        unsafe {
2901            let start = start.as_ref().map(AsRef::as_ref);
2902            let end = end.as_ref().map(AsRef::as_ref);
2903
2904            ffi::rocksdb_compact_range_cf_opt(
2905                self.inner.inner(),
2906                cf.inner(),
2907                opts.inner,
2908                opt_bytes_to_ptr(start),
2909                start.map_or(0, <[u8]>::len) as size_t,
2910                opt_bytes_to_ptr(end),
2911                end.map_or(0, <[u8]>::len) as size_t,
2912            );
2913        }
2914    }
2915
2916    /// Wait for all flush and compactions jobs to finish. Jobs to wait include the
2917    /// unscheduled (queued, but not scheduled yet).
2918    ///
2919    /// NOTE: This may also never return if there's sufficient ongoing writes that
2920    /// keeps flush and compaction going without stopping. The user would have to
2921    /// cease all the writes to DB to make this eventually return in a stable
2922    /// state. The user may also use timeout option in WaitForCompactOptions to
2923    /// make this stop waiting and return when timeout expires.
2924    pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
2925        unsafe {
2926            ffi_try!(ffi::rocksdb_wait_for_compact(
2927                self.inner.inner(),
2928                opts.inner
2929            ));
2930        }
2931        Ok(())
2932    }
2933
2934    pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
2935        let copts = convert_options(opts)?;
2936        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2937        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2938        let count = opts.len() as i32;
2939        unsafe {
2940            ffi_try!(ffi::rocksdb_set_options(
2941                self.inner.inner(),
2942                count,
2943                cnames.as_ptr(),
2944                cvalues.as_ptr(),
2945            ));
2946        }
2947        Ok(())
2948    }
2949
2950    pub fn set_options_cf(
2951        &self,
2952        cf: &impl AsColumnFamilyRef,
2953        opts: &[(&str, &str)],
2954    ) -> Result<(), Error> {
2955        let copts = convert_options(opts)?;
2956        let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2957        let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2958        let count = opts.len() as i32;
2959        unsafe {
2960            ffi_try!(ffi::rocksdb_set_options_cf(
2961                self.inner.inner(),
2962                cf.inner(),
2963                count,
2964                cnames.as_ptr(),
2965                cvalues.as_ptr(),
2966            ));
2967        }
2968        Ok(())
2969    }
2970
2971    /// Implementation for property_value et al methods.
2972    ///
2973    /// `name` is the name of the property.  It will be converted into a CString
2974    /// and passed to `get_property` as argument.  `get_property` reads the
2975    /// specified property and either returns NULL or a pointer to a C allocated
2976    /// string; this method takes ownership of that string and will free it at
2977    /// the end. That string is parsed using `parse` callback which produces
2978    /// the returned result.
2979    fn property_value_impl<R>(
2980        name: impl CStrLike,
2981        get_property: impl FnOnce(*const c_char) -> *mut c_char,
2982        parse: impl FnOnce(&str) -> Result<R, Error>,
2983    ) -> Result<Option<R>, Error> {
2984        let value = match name.bake() {
2985            Ok(prop_name) => get_property(prop_name.as_ptr()),
2986            Err(e) => {
2987                return Err(Error::new(format!(
2988                    "Failed to convert property name to CString: {e}"
2989                )));
2990            }
2991        };
2992        if value.is_null() {
2993            return Ok(None);
2994        }
2995        let result = match unsafe { CStr::from_ptr(value) }.to_str() {
2996            Ok(s) => parse(s).map(|value| Some(value)),
2997            Err(e) => Err(Error::new(format!(
2998                "Failed to convert property value to string: {e}"
2999            ))),
3000        };
3001        unsafe {
3002            ffi::rocksdb_free(value as *mut c_void);
3003        }
3004        result
3005    }
3006
3007    /// Retrieves a RocksDB property by name.
3008    ///
3009    /// Full list of properties could be find
3010    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
3011    pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
3012        Self::property_value_impl(
3013            name,
3014            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
3015            |str_value| Ok(str_value.to_owned()),
3016        )
3017    }
3018
3019    /// Retrieves a RocksDB property by name, for a specific column family.
3020    ///
3021    /// Full list of properties could be find
3022    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L428-L634).
3023    pub fn property_value_cf(
3024        &self,
3025        cf: &impl AsColumnFamilyRef,
3026        name: impl CStrLike,
3027    ) -> Result<Option<String>, Error> {
3028        Self::property_value_impl(
3029            name,
3030            |prop_name| unsafe {
3031                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3032            },
3033            |str_value| Ok(str_value.to_owned()),
3034        )
3035    }
3036
3037    fn parse_property_int_value(value: &str) -> Result<u64, Error> {
3038        value.parse::<u64>().map_err(|err| {
3039            Error::new(format!(
3040                "Failed to convert property value {value} to int: {err}"
3041            ))
3042        })
3043    }
3044
3045    /// Retrieves a RocksDB property and casts it to an integer.
3046    ///
3047    /// Full list of properties that return int values could be find
3048    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
3049    pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
3050        Self::property_value_impl(
3051            name,
3052            |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
3053            Self::parse_property_int_value,
3054        )
3055    }
3056
3057    /// Retrieves a RocksDB property for a specific column family and casts it to an integer.
3058    ///
3059    /// Full list of properties that return int values could be find
3060    /// [here](https://github.com/facebook/rocksdb/blob/08809f5e6cd9cc4bc3958dd4d59457ae78c76660/include/rocksdb/db.h#L654-L689).
3061    pub fn property_int_value_cf(
3062        &self,
3063        cf: &impl AsColumnFamilyRef,
3064        name: impl CStrLike,
3065    ) -> Result<Option<u64>, Error> {
3066        Self::property_value_impl(
3067            name,
3068            |prop_name| unsafe {
3069                ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3070            },
3071            Self::parse_property_int_value,
3072        )
3073    }
3074
3075    /// The sequence number of the most recent transaction.
3076    pub fn latest_sequence_number(&self) -> u64 {
3077        unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
3078    }
3079
3080    /// Return the approximate file system space used by keys in each ranges.
3081    ///
3082    /// Note that the returned sizes measure file system space usage, so
3083    /// if the user data compresses by a factor of ten, the returned
3084    /// sizes will be one-tenth the size of the corresponding user data size.
3085    ///
3086    /// Due to lack of abi, only data flushed to disk is taken into account.
3087    pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
3088        self.get_approximate_sizes_cfopt(None::<&ColumnFamily>, ranges)
3089    }
3090
3091    pub fn get_approximate_sizes_cf(
3092        &self,
3093        cf: &impl AsColumnFamilyRef,
3094        ranges: &[Range],
3095    ) -> Vec<u64> {
3096        self.get_approximate_sizes_cfopt(Some(cf), ranges)
3097    }
3098
3099    fn get_approximate_sizes_cfopt(
3100        &self,
3101        cf: Option<&impl AsColumnFamilyRef>,
3102        ranges: &[Range],
3103    ) -> Vec<u64> {
3104        let start_keys: Vec<*const c_char> = ranges
3105            .iter()
3106            .map(|x| x.start_key.as_ptr() as *const c_char)
3107            .collect();
3108        let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
3109        let end_keys: Vec<*const c_char> = ranges
3110            .iter()
3111            .map(|x| x.end_key.as_ptr() as *const c_char)
3112            .collect();
3113        let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
3114        let mut sizes: Vec<u64> = vec![0; ranges.len()];
3115        let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
3116            ranges.len() as i32,
3117            start_keys.as_ptr(),
3118            start_key_lens.as_ptr(),
3119            end_keys.as_ptr(),
3120            end_key_lens.as_ptr(),
3121            sizes.as_mut_ptr(),
3122        );
3123        let mut err: *mut c_char = ptr::null_mut();
3124        match cf {
3125            None => unsafe {
3126                ffi::rocksdb_approximate_sizes(
3127                    self.inner.inner(),
3128                    n,
3129                    start_key_ptr,
3130                    start_key_len_ptr,
3131                    end_key_ptr,
3132                    end_key_len_ptr,
3133                    size_ptr,
3134                    &raw mut err,
3135                );
3136            },
3137            Some(cf) => unsafe {
3138                ffi::rocksdb_approximate_sizes_cf(
3139                    self.inner.inner(),
3140                    cf.inner(),
3141                    n,
3142                    start_key_ptr,
3143                    start_key_len_ptr,
3144                    end_key_ptr,
3145                    end_key_len_ptr,
3146                    size_ptr,
3147                    &raw mut err,
3148                );
3149            },
3150        }
3151        sizes
3152    }
3153
3154    /// Iterate over batches of write operations since a given sequence.
3155    ///
3156    /// Produce an iterator that will provide the batches of write operations
3157    /// that have occurred since the given sequence (see
3158    /// `latest_sequence_number()`). Use the provided iterator to retrieve each
3159    /// (`u64`, `WriteBatch`) tuple, and then gather the individual puts and
3160    /// deletes using the `WriteBatch::iterate()` function.
3161    ///
3162    /// Calling `get_updates_since()` with a sequence number that is out of
3163    /// bounds will return an error.
3164    pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
3165        unsafe {
3166            // rocksdb_wal_readoptions_t does not appear to have any functions
3167            // for creating and destroying it; fortunately we can pass a nullptr
3168            // here to get the default behavior
3169            let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
3170            let iter = ffi_try!(ffi::rocksdb_get_updates_since(
3171                self.inner.inner(),
3172                seq_number,
3173                opts
3174            ));
3175            Ok(DBWALIterator {
3176                inner: iter,
3177                start_seq_number: seq_number,
3178            })
3179        }
3180    }
3181
3182    /// Tries to catch up with the primary by reading as much as possible from the
3183    /// log files.
3184    pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
3185        unsafe {
3186            ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
3187        }
3188        Ok(())
3189    }
3190
3191    /// Loads a list of external SST files created with SstFileWriter into the DB with default opts
3192    pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
3193        let opts = IngestExternalFileOptions::default();
3194        self.ingest_external_file_opts(&opts, paths)
3195    }
3196
3197    /// Loads a list of external SST files created with SstFileWriter into the DB
3198    pub fn ingest_external_file_opts<P: AsRef<Path>>(
3199        &self,
3200        opts: &IngestExternalFileOptions,
3201        paths: Vec<P>,
3202    ) -> Result<(), Error> {
3203        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3204        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3205
3206        self.ingest_external_file_raw(opts, &paths_v, &cpaths)
3207    }
3208
3209    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
3210    /// with default opts
3211    pub fn ingest_external_file_cf<P: AsRef<Path>>(
3212        &self,
3213        cf: &impl AsColumnFamilyRef,
3214        paths: Vec<P>,
3215    ) -> Result<(), Error> {
3216        let opts = IngestExternalFileOptions::default();
3217        self.ingest_external_file_cf_opts(cf, &opts, paths)
3218    }
3219
3220    /// Loads a list of external SST files created with SstFileWriter into the DB for given Column Family
3221    pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
3222        &self,
3223        cf: &impl AsColumnFamilyRef,
3224        opts: &IngestExternalFileOptions,
3225        paths: Vec<P>,
3226    ) -> Result<(), Error> {
3227        let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3228        let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3229
3230        self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
3231    }
3232
3233    fn ingest_external_file_raw(
3234        &self,
3235        opts: &IngestExternalFileOptions,
3236        paths_v: &[CString],
3237        cpaths: &[*const c_char],
3238    ) -> Result<(), Error> {
3239        unsafe {
3240            ffi_try!(ffi::rocksdb_ingest_external_file(
3241                self.inner.inner(),
3242                cpaths.as_ptr(),
3243                paths_v.len(),
3244                opts.inner.cast_const()
3245            ));
3246            Ok(())
3247        }
3248    }
3249
3250    fn ingest_external_file_raw_cf(
3251        &self,
3252        cf: &impl AsColumnFamilyRef,
3253        opts: &IngestExternalFileOptions,
3254        paths_v: &[CString],
3255        cpaths: &[*const c_char],
3256    ) -> Result<(), Error> {
3257        unsafe {
3258            ffi_try!(ffi::rocksdb_ingest_external_file_cf(
3259                self.inner.inner(),
3260                cf.inner(),
3261                cpaths.as_ptr(),
3262                paths_v.len(),
3263                opts.inner.cast_const()
3264            ));
3265            Ok(())
3266        }
3267    }
3268
3269    /// Obtains the LSM-tree meta data of the default column family of the DB
3270    pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
3271        unsafe {
3272            let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
3273
3274            let metadata = ColumnFamilyMetaData {
3275                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3276                name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3277                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3278            };
3279
3280            // destroy
3281            ffi::rocksdb_column_family_metadata_destroy(ptr);
3282
3283            // return
3284            metadata
3285        }
3286    }
3287
3288    /// Obtains the LSM-tree meta data of the specified column family of the DB
3289    pub fn get_column_family_metadata_cf(
3290        &self,
3291        cf: &impl AsColumnFamilyRef,
3292    ) -> ColumnFamilyMetaData {
3293        unsafe {
3294            let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
3295
3296            let metadata = ColumnFamilyMetaData {
3297                size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3298                name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3299                file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3300            };
3301
3302            // destroy
3303            ffi::rocksdb_column_family_metadata_destroy(ptr);
3304
3305            // return
3306            metadata
3307        }
3308    }
3309
3310    /// Returns a list of all table files with their level, start key
3311    /// and end key
3312    pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
3313        unsafe {
3314            let livefiles_ptr = ffi::rocksdb_livefiles(self.inner.inner());
3315            if livefiles_ptr.is_null() {
3316                Err(Error::new("Could not get live files".to_owned()))
3317            } else {
3318                let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3319
3320                // destroy livefiles metadata(s)
3321                ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3322
3323                // return
3324                Ok(files)
3325            }
3326        }
3327    }
3328
3329    /// Delete sst files whose keys are entirely in the given range.
3330    ///
3331    /// Could leave some keys in the range which are in files which are not
3332    /// entirely in the range.
3333    ///
3334    /// Note: L0 files are left regardless of whether they're in the range.
3335    ///
3336    /// SnapshotWithThreadModes before the delete might not see the data in the given range.
3337    pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
3338        let from = from.as_ref();
3339        let to = to.as_ref();
3340        unsafe {
3341            ffi_try!(ffi::rocksdb_delete_file_in_range(
3342                self.inner.inner(),
3343                from.as_ptr() as *const c_char,
3344                from.len() as size_t,
3345                to.as_ptr() as *const c_char,
3346                to.len() as size_t,
3347            ));
3348            Ok(())
3349        }
3350    }
3351
3352    /// Same as `delete_file_in_range` but only for specific column family
3353    pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
3354        &self,
3355        cf: &impl AsColumnFamilyRef,
3356        from: K,
3357        to: K,
3358    ) -> Result<(), Error> {
3359        let from = from.as_ref();
3360        let to = to.as_ref();
3361        unsafe {
3362            ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
3363                self.inner.inner(),
3364                cf.inner(),
3365                from.as_ptr() as *const c_char,
3366                from.len() as size_t,
3367                to.as_ptr() as *const c_char,
3368                to.len() as size_t,
3369            ));
3370            Ok(())
3371        }
3372    }
3373
3374    /// Request stopping background work, if wait is true wait until it's done.
3375    pub fn cancel_all_background_work(&self, wait: bool) {
3376        unsafe {
3377            ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
3378        }
3379    }
3380
3381    fn drop_column_family<C>(
3382        &self,
3383        cf_inner: *mut ffi::rocksdb_column_family_handle_t,
3384        cf: C,
3385    ) -> Result<(), Error> {
3386        unsafe {
3387            // first mark the column family as dropped
3388            ffi_try!(ffi::rocksdb_drop_column_family(
3389                self.inner.inner(),
3390                cf_inner
3391            ));
3392        }
3393        // then finally reclaim any resources (mem, files) by destroying the only single column
3394        // family handle by drop()-ing it
3395        drop(cf);
3396        Ok(())
3397    }
3398
3399    /// Increase the full_history_ts of column family. The new ts_low value should
3400    /// be newer than current full_history_ts value.
3401    /// If another thread updates full_history_ts_low concurrently to a higher
3402    /// timestamp than the requested ts_low, a try again error will be returned.
3403    pub fn increase_full_history_ts_low<S: AsRef<[u8]>>(
3404        &self,
3405        cf: &impl AsColumnFamilyRef,
3406        ts: S,
3407    ) -> Result<(), Error> {
3408        let ts = ts.as_ref();
3409        unsafe {
3410            ffi_try!(ffi::rocksdb_increase_full_history_ts_low(
3411                self.inner.inner(),
3412                cf.inner(),
3413                ts.as_ptr() as *const c_char,
3414                ts.len() as size_t,
3415            ));
3416            Ok(())
3417        }
3418    }
3419
3420    /// Get current full_history_ts value.
3421    pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result<Vec<u8>, Error> {
3422        unsafe {
3423            let mut ts_lowlen = 0;
3424            let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low(
3425                self.inner.inner(),
3426                cf.inner(),
3427                &raw mut ts_lowlen,
3428            ));
3429
3430            if ts.is_null() {
3431                Err(Error::new("Could not get full_history_ts_low".to_owned()))
3432            } else {
3433                let mut vec = vec![0; ts_lowlen];
3434                ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen);
3435                ffi::rocksdb_free(ts as *mut c_void);
3436                Ok(vec)
3437            }
3438        }
3439    }
3440
3441    /// Returns the DB identity. This is typically ASCII bytes, but that is not guaranteed.
3442    pub fn get_db_identity(&self) -> Result<Vec<u8>, Error> {
3443        unsafe {
3444            let mut length: usize = 0;
3445            let identity_ptr = ffi::rocksdb_get_db_identity(self.inner.inner(), &raw mut length);
3446            let identity_vec = raw_data(identity_ptr, length);
3447            ffi::rocksdb_free(identity_ptr as *mut c_void);
3448            // In RocksDB: get_db_identity copies a std::string so it should not fail, but
3449            // the API allows it to be overridden, so it might
3450            identity_vec.ok_or_else(|| Error::new("get_db_identity returned NULL".to_string()))
3451        }
3452    }
3453}
3454
3455impl<I: DBInner> DBCommon<SingleThreaded, I> {
3456    /// Creates column family with given name and options
3457    pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
3458        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3459        self.cfs
3460            .cfs
3461            .insert(name.as_ref().to_string(), ColumnFamily { inner });
3462        Ok(())
3463    }
3464
3465    #[doc = include_str!("db_create_column_family_with_import.md")]
3466    pub fn create_column_family_with_import<N: AsRef<str>>(
3467        &mut self,
3468        options: &Options,
3469        column_family_name: N,
3470        import_options: &ImportColumnFamilyOptions,
3471        metadata: &ExportImportFilesMetaData,
3472    ) -> Result<(), Error> {
3473        let name = column_family_name.as_ref();
3474        let c_name = CString::new(name).map_err(|err| {
3475            Error::new(format!(
3476                "Failed to convert name to CString while importing column family: {err}"
3477            ))
3478        })?;
3479        let inner = unsafe {
3480            ffi_try!(ffi::rocksdb_create_column_family_with_import(
3481                self.inner.inner(),
3482                options.inner,
3483                c_name.as_ptr(),
3484                import_options.inner,
3485                metadata.inner
3486            ))
3487        };
3488        self.cfs
3489            .cfs
3490            .insert(column_family_name.as_ref().into(), ColumnFamily { inner });
3491        Ok(())
3492    }
3493
3494    /// Drops the column family with the given name
3495    pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
3496        match self.cfs.cfs.remove(name) {
3497            Some(cf) => self.drop_column_family(cf.inner, cf),
3498            _ => Err(Error::new(format!("Invalid column family: {name}"))),
3499        }
3500    }
3501
3502    /// Returns the underlying column family handle
3503    pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
3504        self.cfs.cfs.get(name)
3505    }
3506
3507    /// Returns the list of column families currently open.
3508    ///
3509    /// The order of names is unspecified and may vary between calls.
3510    pub fn cf_names(&self) -> Vec<String> {
3511        self.cfs.cfs.keys().cloned().collect()
3512    }
3513}
3514
3515impl<I: DBInner> DBCommon<MultiThreaded, I> {
3516    /// Creates column family with given name and options
3517    pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
3518        // Note that we acquire the cfs lock before inserting: otherwise we might race
3519        // another caller who observed the handle as missing.
3520        let mut cfs = self.cfs.cfs.write();
3521        let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3522        cfs.insert(
3523            name.as_ref().to_string(),
3524            Arc::new(UnboundColumnFamily { inner }),
3525        );
3526        Ok(())
3527    }
3528
3529    #[doc = include_str!("db_create_column_family_with_import.md")]
3530    pub fn create_column_family_with_import<N: AsRef<str>>(
3531        &self,
3532        options: &Options,
3533        column_family_name: N,
3534        import_options: &ImportColumnFamilyOptions,
3535        metadata: &ExportImportFilesMetaData,
3536    ) -> Result<(), Error> {
3537        // Acquire CF lock upfront, before creating the CF, to avoid a race with concurrent creators
3538        let mut cfs = self.cfs.cfs.write();
3539        let name = column_family_name.as_ref();
3540        let c_name = CString::new(name).map_err(|err| {
3541            Error::new(format!(
3542                "Failed to convert name to CString while importing column family: {err}"
3543            ))
3544        })?;
3545        let inner = unsafe {
3546            ffi_try!(ffi::rocksdb_create_column_family_with_import(
3547                self.inner.inner(),
3548                options.inner,
3549                c_name.as_ptr(),
3550                import_options.inner,
3551                metadata.inner
3552            ))
3553        };
3554        cfs.insert(
3555            column_family_name.as_ref().to_string(),
3556            Arc::new(UnboundColumnFamily { inner }),
3557        );
3558        Ok(())
3559    }
3560
3561    /// Drops the column family with the given name by internally locking the inner column
3562    /// family map. This avoids needing `&mut self` reference
3563    pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
3564        match self.cfs.cfs.write().remove(name) {
3565            Some(cf) => self.drop_column_family(cf.inner, cf),
3566            _ => Err(Error::new(format!("Invalid column family: {name}"))),
3567        }
3568    }
3569
3570    /// Returns the underlying column family handle
3571    pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
3572        self.cfs
3573            .cfs
3574            .read()
3575            .get(name)
3576            .cloned()
3577            .map(UnboundColumnFamily::bound_column_family)
3578    }
3579
3580    /// Returns the list of column families currently open.
3581    ///
3582    /// The order of names is unspecified and may vary between calls.
3583    pub fn cf_names(&self) -> Vec<String> {
3584        self.cfs.cfs.read().keys().cloned().collect()
3585    }
3586}
3587
3588impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
3589    fn drop(&mut self) {
3590        self.cfs.drop_all_cfs_internal();
3591    }
3592}
3593
3594impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
3595    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3596        write!(f, "RocksDB {{ path: {} }}", self.path().display())
3597    }
3598}
3599
3600/// The metadata that describes a column family.
3601#[derive(Debug, Clone)]
3602pub struct ColumnFamilyMetaData {
3603    // The size of this column family in bytes, which is equal to the sum of
3604    // the file size of its "levels".
3605    pub size: u64,
3606    // The name of the column family.
3607    pub name: String,
3608    // The number of files in this column family.
3609    pub file_count: usize,
3610}
3611
3612/// The metadata that describes a SST file
3613#[derive(Debug, Clone)]
3614pub struct LiveFile {
3615    /// Name of the column family the file belongs to
3616    pub column_family_name: String,
3617    /// Name of the file
3618    pub name: String,
3619    /// The directory containing the file, without a trailing '/'. This could be
3620    /// a DB path, wal_dir, etc.
3621    pub directory: String,
3622    /// Size of the file
3623    pub size: usize,
3624    /// Level at which this file resides
3625    pub level: i32,
3626    /// Smallest user defined key in the file
3627    pub start_key: Option<Vec<u8>>,
3628    /// Largest user defined key in the file
3629    pub end_key: Option<Vec<u8>>,
3630    pub smallest_seqno: u64,
3631    pub largest_seqno: u64,
3632    /// Number of entries/alive keys in the file
3633    pub num_entries: u64,
3634    /// Number of deletions/tomb key(s) in the file
3635    pub num_deletions: u64,
3636}
3637
3638impl LiveFile {
3639    /// Create a `Vec<LiveFile>` from a `rocksdb_livefiles_t` pointer
3640    pub(crate) fn from_rocksdb_livefiles_ptr(
3641        files: *const ffi::rocksdb_livefiles_t,
3642    ) -> Vec<LiveFile> {
3643        unsafe {
3644            let n = ffi::rocksdb_livefiles_count(files);
3645
3646            let mut livefiles = Vec::with_capacity(n as usize);
3647            let mut key_size: usize = 0;
3648
3649            for i in 0..n {
3650                // rocksdb_livefiles_* returns pointers to strings, not copies
3651                let column_family_name =
3652                    from_cstr_without_free(ffi::rocksdb_livefiles_column_family_name(files, i));
3653                let name = from_cstr_without_free(ffi::rocksdb_livefiles_name(files, i));
3654                let directory = from_cstr_without_free(ffi::rocksdb_livefiles_directory(files, i));
3655                let size = ffi::rocksdb_livefiles_size(files, i);
3656                let level = ffi::rocksdb_livefiles_level(files, i);
3657
3658                // get smallest key inside file
3659                let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &raw mut key_size);
3660                let smallest_key = raw_data(smallest_key, key_size);
3661
3662                // get largest key inside file
3663                let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &raw mut key_size);
3664                let largest_key = raw_data(largest_key, key_size);
3665
3666                livefiles.push(LiveFile {
3667                    column_family_name,
3668                    name,
3669                    directory,
3670                    size,
3671                    level,
3672                    start_key: smallest_key,
3673                    end_key: largest_key,
3674                    largest_seqno: ffi::rocksdb_livefiles_largest_seqno(files, i),
3675                    smallest_seqno: ffi::rocksdb_livefiles_smallest_seqno(files, i),
3676                    num_entries: ffi::rocksdb_livefiles_entries(files, i),
3677                    num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
3678                });
3679            }
3680
3681            livefiles
3682        }
3683    }
3684}
3685
3686struct LiveFileGuard(*mut rocksdb_livefile_t);
3687
3688impl LiveFileGuard {
3689    fn into_raw(mut self) -> *mut rocksdb_livefile_t {
3690        let ptr = self.0;
3691        self.0 = ptr::null_mut();
3692        ptr
3693    }
3694}
3695
3696impl Drop for LiveFileGuard {
3697    fn drop(&mut self) {
3698        if !self.0.is_null() {
3699            unsafe {
3700                rocksdb_livefile_destroy(self.0);
3701            }
3702        }
3703    }
3704}
3705
3706struct LiveFilesGuard(*mut rocksdb_livefiles_t);
3707
3708impl LiveFilesGuard {
3709    fn into_raw(mut self) -> *mut rocksdb_livefiles_t {
3710        let ptr = self.0;
3711        self.0 = ptr::null_mut();
3712        ptr
3713    }
3714}
3715
3716impl Drop for LiveFilesGuard {
3717    fn drop(&mut self) {
3718        if !self.0.is_null() {
3719            unsafe {
3720                rocksdb_livefiles_destroy(self.0);
3721            }
3722        }
3723    }
3724}
3725
3726/// Metadata returned as output from [`Checkpoint::export_column_family`][export_column_family] and
3727/// used as input to [`DB::create_column_family_with_import`].
3728///
3729/// [export_column_family]: crate::checkpoint::Checkpoint::export_column_family
3730#[derive(Debug)]
3731pub struct ExportImportFilesMetaData {
3732    pub(crate) inner: *mut ffi::rocksdb_export_import_files_metadata_t,
3733}
3734
3735impl ExportImportFilesMetaData {
3736    pub fn get_db_comparator_name(&self) -> String {
3737        unsafe {
3738            let c_name =
3739                ffi::rocksdb_export_import_files_metadata_get_db_comparator_name(self.inner);
3740            from_cstr_and_free(c_name)
3741        }
3742    }
3743
3744    pub fn set_db_comparator_name(&mut self, name: &str) {
3745        let c_name = CString::new(name.as_bytes()).unwrap();
3746        unsafe {
3747            ffi::rocksdb_export_import_files_metadata_set_db_comparator_name(
3748                self.inner,
3749                c_name.as_ptr(),
3750            );
3751        };
3752    }
3753
3754    pub fn get_files(&self) -> Vec<LiveFile> {
3755        unsafe {
3756            let livefiles_ptr = ffi::rocksdb_export_import_files_metadata_get_files(self.inner);
3757            let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3758            ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3759            files
3760        }
3761    }
3762
3763    pub fn set_files(&mut self, files: &[LiveFile]) -> Result<(), Error> {
3764        // Use a non-null empty pointer for zero-length keys
3765        static EMPTY: [u8; 0] = [];
3766        let empty_ptr = EMPTY.as_ptr() as *const libc::c_char;
3767
3768        unsafe {
3769            let live_files = LiveFilesGuard(ffi::rocksdb_livefiles_create());
3770
3771            for file in files {
3772                let live_file = LiveFileGuard(ffi::rocksdb_livefile_create());
3773                ffi::rocksdb_livefile_set_level(live_file.0, file.level);
3774
3775                // SAFETY: C strings are copied inside the FFI layer so do not need to be kept alive
3776                let c_cf_name = CString::new(file.column_family_name.as_str()).map_err(|err| {
3777                    Error::new(format!("Unable to convert column family to CString: {err}"))
3778                })?;
3779                ffi::rocksdb_livefile_set_column_family_name(live_file.0, c_cf_name.as_ptr());
3780
3781                let c_name = CString::new(file.name.as_str()).map_err(|err| {
3782                    Error::new(format!("Unable to convert file name to CString: {err}"))
3783                })?;
3784                ffi::rocksdb_livefile_set_name(live_file.0, c_name.as_ptr());
3785
3786                let c_directory = CString::new(file.directory.as_str()).map_err(|err| {
3787                    Error::new(format!("Unable to convert directory to CString: {err}"))
3788                })?;
3789                ffi::rocksdb_livefile_set_directory(live_file.0, c_directory.as_ptr());
3790
3791                ffi::rocksdb_livefile_set_size(live_file.0, file.size);
3792
3793                let (start_key_ptr, start_key_len) = match &file.start_key {
3794                    None => (empty_ptr, 0),
3795                    Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3796                };
3797                ffi::rocksdb_livefile_set_smallest_key(live_file.0, start_key_ptr, start_key_len);
3798
3799                let (largest_key_ptr, largest_key_len) = match &file.end_key {
3800                    None => (empty_ptr, 0),
3801                    Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3802                };
3803                ffi::rocksdb_livefile_set_largest_key(
3804                    live_file.0,
3805                    largest_key_ptr,
3806                    largest_key_len,
3807                );
3808                ffi::rocksdb_livefile_set_smallest_seqno(live_file.0, file.smallest_seqno);
3809                ffi::rocksdb_livefile_set_largest_seqno(live_file.0, file.largest_seqno);
3810                ffi::rocksdb_livefile_set_num_entries(live_file.0, file.num_entries);
3811                ffi::rocksdb_livefile_set_num_deletions(live_file.0, file.num_deletions);
3812
3813                // moves ownership of live_files into live_file
3814                ffi::rocksdb_livefiles_add(live_files.0, live_file.into_raw());
3815            }
3816
3817            // moves ownership of live_files into inner
3818            ffi::rocksdb_export_import_files_metadata_set_files(self.inner, live_files.into_raw());
3819            Ok(())
3820        }
3821    }
3822}
3823
3824impl Default for ExportImportFilesMetaData {
3825    fn default() -> Self {
3826        let inner = unsafe { ffi::rocksdb_export_import_files_metadata_create() };
3827        assert!(
3828            !inner.is_null(),
3829            "Could not create rocksdb_export_import_files_metadata_t"
3830        );
3831
3832        Self { inner }
3833    }
3834}
3835
3836impl Drop for ExportImportFilesMetaData {
3837    fn drop(&mut self) {
3838        unsafe {
3839            ffi::rocksdb_export_import_files_metadata_destroy(self.inner);
3840        }
3841    }
3842}
3843
3844unsafe impl Send for ExportImportFilesMetaData {}
3845unsafe impl Sync for ExportImportFilesMetaData {}
3846
3847fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
3848    opts.iter()
3849        .map(|(name, value)| {
3850            let cname = match CString::new(name.as_bytes()) {
3851                Ok(cname) => cname,
3852                Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
3853            };
3854            let cvalue = match CString::new(value.as_bytes()) {
3855                Ok(cvalue) => cvalue,
3856                Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
3857            };
3858            Ok((cname, cvalue))
3859        })
3860        .collect()
3861}
3862
3863pub(crate) fn convert_values(
3864    values: Vec<*mut c_char>,
3865    values_sizes: Vec<usize>,
3866    errors: Vec<*mut c_char>,
3867) -> Vec<Result<Option<Vec<u8>>, Error>> {
3868    values
3869        .into_iter()
3870        .zip(values_sizes)
3871        .zip(errors)
3872        .map(|((v, s), e)| {
3873            if e.is_null() {
3874                let value = unsafe { crate::ffi_util::raw_data(v, s) };
3875                unsafe {
3876                    ffi::rocksdb_free(v as *mut c_void);
3877                }
3878                Ok(value)
3879            } else {
3880                Err(convert_rocksdb_error(e))
3881            }
3882        })
3883        .collect()
3884}