1use std::cell::RefCell;
17use std::collections::{BTreeMap, HashMap};
18use std::ffi::{CStr, CString};
19use std::fmt;
20use std::fs;
21use std::iter;
22use std::path::Path;
23use std::path::PathBuf;
24use std::ptr;
25use std::slice;
26use std::str;
27use std::sync::Arc;
28use std::time::Duration;
29
30use crate::column_family::ColumnFamilyTtl;
31use crate::ffi_util::CSlice;
32use crate::{
33 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
34 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, DEFAULT_COLUMN_FAMILY_NAME,
35 Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, Options, ReadOptions,
36 SnapshotWithThreadMode, WaitForCompactOptions, WriteBatch, WriteBatchWithIndex, WriteOptions,
37 column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily},
38 db_options::{ImportColumnFamilyOptions, OptionsMustOutliveDB},
39 ffi,
40 ffi_util::{
41 CStrLike, convert_rocksdb_error, from_cstr_and_free, from_cstr_without_free,
42 opt_bytes_to_ptr, raw_data, to_cpath,
43 },
44};
45use rust_librocksdb_sys::{
46 rocksdb_livefile_destroy, rocksdb_livefile_t, rocksdb_livefiles_destroy, rocksdb_livefiles_t,
47};
48
49use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
50use parking_lot::RwLock;
51
52thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
59thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
60thread_local! { static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default(); }
61thread_local! { static PREFIX_READ_OPTS: RefCell<ReadOptions> = RefCell::new({ let mut o = ReadOptions::default(); o.set_prefix_same_as_start(true); o }); }
63
64pub struct Range<'a> {
68 start_key: &'a [u8],
69 end_key: &'a [u8],
70}
71
72impl<'a> Range<'a> {
73 pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
74 Range { start_key, end_key }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
117pub enum GetIntoBufferResult {
118 NotFound,
120 Found(usize),
123 BufferTooSmall(usize),
129}
130
131impl GetIntoBufferResult {
132 #[inline]
134 pub fn is_found(&self) -> bool {
135 matches!(self, Self::Found(_) | Self::BufferTooSmall(_))
136 }
137
138 #[inline]
140 pub fn is_not_found(&self) -> bool {
141 matches!(self, Self::NotFound)
142 }
143
144 #[inline]
146 pub fn value_size(&self) -> Option<usize> {
147 match self {
148 Self::Found(size) | Self::BufferTooSmall(size) => Some(*size),
149 Self::NotFound => None,
150 }
151 }
152}
153
154pub struct PrefixProber<'a, D: DBAccess> {
158 raw: DBRawIteratorWithThreadMode<'a, D>,
159}
160
161impl<D: DBAccess> PrefixProber<'_, D> {
162 pub fn exists(&mut self, prefix: &[u8]) -> Result<bool, Error> {
165 self.raw.seek(prefix);
166 if self.raw.valid()
167 && let Some(k) = self.raw.key()
168 {
169 return Ok(k.starts_with(prefix));
170 }
171 self.raw.status()?;
172 Ok(false)
173 }
174}
175
176pub trait ThreadMode {
187 fn new_cf_map_internal(
189 cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
190 ) -> Self;
191 fn drop_all_cfs_internal(&mut self);
193}
194
195pub struct SingleThreaded {
202 pub(crate) cfs: HashMap<String, ColumnFamily>,
203}
204
205pub struct MultiThreaded {
211 pub(crate) cfs: RwLock<HashMap<String, Arc<UnboundColumnFamily>>>,
212}
213
214impl ThreadMode for SingleThreaded {
215 fn new_cf_map_internal(
216 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
217 ) -> Self {
218 Self {
219 cfs: cfs
220 .into_iter()
221 .map(|(n, c)| (n, ColumnFamily { inner: c }))
222 .collect(),
223 }
224 }
225
226 fn drop_all_cfs_internal(&mut self) {
227 self.cfs.clear();
229 }
230}
231
232impl ThreadMode for MultiThreaded {
233 fn new_cf_map_internal(
234 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
235 ) -> Self {
236 Self {
237 cfs: RwLock::new(
238 cfs.into_iter()
239 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
240 .collect(),
241 ),
242 }
243 }
244
245 fn drop_all_cfs_internal(&mut self) {
246 self.cfs.write().clear();
248 }
249}
250
251pub trait DBInner {
253 fn inner(&self) -> *mut ffi::rocksdb_t;
254}
255
256pub struct DBCommon<T: ThreadMode, D: DBInner> {
261 pub(crate) inner: D,
262 cfs: T, path: PathBuf,
264 _outlive: Vec<OptionsMustOutliveDB>,
265}
266
267pub trait DBAccess {
270 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
271
272 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
273
274 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
275
276 unsafe fn create_iterator_cf(
277 &self,
278 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
279 readopts: &ReadOptions,
280 ) -> *mut ffi::rocksdb_iterator_t;
281
282 fn get_opt<K: AsRef<[u8]>>(
283 &self,
284 key: K,
285 readopts: &ReadOptions,
286 ) -> Result<Option<Vec<u8>>, Error>;
287
288 fn get_cf_opt<K: AsRef<[u8]>>(
289 &self,
290 cf: &impl AsColumnFamilyRef,
291 key: K,
292 readopts: &ReadOptions,
293 ) -> Result<Option<Vec<u8>>, Error>;
294
295 fn get_pinned_opt<K: AsRef<[u8]>>(
296 &'_ self,
297 key: K,
298 readopts: &ReadOptions,
299 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
300
301 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
302 &'_ self,
303 cf: &impl AsColumnFamilyRef,
304 key: K,
305 readopts: &ReadOptions,
306 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
307
308 fn multi_get_opt<K, I>(
309 &self,
310 keys: I,
311 readopts: &ReadOptions,
312 ) -> Vec<Result<Option<Vec<u8>>, Error>>
313 where
314 K: AsRef<[u8]>,
315 I: IntoIterator<Item = K>;
316
317 fn multi_get_cf_opt<'b, K, I, W>(
318 &self,
319 keys_cf: I,
320 readopts: &ReadOptions,
321 ) -> Vec<Result<Option<Vec<u8>>, Error>>
322 where
323 K: AsRef<[u8]>,
324 I: IntoIterator<Item = (&'b W, K)>,
325 W: AsColumnFamilyRef + 'b;
326}
327
328impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
329 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
330 unsafe { ffi::rocksdb_create_snapshot(self.inner.inner()) }
331 }
332
333 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
334 unsafe {
335 ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
336 }
337 }
338
339 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
340 unsafe { ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner) }
341 }
342
343 unsafe fn create_iterator_cf(
344 &self,
345 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
346 readopts: &ReadOptions,
347 ) -> *mut ffi::rocksdb_iterator_t {
348 unsafe { ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle) }
349 }
350
351 fn get_opt<K: AsRef<[u8]>>(
352 &self,
353 key: K,
354 readopts: &ReadOptions,
355 ) -> Result<Option<Vec<u8>>, Error> {
356 self.get_opt(key, readopts)
357 }
358
359 fn get_cf_opt<K: AsRef<[u8]>>(
360 &self,
361 cf: &impl AsColumnFamilyRef,
362 key: K,
363 readopts: &ReadOptions,
364 ) -> Result<Option<Vec<u8>>, Error> {
365 self.get_cf_opt(cf, key, readopts)
366 }
367
368 fn get_pinned_opt<K: AsRef<[u8]>>(
369 &'_ self,
370 key: K,
371 readopts: &ReadOptions,
372 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
373 self.get_pinned_opt(key, readopts)
374 }
375
376 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
377 &'_ self,
378 cf: &impl AsColumnFamilyRef,
379 key: K,
380 readopts: &ReadOptions,
381 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
382 self.get_pinned_cf_opt(cf, key, readopts)
383 }
384
385 fn multi_get_opt<K, Iter>(
386 &self,
387 keys: Iter,
388 readopts: &ReadOptions,
389 ) -> Vec<Result<Option<Vec<u8>>, Error>>
390 where
391 K: AsRef<[u8]>,
392 Iter: IntoIterator<Item = K>,
393 {
394 self.multi_get_opt(keys, readopts)
395 }
396
397 fn multi_get_cf_opt<'b, K, Iter, W>(
398 &self,
399 keys_cf: Iter,
400 readopts: &ReadOptions,
401 ) -> Vec<Result<Option<Vec<u8>>, Error>>
402 where
403 K: AsRef<[u8]>,
404 Iter: IntoIterator<Item = (&'b W, K)>,
405 W: AsColumnFamilyRef + 'b,
406 {
407 self.multi_get_cf_opt(keys_cf, readopts)
408 }
409}
410
411pub struct DBWithThreadModeInner {
412 inner: *mut ffi::rocksdb_t,
413}
414
415impl DBInner for DBWithThreadModeInner {
416 fn inner(&self) -> *mut ffi::rocksdb_t {
417 self.inner
418 }
419}
420
421impl Drop for DBWithThreadModeInner {
422 fn drop(&mut self) {
423 unsafe {
424 ffi::rocksdb_close(self.inner);
425 }
426 }
427}
428
429pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
434
435#[cfg(not(feature = "multi-threaded-cf"))]
458pub type DB = DBWithThreadMode<SingleThreaded>;
459
460#[cfg(feature = "multi-threaded-cf")]
461pub type DB = DBWithThreadMode<MultiThreaded>;
462
463unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
467
468unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
471
472enum AccessType<'a> {
474 ReadWrite,
475 ReadOnly { error_if_log_file_exist: bool },
476 Secondary { secondary_path: &'a Path },
477 WithTTL { ttl: Duration },
478}
479
480impl<T: ThreadMode> DBWithThreadMode<T> {
482 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
484 let mut opts = Options::default();
485 opts.create_if_missing(true);
486 Self::open(&opts, path)
487 }
488
489 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
491 Self::open_cf(opts, path, None::<&str>)
492 }
493
494 pub fn open_for_read_only<P: AsRef<Path>>(
496 opts: &Options,
497 path: P,
498 error_if_log_file_exist: bool,
499 ) -> Result<Self, Error> {
500 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
501 }
502
503 pub fn open_as_secondary<P: AsRef<Path>>(
505 opts: &Options,
506 primary_path: P,
507 secondary_path: P,
508 ) -> Result<Self, Error> {
509 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
510 }
511
512 pub fn open_with_ttl<P: AsRef<Path>>(
517 opts: &Options,
518 path: P,
519 ttl: Duration,
520 ) -> Result<Self, Error> {
521 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
522 }
523
524 pub fn open_cf_with_ttl<P, I, N>(
528 opts: &Options,
529 path: P,
530 cfs: I,
531 ttl: Duration,
532 ) -> Result<Self, Error>
533 where
534 P: AsRef<Path>,
535 I: IntoIterator<Item = N>,
536 N: AsRef<str>,
537 {
538 let cfs = cfs
539 .into_iter()
540 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
541
542 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
543 }
544
545 pub fn open_cf_descriptors_with_ttl<P, I>(
559 opts: &Options,
560 path: P,
561 cfs: I,
562 ttl: Duration,
563 ) -> Result<Self, Error>
564 where
565 P: AsRef<Path>,
566 I: IntoIterator<Item = ColumnFamilyDescriptor>,
567 {
568 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
569 }
570
571 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
575 where
576 P: AsRef<Path>,
577 I: IntoIterator<Item = N>,
578 N: AsRef<str>,
579 {
580 let cfs = cfs
581 .into_iter()
582 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
583
584 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
585 }
586
587 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
591 where
592 P: AsRef<Path>,
593 I: IntoIterator<Item = (N, Options)>,
594 N: AsRef<str>,
595 {
596 let cfs = cfs
597 .into_iter()
598 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
599
600 Self::open_cf_descriptors(opts, path, cfs)
601 }
602
603 pub fn open_cf_for_read_only<P, I, N>(
607 opts: &Options,
608 path: P,
609 cfs: I,
610 error_if_log_file_exist: bool,
611 ) -> Result<Self, Error>
612 where
613 P: AsRef<Path>,
614 I: IntoIterator<Item = N>,
615 N: AsRef<str>,
616 {
617 let cfs = cfs
618 .into_iter()
619 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
620
621 Self::open_cf_descriptors_internal(
622 opts,
623 path,
624 cfs,
625 &AccessType::ReadOnly {
626 error_if_log_file_exist,
627 },
628 )
629 }
630
631 pub fn open_cf_with_opts_for_read_only<P, I, N>(
635 db_opts: &Options,
636 path: P,
637 cfs: I,
638 error_if_log_file_exist: bool,
639 ) -> Result<Self, Error>
640 where
641 P: AsRef<Path>,
642 I: IntoIterator<Item = (N, Options)>,
643 N: AsRef<str>,
644 {
645 let cfs = cfs
646 .into_iter()
647 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
648
649 Self::open_cf_descriptors_internal(
650 db_opts,
651 path,
652 cfs,
653 &AccessType::ReadOnly {
654 error_if_log_file_exist,
655 },
656 )
657 }
658
659 pub fn open_cf_descriptors_read_only<P, I>(
664 opts: &Options,
665 path: P,
666 cfs: I,
667 error_if_log_file_exist: bool,
668 ) -> Result<Self, Error>
669 where
670 P: AsRef<Path>,
671 I: IntoIterator<Item = ColumnFamilyDescriptor>,
672 {
673 Self::open_cf_descriptors_internal(
674 opts,
675 path,
676 cfs,
677 &AccessType::ReadOnly {
678 error_if_log_file_exist,
679 },
680 )
681 }
682
683 pub fn open_cf_as_secondary<P, I, N>(
687 opts: &Options,
688 primary_path: P,
689 secondary_path: P,
690 cfs: I,
691 ) -> Result<Self, Error>
692 where
693 P: AsRef<Path>,
694 I: IntoIterator<Item = N>,
695 N: AsRef<str>,
696 {
697 let cfs = cfs
698 .into_iter()
699 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
700
701 Self::open_cf_descriptors_internal(
702 opts,
703 primary_path,
704 cfs,
705 &AccessType::Secondary {
706 secondary_path: secondary_path.as_ref(),
707 },
708 )
709 }
710
711 pub fn open_cf_descriptors_as_secondary<P, I>(
716 opts: &Options,
717 path: P,
718 secondary_path: P,
719 cfs: I,
720 ) -> Result<Self, Error>
721 where
722 P: AsRef<Path>,
723 I: IntoIterator<Item = ColumnFamilyDescriptor>,
724 {
725 Self::open_cf_descriptors_internal(
726 opts,
727 path,
728 cfs,
729 &AccessType::Secondary {
730 secondary_path: secondary_path.as_ref(),
731 },
732 )
733 }
734
735 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
739 where
740 P: AsRef<Path>,
741 I: IntoIterator<Item = ColumnFamilyDescriptor>,
742 {
743 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
744 }
745
746 fn open_cf_descriptors_internal<P, I>(
748 opts: &Options,
749 path: P,
750 cfs: I,
751 access_type: &AccessType,
752 ) -> Result<Self, Error>
753 where
754 P: AsRef<Path>,
755 I: IntoIterator<Item = ColumnFamilyDescriptor>,
756 {
757 let cfs: Vec<_> = cfs.into_iter().collect();
758 let outlive = iter::once(opts.outlive.clone())
759 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
760 .collect();
761
762 let cpath = to_cpath(&path)?;
763
764 if let Err(e) = fs::create_dir_all(&path) {
765 return Err(Error::new(format!(
766 "Failed to create RocksDB directory: `{e:?}`."
767 )));
768 }
769
770 let db: *mut ffi::rocksdb_t;
771 let mut cf_map = BTreeMap::new();
772
773 if cfs.is_empty() {
774 db = Self::open_raw(opts, &cpath, access_type)?;
775 } else {
776 let mut cfs_v = cfs;
777 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
779 cfs_v.push(ColumnFamilyDescriptor {
780 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
781 options: Options::default(),
782 ttl: ColumnFamilyTtl::SameAsDb,
783 });
784 }
785 let c_cfs: Vec<CString> = cfs_v
788 .iter()
789 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
790 .collect();
791
792 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
793
794 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
796
797 let cfopts: Vec<_> = cfs_v
798 .iter()
799 .map(|cf| cf.options.inner.cast_const())
800 .collect();
801
802 db = Self::open_cf_raw(
803 opts,
804 &cpath,
805 &cfs_v,
806 &cfnames,
807 &cfopts,
808 &mut cfhandles,
809 access_type,
810 )?;
811 for handle in &cfhandles {
812 if handle.is_null() {
813 return Err(Error::new(
814 "Received null column family handle from DB.".to_owned(),
815 ));
816 }
817 }
818
819 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
820 cf_map.insert(cf_desc.name.clone(), inner);
821 }
822 }
823
824 if db.is_null() {
825 return Err(Error::new("Could not initialize database.".to_owned()));
826 }
827
828 Ok(Self {
829 inner: DBWithThreadModeInner { inner: db },
830 path: path.as_ref().to_path_buf(),
831 cfs: T::new_cf_map_internal(cf_map),
832 _outlive: outlive,
833 })
834 }
835
836 fn open_raw(
837 opts: &Options,
838 cpath: &CString,
839 access_type: &AccessType,
840 ) -> Result<*mut ffi::rocksdb_t, Error> {
841 let db = unsafe {
842 match *access_type {
843 AccessType::ReadOnly {
844 error_if_log_file_exist,
845 } => ffi_try!(ffi::rocksdb_open_for_read_only(
846 opts.inner,
847 cpath.as_ptr(),
848 c_uchar::from(error_if_log_file_exist),
849 )),
850 AccessType::ReadWrite => {
851 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
852 }
853 AccessType::Secondary { secondary_path } => {
854 ffi_try!(ffi::rocksdb_open_as_secondary(
855 opts.inner,
856 cpath.as_ptr(),
857 to_cpath(secondary_path)?.as_ptr(),
858 ))
859 }
860 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
861 opts.inner,
862 cpath.as_ptr(),
863 ttl.as_secs() as c_int,
864 )),
865 }
866 };
867 Ok(db)
868 }
869
870 #[allow(clippy::pedantic)]
871 fn open_cf_raw(
872 opts: &Options,
873 cpath: &CString,
874 cfs_v: &[ColumnFamilyDescriptor],
875 cfnames: &[*const c_char],
876 cfopts: &[*const ffi::rocksdb_options_t],
877 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
878 access_type: &AccessType,
879 ) -> Result<*mut ffi::rocksdb_t, Error> {
880 let db = unsafe {
881 match *access_type {
882 AccessType::ReadOnly {
883 error_if_log_file_exist,
884 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
885 opts.inner,
886 cpath.as_ptr(),
887 cfs_v.len() as c_int,
888 cfnames.as_ptr(),
889 cfopts.as_ptr(),
890 cfhandles.as_mut_ptr(),
891 c_uchar::from(error_if_log_file_exist),
892 )),
893 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
894 opts.inner,
895 cpath.as_ptr(),
896 cfs_v.len() as c_int,
897 cfnames.as_ptr(),
898 cfopts.as_ptr(),
899 cfhandles.as_mut_ptr(),
900 )),
901 AccessType::Secondary { secondary_path } => {
902 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
903 opts.inner,
904 cpath.as_ptr(),
905 to_cpath(secondary_path)?.as_ptr(),
906 cfs_v.len() as c_int,
907 cfnames.as_ptr(),
908 cfopts.as_ptr(),
909 cfhandles.as_mut_ptr(),
910 ))
911 }
912 AccessType::WithTTL { ttl } => {
913 let ttls: Vec<_> = cfs_v
914 .iter()
915 .map(|cf| match cf.ttl {
916 ColumnFamilyTtl::Disabled => i32::MAX,
917 ColumnFamilyTtl::Duration(duration) => duration.as_secs() as i32,
918 ColumnFamilyTtl::SameAsDb => ttl.as_secs() as i32,
919 })
920 .collect();
921
922 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
923 opts.inner,
924 cpath.as_ptr(),
925 cfs_v.len() as c_int,
926 cfnames.as_ptr(),
927 cfopts.as_ptr(),
928 cfhandles.as_mut_ptr(),
929 ttls.as_ptr(),
930 ))
931 }
932 }
933 };
934 Ok(db)
935 }
936
937 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
939 &self,
940 cf: &impl AsColumnFamilyRef,
941 from: K,
942 to: K,
943 writeopts: &WriteOptions,
944 ) -> Result<(), Error> {
945 let from = from.as_ref();
946 let to = to.as_ref();
947
948 unsafe {
949 ffi_try!(ffi::rocksdb_delete_range_cf(
950 self.inner.inner(),
951 writeopts.inner,
952 cf.inner(),
953 from.as_ptr() as *const c_char,
954 from.len() as size_t,
955 to.as_ptr() as *const c_char,
956 to.len() as size_t,
957 ));
958 Ok(())
959 }
960 }
961
962 pub fn delete_range_cf<K: AsRef<[u8]>>(
964 &self,
965 cf: &impl AsColumnFamilyRef,
966 from: K,
967 to: K,
968 ) -> Result<(), Error> {
969 DEFAULT_WRITE_OPTS.with(|opts| self.delete_range_cf_opt(cf, from, to, opts))
970 }
971
972 pub fn write_opt(&self, batch: &WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
973 unsafe {
974 ffi_try!(ffi::rocksdb_write(
975 self.inner.inner(),
976 writeopts.inner,
977 batch.inner
978 ));
979 }
980 Ok(())
981 }
982
983 pub fn write(&self, batch: &WriteBatch) -> Result<(), Error> {
984 DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
985 }
986
987 pub fn write_without_wal(&self, batch: &WriteBatch) -> Result<(), Error> {
988 let mut wo = WriteOptions::new();
989 wo.disable_wal(true);
990 self.write_opt(batch, &wo)
991 }
992
993 pub fn write_wbwi(&self, wbwi: &WriteBatchWithIndex) -> Result<(), Error> {
994 DEFAULT_WRITE_OPTS.with(|opts| self.write_wbwi_opt(wbwi, opts))
995 }
996
997 pub fn write_wbwi_opt(
998 &self,
999 wbwi: &WriteBatchWithIndex,
1000 writeopts: &WriteOptions,
1001 ) -> Result<(), Error> {
1002 unsafe {
1003 ffi_try!(ffi::rocksdb_write_writebatch_wi(
1004 self.inner.inner(),
1005 writeopts.inner,
1006 wbwi.inner
1007 ));
1008
1009 Ok(())
1010 }
1011 }
1012
1013 pub fn disable_file_deletions(&self) -> Result<(), Error> {
1018 unsafe {
1019 ffi_try!(ffi::rocksdb_disable_file_deletions(self.inner.inner()));
1020 }
1021 Ok(())
1022 }
1023
1024 pub fn enable_file_deletions(&self) -> Result<(), Error> {
1036 unsafe {
1037 ffi_try!(ffi::rocksdb_enable_file_deletions(self.inner.inner()));
1038 }
1039 Ok(())
1040 }
1041}
1042
1043impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
1045 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
1046 Self {
1047 inner,
1048 cfs,
1049 path,
1050 _outlive: outlive,
1051 }
1052 }
1053
1054 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
1055 let cpath = to_cpath(path)?;
1056 let mut length = 0;
1057
1058 unsafe {
1059 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
1060 opts.inner,
1061 cpath.as_ptr(),
1062 &raw mut length,
1063 ));
1064
1065 let vec = slice::from_raw_parts(ptr, length)
1066 .iter()
1067 .map(|ptr| from_cstr_without_free(*ptr))
1068 .collect();
1069 ffi::rocksdb_list_column_families_destroy(ptr, length);
1070 Ok(vec)
1071 }
1072 }
1073
1074 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1075 let cpath = to_cpath(path)?;
1076 unsafe {
1077 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
1078 }
1079 Ok(())
1080 }
1081
1082 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1083 let cpath = to_cpath(path)?;
1084 unsafe {
1085 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
1086 }
1087 Ok(())
1088 }
1089
1090 pub fn path(&self) -> &Path {
1091 self.path.as_path()
1092 }
1093
1094 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
1097 unsafe {
1098 ffi_try!(ffi::rocksdb_flush_wal(
1099 self.inner.inner(),
1100 c_uchar::from(sync)
1101 ));
1102 }
1103 Ok(())
1104 }
1105
1106 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
1108 unsafe {
1109 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
1110 }
1111 Ok(())
1112 }
1113
1114 pub fn flush(&self) -> Result<(), Error> {
1116 self.flush_opt(&FlushOptions::default())
1117 }
1118
1119 pub fn flush_cf_opt(
1121 &self,
1122 cf: &impl AsColumnFamilyRef,
1123 flushopts: &FlushOptions,
1124 ) -> Result<(), Error> {
1125 unsafe {
1126 ffi_try!(ffi::rocksdb_flush_cf(
1127 self.inner.inner(),
1128 flushopts.inner,
1129 cf.inner()
1130 ));
1131 }
1132 Ok(())
1133 }
1134
1135 pub fn flush_cfs_opt(
1141 &self,
1142 cfs: &[&impl AsColumnFamilyRef],
1143 opts: &FlushOptions,
1144 ) -> Result<(), Error> {
1145 let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
1146 unsafe {
1147 ffi_try!(ffi::rocksdb_flush_cfs(
1148 self.inner.inner(),
1149 opts.inner,
1150 cfs.as_mut_ptr(),
1151 cfs.len() as libc::c_int,
1152 ));
1153 }
1154 Ok(())
1155 }
1156
1157 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
1160 DEFAULT_FLUSH_OPTS.with(|opts| self.flush_cf_opt(cf, opts))
1161 }
1162
1163 pub fn get_opt<K: AsRef<[u8]>>(
1167 &self,
1168 key: K,
1169 readopts: &ReadOptions,
1170 ) -> Result<Option<Vec<u8>>, Error> {
1171 self.get_pinned_opt(key, readopts)
1172 .map(|x| x.map(|v| v.as_ref().to_vec()))
1173 }
1174
1175 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
1179 DEFAULT_READ_OPTS.with(|opts| self.get_opt(key.as_ref(), opts))
1180 }
1181
1182 pub fn get_cf_opt<K: AsRef<[u8]>>(
1186 &self,
1187 cf: &impl AsColumnFamilyRef,
1188 key: K,
1189 readopts: &ReadOptions,
1190 ) -> Result<Option<Vec<u8>>, Error> {
1191 self.get_pinned_cf_opt(cf, key, readopts)
1192 .map(|x| x.map(|v| v.as_ref().to_vec()))
1193 }
1194
1195 pub fn get_cf<K: AsRef<[u8]>>(
1199 &self,
1200 cf: &impl AsColumnFamilyRef,
1201 key: K,
1202 ) -> Result<Option<Vec<u8>>, Error> {
1203 DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key.as_ref(), opts))
1204 }
1205
1206 pub fn get_pinned_opt<K: AsRef<[u8]>>(
1209 &'_ self,
1210 key: K,
1211 readopts: &ReadOptions,
1212 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1213 if readopts.inner.is_null() {
1214 return Err(Error::new(
1215 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1216 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1217 .to_owned(),
1218 ));
1219 }
1220
1221 let key = key.as_ref();
1222 unsafe {
1223 let val = ffi_try!(ffi::rocksdb_get_pinned(
1224 self.inner.inner(),
1225 readopts.inner,
1226 key.as_ptr() as *const c_char,
1227 key.len() as size_t,
1228 ));
1229 if val.is_null() {
1230 Ok(None)
1231 } else {
1232 Ok(Some(DBPinnableSlice::from_c(val)))
1233 }
1234 }
1235 }
1236
1237 pub fn get_pinned<K: AsRef<[u8]>>(
1241 &'_ self,
1242 key: K,
1243 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1244 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
1245 }
1246
1247 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1251 &'_ self,
1252 cf: &impl AsColumnFamilyRef,
1253 key: K,
1254 readopts: &ReadOptions,
1255 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1256 if readopts.inner.is_null() {
1257 return Err(Error::new(
1258 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1259 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1260 .to_owned(),
1261 ));
1262 }
1263
1264 let key = key.as_ref();
1265 unsafe {
1266 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1267 self.inner.inner(),
1268 readopts.inner,
1269 cf.inner(),
1270 key.as_ptr() as *const c_char,
1271 key.len() as size_t,
1272 ));
1273 if val.is_null() {
1274 Ok(None)
1275 } else {
1276 Ok(Some(DBPinnableSlice::from_c(val)))
1277 }
1278 }
1279 }
1280
1281 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1285 &'_ self,
1286 cf: &impl AsColumnFamilyRef,
1287 key: K,
1288 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1289 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
1290 }
1291
1292 pub fn get_into_buffer<K: AsRef<[u8]>>(
1362 &self,
1363 key: K,
1364 buffer: &mut [u8],
1365 ) -> Result<GetIntoBufferResult, Error> {
1366 DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_opt(key, buffer, opts))
1367 }
1368
1369 pub fn get_into_buffer_opt<K: AsRef<[u8]>>(
1376 &self,
1377 key: K,
1378 buffer: &mut [u8],
1379 readopts: &ReadOptions,
1380 ) -> Result<GetIntoBufferResult, Error> {
1381 if readopts.inner.is_null() {
1382 return Err(Error::new(
1383 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1384 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1385 .to_owned(),
1386 ));
1387 }
1388
1389 let key = key.as_ref();
1390 let mut val_len: size_t = 0;
1391 let mut found: c_uchar = 0;
1392
1393 unsafe {
1394 let success = ffi_try!(ffi::rocksdb_get_into_buffer(
1395 self.inner.inner(),
1396 readopts.inner,
1397 key.as_ptr() as *const c_char,
1398 key.len() as size_t,
1399 buffer.as_mut_ptr() as *mut c_char,
1400 buffer.len() as size_t,
1401 &raw mut val_len,
1402 &raw mut found,
1403 ));
1404
1405 if found == 0 {
1406 Ok(GetIntoBufferResult::NotFound)
1407 } else if success != 0 {
1408 Ok(GetIntoBufferResult::Found(val_len))
1409 } else {
1410 Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1411 }
1412 }
1413 }
1414
1415 pub fn get_into_buffer_cf<K: AsRef<[u8]>>(
1426 &self,
1427 cf: &impl AsColumnFamilyRef,
1428 key: K,
1429 buffer: &mut [u8],
1430 ) -> Result<GetIntoBufferResult, Error> {
1431 DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_cf_opt(cf, key, buffer, opts))
1432 }
1433
1434 pub fn get_into_buffer_cf_opt<K: AsRef<[u8]>>(
1440 &self,
1441 cf: &impl AsColumnFamilyRef,
1442 key: K,
1443 buffer: &mut [u8],
1444 readopts: &ReadOptions,
1445 ) -> Result<GetIntoBufferResult, Error> {
1446 if readopts.inner.is_null() {
1447 return Err(Error::new(
1448 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1449 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1450 .to_owned(),
1451 ));
1452 }
1453
1454 let key = key.as_ref();
1455 let mut val_len: size_t = 0;
1456 let mut found: c_uchar = 0;
1457
1458 unsafe {
1459 let success = ffi_try!(ffi::rocksdb_get_into_buffer_cf(
1460 self.inner.inner(),
1461 readopts.inner,
1462 cf.inner(),
1463 key.as_ptr() as *const c_char,
1464 key.len() as size_t,
1465 buffer.as_mut_ptr() as *mut c_char,
1466 buffer.len() as size_t,
1467 &raw mut val_len,
1468 &raw mut found,
1469 ));
1470
1471 if found == 0 {
1472 Ok(GetIntoBufferResult::NotFound)
1473 } else if success != 0 {
1474 Ok(GetIntoBufferResult::Found(val_len))
1475 } else {
1476 Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1477 }
1478 }
1479 }
1480
1481 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1483 where
1484 K: AsRef<[u8]>,
1485 I: IntoIterator<Item = K>,
1486 {
1487 DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
1488 }
1489
1490 pub fn multi_get_opt<K, I>(
1492 &self,
1493 keys: I,
1494 readopts: &ReadOptions,
1495 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1496 where
1497 K: AsRef<[u8]>,
1498 I: IntoIterator<Item = K>,
1499 {
1500 let owned_keys: Vec<K> = keys.into_iter().collect();
1501 let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
1502 let ptr_keys: Vec<*const c_char> = owned_keys
1503 .iter()
1504 .map(|k| k.as_ref().as_ptr() as *const c_char)
1505 .collect();
1506
1507 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1508 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1509 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1510 unsafe {
1511 ffi::rocksdb_multi_get(
1512 self.inner.inner(),
1513 readopts.inner,
1514 ptr_keys.len(),
1515 ptr_keys.as_ptr(),
1516 keys_sizes.as_ptr(),
1517 values.as_mut_ptr(),
1518 values_sizes.as_mut_ptr(),
1519 errors.as_mut_ptr(),
1520 );
1521 }
1522
1523 unsafe {
1524 values.set_len(ptr_keys.len());
1525 values_sizes.set_len(ptr_keys.len());
1526 errors.set_len(ptr_keys.len());
1527 }
1528
1529 convert_values(values, values_sizes, errors)
1530 }
1531
1532 pub fn multi_get_pinned<K, I>(
1537 &'_ self,
1538 keys: I,
1539 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1540 where
1541 K: AsRef<[u8]>,
1542 I: IntoIterator<Item = K>,
1543 {
1544 DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_opt(keys, opts))
1545 }
1546
1547 pub fn multi_get_pinned_opt<K, I>(
1552 &'_ self,
1553 keys: I,
1554 readopts: &ReadOptions,
1555 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1556 where
1557 K: AsRef<[u8]>,
1558 I: IntoIterator<Item = K>,
1559 {
1560 keys.into_iter()
1561 .map(|k| self.get_pinned_opt(k, readopts))
1562 .collect()
1563 }
1564
1565 pub fn multi_get_pinned_cf<'a, 'b: 'a, K, I, W>(
1568 &'a self,
1569 keys: I,
1570 ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1571 where
1572 K: AsRef<[u8]>,
1573 I: IntoIterator<Item = (&'b W, K)>,
1574 W: 'b + AsColumnFamilyRef,
1575 {
1576 DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_cf_opt(keys, opts))
1577 }
1578
1579 pub fn multi_get_pinned_cf_opt<'a, 'b: 'a, K, I, W>(
1582 &'a self,
1583 keys: I,
1584 readopts: &ReadOptions,
1585 ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1586 where
1587 K: AsRef<[u8]>,
1588 I: IntoIterator<Item = (&'b W, K)>,
1589 W: 'b + AsColumnFamilyRef,
1590 {
1591 keys.into_iter()
1592 .map(|(cf, k)| self.get_pinned_cf_opt(cf, k, readopts))
1593 .collect()
1594 }
1595
1596 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1598 &'a self,
1599 keys: I,
1600 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1601 where
1602 K: AsRef<[u8]>,
1603 I: IntoIterator<Item = (&'b W, K)>,
1604 W: 'b + AsColumnFamilyRef,
1605 {
1606 DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
1607 }
1608
1609 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1611 &'a self,
1612 keys: I,
1613 readopts: &ReadOptions,
1614 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1615 where
1616 K: AsRef<[u8]>,
1617 I: IntoIterator<Item = (&'b W, K)>,
1618 W: 'b + AsColumnFamilyRef,
1619 {
1620 let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
1621 let keys_sizes: Vec<usize> = cfs_and_owned_keys
1622 .iter()
1623 .map(|(_, k)| k.as_ref().len())
1624 .collect();
1625 let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
1626 .iter()
1627 .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
1628 .collect();
1629 let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
1630 .iter()
1631 .map(|(c, _)| c.inner().cast_const())
1632 .collect();
1633 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1634 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1635 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1636 unsafe {
1637 ffi::rocksdb_multi_get_cf(
1638 self.inner.inner(),
1639 readopts.inner,
1640 ptr_cfs.as_ptr(),
1641 ptr_keys.len(),
1642 ptr_keys.as_ptr(),
1643 keys_sizes.as_ptr(),
1644 values.as_mut_ptr(),
1645 values_sizes.as_mut_ptr(),
1646 errors.as_mut_ptr(),
1647 );
1648 }
1649
1650 unsafe {
1651 values.set_len(ptr_keys.len());
1652 values_sizes.set_len(ptr_keys.len());
1653 errors.set_len(ptr_keys.len());
1654 }
1655
1656 convert_values(values, values_sizes, errors)
1657 }
1658
1659 pub fn batched_multi_get_cf<'a, K, I>(
1663 &'_ self,
1664 cf: &impl AsColumnFamilyRef,
1665 keys: I,
1666 sorted_input: bool,
1667 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1668 where
1669 K: AsRef<[u8]> + 'a + ?Sized,
1670 I: IntoIterator<Item = &'a K>,
1671 {
1672 DEFAULT_READ_OPTS.with(|opts| self.batched_multi_get_cf_opt(cf, keys, sorted_input, opts))
1673 }
1674
1675 pub fn batched_multi_get_cf_opt<'a, K, I>(
1679 &'_ self,
1680 cf: &impl AsColumnFamilyRef,
1681 keys: I,
1682 sorted_input: bool,
1683 readopts: &ReadOptions,
1684 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1685 where
1686 K: AsRef<[u8]> + 'a + ?Sized,
1687 I: IntoIterator<Item = &'a K>,
1688 {
1689 let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1690 .into_iter()
1691 .map(|k| {
1692 let k = k.as_ref();
1693 (k.as_ptr() as *const c_char, k.len())
1694 })
1695 .unzip();
1696
1697 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1698 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1699
1700 unsafe {
1701 ffi::rocksdb_batched_multi_get_cf(
1702 self.inner.inner(),
1703 readopts.inner,
1704 cf.inner(),
1705 ptr_keys.len(),
1706 ptr_keys.as_ptr(),
1707 keys_sizes.as_ptr(),
1708 pinned_values.as_mut_ptr(),
1709 errors.as_mut_ptr(),
1710 sorted_input,
1711 );
1712 pinned_values
1713 .into_iter()
1714 .zip(errors)
1715 .map(|(v, e)| {
1716 if e.is_null() {
1717 if v.is_null() {
1718 Ok(None)
1719 } else {
1720 Ok(Some(DBPinnableSlice::from_c(v)))
1721 }
1722 } else {
1723 Err(convert_rocksdb_error(e))
1724 }
1725 })
1726 .collect()
1727 }
1728 }
1729
1730 pub fn batched_multi_get_cf_slice<'a, K, I>(
1787 &'_ self,
1788 cf: &impl AsColumnFamilyRef,
1789 keys: I,
1790 sorted_input: bool,
1791 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1792 where
1793 K: AsRef<[u8]> + 'a + ?Sized,
1794 I: IntoIterator<Item = &'a K>,
1795 {
1796 DEFAULT_READ_OPTS
1797 .with(|opts| self.batched_multi_get_cf_slice_opt(cf, keys, sorted_input, opts))
1798 }
1799
1800 pub fn batched_multi_get_cf_slice_opt<'a, K, I>(
1808 &'_ self,
1809 cf: &impl AsColumnFamilyRef,
1810 keys: I,
1811 sorted_input: bool,
1812 readopts: &ReadOptions,
1813 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1814 where
1815 K: AsRef<[u8]> + 'a + ?Sized,
1816 I: IntoIterator<Item = &'a K>,
1817 {
1818 let slices: Vec<ffi::rocksdb_slice_t> = keys
1820 .into_iter()
1821 .map(|k| {
1822 let k = k.as_ref();
1823 ffi::rocksdb_slice_t {
1824 data: k.as_ptr() as *const c_char,
1825 size: k.len(),
1826 }
1827 })
1828 .collect();
1829
1830 if slices.is_empty() {
1831 return Vec::new();
1832 }
1833
1834 let mut pinned_values = vec![ptr::null_mut(); slices.len()];
1835 let mut errors = vec![ptr::null_mut(); slices.len()];
1836
1837 unsafe {
1838 ffi::rocksdb_batched_multi_get_cf_slice(
1839 self.inner.inner(),
1840 readopts.inner,
1841 cf.inner(),
1842 slices.len(),
1843 slices.as_ptr(),
1844 pinned_values.as_mut_ptr(),
1845 errors.as_mut_ptr(),
1846 sorted_input,
1847 );
1848 pinned_values
1849 .into_iter()
1850 .zip(errors)
1851 .map(|(v, e)| {
1852 if e.is_null() {
1853 if v.is_null() {
1854 Ok(None)
1855 } else {
1856 Ok(Some(DBPinnableSlice::from_c(v)))
1857 }
1858 } else {
1859 Err(convert_rocksdb_error(e))
1860 }
1861 })
1862 .collect()
1863 }
1864 }
1865
1866 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1869 DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_opt(key, opts))
1870 }
1871
1872 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1875 let key = key.as_ref();
1876 unsafe {
1877 0 != ffi::rocksdb_key_may_exist(
1878 self.inner.inner(),
1879 readopts.inner,
1880 key.as_ptr() as *const c_char,
1881 key.len() as size_t,
1882 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1888 }
1889 }
1890
1891 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1894 DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_cf_opt(cf, key, opts))
1895 }
1896
1897 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1900 &self,
1901 cf: &impl AsColumnFamilyRef,
1902 key: K,
1903 readopts: &ReadOptions,
1904 ) -> bool {
1905 let key = key.as_ref();
1906 0 != unsafe {
1907 ffi::rocksdb_key_may_exist_cf(
1908 self.inner.inner(),
1909 readopts.inner,
1910 cf.inner(),
1911 key.as_ptr() as *const c_char,
1912 key.len() as size_t,
1913 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1919 }
1920 }
1921
1922 pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1929 &self,
1930 cf: &impl AsColumnFamilyRef,
1931 key: K,
1932 readopts: &ReadOptions,
1933 ) -> (bool, Option<CSlice>) {
1934 let key = key.as_ref();
1935 let mut val: *mut c_char = ptr::null_mut();
1936 let mut val_len: usize = 0;
1937 let mut value_found: c_uchar = 0;
1938 let may_exists = 0
1939 != unsafe {
1940 ffi::rocksdb_key_may_exist_cf(
1941 self.inner.inner(),
1942 readopts.inner,
1943 cf.inner(),
1944 key.as_ptr() as *const c_char,
1945 key.len() as size_t,
1946 &raw mut val, &raw mut val_len, ptr::null(), 0, &raw mut value_found, )
1952 };
1953 if may_exists && value_found != 0 {
1956 (
1957 may_exists,
1958 Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1959 )
1960 } else {
1961 (may_exists, None)
1962 }
1963 }
1964
1965 fn create_inner_cf_handle(
1966 &self,
1967 name: impl CStrLike,
1968 opts: &Options,
1969 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1970 let cf_name = name.bake().map_err(|err| {
1971 Error::new(format!(
1972 "Failed to convert path to CString when creating cf: {err}"
1973 ))
1974 })?;
1975
1976 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
1979 let cf_handle = unsafe {
1980 ffi::rocksdb_create_column_family(
1981 self.inner.inner(),
1982 opts.inner,
1983 cf_name.as_ptr(),
1984 &raw mut err,
1985 )
1986 };
1987 if !err.is_null() {
1988 if !cf_handle.is_null() {
1989 unsafe { ffi::rocksdb_column_family_handle_destroy(cf_handle) };
1990 }
1991 return Err(convert_rocksdb_error(err));
1992 }
1993 Ok(cf_handle)
1994 }
1995
1996 pub fn iterator<'a: 'b, 'b>(
1997 &'a self,
1998 mode: IteratorMode,
1999 ) -> DBIteratorWithThreadMode<'b, Self> {
2000 let readopts = ReadOptions::default();
2001 self.iterator_opt(mode, readopts)
2002 }
2003
2004 pub fn iterator_opt<'a: 'b, 'b>(
2005 &'a self,
2006 mode: IteratorMode,
2007 readopts: ReadOptions,
2008 ) -> DBIteratorWithThreadMode<'b, Self> {
2009 DBIteratorWithThreadMode::new(self, readopts, mode)
2010 }
2011
2012 pub fn iterator_cf_opt<'a: 'b, 'b>(
2015 &'a self,
2016 cf_handle: &impl AsColumnFamilyRef,
2017 readopts: ReadOptions,
2018 mode: IteratorMode,
2019 ) -> DBIteratorWithThreadMode<'b, Self> {
2020 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
2021 }
2022
2023 pub fn full_iterator<'a: 'b, 'b>(
2027 &'a self,
2028 mode: IteratorMode,
2029 ) -> DBIteratorWithThreadMode<'b, Self> {
2030 let mut opts = ReadOptions::default();
2031 opts.set_total_order_seek(true);
2032 DBIteratorWithThreadMode::new(self, opts, mode)
2033 }
2034
2035 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
2036 &'a self,
2037 prefix: P,
2038 ) -> DBIteratorWithThreadMode<'b, Self> {
2039 let mut opts = ReadOptions::default();
2040 opts.set_prefix_same_as_start(true);
2041 DBIteratorWithThreadMode::new(
2042 self,
2043 opts,
2044 IteratorMode::From(prefix.as_ref(), Direction::Forward),
2045 )
2046 }
2047
2048 pub fn iterator_cf<'a: 'b, 'b>(
2049 &'a self,
2050 cf_handle: &impl AsColumnFamilyRef,
2051 mode: IteratorMode,
2052 ) -> DBIteratorWithThreadMode<'b, Self> {
2053 let opts = ReadOptions::default();
2054 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2055 }
2056
2057 pub fn full_iterator_cf<'a: 'b, 'b>(
2058 &'a self,
2059 cf_handle: &impl AsColumnFamilyRef,
2060 mode: IteratorMode,
2061 ) -> DBIteratorWithThreadMode<'b, Self> {
2062 let mut opts = ReadOptions::default();
2063 opts.set_total_order_seek(true);
2064 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2065 }
2066
2067 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
2068 &'a self,
2069 cf_handle: &impl AsColumnFamilyRef,
2070 prefix: P,
2071 ) -> DBIteratorWithThreadMode<'a, Self> {
2072 let mut opts = ReadOptions::default();
2073 opts.set_prefix_same_as_start(true);
2074 DBIteratorWithThreadMode::<'a, Self>::new_cf(
2075 self,
2076 cf_handle.inner(),
2077 opts,
2078 IteratorMode::From(prefix.as_ref(), Direction::Forward),
2079 )
2080 }
2081
2082 pub fn prefix_exists<P: AsRef<[u8]>>(&self, prefix: P) -> Result<bool, Error> {
2089 let p = prefix.as_ref();
2090 PREFIX_READ_OPTS.with(|rc| {
2091 let mut opts = rc.borrow_mut();
2092 opts.set_iterate_range(crate::PrefixRange(p));
2093 self.prefix_exists_opt(p, &opts)
2094 })
2095 }
2096
2097 pub fn prefix_exists_opt<P: AsRef<[u8]>>(
2100 &self,
2101 prefix: P,
2102 readopts: &ReadOptions,
2103 ) -> Result<bool, Error> {
2104 let prefix = prefix.as_ref();
2105 let iter = unsafe { self.create_iterator(readopts) };
2106 let res = unsafe {
2107 ffi::rocksdb_iter_seek(
2108 iter,
2109 prefix.as_ptr() as *const c_char,
2110 prefix.len() as size_t,
2111 );
2112 if ffi::rocksdb_iter_valid(iter) != 0 {
2113 let mut key_len: size_t = 0;
2114 let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2115 let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2116 Ok(key.starts_with(prefix))
2117 } else if let Err(e) = (|| {
2118 ffi_try!(ffi::rocksdb_iter_get_error(iter));
2120 Ok::<(), Error>(())
2121 })() {
2122 Err(e)
2123 } else {
2124 Ok(false)
2125 }
2126 };
2127 unsafe { ffi::rocksdb_iter_destroy(iter) };
2128 res
2129 }
2130
2131 pub fn prefix_prober(&self) -> PrefixProber<'_, Self> {
2139 let mut opts = ReadOptions::default();
2140 opts.set_prefix_same_as_start(true);
2141 PrefixProber {
2142 raw: DBRawIteratorWithThreadMode::new(self, opts),
2143 }
2144 }
2145
2146 pub fn prefix_prober_with_opts(&self, readopts: ReadOptions) -> PrefixProber<'_, Self> {
2153 PrefixProber {
2154 raw: DBRawIteratorWithThreadMode::new(self, readopts),
2155 }
2156 }
2157
2158 pub fn prefix_prober_cf(&self, cf_handle: &impl AsColumnFamilyRef) -> PrefixProber<'_, Self> {
2161 let mut opts = ReadOptions::default();
2162 opts.set_prefix_same_as_start(true);
2163 PrefixProber {
2164 raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts),
2165 }
2166 }
2167
2168 pub fn prefix_prober_cf_with_opts(
2173 &self,
2174 cf_handle: &impl AsColumnFamilyRef,
2175 readopts: ReadOptions,
2176 ) -> PrefixProber<'_, Self> {
2177 PrefixProber {
2178 raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts),
2179 }
2180 }
2181
2182 pub fn prefix_exists_cf<P: AsRef<[u8]>>(
2188 &self,
2189 cf_handle: &impl AsColumnFamilyRef,
2190 prefix: P,
2191 ) -> Result<bool, Error> {
2192 let p = prefix.as_ref();
2193 PREFIX_READ_OPTS.with(|rc| {
2194 let mut opts = rc.borrow_mut();
2195 opts.set_iterate_range(crate::PrefixRange(p));
2196 self.prefix_exists_cf_opt(cf_handle, p, &opts)
2197 })
2198 }
2199
2200 pub fn prefix_exists_cf_opt<P: AsRef<[u8]>>(
2203 &self,
2204 cf_handle: &impl AsColumnFamilyRef,
2205 prefix: P,
2206 readopts: &ReadOptions,
2207 ) -> Result<bool, Error> {
2208 let prefix = prefix.as_ref();
2209 let iter = unsafe { self.create_iterator_cf(cf_handle.inner(), readopts) };
2210 let res = unsafe {
2211 ffi::rocksdb_iter_seek(
2212 iter,
2213 prefix.as_ptr() as *const c_char,
2214 prefix.len() as size_t,
2215 );
2216 if ffi::rocksdb_iter_valid(iter) != 0 {
2217 let mut key_len: size_t = 0;
2218 let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2219 let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2220 Ok(key.starts_with(prefix))
2221 } else if let Err(e) = (|| {
2222 ffi_try!(ffi::rocksdb_iter_get_error(iter));
2223 Ok::<(), Error>(())
2224 })() {
2225 Err(e)
2226 } else {
2227 Ok(false)
2228 }
2229 };
2230 unsafe { ffi::rocksdb_iter_destroy(iter) };
2231 res
2232 }
2233
2234 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
2236 let opts = ReadOptions::default();
2237 DBRawIteratorWithThreadMode::new(self, opts)
2238 }
2239
2240 pub fn raw_iterator_cf<'a: 'b, 'b>(
2242 &'a self,
2243 cf_handle: &impl AsColumnFamilyRef,
2244 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2245 let opts = ReadOptions::default();
2246 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
2247 }
2248
2249 pub fn raw_iterator_opt<'a: 'b, 'b>(
2251 &'a self,
2252 readopts: ReadOptions,
2253 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2254 DBRawIteratorWithThreadMode::new(self, readopts)
2255 }
2256
2257 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
2259 &'a self,
2260 cf_handle: &impl AsColumnFamilyRef,
2261 readopts: ReadOptions,
2262 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2263 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
2264 }
2265
2266 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
2267 SnapshotWithThreadMode::<Self>::new(self)
2268 }
2269
2270 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2271 where
2272 K: AsRef<[u8]>,
2273 V: AsRef<[u8]>,
2274 {
2275 let key = key.as_ref();
2276 let value = value.as_ref();
2277
2278 unsafe {
2279 ffi_try!(ffi::rocksdb_put(
2280 self.inner.inner(),
2281 writeopts.inner,
2282 key.as_ptr() as *const c_char,
2283 key.len() as size_t,
2284 value.as_ptr() as *const c_char,
2285 value.len() as size_t,
2286 ));
2287 Ok(())
2288 }
2289 }
2290
2291 pub fn put_cf_opt<K, V>(
2292 &self,
2293 cf: &impl AsColumnFamilyRef,
2294 key: K,
2295 value: V,
2296 writeopts: &WriteOptions,
2297 ) -> Result<(), Error>
2298 where
2299 K: AsRef<[u8]>,
2300 V: AsRef<[u8]>,
2301 {
2302 let key = key.as_ref();
2303 let value = value.as_ref();
2304
2305 unsafe {
2306 ffi_try!(ffi::rocksdb_put_cf(
2307 self.inner.inner(),
2308 writeopts.inner,
2309 cf.inner(),
2310 key.as_ptr() as *const c_char,
2311 key.len() as size_t,
2312 value.as_ptr() as *const c_char,
2313 value.len() as size_t,
2314 ));
2315 Ok(())
2316 }
2317 }
2318
2319 pub fn put_with_ts_opt<K, V, S>(
2326 &self,
2327 key: K,
2328 ts: S,
2329 value: V,
2330 writeopts: &WriteOptions,
2331 ) -> Result<(), Error>
2332 where
2333 K: AsRef<[u8]>,
2334 V: AsRef<[u8]>,
2335 S: AsRef<[u8]>,
2336 {
2337 let key = key.as_ref();
2338 let value = value.as_ref();
2339 let ts = ts.as_ref();
2340 unsafe {
2341 ffi_try!(ffi::rocksdb_put_with_ts(
2342 self.inner.inner(),
2343 writeopts.inner,
2344 key.as_ptr() as *const c_char,
2345 key.len() as size_t,
2346 ts.as_ptr() as *const c_char,
2347 ts.len() as size_t,
2348 value.as_ptr() as *const c_char,
2349 value.len() as size_t,
2350 ));
2351 Ok(())
2352 }
2353 }
2354
2355 pub fn put_cf_with_ts_opt<K, V, S>(
2362 &self,
2363 cf: &impl AsColumnFamilyRef,
2364 key: K,
2365 ts: S,
2366 value: V,
2367 writeopts: &WriteOptions,
2368 ) -> Result<(), Error>
2369 where
2370 K: AsRef<[u8]>,
2371 V: AsRef<[u8]>,
2372 S: AsRef<[u8]>,
2373 {
2374 let key = key.as_ref();
2375 let value = value.as_ref();
2376 let ts = ts.as_ref();
2377 unsafe {
2378 ffi_try!(ffi::rocksdb_put_cf_with_ts(
2379 self.inner.inner(),
2380 writeopts.inner,
2381 cf.inner(),
2382 key.as_ptr() as *const c_char,
2383 key.len() as size_t,
2384 ts.as_ptr() as *const c_char,
2385 ts.len() as size_t,
2386 value.as_ptr() as *const c_char,
2387 value.len() as size_t,
2388 ));
2389 Ok(())
2390 }
2391 }
2392
2393 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2394 where
2395 K: AsRef<[u8]>,
2396 V: AsRef<[u8]>,
2397 {
2398 let key = key.as_ref();
2399 let value = value.as_ref();
2400
2401 unsafe {
2402 ffi_try!(ffi::rocksdb_merge(
2403 self.inner.inner(),
2404 writeopts.inner,
2405 key.as_ptr() as *const c_char,
2406 key.len() as size_t,
2407 value.as_ptr() as *const c_char,
2408 value.len() as size_t,
2409 ));
2410 Ok(())
2411 }
2412 }
2413
2414 pub fn merge_cf_opt<K, V>(
2415 &self,
2416 cf: &impl AsColumnFamilyRef,
2417 key: K,
2418 value: V,
2419 writeopts: &WriteOptions,
2420 ) -> Result<(), Error>
2421 where
2422 K: AsRef<[u8]>,
2423 V: AsRef<[u8]>,
2424 {
2425 let key = key.as_ref();
2426 let value = value.as_ref();
2427
2428 unsafe {
2429 ffi_try!(ffi::rocksdb_merge_cf(
2430 self.inner.inner(),
2431 writeopts.inner,
2432 cf.inner(),
2433 key.as_ptr() as *const c_char,
2434 key.len() as size_t,
2435 value.as_ptr() as *const c_char,
2436 value.len() as size_t,
2437 ));
2438 Ok(())
2439 }
2440 }
2441
2442 pub fn delete_opt<K: AsRef<[u8]>>(
2443 &self,
2444 key: K,
2445 writeopts: &WriteOptions,
2446 ) -> Result<(), Error> {
2447 let key = key.as_ref();
2448
2449 unsafe {
2450 ffi_try!(ffi::rocksdb_delete(
2451 self.inner.inner(),
2452 writeopts.inner,
2453 key.as_ptr() as *const c_char,
2454 key.len() as size_t,
2455 ));
2456 Ok(())
2457 }
2458 }
2459
2460 pub fn delete_cf_opt<K: AsRef<[u8]>>(
2461 &self,
2462 cf: &impl AsColumnFamilyRef,
2463 key: K,
2464 writeopts: &WriteOptions,
2465 ) -> Result<(), Error> {
2466 let key = key.as_ref();
2467
2468 unsafe {
2469 ffi_try!(ffi::rocksdb_delete_cf(
2470 self.inner.inner(),
2471 writeopts.inner,
2472 cf.inner(),
2473 key.as_ptr() as *const c_char,
2474 key.len() as size_t,
2475 ));
2476 Ok(())
2477 }
2478 }
2479
2480 pub fn delete_with_ts_opt<K, S>(
2484 &self,
2485 key: K,
2486 ts: S,
2487 writeopts: &WriteOptions,
2488 ) -> Result<(), Error>
2489 where
2490 K: AsRef<[u8]>,
2491 S: AsRef<[u8]>,
2492 {
2493 let key = key.as_ref();
2494 let ts = ts.as_ref();
2495 unsafe {
2496 ffi_try!(ffi::rocksdb_delete_with_ts(
2497 self.inner.inner(),
2498 writeopts.inner,
2499 key.as_ptr() as *const c_char,
2500 key.len() as size_t,
2501 ts.as_ptr() as *const c_char,
2502 ts.len() as size_t,
2503 ));
2504 Ok(())
2505 }
2506 }
2507
2508 pub fn delete_cf_with_ts_opt<K, S>(
2512 &self,
2513 cf: &impl AsColumnFamilyRef,
2514 key: K,
2515 ts: S,
2516 writeopts: &WriteOptions,
2517 ) -> Result<(), Error>
2518 where
2519 K: AsRef<[u8]>,
2520 S: AsRef<[u8]>,
2521 {
2522 let key = key.as_ref();
2523 let ts = ts.as_ref();
2524 unsafe {
2525 ffi_try!(ffi::rocksdb_delete_cf_with_ts(
2526 self.inner.inner(),
2527 writeopts.inner,
2528 cf.inner(),
2529 key.as_ptr() as *const c_char,
2530 key.len() as size_t,
2531 ts.as_ptr() as *const c_char,
2532 ts.len() as size_t,
2533 ));
2534 Ok(())
2535 }
2536 }
2537
2538 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
2539 where
2540 K: AsRef<[u8]>,
2541 V: AsRef<[u8]>,
2542 {
2543 DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
2544 }
2545
2546 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2547 where
2548 K: AsRef<[u8]>,
2549 V: AsRef<[u8]>,
2550 {
2551 DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
2552 }
2553
2554 pub fn put_with_ts<K, V, S>(&self, key: K, ts: S, value: V) -> Result<(), Error>
2561 where
2562 K: AsRef<[u8]>,
2563 V: AsRef<[u8]>,
2564 S: AsRef<[u8]>,
2565 {
2566 DEFAULT_WRITE_OPTS
2567 .with(|opts| self.put_with_ts_opt(key.as_ref(), ts.as_ref(), value.as_ref(), opts))
2568 }
2569
2570 pub fn put_cf_with_ts<K, V, S>(
2577 &self,
2578 cf: &impl AsColumnFamilyRef,
2579 key: K,
2580 ts: S,
2581 value: V,
2582 ) -> Result<(), Error>
2583 where
2584 K: AsRef<[u8]>,
2585 V: AsRef<[u8]>,
2586 S: AsRef<[u8]>,
2587 {
2588 DEFAULT_WRITE_OPTS.with(|opts| {
2589 self.put_cf_with_ts_opt(cf, key.as_ref(), ts.as_ref(), value.as_ref(), opts)
2590 })
2591 }
2592
2593 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
2594 where
2595 K: AsRef<[u8]>,
2596 V: AsRef<[u8]>,
2597 {
2598 DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
2599 }
2600
2601 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2602 where
2603 K: AsRef<[u8]>,
2604 V: AsRef<[u8]>,
2605 {
2606 DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
2607 }
2608
2609 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2610 DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
2611 }
2612
2613 pub fn delete_cf<K: AsRef<[u8]>>(
2614 &self,
2615 cf: &impl AsColumnFamilyRef,
2616 key: K,
2617 ) -> Result<(), Error> {
2618 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
2619 }
2620
2621 pub fn delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2625 &self,
2626 key: K,
2627 ts: S,
2628 ) -> Result<(), Error> {
2629 DEFAULT_WRITE_OPTS.with(|opts| self.delete_with_ts_opt(key, ts, opts))
2630 }
2631
2632 pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2636 &self,
2637 cf: &impl AsColumnFamilyRef,
2638 key: K,
2639 ts: S,
2640 ) -> Result<(), Error> {
2641 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_with_ts_opt(cf, key, ts, opts))
2642 }
2643
2644 pub fn single_delete_opt<K: AsRef<[u8]>>(
2664 &self,
2665 key: K,
2666 writeopts: &WriteOptions,
2667 ) -> Result<(), Error> {
2668 let key = key.as_ref();
2669
2670 unsafe {
2671 ffi_try!(ffi::rocksdb_singledelete(
2672 self.inner.inner(),
2673 writeopts.inner,
2674 key.as_ptr() as *const c_char,
2675 key.len() as size_t,
2676 ));
2677 Ok(())
2678 }
2679 }
2680
2681 pub fn single_delete_cf_opt<K: AsRef<[u8]>>(
2685 &self,
2686 cf: &impl AsColumnFamilyRef,
2687 key: K,
2688 writeopts: &WriteOptions,
2689 ) -> Result<(), Error> {
2690 let key = key.as_ref();
2691
2692 unsafe {
2693 ffi_try!(ffi::rocksdb_singledelete_cf(
2694 self.inner.inner(),
2695 writeopts.inner,
2696 cf.inner(),
2697 key.as_ptr() as *const c_char,
2698 key.len() as size_t,
2699 ));
2700 Ok(())
2701 }
2702 }
2703
2704 pub fn single_delete_with_ts_opt<K, S>(
2711 &self,
2712 key: K,
2713 ts: S,
2714 writeopts: &WriteOptions,
2715 ) -> Result<(), Error>
2716 where
2717 K: AsRef<[u8]>,
2718 S: AsRef<[u8]>,
2719 {
2720 let key = key.as_ref();
2721 let ts = ts.as_ref();
2722 unsafe {
2723 ffi_try!(ffi::rocksdb_singledelete_with_ts(
2724 self.inner.inner(),
2725 writeopts.inner,
2726 key.as_ptr() as *const c_char,
2727 key.len() as size_t,
2728 ts.as_ptr() as *const c_char,
2729 ts.len() as size_t,
2730 ));
2731 Ok(())
2732 }
2733 }
2734
2735 pub fn single_delete_cf_with_ts_opt<K, S>(
2742 &self,
2743 cf: &impl AsColumnFamilyRef,
2744 key: K,
2745 ts: S,
2746 writeopts: &WriteOptions,
2747 ) -> Result<(), Error>
2748 where
2749 K: AsRef<[u8]>,
2750 S: AsRef<[u8]>,
2751 {
2752 let key = key.as_ref();
2753 let ts = ts.as_ref();
2754 unsafe {
2755 ffi_try!(ffi::rocksdb_singledelete_cf_with_ts(
2756 self.inner.inner(),
2757 writeopts.inner,
2758 cf.inner(),
2759 key.as_ptr() as *const c_char,
2760 key.len() as size_t,
2761 ts.as_ptr() as *const c_char,
2762 ts.len() as size_t,
2763 ));
2764 Ok(())
2765 }
2766 }
2767
2768 pub fn single_delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2772 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_opt(key, opts))
2773 }
2774
2775 pub fn single_delete_cf<K: AsRef<[u8]>>(
2779 &self,
2780 cf: &impl AsColumnFamilyRef,
2781 key: K,
2782 ) -> Result<(), Error> {
2783 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_opt(cf, key, opts))
2784 }
2785
2786 pub fn single_delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2793 &self,
2794 key: K,
2795 ts: S,
2796 ) -> Result<(), Error> {
2797 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_with_ts_opt(key, ts, opts))
2798 }
2799
2800 pub fn single_delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2807 &self,
2808 cf: &impl AsColumnFamilyRef,
2809 key: K,
2810 ts: S,
2811 ) -> Result<(), Error> {
2812 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_with_ts_opt(cf, key, ts, opts))
2813 }
2814
2815 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
2817 unsafe {
2818 let start = start.as_ref().map(AsRef::as_ref);
2819 let end = end.as_ref().map(AsRef::as_ref);
2820
2821 ffi::rocksdb_compact_range(
2822 self.inner.inner(),
2823 opt_bytes_to_ptr(start),
2824 start.map_or(0, <[u8]>::len) as size_t,
2825 opt_bytes_to_ptr(end),
2826 end.map_or(0, <[u8]>::len) as size_t,
2827 );
2828 }
2829 }
2830
2831 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2833 &self,
2834 start: Option<S>,
2835 end: Option<E>,
2836 opts: &CompactOptions,
2837 ) {
2838 unsafe {
2839 let start = start.as_ref().map(AsRef::as_ref);
2840 let end = end.as_ref().map(AsRef::as_ref);
2841
2842 ffi::rocksdb_compact_range_opt(
2843 self.inner.inner(),
2844 opts.inner,
2845 opt_bytes_to_ptr(start),
2846 start.map_or(0, <[u8]>::len) as size_t,
2847 opt_bytes_to_ptr(end),
2848 end.map_or(0, <[u8]>::len) as size_t,
2849 );
2850 }
2851 }
2852
2853 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2856 &self,
2857 cf: &impl AsColumnFamilyRef,
2858 start: Option<S>,
2859 end: Option<E>,
2860 ) {
2861 unsafe {
2862 let start = start.as_ref().map(AsRef::as_ref);
2863 let end = end.as_ref().map(AsRef::as_ref);
2864
2865 ffi::rocksdb_compact_range_cf(
2866 self.inner.inner(),
2867 cf.inner(),
2868 opt_bytes_to_ptr(start),
2869 start.map_or(0, <[u8]>::len) as size_t,
2870 opt_bytes_to_ptr(end),
2871 end.map_or(0, <[u8]>::len) as size_t,
2872 );
2873 }
2874 }
2875
2876 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2878 &self,
2879 cf: &impl AsColumnFamilyRef,
2880 start: Option<S>,
2881 end: Option<E>,
2882 opts: &CompactOptions,
2883 ) {
2884 unsafe {
2885 let start = start.as_ref().map(AsRef::as_ref);
2886 let end = end.as_ref().map(AsRef::as_ref);
2887
2888 ffi::rocksdb_compact_range_cf_opt(
2889 self.inner.inner(),
2890 cf.inner(),
2891 opts.inner,
2892 opt_bytes_to_ptr(start),
2893 start.map_or(0, <[u8]>::len) as size_t,
2894 opt_bytes_to_ptr(end),
2895 end.map_or(0, <[u8]>::len) as size_t,
2896 );
2897 }
2898 }
2899
2900 pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
2909 unsafe {
2910 ffi_try!(ffi::rocksdb_wait_for_compact(
2911 self.inner.inner(),
2912 opts.inner
2913 ));
2914 }
2915 Ok(())
2916 }
2917
2918 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
2919 let copts = convert_options(opts)?;
2920 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2921 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2922 let count = opts.len() as i32;
2923 unsafe {
2924 ffi_try!(ffi::rocksdb_set_options(
2925 self.inner.inner(),
2926 count,
2927 cnames.as_ptr(),
2928 cvalues.as_ptr(),
2929 ));
2930 }
2931 Ok(())
2932 }
2933
2934 pub fn set_options_cf(
2935 &self,
2936 cf: &impl AsColumnFamilyRef,
2937 opts: &[(&str, &str)],
2938 ) -> Result<(), Error> {
2939 let copts = convert_options(opts)?;
2940 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2941 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2942 let count = opts.len() as i32;
2943 unsafe {
2944 ffi_try!(ffi::rocksdb_set_options_cf(
2945 self.inner.inner(),
2946 cf.inner(),
2947 count,
2948 cnames.as_ptr(),
2949 cvalues.as_ptr(),
2950 ));
2951 }
2952 Ok(())
2953 }
2954
2955 fn property_value_impl<R>(
2964 name: impl CStrLike,
2965 get_property: impl FnOnce(*const c_char) -> *mut c_char,
2966 parse: impl FnOnce(&str) -> Result<R, Error>,
2967 ) -> Result<Option<R>, Error> {
2968 let value = match name.bake() {
2969 Ok(prop_name) => get_property(prop_name.as_ptr()),
2970 Err(e) => {
2971 return Err(Error::new(format!(
2972 "Failed to convert property name to CString: {e}"
2973 )));
2974 }
2975 };
2976 if value.is_null() {
2977 return Ok(None);
2978 }
2979 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
2980 Ok(s) => parse(s).map(|value| Some(value)),
2981 Err(e) => Err(Error::new(format!(
2982 "Failed to convert property value to string: {e}"
2983 ))),
2984 };
2985 unsafe {
2986 ffi::rocksdb_free(value as *mut c_void);
2987 }
2988 result
2989 }
2990
2991 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
2996 Self::property_value_impl(
2997 name,
2998 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
2999 |str_value| Ok(str_value.to_owned()),
3000 )
3001 }
3002
3003 pub fn property_value_cf(
3008 &self,
3009 cf: &impl AsColumnFamilyRef,
3010 name: impl CStrLike,
3011 ) -> Result<Option<String>, Error> {
3012 Self::property_value_impl(
3013 name,
3014 |prop_name| unsafe {
3015 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3016 },
3017 |str_value| Ok(str_value.to_owned()),
3018 )
3019 }
3020
3021 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
3022 value.parse::<u64>().map_err(|err| {
3023 Error::new(format!(
3024 "Failed to convert property value {value} to int: {err}"
3025 ))
3026 })
3027 }
3028
3029 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
3034 Self::property_value_impl(
3035 name,
3036 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
3037 Self::parse_property_int_value,
3038 )
3039 }
3040
3041 pub fn property_int_value_cf(
3046 &self,
3047 cf: &impl AsColumnFamilyRef,
3048 name: impl CStrLike,
3049 ) -> Result<Option<u64>, Error> {
3050 Self::property_value_impl(
3051 name,
3052 |prop_name| unsafe {
3053 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3054 },
3055 Self::parse_property_int_value,
3056 )
3057 }
3058
3059 pub fn latest_sequence_number(&self) -> u64 {
3061 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
3062 }
3063
3064 pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
3072 self.get_approximate_sizes_cfopt(None::<&ColumnFamily>, ranges)
3073 }
3074
3075 pub fn get_approximate_sizes_cf(
3076 &self,
3077 cf: &impl AsColumnFamilyRef,
3078 ranges: &[Range],
3079 ) -> Vec<u64> {
3080 self.get_approximate_sizes_cfopt(Some(cf), ranges)
3081 }
3082
3083 fn get_approximate_sizes_cfopt(
3084 &self,
3085 cf: Option<&impl AsColumnFamilyRef>,
3086 ranges: &[Range],
3087 ) -> Vec<u64> {
3088 let start_keys: Vec<*const c_char> = ranges
3089 .iter()
3090 .map(|x| x.start_key.as_ptr() as *const c_char)
3091 .collect();
3092 let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
3093 let end_keys: Vec<*const c_char> = ranges
3094 .iter()
3095 .map(|x| x.end_key.as_ptr() as *const c_char)
3096 .collect();
3097 let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
3098 let mut sizes: Vec<u64> = vec![0; ranges.len()];
3099 let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
3100 ranges.len() as i32,
3101 start_keys.as_ptr(),
3102 start_key_lens.as_ptr(),
3103 end_keys.as_ptr(),
3104 end_key_lens.as_ptr(),
3105 sizes.as_mut_ptr(),
3106 );
3107 let mut err: *mut c_char = ptr::null_mut();
3108 match cf {
3109 None => unsafe {
3110 ffi::rocksdb_approximate_sizes(
3111 self.inner.inner(),
3112 n,
3113 start_key_ptr,
3114 start_key_len_ptr,
3115 end_key_ptr,
3116 end_key_len_ptr,
3117 size_ptr,
3118 &raw mut err,
3119 );
3120 },
3121 Some(cf) => unsafe {
3122 ffi::rocksdb_approximate_sizes_cf(
3123 self.inner.inner(),
3124 cf.inner(),
3125 n,
3126 start_key_ptr,
3127 start_key_len_ptr,
3128 end_key_ptr,
3129 end_key_len_ptr,
3130 size_ptr,
3131 &raw mut err,
3132 );
3133 },
3134 }
3135 sizes
3136 }
3137
3138 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
3149 unsafe {
3150 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
3154 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
3155 self.inner.inner(),
3156 seq_number,
3157 opts
3158 ));
3159 Ok(DBWALIterator {
3160 inner: iter,
3161 start_seq_number: seq_number,
3162 })
3163 }
3164 }
3165
3166 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
3169 unsafe {
3170 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
3171 }
3172 Ok(())
3173 }
3174
3175 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
3177 let opts = IngestExternalFileOptions::default();
3178 self.ingest_external_file_opts(&opts, paths)
3179 }
3180
3181 pub fn ingest_external_file_opts<P: AsRef<Path>>(
3183 &self,
3184 opts: &IngestExternalFileOptions,
3185 paths: Vec<P>,
3186 ) -> Result<(), Error> {
3187 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3188 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3189
3190 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
3191 }
3192
3193 pub fn ingest_external_file_cf<P: AsRef<Path>>(
3196 &self,
3197 cf: &impl AsColumnFamilyRef,
3198 paths: Vec<P>,
3199 ) -> Result<(), Error> {
3200 let opts = IngestExternalFileOptions::default();
3201 self.ingest_external_file_cf_opts(cf, &opts, paths)
3202 }
3203
3204 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
3206 &self,
3207 cf: &impl AsColumnFamilyRef,
3208 opts: &IngestExternalFileOptions,
3209 paths: Vec<P>,
3210 ) -> Result<(), Error> {
3211 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3212 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3213
3214 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
3215 }
3216
3217 fn ingest_external_file_raw(
3218 &self,
3219 opts: &IngestExternalFileOptions,
3220 paths_v: &[CString],
3221 cpaths: &[*const c_char],
3222 ) -> Result<(), Error> {
3223 unsafe {
3224 ffi_try!(ffi::rocksdb_ingest_external_file(
3225 self.inner.inner(),
3226 cpaths.as_ptr(),
3227 paths_v.len(),
3228 opts.inner.cast_const()
3229 ));
3230 Ok(())
3231 }
3232 }
3233
3234 fn ingest_external_file_raw_cf(
3235 &self,
3236 cf: &impl AsColumnFamilyRef,
3237 opts: &IngestExternalFileOptions,
3238 paths_v: &[CString],
3239 cpaths: &[*const c_char],
3240 ) -> Result<(), Error> {
3241 unsafe {
3242 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
3243 self.inner.inner(),
3244 cf.inner(),
3245 cpaths.as_ptr(),
3246 paths_v.len(),
3247 opts.inner.cast_const()
3248 ));
3249 Ok(())
3250 }
3251 }
3252
3253 pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
3255 unsafe {
3256 let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
3257
3258 let metadata = ColumnFamilyMetaData {
3259 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3260 name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3261 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3262 };
3263
3264 ffi::rocksdb_column_family_metadata_destroy(ptr);
3266
3267 metadata
3269 }
3270 }
3271
3272 pub fn get_column_family_metadata_cf(
3274 &self,
3275 cf: &impl AsColumnFamilyRef,
3276 ) -> ColumnFamilyMetaData {
3277 unsafe {
3278 let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
3279
3280 let metadata = ColumnFamilyMetaData {
3281 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3282 name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3283 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3284 };
3285
3286 ffi::rocksdb_column_family_metadata_destroy(ptr);
3288
3289 metadata
3291 }
3292 }
3293
3294 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
3297 unsafe {
3298 let livefiles_ptr = ffi::rocksdb_livefiles(self.inner.inner());
3299 if livefiles_ptr.is_null() {
3300 Err(Error::new("Could not get live files".to_owned()))
3301 } else {
3302 let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3303
3304 ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3306
3307 Ok(files)
3309 }
3310 }
3311 }
3312
3313 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
3322 let from = from.as_ref();
3323 let to = to.as_ref();
3324 unsafe {
3325 ffi_try!(ffi::rocksdb_delete_file_in_range(
3326 self.inner.inner(),
3327 from.as_ptr() as *const c_char,
3328 from.len() as size_t,
3329 to.as_ptr() as *const c_char,
3330 to.len() as size_t,
3331 ));
3332 Ok(())
3333 }
3334 }
3335
3336 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
3338 &self,
3339 cf: &impl AsColumnFamilyRef,
3340 from: K,
3341 to: K,
3342 ) -> Result<(), Error> {
3343 let from = from.as_ref();
3344 let to = to.as_ref();
3345 unsafe {
3346 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
3347 self.inner.inner(),
3348 cf.inner(),
3349 from.as_ptr() as *const c_char,
3350 from.len() as size_t,
3351 to.as_ptr() as *const c_char,
3352 to.len() as size_t,
3353 ));
3354 Ok(())
3355 }
3356 }
3357
3358 pub fn cancel_all_background_work(&self, wait: bool) {
3360 unsafe {
3361 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
3362 }
3363 }
3364
3365 fn drop_column_family<C>(
3366 &self,
3367 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
3368 cf: C,
3369 ) -> Result<(), Error> {
3370 unsafe {
3371 ffi_try!(ffi::rocksdb_drop_column_family(
3373 self.inner.inner(),
3374 cf_inner
3375 ));
3376 }
3377 drop(cf);
3380 Ok(())
3381 }
3382
3383 pub fn increase_full_history_ts_low<S: AsRef<[u8]>>(
3388 &self,
3389 cf: &impl AsColumnFamilyRef,
3390 ts: S,
3391 ) -> Result<(), Error> {
3392 let ts = ts.as_ref();
3393 unsafe {
3394 ffi_try!(ffi::rocksdb_increase_full_history_ts_low(
3395 self.inner.inner(),
3396 cf.inner(),
3397 ts.as_ptr() as *const c_char,
3398 ts.len() as size_t,
3399 ));
3400 Ok(())
3401 }
3402 }
3403
3404 pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result<Vec<u8>, Error> {
3406 unsafe {
3407 let mut ts_lowlen = 0;
3408 let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low(
3409 self.inner.inner(),
3410 cf.inner(),
3411 &raw mut ts_lowlen,
3412 ));
3413
3414 if ts.is_null() {
3415 Err(Error::new("Could not get full_history_ts_low".to_owned()))
3416 } else {
3417 let mut vec = vec![0; ts_lowlen];
3418 ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen);
3419 ffi::rocksdb_free(ts as *mut c_void);
3420 Ok(vec)
3421 }
3422 }
3423 }
3424
3425 pub fn get_db_identity(&self) -> Result<Vec<u8>, Error> {
3427 unsafe {
3428 let mut length: usize = 0;
3429 let identity_ptr = ffi::rocksdb_get_db_identity(self.inner.inner(), &raw mut length);
3430 let identity_vec = raw_data(identity_ptr, length);
3431 ffi::rocksdb_free(identity_ptr as *mut c_void);
3432 identity_vec.ok_or_else(|| Error::new("get_db_identity returned NULL".to_string()))
3435 }
3436 }
3437}
3438
3439impl<I: DBInner> DBCommon<SingleThreaded, I> {
3440 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
3442 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3443 self.cfs
3444 .cfs
3445 .insert(name.as_ref().to_string(), ColumnFamily { inner });
3446 Ok(())
3447 }
3448
3449 #[doc = include_str!("db_create_column_family_with_import.md")]
3450 pub fn create_column_family_with_import<N: AsRef<str>>(
3451 &mut self,
3452 options: &Options,
3453 column_family_name: N,
3454 import_options: &ImportColumnFamilyOptions,
3455 metadata: &ExportImportFilesMetaData,
3456 ) -> Result<(), Error> {
3457 let name = column_family_name.as_ref();
3458 let c_name = CString::new(name).map_err(|err| {
3459 Error::new(format!(
3460 "Failed to convert name to CString while importing column family: {err}"
3461 ))
3462 })?;
3463 let inner = unsafe {
3464 ffi_try!(ffi::rocksdb_create_column_family_with_import(
3465 self.inner.inner(),
3466 options.inner,
3467 c_name.as_ptr(),
3468 import_options.inner,
3469 metadata.inner
3470 ))
3471 };
3472 self.cfs
3473 .cfs
3474 .insert(column_family_name.as_ref().into(), ColumnFamily { inner });
3475 Ok(())
3476 }
3477
3478 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
3480 match self.cfs.cfs.remove(name) {
3481 Some(cf) => self.drop_column_family(cf.inner, cf),
3482 _ => Err(Error::new(format!("Invalid column family: {name}"))),
3483 }
3484 }
3485
3486 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
3488 self.cfs.cfs.get(name)
3489 }
3490
3491 pub fn cf_names(&self) -> Vec<String> {
3495 self.cfs.cfs.keys().cloned().collect()
3496 }
3497}
3498
3499impl<I: DBInner> DBCommon<MultiThreaded, I> {
3500 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
3502 let mut cfs = self.cfs.cfs.write();
3505 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3506 cfs.insert(
3507 name.as_ref().to_string(),
3508 Arc::new(UnboundColumnFamily { inner }),
3509 );
3510 Ok(())
3511 }
3512
3513 #[doc = include_str!("db_create_column_family_with_import.md")]
3514 pub fn create_column_family_with_import<N: AsRef<str>>(
3515 &self,
3516 options: &Options,
3517 column_family_name: N,
3518 import_options: &ImportColumnFamilyOptions,
3519 metadata: &ExportImportFilesMetaData,
3520 ) -> Result<(), Error> {
3521 let mut cfs = self.cfs.cfs.write();
3523 let name = column_family_name.as_ref();
3524 let c_name = CString::new(name).map_err(|err| {
3525 Error::new(format!(
3526 "Failed to convert name to CString while importing column family: {err}"
3527 ))
3528 })?;
3529 let inner = unsafe {
3530 ffi_try!(ffi::rocksdb_create_column_family_with_import(
3531 self.inner.inner(),
3532 options.inner,
3533 c_name.as_ptr(),
3534 import_options.inner,
3535 metadata.inner
3536 ))
3537 };
3538 cfs.insert(
3539 column_family_name.as_ref().to_string(),
3540 Arc::new(UnboundColumnFamily { inner }),
3541 );
3542 Ok(())
3543 }
3544
3545 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
3548 match self.cfs.cfs.write().remove(name) {
3549 Some(cf) => self.drop_column_family(cf.inner, cf),
3550 _ => Err(Error::new(format!("Invalid column family: {name}"))),
3551 }
3552 }
3553
3554 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
3556 self.cfs
3557 .cfs
3558 .read()
3559 .get(name)
3560 .cloned()
3561 .map(UnboundColumnFamily::bound_column_family)
3562 }
3563
3564 pub fn cf_names(&self) -> Vec<String> {
3568 self.cfs.cfs.read().keys().cloned().collect()
3569 }
3570}
3571
3572impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
3573 fn drop(&mut self) {
3574 self.cfs.drop_all_cfs_internal();
3575 }
3576}
3577
3578impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
3579 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3580 write!(f, "RocksDB {{ path: {} }}", self.path().display())
3581 }
3582}
3583
3584#[derive(Debug, Clone)]
3586pub struct ColumnFamilyMetaData {
3587 pub size: u64,
3590 pub name: String,
3592 pub file_count: usize,
3594}
3595
3596#[derive(Debug, Clone)]
3598pub struct LiveFile {
3599 pub column_family_name: String,
3601 pub name: String,
3603 pub directory: String,
3606 pub size: usize,
3608 pub level: i32,
3610 pub start_key: Option<Vec<u8>>,
3612 pub end_key: Option<Vec<u8>>,
3614 pub smallest_seqno: u64,
3615 pub largest_seqno: u64,
3616 pub num_entries: u64,
3618 pub num_deletions: u64,
3620}
3621
3622impl LiveFile {
3623 pub(crate) fn from_rocksdb_livefiles_ptr(
3625 files: *const ffi::rocksdb_livefiles_t,
3626 ) -> Vec<LiveFile> {
3627 unsafe {
3628 let n = ffi::rocksdb_livefiles_count(files);
3629
3630 let mut livefiles = Vec::with_capacity(n as usize);
3631 let mut key_size: usize = 0;
3632
3633 for i in 0..n {
3634 let column_family_name =
3636 from_cstr_without_free(ffi::rocksdb_livefiles_column_family_name(files, i));
3637 let name = from_cstr_without_free(ffi::rocksdb_livefiles_name(files, i));
3638 let directory = from_cstr_without_free(ffi::rocksdb_livefiles_directory(files, i));
3639 let size = ffi::rocksdb_livefiles_size(files, i);
3640 let level = ffi::rocksdb_livefiles_level(files, i);
3641
3642 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &raw mut key_size);
3644 let smallest_key = raw_data(smallest_key, key_size);
3645
3646 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &raw mut key_size);
3648 let largest_key = raw_data(largest_key, key_size);
3649
3650 livefiles.push(LiveFile {
3651 column_family_name,
3652 name,
3653 directory,
3654 size,
3655 level,
3656 start_key: smallest_key,
3657 end_key: largest_key,
3658 largest_seqno: ffi::rocksdb_livefiles_largest_seqno(files, i),
3659 smallest_seqno: ffi::rocksdb_livefiles_smallest_seqno(files, i),
3660 num_entries: ffi::rocksdb_livefiles_entries(files, i),
3661 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
3662 });
3663 }
3664
3665 livefiles
3666 }
3667 }
3668}
3669
3670struct LiveFileGuard(*mut rocksdb_livefile_t);
3671
3672impl LiveFileGuard {
3673 fn into_raw(mut self) -> *mut rocksdb_livefile_t {
3674 let ptr = self.0;
3675 self.0 = ptr::null_mut();
3676 ptr
3677 }
3678}
3679
3680impl Drop for LiveFileGuard {
3681 fn drop(&mut self) {
3682 if !self.0.is_null() {
3683 unsafe {
3684 rocksdb_livefile_destroy(self.0);
3685 }
3686 }
3687 }
3688}
3689
3690struct LiveFilesGuard(*mut rocksdb_livefiles_t);
3691
3692impl LiveFilesGuard {
3693 fn into_raw(mut self) -> *mut rocksdb_livefiles_t {
3694 let ptr = self.0;
3695 self.0 = ptr::null_mut();
3696 ptr
3697 }
3698}
3699
3700impl Drop for LiveFilesGuard {
3701 fn drop(&mut self) {
3702 if !self.0.is_null() {
3703 unsafe {
3704 rocksdb_livefiles_destroy(self.0);
3705 }
3706 }
3707 }
3708}
3709
3710#[derive(Debug)]
3715pub struct ExportImportFilesMetaData {
3716 pub(crate) inner: *mut ffi::rocksdb_export_import_files_metadata_t,
3717}
3718
3719impl ExportImportFilesMetaData {
3720 pub fn get_db_comparator_name(&self) -> String {
3721 unsafe {
3722 let c_name =
3723 ffi::rocksdb_export_import_files_metadata_get_db_comparator_name(self.inner);
3724 from_cstr_and_free(c_name)
3725 }
3726 }
3727
3728 pub fn set_db_comparator_name(&mut self, name: &str) {
3729 let c_name = CString::new(name.as_bytes()).unwrap();
3730 unsafe {
3731 ffi::rocksdb_export_import_files_metadata_set_db_comparator_name(
3732 self.inner,
3733 c_name.as_ptr(),
3734 );
3735 };
3736 }
3737
3738 pub fn get_files(&self) -> Vec<LiveFile> {
3739 unsafe {
3740 let livefiles_ptr = ffi::rocksdb_export_import_files_metadata_get_files(self.inner);
3741 let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3742 ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3743 files
3744 }
3745 }
3746
3747 pub fn set_files(&mut self, files: &[LiveFile]) -> Result<(), Error> {
3748 static EMPTY: [u8; 0] = [];
3750 let empty_ptr = EMPTY.as_ptr() as *const libc::c_char;
3751
3752 unsafe {
3753 let live_files = LiveFilesGuard(ffi::rocksdb_livefiles_create());
3754
3755 for file in files {
3756 let live_file = LiveFileGuard(ffi::rocksdb_livefile_create());
3757 ffi::rocksdb_livefile_set_level(live_file.0, file.level);
3758
3759 let c_cf_name = CString::new(file.column_family_name.as_str()).map_err(|err| {
3761 Error::new(format!("Unable to convert column family to CString: {err}"))
3762 })?;
3763 ffi::rocksdb_livefile_set_column_family_name(live_file.0, c_cf_name.as_ptr());
3764
3765 let c_name = CString::new(file.name.as_str()).map_err(|err| {
3766 Error::new(format!("Unable to convert file name to CString: {err}"))
3767 })?;
3768 ffi::rocksdb_livefile_set_name(live_file.0, c_name.as_ptr());
3769
3770 let c_directory = CString::new(file.directory.as_str()).map_err(|err| {
3771 Error::new(format!("Unable to convert directory to CString: {err}"))
3772 })?;
3773 ffi::rocksdb_livefile_set_directory(live_file.0, c_directory.as_ptr());
3774
3775 ffi::rocksdb_livefile_set_size(live_file.0, file.size);
3776
3777 let (start_key_ptr, start_key_len) = match &file.start_key {
3778 None => (empty_ptr, 0),
3779 Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3780 };
3781 ffi::rocksdb_livefile_set_smallest_key(live_file.0, start_key_ptr, start_key_len);
3782
3783 let (largest_key_ptr, largest_key_len) = match &file.end_key {
3784 None => (empty_ptr, 0),
3785 Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3786 };
3787 ffi::rocksdb_livefile_set_largest_key(
3788 live_file.0,
3789 largest_key_ptr,
3790 largest_key_len,
3791 );
3792 ffi::rocksdb_livefile_set_smallest_seqno(live_file.0, file.smallest_seqno);
3793 ffi::rocksdb_livefile_set_largest_seqno(live_file.0, file.largest_seqno);
3794 ffi::rocksdb_livefile_set_num_entries(live_file.0, file.num_entries);
3795 ffi::rocksdb_livefile_set_num_deletions(live_file.0, file.num_deletions);
3796
3797 ffi::rocksdb_livefiles_add(live_files.0, live_file.into_raw());
3799 }
3800
3801 ffi::rocksdb_export_import_files_metadata_set_files(self.inner, live_files.into_raw());
3803 Ok(())
3804 }
3805 }
3806}
3807
3808impl Default for ExportImportFilesMetaData {
3809 fn default() -> Self {
3810 let inner = unsafe { ffi::rocksdb_export_import_files_metadata_create() };
3811 assert!(
3812 !inner.is_null(),
3813 "Could not create rocksdb_export_import_files_metadata_t"
3814 );
3815
3816 Self { inner }
3817 }
3818}
3819
3820impl Drop for ExportImportFilesMetaData {
3821 fn drop(&mut self) {
3822 unsafe {
3823 ffi::rocksdb_export_import_files_metadata_destroy(self.inner);
3824 }
3825 }
3826}
3827
3828unsafe impl Send for ExportImportFilesMetaData {}
3829unsafe impl Sync for ExportImportFilesMetaData {}
3830
3831fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
3832 opts.iter()
3833 .map(|(name, value)| {
3834 let cname = match CString::new(name.as_bytes()) {
3835 Ok(cname) => cname,
3836 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
3837 };
3838 let cvalue = match CString::new(value.as_bytes()) {
3839 Ok(cvalue) => cvalue,
3840 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
3841 };
3842 Ok((cname, cvalue))
3843 })
3844 .collect()
3845}
3846
3847pub(crate) fn convert_values(
3848 values: Vec<*mut c_char>,
3849 values_sizes: Vec<usize>,
3850 errors: Vec<*mut c_char>,
3851) -> Vec<Result<Option<Vec<u8>>, Error>> {
3852 values
3853 .into_iter()
3854 .zip(values_sizes)
3855 .zip(errors)
3856 .map(|((v, s), e)| {
3857 if e.is_null() {
3858 let value = unsafe { crate::ffi_util::raw_data(v, s) };
3859 unsafe {
3860 ffi::rocksdb_free(v as *mut c_void);
3861 }
3862 Ok(value)
3863 } else {
3864 Err(convert_rocksdb_error(e))
3865 }
3866 })
3867 .collect()
3868}