1use reifydb_core::common::CommitVersion;
11use reifydb_type::util::cowvec::CowVec;
12
13use crate::tier::{EntryKind, TierStorage};
14
15#[derive(Debug, Clone)]
17pub struct DropEntry {
18 pub key: CowVec<u8>,
20 pub version: CommitVersion,
22 pub value_bytes: u64,
24}
25
26pub(crate) fn find_keys_to_drop<S: TierStorage>(
36 storage: &S,
37 table: EntryKind,
38 key: &[u8],
39 up_to_version: Option<CommitVersion>,
40 keep_last_versions: Option<usize>,
41 pending_version: Option<CommitVersion>,
42) -> crate::Result<Vec<DropEntry>> {
43 let all_versions = storage.get_all_versions(table, key)?;
45
46 let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
48 .into_iter()
49 .map(|(version, value)| {
50 let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
51 (version, value_bytes)
52 })
53 .collect();
54
55 if let Some(pending_ver) = pending_version {
58 if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
60 versioned_entries.push((pending_ver, 0));
63 }
64 }
65
66 versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
68
69 let mut entries_to_drop = Vec::new();
71 let key_cow = CowVec::new(key.to_vec());
72
73 for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
74 let should_drop = match (up_to_version, keep_last_versions) {
78 (None, None) => true,
80 (Some(threshold), None) => entry_version < threshold,
82 (None, Some(keep_count)) => idx >= keep_count,
84 (Some(threshold), Some(keep_count)) => entry_version < threshold && idx >= keep_count,
87 };
88
89 if should_drop {
90 if Some(entry_version) == pending_version {
92 continue;
93 }
94
95 entries_to_drop.push(DropEntry {
96 key: key_cow.clone(),
97 version: entry_version,
98 value_bytes,
99 });
100 }
101 }
102
103 Ok(entries_to_drop)
104}
105
106#[cfg(test)]
107pub mod tests {
108 use std::collections::HashMap;
109
110 use super::*;
111 use crate::hot::storage::HotStorage;
112
113 fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
115 for v in versions {
116 let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
117 storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
118 }
119 }
120
121 fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
123 entries.iter().map(|e| e.version.0).collect()
124 }
125
126 #[test]
127 fn test_drop_all_versions() {
128 let storage = HotStorage::memory();
129 let table = EntryKind::Multi;
130 let key = b"test_key";
131
132 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
133
134 let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
135
136 assert_eq!(to_drop.len(), 5);
137 let versions = extract_dropped_versions(&to_drop);
138 assert!(versions.contains(&1));
139 assert!(versions.contains(&5));
140 assert!(versions.contains(&10));
141 assert!(versions.contains(&20));
142 assert!(versions.contains(&100));
143 }
144
145 #[test]
146 fn test_drop_up_to_version() {
147 let storage = HotStorage::memory();
148 let table = EntryKind::Multi;
149 let key = b"test_key";
150
151 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
153
154 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();
156
157 let versions = extract_dropped_versions(&to_drop);
158 assert_eq!(versions.len(), 2);
159 assert!(versions.contains(&1));
160 assert!(versions.contains(&5));
161 assert!(!versions.contains(&10));
162 assert!(!versions.contains(&20));
163 assert!(!versions.contains(&100));
164 }
165
166 #[test]
167 fn test_drop_up_to_version_boundary() {
168 let storage = HotStorage::memory();
170 let table = EntryKind::Multi;
171 let key = b"test_key";
172
173 setup_versioned_entries(&storage, table, key, &[9, 10, 11]);
174
175 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();
176
177 let versions = extract_dropped_versions(&to_drop);
178 assert_eq!(versions.len(), 1);
179 assert!(versions.contains(&9)); }
181
182 #[test]
183 fn test_keep_last_n_versions() {
184 let storage = HotStorage::memory();
185 let table = EntryKind::Multi;
186 let key = b"test_key";
187
188 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
190
191 let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), None).unwrap();
193
194 let versions = extract_dropped_versions(&to_drop);
195 assert_eq!(versions.len(), 3);
196 assert!(versions.contains(&1));
197 assert!(versions.contains(&5));
198 assert!(versions.contains(&10));
199 assert!(!versions.contains(&20));
200 assert!(!versions.contains(&100));
201 }
202
203 #[test]
204 fn test_keep_more_than_exists() {
205 let storage = HotStorage::memory();
207 let table = EntryKind::Multi;
208 let key = b"test_key";
209
210 setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
211
212 let to_drop = find_keys_to_drop(&storage, table, key, None, Some(10), None).unwrap();
213
214 assert!(to_drop.is_empty());
215 }
216
217 #[test]
218 fn test_keep_zero_versions() {
219 let storage = HotStorage::memory();
221 let table = EntryKind::Multi;
222 let key = b"test_key";
223
224 setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
225
226 let to_drop = find_keys_to_drop(&storage, table, key, None, Some(0), None).unwrap();
227
228 assert_eq!(to_drop.len(), 3);
229 }
230
231 #[test]
232 fn test_keep_one_version() {
233 let storage = HotStorage::memory();
234 let table = EntryKind::Multi;
235 let key = b"test_key";
236
237 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
238
239 let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
241
242 let versions = extract_dropped_versions(&to_drop);
243 assert_eq!(versions.len(), 4);
244 assert!(!versions.contains(&100)); }
246
247 #[test]
248 fn test_combined_constraints_keep_protects() {
249 let storage = HotStorage::memory();
250 let table = EntryKind::Multi;
251 let key = b"test_key";
252
253 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
255
256 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(15)), Some(3), None).unwrap();
265
266 let versions = extract_dropped_versions(&to_drop);
267 assert_eq!(versions.len(), 2); assert!(versions.contains(&1));
269 assert!(versions.contains(&5));
270 assert!(!versions.contains(&10)); assert!(!versions.contains(&20));
272 assert!(!versions.contains(&100));
273 }
274
275 #[test]
276 fn test_combined_constraints_version_restricts() {
277 let storage = HotStorage::memory();
279 let table = EntryKind::Multi;
280 let key = b"test_key";
281
282 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
284
285 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(3)), Some(2), None).unwrap();
294
295 let versions = extract_dropped_versions(&to_drop);
296 assert_eq!(versions.len(), 1); assert!(versions.contains(&1));
298 }
299
300 #[test]
301 fn test_combined_constraints_both_aggressive() {
302 let storage = HotStorage::memory();
304 let table = EntryKind::Multi;
305 let key = b"test_key";
306
307 setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);
309
310 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(50)), Some(1), None).unwrap();
319
320 let versions = extract_dropped_versions(&to_drop);
321 assert_eq!(versions.len(), 4); assert!(versions.contains(&1));
323 assert!(versions.contains(&5));
324 assert!(versions.contains(&10));
325 assert!(versions.contains(&20));
326 assert!(!versions.contains(&100)); }
328
329 #[test]
332 fn test_empty_storage() {
333 let storage = HotStorage::memory();
334 let table = EntryKind::Multi;
335 let key = b"nonexistent";
336
337 let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
338 assert!(to_drop.is_empty());
339 }
340
341 #[test]
342 fn test_single_version_drop_all() {
343 let storage = HotStorage::memory();
344 let table = EntryKind::Multi;
345 let key = b"test_key";
346
347 setup_versioned_entries(&storage, table, key, &[42]);
348
349 let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
351 assert_eq!(to_drop.len(), 1);
352 }
353
354 #[test]
355 fn test_single_version_keep_one() {
356 let storage = HotStorage::memory();
357 let table = EntryKind::Multi;
358 let key = b"test_key";
359
360 setup_versioned_entries(&storage, table, key, &[42]);
361
362 let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
364 assert!(to_drop.is_empty());
365 }
366
367 #[test]
368 fn test_different_keys_isolated() {
369 let storage = HotStorage::memory();
370 let table = EntryKind::Multi;
371
372 setup_versioned_entries(&storage, table, b"key_a", &[1, 2, 3]);
373 setup_versioned_entries(&storage, table, b"key_b", &[10, 20, 30]);
374
375 let to_drop = find_keys_to_drop(&storage, table, b"key_a", None, None, None).unwrap();
377
378 assert_eq!(to_drop.len(), 3);
379 for entry in &to_drop {
381 assert_eq!(entry.key.as_slice(), b"key_a");
382 }
383 }
384
385 #[test]
386 fn test_up_to_version_zero() {
387 let storage = HotStorage::memory();
389 let table = EntryKind::Multi;
390 let key = b"test_key";
391
392 setup_versioned_entries(&storage, table, key, &[1, 5, 10]);
393
394 let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(0)), None, None).unwrap();
395
396 assert!(to_drop.is_empty());
397 }
398
399 #[test]
400 fn test_up_to_version_max() {
401 let storage = HotStorage::memory();
403 let table = EntryKind::Multi;
404 let key = b"test_key";
405
406 setup_versioned_entries(&storage, table, key, &[1, 5, u64::MAX - 1]);
407
408 let to_drop =
409 find_keys_to_drop(&storage, table, key, Some(CommitVersion(u64::MAX)), None, None).unwrap();
410
411 assert_eq!(to_drop.len(), 3);
412 }
413}