use std::{
collections::HashMap,
sync::{atomic::Ordering, Arc},
};
use bytes::{BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use prost::{decode_length_delimiter, encode_length_delimiter};
use crate::{
db::{Db, ErrDb, LogDb, LogDbType, ResultDb, WriteBatchOptions},
lite::LiteDb,
};
/// Key (before the sequence-number prefix is applied) of the marker record that
/// `WriteBatch::commit` appends after the data records of a batch.
const TXN_FIN_KEY: &[u8] = b"txn-fin";

/// Sequence number `0`.
///
/// NOTE(review): not referenced in this part of the file; presumably it tags records
/// written outside of a write batch. Confirm against the call sites.
pub(crate) const NON_TRANSACTION_SEQ_NO: usize = 0;
/// A set of writes that is staged in memory and applied to a [`LiteDb`] by `commit`.
///
/// `put` and `delete` only touch the in-memory `pending` map; the database is not
/// modified until `commit` is called.
pub struct WriteBatch<'a> {
/// Staged records, keyed by the user key they apply to. Holds at most one record per key.
pub(super) pending: Arc<Mutex<HashMap<Vec<u8>, LogDb>>>,
/// The database that `commit` appends records to and whose index it updates.
pub(super) db: &'a LiteDb,
/// Batch settings; `commit` reads `max_batch_num` and `sync_writes` from it.
pub(super) options: WriteBatchOptions,
}
impl WriteBatch<'_> {
    /// Stages a `key`/`value` write in the batch.
    ///
    /// Nothing reaches the database until [`commit`](Self::commit) is called. A later
    /// `put` for the same key replaces the staged value.
    ///
    /// # Errors
    ///
    /// Returns [`ErrDb::InvalidParameter`] if `key` is empty.
    pub fn put(&self, key: Bytes, value: Bytes) -> ResultDb<()> {
        if key.is_empty() {
            return Err(ErrDb::InvalidParameter);
        }

        // Convert once; the record and the map entry each need their own owned copy.
        let key = key.to_vec();
        let log_db = LogDb {
            key: key.clone(),
            value: value.to_vec(),
            rec_type: LogDbType::NORMAL,
        };

        self.pending.lock().insert(key, log_db);
        Ok(())
    }

    /// Stages the deletion of `key`.
    ///
    /// If `key` is not present in the database index there is nothing to delete on
    /// commit, so any write staged for it in this batch is simply discarded. Otherwise
    /// a `DELETED` record is staged, replacing any write staged for the same key.
    ///
    /// # Errors
    ///
    /// Returns [`ErrDb::InvalidParameter`] if `key` is empty.
    pub fn delete(&self, key: Bytes) -> ResultDb<()> {
        if key.is_empty() {
            return Err(ErrDb::InvalidParameter);
        }

        let key = key.to_vec();
        let mut pending_writes = self.pending.lock();

        if self.db.index.get(key.clone()).is_none() {
            // `remove` is a no-op when the key was never staged, so no prior
            // `contains_key` check is needed.
            pending_writes.remove(&key);
            return Ok(());
        }

        let log_db = LogDb {
            key: key.clone(),
            value: Default::default(),
            rec_type: LogDbType::DELETED,
        };
        pending_writes.insert(key, log_db);
        Ok(())
    }

    /// Writes every staged record to the database and then updates the index.
    ///
    /// The steps are, in order:
    /// 1. append each staged record, its key prefixed with this batch's sequence number;
    /// 2. append a `TXNFINISHED` marker record carrying the same sequence number;
    /// 3. call `sync` on the database if `options.sync_writes` is set;
    /// 4. apply the staged puts and deletes to the index, adding the size of every
    ///    index entry that was replaced or removed to `reclaim_size`;
    /// 5. clear the batch so it can be reused.
    ///
    /// An empty batch returns `Ok(())` without touching the database.
    ///
    /// # Errors
    ///
    /// Returns [`ErrDb::InvalidBatch`] if more than `options.max_batch_num` records are
    /// staged, and propagates any error from appending records or syncing. On error the
    /// staged records are left in the batch; records already appended are not rolled
    /// back by this method.
    pub fn commit(&self) -> ResultDb<()> {
        let mut pending_writes = self.pending.lock();
        if pending_writes.is_empty() {
            return Ok(());
        }
        if pending_writes.len() > self.options.max_batch_num {
            return Err(ErrDb::InvalidBatch);
        }

        // Held until this method returns, covering the appends and the index update.
        let _lock = self.db.batch_commit_lock.lock();
        let seq_no = self.db.seq_no.fetch_add(1, Ordering::SeqCst);

        // Remember where each record landed so the index can be updated only after
        // the whole batch, including the finish marker, has been written.
        let mut written = Vec::with_capacity(pending_writes.len());
        for item in pending_writes.values() {
            let mut log_db = LogDb {
                key: log_db_key_with_seq(item.key.clone(), seq_no),
                value: item.value.clone(),
                rec_type: item.rec_type,
            };
            let pos = self.db.append_log_db(&mut log_db)?;
            written.push((item, pos));
        }

        let mut finish_log_db = LogDb {
            key: log_db_key_with_seq(TXN_FIN_KEY.to_vec(), seq_no),
            value: Default::default(),
            rec_type: LogDbType::TXNFINISHED,
        };
        self.db.append_log_db(&mut finish_log_db)?;

        if self.options.sync_writes {
            self.db.sync()?;
        }

        for (item, pos) in written {
            if item.rec_type == LogDbType::NORMAL {
                if let Some(old_pos) = self.db.index.put(item.key.clone(), pos) {
                    self.db
                        .reclaim_size
                        .fetch_add(old_pos.size as usize, Ordering::SeqCst);
                }
            } else if item.rec_type == LogDbType::DELETED {
                // NOTE(review): only the removed entry's size is counted here; confirm
                // whether the size of the DELETED record itself should be counted too.
                if let Some(old_pos) = self.db.index.delete(item.key.clone()) {
                    self.db
                        .reclaim_size
                        .fetch_add(old_pos.size as usize, Ordering::SeqCst);
                }
            }
        }

        pending_writes.clear();
        Ok(())
    }
}
pub(crate) fn log_db_key_with_seq(key: Vec<u8>, seq_no: usize) -> Vec<u8> {
let mut enc_key = BytesMut::new();
encode_length_delimiter(seq_no, &mut enc_key).unwrap();
enc_key.extend_from_slice(&key.to_vec());
enc_key.to_vec()
}
/// Splits an encoded key into `(user_key, seq_no)`.
///
/// The input is expected to start with a protobuf length delimiter (varint) holding the
/// sequence number, followed by the user key bytes, as produced by `log_db_key_with_seq`.
///
/// # Panics
///
/// Panics if the leading bytes cannot be decoded as a length delimiter.
pub(crate) fn parse_log_db_key(key: Vec<u8>) -> (Vec<u8>, usize) {
    let mut remaining = BytesMut::with_capacity(key.len());
    remaining.put_slice(key.as_slice());

    // Decoding consumes the varint prefix, leaving only the user key in `remaining`.
    let seq_no = decode_length_delimiter(&mut remaining).unwrap();
    let user_key = remaining.to_vec();

    (user_key, seq_no)
}
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, sync::atomic::Ordering};

    use crate::{
        db::{Closer, Config, ErrDb, Getter, WriteBatchOptions},
        kits,
        lite::LiteDb,
    };

    /// Builds a test configuration rooted at `dir`.
    ///
    /// Any directory left behind by an earlier, aborted run is removed first so the
    /// exact `seq_no` assertions below always start from an empty database.
    fn fresh_config(dir: &str) -> Config {
        let mut config = Config::default();
        config.path_db = PathBuf::from(dir);
        config.file_size_db = 64 * 1024 * 1024;
        // The directory normally does not exist yet, so a failure here is expected
        // and deliberately ignored.
        let _ = std::fs::remove_dir_all(&config.path_db);
        config
    }

    /// Staged writes are invisible until `commit`, and a commit advances `seq_no`.
    #[test]
    fn test_write_batch_1() {
        let config = fresh_config("/tmp/bitcask-rs-batch-1");
        let lite_db = LiteDb::open(config.clone()).expect("failed to open engine");
        let wb = lite_db
            .new_write_batch(WriteBatchOptions::default())
            .expect("failed to create write batch");

        wb.put(kits::rand_kv::get_test_key(1), kits::rand_kv::get_test_value(10))
            .expect("failed to stage key 1");
        wb.put(kits::rand_kv::get_test_key(2), kits::rand_kv::get_test_value(10))
            .expect("failed to stage key 2");

        // Not committed yet, so the key must not be readable.
        let res1 = lite_db.get(&kits::rand_kv::get_test_key(1));
        assert_eq!(Some(ErrDb::NotFindKey), res1.err());

        wb.commit().expect("failed to commit batch");

        let res2 = lite_db.get(&kits::rand_kv::get_test_key(1));
        assert!(res2.is_ok());

        let seq_no = wb.db.seq_no.load(Ordering::SeqCst);
        assert_eq!(2, seq_no);

        std::fs::remove_dir_all(config.path_db.clone()).expect("failed to remove path");
    }

    /// The sequence number reached by two commits is restored when the database is reopened.
    #[test]
    fn test_write_batch_2() {
        let config = fresh_config("/tmp/bitcask-rs-batch-2");
        let engine = LiteDb::open(config.clone()).expect("failed to open engine");
        let wb = engine
            .new_write_batch(WriteBatchOptions::default())
            .expect("failed to create write batch");

        wb.put(kits::rand_kv::get_test_key(1), kits::rand_kv::get_test_value(10))
            .expect("failed to stage key 1");
        wb.put(kits::rand_kv::get_test_key(2), kits::rand_kv::get_test_value(10))
            .expect("failed to stage key 2");
        wb.commit().expect("failed to commit first batch");

        wb.put(kits::rand_kv::get_test_key(1), kits::rand_kv::get_test_value(10))
            .expect("failed to stage key 1 again");
        wb.commit().expect("failed to commit second batch");

        engine.close().expect("failed to close");
        std::mem::drop(engine);

        let lite_db2 = LiteDb::open(config.clone()).expect("failed to open engine");
        let seq_no = lite_db2.seq_no.load(Ordering::SeqCst);
        assert_eq!(3, seq_no);

        std::fs::remove_dir_all(config.path_db.clone()).expect("failed to remove path");
    }
}