use crate::double_ended_peekable::{DoubleEndedPeekable, DoubleEndedPeekableExt};
use crate::merge_operator::MergeOperator;
use crate::range_tombstone::RangeTombstone;
use crate::{InternalValue, SeqNo, UserKey, UserValue, ValueType, comparator::SharedComparator};
use std::sync::Arc;
/// Stream adapter that yields a single item per user key from an iterator of
/// versioned entries.
///
/// Forward iteration emits the first entry seen for each user key and skips the
/// following entries that share that user key. Reverse iteration emits, for each
/// run of equal user keys, the entry closest to the front of the run. When a
/// merge operator is configured, a run that starts with a `MergeOperand` entry is
/// folded into a single `Value` entry instead.
pub struct MvccStream<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> {
/// Underlying iterator, wrapped so that both its front and its back can be peeked.
inner: DoubleEndedPeekable<crate::Result<InternalValue>, I>,
/// Optional operator used to fold merge operands; `None` passes operands through as-is.
merge_operator: Option<Arc<dyn MergeOperator>>,
/// Comparator handed to the range tombstone suppression check.
comparator: SharedComparator,
/// Range tombstones, each paired with the `SeqNo` passed as cutoff to the suppression check.
range_tombstones: Vec<(RangeTombstone, SeqNo)>,
/// Scratch buffer collecting the entries of one user key during reverse iteration.
key_entries_buf: Vec<InternalValue>,
}
impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> MvccStream<I> {
/// Creates a stream over `iter` that uses the crate's default comparator.
///
/// `merge_operator` is optional; without it, merge operands are emitted unchanged.
#[must_use]
pub fn new(iter: I, merge_operator: Option<Arc<dyn MergeOperator>>) -> Self {
    let comparator = crate::comparator::default_comparator();
    Self::new_with_comparator(iter, merge_operator, comparator)
}
/// Creates a stream over `iter` with an explicit comparator.
///
/// The stream starts without range tombstones and with an empty reverse-iteration buffer.
#[must_use]
pub fn new_with_comparator(
    iter: I,
    merge_operator: Option<Arc<dyn MergeOperator>>,
    comparator: SharedComparator,
) -> Self {
    let inner = iter.double_ended_peekable();
    let range_tombstones = Vec::new();
    let key_entries_buf = Vec::new();

    Self {
        inner,
        merge_operator,
        comparator,
        range_tombstones,
        key_entries_buf,
    }
}
/// Replaces the stream's range tombstones with `rts` and returns the stream.
///
/// Each tombstone is paired with the `SeqNo` that is passed as cutoff to the
/// suppression check.
#[must_use]
pub fn with_range_tombstones(self, rts: Vec<(RangeTombstone, SeqNo)>) -> Self {
    let mut stream = self;
    stream.range_tombstones = rts;
    stream
}
/// Returns `true` if any configured range tombstone reports that `entry`
/// should be suppressed.
fn is_rt_suppressed(&self, entry: &InternalValue) -> bool {
    for (tombstone, cutoff) in &self.range_tombstones {
        let suppressed = tombstone.should_suppress_with(
            &entry.key.user_key,
            entry.key.seqno,
            *cutoff,
            self.comparator.as_ref(),
        );

        if suppressed {
            return true;
        }
    }

    false
}
/// Folds the run of merge operands starting at `head` into one entry (forward direction).
///
/// `head` is a merge operand that was already taken from the front of the stream.
/// Further entries with the same user key are consumed until the run ends at a
/// boundary: a range-tombstone-suppressed entry, a `Value` (which becomes the merge
/// base), an `Indirection`, or a (weak) tombstone. After a boundary, the remaining
/// entries of the key are skipped.
///
/// Returns a clone of `head` if the boundary was an `Indirection`; otherwise returns a
/// `Value` entry with `head`'s user key and seqno whose value is the operator's output.
///
/// # Errors
///
/// Propagates errors yielded by the underlying iterator and by `merge_op.merge`.
fn resolve_merge_forward(
    &mut self,
    head: &InternalValue,
    merge_op: &dyn MergeOperator,
) -> crate::Result<InternalValue> {
    let key = &head.key.user_key;

    // Operands are gathered in stream order, starting with `head`.
    let mut gathered: Vec<UserValue> = vec![head.value.clone()];
    let mut base: Option<UserValue> = None;
    let mut hit_boundary = false;
    let mut indirection_base = false;

    while let Some(peeked) = self.inner.next_if(|item| match item {
        Ok(kv) => kv.key.user_key == *key,
        // Errors are always taken so they can be propagated right below.
        Err(_) => true,
    }) {
        let entry = peeked?;

        if !self.is_rt_suppressed(&entry) {
            match entry.key.value_type {
                ValueType::MergeOperand => {
                    gathered.push(entry.value);
                    continue;
                }
                ValueType::Value => base = Some(entry.value),
                ValueType::Indirection => indirection_base = true,
                ValueType::Tombstone | ValueType::WeakTombstone => {}
            }
        }

        // Anything other than a visible merge operand ends the run.
        hit_boundary = true;
        break;
    }

    if hit_boundary {
        // Skip whatever is left of this key behind the boundary.
        self.drain_key_min(key)?;
    }

    if indirection_base {
        return Ok(head.clone());
    }

    // The operator receives the operands in the reverse of the order they were gathered.
    let operand_refs: Vec<&[u8]> = gathered.iter().rev().map(AsRef::as_ref).collect();
    let merged = merge_op.merge(key, base.as_deref(), &operand_refs)?;

    Ok(InternalValue::from_components(
        key.clone(),
        merged,
        head.key.seqno,
        ValueType::Value,
    ))
}
fn resolve_merge_buffered(&self, entries: Vec<InternalValue>) -> crate::Result<InternalValue> {
let Some(merge_op) = &self.merge_operator else {
return entries
.into_iter()
.last()
.ok_or(crate::Error::Unrecoverable);
};
let newest = entries.last().ok_or(crate::Error::Unrecoverable)?;
let mut operands: Vec<UserValue> = Vec::new();
let mut base_value: Option<UserValue> = None;
let result_seqno = newest.key.seqno;
let result_key = newest.key.user_key.clone();
let mut saw_indirection = false;
for entry in entries.iter().rev() {
if self.is_rt_suppressed(entry) {
break;
}
match entry.key.value_type {
ValueType::MergeOperand => {
operands.push(entry.value.clone());
}
ValueType::Value => {
base_value = Some(entry.value.clone());
break;
}
ValueType::Indirection => {
saw_indirection = true;
break;
}
ValueType::Tombstone | ValueType::WeakTombstone => {
break;
}
}
}
if saw_indirection {
return entries
.into_iter()
.last()
.ok_or(crate::Error::Unrecoverable);
}
operands.reverse();
let operand_refs: Vec<&[u8]> = operands.iter().map(AsRef::as_ref).collect();
let merged = merge_op.merge(&result_key, base_value.as_deref(), &operand_refs)?;
Ok(InternalValue::from_components(
result_key,
merged,
result_seqno,
ValueType::Value,
))
}
/// Consumes entries from the front of the stream for as long as their user key equals `key`.
///
/// # Errors
///
/// An error at the front of the stream is consumed and returned immediately.
fn drain_key_min(&mut self, key: &UserKey) -> crate::Result<()> {
    while let Some(skipped) = self.inner.next_if(|item| match item {
        Ok(kv) => kv.key.user_key == key,
        // Errors are always taken so they can be propagated right below.
        Err(_) => true,
    }) {
        skipped?;
    }

    Ok(())
}
}
impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> Iterator for MvccStream<I> {
type Item = crate::Result<InternalValue>;
fn next(&mut self) -> Option<Self::Item> {
let head = fail_iter!(self.inner.next()?);
if head.key.value_type.is_merge_operand() {
if let Some(merge_op) = self.merge_operator.clone()
&& !self.is_rt_suppressed(&head)
{
let result = self.resolve_merge_forward(&head, merge_op.as_ref());
return Some(result);
}
}
fail_iter!(self.drain_key_min(&head.key.user_key));
Some(Ok(head))
}
}
impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
    for MvccStream<I>
{
    /// Yields the next user key's entry from the back of the stream.
    ///
    /// Entries are popped from the back until the end of the current key run is
    /// reached, i.e. nothing precedes the popped entry or the preceding entry has a
    /// different user key. That last popped entry is returned as-is, unless it is a
    /// merge operand, a merge operator is configured and it is not suppressed by a
    /// range tombstone; in that case the buffered run is resolved via
    /// `resolve_merge_buffered`.
    ///
    /// If the preceding item is an error, that error is consumed and returned and the
    /// entry popped just before it is discarded.
    fn next_back(&mut self) -> Option<Self::Item> {
        let has_merge_op = self.merge_operator.is_some();

        // The buffer may still hold entries from a previous call that returned
        // without resolving a merge.
        self.key_entries_buf.clear();

        loop {
            let tail = fail_iter!(self.inner.next_back()?);

            // Key runs are detected by inequality, mirroring the equality test of the
            // forward path. A byte-wise `<` would only be correct when the stream is
            // ordered by plain byte order, which does not hold for custom comparators.
            let ends_key_run = match self.inner.peek_back() {
                Some(Ok(prev)) => prev.key.user_key != tail.key.user_key,
                Some(Err(_)) => {
                    #[expect(
                        clippy::expect_used,
                        reason = "we just asserted, the peeked value is an error"
                    )]
                    return Some(Err(self
                        .inner
                        .next_back()
                        .expect("should exist")
                        .expect_err("should be error")));
                }
                None => true,
            };

            if ends_key_run {
                if has_merge_op
                    && tail.key.value_type.is_merge_operand()
                    && !self.is_rt_suppressed(&tail)
                {
                    self.key_entries_buf.push(tail);

                    // Hand the whole run over without copying the entries.
                    let entries = std::mem::take(&mut self.key_entries_buf);
                    return Some(self.resolve_merge_buffered(entries));
                }

                return Some(Ok(tail));
            }

            // Buffering is only needed when a merge may have to be resolved later.
            if has_merge_op {
                self.key_entries_buf.push(tail);
            }
        }
    }
}
#[cfg(test)]
#[allow(clippy::string_lit_as_bytes)]
#[allow(
clippy::unwrap_used,
clippy::indexing_slicing,
clippy::useless_vec,
reason = "test code"
)]
mod tests {
use super::*;
use crate::{ValueType, value::InternalValue};
use test_log::test;
// Builds a `Vec<InternalValue>` from `key, value, type` triples.
//
// The type is given as "V" (Value), "T" (Tombstone) or "W" (WeakTombstone); any other
// string panics. The first entry of each key gets seqno 999 and every further entry of
// the same key gets a seqno one lower than the previous one.
macro_rules! stream {
($($key:expr, $sub_key:expr, $value_type:expr),* $(,)?) => {{
let mut values = Vec::new();
let mut counters = std::collections::HashMap::new();
$(
let key = $key.as_bytes();
let sub_key = $sub_key.as_bytes();
let value_type = match $value_type {
"V" => ValueType::Value,
"T" => ValueType::Tombstone,
"W" => ValueType::WeakTombstone,
_ => panic!("Unknown value type"),
};
let counter = counters.entry($key).and_modify(|x| { *x -= 1 }).or_insert(999);
values.push(InternalValue::from_components(key, sub_key, *counter, value_type));
)*
values
}};
}
// Asserts that the iterator yields nothing more from either end.
macro_rules! iter_closed {
($iter:expr) => {
assert!($iter.next().is_none(), "iterator should be closed (done)");
assert!(
$iter.next_back().is_none(),
"iterator should be closed (done)"
);
};
}
// Asserts that reverse iteration over the given entries (without a merge operator)
// yields the same items as forward iteration, in the opposite order.
macro_rules! test_reverse {
($v:expr) => {
let iter = Box::new($v.iter().cloned().map(Ok));
let iter = MvccStream::new(iter, None);
let mut forwards = iter.flatten().collect::<Vec<_>>();
forwards.reverse();
let iter = Box::new($v.iter().cloned().map(Ok));
let iter = MvccStream::new(iter, None);
let backwards = iter.rev().flatten().collect::<Vec<_>>();
assert_eq!(forwards, backwards);
};
}
// An error in the underlying iterator is surfaced in both directions: forward, the
// first `next()` call yields the error; backward, the error is yielded first and the
// value in front of it afterwards.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_error() -> crate::Result<()> {
{
let vec = [
Ok(InternalValue::from_components(
"a",
"new",
999,
ValueType::Value,
)),
Err(crate::Error::Io(std::io::Error::other("test error"))),
];
let iter = Box::new(vec.into_iter());
let mut iter = MvccStream::new(iter, None);
assert!(matches!(iter.next().unwrap(), Err(crate::Error::Io(_))));
iter_closed!(iter);
}
{
let vec = [
Ok(InternalValue::from_components(
"a",
"new",
999,
ValueType::Value,
)),
Err(crate::Error::Io(std::io::Error::other("test error"))),
];
let iter = Box::new(vec.into_iter());
let mut iter = MvccStream::new(iter, None);
assert!(matches!(
iter.next_back().unwrap(),
Err(crate::Error::Io(_))
));
assert_eq!(
InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
iter.next_back().unwrap()?,
);
iter_closed!(iter);
}
Ok(())
}
// Keys "b" to "e" each have a tombstone (seqno 1) in front of a value (seqno 0); only
// the tombstones are emitted, and key "a" keeps its value. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue_reverse_almost_gone() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "", 1, ValueType::Tombstone),
InternalValue::from_components("b", "b", 0, ValueType::Value),
InternalValue::from_components("c", "", 1, ValueType::Tombstone),
InternalValue::from_components("c", "c", 0, ValueType::Value),
InternalValue::from_components("d", "", 1, ValueType::Tombstone),
InternalValue::from_components("d", "d", 0, ValueType::Value),
InternalValue::from_components("e", "", 1, ValueType::Tombstone),
InternalValue::from_components("e", "e", 0, ValueType::Value),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Keys "b" to "e" consist of a single tombstone each; every entry is emitted
// unchanged. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue_almost_gone_2() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "", 1, ValueType::Tombstone),
InternalValue::from_components("c", "", 1, ValueType::Tombstone),
InternalValue::from_components("d", "", 1, ValueType::Tombstone),
InternalValue::from_components("e", "", 1, ValueType::Tombstone),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Single-version keys "a" to "d" pass through unchanged; key "e" has a tombstone in
// front of a value and only the tombstone is emitted. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "b", 0, ValueType::Value),
InternalValue::from_components("c", "c", 0, ValueType::Value),
InternalValue::from_components("d", "d", 0, ValueType::Value),
InternalValue::from_components("e", "", 1, ValueType::Tombstone),
InternalValue::from_components("e", "e", 0, ValueType::Value),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Keys "b" to "e" each have a weak tombstone (seqno 1) in front of a value (seqno 0);
// only the weak tombstones are emitted. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue_weak_almost_gone() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("b", "b", 0, ValueType::Value),
InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("c", "c", 0, ValueType::Value),
InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("d", "d", 0, ValueType::Value),
InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("e", "e", 0, ValueType::Value),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Keys "b" to "e" consist of a single weak tombstone each; every entry is emitted
// unchanged. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue_weak_almost_gone_2() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("c", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("d", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Single-version keys "a" to "d" pass through unchanged; key "e" has a weak tombstone
// in front of a value and only the weak tombstone is emitted. Also checks reverse order.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_queue_weak_reverse() -> crate::Result<()> {
let vec = [
InternalValue::from_components("a", "a", 0, ValueType::Value),
InternalValue::from_components("b", "b", 0, ValueType::Value),
InternalValue::from_components("c", "c", 0, ValueType::Value),
InternalValue::from_components("d", "d", 0, ValueType::Value),
InternalValue::from_components("e", "", 1, ValueType::WeakTombstone),
InternalValue::from_components("e", "e", 0, ValueType::Value),
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"a", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"b", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"c", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"d", *b"d", 0, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"e", *b"", 1, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Two versions of one key: only the first (seqno 999) is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_simple() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "new", "V",
"a", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Several keys with two or three versions each: only the first version of every key
// is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_simple_multi_keys() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "new", "V",
"a", "old", "V",
"b", "new", "V",
"b", "old", "V",
"c", "newnew", "V",
"c", "new", "V",
"c", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"new", 999, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"new", 999, ValueType::Value),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"newnew", 999, ValueType::Value),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// A tombstone in front of an older value: only the tombstone is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_tombstone() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "T",
"a", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Several keys that each start with a tombstone (key "c" has two in a row): one
// tombstone per key is emitted, the first of its run.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_tombstone_multi_keys() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "T",
"a", "old", "V",
"b", "", "T",
"b", "old", "V",
"c", "", "T",
"c", "", "T",
"c", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 999, ValueType::Tombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 999, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// A weak tombstone in front of an older value: only the weak tombstone is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_weak_tombstone_simple() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "W",
"a", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// A weak tombstone in front of two older values: the stream still emits only the
// weak tombstone and none of the values behind it.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_weak_tombstone_resurrection() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "W",
"a", "new", "V",
"a", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// A tombstone in front of a weak tombstone and two values: only the leading
// tombstone is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_weak_tombstone_priority() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "T",
"a", "", "W",
"a", "new", "V",
"a", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::Tombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
// Several keys that each have a weak tombstone in front of a value: one weak
// tombstone per key is emitted.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_stream_weak_tombstone_multi_keys() -> crate::Result<()> {
#[rustfmt::skip]
let vec = stream![
"a", "", "W",
"a", "old", "V",
"b", "", "W",
"b", "old", "V",
"c", "", "W",
"c", "old", "V",
];
let iter = Box::new(vec.iter().cloned().map(Ok));
let mut iter = MvccStream::new(iter, None);
assert_eq!(
InternalValue::from_components(*b"a", *b"", 999, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"b", *b"", 999, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
assert_eq!(
InternalValue::from_components(*b"c", *b"", 999, ValueType::WeakTombstone),
iter.next().unwrap()?,
);
iter_closed!(iter);
test_reverse!(vec);
Ok(())
}
#[allow(clippy::doc_markdown, clippy::unnecessary_wraps)]
mod merge_operator_tests {
use super::*;
use std::sync::Arc;
use test_log::test;
/// Test merge operator that joins the base value (if any) and all operands with
/// commas. No comma is inserted while the accumulated text is still empty.
struct ConcatMerge;

impl crate::merge_operator::MergeOperator for ConcatMerge {
    fn merge(
        &self,
        _key: &[u8],
        base_value: Option<&[u8]>,
        operands: &[&[u8]],
    ) -> crate::Result<crate::UserValue> {
        let mut joined = base_value
            .map(|base| String::from_utf8_lossy(base).into_owned())
            .unwrap_or_default();

        for operand in operands {
            if !joined.is_empty() {
                joined.push(',');
            }
            joined.push_str(&String::from_utf8_lossy(operand));
        }

        Ok(joined.into_bytes().into())
    }
}
// Returns a shared `ConcatMerge` operator as a trait object for use in the tests.
fn merge_op() -> Arc<dyn crate::merge_operator::MergeOperator> {
Arc::new(ConcatMerge)
}
// Forward: two operands without a base are merged into one `Value`, with the
// lower-seqno operand first.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_forward_operands_only() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 1, ValueType::MergeOperand),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::Value);
assert_eq!(&*item.value, b"op1,op2");
assert!(iter.next().is_none());
Ok(())
}
// Forward: two operands on top of a base value are merged as base, op1, op2.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_forward_with_base() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next().unwrap()?;
assert_eq!(&*item.value, b"base,op1,op2");
assert!(iter.next().is_none());
Ok(())
}
// Forward: a tombstone behind the operand stops the merge, so the older value behind
// the tombstone is not used as base.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_forward_with_tombstone() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "", 2, ValueType::Tombstone),
InternalValue::from_components("a", "old", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next().unwrap()?;
assert_eq!(&*item.value, b"op1");
assert!(iter.next().is_none());
Ok(())
}
// Forward: merging key "b" does not affect the plain values of the neighbouring
// keys "a" and "c".
#[test]
#[allow(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_forward_mixed_keys() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "val_a", 5, ValueType::Value),
InternalValue::from_components("b", "op2", 4, ValueType::MergeOperand),
InternalValue::from_components("b", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("c", "val_c", 2, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let iter = MvccStream::new(iter, Some(merge_op()));
let out: Vec<_> = iter.map(Result::unwrap).collect();
assert_eq!(out.len(), 3);
assert_eq!(&*out[0].value, b"val_a");
assert_eq!(&*out[1].value, b"op1,op2");
assert_eq!(&*out[2].value, b"val_c");
Ok(())
}
// Reverse: two operands on top of a base value are merged as base, op1, op2.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_reverse_operands_with_base() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.value, b"base,op1,op2");
assert!(iter.next_back().is_none());
Ok(())
}
// Reverse: two operands without a base are merged with the lower-seqno operand first.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_reverse_operands_only() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 1, ValueType::MergeOperand),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.value, b"op1,op2");
assert!(iter.next_back().is_none());
Ok(())
}
// Reverse: keys are emitted back to front, with key "b" merged and the plain values
// of "a" and "c" untouched.
#[test]
#[allow(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_reverse_mixed_keys() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "val_a", 5, ValueType::Value),
InternalValue::from_components("b", "op2", 4, ValueType::MergeOperand),
InternalValue::from_components("b", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("c", "val_c", 2, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let iter = MvccStream::new(iter, Some(merge_op()));
let out: Vec<_> = iter.rev().map(Result::unwrap).collect();
assert_eq!(out.len(), 3);
assert_eq!(&*out[0].value, b"val_c");
assert_eq!(&*out[1].value, b"op1,op2");
assert_eq!(&*out[2].value, b"val_a");
Ok(())
}
// Reverse: a lone operand that is the only entry in the stream is still resolved
// into a `Value`.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_reverse_single_operand_last() -> crate::Result<()> {
let vec = vec![InternalValue::from_components(
"a",
"op1",
1,
ValueType::MergeOperand,
)];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.value, b"op1");
assert_eq!(item.key.value_type, ValueType::Value);
Ok(())
}
// Without a merge operator, the first operand is emitted unchanged (still a
// `MergeOperand`) and the older operand is skipped.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_no_operator_passthrough() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 1, ValueType::MergeOperand),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, None);
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::MergeOperand);
assert_eq!(&*item.value, b"op2"); assert!(iter.next().is_none());
Ok(())
}
// Reverse: a lone operand for key "b" preceded by a different key "a" is resolved
// into a `Value`, and "a" is emitted afterwards.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn mvcc_merge_reverse_single_operand_with_different_key() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "val_a", 5, ValueType::Value),
InternalValue::from_components("b", "op1", 3, ValueType::MergeOperand),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.key.user_key, b"b");
assert_eq!(&*item.value, b"op1");
assert_eq!(item.key.value_type, ValueType::Value);
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.key.user_key, b"a");
assert!(iter.next_back().is_none());
Ok(())
}
// Forward: when the operand run ends at an `Indirection` entry, no merge happens and
// the head operand is returned unchanged.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_forward_indirection_base_returns_head() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "blob_ptr", 1, ValueType::Indirection),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next().unwrap()?;
assert_eq!(&*item.key.user_key, b"a");
assert_eq!(item.key.value_type, ValueType::MergeOperand);
assert_eq!(&*item.value, b"op2");
assert!(iter.next().is_none());
Ok(())
}
// Reverse: when the operand run ends at an `Indirection` entry, no merge happens and
// the highest-seqno operand is returned unchanged.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_reverse_indirection_base_returns_newest() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op2", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "blob_ptr", 1, ValueType::Indirection),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next_back().unwrap()?;
assert_eq!(&*item.key.user_key, b"a");
assert_eq!(item.key.value_type, ValueType::MergeOperand);
assert_eq!(&*item.value, b"op2");
assert!(iter.next_back().is_none());
Ok(())
}
// Forward: an error returned by the merge operator is yielded by `next()`.
#[test]
fn merge_forward_error_propagation() {
// Merge operator that always fails.
struct FailMerge;
impl crate::merge_operator::MergeOperator for FailMerge {
fn merge(
&self,
_key: &[u8],
_base_value: Option<&[u8]>,
_operands: &[&[u8]],
) -> crate::Result<crate::UserValue> {
Err(crate::Error::MergeOperator)
}
}
let vec = vec![
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let fail_op: Option<Arc<dyn crate::merge_operator::MergeOperator>> =
Some(Arc::new(FailMerge));
let mut iter = MvccStream::new(iter, fail_op);
assert!(matches!(
iter.next(),
Some(Err(crate::Error::MergeOperator))
));
}
// Reverse: an error returned by the merge operator is yielded by `next_back()`.
#[test]
fn merge_reverse_error_propagation() {
// Merge operator that always fails.
struct FailMerge;
impl crate::merge_operator::MergeOperator for FailMerge {
fn merge(
&self,
_key: &[u8],
_base_value: Option<&[u8]>,
_operands: &[&[u8]],
) -> crate::Result<crate::UserValue> {
Err(crate::Error::MergeOperator)
}
}
let vec = vec![
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let fail_op: Option<Arc<dyn crate::merge_operator::MergeOperator>> =
Some(Arc::new(FailMerge));
let mut iter = MvccStream::new(iter, fail_op);
assert!(matches!(
iter.next_back(),
Some(Err(crate::Error::MergeOperator))
));
}
// Forward: a weak tombstone behind the operand stops the merge, so the older value
// behind it is not used as base.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_forward_weak_tombstone_stops_base() -> crate::Result<()> {
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "", 2, ValueType::WeakTombstone),
InternalValue::from_components("a", "old_base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter = MvccStream::new(iter, Some(merge_op()));
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::Value);
assert_eq!(&*item.value, b"op1");
assert!(iter.next().is_none());
Ok(())
}
// Forward: with a range tombstone built from ("a", "b", 2) and paired with 4, the
// base at seqno 1 is expected to be left out, so only the operand at seqno 3 is merged.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_forward_rt_suppresses_base() -> crate::Result<()> {
use crate::range_tombstone::RangeTombstone;
let rt = RangeTombstone::new(b"a".to_vec().into(), b"b".to_vec().into(), 2);
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter =
MvccStream::new(iter, Some(merge_op())).with_range_tombstones(vec![(rt, 4)]);
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::Value);
assert_eq!(&*item.value, b"op1");
assert!(iter.next().is_none());
Ok(())
}
// Forward: with a range tombstone built from ("a", "b", 3) and paired with 5, the
// operand at seqno 2 and the base at seqno 1 are expected to be left out, so only the
// operand at seqno 4 is merged.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_forward_rt_suppresses_operand() -> crate::Result<()> {
use crate::range_tombstone::RangeTombstone;
let rt = RangeTombstone::new(b"a".to_vec().into(), b"b".to_vec().into(), 3);
let vec = vec![
InternalValue::from_components("a", "op2", 4, ValueType::MergeOperand),
InternalValue::from_components("a", "op1", 2, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter =
MvccStream::new(iter, Some(merge_op())).with_range_tombstones(vec![(rt, 5)]);
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::Value);
assert_eq!(&*item.value, b"op2");
assert!(iter.next().is_none());
Ok(())
}
// Reverse: with a range tombstone built from ("a", "b", 2) and paired with 4, the
// base at seqno 1 is expected to be left out, so only the operand at seqno 3 is merged.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_reverse_rt_suppresses_base() -> crate::Result<()> {
use crate::range_tombstone::RangeTombstone;
let rt = RangeTombstone::new(b"a".to_vec().into(), b"b".to_vec().into(), 2);
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter =
MvccStream::new(iter, Some(merge_op())).with_range_tombstones(vec![(rt, 4)]);
let item = iter.next_back().unwrap()?;
assert_eq!(item.key.value_type, ValueType::Value);
assert_eq!(&*item.value, b"op1");
assert!(iter.next_back().is_none());
Ok(())
}
// Forward: with a range tombstone built from ("a", "b", 5) and paired with 6, the
// head operand at seqno 3 is itself suppressed, so it is emitted unmerged as a
// `MergeOperand`.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_forward_rt_suppresses_head() -> crate::Result<()> {
use crate::range_tombstone::RangeTombstone;
let rt = RangeTombstone::new(b"a".to_vec().into(), b"b".to_vec().into(), 5);
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter =
MvccStream::new(iter, Some(merge_op())).with_range_tombstones(vec![(rt, 6)]);
let item = iter.next().unwrap()?;
assert_eq!(item.key.value_type, ValueType::MergeOperand);
assert_eq!(&*item.value, b"op1");
assert!(iter.next().is_none());
Ok(())
}
// Reverse: with a range tombstone built from ("a", "b", 5) and paired with 6, the
// operand at seqno 3 is itself suppressed, so it is emitted unmerged as a
// `MergeOperand`.
#[test]
#[expect(clippy::unwrap_used, reason = "test assertion")]
fn merge_reverse_rt_suppresses_head() -> crate::Result<()> {
use crate::range_tombstone::RangeTombstone;
let rt = RangeTombstone::new(b"a".to_vec().into(), b"b".to_vec().into(), 5);
let vec = vec![
InternalValue::from_components("a", "op1", 3, ValueType::MergeOperand),
InternalValue::from_components("a", "base", 1, ValueType::Value),
];
let iter = Box::new(vec.into_iter().map(Ok));
let mut iter =
MvccStream::new(iter, Some(merge_op())).with_range_tombstones(vec![(rt, 6)]);
let item = iter.next_back().unwrap()?;
assert_eq!(item.key.value_type, ValueType::MergeOperand);
assert_eq!(&*item.value, b"op1");
assert!(iter.next_back().is_none());
Ok(())
}
}
}