ckb_rocksdb/
transaction.rs

1use crate::ffi;
2use crate::{
3    ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions, ffi_util,
4    handle::{ConstHandle, Handle},
5    ops::*,
6};
7use libc::{c_char, c_uchar, c_void, size_t};
8use std::marker::PhantomData;
9use std::ptr;
10
11pub struct Transaction<'a, T> {
12    inner: *mut ffi::rocksdb_transaction_t,
13    db: PhantomData<&'a T>,
14}
15
16impl<'a, T> Transaction<'a, T> {
17    pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> Transaction<'a, T> {
18        Transaction {
19            inner,
20            db: PhantomData,
21        }
22    }
23
24    /// commits a transaction
25    pub fn commit(&self) -> Result<(), Error> {
26        unsafe {
27            ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
28        }
29        Ok(())
30    }
31
32    /// Transaction rollback
33    pub fn rollback(&self) -> Result<(), Error> {
34        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
35        Ok(())
36    }
37
38    /// Transaction rollback to savepoint
39    pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
40        unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
41        Ok(())
42    }
43
44    /// Set savepoint for transaction
45    pub fn set_savepoint(&self) {
46        unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
47    }
48
49    /// Get Snapshot
50    pub fn snapshot(&'a self) -> TransactionSnapshot<'a, T> {
51        unsafe {
52            let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
53            TransactionSnapshot {
54                inner: snapshot,
55                db: self,
56            }
57        }
58    }
59
60    /// Get For Update
61    /// ReadOptions: Default
62    /// exclusive: true
63    pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
64        let opt = ReadOptions::default();
65        self.get_for_update_opt(key, &opt, true)
66    }
67
68    /// Get For Update with custom ReadOptions and exclusive
69    pub fn get_for_update_opt<K: AsRef<[u8]>>(
70        &self,
71        key: K,
72        readopts: &ReadOptions,
73        exclusive: bool,
74    ) -> Result<Option<DBVector>, Error> {
75        let key = key.as_ref();
76        let key_ptr = key.as_ptr() as *const c_char;
77        let key_len = key.len() as size_t;
78        unsafe {
79            let mut val_len: size_t = 0;
80            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
81                self.handle(),
82                readopts.handle(),
83                key_ptr,
84                key_len,
85                &mut val_len,
86                exclusive as c_uchar,
87            )) as *mut u8;
88
89            if val.is_null() {
90                Ok(None)
91            } else {
92                Ok(Some(DBVector::from_c(val, val_len)))
93            }
94        }
95    }
96
97    pub fn get_for_update_cf<K: AsRef<[u8]>>(
98        &self,
99        cf: &ColumnFamily,
100        key: K,
101    ) -> Result<Option<DBVector>, Error> {
102        let opt = ReadOptions::default();
103        self.get_for_update_cf_opt(cf, key, &opt, true)
104    }
105
106    pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
107        &self,
108        cf: &ColumnFamily,
109        key: K,
110        readopts: &ReadOptions,
111        exclusive: bool,
112    ) -> Result<Option<DBVector>, Error> {
113        let key = key.as_ref();
114        let key_ptr = key.as_ptr() as *const c_char;
115        let key_len = key.len() as size_t;
116        unsafe {
117            let mut val_len: size_t = 0;
118            let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
119                self.handle(),
120                readopts.handle(),
121                cf.handle(),
122                key_ptr,
123                key_len,
124                &mut val_len,
125                exclusive as c_uchar,
126            )) as *mut u8;
127
128            if val.is_null() {
129                Ok(None)
130            } else {
131                Ok(Some(DBVector::from_c(val, val_len)))
132            }
133        }
134    }
135}
136
137impl<T> Drop for Transaction<'_, T> {
138    fn drop(&mut self) {
139        unsafe {
140            ffi::rocksdb_transaction_destroy(self.inner);
141        }
142    }
143}
144
145impl<T> Handle<ffi::rocksdb_transaction_t> for Transaction<'_, T> {
146    fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
147        self.inner
148    }
149}
150
151impl<T> Read for Transaction<'_, T> {}
152
153impl<'a, T> GetCF<ReadOptions> for Transaction<'a, T>
154where
155    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
156{
157    fn get_cf_full<K: AsRef<[u8]>>(
158        &self,
159        cf: Option<&ColumnFamily>,
160        key: K,
161        readopts: Option<&ReadOptions>,
162    ) -> Result<Option<DBVector>, Error> {
163        let mut default_readopts = None;
164
165        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
166
167        let key = key.as_ref();
168        let key_ptr = key.as_ptr() as *const c_char;
169        let key_len = key.len() as size_t;
170
171        unsafe {
172            let mut val_len: size_t = 0;
173
174            let val = match cf {
175                Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
176                    self.handle(),
177                    ro_handle,
178                    cf.inner,
179                    key_ptr,
180                    key_len,
181                    &mut val_len,
182                )),
183                None => ffi_try!(ffi::rocksdb_transaction_get(
184                    self.handle(),
185                    ro_handle,
186                    key_ptr,
187                    key_len,
188                    &mut val_len,
189                )),
190            } as *mut u8;
191
192            if val.is_null() {
193                Ok(None)
194            } else {
195                Ok(Some(DBVector::from_c(val, val_len)))
196            }
197        }
198    }
199}
200
201impl<'a, T> MultiGet<ReadOptions> for Transaction<'a, T>
202where
203    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
204{
205    fn multi_get_full<K, I>(
206        &self,
207        keys: I,
208        readopts: Option<&ReadOptions>,
209    ) -> Vec<Result<Option<DBVector>, Error>>
210    where
211        K: AsRef<[u8]>,
212        I: IntoIterator<Item = K>,
213    {
214        let mut default_readopts = None;
215        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
216            Ok(ro) => ro,
217            Err(e) => {
218                let key_count = keys.into_iter().count();
219
220                return vec![e; key_count]
221                    .iter()
222                    .map(|e| Err(e.to_owned()))
223                    .collect();
224            }
225        };
226
227        let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
228            .into_iter()
229            .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
230            .unzip();
231        let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
232
233        let mut values = vec![ptr::null_mut(); keys.len()];
234        let mut values_sizes = vec![0_usize; keys.len()];
235        let mut errors = vec![ptr::null_mut(); keys.len()];
236        unsafe {
237            ffi::rocksdb_transaction_multi_get(
238                self.inner,
239                ro_handle,
240                ptr_keys.len(),
241                ptr_keys.as_ptr(),
242                keys_sizes.as_ptr(),
243                values.as_mut_ptr(),
244                values_sizes.as_mut_ptr(),
245                errors.as_mut_ptr(),
246            );
247        }
248
249        convert_values(values, values_sizes, errors)
250    }
251}
252
253impl<'a, T> MultiGetCF<ReadOptions> for Transaction<'a, T>
254where
255    Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
256{
257    fn multi_get_cf_full<'m, K, I>(
258        &self,
259        keys: I,
260        readopts: Option<&ReadOptions>,
261    ) -> Vec<Result<Option<DBVector>, Error>>
262    where
263        K: AsRef<[u8]>,
264        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
265    {
266        let mut default_readopts = None;
267        let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
268            Ok(ro) => ro,
269            Err(e) => {
270                let key_count = keys.into_iter().count();
271
272                return vec![e; key_count]
273                    .iter()
274                    .map(|e| Err(e.to_owned()))
275                    .collect();
276            }
277        };
278        let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
279            .into_iter()
280            .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
281            .unzip();
282        let ptr_keys: Vec<_> = cfs_and_keys
283            .iter()
284            .map(|(_, k)| k.as_ptr() as *const c_char)
285            .collect();
286        let ptr_cfs: Vec<_> = cfs_and_keys
287            .iter()
288            .map(|(c, _)| c.inner as *const _)
289            .collect();
290
291        let mut values = vec![ptr::null_mut(); ptr_keys.len()];
292        let mut values_sizes = vec![0_usize; ptr_keys.len()];
293        let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
294        unsafe {
295            ffi::rocksdb_transaction_multi_get_cf(
296                self.inner,
297                ro_handle,
298                ptr_cfs.as_ptr(),
299                ptr_keys.len(),
300                ptr_keys.as_ptr(),
301                keys_sizes.as_ptr(),
302                values.as_mut_ptr(),
303                values_sizes.as_mut_ptr(),
304                errors.as_mut_ptr(),
305            );
306        }
307
308        convert_values(values, values_sizes, errors)
309    }
310}
311
312impl<T> Iterate for Transaction<'_, T> {
313    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
314        unsafe {
315            DBRawIterator {
316                inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
317                db: PhantomData,
318            }
319        }
320    }
321}
322
323impl<T> IterateCF for Transaction<'_, T> {
324    fn get_raw_iter_cf<'a: 'b, 'b>(
325        &'a self,
326        cf_handle: &ColumnFamily,
327        readopts: &ReadOptions,
328    ) -> Result<DBRawIterator<'b>, Error> {
329        unsafe {
330            Ok(DBRawIterator {
331                inner: ffi::rocksdb_transaction_create_iterator_cf(
332                    self.inner,
333                    readopts.handle(),
334                    cf_handle.inner,
335                ),
336                db: PhantomData,
337            })
338        }
339    }
340}
341
342pub struct TransactionSnapshot<'a, T> {
343    db: &'a Transaction<'a, T>,
344    inner: *const ffi::rocksdb_snapshot_t,
345}
346
347impl<T> ConstHandle<ffi::rocksdb_snapshot_t> for TransactionSnapshot<'_, T> {
348    fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
349        self.inner
350    }
351}
352
353impl<T> Read for TransactionSnapshot<'_, T> {}
354
355impl<'a, T> GetCF<ReadOptions> for TransactionSnapshot<'a, T>
356where
357    Transaction<'a, T>: GetCF<ReadOptions>,
358{
359    fn get_cf_full<K: AsRef<[u8]>>(
360        &self,
361        cf: Option<&ColumnFamily>,
362        key: K,
363        readopts: Option<&ReadOptions>,
364    ) -> Result<Option<DBVector>, Error> {
365        let mut ro = readopts.cloned().unwrap_or_default();
366        ro.set_snapshot(self);
367        self.db.get_cf_full(cf, key, Some(&ro))
368    }
369}
370
371impl<'a, T> MultiGet<ReadOptions> for TransactionSnapshot<'a, T>
372where
373    Transaction<'a, T>: MultiGet<ReadOptions>,
374{
375    fn multi_get_full<K, I>(
376        &self,
377        keys: I,
378        readopts: Option<&ReadOptions>,
379    ) -> Vec<Result<Option<DBVector>, Error>>
380    where
381        K: AsRef<[u8]>,
382        I: IntoIterator<Item = K>,
383    {
384        let mut ro = readopts.cloned().unwrap_or_default();
385        ro.set_snapshot(self);
386        self.db.multi_get_full(keys, Some(&ro))
387    }
388}
389
390impl<'a, T> MultiGetCF<ReadOptions> for TransactionSnapshot<'a, T>
391where
392    Transaction<'a, T>: MultiGet<ReadOptions>,
393{
394    fn multi_get_cf_full<'m, K, I>(
395        &self,
396        keys: I,
397        readopts: Option<&ReadOptions>,
398    ) -> Vec<Result<Option<DBVector>, Error>>
399    where
400        K: AsRef<[u8]>,
401        I: IntoIterator<Item = (&'m ColumnFamily, K)>,
402    {
403        let mut ro = readopts.cloned().unwrap_or_default();
404        ro.set_snapshot(self);
405        self.db.multi_get_cf_full(keys, Some(&ro))
406    }
407}
408
409impl<T> PutCF<()> for Transaction<'_, T> {
410    fn put_cf_full<K, V>(
411        &self,
412        cf: Option<&ColumnFamily>,
413        key: K,
414        value: V,
415        _: Option<&()>,
416    ) -> Result<(), Error>
417    where
418        K: AsRef<[u8]>,
419        V: AsRef<[u8]>,
420    {
421        let key = key.as_ref();
422        let value = value.as_ref();
423        let key_ptr = key.as_ptr() as *const c_char;
424        let key_len = key.len() as size_t;
425        let val_ptr = value.as_ptr() as *const c_char;
426        let val_len = value.len() as size_t;
427
428        unsafe {
429            match cf {
430                Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
431                    self.handle(),
432                    cf.handle(),
433                    key_ptr,
434                    key_len,
435                    val_ptr,
436                    val_len,
437                )),
438                None => ffi_try!(ffi::rocksdb_transaction_put(
439                    self.handle(),
440                    key_ptr,
441                    key_len,
442                    val_ptr,
443                    val_len,
444                )),
445            }
446
447            Ok(())
448        }
449    }
450}
451
452impl<T> MergeCF<()> for Transaction<'_, T> {
453    fn merge_cf_full<K, V>(
454        &self,
455        cf: Option<&ColumnFamily>,
456        key: K,
457        value: V,
458        _: Option<&()>,
459    ) -> Result<(), Error>
460    where
461        K: AsRef<[u8]>,
462        V: AsRef<[u8]>,
463    {
464        let key = key.as_ref();
465        let value = value.as_ref();
466        let key_ptr = key.as_ptr() as *const c_char;
467        let key_len = key.len() as size_t;
468        let val_ptr = value.as_ptr() as *const c_char;
469        let val_len = value.len() as size_t;
470
471        unsafe {
472            match cf {
473                Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
474                    self.handle(),
475                    cf.handle(),
476                    key_ptr,
477                    key_len,
478                    val_ptr,
479                    val_len,
480                )),
481                None => ffi_try!(ffi::rocksdb_transaction_merge(
482                    self.handle(),
483                    key_ptr,
484                    key_len,
485                    val_ptr,
486                    val_len,
487                )),
488            }
489
490            Ok(())
491        }
492    }
493}
494
495impl<T> DeleteCF<()> for Transaction<'_, T> {
496    fn delete_cf_full<K>(
497        &self,
498        cf: Option<&ColumnFamily>,
499        key: K,
500        _: Option<&()>,
501    ) -> Result<(), Error>
502    where
503        K: AsRef<[u8]>,
504    {
505        let key = key.as_ref();
506        let key_ptr = key.as_ptr() as *const c_char;
507        let key_len = key.len() as size_t;
508
509        unsafe {
510            match cf {
511                Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
512                    self.handle(),
513                    cf.inner,
514                    key_ptr,
515                    key_len,
516                )),
517                None => ffi_try!(ffi::rocksdb_transaction_delete(
518                    self.handle(),
519                    key_ptr,
520                    key_len,
521                )),
522            }
523
524            Ok(())
525        }
526    }
527}
528
529impl<T> Drop for TransactionSnapshot<'_, T> {
530    fn drop(&mut self) {
531        unsafe {
532            ffi::rocksdb_free(self.inner as *mut c_void);
533        }
534    }
535}
536
537impl<T: Iterate> Iterate for TransactionSnapshot<'_, T> {
538    fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
539        let mut readopts = readopts.to_owned();
540        readopts.set_snapshot(self);
541        self.db.get_raw_iter(&readopts)
542    }
543}
544
545impl<T: IterateCF> IterateCF for TransactionSnapshot<'_, T> {
546    fn get_raw_iter_cf<'a: 'b, 'b>(
547        &'a self,
548        cf_handle: &ColumnFamily,
549        readopts: &ReadOptions,
550    ) -> Result<DBRawIterator<'b>, Error> {
551        let mut readopts = readopts.to_owned();
552        readopts.set_snapshot(self);
553        self.db.get_raw_iter_cf(cf_handle, &readopts)
554    }
555}
556
557impl<'a, T> GetPinnedCF<'a> for Transaction<'a, T> {
558    type ColumnFamily = &'a ColumnFamily;
559    type ReadOptions = &'a ReadOptions;
560
561    fn get_pinned_cf_full<K: AsRef<[u8]>>(
562        &'a self,
563        cf: Option<Self::ColumnFamily>,
564        key: K,
565        readopts: Option<Self::ReadOptions>,
566    ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
567        let mut default_readopts = None;
568
569        let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
570
571        let key = key.as_ref();
572        let key_ptr = key.as_ptr() as *const c_char;
573        let key_len = key.len() as size_t;
574
575        unsafe {
576            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
577            let val = match cf {
578                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
579                    self.handle(),
580                    ro_handle,
581                    cf.handle(),
582                    key_ptr,
583                    key_len,
584                    &mut err,
585                ),
586                None => ffi::rocksdb_transaction_get_pinned(
587                    self.handle(),
588                    ro_handle,
589                    key_ptr,
590                    key_len,
591                    &mut err,
592                ),
593            };
594
595            if !err.is_null() {
596                return Err(Error::new(ffi_util::error_message(err)));
597            }
598
599            if val.is_null() {
600                Ok(None)
601            } else {
602                Ok(Some(DBPinnableSlice::from_c(val)))
603            }
604        }
605    }
606}
607
608impl<'a, T> GetPinnedCF<'a> for TransactionSnapshot<'a, T> {
609    type ColumnFamily = &'a ColumnFamily;
610    type ReadOptions = &'a ReadOptions;
611
612    fn get_pinned_cf_full<K: AsRef<[u8]>>(
613        &'a self,
614        cf: Option<Self::ColumnFamily>,
615        key: K,
616        readopts: Option<Self::ReadOptions>,
617    ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
618        let mut ro = readopts.cloned().unwrap_or_default();
619        ro.set_snapshot(self);
620
621        let key = key.as_ref();
622        let key_ptr = key.as_ptr() as *const c_char;
623        let key_len = key.len() as size_t;
624
625        unsafe {
626            let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
627            let val = match cf {
628                Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
629                    self.db.handle(),
630                    ro.handle(),
631                    cf.handle(),
632                    key_ptr,
633                    key_len,
634                    &mut err,
635                ),
636                None => ffi::rocksdb_transaction_get_pinned(
637                    self.db.handle(),
638                    ro.handle(),
639                    key_ptr,
640                    key_len,
641                    &mut err,
642                ),
643            };
644
645            if !err.is_null() {
646                return Err(Error::new(ffi_util::error_message(err)));
647            }
648
649            if val.is_null() {
650                Ok(None)
651            } else {
652                Ok(Some(DBPinnableSlice::from_c(val)))
653            }
654        }
655    }
656}