reifydb_store_multi/store/
drop.rs1use reifydb_core::{common::CommitVersion, interface::store::EntryKind};
5use reifydb_type::util::cowvec::CowVec;
6
7use crate::{Result, tier::TierStorage};
8
9#[derive(Debug, Clone)]
11pub struct DropEntry {
12 pub key: CowVec<u8>,
14 pub version: CommitVersion,
16 pub value_bytes: u64,
18}
19
20pub(crate) fn find_keys_to_drop<S: TierStorage>(
30 storage: &S,
31 table: EntryKind,
32 key: &[u8],
33 pending_version: Option<CommitVersion>,
34) -> Result<Vec<DropEntry>> {
35 let all_versions = storage.get_all_versions(table, key)?;
37
38 let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
40 .into_iter()
41 .map(|(version, value)| {
42 let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
43 (version, value_bytes)
44 })
45 .collect();
46
47 if let Some(pending_ver) = pending_version {
50 if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
52 versioned_entries.push((pending_ver, 0));
55 }
56 }
57
58 versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
60
61 let mut entries_to_drop = Vec::new();
63 let key_cow = CowVec::new(key.to_vec());
64
65 for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
66 let should_drop = idx > 0;
68
69 if should_drop {
70 if Some(entry_version) == pending_version {
72 continue;
73 }
74
75 entries_to_drop.push(DropEntry {
76 key: key_cow.clone(),
77 version: entry_version,
78 value_bytes,
79 });
80 }
81 }
82
83 Ok(entries_to_drop)
84}
85
86#[cfg(test)]
87pub mod tests {
88 use std::collections::HashMap;
89
90 use super::*;
91 use crate::hot::storage::HotStorage;
92
93 fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
95 for v in versions {
96 let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
97 storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
98 }
99 }
100
101 fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
103 entries.iter().map(|e| e.version.0).collect()
104 }
105
106 #[test]
107 fn test_drop_historical_versions() {
108 let storage = HotStorage::memory();
109 let table = EntryKind::Multi;
110 let key = b"test_key";
111
112 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
114
115 let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();
117
118 assert_eq!(to_drop.len(), 4);
119 let versions = extract_dropped_versions(&to_drop);
120 assert!(versions.contains(&1));
121 assert!(versions.contains(&5));
122 assert!(versions.contains(&10));
123 assert!(versions.contains(&20));
124 assert!(!versions.contains(&100));
125 }
126
127 #[test]
128 fn test_keep_latest_with_pending() {
129 let storage = HotStorage::memory();
130 let table = EntryKind::Multi;
131 let key = b"test_key";
132
133 setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
135
136 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(20))).unwrap();
138
139 assert_eq!(to_drop.len(), 3);
140 let versions = extract_dropped_versions(&to_drop);
141 assert!(versions.contains(&1));
142 assert!(versions.contains(&5));
143 assert!(versions.contains(&10));
144 assert!(!versions.contains(&20));
145 }
146
147 #[test]
148 fn test_single_version_no_drop() {
149 let storage = HotStorage::memory();
150 let table = EntryKind::Multi;
151 let key = b"test_key";
152
153 setup_versioned_entries(&storage, table, key, &[42]);
154
155 let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();
157 assert!(to_drop.is_empty());
158 }
159
160 #[test]
161 fn test_empty_storage() {
162 let storage = HotStorage::memory();
163 let table = EntryKind::Multi;
164 let key = b"nonexistent";
165
166 let to_drop = find_keys_to_drop(&storage, table, key, None).unwrap();
167 assert!(to_drop.is_empty());
168 }
169}