use bytes::{Buf, BufMut, Bytes, BytesMut};
use integer_encoding::VarInt;
use tempest_core::utils::PrettyBytes;
use crate::base::{KeyKind, KeyTrailer, SeqNum};
/// An ordered set of put and delete operations serialized into one contiguous buffer.
///
/// The buffer starts with a 12-byte header that `new` zero-fills and `commit` later
/// overwrites: bytes `0..8` hold the sequence number and bytes `8..12` hold the entry
/// count, both little-endian. The entries appended by `put` and `delete` follow, each
/// encoded as `varint(key len) | key | kind byte`, with puts additionally carrying
/// `varint(value len) | value`.
#[derive(derive_more::Debug)]
#[debug("WriteBatch(len={}, count={}, committed={})", buf.len(), count, committed)]
pub struct WriteBatch {
/// Header followed by the serialized entries.
buf: BytesMut,
/// Number of entries appended so far; copied into the header by `commit`.
count: u32,
/// Set once the header has been filled in, either by `commit` or because the batch was
/// built by `from_wal`.
committed: bool,
}
impl WriteBatch {
    /// Width in bytes of the little-endian sequence number at the start of the buffer.
    const SEQNUM_LEN: usize = 8;
    /// Total header width: the sequence number followed by a little-endian `u32` entry count.
    const HEADER_LEN: usize = 12;

    /// Creates an empty, uncommitted batch.
    ///
    /// The buffer is seeded with a zero-filled header that [`commit`](Self::commit)
    /// overwrites once a sequence number has been assigned.
    pub fn new() -> Self {
        let mut buf = BytesMut::with_capacity(4096);
        buf.put_bytes(0, Self::HEADER_LEN);
        Self {
            buf,
            count: 0,
            committed: false,
        }
    }

    /// Wraps an already serialized batch, reading the entry count back from its header.
    ///
    /// The returned batch is marked as committed; the sequence number stored in the header
    /// is not validated here.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than the 12-byte header.
    pub fn from_wal(buf: BytesMut) -> Self {
        assert!(
            buf.len() >= Self::HEADER_LEN,
            "write batch buf is shorter than its {}-byte header",
            Self::HEADER_LEN
        );
        let count_raw: [u8; 4] = buf[Self::SEQNUM_LEN..Self::HEADER_LEN]
            .try_into()
            .expect("count field is exactly 4 bytes wide");
        Self {
            buf,
            count: u32::from_le_bytes(count_raw),
            committed: true,
        }
    }

    /// Appends a put of `value` under `key` and returns `self` for chaining.
    ///
    /// NOTE(review): nothing prevents calling this after [`commit`](Self::commit); doing so
    /// would leave the entry count stored in the header stale.
    ///
    /// # Panics
    ///
    /// Panics if the batch already holds `u32::MAX` entries.
    pub fn put(&mut self, key: &[u8], value: &[u8]) -> &mut Self {
        trace!(
            key = ?PrettyBytes(key), value = ?PrettyBytes(value),
            "write batch: put",
        );
        self.bump_count();
        self.put_varint(key.len());
        self.buf.put_slice(key);
        self.buf.put_u8(KeyKind::Put.into());
        self.put_varint(value.len());
        self.buf.put_slice(value);
        self
    }

    /// Appends a deletion of `key` and returns `self` for chaining.
    ///
    /// NOTE(review): nothing prevents calling this after [`commit`](Self::commit); doing so
    /// would leave the entry count stored in the header stale.
    ///
    /// # Panics
    ///
    /// Panics if the batch already holds `u32::MAX` entries.
    pub fn delete(&mut self, key: &[u8]) -> &mut Self {
        trace!(
            key = ?PrettyBytes(key),
            "write batch: delete",
        );
        self.bump_count();
        self.put_varint(key.len());
        self.buf.put_slice(key);
        self.buf.put_u8(KeyKind::Delete.into());
        self
    }

    /// Increments the entry count, panicking instead of silently wrapping around.
    ///
    /// A wrapped count would be written into the header by `commit` and no longer match the
    /// number of serialized entries.
    fn bump_count(&mut self) {
        self.count = self
            .count
            .checked_add(1)
            .expect("write batch entry count overflowed u32");
    }

    /// Appends `i` to the buffer in varint encoding.
    pub(crate) fn put_varint(&mut self, i: usize) {
        let varint_size = i.required_space();
        // Reserve the exact number of bytes first, then encode in place over the zeroed tail.
        let start = self.buf.len();
        self.buf.put_bytes(0, varint_size);
        let written = i.encode_var(&mut self.buf[start..]);
        debug_assert_eq!(written, varint_size);
    }

    /// Writes `seqnum` and the current entry count into the header and marks the batch as
    /// committed.
    ///
    /// # Panics
    ///
    /// Panics if the batch has already been committed.
    pub(crate) fn commit(&mut self, seqnum: SeqNum) {
        assert!(!self.committed, "write batch was already committed");
        self.buf[..Self::SEQNUM_LEN].copy_from_slice(&seqnum.get().to_le_bytes());
        self.buf[Self::SEQNUM_LEN..Self::HEADER_LEN].copy_from_slice(&self.count.to_le_bytes());
        self.committed = true;
    }

    /// Decodes the sequence number stored in the first 8 bytes of a serialized batch.
    ///
    /// # Panics
    ///
    /// Panics if `buf` holds fewer than 8 bytes or if `SeqNum::new` rejects the decoded value.
    pub(crate) fn seqnum(buf: &[u8]) -> SeqNum {
        let seqnum_raw: [u8; 8] = buf[..Self::SEQNUM_LEN]
            .try_into()
            .expect("seqnum field is exactly 8 bytes wide");
        SeqNum::new(u64::from_le_bytes(seqnum_raw)).expect("batch contained invalid seqnum")
    }

    /// Consumes the batch and returns an iterator over its serialized entries.
    ///
    /// The buffer is frozen and handed to [`WriteBatchIterator::new`], which panics if the
    /// header does not contain a valid sequence number.
    pub(crate) fn into_iter(self) -> WriteBatchIterator {
        WriteBatchIterator::new(self.buf.freeze())
    }

    /// Returns the number of entries in the batch.
    pub(crate) fn count(&self) -> u32 {
        self.count
    }

    /// Returns the full serialized batch, header included.
    ///
    /// # Panics
    ///
    /// Panics if the batch has not been committed, because the header would not yet contain
    /// a sequence number.
    pub(crate) fn buf(&self) -> &[u8] {
        assert!(
            self.committed,
            "trying to access a write batch, that was not assigned a seqnum"
        );
        &self.buf
    }
}
/// Iterator that decodes the entries of a serialized write batch one at a time.
pub(crate) struct WriteBatchIterator {
/// Sequence number read from the batch header; attached to every yielded entry.
seqnum: SeqNum,
/// Entry bytes that have not been decoded yet (the header is already stripped).
buf: Bytes,
}
impl WriteBatchIterator {
    /// Builds an iterator over a serialized batch.
    ///
    /// The sequence number is decoded from the first 8 bytes (little-endian), after which
    /// the whole 12-byte header is skipped so that `buf` points at the first entry.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than the header or if `SeqNum::new` rejects the decoded
    /// sequence number.
    pub(crate) fn new(mut buf: Bytes) -> Self {
        let seqnum = {
            let raw: [u8; 8] = buf[0..8].try_into().unwrap();
            SeqNum::new(u64::from_le_bytes(raw)).expect("batch contained invalid seqnum")
        };
        buf.advance(12);
        Self { seqnum, buf }
    }
}
impl Iterator for WriteBatchIterator {
    /// `(key, trailer, value)`; the value is empty for deletions.
    type Item = (Bytes, KeyTrailer, Bytes);

    /// Decodes the next entry, or returns `None` once the buffer is exhausted.
    ///
    /// Each entry is laid out as `varint(key len) | key | kind byte`, followed by
    /// `varint(value len) | value` when the kind is a put.
    ///
    /// # Panics
    ///
    /// Panics if the remaining bytes do not form a complete, well-formed entry: an
    /// undecodable length varint, a key or value that runs past the end of the buffer, a
    /// missing kind byte, or a kind byte that `KeyKind` rejects.
    fn next(&mut self) -> Option<Self::Item> {
        if self.buf.is_empty() {
            return None;
        }

        let (key_len, varint_len) =
            usize::decode_var(&self.buf).expect("invalid key len varint in write batch buf");
        self.buf.advance(varint_len);
        // Check explicitly so that a truncated buffer fails with a message naming the
        // problem rather than a generic out-of-bounds panic from `split_to`.
        assert!(
            key_len <= self.buf.len(),
            "truncated key in write batch buf"
        );
        let key = self.buf.split_to(key_len);

        let kind_raw = *self
            .buf
            .first()
            .expect("missing key kind in write batch buf");
        self.buf.advance(1);
        let kind = KeyKind::try_from(kind_raw).expect("invalid key kind in write batch buf");

        let value = match kind {
            // Deletions carry no value payload.
            KeyKind::Delete => Bytes::new(),
            KeyKind::Put => {
                let (value_len, varint_len) = usize::decode_var(&self.buf)
                    .expect("invalid value len varint in write batch buf");
                self.buf.advance(varint_len);
                assert!(
                    value_len <= self.buf.len(),
                    "truncated value in write batch buf"
                );
                self.buf.split_to(value_len)
            }
        };

        Some((key, KeyTrailer::new(self.seqnum, kind), value))
    }
}