use crate::{AsColumnFamilyRef, ffi};
use libc::{c_char, c_void, size_t};
use std::slice;
/// Write batch with the `TRANSACTION` flag set to `false`; this is the variant for which
/// the range-delete methods (`delete_range`, `delete_range_cf`) are implemented.
pub type WriteBatch = WriteBatchWithTransaction<false>;
/// Handle around a raw `rocksdb_writebatch_t` pointer.
///
/// The pointer is produced by the constructors of this type and passed to
/// `rocksdb_writebatch_destroy` when the value is dropped.
///
/// The `TRANSACTION` const parameter only gates API surface in this file: the
/// range-delete methods are implemented solely for `TRANSACTION = false`.
pub struct WriteBatchWithTransaction<const TRANSACTION: bool> {
// Raw batch handle handed to every `rocksdb_writebatch_*` FFI call.
pub(crate) inner: *mut ffi::rocksdb_writebatch_t,
}
/// Receiver for the records reported by `WriteBatchWithTransaction::iterate`.
///
/// `iterate` does not return an `Iterator`; it registers C callbacks that forward
/// each reported record to the methods of this trait.
pub trait WriteBatchIterator {
/// Invoked by the put callback with the key and value bytes of a record.
fn put(&mut self, key: &[u8], value: &[u8]);
/// Invoked by the delete callback with the key bytes of a record.
fn delete(&mut self, key: &[u8]);
}
/// Receiver for the records reported by `WriteBatchWithTransaction::iterate_cf`.
///
/// Every method additionally receives a `cf_id`, forwarded unchanged from the C
/// callback (presumably the numeric id of the record's column family — confirm
/// against the RocksDB C API).
pub trait WriteBatchIteratorCf {
/// Invoked by the put callback with the `cf_id`, key bytes and value bytes.
fn put_cf(&mut self, cf_id: u32, key: &[u8], value: &[u8]);
/// Invoked by the delete callback with the `cf_id` and key bytes.
fn delete_cf(&mut self, cf_id: u32, key: &[u8]);
/// Invoked by the merge callback with the `cf_id`, key bytes and value bytes.
fn merge_cf(&mut self, cf_id: u32, key: &[u8], value: &[u8]);
}
/// C-ABI trampoline registered by `iterate` as the put callback; it rebuilds the
/// Rust handler and byte slices from raw arguments and calls
/// [`WriteBatchIterator::put`].
///
/// # Safety
///
/// * `state` must point to a live `T` that nothing else accesses during the call.
/// * `k` must be valid for reads of `klen` bytes and `v` for reads of `vlen` bytes.
unsafe extern "C" fn writebatch_put_callback<T: WriteBatchIterator>(
    state: *mut c_void,
    k: *const c_char,
    klen: usize,
    v: *const c_char,
    vlen: usize,
) {
    // SAFETY: guaranteed by the caller, see the `# Safety` section above.
    let (handler, key_bytes, value_bytes) = unsafe {
        (
            &mut *state.cast::<T>(),
            slice::from_raw_parts(k.cast::<u8>(), klen),
            slice::from_raw_parts(v.cast::<u8>(), vlen),
        )
    };
    handler.put(key_bytes, value_bytes);
}
/// C-ABI trampoline registered by `iterate` as the delete callback; it rebuilds
/// the Rust handler and key slice from raw arguments and calls
/// [`WriteBatchIterator::delete`].
///
/// # Safety
///
/// * `state` must point to a live `T` that nothing else accesses during the call.
/// * `k` must be valid for reads of `klen` bytes.
unsafe extern "C" fn writebatch_delete_callback<T: WriteBatchIterator>(
    state: *mut c_void,
    k: *const c_char,
    klen: usize,
) {
    // SAFETY: guaranteed by the caller, see the `# Safety` section above.
    let (handler, key_bytes) = unsafe {
        (
            &mut *state.cast::<T>(),
            slice::from_raw_parts(k.cast::<u8>(), klen),
        )
    };
    handler.delete(key_bytes);
}
/// C-ABI trampoline registered by `iterate_cf` as the put callback; it rebuilds
/// the Rust handler and byte slices from raw arguments and calls
/// [`WriteBatchIteratorCf::put_cf`] with `cfid` passed through unchanged.
///
/// # Safety
///
/// * `state` must point to a live `T` that nothing else accesses during the call.
/// * `k` must be valid for reads of `klen` bytes and `v` for reads of `vlen` bytes.
unsafe extern "C" fn writebatch_put_cf_callback<T: WriteBatchIteratorCf>(
    state: *mut c_void,
    cfid: u32,
    k: *const c_char,
    klen: usize,
    v: *const c_char,
    vlen: usize,
) {
    // SAFETY: guaranteed by the caller, see the `# Safety` section above.
    let (handler, key_bytes, value_bytes) = unsafe {
        (
            &mut *state.cast::<T>(),
            slice::from_raw_parts(k.cast::<u8>(), klen),
            slice::from_raw_parts(v.cast::<u8>(), vlen),
        )
    };
    handler.put_cf(cfid, key_bytes, value_bytes);
}
/// C-ABI trampoline registered by `iterate_cf` as the delete callback; it rebuilds
/// the Rust handler and key slice from raw arguments and calls
/// [`WriteBatchIteratorCf::delete_cf`] with `cfid` passed through unchanged.
///
/// # Safety
///
/// * `state` must point to a live `T` that nothing else accesses during the call.
/// * `k` must be valid for reads of `klen` bytes.
unsafe extern "C" fn writebatch_delete_cf_callback<T: WriteBatchIteratorCf>(
    state: *mut c_void,
    cfid: u32,
    k: *const c_char,
    klen: usize,
) {
    // SAFETY: guaranteed by the caller, see the `# Safety` section above.
    let (handler, key_bytes) = unsafe {
        (
            &mut *state.cast::<T>(),
            slice::from_raw_parts(k.cast::<u8>(), klen),
        )
    };
    handler.delete_cf(cfid, key_bytes);
}
/// C-ABI trampoline registered by `iterate_cf` as the merge callback; it rebuilds
/// the Rust handler and byte slices from raw arguments and calls
/// [`WriteBatchIteratorCf::merge_cf`] with `cfid` passed through unchanged.
///
/// # Safety
///
/// * `state` must point to a live `T` that nothing else accesses during the call.
/// * `k` must be valid for reads of `klen` bytes and `v` for reads of `vlen` bytes.
unsafe extern "C" fn writebatch_merge_cf_callback<T: WriteBatchIteratorCf>(
    state: *mut c_void,
    cfid: u32,
    k: *const c_char,
    klen: usize,
    v: *const c_char,
    vlen: usize,
) {
    // SAFETY: guaranteed by the caller, see the `# Safety` section above.
    let (handler, key_bytes, value_bytes) = unsafe {
        (
            &mut *state.cast::<T>(),
            slice::from_raw_parts(k.cast::<u8>(), klen),
            slice::from_raw_parts(v.cast::<u8>(), vlen),
        )
    };
    handler.merge_cf(cfid, key_bytes, value_bytes);
}
impl<const TRANSACTION: bool> WriteBatchWithTransaction<TRANSACTION> {
    /// Creates a batch through `rocksdb_writebatch_create`.
    pub fn new() -> Self {
        // SAFETY: the FFI constructor takes no arguments.
        let inner = unsafe { ffi::rocksdb_writebatch_create() };
        Self { inner }
    }

    /// Creates a batch through `rocksdb_writebatch_create_with_params`, passing
    /// `capacity_bytes` as the first argument and zero for the remaining three.
    pub fn with_capacity_bytes(capacity_bytes: usize) -> Self {
        // SAFETY: only plain integers are passed to the FFI constructor.
        let inner = unsafe { ffi::rocksdb_writebatch_create_with_params(capacity_bytes, 0, 0, 0) };
        Self { inner }
    }

    /// Builds a batch from `data` via `rocksdb_writebatch_create_from`.
    ///
    /// NOTE(review): `data` is presumably the serialized form produced by
    /// [`Self::data`] — confirm against the RocksDB C API.
    pub fn from_data(data: &[u8]) -> Self {
        let (bytes, byte_count) = (data.as_ptr().cast::<c_char>(), data.len());
        // SAFETY: `bytes`/`byte_count` describe the borrowed slice `data`.
        let inner = unsafe { ffi::rocksdb_writebatch_create_from(bytes, byte_count) };
        Self { inner }
    }

    /// Returns the value of `rocksdb_writebatch_count` for this batch as a `usize`.
    pub fn len(&self) -> usize {
        // SAFETY: `self.inner` is the batch handle owned by `self`.
        let count = unsafe { ffi::rocksdb_writebatch_count(self.inner) };
        count as usize
    }

    /// Returns the length, in bytes, that `rocksdb_writebatch_data` reports for this batch.
    pub fn size_in_bytes(&self) -> usize {
        let mut byte_count: size_t = 0;
        // Only the length out-parameter is of interest; the returned pointer is ignored.
        // SAFETY: `self.inner` is the batch handle owned by `self`, and `byte_count`
        // is a live local the callee may write to.
        unsafe {
            ffi::rocksdb_writebatch_data(self.inner, &raw mut byte_count);
        }
        byte_count
    }

    /// Returns the bytes exposed by `rocksdb_writebatch_data` as a slice that borrows
    /// from `self`.
    pub fn data(&self) -> &[u8] {
        let mut byte_count: size_t = 0;
        // SAFETY: relies on `rocksdb_writebatch_data` returning a pointer valid for
        // `byte_count` bytes for as long as the batch is neither mutated nor dropped.
        unsafe {
            let bytes = ffi::rocksdb_writebatch_data(self.inner, &raw mut byte_count);
            slice::from_raw_parts(bytes.cast::<u8>(), byte_count)
        }
    }

    /// Returns `true` when [`Self::len`] is zero.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Walks the batch with `rocksdb_writebatch_iterate`, forwarding the reported
    /// records to `callbacks.put` and `callbacks.delete`.
    ///
    /// This does not return an `Iterator`; the handler methods are invoked during
    /// the call.
    pub fn iterate<T: WriteBatchIterator>(&self, callbacks: &mut T) {
        // The handler travels through the C API as an opaque pointer and is turned
        // back into `&mut T` by the trampolines.
        let state: *mut c_void = std::ptr::from_mut::<T>(callbacks).cast();
        // SAFETY: `state` points to `callbacks`, which is exclusively borrowed for the
        // whole call, and the trampolines are instantiated for the same `T`.
        unsafe {
            ffi::rocksdb_writebatch_iterate(
                self.inner,
                state,
                Some(writebatch_put_callback::<T>),
                Some(writebatch_delete_callback::<T>),
            );
        }
    }

    /// Walks the batch with `rocksdb_writebatch_iterate_cf`, forwarding the reported
    /// records to `callbacks.put_cf`, `callbacks.delete_cf` and `callbacks.merge_cf`.
    pub fn iterate_cf<T: WriteBatchIteratorCf>(&self, callbacks: &mut T) {
        let state: *mut c_void = std::ptr::from_mut::<T>(callbacks).cast();
        // SAFETY: `state` points to `callbacks`, which is exclusively borrowed for the
        // whole call, and the trampolines are instantiated for the same `T`.
        unsafe {
            ffi::rocksdb_writebatch_iterate_cf(
                self.inner,
                state,
                Some(writebatch_put_cf_callback::<T>),
                Some(writebatch_delete_cf_callback::<T>),
                Some(writebatch_merge_cf_callback::<T>),
            );
        }
    }

    /// Records a put of `key` => `value` via `rocksdb_writebatch_put`.
    pub fn put<K, V>(&mut self, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let (k, v) = (key.as_ref(), value.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_put(
                self.inner,
                k.as_ptr().cast::<c_char>(),
                k.len(),
                v.as_ptr().cast::<c_char>(),
                v.len(),
            );
        }
    }

    /// Records a put of `key` => `value` for column family `cf` via
    /// `rocksdb_writebatch_put_cf`.
    pub fn put_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let (k, v) = (key.as_ref(), value.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_put_cf(
                self.inner,
                cf.inner(),
                k.as_ptr().cast::<c_char>(),
                k.len(),
                v.as_ptr().cast::<c_char>(),
                v.len(),
            );
        }
    }

    /// Records a put of `key` => `value` with timestamp bytes `ts` for column family
    /// `cf` via `rocksdb_writebatch_put_cf_with_ts`.
    ///
    /// The FFI call receives the arguments in the order key, timestamp, value.
    pub fn put_cf_with_ts<K, V, S>(&mut self, cf: &impl AsColumnFamilyRef, key: K, ts: S, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
        S: AsRef<[u8]>,
    {
        let (k, v, stamp) = (key.as_ref(), value.as_ref(), ts.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_put_cf_with_ts(
                self.inner,
                cf.inner(),
                k.as_ptr().cast::<c_char>(),
                k.len(),
                stamp.as_ptr().cast::<c_char>(),
                stamp.len(),
                v.as_ptr().cast::<c_char>(),
                v.len(),
            );
        }
    }

    /// Records a merge of `value` into `key` via `rocksdb_writebatch_merge`.
    pub fn merge<K, V>(&mut self, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let (k, v) = (key.as_ref(), value.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_merge(
                self.inner,
                k.as_ptr().cast::<c_char>(),
                k.len(),
                v.as_ptr().cast::<c_char>(),
                v.len(),
            );
        }
    }

    /// Records a merge of `value` into `key` for column family `cf` via
    /// `rocksdb_writebatch_merge_cf`.
    pub fn merge_cf<K, V>(&mut self, cf: &impl AsColumnFamilyRef, key: K, value: V)
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let (k, v) = (key.as_ref(), value.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_merge_cf(
                self.inner,
                cf.inner(),
                k.as_ptr().cast::<c_char>(),
                k.len(),
                v.as_ptr().cast::<c_char>(),
                v.len(),
            );
        }
    }

    /// Records a delete of `key` via `rocksdb_writebatch_delete`.
    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
        let k = key.as_ref();
        // SAFETY: the pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_delete(self.inner, k.as_ptr().cast::<c_char>(), k.len());
        }
    }

    /// Records a delete of `key` for column family `cf` via
    /// `rocksdb_writebatch_delete_cf`.
    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, key: K) {
        let k = key.as_ref();
        // SAFETY: the pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_delete_cf(
                self.inner,
                cf.inner(),
                k.as_ptr().cast::<c_char>(),
                k.len(),
            );
        }
    }

    /// Records a delete of `key` with timestamp bytes `ts` for column family `cf`
    /// via `rocksdb_writebatch_delete_cf_with_ts`.
    pub fn delete_cf_with_ts<K: AsRef<[u8]>, S: AsRef<[u8]>>(
        &mut self,
        cf: &impl AsColumnFamilyRef,
        key: K,
        ts: S,
    ) {
        let (k, stamp) = (key.as_ref(), ts.as_ref());
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_delete_cf_with_ts(
                self.inner,
                cf.inner(),
                k.as_ptr().cast::<c_char>(),
                k.len(),
                stamp.as_ptr().cast::<c_char>(),
                stamp.len(),
            );
        }
    }

    /// Hands the bytes of `log_data` to `rocksdb_writebatch_put_log_data`.
    pub fn put_log_data<V: AsRef<[u8]>>(&mut self, log_data: V) {
        let blob = log_data.as_ref();
        // SAFETY: the pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_put_log_data(
                self.inner,
                blob.as_ptr().cast::<c_char>(),
                blob.len(),
            );
        }
    }

    /// Calls `rocksdb_writebatch_clear` on this batch.
    pub fn clear(&mut self) {
        // SAFETY: `self.inner` is the batch handle owned by `self`.
        unsafe { ffi::rocksdb_writebatch_clear(self.inner) };
    }
}
impl WriteBatchWithTransaction<false> {
    /// Records a range delete bounded by `from` and `to` via
    /// `rocksdb_writebatch_delete_range`.
    ///
    /// `from` is passed as the start key and `to` as the end key.
    /// NOTE(review): presumably the range is `["from", "to")`, i.e. the end key is
    /// exclusive — confirm against the RocksDB documentation.
    pub fn delete_range<K: AsRef<[u8]>>(&mut self, from: K, to: K) {
        let lower = from.as_ref();
        let upper = to.as_ref();
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_delete_range(
                self.inner,
                lower.as_ptr().cast::<c_char>(),
                lower.len(),
                upper.as_ptr().cast::<c_char>(),
                upper.len(),
            );
        }
    }

    /// Records a range delete bounded by `from` and `to` for column family `cf` via
    /// `rocksdb_writebatch_delete_range_cf`.
    ///
    /// `from` is passed as the start key and `to` as the end key.
    /// NOTE(review): presumably the range is `["from", "to")`, i.e. the end key is
    /// exclusive — confirm against the RocksDB documentation.
    pub fn delete_range_cf<K: AsRef<[u8]>>(&mut self, cf: &impl AsColumnFamilyRef, from: K, to: K) {
        let lower = from.as_ref();
        let upper = to.as_ref();
        // SAFETY: each pointer/length pair describes a slice borrowed for the call.
        unsafe {
            ffi::rocksdb_writebatch_delete_range_cf(
                self.inner,
                cf.inner(),
                lower.as_ptr().cast::<c_char>(),
                lower.len(),
                upper.as_ptr().cast::<c_char>(),
                upper.len(),
            );
        }
    }
}
impl<const TRANSACTION: bool> Default for WriteBatchWithTransaction<TRANSACTION> {
fn default() -> Self {
Self::new()
}
}
/// Releases the underlying batch handle with `rocksdb_writebatch_destroy`.
impl<const TRANSACTION: bool> Drop for WriteBatchWithTransaction<TRANSACTION> {
    fn drop(&mut self) {
        // SAFETY: `drop` runs at most once per value, so the handle stored in
        // `self.inner` is handed to the destructor a single time from here.
        unsafe { ffi::rocksdb_writebatch_destroy(self.inner) };
    }
}
// The raw `inner` pointer makes this type `!Send` by default; this impl opts back in.
// NOTE(review): soundness rests on the batch handle being usable from whichever thread
// currently owns the value (no thread-affine state behind the pointer) — confirm
// against RocksDB's threading guarantees for write batches.
unsafe impl<const TRANSACTION: bool> Send for WriteBatchWithTransaction<TRANSACTION> {}