use crate::bytes::AsBytes;
use crate::cf_store::RocksDbCFStore;
use crate::error::{StoreError, StoreResult};
use crate::serialization;
use crate::types::ValueWithExpiry;
use crate::MergeValue;
use rocksdb::WriteBatch;
use serde::{de::DeserializeOwned, Serialize};
use std::fmt::Debug;
use std::hash::Hash;
use std::mem::ManuallyDrop;
/// Accumulates RocksDB write operations (puts, merges, deletes) targeting a
/// single column family and applies them atomically when `commit()` is called.
///
/// If the writer is dropped without `commit()` or `discard()`, the `Drop` impl
/// logs a warning and the queued operations are NOT applied.
pub struct BatchWriter<'a> {
    // Store whose raw DB receives the batch on commit; also resolves CF handles.
    store: &'a RocksDbCFStore,
    // Wrapped in `ManuallyDrop` so `commit()`/`discard()` can move the
    // `WriteBatch` out through `&mut self` via `ManuallyDrop::take`.
    batch: ManuallyDrop<WriteBatch>,
    // Target column family name; compared against
    // `rocksdb::DEFAULT_COLUMN_FAMILY_NAME` to pick the non-CF batch API.
    cf_name: String,
    // Set once `commit()` or `discard()` has consumed the batch; guards against
    // reuse and controls the warning in `Drop`.
    committed_or_discarded: bool,
}
impl<'a> BatchWriter<'a> {
    /// Creates a writer that buffers operations for the column family
    /// `cf_name` on `store`. Nothing touches the database until `commit()`.
    pub(crate) fn new(store: &'a RocksDbCFStore, cf_name: String) -> Self {
        BatchWriter {
            store,
            batch: ManuallyDrop::new(WriteBatch::default()),
            cf_name,
            committed_or_discarded: false,
        }
    }

    /// Returns an error if this writer has already been committed or discarded.
    fn check_not_committed(&self) -> StoreResult<()> {
        if self.committed_or_discarded {
            Err(StoreError::Other(
                "BatchWriter already committed or discarded".to_string(),
            ))
        } else {
            Ok(())
        }
    }

    /// True when this writer targets RocksDB's default column family, in which
    /// case the non-CF variants of the batch operations must be used.
    fn uses_default_cf(&self) -> bool {
        self.cf_name == rocksdb::DEFAULT_COLUMN_FAMILY_NAME
    }

    /// Queues a put of the serialized `val` under the serialized `key`.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, serialization fails, or the
    /// column-family handle cannot be resolved.
    pub fn set<Key, Val>(&mut self, key: Key, val: &Val) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
        Val: Serialize,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        let sv = serialization::serialize_value(val)?;
        if self.uses_default_cf() {
            self.batch.put(sk, sv);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.put_cf(&handle, sk, sv);
        }
        Ok(self)
    }

    /// Queues a put of pre-serialized bytes `raw_val` under the serialized `key`.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, key serialization fails, or
    /// the column-family handle cannot be resolved.
    pub fn set_raw<Key>(&mut self, key: Key, raw_val: &[u8]) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        if self.uses_default_cf() {
            self.batch.put(sk, raw_val);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.put_cf(&handle, sk, raw_val);
        }
        Ok(self)
    }

    /// Queues a put of `val` wrapped with an expiry timestamp (`expire_time`),
    /// using the `ValueWithExpiry` on-disk encoding.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, serialization fails, or the
    /// column-family handle cannot be resolved.
    pub fn set_with_expiry<Key, Val>(
        &mut self,
        key: Key,
        val: &Val,
        expire_time: u64,
    ) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
        Val: Serialize + DeserializeOwned + Debug,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        let vwe = ValueWithExpiry::from_value(expire_time, val)?;
        let sv_with_ts = vwe.serialize_for_storage();
        if self.uses_default_cf() {
            self.batch.put(sk, sv_with_ts);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.put_cf(&handle, sk, sv_with_ts);
        }
        Ok(self)
    }

    /// Queues a merge operation with the serialized `merge_value` operand.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, serialization fails, or the
    /// column-family handle cannot be resolved.
    pub fn merge<Key, PatchVal>(
        &mut self,
        key: Key,
        merge_value: &MergeValue<PatchVal>,
    ) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
        PatchVal: Serialize + Debug,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        let smo = serialization::serialize_value(merge_value)?;
        if self.uses_default_cf() {
            self.batch.merge(sk, smo);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.merge_cf(&handle, sk, smo);
        }
        Ok(self)
    }

    /// Queues a merge operation with a pre-serialized operand `raw_merge_op`.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, key serialization fails, or
    /// the column-family handle cannot be resolved.
    pub fn merge_raw<Key>(&mut self, key: Key, raw_merge_op: &[u8]) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        if self.uses_default_cf() {
            self.batch.merge(sk, raw_merge_op);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.merge_cf(&handle, sk, raw_merge_op);
        }
        Ok(self)
    }

    /// Queues a delete of the serialized `key`.
    ///
    /// # Errors
    /// Fails if the writer was already consumed, key serialization fails, or
    /// the column-family handle cannot be resolved.
    pub fn delete<Key>(&mut self, key: Key) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
    {
        self.check_not_committed()?;
        let sk = serialization::serialize_key(key)?;
        if self.uses_default_cf() {
            self.batch.delete(sk);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.delete_cf(&handle, sk);
        }
        Ok(self)
    }

    /// Queues a range delete over `[start_key, end_key)` (RocksDB range-delete
    /// semantics: start inclusive, end exclusive).
    ///
    /// # Errors
    /// Fails if the writer was already consumed, key serialization fails, or
    /// the column-family handle cannot be resolved.
    pub fn delete_range<Key>(&mut self, start_key: Key, end_key: Key) -> StoreResult<&mut Self>
    where
        Key: AsBytes + Hash + Eq + PartialEq + Debug,
    {
        self.check_not_committed()?;
        let sks = serialization::serialize_key(start_key)?;
        let ske = serialization::serialize_key(end_key)?;
        if self.uses_default_cf() {
            self.batch.delete_range(sks, ske);
        } else {
            let handle = self.store.get_cf_handle(&self.cf_name)?;
            self.batch.delete_range_cf(&handle, sks, ske);
        }
        Ok(self)
    }

    /// Escape hatch: direct mutable access to the underlying `WriteBatch`.
    ///
    /// # Errors
    /// Fails if the writer was already committed or discarded.
    pub fn raw_batch_mut(&mut self) -> StoreResult<&mut WriteBatch> {
        self.check_not_committed()?;
        Ok(&mut *self.batch)
    }

    /// Atomically applies all queued operations to the database, consuming the
    /// writer.
    ///
    /// # Errors
    /// Returns the underlying RocksDB error if the write fails; in that case
    /// the batch has already been consumed and its operations are lost.
    pub fn commit(mut self) -> StoreResult<()> {
        self.check_not_committed()?;
        // SAFETY: `committed_or_discarded` is false (checked above), so the
        // batch has not been taken yet and is taken here exactly once.
        let batch_to_commit = unsafe { ManuallyDrop::take(&mut self.batch) };
        // Set the flag BEFORE attempting the write. The batch has already been
        // moved out, so if the write fails (or panics) `Drop` must neither
        // warn nor try to drop the now-empty slot — the original set the flag
        // only on success, which would make the leak fix in `Drop` a
        // double-drop on the error path.
        self.committed_or_discarded = true;
        self.store
            .db_raw()
            .write(batch_to_commit)
            .map_err(StoreError::RocksDb)
    }

    /// Drops all queued operations without applying them, consuming the writer.
    pub fn discard(mut self) {
        // SAFETY: the flag can only be set by `commit`/`discard`, both of which
        // consume `self`, so the batch is still present here and is taken (and
        // thereby dropped) exactly once.
        let _batch_to_discard = unsafe { ManuallyDrop::take(&mut self.batch) };
        self.committed_or_discarded = true;
    }
}

impl<'a> Drop for BatchWriter<'a> {
    fn drop(&mut self) {
        if !self.committed_or_discarded {
            log::warn!(
                "BatchWriter for DB at '{}' (CF: '{}') dropped without calling commit() or discard(). Batch operations were NOT applied.",
                self.store.path(),
                self.cf_name
            );
            // Fix: the original only warned here and never dropped the batch,
            // leaking the WriteBatch's heap allocation on every forgotten
            // writer.
            // SAFETY: `committed_or_discarded` is false, so neither `commit()`
            // nor `discard()` has taken the batch; the `ManuallyDrop` still
            // holds a live `WriteBatch`, which is dropped here exactly once.
            unsafe { ManuallyDrop::drop(&mut self.batch) };
        }
    }
}