use std::marker::PhantomData;
use crate::{
AsColumnFamilyRef, DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
Direction, Error, IteratorMode, ReadOptions, SnapshotWithThreadMode, WriteBatchWithTransaction,
db::{DBAccess, convert_values},
ffi,
};
use libc::{c_char, c_void, size_t};
// Per-thread default `ReadOptions` instance. The convenience getters in this file
// (`get`, `get_cf`, `multi_get`, ...) borrow it through `DEFAULT_READ_OPTS.with(..)`
// and forward it to their `*_opt` counterparts — presumably so a default options
// object is not rebuilt on every call.
thread_local! { static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default(); }
/// Wrapper around a raw RocksDB transaction handle (`rocksdb_transaction_t`).
///
/// The `'db` lifetime and `DB` type parameter are carried only by `_marker`, which
/// makes the transaction behave as if it held a `&'db DB` borrow.
pub struct Transaction<'db, DB> {
/// Raw FFI handle passed to every `rocksdb_transaction_*` call in this file; it is
/// handed to `rocksdb_transaction_destroy` by the `Drop` implementation.
pub(crate) inner: *mut ffi::rocksdb_transaction_t,
/// Zero-sized marker standing in for a `&'db DB` borrow; stores no data.
pub(crate) _marker: PhantomData<&'db DB>,
}
// SAFETY(review): asserts that a `Transaction` (i.e. its raw `inner` pointer) may be
// moved to another thread. Nothing visible in this file proves that; it presumably
// relies on the RocksDB transaction object not being bound to its creating thread —
// TODO confirm. No `Sync` implementation is visible in this file.
unsafe impl<DB> Send for Transaction<'_, DB> {}
/// `DBAccess` implementation for transactions.
///
/// Snapshot and iterator creation go straight to the `rocksdb_transaction_*` C API;
/// the read methods forward to the inherent `Transaction` methods of the same name.
impl<DB> DBAccess for Transaction<'_, DB> {
/// Returns the snapshot pointer reported by `rocksdb_transaction_get_snapshot`.
///
/// # Safety
/// NOTE(review): presumably requires `self.inner` to be a live transaction handle;
/// the exact contract is defined by the `DBAccess` trait, which is not visible here.
unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
unsafe { ffi::rocksdb_transaction_get_snapshot(self.inner) }
}
/// Releases a snapshot pointer by passing it to `rocksdb_free`.
///
/// # Safety
/// NOTE(review): presumably `snapshot` must come from `create_snapshot` on a
/// transaction and must not be used afterwards — confirm against the C API notes.
unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
unsafe {
// Uses the generic `rocksdb_free` rather than a snapshot-specific release call.
ffi::rocksdb_free(snapshot as *mut c_void);
}
}
/// Creates a raw iterator over this transaction with the given read options.
///
/// # Safety
/// NOTE(review): the returned pointer's ownership rules come from the C API /
/// `DBAccess` contract and are not visible in this file.
unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
unsafe { ffi::rocksdb_transaction_create_iterator(self.inner, readopts.inner) }
}
/// Creates a raw iterator over one column family of this transaction.
///
/// # Safety
/// NOTE(review): presumably `cf_handle` must be a valid column family handle for
/// the same database — confirm against the caller.
unsafe fn create_iterator_cf(
&self,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: &ReadOptions,
) -> *mut ffi::rocksdb_iterator_t {
unsafe {
ffi::rocksdb_transaction_create_iterator_cf(self.inner, readopts.inner, cf_handle)
}
}
/// Forwards to the inherent `Transaction::get_opt`.
fn get_opt<K: AsRef<[u8]>>(
&self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
// Not self-recursion: method-call syntax picks the inherent method of the same
// name ahead of this trait method.
self.get_opt(key, readopts)
}
/// Forwards to the inherent `Transaction::get_cf_opt`.
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
// Resolves to the inherent method (see `get_opt` above in this impl).
self.get_cf_opt(cf, key, readopts)
}
/// Forwards to the inherent `Transaction::get_pinned_opt`.
fn get_pinned_opt<K: AsRef<[u8]>>(
&'_ self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
// Resolves to the inherent method, not to this trait method.
self.get_pinned_opt(key, readopts)
}
/// Forwards to the inherent `Transaction::get_pinned_cf_opt`.
fn get_pinned_cf_opt<K: AsRef<[u8]>>(
&'_ self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
// Resolves to the inherent method, not to this trait method.
self.get_pinned_cf_opt(cf, key, readopts)
}
/// Forwards to the inherent `Transaction::multi_get_opt`.
fn multi_get_opt<K, I>(
&self,
keys: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
{
// Resolves to the inherent method, not to this trait method.
self.multi_get_opt(keys, readopts)
}
/// Forwards to the inherent `Transaction::multi_get_cf_opt`.
fn multi_get_cf_opt<'b, K, I, W>(
&self,
keys_cf: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b,
{
// Resolves to the inherent method, not to this trait method.
self.multi_get_cf_opt(keys_cf, readopts)
}
}
impl<DB> Transaction<'_, DB> {
/// Commits the transaction via `rocksdb_transaction_commit`.
///
/// Consumes `self`, so the handle is dropped (and destroyed by `Drop`) when this
/// function returns, whether or not the commit succeeded.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn commit(self) -> Result<(), Error> {
    let txn = self.inner;
    // SAFETY: assumes `txn` is the live handle owned by `self`, which is still alive here.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_commit(txn));
    }
    Ok(())
}
/// Assigns `name` to the transaction via `rocksdb_transaction_set_name`.
///
/// The name is passed as a pointer/length pair, so it may contain arbitrary bytes.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn set_name(&self, name: &[u8]) -> Result<(), Error> {
    // SAFETY: the pointer/length pair describes the `name` slice, which outlives the call.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_set_name(
            self.inner,
            name.as_ptr().cast(),
            name.len() as _,
        ));
    }
    Ok(())
}
/// Returns a copy of the transaction's name, or `None` when
/// `rocksdb_transaction_get_name` hands back a null pointer.
pub fn get_name(&self) -> Option<Vec<u8>> {
    let mut name_len = 0;
    // SAFETY: `name_len` is a valid out-parameter for the duration of the call.
    let name = unsafe { ffi::rocksdb_transaction_get_name(self.inner, &raw mut name_len) };
    if name.is_null() {
        return None;
    }
    // SAFETY: assumes the C API returned a buffer of exactly `name_len` bytes; it is
    // copied into an owned `Vec` before the buffer is released with `rocksdb_free`.
    unsafe {
        let bytes = std::slice::from_raw_parts(name as *const u8, name_len).to_vec();
        ffi::rocksdb_free(name as *mut c_void);
        Some(bytes)
    }
}
/// Calls `rocksdb_transaction_prepare` on this transaction.
///
/// NOTE(review): presumably the prepare phase of a two-phase commit — confirm
/// against the RocksDB transaction documentation.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn prepare(&self) -> Result<(), Error> {
    let txn = self.inner;
    // SAFETY: assumes `txn` is the live handle owned by `self`.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_prepare(txn));
    }
    Ok(())
}
/// Builds a `SnapshotWithThreadMode` over this transaction.
///
/// The returned value borrows `self`. NOTE(review): it presumably obtains its
/// snapshot through this type's `DBAccess::create_snapshot` — confirm in
/// `SnapshotWithThreadMode::new`.
pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
SnapshotWithThreadMode::new(self)
}
/// Rolls the transaction back via `rocksdb_transaction_rollback`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn rollback(&self) -> Result<(), Error> {
    let txn = self.inner;
    // SAFETY: assumes `txn` is the live handle owned by `self`.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_rollback(txn));
    }
    Ok(())
}
/// Records a savepoint by calling `rocksdb_transaction_set_savepoint`.
///
/// The call has no error channel, so this method cannot fail.
/// NOTE(review): presumably pairs with `rollback_to_savepoint` — confirm against
/// the RocksDB transaction documentation.
pub fn set_savepoint(&self) {
// SAFETY: assumes `self.inner` is the live handle owned by `self`.
unsafe {
ffi::rocksdb_transaction_set_savepoint(self.inner);
}
}
/// Calls `rocksdb_transaction_rollback_to_savepoint` on this transaction.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn rollback_to_savepoint(&self) -> Result<(), Error> {
    let txn = self.inner;
    // SAFETY: assumes `txn` is the live handle owned by `self`.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_rollback_to_savepoint(txn));
    }
    Ok(())
}
/// Reads the value stored under `key`, using the thread-local default `ReadOptions`.
///
/// Equivalent to calling `get_opt` with `DEFAULT_READ_OPTS`.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_opt(key, default_opts))
}
/// Reads the value under `key` as a `DBPinnableSlice`, using the thread-local
/// default `ReadOptions`.
///
/// Equivalent to calling `get_pinned_opt` with `DEFAULT_READ_OPTS`.
pub fn get_pinned<K: AsRef<[u8]>>(
    &'_ self,
    key: K,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_pinned_opt(key, default_opts))
}
/// Reads the value under `key` in column family `cf`, using the thread-local
/// default `ReadOptions`.
///
/// Equivalent to calling `get_cf_opt` with `DEFAULT_READ_OPTS`.
pub fn get_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<Option<Vec<u8>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_cf_opt(cf, key, default_opts))
}
/// Reads the value under `key` in column family `cf` as a `DBPinnableSlice`, using
/// the thread-local default `ReadOptions`.
///
/// Equivalent to calling `get_pinned_cf_opt` with `DEFAULT_READ_OPTS`.
pub fn get_pinned_cf<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_pinned_cf_opt(cf, key, default_opts))
}
/// "Get for update" read of `key`, using the thread-local default `ReadOptions`.
///
/// Equivalent to calling `get_for_update_opt` with `DEFAULT_READ_OPTS`.
/// NOTE(review): presumably `exclusive` selects an exclusive rather than shared
/// lock on the key — confirm against the RocksDB transaction documentation.
pub fn get_for_update<K: AsRef<[u8]>>(
    &self,
    key: K,
    exclusive: bool,
) -> Result<Option<Vec<u8>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_for_update_opt(key, exclusive, default_opts))
}
/// "Get for update" read of `key` returning a `DBPinnableSlice`, using the
/// thread-local default `ReadOptions`.
///
/// Equivalent to calling `get_pinned_for_update_opt` with `DEFAULT_READ_OPTS`.
pub fn get_pinned_for_update<K: AsRef<[u8]>>(
    &'_ self,
    key: K,
    exclusive: bool,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS
        .with(|default_opts| self.get_pinned_for_update_opt(key, exclusive, default_opts))
}
/// "Get for update" read of `key` in column family `cf`, using the thread-local
/// default `ReadOptions`.
///
/// Equivalent to calling `get_for_update_cf_opt` with `DEFAULT_READ_OPTS`.
pub fn get_for_update_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    exclusive: bool,
) -> Result<Option<Vec<u8>>, Error> {
    DEFAULT_READ_OPTS
        .with(|default_opts| self.get_for_update_cf_opt(cf, key, exclusive, default_opts))
}
/// "Get for update" read of `key` in column family `cf` returning a
/// `DBPinnableSlice`, using the thread-local default `ReadOptions`.
///
/// Equivalent to calling `get_pinned_for_update_cf_opt` with `DEFAULT_READ_OPTS`.
pub fn get_pinned_for_update_cf<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    exclusive: bool,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| {
        self.get_pinned_for_update_cf_opt(cf, key, exclusive, default_opts)
    })
}
/// Reads the value under `key` with explicit read options, returning an owned copy.
///
/// Delegates to `get_pinned_opt` and copies the pinned bytes into a `Vec<u8>`;
/// `Ok(None)` is passed through unchanged.
///
/// # Errors
/// Propagates any error returned by `get_pinned_opt`.
pub fn get_opt<K: AsRef<[u8]>>(
    &self,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_opt(key, readopts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
pub fn get_pinned_opt<K: AsRef<[u8]>>(
&'_ self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
let key = key.as_ref();
unsafe {
let val = ffi_try!(ffi::rocksdb_transaction_get_pinned(
self.inner,
readopts.inner,
key.as_ptr() as *const c_char,
key.len(),
));
if val.is_null() {
Ok(None)
} else {
Ok(Some(DBPinnableSlice::from_c(val)))
}
}
}
/// Reads the value under `key` in column family `cf` with explicit read options,
/// returning an owned copy.
///
/// Delegates to `get_pinned_cf_opt` and copies the pinned bytes into a `Vec<u8>`.
///
/// # Errors
/// Propagates any error returned by `get_pinned_cf_opt`.
pub fn get_cf_opt<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_cf_opt(cf, key, readopts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads the value under `key` in column family `cf` via
/// `rocksdb_transaction_get_pinned_cf`.
///
/// Returns `Ok(None)` when the C API yields a null slice pointer, otherwise wraps
/// the pointer in a `DBPinnableSlice`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    let key_bytes: &[u8] = key.as_ref();
    // SAFETY: the pointer/length pair describes `key_bytes`, which outlives the call;
    // assumes the transaction, read-options and column-family handles are live.
    unsafe {
        let pinned = ffi_try!(ffi::rocksdb_transaction_get_pinned_cf(
            self.inner,
            readopts.inner,
            cf.inner(),
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len(),
        ));
        if pinned.is_null() {
            return Ok(None);
        }
        Ok(Some(DBPinnableSlice::from_c(pinned)))
    }
}
/// "Get for update" read of `key` with explicit read options, returning an owned copy.
///
/// Delegates to `get_pinned_for_update_opt` and copies the pinned bytes into a
/// `Vec<u8>`.
///
/// # Errors
/// Propagates any error returned by `get_pinned_for_update_opt`.
pub fn get_for_update_opt<K: AsRef<[u8]>>(
    &self,
    key: K,
    exclusive: bool,
    opts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_for_update_opt(key, exclusive, opts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads `key` via `rocksdb_transaction_get_pinned_for_update`.
///
/// `exclusive` is forwarded to the C API as `0`/`1`. Returns `Ok(None)` when the
/// C API yields a null slice pointer.
/// NOTE(review): presumably `exclusive` selects an exclusive rather than shared
/// lock on the key — confirm against the RocksDB transaction documentation.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn get_pinned_for_update_opt<K: AsRef<[u8]>>(
    &'_ self,
    key: K,
    exclusive: bool,
    opts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    let key_bytes: &[u8] = key.as_ref();
    let exclusive_flag = u8::from(exclusive);
    // SAFETY: the pointer/length pair describes `key_bytes`, which outlives the call;
    // assumes `self.inner` and `opts.inner` are live handles.
    unsafe {
        let pinned = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update(
            self.inner,
            opts.inner,
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            exclusive_flag,
        ));
        if pinned.is_null() {
            return Ok(None);
        }
        Ok(Some(DBPinnableSlice::from_c(pinned)))
    }
}
/// "Get for update" read of `key` in column family `cf` with explicit read options,
/// returning an owned copy.
///
/// Delegates to `get_pinned_for_update_cf_opt` and copies the pinned bytes into a
/// `Vec<u8>`.
///
/// # Errors
/// Propagates any error returned by `get_pinned_for_update_cf_opt`.
pub fn get_for_update_cf_opt<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    exclusive: bool,
    opts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_for_update_cf_opt(cf, key, exclusive, opts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads `key` in column family `cf` via
/// `rocksdb_transaction_get_pinned_for_update_cf`.
///
/// `exclusive` is forwarded to the C API as `0`/`1`. Returns `Ok(None)` when the
/// C API yields a null slice pointer.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn get_pinned_for_update_cf_opt<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    exclusive: bool,
    opts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    let key_bytes: &[u8] = key.as_ref();
    // SAFETY: the pointer/length pair describes `key_bytes`, which outlives the call;
    // assumes the transaction, read-options and column-family handles are live.
    unsafe {
        let pinned = ffi_try!(ffi::rocksdb_transaction_get_pinned_for_update_cf(
            self.inner,
            opts.inner,
            cf.inner(),
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            u8::from(exclusive),
        ));
        if pinned.is_null() {
            return Ok(None);
        }
        Ok(Some(DBPinnableSlice::from_c(pinned)))
    }
}
/// Reads several keys in one call, using the thread-local default `ReadOptions`.
///
/// Equivalent to calling `multi_get_opt` with `DEFAULT_READ_OPTS`.
pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = K>,
{
    DEFAULT_READ_OPTS.with(|default_opts| self.multi_get_opt(keys, default_opts))
}
/// Reads several keys in one `rocksdb_transaction_multi_get` call.
///
/// The keys are collected first so their pointers stay valid for the whole call.
/// Three output arrays (values, value sizes, errors) are allocated with one slot
/// per key and handed to `convert_values`, which produces the per-key results.
pub fn multi_get_opt<K, I>(
    &self,
    keys: I,
    readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = K>,
{
    let owned_keys: Vec<K> = keys.into_iter().collect();
    let count = owned_keys.len();

    // Parallel arrays of key pointers and key lengths, in input order.
    let mut ptr_keys: Vec<*const c_char> = Vec::with_capacity(count);
    let mut keys_sizes: Vec<usize> = Vec::with_capacity(count);
    for owned in &owned_keys {
        let bytes: &[u8] = owned.as_ref();
        ptr_keys.push(bytes.as_ptr() as *const c_char);
        keys_sizes.push(bytes.len());
    }

    // Output buffers: capacity only; the C call is expected to fill every slot.
    let mut values: Vec<*mut c_char> = Vec::with_capacity(count);
    let mut values_sizes: Vec<usize> = Vec::with_capacity(count);
    let mut errors: Vec<*mut c_char> = Vec::with_capacity(count);

    // SAFETY: every input array holds `count` entries and every output buffer has
    // capacity for `count` entries. `set_len` assumes the C API wrote all `count`
    // slots of each output array — TODO confirm against the C implementation.
    unsafe {
        ffi::rocksdb_transaction_multi_get(
            self.inner,
            readopts.inner,
            count,
            ptr_keys.as_ptr(),
            keys_sizes.as_ptr(),
            values.as_mut_ptr(),
            values_sizes.as_mut_ptr(),
            errors.as_mut_ptr(),
        );
        values.set_len(count);
        values_sizes.set_len(count);
        errors.set_len(count);
    }

    convert_values(values, values_sizes, errors)
}
/// Reads several `(column family, key)` pairs in one call, using the thread-local
/// default `ReadOptions`.
///
/// Equivalent to calling `multi_get_cf_opt` with `DEFAULT_READ_OPTS`.
pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
    &'a self,
    keys: I,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = (&'b W, K)>,
    W: 'b + AsColumnFamilyRef,
{
    DEFAULT_READ_OPTS.with(|default_opts| self.multi_get_cf_opt(keys, default_opts))
}
/// Reads several `(column family, key)` pairs in one
/// `rocksdb_transaction_multi_get_cf` call.
///
/// The pairs are collected first so the key pointers stay valid for the whole call.
/// Three output arrays (values, value sizes, errors) are allocated with one slot
/// per pair and handed to `convert_values`, which produces the per-key results.
pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
    &'a self,
    keys: I,
    readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = (&'b W, K)>,
    W: 'b + AsColumnFamilyRef,
{
    let cfs_and_owned_keys: Vec<(&'b W, K)> = keys.into_iter().collect();
    let count = cfs_and_owned_keys.len();

    // Parallel arrays of column-family handles, key pointers and key lengths.
    let mut ptr_cfs: Vec<*const ffi::rocksdb_column_family_handle_t> =
        Vec::with_capacity(count);
    let mut ptr_keys: Vec<*const c_char> = Vec::with_capacity(count);
    let mut keys_sizes: Vec<usize> = Vec::with_capacity(count);
    for (cf, owned) in &cfs_and_owned_keys {
        let bytes: &[u8] = owned.as_ref();
        ptr_cfs.push(cf.inner().cast_const());
        ptr_keys.push(bytes.as_ptr() as *const c_char);
        keys_sizes.push(bytes.len());
    }

    // Output buffers: capacity only; the C call is expected to fill every slot.
    let mut values: Vec<*mut c_char> = Vec::with_capacity(count);
    let mut values_sizes: Vec<usize> = Vec::with_capacity(count);
    let mut errors: Vec<*mut c_char> = Vec::with_capacity(count);

    // SAFETY: every input array holds `count` entries and every output buffer has
    // capacity for `count` entries. `set_len` assumes the C API wrote all `count`
    // slots of each output array — TODO confirm against the C implementation.
    unsafe {
        ffi::rocksdb_transaction_multi_get_cf(
            self.inner,
            readopts.inner,
            ptr_cfs.as_ptr(),
            count,
            ptr_keys.as_ptr(),
            keys_sizes.as_ptr(),
            values.as_mut_ptr(),
            values_sizes.as_mut_ptr(),
            errors.as_mut_ptr(),
        );
        values.set_len(count);
        values_sizes.set_len(count);
        errors.set_len(count);
    }

    convert_values(values, values_sizes, errors)
}
/// Writes `value` under `key` within the transaction via `rocksdb_transaction_put`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn put<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
    let key_bytes: &[u8] = key.as_ref();
    let value_bytes: &[u8] = value.as_ref();
    // SAFETY: both pointer/length pairs describe slices that outlive the call;
    // assumes `self.inner` is a live handle.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_put(
            self.inner,
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            value_bytes.as_ptr().cast::<c_char>(),
            value_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Writes `value` under `key` in column family `cf` via
/// `rocksdb_transaction_put_cf`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    value: V,
) -> Result<(), Error> {
    let key_bytes: &[u8] = key.as_ref();
    let value_bytes: &[u8] = value.as_ref();
    // SAFETY: both pointer/length pairs describe slices that outlive the call;
    // assumes the transaction and column-family handles are live.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_put_cf(
            self.inner,
            cf.inner(),
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            value_bytes.as_ptr().cast::<c_char>(),
            value_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Records a merge of `value` into `key` via `rocksdb_transaction_merge`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn merge<K: AsRef<[u8]>, V: AsRef<[u8]>>(&self, key: K, value: V) -> Result<(), Error> {
    let key_bytes: &[u8] = key.as_ref();
    let value_bytes: &[u8] = value.as_ref();
    // SAFETY: both pointer/length pairs describe slices that outlive the call;
    // assumes `self.inner` is a live handle.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_merge(
            self.inner,
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            value_bytes.as_ptr().cast::<c_char>(),
            value_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Records a merge of `value` into `key` in column family `cf` via
/// `rocksdb_transaction_merge_cf`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn merge_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    value: V,
) -> Result<(), Error> {
    let key_bytes: &[u8] = key.as_ref();
    let value_bytes: &[u8] = value.as_ref();
    // SAFETY: both pointer/length pairs describe slices that outlive the call;
    // assumes the transaction and column-family handles are live.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_merge_cf(
            self.inner,
            cf.inner(),
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
            value_bytes.as_ptr().cast::<c_char>(),
            value_bytes.len() as size_t,
        ));
    }
    Ok(())
}
pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
let key = key.as_ref();
unsafe {
ffi_try!(ffi::rocksdb_transaction_delete(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t
));
}
Ok(())
}
/// Deletes `key` in column family `cf` via `rocksdb_transaction_delete_cf`.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn delete_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<(), Error> {
    let key_bytes: &[u8] = key.as_ref();
    // SAFETY: the pointer/length pair describes `key_bytes`, which outlives the call;
    // assumes the transaction and column-family handles are live.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_delete_cf(
            self.inner,
            cf.inner(),
            key_bytes.as_ptr().cast::<c_char>(),
            key_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Creates an iterator over the transaction in the given `mode`, using freshly
/// built default `ReadOptions`.
///
/// Equivalent to `iterator_opt(mode, ReadOptions::default())`.
pub fn iterator<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    self.iterator_opt(mode, ReadOptions::default())
}
/// Creates an iterator over the transaction in the given `mode` with
/// caller-supplied `ReadOptions`.
///
/// `readopts` is taken by value and handed to `DBIteratorWithThreadMode::new`;
/// the returned iterator borrows `self`.
pub fn iterator_opt<'a: 'b, 'b>(
&'a self,
mode: IteratorMode,
readopts: ReadOptions,
) -> DBIteratorWithThreadMode<'b, Self> {
DBIteratorWithThreadMode::new(self, readopts, mode)
}
/// Creates an iterator over column family `cf_handle` in the given `mode` with
/// caller-supplied `ReadOptions`.
///
/// Note the parameter order: `readopts` comes before `mode` here, unlike
/// `iterator_opt`, which takes `mode` first.
pub fn iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: &impl AsColumnFamilyRef,
readopts: ReadOptions,
mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
DBIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts, mode)
}
/// Creates an iterator in the given `mode` with `total_order_seek` enabled on
/// otherwise default `ReadOptions`.
///
/// NOTE(review): presumably this makes the iterator ignore any prefix extractor
/// and walk keys in total order — confirm against the RocksDB read-options docs.
pub fn full_iterator<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut total_order_opts = ReadOptions::default();
    total_order_opts.set_total_order_seek(true);
    // `iterator_opt` forwards straight to `DBIteratorWithThreadMode::new`.
    self.iterator_opt(mode, total_order_opts)
}
/// Creates a forward iterator starting at `prefix`, with `prefix_same_as_start`
/// enabled on otherwise default `ReadOptions`.
///
/// NOTE(review): presumably iteration stops once keys no longer share the
/// starting prefix — confirm against the RocksDB read-options docs.
pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
    &'a self,
    prefix: P,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut prefix_opts = ReadOptions::default();
    prefix_opts.set_prefix_same_as_start(true);
    let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
    // `iterator_opt` forwards straight to `DBIteratorWithThreadMode::new`.
    self.iterator_opt(start, prefix_opts)
}
/// Creates an iterator over column family `cf_handle` in the given `mode`, using
/// freshly built default `ReadOptions`.
///
/// Equivalent to `iterator_cf_opt(cf_handle, ReadOptions::default(), mode)`.
pub fn iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    self.iterator_cf_opt(cf_handle, ReadOptions::default(), mode)
}
/// Creates an iterator over column family `cf_handle` in the given `mode`, with
/// `total_order_seek` enabled on otherwise default `ReadOptions`.
pub fn full_iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut total_order_opts = ReadOptions::default();
    total_order_opts.set_total_order_seek(true);
    // `iterator_cf_opt` forwards straight to `DBIteratorWithThreadMode::new_cf`.
    self.iterator_cf_opt(cf_handle, total_order_opts, mode)
}
/// Creates a forward iterator over column family `cf_handle` starting at `prefix`,
/// with `prefix_same_as_start` enabled on otherwise default `ReadOptions`.
pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    prefix: P,
) -> DBIteratorWithThreadMode<'a, Self> {
    let mut prefix_opts = ReadOptions::default();
    prefix_opts.set_prefix_same_as_start(true);
    let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
    // `iterator_cf_opt` forwards straight to `DBIteratorWithThreadMode::new_cf`.
    self.iterator_cf_opt(cf_handle, prefix_opts, start)
}
/// Creates a raw iterator over the transaction, using freshly built default
/// `ReadOptions`.
///
/// Equivalent to `raw_iterator_opt(ReadOptions::default())`.
pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
    self.raw_iterator_opt(ReadOptions::default())
}
/// Creates a raw iterator over column family `cf_handle`, using freshly built
/// default `ReadOptions`.
///
/// Equivalent to `raw_iterator_cf_opt(cf_handle, ReadOptions::default())`.
pub fn raw_iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    self.raw_iterator_cf_opt(cf_handle, ReadOptions::default())
}
/// Creates a raw iterator over the transaction with caller-supplied `ReadOptions`.
///
/// `readopts` is taken by value and handed to `DBRawIteratorWithThreadMode::new`;
/// the returned iterator borrows `self`.
pub fn raw_iterator_opt<'a: 'b, 'b>(
&'a self,
readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
DBRawIteratorWithThreadMode::new(self, readopts)
}
/// Creates a raw iterator over column family `cf_handle` with caller-supplied
/// `ReadOptions`.
///
/// `readopts` is taken by value and handed to
/// `DBRawIteratorWithThreadMode::new_cf` together with the raw column-family handle.
pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
&'a self,
cf_handle: &impl AsColumnFamilyRef,
readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
DBRawIteratorWithThreadMode::new_cf(self, cf_handle.inner(), readopts)
}
/// Builds a `WriteBatchWithTransaction` from the transaction's current write batch.
///
/// Steps: fetch the indexed write batch handle, read its serialized data
/// pointer/length, create a new write batch from that data, then release the
/// indexed-batch handle with `rocksdb_free`.
/// NOTE(review): presumably `rocksdb_writebatch_create_from` copies the bytes, so
/// the result is independent of the transaction — confirm against the C API.
pub fn get_writebatch(&self) -> WriteBatchWithTransaction<true> {
    let mut data_len: usize = 0;
    // SAFETY: `data_len` is a valid out-parameter for the call; assumes
    // `self.inner` is a live handle and that the handle returned by
    // `rocksdb_transaction_get_writebatch_wi` is meant to be released with
    // `rocksdb_free` — TODO confirm against c.h.
    unsafe {
        let indexed_batch = ffi::rocksdb_transaction_get_writebatch_wi(self.inner);
        let data = ffi::rocksdb_writebatch_wi_data(indexed_batch, &raw mut data_len);
        let inner = ffi::rocksdb_writebatch_create_from(data, data_len);
        ffi::rocksdb_free(indexed_batch as *mut c_void);
        WriteBatchWithTransaction { inner }
    }
}
/// Calls `rocksdb_transaction_rebuild_from_writebatch` with the given write batch.
///
/// # Errors
/// Returns whatever error `ffi_try!` reports for the underlying call.
pub fn rebuild_from_writebatch(
    &self,
    writebatch: &WriteBatchWithTransaction<true>,
) -> Result<(), Error> {
    let txn = self.inner;
    let batch = writebatch.inner;
    // SAFETY: assumes both handles are live; `writebatch` is borrowed for the call.
    unsafe {
        ffi_try!(ffi::rocksdb_transaction_rebuild_from_writebatch(txn, batch));
    }
    Ok(())
}
}
/// Releases the underlying RocksDB transaction handle when the wrapper is dropped.
impl<DB> Drop for Transaction<'_, DB> {
/// Passes `self.inner` to `rocksdb_transaction_destroy`.
fn drop(&mut self) {
// SAFETY: assumes `self.inner` is still a live handle; this is the only place in
// this file where the handle is destroyed.
unsafe {
ffi::rocksdb_transaction_destroy(self.inner);
}
}
}