Skip to main content

reifydb_store_multi/store/
drop.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Drop operation implementation for cleaning up versioned entries.
5//!
6//! The Drop operation completely erases versioned entries from storage without
7//! writing tombstones or generating CDC events. It's used for internal cleanup
8//! operations like maintaining single-version semantics for flow node state.
9
10use reifydb_core::common::CommitVersion;
11use reifydb_type::util::cowvec::CowVec;
12
13use crate::tier::{EntryKind, TierStorage};
14
15/// Information about an entry to be dropped.
16#[derive(Debug, Clone)]
17pub struct DropEntry {
18	/// The logical key to delete
19	pub key: CowVec<u8>,
20	/// The specific version to delete
21	pub version: CommitVersion,
22	/// The size of the value being dropped (for metrics tracking)
23	pub value_bytes: u64,
24}
25
26/// Find versioned keys to drop based on constraints.
27///
28/// # Arguments
29/// - `storage`: The storage backend to scan
30/// - `table`: The table containing the keys
31/// - `key`: The logical key (without version suffix)
32/// - `up_to_version`: If Some(v), candidate versions where version < v
33/// - `keep_last_versions`: If Some(n), protect n most recent versions from being dropped
34/// - `pending_version`: Version being written in the same batch (to avoid race)
35pub(crate) fn find_keys_to_drop<S: TierStorage>(
36	storage: &S,
37	table: EntryKind,
38	key: &[u8],
39	up_to_version: Option<CommitVersion>,
40	keep_last_versions: Option<usize>,
41	pending_version: Option<CommitVersion>,
42) -> crate::Result<Vec<DropEntry>> {
43	// Get all versions of this key directly (bypasses MVCC resolution)
44	let all_versions = storage.get_all_versions(table, key)?;
45
46	// Collect all versions with their value sizes
47	let mut versioned_entries: Vec<(CommitVersion, u64)> = all_versions
48		.into_iter()
49		.map(|(version, value)| {
50			let value_bytes = value.as_ref().map(|v| v.len() as u64).unwrap_or(0);
51			(version, value_bytes)
52		})
53		.collect();
54
55	// Include pending version if provided (version being written in current batch)
56	// This prevents a race where Drop scans storage before Set is written
57	if let Some(pending_ver) = pending_version {
58		// Check if pending version already exists (avoid duplicates)
59		if !versioned_entries.iter().any(|(v, _)| *v == pending_ver) {
60			// Add a placeholder entry for the pending version
61			// value_bytes=0 is fine since this entry will never be dropped (it's the newest)
62			versioned_entries.push((pending_ver, 0));
63		}
64	}
65
66	// Sort by version descending (most recent first) for keep_last_versions logic
67	versioned_entries.sort_by(|a, b| b.0.cmp(&a.0));
68
69	// Determine which entries to drop
70	let mut entries_to_drop = Vec::new();
71	let key_cow = CowVec::new(key.to_vec());
72
73	for (idx, (entry_version, value_bytes)) in versioned_entries.into_iter().enumerate() {
74		// Use AND logic for combined constraints:
75		// - keep_last_versions protects the N most recent versions
76		// - up_to_version only drops versions < threshold IF not protected
77		let should_drop = match (up_to_version, keep_last_versions) {
78			// Both None: drop everything
79			(None, None) => true,
80			// Only version constraint: drop if version < threshold
81			(Some(threshold), None) => entry_version < threshold,
82			// Only keep constraint: drop if beyond keep count
83			(None, Some(keep_count)) => idx >= keep_count,
84			// Both constraints (AND): drop only if BOTH say drop
85			// This ensures keep_last_versions always protects N versions
86			(Some(threshold), Some(keep_count)) => entry_version < threshold && idx >= keep_count,
87		};
88
89		if should_drop {
90			// Never drop the pending version (it's being written in this batch)
91			if Some(entry_version) == pending_version {
92				continue;
93			}
94
95			entries_to_drop.push(DropEntry {
96				key: key_cow.clone(),
97				version: entry_version,
98				value_bytes,
99			});
100		}
101	}
102
103	Ok(entries_to_drop)
104}
105
#[cfg(test)]
pub mod tests {
	use std::collections::HashMap;

	use super::*;
	use crate::hot::storage::HotStorage;

	/// Write one entry for `key` at each of `versions`.
	///
	/// The stored value is the version truncated to a single byte (`*v as u8`), so
	/// every stored value is exactly one byte long.
	fn setup_versioned_entries(storage: &HotStorage, table: EntryKind, key: &[u8], versions: &[u64]) {
		for v in versions {
			let entries = vec![(CowVec::new(key.to_vec()), Some(CowVec::new(vec![*v as u8])))];
			storage.set(CommitVersion(*v), HashMap::from([(table, entries)])).unwrap();
		}
	}

	/// Extract version numbers from the drop entries, preserving their order.
	fn extract_dropped_versions(entries: &[DropEntry]) -> Vec<u64> {
		entries.iter().map(|e| e.version.0).collect()
	}

	#[test]
	fn test_drop_all_versions() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

		assert_eq!(to_drop.len(), 5);
		let versions = extract_dropped_versions(&to_drop);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(versions.contains(&20));
		assert!(versions.contains(&100));
	}

	#[test]
	fn test_drop_up_to_version() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Drop versions < 10 (should drop 1, 5)
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 2);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(!versions.contains(&10));
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_drop_up_to_version_boundary() {
		// Test exact boundary - version == threshold should NOT be dropped
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[9, 10, 11]);

		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(10)), None, None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 1);
		assert!(versions.contains(&9)); // Only 9 < 10
	}

	#[test]
	fn test_keep_last_n_versions() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted descending: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Keep 2 most recent (100, 20), drop others (10, 5, 1)
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(2), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 3);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_keep_more_than_exists() {
		// Keep 10 but only 3 exist - should drop nothing
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(10), None).unwrap();

		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_keep_zero_versions() {
		// Keep 0 = drop all
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(0), None).unwrap();

		assert_eq!(to_drop.len(), 3);
	}

	#[test]
	fn test_keep_one_version() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// Keep only most recent (100)
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 4);
		assert!(!versions.contains(&100)); // Most recent kept
	}

	#[test]
	fn test_combined_constraints_keep_protects() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=15 would drop: 1, 5, 10 (all < 15)
		// keep_last_versions=3 protects: 100, 20, 10 (indices 0, 1, 2)
		// Combined (AND logic): drop only if (version < 15) AND (idx >= 3)
		// - 100: idx=0, 100 >= 15 → KEEP
		// - 20: idx=1, 20 >= 15 → KEEP
		// - 10: idx=2, 10 < 15 BUT idx < 3 → KEEP (protected!)
		// - 5: idx=3, 5 < 15 AND idx >= 3 → DROP
		// - 1: idx=4, 1 < 15 AND idx >= 3 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(15)), Some(3), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 2); // Only 1 and 5 dropped
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(!versions.contains(&10)); // Protected by keep_last=3
		assert!(!versions.contains(&20));
		assert!(!versions.contains(&100));
	}

	#[test]
	fn test_combined_constraints_version_restricts() {
		// Test case where up_to_version is more restrictive than keep_last
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=3 would drop: only 1 (1 < 3)
		// keep_last_versions=2 protects: 100, 20 (indices 0, 1)
		// Combined (AND logic): drop only if (version < 3) AND (idx >= 2)
		// - 100: idx=0 → KEEP (protected)
		// - 20: idx=1 → KEEP (protected)
		// - 10: idx=2, 10 >= 3 → KEEP (version constraint not met)
		// - 5: idx=3, 5 >= 3 → KEEP (version constraint not met)
		// - 1: idx=4, 1 < 3 AND idx >= 2 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(3)), Some(2), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 1); // Only 1 dropped
		assert!(versions.contains(&1));
	}

	#[test]
	fn test_combined_constraints_both_aggressive() {
		// Both constraints are aggressive
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		// Versions: 1, 5, 10, 20, 100 (sorted desc: 100, 20, 10, 5, 1)
		setup_versioned_entries(&storage, table, key, &[1, 5, 10, 20, 100]);

		// up_to_version=50 would drop: 1, 5, 10, 20 (all < 50)
		// keep_last_versions=1 protects: only 100 (index 0)
		// Combined (AND logic): drop only if (version < 50) AND (idx >= 1)
		// - 100: idx=0 → KEEP (protected)
		// - 20: idx=1, 20 < 50 AND idx >= 1 → DROP
		// - 10: idx=2, 10 < 50 AND idx >= 1 → DROP
		// - 5: idx=3, 5 < 50 AND idx >= 1 → DROP
		// - 1: idx=4, 1 < 50 AND idx >= 1 → DROP
		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(50)), Some(1), None).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 4); // All except 100
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(versions.contains(&20));
		assert!(!versions.contains(&100)); // Protected
	}

	// ==================== Edge cases ====================

	#[test]
	fn test_empty_storage() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"nonexistent";

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_single_version_drop_all() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[42]);

		// Drop all
		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();
		assert_eq!(to_drop.len(), 1);
	}

	#[test]
	fn test_single_version_keep_one() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[42]);

		// Keep 1 - should drop nothing
		let to_drop = find_keys_to_drop(&storage, table, key, None, Some(1), None).unwrap();
		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_different_keys_isolated() {
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;

		setup_versioned_entries(&storage, table, b"key_a", &[1, 2, 3]);
		setup_versioned_entries(&storage, table, b"key_b", &[10, 20, 30]);

		// Drop all versions of key_a
		let to_drop = find_keys_to_drop(&storage, table, b"key_a", None, None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
		// Verify all dropped keys are for key_a, not key_b
		for entry in &to_drop {
			assert_eq!(entry.key.as_slice(), b"key_a");
		}
	}

	#[test]
	fn test_up_to_version_zero() {
		// up_to_version=0 means drop nothing (no versions < 0)
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, Some(CommitVersion(0)), None, None).unwrap();

		assert!(to_drop.is_empty());
	}

	#[test]
	fn test_up_to_version_max() {
		// up_to_version=MAX means drop all (all versions < MAX)
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, u64::MAX - 1]);

		let to_drop =
			find_keys_to_drop(&storage, table, key, Some(CommitVersion(u64::MAX)), None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
	}

	#[test]
	fn test_drop_order_is_newest_first() {
		// Results follow the descending version ranking regardless of insertion order
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[5, 1, 10]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![10, 5, 1]);
	}

	#[test]
	fn test_dropped_entries_report_key_and_value_size() {
		// setup_versioned_entries stores one-byte values, so each entry reports 1 byte
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 2, 3]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, None).unwrap();

		assert_eq!(to_drop.len(), 3);
		for entry in &to_drop {
			assert_eq!(entry.key.as_slice(), key);
			assert_eq!(entry.value_bytes, 1);
		}
	}

	// ==================== Pending version ====================

	#[test]
	fn test_pending_version_counts_toward_keep_last() {
		// The pending version is not in storage yet, but it still occupies a
		// keep_last_versions slot, so the newest *stored* version becomes droppable.
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		// Ranking with pending 20: 20, 10, 5, 1 → keep 20, drop 10, 5, 1
		let to_drop =
			find_keys_to_drop(&storage, table, key, None, Some(1), Some(CommitVersion(20))).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 3);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(versions.contains(&10));
		assert!(!versions.contains(&20));
	}

	#[test]
	fn test_pending_version_never_dropped() {
		// Even with no constraints (drop everything) the pending version is excluded
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5]);

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(7))).unwrap();

		let versions = extract_dropped_versions(&to_drop);
		assert_eq!(versions.len(), 2);
		assert!(versions.contains(&1));
		assert!(versions.contains(&5));
		assert!(!versions.contains(&7));
	}

	#[test]
	fn test_pending_version_already_stored_not_duplicated() {
		// A pending version that is already in storage must not take two keep slots
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"test_key";

		setup_versioned_entries(&storage, table, key, &[1, 5, 10]);

		// Ranking: 10, 5, 1 → keep 10, 5; drop 1
		let to_drop =
			find_keys_to_drop(&storage, table, key, None, Some(2), Some(CommitVersion(10))).unwrap();

		assert_eq!(extract_dropped_versions(&to_drop), vec![1]);
	}

	#[test]
	fn test_pending_version_with_empty_storage() {
		// Only the pending version exists, and it is never dropped
		let storage = HotStorage::memory();
		let table = EntryKind::Multi;
		let key = b"nonexistent";

		let to_drop = find_keys_to_drop(&storage, table, key, None, None, Some(CommitVersion(5))).unwrap();

		assert!(to_drop.is_empty());
	}
}