use crate::error::Result;
use crate::iterator::KvIterator;
use crate::schema::Schema;
use crate::ttl::TTLProvider;
use crate::r#type::{Column, KvValue};
use bytes::Bytes;
use std::sync::Arc;
/// Boxed callback that observes columns while values for one key are merged.
///
/// It receives two optional column references. In this file it is invoked as
/// `(None, Some(column))` for every populated column of the value that seeds
/// the merge, and is otherwise handed to `merge_with_callback`.
/// NOTE(review): the first argument appears to be the older column and the
/// second the newer one — confirm against `merge_with_callback`.
type MergeCallback = Box<dyn FnMut(Option<&Column>, Option<&Column>)>;
/// Iterator adapter that collapses each run of consecutive `inner` entries
/// sharing the same key into a single entry.
///
/// Values reported as expired by `ttl_provider` are dropped. The surviving
/// values are either reduced to a single terminal value (when the shortcut is
/// allowed) or decoded and merged together.
pub struct DeduplicatingIterator<I> {
/// Wrapped iterator whose same-key runs are collapsed.
inner: I,
/// Column count forwarded to `is_terminal` and `into_decoded`.
num_columns: usize,
/// Key of the entry currently exposed; `None` means the iterator is not valid.
current_key: Option<Bytes>,
/// Value of the entry currently exposed; emptied by `take_value`.
current_value: Option<KvValue>,
/// Decides whether a value has expired and supplies the time provider used while merging.
ttl_provider: Arc<TTLProvider>,
/// Optional callback invoked with column references while values are merged.
on_merge: Option<MergeCallback>,
/// Set to `on_merge.is_none()` by `new`; allows returning the first live
/// terminal value directly instead of decoding and merging.
allow_terminal_shortcut: bool,
/// Schema forwarded to the merge routines.
schema: Arc<Schema>,
}
/// Classifies one candidate `value` for the key currently being collected.
///
/// * A value that `ttl_provider` reports as expired is ignored entirely.
/// * If the shortcut is allowed, nothing has been gathered so far and the
///   value is terminal, it is stored in `selected_value`.
/// * Otherwise the value is appended to `values`.
///
/// Whenever a live terminal value is seen, `stop_collecting` is set to `true`;
/// it is never reset here.
///
/// # Errors
///
/// Propagates failures from `expired_at` and `is_terminal`.
fn collect_value(
    value: KvValue,
    num_columns: usize,
    ttl_provider: &TTLProvider,
    allow_terminal_shortcut: bool,
    values: &mut Vec<KvValue>,
    selected_value: &mut Option<KvValue>,
    stop_collecting: &mut bool,
) -> Result<()> {
    let deadline = value.expired_at()?;
    if ttl_provider.expired(&deadline) {
        return Ok(());
    }

    let terminal = value.is_terminal(num_columns)?;
    let nothing_gathered = selected_value.is_none() && values.is_empty();

    if terminal && allow_terminal_shortcut && nothing_gathered {
        // The first live value is already terminal: it can be used as-is.
        *selected_value = Some(value);
    } else {
        values.push(value);
    }

    *stop_collecting |= terminal;
    Ok(())
}
impl<I> DeduplicatingIterator<I> {
    /// Creates a deduplicating view over `inner`.
    ///
    /// The iterator starts without a current entry; call `seek` or
    /// `seek_to_first` to position it. The terminal shortcut is enabled only
    /// when no `on_merge` callback is supplied.
    pub fn new(
        inner: I,
        num_columns: usize,
        ttl_provider: Arc<TTLProvider>,
        on_merge: Option<MergeCallback>,
        schema: Arc<Schema>,
    ) -> Self {
        Self {
            // Evaluated before `on_merge` is moved into the struct below.
            allow_terminal_shortcut: on_merge.is_none(),
            inner,
            num_columns,
            current_key: None,
            current_value: None,
            ttl_provider,
            on_merge,
            schema,
        }
    }

    /// Consumes the run of `inner` entries that share the current key and
    /// stores the resulting entry in `current_key` / `current_value`.
    ///
    /// Keys whose values are all expired are skipped. When `inner` is
    /// exhausted (or yields no current entry) both fields are cleared.
    /// On return `inner` is positioned at the first entry with a different
    /// key, or is exhausted.
    ///
    /// # Errors
    ///
    /// Propagates any error from `inner`, from value classification, decoding
    /// or merging.
    fn collect_and_merge<'a>(&mut self) -> Result<()>
    where
        I: KvIterator<'a>,
    {
        let shortcut_enabled = self.allow_terminal_shortcut;

        loop {
            // Fetch the head of the next key group, if there is one.
            let head = if self.inner.valid() {
                self.inner.take_current()?
            } else {
                None
            };
            let Some((group_key, head_value)) = head else {
                self.current_key = None;
                self.current_value = None;
                return Ok(());
            };

            let mut pending: Vec<KvValue> = Vec::new();
            let mut shortcut_value: Option<KvValue> = None;
            let mut reached_terminal = false;

            collect_value(
                head_value,
                self.num_columns,
                &self.ttl_provider,
                shortcut_enabled,
                &mut pending,
                &mut shortcut_value,
                &mut reached_terminal,
            )?;

            // Walk the rest of the group; stop at the first different key.
            while self.inner.next()? {
                let same_key = match self.inner.key()? {
                    Some(candidate) => candidate == group_key.as_ref(),
                    None => false,
                };
                if !same_key {
                    break;
                }

                if reached_terminal && shortcut_enabled {
                    // Remaining values for this key are not needed; just step
                    // past them without taking their payload.
                    continue;
                }

                if let Some(following) = self.inner.take_value()? {
                    collect_value(
                        following,
                        self.num_columns,
                        &self.ttl_provider,
                        shortcut_enabled,
                        &mut pending,
                        &mut shortcut_value,
                        &mut reached_terminal,
                    )?;
                }
            }

            if let Some(winner) = shortcut_value {
                self.current_key = Some(group_key);
                self.current_value = Some(winner);
                return Ok(());
            }

            if pending.is_empty() {
                // Every value for this key was expired: try the next key.
                continue;
            }

            // Merge in reverse collection order: the last collected value is
            // the seed and every earlier one is merged on top of it.
            let mut merge_order = pending.into_iter().rev();
            let seed = merge_order.next().expect("values is non-empty");
            let mut merged = seed.into_decoded(self.num_columns)?;

            match self.on_merge.as_deref_mut() {
                Some(callback) => {
                    // Report each populated column of the seed value first.
                    for column in merged.columns() {
                        if column.is_some() {
                            callback(None, column.as_ref());
                        }
                    }
                    for newer in merge_order {
                        let newer = newer.into_decoded(self.num_columns)?;
                        merged = merged.merge_with_callback(
                            newer,
                            &self.schema,
                            Some(self.ttl_provider.time_provider()),
                            callback,
                        )?;
                    }
                }
                None => {
                    for newer in merge_order {
                        let newer = newer.into_decoded(self.num_columns)?;
                        merged = merged.merge(
                            newer,
                            &self.schema,
                            Some(self.ttl_provider.time_provider()),
                        )?;
                    }
                }
            }

            self.current_key = Some(group_key);
            self.current_value = Some(KvValue::Decoded(merged));
            return Ok(());
        }
    }
}
impl<'a, I> KvIterator<'a> for DeduplicatingIterator<I>
where
    I: KvIterator<'a>,
{
    /// Seeks `inner` to `target`, then materialises the entry found there.
    fn seek(&mut self, target: &[u8]) -> Result<()> {
        self.inner.seek(target)?;
        self.collect_and_merge()
    }

    /// Rewinds `inner` to its first entry, then materialises that entry.
    fn seek_to_first(&mut self) -> Result<()> {
        self.inner.seek_to_first()?;
        self.collect_and_merge()
    }

    /// Advances to the next distinct key.
    ///
    /// `inner` already sits on the first entry of the following key after the
    /// previous collection, so no extra step is needed. Returns `true` when a
    /// new entry is available.
    fn next(&mut self) -> Result<bool> {
        if self.inner.valid() {
            self.collect_and_merge()?;
        } else {
            self.current_key = None;
            self.current_value = None;
        }
        Ok(self.current_key.is_some())
    }

    /// Reports whether a current key is held.
    ///
    /// This becomes `false` once the key has been removed with `take_key`.
    fn valid(&self) -> bool {
        self.current_key.is_some()
    }

    /// Borrows the current key, if any.
    fn key(&self) -> Result<Option<&[u8]>> {
        let key = self.current_key.as_deref();
        Ok(key)
    }

    /// Moves the current key out, leaving `None` behind.
    fn take_key(&mut self) -> Result<Option<Bytes>> {
        let key = self.current_key.take();
        Ok(key)
    }

    /// Moves the current value out, leaving `None` behind.
    fn take_value(&mut self) -> Result<Option<KvValue>> {
        let value = self.current_value.take();
        Ok(value)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::iterator::mock_iterator::MockIterator;
use crate::sst::row_codec::encode_value;
use crate::r#type::{Column, Value, ValueType};
/// Wraps `columns` in a `Value` and returns its `encode_value` output as an
/// owned byte vector.
fn make_value_bytes(columns: Vec<Option<Column>>, num_columns: usize) -> Vec<u8> {
    encode_value(&Value::new(columns), num_columns).to_vec()
}
/// Same as `make_value_bytes`, but the `Value` is built with
/// `Value::new_with_expired_at` so it carries `expired_at`.
fn make_value_bytes_with_expiry(
    columns: Vec<Option<Column>>,
    num_columns: usize,
    expired_at: Option<u32>,
) -> Vec<u8> {
    let with_expiry = Value::new_with_expired_at(columns, expired_at);
    encode_value(&with_expiry, num_columns).to_vec()
}
#[test]
fn test_deduplicating_no_duplicates() {
    // Three distinct keys with one value each: all three must be yielded in order.
    let num_columns = 1;
    let put = |data: &[u8]| {
        make_value_bytes(
            vec![Some(Column::new(ValueType::Put, data.to_vec()))],
            num_columns,
        )
    };
    let entries: Vec<(&[u8], Vec<u8>)> =
        vec![(b"a", put(b"v1")), (b"b", put(b"v2")), (b"c", put(b"v3"))];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let mut results = Vec::new();
    while dedup.valid() {
        let (key, raw) = dedup.take_current().unwrap().unwrap();
        results.push((key, raw.into_decoded(num_columns).unwrap()));
        dedup.next().unwrap();
    }

    assert_eq!(results.len(), 3);
    for (index, expected_key) in [b"a", b"b", b"c"].iter().enumerate() {
        assert_eq!(results[index].0.as_ref(), *expected_key);
    }
}
#[test]
fn test_deduplicating_with_put_overwrites() {
    // Key "a" appears twice as a Put; the first entry ("new") must be the one kept.
    let num_columns = 1;
    let put = |data: &[u8]| {
        make_value_bytes(
            vec![Some(Column::new(ValueType::Put, data.to_vec()))],
            num_columns,
        )
    };
    let entries: Vec<(&[u8], Vec<u8>)> =
        vec![(b"a", put(b"new")), (b"a", put(b"old")), (b"b", put(b"v2"))];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let mut results = Vec::new();
    while dedup.valid() {
        let (key, raw) = dedup.take_current().unwrap().unwrap();
        results.push((key, raw.into_decoded(num_columns).unwrap()));
        dedup.next().unwrap();
    }

    assert_eq!(results.len(), 2);

    let (first_key, first_value) = &results[0];
    assert_eq!(first_key.as_ref(), b"a");
    assert_eq!(
        first_value.columns()[0].as_ref().unwrap().data().as_ref(),
        b"new"
    );

    assert_eq!(results[1].0.as_ref(), b"b");
}
#[test]
fn test_deduplicating_merge_callback() {
    // With a callback installed, the second entry for the key (PutSeparated)
    // must be reported as the first callback argument.
    let num_columns = 1;
    let encode = |kind: ValueType, data: &[u8]| {
        make_value_bytes(vec![Some(Column::new(kind, data.to_vec()))], num_columns)
    };
    let entries: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Put, b"new")),
        (b"a", encode(ValueType::PutSeparated, b"old")),
    ];

    // Shared sink recording the value type of every `Some` first argument.
    let overlapped = std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
    let overlapped_for_callback = std::rc::Rc::clone(&overlapped);

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        Some(Box::new(move |old_column, _new_column| {
            if let Some(column) = old_column {
                overlapped_for_callback.borrow_mut().push(column.value_type);
            }
        })),
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    assert!(dedup.valid());
    assert_eq!(*overlapped.borrow(), vec![ValueType::PutSeparated]);
}
#[test]
fn test_deduplicating_with_merge_concatenates() {
    // A Merge entry followed by a Put for the same key yields "base" + "_suffix".
    let num_columns = 1;
    let encode = |kind: ValueType, data: &[u8]| {
        make_value_bytes(vec![Some(Column::new(kind, data.to_vec()))], num_columns)
    };
    let entries: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Merge, b"_suffix")),
        (b"a", encode(ValueType::Put, b"base")),
    ];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let (key, raw) = dedup.take_current().unwrap().unwrap();
    let merged = raw.into_decoded(num_columns).unwrap();

    assert_eq!(key.as_ref(), b"a");
    let column = merged.columns()[0].as_ref().unwrap();
    assert_eq!(column.data().as_ref(), b"base_suffix");
}
#[test]
fn test_deduplicating_multiple_same_keys() {
    // Two Merge entries on top of a Put: the result is assembled from the
    // last entry to the first, giving "1" + "2" + "3".
    let num_columns = 1;
    let encode = |kind: ValueType, data: &[u8]| {
        make_value_bytes(vec![Some(Column::new(kind, data.to_vec()))], num_columns)
    };
    let entries: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Merge, b"3")),
        (b"a", encode(ValueType::Merge, b"2")),
        (b"a", encode(ValueType::Put, b"1")),
    ];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let (key, raw) = dedup.take_current().unwrap().unwrap();
    let merged = raw.into_decoded(num_columns).unwrap();

    assert_eq!(key.as_ref(), b"a");
    let column = merged.columns()[0].as_ref().unwrap();
    assert_eq!(column.data().as_ref(), b"123");
}
#[test]
fn test_deduplicating_with_delete() {
    // A Delete entry ahead of a Put for the same key: the Delete is what surfaces.
    let num_columns = 1;
    let encode = |kind: ValueType, data: &[u8]| {
        make_value_bytes(vec![Some(Column::new(kind, data.to_vec()))], num_columns)
    };
    let entries: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Delete, b"")),
        (b"a", encode(ValueType::Put, b"old")),
    ];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let (key, raw) = dedup.take_current().unwrap().unwrap();
    let surfaced = raw.into_decoded(num_columns).unwrap();

    assert_eq!(key.as_ref(), b"a");
    let column = surfaced.columns()[0].as_ref().unwrap();
    assert!(matches!(column.value_type(), ValueType::Delete));
}
#[test]
fn test_deduplicating_empty() {
    // An empty inner iterator must leave the adapter invalid after seeking.
    let no_entries: Vec<(&[u8], &[u8])> = Vec::new();
    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(no_entries),
        1,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );

    dedup.seek_to_first().unwrap();

    assert!(!dedup.valid());
}
#[test]
fn test_deduplicating_seek() {
    // Seeking to an existing key positions the adapter on exactly that key.
    let num_columns = 1;
    let put = |data: &[u8]| {
        make_value_bytes(
            vec![Some(Column::new(ValueType::Put, data.to_vec()))],
            num_columns,
        )
    };
    let entries: Vec<(&[u8], Vec<u8>)> =
        vec![(b"a", put(b"v1")), (b"b", put(b"v2")), (b"c", put(b"v3"))];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek(b"b").unwrap();

    assert!(dedup.valid());
    let positioned_on = dedup.key().unwrap().unwrap();
    assert_eq!(positioned_on, b"b");
}
#[test]
fn test_deduplicating_multi_column() {
    // Two columns merged independently: column 0 is overwritten by the first
    // entry's Put, column 1 has the first entry's Merge appended to the
    // second entry's Put.
    let num_columns = 2;
    let column = |kind: ValueType, data: &[u8]| Some(Column::new(kind, data.to_vec()));

    let first_entry = make_value_bytes(
        vec![
            column(ValueType::Put, b"col1_new"),
            column(ValueType::Merge, b"_append"),
        ],
        num_columns,
    );
    let second_entry = make_value_bytes(
        vec![
            column(ValueType::Put, b"col1_old"),
            column(ValueType::Put, b"col2_old"),
        ],
        num_columns,
    );
    let entries: Vec<(&[u8], Vec<u8>)> = vec![(b"a", first_entry), (b"a", second_entry)];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let (key, raw) = dedup.take_current().unwrap().unwrap();
    let merged = raw.into_decoded(num_columns).unwrap();
    let cols = merged.columns();

    assert_eq!(key.as_ref(), b"a");
    assert_eq!(cols[0].as_ref().unwrap().data().as_ref(), b"col1_new");
    assert_eq!(
        cols[1].as_ref().unwrap().data().as_ref(),
        b"col2_old_append"
    );
}
#[test]
fn test_deduplicating_skips_expired_for_key() {
    // TTL is enabled and driven by a `ManualTimeProvider` constructed with 10.
    // "a": first entry expired, second live  -> the second ("old") is yielded.
    // "b": both entries expired              -> the key is skipped entirely.
    // "c": single live entry                 -> yielded unchanged.
    let num_columns = 1;
    let ttl_provider = Arc::new(TTLProvider::new(
        &crate::ttl::TtlConfig {
            enabled: true,
            default_ttl_seconds: None,
        },
        Arc::new(crate::time::ManualTimeProvider::new(10)),
    ));
    let already_expired = Some(ttl_provider.now_seconds() - 1);
    let put = |data: &[u8], expired_at: Option<u32>| {
        make_value_bytes_with_expiry(
            vec![Some(Column::new(ValueType::Put, data.to_vec()))],
            num_columns,
            expired_at,
        )
    };
    let entries: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", put(b"new", already_expired)),
        (b"a", put(b"old", None)),
        (b"b", put(b"b_new", already_expired)),
        (b"b", put(b"b_old", already_expired)),
        (b"c", put(b"c", None)),
    ];

    let mut dedup = DeduplicatingIterator::new(
        MockIterator::new(entries),
        num_columns,
        Arc::clone(&ttl_provider),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let mut results = Vec::new();
    while dedup.valid() {
        let (key, raw) = dedup.take_current().unwrap().unwrap();
        results.push((key, raw.into_decoded(num_columns).unwrap()));
        dedup.next().unwrap();
    }

    assert_eq!(results.len(), 2);

    let (first_key, first_value) = &results[0];
    assert_eq!(first_key.as_ref(), b"a");
    assert_eq!(
        first_value.columns()[0].as_ref().unwrap().data().as_ref(),
        b"old"
    );

    let (second_key, second_value) = &results[1];
    assert_eq!(second_key.as_ref(), b"c");
    assert_eq!(
        second_value.columns()[0].as_ref().unwrap().data().as_ref(),
        b"c"
    );
}
#[test]
fn test_deduplicating_with_merging_iterator() {
    // Two mock sources combined by `MergingIterator`: key "a" exists in both
    // (Merge in the first, Put in the second) and must come out concatenated,
    // while "b" and "c" each come from a single source.
    use crate::iterator::MergingIterator;

    let num_columns = 1;
    let encode = |kind: ValueType, data: &[u8]| {
        make_value_bytes(vec![Some(Column::new(kind, data.to_vec()))], num_columns)
    };

    let first_source: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Merge, b"_suffix")),
        (b"b", encode(ValueType::Put, b"b1")),
    ];
    let second_source: Vec<(&[u8], Vec<u8>)> = vec![
        (b"a", encode(ValueType::Put, b"base")),
        (b"c", encode(ValueType::Put, b"c1")),
    ];
    let merging_iter = MergingIterator::new(vec![
        MockIterator::new(first_source),
        MockIterator::new(second_source),
    ]);

    let mut dedup = DeduplicatingIterator::new(
        merging_iter,
        num_columns,
        Arc::new(TTLProvider::disabled()),
        None,
        Schema::empty(),
    );
    dedup.seek_to_first().unwrap();

    let mut results = Vec::new();
    while dedup.valid() {
        let (key, raw) = dedup.take_current().unwrap().unwrap();
        results.push((key, raw.into_decoded(num_columns).unwrap()));
        dedup.next().unwrap();
    }

    assert_eq!(results.len(), 3);

    let (key_a, value_a) = &results[0];
    assert_eq!(key_a.as_ref(), b"a");
    assert_eq!(
        value_a.columns()[0].as_ref().unwrap().data().as_ref(),
        b"base_suffix"
    );

    let (key_b, value_b) = &results[1];
    assert_eq!(key_b.as_ref(), b"b");
    assert_eq!(
        value_b.columns()[0].as_ref().unwrap().data().as_ref(),
        b"b1"
    );

    let (key_c, value_c) = &results[2];
    assert_eq!(key_c.as_ref(), b"c");
    assert_eq!(
        value_c.columns()[0].as_ref().unwrap().data().as_ref(),
        b"c1"
    );
}
}