1use std::cell::RefCell;
17use std::collections::{BTreeMap, HashMap};
18use std::ffi::{CStr, CString};
19use std::fmt;
20use std::fs;
21use std::iter;
22use std::path::Path;
23use std::path::PathBuf;
24use std::ptr;
25use std::slice;
26use std::str;
27use std::sync::Arc;
28use std::time::Duration;
29
30use crate::column_family::ColumnFamilyTtl;
31use crate::ffi_util::CSlice;
32use crate::{
33 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
34 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, DEFAULT_COLUMN_FAMILY_NAME,
35 Direction, Error, FlushOptions, IngestExternalFileOptions, IteratorMode, Options, ReadOptions,
36 SnapshotWithThreadMode, WaitForCompactOptions, WriteBatch, WriteBatchWithIndex, WriteOptions,
37 column_family::{AsColumnFamilyRef, BoundColumnFamily, UnboundColumnFamily},
38 db_options::{ImportColumnFamilyOptions, OptionsMustOutliveDB},
39 ffi,
40 ffi_util::{
41 CStrLike, convert_rocksdb_error, from_cstr_and_free, from_cstr_without_free,
42 opt_bytes_to_ptr, raw_data, to_cpath,
43 },
44};
45use rust_librocksdb_sys::{
46 rocksdb_livefile_destroy, rocksdb_livefile_t, rocksdb_livefiles_destroy, rocksdb_livefiles_t,
47};
48
49use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
50use parking_lot::RwLock;
51
52thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
59thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
60thread_local! { static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default(); }
61thread_local! { static PREFIX_READ_OPTS: RefCell<ReadOptions> = RefCell::new({ let mut o = ReadOptions::default(); o.set_prefix_same_as_start(true); o }); }
63
64pub struct Range<'a> {
68 start_key: &'a [u8],
69 end_key: &'a [u8],
70}
71
72impl<'a> Range<'a> {
73 pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
74 Range { start_key, end_key }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
117pub enum GetIntoBufferResult {
118 NotFound,
120 Found(usize),
123 BufferTooSmall(usize),
129}
130
131impl GetIntoBufferResult {
132 #[inline]
134 pub fn is_found(&self) -> bool {
135 matches!(self, Self::Found(_) | Self::BufferTooSmall(_))
136 }
137
138 #[inline]
140 pub fn is_not_found(&self) -> bool {
141 matches!(self, Self::NotFound)
142 }
143
144 #[inline]
146 pub fn value_size(&self) -> Option<usize> {
147 match self {
148 Self::Found(size) | Self::BufferTooSmall(size) => Some(*size),
149 Self::NotFound => None,
150 }
151 }
152}
153
154pub struct PrefixProber<'a, D: DBAccess> {
158 raw: DBRawIteratorWithThreadMode<'a, D>,
159}
160
161impl<D: DBAccess> PrefixProber<'_, D> {
162 pub fn exists(&mut self, prefix: &[u8]) -> Result<bool, Error> {
165 self.raw.seek(prefix);
166 if self.raw.valid()
167 && let Some(k) = self.raw.key()
168 {
169 return Ok(k.starts_with(prefix));
170 }
171 self.raw.status()?;
172 Ok(false)
173 }
174}
175
176pub trait ThreadMode {
187 fn new_cf_map_internal(
189 cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
190 ) -> Self;
191 fn drop_all_cfs_internal(&mut self);
193}
194
195pub struct SingleThreaded {
202 pub(crate) cfs: HashMap<String, ColumnFamily>,
203}
204
205pub struct MultiThreaded {
211 pub(crate) cfs: RwLock<HashMap<String, Arc<UnboundColumnFamily>>>,
212}
213
214impl ThreadMode for SingleThreaded {
215 fn new_cf_map_internal(
216 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
217 ) -> Self {
218 Self {
219 cfs: cfs
220 .into_iter()
221 .map(|(n, c)| (n, ColumnFamily { inner: c }))
222 .collect(),
223 }
224 }
225
226 fn drop_all_cfs_internal(&mut self) {
227 self.cfs.clear();
229 }
230}
231
232impl ThreadMode for MultiThreaded {
233 fn new_cf_map_internal(
234 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
235 ) -> Self {
236 Self {
237 cfs: RwLock::new(
238 cfs.into_iter()
239 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
240 .collect(),
241 ),
242 }
243 }
244
245 fn drop_all_cfs_internal(&mut self) {
246 self.cfs.write().clear();
248 }
249}
250
251pub trait DBInner {
253 fn inner(&self) -> *mut ffi::rocksdb_t;
254}
255
256pub struct DBCommon<T: ThreadMode, D: DBInner> {
277 pub(crate) inner: D,
278 cfs: T, path: PathBuf,
280 _outlive: Vec<OptionsMustOutliveDB>,
281}
282
283pub trait DBAccess {
286 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
287
288 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
289
290 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
291
292 unsafe fn create_iterator_cf(
293 &self,
294 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
295 readopts: &ReadOptions,
296 ) -> *mut ffi::rocksdb_iterator_t;
297
298 fn get_opt<K: AsRef<[u8]>>(
299 &self,
300 key: K,
301 readopts: &ReadOptions,
302 ) -> Result<Option<Vec<u8>>, Error>;
303
304 fn get_cf_opt<K: AsRef<[u8]>>(
305 &self,
306 cf: &impl AsColumnFamilyRef,
307 key: K,
308 readopts: &ReadOptions,
309 ) -> Result<Option<Vec<u8>>, Error>;
310
311 fn get_pinned_opt<K: AsRef<[u8]>>(
312 &'_ self,
313 key: K,
314 readopts: &ReadOptions,
315 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
316
317 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
318 &'_ self,
319 cf: &impl AsColumnFamilyRef,
320 key: K,
321 readopts: &ReadOptions,
322 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
323
324 fn multi_get_opt<K, I>(
325 &self,
326 keys: I,
327 readopts: &ReadOptions,
328 ) -> Vec<Result<Option<Vec<u8>>, Error>>
329 where
330 K: AsRef<[u8]>,
331 I: IntoIterator<Item = K>;
332
333 fn multi_get_cf_opt<'b, K, I, W>(
334 &self,
335 keys_cf: I,
336 readopts: &ReadOptions,
337 ) -> Vec<Result<Option<Vec<u8>>, Error>>
338 where
339 K: AsRef<[u8]>,
340 I: IntoIterator<Item = (&'b W, K)>,
341 W: AsColumnFamilyRef + 'b;
342}
343
344impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
345 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
346 unsafe { ffi::rocksdb_create_snapshot(self.inner.inner()) }
347 }
348
349 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
350 unsafe {
351 ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
352 }
353 }
354
355 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
356 unsafe { ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner) }
357 }
358
359 unsafe fn create_iterator_cf(
360 &self,
361 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
362 readopts: &ReadOptions,
363 ) -> *mut ffi::rocksdb_iterator_t {
364 unsafe { ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle) }
365 }
366
367 fn get_opt<K: AsRef<[u8]>>(
368 &self,
369 key: K,
370 readopts: &ReadOptions,
371 ) -> Result<Option<Vec<u8>>, Error> {
372 self.get_opt(key, readopts)
373 }
374
375 fn get_cf_opt<K: AsRef<[u8]>>(
376 &self,
377 cf: &impl AsColumnFamilyRef,
378 key: K,
379 readopts: &ReadOptions,
380 ) -> Result<Option<Vec<u8>>, Error> {
381 self.get_cf_opt(cf, key, readopts)
382 }
383
384 fn get_pinned_opt<K: AsRef<[u8]>>(
385 &'_ self,
386 key: K,
387 readopts: &ReadOptions,
388 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
389 self.get_pinned_opt(key, readopts)
390 }
391
392 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
393 &'_ self,
394 cf: &impl AsColumnFamilyRef,
395 key: K,
396 readopts: &ReadOptions,
397 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
398 self.get_pinned_cf_opt(cf, key, readopts)
399 }
400
401 fn multi_get_opt<K, Iter>(
402 &self,
403 keys: Iter,
404 readopts: &ReadOptions,
405 ) -> Vec<Result<Option<Vec<u8>>, Error>>
406 where
407 K: AsRef<[u8]>,
408 Iter: IntoIterator<Item = K>,
409 {
410 self.multi_get_opt(keys, readopts)
411 }
412
413 fn multi_get_cf_opt<'b, K, Iter, W>(
414 &self,
415 keys_cf: Iter,
416 readopts: &ReadOptions,
417 ) -> Vec<Result<Option<Vec<u8>>, Error>>
418 where
419 K: AsRef<[u8]>,
420 Iter: IntoIterator<Item = (&'b W, K)>,
421 W: AsColumnFamilyRef + 'b,
422 {
423 self.multi_get_cf_opt(keys_cf, readopts)
424 }
425}
426
427pub struct DBWithThreadModeInner {
428 inner: *mut ffi::rocksdb_t,
429}
430
431impl DBInner for DBWithThreadModeInner {
432 fn inner(&self) -> *mut ffi::rocksdb_t {
433 self.inner
434 }
435}
436
437impl Drop for DBWithThreadModeInner {
438 fn drop(&mut self) {
439 unsafe {
440 ffi::rocksdb_close(self.inner);
441 }
442 }
443}
444
445pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
450
451#[cfg(not(feature = "multi-threaded-cf"))]
474pub type DB = DBWithThreadMode<SingleThreaded>;
475
476#[cfg(feature = "multi-threaded-cf")]
477pub type DB = DBWithThreadMode<MultiThreaded>;
478
479unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
483
484unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
487
488enum AccessType<'a> {
490 ReadWrite,
491 ReadOnly { error_if_log_file_exist: bool },
492 Secondary { secondary_path: &'a Path },
493 WithTTL { ttl: Duration },
494}
495
496impl<T: ThreadMode> DBWithThreadMode<T> {
498 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
500 let mut opts = Options::default();
501 opts.create_if_missing(true);
502 Self::open(&opts, path)
503 }
504
505 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
507 Self::open_cf(opts, path, None::<&str>)
508 }
509
510 pub fn open_for_read_only<P: AsRef<Path>>(
512 opts: &Options,
513 path: P,
514 error_if_log_file_exist: bool,
515 ) -> Result<Self, Error> {
516 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
517 }
518
519 pub fn open_as_secondary<P: AsRef<Path>>(
521 opts: &Options,
522 primary_path: P,
523 secondary_path: P,
524 ) -> Result<Self, Error> {
525 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
526 }
527
528 pub fn open_with_ttl<P: AsRef<Path>>(
533 opts: &Options,
534 path: P,
535 ttl: Duration,
536 ) -> Result<Self, Error> {
537 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
538 }
539
540 pub fn open_cf_with_ttl<P, I, N>(
544 opts: &Options,
545 path: P,
546 cfs: I,
547 ttl: Duration,
548 ) -> Result<Self, Error>
549 where
550 P: AsRef<Path>,
551 I: IntoIterator<Item = N>,
552 N: AsRef<str>,
553 {
554 let cfs = cfs
555 .into_iter()
556 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
557
558 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
559 }
560
561 pub fn open_cf_descriptors_with_ttl<P, I>(
575 opts: &Options,
576 path: P,
577 cfs: I,
578 ttl: Duration,
579 ) -> Result<Self, Error>
580 where
581 P: AsRef<Path>,
582 I: IntoIterator<Item = ColumnFamilyDescriptor>,
583 {
584 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
585 }
586
587 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
591 where
592 P: AsRef<Path>,
593 I: IntoIterator<Item = N>,
594 N: AsRef<str>,
595 {
596 let cfs = cfs
597 .into_iter()
598 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
599
600 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
601 }
602
603 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
607 where
608 P: AsRef<Path>,
609 I: IntoIterator<Item = (N, Options)>,
610 N: AsRef<str>,
611 {
612 let cfs = cfs
613 .into_iter()
614 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
615
616 Self::open_cf_descriptors(opts, path, cfs)
617 }
618
619 pub fn open_cf_for_read_only<P, I, N>(
623 opts: &Options,
624 path: P,
625 cfs: I,
626 error_if_log_file_exist: bool,
627 ) -> Result<Self, Error>
628 where
629 P: AsRef<Path>,
630 I: IntoIterator<Item = N>,
631 N: AsRef<str>,
632 {
633 let cfs = cfs
634 .into_iter()
635 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
636
637 Self::open_cf_descriptors_internal(
638 opts,
639 path,
640 cfs,
641 &AccessType::ReadOnly {
642 error_if_log_file_exist,
643 },
644 )
645 }
646
647 pub fn open_cf_with_opts_for_read_only<P, I, N>(
651 db_opts: &Options,
652 path: P,
653 cfs: I,
654 error_if_log_file_exist: bool,
655 ) -> Result<Self, Error>
656 where
657 P: AsRef<Path>,
658 I: IntoIterator<Item = (N, Options)>,
659 N: AsRef<str>,
660 {
661 let cfs = cfs
662 .into_iter()
663 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
664
665 Self::open_cf_descriptors_internal(
666 db_opts,
667 path,
668 cfs,
669 &AccessType::ReadOnly {
670 error_if_log_file_exist,
671 },
672 )
673 }
674
675 pub fn open_cf_descriptors_read_only<P, I>(
680 opts: &Options,
681 path: P,
682 cfs: I,
683 error_if_log_file_exist: bool,
684 ) -> Result<Self, Error>
685 where
686 P: AsRef<Path>,
687 I: IntoIterator<Item = ColumnFamilyDescriptor>,
688 {
689 Self::open_cf_descriptors_internal(
690 opts,
691 path,
692 cfs,
693 &AccessType::ReadOnly {
694 error_if_log_file_exist,
695 },
696 )
697 }
698
699 pub fn open_cf_as_secondary<P, I, N>(
703 opts: &Options,
704 primary_path: P,
705 secondary_path: P,
706 cfs: I,
707 ) -> Result<Self, Error>
708 where
709 P: AsRef<Path>,
710 I: IntoIterator<Item = N>,
711 N: AsRef<str>,
712 {
713 let cfs = cfs
714 .into_iter()
715 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
716
717 Self::open_cf_descriptors_internal(
718 opts,
719 primary_path,
720 cfs,
721 &AccessType::Secondary {
722 secondary_path: secondary_path.as_ref(),
723 },
724 )
725 }
726
727 pub fn open_cf_descriptors_as_secondary<P, I>(
732 opts: &Options,
733 path: P,
734 secondary_path: P,
735 cfs: I,
736 ) -> Result<Self, Error>
737 where
738 P: AsRef<Path>,
739 I: IntoIterator<Item = ColumnFamilyDescriptor>,
740 {
741 Self::open_cf_descriptors_internal(
742 opts,
743 path,
744 cfs,
745 &AccessType::Secondary {
746 secondary_path: secondary_path.as_ref(),
747 },
748 )
749 }
750
751 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
755 where
756 P: AsRef<Path>,
757 I: IntoIterator<Item = ColumnFamilyDescriptor>,
758 {
759 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
760 }
761
762 fn open_cf_descriptors_internal<P, I>(
764 opts: &Options,
765 path: P,
766 cfs: I,
767 access_type: &AccessType,
768 ) -> Result<Self, Error>
769 where
770 P: AsRef<Path>,
771 I: IntoIterator<Item = ColumnFamilyDescriptor>,
772 {
773 let cfs: Vec<_> = cfs.into_iter().collect();
774 let outlive = iter::once(opts.outlive.clone())
775 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
776 .collect();
777
778 let cpath = to_cpath(&path)?;
779
780 if let Err(e) = fs::create_dir_all(&path) {
781 return Err(Error::new(format!(
782 "Failed to create RocksDB directory: `{e:?}`."
783 )));
784 }
785
786 let db: *mut ffi::rocksdb_t;
787 let mut cf_map = BTreeMap::new();
788
789 if cfs.is_empty() {
790 db = Self::open_raw(opts, &cpath, access_type)?;
791 } else {
792 let mut cfs_v = cfs;
793 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
795 cfs_v.push(ColumnFamilyDescriptor {
796 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
797 options: Options::default(),
798 ttl: ColumnFamilyTtl::SameAsDb,
799 });
800 }
801 let c_cfs: Vec<CString> = cfs_v
804 .iter()
805 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
806 .collect();
807
808 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
809
810 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
812
813 let cfopts: Vec<_> = cfs_v
814 .iter()
815 .map(|cf| cf.options.inner.cast_const())
816 .collect();
817
818 db = Self::open_cf_raw(
819 opts,
820 &cpath,
821 &cfs_v,
822 &cfnames,
823 &cfopts,
824 &mut cfhandles,
825 access_type,
826 )?;
827 for handle in &cfhandles {
828 if handle.is_null() {
829 return Err(Error::new(
830 "Received null column family handle from DB.".to_owned(),
831 ));
832 }
833 }
834
835 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
836 cf_map.insert(cf_desc.name.clone(), inner);
837 }
838 }
839
840 if db.is_null() {
841 return Err(Error::new("Could not initialize database.".to_owned()));
842 }
843
844 Ok(Self {
845 inner: DBWithThreadModeInner { inner: db },
846 path: path.as_ref().to_path_buf(),
847 cfs: T::new_cf_map_internal(cf_map),
848 _outlive: outlive,
849 })
850 }
851
852 fn open_raw(
853 opts: &Options,
854 cpath: &CString,
855 access_type: &AccessType,
856 ) -> Result<*mut ffi::rocksdb_t, Error> {
857 let db = unsafe {
858 match *access_type {
859 AccessType::ReadOnly {
860 error_if_log_file_exist,
861 } => ffi_try!(ffi::rocksdb_open_for_read_only(
862 opts.inner,
863 cpath.as_ptr(),
864 c_uchar::from(error_if_log_file_exist),
865 )),
866 AccessType::ReadWrite => {
867 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
868 }
869 AccessType::Secondary { secondary_path } => {
870 ffi_try!(ffi::rocksdb_open_as_secondary(
871 opts.inner,
872 cpath.as_ptr(),
873 to_cpath(secondary_path)?.as_ptr(),
874 ))
875 }
876 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
877 opts.inner,
878 cpath.as_ptr(),
879 ttl.as_secs() as c_int,
880 )),
881 }
882 };
883 Ok(db)
884 }
885
886 #[allow(clippy::pedantic)]
887 fn open_cf_raw(
888 opts: &Options,
889 cpath: &CString,
890 cfs_v: &[ColumnFamilyDescriptor],
891 cfnames: &[*const c_char],
892 cfopts: &[*const ffi::rocksdb_options_t],
893 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
894 access_type: &AccessType,
895 ) -> Result<*mut ffi::rocksdb_t, Error> {
896 let db = unsafe {
897 match *access_type {
898 AccessType::ReadOnly {
899 error_if_log_file_exist,
900 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
901 opts.inner,
902 cpath.as_ptr(),
903 cfs_v.len() as c_int,
904 cfnames.as_ptr(),
905 cfopts.as_ptr(),
906 cfhandles.as_mut_ptr(),
907 c_uchar::from(error_if_log_file_exist),
908 )),
909 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
910 opts.inner,
911 cpath.as_ptr(),
912 cfs_v.len() as c_int,
913 cfnames.as_ptr(),
914 cfopts.as_ptr(),
915 cfhandles.as_mut_ptr(),
916 )),
917 AccessType::Secondary { secondary_path } => {
918 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
919 opts.inner,
920 cpath.as_ptr(),
921 to_cpath(secondary_path)?.as_ptr(),
922 cfs_v.len() as c_int,
923 cfnames.as_ptr(),
924 cfopts.as_ptr(),
925 cfhandles.as_mut_ptr(),
926 ))
927 }
928 AccessType::WithTTL { ttl } => {
929 let ttls: Vec<_> = cfs_v
930 .iter()
931 .map(|cf| match cf.ttl {
932 ColumnFamilyTtl::Disabled => i32::MAX,
933 ColumnFamilyTtl::Duration(duration) => duration.as_secs() as i32,
934 ColumnFamilyTtl::SameAsDb => ttl.as_secs() as i32,
935 })
936 .collect();
937
938 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
939 opts.inner,
940 cpath.as_ptr(),
941 cfs_v.len() as c_int,
942 cfnames.as_ptr(),
943 cfopts.as_ptr(),
944 cfhandles.as_mut_ptr(),
945 ttls.as_ptr(),
946 ))
947 }
948 }
949 };
950 Ok(db)
951 }
952
953 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
955 &self,
956 cf: &impl AsColumnFamilyRef,
957 from: K,
958 to: K,
959 writeopts: &WriteOptions,
960 ) -> Result<(), Error> {
961 let from = from.as_ref();
962 let to = to.as_ref();
963
964 unsafe {
965 ffi_try!(ffi::rocksdb_delete_range_cf(
966 self.inner.inner(),
967 writeopts.inner,
968 cf.inner(),
969 from.as_ptr() as *const c_char,
970 from.len() as size_t,
971 to.as_ptr() as *const c_char,
972 to.len() as size_t,
973 ));
974 Ok(())
975 }
976 }
977
978 pub fn delete_range_cf<K: AsRef<[u8]>>(
980 &self,
981 cf: &impl AsColumnFamilyRef,
982 from: K,
983 to: K,
984 ) -> Result<(), Error> {
985 DEFAULT_WRITE_OPTS.with(|opts| self.delete_range_cf_opt(cf, from, to, opts))
986 }
987
988 pub fn write_opt(&self, batch: &WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
989 unsafe {
990 ffi_try!(ffi::rocksdb_write(
991 self.inner.inner(),
992 writeopts.inner,
993 batch.inner
994 ));
995 }
996 Ok(())
997 }
998
999 pub fn write(&self, batch: &WriteBatch) -> Result<(), Error> {
1000 DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
1001 }
1002
1003 pub fn write_without_wal(&self, batch: &WriteBatch) -> Result<(), Error> {
1004 let mut wo = WriteOptions::new();
1005 wo.disable_wal(true);
1006 self.write_opt(batch, &wo)
1007 }
1008
1009 pub fn write_wbwi(&self, wbwi: &WriteBatchWithIndex) -> Result<(), Error> {
1010 DEFAULT_WRITE_OPTS.with(|opts| self.write_wbwi_opt(wbwi, opts))
1011 }
1012
1013 pub fn write_wbwi_opt(
1014 &self,
1015 wbwi: &WriteBatchWithIndex,
1016 writeopts: &WriteOptions,
1017 ) -> Result<(), Error> {
1018 unsafe {
1019 ffi_try!(ffi::rocksdb_write_writebatch_wi(
1020 self.inner.inner(),
1021 writeopts.inner,
1022 wbwi.inner
1023 ));
1024
1025 Ok(())
1026 }
1027 }
1028
1029 pub fn disable_file_deletions(&self) -> Result<(), Error> {
1034 unsafe {
1035 ffi_try!(ffi::rocksdb_disable_file_deletions(self.inner.inner()));
1036 }
1037 Ok(())
1038 }
1039
1040 pub fn enable_file_deletions(&self) -> Result<(), Error> {
1052 unsafe {
1053 ffi_try!(ffi::rocksdb_enable_file_deletions(self.inner.inner()));
1054 }
1055 Ok(())
1056 }
1057}
1058
1059impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
1061 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
1062 Self {
1063 inner,
1064 cfs,
1065 path,
1066 _outlive: outlive,
1067 }
1068 }
1069
1070 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
1071 let cpath = to_cpath(path)?;
1072 let mut length = 0;
1073
1074 unsafe {
1075 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
1076 opts.inner,
1077 cpath.as_ptr(),
1078 &raw mut length,
1079 ));
1080
1081 let vec = slice::from_raw_parts(ptr, length)
1082 .iter()
1083 .map(|ptr| from_cstr_without_free(*ptr))
1084 .collect();
1085 ffi::rocksdb_list_column_families_destroy(ptr, length);
1086 Ok(vec)
1087 }
1088 }
1089
1090 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1091 let cpath = to_cpath(path)?;
1092 unsafe {
1093 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
1094 }
1095 Ok(())
1096 }
1097
1098 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
1099 let cpath = to_cpath(path)?;
1100 unsafe {
1101 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
1102 }
1103 Ok(())
1104 }
1105
1106 pub fn path(&self) -> &Path {
1107 self.path.as_path()
1108 }
1109
1110 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
1113 unsafe {
1114 ffi_try!(ffi::rocksdb_flush_wal(
1115 self.inner.inner(),
1116 c_uchar::from(sync)
1117 ));
1118 }
1119 Ok(())
1120 }
1121
1122 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
1124 unsafe {
1125 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
1126 }
1127 Ok(())
1128 }
1129
1130 pub fn flush(&self) -> Result<(), Error> {
1132 self.flush_opt(&FlushOptions::default())
1133 }
1134
1135 pub fn flush_cf_opt(
1137 &self,
1138 cf: &impl AsColumnFamilyRef,
1139 flushopts: &FlushOptions,
1140 ) -> Result<(), Error> {
1141 unsafe {
1142 ffi_try!(ffi::rocksdb_flush_cf(
1143 self.inner.inner(),
1144 flushopts.inner,
1145 cf.inner()
1146 ));
1147 }
1148 Ok(())
1149 }
1150
1151 pub fn flush_cfs_opt(
1157 &self,
1158 cfs: &[&impl AsColumnFamilyRef],
1159 opts: &FlushOptions,
1160 ) -> Result<(), Error> {
1161 let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
1162 unsafe {
1163 ffi_try!(ffi::rocksdb_flush_cfs(
1164 self.inner.inner(),
1165 opts.inner,
1166 cfs.as_mut_ptr(),
1167 cfs.len() as libc::c_int,
1168 ));
1169 }
1170 Ok(())
1171 }
1172
1173 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
1176 DEFAULT_FLUSH_OPTS.with(|opts| self.flush_cf_opt(cf, opts))
1177 }
1178
1179 pub fn get_opt<K: AsRef<[u8]>>(
1183 &self,
1184 key: K,
1185 readopts: &ReadOptions,
1186 ) -> Result<Option<Vec<u8>>, Error> {
1187 self.get_pinned_opt(key, readopts)
1188 .map(|x| x.map(|v| v.as_ref().to_vec()))
1189 }
1190
1191 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
1195 DEFAULT_READ_OPTS.with(|opts| self.get_opt(key.as_ref(), opts))
1196 }
1197
1198 pub fn get_cf_opt<K: AsRef<[u8]>>(
1202 &self,
1203 cf: &impl AsColumnFamilyRef,
1204 key: K,
1205 readopts: &ReadOptions,
1206 ) -> Result<Option<Vec<u8>>, Error> {
1207 self.get_pinned_cf_opt(cf, key, readopts)
1208 .map(|x| x.map(|v| v.as_ref().to_vec()))
1209 }
1210
1211 pub fn get_cf<K: AsRef<[u8]>>(
1215 &self,
1216 cf: &impl AsColumnFamilyRef,
1217 key: K,
1218 ) -> Result<Option<Vec<u8>>, Error> {
1219 DEFAULT_READ_OPTS.with(|opts| self.get_cf_opt(cf, key.as_ref(), opts))
1220 }
1221
1222 pub fn get_pinned_opt<K: AsRef<[u8]>>(
1225 &'_ self,
1226 key: K,
1227 readopts: &ReadOptions,
1228 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1229 if readopts.inner.is_null() {
1230 return Err(Error::new(
1231 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1232 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1233 .to_owned(),
1234 ));
1235 }
1236
1237 let key = key.as_ref();
1238 unsafe {
1239 let val = ffi_try!(ffi::rocksdb_get_pinned(
1240 self.inner.inner(),
1241 readopts.inner,
1242 key.as_ptr() as *const c_char,
1243 key.len() as size_t,
1244 ));
1245 if val.is_null() {
1246 Ok(None)
1247 } else {
1248 Ok(Some(DBPinnableSlice::from_c(val)))
1249 }
1250 }
1251 }
1252
1253 pub fn get_pinned<K: AsRef<[u8]>>(
1257 &'_ self,
1258 key: K,
1259 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1260 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
1261 }
1262
1263 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1267 &'_ self,
1268 cf: &impl AsColumnFamilyRef,
1269 key: K,
1270 readopts: &ReadOptions,
1271 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1272 if readopts.inner.is_null() {
1273 return Err(Error::new(
1274 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1275 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1276 .to_owned(),
1277 ));
1278 }
1279
1280 let key = key.as_ref();
1281 unsafe {
1282 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1283 self.inner.inner(),
1284 readopts.inner,
1285 cf.inner(),
1286 key.as_ptr() as *const c_char,
1287 key.len() as size_t,
1288 ));
1289 if val.is_null() {
1290 Ok(None)
1291 } else {
1292 Ok(Some(DBPinnableSlice::from_c(val)))
1293 }
1294 }
1295 }
1296
1297 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1301 &'_ self,
1302 cf: &impl AsColumnFamilyRef,
1303 key: K,
1304 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1305 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
1306 }
1307
1308 pub fn get_into_buffer<K: AsRef<[u8]>>(
1378 &self,
1379 key: K,
1380 buffer: &mut [u8],
1381 ) -> Result<GetIntoBufferResult, Error> {
1382 DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_opt(key, buffer, opts))
1383 }
1384
1385 pub fn get_into_buffer_opt<K: AsRef<[u8]>>(
1392 &self,
1393 key: K,
1394 buffer: &mut [u8],
1395 readopts: &ReadOptions,
1396 ) -> Result<GetIntoBufferResult, Error> {
1397 if readopts.inner.is_null() {
1398 return Err(Error::new(
1399 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1400 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1401 .to_owned(),
1402 ));
1403 }
1404
1405 let key = key.as_ref();
1406 let mut val_len: size_t = 0;
1407 let mut found: c_uchar = 0;
1408
1409 unsafe {
1410 let success = ffi_try!(ffi::rocksdb_get_into_buffer(
1411 self.inner.inner(),
1412 readopts.inner,
1413 key.as_ptr() as *const c_char,
1414 key.len() as size_t,
1415 buffer.as_mut_ptr() as *mut c_char,
1416 buffer.len() as size_t,
1417 &raw mut val_len,
1418 &raw mut found,
1419 ));
1420
1421 if found == 0 {
1422 Ok(GetIntoBufferResult::NotFound)
1423 } else if success != 0 {
1424 Ok(GetIntoBufferResult::Found(val_len))
1425 } else {
1426 Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1427 }
1428 }
1429 }
1430
1431 pub fn get_into_buffer_cf<K: AsRef<[u8]>>(
1442 &self,
1443 cf: &impl AsColumnFamilyRef,
1444 key: K,
1445 buffer: &mut [u8],
1446 ) -> Result<GetIntoBufferResult, Error> {
1447 DEFAULT_READ_OPTS.with(|opts| self.get_into_buffer_cf_opt(cf, key, buffer, opts))
1448 }
1449
1450 pub fn get_into_buffer_cf_opt<K: AsRef<[u8]>>(
1456 &self,
1457 cf: &impl AsColumnFamilyRef,
1458 key: K,
1459 buffer: &mut [u8],
1460 readopts: &ReadOptions,
1461 ) -> Result<GetIntoBufferResult, Error> {
1462 if readopts.inner.is_null() {
1463 return Err(Error::new(
1464 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1465 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1466 .to_owned(),
1467 ));
1468 }
1469
1470 let key = key.as_ref();
1471 let mut val_len: size_t = 0;
1472 let mut found: c_uchar = 0;
1473
1474 unsafe {
1475 let success = ffi_try!(ffi::rocksdb_get_into_buffer_cf(
1476 self.inner.inner(),
1477 readopts.inner,
1478 cf.inner(),
1479 key.as_ptr() as *const c_char,
1480 key.len() as size_t,
1481 buffer.as_mut_ptr() as *mut c_char,
1482 buffer.len() as size_t,
1483 &raw mut val_len,
1484 &raw mut found,
1485 ));
1486
1487 if found == 0 {
1488 Ok(GetIntoBufferResult::NotFound)
1489 } else if success != 0 {
1490 Ok(GetIntoBufferResult::Found(val_len))
1491 } else {
1492 Ok(GetIntoBufferResult::BufferTooSmall(val_len))
1493 }
1494 }
1495 }
1496
1497 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1499 where
1500 K: AsRef<[u8]>,
1501 I: IntoIterator<Item = K>,
1502 {
1503 DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
1504 }
1505
1506 pub fn multi_get_opt<K, I>(
1508 &self,
1509 keys: I,
1510 readopts: &ReadOptions,
1511 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1512 where
1513 K: AsRef<[u8]>,
1514 I: IntoIterator<Item = K>,
1515 {
1516 let owned_keys: Vec<K> = keys.into_iter().collect();
1517 let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
1518 let ptr_keys: Vec<*const c_char> = owned_keys
1519 .iter()
1520 .map(|k| k.as_ref().as_ptr() as *const c_char)
1521 .collect();
1522
1523 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1524 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1525 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1526 unsafe {
1527 ffi::rocksdb_multi_get(
1528 self.inner.inner(),
1529 readopts.inner,
1530 ptr_keys.len(),
1531 ptr_keys.as_ptr(),
1532 keys_sizes.as_ptr(),
1533 values.as_mut_ptr(),
1534 values_sizes.as_mut_ptr(),
1535 errors.as_mut_ptr(),
1536 );
1537 }
1538
1539 unsafe {
1540 values.set_len(ptr_keys.len());
1541 values_sizes.set_len(ptr_keys.len());
1542 errors.set_len(ptr_keys.len());
1543 }
1544
1545 convert_values(values, values_sizes, errors)
1546 }
1547
1548 pub fn multi_get_pinned<K, I>(
1553 &'_ self,
1554 keys: I,
1555 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1556 where
1557 K: AsRef<[u8]>,
1558 I: IntoIterator<Item = K>,
1559 {
1560 DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_opt(keys, opts))
1561 }
1562
1563 pub fn multi_get_pinned_opt<K, I>(
1568 &'_ self,
1569 keys: I,
1570 readopts: &ReadOptions,
1571 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1572 where
1573 K: AsRef<[u8]>,
1574 I: IntoIterator<Item = K>,
1575 {
1576 keys.into_iter()
1577 .map(|k| self.get_pinned_opt(k, readopts))
1578 .collect()
1579 }
1580
1581 pub fn multi_get_pinned_cf<'a, 'b: 'a, K, I, W>(
1584 &'a self,
1585 keys: I,
1586 ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1587 where
1588 K: AsRef<[u8]>,
1589 I: IntoIterator<Item = (&'b W, K)>,
1590 W: 'b + AsColumnFamilyRef,
1591 {
1592 DEFAULT_READ_OPTS.with(|opts| self.multi_get_pinned_cf_opt(keys, opts))
1593 }
1594
1595 pub fn multi_get_pinned_cf_opt<'a, 'b: 'a, K, I, W>(
1598 &'a self,
1599 keys: I,
1600 readopts: &ReadOptions,
1601 ) -> Vec<Result<Option<DBPinnableSlice<'a>>, Error>>
1602 where
1603 K: AsRef<[u8]>,
1604 I: IntoIterator<Item = (&'b W, K)>,
1605 W: 'b + AsColumnFamilyRef,
1606 {
1607 keys.into_iter()
1608 .map(|(cf, k)| self.get_pinned_cf_opt(cf, k, readopts))
1609 .collect()
1610 }
1611
1612 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1614 &'a self,
1615 keys: I,
1616 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1617 where
1618 K: AsRef<[u8]>,
1619 I: IntoIterator<Item = (&'b W, K)>,
1620 W: 'b + AsColumnFamilyRef,
1621 {
1622 DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
1623 }
1624
1625 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1627 &'a self,
1628 keys: I,
1629 readopts: &ReadOptions,
1630 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1631 where
1632 K: AsRef<[u8]>,
1633 I: IntoIterator<Item = (&'b W, K)>,
1634 W: 'b + AsColumnFamilyRef,
1635 {
1636 let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
1637 let keys_sizes: Vec<usize> = cfs_and_owned_keys
1638 .iter()
1639 .map(|(_, k)| k.as_ref().len())
1640 .collect();
1641 let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
1642 .iter()
1643 .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
1644 .collect();
1645 let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
1646 .iter()
1647 .map(|(c, _)| c.inner().cast_const())
1648 .collect();
1649 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1650 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
1651 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
1652 unsafe {
1653 ffi::rocksdb_multi_get_cf(
1654 self.inner.inner(),
1655 readopts.inner,
1656 ptr_cfs.as_ptr(),
1657 ptr_keys.len(),
1658 ptr_keys.as_ptr(),
1659 keys_sizes.as_ptr(),
1660 values.as_mut_ptr(),
1661 values_sizes.as_mut_ptr(),
1662 errors.as_mut_ptr(),
1663 );
1664 }
1665
1666 unsafe {
1667 values.set_len(ptr_keys.len());
1668 values_sizes.set_len(ptr_keys.len());
1669 errors.set_len(ptr_keys.len());
1670 }
1671
1672 convert_values(values, values_sizes, errors)
1673 }
1674
1675 pub fn batched_multi_get_cf<'a, K, I>(
1679 &'_ self,
1680 cf: &impl AsColumnFamilyRef,
1681 keys: I,
1682 sorted_input: bool,
1683 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1684 where
1685 K: AsRef<[u8]> + 'a + ?Sized,
1686 I: IntoIterator<Item = &'a K>,
1687 {
1688 DEFAULT_READ_OPTS.with(|opts| self.batched_multi_get_cf_opt(cf, keys, sorted_input, opts))
1689 }
1690
1691 pub fn batched_multi_get_cf_opt<'a, K, I>(
1695 &'_ self,
1696 cf: &impl AsColumnFamilyRef,
1697 keys: I,
1698 sorted_input: bool,
1699 readopts: &ReadOptions,
1700 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1701 where
1702 K: AsRef<[u8]> + 'a + ?Sized,
1703 I: IntoIterator<Item = &'a K>,
1704 {
1705 let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1706 .into_iter()
1707 .map(|k| {
1708 let k = k.as_ref();
1709 (k.as_ptr() as *const c_char, k.len())
1710 })
1711 .unzip();
1712
1713 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1714 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1715
1716 unsafe {
1717 ffi::rocksdb_batched_multi_get_cf(
1718 self.inner.inner(),
1719 readopts.inner,
1720 cf.inner(),
1721 ptr_keys.len(),
1722 ptr_keys.as_ptr(),
1723 keys_sizes.as_ptr(),
1724 pinned_values.as_mut_ptr(),
1725 errors.as_mut_ptr(),
1726 sorted_input,
1727 );
1728 pinned_values
1729 .into_iter()
1730 .zip(errors)
1731 .map(|(v, e)| {
1732 if e.is_null() {
1733 if v.is_null() {
1734 Ok(None)
1735 } else {
1736 Ok(Some(DBPinnableSlice::from_c(v)))
1737 }
1738 } else {
1739 Err(convert_rocksdb_error(e))
1740 }
1741 })
1742 .collect()
1743 }
1744 }
1745
1746 pub fn batched_multi_get_cf_slice<'a, K, I>(
1803 &'_ self,
1804 cf: &impl AsColumnFamilyRef,
1805 keys: I,
1806 sorted_input: bool,
1807 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1808 where
1809 K: AsRef<[u8]> + 'a + ?Sized,
1810 I: IntoIterator<Item = &'a K>,
1811 {
1812 DEFAULT_READ_OPTS
1813 .with(|opts| self.batched_multi_get_cf_slice_opt(cf, keys, sorted_input, opts))
1814 }
1815
1816 pub fn batched_multi_get_cf_slice_opt<'a, K, I>(
1824 &'_ self,
1825 cf: &impl AsColumnFamilyRef,
1826 keys: I,
1827 sorted_input: bool,
1828 readopts: &ReadOptions,
1829 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1830 where
1831 K: AsRef<[u8]> + 'a + ?Sized,
1832 I: IntoIterator<Item = &'a K>,
1833 {
1834 let slices: Vec<ffi::rocksdb_slice_t> = keys
1836 .into_iter()
1837 .map(|k| {
1838 let k = k.as_ref();
1839 ffi::rocksdb_slice_t {
1840 data: k.as_ptr() as *const c_char,
1841 size: k.len(),
1842 }
1843 })
1844 .collect();
1845
1846 if slices.is_empty() {
1847 return Vec::new();
1848 }
1849
1850 let mut pinned_values = vec![ptr::null_mut(); slices.len()];
1851 let mut errors = vec![ptr::null_mut(); slices.len()];
1852
1853 unsafe {
1854 ffi::rocksdb_batched_multi_get_cf_slice(
1855 self.inner.inner(),
1856 readopts.inner,
1857 cf.inner(),
1858 slices.len(),
1859 slices.as_ptr(),
1860 pinned_values.as_mut_ptr(),
1861 errors.as_mut_ptr(),
1862 sorted_input,
1863 );
1864 pinned_values
1865 .into_iter()
1866 .zip(errors)
1867 .map(|(v, e)| {
1868 if e.is_null() {
1869 if v.is_null() {
1870 Ok(None)
1871 } else {
1872 Ok(Some(DBPinnableSlice::from_c(v)))
1873 }
1874 } else {
1875 Err(convert_rocksdb_error(e))
1876 }
1877 })
1878 .collect()
1879 }
1880 }
1881
1882 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1885 DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_opt(key, opts))
1886 }
1887
1888 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1891 let key = key.as_ref();
1892 unsafe {
1893 0 != ffi::rocksdb_key_may_exist(
1894 self.inner.inner(),
1895 readopts.inner,
1896 key.as_ptr() as *const c_char,
1897 key.len() as size_t,
1898 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1904 }
1905 }
1906
1907 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1910 DEFAULT_READ_OPTS.with(|opts| self.key_may_exist_cf_opt(cf, key, opts))
1911 }
1912
1913 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1916 &self,
1917 cf: &impl AsColumnFamilyRef,
1918 key: K,
1919 readopts: &ReadOptions,
1920 ) -> bool {
1921 let key = key.as_ref();
1922 0 != unsafe {
1923 ffi::rocksdb_key_may_exist_cf(
1924 self.inner.inner(),
1925 readopts.inner,
1926 cf.inner(),
1927 key.as_ptr() as *const c_char,
1928 key.len() as size_t,
1929 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1935 }
1936 }
1937
1938 pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1945 &self,
1946 cf: &impl AsColumnFamilyRef,
1947 key: K,
1948 readopts: &ReadOptions,
1949 ) -> (bool, Option<CSlice>) {
1950 let key = key.as_ref();
1951 let mut val: *mut c_char = ptr::null_mut();
1952 let mut val_len: usize = 0;
1953 let mut value_found: c_uchar = 0;
1954 let may_exists = 0
1955 != unsafe {
1956 ffi::rocksdb_key_may_exist_cf(
1957 self.inner.inner(),
1958 readopts.inner,
1959 cf.inner(),
1960 key.as_ptr() as *const c_char,
1961 key.len() as size_t,
1962 &raw mut val, &raw mut val_len, ptr::null(), 0, &raw mut value_found, )
1968 };
1969 if may_exists && value_found != 0 {
1972 (
1973 may_exists,
1974 Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1975 )
1976 } else {
1977 (may_exists, None)
1978 }
1979 }
1980
1981 fn create_inner_cf_handle(
1982 &self,
1983 name: impl CStrLike,
1984 opts: &Options,
1985 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1986 let cf_name = name.bake().map_err(|err| {
1987 Error::new(format!(
1988 "Failed to convert path to CString when creating cf: {err}"
1989 ))
1990 })?;
1991
1992 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
1995 let cf_handle = unsafe {
1996 ffi::rocksdb_create_column_family(
1997 self.inner.inner(),
1998 opts.inner,
1999 cf_name.as_ptr(),
2000 &raw mut err,
2001 )
2002 };
2003 if !err.is_null() {
2004 if !cf_handle.is_null() {
2005 unsafe { ffi::rocksdb_column_family_handle_destroy(cf_handle) };
2006 }
2007 return Err(convert_rocksdb_error(err));
2008 }
2009 Ok(cf_handle)
2010 }
2011
2012 pub fn iterator<'a: 'b, 'b>(
2013 &'a self,
2014 mode: IteratorMode,
2015 ) -> DBIteratorWithThreadMode<'b, Self> {
2016 let readopts = ReadOptions::default();
2017 self.iterator_opt(mode, readopts)
2018 }
2019
2020 pub fn iterator_opt<'a: 'b, 'b>(
2021 &'a self,
2022 mode: IteratorMode,
2023 readopts: ReadOptions,
2024 ) -> DBIteratorWithThreadMode<'b, Self> {
2025 DBIteratorWithThreadMode::new(self, readopts, mode)
2026 }
2027
2028 pub fn iterator_cf_opt<'a: 'b, 'b>(
2031 &'a self,
2032 cf_handle: &impl AsColumnFamilyRef,
2033 readopts: ReadOptions,
2034 mode: IteratorMode,
2035 ) -> DBIteratorWithThreadMode<'b, Self> {
2036 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
2037 }
2038
2039 pub fn full_iterator<'a: 'b, 'b>(
2043 &'a self,
2044 mode: IteratorMode,
2045 ) -> DBIteratorWithThreadMode<'b, Self> {
2046 let mut opts = ReadOptions::default();
2047 opts.set_total_order_seek(true);
2048 DBIteratorWithThreadMode::new(self, opts, mode)
2049 }
2050
2051 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
2052 &'a self,
2053 prefix: P,
2054 ) -> DBIteratorWithThreadMode<'b, Self> {
2055 let mut opts = ReadOptions::default();
2056 opts.set_prefix_same_as_start(true);
2057 DBIteratorWithThreadMode::new(
2058 self,
2059 opts,
2060 IteratorMode::From(prefix.as_ref(), Direction::Forward),
2061 )
2062 }
2063
2064 pub fn iterator_cf<'a: 'b, 'b>(
2065 &'a self,
2066 cf_handle: &impl AsColumnFamilyRef,
2067 mode: IteratorMode,
2068 ) -> DBIteratorWithThreadMode<'b, Self> {
2069 let opts = ReadOptions::default();
2070 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2071 }
2072
2073 pub fn full_iterator_cf<'a: 'b, 'b>(
2074 &'a self,
2075 cf_handle: &impl AsColumnFamilyRef,
2076 mode: IteratorMode,
2077 ) -> DBIteratorWithThreadMode<'b, Self> {
2078 let mut opts = ReadOptions::default();
2079 opts.set_total_order_seek(true);
2080 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
2081 }
2082
2083 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
2084 &'a self,
2085 cf_handle: &impl AsColumnFamilyRef,
2086 prefix: P,
2087 ) -> DBIteratorWithThreadMode<'a, Self> {
2088 let mut opts = ReadOptions::default();
2089 opts.set_prefix_same_as_start(true);
2090 DBIteratorWithThreadMode::<'a, Self>::new_cf(
2091 self,
2092 cf_handle.inner(),
2093 opts,
2094 IteratorMode::From(prefix.as_ref(), Direction::Forward),
2095 )
2096 }
2097
2098 pub fn prefix_exists<P: AsRef<[u8]>>(&self, prefix: P) -> Result<bool, Error> {
2105 let p = prefix.as_ref();
2106 PREFIX_READ_OPTS.with(|rc| {
2107 let mut opts = rc.borrow_mut();
2108 opts.set_iterate_range(crate::PrefixRange(p));
2109 self.prefix_exists_opt(p, &opts)
2110 })
2111 }
2112
2113 pub fn prefix_exists_opt<P: AsRef<[u8]>>(
2116 &self,
2117 prefix: P,
2118 readopts: &ReadOptions,
2119 ) -> Result<bool, Error> {
2120 let prefix = prefix.as_ref();
2121 let iter = unsafe { self.create_iterator(readopts) };
2122 let res = unsafe {
2123 ffi::rocksdb_iter_seek(
2124 iter,
2125 prefix.as_ptr() as *const c_char,
2126 prefix.len() as size_t,
2127 );
2128 if ffi::rocksdb_iter_valid(iter) != 0 {
2129 let mut key_len: size_t = 0;
2130 let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2131 let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2132 Ok(key.starts_with(prefix))
2133 } else if let Err(e) = (|| {
2134 ffi_try!(ffi::rocksdb_iter_get_error(iter));
2136 Ok::<(), Error>(())
2137 })() {
2138 Err(e)
2139 } else {
2140 Ok(false)
2141 }
2142 };
2143 unsafe { ffi::rocksdb_iter_destroy(iter) };
2144 res
2145 }
2146
2147 pub fn prefix_prober(&self) -> PrefixProber<'_, Self> {
2155 let mut opts = ReadOptions::default();
2156 opts.set_prefix_same_as_start(true);
2157 PrefixProber {
2158 raw: DBRawIteratorWithThreadMode::new(self, opts),
2159 }
2160 }
2161
2162 pub fn prefix_prober_with_opts(&self, readopts: ReadOptions) -> PrefixProber<'_, Self> {
2169 PrefixProber {
2170 raw: DBRawIteratorWithThreadMode::new(self, readopts),
2171 }
2172 }
2173
2174 pub fn prefix_prober_cf(&self, cf_handle: &impl AsColumnFamilyRef) -> PrefixProber<'_, Self> {
2177 let mut opts = ReadOptions::default();
2178 opts.set_prefix_same_as_start(true);
2179 PrefixProber {
2180 raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts),
2181 }
2182 }
2183
2184 pub fn prefix_prober_cf_with_opts(
2189 &self,
2190 cf_handle: &impl AsColumnFamilyRef,
2191 readopts: ReadOptions,
2192 ) -> PrefixProber<'_, Self> {
2193 PrefixProber {
2194 raw: DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts),
2195 }
2196 }
2197
2198 pub fn prefix_exists_cf<P: AsRef<[u8]>>(
2204 &self,
2205 cf_handle: &impl AsColumnFamilyRef,
2206 prefix: P,
2207 ) -> Result<bool, Error> {
2208 let p = prefix.as_ref();
2209 PREFIX_READ_OPTS.with(|rc| {
2210 let mut opts = rc.borrow_mut();
2211 opts.set_iterate_range(crate::PrefixRange(p));
2212 self.prefix_exists_cf_opt(cf_handle, p, &opts)
2213 })
2214 }
2215
2216 pub fn prefix_exists_cf_opt<P: AsRef<[u8]>>(
2219 &self,
2220 cf_handle: &impl AsColumnFamilyRef,
2221 prefix: P,
2222 readopts: &ReadOptions,
2223 ) -> Result<bool, Error> {
2224 let prefix = prefix.as_ref();
2225 let iter = unsafe { self.create_iterator_cf(cf_handle.inner(), readopts) };
2226 let res = unsafe {
2227 ffi::rocksdb_iter_seek(
2228 iter,
2229 prefix.as_ptr() as *const c_char,
2230 prefix.len() as size_t,
2231 );
2232 if ffi::rocksdb_iter_valid(iter) != 0 {
2233 let mut key_len: size_t = 0;
2234 let key_ptr = ffi::rocksdb_iter_key(iter, &raw mut key_len);
2235 let key = slice::from_raw_parts(key_ptr as *const u8, key_len as usize);
2236 Ok(key.starts_with(prefix))
2237 } else if let Err(e) = (|| {
2238 ffi_try!(ffi::rocksdb_iter_get_error(iter));
2239 Ok::<(), Error>(())
2240 })() {
2241 Err(e)
2242 } else {
2243 Ok(false)
2244 }
2245 };
2246 unsafe { ffi::rocksdb_iter_destroy(iter) };
2247 res
2248 }
2249
2250 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
2252 let opts = ReadOptions::default();
2253 DBRawIteratorWithThreadMode::new(self, opts)
2254 }
2255
2256 pub fn raw_iterator_cf<'a: 'b, 'b>(
2258 &'a self,
2259 cf_handle: &impl AsColumnFamilyRef,
2260 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2261 let opts = ReadOptions::default();
2262 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
2263 }
2264
2265 pub fn raw_iterator_opt<'a: 'b, 'b>(
2267 &'a self,
2268 readopts: ReadOptions,
2269 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2270 DBRawIteratorWithThreadMode::new(self, readopts)
2271 }
2272
2273 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
2275 &'a self,
2276 cf_handle: &impl AsColumnFamilyRef,
2277 readopts: ReadOptions,
2278 ) -> DBRawIteratorWithThreadMode<'b, Self> {
2279 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
2280 }
2281
2282 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
2283 SnapshotWithThreadMode::<Self>::new(self)
2284 }
2285
2286 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2287 where
2288 K: AsRef<[u8]>,
2289 V: AsRef<[u8]>,
2290 {
2291 let key = key.as_ref();
2292 let value = value.as_ref();
2293
2294 unsafe {
2295 ffi_try!(ffi::rocksdb_put(
2296 self.inner.inner(),
2297 writeopts.inner,
2298 key.as_ptr() as *const c_char,
2299 key.len() as size_t,
2300 value.as_ptr() as *const c_char,
2301 value.len() as size_t,
2302 ));
2303 Ok(())
2304 }
2305 }
2306
2307 pub fn put_cf_opt<K, V>(
2308 &self,
2309 cf: &impl AsColumnFamilyRef,
2310 key: K,
2311 value: V,
2312 writeopts: &WriteOptions,
2313 ) -> Result<(), Error>
2314 where
2315 K: AsRef<[u8]>,
2316 V: AsRef<[u8]>,
2317 {
2318 let key = key.as_ref();
2319 let value = value.as_ref();
2320
2321 unsafe {
2322 ffi_try!(ffi::rocksdb_put_cf(
2323 self.inner.inner(),
2324 writeopts.inner,
2325 cf.inner(),
2326 key.as_ptr() as *const c_char,
2327 key.len() as size_t,
2328 value.as_ptr() as *const c_char,
2329 value.len() as size_t,
2330 ));
2331 Ok(())
2332 }
2333 }
2334
2335 pub fn put_with_ts_opt<K, V, S>(
2342 &self,
2343 key: K,
2344 ts: S,
2345 value: V,
2346 writeopts: &WriteOptions,
2347 ) -> Result<(), Error>
2348 where
2349 K: AsRef<[u8]>,
2350 V: AsRef<[u8]>,
2351 S: AsRef<[u8]>,
2352 {
2353 let key = key.as_ref();
2354 let value = value.as_ref();
2355 let ts = ts.as_ref();
2356 unsafe {
2357 ffi_try!(ffi::rocksdb_put_with_ts(
2358 self.inner.inner(),
2359 writeopts.inner,
2360 key.as_ptr() as *const c_char,
2361 key.len() as size_t,
2362 ts.as_ptr() as *const c_char,
2363 ts.len() as size_t,
2364 value.as_ptr() as *const c_char,
2365 value.len() as size_t,
2366 ));
2367 Ok(())
2368 }
2369 }
2370
2371 pub fn put_cf_with_ts_opt<K, V, S>(
2378 &self,
2379 cf: &impl AsColumnFamilyRef,
2380 key: K,
2381 ts: S,
2382 value: V,
2383 writeopts: &WriteOptions,
2384 ) -> Result<(), Error>
2385 where
2386 K: AsRef<[u8]>,
2387 V: AsRef<[u8]>,
2388 S: AsRef<[u8]>,
2389 {
2390 let key = key.as_ref();
2391 let value = value.as_ref();
2392 let ts = ts.as_ref();
2393 unsafe {
2394 ffi_try!(ffi::rocksdb_put_cf_with_ts(
2395 self.inner.inner(),
2396 writeopts.inner,
2397 cf.inner(),
2398 key.as_ptr() as *const c_char,
2399 key.len() as size_t,
2400 ts.as_ptr() as *const c_char,
2401 ts.len() as size_t,
2402 value.as_ptr() as *const c_char,
2403 value.len() as size_t,
2404 ));
2405 Ok(())
2406 }
2407 }
2408
2409 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
2410 where
2411 K: AsRef<[u8]>,
2412 V: AsRef<[u8]>,
2413 {
2414 let key = key.as_ref();
2415 let value = value.as_ref();
2416
2417 unsafe {
2418 ffi_try!(ffi::rocksdb_merge(
2419 self.inner.inner(),
2420 writeopts.inner,
2421 key.as_ptr() as *const c_char,
2422 key.len() as size_t,
2423 value.as_ptr() as *const c_char,
2424 value.len() as size_t,
2425 ));
2426 Ok(())
2427 }
2428 }
2429
2430 pub fn merge_cf_opt<K, V>(
2431 &self,
2432 cf: &impl AsColumnFamilyRef,
2433 key: K,
2434 value: V,
2435 writeopts: &WriteOptions,
2436 ) -> Result<(), Error>
2437 where
2438 K: AsRef<[u8]>,
2439 V: AsRef<[u8]>,
2440 {
2441 let key = key.as_ref();
2442 let value = value.as_ref();
2443
2444 unsafe {
2445 ffi_try!(ffi::rocksdb_merge_cf(
2446 self.inner.inner(),
2447 writeopts.inner,
2448 cf.inner(),
2449 key.as_ptr() as *const c_char,
2450 key.len() as size_t,
2451 value.as_ptr() as *const c_char,
2452 value.len() as size_t,
2453 ));
2454 Ok(())
2455 }
2456 }
2457
2458 pub fn delete_opt<K: AsRef<[u8]>>(
2459 &self,
2460 key: K,
2461 writeopts: &WriteOptions,
2462 ) -> Result<(), Error> {
2463 let key = key.as_ref();
2464
2465 unsafe {
2466 ffi_try!(ffi::rocksdb_delete(
2467 self.inner.inner(),
2468 writeopts.inner,
2469 key.as_ptr() as *const c_char,
2470 key.len() as size_t,
2471 ));
2472 Ok(())
2473 }
2474 }
2475
2476 pub fn delete_cf_opt<K: AsRef<[u8]>>(
2477 &self,
2478 cf: &impl AsColumnFamilyRef,
2479 key: K,
2480 writeopts: &WriteOptions,
2481 ) -> Result<(), Error> {
2482 let key = key.as_ref();
2483
2484 unsafe {
2485 ffi_try!(ffi::rocksdb_delete_cf(
2486 self.inner.inner(),
2487 writeopts.inner,
2488 cf.inner(),
2489 key.as_ptr() as *const c_char,
2490 key.len() as size_t,
2491 ));
2492 Ok(())
2493 }
2494 }
2495
2496 pub fn delete_with_ts_opt<K, S>(
2500 &self,
2501 key: K,
2502 ts: S,
2503 writeopts: &WriteOptions,
2504 ) -> Result<(), Error>
2505 where
2506 K: AsRef<[u8]>,
2507 S: AsRef<[u8]>,
2508 {
2509 let key = key.as_ref();
2510 let ts = ts.as_ref();
2511 unsafe {
2512 ffi_try!(ffi::rocksdb_delete_with_ts(
2513 self.inner.inner(),
2514 writeopts.inner,
2515 key.as_ptr() as *const c_char,
2516 key.len() as size_t,
2517 ts.as_ptr() as *const c_char,
2518 ts.len() as size_t,
2519 ));
2520 Ok(())
2521 }
2522 }
2523
2524 pub fn delete_cf_with_ts_opt<K, S>(
2528 &self,
2529 cf: &impl AsColumnFamilyRef,
2530 key: K,
2531 ts: S,
2532 writeopts: &WriteOptions,
2533 ) -> Result<(), Error>
2534 where
2535 K: AsRef<[u8]>,
2536 S: AsRef<[u8]>,
2537 {
2538 let key = key.as_ref();
2539 let ts = ts.as_ref();
2540 unsafe {
2541 ffi_try!(ffi::rocksdb_delete_cf_with_ts(
2542 self.inner.inner(),
2543 writeopts.inner,
2544 cf.inner(),
2545 key.as_ptr() as *const c_char,
2546 key.len() as size_t,
2547 ts.as_ptr() as *const c_char,
2548 ts.len() as size_t,
2549 ));
2550 Ok(())
2551 }
2552 }
2553
2554 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
2555 where
2556 K: AsRef<[u8]>,
2557 V: AsRef<[u8]>,
2558 {
2559 DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
2560 }
2561
2562 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2563 where
2564 K: AsRef<[u8]>,
2565 V: AsRef<[u8]>,
2566 {
2567 DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
2568 }
2569
2570 pub fn put_with_ts<K, V, S>(&self, key: K, ts: S, value: V) -> Result<(), Error>
2577 where
2578 K: AsRef<[u8]>,
2579 V: AsRef<[u8]>,
2580 S: AsRef<[u8]>,
2581 {
2582 DEFAULT_WRITE_OPTS
2583 .with(|opts| self.put_with_ts_opt(key.as_ref(), ts.as_ref(), value.as_ref(), opts))
2584 }
2585
2586 pub fn put_cf_with_ts<K, V, S>(
2593 &self,
2594 cf: &impl AsColumnFamilyRef,
2595 key: K,
2596 ts: S,
2597 value: V,
2598 ) -> Result<(), Error>
2599 where
2600 K: AsRef<[u8]>,
2601 V: AsRef<[u8]>,
2602 S: AsRef<[u8]>,
2603 {
2604 DEFAULT_WRITE_OPTS.with(|opts| {
2605 self.put_cf_with_ts_opt(cf, key.as_ref(), ts.as_ref(), value.as_ref(), opts)
2606 })
2607 }
2608
2609 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
2610 where
2611 K: AsRef<[u8]>,
2612 V: AsRef<[u8]>,
2613 {
2614 DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
2615 }
2616
2617 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
2618 where
2619 K: AsRef<[u8]>,
2620 V: AsRef<[u8]>,
2621 {
2622 DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
2623 }
2624
2625 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2626 DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
2627 }
2628
2629 pub fn delete_cf<K: AsRef<[u8]>>(
2630 &self,
2631 cf: &impl AsColumnFamilyRef,
2632 key: K,
2633 ) -> Result<(), Error> {
2634 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
2635 }
2636
2637 pub fn delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2641 &self,
2642 key: K,
2643 ts: S,
2644 ) -> Result<(), Error> {
2645 DEFAULT_WRITE_OPTS.with(|opts| self.delete_with_ts_opt(key, ts, opts))
2646 }
2647
2648 pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2652 &self,
2653 cf: &impl AsColumnFamilyRef,
2654 key: K,
2655 ts: S,
2656 ) -> Result<(), Error> {
2657 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_with_ts_opt(cf, key, ts, opts))
2658 }
2659
2660 pub fn single_delete_opt<K: AsRef<[u8]>>(
2680 &self,
2681 key: K,
2682 writeopts: &WriteOptions,
2683 ) -> Result<(), Error> {
2684 let key = key.as_ref();
2685
2686 unsafe {
2687 ffi_try!(ffi::rocksdb_singledelete(
2688 self.inner.inner(),
2689 writeopts.inner,
2690 key.as_ptr() as *const c_char,
2691 key.len() as size_t,
2692 ));
2693 Ok(())
2694 }
2695 }
2696
2697 pub fn single_delete_cf_opt<K: AsRef<[u8]>>(
2701 &self,
2702 cf: &impl AsColumnFamilyRef,
2703 key: K,
2704 writeopts: &WriteOptions,
2705 ) -> Result<(), Error> {
2706 let key = key.as_ref();
2707
2708 unsafe {
2709 ffi_try!(ffi::rocksdb_singledelete_cf(
2710 self.inner.inner(),
2711 writeopts.inner,
2712 cf.inner(),
2713 key.as_ptr() as *const c_char,
2714 key.len() as size_t,
2715 ));
2716 Ok(())
2717 }
2718 }
2719
2720 pub fn single_delete_with_ts_opt<K, S>(
2727 &self,
2728 key: K,
2729 ts: S,
2730 writeopts: &WriteOptions,
2731 ) -> Result<(), Error>
2732 where
2733 K: AsRef<[u8]>,
2734 S: AsRef<[u8]>,
2735 {
2736 let key = key.as_ref();
2737 let ts = ts.as_ref();
2738 unsafe {
2739 ffi_try!(ffi::rocksdb_singledelete_with_ts(
2740 self.inner.inner(),
2741 writeopts.inner,
2742 key.as_ptr() as *const c_char,
2743 key.len() as size_t,
2744 ts.as_ptr() as *const c_char,
2745 ts.len() as size_t,
2746 ));
2747 Ok(())
2748 }
2749 }
2750
2751 pub fn single_delete_cf_with_ts_opt<K, S>(
2758 &self,
2759 cf: &impl AsColumnFamilyRef,
2760 key: K,
2761 ts: S,
2762 writeopts: &WriteOptions,
2763 ) -> Result<(), Error>
2764 where
2765 K: AsRef<[u8]>,
2766 S: AsRef<[u8]>,
2767 {
2768 let key = key.as_ref();
2769 let ts = ts.as_ref();
2770 unsafe {
2771 ffi_try!(ffi::rocksdb_singledelete_cf_with_ts(
2772 self.inner.inner(),
2773 writeopts.inner,
2774 cf.inner(),
2775 key.as_ptr() as *const c_char,
2776 key.len() as size_t,
2777 ts.as_ptr() as *const c_char,
2778 ts.len() as size_t,
2779 ));
2780 Ok(())
2781 }
2782 }
2783
2784 pub fn single_delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2788 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_opt(key, opts))
2789 }
2790
2791 pub fn single_delete_cf<K: AsRef<[u8]>>(
2795 &self,
2796 cf: &impl AsColumnFamilyRef,
2797 key: K,
2798 ) -> Result<(), Error> {
2799 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_opt(cf, key, opts))
2800 }
2801
2802 pub fn single_delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2809 &self,
2810 key: K,
2811 ts: S,
2812 ) -> Result<(), Error> {
2813 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_with_ts_opt(key, ts, opts))
2814 }
2815
2816 pub fn single_delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2823 &self,
2824 cf: &impl AsColumnFamilyRef,
2825 key: K,
2826 ts: S,
2827 ) -> Result<(), Error> {
2828 DEFAULT_WRITE_OPTS.with(|opts| self.single_delete_cf_with_ts_opt(cf, key, ts, opts))
2829 }
2830
2831 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
2833 unsafe {
2834 let start = start.as_ref().map(AsRef::as_ref);
2835 let end = end.as_ref().map(AsRef::as_ref);
2836
2837 ffi::rocksdb_compact_range(
2838 self.inner.inner(),
2839 opt_bytes_to_ptr(start),
2840 start.map_or(0, <[u8]>::len) as size_t,
2841 opt_bytes_to_ptr(end),
2842 end.map_or(0, <[u8]>::len) as size_t,
2843 );
2844 }
2845 }
2846
2847 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2849 &self,
2850 start: Option<S>,
2851 end: Option<E>,
2852 opts: &CompactOptions,
2853 ) {
2854 unsafe {
2855 let start = start.as_ref().map(AsRef::as_ref);
2856 let end = end.as_ref().map(AsRef::as_ref);
2857
2858 ffi::rocksdb_compact_range_opt(
2859 self.inner.inner(),
2860 opts.inner,
2861 opt_bytes_to_ptr(start),
2862 start.map_or(0, <[u8]>::len) as size_t,
2863 opt_bytes_to_ptr(end),
2864 end.map_or(0, <[u8]>::len) as size_t,
2865 );
2866 }
2867 }
2868
2869 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2872 &self,
2873 cf: &impl AsColumnFamilyRef,
2874 start: Option<S>,
2875 end: Option<E>,
2876 ) {
2877 unsafe {
2878 let start = start.as_ref().map(AsRef::as_ref);
2879 let end = end.as_ref().map(AsRef::as_ref);
2880
2881 ffi::rocksdb_compact_range_cf(
2882 self.inner.inner(),
2883 cf.inner(),
2884 opt_bytes_to_ptr(start),
2885 start.map_or(0, <[u8]>::len) as size_t,
2886 opt_bytes_to_ptr(end),
2887 end.map_or(0, <[u8]>::len) as size_t,
2888 );
2889 }
2890 }
2891
2892 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2894 &self,
2895 cf: &impl AsColumnFamilyRef,
2896 start: Option<S>,
2897 end: Option<E>,
2898 opts: &CompactOptions,
2899 ) {
2900 unsafe {
2901 let start = start.as_ref().map(AsRef::as_ref);
2902 let end = end.as_ref().map(AsRef::as_ref);
2903
2904 ffi::rocksdb_compact_range_cf_opt(
2905 self.inner.inner(),
2906 cf.inner(),
2907 opts.inner,
2908 opt_bytes_to_ptr(start),
2909 start.map_or(0, <[u8]>::len) as size_t,
2910 opt_bytes_to_ptr(end),
2911 end.map_or(0, <[u8]>::len) as size_t,
2912 );
2913 }
2914 }
2915
2916 pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
2925 unsafe {
2926 ffi_try!(ffi::rocksdb_wait_for_compact(
2927 self.inner.inner(),
2928 opts.inner
2929 ));
2930 }
2931 Ok(())
2932 }
2933
2934 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
2935 let copts = convert_options(opts)?;
2936 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2937 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2938 let count = opts.len() as i32;
2939 unsafe {
2940 ffi_try!(ffi::rocksdb_set_options(
2941 self.inner.inner(),
2942 count,
2943 cnames.as_ptr(),
2944 cvalues.as_ptr(),
2945 ));
2946 }
2947 Ok(())
2948 }
2949
2950 pub fn set_options_cf(
2951 &self,
2952 cf: &impl AsColumnFamilyRef,
2953 opts: &[(&str, &str)],
2954 ) -> Result<(), Error> {
2955 let copts = convert_options(opts)?;
2956 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2957 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2958 let count = opts.len() as i32;
2959 unsafe {
2960 ffi_try!(ffi::rocksdb_set_options_cf(
2961 self.inner.inner(),
2962 cf.inner(),
2963 count,
2964 cnames.as_ptr(),
2965 cvalues.as_ptr(),
2966 ));
2967 }
2968 Ok(())
2969 }
2970
2971 fn property_value_impl<R>(
2980 name: impl CStrLike,
2981 get_property: impl FnOnce(*const c_char) -> *mut c_char,
2982 parse: impl FnOnce(&str) -> Result<R, Error>,
2983 ) -> Result<Option<R>, Error> {
2984 let value = match name.bake() {
2985 Ok(prop_name) => get_property(prop_name.as_ptr()),
2986 Err(e) => {
2987 return Err(Error::new(format!(
2988 "Failed to convert property name to CString: {e}"
2989 )));
2990 }
2991 };
2992 if value.is_null() {
2993 return Ok(None);
2994 }
2995 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
2996 Ok(s) => parse(s).map(|value| Some(value)),
2997 Err(e) => Err(Error::new(format!(
2998 "Failed to convert property value to string: {e}"
2999 ))),
3000 };
3001 unsafe {
3002 ffi::rocksdb_free(value as *mut c_void);
3003 }
3004 result
3005 }
3006
3007 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
3012 Self::property_value_impl(
3013 name,
3014 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
3015 |str_value| Ok(str_value.to_owned()),
3016 )
3017 }
3018
3019 pub fn property_value_cf(
3024 &self,
3025 cf: &impl AsColumnFamilyRef,
3026 name: impl CStrLike,
3027 ) -> Result<Option<String>, Error> {
3028 Self::property_value_impl(
3029 name,
3030 |prop_name| unsafe {
3031 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3032 },
3033 |str_value| Ok(str_value.to_owned()),
3034 )
3035 }
3036
3037 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
3038 value.parse::<u64>().map_err(|err| {
3039 Error::new(format!(
3040 "Failed to convert property value {value} to int: {err}"
3041 ))
3042 })
3043 }
3044
3045 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
3050 Self::property_value_impl(
3051 name,
3052 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
3053 Self::parse_property_int_value,
3054 )
3055 }
3056
3057 pub fn property_int_value_cf(
3062 &self,
3063 cf: &impl AsColumnFamilyRef,
3064 name: impl CStrLike,
3065 ) -> Result<Option<u64>, Error> {
3066 Self::property_value_impl(
3067 name,
3068 |prop_name| unsafe {
3069 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
3070 },
3071 Self::parse_property_int_value,
3072 )
3073 }
3074
3075 pub fn latest_sequence_number(&self) -> u64 {
3077 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
3078 }
3079
3080 pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
3088 self.get_approximate_sizes_cfopt(None::<&ColumnFamily>, ranges)
3089 }
3090
3091 pub fn get_approximate_sizes_cf(
3092 &self,
3093 cf: &impl AsColumnFamilyRef,
3094 ranges: &[Range],
3095 ) -> Vec<u64> {
3096 self.get_approximate_sizes_cfopt(Some(cf), ranges)
3097 }
3098
3099 fn get_approximate_sizes_cfopt(
3100 &self,
3101 cf: Option<&impl AsColumnFamilyRef>,
3102 ranges: &[Range],
3103 ) -> Vec<u64> {
3104 let start_keys: Vec<*const c_char> = ranges
3105 .iter()
3106 .map(|x| x.start_key.as_ptr() as *const c_char)
3107 .collect();
3108 let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
3109 let end_keys: Vec<*const c_char> = ranges
3110 .iter()
3111 .map(|x| x.end_key.as_ptr() as *const c_char)
3112 .collect();
3113 let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
3114 let mut sizes: Vec<u64> = vec![0; ranges.len()];
3115 let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
3116 ranges.len() as i32,
3117 start_keys.as_ptr(),
3118 start_key_lens.as_ptr(),
3119 end_keys.as_ptr(),
3120 end_key_lens.as_ptr(),
3121 sizes.as_mut_ptr(),
3122 );
3123 let mut err: *mut c_char = ptr::null_mut();
3124 match cf {
3125 None => unsafe {
3126 ffi::rocksdb_approximate_sizes(
3127 self.inner.inner(),
3128 n,
3129 start_key_ptr,
3130 start_key_len_ptr,
3131 end_key_ptr,
3132 end_key_len_ptr,
3133 size_ptr,
3134 &raw mut err,
3135 );
3136 },
3137 Some(cf) => unsafe {
3138 ffi::rocksdb_approximate_sizes_cf(
3139 self.inner.inner(),
3140 cf.inner(),
3141 n,
3142 start_key_ptr,
3143 start_key_len_ptr,
3144 end_key_ptr,
3145 end_key_len_ptr,
3146 size_ptr,
3147 &raw mut err,
3148 );
3149 },
3150 }
3151 sizes
3152 }
3153
3154 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
3165 unsafe {
3166 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
3170 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
3171 self.inner.inner(),
3172 seq_number,
3173 opts
3174 ));
3175 Ok(DBWALIterator {
3176 inner: iter,
3177 start_seq_number: seq_number,
3178 })
3179 }
3180 }
3181
3182 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
3185 unsafe {
3186 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
3187 }
3188 Ok(())
3189 }
3190
3191 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
3193 let opts = IngestExternalFileOptions::default();
3194 self.ingest_external_file_opts(&opts, paths)
3195 }
3196
3197 pub fn ingest_external_file_opts<P: AsRef<Path>>(
3199 &self,
3200 opts: &IngestExternalFileOptions,
3201 paths: Vec<P>,
3202 ) -> Result<(), Error> {
3203 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3204 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3205
3206 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
3207 }
3208
3209 pub fn ingest_external_file_cf<P: AsRef<Path>>(
3212 &self,
3213 cf: &impl AsColumnFamilyRef,
3214 paths: Vec<P>,
3215 ) -> Result<(), Error> {
3216 let opts = IngestExternalFileOptions::default();
3217 self.ingest_external_file_cf_opts(cf, &opts, paths)
3218 }
3219
3220 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
3222 &self,
3223 cf: &impl AsColumnFamilyRef,
3224 opts: &IngestExternalFileOptions,
3225 paths: Vec<P>,
3226 ) -> Result<(), Error> {
3227 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
3228 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
3229
3230 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
3231 }
3232
3233 fn ingest_external_file_raw(
3234 &self,
3235 opts: &IngestExternalFileOptions,
3236 paths_v: &[CString],
3237 cpaths: &[*const c_char],
3238 ) -> Result<(), Error> {
3239 unsafe {
3240 ffi_try!(ffi::rocksdb_ingest_external_file(
3241 self.inner.inner(),
3242 cpaths.as_ptr(),
3243 paths_v.len(),
3244 opts.inner.cast_const()
3245 ));
3246 Ok(())
3247 }
3248 }
3249
3250 fn ingest_external_file_raw_cf(
3251 &self,
3252 cf: &impl AsColumnFamilyRef,
3253 opts: &IngestExternalFileOptions,
3254 paths_v: &[CString],
3255 cpaths: &[*const c_char],
3256 ) -> Result<(), Error> {
3257 unsafe {
3258 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
3259 self.inner.inner(),
3260 cf.inner(),
3261 cpaths.as_ptr(),
3262 paths_v.len(),
3263 opts.inner.cast_const()
3264 ));
3265 Ok(())
3266 }
3267 }
3268
3269 pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
3271 unsafe {
3272 let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
3273
3274 let metadata = ColumnFamilyMetaData {
3275 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3276 name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3277 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3278 };
3279
3280 ffi::rocksdb_column_family_metadata_destroy(ptr);
3282
3283 metadata
3285 }
3286 }
3287
3288 pub fn get_column_family_metadata_cf(
3290 &self,
3291 cf: &impl AsColumnFamilyRef,
3292 ) -> ColumnFamilyMetaData {
3293 unsafe {
3294 let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
3295
3296 let metadata = ColumnFamilyMetaData {
3297 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
3298 name: from_cstr_and_free(ffi::rocksdb_column_family_metadata_get_name(ptr)),
3299 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
3300 };
3301
3302 ffi::rocksdb_column_family_metadata_destroy(ptr);
3304
3305 metadata
3307 }
3308 }
3309
3310 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
3313 unsafe {
3314 let livefiles_ptr = ffi::rocksdb_livefiles(self.inner.inner());
3315 if livefiles_ptr.is_null() {
3316 Err(Error::new("Could not get live files".to_owned()))
3317 } else {
3318 let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3319
3320 ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3322
3323 Ok(files)
3325 }
3326 }
3327 }
3328
3329 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
3338 let from = from.as_ref();
3339 let to = to.as_ref();
3340 unsafe {
3341 ffi_try!(ffi::rocksdb_delete_file_in_range(
3342 self.inner.inner(),
3343 from.as_ptr() as *const c_char,
3344 from.len() as size_t,
3345 to.as_ptr() as *const c_char,
3346 to.len() as size_t,
3347 ));
3348 Ok(())
3349 }
3350 }
3351
3352 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
3354 &self,
3355 cf: &impl AsColumnFamilyRef,
3356 from: K,
3357 to: K,
3358 ) -> Result<(), Error> {
3359 let from = from.as_ref();
3360 let to = to.as_ref();
3361 unsafe {
3362 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
3363 self.inner.inner(),
3364 cf.inner(),
3365 from.as_ptr() as *const c_char,
3366 from.len() as size_t,
3367 to.as_ptr() as *const c_char,
3368 to.len() as size_t,
3369 ));
3370 Ok(())
3371 }
3372 }
3373
3374 pub fn cancel_all_background_work(&self, wait: bool) {
3376 unsafe {
3377 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
3378 }
3379 }
3380
3381 fn drop_column_family<C>(
3382 &self,
3383 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
3384 cf: C,
3385 ) -> Result<(), Error> {
3386 unsafe {
3387 ffi_try!(ffi::rocksdb_drop_column_family(
3389 self.inner.inner(),
3390 cf_inner
3391 ));
3392 }
3393 drop(cf);
3396 Ok(())
3397 }
3398
3399 pub fn increase_full_history_ts_low<S: AsRef<[u8]>>(
3404 &self,
3405 cf: &impl AsColumnFamilyRef,
3406 ts: S,
3407 ) -> Result<(), Error> {
3408 let ts = ts.as_ref();
3409 unsafe {
3410 ffi_try!(ffi::rocksdb_increase_full_history_ts_low(
3411 self.inner.inner(),
3412 cf.inner(),
3413 ts.as_ptr() as *const c_char,
3414 ts.len() as size_t,
3415 ));
3416 Ok(())
3417 }
3418 }
3419
3420 pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result<Vec<u8>, Error> {
3422 unsafe {
3423 let mut ts_lowlen = 0;
3424 let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low(
3425 self.inner.inner(),
3426 cf.inner(),
3427 &raw mut ts_lowlen,
3428 ));
3429
3430 if ts.is_null() {
3431 Err(Error::new("Could not get full_history_ts_low".to_owned()))
3432 } else {
3433 let mut vec = vec![0; ts_lowlen];
3434 ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen);
3435 ffi::rocksdb_free(ts as *mut c_void);
3436 Ok(vec)
3437 }
3438 }
3439 }
3440
3441 pub fn get_db_identity(&self) -> Result<Vec<u8>, Error> {
3443 unsafe {
3444 let mut length: usize = 0;
3445 let identity_ptr = ffi::rocksdb_get_db_identity(self.inner.inner(), &raw mut length);
3446 let identity_vec = raw_data(identity_ptr, length);
3447 ffi::rocksdb_free(identity_ptr as *mut c_void);
3448 identity_vec.ok_or_else(|| Error::new("get_db_identity returned NULL".to_string()))
3451 }
3452 }
3453}
3454
3455impl<I: DBInner> DBCommon<SingleThreaded, I> {
3456 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
3458 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3459 self.cfs
3460 .cfs
3461 .insert(name.as_ref().to_string(), ColumnFamily { inner });
3462 Ok(())
3463 }
3464
3465 #[doc = include_str!("db_create_column_family_with_import.md")]
3466 pub fn create_column_family_with_import<N: AsRef<str>>(
3467 &mut self,
3468 options: &Options,
3469 column_family_name: N,
3470 import_options: &ImportColumnFamilyOptions,
3471 metadata: &ExportImportFilesMetaData,
3472 ) -> Result<(), Error> {
3473 let name = column_family_name.as_ref();
3474 let c_name = CString::new(name).map_err(|err| {
3475 Error::new(format!(
3476 "Failed to convert name to CString while importing column family: {err}"
3477 ))
3478 })?;
3479 let inner = unsafe {
3480 ffi_try!(ffi::rocksdb_create_column_family_with_import(
3481 self.inner.inner(),
3482 options.inner,
3483 c_name.as_ptr(),
3484 import_options.inner,
3485 metadata.inner
3486 ))
3487 };
3488 self.cfs
3489 .cfs
3490 .insert(column_family_name.as_ref().into(), ColumnFamily { inner });
3491 Ok(())
3492 }
3493
3494 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
3496 match self.cfs.cfs.remove(name) {
3497 Some(cf) => self.drop_column_family(cf.inner, cf),
3498 _ => Err(Error::new(format!("Invalid column family: {name}"))),
3499 }
3500 }
3501
3502 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
3504 self.cfs.cfs.get(name)
3505 }
3506
3507 pub fn cf_names(&self) -> Vec<String> {
3511 self.cfs.cfs.keys().cloned().collect()
3512 }
3513}
3514
3515impl<I: DBInner> DBCommon<MultiThreaded, I> {
3516 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
3518 let mut cfs = self.cfs.cfs.write();
3521 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
3522 cfs.insert(
3523 name.as_ref().to_string(),
3524 Arc::new(UnboundColumnFamily { inner }),
3525 );
3526 Ok(())
3527 }
3528
3529 #[doc = include_str!("db_create_column_family_with_import.md")]
3530 pub fn create_column_family_with_import<N: AsRef<str>>(
3531 &self,
3532 options: &Options,
3533 column_family_name: N,
3534 import_options: &ImportColumnFamilyOptions,
3535 metadata: &ExportImportFilesMetaData,
3536 ) -> Result<(), Error> {
3537 let mut cfs = self.cfs.cfs.write();
3539 let name = column_family_name.as_ref();
3540 let c_name = CString::new(name).map_err(|err| {
3541 Error::new(format!(
3542 "Failed to convert name to CString while importing column family: {err}"
3543 ))
3544 })?;
3545 let inner = unsafe {
3546 ffi_try!(ffi::rocksdb_create_column_family_with_import(
3547 self.inner.inner(),
3548 options.inner,
3549 c_name.as_ptr(),
3550 import_options.inner,
3551 metadata.inner
3552 ))
3553 };
3554 cfs.insert(
3555 column_family_name.as_ref().to_string(),
3556 Arc::new(UnboundColumnFamily { inner }),
3557 );
3558 Ok(())
3559 }
3560
3561 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
3564 match self.cfs.cfs.write().remove(name) {
3565 Some(cf) => self.drop_column_family(cf.inner, cf),
3566 _ => Err(Error::new(format!("Invalid column family: {name}"))),
3567 }
3568 }
3569
3570 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
3572 self.cfs
3573 .cfs
3574 .read()
3575 .get(name)
3576 .cloned()
3577 .map(UnboundColumnFamily::bound_column_family)
3578 }
3579
3580 pub fn cf_names(&self) -> Vec<String> {
3584 self.cfs.cfs.read().keys().cloned().collect()
3585 }
3586}
3587
3588impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
3589 fn drop(&mut self) {
3590 self.cfs.drop_all_cfs_internal();
3591 }
3592}
3593
3594impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
3595 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3596 write!(f, "RocksDB {{ path: {} }}", self.path().display())
3597 }
3598}
3599
3600#[derive(Debug, Clone)]
3602pub struct ColumnFamilyMetaData {
3603 pub size: u64,
3606 pub name: String,
3608 pub file_count: usize,
3610}
3611
3612#[derive(Debug, Clone)]
3614pub struct LiveFile {
3615 pub column_family_name: String,
3617 pub name: String,
3619 pub directory: String,
3622 pub size: usize,
3624 pub level: i32,
3626 pub start_key: Option<Vec<u8>>,
3628 pub end_key: Option<Vec<u8>>,
3630 pub smallest_seqno: u64,
3631 pub largest_seqno: u64,
3632 pub num_entries: u64,
3634 pub num_deletions: u64,
3636}
3637
3638impl LiveFile {
3639 pub(crate) fn from_rocksdb_livefiles_ptr(
3641 files: *const ffi::rocksdb_livefiles_t,
3642 ) -> Vec<LiveFile> {
3643 unsafe {
3644 let n = ffi::rocksdb_livefiles_count(files);
3645
3646 let mut livefiles = Vec::with_capacity(n as usize);
3647 let mut key_size: usize = 0;
3648
3649 for i in 0..n {
3650 let column_family_name =
3652 from_cstr_without_free(ffi::rocksdb_livefiles_column_family_name(files, i));
3653 let name = from_cstr_without_free(ffi::rocksdb_livefiles_name(files, i));
3654 let directory = from_cstr_without_free(ffi::rocksdb_livefiles_directory(files, i));
3655 let size = ffi::rocksdb_livefiles_size(files, i);
3656 let level = ffi::rocksdb_livefiles_level(files, i);
3657
3658 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &raw mut key_size);
3660 let smallest_key = raw_data(smallest_key, key_size);
3661
3662 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &raw mut key_size);
3664 let largest_key = raw_data(largest_key, key_size);
3665
3666 livefiles.push(LiveFile {
3667 column_family_name,
3668 name,
3669 directory,
3670 size,
3671 level,
3672 start_key: smallest_key,
3673 end_key: largest_key,
3674 largest_seqno: ffi::rocksdb_livefiles_largest_seqno(files, i),
3675 smallest_seqno: ffi::rocksdb_livefiles_smallest_seqno(files, i),
3676 num_entries: ffi::rocksdb_livefiles_entries(files, i),
3677 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
3678 });
3679 }
3680
3681 livefiles
3682 }
3683 }
3684}
3685
3686struct LiveFileGuard(*mut rocksdb_livefile_t);
3687
3688impl LiveFileGuard {
3689 fn into_raw(mut self) -> *mut rocksdb_livefile_t {
3690 let ptr = self.0;
3691 self.0 = ptr::null_mut();
3692 ptr
3693 }
3694}
3695
3696impl Drop for LiveFileGuard {
3697 fn drop(&mut self) {
3698 if !self.0.is_null() {
3699 unsafe {
3700 rocksdb_livefile_destroy(self.0);
3701 }
3702 }
3703 }
3704}
3705
3706struct LiveFilesGuard(*mut rocksdb_livefiles_t);
3707
3708impl LiveFilesGuard {
3709 fn into_raw(mut self) -> *mut rocksdb_livefiles_t {
3710 let ptr = self.0;
3711 self.0 = ptr::null_mut();
3712 ptr
3713 }
3714}
3715
3716impl Drop for LiveFilesGuard {
3717 fn drop(&mut self) {
3718 if !self.0.is_null() {
3719 unsafe {
3720 rocksdb_livefiles_destroy(self.0);
3721 }
3722 }
3723 }
3724}
3725
3726#[derive(Debug)]
3731pub struct ExportImportFilesMetaData {
3732 pub(crate) inner: *mut ffi::rocksdb_export_import_files_metadata_t,
3733}
3734
3735impl ExportImportFilesMetaData {
3736 pub fn get_db_comparator_name(&self) -> String {
3737 unsafe {
3738 let c_name =
3739 ffi::rocksdb_export_import_files_metadata_get_db_comparator_name(self.inner);
3740 from_cstr_and_free(c_name)
3741 }
3742 }
3743
3744 pub fn set_db_comparator_name(&mut self, name: &str) {
3745 let c_name = CString::new(name.as_bytes()).unwrap();
3746 unsafe {
3747 ffi::rocksdb_export_import_files_metadata_set_db_comparator_name(
3748 self.inner,
3749 c_name.as_ptr(),
3750 );
3751 };
3752 }
3753
3754 pub fn get_files(&self) -> Vec<LiveFile> {
3755 unsafe {
3756 let livefiles_ptr = ffi::rocksdb_export_import_files_metadata_get_files(self.inner);
3757 let files = LiveFile::from_rocksdb_livefiles_ptr(livefiles_ptr);
3758 ffi::rocksdb_livefiles_destroy(livefiles_ptr);
3759 files
3760 }
3761 }
3762
3763 pub fn set_files(&mut self, files: &[LiveFile]) -> Result<(), Error> {
3764 static EMPTY: [u8; 0] = [];
3766 let empty_ptr = EMPTY.as_ptr() as *const libc::c_char;
3767
3768 unsafe {
3769 let live_files = LiveFilesGuard(ffi::rocksdb_livefiles_create());
3770
3771 for file in files {
3772 let live_file = LiveFileGuard(ffi::rocksdb_livefile_create());
3773 ffi::rocksdb_livefile_set_level(live_file.0, file.level);
3774
3775 let c_cf_name = CString::new(file.column_family_name.as_str()).map_err(|err| {
3777 Error::new(format!("Unable to convert column family to CString: {err}"))
3778 })?;
3779 ffi::rocksdb_livefile_set_column_family_name(live_file.0, c_cf_name.as_ptr());
3780
3781 let c_name = CString::new(file.name.as_str()).map_err(|err| {
3782 Error::new(format!("Unable to convert file name to CString: {err}"))
3783 })?;
3784 ffi::rocksdb_livefile_set_name(live_file.0, c_name.as_ptr());
3785
3786 let c_directory = CString::new(file.directory.as_str()).map_err(|err| {
3787 Error::new(format!("Unable to convert directory to CString: {err}"))
3788 })?;
3789 ffi::rocksdb_livefile_set_directory(live_file.0, c_directory.as_ptr());
3790
3791 ffi::rocksdb_livefile_set_size(live_file.0, file.size);
3792
3793 let (start_key_ptr, start_key_len) = match &file.start_key {
3794 None => (empty_ptr, 0),
3795 Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3796 };
3797 ffi::rocksdb_livefile_set_smallest_key(live_file.0, start_key_ptr, start_key_len);
3798
3799 let (largest_key_ptr, largest_key_len) = match &file.end_key {
3800 None => (empty_ptr, 0),
3801 Some(key) => (key.as_ptr() as *const libc::c_char, key.len()),
3802 };
3803 ffi::rocksdb_livefile_set_largest_key(
3804 live_file.0,
3805 largest_key_ptr,
3806 largest_key_len,
3807 );
3808 ffi::rocksdb_livefile_set_smallest_seqno(live_file.0, file.smallest_seqno);
3809 ffi::rocksdb_livefile_set_largest_seqno(live_file.0, file.largest_seqno);
3810 ffi::rocksdb_livefile_set_num_entries(live_file.0, file.num_entries);
3811 ffi::rocksdb_livefile_set_num_deletions(live_file.0, file.num_deletions);
3812
3813 ffi::rocksdb_livefiles_add(live_files.0, live_file.into_raw());
3815 }
3816
3817 ffi::rocksdb_export_import_files_metadata_set_files(self.inner, live_files.into_raw());
3819 Ok(())
3820 }
3821 }
3822}
3823
3824impl Default for ExportImportFilesMetaData {
3825 fn default() -> Self {
3826 let inner = unsafe { ffi::rocksdb_export_import_files_metadata_create() };
3827 assert!(
3828 !inner.is_null(),
3829 "Could not create rocksdb_export_import_files_metadata_t"
3830 );
3831
3832 Self { inner }
3833 }
3834}
3835
3836impl Drop for ExportImportFilesMetaData {
3837 fn drop(&mut self) {
3838 unsafe {
3839 ffi::rocksdb_export_import_files_metadata_destroy(self.inner);
3840 }
3841 }
3842}
3843
3844unsafe impl Send for ExportImportFilesMetaData {}
3845unsafe impl Sync for ExportImportFilesMetaData {}
3846
3847fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
3848 opts.iter()
3849 .map(|(name, value)| {
3850 let cname = match CString::new(name.as_bytes()) {
3851 Ok(cname) => cname,
3852 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
3853 };
3854 let cvalue = match CString::new(value.as_bytes()) {
3855 Ok(cvalue) => cvalue,
3856 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
3857 };
3858 Ok((cname, cvalue))
3859 })
3860 .collect()
3861}
3862
3863pub(crate) fn convert_values(
3864 values: Vec<*mut c_char>,
3865 values_sizes: Vec<usize>,
3866 errors: Vec<*mut c_char>,
3867) -> Vec<Result<Option<Vec<u8>>, Error>> {
3868 values
3869 .into_iter()
3870 .zip(values_sizes)
3871 .zip(errors)
3872 .map(|((v, s), e)| {
3873 if e.is_null() {
3874 let value = unsafe { crate::ffi_util::raw_data(v, s) };
3875 unsafe {
3876 ffi::rocksdb_free(v as *mut c_void);
3877 }
3878 Ok(value)
3879 } else {
3880 Err(convert_rocksdb_error(e))
3881 }
3882 })
3883 .collect()
3884}