use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use slatedb_common::metrics::CounterFn;
use thiserror::Error;
use crate::{
error::SlateDBError,
iter::{RowEntryIterator, TrackedRowEntryIterator},
types::{RowEntry, ValueDeletable},
utils::merge_options,
};
/// Errors that a [`MergeOperator`] implementation can return.
#[non_exhaustive]
#[derive(Clone, Debug, Error)]
pub enum MergeOperatorError {
    /// `merge_batch` was called with no operands and no existing value, so
    /// there is nothing to merge and no value to return.
    #[error("merge_batch called with empty operands and no existing value")]
    EmptyBatch,
    /// A user-supplied merge callback failed; carries the callback's message.
    #[error("{message}")]
    Callback { message: String },
}
/// User-supplied associative merge callback.
///
/// `merge` combines one operand with an optional existing value for a key.
/// `merge_batch` resolves a whole run of operands; the default folds them
/// through `merge` one at a time, oldest first, but implementations may
/// override it with a more efficient bulk strategy.
pub trait MergeOperator {
    fn merge(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError>;

    /// Folds every operand into `existing_value` via [`MergeOperator::merge`].
    ///
    /// # Errors
    /// Returns [`MergeOperatorError::EmptyBatch`] when there are no operands
    /// and no existing value, or the first error produced by `merge`.
    fn merge_batch(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        let merged = operands.iter().try_fold(existing_value, |acc, operand| {
            self.merge(key, acc, operand.clone()).map(Some)
        })?;
        merged.ok_or(MergeOperatorError::EmptyBatch)
    }
}
/// Shared, thread-safe handle to a user-supplied merge operator.
pub(crate) type MergeOperatorType = Arc<dyn MergeOperator + Send + Sync>;
/// Counter metric name: merge operands resolved through `merge_batch`.
pub const MERGE_OPERATOR_OPERANDS: &str = "slatedb.merge_operator_operands";
pub(crate) const MERGE_OPERATOR_OPERANDS_DESCRIPTION: &str = "Merge operator operands resolved";
/// Metric label distinguishing which code path resolved the operands.
pub(crate) const MERGE_OPERATOR_PATH_LABEL: &str = "path";
pub(crate) const MERGE_OPERATOR_READ_PATH: &str = "read";
pub(crate) const MERGE_OPERATOR_FLUSH_PATH: &str = "flush";
pub(crate) const MERGE_OPERATOR_COMPACT_PATH: &str = "compact";
/// Wraps `merge_operator` so that each successful `merge_batch` call records
/// its operand count on `merge_batch_operands`.
pub(crate) fn instrument_merge_operator(
    merge_operator: MergeOperatorType,
    merge_batch_operands: Arc<dyn CounterFn>,
) -> MergeOperatorType {
    let instrumented = InstrumentedMergeOperator {
        merge_operator,
        merge_batch_operands,
    };
    Arc::new(instrumented)
}
/// Decorator that counts operands resolved by the wrapped operator's
/// `merge_batch`; see [`instrument_merge_operator`].
struct InstrumentedMergeOperator {
    merge_operator: MergeOperatorType,
    merge_batch_operands: Arc<dyn CounterFn>,
}
impl MergeOperator for InstrumentedMergeOperator {
    /// Pure pass-through; single-pair merges are not counted by this wrapper.
    fn merge(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        self.merge_operator.merge(key, existing_value, value)
    }

    /// Delegates to the wrapped operator and, on success only, adds the
    /// number of operands to the counter.
    fn merge_batch(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        self.merge_operator
            .merge_batch(key, existing_value, operands)
            .map(|merged| {
                self.merge_batch_operands.increment(operands.len() as u64);
                merged
            })
    }
}
// Maximum number of operands handed to `merge_batch` at once; longer runs of
// merge entries are resolved in chunks of this size.
const MERGE_BATCH_SIZE: usize = 100;
/// Iterator adapter used when no merge operator is configured: it forwards
/// entries from `delegate` and fails if any merge operand is encountered,
/// since such an operand cannot be resolved.
pub(crate) struct MergeOperatorRequiredIterator<T: RowEntryIterator> {
    delegate: T,
}

impl<T: RowEntryIterator> MergeOperatorRequiredIterator<T> {
    pub(crate) fn new(delegate: T) -> Self {
        Self { delegate }
    }
}
#[async_trait]
impl<T: RowEntryIterator> RowEntryIterator for MergeOperatorRequiredIterator<T> {
    async fn init(&mut self) -> Result<(), SlateDBError> {
        self.delegate.init().await
    }

    /// Forwards the delegate's next entry, failing fast with
    /// `MergeOperatorMissing` if it is a merge operand.
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        match self.delegate.next().await? {
            Some(entry) if matches!(entry.value, ValueDeletable::Merge(_)) => {
                Err(SlateDBError::MergeOperatorMissing)
            }
            other => Ok(other),
        }
    }

    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        self.delegate.seek(next_key).await
    }
}
impl<T: TrackedRowEntryIterator> TrackedRowEntryIterator for MergeOperatorRequiredIterator<T> {
    // Byte accounting is delegated unchanged; this adapter adds no bytes.
    fn bytes_processed(&self) -> u64 {
        self.delegate.bytes_processed()
    }
}
/// Iterator adapter that collapses consecutive merge operands for the same
/// key (delivered newest-first by `delegate`) into a single resolved entry
/// using `merge_operator`.
pub(crate) struct MergeOperatorIterator<T: RowEntryIterator> {
    merge_operator: MergeOperatorType,
    delegate: T,
    // Entry read past the end of the current merge group; replayed on the
    // next call to `next()`.
    buffered_entry: Option<RowEntry>,
    // When false, only operands sharing the same expire_ts are merged
    // together (write path); when true, expire_ts differences are ignored.
    merge_different_expire_ts: bool,
    // Merge entries with seq greater than this barrier are emitted unmerged.
    snapshot_barrier_seq: Option<u64>,
}
/// Accumulates metadata for a merged result: the newest create_ts, the
/// earliest expire_ts, and the highest sequence number seen in the group.
#[derive(Debug, Clone)]
struct MergeTracker {
    max_create_ts: Option<i64>,
    min_expire_ts: Option<i64>,
    seq: u64,
}
impl MergeTracker {
    /// Folds `entry`'s metadata into the tracker: keeps the newest
    /// `create_ts` and the earliest `expire_ts`, and verifies that entries
    /// arrive in non-increasing sequence order.
    ///
    /// The tracker is seeded with the newest entry's seq, so observing a
    /// larger seq indicates an ordering bug in the upstream iterator and is
    /// reported as `InvalidSequenceOrder`.
    fn update(&mut self, entry: &RowEntry) -> Result<(), SlateDBError> {
        self.max_create_ts = merge_options(self.max_create_ts, entry.create_ts, i64::max);
        self.min_expire_ts = merge_options(self.min_expire_ts, entry.expire_ts, i64::min);
        if self.seq < entry.seq {
            return Err(SlateDBError::InvalidSequenceOrder {
                current_seq: self.seq,
                next_seq: entry.seq,
            });
        }
        // `self.seq` already holds the maximum: the guard above rejects any
        // entry with a larger seq, so the previous `self.seq = max(self.seq,
        // entry.seq)` reassignment was dead code and has been removed.
        Ok(())
    }
}
#[allow(unused)]
impl<T: RowEntryIterator> MergeOperatorIterator<T> {
    /// Creates an iterator that resolves merge operands from `delegate`
    /// using `merge_operator`.
    ///
    /// * `merge_different_expire_ts` — when true (read path), operands with
    ///   differing expire timestamps are merged together; when false (write
    ///   path), each expire_ts group is merged separately to preserve
    ///   per-element expiration.
    /// * `snapshot_barrier_seq` — merge entries with seq greater than this
    ///   barrier are passed through unmerged.
    pub(crate) fn new(
        merge_operator: MergeOperatorType,
        delegate: T,
        merge_different_expire_ts: bool,
        snapshot_barrier_seq: Option<u64>,
    ) -> Self {
        Self {
            merge_operator,
            delegate,
            buffered_entry: None,
            merge_different_expire_ts,
            snapshot_barrier_seq,
        }
    }
}
impl<T: RowEntryIterator> MergeOperatorIterator<T> {
    // True when `entry` continues the current merge group: same key, and —
    // unless merging across expire timestamps is enabled — the same
    // expire_ts as the group's first entry.
    fn is_matching_entry(
        &self,
        entry: &RowEntry,
        key: &Bytes,
        first_expire_ts: Option<i64>,
    ) -> bool {
        entry.key == *key && (self.merge_different_expire_ts || first_expire_ts == entry.expire_ts)
    }

    // Collapses `batch` into one merged value and clears it for reuse.
    // `batch` is accumulated newest-first, so it is reversed to apply
    // operands oldest-first; per-entry metadata is folded into
    // `merge_tracker`. Only Merge entries are pushed into `batch`, so
    // `as_bytes()` is expected to yield an operand for each one.
    fn process_batch(
        &self,
        key: &Bytes,
        batch: &mut Vec<RowEntry>,
        merge_tracker: &mut MergeTracker,
    ) -> Result<Bytes, SlateDBError> {
        batch.reverse();
        let mut operands: Vec<Bytes> = Vec::with_capacity(batch.len());
        for entry in &*batch {
            merge_tracker.update(entry)?;
            if let Some(v) = entry.value.as_bytes() {
                operands.push(v);
            }
        }
        let batch_result = self.merge_operator.merge_batch(key, None, &operands)?;
        batch.clear();
        Ok(batch_result)
    }

    // Resolves the merge group starting at `first_entry` (the newest entry
    // for its key) by draining older matching entries from the delegate.
    //
    // Operands are merged in chunks of MERGE_BATCH_SIZE. The loop stops when
    // a non-merge entry for the same group appears (the "base": a value or
    // tombstone), when a non-matching entry appears (buffered for the next
    // `next()` call), or when the delegate is exhausted. Chunk results are
    // collected newest-first, reversed, and merged on top of the base value.
    // With a base the result is emitted as a plain Value — even for a
    // tombstone base, so the merged value shadows older entries; without one
    // it stays a Merge operand so still-older data can be merged later.
    async fn merge_with_older_entries(
        &mut self,
        first_entry: RowEntry,
    ) -> Result<Option<RowEntry>, SlateDBError> {
        let key = first_entry.key.clone();
        let first_expire_ts = first_entry.expire_ts;
        // Seed the tracker with the newest seq; `update` enforces that no
        // subsequent entry exceeds it.
        let mut merge_tracker = MergeTracker {
            max_create_ts: None,
            min_expire_ts: None,
            seq: first_entry.seq,
        };
        let mut results = Vec::new();
        let mut batch = Vec::with_capacity(MERGE_BATCH_SIZE);
        let mut next = Some(first_entry);
        let base = loop {
            if let Some(entry) = next {
                if !self.is_matching_entry(&entry, &key, first_expire_ts) {
                    // Different group: replay this entry on the next call.
                    self.buffered_entry = Some(entry);
                    break None;
                } else if !matches!(entry.value, ValueDeletable::Merge(_)) {
                    // A value or tombstone terminates the group as its base.
                    break Some(entry);
                } else {
                    batch.push(entry);
                }
                if batch.len() >= MERGE_BATCH_SIZE {
                    results.push(self.process_batch(&key, &mut batch, &mut merge_tracker)?);
                }
                next = self.delegate.next().await?;
            } else {
                break None;
            }
        };
        // Flush any partially-filled final chunk.
        if !batch.is_empty() {
            results.push(self.process_batch(&key, &mut batch, &mut merge_tracker)?);
        }
        let base_value = base.as_ref().and_then(|b| b.value.as_bytes());
        let found_base = base.is_some();
        if let Some(ref base_entry) = base {
            merge_tracker.update(base_entry)?;
        }
        // Nothing to emit: no operands produced a result and the base (if
        // any) carries no bytes.
        if results.is_empty() && base_value.is_none() {
            return Ok(None);
        }
        // Chunk results were collected newest-first; apply them oldest-first
        // on top of the base value.
        results.reverse();
        let final_result = self
            .merge_operator
            .merge_batch(&key, base_value, &results)?;
        Ok(Some(RowEntry {
            key: key.clone(),
            value: if found_base {
                ValueDeletable::Value(final_result)
            } else {
                ValueDeletable::Merge(final_result)
            },
            seq: merge_tracker.seq,
            create_ts: merge_tracker.max_create_ts,
            expire_ts: merge_tracker.min_expire_ts,
        }))
    }
}
#[async_trait]
impl<T: RowEntryIterator> RowEntryIterator for MergeOperatorIterator<T> {
    async fn init(&mut self) -> Result<(), SlateDBError> {
        self.delegate.init().await
    }

    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        // Prefer an entry buffered by a previous merge pass over pulling a
        // fresh one from the delegate.
        let candidate = match self.buffered_entry.take() {
            Some(entry) => Some(entry),
            None => self.delegate.next().await?,
        };
        let Some(entry) = candidate else {
            return Ok(None);
        };
        // Plain values and tombstones pass through untouched.
        if !matches!(entry.value, ValueDeletable::Merge(_)) {
            return Ok(Some(entry));
        }
        // Merge entries newer than the snapshot barrier are emitted as-is.
        if self
            .snapshot_barrier_seq
            .map_or(false, |barrier| entry.seq > barrier)
        {
            return Ok(Some(entry));
        }
        self.merge_with_older_entries(entry).await
    }

    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        // Any buffered entry belongs to the pre-seek position; drop it.
        self.buffered_entry = None;
        self.delegate.seek(next_key).await
    }
}
impl<T: TrackedRowEntryIterator> TrackedRowEntryIterator for MergeOperatorIterator<T> {
    // Byte accounting is delegated unchanged; merging adds no tracked bytes.
    fn bytes_processed(&self) -> u64 {
        self.delegate.bytes_processed()
    }
}
#[cfg(test)]
mod tests {
use std::{cmp::Ordering, collections::VecDeque, fmt::Debug};
use rstest::rstest;
use slatedb_common::metrics::{lookup_metric, test_recorder_helper};
use crate::test_utils::assert_iterator;
use super::*;
// Concatenating merge operator: appends each operand to the existing value.
struct MockMergeOperator;

impl MergeOperator for MockMergeOperator {
    fn merge(
        &self,
        _key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        match existing_value {
            Some(existing) => {
                let mut merged = existing.to_vec();
                merged.extend_from_slice(&value);
                Ok(Bytes::from(merged))
            }
            None => Ok(value),
        }
    }
}
// Concatenating operator that overrides `merge_batch` and counts how many
// times it is invoked, so tests can assert on chunking behavior.
struct MockBatchedMergeOperator {
    merge_batch_call_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}

impl MockBatchedMergeOperator {
    // Returns the operator plus a shared handle to its call counter.
    fn new() -> (Self, std::sync::Arc<std::sync::atomic::AtomicUsize>) {
        let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        (
            Self {
                merge_batch_call_count: counter.clone(),
            },
            counter,
        )
    }
}
impl MergeOperator for MockBatchedMergeOperator {
    fn merge(
        &self,
        _key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        match existing_value {
            Some(existing) => {
                let mut merged = existing.to_vec();
                merged.extend_from_slice(&value);
                Ok(Bytes::from(merged))
            }
            None => Ok(value),
        }
    }
    // Records the invocation, then concatenates all operands onto the
    // existing value (empty base when there is none).
    fn merge_batch(
        &self,
        _key: &Bytes,
        existing_value: Option<Bytes>,
        operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        self.merge_batch_call_count
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let mut result = existing_value.unwrap_or_default().to_vec();
        for operand in operands {
            result.extend_from_slice(operand);
        }
        Ok(Bytes::from(result))
    }
}
// Operator whose `merge_batch` always fails, for testing error paths.
struct FailingMergeBatchOperator;

impl MergeOperator for FailingMergeBatchOperator {
    fn merge(
        &self,
        _key: &Bytes,
        _existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        Ok(value)
    }
    fn merge_batch(
        &self,
        key: &Bytes,
        _existing_value: Option<Bytes>,
        _operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        Err(MergeOperatorError::Callback {
            message: format!("failed to merge {key:?}"),
        })
    }
}
// A successful merge_batch through the instrumented wrapper should add the
// operand count (2 here) to the counter.
#[test]
fn test_instrumented_merge_operator_counts_successful_batch_operands() {
    const TEST_COUNTER: &str = "test.merge_operator.operands";
    let (metrics_recorder, recorder) = test_recorder_helper();
    let counter = recorder.counter(TEST_COUNTER).register();
    let merge_operator = instrument_merge_operator(Arc::new(MockMergeOperator), counter);
    let result = merge_operator
        .merge_batch(
            &Bytes::from_static(b"key1"),
            None,
            &[Bytes::from_static(b"a"), Bytes::from_static(b"b")],
        )
        .unwrap();
    assert_eq!(result, Bytes::from_static(b"ab"));
    assert_eq!(
        lookup_metric(metrics_recorder.as_ref(), TEST_COUNTER),
        Some(2)
    );
}
// A failing merge_batch must leave the counter untouched: the wrapper only
// increments on success.
#[test]
fn test_instrumented_merge_operator_does_not_count_failed_batch_operands() {
    const TEST_COUNTER: &str = "test.merge_operator.operands";
    let (metrics_recorder, recorder) = test_recorder_helper();
    let counter = recorder.counter(TEST_COUNTER).register();
    let merge_operator =
        instrument_merge_operator(Arc::new(FailingMergeBatchOperator), counter);
    let result = merge_operator.merge_batch(
        &Bytes::from_static(b"key1"),
        None,
        &[Bytes::from_static(b"a"), Bytes::from_static(b"b")],
    );
    assert!(result.is_err());
    assert_eq!(
        lookup_metric(metrics_recorder.as_ref(), TEST_COUNTER),
        Some(0)
    );
}
// Basic grouping: consecutive merge operands per key collapse into one
// entry carrying the group's highest seq; plain values pass through.
#[tokio::test]
async fn test_merge_operator_iterator() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"1", 1),
        RowEntry::new_merge(b"key1", b"2", 2),
        RowEntry::new_merge(b"key1", b"3", 3),
        RowEntry::new_merge(b"key1", b"4", 4),
        RowEntry::new_value(b"key2", b"1", 5),
        RowEntry::new_value(b"key3", b"1", 6),
        RowEntry::new_merge(b"key3", b"2", 7),
        RowEntry::new_merge(b"key3", b"3", 8),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    assert_iterator(
        &mut iterator,
        vec![
            RowEntry::new_merge(b"key1", b"1234", 4),
            RowEntry::new_value(b"key2", b"1", 5),
            RowEntry::new_value(b"key3", b"123", 8),
        ],
    )
    .await;
}
// After resolving key1 the iterator buffers key2's entry; seeking must drop
// that buffered entry instead of replaying it.
#[tokio::test]
async fn test_seek_clears_buffered_entry() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"1", 1),
        RowEntry::new_merge(b"key1", b"2", 2),
        RowEntry::new_value(b"key2", b"v", 3),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    iterator.init().await.unwrap();
    let first = iterator.next().await.unwrap().unwrap();
    assert_eq!(first.key.as_ref(), b"key1");
    assert_eq!(first.value.as_bytes().unwrap().as_ref(), b"12");
    iterator.seek(b"key3").await.unwrap();
    assert!(
        iterator.next().await.unwrap().is_none(),
        "seek should not return a stale buffered entry"
    );
}
// Inputs and expectations for the table-driven `test` below.
#[derive(Debug)]
struct TestCase {
    unsorted_data: Vec<RowEntry>,
    expected: Vec<RowEntry>,
    merge_different_expire_ts: bool,
    snapshot_barrier_seq: Option<u64>,
}

impl Default for TestCase {
    fn default() -> Self {
        Self {
            unsorted_data: vec![],
            expected: vec![],
            // Defaults model the read path with no snapshot barrier.
            merge_different_expire_ts: true,
            snapshot_barrier_seq: None,
        }
    }
}
// Table-driven scenarios covering expire_ts grouping, tombstone handling,
// plain values, and the snapshot barrier.
#[rstest]
#[case::different_expire_ts_read_path(TestCase {
    unsorted_data: vec![
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(1),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(2),
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(3),
        RowEntry::new_value(b"key2", b"1", 4),
        RowEntry::new_merge(b"key3", b"1", 5).with_expire_ts(1),
        RowEntry::new_merge(b"key3", b"2", 6).with_expire_ts(1),
        RowEntry::new_merge(b"key3", b"3", 7).with_expire_ts(2),
    ],
    expected: vec![
        RowEntry::new_merge(b"key1", b"123", 3).with_expire_ts(1),
        RowEntry::new_value(b"key2", b"1", 4),
        RowEntry::new_merge(b"key3", b"123", 7).with_expire_ts(1),
    ],
    ..TestCase::default()
})]
#[case::different_expire_ts_write_path(TestCase {
    unsorted_data: vec![
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(1),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(2),
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(3),
        RowEntry::new_value(b"key2", b"1", 4),
        RowEntry::new_merge(b"key3", b"1", 5).with_expire_ts(1),
        RowEntry::new_merge(b"key3", b"2", 6).with_expire_ts(1),
        RowEntry::new_merge(b"key3", b"3", 7).with_expire_ts(2),
    ],
    expected: vec![
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(3),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(2),
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(1),
        RowEntry::new_value(b"key2", b"1", 4),
        RowEntry::new_merge(b"key3", b"3", 7).with_expire_ts(2),
        RowEntry::new_merge(b"key3", b"12", 6).with_expire_ts(1),
    ],
    // On write path (compaction, memtable), we don't merge entries
    // with different expire timestamps to allow per-element expiration.
    merge_different_expire_ts: false,
    ..TestCase::default()
})]
#[case::merge_with_tombstone(TestCase {
    unsorted_data: vec![
        RowEntry::new_merge(b"key1", b"1", 1),
        RowEntry::new_merge(b"key1", b"2", 2),
        RowEntry::new_tombstone(b"key1", 3),
        RowEntry::new_merge(b"key1", b"3", 4),
        RowEntry::new_value(b"key2", b"1", 5)
    ],
    expected: vec![
        // Merge + Tombstone becomes a value to invalidate older entries.
        RowEntry::new_value(b"key1", b"3", 4),
        RowEntry::new_merge(b"key1", b"12", 2),
        RowEntry::new_value(b"key2", b"1", 5)
    ],
    ..TestCase::default()
})]
#[case::multiple_values(TestCase {
    unsorted_data: vec![
        RowEntry::new_value(b"key1", b"1", 1),
        RowEntry::new_value(b"key1", b"2", 2),
    ],
    expected: vec![
        RowEntry::new_value(b"key1", b"2", 2),
        RowEntry::new_value(b"key1", b"1", 1),
    ],
    ..TestCase::default()
})]
#[case::merge_with_snapshot_barrier(TestCase {
    unsorted_data: vec![
        RowEntry::new_merge(b"key1", b"1", 1),
        RowEntry::new_merge(b"key1", b"2", 2),
        RowEntry::new_merge(b"key1", b"3", 3),
        RowEntry::new_merge(b"key1", b"4", 4),
        RowEntry::new_value(b"key1", b"5", 5)
    ],
    expected: vec![
        // Entries with seq > 3 pass through unmerged; 1..=3 merge together.
        RowEntry::new_value(b"key1", b"5", 5),
        RowEntry::new_merge(b"key1", b"4", 4),
        RowEntry::new_merge(b"key1", b"123", 3),
    ],
    snapshot_barrier_seq: Some(3),
    ..TestCase::default()
})]
#[tokio::test]
async fn test(#[case] test_case: TestCase) {
    let merge_operator = Arc::new(MockMergeOperator {});
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        test_case.unsorted_data.into(),
        test_case.merge_different_expire_ts,
        test_case.snapshot_barrier_seq,
    );
    assert_iterator(&mut iterator, test_case.expected).await;
}
// In-memory RowEntryIterator backed by a pre-sorted queue of entries.
struct MockRowEntryIterator {
    values: VecDeque<RowEntry>,
}
#[async_trait]
impl RowEntryIterator for MockRowEntryIterator {
    async fn init(&mut self) -> Result<(), SlateDBError> {
        Ok(())
    }
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        Ok(self.values.pop_front())
    }
    /// Positions the iterator at the first entry with key >= `next_key`.
    ///
    /// The previous implementation retained only entries whose key was
    /// *equal* to `next_key`, which silently discarded entries sorting
    /// after the seek target and does not match seek semantics.
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        self.values.retain(|entry| entry.key.as_ref() >= next_key);
        Ok(())
    }
}
impl From<Vec<RowEntry>> for MockRowEntryIterator {
    // Sorts entries by key ascending, then seq descending (newest first),
    // matching the order the merge iterator expects from its delegate.
    fn from(values: Vec<RowEntry>) -> Self {
        let mut sorted_values = values;
        sorted_values.sort_by(|left, right| {
            let ord = left.key.cmp(&right.key);
            if ord == Ordering::Equal {
                right.seq.cmp(&left.seq)
            } else {
                ord
            }
        });
        Self {
            values: sorted_values.into(),
        }
    }
}
// Operator that dispatches on key prefix: "sum:" adds little-endian u64s,
// "max:" keeps the maximum, anything else concatenates. `merge_batch` has a
// bulk fast path for "max:" and falls back to pairwise `merge` otherwise.
struct KeyPrefixMergeOperator;

impl MergeOperator for KeyPrefixMergeOperator {
    fn merge(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        if key.starts_with(b"sum:") {
            match existing_value {
                Some(existing) => {
                    let existing_num =
                        u64::from_le_bytes(existing.as_ref().try_into().unwrap());
                    let new_num = u64::from_le_bytes(value.as_ref().try_into().unwrap());
                    Ok(Bytes::copy_from_slice(
                        &(existing_num + new_num).to_le_bytes(),
                    ))
                }
                None => Ok(value),
            }
        } else if key.starts_with(b"max:") {
            match existing_value {
                Some(existing) => {
                    let existing_num =
                        u64::from_le_bytes(existing.as_ref().try_into().unwrap());
                    let new_num = u64::from_le_bytes(value.as_ref().try_into().unwrap());
                    Ok(Bytes::copy_from_slice(
                        &existing_num.max(new_num).to_le_bytes(),
                    ))
                }
                None => Ok(value),
            }
        } else {
            match existing_value {
                Some(existing) => {
                    let mut merged = existing.to_vec();
                    merged.extend_from_slice(&value);
                    Ok(Bytes::from(merged))
                }
                None => Ok(value),
            }
        }
    }
    fn merge_batch(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        if key.starts_with(b"max:") {
            let mut max_val = existing_value
                .map(|v| u64::from_le_bytes(v.as_ref().try_into().unwrap()))
                .unwrap_or(0);
            for operand in operands {
                let val = u64::from_le_bytes(operand.as_ref().try_into().unwrap());
                max_val = max_val.max(val);
            }
            Ok(Bytes::copy_from_slice(&max_val.to_le_bytes()))
        } else {
            let mut result = existing_value;
            for operand in operands {
                result = Some(self.merge(key, result, operand.clone())?);
            }
            result.ok_or(MergeOperatorError::EmptyBatch)
        }
    }
}
// Operator behavior may vary per key: "sum:" entries add, "max:" entries
// keep the maximum.
#[tokio::test]
async fn should_route_merge_based_on_key_prefix() {
    let merge_operator = Arc::new(KeyPrefixMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"sum:counter", &5u64.to_le_bytes(), 1),
        RowEntry::new_merge(b"sum:counter", &3u64.to_le_bytes(), 2),
        RowEntry::new_merge(b"sum:counter", &7u64.to_le_bytes(), 3),
        RowEntry::new_merge(b"max:score", &5u64.to_le_bytes(), 4),
        RowEntry::new_merge(b"max:score", &10u64.to_le_bytes(), 5),
        RowEntry::new_merge(b"max:score", &3u64.to_le_bytes(), 6),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    let max_expected = 10u64.to_le_bytes();
    let sum_expected = 15u64.to_le_bytes();
    assert_iterator(
        &mut iterator,
        vec![
            RowEntry::new_merge(b"max:score", &max_expected, 6),
            RowEntry::new_merge(b"sum:counter", &sum_expected, 3),
        ],
    )
    .await;
}
// 250 operands (> 2 * MERGE_BATCH_SIZE) must still merge into one entry in
// the original oldest-to-newest order.
#[tokio::test]
async fn test_batched_merge_with_many_operands() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let mut data = vec![];
    for i in 1..=250 {
        data.push(RowEntry::new_merge(b"key1", &[i as u8], i));
    }
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    let expected_bytes: Vec<u8> = (1..=250).map(|i| i as u8).collect();
    let expected = vec![RowEntry::new_merge(b"key1", &expected_bytes, 250)];
    assert_iterator(&mut iterator, expected).await;
}
// Chunked merging over a base Value must apply operands on top of the base
// and emit a plain Value.
#[tokio::test]
async fn test_batched_merge_with_base_value() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let mut data = vec![];
    data.push(RowEntry::new_value(b"key1", b"BASE", 0));
    for i in 1..=150 {
        data.push(RowEntry::new_merge(b"key1", &[i as u8], i));
    }
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    let mut expected_bytes = b"BASE".to_vec();
    expected_bytes.extend((1..=150).map(|i| i as u8));
    let expected = vec![RowEntry::new_value(b"key1", &expected_bytes, 150)];
    assert_iterator(&mut iterator, expected).await;
}
// The iterator should call the operator's merge_batch rather than pairwise
// merge: 250 operands -> 3 chunk calls + 1 final combining call.
#[tokio::test]
async fn test_merge_batch_is_actually_called() {
    let (merge_operator, call_count) = MockBatchedMergeOperator::new();
    let merge_operator = Arc::new(merge_operator);
    let mut data = vec![];
    for i in 1..=250 {
        data.push(RowEntry::new_merge(b"key1", &[i as u8], i));
    }
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    let expected_bytes: Vec<u8> = (1..=250).map(|i| i as u8).collect();
    let expected = vec![RowEntry::new_merge(b"key1", &expected_bytes, 250)];
    assert_iterator(&mut iterator, expected).await;
    let actual_calls = call_count.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(
        actual_calls, 4,
        "Expected merge_batch to be called 4 times for 250 operands (3 batches + 1 final merge), but was called {} times",
        actual_calls
    );
}
// With a base value: 150 operands -> 2 chunk calls + 1 final call that
// folds the chunk results onto the base.
#[tokio::test]
async fn test_merge_batch_with_base_value_call_count() {
    let (merge_operator, call_count) = MockBatchedMergeOperator::new();
    let merge_operator = Arc::new(merge_operator);
    let mut data = vec![];
    data.push(RowEntry::new_value(b"key1", b"BASE", 0));
    for i in 1..=150 {
        data.push(RowEntry::new_merge(b"key1", &[i as u8], i));
    }
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    let mut expected_bytes = b"BASE".to_vec();
    expected_bytes.extend((1..=150).map(|i| i as u8));
    let expected = vec![RowEntry::new_value(b"key1", &expected_bytes, 150)];
    assert_iterator(&mut iterator, expected).await;
    let actual_calls = call_count.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(
        actual_calls, 3,
        "Expected merge_batch to be called 3 times for 150 operands (2 batches + 1 final merge), but was called {} times",
        actual_calls
    );
}
// Read-path merging across differing expire_ts keeps the earliest expiry.
#[tokio::test]
async fn test_merge_operator_merges_entries_with_mixed_expire_ts() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"4", 4).with_expire_ts(300),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(150),
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(200),
        RowEntry::new_merge(b"key1", b"5", 5).with_expire_ts(100),
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(50),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    assert_iterator(
        &mut iterator,
        vec![RowEntry::new_merge(b"key1", b"12345", 5).with_expire_ts(50)],
    )
    .await;
}
// A base value plus mixed-expiry operands: result is a Value with the
// earliest expire_ts across the whole group (including the base).
#[tokio::test]
async fn test_merge_operator_merges_entries_with_base_value_and_mixed_expire_ts() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"4", 4).with_expire_ts(300),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(250),
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(150),
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(50),
        RowEntry::new_value(b"key1", b"BASE", 0).with_expire_ts(200),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    assert_iterator(
        &mut iterator,
        vec![RowEntry::new_value(b"key1", b"BASE1234", 4).with_expire_ts(50)],
    )
    .await;
}
// When every operand has an expire_ts, the merged entry keeps the minimum.
#[tokio::test]
async fn test_merge_operator_merges_all_entries_with_expire_ts() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(50),
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(80),
        RowEntry::new_merge(b"key1", b"3", 3).with_expire_ts(90),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    assert_iterator(
        &mut iterator,
        vec![RowEntry::new_merge(b"key1", b"123", 3).with_expire_ts(50)],
    )
    .await;
}
// The base entry's expire_ts participates in the minimum, so it wins here.
#[tokio::test]
async fn should_use_base_expire_ts_when_it_is_the_earliest() {
    let merge_operator = Arc::new(MockMergeOperator {});
    let data = vec![
        RowEntry::new_merge(b"key1", b"2", 2).with_expire_ts(300),
        RowEntry::new_merge(b"key1", b"1", 1).with_expire_ts(200),
        RowEntry::new_value(b"key1", b"BASE", 0).with_expire_ts(50),
    ];
    let mut iterator = MergeOperatorIterator::<MockRowEntryIterator>::new(
        merge_operator,
        data.into(),
        true,
        None,
    );
    assert_iterator(
        &mut iterator,
        vec![RowEntry::new_value(b"key1", b"BASE12", 2).with_expire_ts(50)],
    )
    .await;
}
}