1use crate::{
2 ColumnFamily, DBRawIterator, Error, Options, ReadOptions, Transaction, WriteOptions,
3 db_options::OptionsMustOutliveDB,
4 db_vector::DBVector,
5 ffi_util::to_cstring,
6 handle::{ConstHandle, Handle},
7 open_raw::{OpenRaw, OpenRawFFI},
8 ops::*,
9 write_batch::WriteBatch,
10};
11
12use crate::ffi;
13use libc::{c_char, c_uchar, size_t};
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::Path;
17use std::path::PathBuf;
18use std::ptr;
19
20pub struct TransactionDB {
22 inner: *mut ffi::rocksdb_transactiondb_t,
23 path: PathBuf,
24 cfs: BTreeMap<String, ColumnFamily>,
25 _outlive: Vec<OptionsMustOutliveDB>,
26}
27
28impl TransactionDB {
29 pub fn path(&self) -> &Path {
30 self.path.as_path()
31 }
32}
33
34impl Handle<ffi::rocksdb_transactiondb_t> for TransactionDB {
35 fn handle(&self) -> *mut ffi::rocksdb_transactiondb_t {
36 self.inner
37 }
38}
39
40impl Open for TransactionDB {}
41
42impl OpenCF for TransactionDB {}
43
44impl OpenRaw for TransactionDB {
45 type Pointer = ffi::rocksdb_transactiondb_t;
46 type Descriptor = TransactionDBOptions;
47
48 fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
49 let pointer = unsafe {
50 if input.num_column_families <= 0 {
51 ffi_try!(ffi::rocksdb_transactiondb_open(
52 input.options,
53 input.open_descriptor.inner,
54 input.path,
55 ))
56 } else {
57 ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
58 input.options,
59 input.open_descriptor.inner,
60 input.path,
61 input.num_column_families,
62 input.column_family_names,
63 input.column_family_options,
64 input.column_family_handles,
65 ))
66 }
67 };
68
69 Ok(pointer)
70 }
71
72 fn build<I>(
73 path: PathBuf,
74 _open_descriptor: Self::Descriptor,
75 pointer: *mut Self::Pointer,
76 column_families: I,
77 outlive: Vec<OptionsMustOutliveDB>,
78 ) -> Result<Self, Error>
79 where
80 I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
81 {
82 let cfs: BTreeMap<_, _> = column_families
83 .into_iter()
84 .map(|(k, h)| (k, ColumnFamily::new(h)))
85 .collect();
86 Ok(TransactionDB {
87 inner: pointer,
88 path,
89 cfs,
90 _outlive: outlive,
91 })
92 }
93}
94
95impl GetColumnFamilys for TransactionDB {
96 fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
97 &self.cfs
98 }
99 fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
100 &mut self.cfs
101 }
102}
103
104impl Read for TransactionDB {}
105impl Write for TransactionDB {}
106
107unsafe impl Send for TransactionDB {}
108unsafe impl Sync for TransactionDB {}
109
110impl TransactionBegin for TransactionDB {
111 type WriteOptions = WriteOptions;
112 type TransactionOptions = TransactionOptions;
113 fn transaction(
114 &self,
115 write_options: &WriteOptions,
116 tx_options: &TransactionOptions,
117 ) -> Transaction<'_, TransactionDB> {
118 unsafe {
119 let inner = ffi::rocksdb_transaction_begin(
120 self.inner,
121 write_options.handle(),
122 tx_options.inner,
123 ptr::null_mut(),
124 );
125 Transaction::new(inner)
126 }
127 }
128}
129
130impl Iterate for TransactionDB {
131 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
132 unsafe {
133 DBRawIterator {
134 inner: ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.handle()),
135 db: PhantomData,
136 }
137 }
138 }
139}
140
141impl IterateCF for TransactionDB {
142 fn get_raw_iter_cf<'a: 'b, 'b>(
143 &'a self,
144 cf_handle: &ColumnFamily,
145 readopts: &ReadOptions,
146 ) -> Result<DBRawIterator<'b>, Error> {
147 unsafe {
148 Ok(DBRawIterator {
149 inner: ffi::rocksdb_transactiondb_create_iterator_cf(
150 self.inner,
151 readopts.handle(),
152 cf_handle.handle(),
153 ),
154 db: PhantomData,
155 })
156 }
157 }
158}
159
160impl Drop for TransactionDB {
161 fn drop(&mut self) {
162 unsafe {
163 ffi::rocksdb_transactiondb_close(self.inner);
164 }
165 }
166}
167
168pub struct TransactionDBOptions {
169 inner: *mut ffi::rocksdb_transactiondb_options_t,
170}
171
172impl TransactionDBOptions {
173 pub fn new() -> TransactionDBOptions {
175 unsafe {
176 let inner = ffi::rocksdb_transactiondb_options_create();
177 TransactionDBOptions { inner }
178 }
179 }
180
181 pub fn set_default_lock_timeout(&self, default_lock_timeout: i64) {
182 unsafe {
183 ffi::rocksdb_transactiondb_options_set_default_lock_timeout(
184 self.inner,
185 default_lock_timeout,
186 )
187 }
188 }
189
190 pub fn set_max_num_locks(&self, max_num_locks: i64) {
191 unsafe { ffi::rocksdb_transactiondb_options_set_max_num_locks(self.inner, max_num_locks) }
192 }
193
194 pub fn set_num_stripes(&self, num_stripes: usize) {
195 unsafe { ffi::rocksdb_transactiondb_options_set_num_stripes(self.inner, num_stripes) }
196 }
197
198 pub fn set_transaction_lock_timeout(&self, txn_lock_timeout: i64) {
199 unsafe {
200 ffi::rocksdb_transactiondb_options_set_transaction_lock_timeout(
201 self.inner,
202 txn_lock_timeout,
203 )
204 }
205 }
206}
207
208impl Drop for TransactionDBOptions {
209 fn drop(&mut self) {
210 unsafe {
211 ffi::rocksdb_transactiondb_options_destroy(self.inner);
212 }
213 }
214}
215
216impl Default for TransactionDBOptions {
217 fn default() -> TransactionDBOptions {
218 TransactionDBOptions::new()
219 }
220}
221
222pub struct TransactionOptions {
223 inner: *mut ffi::rocksdb_transaction_options_t,
224}
225
226impl TransactionOptions {
227 pub fn new() -> TransactionOptions {
229 unsafe {
230 let inner = ffi::rocksdb_transaction_options_create();
231 TransactionOptions { inner }
232 }
233 }
234
235 pub fn set_deadlock_detect(&self, deadlock_detect: bool) {
236 unsafe {
237 ffi::rocksdb_transaction_options_set_deadlock_detect(
238 self.inner,
239 deadlock_detect as c_uchar,
240 )
241 }
242 }
243
244 pub fn set_deadlock_detect_depth(&self, depth: i64) {
245 unsafe { ffi::rocksdb_transaction_options_set_deadlock_detect_depth(self.inner, depth) }
246 }
247
248 pub fn set_expiration(&self, expiration: i64) {
249 unsafe { ffi::rocksdb_transaction_options_set_expiration(self.inner, expiration) }
250 }
251
252 pub fn set_lock_timeout(&self, lock_timeout: i64) {
253 unsafe { ffi::rocksdb_transaction_options_set_lock_timeout(self.inner, lock_timeout) }
254 }
255
256 pub fn set_max_write_batch_size(&self, size: usize) {
257 unsafe { ffi::rocksdb_transaction_options_set_max_write_batch_size(self.inner, size) }
258 }
259
260 pub fn set_snapshot(&mut self, set_snapshot: bool) {
261 unsafe {
262 ffi::rocksdb_transaction_options_set_set_snapshot(self.inner, set_snapshot as c_uchar);
263 }
264 }
265}
266
267impl Drop for TransactionOptions {
268 fn drop(&mut self) {
269 unsafe {
270 ffi::rocksdb_transaction_options_destroy(self.inner);
271 }
272 }
273}
274
275impl Default for TransactionOptions {
276 fn default() -> TransactionOptions {
277 TransactionOptions::new()
278 }
279}
280
281impl CreateCheckpointObject for TransactionDB {
282 unsafe fn create_checkpoint_object_raw(&self) -> Result<*mut ffi::rocksdb_checkpoint_t, Error> {
283 unsafe {
284 Ok(ffi_try!(
285 ffi::rocksdb_transactiondb_checkpoint_object_create(self.inner,)
286 ))
287 }
288 }
289}
290
291impl GetCF<ReadOptions> for TransactionDB {
292 fn get_cf_full<K: AsRef<[u8]>>(
293 &self,
294 cf: Option<&ColumnFamily>,
295 key: K,
296 readopts: Option<&ReadOptions>,
297 ) -> Result<Option<DBVector>, Error> {
298 let mut default_readopts = None;
299
300 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
301
302 let key = key.as_ref();
303 let key_ptr = key.as_ptr() as *const c_char;
304 let key_len = key.len() as size_t;
305
306 unsafe {
307 let mut val_len: size_t = 0;
308
309 let val = match cf {
310 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_get_cf(
311 self.handle(),
312 ro_handle,
313 cf.handle(),
314 key_ptr,
315 key_len,
316 &mut val_len,
317 )),
318 None => ffi_try!(ffi::rocksdb_transactiondb_get(
319 self.handle(),
320 ro_handle,
321 key_ptr,
322 key_len,
323 &mut val_len,
324 )),
325 } as *mut u8;
326
327 if val.is_null() {
328 Ok(None)
329 } else {
330 Ok(Some(DBVector::from_c(val, val_len)))
331 }
332 }
333 }
334}
335
336impl MultiGet<ReadOptions> for TransactionDB {
337 fn multi_get_full<K, I>(
338 &self,
339 keys: I,
340 readopts: Option<&ReadOptions>,
341 ) -> Vec<Result<Option<DBVector>, Error>>
342 where
343 K: AsRef<[u8]>,
344 I: IntoIterator<Item = K>,
345 {
346 let mut default_readopts = None;
347 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
348 Ok(ro) => ro,
349 Err(e) => {
350 let key_count = keys.into_iter().count();
351
352 return vec![e; key_count]
353 .iter()
354 .map(|e| Err(e.to_owned()))
355 .collect();
356 }
357 };
358
359 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
360 .into_iter()
361 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
362 .unzip();
363 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
364
365 let mut values = vec![ptr::null_mut(); keys.len()];
366 let mut values_sizes = vec![0_usize; keys.len()];
367 let mut errors = vec![ptr::null_mut(); keys.len()];
368 unsafe {
369 ffi::rocksdb_transactiondb_multi_get(
370 self.inner,
371 ro_handle,
372 ptr_keys.len(),
373 ptr_keys.as_ptr(),
374 keys_sizes.as_ptr(),
375 values.as_mut_ptr(),
376 values_sizes.as_mut_ptr(),
377 errors.as_mut_ptr(),
378 );
379 }
380
381 convert_values(values, values_sizes, errors)
382 }
383}
384
385impl MultiGetCF<ReadOptions> for TransactionDB {
386 fn multi_get_cf_full<'a, K, I>(
387 &self,
388 keys: I,
389 readopts: Option<&ReadOptions>,
390 ) -> Vec<Result<Option<DBVector>, Error>>
391 where
392 K: AsRef<[u8]>,
393 I: IntoIterator<Item = (&'a ColumnFamily, K)>,
394 {
395 let mut default_readopts = None;
396 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
397 Ok(ro) => ro,
398 Err(e) => {
399 let key_count = keys.into_iter().count();
400
401 return vec![e; key_count]
402 .iter()
403 .map(|e| Err(e.to_owned()))
404 .collect();
405 }
406 };
407 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
408 .into_iter()
409 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
410 .unzip();
411 let ptr_keys: Vec<_> = cfs_and_keys
412 .iter()
413 .map(|(_, k)| k.as_ptr() as *const c_char)
414 .collect();
415 let ptr_cfs: Vec<_> = cfs_and_keys
416 .iter()
417 .map(|(c, _)| c.inner as *const _)
418 .collect();
419
420 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
421 let mut values_sizes = vec![0_usize; ptr_keys.len()];
422 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
423 unsafe {
424 ffi::rocksdb_transactiondb_multi_get_cf(
425 self.inner,
426 ro_handle,
427 ptr_cfs.as_ptr(),
428 ptr_keys.len(),
429 ptr_keys.as_ptr(),
430 keys_sizes.as_ptr(),
431 values.as_mut_ptr(),
432 values_sizes.as_mut_ptr(),
433 errors.as_mut_ptr(),
434 );
435 }
436
437 convert_values(values, values_sizes, errors)
438 }
439}
440
441impl PutCF<WriteOptions> for TransactionDB {
442 fn put_cf_full<K, V>(
443 &self,
444 cf: Option<&ColumnFamily>,
445 key: K,
446 value: V,
447 writeopts: Option<&WriteOptions>,
448 ) -> Result<(), Error>
449 where
450 K: AsRef<[u8]>,
451 V: AsRef<[u8]>,
452 {
453 let mut default_writeopts = None;
454
455 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
456
457 let key = key.as_ref();
458 let value = value.as_ref();
459 let key_ptr = key.as_ptr() as *const c_char;
460 let key_len = key.len() as size_t;
461 let val_ptr = value.as_ptr() as *const c_char;
462 let val_len = value.len() as size_t;
463
464 unsafe {
465 match cf {
466 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_put_cf(
467 self.handle(),
468 wo_handle,
469 cf.handle(),
470 key_ptr,
471 key_len,
472 val_ptr,
473 val_len,
474 )),
475 None => ffi_try!(ffi::rocksdb_transactiondb_put(
476 self.handle(),
477 wo_handle,
478 key_ptr,
479 key_len,
480 val_ptr,
481 val_len,
482 )),
483 }
484
485 Ok(())
486 }
487 }
488}
489
490impl DeleteCF<WriteOptions> for TransactionDB {
491 fn delete_cf_full<K>(
492 &self,
493 cf: Option<&ColumnFamily>,
494 key: K,
495 writeopts: Option<&WriteOptions>,
496 ) -> Result<(), Error>
497 where
498 K: AsRef<[u8]>,
499 {
500 let mut default_writeopts = None;
501
502 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
503
504 let key = key.as_ref();
505 let key_ptr = key.as_ptr() as *const c_char;
506 let key_len = key.len() as size_t;
507
508 unsafe {
509 match cf {
510 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
511 self.handle(),
512 wo_handle,
513 cf.handle(),
514 key_ptr,
515 key_len,
516 )),
517 None => ffi_try!(ffi::rocksdb_transactiondb_delete(
518 self.handle(),
519 wo_handle,
520 key_ptr,
521 key_len,
522 )),
523 }
524
525 Ok(())
526 }
527 }
528}
529
530impl MergeCF<WriteOptions> for TransactionDB {
531 fn merge_cf_full<K, V>(
532 &self,
533 cf: Option<&ColumnFamily>,
534 key: K,
535 value: V,
536 writeopts: Option<&WriteOptions>,
537 ) -> Result<(), Error>
538 where
539 K: AsRef<[u8]>,
540 V: AsRef<[u8]>,
541 {
542 let mut default_writeopts = None;
543
544 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
545
546 let key = key.as_ref();
547 let value = value.as_ref();
548 let key_ptr = key.as_ptr() as *const c_char;
549 let key_len = key.len() as size_t;
550 let val_ptr = value.as_ptr() as *const c_char;
551 let val_len = value.len() as size_t;
552
553 unsafe {
554 match cf {
555 Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
556 self.handle(),
557 wo_handle,
558 cf.handle(),
559 key_ptr,
560 key_len,
561 val_ptr,
562 val_len,
563 )),
564 None => ffi_try!(ffi::rocksdb_transactiondb_merge(
565 self.handle(),
566 wo_handle,
567 key_ptr,
568 key_len,
569 val_ptr,
570 val_len,
571 )),
572 }
573
574 Ok(())
575 }
576 }
577}
578
579impl CreateCF for TransactionDB {
580 fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
581 let cname = to_cstring(
582 name.as_ref(),
583 "Failed to convert path to CString when opening rocksdb",
584 )?;
585 unsafe {
586 let cf_handle = ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
587 self.handle(),
588 opts.const_handle(),
589 cname.as_ptr(),
590 ));
591
592 self.get_mut_cfs()
593 .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
594 };
595 Ok(())
596 }
597}
598
599impl TransactionDB {
600 pub fn snapshot(&self) -> Snapshot<'_> {
601 let snapshot = unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) };
602 Snapshot {
603 db: self,
604 inner: snapshot,
605 }
606 }
607}
608
609pub struct Snapshot<'a> {
610 db: &'a TransactionDB,
611 inner: *const ffi::rocksdb_snapshot_t,
612}
613
614impl ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'_> {
615 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
616 self.inner
617 }
618}
619
620impl Read for Snapshot<'_> {}
621
622impl GetCF<ReadOptions> for Snapshot<'_> {
623 fn get_cf_full<K: AsRef<[u8]>>(
624 &self,
625 cf: Option<&ColumnFamily>,
626 key: K,
627 readopts: Option<&ReadOptions>,
628 ) -> Result<Option<DBVector>, Error> {
629 let mut ro = readopts.cloned().unwrap_or_default();
630 ro.set_snapshot(self);
631
632 self.db.get_cf_full(cf, key, Some(&ro))
633 }
634}
635
636impl MultiGet<ReadOptions> for Snapshot<'_> {
637 fn multi_get_full<K, I>(
638 &self,
639 keys: I,
640 readopts: Option<&ReadOptions>,
641 ) -> Vec<Result<Option<DBVector>, Error>>
642 where
643 K: AsRef<[u8]>,
644 I: IntoIterator<Item = K>,
645 {
646 let mut ro = readopts.cloned().unwrap_or_default();
647 ro.set_snapshot(self);
648
649 self.db.multi_get_full(keys, Some(&ro))
650 }
651}
652
653impl MultiGetCF<ReadOptions> for Snapshot<'_> {
654 fn multi_get_cf_full<'m, K, I>(
655 &self,
656 keys: I,
657 readopts: Option<&ReadOptions>,
658 ) -> Vec<Result<Option<DBVector>, Error>>
659 where
660 K: AsRef<[u8]>,
661 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
662 {
663 let mut ro = readopts.cloned().unwrap_or_default();
664 ro.set_snapshot(self);
665
666 self.db.multi_get_cf_full(keys, Some(&ro))
667 }
668}
669
670impl Drop for Snapshot<'_> {
671 fn drop(&mut self) {
672 unsafe {
673 ffi::rocksdb_transactiondb_release_snapshot(self.db.inner, self.inner);
674 }
675 }
676}
677
678impl Iterate for Snapshot<'_> {
679 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
680 let mut ro = readopts.to_owned();
681 ro.set_snapshot(self);
682 self.db.get_raw_iter(&ro)
683 }
684}
685
686impl IterateCF for Snapshot<'_> {
687 fn get_raw_iter_cf<'a: 'b, 'b>(
688 &'a self,
689 cf_handle: &ColumnFamily,
690 readopts: &ReadOptions,
691 ) -> Result<DBRawIterator<'b>, Error> {
692 let mut ro = readopts.to_owned();
693 ro.set_snapshot(self);
694 self.db.get_raw_iter_cf(cf_handle, &ro)
695 }
696}
697
698impl WriteOps for TransactionDB {
699 fn write_full(
700 &self,
701 batch: &WriteBatch,
702 writeopts: Option<&WriteOptions>,
703 ) -> Result<(), Error> {
704 let mut default_writeopts = None;
705
706 let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
707
708 unsafe {
709 ffi_try!(ffi::rocksdb_transactiondb_write(
710 self.handle(),
711 wo_handle,
712 batch.handle(),
713 ));
714 Ok(())
715 }
716 }
717}