1use reifydb_core::common::CommitVersion;
11use reifydb_type::util::cowvec::CowVec;
12
13use crate::{
14 Result,
15 tier::{EntryKind, TierStorage},
16};
17
18#[derive(Debug, Clone)]
20pub struct DropEntry {
21 pub key: CowVec<u8>,
23 pub version: CommitVersion,
25 pub value_bytes: u64,
27}
28
29pub(crate) fn find_keys_to_drop<S: TierStorage>(
39 storage: &S,
40 table: EntryKind,
41 key: &[u8],
42 up_to_version: Option<CommitVersion>,
43 keep_last_versions: Option<usize>,
44 pending_version: Option<CommitVersion>,
45) -> Result<Vec<DropEntry>> {
46 let all_versions = storage.get_all_versions(table, key)?;
48
49 let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
51 .into_iter()
52 .map(|(version, value)| {
53 let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
54 (version, value_bytes)
55 })
56 .collect();
57
58 if let Some(pending_ver) = pending_version {
61 if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
63 versioned_entries.push((pending_ver, 0));
66 }
67 }
68
69 versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
71
72 let mut entries_to_drop = Vec::new();
74 let key_cow = CowVec::new(key.to_vec());
75
76 for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
77 let should_drop = match (up_to_version, keep_last_versions) {
81 (None, None) => true,
83 (Some(threshold), None) => entry_version < threshold,
85 (None, Some(keep_count)) => idx >= keep_count,
87 (Some(threshold), Some(keep_count)) => entry_version < threshold && idx >= keep_count,
90 };
91
92 if should_drop {
93 if Some(entry_version) == pending_version {
95 continue;
96 }
97
98 entries_to_drop.push(DropEntry {
99 key: key_cow.clone(),
100 version: entry_version,
101 value_bytes,
102 });
103 }
104 }
105
106 Ok(entries_to_drop)
107}
108
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::hot::storage::HotStorage;

    /// Write one entry for `key` at each of the given versions.
    ///
    /// The stored value is a single byte derived from the version (truncated to `u8`).
    fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
        for v in versions {
            let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
            storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
        }
    }

    /// Collect the raw version numbers of the given drop entries.
    fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
        entries.iter().map(|e| e.version.0).collect()
    }

    #[test]
    fn test_drop_all_versions() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // No constraints: every version is selected.
        let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

        assert_eq!(to_drop.len(), 5);
        let versions = extract_dropped_versions(&to_drop);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(versions.contains(&10));
        assert!(versions.contains(&20));
        assert!(versions.contains(&100));
    }

    #[test]
    fn test_drop_up_to_version() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // Only versions strictly below 10 are selected.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 2);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(!versions.contains(&10));
        assert!(!versions.contains(&20));
        assert!(!versions.contains(&100));
    }

    #[test]
    fn test_drop_up_to_version_boundary() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[9, 10, 11]);

        // The threshold is exclusive: version 10 itself is retained.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 1);
        assert!(versions.contains(&9));
    }

    #[test]
    fn test_keep_last_n_versions() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // The two newest versions (100, 20) are retained.
        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 3);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(versions.contains(&10));
        assert!(!versions.contains(&20));
        assert!(!versions.contains(&100));
    }

    #[test]
    fn test_keep_more_than_exists() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

        // The keep window is larger than the number of versions: nothing is selected.
        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(10), None).unwrap();

        assert!(to_drop.is_empty());
    }

    #[test]
    fn test_keep_zero_versions() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

        // An empty keep window retains nothing.
        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(0), None).unwrap();

        assert_eq!(to_drop.len(), 3);
    }

    #[test]
    fn test_keep_one_version() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // Only the newest version (100) is retained.
        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 4);
        assert!(!versions.contains(&100));
    }

    #[test]
    fn test_combined_constraints_keep_protects() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // Below 15: 1, 5, 10. Outside the newest three (100, 20, 10): 5, 1.
        // Both constraints must hold, so 10 is protected by the keep window.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(15)), Some(3), None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 2);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(!versions.contains(&10));
        assert!(!versions.contains(&20));
        assert!(!versions.contains(&100));
    }

    #[test]
    fn test_combined_constraints_version_restricts() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // Below 3: 1. Outside the newest two (100, 20): 10, 5, 1.
        // Both constraints must hold, so the version threshold is the limiting one.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(3)), Some(2), None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 1);
        assert!(versions.contains(&1));
    }

    #[test]
    fn test_combined_constraints_both_aggressive() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

        // Below 50: 20, 10, 5, 1. Outside the newest one (100): 20, 10, 5, 1.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(50)), Some(1), None).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 4);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(versions.contains(&10));
        assert!(versions.contains(&20));
        assert!(!versions.contains(&100));
    }

    #[test]
    fn test_empty_storage() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"nonexistent";

        let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
        assert!(to_drop.is_empty());
    }

    #[test]
    fn test_single_version_drop_all() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[42]);

        let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
        assert_eq!(to_drop.len(), 1);
    }

    #[test]
    fn test_single_version_keep_one() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[42]);

        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
        assert!(to_drop.is_empty());
    }

    #[test]
    fn test_different_keys_isolated() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;

        setup_versioned_entries(&storage, table, b"key_a", &[1, 2, 3]);
        setup_versioned_entries(&storage, table, b"key_b", &[10, 20, 30]);

        let to_drop = find_keys_to_drop(&storage, table, b"key_a", None, None, None).unwrap();

        // Only versions of the requested key are returned.
        assert_eq!(to_drop.len(), 3);
        for entry in &to_drop {
            assert_eq!(entry.key.as_slice(), b"key_a");
        }
    }

    #[test]
    fn test_up_to_version_zero() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

        // No stored version is strictly below 0.
        let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(0)), None, None).unwrap();

        assert!(to_drop.is_empty());
    }

    #[test]
    fn test_up_to_version_max() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, u64::MAX - 1]);

        // Every stored version is strictly below u64::MAX.
        let to_drop =
            find_keys_to_drop(&storage, table, key, Some(CommitVersion(u64::MAX)), None, None).unwrap();

        assert_eq!(to_drop.len(), 3);
    }

    #[test]
    fn test_pending_version_in_storage_is_never_dropped() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

        // Without constraints everything qualifies, but the pending version must be skipped.
        let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(10))).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 2);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(!versions.contains(&10));
    }

    #[test]
    fn test_pending_version_counts_toward_keep_window() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"test_key";

        setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

        // Pending version 20 is not stored yet but still takes the newest slot,
        // so the keep window of two covers 20 and 10, leaving 5 and 1 selected.
        let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), Some(CommitVersion(20))).unwrap();

        let versions = extract_dropped_versions(&to_drop);
        assert_eq!(versions.len(), 2);
        assert!(versions.contains(&1));
        assert!(versions.contains(&5));
        assert!(!versions.contains(&10));
        assert!(!versions.contains(&20));
    }

    #[test]
    fn test_pending_version_on_empty_storage() {
        let storage = HotStorage::memory();
        let table = EntryKind::Multi;
        let key = b"nonexistent";

        // The only ranked version is the pending one, which is never returned.
        let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(7))).unwrap();

        assert!(to_drop.is_empty());
    }
}