1use crate::{
17 column_family::AsColumnFamilyRef,
18 column_family::BoundColumnFamily,
19 column_family::UnboundColumnFamily,
20 db_options::OptionsMustOutliveDB,
21 ffi,
22 ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25 IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26 WaitForCompactOptions, WriteBatch, WriteBatchWithIndex, WriteOptions,
27 DEFAULT_COLUMN_FAMILY_NAME,
28};
29
30use crate::column_family::ColumnFamilyTtl;
31use crate::ffi_util::CSlice;
32use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
33use parking_lot::RwLock;
34use std::collections::BTreeMap;
35use std::ffi::{CStr, CString};
36use std::fmt;
37use std::fs;
38use std::iter;
39use std::path::Path;
40use std::path::PathBuf;
41use std::ptr;
42use std::slice;
43use std::str;
44use std::sync::Arc;
45use std::time::Duration;
46
/// A pair of borrowed byte slices marking the two ends of a key range.
pub struct Range<'a> {
    /// First key of the range.
    start_key: &'a [u8],
    /// Second key of the range.
    end_key: &'a [u8],
}

impl<'a> Range<'a> {
    /// Builds a range that borrows `start_key` and `end_key` for `'a`.
    pub fn new(start_key: &'a [u8], end_key: &'a [u8]) -> Range<'a> {
        Self { start_key, end_key }
    }
}
60
61pub trait ThreadMode {
72 fn new_cf_map_internal(
74 cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
75 ) -> Self;
76 fn drop_all_cfs_internal(&mut self);
78}
79
80pub struct SingleThreaded {
87 pub(crate) cfs: BTreeMap<String, ColumnFamily>,
88}
89
90pub struct MultiThreaded {
96 pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
97}
98
99impl ThreadMode for SingleThreaded {
100 fn new_cf_map_internal(
101 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
102 ) -> Self {
103 Self {
104 cfs: cfs
105 .into_iter()
106 .map(|(n, c)| (n, ColumnFamily { inner: c }))
107 .collect(),
108 }
109 }
110
111 fn drop_all_cfs_internal(&mut self) {
112 self.cfs.clear();
114 }
115}
116
117impl ThreadMode for MultiThreaded {
118 fn new_cf_map_internal(
119 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
120 ) -> Self {
121 Self {
122 cfs: RwLock::new(
123 cfs.into_iter()
124 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
125 .collect(),
126 ),
127 }
128 }
129
130 fn drop_all_cfs_internal(&mut self) {
131 self.cfs.write().clear();
133 }
134}
135
136pub trait DBInner {
138 fn inner(&self) -> *mut ffi::rocksdb_t;
139}
140
141pub struct DBCommon<T: ThreadMode, D: DBInner> {
146 pub(crate) inner: D,
147 cfs: T, path: PathBuf,
149 _outlive: Vec<OptionsMustOutliveDB>,
150}
151
152pub trait DBAccess {
155 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
156
157 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
158
159 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
160
161 unsafe fn create_iterator_cf(
162 &self,
163 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
164 readopts: &ReadOptions,
165 ) -> *mut ffi::rocksdb_iterator_t;
166
167 fn get_opt<K: AsRef<[u8]>>(
168 &self,
169 key: K,
170 readopts: &ReadOptions,
171 ) -> Result<Option<Vec<u8>>, Error>;
172
173 fn get_cf_opt<K: AsRef<[u8]>>(
174 &self,
175 cf: &impl AsColumnFamilyRef,
176 key: K,
177 readopts: &ReadOptions,
178 ) -> Result<Option<Vec<u8>>, Error>;
179
180 fn get_pinned_opt<K: AsRef<[u8]>>(
181 &'_ self,
182 key: K,
183 readopts: &ReadOptions,
184 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
185
186 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
187 &'_ self,
188 cf: &impl AsColumnFamilyRef,
189 key: K,
190 readopts: &ReadOptions,
191 ) -> Result<Option<DBPinnableSlice<'_>>, Error>;
192
193 fn multi_get_opt<K, I>(
194 &self,
195 keys: I,
196 readopts: &ReadOptions,
197 ) -> Vec<Result<Option<Vec<u8>>, Error>>
198 where
199 K: AsRef<[u8]>,
200 I: IntoIterator<Item = K>;
201
202 fn multi_get_cf_opt<'b, K, I, W>(
203 &self,
204 keys_cf: I,
205 readopts: &ReadOptions,
206 ) -> Vec<Result<Option<Vec<u8>>, Error>>
207 where
208 K: AsRef<[u8]>,
209 I: IntoIterator<Item = (&'b W, K)>,
210 W: AsColumnFamilyRef + 'b;
211}
212
213impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
214 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
215 unsafe { ffi::rocksdb_create_snapshot(self.inner.inner()) }
216 }
217
218 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
219 unsafe {
220 ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
221 }
222 }
223
224 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
225 unsafe { ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner) }
226 }
227
228 unsafe fn create_iterator_cf(
229 &self,
230 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
231 readopts: &ReadOptions,
232 ) -> *mut ffi::rocksdb_iterator_t {
233 unsafe { ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle) }
234 }
235
236 fn get_opt<K: AsRef<[u8]>>(
237 &self,
238 key: K,
239 readopts: &ReadOptions,
240 ) -> Result<Option<Vec<u8>>, Error> {
241 self.get_opt(key, readopts)
242 }
243
244 fn get_cf_opt<K: AsRef<[u8]>>(
245 &self,
246 cf: &impl AsColumnFamilyRef,
247 key: K,
248 readopts: &ReadOptions,
249 ) -> Result<Option<Vec<u8>>, Error> {
250 self.get_cf_opt(cf, key, readopts)
251 }
252
253 fn get_pinned_opt<K: AsRef<[u8]>>(
254 &'_ self,
255 key: K,
256 readopts: &ReadOptions,
257 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
258 self.get_pinned_opt(key, readopts)
259 }
260
261 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
262 &'_ self,
263 cf: &impl AsColumnFamilyRef,
264 key: K,
265 readopts: &ReadOptions,
266 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
267 self.get_pinned_cf_opt(cf, key, readopts)
268 }
269
270 fn multi_get_opt<K, Iter>(
271 &self,
272 keys: Iter,
273 readopts: &ReadOptions,
274 ) -> Vec<Result<Option<Vec<u8>>, Error>>
275 where
276 K: AsRef<[u8]>,
277 Iter: IntoIterator<Item = K>,
278 {
279 self.multi_get_opt(keys, readopts)
280 }
281
282 fn multi_get_cf_opt<'b, K, Iter, W>(
283 &self,
284 keys_cf: Iter,
285 readopts: &ReadOptions,
286 ) -> Vec<Result<Option<Vec<u8>>, Error>>
287 where
288 K: AsRef<[u8]>,
289 Iter: IntoIterator<Item = (&'b W, K)>,
290 W: AsColumnFamilyRef + 'b,
291 {
292 self.multi_get_cf_opt(keys_cf, readopts)
293 }
294}
295
296pub struct DBWithThreadModeInner {
297 inner: *mut ffi::rocksdb_t,
298}
299
300impl DBInner for DBWithThreadModeInner {
301 fn inner(&self) -> *mut ffi::rocksdb_t {
302 self.inner
303 }
304}
305
306impl Drop for DBWithThreadModeInner {
307 fn drop(&mut self) {
308 unsafe {
309 ffi::rocksdb_close(self.inner);
310 }
311 }
312}
313
314pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
319
320#[cfg(not(feature = "multi-threaded-cf"))]
343pub type DB = DBWithThreadMode<SingleThreaded>;
344
345#[cfg(feature = "multi-threaded-cf")]
346pub type DB = DBWithThreadMode<MultiThreaded>;
347
348unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
352
353unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
356
/// Selects which RocksDB open function is used when opening a database.
enum AccessType<'a> {
    /// Open through the regular read-write open functions.
    ReadWrite,
    /// Open through the read-only open functions; the flag is forwarded to
    /// the C API as-is.
    ReadOnly { error_if_log_file_exist: bool },
    /// Open as a secondary instance that uses `secondary_path`.
    Secondary { secondary_path: &'a Path },
    /// Open through the TTL open functions with `ttl` as the database TTL.
    WithTTL { ttl: Duration },
}
364
365impl<T: ThreadMode> DBWithThreadMode<T> {
367 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
369 let mut opts = Options::default();
370 opts.create_if_missing(true);
371 Self::open(&opts, path)
372 }
373
374 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
376 Self::open_cf(opts, path, None::<&str>)
377 }
378
379 pub fn open_for_read_only<P: AsRef<Path>>(
381 opts: &Options,
382 path: P,
383 error_if_log_file_exist: bool,
384 ) -> Result<Self, Error> {
385 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
386 }
387
388 pub fn open_as_secondary<P: AsRef<Path>>(
390 opts: &Options,
391 primary_path: P,
392 secondary_path: P,
393 ) -> Result<Self, Error> {
394 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
395 }
396
397 pub fn open_with_ttl<P: AsRef<Path>>(
402 opts: &Options,
403 path: P,
404 ttl: Duration,
405 ) -> Result<Self, Error> {
406 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
407 }
408
409 pub fn open_cf_with_ttl<P, I, N>(
413 opts: &Options,
414 path: P,
415 cfs: I,
416 ttl: Duration,
417 ) -> Result<Self, Error>
418 where
419 P: AsRef<Path>,
420 I: IntoIterator<Item = N>,
421 N: AsRef<str>,
422 {
423 let cfs = cfs
424 .into_iter()
425 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
426
427 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
428 }
429
430 pub fn open_cf_descriptors_with_ttl<P, I>(
444 opts: &Options,
445 path: P,
446 cfs: I,
447 ttl: Duration,
448 ) -> Result<Self, Error>
449 where
450 P: AsRef<Path>,
451 I: IntoIterator<Item = ColumnFamilyDescriptor>,
452 {
453 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
454 }
455
456 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
460 where
461 P: AsRef<Path>,
462 I: IntoIterator<Item = N>,
463 N: AsRef<str>,
464 {
465 let cfs = cfs
466 .into_iter()
467 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
468
469 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
470 }
471
472 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
476 where
477 P: AsRef<Path>,
478 I: IntoIterator<Item = (N, Options)>,
479 N: AsRef<str>,
480 {
481 let cfs = cfs
482 .into_iter()
483 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
484
485 Self::open_cf_descriptors(opts, path, cfs)
486 }
487
488 pub fn open_cf_for_read_only<P, I, N>(
492 opts: &Options,
493 path: P,
494 cfs: I,
495 error_if_log_file_exist: bool,
496 ) -> Result<Self, Error>
497 where
498 P: AsRef<Path>,
499 I: IntoIterator<Item = N>,
500 N: AsRef<str>,
501 {
502 let cfs = cfs
503 .into_iter()
504 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
505
506 Self::open_cf_descriptors_internal(
507 opts,
508 path,
509 cfs,
510 &AccessType::ReadOnly {
511 error_if_log_file_exist,
512 },
513 )
514 }
515
516 pub fn open_cf_with_opts_for_read_only<P, I, N>(
520 db_opts: &Options,
521 path: P,
522 cfs: I,
523 error_if_log_file_exist: bool,
524 ) -> Result<Self, Error>
525 where
526 P: AsRef<Path>,
527 I: IntoIterator<Item = (N, Options)>,
528 N: AsRef<str>,
529 {
530 let cfs = cfs
531 .into_iter()
532 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
533
534 Self::open_cf_descriptors_internal(
535 db_opts,
536 path,
537 cfs,
538 &AccessType::ReadOnly {
539 error_if_log_file_exist,
540 },
541 )
542 }
543
544 pub fn open_cf_descriptors_read_only<P, I>(
549 opts: &Options,
550 path: P,
551 cfs: I,
552 error_if_log_file_exist: bool,
553 ) -> Result<Self, Error>
554 where
555 P: AsRef<Path>,
556 I: IntoIterator<Item = ColumnFamilyDescriptor>,
557 {
558 Self::open_cf_descriptors_internal(
559 opts,
560 path,
561 cfs,
562 &AccessType::ReadOnly {
563 error_if_log_file_exist,
564 },
565 )
566 }
567
568 pub fn open_cf_as_secondary<P, I, N>(
572 opts: &Options,
573 primary_path: P,
574 secondary_path: P,
575 cfs: I,
576 ) -> Result<Self, Error>
577 where
578 P: AsRef<Path>,
579 I: IntoIterator<Item = N>,
580 N: AsRef<str>,
581 {
582 let cfs = cfs
583 .into_iter()
584 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
585
586 Self::open_cf_descriptors_internal(
587 opts,
588 primary_path,
589 cfs,
590 &AccessType::Secondary {
591 secondary_path: secondary_path.as_ref(),
592 },
593 )
594 }
595
596 pub fn open_cf_descriptors_as_secondary<P, I>(
601 opts: &Options,
602 path: P,
603 secondary_path: P,
604 cfs: I,
605 ) -> Result<Self, Error>
606 where
607 P: AsRef<Path>,
608 I: IntoIterator<Item = ColumnFamilyDescriptor>,
609 {
610 Self::open_cf_descriptors_internal(
611 opts,
612 path,
613 cfs,
614 &AccessType::Secondary {
615 secondary_path: secondary_path.as_ref(),
616 },
617 )
618 }
619
620 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
624 where
625 P: AsRef<Path>,
626 I: IntoIterator<Item = ColumnFamilyDescriptor>,
627 {
628 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
629 }
630
631 fn open_cf_descriptors_internal<P, I>(
633 opts: &Options,
634 path: P,
635 cfs: I,
636 access_type: &AccessType,
637 ) -> Result<Self, Error>
638 where
639 P: AsRef<Path>,
640 I: IntoIterator<Item = ColumnFamilyDescriptor>,
641 {
642 let cfs: Vec<_> = cfs.into_iter().collect();
643 let outlive = iter::once(opts.outlive.clone())
644 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
645 .collect();
646
647 let cpath = to_cpath(&path)?;
648
649 if let Err(e) = fs::create_dir_all(&path) {
650 return Err(Error::new(format!(
651 "Failed to create RocksDB directory: `{e:?}`."
652 )));
653 }
654
655 let db: *mut ffi::rocksdb_t;
656 let mut cf_map = BTreeMap::new();
657
658 if cfs.is_empty() {
659 db = Self::open_raw(opts, &cpath, access_type)?;
660 } else {
661 let mut cfs_v = cfs;
662 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
664 cfs_v.push(ColumnFamilyDescriptor {
665 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
666 options: Options::default(),
667 ttl: ColumnFamilyTtl::SameAsDb,
668 });
669 }
670 let c_cfs: Vec<CString> = cfs_v
673 .iter()
674 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
675 .collect();
676
677 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
678
679 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
681
682 let cfopts: Vec<_> = cfs_v
683 .iter()
684 .map(|cf| cf.options.inner.cast_const())
685 .collect();
686
687 db = Self::open_cf_raw(
688 opts,
689 &cpath,
690 &cfs_v,
691 &cfnames,
692 &cfopts,
693 &mut cfhandles,
694 access_type,
695 )?;
696 for handle in &cfhandles {
697 if handle.is_null() {
698 return Err(Error::new(
699 "Received null column family handle from DB.".to_owned(),
700 ));
701 }
702 }
703
704 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
705 cf_map.insert(cf_desc.name.clone(), inner);
706 }
707 }
708
709 if db.is_null() {
710 return Err(Error::new("Could not initialize database.".to_owned()));
711 }
712
713 Ok(Self {
714 inner: DBWithThreadModeInner { inner: db },
715 path: path.as_ref().to_path_buf(),
716 cfs: T::new_cf_map_internal(cf_map),
717 _outlive: outlive,
718 })
719 }
720
721 fn open_raw(
722 opts: &Options,
723 cpath: &CString,
724 access_type: &AccessType,
725 ) -> Result<*mut ffi::rocksdb_t, Error> {
726 let db = unsafe {
727 match *access_type {
728 AccessType::ReadOnly {
729 error_if_log_file_exist,
730 } => ffi_try!(ffi::rocksdb_open_for_read_only(
731 opts.inner,
732 cpath.as_ptr(),
733 c_uchar::from(error_if_log_file_exist),
734 )),
735 AccessType::ReadWrite => {
736 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
737 }
738 AccessType::Secondary { secondary_path } => {
739 ffi_try!(ffi::rocksdb_open_as_secondary(
740 opts.inner,
741 cpath.as_ptr(),
742 to_cpath(secondary_path)?.as_ptr(),
743 ))
744 }
745 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
746 opts.inner,
747 cpath.as_ptr(),
748 ttl.as_secs() as c_int,
749 )),
750 }
751 };
752 Ok(db)
753 }
754
755 #[allow(clippy::pedantic)]
756 fn open_cf_raw(
757 opts: &Options,
758 cpath: &CString,
759 cfs_v: &[ColumnFamilyDescriptor],
760 cfnames: &[*const c_char],
761 cfopts: &[*const ffi::rocksdb_options_t],
762 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
763 access_type: &AccessType,
764 ) -> Result<*mut ffi::rocksdb_t, Error> {
765 let db = unsafe {
766 match *access_type {
767 AccessType::ReadOnly {
768 error_if_log_file_exist,
769 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
770 opts.inner,
771 cpath.as_ptr(),
772 cfs_v.len() as c_int,
773 cfnames.as_ptr(),
774 cfopts.as_ptr(),
775 cfhandles.as_mut_ptr(),
776 c_uchar::from(error_if_log_file_exist),
777 )),
778 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
779 opts.inner,
780 cpath.as_ptr(),
781 cfs_v.len() as c_int,
782 cfnames.as_ptr(),
783 cfopts.as_ptr(),
784 cfhandles.as_mut_ptr(),
785 )),
786 AccessType::Secondary { secondary_path } => {
787 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
788 opts.inner,
789 cpath.as_ptr(),
790 to_cpath(secondary_path)?.as_ptr(),
791 cfs_v.len() as c_int,
792 cfnames.as_ptr(),
793 cfopts.as_ptr(),
794 cfhandles.as_mut_ptr(),
795 ))
796 }
797 AccessType::WithTTL { ttl } => {
798 let ttls: Vec<_> = cfs_v
799 .iter()
800 .map(|cf| match cf.ttl {
801 ColumnFamilyTtl::Disabled => i32::MAX,
802 ColumnFamilyTtl::Duration(duration) => duration.as_secs() as i32,
803 ColumnFamilyTtl::SameAsDb => ttl.as_secs() as i32,
804 })
805 .collect();
806
807 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
808 opts.inner,
809 cpath.as_ptr(),
810 cfs_v.len() as c_int,
811 cfnames.as_ptr(),
812 cfopts.as_ptr(),
813 cfhandles.as_mut_ptr(),
814 ttls.as_ptr(),
815 ))
816 }
817 }
818 };
819 Ok(db)
820 }
821
822 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
824 &self,
825 cf: &impl AsColumnFamilyRef,
826 from: K,
827 to: K,
828 writeopts: &WriteOptions,
829 ) -> Result<(), Error> {
830 let from = from.as_ref();
831 let to = to.as_ref();
832
833 unsafe {
834 ffi_try!(ffi::rocksdb_delete_range_cf(
835 self.inner.inner(),
836 writeopts.inner,
837 cf.inner(),
838 from.as_ptr() as *const c_char,
839 from.len() as size_t,
840 to.as_ptr() as *const c_char,
841 to.len() as size_t,
842 ));
843 Ok(())
844 }
845 }
846
847 pub fn delete_range_cf<K: AsRef<[u8]>>(
849 &self,
850 cf: &impl AsColumnFamilyRef,
851 from: K,
852 to: K,
853 ) -> Result<(), Error> {
854 self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
855 }
856
857 pub fn write_opt(&self, batch: &WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
858 unsafe {
859 ffi_try!(ffi::rocksdb_write(
860 self.inner.inner(),
861 writeopts.inner,
862 batch.inner
863 ));
864 }
865 Ok(())
866 }
867
868 pub fn write(&self, batch: &WriteBatch) -> Result<(), Error> {
869 self.write_opt(batch, &WriteOptions::default())
870 }
871
872 pub fn write_without_wal(&self, batch: &WriteBatch) -> Result<(), Error> {
873 let mut wo = WriteOptions::new();
874 wo.disable_wal(true);
875 self.write_opt(batch, &wo)
876 }
877
878 pub fn write_wbwi(&self, wbwi: &WriteBatchWithIndex) -> Result<(), Error> {
879 self.write_wbwi_opt(wbwi, &WriteOptions::default())
880 }
881
882 pub fn write_wbwi_opt(
883 &self,
884 wbwi: &WriteBatchWithIndex,
885 writeopts: &WriteOptions,
886 ) -> Result<(), Error> {
887 unsafe {
888 ffi_try!(ffi::rocksdb_write_writebatch_wi(
889 self.inner.inner(),
890 writeopts.inner,
891 wbwi.inner
892 ));
893
894 Ok(())
895 }
896 }
897
898 pub fn disable_file_deletions(&self) -> Result<(), Error> {
903 unsafe {
904 ffi_try!(ffi::rocksdb_disable_file_deletions(self.inner.inner()));
905 }
906 Ok(())
907 }
908
909 pub fn enable_file_deletions(&self) -> Result<(), Error> {
921 unsafe {
922 ffi_try!(ffi::rocksdb_enable_file_deletions(self.inner.inner()));
923 }
924 Ok(())
925 }
926}
927
928impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
930 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
931 Self {
932 inner,
933 cfs,
934 path,
935 _outlive: outlive,
936 }
937 }
938
939 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
940 let cpath = to_cpath(path)?;
941 let mut length = 0;
942
943 unsafe {
944 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
945 opts.inner,
946 cpath.as_ptr(),
947 &mut length,
948 ));
949
950 let vec = slice::from_raw_parts(ptr, length)
951 .iter()
952 .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
953 .collect();
954 ffi::rocksdb_list_column_families_destroy(ptr, length);
955 Ok(vec)
956 }
957 }
958
959 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
960 let cpath = to_cpath(path)?;
961 unsafe {
962 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
963 }
964 Ok(())
965 }
966
967 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
968 let cpath = to_cpath(path)?;
969 unsafe {
970 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
971 }
972 Ok(())
973 }
974
975 pub fn path(&self) -> &Path {
976 self.path.as_path()
977 }
978
979 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
982 unsafe {
983 ffi_try!(ffi::rocksdb_flush_wal(
984 self.inner.inner(),
985 c_uchar::from(sync)
986 ));
987 }
988 Ok(())
989 }
990
991 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
993 unsafe {
994 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
995 }
996 Ok(())
997 }
998
999 pub fn flush(&self) -> Result<(), Error> {
1001 self.flush_opt(&FlushOptions::default())
1002 }
1003
1004 pub fn flush_cf_opt(
1006 &self,
1007 cf: &impl AsColumnFamilyRef,
1008 flushopts: &FlushOptions,
1009 ) -> Result<(), Error> {
1010 unsafe {
1011 ffi_try!(ffi::rocksdb_flush_cf(
1012 self.inner.inner(),
1013 flushopts.inner,
1014 cf.inner()
1015 ));
1016 }
1017 Ok(())
1018 }
1019
1020 pub fn flush_cfs_opt(
1026 &self,
1027 cfs: &[&impl AsColumnFamilyRef],
1028 opts: &FlushOptions,
1029 ) -> Result<(), Error> {
1030 let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
1031 unsafe {
1032 ffi_try!(ffi::rocksdb_flush_cfs(
1033 self.inner.inner(),
1034 opts.inner,
1035 cfs.as_mut_ptr(),
1036 cfs.len() as libc::c_int,
1037 ));
1038 }
1039 Ok(())
1040 }
1041
1042 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
1045 self.flush_cf_opt(cf, &FlushOptions::default())
1046 }
1047
1048 pub fn get_opt<K: AsRef<[u8]>>(
1052 &self,
1053 key: K,
1054 readopts: &ReadOptions,
1055 ) -> Result<Option<Vec<u8>>, Error> {
1056 self.get_pinned_opt(key, readopts)
1057 .map(|x| x.map(|v| v.as_ref().to_vec()))
1058 }
1059
1060 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
1064 self.get_opt(key.as_ref(), &ReadOptions::default())
1065 }
1066
1067 pub fn get_cf_opt<K: AsRef<[u8]>>(
1071 &self,
1072 cf: &impl AsColumnFamilyRef,
1073 key: K,
1074 readopts: &ReadOptions,
1075 ) -> Result<Option<Vec<u8>>, Error> {
1076 self.get_pinned_cf_opt(cf, key, readopts)
1077 .map(|x| x.map(|v| v.as_ref().to_vec()))
1078 }
1079
1080 pub fn get_cf<K: AsRef<[u8]>>(
1084 &self,
1085 cf: &impl AsColumnFamilyRef,
1086 key: K,
1087 ) -> Result<Option<Vec<u8>>, Error> {
1088 self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
1089 }
1090
1091 pub fn get_pinned_opt<K: AsRef<[u8]>>(
1094 &'_ self,
1095 key: K,
1096 readopts: &ReadOptions,
1097 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1098 if readopts.inner.is_null() {
1099 return Err(Error::new(
1100 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1101 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1102 .to_owned(),
1103 ));
1104 }
1105
1106 let key = key.as_ref();
1107 unsafe {
1108 let val = ffi_try!(ffi::rocksdb_get_pinned(
1109 self.inner.inner(),
1110 readopts.inner,
1111 key.as_ptr() as *const c_char,
1112 key.len() as size_t,
1113 ));
1114 if val.is_null() {
1115 Ok(None)
1116 } else {
1117 Ok(Some(DBPinnableSlice::from_c(val)))
1118 }
1119 }
1120 }
1121
1122 pub fn get_pinned<K: AsRef<[u8]>>(
1126 &'_ self,
1127 key: K,
1128 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1129 self.get_pinned_opt(key, &ReadOptions::default())
1130 }
1131
1132 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1136 &'_ self,
1137 cf: &impl AsColumnFamilyRef,
1138 key: K,
1139 readopts: &ReadOptions,
1140 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1141 if readopts.inner.is_null() {
1142 return Err(Error::new(
1143 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1144 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1145 .to_owned(),
1146 ));
1147 }
1148
1149 let key = key.as_ref();
1150 unsafe {
1151 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1152 self.inner.inner(),
1153 readopts.inner,
1154 cf.inner(),
1155 key.as_ptr() as *const c_char,
1156 key.len() as size_t,
1157 ));
1158 if val.is_null() {
1159 Ok(None)
1160 } else {
1161 Ok(Some(DBPinnableSlice::from_c(val)))
1162 }
1163 }
1164 }
1165
1166 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1170 &'_ self,
1171 cf: &impl AsColumnFamilyRef,
1172 key: K,
1173 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
1174 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1175 }
1176
1177 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1179 where
1180 K: AsRef<[u8]>,
1181 I: IntoIterator<Item = K>,
1182 {
1183 self.multi_get_opt(keys, &ReadOptions::default())
1184 }
1185
1186 pub fn multi_get_opt<K, I>(
1188 &self,
1189 keys: I,
1190 readopts: &ReadOptions,
1191 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1192 where
1193 K: AsRef<[u8]>,
1194 I: IntoIterator<Item = K>,
1195 {
1196 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1197 .into_iter()
1198 .map(|k| {
1199 let k = k.as_ref();
1200 (Box::from(k), k.len())
1201 })
1202 .unzip();
1203 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1204
1205 let mut values = vec![ptr::null_mut(); keys.len()];
1206 let mut values_sizes = vec![0_usize; keys.len()];
1207 let mut errors = vec![ptr::null_mut(); keys.len()];
1208 unsafe {
1209 ffi::rocksdb_multi_get(
1210 self.inner.inner(),
1211 readopts.inner,
1212 ptr_keys.len(),
1213 ptr_keys.as_ptr(),
1214 keys_sizes.as_ptr(),
1215 values.as_mut_ptr(),
1216 values_sizes.as_mut_ptr(),
1217 errors.as_mut_ptr(),
1218 );
1219 }
1220
1221 convert_values(values, values_sizes, errors)
1222 }
1223
1224 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1226 &'a self,
1227 keys: I,
1228 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1229 where
1230 K: AsRef<[u8]>,
1231 I: IntoIterator<Item = (&'b W, K)>,
1232 W: 'b + AsColumnFamilyRef,
1233 {
1234 self.multi_get_cf_opt(keys, &ReadOptions::default())
1235 }
1236
1237 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1239 &'a self,
1240 keys: I,
1241 readopts: &ReadOptions,
1242 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1243 where
1244 K: AsRef<[u8]>,
1245 I: IntoIterator<Item = (&'b W, K)>,
1246 W: 'b + AsColumnFamilyRef,
1247 {
1248 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1249 .into_iter()
1250 .map(|(cf, key)| {
1251 let key = key.as_ref();
1252 ((cf, Box::from(key)), key.len())
1253 })
1254 .unzip();
1255 let ptr_keys: Vec<_> = cfs_and_keys
1256 .iter()
1257 .map(|(_, k)| k.as_ptr() as *const c_char)
1258 .collect();
1259 let ptr_cfs: Vec<_> = cfs_and_keys
1260 .iter()
1261 .map(|(c, _)| c.inner().cast_const())
1262 .collect();
1263
1264 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1265 let mut values_sizes = vec![0_usize; ptr_keys.len()];
1266 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1267 unsafe {
1268 ffi::rocksdb_multi_get_cf(
1269 self.inner.inner(),
1270 readopts.inner,
1271 ptr_cfs.as_ptr(),
1272 ptr_keys.len(),
1273 ptr_keys.as_ptr(),
1274 keys_sizes.as_ptr(),
1275 values.as_mut_ptr(),
1276 values_sizes.as_mut_ptr(),
1277 errors.as_mut_ptr(),
1278 );
1279 }
1280
1281 convert_values(values, values_sizes, errors)
1282 }
1283
1284 pub fn batched_multi_get_cf<'a, K, I>(
1288 &'_ self,
1289 cf: &impl AsColumnFamilyRef,
1290 keys: I,
1291 sorted_input: bool,
1292 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1293 where
1294 K: AsRef<[u8]> + 'a + ?Sized,
1295 I: IntoIterator<Item = &'a K>,
1296 {
1297 self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1298 }
1299
1300 pub fn batched_multi_get_cf_opt<'a, K, I>(
1304 &'_ self,
1305 cf: &impl AsColumnFamilyRef,
1306 keys: I,
1307 sorted_input: bool,
1308 readopts: &ReadOptions,
1309 ) -> Vec<Result<Option<DBPinnableSlice<'_>>, Error>>
1310 where
1311 K: AsRef<[u8]> + 'a + ?Sized,
1312 I: IntoIterator<Item = &'a K>,
1313 {
1314 let (ptr_keys, keys_sizes): (Vec<_>, Vec<_>) = keys
1315 .into_iter()
1316 .map(|k| {
1317 let k = k.as_ref();
1318 (k.as_ptr() as *const c_char, k.len())
1319 })
1320 .unzip();
1321
1322 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1323 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1324
1325 unsafe {
1326 ffi::rocksdb_batched_multi_get_cf(
1327 self.inner.inner(),
1328 readopts.inner,
1329 cf.inner(),
1330 ptr_keys.len(),
1331 ptr_keys.as_ptr(),
1332 keys_sizes.as_ptr(),
1333 pinned_values.as_mut_ptr(),
1334 errors.as_mut_ptr(),
1335 sorted_input,
1336 );
1337 pinned_values
1338 .into_iter()
1339 .zip(errors)
1340 .map(|(v, e)| {
1341 if e.is_null() {
1342 if v.is_null() {
1343 Ok(None)
1344 } else {
1345 Ok(Some(DBPinnableSlice::from_c(v)))
1346 }
1347 } else {
1348 Err(Error::new(crate::ffi_util::error_message(e)))
1349 }
1350 })
1351 .collect()
1352 }
1353 }
1354
1355 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1358 self.key_may_exist_opt(key, &ReadOptions::default())
1359 }
1360
1361 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1364 let key = key.as_ref();
1365 unsafe {
1366 0 != ffi::rocksdb_key_may_exist(
1367 self.inner.inner(),
1368 readopts.inner,
1369 key.as_ptr() as *const c_char,
1370 key.len() as size_t,
1371 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1377 }
1378 }
1379
1380 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1383 self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1384 }
1385
1386 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1389 &self,
1390 cf: &impl AsColumnFamilyRef,
1391 key: K,
1392 readopts: &ReadOptions,
1393 ) -> bool {
1394 let key = key.as_ref();
1395 0 != unsafe {
1396 ffi::rocksdb_key_may_exist_cf(
1397 self.inner.inner(),
1398 readopts.inner,
1399 cf.inner(),
1400 key.as_ptr() as *const c_char,
1401 key.len() as size_t,
1402 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1408 }
1409 }
1410
1411 pub fn key_may_exist_cf_opt_value<K: AsRef<[u8]>>(
1418 &self,
1419 cf: &impl AsColumnFamilyRef,
1420 key: K,
1421 readopts: &ReadOptions,
1422 ) -> (bool, Option<CSlice>) {
1423 let key = key.as_ref();
1424 let mut val: *mut c_char = ptr::null_mut();
1425 let mut val_len: usize = 0;
1426 let mut value_found: c_uchar = 0;
1427 let may_exists = 0
1428 != unsafe {
1429 ffi::rocksdb_key_may_exist_cf(
1430 self.inner.inner(),
1431 readopts.inner,
1432 cf.inner(),
1433 key.as_ptr() as *const c_char,
1434 key.len() as size_t,
1435 &mut val, &mut val_len, ptr::null(), 0, &mut value_found, )
1441 };
1442 if may_exists && value_found != 0 {
1445 (
1446 may_exists,
1447 Some(unsafe { CSlice::from_raw_parts(val, val_len) }),
1448 )
1449 } else {
1450 (may_exists, None)
1451 }
1452 }
1453
1454 fn create_inner_cf_handle(
1455 &self,
1456 name: impl CStrLike,
1457 opts: &Options,
1458 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1459 let cf_name = name.bake().map_err(|err| {
1460 Error::new(format!(
1461 "Failed to convert path to CString when creating cf: {err}"
1462 ))
1463 })?;
1464 Ok(unsafe {
1465 ffi_try!(ffi::rocksdb_create_column_family(
1466 self.inner.inner(),
1467 opts.inner,
1468 cf_name.as_ptr(),
1469 ))
1470 })
1471 }
1472
1473 pub fn iterator<'a: 'b, 'b>(
1474 &'a self,
1475 mode: IteratorMode,
1476 ) -> DBIteratorWithThreadMode<'b, Self> {
1477 let readopts = ReadOptions::default();
1478 self.iterator_opt(mode, readopts)
1479 }
1480
1481 pub fn iterator_opt<'a: 'b, 'b>(
1482 &'a self,
1483 mode: IteratorMode,
1484 readopts: ReadOptions,
1485 ) -> DBIteratorWithThreadMode<'b, Self> {
1486 DBIteratorWithThreadMode::new(self, readopts, mode)
1487 }
1488
1489 pub fn iterator_cf_opt<'a: 'b, 'b>(
1492 &'a self,
1493 cf_handle: &impl AsColumnFamilyRef,
1494 readopts: ReadOptions,
1495 mode: IteratorMode,
1496 ) -> DBIteratorWithThreadMode<'b, Self> {
1497 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1498 }
1499
1500 pub fn full_iterator<'a: 'b, 'b>(
1504 &'a self,
1505 mode: IteratorMode,
1506 ) -> DBIteratorWithThreadMode<'b, Self> {
1507 let mut opts = ReadOptions::default();
1508 opts.set_total_order_seek(true);
1509 DBIteratorWithThreadMode::new(self, opts, mode)
1510 }
1511
1512 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1513 &'a self,
1514 prefix: P,
1515 ) -> DBIteratorWithThreadMode<'b, Self> {
1516 let mut opts = ReadOptions::default();
1517 opts.set_prefix_same_as_start(true);
1518 DBIteratorWithThreadMode::new(
1519 self,
1520 opts,
1521 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1522 )
1523 }
1524
1525 pub fn iterator_cf<'a: 'b, 'b>(
1526 &'a self,
1527 cf_handle: &impl AsColumnFamilyRef,
1528 mode: IteratorMode,
1529 ) -> DBIteratorWithThreadMode<'b, Self> {
1530 let opts = ReadOptions::default();
1531 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1532 }
1533
1534 pub fn full_iterator_cf<'a: 'b, 'b>(
1535 &'a self,
1536 cf_handle: &impl AsColumnFamilyRef,
1537 mode: IteratorMode,
1538 ) -> DBIteratorWithThreadMode<'b, Self> {
1539 let mut opts = ReadOptions::default();
1540 opts.set_total_order_seek(true);
1541 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1542 }
1543
1544 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1545 &'a self,
1546 cf_handle: &impl AsColumnFamilyRef,
1547 prefix: P,
1548 ) -> DBIteratorWithThreadMode<'a, Self> {
1549 let mut opts = ReadOptions::default();
1550 opts.set_prefix_same_as_start(true);
1551 DBIteratorWithThreadMode::<'a, Self>::new_cf(
1552 self,
1553 cf_handle.inner(),
1554 opts,
1555 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1556 )
1557 }
1558
1559 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1561 let opts = ReadOptions::default();
1562 DBRawIteratorWithThreadMode::new(self, opts)
1563 }
1564
1565 pub fn raw_iterator_cf<'a: 'b, 'b>(
1567 &'a self,
1568 cf_handle: &impl AsColumnFamilyRef,
1569 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1570 let opts = ReadOptions::default();
1571 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1572 }
1573
1574 pub fn raw_iterator_opt<'a: 'b, 'b>(
1576 &'a self,
1577 readopts: ReadOptions,
1578 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1579 DBRawIteratorWithThreadMode::new(self, readopts)
1580 }
1581
1582 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1584 &'a self,
1585 cf_handle: &impl AsColumnFamilyRef,
1586 readopts: ReadOptions,
1587 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1588 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1589 }
1590
1591 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
1592 SnapshotWithThreadMode::<Self>::new(self)
1593 }
1594
1595 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1596 where
1597 K: AsRef<[u8]>,
1598 V: AsRef<[u8]>,
1599 {
1600 let key = key.as_ref();
1601 let value = value.as_ref();
1602
1603 unsafe {
1604 ffi_try!(ffi::rocksdb_put(
1605 self.inner.inner(),
1606 writeopts.inner,
1607 key.as_ptr() as *const c_char,
1608 key.len() as size_t,
1609 value.as_ptr() as *const c_char,
1610 value.len() as size_t,
1611 ));
1612 Ok(())
1613 }
1614 }
1615
1616 pub fn put_cf_opt<K, V>(
1617 &self,
1618 cf: &impl AsColumnFamilyRef,
1619 key: K,
1620 value: V,
1621 writeopts: &WriteOptions,
1622 ) -> Result<(), Error>
1623 where
1624 K: AsRef<[u8]>,
1625 V: AsRef<[u8]>,
1626 {
1627 let key = key.as_ref();
1628 let value = value.as_ref();
1629
1630 unsafe {
1631 ffi_try!(ffi::rocksdb_put_cf(
1632 self.inner.inner(),
1633 writeopts.inner,
1634 cf.inner(),
1635 key.as_ptr() as *const c_char,
1636 key.len() as size_t,
1637 value.as_ptr() as *const c_char,
1638 value.len() as size_t,
1639 ));
1640 Ok(())
1641 }
1642 }
1643
1644 pub fn put_with_ts_opt<K, V, S>(
1651 &self,
1652 key: K,
1653 ts: S,
1654 value: V,
1655 writeopts: &WriteOptions,
1656 ) -> Result<(), Error>
1657 where
1658 K: AsRef<[u8]>,
1659 V: AsRef<[u8]>,
1660 S: AsRef<[u8]>,
1661 {
1662 let key = key.as_ref();
1663 let value = value.as_ref();
1664 let ts = ts.as_ref();
1665 unsafe {
1666 ffi_try!(ffi::rocksdb_put_with_ts(
1667 self.inner.inner(),
1668 writeopts.inner,
1669 key.as_ptr() as *const c_char,
1670 key.len() as size_t,
1671 ts.as_ptr() as *const c_char,
1672 ts.len() as size_t,
1673 value.as_ptr() as *const c_char,
1674 value.len() as size_t,
1675 ));
1676 Ok(())
1677 }
1678 }
1679
1680 pub fn put_cf_with_ts_opt<K, V, S>(
1687 &self,
1688 cf: &impl AsColumnFamilyRef,
1689 key: K,
1690 ts: S,
1691 value: V,
1692 writeopts: &WriteOptions,
1693 ) -> Result<(), Error>
1694 where
1695 K: AsRef<[u8]>,
1696 V: AsRef<[u8]>,
1697 S: AsRef<[u8]>,
1698 {
1699 let key = key.as_ref();
1700 let value = value.as_ref();
1701 let ts = ts.as_ref();
1702 unsafe {
1703 ffi_try!(ffi::rocksdb_put_cf_with_ts(
1704 self.inner.inner(),
1705 writeopts.inner,
1706 cf.inner(),
1707 key.as_ptr() as *const c_char,
1708 key.len() as size_t,
1709 ts.as_ptr() as *const c_char,
1710 ts.len() as size_t,
1711 value.as_ptr() as *const c_char,
1712 value.len() as size_t,
1713 ));
1714 Ok(())
1715 }
1716 }
1717
1718 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1719 where
1720 K: AsRef<[u8]>,
1721 V: AsRef<[u8]>,
1722 {
1723 let key = key.as_ref();
1724 let value = value.as_ref();
1725
1726 unsafe {
1727 ffi_try!(ffi::rocksdb_merge(
1728 self.inner.inner(),
1729 writeopts.inner,
1730 key.as_ptr() as *const c_char,
1731 key.len() as size_t,
1732 value.as_ptr() as *const c_char,
1733 value.len() as size_t,
1734 ));
1735 Ok(())
1736 }
1737 }
1738
1739 pub fn merge_cf_opt<K, V>(
1740 &self,
1741 cf: &impl AsColumnFamilyRef,
1742 key: K,
1743 value: V,
1744 writeopts: &WriteOptions,
1745 ) -> Result<(), Error>
1746 where
1747 K: AsRef<[u8]>,
1748 V: AsRef<[u8]>,
1749 {
1750 let key = key.as_ref();
1751 let value = value.as_ref();
1752
1753 unsafe {
1754 ffi_try!(ffi::rocksdb_merge_cf(
1755 self.inner.inner(),
1756 writeopts.inner,
1757 cf.inner(),
1758 key.as_ptr() as *const c_char,
1759 key.len() as size_t,
1760 value.as_ptr() as *const c_char,
1761 value.len() as size_t,
1762 ));
1763 Ok(())
1764 }
1765 }
1766
1767 pub fn delete_opt<K: AsRef<[u8]>>(
1768 &self,
1769 key: K,
1770 writeopts: &WriteOptions,
1771 ) -> Result<(), Error> {
1772 let key = key.as_ref();
1773
1774 unsafe {
1775 ffi_try!(ffi::rocksdb_delete(
1776 self.inner.inner(),
1777 writeopts.inner,
1778 key.as_ptr() as *const c_char,
1779 key.len() as size_t,
1780 ));
1781 Ok(())
1782 }
1783 }
1784
1785 pub fn delete_cf_opt<K: AsRef<[u8]>>(
1786 &self,
1787 cf: &impl AsColumnFamilyRef,
1788 key: K,
1789 writeopts: &WriteOptions,
1790 ) -> Result<(), Error> {
1791 let key = key.as_ref();
1792
1793 unsafe {
1794 ffi_try!(ffi::rocksdb_delete_cf(
1795 self.inner.inner(),
1796 writeopts.inner,
1797 cf.inner(),
1798 key.as_ptr() as *const c_char,
1799 key.len() as size_t,
1800 ));
1801 Ok(())
1802 }
1803 }
1804
1805 pub fn delete_with_ts_opt<K, S>(
1809 &self,
1810 key: K,
1811 ts: S,
1812 writeopts: &WriteOptions,
1813 ) -> Result<(), Error>
1814 where
1815 K: AsRef<[u8]>,
1816 S: AsRef<[u8]>,
1817 {
1818 let key = key.as_ref();
1819 let ts = ts.as_ref();
1820 unsafe {
1821 ffi_try!(ffi::rocksdb_delete_with_ts(
1822 self.inner.inner(),
1823 writeopts.inner,
1824 key.as_ptr() as *const c_char,
1825 key.len() as size_t,
1826 ts.as_ptr() as *const c_char,
1827 ts.len() as size_t,
1828 ));
1829 Ok(())
1830 }
1831 }
1832
1833 pub fn delete_cf_with_ts_opt<K, S>(
1837 &self,
1838 cf: &impl AsColumnFamilyRef,
1839 key: K,
1840 ts: S,
1841 writeopts: &WriteOptions,
1842 ) -> Result<(), Error>
1843 where
1844 K: AsRef<[u8]>,
1845 S: AsRef<[u8]>,
1846 {
1847 let key = key.as_ref();
1848 let ts = ts.as_ref();
1849 unsafe {
1850 ffi_try!(ffi::rocksdb_delete_cf_with_ts(
1851 self.inner.inner(),
1852 writeopts.inner,
1853 cf.inner(),
1854 key.as_ptr() as *const c_char,
1855 key.len() as size_t,
1856 ts.as_ptr() as *const c_char,
1857 ts.len() as size_t,
1858 ));
1859 Ok(())
1860 }
1861 }
1862
1863 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1864 where
1865 K: AsRef<[u8]>,
1866 V: AsRef<[u8]>,
1867 {
1868 self.put_opt(key, value, &WriteOptions::default())
1869 }
1870
1871 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1872 where
1873 K: AsRef<[u8]>,
1874 V: AsRef<[u8]>,
1875 {
1876 self.put_cf_opt(cf, key, value, &WriteOptions::default())
1877 }
1878
1879 pub fn put_with_ts<K, V, S>(&self, key: K, ts: S, value: V) -> Result<(), Error>
1886 where
1887 K: AsRef<[u8]>,
1888 V: AsRef<[u8]>,
1889 S: AsRef<[u8]>,
1890 {
1891 self.put_with_ts_opt(
1892 key.as_ref(),
1893 ts.as_ref(),
1894 value.as_ref(),
1895 &WriteOptions::default(),
1896 )
1897 }
1898
1899 pub fn put_cf_with_ts<K, V, S>(
1906 &self,
1907 cf: &impl AsColumnFamilyRef,
1908 key: K,
1909 ts: S,
1910 value: V,
1911 ) -> Result<(), Error>
1912 where
1913 K: AsRef<[u8]>,
1914 V: AsRef<[u8]>,
1915 S: AsRef<[u8]>,
1916 {
1917 self.put_cf_with_ts_opt(
1918 cf,
1919 key.as_ref(),
1920 ts.as_ref(),
1921 value.as_ref(),
1922 &WriteOptions::default(),
1923 )
1924 }
1925
1926 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1927 where
1928 K: AsRef<[u8]>,
1929 V: AsRef<[u8]>,
1930 {
1931 self.merge_opt(key, value, &WriteOptions::default())
1932 }
1933
1934 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1935 where
1936 K: AsRef<[u8]>,
1937 V: AsRef<[u8]>,
1938 {
1939 self.merge_cf_opt(cf, key, value, &WriteOptions::default())
1940 }
1941
1942 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1943 self.delete_opt(key, &WriteOptions::default())
1944 }
1945
1946 pub fn delete_cf<K: AsRef<[u8]>>(
1947 &self,
1948 cf: &impl AsColumnFamilyRef,
1949 key: K,
1950 ) -> Result<(), Error> {
1951 self.delete_cf_opt(cf, key, &WriteOptions::default())
1952 }
1953
1954 pub fn delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
1958 &self,
1959 key: K,
1960 ts: S,
1961 ) -> Result<(), Error> {
1962 self.delete_with_ts_opt(key, ts, &WriteOptions::default())
1963 }
1964
1965 pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
1969 &self,
1970 cf: &impl AsColumnFamilyRef,
1971 key: K,
1972 ts: S,
1973 ) -> Result<(), Error> {
1974 self.delete_cf_with_ts_opt(cf, key, ts, &WriteOptions::default())
1975 }
1976
1977 pub fn single_delete_opt<K: AsRef<[u8]>>(
1997 &self,
1998 key: K,
1999 writeopts: &WriteOptions,
2000 ) -> Result<(), Error> {
2001 let key = key.as_ref();
2002
2003 unsafe {
2004 ffi_try!(ffi::rocksdb_singledelete(
2005 self.inner.inner(),
2006 writeopts.inner,
2007 key.as_ptr() as *const c_char,
2008 key.len() as size_t,
2009 ));
2010 Ok(())
2011 }
2012 }
2013
2014 pub fn single_delete_cf_opt<K: AsRef<[u8]>>(
2018 &self,
2019 cf: &impl AsColumnFamilyRef,
2020 key: K,
2021 writeopts: &WriteOptions,
2022 ) -> Result<(), Error> {
2023 let key = key.as_ref();
2024
2025 unsafe {
2026 ffi_try!(ffi::rocksdb_singledelete_cf(
2027 self.inner.inner(),
2028 writeopts.inner,
2029 cf.inner(),
2030 key.as_ptr() as *const c_char,
2031 key.len() as size_t,
2032 ));
2033 Ok(())
2034 }
2035 }
2036
2037 pub fn single_delete_with_ts_opt<K, S>(
2044 &self,
2045 key: K,
2046 ts: S,
2047 writeopts: &WriteOptions,
2048 ) -> Result<(), Error>
2049 where
2050 K: AsRef<[u8]>,
2051 S: AsRef<[u8]>,
2052 {
2053 let key = key.as_ref();
2054 let ts = ts.as_ref();
2055 unsafe {
2056 ffi_try!(ffi::rocksdb_singledelete_with_ts(
2057 self.inner.inner(),
2058 writeopts.inner,
2059 key.as_ptr() as *const c_char,
2060 key.len() as size_t,
2061 ts.as_ptr() as *const c_char,
2062 ts.len() as size_t,
2063 ));
2064 Ok(())
2065 }
2066 }
2067
2068 pub fn single_delete_cf_with_ts_opt<K, S>(
2075 &self,
2076 cf: &impl AsColumnFamilyRef,
2077 key: K,
2078 ts: S,
2079 writeopts: &WriteOptions,
2080 ) -> Result<(), Error>
2081 where
2082 K: AsRef<[u8]>,
2083 S: AsRef<[u8]>,
2084 {
2085 let key = key.as_ref();
2086 let ts = ts.as_ref();
2087 unsafe {
2088 ffi_try!(ffi::rocksdb_singledelete_cf_with_ts(
2089 self.inner.inner(),
2090 writeopts.inner,
2091 cf.inner(),
2092 key.as_ptr() as *const c_char,
2093 key.len() as size_t,
2094 ts.as_ptr() as *const c_char,
2095 ts.len() as size_t,
2096 ));
2097 Ok(())
2098 }
2099 }
2100
2101 pub fn single_delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
2105 self.single_delete_opt(key, &WriteOptions::default())
2106 }
2107
2108 pub fn single_delete_cf<K: AsRef<[u8]>>(
2112 &self,
2113 cf: &impl AsColumnFamilyRef,
2114 key: K,
2115 ) -> Result<(), Error> {
2116 self.single_delete_cf_opt(cf, key, &WriteOptions::default())
2117 }
2118
2119 pub fn single_delete_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2126 &self,
2127 key: K,
2128 ts: S,
2129 ) -> Result<(), Error> {
2130 self.single_delete_with_ts_opt(key, ts, &WriteOptions::default())
2131 }
2132
2133 pub fn single_delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
2140 &self,
2141 cf: &impl AsColumnFamilyRef,
2142 key: K,
2143 ts: S,
2144 ) -> Result<(), Error> {
2145 self.single_delete_cf_with_ts_opt(cf, key, ts, &WriteOptions::default())
2146 }
2147
2148 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
2150 unsafe {
2151 let start = start.as_ref().map(AsRef::as_ref);
2152 let end = end.as_ref().map(AsRef::as_ref);
2153
2154 ffi::rocksdb_compact_range(
2155 self.inner.inner(),
2156 opt_bytes_to_ptr(start),
2157 start.map_or(0, <[u8]>::len) as size_t,
2158 opt_bytes_to_ptr(end),
2159 end.map_or(0, <[u8]>::len) as size_t,
2160 );
2161 }
2162 }
2163
2164 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2166 &self,
2167 start: Option<S>,
2168 end: Option<E>,
2169 opts: &CompactOptions,
2170 ) {
2171 unsafe {
2172 let start = start.as_ref().map(AsRef::as_ref);
2173 let end = end.as_ref().map(AsRef::as_ref);
2174
2175 ffi::rocksdb_compact_range_opt(
2176 self.inner.inner(),
2177 opts.inner,
2178 opt_bytes_to_ptr(start),
2179 start.map_or(0, <[u8]>::len) as size_t,
2180 opt_bytes_to_ptr(end),
2181 end.map_or(0, <[u8]>::len) as size_t,
2182 );
2183 }
2184 }
2185
2186 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2189 &self,
2190 cf: &impl AsColumnFamilyRef,
2191 start: Option<S>,
2192 end: Option<E>,
2193 ) {
2194 unsafe {
2195 let start = start.as_ref().map(AsRef::as_ref);
2196 let end = end.as_ref().map(AsRef::as_ref);
2197
2198 ffi::rocksdb_compact_range_cf(
2199 self.inner.inner(),
2200 cf.inner(),
2201 opt_bytes_to_ptr(start),
2202 start.map_or(0, <[u8]>::len) as size_t,
2203 opt_bytes_to_ptr(end),
2204 end.map_or(0, <[u8]>::len) as size_t,
2205 );
2206 }
2207 }
2208
2209 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
2211 &self,
2212 cf: &impl AsColumnFamilyRef,
2213 start: Option<S>,
2214 end: Option<E>,
2215 opts: &CompactOptions,
2216 ) {
2217 unsafe {
2218 let start = start.as_ref().map(AsRef::as_ref);
2219 let end = end.as_ref().map(AsRef::as_ref);
2220
2221 ffi::rocksdb_compact_range_cf_opt(
2222 self.inner.inner(),
2223 cf.inner(),
2224 opts.inner,
2225 opt_bytes_to_ptr(start),
2226 start.map_or(0, <[u8]>::len) as size_t,
2227 opt_bytes_to_ptr(end),
2228 end.map_or(0, <[u8]>::len) as size_t,
2229 );
2230 }
2231 }
2232
2233 pub fn wait_for_compact(&self, opts: &WaitForCompactOptions) -> Result<(), Error> {
2242 unsafe {
2243 ffi_try!(ffi::rocksdb_wait_for_compact(
2244 self.inner.inner(),
2245 opts.inner
2246 ));
2247 }
2248 Ok(())
2249 }
2250
2251 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
2252 let copts = convert_options(opts)?;
2253 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2254 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2255 let count = opts.len() as i32;
2256 unsafe {
2257 ffi_try!(ffi::rocksdb_set_options(
2258 self.inner.inner(),
2259 count,
2260 cnames.as_ptr(),
2261 cvalues.as_ptr(),
2262 ));
2263 }
2264 Ok(())
2265 }
2266
2267 pub fn set_options_cf(
2268 &self,
2269 cf: &impl AsColumnFamilyRef,
2270 opts: &[(&str, &str)],
2271 ) -> Result<(), Error> {
2272 let copts = convert_options(opts)?;
2273 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
2274 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
2275 let count = opts.len() as i32;
2276 unsafe {
2277 ffi_try!(ffi::rocksdb_set_options_cf(
2278 self.inner.inner(),
2279 cf.inner(),
2280 count,
2281 cnames.as_ptr(),
2282 cvalues.as_ptr(),
2283 ));
2284 }
2285 Ok(())
2286 }
2287
2288 fn property_value_impl<R>(
2297 name: impl CStrLike,
2298 get_property: impl FnOnce(*const c_char) -> *mut c_char,
2299 parse: impl FnOnce(&str) -> Result<R, Error>,
2300 ) -> Result<Option<R>, Error> {
2301 let value = match name.bake() {
2302 Ok(prop_name) => get_property(prop_name.as_ptr()),
2303 Err(e) => {
2304 return Err(Error::new(format!(
2305 "Failed to convert property name to CString: {e}"
2306 )));
2307 }
2308 };
2309 if value.is_null() {
2310 return Ok(None);
2311 }
2312 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
2313 Ok(s) => parse(s).map(|value| Some(value)),
2314 Err(e) => Err(Error::new(format!(
2315 "Failed to convert property value to string: {e}"
2316 ))),
2317 };
2318 unsafe {
2319 ffi::rocksdb_free(value as *mut c_void);
2320 }
2321 result
2322 }
2323
2324 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
2329 Self::property_value_impl(
2330 name,
2331 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
2332 |str_value| Ok(str_value.to_owned()),
2333 )
2334 }
2335
2336 pub fn property_value_cf(
2341 &self,
2342 cf: &impl AsColumnFamilyRef,
2343 name: impl CStrLike,
2344 ) -> Result<Option<String>, Error> {
2345 Self::property_value_impl(
2346 name,
2347 |prop_name| unsafe {
2348 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
2349 },
2350 |str_value| Ok(str_value.to_owned()),
2351 )
2352 }
2353
2354 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
2355 value.parse::<u64>().map_err(|err| {
2356 Error::new(format!(
2357 "Failed to convert property value {value} to int: {err}"
2358 ))
2359 })
2360 }
2361
2362 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
2367 Self::property_value_impl(
2368 name,
2369 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
2370 Self::parse_property_int_value,
2371 )
2372 }
2373
2374 pub fn property_int_value_cf(
2379 &self,
2380 cf: &impl AsColumnFamilyRef,
2381 name: impl CStrLike,
2382 ) -> Result<Option<u64>, Error> {
2383 Self::property_value_impl(
2384 name,
2385 |prop_name| unsafe {
2386 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
2387 },
2388 Self::parse_property_int_value,
2389 )
2390 }
2391
2392 pub fn latest_sequence_number(&self) -> u64 {
2394 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
2395 }
2396
2397 pub fn get_approximate_sizes(&self, ranges: &[Range]) -> Vec<u64> {
2405 self.get_approximate_sizes_cfopt(None::<&ColumnFamily>, ranges)
2406 }
2407
2408 pub fn get_approximate_sizes_cf(
2409 &self,
2410 cf: &impl AsColumnFamilyRef,
2411 ranges: &[Range],
2412 ) -> Vec<u64> {
2413 self.get_approximate_sizes_cfopt(Some(cf), ranges)
2414 }
2415
2416 fn get_approximate_sizes_cfopt(
2417 &self,
2418 cf: Option<&impl AsColumnFamilyRef>,
2419 ranges: &[Range],
2420 ) -> Vec<u64> {
2421 let start_keys: Vec<*const c_char> = ranges
2422 .iter()
2423 .map(|x| x.start_key.as_ptr() as *const c_char)
2424 .collect();
2425 let start_key_lens: Vec<_> = ranges.iter().map(|x| x.start_key.len()).collect();
2426 let end_keys: Vec<*const c_char> = ranges
2427 .iter()
2428 .map(|x| x.end_key.as_ptr() as *const c_char)
2429 .collect();
2430 let end_key_lens: Vec<_> = ranges.iter().map(|x| x.end_key.len()).collect();
2431 let mut sizes: Vec<u64> = vec![0; ranges.len()];
2432 let (n, start_key_ptr, start_key_len_ptr, end_key_ptr, end_key_len_ptr, size_ptr) = (
2433 ranges.len() as i32,
2434 start_keys.as_ptr(),
2435 start_key_lens.as_ptr(),
2436 end_keys.as_ptr(),
2437 end_key_lens.as_ptr(),
2438 sizes.as_mut_ptr(),
2439 );
2440 let mut err: *mut c_char = ptr::null_mut();
2441 match cf {
2442 None => unsafe {
2443 ffi::rocksdb_approximate_sizes(
2444 self.inner.inner(),
2445 n,
2446 start_key_ptr,
2447 start_key_len_ptr,
2448 end_key_ptr,
2449 end_key_len_ptr,
2450 size_ptr,
2451 &mut err,
2452 );
2453 },
2454 Some(cf) => unsafe {
2455 ffi::rocksdb_approximate_sizes_cf(
2456 self.inner.inner(),
2457 cf.inner(),
2458 n,
2459 start_key_ptr,
2460 start_key_len_ptr,
2461 end_key_ptr,
2462 end_key_len_ptr,
2463 size_ptr,
2464 &mut err,
2465 );
2466 },
2467 }
2468 sizes
2469 }
2470
2471 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
2482 unsafe {
2483 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
2487 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
2488 self.inner.inner(),
2489 seq_number,
2490 opts
2491 ));
2492 Ok(DBWALIterator {
2493 inner: iter,
2494 start_seq_number: seq_number,
2495 })
2496 }
2497 }
2498
2499 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
2502 unsafe {
2503 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
2504 }
2505 Ok(())
2506 }
2507
2508 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
2510 let opts = IngestExternalFileOptions::default();
2511 self.ingest_external_file_opts(&opts, paths)
2512 }
2513
2514 pub fn ingest_external_file_opts<P: AsRef<Path>>(
2516 &self,
2517 opts: &IngestExternalFileOptions,
2518 paths: Vec<P>,
2519 ) -> Result<(), Error> {
2520 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
2521 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
2522
2523 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
2524 }
2525
2526 pub fn ingest_external_file_cf<P: AsRef<Path>>(
2529 &self,
2530 cf: &impl AsColumnFamilyRef,
2531 paths: Vec<P>,
2532 ) -> Result<(), Error> {
2533 let opts = IngestExternalFileOptions::default();
2534 self.ingest_external_file_cf_opts(cf, &opts, paths)
2535 }
2536
2537 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
2539 &self,
2540 cf: &impl AsColumnFamilyRef,
2541 opts: &IngestExternalFileOptions,
2542 paths: Vec<P>,
2543 ) -> Result<(), Error> {
2544 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
2545 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
2546
2547 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
2548 }
2549
2550 fn ingest_external_file_raw(
2551 &self,
2552 opts: &IngestExternalFileOptions,
2553 paths_v: &[CString],
2554 cpaths: &[*const c_char],
2555 ) -> Result<(), Error> {
2556 unsafe {
2557 ffi_try!(ffi::rocksdb_ingest_external_file(
2558 self.inner.inner(),
2559 cpaths.as_ptr(),
2560 paths_v.len(),
2561 opts.inner.cast_const()
2562 ));
2563 Ok(())
2564 }
2565 }
2566
2567 fn ingest_external_file_raw_cf(
2568 &self,
2569 cf: &impl AsColumnFamilyRef,
2570 opts: &IngestExternalFileOptions,
2571 paths_v: &[CString],
2572 cpaths: &[*const c_char],
2573 ) -> Result<(), Error> {
2574 unsafe {
2575 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
2576 self.inner.inner(),
2577 cf.inner(),
2578 cpaths.as_ptr(),
2579 paths_v.len(),
2580 opts.inner.cast_const()
2581 ));
2582 Ok(())
2583 }
2584 }
2585
2586 pub fn get_column_family_metadata(&self) -> ColumnFamilyMetaData {
2588 unsafe {
2589 let ptr = ffi::rocksdb_get_column_family_metadata(self.inner.inner());
2590
2591 let metadata = ColumnFamilyMetaData {
2592 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2593 name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2594 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2595 };
2596
2597 ffi::rocksdb_column_family_metadata_destroy(ptr);
2599
2600 metadata
2602 }
2603 }
2604
2605 pub fn get_column_family_metadata_cf(
2607 &self,
2608 cf: &impl AsColumnFamilyRef,
2609 ) -> ColumnFamilyMetaData {
2610 unsafe {
2611 let ptr = ffi::rocksdb_get_column_family_metadata_cf(self.inner.inner(), cf.inner());
2612
2613 let metadata = ColumnFamilyMetaData {
2614 size: ffi::rocksdb_column_family_metadata_get_size(ptr),
2615 name: from_cstr(ffi::rocksdb_column_family_metadata_get_name(ptr)),
2616 file_count: ffi::rocksdb_column_family_metadata_get_file_count(ptr),
2617 };
2618
2619 ffi::rocksdb_column_family_metadata_destroy(ptr);
2621
2622 metadata
2624 }
2625 }
2626
2627 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
2630 unsafe {
2631 let files = ffi::rocksdb_livefiles(self.inner.inner());
2632 if files.is_null() {
2633 Err(Error::new("Could not get live files".to_owned()))
2634 } else {
2635 let n = ffi::rocksdb_livefiles_count(files);
2636
2637 let mut livefiles = Vec::with_capacity(n as usize);
2638 let mut key_size: usize = 0;
2639
2640 for i in 0..n {
2641 let column_family_name =
2642 from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
2643 let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
2644 let size = ffi::rocksdb_livefiles_size(files, i);
2645 let level = ffi::rocksdb_livefiles_level(files, i);
2646
2647 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
2649 let smallest_key = raw_data(smallest_key, key_size);
2650
2651 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
2653 let largest_key = raw_data(largest_key, key_size);
2654
2655 livefiles.push(LiveFile {
2656 column_family_name,
2657 name,
2658 size,
2659 level,
2660 start_key: smallest_key,
2661 end_key: largest_key,
2662 num_entries: ffi::rocksdb_livefiles_entries(files, i),
2663 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
2664 });
2665 }
2666
2667 ffi::rocksdb_livefiles_destroy(files);
2669
2670 Ok(livefiles)
2672 }
2673 }
2674 }
2675
2676 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2685 let from = from.as_ref();
2686 let to = to.as_ref();
2687 unsafe {
2688 ffi_try!(ffi::rocksdb_delete_file_in_range(
2689 self.inner.inner(),
2690 from.as_ptr() as *const c_char,
2691 from.len() as size_t,
2692 to.as_ptr() as *const c_char,
2693 to.len() as size_t,
2694 ));
2695 Ok(())
2696 }
2697 }
2698
2699 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2701 &self,
2702 cf: &impl AsColumnFamilyRef,
2703 from: K,
2704 to: K,
2705 ) -> Result<(), Error> {
2706 let from = from.as_ref();
2707 let to = to.as_ref();
2708 unsafe {
2709 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2710 self.inner.inner(),
2711 cf.inner(),
2712 from.as_ptr() as *const c_char,
2713 from.len() as size_t,
2714 to.as_ptr() as *const c_char,
2715 to.len() as size_t,
2716 ));
2717 Ok(())
2718 }
2719 }
2720
2721 pub fn cancel_all_background_work(&self, wait: bool) {
2723 unsafe {
2724 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2725 }
2726 }
2727
2728 fn drop_column_family<C>(
2729 &self,
2730 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2731 cf: C,
2732 ) -> Result<(), Error> {
2733 unsafe {
2734 ffi_try!(ffi::rocksdb_drop_column_family(
2736 self.inner.inner(),
2737 cf_inner
2738 ));
2739 }
2740 drop(cf);
2743 Ok(())
2744 }
2745
2746 pub fn increase_full_history_ts_low<S: AsRef<[u8]>>(
2751 &self,
2752 cf: &impl AsColumnFamilyRef,
2753 ts: S,
2754 ) -> Result<(), Error> {
2755 let ts = ts.as_ref();
2756 unsafe {
2757 ffi_try!(ffi::rocksdb_increase_full_history_ts_low(
2758 self.inner.inner(),
2759 cf.inner(),
2760 ts.as_ptr() as *const c_char,
2761 ts.len() as size_t,
2762 ));
2763 Ok(())
2764 }
2765 }
2766
2767 pub fn get_full_history_ts_low(&self, cf: &impl AsColumnFamilyRef) -> Result<Vec<u8>, Error> {
2769 unsafe {
2770 let mut ts_lowlen = 0;
2771 let ts = ffi_try!(ffi::rocksdb_get_full_history_ts_low(
2772 self.inner.inner(),
2773 cf.inner(),
2774 &mut ts_lowlen,
2775 ));
2776
2777 if ts.is_null() {
2778 Err(Error::new("Could not get full_history_ts_low".to_owned()))
2779 } else {
2780 let mut vec = vec![0; ts_lowlen];
2781 ptr::copy_nonoverlapping(ts as *mut u8, vec.as_mut_ptr(), ts_lowlen);
2782 ffi::rocksdb_free(ts as *mut c_void);
2783 Ok(vec)
2784 }
2785 }
2786 }
2787
2788 pub fn get_db_identity(&self) -> Result<Vec<u8>, Error> {
2790 unsafe {
2791 let mut length: usize = 0;
2792 let identity_ptr = ffi::rocksdb_get_db_identity(self.inner.inner(), &mut length);
2793 let identity_vec = raw_data(identity_ptr, length);
2794 ffi::rocksdb_free(identity_ptr as *mut c_void);
2795 identity_vec.ok_or_else(|| Error::new("get_db_identity returned NULL".to_string()))
2798 }
2799 }
2800}
2801
2802impl<I: DBInner> DBCommon<SingleThreaded, I> {
2803 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2805 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2806 self.cfs
2807 .cfs
2808 .insert(name.as_ref().to_string(), ColumnFamily { inner });
2809 Ok(())
2810 }
2811
2812 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2814 match self.cfs.cfs.remove(name) {
2815 Some(cf) => self.drop_column_family(cf.inner, cf),
2816 _ => Err(Error::new(format!("Invalid column family: {name}"))),
2817 }
2818 }
2819
2820 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2822 self.cfs.cfs.get(name)
2823 }
2824
2825 pub fn cf_names(&self) -> Vec<String> {
2827 self.cfs.cfs.keys().cloned().collect()
2828 }
2829}
2830
2831impl<I: DBInner> DBCommon<MultiThreaded, I> {
2832 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2834 let mut cfs = self.cfs.cfs.write();
2837 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2838 cfs.insert(
2839 name.as_ref().to_string(),
2840 Arc::new(UnboundColumnFamily { inner }),
2841 );
2842 Ok(())
2843 }
2844
2845 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2848 match self.cfs.cfs.write().remove(name) {
2849 Some(cf) => self.drop_column_family(cf.inner, cf),
2850 _ => Err(Error::new(format!("Invalid column family: {name}"))),
2851 }
2852 }
2853
2854 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
2856 self.cfs
2857 .cfs
2858 .read()
2859 .get(name)
2860 .cloned()
2861 .map(UnboundColumnFamily::bound_column_family)
2862 }
2863
2864 pub fn cf_names(&self) -> Vec<String> {
2866 self.cfs.cfs.read().keys().cloned().collect()
2867 }
2868}
2869
2870impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2871 fn drop(&mut self) {
2872 self.cfs.drop_all_cfs_internal();
2873 }
2874}
2875
2876impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2877 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2878 write!(f, "RocksDB {{ path: {} }}", self.path().display())
2879 }
2880}
2881
/// Summary of a column family, as returned by `get_column_family_metadata`
/// and `get_column_family_metadata_cf`.
#[derive(Debug, Clone)]
pub struct ColumnFamilyMetaData {
    /// Value reported by `rocksdb_column_family_metadata_get_size`
    /// (presumably bytes — confirm against the RocksDB C API).
    pub size: u64,
    /// Column family name, from `rocksdb_column_family_metadata_get_name`.
    pub name: String,
    /// Value reported by `rocksdb_column_family_metadata_get_file_count`.
    pub file_count: usize,
}
2893
/// One entry of the list returned by `live_files`.
#[derive(Debug, Clone)]
pub struct LiveFile {
    /// From `rocksdb_livefiles_column_family_name`.
    pub column_family_name: String,
    /// File name, from `rocksdb_livefiles_name`.
    pub name: String,
    /// Value reported by `rocksdb_livefiles_size`
    /// (presumably bytes — confirm against the RocksDB C API).
    pub size: usize,
    /// Value reported by `rocksdb_livefiles_level`.
    pub level: i32,
    /// Copy of the key from `rocksdb_livefiles_smallestkey`; `None` when
    /// `raw_data` yields nothing for it.
    pub start_key: Option<Vec<u8>>,
    /// Copy of the key from `rocksdb_livefiles_largestkey`; `None` when
    /// `raw_data` yields nothing for it.
    pub end_key: Option<Vec<u8>>,
    /// Value reported by `rocksdb_livefiles_entries`.
    pub num_entries: u64,
    /// Value reported by `rocksdb_livefiles_deletions`.
    pub num_deletions: u64,
}
2914
2915fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2916 opts.iter()
2917 .map(|(name, value)| {
2918 let cname = match CString::new(name.as_bytes()) {
2919 Ok(cname) => cname,
2920 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2921 };
2922 let cvalue = match CString::new(value.as_bytes()) {
2923 Ok(cvalue) => cvalue,
2924 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2925 };
2926 Ok((cname, cvalue))
2927 })
2928 .collect()
2929}
2930
2931pub(crate) fn convert_values(
2932 values: Vec<*mut c_char>,
2933 values_sizes: Vec<usize>,
2934 errors: Vec<*mut c_char>,
2935) -> Vec<Result<Option<Vec<u8>>, Error>> {
2936 values
2937 .into_iter()
2938 .zip(values_sizes)
2939 .zip(errors)
2940 .map(|((v, s), e)| {
2941 if e.is_null() {
2942 let value = unsafe { crate::ffi_util::raw_data(v, s) };
2943 unsafe {
2944 ffi::rocksdb_free(v as *mut c_void);
2945 }
2946 Ok(value)
2947 } else {
2948 Err(Error::new(crate::ffi_util::error_message(e)))
2949 }
2950 })
2951 .collect()
2952}