use crate::config::{MergeOptions, PutOptions};
use crate::error::SlateDBError;
use crate::iter::{IterationOrder, KeyValueIterator};
use crate::mem_table::{KVTableInternalKeyRange, SequencedKey};
use crate::merge_operator::{MergeOperatorIterator, MergeOperatorType};
use crate::types::{RowEntry, ValueDeletable};
use async_trait::async_trait;
use bytes::Bytes;
use std::collections::{BTreeMap, HashSet};
use std::iter::Peekable;
use std::ops::RangeBounds;
use uuid::Uuid;
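/// A batch of write operations (puts, merges, and deletes) staged in memory
/// before being handed to the database. Operations are keyed by
/// `SequencedKey` (user key plus insertion index `write_idx`): a later put
/// or delete replaces any earlier operations staged for the same key, while
/// merge operands for the same key accumulate.
///
/// # Example (construction only; a minimal sketch)
///
/// ```ignore
/// let mut batch = WriteBatch::new();
/// batch.put(b"k1", b"v1");        // a later put/delete replaces earlier ops for this key
/// batch.merge(b"counter", b"+1"); // merges for the same key accumulate
/// batch.delete(b"k2");
/// ```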
#[derive(Clone, Debug)]
pub struct WriteBatch {
pub(crate) ops: BTreeMap<SequencedKey, WriteOp>,
pub(crate) txn_id: Option<Uuid>,
pub(crate) write_idx: u64,
}
impl Default for WriteBatch {
fn default() -> Self {
Self::new()
}
}
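/// A single staged operation: a put (key, value, options), a delete (key),
/// or a merge operand (key, operand, options).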
#[derive(PartialEq, Clone)]
pub(crate) enum WriteOp {
Put(Bytes, Bytes, PutOptions),
Delete(Bytes),
Merge(Bytes, Bytes, MergeOptions),
}
impl std::fmt::Debug for WriteOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
fn trunc(bytes: &Bytes) -> String {
if bytes.len() > 10 {
format!("{:?}...", &bytes[..10])
} else {
format!("{:?}", bytes)
}
}
match self {
WriteOp::Put(key, value, options) => {
let key = trunc(key);
let value = trunc(value);
write!(f, "Put({key}, {value}, {:?})", options)
}
WriteOp::Delete(key) => {
let key = trunc(key);
write!(f, "Delete({key})")
}
WriteOp::Merge(key, value, options) => {
let key = trunc(key);
let value = trunc(value);
write!(f, "Merge({key}, {value}, {:?})", options)
}
}
}
}
impl WriteOp {
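/// Converts this operation into a `RowEntry` stamped with the given
/// sequence number and timestamps: puts become values, deletes become
/// tombstones, and merges become merge operands.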
pub(crate) fn to_row_entry(
&self,
seq: u64,
create_ts: Option<i64>,
expire_ts: Option<i64>,
) -> RowEntry {
match self {
WriteOp::Put(key, value, _options) => {
RowEntry::new(
key.clone(),
ValueDeletable::Value(value.clone()),
seq,
create_ts,
expire_ts,
)
}
WriteOp::Delete(key) => RowEntry::new(
key.clone(),
ValueDeletable::Tombstone,
seq,
create_ts,
expire_ts,
),
WriteOp::Merge(key, value, _options) => RowEntry::new(
key.clone(),
ValueDeletable::Merge(value.clone()),
seq,
create_ts,
expire_ts,
),
}
}
}
impl WriteBatch {
pub fn new() -> Self {
WriteBatch {
ops: BTreeMap::new(),
txn_id: None,
write_idx: 0,
}
}
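/// Removes every op staged for `key`, regardless of its write index. The
/// range bounds rely on `SequencedKey` ordering entries with the same user
/// key by sequence number descending (so `u64::MAX` is the lower bound).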
fn remove_ops_by_key(&mut self, key: &Bytes) {
let start = SequencedKey::new(key.clone(), u64::MAX);
let end = SequencedKey::new(key.clone(), 0);
let keys_to_remove: Vec<SequencedKey> = self
.ops
.range(start..=end)
.map(|(k, _)| k.clone())
.collect();
for k in keys_to_remove {
self.ops.remove(&k);
}
}
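/// Tags this batch with the id of the transaction it belongs to.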
pub(crate) fn with_txn_id(self, txn_id: Uuid) -> Self {
Self {
ops: self.ops,
txn_id: Some(txn_id),
write_idx: self.write_idx,
}
}
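/// Validates key/value limits: panics if the key is empty, if the key is
/// longer than `u16::MAX` bytes, or if the value is longer than
/// `u32::MAX` bytes.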
fn assert_kv<K, V>(&self, key: &K, value: &V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
let key = key.as_ref();
let value = value.as_ref();
assert!(!key.is_empty(), "key cannot be empty");
assert!(
key.len() <= u16::MAX as usize,
"key size must be <= u16::MAX"
);
assert!(
value.len() <= u32::MAX as usize,
"value size must be <= u32::MAX"
);
}
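/// Stages a put of `value` under `key` with the default `PutOptions`.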
pub fn put<K, V>(&mut self, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
self.put_with_options(key, value, &PutOptions::default())
}
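/// Stages a put of `value` under `key`, replacing any operations previously
/// staged for the same key in this batch.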
pub fn put_with_options<K, V>(&mut self, key: K, value: V, options: &PutOptions)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
self.assert_kv(&key, &value);
let key = Bytes::copy_from_slice(key.as_ref());
self.remove_ops_by_key(&key);
self.ops.insert(
SequencedKey::new(key.clone(), self.write_idx),
WriteOp::Put(
key.clone(),
Bytes::copy_from_slice(value.as_ref()),
options.clone(),
),
);
self.write_idx += 1;
}
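/// Stages a merge operand for `key` with the default `MergeOptions`.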
pub fn merge<K, V>(&mut self, key: K, value: V)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
self.merge_with_options(key, value, &MergeOptions::default());
}
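/// Stages a merge operand for `key`. Unlike put and delete, earlier
/// operations for the same key are preserved, so repeated merges accumulate
/// and are later resolved by the configured merge operator.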
pub fn merge_with_options<K, V>(&mut self, key: K, value: V, options: &MergeOptions)
where
K: AsRef<[u8]>,
V: AsRef<[u8]>,
{
self.assert_kv(&key, &value);
let key = Bytes::copy_from_slice(key.as_ref());
self.ops.insert(
SequencedKey::new(key.clone(), self.write_idx),
WriteOp::Merge(
key.clone(),
Bytes::copy_from_slice(value.as_ref()),
options.clone(),
),
);
self.write_idx += 1;
}
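/// Stages a deletion (tombstone) for `key`, replacing any operations
/// previously staged for the same key in this batch.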
pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) {
self.assert_kv(&key, &[]);
let key = Bytes::copy_from_slice(key.as_ref());
self.remove_ops_by_key(&key);
self.ops.insert(
SequencedKey::new(key.clone(), self.write_idx),
WriteOp::Delete(key),
);
self.write_idx += 1;
}
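/// Returns the set of user keys touched by this batch.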
pub(crate) fn keys(&self) -> HashSet<Bytes> {
self.ops.keys().map(|key| key.user_key.clone()).collect()
}
pub(crate) fn is_empty(&self) -> bool {
self.ops.is_empty()
}
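/// Converts the batch into `RowEntry`s stamped with `seq` and `now`,
/// computing expiry timestamps from the per-op options and `default_ttl`.
/// When a merge operator is supplied, consecutive merge operands for the
/// same key are collapsed through a `MergeOperatorIterator`.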
pub(crate) async fn extract_entries(
&self,
seq: u64,
now: i64,
default_ttl: Option<u64>,
merger: Option<MergeOperatorType>,
) -> Result<Vec<RowEntry>, SlateDBError> {
let mut it: Box<dyn KeyValueIterator> = Box::new(WriteBatchIterator::new_with_seq_and_ttl(
self,
..,
IterationOrder::Ascending,
seq,
now,
default_ttl,
));
if let Some(ref merge_operator) = merger {
it = Box::new(MergeOperatorIterator::new(
merge_operator.clone(),
it,
false,
now,
None,
));
}
let mut entries = Vec::new();
while let Some(entry) = it.next_entry().await? {
entries.push(entry);
}
Ok(entries)
}
}
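/// Iterates over the entries of a `WriteBatch` in key order (ascending or
/// descending), yielding `RowEntry`s.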
pub(crate) struct WriteBatchIterator {
iter: Peekable<Box<dyn Iterator<Item = (SequencedKey, RowEntry)> + Send + Sync>>,
ordering: IterationOrder,
}
impl WriteBatchIterator {
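/// Builds an iterator over the ops of `batch` that fall within `range`, in
/// the requested order. Entries are stamped with a placeholder sequence
/// number of `u64::MAX` and carry no timestamps.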
pub(crate) fn new(
batch: WriteBatch,
range: impl RangeBounds<Bytes>,
ordering: IterationOrder,
) -> Self {
let range = KVTableInternalKeyRange::from(range);
let mut entries: Vec<(SequencedKey, RowEntry)> = batch
.ops
.range(range)
.map(|(k, v)| (k.clone(), v.to_row_entry(u64::MAX, None, None)))
.collect();
if matches!(ordering, IterationOrder::Descending) {
entries.reverse();
}
let iter: Box<dyn Iterator<Item = (SequencedKey, RowEntry)> + Send + Sync> =
Box::new(entries.into_iter());
Self {
iter: iter.peekable(),
ordering,
}
}
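/// Like `new`, but stamps each entry with the provided sequence number and
/// creation timestamp, and derives the expiry timestamp from the per-op
/// options and `default_ttl` (deletes never expire).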
pub(crate) fn new_with_seq_and_ttl(
batch: &WriteBatch,
range: impl RangeBounds<Bytes>,
ordering: IterationOrder,
seq: u64,
now: i64,
default_ttl: Option<u64>,
) -> Self {
let range = KVTableInternalKeyRange::from(range);
let mut entries: Vec<(SequencedKey, RowEntry)> = batch
.ops
.range(range)
.map(|(k, v)| {
let expire_ts = match v {
WriteOp::Put(_, _, opts) => opts.expire_ts_from(default_ttl, now),
WriteOp::Merge(_, _, opts) => opts.expire_ts_from(default_ttl, now),
WriteOp::Delete(_) => None,
};
(k.clone(), v.to_row_entry(seq, Some(now), expire_ts))
})
.collect();
if matches!(ordering, IterationOrder::Descending) {
entries.reverse();
}
let iter: Box<dyn Iterator<Item = (SequencedKey, RowEntry)> + Send + Sync> =
Box::new(entries.into_iter());
Self {
iter: iter.peekable(),
ordering,
}
}
}
#[async_trait]
impl KeyValueIterator for WriteBatchIterator {
async fn init(&mut self) -> Result<(), crate::error::SlateDBError> {
Ok(())
}
async fn next_entry(&mut self) -> Result<Option<RowEntry>, crate::error::SlateDBError> {
Ok(self.iter.next().map(|(_, entry)| entry))
}
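/// Advances past entries that precede `next_key` in the iteration order, so
/// the next call to `next_entry` returns the first entry at or after
/// `next_key` when ascending, or at or before it when descending.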
async fn seek(&mut self, next_key: &[u8]) -> Result<(), crate::error::SlateDBError> {
while let Some((key, _)) = self.iter.peek() {
if match self.ordering {
IterationOrder::Ascending => key.user_key.as_ref() < next_key,
IterationOrder::Descending => key.user_key.as_ref() > next_key,
} {
self.iter.next();
} else {
break;
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
use crate::bytes_range::BytesRange;
use crate::config::Ttl;
use crate::test_utils::assert_iterator;
use crate::types::RowEntry;
struct WriteOpTestCase {
key: Vec<u8>,
value: Option<Vec<u8>>,
options: PutOptions,
}
#[rstest]
#[case(vec![WriteOpTestCase {
key: b"key".to_vec(),
value: Some(b"value".to_vec()),
options: PutOptions::default(),
}])]
#[case(vec![WriteOpTestCase {
key: b"key".to_vec(),
value: None,
options: PutOptions::default(),
}])]
#[should_panic(expected = "key size must be <= u16::MAX")]
#[case(vec![WriteOpTestCase {
key: vec![b'k'; 65_536], // 2^16
value: None,
options: PutOptions::default(),
}])]
#[should_panic(expected = "value size must be <= u32::MAX")]
#[case(vec![WriteOpTestCase {
key: b"key".to_vec(),
value: Some(vec![b'x'; u32::MAX as usize + 1]), // 2^32
options: PutOptions::default(),
}])]
#[should_panic(expected = "key cannot be empty")]
#[case(vec![WriteOpTestCase {
key: b"".to_vec(),
value: Some(b"value".to_vec()),
options: PutOptions::default(),
}])]
#[should_panic(expected = "key cannot be empty")]
#[case(vec![WriteOpTestCase {
key: b"".to_vec(),
value: None,
options: PutOptions::default(),
}])]
fn test_put_delete_batch(#[case] test_case: Vec<WriteOpTestCase>) {
let mut batch = WriteBatch::new();
let mut expected_ops: BTreeMap<SequencedKey, WriteOp> = BTreeMap::new();
for (seq, test_case) in test_case.into_iter().enumerate() {
if let Some(value) = test_case.value {
batch.put_with_options(
test_case.key.as_slice(),
value.as_slice(),
&test_case.options,
);
expected_ops.insert(
SequencedKey::new(Bytes::from(test_case.key.clone()), seq as u64),
WriteOp::Put(
Bytes::from(test_case.key),
Bytes::from(value),
test_case.options,
),
);
} else {
batch.delete(test_case.key.as_slice());
expected_ops.insert(
SequencedKey::new(Bytes::from(test_case.key.clone()), seq as u64),
WriteOp::Delete(Bytes::from(test_case.key)),
);
}
}
assert_eq!(batch.ops, expected_ops);
}
#[test]
fn test_writebatch_deduplication() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key1", b"value2");
assert_eq!(batch.ops.len(), 1);
let op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
match op {
WriteOp::Put(_, value, _) => assert_eq!(value.as_ref(), b"value2"),
_ => panic!("Expected Put operation"),
}
}
#[tokio::test]
async fn test_writebatch_iterator_basic() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
batch.put(b"key2", b"value2");
batch.delete(b"key4");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
let expected = vec![
RowEntry::new_value(b"key1", b"value1", u64::MAX),
RowEntry::new_value(b"key2", b"value2", u64::MAX),
RowEntry::new_value(b"key3", b"value3", u64::MAX),
RowEntry::new(
Bytes::from("key4"),
ValueDeletable::Tombstone,
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_range() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
batch.put(b"key5", b"value5");
let mut iter = WriteBatchIterator::new(
batch.clone(),
BytesRange::from(Bytes::from_static(b"key2")..Bytes::from_static(b"key4")),
IterationOrder::Ascending,
);
let expected = vec![RowEntry::new_value(b"key3", b"value3", u64::MAX)];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_descending() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
batch.put(b"key2", b"value2");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Descending);
let expected = vec![
RowEntry::new_value(b"key3", b"value3", u64::MAX),
RowEntry::new_value(b"key2", b"value2", u64::MAX),
RowEntry::new_value(b"key1", b"value1", u64::MAX),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_seek_ascending() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
batch.put(b"key5", b"value5");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
iter.seek(b"key3").await.unwrap();
let expected = vec![
RowEntry::new_value(b"key3", b"value3", u64::MAX),
RowEntry::new_value(b"key5", b"value5", u64::MAX),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_seek_descending() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
batch.put(b"key5", b"value5");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Descending);
iter.seek(b"key3").await.unwrap();
let expected = vec![
RowEntry::new_value(b"key3", b"value3", u64::MAX),
RowEntry::new_value(b"key1", b"value1", u64::MAX),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_empty_batch() {
let batch = WriteBatch::new();
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
let result = iter.next_entry().await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_writebatch_iterator_seek_to_nonexistent() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
iter.seek(b"key2").await.unwrap();
let result = iter.next_entry().await.unwrap();
assert!(result.is_some());
let entry = result.unwrap();
assert_eq!(entry.key, Bytes::from_static(b"key3"));
assert_eq!(
entry.value,
ValueDeletable::Value(Bytes::from_static(b"value3"))
);
}
#[tokio::test]
async fn test_writebatch_iterator_seek_beyond_end() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key3", b"value3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
iter.seek(b"key9").await.unwrap();
let result = iter.next_entry().await.unwrap();
assert!(result.is_none());
}
#[tokio::test]
async fn test_writebatch_iterator_multiple_tombstones() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.delete(b"key2");
batch.put(b"key3", b"value3");
batch.delete(b"key4");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
let expected = vec![
RowEntry::new_value(b"key1", b"value1", u64::MAX),
RowEntry::new(
Bytes::from_static(b"key2"),
ValueDeletable::Tombstone,
u64::MAX,
None,
None,
),
RowEntry::new_value(b"key3", b"value3", u64::MAX),
RowEntry::new(
Bytes::from_static(b"key4"),
ValueDeletable::Tombstone,
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn test_writebatch_iterator_seek_before_first() {
let mut batch = WriteBatch::new();
batch.put(b"key2", b"value2");
batch.put(b"key3", b"value3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
iter.seek(b"key1").await.unwrap();
let result = iter.next_entry().await.unwrap();
assert!(result.is_some());
let entry = result.unwrap();
assert_eq!(entry.key, Bytes::from_static(b"key2"));
}
#[tokio::test]
async fn test_writebatch_iterator_range_with_tombstones() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.delete(b"key2");
batch.put(b"key3", b"value3");
batch.delete(b"key4");
batch.put(b"key5", b"value5");
let mut iter = WriteBatchIterator::new(
batch.clone(),
BytesRange::from(Bytes::from_static(b"key2")..Bytes::from_static(b"key4")),
IterationOrder::Ascending,
);
let expected = vec![
RowEntry::new(
Bytes::from_static(b"key2"),
ValueDeletable::Tombstone,
u64::MAX,
None,
None,
),
RowEntry::new_value(b"key3", b"value3", u64::MAX),
];
assert_iterator(&mut iter, expected).await;
}
#[test]
fn should_create_merge_operation_with_default_options() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"value1");
assert_eq!(batch.ops.len(), 1);
let op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 0))
.unwrap();
match op {
WriteOp::Merge(key, value, options) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"value1");
assert_eq!(options, &MergeOptions::default());
}
_ => panic!("Expected Merge operation"),
}
}
#[test]
fn should_create_merge_operation_with_custom_options() {
let mut batch = WriteBatch::new();
let merge_options = MergeOptions {
ttl: Ttl::ExpireAfter(3600),
};
batch.merge_with_options(b"key1", b"value1", &merge_options);
assert_eq!(batch.ops.len(), 1);
let op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 0))
.unwrap();
match op {
WriteOp::Merge(key, value, options) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"value1");
assert_eq!(options.ttl, Ttl::ExpireAfter(3600));
}
_ => panic!("Expected Merge operation"),
}
}
#[test]
fn should_allow_multiple_merges_for_same_key() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"value1");
batch.merge(b"key1", b"value2");
batch.merge(b"key1", b"value3");
assert_eq!(batch.ops.len(), 3);
let op1 = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 0))
.unwrap();
let op2 = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
let op3 = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 2))
.unwrap();
match (op1, op2, op3) {
(WriteOp::Merge(_, v1, _), WriteOp::Merge(_, v2, _), WriteOp::Merge(_, v3, _)) => {
assert_eq!(v1.as_ref(), b"value1");
assert_eq!(v2.as_ref(), b"value2");
assert_eq!(v3.as_ref(), b"value3");
}
_ => panic!("Expected all Merge operations"),
}
}
#[test]
fn should_allow_merges_for_different_keys() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"value1");
batch.merge(b"key2", b"value2");
batch.merge(b"key3", b"value3");
assert_eq!(batch.ops.len(), 3);
assert!(batch
.ops
.contains_key(&SequencedKey::new(Bytes::from_static(b"key1"), 0)));
assert!(batch
.ops
.contains_key(&SequencedKey::new(Bytes::from_static(b"key2"), 1)));
assert!(batch
.ops
.contains_key(&SequencedKey::new(Bytes::from_static(b"key3"), 2)));
}
#[test]
fn should_preserve_both_put_and_merge_when_merge_comes_after_put() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"put_value");
batch.merge(b"key1", b"merge_value");
assert_eq!(batch.ops.len(), 2);
let put_op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 0))
.unwrap();
match put_op {
WriteOp::Put(key, value, _) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"put_value");
}
_ => panic!("Expected Put operation"),
}
let merge_op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
match merge_op {
WriteOp::Merge(key, value, _) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"merge_value");
}
_ => panic!("Expected Merge operation"),
}
}
#[test]
fn should_preserve_both_delete_and_merge_when_merge_comes_after_delete() {
let mut batch = WriteBatch::new();
batch.delete(b"key1");
batch.merge(b"key1", b"merge_value");
assert_eq!(batch.ops.len(), 2);
let delete_op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 0))
.unwrap();
match delete_op {
WriteOp::Delete(key) => {
assert_eq!(key.as_ref(), b"key1");
}
_ => panic!("Expected Delete operation"),
}
let merge_op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
match merge_op {
WriteOp::Merge(key, value, _) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"merge_value");
}
_ => panic!("Expected Merge operation"),
}
}
#[test]
fn should_deduplicate_merge_when_put_is_added_for_same_key() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge_value");
batch.put(b"key1", b"put_value");
assert_eq!(batch.ops.len(), 1);
let op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
match op {
WriteOp::Put(key, value, _) => {
assert_eq!(key.as_ref(), b"key1");
assert_eq!(value.as_ref(), b"put_value");
}
_ => panic!("Expected Put operation"),
}
}
#[test]
fn should_deduplicate_merge_when_delete_is_added_for_same_key() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge_value");
batch.delete(b"key1");
assert_eq!(batch.ops.len(), 1);
let op = batch
.ops
.get(&SequencedKey::new(Bytes::from_static(b"key1"), 1))
.unwrap();
match op {
WriteOp::Delete(key) => {
assert_eq!(key.as_ref(), b"key1");
}
_ => panic!("Expected Delete operation"),
}
}
#[tokio::test]
async fn should_iterate_over_mixed_operations_including_merges() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.merge(b"key2", b"merge1");
batch.merge(b"key2", b"merge2");
batch.delete(b"key3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
let expected = vec![
RowEntry::new_value(b"key1", b"value1", u64::MAX),
RowEntry::new(
Bytes::from_static(b"key2"),
ValueDeletable::Merge(Bytes::from_static(b"merge2")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key2"),
ValueDeletable::Merge(Bytes::from_static(b"merge1")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key3"),
ValueDeletable::Tombstone,
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn should_iterate_over_multiple_merges_for_same_key() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge1");
batch.merge(b"key1", b"merge2");
batch.merge(b"key1", b"merge3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
let expected = vec![
RowEntry::new(
Bytes::from_static(b"key1"),
ValueDeletable::Merge(Bytes::from_static(b"merge3")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key1"),
ValueDeletable::Merge(Bytes::from_static(b"merge2")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key1"),
ValueDeletable::Merge(Bytes::from_static(b"merge1")),
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn should_iterate_over_merges_in_descending_order() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge1");
batch.merge(b"key2", b"merge2");
batch.merge(b"key1", b"merge3");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Descending);
let expected = vec![
RowEntry::new(
Bytes::from_static(b"key2"),
ValueDeletable::Merge(Bytes::from_static(b"merge2")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key1"),
ValueDeletable::Merge(Bytes::from_static(b"merge1")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key1"),
ValueDeletable::Merge(Bytes::from_static(b"merge3")),
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn should_iterate_over_merges_in_range() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge1");
batch.merge(b"key3", b"merge3");
batch.merge(b"key5", b"merge5");
let mut iter = WriteBatchIterator::new(
batch.clone(),
BytesRange::from(Bytes::from_static(b"key2")..Bytes::from_static(b"key4")),
IterationOrder::Ascending,
);
let expected = vec![RowEntry::new(
Bytes::from_static(b"key3"),
ValueDeletable::Merge(Bytes::from_static(b"merge3")),
u64::MAX,
None,
None,
)];
assert_iterator(&mut iter, expected).await;
}
#[tokio::test]
async fn should_seek_to_merge_operations() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge1");
batch.merge(b"key3", b"merge3");
batch.merge(b"key5", b"merge5");
let mut iter = WriteBatchIterator::new(batch.clone(), .., IterationOrder::Ascending);
iter.seek(b"key3").await.unwrap();
let expected = vec![
RowEntry::new(
Bytes::from_static(b"key3"),
ValueDeletable::Merge(Bytes::from_static(b"merge3")),
u64::MAX,
None,
None,
),
RowEntry::new(
Bytes::from_static(b"key5"),
ValueDeletable::Merge(Bytes::from_static(b"merge5")),
u64::MAX,
None,
None,
),
];
assert_iterator(&mut iter, expected).await;
}
struct StringConcatMergeOperator;
impl crate::merge_operator::MergeOperator for StringConcatMergeOperator {
fn merge(
&self,
_key: &Bytes,
existing_value: Option<Bytes>,
operand: Bytes,
) -> Result<Bytes, crate::merge_operator::MergeOperatorError> {
match existing_value {
Some(base) => {
let mut merged = base.to_vec();
merged.extend_from_slice(&operand);
Ok(Bytes::from(merged))
}
None => Ok(operand),
}
}
}
#[tokio::test]
async fn should_extract_entries_no_merges() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value1");
batch.put(b"key2", b"value2");
batch.delete(b"key3");
let result = batch.extract_entries(100, 1000, None, None).await.unwrap();
let mut entries = result.into_iter().collect::<Vec<_>>();
entries.sort_by_key(|e| e.key.clone());
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].key, Bytes::from_static(b"key1"));
assert_eq!(
entries[0].value,
ValueDeletable::Value(Bytes::from_static(b"value1"))
);
assert_eq!(entries[1].key, Bytes::from_static(b"key2"));
assert_eq!(
entries[1].value,
ValueDeletable::Value(Bytes::from_static(b"value2"))
);
assert_eq!(entries[2].key, Bytes::from_static(b"key3"));
assert_eq!(entries[2].value, ValueDeletable::Tombstone);
}
#[tokio::test]
async fn should_extract_entries_multiple_merges() {
let mut batch = WriteBatch::new();
batch.merge(b"key1", b"merge1");
batch.merge(b"key1", b"merge2");
batch.merge(b"key1", b"merge3");
let merge_operator = Some(std::sync::Arc::new(StringConcatMergeOperator)
as crate::merge_operator::MergeOperatorType);
let result = batch
.extract_entries(100, 1000, None, merge_operator)
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].key, Bytes::from_static(b"key1"));
assert_eq!(
result[0].value,
ValueDeletable::Merge(Bytes::from_static(b"merge1merge2merge3"))
);
}
#[tokio::test]
async fn should_extract_entries_delete_then_merge() {
let mut batch = WriteBatch::new();
batch.delete(b"key1");
batch.merge(b"key1", b"merge1");
batch.merge(b"key1", b"merge2");
let merge_operator = Some(std::sync::Arc::new(StringConcatMergeOperator)
as crate::merge_operator::MergeOperatorType);
let result = batch
.extract_entries(100, 1000, None, merge_operator)
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].key, Bytes::from_static(b"key1"));
assert_eq!(
result[0].value,
ValueDeletable::Value(Bytes::from_static(b"merge1merge2"))
);
}
#[tokio::test]
async fn should_extract_entries_value_then_merge() {
let mut batch = WriteBatch::new();
batch.put(b"key1", b"value");
batch.merge(b"key1", b"merge1");
batch.merge(b"key1", b"merge2");
let merge_operator = Some(std::sync::Arc::new(StringConcatMergeOperator)
as crate::merge_operator::MergeOperatorType);
let result = batch
.extract_entries(100, 1000, None, merge_operator)
.await
.unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].key, Bytes::from_static(b"key1"));
assert_eq!(
result[0].value,
ValueDeletable::Value(Bytes::from_static(b"valuemerge1merge2"))
);
}
}