1use crate::{
17 column_family::AsColumnFamilyRef,
18 column_family::BoundColumnFamily,
19 column_family::UnboundColumnFamily,
20 db_options::OptionsMustOutliveDB,
21 ffi,
22 ffi_util::{from_cstr, opt_bytes_to_ptr, raw_data, to_cpath, CStrLike},
23 ColumnFamily, ColumnFamilyDescriptor, CompactOptions, DBIteratorWithThreadMode,
24 DBPinnableSlice, DBRawIteratorWithThreadMode, DBWALIterator, Direction, Error, FlushOptions,
25 IngestExternalFileOptions, IteratorMode, Options, ReadOptions, SnapshotWithThreadMode,
26 WriteBatch, WriteOptions, DEFAULT_COLUMN_FAMILY_NAME,
27};
28
29use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
30use std::collections::BTreeMap;
31use std::ffi::{CStr, CString};
32use std::fmt;
33use std::fs;
34use std::iter;
35use std::path::Path;
36use std::path::PathBuf;
37use std::ptr;
38use std::slice;
39use std::str;
40use std::sync::Arc;
41use std::sync::RwLock;
42use std::time::Duration;
43
44pub trait ThreadMode {
55 fn new_cf_map_internal(
57 cf_map: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
58 ) -> Self;
59 fn drop_all_cfs_internal(&mut self);
61}
62
63pub struct SingleThreaded {
70 pub(crate) cfs: BTreeMap<String, ColumnFamily>,
71}
72
73pub struct MultiThreaded {
79 pub(crate) cfs: RwLock<BTreeMap<String, Arc<UnboundColumnFamily>>>,
80}
81
82impl ThreadMode for SingleThreaded {
83 fn new_cf_map_internal(
84 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
85 ) -> Self {
86 Self {
87 cfs: cfs
88 .into_iter()
89 .map(|(n, c)| (n, ColumnFamily { inner: c }))
90 .collect(),
91 }
92 }
93
94 fn drop_all_cfs_internal(&mut self) {
95 self.cfs.clear();
97 }
98}
99
100impl ThreadMode for MultiThreaded {
101 fn new_cf_map_internal(
102 cfs: BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>,
103 ) -> Self {
104 Self {
105 cfs: RwLock::new(
106 cfs.into_iter()
107 .map(|(n, c)| (n, Arc::new(UnboundColumnFamily { inner: c })))
108 .collect(),
109 ),
110 }
111 }
112
113 fn drop_all_cfs_internal(&mut self) {
114 self.cfs.write().unwrap().clear();
116 }
117}
118
119pub trait DBInner {
121 fn inner(&self) -> *mut ffi::rocksdb_t;
122}
123
124pub struct DBCommon<T: ThreadMode, D: DBInner> {
129 pub(crate) inner: D,
130 cfs: T, path: PathBuf,
132 _outlive: Vec<OptionsMustOutliveDB>,
133}
134
135pub trait DBAccess {
138 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t;
139
140 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t);
141
142 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t;
143
144 unsafe fn create_iterator_cf(
145 &self,
146 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
147 readopts: &ReadOptions,
148 ) -> *mut ffi::rocksdb_iterator_t;
149
150 fn get_opt<K: AsRef<[u8]>>(
151 &self,
152 key: K,
153 readopts: &ReadOptions,
154 ) -> Result<Option<Vec<u8>>, Error>;
155
156 fn get_cf_opt<K: AsRef<[u8]>>(
157 &self,
158 cf: &impl AsColumnFamilyRef,
159 key: K,
160 readopts: &ReadOptions,
161 ) -> Result<Option<Vec<u8>>, Error>;
162
163 fn get_pinned_opt<K: AsRef<[u8]>>(
164 &self,
165 key: K,
166 readopts: &ReadOptions,
167 ) -> Result<Option<DBPinnableSlice>, Error>;
168
169 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
170 &self,
171 cf: &impl AsColumnFamilyRef,
172 key: K,
173 readopts: &ReadOptions,
174 ) -> Result<Option<DBPinnableSlice>, Error>;
175
176 fn multi_get_opt<K, I>(
177 &self,
178 keys: I,
179 readopts: &ReadOptions,
180 ) -> Vec<Result<Option<Vec<u8>>, Error>>
181 where
182 K: AsRef<[u8]>,
183 I: IntoIterator<Item = K>;
184
185 fn multi_get_cf_opt<'b, K, I, W>(
186 &self,
187 keys_cf: I,
188 readopts: &ReadOptions,
189 ) -> Vec<Result<Option<Vec<u8>>, Error>>
190 where
191 K: AsRef<[u8]>,
192 I: IntoIterator<Item = (&'b W, K)>,
193 W: AsColumnFamilyRef + 'b;
194}
195
196impl<T: ThreadMode, D: DBInner> DBAccess for DBCommon<T, D> {
197 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
198 ffi::rocksdb_create_snapshot(self.inner.inner())
199 }
200
201 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
202 ffi::rocksdb_release_snapshot(self.inner.inner(), snapshot);
203 }
204
205 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
206 ffi::rocksdb_create_iterator(self.inner.inner(), readopts.inner)
207 }
208
209 unsafe fn create_iterator_cf(
210 &self,
211 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
212 readopts: &ReadOptions,
213 ) -> *mut ffi::rocksdb_iterator_t {
214 ffi::rocksdb_create_iterator_cf(self.inner.inner(), readopts.inner, cf_handle)
215 }
216
217 fn get_opt<K: AsRef<[u8]>>(
218 &self,
219 key: K,
220 readopts: &ReadOptions,
221 ) -> Result<Option<Vec<u8>>, Error> {
222 self.get_opt(key, readopts)
223 }
224
225 fn get_cf_opt<K: AsRef<[u8]>>(
226 &self,
227 cf: &impl AsColumnFamilyRef,
228 key: K,
229 readopts: &ReadOptions,
230 ) -> Result<Option<Vec<u8>>, Error> {
231 self.get_cf_opt(cf, key, readopts)
232 }
233
234 fn get_pinned_opt<K: AsRef<[u8]>>(
235 &self,
236 key: K,
237 readopts: &ReadOptions,
238 ) -> Result<Option<DBPinnableSlice>, Error> {
239 self.get_pinned_opt(key, readopts)
240 }
241
242 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
243 &self,
244 cf: &impl AsColumnFamilyRef,
245 key: K,
246 readopts: &ReadOptions,
247 ) -> Result<Option<DBPinnableSlice>, Error> {
248 self.get_pinned_cf_opt(cf, key, readopts)
249 }
250
251 fn multi_get_opt<K, Iter>(
252 &self,
253 keys: Iter,
254 readopts: &ReadOptions,
255 ) -> Vec<Result<Option<Vec<u8>>, Error>>
256 where
257 K: AsRef<[u8]>,
258 Iter: IntoIterator<Item = K>,
259 {
260 self.multi_get_opt(keys, readopts)
261 }
262
263 fn multi_get_cf_opt<'b, K, Iter, W>(
264 &self,
265 keys_cf: Iter,
266 readopts: &ReadOptions,
267 ) -> Vec<Result<Option<Vec<u8>>, Error>>
268 where
269 K: AsRef<[u8]>,
270 Iter: IntoIterator<Item = (&'b W, K)>,
271 W: AsColumnFamilyRef + 'b,
272 {
273 self.multi_get_cf_opt(keys_cf, readopts)
274 }
275}
276
277pub struct DBWithThreadModeInner {
278 inner: *mut ffi::rocksdb_t,
279}
280
281impl DBInner for DBWithThreadModeInner {
282 fn inner(&self) -> *mut ffi::rocksdb_t {
283 self.inner
284 }
285}
286
287impl Drop for DBWithThreadModeInner {
288 fn drop(&mut self) {
289 unsafe {
290 ffi::rocksdb_close(self.inner);
291 }
292 }
293}
294
295pub type DBWithThreadMode<T> = DBCommon<T, DBWithThreadModeInner>;
300
301#[cfg(not(feature = "multi-threaded-cf"))]
324pub type DB = DBWithThreadMode<SingleThreaded>;
325
326#[cfg(feature = "multi-threaded-cf")]
327pub type DB = DBWithThreadMode<MultiThreaded>;
328
329unsafe impl<T: ThreadMode + Send, I: DBInner> Send for DBCommon<T, I> {}
333
334unsafe impl<T: ThreadMode, I: DBInner> Sync for DBCommon<T, I> {}
337
338enum AccessType<'a> {
340 ReadWrite,
341 ReadOnly { error_if_log_file_exist: bool },
342 Secondary { secondary_path: &'a Path },
343 WithTTL { ttl: Duration },
344}
345
346impl<T: ThreadMode> DBWithThreadMode<T> {
348 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
350 let mut opts = Options::default();
351 opts.create_if_missing(true);
352 Self::open(&opts, path)
353 }
354
355 pub fn open<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Self, Error> {
357 Self::open_cf(opts, path, None::<&str>)
358 }
359
360 pub fn open_for_read_only<P: AsRef<Path>>(
362 opts: &Options,
363 path: P,
364 error_if_log_file_exist: bool,
365 ) -> Result<Self, Error> {
366 Self::open_cf_for_read_only(opts, path, None::<&str>, error_if_log_file_exist)
367 }
368
369 pub fn open_as_secondary<P: AsRef<Path>>(
371 opts: &Options,
372 primary_path: P,
373 secondary_path: P,
374 ) -> Result<Self, Error> {
375 Self::open_cf_as_secondary(opts, primary_path, secondary_path, None::<&str>)
376 }
377
378 pub fn open_with_ttl<P: AsRef<Path>>(
380 opts: &Options,
381 path: P,
382 ttl: Duration,
383 ) -> Result<Self, Error> {
384 Self::open_cf_descriptors_with_ttl(opts, path, std::iter::empty(), ttl)
385 }
386
387 pub fn open_cf_with_ttl<P, I, N>(
391 opts: &Options,
392 path: P,
393 cfs: I,
394 ttl: Duration,
395 ) -> Result<Self, Error>
396 where
397 P: AsRef<Path>,
398 I: IntoIterator<Item = N>,
399 N: AsRef<str>,
400 {
401 let cfs = cfs
402 .into_iter()
403 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
404
405 Self::open_cf_descriptors_with_ttl(opts, path, cfs, ttl)
406 }
407
408 pub fn open_cf_descriptors_with_ttl<P, I>(
411 opts: &Options,
412 path: P,
413 cfs: I,
414 ttl: Duration,
415 ) -> Result<Self, Error>
416 where
417 P: AsRef<Path>,
418 I: IntoIterator<Item = ColumnFamilyDescriptor>,
419 {
420 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::WithTTL { ttl })
421 }
422
423 pub fn open_cf<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
427 where
428 P: AsRef<Path>,
429 I: IntoIterator<Item = N>,
430 N: AsRef<str>,
431 {
432 let cfs = cfs
433 .into_iter()
434 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
435
436 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
437 }
438
439 pub fn open_cf_with_opts<P, I, N>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
443 where
444 P: AsRef<Path>,
445 I: IntoIterator<Item = (N, Options)>,
446 N: AsRef<str>,
447 {
448 let cfs = cfs
449 .into_iter()
450 .map(|(name, opts)| ColumnFamilyDescriptor::new(name.as_ref(), opts));
451
452 Self::open_cf_descriptors(opts, path, cfs)
453 }
454
455 pub fn open_cf_for_read_only<P, I, N>(
457 opts: &Options,
458 path: P,
459 cfs: I,
460 error_if_log_file_exist: bool,
461 ) -> Result<Self, Error>
462 where
463 P: AsRef<Path>,
464 I: IntoIterator<Item = N>,
465 N: AsRef<str>,
466 {
467 let cfs = cfs
468 .into_iter()
469 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
470
471 Self::open_cf_descriptors_internal(
472 opts,
473 path,
474 cfs,
475 &AccessType::ReadOnly {
476 error_if_log_file_exist,
477 },
478 )
479 }
480
481 pub fn open_cf_with_opts_for_read_only<P, I, N>(
483 db_opts: &Options,
484 path: P,
485 cfs: I,
486 error_if_log_file_exist: bool,
487 ) -> Result<Self, Error>
488 where
489 P: AsRef<Path>,
490 I: IntoIterator<Item = (N, Options)>,
491 N: AsRef<str>,
492 {
493 let cfs = cfs
494 .into_iter()
495 .map(|(name, cf_opts)| ColumnFamilyDescriptor::new(name.as_ref(), cf_opts));
496
497 Self::open_cf_descriptors_internal(
498 db_opts,
499 path,
500 cfs,
501 &AccessType::ReadOnly {
502 error_if_log_file_exist,
503 },
504 )
505 }
506
507 pub fn open_cf_descriptors_read_only<P, I>(
510 opts: &Options,
511 path: P,
512 cfs: I,
513 error_if_log_file_exist: bool,
514 ) -> Result<Self, Error>
515 where
516 P: AsRef<Path>,
517 I: IntoIterator<Item = ColumnFamilyDescriptor>,
518 {
519 Self::open_cf_descriptors_internal(
520 opts,
521 path,
522 cfs,
523 &AccessType::ReadOnly {
524 error_if_log_file_exist,
525 },
526 )
527 }
528
529 pub fn open_cf_as_secondary<P, I, N>(
531 opts: &Options,
532 primary_path: P,
533 secondary_path: P,
534 cfs: I,
535 ) -> Result<Self, Error>
536 where
537 P: AsRef<Path>,
538 I: IntoIterator<Item = N>,
539 N: AsRef<str>,
540 {
541 let cfs = cfs
542 .into_iter()
543 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
544
545 Self::open_cf_descriptors_internal(
546 opts,
547 primary_path,
548 cfs,
549 &AccessType::Secondary {
550 secondary_path: secondary_path.as_ref(),
551 },
552 )
553 }
554
555 pub fn open_cf_descriptors_as_secondary<P, I>(
558 opts: &Options,
559 path: P,
560 secondary_path: P,
561 cfs: I,
562 ) -> Result<Self, Error>
563 where
564 P: AsRef<Path>,
565 I: IntoIterator<Item = ColumnFamilyDescriptor>,
566 {
567 Self::open_cf_descriptors_internal(
568 opts,
569 path,
570 cfs,
571 &AccessType::Secondary {
572 secondary_path: secondary_path.as_ref(),
573 },
574 )
575 }
576
577 pub fn open_cf_descriptors<P, I>(opts: &Options, path: P, cfs: I) -> Result<Self, Error>
579 where
580 P: AsRef<Path>,
581 I: IntoIterator<Item = ColumnFamilyDescriptor>,
582 {
583 Self::open_cf_descriptors_internal(opts, path, cfs, &AccessType::ReadWrite)
584 }
585
586 fn open_cf_descriptors_internal<P, I>(
588 opts: &Options,
589 path: P,
590 cfs: I,
591 access_type: &AccessType,
592 ) -> Result<Self, Error>
593 where
594 P: AsRef<Path>,
595 I: IntoIterator<Item = ColumnFamilyDescriptor>,
596 {
597 let cfs: Vec<_> = cfs.into_iter().collect();
598 let outlive = iter::once(opts.outlive.clone())
599 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
600 .collect();
601
602 let cpath = to_cpath(&path)?;
603
604 if let Err(e) = fs::create_dir_all(&path) {
605 return Err(Error::new(format!(
606 "Failed to create RocksDB directory: `{:?}`.",
607 e
608 )));
609 }
610
611 let db: *mut ffi::rocksdb_t;
612 let mut cf_map = BTreeMap::new();
613
614 if cfs.is_empty() {
615 db = Self::open_raw(opts, &cpath, access_type)?;
616 } else {
617 let mut cfs_v = cfs;
618 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
620 cfs_v.push(ColumnFamilyDescriptor {
621 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
622 options: Options::default(),
623 });
624 }
625 let c_cfs: Vec<CString> = cfs_v
628 .iter()
629 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
630 .collect();
631
632 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
633
634 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
636
637 let cfopts: Vec<_> = cfs_v
638 .iter()
639 .map(|cf| cf.options.inner as *const _)
640 .collect();
641
642 db = Self::open_cf_raw(
643 opts,
644 &cpath,
645 &cfs_v,
646 &cfnames,
647 &cfopts,
648 &mut cfhandles,
649 access_type,
650 )?;
651 for handle in &cfhandles {
652 if handle.is_null() {
653 return Err(Error::new(
654 "Received null column family handle from DB.".to_owned(),
655 ));
656 }
657 }
658
659 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
660 cf_map.insert(cf_desc.name.clone(), inner);
661 }
662 }
663
664 if db.is_null() {
665 return Err(Error::new("Could not initialize database.".to_owned()));
666 }
667
668 Ok(Self {
669 inner: DBWithThreadModeInner { inner: db },
670 path: path.as_ref().to_path_buf(),
671 cfs: T::new_cf_map_internal(cf_map),
672 _outlive: outlive,
673 })
674 }
675
676 fn open_raw(
677 opts: &Options,
678 cpath: &CString,
679 access_type: &AccessType,
680 ) -> Result<*mut ffi::rocksdb_t, Error> {
681 let db = unsafe {
682 match *access_type {
683 AccessType::ReadOnly {
684 error_if_log_file_exist,
685 } => ffi_try!(ffi::rocksdb_open_for_read_only(
686 opts.inner,
687 cpath.as_ptr(),
688 c_uchar::from(error_if_log_file_exist),
689 )),
690 AccessType::ReadWrite => {
691 ffi_try!(ffi::rocksdb_open(opts.inner, cpath.as_ptr()))
692 }
693 AccessType::Secondary { secondary_path } => {
694 ffi_try!(ffi::rocksdb_open_as_secondary(
695 opts.inner,
696 cpath.as_ptr(),
697 to_cpath(secondary_path)?.as_ptr(),
698 ))
699 }
700 AccessType::WithTTL { ttl } => ffi_try!(ffi::rocksdb_open_with_ttl(
701 opts.inner,
702 cpath.as_ptr(),
703 ttl.as_secs() as c_int,
704 )),
705 }
706 };
707 Ok(db)
708 }
709
710 #[allow(clippy::pedantic)]
711 fn open_cf_raw(
712 opts: &Options,
713 cpath: &CString,
714 cfs_v: &[ColumnFamilyDescriptor],
715 cfnames: &[*const c_char],
716 cfopts: &[*const ffi::rocksdb_options_t],
717 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
718 access_type: &AccessType,
719 ) -> Result<*mut ffi::rocksdb_t, Error> {
720 let db = unsafe {
721 match *access_type {
722 AccessType::ReadOnly {
723 error_if_log_file_exist,
724 } => ffi_try!(ffi::rocksdb_open_for_read_only_column_families(
725 opts.inner,
726 cpath.as_ptr(),
727 cfs_v.len() as c_int,
728 cfnames.as_ptr(),
729 cfopts.as_ptr(),
730 cfhandles.as_mut_ptr(),
731 c_uchar::from(error_if_log_file_exist),
732 )),
733 AccessType::ReadWrite => ffi_try!(ffi::rocksdb_open_column_families(
734 opts.inner,
735 cpath.as_ptr(),
736 cfs_v.len() as c_int,
737 cfnames.as_ptr(),
738 cfopts.as_ptr(),
739 cfhandles.as_mut_ptr(),
740 )),
741 AccessType::Secondary { secondary_path } => {
742 ffi_try!(ffi::rocksdb_open_as_secondary_column_families(
743 opts.inner,
744 cpath.as_ptr(),
745 to_cpath(secondary_path)?.as_ptr(),
746 cfs_v.len() as c_int,
747 cfnames.as_ptr(),
748 cfopts.as_ptr(),
749 cfhandles.as_mut_ptr(),
750 ))
751 }
752 AccessType::WithTTL { ttl } => {
753 let ttls_v = vec![ttl.as_secs() as c_int; cfs_v.len()];
754 ffi_try!(ffi::rocksdb_open_column_families_with_ttl(
755 opts.inner,
756 cpath.as_ptr(),
757 cfs_v.len() as c_int,
758 cfnames.as_ptr(),
759 cfopts.as_ptr(),
760 cfhandles.as_mut_ptr(),
761 ttls_v.as_ptr(),
762 ))
763 }
764 }
765 };
766 Ok(db)
767 }
768
769 pub fn delete_range_cf_opt<K: AsRef<[u8]>>(
771 &self,
772 cf: &impl AsColumnFamilyRef,
773 from: K,
774 to: K,
775 writeopts: &WriteOptions,
776 ) -> Result<(), Error> {
777 let from = from.as_ref();
778 let to = to.as_ref();
779
780 unsafe {
781 ffi_try!(ffi::rocksdb_delete_range_cf(
782 self.inner.inner(),
783 writeopts.inner,
784 cf.inner(),
785 from.as_ptr() as *const c_char,
786 from.len() as size_t,
787 to.as_ptr() as *const c_char,
788 to.len() as size_t,
789 ));
790 Ok(())
791 }
792 }
793
794 pub fn delete_range_cf<K: AsRef<[u8]>>(
796 &self,
797 cf: &impl AsColumnFamilyRef,
798 from: K,
799 to: K,
800 ) -> Result<(), Error> {
801 self.delete_range_cf_opt(cf, from, to, &WriteOptions::default())
802 }
803
804 pub fn write_opt(&self, batch: WriteBatch, writeopts: &WriteOptions) -> Result<(), Error> {
805 unsafe {
806 ffi_try!(ffi::rocksdb_write(
807 self.inner.inner(),
808 writeopts.inner,
809 batch.inner
810 ));
811 }
812 Ok(())
813 }
814
815 pub fn write(&self, batch: WriteBatch) -> Result<(), Error> {
816 self.write_opt(batch, &WriteOptions::default())
817 }
818
819 pub fn write_without_wal(&self, batch: WriteBatch) -> Result<(), Error> {
820 let mut wo = WriteOptions::new();
821 wo.disable_wal(true);
822 self.write_opt(batch, &wo)
823 }
824}
825
826impl<T: ThreadMode, D: DBInner> DBCommon<T, D> {
828 pub(crate) fn new(inner: D, cfs: T, path: PathBuf, outlive: Vec<OptionsMustOutliveDB>) -> Self {
829 Self {
830 inner,
831 cfs,
832 path,
833 _outlive: outlive,
834 }
835 }
836
837 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
838 let cpath = to_cpath(path)?;
839 let mut length = 0;
840
841 unsafe {
842 let ptr = ffi_try!(ffi::rocksdb_list_column_families(
843 opts.inner,
844 cpath.as_ptr(),
845 &mut length,
846 ));
847
848 let vec = slice::from_raw_parts(ptr, length)
849 .iter()
850 .map(|ptr| CStr::from_ptr(*ptr).to_string_lossy().into_owned())
851 .collect();
852 ffi::rocksdb_list_column_families_destroy(ptr, length);
853 Ok(vec)
854 }
855 }
856
857 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
858 let cpath = to_cpath(path)?;
859 unsafe {
860 ffi_try!(ffi::rocksdb_destroy_db(opts.inner, cpath.as_ptr()));
861 }
862 Ok(())
863 }
864
865 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
866 let cpath = to_cpath(path)?;
867 unsafe {
868 ffi_try!(ffi::rocksdb_repair_db(opts.inner, cpath.as_ptr()));
869 }
870 Ok(())
871 }
872
873 pub fn path(&self) -> &Path {
874 self.path.as_path()
875 }
876
877 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
880 unsafe {
881 ffi_try!(ffi::rocksdb_flush_wal(
882 self.inner.inner(),
883 c_uchar::from(sync)
884 ));
885 }
886 Ok(())
887 }
888
889 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
891 unsafe {
892 ffi_try!(ffi::rocksdb_flush(self.inner.inner(), flushopts.inner));
893 }
894 Ok(())
895 }
896
897 pub fn flush(&self) -> Result<(), Error> {
899 self.flush_opt(&FlushOptions::default())
900 }
901
902 pub fn flush_cf_opt(
904 &self,
905 cf: &impl AsColumnFamilyRef,
906 flushopts: &FlushOptions,
907 ) -> Result<(), Error> {
908 unsafe {
909 ffi_try!(ffi::rocksdb_flush_cf(
910 self.inner.inner(),
911 flushopts.inner,
912 cf.inner()
913 ));
914 }
915 Ok(())
916 }
917
918 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
921 self.flush_cf_opt(cf, &FlushOptions::default())
922 }
923
924 pub fn get_opt<K: AsRef<[u8]>>(
928 &self,
929 key: K,
930 readopts: &ReadOptions,
931 ) -> Result<Option<Vec<u8>>, Error> {
932 self.get_pinned_opt(key, readopts)
933 .map(|x| x.map(|v| v.as_ref().to_vec()))
934 }
935
936 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
940 self.get_opt(key.as_ref(), &ReadOptions::default())
941 }
942
943 pub fn get_cf_opt<K: AsRef<[u8]>>(
947 &self,
948 cf: &impl AsColumnFamilyRef,
949 key: K,
950 readopts: &ReadOptions,
951 ) -> Result<Option<Vec<u8>>, Error> {
952 self.get_pinned_cf_opt(cf, key, readopts)
953 .map(|x| x.map(|v| v.as_ref().to_vec()))
954 }
955
956 pub fn get_cf<K: AsRef<[u8]>>(
960 &self,
961 cf: &impl AsColumnFamilyRef,
962 key: K,
963 ) -> Result<Option<Vec<u8>>, Error> {
964 self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
965 }
966
967 pub fn get_pinned_opt<K: AsRef<[u8]>>(
970 &self,
971 key: K,
972 readopts: &ReadOptions,
973 ) -> Result<Option<DBPinnableSlice>, Error> {
974 if readopts.inner.is_null() {
975 return Err(Error::new(
976 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
977 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
978 .to_owned(),
979 ));
980 }
981
982 let key = key.as_ref();
983 unsafe {
984 let val = ffi_try!(ffi::rocksdb_get_pinned(
985 self.inner.inner(),
986 readopts.inner,
987 key.as_ptr() as *const c_char,
988 key.len() as size_t,
989 ));
990 if val.is_null() {
991 Ok(None)
992 } else {
993 Ok(Some(DBPinnableSlice::from_c(val)))
994 }
995 }
996 }
997
998 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
1002 self.get_pinned_opt(key, &ReadOptions::default())
1003 }
1004
1005 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
1009 &self,
1010 cf: &impl AsColumnFamilyRef,
1011 key: K,
1012 readopts: &ReadOptions,
1013 ) -> Result<Option<DBPinnableSlice>, Error> {
1014 if readopts.inner.is_null() {
1015 return Err(Error::new(
1016 "Unable to create RocksDB read options. This is a fairly trivial call, and its \
1017 failure may be indicative of a mis-compiled or mis-loaded RocksDB library."
1018 .to_owned(),
1019 ));
1020 }
1021
1022 let key = key.as_ref();
1023 unsafe {
1024 let val = ffi_try!(ffi::rocksdb_get_pinned_cf(
1025 self.inner.inner(),
1026 readopts.inner,
1027 cf.inner(),
1028 key.as_ptr() as *const c_char,
1029 key.len() as size_t,
1030 ));
1031 if val.is_null() {
1032 Ok(None)
1033 } else {
1034 Ok(Some(DBPinnableSlice::from_c(val)))
1035 }
1036 }
1037 }
1038
1039 pub fn get_pinned_cf<K: AsRef<[u8]>>(
1043 &self,
1044 cf: &impl AsColumnFamilyRef,
1045 key: K,
1046 ) -> Result<Option<DBPinnableSlice>, Error> {
1047 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
1048 }
1049
1050 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
1052 where
1053 K: AsRef<[u8]>,
1054 I: IntoIterator<Item = K>,
1055 {
1056 self.multi_get_opt(keys, &ReadOptions::default())
1057 }
1058
1059 pub fn multi_get_opt<K, I>(
1061 &self,
1062 keys: I,
1063 readopts: &ReadOptions,
1064 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1065 where
1066 K: AsRef<[u8]>,
1067 I: IntoIterator<Item = K>,
1068 {
1069 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1070 .into_iter()
1071 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1072 .unzip();
1073 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1074
1075 let mut values = vec![ptr::null_mut(); keys.len()];
1076 let mut values_sizes = vec![0_usize; keys.len()];
1077 let mut errors = vec![ptr::null_mut(); keys.len()];
1078 unsafe {
1079 ffi::rocksdb_multi_get(
1080 self.inner.inner(),
1081 readopts.inner,
1082 ptr_keys.len(),
1083 ptr_keys.as_ptr(),
1084 keys_sizes.as_ptr(),
1085 values.as_mut_ptr(),
1086 values_sizes.as_mut_ptr(),
1087 errors.as_mut_ptr(),
1088 );
1089 }
1090
1091 convert_values(values, values_sizes, errors)
1092 }
1093
1094 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
1096 &'a self,
1097 keys: I,
1098 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1099 where
1100 K: AsRef<[u8]>,
1101 I: IntoIterator<Item = (&'b W, K)>,
1102 W: 'b + AsColumnFamilyRef,
1103 {
1104 self.multi_get_cf_opt(keys, &ReadOptions::default())
1105 }
1106
1107 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
1109 &'a self,
1110 keys: I,
1111 readopts: &ReadOptions,
1112 ) -> Vec<Result<Option<Vec<u8>>, Error>>
1113 where
1114 K: AsRef<[u8]>,
1115 I: IntoIterator<Item = (&'b W, K)>,
1116 W: 'b + AsColumnFamilyRef,
1117 {
1118 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
1119 .into_iter()
1120 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
1121 .unzip();
1122 let ptr_keys: Vec<_> = cfs_and_keys
1123 .iter()
1124 .map(|(_, k)| k.as_ptr() as *const c_char)
1125 .collect();
1126 let ptr_cfs: Vec<_> = cfs_and_keys
1127 .iter()
1128 .map(|(c, _)| c.inner() as *const _)
1129 .collect();
1130
1131 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
1132 let mut values_sizes = vec![0_usize; ptr_keys.len()];
1133 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1134 unsafe {
1135 ffi::rocksdb_multi_get_cf(
1136 self.inner.inner(),
1137 readopts.inner,
1138 ptr_cfs.as_ptr(),
1139 ptr_keys.len(),
1140 ptr_keys.as_ptr(),
1141 keys_sizes.as_ptr(),
1142 values.as_mut_ptr(),
1143 values_sizes.as_mut_ptr(),
1144 errors.as_mut_ptr(),
1145 );
1146 }
1147
1148 convert_values(values, values_sizes, errors)
1149 }
1150
1151 pub fn batched_multi_get_cf<K, I>(
1155 &self,
1156 cf: &impl AsColumnFamilyRef,
1157 keys: I,
1158 sorted_input: bool,
1159 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1160 where
1161 K: AsRef<[u8]>,
1162 I: IntoIterator<Item = K>,
1163 {
1164 self.batched_multi_get_cf_opt(cf, keys, sorted_input, &ReadOptions::default())
1165 }
1166
1167 pub fn batched_multi_get_cf_opt<K, I>(
1171 &self,
1172 cf: &impl AsColumnFamilyRef,
1173 keys: I,
1174 sorted_input: bool,
1175 readopts: &ReadOptions,
1176 ) -> Vec<Result<Option<DBPinnableSlice>, Error>>
1177 where
1178 K: AsRef<[u8]>,
1179 I: IntoIterator<Item = K>,
1180 {
1181 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
1182 .into_iter()
1183 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
1184 .unzip();
1185 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
1186
1187 let mut pinned_values = vec![ptr::null_mut(); ptr_keys.len()];
1188 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
1189
1190 unsafe {
1191 ffi::rocksdb_batched_multi_get_cf(
1192 self.inner.inner(),
1193 readopts.inner,
1194 cf.inner(),
1195 ptr_keys.len(),
1196 ptr_keys.as_ptr(),
1197 keys_sizes.as_ptr(),
1198 pinned_values.as_mut_ptr(),
1199 errors.as_mut_ptr(),
1200 sorted_input,
1201 );
1202 pinned_values
1203 .into_iter()
1204 .zip(errors.into_iter())
1205 .map(|(v, e)| {
1206 if e.is_null() {
1207 if v.is_null() {
1208 Ok(None)
1209 } else {
1210 Ok(Some(DBPinnableSlice::from_c(v)))
1211 }
1212 } else {
1213 Err(Error::new(crate::ffi_util::error_message(e)))
1214 }
1215 })
1216 .collect()
1217 }
1218 }
1219
1220 pub fn key_may_exist<K: AsRef<[u8]>>(&self, key: K) -> bool {
1223 self.key_may_exist_opt(key, &ReadOptions::default())
1224 }
1225
1226 pub fn key_may_exist_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> bool {
1229 let key = key.as_ref();
1230 unsafe {
1231 0 != ffi::rocksdb_key_may_exist(
1232 self.inner.inner(),
1233 readopts.inner,
1234 key.as_ptr() as *const c_char,
1235 key.len() as size_t,
1236 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1242 }
1243 }
1244
1245 pub fn key_may_exist_cf<K: AsRef<[u8]>>(&self, cf: &impl AsColumnFamilyRef, key: K) -> bool {
1248 self.key_may_exist_cf_opt(cf, key, &ReadOptions::default())
1249 }
1250
1251 pub fn key_may_exist_cf_opt<K: AsRef<[u8]>>(
1254 &self,
1255 cf: &impl AsColumnFamilyRef,
1256 key: K,
1257 readopts: &ReadOptions,
1258 ) -> bool {
1259 let key = key.as_ref();
1260 0 != unsafe {
1261 ffi::rocksdb_key_may_exist_cf(
1262 self.inner.inner(),
1263 readopts.inner,
1264 cf.inner(),
1265 key.as_ptr() as *const c_char,
1266 key.len() as size_t,
1267 ptr::null_mut(), ptr::null_mut(), ptr::null(), 0, ptr::null_mut(), )
1273 }
1274 }
1275
1276 fn create_inner_cf_handle(
1277 &self,
1278 name: impl CStrLike,
1279 opts: &Options,
1280 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
1281 let cf_name = name.bake().map_err(|err| {
1282 Error::new(format!(
1283 "Failed to convert path to CString when creating cf: {err}"
1284 ))
1285 })?;
1286 Ok(unsafe {
1287 ffi_try!(ffi::rocksdb_create_column_family(
1288 self.inner.inner(),
1289 opts.inner,
1290 cf_name.as_ptr(),
1291 ))
1292 })
1293 }
1294
1295 pub fn iterator<'a: 'b, 'b>(
1296 &'a self,
1297 mode: IteratorMode,
1298 ) -> DBIteratorWithThreadMode<'b, Self> {
1299 let readopts = ReadOptions::default();
1300 self.iterator_opt(mode, readopts)
1301 }
1302
1303 pub fn iterator_opt<'a: 'b, 'b>(
1304 &'a self,
1305 mode: IteratorMode,
1306 readopts: ReadOptions,
1307 ) -> DBIteratorWithThreadMode<'b, Self> {
1308 DBIteratorWithThreadMode::new(self, readopts, mode)
1309 }
1310
1311 pub fn iterator_cf_opt<'a: 'b, 'b>(
1314 &'a self,
1315 cf_handle: &impl AsColumnFamilyRef,
1316 readopts: ReadOptions,
1317 mode: IteratorMode,
1318 ) -> DBIteratorWithThreadMode<'b, Self> {
1319 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
1320 }
1321
1322 pub fn full_iterator<'a: 'b, 'b>(
1326 &'a self,
1327 mode: IteratorMode,
1328 ) -> DBIteratorWithThreadMode<'b, Self> {
1329 let mut opts = ReadOptions::default();
1330 opts.set_total_order_seek(true);
1331 DBIteratorWithThreadMode::new(self, opts, mode)
1332 }
1333
1334 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
1335 &'a self,
1336 prefix: P,
1337 ) -> DBIteratorWithThreadMode<'b, Self> {
1338 let mut opts = ReadOptions::default();
1339 opts.set_prefix_same_as_start(true);
1340 DBIteratorWithThreadMode::new(
1341 self,
1342 opts,
1343 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1344 )
1345 }
1346
1347 pub fn iterator_cf<'a: 'b, 'b>(
1348 &'a self,
1349 cf_handle: &impl AsColumnFamilyRef,
1350 mode: IteratorMode,
1351 ) -> DBIteratorWithThreadMode<'b, Self> {
1352 let opts = ReadOptions::default();
1353 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1354 }
1355
1356 pub fn full_iterator_cf<'a: 'b, 'b>(
1357 &'a self,
1358 cf_handle: &impl AsColumnFamilyRef,
1359 mode: IteratorMode,
1360 ) -> DBIteratorWithThreadMode<'b, Self> {
1361 let mut opts = ReadOptions::default();
1362 opts.set_total_order_seek(true);
1363 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1364 }
1365
1366 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1367 &'a self,
1368 cf_handle: &impl AsColumnFamilyRef,
1369 prefix: P,
1370 ) -> DBIteratorWithThreadMode<'a, Self> {
1371 let mut opts = ReadOptions::default();
1372 opts.set_prefix_same_as_start(true);
1373 DBIteratorWithThreadMode::<'a, Self>::new_cf(
1374 self,
1375 cf_handle.inner(),
1376 opts,
1377 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1378 )
1379 }
1380
1381 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1383 let opts = ReadOptions::default();
1384 DBRawIteratorWithThreadMode::new(self, opts)
1385 }
1386
1387 pub fn raw_iterator_cf<'a: 'b, 'b>(
1389 &'a self,
1390 cf_handle: &impl AsColumnFamilyRef,
1391 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1392 let opts = ReadOptions::default();
1393 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1394 }
1395
1396 pub fn raw_iterator_opt<'a: 'b, 'b>(
1398 &'a self,
1399 readopts: ReadOptions,
1400 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1401 DBRawIteratorWithThreadMode::new(self, readopts)
1402 }
1403
1404 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1406 &'a self,
1407 cf_handle: &impl AsColumnFamilyRef,
1408 readopts: ReadOptions,
1409 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1410 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1411 }
1412
1413 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
1414 SnapshotWithThreadMode::<Self>::new(self)
1415 }
1416
1417 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1418 where
1419 K: AsRef<[u8]>,
1420 V: AsRef<[u8]>,
1421 {
1422 let key = key.as_ref();
1423 let value = value.as_ref();
1424
1425 unsafe {
1426 ffi_try!(ffi::rocksdb_put(
1427 self.inner.inner(),
1428 writeopts.inner,
1429 key.as_ptr() as *const c_char,
1430 key.len() as size_t,
1431 value.as_ptr() as *const c_char,
1432 value.len() as size_t,
1433 ));
1434 Ok(())
1435 }
1436 }
1437
1438 pub fn put_cf_opt<K, V>(
1439 &self,
1440 cf: &impl AsColumnFamilyRef,
1441 key: K,
1442 value: V,
1443 writeopts: &WriteOptions,
1444 ) -> Result<(), Error>
1445 where
1446 K: AsRef<[u8]>,
1447 V: AsRef<[u8]>,
1448 {
1449 let key = key.as_ref();
1450 let value = value.as_ref();
1451
1452 unsafe {
1453 ffi_try!(ffi::rocksdb_put_cf(
1454 self.inner.inner(),
1455 writeopts.inner,
1456 cf.inner(),
1457 key.as_ptr() as *const c_char,
1458 key.len() as size_t,
1459 value.as_ptr() as *const c_char,
1460 value.len() as size_t,
1461 ));
1462 Ok(())
1463 }
1464 }
1465
1466 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
1467 where
1468 K: AsRef<[u8]>,
1469 V: AsRef<[u8]>,
1470 {
1471 let key = key.as_ref();
1472 let value = value.as_ref();
1473
1474 unsafe {
1475 ffi_try!(ffi::rocksdb_merge(
1476 self.inner.inner(),
1477 writeopts.inner,
1478 key.as_ptr() as *const c_char,
1479 key.len() as size_t,
1480 value.as_ptr() as *const c_char,
1481 value.len() as size_t,
1482 ));
1483 Ok(())
1484 }
1485 }
1486
1487 pub fn merge_cf_opt<K, V>(
1488 &self,
1489 cf: &impl AsColumnFamilyRef,
1490 key: K,
1491 value: V,
1492 writeopts: &WriteOptions,
1493 ) -> Result<(), Error>
1494 where
1495 K: AsRef<[u8]>,
1496 V: AsRef<[u8]>,
1497 {
1498 let key = key.as_ref();
1499 let value = value.as_ref();
1500
1501 unsafe {
1502 ffi_try!(ffi::rocksdb_merge_cf(
1503 self.inner.inner(),
1504 writeopts.inner,
1505 cf.inner(),
1506 key.as_ptr() as *const c_char,
1507 key.len() as size_t,
1508 value.as_ptr() as *const c_char,
1509 value.len() as size_t,
1510 ));
1511 Ok(())
1512 }
1513 }
1514
1515 pub fn delete_opt<K: AsRef<[u8]>>(
1516 &self,
1517 key: K,
1518 writeopts: &WriteOptions,
1519 ) -> Result<(), Error> {
1520 let key = key.as_ref();
1521
1522 unsafe {
1523 ffi_try!(ffi::rocksdb_delete(
1524 self.inner.inner(),
1525 writeopts.inner,
1526 key.as_ptr() as *const c_char,
1527 key.len() as size_t,
1528 ));
1529 Ok(())
1530 }
1531 }
1532
1533 pub fn delete_cf_opt<K: AsRef<[u8]>>(
1534 &self,
1535 cf: &impl AsColumnFamilyRef,
1536 key: K,
1537 writeopts: &WriteOptions,
1538 ) -> Result<(), Error> {
1539 let key = key.as_ref();
1540
1541 unsafe {
1542 ffi_try!(ffi::rocksdb_delete_cf(
1543 self.inner.inner(),
1544 writeopts.inner,
1545 cf.inner(),
1546 key.as_ptr() as *const c_char,
1547 key.len() as size_t,
1548 ));
1549 Ok(())
1550 }
1551 }
1552
1553 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
1554 where
1555 K: AsRef<[u8]>,
1556 V: AsRef<[u8]>,
1557 {
1558 self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1559 }
1560
1561 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1562 where
1563 K: AsRef<[u8]>,
1564 V: AsRef<[u8]>,
1565 {
1566 self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1567 }
1568
1569 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
1570 where
1571 K: AsRef<[u8]>,
1572 V: AsRef<[u8]>,
1573 {
1574 self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
1575 }
1576
1577 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
1578 where
1579 K: AsRef<[u8]>,
1580 V: AsRef<[u8]>,
1581 {
1582 self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
1583 }
1584
1585 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
1586 self.delete_opt(key.as_ref(), &WriteOptions::default())
1587 }
1588
1589 pub fn delete_cf<K: AsRef<[u8]>>(
1590 &self,
1591 cf: &impl AsColumnFamilyRef,
1592 key: K,
1593 ) -> Result<(), Error> {
1594 self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
1595 }
1596
1597 pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
1599 unsafe {
1600 let start = start.as_ref().map(AsRef::as_ref);
1601 let end = end.as_ref().map(AsRef::as_ref);
1602
1603 ffi::rocksdb_compact_range(
1604 self.inner.inner(),
1605 opt_bytes_to_ptr(start),
1606 start.map_or(0, <[u8]>::len) as size_t,
1607 opt_bytes_to_ptr(end),
1608 end.map_or(0, <[u8]>::len) as size_t,
1609 );
1610 }
1611 }
1612
1613 pub fn compact_range_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1615 &self,
1616 start: Option<S>,
1617 end: Option<E>,
1618 opts: &CompactOptions,
1619 ) {
1620 unsafe {
1621 let start = start.as_ref().map(AsRef::as_ref);
1622 let end = end.as_ref().map(AsRef::as_ref);
1623
1624 ffi::rocksdb_compact_range_opt(
1625 self.inner.inner(),
1626 opts.inner,
1627 opt_bytes_to_ptr(start),
1628 start.map_or(0, <[u8]>::len) as size_t,
1629 opt_bytes_to_ptr(end),
1630 end.map_or(0, <[u8]>::len) as size_t,
1631 );
1632 }
1633 }
1634
1635 pub fn compact_range_cf<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1638 &self,
1639 cf: &impl AsColumnFamilyRef,
1640 start: Option<S>,
1641 end: Option<E>,
1642 ) {
1643 unsafe {
1644 let start = start.as_ref().map(AsRef::as_ref);
1645 let end = end.as_ref().map(AsRef::as_ref);
1646
1647 ffi::rocksdb_compact_range_cf(
1648 self.inner.inner(),
1649 cf.inner(),
1650 opt_bytes_to_ptr(start),
1651 start.map_or(0, <[u8]>::len) as size_t,
1652 opt_bytes_to_ptr(end),
1653 end.map_or(0, <[u8]>::len) as size_t,
1654 );
1655 }
1656 }
1657
1658 pub fn compact_range_cf_opt<S: AsRef<[u8]>, E: AsRef<[u8]>>(
1660 &self,
1661 cf: &impl AsColumnFamilyRef,
1662 start: Option<S>,
1663 end: Option<E>,
1664 opts: &CompactOptions,
1665 ) {
1666 unsafe {
1667 let start = start.as_ref().map(AsRef::as_ref);
1668 let end = end.as_ref().map(AsRef::as_ref);
1669
1670 ffi::rocksdb_compact_range_cf_opt(
1671 self.inner.inner(),
1672 cf.inner(),
1673 opts.inner,
1674 opt_bytes_to_ptr(start),
1675 start.map_or(0, <[u8]>::len) as size_t,
1676 opt_bytes_to_ptr(end),
1677 end.map_or(0, <[u8]>::len) as size_t,
1678 );
1679 }
1680 }
1681
1682 pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), Error> {
1683 let copts = convert_options(opts)?;
1684 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1685 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1686 let count = opts.len() as i32;
1687 unsafe {
1688 ffi_try!(ffi::rocksdb_set_options(
1689 self.inner.inner(),
1690 count,
1691 cnames.as_ptr(),
1692 cvalues.as_ptr(),
1693 ));
1694 }
1695 Ok(())
1696 }
1697
1698 pub fn set_options_cf(
1699 &self,
1700 cf: &impl AsColumnFamilyRef,
1701 opts: &[(&str, &str)],
1702 ) -> Result<(), Error> {
1703 let copts = convert_options(opts)?;
1704 let cnames: Vec<*const c_char> = copts.iter().map(|opt| opt.0.as_ptr()).collect();
1705 let cvalues: Vec<*const c_char> = copts.iter().map(|opt| opt.1.as_ptr()).collect();
1706 let count = opts.len() as i32;
1707 unsafe {
1708 ffi_try!(ffi::rocksdb_set_options_cf(
1709 self.inner.inner(),
1710 cf.inner(),
1711 count,
1712 cnames.as_ptr(),
1713 cvalues.as_ptr(),
1714 ));
1715 }
1716 Ok(())
1717 }
1718
1719 fn property_value_impl<R>(
1728 name: impl CStrLike,
1729 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1730 parse: impl FnOnce(&str) -> Result<R, Error>,
1731 ) -> Result<Option<R>, Error> {
1732 let value = match name.bake() {
1733 Ok(prop_name) => get_property(prop_name.as_ptr()),
1734 Err(e) => {
1735 return Err(Error::new(format!(
1736 "Failed to convert property name to CString: {}",
1737 e
1738 )));
1739 }
1740 };
1741 if value.is_null() {
1742 return Ok(None);
1743 }
1744 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1745 Ok(s) => parse(s).map(|value| Some(value)),
1746 Err(e) => Err(Error::new(format!(
1747 "Failed to convert property value to string: {}",
1748 e
1749 ))),
1750 };
1751 unsafe {
1752 libc::free(value as *mut c_void);
1753 }
1754 result
1755 }
1756
1757 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1762 Self::property_value_impl(
1763 name,
1764 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1765 |str_value| Ok(str_value.to_owned()),
1766 )
1767 }
1768
1769 pub fn property_value_cf(
1774 &self,
1775 cf: &impl AsColumnFamilyRef,
1776 name: impl CStrLike,
1777 ) -> Result<Option<String>, Error> {
1778 Self::property_value_impl(
1779 name,
1780 |prop_name| unsafe {
1781 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1782 },
1783 |str_value| Ok(str_value.to_owned()),
1784 )
1785 }
1786
1787 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1788 value.parse::<u64>().map_err(|err| {
1789 Error::new(format!(
1790 "Failed to convert property value {} to int: {}",
1791 value, err
1792 ))
1793 })
1794 }
1795
1796 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1801 Self::property_value_impl(
1802 name,
1803 |prop_name| unsafe { ffi::rocksdb_property_value(self.inner.inner(), prop_name) },
1804 Self::parse_property_int_value,
1805 )
1806 }
1807
1808 pub fn property_int_value_cf(
1813 &self,
1814 cf: &impl AsColumnFamilyRef,
1815 name: impl CStrLike,
1816 ) -> Result<Option<u64>, Error> {
1817 Self::property_value_impl(
1818 name,
1819 |prop_name| unsafe {
1820 ffi::rocksdb_property_value_cf(self.inner.inner(), cf.inner(), prop_name)
1821 },
1822 Self::parse_property_int_value,
1823 )
1824 }
1825
1826 pub fn latest_sequence_number(&self) -> u64 {
1828 unsafe { ffi::rocksdb_get_latest_sequence_number(self.inner.inner()) }
1829 }
1830
1831 pub fn get_updates_since(&self, seq_number: u64) -> Result<DBWALIterator, Error> {
1842 unsafe {
1843 let opts: *const ffi::rocksdb_wal_readoptions_t = ptr::null();
1847 let iter = ffi_try!(ffi::rocksdb_get_updates_since(
1848 self.inner.inner(),
1849 seq_number,
1850 opts
1851 ));
1852 Ok(DBWALIterator { inner: iter })
1853 }
1854 }
1855
1856 pub fn try_catch_up_with_primary(&self) -> Result<(), Error> {
1859 unsafe {
1860 ffi_try!(ffi::rocksdb_try_catch_up_with_primary(self.inner.inner()));
1861 }
1862 Ok(())
1863 }
1864
1865 pub fn ingest_external_file<P: AsRef<Path>>(&self, paths: Vec<P>) -> Result<(), Error> {
1867 let opts = IngestExternalFileOptions::default();
1868 self.ingest_external_file_opts(&opts, paths)
1869 }
1870
1871 pub fn ingest_external_file_opts<P: AsRef<Path>>(
1873 &self,
1874 opts: &IngestExternalFileOptions,
1875 paths: Vec<P>,
1876 ) -> Result<(), Error> {
1877 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1878 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1879
1880 self.ingest_external_file_raw(opts, &paths_v, &cpaths)
1881 }
1882
1883 pub fn ingest_external_file_cf<P: AsRef<Path>>(
1886 &self,
1887 cf: &impl AsColumnFamilyRef,
1888 paths: Vec<P>,
1889 ) -> Result<(), Error> {
1890 let opts = IngestExternalFileOptions::default();
1891 self.ingest_external_file_cf_opts(cf, &opts, paths)
1892 }
1893
1894 pub fn ingest_external_file_cf_opts<P: AsRef<Path>>(
1896 &self,
1897 cf: &impl AsColumnFamilyRef,
1898 opts: &IngestExternalFileOptions,
1899 paths: Vec<P>,
1900 ) -> Result<(), Error> {
1901 let paths_v: Vec<CString> = paths.iter().map(to_cpath).collect::<Result<Vec<_>, _>>()?;
1902 let cpaths: Vec<_> = paths_v.iter().map(|path| path.as_ptr()).collect();
1903
1904 self.ingest_external_file_raw_cf(cf, opts, &paths_v, &cpaths)
1905 }
1906
1907 fn ingest_external_file_raw(
1908 &self,
1909 opts: &IngestExternalFileOptions,
1910 paths_v: &[CString],
1911 cpaths: &[*const c_char],
1912 ) -> Result<(), Error> {
1913 unsafe {
1914 ffi_try!(ffi::rocksdb_ingest_external_file(
1915 self.inner.inner(),
1916 cpaths.as_ptr(),
1917 paths_v.len(),
1918 opts.inner as *const _
1919 ));
1920 Ok(())
1921 }
1922 }
1923
1924 fn ingest_external_file_raw_cf(
1925 &self,
1926 cf: &impl AsColumnFamilyRef,
1927 opts: &IngestExternalFileOptions,
1928 paths_v: &[CString],
1929 cpaths: &[*const c_char],
1930 ) -> Result<(), Error> {
1931 unsafe {
1932 ffi_try!(ffi::rocksdb_ingest_external_file_cf(
1933 self.inner.inner(),
1934 cf.inner(),
1935 cpaths.as_ptr(),
1936 paths_v.len(),
1937 opts.inner as *const _
1938 ));
1939 Ok(())
1940 }
1941 }
1942
1943 pub fn live_files(&self) -> Result<Vec<LiveFile>, Error> {
1946 unsafe {
1947 let files = ffi::rocksdb_livefiles(self.inner.inner());
1948 if files.is_null() {
1949 Err(Error::new("Could not get live files".to_owned()))
1950 } else {
1951 let n = ffi::rocksdb_livefiles_count(files);
1952
1953 let mut livefiles = Vec::with_capacity(n as usize);
1954 let mut key_size: usize = 0;
1955
1956 for i in 0..n {
1957 let column_family_name =
1958 from_cstr(ffi::rocksdb_livefiles_column_family_name(files, i));
1959 let name = from_cstr(ffi::rocksdb_livefiles_name(files, i));
1960 let size = ffi::rocksdb_livefiles_size(files, i);
1961 let level = ffi::rocksdb_livefiles_level(files, i);
1962
1963 let smallest_key = ffi::rocksdb_livefiles_smallestkey(files, i, &mut key_size);
1965 let smallest_key = raw_data(smallest_key, key_size);
1966
1967 let largest_key = ffi::rocksdb_livefiles_largestkey(files, i, &mut key_size);
1969 let largest_key = raw_data(largest_key, key_size);
1970
1971 livefiles.push(LiveFile {
1972 column_family_name,
1973 name,
1974 size,
1975 level,
1976 start_key: smallest_key,
1977 end_key: largest_key,
1978 num_entries: ffi::rocksdb_livefiles_entries(files, i),
1979 num_deletions: ffi::rocksdb_livefiles_deletions(files, i),
1980 });
1981 }
1982
1983 ffi::rocksdb_livefiles_destroy(files);
1985
1986 Ok(livefiles)
1988 }
1989 }
1990 }
1991
1992 pub fn delete_file_in_range<K: AsRef<[u8]>>(&self, from: K, to: K) -> Result<(), Error> {
2001 let from = from.as_ref();
2002 let to = to.as_ref();
2003 unsafe {
2004 ffi_try!(ffi::rocksdb_delete_file_in_range(
2005 self.inner.inner(),
2006 from.as_ptr() as *const c_char,
2007 from.len() as size_t,
2008 to.as_ptr() as *const c_char,
2009 to.len() as size_t,
2010 ));
2011 Ok(())
2012 }
2013 }
2014
2015 pub fn delete_file_in_range_cf<K: AsRef<[u8]>>(
2017 &self,
2018 cf: &impl AsColumnFamilyRef,
2019 from: K,
2020 to: K,
2021 ) -> Result<(), Error> {
2022 let from = from.as_ref();
2023 let to = to.as_ref();
2024 unsafe {
2025 ffi_try!(ffi::rocksdb_delete_file_in_range_cf(
2026 self.inner.inner(),
2027 cf.inner(),
2028 from.as_ptr() as *const c_char,
2029 from.len() as size_t,
2030 to.as_ptr() as *const c_char,
2031 to.len() as size_t,
2032 ));
2033 Ok(())
2034 }
2035 }
2036
2037 pub fn cancel_all_background_work(&self, wait: bool) {
2039 unsafe {
2040 ffi::rocksdb_cancel_all_background_work(self.inner.inner(), c_uchar::from(wait));
2041 }
2042 }
2043
2044 fn drop_column_family<C>(
2045 &self,
2046 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
2047 cf: C,
2048 ) -> Result<(), Error> {
2049 unsafe {
2050 ffi_try!(ffi::rocksdb_drop_column_family(
2052 self.inner.inner(),
2053 cf_inner
2054 ));
2055 }
2056 drop(cf);
2059 Ok(())
2060 }
2061}
2062
2063impl<I: DBInner> DBCommon<SingleThreaded, I> {
2064 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
2066 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2067 self.cfs
2068 .cfs
2069 .insert(name.as_ref().to_string(), ColumnFamily { inner });
2070 Ok(())
2071 }
2072
2073 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
2075 if let Some(cf) = self.cfs.cfs.remove(name) {
2076 self.drop_column_family(cf.inner, cf)
2077 } else {
2078 Err(Error::new(format!("Invalid column family: {name}")))
2079 }
2080 }
2081
2082 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
2084 self.cfs.cfs.get(name)
2085 }
2086}
2087
2088impl<I: DBInner> DBCommon<MultiThreaded, I> {
2089 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
2091 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
2092 self.cfs.cfs.write().unwrap().insert(
2093 name.as_ref().to_string(),
2094 Arc::new(UnboundColumnFamily { inner }),
2095 );
2096 Ok(())
2097 }
2098
2099 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
2102 if let Some(cf) = self.cfs.cfs.write().unwrap().remove(name) {
2103 self.drop_column_family(cf.inner, cf)
2104 } else {
2105 Err(Error::new(format!("Invalid column family: {name}")))
2106 }
2107 }
2108
2109 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
2111 self.cfs
2112 .cfs
2113 .read()
2114 .unwrap()
2115 .get(name)
2116 .cloned()
2117 .map(UnboundColumnFamily::bound_column_family)
2118 }
2119}
2120
2121impl<T: ThreadMode, I: DBInner> Drop for DBCommon<T, I> {
2122 fn drop(&mut self) {
2123 self.cfs.drop_all_cfs_internal();
2124 }
2125}
2126
2127impl<T: ThreadMode, I: DBInner> fmt::Debug for DBCommon<T, I> {
2128 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2129 write!(f, "RocksDB {{ path: {:?} }}", self.path())
2130 }
2131}
2132
2133#[derive(Debug, Clone)]
2135pub struct LiveFile {
2136 pub column_family_name: String,
2138 pub name: String,
2140 pub size: usize,
2142 pub level: i32,
2144 pub start_key: Option<Vec<u8>>,
2146 pub end_key: Option<Vec<u8>>,
2148 pub num_entries: u64,
2150 pub num_deletions: u64,
2152}
2153
2154fn convert_options(opts: &[(&str, &str)]) -> Result<Vec<(CString, CString)>, Error> {
2155 opts.iter()
2156 .map(|(name, value)| {
2157 let cname = match CString::new(name.as_bytes()) {
2158 Ok(cname) => cname,
2159 Err(e) => return Err(Error::new(format!("Invalid option name `{e}`"))),
2160 };
2161 let cvalue = match CString::new(value.as_bytes()) {
2162 Ok(cvalue) => cvalue,
2163 Err(e) => return Err(Error::new(format!("Invalid option value: `{e}`"))),
2164 };
2165 Ok((cname, cvalue))
2166 })
2167 .collect()
2168}
2169
2170pub(crate) fn convert_values(
2171 values: Vec<*mut c_char>,
2172 values_sizes: Vec<usize>,
2173 errors: Vec<*mut c_char>,
2174) -> Vec<Result<Option<Vec<u8>>, Error>> {
2175 values
2176 .into_iter()
2177 .zip(values_sizes.into_iter())
2178 .zip(errors.into_iter())
2179 .map(|((v, s), e)| {
2180 if e.is_null() {
2181 let value = unsafe { crate::ffi_util::raw_data(v, s) };
2182 unsafe {
2183 ffi::rocksdb_free(v as *mut c_void);
2184 }
2185 Ok(value)
2186 } else {
2187 Err(Error::new(crate::ffi_util::error_message(e)))
2188 }
2189 })
2190 .collect()
2191}