1use crate::ffi;
2use crate::{
3 ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions, ffi_util,
4 handle::{ConstHandle, Handle},
5 ops::*,
6};
7use libc::{c_char, c_uchar, c_void, size_t};
8use std::marker::PhantomData;
9use std::ptr;
10
11pub struct Transaction<'a, T> {
12 inner: *mut ffi::rocksdb_transaction_t,
13 db: PhantomData<&'a T>,
14}
15
16impl<'a, T> Transaction<'a, T> {
17 pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> Transaction<'a, T> {
18 Transaction {
19 inner,
20 db: PhantomData,
21 }
22 }
23
24 pub fn commit(&self) -> Result<(), Error> {
26 unsafe {
27 ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
28 }
29 Ok(())
30 }
31
32 pub fn rollback(&self) -> Result<(), Error> {
34 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
35 Ok(())
36 }
37
38 pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
40 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
41 Ok(())
42 }
43
44 pub fn set_savepoint(&self) {
46 unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
47 }
48
49 pub fn snapshot(&'a self) -> TransactionSnapshot<'a, T> {
51 unsafe {
52 let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
53 TransactionSnapshot {
54 inner: snapshot,
55 db: self,
56 }
57 }
58 }
59
60 pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
64 let opt = ReadOptions::default();
65 self.get_for_update_opt(key, &opt, true)
66 }
67
68 pub fn get_for_update_opt<K: AsRef<[u8]>>(
70 &self,
71 key: K,
72 readopts: &ReadOptions,
73 exclusive: bool,
74 ) -> Result<Option<DBVector>, Error> {
75 let key = key.as_ref();
76 let key_ptr = key.as_ptr() as *const c_char;
77 let key_len = key.len() as size_t;
78 unsafe {
79 let mut val_len: size_t = 0;
80 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
81 self.handle(),
82 readopts.handle(),
83 key_ptr,
84 key_len,
85 &mut val_len,
86 exclusive as c_uchar,
87 )) as *mut u8;
88
89 if val.is_null() {
90 Ok(None)
91 } else {
92 Ok(Some(DBVector::from_c(val, val_len)))
93 }
94 }
95 }
96
97 pub fn get_for_update_cf<K: AsRef<[u8]>>(
98 &self,
99 cf: &ColumnFamily,
100 key: K,
101 ) -> Result<Option<DBVector>, Error> {
102 let opt = ReadOptions::default();
103 self.get_for_update_cf_opt(cf, key, &opt, true)
104 }
105
106 pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
107 &self,
108 cf: &ColumnFamily,
109 key: K,
110 readopts: &ReadOptions,
111 exclusive: bool,
112 ) -> Result<Option<DBVector>, Error> {
113 let key = key.as_ref();
114 let key_ptr = key.as_ptr() as *const c_char;
115 let key_len = key.len() as size_t;
116 unsafe {
117 let mut val_len: size_t = 0;
118 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
119 self.handle(),
120 readopts.handle(),
121 cf.handle(),
122 key_ptr,
123 key_len,
124 &mut val_len,
125 exclusive as c_uchar,
126 )) as *mut u8;
127
128 if val.is_null() {
129 Ok(None)
130 } else {
131 Ok(Some(DBVector::from_c(val, val_len)))
132 }
133 }
134 }
135}
136
137impl<T> Drop for Transaction<'_, T> {
138 fn drop(&mut self) {
139 unsafe {
140 ffi::rocksdb_transaction_destroy(self.inner);
141 }
142 }
143}
144
145impl<T> Handle<ffi::rocksdb_transaction_t> for Transaction<'_, T> {
146 fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
147 self.inner
148 }
149}
150
151impl<T> Read for Transaction<'_, T> {}
152
153impl<'a, T> GetCF<ReadOptions> for Transaction<'a, T>
154where
155 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
156{
157 fn get_cf_full<K: AsRef<[u8]>>(
158 &self,
159 cf: Option<&ColumnFamily>,
160 key: K,
161 readopts: Option<&ReadOptions>,
162 ) -> Result<Option<DBVector>, Error> {
163 let mut default_readopts = None;
164
165 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
166
167 let key = key.as_ref();
168 let key_ptr = key.as_ptr() as *const c_char;
169 let key_len = key.len() as size_t;
170
171 unsafe {
172 let mut val_len: size_t = 0;
173
174 let val = match cf {
175 Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
176 self.handle(),
177 ro_handle,
178 cf.inner,
179 key_ptr,
180 key_len,
181 &mut val_len,
182 )),
183 None => ffi_try!(ffi::rocksdb_transaction_get(
184 self.handle(),
185 ro_handle,
186 key_ptr,
187 key_len,
188 &mut val_len,
189 )),
190 } as *mut u8;
191
192 if val.is_null() {
193 Ok(None)
194 } else {
195 Ok(Some(DBVector::from_c(val, val_len)))
196 }
197 }
198 }
199}
200
201impl<'a, T> MultiGet<ReadOptions> for Transaction<'a, T>
202where
203 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
204{
205 fn multi_get_full<K, I>(
206 &self,
207 keys: I,
208 readopts: Option<&ReadOptions>,
209 ) -> Vec<Result<Option<DBVector>, Error>>
210 where
211 K: AsRef<[u8]>,
212 I: IntoIterator<Item = K>,
213 {
214 let mut default_readopts = None;
215 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
216 Ok(ro) => ro,
217 Err(e) => {
218 let key_count = keys.into_iter().count();
219
220 return vec![e; key_count]
221 .iter()
222 .map(|e| Err(e.to_owned()))
223 .collect();
224 }
225 };
226
227 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
228 .into_iter()
229 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
230 .unzip();
231 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
232
233 let mut values = vec![ptr::null_mut(); keys.len()];
234 let mut values_sizes = vec![0_usize; keys.len()];
235 let mut errors = vec![ptr::null_mut(); keys.len()];
236 unsafe {
237 ffi::rocksdb_transaction_multi_get(
238 self.inner,
239 ro_handle,
240 ptr_keys.len(),
241 ptr_keys.as_ptr(),
242 keys_sizes.as_ptr(),
243 values.as_mut_ptr(),
244 values_sizes.as_mut_ptr(),
245 errors.as_mut_ptr(),
246 );
247 }
248
249 convert_values(values, values_sizes, errors)
250 }
251}
252
253impl<'a, T> MultiGetCF<ReadOptions> for Transaction<'a, T>
254where
255 Transaction<'a, T>: Handle<ffi::rocksdb_transaction_t> + Read,
256{
257 fn multi_get_cf_full<'m, K, I>(
258 &self,
259 keys: I,
260 readopts: Option<&ReadOptions>,
261 ) -> Vec<Result<Option<DBVector>, Error>>
262 where
263 K: AsRef<[u8]>,
264 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
265 {
266 let mut default_readopts = None;
267 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
268 Ok(ro) => ro,
269 Err(e) => {
270 let key_count = keys.into_iter().count();
271
272 return vec![e; key_count]
273 .iter()
274 .map(|e| Err(e.to_owned()))
275 .collect();
276 }
277 };
278 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
279 .into_iter()
280 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
281 .unzip();
282 let ptr_keys: Vec<_> = cfs_and_keys
283 .iter()
284 .map(|(_, k)| k.as_ptr() as *const c_char)
285 .collect();
286 let ptr_cfs: Vec<_> = cfs_and_keys
287 .iter()
288 .map(|(c, _)| c.inner as *const _)
289 .collect();
290
291 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
292 let mut values_sizes = vec![0_usize; ptr_keys.len()];
293 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
294 unsafe {
295 ffi::rocksdb_transaction_multi_get_cf(
296 self.inner,
297 ro_handle,
298 ptr_cfs.as_ptr(),
299 ptr_keys.len(),
300 ptr_keys.as_ptr(),
301 keys_sizes.as_ptr(),
302 values.as_mut_ptr(),
303 values_sizes.as_mut_ptr(),
304 errors.as_mut_ptr(),
305 );
306 }
307
308 convert_values(values, values_sizes, errors)
309 }
310}
311
312impl<T> Iterate for Transaction<'_, T> {
313 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
314 unsafe {
315 DBRawIterator {
316 inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
317 db: PhantomData,
318 }
319 }
320 }
321}
322
323impl<T> IterateCF for Transaction<'_, T> {
324 fn get_raw_iter_cf<'a: 'b, 'b>(
325 &'a self,
326 cf_handle: &ColumnFamily,
327 readopts: &ReadOptions,
328 ) -> Result<DBRawIterator<'b>, Error> {
329 unsafe {
330 Ok(DBRawIterator {
331 inner: ffi::rocksdb_transaction_create_iterator_cf(
332 self.inner,
333 readopts.handle(),
334 cf_handle.inner,
335 ),
336 db: PhantomData,
337 })
338 }
339 }
340}
341
342pub struct TransactionSnapshot<'a, T> {
343 db: &'a Transaction<'a, T>,
344 inner: *const ffi::rocksdb_snapshot_t,
345}
346
347impl<T> ConstHandle<ffi::rocksdb_snapshot_t> for TransactionSnapshot<'_, T> {
348 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
349 self.inner
350 }
351}
352
353impl<T> Read for TransactionSnapshot<'_, T> {}
354
355impl<'a, T> GetCF<ReadOptions> for TransactionSnapshot<'a, T>
356where
357 Transaction<'a, T>: GetCF<ReadOptions>,
358{
359 fn get_cf_full<K: AsRef<[u8]>>(
360 &self,
361 cf: Option<&ColumnFamily>,
362 key: K,
363 readopts: Option<&ReadOptions>,
364 ) -> Result<Option<DBVector>, Error> {
365 let mut ro = readopts.cloned().unwrap_or_default();
366 ro.set_snapshot(self);
367 self.db.get_cf_full(cf, key, Some(&ro))
368 }
369}
370
371impl<'a, T> MultiGet<ReadOptions> for TransactionSnapshot<'a, T>
372where
373 Transaction<'a, T>: MultiGet<ReadOptions>,
374{
375 fn multi_get_full<K, I>(
376 &self,
377 keys: I,
378 readopts: Option<&ReadOptions>,
379 ) -> Vec<Result<Option<DBVector>, Error>>
380 where
381 K: AsRef<[u8]>,
382 I: IntoIterator<Item = K>,
383 {
384 let mut ro = readopts.cloned().unwrap_or_default();
385 ro.set_snapshot(self);
386 self.db.multi_get_full(keys, Some(&ro))
387 }
388}
389
390impl<'a, T> MultiGetCF<ReadOptions> for TransactionSnapshot<'a, T>
391where
392 Transaction<'a, T>: MultiGet<ReadOptions>,
393{
394 fn multi_get_cf_full<'m, K, I>(
395 &self,
396 keys: I,
397 readopts: Option<&ReadOptions>,
398 ) -> Vec<Result<Option<DBVector>, Error>>
399 where
400 K: AsRef<[u8]>,
401 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
402 {
403 let mut ro = readopts.cloned().unwrap_or_default();
404 ro.set_snapshot(self);
405 self.db.multi_get_cf_full(keys, Some(&ro))
406 }
407}
408
409impl<T> PutCF<()> for Transaction<'_, T> {
410 fn put_cf_full<K, V>(
411 &self,
412 cf: Option<&ColumnFamily>,
413 key: K,
414 value: V,
415 _: Option<&()>,
416 ) -> Result<(), Error>
417 where
418 K: AsRef<[u8]>,
419 V: AsRef<[u8]>,
420 {
421 let key = key.as_ref();
422 let value = value.as_ref();
423 let key_ptr = key.as_ptr() as *const c_char;
424 let key_len = key.len() as size_t;
425 let val_ptr = value.as_ptr() as *const c_char;
426 let val_len = value.len() as size_t;
427
428 unsafe {
429 match cf {
430 Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
431 self.handle(),
432 cf.handle(),
433 key_ptr,
434 key_len,
435 val_ptr,
436 val_len,
437 )),
438 None => ffi_try!(ffi::rocksdb_transaction_put(
439 self.handle(),
440 key_ptr,
441 key_len,
442 val_ptr,
443 val_len,
444 )),
445 }
446
447 Ok(())
448 }
449 }
450}
451
452impl<T> MergeCF<()> for Transaction<'_, T> {
453 fn merge_cf_full<K, V>(
454 &self,
455 cf: Option<&ColumnFamily>,
456 key: K,
457 value: V,
458 _: Option<&()>,
459 ) -> Result<(), Error>
460 where
461 K: AsRef<[u8]>,
462 V: AsRef<[u8]>,
463 {
464 let key = key.as_ref();
465 let value = value.as_ref();
466 let key_ptr = key.as_ptr() as *const c_char;
467 let key_len = key.len() as size_t;
468 let val_ptr = value.as_ptr() as *const c_char;
469 let val_len = value.len() as size_t;
470
471 unsafe {
472 match cf {
473 Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
474 self.handle(),
475 cf.handle(),
476 key_ptr,
477 key_len,
478 val_ptr,
479 val_len,
480 )),
481 None => ffi_try!(ffi::rocksdb_transaction_merge(
482 self.handle(),
483 key_ptr,
484 key_len,
485 val_ptr,
486 val_len,
487 )),
488 }
489
490 Ok(())
491 }
492 }
493}
494
495impl<T> DeleteCF<()> for Transaction<'_, T> {
496 fn delete_cf_full<K>(
497 &self,
498 cf: Option<&ColumnFamily>,
499 key: K,
500 _: Option<&()>,
501 ) -> Result<(), Error>
502 where
503 K: AsRef<[u8]>,
504 {
505 let key = key.as_ref();
506 let key_ptr = key.as_ptr() as *const c_char;
507 let key_len = key.len() as size_t;
508
509 unsafe {
510 match cf {
511 Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
512 self.handle(),
513 cf.inner,
514 key_ptr,
515 key_len,
516 )),
517 None => ffi_try!(ffi::rocksdb_transaction_delete(
518 self.handle(),
519 key_ptr,
520 key_len,
521 )),
522 }
523
524 Ok(())
525 }
526 }
527}
528
529impl<T> Drop for TransactionSnapshot<'_, T> {
530 fn drop(&mut self) {
531 unsafe {
532 ffi::rocksdb_free(self.inner as *mut c_void);
533 }
534 }
535}
536
537impl<T: Iterate> Iterate for TransactionSnapshot<'_, T> {
538 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
539 let mut readopts = readopts.to_owned();
540 readopts.set_snapshot(self);
541 self.db.get_raw_iter(&readopts)
542 }
543}
544
545impl<T: IterateCF> IterateCF for TransactionSnapshot<'_, T> {
546 fn get_raw_iter_cf<'a: 'b, 'b>(
547 &'a self,
548 cf_handle: &ColumnFamily,
549 readopts: &ReadOptions,
550 ) -> Result<DBRawIterator<'b>, Error> {
551 let mut readopts = readopts.to_owned();
552 readopts.set_snapshot(self);
553 self.db.get_raw_iter_cf(cf_handle, &readopts)
554 }
555}
556
557impl<'a, T> GetPinnedCF<'a> for Transaction<'a, T> {
558 type ColumnFamily = &'a ColumnFamily;
559 type ReadOptions = &'a ReadOptions;
560
561 fn get_pinned_cf_full<K: AsRef<[u8]>>(
562 &'a self,
563 cf: Option<Self::ColumnFamily>,
564 key: K,
565 readopts: Option<Self::ReadOptions>,
566 ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
567 let mut default_readopts = None;
568
569 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
570
571 let key = key.as_ref();
572 let key_ptr = key.as_ptr() as *const c_char;
573 let key_len = key.len() as size_t;
574
575 unsafe {
576 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
577 let val = match cf {
578 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
579 self.handle(),
580 ro_handle,
581 cf.handle(),
582 key_ptr,
583 key_len,
584 &mut err,
585 ),
586 None => ffi::rocksdb_transaction_get_pinned(
587 self.handle(),
588 ro_handle,
589 key_ptr,
590 key_len,
591 &mut err,
592 ),
593 };
594
595 if !err.is_null() {
596 return Err(Error::new(ffi_util::error_message(err)));
597 }
598
599 if val.is_null() {
600 Ok(None)
601 } else {
602 Ok(Some(DBPinnableSlice::from_c(val)))
603 }
604 }
605 }
606}
607
608impl<'a, T> GetPinnedCF<'a> for TransactionSnapshot<'a, T> {
609 type ColumnFamily = &'a ColumnFamily;
610 type ReadOptions = &'a ReadOptions;
611
612 fn get_pinned_cf_full<K: AsRef<[u8]>>(
613 &'a self,
614 cf: Option<Self::ColumnFamily>,
615 key: K,
616 readopts: Option<Self::ReadOptions>,
617 ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
618 let mut ro = readopts.cloned().unwrap_or_default();
619 ro.set_snapshot(self);
620
621 let key = key.as_ref();
622 let key_ptr = key.as_ptr() as *const c_char;
623 let key_len = key.len() as size_t;
624
625 unsafe {
626 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
627 let val = match cf {
628 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
629 self.db.handle(),
630 ro.handle(),
631 cf.handle(),
632 key_ptr,
633 key_len,
634 &mut err,
635 ),
636 None => ffi::rocksdb_transaction_get_pinned(
637 self.db.handle(),
638 ro.handle(),
639 key_ptr,
640 key_len,
641 &mut err,
642 ),
643 };
644
645 if !err.is_null() {
646 return Err(Error::new(ffi_util::error_message(err)));
647 }
648
649 if val.is_null() {
650 Ok(None)
651 } else {
652 Ok(Some(DBPinnableSlice::from_c(val)))
653 }
654 }
655 }
656}