1use crate::ffi;
2use crate::{
3 ColumnFamily, DBPinnableSlice, DBRawIterator, DBVector, Error, ReadOptions, ffi_util,
4 handle::{ConstHandle, Handle},
5 ops::*,
6};
7use libc::{c_char, c_uchar, c_void, size_t};
8use std::marker::PhantomData;
9use std::ptr;
10
11pub struct OptimisticTransaction {
12 inner: *mut ffi::rocksdb_transaction_t,
13}
14
15unsafe impl Send for OptimisticTransaction {}
16unsafe impl Sync for OptimisticTransaction {}
17
18impl OptimisticTransaction {
19 pub(crate) fn new(inner: *mut ffi::rocksdb_transaction_t) -> OptimisticTransaction {
20 OptimisticTransaction { inner }
21 }
22
23 pub fn commit(&self) -> Result<(), Error> {
25 unsafe {
26 ffi_try!(ffi::rocksdb_transaction_commit(self.inner,));
27 }
28 Ok(())
29 }
30
31 pub fn rollback(&self) -> Result<(), Error> {
33 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback(self.inner,)) }
34 Ok(())
35 }
36
37 pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
39 unsafe { ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(self.inner,)) }
40 Ok(())
41 }
42
43 pub fn set_savepoint(&self) {
45 unsafe { ffi::rocksdb_transaction_set_savepoint(self.inner) }
46 }
47
48 pub fn snapshot(&self) -> OptimisticTransactionSnapshot<'_> {
50 unsafe {
51 let snapshot = ffi::rocksdb_transaction_get_snapshot(self.inner);
52 OptimisticTransactionSnapshot {
53 txn: self,
54 inner: snapshot,
55 }
56 }
57 }
58
59 pub fn get_for_update<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
63 let opt = ReadOptions::default();
64 self.get_for_update_opt(key, &opt, true)
65 }
66
67 pub fn get_for_update_opt<K: AsRef<[u8]>>(
69 &self,
70 key: K,
71 readopts: &ReadOptions,
72 exclusive: bool,
73 ) -> Result<Option<DBVector>, Error> {
74 let key = key.as_ref();
75 let key_ptr = key.as_ptr() as *const c_char;
76 let key_len = key.len() as size_t;
77 unsafe {
78 let mut val_len: size_t = 0;
79 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update(
80 self.handle(),
81 readopts.handle(),
82 key_ptr,
83 key_len,
84 &mut val_len,
85 exclusive as c_uchar,
86 )) as *mut u8;
87
88 if val.is_null() {
89 Ok(None)
90 } else {
91 Ok(Some(DBVector::from_c(val, val_len)))
92 }
93 }
94 }
95
96 pub fn get_for_update_cf<K: AsRef<[u8]>>(
97 &self,
98 cf: &ColumnFamily,
99 key: K,
100 ) -> Result<Option<DBVector>, Error> {
101 let opt = ReadOptions::default();
102 self.get_for_update_cf_opt(cf, key, &opt, true)
103 }
104
105 pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
106 &self,
107 cf: &ColumnFamily,
108 key: K,
109 readopts: &ReadOptions,
110 exclusive: bool,
111 ) -> Result<Option<DBVector>, Error> {
112 let key = key.as_ref();
113 let key_ptr = key.as_ptr() as *const c_char;
114 let key_len = key.len() as size_t;
115 unsafe {
116 let mut val_len: size_t = 0;
117 let val = ffi_try!(ffi::rocksdb_transaction_get_for_update_cf(
118 self.handle(),
119 readopts.handle(),
120 cf.handle(),
121 key_ptr,
122 key_len,
123 &mut val_len,
124 exclusive as c_uchar,
125 )) as *mut u8;
126
127 if val.is_null() {
128 Ok(None)
129 } else {
130 Ok(Some(DBVector::from_c(val, val_len)))
131 }
132 }
133 }
134}
135
136impl Drop for OptimisticTransaction {
137 fn drop(&mut self) {
138 unsafe {
139 ffi::rocksdb_transaction_destroy(self.inner);
140 }
141 }
142}
143
144impl Handle<ffi::rocksdb_transaction_t> for OptimisticTransaction {
145 fn handle(&self) -> *mut ffi::rocksdb_transaction_t {
146 self.inner
147 }
148}
149
150impl Read for OptimisticTransaction {}
151
152impl GetCF<ReadOptions> for OptimisticTransaction {
153 fn get_cf_full<K: AsRef<[u8]>>(
154 &self,
155 cf: Option<&ColumnFamily>,
156 key: K,
157 readopts: Option<&ReadOptions>,
158 ) -> Result<Option<DBVector>, Error> {
159 let mut default_readopts = None;
160
161 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
162
163 let key = key.as_ref();
164 let key_ptr = key.as_ptr() as *const c_char;
165 let key_len = key.len() as size_t;
166
167 unsafe {
168 let mut val_len: size_t = 0;
169
170 let val = match cf {
171 Some(cf) => ffi_try!(ffi::rocksdb_transaction_get_cf(
172 self.handle(),
173 ro_handle,
174 cf.inner,
175 key_ptr,
176 key_len,
177 &mut val_len,
178 )),
179 None => ffi_try!(ffi::rocksdb_transaction_get(
180 self.handle(),
181 ro_handle,
182 key_ptr,
183 key_len,
184 &mut val_len,
185 )),
186 } as *mut u8;
187
188 if val.is_null() {
189 Ok(None)
190 } else {
191 Ok(Some(DBVector::from_c(val, val_len)))
192 }
193 }
194 }
195}
196
197impl MultiGet<ReadOptions> for OptimisticTransaction {
198 fn multi_get_full<K, I>(
199 &self,
200 keys: I,
201 readopts: Option<&ReadOptions>,
202 ) -> Vec<Result<Option<DBVector>, Error>>
203 where
204 K: AsRef<[u8]>,
205 I: IntoIterator<Item = K>,
206 {
207 let mut default_readopts = None;
208 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
209 Ok(ro) => ro,
210 Err(e) => {
211 let key_count = keys.into_iter().count();
212
213 return vec![e; key_count]
214 .iter()
215 .map(|e| Err(e.to_owned()))
216 .collect();
217 }
218 };
219
220 let (keys, keys_sizes): (Vec<Box<[u8]>>, Vec<_>) = keys
221 .into_iter()
222 .map(|k| (Box::from(k.as_ref()), k.as_ref().len()))
223 .unzip();
224 let ptr_keys: Vec<_> = keys.iter().map(|k| k.as_ptr() as *const c_char).collect();
225
226 let mut values = vec![ptr::null_mut(); keys.len()];
227 let mut values_sizes = vec![0_usize; keys.len()];
228 let mut errors = vec![ptr::null_mut(); keys.len()];
229 unsafe {
230 ffi::rocksdb_transaction_multi_get(
231 self.inner,
232 ro_handle,
233 ptr_keys.len(),
234 ptr_keys.as_ptr(),
235 keys_sizes.as_ptr(),
236 values.as_mut_ptr(),
237 values_sizes.as_mut_ptr(),
238 errors.as_mut_ptr(),
239 );
240 }
241
242 convert_values(values, values_sizes, errors)
243 }
244}
245
246impl MultiGetCF<ReadOptions> for OptimisticTransaction {
247 fn multi_get_cf_full<'a, K, I>(
248 &self,
249 keys: I,
250 readopts: Option<&ReadOptions>,
251 ) -> Vec<Result<Option<DBVector>, Error>>
252 where
253 K: AsRef<[u8]>,
254 I: IntoIterator<Item = (&'a ColumnFamily, K)>,
255 {
256 let mut default_readopts = None;
257 let ro_handle = match ReadOptions::input_or_default(readopts, &mut default_readopts) {
258 Ok(ro) => ro,
259 Err(e) => {
260 let key_count = keys.into_iter().count();
261
262 return vec![e; key_count]
263 .iter()
264 .map(|e| Err(e.to_owned()))
265 .collect();
266 }
267 };
268
269 let (cfs_and_keys, keys_sizes): (Vec<CFAndKey>, Vec<_>) = keys
270 .into_iter()
271 .map(|(cf, key)| ((cf, Box::from(key.as_ref())), key.as_ref().len()))
272 .unzip();
273 let ptr_keys: Vec<_> = cfs_and_keys
274 .iter()
275 .map(|(_, k)| k.as_ptr() as *const c_char)
276 .collect();
277 let ptr_cfs: Vec<_> = cfs_and_keys
278 .iter()
279 .map(|(c, _)| c.inner as *const _)
280 .collect();
281
282 let mut values = vec![ptr::null_mut(); ptr_keys.len()];
283 let mut values_sizes = vec![0_usize; ptr_keys.len()];
284 let mut errors = vec![ptr::null_mut(); ptr_keys.len()];
285 unsafe {
286 ffi::rocksdb_transaction_multi_get_cf(
287 self.inner,
288 ro_handle,
289 ptr_cfs.as_ptr(),
290 ptr_keys.len(),
291 ptr_keys.as_ptr(),
292 keys_sizes.as_ptr(),
293 values.as_mut_ptr(),
294 values_sizes.as_mut_ptr(),
295 errors.as_mut_ptr(),
296 );
297 }
298
299 convert_values(values, values_sizes, errors)
300 }
301}
302
303impl Iterate for OptimisticTransaction {
304 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
305 unsafe {
306 DBRawIterator {
307 inner: ffi::rocksdb_transaction_create_iterator(self.inner, readopts.handle()),
308 db: PhantomData,
309 }
310 }
311 }
312}
313
314impl IterateCF for OptimisticTransaction {
315 fn get_raw_iter_cf<'a: 'b, 'b>(
316 &'a self,
317 cf_handle: &ColumnFamily,
318 readopts: &ReadOptions,
319 ) -> Result<DBRawIterator<'b>, Error> {
320 unsafe {
321 Ok(DBRawIterator {
322 inner: ffi::rocksdb_transaction_create_iterator_cf(
323 self.inner,
324 readopts.handle(),
325 cf_handle.inner,
326 ),
327 db: PhantomData,
328 })
329 }
330 }
331}
332
333impl PutCF<()> for OptimisticTransaction {
334 fn put_cf_full<K, V>(
335 &self,
336 cf: Option<&ColumnFamily>,
337 key: K,
338 value: V,
339 _: Option<&()>,
340 ) -> Result<(), Error>
341 where
342 K: AsRef<[u8]>,
343 V: AsRef<[u8]>,
344 {
345 let key = key.as_ref();
346 let value = value.as_ref();
347 let key_ptr = key.as_ptr() as *const c_char;
348 let key_len = key.len() as size_t;
349 let val_ptr = value.as_ptr() as *const c_char;
350 let val_len = value.len() as size_t;
351
352 unsafe {
353 match cf {
354 Some(cf) => ffi_try!(ffi::rocksdb_transaction_put_cf(
355 self.handle(),
356 cf.handle(),
357 key_ptr,
358 key_len,
359 val_ptr,
360 val_len,
361 )),
362 None => ffi_try!(ffi::rocksdb_transaction_put(
363 self.handle(),
364 key_ptr,
365 key_len,
366 val_ptr,
367 val_len,
368 )),
369 }
370
371 Ok(())
372 }
373 }
374}
375
376impl MergeCF<()> for OptimisticTransaction {
377 fn merge_cf_full<K, V>(
378 &self,
379 cf: Option<&ColumnFamily>,
380 key: K,
381 value: V,
382 _: Option<&()>,
383 ) -> Result<(), Error>
384 where
385 K: AsRef<[u8]>,
386 V: AsRef<[u8]>,
387 {
388 let key = key.as_ref();
389 let value = value.as_ref();
390 let key_ptr = key.as_ptr() as *const c_char;
391 let key_len = key.len() as size_t;
392 let val_ptr = value.as_ptr() as *const c_char;
393 let val_len = value.len() as size_t;
394
395 unsafe {
396 match cf {
397 Some(cf) => ffi_try!(ffi::rocksdb_transaction_merge_cf(
398 self.handle(),
399 cf.handle(),
400 key_ptr,
401 key_len,
402 val_ptr,
403 val_len,
404 )),
405 None => ffi_try!(ffi::rocksdb_transaction_merge(
406 self.handle(),
407 key_ptr,
408 key_len,
409 val_ptr,
410 val_len,
411 )),
412 }
413
414 Ok(())
415 }
416 }
417}
418
419impl DeleteCF<()> for OptimisticTransaction {
420 fn delete_cf_full<K>(
421 &self,
422 cf: Option<&ColumnFamily>,
423 key: K,
424 _: Option<&()>,
425 ) -> Result<(), Error>
426 where
427 K: AsRef<[u8]>,
428 {
429 let key = key.as_ref();
430 let key_ptr = key.as_ptr() as *const c_char;
431 let key_len = key.len() as size_t;
432
433 unsafe {
434 match cf {
435 Some(cf) => ffi_try!(ffi::rocksdb_transaction_delete_cf(
436 self.handle(),
437 cf.inner,
438 key_ptr,
439 key_len,
440 )),
441 None => ffi_try!(ffi::rocksdb_transaction_delete(
442 self.handle(),
443 key_ptr,
444 key_len,
445 )),
446 }
447
448 Ok(())
449 }
450 }
451}
452
453pub struct OptimisticTransactionSnapshot<'a> {
454 txn: &'a OptimisticTransaction,
455 inner: *const ffi::rocksdb_snapshot_t,
456}
457
458unsafe impl Send for OptimisticTransactionSnapshot<'_> {}
459unsafe impl Sync for OptimisticTransactionSnapshot<'_> {}
460
461impl ConstHandle<ffi::rocksdb_snapshot_t> for OptimisticTransactionSnapshot<'_> {
462 fn const_handle(&self) -> *const ffi::rocksdb_snapshot_t {
463 self.inner
464 }
465}
466
467impl Read for OptimisticTransactionSnapshot<'_> {}
468
469impl GetCF<ReadOptions> for OptimisticTransactionSnapshot<'_> {
470 fn get_cf_full<K: AsRef<[u8]>>(
471 &self,
472 cf: Option<&ColumnFamily>,
473 key: K,
474 readopts: Option<&ReadOptions>,
475 ) -> Result<Option<DBVector>, Error> {
476 let mut ro = readopts.cloned().unwrap_or_default();
477 ro.set_snapshot(self);
478 self.txn.get_cf_full(cf, key, Some(&ro))
479 }
480}
481
482impl MultiGet<ReadOptions> for OptimisticTransactionSnapshot<'_> {
483 fn multi_get_full<K, I>(
484 &self,
485 keys: I,
486 readopts: Option<&ReadOptions>,
487 ) -> Vec<Result<Option<DBVector>, Error>>
488 where
489 K: AsRef<[u8]>,
490 I: IntoIterator<Item = K>,
491 {
492 let mut ro = readopts.cloned().unwrap_or_default();
493 ro.set_snapshot(self);
494 self.txn.multi_get_full(keys, Some(&ro))
495 }
496}
497
498impl MultiGetCF<ReadOptions> for OptimisticTransactionSnapshot<'_> {
499 fn multi_get_cf_full<'m, K, I>(
500 &self,
501 keys: I,
502 readopts: Option<&ReadOptions>,
503 ) -> Vec<Result<Option<DBVector>, Error>>
504 where
505 K: AsRef<[u8]>,
506 I: IntoIterator<Item = (&'m ColumnFamily, K)>,
507 {
508 let mut ro = readopts.cloned().unwrap_or_default();
509 ro.set_snapshot(self);
510 self.txn.multi_get_cf_full(keys, Some(&ro))
511 }
512}
513
514impl Drop for OptimisticTransactionSnapshot<'_> {
515 fn drop(&mut self) {
516 unsafe {
517 ffi::rocksdb_free(self.inner as *mut c_void);
518 }
519 }
520}
521
522impl Iterate for OptimisticTransactionSnapshot<'_> {
523 fn get_raw_iter<'a: 'b, 'b>(&'a self, readopts: &ReadOptions) -> DBRawIterator<'b> {
524 let mut readopts = readopts.to_owned();
525 readopts.set_snapshot(self);
526 self.txn.get_raw_iter(&readopts)
527 }
528}
529
530impl IterateCF for OptimisticTransactionSnapshot<'_> {
531 fn get_raw_iter_cf<'a: 'b, 'b>(
532 &'a self,
533 cf_handle: &ColumnFamily,
534 readopts: &ReadOptions,
535 ) -> Result<DBRawIterator<'b>, Error> {
536 let mut readopts = readopts.to_owned();
537 readopts.set_snapshot(self);
538 self.txn.get_raw_iter_cf(cf_handle, &readopts)
539 }
540}
541
542impl<'a> GetPinnedCF<'a> for OptimisticTransaction {
543 type ColumnFamily = &'a ColumnFamily;
544 type ReadOptions = &'a ReadOptions;
545
546 fn get_pinned_cf_full<K: AsRef<[u8]>>(
547 &'a self,
548 cf: Option<Self::ColumnFamily>,
549 key: K,
550 readopts: Option<Self::ReadOptions>,
551 ) -> Result<Option<DBPinnableSlice<'a>>, Error> {
552 let mut default_readopts = None;
553
554 let ro_handle = ReadOptions::input_or_default(readopts, &mut default_readopts)?;
555
556 let key = key.as_ref();
557 let key_ptr = key.as_ptr() as *const c_char;
558 let key_len = key.len() as size_t;
559
560 unsafe {
561 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
562 let val = match cf {
563 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
564 self.handle(),
565 ro_handle,
566 cf.handle(),
567 key_ptr,
568 key_len,
569 &mut err,
570 ),
571 None => ffi::rocksdb_transaction_get_pinned(
572 self.handle(),
573 ro_handle,
574 key_ptr,
575 key_len,
576 &mut err,
577 ),
578 };
579
580 if !err.is_null() {
581 return Err(Error::new(ffi_util::error_message(err)));
582 }
583
584 if val.is_null() {
585 Ok(None)
586 } else {
587 Ok(Some(DBPinnableSlice::from_c(val)))
588 }
589 }
590 }
591}
592
593impl<'a> GetPinnedCF<'a> for OptimisticTransactionSnapshot<'a> {
594 type ColumnFamily = &'a ColumnFamily;
595 type ReadOptions = &'a ReadOptions;
596
597 fn get_pinned_cf_full<K: AsRef<[u8]>>(
598 &'a self,
599 cf: Option<Self::ColumnFamily>,
600 key: K,
601 readopts: Option<Self::ReadOptions>,
602 ) -> ::std::result::Result<Option<DBPinnableSlice<'a>>, Error> {
603 let mut ro = readopts.cloned().unwrap_or_default();
604 ro.set_snapshot(self);
605
606 let key = key.as_ref();
607 let key_ptr = key.as_ptr() as *const c_char;
608 let key_len = key.len() as size_t;
609
610 unsafe {
611 let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
612 let val = match cf {
613 Some(cf) => ffi::rocksdb_transaction_get_pinned_cf(
614 self.txn.handle(),
615 ro.handle(),
616 cf.handle(),
617 key_ptr,
618 key_len,
619 &mut err,
620 ),
621 None => ffi::rocksdb_transaction_get_pinned(
622 self.txn.handle(),
623 ro.handle(),
624 key_ptr,
625 key_len,
626 &mut err,
627 ),
628 };
629
630 if !err.is_null() {
631 return Err(Error::new(ffi_util::error_message(err)));
632 }
633
634 if val.is_null() {
635 Ok(None)
636 } else {
637 Ok(Some(DBPinnableSlice::from_c(val)))
638 }
639 }
640 }
641}