use bytes::Bytes;
use kimberlite_types::Offset;
use crate::Key;
use crate::types::TableId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WriteOp {
Put {
table: TableId,
key: Key,
value: Bytes,
},
Delete { table: TableId, key: Key },
}
impl WriteOp {
pub fn table(&self) -> TableId {
match self {
WriteOp::Put { table, .. } | WriteOp::Delete { table, .. } => *table,
}
}
pub fn key(&self) -> &Key {
match self {
WriteOp::Put { key, .. } | WriteOp::Delete { key, .. } => key,
}
}
}
#[derive(Debug, Clone)]
pub struct WriteBatch {
position: Offset,
operations: Vec<WriteOp>,
}
impl WriteBatch {
pub fn new(position: Offset) -> Self {
Self {
position,
operations: Vec::new(),
}
}
pub fn with_capacity(position: Offset, capacity: usize) -> Self {
Self {
position,
operations: Vec::with_capacity(capacity),
}
}
pub fn position(&self) -> Offset {
self.position
}
pub fn len(&self) -> usize {
self.operations.len()
}
pub fn is_empty(&self) -> bool {
self.operations.is_empty()
}
#[must_use]
pub fn put(mut self, table: TableId, key: Key, value: Bytes) -> Self {
self.operations.push(WriteOp::Put { table, key, value });
self
}
#[must_use]
pub fn delete(mut self, table: TableId, key: Key) -> Self {
self.operations.push(WriteOp::Delete { table, key });
self
}
pub fn push_put(&mut self, table: TableId, key: Key, value: Bytes) {
self.operations.push(WriteOp::Put { table, key, value });
}
pub fn push_delete(&mut self, table: TableId, key: Key) {
self.operations.push(WriteOp::Delete { table, key });
}
pub fn iter(&self) -> impl Iterator<Item = &WriteOp> {
self.operations.iter()
}
pub fn into_operations(self) -> Vec<WriteOp> {
self.operations
}
pub fn operations(&self) -> &[WriteOp] {
&self.operations
}
}
impl IntoIterator for WriteBatch {
type Item = WriteOp;
type IntoIter = std::vec::IntoIter<WriteOp>;
fn into_iter(self) -> Self::IntoIter {
self.operations.into_iter()
}
}
impl<'a> IntoIterator for &'a WriteBatch {
type Item = &'a WriteOp;
type IntoIter = std::slice::Iter<'a, WriteOp>;
fn into_iter(self) -> Self::IntoIter {
self.operations.iter()
}
}
#[cfg(test)]
mod batch_tests {
use super::*;
#[test]
fn test_batch_builder() {
let batch = WriteBatch::new(Offset::new(10))
.put(TableId::new(1), Key::from("key1"), Bytes::from("value1"))
.delete(TableId::new(1), Key::from("key2"))
.put(TableId::new(2), Key::from("key3"), Bytes::from("value3"));
assert_eq!(batch.position(), Offset::new(10));
assert_eq!(batch.len(), 3);
let ops: Vec<_> = batch.iter().collect();
assert!(matches!(ops[0], WriteOp::Put { .. }));
assert!(matches!(ops[1], WriteOp::Delete { .. }));
assert!(matches!(ops[2], WriteOp::Put { .. }));
}
#[test]
fn test_empty_batch() {
let batch = WriteBatch::new(Offset::new(1));
assert!(batch.is_empty());
assert_eq!(batch.len(), 0);
}
}