use async_trait::async_trait;
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use crate::clock::SystemClock;
use crate::error::SlateDBError;
use crate::iter::KeyValueIterator;
use crate::seq_tracker::{FindOption, SequenceTracker};
use crate::types::ValueDeletable::Tombstone;
use crate::types::{RowEntry, ValueDeletable};
/// Iterator adapter that applies retention rules to the versions of each key
/// produced by `inner`.
///
/// Entries are pulled from `inner`, grouped per key in `buffer`, filtered by
/// `apply_retention_filter`, and then yielded one at a time.
pub(crate) struct RetentionIterator<T: KeyValueIterator> {
/// Wrapped iterator that supplies the raw entries.
inner: T,
/// Optional time window: a version keeps the retention scan going while its
/// tracked timestamp plus this timeout is still later than the current clock time.
retention_timeout: Option<Duration>,
/// Optional sequence bound: a version keeps the retention scan going while its
/// sequence number is strictly greater than this value.
retention_min_seq: Option<u64>,
/// Per-key staging area and state machine that drives `next_entry`.
buffer: RetentionBuffer,
/// When `true`, tombstones at the oldest end of a key's retained versions are dropped.
filter_tombstone: bool,
/// Entries whose `expire_ts` is less than or equal to this value are treated as expired.
compaction_start_ts: i64,
/// Clock used to obtain the current time for time-based retention.
system_clock: Arc<dyn SystemClock>,
/// Tracker used to look up a timestamp for an entry's sequence number.
sequence_tracker: Arc<SequenceTracker>,
/// Running sum of key length plus value length for every entry read from `inner`.
total_bytes_processed: u64,
}
impl<T: KeyValueIterator> RetentionIterator<T> {
    /// Builds a retention iterator around `inner`.
    ///
    /// The constructor is `async` and fallible by signature, although this
    /// implementation neither awaits nor returns an error; it simply stores
    /// the configuration and starts with an empty buffer and a zero byte counter.
    pub(crate) async fn new(
        inner: T,
        retention_timeout: Option<Duration>,
        retention_min_seq: Option<u64>,
        filter_tombstone: bool,
        compaction_start_ts: i64,
        system_clock: Arc<dyn SystemClock>,
        sequence_tracker: Arc<SequenceTracker>,
    ) -> Result<Self, SlateDBError> {
        Ok(Self {
            inner,
            retention_timeout,
            retention_min_seq,
            filter_tombstone,
            compaction_start_ts,
            system_clock,
            sequence_tracker,
            buffer: RetentionBuffer::new(),
            total_bytes_processed: 0,
        })
    }

    /// Applies expiry and retention rules to all versions of a single key.
    ///
    /// `versions` is keyed by `Reverse(seq)`, so iteration runs from the
    /// highest sequence number to the lowest. For each version:
    ///
    /// 1. If `expire_ts <= compaction_start_ts` the version is expired:
    ///    merge operands are dropped, anything else is replaced by a
    ///    tombstone with `expire_ts` cleared.
    /// 2. The (possibly rewritten) version is kept.
    /// 3. The scan continues to older versions only while the version just
    ///    kept is within the time window, above `retention_min_seq`, or a
    ///    merge operand. The first version that satisfies none of these is
    ///    therefore still kept, and everything older is discarded.
    ///
    /// Finally, when `filter_tombstone` is set, tombstones at the oldest end
    /// of the retained versions are removed.
    fn apply_retention_filter(
        versions: BTreeMap<Reverse<u64>, RowEntry>,
        compaction_start_ts: i64,
        system_clock: Arc<dyn SystemClock>,
        retention_timeout: Option<Duration>,
        retention_min_seq: Option<u64>,
        filter_tombstone: bool,
        sequence_tracker: Arc<SequenceTracker>,
    ) -> BTreeMap<Reverse<u64>, RowEntry> {
        let mut filtered_versions = BTreeMap::new();
        let current_system_ts = system_clock.now().timestamp_millis();
        // Convert the timeout once. `as_millis` yields a `u128`; saturate
        // rather than truncate so that a huge duration means "retain as long
        // as possible" instead of wrapping to an arbitrary value.
        let retention_timeout_ms = retention_timeout
            .map(|timeout| i64::try_from(timeout.as_millis()).unwrap_or(i64::MAX));

        for (_, entry) in versions {
            let is_merge = matches!(&entry.value, ValueDeletable::Merge(_));
            let entry = match entry.expire_ts.as_ref() {
                Some(expire_ts) if *expire_ts <= compaction_start_ts => {
                    // An expired merge operand contributes nothing; skip it
                    // without ending the scan.
                    if is_merge {
                        continue;
                    }
                    // Any other expired entry turns into a tombstone.
                    RowEntry {
                        key: entry.key,
                        value: Tombstone,
                        seq: entry.seq,
                        expire_ts: None,
                        create_ts: entry.create_ts,
                    }
                }
                _ => entry,
            };
            let entry_seq = entry.seq;
            filtered_versions.insert(Reverse(entry.seq), entry);

            let continue_retain_by_time = retention_timeout_ms
                .map(|timeout_ms| {
                    // Fall back to "now" when the tracker has no timestamp
                    // for this sequence number.
                    let create_sys_ts = sequence_tracker
                        .find_ts(entry_seq, FindOption::RoundUp)
                        .map(|ts| ts.timestamp_millis())
                        .unwrap_or(current_system_ts);
                    // Saturating add: overflow must not panic or wrap.
                    create_sys_ts.saturating_add(timeout_ms) > current_system_ts
                })
                .unwrap_or(false);
            let continue_retain_by_seq = retention_min_seq
                .map(|min_seq| entry_seq > min_seq)
                .unwrap_or(false);
            let continue_retain = continue_retain_by_time || continue_retain_by_seq || is_merge;
            if !continue_retain {
                break;
            }
        }

        if filter_tombstone {
            // The last map entry is the one with the lowest sequence number.
            while filtered_versions
                .last_key_value()
                .map(|(_, entry)| entry.value.is_tombstone())
                .unwrap_or(false)
            {
                filtered_versions.pop_last();
            }
        }
        filtered_versions
    }

    /// Returns the accumulated key-plus-value byte count of all entries read
    /// from the inner iterator so far.
    pub(crate) fn total_bytes_processed(&self) -> u64 {
        self.total_bytes_processed
    }
}
#[async_trait]
impl<T: KeyValueIterator> KeyValueIterator for RetentionIterator<T> {
async fn init(&mut self) -> Result<(), SlateDBError> {
self.inner.init().await?;
Ok(())
}
async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
loop {
match self.buffer.state() {
RetentionBufferState::NeedPush => {
let entry = match self.inner.next_entry().await? {
Some(entry) => {
self.total_bytes_processed +=
entry.key.len() as u64 + entry.value.len() as u64;
entry
}
None => {
self.buffer.mark_end_of_input();
continue;
}
};
self.buffer.push(entry);
}
RetentionBufferState::NeedPopAndContinue => {
match self.buffer.pop() {
Some(entry) => return Ok(Some(entry)),
None => continue, }
}
RetentionBufferState::NeedPopAndQuit => {
return Ok(self.buffer.pop());
}
RetentionBufferState::NeedProcess => {
let compaction_start_ts = self.compaction_start_ts;
let retention_timeout = self.retention_timeout;
let retention_min_seq = self.retention_min_seq;
let system_clock = self.system_clock.clone();
self.buffer.process_retention(|versions| {
Self::apply_retention_filter(
versions,
compaction_start_ts,
system_clock,
retention_timeout,
retention_min_seq,
self.filter_tombstone,
self.sequence_tracker.clone(),
)
})?;
}
}
}
}
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
self.buffer.clear();
self.inner.seek(next_key).await?;
Ok(())
}
}
/// Staging area that collects all versions of one key before they are
/// filtered and handed out.
struct RetentionBuffer {
/// Versions of the key currently being assembled or drained, keyed by
/// `Reverse(seq)` so the highest sequence number comes first.
current_versions: BTreeMap<Reverse<u64>, RowEntry>,
/// First entry seen for a different key; held back until the current key
/// has been fully drained.
next_entry: Option<RowEntry>,
/// `true` once `process_retention` has run on `current_versions`.
processed: bool,
/// `true` once the caller has signalled that no more input will arrive.
end_of_input: bool,
}
/// Next action the owner of a `RetentionBuffer` has to perform, as reported
/// by `RetentionBuffer::state`.
// Every variant intentionally shares the `Need` prefix, which clippy's
// `enum_variant_names` lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
enum RetentionBufferState {
/// Not yet processed, no held-back entry and input not exhausted: push another entry.
NeedPush,
/// Already processed and input not exhausted: pop, and keep looping if the pop is empty.
NeedPopAndContinue,
/// Already processed and input exhausted: pop and return whatever comes out.
NeedPopAndQuit,
/// Not yet processed, and either the input is exhausted or an entry for a
/// different key is held back: run the retention filter.
NeedProcess,
}
impl RetentionBuffer {
    /// Creates an empty buffer that is waiting for its first entry.
    fn new() -> Self {
        Self {
            current_versions: BTreeMap::new(),
            next_entry: None,
            processed: false,
            end_of_input: false,
        }
    }

    /// Reports which action the caller must take next.
    fn state(&self) -> RetentionBufferState {
        match (self.processed, self.end_of_input) {
            (true, true) => RetentionBufferState::NeedPopAndQuit,
            (true, false) => RetentionBufferState::NeedPopAndContinue,
            (false, true) => RetentionBufferState::NeedProcess,
            (false, false) if self.next_entry.is_some() => RetentionBufferState::NeedProcess,
            (false, false) => RetentionBufferState::NeedPush,
        }
    }

    /// Resets the buffer to the same state as a freshly created one.
    fn clear(&mut self) {
        *self = Self::new();
    }

    /// Records that the input is exhausted.
    fn mark_end_of_input(&mut self) {
        self.end_of_input = true;
    }

    /// Adds `entry` to the current key group, or parks it in `next_entry`
    /// when its key differs from the key already being buffered.
    fn push(&mut self, entry: RowEntry) {
        let belongs_to_other_key = matches!(
            self.current_versions.values().next(),
            Some(buffered) if buffered.key != entry.key
        );
        if belongs_to_other_key {
            self.next_entry = Some(entry);
        } else {
            self.current_versions.insert(Reverse(entry.seq), entry);
        }
    }

    /// Runs `f` over the buffered versions exactly once per key group and
    /// stores its result; later calls are no-ops until the group changes.
    fn process_retention(
        &mut self,
        f: impl FnOnce(BTreeMap<Reverse<u64>, RowEntry>) -> BTreeMap<Reverse<u64>, RowEntry>,
    ) -> Result<(), SlateDBError> {
        if !self.processed {
            let pending = std::mem::take(&mut self.current_versions);
            self.current_versions = f(pending);
            self.processed = true;
        }
        Ok(())
    }

    /// Removes and returns the buffered version with the highest sequence
    /// number.
    ///
    /// When the current group is empty, the parked `next_entry` (if any)
    /// becomes the start of a new unprocessed group and `None` is returned.
    fn pop(&mut self) -> Option<RowEntry> {
        if let Some((_, entry)) = self.current_versions.pop_first() {
            return Some(entry);
        }
        match self.next_entry.take() {
            Some(entry) => {
                self.current_versions.insert(Reverse(entry.seq), entry);
                self.processed = false;
            }
            None if self.end_of_input => {}
            None => {
                unreachable!("No next entry but not at end of input - this shouldn't happen");
            }
        }
        None
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::RowEntry;
use rstest::rstest;
#[cfg(feature = "test-util")]
use crate::seq_tracker::TrackedSeq;
/// One row of the table-driven `RetentionBuffer` test: how to build the
/// buffer and what its fields and reported state should be afterwards.
struct RetentionBufferTestCase {
/// Label included in assertion failure messages.
name: &'static str,
/// Constructs the buffer under test, including any operations applied to it.
build: fn() -> RetentionBuffer,
/// Expected number of entries in `current_versions`.
expected_current_versions_len: usize,
/// Whether `next_entry` is expected to be populated.
expected_has_next_entry: bool,
/// Expected value of the `processed` flag.
expected_processed: bool,
/// Expected value of the `end_of_input` flag.
expected_end_of_input: bool,
/// Expected result of `RetentionBuffer::state`.
expected_state: RetentionBufferState,
}
#[rstest]
#[case(RetentionBufferTestCase {
name: "empty_buffer",
build: || RetentionBuffer::new(),
expected_current_versions_len: 0,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
#[case(RetentionBufferTestCase {
name: "single_entry",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer
},
expected_current_versions_len: 1,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
#[case(RetentionBufferTestCase {
name: "key_transition",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_value(b"key2", b"value2", 2));
buffer
},
expected_current_versions_len: 1,
expected_has_next_entry: true,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedProcess,
})]
#[case(RetentionBufferTestCase {
name: "processed_state",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.process_retention(|versions| versions).unwrap();
buffer
},
expected_current_versions_len: 1,
expected_has_next_entry: false,
expected_processed: true,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPopAndContinue,
})]
#[case(RetentionBufferTestCase {
name: "end_of_input_processed",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.mark_end_of_input();
buffer.process_retention(|versions| versions).unwrap();
buffer
},
expected_current_versions_len: 1,
expected_has_next_entry: false,
expected_processed: true,
expected_end_of_input: true,
expected_state: RetentionBufferState::NeedPopAndQuit,
})]
#[case(RetentionBufferTestCase {
name: "multiple_versions_same_key",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_value(b"key1", b"value2", 2));
buffer.push(RowEntry::new_value(b"key1", b"value3", 3));
buffer
},
expected_current_versions_len: 3,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
#[case(RetentionBufferTestCase {
name: "pop_operation",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_value(b"key1", b"value2", 2));
buffer.process_retention(|versions| versions).unwrap();
buffer.pop(); // Execute pop operation in the build function
buffer
},
expected_current_versions_len: 1,
expected_has_next_entry: false,
expected_processed: true,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPopAndContinue,
})]
#[case(RetentionBufferTestCase {
name: "clear_operation",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_value(b"key2", b"value2", 2));
buffer.process_retention(|versions| versions).unwrap();
buffer.mark_end_of_input();
buffer.clear(); // Execute clear operation in the build function
buffer
},
expected_current_versions_len: 0,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
#[case(RetentionBufferTestCase {
name: "tombstone_entries",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_tombstone(b"key1", 2));
buffer
},
expected_current_versions_len: 2,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
#[case(RetentionBufferTestCase {
name: "merge_entries",
build: || {
let mut buffer = RetentionBuffer::new();
buffer.push(RowEntry::new_value(b"key1", b"value1", 1));
buffer.push(RowEntry::new_merge(b"key1", b"merge1", 2));
buffer.push(RowEntry::new_tombstone(b"key1", 3));
buffer
},
expected_current_versions_len: 3,
expected_has_next_entry: false,
expected_processed: false,
expected_end_of_input: false,
expected_state: RetentionBufferState::NeedPush,
})]
fn test_retention_buffer_table_driven(#[case] test_case: RetentionBufferTestCase) {
let buffer = (test_case.build)();
assert_eq!(
buffer.current_versions.len(),
test_case.expected_current_versions_len,
"Test case '{}': current_versions_len mismatch",
test_case.name
);
assert_eq!(
buffer.next_entry.is_some(),
test_case.expected_has_next_entry,
"Test case '{}': has_next_entry mismatch",
test_case.name
);
assert_eq!(
buffer.processed, test_case.expected_processed,
"Test case '{}': processed mismatch",
test_case.name
);
assert_eq!(
buffer.end_of_input, test_case.expected_end_of_input,
"Test case '{}': end_of_input mismatch",
test_case.name
);
let current_state = buffer.state();
assert_eq!(
std::mem::discriminant(¤t_state),
std::mem::discriminant(&test_case.expected_state),
"Test case '{}': state mismatch, expected {:?}, got {:?}",
test_case.name,
test_case.expected_state,
current_state
);
}
/// One row of the table-driven `apply_retention_filter` test: the input
/// versions, the filter configuration, and the versions expected to survive.
#[cfg(feature = "test-util")]
struct RetentionIteratorTestCase {
/// Label included in assertion failure messages.
name: &'static str,
/// Versions handed to the filter.
input_entries: Vec<RowEntry>,
/// Value passed as the filter's `retention_timeout`.
retention_timeout: Option<Duration>,
/// Value passed as the filter's `retention_min_seq`.
retention_min_seq: Option<u64>,
/// Time the mock system clock is set to.
system_clock_ts: i64,
/// Value passed as the filter's `compaction_start_ts`.
compaction_start_ts: i64,
/// Versions expected in the filter's output, highest sequence number first.
expected_entries: Vec<RowEntry>,
/// Value passed as the filter's `filter_tombstone`.
filter_tombstone: bool,
}
// Table-driven test of `RetentionIterator::apply_retention_filter`.
//
// Each case is run against an empty `SequenceTracker`, and the filter never
// reads `create_ts` when deciding what to retain. The `create_ts` values in
// the cases below therefore only verify that the field is carried through
// unchanged; they do not drive the time-based retention decision.
#[rstest]
#[case(RetentionIteratorTestCase {
name: "empty_iterator",
input_entries: vec![],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "single_entry_within_retention",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950), // 50 seconds ago
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950)
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "single_entry_outside_retention",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 seconds ago
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "multiple_versions_same_key_within_retention",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), // Latest
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900), // Within retention
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "multiple_versions_same_key_mixed_retention",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), // Latest (always kept)
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // Outside retention
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_timeout: Some(Duration::from_secs(3600)), retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "tombstone_entries",
input_entries: vec![
RowEntry::new_tombstone(b"key1", 3).with_create_ts(950), // Latest
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // Outside retention
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_tombstone(b"key1", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "merge_entries",
input_entries: vec![
RowEntry::new_merge(b"key1", b"merge3", 3).with_create_ts(950), // Latest
RowEntry::new_merge(b"key1", b"merge2", 2).with_create_ts(500), // Outside retention
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850), // Within retention
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_merge(b"key1", b"merge3", 3).with_create_ts(950),
RowEntry::new_merge(b"key1", b"merge2", 2).with_create_ts(500), // 500 + 3600 = 4100 >= 1000, so kept
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "zero_retention_time",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(1000), // Current time
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(999), // 1 second ago
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(998), // 2 seconds ago
],
retention_timeout: Some(Duration::from_secs(0)), // No retention
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(1000), // Latest always kept
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "very_long_retention_time",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(100), // Very old
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(50), // Very old
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(10), // Very old
],
retention_timeout: Some(Duration::from_secs(1000)), // Very long retention
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(100),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(50),
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(10),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "expired_entry_converted_to_tombstone",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950).with_expire_ts(900), // Expired
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_tombstone(b"key1", 1).with_create_ts(950), // Converted to tombstone
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "not_expired_entry_kept_as_is",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950).with_expire_ts(1100), // Not expired
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950).with_expire_ts(1100), // Kept as is
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "mixed_expired_and_not_expired_entries",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900).with_expire_ts(950), // Expired
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_tombstone(b"key1", 2).with_create_ts(900), // Converted to tombstone
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "expire_ts_equals_compaction_start_ts",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950).with_expire_ts(1000), // Expired (equal)
],
retention_timeout: Some(Duration::from_secs(3600)), retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_tombstone(b"key1", 1).with_create_ts(950), ],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "retention_min_seq_basic",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 30).with_create_ts(950), // seq > retention_min_seq
RowEntry::new_value(b"key1", b"value2", 20).with_create_ts(900), // seq <= retention_min_seq
RowEntry::new_value(b"key1", b"value1", 10).with_create_ts(850), // seq <= retention_min_seq
],
retention_timeout: None,
retention_min_seq: Some(25),
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 30).with_create_ts(950), // Kept (latest in retention window)
RowEntry::new_value(b"key1", b"value2", 20).with_create_ts(900), // Kept (boundary value for snapshots)
// value1 filtered out because it's older than the boundary
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "retention_min_seq_with_timeout",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 30).with_create_ts(950), // seq > retention_min_seq, within timeout
RowEntry::new_value(b"key1", b"value2", 20).with_create_ts(900), // seq > retention_min_seq, within timeout
RowEntry::new_value(b"key1", b"value1", 10).with_create_ts(850), // seq <= retention_min_seq, within timeout
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: Some(25),
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 30).with_create_ts(950), // Kept (latest)
RowEntry::new_value(b"key1", b"value2", 20).with_create_ts(900), // Kept (within retention window)
RowEntry::new_value(b"key1", b"value1", 10).with_create_ts(850), // Kept (within timeout window)
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "expired_entry_converted_to_tombstone_filter_tombstone_true",
input_entries: vec![
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(950).with_expire_ts(900), // Expired
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
// Tombstone filtered out, so no entries remain
],
filter_tombstone: true, // Converted tombstone filtered out
})]
#[case(RetentionIteratorTestCase {
name: "tombstone_tail_filtered_out",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950), // Latest
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900), // Middle value
RowEntry::new_tombstone(b"key1", 1).with_create_ts(850), // Tombstone at end (oldest)
],
retention_timeout: Some(Duration::from_secs(3600)), retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
],
filter_tombstone: true, })]
#[case(RetentionIteratorTestCase {
name: "all_tombstones_filtered_out",
input_entries: vec![
RowEntry::new_tombstone(b"key1", 3).with_create_ts(950), // Latest is tombstone
RowEntry::new_tombstone(b"key1", 2).with_create_ts(900), // Second tombstone
RowEntry::new_tombstone(b"key1", 1).with_create_ts(850), // Third tombstone
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
// All tombstones filtered out, so no entries remain
],
filter_tombstone: true, // All tombstones filtered out
})]
#[case(RetentionIteratorTestCase {
name: "mixed_expired_and_not_expired_entries_filter_tombstone_true",
input_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900).with_expire_ts(950), // Expired
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
retention_timeout: Some(Duration::from_secs(3600)), // 1 hour
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_tombstone(b"key1", 2).with_create_ts(900), // Expired: becomes a tombstone, and is not removed
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
filter_tombstone: true, // tombstone is not in the tail, so not filtered out
})]
#[case(RetentionIteratorTestCase {
name: "filter_out_expired_merge_entries",
input_entries: vec![
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_merge(b"key1", b"value2", 2).with_create_ts(900).with_expire_ts(950), // Expired
RowEntry::new_merge(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
retention_timeout: Some(Duration::ZERO),
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950).with_expire_ts(1100), // Not expired
RowEntry::new_merge(b"key1", b"value1", 1).with_create_ts(850).with_expire_ts(1200), // Not expired
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "retain_up_to_first_non_merge_entry",
input_entries: vec![
RowEntry::new_merge(b"key1", b"value5", 5).with_create_ts(1050),
RowEntry::new_merge(b"key1", b"value4", 4).with_create_ts(1000),
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
retention_timeout: Some(Duration::ZERO),
retention_min_seq: None,
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_merge(b"key1", b"value5", 5).with_create_ts(1050),
RowEntry::new_merge(b"key1", b"value4", 4).with_create_ts(1000),
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
],
filter_tombstone: false,
})]
#[case(RetentionIteratorTestCase {
name: "retain_up_to_first_non_merge_entry_when_seq_num_provided",
input_entries: vec![
RowEntry::new_value(b"key1", b"value5", 5).with_create_ts(1050),
RowEntry::new_merge(b"key1", b"value4", 4).with_create_ts(1000),
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
RowEntry::new_value(b"key1", b"value1", 1).with_create_ts(850),
],
retention_timeout: Some(Duration::ZERO),
retention_min_seq: Some(4),
system_clock_ts: 1000,
compaction_start_ts: 1000,
expected_entries: vec![
RowEntry::new_value(b"key1", b"value5", 5).with_create_ts(1050),
RowEntry::new_merge(b"key1", b"value4", 4).with_create_ts(1000),
RowEntry::new_merge(b"key1", b"value3", 3).with_create_ts(950),
RowEntry::new_value(b"key1", b"value2", 2).with_create_ts(900),
],
filter_tombstone: false,
})]
#[test]
#[cfg(feature = "test-util")]
fn test_retention_iterator_table_driven(#[case] test_case: RetentionIteratorTestCase) {
use crate::clock::MockSystemClock;
use crate::test_utils::TestIterator;
// Key the input by `Reverse(seq)`, the same layout the filter expects.
let mut versions = std::collections::BTreeMap::new();
for entry in test_case.input_entries.iter() {
versions.insert(Reverse(entry.seq), entry.clone());
}
let system_clock = Arc::new(MockSystemClock::with_time(test_case.system_clock_ts));
// The filter is an associated function, so it is called directly without
// constructing an iterator.
let filtered_versions = RetentionIterator::<TestIterator>::apply_retention_filter(
versions,
test_case.compaction_start_ts,
system_clock,
test_case.retention_timeout,
test_case.retention_min_seq,
test_case.filter_tombstone,
Arc::new(SequenceTracker::new()),
);
let mut actual_entries = Vec::new();
for (_, entry) in filtered_versions.iter() {
actual_entries.push(entry.clone());
}
// Order by descending sequence number to line up with `expected_entries`.
actual_entries.sort_by(|a, b| b.seq.cmp(&a.seq));
assert_eq!(
actual_entries.len(),
test_case.expected_entries.len(),
"Test case '{}': Expected {} entries, got {}",
test_case.name,
test_case.expected_entries.len(),
actual_entries.len()
);
// Only key, value, seq and create_ts are compared; expire_ts is not checked.
for (i, (actual, expected)) in actual_entries
.iter()
.zip(test_case.expected_entries.iter())
.enumerate()
{
assert_eq!(
actual.key, expected.key,
"Test case '{}': Entry {} key mismatch",
test_case.name, i
);
assert_eq!(
actual.value, expected.value,
"Test case '{}': Entry {} value mismatch",
test_case.name, i
);
assert_eq!(
actual.seq, expected.seq,
"Test case '{}': Entry {} sequence number mismatch",
test_case.name, i
);
assert_eq!(
actual.create_ts, expected.create_ts,
"Test case '{}': Entry {} create timestamp mismatch",
test_case.name, i
);
}
}
// Verifies that time-based retention derives an entry's timestamp from the
// sequence tracker (first tracked point with seq >= the entry's seq) and falls
// back to the current clock time when no such point exists.
//
// Case arguments: name, tracker points as (seq, timestamp in ms), sequence
// number of the older entry, mock clock time in ms, retention timeout in ms.
#[cfg(feature = "test-util")]
#[rstest]
#[case("exact_match", vec![(5, 1_000)], 5, 1_500, 700)]
#[case("before_first_rounds_up", vec![(10, 2_000)], 7, 2_100, 400)]
#[case(
    "between_entries_rounds_up",
    vec![(5, 1_000), (15, 2_000)],
    11,
    2_400,
    500
)]
#[case(
    "after_last_defaults_to_now",
    vec![(5, 1_000)],
    20,
    1_500,
    100
)]
#[case("no_tracker_defaults_to_now", vec![], 8, 2_000, 0)]
fn test_retention_uses_sequence_tracker_timestamp(
    #[case] _name: &str,
    #[case] tracker_points: Vec<(u64, i64)>,
    #[case] entry_seq: u64,
    #[case] clock_now: i64,
    #[case] timeout_ms: u64,
) {
    use crate::clock::MockSystemClock;
    use crate::test_utils::TestIterator;
    use chrono::TimeZone;

    // Feed the tracker in ascending sequence order.
    let mut sorted_points = tracker_points;
    sorted_points.sort_by_key(|(seq, _)| *seq);
    let mut tracker = SequenceTracker::new();
    for (seq, ts) in &sorted_points {
        let ts = chrono::Utc.timestamp_millis_opt(*ts).unwrap();
        tracker.insert(TrackedSeq { seq: *seq, ts });
    }
    let tracker = Arc::new(tracker);
    let system_clock = Arc::new(MockSystemClock::with_time(clock_now));

    // A newer version sits in front of the entry under test, so the older
    // entry is only reached if the scan continues past the newer one.
    let latest_seq = entry_seq + 10;
    let mut versions = BTreeMap::new();
    versions.insert(
        Reverse(latest_seq),
        RowEntry::new_value(b"k", b"new", latest_seq).with_create_ts(clock_now),
    );
    let target_entry = RowEntry::new_value(b"k", b"old", entry_seq);
    versions.insert(Reverse(entry_seq), target_entry);

    let timeout = Duration::from_millis(timeout_ms);
    let filtered = RetentionIterator::<TestIterator>::apply_retention_filter(
        versions,
        0,
        system_clock,
        Some(timeout),
        None,
        false,
        tracker,
    );

    // Independent model of the round-up lookup used to predict the outcome.
    let derived_ts = sorted_points
        .iter()
        .find_map(|(seq, ts)| (*seq >= entry_seq).then_some(*ts))
        .unwrap_or(clock_now);
    // Widen to i128 so the expectation itself cannot overflow.
    let expected_keep_by_logic =
        (derived_ts as i128 + timeout.as_millis() as i128) > clock_now as i128;
    let actual_keep = filtered.contains_key(&Reverse(entry_seq));
    assert_eq!(
        actual_keep, expected_keep_by_logic,
        "{:?}[{}@{} Now({})]",
        filtered, entry_seq, derived_ts, clock_now
    );
}
}