1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, DB,
32 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
33 DEFAULT_COLUMN_FAMILY_NAME, Direction, Error, FlushOptions, IteratorMode, MultiThreaded,
34 Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode, ThreadMode, Transaction,
35 TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction, WriteOptions,
36 column_family::UnboundColumnFamily,
37 db::{DBAccess, convert_values},
38 db_options::OptionsMustOutliveDB,
39 ffi,
40 ffi_util::to_cpath,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_uchar, c_void, size_t};
44
45thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
52thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
53thread_local! { static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default(); }
54
55#[cfg(not(feature = "multi-threaded-cf"))]
56type DefaultThreadMode = crate::SingleThreaded;
57#[cfg(feature = "multi-threaded-cf")]
58type DefaultThreadMode = crate::MultiThreaded;
59
60pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
101 pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
102 cfs: T,
103 path: PathBuf,
104 prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
106 _outlive: Vec<OptionsMustOutliveDB>,
107}
108
109unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
110unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
111
112impl<T: ThreadMode> DBAccess for TransactionDB<T> {
113 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
114 unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
115 }
116
117 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
118 unsafe {
119 ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
120 }
121 }
122
123 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
124 unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
125 }
126
127 unsafe fn create_iterator_cf(
128 &self,
129 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
130 readopts: &ReadOptions,
131 ) -> *mut ffi::rocksdb_iterator_t {
132 unsafe {
133 ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
134 }
135 }
136
137 fn get_opt<K: AsRef<[u8]>>(
138 &self,
139 key: K,
140 readopts: &ReadOptions,
141 ) -> Result<Option<Vec<u8>>, Error> {
142 self.get_opt(key, readopts)
143 }
144
145 fn get_cf_opt<K: AsRef<[u8]>>(
146 &self,
147 cf: &impl AsColumnFamilyRef,
148 key: K,
149 readopts: &ReadOptions,
150 ) -> Result<Option<Vec<u8>>, Error> {
151 self.get_cf_opt(cf, key, readopts)
152 }
153
154 fn get_pinned_opt<K: AsRef<[u8]>>(
155 &'_ self,
156 key: K,
157 readopts: &ReadOptions,
158 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
159 self.get_pinned_opt(key, readopts)
160 }
161
162 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
163 &'_ self,
164 cf: &impl AsColumnFamilyRef,
165 key: K,
166 readopts: &ReadOptions,
167 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
168 self.get_pinned_cf_opt(cf, key, readopts)
169 }
170
171 fn multi_get_opt<K, I>(
172 &self,
173 keys: I,
174 readopts: &ReadOptions,
175 ) -> Vec<Result<Option<Vec<u8>>, Error>>
176 where
177 K: AsRef<[u8]>,
178 I: IntoIterator<Item = K>,
179 {
180 self.multi_get_opt(keys, readopts)
181 }
182
183 fn multi_get_cf_opt<'b, K, I, W>(
184 &self,
185 keys_cf: I,
186 readopts: &ReadOptions,
187 ) -> Vec<Result<Option<Vec<u8>>, Error>>
188 where
189 K: AsRef<[u8]>,
190 I: IntoIterator<Item = (&'b W, K)>,
191 W: AsColumnFamilyRef + 'b,
192 {
193 self.multi_get_cf_opt(keys_cf, readopts)
194 }
195}
196
197impl<T: ThreadMode> TransactionDB<T> {
198 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
200 let mut opts = Options::default();
201 opts.create_if_missing(true);
202 let txn_db_opts = TransactionDBOptions::default();
203 Self::open(&opts, &txn_db_opts, path)
204 }
205
206 pub fn open<P: AsRef<Path>>(
208 opts: &Options,
209 txn_db_opts: &TransactionDBOptions,
210 path: P,
211 ) -> Result<Self, Error> {
212 Self::open_cf(opts, txn_db_opts, path, None::<&str>)
213 }
214
215 pub fn open_cf<P, I, N>(
219 opts: &Options,
220 txn_db_opts: &TransactionDBOptions,
221 path: P,
222 cfs: I,
223 ) -> Result<Self, Error>
224 where
225 P: AsRef<Path>,
226 I: IntoIterator<Item = N>,
227 N: AsRef<str>,
228 {
229 let cfs = cfs
230 .into_iter()
231 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
232
233 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
234 }
235
236 pub fn open_cf_descriptors<P, I>(
238 opts: &Options,
239 txn_db_opts: &TransactionDBOptions,
240 path: P,
241 cfs: I,
242 ) -> Result<Self, Error>
243 where
244 P: AsRef<Path>,
245 I: IntoIterator<Item = ColumnFamilyDescriptor>,
246 {
247 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
248 }
249
250 fn open_cf_descriptors_internal<P, I>(
252 opts: &Options,
253 txn_db_opts: &TransactionDBOptions,
254 path: P,
255 cfs: I,
256 ) -> Result<Self, Error>
257 where
258 P: AsRef<Path>,
259 I: IntoIterator<Item = ColumnFamilyDescriptor>,
260 {
261 let cfs: Vec<_> = cfs.into_iter().collect();
262 let outlive = iter::once(opts.outlive.clone())
263 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
264 .collect();
265
266 let cpath = to_cpath(&path)?;
267
268 if let Err(e) = fs::create_dir_all(&path) {
269 return Err(Error::new(format!(
270 "Failed to create RocksDB directory: `{e:?}`."
271 )));
272 }
273
274 let db: *mut ffi::rocksdb_transactiondb_t;
275 let mut cf_map = BTreeMap::new();
276
277 if cfs.is_empty() {
278 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
279 } else {
280 let mut cfs_v = cfs;
281 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
283 cfs_v.push(ColumnFamilyDescriptor {
284 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
285 options: Options::default(),
286 ttl: ColumnFamilyTtl::SameAsDb, });
288 }
289 let c_cfs: Vec<CString> = cfs_v
292 .iter()
293 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
294 .collect();
295
296 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
297
298 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
300
301 let cfopts: Vec<_> = cfs_v
302 .iter()
303 .map(|cf| cf.options.inner.cast_const())
304 .collect();
305
306 db = Self::open_cf_raw(
307 opts,
308 txn_db_opts,
309 &cpath,
310 &cfs_v,
311 &cfnames,
312 &cfopts,
313 &mut cfhandles,
314 )?;
315
316 for handle in &cfhandles {
317 if handle.is_null() {
318 return Err(Error::new(
319 "Received null column family handle from DB.".to_owned(),
320 ));
321 }
322 }
323
324 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
325 cf_map.insert(cf_desc.name.clone(), inner);
326 }
327 }
328
329 if db.is_null() {
330 return Err(Error::new("Could not initialize database.".to_owned()));
331 }
332
333 let prepared = unsafe {
334 let mut cnt = 0;
335 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &raw mut cnt);
336 let mut vec = vec![std::ptr::null_mut(); cnt];
337 if !ptr.is_null() {
338 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
339 ffi::rocksdb_free(ptr as *mut c_void);
340 }
341 vec
342 };
343
344 Ok(TransactionDB {
345 inner: db,
346 cfs: T::new_cf_map_internal(cf_map),
347 path: path.as_ref().to_path_buf(),
348 prepared: Mutex::new(prepared),
349 _outlive: outlive,
350 })
351 }
352
353 fn open_raw(
354 opts: &Options,
355 txn_db_opts: &TransactionDBOptions,
356 cpath: &CString,
357 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
358 unsafe {
359 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
360 opts.inner,
361 txn_db_opts.inner,
362 cpath.as_ptr()
363 ));
364 Ok(db)
365 }
366 }
367
368 fn open_cf_raw(
369 opts: &Options,
370 txn_db_opts: &TransactionDBOptions,
371 cpath: &CString,
372 cfs_v: &[ColumnFamilyDescriptor],
373 cfnames: &[*const c_char],
374 cfopts: &[*const ffi::rocksdb_options_t],
375 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
376 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
377 unsafe {
378 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
379 opts.inner,
380 txn_db_opts.inner,
381 cpath.as_ptr(),
382 cfs_v.len() as c_int,
383 cfnames.as_ptr(),
384 cfopts.as_ptr(),
385 cfhandles.as_mut_ptr(),
386 ));
387 Ok(db)
388 }
389 }
390
391 fn create_inner_cf_handle(
392 &self,
393 name: &str,
394 opts: &Options,
395 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
396 let cf_name = CString::new(name.as_bytes()).map_err(|_| {
397 Error::new("Failed to convert path to CString when creating cf".to_owned())
398 })?;
399
400 Ok(unsafe {
401 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
402 self.inner,
403 opts.inner,
404 cf_name.as_ptr(),
405 ))
406 })
407 }
408
409 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
410 DB::list_cf(opts, path)
411 }
412
413 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
414 DB::destroy(opts, path)
415 }
416
417 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
418 DB::repair(opts, path)
419 }
420
421 pub fn path(&self) -> &Path {
422 self.path.as_path()
423 }
424
425 pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
428 unsafe {
429 ffi_try!(ffi::rocksdb_transactiondb_flush_wal(
430 self.inner,
431 c_uchar::from(sync)
432 ));
433 }
434 Ok(())
435 }
436
437 pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
439 unsafe {
440 ffi_try!(ffi::rocksdb_transactiondb_flush(
441 self.inner,
442 flushopts.inner
443 ));
444 }
445 Ok(())
446 }
447
448 pub fn flush(&self) -> Result<(), Error> {
450 DEFAULT_FLUSH_OPTS.with(|opts| self.flush_opt(opts))
451 }
452
453 pub fn flush_cf_opt(
455 &self,
456 cf: &impl AsColumnFamilyRef,
457 flushopts: &FlushOptions,
458 ) -> Result<(), Error> {
459 unsafe {
460 ffi_try!(ffi::rocksdb_transactiondb_flush_cf(
461 self.inner,
462 flushopts.inner,
463 cf.inner()
464 ));
465 }
466 Ok(())
467 }
468
469 pub fn flush_cfs_opt(
475 &self,
476 cfs: &[&impl AsColumnFamilyRef],
477 opts: &FlushOptions,
478 ) -> Result<(), Error> {
479 let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
480 unsafe {
481 ffi_try!(ffi::rocksdb_transactiondb_flush_cfs(
482 self.inner,
483 opts.inner,
484 cfs.as_mut_ptr(),
485 cfs.len() as c_int,
486 ));
487 }
488 Ok(())
489 }
490
491 pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
494 DEFAULT_FLUSH_OPTS.with(|opts| self.flush_cf_opt(cf, opts))
495 }
496
497 pub fn transaction(&'_ self) -> Transaction<'_, Self> {
499 DEFAULT_WRITE_OPTS.with(|opts| self.transaction_opt(opts, &TransactionOptions::default()))
500 }
501
502 pub fn transaction_opt<'a>(
504 &'a self,
505 write_opts: &WriteOptions,
506 txn_opts: &TransactionOptions,
507 ) -> Transaction<'a, Self> {
508 Transaction {
509 inner: unsafe {
510 ffi::rocksdb_transaction_begin(
511 self.inner,
512 write_opts.inner,
513 txn_opts.inner,
514 std::ptr::null_mut(),
515 )
516 },
517 _marker: PhantomData,
518 }
519 }
520
521 pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
526 self.prepared
527 .lock()
528 .unwrap()
529 .drain(0..)
530 .map(|inner| Transaction {
531 inner,
532 _marker: PhantomData,
533 })
534 .collect()
535 }
536
537 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
539 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
540 }
541
542 pub fn get_cf<K: AsRef<[u8]>>(
544 &self,
545 cf: &impl AsColumnFamilyRef,
546 key: K,
547 ) -> Result<Option<Vec<u8>>, Error> {
548 self.get_pinned_cf(cf, key)
549 .map(|x| x.map(|v| v.as_ref().to_vec()))
550 }
551
552 pub fn get_opt<K: AsRef<[u8]>>(
554 &self,
555 key: K,
556 readopts: &ReadOptions,
557 ) -> Result<Option<Vec<u8>>, Error> {
558 self.get_pinned_opt(key, readopts)
559 .map(|x| x.map(|v| v.as_ref().to_vec()))
560 }
561
562 pub fn get_cf_opt<K: AsRef<[u8]>>(
564 &self,
565 cf: &impl AsColumnFamilyRef,
566 key: K,
567 readopts: &ReadOptions,
568 ) -> Result<Option<Vec<u8>>, Error> {
569 self.get_pinned_cf_opt(cf, key, readopts)
570 .map(|x| x.map(|v| v.as_ref().to_vec()))
571 }
572
573 pub fn get_pinned<K: AsRef<[u8]>>(
574 &'_ self,
575 key: K,
576 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
577 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
578 }
579
580 pub fn get_pinned_cf<K: AsRef<[u8]>>(
582 &'_ self,
583 cf: &impl AsColumnFamilyRef,
584 key: K,
585 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
586 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
587 }
588
589 pub fn get_pinned_opt<K: AsRef<[u8]>>(
591 &'_ self,
592 key: K,
593 readopts: &ReadOptions,
594 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
595 let key = key.as_ref();
596 unsafe {
597 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
598 self.inner,
599 readopts.inner,
600 key.as_ptr() as *const c_char,
601 key.len() as size_t,
602 ));
603 if val.is_null() {
604 Ok(None)
605 } else {
606 Ok(Some(DBPinnableSlice::from_c(val)))
607 }
608 }
609 }
610
611 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
613 &'_ self,
614 cf: &impl AsColumnFamilyRef,
615 key: K,
616 readopts: &ReadOptions,
617 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
618 let key = key.as_ref();
619 unsafe {
620 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
621 self.inner,
622 readopts.inner,
623 cf.inner(),
624 key.as_ptr() as *const c_char,
625 key.len() as size_t,
626 ));
627 if val.is_null() {
628 Ok(None)
629 } else {
630 Ok(Some(DBPinnableSlice::from_c(val)))
631 }
632 }
633 }
634
635 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
637 where
638 K: AsRef<[u8]>,
639 I: IntoIterator<Item = K>,
640 {
641 DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
642 }
643
644 pub fn multi_get_opt<K, I>(
646 &self,
647 keys: I,
648 readopts: &ReadOptions,
649 ) -> Vec<Result<Option<Vec<u8>>, Error>>
650 where
651 K: AsRef<[u8]>,
652 I: IntoIterator<Item = K>,
653 {
654 let owned_keys: Vec<K> = keys.into_iter().collect();
655 let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
656 let ptr_keys: Vec<*const c_char> = owned_keys
657 .iter()
658 .map(|k| k.as_ref().as_ptr() as *const c_char)
659 .collect();
660
661 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
662 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
663 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
664 unsafe {
665 ffi::rocksdb_transactiondb_multi_get(
666 self.inner,
667 readopts.inner,
668 ptr_keys.len(),
669 ptr_keys.as_ptr(),
670 keys_sizes.as_ptr(),
671 values.as_mut_ptr(),
672 values_sizes.as_mut_ptr(),
673 errors.as_mut_ptr(),
674 );
675 }
676
677 unsafe {
678 values.set_len(ptr_keys.len());
679 values_sizes.set_len(ptr_keys.len());
680 errors.set_len(ptr_keys.len());
681 }
682
683 convert_values(values, values_sizes, errors)
684 }
685
686 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
688 &'a self,
689 keys: I,
690 ) -> Vec<Result<Option<Vec<u8>>, Error>>
691 where
692 K: AsRef<[u8]>,
693 I: IntoIterator<Item = (&'b W, K)>,
694 W: 'b + AsColumnFamilyRef,
695 {
696 DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
697 }
698
699 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
701 &'a self,
702 keys: I,
703 readopts: &ReadOptions,
704 ) -> Vec<Result<Option<Vec<u8>>, Error>>
705 where
706 K: AsRef<[u8]>,
707 I: IntoIterator<Item = (&'b W, K)>,
708 W: 'b + AsColumnFamilyRef,
709 {
710 let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
711 let keys_sizes: Vec<usize> = cfs_and_owned_keys
712 .iter()
713 .map(|(_, k)| k.as_ref().len())
714 .collect();
715 let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
716 .iter()
717 .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
718 .collect();
719 let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
720 .iter()
721 .map(|(c, _)| c.inner().cast_const())
722 .collect();
723 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
724 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
725 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
726 unsafe {
727 ffi::rocksdb_transactiondb_multi_get_cf(
728 self.inner,
729 readopts.inner,
730 ptr_cfs.as_ptr(),
731 ptr_keys.len(),
732 ptr_keys.as_ptr(),
733 keys_sizes.as_ptr(),
734 values.as_mut_ptr(),
735 values_sizes.as_mut_ptr(),
736 errors.as_mut_ptr(),
737 );
738 }
739
740 unsafe {
741 values.set_len(ptr_keys.len());
742 values_sizes.set_len(ptr_keys.len());
743 errors.set_len(ptr_keys.len());
744 }
745
746 convert_values(values, values_sizes, errors)
747 }
748
749 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
750 where
751 K: AsRef<[u8]>,
752 V: AsRef<[u8]>,
753 {
754 DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
755 }
756
757 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
758 where
759 K: AsRef<[u8]>,
760 V: AsRef<[u8]>,
761 {
762 DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
763 }
764
765 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
766 where
767 K: AsRef<[u8]>,
768 V: AsRef<[u8]>,
769 {
770 let key = key.as_ref();
771 let value = value.as_ref();
772 unsafe {
773 ffi_try!(ffi::rocksdb_transactiondb_put(
774 self.inner,
775 writeopts.inner,
776 key.as_ptr() as *const c_char,
777 key.len() as size_t,
778 value.as_ptr() as *const c_char,
779 value.len() as size_t
780 ));
781 }
782 Ok(())
783 }
784
785 pub fn put_cf_opt<K, V>(
786 &self,
787 cf: &impl AsColumnFamilyRef,
788 key: K,
789 value: V,
790 writeopts: &WriteOptions,
791 ) -> Result<(), Error>
792 where
793 K: AsRef<[u8]>,
794 V: AsRef<[u8]>,
795 {
796 let key = key.as_ref();
797 let value = value.as_ref();
798 unsafe {
799 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
800 self.inner,
801 writeopts.inner,
802 cf.inner(),
803 key.as_ptr() as *const c_char,
804 key.len() as size_t,
805 value.as_ptr() as *const c_char,
806 value.len() as size_t
807 ));
808 }
809 Ok(())
810 }
811
812 pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
813 DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
814 }
815
816 pub fn write_opt(
817 &self,
818 batch: &WriteBatchWithTransaction<true>,
819 writeopts: &WriteOptions,
820 ) -> Result<(), Error> {
821 unsafe {
822 ffi_try!(ffi::rocksdb_transactiondb_write(
823 self.inner,
824 writeopts.inner,
825 batch.inner
826 ));
827 }
828 Ok(())
829 }
830
831 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
832 where
833 K: AsRef<[u8]>,
834 V: AsRef<[u8]>,
835 {
836 DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
837 }
838
839 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
840 where
841 K: AsRef<[u8]>,
842 V: AsRef<[u8]>,
843 {
844 DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
845 }
846
847 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
848 where
849 K: AsRef<[u8]>,
850 V: AsRef<[u8]>,
851 {
852 let key = key.as_ref();
853 let value = value.as_ref();
854 unsafe {
855 ffi_try!(ffi::rocksdb_transactiondb_merge(
856 self.inner,
857 writeopts.inner,
858 key.as_ptr() as *const c_char,
859 key.len() as size_t,
860 value.as_ptr() as *const c_char,
861 value.len() as size_t,
862 ));
863 Ok(())
864 }
865 }
866
867 pub fn merge_cf_opt<K, V>(
868 &self,
869 cf: &impl AsColumnFamilyRef,
870 key: K,
871 value: V,
872 writeopts: &WriteOptions,
873 ) -> Result<(), Error>
874 where
875 K: AsRef<[u8]>,
876 V: AsRef<[u8]>,
877 {
878 let key = key.as_ref();
879 let value = value.as_ref();
880 unsafe {
881 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
882 self.inner,
883 writeopts.inner,
884 cf.inner(),
885 key.as_ptr() as *const c_char,
886 key.len() as size_t,
887 value.as_ptr() as *const c_char,
888 value.len() as size_t,
889 ));
890 Ok(())
891 }
892 }
893
894 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
895 DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
896 }
897
898 pub fn delete_cf<K: AsRef<[u8]>>(
899 &self,
900 cf: &impl AsColumnFamilyRef,
901 key: K,
902 ) -> Result<(), Error> {
903 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
904 }
905
906 pub fn delete_opt<K: AsRef<[u8]>>(
907 &self,
908 key: K,
909 writeopts: &WriteOptions,
910 ) -> Result<(), Error> {
911 let key = key.as_ref();
912 unsafe {
913 ffi_try!(ffi::rocksdb_transactiondb_delete(
914 self.inner,
915 writeopts.inner,
916 key.as_ptr() as *const c_char,
917 key.len() as size_t,
918 ));
919 }
920 Ok(())
921 }
922
923 pub fn delete_cf_opt<K: AsRef<[u8]>>(
924 &self,
925 cf: &impl AsColumnFamilyRef,
926 key: K,
927 writeopts: &WriteOptions,
928 ) -> Result<(), Error> {
929 let key = key.as_ref();
930 unsafe {
931 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
932 self.inner,
933 writeopts.inner,
934 cf.inner(),
935 key.as_ptr() as *const c_char,
936 key.len() as size_t,
937 ));
938 }
939 Ok(())
940 }
941
942 pub fn iterator<'a: 'b, 'b>(
943 &'a self,
944 mode: IteratorMode,
945 ) -> DBIteratorWithThreadMode<'b, Self> {
946 let readopts = ReadOptions::default();
947 self.iterator_opt(mode, readopts)
948 }
949
950 pub fn iterator_opt<'a: 'b, 'b>(
951 &'a self,
952 mode: IteratorMode,
953 readopts: ReadOptions,
954 ) -> DBIteratorWithThreadMode<'b, Self> {
955 DBIteratorWithThreadMode::new(self, readopts, mode)
956 }
957
958 pub fn iterator_cf_opt<'a: 'b, 'b>(
961 &'a self,
962 cf_handle: &impl AsColumnFamilyRef,
963 readopts: ReadOptions,
964 mode: IteratorMode,
965 ) -> DBIteratorWithThreadMode<'b, Self> {
966 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
967 }
968
969 pub fn full_iterator<'a: 'b, 'b>(
973 &'a self,
974 mode: IteratorMode,
975 ) -> DBIteratorWithThreadMode<'b, Self> {
976 let mut opts = ReadOptions::default();
977 opts.set_total_order_seek(true);
978 DBIteratorWithThreadMode::new(self, opts, mode)
979 }
980
981 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
982 &'a self,
983 prefix: P,
984 ) -> DBIteratorWithThreadMode<'b, Self> {
985 let mut opts = ReadOptions::default();
986 opts.set_prefix_same_as_start(true);
987 DBIteratorWithThreadMode::new(
988 self,
989 opts,
990 IteratorMode::From(prefix.as_ref(), Direction::Forward),
991 )
992 }
993
994 pub fn iterator_cf<'a: 'b, 'b>(
995 &'a self,
996 cf_handle: &impl AsColumnFamilyRef,
997 mode: IteratorMode,
998 ) -> DBIteratorWithThreadMode<'b, Self> {
999 let opts = ReadOptions::default();
1000 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1001 }
1002
1003 pub fn full_iterator_cf<'a: 'b, 'b>(
1004 &'a self,
1005 cf_handle: &impl AsColumnFamilyRef,
1006 mode: IteratorMode,
1007 ) -> DBIteratorWithThreadMode<'b, Self> {
1008 let mut opts = ReadOptions::default();
1009 opts.set_total_order_seek(true);
1010 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
1011 }
1012
1013 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
1014 &'a self,
1015 cf_handle: &impl AsColumnFamilyRef,
1016 prefix: P,
1017 ) -> DBIteratorWithThreadMode<'a, Self> {
1018 let mut opts = ReadOptions::default();
1019 opts.set_prefix_same_as_start(true);
1020 DBIteratorWithThreadMode::<'a, Self>::new_cf(
1021 self,
1022 cf_handle.inner(),
1023 opts,
1024 IteratorMode::From(prefix.as_ref(), Direction::Forward),
1025 )
1026 }
1027
1028 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
1030 let opts = ReadOptions::default();
1031 DBRawIteratorWithThreadMode::new(self, opts)
1032 }
1033
1034 pub fn raw_iterator_cf<'a: 'b, 'b>(
1036 &'a self,
1037 cf_handle: &impl AsColumnFamilyRef,
1038 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1039 let opts = ReadOptions::default();
1040 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
1041 }
1042
1043 pub fn raw_iterator_opt<'a: 'b, 'b>(
1045 &'a self,
1046 readopts: ReadOptions,
1047 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1048 DBRawIteratorWithThreadMode::new(self, readopts)
1049 }
1050
1051 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
1053 &'a self,
1054 cf_handle: &impl AsColumnFamilyRef,
1055 readopts: ReadOptions,
1056 ) -> DBRawIteratorWithThreadMode<'b, Self> {
1057 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
1058 }
1059
1060 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
1061 SnapshotWithThreadMode::<Self>::new(self)
1062 }
1063
1064 fn drop_column_family<C>(
1065 &self,
1066 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
1067 _cf: C,
1068 ) -> Result<(), Error> {
1069 unsafe {
1070 ffi_try!(ffi::rocksdb_drop_column_family(
1072 self.inner as *mut ffi::rocksdb_t,
1073 cf_inner
1074 ));
1075 }
1076 Ok(())
1079 }
1080}
1081
1082impl TransactionDB<SingleThreaded> {
1083 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
1085 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1086 self.cfs
1087 .cfs
1088 .insert(name.as_ref().to_string(), ColumnFamily { inner });
1089 Ok(())
1090 }
1091
1092 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
1094 self.cfs.cfs.get(name)
1095 }
1096
1097 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
1099 match self.cfs.cfs.remove(name) {
1100 Some(cf) => self.drop_column_family(cf.inner, cf),
1101 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1102 }
1103 }
1104}
1105
1106impl TransactionDB<MultiThreaded> {
1107 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1109 let mut cfs = self.cfs.cfs.write();
1112 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1113 cfs.insert(
1114 name.as_ref().to_string(),
1115 Arc::new(UnboundColumnFamily { inner }),
1116 );
1117 Ok(())
1118 }
1119
1120 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1122 self.cfs
1123 .cfs
1124 .read()
1125 .get(name)
1126 .cloned()
1127 .map(UnboundColumnFamily::bound_column_family)
1128 }
1129
1130 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1133 match self.cfs.cfs.write().remove(name) {
1134 Some(cf) => self.drop_column_family(cf.inner, cf),
1135 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1136 }
1137 }
1138
1139 fn property_value_impl<R>(
1148 name: impl CStrLike,
1149 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1150 parse: impl FnOnce(&str) -> Result<R, Error>,
1151 ) -> Result<Option<R>, Error> {
1152 let value = match name.bake() {
1153 Ok(prop_name) => get_property(prop_name.as_ptr()),
1154 Err(e) => {
1155 return Err(Error::new(format!(
1156 "Failed to convert property name to CString: {e}"
1157 )));
1158 }
1159 };
1160 if value.is_null() {
1161 return Ok(None);
1162 }
1163 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1164 Ok(s) => parse(s).map(|value| Some(value)),
1165 Err(e) => Err(Error::new(format!(
1166 "Failed to convert property value to string: {e}"
1167 ))),
1168 };
1169 unsafe {
1170 ffi::rocksdb_free(value as *mut c_void);
1171 }
1172 result
1173 }
1174
1175 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1180 Self::property_value_impl(
1181 name,
1182 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1183 |str_value| Ok(str_value.to_owned()),
1184 )
1185 }
1186
1187 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1188 value.parse::<u64>().map_err(|err| {
1189 Error::new(format!(
1190 "Failed to convert property value {value} to int: {err}"
1191 ))
1192 })
1193 }
1194
1195 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1200 Self::property_value_impl(
1201 name,
1202 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1203 Self::parse_property_int_value,
1204 )
1205 }
1206}
1207
1208impl<T: ThreadMode> Drop for TransactionDB<T> {
1209 fn drop(&mut self) {
1210 unsafe {
1211 self.prepared_transactions().clear();
1212 self.cfs.drop_all_cfs_internal();
1213 ffi::rocksdb_transactiondb_close(self.inner);
1214 }
1215 }
1216}