pub mod item;
use crate::{Database, Keyspace, PersistMode};
use item::Item;
use lsm_tree::{AbstractTree, UserKey, UserValue, ValueType};
use std::collections::HashSet;
/// A set of write operations (inserts and removals) that is collected in
/// memory and applied together by [`WriteBatch::commit`].
///
/// Queueing an operation only appends to an in-memory list; the journal and
/// the trees are not touched until the batch is committed.
pub struct WriteBatch {
/// Queued operations, in the order they were added.
pub(crate) data: Vec<Item>,
/// Database handle the batch is committed against (its journal, sequence
/// number counter, snapshot tracker and keyspace registry are used on commit).
db: Database,
/// Persist mode handed to the journal writer on commit; with `None` the
/// commit does not call `persist` at all.
durability: Option<PersistMode>,
}
impl WriteBatch {
    /// Creates an empty batch bound to `db`, with no durability mode set.
    pub(crate) fn new(db: Database) -> Self {
        Self {
            data: Vec::new(),
            db,
            durability: None,
        }
    }

    /// Creates an empty batch bound to `db` whose item list has room for
    /// `capacity` operations pre-allocated.
    #[must_use]
    pub fn with_capacity(db: Database, capacity: usize) -> Self {
        Self {
            data: Vec::with_capacity(capacity),
            db,
            durability: None,
        }
    }

    /// Returns the number of operations currently queued in the batch.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if no operation has been queued.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Sets the persist mode that [`WriteBatch::commit`] passes to the journal writer.
    ///
    /// With `None`, the commit does not call `persist` on the journal writer.
    #[must_use]
    pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
        self.durability = mode;
        self
    }

    /// Queues an insert of `key` with `value` into keyspace `p`.
    ///
    /// Nothing is written until the batch is committed.
    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(&mut self, p: &Keyspace, key: K, value: V) {
        self.data
            .push(Item::new(p.clone(), key, value, ValueType::Value));
    }

    /// Queues a removal (tombstone) of `key` in keyspace `p`.
    ///
    /// Nothing is written until the batch is committed.
    pub fn remove<K: Into<UserKey>>(&mut self, p: &Keyspace, key: K) {
        self.data
            .push(Item::new(p.clone(), key, vec![], ValueType::Tombstone));
    }

    /// Queues a weak removal (weak tombstone) of `key` in keyspace `p`.
    ///
    /// Nothing is written until the batch is committed.
    #[doc(hidden)]
    pub fn remove_weak<K: Into<UserKey>>(&mut self, p: &Keyspace, key: K) {
        self.data
            .push(Item::new(p.clone(), key, vec![], ValueType::WeakTombstone));
    }

    /// Commits the batch: writes all queued items to the journal, then applies
    /// them to the memtables of their keyspaces.
    ///
    /// Every item of the batch is written with the same sequence number, and
    /// that sequence number is published to the snapshot tracker only after all
    /// items have been applied. An empty batch returns `Ok(())` immediately
    /// without touching the journal.
    ///
    /// # Errors
    ///
    /// - Returns [`crate::Error::Poisoned`] if the database is flagged as
    ///   poisoned, or if persisting the journal fails (which also sets the
    ///   poisoned flag).
    /// - Returns the journal writer's error if writing the batch to the journal
    ///   fails; in that case no item is applied to any memtable.
    ///
    /// # Panics
    ///
    /// Panics if the keyspaces lock is poisoned.
    #[allow(clippy::missing_panics_doc)]
    pub fn commit(mut self) -> crate::Result<()> {
        use std::sync::atomic::Ordering;

        if self.is_empty() {
            return Ok(());
        }

        log::trace!("batch: Acquiring journal writer");
        let mut journal_writer = self.db.supervisor.journal.get_writer();

        // The poisoned flag is checked only after the journal writer has been
        // obtained, i.e. right before anything is written.
        if self.db.is_poisoned.load(Ordering::Relaxed) {
            return Err(crate::Error::Poisoned);
        }

        let batch_seqno = self.db.supervisor.seqno.next();

        // A failed journal write must abort the commit. Previously the result
        // was discarded, so the batch would still be applied to the memtables
        // and reported as committed although it never reached the journal.
        journal_writer.write_batch(self.data.iter(), self.data.len(), batch_seqno)?;

        if let Some(mode) = self.durability {
            if let Err(e) = journal_writer.persist(mode) {
                self.db.is_poisoned.store(true, Ordering::Release);
                log::error!(
                    "persist failed, which is a FATAL, and possibly hardware-related, failure: {e:?}"
                );
                return Err(crate::Error::Poisoned);
            }
        }

        // Every keyspace touched by the batch; each one is checked for memtable
        // rotation and backpressure once the batch has been applied.
        #[expect(clippy::mutable_key_type)]
        let mut keyspaces_with_possible_stall = HashSet::new();

        // The read guard on the keyspace registry is held while the memtables
        // are being written and released explicitly afterwards.
        #[expect(clippy::expect_used)]
        let keyspaces = self
            .db
            .supervisor
            .keyspaces
            .read()
            .expect("lock is poisoned");

        let mut batch_size = 0u64;

        log::trace!("Applying batch (size={}) to memtable(s)", self.data.len());

        for item in std::mem::take(&mut self.data) {
            let (item_size, _) = match item.value_type {
                ValueType::Value => item.keyspace.tree.insert(item.key, item.value, batch_seqno),
                ValueType::Tombstone => item.keyspace.tree.remove(item.key, batch_seqno),
                ValueType::WeakTombstone => item.keyspace.tree.remove_weak(item.key, batch_seqno),
                // The batch API above only ever queues the three types handled here.
                ValueType::Indirection => unreachable!(),
            };

            batch_size += item_size;
            keyspaces_with_possible_stall.insert(item.keyspace.clone());
        }

        // Publish the sequence number only after all items have been applied.
        self.db.supervisor.snapshot_tracker.publish(batch_seqno);

        drop(journal_writer);
        log::trace!("batch: Freed journal writer");

        drop(keyspaces);

        // Bookkeeping and stall handling run after the journal writer and the
        // keyspace guard have been released.
        self.db.supervisor.write_buffer_size.allocate(batch_size);

        for keyspace in &keyspaces_with_possible_stall {
            let memtable_size = keyspace.tree.active_memtable().size();
            keyspace.check_memtable_rotate(memtable_size);
            keyspace.local_backpressure();
        }

        Ok(())
    }
}