1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::{
27 column_family::UnboundColumnFamily,
28 db::{convert_values, DBAccess},
29 db_options::OptionsMustOutliveDB,
30 ffi,
31 ffi_util::to_cpath,
32 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
33 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
34 IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
35 ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
36 WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
37};
38use ffi::rocksdb_transaction_t;
39use libc::{c_char, c_int, c_void, size_t};
40
41#[cfg(not(feature = "multi-threaded-cf"))]
42type DefaultThreadMode = crate::SingleThreaded;
43#[cfg(feature = "multi-threaded-cf")]
44type DefaultThreadMode = crate::MultiThreaded;
45
46pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
72 pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
73 cfs: T,
74 path: PathBuf,
75 prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
77 _outlive: Vec<OptionsMustOutliveDB>,
78}
79
80unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
81unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
82
83impl<T: ThreadMode> DBAccess for TransactionDB<T> {
84 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
85 ffi::rocksdb_transactiondb_create_snapshot(self.inner)
86 }
87
88 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
89 ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
90 }
91
92 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
93 ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner)
94 }
95
96 unsafe fn create_iterator_cf(
97 &self,
98 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
99 readopts: &ReadOptions,
100 ) -> *mut ffi::rocksdb_iterator_t {
101 ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
102 }
103
104 fn get_opt<K: AsRef<[u8]>>(
105 &self,
106 key: K,
107 readopts: &ReadOptions,
108 ) -> Result<Option<Vec<u8>>, Error> {
109 self.get_opt(key, readopts)
110 }
111
112 fn get_cf_opt<K: AsRef<[u8]>>(
113 &self,
114 cf: &impl AsColumnFamilyRef,
115 key: K,
116 readopts: &ReadOptions,
117 ) -> Result<Option<Vec<u8>>, Error> {
118 self.get_cf_opt(cf, key, readopts)
119 }
120
121 fn get_pinned_opt<K: AsRef<[u8]>>(
122 &self,
123 key: K,
124 readopts: &ReadOptions,
125 ) -> Result<Option<DBPinnableSlice>, Error> {
126 self.get_pinned_opt(key, readopts)
127 }
128
129 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
130 &self,
131 cf: &impl AsColumnFamilyRef,
132 key: K,
133 readopts: &ReadOptions,
134 ) -> Result<Option<DBPinnableSlice>, Error> {
135 self.get_pinned_cf_opt(cf, key, readopts)
136 }
137
138 fn multi_get_opt<K, I>(
139 &self,
140 keys: I,
141 readopts: &ReadOptions,
142 ) -> Vec<Result<Option<Vec<u8>>, Error>>
143 where
144 K: AsRef<[u8]>,
145 I: IntoIterator<Item = K>,
146 {
147 self.multi_get_opt(keys, readopts)
148 }
149
150 fn multi_get_cf_opt<'b, K, I, W>(
151 &self,
152 keys_cf: I,
153 readopts: &ReadOptions,
154 ) -> Vec<Result<Option<Vec<u8>>, Error>>
155 where
156 K: AsRef<[u8]>,
157 I: IntoIterator<Item = (&'b W, K)>,
158 W: AsColumnFamilyRef + 'b,
159 {
160 self.multi_get_cf_opt(keys_cf, readopts)
161 }
162}
163
164impl<T: ThreadMode> TransactionDB<T> {
165 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
167 let mut opts = Options::default();
168 opts.create_if_missing(true);
169 let txn_db_opts = TransactionDBOptions::default();
170 Self::open(&opts, &txn_db_opts, path)
171 }
172
173 pub fn open<P: AsRef<Path>>(
175 opts: &Options,
176 txn_db_opts: &TransactionDBOptions,
177 path: P,
178 ) -> Result<Self, Error> {
179 Self::open_cf(opts, txn_db_opts, path, None::<&str>)
180 }
181
182 pub fn open_cf<P, I, N>(
186 opts: &Options,
187 txn_db_opts: &TransactionDBOptions,
188 path: P,
189 cfs: I,
190 ) -> Result<Self, Error>
191 where
192 P: AsRef<Path>,
193 I: IntoIterator<Item = N>,
194 N: AsRef<str>,
195 {
196 let cfs = cfs
197 .into_iter()
198 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
199
200 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
201 }
202
203 pub fn open_cf_descriptors<P, I>(
205 opts: &Options,
206 txn_db_opts: &TransactionDBOptions,
207 path: P,
208 cfs: I,
209 ) -> Result<Self, Error>
210 where
211 P: AsRef<Path>,
212 I: IntoIterator<Item = ColumnFamilyDescriptor>,
213 {
214 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
215 }
216
217 fn open_cf_descriptors_internal<P, I>(
219 opts: &Options,
220 txn_db_opts: &TransactionDBOptions,
221 path: P,
222 cfs: I,
223 ) -> Result<Self, Error>
224 where
225 P: AsRef<Path>,
226 I: IntoIterator<Item = ColumnFamilyDescriptor>,
227 {
228 let cfs: Vec<_> = cfs.into_iter().collect();
229 let outlive = iter::once(opts.outlive.clone())
230 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
231 .collect();
232
233 let cpath = to_cpath(&path)?;
234
235 if let Err(e) = fs::create_dir_all(&path) {
236 return Err(Error::new(format!(
237 "Failed to create RocksDB directory: `{:?}`.",
238 e
239 )));
240 }
241
242 let db: *mut ffi::rocksdb_transactiondb_t;
243 let mut cf_map = BTreeMap::new();
244
245 if cfs.is_empty() {
246 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
247 } else {
248 let mut cfs_v = cfs;
249 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
251 cfs_v.push(ColumnFamilyDescriptor {
252 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
253 options: Options::default(),
254 });
255 }
256 let c_cfs: Vec<CString> = cfs_v
259 .iter()
260 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
261 .collect();
262
263 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
264
265 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
267
268 let cfopts: Vec<_> = cfs_v
269 .iter()
270 .map(|cf| cf.options.inner as *const _)
271 .collect();
272
273 db = Self::open_cf_raw(
274 opts,
275 txn_db_opts,
276 &cpath,
277 &cfs_v,
278 &cfnames,
279 &cfopts,
280 &mut cfhandles,
281 )?;
282
283 for handle in &cfhandles {
284 if handle.is_null() {
285 return Err(Error::new(
286 "Received null column family handle from DB.".to_owned(),
287 ));
288 }
289 }
290
291 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
292 cf_map.insert(cf_desc.name.clone(), inner);
293 }
294 }
295
296 if db.is_null() {
297 return Err(Error::new("Could not initialize database.".to_owned()));
298 }
299
300 let prepared = unsafe {
301 let mut cnt = 0;
302 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
303 let mut vec = vec![std::ptr::null_mut(); cnt];
304 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
305 if !ptr.is_null() {
306 ffi::rocksdb_free(ptr as *mut c_void);
307 }
308 vec
309 };
310
311 Ok(TransactionDB {
312 inner: db,
313 cfs: T::new_cf_map_internal(cf_map),
314 path: path.as_ref().to_path_buf(),
315 prepared: Mutex::new(prepared),
316 _outlive: outlive,
317 })
318 }
319
320 fn open_raw(
321 opts: &Options,
322 txn_db_opts: &TransactionDBOptions,
323 cpath: &CString,
324 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
325 unsafe {
326 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
327 opts.inner,
328 txn_db_opts.inner,
329 cpath.as_ptr()
330 ));
331 Ok(db)
332 }
333 }
334
335 fn open_cf_raw(
336 opts: &Options,
337 txn_db_opts: &TransactionDBOptions,
338 cpath: &CString,
339 cfs_v: &[ColumnFamilyDescriptor],
340 cfnames: &[*const c_char],
341 cfopts: &[*const ffi::rocksdb_options_t],
342 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
343 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
344 unsafe {
345 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
346 opts.inner,
347 txn_db_opts.inner,
348 cpath.as_ptr(),
349 cfs_v.len() as c_int,
350 cfnames.as_ptr(),
351 cfopts.as_ptr(),
352 cfhandles.as_mut_ptr(),
353 ));
354 Ok(db)
355 }
356 }
357
358 fn create_inner_cf_handle(
359 &self,
360 name: &str,
361 opts: &Options,
362 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
363 let cf_name = if let Ok(c) = CString::new(name.as_bytes()) {
364 c
365 } else {
366 return Err(Error::new(
367 "Failed to convert path to CString when creating cf".to_owned(),
368 ));
369 };
370 Ok(unsafe {
371 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
372 self.inner,
373 opts.inner,
374 cf_name.as_ptr(),
375 ))
376 })
377 }
378
379 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
380 DB::list_cf(opts, path)
381 }
382
383 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
384 DB::destroy(opts, path)
385 }
386
387 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
388 DB::repair(opts, path)
389 }
390
391 pub fn path(&self) -> &Path {
392 self.path.as_path()
393 }
394
395 pub fn transaction(&self) -> Transaction<Self> {
397 self.transaction_opt(&WriteOptions::default(), &TransactionOptions::default())
398 }
399
400 pub fn transaction_opt<'a>(
402 &'a self,
403 write_opts: &WriteOptions,
404 txn_opts: &TransactionOptions,
405 ) -> Transaction<'a, Self> {
406 Transaction {
407 inner: unsafe {
408 ffi::rocksdb_transaction_begin(
409 self.inner,
410 write_opts.inner,
411 txn_opts.inner,
412 std::ptr::null_mut(),
413 )
414 },
415 _marker: PhantomData::default(),
416 }
417 }
418
419 pub fn prepared_transactions(&self) -> Vec<Transaction<Self>> {
424 self.prepared
425 .lock()
426 .unwrap()
427 .drain(0..)
428 .map(|inner| Transaction {
429 inner,
430 _marker: PhantomData::default(),
431 })
432 .collect()
433 }
434
435 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
437 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
438 }
439
440 pub fn get_cf<K: AsRef<[u8]>>(
442 &self,
443 cf: &impl AsColumnFamilyRef,
444 key: K,
445 ) -> Result<Option<Vec<u8>>, Error> {
446 self.get_pinned_cf(cf, key)
447 .map(|x| x.map(|v| v.as_ref().to_vec()))
448 }
449
450 pub fn get_opt<K: AsRef<[u8]>>(
452 &self,
453 key: K,
454 readopts: &ReadOptions,
455 ) -> Result<Option<Vec<u8>>, Error> {
456 self.get_pinned_opt(key, readopts)
457 .map(|x| x.map(|v| v.as_ref().to_vec()))
458 }
459
460 pub fn get_cf_opt<K: AsRef<[u8]>>(
462 &self,
463 cf: &impl AsColumnFamilyRef,
464 key: K,
465 readopts: &ReadOptions,
466 ) -> Result<Option<Vec<u8>>, Error> {
467 self.get_pinned_cf_opt(cf, key, readopts)
468 .map(|x| x.map(|v| v.as_ref().to_vec()))
469 }
470
471 pub fn get_pinned<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBPinnableSlice>, Error> {
472 self.get_pinned_opt(key, &ReadOptions::default())
473 }
474
475 pub fn get_pinned_cf<K: AsRef<[u8]>>(
477 &self,
478 cf: &impl AsColumnFamilyRef,
479 key: K,
480 ) -> Result<Option<DBPinnableSlice>, Error> {
481 self.get_pinned_cf_opt(cf, key, &ReadOptions::default())
482 }
483
484 pub fn get_pinned_opt<K: AsRef<[u8]>>(
486 &self,
487 key: K,
488 readopts: &ReadOptions,
489 ) -> Result<Option<DBPinnableSlice>, Error> {
490 let key = key.as_ref();
491 unsafe {
492 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
493 self.inner,
494 readopts.inner,
495 key.as_ptr() as *const c_char,
496 key.len() as size_t,
497 ));
498 if val.is_null() {
499 Ok(None)
500 } else {
501 Ok(Some(DBPinnableSlice::from_c(val)))
502 }
503 }
504 }
505
506 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
508 &self,
509 cf: &impl AsColumnFamilyRef,
510 key: K,
511 readopts: &ReadOptions,
512 ) -> Result<Option<DBPinnableSlice>, Error> {
513 unsafe {
514 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
515 self.inner,
516 readopts.inner,
517 cf.inner(),
518 key.as_ref().as_ptr() as *const c_char,
519 key.as_ref().len() as size_t,
520 ));
521 if val.is_null() {
522 Ok(None)
523 } else {
524 Ok(Some(DBPinnableSlice::from_c(val)))
525 }
526 }
527 }
528
529 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
531 where
532 K: AsRef<[u8]>,
533 I: IntoIterator<Item = K>,
534 {
535 self.multi_get_opt(keys, &ReadOptions::default())
536 }
537
538 pub fn multi_get_opt<K, I>(
540 &self,
541 keys: I,
542 readopts: &ReadOptions,
543 ) -> Vec<Result<Option<Vec<u8>>, Error>>
544 where
545 K: AsRef<[u8]>,
546 I: IntoIterator<Item = K>,
547 {
548 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
549 .into_iter()
550 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
551 .unzip();
552 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
553
554 let mut values = vec![ptr::null_mut(); keys.len()];
555 let mut values_sizes = vec![0_usize; keys.len()];
556 let mut errors = vec![ptr::null_mut(); keys.len()];
557 unsafe {
558 ffi::rocksdb_transactiondb_multi_get(
559 self.inner,
560 readopts.inner,
561 ptr_keys.len(),
562 ptr_keys.as_ptr(),
563 keys_sizes.as_ptr(),
564 values.as_mut_ptr(),
565 values_sizes.as_mut_ptr(),
566 errors.as_mut_ptr(),
567 );
568 }
569
570 convert_values(values, values_sizes, errors)
571 }
572
573 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
575 &'a self,
576 keys: I,
577 ) -> Vec<Result<Option<Vec<u8>>, Error>>
578 where
579 K: AsRef<[u8]>,
580 I: IntoIterator<Item = (&'b W, K)>,
581 W: 'b + AsColumnFamilyRef,
582 {
583 self.multi_get_cf_opt(keys, &ReadOptions::default())
584 }
585
586 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
588 &'a self,
589 keys: I,
590 readopts: &ReadOptions,
591 ) -> Vec<Result<Option<Vec<u8>>, Error>>
592 where
593 K: AsRef<[u8]>,
594 I: IntoIterator<Item = (&'b W, K)>,
595 W: 'b + AsColumnFamilyRef,
596 {
597 let (cfs_and_keys, keys_sizes): (Vec<(_, Box<[u8]>)>, Vec<_>) = keys
598 .into_iter()
599 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
600 .unzip();
601 let ptr_keys: Vec<_> = cfs_and_keys
602 .iter()
603 .map(|(_, k)| k.as_ptr() as *const c_char)
604 .collect();
605 let ptr_cfs: Vec<_> = cfs_and_keys
606 .iter()
607 .map(|(c, _)| c.inner() as *const _)
608 .collect();
609
610 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
611 let mut values_sizes = vec![0_usize; ptr_keys.len()];
612 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
613 unsafe {
614 ffi::rocksdb_transactiondb_multi_get_cf(
615 self.inner,
616 readopts.inner,
617 ptr_cfs.as_ptr(),
618 ptr_keys.len(),
619 ptr_keys.as_ptr(),
620 keys_sizes.as_ptr(),
621 values.as_mut_ptr(),
622 values_sizes.as_mut_ptr(),
623 errors.as_mut_ptr(),
624 );
625 }
626
627 convert_values(values, values_sizes, errors)
628 }
629
630 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
631 where
632 K: AsRef<[u8]>,
633 V: AsRef<[u8]>,
634 {
635 self.put_opt(key, value, &WriteOptions::default())
636 }
637
638 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
639 where
640 K: AsRef<[u8]>,
641 V: AsRef<[u8]>,
642 {
643 self.put_cf_opt(cf, key, value, &WriteOptions::default())
644 }
645
646 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
647 where
648 K: AsRef<[u8]>,
649 V: AsRef<[u8]>,
650 {
651 unsafe {
652 ffi_try!(ffi::rocksdb_transactiondb_put(
653 self.inner,
654 writeopts.inner,
655 key.as_ref().as_ptr() as *const c_char,
656 key.as_ref().len() as size_t,
657 value.as_ref().as_ptr() as *const c_char,
658 value.as_ref().len() as size_t
659 ));
660 }
661 Ok(())
662 }
663
664 pub fn put_cf_opt<K, V>(
665 &self,
666 cf: &impl AsColumnFamilyRef,
667 key: K,
668 value: V,
669 writeopts: &WriteOptions,
670 ) -> Result<(), Error>
671 where
672 K: AsRef<[u8]>,
673 V: AsRef<[u8]>,
674 {
675 unsafe {
676 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
677 self.inner,
678 writeopts.inner,
679 cf.inner(),
680 key.as_ref().as_ptr() as *const c_char,
681 key.as_ref().len() as size_t,
682 value.as_ref().as_ptr() as *const c_char,
683 value.as_ref().len() as size_t
684 ));
685 }
686 Ok(())
687 }
688
689 pub fn write(&self, batch: WriteBatchWithTransaction<true>) -> Result<(), Error> {
690 self.write_opt(batch, &WriteOptions::default())
691 }
692
693 pub fn write_opt(
694 &self,
695 batch: WriteBatchWithTransaction<true>,
696 writeopts: &WriteOptions,
697 ) -> Result<(), Error> {
698 unsafe {
699 ffi_try!(ffi::rocksdb_transactiondb_write(
700 self.inner,
701 writeopts.inner,
702 batch.inner
703 ));
704 }
705 Ok(())
706 }
707
708 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
709 where
710 K: AsRef<[u8]>,
711 V: AsRef<[u8]>,
712 {
713 self.merge_opt(key, value, &WriteOptions::default())
714 }
715
716 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
717 where
718 K: AsRef<[u8]>,
719 V: AsRef<[u8]>,
720 {
721 self.merge_cf_opt(cf, key, value, &WriteOptions::default())
722 }
723
724 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
725 where
726 K: AsRef<[u8]>,
727 V: AsRef<[u8]>,
728 {
729 unsafe {
730 ffi_try!(ffi::rocksdb_transactiondb_merge(
731 self.inner,
732 writeopts.inner,
733 key.as_ref().as_ptr() as *const c_char,
734 key.as_ref().len() as size_t,
735 value.as_ref().as_ptr() as *const c_char,
736 value.as_ref().len() as size_t,
737 ));
738 Ok(())
739 }
740 }
741
742 pub fn merge_cf_opt<K, V>(
743 &self,
744 cf: &impl AsColumnFamilyRef,
745 key: K,
746 value: V,
747 writeopts: &WriteOptions,
748 ) -> Result<(), Error>
749 where
750 K: AsRef<[u8]>,
751 V: AsRef<[u8]>,
752 {
753 unsafe {
754 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
755 self.inner,
756 writeopts.inner,
757 cf.inner(),
758 key.as_ref().as_ptr() as *const c_char,
759 key.as_ref().len() as size_t,
760 value.as_ref().as_ptr() as *const c_char,
761 value.as_ref().len() as size_t,
762 ));
763 Ok(())
764 }
765 }
766
767 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
768 self.delete_opt(key, &WriteOptions::default())
769 }
770
771 pub fn delete_cf<K: AsRef<[u8]>>(
772 &self,
773 cf: &impl AsColumnFamilyRef,
774 key: K,
775 ) -> Result<(), Error> {
776 self.delete_cf_opt(cf, key, &WriteOptions::default())
777 }
778
779 pub fn delete_opt<K: AsRef<[u8]>>(
780 &self,
781 key: K,
782 writeopts: &WriteOptions,
783 ) -> Result<(), Error> {
784 unsafe {
785 ffi_try!(ffi::rocksdb_transactiondb_delete(
786 self.inner,
787 writeopts.inner,
788 key.as_ref().as_ptr() as *const c_char,
789 key.as_ref().len() as size_t,
790 ));
791 }
792 Ok(())
793 }
794
795 pub fn delete_cf_opt<K: AsRef<[u8]>>(
796 &self,
797 cf: &impl AsColumnFamilyRef,
798 key: K,
799 writeopts: &WriteOptions,
800 ) -> Result<(), Error> {
801 unsafe {
802 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
803 self.inner,
804 writeopts.inner,
805 cf.inner(),
806 key.as_ref().as_ptr() as *const c_char,
807 key.as_ref().len() as size_t,
808 ));
809 }
810 Ok(())
811 }
812
813 pub fn iterator<'a: 'b, 'b>(
814 &'a self,
815 mode: IteratorMode,
816 ) -> DBIteratorWithThreadMode<'b, Self> {
817 let readopts = ReadOptions::default();
818 self.iterator_opt(mode, readopts)
819 }
820
821 pub fn iterator_opt<'a: 'b, 'b>(
822 &'a self,
823 mode: IteratorMode,
824 readopts: ReadOptions,
825 ) -> DBIteratorWithThreadMode<'b, Self> {
826 DBIteratorWithThreadMode::new(self, readopts, mode)
827 }
828
829 pub fn iterator_cf_opt<'a: 'b, 'b>(
832 &'a self,
833 cf_handle: &impl AsColumnFamilyRef,
834 readopts: ReadOptions,
835 mode: IteratorMode,
836 ) -> DBIteratorWithThreadMode<'b, Self> {
837 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
838 }
839
840 pub fn full_iterator<'a: 'b, 'b>(
844 &'a self,
845 mode: IteratorMode,
846 ) -> DBIteratorWithThreadMode<'b, Self> {
847 let mut opts = ReadOptions::default();
848 opts.set_total_order_seek(true);
849 DBIteratorWithThreadMode::new(self, opts, mode)
850 }
851
852 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
853 &'a self,
854 prefix: P,
855 ) -> DBIteratorWithThreadMode<'b, Self> {
856 let mut opts = ReadOptions::default();
857 opts.set_prefix_same_as_start(true);
858 DBIteratorWithThreadMode::new(
859 self,
860 opts,
861 IteratorMode::From(prefix.as_ref(), Direction::Forward),
862 )
863 }
864
865 pub fn iterator_cf<'a: 'b, 'b>(
866 &'a self,
867 cf_handle: &impl AsColumnFamilyRef,
868 mode: IteratorMode,
869 ) -> DBIteratorWithThreadMode<'b, Self> {
870 let opts = ReadOptions::default();
871 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
872 }
873
874 pub fn full_iterator_cf<'a: 'b, 'b>(
875 &'a self,
876 cf_handle: &impl AsColumnFamilyRef,
877 mode: IteratorMode,
878 ) -> DBIteratorWithThreadMode<'b, Self> {
879 let mut opts = ReadOptions::default();
880 opts.set_total_order_seek(true);
881 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
882 }
883
884 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
885 &'a self,
886 cf_handle: &impl AsColumnFamilyRef,
887 prefix: P,
888 ) -> DBIteratorWithThreadMode<'a, Self> {
889 let mut opts = ReadOptions::default();
890 opts.set_prefix_same_as_start(true);
891 DBIteratorWithThreadMode::<'a, Self>::new_cf(
892 self,
893 cf_handle.inner(),
894 opts,
895 IteratorMode::From(prefix.as_ref(), Direction::Forward),
896 )
897 }
898
899 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
901 let opts = ReadOptions::default();
902 DBRawIteratorWithThreadMode::new(self, opts)
903 }
904
905 pub fn raw_iterator_cf<'a: 'b, 'b>(
907 &'a self,
908 cf_handle: &impl AsColumnFamilyRef,
909 ) -> DBRawIteratorWithThreadMode<'b, Self> {
910 let opts = ReadOptions::default();
911 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
912 }
913
914 pub fn raw_iterator_opt<'a: 'b, 'b>(
916 &'a self,
917 readopts: ReadOptions,
918 ) -> DBRawIteratorWithThreadMode<'b, Self> {
919 DBRawIteratorWithThreadMode::new(self, readopts)
920 }
921
922 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
924 &'a self,
925 cf_handle: &impl AsColumnFamilyRef,
926 readopts: ReadOptions,
927 ) -> DBRawIteratorWithThreadMode<'b, Self> {
928 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
929 }
930
931 pub fn snapshot(&self) -> SnapshotWithThreadMode<Self> {
932 SnapshotWithThreadMode::<Self>::new(self)
933 }
934}
935
936impl TransactionDB<SingleThreaded> {
937 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
939 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
940 self.cfs
941 .cfs
942 .insert(name.as_ref().to_string(), ColumnFamily { inner });
943 Ok(())
944 }
945
946 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
948 self.cfs.cfs.get(name)
949 }
950}
951
952impl TransactionDB<MultiThreaded> {
953 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
955 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
956 self.cfs.cfs.write().unwrap().insert(
957 name.as_ref().to_string(),
958 Arc::new(UnboundColumnFamily { inner }),
959 );
960 Ok(())
961 }
962
963 pub fn cf_handle(&self, name: &str) -> Option<Arc<BoundColumnFamily>> {
965 self.cfs
966 .cfs
967 .read()
968 .unwrap()
969 .get(name)
970 .cloned()
971 .map(UnboundColumnFamily::bound_column_family)
972 }
973}
974
975impl<T: ThreadMode> Drop for TransactionDB<T> {
976 fn drop(&mut self) {
977 unsafe {
978 self.prepared_transactions().clear();
979 self.cfs.drop_all_cfs_internal();
980 ffi::rocksdb_transactiondb_close(self.inner);
981 }
982 }
983}