ckb_rocksdb/
optimistic_transaction.rs

1use crate::ffi;
2use crate::{
3    ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions, ffi_util,
4    handle::{ConstHandle, Handle},
5    ops::*,
6};
7use libc::{c_char, c_uchar, c_void, size_t};
8use std::marker::PhantomData;
9use std::ptr;
10
11pub struct OptimisticTransaction {
12    inner: *mut ffi::rocksdb_transaction_t,
13}
14
15unsafe impl Send for OptimisticTransaction {}
16unsafe impl Sync for OptimisticTransaction {}
17
18impl OptimisticTransaction {
19    pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> OptimisticTransaction {
20        OptimisticTransaction { inner }
21    }
22
23    /// commits a transaction
24    pub fn commit(&self) -> Result<(), Error> {
25        unsafe {
26            ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
27        }
28        Ok(())
29    }
30
31    /// Transaction rollback
32    pub fn rollback(&self) -> Result<(), Error> {
33        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
34        Ok(())
35    }
36
37    /// Transaction rollback to savepoint
38    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
39        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
40        Ok(())
41    }
42
43    /// Set savepoint for transaction
44    pub fn set_savepoint(&self) {
45        unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
46    }
47
48    /// Get Snapshot
49    pub fn snapshot(&self) -> OptimisticTransactionSnapshot<'_> {
50        unsafe {
51            let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
52            OptimisticTransactionSnapshot {
53                txn: self,
54                inner: snapshot,
55            }
56        }
57    }
58
59    /// Get For Update
60    /// ReadOptions: Default
61    /// exclusive: true
62    pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
63        let opt = ReadOptions::default();
64        self.get_for_update_opt(key, &opt, true)
65    }
66
67    /// Get For Update with custom ReadOptions and exclusive
68    pub fn get_for_update_opt<K: AsRef<[u8]>>(
69        &self,
70        key: K,
71        readopts: &ReadOptions,
72        exclusive: bool,
73    ) -> Result<Option<DBVector>, Error> {
74        let key = key.as_ref();
75        let key_ptr = key.as_ptr() as *const c_char;
76        let key_len = key.len() as size_t;
77        unsafe {
78            let mut val_len: size_t = 0;
79            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
80                self.handle(),
81                readopts.handle(),
82                key_ptr,
83                key_len,
84                &mut val_len,
85                exclusive as c_uchar,
86            )) as *mut u8;
87
88            if val.is_null() {
89                Ok(None)
90            } else {
91                Ok(Some(DBVector::from_c(val, val_len)))
92            }
93        }
94    }
95
96    pub fn get_for_update_cf<K: AsRef<[u8]>>(
97        &self,
98        cf: &ColumnFamily,
99        key: K,
100    ) -> Result<Option<DBVector>, Error> {
101        let opt = ReadOptions::default();
102        self.get_for_update_cf_opt(cf, key, &opt, true)
103    }
104
105    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
106        &self,
107        cf: &ColumnFamily,
108        key: K,
109        readopts: &ReadOptions,
110        exclusive: bool,
111    ) -> Result<Option<DBVector>, Error> {
112        let key = key.as_ref();
113        let key_ptr = key.as_ptr() as *const c_char;
114        let key_len = key.len() as size_t;
115        unsafe {
116            let mut val_len: size_t = 0;
117            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
118                self.handle(),
119                readopts.handle(),
120                cf.handle(),
121                key_ptr,
122                key_len,
123                &mut val_len,
124                exclusive as c_uchar,
125            )) as *mut u8;
126
127            if val.is_null() {
128                Ok(None)
129            } else {
130                Ok(Some(DBVector::from_c(val, val_len)))
131            }
132        }
133    }
134}
135
136impl Drop for OptimisticTransaction {
137    fn drop(&mut self) {
138        unsafe {
139            ffi::rocksdb_transaction_destroy(self.inner);
140        }
141    }
142}
143
144impl Handle<ffi::rocksdb_transaction_t> for OptimisticTransaction {
145    fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
146        self.inner
147    }
148}
149
150impl Read for OptimisticTransaction {}
151
152impl GetCF<ReadOptions> for OptimisticTransaction {
153    fn get_cf_full<K: AsRef<[u8]>>(
154        &self,
155        cf: Option<&ColumnFamily>,
156        key: K,
157        readopts: Option<&ReadOptions>,
158    ) -> Result<Option<DBVector>, Error> {
159        let mut default_readopts = None;
160
161        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
162
163        let key = key.as_ref();
164        let key_ptr = key.as_ptr() as *const c_char;
165        let key_len = key.len() as size_t;
166
167        unsafe {
168            let mut val_len: size_t = 0;
169
170            let val = match cf {
171                Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
172                    self.handle(),
173                    ro_handle,
174                    cf.inner,
175                    key_ptr,
176                    key_len,
177                    &mut val_len,
178                )),
179                None => ffi_try!(ffi::rocksdb_transaction_get(
180                    self.handle(),
181                    ro_handle,
182                    key_ptr,
183                    key_len,
184                    &mut val_len,
185                )),
186            } as *mut u8;
187
188            if val.is_null() {
189                Ok(None)
190            } else {
191                Ok(Some(DBVector::from_c(val, val_len)))
192            }
193        }
194    }
195}
196
197impl MultiGet<ReadOptions> for OptimisticTransaction {
198    fn multi_get_full<K, I>(
199        &self,
200        keys: I,
201        readopts: Option<&ReadOptions>,
202    ) -> Vec<Result<Option<DBVector>, Error>>
203    where
204        K: AsRef<[u8]>,
205        I: IntoIterator<Item = K>,
206    {
207        let mut default_readopts = None;
208        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
209            Ok(ro) => ro,
210            Err(e) => {
211                let key_count = keys.into_iter().count();
212
213                return vec![e; key_count]
214                    .iter()
215                    .map(|e| Err(e.to_owned()))
216                    .collect();
217            }
218        };
219
220        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
221            .into_iter()
222            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
223            .unzip();
224        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
225
226        let mut values = vec![ptr::null_mut(); keys.len()];
227        let mut values_sizes = vec![0_usize; keys.len()];
228        let mut errors = vec![ptr::null_mut(); keys.len()];
229        unsafe {
230            ffi::rocksdb_transaction_multi_get(
231                self.inner,
232                ro_handle,
233                ptr_keys.len(),
234                ptr_keys.as_ptr(),
235                keys_sizes.as_ptr(),
236                values.as_mut_ptr(),
237                values_sizes.as_mut_ptr(),
238                errors.as_mut_ptr(),
239            );
240        }
241
242        convert_values(values, values_sizes, errors)
243    }
244}
245
246impl MultiGetCF<ReadOptions> for OptimisticTransaction {
247    fn multi_get_cf_full<'a, K, I>(
248        &self,
249        keys: I,
250        readopts: Option<&ReadOptions>,
251    ) -> Vec<Result<Option<DBVector>, Error>>
252    where
253        K: AsRef<[u8]>,
254        I: IntoIterator<Item = (&'a ColumnFamily, K)>,
255    {
256        let mut default_readopts = None;
257        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
258            Ok(ro) => ro,
259            Err(e) => {
260                let key_count = keys.into_iter().count();
261
262                return vec![e; key_count]
263                    .iter()
264                    .map(|e| Err(e.to_owned()))
265                    .collect();
266            }
267        };
268
269        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
270            .into_iter()
271            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
272            .unzip();
273        let ptr_keys: Vec<_> = cfs_and_keys
274            .iter()
275            .map(|(_, k)| k.as_ptr() as *const c_char)
276            .collect();
277        let ptr_cfs: Vec<_> = cfs_and_keys
278            .iter()
279            .map(|(c, _)| c.inner as *const _)
280            .collect();
281
282        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
283        let mut values_sizes = vec![0_usize; ptr_keys.len()];
284        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
285        unsafe {
286            ffi::rocksdb_transaction_multi_get_cf(
287                self.inner,
288                ro_handle,
289                ptr_cfs.as_ptr(),
290                ptr_keys.len(),
291                ptr_keys.as_ptr(),
292                keys_sizes.as_ptr(),
293                values.as_mut_ptr(),
294                values_sizes.as_mut_ptr(),
295                errors.as_mut_ptr(),
296            );
297        }
298
299        convert_values(values, values_sizes, errors)
300    }
301}
302
303impl Iterate for OptimisticTransaction {
304    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
305        unsafe {
306            DBRawIterator {
307                inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
308                db: PhantomData,
309            }
310        }
311    }
312}
313
314impl IterateCF for OptimisticTransaction {
315    fn get_raw_iter_cf<'a: 'b, 'b>(
316        &'a self,
317        cf_handle: &ColumnFamily,
318        readopts: &ReadOptions,
319    ) -> Result<DBRawIterator<'b>, Error> {
320        unsafe {
321            Ok(DBRawIterator {
322                inner: ffi::rocksdb_transaction_create_iterator_cf(
323                    self.inner,
324                    readopts.handle(),
325                    cf_handle.inner,
326                ),
327                db: PhantomData,
328            })
329        }
330    }
331}
332
333impl PutCF<()> for OptimisticTransaction {
334    fn put_cf_full<K, V>(
335        &self,
336        cf: Option<&ColumnFamily>,
337        key: K,
338        value: V,
339        _: Option<&()>,
340    ) -> Result<(), Error>
341    where
342        K: AsRef<[u8]>,
343        V: AsRef<[u8]>,
344    {
345        let key = key.as_ref();
346        let value = value.as_ref();
347        let key_ptr = key.as_ptr() as *const c_char;
348        let key_len = key.len() as size_t;
349        let val_ptr = value.as_ptr() as *const c_char;
350        let val_len = value.len() as size_t;
351
352        unsafe {
353            match cf {
354                Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
355                    self.handle(),
356                    cf.handle(),
357                    key_ptr,
358                    key_len,
359                    val_ptr,
360                    val_len,
361                )),
362                None => ffi_try!(ffi::rocksdb_transaction_put(
363                    self.handle(),
364                    key_ptr,
365                    key_len,
366                    val_ptr,
367                    val_len,
368                )),
369            }
370
371            Ok(())
372        }
373    }
374}
375
376impl MergeCF<()> for OptimisticTransaction {
377    fn merge_cf_full<K, V>(
378        &self,
379        cf: Option<&ColumnFamily>,
380        key: K,
381        value: V,
382        _: Option<&()>,
383    ) -> Result<(), Error>
384    where
385        K: AsRef<[u8]>,
386        V: AsRef<[u8]>,
387    {
388        let key = key.as_ref();
389        let value = value.as_ref();
390        let key_ptr = key.as_ptr() as *const c_char;
391        let key_len = key.len() as size_t;
392        let val_ptr = value.as_ptr() as *const c_char;
393        let val_len = value.len() as size_t;
394
395        unsafe {
396            match cf {
397                Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
398                    self.handle(),
399                    cf.handle(),
400                    key_ptr,
401                    key_len,
402                    val_ptr,
403                    val_len,
404                )),
405                None => ffi_try!(ffi::rocksdb_transaction_merge(
406                    self.handle(),
407                    key_ptr,
408                    key_len,
409                    val_ptr,
410                    val_len,
411                )),
412            }
413
414            Ok(())
415        }
416    }
417}
418
419impl DeleteCF<()> for OptimisticTransaction {
420    fn delete_cf_full<K>(
421        &self,
422        cf: Option<&ColumnFamily>,
423        key: K,
424        _: Option<&()>,
425    ) -> Result<(), Error>
426    where
427        K: AsRef<[u8]>,
428    {
429        let key = key.as_ref();
430        let key_ptr = key.as_ptr() as *const c_char;
431        let key_len = key.len() as size_t;
432
433        unsafe {
434            match cf {
435                Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
436                    self.handle(),
437                    cf.inner,
438                    key_ptr,
439                    key_len,
440                )),
441                None => ffi_try!(ffi::rocksdb_transaction_delete(
442                    self.handle(),
443                    key_ptr,
444                    key_len,
445                )),
446            }
447
448            Ok(())
449        }
450    }
451}
452
453pub struct OptimisticTransactionSnapshot<'a> {
454    txn: &'a OptimisticTransaction,
455    inner: *const ffi::rocksdb_snapshot_t,
456}
457
458unsafe impl Send for OptimisticTransactionSnapshot<'_> {}
459unsafe impl Sync for OptimisticTransactionSnapshot<'_> {}
460
461impl ConstHandle<ffi::rocksdb_snapshot_t> for OptimisticTransactionSnapshot<'_> {
462    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
463        self.inner
464    }
465}
466
467impl Read for OptimisticTransactionSnapshot<'_> {}
468
469impl GetCF<ReadOptions> for OptimisticTransactionSnapshot<'_> {
470    fn get_cf_full<K: AsRef<[u8]>>(
471        &self,
472        cf: Option<&ColumnFamily>,
473        key: K,
474        readopts: Option<&ReadOptions>,
475    ) -> Result<Option<DBVector>, Error> {
476        let mut ro = readopts.cloned().unwrap_or_default();
477        ro.set_snapshot(self);
478        self.txn.get_cf_full(cf, key, Some(&ro))
479    }
480}
481
482impl MultiGet<ReadOptions> for OptimisticTransactionSnapshot<'_> {
483    fn multi_get_full<K, I>(
484        &self,
485        keys: I,
486        readopts: Option<&ReadOptions>,
487    ) -> Vec<Result<Option<DBVector>, Error>>
488    where
489        K: AsRef<[u8]>,
490        I: IntoIterator<Item = K>,
491    {
492        let mut ro = readopts.cloned().unwrap_or_default();
493        ro.set_snapshot(self);
494        self.txn.multi_get_full(keys, Some(&ro))
495    }
496}
497
498impl MultiGetCF<ReadOptions> for OptimisticTransactionSnapshot<'_> {
499    fn multi_get_cf_full<'m, K, I>(
500        &self,
501        keys: I,
502        readopts: Option<&ReadOptions>,
503    ) -> Vec<Result<Option<DBVector>, Error>>
504    where
505        K: AsRef<[u8]>,
506        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
507    {
508        let mut ro = readopts.cloned().unwrap_or_default();
509        ro.set_snapshot(self);
510        self.txn.multi_get_cf_full(keys, Some(&ro))
511    }
512}
513
514impl Drop for OptimisticTransactionSnapshot<'_> {
515    fn drop(&mut self) {
516        unsafe {
517            ffi::rocksdb_free(self.inner as *mut c_void);
518        }
519    }
520}
521
522impl Iterate for OptimisticTransactionSnapshot<'_> {
523    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
524        let mut readopts = readopts.to_owned();
525        readopts.set_snapshot(self);
526        self.txn.get_raw_iter(&readopts)
527    }
528}
529
530impl IterateCF for OptimisticTransactionSnapshot<'_> {
531    fn get_raw_iter_cf<'a: 'b, 'b>(
532        &'a self,
533        cf_handle: &ColumnFamily,
534        readopts: &ReadOptions,
535    ) -> Result<DBRawIterator<'b>, Error> {
536        let mut readopts = readopts.to_owned();
537        readopts.set_snapshot(self);
538        self.txn.get_raw_iter_cf(cf_handle, &readopts)
539    }
540}
541
542impl<'a> GetPinnedCF<'a> for OptimisticTransaction {
543    type ColumnFamily = &'a ColumnFamily;
544    type ReadOptions = &'a ReadOptions;
545
546    fn get_pinned_cf_full<K: AsRef<[u8]>>(
547        &'a self,
548        cf: Option<Self::ColumnFamily>,
549        key: K,
550        readopts: Option<Self::ReadOptions>,
551    ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
552        let mut default_readopts = None;
553
554        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
555
556        let key = key.as_ref();
557        let key_ptr = key.as_ptr() as *const c_char;
558        let key_len = key.len() as size_t;
559
560        unsafe {
561            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
562            let val = match cf {
563                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
564                    self.handle(),
565                    ro_handle,
566                    cf.handle(),
567                    key_ptr,
568                    key_len,
569                    &mut err,
570                ),
571                None => ffi::rocksdb_transaction_get_pinned(
572                    self.handle(),
573                    ro_handle,
574                    key_ptr,
575                    key_len,
576                    &mut err,
577                ),
578            };
579
580            if !err.is_null() {
581                return Err(Error::new(ffi_util::error_message(err)));
582            }
583
584            if val.is_null() {
585                Ok(None)
586            } else {
587                Ok(Some(DBPinnableSlice::from_c(val)))
588            }
589        }
590    }
591}
592
593impl<'a> GetPinnedCF<'a> for OptimisticTransactionSnapshot<'a> {
594    type ColumnFamily = &'a ColumnFamily;
595    type ReadOptions = &'a ReadOptions;
596
597    fn get_pinned_cf_full<K: AsRef<[u8]>>(
598        &'a self,
599        cf: Option<Self::ColumnFamily>,
600        key: K,
601        readopts: Option<Self::ReadOptions>,
602    ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
603        let mut ro = readopts.cloned().unwrap_or_default();
604        ro.set_snapshot(self);
605
606        let key = key.as_ref();
607        let key_ptr = key.as_ptr() as *const c_char;
608        let key_len = key.len() as size_t;
609
610        unsafe {
611            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
612            let val = match cf {
613                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
614                    self.txn.handle(),
615                    ro.handle(),
616                    cf.handle(),
617                    key_ptr,
618                    key_len,
619                    &mut err,
620                ),
621                None => ffi::rocksdb_transaction_get_pinned(
622                    self.txn.handle(),
623                    ro.handle(),
624                    key_ptr,
625                    key_len,
626                    &mut err,
627                ),
628            };
629
630            if !err.is_null() {
631                return Err(Error::new(ffi_util::error_message(err)));
632            }
633
634            if val.is_null() {
635                Ok(None)
636            } else {
637                Ok(Some(DBPinnableSlice::from_c(val)))
638            }
639        }
640    }
641}