1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31 column_family::UnboundColumnFamily,
32 db::{convert_values, DBAccess},
33 db_options::OptionsMustOutliveDB,
34 ffi,
35 ffi_util::to_cpath,
36 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38 IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39 ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40 WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
45#[cfg(not(feature = "multi-threaded-cf"))]
46type DefaultThreadMode = crate::SingleThreaded;
47#[cfg(feature = "multi-threaded-cf")]
48type DefaultThreadMode = crate::MultiThreaded;
49
50pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
80 pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
81 cfs: T,
82 path: PathBuf,
83 prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
85 _outlive: Vec<OptionsMustOutliveDB>,
86}
87
88unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
89unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
90
91impl<T: ThreadMode> DBAccess for TransactionDB<T> {
92 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
93 unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
94 }
95
96 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
97 unsafe {
98 ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
99 }
100 }
101
102 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
103 unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
104 }
105
106 unsafe fn create_iterator_cf(
107 &self,
108 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
109 readopts: &ReadOptions,
110 ) -> *mut ffi::rocksdb_iterator_t {
111 unsafe {
112 ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
113 }
114 }
115
116 fn get_opt<K: AsRef<[u8]>>(
117 &self,
118 key: K,
119 readopts: &ReadOptions,
120 ) -> Result<Option<Vec<u8>>, Error> {
121 self.get_opt(key, readopts)
122 }
123
124 fn get_cf_opt<K: AsRef<[u8]>>(
125 &self,
126 cf: &impl AsColumnFamilyRef,
127 key: K,
128 readopts: &ReadOptions,
129 ) -> Result<Option<Vec<u8>>, Error> {
130 self.get_cf_opt(cf, key, readopts)
131 }
132
133 fn get_pinned_opt<K: AsRef<[u8]>>(
134 &'_ self,
135 key: K,
136 readopts: &ReadOptions,
137 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
138 self.get_pinned_opt(key, readopts)
139 }
140
141 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
142 &'_ self,
143 cf: &impl AsColumnFamilyRef,
144 key: K,
145 readopts: &ReadOptions,
146 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
147 self.get_pinned_cf_opt(cf, key, readopts)
148 }
149
150 fn multi_get_opt<K, I>(
151 &self,
152 keys: I,
153 readopts: &ReadOptions,
154 ) -> Vec<Result<Option<Vec<u8>>, Error>>
155 where
156 K: AsRef<[u8]>,
157 I: IntoIterator<Item = K>,
158 {
159 self.multi_get_opt(keys, readopts)
160 }
161
162 fn multi_get_cf_opt<'b, K, I, W>(
163 &self,
164 keys_cf: I,
165 readopts: &ReadOptions,
166 ) -> Vec<Result<Option<Vec<u8>>, Error>>
167 where
168 K: AsRef<[u8]>,
169 I: IntoIterator<Item = (&'b W, K)>,
170 W: AsColumnFamilyRef + 'b,
171 {
172 self.multi_get_cf_opt(keys_cf, readopts)
173 }
174}
175
176impl<T: ThreadMode> TransactionDB<T> {
177 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
179 let mut opts = Options::default();
180 opts.create_if_missing(true);
181 let txn_db_opts = TransactionDBOptions::default();
182 Self::open(&opts, &txn_db_opts, path)
183 }
184
185 pub fn open<P: AsRef<Path>>(
187 opts: &Options,
188 txn_db_opts: &TransactionDBOptions,
189 path: P,
190 ) -> Result<Self, Error> {
191 Self::open_cf(opts, txn_db_opts, path, None::<&str>)
192 }
193
194 pub fn open_cf<P, I, N>(
198 opts: &Options,
199 txn_db_opts: &TransactionDBOptions,
200 path: P,
201 cfs: I,
202 ) -> Result<Self, Error>
203 where
204 P: AsRef<Path>,
205 I: IntoIterator<Item = N>,
206 N: AsRef<str>,
207 {
208 let cfs = cfs
209 .into_iter()
210 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
211
212 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
213 }
214
215 pub fn open_cf_descriptors<P, I>(
217 opts: &Options,
218 txn_db_opts: &TransactionDBOptions,
219 path: P,
220 cfs: I,
221 ) -> Result<Self, Error>
222 where
223 P: AsRef<Path>,
224 I: IntoIterator<Item = ColumnFamilyDescriptor>,
225 {
226 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
227 }
228
229 fn open_cf_descriptors_internal<P, I>(
231 opts: &Options,
232 txn_db_opts: &TransactionDBOptions,
233 path: P,
234 cfs: I,
235 ) -> Result<Self, Error>
236 where
237 P: AsRef<Path>,
238 I: IntoIterator<Item = ColumnFamilyDescriptor>,
239 {
240 let cfs: Vec<_> = cfs.into_iter().collect();
241 let outlive = iter::once(opts.outlive.clone())
242 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
243 .collect();
244
245 let cpath = to_cpath(&path)?;
246
247 if let Err(e) = fs::create_dir_all(&path) {
248 return Err(Error::new(format!(
249 "Failed to create RocksDB directory: `{e:?}`."
250 )));
251 }
252
253 let db: *mut ffi::rocksdb_transactiondb_t;
254 let mut cf_map = BTreeMap::new();
255
256 if cfs.is_empty() {
257 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
258 } else {
259 let mut cfs_v = cfs;
260 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
262 cfs_v.push(ColumnFamilyDescriptor {
263 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
264 options: Options::default(),
265 ttl: ColumnFamilyTtl::SameAsDb, });
267 }
268 let c_cfs: Vec<CString> = cfs_v
271 .iter()
272 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
273 .collect();
274
275 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
276
277 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
279
280 let cfopts: Vec<_> = cfs_v
281 .iter()
282 .map(|cf| cf.options.inner.cast_const())
283 .collect();
284
285 db = Self::open_cf_raw(
286 opts,
287 txn_db_opts,
288 &cpath,
289 &cfs_v,
290 &cfnames,
291 &cfopts,
292 &mut cfhandles,
293 )?;
294
295 for handle in &cfhandles {
296 if handle.is_null() {
297 return Err(Error::new(
298 "Received null column family handle from DB.".to_owned(),
299 ));
300 }
301 }
302
303 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
304 cf_map.insert(cf_desc.name.clone(), inner);
305 }
306 }
307
308 if db.is_null() {
309 return Err(Error::new("Could not initialize database.".to_owned()));
310 }
311
312 let prepared = unsafe {
313 let mut cnt = 0;
314 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
315 let mut vec = vec![std::ptr::null_mut(); cnt];
316 if !ptr.is_null() {
317 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
318 ffi::rocksdb_free(ptr as *mut c_void);
319 }
320 vec
321 };
322
323 Ok(TransactionDB {
324 inner: db,
325 cfs: T::new_cf_map_internal(cf_map),
326 path: path.as_ref().to_path_buf(),
327 prepared: Mutex::new(prepared),
328 _outlive: outlive,
329 })
330 }
331
332 fn open_raw(
333 opts: &Options,
334 txn_db_opts: &TransactionDBOptions,
335 cpath: &CString,
336 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
337 unsafe {
338 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
339 opts.inner,
340 txn_db_opts.inner,
341 cpath.as_ptr()
342 ));
343 Ok(db)
344 }
345 }
346
347 fn open_cf_raw(
348 opts: &Options,
349 txn_db_opts: &TransactionDBOptions,
350 cpath: &CString,
351 cfs_v: &[ColumnFamilyDescriptor],
352 cfnames: &[*const c_char],
353 cfopts: &[*const ffi::rocksdb_options_t],
354 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
355 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
356 unsafe {
357 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
358 opts.inner,
359 txn_db_opts.inner,
360 cpath.as_ptr(),
361 cfs_v.len() as c_int,
362 cfnames.as_ptr(),
363 cfopts.as_ptr(),
364 cfhandles.as_mut_ptr(),
365 ));
366 Ok(db)
367 }
368 }
369
370 fn create_inner_cf_handle(
371 &self,
372 name: &str,
373 opts: &Options,
374 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
375 let cf_name = CString::new(name.as_bytes()).map_err(|_| {
376 Error::new("Failed to convert path to CString when creating cf".to_owned())
377 })?;
378
379 Ok(unsafe {
380 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
381 self.inner,
382 opts.inner,
383 cf_name.as_ptr(),
384 ))
385 })
386 }
387
388 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
389 DB::list_cf(opts, path)
390 }
391
392 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
393 DB::destroy(opts, path)
394 }
395
396 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
397 DB::repair(opts, path)
398 }
399
400 pub fn path(&self) -> &Path {
401 self.path.as_path()
402 }
403
404 pub fn transaction(&'_ self) -> Transaction<'_, Self> {
406 self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
407 }
408
409 pub fn transaction_opt<'a>(
411 &'a self,
412 write_opts: &WriteOptions,
413 txn_opts: &TransactionOptions,
414 ) -> Transaction<'a, Self> {
415 Transaction {
416 inner: unsafe {
417 ffi::rocksdb_transaction_begin(
418 self.inner,
419 write_opts.inner,
420 txn_opts.inner,
421 std::ptr::null_mut(),
422 )
423 },
424 _marker: PhantomData,
425 }
426 }
427
428 pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
433 self.prepared
434 .lock()
435 .unwrap()
436 .drain(0..)
437 .map(|inner| Transaction {
438 inner,
439 _marker: PhantomData,
440 })
441 .collect()
442 }
443
444 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
446 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
447 }
448
449 pub fn get_cf<K: AsRef<[u8]>>(
451 &self,
452 cf: &impl AsColumnFamilyRef,
453 key: K,
454 ) -> Result<Option<Vec<u8>>, Error> {
455 self.get_pinned_cf(cf, key)
456 .map(|x| x.map(|v| v.as_ref().to_vec()))
457 }
458
459 pub fn get_opt<K: AsRef<[u8]>>(
461 &self,
462 key: K,
463 readopts: &ReadOptions,
464 ) -> Result<Option<Vec<u8>>, Error> {
465 self.get_pinned_opt(key, readopts)
466 .map(|x| x.map(|v| v.as_ref().to_vec()))
467 }
468
469 pub fn get_cf_opt<K: AsRef<[u8]>>(
471 &self,
472 cf: &impl AsColumnFamilyRef,
473 key: K,
474 readopts: &ReadOptions,
475 ) -> Result<Option<Vec<u8>>, Error> {
476 self.get_pinned_cf_opt(cf, key, readopts)
477 .map(|x| x.map(|v| v.as_ref().to_vec()))
478 }
479
480 pub fn get_pinned<K: AsRef<[u8]>>(
481 &'_ self,
482 key: K,
483 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
484 self.get_pinned_opt(key, &ReadOptions::default())
485 }
486
487 pub fn get_pinned_cf<K: AsRef<[u8]>>(
489 &'_ self,
490 cf: &impl AsColumnFamilyRef,
491 key: K,
492 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
493 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
494 }
495
496 pub fn get_pinned_opt<K: AsRef<[u8]>>(
498 &'_ self,
499 key: K,
500 readopts: &ReadOptions,
501 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
502 let key = key.as_ref();
503 unsafe {
504 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
505 self.inner,
506 readopts.inner,
507 key.as_ptr() as *const c_char,
508 key.len() as size_t,
509 ));
510 if val.is_null() {
511 Ok(None)
512 } else {
513 Ok(Some(DBPinnableSlice::from_c(val)))
514 }
515 }
516 }
517
518 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
520 &'_ self,
521 cf: &impl AsColumnFamilyRef,
522 key: K,
523 readopts: &ReadOptions,
524 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
525 let key = key.as_ref();
526 unsafe {
527 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
528 self.inner,
529 readopts.inner,
530 cf.inner(),
531 key.as_ptr() as *const c_char,
532 key.len() as size_t,
533 ));
534 if val.is_null() {
535 Ok(None)
536 } else {
537 Ok(Some(DBPinnableSlice::from_c(val)))
538 }
539 }
540 }
541
542 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
544 where
545 K: AsRef<[u8]>,
546 I: IntoIterator<Item = K>,
547 {
548 self.multi_get_opt(keys, &ReadOptions::default())
549 }
550
551 pub fn multi_get_opt<K, I>(
553 &self,
554 keys: I,
555 readopts: &ReadOptions,
556 ) -> Vec<Result<Option<Vec<u8>>, Error>>
557 where
558 K: AsRef<[u8]>,
559 I: IntoIterator<Item = K>,
560 {
561 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
562 .into_iter()
563 .map(|key| {
564 let key = key.as_ref();
565 (Box::from(key), key.len())
566 })
567 .unzip();
568 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
569
570 let mut values = vec![ptr::null_mut(); keys.len()];
571 let mut values_sizes = vec![0_usize; keys.len()];
572 let mut errors = vec![ptr::null_mut(); keys.len()];
573 unsafe {
574 ffi::rocksdb_transactiondb_multi_get(
575 self.inner,
576 readopts.inner,
577 ptr_keys.len(),
578 ptr_keys.as_ptr(),
579 keys_sizes.as_ptr(),
580 values.as_mut_ptr(),
581 values_sizes.as_mut_ptr(),
582 errors.as_mut_ptr(),
583 );
584 }
585
586 convert_values(values, values_sizes, errors)
587 }
588
589 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
591 &'a self,
592 keys: I,
593 ) -> Vec<Result<Option<Vec<u8>>, Error>>
594 where
595 K: AsRef<[u8]>,
596 I: IntoIterator<Item = (&'b W, K)>,
597 W: 'b + AsColumnFamilyRef,
598 {
599 self.multi_get_cf_opt(keys, &ReadOptions::default())
600 }
601
602 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
604 &'a self,
605 keys: I,
606 readopts: &ReadOptions,
607 ) -> Vec<Result<Option<Vec<u8>>, Error>>
608 where
609 K: AsRef<[u8]>,
610 I: IntoIterator<Item = (&'b W, K)>,
611 W: 'b + AsColumnFamilyRef,
612 {
613 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
614 .into_iter()
615 .map(|(cf, key)| {
616 let key = key.as_ref();
617 ((cf, Box::from(key)), key.len())
618 })
619 .unzip();
620 let ptr_keys: Vec<_> = cfs_and_keys
621 .iter()
622 .map(|(_, k)| k.as_ptr() as *const c_char)
623 .collect();
624 let ptr_cfs: Vec<_> = cfs_and_keys
625 .iter()
626 .map(|(c, _)| c.inner().cast_const())
627 .collect();
628
629 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
630 let mut values_sizes = vec![0_usize; ptr_keys.len()];
631 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
632 unsafe {
633 ffi::rocksdb_transactiondb_multi_get_cf(
634 self.inner,
635 readopts.inner,
636 ptr_cfs.as_ptr(),
637 ptr_keys.len(),
638 ptr_keys.as_ptr(),
639 keys_sizes.as_ptr(),
640 values.as_mut_ptr(),
641 values_sizes.as_mut_ptr(),
642 errors.as_mut_ptr(),
643 );
644 }
645
646 convert_values(values, values_sizes, errors)
647 }
648
649 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
650 where
651 K: AsRef<[u8]>,
652 V: AsRef<[u8]>,
653 {
654 self.put_opt(key, value, &WriteOptions::default())
655 }
656
657 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
658 where
659 K: AsRef<[u8]>,
660 V: AsRef<[u8]>,
661 {
662 self.put_cf_opt(cf, key, value, &WriteOptions::default())
663 }
664
665 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
666 where
667 K: AsRef<[u8]>,
668 V: AsRef<[u8]>,
669 {
670 let key = key.as_ref();
671 let value = value.as_ref();
672 unsafe {
673 ffi_try!(ffi::rocksdb_transactiondb_put(
674 self.inner,
675 writeopts.inner,
676 key.as_ptr() as *const c_char,
677 key.len() as size_t,
678 value.as_ptr() as *const c_char,
679 value.len() as size_t
680 ));
681 }
682 Ok(())
683 }
684
685 pub fn put_cf_opt<K, V>(
686 &self,
687 cf: &impl AsColumnFamilyRef,
688 key: K,
689 value: V,
690 writeopts: &WriteOptions,
691 ) -> Result<(), Error>
692 where
693 K: AsRef<[u8]>,
694 V: AsRef<[u8]>,
695 {
696 let key = key.as_ref();
697 let value = value.as_ref();
698 unsafe {
699 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
700 self.inner,
701 writeopts.inner,
702 cf.inner(),
703 key.as_ptr() as *const c_char,
704 key.len() as size_t,
705 value.as_ptr() as *const c_char,
706 value.len() as size_t
707 ));
708 }
709 Ok(())
710 }
711
712 pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
713 self.write_opt(batch, &WriteOptions::default())
714 }
715
716 pub fn write_opt(
717 &self,
718 batch: &WriteBatchWithTransaction<true>,
719 writeopts: &WriteOptions,
720 ) -> Result<(), Error> {
721 unsafe {
722 ffi_try!(ffi::rocksdb_transactiondb_write(
723 self.inner,
724 writeopts.inner,
725 batch.inner
726 ));
727 }
728 Ok(())
729 }
730
731 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
732 where
733 K: AsRef<[u8]>,
734 V: AsRef<[u8]>,
735 {
736 self.merge_opt(key, value, &WriteOptions::default())
737 }
738
739 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
740 where
741 K: AsRef<[u8]>,
742 V: AsRef<[u8]>,
743 {
744 self.merge_cf_opt(cf, key, value, &WriteOptions::default())
745 }
746
747 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
748 where
749 K: AsRef<[u8]>,
750 V: AsRef<[u8]>,
751 {
752 let key = key.as_ref();
753 let value = value.as_ref();
754 unsafe {
755 ffi_try!(ffi::rocksdb_transactiondb_merge(
756 self.inner,
757 writeopts.inner,
758 key.as_ptr() as *const c_char,
759 key.len() as size_t,
760 value.as_ptr() as *const c_char,
761 value.len() as size_t,
762 ));
763 Ok(())
764 }
765 }
766
767 pub fn merge_cf_opt<K, V>(
768 &self,
769 cf: &impl AsColumnFamilyRef,
770 key: K,
771 value: V,
772 writeopts: &WriteOptions,
773 ) -> Result<(), Error>
774 where
775 K: AsRef<[u8]>,
776 V: AsRef<[u8]>,
777 {
778 let key = key.as_ref();
779 let value = value.as_ref();
780 unsafe {
781 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
782 self.inner,
783 writeopts.inner,
784 cf.inner(),
785 key.as_ptr() as *const c_char,
786 key.len() as size_t,
787 value.as_ptr() as *const c_char,
788 value.len() as size_t,
789 ));
790 Ok(())
791 }
792 }
793
794 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
795 self.delete_opt(key, &WriteOptions::default())
796 }
797
798 pub fn delete_cf<K: AsRef<[u8]>>(
799 &self,
800 cf: &impl AsColumnFamilyRef,
801 key: K,
802 ) -> Result<(), Error> {
803 self.delete_cf_opt(cf, key, &WriteOptions::default())
804 }
805
806 pub fn delete_opt<K: AsRef<[u8]>>(
807 &self,
808 key: K,
809 writeopts: &WriteOptions,
810 ) -> Result<(), Error> {
811 let key = key.as_ref();
812 unsafe {
813 ffi_try!(ffi::rocksdb_transactiondb_delete(
814 self.inner,
815 writeopts.inner,
816 key.as_ptr() as *const c_char,
817 key.len() as size_t,
818 ));
819 }
820 Ok(())
821 }
822
823 pub fn delete_cf_opt<K: AsRef<[u8]>>(
824 &self,
825 cf: &impl AsColumnFamilyRef,
826 key: K,
827 writeopts: &WriteOptions,
828 ) -> Result<(), Error> {
829 let key = key.as_ref();
830 unsafe {
831 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
832 self.inner,
833 writeopts.inner,
834 cf.inner(),
835 key.as_ptr() as *const c_char,
836 key.len() as size_t,
837 ));
838 }
839 Ok(())
840 }
841
842 pub fn iterator<'a: 'b, 'b>(
843 &'a self,
844 mode: IteratorMode,
845 ) -> DBIteratorWithThreadMode<'b, Self> {
846 let readopts = ReadOptions::default();
847 self.iterator_opt(mode, readopts)
848 }
849
850 pub fn iterator_opt<'a: 'b, 'b>(
851 &'a self,
852 mode: IteratorMode,
853 readopts: ReadOptions,
854 ) -> DBIteratorWithThreadMode<'b, Self> {
855 DBIteratorWithThreadMode::new(self, readopts, mode)
856 }
857
858 pub fn iterator_cf_opt<'a: 'b, 'b>(
861 &'a self,
862 cf_handle: &impl AsColumnFamilyRef,
863 readopts: ReadOptions,
864 mode: IteratorMode,
865 ) -> DBIteratorWithThreadMode<'b, Self> {
866 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
867 }
868
869 pub fn full_iterator<'a: 'b, 'b>(
873 &'a self,
874 mode: IteratorMode,
875 ) -> DBIteratorWithThreadMode<'b, Self> {
876 let mut opts = ReadOptions::default();
877 opts.set_total_order_seek(true);
878 DBIteratorWithThreadMode::new(self, opts, mode)
879 }
880
881 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
882 &'a self,
883 prefix: P,
884 ) -> DBIteratorWithThreadMode<'b, Self> {
885 let mut opts = ReadOptions::default();
886 opts.set_prefix_same_as_start(true);
887 DBIteratorWithThreadMode::new(
888 self,
889 opts,
890 IteratorMode::From(prefix.as_ref(), Direction::Forward),
891 )
892 }
893
894 pub fn iterator_cf<'a: 'b, 'b>(
895 &'a self,
896 cf_handle: &impl AsColumnFamilyRef,
897 mode: IteratorMode,
898 ) -> DBIteratorWithThreadMode<'b, Self> {
899 let opts = ReadOptions::default();
900 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
901 }
902
903 pub fn full_iterator_cf<'a: 'b, 'b>(
904 &'a self,
905 cf_handle: &impl AsColumnFamilyRef,
906 mode: IteratorMode,
907 ) -> DBIteratorWithThreadMode<'b, Self> {
908 let mut opts = ReadOptions::default();
909 opts.set_total_order_seek(true);
910 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
911 }
912
913 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
914 &'a self,
915 cf_handle: &impl AsColumnFamilyRef,
916 prefix: P,
917 ) -> DBIteratorWithThreadMode<'a, Self> {
918 let mut opts = ReadOptions::default();
919 opts.set_prefix_same_as_start(true);
920 DBIteratorWithThreadMode::<'a, Self>::new_cf(
921 self,
922 cf_handle.inner(),
923 opts,
924 IteratorMode::From(prefix.as_ref(), Direction::Forward),
925 )
926 }
927
928 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
930 let opts = ReadOptions::default();
931 DBRawIteratorWithThreadMode::new(self, opts)
932 }
933
934 pub fn raw_iterator_cf<'a: 'b, 'b>(
936 &'a self,
937 cf_handle: &impl AsColumnFamilyRef,
938 ) -> DBRawIteratorWithThreadMode<'b, Self> {
939 let opts = ReadOptions::default();
940 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
941 }
942
943 pub fn raw_iterator_opt<'a: 'b, 'b>(
945 &'a self,
946 readopts: ReadOptions,
947 ) -> DBRawIteratorWithThreadMode<'b, Self> {
948 DBRawIteratorWithThreadMode::new(self, readopts)
949 }
950
951 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
953 &'a self,
954 cf_handle: &impl AsColumnFamilyRef,
955 readopts: ReadOptions,
956 ) -> DBRawIteratorWithThreadMode<'b, Self> {
957 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
958 }
959
960 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
961 SnapshotWithThreadMode::<Self>::new(self)
962 }
963
964 fn drop_column_family<C>(
965 &self,
966 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
967 _cf: C,
968 ) -> Result<(), Error> {
969 unsafe {
970 ffi_try!(ffi::rocksdb_drop_column_family(
972 self.inner as *mut ffi::rocksdb_t,
973 cf_inner
974 ));
975 }
976 Ok(())
979 }
980}
981
982impl TransactionDB<SingleThreaded> {
983 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
985 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
986 self.cfs
987 .cfs
988 .insert(name.as_ref().to_string(), ColumnFamily { inner });
989 Ok(())
990 }
991
992 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
994 self.cfs.cfs.get(name)
995 }
996
997 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
999 match self.cfs.cfs.remove(name) {
1000 Some(cf) => self.drop_column_family(cf.inner, cf),
1001 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1002 }
1003 }
1004}
1005
1006impl TransactionDB<MultiThreaded> {
1007 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1009 let mut cfs = self.cfs.cfs.write();
1012 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1013 cfs.insert(
1014 name.as_ref().to_string(),
1015 Arc::new(UnboundColumnFamily { inner }),
1016 );
1017 Ok(())
1018 }
1019
1020 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1022 self.cfs
1023 .cfs
1024 .read()
1025 .get(name)
1026 .cloned()
1027 .map(UnboundColumnFamily::bound_column_family)
1028 }
1029
1030 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1033 match self.cfs.cfs.write().remove(name) {
1034 Some(cf) => self.drop_column_family(cf.inner, cf),
1035 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1036 }
1037 }
1038
1039 fn property_value_impl<R>(
1048 name: impl CStrLike,
1049 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1050 parse: impl FnOnce(&str) -> Result<R, Error>,
1051 ) -> Result<Option<R>, Error> {
1052 let value = match name.bake() {
1053 Ok(prop_name) => get_property(prop_name.as_ptr()),
1054 Err(e) => {
1055 return Err(Error::new(format!(
1056 "Failed to convert property name to CString: {e}"
1057 )));
1058 }
1059 };
1060 if value.is_null() {
1061 return Ok(None);
1062 }
1063 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1064 Ok(s) => parse(s).map(|value| Some(value)),
1065 Err(e) => Err(Error::new(format!(
1066 "Failed to convert property value to string: {e}"
1067 ))),
1068 };
1069 unsafe {
1070 ffi::rocksdb_free(value as *mut c_void);
1071 }
1072 result
1073 }
1074
1075 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1080 Self::property_value_impl(
1081 name,
1082 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1083 |str_value| Ok(str_value.to_owned()),
1084 )
1085 }
1086
1087 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1088 value.parse::<u64>().map_err(|err| {
1089 Error::new(format!(
1090 "Failed to convert property value {value} to int: {err}"
1091 ))
1092 })
1093 }
1094
1095 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1100 Self::property_value_impl(
1101 name,
1102 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1103 Self::parse_property_int_value,
1104 )
1105 }
1106}
1107
1108impl<T: ThreadMode> Drop for TransactionDB<T> {
1109 fn drop(&mut self) {
1110 unsafe {
1111 self.prepared_transactions().clear();
1112 self.cfs.drop_all_cfs_internal();
1113 ffi::rocksdb_transactiondb_close(self.inner);
1114 }
1115 }
1116}