1use std::{
17 collections::BTreeMap,
18 ffi::CString,
19 fs, iter,
20 marker::PhantomData,
21 path::{Path, PathBuf},
22 ptr,
23 sync::{Arc, Mutex},
24};
25
26use crate::CStrLike;
27use std::ffi::CStr;
28
29use crate::column_family::ColumnFamilyTtl;
30use crate::{
31 column_family::UnboundColumnFamily,
32 db::{convert_values, DBAccess},
33 db_options::OptionsMustOutliveDB,
34 ffi,
35 ffi_util::to_cpath,
36 AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor,
37 DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode, Direction, Error,
38 IteratorMode, MultiThreaded, Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode,
39 ThreadMode, Transaction, TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction,
40 WriteOptions, DB, DEFAULT_COLUMN_FAMILY_NAME,
41};
42use ffi::rocksdb_transaction_t;
43use libc::{c_char, c_int, c_void, size_t};
44
45thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
52thread_local! { static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default(); }
53
54#[cfg(not(feature = "multi-threaded-cf"))]
55type DefaultThreadMode = crate::SingleThreaded;
56#[cfg(feature = "multi-threaded-cf")]
57type DefaultThreadMode = crate::MultiThreaded;
58
59pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
89 pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
90 cfs: T,
91 path: PathBuf,
92 prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
94 _outlive: Vec<OptionsMustOutliveDB>,
95}
96
97unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
98unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
99
100impl<T: ThreadMode> DBAccess for TransactionDB<T> {
101 unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
102 unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
103 }
104
105 unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
106 unsafe {
107 ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
108 }
109 }
110
111 unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
112 unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
113 }
114
115 unsafe fn create_iterator_cf(
116 &self,
117 cf_handle: *mut ffi::rocksdb_column_family_handle_t,
118 readopts: &ReadOptions,
119 ) -> *mut ffi::rocksdb_iterator_t {
120 unsafe {
121 ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
122 }
123 }
124
125 fn get_opt<K: AsRef<[u8]>>(
126 &self,
127 key: K,
128 readopts: &ReadOptions,
129 ) -> Result<Option<Vec<u8>>, Error> {
130 self.get_opt(key, readopts)
131 }
132
133 fn get_cf_opt<K: AsRef<[u8]>>(
134 &self,
135 cf: &impl AsColumnFamilyRef,
136 key: K,
137 readopts: &ReadOptions,
138 ) -> Result<Option<Vec<u8>>, Error> {
139 self.get_cf_opt(cf, key, readopts)
140 }
141
142 fn get_pinned_opt<K: AsRef<[u8]>>(
143 &'_ self,
144 key: K,
145 readopts: &ReadOptions,
146 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
147 self.get_pinned_opt(key, readopts)
148 }
149
150 fn get_pinned_cf_opt<K: AsRef<[u8]>>(
151 &'_ self,
152 cf: &impl AsColumnFamilyRef,
153 key: K,
154 readopts: &ReadOptions,
155 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
156 self.get_pinned_cf_opt(cf, key, readopts)
157 }
158
159 fn multi_get_opt<K, I>(
160 &self,
161 keys: I,
162 readopts: &ReadOptions,
163 ) -> Vec<Result<Option<Vec<u8>>, Error>>
164 where
165 K: AsRef<[u8]>,
166 I: IntoIterator<Item = K>,
167 {
168 self.multi_get_opt(keys, readopts)
169 }
170
171 fn multi_get_cf_opt<'b, K, I, W>(
172 &self,
173 keys_cf: I,
174 readopts: &ReadOptions,
175 ) -> Vec<Result<Option<Vec<u8>>, Error>>
176 where
177 K: AsRef<[u8]>,
178 I: IntoIterator<Item = (&'b W, K)>,
179 W: AsColumnFamilyRef + 'b,
180 {
181 self.multi_get_cf_opt(keys_cf, readopts)
182 }
183}
184
185impl<T: ThreadMode> TransactionDB<T> {
186 pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
188 let mut opts = Options::default();
189 opts.create_if_missing(true);
190 let txn_db_opts = TransactionDBOptions::default();
191 Self::open(&opts, &txn_db_opts, path)
192 }
193
194 pub fn open<P: AsRef<Path>>(
196 opts: &Options,
197 txn_db_opts: &TransactionDBOptions,
198 path: P,
199 ) -> Result<Self, Error> {
200 Self::open_cf(opts, txn_db_opts, path, None::<&str>)
201 }
202
203 pub fn open_cf<P, I, N>(
207 opts: &Options,
208 txn_db_opts: &TransactionDBOptions,
209 path: P,
210 cfs: I,
211 ) -> Result<Self, Error>
212 where
213 P: AsRef<Path>,
214 I: IntoIterator<Item = N>,
215 N: AsRef<str>,
216 {
217 let cfs = cfs
218 .into_iter()
219 .map(|name| ColumnFamilyDescriptor::new(name.as_ref(), Options::default()));
220
221 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
222 }
223
224 pub fn open_cf_descriptors<P, I>(
226 opts: &Options,
227 txn_db_opts: &TransactionDBOptions,
228 path: P,
229 cfs: I,
230 ) -> Result<Self, Error>
231 where
232 P: AsRef<Path>,
233 I: IntoIterator<Item = ColumnFamilyDescriptor>,
234 {
235 Self::open_cf_descriptors_internal(opts, txn_db_opts, path, cfs)
236 }
237
238 fn open_cf_descriptors_internal<P, I>(
240 opts: &Options,
241 txn_db_opts: &TransactionDBOptions,
242 path: P,
243 cfs: I,
244 ) -> Result<Self, Error>
245 where
246 P: AsRef<Path>,
247 I: IntoIterator<Item = ColumnFamilyDescriptor>,
248 {
249 let cfs: Vec<_> = cfs.into_iter().collect();
250 let outlive = iter::once(opts.outlive.clone())
251 .chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
252 .collect();
253
254 let cpath = to_cpath(&path)?;
255
256 if let Err(e) = fs::create_dir_all(&path) {
257 return Err(Error::new(format!(
258 "Failed to create RocksDB directory: `{e:?}`."
259 )));
260 }
261
262 let db: *mut ffi::rocksdb_transactiondb_t;
263 let mut cf_map = BTreeMap::new();
264
265 if cfs.is_empty() {
266 db = Self::open_raw(opts, txn_db_opts, &cpath)?;
267 } else {
268 let mut cfs_v = cfs;
269 if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
271 cfs_v.push(ColumnFamilyDescriptor {
272 name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
273 options: Options::default(),
274 ttl: ColumnFamilyTtl::SameAsDb, });
276 }
277 let c_cfs: Vec<CString> = cfs_v
280 .iter()
281 .map(|cf| CString::new(cf.name.as_bytes()).unwrap())
282 .collect();
283
284 let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
285
286 let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
288
289 let cfopts: Vec<_> = cfs_v
290 .iter()
291 .map(|cf| cf.options.inner.cast_const())
292 .collect();
293
294 db = Self::open_cf_raw(
295 opts,
296 txn_db_opts,
297 &cpath,
298 &cfs_v,
299 &cfnames,
300 &cfopts,
301 &mut cfhandles,
302 )?;
303
304 for handle in &cfhandles {
305 if handle.is_null() {
306 return Err(Error::new(
307 "Received null column family handle from DB.".to_owned(),
308 ));
309 }
310 }
311
312 for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
313 cf_map.insert(cf_desc.name.clone(), inner);
314 }
315 }
316
317 if db.is_null() {
318 return Err(Error::new("Could not initialize database.".to_owned()));
319 }
320
321 let prepared = unsafe {
322 let mut cnt = 0;
323 let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &mut cnt);
324 let mut vec = vec![std::ptr::null_mut(); cnt];
325 if !ptr.is_null() {
326 std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
327 ffi::rocksdb_free(ptr as *mut c_void);
328 }
329 vec
330 };
331
332 Ok(TransactionDB {
333 inner: db,
334 cfs: T::new_cf_map_internal(cf_map),
335 path: path.as_ref().to_path_buf(),
336 prepared: Mutex::new(prepared),
337 _outlive: outlive,
338 })
339 }
340
341 fn open_raw(
342 opts: &Options,
343 txn_db_opts: &TransactionDBOptions,
344 cpath: &CString,
345 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
346 unsafe {
347 let db = ffi_try!(ffi::rocksdb_transactiondb_open(
348 opts.inner,
349 txn_db_opts.inner,
350 cpath.as_ptr()
351 ));
352 Ok(db)
353 }
354 }
355
356 fn open_cf_raw(
357 opts: &Options,
358 txn_db_opts: &TransactionDBOptions,
359 cpath: &CString,
360 cfs_v: &[ColumnFamilyDescriptor],
361 cfnames: &[*const c_char],
362 cfopts: &[*const ffi::rocksdb_options_t],
363 cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
364 ) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
365 unsafe {
366 let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
367 opts.inner,
368 txn_db_opts.inner,
369 cpath.as_ptr(),
370 cfs_v.len() as c_int,
371 cfnames.as_ptr(),
372 cfopts.as_ptr(),
373 cfhandles.as_mut_ptr(),
374 ));
375 Ok(db)
376 }
377 }
378
379 fn create_inner_cf_handle(
380 &self,
381 name: &str,
382 opts: &Options,
383 ) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
384 let cf_name = CString::new(name.as_bytes()).map_err(|_| {
385 Error::new("Failed to convert path to CString when creating cf".to_owned())
386 })?;
387
388 Ok(unsafe {
389 ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
390 self.inner,
391 opts.inner,
392 cf_name.as_ptr(),
393 ))
394 })
395 }
396
397 pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
398 DB::list_cf(opts, path)
399 }
400
401 pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
402 DB::destroy(opts, path)
403 }
404
405 pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
406 DB::repair(opts, path)
407 }
408
409 pub fn path(&self) -> &Path {
410 self.path.as_path()
411 }
412
413 pub fn transaction(&'_ self) -> Transaction<'_, Self> {
415 DEFAULT_WRITE_OPTS.with(|opts| self.transaction_opt(opts, &TransactionOptions::default()))
416 }
417
418 pub fn transaction_opt<'a>(
420 &'a self,
421 write_opts: &WriteOptions,
422 txn_opts: &TransactionOptions,
423 ) -> Transaction<'a, Self> {
424 Transaction {
425 inner: unsafe {
426 ffi::rocksdb_transaction_begin(
427 self.inner,
428 write_opts.inner,
429 txn_opts.inner,
430 std::ptr::null_mut(),
431 )
432 },
433 _marker: PhantomData,
434 }
435 }
436
437 pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
442 self.prepared
443 .lock()
444 .unwrap()
445 .drain(0..)
446 .map(|inner| Transaction {
447 inner,
448 _marker: PhantomData,
449 })
450 .collect()
451 }
452
453 pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
455 self.get_pinned(key).map(|x| x.map(|v| v.as_ref().to_vec()))
456 }
457
458 pub fn get_cf<K: AsRef<[u8]>>(
460 &self,
461 cf: &impl AsColumnFamilyRef,
462 key: K,
463 ) -> Result<Option<Vec<u8>>, Error> {
464 self.get_pinned_cf(cf, key)
465 .map(|x| x.map(|v| v.as_ref().to_vec()))
466 }
467
468 pub fn get_opt<K: AsRef<[u8]>>(
470 &self,
471 key: K,
472 readopts: &ReadOptions,
473 ) -> Result<Option<Vec<u8>>, Error> {
474 self.get_pinned_opt(key, readopts)
475 .map(|x| x.map(|v| v.as_ref().to_vec()))
476 }
477
478 pub fn get_cf_opt<K: AsRef<[u8]>>(
480 &self,
481 cf: &impl AsColumnFamilyRef,
482 key: K,
483 readopts: &ReadOptions,
484 ) -> Result<Option<Vec<u8>>, Error> {
485 self.get_pinned_cf_opt(cf, key, readopts)
486 .map(|x| x.map(|v| v.as_ref().to_vec()))
487 }
488
489 pub fn get_pinned<K: AsRef<[u8]>>(
490 &'_ self,
491 key: K,
492 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
493 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_opt(key, opts))
494 }
495
496 pub fn get_pinned_cf<K: AsRef<[u8]>>(
498 &'_ self,
499 cf: &impl AsColumnFamilyRef,
500 key: K,
501 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
502 DEFAULT_READ_OPTS.with(|opts| self.get_pinned_cf_opt(cf, key, opts))
503 }
504
505 pub fn get_pinned_opt<K: AsRef<[u8]>>(
507 &'_ self,
508 key: K,
509 readopts: &ReadOptions,
510 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
511 let key = key.as_ref();
512 unsafe {
513 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
514 self.inner,
515 readopts.inner,
516 key.as_ptr() as *const c_char,
517 key.len() as size_t,
518 ));
519 if val.is_null() {
520 Ok(None)
521 } else {
522 Ok(Some(DBPinnableSlice::from_c(val)))
523 }
524 }
525 }
526
527 pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
529 &'_ self,
530 cf: &impl AsColumnFamilyRef,
531 key: K,
532 readopts: &ReadOptions,
533 ) -> Result<Option<DBPinnableSlice<'_>>, Error> {
534 let key = key.as_ref();
535 unsafe {
536 let val = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
537 self.inner,
538 readopts.inner,
539 cf.inner(),
540 key.as_ptr() as *const c_char,
541 key.len() as size_t,
542 ));
543 if val.is_null() {
544 Ok(None)
545 } else {
546 Ok(Some(DBPinnableSlice::from_c(val)))
547 }
548 }
549 }
550
551 pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
553 where
554 K: AsRef<[u8]>,
555 I: IntoIterator<Item = K>,
556 {
557 DEFAULT_READ_OPTS.with(|opts| self.multi_get_opt(keys, opts))
558 }
559
560 pub fn multi_get_opt<K, I>(
562 &self,
563 keys: I,
564 readopts: &ReadOptions,
565 ) -> Vec<Result<Option<Vec<u8>>, Error>>
566 where
567 K: AsRef<[u8]>,
568 I: IntoIterator<Item = K>,
569 {
570 let owned_keys: Vec<K> = keys.into_iter().collect();
571 let keys_sizes: Vec<usize> = owned_keys.iter().map(|k| k.as_ref().len()).collect();
572 let ptr_keys: Vec<*const c_char> = owned_keys
573 .iter()
574 .map(|k| k.as_ref().as_ptr() as *const c_char)
575 .collect();
576
577 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
578 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
579 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
580 unsafe {
581 ffi::rocksdb_transactiondb_multi_get(
582 self.inner,
583 readopts.inner,
584 ptr_keys.len(),
585 ptr_keys.as_ptr(),
586 keys_sizes.as_ptr(),
587 values.as_mut_ptr(),
588 values_sizes.as_mut_ptr(),
589 errors.as_mut_ptr(),
590 );
591 }
592
593 unsafe {
594 values.set_len(ptr_keys.len());
595 values_sizes.set_len(ptr_keys.len());
596 errors.set_len(ptr_keys.len());
597 }
598
599 convert_values(values, values_sizes, errors)
600 }
601
602 pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
604 &'a self,
605 keys: I,
606 ) -> Vec<Result<Option<Vec<u8>>, Error>>
607 where
608 K: AsRef<[u8]>,
609 I: IntoIterator<Item = (&'b W, K)>,
610 W: 'b + AsColumnFamilyRef,
611 {
612 DEFAULT_READ_OPTS.with(|opts| self.multi_get_cf_opt(keys, opts))
613 }
614
615 pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
617 &'a self,
618 keys: I,
619 readopts: &ReadOptions,
620 ) -> Vec<Result<Option<Vec<u8>>, Error>>
621 where
622 K: AsRef<[u8]>,
623 I: IntoIterator<Item = (&'b W, K)>,
624 W: 'b + AsColumnFamilyRef,
625 {
626 let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
627 let keys_sizes: Vec<usize> = cfs_and_owned_keys
628 .iter()
629 .map(|(_, k)| k.as_ref().len())
630 .collect();
631 let ptr_keys: Vec<*const c_char> = cfs_and_owned_keys
632 .iter()
633 .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
634 .collect();
635 let ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> = cfs_and_owned_keys
636 .iter()
637 .map(|(c, _)| c.inner().cast_const())
638 .collect();
639 let mut values: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
640 let mut values_sizes: Vec<usize> = Vec::with_capacity(ptr_keys.len());
641 let mut errors: Vec<*mut c_char> = Vec::with_capacity(ptr_keys.len());
642 unsafe {
643 ffi::rocksdb_transactiondb_multi_get_cf(
644 self.inner,
645 readopts.inner,
646 ptr_cfs.as_ptr(),
647 ptr_keys.len(),
648 ptr_keys.as_ptr(),
649 keys_sizes.as_ptr(),
650 values.as_mut_ptr(),
651 values_sizes.as_mut_ptr(),
652 errors.as_mut_ptr(),
653 );
654 }
655
656 unsafe {
657 values.set_len(ptr_keys.len());
658 values_sizes.set_len(ptr_keys.len());
659 errors.set_len(ptr_keys.len());
660 }
661
662 convert_values(values, values_sizes, errors)
663 }
664
665 pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
666 where
667 K: AsRef<[u8]>,
668 V: AsRef<[u8]>,
669 {
670 DEFAULT_WRITE_OPTS.with(|opts| self.put_opt(key, value, opts))
671 }
672
673 pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
674 where
675 K: AsRef<[u8]>,
676 V: AsRef<[u8]>,
677 {
678 DEFAULT_WRITE_OPTS.with(|opts| self.put_cf_opt(cf, key, value, opts))
679 }
680
681 pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
682 where
683 K: AsRef<[u8]>,
684 V: AsRef<[u8]>,
685 {
686 let key = key.as_ref();
687 let value = value.as_ref();
688 unsafe {
689 ffi_try!(ffi::rocksdb_transactiondb_put(
690 self.inner,
691 writeopts.inner,
692 key.as_ptr() as *const c_char,
693 key.len() as size_t,
694 value.as_ptr() as *const c_char,
695 value.len() as size_t
696 ));
697 }
698 Ok(())
699 }
700
701 pub fn put_cf_opt<K, V>(
702 &self,
703 cf: &impl AsColumnFamilyRef,
704 key: K,
705 value: V,
706 writeopts: &WriteOptions,
707 ) -> Result<(), Error>
708 where
709 K: AsRef<[u8]>,
710 V: AsRef<[u8]>,
711 {
712 let key = key.as_ref();
713 let value = value.as_ref();
714 unsafe {
715 ffi_try!(ffi::rocksdb_transactiondb_put_cf(
716 self.inner,
717 writeopts.inner,
718 cf.inner(),
719 key.as_ptr() as *const c_char,
720 key.len() as size_t,
721 value.as_ptr() as *const c_char,
722 value.len() as size_t
723 ));
724 }
725 Ok(())
726 }
727
728 pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
729 DEFAULT_WRITE_OPTS.with(|opts| self.write_opt(batch, opts))
730 }
731
732 pub fn write_opt(
733 &self,
734 batch: &WriteBatchWithTransaction<true>,
735 writeopts: &WriteOptions,
736 ) -> Result<(), Error> {
737 unsafe {
738 ffi_try!(ffi::rocksdb_transactiondb_write(
739 self.inner,
740 writeopts.inner,
741 batch.inner
742 ));
743 }
744 Ok(())
745 }
746
747 pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
748 where
749 K: AsRef<[u8]>,
750 V: AsRef<[u8]>,
751 {
752 DEFAULT_WRITE_OPTS.with(|opts| self.merge_opt(key, value, opts))
753 }
754
755 pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
756 where
757 K: AsRef<[u8]>,
758 V: AsRef<[u8]>,
759 {
760 DEFAULT_WRITE_OPTS.with(|opts| self.merge_cf_opt(cf, key, value, opts))
761 }
762
763 pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
764 where
765 K: AsRef<[u8]>,
766 V: AsRef<[u8]>,
767 {
768 let key = key.as_ref();
769 let value = value.as_ref();
770 unsafe {
771 ffi_try!(ffi::rocksdb_transactiondb_merge(
772 self.inner,
773 writeopts.inner,
774 key.as_ptr() as *const c_char,
775 key.len() as size_t,
776 value.as_ptr() as *const c_char,
777 value.len() as size_t,
778 ));
779 Ok(())
780 }
781 }
782
783 pub fn merge_cf_opt<K, V>(
784 &self,
785 cf: &impl AsColumnFamilyRef,
786 key: K,
787 value: V,
788 writeopts: &WriteOptions,
789 ) -> Result<(), Error>
790 where
791 K: AsRef<[u8]>,
792 V: AsRef<[u8]>,
793 {
794 let key = key.as_ref();
795 let value = value.as_ref();
796 unsafe {
797 ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
798 self.inner,
799 writeopts.inner,
800 cf.inner(),
801 key.as_ptr() as *const c_char,
802 key.len() as size_t,
803 value.as_ptr() as *const c_char,
804 value.len() as size_t,
805 ));
806 Ok(())
807 }
808 }
809
810 pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
811 DEFAULT_WRITE_OPTS.with(|opts| self.delete_opt(key, opts))
812 }
813
814 pub fn delete_cf<K: AsRef<[u8]>>(
815 &self,
816 cf: &impl AsColumnFamilyRef,
817 key: K,
818 ) -> Result<(), Error> {
819 DEFAULT_WRITE_OPTS.with(|opts| self.delete_cf_opt(cf, key, opts))
820 }
821
822 pub fn delete_opt<K: AsRef<[u8]>>(
823 &self,
824 key: K,
825 writeopts: &WriteOptions,
826 ) -> Result<(), Error> {
827 let key = key.as_ref();
828 unsafe {
829 ffi_try!(ffi::rocksdb_transactiondb_delete(
830 self.inner,
831 writeopts.inner,
832 key.as_ptr() as *const c_char,
833 key.len() as size_t,
834 ));
835 }
836 Ok(())
837 }
838
839 pub fn delete_cf_opt<K: AsRef<[u8]>>(
840 &self,
841 cf: &impl AsColumnFamilyRef,
842 key: K,
843 writeopts: &WriteOptions,
844 ) -> Result<(), Error> {
845 let key = key.as_ref();
846 unsafe {
847 ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
848 self.inner,
849 writeopts.inner,
850 cf.inner(),
851 key.as_ptr() as *const c_char,
852 key.len() as size_t,
853 ));
854 }
855 Ok(())
856 }
857
858 pub fn iterator<'a: 'b, 'b>(
859 &'a self,
860 mode: IteratorMode,
861 ) -> DBIteratorWithThreadMode<'b, Self> {
862 let readopts = ReadOptions::default();
863 self.iterator_opt(mode, readopts)
864 }
865
866 pub fn iterator_opt<'a: 'b, 'b>(
867 &'a self,
868 mode: IteratorMode,
869 readopts: ReadOptions,
870 ) -> DBIteratorWithThreadMode<'b, Self> {
871 DBIteratorWithThreadMode::new(self, readopts, mode)
872 }
873
874 pub fn iterator_cf_opt<'a: 'b, 'b>(
877 &'a self,
878 cf_handle: &impl AsColumnFamilyRef,
879 readopts: ReadOptions,
880 mode: IteratorMode,
881 ) -> DBIteratorWithThreadMode<'b, Self> {
882 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
883 }
884
885 pub fn full_iterator<'a: 'b, 'b>(
889 &'a self,
890 mode: IteratorMode,
891 ) -> DBIteratorWithThreadMode<'b, Self> {
892 let mut opts = ReadOptions::default();
893 opts.set_total_order_seek(true);
894 DBIteratorWithThreadMode::new(self, opts, mode)
895 }
896
897 pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
898 &'a self,
899 prefix: P,
900 ) -> DBIteratorWithThreadMode<'b, Self> {
901 let mut opts = ReadOptions::default();
902 opts.set_prefix_same_as_start(true);
903 DBIteratorWithThreadMode::new(
904 self,
905 opts,
906 IteratorMode::From(prefix.as_ref(), Direction::Forward),
907 )
908 }
909
910 pub fn iterator_cf<'a: 'b, 'b>(
911 &'a self,
912 cf_handle: &impl AsColumnFamilyRef,
913 mode: IteratorMode,
914 ) -> DBIteratorWithThreadMode<'b, Self> {
915 let opts = ReadOptions::default();
916 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
917 }
918
919 pub fn full_iterator_cf<'a: 'b, 'b>(
920 &'a self,
921 cf_handle: &impl AsColumnFamilyRef,
922 mode: IteratorMode,
923 ) -> DBIteratorWithThreadMode<'b, Self> {
924 let mut opts = ReadOptions::default();
925 opts.set_total_order_seek(true);
926 DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts, mode)
927 }
928
929 pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
930 &'a self,
931 cf_handle: &impl AsColumnFamilyRef,
932 prefix: P,
933 ) -> DBIteratorWithThreadMode<'a, Self> {
934 let mut opts = ReadOptions::default();
935 opts.set_prefix_same_as_start(true);
936 DBIteratorWithThreadMode::<'a, Self>::new_cf(
937 self,
938 cf_handle.inner(),
939 opts,
940 IteratorMode::From(prefix.as_ref(), Direction::Forward),
941 )
942 }
943
944 pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
946 let opts = ReadOptions::default();
947 DBRawIteratorWithThreadMode::new(self, opts)
948 }
949
950 pub fn raw_iterator_cf<'a: 'b, 'b>(
952 &'a self,
953 cf_handle: &impl AsColumnFamilyRef,
954 ) -> DBRawIteratorWithThreadMode<'b, Self> {
955 let opts = ReadOptions::default();
956 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), opts)
957 }
958
959 pub fn raw_iterator_opt<'a: 'b, 'b>(
961 &'a self,
962 readopts: ReadOptions,
963 ) -> DBRawIteratorWithThreadMode<'b, Self> {
964 DBRawIteratorWithThreadMode::new(self, readopts)
965 }
966
967 pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
969 &'a self,
970 cf_handle: &impl AsColumnFamilyRef,
971 readopts: ReadOptions,
972 ) -> DBRawIteratorWithThreadMode<'b, Self> {
973 DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
974 }
975
976 pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
977 SnapshotWithThreadMode::<Self>::new(self)
978 }
979
980 fn drop_column_family<C>(
981 &self,
982 cf_inner: *mut ffi::rocksdb_column_family_handle_t,
983 _cf: C,
984 ) -> Result<(), Error> {
985 unsafe {
986 ffi_try!(ffi::rocksdb_drop_column_family(
988 self.inner as *mut ffi::rocksdb_t,
989 cf_inner
990 ));
991 }
992 Ok(())
995 }
996}
997
998impl TransactionDB<SingleThreaded> {
999 pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
1001 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1002 self.cfs
1003 .cfs
1004 .insert(name.as_ref().to_string(), ColumnFamily { inner });
1005 Ok(())
1006 }
1007
1008 pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
1010 self.cfs.cfs.get(name)
1011 }
1012
1013 pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
1015 match self.cfs.cfs.remove(name) {
1016 Some(cf) => self.drop_column_family(cf.inner, cf),
1017 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1018 }
1019 }
1020}
1021
1022impl TransactionDB<MultiThreaded> {
1023 pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
1025 let mut cfs = self.cfs.cfs.write();
1028 let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
1029 cfs.insert(
1030 name.as_ref().to_string(),
1031 Arc::new(UnboundColumnFamily { inner }),
1032 );
1033 Ok(())
1034 }
1035
1036 pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
1038 self.cfs
1039 .cfs
1040 .read()
1041 .get(name)
1042 .cloned()
1043 .map(UnboundColumnFamily::bound_column_family)
1044 }
1045
1046 pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
1049 match self.cfs.cfs.write().remove(name) {
1050 Some(cf) => self.drop_column_family(cf.inner, cf),
1051 _ => Err(Error::new(format!("Invalid column family: {name}"))),
1052 }
1053 }
1054
1055 fn property_value_impl<R>(
1064 name: impl CStrLike,
1065 get_property: impl FnOnce(*const c_char) -> *mut c_char,
1066 parse: impl FnOnce(&str) -> Result<R, Error>,
1067 ) -> Result<Option<R>, Error> {
1068 let value = match name.bake() {
1069 Ok(prop_name) => get_property(prop_name.as_ptr()),
1070 Err(e) => {
1071 return Err(Error::new(format!(
1072 "Failed to convert property name to CString: {e}"
1073 )));
1074 }
1075 };
1076 if value.is_null() {
1077 return Ok(None);
1078 }
1079 let result = match unsafe { CStr::from_ptr(value) }.to_str() {
1080 Ok(s) => parse(s).map(|value| Some(value)),
1081 Err(e) => Err(Error::new(format!(
1082 "Failed to convert property value to string: {e}"
1083 ))),
1084 };
1085 unsafe {
1086 ffi::rocksdb_free(value as *mut c_void);
1087 }
1088 result
1089 }
1090
1091 pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
1096 Self::property_value_impl(
1097 name,
1098 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1099 |str_value| Ok(str_value.to_owned()),
1100 )
1101 }
1102
1103 fn parse_property_int_value(value: &str) -> Result<u64, Error> {
1104 value.parse::<u64>().map_err(|err| {
1105 Error::new(format!(
1106 "Failed to convert property value {value} to int: {err}"
1107 ))
1108 })
1109 }
1110
1111 pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
1116 Self::property_value_impl(
1117 name,
1118 |prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
1119 Self::parse_property_int_value,
1120 )
1121 }
1122}
1123
1124impl<T: ThreadMode> Drop for TransactionDB<T> {
1125 fn drop(&mut self) {
1126 unsafe {
1127 self.prepared_transactions().clear();
1128 self.cfs.drop_all_cfs_internal();
1129 ffi::rocksdb_transactiondb_close(self.inner);
1130 }
1131 }
1132}