use std::{
collections::BTreeMap,
ffi::CString,
fs, iter,
marker::PhantomData,
path::{Path, PathBuf},
ptr,
sync::{Arc, Mutex},
};
use crate::CStrLike;
use std::ffi::CStr;
use crate::column_family::ColumnFamilyTtl;
use crate::{
AsColumnFamilyRef, BoundColumnFamily, ColumnFamily, ColumnFamilyDescriptor, DB,
DBIteratorWithThreadMode, DBPinnableSlice, DBRawIteratorWithThreadMode,
DEFAULT_COLUMN_FAMILY_NAME, Direction, Error, FlushOptions, IteratorMode, MultiThreaded,
Options, ReadOptions, SingleThreaded, SnapshotWithThreadMode, ThreadMode, Transaction,
TransactionDBOptions, TransactionOptions, WriteBatchWithTransaction, WriteOptions,
column_family::UnboundColumnFamily,
db::{DBAccess, convert_values},
db_options::OptionsMustOutliveDB,
ffi,
ffi_util::to_cpath,
};
use ffi::rocksdb_transaction_t;
use libc::{c_char, c_int, c_uchar, c_void, size_t};
// Per-thread default option objects used by the convenience methods that do not take
// explicit read / write / flush options.
thread_local! {
    static DEFAULT_READ_OPTS: ReadOptions = ReadOptions::default();
    static DEFAULT_WRITE_OPTS: WriteOptions = WriteOptions::default();
    static DEFAULT_FLUSH_OPTS: FlushOptions = FlushOptions::default();
}
// Thread mode used when `TransactionDB` is named without an explicit type parameter.
// Without the `multi-threaded-cf` cargo feature this is `SingleThreaded`.
#[cfg(not(feature = "multi-threaded-cf"))]
type DefaultThreadMode = crate::SingleThreaded;
// With the `multi-threaded-cf` cargo feature enabled the default is `MultiThreaded`.
#[cfg(feature = "multi-threaded-cf")]
type DefaultThreadMode = crate::MultiThreaded;
/// Wrapper around a RocksDB `rocksdb_transactiondb_t` handle.
///
/// The type parameter `T` selects how column family handles are stored
/// (see the `SingleThreaded` and `MultiThreaded` specific `impl` blocks).
pub struct TransactionDB<T: ThreadMode = DefaultThreadMode> {
/// Raw handle returned by the C API when the database was opened; closed in `Drop`.
pub(crate) inner: *mut ffi::rocksdb_transactiondb_t,
/// Column family handles known to this instance, stored according to the thread mode.
cfs: T,
/// Filesystem path the database was opened with.
path: PathBuf,
/// Raw handles of the transactions reported as prepared when the database was opened.
/// They are drained (and wrapped) by `prepared_transactions`.
prepared: Mutex<Vec<*mut rocksdb_transaction_t>>,
/// Clones of the `outlive` data of the database options and of every column family's
/// options, held for as long as this value exists.
_outlive: Vec<OptionsMustOutliveDB>,
}
// The struct holds raw pointers, so `Send`/`Sync` are not derived automatically and are
// asserted manually here.
// NOTE(review): soundness relies on the underlying RocksDB transaction DB handle being
// usable from multiple threads — confirm against the RocksDB documentation.
unsafe impl<T: ThreadMode> Send for TransactionDB<T> {}
unsafe impl<T: ThreadMode> Sync for TransactionDB<T> {}
/// `DBAccess` implementation backed by the `rocksdb_transactiondb_*` C functions.
///
/// The read methods (`get_opt`, `get_cf_opt`, ...) forward to the inherent methods of the
/// same name defined on `TransactionDB`. Inherent methods take precedence over trait
/// methods during method resolution, so these calls do not recurse.
impl<T: ThreadMode> DBAccess for TransactionDB<T> {
/// Creates a snapshot via `rocksdb_transactiondb_create_snapshot`.
unsafe fn create_snapshot(&self) -> *const ffi::rocksdb_snapshot_t {
unsafe { ffi::rocksdb_transactiondb_create_snapshot(self.inner) }
}
/// Releases `snapshot` via `rocksdb_transactiondb_release_snapshot`.
unsafe fn release_snapshot(&self, snapshot: *const ffi::rocksdb_snapshot_t) {
unsafe {
ffi::rocksdb_transactiondb_release_snapshot(self.inner, snapshot);
}
}
/// Creates a raw iterator over the database using `readopts`.
unsafe fn create_iterator(&self, readopts: &ReadOptions) -> *mut ffi::rocksdb_iterator_t {
unsafe { ffi::rocksdb_transactiondb_create_iterator(self.inner, readopts.inner) }
}
/// Creates a raw iterator over the column family `cf_handle` using `readopts`.
unsafe fn create_iterator_cf(
&self,
cf_handle: *mut ffi::rocksdb_column_family_handle_t,
readopts: &ReadOptions,
) -> *mut ffi::rocksdb_iterator_t {
unsafe {
ffi::rocksdb_transactiondb_create_iterator_cf(self.inner, readopts.inner, cf_handle)
}
}
/// Forwards to the inherent `TransactionDB::get_opt`.
fn get_opt<K: AsRef<[u8]>>(
&self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
self.get_opt(key, readopts)
}
/// Forwards to the inherent `TransactionDB::get_cf_opt`.
fn get_cf_opt<K: AsRef<[u8]>>(
&self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
self.get_cf_opt(cf, key, readopts)
}
/// Forwards to the inherent `TransactionDB::get_pinned_opt`.
fn get_pinned_opt<K: AsRef<[u8]>>(
&'_ self,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
self.get_pinned_opt(key, readopts)
}
/// Forwards to the inherent `TransactionDB::get_pinned_cf_opt`.
fn get_pinned_cf_opt<K: AsRef<[u8]>>(
&'_ self,
cf: &impl AsColumnFamilyRef,
key: K,
readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
self.get_pinned_cf_opt(cf, key, readopts)
}
/// Forwards to the inherent `TransactionDB::multi_get_opt`.
fn multi_get_opt<K, I>(
&self,
keys: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = K>,
{
self.multi_get_opt(keys, readopts)
}
/// Forwards to the inherent `TransactionDB::multi_get_cf_opt`.
fn multi_get_cf_opt<'b, K, I, W>(
&self,
keys_cf: I,
readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
K: AsRef<[u8]>,
I: IntoIterator<Item = (&'b W, K)>,
W: AsColumnFamilyRef + 'b,
{
self.multi_get_cf_opt(keys_cf, readopts)
}
}
impl<T: ThreadMode> TransactionDB<T> {
/// Opens the database at `path` with default options and default transaction DB options.
///
/// `create_if_missing` is enabled on the options used.
pub fn open_default<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    Self::open(&db_opts, &TransactionDBOptions::default(), path)
}
pub fn open<P: AsRef<Path>>(
opts: &Options,
txn_db_opts: &TransactionDBOptions,
path: P,
) -> Result<Self, Error> {
Self::open_cf(opts, txn_db_opts, path, None::<&str>)
}
/// Opens the database at `path` with the column families named in `cfs`.
///
/// Every named column family is opened with `Options::default()`.
pub fn open_cf<P, I, N>(
    opts: &Options,
    txn_db_opts: &TransactionDBOptions,
    path: P,
    cfs: I,
) -> Result<Self, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = N>,
    N: AsRef<str>,
{
    let descriptors = cfs.into_iter().map(|cf_name| {
        ColumnFamilyDescriptor::new(cf_name.as_ref(), Options::default())
    });
    Self::open_cf_descriptors_internal(opts, txn_db_opts, path, descriptors)
}
/// Opens the database at `path` with the column families described by `cfs`.
///
/// Public entry point that forwards to the shared open implementation.
pub fn open_cf_descriptors<P, I>(
    opts: &Options,
    txn_db_opts: &TransactionDBOptions,
    path: P,
    cfs: I,
) -> Result<Self, Error>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
    let descriptors = cfs;
    Self::open_cf_descriptors_internal(opts, txn_db_opts, path, descriptors)
}
fn open_cf_descriptors_internal<P, I>(
opts: &Options,
txn_db_opts: &TransactionDBOptions,
path: P,
cfs: I,
) -> Result<Self, Error>
where
P: AsRef<Path>,
I: IntoIterator<Item = ColumnFamilyDescriptor>,
{
let cfs: Vec<_> = cfs.into_iter().collect();
let outlive = iter::once(opts.outlive.clone())
.chain(cfs.iter().map(|cf| cf.options.outlive.clone()))
.collect();
let cpath = to_cpath(&path)?;
if let Err(e) = fs::create_dir_all(&path) {
return Err(Error::new(format!(
"Failed to create RocksDB directory: `{e:?}`."
)));
}
let db: *mut ffi::rocksdb_transactiondb_t;
let mut cf_map = BTreeMap::new();
if cfs.is_empty() {
db = Self::open_raw(opts, txn_db_opts, &cpath)?;
} else {
let mut cfs_v = cfs;
if !cfs_v.iter().any(|cf| cf.name == DEFAULT_COLUMN_FAMILY_NAME) {
cfs_v.push(ColumnFamilyDescriptor {
name: String::from(DEFAULT_COLUMN_FAMILY_NAME),
options: Options::default(),
ttl: ColumnFamilyTtl::SameAsDb, });
}
let c_cfs: Vec<CString> = cfs_v
.iter()
.map(|cf| CString::new(cf.name.as_bytes()).unwrap())
.collect();
let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
let cfopts: Vec<_> = cfs_v
.iter()
.map(|cf| cf.options.inner.cast_const())
.collect();
db = Self::open_cf_raw(
opts,
txn_db_opts,
&cpath,
&cfs_v,
&cfnames,
&cfopts,
&mut cfhandles,
)?;
for handle in &cfhandles {
if handle.is_null() {
return Err(Error::new(
"Received null column family handle from DB.".to_owned(),
));
}
}
for (cf_desc, inner) in cfs_v.iter().zip(cfhandles) {
cf_map.insert(cf_desc.name.clone(), inner);
}
}
if db.is_null() {
return Err(Error::new("Could not initialize database.".to_owned()));
}
let prepared = unsafe {
let mut cnt = 0;
let ptr = ffi::rocksdb_transactiondb_get_prepared_transactions(db, &raw mut cnt);
let mut vec = vec![std::ptr::null_mut(); cnt];
if !ptr.is_null() {
std::ptr::copy_nonoverlapping(ptr, vec.as_mut_ptr(), cnt);
ffi::rocksdb_free(ptr as *mut c_void);
}
vec
};
Ok(TransactionDB {
inner: db,
cfs: T::new_cf_map_internal(cf_map),
path: path.as_ref().to_path_buf(),
prepared: Mutex::new(prepared),
_outlive: outlive,
})
}
/// Opens the database without listing column families, via
/// `rocksdb_transactiondb_open`.
///
/// Returns the raw handle, or the error reported by the C API.
fn open_raw(
    opts: &Options,
    txn_db_opts: &TransactionDBOptions,
    cpath: &CString,
) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
    Ok(unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_open(
            opts.inner,
            txn_db_opts.inner,
            cpath.as_ptr()
        ))
    })
}
fn open_cf_raw(
opts: &Options,
txn_db_opts: &TransactionDBOptions,
cpath: &CString,
cfs_v: &[ColumnFamilyDescriptor],
cfnames: &[*const c_char],
cfopts: &[*const ffi::rocksdb_options_t],
cfhandles: &mut [*mut ffi::rocksdb_column_family_handle_t],
) -> Result<*mut ffi::rocksdb_transactiondb_t, Error> {
unsafe {
let db = ffi_try!(ffi::rocksdb_transactiondb_open_column_families(
opts.inner,
txn_db_opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_ptr(),
cfopts.as_ptr(),
cfhandles.as_mut_ptr(),
));
Ok(db)
}
}
fn create_inner_cf_handle(
&self,
name: &str,
opts: &Options,
) -> Result<*mut ffi::rocksdb_column_family_handle_t, Error> {
let cf_name = CString::new(name.as_bytes()).map_err(|_| {
Error::new("Failed to convert path to CString when creating cf".to_owned())
})?;
Ok(unsafe {
ffi_try!(ffi::rocksdb_transactiondb_create_column_family(
self.inner,
opts.inner,
cf_name.as_ptr(),
))
})
}
/// Lists the column families of the database at `path`; forwards to `DB::list_cf`.
pub fn list_cf<P: AsRef<Path>>(opts: &Options, path: P) -> Result<Vec<String>, Error> {
    let cf_names = DB::list_cf(opts, path)?;
    Ok(cf_names)
}
/// Destroys the database at `path`; forwards to `DB::destroy`.
pub fn destroy<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
    DB::destroy(opts, path)?;
    Ok(())
}
/// Repairs the database at `path`; forwards to `DB::repair`.
pub fn repair<P: AsRef<Path>>(opts: &Options, path: P) -> Result<(), Error> {
    DB::repair(opts, path)?;
    Ok(())
}
/// Returns the path this database was opened with.
pub fn path(&self) -> &Path {
    &self.path
}
pub fn flush_wal(&self, sync: bool) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_transactiondb_flush_wal(
self.inner,
c_uchar::from(sync)
));
}
Ok(())
}
/// Flushes the database using the given flush options
/// (`rocksdb_transactiondb_flush`).
pub fn flush_opt(&self, flushopts: &FlushOptions) -> Result<(), Error> {
    let raw_opts = flushopts.inner;
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_flush(self.inner, raw_opts));
    }
    Ok(())
}
/// Flushes the database using the thread-local default flush options.
pub fn flush(&self) -> Result<(), Error> {
    DEFAULT_FLUSH_OPTS.with(|default_opts| self.flush_opt(default_opts))
}
/// Flushes the column family `cf` using the given flush options
/// (`rocksdb_transactiondb_flush_cf`).
pub fn flush_cf_opt(
    &self,
    cf: &impl AsColumnFamilyRef,
    flushopts: &FlushOptions,
) -> Result<(), Error> {
    let raw_opts = flushopts.inner;
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_flush_cf(
            self.inner,
            raw_opts,
            cf.inner()
        ));
    }
    Ok(())
}
pub fn flush_cfs_opt(
&self,
cfs: &[&impl AsColumnFamilyRef],
opts: &FlushOptions,
) -> Result<(), Error> {
let mut cfs = cfs.iter().map(|cf| cf.inner()).collect::<Vec<_>>();
unsafe {
ffi_try!(ffi::rocksdb_transactiondb_flush_cfs(
self.inner,
opts.inner,
cfs.as_mut_ptr(),
cfs.len() as c_int,
));
}
Ok(())
}
/// Flushes the column family `cf` using the thread-local default flush options.
pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), Error> {
    DEFAULT_FLUSH_OPTS.with(|default_opts| self.flush_cf_opt(cf, default_opts))
}
/// Begins a transaction with the thread-local default write options and default
/// transaction options.
pub fn transaction(&'_ self) -> Transaction<'_, Self> {
    let txn_opts = TransactionOptions::default();
    DEFAULT_WRITE_OPTS.with(|write_opts| self.transaction_opt(write_opts, &txn_opts))
}
pub fn transaction_opt<'a>(
&'a self,
write_opts: &WriteOptions,
txn_opts: &TransactionOptions,
) -> Transaction<'a, Self> {
Transaction {
inner: unsafe {
ffi::rocksdb_transaction_begin(
self.inner,
write_opts.inner,
txn_opts.inner,
std::ptr::null_mut(),
)
},
_marker: PhantomData,
}
}
/// Takes the prepared transactions collected when the database was opened.
///
/// The internal list is drained, so a later call returns only what has not been
/// handed out before.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned.
pub fn prepared_transactions(&'_ self) -> Vec<Transaction<'_, Self>> {
    let mut pending = self.prepared.lock().unwrap();
    pending
        .drain(..)
        .map(|inner| Transaction {
            inner,
            _marker: PhantomData,
        })
        .collect()
}
/// Reads the value stored under `key` with default read options, copied into a `Vec`.
///
/// Returns `Ok(None)` when the pinned lookup yields no value.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned(key)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads the value stored under `key` in column family `cf` with default read
/// options, copied into a `Vec`.
pub fn get_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_cf(cf, key)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads the value stored under `key` using `readopts`, copied into a `Vec`.
pub fn get_opt<K: AsRef<[u8]>>(
    &self,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_opt(key, readopts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads the value stored under `key` in column family `cf` using `readopts`,
/// copied into a `Vec`.
pub fn get_cf_opt<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<Vec<u8>>, Error> {
    let pinned = self.get_pinned_cf_opt(cf, key, readopts)?;
    Ok(pinned.map(|slice| slice.as_ref().to_vec()))
}
/// Reads the value stored under `key` as a pinned slice, using the thread-local
/// default read options.
pub fn get_pinned<K: AsRef<[u8]>>(
    &'_ self,
    key: K,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_pinned_opt(key, default_opts))
}
/// Reads the value stored under `key` in column family `cf` as a pinned slice, using
/// the thread-local default read options.
pub fn get_pinned_cf<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    DEFAULT_READ_OPTS.with(|default_opts| self.get_pinned_cf_opt(cf, key, default_opts))
}
/// Reads the value stored under `key` as a pinned slice using `readopts`
/// (`rocksdb_transactiondb_get_pinned`).
///
/// Returns `Ok(None)` when the C API returns a null slice, and an error when it
/// reports one.
pub fn get_pinned_opt<K: AsRef<[u8]>>(
    &'_ self,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    let key = key.as_ref();
    unsafe {
        let raw_slice = ffi_try!(ffi::rocksdb_transactiondb_get_pinned(
            self.inner,
            readopts.inner,
            key.as_ptr() as *const c_char,
            key.len() as size_t,
        ));
        if raw_slice.is_null() {
            return Ok(None);
        }
        Ok(Some(DBPinnableSlice::from_c(raw_slice)))
    }
}
/// Reads the value stored under `key` in column family `cf` as a pinned slice using
/// `readopts` (`rocksdb_transactiondb_get_pinned_cf`).
///
/// Returns `Ok(None)` when the C API returns a null slice, and an error when it
/// reports one.
pub fn get_pinned_cf_opt<K: AsRef<[u8]>>(
    &'_ self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    readopts: &ReadOptions,
) -> Result<Option<DBPinnableSlice<'_>>, Error> {
    let key = key.as_ref();
    unsafe {
        let raw_slice = ffi_try!(ffi::rocksdb_transactiondb_get_pinned_cf(
            self.inner,
            readopts.inner,
            cf.inner(),
            key.as_ptr() as *const c_char,
            key.len() as size_t,
        ));
        if raw_slice.is_null() {
            return Ok(None);
        }
        Ok(Some(DBPinnableSlice::from_c(raw_slice)))
    }
}
/// Looks up several keys at once using the thread-local default read options.
///
/// One result is returned per key.
pub fn multi_get<K, I>(&self, keys: I) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = K>,
{
    DEFAULT_READ_OPTS.with(|default_opts| self.multi_get_opt(keys, default_opts))
}
/// Looks up several keys at once using `readopts`
/// (`rocksdb_transactiondb_multi_get`).
///
/// The keys are collected first so their buffers stay alive for the duration of the C
/// call. The per-key values, value sizes and error strings are written by the C API
/// into three output arrays and then turned into results by `convert_values`.
pub fn multi_get_opt<K, I>(
    &self,
    keys: I,
    readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = K>,
{
    let held_keys: Vec<K> = keys.into_iter().collect();
    let key_lens: Vec<usize> = held_keys.iter().map(|k| k.as_ref().len()).collect();
    let key_ptrs: Vec<*const c_char> = held_keys
        .iter()
        .map(|k| k.as_ref().as_ptr() as *const c_char)
        .collect();
    let count = key_ptrs.len();
    let mut values: Vec<*mut c_char> = Vec::with_capacity(count);
    let mut values_sizes: Vec<usize> = Vec::with_capacity(count);
    let mut errors: Vec<*mut c_char> = Vec::with_capacity(count);
    unsafe {
        ffi::rocksdb_transactiondb_multi_get(
            self.inner,
            readopts.inner,
            count,
            key_ptrs.as_ptr(),
            key_lens.as_ptr(),
            values.as_mut_ptr(),
            values_sizes.as_mut_ptr(),
            errors.as_mut_ptr(),
        );
        // The output buffers were only reserved above; their lengths are set after
        // the call on the assumption that the C API wrote `count` entries into each.
        values.set_len(count);
        values_sizes.set_len(count);
        errors.set_len(count);
    }
    convert_values(values, values_sizes, errors)
}
/// Looks up several `(column family, key)` pairs at once using the thread-local
/// default read options.
pub fn multi_get_cf<'a, 'b: 'a, K, I, W>(
    &'a self,
    keys: I,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = (&'b W, K)>,
    W: 'b + AsColumnFamilyRef,
{
    DEFAULT_READ_OPTS.with(|default_opts| self.multi_get_cf_opt(keys, default_opts))
}
/// Looks up several `(column family, key)` pairs at once using `readopts`
/// (`rocksdb_transactiondb_multi_get_cf`).
///
/// The pairs are collected first so the key buffers stay alive for the duration of the
/// C call. Column family handles, key pointers and key lengths are passed as parallel
/// arrays, and the outputs are turned into results by `convert_values`.
pub fn multi_get_cf_opt<'a, 'b: 'a, K, I, W>(
    &'a self,
    keys: I,
    readopts: &ReadOptions,
) -> Vec<Result<Option<Vec<u8>>, Error>>
where
    K: AsRef<[u8]>,
    I: IntoIterator<Item = (&'b W, K)>,
    W: 'b + AsColumnFamilyRef,
{
    let pairs: Vec<(&'b W, K)> = keys.into_iter().collect();
    let key_lens: Vec<usize> = pairs.iter().map(|(_, k)| k.as_ref().len()).collect();
    let key_ptrs: Vec<*const c_char> = pairs
        .iter()
        .map(|(_, k)| k.as_ref().as_ptr() as *const c_char)
        .collect();
    let cf_ptrs: Vec<*const ffi::rocksdb_column_family_handle_t> = pairs
        .iter()
        .map(|(cf, _)| cf.inner().cast_const())
        .collect();
    let count = key_ptrs.len();
    let mut values: Vec<*mut c_char> = Vec::with_capacity(count);
    let mut values_sizes: Vec<usize> = Vec::with_capacity(count);
    let mut errors: Vec<*mut c_char> = Vec::with_capacity(count);
    unsafe {
        ffi::rocksdb_transactiondb_multi_get_cf(
            self.inner,
            readopts.inner,
            cf_ptrs.as_ptr(),
            count,
            key_ptrs.as_ptr(),
            key_lens.as_ptr(),
            values.as_mut_ptr(),
            values_sizes.as_mut_ptr(),
            errors.as_mut_ptr(),
        );
        // The output buffers were only reserved above; their lengths are set after
        // the call on the assumption that the C API wrote `count` entries into each.
        values.set_len(count);
        values_sizes.set_len(count);
        errors.set_len(count);
    }
    convert_values(values, values_sizes, errors)
}
/// Stores `value` under `key` using the thread-local default write options.
pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    DEFAULT_WRITE_OPTS.with(|default_opts| self.put_opt(key, value, default_opts))
}
/// Stores `value` under `key` in column family `cf` using the thread-local default
/// write options.
pub fn put_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    DEFAULT_WRITE_OPTS.with(|default_opts| self.put_cf_opt(cf, key, value, default_opts))
}
/// Stores `value` under `key` using `writeopts` (`rocksdb_transactiondb_put`).
pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let (key, value) = (key.as_ref(), value.as_ref());
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_put(
            self.inner,
            writeopts.inner,
            key.as_ptr() as *const c_char,
            key.len() as size_t,
            value.as_ptr() as *const c_char,
            value.len() as size_t
        ));
    }
    Ok(())
}
/// Stores `value` under `key` in column family `cf` using `writeopts`
/// (`rocksdb_transactiondb_put_cf`).
pub fn put_cf_opt<K, V>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    value: V,
    writeopts: &WriteOptions,
) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let (key, value) = (key.as_ref(), value.as_ref());
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_put_cf(
            self.inner,
            writeopts.inner,
            cf.inner(),
            key.as_ptr() as *const c_char,
            key.len() as size_t,
            value.as_ptr() as *const c_char,
            value.len() as size_t
        ));
    }
    Ok(())
}
/// Applies `batch` using the thread-local default write options.
pub fn write(&self, batch: &WriteBatchWithTransaction<true>) -> Result<(), Error> {
    DEFAULT_WRITE_OPTS.with(|default_opts| self.write_opt(batch, default_opts))
}
/// Applies `batch` using `writeopts` (`rocksdb_transactiondb_write`).
pub fn write_opt(
    &self,
    batch: &WriteBatchWithTransaction<true>,
    writeopts: &WriteOptions,
) -> Result<(), Error> {
    let raw_batch = batch.inner;
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_write(
            self.inner,
            writeopts.inner,
            raw_batch
        ));
    }
    Ok(())
}
/// Merges `value` into `key` using the thread-local default write options.
pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    DEFAULT_WRITE_OPTS.with(|default_opts| self.merge_opt(key, value, default_opts))
}
/// Merges `value` into `key` in column family `cf` using the thread-local default
/// write options.
pub fn merge_cf<K, V>(&self, cf: &impl AsColumnFamilyRef, key: K, value: V) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    DEFAULT_WRITE_OPTS.with(|default_opts| self.merge_cf_opt(cf, key, value, default_opts))
}
/// Merges `value` into `key` using `writeopts` (`rocksdb_transactiondb_merge`).
pub fn merge_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let (key, value) = (key.as_ref(), value.as_ref());
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_merge(
            self.inner,
            writeopts.inner,
            key.as_ptr() as *const c_char,
            key.len() as size_t,
            value.as_ptr() as *const c_char,
            value.len() as size_t,
        ));
    }
    Ok(())
}
/// Merges `value` into `key` in column family `cf` using `writeopts`
/// (`rocksdb_transactiondb_merge_cf`).
pub fn merge_cf_opt<K, V>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    value: V,
    writeopts: &WriteOptions,
) -> Result<(), Error>
where
    K: AsRef<[u8]>,
    V: AsRef<[u8]>,
{
    let (key, value) = (key.as_ref(), value.as_ref());
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_merge_cf(
            self.inner,
            writeopts.inner,
            cf.inner(),
            key.as_ptr() as *const c_char,
            key.len() as size_t,
            value.as_ptr() as *const c_char,
            value.len() as size_t,
        ));
    }
    Ok(())
}
/// Deletes `key` using the thread-local default write options.
pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
    DEFAULT_WRITE_OPTS.with(|default_opts| self.delete_opt(key, default_opts))
}
/// Deletes `key` from column family `cf` using the thread-local default write
/// options.
pub fn delete_cf<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
) -> Result<(), Error> {
    DEFAULT_WRITE_OPTS.with(|default_opts| self.delete_cf_opt(cf, key, default_opts))
}
/// Deletes `key` using `writeopts` (`rocksdb_transactiondb_delete`).
pub fn delete_opt<K: AsRef<[u8]>>(
    &self,
    key: K,
    writeopts: &WriteOptions,
) -> Result<(), Error> {
    let key_bytes = key.as_ref();
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_delete(
            self.inner,
            writeopts.inner,
            key_bytes.as_ptr() as *const c_char,
            key_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Deletes `key` from column family `cf` using `writeopts`
/// (`rocksdb_transactiondb_delete_cf`).
pub fn delete_cf_opt<K: AsRef<[u8]>>(
    &self,
    cf: &impl AsColumnFamilyRef,
    key: K,
    writeopts: &WriteOptions,
) -> Result<(), Error> {
    let key_bytes = key.as_ref();
    unsafe {
        ffi_try!(ffi::rocksdb_transactiondb_delete_cf(
            self.inner,
            writeopts.inner,
            cf.inner(),
            key_bytes.as_ptr() as *const c_char,
            key_bytes.len() as size_t,
        ));
    }
    Ok(())
}
/// Creates an iterator over the database in the given `mode`, using freshly
/// constructed default read options.
pub fn iterator<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    self.iterator_opt(mode, ReadOptions::default())
}
/// Creates an iterator over the database in the given `mode`; `readopts` is moved
/// into the iterator constructor.
pub fn iterator_opt<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
    readopts: ReadOptions,
) -> DBIteratorWithThreadMode<'b, Self> {
    let db = self;
    DBIteratorWithThreadMode::new(db, readopts, mode)
}
/// Creates an iterator over column family `cf_handle` in the given `mode`;
/// `readopts` is moved into the iterator constructor.
pub fn iterator_cf_opt<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    readopts: ReadOptions,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let raw_cf = cf_handle.inner();
    DBIteratorWithThreadMode::new_cf(self, raw_cf, readopts, mode)
}
/// Creates an iterator over the database in the given `mode` with
/// `total_order_seek` enabled on the read options.
pub fn full_iterator<'a: 'b, 'b>(
    &'a self,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut readopts = ReadOptions::default();
    readopts.set_total_order_seek(true);
    DBIteratorWithThreadMode::new(self, readopts, mode)
}
/// Creates a forward iterator starting at `prefix`, with `prefix_same_as_start`
/// enabled on the read options.
pub fn prefix_iterator<'a: 'b, 'b, P: AsRef<[u8]>>(
    &'a self,
    prefix: P,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut readopts = ReadOptions::default();
    readopts.set_prefix_same_as_start(true);
    let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
    DBIteratorWithThreadMode::new(self, readopts, start)
}
/// Creates an iterator over column family `cf_handle` in the given `mode`, using
/// freshly constructed default read options.
pub fn iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let readopts = ReadOptions::default();
    let raw_cf = cf_handle.inner();
    DBIteratorWithThreadMode::new_cf(self, raw_cf, readopts, mode)
}
/// Creates an iterator over column family `cf_handle` in the given `mode` with
/// `total_order_seek` enabled on the read options.
pub fn full_iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    mode: IteratorMode,
) -> DBIteratorWithThreadMode<'b, Self> {
    let mut readopts = ReadOptions::default();
    readopts.set_total_order_seek(true);
    let raw_cf = cf_handle.inner();
    DBIteratorWithThreadMode::new_cf(self, raw_cf, readopts, mode)
}
/// Creates a forward iterator over column family `cf_handle` starting at `prefix`,
/// with `prefix_same_as_start` enabled on the read options.
pub fn prefix_iterator_cf<'a, P: AsRef<[u8]>>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    prefix: P,
) -> DBIteratorWithThreadMode<'a, Self> {
    let mut readopts = ReadOptions::default();
    readopts.set_prefix_same_as_start(true);
    let raw_cf = cf_handle.inner();
    let start = IteratorMode::From(prefix.as_ref(), Direction::Forward);
    DBIteratorWithThreadMode::<'a, Self>::new_cf(self, raw_cf, readopts, start)
}
/// Creates a raw iterator over the database using freshly constructed default read
/// options.
pub fn raw_iterator<'a: 'b, 'b>(&'a self) -> DBRawIteratorWithThreadMode<'b, Self> {
    DBRawIteratorWithThreadMode::new(self, ReadOptions::default())
}
/// Creates a raw iterator over column family `cf_handle` using freshly constructed
/// default read options.
pub fn raw_iterator_cf<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    let readopts = ReadOptions::default();
    let raw_cf = cf_handle.inner();
    DBRawIteratorWithThreadMode::new_cf(self, raw_cf, readopts)
}
/// Creates a raw iterator over the database; `readopts` is moved into the iterator
/// constructor.
pub fn raw_iterator_opt<'a: 'b, 'b>(
    &'a self,
    readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    let db = self;
    DBRawIteratorWithThreadMode::new(db, readopts)
}
/// Creates a raw iterator over column family `cf_handle`; `readopts` is moved into
/// the iterator constructor.
pub fn raw_iterator_cf_opt<'a: 'b, 'b>(
    &'a self,
    cf_handle: &impl AsColumnFamilyRef,
    readopts: ReadOptions,
) -> DBRawIteratorWithThreadMode<'b, Self> {
    let raw_cf = cf_handle.inner();
    DBRawIteratorWithThreadMode::new_cf(self, raw_cf, readopts)
}
/// Creates a snapshot of the database, borrowed from `self`.
pub fn snapshot(&'_ self) -> SnapshotWithThreadMode<'_, Self> {
    let db = self;
    SnapshotWithThreadMode::<Self>::new(db)
}
/// Drops the column family behind `cf_inner` via `rocksdb_drop_column_family`.
///
/// `_cf` is taken by value and is not used by the body, so it is dropped when this
/// function returns.
// NOTE(review): the transaction DB handle is cast to `*mut rocksdb_t` for this call;
// confirm against the C API that this cast is valid.
fn drop_column_family<C>(
    &self,
    cf_inner: *mut ffi::rocksdb_column_family_handle_t,
    _cf: C,
) -> Result<(), Error> {
    let base_db = self.inner as *mut ffi::rocksdb_t;
    unsafe {
        ffi_try!(ffi::rocksdb_drop_column_family(base_db, cf_inner));
    }
    Ok(())
}
}
impl TransactionDB<SingleThreaded> {
pub fn create_cf<N: AsRef<str>>(&mut self, name: N, opts: &Options) -> Result<(), Error> {
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
self.cfs
.cfs
.insert(name.as_ref().to_string(), ColumnFamily { inner });
Ok(())
}
pub fn cf_handle(&self, name: &str) -> Option<&ColumnFamily> {
self.cfs.cfs.get(name)
}
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
match self.cfs.cfs.remove(name) {
Some(cf) => self.drop_column_family(cf.inner, cf),
_ => Err(Error::new(format!("Invalid column family: {name}"))),
}
}
}
impl TransactionDB<MultiThreaded> {
pub fn create_cf<N: AsRef<str>>(&self, name: N, opts: &Options) -> Result<(), Error> {
let mut cfs = self.cfs.cfs.write();
let inner = self.create_inner_cf_handle(name.as_ref(), opts)?;
cfs.insert(
name.as_ref().to_string(),
Arc::new(UnboundColumnFamily { inner }),
);
Ok(())
}
pub fn cf_handle(&'_ self, name: &str) -> Option<Arc<BoundColumnFamily<'_>>> {
self.cfs
.cfs
.read()
.get(name)
.cloned()
.map(UnboundColumnFamily::bound_column_family)
}
pub fn drop_cf(&self, name: &str) -> Result<(), Error> {
match self.cfs.cfs.write().remove(name) {
Some(cf) => self.drop_column_family(cf.inner, cf),
_ => Err(Error::new(format!("Invalid column family: {name}"))),
}
}
fn property_value_impl<R>(
name: impl CStrLike,
get_property: impl FnOnce(*const c_char) -> *mut c_char,
parse: impl FnOnce(&str) -> Result<R, Error>,
) -> Result<Option<R>, Error> {
let value = match name.bake() {
Ok(prop_name) => get_property(prop_name.as_ptr()),
Err(e) => {
return Err(Error::new(format!(
"Failed to convert property name to CString: {e}"
)));
}
};
if value.is_null() {
return Ok(None);
}
let result = match unsafe { CStr::from_ptr(value) }.to_str() {
Ok(s) => parse(s).map(|value| Some(value)),
Err(e) => Err(Error::new(format!(
"Failed to convert property value to string: {e}"
))),
};
unsafe {
ffi::rocksdb_free(value as *mut c_void);
}
result
}
pub fn property_value(&self, name: impl CStrLike) -> Result<Option<String>, Error> {
Self::property_value_impl(
name,
|prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
|str_value| Ok(str_value.to_owned()),
)
}
fn parse_property_int_value(value: &str) -> Result<u64, Error> {
value.parse::<u64>().map_err(|err| {
Error::new(format!(
"Failed to convert property value {value} to int: {err}"
))
})
}
pub fn property_int_value(&self, name: impl CStrLike) -> Result<Option<u64>, Error> {
Self::property_value_impl(
name,
|prop_name| unsafe { ffi::rocksdb_transactiondb_property_value(self.inner, prop_name) },
Self::parse_property_int_value,
)
}
}
/// Tears the database down in a fixed order: remaining prepared transactions first,
/// then the column family handles, and finally the database handle itself.
impl<T: ThreadMode> Drop for TransactionDB<T> {
    fn drop(&mut self) {
        unsafe {
            // Draining the prepared list wraps each handle in a `Transaction`, and
            // dropping the resulting vector drops those wrappers.
            drop(self.prepared_transactions());
            self.cfs.drop_all_cfs_internal();
            ffi::rocksdb_transactiondb_close(self.inner);
        }
    }
}