ckb_rocksdb/
transaction_db.rs

1use crate::{
2    ColumnFamily, DBRawIterator, Error, Options, ReadOptions, Transaction, WriteOptions,
3    db_options::OptionsMustOutliveDB,
4    db_vector::DBVector,
5    ffi_util::to_cstring,
6    handle::{ConstHandle, Handle},
7    open_raw::{OpenRaw, OpenRawFFI},
8    ops::*,
9    write_batch::WriteBatch,
10};
11
12use crate::ffi;
13use libc::{c_char, c_uchar, size_t};
14use std::collections::BTreeMap;
15use std::marker::PhantomData;
16use std::path::Path;
17use std::path::PathBuf;
18use std::ptr;
19
/// A transaction database.
///
/// Wraps a raw `rocksdb_transactiondb_t` pointer. The `Drop` impl in this
/// file closes the database with `rocksdb_transactiondb_close`.
pub struct TransactionDB {
    /// Raw handle handed to the `rocksdb_transactiondb_*` FFI calls.
    inner: *mut ffi::rocksdb_transactiondb_t,
    /// Path stored at build time; exposed through `path()`.
    path: PathBuf,
    /// Column families keyed by name.
    cfs: BTreeMap<String, ColumnFamily>,
    // Never read in this file; stored so these values live as long as the
    // database does (presumably options RocksDB still references - confirm).
    _outlive: Vec<OptionsMustOutliveDB>,
}
27
28impl TransactionDB {
29    pub fn path(&self) -> &Path {
30        self.path.as_path()
31    }
32}
33
/// Exposes the raw `rocksdb_transactiondb_t` pointer to code that is generic
/// over `Handle`.
impl Handle<ffi::rocksdb_transactiondb_t> for TransactionDB {
    /// Returns the stored raw pointer; ownership stays with `self`.
    fn handle(&self) -> *mut ffi::rocksdb_transactiondb_t {
        self.inner
    }
}
39
// Empty impl: `TransactionDB` overrides nothing and uses whatever `Open` provides.
impl Open for TransactionDB {}

// Empty impl: `TransactionDB` overrides nothing and uses whatever `OpenCF` provides.
impl OpenCF for TransactionDB {}
43
44impl OpenRaw for TransactionDB {
45    type Pointer = ffi::rocksdb_transactiondb_t;
46    type Descriptor = TransactionDBOptions;
47
48    fn open_ffi(input: OpenRawFFI<'_, Self::Descriptor>) -> Result<*mut Self::Pointer, Error> {
49        let pointer = unsafe {
50            if input.num_column_families <= 0 {
51                ffi_try!(ffi::rocksdb_transactiondb_open(
52                    input.options,
53                    input.open_descriptor.inner,
54                    input.path,
55                ))
56            } else {
57                ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
58                    input.options,
59                    input.open_descriptor.inner,
60                    input.path,
61                    input.num_column_families,
62                    input.column_family_names,
63                    input.column_family_options,
64                    input.column_family_handles,
65                ))
66            }
67        };
68
69        Ok(pointer)
70    }
71
72    fn build<I>(
73        path: PathBuf,
74        _open_descriptor: Self::Descriptor,
75        pointer: *mut Self::Pointer,
76        column_families: I,
77        outlive: Vec<OptionsMustOutliveDB>,
78    ) -> Result<Self, Error>
79    where
80        I: IntoIterator<Item = (String, *mut ffi::rocksdb_column_family_handle_t)>,
81    {
82        let cfs: BTreeMap<_, _> = column_families
83            .into_iter()
84            .map(|(k, h)| (k, ColumnFamily::new(h)))
85            .collect();
86        Ok(TransactionDB {
87            inner: pointer,
88            path,
89            cfs,
90            _outlive: outlive,
91        })
92    }
93}
94
/// Gives trait-generic code access to the name -> `ColumnFamily` map.
impl GetColumnFamilys for TransactionDB {
    /// Shared view of the column-family map.
    fn get_cfs(&self) -> &BTreeMap<String, ColumnFamily> {
        &self.cfs
    }
    /// Mutable view of the column-family map (used by `create_cf` in this file).
    fn get_mut_cfs(&mut self) -> &mut BTreeMap<String, ColumnFamily> {
        &mut self.cfs
    }
}
103
// Empty impls: no items are overridden for `TransactionDB`.
impl Read for TransactionDB {}
impl Write for TransactionDB {}
106
// The raw pointer field prevents the compiler from deriving these automatically.
// NOTE(review): these impls assert that the wrapped handle may be moved to and
// shared between threads; presumably this relies on RocksDB's own internal
// synchronization - confirm against the RocksDB documentation.
unsafe impl Send for TransactionDB {}
unsafe impl Sync for TransactionDB {}
109
110impl TransactionBegin for TransactionDB {
111    type WriteOptions = WriteOptions;
112    type TransactionOptions = TransactionOptions;
113    fn transaction(
114        &self,
115        write_options: &WriteOptions,
116        tx_options: &TransactionOptions,
117    ) -> Transaction<'_, TransactionDB> {
118        unsafe {
119            let inner = ffi::rocksdb_transaction_begin(
120                self.inner,
121                write_options.handle(),
122                tx_options.inner,
123                ptr::null_mut(),
124            );
125            Transaction::new(inner)
126        }
127    }
128}
129
130impl Iterate for TransactionDB {
131    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
132        unsafe {
133            DBRawIterator {
134                inner: ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.handle()),
135                db: PhantomData,
136            }
137        }
138    }
139}
140
141impl IterateCF for TransactionDB {
142    fn get_raw_iter_cf<'a: 'b, 'b>(
143        &'a self,
144        cf_handle: &ColumnFamily,
145        readopts: &ReadOptions,
146    ) -> Result<DBRawIterator<'b>, Error> {
147        unsafe {
148            Ok(DBRawIterator {
149                inner: ffi::rocksdb_transactiondb_create_iterator_cf(
150                    self.inner,
151                    readopts.handle(),
152                    cf_handle.handle(),
153                ),
154                db: PhantomData,
155            })
156        }
157    }
158}
159
/// Closes the database when the wrapper goes out of scope.
impl Drop for TransactionDB {
    fn drop(&mut self) {
        // NOTE(review): the handles held in `self.cfs` are not released here;
        // confirm that `ColumnFamily` (or another code path) destroys them.
        unsafe {
            ffi::rocksdb_transactiondb_close(self.inner);
        }
    }
}
167
/// Owning wrapper around a `rocksdb_transactiondb_options_t`.
///
/// Used as the `Descriptor` when opening a `TransactionDB`.
pub struct TransactionDBOptions {
    /// Raw options object; destroyed in this type's `Drop` impl.
    inner: *mut ffi::rocksdb_transactiondb_options_t,
}
171
172impl TransactionDBOptions {
173    /// Create new transaction options
174    pub fn new() -> TransactionDBOptions {
175        unsafe {
176            let inner = ffi::rocksdb_transactiondb_options_create();
177            TransactionDBOptions { inner }
178        }
179    }
180
181    pub fn set_default_lock_timeout(&self, default_lock_timeout: i64) {
182        unsafe {
183            ffi::rocksdb_transactiondb_options_set_default_lock_timeout(
184                self.inner,
185                default_lock_timeout,
186            )
187        }
188    }
189
190    pub fn set_max_num_locks(&self, max_num_locks: i64) {
191        unsafe { ffi::rocksdb_transactiondb_options_set_max_num_locks(self.inner, max_num_locks) }
192    }
193
194    pub fn set_num_stripes(&self, num_stripes: usize) {
195        unsafe { ffi::rocksdb_transactiondb_options_set_num_stripes(self.inner, num_stripes) }
196    }
197
198    pub fn set_transaction_lock_timeout(&self, txn_lock_timeout: i64) {
199        unsafe {
200            ffi::rocksdb_transactiondb_options_set_transaction_lock_timeout(
201                self.inner,
202                txn_lock_timeout,
203            )
204        }
205    }
206}
207
/// Releases the underlying C options object.
impl Drop for TransactionDBOptions {
    fn drop(&mut self) {
        // Pairs with `rocksdb_transactiondb_options_create` called in `new`.
        unsafe {
            ffi::rocksdb_transactiondb_options_destroy(self.inner);
        }
    }
}
215
216impl Default for TransactionDBOptions {
217    fn default() -> TransactionDBOptions {
218        TransactionDBOptions::new()
219    }
220}
221
/// Owning wrapper around a `rocksdb_transaction_options_t`.
///
/// Passed to `TransactionDB::transaction` when beginning a transaction.
pub struct TransactionOptions {
    /// Raw options object; destroyed in this type's `Drop` impl.
    inner: *mut ffi::rocksdb_transaction_options_t,
}
225
226impl TransactionOptions {
227    /// Create new transaction options
228    pub fn new() -> TransactionOptions {
229        unsafe {
230            let inner = ffi::rocksdb_transaction_options_create();
231            TransactionOptions { inner }
232        }
233    }
234
235    pub fn set_deadlock_detect(&self, deadlock_detect: bool) {
236        unsafe {
237            ffi::rocksdb_transaction_options_set_deadlock_detect(
238                self.inner,
239                deadlock_detect as c_uchar,
240            )
241        }
242    }
243
244    pub fn set_deadlock_detect_depth(&self, depth: i64) {
245        unsafe { ffi::rocksdb_transaction_options_set_deadlock_detect_depth(self.inner, depth) }
246    }
247
248    pub fn set_expiration(&self, expiration: i64) {
249        unsafe { ffi::rocksdb_transaction_options_set_expiration(self.inner, expiration) }
250    }
251
252    pub fn set_lock_timeout(&self, lock_timeout: i64) {
253        unsafe { ffi::rocksdb_transaction_options_set_lock_timeout(self.inner, lock_timeout) }
254    }
255
256    pub fn set_max_write_batch_size(&self, size: usize) {
257        unsafe { ffi::rocksdb_transaction_options_set_max_write_batch_size(self.inner, size) }
258    }
259
260    pub fn set_snapshot(&mut self, set_snapshot: bool) {
261        unsafe {
262            ffi::rocksdb_transaction_options_set_set_snapshot(self.inner, set_snapshot as c_uchar);
263        }
264    }
265}
266
/// Releases the underlying C options object.
impl Drop for TransactionOptions {
    fn drop(&mut self) {
        // Pairs with `rocksdb_transaction_options_create` called in `new`.
        unsafe {
            ffi::rocksdb_transaction_options_destroy(self.inner);
        }
    }
}
274
275impl Default for TransactionOptions {
276    fn default() -> TransactionOptions {
277        TransactionOptions::new()
278    }
279}
280
281impl CreateCheckpointObject for TransactionDB {
282    unsafe fn create_checkpoint_object_raw(&self) -> Result<*mut ffi::rocksdb_checkpoint_t, Error> {
283        unsafe {
284            Ok(ffi_try!(
285                ffi::rocksdb_transactiondb_checkpoint_object_create(self.inner,)
286            ))
287        }
288    }
289}
290
291impl GetCF<ReadOptions> for TransactionDB {
292    fn get_cf_full<K: AsRef<[u8]>>(
293        &self,
294        cf: Option<&ColumnFamily>,
295        key: K,
296        readopts: Option<&ReadOptions>,
297    ) -> Result<Option<DBVector>, Error> {
298        let mut default_readopts = None;
299
300        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
301
302        let key = key.as_ref();
303        let key_ptr = key.as_ptr() as *const c_char;
304        let key_len = key.len() as size_t;
305
306        unsafe {
307            let mut val_len: size_t = 0;
308
309            let val = match cf {
310                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_get_cf(
311                    self.handle(),
312                    ro_handle,
313                    cf.handle(),
314                    key_ptr,
315                    key_len,
316                    &mut val_len,
317                )),
318                None => ffi_try!(ffi::rocksdb_transactiondb_get(
319                    self.handle(),
320                    ro_handle,
321                    key_ptr,
322                    key_len,
323                    &mut val_len,
324                )),
325            } as *mut u8;
326
327            if val.is_null() {
328                Ok(None)
329            } else {
330                Ok(Some(DBVector::from_c(val, val_len)))
331            }
332        }
333    }
334}
335
336impl MultiGet<ReadOptions> for TransactionDB {
337    fn multi_get_full<K, I>(
338        &self,
339        keys: I,
340        readopts: Option<&ReadOptions>,
341    ) -> Vec<Result<Option<DBVector>, Error>>
342    where
343        K: AsRef<[u8]>,
344        I: IntoIterator<Item = K>,
345    {
346        let mut default_readopts = None;
347        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
348            Ok(ro) => ro,
349            Err(e) => {
350                let key_count = keys.into_iter().count();
351
352                return vec![e; key_count]
353                    .iter()
354                    .map(|e| Err(e.to_owned()))
355                    .collect();
356            }
357        };
358
359        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
360            .into_iter()
361            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
362            .unzip();
363        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
364
365        let mut values = vec![ptr::null_mut(); keys.len()];
366        let mut values_sizes = vec![0_usize; keys.len()];
367        let mut errors = vec![ptr::null_mut(); keys.len()];
368        unsafe {
369            ffi::rocksdb_transactiondb_multi_get(
370                self.inner,
371                ro_handle,
372                ptr_keys.len(),
373                ptr_keys.as_ptr(),
374                keys_sizes.as_ptr(),
375                values.as_mut_ptr(),
376                values_sizes.as_mut_ptr(),
377                errors.as_mut_ptr(),
378            );
379        }
380
381        convert_values(values, values_sizes, errors)
382    }
383}
384
385impl MultiGetCF<ReadOptions> for TransactionDB {
386    fn multi_get_cf_full<'a, K, I>(
387        &self,
388        keys: I,
389        readopts: Option<&ReadOptions>,
390    ) -> Vec<Result<Option<DBVector>, Error>>
391    where
392        K: AsRef<[u8]>,
393        I: IntoIterator<Item = (&'a ColumnFamily, K)>,
394    {
395        let mut default_readopts = None;
396        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
397            Ok(ro) => ro,
398            Err(e) => {
399                let key_count = keys.into_iter().count();
400
401                return vec![e; key_count]
402                    .iter()
403                    .map(|e| Err(e.to_owned()))
404                    .collect();
405            }
406        };
407        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
408            .into_iter()
409            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
410            .unzip();
411        let ptr_keys: Vec<_> = cfs_and_keys
412            .iter()
413            .map(|(_, k)| k.as_ptr() as *const c_char)
414            .collect();
415        let ptr_cfs: Vec<_> = cfs_and_keys
416            .iter()
417            .map(|(c, _)| c.inner as *const _)
418            .collect();
419
420        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
421        let mut values_sizes = vec![0_usize; ptr_keys.len()];
422        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
423        unsafe {
424            ffi::rocksdb_transactiondb_multi_get_cf(
425                self.inner,
426                ro_handle,
427                ptr_cfs.as_ptr(),
428                ptr_keys.len(),
429                ptr_keys.as_ptr(),
430                keys_sizes.as_ptr(),
431                values.as_mut_ptr(),
432                values_sizes.as_mut_ptr(),
433                errors.as_mut_ptr(),
434            );
435        }
436
437        convert_values(values, values_sizes, errors)
438    }
439}
440
441impl PutCF<WriteOptions> for TransactionDB {
442    fn put_cf_full<K, V>(
443        &self,
444        cf: Option<&ColumnFamily>,
445        key: K,
446        value: V,
447        writeopts: Option<&WriteOptions>,
448    ) -> Result<(), Error>
449    where
450        K: AsRef<[u8]>,
451        V: AsRef<[u8]>,
452    {
453        let mut default_writeopts = None;
454
455        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
456
457        let key = key.as_ref();
458        let value = value.as_ref();
459        let key_ptr = key.as_ptr() as *const c_char;
460        let key_len = key.len() as size_t;
461        let val_ptr = value.as_ptr() as *const c_char;
462        let val_len = value.len() as size_t;
463
464        unsafe {
465            match cf {
466                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_put_cf(
467                    self.handle(),
468                    wo_handle,
469                    cf.handle(),
470                    key_ptr,
471                    key_len,
472                    val_ptr,
473                    val_len,
474                )),
475                None => ffi_try!(ffi::rocksdb_transactiondb_put(
476                    self.handle(),
477                    wo_handle,
478                    key_ptr,
479                    key_len,
480                    val_ptr,
481                    val_len,
482                )),
483            }
484
485            Ok(())
486        }
487    }
488}
489
490impl DeleteCF<WriteOptions> for TransactionDB {
491    fn delete_cf_full<K>(
492        &self,
493        cf: Option<&ColumnFamily>,
494        key: K,
495        writeopts: Option<&WriteOptions>,
496    ) -> Result<(), Error>
497    where
498        K: AsRef<[u8]>,
499    {
500        let mut default_writeopts = None;
501
502        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
503
504        let key = key.as_ref();
505        let key_ptr = key.as_ptr() as *const c_char;
506        let key_len = key.len() as size_t;
507
508        unsafe {
509            match cf {
510                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
511                    self.handle(),
512                    wo_handle,
513                    cf.handle(),
514                    key_ptr,
515                    key_len,
516                )),
517                None => ffi_try!(ffi::rocksdb_transactiondb_delete(
518                    self.handle(),
519                    wo_handle,
520                    key_ptr,
521                    key_len,
522                )),
523            }
524
525            Ok(())
526        }
527    }
528}
529
530impl MergeCF<WriteOptions> for TransactionDB {
531    fn merge_cf_full<K, V>(
532        &self,
533        cf: Option<&ColumnFamily>,
534        key: K,
535        value: V,
536        writeopts: Option<&WriteOptions>,
537    ) -> Result<(), Error>
538    where
539        K: AsRef<[u8]>,
540        V: AsRef<[u8]>,
541    {
542        let mut default_writeopts = None;
543
544        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
545
546        let key = key.as_ref();
547        let value = value.as_ref();
548        let key_ptr = key.as_ptr() as *const c_char;
549        let key_len = key.len() as size_t;
550        let val_ptr = value.as_ptr() as *const c_char;
551        let val_len = value.len() as size_t;
552
553        unsafe {
554            match cf {
555                Some(cf) => ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
556                    self.handle(),
557                    wo_handle,
558                    cf.handle(),
559                    key_ptr,
560                    key_len,
561                    val_ptr,
562                    val_len,
563                )),
564                None => ffi_try!(ffi::rocksdb_transactiondb_merge(
565                    self.handle(),
566                    wo_handle,
567                    key_ptr,
568                    key_len,
569                    val_ptr,
570                    val_len,
571                )),
572            }
573
574            Ok(())
575        }
576    }
577}
578
579impl CreateCF for TransactionDB {
580    fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
581        let cname = to_cstring(
582            name.as_ref(),
583            "Failed to convert path to CString when opening rocksdb",
584        )?;
585        unsafe {
586            let cf_handle = ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
587                self.handle(),
588                opts.const_handle(),
589                cname.as_ptr(),
590            ));
591
592            self.get_mut_cfs()
593                .insert(name.as_ref().to_string(), ColumnFamily::new(cf_handle));
594        };
595        Ok(())
596    }
597}
598
599impl TransactionDB {
600    pub fn snapshot(&self) -> Snapshot<'_> {
601        let snapshot = unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) };
602        Snapshot {
603            db: self,
604            inner: snapshot,
605        }
606    }
607}
608
/// A snapshot of a `TransactionDB`, created by `TransactionDB::snapshot`.
///
/// Reads and iterators issued through it set this snapshot on their read
/// options before delegating to the database.
pub struct Snapshot<'a> {
    /// Database the snapshot was taken from; also needed to release it on drop.
    db: &'a TransactionDB,
    /// Raw snapshot pointer returned by `rocksdb_transactiondb_create_snapshot`.
    inner: *const ffi::rocksdb_snapshot_t,
}
613
/// Exposes the raw snapshot pointer to code that is generic over `ConstHandle`.
impl ConstHandle<ffi::rocksdb_snapshot_t> for Snapshot<'_> {
    /// Returns the stored raw pointer; ownership stays with `self`.
    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
        self.inner
    }
}
619
// Empty impl: no items are overridden for `Snapshot`.
impl Read for Snapshot<'_> {}
621
622impl GetCF<ReadOptions> for Snapshot<'_> {
623    fn get_cf_full<K: AsRef<[u8]>>(
624        &self,
625        cf: Option<&ColumnFamily>,
626        key: K,
627        readopts: Option<&ReadOptions>,
628    ) -> Result<Option<DBVector>, Error> {
629        let mut ro = readopts.cloned().unwrap_or_default();
630        ro.set_snapshot(self);
631
632        self.db.get_cf_full(cf, key, Some(&ro))
633    }
634}
635
636impl MultiGet<ReadOptions> for Snapshot<'_> {
637    fn multi_get_full<K, I>(
638        &self,
639        keys: I,
640        readopts: Option<&ReadOptions>,
641    ) -> Vec<Result<Option<DBVector>, Error>>
642    where
643        K: AsRef<[u8]>,
644        I: IntoIterator<Item = K>,
645    {
646        let mut ro = readopts.cloned().unwrap_or_default();
647        ro.set_snapshot(self);
648
649        self.db.multi_get_full(keys, Some(&ro))
650    }
651}
652
653impl MultiGetCF<ReadOptions> for Snapshot<'_> {
654    fn multi_get_cf_full<'m, K, I>(
655        &self,
656        keys: I,
657        readopts: Option<&ReadOptions>,
658    ) -> Vec<Result<Option<DBVector>, Error>>
659    where
660        K: AsRef<[u8]>,
661        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
662    {
663        let mut ro = readopts.cloned().unwrap_or_default();
664        ro.set_snapshot(self);
665
666        self.db.multi_get_cf_full(keys, Some(&ro))
667    }
668}
669
/// Releases the snapshot back to the database it was taken from.
impl Drop for Snapshot<'_> {
    fn drop(&mut self) {
        // Pairs with `rocksdb_transactiondb_create_snapshot` in
        // `TransactionDB::snapshot`; needs the owning database's handle.
        unsafe {
            ffi::rocksdb_transactiondb_release_snapshot(self.db.inner, self.inner);
        }
    }
}
677
678impl Iterate for Snapshot<'_> {
679    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
680        let mut ro = readopts.to_owned();
681        ro.set_snapshot(self);
682        self.db.get_raw_iter(&ro)
683    }
684}
685
686impl IterateCF for Snapshot<'_> {
687    fn get_raw_iter_cf<'a: 'b, 'b>(
688        &'a self,
689        cf_handle: &ColumnFamily,
690        readopts: &ReadOptions,
691    ) -> Result<DBRawIterator<'b>, Error> {
692        let mut ro = readopts.to_owned();
693        ro.set_snapshot(self);
694        self.db.get_raw_iter_cf(cf_handle, &ro)
695    }
696}
697
698impl WriteOps for TransactionDB {
699    fn write_full(
700        &self,
701        batch: &WriteBatch,
702        writeopts: Option<&WriteOptions>,
703    ) -> Result<(), Error> {
704        let mut default_writeopts = None;
705
706        let wo_handle = WriteOptions::input_or_default(writeopts, &mut default_writeopts)?;
707
708        unsafe {
709            ffi_try!(ffi::rocksdb_transactiondb_write(
710                self.handle(),
711                wo_handle,
712                batch.handle(),
713            ));
714            Ok(())
715        }
716    }
717}