use reifydb_core::{common::CommitVersion, interface::store::EntryKind};
use reifydb_type::util::cowvec::CowVec;
use crate::{Result, tier::TierStorage};
#[derive(Debug, Clone)]
pub struct DropEntry {
pub key: CowVec<u8>,
pub version: CommitVersion,
pub value_bytes: u64,
}
pub(crate) fn find_keys_to_drop<S: TierStorage>(
storage: &S,
table: EntryKind,
key: &[u8],
up_to_version: Option<CommitVersion>,
keep_last_versions: Option<usize>,
pending_version: Option<CommitVersion>,
) -> Result<Vec<DropEntry>> {
let all_versions = storage.get_all_versions(table, key)?;
let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
.into_iter()
.map(|(version, value)| {
let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
(version, value_bytes)
})
.collect();
if let Some(pending_ver) = pending_version {
if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
versioned_entries.push((pending_ver, 0));
}
}
versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
let mut entries_to_drop = Vec::new();
let key_cow = CowVec::new(key.to_vec());
for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
let should_drop = match (up_to_version, keep_last_versions) {
(None, None) => true,
(Some(threshold), None) => entry_version < threshold,
(None, Some(keep_count)) => idx >= keep_count,
(Some(threshold), Some(keep_count)) => entry_version < threshold && idx >= keep_count,
};
if should_drop {
if Some(entry_version) == pending_version {
continue;
}
entries_to_drop.push(DropEntry {
key: key_cow.clone(),
version: entry_version,
value_bytes,
});
}
}
Ok(entries_to_drop)
}
#[cfg(test)]
pub mod tests {
use std::collections::HashMap;
use super::*;
use crate::hot::storage::HotStorage;
fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
for v in versions {
let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
}
}
fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
entries.iter().map(|e| e.version.0).collect()
}
#[test]
fn test_drop_all_versions() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
assert_eq!(to_drop.len(), 5);
let versions = extract_dropped_versions(&to_drop);
assert!(versions.contains(&1));
assert!(versions.contains(&5));
assert!(versions.contains(&10));
assert!(versions.contains(&20));
assert!(versions.contains(&100));
}
#[test]
fn test_drop_up_to_version() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 2);
assert!(versions.contains(&1));
assert!(versions.contains(&5));
assert!(!versions.contains(&10));
assert!(!versions.contains(&20));
assert!(!versions.contains(&100));
}
#[test]
fn test_drop_up_to_version_boundary() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[9, 10, 11]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 1);
assert!(versions.contains(&9)); }
#[test]
fn test_keep_last_n_versions() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 3);
assert!(versions.contains(&1));
assert!(versions.contains(&5));
assert!(versions.contains(&10));
assert!(!versions.contains(&20));
assert!(!versions.contains(&100));
}
#[test]
fn test_keep_more_than_exists() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
let to_drop = find_keys_to_drop(&storage, table, key, None, Some(10), None).unwrap();
assert!(to_drop.is_empty());
}
#[test]
fn test_keep_zero_versions() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
let to_drop = find_keys_to_drop(&storage, table, key, None, Some(0), None).unwrap();
assert_eq!(to_drop.len(), 3);
}
#[test]
fn test_keep_one_version() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 4);
assert!(!versions.contains(&100)); }
#[test]
fn test_combined_constraints_keep_protects() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(15)), Some(3), None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 2); assert!(versions.contains(&1));
assert!(versions.contains(&5));
assert!(!versions.contains(&10)); assert!(!versions.contains(&20));
assert!(!versions.contains(&100));
}
#[test]
fn test_combined_constraints_version_restricts() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(3)), Some(2), None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 1); assert!(versions.contains(&1));
}
#[test]
fn test_combined_constraints_both_aggressive() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(50)), Some(1), None).unwrap();
let versions = extract_dropped_versions(&to_drop);
assert_eq!(versions.len(), 4); assert!(versions.contains(&1));
assert!(versions.contains(&5));
assert!(versions.contains(&10));
assert!(versions.contains(&20));
assert!(!versions.contains(&100)); }
#[test]
fn test_empty_storage() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"nonexistent";
let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
assert!(to_drop.is_empty());
}
#[test]
fn test_single_version_drop_all() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[42]);
let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
assert_eq!(to_drop.len(), 1);
}
#[test]
fn test_single_version_keep_one() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[42]);
let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
assert!(to_drop.is_empty());
}
#[test]
fn test_different_keys_isolated() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
setup_versioned_entries(&storage, table, b"key_a", &[1, 2, 3]);
setup_versioned_entries(&storage, table, b"key_b", &[10, 20, 30]);
let to_drop = find_keys_to_drop(&storage, table, b"key_a", None, None, None).unwrap();
assert_eq!(to_drop.len(), 3);
for entry in &to_drop {
assert_eq!(entry.key.as_slice(), b"key_a");
}
}
#[test]
fn test_up_to_version_zero() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(0)), None, None).unwrap();
assert!(to_drop.is_empty());
}
#[test]
fn test_up_to_version_max() {
let storage = HotStorage::memory();
let table = EntryKind::Multi;
let key = b"test_key";
setup_versioned_entries(&storage, table, key, &[1, 5, u64::MAX - 1]);
let to_drop =
find_keys_to_drop(&storage, table, key, Some(CommitVersion(u64::MAX)), None, None).unwrap();
assert_eq!(to_drop.len(), 3);
}
}